Use namespace yp2::filter for filters .. Rename filters from
[metaproxy-moved-to-github.git] / src / filter_frontend_net.cpp
1
2
3 #include "config.hpp"
4
5 #include "filter.hpp"
6 #include "router.hpp"
7 #include "package.hpp"
8 #include "thread_pool_observer.hpp"
9 #include "filter_frontend_net.hpp"
10 #include <yaz++/z-assoc.h>
11 #include <yaz++/pdu-assoc.h>
12 #include <yaz++/socket-manager.h>
13 #include <yaz/log.h>
14
15 #include <iostream>
16
17 class ZAssocServerChild : public yazpp_1::Z_Assoc {
18 public:
19     ~ZAssocServerChild();
20     ZAssocServerChild(yazpp_1::IPDU_Observable *the_PDU_Observable,
21                yp2::ThreadPoolSocketObserver *m_thread_pool_observer,
22                const yp2::Package *package);
23     int m_no_requests;
24 private:
25     yazpp_1::IPDU_Observer* sessionNotify(
26         yazpp_1::IPDU_Observable *the_PDU_Observable,
27         int fd);
28     void recv_GDU(Z_GDU *apdu, int len);
29     
30     void failNotify();
31     void timeoutNotify();
32     void connectNotify();
33 private:
34     yp2::ThreadPoolSocketObserver *m_thread_pool_observer;
35     yp2::Session m_session;
36     yp2::Origin m_origin;
37     bool m_delete_flag;
38     const yp2::Package *m_package;
39 };
40
41
42 class ThreadPoolPackage : public yp2::IThreadPoolMsg {
43 public:
44     ThreadPoolPackage(yp2::Package *package, ZAssocServerChild *ses) :
45         m_session(ses), m_package(package) { };
46     ~ThreadPoolPackage();
47     IThreadPoolMsg *handle();
48     void result();
49     
50 private:
51     ZAssocServerChild *m_session;
52     yp2::Package *m_package;
53     
54 };
55
56 ThreadPoolPackage::~ThreadPoolPackage()
57 {
58     delete m_package;
59 }
60
61 void ThreadPoolPackage::result()
62 {
63     m_session->m_no_requests--;
64
65     yazpp_1::GDU *gdu = &m_package->response();
66     if (gdu->get())
67     {
68         int len;
69         m_session->send_GDU(gdu->get(), &len);
70     }
71     if (m_session->m_no_requests == 0 && m_package->session().is_closed())
72         delete m_session;
73     delete this;
74 }
75
76 yp2::IThreadPoolMsg *ThreadPoolPackage::handle() 
77 {
78     m_package->move();
79     return this;
80 }
81
82
83 ZAssocServerChild::ZAssocServerChild(yazpp_1::IPDU_Observable *PDU_Observable,
84                                      yp2::ThreadPoolSocketObserver *my_thread_pool,
85                                      const yp2::Package *package)
86     :  Z_Assoc(PDU_Observable)
87 {
88     m_thread_pool_observer = my_thread_pool;
89     m_no_requests = 0;
90     m_delete_flag = false;
91     m_package = package;
92 }
93
94
95 yazpp_1::IPDU_Observer *ZAssocServerChild::sessionNotify(yazpp_1::IPDU_Observable
96                                                   *the_PDU_Observable, int fd)
97 {
98     return 0;
99 }
100
101 ZAssocServerChild::~ZAssocServerChild()
102 {
103 }
104
105 void ZAssocServerChild::recv_GDU(Z_GDU *z_pdu, int len)
106 {
107     m_no_requests++;
108
109     yp2::Package *p = new yp2::Package(m_session, m_origin);
110
111     ThreadPoolPackage *tp = new ThreadPoolPackage(p, this);
112     p->copy_filter(*m_package);
113     p->request() = yazpp_1::GDU(z_pdu);
114     m_thread_pool_observer->put(tp);  
115 }
116
117 void ZAssocServerChild::failNotify()
118 {
119     // TODO: send Package to signal "close"
120     if (m_session.is_closed())
121         return;
122     m_no_requests++;
123
124     m_session.close();
125
126     yp2::Package *p = new yp2::Package(m_session, m_origin);
127
128     ThreadPoolPackage *tp = new ThreadPoolPackage(p, this);
129     p->copy_filter(*m_package);
130     m_thread_pool_observer->put(tp);  
131 }
132
133 void ZAssocServerChild::timeoutNotify()
134 {
135     failNotify();
136 }
137
138 void ZAssocServerChild::connectNotify()
139 {
140
141 }
142
143 class ZAssocServer : public yazpp_1::Z_Assoc {
144 public:
145     ~ZAssocServer();
146     ZAssocServer(yazpp_1::IPDU_Observable *PDU_Observable,
147               yp2::ThreadPoolSocketObserver *m_thread_pool_observer,
148               const yp2::Package *package);
149 private:
150     yazpp_1::IPDU_Observer* sessionNotify(
151         yazpp_1::IPDU_Observable *the_PDU_Observable,
152         int fd);
153     void recv_GDU(Z_GDU *apdu, int len);
154     
155     void failNotify();
156     void timeoutNotify();
157     void connectNotify();
158 private:
159     yp2::ThreadPoolSocketObserver *m_thread_pool_observer;
160     const yp2::Package *m_package;
161 };
162
163
164 ZAssocServer::ZAssocServer(yazpp_1::IPDU_Observable *PDU_Observable,
165                            yp2::ThreadPoolSocketObserver *thread_pool_observer,
166                            const yp2::Package *package)
167     :  Z_Assoc(PDU_Observable)
168 {
169     m_thread_pool_observer = thread_pool_observer;
170     m_package = package;
171
172 }
173
174 yazpp_1::IPDU_Observer *ZAssocServer::sessionNotify(yazpp_1::IPDU_Observable
175                                                  *the_PDU_Observable, int fd)
176 {
177     ZAssocServerChild *my =
178         new ZAssocServerChild(the_PDU_Observable, m_thread_pool_observer,
179                               m_package);
180     return my;
181 }
182
183 ZAssocServer::~ZAssocServer()
184 {
185 }
186
187 void ZAssocServer::recv_GDU(Z_GDU *apdu, int len)
188 {
189 }
190
191 void ZAssocServer::failNotify()
192 {
193 }
194
195 void ZAssocServer::timeoutNotify()
196 {
197 }
198
199 void ZAssocServer::connectNotify()
200 {
201 }
202
203 yp2::filter::FrontendNet::FrontendNet()
204 {
205     m_no_threads = 5;
206     m_listen_duration = 0;
207 }
208
209 class My_Timer_Thread : public yazpp_1::ISocketObserver {
210 private:
211     yazpp_1::ISocketObservable *m_obs;
212     int m_fd[2];
213     bool m_timeout;
214 public:
215     My_Timer_Thread(yazpp_1::ISocketObservable *obs, int duration);
216     void socketNotify(int event);
217     bool timeout();
218 };
219
220 bool My_Timer_Thread::timeout()
221 {
222     return m_timeout;
223 }
224
225 My_Timer_Thread::My_Timer_Thread(yazpp_1::ISocketObservable *obs,
226                                  int duration) : 
227     m_obs(obs), m_timeout(false)
228 {
229     pipe(m_fd);
230     obs->addObserver(m_fd[0], this);
231     obs->maskObserver(this, yazpp_1::SOCKET_OBSERVE_READ);
232     obs->timeoutObserver(this, duration);
233 }
234
235 void My_Timer_Thread::socketNotify(int event)
236 {
237     m_timeout = true;
238     m_obs->deleteObserver(this);
239     close(m_fd[0]);
240     close(m_fd[1]);
241 }
242
243 void yp2::filter::FrontendNet::process(Package &package) const {
244     yazpp_1::SocketManager mySocketManager;
245
246     My_Timer_Thread *tt = 0;
247     if (m_listen_duration)
248         tt = new My_Timer_Thread(&mySocketManager, m_listen_duration);
249
250     ThreadPoolSocketObserver threadPool(&mySocketManager, m_no_threads);
251
252     ZAssocServer **az = new ZAssocServer *[m_ports.size()];
253
254     // Create ZAssocServer for each port
255     size_t i;
256     for (i = 0; i<m_ports.size(); i++)
257     {
258         // create a PDU assoc object (one per ZAssocServer)
259         yazpp_1::PDU_Assoc *as = new yazpp_1::PDU_Assoc(&mySocketManager);
260
261         // create ZAssoc with PDU Assoc
262         az[i] = new ZAssocServer(as, &threadPool, &package);
263         az[i]->server(m_ports[i].c_str());
264     }
265     while (mySocketManager.processEvent() > 0)
266     {
267         if (tt && tt->timeout())
268             break;
269     }
270     for (i = 0; i<m_ports.size(); i++)
271         delete az[i];
272
273     delete [] az;
274     delete tt;
275 }
276
277 std::vector<std::string> &yp2::filter::FrontendNet::ports()
278 {
279     return m_ports;
280 }
281
282 int &yp2::filter::FrontendNet::listen_duration()
283 {
284     return m_listen_duration;
285 }
286