Thread pool for server nanohttp
authorAdam Dickmeiss <adam@indexdata.dk>
Tue, 5 Jan 2010 21:14:25 +0000 (22:14 +0100)
committerAdam Dickmeiss <adam@indexdata.dk>
Tue, 5 Jan 2010 21:14:25 +0000 (22:14 +0100)
include/yaz/Makefile.am
include/yaz/sock_man.h
include/yaz/srv.h [new file with mode: 0644]
include/yaz/tpool.h [new file with mode: 0644]
src/Makefile.am
src/nanohttp.c [deleted file]
src/sock_man.c
src/srv.c [new file with mode: 0644]
src/tpool.c [new file with mode: 0644]
test/Makefile.am
test/tst_srv.c [new file with mode: 0644]

index cf41239..bf110c9 100644 (file)
@@ -19,7 +19,8 @@ pkginclude_HEADERS= backend.h ccl.h ccl_xml.h cql.h rpn2cql.h comstack.h \
  z-grs.h z-mterm2.h z-opac.h z-rrf1.h z-rrf2.h z-sum.h z-sutrs.h z-uifr1.h \
  z-univ.h z-oclcui.h zes-expi.h zes-exps.h zes-order.h zes-pquery.h \
  zes-psched.h zes-admin.h zes-pset.h zes-update.h zes-update0.h \
- zoom.h z-charneg.h charneg.h soap.h srw.h zgdu.h matchstr.h
+ zoom.h z-charneg.h charneg.h soap.h srw.h zgdu.h matchstr.h \
+ sock_man.h srv.h
 
 EXTRA_DIST = yaz-version.h.in
 
index 0efc9b1..9748491 100644 (file)
@@ -47,11 +47,14 @@ YAZ_EXPORT
 void yaz_sock_man_destroy(yaz_sock_man_t man);
 
 YAZ_EXPORT
