Implemented FilterFrontendNet which is a network server based on
[metaproxy-moved-to-github.git] / src / filter_frontend_net.cpp
1
2
3 #include "config.hpp"
4
5 #include "filter.hpp"
6 #include "router.hpp"
7 #include "package.hpp"
8 #include "thread_pool_observer.hpp"
9 #include "filter_frontend_net.hpp"
10 #include <yaz++/z-assoc.h>
11 #include <yaz++/pdu-assoc.h>
12 #include <yaz++/socket-manager.h>
13 #include <yaz/log.h>
14
15 #include <iostream>
16
17 using namespace yp2;
18
19 class P2_Session : public yazpp_1::Z_Assoc {
20 public:
21     ~P2_Session();
22     P2_Session(yazpp_1::IPDU_Observable *the_PDU_Observable,
23                ThreadPoolSocketObserver *m_my_thread,
24                const Package *package);
25     int m_no_requests;
26 private:
27     yazpp_1::IPDU_Observer* sessionNotify(
28         yazpp_1::IPDU_Observable *the_PDU_Observable,
29         int fd);
30     void recv_GDU(Z_GDU *apdu, int len);
31     
32     void failNotify();
33     void timeoutNotify();
34     void connectNotify();
35 private:
36     ThreadPoolSocketObserver *m_my_thread;
37     Session m_session;
38     Origin m_origin;
39     bool m_delete_flag;
40     const Package *m_package;
41 };
42
43
44 class ThreadPoolPackage : public IThreadPoolMsg {
45 public:
46     ThreadPoolPackage(Package *package, P2_Session *ses) :
47         m_session(ses), m_package(package) { };
48     ~ThreadPoolPackage();
49     IThreadPoolMsg *handle();
50     void result();
51     
52 private:
53     P2_Session *m_session;
54     Package *m_package;
55     
56 };
57
58 ThreadPoolPackage::~ThreadPoolPackage()
59 {
60     delete m_package;
61 }
62
63 void ThreadPoolPackage::result()
64 {
65     m_session->m_no_requests--;
66
67     yazpp_1::GDU *gdu = &m_package->response();
68     if (gdu->get())
69     {
70         int len;
71         m_session->send_GDU(gdu->get(), &len);
72     }
73 }
74
75 IThreadPoolMsg *ThreadPoolPackage::handle() 
76 {
77     m_package->move();
78     return this;
79 }
80
81
82 P2_Session::P2_Session(yazpp_1::IPDU_Observable *the_PDU_Observable,
83                        ThreadPoolSocketObserver *my_thread_pool,
84                        const Package *package)
85     :  Z_Assoc(the_PDU_Observable)
86 {
87     m_my_thread = my_thread_pool;
88     m_no_requests = 0;
89     m_delete_flag = false;
90     m_package = package;
91 }
92
93
94 yazpp_1::IPDU_Observer *P2_Session::sessionNotify(yazpp_1::IPDU_Observable
95                                                   *the_PDU_Observable, int fd)
96 {
97     return 0;
98 }
99
100 P2_Session::~P2_Session()
101 {
102 }
103
104 void P2_Session::recv_GDU(Z_GDU *z_pdu, int len)
105 {
106     m_no_requests++;
107
108     Package *p = new Package(m_session, m_origin);
109
110     ThreadPoolPackage *m = new ThreadPoolPackage(p, this);
111     p->copy_filter(*m_package);
112     p->request() = yazpp_1::GDU(z_pdu);
113     m_my_thread->put(m);  
114 }
115
116 void P2_Session::failNotify()
117 {
118     // TODO: send Package to signal "close"
119     m_delete_flag = true;
120     if (m_no_requests == 0)
121         delete this;
122     
123 }
124
125 void P2_Session::timeoutNotify()
126 {
127     // TODO: send Package to signal "close"
128     m_delete_flag = true;
129     if (m_no_requests == 0)
130         delete this;
131 }
132
133 void P2_Session::connectNotify()
134 {
135
136 }
137
138 class P2_Server : public yazpp_1::Z_Assoc {
139 public:
140     ~P2_Server();
141     P2_Server(yazpp_1::IPDU_Observable *the_PDU_Observable,
142               ThreadPoolSocketObserver *m_my_thread,
143               const Package *package);
144 private:
145     yazpp_1::IPDU_Observer* sessionNotify(
146         yazpp_1::IPDU_Observable *the_PDU_Observable,
147         int fd);
148     void recv_GDU(Z_GDU *apdu, int len);
149     
150     void failNotify();
151     void timeoutNotify();
152     void connectNotify();
153 private:
154     ThreadPoolSocketObserver *m_my_thread;
155     const Package *m_package;
156 };
157
158
159 P2_Server::P2_Server(yazpp_1::IPDU_Observable *the_PDU_Observable,
160                      ThreadPoolSocketObserver *my_thread,
161                      const Package *package)
162     :  Z_Assoc(the_PDU_Observable)
163 {
164     m_my_thread = my_thread;
165     m_package = package;
166
167 }
168
169 yazpp_1::IPDU_Observer *P2_Server::sessionNotify(yazpp_1::IPDU_Observable
170                                                  *the_PDU_Observable, int fd)
171 {
172     P2_Session *my = new P2_Session(the_PDU_Observable, m_my_thread,
173                                     m_package);
174     return my;
175 }
176
177 P2_Server::~P2_Server()
178 {
179 }
180
181 void P2_Server::recv_GDU(Z_GDU *apdu, int len)
182 {
183 }
184
185 void P2_Server::failNotify()
186 {
187 }
188
189 void P2_Server::timeoutNotify()
190 {
191 }
192
193 void P2_Server::connectNotify()
194 {
195 }
196
197 FilterFrontendNet::FilterFrontendNet()
198 {
199     m_no_threads = 5;
200     m_listen_address = "@:9001";
201     m_listen_duration = 0;
202 }
203
204 class My_Timer_Thread : public yazpp_1::ISocketObserver {
205 private:
206     yazpp_1::ISocketObservable *m_obs;
207     int m_fd[2];
208     bool m_timeout;
209 public:
210     My_Timer_Thread(yazpp_1::ISocketObservable *obs, int duration);
211     void socketNotify(int event);
212     bool timeout();
213 };
214
215 bool My_Timer_Thread::timeout()
216 {
217     return m_timeout;
218 }
219
220 My_Timer_Thread::My_Timer_Thread(yazpp_1::ISocketObservable *obs,
221                                  int duration) : 
222     m_obs(obs), m_timeout(false)
223 {
224     pipe(m_fd);
225     obs->addObserver(m_fd[0], this);
226     obs->maskObserver(this, yazpp_1::SOCKET_OBSERVE_READ);
227     obs->timeoutObserver(this, duration);
228 }
229
230 void My_Timer_Thread::socketNotify(int event)
231 {
232     m_timeout = true;
233     m_obs->deleteObserver(this);
234     close(m_fd[0]);
235     close(m_fd[1]);
236 }
237
238 Package &FilterFrontendNet::process(Package &package) const {
239     yazpp_1::SocketManager mySocketManager;
240
241     My_Timer_Thread *tt = 0;
242     if (m_listen_duration)
243         tt = new My_Timer_Thread(&mySocketManager, m_listen_duration);
244
245     yazpp_1::PDU_Assoc *my_PDU_Assoc =
246         new yazpp_1::PDU_Assoc(&mySocketManager);
247     
248     ThreadPoolSocketObserver threadPool(&mySocketManager, m_no_threads);
249
250     P2_Server z(my_PDU_Assoc, &threadPool, &package);
251     z.server(m_listen_address.c_str());
252
253     while (mySocketManager.processEvent() > 0)
254     {
255         if (tt && tt->timeout())
256             break;
257     }
258     return package;
259 }
260
261 std::string &FilterFrontendNet::listen_address()
262 {
263     return m_listen_address;
264 }
265
266 int &FilterFrontendNet::listen_duration()
267 {
268     return m_listen_duration;
269 }
270