Added yp2::util split_zurl and get_vhost_otherinfo.
[metaproxy-moved-to-github.git] / src / filter_multi.cpp
1 /* $Id: filter_multi.cpp,v 1.4 2006-01-17 13:34:51 adam Exp $
2    Copyright (c) 2005, Index Data.
3
4 %LICENSE%
5  */
6
7 #include "config.hpp"
8
9 #include "filter.hpp"
10 #include "package.hpp"
11
12 #include <boost/thread/thread.hpp>
13 #include <boost/thread/mutex.hpp>
14 #include <boost/thread/condition.hpp>
15 #include <boost/shared_ptr.hpp>
16
17 #include "util.hpp"
18 #include "filter_multi.hpp"
19
20 #include <yaz/zgdu.h>
21 #include <yaz/otherinfo.h>
22 #include <yaz/diagbib1.h>
23
24 #include <map>
25 #include <iostream>
26
27 namespace yf = yp2::filter;
28
29 namespace yp2 {
30     namespace filter {
31
32         struct Multi::BackendSet {
33             BackendPtr m_backend;
34             int m_count;
35             bool operator < (const BackendSet &k) const;
36         };
37         struct Multi::FrontendSet {
38             struct PresentJob {
39                 BackendPtr m_backend;
40                 int m_pos;
41                 int m_inside_pos;
42             };
43             FrontendSet(std::string setname);
44             FrontendSet();
45             ~FrontendSet();
46
47             void round_robin(int pos, int number, std::list<PresentJob> &job);
48
49             std::list<BackendSet> m_backend_sets;
50             std::string m_setname;
51         };
52         struct Multi::Backend {
53             PackagePtr m_package;
54             std::string m_backend_database;
55             std::string m_vhost;
56             std::string m_route;
57             void operator() (void);  // thread operation
58         };
59         struct Multi::Frontend {
60             Frontend(Rep *rep);
61             ~Frontend();
62             yp2::Session m_session;
63             bool m_is_multi;
64             bool m_in_use;
65             std::list<BackendPtr> m_backend_list;
66             std::map<std::string,Multi::FrontendSet> m_sets;
67
68             void multi_move(std::list<BackendPtr> &blist);
69             void init(Package &package, Z_GDU *gdu);
70             void close(Package &package);
71             void search(Package &package, Z_APDU *apdu);
72             void present(Package &package, Z_APDU *apdu);
73             Rep *m_p;
74         };            
75         struct Multi::Map {
76             Map(std::list<std::string> hosts, std::string route);
77             Map();
78             std::list<std::string> m_hosts;
79             std::string m_route;
80         };
81         class Multi::Rep {
82             friend class Multi;
83             friend class Frontend;
84             
85             FrontendPtr get_frontend(Package &package);
86             void release_frontend(Package &package);
87         private:
88             boost::mutex m_sessions_mutex;
89             std::map<std::string, Multi::Map>m_maps;
90
91             boost::mutex m_mutex;
92             boost::condition m_cond_session_ready;
93             std::map<yp2::Session, FrontendPtr> m_clients;
94         };
95     }
96 }
97
98 using namespace yp2;
99
100 bool yf::Multi::BackendSet::operator < (const BackendSet &k) const
101 {
102     return m_count < k.m_count;
103 }
104
105 yf::Multi::Frontend::Frontend(Rep *rep)
106 {
107     m_p = rep;
108     m_is_multi = false;
109 }
110
111 yf::Multi::Frontend::~Frontend()
112 {
113 }
114
115 yf::Multi::FrontendPtr yf::Multi::Rep::get_frontend(Package &package)
116 {
117     boost::mutex::scoped_lock lock(m_mutex);
118
119     std::map<yp2::Session,yf::Multi::FrontendPtr>::iterator it;
120     
121     while(true)
122     {
123         it = m_clients.find(package.session());
124         if (it == m_clients.end())
125             break;
126         
127         if (!it->second->m_in_use)
128         {
129             it->second->m_in_use = true;
130             return it->second;
131         }
132         m_cond_session_ready.wait(lock);
133     }
134     FrontendPtr f(new Frontend(this));
135     m_clients[package.session()] = f;
136     f->m_in_use = true;
137     return f;
138 }
139
140 void yf::Multi::Rep::release_frontend(Package &package)
141 {
142     boost::mutex::scoped_lock lock(m_mutex);
143     std::map<yp2::Session,yf::Multi::FrontendPtr>::iterator it;
144     
145     it = m_clients.find(package.session());
146     if (it != m_clients.end())
147     {
148         if (package.session().is_closed())
149         {
150             it->second->close(package);
151             m_clients.erase(it);
152         }
153         else
154         {
155             it->second->m_in_use = false;
156         }
157         m_cond_session_ready.notify_all();
158     }
159 }
160
161 yf::Multi::FrontendSet::FrontendSet(std::string setname)
162     :  m_setname(setname)
163 {
164 }
165
166
167 yf::Multi::FrontendSet::FrontendSet()
168 {
169 }
170
171
172 yf::Multi::FrontendSet::~FrontendSet()
173 {
174 }
175
176 yf::Multi::Map::Map(std::list<std::string> hosts, std::string route)
177     : m_hosts(hosts), m_route(route) 
178 {
179 }
180
181 yf::Multi::Map::Map()
182 {
183 }
184
185 yf::Multi::Multi() : m_p(new Multi::Rep)
186 {
187 }
188
189 yf::Multi::~Multi() {
190 }
191
192
193 void yf::Multi::add_map_host2hosts(std::string host,
194                                    std::list<std::string> hosts,
195                                    std::string route)
196 {
197     m_p->m_maps[host] = Multi::Map(hosts, route);
198 }
199
200 void yf::Multi::Backend::operator() (void) 
201 {
202     m_package->move(m_route);
203 }
204
205 void yf::Multi::Frontend::close(Package &package)
206 {
207     std::list<BackendPtr>::const_iterator bit;
208     for (bit = m_backend_list.begin(); bit != m_backend_list.end(); bit++)
209     {
210         BackendPtr b = *bit;
211
212         b->m_package->copy_filter(package);
213         b->m_package->request() = (Z_GDU *) 0;
214         b->m_package->session().close();
215         b->m_package->move(b->m_route);
216     }
217 }
218
219 void yf::Multi::Frontend::multi_move(std::list<BackendPtr> &blist)
220 {
221     std::list<BackendPtr>::const_iterator bit;
222     boost::thread_group g;
223     for (bit = blist.begin(); bit != blist.end(); bit++)
224     {
225         g.add_thread(new boost::thread(**bit));
226     }
227     g.join_all();
228 }
229
230
231 void yf::Multi::FrontendSet::round_robin(int start, int number,
232                                          std::list<PresentJob> &jobs)
233 {
234     int fetched = 0;
235     int p = 1;
236     bool eof = true;
237
238     std::list<int> pos;
239     std::list<int> inside_pos;
240     std::list<BackendSet>::const_iterator bsit;
241     for (bsit = m_backend_sets.begin(); bsit != m_backend_sets.end(); bsit++)
242     {
243         pos.push_back(1);
244         inside_pos.push_back(0);
245     }
246
247     std::list<int>::iterator psit = pos.begin();
248     std::list<int>::iterator esit = inside_pos.begin();
249     bsit = m_backend_sets.begin();
250     while (fetched < number)
251     {
252         if (bsit == m_backend_sets.end())
253         {
254             psit = pos.begin();
255             esit = inside_pos.begin();
256             bsit = m_backend_sets.begin();
257             if (eof)
258                 break;
259             eof = true;
260         }
261         if (*psit <= bsit->m_count)
262         {
263             if (p >= start)
264             {
265                 PresentJob job;
266                 job.m_backend = bsit->m_backend;
267                 job.m_pos = *psit;
268                 job.m_inside_pos = *esit;
269                 jobs.push_back(job);
270                 (*esit)++;
271                 fetched++;
272             }
273             (*psit)++;
274             p++;
275             eof = false;
276         }
277         psit++;
278         esit++;
279         bsit++;
280     }
281 }
282
283 void yf::Multi::Frontend::init(Package &package, Z_GDU *gdu)
284 {
285     Z_InitRequest *req = gdu->u.z3950->u.initRequest;
286
287     std::list<std::string> targets;
288
289     yp2::util::get_vhost_otherinfo(&req->otherInfo, false, targets);
290
291     if (targets.size() < 1)
292     {
293         package.move();
294         return;
295     }
296
297     std::list<std::string>::const_iterator t_it = targets.begin();
298     for (; t_it != targets.end(); t_it++)
299     {
300         Session s;
301         Backend *b = new Backend;
302         b->m_vhost = *t_it;
303
304         // b->m_route unset
305         b->m_package = PackagePtr(new Package(s, package.origin()));
306
307         m_backend_list.push_back(BackendPtr(b));
308     }
309     m_is_multi = true;
310
311     // create init request 
312     std::list<BackendPtr>::const_iterator bit;
313     for (bit = m_backend_list.begin(); bit != m_backend_list.end(); bit++)
314     {
315         yp2::odr odr;
316         BackendPtr b = *bit;
317         Z_APDU *init_apdu = zget_APDU(odr, Z_APDU_initRequest);
318         
319         yaz_oi_set_string_oidval(&init_apdu->u.initRequest->otherInfo, odr,
320                                  VAL_PROXY, 1, b->m_vhost.c_str());
321         
322         Z_InitRequest *req = init_apdu->u.initRequest;
323         
324         ODR_MASK_SET(req->options, Z_Options_search);
325         ODR_MASK_SET(req->options, Z_Options_present);
326         ODR_MASK_SET(req->options, Z_Options_namedResultSets);
327         
328         ODR_MASK_SET(req->protocolVersion, Z_ProtocolVersion_1);
329         ODR_MASK_SET(req->protocolVersion, Z_ProtocolVersion_2);
330         ODR_MASK_SET(req->protocolVersion, Z_ProtocolVersion_3);
331         
332         b->m_package->request() = init_apdu;
333
334         b->m_package->copy_filter(package);
335     }
336     multi_move(m_backend_list);
337
338     // create the frontend init response based on each backend init response
339     yp2::odr odr;
340
341     Z_APDU *f_apdu = odr.create_initResponse(gdu->u.z3950, 0, 0);
342     Z_InitResponse *f_resp = f_apdu->u.initResponse;
343
344     ODR_MASK_SET(f_resp->options, Z_Options_search);
345     ODR_MASK_SET(f_resp->options, Z_Options_present);
346     ODR_MASK_SET(f_resp->options, Z_Options_namedResultSets);
347     
348     ODR_MASK_SET(f_resp->protocolVersion, Z_ProtocolVersion_1);
349     ODR_MASK_SET(f_resp->protocolVersion, Z_ProtocolVersion_2);
350     ODR_MASK_SET(f_resp->protocolVersion, Z_ProtocolVersion_3);
351
352     for (bit = m_backend_list.begin(); bit != m_backend_list.end(); bit++)
353     {
354         PackagePtr p = (*bit)->m_package;
355         
356         if (p->session().is_closed()) // if any backend closes, close frontend
357             package.session().close();
358         Z_GDU *gdu = p->response().get();
359         if (gdu && gdu->which == Z_GDU_Z3950 && gdu->u.z3950->which ==
360             Z_APDU_initResponse)
361         {
362             int i;
363             Z_APDU *b_apdu = gdu->u.z3950;
364             Z_InitResponse *b_resp = b_apdu->u.initResponse;
365
366             // common options for all backends
367             for (i = 0; i <= Z_Options_stringSchema; i++)
368             {
369                 if (!ODR_MASK_GET(b_resp->options, i))
370                     ODR_MASK_CLEAR(f_resp->options, i);
371             }
372             // common protocol version
373             for (i = 0; i <= Z_ProtocolVersion_3; i++)
374                 if (!ODR_MASK_GET(b_resp->protocolVersion, i))
375                     ODR_MASK_CLEAR(f_resp->protocolVersion, i);
376             // reject if any of the backends reject
377             if (!*b_resp->result)
378                 *f_resp->result = 0;
379         }
380         else
381         {
382             // if any target does not return init return that (close or
383             // similar )
384             package.response() = p->response();
385             return;
386         }
387     }
388     package.response() = f_apdu;
389 }
390
391 void yf::Multi::Frontend::search(Package &package, Z_APDU *apdu_req)
392 {
393     // create search request 
394     Z_SearchRequest *req = apdu_req->u.searchRequest;
395         
396     // deal with piggy back (for now disable)
397     *req->smallSetUpperBound = 0;
398     *req->largeSetLowerBound = 1;
399     *req->mediumSetPresentNumber = 1;
400
401     int default_num_db = req->num_databaseNames;
402     char **default_db = req->databaseNames;
403
404     std::list<BackendPtr>::const_iterator bit;
405     for (bit = m_backend_list.begin(); bit != m_backend_list.end(); bit++)
406     {
407         PackagePtr p = (*bit)->m_package;
408         yp2::odr odr;
409     
410         if (!yp2::util::set_databases_from_zurl(odr, (*bit)->m_vhost,
411                                                 &req->num_databaseNames,
412                                                 &req->databaseNames))
413         {
414             req->num_databaseNames = default_num_db;
415             req->databaseNames = default_db;
416         }
417         p->request() = apdu_req;
418         p->copy_filter(package);
419     }
420     multi_move(m_backend_list);
421
422     // look at each response
423     FrontendSet resultSet(std::string(req->resultSetName));
424
425     int total_count = 0;
426     Z_Records *z_records_diag = 0;  // no diagnostics (yet)
427     for (bit = m_backend_list.begin(); bit != m_backend_list.end(); bit++)
428     {
429         PackagePtr p = (*bit)->m_package;
430         
431         if (p->session().is_closed()) // if any backend closes, close frontend
432             package.session().close();
433         
434         Z_GDU *gdu = p->response().get();
435         if (gdu && gdu->which == Z_GDU_Z3950 && gdu->u.z3950->which ==
436             Z_APDU_searchResponse)
437         {
438             Z_APDU *b_apdu = gdu->u.z3950;
439             Z_SearchResponse *b_resp = b_apdu->u.searchResponse;
440          
441             // see we get any errors (AKA diagnstics)
442             if (b_resp->records)
443             {
444                 if (b_resp->records->which == Z_Records_NSD
445                     || b_resp->records->which == Z_Records_multipleNSD)
446                     z_records_diag = b_resp->records;
447                 // we may set this multiple times (TOO BAD!)
448             }
449             BackendSet backendSet;
450             backendSet.m_backend = *bit;
451             backendSet.m_count = *b_resp->resultCount;
452             total_count += *b_resp->resultCount;
453             resultSet.m_backend_sets.push_back(backendSet);
454         }
455         else
456         {
457             // if any target does not return search response - return that 
458             package.response() = p->response();
459             return;
460         }
461     }
462
463     yp2::odr odr;
464     Z_APDU *f_apdu = odr.create_searchResponse(apdu_req, 0, 0);
465     Z_SearchResponse *f_resp = f_apdu->u.searchResponse;
466
467     if (z_records_diag)
468     {
469         // search error
470         f_resp->records = z_records_diag;
471     }
472     else
473     {   // assume OK
474         m_sets[resultSet.m_setname] = resultSet;
475     }
476     *f_resp->resultCount = total_count;
477     
478     package.response() = f_apdu;
479 }
480
481 void yf::Multi::Frontend::present(Package &package, Z_APDU *apdu_req)
482 {
483     // create present request 
484     Z_PresentRequest *req = apdu_req->u.presentRequest;
485
486     Sets_it it;
487     it = m_sets.find(std::string(req->resultSetId));
488     if (it == m_sets.end())
489     {
490         yp2::odr odr;
491         Z_APDU *apdu = 
492             odr.create_presentResponse(
493                 apdu_req,
494                 YAZ_BIB1_SPECIFIED_RESULT_SET_DOES_NOT_EXIST,
495                 req->resultSetId);
496         package.response() = apdu;
497         return;
498     }
499     std::list<Multi::FrontendSet::PresentJob> jobs;
500     int start = *req->resultSetStartPoint;
501     int number = *req->numberOfRecordsRequested;
502     it->second.round_robin(start, number, jobs);
503
504     std::list<BackendPtr> present_backend_list;
505
506     std::list<BackendSet>::const_iterator bsit;
507     bsit = it->second.m_backend_sets.begin();
508     for (; bsit != it->second.m_backend_sets.end(); bsit++)
509     {
510         std::list<Multi::FrontendSet::PresentJob>::const_iterator jit;
511         int start = -1;
512         int end = -1;
513         
514         for (jit = jobs.begin(); jit != jobs.end(); jit++)
515         {
516             if (jit->m_backend == bsit->m_backend)
517             {
518                 if (start == -1 || jit->m_pos < start)
519                     start = jit->m_pos;
520                 if (end == -1 || jit->m_pos > end)
521                     end = jit->m_pos;
522             }
523         }
524         if (start != -1)
525         {
526             PackagePtr p = bsit->m_backend->m_package;
527
528             *req->resultSetStartPoint = start;
529             *req->numberOfRecordsRequested = end - start + 1;
530             
531             p->request() = apdu_req;
532             p->copy_filter(package);
533
534             present_backend_list.push_back(bsit->m_backend);
535         }
536     }
537     multi_move(present_backend_list);
538
539     // look at each response
540     Z_Records *z_records_diag = 0;
541
542     std::list<BackendPtr>::const_iterator pbit = present_backend_list.begin();
543     for (; pbit != present_backend_list.end(); pbit++)
544     {
545         PackagePtr p = (*pbit)->m_package;
546         
547         if (p->session().is_closed()) // if any backend closes, close frontend
548             package.session().close();
549         
550         Z_GDU *gdu = p->response().get();
551         if (gdu && gdu->which == Z_GDU_Z3950 && gdu->u.z3950->which ==
552             Z_APDU_presentResponse)
553         {
554             Z_APDU *b_apdu = gdu->u.z3950;
555             Z_PresentResponse *b_resp = b_apdu->u.presentResponse;
556          
557             // see we get any errors (AKA diagnstics)
558             if (b_resp->records)
559             {
560                 if (b_resp->records->which != Z_Records_DBOSD)
561                     z_records_diag = b_resp->records;
562                 // we may set this multiple times (TOO BAD!)
563             }
564         }
565         else
566         {
567             // if any target does not return present response - return that 
568             package.response() = p->response();
569             return;
570         }
571     }
572
573     yp2::odr odr;
574     Z_APDU *f_apdu = odr.create_presentResponse(apdu_req, 0, 0);
575     Z_PresentResponse *f_resp = f_apdu->u.presentResponse;
576
577     if (z_records_diag)
578     {
579         f_resp->records = z_records_diag;
580         *f_resp->presentStatus = Z_PresentStatus_failure;
581     }
582     else
583     {
584         f_resp->records = (Z_Records *) odr_malloc(odr, sizeof(Z_Records));
585         Z_Records * records = f_resp->records;
586         records->which = Z_Records_DBOSD;
587         records->u.databaseOrSurDiagnostics =
588             (Z_NamePlusRecordList *)
589             odr_malloc(odr, sizeof(Z_NamePlusRecordList));
590         Z_NamePlusRecordList *nprl = records->u.databaseOrSurDiagnostics;
591         nprl->num_records = jobs.size();
592         nprl->records = (Z_NamePlusRecord**)
593             odr_malloc(odr, sizeof(Z_NamePlusRecord *) * nprl->num_records);
594         int i = 0;
595         std::list<Multi::FrontendSet::PresentJob>::const_iterator jit;
596         for (jit = jobs.begin(); jit != jobs.end(); jit++)
597         {
598             PackagePtr p = jit->m_backend->m_package;
599             
600             Z_GDU *gdu = p->response().get();
601             Z_APDU *b_apdu = gdu->u.z3950;
602             Z_PresentResponse *b_resp = b_apdu->u.presentResponse;
603
604             nprl->records[i++] =
605                 b_resp->records->u.databaseOrSurDiagnostics->
606                 records[jit->m_inside_pos];
607         }
608         *f_resp->nextResultSetPosition = start + i;
609         *f_resp->numberOfRecordsReturned = i;
610     }
611     package.response() = f_apdu;
612 }
613
614 void yf::Multi::process(Package &package) const
615 {
616     FrontendPtr f = m_p->get_frontend(package);
617
618     Z_GDU *gdu = package.request().get();
619     
620     if (gdu && gdu->which == Z_GDU_Z3950 && gdu->u.z3950->which ==
621         Z_APDU_initRequest && !f->m_is_multi)
622     {
623         f->init(package, gdu);
624     }
625     else if (!f->m_is_multi)
626         package.move();
627     else if (gdu && gdu->which == Z_GDU_Z3950)
628     {
629         Z_APDU *apdu = gdu->u.z3950;
630         if (apdu->which == Z_APDU_initRequest)
631         {
632             yp2::odr odr;
633             
634             package.response() = odr.create_close(
635                 apdu,
636                 Z_Close_protocolError,
637                 "double init");
638             
639             package.session().close();
640         }
641         else if (apdu->which == Z_APDU_searchRequest)
642         {
643             f->search(package, apdu);
644         }
645         else if (apdu->which == Z_APDU_presentRequest)
646         {
647             f->present(package, apdu);
648         }
649         else
650         {
651             yp2::odr odr;
652             
653             package.response() = odr.create_close(
654                 apdu, Z_Close_protocolError,
655                 "unsupported APDU in filter multi");
656             
657             package.session().close();
658         }
659     }
660     m_p->release_frontend(package);
661 }
662
663 void yp2::filter::Multi::configure(const xmlNode * ptr)
664 {
665     for (ptr = ptr->children; ptr; ptr = ptr->next)
666     {
667         if (ptr->type != XML_ELEMENT_NODE)
668             continue;
669         if (!strcmp((const char *) ptr->name, "virtual"))
670         {
671             std::list<std::string> targets;
672             std::string vhost;
673             xmlNode *v_node = ptr->children;
674             for (; v_node; v_node = v_node->next)
675             {
676                 if (v_node->type != XML_ELEMENT_NODE)
677                     continue;
678                 
679                 if (yp2::xml::is_element_yp2(v_node, "vhost"))
680                     vhost = yp2::xml::get_text(v_node);
681                 else if (yp2::xml::is_element_yp2(v_node, "target"))
682                     targets.push_back(yp2::xml::get_text(v_node));
683                 else
684                     throw yp2::filter::FilterException
685                         ("Bad element " 
686                          + std::string((const char *) v_node->name)
687                          + " in virtual section"
688                             );
689             }
690             std::string route = yp2::xml::get_route(ptr);
691             add_map_host2hosts(vhost, targets, route);
692             std::list<std::string>::const_iterator it;
693             for (it = targets.begin(); it != targets.end(); it++)
694             {
695                 std::cout << "Add " << vhost << "->" << *it
696                           << "," << route << "\n";
697             }
698         }
699         else
700         {
701             throw yp2::filter::FilterException
702                 ("Bad element " 
703                  + std::string((const char *) ptr->name)
704                  + " in virt_db filter");
705         }
706     }
707 }
708
709 static yp2::filter::Base* filter_creator()
710 {
711     return new yp2::filter::Multi;
712 }
713
714 extern "C" {
715     struct yp2_filter_struct yp2_filter_multi = {
716         0,
717         "multi",
718         filter_creator
719     };
720 }
721
722
723 /*
724  * Local variables:
725  * c-basic-offset: 4
726  * indent-tabs-mode: nil
727  * c-file-style: "stroustrup"
728  * End:
729  * vim: shiftwidth=4 tabstop=8 expandtab
730  */