Move memcmp2 to util. Change use of namespaces a little because Doxygen
[metaproxy-moved-to-github.git] / src / thread_pool_observer.cpp
1
2 /* $Id: thread_pool_observer.cpp,v 1.16 2006-06-09 14:12:13 adam Exp $
3    Copyright (c) 2005-2006, Index Data.
4
5 %LICENSE%
6  */
7 #include "config.hpp"
8
9 #if HAVE_UNISTD_H
10 #include <unistd.h>
11 #endif
12 #ifdef WIN32
13 #include <winsock.h>
14 #endif
15
16 #if HAVE_SYS_SOCKET_H
17 #include <sys/socket.h>
18 #endif
19
20 #include <boost/thread/thread.hpp>
21 #include <boost/thread/mutex.hpp>
22 #include <boost/thread/condition.hpp>
23
24 #include <ctype.h>
25 #include <stdio.h>
26
27 #include <deque>
28
29 #include <yazpp/socket-observer.h>
30 #include <yaz/log.h>
31
32 #include "thread_pool_observer.hpp"
33 #include "pipe.hpp"
34
35 namespace metaproxy_1 {
36     class ThreadPoolSocketObserver::Worker {
37     public:
38         Worker(ThreadPoolSocketObserver *s) : m_s(s) {};
39         ThreadPoolSocketObserver *m_s;
40         void operator() (void) {
41             m_s->run(0);
42         }
43     };
44
45     class ThreadPoolSocketObserver::Rep : public boost::noncopyable {
46         friend class ThreadPoolSocketObserver;
47     public:
48         Rep(yazpp_1::ISocketObservable *obs);
49         ~Rep();
50     private:
51         yazpp_1::ISocketObservable *m_socketObservable;
52         Pipe m_pipe;
53         boost::thread_group m_thrds;
54         boost::mutex m_mutex_input_data;
55         boost::condition m_cond_input_data;
56         boost::mutex m_mutex_output_data;
57         std::deque<IThreadPoolMsg *> m_input;
58         std::deque<IThreadPoolMsg *> m_output;
59         bool m_stop_flag;
60         int m_no_threads;
61     };
62 }
63
64
65 using namespace yazpp_1;
66 using namespace metaproxy_1;
67
68 ThreadPoolSocketObserver::Rep::Rep(yazpp_1::ISocketObservable *obs)
69     : m_socketObservable(obs), m_pipe(9123)
70 {
71 }
72
73 ThreadPoolSocketObserver::Rep::~Rep()
74 {
75 }
76
77 IThreadPoolMsg::~IThreadPoolMsg()
78 {
79
80 }
81
82 ThreadPoolSocketObserver::ThreadPoolSocketObserver(
83     yazpp_1::ISocketObservable *obs, int no_threads)
84     : m_p(new Rep(obs))
85 {
86     obs->addObserver(m_p->m_pipe.read_fd(), this);
87     obs->maskObserver(this, SOCKET_OBSERVE_READ);
88
89     m_p->m_stop_flag = false;
90     m_p->m_no_threads = no_threads;
91     int i;
92     for (i = 0; i<no_threads; i++)
93     {
94         Worker w(this);
95         m_p->m_thrds.add_thread(new boost::thread(w));
96     }
97 }
98
99 ThreadPoolSocketObserver::~ThreadPoolSocketObserver()
100 {
101     {
102         boost::mutex::scoped_lock input_lock(m_p->m_mutex_input_data);
103         m_p->m_stop_flag = true;
104         m_p->m_cond_input_data.notify_all();
105     }
106     m_p->m_thrds.join_all();
107
108     m_p->m_socketObservable->deleteObserver(this);
109 }
110
111 void ThreadPoolSocketObserver::socketNotify(int event)
112 {
113     if (event & SOCKET_OBSERVE_READ)
114     {
115         char buf[2];
116         recv(m_p->m_pipe.read_fd(), buf, 1, 0);
117         IThreadPoolMsg *out;
118         {
119             boost::mutex::scoped_lock output_lock(m_p->m_mutex_output_data);
120             out = m_p->m_output.front();
121             m_p->m_output.pop_front();
122         }
123         if (out)
124             out->result();
125     }
126 }
127
128 void ThreadPoolSocketObserver::run(void *p)
129 {
130     while(1)
131     {
132         IThreadPoolMsg *in = 0;
133         {
134             boost::mutex::scoped_lock input_lock(m_p->m_mutex_input_data);
135             while (!m_p->m_stop_flag && m_p->m_input.size() == 0)
136                 m_p->m_cond_input_data.wait(input_lock);
137             if (m_p->m_stop_flag)
138                 break;
139             
140             in = m_p->m_input.front();
141             m_p->m_input.pop_front();
142         }
143         IThreadPoolMsg *out = in->handle();
144         {
145             boost::mutex::scoped_lock output_lock(m_p->m_mutex_output_data);
146             m_p->m_output.push_back(out);
147             send(m_p->m_pipe.write_fd(), "", 1, 0);
148         }
149     }
150 }
151
152 void ThreadPoolSocketObserver::put(IThreadPoolMsg *m)
153 {
154     boost::mutex::scoped_lock input_lock(m_p->m_mutex_input_data);
155     m_p->m_input.push_back(m);
156     m_p->m_cond_input_data.notify_one();
157 }
158 /*
159  * Local variables:
160  * c-basic-offset: 4
161  * indent-tabs-mode: nil
162  * c-file-style: "stroustrup"
163  * End:
164  * vim: shiftwidth=4 tabstop=8 expandtab
165  */
166