Do not include router.hpp in filters
[metaproxy-moved-to-github.git] / src / filter_z3950_client.cpp
1 /* $Id: filter_z3950_client.cpp,v 1.18 2006-01-09 21:20:15 adam Exp $
2    Copyright (c) 2005, Index Data.
3
4 %LICENSE%
5  */
6
7 #include "config.hpp"
8
9 #include "xmlutil.hpp"
10 #include "filter.hpp"
11 #include "package.hpp"
12 #include "util.hpp"
13 #include "filter_z3950_client.hpp"
14
15 #include <map>
16 #include <stdexcept>
17 #include <list>
18 #include <iostream>
19
20 #include <boost/thread/mutex.hpp>
21 #include <boost/thread/condition.hpp>
22
23 #include <yaz/zgdu.h>
24 #include <yaz/log.h>
25 #include <yaz/otherinfo.h>
26 #include <yaz/diagbib1.h>
27
28 #include <yaz++/socket-manager.h>
29 #include <yaz++/pdu-assoc.h>
30 #include <yaz++/z-assoc.h>
31
32 namespace yf = yp2::filter;
33
34 namespace yp2 {
35     namespace filter {
36         class Z3950Client::Assoc : public yazpp_1::Z_Assoc{
37             friend class Rep;
38             Assoc(yazpp_1::SocketManager *socket_manager,
39                   yazpp_1::IPDU_Observable *PDU_Observable,
40                   std::string host, int timeout);
41             ~Assoc();
42             void connectNotify();
43             void failNotify();
44             void timeoutNotify();
45             void recv_GDU(Z_GDU *gdu, int len);
46             yazpp_1::IPDU_Observer* sessionNotify(
47                 yazpp_1::IPDU_Observable *the_PDU_Observable,
48                 int fd);
49
50             yazpp_1::SocketManager *m_socket_manager;
51             yazpp_1::IPDU_Observable *m_PDU_Observable;
52             Package *m_package;
53             bool m_in_use;
54             bool m_waiting;
55             bool m_destroyed;
56             bool m_connected;
57             int m_queue_len;
58             int m_time_elapsed;
59             int m_time_max;
60             std::string m_host;
61         };
62
63         class Z3950Client::Rep {
64         public:
65             int m_timeout_sec;
66             boost::mutex m_mutex;
67             boost::condition m_cond_session_ready;
68             std::map<yp2::Session,Z3950Client::Assoc *> m_clients;
69             Z3950Client::Assoc *get_assoc(Package &package);
70             void send_and_receive(Package &package,
71                                   yf::Z3950Client::Assoc *c);
72             void release_assoc(Package &package);
73         };
74     }
75 }
76
77 using namespace yp2;
78
79 yf::Z3950Client::Assoc::Assoc(yazpp_1::SocketManager *socket_manager,
80                               yazpp_1::IPDU_Observable *PDU_Observable,
81                               std::string host, int timeout_sec)
82     :  Z_Assoc(PDU_Observable),
83        m_socket_manager(socket_manager), m_PDU_Observable(PDU_Observable),
84        m_package(0), m_in_use(true), m_waiting(false), 
85        m_destroyed(false), m_connected(false), m_queue_len(1),
86        m_time_elapsed(0), m_time_max(timeout_sec), 
87        m_host(host)
88 {
89     // std::cout << "create assoc " << this << "\n";
90 }
91
92 yf::Z3950Client::Assoc::~Assoc()
93 {
94     // std::cout << "destroy assoc " << this << "\n";
95 }
96
97 void yf::Z3950Client::Assoc::connectNotify()
98 {
99     m_waiting = false;
100
101     m_connected = true;
102 }
103
104 void yf::Z3950Client::Assoc::failNotify()
105 {
106     m_waiting = false;
107
108     yp2::odr odr;
109
110     if (m_package)
111     {
112         m_package->response() = odr.create_close(Z_Close_peerAbort, 0);
113         m_package->session().close();
114     }
115 }
116
117 void yf::Z3950Client::Assoc::timeoutNotify()
118 {
119     m_time_elapsed++;
120     if (m_time_elapsed >= m_time_max)
121     {
122         m_waiting = false;
123
124         yp2::odr odr;
125         
126         if (m_package)
127         {
128             if (m_connected)
129                 m_package->response() = odr.create_close(Z_Close_lackOfActivity, 0);
130             else
131                 m_package->response() = odr.create_close(Z_Close_peerAbort, 0);
132                 
133             m_package->session().close();
134         }
135     }
136 }
137
138 void yf::Z3950Client::Assoc::recv_GDU(Z_GDU *gdu, int len)
139 {
140     m_waiting = false;
141
142     if (m_package)
143         m_package->response() = gdu;
144 }
145
146 yazpp_1::IPDU_Observer *yf::Z3950Client::Assoc::sessionNotify(
147     yazpp_1::IPDU_Observable *the_PDU_Observable,
148     int fd)
149 {
150     return 0;
151 }
152
153
154 yf::Z3950Client::Z3950Client() :  m_p(new yf::Z3950Client::Rep)
155 {
156     m_p->m_timeout_sec = 30;
157 }
158
159 yf::Z3950Client::~Z3950Client() {
160 }
161
162 yf::Z3950Client::Assoc *yf::Z3950Client::Rep::get_assoc(Package &package) 
163 {
164     // only one thread messes with the clients list at a time
165     boost::mutex::scoped_lock lock(m_mutex);
166
167     std::map<yp2::Session,yf::Z3950Client::Assoc *>::iterator it;
168     
169     Z_GDU *gdu = package.request().get();
170     // only deal with Z39.50
171     if (!gdu || gdu->which != Z_GDU_Z3950)
172     {
173         package.move();
174         return 0;
175     }
176     it = m_clients.find(package.session());
177     if (it != m_clients.end())
178     {
179         it->second->m_queue_len++;
180         while(true)
181         {
182 #if 0
183             if (gdu && gdu->which == Z_GDU_Z3950 &&
184                 gdu->u.z3950->which == Z_APDU_initRequest)
185             {
186                 yazpp_1::SocketManager *s = it->second->m_socket_manager;
187                 delete it->second;  // destroy Z_Assoc
188                 delete s;    // then manager
189                 m_clients.erase(it);
190                 break;
191             }
192 #endif
193             if (!it->second->m_in_use)
194             {
195                 it->second->m_in_use = true;
196                 return it->second;
197             }
198             m_cond_session_ready.wait(lock);
199         }
200     }
201     // new Z39.50 session ..
202     Z_APDU *apdu = gdu->u.z3950;
203     // check that it is init. If not, close
204     if (apdu->which != Z_APDU_initRequest)
205     {
206         yp2::odr odr;
207         
208         package.response() = odr.create_close(Z_Close_protocolError,
209                                               "First PDU was not an "
210                                               "Initialize Request");
211         package.session().close();
212         return 0;
213     }
214     // check virtual host
215     const char *vhost =
216         yaz_oi_get_string_oidval(&apdu->u.initRequest->otherInfo,
217                                  VAL_PROXY, 
218                                  /* categoryValue */ 1, /* delete */ 1);
219     if (!vhost)
220     {
221         yp2::odr odr;
222         package.response() = odr.create_initResponse(
223             YAZ_BIB1_INIT_NEGOTIATION_OPTION_REQUIRED,
224             "Virtual host not given");
225         
226         package.session().close();
227         return 0;
228     }
229                       
230     yazpp_1::SocketManager *sm = new yazpp_1::SocketManager;
231     yazpp_1::PDU_Assoc *pdu_as = new yazpp_1::PDU_Assoc(sm);
232     yf::Z3950Client::Assoc *as = new yf::Z3950Client::Assoc(sm, pdu_as, vhost,
233                                                             m_timeout_sec);
234     m_clients[package.session()] = as;
235     return as;
236 }
237
238 void yf::Z3950Client::Rep::send_and_receive(Package &package,
239                                             yf::Z3950Client::Assoc *c)
240 {
241     Z_GDU *gdu = package.request().get();
242
243     if (c->m_destroyed)
244         return;
245
246     if (!gdu || gdu->which != Z_GDU_Z3950)
247         return;
248
249     c->m_time_elapsed = 0;
250     c->m_package = &package;
251     c->m_waiting = true;
252     if (!c->m_connected)
253     {
254         c->client(c->m_host.c_str());
255         c->timeout(1);
256
257         while (!c->m_destroyed && c->m_waiting 
258                && c->m_socket_manager->processEvent() > 0)
259             ;
260     }
261     if (!c->m_connected)
262     {
263         return;
264     }
265
266     // prepare response
267     c->m_time_elapsed = 0;
268     c->m_waiting = true;
269     
270     // relay the package  ..
271     int len;
272     c->send_GDU(gdu, &len);
273
274     switch(gdu->u.z3950->which)
275     {
276     case Z_APDU_triggerResourceControlRequest:
277         // request only..
278         break;
279     default:
280         // for the rest: wait for a response PDU
281         while (!c->m_destroyed && c->m_waiting
282                && c->m_socket_manager->processEvent() > 0)
283             ;
284         break;
285     }
286 }
287
288 void yf::Z3950Client::Rep::release_assoc(Package &package)
289 {
290     boost::mutex::scoped_lock lock(m_mutex);
291     std::map<yp2::Session,yf::Z3950Client::Assoc *>::iterator it;
292     
293     it = m_clients.find(package.session());
294     if (it != m_clients.end())
295     {
296         Z_GDU *gdu = package.request().get();
297         if (gdu && gdu->which == Z_GDU_Z3950)
298         {   // only Z39.50 packages lock in get_assoc.. release it
299             it->second->m_in_use = false;
300             it->second->m_queue_len--;
301         }
302
303         if (package.session().is_closed())
304         {
305             // destroy hint (send_and_receive)
306             it->second->m_destroyed = true;
307             
308             // wait until no one is waiting for it.
309             while (it->second->m_queue_len)
310                 m_cond_session_ready.wait(lock);
311             
312             // the Z_Assoc and PDU_Assoc must be destroyed before
313             // the socket manager.. so pull that out.. first..
314             yazpp_1::SocketManager *s = it->second->m_socket_manager;
315             delete it->second;  // destroy Z_Assoc
316             delete s;    // then manager
317             m_clients.erase(it);
318         }
319         m_cond_session_ready.notify_all();
320     }
321 }
322
323 void yf::Z3950Client::process(Package &package) const
324 {
325     yf::Z3950Client::Assoc *c = m_p->get_assoc(package);
326     if (c)
327     {
328         m_p->send_and_receive(package, c);
329     }
330     m_p->release_assoc(package);
331 }
332
333 void yf::Z3950Client::configure(const xmlNode *ptr)
334 {
335     for (ptr = ptr->children; ptr; ptr = ptr->next)
336     {
337         if (ptr->type != XML_ELEMENT_NODE)
338             continue;
339         if (!strcmp((const char *) ptr->name, "timeout"))
340         {
341             std::string timeout_str = yp2::xml::get_text(ptr);
342             int timeout_sec = atoi(timeout_str.c_str());
343             if (timeout_sec < 2)
344                 throw yp2::filter::FilterException("Bad timeout value " 
345                                                    + timeout_str);
346             m_p->m_timeout_sec = timeout_sec;
347         }
348         else
349         {
350             throw yp2::filter::FilterException("Bad element " 
351                                                + std::string((const char *)
352                                                              ptr->name));
353         }
354     }
355 }
356
357 static yp2::filter::Base* filter_creator()
358 {
359     return new yp2::filter::Z3950Client;
360 }
361
362 extern "C" {
363     struct yp2_filter_struct yp2_filter_z3950_client = {
364         0,
365         "z3950_client",
366         filter_creator
367     };
368 }
369
370 /*
371  * Local variables:
372  * c-basic-offset: 4
373  * indent-tabs-mode: nil
374  * c-file-style: "stroustrup"
375  * End:
376  * vim: shiftwidth=4 tabstop=8 expandtab
377  */