Put proper reference IDs in response PDUs properly.
[metaproxy-moved-to-github.git] / src / filter_virt_db.cpp
1 /* $Id: filter_virt_db.cpp,v 1.23 2006-01-13 15:09:35 adam Exp $
2    Copyright (c) 2005, Index Data.
3
4 %LICENSE%
5  */
6
7 #include "config.hpp"
8
9 #include "filter.hpp"
10 #include "package.hpp"
11
12 #include <boost/thread/mutex.hpp>
13 #include <boost/thread/condition.hpp>
14 #include <boost/shared_ptr.hpp>
15
16 #include "util.hpp"
17 #include "filter_virt_db.hpp"
18
19 #include <yaz/zgdu.h>
20 #include <yaz/otherinfo.h>
21 #include <yaz/diagbib1.h>
22
23 #include <map>
24 #include <iostream>
25
26 namespace yf = yp2::filter;
27
28 namespace yp2 {
29     namespace filter {
30
31         struct Virt_db::Set {
32             Set(BackendPtr b, std::string setname);
33             Set();
34             ~Set();
35
36             BackendPtr m_backend;
37             std::string m_setname;
38         };
39         struct Virt_db::Map {
40             Map(std::string vhost, std::string route);
41             Map();
42             std::string m_vhost;
43             std::string m_route;
44         };
45         struct Virt_db::Backend {
46             yp2::Session m_backend_session;
47             std::string m_backend_database;
48             std::string m_frontend_database;
49             std::string m_vhost;
50             std::string m_route;
51             bool m_named_result_sets;
52             int m_number_of_sets;
53         };
54         struct Virt_db::Frontend {
55             Frontend(Rep *rep);
56             ~Frontend();
57             yp2::Session m_session;
58             bool m_is_virtual;
59             bool m_in_use;
60             std::list<BackendPtr> m_backend_list;
61             std::map<std::string,Virt_db::Set> m_sets;
62
63             void search(Package &package, Z_APDU *apdu);
64             void present(Package &package, Z_APDU *apdu);
65             void scan(Package &package, Z_APDU *apdu);
66
67             void close(Package &package);
68             typedef std::map<std::string,Virt_db::Set>::iterator Sets_it;
69
70             BackendPtr lookup_backend_from_database(std::string database);
71             BackendPtr create_backend_from_database(std::string database);
72             
73             BackendPtr init_backend(std::string database, Package &package,
74                                     int &error_code, std::string &addinfo);
75             Rep *m_p;
76         };            
77         class Virt_db::Rep {
78             friend class Virt_db;
79             friend class Frontend;
80             
81             Frontend *get_frontend(Package &package);
82             void release_frontend(Package &package);
83         private:
84             boost::mutex m_sessions_mutex;
85             std::map<std::string, Virt_db::Map>m_maps;
86
87             typedef std::map<std::string,Virt_db::Set>::iterator Sets_it;
88
89             boost::mutex m_mutex;
90             boost::condition m_cond_session_ready;
91             std::map<yp2::Session,Frontend *> m_clients;
92         };
93     }
94 }
95
96 using namespace yp2;
97
98 yf::Virt_db::BackendPtr yf::Virt_db::Frontend::lookup_backend_from_database(
99     std::string database)
100 {
101     std::list<BackendPtr>::const_iterator map_it;
102     map_it = m_backend_list.begin();
103     for (; map_it != m_backend_list.end(); map_it++)
104         if ((*map_it)->m_frontend_database == database)
105             return *map_it;
106     BackendPtr null;
107     return null;
108 }
109
110 yf::Virt_db::BackendPtr yf::Virt_db::Frontend::create_backend_from_database(
111     std::string database)
112 {
113     std::map<std::string, Virt_db::Map>::iterator map_it;
114     map_it = m_p->m_maps.find(database);
115     if (map_it == m_p->m_maps.end())
116     {
117         BackendPtr ptr;
118         return ptr;
119     }
120     BackendPtr b(new Backend);
121
122     b->m_number_of_sets = 0;
123     b->m_frontend_database = database;
124     b->m_named_result_sets = false;
125     b->m_route = map_it->second.m_route;
126     b->m_vhost = map_it->second.m_vhost;
127     const char *sep = strchr(b->m_vhost.c_str(), '/');
128     std::string backend_database;
129     if (sep)
130         b->m_backend_database = std::string(sep+1);
131     else
132         b->m_backend_database = database;
133
134     return b;
135 }
136
137 yf::Virt_db::BackendPtr yf::Virt_db::Frontend::init_backend(
138     std::string database, Package &package,
139     int &error_code, std::string &addinfo)
140 {
141     BackendPtr b = create_backend_from_database(database);
142     if (!b)
143     {
144         error_code = YAZ_BIB1_DATABASE_UNAVAILABLE;
145         addinfo = database;
146         return b;
147     }
148     Package init_package(b->m_backend_session, package.origin());
149     init_package.copy_filter(package);
150
151     yp2::odr odr;
152
153     Z_APDU *init_apdu = zget_APDU(odr, Z_APDU_initRequest);
154     
155     yaz_oi_set_string_oidval(&init_apdu->u.initRequest->otherInfo, odr,
156                              VAL_PROXY, 1, b->m_vhost.c_str());
157     
158     Z_InitRequest *req = init_apdu->u.initRequest;
159
160     ODR_MASK_SET(req->options, Z_Options_search);
161     ODR_MASK_SET(req->options, Z_Options_present);
162     ODR_MASK_SET(req->options, Z_Options_namedResultSets);
163     ODR_MASK_SET(req->options, Z_Options_scan);
164
165     ODR_MASK_SET(req->protocolVersion, Z_ProtocolVersion_1);
166     ODR_MASK_SET(req->protocolVersion, Z_ProtocolVersion_2);
167     ODR_MASK_SET(req->protocolVersion, Z_ProtocolVersion_3);
168
169     init_package.request() = init_apdu;
170     
171     init_package.move(b->m_route);  // sending init 
172     
173     if (init_package.session().is_closed())
174     {
175         error_code = YAZ_BIB1_DATABASE_UNAVAILABLE;
176         addinfo = database;
177         BackendPtr null;
178         return null;
179     }
180     Z_GDU *gdu = init_package.response().get();
181     // we hope to get an init response
182     if (gdu && gdu->which == Z_GDU_Z3950 && gdu->u.z3950->which ==
183         Z_APDU_initResponse)
184     {
185         if (ODR_MASK_GET(gdu->u.z3950->u.initResponse->options,
186                          Z_Options_namedResultSets))
187         {
188             b->m_named_result_sets = true;
189         }
190     }
191     else
192     {
193         error_code = YAZ_BIB1_DATABASE_UNAVAILABLE;
194         addinfo = database;
195         BackendPtr null;
196         return null;
197     }        
198     m_backend_list.push_back(b);
199     return b;
200 }
201
202 void yf::Virt_db::Frontend::search(Package &package, Z_APDU *apdu_req)
203 {
204     Z_SearchRequest *req = apdu_req->u.searchRequest;
205     std::string vhost;
206     std::string resultSetId = req->resultSetName;
207     yp2::odr odr;
208
209     // only one datatabase for now
210     if (req->num_databaseNames != 1)
211     {   // exactly one database must be specified
212         Z_APDU *apdu =
213             odr.create_searchResponse(
214                 apdu_req, YAZ_BIB1_TOO_MANY_DATABASES_SPECIFIED, 0);
215         package.response() = apdu;
216         
217         return;
218     }
219     std::string database = std::string(req->databaseNames[0]);
220
221     BackendPtr b; // null for now
222     Sets_it sets_it = m_sets.find(req->resultSetName);
223     if (sets_it != m_sets.end())
224     {
225         // result set already exist 
226         // if replace indicator is off: we return diagnostic if
227         // result set already exist.
228         if (*req->replaceIndicator == 0)
229         {
230             Z_APDU *apdu = 
231                 odr.create_searchResponse(
232                     apdu_req,
233                     YAZ_BIB1_RESULT_SET_EXISTS_AND_REPLACE_INDICATOR_OFF,
234                     0);
235             package.response() = apdu;
236             
237             return;
238         } 
239         sets_it->second.m_backend->m_number_of_sets--;
240
241         // pick up any existing backend with a database match
242         std::list<BackendPtr>::const_iterator map_it;
243         map_it = m_backend_list.begin();
244         for (; map_it != m_backend_list.end(); map_it++)
245         {
246             BackendPtr tmp = *map_it;
247             if (tmp->m_frontend_database == database)
248                 break;
249         }
250         if (map_it != m_backend_list.end()) 
251             b = *map_it;
252     }
253     else
254     {
255         // new result set.
256
257         // pick up any existing database with named result sets ..
258         // or one which has no result sets.. yet.
259         std::list<BackendPtr>::const_iterator map_it;
260         map_it = m_backend_list.begin();
261         for (; map_it != m_backend_list.end(); map_it++)
262         {
263             BackendPtr tmp = *map_it;
264             if (tmp->m_frontend_database == database &&
265                 (tmp->m_named_result_sets ||
266                  tmp->m_number_of_sets == 0))
267                 break;
268         }
269         if (map_it != m_backend_list.end()) 
270             b = *map_it;
271     }
272     if (!b)  // no backend yet. Must create a new one
273     {
274         int error_code;
275         std::string addinfo;
276         b = init_backend(database, package, error_code, addinfo);
277         if (!b)
278         {
279             // did not get a backend (unavailable somehow?)
280             
281             Z_APDU *apdu = 
282                 odr.create_searchResponse(
283                     apdu_req, error_code, addinfo.c_str());
284             package.response() = apdu;
285             return;
286         }
287     }
288     m_sets.erase(req->resultSetName);
289     // sending search to backend
290     Package search_package(b->m_backend_session, package.origin());
291
292     search_package.copy_filter(package);
293
294     std::string backend_setname;
295     if (b->m_named_result_sets)
296     {
297         std::cout << "named_result_sets TRUE\n";
298         backend_setname = std::string(req->resultSetName);
299     }
300     else
301     {
302         std::cout << "named_result_sets FALSE\n";
303         backend_setname = "default";
304         req->resultSetName = odr_strdup(odr, backend_setname.c_str());
305     }
306
307     const char *backend_database = b->m_backend_database.c_str();
308     req->databaseNames[0] = odr_strdup(odr, backend_database);
309
310     *req->replaceIndicator = 1;
311
312     search_package.request() = yazpp_1::GDU(apdu_req);
313     
314     search_package.move(b->m_route);
315
316     if (search_package.session().is_closed())
317     {
318         Z_APDU *apdu = 
319             odr.create_searchResponse(
320                 apdu_req,
321                 YAZ_BIB1_DATABASE_UNAVAILABLE, database.c_str());
322         package.response() = apdu;
323         return;
324     }
325     package.response() = search_package.response();
326
327     b->m_number_of_sets++;
328
329     m_sets[resultSetId] = Virt_db::Set(b, backend_setname);
330 }
331
332 yf::Virt_db::Frontend::Frontend(Rep *rep)
333 {
334     m_p = rep;
335     m_is_virtual = false;
336 }
337
338 void yf::Virt_db::Frontend::close(Package &package)
339 {
340     std::list<BackendPtr>::const_iterator b_it;
341     
342     for (b_it = m_backend_list.begin(); b_it != m_backend_list.end(); b_it++)
343     {
344         (*b_it)->m_backend_session.close();
345         Package close_package((*b_it)->m_backend_session, package.origin());
346         close_package.copy_filter(package);
347         close_package.move((*b_it)->m_route);
348     }
349     m_backend_list.clear();
350 }
351
352 yf::Virt_db::Frontend::~Frontend()
353 {
354 }
355
356 yf::Virt_db::Frontend *yf::Virt_db::Rep::get_frontend(Package &package)
357 {
358     boost::mutex::scoped_lock lock(m_mutex);
359
360     std::map<yp2::Session,yf::Virt_db::Frontend *>::iterator it;
361     
362     while(true)
363     {
364         it = m_clients.find(package.session());
365         if (it == m_clients.end())
366             break;
367         
368         if (!it->second->m_in_use)
369         {
370             it->second->m_in_use = true;
371             return it->second;
372         }
373         m_cond_session_ready.wait(lock);
374     }
375     Frontend *f = new Frontend(this);
376     m_clients[package.session()] = f;
377     f->m_in_use = true;
378     return f;
379 }
380
381
382 void yf::Virt_db::Rep::release_frontend(Package &package)
383 {
384     boost::mutex::scoped_lock lock(m_mutex);
385     std::map<yp2::Session,yf::Virt_db::Frontend *>::iterator it;
386     
387     it = m_clients.find(package.session());
388     if (it != m_clients.end())
389     {
390         if (package.session().is_closed())
391         {
392             it->second->close(package);
393             delete it->second;
394             m_clients.erase(it);
395         }
396         else
397         {
398             it->second->m_in_use = false;
399         }
400         m_cond_session_ready.notify_all();
401     }
402 }
403
404 yf::Virt_db::Set::Set(BackendPtr b, std::string setname)
405     :  m_backend(b), m_setname(setname)
406 {
407 }
408
409
410 yf::Virt_db::Set::Set()
411 {
412 }
413
414
415 yf::Virt_db::Set::~Set()
416 {
417 }
418
419 yf::Virt_db::Map::Map(std::string vhost, std::string route)
420     : m_vhost(vhost), m_route(route) 
421 {
422 }
423
424 yf::Virt_db::Map::Map()
425 {
426 }
427
428 yf::Virt_db::Virt_db() : m_p(new Virt_db::Rep)
429 {
430 }
431
432 yf::Virt_db::~Virt_db() {
433 }
434
435 void yf::Virt_db::Frontend::present(Package &package, Z_APDU *apdu_req)
436 {
437     Z_PresentRequest *req = apdu_req->u.presentRequest;
438     std::string resultSetId = req->resultSetId;
439     yp2::odr odr;
440
441     Sets_it sets_it = m_sets.find(resultSetId);
442     if (sets_it == m_sets.end())
443     {
444         Z_APDU *apdu = 
445             odr.create_presentResponse(
446                 apdu_req,
447                 YAZ_BIB1_SPECIFIED_RESULT_SET_DOES_NOT_EXIST,
448                 resultSetId.c_str());
449         package.response() = apdu;
450         
451         return;
452     }
453     Session *id =
454         new yp2::Session(sets_it->second.m_backend->m_backend_session);
455     
456     // sending present to backend
457     Package present_package(*id, package.origin());
458     present_package.copy_filter(package);
459     
460     req->resultSetId = odr_strdup(odr, "default");
461     present_package.request() = yazpp_1::GDU(apdu_req);
462
463     present_package.move();
464
465     if (present_package.session().is_closed())
466     {
467         Z_APDU *apdu = 
468             odr.create_presentResponse(
469                 apdu_req,
470                 YAZ_BIB1_RESULT_SET_NO_LONGER_EXISTS_UNILATERALLY_DELETED_BY_,
471                 resultSetId.c_str());
472         package.response() = apdu;
473         m_sets.erase(resultSetId);
474     }
475     else
476     {
477         package.response() = present_package.response();
478     }
479     delete id;
480 }
481
482 void yf::Virt_db::Frontend::scan(Package &package, Z_APDU *apdu_req)
483 {
484     Z_ScanRequest *req = apdu_req->u.scanRequest;
485     std::string vhost;
486     yp2::odr odr;
487
488     // only one datatabase for now
489     if (req->num_databaseNames != 1)
490     {   // exactly one database must be specified
491         Z_APDU *apdu =
492             odr.create_scanResponse(
493                 apdu_req,
494                 YAZ_BIB1_TOO_MANY_DATABASES_SPECIFIED, 0);
495         package.response() = apdu;
496         return;
497     }
498     std::string database = std::string(req->databaseNames[0]);
499     
500     BackendPtr b;
501     // pick up any existing backend with a database match
502     std::list<BackendPtr>::const_iterator map_it;
503     map_it = m_backend_list.begin();
504     for (; map_it != m_backend_list.end(); map_it++)
505     {
506         BackendPtr tmp = *map_it;
507         if (tmp->m_frontend_database == database)
508             break;
509     }
510     if (map_it != m_backend_list.end()) 
511         b = *map_it;
512     if (!b)  // no backend yet. Must create a new one
513     {
514         int error_code;
515         std::string addinfo;
516         b = init_backend(database, package, error_code, addinfo);
517         if (!b)
518         {
519             // did not get a backend (unavailable somehow?)
520             Z_APDU *apdu =
521                 odr.create_scanResponse(
522                     apdu_req, error_code, addinfo.c_str());
523             package.response() = apdu;
524             
525             return;
526         }
527     }
528     // sending scan to backend
529     Package scan_package(b->m_backend_session, package.origin());
530
531     scan_package.copy_filter(package);
532
533     const char *backend_database = b->m_backend_database.c_str();
534     req->databaseNames[0] = odr_strdup(odr, backend_database);
535
536     scan_package.request() = yazpp_1::GDU(apdu_req);
537     
538     scan_package.move(b->m_route);
539
540     if (scan_package.session().is_closed())
541     {
542         Z_APDU *apdu =
543             odr.create_scanResponse(
544                 apdu_req, YAZ_BIB1_DATABASE_UNAVAILABLE, database.c_str());
545         package.response() = apdu;
546         return;
547     }
548     package.response() = scan_package.response();
549 }
550
551
552 void yf::Virt_db::add_map_db2vhost(std::string db, std::string vhost,
553                                    std::string route)
554 {
555     m_p->m_maps[db] = Virt_db::Map(vhost, route);
556 }
557
558 void yf::Virt_db::process(Package &package) const
559 {
560     yf::Virt_db::Frontend *f = m_p->get_frontend(package);
561     if (f)
562     {
563         Z_GDU *gdu = package.request().get();
564         
565         if (gdu && gdu->which == Z_GDU_Z3950 && gdu->u.z3950->which ==
566             Z_APDU_initRequest && !f->m_is_virtual)
567         {
568             Z_InitRequest *req = gdu->u.z3950->u.initRequest;
569
570             const char *vhost =
571                 yaz_oi_get_string_oidval(&req->otherInfo, VAL_PROXY, 1, 0);
572             if (!vhost)
573             {
574                 yp2::odr odr;
575                 Z_APDU *apdu = odr.create_initResponse(gdu->u.z3950, 0, 0);
576                 Z_InitResponse *resp = apdu->u.initResponse;
577                 
578                 int i;
579                 static const int masks[] = {
580                     Z_Options_search,
581                     Z_Options_present,
582                     Z_Options_namedResultSets,
583                     Z_Options_scan,
584                     -1 
585                 };
586                 for (i = 0; masks[i] != -1; i++)
587                     if (ODR_MASK_GET(req->options, masks[i]))
588                         ODR_MASK_SET(resp->options, masks[i]);
589                 
590                 static const int versions[] = {
591                     Z_ProtocolVersion_1,
592                     Z_ProtocolVersion_2,
593                     Z_ProtocolVersion_3,
594                     -1
595                 };
596                 for (i = 0; versions[i] != -1; i++)
597                     if (ODR_MASK_GET(req->protocolVersion, versions[i]))
598                         ODR_MASK_SET(resp->protocolVersion, versions[i]);
599                     else
600                         break;
601                 
602                 package.response() = apdu;
603                 f->m_is_virtual = true;
604             }
605             else
606                 package.move();
607         }
608         else if (!f->m_is_virtual)
609             package.move();
610         else if (gdu && gdu->which == Z_GDU_Z3950)
611         {
612             Z_APDU *apdu = gdu->u.z3950;
613             if (apdu->which == Z_APDU_initRequest)
614             {
615                 yp2::odr odr;
616                 
617                 package.response() = odr.create_close(
618                     apdu,
619                     Z_Close_protocolError,
620                     "double init");
621                 
622                 package.session().close();
623             }
624             else if (apdu->which == Z_APDU_searchRequest)
625             {
626                 f->search(package, apdu);
627             }
628             else if (apdu->which == Z_APDU_presentRequest)
629             {
630                 f->present(package, apdu);
631             }
632             else if (apdu->which == Z_APDU_scanRequest)
633             {
634                 f->scan(package, apdu);
635             }
636             else
637             {
638                 yp2::odr odr;
639                 
640                 package.response() = odr.create_close(
641                     apdu, Z_Close_protocolError,
642                     "unsupported APDU in filter_virt_db");
643                 
644                 package.session().close();
645             }
646         }
647     }
648     m_p->release_frontend(package);
649 }
650
651
652 void yp2::filter::Virt_db::configure(const xmlNode * ptr)
653 {
654     for (ptr = ptr->children; ptr; ptr = ptr->next)
655     {
656         if (ptr->type != XML_ELEMENT_NODE)
657             continue;
658         if (!strcmp((const char *) ptr->name, "virtual"))
659         {
660             std::string database;
661             std::string target;
662             xmlNode *v_node = ptr->children;
663             for (; v_node; v_node = v_node->next)
664             {
665                 if (v_node->type != XML_ELEMENT_NODE)
666                     continue;
667                 
668                 if (yp2::xml::is_element_yp2(v_node, "database"))
669                     database = yp2::xml::get_text(v_node);
670                 else if (yp2::xml::is_element_yp2(v_node, "target"))
671                     target = yp2::xml::get_text(v_node);
672                 else
673                     throw yp2::filter::FilterException
674                         ("Bad element " 
675                          + std::string((const char *) v_node->name)
676                          + " in virtual section"
677                             );
678             }
679             std::string route = yp2::xml::get_route(ptr);
680             add_map_db2vhost(database, target, route);
681             std::cout << "Add " << database << "->" << target
682                       << "," << route << "\n";
683         }
684         else
685         {
686             throw yp2::filter::FilterException
687                 ("Bad element " 
688                  + std::string((const char *) ptr->name)
689                  + " in virt_db filter");
690         }
691     }
692 }
693
694 static yp2::filter::Base* filter_creator()
695 {
696     return new yp2::filter::Virt_db;
697 }
698
699 extern "C" {
700     struct yp2_filter_struct yp2_filter_virt_db = {
701         0,
702         "virt_db",
703         filter_creator
704     };
705 }
706
707
708 /*
709  * Local variables:
710  * c-basic-offset: 4
711  * indent-tabs-mode: nil
712  * c-file-style: "stroustrup"
713  * End:
714  * vim: shiftwidth=4 tabstop=8 expandtab
715  */