X-Git-Url: http://git.indexdata.com/?a=blobdiff_plain;f=src%2Ffilter_session_shared.cpp;h=cd6a237de2aa1dbb6f9ff24ccd339331a3ee046a;hb=575099e940a170b75b8d2caad5e2cafe39ed89bb;hp=f25ab3c0ffec04f22eb6e57b681b661b4897d3d5;hpb=f7cf9c4139ed621ba8027a384df7cd58dbee4a50;p=metaproxy-moved-to-github.git diff --git a/src/filter_session_shared.cpp b/src/filter_session_shared.cpp index f25ab3c..cd6a237 100644 --- a/src/filter_session_shared.cpp +++ b/src/filter_session_shared.cpp @@ -1,8 +1,20 @@ -/* $Id: filter_session_shared.cpp,v 1.9 2006-05-16 11:53:54 adam Exp $ - Copyright (c) 2005-2006, Index Data. +/* This file is part of Metaproxy. + Copyright (C) 2005-2009 Index Data -%LICENSE% - */ +Metaproxy is free software; you can redistribute it and/or modify it under +the terms of the GNU General Public License as published by the Free +Software Foundation; either version 2, or (at your option) any later +version. + +Metaproxy is distributed in the hope that it will be useful, but WITHOUT ANY +WARRANTY; without even the implied warranty of MERCHANTABILITY or +FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License +for more details. + +You should have received a copy of the GNU General Public License +along with this program; if not, write to the Free Software +Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA +*/ #include "config.hpp" @@ -11,7 +23,10 @@ #include #include +#include +#include #include +#include #include "util.hpp" #include "filter_session_shared.hpp" @@ -20,86 +35,202 @@ #include #include #include - +#include +#include #include #include +#include namespace mp = metaproxy_1; -namespace yf = mp::filter; +namespace yf = metaproxy_1::filter; namespace metaproxy_1 { namespace filter { - int memcmp2(const void *buf1, int len1, const void *buf2, int len2); - + // key for session.. We'll only share sessions with same InitKey class SessionShared::InitKey { public: bool operator < (const SessionShared::InitKey &k) const; InitKey(Z_InitRequest *req); + InitKey(const InitKey &); + ~InitKey(); private: char *m_idAuthentication_buf; int m_idAuthentication_size; char *m_otherInfo_buf; int m_otherInfo_size; - mp::odr m_odr; - - std::list m_targets; + ODR m_odr; }; - class SessionShared::BackendClass { + // worker thread .. for expiry of sessions + class SessionShared::Worker { + public: + Worker(SessionShared::Rep *rep); + void operator() (void); + private: + SessionShared::Rep *m_p; + }; + // backend result set + class SessionShared::BackendSet { + public: + std::string m_result_set_id; + Databases m_databases; + int m_result_set_size; + yazpp_1::Yaz_Z_Query m_query; + time_t m_time_last_use; + void timestamp(); + yazpp_1::RecordCache m_record_cache; + BackendSet( + const std::string &result_set_id, + const Databases &databases, + const yazpp_1::Yaz_Z_Query &query); + bool search( + Package &frontend_package, + const Z_APDU *apdu_req, + const BackendInstancePtr bp, + Z_Records **z_records); + }; + // backend connection instance + class SessionShared::BackendInstance { + friend class Rep; + friend class BackendClass; + friend class BackendSet; + public: + mp::Session m_session; + BackendSetList m_sets; + bool m_in_use; + int m_sequence_this; + int m_result_set_sequence; + time_t m_time_last_use; + mp::Package * m_close_package; + ~BackendInstance(); + }; + // backends of some class (all with same InitKey) + class SessionShared::BackendClass : boost::noncopyable { + friend class Rep; + friend struct Frontend; + bool m_named_result_sets; + BackendInstanceList m_backend_list; + BackendInstancePtr create_backend(const Package &package); + void remove_backend(BackendInstancePtr b); + BackendInstancePtr get_backend(const Package &package); + void use_backend(BackendInstancePtr b); + void release_backend(BackendInstancePtr b); + void expire_class(); + yazpp_1::GDU m_init_request; yazpp_1::GDU m_init_response; + boost::mutex m_mutex_backend_class; + int m_sequence_top; + time_t m_backend_set_ttl; + time_t m_backend_expiry_ttl; + size_t m_backend_set_max; + public: + BackendClass(const yazpp_1::GDU &init_request, + int resultset_ttl, + int resultset_max, + int session_ttl); + ~BackendClass(); + }; + // frontend result set + class SessionShared::FrontendSet { + Databases m_databases; + yazpp_1::Yaz_Z_Query m_query; + public: + const Databases &get_databases(); + const yazpp_1::Yaz_Z_Query &get_query(); + FrontendSet( + const Databases &databases, + const yazpp_1::Yaz_Z_Query &query); + FrontendSet(); }; + // frontend session struct SessionShared::Frontend { - void init(Package &package, Z_GDU *gdu); Frontend(Rep *rep); ~Frontend(); - mp::Session m_session; bool m_is_virtual; bool m_in_use; + Z_Options m_init_options; + void search(Package &package, Z_APDU *apdu); + void present(Package &package, Z_APDU *apdu); + void scan(Package &package, Z_APDU *apdu); + + void get_set(mp::Package &package, + const Z_APDU *apdu_req, + const Databases &databases, + yazpp_1::Yaz_Z_Query &query, + BackendInstancePtr &found_backend, + BackendSetPtr &found_set); + void override_set(BackendInstancePtr &found_backend, + std::string &result_set_id); - void close(Package &package); Rep *m_p; + BackendClassPtr m_backend_class; + FrontendSets m_frontend_sets; }; + // representation class SessionShared::Rep { friend class SessionShared; friend struct Frontend; FrontendPtr get_frontend(Package &package); void release_frontend(Package &package); + Rep(); + public: + void expire(); private: + void init(Package &package, const Z_GDU *gdu, + FrontendPtr frontend); boost::mutex m_mutex; boost::condition m_cond_session_ready; std::map m_clients; - typedef std::map BackendClassMap; BackendClassMap m_backend_map; + boost::mutex m_mutex_backend_map; + boost::thread_group m_thrds; + int m_resultset_ttl; + int m_resultset_max; + int m_session_ttl; }; } } -int yf::memcmp2(const void *buf1, int len1, - const void *buf2, int len2) +yf::SessionShared::FrontendSet::FrontendSet( + const Databases &databases, + const yazpp_1::Yaz_Z_Query &query) + : m_databases(databases), m_query(query) +{ +} + +const yf::SessionShared::Databases & +yf::SessionShared::FrontendSet::get_databases() +{ + return m_databases; +} + +const yazpp_1::Yaz_Z_Query& yf::SessionShared::FrontendSet::get_query() { - int d = len1 - len2; + return m_query; +} - // compare buffer (common length) - int c = memcmp(buf1, buf2, d > 0 ? len2 : len1); - if (c > 0) - return 1; - else if (c < 0) - return -1; +yf::SessionShared::InitKey::InitKey(const InitKey &k) +{ + m_odr = odr_createmem(ODR_ENCODE); - // compare (remaining bytes) - if (d > 0) - return 1; - else if (d < 0) - return -1; - return 0; + m_idAuthentication_size = k.m_idAuthentication_size; + m_idAuthentication_buf = (char*)odr_malloc(m_odr, m_idAuthentication_size); + memcpy(m_idAuthentication_buf, k.m_idAuthentication_buf, + m_idAuthentication_size); + + m_otherInfo_size = k.m_otherInfo_size; + m_otherInfo_buf = (char*)odr_malloc(m_odr, m_otherInfo_size); + memcpy(m_otherInfo_buf, k.m_otherInfo_buf, + m_otherInfo_size); } yf::SessionShared::InitKey::InitKey(Z_InitRequest *req) { + m_odr = odr_createmem(ODR_ENCODE); + Z_IdAuthentication *t = req->idAuthentication; - z_IdAuthentication(m_odr, &t, 1, 0); m_idAuthentication_buf = odr_getbuf(m_odr, &m_idAuthentication_size, 0); @@ -109,19 +240,25 @@ yf::SessionShared::InitKey::InitKey(Z_InitRequest *req) m_otherInfo_buf = odr_getbuf(m_odr, &m_otherInfo_size, 0); } +yf::SessionShared::InitKey::~InitKey() +{ + odr_destroy(m_odr); +} + bool yf::SessionShared::InitKey::operator < (const SessionShared::InitKey &k) const { int c; - c = memcmp2((void*) m_idAuthentication_buf, m_idAuthentication_size, - (void*) k.m_idAuthentication_buf, k.m_idAuthentication_size); + c = mp::util::memcmp2( + (void*) m_idAuthentication_buf, m_idAuthentication_size, + (void*) k.m_idAuthentication_buf, k.m_idAuthentication_size); if (c < 0) return true; else if (c > 0) return false; - c = memcmp2((void*) m_otherInfo_buf, m_otherInfo_size, - (void*) k.m_otherInfo_buf, k.m_otherInfo_size); + c = mp::util::memcmp2((void*) m_otherInfo_buf, m_otherInfo_size, + (void*) k.m_otherInfo_buf, k.m_otherInfo_size); if (c < 0) return true; else if (c > 0) @@ -129,50 +266,764 @@ bool yf::SessionShared::InitKey::operator < (const SessionShared::InitKey &k) return false; } -void yf::SessionShared::Frontend::init(mp::Package &package, Z_GDU *gdu) +void yf::SessionShared::BackendClass::release_backend(BackendInstancePtr b) +{ + boost::mutex::scoped_lock lock(m_mutex_backend_class); + b->m_in_use = false; +} + + +void yf::SessionShared::BackendClass::remove_backend(BackendInstancePtr b) +{ + BackendInstanceList::iterator it = m_backend_list.begin(); + + while (it != m_backend_list.end()) + { + if (*it == b) + { + mp::odr odr; + (*it)->m_close_package->response() = odr.create_close( + 0, Z_Close_lackOfActivity, 0); + (*it)->m_close_package->session().close(); + (*it)->m_close_package->move(); + + it = m_backend_list.erase(it); + } + else + it++; + } +} + + + +yf::SessionShared::BackendInstancePtr +yf::SessionShared::BackendClass::get_backend( + const mp::Package &frontend_package) +{ + { + boost::mutex::scoped_lock lock(m_mutex_backend_class); + + BackendInstanceList::const_iterator it = m_backend_list.begin(); + + BackendInstancePtr backend1; // null + + for (; it != m_backend_list.end(); it++) + { + if (!(*it)->m_in_use) + { + if (!backend1 + || (*it)->m_sequence_this < backend1->m_sequence_this) + backend1 = *it; + } + } + if (backend1) + { + use_backend(backend1); + return backend1; + } + } + return create_backend(frontend_package); +} + +void yf::SessionShared::BackendClass::use_backend(BackendInstancePtr backend) +{ + backend->m_in_use = true; + time(&backend->m_time_last_use); + backend->m_sequence_this = m_sequence_top++; +} + +yf::SessionShared::BackendInstance::~BackendInstance() +{ + delete m_close_package; +} + +yf::SessionShared::BackendInstancePtr yf::SessionShared::BackendClass::create_backend( + const mp::Package &frontend_package) +{ + BackendInstancePtr bp(new BackendInstance); + BackendInstancePtr null; + + bp->m_close_package = + new mp::Package(bp->m_session, frontend_package.origin()); + bp->m_close_package->copy_filter(frontend_package); + + Package init_package(bp->m_session, frontend_package.origin()); + + init_package.copy_filter(frontend_package); + + yazpp_1::GDU actual_init_request = m_init_request; + Z_GDU *init_pdu = actual_init_request.get(); + + assert(init_pdu->which == Z_GDU_Z3950); + assert(init_pdu->u.z3950->which == Z_APDU_initRequest); + + Z_InitRequest *req = init_pdu->u.z3950->u.initRequest; + ODR_MASK_ZERO(req->options); + + ODR_MASK_SET(req->options, Z_Options_search); + ODR_MASK_SET(req->options, Z_Options_present); + ODR_MASK_SET(req->options, Z_Options_namedResultSets); + ODR_MASK_SET(req->options, Z_Options_scan); + + ODR_MASK_SET(req->protocolVersion, Z_ProtocolVersion_1); + ODR_MASK_SET(req->protocolVersion, Z_ProtocolVersion_2); + ODR_MASK_SET(req->protocolVersion, Z_ProtocolVersion_3); + + init_package.request() = init_pdu; + + init_package.move(); + + boost::mutex::scoped_lock lock(m_mutex_backend_class); + + m_named_result_sets = false; + Z_GDU *gdu = init_package.response().get(); + if (!init_package.session().is_closed() + && gdu && gdu->which == Z_GDU_Z3950 + && gdu->u.z3950->which == Z_APDU_initResponse) + { + Z_InitResponse *res = gdu->u.z3950->u.initResponse; + if (!*res->result) + return null; + m_init_response = gdu->u.z3950; + if (ODR_MASK_GET(res->options, Z_Options_namedResultSets)) + { + m_named_result_sets = true; + } + } + else + { + // did not receive an init response or closed + return null; + } + bp->m_in_use = true; + time(&bp->m_time_last_use); + bp->m_sequence_this = 0; + bp->m_result_set_sequence = 0; + m_backend_list.push_back(bp); + + return bp; +} + + +yf::SessionShared::BackendClass::BackendClass(const yazpp_1::GDU &init_request, + int resultset_ttl, + int resultset_max, + int session_ttl) + : m_named_result_sets(false), m_init_request(init_request), + m_sequence_top(0), m_backend_set_ttl(resultset_ttl), + m_backend_expiry_ttl(session_ttl), m_backend_set_max(resultset_max) +{} + +yf::SessionShared::BackendClass::~BackendClass() +{} + +void yf::SessionShared::Rep::init(mp::Package &package, const Z_GDU *gdu, + FrontendPtr frontend) { Z_InitRequest *req = gdu->u.z3950->u.initRequest; - std::list targets; + frontend->m_is_virtual = true; + frontend->m_init_options = *req->options; + InitKey k(req); + { + boost::mutex::scoped_lock lock(m_mutex_backend_map); + BackendClassMap::const_iterator it; + it = m_backend_map.find(k); + if (it == m_backend_map.end()) + { + BackendClassPtr b(new BackendClass(gdu->u.z3950, + m_resultset_ttl, + m_resultset_max, + m_session_ttl)); + m_backend_map[k] = b; + frontend->m_backend_class = b; + } + else + { + frontend->m_backend_class = it->second; + } + } + BackendClassPtr bc = frontend->m_backend_class; + BackendInstancePtr backend = bc->get_backend(package); + + mp::odr odr; + if (!backend) + { + Z_APDU *apdu = odr.create_initResponse(gdu->u.z3950, 0, 0); + *apdu->u.initResponse->result = 0; + package.response() = apdu; + package.session().close(); + } + else + { + boost::mutex::scoped_lock lock(bc->m_mutex_backend_class); + yazpp_1::GDU init_response = bc->m_init_response; + Z_GDU *response_gdu = init_response.get(); + mp::util::transfer_referenceId(odr, gdu->u.z3950, + response_gdu->u.z3950); + + Z_Options *server_options = + response_gdu->u.z3950->u.initResponse->options; + Z_Options *client_options = &frontend->m_init_options; + + int i; + for (i = 0; i<30; i++) + if (!ODR_MASK_GET(client_options, i)) + ODR_MASK_CLEAR(server_options, i); + package.response() = init_response; + } + if (backend) + bc->release_backend(backend); +} + +void yf::SessionShared::BackendSet::timestamp() +{ + time(&m_time_last_use); +} + +yf::SessionShared::BackendSet::BackendSet( + const std::string &result_set_id, + const Databases &databases, + const yazpp_1::Yaz_Z_Query &query) : + m_result_set_id(result_set_id), + m_databases(databases), m_result_set_size(0), m_query(query) +{ + timestamp(); +} + +static int get_diagnostic(Z_DefaultDiagFormat *r) +{ + return *r->condition; +} + +bool yf::SessionShared::BackendSet::search( + mp::Package &frontend_package, + const Z_APDU *frontend_apdu, + const BackendInstancePtr bp, + Z_Records **z_records) +{ + Package search_package(bp->m_session, frontend_package.origin()); + + search_package.copy_filter(frontend_package); + + mp::odr odr; + Z_APDU *apdu_req = zget_APDU(odr, Z_APDU_searchRequest); + Z_SearchRequest *req = apdu_req->u.searchRequest; - mp::util::get_vhost_otherinfo(&req->otherInfo, false, targets); + req->resultSetName = odr_strdup(odr, m_result_set_id.c_str()); + req->query = m_query.get_Z_Query(); - if (targets.size() < 1) + req->num_databaseNames = m_databases.size(); + req->databaseNames = (char**) + odr_malloc(odr, req->num_databaseNames * sizeof(char *)); + Databases::const_iterator it = m_databases.begin(); + size_t i = 0; + for (; it != m_databases.end(); it++) + req->databaseNames[i++] = odr_strdup(odr, it->c_str()); + + search_package.request() = apdu_req; + + search_package.move(); + + Z_GDU *gdu = search_package.response().get(); + if (!search_package.session().is_closed() + && gdu && gdu->which == Z_GDU_Z3950 + && gdu->u.z3950->which == Z_APDU_searchResponse) { - package.move(); + Z_SearchResponse *b_resp = gdu->u.z3950->u.searchResponse; + *z_records = b_resp->records; + m_result_set_size = *b_resp->resultCount; + return true; + } + Z_APDU *f_apdu = 0; + if (frontend_apdu->which == Z_APDU_searchRequest) + f_apdu = odr.create_searchResponse( + frontend_apdu, YAZ_BIB1_TEMPORARY_SYSTEM_ERROR, 0); + else if (frontend_apdu->which == Z_APDU_presentRequest) + f_apdu = odr.create_presentResponse( + frontend_apdu, YAZ_BIB1_TEMPORARY_SYSTEM_ERROR, 0); + else + f_apdu = odr.create_close( + frontend_apdu, YAZ_BIB1_TEMPORARY_SYSTEM_ERROR, 0); + frontend_package.response() = f_apdu; + return false; +} + +void yf::SessionShared::Frontend::override_set( + BackendInstancePtr &found_backend, + std::string &result_set_id) +{ + BackendClassPtr bc = m_backend_class; + BackendInstanceList::const_iterator it = bc->m_backend_list.begin(); + time_t now; + time(&now); + + for (; it != bc->m_backend_list.end(); it++) + { + if (!(*it)->m_in_use) + { + BackendSetList::iterator set_it = (*it)->m_sets.begin(); + for (; set_it != (*it)->m_sets.end(); set_it++) + { + if (now >= (*set_it)->m_time_last_use && + now - (*set_it)->m_time_last_use > bc->m_backend_set_ttl) + { + found_backend = *it; + result_set_id = (*set_it)->m_result_set_id; + found_backend->m_sets.erase(set_it); + return; + } + } + } + } + size_t max_sets = bc->m_named_result_sets ? bc->m_backend_set_max : 1; + for (it = bc->m_backend_list.begin(); it != bc->m_backend_list.end(); it++) + { + if (!(*it)->m_in_use && (*it)->m_sets.size() < max_sets) + { + found_backend = *it; + if (bc->m_named_result_sets) + { + result_set_id = boost::io::str( + boost::format("%1%") % + found_backend->m_result_set_sequence); + found_backend->m_result_set_sequence++; + } + else + result_set_id = "default"; + return; + } + } +} + +void yf::SessionShared::Frontend::get_set(mp::Package &package, + const Z_APDU *apdu_req, + const Databases &databases, + yazpp_1::Yaz_Z_Query &query, + BackendInstancePtr &found_backend, + BackendSetPtr &found_set) +{ + bool session_restarted = false; + +restart: + std::string result_set_id; + BackendClassPtr bc = m_backend_class; + { + boost::mutex::scoped_lock lock(bc->m_mutex_backend_class); + + // look at each backend and see if we have a similar search + BackendInstanceList::const_iterator it = bc->m_backend_list.begin(); + + for (; it != bc->m_backend_list.end(); it++) + { + if (!(*it)->m_in_use) + { + BackendSetList::const_iterator set_it = (*it)->m_sets.begin(); + for (; set_it != (*it)->m_sets.end(); set_it++) + { + if ((*set_it)->m_databases == databases + && query.match(&(*set_it)->m_query)) + { + found_set = *set_it; + found_backend = *it; + bc->use_backend(found_backend); + found_set->timestamp(); + // found matching set. No need to search again + return; + } + } + } + } + override_set(found_backend, result_set_id); + if (found_backend) + bc->use_backend(found_backend); + } + if (!found_backend) + { + // create a new backend set (and new set) + found_backend = bc->create_backend(package); + + if (!found_backend) + { + Z_APDU *f_apdu = 0; + mp::odr odr; + if (apdu_req->which == Z_APDU_searchRequest) + { + f_apdu = odr.create_searchResponse( + apdu_req, YAZ_BIB1_TEMPORARY_SYSTEM_ERROR, 0); + } + else if (apdu_req->which == Z_APDU_presentRequest) + { + f_apdu = odr.create_presentResponse( + apdu_req, YAZ_BIB1_TEMPORARY_SYSTEM_ERROR, 0); + } + else + { + f_apdu = odr.create_close( + apdu_req, YAZ_BIB1_TEMPORARY_SYSTEM_ERROR, 0); + } + package.response() = f_apdu; + return; + } + if (bc->m_named_result_sets) + { + result_set_id = boost::io::str( + boost::format("%1%") % found_backend->m_result_set_sequence); + found_backend->m_result_set_sequence++; + } + else + result_set_id = "default"; + } + // we must search ... + BackendSetPtr new_set(new BackendSet(result_set_id, + databases, query)); + Z_Records *z_records = 0; + if (!new_set->search(package, apdu_req, found_backend, &z_records)) + { + bc->remove_backend(found_backend); + return; // search error + } + + if (z_records) + { + int condition = 0; + if (z_records->which == Z_Records_NSD) + { + condition = + get_diagnostic(z_records->u.nonSurrogateDiagnostic); + } + else if (z_records->which == Z_Records_multipleNSD) + { + if (z_records->u.multipleNonSurDiagnostics->num_diagRecs >= 1 + && + + z_records->u.multipleNonSurDiagnostics->diagRecs[0]->which == + Z_DiagRec_defaultFormat) + { + condition = get_diagnostic( + z_records->u.multipleNonSurDiagnostics->diagRecs[0]->u.defaultFormat); + + } + } + if (!session_restarted && + condition == YAZ_BIB1_TEMPORARY_SYSTEM_ERROR) + { + bc->remove_backend(found_backend); + session_restarted = true; + found_backend.reset(); + goto restart; + + } + + if (condition) + { + mp::odr odr; + if (apdu_req->which == Z_APDU_searchRequest) + { + Z_APDU *f_apdu = odr.create_searchResponse(apdu_req, + 0, 0); + Z_SearchResponse *f_resp = f_apdu->u.searchResponse; + *f_resp->searchStatus = Z_SearchResponse_none; + f_resp->records = z_records; + package.response() = f_apdu; + } + if (apdu_req->which == Z_APDU_presentRequest) + { + Z_APDU *f_apdu = odr.create_presentResponse(apdu_req, + 0, 0); + Z_PresentResponse *f_resp = f_apdu->u.presentResponse; + f_resp->records = z_records; + package.response() = f_apdu; + } + bc->release_backend(found_backend); + return; // search error + } + } + if (!session_restarted && new_set->m_result_set_size < 0) + { + bc->remove_backend(found_backend); + session_restarted = true; + found_backend.reset(); + goto restart; + } + + found_set = new_set; + found_set->timestamp(); + found_backend->m_sets.push_back(found_set); +} + +void yf::SessionShared::Frontend::search(mp::Package &package, + Z_APDU *apdu_req) +{ + Z_SearchRequest *req = apdu_req->u.searchRequest; + FrontendSets::iterator fset_it = + m_frontend_sets.find(req->resultSetName); + if (fset_it != m_frontend_sets.end()) + { + // result set already exist + // if replace indicator is off: we return diagnostic if + // result set already exist. + if (*req->replaceIndicator == 0) + { + mp::odr odr; + Z_APDU *apdu = + odr.create_searchResponse( + apdu_req, + YAZ_BIB1_RESULT_SET_EXISTS_AND_REPLACE_INDICATOR_OFF, + 0); + package.response() = apdu; + + return; + } + m_frontend_sets.erase(fset_it); + } + + yazpp_1::Yaz_Z_Query query; + query.set_Z_Query(req->query); + Databases databases; + int i; + for (i = 0; inum_databaseNames; i++) + databases.push_back(req->databaseNames[i]); + + BackendSetPtr found_set; // null + BackendInstancePtr found_backend; // null + + get_set(package, apdu_req, databases, query, found_backend, found_set); + if (!found_set) + return; + + mp::odr odr; + Z_APDU *f_apdu = odr.create_searchResponse(apdu_req, 0, 0); + Z_SearchResponse *f_resp = f_apdu->u.searchResponse; + *f_resp->resultCount = found_set->m_result_set_size; + package.response() = f_apdu; + + FrontendSetPtr fset(new FrontendSet(databases, query)); + m_frontend_sets[req->resultSetName] = fset; + + m_backend_class->release_backend(found_backend); +} + +void yf::SessionShared::Frontend::present(mp::Package &package, + Z_APDU *apdu_req) +{ + mp::odr odr; + Z_PresentRequest *req = apdu_req->u.presentRequest; + + FrontendSets::iterator fset_it = + m_frontend_sets.find(req->resultSetId); + + if (fset_it == m_frontend_sets.end()) + { + Z_APDU *apdu = + odr.create_presentResponse( + apdu_req, + YAZ_BIB1_SPECIFIED_RESULT_SET_DOES_NOT_EXIST, + req->resultSetId); + package.response() = apdu; + return; + } + FrontendSetPtr fset = fset_it->second; + + Databases databases = fset->get_databases(); + yazpp_1::Yaz_Z_Query query = fset->get_query(); + + BackendClassPtr bc = m_backend_class; + BackendSetPtr found_set; // null + BackendInstancePtr found_backend; + + get_set(package, apdu_req, databases, query, found_backend, found_set); + if (!found_set) + return; + + Z_NamePlusRecordList *npr_res = 0; + if (found_set->m_record_cache.lookup(odr, &npr_res, + *req->resultSetStartPoint, + *req->numberOfRecordsRequested, + req->preferredRecordSyntax, + req->recordComposition)) + { + Z_APDU *f_apdu_res = odr.create_presentResponse(apdu_req, 0, 0); + Z_PresentResponse *f_resp = f_apdu_res->u.presentResponse; + + yaz_log(YLOG_LOG, "Found %d+%d records in cache %p", + *req->resultSetStartPoint, + *req->numberOfRecordsRequested, + &found_set->m_record_cache); + + *f_resp->numberOfRecordsReturned = *req->numberOfRecordsRequested; + *f_resp->nextResultSetPosition = + *req->resultSetStartPoint + *req->numberOfRecordsRequested; + // f_resp->presentStatus assumed OK. + f_resp->records = (Z_Records *) odr_malloc(odr, sizeof(Z_Records)); + f_resp->records->which = Z_Records_DBOSD; + f_resp->records->u.databaseOrSurDiagnostics = npr_res; + package.response() = f_apdu_res; + bc->release_backend(found_backend); return; } + + Z_APDU *p_apdu = zget_APDU(odr, Z_APDU_presentRequest); + Z_PresentRequest *p_req = p_apdu->u.presentRequest; + p_req->preferredRecordSyntax = req->preferredRecordSyntax; + p_req->resultSetId = odr_strdup(odr, found_set->m_result_set_id.c_str()); + *p_req->resultSetStartPoint = *req->resultSetStartPoint; + *p_req->numberOfRecordsRequested = *req->numberOfRecordsRequested; + p_req->preferredRecordSyntax = req->preferredRecordSyntax; + p_req->recordComposition = req->recordComposition; + + Package present_package(found_backend->m_session, package.origin()); + present_package.copy_filter(package); + + present_package.request() = p_apdu; + + present_package.move(); + + Z_GDU *gdu = present_package.response().get(); + if (!present_package.session().is_closed() + && gdu && gdu->which == Z_GDU_Z3950 + && gdu->u.z3950->which == Z_APDU_presentResponse) + { + Z_PresentResponse *b_resp = gdu->u.z3950->u.presentResponse; + Z_APDU *f_apdu_res = odr.create_presentResponse(apdu_req, 0, 0); + Z_PresentResponse *f_resp = f_apdu_res->u.presentResponse; + + f_resp->numberOfRecordsReturned = b_resp->numberOfRecordsReturned; + f_resp->nextResultSetPosition = b_resp->nextResultSetPosition; + f_resp->presentStatus= b_resp->presentStatus; + f_resp->records = b_resp->records; + f_resp->otherInfo = b_resp->otherInfo; + package.response() = f_apdu_res; + if (b_resp->records && b_resp->records->which == Z_Records_DBOSD) + { + yaz_log(YLOG_LOG, "Adding %d+%d records to cache %p", + *req->resultSetStartPoint, + *f_resp->numberOfRecordsReturned, + &found_set->m_record_cache); + found_set->m_record_cache.add( + odr, + b_resp->records->u.databaseOrSurDiagnostics, + *req->resultSetStartPoint, + *f_resp->numberOfRecordsReturned); + } + bc->release_backend(found_backend); + } + else + { + bc->remove_backend(found_backend); + Z_APDU *f_apdu_res = + odr.create_presentResponse( + apdu_req, YAZ_BIB1_TEMPORARY_SYSTEM_ERROR, 0); + package.response() = f_apdu_res; + } } -yf::SessionShared::SessionShared() : m_p(new SessionShared::Rep) +void yf::SessionShared::Frontend::scan(mp::Package &frontend_package, + Z_APDU *apdu_req) { + BackendClassPtr bc = m_backend_class; + BackendInstancePtr backend = bc->get_backend(frontend_package); + if (!backend) + { + mp::odr odr; + Z_APDU *apdu = odr.create_scanResponse( + apdu_req, YAZ_BIB1_TEMPORARY_SYSTEM_ERROR, 0); + frontend_package.response() = apdu; + } + else + { + Package scan_package(backend->m_session, frontend_package.origin()); + scan_package.copy_filter(frontend_package); + scan_package.request() = apdu_req; + scan_package.move(); + frontend_package.response() = scan_package.response(); + if (scan_package.session().is_closed()) + { + frontend_package.session().close(); + bc->remove_backend(backend); + } + else + bc->release_backend(backend); + } } -yf::SessionShared::~SessionShared() { +yf::SessionShared::Worker::Worker(SessionShared::Rep *rep) : m_p(rep) +{ } +void yf::SessionShared::Worker::operator() (void) +{ + m_p->expire(); +} -yf::SessionShared::Frontend::Frontend(Rep *rep) : m_is_virtual(false), m_p(rep) +void yf::SessionShared::BackendClass::expire_class() { + time_t now; + time(&now); + boost::mutex::scoped_lock lock(m_mutex_backend_class); + BackendInstanceList::iterator bit = m_backend_list.begin(); + while (bit != m_backend_list.end()) + { + time_t last_use = (*bit)->m_time_last_use; + + if ((*bit)->m_in_use) + { + bit++; + } + else if ((now >= last_use && now - last_use > m_backend_expiry_ttl) + || (now < last_use)) + { + mp::odr odr; + (*bit)->m_close_package->response() = odr.create_close( + 0, Z_Close_lackOfActivity, 0); + (*bit)->m_close_package->session().close(); + (*bit)->m_close_package->move(); + + bit = m_backend_list.erase(bit); + } + else + { + bit++; + } + } } -void yf::SessionShared::Frontend::close(mp::Package &package) +void yf::SessionShared::Rep::expire() { -#if 0 - std::list::const_iterator b_it; - - for (b_it = m_backend_list.begin(); b_it != m_backend_list.end(); b_it++) + while (true) { - (*b_it)->m_backend_session.close(); - Package close_package((*b_it)->m_backend_session, package.origin()); - close_package.copy_filter(package); - close_package.move((*b_it)->m_route); + boost::xtime xt; + boost::xtime_get(&xt, boost::TIME_UTC); + xt.sec += 30; + boost::thread::sleep(xt); + + BackendClassMap::const_iterator b_it = m_backend_map.begin(); + for (; b_it != m_backend_map.end(); b_it++) + b_it->second->expire_class(); } - m_backend_list.clear(); -#endif } +yf::SessionShared::Rep::Rep() +{ + m_resultset_ttl = 30; + m_resultset_max = 10; + m_session_ttl = 90; + yf::SessionShared::Worker w(this); + m_thrds.add_thread(new boost::thread(w)); +} + +yf::SessionShared::SessionShared() : m_p(new SessionShared::Rep) +{ +} + +yf::SessionShared::~SessionShared() { +} + + +yf::SessionShared::Frontend::Frontend(Rep *rep) : m_is_virtual(false), m_p(rep) +{ +} yf::SessionShared::Frontend::~Frontend() { @@ -213,7 +1064,6 @@ void yf::SessionShared::Rep::release_frontend(mp::Package &package) { if (package.session().is_closed()) { - it->second->close(package); m_clients.erase(it); } else @@ -234,7 +1084,7 @@ void yf::SessionShared::process(mp::Package &package) const if (gdu && gdu->which == Z_GDU_Z3950 && gdu->u.z3950->which == Z_APDU_initRequest && !f->m_is_virtual) { - f->init(package, gdu); + m_p->init(package, gdu, f); } else if (!f->m_is_virtual) package.move(); @@ -252,6 +1102,27 @@ void yf::SessionShared::process(mp::Package &package) const package.session().close(); } + else if (apdu->which == Z_APDU_close) + { + mp::odr odr; + + package.response() = odr.create_close( + apdu, + Z_Close_peerAbort, "received close from client"); + package.session().close(); + } + else if (apdu->which == Z_APDU_searchRequest) + { + f->search(package, apdu); + } + else if (apdu->which == Z_APDU_presentRequest) + { + f->present(package, apdu); + } + else if (apdu->which == Z_APDU_scanRequest) + { + f->scan(package, apdu); + } else { mp::odr odr; @@ -266,6 +1137,54 @@ void yf::SessionShared::process(mp::Package &package) const m_p->release_frontend(package); } +void yf::SessionShared::configure(const xmlNode *ptr, bool test_only) +{ + for (ptr = ptr->children; ptr; ptr = ptr->next) + { + if (ptr->type != XML_ELEMENT_NODE) + continue; + if (!strcmp((const char *) ptr->name, "resultset")) + { + const struct _xmlAttr *attr; + for (attr = ptr->properties; attr; attr = attr->next) + { + if (!strcmp((const char *) attr->name, "ttl")) + m_p->m_resultset_ttl = + mp::xml::get_int(attr->children, 30); + else if (!strcmp((const char *) attr->name, "max")) + { + m_p->m_resultset_max = + mp::xml::get_int(attr->children, 10); + } + else + throw mp::filter::FilterException( + "Bad attribute " + std::string((const char *) + attr->name)); + } + } + else if (!strcmp((const char *) ptr->name, "session")) + { + const struct _xmlAttr *attr; + for (attr = ptr->properties; attr; attr = attr->next) + { + if (!strcmp((const char *) attr->name, "ttl")) + m_p->m_session_ttl = + mp::xml::get_int(attr->children, 120); + else + throw mp::filter::FilterException( + "Bad attribute " + std::string((const char *) + attr->name)); + } + } + else + { + throw mp::filter::FilterException("Bad element " + + std::string((const char *) + ptr->name)); + } + } +} + static mp::filter::Base* filter_creator() { return new mp::filter::SessionShared; @@ -282,8 +1201,9 @@ extern "C" { /* * Local variables: * c-basic-offset: 4 + * c-file-style: "Stroustrup" * indent-tabs-mode: nil - * c-file-style: "stroustrup" * End: * vim: shiftwidth=4 tabstop=8 expandtab */ +