From 197410f262777b4955b23be723afe80d5f63cf77 Mon Sep 17 00:00:00 2001 From: Adam Dickmeiss Date: Mon, 23 Apr 2007 08:06:21 +0000 Subject: [PATCH] Allow any number of worker threads for sel_thread. Added optional parameter work_destroy handler for sel_thread_create. This will only be called sel_thread_destroy, in the case of non-empty work queues. Three resolver threads now running - seems to make resolving many.xml slightly faster. --- src/getaddrinfo.c | 6 ++++-- src/sel_thread.c | 40 ++++++++++++++++++++++++++++++++++------ src/sel_thread.h | 7 +++++-- src/test_sel_thread.c | 19 ++++++++++++++----- 4 files changed, 57 insertions(+), 15 deletions(-) diff --git a/src/getaddrinfo.c b/src/getaddrinfo.c index 4595cc4..8f0c7d9 100644 --- a/src/getaddrinfo.c +++ b/src/getaddrinfo.c @@ -1,4 +1,4 @@ -/* $Id: getaddrinfo.c,v 1.3 2007-04-23 07:34:48 adam Exp $ +/* $Id: getaddrinfo.c,v 1.4 2007-04-23 08:06:21 adam Exp $ Copyright (c) 2006-2007, Index Data. This file is part of Pazpar2. @@ -122,7 +122,9 @@ static sel_thread_t resolver_thread = 0; static void getaddrinfo_start(void) { int fd; - sel_thread_t p = resolver_thread = sel_thread_create(work_handler, &fd); + sel_thread_t p = resolver_thread = + sel_thread_create(work_handler, 0 /* work_destroy */, &fd, + 3 /* no of resolver threads */); if (!p) { yaz_log(YLOG_FATAL|YLOG_ERRNO, "sel_create_create failed"); diff --git a/src/sel_thread.c b/src/sel_thread.c index f086fa6..c7bfacc 100644 --- a/src/sel_thread.c +++ b/src/sel_thread.c @@ -1,4 +1,4 @@ -/* $Id: sel_thread.c,v 1.3 2007-04-20 11:44:58 adam Exp $ +/* $Id: sel_thread.c,v 1.4 2007-04-23 08:06:21 adam Exp $ Copyright (c) 2006-2007, Index Data. This file is part of Pazpar2. @@ -50,17 +50,25 @@ static struct work_item *queue_remove_last(struct work_item **q) return work_this; } +static void queue_trav(struct work_item *q, void (*f)(void *data)) +{ + for (; q; q = q->next) + f(q->data); +} + struct sel_thread { int fd[2]; NMEM nmem; - pthread_t thread_id; + pthread_t *thread_id; pthread_mutex_t mutex; pthread_cond_t input_data; int stop_flag; + int no_threads; struct work_item *input_queue; struct work_item *output_queue; struct work_item *free_queue; - void (*work_handler)(void *work_data);; + void (*work_handler)(void *work_data); + void (*work_destroy)(void *work_data); }; static void *sel_thread_handler(void *vp) @@ -102,11 +110,18 @@ static void *sel_thread_handler(void *vp) } sel_thread_t sel_thread_create(void (*work_handler)(void *work_data), - int *read_fd) + void (*work_destroy)(void *work_data), + int *read_fd, int no_of_threads) { + int i; NMEM nmem = nmem_create(); sel_thread_t p = nmem_malloc(nmem, sizeof(*p)); + assert(work_handler); + /* work_destroy may be NULL */ + assert(read_fd); + assert(no_of_threads >= 1); + p->nmem = nmem; if (pipe(p->fd)) { @@ -118,22 +133,35 @@ sel_thread_t sel_thread_create(void (*work_handler)(void *work_data), p->output_queue = 0; p->free_queue = 0; p->work_handler = work_handler; + p->work_destroy = work_destroy; p->stop_flag = 0; + p->no_threads = no_of_threads; pthread_mutex_init(&p->mutex, 0); pthread_cond_init(&p->input_data, 0); - pthread_create (&p->thread_id, 0, sel_thread_handler, p); + + p->thread_id = nmem_malloc(nmem, sizeof(*p->thread_id) * p->no_threads); + for (i = 0; i < p->no_threads; i++) + pthread_create (p->thread_id + i, 0, sel_thread_handler, p); return p; } void sel_thread_destroy(sel_thread_t p) { + int i; pthread_mutex_lock(&p->mutex); p->stop_flag = 1; pthread_cond_broadcast(&p->input_data); pthread_mutex_unlock(&p->mutex); - pthread_join(p->thread_id, 0); + for (i = 0; i< p->no_threads; i++) + pthread_join(p->thread_id[i], 0); + + if (p->work_destroy) + { + queue_trav(p->input_queue, p->work_destroy); + queue_trav(p->output_queue, p->work_destroy); + } close(p->fd[0]); close(p->fd[1]); diff --git a/src/sel_thread.h b/src/sel_thread.h index dc7036b..ab0a47f 100644 --- a/src/sel_thread.h +++ b/src/sel_thread.h @@ -1,4 +1,4 @@ -/* $Id: sel_thread.h,v 1.1 2007-04-20 10:06:52 adam Exp $ +/* $Id: sel_thread.h,v 1.2 2007-04-23 08:06:21 adam Exp $ Copyright (c) 2006-2007, Index Data. This file is part of Pazpar2. @@ -30,7 +30,9 @@ typedef struct sel_thread *sel_thread_t; /** \brief creates select thread \param work_handler handler that does work in worker thread + \param work_destroy optional destroy handler for work (0 = no handler) \param read_fd pointer to readable socket upon completion + \param no_of_threads number of worker threads \returns select thread handler Creates a worker thread. The worker thread will signal "completed" @@ -39,7 +41,8 @@ typedef struct sel_thread *sel_thread_t; call sel_thread_result accordingly. */ sel_thread_t sel_thread_create(void (*work_handler)(void *work_data), - int *read_fd); + void (*work_destroy)(void *work_data), + int *read_fd, int no_of_threads); /** \brief destorys select thread \param p select thread handler diff --git a/src/test_sel_thread.c b/src/test_sel_thread.c index 5f98390..11b6552 100644 --- a/src/test_sel_thread.c +++ b/src/test_sel_thread.c @@ -1,4 +1,4 @@ -/* $Id: test_sel_thread.c,v 1.4 2007-04-23 07:29:34 adam Exp $ +/* $Id: test_sel_thread.c,v 1.5 2007-04-23 08:06:21 adam Exp $ Copyright (c) 2006-2007, Index Data. This file is part of Pazpar2. @@ -41,11 +41,18 @@ static void work_handler(void *vp) p->y = p->x * 2; } +/** \brief how work is destructed */ +static void work_destroy(void *vp) +{ + struct my_work_data *p = vp; + xfree(p); +} + /** \brief see if we can create and destroy without problems */ static void test_create_destroy(void) { int fd; - sel_thread_t p = sel_thread_create(work_handler, &fd); + sel_thread_t p = sel_thread_create(work_handler, 0, &fd, 1); YAZ_CHECK(p); if (!p) return; @@ -94,10 +101,11 @@ void iochan_handler(struct iochan *i, int event) } /** brief use the fd for something */ -static void test_for_real_work(void) +static void test_for_real_work(int no_threads) { int thread_fd; - sel_thread_t p = sel_thread_create(work_handler, &thread_fd); + sel_thread_t p = sel_thread_create(work_handler, work_destroy, + &thread_fd, no_threads); YAZ_CHECK(p); if (p) { @@ -117,7 +125,8 @@ int main(int argc, char **argv) YAZ_CHECK_LOG(); test_create_destroy(); - test_for_real_work(); + test_for_real_work(1); + test_for_real_work(3); YAZ_CHECK_TERM; } -- 1.7.10.4