X-Git-Url: http://git.indexdata.com/?a=blobdiff_plain;f=src%2Ffilter_session_shared.cpp;h=4d4606ab9546a3938f8210f304395a6ad17852b5;hb=9041eedc322ed6160521322b4ccf5b3dc0c24f2d;hp=8f7c44dc0bfdb7b8e1f2b98927c89a4350bf761f;hpb=516b4a41888fccdb4dca00f625adf6dd00350f5d;p=metaproxy-moved-to-github.git diff --git a/src/filter_session_shared.cpp b/src/filter_session_shared.cpp index 8f7c44d..4d4606a 100644 --- a/src/filter_session_shared.cpp +++ b/src/filter_session_shared.cpp @@ -1,5 +1,5 @@ /* This file is part of Metaproxy. - Copyright (C) 2005-2008 Index Data + Copyright (C) 2005-2011 Index Data Metaproxy is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free @@ -18,8 +18,8 @@ Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA #include "config.hpp" -#include "filter.hpp" -#include "package.hpp" +#include +#include #include #include @@ -28,7 +28,7 @@ Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA #include #include -#include "util.hpp" +#include #include "filter_session_shared.hpp" #include @@ -36,6 +36,7 @@ Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA #include #include #include +#include #include #include #include @@ -77,15 +78,17 @@ namespace metaproxy_1 { yazpp_1::Yaz_Z_Query m_query; time_t m_time_last_use; void timestamp(); + yazpp_1::RecordCache m_record_cache; BackendSet( const std::string &result_set_id, const Databases &databases, const yazpp_1::Yaz_Z_Query &query); bool search( Package &frontend_package, + Package &search_package, const Z_APDU *apdu_req, const BackendInstancePtr bp, - bool &fatal_error); + Z_Records **z_records); }; // backend connection instance class SessionShared::BackendInstance { @@ -101,6 +104,7 @@ namespace metaproxy_1 { time_t m_time_last_use; mp::Package * m_close_package; ~BackendInstance(); + void timestamp(); }; // backends of some class (all with same InitKey) class SessionShared::BackendClass : boost::noncopyable { @@ -177,6 +181,7 @@ namespace metaproxy_1 { private: void init(Package &package, const Z_GDU *gdu, FrontendPtr frontend); + void start(); boost::mutex m_mutex; boost::condition m_cond_session_ready; std::map m_clients; @@ -187,6 +192,7 @@ namespace metaproxy_1 { int m_resultset_ttl; int m_resultset_max; int m_session_ttl; + bool m_optimize_search; }; } } @@ -326,10 +332,15 @@ yf::SessionShared::BackendClass::get_backend( void yf::SessionShared::BackendClass::use_backend(BackendInstancePtr backend) { backend->m_in_use = true; - time(&backend->m_time_last_use); backend->m_sequence_this = m_sequence_top++; } +void yf::SessionShared::BackendInstance::timestamp() +{ + assert(m_in_use); + time(&m_time_last_use); +} + yf::SessionShared::BackendInstance::~BackendInstance() { delete m_close_package; @@ -375,13 +386,17 @@ yf::SessionShared::BackendInstancePtr yf::SessionShared::BackendClass::create_ba m_named_result_sets = false; Z_GDU *gdu = init_package.response().get(); - if (!init_package.session().is_closed() - && gdu && gdu->which == Z_GDU_Z3950 - && gdu->u.z3950->which == Z_APDU_initResponse) + if (init_package.session().is_closed()) { + /* already closed. We don't know why */ + return null; + } + else if (gdu && gdu->which == Z_GDU_Z3950 + && gdu->u.z3950->which == Z_APDU_initResponse + && *gdu->u.z3950->u.initResponse->result) + { + /* successful init response */ Z_InitResponse *res = gdu->u.z3950->u.initResponse; - if (!*res->result) - return null; m_init_response = gdu->u.z3950; if (ODR_MASK_GET(res->options, Z_Options_namedResultSets)) { @@ -390,7 +405,10 @@ yf::SessionShared::BackendInstancePtr yf::SessionShared::BackendClass::create_ba } else { - // did not receive an init response or closed + /* not init or init rejected */ + init_package.copy_filter(frontend_package); + init_package.session().close(); + init_package.move(); return null; } bp->m_in_use = true; @@ -489,16 +507,18 @@ yf::SessionShared::BackendSet::BackendSet( timestamp(); } +static int get_diagnostic(Z_DefaultDiagFormat *r) +{ + return *r->condition; +} + bool yf::SessionShared::BackendSet::search( mp::Package &frontend_package, + mp::Package &search_package, const Z_APDU *frontend_apdu, const BackendInstancePtr bp, - bool & fatal_error) + Z_Records **z_records) { - Package search_package(bp->m_session, frontend_package.origin()); - - search_package.copy_filter(frontend_package); - mp::odr odr; Z_APDU *apdu_req = zget_APDU(odr, Z_APDU_searchRequest); Z_SearchRequest *req = apdu_req->u.searchRequest; @@ -514,49 +534,21 @@ bool yf::SessionShared::BackendSet::search( for (; it != m_databases.end(); it++) req->databaseNames[i++] = odr_strdup(odr, it->c_str()); + if (frontend_apdu->which == Z_APDU_searchRequest) + req->preferredRecordSyntax = + frontend_apdu->u.searchRequest->preferredRecordSyntax; + search_package.request() = apdu_req; search_package.move(); - fatal_error = false; // assume backend session is good - Z_Records *z_records_diag = 0; Z_GDU *gdu = search_package.response().get(); if (!search_package.session().is_closed() && gdu && gdu->which == Z_GDU_Z3950 && gdu->u.z3950->which == Z_APDU_searchResponse) { Z_SearchResponse *b_resp = gdu->u.z3950->u.searchResponse; - if (b_resp->records) - { - if (b_resp->records->which == Z_Records_NSD - || b_resp->records->which == Z_Records_multipleNSD) - z_records_diag = b_resp->records; - } - if (z_records_diag) - { - // there could be diagnostics that are so bad.. that - // we simply mark the error as fatal.. For now we assume - // we can resume - if (frontend_apdu->which == Z_APDU_searchRequest) - { - Z_APDU *f_apdu = odr.create_searchResponse(frontend_apdu, - 0, 0); - Z_SearchResponse *f_resp = f_apdu->u.searchResponse; - *f_resp->searchStatus = *b_resp->searchStatus; - f_resp->records = z_records_diag; - frontend_package.response() = f_apdu; - return false; - } - if (frontend_apdu->which == Z_APDU_presentRequest) - { - Z_APDU *f_apdu = odr.create_presentResponse(frontend_apdu, - 0, 0); - Z_PresentResponse *f_resp = f_apdu->u.presentResponse; - f_resp->records = z_records_diag; - frontend_package.response() = f_apdu; - return false; - } - } + *z_records = b_resp->records; m_result_set_size = *b_resp->resultCount; return true; } @@ -571,7 +563,6 @@ bool yf::SessionShared::BackendSet::search( f_apdu = odr.create_close( frontend_apdu, YAZ_BIB1_TEMPORARY_SYSTEM_ERROR, 0); frontend_package.response() = f_apdu; - fatal_error = true; // weired response.. bad backend return false; } @@ -629,30 +620,35 @@ void yf::SessionShared::Frontend::get_set(mp::Package &package, BackendInstancePtr &found_backend, BackendSetPtr &found_set) { + bool session_restarted = false; + +restart: std::string result_set_id; BackendClassPtr bc = m_backend_class; { boost::mutex::scoped_lock lock(bc->m_mutex_backend_class); - - // look at each backend and see if we have a similar search - BackendInstanceList::const_iterator it = bc->m_backend_list.begin(); - for (; it != bc->m_backend_list.end(); it++) + if (m_p->m_optimize_search) { - if (!(*it)->m_in_use) + // look at each backend and see if we have a similar search + BackendInstanceList::const_iterator it = bc->m_backend_list.begin(); + for (; it != bc->m_backend_list.end(); it++) { - BackendSetList::const_iterator set_it = (*it)->m_sets.begin(); - for (; set_it != (*it)->m_sets.end(); set_it++) + if (!(*it)->m_in_use) { - if ((*set_it)->m_databases == databases - && query.match(&(*set_it)->m_query)) + BackendSetList::const_iterator set_it = (*it)->m_sets.begin(); + for (; set_it != (*it)->m_sets.end(); set_it++) { - found_set = *set_it; - found_backend = *it; - bc->use_backend(found_backend); - found_set->timestamp(); - // found matching set. No need to search again - return; + if ((*set_it)->m_databases == databases + && query.match(&(*set_it)->m_query)) + { + found_set = *set_it; + found_backend = *it; + bc->use_backend(found_backend); + found_set->timestamp(); + // found matching set. No need to search again + return; + } } } } @@ -697,18 +693,86 @@ void yf::SessionShared::Frontend::get_set(mp::Package &package, else result_set_id = "default"; } + found_backend->timestamp(); + // we must search ... BackendSetPtr new_set(new BackendSet(result_set_id, databases, query)); - bool fatal_error = false; - if (!new_set->search(package, apdu_req, found_backend, fatal_error)) + Z_Records *z_records = 0; + + Package search_package(found_backend->m_session, package.origin()); + search_package.copy_filter(package); + + if (!new_set->search(package, search_package, + apdu_req, found_backend, &z_records)) { - if (fatal_error) + bc->remove_backend(found_backend); + return; // search error + } + + if (z_records) + { + int condition = 0; + if (z_records->which == Z_Records_NSD) + { + condition = + get_diagnostic(z_records->u.nonSurrogateDiagnostic); + } + else if (z_records->which == Z_Records_multipleNSD) + { + if (z_records->u.multipleNonSurDiagnostics->num_diagRecs >= 1 + && + + z_records->u.multipleNonSurDiagnostics->diagRecs[0]->which == + Z_DiagRec_defaultFormat) + { + condition = get_diagnostic( + z_records->u.multipleNonSurDiagnostics->diagRecs[0]->u.defaultFormat); + + } + } + if (!session_restarted && + condition == YAZ_BIB1_TEMPORARY_SYSTEM_ERROR) + { bc->remove_backend(found_backend); - else + session_restarted = true; + found_backend.reset(); + goto restart; + + } + + if (condition) + { + mp::odr odr; + if (apdu_req->which == Z_APDU_searchRequest) + { + Z_APDU *f_apdu = odr.create_searchResponse(apdu_req, + 0, 0); + Z_SearchResponse *f_resp = f_apdu->u.searchResponse; + *f_resp->searchStatus = Z_SearchResponse_none; + f_resp->records = z_records; + package.response() = f_apdu; + } + if (apdu_req->which == Z_APDU_presentRequest) + { + Z_APDU *f_apdu = odr.create_presentResponse(apdu_req, + 0, 0); + Z_PresentResponse *f_resp = f_apdu->u.presentResponse; + f_resp->records = z_records; + package.response() = f_apdu; + } bc->release_backend(found_backend); - return; // search error + return; // search error + } + } + if (!session_restarted && new_set->m_result_set_size < 0) + { + bc->remove_backend(found_backend); + session_restarted = true; + found_backend.reset(); + goto restart; } + found_set = new_set; found_set->timestamp(); found_backend->m_sets.push_back(found_set); @@ -798,6 +862,36 @@ void yf::SessionShared::Frontend::present(mp::Package &package, if (!found_set) return; + Z_NamePlusRecordList *npr_res = 0; + if (found_set->m_record_cache.lookup(odr, &npr_res, + *req->resultSetStartPoint, + *req->numberOfRecordsRequested, + req->preferredRecordSyntax, + req->recordComposition)) + { + Z_APDU *f_apdu_res = odr.create_presentResponse(apdu_req, 0, 0); + Z_PresentResponse *f_resp = f_apdu_res->u.presentResponse; + + yaz_log(YLOG_LOG, "Found " ODR_INT_PRINTF "+" ODR_INT_PRINTF + " records in cache %p", + *req->resultSetStartPoint, + *req->numberOfRecordsRequested, + &found_set->m_record_cache); + + *f_resp->numberOfRecordsReturned = *req->numberOfRecordsRequested; + *f_resp->nextResultSetPosition = + *req->resultSetStartPoint + *req->numberOfRecordsRequested; + // f_resp->presentStatus assumed OK. + f_resp->records = (Z_Records *) odr_malloc(odr, sizeof(Z_Records)); + f_resp->records->which = Z_Records_DBOSD; + f_resp->records->u.databaseOrSurDiagnostics = npr_res; + package.response() = f_apdu_res; + bc->release_backend(found_backend); + return; + } + + found_backend->timestamp(); + Z_APDU *p_apdu = zget_APDU(odr, Z_APDU_presentRequest); Z_PresentRequest *p_req = p_apdu->u.presentRequest; p_req->preferredRecordSyntax = req->preferredRecordSyntax; @@ -829,6 +923,20 @@ void yf::SessionShared::Frontend::present(mp::Package &package, f_resp->records = b_resp->records; f_resp->otherInfo = b_resp->otherInfo; package.response() = f_apdu_res; + + if (b_resp->records && b_resp->records->which == Z_Records_DBOSD) + { + yaz_log(YLOG_LOG, "Adding " ODR_INT_PRINTF "+" ODR_INT_PRINTF + " records to cache %p", + *req->resultSetStartPoint, + *f_resp->numberOfRecordsReturned, + &found_set->m_record_cache); + found_set->m_record_cache.add( + odr, + b_resp->records->u.databaseOrSurDiagnostics, + *req->resultSetStartPoint, + *f_resp->numberOfRecordsReturned); + } bc->release_backend(found_backend); } else @@ -856,6 +964,7 @@ void yf::SessionShared::Frontend::scan(mp::Package &frontend_package, else { Package scan_package(backend->m_session, frontend_package.origin()); + backend->timestamp(); scan_package.copy_filter(frontend_package); scan_package.request() = apdu_req; scan_package.move(); @@ -917,7 +1026,7 @@ void yf::SessionShared::Rep::expire() { boost::xtime xt; boost::xtime_get(&xt, boost::TIME_UTC); - xt.sec += 30; + xt.sec += m_session_ttl / 3; boost::thread::sleep(xt); BackendClassMap::const_iterator b_it = m_backend_map.begin(); @@ -931,6 +1040,11 @@ yf::SessionShared::Rep::Rep() m_resultset_ttl = 30; m_resultset_max = 10; m_session_ttl = 90; + m_optimize_search = true; +} + +void yf::SessionShared::Rep::start() +{ yf::SessionShared::Worker w(this); m_thrds.add_thread(new boost::thread(w)); } @@ -942,6 +1056,10 @@ yf::SessionShared::SessionShared() : m_p(new SessionShared::Rep) yf::SessionShared::~SessionShared() { } +void yf::SessionShared::start() const +{ + m_p->start(); +} yf::SessionShared::Frontend::Frontend(Rep *rep) : m_is_virtual(false), m_p(rep) { @@ -1059,7 +1177,8 @@ void yf::SessionShared::process(mp::Package &package) const m_p->release_frontend(package); } -void yf::SessionShared::configure(const xmlNode *ptr, bool test_only) +void yf::SessionShared::configure(const xmlNode *ptr, bool test_only, + const char *path) { for (ptr = ptr->children; ptr; ptr = ptr->next) { @@ -1078,6 +1197,11 @@ void yf::SessionShared::configure(const xmlNode *ptr, bool test_only) m_p->m_resultset_max = mp::xml::get_int(attr->children, 10); } + else if (!strcmp((const char *) attr->name, "optimizesearch")) + { + m_p->m_optimize_search = + mp::xml::get_bool(attr->children, true); + } else throw mp::filter::FilterException( "Bad attribute " + std::string((const char *) @@ -1123,8 +1247,9 @@ extern "C" { /* * Local variables: * c-basic-offset: 4 + * c-file-style: "Stroustrup" * indent-tabs-mode: nil - * c-file-style: "stroustrup" * End: * vim: shiftwidth=4 tabstop=8 expandtab */ +