-yaz_sock_chan_t yaz_sock_chan_new(yaz_sock_man_t srv, int fd, void *data,
+yaz_sock_chan_t yaz_sock_man_wait(yaz_sock_man_t man);
+
+YAZ_EXPORT
+yaz_sock_chan_t yaz_sock_chan_new(yaz_sock_man_t man, int fd, void *data,
                                   unsigned mask);
 
 YAZ_EXPORT
-void yaz_sock_chan_destroy(yaz_sock_man_t srv, yaz_sock_chan_t p);
+void yaz_sock_chan_destroy(yaz_sock_chan_t p);
 
 YAZ_EXPORT
 void yaz_sock_chan_set_mask(yaz_sock_chan_t chan, unsigned mask);
diff --git a/include/yaz/srv.h b/include/yaz/srv.h
new file mode 100644 (file)
index 0000000..b588172
--- /dev/null
@@ -0,0 +1,94 @@
+/* This file is part of the YAZ toolkit.
+ * Copyright (C) 1995-2009 Index Data.
+ * All rights reserved.
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ *     * Redistributions of source code must retain the above copyright
+ *       notice, this list of conditions and the following disclaimer.
+ *     * Redistributions in binary form must reproduce the above copyright
+ *       notice, this list of conditions and the following disclaimer in the
+ *       documentation and/or other materials provided with the distribution.
+ *     * Neither the name of Index Data nor the names of its contributors
+ *       may be used to endorse or promote products derived from this
+ *       software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND ANY
+ * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+ * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+ * DISCLAIMED. IN NO EVENT SHALL THE REGENTS AND CONTRIBUTORS BE LIABLE FOR ANY
+ * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
+ * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
+ * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
+ * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
+ * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+/**
+ * \file
+ * \brief socket manager
+ */
+#ifndef YAZ_SRV_H
+#define YAZ_SRV_H
+
+#include <stddef.h>
+#include <yaz/poll.h>
+#include <yaz/zgdu.h>
+
+YAZ_BEGIN_CDECL
+
+typedef struct yaz_srv_s *yaz_srv_t;
+typedef struct yaz_pkg_s *yaz_pkg_t;
+
+struct cs_session;
+
+typedef void (yaz_srv_gdu_handler_t)(yaz_pkg_t pkg, void *user);
+typedef void *(yaz_srv_session_handler_t)(struct cs_session *cs);
+
+YAZ_EXPORT
+yaz_srv_t yaz_srv_create(const char **listeners_str);
+
+YAZ_EXPORT
+void yaz_srv_destroy(yaz_srv_t p);
+
+YAZ_EXPORT
+void yaz_srv_run(yaz_srv_t p, yaz_srv_session_handler_t *session_handler,
+                 yaz_srv_gdu_handler_t *gdu_handler);
+
+YAZ_EXPORT
+void yaz_pkg_destroy(yaz_pkg_t pkg);
+
+YAZ_EXPORT
+Z_GDU **yaz_pkg_get_gdu(yaz_pkg_t pkg);
+
+YAZ_EXPORT
+ODR yaz_pkg_get_odr(yaz_pkg_t pkg);
+
+YAZ_EXPORT
+void yaz_pkg_stop_server(yaz_pkg_t pkg);
+
+YAZ_EXPORT
+void yaz_pkg_close(yaz_pkg_t pkg);
+
+YAZ_EXPORT
+yaz_pkg_t yaz_pkg_create(yaz_pkg_t request_pkg);
+
+YAZ_EXPORT
+Z_GDU *zget_wrap_APDU(ODR o, Z_APDU *apdu);
+
+YAZ_EXPORT
+void yaz_pkg_send(yaz_pkg_t pkg);
+
+YAZ_END_CDECL
+
+#endif
+/*
+ * Local variables:
+ * c-basic-offset: 4
+ * c-file-style: "Stroustrup"
+ * indent-tabs-mode: nil
+ * End:
+ * vim: shiftwidth=4 tabstop=8 expandtab
+ */
+
diff --git a/include/yaz/tpool.h b/include/yaz/tpool.h
new file mode 100644 (file)
index 0000000..9290527
--- /dev/null
@@ -0,0 +1,64 @@
+/* This file is part of the YAZ toolkit.
+ * Copyright (C) 1995-2009 Index Data.
+ * All rights reserved.
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ *     * Redistributions of source code must retain the above copyright
+ *       notice, this list of conditions and the following disclaimer.
+ *     * Redistributions in binary form must reproduce the above copyright
+ *       notice, this list of conditions and the following disclaimer in the
+ *       documentation and/or other materials provided with the distribution.
+ *     * Neither the name of Index Data nor the names of its contributors
+ *       may be used to endorse or promote products derived from this
+ *       software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND ANY
+ * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+ * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+ * DISCLAIMED. IN NO EVENT SHALL THE REGENTS AND CONTRIBUTORS BE LIABLE FOR ANY
+ * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
+ * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
+ * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
+ * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
+ * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+/**
+ * \file
+ * \brief socket manager
+ */
+#ifndef YAZ_TPOOL_H
+#define YAZ_TPOOL_H
+
+#include <stddef.h>
+
+
+YAZ_BEGIN_CDECL
+
+typedef struct yaz_tpool_s *yaz_tpool_t;
+
+YAZ_EXPORT
+void yaz_tpool_add(yaz_tpool_t p, void *data);
+
+YAZ_EXPORT
+void yaz_tpool_destroy(yaz_tpool_t p);
+
+YAZ_EXPORT
+yaz_tpool_t yaz_tpool_create(void (*work_handler)(void *work_data),
+                             void (*work_destroy)(void *work_data),
+                             size_t no_threads);
+
+YAZ_END_CDECL
+
+#endif
+/*
+ * Local variables:
+ * c-basic-offset: 4
+ * c-file-style: "Stroustrup"
+ * indent-tabs-mode: nil
+ * End:
+ * vim: shiftwidth=4 tabstop=8 expandtab
+ */
+
index 96ad379..d70059a 100644 (file)
@@ -103,7 +103,7 @@ libyaz_la_SOURCES=version.c options.c log.c \
   copy_types.c match_glob.c poll.c daemon.c \
   iconv_encode_marc8.c iconv_encode_iso_8859_1.c iconv_encode_wchar.c \
   iconv_decode_marc8.c iconv_decode_iso5426.c iconv_decode_danmarc.c sc.c \
-  sock_man.c nanohttp.c
+  sock_man.c srv.c tpool.c
 
 libyaz_la_LDFLAGS=-version-info $(YAZ_VERSION_INFO)
 
diff --git a/src/nanohttp.c b/src/nanohttp.c
deleted file mode 100644 (file)
index 259ffb4..0000000
+++ /dev/null
@@ -1,143 +0,0 @@
-/* This file is part of the YAZ toolkit.
- * Copyright (C) 1995-2009 Index Data
- * See the file LICENSE for details.
- */
-/**
- * \file 
- * \brief Small HTTP server
- */
-
-#include <yaz/zgdu.h>
-#include <yaz/comstack.h>
-#include <yaz/nmem.h>
-#include <yaz/log.h>
-#include <yaz/sock_man.h>
-#include <assert.h>
-
-typedef struct yaz_nano_srv_s *yaz_nano_srv_t;
-typedef struct yaz_nano_pkg_s *yaz_nano_pkg_t;
-
-struct yaz_nano_pkg_s {
-    void *handle;
-    int listener_id;
-    ODR encode_odr;
-    Z_GDU *request_gdu;
-    Z_GDU *response_gdu;
-};
-
-struct yaz_nano_srv_s {
-    COMSTACK *cs_listeners;
-    size_t num_listeners;
-    NMEM nmem;
-    yaz_sock_man_t sock_man;
-};
-
-void yaz_nano_srv_destroy(yaz_nano_srv_t p)
-{
-    if (p)
-    {
-        size_t i;
-        for (i = 0; i < p->num_listeners; i++)
-            if (p->cs_listeners[i])
-                cs_close(p->cs_listeners[i]);
-        yaz_sock_man_destroy(p->sock_man);
-        nmem_destroy(p->nmem);
-    }
-}
-
-yaz_nano_srv_t yaz_nano_srv_create(const char **listeners_str)
-{
-    NMEM nmem = nmem_create();
-    yaz_nano_srv_t p = nmem_malloc(nmem, sizeof(*p));
-    size_t i;
-    for (i = 0; listeners_str[i]; i++)
-        ;
-    p->nmem = nmem;
-    p->num_listeners = i;
-    p->cs_listeners = 
-        nmem_malloc(nmem, p->num_listeners * sizeof(*p->cs_listeners));
-    for (i = 0; i < p->num_listeners; i++)
-    {
-        void *ap;
-        const char *where = listeners_str[i];
-        COMSTACK l = cs_create_host(where, 2, &ap);
-        p->cs_listeners[i] = 0; /* not OK (yet) */
-        if (!l)
-        {
-            yaz_log(YLOG_WARN|YLOG_ERRNO, "cs_create_host(%s) failed", where);
-        }
-        else
-        {
-            if (cs_bind(l, ap, CS_SERVER) < 0)
-            {
-                if (cs_errno(l) == CSYSERR)
-                    yaz_log(YLOG_FATAL|YLOG_ERRNO, "Failed to bind to %s", where);
-                else
-                    yaz_log(YLOG_FATAL, "Failed to bind to %s: %s", where,
-                            cs_strerror(l));
-                cs_close(l);
-            }
-            else
-                p->cs_listeners[i] = l; /* success */
-        }
-    }
-    p->sock_man = yaz_sock_man_new();
-
-    /* check if all are OK */
-    for (i = 0; i < p->num_listeners; i++)
-        if (!p->cs_listeners[i])
-        {
-            yaz_nano_srv_destroy(p);
-            return 0;
-        }
-
-    for (i = 0; i < p->num_listeners; i++)
-    {
-        yaz_sock_chan_t chan;
-
-        chan = yaz_sock_chan_new(p->sock_man, cs_fileno(p->cs_listeners[i]),
-                                 p->cs_listeners + i,
-                                 yaz_poll_read | yaz_poll_except);
-    }    
-    return p;
-}
-
-Z_GDU *yaz_nano_pkg_req(yaz_nano_pkg_t pkg)
-{
-    return pkg->request_gdu;
-}
-
-Z_GDU *yaz_nano_pkg_response(yaz_nano_pkg_t pkg)
-{
-    return pkg->response_gdu;
-}
-
-ODR yaz_nano_pkg_encode(yaz_nano_pkg_t pkg)
-{
-    return pkg->encode_odr;
-}
-
-int yaz_nano_pkg_listener_id(yaz_nano_pkg_t pkg)
-{
-    return pkg->listener_id;
-}
-
-yaz_nano_pkg_t yaz_nano_srv_get_pkg(yaz_nano_srv_t p)
-{
-    return 0;
-}
-
-void yaz_nano_srv_put_pkg(yaz_nano_srv_t p, yaz_nano_pkg_t pkg)
-{
-
-}
-
-/*
- * Local variables:
- * c-basic-offset: 4
- * c-file-style: "Stroustrup"
- * indent-tabs-mode: nil
- * End:
- * vim: shiftwidth=4 tabstop=8 expandtab
- */
-
index 90e6810..3988af0 100644 (file)
@@ -1,5 +1,6 @@
 
 #include <yaz/sock_man.h>
+#include <yaz/log.h>
 #include <yaz/nmem.h>
 #include <assert.h>
 #include <sys/epoll.h>
@@ -23,7 +24,7 @@ struct yaz_sock_chan_s {
     yaz_sock_chan_t next;
     yaz_sock_chan_t prev;
     int fd;
-    unsigned mask;
+    unsigned input_mask;
     unsigned output_mask;
     int max_idle;
     void *data;
@@ -42,7 +43,7 @@ yaz_sock_man_t yaz_sock_man_new(void)
     man->maxevents = 30;
     man->event_no = 0;
     man->event_ret = 0;
-    man->timeout = 0;
+    man->timeout = -1;
     man->rescan = 0;
     man->events = nmem_malloc(nmem, man->maxevents * sizeof(*man->events));
     if (man->epoll_handle == -1)
@@ -57,9 +58,14 @@ void yaz_sock_man_destroy(yaz_sock_man_t man)
 {
     if (man)
     {
+        while (man->chan_list)
+        {
+            yaz_log(YLOG_WARN, "yaz_sock_man_destroy: closing %p",
+                    man->chan_list);
+            yaz_sock_chan_destroy(man->chan_list);
+        }
         if (man->epoll_handle != -1)
             close(man->epoll_handle);
-        assert(man->chan_list == 0);
         nmem_destroy(man->nmem);
     }
 }
@@ -69,11 +75,11 @@ static void poll_ctl(int op, yaz_sock_chan_t p)
     struct epoll_event event;
 
     event.events = 0;
-    if (p->mask & yaz_poll_read)
+    if (p->input_mask & yaz_poll_read)
         event.events |= EPOLLIN;
-    if (p->mask & yaz_poll_write)
+    if (p->input_mask & yaz_poll_write)
         event.events |= EPOLLOUT;
-    if (p->mask & yaz_poll_except)
+    if (p->input_mask & yaz_poll_except)
         event.events |= EPOLLERR;
 
     event.data.ptr = p;
@@ -99,9 +105,10 @@ yaz_sock_chan_t yaz_sock_chan_new(yaz_sock_man_t srv, int fd, void *data,
     srv->chan_list = p;
 
     p->fd = fd;
-    p->mask = 0;
+    p->input_mask = mask;
+    p->output_mask = 0;
     p->data = data;
-    p->max_idle = 0;
+    p->max_idle = -1;
     p->man = srv;
 
     poll_ctl(EPOLL_CTL_ADD, p);
@@ -112,18 +119,19 @@ static void rescan_timeout(yaz_sock_man_t man)
 {
     if (man->rescan)
     {
-        int timeout = 0;
+        int timeout = -1;
         yaz_sock_chan_t p;
         for (p = man->chan_list; p; p = p->next)
-            if (p->max_idle && (timeout == 0 || p->max_idle < timeout))
+            if (p->max_idle != -1 && (timeout == -1 || p->max_idle < timeout))
                 timeout = p->max_idle;
         man->timeout = timeout;
         man->rescan = 0;
     }
 }
 
-void yaz_sock_chan_destroy(yaz_sock_man_t srv, yaz_sock_chan_t p)
+void yaz_sock_chan_destroy(yaz_sock_chan_t p)
 {
+    yaz_sock_man_t srv = p->man;
     if (p->prev)
         p->prev->next = p->next;
     else
@@ -162,7 +170,7 @@ yaz_sock_chan_t yaz_sock_man_wait(yaz_sock_man_t man)
         }
         man->timeout_list = 0; /* no more timeout events */
     }
-    assert(man->timeout_list = 0);
+    assert(man->timeout_list == 0);
     assert(man->event_no <= man->event_ret);
     if (man->event_no == man->event_ret)
     { /* must wait again */
@@ -202,9 +210,9 @@ yaz_sock_chan_t yaz_sock_man_wait(yaz_sock_man_t man)
 
 void yaz_sock_chan_set_mask(yaz_sock_chan_t chan, unsigned mask)
 {
-    if (chan->mask != mask)
+    if (chan->input_mask != mask)
     {
-        chan->mask = mask;
+        chan->input_mask = mask;
         poll_ctl(EPOLL_CTL_MOD, chan);
     }
 }
@@ -220,7 +228,7 @@ void yaz_sock_chan_set_max_idle(yaz_sock_chan_t chan, int max_idle)
 
 unsigned yaz_sock_get_mask(yaz_sock_chan_t chan)
 {
-    return chan->mask;
+    return chan->output_mask;
 }
 
 void *yaz_sock_chan_get_data(yaz_sock_chan_t chan)
diff --git a/src/srv.c b/src/srv.c
new file mode 100644 (file)
index 0000000..fa391ae
--- /dev/null
+++ b/src/srv.c
@@ -0,0 +1,382 @@
+/* This file is part of the YAZ toolkit.
+ * Copyright (C) 1995-2009 Index Data
+ * See the file LICENSE for details.
+ */
+/**
+ * \file 
+ * \brief Small HTTP server
+ */
+
+#include <yaz/zgdu.h>
+#include <yaz/comstack.h>
+#include <yaz/nmem.h>
+#include <yaz/log.h>
+#include <yaz/sock_man.h>
+#include <yaz/tpool.h>
+#include <assert.h>
+#include <yaz/srv.h>
+
+
+enum cs_ses_type {
+    cs_ses_type_listener,
+    cs_ses_type_accepting,
+    cs_ses_type_normal
+};
+
+struct cs_session {
+    enum cs_ses_type type;
+    COMSTACK cs;
+    yaz_sock_chan_t chan;
+    unsigned cs_put_mask;
+    unsigned cs_get_mask;
+    char *input_buffer;
+    int input_len;
+    void *user;
+};
+    
+struct yaz_pkg_s {
+    Z_GDU *gdu;
+    ODR odr;
+    struct cs_session *ses;
+    yaz_srv_t srv;
+};
+
+struct yaz_srv_s {
+    struct cs_session *listeners;
+    size_t num_listeners;
+    NMEM nmem;
+    yaz_sock_man_t sock_man;
+    yaz_tpool_t tpool;
+    int stop_flag;
+    yaz_srv_session_handler_t *session_handler;
+    yaz_srv_gdu_handler_t *gdu_handler;
+};
+
+static void cs_session_init(struct cs_session *ses, enum cs_ses_type type)
+{
+    ses->type = type;
+    ses->cs = 0;
+    ses->chan = 0;
+    ses->cs_put_mask = 0;
+    ses->cs_get_mask = yaz_poll_read;
+    ses->input_buffer = 0;
+    ses->input_len = 0;
+}
+
+static void cs_session_destroy(struct cs_session *ses)
+{
+    xfree(ses->input_buffer);
+    if (ses->chan)
+        yaz_sock_chan_destroy(ses->chan);
+    if (ses->cs)
+        cs_close(ses->cs);
+}
+
+void yaz_srv_destroy(yaz_srv_t p)
+{
+    if (p)
+    {
+        size_t i;
+
+        yaz_tpool_destroy(p->tpool);
+        for (i = 0; i < p->num_listeners; i++)
+        {
+            cs_session_destroy(p->listeners + i);
+        }
+        yaz_sock_man_destroy(p->sock_man);
+        nmem_destroy(p->nmem);
+    }
+}
+
+yaz_srv_t yaz_srv_create(const char **listeners_str)
+{
+    NMEM nmem = nmem_create();
+    yaz_srv_t p = nmem_malloc(nmem, sizeof(*p));
+    size_t i;
+    for (i = 0; listeners_str[i]; i++)
+        ;
+    p->nmem = nmem;
+
+    p->stop_flag = 0;
+    p->session_handler = 0;
+    p->gdu_handler = 0;
+    p->num_listeners = i;
+    p->listeners = 
+        nmem_malloc(nmem, p->num_listeners * sizeof(*p->listeners));
+    p->sock_man = yaz_sock_man_new();
+    p->tpool = 0;
+    for (i = 0; i < p->num_listeners; i++)
+    {
+        void *ap;
+        const char *where = listeners_str[i];
+        COMSTACK l = cs_create_host(where, CS_FLAGS_NUMERICHOST, &ap);
+
+        cs_session_init(p->listeners +i, cs_ses_type_listener);
+        if (!l)
+        {
+            yaz_log(YLOG_WARN|YLOG_ERRNO, "cs_create_host(%s) failed", where);
+        }
+        else
+        {
+            if (cs_bind(l, ap, CS_SERVER) < 0)
+            {
+                if (cs_errno(l) == CSYSERR)
+                    yaz_log(YLOG_FATAL|YLOG_ERRNO, "Failed to bind to %s", where);
+                else
+                    yaz_log(YLOG_FATAL, "Failed to bind to %s: %s", where,
+                            cs_strerror(l));
+                cs_close(l);
+            }
+            else
+            {
+                p->listeners[i].cs = l; /* success */
+                p->listeners[i].chan =
+                    yaz_sock_chan_new(p->sock_man,
+                                      cs_fileno(l),
+                                      p->listeners + i,
+                                      yaz_poll_read | yaz_poll_except);
+            }
+        }
+    }
+
+    /* check if all are OK */
+    for (i = 0; i < p->num_listeners; i++)
+        if (!p->listeners[i].cs)
+        {
+            yaz_srv_destroy(p);
+            return 0;
+        }
+    return p;
+}
+
+static void new_session(yaz_srv_t p, COMSTACK new_line)
+{
+    struct cs_session *ses = xmalloc(sizeof(*ses));
+    unsigned mask =  
+        ((new_line->io_pending & CS_WANT_WRITE) ? yaz_poll_write : 0) |
+        ((new_line->io_pending & CS_WANT_READ) ? yaz_poll_read : 0);
+
+    if (mask)
+    {
+        yaz_log(YLOG_LOG, "type accepting");
+        cs_session_init(ses, cs_ses_type_accepting);
+    }
+    else
+    {
+        yaz_log(YLOG_LOG, "type normal");
+        cs_session_init(ses, cs_ses_type_normal);
+        mask = yaz_poll_read;
+        ses->user = p->session_handler(ses);
+    }
+    ses->cs = new_line;
+    ses->chan = yaz_sock_chan_new(p->sock_man, cs_fileno(new_line), ses, mask);
+}
+
+void yaz_pkg_destroy(yaz_pkg_t pkg)
+{
+    if (pkg)
+    {
+        odr_destroy(pkg->odr);
+        xfree(pkg);
+    }
+}
+
+void work_handler(void *data)
+{
+    yaz_pkg_t pkg = (yaz_pkg_t) data;
+
+    pkg->srv->gdu_handler(pkg, pkg->ses->user);
+    yaz_pkg_destroy(pkg);
+}
+
+void work_destroy(void *data)
+{
+    yaz_pkg_t pkg = (yaz_pkg_t) data;
+    yaz_pkg_destroy(pkg);
+}
+
+
+void yaz_srv_run(yaz_srv_t p, yaz_srv_session_handler_t session_handler,
+                 yaz_srv_gdu_handler_t gdu_handler)
+{
+    yaz_sock_chan_t chan;
+
+    p->session_handler = session_handler;
+    p->gdu_handler = gdu_handler;
+
+    assert(!p->tpool);
+    p->tpool = yaz_tpool_create(work_handler, work_destroy, 20);
+    while ((chan = yaz_sock_man_wait(p->sock_man)))
+    {
+        unsigned output_mask = yaz_sock_get_mask(chan);
+        struct cs_session *ses = yaz_sock_chan_get_data(chan);
+
+        if (p->stop_flag)
+            break;
+        switch (ses->type)
+        {
+        case cs_ses_type_listener:
+            if (yaz_sock_get_mask(chan) & yaz_poll_read)
+            {
+                int ret = cs_listen(ses->cs, 0, 0);
+                if (ret < 0)
+                {
+                    yaz_log(YLOG_WARN|YLOG_ERRNO, "listen failed");
+                }
+                else if (ret == 1)
+                {
+                    yaz_log(YLOG_WARN, "cs_listen incomplete");
+                }
+                else
+                {
+                    COMSTACK new_line = cs_accept(ses->cs);
+                    if (new_line)
+                    {
+                        yaz_log(YLOG_LOG, "new session");
+                        new_session(p, new_line);
+                    }
+                    else
+                    {
+                        yaz_log(YLOG_WARN|YLOG_ERRNO, "accept failed");
+                    }
+                }
+            }
+            break;
+        case cs_ses_type_accepting:
+            if (!cs_accept(ses->cs))
+            {
+                yaz_log(YLOG_WARN|YLOG_ERRNO, "cs_accept failed");
+                cs_session_destroy(ses);
+                xfree(ses);
+            }
+            else
+            {
+                unsigned mask =  
+                    ((ses->cs->io_pending & CS_WANT_WRITE) ? yaz_poll_write : 0) |
+                    ((ses->cs->io_pending & CS_WANT_READ) ? yaz_poll_read : 0);
+                if (mask)
+                {
+                    ses->type = cs_ses_type_accepting;
+                }
+                else
+                {
+                    ses->type = cs_ses_type_normal;
+                    mask = yaz_poll_read;
+                }
+                yaz_sock_chan_set_mask(ses->chan, mask);
+            }
+            break;
+        case cs_ses_type_normal:
+            if ((ses->cs_put_mask & yaz_poll_read) == 0 &&
+                output_mask & ses->cs_get_mask)
+            {
+                /* receiving package */
+                unsigned new_mask = yaz_poll_read;
+                yaz_log(YLOG_LOG, "Receive");
+                do
+                {
+                    int res = cs_get(ses->cs, &ses->input_buffer, &ses->input_len);
+                    if (res <= 0)
+                    {
+                        yaz_log(YLOG_WARN, "Connection closed by client");
+                        cs_session_destroy(ses);
+                        xfree(ses);
+                        ses = 0;
+                        break;
+                    }
+                    else if (res == 1)
+                    {
+                        if (ses->cs->io_pending & CS_WANT_WRITE)
+                            new_mask |= yaz_poll_write;
+                        break;
+                    }
+                    else
+                    {  /* complete package */
+                        yaz_pkg_t pkg = xmalloc(sizeof(*pkg));
+                        yaz_log(YLOG_LOG, "COMPLETE PACKAGE");
+
+                        pkg->ses = ses;
+                        pkg->srv = p;
+                        pkg->odr = odr_createmem(ODR_DECODE);
+                        odr_setbuf(pkg->odr, ses->input_buffer, res, 0);
+                        if (!z_GDU(pkg->odr, &pkg->gdu, 0, 0))
+                        {
+                            yaz_log(YLOG_WARN, "decoding failed");
+                            odr_destroy(pkg->odr);
+                            xfree(pkg);
+                        }
+                        else
+                        {
+                            yaz_tpool_add(p->tpool, pkg);
+                        }
+                    }
+                } while (cs_more(ses->cs));
+                yaz_sock_chan_set_mask(chan, new_mask);
+            }
+            if (ses && (output_mask & ses->cs_put_mask))
+            {  /* sending package */
+                yaz_log(YLOG_LOG, "Sending");
+            }
+        }
+    }
+}
+
+Z_GDU **yaz_pkg_get_gdu(yaz_pkg_t pkg)
+{
+    return &pkg->gdu;
+}
+
+ODR yaz_pkg_get_odr(yaz_pkg_t pkg)
+{
+    return pkg->odr;
+}
+
+void yaz_pkg_close(yaz_pkg_t pkg)
+{
+    struct cs_session *ses = pkg->ses;
+    if (ses)
+    {
+        cs_session_destroy(ses);
+        xfree(ses);
+    }
+    pkg->ses = 0;
+}
+
+void yaz_pkg_stop_server(yaz_pkg_t pkg)
+{
+    pkg->srv->stop_flag = 1;
+}
+
+yaz_pkg_t yaz_pkg_create(yaz_pkg_t request_pkg)
+{
+    yaz_pkg_t pkg = xmalloc(sizeof(*pkg));
+
+    pkg->gdu = 0;
+    pkg->odr = odr_createmem(ODR_ENCODE);
+    pkg->ses = request_pkg->ses;
+    pkg->srv = request_pkg->srv;
+    return pkg;
+}
+
+Z_GDU *zget_wrap_APDU(ODR o, Z_APDU *apdu)
+{
+    Z_GDU *gdu = odr_malloc(o, sizeof(*gdu));
+    gdu->which = Z_GDU_Z3950;
+    gdu->u.z3950 = apdu;
+    return gdu;
+}
+
+void yaz_pkg_send(yaz_pkg_t pkg)
+{
+    yaz_log(YLOG_WARN, "send.. UNFINISHED");
+}
+
+/*
+ * Local variables:
+ * c-basic-offset: 4
+ * c-file-style: "Stroustrup"
+ * indent-tabs-mode: nil
+ * End:
+ * vim: shiftwidth=4 tabstop=8 expandtab
+ */
+
diff --git a/src/tpool.c b/src/tpool.c
new file mode 100644 (file)
index 0000000..66d3dff
--- /dev/null
@@ -0,0 +1,164 @@
+/* This file is part of the YAZ toolkit.
+ * Copyright (C) 1995-2009 Index Data
+ * See the file LICENSE for details.
+ */
+/**
+ * \file 
+ * \brief thread pool workers
+ */
+
+#include <assert.h>
+#include <yaz/nmem.h>
+#include <yaz/tpool.h>
+#include <pthread.h>
+
+struct work_item {
+    void *data;
+    struct work_item *next;
+};
+
+struct yaz_tpool_s {
+    NMEM nmem;
+    pthread_t *thread_id;
+    pthread_mutex_t mutex;
+    pthread_cond_t input_data;
+    int stop_flag;
+    size_t no_threads;
+    struct work_item *input_queue;
+    struct work_item *output_queue;
+    struct work_item *free_queue;
+    void (*work_handler)(void *work_data);
+    void (*work_destroy)(void *work_data);
+};
+
+static struct work_item *queue_remove_last(struct work_item **q)
+{
+    struct work_item **work_p = q, *work_this = 0;
+
+    while (*work_p && (*work_p)->next)
+        work_p = &(*work_p)->next;
+    if (*work_p)
+    {
+        work_this = *work_p;
+        *work_p = 0;
+    }
+    return work_this;
+}
+
+static void queue_trav(struct work_item *q, void (*f)(void *data))
+{
+    for (; q; q = q->next)
+        f(q->data);
+}
+
+void yaz_tpool_add(yaz_tpool_t p, void *data)
+{
+    struct work_item *work_p;
+
+    pthread_mutex_lock(&p->mutex);
+
+    if (p->free_queue)
+    {
+        work_p = p->free_queue;
+        p->free_queue = p->free_queue->next;
+    }
+    else
+        work_p = nmem_malloc(p->nmem, sizeof(*work_p));
+
+    work_p->data = data;
+    work_p->next = p->input_queue;
+    p->input_queue = work_p;
+
+    pthread_cond_signal(&p->input_data);
+    pthread_mutex_unlock(&p->mutex);
+}
+
+void yaz_tpool_destroy(yaz_tpool_t p)
+{
+    if (p)
+    {
+        size_t i;
+
+        pthread_mutex_lock(&p->mutex);
+        p->stop_flag = 1;
+        pthread_cond_broadcast(&p->input_data);
+        pthread_mutex_unlock(&p->mutex);
+        
+        for (i = 0; i < p->no_threads; i++)
+            pthread_join(p->thread_id[i], 0);
+        
+        if (p->work_destroy)
+        {
+            queue_trav(p->input_queue, p->work_destroy);
+            queue_trav(p->output_queue, p->work_destroy);
+        }
+        
+        pthread_cond_destroy(&p->input_data);
+        pthread_mutex_destroy(&p->mutex);
+        nmem_destroy(p->nmem);
+    }
+}
+
+static void *tpool_thread_handler(void *vp)
+{
+    yaz_tpool_t p = (yaz_tpool_t) vp;
+    while (1)
+    {
+        struct work_item *work_this = 0;
+        /* wait for some work */
+        pthread_mutex_lock(&p->mutex);
+        while (!p->stop_flag && !p->input_queue)
+            pthread_cond_wait(&p->input_data, &p->mutex);
+        /* see if we were waken up because we're shutting down */
+        if (p->stop_flag)
+            break;
+        /* got something. Take the last one out of input_queue */
+        assert(p->input_queue);
+        work_this = queue_remove_last(&p->input_queue);
+        assert(work_this);
+
+        pthread_mutex_unlock(&p->mutex);
+
+        /* work on this item */
+        p->work_handler(work_this->data);
+    }        
+    pthread_mutex_unlock(&p->mutex);
+    return 0;
+}
+
+yaz_tpool_t yaz_tpool_create(void (*work_handler)(void *work_data),
+                             void (*work_destroy)(void *work_data),
+                             size_t no_threads)
+{
+    NMEM nmem = nmem_create();
+    yaz_tpool_t p = nmem_malloc(nmem, sizeof(*p));
+    size_t i;
+    p->nmem = nmem;
+    p->stop_flag = 0;
+    p->no_threads = no_threads;
+
+    p->input_queue = 0;
+    p->output_queue = 0;
+    p->free_queue = 0;
+
+    p->work_handler = work_handler;
+    p->work_destroy = work_destroy;
+
+    pthread_mutex_init(&p->mutex, 0);
+    pthread_cond_init(&p->input_data, 0);
+
+    p->thread_id = nmem_malloc(p->nmem, sizeof(*p->thread_id) * p->no_threads);
+    for (i = 0; i < p->no_threads; i++)
+        pthread_create (p->thread_id + i, 0, tpool_thread_handler, p);
+    return p;
+}
+
+/*
+ * Local variables:
+ * c-basic-offset: 4
+ * c-file-style: "Stroustrup"
+ * indent-tabs-mode: nil
+ * End:
+ * vim: shiftwidth=4 tabstop=8 expandtab
+ */
+
index 5daf359..04a9b15 100644 (file)
@@ -5,7 +5,8 @@ check_PROGRAMS = tstxmalloc tsticonv tstnmem tstmatchstr tstwrbuf tstodr \
  tstccl tstlog tstcomstack \
  tstsoap1 tstsoap2 tstodrstack tstlogthread tstxmlquery tstpquery \
  tst_comstack tst_filepath tst_record_conv tst_retrieval tst_tpath \
- tst_timing tst_query_charset tst_oid tst_icu_I18N tst_match_glob tst_rpn2cql
+ tst_timing tst_query_charset tst_oid tst_icu_I18N tst_match_glob \
+ tst_rpn2cql tst_srv
 check_SCRIPTS = tstmarc.sh tstmarccol.sh tstcql2xcql.sh tstcql2pqf.sh tsticu.sh
 
 TESTS = $(check_PROGRAMS) $(check_SCRIPTS)
@@ -47,7 +48,7 @@ tstodrcodec.c tstodrcodec.h: tstodr.asn $(YAZCOMP)
        cd $(srcdir); $(YAZCOMP) tstodr.asn
 
 LDADD = ../src/libyaz.la
-tst_icu_I18N_LDADD = ../src/libyaz_icu.la $(ICU_LIBS)
+tst_icu_I18N_LDADD = ../src/libyaz.la ../src/libyaz_icu.la $(ICU_LIBS)
 
 CONFIG_CLEAN_FILES=*.log
 
@@ -77,3 +78,5 @@ tst_query_charset_SOURCES = tst_query_charset.c
 tst_icu_I18N_SOURCES = tst_icu_I18N.c
 tst_match_glob_SOURCES = tst_match_glob.c
 tst_rpn2cql_SOURCES = tst_rpn2cql.c
+tst_srv_SOURCES = tst_srv.c
+tst_srv_LDADD =  ../src/libyaz.la $(PTHREAD_LIBS)
diff --git a/test/tst_srv.c b/test/tst_srv.c
new file mode 100644 (file)
index 0000000..65029b7
--- /dev/null
@@ -0,0 +1,104 @@
+/* This file is part of the YAZ toolkit.
+ * Copyright (C) 1995-2009 Index Data
+ * See the file LICENSE for details.
+ */
+
+#include <stdlib.h>
+#include <stdio.h>
+
+#include <yaz/log.h>
+#include <yaz/test.h>
+#include <yaz/srv.h>
+#include <yaz/odr.h>
+#include <yaz/proto.h>
+
+struct my_info {
+    int x;
+};
+
+static void *create_session(struct cs_session *ses)
+{
+    struct my_info *my = xmalloc(sizeof(*my));
+    my->x = 42;
+    yaz_log(YLOG_LOG, "create_session");
+    return my;
+}
+
+static void gdu_handler(yaz_pkg_t pkg, void *user)
+{
+    struct my_info *my = user;
+    Z_GDU **gdu = yaz_pkg_get_gdu(pkg);
+    ODR o = odr_createmem(ODR_PRINT);
+
+    yaz_log(YLOG_LOG, "gdu_handler");
+    YAZ_CHECK_EQ(my->x, 42);
+    
+    z_GDU(o, gdu, 0, 0);
+    odr_destroy(o);
+
+    if ((*gdu)->which == Z_GDU_Z3950)
+    {
+        ODR encode = odr_createmem(ODR_ENCODE);
+        Z_APDU *apdu_req = (*gdu)->u.z3950;
+        Z_APDU *apdu_res = 0;
+        int must_close = 0;
+
+        if (apdu_req->which == Z_APDU_close)
+        {
+            apdu_res = zget_APDU(encode, Z_APDU_close);
+            *apdu_res->u.close->closeReason = Z_Close_finished;
+            must_close = 1;
+        }
+        else if (apdu_req->which == Z_APDU_initRequest)
+        {
+            apdu_res = zget_APDU(encode, Z_APDU_initResponse);
+        }
+        else
+        {
+            apdu_res = zget_APDU(encode, Z_APDU_close);
+
+            *apdu_res->u.close->closeReason = Z_Close_unspecified;
+            must_close = 1;
+        }
+        if (apdu_res)
+        {
+            yaz_pkg_t pkg_res = yaz_pkg_create(pkg);
+            *yaz_pkg_get_gdu(pkg) = zget_wrap_APDU(encode, apdu_res);
+            yaz_pkg_send(pkg_res);
+        }
+        if (must_close)
+            yaz_pkg_close(pkg);
+        yaz_pkg_stop_server(pkg);
+    }
+}
+
+static void tst_srv(void)
+{
+    const char *listeners[] = {"unix:socket", 0};
+
+    yaz_srv_t srv = yaz_srv_create(listeners);
+    YAZ_CHECK(srv);
+    if (!srv)
+        return;
+
+    yaz_srv_run(srv, create_session, gdu_handler);
+    yaz_srv_destroy(srv);
+}
+
+int main (int argc, char **argv)
+{
+    YAZ_CHECK_INIT(argc, argv);
+    YAZ_CHECK_LOG();
+    /* tst_srv(); */
+    YAZ_CHECK_TERM;
+}
+
+/*
+ * Local variables:
+ * c-basic-offset: 4
+ * c-file-style: "Stroustrup"
+ * indent-tabs-mode: nil
+ * End:
+ * vim: shiftwidth=4 tabstop=8 expandtab
+ */
+