Filter virt_db did not pass route correctly
[metaproxy-moved-to-github.git] / src / filter_virt_db.cpp
1 /* $Id: filter_virt_db.cpp,v 1.30 2006-01-17 13:54:54 adam Exp $
2    Copyright (c) 2005, Index Data.
3
4 %LICENSE%
5  */
6
7 #include "config.hpp"
8
9 #include "filter.hpp"
10 #include "package.hpp"
11
12 #include <boost/thread/mutex.hpp>
13 #include <boost/thread/condition.hpp>
14 #include <boost/shared_ptr.hpp>
15
16 #include "util.hpp"
17 #include "filter_virt_db.hpp"
18
19 #include <yaz/zgdu.h>
20 #include <yaz/otherinfo.h>
21 #include <yaz/diagbib1.h>
22
23 #include <map>
24 #include <iostream>
25
26 namespace yf = yp2::filter;
27
28 namespace yp2 {
29     namespace filter {
30
31         struct Virt_db::Set {
32             Set(BackendPtr b, std::string setname);
33             Set();
34             ~Set();
35
36             BackendPtr m_backend;
37             std::string m_setname;
38         };
39         struct Virt_db::Map {
40             Map(std::list<std::string> targets, std::string route);
41             Map();
42             std::list<std::string> m_targets;
43             std::string m_route;
44         };
45         struct Virt_db::Backend {
46             yp2::Session m_backend_session;
47             std::list<std::string> m_frontend_databases;
48             std::list<std::string> m_targets;
49             std::string m_route;
50             bool m_named_result_sets;
51             int m_number_of_sets;
52         };
53         struct Virt_db::Frontend {
54             Frontend(Rep *rep);
55             ~Frontend();
56             yp2::Session m_session;
57             bool m_is_virtual;
58             bool m_in_use;
59             std::list<BackendPtr> m_backend_list;
60             std::map<std::string,Virt_db::Set> m_sets;
61
62             void search(Package &package, Z_APDU *apdu);
63             void present(Package &package, Z_APDU *apdu);
64             void scan(Package &package, Z_APDU *apdu);
65
66             void close(Package &package);
67             typedef std::map<std::string,Virt_db::Set>::iterator Sets_it;
68
69             BackendPtr lookup_backend_from_databases(
70                 std::list<std::string> databases);
71             BackendPtr create_backend_from_databases(
72                 std::list<std::string> databases,
73                 int &error_code,
74                 std::string &failing_database);
75             
76             BackendPtr init_backend(std::list<std::string> database,
77                                     Package &package,
78                                     int &error_code, std::string &addinfo);
79             Rep *m_p;
80         };            
81         class Virt_db::Rep {
82             friend class Virt_db;
83             friend class Frontend;
84             
85             FrontendPtr get_frontend(Package &package);
86             void release_frontend(Package &package);
87         private:
88             boost::mutex m_sessions_mutex;
89             std::map<std::string, Virt_db::Map>m_maps;
90
91             typedef std::map<std::string,Virt_db::Set>::iterator Sets_it;
92
93             boost::mutex m_mutex;
94             boost::condition m_cond_session_ready;
95             std::map<yp2::Session, FrontendPtr> m_clients;
96         };
97     }
98 }
99
100 using namespace yp2;
101
102 yf::Virt_db::BackendPtr yf::Virt_db::Frontend::lookup_backend_from_databases(
103     std::list<std::string> databases)
104 {
105     std::list<BackendPtr>::const_iterator map_it;
106     map_it = m_backend_list.begin();
107     for (; map_it != m_backend_list.end(); map_it++)
108         if ((*map_it)->m_frontend_databases == databases)
109             return *map_it;
110     BackendPtr null;
111     return null;
112 }
113
114 yf::Virt_db::BackendPtr yf::Virt_db::Frontend::create_backend_from_databases(
115     std::list<std::string> databases, int &error_code, std::string &addinfo)
116 {
117     BackendPtr b(new Backend);
118     std::list<std::string>::const_iterator db_it = databases.begin();
119
120     b->m_number_of_sets = 0;
121     b->m_frontend_databases = databases;
122     b->m_named_result_sets = false;
123
124     bool first_route = true;
125
126     std::map<std::string,bool> targets_dedup;
127     for (; db_it != databases.end(); db_it++)
128     {
129         std::map<std::string, Virt_db::Map>::iterator map_it;
130         map_it = m_p->m_maps.find(*db_it);
131         if (map_it == m_p->m_maps.end())  // database not found
132         {
133             error_code = YAZ_BIB1_DATABASE_UNAVAILABLE;
134             addinfo = *db_it;
135             BackendPtr ptr;
136             return ptr;
137         }
138         std::list<std::string>::const_iterator t_it =
139             map_it->second.m_targets.begin();
140         for (; t_it != map_it->second.m_targets.end(); t_it++)
141             targets_dedup[*t_it] = true;
142
143         // see if we have a route conflict.
144         if (!first_route && b->m_route != map_it->second.m_route)
145         {
146             // we have a conflict.. 
147             error_code =  YAZ_BIB1_COMBI_OF_SPECIFIED_DATABASES_UNSUPP;
148             BackendPtr ptr;
149             return ptr;
150         }
151         b->m_route = map_it->second.m_route;
152         first_route = false;
153     }
154     std::map<std::string,bool>::const_iterator tm_it = targets_dedup.begin();
155     for (; tm_it != targets_dedup.end(); tm_it++)
156         b->m_targets.push_back(tm_it->first);
157
158     return b;
159 }
160
161 yf::Virt_db::BackendPtr yf::Virt_db::Frontend::init_backend(
162     std::list<std::string> databases, Package &package,
163     int &error_code, std::string &addinfo)
164 {
165     BackendPtr b = create_backend_from_databases(databases, error_code,
166                                                  addinfo);
167     if (!b)
168         return b;
169     Package init_package(b->m_backend_session, package.origin());
170     init_package.copy_filter(package);
171
172     yp2::odr odr;
173
174     Z_APDU *init_apdu = zget_APDU(odr, Z_APDU_initRequest);
175
176     std::list<std::string>::const_iterator t_it = b->m_targets.begin();
177     int cat = 1;
178     for (; t_it != b->m_targets.end(); t_it++, cat++)
179     {
180         yaz_oi_set_string_oidval(&init_apdu->u.initRequest->otherInfo, odr,
181                                  VAL_PROXY, cat, t_it->c_str());
182     }
183         
184     Z_InitRequest *req = init_apdu->u.initRequest;
185
186     ODR_MASK_SET(req->options, Z_Options_search);
187     ODR_MASK_SET(req->options, Z_Options_present);
188     ODR_MASK_SET(req->options, Z_Options_namedResultSets);
189     ODR_MASK_SET(req->options, Z_Options_scan);
190
191     ODR_MASK_SET(req->protocolVersion, Z_ProtocolVersion_1);
192     ODR_MASK_SET(req->protocolVersion, Z_ProtocolVersion_2);
193     ODR_MASK_SET(req->protocolVersion, Z_ProtocolVersion_3);
194
195     init_package.request() = init_apdu;
196     
197     init_package.move(b->m_route);  // sending init 
198     
199     if (init_package.session().is_closed())
200     {
201         error_code = YAZ_BIB1_DATABASE_UNAVAILABLE;
202         // addinfo = database;
203         BackendPtr null;
204         return null;
205     }
206     Z_GDU *gdu = init_package.response().get();
207     // we hope to get an init response
208     if (gdu && gdu->which == Z_GDU_Z3950 && gdu->u.z3950->which ==
209         Z_APDU_initResponse)
210     {
211         if (ODR_MASK_GET(gdu->u.z3950->u.initResponse->options,
212                          Z_Options_namedResultSets))
213         {
214             b->m_named_result_sets = true;
215         }
216     }
217     else
218     {
219         error_code = YAZ_BIB1_DATABASE_UNAVAILABLE;
220         // addinfo = database;
221         BackendPtr null;
222         return null;
223     }        
224     m_backend_list.push_back(b);
225     return b;
226 }
227
228 void yf::Virt_db::Frontend::search(Package &package, Z_APDU *apdu_req)
229 {
230     Z_SearchRequest *req = apdu_req->u.searchRequest;
231     std::string vhost;
232     std::string resultSetId = req->resultSetName;
233     yp2::odr odr;
234
235     std::list<std::string> databases;
236     int i;
237     for (i = 0; i<req->num_databaseNames; i++)
238         databases.push_back(req->databaseNames[i]);
239
240     BackendPtr b; // null for now
241     Sets_it sets_it = m_sets.find(req->resultSetName);
242     if (sets_it != m_sets.end())
243     {
244         // result set already exist 
245         // if replace indicator is off: we return diagnostic if
246         // result set already exist.
247         if (*req->replaceIndicator == 0)
248         {
249             Z_APDU *apdu = 
250                 odr.create_searchResponse(
251                     apdu_req,
252                     YAZ_BIB1_RESULT_SET_EXISTS_AND_REPLACE_INDICATOR_OFF,
253                     0);
254             package.response() = apdu;
255             
256             return;
257         } 
258         sets_it->second.m_backend->m_number_of_sets--;
259
260         // pick up any existing backend with a database match
261         std::list<BackendPtr>::const_iterator map_it;
262         map_it = m_backend_list.begin();
263         for (; map_it != m_backend_list.end(); map_it++)
264         {
265             BackendPtr tmp = *map_it;
266             if (tmp->m_frontend_databases == databases)
267                 break;
268         }
269         if (map_it != m_backend_list.end()) 
270             b = *map_it;
271     }
272     else
273     {
274         // new result set.
275
276         // pick up any existing database with named result sets ..
277         // or one which has no result sets.. yet.
278         std::list<BackendPtr>::const_iterator map_it;
279         map_it = m_backend_list.begin();
280         for (; map_it != m_backend_list.end(); map_it++)
281         {
282             BackendPtr tmp = *map_it;
283             if (tmp->m_frontend_databases == databases &&
284                 (tmp->m_named_result_sets ||
285                  tmp->m_number_of_sets == 0))
286                 break;
287         }
288         if (map_it != m_backend_list.end()) 
289             b = *map_it;
290     }
291     if (!b)  // no backend yet. Must create a new one
292     {
293         int error_code;
294         std::string addinfo;
295         b = init_backend(databases, package, error_code, addinfo);
296         if (!b)
297         {
298             // did not get a backend (unavailable somehow?)
299             
300             Z_APDU *apdu = 
301                 odr.create_searchResponse(
302                     apdu_req, error_code, addinfo.c_str());
303             package.response() = apdu;
304             return;
305         }
306     }
307     m_sets.erase(req->resultSetName);
308     // sending search to backend
309     Package search_package(b->m_backend_session, package.origin());
310
311     search_package.copy_filter(package);
312
313     std::string backend_setname;
314     if (b->m_named_result_sets)
315     {
316         backend_setname = std::string(req->resultSetName);
317     }
318     else
319     {
320         backend_setname = "default";
321         req->resultSetName = odr_strdup(odr, backend_setname.c_str());
322     }
323
324     // pick first targets spec and move the databases from it ..
325     std::list<std::string>::const_iterator t_it = b->m_targets.begin();
326     if (t_it != b->m_targets.end())
327     {
328         if (!yp2::util::set_databases_from_zurl(odr, *t_it,
329                                                 &req->num_databaseNames,
330                                                 &req->databaseNames));
331     }
332
333     *req->replaceIndicator = 1;
334
335     search_package.request() = yazpp_1::GDU(apdu_req);
336     
337     search_package.move(b->m_route);
338
339     if (search_package.session().is_closed())
340     {
341         package.response() = search_package.response();
342         package.session().close();
343         return;
344     }
345     package.response() = search_package.response();
346
347     b->m_number_of_sets++;
348
349     m_sets[resultSetId] = Virt_db::Set(b, backend_setname);
350 }
351
352 yf::Virt_db::Frontend::Frontend(Rep *rep)
353 {
354     m_p = rep;
355     m_is_virtual = false;
356 }
357
358 void yf::Virt_db::Frontend::close(Package &package)
359 {
360     std::list<BackendPtr>::const_iterator b_it;
361     
362     for (b_it = m_backend_list.begin(); b_it != m_backend_list.end(); b_it++)
363     {
364         (*b_it)->m_backend_session.close();
365         Package close_package((*b_it)->m_backend_session, package.origin());
366         close_package.copy_filter(package);
367         close_package.move((*b_it)->m_route);
368     }
369     m_backend_list.clear();
370 }
371
372 yf::Virt_db::Frontend::~Frontend()
373 {
374 }
375
376 yf::Virt_db::FrontendPtr yf::Virt_db::Rep::get_frontend(Package &package)
377 {
378     boost::mutex::scoped_lock lock(m_mutex);
379
380     std::map<yp2::Session,yf::Virt_db::FrontendPtr>::iterator it;
381     
382     while(true)
383     {
384         it = m_clients.find(package.session());
385         if (it == m_clients.end())
386             break;
387         
388         if (!it->second->m_in_use)
389         {
390             it->second->m_in_use = true;
391             return it->second;
392         }
393         m_cond_session_ready.wait(lock);
394     }
395     FrontendPtr f(new Frontend(this));
396     m_clients[package.session()] = f;
397     f->m_in_use = true;
398     return f;
399 }
400
401 void yf::Virt_db::Rep::release_frontend(Package &package)
402 {
403     boost::mutex::scoped_lock lock(m_mutex);
404     std::map<yp2::Session,yf::Virt_db::FrontendPtr>::iterator it;
405     
406     it = m_clients.find(package.session());
407     if (it != m_clients.end())
408     {
409         if (package.session().is_closed())
410         {
411             it->second->close(package);
412             m_clients.erase(it);
413         }
414         else
415         {
416             it->second->m_in_use = false;
417         }
418         m_cond_session_ready.notify_all();
419     }
420 }
421
422 yf::Virt_db::Set::Set(BackendPtr b, std::string setname)
423     :  m_backend(b), m_setname(setname)
424 {
425 }
426
427
428 yf::Virt_db::Set::Set()
429 {
430 }
431
432
433 yf::Virt_db::Set::~Set()
434 {
435 }
436
437 yf::Virt_db::Map::Map(std::list<std::string> targets, std::string route)
438     : m_targets(targets), m_route(route) 
439 {
440 }
441
442 yf::Virt_db::Map::Map()
443 {
444 }
445
446 yf::Virt_db::Virt_db() : m_p(new Virt_db::Rep)
447 {
448 }
449
450 yf::Virt_db::~Virt_db() {
451 }
452
453 void yf::Virt_db::Frontend::present(Package &package, Z_APDU *apdu_req)
454 {
455     Z_PresentRequest *req = apdu_req->u.presentRequest;
456     std::string resultSetId = req->resultSetId;
457     yp2::odr odr;
458
459     Sets_it sets_it = m_sets.find(resultSetId);
460     if (sets_it == m_sets.end())
461     {
462         Z_APDU *apdu = 
463             odr.create_presentResponse(
464                 apdu_req,
465                 YAZ_BIB1_SPECIFIED_RESULT_SET_DOES_NOT_EXIST,
466                 resultSetId.c_str());
467         package.response() = apdu;
468         return;
469     }
470     Session *id =
471         new yp2::Session(sets_it->second.m_backend->m_backend_session);
472     
473     // sending present to backend
474     Package present_package(*id, package.origin());
475     present_package.copy_filter(package);
476
477     req->resultSetId = odr_strdup(odr, sets_it->second.m_setname.c_str());
478     
479     present_package.request() = yazpp_1::GDU(apdu_req);
480
481     present_package.move(sets_it->second.m_backend->m_route);
482
483     if (present_package.session().is_closed())
484     {
485         package.response() = present_package.response();
486         package.session().close();
487         return;
488     }
489     else
490     {
491         package.response() = present_package.response();
492     }
493     delete id;
494 }
495
496 void yf::Virt_db::Frontend::scan(Package &package, Z_APDU *apdu_req)
497 {
498     Z_ScanRequest *req = apdu_req->u.scanRequest;
499     std::string vhost;
500     yp2::odr odr;
501
502     std::list<std::string> databases;
503     int i;
504     for (i = 0; i<req->num_databaseNames; i++)
505         databases.push_back(req->databaseNames[i]);
506
507     BackendPtr b;
508     // pick up any existing backend with a database match
509     std::list<BackendPtr>::const_iterator map_it;
510     map_it = m_backend_list.begin();
511     for (; map_it != m_backend_list.end(); map_it++)
512     {
513         BackendPtr tmp = *map_it;
514         if (tmp->m_frontend_databases == databases)
515             break;
516     }
517     if (map_it != m_backend_list.end()) 
518         b = *map_it;
519     if (!b)  // no backend yet. Must create a new one
520     {
521         int error_code;
522         std::string addinfo;
523         b = init_backend(databases, package, error_code, addinfo);
524         if (!b)
525         {
526             // did not get a backend (unavailable somehow?)
527             Z_APDU *apdu =
528                 odr.create_scanResponse(
529                     apdu_req, error_code, addinfo.c_str());
530             package.response() = apdu;
531             
532             return;
533         }
534     }
535     // sending scan to backend
536     Package scan_package(b->m_backend_session, package.origin());
537
538     scan_package.copy_filter(package);
539
540     // pick first targets spec and move the databases from it ..
541     std::list<std::string>::const_iterator t_it = b->m_targets.begin();
542     if (t_it != b->m_targets.end())
543     {
544         if (!yp2::util::set_databases_from_zurl(odr, *t_it,
545                                                 &req->num_databaseNames,
546                                                 &req->databaseNames));
547     }
548     scan_package.request() = yazpp_1::GDU(apdu_req);
549     
550     scan_package.move(b->m_route);
551
552     if (scan_package.session().is_closed())
553     {
554         package.response() = scan_package.response();
555         package.session().close();
556         return;
557     }
558     package.response() = scan_package.response();
559 }
560
561
562 void yf::Virt_db::add_map_db2targets(std::string db, 
563                                      std::list<std::string> targets,
564                                      std::string route)
565 {
566     m_p->m_maps[db] = Virt_db::Map(targets, route);
567 }
568
569
570 void yf::Virt_db::add_map_db2target(std::string db, 
571                                     std::string target,
572                                     std::string route)
573 {
574     std::list<std::string> targets;
575     targets.push_back(target);
576
577     m_p->m_maps[db] = Virt_db::Map(targets, route);
578 }
579
580 void yf::Virt_db::process(Package &package) const
581 {
582     FrontendPtr f = m_p->get_frontend(package);
583
584     Z_GDU *gdu = package.request().get();
585     
586     if (gdu && gdu->which == Z_GDU_Z3950 && gdu->u.z3950->which ==
587         Z_APDU_initRequest && !f->m_is_virtual)
588     {
589         Z_InitRequest *req = gdu->u.z3950->u.initRequest;
590         
591         const char *vhost =
592             yaz_oi_get_string_oidval(&req->otherInfo, VAL_PROXY, 1, 0);
593         if (!vhost)
594         {
595             yp2::odr odr;
596             Z_APDU *apdu = odr.create_initResponse(gdu->u.z3950, 0, 0);
597             Z_InitResponse *resp = apdu->u.initResponse;
598             
599             int i;
600             static const int masks[] = {
601                 Z_Options_search,
602                 Z_Options_present,
603                 Z_Options_namedResultSets,
604                 Z_Options_scan,
605                 -1 
606             };
607             for (i = 0; masks[i] != -1; i++)
608                 if (ODR_MASK_GET(req->options, masks[i]))
609                     ODR_MASK_SET(resp->options, masks[i]);
610             
611             static const int versions[] = {
612                 Z_ProtocolVersion_1,
613                 Z_ProtocolVersion_2,
614                 Z_ProtocolVersion_3,
615                 -1
616             };
617             for (i = 0; versions[i] != -1; i++)
618                 if (ODR_MASK_GET(req->protocolVersion, versions[i]))
619                     ODR_MASK_SET(resp->protocolVersion, versions[i]);
620                 else
621                     break;
622             
623             package.response() = apdu;
624             f->m_is_virtual = true;
625         }
626         else
627             package.move();
628     }
629     else if (!f->m_is_virtual)
630         package.move();
631     else if (gdu && gdu->which == Z_GDU_Z3950)
632     {
633         Z_APDU *apdu = gdu->u.z3950;
634         if (apdu->which == Z_APDU_initRequest)
635         {
636             yp2::odr odr;
637             
638             package.response() = odr.create_close(
639                 apdu,
640                 Z_Close_protocolError,
641                 "double init");
642             
643             package.session().close();
644         }
645         else if (apdu->which == Z_APDU_searchRequest)
646         {
647             f->search(package, apdu);
648         }
649         else if (apdu->which == Z_APDU_presentRequest)
650         {
651             f->present(package, apdu);
652         }
653         else if (apdu->which == Z_APDU_scanRequest)
654         {
655             f->scan(package, apdu);
656         }
657         else
658         {
659             yp2::odr odr;
660             
661             package.response() = odr.create_close(
662                 apdu, Z_Close_protocolError,
663                 "unsupported APDU in filter_virt_db");
664             
665             package.session().close();
666         }
667     }
668     m_p->release_frontend(package);
669 }
670
671
672 void yp2::filter::Virt_db::configure(const xmlNode * ptr)
673 {
674     for (ptr = ptr->children; ptr; ptr = ptr->next)
675     {
676         if (ptr->type != XML_ELEMENT_NODE)
677             continue;
678         if (!strcmp((const char *) ptr->name, "virtual"))
679         {
680             std::string database;
681             std::list<std::string> targets;
682             xmlNode *v_node = ptr->children;
683             for (; v_node; v_node = v_node->next)
684             {
685                 if (v_node->type != XML_ELEMENT_NODE)
686                     continue;
687                 
688                 if (yp2::xml::is_element_yp2(v_node, "database"))
689                     database = yp2::xml::get_text(v_node);
690                 else if (yp2::xml::is_element_yp2(v_node, "target"))
691                     targets.push_back(yp2::xml::get_text(v_node));
692                 else
693                     throw yp2::filter::FilterException
694                         ("Bad element " 
695                          + std::string((const char *) v_node->name)
696                          + " in virtual section"
697                             );
698             }
699             std::string route = yp2::xml::get_route(ptr);
700             add_map_db2targets(database, targets, route);
701         }
702         else
703         {
704             throw yp2::filter::FilterException
705                 ("Bad element " 
706                  + std::string((const char *) ptr->name)
707                  + " in virt_db filter");
708         }
709     }
710 }
711
712 static yp2::filter::Base* filter_creator()
713 {
714     return new yp2::filter::Virt_db;
715 }
716
717 extern "C" {
718     struct yp2_filter_struct yp2_filter_virt_db = {
719         0,
720         "virt_db",
721         filter_creator
722     };
723 }
724
725
726 /*
727  * Local variables:
728  * c-basic-offset: 4
729  * indent-tabs-mode: nil
730  * c-file-style: "stroustrup"
731  * End:
732  * vim: shiftwidth=4 tabstop=8 expandtab
733  */