dd16ffe4a0557138700e71156a4f771f1c8cedbc
[metaproxy-moved-to-github.git] / src / filter_z3950_client.cpp
1 /* This file is part of Metaproxy.
2    Copyright (C) Index Data
3
4 Metaproxy is free software; you can redistribute it and/or modify it under
5 the terms of the GNU General Public License as published by the Free
6 Software Foundation; either version 2, or (at your option) any later
7 version.
8
9 Metaproxy is distributed in the hope that it will be useful, but WITHOUT ANY
10 WARRANTY; without even the implied warranty of MERCHANTABILITY or
11 FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
12 for more details.
13
14 You should have received a copy of the GNU General Public License
15 along with this program; if not, write to the Free Software
16 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
17 */
18
19 #include "config.hpp"
20
21 #include "filter_z3950_client.hpp"
22 #include <metaproxy/package.hpp>
23 #include <metaproxy/util.hpp>
24
25 #include <map>
26 #include <stdexcept>
27 #include <list>
28 #include <iostream>
29
30 #include <boost/thread/mutex.hpp>
31 #include <boost/thread/condition.hpp>
32 #include <boost/thread/xtime.hpp>
33
34 #include <yaz/zgdu.h>
35 #include <yaz/log.h>
36 #include <yaz/otherinfo.h>
37 #include <yaz/diagbib1.h>
38 #include <yaz/oid_db.h>
39 #include <yaz/charneg.h>
40
41 #include <yazpp/socket-manager.h>
42 #include <yazpp/pdu-assoc.h>
43 #include <yazpp/z-assoc.h>
44
45 namespace mp = metaproxy_1;
46 namespace yf = mp::filter;
47
48 namespace metaproxy_1 {
49     namespace filter {
50         class Z3950Client::Assoc : public yazpp_1::Z_Assoc{
51             friend class Rep;
52             Assoc(yazpp_1::SocketManager *socket_manager,
53                   yazpp_1::IPDU_Observable *PDU_Observable,
54                   std::string host, int timeout);
55             ~Assoc();
56             void connectNotify();
57             void failNotify();
58             void timeoutNotify();
59             void recv_GDU(Z_GDU *gdu, int len);
60             void fixup_nsd(ODR odr, Z_Records *records);
61             void fixup_nsd(ODR odr, Z_DefaultDiagFormat *nsd);
62             void fixup_init(ODR odr, Z_InitResponse *initrs);
63             yazpp_1::IPDU_Observer* sessionNotify(
64                 yazpp_1::IPDU_Observable *the_PDU_Observable,
65                 int fd);
66
67             yazpp_1::SocketManager *m_socket_manager;
68             yazpp_1::IPDU_Observable *m_PDU_Observable;
69             Package *m_package;
70             bool m_in_use;
71             bool m_waiting;
72             bool m_destroyed;
73             bool m_connected;
74             bool m_has_closed;
75             int m_queue_len;
76             int m_time_elapsed;
77             int m_time_max;
78             int m_time_connect_max;
79             std::string m_host;
80         };
81
82         class Z3950Client::Rep {
83         public:
84             // number of seconds to wait before we give up request
85             int m_timeout_sec;
86             int m_max_sockets;
87             bool m_force_close;
88             bool m_client_ip;
89             bool m_bind_host;
90             std::string m_charset;
91             std::string m_default_target;
92             std::string m_force_target;
93             boost::mutex m_mutex;
94             boost::condition m_cond_session_ready;
95             std::map<mp::Session,Z3950Client::Assoc *> m_clients;
96             Z3950Client::Assoc *get_assoc(Package &package);
97             void send_and_receive(Package &package,
98                                   yf::Z3950Client::Assoc *c);
99             void release_assoc(Package &package);
100         };
101     }
102 }
103
104 using namespace mp;
105
106 yf::Z3950Client::Assoc::Assoc(yazpp_1::SocketManager *socket_manager,
107                               yazpp_1::IPDU_Observable *PDU_Observable,
108                               std::string host, int timeout_sec)
109     :  Z_Assoc(PDU_Observable),
110        m_socket_manager(socket_manager), m_PDU_Observable(PDU_Observable),
111        m_package(0), m_in_use(true), m_waiting(false),
112        m_destroyed(false), m_connected(false), m_has_closed(false),
113        m_queue_len(1),
114        m_time_elapsed(0), m_time_max(timeout_sec),  m_time_connect_max(10),
115        m_host(host)
116 {
117     // std::cout << "create assoc " << this << "\n";
118 }
119
120 yf::Z3950Client::Assoc::~Assoc()
121 {
122     // std::cout << "destroy assoc " << this << "\n";
123 }
124
125 void yf::Z3950Client::Assoc::connectNotify()
126 {
127     m_waiting = false;
128
129     m_connected = true;
130 }
131
132 void yf::Z3950Client::Assoc::failNotify()
133 {
134     m_waiting = false;
135
136     mp::odr odr;
137
138     if (m_package)
139     {
140         Z_GDU *gdu = m_package->request().get();
141         Z_APDU *apdu = 0;
142         if (gdu && gdu->which == Z_GDU_Z3950)
143             apdu = gdu->u.z3950;
144
145         m_package->response() = odr.create_close(apdu, Z_Close_peerAbort, 0);
146         m_package->session().close();
147     }
148 }
149
150 void yf::Z3950Client::Assoc::timeoutNotify()
151 {
152     m_time_elapsed++;
153     if ((m_connected && m_time_elapsed >= m_time_max)
154         || (!m_connected && m_time_elapsed >= m_time_connect_max))
155     {
156         m_waiting = false;
157
158         mp::odr odr;
159
160         if (m_package)
161         {
162             Z_GDU *gdu = m_package->request().get();
163             Z_APDU *apdu = 0;
164             if (gdu && gdu->which == Z_GDU_Z3950)
165                 apdu = gdu->u.z3950;
166
167             if (m_connected)
168                 m_package->response() =
169                     odr.create_close(apdu, Z_Close_lackOfActivity, 0);
170             else
171                 m_package->response() =
172                     odr.create_close(apdu, Z_Close_peerAbort, 0);
173
174             m_package->session().close();
175         }
176     }
177 }
178
179 void yf::Z3950Client::Assoc::fixup_nsd(ODR odr, Z_DefaultDiagFormat *nsd)
180 {
181     std::string addinfo;
182
183     // should really check for nsd->which.. But union has two members
184     // containing almost same data
185     const char *v2Addinfo = nsd->u.v2Addinfo;
186     //  Z_InternationalString *v3Addinfo;
187     if (v2Addinfo && *v2Addinfo)
188     {
189         addinfo.assign(nsd->u.v2Addinfo);
190         addinfo += " ";
191     }
192     addinfo += "(backend=" + m_host + ")";
193     nsd->u.v2Addinfo = odr_strdup(odr, addinfo.c_str());
194 }
195
196 void yf::Z3950Client::Assoc::fixup_nsd(ODR odr, Z_Records *records)
197 {
198     if (records && records->which == Z_Records_NSD)
199     {
200         fixup_nsd(odr, records->u.nonSurrogateDiagnostic);
201     }
202     if (records && records->which == Z_Records_multipleNSD)
203     {
204         Z_DiagRecs *drecs = records->u.multipleNonSurDiagnostics;
205         int i;
206         for (i = 0; i < drecs->num_diagRecs; i++)
207         {
208             Z_DiagRec *dr = drecs->diagRecs[i];
209
210             if (dr->which == Z_DiagRec_defaultFormat)
211                 fixup_nsd(odr, dr->u.defaultFormat);
212         }
213     }
214 }
215
216 void yf::Z3950Client::Assoc::fixup_init(ODR odr, Z_InitResponse *initrs)
217 {
218     Z_External *uif = initrs->userInformationField;
219
220     if (uif && uif->which == Z_External_userInfo1)
221     {
222         Z_OtherInformation *ui = uif->u.userInfo1;
223         int i;
224         for (i = 0; i < ui->num_elements; i++)
225         {
226             Z_OtherInformationUnit *unit = ui->list[i];
227             if (unit->which == Z_OtherInfo_externallyDefinedInfo &&
228                 unit->information.externallyDefinedInfo &&
229                 unit->information.externallyDefinedInfo->which ==
230                 Z_External_diag1)
231             {
232                 Z_DiagnosticFormat *diag =
233                     unit->information.externallyDefinedInfo->u.diag1;
234                 int j;
235                 for (j = 0; j < diag->num; j++)
236                 {
237                     Z_DiagnosticFormat_s *ds = diag->elements[j];
238                     if (ds->which == Z_DiagnosticFormat_s_defaultDiagRec)
239                     {
240                         Z_DefaultDiagFormat *r = ds->u.defaultDiagRec;
241                         char *oaddinfo = r->u.v2Addinfo;
242                         char *naddinfo = (char *) odr_malloc(
243                             odr,
244                             (oaddinfo ? strlen(oaddinfo) : 0) + 20 +
245                             m_host.length());
246                         *naddinfo = '\0';
247                         if (oaddinfo && *oaddinfo)
248                         {
249                             strcat(naddinfo, oaddinfo);
250                             strcat(naddinfo, " ");
251                         }
252                         strcat(naddinfo, "(backend=");
253                         strcat(naddinfo, m_host.c_str());
254                         strcat(naddinfo, ")");
255
256                         r->u.v2Addinfo = naddinfo;
257                     }
258                 }
259             }
260         }
261     }
262 }
263
264 void yf::Z3950Client::Assoc::recv_GDU(Z_GDU *gdu, int len)
265 {
266     m_waiting = false;
267
268     if (m_package)
269     {
270         mp::odr odr; // must be in scope for response() = assignment
271         if (gdu && gdu->which == Z_GDU_Z3950)
272         {
273             Z_APDU *apdu = gdu->u.z3950;
274             switch (apdu->which)
275             {
276             case Z_APDU_searchResponse:
277                 fixup_nsd(odr, apdu->u.searchResponse->records);
278                 break;
279             case Z_APDU_presentResponse:
280                 fixup_nsd(odr, apdu->u.presentResponse->records);
281                 break;
282             case Z_APDU_initResponse:
283                 fixup_init(odr, apdu->u.initResponse);
284                 break;
285             }
286         }
287         m_package->response() = gdu;
288     }
289 }
290
291 yazpp_1::IPDU_Observer *yf::Z3950Client::Assoc::sessionNotify(
292     yazpp_1::IPDU_Observable *the_PDU_Observable,
293     int fd)
294 {
295     return 0;
296 }
297
298
299 yf::Z3950Client::Z3950Client() :  m_p(new yf::Z3950Client::Rep)
300 {
301     m_p->m_timeout_sec = 30;
302     m_p->m_max_sockets = 0;
303     m_p->m_force_close = false;
304     m_p->m_client_ip = false;
305     m_p->m_bind_host = false;
306 }
307
308 yf::Z3950Client::~Z3950Client() {
309 }
310
311 yf::Z3950Client::Assoc *yf::Z3950Client::Rep::get_assoc(Package &package)
312 {
313     // only one thread messes with the clients list at a time
314     boost::mutex::scoped_lock lock(m_mutex);
315
316     std::map<mp::Session,yf::Z3950Client::Assoc *>::iterator it;
317
318     Z_GDU *gdu = package.request().get();
319
320     int max_sockets = package.origin().get_max_sockets();
321     if (max_sockets == 0)
322         max_sockets = m_max_sockets;
323
324     it = m_clients.find(package.session());
325     if (it != m_clients.end())
326     {
327         it->second->m_queue_len++;
328         while (true)
329         {
330 #if 0
331             // double init .. NOT working yet
332             if (gdu && gdu->which == Z_GDU_Z3950 &&
333                 gdu->u.z3950->which == Z_APDU_initRequest)
334             {
335                 yazpp_1::SocketManager *s = it->second->m_socket_manager;
336                 delete it->second;  // destroy Z_Assoc
337                 delete s;    // then manager
338                 m_clients.erase(it);
339                 break;
340             }
341 #endif
342             if (!it->second->m_in_use)
343             {
344                 it->second->m_in_use = true;
345                 return it->second;
346             }
347             m_cond_session_ready.wait(lock);
348         }
349     }
350     if (!gdu || gdu->which != Z_GDU_Z3950)
351     {
352         package.move();
353         return 0;
354     }
355     // new Z39.50 session ..
356     Z_APDU *apdu = gdu->u.z3950;
357     // check that it is init. If not, close
358     if (apdu->which != Z_APDU_initRequest)
359     {
360         mp::odr odr;
361
362         package.response() = odr.create_close(apdu,
363                                               Z_Close_protocolError,
364                                               "First PDU was not an "
365                                               "Initialize Request");
366         package.session().close();
367         return 0;
368     }
369     std::string target = m_force_target;
370     if (!target.length())
371     {
372         target = m_default_target;
373         std::list<std::string> vhosts;
374         mp::util::remove_vhost_otherinfo(&apdu->u.initRequest->otherInfo,
375                                              vhosts);
376         size_t no_vhosts = vhosts.size();
377         if (no_vhosts == 1)
378         {
379             std::list<std::string>::const_iterator v_it = vhosts.begin();
380             target = *v_it;
381         }
382         else if (no_vhosts == 0)
383         {
384             if (!target.length())
385             {
386                 // no default target. So we don't know where to connect
387                 mp::odr odr;
388                 package.response() = odr.create_initResponse(
389                     apdu,
390                     YAZ_BIB1_INIT_NEGOTIATION_OPTION_REQUIRED,
391                     "z3950_client: No vhost given");
392
393                 package.session().close();
394                 return 0;
395             }
396         }
397         else if (no_vhosts > 1)
398         {
399             mp::odr odr;
400             package.response() = odr.create_initResponse(
401                 apdu,
402                 YAZ_BIB1_COMBI_OF_SPECIFIED_DATABASES_UNSUPP,
403                 "z3950_client: Can not cope with multiple vhosts");
404             package.session().close();
405             return 0;
406         }
407     }
408
409     // see if we have reached max number of clients (max-sockets)
410
411     while (max_sockets)
412     {
413         int no_not_in_use = 0;
414         int number = 0;
415         it = m_clients.begin();
416         for (; it != m_clients.end(); it++)
417         {
418             yf::Z3950Client::Assoc *as = it->second;
419             if (!strcmp(as->m_host.c_str(), target.c_str()))
420             {
421                 number++;
422                 if (!as->m_in_use)
423                     no_not_in_use++;
424             }
425         }
426         yaz_log(YLOG_LOG, "Found %d/%d connections for %s", number, max_sockets,
427                 target.c_str());
428         if (number < max_sockets)
429             break;
430         if (no_not_in_use == 0) // all in use..
431         {
432             mp::odr odr;
433
434             package.response() = odr.create_initResponse(
435                 apdu, YAZ_BIB1_TEMPORARY_SYSTEM_ERROR,
436                 "z3950_client: max sessions");
437             package.session().close();
438             return 0;
439         }
440         boost::xtime xt;
441         xtime_get(&xt,
442 #if BOOST_VERSION >= 105000 
443                 boost::TIME_UTC_
444 #else
445                 boost::TIME_UTC
446 #endif 
447                 );
448
449         xt.sec += 15;
450         if (!m_cond_session_ready.timed_wait(lock, xt))
451         {
452             mp::odr odr;
453
454             package.response() = odr.create_initResponse(
455                 apdu, YAZ_BIB1_TEMPORARY_SYSTEM_ERROR,
456                 "z3950_client: max sessions");
457             package.session().close();
458             return 0;
459         }
460     }
461
462     yazpp_1::SocketManager *sm = new yazpp_1::SocketManager;
463     yazpp_1::PDU_Assoc *pdu_as = new yazpp_1::PDU_Assoc(sm);
464     yf::Z3950Client::Assoc *as = new yf::Z3950Client::Assoc(sm, pdu_as,
465                                                             target.c_str(),
466                                                             m_timeout_sec);
467     m_clients[package.session()] = as;
468     return as;
469 }
470
471 static void set_charset_proposal(ODR odr, Z_InitRequest *req, const char *charset)
472 {
473     Z_OtherInformation **p = &req->otherInfo;
474     Z_OtherInformationUnit *oi;
475
476     if (*p)
477     {
478         int i;
479         for (i = 0; i < (*p)->num_elements; i++)
480         {
481             Z_External *ext = (*p)->list[i]->information.externallyDefinedInfo;
482             if ((*p)->list[i]->which == Z_OtherInfo_externallyDefinedInfo
483                 && ext &&
484                 ext->which == Z_External_charSetandLanguageNegotiation)
485                 return;
486         }
487     }
488     if ((oi = yaz_oi_update(p, odr, 0, 0, 0)))
489     {
490         ODR_MASK_SET(req->options, Z_Options_negotiationModel);
491         oi->which = Z_OtherInfo_externallyDefinedInfo;
492         oi->information.externallyDefinedInfo =
493             yaz_set_proposal_charneg_list(odr, ",",
494                                           charset,
495                                           0 /* lang */,
496                                           1 /* records included */);
497     }
498 }
499
500 void yf::Z3950Client::Rep::send_and_receive(Package &package,
501                                             yf::Z3950Client::Assoc *c)
502 {
503     if (c->m_destroyed)
504         return;
505
506     c->m_package = &package;
507
508     if (package.session().is_closed() && c->m_connected && !c->m_has_closed
509         && m_force_close)
510     {
511         mp::odr odr;
512
513         package.request() = odr.create_close(
514             0, Z_Close_finished, "z3950_client");
515         c->m_package = 0; // don't inspect response
516     }
517     Z_GDU *gdu = package.request().get();
518
519     if (!gdu || gdu->which != Z_GDU_Z3950)
520         return;
521
522     if (gdu->u.z3950->which == Z_APDU_close)
523         c->m_has_closed = true;
524
525     Z_APDU *apdu = gdu->u.z3950;
526
527     // prepare connect
528     c->m_time_elapsed = 0;
529     c->m_waiting = true;
530     if (!c->m_connected)
531     {
532         std::string host(c->m_host);
533
534         if (m_bind_host)
535         {
536             std::string bind_host = package.origin().get_bind_address();
537             if (bind_host.length())
538             {
539                 host.append(" ");
540                 host.append(bind_host);
541             }
542         }
543         if (c->client(host.c_str()))
544         {
545             mp::odr odr;
546             package.response() =
547                 odr.create_close(gdu->u.z3950, Z_Close_peerAbort, 0);
548             package.session().close();
549             return;
550         }
551         c->timeout(1);  // so timeoutNotify gets called once per second
552
553
554         while (!c->m_destroyed && c->m_waiting
555                && c->m_socket_manager->processEvent() > 0)
556             ;
557     }
558     if (!c->m_connected)
559     {
560         return;
561     }
562     mp::odr odr;
563     if (m_client_ip)
564     {
565         std::string peer_name2 = package.origin().get_address();
566         if (apdu->which == Z_APDU_initRequest && peer_name2.length())
567         {
568             Z_OtherInformation **oi = &apdu->u.initRequest->otherInfo;
569             char *peer_name1 =
570                 yaz_oi_get_string_oid(oi, yaz_oid_userinfo_client_ip, 1, 1);
571             std::string pcomb;
572             if (peer_name1)
573             {
574                 pcomb.append(peer_name1);
575                 pcomb.append(", ");
576             }
577             pcomb.append(peer_name2);
578             yaz_oi_set_string_oid(&apdu->u.initRequest->otherInfo,
579                                   odr, yaz_oid_userinfo_client_ip,
580                                   1, pcomb.c_str());
581         }
582     }
583     if (apdu->which == Z_APDU_initRequest && m_charset.length() > 0)
584         set_charset_proposal(odr, apdu->u.initRequest, m_charset.c_str());
585
586     // prepare response
587     c->m_time_elapsed = 0;
588     c->m_waiting = true;
589
590     // relay the package  ..
591     int len;
592     c->send_GDU(gdu, &len);
593
594     switch (gdu->u.z3950->which)
595     {
596     case Z_APDU_triggerResourceControlRequest:
597         // request only..
598         break;
599     default:
600         // for the rest: wait for a response PDU
601         while (!c->m_destroyed && c->m_waiting
602                && c->m_socket_manager->processEvent() > 0)
603             ;
604         break;
605     }
606 }
607
608 void yf::Z3950Client::Rep::release_assoc(Package &package)
609 {
610     boost::mutex::scoped_lock lock(m_mutex);
611     std::map<mp::Session,yf::Z3950Client::Assoc *>::iterator it;
612
613     it = m_clients.find(package.session());
614     if (it != m_clients.end())
615     {
616         it->second->m_in_use = false;
617         it->second->m_queue_len--;
618
619         if (package.session().is_closed())
620         {
621             // destroy hint (send_and_receive)
622             it->second->m_destroyed = true;
623             if (it->second->m_queue_len == 0)
624             {
625                 yazpp_1::SocketManager *s = it->second->m_socket_manager;
626                 delete it->second;  // destroy Z_Assoc
627                 delete s;    // then manager
628                 m_clients.erase(it);
629             }
630         }
631         m_cond_session_ready.notify_all();
632     }
633 }
634
635 void yf::Z3950Client::process(Package &package) const
636 {
637     yf::Z3950Client::Assoc *c = m_p->get_assoc(package);
638     if (c)
639     {
640         m_p->send_and_receive(package, c);
641         m_p->release_assoc(package);
642     }
643 }
644
645 void yf::Z3950Client::configure(const xmlNode *ptr, bool test_only,
646                                 const char *path)
647 {
648     for (ptr = ptr->children; ptr; ptr = ptr->next)
649     {
650         if (ptr->type != XML_ELEMENT_NODE)
651             continue;
652         if (!strcmp((const char *) ptr->name, "timeout"))
653         {
654             m_p->m_timeout_sec = mp::xml::get_int(ptr, 30);
655         }
656         else if (!strcmp((const char *) ptr->name, "default_target"))
657         {
658             m_p->m_default_target = mp::xml::get_text(ptr);
659         }
660         else if (!strcmp((const char *) ptr->name, "force_target"))
661         {
662             m_p->m_force_target = mp::xml::get_text(ptr);
663         }
664         else if (!strcmp((const char *) ptr->name, "max-sockets"))
665         {
666             m_p->m_max_sockets = mp::xml::get_int(ptr, 0);
667         }
668         else if (!strcmp((const char *) ptr->name, "force_close"))
669         {
670             m_p->m_force_close = mp::xml::get_bool(ptr, 0);
671         }
672         else if (!strcmp((const char *) ptr->name, "client_ip"))
673         {
674             m_p->m_client_ip = mp::xml::get_bool(ptr, 0);
675         }
676         else if (!strcmp((const char *) ptr->name, "charset"))
677         {
678             m_p->m_charset = mp::xml::get_text(ptr);
679         }
680         else if (!strcmp((const char *) ptr->name, "bind_host"))
681         {
682             m_p->m_bind_host = mp::xml::get_bool(ptr, 0);
683         }
684         else
685         {
686             throw mp::filter::FilterException("Bad element "
687                                                + std::string((const char *)
688                                                              ptr->name));
689         }
690     }
691 }
692
693 static mp::filter::Base* filter_creator()
694 {
695     return new mp::filter::Z3950Client;
696 }
697
698 extern "C" {
699     struct metaproxy_1_filter_struct metaproxy_1_filter_z3950_client = {
700         0,
701         "z3950_client",
702         filter_creator
703     };
704 }
705
706 /*
707  * Local variables:
708  * c-basic-offset: 4
709  * c-file-style: "Stroustrup"
710  * indent-tabs-mode: nil
711  * End:
712  * vim: shiftwidth=4 tabstop=8 expandtab
713  */
714