Same header and footer for all files. Header includes copyright +
[metaproxy-moved-to-github.git] / src / filter_frontend_net.cpp
1 /* $Id: filter_frontend_net.cpp,v 1.7 2005-10-15 14:09:09 adam Exp $
2    Copyright (c) 2005, Index Data.
3
4 %LICENSE%
5  */
6
7
8 #include "config.hpp"
9
10 #include "filter.hpp"
11 #include "router.hpp"
12 #include "package.hpp"
13 #include "thread_pool_observer.hpp"
14 #include "filter_frontend_net.hpp"
15 #include <yaz++/z-assoc.h>
16 #include <yaz++/pdu-assoc.h>
17 #include <yaz++/socket-manager.h>
18 #include <yaz/log.h>
19
20 #include <iostream>
21
22 class ZAssocServerChild : public yazpp_1::Z_Assoc {
23 public:
24     ~ZAssocServerChild();
25     ZAssocServerChild(yazpp_1::IPDU_Observable *the_PDU_Observable,
26                yp2::ThreadPoolSocketObserver *m_thread_pool_observer,
27                const yp2::Package *package);
28     int m_no_requests;
29 private:
30     yazpp_1::IPDU_Observer* sessionNotify(
31         yazpp_1::IPDU_Observable *the_PDU_Observable,
32         int fd);
33     void recv_GDU(Z_GDU *apdu, int len);
34     
35     void failNotify();
36     void timeoutNotify();
37     void connectNotify();
38 private:
39     yp2::ThreadPoolSocketObserver *m_thread_pool_observer;
40     yp2::Session m_session;
41     yp2::Origin m_origin;
42     bool m_delete_flag;
43     const yp2::Package *m_package;
44 };
45
46
47 class ThreadPoolPackage : public yp2::IThreadPoolMsg {
48 public:
49     ThreadPoolPackage(yp2::Package *package, ZAssocServerChild *ses) :
50         m_session(ses), m_package(package) { };
51     ~ThreadPoolPackage();
52     IThreadPoolMsg *handle();
53     void result();
54     
55 private:
56     ZAssocServerChild *m_session;
57     yp2::Package *m_package;
58     
59 };
60
61 ThreadPoolPackage::~ThreadPoolPackage()
62 {
63     delete m_package;
64 }
65
66 void ThreadPoolPackage::result()
67 {
68     m_session->m_no_requests--;
69
70     yazpp_1::GDU *gdu = &m_package->response();
71     if (gdu->get())
72     {
73         int len;
74         m_session->send_GDU(gdu->get(), &len);
75     }
76     if (m_session->m_no_requests == 0 && m_package->session().is_closed())
77         delete m_session;
78     delete this;
79 }
80
81 yp2::IThreadPoolMsg *ThreadPoolPackage::handle() 
82 {
83     m_package->move();
84     return this;
85 }
86
87
88 ZAssocServerChild::ZAssocServerChild(yazpp_1::IPDU_Observable *PDU_Observable,
89                                      yp2::ThreadPoolSocketObserver *my_thread_pool,
90                                      const yp2::Package *package)
91     :  Z_Assoc(PDU_Observable)
92 {
93     m_thread_pool_observer = my_thread_pool;
94     m_no_requests = 0;
95     m_delete_flag = false;
96     m_package = package;
97 }
98
99
100 yazpp_1::IPDU_Observer *ZAssocServerChild::sessionNotify(yazpp_1::IPDU_Observable
101                                                   *the_PDU_Observable, int fd)
102 {
103     return 0;
104 }
105
106 ZAssocServerChild::~ZAssocServerChild()
107 {
108 }
109
110 void ZAssocServerChild::recv_GDU(Z_GDU *z_pdu, int len)
111 {
112     m_no_requests++;
113
114     yp2::Package *p = new yp2::Package(m_session, m_origin);
115
116     ThreadPoolPackage *tp = new ThreadPoolPackage(p, this);
117     p->copy_filter(*m_package);
118     p->request() = yazpp_1::GDU(z_pdu);
119     m_thread_pool_observer->put(tp);  
120 }
121
122 void ZAssocServerChild::failNotify()
123 {
124     // TODO: send Package to signal "close"
125     if (m_session.is_closed())
126         return;
127     m_no_requests++;
128
129     m_session.close();
130
131     yp2::Package *p = new yp2::Package(m_session, m_origin);
132
133     ThreadPoolPackage *tp = new ThreadPoolPackage(p, this);
134     p->copy_filter(*m_package);
135     m_thread_pool_observer->put(tp);  
136 }
137
138 void ZAssocServerChild::timeoutNotify()
139 {
140     failNotify();
141 }
142
143 void ZAssocServerChild::connectNotify()
144 {
145
146 }
147
148 class ZAssocServer : public yazpp_1::Z_Assoc {
149 public:
150     ~ZAssocServer();
151     ZAssocServer(yazpp_1::IPDU_Observable *PDU_Observable,
152               yp2::ThreadPoolSocketObserver *m_thread_pool_observer,
153               const yp2::Package *package);
154 private:
155     yazpp_1::IPDU_Observer* sessionNotify(
156         yazpp_1::IPDU_Observable *the_PDU_Observable,
157         int fd);
158     void recv_GDU(Z_GDU *apdu, int len);
159     
160     void failNotify();
161     void timeoutNotify();
162     void connectNotify();
163 private:
164     yp2::ThreadPoolSocketObserver *m_thread_pool_observer;
165     const yp2::Package *m_package;
166 };
167
168
169 ZAssocServer::ZAssocServer(yazpp_1::IPDU_Observable *PDU_Observable,
170                            yp2::ThreadPoolSocketObserver *thread_pool_observer,
171                            const yp2::Package *package)
172     :  Z_Assoc(PDU_Observable)
173 {
174     m_thread_pool_observer = thread_pool_observer;
175     m_package = package;
176
177 }
178
179 yazpp_1::IPDU_Observer *ZAssocServer::sessionNotify(yazpp_1::IPDU_Observable
180                                                  *the_PDU_Observable, int fd)
181 {
182     ZAssocServerChild *my =
183         new ZAssocServerChild(the_PDU_Observable, m_thread_pool_observer,
184                               m_package);
185     return my;
186 }
187
188 ZAssocServer::~ZAssocServer()
189 {
190 }
191
192 void ZAssocServer::recv_GDU(Z_GDU *apdu, int len)
193 {
194 }
195
196 void ZAssocServer::failNotify()
197 {
198 }
199
200 void ZAssocServer::timeoutNotify()
201 {
202 }
203
204 void ZAssocServer::connectNotify()
205 {
206 }
207
208 yp2::filter::FrontendNet::FrontendNet()
209 {
210     m_no_threads = 5;
211     m_listen_duration = 0;
212 }
213
214 class My_Timer_Thread : public yazpp_1::ISocketObserver {
215 private:
216     yazpp_1::ISocketObservable *m_obs;
217     int m_fd[2];
218     bool m_timeout;
219 public:
220     My_Timer_Thread(yazpp_1::ISocketObservable *obs, int duration);
221     void socketNotify(int event);
222     bool timeout();
223 };
224
225 bool My_Timer_Thread::timeout()
226 {
227     return m_timeout;
228 }
229
230 My_Timer_Thread::My_Timer_Thread(yazpp_1::ISocketObservable *obs,
231                                  int duration) : 
232     m_obs(obs), m_timeout(false)
233 {
234     pipe(m_fd);
235     obs->addObserver(m_fd[0], this);
236     obs->maskObserver(this, yazpp_1::SOCKET_OBSERVE_READ);
237     obs->timeoutObserver(this, duration);
238 }
239
240 void My_Timer_Thread::socketNotify(int event)
241 {
242     m_timeout = true;
243     m_obs->deleteObserver(this);
244     close(m_fd[0]);
245     close(m_fd[1]);
246 }
247
248 void yp2::filter::FrontendNet::process(Package &package) const {
249     yazpp_1::SocketManager mySocketManager;
250
251     My_Timer_Thread *tt = 0;
252     if (m_listen_duration)
253         tt = new My_Timer_Thread(&mySocketManager, m_listen_duration);
254
255     ThreadPoolSocketObserver threadPool(&mySocketManager, m_no_threads);
256
257     ZAssocServer **az = new ZAssocServer *[m_ports.size()];
258
259     // Create ZAssocServer for each port
260     size_t i;
261     for (i = 0; i<m_ports.size(); i++)
262     {
263         // create a PDU assoc object (one per ZAssocServer)
264         yazpp_1::PDU_Assoc *as = new yazpp_1::PDU_Assoc(&mySocketManager);
265
266         // create ZAssoc with PDU Assoc
267         az[i] = new ZAssocServer(as, &threadPool, &package);
268         az[i]->server(m_ports[i].c_str());
269     }
270     while (mySocketManager.processEvent() > 0)
271     {
272         if (tt && tt->timeout())
273             break;
274     }
275     for (i = 0; i<m_ports.size(); i++)
276         delete az[i];
277
278     delete [] az;
279     delete tt;
280 }
281
282 std::vector<std::string> &yp2::filter::FrontendNet::ports()
283 {
284     return m_ports;
285 }
286
287 int &yp2::filter::FrontendNet::listen_duration()
288 {
289     return m_listen_duration;
290 }
291
292 /*
293  * Local variables:
294  * c-basic-offset: 4
295  * indent-tabs-mode: nil
296  * c-file-style: "stroustrup"
297  * End:
298  * vim: shiftwidth=4 tabstop=8 expandtab
299  */