Put proper reference IDs in response PDUs properly.
[metaproxy-moved-to-github.git] / src / filter_z3950_client.cpp
1 /* $Id: filter_z3950_client.cpp,v 1.20 2006-01-13 15:09:35 adam Exp $
2    Copyright (c) 2005, Index Data.
3
4 %LICENSE%
5  */
6
7 #include "config.hpp"
8
9 #include "filter.hpp"
10 #include "package.hpp"
11 #include "util.hpp"
12 #include "filter_z3950_client.hpp"
13
14 #include <map>
15 #include <stdexcept>
16 #include <list>
17 #include <iostream>
18
19 #include <boost/thread/mutex.hpp>
20 #include <boost/thread/condition.hpp>
21
22 #include <yaz/zgdu.h>
23 #include <yaz/log.h>
24 #include <yaz/otherinfo.h>
25 #include <yaz/diagbib1.h>
26
27 #include <yaz++/socket-manager.h>
28 #include <yaz++/pdu-assoc.h>
29 #include <yaz++/z-assoc.h>
30
31 namespace yf = yp2::filter;
32
33 namespace yp2 {
34     namespace filter {
35         class Z3950Client::Assoc : public yazpp_1::Z_Assoc{
36             friend class Rep;
37             Assoc(yazpp_1::SocketManager *socket_manager,
38                   yazpp_1::IPDU_Observable *PDU_Observable,
39                   std::string host, int timeout);
40             ~Assoc();
41             void connectNotify();
42             void failNotify();
43             void timeoutNotify();
44             void recv_GDU(Z_GDU *gdu, int len);
45             yazpp_1::IPDU_Observer* sessionNotify(
46                 yazpp_1::IPDU_Observable *the_PDU_Observable,
47                 int fd);
48
49             yazpp_1::SocketManager *m_socket_manager;
50             yazpp_1::IPDU_Observable *m_PDU_Observable;
51             Package *m_package;
52             bool m_in_use;
53             bool m_waiting;
54             bool m_destroyed;
55             bool m_connected;
56             int m_queue_len;
57             int m_time_elapsed;
58             int m_time_max;
59             std::string m_host;
60         };
61
62         class Z3950Client::Rep {
63         public:
64             int m_timeout_sec;
65             boost::mutex m_mutex;
66             boost::condition m_cond_session_ready;
67             std::map<yp2::Session,Z3950Client::Assoc *> m_clients;
68             Z3950Client::Assoc *get_assoc(Package &package);
69             void send_and_receive(Package &package,
70                                   yf::Z3950Client::Assoc *c);
71             void release_assoc(Package &package);
72         };
73     }
74 }
75
76 using namespace yp2;
77
78 yf::Z3950Client::Assoc::Assoc(yazpp_1::SocketManager *socket_manager,
79                               yazpp_1::IPDU_Observable *PDU_Observable,
80                               std::string host, int timeout_sec)
81     :  Z_Assoc(PDU_Observable),
82        m_socket_manager(socket_manager), m_PDU_Observable(PDU_Observable),
83        m_package(0), m_in_use(true), m_waiting(false), 
84        m_destroyed(false), m_connected(false), m_queue_len(1),
85        m_time_elapsed(0), m_time_max(timeout_sec), 
86        m_host(host)
87 {
88     // std::cout << "create assoc " << this << "\n";
89 }
90
91 yf::Z3950Client::Assoc::~Assoc()
92 {
93     // std::cout << "destroy assoc " << this << "\n";
94 }
95
96 void yf::Z3950Client::Assoc::connectNotify()
97 {
98     m_waiting = false;
99
100     m_connected = true;
101 }
102
103 void yf::Z3950Client::Assoc::failNotify()
104 {
105     m_waiting = false;
106
107     yp2::odr odr;
108
109     if (m_package)
110     {
111         Z_GDU *gdu = m_package->request().get();
112         Z_APDU *apdu = 0;
113         if (gdu && gdu->which == Z_GDU_Z3950)
114             apdu = gdu->u.z3950;
115         
116         m_package->response() = odr.create_close(apdu, Z_Close_peerAbort, 0);
117         m_package->session().close();
118     }
119 }
120
121 void yf::Z3950Client::Assoc::timeoutNotify()
122 {
123     m_time_elapsed++;
124     if (m_time_elapsed >= m_time_max)
125     {
126         m_waiting = false;
127
128         yp2::odr odr;
129         
130         if (m_package)
131         {
132             Z_GDU *gdu = m_package->request().get();
133             Z_APDU *apdu = 0;
134             if (gdu && gdu->which == Z_GDU_Z3950)
135                 apdu = gdu->u.z3950;
136         
137             if (m_connected)
138                 m_package->response() =
139                     odr.create_close(apdu, Z_Close_lackOfActivity, 0);
140             else
141                 m_package->response() = 
142                     odr.create_close(apdu, Z_Close_peerAbort, 0);
143                 
144             m_package->session().close();
145         }
146     }
147 }
148
149 void yf::Z3950Client::Assoc::recv_GDU(Z_GDU *gdu, int len)
150 {
151     m_waiting = false;
152
153     if (m_package)
154         m_package->response() = gdu;
155 }
156
157 yazpp_1::IPDU_Observer *yf::Z3950Client::Assoc::sessionNotify(
158     yazpp_1::IPDU_Observable *the_PDU_Observable,
159     int fd)
160 {
161     return 0;
162 }
163
164
165 yf::Z3950Client::Z3950Client() :  m_p(new yf::Z3950Client::Rep)
166 {
167     m_p->m_timeout_sec = 30;
168 }
169
170 yf::Z3950Client::~Z3950Client() {
171 }
172
173 yf::Z3950Client::Assoc *yf::Z3950Client::Rep::get_assoc(Package &package) 
174 {
175     // only one thread messes with the clients list at a time
176     boost::mutex::scoped_lock lock(m_mutex);
177
178     std::map<yp2::Session,yf::Z3950Client::Assoc *>::iterator it;
179     
180     Z_GDU *gdu = package.request().get();
181     // only deal with Z39.50
182     if (!gdu || gdu->which != Z_GDU_Z3950)
183     {
184         package.move();
185         return 0;
186     }
187     it = m_clients.find(package.session());
188     if (it != m_clients.end())
189     {
190         it->second->m_queue_len++;
191         while(true)
192         {
193 #if 0
194             if (gdu && gdu->which == Z_GDU_Z3950 &&
195                 gdu->u.z3950->which == Z_APDU_initRequest)
196             {
197                 yazpp_1::SocketManager *s = it->second->m_socket_manager;
198                 delete it->second;  // destroy Z_Assoc
199                 delete s;    // then manager
200                 m_clients.erase(it);
201                 break;
202             }
203 #endif
204             if (!it->second->m_in_use)
205             {
206                 it->second->m_in_use = true;
207                 return it->second;
208             }
209             m_cond_session_ready.wait(lock);
210         }
211     }
212     // new Z39.50 session ..
213     Z_APDU *apdu = gdu->u.z3950;
214     // check that it is init. If not, close
215     if (apdu->which != Z_APDU_initRequest)
216     {
217         yp2::odr odr;
218         
219         package.response() = odr.create_close(apdu,
220                                               Z_Close_protocolError,
221                                               "First PDU was not an "
222                                               "Initialize Request");
223         package.session().close();
224         return 0;
225     }
226     // check virtual host
227     const char *vhost =
228         yaz_oi_get_string_oidval(&apdu->u.initRequest->otherInfo,
229                                  VAL_PROXY, 
230                                  /* categoryValue */ 1, /* delete */ 1);
231     if (!vhost)
232     {
233         yp2::odr odr;
234         package.response() = odr.create_initResponse(
235             apdu,
236             YAZ_BIB1_INIT_NEGOTIATION_OPTION_REQUIRED,
237             "Virtual host not given");
238         
239         package.session().close();
240         return 0;
241     }
242                       
243     yazpp_1::SocketManager *sm = new yazpp_1::SocketManager;
244     yazpp_1::PDU_Assoc *pdu_as = new yazpp_1::PDU_Assoc(sm);
245     yf::Z3950Client::Assoc *as = new yf::Z3950Client::Assoc(sm, pdu_as, vhost,
246                                                             m_timeout_sec);
247     m_clients[package.session()] = as;
248     return as;
249 }
250
251 void yf::Z3950Client::Rep::send_and_receive(Package &package,
252                                             yf::Z3950Client::Assoc *c)
253 {
254     Z_GDU *gdu = package.request().get();
255
256     if (c->m_destroyed)
257         return;
258
259     if (!gdu || gdu->which != Z_GDU_Z3950)
260         return;
261
262     c->m_time_elapsed = 0;
263     c->m_package = &package;
264     c->m_waiting = true;
265     if (!c->m_connected)
266     {
267         c->client(c->m_host.c_str());
268         c->timeout(1);
269
270         while (!c->m_destroyed && c->m_waiting 
271                && c->m_socket_manager->processEvent() > 0)
272             ;
273     }
274     if (!c->m_connected)
275     {
276         return;
277     }
278
279     // prepare response
280     c->m_time_elapsed = 0;
281     c->m_waiting = true;
282     
283     // relay the package  ..
284     int len;
285     c->send_GDU(gdu, &len);
286
287     switch(gdu->u.z3950->which)
288     {
289     case Z_APDU_triggerResourceControlRequest:
290         // request only..
291         break;
292     default:
293         // for the rest: wait for a response PDU
294         while (!c->m_destroyed && c->m_waiting
295                && c->m_socket_manager->processEvent() > 0)
296             ;
297         break;
298     }
299 }
300
301 void yf::Z3950Client::Rep::release_assoc(Package &package)
302 {
303     boost::mutex::scoped_lock lock(m_mutex);
304     std::map<yp2::Session,yf::Z3950Client::Assoc *>::iterator it;
305     
306     it = m_clients.find(package.session());
307     if (it != m_clients.end())
308     {
309         Z_GDU *gdu = package.request().get();
310         if (gdu && gdu->which == Z_GDU_Z3950)
311         {   // only Z39.50 packages lock in get_assoc.. release it
312             it->second->m_in_use = false;
313             it->second->m_queue_len--;
314         }
315
316         if (package.session().is_closed())
317         {
318             // destroy hint (send_and_receive)
319             it->second->m_destroyed = true;
320             
321             // wait until no one is waiting for it.
322             while (it->second->m_queue_len)
323                 m_cond_session_ready.wait(lock);
324             
325             // the Z_Assoc and PDU_Assoc must be destroyed before
326             // the socket manager.. so pull that out.. first..
327             yazpp_1::SocketManager *s = it->second->m_socket_manager;
328             delete it->second;  // destroy Z_Assoc
329             delete s;    // then manager
330             m_clients.erase(it);
331         }
332         m_cond_session_ready.notify_all();
333     }
334 }
335
336 void yf::Z3950Client::process(Package &package) const
337 {
338     yf::Z3950Client::Assoc *c = m_p->get_assoc(package);
339     if (c)
340     {
341         m_p->send_and_receive(package, c);
342     }
343     m_p->release_assoc(package);
344 }
345
346 void yf::Z3950Client::configure(const xmlNode *ptr)
347 {
348     for (ptr = ptr->children; ptr; ptr = ptr->next)
349     {
350         if (ptr->type != XML_ELEMENT_NODE)
351             continue;
352         if (!strcmp((const char *) ptr->name, "timeout"))
353         {
354             std::string timeout_str = yp2::xml::get_text(ptr);
355             int timeout_sec = atoi(timeout_str.c_str());
356             if (timeout_sec < 2)
357                 throw yp2::filter::FilterException("Bad timeout value " 
358                                                    + timeout_str);
359             m_p->m_timeout_sec = timeout_sec;
360         }
361         else
362         {
363             throw yp2::filter::FilterException("Bad element " 
364                                                + std::string((const char *)
365                                                              ptr->name));
366         }
367     }
368 }
369
370 static yp2::filter::Base* filter_creator()
371 {
372     return new yp2::filter::Z3950Client;
373 }
374
375 extern "C" {
376     struct yp2_filter_struct yp2_filter_z3950_client = {
377         0,
378         "z3950_client",
379         filter_creator
380     };
381 }
382
383 /*
384  * Local variables:
385  * c-basic-offset: 4
386  * indent-tabs-mode: nil
387  * c-file-style: "stroustrup"
388  * End:
389  * vim: shiftwidth=4 tabstop=8 expandtab
390  */