RouterFleXML now reads XML simple config and make proper runtime
[metaproxy-moved-to-github.git] / src / filter_frontend_net.cpp
1 /* $Id: filter_frontend_net.cpp,v 1.12 2006-01-09 13:43:59 adam Exp $
2    Copyright (c) 2005, Index Data.
3
4 %LICENSE%
5  */
6
7
8 #include "config.hpp"
9
10 #include "xmlutil.hpp"
11 #include "pipe.hpp"
12 #include "filter.hpp"
13 #include "router.hpp"
14 #include "package.hpp"
15 #include "thread_pool_observer.hpp"
16 #include "filter_frontend_net.hpp"
17 #include <yaz++/z-assoc.h>
18 #include <yaz++/pdu-assoc.h>
19 #include <yaz++/socket-manager.h>
20 #include <yaz/log.h>
21
22 #include <iostream>
23
24 namespace yp2 {
25     class My_Timer_Thread : public yazpp_1::ISocketObserver {
26     private:
27         yazpp_1::ISocketObservable *m_obs;
28         Pipe m_pipe;
29         bool m_timeout;
30     public:
31         My_Timer_Thread(yazpp_1::ISocketObservable *obs, int duration);
32         void socketNotify(int event);
33         bool timeout();
34     };
35     class ZAssocChild : public yazpp_1::Z_Assoc {
36     public:
37         ~ZAssocChild();
38         ZAssocChild(yazpp_1::IPDU_Observable *the_PDU_Observable,
39                           yp2::ThreadPoolSocketObserver *m_thread_pool_observer,
40                           const yp2::Package *package);
41         int m_no_requests;
42     private:
43         yazpp_1::IPDU_Observer* sessionNotify(
44             yazpp_1::IPDU_Observable *the_PDU_Observable,
45             int fd);
46         void recv_GDU(Z_GDU *apdu, int len);
47         
48         void failNotify();
49         void timeoutNotify();
50         void connectNotify();
51     private:
52         yp2::ThreadPoolSocketObserver *m_thread_pool_observer;
53         yp2::Session m_session;
54         yp2::Origin m_origin;
55         bool m_delete_flag;
56         const yp2::Package *m_package;
57     };
58     class ThreadPoolPackage : public yp2::IThreadPoolMsg {
59     public:
60         ThreadPoolPackage(yp2::Package *package, yp2::ZAssocChild *ses) :
61             m_session(ses), m_package(package) { };
62         ~ThreadPoolPackage();
63         IThreadPoolMsg *handle();
64         void result();
65         
66     private:
67         yp2::ZAssocChild *m_session;
68         yp2::Package *m_package;
69         
70     };
71     class ZAssocServer : public yazpp_1::Z_Assoc {
72     public:
73         ~ZAssocServer();
74         ZAssocServer(yazpp_1::IPDU_Observable *PDU_Observable,
75                      yp2::ThreadPoolSocketObserver *m_thread_pool_observer,
76                      const yp2::Package *package);
77     private:
78         yazpp_1::IPDU_Observer* sessionNotify(
79             yazpp_1::IPDU_Observable *the_PDU_Observable,
80             int fd);
81         void recv_GDU(Z_GDU *apdu, int len);
82         
83         void failNotify();
84         void timeoutNotify();
85     void connectNotify();
86     private:
87         yp2::ThreadPoolSocketObserver *m_thread_pool_observer;
88         const yp2::Package *m_package;
89     };
90 }
91
92 yp2::ThreadPoolPackage::~ThreadPoolPackage()
93 {
94     delete m_package;
95 }
96
97 void yp2::ThreadPoolPackage::result()
98 {
99     m_session->m_no_requests--;
100
101     yazpp_1::GDU *gdu = &m_package->response();
102     if (gdu->get())
103     {
104         int len;
105         m_session->send_GDU(gdu->get(), &len);
106     }
107     if (m_session->m_no_requests == 0 && m_package->session().is_closed())
108         delete m_session;
109     delete this;
110 }
111
112 yp2::IThreadPoolMsg *yp2::ThreadPoolPackage::handle() 
113 {
114     m_package->move();
115     return this;
116 }
117
118
119 yp2::ZAssocChild::ZAssocChild(yazpp_1::IPDU_Observable *PDU_Observable,
120                                      yp2::ThreadPoolSocketObserver *my_thread_pool,
121                                      const yp2::Package *package)
122     :  Z_Assoc(PDU_Observable)
123 {
124     m_thread_pool_observer = my_thread_pool;
125     m_no_requests = 0;
126     m_delete_flag = false;
127     m_package = package;
128 }
129
130
131 yazpp_1::IPDU_Observer *yp2::ZAssocChild::sessionNotify(yazpp_1::IPDU_Observable
132                                                   *the_PDU_Observable, int fd)
133 {
134     return 0;
135 }
136
137 yp2::ZAssocChild::~ZAssocChild()
138 {
139 }
140
141 void yp2::ZAssocChild::recv_GDU(Z_GDU *z_pdu, int len)
142 {
143     m_no_requests++;
144
145     yp2::Package *p = new yp2::Package(m_session, m_origin);
146
147     yp2::ThreadPoolPackage *tp = new yp2::ThreadPoolPackage(p, this);
148     p->copy_filter(*m_package);
149     p->request() = yazpp_1::GDU(z_pdu);
150     m_thread_pool_observer->put(tp);  
151 }
152
153 void yp2::ZAssocChild::failNotify()
154 {
155     // TODO: send Package to signal "close"
156     if (m_session.is_closed())
157         return;
158     m_no_requests++;
159
160     m_session.close();
161
162     yp2::Package *p = new yp2::Package(m_session, m_origin);
163
164     yp2::ThreadPoolPackage *tp = new yp2::ThreadPoolPackage(p, this);
165     p->copy_filter(*m_package);
166     m_thread_pool_observer->put(tp);  
167 }
168
169 void yp2::ZAssocChild::timeoutNotify()
170 {
171     failNotify();
172 }
173
174 void yp2::ZAssocChild::connectNotify()
175 {
176
177 }
178
179 yp2::ZAssocServer::ZAssocServer(yazpp_1::IPDU_Observable *PDU_Observable,
180                            yp2::ThreadPoolSocketObserver *thread_pool_observer,
181                            const yp2::Package *package)
182     :  Z_Assoc(PDU_Observable)
183 {
184     m_thread_pool_observer = thread_pool_observer;
185     m_package = package;
186
187 }
188
189 yazpp_1::IPDU_Observer *yp2::ZAssocServer::sessionNotify(yazpp_1::IPDU_Observable
190                                                  *the_PDU_Observable, int fd)
191 {
192     yp2::ZAssocChild *my =
193         new yp2::ZAssocChild(the_PDU_Observable, m_thread_pool_observer,
194                              m_package);
195     return my;
196 }
197
198 yp2::ZAssocServer::~ZAssocServer()
199 {
200 }
201
202 void yp2::ZAssocServer::recv_GDU(Z_GDU *apdu, int len)
203 {
204 }
205
206 void yp2::ZAssocServer::failNotify()
207 {
208 }
209
210 void yp2::ZAssocServer::timeoutNotify()
211 {
212 }
213
214 void yp2::ZAssocServer::connectNotify()
215 {
216 }
217
218 yp2::filter::FrontendNet::FrontendNet()
219 {
220     m_no_threads = 5;
221     m_listen_duration = 0;
222 }
223
224
225 bool yp2::My_Timer_Thread::timeout()
226 {
227     return m_timeout;
228 }
229
230 yp2::My_Timer_Thread::My_Timer_Thread(yazpp_1::ISocketObservable *obs,
231                                  int duration) : 
232     m_obs(obs), m_pipe(9123), m_timeout(false)
233 {
234     obs->addObserver(m_pipe.read_fd(), this);
235     obs->maskObserver(this, yazpp_1::SOCKET_OBSERVE_READ);
236     obs->timeoutObserver(this, duration);
237 }
238
239 void yp2::My_Timer_Thread::socketNotify(int event)
240 {
241     m_timeout = true;
242     m_obs->deleteObserver(this);
243 }
244
245 void yp2::filter::FrontendNet::process(Package &package) const {
246     if (m_ports.size() == 0)
247         return;
248
249     yazpp_1::SocketManager mySocketManager;
250
251     My_Timer_Thread *tt = 0;
252     if (m_listen_duration)
253         tt = new My_Timer_Thread(&mySocketManager, m_listen_duration);
254
255     ThreadPoolSocketObserver threadPool(&mySocketManager, m_no_threads);
256
257     yp2::ZAssocServer **az = new yp2::ZAssocServer *[m_ports.size()];
258
259     // Create yp2::ZAssocServer for each port
260     size_t i;
261     for (i = 0; i<m_ports.size(); i++)
262     {
263         // create a PDU assoc object (one per yp2::ZAssocServer)
264         yazpp_1::PDU_Assoc *as = new yazpp_1::PDU_Assoc(&mySocketManager);
265
266         // create ZAssoc with PDU Assoc
267         az[i] = new yp2::ZAssocServer(as, &threadPool, &package);
268         az[i]->server(m_ports[i].c_str());
269     }
270     while (mySocketManager.processEvent() > 0)
271     {
272         if (tt && tt->timeout())
273             break;
274     }
275     for (i = 0; i<m_ports.size(); i++)
276         delete az[i];
277
278     delete [] az;
279     delete tt;
280 }
281
282 void yp2::filter::FrontendNet::configure(const xmlNode * ptr)
283 {
284     if (!ptr || !ptr->children)
285     {
286         throw yp2::filter::FilterException("No ports for Frontend");
287     }
288     std::vector<std::string> ports;
289     for (ptr = ptr->children; ptr; ptr = ptr->next)
290     {
291         if (ptr->type == XML_ELEMENT_NODE)
292         {
293             if (!strcmp((const char *) ptr->name, "port"))
294             {
295                 std::string port = yp2::xml::get_text(ptr);
296                 ports.push_back(port);
297                 
298             }
299             else
300             {
301                 throw yp2::filter::FilterException("Bad element " 
302                                                    + std::string((const char *)
303                                                                  ptr->name));
304             }
305         }
306     }
307     m_ports = ports;
308 }
309
310 std::vector<std::string> &yp2::filter::FrontendNet::ports()
311 {
312     return m_ports;
313 }
314
315 int &yp2::filter::FrontendNet::listen_duration()
316 {
317     return m_listen_duration;
318 }
319
320 static yp2::filter::Base* filter_creator()
321 {
322     return new yp2::filter::FrontendNet;
323 }
324
325 extern "C" {
326     struct yp2_filter_struct yp2_filter_frontend_net = {
327         0,
328         "frontend_net",
329         filter_creator
330     };
331 }
332
333 /*
334  * Local variables:
335  * c-basic-offset: 4
336  * indent-tabs-mode: nil
337  * c-file-style: "stroustrup"
338  * End:
339  * vim: shiftwidth=4 tabstop=8 expandtab
340  */