Pimpl the ThreadPoolSocketObserver class so we can make code depending
[metaproxy-moved-to-github.git] / src / thread_pool_observer.cpp
1
2 /* $Id: thread_pool_observer.cpp,v 1.9 2005-11-04 11:06:52 adam Exp $
3    Copyright (c) 2005, Index Data.
4
5 %LICENSE%
6  */
#include "config.hpp"

#if HAVE_UNISTD_H
#include <unistd.h>
#endif
#ifdef WIN32
#include <winsock.h>
#endif

#include <boost/thread/thread.hpp>
#include <boost/thread/mutex.hpp>
#include <boost/thread/condition.hpp>

#include <ctype.h>
#include <stdio.h>

#include <stdexcept>

#include <yaz++/socket-observer.h>
#include <yaz/log.h>

#include "thread_pool_observer.hpp"
27
28 namespace yp2 {
29     class ThreadPoolSocketObserver::Worker {
30     public:
31         Worker(ThreadPoolSocketObserver *s) : m_s(s) {};
32         ThreadPoolSocketObserver *m_s;
33         void operator() (void) {
34             m_s->run(0);
35         }
36     };
37
38     class ThreadPoolSocketObserver::Rep : public boost::noncopyable {
39         friend class ThreadPoolSocketObserver;
40     public:
41         Rep(yazpp_1::ISocketObservable *obs);
42         ~Rep();
43     private:
44         yazpp_1::ISocketObservable *m_socketObservable;
45         int m_fd[2];
46         boost::thread_group m_thrds;
47         boost::mutex m_mutex_input_data;
48         boost::condition m_cond_input_data;
49         boost::mutex m_mutex_output_data;
50         std::deque<IThreadPoolMsg *> m_input;
51         std::deque<IThreadPoolMsg *> m_output;
52         bool m_stop_flag;
53         int m_no_threads;
54     };
55 }
56
57
58 using namespace yazpp_1;
59 using namespace yp2;
60
61 ThreadPoolSocketObserver::Rep::Rep(ISocketObservable *obs)
62     : m_socketObservable(obs)
63 {
64 }
65
66 ThreadPoolSocketObserver::Rep::~Rep()
67 {
68 }
69
70 IThreadPoolMsg::~IThreadPoolMsg()
71 {
72
73 }
74
75 ThreadPoolSocketObserver::ThreadPoolSocketObserver(ISocketObservable *obs,
76                                                    int no_threads)
77     : m_p(new Rep(obs))
78 {
79     pipe(m_p->m_fd);
80     obs->addObserver(m_p->m_fd[0], this);
81     obs->maskObserver(this, SOCKET_OBSERVE_READ);
82
83     m_p->m_stop_flag = false;
84     m_p->m_no_threads = no_threads;
85     int i;
86     for (i = 0; i<no_threads; i++)
87     {
88         Worker w(this);
89         m_p->m_thrds.add_thread(new boost::thread(w));
90     }
91 }
92
93 ThreadPoolSocketObserver::~ThreadPoolSocketObserver()
94 {
95     {
96         boost::mutex::scoped_lock input_lock(m_p->m_mutex_input_data);
97         m_p->m_stop_flag = true;
98         m_p->m_cond_input_data.notify_all();
99     }
100     m_p->m_thrds.join_all();
101
102     m_p->m_socketObservable->deleteObserver(this);
103
104     close(m_p->m_fd[0]);
105     close(m_p->m_fd[1]);
106 }
107
108 void ThreadPoolSocketObserver::socketNotify(int event)
109 {
110     if (event & SOCKET_OBSERVE_READ)
111     {
112         char buf[2];
113         read(m_p->m_fd[0], buf, 1);
114         IThreadPoolMsg *out;
115         {
116             boost::mutex::scoped_lock output_lock(m_p->m_mutex_output_data);
117             out = m_p->m_output.front();
118             m_p->m_output.pop_front();
119         }
120         if (out)
121             out->result();
122     }
123 }
124
125 void ThreadPoolSocketObserver::run(void *p)
126 {
127     while(1)
128     {
129         IThreadPoolMsg *in = 0;
130         {
131             boost::mutex::scoped_lock input_lock(m_p->m_mutex_input_data);
132             while (!m_p->m_stop_flag && m_p->m_input.size() == 0)
133                 m_p->m_cond_input_data.wait(input_lock);
134             if (m_p->m_stop_flag)
135                 break;
136             
137             in = m_p->m_input.front();
138             m_p->m_input.pop_front();
139         }
140         IThreadPoolMsg *out = in->handle();
141         {
142             boost::mutex::scoped_lock output_lock(m_p->m_mutex_output_data);
143             m_p->m_output.push_back(out);
144             write(m_p->m_fd[1], "", 1);
145         }
146     }
147 }
148
149 void ThreadPoolSocketObserver::put(IThreadPoolMsg *m)
150 {
151     boost::mutex::scoped_lock input_lock(m_p->m_mutex_input_data);
152     m_p->m_input.push_back(m);
153     m_p->m_cond_input_data.notify_one();
154 }
155 /*
156  * Local variables:
157  * c-basic-offset: 4
158  * indent-tabs-mode: nil
159  * c-file-style: "stroustrup"
160  * End:
161  * vim: shiftwidth=4 tabstop=8 expandtab
162  */
163