frontend_net: refactor and use Rep class in helpers
[metaproxy-moved-to-github.git] / src / filter_frontend_net.cpp
1 /* This file is part of Metaproxy.
2    Copyright (C) 2005-2012 Index Data
3
4 Metaproxy is free software; you can redistribute it and/or modify it under
5 the terms of the GNU General Public License as published by the Free
6 Software Foundation; either version 2, or (at your option) any later
7 version.
8
9 Metaproxy is distributed in the hope that it will be useful, but WITHOUT ANY
10 WARRANTY; without even the implied warranty of MERCHANTABILITY or
11 FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
12 for more details.
13
14 You should have received a copy of the GNU General Public License
15 along with this program; if not, write to the Free Software
16 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
17 */
18
19 #include "config.hpp"
20
21 #include <sstream>
22 #include <iomanip>
23 #include <metaproxy/util.hpp>
24 #include "pipe.hpp"
25 #include <metaproxy/filter.hpp>
26 #include <metaproxy/package.hpp>
27 #include "thread_pool_observer.hpp"
28 #include "filter_frontend_net.hpp"
29 #include <yazpp/z-assoc.h>
30 #include <yazpp/pdu-assoc.h>
31 #include <yazpp/socket-manager.h>
32 #include <yazpp/limit-connect.h>
33 #include <yaz/timing.h>
34 #include <yaz/log.h>
35 #include "gduutil.hpp"
36
37 #include <iostream>
38
39 namespace mp = metaproxy_1;
40 namespace yf = metaproxy_1::filter;
41
42 namespace metaproxy_1 {
43     namespace filter {
44         class FrontendNet::Port {
45             friend class Rep;
46             friend class FrontendNet;
47             std::string port;
48             std::string route;
49         };
50         class FrontendNet::Rep {
51             friend class FrontendNet;
52             int m_no_threads;
53             std::vector<Port> m_ports;
54             int m_listen_duration;
55             int m_session_timeout;
56             int m_connect_max;
57             std::string m_msg_config;
58             yazpp_1::SocketManager mySocketManager;
59             ZAssocServer **az;
60         };
61         class FrontendNet::My_Timer_Thread : public yazpp_1::ISocketObserver {
62         private:
63             yazpp_1::ISocketObservable *m_obs;
64             Pipe m_pipe;
65             bool m_timeout;
66         public:
67             My_Timer_Thread(yazpp_1::ISocketObservable *obs, int duration);
68             void socketNotify(int event);
69             bool timeout();
70         };
71         class FrontendNet::ZAssocChild : public yazpp_1::Z_Assoc {
72         public:
73             ~ZAssocChild();
74             ZAssocChild(yazpp_1::IPDU_Observable *the_PDU_Observable,
75                         mp::ThreadPoolSocketObserver *m_thread_pool_observer,
76                         const mp::Package *package,
77                         std::string route,
78                         Rep *rep);
79             int m_no_requests;
80             std::string m_route;
81         private:
82             yazpp_1::IPDU_Observer* sessionNotify(
83                 yazpp_1::IPDU_Observable *the_PDU_Observable,
84                 int fd);
85             void recv_GDU(Z_GDU *apdu, int len);
86             
87             void failNotify();
88             void timeoutNotify();
89             void connectNotify();
90         private:
91             mp::ThreadPoolSocketObserver *m_thread_pool_observer;
92             mp::Session m_session;
93             mp::Origin m_origin;
94             bool m_delete_flag;
95             const mp::Package *m_package;
96             Rep *m_p;
97         };
98         class FrontendNet::ThreadPoolPackage : public mp::IThreadPoolMsg {
99         public:
100             ThreadPoolPackage(mp::Package *package,
101                               yf::FrontendNet::ZAssocChild *ses,
102                               Rep *rep);
103             ~ThreadPoolPackage();
104             IThreadPoolMsg *handle();
105             void result(const char *t_info);
106             bool cleanup(void *info);
107         private:
108             yaz_timing_t timer;
109             ZAssocChild *m_assoc_child;
110             mp::Package *m_package;
111             Rep *m_p;
112         }; 
113         class FrontendNet::ZAssocServer : public yazpp_1::Z_Assoc {
114         public:
115             ~ZAssocServer();
116             ZAssocServer(yazpp_1::IPDU_Observable *PDU_Observable,
117                          std::string route,
118                          Rep *rep);
119             void set_package(const mp::Package *package);
120             void set_thread_pool(ThreadPoolSocketObserver *observer);
121         private:
122             yazpp_1::IPDU_Observer* sessionNotify(
123                 yazpp_1::IPDU_Observable *the_PDU_Observable,
124                 int fd);
125             void recv_GDU(Z_GDU *apdu, int len);
126             
127             void failNotify();
128             void timeoutNotify();
129             void connectNotify();
130         private:
131             mp::ThreadPoolSocketObserver *m_thread_pool_observer;
132             const mp::Package *m_package;
133             yazpp_1::LimitConnect limit_connect;
134             std::string m_route;
135             Rep *m_p;
136         };
137     }
138 }
139
140 yf::FrontendNet::ThreadPoolPackage::ThreadPoolPackage(mp::Package *package,
141                                                       ZAssocChild *ses,
142                                                       Rep *rep) :
143     m_assoc_child(ses), m_package(package), m_p(rep)
144 {
145     timer = yaz_timing_create();
146 }
147
148 yf::FrontendNet::ThreadPoolPackage::~ThreadPoolPackage()
149 {
150     yaz_timing_destroy(&timer); // timer may be NULL
151     delete m_package;
152 }
153
154 bool yf::FrontendNet::ThreadPoolPackage::cleanup(void *info)
155 {
156     mp::Session *ses = (mp::Session *) info;
157
158     return *ses == m_package->session();
159 }
160
161 void yf::FrontendNet::ThreadPoolPackage::result(const char *t_info)
162 {
163     m_assoc_child->m_no_requests--;
164
165     yazpp_1::GDU *gdu = &m_package->response();
166
167     if (gdu->get())
168     {
169         int len;
170         m_assoc_child->send_GDU(gdu->get(), &len);
171     }
172     else if (!m_package->session().is_closed())
173     {
174         // no response package and yet the session is still open..
175         // means that request is unhandled..
176         yazpp_1::GDU *gdu_req = &m_package->request();
177         Z_GDU *z_gdu = gdu_req->get();
178         if (z_gdu && z_gdu->which == Z_GDU_Z3950)
179         {
180             // For Z39.50, response with a Close and shutdown
181             mp::odr odr;
182             int len;
183             Z_APDU *apdu_response = odr.create_close(
184                 z_gdu->u.z3950, Z_Close_systemProblem, 
185                 "unhandled Z39.50 request");
186
187             m_assoc_child->send_Z_PDU(apdu_response, &len);
188         }
189         else if (z_gdu && z_gdu->which == Z_GDU_HTTP_Request)
190         {
191             // For HTTP, respond with Server Error
192             int len;
193             mp::odr odr;
194             Z_GDU *zgdu_res 
195                 = odr.create_HTTP_Response(m_package->session(), 
196                                            z_gdu->u.HTTP_Request, 500);
197             m_assoc_child->send_GDU(zgdu_res, &len);
198         }
199         m_package->session().close();
200     }
201
202     if (m_assoc_child->m_no_requests == 0 && m_package->session().is_closed())
203     {
204         m_assoc_child->close();
205     }
206
207     if (m_p->m_msg_config.length())
208     {
209         yaz_timing_stop(timer);
210         double duration = yaz_timing_get_real(timer);
211         Z_GDU *z_gdu = gdu->get();
212
213         std::ostringstream os;
214         os  << m_p->m_msg_config << " "
215             << *m_package << " "
216             << std::fixed << std::setprecision (6) << duration << " ";
217
218         if (z_gdu) 
219             os << *z_gdu;
220         else
221             os << "-";
222         
223         yaz_log(YLOG_LOG, "%s %s", os.str().c_str(), t_info);
224     }
225
226     delete this;
227 }
228
229 mp::IThreadPoolMsg *yf::FrontendNet::ThreadPoolPackage::handle() 
230 {
231     m_package->move(m_assoc_child->m_route);
232     return this;
233 }
234
235 yf::FrontendNet::ZAssocChild::ZAssocChild(
236     yazpp_1::IPDU_Observable *PDU_Observable,
237     mp::ThreadPoolSocketObserver *my_thread_pool,
238     const mp::Package *package,
239     std::string route, Rep *rep)
240     :  Z_Assoc(PDU_Observable), m_p(rep)
241 {
242     m_thread_pool_observer = my_thread_pool;
243     m_no_requests = 0;
244     m_delete_flag = false;
245     m_package = package;
246     m_route = route;
247     const char *peername = PDU_Observable->getpeername();
248     if (!peername)
249         peername = "unknown";
250     m_origin.set_tcpip_address(std::string(peername), m_session.id());
251     timeout(m_p->m_session_timeout);
252 }
253
254 yazpp_1::IPDU_Observer *yf::FrontendNet::ZAssocChild::sessionNotify(
255     yazpp_1::IPDU_Observable *the_PDU_Observable, int fd)
256 {
257     return 0;
258 }
259
260 yf::FrontendNet::ZAssocChild::~ZAssocChild()
261 {
262 }
263
264 void yf::FrontendNet::ZAssocChild::recv_GDU(Z_GDU *z_pdu, int len)
265 {
266     m_no_requests++;
267
268     mp::Package *p = new mp::Package(m_session, m_origin);
269
270     ThreadPoolPackage *tp = new ThreadPoolPackage(p, this, m_p);
271     p->copy_route(*m_package);
272     p->request() = yazpp_1::GDU(z_pdu);
273
274     if (m_p->m_msg_config.length())
275     {
276         if (z_pdu)          
277         {
278             std::ostringstream os;
279             os  << m_p->m_msg_config << " "
280                 << *p << " "
281                 << "0.000000" << " " 
282                 << *z_pdu;
283             yaz_log(YLOG_LOG, "%s", os.str().c_str());
284         }
285     }
286     m_thread_pool_observer->put(tp);  
287 }
288
289 void yf::FrontendNet::ZAssocChild::failNotify()
290 {
291     // TODO: send Package to signal "close"
292     if (m_session.is_closed())
293     {
294         if (m_no_requests == 0)
295             delete this;
296         return;
297     }
298     m_no_requests++;
299
300     m_session.close();
301
302     mp::Package *p = new mp::Package(m_session, m_origin);
303
304     ThreadPoolPackage *tp = new ThreadPoolPackage(p, this, m_p);
305     p->copy_route(*m_package);
306     m_thread_pool_observer->cleanup(tp, &m_session);
307     m_thread_pool_observer->put(tp);
308 }
309
310 void yf::FrontendNet::ZAssocChild::timeoutNotify()
311 {
312     failNotify();
313 }
314
315 void yf::FrontendNet::ZAssocChild::connectNotify()
316 {
317
318 }
319
320 yf::FrontendNet::ZAssocServer::ZAssocServer(
321     yazpp_1::IPDU_Observable *PDU_Observable,
322     std::string route,
323     Rep *rep)
324     : 
325     Z_Assoc(PDU_Observable), m_route(route), m_p(rep)
326 {
327     m_package = 0;
328 }
329
330
331 void yf::FrontendNet::ZAssocServer::set_package(const mp::Package *package)
332 {
333     m_package = package;
334 }
335
336 void yf::FrontendNet::ZAssocServer::set_thread_pool(
337     ThreadPoolSocketObserver *observer)
338 {
339     m_thread_pool_observer = observer;
340 }
341
342 yazpp_1::IPDU_Observer *yf::FrontendNet::ZAssocServer::sessionNotify(
343     yazpp_1::IPDU_Observable *the_PDU_Observable, int fd)
344 {
345
346     const char *peername = the_PDU_Observable->getpeername();
347     if (peername)
348     {
349         limit_connect.add_connect(peername);
350         limit_connect.cleanup(false);
351         int con_sz = limit_connect.get_total(peername);
352         if (m_p->m_connect_max && con_sz > m_p->m_connect_max)
353             return 0;
354     }
355     ZAssocChild *my = new ZAssocChild(the_PDU_Observable,
356                                       m_thread_pool_observer,
357                                       m_package, m_route, m_p);
358     return my;
359 }
360
361 yf::FrontendNet::ZAssocServer::~ZAssocServer()
362 {
363 }
364
365 void yf::FrontendNet::ZAssocServer::recv_GDU(Z_GDU *apdu, int len)
366 {
367 }
368
369 void yf::FrontendNet::ZAssocServer::failNotify()
370 {
371 }
372
373 void yf::FrontendNet::ZAssocServer::timeoutNotify()
374 {
375 }
376
377 void yf::FrontendNet::ZAssocServer::connectNotify()
378 {
379 }
380
381 yf::FrontendNet::FrontendNet() : m_p(new Rep)
382 {
383     m_p->m_no_threads = 5;
384     m_p->m_listen_duration = 0;
385     m_p->m_session_timeout = 300; // 5 minutes
386     m_p->m_connect_max = 0;
387     m_p->az = 0;
388 }
389
390 yf::FrontendNet::~FrontendNet()
391 {
392     if (m_p->az)
393     {
394         size_t i;
395         for (i = 0; i<m_p->m_ports.size(); i++)
396             delete m_p->az[i];
397         delete [] m_p->az;
398     }
399     m_p->az = 0;
400 }
401
402 void yf::FrontendNet::stop() const
403 {
404     if (m_p->az)
405     {
406         size_t i;
407         for (i = 0; i<m_p->m_ports.size(); i++)
408             m_p->az[i]->server("");
409     }
410 }
411
412 bool yf::FrontendNet::My_Timer_Thread::timeout()
413 {
414     return m_timeout;
415 }
416
417 yf::FrontendNet::My_Timer_Thread::My_Timer_Thread(
418     yazpp_1::ISocketObservable *obs,
419     int duration) : 
420     m_obs(obs), m_pipe(9123), m_timeout(false)
421 {
422     obs->addObserver(m_pipe.read_fd(), this);
423     obs->maskObserver(this, yazpp_1::SOCKET_OBSERVE_READ);
424     obs->timeoutObserver(this, duration);
425 }
426
427 void yf::FrontendNet::My_Timer_Thread::socketNotify(int event)
428 {
429     m_timeout = true;
430     m_obs->deleteObserver(this);
431 }
432
433 void yf::FrontendNet::process(Package &package) const
434 {
435     if (m_p->az == 0)
436         return;
437     size_t i;
438     My_Timer_Thread *tt = 0;
439
440     if (m_p->m_listen_duration)
441         tt = new My_Timer_Thread(&m_p->mySocketManager,
442                                  m_p->m_listen_duration);
443     
444     ThreadPoolSocketObserver tp(&m_p->mySocketManager, m_p->m_no_threads);
445
446     for (i = 0; i<m_p->m_ports.size(); i++)
447     {
448         m_p->az[i]->set_package(&package);
449         m_p->az[i]->set_thread_pool(&tp);
450     }
451     while (m_p->mySocketManager.processEvent() > 0)
452     {
453         int no = m_p->mySocketManager.getNumberOfObservers();
454         if (no <= 1)
455             break;
456         if (tt && tt->timeout())
457             break;
458     }
459     delete tt;
460 }
461
462 void yf::FrontendNet::configure(const xmlNode * ptr, bool test_only,
463                                 const char *path)
464 {
465     if (!ptr || !ptr->children)
466     {
467         throw yf::FilterException("No ports for Frontend");
468     }
469     std::vector<Port> ports;
470     for (ptr = ptr->children; ptr; ptr = ptr->next)
471     {
472         if (ptr->type != XML_ELEMENT_NODE)
473             continue;
474         if (!strcmp((const char *) ptr->name, "port"))
475         {
476             Port port;
477             const struct _xmlAttr *attr;
478             for (attr = ptr->properties; attr; attr = attr->next)
479             {
480                 if (!strcmp((const char *) attr->name, "route"))
481                     port.route = mp::xml::get_text(attr);
482             }
483             port.port = mp::xml::get_text(ptr);
484             ports.push_back(port);
485             
486         }
487         else if (!strcmp((const char *) ptr->name, "threads"))
488         {
489             std::string threads_str = mp::xml::get_text(ptr);
490             int threads = atoi(threads_str.c_str());
491             if (threads < 1)
492                 throw yf::FilterException("Bad value for threads: " 
493                                                    + threads_str);
494             m_p->m_no_threads = threads;
495         }
496         else if (!strcmp((const char *) ptr->name, "timeout"))
497         {
498             std::string timeout_str = mp::xml::get_text(ptr);
499             int timeout = atoi(timeout_str.c_str());
500             if (timeout < 1)
501                 throw yf::FilterException("Bad value for timeout: " 
502                                                    + timeout_str);
503             m_p->m_session_timeout = timeout;
504         }
505         else if (!strcmp((const char *) ptr->name, "connect-max"))
506         {
507             m_p->m_connect_max = mp::xml::get_int(ptr, 0);
508         }
509         else if (!strcmp((const char *) ptr->name, "message"))
510         {
511             m_p->m_msg_config = mp::xml::get_text(ptr);
512         }
513         else
514         {
515             throw yf::FilterException("Bad element " 
516                                       + std::string((const char *)
517                                                     ptr->name));
518         }
519     }
520     if (test_only)
521         return;
522     set_ports(ports);
523 }
524
525 void yf::FrontendNet::set_ports(std::vector<std::string> &ports)
526 {
527     std::vector<Port> nports;
528     size_t i;
529
530     for (i = 0; i < ports.size(); i++)
531     {
532         Port nport;
533
534         nport.port = ports[i];
535
536         nports.push_back(nport);
537     }
538     set_ports(nports);
539 }
540
541
542 void yf::FrontendNet::set_ports(std::vector<Port> &ports)
543 {
544     m_p->m_ports = ports;
545     
546     m_p->az = new yf::FrontendNet::ZAssocServer *[m_p->m_ports.size()];
547     
548     // Create yf::FrontendNet::ZAssocServer for each port
549     size_t i;
550     for (i = 0; i<m_p->m_ports.size(); i++)
551     {
552         // create a PDU assoc object (one per yf::FrontendNet::ZAssocServer)
553         yazpp_1::PDU_Assoc *as = new yazpp_1::PDU_Assoc(&m_p->mySocketManager);
554         
555         // create ZAssoc with PDU Assoc
556         m_p->az[i] = new yf::FrontendNet::ZAssocServer(
557             as, m_p->m_ports[i].route, m_p.get());
558         if (m_p->az[i]->server(m_p->m_ports[i].port.c_str()))
559         {
560             throw yf::FilterException("Unable to bind to address " 
561                                       + std::string(m_p->m_ports[i].port));
562         }
563     }
564 }
565
566 void yf::FrontendNet::set_listen_duration(int d)
567 {
568     m_p->m_listen_duration = d;
569 }
570
571 static yf::Base* filter_creator()
572 {
573     return new yf::FrontendNet;
574 }
575
576 extern "C" {
577     struct metaproxy_1_filter_struct metaproxy_1_filter_frontend_net = {
578         0,
579         "frontend_net",
580         filter_creator
581     };
582 }
583
584 /*
585  * Local variables:
586  * c-basic-offset: 4
587  * c-file-style: "Stroustrup"
588  * indent-tabs-mode: nil
589  * End:
590  * vim: shiftwidth=4 tabstop=8 expandtab
591  */
592