X-Git-Url: http://git.indexdata.com/?a=blobdiff_plain;f=src%2Fsel_thread.c;h=7d1d6e20ee8263ca841269b074bdd7b2c934b58e;hb=817e3ec506c4095bc4fcc1923cee36153ef4ee43;hp=1e13d24ec3164fdddcc764972e58971078bd59f9;hpb=5775b27098fab0dac8ddcbc719a8b35c67391424;p=pazpar2-moved-to-github.git diff --git a/src/sel_thread.c b/src/sel_thread.c index 1e13d24..7d1d6e2 100644 --- a/src/sel_thread.c +++ b/src/sel_thread.c @@ -1,5 +1,5 @@ /* This file is part of Pazpar2. - Copyright (C) 2006-2010 Index Data + Copyright (C) Index Data Pazpar2 is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free @@ -24,10 +24,16 @@ Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA #include "sel_thread.h" #include #include +#if HAVE_UNISTD_H #include +#endif +#ifdef WIN32 +#include +#endif #include #include #include +#include #include struct work_item { @@ -56,7 +62,9 @@ static void queue_trav(struct work_item *q, void (*f)(void *data)) } struct sel_thread { - int fd[2]; + int write_fd; + int read_fd; + yaz_spipe_t spipe; NMEM nmem; yaz_thread_t *thread_id; YAZ_MUTEX mutex; @@ -91,14 +99,16 @@ static void *sel_thread_handler(void *vp) assert(p->input_queue); work_this = queue_remove_last(&p->input_queue); input_queue_length--; +#if 0 yaz_log(YLOG_DEBUG, "input queue length after pop: %d", input_queue_length); +#endif assert(work_this); yaz_mutex_leave(p->mutex); /* work on this item */ p->work_handler(work_this->data); - + /* put it back into output queue */ yaz_mutex_enter(p->mutex); work_this->next = p->output_queue; @@ -106,8 +116,12 @@ static void *sel_thread_handler(void *vp) yaz_mutex_leave(p->mutex); /* wake up select/poll with a single byte */ - (void) write(p->fd[1], "", 1); - } +#ifdef WIN32 + (void) send(p->write_fd, "", 1, 0); +#else + (void) write(p->write_fd, "", 1); +#endif + } yaz_mutex_leave(p->mutex); return 0; } @@ -126,12 +140,22 @@ sel_thread_t sel_thread_create(void (*work_handler)(void *work_data), assert(no_of_threads >= 1); p->nmem = nmem; - if (pipe(p->fd)) + +#ifdef WIN32 + /* use port 12119 temporarily on Windos and hope for the best */ + p->spipe = yaz_spipe_create(12119, 0); +#else + p->spipe = yaz_spipe_create(0, 0); +#endif + if (!p->spipe) { nmem_destroy(nmem); return 0; } - *read_fd = p->fd[0]; + + *read_fd = p->read_fd = yaz_spipe_get_read_fd(p->spipe); + p->write_fd = yaz_spipe_get_write_fd(p->spipe); + p->input_queue = 0; p->output_queue = 0; p->free_queue = 0; @@ -162,7 +186,7 @@ void sel_thread_destroy(sel_thread_t p) p->stop_flag = 1; yaz_cond_broadcast(p->input_data); yaz_mutex_leave(p->mutex); - + for (i = 0; i< p->no_threads; i++) yaz_thread_join(&p->thread_id[i], 0); @@ -172,8 +196,7 @@ void sel_thread_destroy(sel_thread_t p) queue_trav(p->output_queue, p->work_destroy); } - close(p->fd[0]); - close(p->fd[1]); + yaz_spipe_destroy(p->spipe); yaz_cond_destroy(&p->input_data); yaz_mutex_destroy(&p->mutex); nmem_destroy(p->nmem); @@ -197,7 +220,9 @@ void sel_thread_add(sel_thread_t p, void *data) work_p->next = p->input_queue; p->input_queue = work_p; input_queue_length++; +#if 0 yaz_log(YLOG_DEBUG, "sel_thread_add: Input queue length after push: %d", input_queue_length); +#endif yaz_cond_signal(p->input_data); yaz_mutex_leave(p->mutex); } @@ -217,9 +242,13 @@ void *sel_thread_result(sel_thread_t p) /* put freed item in free list */ work_this->next = p->free_queue; p->free_queue = work_this; - + data = work_this->data; - (void) read(p->fd[0], read_buf, 1); +#ifdef WIN32 + (void) recv(p->read_fd, read_buf, 1, 0); +#else + (void) read(p->read_fd, read_buf, 1); +#endif } yaz_mutex_leave(p->mutex); return data;