Happy new year
[pazpar2-moved-to-github.git] / src / sel_thread.c
index c7bfacc..7d1d6e2 100644 (file)
@@ -1,7 +1,5 @@
-/* $Id: sel_thread.c,v 1.4 2007-04-23 08:06:21 adam Exp $
-   Copyright (c) 2006-2007, Index Data.
-
-This file is part of Pazpar2.
+/* This file is part of Pazpar2.
+   Copyright (C) Index Data
 
 Pazpar2 is free software; you can redistribute it and/or modify it under
 the terms of the GNU General Public License as published by the Free
@@ -14,21 +12,28 @@ FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
 for more details.
 
 You should have received a copy of the GNU General Public License
-along with Pazpar2; see the file LICENSE.  If not, write to the
-Free Software Foundation, 59 Temple Place - Suite 330, Boston, MA
-02111-1307, USA.
- */
+along with this program; if not, write to the Free Software
+Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
+
+*/
 
 #if HAVE_CONFIG_H
-#include "cconfig.h"
+#include <config.h>
 #endif
 
 #include "sel_thread.h"
 #include <yaz/log.h>
 #include <yaz/nmem.h>
+#if HAVE_UNISTD_H
 #include <unistd.h>
+#endif
+#ifdef WIN32
+#include <winsock2.h>
+#endif
 #include <stdlib.h>
-#include <pthread.h>
+#include <yaz/thread_create.h>
+#include <yaz/mutex.h>
+#include <yaz/spipe.h>
 #include <assert.h>
 
 struct work_item {
@@ -57,11 +62,13 @@ static void queue_trav(struct work_item *q, void (*f)(void *data))
 }
 
 struct sel_thread {
-    int fd[2];
+    int write_fd;
+    int read_fd;
+    yaz_spipe_t spipe;
     NMEM nmem;
-    pthread_t *thread_id;
-    pthread_mutex_t mutex;
-    pthread_cond_t input_data;
+    yaz_thread_t *thread_id;
+    YAZ_MUTEX mutex;
+    YAZ_COND input_data;
     int stop_flag;
     int no_threads;
     struct work_item *input_queue;
@@ -71,17 +78,19 @@ struct sel_thread {
     void (*work_destroy)(void *work_data);
 };
 
+static int input_queue_length = 0;
+
 static void *sel_thread_handler(void *vp)
 {
     sel_thread_t p = (sel_thread_t) vp;
 
-    while(1)
+    while (1)
     {
         struct work_item *work_this = 0;
         /* wait for some work */
-        pthread_mutex_lock(&p->mutex);
+        yaz_mutex_enter(p->mutex);
         while (!p->stop_flag && !p->input_queue)
-            pthread_cond_wait(&p->input_data, &p->mutex);
+            yaz_cond_wait(p->input_data, p->mutex, 0);
         /* see if we were waken up because we're shutting down */
         if (p->stop_flag)
             break;
@@ -89,23 +98,31 @@ static void *sel_thread_handler(void *vp)
 
         assert(p->input_queue);
         work_this = queue_remove_last(&p->input_queue);
+        input_queue_length--;
+#if 0
+        yaz_log(YLOG_DEBUG, "input queue length after pop: %d", input_queue_length);
+#endif
         assert(work_this);
 
-        pthread_mutex_unlock(&p->mutex);
+        yaz_mutex_leave(p->mutex);
 
         /* work on this item */
         p->work_handler(work_this->data);
-        
+
         /* put it back into output queue */
-        pthread_mutex_lock(&p->mutex);
+        yaz_mutex_enter(p->mutex);
         work_this->next = p->output_queue;
         p->output_queue = work_this;
-        pthread_mutex_unlock(&p->mutex);
+        yaz_mutex_leave(p->mutex);
 
         /* wake up select/poll with a single byte */
-        write(p->fd[1], "", 1);
-    }        
-    pthread_mutex_unlock(&p->mutex);
+#ifdef WIN32
+        (void) send(p->write_fd, "", 1, 0);
+#else
+        (void) write(p->write_fd, "", 1);
+#endif
+    }
+    yaz_mutex_leave(p->mutex);
     return 0;
 }
 
