Named routes for filter virt_db. Example in etc/config2.xml
[metaproxy-moved-to-github.git] / src / filter_frontend_net.cpp
1 /* $Id: filter_frontend_net.cpp,v 1.15 2006-01-11 11:51:49 adam Exp $
2    Copyright (c) 2005, Index Data.
3
4 %LICENSE%
5  */
6
7 #include "config.hpp"
8
9 #include "pipe.hpp"
10 #include "filter.hpp"
11 #include "package.hpp"
12 #include "thread_pool_observer.hpp"
13 #include "filter_frontend_net.hpp"
14 #include <yaz++/z-assoc.h>
15 #include <yaz++/pdu-assoc.h>
16 #include <yaz++/socket-manager.h>
17 #include <yaz/log.h>
18
19 #include <iostream>
20
21 namespace yp2 {
22     namespace filter {
23         class FrontendNet::Rep {
24             friend class FrontendNet;
25             int m_no_threads;
26             std::vector<std::string> m_ports;
27             int m_listen_duration;
28         };
29     }
30     class My_Timer_Thread : public yazpp_1::ISocketObserver {
31     private:
32         yazpp_1::ISocketObservable *m_obs;
33         Pipe m_pipe;
34         bool m_timeout;
35     public:
36         My_Timer_Thread(yazpp_1::ISocketObservable *obs, int duration);
37         void socketNotify(int event);
38         bool timeout();
39     };
40     class ZAssocChild : public yazpp_1::Z_Assoc {
41     public:
42         ~ZAssocChild();
43         ZAssocChild(yazpp_1::IPDU_Observable *the_PDU_Observable,
44                           yp2::ThreadPoolSocketObserver *m_thread_pool_observer,
45                           const yp2::Package *package);
46         int m_no_requests;
47     private:
48         yazpp_1::IPDU_Observer* sessionNotify(
49             yazpp_1::IPDU_Observable *the_PDU_Observable,
50             int fd);
51         void recv_GDU(Z_GDU *apdu, int len);
52         
53         void failNotify();
54         void timeoutNotify();
55         void connectNotify();
56     private:
57         yp2::ThreadPoolSocketObserver *m_thread_pool_observer;
58         yp2::Session m_session;
59         yp2::Origin m_origin;
60         bool m_delete_flag;
61         const yp2::Package *m_package;
62     };
63     class ThreadPoolPackage : public yp2::IThreadPoolMsg {
64     public:
65         ThreadPoolPackage(yp2::Package *package, yp2::ZAssocChild *ses) :
66             m_session(ses), m_package(package) { };
67         ~ThreadPoolPackage();
68         IThreadPoolMsg *handle();
69         void result();
70         
71     private:
72         yp2::ZAssocChild *m_session;
73         yp2::Package *m_package;
74         
75     };
76     class ZAssocServer : public yazpp_1::Z_Assoc {
77     public:
78         ~ZAssocServer();
79         ZAssocServer(yazpp_1::IPDU_Observable *PDU_Observable,
80                      yp2::ThreadPoolSocketObserver *m_thread_pool_observer,
81                      const yp2::Package *package);
82     private:
83         yazpp_1::IPDU_Observer* sessionNotify(
84             yazpp_1::IPDU_Observable *the_PDU_Observable,
85             int fd);
86         void recv_GDU(Z_GDU *apdu, int len);
87         
88         void failNotify();
89         void timeoutNotify();
90     void connectNotify();
91     private:
92         yp2::ThreadPoolSocketObserver *m_thread_pool_observer;
93         const yp2::Package *m_package;
94     };
95 }
96
97 yp2::ThreadPoolPackage::~ThreadPoolPackage()
98 {
99     delete m_package;
100 }
101
102 void yp2::ThreadPoolPackage::result()
103 {
104     m_session->m_no_requests--;
105
106     yazpp_1::GDU *gdu = &m_package->response();
107     if (gdu->get())
108     {
109         int len;
110         m_session->send_GDU(gdu->get(), &len);
111     }
112     if (m_session->m_no_requests == 0 && m_package->session().is_closed())
113         delete m_session;
114     delete this;
115 }
116
117 yp2::IThreadPoolMsg *yp2::ThreadPoolPackage::handle() 
118 {
119     m_package->move();
120     return this;
121 }
122
123
124 yp2::ZAssocChild::ZAssocChild(yazpp_1::IPDU_Observable *PDU_Observable,
125                                      yp2::ThreadPoolSocketObserver *my_thread_pool,
126                                      const yp2::Package *package)
127     :  Z_Assoc(PDU_Observable)
128 {
129     m_thread_pool_observer = my_thread_pool;
130     m_no_requests = 0;
131     m_delete_flag = false;
132     m_package = package;
133 }
134
135
136 yazpp_1::IPDU_Observer *yp2::ZAssocChild::sessionNotify(yazpp_1::IPDU_Observable
137                                                   *the_PDU_Observable, int fd)
138 {
139     return 0;
140 }
141
142 yp2::ZAssocChild::~ZAssocChild()
143 {
144 }
145
146 void yp2::ZAssocChild::recv_GDU(Z_GDU *z_pdu, int len)
147 {
148     m_no_requests++;
149
150     yp2::Package *p = new yp2::Package(m_session, m_origin);
151
152     yp2::ThreadPoolPackage *tp = new yp2::ThreadPoolPackage(p, this);
153     p->copy_filter(*m_package);
154     p->request() = yazpp_1::GDU(z_pdu);
155     m_thread_pool_observer->put(tp);  
156 }
157
158 void yp2::ZAssocChild::failNotify()
159 {
160     // TODO: send Package to signal "close"
161     if (m_session.is_closed())
162         return;
163     m_no_requests++;
164
165     m_session.close();
166
167     yp2::Package *p = new yp2::Package(m_session, m_origin);
168
169     yp2::ThreadPoolPackage *tp = new yp2::ThreadPoolPackage(p, this);
170     p->copy_filter(*m_package);
171     m_thread_pool_observer->put(tp);  
172 }
173
174 void yp2::ZAssocChild::timeoutNotify()
175 {
176     failNotify();
177 }
178
179 void yp2::ZAssocChild::connectNotify()
180 {
181
182 }
183
184 yp2::ZAssocServer::ZAssocServer(yazpp_1::IPDU_Observable *PDU_Observable,
185                            yp2::ThreadPoolSocketObserver *thread_pool_observer,
186                            const yp2::Package *package)
187     :  Z_Assoc(PDU_Observable)
188 {
189     m_thread_pool_observer = thread_pool_observer;
190     m_package = package;
191
192 }
193
194 yazpp_1::IPDU_Observer *yp2::ZAssocServer::sessionNotify(yazpp_1::IPDU_Observable
195                                                  *the_PDU_Observable, int fd)
196 {
197     yp2::ZAssocChild *my =
198         new yp2::ZAssocChild(the_PDU_Observable, m_thread_pool_observer,
199                              m_package);
200     return my;
201 }
202
203 yp2::ZAssocServer::~ZAssocServer()
204 {
205 }
206
207 void yp2::ZAssocServer::recv_GDU(Z_GDU *apdu, int len)
208 {
209 }
210
211 void yp2::ZAssocServer::failNotify()
212 {
213 }
214
215 void yp2::ZAssocServer::timeoutNotify()
216 {
217 }
218
219 void yp2::ZAssocServer::connectNotify()
220 {
221 }
222
223 yp2::filter::FrontendNet::FrontendNet() : m_p(new Rep)
224 {
225     m_p->m_no_threads = 5;
226     m_p->m_listen_duration = 0;
227 }
228
229 yp2::filter::FrontendNet::~FrontendNet()
230 {
231 }
232
233 bool yp2::My_Timer_Thread::timeout()
234 {
235     return m_timeout;
236 }
237
238 yp2::My_Timer_Thread::My_Timer_Thread(yazpp_1::ISocketObservable *obs,
239                                  int duration) : 
240     m_obs(obs), m_pipe(9123), m_timeout(false)
241 {
242     obs->addObserver(m_pipe.read_fd(), this);
243     obs->maskObserver(this, yazpp_1::SOCKET_OBSERVE_READ);
244     obs->timeoutObserver(this, duration);
245 }
246
247 void yp2::My_Timer_Thread::socketNotify(int event)
248 {
249     m_timeout = true;
250     m_obs->deleteObserver(this);
251 }
252
253 void yp2::filter::FrontendNet::process(Package &package) const
254 {
255     if (m_p->m_ports.size() == 0)
256         return;
257
258     yazpp_1::SocketManager mySocketManager;
259
260     My_Timer_Thread *tt = 0;
261     if (m_p->m_listen_duration)
262         tt = new My_Timer_Thread(&mySocketManager, m_p->m_listen_duration);
263
264     ThreadPoolSocketObserver threadPool(&mySocketManager, m_p->m_no_threads);
265
266     yp2::ZAssocServer **az = new yp2::ZAssocServer *[m_p->m_ports.size()];
267
268     // Create yp2::ZAssocServer for each port
269     size_t i;
270     for (i = 0; i<m_p->m_ports.size(); i++)
271     {
272         // create a PDU assoc object (one per yp2::ZAssocServer)
273         yazpp_1::PDU_Assoc *as = new yazpp_1::PDU_Assoc(&mySocketManager);
274
275         // create ZAssoc with PDU Assoc
276         az[i] = new yp2::ZAssocServer(as, &threadPool, &package);
277         az[i]->server(m_p->m_ports[i].c_str());
278     }
279     while (mySocketManager.processEvent() > 0)
280     {
281         if (tt && tt->timeout())
282             break;
283     }
284     for (i = 0; i<m_p->m_ports.size(); i++)
285         delete az[i];
286
287     delete [] az;
288     delete tt;
289 }
290
291 void yp2::filter::FrontendNet::configure(const xmlNode * ptr)
292 {
293     if (!ptr || !ptr->children)
294     {
295         throw yp2::filter::FilterException("No ports for Frontend");
296     }
297     std::vector<std::string> ports;
298     for (ptr = ptr->children; ptr; ptr = ptr->next)
299     {
300         if (ptr->type != XML_ELEMENT_NODE)
301             continue;
302         if (!strcmp((const char *) ptr->name, "port"))
303         {
304             std::string port = yp2::xml::get_text(ptr);
305             ports.push_back(port);
306             
307         }
308         else if (!strcmp((const char *) ptr->name, "threads"))
309         {
310             std::string threads_str = yp2::xml::get_text(ptr);
311             int threads = atoi(threads_str.c_str());
312             if (threads < 1)
313                 throw yp2::filter::FilterException("Bad value for threads: " 
314                                                    + threads_str);
315             m_p->m_no_threads = threads;
316         }
317         else
318         {
319             throw yp2::filter::FilterException("Bad element " 
320                                                + std::string((const char *)
321                                                              ptr->name));
322         }
323     }
324     m_p->m_ports = ports;
325 }
326
327 std::vector<std::string> &yp2::filter::FrontendNet::ports()
328 {
329     return m_p->m_ports;
330 }
331
332 int &yp2::filter::FrontendNet::listen_duration()
333 {
334     return m_p->m_listen_duration;
335 }
336
337 static yp2::filter::Base* filter_creator()
338 {
339     return new yp2::filter::FrontendNet;
340 }
341
342 extern "C" {
343     struct yp2_filter_struct yp2_filter_frontend_net = {
344         0,
345         "frontend_net",
346         filter_creator
347     };
348 }
349
350 /*
351  * Local variables:
352  * c-basic-offset: 4
353  * indent-tabs-mode: nil
354  * c-file-style: "stroustrup"
355  * End:
356  * vim: shiftwidth=4 tabstop=8 expandtab
357  */