Select thread system now passes a simple test using IOCHANSs.
authorAdam Dickmeiss <adam@indexdata.dk>
Fri, 20 Apr 2007 11:44:58 +0000 (11:44 +0000)
committerAdam Dickmeiss <adam@indexdata.dk>
Fri, 20 Apr 2007 11:44:58 +0000 (11:44 +0000)
src/sel_thread.c
src/test_sel_thread.c

index 46ad30c..f086fa6 100644 (file)
@@ -1,4 +1,4 @@
-/* $Id: sel_thread.c,v 1.2 2007-04-20 10:15:19 adam Exp $
+/* $Id: sel_thread.c,v 1.3 2007-04-20 11:44:58 adam Exp $
    Copyright (c) 2006-2007, Index Data.
 
 This file is part of Pazpar2.
@@ -29,12 +29,27 @@ Free Software Foundation, 59 Temple Place - Suite 330, Boston, MA
 #include <unistd.h>
 #include <stdlib.h>
 #include <pthread.h>
+#include <assert.h>
 
 struct work_item {
     void *data;
     struct work_item *next;
 };
 
+static struct work_item *queue_remove_last(struct work_item **q)
+{
+    struct work_item **work_p = q, *work_this = 0;
+
+    while (*work_p && (*work_p)->next)
+        work_p = &(*work_p)->next;
+    if (*work_p)
+    {
+        work_this = *work_p;
+        *work_p = 0;
+    }
+    return work_this;
+}
+
 struct sel_thread {
     int fd[2];
     NMEM nmem;
@@ -54,7 +69,7 @@ static void *sel_thread_handler(void *vp)
 
     while(1)
     {
-        struct work_item **work_p, *work_this;
+        struct work_item *work_this = 0;
         /* wait for some work */
         pthread_mutex_lock(&p->mutex);
         while (!p->stop_flag && !p->input_queue)
@@ -63,16 +78,16 @@ static void *sel_thread_handler(void *vp)
         if (p->stop_flag)
             break;
         /* got something. Take the last one out of input_queue */
-        work_p = &p->input_queue;
-        while ((*work_p)->next)
-            work_p = &(*work_p)->next;
-        work_this = *work_p;
-        *work_p = 0;
+
+        assert(p->input_queue);
+        work_this = queue_remove_last(&p->input_queue);
+        assert(work_this);
+
         pthread_mutex_unlock(&p->mutex);
 
         /* work on this item */
         p->work_handler(work_this->data);
-
+        
         /* put it back into output queue */
         pthread_mutex_lock(&p->mutex);
         work_this->next = p->output_queue;
