/* This file is part of Pazpar2.
- Copyright (C) 2006-2010 Index Data
+ Copyright (C) Index Data
Pazpar2 is free software; you can redistribute it and/or modify it under
the terms of the GNU General Public License as published by the Free
#include "sel_thread.h"
#include <yaz/log.h>
#include <yaz/nmem.h>
+#if HAVE_UNISTD_H
#include <unistd.h>
+#endif
+#ifdef WIN32
+#include <winsock2.h>
+#endif
#include <stdlib.h>
#include <yaz/thread_create.h>
#include <yaz/mutex.h>
+#include <yaz/spipe.h>
#include <assert.h>
struct work_item {
}
struct sel_thread {
- int fd[2];
+ int write_fd;
+ int read_fd;
+ yaz_spipe_t spipe;
NMEM nmem;
yaz_thread_t *thread_id;
YAZ_MUTEX mutex;
assert(p->input_queue);
work_this = queue_remove_last(&p->input_queue);
input_queue_length--;
+#if 0
yaz_log(YLOG_DEBUG, "input queue length after pop: %d", input_queue_length);
+#endif
assert(work_this);
yaz_mutex_leave(p->mutex);
/* work on this item */
p->work_handler(work_this->data);
-
+
/* put it back into output queue */
yaz_mutex_enter(p->mutex);
work_this->next = p->output_queue;
yaz_mutex_leave(p->mutex);
/* wake up select/poll with a single byte */
- (void) write(p->fd[1], "", 1);
- }
+#ifdef WIN32
+ (void) send(p->write_fd, "", 1, 0);
+#else
+ (void) write(p->write_fd, "", 1);
+#endif
+ }
yaz_mutex_leave(p->mutex);
return 0;
}
assert(no_of_threads >= 1);
p->nmem = nmem;
- if (pipe(p->fd))
+
+#ifdef WIN32
+ /* use port 12119 temporarily on Windos and hope for the best */
+ p->spipe = yaz_spipe_create(12119, 0);
+#else
+ p->spipe = yaz_spipe_create(0, 0);
+#endif
+ if (!p->spipe)
{
nmem_destroy(nmem);
return 0;
}
- *read_fd = p->fd[0];
+
+ *read_fd = p->read_fd = yaz_spipe_get_read_fd(p->spipe);
+ p->write_fd = yaz_spipe_get_write_fd(p->spipe);
+
p->input_queue = 0;
p->output_queue = 0;
p->free_queue = 0;
p->stop_flag = 1;
yaz_cond_broadcast(p->input_data);
yaz_mutex_leave(p->mutex);
-
+
for (i = 0; i< p->no_threads; i++)
yaz_thread_join(&p->thread_id[i], 0);
queue_trav(p->output_queue, p->work_destroy);
}
- close(p->fd[0]);
- close(p->fd[1]);
+ yaz_spipe_destroy(p->spipe);
yaz_cond_destroy(&p->input_data);
yaz_mutex_destroy(&p->mutex);
nmem_destroy(p->nmem);
work_p->next = p->input_queue;
p->input_queue = work_p;
input_queue_length++;
+#if 0
yaz_log(YLOG_DEBUG, "sel_thread_add: Input queue length after push: %d", input_queue_length);
+#endif
yaz_cond_signal(p->input_data);
yaz_mutex_leave(p->mutex);
}
/* put freed item in free list */
work_this->next = p->free_queue;
p->free_queue = work_this;
-
+
data = work_this->data;
- (void) read(p->fd[0], read_buf, 1);
+#ifdef WIN32
+ (void) recv(p->read_fd, read_buf, 1, 0);
+#else
+ (void) read(p->read_fd, read_buf, 1);
+#endif
}
yaz_mutex_leave(p->mutex);
return data;