Remove incoming requests that can not be handled
[metaproxy-moved-to-github.git] / src / filter_frontend_net.cpp
1 /* This file is part of Metaproxy.
2    Copyright (C) 2005-2012 Index Data
3
4 Metaproxy is free software; you can redistribute it and/or modify it under
5 the terms of the GNU General Public License as published by the Free
6 Software Foundation; either version 2, or (at your option) any later
7 version.
8
9 Metaproxy is distributed in the hope that it will be useful, but WITHOUT ANY
10 WARRANTY; without even the implied warranty of MERCHANTABILITY or
11 FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
12 for more details.
13
14 You should have received a copy of the GNU General Public License
15 along with this program; if not, write to the Free Software
16 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
17 */
18
19 #include "config.hpp"
20
21 #include <sstream>
22 #include <iomanip>
23 #include <metaproxy/util.hpp>
24 #include "pipe.hpp"
25 #include <metaproxy/filter.hpp>
26 #include <metaproxy/package.hpp>
27 #include "thread_pool_observer.hpp"
28 #include "filter_frontend_net.hpp"
29 #include <yazpp/z-assoc.h>
30 #include <yazpp/pdu-assoc.h>
31 #include <yazpp/socket-manager.h>
32 #include <yazpp/limit-connect.h>
33 #include <yaz/timing.h>
34 #include <yaz/log.h>
35
36 #include <iostream>
37
38 namespace mp = metaproxy_1;
39
40 namespace metaproxy_1 {
41     class My_Timer_Thread;
42     class ZAssocServer;
43     namespace filter {
44         class FrontendNet::Port {
45             friend class Rep;
46             friend class FrontendNet;
47             std::string port;
48             std::string route;
49         };
50         class FrontendNet::Rep {
51             friend class FrontendNet;
52             int m_no_threads;
53             std::vector<Port> m_ports;
54             int m_listen_duration;
55             int m_session_timeout;
56             int m_connect_max;
57             std::string m_msg_config;
58             yazpp_1::SocketManager mySocketManager;
59             ZAssocServer **az;
60         };
61     }
62     class My_Timer_Thread : public yazpp_1::ISocketObserver {
63     private:
64         yazpp_1::ISocketObservable *m_obs;
65         Pipe m_pipe;
66         bool m_timeout;
67     public:
68         My_Timer_Thread(yazpp_1::ISocketObservable *obs, int duration);
69         void socketNotify(int event);
70         bool timeout();
71     };
72     class ZAssocChild : public yazpp_1::Z_Assoc {
73     public:
74         ~ZAssocChild();
75         ZAssocChild(yazpp_1::IPDU_Observable *the_PDU_Observable,
76                     mp::ThreadPoolSocketObserver *m_thread_pool_observer,
77                     const mp::Package *package,
78                     std::string route,
79                     const char *msg_config);
80         int m_no_requests;
81         std::string m_route;
82     private:
83         yazpp_1::IPDU_Observer* sessionNotify(
84             yazpp_1::IPDU_Observable *the_PDU_Observable,
85             int fd);
86         void recv_GDU(Z_GDU *apdu, int len);
87         
88         void failNotify();
89         void timeoutNotify();
90         void connectNotify();
91     private:
92         mp::ThreadPoolSocketObserver *m_thread_pool_observer;
93         mp::Session m_session;
94         mp::Origin m_origin;
95         bool m_delete_flag;
96         const mp::Package *m_package;
97         const char *m_msg_config;
98     };
99     class ThreadPoolPackage : public mp::IThreadPoolMsg {
100     public:
101         ThreadPoolPackage(mp::Package *package, mp::ZAssocChild *ses,
102             const char *msg_config);
103         ~ThreadPoolPackage();
104         IThreadPoolMsg *handle();
105         void result(const char *t_info);
106         bool cleanup(void *info);
107     private:
108         yaz_timing_t timer;
109         mp::ZAssocChild *m_assoc_child;
110         mp::Package *m_package;
111         const char *m_msg_config;        
112     };
113     class ZAssocServer : public yazpp_1::Z_Assoc {
114     public:
115         ~ZAssocServer();
116         ZAssocServer(yazpp_1::IPDU_Observable *PDU_Observable, int timeout,
117                      int connect_max, std::string route,
118                      const char *msg_config);
119         void set_package(const mp::Package *package);
120         void set_thread_pool(ThreadPoolSocketObserver *m_thread_pool_observer);
121     private:
122         yazpp_1::IPDU_Observer* sessionNotify(
123             yazpp_1::IPDU_Observable *the_PDU_Observable,
124             int fd);
125         void recv_GDU(Z_GDU *apdu, int len);
126         
127         void failNotify();
128         void timeoutNotify();
129     void connectNotify();
130     private:
131         mp::ThreadPoolSocketObserver *m_thread_pool_observer;
132         const mp::Package *m_package;
133         int m_session_timeout;
134         int m_connect_max;
135         yazpp_1::LimitConnect limit_connect;
136         std::string m_route;
137         const char *m_msg_config;
138     };
139 }
140
141 mp::ThreadPoolPackage::ThreadPoolPackage(mp::Package *package,
142                                          mp::ZAssocChild *ses,
143                                          const char *msg_config) :
144     m_assoc_child(ses), m_package(package), m_msg_config(msg_config)
145 {
146     if (msg_config)
147         timer = yaz_timing_create();
148     else
149         timer = 0;
150 }
151
152 mp::ThreadPoolPackage::~ThreadPoolPackage()
153 {
154     yaz_timing_destroy(&timer); // timer may be NULL
155     delete m_package;
156 }
157
158 bool mp::ThreadPoolPackage::cleanup(void *info)
159 {
160     mp::Session *ses = (mp::Session *) info;
161
162     return *ses == m_package->session();
163 }
164
165 void mp::ThreadPoolPackage::result(const char *t_info)
166 {
167     m_assoc_child->m_no_requests--;
168
169     yazpp_1::GDU *gdu = &m_package->response();
170
171     if (gdu->get())
172     {
173         int len;
174         m_assoc_child->send_GDU(gdu->get(), &len);
175     }
176     else if (!m_package->session().is_closed())
177     {
178         // no response package and yet the session is still open..
179         // means that request is unhandled..
180         yazpp_1::GDU *gdu_req = &m_package->request();
181         Z_GDU *z_gdu = gdu_req->get();
182         if (z_gdu && z_gdu->which == Z_GDU_Z3950)
183         {
184             // For Z39.50, response with a Close and shutdown
185             mp::odr odr;
186             int len;
187             Z_APDU *apdu_response = odr.create_close(
188                 z_gdu->u.z3950, Z_Close_systemProblem, 
189                 "unhandled Z39.50 request");
190
191             m_assoc_child->send_Z_PDU(apdu_response, &len);
192         }
193         else if (z_gdu && z_gdu->which == Z_GDU_HTTP_Request)
194         {
195             // For HTTP, respond with Server Error
196             int len;
197             mp::odr odr;
198             Z_GDU *zgdu_res 
199                 = odr.create_HTTP_Response(m_package->session(), 
200                                            z_gdu->u.HTTP_Request, 500);
201             m_assoc_child->send_GDU(zgdu_res, &len);
202         }
203         m_package->session().close();
204     }
205
206     if (m_assoc_child->m_no_requests == 0 && m_package->session().is_closed())
207     {
208         m_assoc_child->close();
209     }
210
211     if (m_msg_config)
212     {
213         yaz_timing_stop(timer);
214         double duration = yaz_timing_get_real(timer);
215
216         std::ostringstream os;
217         os  << m_msg_config << " "
218             << *m_package << " "
219             << std::fixed << std::setprecision (6) << duration;
220         
221         yaz_log(YLOG_LOG, "%s %s", os.str().c_str(), t_info);
222     }
223
224     delete this;
225 }
226
227 mp::IThreadPoolMsg *mp::ThreadPoolPackage::handle() 
228 {
229     m_package->move(m_assoc_child->m_route);
230     return this;
231 }
232
233
234 mp::ZAssocChild::ZAssocChild(yazpp_1::IPDU_Observable *PDU_Observable,
235                              mp::ThreadPoolSocketObserver *my_thread_pool,
236                              const mp::Package *package,
237                              std::string route,
238                              const char *msg_config)
239     :  Z_Assoc(PDU_Observable), m_msg_config(msg_config)
240 {
241     m_thread_pool_observer = my_thread_pool;
242     m_no_requests = 0;
243     m_delete_flag = false;
244     m_package = package;
245     m_route = route;
246     const char *peername = PDU_Observable->getpeername();
247     if (!peername)
248         peername = "unknown";
249     m_origin.set_tcpip_address(std::string(peername), m_session.id());
250 }
251
252
253 yazpp_1::IPDU_Observer *mp::ZAssocChild::sessionNotify(yazpp_1::IPDU_Observable
254                                                   *the_PDU_Observable, int fd)
255 {
256     return 0;
257 }
258
259 mp::ZAssocChild::~ZAssocChild()
260 {
261 }
262
263 void mp::ZAssocChild::recv_GDU(Z_GDU *z_pdu, int len)
264 {
265     m_no_requests++;
266
267     mp::Package *p = new mp::Package(m_session, m_origin);
268
269     mp::ThreadPoolPackage *tp = new mp::ThreadPoolPackage(p, this,
270                                                           m_msg_config);
271     p->copy_route(*m_package);
272     p->request() = yazpp_1::GDU(z_pdu);
273     m_thread_pool_observer->put(tp);  
274 }
275
276 void mp::ZAssocChild::failNotify()
277 {
278     // TODO: send Package to signal "close"
279     if (m_session.is_closed())
280     {
281         if (m_no_requests == 0)
282             delete this;
283         return;
284     }
285     m_no_requests++;
286
287     m_session.close();
288
289     mp::Package *p = new mp::Package(m_session, m_origin);
290
291     mp::ThreadPoolPackage *tp = new mp::ThreadPoolPackage(p, this,
292                                                           m_msg_config);
293     p->copy_route(*m_package);
294     m_thread_pool_observer->cleanup(tp, &m_session);
295     m_thread_pool_observer->put(tp);
296 }
297
298 void mp::ZAssocChild::timeoutNotify()
299 {
300     failNotify();
301 }
302
303 void mp::ZAssocChild::connectNotify()
304 {
305
306 }
307
308 mp::ZAssocServer::ZAssocServer(yazpp_1::IPDU_Observable *PDU_Observable,
309                                int timeout, int connect_max,
310                                std::string route, const char *msg_config)
311     :  Z_Assoc(PDU_Observable), m_session_timeout(timeout),
312        m_connect_max(connect_max), m_route(route), m_msg_config(msg_config)
313 {
314     m_package = 0;
315 }
316
317
318 void mp::ZAssocServer::set_package(const mp::Package *package)
319 {
320     m_package = package;
321 }
322
323 void mp::ZAssocServer::set_thread_pool(ThreadPoolSocketObserver *observer)
324 {
325     m_thread_pool_observer = observer;
326 }
327
328 yazpp_1::IPDU_Observer *mp::ZAssocServer::sessionNotify(yazpp_1::IPDU_Observable
329                                                  *the_PDU_Observable, int fd)
330 {
331
332     const char *peername = the_PDU_Observable->getpeername();
333     if (peername)
334     {
335         limit_connect.add_connect(peername);
336         limit_connect.cleanup(false);
337         int con_sz = limit_connect.get_total(peername);
338         if (m_connect_max && con_sz > m_connect_max)
339             return 0;
340     }
341     mp::ZAssocChild *my =
342         new mp::ZAssocChild(the_PDU_Observable, m_thread_pool_observer,
343                             m_package, m_route, m_msg_config);
344     my->timeout(m_session_timeout);
345     return my;
346 }
347
348 mp::ZAssocServer::~ZAssocServer()
349 {
350 }
351
352 void mp::ZAssocServer::recv_GDU(Z_GDU *apdu, int len)
353 {
354 }
355
356 void mp::ZAssocServer::failNotify()
357 {
358 }
359
360 void mp::ZAssocServer::timeoutNotify()
361 {
362 }
363
364 void mp::ZAssocServer::connectNotify()
365 {
366 }
367
368 mp::filter::FrontendNet::FrontendNet() : m_p(new Rep)
369 {
370     m_p->m_no_threads = 5;
371     m_p->m_listen_duration = 0;
372     m_p->m_session_timeout = 300; // 5 minutes
373     m_p->m_connect_max = 0;
374     m_p->az = 0;
375 }
376
377 mp::filter::FrontendNet::~FrontendNet()
378 {
379     if (m_p->az)
380     {
381         size_t i;
382         for (i = 0; i<m_p->m_ports.size(); i++)
383             delete m_p->az[i];
384         delete [] m_p->az;
385     }
386 }
387
388 bool mp::My_Timer_Thread::timeout()
389 {
390     return m_timeout;
391 }
392
393 mp::My_Timer_Thread::My_Timer_Thread(yazpp_1::ISocketObservable *obs,
394                                  int duration) : 
395     m_obs(obs), m_pipe(9123), m_timeout(false)
396 {
397     obs->addObserver(m_pipe.read_fd(), this);
398     obs->maskObserver(this, yazpp_1::SOCKET_OBSERVE_READ);
399     obs->timeoutObserver(this, duration);
400 }
401
402 void mp::My_Timer_Thread::socketNotify(int event)
403 {
404     m_timeout = true;
405     m_obs->deleteObserver(this);
406 }
407
408 void mp::filter::FrontendNet::process(Package &package) const
409 {
410     if (m_p->az == 0)
411         return;
412     size_t i;
413     My_Timer_Thread *tt = 0;
414
415     if (m_p->m_listen_duration)
416         tt = new My_Timer_Thread(&m_p->mySocketManager,
417                                  m_p->m_listen_duration);
418     
419     ThreadPoolSocketObserver tp(&m_p->mySocketManager, m_p->m_no_threads);
420
421     for (i = 0; i<m_p->m_ports.size(); i++)
422     {
423         m_p->az[i]->set_package(&package);
424         m_p->az[i]->set_thread_pool(&tp);
425     }
426     while (m_p->mySocketManager.processEvent() > 0)
427     {
428         if (tt && tt->timeout())
429             break;
430     }
431     delete tt;
432 }
433
434 void mp::filter::FrontendNet::configure(const xmlNode * ptr, bool test_only,
435                                         const char *path)
436 {
437     if (!ptr || !ptr->children)
438     {
439         throw mp::filter::FilterException("No ports for Frontend");
440     }
441     std::vector<Port> ports;
442     for (ptr = ptr->children; ptr; ptr = ptr->next)
443     {
444         if (ptr->type != XML_ELEMENT_NODE)
445             continue;
446         if (!strcmp((const char *) ptr->name, "port"))
447         {
448             Port port;
449             const struct _xmlAttr *attr;
450             for (attr = ptr->properties; attr; attr = attr->next)
451             {
452                 if (!strcmp((const char *) attr->name, "route"))
453                     port.route = mp::xml::get_text(attr);
454             }
455             port.port = mp::xml::get_text(ptr);
456             ports.push_back(port);
457             
458         }
459         else if (!strcmp((const char *) ptr->name, "threads"))
460         {
461             std::string threads_str = mp::xml::get_text(ptr);
462             int threads = atoi(threads_str.c_str());
463             if (threads < 1)
464                 throw mp::filter::FilterException("Bad value for threads: " 
465                                                    + threads_str);
466             m_p->m_no_threads = threads;
467         }
468         else if (!strcmp((const char *) ptr->name, "timeout"))
469         {
470             std::string timeout_str = mp::xml::get_text(ptr);
471             int timeout = atoi(timeout_str.c_str());
472             if (timeout < 1)
473                 throw mp::filter::FilterException("Bad value for timeout: " 
474                                                    + timeout_str);
475             m_p->m_session_timeout = timeout;
476         }
477         else if (!strcmp((const char *) ptr->name, "connect-max"))
478         {
479             m_p->m_connect_max = mp::xml::get_int(ptr, 0);
480         }
481         else if (!strcmp((const char *) ptr->name, "message"))
482         {
483             m_p->m_msg_config = mp::xml::get_text(ptr);
484         }
485         else
486         {
487             throw mp::filter::FilterException("Bad element " 
488                                                + std::string((const char *)
489                                                              ptr->name));
490         }
491     }
492     if (test_only)
493         return;
494     set_ports(ports);
495 }
496
497 void mp::filter::FrontendNet::set_ports(std::vector<std::string> &ports)
498 {
499     std::vector<Port> nports;
500     size_t i;
501
502     for (i = 0; i < ports.size(); i++)
503     {
504         Port nport;
505
506         nport.port = ports[i];
507
508         nports.push_back(nport);
509     }
510     set_ports(nports);
511 }
512
513
514 void mp::filter::FrontendNet::set_ports(std::vector<Port> &ports)
515 {
516     m_p->m_ports = ports;
517     
518     m_p->az = new mp::ZAssocServer *[m_p->m_ports.size()];
519     
520     // Create mp::ZAssocServer for each port
521     size_t i;
522     for (i = 0; i<m_p->m_ports.size(); i++)
523     {
524         // create a PDU assoc object (one per mp::ZAssocServer)
525         yazpp_1::PDU_Assoc *as = new yazpp_1::PDU_Assoc(&m_p->mySocketManager);
526         
527         // create ZAssoc with PDU Assoc
528         m_p->az[i] = new mp::ZAssocServer(as, 
529                                           m_p->m_session_timeout,
530                                           m_p->m_connect_max,
531                                           m_p->m_ports[i].route,
532                                           m_p->m_msg_config.length() > 0 ?
533                                           m_p->m_msg_config.c_str() : 0);
534         if (m_p->az[i]->server(m_p->m_ports[i].port.c_str()))
535         {
536             throw mp::filter::FilterException("Unable to bind to address " 
537                                               + std::string(m_p->m_ports[i].port));
538         }
539     }
540 }
541
542 void mp::filter::FrontendNet::set_listen_duration(int d)
543 {
544     m_p->m_listen_duration = d;
545 }
546
547 static mp::filter::Base* filter_creator()
548 {
549     return new mp::filter::FrontendNet;
550 }
551
552 extern "C" {
553     struct metaproxy_1_filter_struct metaproxy_1_filter_frontend_net = {
554         0,
555         "frontend_net",
556         filter_creator
557     };
558 }
559
560 /*
561  * Local variables:
562  * c-basic-offset: 4
563  * c-file-style: "Stroustrup"
564  * indent-tabs-mode: nil
565  * End:
566  * vim: shiftwidth=4 tabstop=8 expandtab
567  */
568