GPL v2.
[metaproxy-moved-to-github.git] / src / thread_pool_observer.cpp
1 /* $Id: thread_pool_observer.cpp,v 1.21 2007-05-09 21:23:09 adam Exp $
2    Copyright (c) 2005-2007, Index Data.
3
4 This file is part of Metaproxy.
5
6 Metaproxy is free software; you can redistribute it and/or modify it under
7 the terms of the GNU General Public License as published by the Free
8 Software Foundation; either version 2, or (at your option) any later
9 version.
10
11 Metaproxy is distributed in the hope that it will be useful, but WITHOUT ANY
12 WARRANTY; without even the implied warranty of MERCHANTABILITY or
13 FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
14 for more details.
15
16 You should have received a copy of the GNU General Public License
17 along with Metaproxy; see the file LICENSE.  If not, write to the
18 Free Software Foundation, 59 Temple Place - Suite 330, Boston, MA
19 02111-1307, USA.
20  */
21
22 #include "config.hpp"
23
24 #if HAVE_UNISTD_H
25 #include <unistd.h>
26 #endif
27 #ifdef WIN32
28 #include <winsock.h>
29 #endif
30
31 #if HAVE_SYS_SOCKET_H
32 #include <sys/socket.h>
33 #endif
34
35 #include <boost/thread/thread.hpp>
36 #include <boost/thread/mutex.hpp>
37 #include <boost/thread/condition.hpp>
38
39 #include <ctype.h>
40 #include <stdio.h>
41
42 #include <deque>
43
44 #include <yazpp/socket-observer.h>
45 #include <yaz/log.h>
46
47 #include "thread_pool_observer.hpp"
48 #include "pipe.hpp"
49
50 namespace metaproxy_1 {
51     class ThreadPoolSocketObserver::Worker {
52     public:
53         Worker(ThreadPoolSocketObserver *s) : m_s(s) {};
54         ThreadPoolSocketObserver *m_s;
55         void operator() (void) {
56             m_s->run(0);
57         }
58     };
59
60     class ThreadPoolSocketObserver::Rep : public boost::noncopyable {
61         friend class ThreadPoolSocketObserver;
62     public:
63         Rep(yazpp_1::ISocketObservable *obs);
64         ~Rep();
65     private:
66         yazpp_1::ISocketObservable *m_socketObservable;
67         Pipe m_pipe;
68         boost::thread_group m_thrds;
69         boost::mutex m_mutex_input_data;
70         boost::condition m_cond_input_data;
71         boost::condition m_cond_input_full;
72         boost::mutex m_mutex_output_data;
73         std::deque<IThreadPoolMsg *> m_input;
74         std::deque<IThreadPoolMsg *> m_output;
75         bool m_stop_flag;
76         unsigned m_no_threads;
77     };
78     const unsigned int queue_size_per_thread = 64;
79 }
80
81
82
83 using namespace yazpp_1;
84 using namespace metaproxy_1;
85
86 ThreadPoolSocketObserver::Rep::Rep(yazpp_1::ISocketObservable *obs)
87     : m_socketObservable(obs), m_pipe(9123)
88 {
89 }
90
91 ThreadPoolSocketObserver::Rep::~Rep()
92 {
93 }
94
95 IThreadPoolMsg::~IThreadPoolMsg()
96 {
97
98 }
99
100 ThreadPoolSocketObserver::ThreadPoolSocketObserver(
101     yazpp_1::ISocketObservable *obs, int no_threads)
102     : m_p(new Rep(obs))
103 {
104     obs->addObserver(m_p->m_pipe.read_fd(), this);
105     obs->maskObserver(this, SOCKET_OBSERVE_READ);
106
107     m_p->m_stop_flag = false;
108     m_p->m_no_threads = no_threads;
109     int i;
110     for (i = 0; i<no_threads; i++)
111     {
112         Worker w(this);
113         m_p->m_thrds.add_thread(new boost::thread(w));
114     }
115 }
116
117 ThreadPoolSocketObserver::~ThreadPoolSocketObserver()
118 {
119     {
120         boost::mutex::scoped_lock input_lock(m_p->m_mutex_input_data);
121         m_p->m_stop_flag = true;
122         m_p->m_cond_input_data.notify_all();
123     }
124     m_p->m_thrds.join_all();
125
126     m_p->m_socketObservable->deleteObserver(this);
127 }
128
129 void ThreadPoolSocketObserver::socketNotify(int event)
130 {
131     if (event & SOCKET_OBSERVE_READ)
132     {
133         char buf[2];
134 #ifdef WIN32
135         recv(m_p->m_pipe.read_fd(), buf, 1, 0);
136 #else
137         read(m_p->m_pipe.read_fd(), buf, 1);
138 #endif
139         IThreadPoolMsg *out;
140         {
141             boost::mutex::scoped_lock output_lock(m_p->m_mutex_output_data);
142             out = m_p->m_output.front();
143             m_p->m_output.pop_front();
144         }
145         if (out)
146             out->result();
147     }
148 }
149
150 void ThreadPoolSocketObserver::run(void *p)
151 {
152     while(1)
153     {
154         IThreadPoolMsg *in = 0;
155         {
156             boost::mutex::scoped_lock input_lock(m_p->m_mutex_input_data);
157             while (!m_p->m_stop_flag && m_p->m_input.size() == 0)
158                 m_p->m_cond_input_data.wait(input_lock);
159             if (m_p->m_stop_flag)
160                 break;
161             
162             in = m_p->m_input.front();
163             m_p->m_input.pop_front(); 
164             m_p->m_cond_input_full.notify_all();
165         }
166         IThreadPoolMsg *out = in->handle();
167         {
168             boost::mutex::scoped_lock output_lock(m_p->m_mutex_output_data);
169             m_p->m_output.push_back(out);
170 #ifdef WIN32
171             send(m_p->m_pipe.write_fd(), "", 1, 0);
172 #else
173             write(m_p->m_pipe.write_fd(), "", 1);
174 #endif
175         }
176     }
177 }
178
179 void ThreadPoolSocketObserver::put(IThreadPoolMsg *m)
180 {
181     boost::mutex::scoped_lock input_lock(m_p->m_mutex_input_data);
182     while (m_p->m_input.size() >= m_p->m_no_threads * queue_size_per_thread)
183         m_p->m_cond_input_full.wait(input_lock);
184     m_p->m_input.push_back(m);
185     m_p->m_cond_input_data.notify_one();
186 }
187 /*
188  * Local variables:
189  * c-basic-offset: 4
190  * indent-tabs-mode: nil
191  * c-file-style: "stroustrup"
192  * End:
193  * vim: shiftwidth=4 tabstop=8 expandtab
194  */
195