Filter virt_db allows multiple databases.. Each of these is
[metaproxy-moved-to-github.git] / src / filter_multi.cpp
1 /* $Id: filter_multi.cpp,v 1.3 2006-01-16 15:51:56 adam Exp $
2    Copyright (c) 2005, Index Data.
3
4 %LICENSE%
5  */
6
7 #include "config.hpp"
8
9 #include "filter.hpp"
10 #include "package.hpp"
11
12 #include <boost/thread/thread.hpp>
13 #include <boost/thread/mutex.hpp>
14 #include <boost/thread/condition.hpp>
15 #include <boost/shared_ptr.hpp>
16
17 #include "util.hpp"
18 #include "filter_multi.hpp"
19
20 #include <yaz/zgdu.h>
21 #include <yaz/otherinfo.h>
22 #include <yaz/diagbib1.h>
23
24 #include <map>
25 #include <iostream>
26
27 namespace yf = yp2::filter;
28
29 namespace yp2 {
30     namespace filter {
31
32         struct Multi::BackendSet {
33             BackendPtr m_backend;
34             int m_count;
35             bool operator < (const BackendSet &k) const;
36         };
37         struct Multi::FrontendSet {
38             struct PresentJob {
39                 BackendPtr m_backend;
40                 int m_pos;
41                 int m_inside_pos;
42             };
43             FrontendSet(std::string setname);
44             FrontendSet();
45             ~FrontendSet();
46
47             void round_robin(int pos, int number, std::list<PresentJob> &job);
48
49             std::list<BackendSet> m_backend_sets;
50             std::string m_setname;
51         };
52         struct Multi::Backend {
53             PackagePtr m_package;
54             std::string m_backend_database;
55             std::string m_vhost;
56             std::string m_route;
57             void operator() (void);  // thread operation
58         };
59         struct Multi::Frontend {
60             Frontend(Rep *rep);
61             ~Frontend();
62             yp2::Session m_session;
63             bool m_is_multi;
64             bool m_in_use;
65             std::list<BackendPtr> m_backend_list;
66             std::map<std::string,Multi::FrontendSet> m_sets;
67
68             void multi_move(std::list<BackendPtr> &blist);
69             void init(Package &package, Z_GDU *gdu);
70             void close(Package &package);
71             void search(Package &package, Z_APDU *apdu);
72             void present(Package &package, Z_APDU *apdu);
73             Rep *m_p;
74         };            
75         struct Multi::Map {
76             Map(std::list<std::string> hosts, std::string route);
77             Map();
78             std::list<std::string> m_hosts;
79             std::string m_route;
80         };
81         class Multi::Rep {
82             friend class Multi;
83             friend class Frontend;
84             
85             FrontendPtr get_frontend(Package &package);
86             void release_frontend(Package &package);
87         private:
88             boost::mutex m_sessions_mutex;
89             std::map<std::string, Multi::Map>m_maps;
90
91             boost::mutex m_mutex;
92             boost::condition m_cond_session_ready;
93             std::map<yp2::Session, FrontendPtr> m_clients;
94         };
95     }
96 }
97
98 using namespace yp2;
99
100 bool yf::Multi::BackendSet::operator < (const BackendSet &k) const
101 {
102     return m_count < k.m_count;
103 }
104
105 yf::Multi::Frontend::Frontend(Rep *rep)
106 {
107     m_p = rep;
108     m_is_multi = false;
109 }
110
111 yf::Multi::Frontend::~Frontend()
112 {
113 }
114
115 yf::Multi::FrontendPtr yf::Multi::Rep::get_frontend(Package &package)
116 {
117     boost::mutex::scoped_lock lock(m_mutex);
118
119     std::map<yp2::Session,yf::Multi::FrontendPtr>::iterator it;
120     
121     while(true)
122     {
123         it = m_clients.find(package.session());
124         if (it == m_clients.end())
125             break;
126         
127         if (!it->second->m_in_use)
128         {
129             it->second->m_in_use = true;
130             return it->second;
131         }
132         m_cond_session_ready.wait(lock);
133     }
134     FrontendPtr f(new Frontend(this));
135     m_clients[package.session()] = f;
136     f->m_in_use = true;
137     return f;
138 }
139
140 void yf::Multi::Rep::release_frontend(Package &package)
141 {
142     boost::mutex::scoped_lock lock(m_mutex);
143     std::map<yp2::Session,yf::Multi::FrontendPtr>::iterator it;
144     
145     it = m_clients.find(package.session());
146     if (it != m_clients.end())
147     {
148         if (package.session().is_closed())
149         {
150             it->second->close(package);
151             m_clients.erase(it);
152         }
153         else
154         {
155             it->second->m_in_use = false;
156         }
157         m_cond_session_ready.notify_all();
158     }
159 }
160
161 yf::Multi::FrontendSet::FrontendSet(std::string setname)
162     :  m_setname(setname)
163 {
164 }
165
166
167 yf::Multi::FrontendSet::FrontendSet()
168 {
169 }
170
171
172 yf::Multi::FrontendSet::~FrontendSet()
173 {
174 }
175
176 yf::Multi::Map::Map(std::list<std::string> hosts, std::string route)
177     : m_hosts(hosts), m_route(route) 
178 {
179 }
180
181 yf::Multi::Map::Map()
182 {
183 }
184
185 yf::Multi::Multi() : m_p(new Multi::Rep)
186 {
187 }
188
189 yf::Multi::~Multi() {
190 }
191
192
193 void yf::Multi::add_map_host2hosts(std::string host,
194                                    std::list<std::string> hosts,
195                                    std::string route)
196 {
197     m_p->m_maps[host] = Multi::Map(hosts, route);
198 }
199
200 void yf::Multi::Backend::operator() (void) 
201 {
202     m_package->move(m_route);
203 }
204
205 void yf::Multi::Frontend::close(Package &package)
206 {
207     std::list<BackendPtr>::const_iterator bit;
208     for (bit = m_backend_list.begin(); bit != m_backend_list.end(); bit++)
209     {
210         BackendPtr b = *bit;
211
212         b->m_package->copy_filter(package);
213         b->m_package->request() = (Z_GDU *) 0;
214         b->m_package->session().close();
215         b->m_package->move(b->m_route);
216     }
217 }
218
219 void yf::Multi::Frontend::multi_move(std::list<BackendPtr> &blist)
220 {
221     std::list<BackendPtr>::const_iterator bit;
222     boost::thread_group g;
223     for (bit = blist.begin(); bit != blist.end(); bit++)
224     {
225         g.add_thread(new boost::thread(**bit));
226     }
227     g.join_all();
228 }
229
230
231 void yf::Multi::FrontendSet::round_robin(int start, int number,
232                                          std::list<PresentJob> &jobs)
233 {
234     int fetched = 0;
235     int p = 1;
236     bool eof = true;
237
238     std::list<int> pos;
239     std::list<int> inside_pos;
240     std::list<BackendSet>::const_iterator bsit;
241     for (bsit = m_backend_sets.begin(); bsit != m_backend_sets.end(); bsit++)
242     {
243         pos.push_back(1);
244         inside_pos.push_back(0);
245     }
246
247     std::list<int>::iterator psit = pos.begin();
248     std::list<int>::iterator esit = inside_pos.begin();
249     bsit = m_backend_sets.begin();
250     while (fetched < number)
251     {
252         if (bsit == m_backend_sets.end())
253         {
254             psit = pos.begin();
255             esit = inside_pos.begin();
256             bsit = m_backend_sets.begin();
257             if (eof)
258                 break;
259             eof = true;
260         }
261         if (*psit <= bsit->m_count)
262         {
263             if (p >= start)
264             {
265                 PresentJob job;
266                 job.m_backend = bsit->m_backend;
267                 job.m_pos = *psit;
268                 job.m_inside_pos = *esit;
269                 jobs.push_back(job);
270                 (*esit)++;
271                 fetched++;
272             }
273             (*psit)++;
274             p++;
275             eof = false;
276         }
277         psit++;
278         esit++;
279         bsit++;
280     }
281 }
282
283 void yf::Multi::Frontend::init(Package &package, Z_GDU *gdu)
284 {
285     Z_InitRequest *req = gdu->u.z3950->u.initRequest;
286
287     std::list<std::string> targets;
288
289     int no_targets = 0;
290     while (true)
291     {
292         const char *vhost_cstr =
293             yaz_oi_get_string_oidval(&req->otherInfo, VAL_PROXY, no_targets+1,
294                                      0);
295         if (!vhost_cstr)
296             break;
297         no_targets++;
298         if (no_targets > 1000)
299             return;
300         targets.push_back(vhost_cstr);
301     }
302     if (no_targets < 2)
303     {
304         package.move();
305         return;
306     }
307
308     std::list<std::string>::const_iterator t_it = targets.begin();
309     for (; t_it != targets.end(); t_it++)
310     {
311         Session s;
312         Backend *b = new Backend;
313         b->m_vhost = *t_it;
314
315         // b->m_route unset
316         b->m_package = PackagePtr(new Package(s, package.origin()));
317
318         m_backend_list.push_back(BackendPtr(b));
319     }
320     m_is_multi = true;
321
322     // create init request 
323     std::list<BackendPtr>::const_iterator bit;
324     for (bit = m_backend_list.begin(); bit != m_backend_list.end(); bit++)
325     {
326         yp2::odr odr;
327         BackendPtr b = *bit;
328         Z_APDU *init_apdu = zget_APDU(odr, Z_APDU_initRequest);
329         
330         yaz_oi_set_string_oidval(&init_apdu->u.initRequest->otherInfo, odr,
331                                  VAL_PROXY, 1, b->m_vhost.c_str());
332         
333         Z_InitRequest *req = init_apdu->u.initRequest;
334         
335         ODR_MASK_SET(req->options, Z_Options_search);
336         ODR_MASK_SET(req->options, Z_Options_present);
337         ODR_MASK_SET(req->options, Z_Options_namedResultSets);
338         
339         ODR_MASK_SET(req->protocolVersion, Z_ProtocolVersion_1);
340         ODR_MASK_SET(req->protocolVersion, Z_ProtocolVersion_2);
341         ODR_MASK_SET(req->protocolVersion, Z_ProtocolVersion_3);
342         
343         b->m_package->request() = init_apdu;
344
345         b->m_package->copy_filter(package);
346     }
347     multi_move(m_backend_list);
348
349     // create the frontend init response based on each backend init response
350     yp2::odr odr;
351
352     Z_APDU *f_apdu = odr.create_initResponse(gdu->u.z3950, 0, 0);
353     Z_InitResponse *f_resp = f_apdu->u.initResponse;
354
355     ODR_MASK_SET(f_resp->options, Z_Options_search);
356     ODR_MASK_SET(f_resp->options, Z_Options_present);
357     ODR_MASK_SET(f_resp->options, Z_Options_namedResultSets);
358     
359     ODR_MASK_SET(f_resp->protocolVersion, Z_ProtocolVersion_1);
360     ODR_MASK_SET(f_resp->protocolVersion, Z_ProtocolVersion_2);
361     ODR_MASK_SET(f_resp->protocolVersion, Z_ProtocolVersion_3);
362
363     for (bit = m_backend_list.begin(); bit != m_backend_list.end(); bit++)
364     {
365         PackagePtr p = (*bit)->m_package;
366         
367         if (p->session().is_closed()) // if any backend closes, close frontend
368             package.session().close();
369         Z_GDU *gdu = p->response().get();
370         if (gdu && gdu->which == Z_GDU_Z3950 && gdu->u.z3950->which ==
371             Z_APDU_initResponse)
372         {
373             int i;
374             Z_APDU *b_apdu = gdu->u.z3950;
375             Z_InitResponse *b_resp = b_apdu->u.initResponse;
376
377             // common options for all backends
378             for (i = 0; i <= Z_Options_stringSchema; i++)
379             {
380                 if (!ODR_MASK_GET(b_resp->options, i))
381                     ODR_MASK_CLEAR(f_resp->options, i);
382             }
383             // common protocol version
384             for (i = 0; i <= Z_ProtocolVersion_3; i++)
385                 if (!ODR_MASK_GET(b_resp->protocolVersion, i))
386                     ODR_MASK_CLEAR(f_resp->protocolVersion, i);
387             // reject if any of the backends reject
388             if (!*b_resp->result)
389                 *f_resp->result = 0;
390         }
391         else
392         {
393             // if any target does not return init return that (close or
394             // similar )
395             package.response() = p->response();
396             return;
397         }
398     }
399     package.response() = f_apdu;
400 }
401
402 void yf::Multi::Frontend::search(Package &package, Z_APDU *apdu_req)
403 {
404     // create search request 
405     Z_SearchRequest *req = apdu_req->u.searchRequest;
406         
407     // deal with piggy back (for now disable)
408     *req->smallSetUpperBound = 0;
409     *req->largeSetLowerBound = 1;
410     *req->mediumSetPresentNumber = 1;
411
412     int default_num_db = req->num_databaseNames;
413     char **default_db = req->databaseNames;
414
415     std::list<BackendPtr>::const_iterator bit;
416     for (bit = m_backend_list.begin(); bit != m_backend_list.end(); bit++)
417     {
418         PackagePtr p = (*bit)->m_package;
419         yp2::odr odr;
420     
421         if (!yp2::util::set_databases_from_zurl(odr, (*bit)->m_vhost,
422                                                 &req->num_databaseNames,
423                                                 &req->databaseNames))
424         {
425             req->num_databaseNames = default_num_db;
426             req->databaseNames = default_db;
427         }
428         p->request() = apdu_req;
429         p->copy_filter(package);
430     }
431     multi_move(m_backend_list);
432
433     // look at each response
434     FrontendSet resultSet(std::string(req->resultSetName));
435
436     int total_count = 0;
437     Z_Records *z_records_diag = 0;  // no diagnostics (yet)
438     for (bit = m_backend_list.begin(); bit != m_backend_list.end(); bit++)
439     {
440         PackagePtr p = (*bit)->m_package;
441         
442         if (p->session().is_closed()) // if any backend closes, close frontend
443             package.session().close();
444         
445         Z_GDU *gdu = p->response().get();
446         if (gdu && gdu->which == Z_GDU_Z3950 && gdu->u.z3950->which ==
447             Z_APDU_searchResponse)
448         {
449             Z_APDU *b_apdu = gdu->u.z3950;
450             Z_SearchResponse *b_resp = b_apdu->u.searchResponse;
451          
452             // see we get any errors (AKA diagnstics)
453             if (b_resp->records)
454             {
455                 if (b_resp->records->which == Z_Records_NSD
456                     || b_resp->records->which == Z_Records_multipleNSD)
457                     z_records_diag = b_resp->records;
458                 // we may set this multiple times (TOO BAD!)
459             }
460             BackendSet backendSet;
461             backendSet.m_backend = *bit;
462             backendSet.m_count = *b_resp->resultCount;
463             total_count += *b_resp->resultCount;
464             resultSet.m_backend_sets.push_back(backendSet);
465         }
466         else
467         {
468             // if any target does not return search response - return that 
469             package.response() = p->response();
470             return;
471         }
472     }
473
474     yp2::odr odr;
475     Z_APDU *f_apdu = odr.create_searchResponse(apdu_req, 0, 0);
476     Z_SearchResponse *f_resp = f_apdu->u.searchResponse;
477
478     if (z_records_diag)
479     {
480         // search error
481         f_resp->records = z_records_diag;
482     }
483     else
484     {   // assume OK
485         m_sets[resultSet.m_setname] = resultSet;
486     }
487     *f_resp->resultCount = total_count;
488     
489     package.response() = f_apdu;
490 }
491
492 void yf::Multi::Frontend::present(Package &package, Z_APDU *apdu_req)
493 {
494     // create present request 
495     Z_PresentRequest *req = apdu_req->u.presentRequest;
496
497     Sets_it it;
498     it = m_sets.find(std::string(req->resultSetId));
499     if (it == m_sets.end())
500     {
501         yp2::odr odr;
502         Z_APDU *apdu = 
503             odr.create_presentResponse(
504                 apdu_req,
505                 YAZ_BIB1_SPECIFIED_RESULT_SET_DOES_NOT_EXIST,
506                 req->resultSetId);
507         package.response() = apdu;
508         return;
509     }
510     std::list<Multi::FrontendSet::PresentJob> jobs;
511     int start = *req->resultSetStartPoint;
512     int number = *req->numberOfRecordsRequested;
513     it->second.round_robin(start, number, jobs);
514
515     std::list<BackendPtr> present_backend_list;
516
517     std::list<BackendSet>::const_iterator bsit;
518     bsit = it->second.m_backend_sets.begin();
519     for (; bsit != it->second.m_backend_sets.end(); bsit++)
520     {
521         std::list<Multi::FrontendSet::PresentJob>::const_iterator jit;
522         int start = -1;
523         int end = -1;
524         
525         for (jit = jobs.begin(); jit != jobs.end(); jit++)
526         {
527             if (jit->m_backend == bsit->m_backend)
528             {
529                 if (start == -1 || jit->m_pos < start)
530                     start = jit->m_pos;
531                 if (end == -1 || jit->m_pos > end)
532                     end = jit->m_pos;
533             }
534         }
535         if (start != -1)
536         {
537             PackagePtr p = bsit->m_backend->m_package;
538
539             *req->resultSetStartPoint = start;
540             *req->numberOfRecordsRequested = end - start + 1;
541             
542             p->request() = apdu_req;
543             p->copy_filter(package);
544
545             present_backend_list.push_back(bsit->m_backend);
546         }
547     }
548     multi_move(present_backend_list);
549
550     // look at each response
551     Z_Records *z_records_diag = 0;
552
553     std::list<BackendPtr>::const_iterator pbit = present_backend_list.begin();
554     for (; pbit != present_backend_list.end(); pbit++)
555     {
556         PackagePtr p = (*pbit)->m_package;
557         
558         if (p->session().is_closed()) // if any backend closes, close frontend
559             package.session().close();
560         
561         Z_GDU *gdu = p->response().get();
562         if (gdu && gdu->which == Z_GDU_Z3950 && gdu->u.z3950->which ==
563             Z_APDU_presentResponse)
564         {
565             Z_APDU *b_apdu = gdu->u.z3950;
566             Z_PresentResponse *b_resp = b_apdu->u.presentResponse;
567          
568             // see we get any errors (AKA diagnstics)
569             if (b_resp->records)
570             {
571                 if (b_resp->records->which != Z_Records_DBOSD)
572                     z_records_diag = b_resp->records;
573                 // we may set this multiple times (TOO BAD!)
574             }
575         }
576         else
577         {
578             // if any target does not return present response - return that 
579             package.response() = p->response();
580             return;
581         }
582     }
583
584     yp2::odr odr;
585     Z_APDU *f_apdu = odr.create_presentResponse(apdu_req, 0, 0);
586     Z_PresentResponse *f_resp = f_apdu->u.presentResponse;
587
588     if (z_records_diag)
589     {
590         f_resp->records = z_records_diag;
591         *f_resp->presentStatus = Z_PresentStatus_failure;
592     }
593     else
594     {
595         f_resp->records = (Z_Records *) odr_malloc(odr, sizeof(Z_Records));
596         Z_Records * records = f_resp->records;
597         records->which = Z_Records_DBOSD;
598         records->u.databaseOrSurDiagnostics =
599             (Z_NamePlusRecordList *)
600             odr_malloc(odr, sizeof(Z_NamePlusRecordList));
601         Z_NamePlusRecordList *nprl = records->u.databaseOrSurDiagnostics;
602         nprl->num_records = jobs.size();
603         nprl->records = (Z_NamePlusRecord**)
604             odr_malloc(odr, sizeof(Z_NamePlusRecord *) * nprl->num_records);
605         int i = 0;
606         std::list<Multi::FrontendSet::PresentJob>::const_iterator jit;
607         for (jit = jobs.begin(); jit != jobs.end(); jit++)
608         {
609             PackagePtr p = jit->m_backend->m_package;
610             
611             Z_GDU *gdu = p->response().get();
612             Z_APDU *b_apdu = gdu->u.z3950;
613             Z_PresentResponse *b_resp = b_apdu->u.presentResponse;
614
615             nprl->records[i++] =
616                 b_resp->records->u.databaseOrSurDiagnostics->
617                 records[jit->m_inside_pos];
618         }
619         *f_resp->nextResultSetPosition = start + i;
620         *f_resp->numberOfRecordsReturned = i;
621     }
622     package.response() = f_apdu;
623 }
624
625 void yf::Multi::process(Package &package) const
626 {
627     FrontendPtr f = m_p->get_frontend(package);
628
629     Z_GDU *gdu = package.request().get();
630     
631     if (gdu && gdu->which == Z_GDU_Z3950 && gdu->u.z3950->which ==
632         Z_APDU_initRequest && !f->m_is_multi)
633     {
634         f->init(package, gdu);
635     }
636     else if (!f->m_is_multi)
637         package.move();
638     else if (gdu && gdu->which == Z_GDU_Z3950)
639     {
640         Z_APDU *apdu = gdu->u.z3950;
641         if (apdu->which == Z_APDU_initRequest)
642         {
643             yp2::odr odr;
644             
645             package.response() = odr.create_close(
646                 apdu,
647                 Z_Close_protocolError,
648                 "double init");
649             
650             package.session().close();
651         }
652         else if (apdu->which == Z_APDU_searchRequest)
653         {
654             f->search(package, apdu);
655         }
656         else if (apdu->which == Z_APDU_presentRequest)
657         {
658             f->present(package, apdu);
659         }
660         else
661         {
662             yp2::odr odr;
663             
664             package.response() = odr.create_close(
665                 apdu, Z_Close_protocolError,
666                 "unsupported APDU in filter multi");
667             
668             package.session().close();
669         }
670     }
671     m_p->release_frontend(package);
672 }
673
674 void yp2::filter::Multi::configure(const xmlNode * ptr)
675 {
676     for (ptr = ptr->children; ptr; ptr = ptr->next)
677     {
678         if (ptr->type != XML_ELEMENT_NODE)
679             continue;
680         if (!strcmp((const char *) ptr->name, "virtual"))
681         {
682             std::list<std::string> targets;
683             std::string vhost;
684             xmlNode *v_node = ptr->children;
685             for (; v_node; v_node = v_node->next)
686             {
687                 if (v_node->type != XML_ELEMENT_NODE)
688                     continue;
689                 
690                 if (yp2::xml::is_element_yp2(v_node, "vhost"))
691                     vhost = yp2::xml::get_text(v_node);
692                 else if (yp2::xml::is_element_yp2(v_node, "target"))
693                     targets.push_back(yp2::xml::get_text(v_node));
694                 else
695                     throw yp2::filter::FilterException
696                         ("Bad element " 
697                          + std::string((const char *) v_node->name)
698                          + " in virtual section"
699                             );
700             }
701             std::string route = yp2::xml::get_route(ptr);
702             add_map_host2hosts(vhost, targets, route);
703             std::list<std::string>::const_iterator it;
704             for (it = targets.begin(); it != targets.end(); it++)
705             {
706                 std::cout << "Add " << vhost << "->" << *it
707                           << "," << route << "\n";
708             }
709         }
710         else
711         {
712             throw yp2::filter::FilterException
713                 ("Bad element " 
714                  + std::string((const char *) ptr->name)
715                  + " in virt_db filter");
716         }
717     }
718 }
719
720 static yp2::filter::Base* filter_creator()
721 {
722     return new yp2::filter::Multi;
723 }
724
725 extern "C" {
726     struct yp2_filter_struct yp2_filter_multi = {
727         0,
728         "multi",
729         filter_creator
730     };
731 }
732
733
734 /*
735  * Local variables:
736  * c-basic-offset: 4
737  * indent-tabs-mode: nil
738  * c-file-style: "stroustrup"
739  * End:
740  * vim: shiftwidth=4 tabstop=8 expandtab
741  */