@@ -114,7 +129,6 @@ sel_thread_t sel_thread_create(void (*work_handler)(void *work_data),
 void sel_thread_destroy(sel_thread_t p)
 {
     pthread_mutex_lock(&p->mutex);
-
     p->stop_flag = 1;
     pthread_cond_broadcast(&p->input_data);
     pthread_mutex_unlock(&p->mutex);
@@ -133,41 +147,43 @@ void sel_thread_add(sel_thread_t p, void *data)
     struct work_item *work_p;
 
     pthread_mutex_lock(&p->mutex);
-    work_p = p->free_queue;
-    if (!work_p)
+
+    if (p->free_queue)
+    {
+        work_p = p->free_queue;
+        p->free_queue = p->free_queue->next;
+    }
+    else
         work_p = nmem_malloc(p->nmem, sizeof(*work_p));
 
     work_p->data = data;
     work_p->next = p->input_queue;
     p->input_queue = work_p;
+
+    pthread_cond_signal(&p->input_data);
     pthread_mutex_unlock(&p->mutex);
 }
 
 void *sel_thread_result(sel_thread_t p)
 {
-    struct work_item **work_p, *work_this;
-    void *data;
+    struct work_item *work_this = 0;
+    void *data = 0;
     char read_buf[1];
 
-    /* got something. Take the last one out of output_queue */
-    work_p = &p->output_queue;
-    if (!*work_p)
-        return 0;
-
-    read(p->fd[0], read_buf, 1);
-
-    while ((*work_p)->next)
-        work_p = &(*work_p)->next;
-    work_this = *work_p;
-    *work_p = 0;
-
-    /* put freed item in free list */
-    work_this->next = p->free_queue;
-    p->free_queue = work_this;
+    pthread_mutex_lock(&p->mutex);
 
-    data = work_this->data;
+    /* got something. Take the last one out of output_queue */
+    work_this = queue_remove_last(&p->output_queue);
+    if (work_this)
+    {
+        /* put freed item in free list */
+        work_this->next = p->free_queue;
+        p->free_queue = work_this;
+        
+        data = work_this->data;
+        read(p->fd[0], read_buf, 1);
+    }
     pthread_mutex_unlock(&p->mutex);
-
     return data;
 }
 
index 6f77bf7..d68f15e 100644 (file)
@@ -1,4 +1,4 @@
-/* $Id: test_sel_thread.c,v 1.1 2007-04-20 10:06:52 adam Exp $
+/* $Id: test_sel_thread.c,v 1.2 2007-04-20 11:44:58 adam Exp $
    Copyright (c) 2006-2007, Index Data.
 
 This file is part of Pazpar2.
@@ -24,33 +24,100 @@ Free Software Foundation, 59 Temple Place - Suite 330, Boston, MA
 #endif
 
 #include "sel_thread.h"
+#include "eventl.h"
 #include <yaz/test.h>
+#include <yaz/xmalloc.h>
 
+/** \brief stuff we work on in separate thread */
 struct my_work_data {
     int x;
+    int y;
 };
 
+/** \brief work to be carried out in separate thrad */
 static void work_handler(void *vp)
 {
     struct my_work_data *p = vp;
-    p->x += 2;
+    p->y = p->x * 2;
 }
 
+/** \brief see if we can create and destroy without problems */
 static void test_1(void)
 {
     int fd;
     sel_thread_t p = sel_thread_create(work_handler, &fd);
     YAZ_CHECK(p);
+    if (!p)
+        return;
 
     sel_thread_destroy(p);
 }
 
+
+void iochan_handler(struct iochan *i, int event)
+{
+    static int number = 0;
+    sel_thread_t p = iochan_getdata(i);
+
+    if (event & EVENT_INPUT)
+    {
+        struct my_work_data *work;
+
+        work = sel_thread_result(p);
+
+        YAZ_CHECK(work);
+        if (work)
+        {
+            YAZ_CHECK_EQ(work->x * 2, work->y);
+            /* stop work after a couple of iterations */
+            if (work->x > 10)
+                iochan_destroy(i);
+
+            xfree(work);
+        }
+
+    }
+    if (event & EVENT_TIMEOUT)
+    {
+        struct my_work_data *work;
+
+        work = xmalloc(sizeof(*work));
+        work->x = number;
+        sel_thread_add(p, work);
+
+        work = xmalloc(sizeof(*work));
+        work->x = number+1;
+        sel_thread_add(p, work);
+
+        number += 10;
+    }
+}
+
+/** brief use the fd for something */
+static void test_2(void)
+{
+    int thread_fd;
+    sel_thread_t p = sel_thread_create(work_handler, &thread_fd);
+    YAZ_CHECK(p);
+    if (p)
+    {
+        IOCHAN chan = iochan_create(thread_fd, iochan_handler,
+                                    EVENT_INPUT|EVENT_TIMEOUT);
+        iochan_settimeout(chan, 1);
+        iochan_setdata(chan, p);
+
+        event_loop(&chan);
+    }
+    sel_thread_destroy(p);
+}
+
 int main(int argc, char **argv)
 {
     YAZ_CHECK_INIT(argc, argv); 
     YAZ_CHECK_LOG(); 
 
     test_1();
+    test_2();
 
     YAZ_CHECK_TERM;
 }