Use threading utilities
authorAdam Dickmeiss <adam@indexdata.dk>
Wed, 19 May 2010 13:21:42 +0000 (15:21 +0200)
committerAdam Dickmeiss <adam@indexdata.dk>
Wed, 19 May 2010 13:21:42 +0000 (15:21 +0200)
src/sel_thread.c

index 0677761..1e13d24 100644 (file)
@@ -26,7 +26,8 @@ Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
 #include <yaz/nmem.h>
 #include <unistd.h>
 #include <stdlib.h>
-#include <pthread.h>
+#include <yaz/thread_create.h>
+#include <yaz/mutex.h>
 #include <assert.h>
 
 struct work_item {
@@ -57,9 +58,9 @@ static void queue_trav(struct work_item *q, void (*f)(void *data))
 struct sel_thread {
     int fd[2];
     NMEM nmem;
-    pthread_t *thread_id;
-    pthread_mutex_t mutex;
-    pthread_cond_t input_data;
+    yaz_thread_t *thread_id;
+    YAZ_MUTEX mutex;
+    YAZ_COND input_data;
     int stop_flag;
     int no_threads;
     struct work_item *input_queue;
@@ -79,9 +80,9 @@ static void *sel_thread_handler(void *vp)
     {
         struct work_item *work_this = 0;
         /* wait for some work */
-        pthread_mutex_lock(&p->mutex);
+        yaz_mutex_enter(p->mutex);
         while (!p->stop_flag && !p->input_queue)
-            pthread_cond_wait(&p->input_data, &p->mutex);
+            yaz_cond_wait(p->input_data, p->mutex, 0);
         /* see if we were waken up because we're shutting down */
         if (p->stop_flag)
             break;
@@ -93,21 +94,21 @@ static void *sel_thread_handler(void *vp)
         yaz_log(YLOG_DEBUG, "input queue length after pop: %d", input_queue_length);
         assert(work_this);
 
-        pthread_mutex_unlock(&p->mutex);
+        yaz_mutex_leave(p->mutex);
 
         /* work on this item */
         p->work_handler(work_this->data);
         
         /* put it back into output queue */
-        pthread_mutex_lock(&p->mutex);
+        yaz_mutex_enter(p->mutex);
         work_this->next = p->output_queue;
         p->output_queue = work_this;
-        pthread_mutex_unlock(&p->mutex);
+        yaz_mutex_leave(p->mutex);
 
         /* wake up select/poll with a single byte */
         (void) write(p->fd[1], "", 1);
     }        
-    pthread_mutex_unlock(&p->mutex);
+    yaz_mutex_leave(p->mutex);
     return 0;
 }
 
@@ -136,28 +137,34 @@ sel_thread_t sel_thread_create(void (*work_handler)(void *work_data),
     p->free_queue = 0;
     p->work_handler = work_handler;
     p->work_destroy = work_destroy;
-
+    p->no_threads = 0; /* we if need to destroy */
     p->stop_flag = 0;
-    p->no_threads = no_of_threads;
-    pthread_mutex_init(&p->mutex, 0);
-    pthread_cond_init(&p->input_data, 0);
+    p->mutex = 0;
+    yaz_mutex_create(&p->mutex);
+    yaz_cond_create(&p->input_data);
+    if (p->input_data == 0) /* condition variable could not be created? */
+    {
+        sel_thread_destroy(p);
+        return 0;
+    }
 
+    p->no_threads = no_of_threads;
     p->thread_id = nmem_malloc(nmem, sizeof(*p->thread_id) * p->no_threads);
     for (i = 0; i < p->no_threads; i++)
-        pthread_create(p->thread_id + i, 0, sel_thread_handler, p);
+        p->thread_id[i] = yaz_thread_create(sel_thread_handler, p);
     return p;
 }
 
 void sel_thread_destroy(sel_thread_t p)
 {
     int i;
-    pthread_mutex_lock(&p->mutex);
+    yaz_mutex_enter(p->mutex);
     p->stop_flag = 1;
-    pthread_cond_broadcast(&p->input_data);
-    pthread_mutex_unlock(&p->mutex);
+    yaz_cond_broadcast(p->input_data);
+    yaz_mutex_leave(p->mutex);
     
     for (i = 0; i< p->no_threads; i++)
-        pthread_join(p->thread_id[i], 0);
+        yaz_thread_join(&p->thread_id[i], 0);
 
     if (p->work_destroy)
     {
@@ -167,8 +174,8 @@ void sel_thread_destroy(sel_thread_t p)
 
     close(p->fd[0]);
     close(p->fd[1]);
-    pthread_cond_destroy(&p->input_data);
-    pthread_mutex_destroy(&p->mutex);
+    yaz_cond_destroy(&p->input_data);
+    yaz_mutex_destroy(&p->mutex);
     nmem_destroy(p->nmem);
 }
 
@@ -176,7 +183,7 @@ void sel_thread_add(sel_thread_t p, void *data)
 {
     struct work_item *work_p;
 
-    pthread_mutex_lock(&p->mutex);
+    yaz_mutex_enter(p->mutex);
 
     if (p->free_queue)
     {
@@ -191,8 +198,8 @@ void sel_thread_add(sel_thread_t p, void *data)
     p->input_queue = work_p;
     input_queue_length++;
     yaz_log(YLOG_DEBUG, "sel_thread_add: Input queue length after push: %d", input_queue_length);
-    pthread_cond_signal(&p->input_data);
-    pthread_mutex_unlock(&p->mutex);
+    yaz_cond_signal(p->input_data);
+    yaz_mutex_leave(p->mutex);
 }
 
 void *sel_thread_result(sel_thread_t p)
@@ -201,7 +208,7 @@ void *sel_thread_result(sel_thread_t p)
     void *data = 0;
     char read_buf[1];
 
-    pthread_mutex_lock(&p->mutex);
+    yaz_mutex_enter(p->mutex);
 
     /* got something. Take the last one out of output_queue */
     work_this = queue_remove_last(&p->output_queue);
@@ -214,7 +221,7 @@ void *sel_thread_result(sel_thread_t p)
         data = work_this->data;
         (void) read(p->fd[0], read_buf, 1);
     }
-    pthread_mutex_unlock(&p->mutex);
+    yaz_mutex_leave(p->mutex);
     return data;
 }