Use proper resultSetId
[metaproxy-moved-to-github.git] / src / filter_virt_db.cpp
1 /* $Id: filter_virt_db.cpp,v 1.28 2006-01-16 17:03:09 adam Exp $
2    Copyright (c) 2005, Index Data.
3
4 %LICENSE%
5  */
6
7 #include "config.hpp"
8
9 #include "filter.hpp"
10 #include "package.hpp"
11
12 #include <boost/thread/mutex.hpp>
13 #include <boost/thread/condition.hpp>
14 #include <boost/shared_ptr.hpp>
15
16 #include "util.hpp"
17 #include "filter_virt_db.hpp"
18
19 #include <yaz/zgdu.h>
20 #include <yaz/otherinfo.h>
21 #include <yaz/diagbib1.h>
22
23 #include <map>
24 #include <iostream>
25
26 namespace yf = yp2::filter;
27
28 namespace yp2 {
29     namespace filter {
30
31         struct Virt_db::Set {
32             Set(BackendPtr b, std::string setname);
33             Set();
34             ~Set();
35
36             BackendPtr m_backend;
37             std::string m_setname;
38         };
39         struct Virt_db::Map {
40             Map(std::list<std::string> targets, std::string route);
41             Map();
42             std::list<std::string> m_targets;
43             std::string m_route;
44         };
45         struct Virt_db::Backend {
46             yp2::Session m_backend_session;
47 #if 0
48             std::string m_backend_database;
49 #endif
50             std::list<std::string> m_frontend_databases;
51             std::list<std::string> m_targets;
52             std::string m_route;
53             bool m_named_result_sets;
54             int m_number_of_sets;
55         };
56         struct Virt_db::Frontend {
57             Frontend(Rep *rep);
58             ~Frontend();
59             yp2::Session m_session;
60             bool m_is_virtual;
61             bool m_in_use;
62             std::list<BackendPtr> m_backend_list;
63             std::map<std::string,Virt_db::Set> m_sets;
64
65             void search(Package &package, Z_APDU *apdu);
66             void present(Package &package, Z_APDU *apdu);
67             void scan(Package &package, Z_APDU *apdu);
68
69             void close(Package &package);
70             typedef std::map<std::string,Virt_db::Set>::iterator Sets_it;
71
72             BackendPtr lookup_backend_from_databases(
73                 std::list<std::string> databases);
74             BackendPtr create_backend_from_databases(
75                 std::list<std::string> databases,
76                 std::string &failing_database);
77             
78             BackendPtr init_backend(std::list<std::string> database,
79                                     Package &package,
80                                     int &error_code, std::string &addinfo);
81             Rep *m_p;
82         };            
83         class Virt_db::Rep {
84             friend class Virt_db;
85             friend class Frontend;
86             
87             FrontendPtr get_frontend(Package &package);
88             void release_frontend(Package &package);
89         private:
90             boost::mutex m_sessions_mutex;
91             std::map<std::string, Virt_db::Map>m_maps;
92
93             typedef std::map<std::string,Virt_db::Set>::iterator Sets_it;
94
95             boost::mutex m_mutex;
96             boost::condition m_cond_session_ready;
97             std::map<yp2::Session, FrontendPtr> m_clients;
98         };
99     }
100 }
101
102 using namespace yp2;
103
104 yf::Virt_db::BackendPtr yf::Virt_db::Frontend::lookup_backend_from_databases(
105     std::list<std::string> databases)
106 {
107     std::list<BackendPtr>::const_iterator map_it;
108     map_it = m_backend_list.begin();
109     for (; map_it != m_backend_list.end(); map_it++)
110         if ((*map_it)->m_frontend_databases == databases)
111             return *map_it;
112     BackendPtr null;
113     return null;
114 }
115
116 yf::Virt_db::BackendPtr yf::Virt_db::Frontend::create_backend_from_databases(
117     std::list<std::string> databases, std::string &failing_database)
118 {
119     BackendPtr b(new Backend);
120     std::list<std::string>::const_iterator db_it = databases.begin();
121
122     b->m_number_of_sets = 0;
123     b->m_frontend_databases = databases;
124     b->m_named_result_sets = false;
125
126     std::map<std::string,bool> targets_dedup;
127     for (; db_it != databases.end(); db_it++)
128     {
129         std::map<std::string, Virt_db::Map>::iterator map_it;
130         map_it = m_p->m_maps.find(*db_it);
131         if (map_it == m_p->m_maps.end())  // database not found
132         {
133             failing_database = *db_it;
134             BackendPtr ptr;
135             return ptr;
136         }
137         std::list<std::string>::const_iterator t_it =
138             map_it->second.m_targets.begin();
139         for (; t_it != map_it->second.m_targets.end(); t_it++)
140             targets_dedup[*t_it] = true;
141         b->m_route = map_it->second.m_route;
142     }
143     std::map<std::string,bool>::const_iterator tm_it = targets_dedup.begin();
144     for (; tm_it != targets_dedup.end(); tm_it++)
145         b->m_targets.push_back(tm_it->first);
146 #if 0
147     const char *sep = strchr(b->m_vhost.c_str(), '/');
148     std::string backend_database;
149     if (sep)
150         b->m_backend_database = std::string(sep+1);
151     else
152         b->m_backend_database = database;
153 #endif
154     return b;
155 }
156
157 yf::Virt_db::BackendPtr yf::Virt_db::Frontend::init_backend(
158     std::list<std::string> databases, Package &package,
159     int &error_code, std::string &addinfo)
160 {
161     std::string failing_database;
162     BackendPtr b = create_backend_from_databases(databases, failing_database);
163     if (!b)
164     {
165         error_code = YAZ_BIB1_DATABASE_UNAVAILABLE;
166         addinfo = failing_database;
167         return b;
168     }
169     Package init_package(b->m_backend_session, package.origin());
170     init_package.copy_filter(package);
171
172     yp2::odr odr;
173
174     Z_APDU *init_apdu = zget_APDU(odr, Z_APDU_initRequest);
175     
176     std::list<std::string>::const_iterator t_it = b->m_targets.begin();
177     int cat = 1;
178     for (; t_it != b->m_targets.end(); t_it++, cat++)
179     {
180         yaz_oi_set_string_oidval(&init_apdu->u.initRequest->otherInfo, odr,
181                                  VAL_PROXY, cat, t_it->c_str());
182     }
183     Z_InitRequest *req = init_apdu->u.initRequest;
184
185     ODR_MASK_SET(req->options, Z_Options_search);
186     ODR_MASK_SET(req->options, Z_Options_present);
187     ODR_MASK_SET(req->options, Z_Options_namedResultSets);
188     ODR_MASK_SET(req->options, Z_Options_scan);
189
190     ODR_MASK_SET(req->protocolVersion, Z_ProtocolVersion_1);
191     ODR_MASK_SET(req->protocolVersion, Z_ProtocolVersion_2);
192     ODR_MASK_SET(req->protocolVersion, Z_ProtocolVersion_3);
193
194     init_package.request() = init_apdu;
195     
196     init_package.move(b->m_route);  // sending init 
197     
198     if (init_package.session().is_closed())
199     {
200         error_code = YAZ_BIB1_DATABASE_UNAVAILABLE;
201         // addinfo = database;
202         BackendPtr null;
203         return null;
204     }
205     Z_GDU *gdu = init_package.response().get();
206     // we hope to get an init response
207     if (gdu && gdu->which == Z_GDU_Z3950 && gdu->u.z3950->which ==
208         Z_APDU_initResponse)
209     {
210         if (ODR_MASK_GET(gdu->u.z3950->u.initResponse->options,
211                          Z_Options_namedResultSets))
212         {
213             b->m_named_result_sets = true;
214         }
215     }
216     else
217     {
218         error_code = YAZ_BIB1_DATABASE_UNAVAILABLE;
219         // addinfo = database;
220         BackendPtr null;
221         return null;
222     }        
223     m_backend_list.push_back(b);
224     return b;
225 }
226
227 void yf::Virt_db::Frontend::search(Package &package, Z_APDU *apdu_req)
228 {
229     Z_SearchRequest *req = apdu_req->u.searchRequest;
230     std::string vhost;
231     std::string resultSetId = req->resultSetName;
232     yp2::odr odr;
233
234     std::list<std::string> databases;
235     int i;
236     for (i = 0; i<req->num_databaseNames; i++)
237         databases.push_back(req->databaseNames[i]);
238
239     BackendPtr b; // null for now
240     Sets_it sets_it = m_sets.find(req->resultSetName);
241     if (sets_it != m_sets.end())
242     {
243         // result set already exist 
244         // if replace indicator is off: we return diagnostic if
245         // result set already exist.
246         if (*req->replaceIndicator == 0)
247         {
248             Z_APDU *apdu = 
249                 odr.create_searchResponse(
250                     apdu_req,
251                     YAZ_BIB1_RESULT_SET_EXISTS_AND_REPLACE_INDICATOR_OFF,
252                     0);
253             package.response() = apdu;
254             
255             return;
256         } 
257         sets_it->second.m_backend->m_number_of_sets--;
258
259         // pick up any existing backend with a database match
260         std::list<BackendPtr>::const_iterator map_it;
261         map_it = m_backend_list.begin();
262         for (; map_it != m_backend_list.end(); map_it++)
263         {
264             BackendPtr tmp = *map_it;
265             if (tmp->m_frontend_databases == databases)
266                 break;
267         }
268         if (map_it != m_backend_list.end()) 
269             b = *map_it;
270     }
271     else
272     {
273         // new result set.
274
275         // pick up any existing database with named result sets ..
276         // or one which has no result sets.. yet.
277         std::list<BackendPtr>::const_iterator map_it;
278         map_it = m_backend_list.begin();
279         for (; map_it != m_backend_list.end(); map_it++)
280         {
281             BackendPtr tmp = *map_it;
282             if (tmp->m_frontend_databases == databases &&
283                 (tmp->m_named_result_sets ||
284                  tmp->m_number_of_sets == 0))
285                 break;
286         }
287         if (map_it != m_backend_list.end()) 
288             b = *map_it;
289     }
290     if (!b)  // no backend yet. Must create a new one
291     {
292         int error_code;
293         std::string addinfo;
294         b = init_backend(databases, package, error_code, addinfo);
295         if (!b)
296         {
297             // did not get a backend (unavailable somehow?)
298             
299             Z_APDU *apdu = 
300                 odr.create_searchResponse(
301                     apdu_req, error_code, addinfo.c_str());
302             package.response() = apdu;
303             return;
304         }
305     }
306     m_sets.erase(req->resultSetName);
307     // sending search to backend
308     Package search_package(b->m_backend_session, package.origin());
309
310     search_package.copy_filter(package);
311
312     std::string backend_setname;
313     if (b->m_named_result_sets)
314     {
315         backend_setname = std::string(req->resultSetName);
316     }
317     else
318     {
319         backend_setname = "default";
320         req->resultSetName = odr_strdup(odr, backend_setname.c_str());
321     }
322
323 #if 0
324     const char *backend_database = b->m_backend_database.c_str();
325     req->databaseNames[0] = odr_strdup(odr, backend_database);
326 #endif
327
328     *req->replaceIndicator = 1;
329
330     search_package.request() = yazpp_1::GDU(apdu_req);
331     
332     search_package.move(b->m_route);
333
334     if (search_package.session().is_closed())
335     {
336         Z_APDU *apdu = 
337             odr.create_searchResponse(
338                 apdu_req, YAZ_BIB1_DATABASE_UNAVAILABLE, 0);
339         package.response() = apdu;
340         return;
341     }
342     package.response() = search_package.response();
343
344     b->m_number_of_sets++;
345
346     m_sets[resultSetId] = Virt_db::Set(b, backend_setname);
347 }
348
349 yf::Virt_db::Frontend::Frontend(Rep *rep)
350 {
351     m_p = rep;
352     m_is_virtual = false;
353 }
354
355 void yf::Virt_db::Frontend::close(Package &package)
356 {
357     std::list<BackendPtr>::const_iterator b_it;
358     
359     for (b_it = m_backend_list.begin(); b_it != m_backend_list.end(); b_it++)
360     {
361         (*b_it)->m_backend_session.close();
362         Package close_package((*b_it)->m_backend_session, package.origin());
363         close_package.copy_filter(package);
364         close_package.move((*b_it)->m_route);
365     }
366     m_backend_list.clear();
367 }
368
369 yf::Virt_db::Frontend::~Frontend()
370 {
371 }
372
373 yf::Virt_db::FrontendPtr yf::Virt_db::Rep::get_frontend(Package &package)
374 {
375     boost::mutex::scoped_lock lock(m_mutex);
376
377     std::map<yp2::Session,yf::Virt_db::FrontendPtr>::iterator it;
378     
379     while(true)
380     {
381         it = m_clients.find(package.session());
382         if (it == m_clients.end())
383             break;
384         
385         if (!it->second->m_in_use)
386         {
387             it->second->m_in_use = true;
388             return it->second;
389         }
390         m_cond_session_ready.wait(lock);
391     }
392     FrontendPtr f(new Frontend(this));
393     m_clients[package.session()] = f;
394     f->m_in_use = true;
395     return f;
396 }
397
398 void yf::Virt_db::Rep::release_frontend(Package &package)
399 {
400     boost::mutex::scoped_lock lock(m_mutex);
401     std::map<yp2::Session,yf::Virt_db::FrontendPtr>::iterator it;
402     
403     it = m_clients.find(package.session());
404     if (it != m_clients.end())
405     {
406         if (package.session().is_closed())
407         {
408             it->second->close(package);
409             m_clients.erase(it);
410         }
411         else
412         {
413             it->second->m_in_use = false;
414         }
415         m_cond_session_ready.notify_all();
416     }
417 }
418
419 yf::Virt_db::Set::Set(BackendPtr b, std::string setname)
420     :  m_backend(b), m_setname(setname)
421 {
422 }
423
424
425 yf::Virt_db::Set::Set()
426 {
427 }
428
429
430 yf::Virt_db::Set::~Set()
431 {
432 }
433
434 yf::Virt_db::Map::Map(std::list<std::string> targets, std::string route)
435     : m_targets(targets), m_route(route) 
436 {
437 }
438
439 yf::Virt_db::Map::Map()
440 {
441 }
442
443 yf::Virt_db::Virt_db() : m_p(new Virt_db::Rep)
444 {
445 }
446
447 yf::Virt_db::~Virt_db() {
448 }
449
450 void yf::Virt_db::Frontend::present(Package &package, Z_APDU *apdu_req)
451 {
452     Z_PresentRequest *req = apdu_req->u.presentRequest;
453     std::string resultSetId = req->resultSetId;
454     yp2::odr odr;
455
456     Sets_it sets_it = m_sets.find(resultSetId);
457     if (sets_it == m_sets.end())
458     {
459         Z_APDU *apdu = 
460             odr.create_presentResponse(
461                 apdu_req,
462                 YAZ_BIB1_SPECIFIED_RESULT_SET_DOES_NOT_EXIST,
463                 resultSetId.c_str());
464         package.response() = apdu;
465         return;
466     }
467     Session *id =
468         new yp2::Session(sets_it->second.m_backend->m_backend_session);
469     
470     // sending present to backend
471     Package present_package(*id, package.origin());
472     present_package.copy_filter(package);
473
474     req->resultSetId = odr_strdup(odr, sets_it->second.m_setname.c_str());
475     
476     present_package.request() = yazpp_1::GDU(apdu_req);
477
478     present_package.move();
479
480     if (present_package.session().is_closed())
481     {
482         Z_APDU *apdu = 
483             odr.create_presentResponse(
484                 apdu_req,
485                 YAZ_BIB1_RESULT_SET_NO_LONGER_EXISTS_UNILATERALLY_DELETED_BY_,
486                 resultSetId.c_str());
487         package.response() = apdu;
488         m_sets.erase(resultSetId);
489     }
490     else
491     {
492         package.response() = present_package.response();
493     }
494     delete id;
495 }
496
497 void yf::Virt_db::Frontend::scan(Package &package, Z_APDU *apdu_req)
498 {
499     Z_ScanRequest *req = apdu_req->u.scanRequest;
500     std::string vhost;
501     yp2::odr odr;
502
503     std::list<std::string> databases;
504     int i;
505     for (i = 0; i<req->num_databaseNames; i++)
506         databases.push_back(req->databaseNames[i]);
507
508     BackendPtr b;
509     // pick up any existing backend with a database match
510     std::list<BackendPtr>::const_iterator map_it;
511     map_it = m_backend_list.begin();
512     for (; map_it != m_backend_list.end(); map_it++)
513     {
514         BackendPtr tmp = *map_it;
515         if (tmp->m_frontend_databases == databases)
516             break;
517     }
518     if (map_it != m_backend_list.end()) 
519         b = *map_it;
520     if (!b)  // no backend yet. Must create a new one
521     {
522         int error_code;
523         std::string addinfo;
524         b = init_backend(databases, package, error_code, addinfo);
525         if (!b)
526         {
527             // did not get a backend (unavailable somehow?)
528             Z_APDU *apdu =
529                 odr.create_scanResponse(
530                     apdu_req, error_code, addinfo.c_str());
531             package.response() = apdu;
532             
533             return;
534         }
535     }
536     // sending scan to backend
537     Package scan_package(b->m_backend_session, package.origin());
538
539     scan_package.copy_filter(package);
540
541 #if 0
542     const char *backend_database = b->m_backend_database.c_str();
543     req->databaseNames[0] = odr_strdup(odr, backend_database);
544 #endif
545
546     scan_package.request() = yazpp_1::GDU(apdu_req);
547     
548     scan_package.move(b->m_route);
549
550     if (scan_package.session().is_closed())
551     {
552         Z_APDU *apdu =
553             odr.create_scanResponse(
554                 apdu_req, YAZ_BIB1_DATABASE_UNAVAILABLE, 0);
555         package.response() = apdu;
556         return;
557     }
558     package.response() = scan_package.response();
559 }
560
561
562 void yf::Virt_db::add_map_db2targets(std::string db, 
563                                      std::list<std::string> targets,
564                                      std::string route)
565 {
566     m_p->m_maps[db] = Virt_db::Map(targets, route);
567 }
568
569
570 void yf::Virt_db::add_map_db2target(std::string db, 
571                                     std::string target,
572                                     std::string route)
573 {
574     std::list<std::string> targets;
575     targets.push_back(target);
576
577     m_p->m_maps[db] = Virt_db::Map(targets, route);
578 }
579
580 void yf::Virt_db::process(Package &package) const
581 {
582     FrontendPtr f = m_p->get_frontend(package);
583
584     Z_GDU *gdu = package.request().get();
585     
586     if (gdu && gdu->which == Z_GDU_Z3950 && gdu->u.z3950->which ==
587         Z_APDU_initRequest && !f->m_is_virtual)
588     {
589         Z_InitRequest *req = gdu->u.z3950->u.initRequest;
590         
591         const char *vhost =
592             yaz_oi_get_string_oidval(&req->otherInfo, VAL_PROXY, 1, 0);
593         if (!vhost)
594         {
595             yp2::odr odr;
596             Z_APDU *apdu = odr.create_initResponse(gdu->u.z3950, 0, 0);
597             Z_InitResponse *resp = apdu->u.initResponse;
598             
599             int i;
600             static const int masks[] = {
601                 Z_Options_search,
602                 Z_Options_present,
603                 Z_Options_namedResultSets,
604                 Z_Options_scan,
605                 -1 
606             };
607             for (i = 0; masks[i] != -1; i++)
608                 if (ODR_MASK_GET(req->options, masks[i]))
609                     ODR_MASK_SET(resp->options, masks[i]);
610             
611             static const int versions[] = {
612                 Z_ProtocolVersion_1,
613                 Z_ProtocolVersion_2,
614                 Z_ProtocolVersion_3,
615                 -1
616             };
617             for (i = 0; versions[i] != -1; i++)
618                 if (ODR_MASK_GET(req->protocolVersion, versions[i]))
619                     ODR_MASK_SET(resp->protocolVersion, versions[i]);
620                 else
621                     break;
622             
623             package.response() = apdu;
624             f->m_is_virtual = true;
625         }
626         else
627             package.move();
628     }
629     else if (!f->m_is_virtual)
630         package.move();
631     else if (gdu && gdu->which == Z_GDU_Z3950)
632     {
633         Z_APDU *apdu = gdu->u.z3950;
634         if (apdu->which == Z_APDU_initRequest)
635         {
636             yp2::odr odr;
637             
638             package.response() = odr.create_close(
639                 apdu,
640                 Z_Close_protocolError,
641                 "double init");
642             
643             package.session().close();
644         }
645         else if (apdu->which == Z_APDU_searchRequest)
646         {
647             f->search(package, apdu);
648         }
649         else if (apdu->which == Z_APDU_presentRequest)
650         {
651             f->present(package, apdu);
652         }
653         else if (apdu->which == Z_APDU_scanRequest)
654         {
655             f->scan(package, apdu);
656         }
657         else
658         {
659             yp2::odr odr;
660             
661             package.response() = odr.create_close(
662                 apdu, Z_Close_protocolError,
663                 "unsupported APDU in filter_virt_db");
664             
665             package.session().close();
666         }
667     }
668     m_p->release_frontend(package);
669 }
670
671
672 void yp2::filter::Virt_db::configure(const xmlNode * ptr)
673 {
674     for (ptr = ptr->children; ptr; ptr = ptr->next)
675     {
676         if (ptr->type != XML_ELEMENT_NODE)
677             continue;
678         if (!strcmp((const char *) ptr->name, "virtual"))
679         {
680             std::string database;
681             std::list<std::string> targets;
682             xmlNode *v_node = ptr->children;
683             for (; v_node; v_node = v_node->next)
684             {
685                 if (v_node->type != XML_ELEMENT_NODE)
686                     continue;
687                 
688                 if (yp2::xml::is_element_yp2(v_node, "database"))
689                     database = yp2::xml::get_text(v_node);
690                 else if (yp2::xml::is_element_yp2(v_node, "target"))
691                     targets.push_back(yp2::xml::get_text(v_node));
692                 else
693                     throw yp2::filter::FilterException
694                         ("Bad element " 
695                          + std::string((const char *) v_node->name)
696                          + " in virtual section"
697                             );
698             }
699             std::string route = yp2::xml::get_route(ptr);
700             add_map_db2targets(database, targets, route);
701 #if 0
702             std::cout << "Add " << database << "->" << target
703                       << "," << route << "\n";
704 #endif
705         }
706         else
707         {
708             throw yp2::filter::FilterException
709                 ("Bad element " 
710                  + std::string((const char *) ptr->name)
711                  + " in virt_db filter");
712         }
713     }
714 }
715
716 static yp2::filter::Base* filter_creator()
717 {
718     return new yp2::filter::Virt_db;
719 }
720
721 extern "C" {
722     struct yp2_filter_struct yp2_filter_virt_db = {
723         0,
724         "virt_db",
725         filter_creator
726     };
727 }
728
729
730 /*
731  * Local variables:
732  * c-basic-offset: 4
733  * indent-tabs-mode: nil
734  * c-file-style: "stroustrup"
735  * End:
736  * vim: shiftwidth=4 tabstop=8 expandtab
737  */