Rename Msg_Thread to ThreadPoolSocketObserver. Rename IMsg_Thread
authorAdam Dickmeiss <adam@indexdata.dk>
Thu, 6 Oct 2005 19:33:58 +0000 (19:33 +0000)
committerAdam Dickmeiss <adam@indexdata.dk>
Thu, 6 Oct 2005 19:33:58 +0000 (19:33 +0000)
to IThreadPoolMsg. Implement it in terms of std::deque instead of
linked list.

12 files changed:
.cvsignore
src/.cvsignore [new file with mode: 0644]
src/Makefile.am
src/msg-thread.cpp [deleted file]
src/msg-thread.h [deleted file]
src/p2.cpp
src/p2_frontend.cpp
src/p2_frontend.h
src/p2_msg.cpp
src/test_thread_pool_observer.cpp [new file with mode: 0644]
src/thread_pool_observer.cpp [new file with mode: 0644]
src/thread_pool_observer.h [new file with mode: 0644]

index d572015..da39623 100644 (file)
@@ -7,3 +7,4 @@ config.status
 configure
 libtool
 autom4te.cache
+Doxyfile
diff --git a/src/.cvsignore b/src/.cvsignore
new file mode 100644 (file)
index 0000000..1500107
--- /dev/null
@@ -0,0 +1,11 @@
+.libs
+.deps
+*.lo
+*.la
+Makefile
+Makefile.in
+test_filter1
+test_filter2
+design
+p2
+test_thread_pool_observer
index 784b9eb..16b46d3 100644 (file)
@@ -1,9 +1,9 @@
-## $Id: Makefile.am,v 1.3 2005-10-06 12:55:20 adam Exp $
+## $Id: Makefile.am,v 1.4 2005-10-06 19:33:58 adam Exp $
 
 AM_CXXFLAGS = $(YAZPPINC) $(XSLT_CFLAGS) $(USEMARCONINC)
 
 bin_PROGRAMS =
-check_PROGRAMS = test_filter1 test_filter2
+check_PROGRAMS = test_filter1 test_filter2 test_thread_pool_observer
 noinst_PROGRAMS =  p2 design
 
 TESTS=$(check_PROGRAMS)
@@ -17,7 +17,11 @@ p2_SOURCES=p2_frontend.cpp p2_msg.cpp p2.cpp p2_frontend.h \
  p2_config.cpp p2_config.h \
  p2_backend.h p2_backend_dummy.cpp \
  p2_modules.cpp p2_modules.h \
- p2_xmlerror.cpp p2_xmlerror.h msg-thread.h msg-thread.cpp
+ p2_xmlerror.cpp p2_xmlerror.h \
+ thread_pool_observer.cpp thread_pool_observer.h
+
+test_thread_pool_observer_SOURCES = test_thread_pool_observer.cpp \
+ thread_pool_observer.cpp thread_pool_observer.h
 
 LDADD= $(YAZPPLALIB) $(XSLT_LIBS) $(USEMARCONLALIB)
 
