Filter virt_db returns first unsvailable database as addinfo
[metaproxy-moved-to-github.git] / src / filter_virt_db.cpp
1 /* $Id: filter_virt_db.cpp,v 1.27 2006-01-16 16:05:50 adam Exp $
2    Copyright (c) 2005, Index Data.
3
4 %LICENSE%
5  */
6
7 #include "config.hpp"
8
9 #include "filter.hpp"
10 #include "package.hpp"
11
12 #include <boost/thread/mutex.hpp>
13 #include <boost/thread/condition.hpp>
14 #include <boost/shared_ptr.hpp>
15
16 #include "util.hpp"
17 #include "filter_virt_db.hpp"
18
19 #include <yaz/zgdu.h>
20 #include <yaz/otherinfo.h>
21 #include <yaz/diagbib1.h>
22
23 #include <map>
24 #include <iostream>
25
26 namespace yf = yp2::filter;
27
28 namespace yp2 {
29     namespace filter {
30
31         struct Virt_db::Set {
32             Set(BackendPtr b, std::string setname);
33             Set();
34             ~Set();
35
36             BackendPtr m_backend;
37             std::string m_setname;
38         };
39         struct Virt_db::Map {
40             Map(std::list<std::string> targets, std::string route);
41             Map();
42             std::list<std::string> m_targets;
43             std::string m_route;
44         };
45         struct Virt_db::Backend {
46             yp2::Session m_backend_session;
47 #if 0
48             std::string m_backend_database;
49 #endif
50             std::list<std::string> m_frontend_databases;
51             std::list<std::string> m_targets;
52             std::string m_route;
53             bool m_named_result_sets;
54             int m_number_of_sets;
55         };
56         struct Virt_db::Frontend {
57             Frontend(Rep *rep);
58             ~Frontend();
59             yp2::Session m_session;
60             bool m_is_virtual;
61             bool m_in_use;
62             std::list<BackendPtr> m_backend_list;
63             std::map<std::string,Virt_db::Set> m_sets;
64
65             void search(Package &package, Z_APDU *apdu);
66             void present(Package &package, Z_APDU *apdu);
67             void scan(Package &package, Z_APDU *apdu);
68
69             void close(Package &package);
70             typedef std::map<std::string,Virt_db::Set>::iterator Sets_it;
71
72             BackendPtr lookup_backend_from_databases(
73                 std::list<std::string> databases);
74             BackendPtr create_backend_from_databases(
75                 std::list<std::string> databases,
76                 std::string &failing_database);
77             
78             BackendPtr init_backend(std::list<std::string> database,
79                                     Package &package,
80                                     int &error_code, std::string &addinfo);
81             Rep *m_p;
82         };            
83         class Virt_db::Rep {
84             friend class Virt_db;
85             friend class Frontend;
86             
87             FrontendPtr get_frontend(Package &package);
88             void release_frontend(Package &package);
89         private:
90             boost::mutex m_sessions_mutex;
91             std::map<std::string, Virt_db::Map>m_maps;
92
93             typedef std::map<std::string,Virt_db::Set>::iterator Sets_it;
94
95             boost::mutex m_mutex;
96             boost::condition m_cond_session_ready;
97             std::map<yp2::Session, FrontendPtr> m_clients;
98         };
99     }
100 }
101
102 using namespace yp2;
103
104 yf::Virt_db::BackendPtr yf::Virt_db::Frontend::lookup_backend_from_databases(
105     std::list<std::string> databases)
106 {
107     std::list<BackendPtr>::const_iterator map_it;
108     map_it = m_backend_list.begin();
109     for (; map_it != m_backend_list.end(); map_it++)
110         if ((*map_it)->m_frontend_databases == databases)
111             return *map_it;
112     BackendPtr null;
113     return null;
114 }
115
116 yf::Virt_db::BackendPtr yf::Virt_db::Frontend::create_backend_from_databases(
117     std::list<std::string> databases, std::string &failing_database)
118 {
119     BackendPtr b(new Backend);
120     std::list<std::string>::const_iterator db_it = databases.begin();
121
122     b->m_number_of_sets = 0;
123     b->m_frontend_databases = databases;
124     b->m_named_result_sets = false;
125
126     std::map<std::string,bool> targets_dedup;
127     for (; db_it != databases.end(); db_it++)
128     {
129         std::map<std::string, Virt_db::Map>::iterator map_it;
130         map_it = m_p->m_maps.find(*db_it);
131         if (map_it == m_p->m_maps.end())  // database not found
132         {
133             failing_database = *db_it;
134             BackendPtr ptr;
135             return ptr;
136         }
137         std::list<std::string>::const_iterator t_it =
138             map_it->second.m_targets.begin();
139         for (; t_it != map_it->second.m_targets.end(); t_it++)
140             targets_dedup[*t_it] = true;
141         b->m_route = map_it->second.m_route;
142     }
143     std::map<std::string,bool>::const_iterator tm_it = targets_dedup.begin();
144     for (; tm_it != targets_dedup.end(); tm_it++)
145         b->m_targets.push_back(tm_it->first);
146 #if 0
147     const char *sep = strchr(b->m_vhost.c_str(), '/');
148     std::string backend_database;
149     if (sep)
150         b->m_backend_database = std::string(sep+1);
151     else
152         b->m_backend_database = database;
153 #endif
154     return b;
155 }
156
157 yf::Virt_db::BackendPtr yf::Virt_db::Frontend::init_backend(
158     std::list<std::string> databases, Package &package,
159     int &error_code, std::string &addinfo)
160 {
161     std::string failing_database;
162     BackendPtr b = create_backend_from_databases(databases, failing_database);
163     if (!b)
164     {
165         error_code = YAZ_BIB1_DATABASE_UNAVAILABLE;
166         addinfo = failing_database;
167         return b;
168     }
169     Package init_package(b->m_backend_session, package.origin());
170     init_package.copy_filter(package);
171
172     yp2::odr odr;
173
174     Z_APDU *init_apdu = zget_APDU(odr, Z_APDU_initRequest);
175     
176     std::list<std::string>::const_iterator t_it = b->m_targets.begin();
177     int cat = 1;
178     for (; t_it != b->m_targets.end(); t_it++, cat++)
179     {
180         yaz_oi_set_string_oidval(&init_apdu->u.initRequest->otherInfo, odr,
181                                  VAL_PROXY, cat, t_it->c_str());
182     }
183     Z_InitRequest *req = init_apdu->u.initRequest;
184
185     ODR_MASK_SET(req->options, Z_Options_search);
186     ODR_MASK_SET(req->options, Z_Options_present);
187     ODR_MASK_SET(req->options, Z_Options_namedResultSets);
188     ODR_MASK_SET(req->options, Z_Options_scan);
189
190     ODR_MASK_SET(req->protocolVersion, Z_ProtocolVersion_1);
191     ODR_MASK_SET(req->protocolVersion, Z_ProtocolVersion_2);
192     ODR_MASK_SET(req->protocolVersion, Z_ProtocolVersion_3);
193
194     init_package.request() = init_apdu;
195     
196     init_package.move(b->m_route);  // sending init 
197     
198     if (init_package.session().is_closed())
199     {
200         error_code = YAZ_BIB1_DATABASE_UNAVAILABLE;
201         // addinfo = database;
202         BackendPtr null;
203         return null;
204     }
205     Z_GDU *gdu = init_package.response().get();
206     // we hope to get an init response
207     if (gdu && gdu->which == Z_GDU_Z3950 && gdu->u.z3950->which ==
208         Z_APDU_initResponse)
209     {
210         if (ODR_MASK_GET(gdu->u.z3950->u.initResponse->options,
211                          Z_Options_namedResultSets))
212         {
213             b->m_named_result_sets = true;
214         }
215     }
216     else
217     {
218         error_code = YAZ_BIB1_DATABASE_UNAVAILABLE;
219         // addinfo = database;
220         BackendPtr null;
221         return null;
222     }        
223     m_backend_list.push_back(b);
224     return b;
225 }
226
227 void yf::Virt_db::Frontend::search(Package &package, Z_APDU *apdu_req)
228 {
229     Z_SearchRequest *req = apdu_req->u.searchRequest;
230     std::string vhost;
231     std::string resultSetId = req->resultSetName;
232     yp2::odr odr;
233
234     std::list<std::string> databases;
235     int i;
236     for (i = 0; i<req->num_databaseNames; i++)
237         databases.push_back(req->databaseNames[i]);
238
239     BackendPtr b; // null for now
240     Sets_it sets_it = m_sets.find(req->resultSetName);
241     if (sets_it != m_sets.end())
242     {
243         // result set already exist 
244         // if replace indicator is off: we return diagnostic if
245         // result set already exist.
246         if (*req->replaceIndicator == 0)
247         {
248             Z_APDU *apdu = 
249                 odr.create_searchResponse(
250                     apdu_req,
251                     YAZ_BIB1_RESULT_SET_EXISTS_AND_REPLACE_INDICATOR_OFF,
252                     0);
253             package.response() = apdu;
254             
255             return;
256         } 
257         sets_it->second.m_backend->m_number_of_sets--;
258
259         // pick up any existing backend with a database match
260         std::list<BackendPtr>::const_iterator map_it;
261         map_it = m_backend_list.begin();
262         for (; map_it != m_backend_list.end(); map_it++)
263         {
264             BackendPtr tmp = *map_it;
265             if (tmp->m_frontend_databases == databases)
266                 break;
267         }
268         if (map_it != m_backend_list.end()) 
269             b = *map_it;
270     }
271     else
272     {
273         // new result set.
274
275         // pick up any existing database with named result sets ..
276         // or one which has no result sets.. yet.
277         std::list<BackendPtr>::const_iterator map_it;
278         map_it = m_backend_list.begin();
279         for (; map_it != m_backend_list.end(); map_it++)
280         {
281             BackendPtr tmp = *map_it;
282             if (tmp->m_frontend_databases == databases &&
283                 (tmp->m_named_result_sets ||
284                  tmp->m_number_of_sets == 0))
285                 break;
286         }
287         if (map_it != m_backend_list.end()) 
288             b = *map_it;
289     }
290     if (!b)  // no backend yet. Must create a new one
291     {
292         int error_code;
293         std::string addinfo;
294         b = init_backend(databases, package, error_code, addinfo);
295         if (!b)
296         {
297             // did not get a backend (unavailable somehow?)
298             
299             Z_APDU *apdu = 
300                 odr.create_searchResponse(
301                     apdu_req, error_code, addinfo.c_str());
302             package.response() = apdu;
303             return;
304         }
305     }
306     m_sets.erase(req->resultSetName);
307     // sending search to backend
308     Package search_package(b->m_backend_session, package.origin());
309
310     search_package.copy_filter(package);
311
312     std::string backend_setname;
313     if (b->m_named_result_sets)
314     {
315         backend_setname = std::string(req->resultSetName);
316     }
317     else
318     {
319         backend_setname = "default";
320         req->resultSetName = odr_strdup(odr, backend_setname.c_str());
321     }
322
323 #if 0
324     const char *backend_database = b->m_backend_database.c_str();
325     req->databaseNames[0] = odr_strdup(odr, backend_database);
326 #endif
327
328     *req->replaceIndicator = 1;
329
330     search_package.request() = yazpp_1::GDU(apdu_req);
331     
332     search_package.move(b->m_route);
333
334     if (search_package.session().is_closed())
335     {
336         Z_APDU *apdu = 
337             odr.create_searchResponse(
338                 apdu_req, YAZ_BIB1_DATABASE_UNAVAILABLE, 0);
339         package.response() = apdu;
340         return;
341     }
342     package.response() = search_package.response();
343
344     b->m_number_of_sets++;
345
346     m_sets[resultSetId] = Virt_db::Set(b, backend_setname);
347 }
348
349 yf::Virt_db::Frontend::Frontend(Rep *rep)
350 {
351     m_p = rep;
352     m_is_virtual = false;
353 }
354
355 void yf::Virt_db::Frontend::close(Package &package)
356 {
357     std::list<BackendPtr>::const_iterator b_it;
358     
359     for (b_it = m_backend_list.begin(); b_it != m_backend_list.end(); b_it++)
360     {
361         (*b_it)->m_backend_session.close();
362         Package close_package((*b_it)->m_backend_session, package.origin());
363         close_package.copy_filter(package);
364         close_package.move((*b_it)->m_route);
365     }
366     m_backend_list.clear();
367 }
368
369 yf::Virt_db::Frontend::~Frontend()
370 {
371 }
372
373 yf::Virt_db::FrontendPtr yf::Virt_db::Rep::get_frontend(Package &package)
374 {
375     boost::mutex::scoped_lock lock(m_mutex);
376
377     std::map<yp2::Session,yf::Virt_db::FrontendPtr>::iterator it;
378     
379     while(true)
380     {
381         it = m_clients.find(package.session());
382         if (it == m_clients.end())
383             break;
384         
385         if (!it->second->m_in_use)
386         {
387             it->second->m_in_use = true;
388             return it->second;
389         }
390         m_cond_session_ready.wait(lock);
391     }
392     FrontendPtr f(new Frontend(this));
393     m_clients[package.session()] = f;
394     f->m_in_use = true;
395     return f;
396 }
397
398 void yf::Virt_db::Rep::release_frontend(Package &package)
399 {
400     boost::mutex::scoped_lock lock(m_mutex);
401     std::map<yp2::Session,yf::Virt_db::FrontendPtr>::iterator it;
402     
403     it = m_clients.find(package.session());
404     if (it != m_clients.end())
405     {
406         if (package.session().is_closed())
407         {
408             it->second->close(package);
409             m_clients.erase(it);
410         }
411         else
412         {
413             it->second->m_in_use = false;
414         }
415         m_cond_session_ready.notify_all();
416     }
417 }
418
419 yf::Virt_db::Set::Set(BackendPtr b, std::string setname)
420     :  m_backend(b), m_setname(setname)
421 {
422 }
423
424
425 yf::Virt_db::Set::Set()
426 {
427 }
428
429
430 yf::Virt_db::Set::~Set()
431 {
432 }
433
434 yf::Virt_db::Map::Map(std::list<std::string> targets, std::string route)
435     : m_targets(targets), m_route(route) 
436 {
437 }
438
439 yf::Virt_db::Map::Map()
440 {
441 }
442
443 yf::Virt_db::Virt_db() : m_p(new Virt_db::Rep)
444 {
445 }
446
447 yf::Virt_db::~Virt_db() {
448 }
449
450 void yf::Virt_db::Frontend::present(Package &package, Z_APDU *apdu_req)
451 {
452     Z_PresentRequest *req = apdu_req->u.presentRequest;
453     std::string resultSetId = req->resultSetId;
454     yp2::odr odr;
455
456     Sets_it sets_it = m_sets.find(resultSetId);
457     if (sets_it == m_sets.end())
458     {
459         Z_APDU *apdu = 
460             odr.create_presentResponse(
461                 apdu_req,
462                 YAZ_BIB1_SPECIFIED_RESULT_SET_DOES_NOT_EXIST,
463                 resultSetId.c_str());
464         package.response() = apdu;
465         return;
466     }
467     Session *id =
468         new yp2::Session(sets_it->second.m_backend->m_backend_session);
469     
470     // sending present to backend
471     Package present_package(*id, package.origin());
472     present_package.copy_filter(package);
473     
474     req->resultSetId = odr_strdup(odr, "default");
475     present_package.request() = yazpp_1::GDU(apdu_req);
476
477     present_package.move();
478
479     if (present_package.session().is_closed())
480     {
481         Z_APDU *apdu = 
482             odr.create_presentResponse(
483                 apdu_req,
484                 YAZ_BIB1_RESULT_SET_NO_LONGER_EXISTS_UNILATERALLY_DELETED_BY_,
485                 resultSetId.c_str());
486         package.response() = apdu;
487         m_sets.erase(resultSetId);
488     }
489     else
490     {
491         package.response() = present_package.response();
492     }
493     delete id;
494 }
495
496 void yf::Virt_db::Frontend::scan(Package &package, Z_APDU *apdu_req)
497 {
498     Z_ScanRequest *req = apdu_req->u.scanRequest;
499     std::string vhost;
500     yp2::odr odr;
501
502     std::list<std::string> databases;
503     int i;
504     for (i = 0; i<req->num_databaseNames; i++)
505         databases.push_back(req->databaseNames[i]);
506
507     BackendPtr b;
508     // pick up any existing backend with a database match
509     std::list<BackendPtr>::const_iterator map_it;
510     map_it = m_backend_list.begin();
511     for (; map_it != m_backend_list.end(); map_it++)
512     {
513         BackendPtr tmp = *map_it;
514         if (tmp->m_frontend_databases == databases)
515             break;
516     }
517     if (map_it != m_backend_list.end()) 
518         b = *map_it;
519     if (!b)  // no backend yet. Must create a new one
520     {
521         int error_code;
522         std::string addinfo;
523         b = init_backend(databases, package, error_code, addinfo);
524         if (!b)
525         {
526             // did not get a backend (unavailable somehow?)
527             Z_APDU *apdu =
528                 odr.create_scanResponse(
529                     apdu_req, error_code, addinfo.c_str());
530             package.response() = apdu;
531             
532             return;
533         }
534     }
535     // sending scan to backend
536     Package scan_package(b->m_backend_session, package.origin());
537
538     scan_package.copy_filter(package);
539
540 #if 0
541     const char *backend_database = b->m_backend_database.c_str();
542     req->databaseNames[0] = odr_strdup(odr, backend_database);
543 #endif
544
545     scan_package.request() = yazpp_1::GDU(apdu_req);
546     
547     scan_package.move(b->m_route);
548
549     if (scan_package.session().is_closed())
550     {
551         Z_APDU *apdu =
552             odr.create_scanResponse(
553                 apdu_req, YAZ_BIB1_DATABASE_UNAVAILABLE, 0);
554         package.response() = apdu;
555         return;
556     }
557     package.response() = scan_package.response();
558 }
559
560
561 void yf::Virt_db::add_map_db2targets(std::string db, 
562                                      std::list<std::string> targets,
563                                      std::string route)
564 {
565     m_p->m_maps[db] = Virt_db::Map(targets, route);
566 }
567
568
569 void yf::Virt_db::add_map_db2target(std::string db, 
570                                     std::string target,
571                                     std::string route)
572 {
573     std::list<std::string> targets;
574     targets.push_back(target);
575
576     m_p->m_maps[db] = Virt_db::Map(targets, route);
577 }
578
579 void yf::Virt_db::process(Package &package) const
580 {
581     FrontendPtr f = m_p->get_frontend(package);
582
583     Z_GDU *gdu = package.request().get();
584     
585     if (gdu && gdu->which == Z_GDU_Z3950 && gdu->u.z3950->which ==
586         Z_APDU_initRequest && !f->m_is_virtual)
587     {
588         Z_InitRequest *req = gdu->u.z3950->u.initRequest;
589         
590         const char *vhost =
591             yaz_oi_get_string_oidval(&req->otherInfo, VAL_PROXY, 1, 0);
592         if (!vhost)
593         {
594             yp2::odr odr;
595             Z_APDU *apdu = odr.create_initResponse(gdu->u.z3950, 0, 0);
596             Z_InitResponse *resp = apdu->u.initResponse;
597             
598             int i;
599             static const int masks[] = {
600                 Z_Options_search,
601                 Z_Options_present,
602                 Z_Options_namedResultSets,
603                 Z_Options_scan,
604                 -1 
605             };
606             for (i = 0; masks[i] != -1; i++)
607                 if (ODR_MASK_GET(req->options, masks[i]))
608                     ODR_MASK_SET(resp->options, masks[i]);
609             
610             static const int versions[] = {
611                 Z_ProtocolVersion_1,
612                 Z_ProtocolVersion_2,
613                 Z_ProtocolVersion_3,
614                 -1
615             };
616             for (i = 0; versions[i] != -1; i++)
617                 if (ODR_MASK_GET(req->protocolVersion, versions[i]))
618                     ODR_MASK_SET(resp->protocolVersion, versions[i]);
619                 else
620                     break;
621             
622             package.response() = apdu;
623             f->m_is_virtual = true;
624         }
625         else
626             package.move();
627     }
628     else if (!f->m_is_virtual)
629         package.move();
630     else if (gdu && gdu->which == Z_GDU_Z3950)
631     {
632         Z_APDU *apdu = gdu->u.z3950;
633         if (apdu->which == Z_APDU_initRequest)
634         {
635             yp2::odr odr;
636             
637             package.response() = odr.create_close(
638                 apdu,
639                 Z_Close_protocolError,
640                 "double init");
641             
642             package.session().close();
643         }
644         else if (apdu->which == Z_APDU_searchRequest)
645         {
646             f->search(package, apdu);
647         }
648         else if (apdu->which == Z_APDU_presentRequest)
649         {
650             f->present(package, apdu);
651         }
652         else if (apdu->which == Z_APDU_scanRequest)
653         {
654             f->scan(package, apdu);
655         }
656         else
657         {
658             yp2::odr odr;
659             
660             package.response() = odr.create_close(
661                 apdu, Z_Close_protocolError,
662                 "unsupported APDU in filter_virt_db");
663             
664             package.session().close();
665         }
666     }
667     m_p->release_frontend(package);
668 }
669
670
671 void yp2::filter::Virt_db::configure(const xmlNode * ptr)
672 {
673     for (ptr = ptr->children; ptr; ptr = ptr->next)
674     {
675         if (ptr->type != XML_ELEMENT_NODE)
676             continue;
677         if (!strcmp((const char *) ptr->name, "virtual"))
678         {
679             std::string database;
680             std::list<std::string> targets;
681             xmlNode *v_node = ptr->children;
682             for (; v_node; v_node = v_node->next)
683             {
684                 if (v_node->type != XML_ELEMENT_NODE)
685                     continue;
686                 
687                 if (yp2::xml::is_element_yp2(v_node, "database"))
688                     database = yp2::xml::get_text(v_node);
689                 else if (yp2::xml::is_element_yp2(v_node, "target"))
690                     targets.push_back(yp2::xml::get_text(v_node));
691                 else
692                     throw yp2::filter::FilterException
693                         ("Bad element " 
694                          + std::string((const char *) v_node->name)
695                          + " in virtual section"
696                             );
697             }
698             std::string route = yp2::xml::get_route(ptr);
699             add_map_db2targets(database, targets, route);
700 #if 0
701             std::cout << "Add " << database << "->" << target
702                       << "," << route << "\n";
703 #endif
704         }
705         else
706         {
707             throw yp2::filter::FilterException
708                 ("Bad element " 
709                  + std::string((const char *) ptr->name)
710                  + " in virt_db filter");
711         }
712     }
713 }
714
715 static yp2::filter::Base* filter_creator()
716 {
717     return new yp2::filter::Virt_db;
718 }
719
720 extern "C" {
721     struct yp2_filter_struct yp2_filter_virt_db = {
722         0,
723         "virt_db",
724         filter_creator
725     };
726 }
727
728
729 /*
730  * Local variables:
731  * c-basic-offset: 4
732  * indent-tabs-mode: nil
733  * c-file-style: "stroustrup"
734  * End:
735  * vim: shiftwidth=4 tabstop=8 expandtab
736  */