Use scoped_ptr for private data in FrontendNet. Allow threads to be set
[metaproxy-moved-to-github.git] / src / filter_frontend_net.cpp
1 /* $Id: filter_frontend_net.cpp,v 1.14 2006-01-09 21:19:11 adam Exp $
2    Copyright (c) 2005, Index Data.
3
4 %LICENSE%
5  */
6
7 #include "config.hpp"
8
9 #include "xmlutil.hpp"
10 #include "pipe.hpp"
11 #include "filter.hpp"
12 #include "package.hpp"
13 #include "thread_pool_observer.hpp"
14 #include "filter_frontend_net.hpp"
15 #include <yaz++/z-assoc.h>
16 #include <yaz++/pdu-assoc.h>
17 #include <yaz++/socket-manager.h>
18 #include <yaz/log.h>
19
20 #include <iostream>
21
22 namespace yp2 {
23     namespace filter {
24         class FrontendNet::Rep {
25             friend class FrontendNet;
26             int m_no_threads;
27             std::vector<std::string> m_ports;
28             int m_listen_duration;
29         };
30     }
31     class My_Timer_Thread : public yazpp_1::ISocketObserver {
32     private:
33         yazpp_1::ISocketObservable *m_obs;
34         Pipe m_pipe;
35         bool m_timeout;
36     public:
37         My_Timer_Thread(yazpp_1::ISocketObservable *obs, int duration);
38         void socketNotify(int event);
39         bool timeout();
40     };
41     class ZAssocChild : public yazpp_1::Z_Assoc {
42     public:
43         ~ZAssocChild();
44         ZAssocChild(yazpp_1::IPDU_Observable *the_PDU_Observable,
45                           yp2::ThreadPoolSocketObserver *m_thread_pool_observer,
46                           const yp2::Package *package);
47         int m_no_requests;
48     private:
49         yazpp_1::IPDU_Observer* sessionNotify(
50             yazpp_1::IPDU_Observable *the_PDU_Observable,
51             int fd);
52         void recv_GDU(Z_GDU *apdu, int len);
53         
54         void failNotify();
55         void timeoutNotify();
56         void connectNotify();
57     private:
58         yp2::ThreadPoolSocketObserver *m_thread_pool_observer;
59         yp2::Session m_session;
60         yp2::Origin m_origin;
61         bool m_delete_flag;
62         const yp2::Package *m_package;
63     };
64     class ThreadPoolPackage : public yp2::IThreadPoolMsg {
65     public:
66         ThreadPoolPackage(yp2::Package *package, yp2::ZAssocChild *ses) :
67             m_session(ses), m_package(package) { };
68         ~ThreadPoolPackage();
69         IThreadPoolMsg *handle();
70         void result();
71         
72     private:
73         yp2::ZAssocChild *m_session;
74         yp2::Package *m_package;
75         
76     };
77     class ZAssocServer : public yazpp_1::Z_Assoc {
78     public:
79         ~ZAssocServer();
80         ZAssocServer(yazpp_1::IPDU_Observable *PDU_Observable,
81                      yp2::ThreadPoolSocketObserver *m_thread_pool_observer,
82                      const yp2::Package *package);
83     private:
84         yazpp_1::IPDU_Observer* sessionNotify(
85             yazpp_1::IPDU_Observable *the_PDU_Observable,
86             int fd);
87         void recv_GDU(Z_GDU *apdu, int len);
88         
89         void failNotify();
90         void timeoutNotify();
91     void connectNotify();
92     private:
93         yp2::ThreadPoolSocketObserver *m_thread_pool_observer;
94         const yp2::Package *m_package;
95     };
96 }
97
98 yp2::ThreadPoolPackage::~ThreadPoolPackage()
99 {
100     delete m_package;
101 }
102
103 void yp2::ThreadPoolPackage::result()
104 {
105     m_session->m_no_requests--;
106
107     yazpp_1::GDU *gdu = &m_package->response();
108     if (gdu->get())
109     {
110         int len;
111         m_session->send_GDU(gdu->get(), &len);
112     }
113     if (m_session->m_no_requests == 0 && m_package->session().is_closed())
114         delete m_session;
115     delete this;
116 }
117
118 yp2::IThreadPoolMsg *yp2::ThreadPoolPackage::handle() 
119 {
120     m_package->move();
121     return this;
122 }
123
124
125 yp2::ZAssocChild::ZAssocChild(yazpp_1::IPDU_Observable *PDU_Observable,
126                                      yp2::ThreadPoolSocketObserver *my_thread_pool,
127                                      const yp2::Package *package)
128     :  Z_Assoc(PDU_Observable)
129 {
130     m_thread_pool_observer = my_thread_pool;
131     m_no_requests = 0;
132     m_delete_flag = false;
133     m_package = package;
134 }
135
136
137 yazpp_1::IPDU_Observer *yp2::ZAssocChild::sessionNotify(yazpp_1::IPDU_Observable
138                                                   *the_PDU_Observable, int fd)
139 {
140     return 0;
141 }
142
143 yp2::ZAssocChild::~ZAssocChild()
144 {
145 }
146
147 void yp2::ZAssocChild::recv_GDU(Z_GDU *z_pdu, int len)
148 {
149     m_no_requests++;
150
151     yp2::Package *p = new yp2::Package(m_session, m_origin);
152
153     yp2::ThreadPoolPackage *tp = new yp2::ThreadPoolPackage(p, this);
154     p->copy_filter(*m_package);
155     p->request() = yazpp_1::GDU(z_pdu);
156     m_thread_pool_observer->put(tp);  
157 }
158
159 void yp2::ZAssocChild::failNotify()
160 {
161     // TODO: send Package to signal "close"
162     if (m_session.is_closed())
163         return;
164     m_no_requests++;
165
166     m_session.close();
167
168     yp2::Package *p = new yp2::Package(m_session, m_origin);
169
170     yp2::ThreadPoolPackage *tp = new yp2::ThreadPoolPackage(p, this);
171     p->copy_filter(*m_package);
172     m_thread_pool_observer->put(tp);  
173 }
174
175 void yp2::ZAssocChild::timeoutNotify()
176 {
177     failNotify();
178 }
179
180 void yp2::ZAssocChild::connectNotify()
181 {
182
183 }
184
185 yp2::ZAssocServer::ZAssocServer(yazpp_1::IPDU_Observable *PDU_Observable,
186                            yp2::ThreadPoolSocketObserver *thread_pool_observer,
187                            const yp2::Package *package)
188     :  Z_Assoc(PDU_Observable)
189 {
190     m_thread_pool_observer = thread_pool_observer;
191     m_package = package;
192
193 }
194
195 yazpp_1::IPDU_Observer *yp2::ZAssocServer::sessionNotify(yazpp_1::IPDU_Observable
196                                                  *the_PDU_Observable, int fd)
197 {
198     yp2::ZAssocChild *my =
199         new yp2::ZAssocChild(the_PDU_Observable, m_thread_pool_observer,
200                              m_package);
201     return my;
202 }
203
204 yp2::ZAssocServer::~ZAssocServer()
205 {
206 }
207
208 void yp2::ZAssocServer::recv_GDU(Z_GDU *apdu, int len)
209 {
210 }
211
212 void yp2::ZAssocServer::failNotify()
213 {
214 }
215
216 void yp2::ZAssocServer::timeoutNotify()
217 {
218 }
219
220 void yp2::ZAssocServer::connectNotify()
221 {
222 }
223
224 yp2::filter::FrontendNet::FrontendNet() : m_p(new Rep)
225 {
226     m_p->m_no_threads = 5;
227     m_p->m_listen_duration = 0;
228 }
229
230 yp2::filter::FrontendNet::~FrontendNet()
231 {
232 }
233
234 bool yp2::My_Timer_Thread::timeout()
235 {
236     return m_timeout;
237 }
238
239 yp2::My_Timer_Thread::My_Timer_Thread(yazpp_1::ISocketObservable *obs,
240                                  int duration) : 
241     m_obs(obs), m_pipe(9123), m_timeout(false)
242 {
243     obs->addObserver(m_pipe.read_fd(), this);
244     obs->maskObserver(this, yazpp_1::SOCKET_OBSERVE_READ);
245     obs->timeoutObserver(this, duration);
246 }
247
248 void yp2::My_Timer_Thread::socketNotify(int event)
249 {
250     m_timeout = true;
251     m_obs->deleteObserver(this);
252 }
253
254 void yp2::filter::FrontendNet::process(Package &package) const
255 {
256     if (m_p->m_ports.size() == 0)
257         return;
258
259     yazpp_1::SocketManager mySocketManager;
260
261     My_Timer_Thread *tt = 0;
262     if (m_p->m_listen_duration)
263         tt = new My_Timer_Thread(&mySocketManager, m_p->m_listen_duration);
264
265     ThreadPoolSocketObserver threadPool(&mySocketManager, m_p->m_no_threads);
266
267     yp2::ZAssocServer **az = new yp2::ZAssocServer *[m_p->m_ports.size()];
268
269     // Create yp2::ZAssocServer for each port
270     size_t i;
271     for (i = 0; i<m_p->m_ports.size(); i++)
272     {
273         // create a PDU assoc object (one per yp2::ZAssocServer)
274         yazpp_1::PDU_Assoc *as = new yazpp_1::PDU_Assoc(&mySocketManager);
275
276         // create ZAssoc with PDU Assoc
277         az[i] = new yp2::ZAssocServer(as, &threadPool, &package);
278         az[i]->server(m_p->m_ports[i].c_str());
279     }
280     while (mySocketManager.processEvent() > 0)
281     {
282         if (tt && tt->timeout())
283             break;
284     }
285     for (i = 0; i<m_p->m_ports.size(); i++)
286         delete az[i];
287
288     delete [] az;
289     delete tt;
290 }
291
292 void yp2::filter::FrontendNet::configure(const xmlNode * ptr)
293 {
294     if (!ptr || !ptr->children)
295     {
296         throw yp2::filter::FilterException("No ports for Frontend");
297     }
298     std::vector<std::string> ports;
299     for (ptr = ptr->children; ptr; ptr = ptr->next)
300     {
301         if (ptr->type != XML_ELEMENT_NODE)
302             continue;
303         if (!strcmp((const char *) ptr->name, "port"))
304         {
305             std::string port = yp2::xml::get_text(ptr);
306             ports.push_back(port);
307             
308         }
309         else if (!strcmp((const char *) ptr->name, "threads"))
310         {
311             std::string threads_str = yp2::xml::get_text(ptr);
312             int threads = atoi(threads_str.c_str());
313             if (threads < 1)
314                 throw yp2::filter::FilterException("Bad value for threads: " 
315                                                    + threads_str);
316             m_p->m_no_threads = threads;
317         }
318         else
319         {
320             throw yp2::filter::FilterException("Bad element " 
321                                                + std::string((const char *)
322                                                              ptr->name));
323         }
324     }
325     m_p->m_ports = ports;
326 }
327
328 std::vector<std::string> &yp2::filter::FrontendNet::ports()
329 {
330     return m_p->m_ports;
331 }
332
333 int &yp2::filter::FrontendNet::listen_duration()
334 {
335     return m_p->m_listen_duration;
336 }
337
338 static yp2::filter::Base* filter_creator()
339 {
340     return new yp2::filter::FrontendNet;
341 }
342
343 extern "C" {
344     struct yp2_filter_struct yp2_filter_frontend_net = {
345         0,
346         "frontend_net",
347         filter_creator
348     };
349 }
350
351 /*
352  * Local variables:
353  * c-basic-offset: 4
354  * indent-tabs-mode: nil
355  * c-file-style: "stroustrup"
356  * End:
357  * vim: shiftwidth=4 tabstop=8 expandtab
358  */