diff --git a/src/msg-thread.cpp b/src/msg-thread.cpp
deleted file mode 100644 (file)
index 2f82157..0000000
+++ /dev/null
@@ -1,173 +0,0 @@
-/* $Id: msg-thread.cpp,v 1.1 2005-10-06 09:37:25 marc Exp $
-   Copyright (c) 1998-2005, Index Data.
-
-This file is part of the yaz-proxy.
-
-YAZ proxy is free software; you can redistribute it and/or modify it under
-the terms of the GNU General Public License as published by the Free
-Software Foundation; either version 2, or (at your option) any later
-version.
-
-YAZ proxy is distributed in the hope that it will be useful, but WITHOUT ANY
-WARRANTY; without even the implied warranty of MERCHANTABILITY or
-FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
-for more details.
-
-You should have received a copy of the GNU General Public License
-along with YAZ proxy; see the file LICENSE.  If not, write to the
-Free Software Foundation, 59 Temple Place - Suite 330, Boston, MA
-02111-1307, USA.
- */
-#include <pthread.h>
-#include <unistd.h>
-#include <ctype.h>
-#include <stdio.h>
-
-#include <yaz++/socket-observer.h>
-#include <yaz/log.h>
-
-#include "msg-thread.h"
-
-using namespace yazpp_1;
-
-IMsg_Thread::~IMsg_Thread()
-{
-
-}
-
-Msg_Thread_Queue::Msg_Thread_Queue()
-{
-    m_list = 0;
-}
-
-int Msg_Thread_Queue::size()
-{
-    int no = 0;
-    Msg_Thread_Queue_List *l;
-    for (l = m_list; l; l = l->m_next)
-        no++;
-    return no;
-}
-
-void Msg_Thread_Queue::enqueue(IMsg_Thread *m)
-{
-    Msg_Thread_Queue_List *l = new Msg_Thread_Queue_List;
-    l->m_next = m_list;
-    l->m_item = m;
-    m_list = l;
-}
-
-IMsg_Thread *Msg_Thread_Queue::dequeue()
-{
-    Msg_Thread_Queue_List **l = &m_list;
-    if (!*l)
-        return 0;
-    while ((*l)->m_next)
-        l = &(*l)->m_next;
-    IMsg_Thread *m = (*l)->m_item;
-    delete *l;
-    *l = 0;
-    return m;
-}
-
-static void *tfunc(void *p)
-{
-    Msg_Thread *pt = (Msg_Thread *) p;
-    pt->run(0);
-    return 0;
-}
-
-
-Msg_Thread::Msg_Thread(ISocketObservable *obs, int no_threads)
-    : m_SocketObservable(obs)
-{
-    pipe(m_fd);
-    obs->addObserver(m_fd[0], this);
-    obs->maskObserver(this, SOCKET_OBSERVE_READ);
-
-    m_stop_flag = false;
-    pthread_mutex_init(&m_mutex_input_data, 0);
-    pthread_cond_init(&m_cond_input_data, 0);
-    pthread_mutex_init(&m_mutex_output_data, 0);
-
-    m_no_threads = no_threads;
-    m_thread_id = new pthread_t[no_threads];
-    int i;
-    for (i = 0; i<m_no_threads; i++)
-        pthread_create(&m_thread_id[i], 0, tfunc, this);
-}
-
-Msg_Thread::~Msg_Thread()
-{
-    pthread_mutex_lock(&m_mutex_input_data);
-    m_stop_flag = true;
-    pthread_cond_broadcast(&m_cond_input_data);
-    pthread_mutex_unlock(&m_mutex_input_data);
-    
-    int i;
-    for (i = 0; i<m_no_threads; i++)
-        pthread_join(m_thread_id[i], 0);
-    delete [] m_thread_id;
-
-    m_SocketObservable->deleteObserver(this);
-
-    pthread_cond_destroy(&m_cond_input_data);
-    pthread_mutex_destroy(&m_mutex_input_data);
-    pthread_mutex_destroy(&m_mutex_output_data);
-    close(m_fd[0]);
-    close(m_fd[1]);
-}
-
-void Msg_Thread::socketNotify(int event)
-{
-    if (event & SOCKET_OBSERVE_READ)
-    {
-        char buf[2];
-        read(m_fd[0], buf, 1);
-        pthread_mutex_lock(&m_mutex_output_data);
-        IMsg_Thread *out = m_output.dequeue();
-        pthread_mutex_unlock(&m_mutex_output_data);
-        if (out)
-            out->result();
-    }
-}
-
-void Msg_Thread::run(void *p)
-{
-    while(1)
-    {
-        pthread_mutex_lock(&m_mutex_input_data);
-        while (!m_stop_flag && m_input.size() == 0)
-            pthread_cond_wait(&m_cond_input_data, &m_mutex_input_data);
-        if (m_stop_flag)
-        {
-            pthread_mutex_unlock(&m_mutex_input_data);
-            break;
-        }
-        IMsg_Thread *in = m_input.dequeue();
-        pthread_mutex_unlock(&m_mutex_input_data);
-
-        IMsg_Thread *out = in->handle();
-        pthread_mutex_lock(&m_mutex_output_data);
-        m_output.enqueue(out);
-        
-        write(m_fd[1], "", 1);
-        pthread_mutex_unlock(&m_mutex_output_data);
-    }
-}
-
-void Msg_Thread::put(IMsg_Thread *m)
-{
-    pthread_mutex_lock(&m_mutex_input_data);
-    m_input.enqueue(m);
-    pthread_cond_signal(&m_cond_input_data);
-    pthread_mutex_unlock(&m_mutex_input_data);
-}
-/*
- * Local variables:
- * c-basic-offset: 4
- * indent-tabs-mode: nil
- * End:
- * vim: shiftwidth=4 tabstop=8 expandtab
- */
-
diff --git a/src/msg-thread.h b/src/msg-thread.h
deleted file mode 100644 (file)
index 0218c3c..0000000
+++ /dev/null
@@ -1,85 +0,0 @@
-/* $Id: msg-thread.h,v 1.1 2005-10-06 09:37:25 marc Exp $
-   Copyright (c) 1998-2005, Index Data.
-
-This file is part of the yaz-proxy.
-
-YAZ proxy is free software; you can redistribute it and/or modify it under
-the terms of the GNU General Public License as published by the Free
-Software Foundation; either version 2, or (at your option) any later
-version.
-
-YAZ proxy is distributed in the hope that it will be useful, but WITHOUT ANY
-WARRANTY; without even the implied warranty of MERCHANTABILITY or
-FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
-for more details.
-
-You should have received a copy of the GNU General Public License
-along with YAZ proxy; see the file LICENSE.  If not, write to the
-Free Software Foundation, 59 Temple Place - Suite 330, Boston, MA
-02111-1307, USA.
- */
-
-#include <pthread.h>
-#include <unistd.h>
-#include <ctype.h>
-
-#if HAVE_DLFCN_H
-#include <dlfcn.h>
-#endif
-
-#include <yaz++/socket-observer.h>
-#include <yaz/yconfig.h>
-
-class IMsg_Thread {
-public:
-    virtual IMsg_Thread *handle() = 0;
-    virtual void result() = 0;
-    virtual ~IMsg_Thread();
-};
-
-class Msg_Thread_Queue_List {
-    friend class Msg_Thread_Queue;
- private:
-    IMsg_Thread *m_item;
-    Msg_Thread_Queue_List *m_next;
-};
-
-class Msg_Thread_Queue {
- public:
-    Msg_Thread_Queue();
-    void enqueue(IMsg_Thread *in);
-    IMsg_Thread *dequeue();
-    int size();
- private:
-    Msg_Thread_Queue_List *m_list;
-};
-
-class Msg_Thread : public yazpp_1::ISocketObserver {
- public:
-    Msg_Thread(yazpp_1::ISocketObservable *obs, int no_threads);
-    virtual ~Msg_Thread();
-    void socketNotify(int event);
-    void put(IMsg_Thread *m);
-    IMsg_Thread *get();
-    void run(void *p);
-    int m_fd[2];
-private:
-    yazpp_1::ISocketObservable *m_SocketObservable;
-    int m_no_threads;
-    pthread_t *m_thread_id;
-    Msg_Thread_Queue m_input;
-    Msg_Thread_Queue m_output;
-    pthread_mutex_t m_mutex_input_data;
-    pthread_cond_t m_cond_input_data;
-    pthread_mutex_t m_mutex_output_data;
-    bool m_stop_flag;
-};
-
-/*
- * Local variables:
- * c-basic-offset: 4
- * indent-tabs-mode: nil
- * End:
- * vim: shiftwidth=4 tabstop=8 expandtab
- */
-
index 11f8716..b586c20 100644 (file)
@@ -1,4 +1,4 @@
-/* $Id: p2.cpp,v 1.1 2005-10-06 09:37:25 marc Exp $
+/* $Id: p2.cpp,v 1.2 2005-10-06 19:33:58 adam Exp $
    Copyright (c) 1998-2005, Index Data.
 
 This file is part of the yaz-proxy.
@@ -57,7 +57,7 @@ void P2_Server::unlockConfig()
 }
 
 P2_Server::P2_Server(IPDU_Observable *the_PDU_Observable,
-                     Msg_Thread *my_thread,
+                     ThreadPoolSocketObserver *my_thread,
                      P2_Config *config,
                      P2_ModuleFactory *modules)
     :  Z_Assoc(the_PDU_Observable)
@@ -117,7 +117,7 @@ int main(int argc, char **argv)
 
     PDU_Assoc *my_PDU_Assoc = 0;
     
-    Msg_Thread my_thread(&mySocketManager, config.m_no_threads);
+    ThreadPoolSocketObserver my_thread(&mySocketManager, config.m_no_threads);
 
     my_PDU_Assoc = new PDU_Assoc(&mySocketManager);
 
index faa602c..6c73186 100644 (file)
@@ -1,4 +1,4 @@
-/* $Id: p2_frontend.cpp,v 1.1 2005-10-06 09:37:25 marc Exp $
+/* $Id: p2_frontend.cpp,v 1.2 2005-10-06 19:33:58 adam Exp $
    Copyright (c) 1998-2005, Index Data.
 
 This file is part of the yaz-proxy.
@@ -27,7 +27,8 @@ using namespace yazpp_1;
 using namespace std;
 
 P2_Frontend::P2_Frontend(IPDU_Observable *the_PDU_Observable,
-                         Msg_Thread *my_thread, P2_Server *server)
+                         ThreadPoolSocketObserver 
+                         *my_thread, P2_Server *server)
     :  Z_Assoc(the_PDU_Observable)
 {
     m_my_thread = my_thread;
index 1c60373..dd0c7f8 100644 (file)
@@ -1,4 +1,4 @@
-/* $Id: p2_frontend.h,v 1.1 2005-10-06 09:37:25 marc Exp $
+/* $Id: p2_frontend.h,v 1.2 2005-10-06 19:33:58 adam Exp $
    Copyright (c) 1998-2005, Index Data.
 
 This file is part of the yaz-proxy.
@@ -26,7 +26,7 @@ Free Software Foundation, 59 Temple Place - Suite 330, Boston, MA
 #include <vector>
 #include <string>
 
-#include "msg-thread.h"
+#include "thread_pool_observer.h"
 #include <yaz++/z-assoc.h>
 #include <yaz++/pdu-assoc.h>
 #include <yaz++/gdu.h>
@@ -68,7 +68,7 @@ class P2_Server : public yazpp_1::Z_Assoc {
 public:
     ~P2_Server();
     P2_Server(yazpp_1::IPDU_Observable *the_PDU_Observable,
-              Msg_Thread *m_my_thread,
+              ThreadPoolSocketObserver *m_my_thread,
               P2_Config *config,
               P2_ModuleFactory *modules);
     P2_Config *lockConfig();
@@ -86,7 +86,7 @@ private:
     void connectNotify();
 private:
     P2_Config *m_config;
-    Msg_Thread *m_my_thread;
+    ThreadPoolSocketObserver *m_my_thread;
     pthread_mutex_t m_mutex_config;
 };
 
@@ -101,14 +101,14 @@ public:
     yazpp_1::Yaz_Z_Query m_query;
 };
 
-class P2_Msg : public IMsg_Thread {
+class P2_Msg : public IThreadPoolMsg {
 public:
     int m_close_flag;
     yazpp_1::GDU *m_gdu;
     yazpp_1::GDU *m_output;
     P2_Frontend *m_front;
     P2_Server *m_server;
-    IMsg_Thread *handle();
+    IThreadPoolMsg *handle();
     void result();
     P2_Msg(yazpp_1::GDU *gdu, P2_Frontend *front, P2_Server *server);
     virtual ~P2_Msg();
@@ -130,7 +130,7 @@ class P2_Frontend : public yazpp_1::Z_Assoc {
  public:
     ~P2_Frontend();
     P2_Frontend(yazpp_1::IPDU_Observable *the_PDU_Observable,
-                Msg_Thread *m_my_thread, P2_Server *server);
+                ThreadPoolSocketObserver *m_my_thread, P2_Server *server);
     IPDU_Observer* sessionNotify(yazpp_1::IPDU_Observable *the_PDU_Observable,
                                  int fd);
     
@@ -146,7 +146,7 @@ class P2_Frontend : public yazpp_1::Z_Assoc {
     
  private:
     yazpp_1::GDUQueue m_in_queue;
-    Msg_Thread *m_my_thread;
+    ThreadPoolSocketObserver *m_my_thread;
     P2_Server *m_server;
  private:
     bool P2_Frontend::search(Z_GDU *z_gdu);
index ae2177f..7b2f4c4 100644 (file)
@@ -1,4 +1,4 @@
-/* $Id: p2_msg.cpp,v 1.1 2005-10-06 09:37:25 marc Exp $
+/* $Id: p2_msg.cpp,v 1.2 2005-10-06 19:33:58 adam Exp $
    Copyright (c) 1998-2005, Index Data.
 
 This file is part of the yaz-proxy.
@@ -317,7 +317,7 @@ Z_APDU *P2_Msg::frontend_present_apdu(Z_APDU *request_apdu, ODR odr)
     return zget_APDU(odr, Z_APDU_presentResponse);
 }
     
-IMsg_Thread *P2_Msg::handle()
+IThreadPoolMsg *P2_Msg::handle()
 {
     ODR odr = odr_createmem(ODR_ENCODE);
     yaz_log(YLOG_LOG, "P2_Msg:handle begin");
diff --git a/src/test_thread_pool_observer.cpp b/src/test_thread_pool_observer.cpp
new file mode 100644 (file)
index 0000000..056f671
--- /dev/null
@@ -0,0 +1,102 @@
+/* $Id: test_thread_pool_observer.cpp,v 1.1 2005-10-06 19:33:58 adam Exp $
+   Copyright (c) 1998-2005, Index Data.
+
+This file is part of the yaz-proxy.
+
+YAZ proxy is free software; you can redistribute it and/or modify it under
+the terms of the GNU General Public License as published by the Free
+Software Foundation; either version 2, or (at your option) any later
+version.
+
+YAZ proxy is distributed in the hope that it will be useful, but WITHOUT ANY
+WARRANTY; without even the implied warranty of MERCHANTABILITY or
+FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
+for more details.
+
+You should have received a copy of the GNU General Public License
+along with YAZ proxy; see the file LICENSE.  If not, write to the
+Free Software Foundation, 59 Temple Place - Suite 330, Boston, MA
+02111-1307, USA.
+ */
+
+#include <stdlib.h>
+#include <ctype.h>
+
+#include <yaz++/pdu-assoc.h>
+#include <yaz++/socket-manager.h>
+#include <yaz/log.h>
+#include "thread_pool_observer.h"
+
+using namespace yazpp_1;
+
+class My_Msg : public IThreadPoolMsg {
+public:
+    IThreadPoolMsg *handle();
+    void result();
+    int m_val;
+};
+
+IThreadPoolMsg *My_Msg::handle()
+{
+    My_Msg *res = new My_Msg;
+    int sl = rand() % 5;
+
+    res->m_val = m_val;
+    printf("My_Msg::handle val=%d sleep=%d\n", m_val, sl);
+    sleep(sl);
+    return res;
+}
+
+void My_Msg::result()
+{
+    printf("My_Msg::result val=%d\n", m_val);
+}
+
+class My_Timer_Thread : public ISocketObserver {
+private:
+    ISocketObservable *m_obs;
+    int m_fd[2];
+    ThreadPoolSocketObserver *m_t;
+public:
+    My_Timer_Thread(ISocketObservable *obs, ThreadPoolSocketObserver *t);
+    void socketNotify(int event);
+};
+
+My_Timer_Thread::My_Timer_Thread(ISocketObservable *obs,
+                                 ThreadPoolSocketObserver *t) : m_obs(obs) 
+{
+    pipe(m_fd);
+    m_t = t;
+    obs->addObserver(m_fd[0], this);
+    obs->maskObserver(this, SOCKET_OBSERVE_READ);
+    obs->timeoutObserver(this, 1);
+}
+
+void My_Timer_Thread::socketNotify(int event)
+{
+    static int seq = 1;
+    printf("Add %d\n", seq);
+    My_Msg *m = new My_Msg;
+    m->m_val = seq++;
+    m_t->put(m);
+}
+
+int main(int argc, char **argv)
+{
+    SocketManager mySocketManager;
+
+    ThreadPoolSocketObserver m(&mySocketManager, 3);
+    My_Timer_Thread t(&mySocketManager, &m) ;
+    int i = 0;
+    while (++i < 5 && mySocketManager.processEvent() > 0)
+        ;
+    return 0;
+}
+/*
+ * Local variables:
+ * c-basic-offset: 4
+ * indent-tabs-mode: nil
+ * End:
+ * vim: shiftwidth=4 tabstop=8 expandtab
+ */
+
diff --git a/src/thread_pool_observer.cpp b/src/thread_pool_observer.cpp
new file mode 100644 (file)
index 0000000..6a8f5cb
--- /dev/null
@@ -0,0 +1,141 @@
+/* $Id: thread_pool_observer.cpp,v 1.1 2005-10-06 19:33:58 adam Exp $
+   Copyright (c) 1998-2005, Index Data.
+
+This file is part of the yaz-proxy.
+
+YAZ proxy is free software; you can redistribute it and/or modify it under
+the terms of the GNU General Public License as published by the Free
+Software Foundation; either version 2, or (at your option) any later
+version.
+
+YAZ proxy is distributed in the hope that it will be useful, but WITHOUT ANY
+WARRANTY; without even the implied warranty of MERCHANTABILITY or
+FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
+for more details.
+
+You should have received a copy of the GNU General Public License
+along with YAZ proxy; see the file LICENSE.  If not, write to the
+Free Software Foundation, 59 Temple Place - Suite 330, Boston, MA
+02111-1307, USA.
+ */
+#include <pthread.h>
+#include <unistd.h>
+#include <ctype.h>
+#include <stdio.h>
+
+#include <yaz++/socket-observer.h>
+#include <yaz/log.h>
+
+#include "thread_pool_observer.h"
+
+using namespace yazpp_1;
+
+IThreadPoolMsg::~IThreadPoolMsg()
+{
+
+}
+
+static void *tfunc(void *p)
+{
+    ThreadPoolSocketObserver *pt = (ThreadPoolSocketObserver *) p;
+    pt->run(0);
+    return 0;
+}
+
+
+ThreadPoolSocketObserver::ThreadPoolSocketObserver(ISocketObservable *obs, int no_threads)
+    : m_SocketObservable(obs)
+{
+    pipe(m_fd);
+    obs->addObserver(m_fd[0], this);
+    obs->maskObserver(this, SOCKET_OBSERVE_READ);
+
+    m_stop_flag = false;
+    pthread_mutex_init(&m_mutex_input_data, 0);
+    pthread_cond_init(&m_cond_input_data, 0);
+    pthread_mutex_init(&m_mutex_output_data, 0);
+
+    m_no_threads = no_threads;
+    m_thread_id = new pthread_t[no_threads];
+    int i;
+    for (i = 0; i<m_no_threads; i++)
+        pthread_create(&m_thread_id[i], 0, tfunc, this);
+}
+
+ThreadPoolSocketObserver::~ThreadPoolSocketObserver()
+{
+    pthread_mutex_lock(&m_mutex_input_data);
+    m_stop_flag = true;
+    pthread_cond_broadcast(&m_cond_input_data);
+    pthread_mutex_unlock(&m_mutex_input_data);
+    
+    int i;
+    for (i = 0; i<m_no_threads; i++)
+        pthread_join(m_thread_id[i], 0);
+    delete [] m_thread_id;
+
+    m_SocketObservable->deleteObserver(this);
+
+    pthread_cond_destroy(&m_cond_input_data);
+    pthread_mutex_destroy(&m_mutex_input_data);
+    pthread_mutex_destroy(&m_mutex_output_data);
+    close(m_fd[0]);
+    close(m_fd[1]);
+}
+
+void ThreadPoolSocketObserver::socketNotify(int event)
+{
+    if (event & SOCKET_OBSERVE_READ)
+    {
+        char buf[2];
+        read(m_fd[0], buf, 1);
+        pthread_mutex_lock(&m_mutex_output_data);
+        IThreadPoolMsg *out = m_output.front();
+        m_output.pop_front();
+        pthread_mutex_unlock(&m_mutex_output_data);
+        if (out)
+            out->result();
+    }
+}
+
+void ThreadPoolSocketObserver::run(void *p)
+{
+    while(1)
+    {
+        pthread_mutex_lock(&m_mutex_input_data);
+        while (!m_stop_flag && m_input.size() == 0)
+            pthread_cond_wait(&m_cond_input_data, &m_mutex_input_data);
+        if (m_stop_flag)
+        {
+            pthread_mutex_unlock(&m_mutex_input_data);
+            break;
+        }
+        IThreadPoolMsg *in = m_input.front();
+        m_input.pop_front();
+        pthread_mutex_unlock(&m_mutex_input_data);
+
+        IThreadPoolMsg *out = in->handle();
+        pthread_mutex_lock(&m_mutex_output_data);
+
+        m_output.push_back(out);
+        
+        write(m_fd[1], "", 1);
+        pthread_mutex_unlock(&m_mutex_output_data);
+    }
+}
+
+void ThreadPoolSocketObserver::put(IThreadPoolMsg *m)
+{
+    pthread_mutex_lock(&m_mutex_input_data);
+    m_input.push_back(m);
+    pthread_cond_signal(&m_cond_input_data);
+    pthread_mutex_unlock(&m_mutex_input_data);
+}
+/*
+ * Local variables:
+ * c-basic-offset: 4
+ * indent-tabs-mode: nil
+ * End:
+ * vim: shiftwidth=4 tabstop=8 expandtab
+ */
+
diff --git a/src/thread_pool_observer.h b/src/thread_pool_observer.h
new file mode 100644 (file)
index 0000000..03604c2
--- /dev/null
@@ -0,0 +1,70 @@
+/* $Id: thread_pool_observer.h,v 1.1 2005-10-06 19:33:58 adam Exp $
+   Copyright (c) 1998-2005, Index Data.
+
+This file is part of the yaz-proxy.
+
+YAZ proxy is free software; you can redistribute it and/or modify it under
+the terms of the GNU General Public License as published by the Free
+Software Foundation; either version 2, or (at your option) any later
+version.
+
+YAZ proxy is distributed in the hope that it will be useful, but WITHOUT ANY
+WARRANTY; without even the implied warranty of MERCHANTABILITY or
+FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
+for more details.
+
+You should have received a copy of the GNU General Public License
+along with YAZ proxy; see the file LICENSE.  If not, write to the
+Free Software Foundation, 59 Temple Place - Suite 330, Boston, MA
+02111-1307, USA.
+ */
+
+#include <pthread.h>
+#include <unistd.h>
+#include <ctype.h>
+
+#if HAVE_DLFCN_H
+#include <dlfcn.h>
+#endif
+
+#include <deque>
+#include <yaz++/socket-observer.h>
+#include <yaz/yconfig.h>
+
+class IThreadPoolMsg {
+public:
+    virtual IThreadPoolMsg *handle() = 0;
+    virtual void result() = 0;
+    virtual ~IThreadPoolMsg();
+};
+
+class ThreadPoolSocketObserver : public yazpp_1::ISocketObserver {
+ public:
+    ThreadPoolSocketObserver(yazpp_1::ISocketObservable *obs, int no_threads);
+    virtual ~ThreadPoolSocketObserver();
+    void socketNotify(int event);
+    void put(IThreadPoolMsg *m);
+    IThreadPoolMsg *get();
+    void run(void *p);
+    int m_fd[2];
+private:
+    yazpp_1::ISocketObservable *m_SocketObservable;
+    int m_no_threads;
+    pthread_t *m_thread_id;
+
+    std::deque<IThreadPoolMsg *> m_input;
+    std::deque<IThreadPoolMsg *> m_output;
+    pthread_mutex_t m_mutex_input_data;
+    pthread_cond_t m_cond_input_data;
+    pthread_mutex_t m_mutex_output_data;
+    bool m_stop_flag;
+};
+
+/*
+ * Local variables:
+ * c-basic-offset: 4
+ * indent-tabs-mode: nil
+ * End:
+ * vim: shiftwidth=4 tabstop=8 expandtab
+ */
+