06f58e14cdd6ff718aae73c1b8b8fb4274266a0c
[metaproxy-moved-to-github.git] / src / filter_z3950_client.cpp
1 /* This file is part of Metaproxy.
2    Copyright (C) 2005-2011 Index Data
3
4 Metaproxy is free software; you can redistribute it and/or modify it under
5 the terms of the GNU General Public License as published by the Free
6 Software Foundation; either version 2, or (at your option) any later
7 version.
8
9 Metaproxy is distributed in the hope that it will be useful, but WITHOUT ANY
10 WARRANTY; without even the implied warranty of MERCHANTABILITY or
11 FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
12 for more details.
13
14 You should have received a copy of the GNU General Public License
15 along with this program; if not, write to the Free Software
16 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
17 */
18
19 #include "config.hpp"
20
21 #include "filter_z3950_client.hpp"
22 #include <metaproxy/package.hpp>
23 #include <metaproxy/util.hpp>
24
25 #include <map>
26 #include <stdexcept>
27 #include <list>
28 #include <iostream>
29
30 #include <boost/thread/mutex.hpp>
31 #include <boost/thread/condition.hpp>
32 #include <boost/thread/xtime.hpp>
33
34 #include <yaz/zgdu.h>
35 #include <yaz/log.h>
36 #include <yaz/otherinfo.h>
37 #include <yaz/diagbib1.h>
38
39 #include <yazpp/socket-manager.h>
40 #include <yazpp/pdu-assoc.h>
41 #include <yazpp/z-assoc.h>
42
43 namespace mp = metaproxy_1;
44 namespace yf = mp::filter;
45
46 namespace metaproxy_1 {
47     namespace filter {
48         class Z3950Client::Assoc : public yazpp_1::Z_Assoc{
49             friend class Rep;
50             Assoc(yazpp_1::SocketManager *socket_manager,
51                   yazpp_1::IPDU_Observable *PDU_Observable,
52                   std::string host, int timeout);
53             ~Assoc();
54             void connectNotify();
55             void failNotify();
56             void timeoutNotify();
57             void recv_GDU(Z_GDU *gdu, int len);
58             yazpp_1::IPDU_Observer* sessionNotify(
59                 yazpp_1::IPDU_Observable *the_PDU_Observable,
60                 int fd);
61
62             yazpp_1::SocketManager *m_socket_manager;
63             yazpp_1::IPDU_Observable *m_PDU_Observable;
64             Package *m_package;
65             bool m_in_use;
66             bool m_waiting;
67             bool m_destroyed;
68             bool m_connected;
69             bool m_has_closed;
70             int m_queue_len;
71             int m_time_elapsed;
72             int m_time_max;
73             int m_time_connect_max;
74             std::string m_host;
75         };
76
77         class Z3950Client::Rep {
78         public:
79             // number of seconds to wait before we give up request
80             int m_timeout_sec;
81             int m_max_sockets;
82             bool m_force_close;
83             std::string m_default_target;
84             std::string m_force_target;
85             boost::mutex m_mutex;
86             boost::condition m_cond_session_ready;
87             std::map<mp::Session,Z3950Client::Assoc *> m_clients;
88             Z3950Client::Assoc *get_assoc(Package &package);
89             void send_and_receive(Package &package,
90                                   yf::Z3950Client::Assoc *c);
91             void release_assoc(Package &package);
92         };
93     }
94 }
95
96 using namespace mp;
97
98 yf::Z3950Client::Assoc::Assoc(yazpp_1::SocketManager *socket_manager,
99                               yazpp_1::IPDU_Observable *PDU_Observable,
100                               std::string host, int timeout_sec)
101     :  Z_Assoc(PDU_Observable),
102        m_socket_manager(socket_manager), m_PDU_Observable(PDU_Observable),
103        m_package(0), m_in_use(true), m_waiting(false), 
104        m_destroyed(false), m_connected(false), m_has_closed(false),
105        m_queue_len(1),
106        m_time_elapsed(0), m_time_max(timeout_sec),  m_time_connect_max(10),
107        m_host(host)
108 {
109     // std::cout << "create assoc " << this << "\n";
110 }
111
112 yf::Z3950Client::Assoc::~Assoc()
113 {
114     // std::cout << "destroy assoc " << this << "\n";
115 }
116
117 void yf::Z3950Client::Assoc::connectNotify()
118 {
119     m_waiting = false;
120
121     m_connected = true;
122 }
123
124 void yf::Z3950Client::Assoc::failNotify()
125 {
126     m_waiting = false;
127
128     mp::odr odr;
129
130     if (m_package)
131     {
132         Z_GDU *gdu = m_package->request().get();
133         Z_APDU *apdu = 0;
134         if (gdu && gdu->which == Z_GDU_Z3950)
135             apdu = gdu->u.z3950;
136         
137         m_package->response() = odr.create_close(apdu, Z_Close_peerAbort, 0);
138         m_package->session().close();
139     }
140 }
141
142 void yf::Z3950Client::Assoc::timeoutNotify()
143 {
144     m_time_elapsed++;
145     if ((m_connected && m_time_elapsed >= m_time_max)
146         || (!m_connected && m_time_elapsed >= m_time_connect_max))
147     {
148         m_waiting = false;
149
150         mp::odr odr;
151         
152         if (m_package)
153         {
154             Z_GDU *gdu = m_package->request().get();
155             Z_APDU *apdu = 0;
156             if (gdu && gdu->which == Z_GDU_Z3950)
157                 apdu = gdu->u.z3950;
158         
159             if (m_connected)
160                 m_package->response() =
161                     odr.create_close(apdu, Z_Close_lackOfActivity, 0);
162             else
163                 m_package->response() = 
164                     odr.create_close(apdu, Z_Close_peerAbort, 0);
165                 
166             m_package->session().close();
167         }
168     }
169 }
170
171 void yf::Z3950Client::Assoc::recv_GDU(Z_GDU *gdu, int len)
172 {
173     m_waiting = false;
174
175     if (m_package)
176         m_package->response() = gdu;
177 }
178
179 yazpp_1::IPDU_Observer *yf::Z3950Client::Assoc::sessionNotify(
180     yazpp_1::IPDU_Observable *the_PDU_Observable,
181     int fd)
182 {
183     return 0;
184 }
185
186
187 yf::Z3950Client::Z3950Client() :  m_p(new yf::Z3950Client::Rep)
188 {
189     m_p->m_timeout_sec = 30;
190     m_p->m_max_sockets = 0;
191     m_p->m_force_close = false;
192 }
193
194 yf::Z3950Client::~Z3950Client() {
195 }
196
197 yf::Z3950Client::Assoc *yf::Z3950Client::Rep::get_assoc(Package &package) 
198 {
199     // only one thread messes with the clients list at a time
200     boost::mutex::scoped_lock lock(m_mutex);
201
202     std::map<mp::Session,yf::Z3950Client::Assoc *>::iterator it;
203     
204     Z_GDU *gdu = package.request().get();
205     
206     int max_sockets = package.origin().get_max_sockets();
207     if (max_sockets == 0)
208         max_sockets = m_max_sockets;
209     
210     std::string host;
211
212     it = m_clients.find(package.session());
213     if (it != m_clients.end())
214     {
215         it->second->m_queue_len++;
216         while (true)
217         {
218 #if 0
219             // double init .. NOT working yet
220             if (gdu && gdu->which == Z_GDU_Z3950 &&
221                 gdu->u.z3950->which == Z_APDU_initRequest)
222             {
223                 yazpp_1::SocketManager *s = it->second->m_socket_manager;
224                 delete it->second;  // destroy Z_Assoc
225                 delete s;    // then manager
226                 m_clients.erase(it);
227                 break;
228             }
229 #endif
230             if (!it->second->m_in_use)
231             {
232                 it->second->m_in_use = true;
233                 return it->second;
234             }
235             m_cond_session_ready.wait(lock);
236         }
237     }
238     if (!gdu || gdu->which != Z_GDU_Z3950)
239     {
240         package.move();
241         return 0;
242     }
243     // new Z39.50 session ..
244     Z_APDU *apdu = gdu->u.z3950;
245     // check that it is init. If not, close
246     if (apdu->which != Z_APDU_initRequest)
247     {
248         mp::odr odr;
249         
250         package.response() = odr.create_close(apdu,
251                                               Z_Close_protocolError,
252                                               "First PDU was not an "
253                                               "Initialize Request");
254         package.session().close();
255         return 0;
256     }
257     std::string target = m_force_target;
258     if (!target.length())
259     {
260         target = m_default_target;
261         std::list<std::string> vhosts;
262         mp::util::remove_vhost_otherinfo(&apdu->u.initRequest->otherInfo,
263                                              vhosts);
264         size_t no_vhosts = vhosts.size();
265         if (no_vhosts == 1)
266         {
267             std::list<std::string>::const_iterator v_it = vhosts.begin();
268             target = *v_it;
269         }
270         else if (no_vhosts == 0)
271         {
272             if (!target.length())
273             {
274                 // no default target. So we don't know where to connect
275                 mp::odr odr;
276                 package.response() = odr.create_initResponse(
277                     apdu,
278                     YAZ_BIB1_INIT_NEGOTIATION_OPTION_REQUIRED,
279                     "z3950_client: No vhost given");
280                 
281                 package.session().close();
282                 return 0;
283             }
284         }
285         else if (no_vhosts > 1)
286         {
287             mp::odr odr;
288             package.response() = odr.create_initResponse(
289                 apdu,
290                 YAZ_BIB1_COMBI_OF_SPECIFIED_DATABASES_UNSUPP,
291                 "z3950_client: Can not cope with multiple vhosts");
292             package.session().close();
293             return 0;
294         }
295     }
296     
297     std::list<std::string> dblist;
298     mp::util::split_zurl(target, host, dblist);
299     
300     if (dblist.size())
301     {
302         ; // z3950_client: Databases in vhost ignored
303     }
304     
305     // see if we have reached max number of clients (max-sockets)
306
307     while (max_sockets)
308     {
309         int no_not_in_use = 0;
310         int number = 0;
311         it = m_clients.begin();
312         for (; it != m_clients.end(); it++)
313         {
314             yf::Z3950Client::Assoc *as = it->second;
315             if (!strcmp(as->m_host.c_str(), host.c_str()))
316             {
317                 number++;
318                 if (!as->m_in_use)
319                     no_not_in_use++;
320             }
321         }
322         yaz_log(YLOG_LOG, "Found %d/%d connections for %s", number, max_sockets,
323                 host.c_str());
324         if (number < max_sockets)
325             break;
326         if (no_not_in_use == 0) // all in use..
327         {
328             mp::odr odr;
329             
330             package.response() = odr.create_initResponse(
331                 apdu, YAZ_BIB1_TEMPORARY_SYSTEM_ERROR, "max sessions");
332             package.session().close();
333             return 0;
334         }
335         boost::xtime xt;
336         xtime_get(&xt, boost::TIME_UTC);
337         
338         xt.sec += 15;
339         if (!m_cond_session_ready.timed_wait(lock, xt))
340         {
341             mp::odr odr;
342             
343             package.response() = odr.create_initResponse(
344                 apdu, YAZ_BIB1_TEMPORARY_SYSTEM_ERROR, "max sessions");
345             package.session().close();
346             return 0;
347         }
348     }
349
350     yazpp_1::SocketManager *sm = new yazpp_1::SocketManager;
351     yazpp_1::PDU_Assoc *pdu_as = new yazpp_1::PDU_Assoc(sm);
352     yf::Z3950Client::Assoc *as = new yf::Z3950Client::Assoc(sm, pdu_as,
353                                                             host.c_str(),
354                                                             m_timeout_sec);
355     m_clients[package.session()] = as;
356     return as;
357 }
358
359 void yf::Z3950Client::Rep::send_and_receive(Package &package,
360                                             yf::Z3950Client::Assoc *c)
361 {
362     if (c->m_destroyed)
363         return;
364
365     c->m_package = &package;
366
367     if (package.session().is_closed() && c->m_connected && !c->m_has_closed
368         && m_force_close)
369     {
370         mp::odr odr;
371             
372         package.request() = odr.create_close(
373             0, Z_Close_finished, "z3950_client");
374         c->m_package = 0; // don't inspect response
375     }
376     Z_GDU *gdu = package.request().get();
377
378     if (!gdu || gdu->which != Z_GDU_Z3950)
379         return;
380
381     if (gdu->u.z3950->which == Z_APDU_close)
382         c->m_has_closed = true;
383
384     // prepare connect
385     c->m_time_elapsed = 0;
386     c->m_waiting = true;
387     if (!c->m_connected)
388     {
389         c->client(c->m_host.c_str());
390         c->timeout(1);  // so timeoutNotify gets called once per second
391
392         while (!c->m_destroyed && c->m_waiting 
393                && c->m_socket_manager->processEvent() > 0)
394             ;
395     }
396     if (!c->m_connected)
397     {
398         return;
399     }
400
401     // prepare response
402     c->m_time_elapsed = 0;
403     c->m_waiting = true;
404     
405     // relay the package  ..
406     int len;
407     c->send_GDU(gdu, &len);
408
409     switch (gdu->u.z3950->which)
410     {
411     case Z_APDU_triggerResourceControlRequest:
412         // request only..
413         break;
414     default:
415         // for the rest: wait for a response PDU
416         while (!c->m_destroyed && c->m_waiting
417                && c->m_socket_manager->processEvent() > 0)
418             ;
419         break;
420     }
421 }
422
423 void yf::Z3950Client::Rep::release_assoc(Package &package)
424 {
425     boost::mutex::scoped_lock lock(m_mutex);
426     std::map<mp::Session,yf::Z3950Client::Assoc *>::iterator it;
427     
428     it = m_clients.find(package.session());
429     if (it != m_clients.end())
430     {
431         it->second->m_in_use = false;
432         it->second->m_queue_len--;
433
434         if (package.session().is_closed())
435         {
436             // destroy hint (send_and_receive)
437             it->second->m_destroyed = true;
438             if (it->second->m_queue_len == 0)
439             {
440                 yazpp_1::SocketManager *s = it->second->m_socket_manager;
441                 delete it->second;  // destroy Z_Assoc
442                 delete s;    // then manager
443                 m_clients.erase(it);
444             }
445         }
446         m_cond_session_ready.notify_all();
447     }
448 }
449
450 void yf::Z3950Client::process(Package &package) const
451 {
452     yf::Z3950Client::Assoc *c = m_p->get_assoc(package);
453     if (c)
454     {
455         m_p->send_and_receive(package, c);
456         m_p->release_assoc(package);
457     }
458 }
459
460 void yf::Z3950Client::configure(const xmlNode *ptr, bool test_only)
461 {
462     for (ptr = ptr->children; ptr; ptr = ptr->next)
463     {
464         if (ptr->type != XML_ELEMENT_NODE)
465             continue;
466         if (!strcmp((const char *) ptr->name, "timeout"))
467         {
468             m_p->m_timeout_sec = mp::xml::get_int(ptr, 30);
469         }
470         else if (!strcmp((const char *) ptr->name, "default_target"))
471         {
472             m_p->m_default_target = mp::xml::get_text(ptr);
473         }
474         else if (!strcmp((const char *) ptr->name, "force_target"))
475         {
476             m_p->m_force_target = mp::xml::get_text(ptr);
477         }
478         else if (!strcmp((const char *) ptr->name, "max-sockets"))
479         {
480             m_p->m_max_sockets = mp::xml::get_int(ptr, 0);
481         }
482         else if (!strcmp((const char *) ptr->name, "force_close"))
483         {
484             m_p->m_force_close = mp::xml::get_bool(ptr, 0);
485         }
486         else
487         {
488             throw mp::filter::FilterException("Bad element " 
489                                                + std::string((const char *)
490                                                              ptr->name));
491         }
492     }
493 }
494
495 static mp::filter::Base* filter_creator()
496 {
497     return new mp::filter::Z3950Client;
498 }
499
500 extern "C" {
501     struct metaproxy_1_filter_struct metaproxy_1_filter_z3950_client = {
502         0,
503         "z3950_client",
504         filter_creator
505     };
506 }
507
508 /*
509  * Local variables:
510  * c-basic-offset: 4
511  * c-file-style: "Stroustrup"
512  * indent-tabs-mode: nil
513  * End:
514  * vim: shiftwidth=4 tabstop=8 expandtab
515  */
516