Added filter multi. init+search operational
[metaproxy-moved-to-github.git] / src / filter_multi.cpp
1 /* $Id: filter_multi.cpp,v 1.1 2006-01-15 20:03:14 adam Exp $
2    Copyright (c) 2005, Index Data.
3
4 %LICENSE%
5  */
6
7 #include "config.hpp"
8
9 #include "filter.hpp"
10 #include "package.hpp"
11
12 #include <boost/thread/thread.hpp>
13 #include <boost/thread/mutex.hpp>
14 #include <boost/thread/condition.hpp>
15 #include <boost/shared_ptr.hpp>
16
17 #include "util.hpp"
18 #include "filter_multi.hpp"
19
20 #include <yaz/zgdu.h>
21 #include <yaz/otherinfo.h>
22 #include <yaz/diagbib1.h>
23
24 #include <map>
25 #include <iostream>
26
27 namespace yf = yp2::filter;
28
29 namespace yp2 {
30     namespace filter {
31
32         struct Multi::BackendSet {
33             BackendPtr m_backend;
34             long size;
35         };
36         struct Multi::Set {
37             Set(std::string setname);
38             Set();
39             ~Set();
40
41             std::list<BackendSet> m_backend_sets;
42             std::string m_setname;
43         };
44         struct Multi::Backend {
45             PackagePtr m_package;
46             std::string m_backend_database;
47             std::string m_vhost;
48             std::string m_route;
49             void operator() (void);  // thread operation
50         };
51         struct Multi::Frontend {
52             Frontend(Rep *rep);
53             ~Frontend();
54             yp2::Session m_session;
55             bool m_is_multi;
56             bool m_in_use;
57             std::list<BackendPtr> m_backend_list;
58             std::map<std::string,Multi::Set> m_sets;
59             void multi_move();
60             void init(Package &package, Z_GDU *gdu);
61             void close(Package &package);
62             void search(Package &package, Z_APDU *apdu);
63 #if 0
64             void present(Package &package, Z_APDU *apdu);
65             void scan(Package &package, Z_APDU *apdu);
66 #endif
67             Rep *m_p;
68         };            
69         struct Multi::Map {
70             Map(std::list<std::string> hosts, std::string route);
71             Map();
72             std::list<std::string> m_hosts;
73             std::string m_route;
74         };
75         class Multi::Rep {
76             friend class Multi;
77             friend class Frontend;
78             
79             FrontendPtr get_frontend(Package &package);
80             void release_frontend(Package &package);
81         private:
82             boost::mutex m_sessions_mutex;
83             std::map<std::string, Multi::Map>m_maps;
84
85             boost::mutex m_mutex;
86             boost::condition m_cond_session_ready;
87             std::map<yp2::Session, FrontendPtr> m_clients;
88         };
89     }
90 }
91
92 using namespace yp2;
93
94 yf::Multi::Frontend::Frontend(Rep *rep)
95 {
96     m_p = rep;
97     m_is_multi = false;
98 }
99
100 yf::Multi::Frontend::~Frontend()
101 {
102 }
103
104 yf::Multi::FrontendPtr yf::Multi::Rep::get_frontend(Package &package)
105 {
106     boost::mutex::scoped_lock lock(m_mutex);
107
108     std::map<yp2::Session,yf::Multi::FrontendPtr>::iterator it;
109     
110     while(true)
111     {
112         it = m_clients.find(package.session());
113         if (it == m_clients.end())
114             break;
115         
116         if (!it->second->m_in_use)
117         {
118             it->second->m_in_use = true;
119             return it->second;
120         }
121         m_cond_session_ready.wait(lock);
122     }
123     FrontendPtr f(new Frontend(this));
124     m_clients[package.session()] = f;
125     f->m_in_use = true;
126     return f;
127 }
128
129 void yf::Multi::Rep::release_frontend(Package &package)
130 {
131     boost::mutex::scoped_lock lock(m_mutex);
132     std::map<yp2::Session,yf::Multi::FrontendPtr>::iterator it;
133     
134     it = m_clients.find(package.session());
135     if (it != m_clients.end())
136     {
137         if (package.session().is_closed())
138         {
139             it->second->close(package);
140             m_clients.erase(it);
141         }
142         else
143         {
144             it->second->m_in_use = false;
145         }
146         m_cond_session_ready.notify_all();
147     }
148 }
149
150 yf::Multi::Set::Set(std::string setname)
151     :  m_setname(setname)
152 {
153 }
154
155
156 yf::Multi::Set::Set()
157 {
158 }
159
160
161 yf::Multi::Set::~Set()
162 {
163 }
164
165 yf::Multi::Map::Map(std::list<std::string> hosts, std::string route)
166     : m_hosts(hosts), m_route(route) 
167 {
168 }
169
170 yf::Multi::Map::Map()
171 {
172 }
173
174 yf::Multi::Multi() : m_p(new Multi::Rep)
175 {
176 }
177
178 yf::Multi::~Multi() {
179 }
180
181
182 void yf::Multi::add_map_host2hosts(std::string host,
183                                    std::list<std::string> hosts,
184                                    std::string route)
185 {
186     m_p->m_maps[host] = Multi::Map(hosts, route);
187 }
188
189 void yf::Multi::Backend::operator() (void) 
190 {
191     m_package->move(m_route);
192 }
193
194 void yf::Multi::Frontend::close(Package &package)
195 {
196     std::list<BackendPtr>::const_iterator bit;
197     for (bit = m_backend_list.begin(); bit != m_backend_list.end(); bit++)
198     {
199         BackendPtr b = *bit;
200
201         b->m_package->copy_filter(package);
202         b->m_package->request() = (Z_GDU *) 0;
203         b->m_package->session().close();
204         b->m_package->move(b->m_route);
205     }
206 }
207
208 void yf::Multi::Frontend::multi_move()
209 {
210     std::list<BackendPtr>::const_iterator bit;
211     boost::thread_group g;
212     for (bit = m_backend_list.begin(); bit != m_backend_list.end(); bit++)
213     {
214         g.add_thread(new boost::thread(**bit));
215     }
216     g.join_all();
217 }
218
219 void yf::Multi::Frontend::init(Package &package, Z_GDU *gdu)
220 {
221     Z_InitRequest *req = gdu->u.z3950->u.initRequest;
222
223     // empty or non-existang vhost is the same..
224     const char *vhost_cstr =
225         yaz_oi_get_string_oidval(&req->otherInfo, VAL_PROXY, 1, 0);
226     std::string vhost;
227     if (vhost_cstr)
228         vhost = std::string(vhost_cstr);
229
230     std::map<std::string, Map>::const_iterator it;
231     it = m_p->m_maps.find(std::string(vhost));
232     if (it == m_p->m_maps.end())
233     {
234         // might return diagnostics if no match
235         package.move();
236         return;
237     }
238     std::list<std::string>::const_iterator hit = it->second.m_hosts.begin();
239     for (; hit != it->second.m_hosts.end(); hit++)
240     {
241         Session s;
242         Backend *b = new Backend;
243         b->m_vhost = *hit;
244         b->m_route = it->second.m_route;
245         b->m_package = PackagePtr(new Package(s, package.origin()));
246
247         m_backend_list.push_back(BackendPtr(b));
248     }
249     // we're going to deal with this for sure..
250
251     m_is_multi = true;
252
253     // create init request 
254     std::list<BackendPtr>::const_iterator bit;
255     for (bit = m_backend_list.begin(); bit != m_backend_list.end(); bit++)
256     {
257         yp2::odr odr;
258         BackendPtr b = *bit;
259         Z_APDU *init_apdu = zget_APDU(odr, Z_APDU_initRequest);
260         
261         yaz_oi_set_string_oidval(&init_apdu->u.initRequest->otherInfo, odr,
262                                  VAL_PROXY, 1, b->m_vhost.c_str());
263         
264         Z_InitRequest *req = init_apdu->u.initRequest;
265         
266         ODR_MASK_SET(req->options, Z_Options_search);
267         ODR_MASK_SET(req->options, Z_Options_present);
268         ODR_MASK_SET(req->options, Z_Options_namedResultSets);
269         
270         ODR_MASK_SET(req->protocolVersion, Z_ProtocolVersion_1);
271         ODR_MASK_SET(req->protocolVersion, Z_ProtocolVersion_2);
272         ODR_MASK_SET(req->protocolVersion, Z_ProtocolVersion_3);
273         
274         b->m_package->request() = init_apdu;
275
276         b->m_package->copy_filter(package);
277     }
278     multi_move();
279
280     // create the frontend init response based on each backend init response
281     yp2::odr odr;
282
283     int i;
284
285     Z_APDU *f_apdu = odr.create_initResponse(gdu->u.z3950, 0, 0);
286     Z_InitResponse *f_resp = f_apdu->u.initResponse;
287
288     ODR_MASK_SET(f_resp->options, Z_Options_search);
289     ODR_MASK_SET(f_resp->options, Z_Options_present);
290     ODR_MASK_SET(f_resp->options, Z_Options_namedResultSets);
291     
292     ODR_MASK_SET(f_resp->protocolVersion, Z_ProtocolVersion_1);
293     ODR_MASK_SET(f_resp->protocolVersion, Z_ProtocolVersion_2);
294     ODR_MASK_SET(f_resp->protocolVersion, Z_ProtocolVersion_3);
295
296     for (bit = m_backend_list.begin(); bit != m_backend_list.end(); bit++)
297     {
298         PackagePtr p = (*bit)->m_package;
299         
300         if (p->session().is_closed()) // if any backend closes, close frontend
301             package.session().close();
302         Z_GDU *gdu = p->response().get();
303         if (gdu && gdu->which == Z_GDU_Z3950 && gdu->u.z3950->which ==
304             Z_APDU_initResponse)
305         {
306             Z_APDU *b_apdu = gdu->u.z3950;
307             Z_InitResponse *b_resp = b_apdu->u.initResponse;
308
309             // common options for all backends
310             for (i = 0; i <= Z_Options_stringSchema; i++)
311             {
312                 if (!ODR_MASK_GET(b_resp->options, i))
313                     ODR_MASK_CLEAR(f_resp->options, i);
314             }
315             // common protocol version
316             for (i = 0; i <= Z_ProtocolVersion_3; i++)
317                 if (!ODR_MASK_GET(b_resp->protocolVersion, i))
318                     ODR_MASK_CLEAR(f_resp->protocolVersion, i);
319             // reject if any of the backends reject
320             if (!*b_resp->result)
321                 *f_resp->result = 0;
322         }
323         else
324         {
325             // if any target does not return init return that (close or
326             // similar )
327             package.response() = p->response();
328             return;
329         }
330     }
331     package.response() = f_apdu;
332 }
333
334 void yf::Multi::Frontend::search(Package &package, Z_APDU *apdu_req)
335 {
336     // create search request 
337     Z_SearchRequest *req = apdu_req->u.searchRequest;
338
339     // deal with piggy back (for now disable)
340     *req->smallSetUpperBound = 0;
341     *req->largeSetLowerBound = 1;
342     *req->mediumSetPresentNumber = 1;
343
344     std::list<BackendPtr>::const_iterator bit;
345     for (bit = m_backend_list.begin(); bit != m_backend_list.end(); bit++)
346     {
347         PackagePtr p = (*bit)->m_package;
348         // we don't modify database name yet!
349
350         p->request() = apdu_req;
351         p->copy_filter(package);
352     }
353     multi_move();
354
355     // look at each response
356     int total_hits = 0;
357     for (bit = m_backend_list.begin(); bit != m_backend_list.end(); bit++)
358     {
359         PackagePtr p = (*bit)->m_package;
360         
361         if (p->session().is_closed()) // if any backend closes, close frontend
362             package.session().close();
363         
364         Z_GDU *gdu = p->response().get();
365         if (gdu && gdu->which == Z_GDU_Z3950 && gdu->u.z3950->which ==
366             Z_APDU_searchResponse)
367         {
368             Z_APDU *b_apdu = gdu->u.z3950;
369             Z_SearchResponse *b_resp = b_apdu->u.searchResponse;
370             
371             total_hits += *b_resp->resultCount;
372         }
373         else
374         {
375             // if any target does not return search response - return that 
376             package.response() = p->response();
377             return;
378         }
379     }
380
381     yp2::odr odr;
382     Z_APDU *f_apdu = odr.create_searchResponse(apdu_req, 0, 0);
383     Z_SearchResponse *f_resp = f_apdu->u.searchResponse;
384
385     *f_resp->resultCount = total_hits;
386
387     package.response() = f_apdu;
388 }
389
390 void yf::Multi::process(Package &package) const
391 {
392     FrontendPtr f = m_p->get_frontend(package);
393
394     Z_GDU *gdu = package.request().get();
395     
396     if (gdu && gdu->which == Z_GDU_Z3950 && gdu->u.z3950->which ==
397         Z_APDU_initRequest && !f->m_is_multi)
398     {
399         f->init(package, gdu);
400     }
401     else if (!f->m_is_multi)
402         package.move();
403     else if (gdu && gdu->which == Z_GDU_Z3950)
404     {
405         Z_APDU *apdu = gdu->u.z3950;
406         if (apdu->which == Z_APDU_initRequest)
407         {
408             yp2::odr odr;
409             
410             package.response() = odr.create_close(
411                 apdu,
412                 Z_Close_protocolError,
413                 "double init");
414             
415             package.session().close();
416         }
417         else if (apdu->which == Z_APDU_searchRequest)
418         {
419             f->search(package, apdu);
420         }
421         else
422         {
423             yp2::odr odr;
424             
425             package.response() = odr.create_close(
426                 apdu, Z_Close_protocolError,
427                 "unsupported APDU in filter multi");
428             
429             package.session().close();
430         }
431     }
432     m_p->release_frontend(package);
433 }
434
435 void yp2::filter::Multi::configure(const xmlNode * ptr)
436 {
437     for (ptr = ptr->children; ptr; ptr = ptr->next)
438     {
439         if (ptr->type != XML_ELEMENT_NODE)
440             continue;
441         if (!strcmp((const char *) ptr->name, "virtual"))
442         {
443             std::list<std::string> targets;
444             std::string vhost;
445             xmlNode *v_node = ptr->children;
446             for (; v_node; v_node = v_node->next)
447             {
448                 if (v_node->type != XML_ELEMENT_NODE)
449                     continue;
450                 
451                 if (yp2::xml::is_element_yp2(v_node, "vhost"))
452                     vhost = yp2::xml::get_text(v_node);
453                 else if (yp2::xml::is_element_yp2(v_node, "target"))
454                     targets.push_back(yp2::xml::get_text(v_node));
455                 else
456                     throw yp2::filter::FilterException
457                         ("Bad element " 
458                          + std::string((const char *) v_node->name)
459                          + " in virtual section"
460                             );
461             }
462             std::string route = yp2::xml::get_route(ptr);
463             add_map_host2hosts(vhost, targets, route);
464             std::list<std::string>::const_iterator it;
465             for (it = targets.begin(); it != targets.end(); it++)
466             {
467                 std::cout << "Add " << vhost << "->" << *it
468                           << "," << route << "\n";
469             }
470         }
471         else
472         {
473             throw yp2::filter::FilterException
474                 ("Bad element " 
475                  + std::string((const char *) ptr->name)
476                  + " in virt_db filter");
477         }
478     }
479 }
480
481 static yp2::filter::Base* filter_creator()
482 {
483     return new yp2::filter::Multi;
484 }
485
486 extern "C" {
487     struct yp2_filter_struct yp2_filter_multi = {
488         0,
489         "multi",
490         filter_creator
491     };
492 }
493
494
495 /*
496  * Local variables:
497  * c-basic-offset: 4
498  * indent-tabs-mode: nil
499  * c-file-style: "stroustrup"
500  * End:
501  * vim: shiftwidth=4 tabstop=8 expandtab
502  */