Use FrontendPtr in virt_db filter
[metaproxy-moved-to-github.git] / src / filter_virt_db.cpp
1 /* $Id: filter_virt_db.cpp,v 1.24 2006-01-14 08:38:57 adam Exp $
2    Copyright (c) 2005, Index Data.
3
4 %LICENSE%
5  */
6
7 #include "config.hpp"
8
9 #include "filter.hpp"
10 #include "package.hpp"
11
12 #include <boost/thread/mutex.hpp>
13 #include <boost/thread/condition.hpp>
14 #include <boost/shared_ptr.hpp>
15
16 #include "util.hpp"
17 #include "filter_virt_db.hpp"
18
19 #include <yaz/zgdu.h>
20 #include <yaz/otherinfo.h>
21 #include <yaz/diagbib1.h>
22
23 #include <map>
24 #include <iostream>
25
26 namespace yf = yp2::filter;
27
28 namespace yp2 {
29     namespace filter {
30
31         struct Virt_db::Set {
32             Set(BackendPtr b, std::string setname);
33             Set();
34             ~Set();
35
36             BackendPtr m_backend;
37             std::string m_setname;
38         };
39         struct Virt_db::Map {
40             Map(std::string vhost, std::string route);
41             Map();
42             std::string m_vhost;
43             std::string m_route;
44         };
45         struct Virt_db::Backend {
46             yp2::Session m_backend_session;
47             std::string m_backend_database;
48             std::string m_frontend_database;
49             std::string m_vhost;
50             std::string m_route;
51             bool m_named_result_sets;
52             int m_number_of_sets;
53         };
54         struct Virt_db::Frontend {
55             Frontend(Rep *rep);
56             ~Frontend();
57             yp2::Session m_session;
58             bool m_is_virtual;
59             bool m_in_use;
60             std::list<BackendPtr> m_backend_list;
61             std::map<std::string,Virt_db::Set> m_sets;
62
63             void search(Package &package, Z_APDU *apdu);
64             void present(Package &package, Z_APDU *apdu);
65             void scan(Package &package, Z_APDU *apdu);
66
67             void close(Package &package);
68             typedef std::map<std::string,Virt_db::Set>::iterator Sets_it;
69
70             BackendPtr lookup_backend_from_database(std::string database);
71             BackendPtr create_backend_from_database(std::string database);
72             
73             BackendPtr init_backend(std::string database, Package &package,
74                                     int &error_code, std::string &addinfo);
75             Rep *m_p;
76         };            
77         class Virt_db::Rep {
78             friend class Virt_db;
79             friend class Frontend;
80             
81             FrontendPtr get_frontend(Package &package);
82             void release_frontend(Package &package);
83         private:
84             boost::mutex m_sessions_mutex;
85             std::map<std::string, Virt_db::Map>m_maps;
86
87             typedef std::map<std::string,Virt_db::Set>::iterator Sets_it;
88
89             boost::mutex m_mutex;
90             boost::condition m_cond_session_ready;
91             std::map<yp2::Session, FrontendPtr> m_clients;
92         };
93     }
94 }
95
96 using namespace yp2;
97
98 yf::Virt_db::BackendPtr yf::Virt_db::Frontend::lookup_backend_from_database(
99     std::string database)
100 {
101     std::list<BackendPtr>::const_iterator map_it;
102     map_it = m_backend_list.begin();
103     for (; map_it != m_backend_list.end(); map_it++)
104         if ((*map_it)->m_frontend_database == database)
105             return *map_it;
106     BackendPtr null;
107     return null;
108 }
109
110 yf::Virt_db::BackendPtr yf::Virt_db::Frontend::create_backend_from_database(
111     std::string database)
112 {
113     std::map<std::string, Virt_db::Map>::iterator map_it;
114     map_it = m_p->m_maps.find(database);
115     if (map_it == m_p->m_maps.end())
116     {
117         BackendPtr ptr;
118         return ptr;
119     }
120     BackendPtr b(new Backend);
121
122     b->m_number_of_sets = 0;
123     b->m_frontend_database = database;
124     b->m_named_result_sets = false;
125     b->m_route = map_it->second.m_route;
126     b->m_vhost = map_it->second.m_vhost;
127     const char *sep = strchr(b->m_vhost.c_str(), '/');
128     std::string backend_database;
129     if (sep)
130         b->m_backend_database = std::string(sep+1);
131     else
132         b->m_backend_database = database;
133
134     return b;
135 }
136
137 yf::Virt_db::BackendPtr yf::Virt_db::Frontend::init_backend(
138     std::string database, Package &package,
139     int &error_code, std::string &addinfo)
140 {
141     BackendPtr b = create_backend_from_database(database);
142     if (!b)
143     {
144         error_code = YAZ_BIB1_DATABASE_UNAVAILABLE;
145         addinfo = database;
146         return b;
147     }
148     Package init_package(b->m_backend_session, package.origin());
149     init_package.copy_filter(package);
150
151     yp2::odr odr;
152
153     Z_APDU *init_apdu = zget_APDU(odr, Z_APDU_initRequest);
154     
155     yaz_oi_set_string_oidval(&init_apdu->u.initRequest->otherInfo, odr,
156                              VAL_PROXY, 1, b->m_vhost.c_str());
157     
158     Z_InitRequest *req = init_apdu->u.initRequest;
159
160     ODR_MASK_SET(req->options, Z_Options_search);
161     ODR_MASK_SET(req->options, Z_Options_present);
162     ODR_MASK_SET(req->options, Z_Options_namedResultSets);
163     ODR_MASK_SET(req->options, Z_Options_scan);
164
165     ODR_MASK_SET(req->protocolVersion, Z_ProtocolVersion_1);
166     ODR_MASK_SET(req->protocolVersion, Z_ProtocolVersion_2);
167     ODR_MASK_SET(req->protocolVersion, Z_ProtocolVersion_3);
168
169     init_package.request() = init_apdu;
170     
171     init_package.move(b->m_route);  // sending init 
172     
173     if (init_package.session().is_closed())
174     {
175         error_code = YAZ_BIB1_DATABASE_UNAVAILABLE;
176         addinfo = database;
177         BackendPtr null;
178         return null;
179     }
180     Z_GDU *gdu = init_package.response().get();
181     // we hope to get an init response
182     if (gdu && gdu->which == Z_GDU_Z3950 && gdu->u.z3950->which ==
183         Z_APDU_initResponse)
184     {
185         if (ODR_MASK_GET(gdu->u.z3950->u.initResponse->options,
186                          Z_Options_namedResultSets))
187         {
188             b->m_named_result_sets = true;
189         }
190     }
191     else
192     {
193         error_code = YAZ_BIB1_DATABASE_UNAVAILABLE;
194         addinfo = database;
195         BackendPtr null;
196         return null;
197     }        
198     m_backend_list.push_back(b);
199     return b;
200 }
201
202 void yf::Virt_db::Frontend::search(Package &package, Z_APDU *apdu_req)
203 {
204     Z_SearchRequest *req = apdu_req->u.searchRequest;
205     std::string vhost;
206     std::string resultSetId = req->resultSetName;
207     yp2::odr odr;
208
209     // only one datatabase for now
210     if (req->num_databaseNames != 1)
211     {   // exactly one database must be specified
212         Z_APDU *apdu =
213             odr.create_searchResponse(
214                 apdu_req, YAZ_BIB1_TOO_MANY_DATABASES_SPECIFIED, 0);
215         package.response() = apdu;
216         
217         return;
218     }
219     std::string database = std::string(req->databaseNames[0]);
220
221     BackendPtr b; // null for now
222     Sets_it sets_it = m_sets.find(req->resultSetName);
223     if (sets_it != m_sets.end())
224     {
225         // result set already exist 
226         // if replace indicator is off: we return diagnostic if
227         // result set already exist.
228         if (*req->replaceIndicator == 0)
229         {
230             Z_APDU *apdu = 
231                 odr.create_searchResponse(
232                     apdu_req,
233                     YAZ_BIB1_RESULT_SET_EXISTS_AND_REPLACE_INDICATOR_OFF,
234                     0);
235             package.response() = apdu;
236             
237             return;
238         } 
239         sets_it->second.m_backend->m_number_of_sets--;
240
241         // pick up any existing backend with a database match
242         std::list<BackendPtr>::const_iterator map_it;
243         map_it = m_backend_list.begin();
244         for (; map_it != m_backend_list.end(); map_it++)
245         {
246             BackendPtr tmp = *map_it;
247             if (tmp->m_frontend_database == database)
248                 break;
249         }
250         if (map_it != m_backend_list.end()) 
251             b = *map_it;
252     }
253     else
254     {
255         // new result set.
256
257         // pick up any existing database with named result sets ..
258         // or one which has no result sets.. yet.
259         std::list<BackendPtr>::const_iterator map_it;
260         map_it = m_backend_list.begin();
261         for (; map_it != m_backend_list.end(); map_it++)
262         {
263             BackendPtr tmp = *map_it;
264             if (tmp->m_frontend_database == database &&
265                 (tmp->m_named_result_sets ||
266                  tmp->m_number_of_sets == 0))
267                 break;
268         }
269         if (map_it != m_backend_list.end()) 
270             b = *map_it;
271     }
272     if (!b)  // no backend yet. Must create a new one
273     {
274         int error_code;
275         std::string addinfo;
276         b = init_backend(database, package, error_code, addinfo);
277         if (!b)
278         {
279             // did not get a backend (unavailable somehow?)
280             
281             Z_APDU *apdu = 
282                 odr.create_searchResponse(
283                     apdu_req, error_code, addinfo.c_str());
284             package.response() = apdu;
285             return;
286         }
287     }
288     m_sets.erase(req->resultSetName);
289     // sending search to backend
290     Package search_package(b->m_backend_session, package.origin());
291
292     search_package.copy_filter(package);
293
294     std::string backend_setname;
295     if (b->m_named_result_sets)
296     {
297         backend_setname = std::string(req->resultSetName);
298     }
299     else
300     {
301         backend_setname = "default";
302         req->resultSetName = odr_strdup(odr, backend_setname.c_str());
303     }
304
305     const char *backend_database = b->m_backend_database.c_str();
306     req->databaseNames[0] = odr_strdup(odr, backend_database);
307
308     *req->replaceIndicator = 1;
309
310     search_package.request() = yazpp_1::GDU(apdu_req);
311     
312     search_package.move(b->m_route);
313
314     if (search_package.session().is_closed())
315     {
316         Z_APDU *apdu = 
317             odr.create_searchResponse(
318                 apdu_req,
319                 YAZ_BIB1_DATABASE_UNAVAILABLE, database.c_str());
320         package.response() = apdu;
321         return;
322     }
323     package.response() = search_package.response();
324
325     b->m_number_of_sets++;
326
327     m_sets[resultSetId] = Virt_db::Set(b, backend_setname);
328 }
329
330 yf::Virt_db::Frontend::Frontend(Rep *rep)
331 {
332     m_p = rep;
333     m_is_virtual = false;
334 }
335
336 void yf::Virt_db::Frontend::close(Package &package)
337 {
338     std::list<BackendPtr>::const_iterator b_it;
339     
340     for (b_it = m_backend_list.begin(); b_it != m_backend_list.end(); b_it++)
341     {
342         (*b_it)->m_backend_session.close();
343         Package close_package((*b_it)->m_backend_session, package.origin());
344         close_package.copy_filter(package);
345         close_package.move((*b_it)->m_route);
346     }
347     m_backend_list.clear();
348 }
349
350 yf::Virt_db::Frontend::~Frontend()
351 {
352 }
353
354 yf::Virt_db::FrontendPtr yf::Virt_db::Rep::get_frontend(Package &package)
355 {
356     boost::mutex::scoped_lock lock(m_mutex);
357
358     std::map<yp2::Session,yf::Virt_db::FrontendPtr>::iterator it;
359     
360     while(true)
361     {
362         it = m_clients.find(package.session());
363         if (it == m_clients.end())
364             break;
365         
366         if (!it->second->m_in_use)
367         {
368             it->second->m_in_use = true;
369             return it->second;
370         }
371         m_cond_session_ready.wait(lock);
372     }
373     FrontendPtr f(new Frontend(this));
374     m_clients[package.session()] = f;
375     f->m_in_use = true;
376     return f;
377 }
378
379 void yf::Virt_db::Rep::release_frontend(Package &package)
380 {
381     boost::mutex::scoped_lock lock(m_mutex);
382     std::map<yp2::Session,yf::Virt_db::FrontendPtr>::iterator it;
383     
384     it = m_clients.find(package.session());
385     if (it != m_clients.end())
386     {
387         if (package.session().is_closed())
388         {
389             it->second->close(package);
390             m_clients.erase(it);
391         }
392         else
393         {
394             it->second->m_in_use = false;
395         }
396         m_cond_session_ready.notify_all();
397     }
398 }
399
400 yf::Virt_db::Set::Set(BackendPtr b, std::string setname)
401     :  m_backend(b), m_setname(setname)
402 {
403 }
404
405
406 yf::Virt_db::Set::Set()
407 {
408 }
409
410
411 yf::Virt_db::Set::~Set()
412 {
413 }
414
415 yf::Virt_db::Map::Map(std::string vhost, std::string route)
416     : m_vhost(vhost), m_route(route) 
417 {
418 }
419
420 yf::Virt_db::Map::Map()
421 {
422 }
423
424 yf::Virt_db::Virt_db() : m_p(new Virt_db::Rep)
425 {
426 }
427
428 yf::Virt_db::~Virt_db() {
429 }
430
431 void yf::Virt_db::Frontend::present(Package &package, Z_APDU *apdu_req)
432 {
433     Z_PresentRequest *req = apdu_req->u.presentRequest;
434     std::string resultSetId = req->resultSetId;
435     yp2::odr odr;
436
437     Sets_it sets_it = m_sets.find(resultSetId);
438     if (sets_it == m_sets.end())
439     {
440         Z_APDU *apdu = 
441             odr.create_presentResponse(
442                 apdu_req,
443                 YAZ_BIB1_SPECIFIED_RESULT_SET_DOES_NOT_EXIST,
444                 resultSetId.c_str());
445         package.response() = apdu;
446         
447         return;
448     }
449     Session *id =
450         new yp2::Session(sets_it->second.m_backend->m_backend_session);
451     
452     // sending present to backend
453     Package present_package(*id, package.origin());
454     present_package.copy_filter(package);
455     
456     req->resultSetId = odr_strdup(odr, "default");
457     present_package.request() = yazpp_1::GDU(apdu_req);
458
459     present_package.move();
460
461     if (present_package.session().is_closed())
462     {
463         Z_APDU *apdu = 
464             odr.create_presentResponse(
465                 apdu_req,
466                 YAZ_BIB1_RESULT_SET_NO_LONGER_EXISTS_UNILATERALLY_DELETED_BY_,
467                 resultSetId.c_str());
468         package.response() = apdu;
469         m_sets.erase(resultSetId);
470     }
471     else
472     {
473         package.response() = present_package.response();
474     }
475     delete id;
476 }
477
478 void yf::Virt_db::Frontend::scan(Package &package, Z_APDU *apdu_req)
479 {
480     Z_ScanRequest *req = apdu_req->u.scanRequest;
481     std::string vhost;
482     yp2::odr odr;
483
484     // only one datatabase for now
485     if (req->num_databaseNames != 1)
486     {   // exactly one database must be specified
487         Z_APDU *apdu =
488             odr.create_scanResponse(
489                 apdu_req,
490                 YAZ_BIB1_TOO_MANY_DATABASES_SPECIFIED, 0);
491         package.response() = apdu;
492         return;
493     }
494     std::string database = std::string(req->databaseNames[0]);
495     
496     BackendPtr b;
497     // pick up any existing backend with a database match
498     std::list<BackendPtr>::const_iterator map_it;
499     map_it = m_backend_list.begin();
500     for (; map_it != m_backend_list.end(); map_it++)
501     {
502         BackendPtr tmp = *map_it;
503         if (tmp->m_frontend_database == database)
504             break;
505     }
506     if (map_it != m_backend_list.end()) 
507         b = *map_it;
508     if (!b)  // no backend yet. Must create a new one
509     {
510         int error_code;
511         std::string addinfo;
512         b = init_backend(database, package, error_code, addinfo);
513         if (!b)
514         {
515             // did not get a backend (unavailable somehow?)
516             Z_APDU *apdu =
517                 odr.create_scanResponse(
518                     apdu_req, error_code, addinfo.c_str());
519             package.response() = apdu;
520             
521             return;
522         }
523     }
524     // sending scan to backend
525     Package scan_package(b->m_backend_session, package.origin());
526
527     scan_package.copy_filter(package);
528
529     const char *backend_database = b->m_backend_database.c_str();
530     req->databaseNames[0] = odr_strdup(odr, backend_database);
531
532     scan_package.request() = yazpp_1::GDU(apdu_req);
533     
534     scan_package.move(b->m_route);
535
536     if (scan_package.session().is_closed())
537     {
538         Z_APDU *apdu =
539             odr.create_scanResponse(
540                 apdu_req, YAZ_BIB1_DATABASE_UNAVAILABLE, database.c_str());
541         package.response() = apdu;
542         return;
543     }
544     package.response() = scan_package.response();
545 }
546
547
548 void yf::Virt_db::add_map_db2vhost(std::string db, std::string vhost,
549                                    std::string route)
550 {
551     m_p->m_maps[db] = Virt_db::Map(vhost, route);
552 }
553
554 void yf::Virt_db::process(Package &package) const
555 {
556     FrontendPtr f = m_p->get_frontend(package);
557
558     Z_GDU *gdu = package.request().get();
559     
560     if (gdu && gdu->which == Z_GDU_Z3950 && gdu->u.z3950->which ==
561         Z_APDU_initRequest && !f->m_is_virtual)
562     {
563         Z_InitRequest *req = gdu->u.z3950->u.initRequest;
564         
565         const char *vhost =
566             yaz_oi_get_string_oidval(&req->otherInfo, VAL_PROXY, 1, 0);
567         if (!vhost)
568         {
569             yp2::odr odr;
570             Z_APDU *apdu = odr.create_initResponse(gdu->u.z3950, 0, 0);
571             Z_InitResponse *resp = apdu->u.initResponse;
572             
573             int i;
574             static const int masks[] = {
575                 Z_Options_search,
576                 Z_Options_present,
577                 Z_Options_namedResultSets,
578                 Z_Options_scan,
579                 -1 
580             };
581             for (i = 0; masks[i] != -1; i++)
582                 if (ODR_MASK_GET(req->options, masks[i]))
583                     ODR_MASK_SET(resp->options, masks[i]);
584             
585             static const int versions[] = {
586                 Z_ProtocolVersion_1,
587                 Z_ProtocolVersion_2,
588                 Z_ProtocolVersion_3,
589                 -1
590             };
591             for (i = 0; versions[i] != -1; i++)
592                 if (ODR_MASK_GET(req->protocolVersion, versions[i]))
593                     ODR_MASK_SET(resp->protocolVersion, versions[i]);
594                 else
595                     break;
596             
597             package.response() = apdu;
598             f->m_is_virtual = true;
599         }
600         else
601             package.move();
602     }
603     else if (!f->m_is_virtual)
604         package.move();
605     else if (gdu && gdu->which == Z_GDU_Z3950)
606     {
607         Z_APDU *apdu = gdu->u.z3950;
608         if (apdu->which == Z_APDU_initRequest)
609         {
610             yp2::odr odr;
611             
612             package.response() = odr.create_close(
613                 apdu,
614                 Z_Close_protocolError,
615                 "double init");
616             
617             package.session().close();
618         }
619         else if (apdu->which == Z_APDU_searchRequest)
620         {
621             f->search(package, apdu);
622         }
623         else if (apdu->which == Z_APDU_presentRequest)
624         {
625             f->present(package, apdu);
626         }
627         else if (apdu->which == Z_APDU_scanRequest)
628         {
629             f->scan(package, apdu);
630         }
631         else
632         {
633             yp2::odr odr;
634             
635             package.response() = odr.create_close(
636                 apdu, Z_Close_protocolError,
637                 "unsupported APDU in filter_virt_db");
638             
639             package.session().close();
640         }
641     }
642     m_p->release_frontend(package);
643 }
644
645
646 void yp2::filter::Virt_db::configure(const xmlNode * ptr)
647 {
648     for (ptr = ptr->children; ptr; ptr = ptr->next)
649     {
650         if (ptr->type != XML_ELEMENT_NODE)
651             continue;
652         if (!strcmp((const char *) ptr->name, "virtual"))
653         {
654             std::string database;
655             std::string target;
656             xmlNode *v_node = ptr->children;
657             for (; v_node; v_node = v_node->next)
658             {
659                 if (v_node->type != XML_ELEMENT_NODE)
660                     continue;
661                 
662                 if (yp2::xml::is_element_yp2(v_node, "database"))
663                     database = yp2::xml::get_text(v_node);
664                 else if (yp2::xml::is_element_yp2(v_node, "target"))
665                     target = yp2::xml::get_text(v_node);
666                 else
667                     throw yp2::filter::FilterException
668                         ("Bad element " 
669                          + std::string((const char *) v_node->name)
670                          + " in virtual section"
671                             );
672             }
673             std::string route = yp2::xml::get_route(ptr);
674             add_map_db2vhost(database, target, route);
675             std::cout << "Add " << database << "->" << target
676                       << "," << route << "\n";
677         }
678         else
679         {
680             throw yp2::filter::FilterException
681                 ("Bad element " 
682                  + std::string((const char *) ptr->name)
683                  + " in virt_db filter");
684         }
685     }
686 }
687
688 static yp2::filter::Base* filter_creator()
689 {
690     return new yp2::filter::Virt_db;
691 }
692
693 extern "C" {
694     struct yp2_filter_struct yp2_filter_virt_db = {
695         0,
696         "virt_db",
697         filter_creator
698     };
699 }
700
701
702 /*
703  * Local variables:
704  * c-basic-offset: 4
705  * indent-tabs-mode: nil
706  * c-file-style: "stroustrup"
707  * End:
708  * vim: shiftwidth=4 tabstop=8 expandtab
709  */