Added support for configurable default/force target/vhost for module Z39.50
[metaproxy-moved-to-github.git] / src / filter_z3950_client.cpp
1 /* $Id: filter_z3950_client.cpp,v 1.30 2007-01-26 14:49:22 adam Exp $
2    Copyright (c) 2005-2007, Index Data.
3
4    See the LICENSE file for details
5  */
6
7 #include "config.hpp"
8
9 #include "filter.hpp"
10 #include "package.hpp"
11 #include "util.hpp"
12 #include "filter_z3950_client.hpp"
13
14 #include <map>
15 #include <stdexcept>
16 #include <list>
17 #include <iostream>
18
19 #include <boost/thread/mutex.hpp>
20 #include <boost/thread/condition.hpp>
21
22 #include <yaz/zgdu.h>
23 #include <yaz/log.h>
24 #include <yaz/otherinfo.h>
25 #include <yaz/diagbib1.h>
26
27 #include <yazpp/socket-manager.h>
28 #include <yazpp/pdu-assoc.h>
29 #include <yazpp/z-assoc.h>
30
31 namespace mp = metaproxy_1;
32 namespace yf = mp::filter;
33
34 namespace metaproxy_1 {
35     namespace filter {
36         class Z3950Client::Assoc : public yazpp_1::Z_Assoc{
37             friend class Rep;
38             Assoc(yazpp_1::SocketManager *socket_manager,
39                   yazpp_1::IPDU_Observable *PDU_Observable,
40                   std::string host, int timeout);
41             ~Assoc();
42             void connectNotify();
43             void failNotify();
44             void timeoutNotify();
45             void recv_GDU(Z_GDU *gdu, int len);
46             yazpp_1::IPDU_Observer* sessionNotify(
47                 yazpp_1::IPDU_Observable *the_PDU_Observable,
48                 int fd);
49
50             yazpp_1::SocketManager *m_socket_manager;
51             yazpp_1::IPDU_Observable *m_PDU_Observable;
52             Package *m_package;
53             bool m_in_use;
54             bool m_waiting;
55             bool m_destroyed;
56             bool m_connected;
57             int m_queue_len;
58             int m_time_elapsed;
59             int m_time_max;
60             std::string m_host;
61         };
62
63         class Z3950Client::Rep {
64         public:
65             // number of seconds to wait before we give up request
66             int m_timeout_sec;
67             std::string m_default_target;
68             std::string m_force_target;
69             boost::mutex m_mutex;
70             boost::condition m_cond_session_ready;
71             std::map<mp::Session,Z3950Client::Assoc *> m_clients;
72             Z3950Client::Assoc *get_assoc(Package &package);
73             void send_and_receive(Package &package,
74                                   yf::Z3950Client::Assoc *c);
75             void release_assoc(Package &package);
76         };
77     }
78 }
79
80 using namespace mp;
81
82 yf::Z3950Client::Assoc::Assoc(yazpp_1::SocketManager *socket_manager,
83                               yazpp_1::IPDU_Observable *PDU_Observable,
84                               std::string host, int timeout_sec)
85     :  Z_Assoc(PDU_Observable),
86        m_socket_manager(socket_manager), m_PDU_Observable(PDU_Observable),
87        m_package(0), m_in_use(true), m_waiting(false), 
88        m_destroyed(false), m_connected(false), m_queue_len(1),
89        m_time_elapsed(0), m_time_max(timeout_sec), 
90        m_host(host)
91 {
92     // std::cout << "create assoc " << this << "\n";
93 }
94
95 yf::Z3950Client::Assoc::~Assoc()
96 {
97     // std::cout << "destroy assoc " << this << "\n";
98 }
99
100 void yf::Z3950Client::Assoc::connectNotify()
101 {
102     m_waiting = false;
103
104     m_connected = true;
105 }
106
107 void yf::Z3950Client::Assoc::failNotify()
108 {
109     m_waiting = false;
110
111     mp::odr odr;
112
113     if (m_package)
114     {
115         Z_GDU *gdu = m_package->request().get();
116         Z_APDU *apdu = 0;
117         if (gdu && gdu->which == Z_GDU_Z3950)
118             apdu = gdu->u.z3950;
119         
120         m_package->response() = odr.create_close(apdu, Z_Close_peerAbort, 0);
121         m_package->session().close();
122     }
123 }
124
125 void yf::Z3950Client::Assoc::timeoutNotify()
126 {
127     m_time_elapsed++;
128     if (m_time_elapsed >= m_time_max)
129     {
130         m_waiting = false;
131
132         mp::odr odr;
133         
134         if (m_package)
135         {
136             Z_GDU *gdu = m_package->request().get();
137             Z_APDU *apdu = 0;
138             if (gdu && gdu->which == Z_GDU_Z3950)
139                 apdu = gdu->u.z3950;
140         
141             if (m_connected)
142                 m_package->response() =
143                     odr.create_close(apdu, Z_Close_lackOfActivity, 0);
144             else
145                 m_package->response() = 
146                     odr.create_close(apdu, Z_Close_peerAbort, 0);
147                 
148             m_package->session().close();
149         }
150     }
151 }
152
153 void yf::Z3950Client::Assoc::recv_GDU(Z_GDU *gdu, int len)
154 {
155     m_waiting = false;
156
157     if (m_package)
158         m_package->response() = gdu;
159 }
160
161 yazpp_1::IPDU_Observer *yf::Z3950Client::Assoc::sessionNotify(
162     yazpp_1::IPDU_Observable *the_PDU_Observable,
163     int fd)
164 {
165     return 0;
166 }
167
168
169 yf::Z3950Client::Z3950Client() :  m_p(new yf::Z3950Client::Rep)
170 {
171     m_p->m_timeout_sec = 30;
172 }
173
174 yf::Z3950Client::~Z3950Client() {
175 }
176
177 yf::Z3950Client::Assoc *yf::Z3950Client::Rep::get_assoc(Package &package) 
178 {
179     // only one thread messes with the clients list at a time
180     boost::mutex::scoped_lock lock(m_mutex);
181
182     std::map<mp::Session,yf::Z3950Client::Assoc *>::iterator it;
183     
184     Z_GDU *gdu = package.request().get();
185     // only deal with Z39.50
186     if (!gdu || gdu->which != Z_GDU_Z3950)
187     {
188         package.move();
189         return 0;
190     }
191     it = m_clients.find(package.session());
192     if (it != m_clients.end())
193     {
194         it->second->m_queue_len++;
195         while(true)
196         {
197 #if 0
198             // double init .. NOT working yet
199             if (gdu && gdu->which == Z_GDU_Z3950 &&
200                 gdu->u.z3950->which == Z_APDU_initRequest)
201             {
202                 yazpp_1::SocketManager *s = it->second->m_socket_manager;
203                 delete it->second;  // destroy Z_Assoc
204                 delete s;    // then manager
205                 m_clients.erase(it);
206                 break;
207             }
208 #endif
209             if (!it->second->m_in_use)
210             {
211                 it->second->m_in_use = true;
212                 return it->second;
213             }
214             m_cond_session_ready.wait(lock);
215         }
216     }
217     // new Z39.50 session ..
218     Z_APDU *apdu = gdu->u.z3950;
219     // check that it is init. If not, close
220     if (apdu->which != Z_APDU_initRequest)
221     {
222         mp::odr odr;
223         
224         package.response() = odr.create_close(apdu,
225                                               Z_Close_protocolError,
226                                               "First PDU was not an "
227                                               "Initialize Request");
228         package.session().close();
229         return 0;
230     }
231     std::string target = m_force_target;
232     if (!target.length())
233     {
234         target = m_default_target;
235         std::list<std::string> vhosts;
236         mp::util::remove_vhost_otherinfo(&apdu->u.initRequest->otherInfo,
237                                          vhosts);
238         size_t no_vhosts = vhosts.size();
239         if (no_vhosts == 1)
240         {
241             std::list<std::string>::const_iterator v_it = vhosts.begin();
242             target = *v_it;
243         }
244         else if (no_vhosts == 0)
245         {
246             if (!target.length())
247             {
248                 // no default target. So we don't know where to connect
249                 mp::odr odr;
250                 package.response() = odr.create_initResponse(
251                     apdu,
252                     YAZ_BIB1_INIT_NEGOTIATION_OPTION_REQUIRED,
253                     "z3950_client: No virtal host given");
254                 
255                 package.session().close();
256                 return 0;
257             }
258         }
259         else if (no_vhosts > 1)
260         {
261             mp::odr odr;
262             package.response() = odr.create_initResponse(
263                 apdu,
264                 YAZ_BIB1_COMBI_OF_SPECIFIED_DATABASES_UNSUPP,
265                 "z3950_client: Can not cope with multiple vhosts");
266             package.session().close();
267             return 0;
268         }
269     }
270     std::list<std::string> dblist;
271     std::string host;
272     mp::util::split_zurl(target, host, dblist);
273     
274     if (dblist.size())
275     {
276         ; // z3950_client: Databases in vhost ignored
277     }
278
279     yazpp_1::SocketManager *sm = new yazpp_1::SocketManager;
280     yazpp_1::PDU_Assoc *pdu_as = new yazpp_1::PDU_Assoc(sm);
281     yf::Z3950Client::Assoc *as = new yf::Z3950Client::Assoc(sm, pdu_as,
282                                                             host.c_str(),
283                                                             m_timeout_sec);
284     m_clients[package.session()] = as;
285     return as;
286 }
287
288 void yf::Z3950Client::Rep::send_and_receive(Package &package,
289                                             yf::Z3950Client::Assoc *c)
290 {
291     Z_GDU *gdu = package.request().get();
292
293     if (c->m_destroyed)
294         return;
295
296     if (!gdu || gdu->which != Z_GDU_Z3950)
297         return;
298
299     c->m_time_elapsed = 0;
300     c->m_package = &package;
301     c->m_waiting = true;
302     if (!c->m_connected)
303     {
304         c->client(c->m_host.c_str());
305         c->timeout(1);  // so timeoutNotify gets called once per second
306
307         while (!c->m_destroyed && c->m_waiting 
308                && c->m_socket_manager->processEvent() > 0)
309             ;
310     }
311     if (!c->m_connected)
312     {
313         return;
314     }
315
316     // prepare response
317     c->m_time_elapsed = 0;
318     c->m_waiting = true;
319     
320     // relay the package  ..
321     int len;
322     c->send_GDU(gdu, &len);
323
324     switch(gdu->u.z3950->which)
325     {
326     case Z_APDU_triggerResourceControlRequest:
327         // request only..
328         break;
329     default:
330         // for the rest: wait for a response PDU
331         while (!c->m_destroyed && c->m_waiting
332                && c->m_socket_manager->processEvent() > 0)
333             ;
334         break;
335     }
336 }
337
338 void yf::Z3950Client::Rep::release_assoc(Package &package)
339 {
340     boost::mutex::scoped_lock lock(m_mutex);
341     std::map<mp::Session,yf::Z3950Client::Assoc *>::iterator it;
342     
343     it = m_clients.find(package.session());
344     if (it != m_clients.end())
345     {
346         Z_GDU *gdu = package.request().get();
347         if (gdu && gdu->which == Z_GDU_Z3950)
348         {   // only Z39.50 packages lock in get_assoc.. release it
349             it->second->m_in_use = false;
350             it->second->m_queue_len--;
351         }
352
353         if (package.session().is_closed())
354         {
355             // destroy hint (send_and_receive)
356             it->second->m_destroyed = true;
357             
358             // wait until no one is waiting for it.
359             while (it->second->m_queue_len)
360                 m_cond_session_ready.wait(lock);
361             
362             // the Z_Assoc and PDU_Assoc must be destroyed before
363             // the socket manager.. so pull that out.. first..
364             yazpp_1::SocketManager *s = it->second->m_socket_manager;
365             delete it->second;  // destroy Z_Assoc
366             delete s;    // then manager
367             m_clients.erase(it);
368         }
369         m_cond_session_ready.notify_all();
370     }
371 }
372
373 void yf::Z3950Client::process(Package &package) const
374 {
375     yf::Z3950Client::Assoc *c = m_p->get_assoc(package);
376     if (c)
377     {
378         m_p->send_and_receive(package, c);
379     }
380     m_p->release_assoc(package);
381 }
382
383 void yf::Z3950Client::configure(const xmlNode *ptr)
384 {
385     for (ptr = ptr->children; ptr; ptr = ptr->next)
386     {
387         if (ptr->type != XML_ELEMENT_NODE)
388             continue;
389         if (!strcmp((const char *) ptr->name, "timeout"))
390         {
391             m_p->m_timeout_sec = mp::xml::get_int(ptr->children, 30);
392         }
393         else if (!strcmp((const char *) ptr->name, "default_target"))
394         {
395             m_p->m_default_target = mp::xml::get_text(ptr);
396         }
397         else if (!strcmp((const char *) ptr->name, "force_target"))
398         {
399             m_p->m_force_target = mp::xml::get_text(ptr);
400         }
401         else
402         {
403             throw mp::filter::FilterException("Bad element " 
404                                                + std::string((const char *)
405                                                              ptr->name));
406         }
407     }
408 }
409
410 static mp::filter::Base* filter_creator()
411 {
412     return new mp::filter::Z3950Client;
413 }
414
415 extern "C" {
416     struct metaproxy_1_filter_struct metaproxy_1_filter_z3950_client = {
417         0,
418         "z3950_client",
419         filter_creator
420     };
421 }
422
423 /*
424  * Local variables:
425  * c-basic-offset: 4
426  * indent-tabs-mode: nil
427  * c-file-style: "stroustrup"
428  * End:
429  * vim: shiftwidth=4 tabstop=8 expandtab
430  */