search+present functional for multi filter
[metaproxy-moved-to-github.git] / src / filter_multi.cpp
1 /* $Id: filter_multi.cpp,v 1.2 2006-01-16 01:10:19 adam Exp $
2    Copyright (c) 2005, Index Data.
3
4 %LICENSE%
5  */
6
7 #include "config.hpp"
8
9 #include "filter.hpp"
10 #include "package.hpp"
11
12 #include <boost/thread/thread.hpp>
13 #include <boost/thread/mutex.hpp>
14 #include <boost/thread/condition.hpp>
15 #include <boost/shared_ptr.hpp>
16
17 #include "util.hpp"
18 #include "filter_multi.hpp"
19
20 #include <yaz/zgdu.h>
21 #include <yaz/otherinfo.h>
22 #include <yaz/diagbib1.h>
23
24 #include <map>
25 #include <iostream>
26
27 namespace yf = yp2::filter;
28
29 namespace yp2 {
30     namespace filter {
31
32         struct Multi::BackendSet {
33             BackendPtr m_backend;
34             int m_count;
35             bool operator < (const BackendSet &k) const;
36         };
37         struct Multi::FrontendSet {
38             struct PresentJob {
39                 BackendPtr m_backend;
40                 int m_pos;
41                 int m_inside_pos;
42             };
43             FrontendSet(std::string setname);
44             FrontendSet();
45             ~FrontendSet();
46
47             void round_robin(int pos, int number, std::list<PresentJob> &job);
48
49             std::list<BackendSet> m_backend_sets;
50             std::string m_setname;
51         };
52         struct Multi::Backend {
53             PackagePtr m_package;
54             std::string m_backend_database;
55             std::string m_vhost;
56             std::string m_route;
57             void operator() (void);  // thread operation
58         };
59         struct Multi::Frontend {
60             Frontend(Rep *rep);
61             ~Frontend();
62             yp2::Session m_session;
63             bool m_is_multi;
64             bool m_in_use;
65             std::list<BackendPtr> m_backend_list;
66             std::map<std::string,Multi::FrontendSet> m_sets;
67
68             void multi_move(std::list<BackendPtr> &blist);
69             void init(Package &package, Z_GDU *gdu);
70             void close(Package &package);
71             void search(Package &package, Z_APDU *apdu);
72             void present(Package &package, Z_APDU *apdu);
73             Rep *m_p;
74         };            
75         struct Multi::Map {
76             Map(std::list<std::string> hosts, std::string route);
77             Map();
78             std::list<std::string> m_hosts;
79             std::string m_route;
80         };
81         class Multi::Rep {
82             friend class Multi;
83             friend class Frontend;
84             
85             FrontendPtr get_frontend(Package &package);
86             void release_frontend(Package &package);
87         private:
88             boost::mutex m_sessions_mutex;
89             std::map<std::string, Multi::Map>m_maps;
90
91             boost::mutex m_mutex;
92             boost::condition m_cond_session_ready;
93             std::map<yp2::Session, FrontendPtr> m_clients;
94         };
95     }
96 }
97
98 using namespace yp2;
99
100 bool yf::Multi::BackendSet::operator < (const BackendSet &k) const
101 {
102     return m_count < k.m_count;
103 }
104
105 yf::Multi::Frontend::Frontend(Rep *rep)
106 {
107     m_p = rep;
108     m_is_multi = false;
109 }
110
111 yf::Multi::Frontend::~Frontend()
112 {
113 }
114
115 yf::Multi::FrontendPtr yf::Multi::Rep::get_frontend(Package &package)
116 {
117     boost::mutex::scoped_lock lock(m_mutex);
118
119     std::map<yp2::Session,yf::Multi::FrontendPtr>::iterator it;
120     
121     while(true)
122     {
123         it = m_clients.find(package.session());
124         if (it == m_clients.end())
125             break;
126         
127         if (!it->second->m_in_use)
128         {
129             it->second->m_in_use = true;
130             return it->second;
131         }
132         m_cond_session_ready.wait(lock);
133     }
134     FrontendPtr f(new Frontend(this));
135     m_clients[package.session()] = f;
136     f->m_in_use = true;
137     return f;
138 }
139
140 void yf::Multi::Rep::release_frontend(Package &package)
141 {
142     boost::mutex::scoped_lock lock(m_mutex);
143     std::map<yp2::Session,yf::Multi::FrontendPtr>::iterator it;
144     
145     it = m_clients.find(package.session());
146     if (it != m_clients.end())
147     {
148         if (package.session().is_closed())
149         {
150             it->second->close(package);
151             m_clients.erase(it);
152         }
153         else
154         {
155             it->second->m_in_use = false;
156         }
157         m_cond_session_ready.notify_all();
158     }
159 }
160
161 yf::Multi::FrontendSet::FrontendSet(std::string setname)
162     :  m_setname(setname)
163 {
164 }
165
166
167 yf::Multi::FrontendSet::FrontendSet()
168 {
169 }
170
171
172 yf::Multi::FrontendSet::~FrontendSet()
173 {
174 }
175
176 yf::Multi::Map::Map(std::list<std::string> hosts, std::string route)
177     : m_hosts(hosts), m_route(route) 
178 {
179 }
180
181 yf::Multi::Map::Map()
182 {
183 }
184
185 yf::Multi::Multi() : m_p(new Multi::Rep)
186 {
187 }
188
189 yf::Multi::~Multi() {
190 }
191
192
193 void yf::Multi::add_map_host2hosts(std::string host,
194                                    std::list<std::string> hosts,
195                                    std::string route)
196 {
197     m_p->m_maps[host] = Multi::Map(hosts, route);
198 }
199
200 void yf::Multi::Backend::operator() (void) 
201 {
202     m_package->move(m_route);
203 }
204
205 void yf::Multi::Frontend::close(Package &package)
206 {
207     std::list<BackendPtr>::const_iterator bit;
208     for (bit = m_backend_list.begin(); bit != m_backend_list.end(); bit++)
209     {
210         BackendPtr b = *bit;
211
212         b->m_package->copy_filter(package);
213         b->m_package->request() = (Z_GDU *) 0;
214         b->m_package->session().close();
215         b->m_package->move(b->m_route);
216     }
217 }
218
219 void yf::Multi::Frontend::multi_move(std::list<BackendPtr> &blist)
220 {
221     std::list<BackendPtr>::const_iterator bit;
222     boost::thread_group g;
223     for (bit = blist.begin(); bit != blist.end(); bit++)
224     {
225         g.add_thread(new boost::thread(**bit));
226     }
227     g.join_all();
228 }
229
230
231 void yf::Multi::FrontendSet::round_robin(int start, int number,
232                                          std::list<PresentJob> &jobs)
233 {
234     int fetched = 0;
235     int p = 1;
236     bool eof = true;
237
238     std::list<int> pos;
239     std::list<int> inside_pos;
240     std::list<BackendSet>::const_iterator bsit;
241     for (bsit = m_backend_sets.begin(); bsit != m_backend_sets.end(); bsit++)
242     {
243         pos.push_back(1);
244         inside_pos.push_back(0);
245     }
246
247     std::list<int>::iterator psit = pos.begin();
248     std::list<int>::iterator esit = inside_pos.begin();
249     bsit = m_backend_sets.begin();
250     while (fetched < number)
251     {
252         if (bsit == m_backend_sets.end())
253         {
254             psit = pos.begin();
255             esit = inside_pos.begin();
256             bsit = m_backend_sets.begin();
257             if (eof)
258                 break;
259             eof = true;
260         }
261         if (*psit <= bsit->m_count)
262         {
263             if (p >= start)
264             {
265                 PresentJob job;
266                 job.m_backend = bsit->m_backend;
267                 job.m_pos = *psit;
268                 job.m_inside_pos = *esit;
269                 jobs.push_back(job);
270                 (*esit)++;
271                 fetched++;
272             }
273             (*psit)++;
274             p++;
275             eof = false;
276         }
277         psit++;
278         esit++;
279         bsit++;
280     }
281 }
282
283 void yf::Multi::Frontend::init(Package &package, Z_GDU *gdu)
284 {
285     Z_InitRequest *req = gdu->u.z3950->u.initRequest;
286
287     // empty or non-existang vhost is the same..
288     const char *vhost_cstr =
289         yaz_oi_get_string_oidval(&req->otherInfo, VAL_PROXY, 1, 0);
290     std::string vhost;
291     if (vhost_cstr)
292         vhost = std::string(vhost_cstr);
293
294     std::map<std::string, Map>::const_iterator it;
295     it = m_p->m_maps.find(std::string(vhost));
296     if (it == m_p->m_maps.end())
297     {
298         // might return diagnostics if no match
299         package.move();
300         return;
301     }
302     std::list<std::string>::const_iterator hit = it->second.m_hosts.begin();
303     for (; hit != it->second.m_hosts.end(); hit++)
304     {
305         Session s;
306         Backend *b = new Backend;
307         b->m_vhost = *hit;
308         b->m_route = it->second.m_route;
309         b->m_package = PackagePtr(new Package(s, package.origin()));
310
311         m_backend_list.push_back(BackendPtr(b));
312     }
313     // we're going to deal with this for sure..
314
315     m_is_multi = true;
316
317     // create init request 
318     std::list<BackendPtr>::const_iterator bit;
319     for (bit = m_backend_list.begin(); bit != m_backend_list.end(); bit++)
320     {
321         yp2::odr odr;
322         BackendPtr b = *bit;
323         Z_APDU *init_apdu = zget_APDU(odr, Z_APDU_initRequest);
324         
325         yaz_oi_set_string_oidval(&init_apdu->u.initRequest->otherInfo, odr,
326                                  VAL_PROXY, 1, b->m_vhost.c_str());
327         
328         Z_InitRequest *req = init_apdu->u.initRequest;
329         
330         ODR_MASK_SET(req->options, Z_Options_search);
331         ODR_MASK_SET(req->options, Z_Options_present);
332         ODR_MASK_SET(req->options, Z_Options_namedResultSets);
333         
334         ODR_MASK_SET(req->protocolVersion, Z_ProtocolVersion_1);
335         ODR_MASK_SET(req->protocolVersion, Z_ProtocolVersion_2);
336         ODR_MASK_SET(req->protocolVersion, Z_ProtocolVersion_3);
337         
338         b->m_package->request() = init_apdu;
339
340         b->m_package->copy_filter(package);
341     }
342     multi_move(m_backend_list);
343
344     // create the frontend init response based on each backend init response
345     yp2::odr odr;
346
347     int i;
348
349     Z_APDU *f_apdu = odr.create_initResponse(gdu->u.z3950, 0, 0);
350     Z_InitResponse *f_resp = f_apdu->u.initResponse;
351
352     ODR_MASK_SET(f_resp->options, Z_Options_search);
353     ODR_MASK_SET(f_resp->options, Z_Options_present);
354     ODR_MASK_SET(f_resp->options, Z_Options_namedResultSets);
355     
356     ODR_MASK_SET(f_resp->protocolVersion, Z_ProtocolVersion_1);
357     ODR_MASK_SET(f_resp->protocolVersion, Z_ProtocolVersion_2);
358     ODR_MASK_SET(f_resp->protocolVersion, Z_ProtocolVersion_3);
359
360     for (bit = m_backend_list.begin(); bit != m_backend_list.end(); bit++)
361     {
362         PackagePtr p = (*bit)->m_package;
363         
364         if (p->session().is_closed()) // if any backend closes, close frontend
365             package.session().close();
366         Z_GDU *gdu = p->response().get();
367         if (gdu && gdu->which == Z_GDU_Z3950 && gdu->u.z3950->which ==
368             Z_APDU_initResponse)
369         {
370             Z_APDU *b_apdu = gdu->u.z3950;
371             Z_InitResponse *b_resp = b_apdu->u.initResponse;
372
373             // common options for all backends
374             for (i = 0; i <= Z_Options_stringSchema; i++)
375             {
376                 if (!ODR_MASK_GET(b_resp->options, i))
377                     ODR_MASK_CLEAR(f_resp->options, i);
378             }
379             // common protocol version
380             for (i = 0; i <= Z_ProtocolVersion_3; i++)
381                 if (!ODR_MASK_GET(b_resp->protocolVersion, i))
382                     ODR_MASK_CLEAR(f_resp->protocolVersion, i);
383             // reject if any of the backends reject
384             if (!*b_resp->result)
385                 *f_resp->result = 0;
386         }
387         else
388         {
389             // if any target does not return init return that (close or
390             // similar )
391             package.response() = p->response();
392             return;
393         }
394     }
395     package.response() = f_apdu;
396 }
397
398 void yf::Multi::Frontend::search(Package &package, Z_APDU *apdu_req)
399 {
400     // create search request 
401     Z_SearchRequest *req = apdu_req->u.searchRequest;
402
403     // deal with piggy back (for now disable)
404     *req->smallSetUpperBound = 0;
405     *req->largeSetLowerBound = 1;
406     *req->mediumSetPresentNumber = 1;
407
408     std::list<BackendPtr>::const_iterator bit;
409     for (bit = m_backend_list.begin(); bit != m_backend_list.end(); bit++)
410     {
411         PackagePtr p = (*bit)->m_package;
412         // we don't modify database name yet!
413
414         p->request() = apdu_req;
415         p->copy_filter(package);
416     }
417     multi_move(m_backend_list);
418
419     // look at each response
420     FrontendSet resultSet(std::string(req->resultSetName));
421
422     int total_count = 0;
423     Z_Records *z_records_diag = 0;  // no diagnostics (yet)
424     for (bit = m_backend_list.begin(); bit != m_backend_list.end(); bit++)
425     {
426         PackagePtr p = (*bit)->m_package;
427         
428         if (p->session().is_closed()) // if any backend closes, close frontend
429             package.session().close();
430         
431         Z_GDU *gdu = p->response().get();
432         if (gdu && gdu->which == Z_GDU_Z3950 && gdu->u.z3950->which ==
433             Z_APDU_searchResponse)
434         {
435             Z_APDU *b_apdu = gdu->u.z3950;
436             Z_SearchResponse *b_resp = b_apdu->u.searchResponse;
437          
438             // see we get any errors (AKA diagnstics)
439             if (b_resp->records)
440             {
441                 if (b_resp->records->which == Z_Records_NSD
442                     || b_resp->records->which == Z_Records_multipleNSD)
443                     z_records_diag = b_resp->records;
444                 // we may set this multiple times (TOO BAD!)
445             }
446             BackendSet backendSet;
447             backendSet.m_backend = *bit;
448             backendSet.m_count = *b_resp->resultCount;
449             total_count += *b_resp->resultCount;
450             resultSet.m_backend_sets.push_back(backendSet);
451         }
452         else
453         {
454             // if any target does not return search response - return that 
455             package.response() = p->response();
456             return;
457         }
458     }
459
460     yp2::odr odr;
461     Z_APDU *f_apdu = odr.create_searchResponse(apdu_req, 0, 0);
462     Z_SearchResponse *f_resp = f_apdu->u.searchResponse;
463
464     if (z_records_diag)
465     {
466         // search error
467         f_resp->records = z_records_diag;
468     }
469     else
470     {   // assume OK
471         m_sets[resultSet.m_setname] = resultSet;
472     }
473     *f_resp->resultCount = total_count;
474     
475     package.response() = f_apdu;
476 }
477
478 void yf::Multi::Frontend::present(Package &package, Z_APDU *apdu_req)
479 {
480     // create present request 
481     Z_PresentRequest *req = apdu_req->u.presentRequest;
482
483     Sets_it it;
484     it = m_sets.find(std::string(req->resultSetId));
485     if (it == m_sets.end())
486     {
487         yp2::odr odr;
488         Z_APDU *apdu = 
489             odr.create_presentResponse(
490                 apdu_req,
491                 YAZ_BIB1_SPECIFIED_RESULT_SET_DOES_NOT_EXIST,
492                 req->resultSetId);
493         package.response() = apdu;
494         return;
495     }
496     std::list<Multi::FrontendSet::PresentJob> jobs;
497     int start = *req->resultSetStartPoint;
498     int number = *req->numberOfRecordsRequested;
499     it->second.round_robin(start, number, jobs);
500
501     std::list<BackendPtr> present_backend_list;
502
503     std::list<BackendSet>::const_iterator bsit;
504     bsit = it->second.m_backend_sets.begin();
505     for (; bsit != it->second.m_backend_sets.end(); bsit++)
506     {
507         std::list<Multi::FrontendSet::PresentJob>::const_iterator jit;
508         int start = -1;
509         int end = -1;
510         
511         for (jit = jobs.begin(); jit != jobs.end(); jit++)
512         {
513             if (jit->m_backend == bsit->m_backend)
514             {
515                 if (start == -1 || jit->m_pos < start)
516                     start = jit->m_pos;
517                 if (end == -1 || jit->m_pos > end)
518                     end = jit->m_pos;
519             }
520         }
521         if (start != -1)
522         {
523             PackagePtr p = bsit->m_backend->m_package;
524
525             *req->resultSetStartPoint = start;
526             *req->numberOfRecordsRequested = end - start + 1;
527             
528             p->request() = apdu_req;
529             p->copy_filter(package);
530
531             present_backend_list.push_back(bsit->m_backend);
532         }
533     }
534     multi_move(present_backend_list);
535
536     // look at each response
537     Z_Records *z_records_diag = 0;
538
539     std::list<BackendPtr>::const_iterator pbit = present_backend_list.begin();
540     for (; pbit != present_backend_list.end(); pbit++)
541     {
542         PackagePtr p = (*pbit)->m_package;
543         
544         if (p->session().is_closed()) // if any backend closes, close frontend
545             package.session().close();
546         
547         Z_GDU *gdu = p->response().get();
548         if (gdu && gdu->which == Z_GDU_Z3950 && gdu->u.z3950->which ==
549             Z_APDU_presentResponse)
550         {
551             Z_APDU *b_apdu = gdu->u.z3950;
552             Z_PresentResponse *b_resp = b_apdu->u.presentResponse;
553          
554             // see we get any errors (AKA diagnstics)
555             if (b_resp->records)
556             {
557                 if (b_resp->records->which != Z_Records_DBOSD)
558                     z_records_diag = b_resp->records;
559                 // we may set this multiple times (TOO BAD!)
560             }
561         }
562         else
563         {
564             // if any target does not return present response - return that 
565             package.response() = p->response();
566             return;
567         }
568     }
569
570     yp2::odr odr;
571     Z_APDU *f_apdu = odr.create_presentResponse(apdu_req, 0, 0);
572     Z_PresentResponse *f_resp = f_apdu->u.presentResponse;
573
574     if (z_records_diag)
575     {
576         f_resp->records = z_records_diag;
577         *f_resp->presentStatus = Z_PresentStatus_failure;
578     }
579     else
580     {
581         f_resp->records = (Z_Records *) odr_malloc(odr, sizeof(Z_Records));
582         Z_Records * records = f_resp->records;
583         records->which = Z_Records_DBOSD;
584         records->u.databaseOrSurDiagnostics =
585             (Z_NamePlusRecordList *)
586             odr_malloc(odr, sizeof(Z_NamePlusRecordList));
587         Z_NamePlusRecordList *nprl = records->u.databaseOrSurDiagnostics;
588         nprl->num_records = jobs.size();
589         nprl->records = (Z_NamePlusRecord**)
590             odr_malloc(odr, sizeof(Z_NamePlusRecord *) * nprl->num_records);
591         int i = 0;
592         std::list<Multi::FrontendSet::PresentJob>::const_iterator jit;
593         for (jit = jobs.begin(); jit != jobs.end(); jit++)
594         {
595             PackagePtr p = jit->m_backend->m_package;
596             
597             Z_GDU *gdu = p->response().get();
598             Z_APDU *b_apdu = gdu->u.z3950;
599             Z_PresentResponse *b_resp = b_apdu->u.presentResponse;
600
601             nprl->records[i++] =
602                 b_resp->records->u.databaseOrSurDiagnostics->
603                 records[jit->m_inside_pos];
604         }
605         *f_resp->nextResultSetPosition = start + i;
606         *f_resp->numberOfRecordsReturned = i;
607     }
608     package.response() = f_apdu;
609 }
610
611 void yf::Multi::process(Package &package) const
612 {
613     FrontendPtr f = m_p->get_frontend(package);
614
615     Z_GDU *gdu = package.request().get();
616     
617     if (gdu && gdu->which == Z_GDU_Z3950 && gdu->u.z3950->which ==
618         Z_APDU_initRequest && !f->m_is_multi)
619     {
620         f->init(package, gdu);
621     }
622     else if (!f->m_is_multi)
623         package.move();
624     else if (gdu && gdu->which == Z_GDU_Z3950)
625     {
626         Z_APDU *apdu = gdu->u.z3950;
627         if (apdu->which == Z_APDU_initRequest)
628         {
629             yp2::odr odr;
630             
631             package.response() = odr.create_close(
632                 apdu,
633                 Z_Close_protocolError,
634                 "double init");
635             
636             package.session().close();
637         }
638         else if (apdu->which == Z_APDU_searchRequest)
639         {
640             f->search(package, apdu);
641         }
642         else if (apdu->which == Z_APDU_presentRequest)
643         {
644             f->present(package, apdu);
645         }
646         else
647         {
648             yp2::odr odr;
649             
650             package.response() = odr.create_close(
651                 apdu, Z_Close_protocolError,
652                 "unsupported APDU in filter multi");
653             
654             package.session().close();
655         }
656     }
657     m_p->release_frontend(package);
658 }
659
660 void yp2::filter::Multi::configure(const xmlNode * ptr)
661 {
662     for (ptr = ptr->children; ptr; ptr = ptr->next)
663     {
664         if (ptr->type != XML_ELEMENT_NODE)
665             continue;
666         if (!strcmp((const char *) ptr->name, "virtual"))
667         {
668             std::list<std::string> targets;
669             std::string vhost;
670             xmlNode *v_node = ptr->children;
671             for (; v_node; v_node = v_node->next)
672             {
673                 if (v_node->type != XML_ELEMENT_NODE)
674                     continue;
675                 
676                 if (yp2::xml::is_element_yp2(v_node, "vhost"))
677                     vhost = yp2::xml::get_text(v_node);
678                 else if (yp2::xml::is_element_yp2(v_node, "target"))
679                     targets.push_back(yp2::xml::get_text(v_node));
680                 else
681                     throw yp2::filter::FilterException
682                         ("Bad element " 
683                          + std::string((const char *) v_node->name)
684                          + " in virtual section"
685                             );
686             }
687             std::string route = yp2::xml::get_route(ptr);
688             add_map_host2hosts(vhost, targets, route);
689             std::list<std::string>::const_iterator it;
690             for (it = targets.begin(); it != targets.end(); it++)
691             {
692                 std::cout << "Add " << vhost << "->" << *it
693                           << "," << route << "\n";
694             }
695         }
696         else
697         {
698             throw yp2::filter::FilterException
699                 ("Bad element " 
700                  + std::string((const char *) ptr->name)
701                  + " in virt_db filter");
702         }
703     }
704 }
705
706 static yp2::filter::Base* filter_creator()
707 {
708     return new yp2::filter::Multi;
709 }
710
711 extern "C" {
712     struct yp2_filter_struct yp2_filter_multi = {
713         0,
714         "multi",
715         filter_creator
716     };
717 }
718
719
720 /*
721  * Local variables:
722  * c-basic-offset: 4
723  * indent-tabs-mode: nil
724  * c-file-style: "stroustrup"
725  * End:
726  * vim: shiftwidth=4 tabstop=8 expandtab
727  */