1 /* $Id: filter_virt_db.cpp,v 1.22 2006-01-12 14:45:04 adam Exp $
2 Copyright (c) 2005, Index Data.
10 #include "package.hpp"
12 #include <boost/thread/mutex.hpp>
13 #include <boost/thread/condition.hpp>
16 #include "filter_virt_db.hpp"
19 #include <yaz/otherinfo.h>
20 #include <yaz/diagbib1.h>
// Short alias for the yp2::filter namespace.
25 namespace yf = yp2::filter;
// Fragment of the Virt_db::Set declaration; the enclosing struct header and
// some members (m_vhost, m_route are set by the constructor) are not part of
// this listing.
// Constructor arguments: backend session, backend result set name, virtual
// host, route, and a flag saying whether named result sets are supported.
30 Set(yp2::Session &id, std::string setname,
31 std::string vhost, std::string route,
32 bool named_result_sets);
// Session initialised from the constructor's id argument.
36 yp2::Session m_backend_session;
// Initialised from the constructor's setname argument.
37 std::string m_backend_setname;
// Initialised from the constructor's named_result_sets argument.
40 bool m_named_result_sets;
// Map constructor declaration taking a virtual host and a route (the
// enclosing struct header is not part of this listing).
43 Map(std::string vhost, std::string route);
// State kept per client session by the virt_db filter (Rep stores one
// Frontend pointer per yp2::Session in m_clients).
// Members such as m_in_use and m_is_virtual are used elsewhere in the file
// but their declarations are not part of this listing.
48 struct Virt_db::Frontend {
51 yp2::Session m_session;
// Result sets of this frontend, keyed by the result set id supplied in the
// client's search/present requests.
54 std::map<std::string,Virt_db::Set> m_sets;
// Handles a search request; maps is the table looked up by database name.
55 void search(Package &package, Z_APDU *apdu,
56 const std::map<std::string, Virt_db::Map> &maps);
// Handles a present request against a set previously stored in m_sets.
57 void present(Package &package, Z_APDU *apdu);
// Closes the backend session of every entry in m_sets.
58 void close(Package &package);
59 typedef std::map<std::string,Virt_db::Set>::iterator Sets_it;
// Fragment of the Virt_db::Rep declaration (class header not part of this
// listing). get_frontend/release_frontend bracket the use of a Frontend in
// Virt_db::process.
64 Frontend *get_frontend(Package &package);
65 void release_frontend(Package &package);
// NOTE(review): get_frontend/release_frontend lock m_mutex (declaration not
// shown); no use of m_sessions_mutex is visible in this listing.
67 boost::mutex m_sessions_mutex;
// Database name -> Map (virtual host + route); filled by add_map_db2vhost.
68 std::map<std::string, Virt_db::Map>m_maps;
70 typedef std::map<std::string,Virt_db::Set>::iterator Sets_it;
// Waited on in get_frontend and notified (notify_all) in release_frontend.
73 boost::condition m_cond_session_ready;
// Client session -> Frontend; entries are created with new in get_frontend.
74 std::map<yp2::Session,Frontend *> m_clients;
// Frontend default constructor; its initialiser list and body are not part
// of this listing.
81 yf::Virt_db::Frontend::Frontend()
// Closes the backend session of every result set held in m_sets.
//   package - the triggering package; its origin and filter state are copied
//             into the per-set close package.
// For each entry, close() is called on the backend session and a new Package
// built from that session is moved along the entry's route.
86 void yf::Virt_db::Frontend::close(Package &package)
88 Sets_it sit = m_sets.begin();
89 for (; sit != m_sets.end(); sit++)
// close() is called before the package is constructed from the session;
// presumably so the moved package signals the closed state downstream
// (TODO confirm against yp2::Session/Package semantics).
91 sit->second.m_backend_session.close();
92 Package close_package(sit->second.m_backend_session, package.origin());
93 close_package.copy_filter(package);
94 close_package.move(sit->second.m_route);
// Frontend destructor; body not part of this listing.
99 yf::Virt_db::Frontend::~Frontend()
// Obtains the Frontend associated with package.session(), creating one if
// none is registered yet. Runs under m_mutex.
// The loop header and the return/break statements are not part of this
// listing; the comments below describe only the visible statements.
103 yf::Virt_db::Frontend *yf::Virt_db::Rep::get_frontend(Package &package)
105 boost::mutex::scoped_lock lock(m_mutex);
107 std::map<yp2::Session,yf::Virt_db::Frontend *>::iterator it;
111 it = m_clients.find(package.session());
// No Frontend registered for this session (handling statement not shown;
// presumably falls through to the allocation below - TODO confirm).
112 if (it == m_clients.end())
// An existing Frontend that is idle gets claimed by setting m_in_use.
115 if (!it->second->m_in_use)
117 it->second->m_in_use = true;
// Blocks on the condition with the lock; release_frontend calls notify_all.
120 m_cond_session_ready.wait(lock);
// Allocate a new Frontend and register it for this session.
// NOTE(review): raw new stored in m_clients; the matching delete is not
// visible in this listing - confirm where ownership is released.
122 Frontend *f = new Frontend;
123 m_clients[package.session()] = f;
// Counterpart of get_frontend: called at the end of Virt_db::process.
// Runs under m_mutex. When the package's session is closed, the Frontend's
// backend sessions are closed via Frontend::close; m_in_use is cleared and
// all waiters on m_cond_session_ready are woken.
// Braces/else lines are not part of this listing, so the exact branch that
// clears m_in_use cannot be determined from here.
129 void yf::Virt_db::Rep::release_frontend(Package &package)
131 boost::mutex::scoped_lock lock(m_mutex);
132 std::map<yp2::Session,yf::Virt_db::Frontend *>::iterator it;
134 it = m_clients.find(package.session());
135 if (it != m_clients.end())
137 if (package.session().is_closed())
139 it->second->close(package);
145 it->second->m_in_use = false;
// Wake every thread blocked in get_frontend.
147 m_cond_session_ready.notify_all();
// Constructs a Set by copying each argument into the corresponding member:
// id -> m_backend_session, setname -> m_backend_setname, vhost -> m_vhost,
// route -> m_route, named_result_sets -> m_named_result_sets.
// The constructor body is not part of this listing.
151 yf::Virt_db::Set::Set(yp2::Session &id, std::string setname,
152 std::string vhost, std::string route,
153 bool named_result_sets)
154 : m_backend_session(id), m_backend_setname(setname), m_vhost(vhost),
155 m_route(route), m_named_result_sets(named_result_sets)
// Set default constructor (body not part of this listing). Needed because
// Set is used as the mapped type of std::map with operator[].
160 yf::Virt_db::Set::Set()
// Set destructor; body not part of this listing.
165 yf::Virt_db::Set::~Set()
// Constructs a Map entry storing the virtual host and the route.
// The constructor body is not part of this listing.
169 yf::Virt_db::Map::Map(std::string vhost, std::string route)
170 : m_vhost(vhost), m_route(route)
// Map default constructor (body not part of this listing). Needed because
// Map is used as the mapped type of std::map with operator[].
174 yf::Virt_db::Map::Map()
// Constructs the filter and allocates its private Rep, held in m_p.
178 yf::Virt_db::Virt_db() : m_p(new Virt_db::Rep)
// Virt_db destructor; body not part of this listing.
182 yf::Virt_db::~Virt_db() {
// Handles a Z39.50 present request on behalf of this frontend.
//   package - incoming package; its response() is assigned on every visible
//             path.
//   apdu    - request APDU; apdu->u.presentRequest is read and its
//             resultSetId is overwritten before forwarding.
// Braces, returns and the declarations of odr and id are not part of this
// listing; branch boundaries below are inferred from the visible statements.
185 void yf::Virt_db::Frontend::present(Package &package, Z_APDU *apdu)
188 Z_PresentRequest *req = apdu->u.presentRequest;
// Keep the client's set id before req->resultSetId is overwritten below.
189 std::string resultSetId = req->resultSetId;
192 Sets_it sets_it = m_sets.find(resultSetId);
193 if (sets_it == m_sets.end())
// Unknown set: answer with a present response carrying a non-surrogate
// diagnostic. This apdu is a new local, distinct from the parameter.
195 Z_APDU *apdu = zget_APDU(odr, Z_APDU_presentResponse);
197 Z_Records *rec = (Z_Records *) odr_malloc(odr, sizeof(Z_Records));
198 apdu->u.presentResponse->records = rec;
199 rec->which = Z_Records_NSD;
200 rec->u.nonSurrogateDiagnostic =
201 zget_DefaultDiagFormat(
203 YAZ_BIB1_SPECIFIED_RESULT_SET_DOES_NOT_EXIST,
204 resultSetId.c_str());
205 package.response() = apdu;
// NOTE(review): id is heap-allocated here; its declaration and any matching
// delete are not visible in this listing - confirm it is released on all
// paths.
209 id = new yp2::Session(sets_it->second.m_backend_session);
211 // sending present to backend
212 Package present_package(*id, package.origin());
213 present_package.copy_filter(package);
// The backend is addressed with the fixed set name "default" (the same name
// search stores as backend_resultSetId).
215 req->resultSetId = odr_strdup(odr, "default");
216 present_package.request() = yazpp_1::GDU(apdu);
// NOTE(review): moved without a route argument, whereas search and close
// pass the stored route - confirm this is intended.
218 present_package.move();
220 if (present_package.session().is_closed())
// Backend session is closed after the move: report the set as gone and
// drop it from m_sets.
222 Z_APDU *apdu = zget_APDU(odr, Z_APDU_presentResponse);
224 Z_Records *rec = (Z_Records *) odr_malloc(odr, sizeof(Z_Records));
225 apdu->u.presentResponse->records = rec;
226 rec->which = Z_Records_NSD;
227 rec->u.nonSurrogateDiagnostic =
228 zget_DefaultDiagFormat(
230 YAZ_BIB1_RESULT_SET_NO_LONGER_EXISTS_UNILATERALLY_DELETED_BY_,
231 resultSetId.c_str());
232 package.response() = apdu;
234 m_sets.erase(resultSetId);
// Pass the backend's response through (presumably the branch where the
// session is still open - the else line is not shown).
238 package.response() = present_package.response();
// Handles a Z39.50 search request on behalf of this frontend.
//   package - incoming package; its response() is assigned on every visible
//             path.
//   apdu    - request APDU; apdu->u.searchRequest is read and partly
//             rewritten (database name, replaceIndicator, resultSetName)
//             before being forwarded.
//   maps    - database name -> Map (virtual host + route).
// Visible steps: validate the request, send an init request to the backend
// along the mapped route, forward the search, and record the resulting set
// in m_sets under the client's result set name.
// Braces, returns, else lines and the declarations of odr, id and vhost are
// not part of this listing; branch boundaries are inferred.
243 void yf::Virt_db::Frontend::search(Package &package, Z_APDU *apdu,
244 const std::map<std::string, Virt_db::Map> &maps)
246 Z_SearchRequest *req = apdu->u.searchRequest;
248 std::string database;
// Keep the client's set name; req->resultSetName is overwritten further down.
249 std::string resultSetId = req->resultSetName;
250 bool support_named_result_sets = false; // whether backend supports it
253 if (req->num_databaseNames != 1)
254 { // exactly one database must be specified
// NOTE(review): the same diagnostic (too many databases) is used for any
// count other than 1, including 0.
255 Z_APDU *apdu = zget_APDU(odr, Z_APDU_searchResponse);
257 Z_Records *rec = (Z_Records *) odr_malloc(odr, sizeof(Z_Records));
258 apdu->u.searchResponse->records = rec;
259 rec->which = Z_Records_NSD;
260 rec->u.nonSurrogateDiagnostic =
261 zget_DefaultDiagFormat(
262 odr, YAZ_BIB1_TOO_MANY_DATABASES_SPECIFIED, 0);
263 package.response() = apdu;
267 database = req->databaseNames[0];
268 std::map<std::string, Virt_db::Map>::const_iterator map_it;
269 map_it = maps.find(database);
270 if (map_it == maps.end())
271 { // no map for database: return diagnostic
272 Z_APDU *apdu = zget_APDU(odr, Z_APDU_searchResponse);
274 Z_Records *rec = (Z_Records *) odr_malloc(odr, sizeof(Z_Records));
275 apdu->u.searchResponse->records = rec;
276 rec->which = Z_Records_NSD;
277 rec->u.nonSurrogateDiagnostic =
278 zget_DefaultDiagFormat(
279 odr, YAZ_BIB1_DATABASE_DOES_NOT_EXIST, database.c_str());
280 package.response() = apdu;
// Replace indicator off: refuse to overwrite a set that already exists.
284 if (*req->replaceIndicator == 0)
286 Sets_it sets_it = m_sets.find(req->resultSetName);
287 if (sets_it != m_sets.end())
289 Z_APDU *apdu = zget_APDU(odr, Z_APDU_searchResponse);
291 Z_Records *rec = (Z_Records *) odr_malloc(odr, sizeof(Z_Records));
292 apdu->u.searchResponse->records = rec;
293 rec->which = Z_Records_NSD;
294 rec->u.nonSurrogateDiagnostic =
295 zget_DefaultDiagFormat(
297 YAZ_BIB1_RESULT_SET_EXISTS_AND_REPLACE_INDICATOR_OFF,
299 package.response() = apdu;
// Drop any previous set of that name; it is re-registered at the end.
304 m_sets.erase(req->resultSetName);
305 vhost = map_it->second.m_vhost;
306 std::string route = map_it->second.m_route;
307 // we might look for an existing session with same vhost
309 const char *vhost_cstr = vhost.c_str();
311 { // sending init to backend
312 Package init_package(id, package.origin());
313 init_package.copy_filter(package);
315 Z_APDU *init_apdu = zget_APDU(odr, Z_APDU_initRequest);
// The virtual host is passed to the backend as VAL_PROXY other-information.
317 yaz_oi_set_string_oidval(&init_apdu->u.initRequest->otherInfo, odr,
318 VAL_PROXY, 1, vhost_cstr);
320 init_package.request() = init_apdu;
322 init_package.move(route); // sending init
// Session closed after the init was moved: report database unavailable.
324 if (init_package.session().is_closed())
326 Z_APDU *apdu = zget_APDU(odr, Z_APDU_searchResponse);
328 Z_Records *rec = (Z_Records *) odr_malloc(odr, sizeof(Z_Records));
329 apdu->u.searchResponse->records = rec;
330 rec->which = Z_Records_NSD;
331 rec->u.nonSurrogateDiagnostic =
332 zget_DefaultDiagFormat(
333 odr, YAZ_BIB1_DATABASE_UNAVAILABLE, database.c_str());
334 package.response() = apdu;
336 Z_GDU *gdu = init_package.response().get();
337 // we hope to get an init response
// Record whether the backend's init response has the namedResultSets
// option bit set.
338 if (gdu && gdu->which == Z_GDU_Z3950 && gdu->u.z3950->which ==
341 if (ODR_MASK_GET(gdu->u.z3950->u.initResponse->options,
342 Z_Options_namedResultSets))
343 support_named_result_sets = true;
// Presumably the branch taken when no init response came back (the else
// line is not shown): report database unavailable.
347 Z_APDU *apdu = zget_APDU(odr, Z_APDU_searchResponse);
349 Z_Records *rec = (Z_Records *) odr_malloc(odr, sizeof(Z_Records));
350 apdu->u.searchResponse->records = rec;
351 rec->which = Z_Records_NSD;
352 rec->u.nonSurrogateDiagnostic =
353 zget_DefaultDiagFormat(
354 odr, YAZ_BIB1_DATABASE_UNAVAILABLE, database.c_str());
355 package.response() = apdu;
360 // sending search to backend
361 Package search_package(id, package.origin());
363 search_package.copy_filter(package);
// The part of the vhost string after the first '/' becomes the backend
// database name.
// NOTE(review): sep+1 is only valid when sep is non-null; the guarding
// condition is not visible in this listing - confirm.
364 const char *sep = strchr(vhost_cstr, '/');
366 req->databaseNames[0] = odr_strdup(odr, sep+1);
// The backend always gets replaceIndicator = 1 and the set name "default",
// regardless of what the client asked for.
368 *req->replaceIndicator = 1;
370 std::string backend_resultSetId = "default";
371 req->resultSetName = odr_strdup(odr, backend_resultSetId.c_str());
372 search_package.request() = yazpp_1::GDU(apdu);
374 search_package.move(route);
// Session closed after the search was moved: report database unavailable.
376 if (search_package.session().is_closed())
378 Z_APDU *apdu = zget_APDU(odr, Z_APDU_searchResponse);
380 Z_Records *rec = (Z_Records *) odr_malloc(odr, sizeof(Z_Records));
381 apdu->u.searchResponse->records = rec;
382 rec->which = Z_Records_NSD;
383 rec->u.nonSurrogateDiagnostic =
384 zget_DefaultDiagFormat(
385 odr, YAZ_BIB1_DATABASE_UNAVAILABLE, database.c_str());
386 package.response() = apdu;
// Pass the backend's search response back and remember the set under the
// client's original result set name.
390 package.response() = search_package.response();
392 m_sets[resultSetId] =
393 Virt_db::Set(id, backend_resultSetId, vhost, route,
394 support_named_result_sets);
// Registers (or overwrites) the mapping from database name db to a Map built
// from vhost and route. The remainder of the parameter list (route) is on a
// line not part of this listing.
397 void yf::Virt_db::add_map_db2vhost(std::string db, std::string vhost,
400 m_p->m_maps[db] = Virt_db::Map(vhost, route);
// Filter entry point: dispatches one package for the virt_db filter.
//   package - request in, response out; its session may be closed here.
// Visible behaviour: an init request on a frontend not yet marked virtual is
// answered with an init response built here (the condition selecting this
// case also depends on lines not shown) and the frontend is marked virtual;
// on a virtual frontend, search and present requests are delegated to the
// Frontend, and a second init or any other APDU produces a close with
// protocol error. The frontend is released at the end.
// Braces, several declarations (odr, i, vhost) and the body of the
// "!f->m_is_virtual" branch are not part of this listing.
403 void yf::Virt_db::process(Package &package) const
// Claims the Frontend for this session; paired with release_frontend below.
405 yf::Virt_db::Frontend *f = m_p->get_frontend(package);
408 Z_GDU *gdu = package.request().get();
410 if (gdu && gdu->which == Z_GDU_Z3950 && gdu->u.z3950->which ==
411 Z_APDU_initRequest && !f->m_is_virtual)
413 Z_InitRequest *req = gdu->u.z3950->u.initRequest;
// Reads the VAL_PROXY other-information string from the init request; the
// variable receiving the result is declared on a line not shown.
416 yaz_oi_get_string_oidval(&req->otherInfo, VAL_PROXY, 1, 0);
420 Z_APDU *apdu = zget_APDU(odr, Z_APDU_initResponse);
421 Z_InitResponse *resp = apdu->u.initResponse;
// Option bits that may be echoed back; the list is terminated by -1 (most
// entries are not part of this listing).
424 static const int masks[] = {
427 Z_Options_namedResultSets,
// Set in the response only those listed options the client requested.
430 for (i = 0; masks[i] != -1; i++)
431 if (ODR_MASK_GET(req->options, masks[i]))
432 ODR_MASK_SET(resp->options, masks[i]);
// Same echo logic for protocol version bits (entries not shown, -1 ends it).
434 static const int versions[] = {
440 for (i = 0; versions[i] != -1; i++)
441 if (ODR_MASK_GET(req->protocolVersion, versions[i]))
442 ODR_MASK_SET(resp->protocolVersion, versions[i]);
446 package.response() = apdu;
// From now on this frontend is handled by the virtual-database branches.
447 f->m_is_virtual = true;
// Non-virtual frontend: the handling statement is not part of this listing.
452 else if (!f->m_is_virtual)
454 else if (gdu && gdu->which == Z_GDU_Z3950)
456 Z_APDU *apdu = gdu->u.z3950;
// A repeated init on an already virtual frontend is a protocol error.
457 if (apdu->which == Z_APDU_initRequest)
461 package.response() = odr.create_close(
462 Z_Close_protocolError,
465 package.session().close();
467 else if (apdu->which == Z_APDU_searchRequest)
469 f->search(package, apdu, m_p->m_maps);
471 else if (apdu->which == Z_APDU_presentRequest)
473 f->present(package, apdu);
// Any other APDU type is rejected and the session closed.
479 package.response() = odr.create_close(
480 Z_Close_protocolError,
481 "unsupported APDU in filter_virt_db");
483 package.session().close();
// Releases the Frontend; release_frontend also closes backend sessions when
// the package's session is closed.
487 m_p->release_frontend(package);
// Configures the filter from its XML node.
//   ptr - filter configuration element; its element children are scanned.
// Each "virtual" child may contain "database" and "target" elements (text
// content is taken) plus a route obtained via yp2::xml::get_route; the
// resulting triple is registered with add_map_db2vhost.
// Throws yp2::filter::FilterException for an unexpected element inside a
// virtual section or at the top level (the start of each message string is
// on lines not part of this listing). The declaration of target and the
// continue/else lines are not shown either.
491 void yp2::filter::Virt_db::configure(const xmlNode * ptr)
493 for (ptr = ptr->children; ptr; ptr = ptr->next)
// Non-element nodes are tested for here (the statement taken is not shown).
495 if (ptr->type != XML_ELEMENT_NODE)
497 if (!strcmp((const char *) ptr->name, "virtual"))
499 std::string database;
501 xmlNode *v_node = ptr->children;
502 for (; v_node; v_node = v_node->next)
504 if (v_node->type != XML_ELEMENT_NODE)
507 if (yp2::xml::is_element_yp2(v_node, "database"))
508 database = yp2::xml::get_text(v_node);
509 else if (yp2::xml::is_element_yp2(v_node, "target"))
510 target = yp2::xml::get_text(v_node);
512 throw yp2::filter::FilterException
514 + std::string((const char *) v_node->name)
515 + " in virtual section"
518 std::string route = yp2::xml::get_route(ptr);
519 add_map_db2vhost(database, target, route);
// NOTE(review): the mapping is registered even if database or target was
// never set by a child element (no check is visible) - confirm.
// Writes the registered mapping to standard output.
520 std::cout << "Add " << database << "->" << target
521 << "," << route << "\n";
525 throw yp2::filter::FilterException
527 + std::string((const char *) ptr->name)
528 + " in virt_db filter");
// Factory returning a newly allocated Virt_db filter as a Base pointer; the
// caller receives the raw pointer (who deletes it is not visible here).
533 static yp2::filter::Base* filter_creator()
535 return new yp2::filter::Virt_db;
// Filter descriptor object for virt_db; its initialiser fields are not part
// of this listing (presumably references filter_creator - TODO confirm).
539 struct yp2_filter_struct yp2_filter_virt_db = {
550 * indent-tabs-mode: nil
551 * c-file-style: "stroustrup"
553 * vim: shiftwidth=4 tabstop=8 expandtab