1 /* $Id: filter_virt_db.cpp,v 1.28 2006-01-16 17:03:09 adam Exp $
2 Copyright (c) 2005, Index Data.
10 #include "package.hpp"
12 #include <boost/thread/mutex.hpp>
13 #include <boost/thread/condition.hpp>
14 #include <boost/shared_ptr.hpp>
17 #include "filter_virt_db.hpp"
20 #include <yaz/otherinfo.h>
21 #include <yaz/diagbib1.h>
26 namespace yf = yp2::filter;
// Build a Set that refers to backend b and carries the given set name.
32 Set(BackendPtr b, std::string setname);
// Set name stored for this entry. search() stores the name it sent to the
// backend here, and present() substitutes it for the client's resultSetId
// before forwarding the request.
37 std::string m_setname;
// Build a Map entry from a list of targets and a route.
40 Map(std::list<std::string> targets, std::string route);
// Targets configured for one virtual database; collected (de-duplicated)
// into Backend::m_targets by create_backend_from_databases().
42 std::list<std::string> m_targets;
// State kept for one backend that a frontend session talks to.
45 struct Virt_db::Backend {
// Session used for every Package sent to this backend (init, search, scan,
// present, close).
46 yp2::Session m_backend_session;
// Database name written into databaseNames[0] of forwarded search and scan
// requests.
48 std::string m_backend_database;
// Database list exactly as the client supplied it; compared against incoming
// requests to find an already-initialised backend.
50 std::list<std::string> m_frontend_databases;
// De-duplicated targets; each one is sent as a VAL_PROXY otherInfo string in
// the init request.
51 std::list<std::string> m_targets;
// Set to true when the backend's init response has the namedResultSets
// option bit set; otherwise searches use the set name "default".
53 bool m_named_result_sets;
// Per-client-session state: the backends opened on behalf of the client and
// the result sets the client has created.
56 struct Virt_db::Frontend {
59 yp2::Session m_session;
// Backends initialised for this frontend; init_backend() appends to it and
// close() clears it.
62 std::list<BackendPtr> m_backend_list;
// Result sets keyed by the result set id the client used.
63 std::map<std::string,Virt_db::Set> m_sets;
// Handlers for the APDU types dispatched from Virt_db::process().
65 void search(Package &package, Z_APDU *apdu);
66 void present(Package &package, Z_APDU *apdu);
67 void scan(Package &package, Z_APDU *apdu);
// Closes every backend session and empties m_backend_list.
69 void close(Package &package);
70 typedef std::map<std::string,Virt_db::Set>::iterator Sets_it;
// Finds an existing backend whose frontend database list equals `databases`.
72 BackendPtr lookup_backend_from_databases(
73 std::list<std::string> databases);
// Builds (but does not initialise) a Backend for `databases`; an unknown
// database name is reported through `failing_database`.
74 BackendPtr create_backend_from_databases(
75 std::list<std::string> databases,
76 std::string &failing_database);
// Creates a backend and sends it an init request; failures are reported
// through `error_code` and `addinfo`.
78 BackendPtr init_backend(std::list<std::string> database,
80 int &error_code, std::string &addinfo);
// Frontend reaches into this object's members (e.g. m_maps via m_p).
85 friend class Frontend;
// Obtain / give back the Frontend that belongs to package.session().
87 FrontendPtr get_frontend(Package &package);
88 void release_frontend(Package &package);
// NOTE(review): get_frontend() and release_frontend() lock a member named
// m_mutex, not m_sessions_mutex -- confirm which mutex is actually intended
// and whether this one is still used.
90 boost::mutex m_sessions_mutex;
// Virtual database name -> targets and route, filled by add_map_db2targets()
// and add_map_db2target().
91 std::map<std::string, Virt_db::Map>m_maps;
// NOTE(review): same typedef as the one declared in Frontend.
93 typedef std::map<std::string,Virt_db::Set>::iterator Sets_it;
// Signalled by release_frontend(); get_frontend() waits on it while the
// session's frontend is in use.
96 boost::condition m_cond_session_ready;
// One Frontend per client session.
97 std::map<yp2::Session, FrontendPtr> m_clients;
// Walk m_backend_list looking for a backend whose m_frontend_databases is
// equal (same elements, same order) to `databases`.
// NOTE(review): presumably returns that backend, or a null BackendPtr when
// nothing matches -- confirm against the return statements.
104 yf::Virt_db::BackendPtr yf::Virt_db::Frontend::lookup_backend_from_databases(
105 std::list<std::string> databases)
107 std::list<BackendPtr>::const_iterator map_it;
108 map_it = m_backend_list.begin();
109 for (; map_it != m_backend_list.end(); map_it++)
110 if ((*map_it)->m_frontend_databases == databases)
// Allocate a Backend for the client-supplied `databases` and fill in its
// targets, route and backend database name from the configured maps
// (m_p->m_maps). The backend is not contacted here.
//
// databases         database names as given by the client
// failing_database  receives the first database name that has no entry in
//                   m_p->m_maps
116 yf::Virt_db::BackendPtr yf::Virt_db::Frontend::create_backend_from_databases(
117 std::list<std::string> databases, std::string &failing_database)
119 BackendPtr b(new Backend);
120 std::list<std::string>::const_iterator db_it = databases.begin();
122 b->m_number_of_sets = 0;
123 b->m_frontend_databases = databases;
124 b->m_named_result_sets = false;
// Used as a set: the keys are target names, so a target shared by several
// databases is recorded once. Being a std::map it is later read back in key
// order.
126 std::map<std::string,bool> targets_dedup;
127 for (; db_it != databases.end(); db_it++)
129 std::map<std::string, Virt_db::Map>::iterator map_it;
130 map_it = m_p->m_maps.find(*db_it);
131 if (map_it == m_p->m_maps.end()) // database not found
133 failing_database = *db_it;
137 std::list<std::string>::const_iterator t_it =
138 map_it->second.m_targets.begin();
139 for (; t_it != map_it->second.m_targets.end(); t_it++)
140 targets_dedup[*t_it] = true;
// NOTE(review): this looks like it runs once per database, so with several
// databases the route of the last one processed would win -- confirm.
141 b->m_route = map_it->second.m_route;
// Copy the de-duplicated target names into the backend.
143 std::map<std::string,bool>::const_iterator tm_it = targets_dedup.begin();
144 for (; tm_it != targets_dedup.end(); tm_it++)
145 b->m_targets.push_back(tm_it->first);
// Derive the backend database name: the text after the first '/' in m_vhost
// when there is one, otherwise `database`.
// NOTE(review): the conditions selecting between the two assignments, and
// the declarations of `database` and m_vhost, should be checked; the local
// `backend_database` is not used by the statements shown.
147 const char *sep = strchr(b->m_vhost.c_str(), '/');
148 std::string backend_database;
150 b->m_backend_database = std::string(sep+1);
152 b->m_backend_database = database;
// Create a backend for `databases`, send it a Z39.50 init request along the
// backend's route and, when that works, add it to m_backend_list.
//
// databases   database names as given by the client
// package     the client's package; its origin and filter position are copied
//             into the init package
// error_code  set to YAZ_BIB1_DATABASE_UNAVAILABLE on each failure path
// addinfo     set to the unmapped database name when the mapping fails
157 yf::Virt_db::BackendPtr yf::Virt_db::Frontend::init_backend(
158 std::list<std::string> databases, Package &package,
159 int &error_code, std::string &addinfo)
161 std::string failing_database;
162 BackendPtr b = create_backend_from_databases(databases, failing_database);
// Failure path for a database that has no configured mapping.
165 error_code = YAZ_BIB1_DATABASE_UNAVAILABLE;
166 addinfo = failing_database;
169 Package init_package(b->m_backend_session, package.origin());
170 init_package.copy_filter(package);
174 Z_APDU *init_apdu = zget_APDU(odr, Z_APDU_initRequest);
// Pass every target to the next filter as a VAL_PROXY otherInfo string; `cat`
// is advanced together with the target iterator so each target gets its own
// category number.
176 std::list<std::string>::const_iterator t_it = b->m_targets.begin();
178 for (; t_it != b->m_targets.end(); t_it++, cat++)
180 yaz_oi_set_string_oidval(&init_apdu->u.initRequest->otherInfo, odr,
181 VAL_PROXY, cat, t_it->c_str());
183 Z_InitRequest *req = init_apdu->u.initRequest;
// Options requested from the backend: search, present, named result sets
// and scan.
185 ODR_MASK_SET(req->options, Z_Options_search);
186 ODR_MASK_SET(req->options, Z_Options_present);
187 ODR_MASK_SET(req->options, Z_Options_namedResultSets);
188 ODR_MASK_SET(req->options, Z_Options_scan);
// Offer protocol versions 1, 2 and 3.
190 ODR_MASK_SET(req->protocolVersion, Z_ProtocolVersion_1);
191 ODR_MASK_SET(req->protocolVersion, Z_ProtocolVersion_2);
192 ODR_MASK_SET(req->protocolVersion, Z_ProtocolVersion_3);
194 init_package.request() = init_apdu;
196 init_package.move(b->m_route); // sending init
// A closed session after the move is reported as "database unavailable".
198 if (init_package.session().is_closed())
200 error_code = YAZ_BIB1_DATABASE_UNAVAILABLE;
201 // addinfo = database;
205 Z_GDU *gdu = init_package.response().get();
206 // we hope to get an init response
207 if (gdu && gdu->which == Z_GDU_Z3950 && gdu->u.z3950->which ==
// Remember whether the backend agreed to named result sets; search() uses
// this to decide between the client's set name and "default".
210 if (ODR_MASK_GET(gdu->u.z3950->u.initResponse->options,
211 Z_Options_namedResultSets))
213 b->m_named_result_sets = true;
// Any other kind of response is also reported as "database unavailable".
218 error_code = YAZ_BIB1_DATABASE_UNAVAILABLE;
219 // addinfo = database;
// Keep the initialised backend for later requests from this frontend.
223 m_backend_list.push_back(b);
// Handle a search request: choose (or create) a backend for the requested
// databases, rewrite the request for that backend, forward it and record the
// resulting set in m_sets.
//
// package   the client's package; receives the response
// apdu_req  the search APDU; it is modified in place before being forwarded
227 void yf::Virt_db::Frontend::search(Package &package, Z_APDU *apdu_req)
229 Z_SearchRequest *req = apdu_req->u.searchRequest;
// Keep the client's set name: req->resultSetName may be overwritten with
// "default" further down.
231 std::string resultSetId = req->resultSetName;
234 std::list<std::string> databases;
236 for (i = 0; i<req->num_databaseNames; i++)
237 databases.push_back(req->databaseNames[i]);
239 BackendPtr b; // null for now
240 Sets_it sets_it = m_sets.find(req->resultSetName);
241 if (sets_it != m_sets.end())
243 // result set already exist
244 // if replace indicator is off: we return diagnostic if
245 // result set already exist.
246 if (*req->replaceIndicator == 0)
249 odr.create_searchResponse(
251 YAZ_BIB1_RESULT_SET_EXISTS_AND_REPLACE_INDICATOR_OFF,
253 package.response() = apdu;
// The existing set is being replaced, so the backend that held it has one
// set fewer.
257 sets_it->second.m_backend->m_number_of_sets--;
259 // pick up any existing backend with a database match
260 std::list<BackendPtr>::const_iterator map_it;
261 map_it = m_backend_list.begin();
262 for (; map_it != m_backend_list.end(); map_it++)
264 BackendPtr tmp = *map_it;
265 if (tmp->m_frontend_databases == databases)
268 if (map_it != m_backend_list.end())
275 // pick up any existing database with named result sets ..
276 // or one which has no result sets.. yet.
277 std::list<BackendPtr>::const_iterator map_it;
278 map_it = m_backend_list.begin();
279 for (; map_it != m_backend_list.end(); map_it++)
281 BackendPtr tmp = *map_it;
282 if (tmp->m_frontend_databases == databases &&
283 (tmp->m_named_result_sets ||
284 tmp->m_number_of_sets == 0))
287 if (map_it != m_backend_list.end())
290 if (!b) // no backend yet. Must create a new one
294 b = init_backend(databases, package, error_code, addinfo);
297 // did not get a backend (unavailable somehow?)
300 odr.create_searchResponse(
301 apdu_req, error_code, addinfo.c_str());
302 package.response() = apdu;
// Drop any previous entry stored under the client's set name; a fresh one is
// stored at the end.
306 m_sets.erase(req->resultSetName);
307 // sending search to backend
308 Package search_package(b->m_backend_session, package.origin());
310 search_package.copy_filter(package);
// With named result sets the client's name is passed through unchanged;
// otherwise the request is rewritten to use the set name "default".
312 std::string backend_setname;
313 if (b->m_named_result_sets)
315 backend_setname = std::string(req->resultSetName);
319 backend_setname = "default";
320 req->resultSetName = odr_strdup(odr, backend_setname.c_str());
// NOTE(review): only databaseNames[0] is replaced; any further entries and
// num_databaseNames are left as the client sent them in the lines shown --
// confirm this is intended for multi-database requests.
324 const char *backend_database = b->m_backend_database.c_str();
325 req->databaseNames[0] = odr_strdup(odr, backend_database);
// Always allow replacement on the backend; the replace-indicator check
// against the client's own sets was done above.
328 *req->replaceIndicator = 1;
330 search_package.request() = yazpp_1::GDU(apdu_req);
332 search_package.move(b->m_route);
// A closed backend session is reported to the client as "database
// unavailable".
334 if (search_package.session().is_closed())
337 odr.create_searchResponse(
338 apdu_req, YAZ_BIB1_DATABASE_UNAVAILABLE, 0);
339 package.response() = apdu;
342 package.response() = search_package.response();
// Record the new set under the client's original name, together with the
// name actually used on the backend.
344 b->m_number_of_sets++;
346 m_sets[resultSetId] = Virt_db::Set(b, backend_setname);
// A new frontend starts out non-virtual; process() flips m_is_virtual to true
// once it has answered the client's init request itself.
349 yf::Virt_db::Frontend::Frontend(Rep *rep)
352 m_is_virtual = false;
// Shut down every backend belonging to this frontend and forget them.
//
// package  the client's package; only its origin and filter position are
//          copied into the packages sent to the backends
355 void yf::Virt_db::Frontend::close(Package &package)
357 std::list<BackendPtr>::const_iterator b_it;
359 for (b_it = m_backend_list.begin(); b_it != m_backend_list.end(); b_it++)
// The session is marked closed first, then a package carrying that closed
// session is moved along the backend's route.
361 (*b_it)->m_backend_session.close();
362 Package close_package((*b_it)->m_backend_session, package.origin());
363 close_package.copy_filter(package);
364 close_package.move((*b_it)->m_route);
366 m_backend_list.clear();
// Destructor. Backend sessions are shut down by close(), which
// release_frontend() calls when the client session has closed.
369 yf::Virt_db::Frontend::~Frontend()
// Return the Frontend for package.session(), creating and registering one
// when the session is not yet known. All work is done while holding m_mutex.
373 yf::Virt_db::FrontendPtr yf::Virt_db::Rep::get_frontend(Package &package)
375 boost::mutex::scoped_lock lock(m_mutex);
377 std::map<yp2::Session,yf::Virt_db::FrontendPtr>::iterator it;
381 it = m_clients.find(package.session());
382 if (it == m_clients.end())
// Known session: claim its frontend if it is free.
385 if (!it->second->m_in_use)
387 it->second->m_in_use = true;
// Frontend is busy: block until release_frontend() signals the condition.
// NOTE(review): presumably the lookup is retried after waking -- confirm the
// enclosing loop.
390 m_cond_session_ready.wait(lock);
// Unknown session: create a frontend and register it under the session.
392 FrontendPtr f(new Frontend(this));
393 m_clients[package.session()] = f;
// Give back the frontend obtained with get_frontend(). When the client
// session has been closed the frontend's backends are closed too; otherwise
// the frontend is just marked as free. Runs while holding m_mutex.
398 void yf::Virt_db::Rep::release_frontend(Package &package)
400 boost::mutex::scoped_lock lock(m_mutex);
401 std::map<yp2::Session,yf::Virt_db::FrontendPtr>::iterator it;
403 it = m_clients.find(package.session());
404 if (it != m_clients.end())
406 if (package.session().is_closed())
408 it->second->close(package);
413 it->second->m_in_use = false;
// Wake every thread blocked in get_frontend().
415 m_cond_session_ready.notify_all();
// Store the backend pointer and the set name as given.
419 yf::Virt_db::Set::Set(BackendPtr b, std::string setname)
420 : m_backend(b), m_setname(setname)
// Default constructor; std::map::operator[] on Frontend::m_sets requires the
// mapped type to be default-constructible.
425 yf::Virt_db::Set::Set()
430 yf::Virt_db::Set::~Set()
// Store the target list and the route as given.
434 yf::Virt_db::Map::Map(std::list<std::string> targets, std::string route)
435 : m_targets(targets), m_route(route)
// Default constructor; std::map::operator[] on Rep::m_maps requires the
// mapped type to be default-constructible.
439 yf::Virt_db::Map::Map()
// The filter keeps all of its state in a heap-allocated Rep reached via m_p.
443 yf::Virt_db::Virt_db() : m_p(new Virt_db::Rep)
447 yf::Virt_db::~Virt_db() {
// Handle a present request: look up the client's result set, translate its id
// to the name used on the backend and forward the request there.
//
// package   the client's package; receives the response
// apdu_req  the present APDU; its resultSetId is rewritten in place
450 void yf::Virt_db::Frontend::present(Package &package, Z_APDU *apdu_req)
452 Z_PresentRequest *req = apdu_req->u.presentRequest;
453 std::string resultSetId = req->resultSetId;
456 Sets_it sets_it = m_sets.find(resultSetId);
// Unknown set id: answer with a diagnostic instead of contacting a backend.
457 if (sets_it == m_sets.end())
460 odr.create_presentResponse(
462 YAZ_BIB1_SPECIFIED_RESULT_SET_DOES_NOT_EXIST,
463 resultSetId.c_str());
464 package.response() = apdu;
// NOTE(review): the backend session is copied into a heap-allocated Session;
// make sure it is released on every path out of this function.
468 new yp2::Session(sets_it->second.m_backend->m_backend_session);
470 // sending present to backend
471 Package present_package(*id, package.origin());
472 present_package.copy_filter(package);
// Replace the client's set id with the name stored for the backend.
474 req->resultSetId = odr_strdup(odr, sets_it->second.m_setname.c_str());
476 present_package.request() = yazpp_1::GDU(apdu_req);
// NOTE(review): search() and scan() call move(b->m_route); here move() is
// called without the backend's route -- confirm this is intended.
478 present_package.move();
// Backend session closed: the set is gone, so report that to the client and
// forget the set.
480 if (present_package.session().is_closed())
483 odr.create_presentResponse(
485 YAZ_BIB1_RESULT_SET_NO_LONGER_EXISTS_UNILATERALLY_DELETED_BY_,
486 resultSetId.c_str());
487 package.response() = apdu;
488 m_sets.erase(resultSetId);
492 package.response() = present_package.response();
// Handle a scan request: reuse a backend that matches the requested
// databases (or initialise a new one), rewrite the database name and forward
// the request.
//
// package   the client's package; receives the response
// apdu_req  the scan APDU; databaseNames[0] is rewritten in place
497 void yf::Virt_db::Frontend::scan(Package &package, Z_APDU *apdu_req)
499 Z_ScanRequest *req = apdu_req->u.scanRequest;
503 std::list<std::string> databases;
505 for (i = 0; i<req->num_databaseNames; i++)
506 databases.push_back(req->databaseNames[i]);
509 // pick up any existing backend with a database match
510 std::list<BackendPtr>::const_iterator map_it;
511 map_it = m_backend_list.begin();
512 for (; map_it != m_backend_list.end(); map_it++)
514 BackendPtr tmp = *map_it;
515 if (tmp->m_frontend_databases == databases)
518 if (map_it != m_backend_list.end())
520 if (!b) // no backend yet. Must create a new one
524 b = init_backend(databases, package, error_code, addinfo);
527 // did not get a backend (unavailable somehow?)
529 odr.create_scanResponse(
530 apdu_req, error_code, addinfo.c_str());
531 package.response() = apdu;
536 // sending scan to backend
537 Package scan_package(b->m_backend_session, package.origin());
539 scan_package.copy_filter(package);
// Only the first database name is replaced with the backend's database name.
542 const char *backend_database = b->m_backend_database.c_str();
543 req->databaseNames[0] = odr_strdup(odr, backend_database);
546 scan_package.request() = yazpp_1::GDU(apdu_req);
548 scan_package.move(b->m_route);
// A closed backend session is reported to the client as "database
// unavailable".
550 if (scan_package.session().is_closed())
553 odr.create_scanResponse(
554 apdu_req, YAZ_BIB1_DATABASE_UNAVAILABLE, 0);
555 package.response() = apdu;
558 package.response() = scan_package.response();
// Register (or overwrite) the mapping for virtual database `db`: the targets
// it resolves to and the route used to reach them.
562 void yf::Virt_db::add_map_db2targets(std::string db,
563 std::list<std::string> targets,
566 m_p->m_maps[db] = Virt_db::Map(targets, route);
// Single-target convenience form: wraps `target` in a one-element list and
// registers (or overwrites) the mapping for `db`.
570 void yf::Virt_db::add_map_db2target(std::string db,
574 std::list<std::string> targets;
575 targets.push_back(target);
577 m_p->m_maps[db] = Virt_db::Map(targets, route);
// Filter entry point. Fetches the session's frontend, answers or dispatches
// the Z39.50 request carried by `package`, and releases the frontend again.
580 void yf::Virt_db::process(Package &package) const
582 FrontendPtr f = m_p->get_frontend(package);
584 Z_GDU *gdu = package.request().get();
// First init request on a session that this filter is not yet handling.
586 if (gdu && gdu->which == Z_GDU_Z3950 && gdu->u.z3950->which ==
587 Z_APDU_initRequest && !f->m_is_virtual)
589 Z_InitRequest *req = gdu->u.z3950->u.initRequest;
// Look for a VAL_PROXY otherInfo string in the init request.
// NOTE(review): presumably the init is answered locally only when the client
// named no proxy target -- confirm the condition guarding the code below.
592 yaz_oi_get_string_oidval(&req->otherInfo, VAL_PROXY, 1, 0);
596 Z_APDU *apdu = odr.create_initResponse(gdu->u.z3950, 0, 0);
597 Z_InitResponse *resp = apdu->u.initResponse;
// Grant each option listed in masks[] that the client asked for; the table is
// terminated by -1.
600 static const int masks[] = {
603 Z_Options_namedResultSets,
607 for (i = 0; masks[i] != -1; i++)
608 if (ODR_MASK_GET(req->options, masks[i]))
609 ODR_MASK_SET(resp->options, masks[i]);
// Same for the protocol versions listed in versions[].
611 static const int versions[] = {
617 for (i = 0; versions[i] != -1; i++)
618 if (ODR_MASK_GET(req->protocolVersion, versions[i]))
619 ODR_MASK_SET(resp->protocolVersion, versions[i]);
// From now on requests on this session are handled by the frontend.
623 package.response() = apdu;
624 f->m_is_virtual = true;
// Sessions this filter is not handling take this branch.
629 else if (!f->m_is_virtual)
631 else if (gdu && gdu->which == Z_GDU_Z3950)
633 Z_APDU *apdu = gdu->u.z3950;
// A second init request on a session already handled here is answered with a
// protocol-error close, and the session is closed.
634 if (apdu->which == Z_APDU_initRequest)
638 package.response() = odr.create_close(
640 Z_Close_protocolError,
643 package.session().close();
645 else if (apdu->which == Z_APDU_searchRequest)
647 f->search(package, apdu);
649 else if (apdu->which == Z_APDU_presentRequest)
651 f->present(package, apdu);
653 else if (apdu->which == Z_APDU_scanRequest)
655 f->scan(package, apdu);
// Any other APDU type is refused: close with a protocol error.
661 package.response() = odr.create_close(
662 apdu, Z_Close_protocolError,
663 "unsupported APDU in filter_virt_db");
665 package.session().close();
668 m_p->release_frontend(package);
// Read the filter's XML configuration. Every child element named "virtual"
// defines one virtual database: a "database" element gives its name, each
// "target" element adds a target, and the route comes from
// yp2::xml::get_route(). Any other element, at either level, raises
// FilterException.
672 void yp2::filter::Virt_db::configure(const xmlNode * ptr)
674 for (ptr = ptr->children; ptr; ptr = ptr->next)
676 if (ptr->type != XML_ELEMENT_NODE)
678 if (!strcmp((const char *) ptr->name, "virtual"))
680 std::string database;
681 std::list<std::string> targets;
682 xmlNode *v_node = ptr->children;
683 for (; v_node; v_node = v_node->next)
685 if (v_node->type != XML_ELEMENT_NODE)
// A later "database" element overwrites an earlier one; "target" elements
// accumulate.
688 if (yp2::xml::is_element_yp2(v_node, "database"))
689 database = yp2::xml::get_text(v_node);
690 else if (yp2::xml::is_element_yp2(v_node, "target"))
691 targets.push_back(yp2::xml::get_text(v_node));
693 throw yp2::filter::FilterException
695 + std::string((const char *) v_node->name)
696 + " in virtual section"
699 std::string route = yp2::xml::get_route(ptr);
700 add_map_db2targets(database, targets, route);
// NOTE(review): `target` is not declared in the lines shown; presumably this
// debug print is compiled out -- confirm.
702 std::cout << "Add " << database << "->" << target
703 << "," << route << "\n";
708 throw yp2::filter::FilterException
710 + std::string((const char *) ptr->name)
711 + " in virt_db filter");
// Factory: returns a newly allocated Virt_db filter; the caller owns it.
716 static yp2::filter::Base* filter_creator()
718 return new yp2::filter::Virt_db;
// Descriptor object for this filter.
// NOTE(review): presumably it names the filter and points at
// filter_creator() -- confirm against the initializer.
722 struct yp2_filter_struct yp2_filter_virt_db = {
733 * indent-tabs-mode: nil
734 * c-file-style: "stroustrup"
736 * vim: shiftwidth=4 tabstop=8 expandtab