Beginnings of graceful stop
[metaproxy-moved-to-github.git] / src / filter_frontend_net.cpp
1 /* This file is part of Metaproxy.
2    Copyright (C) 2005-2012 Index Data
3
4 Metaproxy is free software; you can redistribute it and/or modify it under
5 the terms of the GNU General Public License as published by the Free
6 Software Foundation; either version 2, or (at your option) any later
7 version.
8
9 Metaproxy is distributed in the hope that it will be useful, but WITHOUT ANY
10 WARRANTY; without even the implied warranty of MERCHANTABILITY or
11 FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
12 for more details.
13
14 You should have received a copy of the GNU General Public License
15 along with this program; if not, write to the Free Software
16 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
17 */
18
19 #include "config.hpp"
20
21 #include <sstream>
22 #include <iomanip>
23 #include <metaproxy/util.hpp>
24 #include "pipe.hpp"
25 #include <metaproxy/filter.hpp>
26 #include <metaproxy/package.hpp>
27 #include "thread_pool_observer.hpp"
28 #include "filter_frontend_net.hpp"
29 #include <yazpp/z-assoc.h>
30 #include <yazpp/pdu-assoc.h>
31 #include <yazpp/socket-manager.h>
32 #include <yazpp/limit-connect.h>
33 #include <yaz/timing.h>
34 #include <yaz/log.h>
35
36 #include <iostream>
37
38 namespace mp = metaproxy_1;
39
40 namespace metaproxy_1 {
41     class My_Timer_Thread;
42     class ZAssocServer;
43     namespace filter {
44         class FrontendNet::Port {
45             friend class Rep;
46             friend class FrontendNet;
47             std::string port;
48             std::string route;
49         };
50         class FrontendNet::Rep {
51             friend class FrontendNet;
52             int m_no_threads;
53             std::vector<Port> m_ports;
54             int m_listen_duration;
55             int m_session_timeout;
56             int m_connect_max;
57             std::string m_msg_config;
58             yazpp_1::SocketManager mySocketManager;
59             ZAssocServer **az;
60         };
61     }
62     class My_Timer_Thread : public yazpp_1::ISocketObserver {
63     private:
64         yazpp_1::ISocketObservable *m_obs;
65         Pipe m_pipe;
66         bool m_timeout;
67     public:
68         My_Timer_Thread(yazpp_1::ISocketObservable *obs, int duration);
69         void socketNotify(int event);
70         bool timeout();
71     };
72     class ZAssocChild : public yazpp_1::Z_Assoc {
73     public:
74         ~ZAssocChild();
75         ZAssocChild(yazpp_1::IPDU_Observable *the_PDU_Observable,
76                     mp::ThreadPoolSocketObserver *m_thread_pool_observer,
77                     const mp::Package *package,
78                     std::string route,
79                     const char *msg_config);
80         int m_no_requests;
81         std::string m_route;
82     private:
83         yazpp_1::IPDU_Observer* sessionNotify(
84             yazpp_1::IPDU_Observable *the_PDU_Observable,
85             int fd);
86         void recv_GDU(Z_GDU *apdu, int len);
87         
88         void failNotify();
89         void timeoutNotify();
90         void connectNotify();
91     private:
92         mp::ThreadPoolSocketObserver *m_thread_pool_observer;
93         mp::Session m_session;
94         mp::Origin m_origin;
95         bool m_delete_flag;
96         const mp::Package *m_package;
97         const char *m_msg_config;
98     };
99     class ThreadPoolPackage : public mp::IThreadPoolMsg {
100     public:
101         ThreadPoolPackage(mp::Package *package, mp::ZAssocChild *ses,
102             const char *msg_config);
103         ~ThreadPoolPackage();
104         IThreadPoolMsg *handle();
105         void result(const char *t_info);
106         
107     private:
108         yaz_timing_t timer;
109         mp::ZAssocChild *m_assoc_child;
110         mp::Package *m_package;
111         const char *m_msg_config;        
112     };
113     class ZAssocServer : public yazpp_1::Z_Assoc {
114     public:
115         ~ZAssocServer();
116         ZAssocServer(yazpp_1::IPDU_Observable *PDU_Observable, int timeout,
117                      int connect_max, std::string route,
118                      const char *msg_config);
119         void set_package(const mp::Package *package);
120         void set_thread_pool(ThreadPoolSocketObserver *m_thread_pool_observer);
121     private:
122         yazpp_1::IPDU_Observer* sessionNotify(
123             yazpp_1::IPDU_Observable *the_PDU_Observable,
124             int fd);
125         void recv_GDU(Z_GDU *apdu, int len);
126         
127         void failNotify();
128         void timeoutNotify();
129     void connectNotify();
130     private:
131         mp::ThreadPoolSocketObserver *m_thread_pool_observer;
132         const mp::Package *m_package;
133         int m_session_timeout;
134         int m_connect_max;
135         yazpp_1::LimitConnect limit_connect;
136         std::string m_route;
137         const char *m_msg_config;
138     };
139 }
140
141 mp::ThreadPoolPackage::ThreadPoolPackage(mp::Package *package,
142                                          mp::ZAssocChild *ses,
143                                          const char *msg_config) :
144     m_assoc_child(ses), m_package(package), m_msg_config(msg_config)
145 {
146     if (msg_config)
147         timer = yaz_timing_create();
148     else
149         timer = 0;
150 }
151
152 mp::ThreadPoolPackage::~ThreadPoolPackage()
153 {
154     yaz_timing_destroy(&timer); // timer may be NULL
155     delete m_package;
156 }
157
158 void mp::ThreadPoolPackage::result(const char *t_info)
159 {
160     m_assoc_child->m_no_requests--;
161
162     yazpp_1::GDU *gdu = &m_package->response();
163
164     if (gdu->get())
165     {
166         int len;
167         m_assoc_child->send_GDU(gdu->get(), &len);
168     }
169     else if (!m_package->session().is_closed())
170     {
171         // no response package and yet the session is still open..
172         // means that request is unhandled..
173         yazpp_1::GDU *gdu_req = &m_package->request();
174         Z_GDU *z_gdu = gdu_req->get();
175         if (z_gdu && z_gdu->which == Z_GDU_Z3950)
176         {
177             // For Z39.50, response with a Close and shutdown
178             mp::odr odr;
179             int len;
180             Z_APDU *apdu_response = odr.create_close(
181                 z_gdu->u.z3950, Z_Close_systemProblem, 
182                 "unhandled Z39.50 request");
183
184             m_assoc_child->send_Z_PDU(apdu_response, &len);
185         }
186         else if (z_gdu && z_gdu->which == Z_GDU_HTTP_Request)
187         {
188             // For HTTP, respond with Server Error
189             int len;
190             mp::odr odr;
191             Z_GDU *zgdu_res 
192                 = odr.create_HTTP_Response(m_package->session(), 
193                                            z_gdu->u.HTTP_Request, 500);
194             m_assoc_child->send_GDU(zgdu_res, &len);
195         }
196         m_package->session().close();
197     }
198
199     if (m_assoc_child->m_no_requests == 0 && m_package->session().is_closed())
200     {
201         m_assoc_child->close();
202     }
203
204     if (m_msg_config)
205     {
206         yaz_timing_stop(timer);
207         double duration = yaz_timing_get_real(timer);
208
209         std::ostringstream os;
210         os  << m_msg_config << " "
211             << *m_package << " "
212             << std::fixed << std::setprecision (6) << duration;
213         
214         yaz_log(YLOG_LOG, "%s %s", os.str().c_str(), t_info);
215     }
216
217     delete this;
218 }
219
220 mp::IThreadPoolMsg *mp::ThreadPoolPackage::handle() 
221 {
222     m_package->move(m_assoc_child->m_route);
223     return this;
224 }
225
226
227 mp::ZAssocChild::ZAssocChild(yazpp_1::IPDU_Observable *PDU_Observable,
228                              mp::ThreadPoolSocketObserver *my_thread_pool,
229                              const mp::Package *package,
230                              std::string route,
231                              const char *msg_config)
232     :  Z_Assoc(PDU_Observable), m_msg_config(msg_config)
233 {
234     m_thread_pool_observer = my_thread_pool;
235     m_no_requests = 0;
236     m_delete_flag = false;
237     m_package = package;
238     m_route = route;
239     const char *peername = PDU_Observable->getpeername();
240     if (!peername)
241         peername = "unknown";
242     m_origin.set_tcpip_address(std::string(peername), m_session.id());
243 }
244
245
246 yazpp_1::IPDU_Observer *mp::ZAssocChild::sessionNotify(yazpp_1::IPDU_Observable
247                                                   *the_PDU_Observable, int fd)
248 {
249     return 0;
250 }
251
252 mp::ZAssocChild::~ZAssocChild()
253 {
254 }
255
256 void mp::ZAssocChild::recv_GDU(Z_GDU *z_pdu, int len)
257 {
258     m_no_requests++;
259
260     mp::Package *p = new mp::Package(m_session, m_origin);
261
262     mp::ThreadPoolPackage *tp = new mp::ThreadPoolPackage(p, this,
263                                                           m_msg_config);
264     p->copy_route(*m_package);
265     p->request() = yazpp_1::GDU(z_pdu);
266     m_thread_pool_observer->put(tp);  
267 }
268
269 void mp::ZAssocChild::failNotify()
270 {
271     // TODO: send Package to signal "close"
272     if (m_session.is_closed())
273     {
274         if (m_no_requests == 0)
275             delete this;
276         return;
277     }
278     m_no_requests++;
279
280     m_session.close();
281
282     mp::Package *p = new mp::Package(m_session, m_origin);
283
284     mp::ThreadPoolPackage *tp = new mp::ThreadPoolPackage(p, this,
285                                                           m_msg_config);
286     p->copy_route(*m_package);
287     m_thread_pool_observer->put(tp);  
288 }
289
290 void mp::ZAssocChild::timeoutNotify()
291 {
292     failNotify();
293 }
294
295 void mp::ZAssocChild::connectNotify()
296 {
297
298 }
299
300 mp::ZAssocServer::ZAssocServer(yazpp_1::IPDU_Observable *PDU_Observable,
301                                int timeout, int connect_max,
302                                std::string route, const char *msg_config)
303     :  Z_Assoc(PDU_Observable), m_session_timeout(timeout),
304        m_connect_max(connect_max), m_route(route), m_msg_config(msg_config)
305 {
306     m_package = 0;
307 }
308
309
310 void mp::ZAssocServer::set_package(const mp::Package *package)
311 {
312     m_package = package;
313 }
314
315 void mp::ZAssocServer::set_thread_pool(ThreadPoolSocketObserver *observer)
316 {
317     m_thread_pool_observer = observer;
318 }
319
320 yazpp_1::IPDU_Observer *mp::ZAssocServer::sessionNotify(yazpp_1::IPDU_Observable
321                                                  *the_PDU_Observable, int fd)
322 {
323
324     const char *peername = the_PDU_Observable->getpeername();
325     if (peername)
326     {
327         limit_connect.add_connect(peername);
328         limit_connect.cleanup(false);
329         int con_sz = limit_connect.get_total(peername);
330         if (m_connect_max && con_sz > m_connect_max)
331             return 0;
332     }
333     mp::ZAssocChild *my =
334         new mp::ZAssocChild(the_PDU_Observable, m_thread_pool_observer,
335                             m_package, m_route, m_msg_config);
336     my->timeout(m_session_timeout);
337     return my;
338 }
339
340 mp::ZAssocServer::~ZAssocServer()
341 {
342 }
343
344 void mp::ZAssocServer::recv_GDU(Z_GDU *apdu, int len)
345 {
346 }
347
348 void mp::ZAssocServer::failNotify()
349 {
350 }
351
352 void mp::ZAssocServer::timeoutNotify()
353 {
354 }
355
356 void mp::ZAssocServer::connectNotify()
357 {
358 }
359
360 mp::filter::FrontendNet::FrontendNet() : m_p(new Rep)
361 {
362     m_p->m_no_threads = 5;
363     m_p->m_listen_duration = 0;
364     m_p->m_session_timeout = 300; // 5 minutes
365     m_p->m_connect_max = 0;
366     m_p->az = 0;
367 }
368
369 mp::filter::FrontendNet::~FrontendNet()
370 {
371     if (m_p->az)
372     {
373         size_t i;
374         for (i = 0; i<m_p->m_ports.size(); i++)
375             delete m_p->az[i];
376         delete [] m_p->az;
377     }
378     m_p->az = 0;
379 }
380
381 void mp::filter::FrontendNet::stop() const
382 {
383     if (m_p->az)
384     {
385         size_t i;
386         for (i = 0; i<m_p->m_ports.size(); i++)
387             m_p->az[i]->server("");
388     }
389 }
390
391 bool mp::My_Timer_Thread::timeout()
392 {
393     return m_timeout;
394 }
395
396 mp::My_Timer_Thread::My_Timer_Thread(yazpp_1::ISocketObservable *obs,
397                                  int duration) : 
398     m_obs(obs), m_pipe(9123), m_timeout(false)
399 {
400     obs->addObserver(m_pipe.read_fd(), this);
401     obs->maskObserver(this, yazpp_1::SOCKET_OBSERVE_READ);
402     obs->timeoutObserver(this, duration);
403 }
404
405 void mp::My_Timer_Thread::socketNotify(int event)
406 {
407     m_timeout = true;
408     m_obs->deleteObserver(this);
409 }
410
411 void mp::filter::FrontendNet::process(Package &package) const
412 {
413     if (m_p->az == 0)
414         return;
415     size_t i;
416     My_Timer_Thread *tt = 0;
417
418     if (m_p->m_listen_duration)
419         tt = new My_Timer_Thread(&m_p->mySocketManager,
420                                  m_p->m_listen_duration);
421     
422     ThreadPoolSocketObserver tp(&m_p->mySocketManager, m_p->m_no_threads);
423
424     for (i = 0; i<m_p->m_ports.size(); i++)
425     {
426         m_p->az[i]->set_package(&package);
427         m_p->az[i]->set_thread_pool(&tp);
428     }
429     while (m_p->mySocketManager.processEvent() > 0)
430     {
431         if (tt && tt->timeout())
432             break;
433     }
434     delete tt;
435 }
436
437 void mp::filter::FrontendNet::configure(const xmlNode * ptr, bool test_only,
438                                         const char *path)
439 {
440     if (!ptr || !ptr->children)
441     {
442         throw mp::filter::FilterException("No ports for Frontend");
443     }
444     std::vector<Port> ports;
445     for (ptr = ptr->children; ptr; ptr = ptr->next)
446     {
447         if (ptr->type != XML_ELEMENT_NODE)
448             continue;
449         if (!strcmp((const char *) ptr->name, "port"))
450         {
451             Port port;
452             const struct _xmlAttr *attr;
453             for (attr = ptr->properties; attr; attr = attr->next)
454             {
455                 if (!strcmp((const char *) attr->name, "route"))
456                     port.route = mp::xml::get_text(attr);
457             }
458             port.port = mp::xml::get_text(ptr);
459             ports.push_back(port);
460             
461         }
462         else if (!strcmp((const char *) ptr->name, "threads"))
463         {
464             std::string threads_str = mp::xml::get_text(ptr);
465             int threads = atoi(threads_str.c_str());
466             if (threads < 1)
467                 throw mp::filter::FilterException("Bad value for threads: " 
468                                                    + threads_str);
469             m_p->m_no_threads = threads;
470         }
471         else if (!strcmp((const char *) ptr->name, "timeout"))
472         {
473             std::string timeout_str = mp::xml::get_text(ptr);
474             int timeout = atoi(timeout_str.c_str());
475             if (timeout < 1)
476                 throw mp::filter::FilterException("Bad value for timeout: " 
477                                                    + timeout_str);
478             m_p->m_session_timeout = timeout;
479         }
480         else if (!strcmp((const char *) ptr->name, "connect-max"))
481         {
482             m_p->m_connect_max = mp::xml::get_int(ptr, 0);
483         }
484         else if (!strcmp((const char *) ptr->name, "message"))
485         {
486             m_p->m_msg_config = mp::xml::get_text(ptr);
487         }
488         else
489         {
490             throw mp::filter::FilterException("Bad element " 
491                                                + std::string((const char *)
492                                                              ptr->name));
493         }
494     }
495     if (test_only)
496         return;
497     set_ports(ports);
498 }
499
500 void mp::filter::FrontendNet::set_ports(std::vector<std::string> &ports)
501 {
502     std::vector<Port> nports;
503     size_t i;
504
505     for (i = 0; i < ports.size(); i++)
506     {
507         Port nport;
508
509         nport.port = ports[i];
510
511         nports.push_back(nport);
512     }
513     set_ports(nports);
514 }
515
516
517 void mp::filter::FrontendNet::set_ports(std::vector<Port> &ports)
518 {
519     m_p->m_ports = ports;
520     
521     m_p->az = new mp::ZAssocServer *[m_p->m_ports.size()];
522     
523     // Create mp::ZAssocServer for each port
524     size_t i;
525     for (i = 0; i<m_p->m_ports.size(); i++)
526     {
527         // create a PDU assoc object (one per mp::ZAssocServer)
528         yazpp_1::PDU_Assoc *as = new yazpp_1::PDU_Assoc(&m_p->mySocketManager);
529         
530         // create ZAssoc with PDU Assoc
531         m_p->az[i] = new mp::ZAssocServer(as, 
532                                           m_p->m_session_timeout,
533                                           m_p->m_connect_max,
534                                           m_p->m_ports[i].route,
535                                           m_p->m_msg_config.length() > 0 ?
536                                           m_p->m_msg_config.c_str() : 0);
537         if (m_p->az[i]->server(m_p->m_ports[i].port.c_str()))
538         {
539             throw mp::filter::FilterException("Unable to bind to address " 
540                                               + std::string(m_p->m_ports[i].port));
541         }
542     }
543 }
544
545 void mp::filter::FrontendNet::set_listen_duration(int d)
546 {
547     m_p->m_listen_duration = d;
548 }
549
550 static mp::filter::Base* filter_creator()
551 {
552     return new mp::filter::FrontendNet;
553 }
554
555 extern "C" {
556     struct metaproxy_1_filter_struct metaproxy_1_filter_frontend_net = {
557         0,
558         "frontend_net",
559         filter_creator
560     };
561 }
562
563 /*
564  * Local variables:
565  * c-basic-offset: 4
566  * c-file-style: "Stroustrup"
567  * indent-tabs-mode: nil
568  * End:
569  * vim: shiftwidth=4 tabstop=8 expandtab
570  */
571