X-Git-Url: http://git.indexdata.com/?a=blobdiff_plain;f=src%2Fsel_thread.c;h=6e0706864ede1d6ca3cbde362247fb962fa65bb5;hb=b6c78d80b41b2944279f6c62f777a0d1e44164db;hp=7bd62dc268926b57ee03621d7eb819a1c84f51fd;hpb=df1262e4bb33b78f429baec77e2c68e4d6c9d592;p=pazpar2-moved-to-github.git diff --git a/src/sel_thread.c b/src/sel_thread.c index 7bd62dc..6e07068 100644 --- a/src/sel_thread.c +++ b/src/sel_thread.c @@ -1,7 +1,5 @@ -/* $Id: sel_thread.c,v 1.1 2007-04-20 10:06:52 adam Exp $ - Copyright (c) 2006-2007, Index Data. - -This file is part of Pazpar2. +/* This file is part of Pazpar2. + Copyright (C) 2006-2010 Index Data Pazpar2 is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free @@ -14,13 +12,13 @@ FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License -along with Pazpar2; see the file LICENSE. If not, write to the -Free Software Foundation, 59 Temple Place - Suite 330, Boston, MA -02111-1307, USA. - */ +along with this program; if not, write to the Free Software +Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA + +*/ #if HAVE_CONFIG_H -#include "cconfig.h" +#include #endif #include "sel_thread.h" @@ -29,23 +27,46 @@ Free Software Foundation, 59 Temple Place - Suite 330, Boston, MA #include #include #include +#include struct work_item { void *data; struct work_item *next; }; +static struct work_item *queue_remove_last(struct work_item **q) +{ + struct work_item **work_p = q, *work_this = 0; + + while (*work_p && (*work_p)->next) + work_p = &(*work_p)->next; + if (*work_p) + { + work_this = *work_p; + *work_p = 0; + } + return work_this; +} + +static void queue_trav(struct work_item *q, void (*f)(void *data)) +{ + for (; q; q = q->next) + f(q->data); +} + struct sel_thread { int fd[2]; NMEM nmem; - pthread_t thread_id; + pthread_t *thread_id; pthread_mutex_t mutex; pthread_cond_t input_data; int stop_flag; + int no_threads; struct work_item *input_queue; struct work_item *output_queue; struct work_item *free_queue; - void (*work_handler)(void *work_data);; + void (*work_handler)(void *work_data); + void (*work_destroy)(void *work_data); }; static void *sel_thread_handler(void *vp) @@ -54,7 +75,7 @@ static void *sel_thread_handler(void *vp) while(1) { - struct work_item **work_p, *work_this; + struct work_item *work_this = 0; /* wait for some work */ pthread_mutex_lock(&p->mutex); while (!p->stop_flag && !p->input_queue) @@ -63,16 +84,16 @@ static void *sel_thread_handler(void *vp) if (p->stop_flag) break; /* got something. Take the last one out of input_queue */ - work_p = &p->input_queue; - while ((*work_p)->next) - work_p = &(*work_p)->next; - work_this = *work_p; - *work_p = 0; + + assert(p->input_queue); + work_this = queue_remove_last(&p->input_queue); + assert(work_this); + pthread_mutex_unlock(&p->mutex); /* work on this item */ p->work_handler(work_this->data); - + /* put it back into output queue */ pthread_mutex_lock(&p->mutex); work_this->next = p->output_queue; @@ -80,18 +101,25 @@ static void *sel_thread_handler(void *vp) pthread_mutex_unlock(&p->mutex); /* wake up select/poll with a single byte */ - write(p->fd[1], "", 1); + (void) write(p->fd[1], "", 1); } pthread_mutex_unlock(&p->mutex); return 0; } sel_thread_t sel_thread_create(void (*work_handler)(void *work_data), - int *read_fd) + void (*work_destroy)(void *work_data), + int *read_fd, int no_of_threads) { + int i; NMEM nmem = nmem_create(); sel_thread_t p = nmem_malloc(nmem, sizeof(*p)); + assert(work_handler); + /* work_destroy may be NULL */ + assert(read_fd); + assert(no_of_threads >= 1); + p->nmem = nmem; if (pipe(p->fd)) { @@ -103,23 +131,35 @@ sel_thread_t sel_thread_create(void (*work_handler)(void *work_data), p->output_queue = 0; p->free_queue = 0; p->work_handler = work_handler; + p->work_destroy = work_destroy; p->stop_flag = 0; + p->no_threads = no_of_threads; pthread_mutex_init(&p->mutex, 0); pthread_cond_init(&p->input_data, 0); - pthread_create (&p->thread_id, 0, sel_thread_handler, p); + + p->thread_id = nmem_malloc(nmem, sizeof(*p->thread_id) * p->no_threads); + for (i = 0; i < p->no_threads; i++) + pthread_create (p->thread_id + i, 0, sel_thread_handler, p); return p; } void sel_thread_destroy(sel_thread_t p) { + int i; pthread_mutex_lock(&p->mutex); - p->stop_flag = 1; pthread_cond_broadcast(&p->input_data); pthread_mutex_unlock(&p->mutex); - pthread_join(p->thread_id, 0); + for (i = 0; i< p->no_threads; i++) + pthread_join(p->thread_id[i], 0); + + if (p->work_destroy) + { + queue_trav(p->input_queue, p->work_destroy); + queue_trav(p->output_queue, p->work_destroy); + } close(p->fd[0]); close(p->fd[1]); @@ -133,44 +173,52 @@ void sel_thread_add(sel_thread_t p, void *data) struct work_item *work_p; pthread_mutex_lock(&p->mutex); - work_p = p->free_queue; - if (!work_p) + + if (p->free_queue) + { + work_p = p->free_queue; + p->free_queue = p->free_queue->next; + } + else work_p = nmem_malloc(p->nmem, sizeof(*work_p)); work_p->data = data; work_p->next = p->input_queue; p->input_queue = work_p; + + pthread_cond_signal(&p->input_data); pthread_mutex_unlock(&p->mutex); } void *sel_thread_result(sel_thread_t p) { - struct work_item **work_p, *work_this; - void *data; + struct work_item *work_this = 0; + void *data = 0; + char read_buf[1]; - /* got something. Take the last one out of output_queue */ - work_p = &p->output_queue; - if (!*work_p) - return 0; - while ((*work_p)->next) - work_p = &(*work_p)->next; - work_this = *work_p; - *work_p = 0; - - /* put freed item in free list */ - work_this->next = p->free_queue; - p->free_queue = work_this; + pthread_mutex_lock(&p->mutex); - data = work_this->data; + /* got something. Take the last one out of output_queue */ + work_this = queue_remove_last(&p->output_queue); + if (work_this) + { + /* put freed item in free list */ + work_this->next = p->free_queue; + p->free_queue = work_this; + + data = work_this->data; + (void) read(p->fd[0], read_buf, 1); + } pthread_mutex_unlock(&p->mutex); - return data; } /* * Local variables: * c-basic-offset: 4 + * c-file-style: "Stroustrup" * indent-tabs-mode: nil * End: * vim: shiftwidth=4 tabstop=8 expandtab */ +