Allow any number of worker threads for sel_thread. Added optional parameter
authorAdam Dickmeiss <adam@indexdata.dk>
Mon, 23 Apr 2007 08:06:21 +0000 (08:06 +0000)
committerAdam Dickmeiss <adam@indexdata.dk>
Mon, 23 Apr 2007 08:06:21 +0000 (08:06 +0000)
work_destroy handler for sel_thread_create. This will only be called
sel_thread_destroy, in the case of non-empty work queues. Three resolver
threads now running - seems to make resolving many.xml slightly faster.

src/getaddrinfo.c
src/sel_thread.c
src/sel_thread.h
src/test_sel_thread.c

index 4595cc4..8f0c7d9 100644 (file)
@@ -1,4 +1,4 @@
-/* $Id: getaddrinfo.c,v 1.3 2007-04-23 07:34:48 adam Exp $
+/* $Id: getaddrinfo.c,v 1.4 2007-04-23 08:06:21 adam Exp $
    Copyright (c) 2006-2007, Index Data.
 
 This file is part of Pazpar2.
@@ -122,7 +122,9 @@ static sel_thread_t resolver_thread = 0;
 static void getaddrinfo_start(void)
 {
     int fd;
-    sel_thread_t p = resolver_thread = sel_thread_create(work_handler, &fd);
+    sel_thread_t p = resolver_thread = 
+        sel_thread_create(work_handler, 0 /* work_destroy */, &fd,
+                          3 /* no of resolver threads */);
     if (!p)
     {
         yaz_log(YLOG_FATAL|YLOG_ERRNO, "sel_create_create failed");
index f086fa6..c7bfacc 100644 (file)
@@ -1,4 +1,4 @@
-/* $Id: sel_thread.c,v 1.3 2007-04-20 11:44:58 adam Exp $
+/* $Id: sel_thread.c,v 1.4 2007-04-23 08:06:21 adam Exp $
    Copyright (c) 2006-2007, Index Data.
 
 This file is part of Pazpar2.
@@ -50,17 +50,25 @@ static struct work_item *queue_remove_last(struct work_item **q)
     return work_this;
 }
 
+static void queue_trav(struct work_item *q, void (*f)(void *data))
+{
+    for (; q; q = q->next)
+        f(q->data);
+}
+
 struct sel_thread {
     int fd[2];
     NMEM nmem;
-    pthread_t thread_id;
+    pthread_t *thread_id;
     pthread_mutex_t mutex;
     pthread_cond_t input_data;
     int stop_flag;
+    int no_threads;
     struct work_item *input_queue;
     struct work_item *output_queue;
     struct work_item *free_queue;
-    void (*work_handler)(void *work_data);;
+    void (*work_handler)(void *work_data);
+    void (*work_destroy)(void *work_data);
 };
 
 static void *sel_thread_handler(void *vp)
@@ -102,11 +110,18 @@ static void *sel_thread_handler(void *vp)
 }
 
 sel_thread_t sel_thread_create(void (*work_handler)(void *work_data),
-                               int *read_fd)
+                               void (*work_destroy)(void *work_data),
+                               int *read_fd, int no_of_threads)
 {
+    int i;
     NMEM nmem = nmem_create();
     sel_thread_t p = nmem_malloc(nmem, sizeof(*p));
 
+    assert(work_handler);
+    /* work_destroy may be NULL */
+    assert(read_fd);
+    assert(no_of_threads >= 1);
+
     p->nmem = nmem;
     if (pipe(p->fd))
     {
@@ -118,22 +133,35 @@ sel_thread_t sel_thread_create(void (*work_handler)(void *work_data),
     p->output_queue = 0;
     p->free_queue = 0;
     p->work_handler = work_handler;
+    p->work_destroy = work_destroy;
 
     p->stop_flag = 0;
+    p->no_threads = no_of_threads;
     pthread_mutex_init(&p->mutex, 0);
     pthread_cond_init(&p->input_data, 0);
-    pthread_create (&p->thread_id, 0, sel_thread_handler, p);
+
+    p->thread_id = nmem_malloc(nmem, sizeof(*p->thread_id) * p->no_threads);
+    for (i = 0; i < p->no_threads; i++)
+        pthread_create (p->thread_id + i, 0, sel_thread_handler, p);
     return p;
 }
 
 void sel_thread_destroy(sel_thread_t p)
 {
+    int i;
     pthread_mutex_lock(&p->mutex);
     p->stop_flag = 1;
     pthread_cond_broadcast(&p->input_data);
     pthread_mutex_unlock(&p->mutex);
     
-    pthread_join(p->thread_id, 0);
+    for (i = 0; i< p->no_threads; i++)
+        pthread_join(p->thread_id[i], 0);
+
+    if (p->work_destroy)
+    {
+        queue_trav(p->input_queue, p->work_destroy);
+        queue_trav(p->output_queue, p->work_destroy);
+    }
 
     close(p->fd[0]);
     close(p->fd[1]);
index dc7036b..ab0a47f 100644 (file)
@@ -1,4 +1,4 @@
-/* $Id: sel_thread.h,v 1.1 2007-04-20 10:06:52 adam Exp $
+/* $Id: sel_thread.h,v 1.2 2007-04-23 08:06:21 adam Exp $
    Copyright (c) 2006-2007, Index Data.
 
 This file is part of Pazpar2.
@@ -30,7 +30,9 @@ typedef struct sel_thread *sel_thread_t;
 
 /** \brief creates select thread 
     \param work_handler handler that does work in worker thread
+    \param work_destroy optional destroy handler for work (0 = no handler)
     \param read_fd pointer to readable socket upon completion
+    \param no_of_threads number of worker threads
     \returns select thread handler
 
     Creates a worker thread. The worker thread will signal "completed"
@@ -39,7 +41,8 @@ typedef struct sel_thread *sel_thread_t;
     call sel_thread_result accordingly.
 */
 sel_thread_t sel_thread_create(void (*work_handler)(void *work_data),
-                               int *read_fd);
+                               void (*work_destroy)(void *work_data),
+                               int *read_fd, int no_of_threads);
 
 /** \brief destorys select thread 
     \param p select thread handler
index 5f98390..11b6552 100644 (file)
@@ -1,4 +1,4 @@
-/* $Id: test_sel_thread.c,v 1.4 2007-04-23 07:29:34 adam Exp $
+/* $Id: test_sel_thread.c,v 1.5 2007-04-23 08:06:21 adam Exp $
    Copyright (c) 2006-2007, Index Data.
 
 This file is part of Pazpar2.
@@ -41,11 +41,18 @@ static void work_handler(void *vp)
     p->y = p->x * 2;
 }
 
+/** \brief how work is destructed */
+static void work_destroy(void *vp)
+{
+    struct my_work_data *p = vp;
+    xfree(p);
+}
+
 /** \brief see if we can create and destroy without problems */
 static void test_create_destroy(void)
 {
     int fd;
-    sel_thread_t p = sel_thread_create(work_handler, &fd);
+    sel_thread_t p = sel_thread_create(work_handler, 0, &fd, 1);
     YAZ_CHECK(p);
     if (!p)
         return;
@@ -94,10 +101,11 @@ void iochan_handler(struct iochan *i, int event)
 }
 
 /** brief use the fd for something */
-static void test_for_real_work(void)
+static void test_for_real_work(int no_threads)
 {
     int thread_fd;
-    sel_thread_t p = sel_thread_create(work_handler, &thread_fd);
+    sel_thread_t p = sel_thread_create(work_handler, work_destroy, 
+                                       &thread_fd, no_threads);
     YAZ_CHECK(p);
     if (p)
     {
@@ -117,7 +125,8 @@ int main(int argc, char **argv)
     YAZ_CHECK_LOG(); 
 
     test_create_destroy();
-    test_for_real_work();
+    test_for_real_work(1);
+    test_for_real_work(3);
 
     YAZ_CHECK_TERM;
 }