@@ -123,39 +140,55 @@ sel_thread_t sel_thread_create(void (*work_handler)(void *work_data),
     assert(no_of_threads >= 1);
 
     p->nmem = nmem;
-    if (pipe(p->fd))
+
+#ifdef WIN32
+    /* use port 12119 temporarily on Windos and hope for the best */
+    p->spipe = yaz_spipe_create(12119, 0);
+#else
+    p->spipe = yaz_spipe_create(0, 0);
+#endif
+    if (!p->spipe)
     {
         nmem_destroy(nmem);
         return 0;
     }
-    *read_fd = p->fd[0];
+
+    *read_fd = p->read_fd = yaz_spipe_get_read_fd(p->spipe);
+    p->write_fd = yaz_spipe_get_write_fd(p->spipe);
+
     p->input_queue = 0;
     p->output_queue = 0;
     p->free_queue = 0;
     p->work_handler = work_handler;
     p->work_destroy = work_destroy;
-
+    p->no_threads = 0; /* we if need to destroy */
     p->stop_flag = 0;
-    p->no_threads = no_of_threads;
-    pthread_mutex_init(&p->mutex, 0);
-    pthread_cond_init(&p->input_data, 0);
+    p->mutex = 0;
+    yaz_mutex_create(&p->mutex);
+    yaz_cond_create(&p->input_data);
+    if (p->input_data == 0) /* condition variable could not be created? */
+    {
+        sel_thread_destroy(p);
+        return 0;
+    }
 
+    p->no_threads = no_of_threads;
     p->thread_id = nmem_malloc(nmem, sizeof(*p->thread_id) * p->no_threads);
     for (i = 0; i < p->no_threads; i++)
-        pthread_create (p->thread_id + i, 0, sel_thread_handler, p);
+        p->thread_id[i] = yaz_thread_create(sel_thread_handler, p);
     return p;
 }
 
 void sel_thread_destroy(sel_thread_t p)
 {
     int i;
-    pthread_mutex_lock(&p->mutex);
+    yaz_mutex_enter(p->mutex);
     p->stop_flag = 1;
-    pthread_cond_broadcast(&p->input_data);
-    pthread_mutex_unlock(&p->mutex);
-    
+    yaz_cond_broadcast(p->input_data);
+    yaz_mutex_leave(p->mutex);
+
     for (i = 0; i< p->no_threads; i++)
-        pthread_join(p->thread_id[i], 0);
+        yaz_thread_join(&p->thread_id[i], 0);
 
     if (p->work_destroy)
     {
@@ -163,10 +196,9 @@ void sel_thread_destroy(sel_thread_t p)
         queue_trav(p->output_queue, p->work_destroy);
     }
 
-    close(p->fd[0]);
-    close(p->fd[1]);
-    pthread_cond_destroy(&p->input_data);
-    pthread_mutex_destroy(&p->mutex);
+    yaz_spipe_destroy(p->spipe);
+    yaz_cond_destroy(&p->input_data);
+    yaz_mutex_destroy(&p->mutex);
     nmem_destroy(p->nmem);
 }
 
@@ -174,7 +206,7 @@ void sel_thread_add(sel_thread_t p, void *data)
 {
     struct work_item *work_p;
 
-    pthread_mutex_lock(&p->mutex);
+    yaz_mutex_enter(p->mutex);
 
     if (p->free_queue)
     {
@@ -187,9 +219,12 @@ void sel_thread_add(sel_thread_t p, void *data)
     work_p->data = data;
     work_p->next = p->input_queue;
     p->input_queue = work_p;
-
-    pthread_cond_signal(&p->input_data);
-    pthread_mutex_unlock(&p->mutex);
+    input_queue_length++;
+#if 0
+    yaz_log(YLOG_DEBUG, "sel_thread_add: Input queue length after push: %d", input_queue_length);
+#endif
+    yaz_cond_signal(p->input_data);
+    yaz_mutex_leave(p->mutex);
 }
 
 void *sel_thread_result(sel_thread_t p)
@@ -198,7 +233,7 @@ void *sel_thread_result(sel_thread_t p)
     void *data = 0;
     char read_buf[1];
 
-    pthread_mutex_lock(&p->mutex);
+    yaz_mutex_enter(p->mutex);
 
     /* got something. Take the last one out of output_queue */
     work_this = queue_remove_last(&p->output_queue);
@@ -207,18 +242,24 @@ void *sel_thread_result(sel_thread_t p)
         /* put freed item in free list */
         work_this->next = p->free_queue;
         p->free_queue = work_this;
-        
+
         data = work_this->data;
-        read(p->fd[0], read_buf, 1);
+#ifdef WIN32
+        (void) recv(p->read_fd, read_buf, 1, 0);
+#else
+        (void) read(p->read_fd, read_buf, 1);
+#endif
     }
-    pthread_mutex_unlock(&p->mutex);
+    yaz_mutex_leave(p->mutex);
     return data;
 }
 
 /*
  * Local variables:
  * c-basic-offset: 4
+ * c-file-style: "Stroustrup"
  * indent-tabs-mode: nil
  * End:
  * vim: shiftwidth=4 tabstop=8 expandtab
  */
+