Named routes for filter virt_db. Example in etc/config2.xml
[metaproxy-moved-to-github.git] / src / filter_z3950_client.cpp
1 /* $Id: filter_z3950_client.cpp,v 1.19 2006-01-11 11:51:50 adam Exp $
2    Copyright (c) 2005, Index Data.
3
4 %LICENSE%
5  */
6
7 #include "config.hpp"
8
9 #include "filter.hpp"
10 #include "package.hpp"
11 #include "util.hpp"
12 #include "filter_z3950_client.hpp"
13
14 #include <map>
15 #include <stdexcept>
16 #include <list>
17 #include <iostream>
18
19 #include <boost/thread/mutex.hpp>
20 #include <boost/thread/condition.hpp>
21
22 #include <yaz/zgdu.h>
23 #include <yaz/log.h>
24 #include <yaz/otherinfo.h>
25 #include <yaz/diagbib1.h>
26
27 #include <yaz++/socket-manager.h>
28 #include <yaz++/pdu-assoc.h>
29 #include <yaz++/z-assoc.h>
30
31 namespace yf = yp2::filter;
32
33 namespace yp2 {
34     namespace filter {
35         class Z3950Client::Assoc : public yazpp_1::Z_Assoc{
36             friend class Rep;
37             Assoc(yazpp_1::SocketManager *socket_manager,
38                   yazpp_1::IPDU_Observable *PDU_Observable,
39                   std::string host, int timeout);
40             ~Assoc();
41             void connectNotify();
42             void failNotify();
43             void timeoutNotify();
44             void recv_GDU(Z_GDU *gdu, int len);
45             yazpp_1::IPDU_Observer* sessionNotify(
46                 yazpp_1::IPDU_Observable *the_PDU_Observable,
47                 int fd);
48
49             yazpp_1::SocketManager *m_socket_manager;
50             yazpp_1::IPDU_Observable *m_PDU_Observable;
51             Package *m_package;
52             bool m_in_use;
53             bool m_waiting;
54             bool m_destroyed;
55             bool m_connected;
56             int m_queue_len;
57             int m_time_elapsed;
58             int m_time_max;
59             std::string m_host;
60         };
61
62         class Z3950Client::Rep {
63         public:
64             int m_timeout_sec;
65             boost::mutex m_mutex;
66             boost::condition m_cond_session_ready;
67             std::map<yp2::Session,Z3950Client::Assoc *> m_clients;
68             Z3950Client::Assoc *get_assoc(Package &package);
69             void send_and_receive(Package &package,
70                                   yf::Z3950Client::Assoc *c);
71             void release_assoc(Package &package);
72         };
73     }
74 }
75
76 using namespace yp2;
77
78 yf::Z3950Client::Assoc::Assoc(yazpp_1::SocketManager *socket_manager,
79                               yazpp_1::IPDU_Observable *PDU_Observable,
80                               std::string host, int timeout_sec)
81     :  Z_Assoc(PDU_Observable),
82        m_socket_manager(socket_manager), m_PDU_Observable(PDU_Observable),
83        m_package(0), m_in_use(true), m_waiting(false), 
84        m_destroyed(false), m_connected(false), m_queue_len(1),
85        m_time_elapsed(0), m_time_max(timeout_sec), 
86        m_host(host)
87 {
88     // std::cout << "create assoc " << this << "\n";
89 }
90
91 yf::Z3950Client::Assoc::~Assoc()
92 {
93     // std::cout << "destroy assoc " << this << "\n";
94 }
95
96 void yf::Z3950Client::Assoc::connectNotify()
97 {
98     m_waiting = false;
99
100     m_connected = true;
101 }
102
103 void yf::Z3950Client::Assoc::failNotify()
104 {
105     m_waiting = false;
106
107     yp2::odr odr;
108
109     if (m_package)
110     {
111         m_package->response() = odr.create_close(Z_Close_peerAbort, 0);
112         m_package->session().close();
113     }
114 }
115
116 void yf::Z3950Client::Assoc::timeoutNotify()
117 {
118     m_time_elapsed++;
119     if (m_time_elapsed >= m_time_max)
120     {
121         m_waiting = false;
122
123         yp2::odr odr;
124         
125         if (m_package)
126         {
127             if (m_connected)
128                 m_package->response() = odr.create_close(Z_Close_lackOfActivity, 0);
129             else
130                 m_package->response() = odr.create_close(Z_Close_peerAbort, 0);
131                 
132             m_package->session().close();
133         }
134     }
135 }
136
137 void yf::Z3950Client::Assoc::recv_GDU(Z_GDU *gdu, int len)
138 {
139     m_waiting = false;
140
141     if (m_package)
142         m_package->response() = gdu;
143 }
144
145 yazpp_1::IPDU_Observer *yf::Z3950Client::Assoc::sessionNotify(
146     yazpp_1::IPDU_Observable *the_PDU_Observable,
147     int fd)
148 {
149     return 0;
150 }
151
152
153 yf::Z3950Client::Z3950Client() :  m_p(new yf::Z3950Client::Rep)
154 {
155     m_p->m_timeout_sec = 30;
156 }
157
158 yf::Z3950Client::~Z3950Client() {
159 }
160
161 yf::Z3950Client::Assoc *yf::Z3950Client::Rep::get_assoc(Package &package) 
162 {
163     // only one thread messes with the clients list at a time
164     boost::mutex::scoped_lock lock(m_mutex);
165
166     std::map<yp2::Session,yf::Z3950Client::Assoc *>::iterator it;
167     
168     Z_GDU *gdu = package.request().get();
169     // only deal with Z39.50
170     if (!gdu || gdu->which != Z_GDU_Z3950)
171     {
172         package.move();
173         return 0;
174     }
175     it = m_clients.find(package.session());
176     if (it != m_clients.end())
177     {
178         it->second->m_queue_len++;
179         while(true)
180         {
181 #if 0
182             if (gdu && gdu->which == Z_GDU_Z3950 &&
183                 gdu->u.z3950->which == Z_APDU_initRequest)
184             {
185                 yazpp_1::SocketManager *s = it->second->m_socket_manager;
186                 delete it->second;  // destroy Z_Assoc
187                 delete s;    // then manager
188                 m_clients.erase(it);
189                 break;
190             }
191 #endif
192             if (!it->second->m_in_use)
193             {
194                 it->second->m_in_use = true;
195                 return it->second;
196             }
197             m_cond_session_ready.wait(lock);
198         }
199     }
200     // new Z39.50 session ..
201     Z_APDU *apdu = gdu->u.z3950;
202     // check that it is init. If not, close
203     if (apdu->which != Z_APDU_initRequest)
204     {
205         yp2::odr odr;
206         
207         package.response() = odr.create_close(Z_Close_protocolError,
208                                               "First PDU was not an "
209                                               "Initialize Request");
210         package.session().close();
211         return 0;
212     }
213     // check virtual host
214     const char *vhost =
215         yaz_oi_get_string_oidval(&apdu->u.initRequest->otherInfo,
216                                  VAL_PROXY, 
217                                  /* categoryValue */ 1, /* delete */ 1);
218     if (!vhost)
219     {
220         yp2::odr odr;
221         package.response() = odr.create_initResponse(
222             YAZ_BIB1_INIT_NEGOTIATION_OPTION_REQUIRED,
223             "Virtual host not given");
224         
225         package.session().close();
226         return 0;
227     }
228                       
229     yazpp_1::SocketManager *sm = new yazpp_1::SocketManager;
230     yazpp_1::PDU_Assoc *pdu_as = new yazpp_1::PDU_Assoc(sm);
231     yf::Z3950Client::Assoc *as = new yf::Z3950Client::Assoc(sm, pdu_as, vhost,
232                                                             m_timeout_sec);
233     m_clients[package.session()] = as;
234     return as;
235 }
236
237 void yf::Z3950Client::Rep::send_and_receive(Package &package,
238                                             yf::Z3950Client::Assoc *c)
239 {
240     Z_GDU *gdu = package.request().get();
241
242     if (c->m_destroyed)
243         return;
244
245     if (!gdu || gdu->which != Z_GDU_Z3950)
246         return;
247
248     c->m_time_elapsed = 0;
249     c->m_package = &package;
250     c->m_waiting = true;
251     if (!c->m_connected)
252     {
253         c->client(c->m_host.c_str());
254         c->timeout(1);
255
256         while (!c->m_destroyed && c->m_waiting 
257                && c->m_socket_manager->processEvent() > 0)
258             ;
259     }
260     if (!c->m_connected)
261     {
262         return;
263     }
264
265     // prepare response
266     c->m_time_elapsed = 0;
267     c->m_waiting = true;
268     
269     // relay the package  ..
270     int len;
271     c->send_GDU(gdu, &len);
272
273     switch(gdu->u.z3950->which)
274     {
275     case Z_APDU_triggerResourceControlRequest:
276         // request only..
277         break;
278     default:
279         // for the rest: wait for a response PDU
280         while (!c->m_destroyed && c->m_waiting
281                && c->m_socket_manager->processEvent() > 0)
282             ;
283         break;
284     }
285 }
286
287 void yf::Z3950Client::Rep::release_assoc(Package &package)
288 {
289     boost::mutex::scoped_lock lock(m_mutex);
290     std::map<yp2::Session,yf::Z3950Client::Assoc *>::iterator it;
291     
292     it = m_clients.find(package.session());
293     if (it != m_clients.end())
294     {
295         Z_GDU *gdu = package.request().get();
296         if (gdu && gdu->which == Z_GDU_Z3950)
297         {   // only Z39.50 packages lock in get_assoc.. release it
298             it->second->m_in_use = false;
299             it->second->m_queue_len--;
300         }
301
302         if (package.session().is_closed())
303         {
304             // destroy hint (send_and_receive)
305             it->second->m_destroyed = true;
306             
307             // wait until no one is waiting for it.
308             while (it->second->m_queue_len)
309                 m_cond_session_ready.wait(lock);
310             
311             // the Z_Assoc and PDU_Assoc must be destroyed before
312             // the socket manager.. so pull that out.. first..
313             yazpp_1::SocketManager *s = it->second->m_socket_manager;
314             delete it->second;  // destroy Z_Assoc
315             delete s;    // then manager
316             m_clients.erase(it);
317         }
318         m_cond_session_ready.notify_all();
319     }
320 }
321
322 void yf::Z3950Client::process(Package &package) const
323 {
324     yf::Z3950Client::Assoc *c = m_p->get_assoc(package);
325     if (c)
326     {
327         m_p->send_and_receive(package, c);
328     }
329     m_p->release_assoc(package);
330 }
331
332 void yf::Z3950Client::configure(const xmlNode *ptr)
333 {
334     for (ptr = ptr->children; ptr; ptr = ptr->next)
335     {
336         if (ptr->type != XML_ELEMENT_NODE)
337             continue;
338         if (!strcmp((const char *) ptr->name, "timeout"))
339         {
340             std::string timeout_str = yp2::xml::get_text(ptr);
341             int timeout_sec = atoi(timeout_str.c_str());
342             if (timeout_sec < 2)
343                 throw yp2::filter::FilterException("Bad timeout value " 
344                                                    + timeout_str);
345             m_p->m_timeout_sec = timeout_sec;
346         }
347         else
348         {
349             throw yp2::filter::FilterException("Bad element " 
350                                                + std::string((const char *)
351                                                              ptr->name));
352         }
353     }
354 }
355
356 static yp2::filter::Base* filter_creator()
357 {
358     return new yp2::filter::Z3950Client;
359 }
360
361 extern "C" {
362     struct yp2_filter_struct yp2_filter_z3950_client = {
363         0,
364         "z3950_client",
365         filter_creator
366     };
367 }
368
369 /*
370  * Local variables:
371  * c-basic-offset: 4
372  * indent-tabs-mode: nil
373  * c-file-style: "stroustrup"
374  * End:
375  * vim: shiftwidth=4 tabstop=8 expandtab
376  */