Make a few internal classes part of yp2 ns
[metaproxy-moved-to-github.git] / src / filter_frontend_net.cpp
1 /* $Id: filter_frontend_net.cpp,v 1.8 2005-11-07 12:31:43 adam Exp $
2    Copyright (c) 2005, Index Data.
3
4 %LICENSE%
5  */
6
7
8 #include "config.hpp"
9
10 #include "filter.hpp"
11 #include "router.hpp"
12 #include "package.hpp"
13 #include "thread_pool_observer.hpp"
14 #include "filter_frontend_net.hpp"
15 #include <yaz++/z-assoc.h>
16 #include <yaz++/pdu-assoc.h>
17 #include <yaz++/socket-manager.h>
18 #include <yaz/log.h>
19
20 #include <iostream>
21
22 namespace yp2 {
23     class My_Timer_Thread : public yazpp_1::ISocketObserver {
24     private:
25         yazpp_1::ISocketObservable *m_obs;
26         int m_fd[2];
27         bool m_timeout;
28     public:
29         My_Timer_Thread(yazpp_1::ISocketObservable *obs, int duration);
30         void socketNotify(int event);
31         bool timeout();
32     };
33     class ZAssocChild : public yazpp_1::Z_Assoc {
34     public:
35         ~ZAssocChild();
36         ZAssocChild(yazpp_1::IPDU_Observable *the_PDU_Observable,
37                           yp2::ThreadPoolSocketObserver *m_thread_pool_observer,
38                           const yp2::Package *package);
39         int m_no_requests;
40     private:
41         yazpp_1::IPDU_Observer* sessionNotify(
42             yazpp_1::IPDU_Observable *the_PDU_Observable,
43             int fd);
44         void recv_GDU(Z_GDU *apdu, int len);
45         
46         void failNotify();
47         void timeoutNotify();
48         void connectNotify();
49     private:
50         yp2::ThreadPoolSocketObserver *m_thread_pool_observer;
51         yp2::Session m_session;
52         yp2::Origin m_origin;
53         bool m_delete_flag;
54         const yp2::Package *m_package;
55     };
56     class ThreadPoolPackage : public yp2::IThreadPoolMsg {
57     public:
58         ThreadPoolPackage(yp2::Package *package, yp2::ZAssocChild *ses) :
59             m_session(ses), m_package(package) { };
60         ~ThreadPoolPackage();
61         IThreadPoolMsg *handle();
62         void result();
63         
64     private:
65         yp2::ZAssocChild *m_session;
66         yp2::Package *m_package;
67         
68     };
69     class ZAssocServer : public yazpp_1::Z_Assoc {
70     public:
71         ~ZAssocServer();
72         ZAssocServer(yazpp_1::IPDU_Observable *PDU_Observable,
73                      yp2::ThreadPoolSocketObserver *m_thread_pool_observer,
74                      const yp2::Package *package);
75     private:
76         yazpp_1::IPDU_Observer* sessionNotify(
77             yazpp_1::IPDU_Observable *the_PDU_Observable,
78             int fd);
79         void recv_GDU(Z_GDU *apdu, int len);
80         
81         void failNotify();
82         void timeoutNotify();
83     void connectNotify();
84     private:
85         yp2::ThreadPoolSocketObserver *m_thread_pool_observer;
86         const yp2::Package *m_package;
87     };
88 }
89
90 yp2::ThreadPoolPackage::~ThreadPoolPackage()
91 {
92     delete m_package;
93 }
94
95 void yp2::ThreadPoolPackage::result()
96 {
97     m_session->m_no_requests--;
98
99     yazpp_1::GDU *gdu = &m_package->response();
100     if (gdu->get())
101     {
102         int len;
103         m_session->send_GDU(gdu->get(), &len);
104     }
105     if (m_session->m_no_requests == 0 && m_package->session().is_closed())
106         delete m_session;
107     delete this;
108 }
109
110 yp2::IThreadPoolMsg *yp2::ThreadPoolPackage::handle() 
111 {
112     m_package->move();
113     return this;
114 }
115
116
117 yp2::ZAssocChild::ZAssocChild(yazpp_1::IPDU_Observable *PDU_Observable,
118                                      yp2::ThreadPoolSocketObserver *my_thread_pool,
119                                      const yp2::Package *package)
120     :  Z_Assoc(PDU_Observable)
121 {
122     m_thread_pool_observer = my_thread_pool;
123     m_no_requests = 0;
124     m_delete_flag = false;
125     m_package = package;
126 }
127
128
129 yazpp_1::IPDU_Observer *yp2::ZAssocChild::sessionNotify(yazpp_1::IPDU_Observable
130                                                   *the_PDU_Observable, int fd)
131 {
132     return 0;
133 }
134
135 yp2::ZAssocChild::~ZAssocChild()
136 {
137 }
138
139 void yp2::ZAssocChild::recv_GDU(Z_GDU *z_pdu, int len)
140 {
141     m_no_requests++;
142
143     yp2::Package *p = new yp2::Package(m_session, m_origin);
144
145     yp2::ThreadPoolPackage *tp = new yp2::ThreadPoolPackage(p, this);
146     p->copy_filter(*m_package);
147     p->request() = yazpp_1::GDU(z_pdu);
148     m_thread_pool_observer->put(tp);  
149 }
150
151 void yp2::ZAssocChild::failNotify()
152 {
153     // TODO: send Package to signal "close"
154     if (m_session.is_closed())
155         return;
156     m_no_requests++;
157
158     m_session.close();
159
160     yp2::Package *p = new yp2::Package(m_session, m_origin);
161
162     yp2::ThreadPoolPackage *tp = new yp2::ThreadPoolPackage(p, this);
163     p->copy_filter(*m_package);
164     m_thread_pool_observer->put(tp);  
165 }
166
167 void yp2::ZAssocChild::timeoutNotify()
168 {
169     failNotify();
170 }
171
172 void yp2::ZAssocChild::connectNotify()
173 {
174
175 }
176
177 yp2::ZAssocServer::ZAssocServer(yazpp_1::IPDU_Observable *PDU_Observable,
178                            yp2::ThreadPoolSocketObserver *thread_pool_observer,
179                            const yp2::Package *package)
180     :  Z_Assoc(PDU_Observable)
181 {
182     m_thread_pool_observer = thread_pool_observer;
183     m_package = package;
184
185 }
186
187 yazpp_1::IPDU_Observer *yp2::ZAssocServer::sessionNotify(yazpp_1::IPDU_Observable
188                                                  *the_PDU_Observable, int fd)
189 {
190     yp2::ZAssocChild *my =
191         new yp2::ZAssocChild(the_PDU_Observable, m_thread_pool_observer,
192                              m_package);
193     return my;
194 }
195
196 yp2::ZAssocServer::~ZAssocServer()
197 {
198 }
199
200 void yp2::ZAssocServer::recv_GDU(Z_GDU *apdu, int len)
201 {
202 }
203
204 void yp2::ZAssocServer::failNotify()
205 {
206 }
207
208 void yp2::ZAssocServer::timeoutNotify()
209 {
210 }
211
212 void yp2::ZAssocServer::connectNotify()
213 {
214 }
215
216 yp2::filter::FrontendNet::FrontendNet()
217 {
218     m_no_threads = 5;
219     m_listen_duration = 0;
220 }
221
222
223 bool yp2::My_Timer_Thread::timeout()
224 {
225     return m_timeout;
226 }
227
228 yp2::My_Timer_Thread::My_Timer_Thread(yazpp_1::ISocketObservable *obs,
229                                  int duration) : 
230     m_obs(obs), m_timeout(false)
231 {
232     pipe(m_fd);
233     obs->addObserver(m_fd[0], this);
234     obs->maskObserver(this, yazpp_1::SOCKET_OBSERVE_READ);
235     obs->timeoutObserver(this, duration);
236 }
237
238 void yp2::My_Timer_Thread::socketNotify(int event)
239 {
240     m_timeout = true;
241     m_obs->deleteObserver(this);
242     close(m_fd[0]);
243     close(m_fd[1]);
244 }
245
246 void yp2::filter::FrontendNet::process(Package &package) const {
247     yazpp_1::SocketManager mySocketManager;
248
249     My_Timer_Thread *tt = 0;
250     if (m_listen_duration)
251         tt = new My_Timer_Thread(&mySocketManager, m_listen_duration);
252
253     ThreadPoolSocketObserver threadPool(&mySocketManager, m_no_threads);
254
255     yp2::ZAssocServer **az = new yp2::ZAssocServer *[m_ports.size()];
256
257     // Create yp2::ZAssocServer for each port
258     size_t i;
259     for (i = 0; i<m_ports.size(); i++)
260     {
261         // create a PDU assoc object (one per yp2::ZAssocServer)
262         yazpp_1::PDU_Assoc *as = new yazpp_1::PDU_Assoc(&mySocketManager);
263
264         // create ZAssoc with PDU Assoc
265         az[i] = new yp2::ZAssocServer(as, &threadPool, &package);
266         az[i]->server(m_ports[i].c_str());
267     }
268     while (mySocketManager.processEvent() > 0)
269     {
270         if (tt && tt->timeout())
271             break;
272     }
273     for (i = 0; i<m_ports.size(); i++)
274         delete az[i];
275
276     delete [] az;
277     delete tt;
278 }
279
280 std::vector<std::string> &yp2::filter::FrontendNet::ports()
281 {
282     return m_ports;
283 }
284
285 int &yp2::filter::FrontendNet::listen_duration()
286 {
287     return m_listen_duration;
288 }
289
290 /*
291  * Local variables:
292  * c-basic-offset: 4
293  * indent-tabs-mode: nil
294  * c-file-style: "stroustrup"
295  * End:
296  * vim: shiftwidth=4 tabstop=8 expandtab
297  */