Rename from yp2 to metaproxy. The namespace for all definitions
[metaproxy-moved-to-github.git] / src / filter_virt_db.cpp
1 /* $Id: filter_virt_db.cpp,v 1.36 2006-03-16 10:40:59 adam Exp $
2    Copyright (c) 2005-2006, Index Data.
3
4 %LICENSE%
5  */
6
7 #include "config.hpp"
8
9 #include "filter.hpp"
10 #include "package.hpp"
11
12 #include <boost/thread/mutex.hpp>
13 #include <boost/thread/condition.hpp>
14 #include <boost/shared_ptr.hpp>
15
16 #include "util.hpp"
17 #include "filter_virt_db.hpp"
18
19 #include <yaz/zgdu.h>
20 #include <yaz/otherinfo.h>
21 #include <yaz/diagbib1.h>
22
23 #include <map>
24 #include <iostream>
25
26 namespace mp = metaproxy_1;
27 namespace yf = mp::filter;
28
29 namespace metaproxy_1 {
30     namespace filter {
31
32         struct Virt_db::Set {
33             Set(BackendPtr b, std::string setname);
34             Set();
35             ~Set();
36
37             BackendPtr m_backend;
38             std::string m_setname;
39         };
40         struct Virt_db::Map {
41             Map(std::list<std::string> targets, std::string route);
42             Map();
43             std::list<std::string> m_targets;
44             std::string m_route;
45         };
46         struct Virt_db::Backend {
47             mp::Session m_backend_session;
48             std::list<std::string> m_frontend_databases;
49             std::list<std::string> m_targets;
50             std::string m_route;
51             bool m_named_result_sets;
52             int m_number_of_sets;
53         };
54         struct Virt_db::Frontend {
55             Frontend(Rep *rep);
56             ~Frontend();
57             mp::Session m_session;
58             bool m_is_virtual;
59             bool m_in_use;
60             yazpp_1::GDU m_init_gdu;
61             std::list<BackendPtr> m_backend_list;
62             std::map<std::string,Virt_db::Set> m_sets;
63
64             void search(Package &package, Z_APDU *apdu);
65             void present(Package &package, Z_APDU *apdu);
66             void scan(Package &package, Z_APDU *apdu);
67
68             void close(Package &package);
69             typedef std::map<std::string,Virt_db::Set>::iterator Sets_it;
70
71             BackendPtr lookup_backend_from_databases(
72                 std::list<std::string> databases);
73             BackendPtr create_backend_from_databases(
74                 std::list<std::string> databases,
75                 int &error_code,
76                 std::string &failing_database);
77             
78             BackendPtr init_backend(std::list<std::string> database,
79                                     Package &package,
80                                     int &error_code, std::string &addinfo);
81             Rep *m_p;
82         };            
83         class Virt_db::Rep {
84             friend class Virt_db;
85             friend struct Frontend;
86             
87             FrontendPtr get_frontend(Package &package);
88             void release_frontend(Package &package);
89         private:
90             boost::mutex m_sessions_mutex;
91             std::map<std::string, Virt_db::Map>m_maps;
92
93             typedef std::map<std::string,Virt_db::Set>::iterator Sets_it;
94
95             boost::mutex m_mutex;
96             boost::condition m_cond_session_ready;
97             std::map<mp::Session, FrontendPtr> m_clients;
98         };
99     }
100 }
101
102 using namespace mp;
103
104 yf::Virt_db::BackendPtr yf::Virt_db::Frontend::lookup_backend_from_databases(
105     std::list<std::string> databases)
106 {
107     std::list<BackendPtr>::const_iterator map_it;
108     map_it = m_backend_list.begin();
109     for (; map_it != m_backend_list.end(); map_it++)
110         if ((*map_it)->m_frontend_databases == databases)
111             return *map_it;
112     BackendPtr null;
113     return null;
114 }
115
116 yf::Virt_db::BackendPtr yf::Virt_db::Frontend::create_backend_from_databases(
117     std::list<std::string> databases, int &error_code, std::string &addinfo)
118 {
119     BackendPtr b(new Backend);
120     std::list<std::string>::const_iterator db_it = databases.begin();
121
122     b->m_number_of_sets = 0;
123     b->m_frontend_databases = databases;
124     b->m_named_result_sets = false;
125
126     bool first_route = true;
127
128     std::map<std::string,bool> targets_dedup;
129     for (; db_it != databases.end(); db_it++)
130     {
131         std::map<std::string, Virt_db::Map>::iterator map_it;
132         map_it = m_p->m_maps.find(*db_it);
133         if (map_it == m_p->m_maps.end())  // database not found
134         {
135             error_code = YAZ_BIB1_DATABASE_UNAVAILABLE;
136             addinfo = *db_it;
137             BackendPtr ptr;
138             return ptr;
139         }
140         std::list<std::string>::const_iterator t_it =
141             map_it->second.m_targets.begin();
142         for (; t_it != map_it->second.m_targets.end(); t_it++)
143             targets_dedup[*t_it] = true;
144
145         // see if we have a route conflict.
146         if (!first_route && b->m_route != map_it->second.m_route)
147         {
148             // we have a conflict.. 
149             error_code =  YAZ_BIB1_COMBI_OF_SPECIFIED_DATABASES_UNSUPP;
150             BackendPtr ptr;
151             return ptr;
152         }
153         b->m_route = map_it->second.m_route;
154         first_route = false;
155     }
156     std::map<std::string,bool>::const_iterator tm_it = targets_dedup.begin();
157     for (; tm_it != targets_dedup.end(); tm_it++)
158         b->m_targets.push_back(tm_it->first);
159
160     return b;
161 }
162
163 yf::Virt_db::BackendPtr yf::Virt_db::Frontend::init_backend(
164     std::list<std::string> databases, Package &package,
165     int &error_code, std::string &addinfo)
166 {
167     BackendPtr b = create_backend_from_databases(databases, error_code,
168                                                  addinfo);
169     if (!b)
170         return b;
171     Package init_package(b->m_backend_session, package.origin());
172     init_package.copy_filter(package);
173
174     mp::odr odr;
175
176     Z_APDU *init_apdu = zget_APDU(odr, Z_APDU_initRequest);
177
178     mp::util::set_vhost_otherinfo(&init_apdu->u.initRequest->otherInfo, odr,
179                                    b->m_targets);
180     Z_InitRequest *req = init_apdu->u.initRequest;
181
182     // copy stuff from Frontend Init Request
183     Z_GDU *org_gdu = m_init_gdu.get();
184     Z_InitRequest *org_init = org_gdu->u.z3950->u.initRequest;
185
186     req->idAuthentication = org_init->idAuthentication;
187     req->implementationId = org_init->implementationId;
188     req->implementationName = org_init->implementationName;
189     req->implementationVersion = org_init->implementationVersion;
190
191     ODR_MASK_SET(req->options, Z_Options_search);
192     ODR_MASK_SET(req->options, Z_Options_present);
193     ODR_MASK_SET(req->options, Z_Options_namedResultSets);
194     ODR_MASK_SET(req->options, Z_Options_scan);
195
196     ODR_MASK_SET(req->protocolVersion, Z_ProtocolVersion_1);
197     ODR_MASK_SET(req->protocolVersion, Z_ProtocolVersion_2);
198     ODR_MASK_SET(req->protocolVersion, Z_ProtocolVersion_3);
199
200     init_package.request() = init_apdu;
201     
202     init_package.move(b->m_route);  // sending init 
203
204     Z_GDU *gdu = init_package.response().get();
205     // we hope to get an init response
206     if (gdu && gdu->which == Z_GDU_Z3950 && gdu->u.z3950->which ==
207         Z_APDU_initResponse)
208     {
209         Z_InitResponse *res = gdu->u.z3950->u.initResponse;
210         if (ODR_MASK_GET(res->options, Z_Options_namedResultSets))
211         {
212             b->m_named_result_sets = true;
213         }
214         if (!*res->result)
215         {
216             mp::util::get_init_diagnostics(res, error_code, addinfo);
217             BackendPtr null;
218             return null; 
219         }
220     }
221     else
222     {
223         error_code = YAZ_BIB1_DATABASE_UNAVAILABLE;
224         // addinfo = database;
225         BackendPtr null;
226         return null;
227     }        
228     if (init_package.session().is_closed())
229     {
230         error_code = YAZ_BIB1_DATABASE_UNAVAILABLE;
231         // addinfo = database;
232         BackendPtr null;
233         return null;
234     }
235
236     m_backend_list.push_back(b);
237     return b;
238 }
239
240 void yf::Virt_db::Frontend::search(Package &package, Z_APDU *apdu_req)
241 {
242     Z_SearchRequest *req = apdu_req->u.searchRequest;
243     std::string vhost;
244     std::string resultSetId = req->resultSetName;
245     mp::odr odr;
246
247     std::list<std::string> databases;
248     int i;
249     for (i = 0; i<req->num_databaseNames; i++)
250         databases.push_back(req->databaseNames[i]);
251
252     BackendPtr b; // null for now
253     Sets_it sets_it = m_sets.find(req->resultSetName);
254     if (sets_it != m_sets.end())
255     {
256         // result set already exist 
257         // if replace indicator is off: we return diagnostic if
258         // result set already exist.
259         if (*req->replaceIndicator == 0)
260         {
261             Z_APDU *apdu = 
262                 odr.create_searchResponse(
263                     apdu_req,
264                     YAZ_BIB1_RESULT_SET_EXISTS_AND_REPLACE_INDICATOR_OFF,
265                     0);
266             package.response() = apdu;
267             
268             return;
269         } 
270         sets_it->second.m_backend->m_number_of_sets--;
271
272         // pick up any existing backend with a database match
273         std::list<BackendPtr>::const_iterator map_it;
274         map_it = m_backend_list.begin();
275         for (; map_it != m_backend_list.end(); map_it++)
276         {
277             BackendPtr tmp = *map_it;
278             if (tmp->m_frontend_databases == databases)
279                 break;
280         }
281         if (map_it != m_backend_list.end()) 
282             b = *map_it;
283     }
284     else
285     {
286         // new result set.
287
288         // pick up any existing database with named result sets ..
289         // or one which has no result sets.. yet.
290         std::list<BackendPtr>::const_iterator map_it;
291         map_it = m_backend_list.begin();
292         for (; map_it != m_backend_list.end(); map_it++)
293         {
294             BackendPtr tmp = *map_it;
295             if (tmp->m_frontend_databases == databases &&
296                 (tmp->m_named_result_sets ||
297                  tmp->m_number_of_sets == 0))
298                 break;
299         }
300         if (map_it != m_backend_list.end()) 
301             b = *map_it;
302     }
303     if (!b)  // no backend yet. Must create a new one
304     {
305         int error_code;
306         std::string addinfo;
307         b = init_backend(databases, package, error_code, addinfo);
308         if (!b)
309         {
310             // did not get a backend (unavailable somehow?)
311             
312             Z_APDU *apdu = 
313                 odr.create_searchResponse(
314                     apdu_req, error_code, addinfo.c_str());
315             package.response() = apdu;
316             return;
317         }
318     }
319     m_sets.erase(req->resultSetName);
320     // sending search to backend
321     Package search_package(b->m_backend_session, package.origin());
322
323     search_package.copy_filter(package);
324
325     std::string backend_setname;
326     if (b->m_named_result_sets)
327     {
328         backend_setname = std::string(req->resultSetName);
329     }
330     else
331     {
332         backend_setname = "default";
333         req->resultSetName = odr_strdup(odr, backend_setname.c_str());
334     }
335
336     // pick first targets spec and move the databases from it ..
337     std::list<std::string>::const_iterator t_it = b->m_targets.begin();
338     if (t_it != b->m_targets.end())
339     {
340         mp::util::set_databases_from_zurl(odr, *t_it,
341                                                 &req->num_databaseNames,
342                                                 &req->databaseNames);
343     }
344
345     *req->replaceIndicator = 1;
346
347     search_package.request() = yazpp_1::GDU(apdu_req);
348     
349     search_package.move(b->m_route);
350
351     if (search_package.session().is_closed())
352     {
353         package.response() = search_package.response();
354         package.session().close();
355         return;
356     }
357     package.response() = search_package.response();
358
359     b->m_number_of_sets++;
360
361     m_sets[resultSetId] = Virt_db::Set(b, backend_setname);
362 }
363
364 yf::Virt_db::Frontend::Frontend(Rep *rep)
365 {
366     m_p = rep;
367     m_is_virtual = false;
368 }
369
370 void yf::Virt_db::Frontend::close(Package &package)
371 {
372     std::list<BackendPtr>::const_iterator b_it;
373     
374     for (b_it = m_backend_list.begin(); b_it != m_backend_list.end(); b_it++)
375     {
376         (*b_it)->m_backend_session.close();
377         Package close_package((*b_it)->m_backend_session, package.origin());
378         close_package.copy_filter(package);
379         close_package.move((*b_it)->m_route);
380     }
381     m_backend_list.clear();
382 }
383
384 yf::Virt_db::Frontend::~Frontend()
385 {
386 }
387
388 yf::Virt_db::FrontendPtr yf::Virt_db::Rep::get_frontend(Package &package)
389 {
390     boost::mutex::scoped_lock lock(m_mutex);
391
392     std::map<mp::Session,yf::Virt_db::FrontendPtr>::iterator it;
393     
394     while(true)
395     {
396         it = m_clients.find(package.session());
397         if (it == m_clients.end())
398             break;
399         
400         if (!it->second->m_in_use)
401         {
402             it->second->m_in_use = true;
403             return it->second;
404         }
405         m_cond_session_ready.wait(lock);
406     }
407     FrontendPtr f(new Frontend(this));
408     m_clients[package.session()] = f;
409     f->m_in_use = true;
410     return f;
411 }
412
413 void yf::Virt_db::Rep::release_frontend(Package &package)
414 {
415     boost::mutex::scoped_lock lock(m_mutex);
416     std::map<mp::Session,yf::Virt_db::FrontendPtr>::iterator it;
417     
418     it = m_clients.find(package.session());
419     if (it != m_clients.end())
420     {
421         if (package.session().is_closed())
422         {
423             it->second->close(package);
424             m_clients.erase(it);
425         }
426         else
427         {
428             it->second->m_in_use = false;
429         }
430         m_cond_session_ready.notify_all();
431     }
432 }
433
434 yf::Virt_db::Set::Set(BackendPtr b, std::string setname)
435     :  m_backend(b), m_setname(setname)
436 {
437 }
438
439
440 yf::Virt_db::Set::Set()
441 {
442 }
443
444
445 yf::Virt_db::Set::~Set()
446 {
447 }
448
449 yf::Virt_db::Map::Map(std::list<std::string> targets, std::string route)
450     : m_targets(targets), m_route(route) 
451 {
452 }
453
454 yf::Virt_db::Map::Map()
455 {
456 }
457
458 yf::Virt_db::Virt_db() : m_p(new Virt_db::Rep)
459 {
460 }
461
462 yf::Virt_db::~Virt_db() {
463 }
464
465 void yf::Virt_db::Frontend::present(Package &package, Z_APDU *apdu_req)
466 {
467     Z_PresentRequest *req = apdu_req->u.presentRequest;
468     std::string resultSetId = req->resultSetId;
469     mp::odr odr;
470
471     Sets_it sets_it = m_sets.find(resultSetId);
472     if (sets_it == m_sets.end())
473     {
474         Z_APDU *apdu = 
475             odr.create_presentResponse(
476                 apdu_req,
477                 YAZ_BIB1_SPECIFIED_RESULT_SET_DOES_NOT_EXIST,
478                 resultSetId.c_str());
479         package.response() = apdu;
480         return;
481     }
482     Session *id =
483         new mp::Session(sets_it->second.m_backend->m_backend_session);
484     
485     // sending present to backend
486     Package present_package(*id, package.origin());
487     present_package.copy_filter(package);
488
489     req->resultSetId = odr_strdup(odr, sets_it->second.m_setname.c_str());
490     
491     present_package.request() = yazpp_1::GDU(apdu_req);
492
493     present_package.move(sets_it->second.m_backend->m_route);
494
495     if (present_package.session().is_closed())
496     {
497         package.response() = present_package.response();
498         package.session().close();
499         return;
500     }
501     else
502     {
503         package.response() = present_package.response();
504     }
505     delete id;
506 }
507
508 void yf::Virt_db::Frontend::scan(Package &package, Z_APDU *apdu_req)
509 {
510     Z_ScanRequest *req = apdu_req->u.scanRequest;
511     std::string vhost;
512     mp::odr odr;
513
514     std::list<std::string> databases;
515     int i;
516     for (i = 0; i<req->num_databaseNames; i++)
517         databases.push_back(req->databaseNames[i]);
518
519     BackendPtr b;
520     // pick up any existing backend with a database match
521     std::list<BackendPtr>::const_iterator map_it;
522     map_it = m_backend_list.begin();
523     for (; map_it != m_backend_list.end(); map_it++)
524     {
525         BackendPtr tmp = *map_it;
526         if (tmp->m_frontend_databases == databases)
527             break;
528     }
529     if (map_it != m_backend_list.end()) 
530         b = *map_it;
531     if (!b)  // no backend yet. Must create a new one
532     {
533         int error_code;
534         std::string addinfo;
535         b = init_backend(databases, package, error_code, addinfo);
536         if (!b)
537         {
538             // did not get a backend (unavailable somehow?)
539             Z_APDU *apdu =
540                 odr.create_scanResponse(
541                     apdu_req, error_code, addinfo.c_str());
542             package.response() = apdu;
543             
544             return;
545         }
546     }
547     // sending scan to backend
548     Package scan_package(b->m_backend_session, package.origin());
549
550     scan_package.copy_filter(package);
551
552     // pick first targets spec and move the databases from it ..
553     std::list<std::string>::const_iterator t_it = b->m_targets.begin();
554     if (t_it != b->m_targets.end())
555     {
556         mp::util::set_databases_from_zurl(odr, *t_it,
557                                                 &req->num_databaseNames,
558                                                 &req->databaseNames);
559     }
560     scan_package.request() = yazpp_1::GDU(apdu_req);
561     
562     scan_package.move(b->m_route);
563
564     if (scan_package.session().is_closed())
565     {
566         package.response() = scan_package.response();
567         package.session().close();
568         return;
569     }
570     package.response() = scan_package.response();
571 }
572
573
574 void yf::Virt_db::add_map_db2targets(std::string db, 
575                                      std::list<std::string> targets,
576                                      std::string route)
577 {
578     m_p->m_maps[db] = Virt_db::Map(targets, route);
579 }
580
581
582 void yf::Virt_db::add_map_db2target(std::string db, 
583                                     std::string target,
584                                     std::string route)
585 {
586     std::list<std::string> targets;
587     targets.push_back(target);
588
589     m_p->m_maps[db] = Virt_db::Map(targets, route);
590 }
591
592 void yf::Virt_db::process(Package &package) const
593 {
594     FrontendPtr f = m_p->get_frontend(package);
595
596     Z_GDU *gdu = package.request().get();
597     
598     if (gdu && gdu->which == Z_GDU_Z3950 && gdu->u.z3950->which ==
599         Z_APDU_initRequest && !f->m_is_virtual)
600     {
601         Z_InitRequest *req = gdu->u.z3950->u.initRequest;
602         
603         std::list<std::string> vhosts;
604         mp::util::get_vhost_otherinfo(&req->otherInfo, false, vhosts);
605         if (vhosts.size() == 0)
606         {
607             f->m_init_gdu = gdu;
608             
609             mp::odr odr;
610             Z_APDU *apdu = odr.create_initResponse(gdu->u.z3950, 0, 0);
611             Z_InitResponse *resp = apdu->u.initResponse;
612             
613             int i;
614             static const int masks[] = {
615                 Z_Options_search,
616                 Z_Options_present,
617                 Z_Options_namedResultSets,
618                 Z_Options_scan,
619                 -1 
620             };
621             for (i = 0; masks[i] != -1; i++)
622                 if (ODR_MASK_GET(req->options, masks[i]))
623                     ODR_MASK_SET(resp->options, masks[i]);
624             
625             static const int versions[] = {
626                 Z_ProtocolVersion_1,
627                 Z_ProtocolVersion_2,
628                 Z_ProtocolVersion_3,
629                 -1
630             };
631             for (i = 0; versions[i] != -1; i++)
632                 if (ODR_MASK_GET(req->protocolVersion, versions[i]))
633                     ODR_MASK_SET(resp->protocolVersion, versions[i]);
634                 else
635                     break;
636             
637             package.response() = apdu;
638             f->m_is_virtual = true;
639         }
640         else
641             package.move();
642     }
643     else if (!f->m_is_virtual)
644         package.move();
645     else if (gdu && gdu->which == Z_GDU_Z3950)
646     {
647         Z_APDU *apdu = gdu->u.z3950;
648         if (apdu->which == Z_APDU_initRequest)
649         {
650             mp::odr odr;
651             
652             package.response() = odr.create_close(
653                 apdu,
654                 Z_Close_protocolError,
655                 "double init");
656             
657             package.session().close();
658         }
659         else if (apdu->which == Z_APDU_searchRequest)
660         {
661             f->search(package, apdu);
662         }
663         else if (apdu->which == Z_APDU_presentRequest)
664         {
665             f->present(package, apdu);
666         }
667         else if (apdu->which == Z_APDU_scanRequest)
668         {
669             f->scan(package, apdu);
670         }
671         else
672         {
673             mp::odr odr;
674             
675             package.response() = odr.create_close(
676                 apdu, Z_Close_protocolError,
677                 "unsupported APDU in filter_virt_db");
678             
679             package.session().close();
680         }
681     }
682     m_p->release_frontend(package);
683 }
684
685
686 void mp::filter::Virt_db::configure(const xmlNode * ptr)
687 {
688     for (ptr = ptr->children; ptr; ptr = ptr->next)
689     {
690         if (ptr->type != XML_ELEMENT_NODE)
691             continue;
692         if (!strcmp((const char *) ptr->name, "virtual"))
693         {
694             std::string database;
695             std::list<std::string> targets;
696             xmlNode *v_node = ptr->children;
697             for (; v_node; v_node = v_node->next)
698             {
699                 if (v_node->type != XML_ELEMENT_NODE)
700                     continue;
701                 
702                 if (mp::xml::is_element_yp2(v_node, "database"))
703                     database = mp::xml::get_text(v_node);
704                 else if (mp::xml::is_element_yp2(v_node, "target"))
705                     targets.push_back(mp::xml::get_text(v_node));
706                 else
707                     throw mp::filter::FilterException
708                         ("Bad element " 
709                          + std::string((const char *) v_node->name)
710                          + " in virtual section"
711                             );
712             }
713             std::string route = mp::xml::get_route(ptr);
714             add_map_db2targets(database, targets, route);
715         }
716         else
717         {
718             throw mp::filter::FilterException
719                 ("Bad element " 
720                  + std::string((const char *) ptr->name)
721                  + " in virt_db filter");
722         }
723     }
724 }
725
726 static mp::filter::Base* filter_creator()
727 {
728     return new mp::filter::Virt_db;
729 }
730
731 extern "C" {
732     struct metaproxy_1_filter_struct metaproxy_1_filter_virt_db = {
733         0,
734         "virt_db",
735         filter_creator
736     };
737 }
738
739
740 /*
741  * Local variables:
742  * c-basic-offset: 4
743  * indent-tabs-mode: nil
744  * c-file-style: "stroustrup"
745  * End:
746  * vim: shiftwidth=4 tabstop=8 expandtab
747  */