Put Virt_db private classes inside Virt_db scope
[metaproxy-moved-to-github.git] / src / filter_virt_db.cpp
1 /* $Id: filter_virt_db.cpp,v 1.22 2006-01-12 14:45:04 adam Exp $
2    Copyright (c) 2005, Index Data.
3
4 %LICENSE%
5  */
6
7 #include "config.hpp"
8
9 #include "filter.hpp"
10 #include "package.hpp"
11
12 #include <boost/thread/mutex.hpp>
13 #include <boost/thread/condition.hpp>
14
15 #include "util.hpp"
16 #include "filter_virt_db.hpp"
17
18 #include <yaz/zgdu.h>
19 #include <yaz/otherinfo.h>
20 #include <yaz/diagbib1.h>
21
22 #include <map>
23 #include <iostream>
24
25 namespace yf = yp2::filter;
26
27 namespace yp2 {
28     namespace filter {
29         struct Virt_db::Set {
30             Set(yp2::Session &id, std::string setname,
31                 std::string vhost, std::string route,
32                 bool named_result_sets);
33             Set();
34             ~Set();
35
36             yp2::Session m_backend_session;
37             std::string m_backend_setname;
38             std::string m_vhost;
39             std::string m_route;
40             bool m_named_result_sets;
41         };
42         struct Virt_db::Map {
43             Map(std::string vhost, std::string route);
44             Map();
45             std::string m_vhost;
46             std::string m_route;
47         };
48         struct Virt_db::Frontend {
49             Frontend();
50             ~Frontend();
51             yp2::Session m_session;
52             bool m_is_virtual;
53             bool m_in_use;
54             std::map<std::string,Virt_db::Set> m_sets;
55             void search(Package &package, Z_APDU *apdu,
56                         const std::map<std::string, Virt_db::Map> &maps);
57             void present(Package &package, Z_APDU *apdu);
58             void close(Package &package);
59             typedef std::map<std::string,Virt_db::Set>::iterator Sets_it;
60         };            
61         class Virt_db::Rep {
62             friend class Virt_db;
63             
64             Frontend *get_frontend(Package &package);
65             void release_frontend(Package &package);
66         private:
67             boost::mutex m_sessions_mutex;
68             std::map<std::string, Virt_db::Map>m_maps;
69
70             typedef std::map<std::string,Virt_db::Set>::iterator Sets_it;
71
72             boost::mutex m_mutex;
73             boost::condition m_cond_session_ready;
74             std::map<yp2::Session,Frontend *> m_clients;
75         };
76     }
77 }
78
79 using namespace yp2;
80
81 yf::Virt_db::Frontend::Frontend()
82 {
83     m_is_virtual = false;
84 }
85
86 void yf::Virt_db::Frontend::close(Package &package)
87 {
88     Sets_it sit = m_sets.begin();
89     for (; sit != m_sets.end(); sit++)
90     {
91         sit->second.m_backend_session.close();
92         Package close_package(sit->second.m_backend_session, package.origin());
93         close_package.copy_filter(package);
94         close_package.move(sit->second.m_route);
95     }
96     m_sets.clear();
97 }
98
99 yf::Virt_db::Frontend::~Frontend()
100 {
101 }
102
103 yf::Virt_db::Frontend *yf::Virt_db::Rep::get_frontend(Package &package)
104 {
105     boost::mutex::scoped_lock lock(m_mutex);
106
107     std::map<yp2::Session,yf::Virt_db::Frontend *>::iterator it;
108     
109     while(true)
110     {
111         it = m_clients.find(package.session());
112         if (it == m_clients.end())
113             break;
114         
115         if (!it->second->m_in_use)
116         {
117             it->second->m_in_use = true;
118             return it->second;
119         }
120         m_cond_session_ready.wait(lock);
121     }
122     Frontend *f = new Frontend;
123     m_clients[package.session()] = f;
124     f->m_in_use = true;
125     return f;
126 }
127
128
129 void yf::Virt_db::Rep::release_frontend(Package &package)
130 {
131     boost::mutex::scoped_lock lock(m_mutex);
132     std::map<yp2::Session,yf::Virt_db::Frontend *>::iterator it;
133     
134     it = m_clients.find(package.session());
135     if (it != m_clients.end())
136     {
137         if (package.session().is_closed())
138         {
139             it->second->close(package);
140             delete it->second;
141             m_clients.erase(it);
142         }
143         else
144         {
145             it->second->m_in_use = false;
146         }
147         m_cond_session_ready.notify_all();
148     }
149 }
150
151 yf::Virt_db::Set::Set(yp2::Session &id, std::string setname,
152                       std::string vhost, std::string route,
153                       bool named_result_sets)
154     :   m_backend_session(id), m_backend_setname(setname), m_vhost(vhost),
155     m_route(route), m_named_result_sets(named_result_sets)
156 {
157 }
158
159
160 yf::Virt_db::Set::Set()
161 {
162 }
163
164
165 yf::Virt_db::Set::~Set()
166 {
167 }
168
169 yf::Virt_db::Map::Map(std::string vhost, std::string route)
170     : m_vhost(vhost), m_route(route) 
171 {
172 }
173
174 yf::Virt_db::Map::Map()
175 {
176 }
177
178 yf::Virt_db::Virt_db() : m_p(new Virt_db::Rep)
179 {
180 }
181
182 yf::Virt_db::~Virt_db() {
183 }
184
185 void yf::Virt_db::Frontend::present(Package &package, Z_APDU *apdu)
186 {
187     Session *id = 0;
188     Z_PresentRequest *req = apdu->u.presentRequest;
189     std::string resultSetId = req->resultSetId;
190     yp2::odr odr;
191
192     Sets_it sets_it = m_sets.find(resultSetId);
193     if (sets_it == m_sets.end())
194     {
195         Z_APDU *apdu = zget_APDU(odr, Z_APDU_presentResponse);
196         
197         Z_Records *rec = (Z_Records *) odr_malloc(odr, sizeof(Z_Records));
198         apdu->u.presentResponse->records = rec;
199         rec->which = Z_Records_NSD;
200         rec->u.nonSurrogateDiagnostic =
201             zget_DefaultDiagFormat(
202                 odr,
203                 YAZ_BIB1_SPECIFIED_RESULT_SET_DOES_NOT_EXIST,
204                 resultSetId.c_str());
205         package.response() = apdu;
206         
207         return;
208     }
209     id = new yp2::Session(sets_it->second.m_backend_session);
210     
211     // sending present to backend
212     Package present_package(*id, package.origin());
213     present_package.copy_filter(package);
214     
215     req->resultSetId = odr_strdup(odr, "default");
216     present_package.request() = yazpp_1::GDU(apdu);
217
218     present_package.move();
219
220     if (present_package.session().is_closed())
221     {
222         Z_APDU *apdu = zget_APDU(odr, Z_APDU_presentResponse);
223         
224         Z_Records *rec = (Z_Records *) odr_malloc(odr, sizeof(Z_Records));
225         apdu->u.presentResponse->records = rec;
226         rec->which = Z_Records_NSD;
227         rec->u.nonSurrogateDiagnostic =
228             zget_DefaultDiagFormat(
229                 odr,
230                 YAZ_BIB1_RESULT_SET_NO_LONGER_EXISTS_UNILATERALLY_DELETED_BY_,
231                 resultSetId.c_str());
232         package.response() = apdu;
233         
234         m_sets.erase(resultSetId);
235     }
236     else
237     {
238         package.response() = present_package.response();
239     }
240     delete id;
241 }
242
243 void yf::Virt_db::Frontend::search(Package &package, Z_APDU *apdu,
244                           const std::map<std::string, Virt_db::Map> &maps)
245 {
246     Z_SearchRequest *req = apdu->u.searchRequest;
247     std::string vhost;
248     std::string database;
249     std::string resultSetId = req->resultSetName;
250     bool support_named_result_sets = false;  // whether backend supports it
251     yp2::odr odr;
252     
253     if (req->num_databaseNames != 1)
254     {   // exactly one database must be specified
255         Z_APDU *apdu = zget_APDU(odr, Z_APDU_searchResponse);
256         
257         Z_Records *rec = (Z_Records *) odr_malloc(odr, sizeof(Z_Records));
258         apdu->u.searchResponse->records = rec;
259         rec->which = Z_Records_NSD;
260         rec->u.nonSurrogateDiagnostic =
261             zget_DefaultDiagFormat(
262                 odr, YAZ_BIB1_TOO_MANY_DATABASES_SPECIFIED, 0);
263         package.response() = apdu;
264         
265         return;
266     }
267     database = req->databaseNames[0];
268     std::map<std::string, Virt_db::Map>::const_iterator map_it;
269     map_it = maps.find(database);
270     if (map_it == maps.end()) 
271     {   // no map for database: return diagnostic
272         Z_APDU *apdu = zget_APDU(odr, Z_APDU_searchResponse);
273         
274         Z_Records *rec = (Z_Records *) odr_malloc(odr, sizeof(Z_Records));
275         apdu->u.searchResponse->records = rec;
276         rec->which = Z_Records_NSD;
277         rec->u.nonSurrogateDiagnostic =
278             zget_DefaultDiagFormat(
279                 odr, YAZ_BIB1_DATABASE_DOES_NOT_EXIST, database.c_str());
280         package.response() = apdu;
281         
282         return;
283     }
284     if (*req->replaceIndicator == 0)
285     {
286         Sets_it sets_it = m_sets.find(req->resultSetName);
287         if (sets_it != m_sets.end())
288         {
289             Z_APDU *apdu = zget_APDU(odr, Z_APDU_searchResponse);
290             
291             Z_Records *rec = (Z_Records *) odr_malloc(odr, sizeof(Z_Records));
292             apdu->u.searchResponse->records = rec;
293             rec->which = Z_Records_NSD;
294             rec->u.nonSurrogateDiagnostic =
295                 zget_DefaultDiagFormat(
296                     odr,
297                     YAZ_BIB1_RESULT_SET_EXISTS_AND_REPLACE_INDICATOR_OFF,
298                     0);
299             package.response() = apdu;
300             
301             return;
302         }
303     }
304     m_sets.erase(req->resultSetName);
305     vhost = map_it->second.m_vhost;
306     std::string route = map_it->second.m_route;
307     // we might look for an existing session with same vhost
308     Session id;
309     const char *vhost_cstr = vhost.c_str();
310     if (true)
311     {  // sending init to backend
312         Package init_package(id, package.origin());
313         init_package.copy_filter(package);
314         
315         Z_APDU *init_apdu = zget_APDU(odr, Z_APDU_initRequest);
316         
317         yaz_oi_set_string_oidval(&init_apdu->u.initRequest->otherInfo, odr,
318                                  VAL_PROXY, 1, vhost_cstr);
319         
320         init_package.request() = init_apdu;
321
322         init_package.move(route);  // sending init 
323
324         if (init_package.session().is_closed())
325         {
326             Z_APDU *apdu = zget_APDU(odr, Z_APDU_searchResponse);
327             
328             Z_Records *rec = (Z_Records *) odr_malloc(odr, sizeof(Z_Records));
329             apdu->u.searchResponse->records = rec;
330             rec->which = Z_Records_NSD;
331             rec->u.nonSurrogateDiagnostic =
332                 zget_DefaultDiagFormat(
333                     odr, YAZ_BIB1_DATABASE_UNAVAILABLE, database.c_str());
334             package.response() = apdu;
335         }
336         Z_GDU *gdu = init_package.response().get();
337         // we hope to get an init response
338         if (gdu && gdu->which == Z_GDU_Z3950 && gdu->u.z3950->which ==
339             Z_APDU_initResponse)
340         {
341             if (ODR_MASK_GET(gdu->u.z3950->u.initResponse->options,
342                              Z_Options_namedResultSets))
343                 support_named_result_sets = true;
344         }
345         else
346         {
347             Z_APDU *apdu = zget_APDU(odr, Z_APDU_searchResponse);
348             
349             Z_Records *rec = (Z_Records *) odr_malloc(odr, sizeof(Z_Records));
350             apdu->u.searchResponse->records = rec;
351             rec->which = Z_Records_NSD;
352             rec->u.nonSurrogateDiagnostic =
353                 zget_DefaultDiagFormat(
354                     odr, YAZ_BIB1_DATABASE_UNAVAILABLE, database.c_str());
355             package.response() = apdu;
356             
357             return;
358         }
359     }
360     // sending search to backend
361     Package search_package(id, package.origin());
362
363     search_package.copy_filter(package);
364     const char *sep = strchr(vhost_cstr, '/');
365     if (sep)
366         req->databaseNames[0] = odr_strdup(odr, sep+1);
367
368     *req->replaceIndicator = 1;
369
370     std::string backend_resultSetId = "default";
371     req->resultSetName = odr_strdup(odr, backend_resultSetId.c_str());
372     search_package.request() = yazpp_1::GDU(apdu);
373     
374     search_package.move(route);
375
376     if (search_package.session().is_closed())
377     {
378         Z_APDU *apdu = zget_APDU(odr, Z_APDU_searchResponse);
379         
380         Z_Records *rec = (Z_Records *) odr_malloc(odr, sizeof(Z_Records));
381         apdu->u.searchResponse->records = rec;
382         rec->which = Z_Records_NSD;
383         rec->u.nonSurrogateDiagnostic =
384             zget_DefaultDiagFormat(
385                 odr, YAZ_BIB1_DATABASE_UNAVAILABLE, database.c_str());
386         package.response() = apdu;
387         
388         return;
389     }
390     package.response() = search_package.response();
391     
392     m_sets[resultSetId] =
393         Virt_db::Set(id, backend_resultSetId, vhost, route,
394                      support_named_result_sets);
395 }
396
397 void yf::Virt_db::add_map_db2vhost(std::string db, std::string vhost,
398                                    std::string route)
399 {
400     m_p->m_maps[db] = Virt_db::Map(vhost, route);
401 }
402
403 void yf::Virt_db::process(Package &package) const
404 {
405     yf::Virt_db::Frontend *f = m_p->get_frontend(package);
406     if (f)
407     {
408         Z_GDU *gdu = package.request().get();
409         
410         if (gdu && gdu->which == Z_GDU_Z3950 && gdu->u.z3950->which ==
411             Z_APDU_initRequest && !f->m_is_virtual)
412         {
413             Z_InitRequest *req = gdu->u.z3950->u.initRequest;
414
415             const char *vhost =
416                 yaz_oi_get_string_oidval(&req->otherInfo, VAL_PROXY, 1, 0);
417             if (!vhost)
418             {
419                 yp2::odr odr;
420                 Z_APDU *apdu = zget_APDU(odr, Z_APDU_initResponse);
421                 Z_InitResponse *resp = apdu->u.initResponse;
422                 
423                 int i;
424                 static const int masks[] = {
425                     Z_Options_search,
426                     Z_Options_present,
427                     Z_Options_namedResultSets,
428                     -1 
429                 };
430                 for (i = 0; masks[i] != -1; i++)
431                     if (ODR_MASK_GET(req->options, masks[i]))
432                         ODR_MASK_SET(resp->options, masks[i]);
433                 
434                 static const int versions[] = {
435                     Z_ProtocolVersion_1,
436                     Z_ProtocolVersion_2,
437                     Z_ProtocolVersion_3,
438                     -1
439                 };
440                 for (i = 0; versions[i] != -1; i++)
441                     if (ODR_MASK_GET(req->protocolVersion, versions[i]))
442                         ODR_MASK_SET(resp->protocolVersion, versions[i]);
443                     else
444                         break;
445                 
446                 package.response() = apdu;
447                 f->m_is_virtual = true;
448             }
449             else
450                 package.move();
451         }
452         else if (!f->m_is_virtual)
453             package.move();
454         else if (gdu && gdu->which == Z_GDU_Z3950)
455         {
456             Z_APDU *apdu = gdu->u.z3950;
457             if (apdu->which == Z_APDU_initRequest)
458             {
459                 yp2::odr odr;
460                 
461                 package.response() = odr.create_close(
462                     Z_Close_protocolError,
463                     "double init");
464                 
465                 package.session().close();
466             }
467             else if (apdu->which == Z_APDU_searchRequest)
468             {
469                 f->search(package, apdu, m_p->m_maps);
470             }
471             else if (apdu->which == Z_APDU_presentRequest)
472             {
473                 f->present(package, apdu);
474             }
475             else
476             {
477                 yp2::odr odr;
478                 
479                 package.response() = odr.create_close(
480                     Z_Close_protocolError,
481                     "unsupported APDU in filter_virt_db");
482                 
483                 package.session().close();
484             }
485         }
486     }
487     m_p->release_frontend(package);
488 }
489
490
491 void yp2::filter::Virt_db::configure(const xmlNode * ptr)
492 {
493     for (ptr = ptr->children; ptr; ptr = ptr->next)
494     {
495         if (ptr->type != XML_ELEMENT_NODE)
496             continue;
497         if (!strcmp((const char *) ptr->name, "virtual"))
498         {
499             std::string database;
500             std::string target;
501             xmlNode *v_node = ptr->children;
502             for (; v_node; v_node = v_node->next)
503             {
504                 if (v_node->type != XML_ELEMENT_NODE)
505                     continue;
506                 
507                 if (yp2::xml::is_element_yp2(v_node, "database"))
508                     database = yp2::xml::get_text(v_node);
509                 else if (yp2::xml::is_element_yp2(v_node, "target"))
510                     target = yp2::xml::get_text(v_node);
511                 else
512                     throw yp2::filter::FilterException
513                         ("Bad element " 
514                          + std::string((const char *) v_node->name)
515                          + " in virtual section"
516                             );
517             }
518             std::string route = yp2::xml::get_route(ptr);
519             add_map_db2vhost(database, target, route);
520             std::cout << "Add " << database << "->" << target
521                       << "," << route << "\n";
522         }
523         else
524         {
525             throw yp2::filter::FilterException
526                 ("Bad element " 
527                  + std::string((const char *) ptr->name)
528                  + " in virt_db filter");
529         }
530     }
531 }
532
533 static yp2::filter::Base* filter_creator()
534 {
535     return new yp2::filter::Virt_db;
536 }
537
538 extern "C" {
539     struct yp2_filter_struct yp2_filter_virt_db = {
540         0,
541         "virt_db",
542         filter_creator
543     };
544 }
545
546
547 /*
548  * Local variables:
549  * c-basic-offset: 4
550  * indent-tabs-mode: nil
551  * c-file-style: "stroustrup"
552  * End:
553  * vim: shiftwidth=4 tabstop=8 expandtab
554  */