Calls to pipe(2) replaced with usage of Pipe class. Now passes
[metaproxy-moved-to-github.git] / src / filter_frontend_net.cpp
1 /* $Id: filter_frontend_net.cpp,v 1.9 2005-11-07 21:57:10 adam Exp $
2    Copyright (c) 2005, Index Data.
3
4 %LICENSE%
5  */
6
7
8 #include "config.hpp"
9
10 #include "pipe.hpp"
11 #include "filter.hpp"
12 #include "router.hpp"
13 #include "package.hpp"
14 #include "thread_pool_observer.hpp"
15 #include "filter_frontend_net.hpp"
16 #include <yaz++/z-assoc.h>
17 #include <yaz++/pdu-assoc.h>
18 #include <yaz++/socket-manager.h>
19 #include <yaz/log.h>
20
21 #include <iostream>
22
23 namespace yp2 {
24     class My_Timer_Thread : public yazpp_1::ISocketObserver {
25     private:
26         yazpp_1::ISocketObservable *m_obs;
27         Pipe m_pipe;
28         bool m_timeout;
29     public:
30         My_Timer_Thread(yazpp_1::ISocketObservable *obs, int duration);
31         void socketNotify(int event);
32         bool timeout();
33     };
34     class ZAssocChild : public yazpp_1::Z_Assoc {
35     public:
36         ~ZAssocChild();
37         ZAssocChild(yazpp_1::IPDU_Observable *the_PDU_Observable,
38                           yp2::ThreadPoolSocketObserver *m_thread_pool_observer,
39                           const yp2::Package *package);
40         int m_no_requests;
41     private:
42         yazpp_1::IPDU_Observer* sessionNotify(
43             yazpp_1::IPDU_Observable *the_PDU_Observable,
44             int fd);
45         void recv_GDU(Z_GDU *apdu, int len);
46         
47         void failNotify();
48         void timeoutNotify();
49         void connectNotify();
50     private:
51         yp2::ThreadPoolSocketObserver *m_thread_pool_observer;
52         yp2::Session m_session;
53         yp2::Origin m_origin;
54         bool m_delete_flag;
55         const yp2::Package *m_package;
56     };
57     class ThreadPoolPackage : public yp2::IThreadPoolMsg {
58     public:
59         ThreadPoolPackage(yp2::Package *package, yp2::ZAssocChild *ses) :
60             m_session(ses), m_package(package) { };
61         ~ThreadPoolPackage();
62         IThreadPoolMsg *handle();
63         void result();
64         
65     private:
66         yp2::ZAssocChild *m_session;
67         yp2::Package *m_package;
68         
69     };
70     class ZAssocServer : public yazpp_1::Z_Assoc {
71     public:
72         ~ZAssocServer();
73         ZAssocServer(yazpp_1::IPDU_Observable *PDU_Observable,
74                      yp2::ThreadPoolSocketObserver *m_thread_pool_observer,
75                      const yp2::Package *package);
76     private:
77         yazpp_1::IPDU_Observer* sessionNotify(
78             yazpp_1::IPDU_Observable *the_PDU_Observable,
79             int fd);
80         void recv_GDU(Z_GDU *apdu, int len);
81         
82         void failNotify();
83         void timeoutNotify();
84     void connectNotify();
85     private:
86         yp2::ThreadPoolSocketObserver *m_thread_pool_observer;
87         const yp2::Package *m_package;
88     };
89 }
90
91 yp2::ThreadPoolPackage::~ThreadPoolPackage()
92 {
93     delete m_package;
94 }
95
96 void yp2::ThreadPoolPackage::result()
97 {
98     m_session->m_no_requests--;
99
100     yazpp_1::GDU *gdu = &m_package->response();
101     if (gdu->get())
102     {
103         int len;
104         m_session->send_GDU(gdu->get(), &len);
105     }
106     if (m_session->m_no_requests == 0 && m_package->session().is_closed())
107         delete m_session;
108     delete this;
109 }
110
111 yp2::IThreadPoolMsg *yp2::ThreadPoolPackage::handle() 
112 {
113     m_package->move();
114     return this;
115 }
116
117
118 yp2::ZAssocChild::ZAssocChild(yazpp_1::IPDU_Observable *PDU_Observable,
119                                      yp2::ThreadPoolSocketObserver *my_thread_pool,
120                                      const yp2::Package *package)
121     :  Z_Assoc(PDU_Observable)
122 {
123     m_thread_pool_observer = my_thread_pool;
124     m_no_requests = 0;
125     m_delete_flag = false;
126     m_package = package;
127 }
128
129
130 yazpp_1::IPDU_Observer *yp2::ZAssocChild::sessionNotify(yazpp_1::IPDU_Observable
131                                                   *the_PDU_Observable, int fd)
132 {
133     return 0;
134 }
135
136 yp2::ZAssocChild::~ZAssocChild()
137 {
138 }
139
140 void yp2::ZAssocChild::recv_GDU(Z_GDU *z_pdu, int len)
141 {
142     m_no_requests++;
143
144     yp2::Package *p = new yp2::Package(m_session, m_origin);
145
146     yp2::ThreadPoolPackage *tp = new yp2::ThreadPoolPackage(p, this);
147     p->copy_filter(*m_package);
148     p->request() = yazpp_1::GDU(z_pdu);
149     m_thread_pool_observer->put(tp);  
150 }
151
152 void yp2::ZAssocChild::failNotify()
153 {
154     // TODO: send Package to signal "close"
155     if (m_session.is_closed())
156         return;
157     m_no_requests++;
158
159     m_session.close();
160
161     yp2::Package *p = new yp2::Package(m_session, m_origin);
162
163     yp2::ThreadPoolPackage *tp = new yp2::ThreadPoolPackage(p, this);
164     p->copy_filter(*m_package);
165     m_thread_pool_observer->put(tp);  
166 }
167
168 void yp2::ZAssocChild::timeoutNotify()
169 {
170     failNotify();
171 }
172
173 void yp2::ZAssocChild::connectNotify()
174 {
175
176 }
177
178 yp2::ZAssocServer::ZAssocServer(yazpp_1::IPDU_Observable *PDU_Observable,
179                            yp2::ThreadPoolSocketObserver *thread_pool_observer,
180                            const yp2::Package *package)
181     :  Z_Assoc(PDU_Observable)
182 {
183     m_thread_pool_observer = thread_pool_observer;
184     m_package = package;
185
186 }
187
188 yazpp_1::IPDU_Observer *yp2::ZAssocServer::sessionNotify(yazpp_1::IPDU_Observable
189                                                  *the_PDU_Observable, int fd)
190 {
191     yp2::ZAssocChild *my =
192         new yp2::ZAssocChild(the_PDU_Observable, m_thread_pool_observer,
193                              m_package);
194     return my;
195 }
196
197 yp2::ZAssocServer::~ZAssocServer()
198 {
199 }
200
201 void yp2::ZAssocServer::recv_GDU(Z_GDU *apdu, int len)
202 {
203 }
204
205 void yp2::ZAssocServer::failNotify()
206 {
207 }
208
209 void yp2::ZAssocServer::timeoutNotify()
210 {
211 }
212
213 void yp2::ZAssocServer::connectNotify()
214 {
215 }
216
217 yp2::filter::FrontendNet::FrontendNet()
218 {
219     m_no_threads = 5;
220     m_listen_duration = 0;
221 }
222
223
224 bool yp2::My_Timer_Thread::timeout()
225 {
226     return m_timeout;
227 }
228
229 yp2::My_Timer_Thread::My_Timer_Thread(yazpp_1::ISocketObservable *obs,
230                                  int duration) : 
231     m_obs(obs), m_pipe(9123), m_timeout(false)
232 {
233     obs->addObserver(m_pipe.read_fd(), this);
234     obs->maskObserver(this, yazpp_1::SOCKET_OBSERVE_READ);
235     obs->timeoutObserver(this, duration);
236 }
237
238 void yp2::My_Timer_Thread::socketNotify(int event)
239 {
240     m_timeout = true;
241     m_obs->deleteObserver(this);
242 }
243
244 void yp2::filter::FrontendNet::process(Package &package) const {
245     yazpp_1::SocketManager mySocketManager;
246
247     My_Timer_Thread *tt = 0;
248     if (m_listen_duration)
249         tt = new My_Timer_Thread(&mySocketManager, m_listen_duration);
250
251     ThreadPoolSocketObserver threadPool(&mySocketManager, m_no_threads);
252
253     yp2::ZAssocServer **az = new yp2::ZAssocServer *[m_ports.size()];
254
255     // Create yp2::ZAssocServer for each port
256     size_t i;
257     for (i = 0; i<m_ports.size(); i++)
258     {
259         // create a PDU assoc object (one per yp2::ZAssocServer)
260         yazpp_1::PDU_Assoc *as = new yazpp_1::PDU_Assoc(&mySocketManager);
261
262         // create ZAssoc with PDU Assoc
263         az[i] = new yp2::ZAssocServer(as, &threadPool, &package);
264         az[i]->server(m_ports[i].c_str());
265     }
266     while (mySocketManager.processEvent() > 0)
267     {
268         if (tt && tt->timeout())
269             break;
270     }
271     for (i = 0; i<m_ports.size(); i++)
272         delete az[i];
273
274     delete [] az;
275     delete tt;
276 }
277
278 std::vector<std::string> &yp2::filter::FrontendNet::ports()
279 {
280     return m_ports;
281 }
282
283 int &yp2::filter::FrontendNet::listen_duration()
284 {
285     return m_listen_duration;
286 }
287
288 /*
289  * Local variables:
290  * c-basic-offset: 4
291  * indent-tabs-mode: nil
292  * c-file-style: "stroustrup"
293  * End:
294  * vim: shiftwidth=4 tabstop=8 expandtab
295  */