X-Git-Url: http://git.indexdata.com/?p=pazpar2-moved-to-github.git;a=blobdiff_plain;f=src%2Fsel_thread.c;h=7d1d6e20ee8263ca841269b074bdd7b2c934b58e;hp=10b6d165f6cc75aac029f42123b5269f54b3e007;hb=HEAD;hpb=8280939283c0669c7f5dddcfef027f565a795f16 diff --git a/src/sel_thread.c b/src/sel_thread.c index 10b6d16..7d1d6e2 100644 --- a/src/sel_thread.c +++ b/src/sel_thread.c @@ -1,5 +1,5 @@ /* This file is part of Pazpar2. - Copyright (C) 2006-2010 Index Data + Copyright (C) Index Data Pazpar2 is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free @@ -24,9 +24,16 @@ Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA #include "sel_thread.h" #include #include +#if HAVE_UNISTD_H #include +#endif +#ifdef WIN32 +#include +#endif #include -#include +#include +#include +#include #include struct work_item { @@ -55,11 +62,13 @@ static void queue_trav(struct work_item *q, void (*f)(void *data)) } struct sel_thread { - int fd[2]; + int write_fd; + int read_fd; + yaz_spipe_t spipe; NMEM nmem; - pthread_t *thread_id; - pthread_mutex_t mutex; - pthread_cond_t input_data; + yaz_thread_t *thread_id; + YAZ_MUTEX mutex; + YAZ_COND input_data; int stop_flag; int no_threads; struct work_item *input_queue; @@ -69,6 +78,8 @@ struct sel_thread { void (*work_destroy)(void *work_data); }; +static int input_queue_length = 0; + static void *sel_thread_handler(void *vp) { sel_thread_t p = (sel_thread_t) vp; @@ -77,9 +88,9 @@ static void *sel_thread_handler(void *vp) { struct work_item *work_this = 0; /* wait for some work */ - pthread_mutex_lock(&p->mutex); + yaz_mutex_enter(p->mutex); while (!p->stop_flag && !p->input_queue) - pthread_cond_wait(&p->input_data, &p->mutex); + yaz_cond_wait(p->input_data, p->mutex, 0); /* see if we were waken up because we're shutting down */ if (p->stop_flag) break; @@ -87,23 +98,31 @@ static void *sel_thread_handler(void *vp) assert(p->input_queue); work_this = queue_remove_last(&p->input_queue); + input_queue_length--; +#if 0 + yaz_log(YLOG_DEBUG, "input queue length after pop: %d", input_queue_length); +#endif assert(work_this); - pthread_mutex_unlock(&p->mutex); + yaz_mutex_leave(p->mutex); /* work on this item */ p->work_handler(work_this->data); - + /* put it back into output queue */ - pthread_mutex_lock(&p->mutex); + yaz_mutex_enter(p->mutex); work_this->next = p->output_queue; p->output_queue = work_this; - pthread_mutex_unlock(&p->mutex); + yaz_mutex_leave(p->mutex); /* wake up select/poll with a single byte */ - (void) write(p->fd[1], "", 1); - } - pthread_mutex_unlock(&p->mutex); +#ifdef WIN32 + (void) send(p->write_fd, "", 1, 0); +#else + (void) write(p->write_fd, "", 1); +#endif + } + yaz_mutex_leave(p->mutex); return 0; } @@ -121,39 +140,55 @@ sel_thread_t sel_thread_create(void (*work_handler)(void *work_data), assert(no_of_threads >= 1); p->nmem = nmem; - if (pipe(p->fd)) + +#ifdef WIN32 + /* use port 12119 temporarily on Windos and hope for the best */ + p->spipe = yaz_spipe_create(12119, 0); +#else + p->spipe = yaz_spipe_create(0, 0); +#endif + if (!p->spipe) { nmem_destroy(nmem); return 0; } - *read_fd = p->fd[0]; + + *read_fd = p->read_fd = yaz_spipe_get_read_fd(p->spipe); + p->write_fd = yaz_spipe_get_write_fd(p->spipe); + p->input_queue = 0; p->output_queue = 0; p->free_queue = 0; p->work_handler = work_handler; p->work_destroy = work_destroy; - + p->no_threads = 0; /* we if need to destroy */ p->stop_flag = 0; - p->no_threads = no_of_threads; - pthread_mutex_init(&p->mutex, 0); - pthread_cond_init(&p->input_data, 0); + p->mutex = 0; + yaz_mutex_create(&p->mutex); + yaz_cond_create(&p->input_data); + if (p->input_data == 0) /* condition variable could not be created? */ + { + sel_thread_destroy(p); + return 0; + } + p->no_threads = no_of_threads; p->thread_id = nmem_malloc(nmem, sizeof(*p->thread_id) * p->no_threads); for (i = 0; i < p->no_threads; i++) - pthread_create(p->thread_id + i, 0, sel_thread_handler, p); + p->thread_id[i] = yaz_thread_create(sel_thread_handler, p); return p; } void sel_thread_destroy(sel_thread_t p) { int i; - pthread_mutex_lock(&p->mutex); + yaz_mutex_enter(p->mutex); p->stop_flag = 1; - pthread_cond_broadcast(&p->input_data); - pthread_mutex_unlock(&p->mutex); - + yaz_cond_broadcast(p->input_data); + yaz_mutex_leave(p->mutex); + for (i = 0; i< p->no_threads; i++) - pthread_join(p->thread_id[i], 0); + yaz_thread_join(&p->thread_id[i], 0); if (p->work_destroy) { @@ -161,10 +196,9 @@ void sel_thread_destroy(sel_thread_t p) queue_trav(p->output_queue, p->work_destroy); } - close(p->fd[0]); - close(p->fd[1]); - pthread_cond_destroy(&p->input_data); - pthread_mutex_destroy(&p->mutex); + yaz_spipe_destroy(p->spipe); + yaz_cond_destroy(&p->input_data); + yaz_mutex_destroy(&p->mutex); nmem_destroy(p->nmem); } @@ -172,7 +206,7 @@ void sel_thread_add(sel_thread_t p, void *data) { struct work_item *work_p; - pthread_mutex_lock(&p->mutex); + yaz_mutex_enter(p->mutex); if (p->free_queue) { @@ -185,9 +219,12 @@ void sel_thread_add(sel_thread_t p, void *data) work_p->data = data; work_p->next = p->input_queue; p->input_queue = work_p; - - pthread_cond_signal(&p->input_data); - pthread_mutex_unlock(&p->mutex); + input_queue_length++; +#if 0 + yaz_log(YLOG_DEBUG, "sel_thread_add: Input queue length after push: %d", input_queue_length); +#endif + yaz_cond_signal(p->input_data); + yaz_mutex_leave(p->mutex); } void *sel_thread_result(sel_thread_t p) @@ -196,7 +233,7 @@ void *sel_thread_result(sel_thread_t p) void *data = 0; char read_buf[1]; - pthread_mutex_lock(&p->mutex); + yaz_mutex_enter(p->mutex); /* got something. Take the last one out of output_queue */ work_this = queue_remove_last(&p->output_queue); @@ -205,11 +242,15 @@ void *sel_thread_result(sel_thread_t p) /* put freed item in free list */ work_this->next = p->free_queue; p->free_queue = work_this; - + data = work_this->data; - (void) read(p->fd[0], read_buf, 1); +#ifdef WIN32 + (void) recv(p->read_fd, read_buf, 1, 0); +#else + (void) read(p->read_fd, read_buf, 1); +#endif } - pthread_mutex_unlock(&p->mutex); + yaz_mutex_leave(p->mutex); return data; }