Filter virt_db allows multiple databases.. Each of these is
[metaproxy-moved-to-github.git] / src / filter_virt_db.cpp
1 /* $Id: filter_virt_db.cpp,v 1.26 2006-01-16 15:51:56 adam Exp $
2    Copyright (c) 2005, Index Data.
3
4 %LICENSE%
5  */
6
7 #include "config.hpp"
8
9 #include "filter.hpp"
10 #include "package.hpp"
11
12 #include <boost/thread/mutex.hpp>
13 #include <boost/thread/condition.hpp>
14 #include <boost/shared_ptr.hpp>
15
16 #include "util.hpp"
17 #include "filter_virt_db.hpp"
18
19 #include <yaz/zgdu.h>
20 #include <yaz/otherinfo.h>
21 #include <yaz/diagbib1.h>
22
23 #include <map>
24 #include <iostream>
25
26 namespace yf = yp2::filter;
27
28 namespace yp2 {
29     namespace filter {
30
31         struct Virt_db::Set {
32             Set(BackendPtr b, std::string setname);
33             Set();
34             ~Set();
35
36             BackendPtr m_backend;
37             std::string m_setname;
38         };
39         struct Virt_db::Map {
40             Map(std::list<std::string> targets, std::string route);
41             Map();
42             std::list<std::string> m_targets;
43             std::string m_route;
44         };
45         struct Virt_db::Backend {
46             yp2::Session m_backend_session;
47 #if 0
48             std::string m_backend_database;
49 #endif
50             std::list<std::string> m_frontend_databases;
51             std::list<std::string> m_targets;
52             std::string m_route;
53             bool m_named_result_sets;
54             int m_number_of_sets;
55         };
56         struct Virt_db::Frontend {
57             Frontend(Rep *rep);
58             ~Frontend();
59             yp2::Session m_session;
60             bool m_is_virtual;
61             bool m_in_use;
62             std::list<BackendPtr> m_backend_list;
63             std::map<std::string,Virt_db::Set> m_sets;
64
65             void search(Package &package, Z_APDU *apdu);
66             void present(Package &package, Z_APDU *apdu);
67             void scan(Package &package, Z_APDU *apdu);
68
69             void close(Package &package);
70             typedef std::map<std::string,Virt_db::Set>::iterator Sets_it;
71
72             BackendPtr lookup_backend_from_databases(
73                 std::list<std::string> databases);
74             BackendPtr create_backend_from_databases(
75                 std::list<std::string> databases);
76             
77             BackendPtr init_backend(std::list<std::string> database,
78                                     Package &package,
79                                     int &error_code, std::string &addinfo);
80             Rep *m_p;
81         };            
82         class Virt_db::Rep {
83             friend class Virt_db;
84             friend class Frontend;
85             
86             FrontendPtr get_frontend(Package &package);
87             void release_frontend(Package &package);
88         private:
89             boost::mutex m_sessions_mutex;
90             std::map<std::string, Virt_db::Map>m_maps;
91
92             typedef std::map<std::string,Virt_db::Set>::iterator Sets_it;
93
94             boost::mutex m_mutex;
95             boost::condition m_cond_session_ready;
96             std::map<yp2::Session, FrontendPtr> m_clients;
97         };
98     }
99 }
100
101 using namespace yp2;
102
103 yf::Virt_db::BackendPtr yf::Virt_db::Frontend::lookup_backend_from_databases(
104     std::list<std::string> databases)
105 {
106     std::list<BackendPtr>::const_iterator map_it;
107     map_it = m_backend_list.begin();
108     for (; map_it != m_backend_list.end(); map_it++)
109         if ((*map_it)->m_frontend_databases == databases)
110             return *map_it;
111     BackendPtr null;
112     return null;
113 }
114
115 yf::Virt_db::BackendPtr yf::Virt_db::Frontend::create_backend_from_databases(
116     std::list<std::string> databases)
117 {
118     BackendPtr b(new Backend);
119     std::list<std::string>::const_iterator db_it = databases.begin();
120
121     b->m_number_of_sets = 0;
122     b->m_frontend_databases = databases;
123     b->m_named_result_sets = false;
124
125     std::map<std::string,bool> targets_dedup;
126     for (; db_it != databases.end(); db_it++)
127     {
128         std::map<std::string, Virt_db::Map>::iterator map_it;
129         map_it = m_p->m_maps.find(*db_it);
130         if (map_it == m_p->m_maps.end())  // database not found
131         {
132             BackendPtr ptr;
133             return ptr;
134         }
135         std::list<std::string>::const_iterator t_it =
136             map_it->second.m_targets.begin();
137         for (; t_it != map_it->second.m_targets.end(); t_it++)
138             targets_dedup[*t_it] = true;
139         b->m_route = map_it->second.m_route;
140     }
141     std::map<std::string,bool>::const_iterator tm_it = targets_dedup.begin();
142     for (; tm_it != targets_dedup.end(); tm_it++)
143         b->m_targets.push_back(tm_it->first);
144 #if 0
145     const char *sep = strchr(b->m_vhost.c_str(), '/');
146     std::string backend_database;
147     if (sep)
148         b->m_backend_database = std::string(sep+1);
149     else
150         b->m_backend_database = database;
151 #endif
152     return b;
153 }
154
155 yf::Virt_db::BackendPtr yf::Virt_db::Frontend::init_backend(
156     std::list<std::string> databases, Package &package,
157     int &error_code, std::string &addinfo)
158 {
159     BackendPtr b = create_backend_from_databases(databases);
160     if (!b)
161     {
162         error_code = YAZ_BIB1_DATABASE_UNAVAILABLE;
163         // addinfo = database;
164         return b;
165     }
166     Package init_package(b->m_backend_session, package.origin());
167     init_package.copy_filter(package);
168
169     yp2::odr odr;
170
171     Z_APDU *init_apdu = zget_APDU(odr, Z_APDU_initRequest);
172     
173     std::list<std::string>::const_iterator t_it = b->m_targets.begin();
174     int cat = 1;
175     for (; t_it != b->m_targets.end(); t_it++, cat++)
176     {
177         yaz_oi_set_string_oidval(&init_apdu->u.initRequest->otherInfo, odr,
178                                  VAL_PROXY, cat, t_it->c_str());
179     }
180     Z_InitRequest *req = init_apdu->u.initRequest;
181
182     ODR_MASK_SET(req->options, Z_Options_search);
183     ODR_MASK_SET(req->options, Z_Options_present);
184     ODR_MASK_SET(req->options, Z_Options_namedResultSets);
185     ODR_MASK_SET(req->options, Z_Options_scan);
186
187     ODR_MASK_SET(req->protocolVersion, Z_ProtocolVersion_1);
188     ODR_MASK_SET(req->protocolVersion, Z_ProtocolVersion_2);
189     ODR_MASK_SET(req->protocolVersion, Z_ProtocolVersion_3);
190
191     init_package.request() = init_apdu;
192     
193     init_package.move(b->m_route);  // sending init 
194     
195     if (init_package.session().is_closed())
196     {
197         error_code = YAZ_BIB1_DATABASE_UNAVAILABLE;
198         // addinfo = database;
199         BackendPtr null;
200         return null;
201     }
202     Z_GDU *gdu = init_package.response().get();
203     // we hope to get an init response
204     if (gdu && gdu->which == Z_GDU_Z3950 && gdu->u.z3950->which ==
205         Z_APDU_initResponse)
206     {
207         if (ODR_MASK_GET(gdu->u.z3950->u.initResponse->options,
208                          Z_Options_namedResultSets))
209         {
210             b->m_named_result_sets = true;
211         }
212     }
213     else
214     {
215         error_code = YAZ_BIB1_DATABASE_UNAVAILABLE;
216         // addinfo = database;
217         BackendPtr null;
218         return null;
219     }        
220     m_backend_list.push_back(b);
221     return b;
222 }
223
224 void yf::Virt_db::Frontend::search(Package &package, Z_APDU *apdu_req)
225 {
226     Z_SearchRequest *req = apdu_req->u.searchRequest;
227     std::string vhost;
228     std::string resultSetId = req->resultSetName;
229     yp2::odr odr;
230
231     std::list<std::string> databases;
232     int i;
233     for (i = 0; i<req->num_databaseNames; i++)
234         databases.push_back(req->databaseNames[i]);
235
236     BackendPtr b; // null for now
237     Sets_it sets_it = m_sets.find(req->resultSetName);
238     if (sets_it != m_sets.end())
239     {
240         // result set already exist 
241         // if replace indicator is off: we return diagnostic if
242         // result set already exist.
243         if (*req->replaceIndicator == 0)
244         {
245             Z_APDU *apdu = 
246                 odr.create_searchResponse(
247                     apdu_req,
248                     YAZ_BIB1_RESULT_SET_EXISTS_AND_REPLACE_INDICATOR_OFF,
249                     0);
250             package.response() = apdu;
251             
252             return;
253         } 
254         sets_it->second.m_backend->m_number_of_sets--;
255
256         // pick up any existing backend with a database match
257         std::list<BackendPtr>::const_iterator map_it;
258         map_it = m_backend_list.begin();
259         for (; map_it != m_backend_list.end(); map_it++)
260         {
261             BackendPtr tmp = *map_it;
262             if (tmp->m_frontend_databases == databases)
263                 break;
264         }
265         if (map_it != m_backend_list.end()) 
266             b = *map_it;
267     }
268     else
269     {
270         // new result set.
271
272         // pick up any existing database with named result sets ..
273         // or one which has no result sets.. yet.
274         std::list<BackendPtr>::const_iterator map_it;
275         map_it = m_backend_list.begin();
276         for (; map_it != m_backend_list.end(); map_it++)
277         {
278             BackendPtr tmp = *map_it;
279             if (tmp->m_frontend_databases == databases &&
280                 (tmp->m_named_result_sets ||
281                  tmp->m_number_of_sets == 0))
282                 break;
283         }
284         if (map_it != m_backend_list.end()) 
285             b = *map_it;
286     }
287     if (!b)  // no backend yet. Must create a new one
288     {
289         int error_code;
290         std::string addinfo;
291         b = init_backend(databases, package, error_code, addinfo);
292         if (!b)
293         {
294             // did not get a backend (unavailable somehow?)
295             
296             Z_APDU *apdu = 
297                 odr.create_searchResponse(
298                     apdu_req, error_code, addinfo.c_str());
299             package.response() = apdu;
300             return;
301         }
302     }
303     m_sets.erase(req->resultSetName);
304     // sending search to backend
305     Package search_package(b->m_backend_session, package.origin());
306
307     search_package.copy_filter(package);
308
309     std::string backend_setname;
310     if (b->m_named_result_sets)
311     {
312         backend_setname = std::string(req->resultSetName);
313     }
314     else
315     {
316         backend_setname = "default";
317         req->resultSetName = odr_strdup(odr, backend_setname.c_str());
318     }
319
320 #if 0
321     const char *backend_database = b->m_backend_database.c_str();
322     req->databaseNames[0] = odr_strdup(odr, backend_database);
323 #endif
324
325     *req->replaceIndicator = 1;
326
327     search_package.request() = yazpp_1::GDU(apdu_req);
328     
329     search_package.move(b->m_route);
330
331     if (search_package.session().is_closed())
332     {
333         Z_APDU *apdu = 
334             odr.create_searchResponse(
335                 apdu_req, YAZ_BIB1_DATABASE_UNAVAILABLE, 0);
336         package.response() = apdu;
337         return;
338     }
339     package.response() = search_package.response();
340
341     b->m_number_of_sets++;
342
343     m_sets[resultSetId] = Virt_db::Set(b, backend_setname);
344 }
345
346 yf::Virt_db::Frontend::Frontend(Rep *rep)
347 {
348     m_p = rep;
349     m_is_virtual = false;
350 }
351
352 void yf::Virt_db::Frontend::close(Package &package)
353 {
354     std::list<BackendPtr>::const_iterator b_it;
355     
356     for (b_it = m_backend_list.begin(); b_it != m_backend_list.end(); b_it++)
357     {
358         (*b_it)->m_backend_session.close();
359         Package close_package((*b_it)->m_backend_session, package.origin());
360         close_package.copy_filter(package);
361         close_package.move((*b_it)->m_route);
362     }
363     m_backend_list.clear();
364 }
365
366 yf::Virt_db::Frontend::~Frontend()
367 {
368 }
369
370 yf::Virt_db::FrontendPtr yf::Virt_db::Rep::get_frontend(Package &package)
371 {
372     boost::mutex::scoped_lock lock(m_mutex);
373
374     std::map<yp2::Session,yf::Virt_db::FrontendPtr>::iterator it;
375     
376     while(true)
377     {
378         it = m_clients.find(package.session());
379         if (it == m_clients.end())
380             break;
381         
382         if (!it->second->m_in_use)
383         {
384             it->second->m_in_use = true;
385             return it->second;
386         }
387         m_cond_session_ready.wait(lock);
388     }
389     FrontendPtr f(new Frontend(this));
390     m_clients[package.session()] = f;
391     f->m_in_use = true;
392     return f;
393 }
394
395 void yf::Virt_db::Rep::release_frontend(Package &package)
396 {
397     boost::mutex::scoped_lock lock(m_mutex);
398     std::map<yp2::Session,yf::Virt_db::FrontendPtr>::iterator it;
399     
400     it = m_clients.find(package.session());
401     if (it != m_clients.end())
402     {
403         if (package.session().is_closed())
404         {
405             it->second->close(package);
406             m_clients.erase(it);
407         }
408         else
409         {
410             it->second->m_in_use = false;
411         }
412         m_cond_session_ready.notify_all();
413     }
414 }
415
416 yf::Virt_db::Set::Set(BackendPtr b, std::string setname)
417     :  m_backend(b), m_setname(setname)
418 {
419 }
420
421
422 yf::Virt_db::Set::Set()
423 {
424 }
425
426
427 yf::Virt_db::Set::~Set()
428 {
429 }
430
431 yf::Virt_db::Map::Map(std::list<std::string> targets, std::string route)
432     : m_targets(targets), m_route(route) 
433 {
434 }
435
436 yf::Virt_db::Map::Map()
437 {
438 }
439
440 yf::Virt_db::Virt_db() : m_p(new Virt_db::Rep)
441 {
442 }
443
444 yf::Virt_db::~Virt_db() {
445 }
446
447 void yf::Virt_db::Frontend::present(Package &package, Z_APDU *apdu_req)
448 {
449     Z_PresentRequest *req = apdu_req->u.presentRequest;
450     std::string resultSetId = req->resultSetId;
451     yp2::odr odr;
452
453     Sets_it sets_it = m_sets.find(resultSetId);
454     if (sets_it == m_sets.end())
455     {
456         Z_APDU *apdu = 
457             odr.create_presentResponse(
458                 apdu_req,
459                 YAZ_BIB1_SPECIFIED_RESULT_SET_DOES_NOT_EXIST,
460                 resultSetId.c_str());
461         package.response() = apdu;
462         return;
463     }
464     Session *id =
465         new yp2::Session(sets_it->second.m_backend->m_backend_session);
466     
467     // sending present to backend
468     Package present_package(*id, package.origin());
469     present_package.copy_filter(package);
470     
471     req->resultSetId = odr_strdup(odr, "default");
472     present_package.request() = yazpp_1::GDU(apdu_req);
473
474     present_package.move();
475
476     if (present_package.session().is_closed())
477     {
478         Z_APDU *apdu = 
479             odr.create_presentResponse(
480                 apdu_req,
481                 YAZ_BIB1_RESULT_SET_NO_LONGER_EXISTS_UNILATERALLY_DELETED_BY_,
482                 resultSetId.c_str());
483         package.response() = apdu;
484         m_sets.erase(resultSetId);
485     }
486     else
487     {
488         package.response() = present_package.response();
489     }
490     delete id;
491 }
492
493 void yf::Virt_db::Frontend::scan(Package &package, Z_APDU *apdu_req)
494 {
495     Z_ScanRequest *req = apdu_req->u.scanRequest;
496     std::string vhost;
497     yp2::odr odr;
498
499     std::list<std::string> databases;
500     int i;
501     for (i = 0; i<req->num_databaseNames; i++)
502         databases.push_back(req->databaseNames[i]);
503
504     BackendPtr b;
505     // pick up any existing backend with a database match
506     std::list<BackendPtr>::const_iterator map_it;
507     map_it = m_backend_list.begin();
508     for (; map_it != m_backend_list.end(); map_it++)
509     {
510         BackendPtr tmp = *map_it;
511         if (tmp->m_frontend_databases == databases)
512             break;
513     }
514     if (map_it != m_backend_list.end()) 
515         b = *map_it;
516     if (!b)  // no backend yet. Must create a new one
517     {
518         int error_code;
519         std::string addinfo;
520         b = init_backend(databases, package, error_code, addinfo);
521         if (!b)
522         {
523             // did not get a backend (unavailable somehow?)
524             Z_APDU *apdu =
525                 odr.create_scanResponse(
526                     apdu_req, error_code, addinfo.c_str());
527             package.response() = apdu;
528             
529             return;
530         }
531     }
532     // sending scan to backend
533     Package scan_package(b->m_backend_session, package.origin());
534
535     scan_package.copy_filter(package);
536
537 #if 0
538     const char *backend_database = b->m_backend_database.c_str();
539     req->databaseNames[0] = odr_strdup(odr, backend_database);
540 #endif
541
542     scan_package.request() = yazpp_1::GDU(apdu_req);
543     
544     scan_package.move(b->m_route);
545
546     if (scan_package.session().is_closed())
547     {
548         Z_APDU *apdu =
549             odr.create_scanResponse(
550                 apdu_req, YAZ_BIB1_DATABASE_UNAVAILABLE, 0);
551         package.response() = apdu;
552         return;
553     }
554     package.response() = scan_package.response();
555 }
556
557
558 void yf::Virt_db::add_map_db2targets(std::string db, 
559                                      std::list<std::string> targets,
560                                      std::string route)
561 {
562     m_p->m_maps[db] = Virt_db::Map(targets, route);
563 }
564
565
566 void yf::Virt_db::add_map_db2target(std::string db, 
567                                     std::string target,
568                                     std::string route)
569 {
570     std::list<std::string> targets;
571     targets.push_back(target);
572
573     m_p->m_maps[db] = Virt_db::Map(targets, route);
574 }
575
576 void yf::Virt_db::process(Package &package) const
577 {
578     FrontendPtr f = m_p->get_frontend(package);
579
580     Z_GDU *gdu = package.request().get();
581     
582     if (gdu && gdu->which == Z_GDU_Z3950 && gdu->u.z3950->which ==
583         Z_APDU_initRequest && !f->m_is_virtual)
584     {
585         Z_InitRequest *req = gdu->u.z3950->u.initRequest;
586         
587         const char *vhost =
588             yaz_oi_get_string_oidval(&req->otherInfo, VAL_PROXY, 1, 0);
589         if (!vhost)
590         {
591             yp2::odr odr;
592             Z_APDU *apdu = odr.create_initResponse(gdu->u.z3950, 0, 0);
593             Z_InitResponse *resp = apdu->u.initResponse;
594             
595             int i;
596             static const int masks[] = {
597                 Z_Options_search,
598                 Z_Options_present,
599                 Z_Options_namedResultSets,
600                 Z_Options_scan,
601                 -1 
602             };
603             for (i = 0; masks[i] != -1; i++)
604                 if (ODR_MASK_GET(req->options, masks[i]))
605                     ODR_MASK_SET(resp->options, masks[i]);
606             
607             static const int versions[] = {
608                 Z_ProtocolVersion_1,
609                 Z_ProtocolVersion_2,
610                 Z_ProtocolVersion_3,
611                 -1
612             };
613             for (i = 0; versions[i] != -1; i++)
614                 if (ODR_MASK_GET(req->protocolVersion, versions[i]))
615                     ODR_MASK_SET(resp->protocolVersion, versions[i]);
616                 else
617                     break;
618             
619             package.response() = apdu;
620             f->m_is_virtual = true;
621         }
622         else
623             package.move();
624     }
625     else if (!f->m_is_virtual)
626         package.move();
627     else if (gdu && gdu->which == Z_GDU_Z3950)
628     {
629         Z_APDU *apdu = gdu->u.z3950;
630         if (apdu->which == Z_APDU_initRequest)
631         {
632             yp2::odr odr;
633             
634             package.response() = odr.create_close(
635                 apdu,
636                 Z_Close_protocolError,
637                 "double init");
638             
639             package.session().close();
640         }
641         else if (apdu->which == Z_APDU_searchRequest)
642         {
643             f->search(package, apdu);
644         }
645         else if (apdu->which == Z_APDU_presentRequest)
646         {
647             f->present(package, apdu);
648         }
649         else if (apdu->which == Z_APDU_scanRequest)
650         {
651             f->scan(package, apdu);
652         }
653         else
654         {
655             yp2::odr odr;
656             
657             package.response() = odr.create_close(
658                 apdu, Z_Close_protocolError,
659                 "unsupported APDU in filter_virt_db");
660             
661             package.session().close();
662         }
663     }
664     m_p->release_frontend(package);
665 }
666
667
668 void yp2::filter::Virt_db::configure(const xmlNode * ptr)
669 {
670     for (ptr = ptr->children; ptr; ptr = ptr->next)
671     {
672         if (ptr->type != XML_ELEMENT_NODE)
673             continue;
674         if (!strcmp((const char *) ptr->name, "virtual"))
675         {
676             std::string database;
677             std::list<std::string> targets;
678             xmlNode *v_node = ptr->children;
679             for (; v_node; v_node = v_node->next)
680             {
681                 if (v_node->type != XML_ELEMENT_NODE)
682                     continue;
683                 
684                 if (yp2::xml::is_element_yp2(v_node, "database"))
685                     database = yp2::xml::get_text(v_node);
686                 else if (yp2::xml::is_element_yp2(v_node, "target"))
687                     targets.push_back(yp2::xml::get_text(v_node));
688                 else
689                     throw yp2::filter::FilterException
690                         ("Bad element " 
691                          + std::string((const char *) v_node->name)
692                          + " in virtual section"
693                             );
694             }
695             std::string route = yp2::xml::get_route(ptr);
696             add_map_db2targets(database, targets, route);
697 #if 0
698             std::cout << "Add " << database << "->" << target
699                       << "," << route << "\n";
700 #endif
701         }
702         else
703         {
704             throw yp2::filter::FilterException
705                 ("Bad element " 
706                  + std::string((const char *) ptr->name)
707                  + " in virt_db filter");
708         }
709     }
710 }
711
712 static yp2::filter::Base* filter_creator()
713 {
714     return new yp2::filter::Virt_db;
715 }
716
717 extern "C" {
718     struct yp2_filter_struct yp2_filter_virt_db = {
719         0,
720         "virt_db",
721         filter_creator
722     };
723 }
724
725
726 /*
727  * Local variables:
728  * c-basic-offset: 4
729  * indent-tabs-mode: nil
730  * c-file-style: "stroustrup"
731  * End:
732  * vim: shiftwidth=4 tabstop=8 expandtab
733  */