RouterFleXML now reads XML simple config and make proper runtime
[metaproxy-moved-to-github.git] / src / filter_z3950_client.cpp
1 /* $Id: filter_z3950_client.cpp,v 1.16 2006-01-09 13:43:59 adam Exp $
2    Copyright (c) 2005, Index Data.
3
4 %LICENSE%
5  */
6
7 #include "config.hpp"
8
9 #include "filter.hpp"
10 #include "router.hpp"
11 #include "package.hpp"
12 #include "util.hpp"
13 #include "filter_z3950_client.hpp"
14
15 #include <map>
16 #include <stdexcept>
17 #include <list>
18 #include <iostream>
19
20 #include <boost/thread/mutex.hpp>
21 #include <boost/thread/condition.hpp>
22
23 #include <yaz/zgdu.h>
24 #include <yaz/log.h>
25 #include <yaz/otherinfo.h>
26 #include <yaz/diagbib1.h>
27
28 #include <yaz++/socket-manager.h>
29 #include <yaz++/pdu-assoc.h>
30 #include <yaz++/z-assoc.h>
31
32 namespace yf = yp2::filter;
33
34 namespace yp2 {
35     namespace filter {
36         class Z3950Client::Assoc : public yazpp_1::Z_Assoc{
37             friend class Rep;
38             Assoc(yazpp_1::SocketManager *socket_manager,
39                   yazpp_1::IPDU_Observable *PDU_Observable,
40                   std::string host);
41             ~Assoc();
42             void connectNotify();
43             void failNotify();
44             void timeoutNotify();
45             void recv_GDU(Z_GDU *gdu, int len);
46             yazpp_1::IPDU_Observer* sessionNotify(
47                 yazpp_1::IPDU_Observable *the_PDU_Observable,
48                 int fd);
49
50             yazpp_1::SocketManager *m_socket_manager;
51             yazpp_1::IPDU_Observable *m_PDU_Observable;
52             Package *m_package;
53             bool m_in_use;
54             bool m_waiting;
55             bool m_destroyed;
56             bool m_connected;
57             int m_queue_len;
58             int m_ticks;
59             std::string m_host;
60         };
61
62         class Z3950Client::Rep {
63         public:
64             boost::mutex m_mutex;
65             boost::condition m_cond_session_ready;
66             std::map<yp2::Session,Z3950Client::Assoc *> m_clients;
67             Z3950Client::Assoc *get_assoc(Package &package);
68             void send_and_receive(Package &package,
69                                   yf::Z3950Client::Assoc *c);
70             void release_assoc(Package &package);
71         };
72     }
73 }
74
75 using namespace yp2;
76
77 yf::Z3950Client::Assoc::Assoc(yazpp_1::SocketManager *socket_manager,
78                               yazpp_1::IPDU_Observable *PDU_Observable,
79                               std::string host)
80     :  Z_Assoc(PDU_Observable),
81        m_socket_manager(socket_manager), m_PDU_Observable(PDU_Observable),
82        m_package(0), m_in_use(true), m_waiting(false), 
83        m_destroyed(false), m_connected(false), m_queue_len(1), m_ticks(0),
84        m_host(host)
85 {
86     // std::cout << "create assoc " << this << "\n";
87 }
88
89 yf::Z3950Client::Assoc::~Assoc()
90 {
91     // std::cout << "destroy assoc " << this << "\n";
92 }
93
94 void yf::Z3950Client::Assoc::connectNotify()
95 {
96     m_waiting = false;
97
98     m_connected = true;
99 }
100
101 void yf::Z3950Client::Assoc::failNotify()
102 {
103     m_waiting = false;
104
105     yp2::odr odr;
106
107     if (m_package)
108     {
109         m_package->response() = odr.create_close(Z_Close_peerAbort, 0);
110         m_package->session().close();
111     }
112 }
113
114 void yf::Z3950Client::Assoc::timeoutNotify()
115 {
116     m_ticks++;
117     if (m_ticks == 30)
118     {
119         m_waiting = false;
120
121         yp2::odr odr;
122         
123         if (m_package)
124         {
125             if (m_connected)
126                 m_package->response() = odr.create_close(Z_Close_lackOfActivity, 0);
127             else
128                 m_package->response() = odr.create_close(Z_Close_peerAbort, 0);
129                 
130             m_package->session().close();
131         }
132     }
133 }
134
135 void yf::Z3950Client::Assoc::recv_GDU(Z_GDU *gdu, int len)
136 {
137     m_waiting = false;
138
139     if (m_package)
140         m_package->response() = gdu;
141 }
142
143 yazpp_1::IPDU_Observer *yf::Z3950Client::Assoc::sessionNotify(
144     yazpp_1::IPDU_Observable *the_PDU_Observable,
145     int fd)
146 {
147     return 0;
148 }
149
150
151 yf::Z3950Client::Z3950Client() :  m_p(new yf::Z3950Client::Rep)
152 {
153 }
154
155 yf::Z3950Client::~Z3950Client() {
156 }
157
158 yf::Z3950Client::Assoc *yf::Z3950Client::Rep::get_assoc(Package &package) 
159 {
160     // only one thread messes with the clients list at a time
161     boost::mutex::scoped_lock lock(m_mutex);
162
163     std::map<yp2::Session,yf::Z3950Client::Assoc *>::iterator it;
164     
165     Z_GDU *gdu = package.request().get();
166     // only deal with Z39.50
167     if (!gdu || gdu->which != Z_GDU_Z3950)
168     {
169         package.move();
170         return 0;
171     }
172     it = m_clients.find(package.session());
173     if (it != m_clients.end())
174     {
175         it->second->m_queue_len++;
176         while(true)
177         {
178 #if 0
179             if (gdu && gdu->which == Z_GDU_Z3950 &&
180                 gdu->u.z3950->which == Z_APDU_initRequest)
181             {
182                 yazpp_1::SocketManager *s = it->second->m_socket_manager;
183                 delete it->second;  // destroy Z_Assoc
184                 delete s;    // then manager
185                 m_clients.erase(it);
186                 break;
187             }
188 #endif
189             if (!it->second->m_in_use)
190             {
191                 it->second->m_in_use = true;
192                 return it->second;
193             }
194             m_cond_session_ready.wait(lock);
195         }
196     }
197     // new Z39.50 session ..
198     Z_APDU *apdu = gdu->u.z3950;
199     // check that it is init. If not, close
200     if (apdu->which != Z_APDU_initRequest)
201     {
202         yp2::odr odr;
203         
204         package.response() = odr.create_close(Z_Close_protocolError,
205                                               "First PDU was not an "
206                                               "Initialize Request");
207         package.session().close();
208         return 0;
209     }
210     // check virtual host
211     const char *vhost =
212         yaz_oi_get_string_oidval(&apdu->u.initRequest->otherInfo,
213                                  VAL_PROXY, 1, 0);
214     if (!vhost)
215     {
216         yp2::odr odr;
217         package.response() = odr.create_initResponse(
218             YAZ_BIB1_INIT_NEGOTIATION_OPTION_REQUIRED,
219             "Virtual host not given");
220         
221         package.session().close();
222         return 0;
223     }
224     
225     yazpp_1::SocketManager *sm = new yazpp_1::SocketManager;
226     yazpp_1::PDU_Assoc *pdu_as = new yazpp_1::PDU_Assoc(sm);
227     yf::Z3950Client::Assoc *as = new yf::Z3950Client::Assoc(sm, pdu_as, vhost);
228     m_clients[package.session()] = as;
229     return as;
230 }
231
232 void yf::Z3950Client::Rep::send_and_receive(Package &package,
233                                             yf::Z3950Client::Assoc *c)
234 {
235     Z_GDU *gdu = package.request().get();
236
237     if (c->m_destroyed)
238         return;
239
240     if (!gdu || gdu->which != Z_GDU_Z3950)
241         return;
242
243     c->m_ticks = 0;
244     c->m_package = &package;
245     c->m_waiting = true;
246     if (!c->m_connected)
247     {
248         c->client(c->m_host.c_str());
249         c->timeout(1);
250
251         while (!c->m_destroyed && c->m_waiting 
252                && c->m_socket_manager->processEvent() > 0)
253             ;
254     }
255     if (!c->m_connected)
256     {
257         return;
258     }
259
260     // prepare response
261     c->m_ticks = 0;
262     c->m_waiting = true;
263     
264     // relay the package  ..
265     int len;
266     c->send_GDU(gdu, &len);
267
268     switch(gdu->u.z3950->which)
269     {
270     case Z_APDU_triggerResourceControlRequest:
271         // request only..
272         break;
273     default:
274         // for the rest: wait for a response PDU
275         while (!c->m_destroyed && c->m_waiting
276                && c->m_socket_manager->processEvent() > 0)
277             ;
278         break;
279     }
280 }
281
282 void yf::Z3950Client::Rep::release_assoc(Package &package)
283 {
284     boost::mutex::scoped_lock lock(m_mutex);
285     std::map<yp2::Session,yf::Z3950Client::Assoc *>::iterator it;
286     
287     it = m_clients.find(package.session());
288     if (it != m_clients.end())
289     {
290         Z_GDU *gdu = package.request().get();
291         if (gdu && gdu->which == Z_GDU_Z3950)
292         {   // only Z39.50 packages lock in get_assoc.. release it
293             it->second->m_in_use = false;
294             it->second->m_queue_len--;
295         }
296
297         if (package.session().is_closed())
298         {
299             // destroy hint (send_and_receive)
300             it->second->m_destroyed = true;
301             
302             // wait until no one is waiting for it.
303             while (it->second->m_queue_len)
304                 m_cond_session_ready.wait(lock);
305             
306             // the Z_Assoc and PDU_Assoc must be destroyed before
307             // the socket manager.. so pull that out.. first..
308             yazpp_1::SocketManager *s = it->second->m_socket_manager;
309             delete it->second;  // destroy Z_Assoc
310             delete s;    // then manager
311             m_clients.erase(it);
312         }
313         m_cond_session_ready.notify_all();
314     }
315 }
316
317 void yf::Z3950Client::process(Package &package) const
318 {
319     yf::Z3950Client::Assoc *c = m_p->get_assoc(package);
320     if (c)
321     {
322         m_p->send_and_receive(package, c);
323     }
324     m_p->release_assoc(package);
325 }
326
327
328 static yp2::filter::Base* filter_creator()
329 {
330     return new yp2::filter::Z3950Client;
331 }
332
333 extern "C" {
334     struct yp2_filter_struct yp2_filter_z3950_client = {
335         0,
336         "z3950_client",
337         filter_creator
338     };
339 }
340
341 /*
342  * Local variables:
343  * c-basic-offset: 4
344  * indent-tabs-mode: nil
345  * c-file-style: "stroustrup"
346  * End:
347  * vim: shiftwidth=4 tabstop=8 expandtab
348  */