Added 'timeout' setting for filter frontend_net which specifies the number
[metaproxy-moved-to-github.git] / src / filter_frontend_net.cpp
1 /* $Id: filter_frontend_net.cpp,v 1.22 2006-11-17 14:03:46 adam Exp $
2    Copyright (c) 2005-2006, Index Data.
3
4    See the LICENSE file for details
5  */
6
7 #include "config.hpp"
8
9 #include "util.hpp"
10 #include "pipe.hpp"
11 #include "filter.hpp"
12 #include "package.hpp"
13 #include "thread_pool_observer.hpp"
14 #include "filter_frontend_net.hpp"
15 #include <yazpp/z-assoc.h>
16 #include <yazpp/pdu-assoc.h>
17 #include <yazpp/socket-manager.h>
18 #include <yaz/log.h>
19
20 #include <iostream>
21
22 namespace mp = metaproxy_1;
23
24 namespace metaproxy_1 {
25     namespace filter {
26         class FrontendNet::Rep {
27             friend class FrontendNet;
28             int m_no_threads;
29             std::vector<std::string> m_ports;
30             int m_listen_duration;
31             int m_session_timeout;
32         };
33     }
34     class My_Timer_Thread : public yazpp_1::ISocketObserver {
35     private:
36         yazpp_1::ISocketObservable *m_obs;
37         Pipe m_pipe;
38         bool m_timeout;
39     public:
40         My_Timer_Thread(yazpp_1::ISocketObservable *obs, int duration);
41         void socketNotify(int event);
42         bool timeout();
43     };
44     class ZAssocChild : public yazpp_1::Z_Assoc {
45     public:
46         ~ZAssocChild();
47         ZAssocChild(yazpp_1::IPDU_Observable *the_PDU_Observable,
48                           mp::ThreadPoolSocketObserver *m_thread_pool_observer,
49                           const mp::Package *package);
50         int m_no_requests;
51     private:
52         yazpp_1::IPDU_Observer* sessionNotify(
53             yazpp_1::IPDU_Observable *the_PDU_Observable,
54             int fd);
55         void recv_GDU(Z_GDU *apdu, int len);
56         
57         void failNotify();
58         void timeoutNotify();
59         void connectNotify();
60     private:
61         mp::ThreadPoolSocketObserver *m_thread_pool_observer;
62         mp::Session m_session;
63         mp::Origin m_origin;
64         bool m_delete_flag;
65         const mp::Package *m_package;
66     };
67     class ThreadPoolPackage : public mp::IThreadPoolMsg {
68     public:
69         ThreadPoolPackage(mp::Package *package, mp::ZAssocChild *ses) :
70             m_session(ses), m_package(package) { };
71         ~ThreadPoolPackage();
72         IThreadPoolMsg *handle();
73         void result();
74         
75     private:
76         mp::ZAssocChild *m_session;
77         mp::Package *m_package;
78         
79     };
80     class ZAssocServer : public yazpp_1::Z_Assoc {
81     public:
82         ~ZAssocServer();
83         ZAssocServer(yazpp_1::IPDU_Observable *PDU_Observable,
84                      mp::ThreadPoolSocketObserver *m_thread_pool_observer,
85                      const mp::Package *package,
86                      int timeout);
87     private:
88         yazpp_1::IPDU_Observer* sessionNotify(
89             yazpp_1::IPDU_Observable *the_PDU_Observable,
90             int fd);
91         void recv_GDU(Z_GDU *apdu, int len);
92         
93         void failNotify();
94         void timeoutNotify();
95     void connectNotify();
96     private:
97         mp::ThreadPoolSocketObserver *m_thread_pool_observer;
98         const mp::Package *m_package;
99         int m_session_timeout;
100     };
101 }
102
103 mp::ThreadPoolPackage::~ThreadPoolPackage()
104 {
105     delete m_package;
106 }
107
108 void mp::ThreadPoolPackage::result()
109 {
110     m_session->m_no_requests--;
111
112     yazpp_1::GDU *gdu = &m_package->response();
113
114     if (gdu->get())
115     {
116         int len;
117         m_session->send_GDU(gdu->get(), &len);
118     }
119     else if (!m_package->session().is_closed())
120     {
121         // no response package and yet the session is still open..
122         // means that request is unhandled..
123         yazpp_1::GDU *gdu_req = &m_package->request();
124         Z_GDU *z_gdu = gdu_req->get();
125         if (z_gdu && z_gdu->which == Z_GDU_Z3950)
126         {
127             // For Z39.50, response with a Close and shutdown
128             mp::odr odr;
129             int len;
130             Z_APDU *apdu_response = odr.create_close(
131                 z_gdu->u.z3950, Z_Close_systemProblem, 
132                 "unhandled Z39.50 request");
133
134             m_session->send_Z_PDU(apdu_response, &len);
135             m_package->session().close();
136         }
137     }
138
139     if (m_session->m_no_requests == 0 && m_package->session().is_closed())
140         delete m_session;
141     delete this;
142 }
143
144 mp::IThreadPoolMsg *mp::ThreadPoolPackage::handle() 
145 {
146     m_package->move();
147     return this;
148 }
149
150
151 mp::ZAssocChild::ZAssocChild(yazpp_1::IPDU_Observable *PDU_Observable,
152                                      mp::ThreadPoolSocketObserver *my_thread_pool,
153                                      const mp::Package *package)
154     :  Z_Assoc(PDU_Observable)
155 {
156     m_thread_pool_observer = my_thread_pool;
157     m_no_requests = 0;
158     m_delete_flag = false;
159     m_package = package;
160     const char *peername = PDU_Observable->getpeername();
161     if (!peername)
162         peername = "unknown";
163     m_origin.set_tcpip_address(std::string(peername), m_session.id());
164 }
165
166
167 yazpp_1::IPDU_Observer *mp::ZAssocChild::sessionNotify(yazpp_1::IPDU_Observable
168                                                   *the_PDU_Observable, int fd)
169 {
170     return 0;
171 }
172
173 mp::ZAssocChild::~ZAssocChild()
174 {
175 }
176
177 void mp::ZAssocChild::recv_GDU(Z_GDU *z_pdu, int len)
178 {
179     m_no_requests++;
180
181     mp::Package *p = new mp::Package(m_session, m_origin);
182
183     mp::ThreadPoolPackage *tp = new mp::ThreadPoolPackage(p, this);
184     p->copy_filter(*m_package);
185     p->request() = yazpp_1::GDU(z_pdu);
186     m_thread_pool_observer->put(tp);  
187 }
188
189 void mp::ZAssocChild::failNotify()
190 {
191     // TODO: send Package to signal "close"
192     if (m_session.is_closed())
193         return;
194     m_no_requests++;
195
196     m_session.close();
197
198     mp::Package *p = new mp::Package(m_session, m_origin);
199
200     mp::ThreadPoolPackage *tp = new mp::ThreadPoolPackage(p, this);
201     p->copy_filter(*m_package);
202     m_thread_pool_observer->put(tp);  
203 }
204
205 void mp::ZAssocChild::timeoutNotify()
206 {
207     failNotify();
208 }
209
210 void mp::ZAssocChild::connectNotify()
211 {
212
213 }
214
215 mp::ZAssocServer::ZAssocServer(yazpp_1::IPDU_Observable *PDU_Observable,
216                                mp::ThreadPoolSocketObserver *thread_pool_observer,
217                                const mp::Package *package,
218                                int timeout)
219     :  Z_Assoc(PDU_Observable), m_session_timeout(timeout)
220 {
221     m_thread_pool_observer = thread_pool_observer;
222     m_package = package;
223
224 }
225
226 yazpp_1::IPDU_Observer *mp::ZAssocServer::sessionNotify(yazpp_1::IPDU_Observable
227                                                  *the_PDU_Observable, int fd)
228 {
229     mp::ZAssocChild *my =
230         new mp::ZAssocChild(the_PDU_Observable, m_thread_pool_observer,
231                              m_package);
232     my->timeout(m_session_timeout);
233     return my;
234 }
235
236 mp::ZAssocServer::~ZAssocServer()
237 {
238 }
239
240 void mp::ZAssocServer::recv_GDU(Z_GDU *apdu, int len)
241 {
242 }
243
244 void mp::ZAssocServer::failNotify()
245 {
246 }
247
248 void mp::ZAssocServer::timeoutNotify()
249 {
250 }
251
252 void mp::ZAssocServer::connectNotify()
253 {
254 }
255
256 mp::filter::FrontendNet::FrontendNet() : m_p(new Rep)
257 {
258     m_p->m_no_threads = 5;
259     m_p->m_listen_duration = 0;
260     m_p->m_session_timeout = 300; // 5 minutes
261 }
262
263 mp::filter::FrontendNet::~FrontendNet()
264 {
265 }
266
267 bool mp::My_Timer_Thread::timeout()
268 {
269     return m_timeout;
270 }
271
272 mp::My_Timer_Thread::My_Timer_Thread(yazpp_1::ISocketObservable *obs,
273                                  int duration) : 
274     m_obs(obs), m_pipe(9123), m_timeout(false)
275 {
276     obs->addObserver(m_pipe.read_fd(), this);
277     obs->maskObserver(this, yazpp_1::SOCKET_OBSERVE_READ);
278     obs->timeoutObserver(this, duration);
279 }
280
281 void mp::My_Timer_Thread::socketNotify(int event)
282 {
283     m_timeout = true;
284     m_obs->deleteObserver(this);
285 }
286
287 void mp::filter::FrontendNet::process(Package &package) const
288 {
289     if (m_p->m_ports.size() == 0)
290         return;
291
292     yazpp_1::SocketManager mySocketManager;
293
294     My_Timer_Thread *tt = 0;
295     if (m_p->m_listen_duration)
296         tt = new My_Timer_Thread(&mySocketManager, m_p->m_listen_duration);
297
298     ThreadPoolSocketObserver threadPool(&mySocketManager, m_p->m_no_threads);
299
300     mp::ZAssocServer **az = new mp::ZAssocServer *[m_p->m_ports.size()];
301
302     // Create mp::ZAssocServer for each port
303     size_t i;
304     for (i = 0; i<m_p->m_ports.size(); i++)
305     {
306         // create a PDU assoc object (one per mp::ZAssocServer)
307         yazpp_1::PDU_Assoc *as = new yazpp_1::PDU_Assoc(&mySocketManager);
308
309         // create ZAssoc with PDU Assoc
310         az[i] = new mp::ZAssocServer(as, &threadPool, &package,
311                                      m_p->m_session_timeout);
312         az[i]->server(m_p->m_ports[i].c_str());
313     }
314     while (mySocketManager.processEvent() > 0)
315     {
316         if (tt && tt->timeout())
317             break;
318     }
319     for (i = 0; i<m_p->m_ports.size(); i++)
320         delete az[i];
321
322     delete [] az;
323     delete tt;
324 }
325
326 void mp::filter::FrontendNet::configure(const xmlNode * ptr)
327 {
328     if (!ptr || !ptr->children)
329     {
330         throw mp::filter::FilterException("No ports for Frontend");
331     }
332     std::vector<std::string> ports;
333     for (ptr = ptr->children; ptr; ptr = ptr->next)
334     {
335         if (ptr->type != XML_ELEMENT_NODE)
336             continue;
337         if (!strcmp((const char *) ptr->name, "port"))
338         {
339             std::string port = mp::xml::get_text(ptr);
340             ports.push_back(port);
341             
342         }
343         else if (!strcmp((const char *) ptr->name, "threads"))
344         {
345             std::string threads_str = mp::xml::get_text(ptr);
346             int threads = atoi(threads_str.c_str());
347             if (threads < 1)
348                 throw mp::filter::FilterException("Bad value for threads: " 
349                                                    + threads_str);
350             m_p->m_no_threads = threads;
351         }
352         else if (!strcmp((const char *) ptr->name, "timeout"))
353         {
354             std::string timeout_str = mp::xml::get_text(ptr);
355             int timeout = atoi(timeout_str.c_str());
356             if (timeout < 1)
357                 throw mp::filter::FilterException("Bad value for timeout: " 
358                                                    + timeout_str);
359             m_p->m_session_timeout = timeout;
360         }
361         else
362         {
363             throw mp::filter::FilterException("Bad element " 
364                                                + std::string((const char *)
365                                                              ptr->name));
366         }
367     }
368     m_p->m_ports = ports;
369 }
370
371 std::vector<std::string> &mp::filter::FrontendNet::ports()
372 {
373     return m_p->m_ports;
374 }
375
376 int &mp::filter::FrontendNet::listen_duration()
377 {
378     return m_p->m_listen_duration;
379 }
380
381 static mp::filter::Base* filter_creator()
382 {
383     return new mp::filter::FrontendNet;
384 }
385
386 extern "C" {
387     struct metaproxy_1_filter_struct metaproxy_1_filter_frontend_net = {
388         0,
389         "frontend_net",
390         filter_creator
391     };
392 }
393
394 /*
395  * Local variables:
396  * c-basic-offset: 4
397  * indent-tabs-mode: nil
398  * c-file-style: "stroustrup"
399  * End:
400  * vim: shiftwidth=4 tabstop=8 expandtab
401  */