X-Git-Url: http://git.indexdata.com/?a=blobdiff_plain;f=src%2Ffilter_session_shared.cpp;h=dbfa491dfa5b62391a74c5c3b275ce6c1281c9c3;hb=953e615164c4c47ee871ef7901e90d6fcffd8473;hp=e3833e15c40e8abc7f31d36bc8ae8f64027c16ce;hpb=7dafcdbbc006a374063e26984a2cff8576104f81;p=metaproxy-moved-to-github.git diff --git a/src/filter_session_shared.cpp b/src/filter_session_shared.cpp index e3833e1..dbfa491 100644 --- a/src/filter_session_shared.cpp +++ b/src/filter_session_shared.cpp @@ -1,5 +1,5 @@ /* This file is part of Metaproxy. - Copyright (C) 2005-2013 Index Data + Copyright (C) Index Data Metaproxy is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free @@ -41,6 +41,7 @@ Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA #include #include #include +#include namespace mp = metaproxy_1; namespace yf = metaproxy_1::filter; @@ -80,10 +81,16 @@ namespace metaproxy_1 { time_t m_time_last_use; void timestamp(); yazpp_1::RecordCache m_record_cache; + + Z_OtherInformation *additionalSearchInfoRequest; + Z_OtherInformation *additionalSearchInfoResponse; + NMEM mem_additionalSearchInfo; BackendSet( const std::string &result_set_id, const Databases &databases, - const yazpp_1::Yaz_Z_Query &query); + const yazpp_1::Yaz_Z_Query &query, + Z_OtherInformation *additionalSearchInfoRequest); + ~BackendSet(); bool search( Package &frontend_package, Package &search_package, @@ -118,16 +125,20 @@ namespace metaproxy_1 { BackendInstancePtr get_backend(const Package &package); void use_backend(BackendInstancePtr b); void release_backend(BackendInstancePtr b); - void expire_class(); + bool expire_instances(); yazpp_1::GDU m_init_request; yazpp_1::GDU m_init_response; boost::mutex m_mutex_backend_class; + boost::condition m_cond_set_ready; int m_sequence_top; time_t m_backend_set_ttl; time_t m_backend_expiry_ttl; size_t m_backend_set_max; Odr_int m_preferredMessageSize; Odr_int m_maximumRecordSize; + int m_no_failed; + int m_no_succeeded; + int m_no_init; public: BackendClass(const yazpp_1::GDU &init_request, int resultset_ttl, @@ -187,13 +198,17 @@ namespace metaproxy_1 { void release_frontend(Package &package); Rep(); public: + ~Rep(); void expire(); private: + void expire_classes(); + void stat(); void init(Package &package, const Z_GDU *gdu, FrontendPtr frontend); void start(); boost::mutex m_mutex; boost::condition m_cond_session_ready; + boost::condition m_cond_expire_ready; std::map m_clients; BackendClassMap m_backend_map; @@ -207,6 +222,7 @@ namespace metaproxy_1 { int m_session_max; Odr_int m_preferredMessageSize; Odr_int m_maximumRecordSize; + bool close_down; }; } } @@ -287,33 +303,36 @@ bool yf::SessionShared::InitKey::operator < (const SessionShared::InitKey &k) void yf::SessionShared::BackendClass::release_backend(BackendInstancePtr b) { boost::mutex::scoped_lock lock(m_mutex_backend_class); + m_cond_set_ready.notify_all(); b->m_in_use = false; } void yf::SessionShared::BackendClass::remove_backend(BackendInstancePtr b) { - BackendInstanceList::iterator it = m_backend_list.begin(); - - while (it != m_backend_list.end()) { - if (*it == b) + boost::mutex::scoped_lock lock(m_mutex_backend_class); + BackendInstanceList::iterator it = m_backend_list.begin(); + for (;;) { - mp::odr odr; - (*it)->m_close_package->response() = odr.create_close( - 0, Z_Close_lackOfActivity, 0); - (*it)->m_close_package->session().close(); - (*it)->m_close_package->move(); - - it = m_backend_list.erase(it); - } - else + if (it == m_backend_list.end()) + return; + if (*it == b) + { + it = m_backend_list.erase(it); + break; + } it++; + } } + mp::odr odr; + b->m_close_package->response() = odr.create_close( + 0, Z_Close_lackOfActivity, 0); + b->m_close_package->session().close(); + b->m_close_package->move(); } - yf::SessionShared::BackendInstancePtr yf::SessionShared::BackendClass::get_backend( const mp::Package &frontend_package) @@ -357,15 +376,30 @@ void yf::SessionShared::BackendInstance::timestamp() yf::SessionShared::BackendInstance::~BackendInstance() { + if (m_close_package) + { + mp::odr odr; + m_close_package->response() = odr.create_close( + 0, Z_Close_lackOfActivity, 0); + m_close_package->session().close(); + m_close_package->move(); + } delete m_close_package; } yf::SessionShared::BackendInstancePtr yf::SessionShared::BackendClass::create_backend( const mp::Package &frontend_package) { - BackendInstancePtr bp(new BackendInstance); BackendInstancePtr null; - + { + boost::mutex::scoped_lock lock(m_mutex_backend_class); + if (m_no_failed && !m_no_succeeded) + { + m_no_failed++; + return null; + } + } + BackendInstancePtr bp(new BackendInstance); bp->m_close_package = new mp::Package(bp->m_session, frontend_package.origin()); bp->m_close_package->copy_filter(frontend_package); @@ -399,6 +433,11 @@ yf::SessionShared::BackendInstancePtr yf::SessionShared::BackendClass::create_ba init_package.request() = init_pdu; + { + boost::mutex::scoped_lock lock(m_mutex_backend_class); + m_no_init++; + } + init_package.move(); boost::mutex::scoped_lock lock(m_mutex_backend_class); @@ -423,6 +462,7 @@ yf::SessionShared::BackendInstancePtr yf::SessionShared::BackendClass::create_ba bp->m_sequence_this = 0; bp->m_result_set_sequence = 0; m_backend_list.push_back(bp); + m_no_succeeded++; return bp; } } @@ -438,6 +478,7 @@ yf::SessionShared::BackendInstancePtr yf::SessionShared::BackendClass::create_ba init_package.session().close(); init_package.move(); } + m_no_failed++; return null; } @@ -452,12 +493,31 @@ yf::SessionShared::BackendClass::BackendClass(const yazpp_1::GDU &init_request, m_sequence_top(0), m_backend_set_ttl(resultset_ttl), m_backend_expiry_ttl(session_ttl), m_backend_set_max(resultset_max), m_preferredMessageSize(preferredMessageSize), - m_maximumRecordSize(maximumRecordSize) + m_maximumRecordSize(maximumRecordSize), + m_no_failed(0), m_no_succeeded(0), m_no_init(0) {} yf::SessionShared::BackendClass::~BackendClass() {} +void yf::SessionShared::Rep::stat() +{ + int no_classes = 0; + int no_instances = 0; + BackendClassMap::const_iterator it; + { + boost::mutex::scoped_lock lock(m_mutex_backend_map); + for (it = m_backend_map.begin(); it != m_backend_map.end(); it++) + { + BackendClassPtr bc = it->second; + no_classes++; + BackendInstanceList::iterator bit = bc->m_backend_list.begin(); + for (; bit != bc->m_backend_list.end(); bit++) + no_instances++; + } + } +} + void yf::SessionShared::Rep::init(mp::Package &package, const Z_GDU *gdu, FrontendPtr frontend) { @@ -492,13 +552,26 @@ void yf::SessionShared::Rep::init(mp::Package &package, const Z_GDU *gdu, // we only need to get init response from "first" target in // backend class - the assumption being that init response is // same for all - if (bc->m_backend_list.size() == 0) + bool create_first_one = false; + { + boost::mutex::scoped_lock lock(bc->m_mutex_backend_class); + if (!bc->m_no_failed && !bc->m_no_succeeded && !bc->m_no_init) + create_first_one = true; + else + { + // first for first one to finish + while (!bc->m_no_failed && !bc->m_no_succeeded && bc->m_no_init) + { + bc->m_cond_set_ready.wait(lock); + } + } + } + if (create_first_one) { BackendInstancePtr backend = bc->create_backend(package); if (backend) bc->release_backend(backend); } - yazpp_1::GDU init_response; { boost::mutex::scoped_lock lock(bc->m_mutex_backend_class); @@ -551,11 +624,22 @@ void yf::SessionShared::BackendSet::timestamp() yf::SessionShared::BackendSet::BackendSet( const std::string &result_set_id, const Databases &databases, - const yazpp_1::Yaz_Z_Query &query) : + const yazpp_1::Yaz_Z_Query &query, + Z_OtherInformation *additionalSearchInfo) : m_result_set_id(result_set_id), m_databases(databases), m_result_set_size(0), m_query(query) { timestamp(); + mem_additionalSearchInfo = nmem_create(); + additionalSearchInfoResponse = 0; + additionalSearchInfoRequest = + yaz_clone_z_OtherInformation(additionalSearchInfo, + mem_additionalSearchInfo); +} + +yf::SessionShared::BackendSet::~BackendSet() +{ + nmem_destroy(mem_additionalSearchInfo); } static int get_diagnostic(Z_DefaultDiagFormat *r) @@ -574,6 +658,7 @@ bool yf::SessionShared::BackendSet::search( Z_APDU *apdu_req = zget_APDU(odr, Z_APDU_searchRequest); Z_SearchRequest *req = apdu_req->u.searchRequest; + req->additionalSearchInfo = additionalSearchInfoRequest; req->resultSetName = odr_strdup(odr, m_result_set_id.c_str()); req->query = m_query.get_Z_Query(); @@ -601,6 +686,9 @@ bool yf::SessionShared::BackendSet::search( Z_SearchResponse *b_resp = gdu->u.z3950->u.searchResponse; *z_records = b_resp->records; m_result_set_size = *b_resp->resultCount; + + additionalSearchInfoResponse = yaz_clone_z_OtherInformation( + b_resp->additionalSearchInfo, mem_additionalSearchInfo); return true; } Z_APDU *f_apdu = 0; @@ -679,6 +767,10 @@ void yf::SessionShared::Frontend::get_set(mp::Package &package, BackendSetPtr &found_set) { bool session_restarted = false; + Z_OtherInformation *additionalSearchInfo = 0; + + if (apdu_req->which == Z_APDU_searchRequest) + additionalSearchInfo = apdu_req->u.searchRequest->additionalSearchInfo; restart: std::string result_set_id; @@ -694,15 +786,28 @@ restart: { // look at each backend and see if we have a similar search BackendInstanceList::const_iterator it = bc->m_backend_list.begin(); - for (; it != bc->m_backend_list.end(); it++) + while (it != bc->m_backend_list.end()) { - if (!(*it)->m_in_use) + bool restart = false; + BackendSetList::const_iterator set_it = (*it)->m_sets.begin(); + for (; set_it != (*it)->m_sets.end(); set_it++) { - BackendSetList::const_iterator set_it = (*it)->m_sets.begin(); - for (; set_it != (*it)->m_sets.end(); set_it++) + // for real present request we don't care + // if additionalSearchInfo matches: same records + if ((*set_it)->m_databases == databases + && query.match(&(*set_it)->m_query) + && (apdu_req->which != Z_APDU_searchRequest || + yaz_compare_z_OtherInformation( + additionalSearchInfo, + (*set_it)->additionalSearchInfoRequest))) { - if ((*set_it)->m_databases == databases - && query.match(&(*set_it)->m_query)) + if ((*it)->m_in_use) + { + bc->m_cond_set_ready.wait(lock); + restart = true; + break; + } + else { found_set = *set_it; found_backend = *it; @@ -712,6 +817,10 @@ restart: } } } + if (restart) + it = bc->m_backend_list.begin(); + else + it++; } } override_set(found_backend, result_set_id, databases, out_of_sessions); @@ -765,7 +874,13 @@ restart: // we must search ... BackendSetPtr new_set(new BackendSet(result_set_id, - databases, query)); + databases, query, + additionalSearchInfo)); + + found_set = new_set; + found_set->timestamp(); + found_backend->m_sets.push_back(found_set); + Z_Records *z_records = 0; Package search_package(found_backend->m_session, package.origin()); @@ -831,6 +946,7 @@ restart: package.response() = f_apdu; } bc->release_backend(found_backend); + found_set.reset(); return; // search error } } @@ -843,9 +959,11 @@ restart: goto restart; } +#if 0 found_set = new_set; found_set->timestamp(); found_backend->m_sets.push_back(found_set); +#endif } int yf::SessionShared::Frontend::result_set_ref(ODR o, @@ -961,6 +1079,7 @@ void yf::SessionShared::Frontend::search(mp::Package &package, Z_APDU *f_apdu = odr.create_searchResponse(apdu_req, 0, 0); Z_SearchResponse *f_resp = f_apdu->u.searchResponse; *f_resp->resultCount = found_set->m_result_set_size; + f_resp->additionalSearchInfo = found_set->additionalSearchInfoResponse; package.response() = f_apdu; FrontendSetPtr fset(new FrontendSet(databases, query)); @@ -1002,11 +1121,16 @@ void yf::SessionShared::Frontend::present(mp::Package &package, return; Z_NamePlusRecordList *npr_res = 0; - if (found_set->m_record_cache.lookup(odr, &npr_res, - *req->resultSetStartPoint, - *req->numberOfRecordsRequested, - req->preferredRecordSyntax, - req->recordComposition)) + // record_cache.lookup types are int's. Avoid non-fitting values + if (*req->resultSetStartPoint > 0 + && *req->resultSetStartPoint < INT_MAX + && *req->numberOfRecordsRequested > 0 + && *req->numberOfRecordsRequested < INT_MAX + && found_set->m_record_cache.lookup(odr, &npr_res, + *req->resultSetStartPoint, + *req->numberOfRecordsRequested, + req->preferredRecordSyntax, + req->recordComposition)) { Z_APDU *f_apdu_res = odr.create_presentResponse(apdu_req, 0, 0); Z_PresentResponse *f_resp = f_apdu_res->u.presentResponse; @@ -1065,16 +1189,23 @@ void yf::SessionShared::Frontend::present(mp::Package &package, if (b_resp->records && b_resp->records->which == Z_Records_DBOSD) { - yaz_log(YLOG_LOG, "Adding " ODR_INT_PRINTF "+" ODR_INT_PRINTF - " records to cache %p", - *req->resultSetStartPoint, - *f_resp->numberOfRecordsReturned, - &found_set->m_record_cache); - found_set->m_record_cache.add( - odr, - b_resp->records->u.databaseOrSurDiagnostics, - *req->resultSetStartPoint, - *f_resp->numberOfRecordsReturned); + Z_NamePlusRecordList *npr = + b_resp->records->u.databaseOrSurDiagnostics; + // record_cache.add types are int's. Avoid non-fitting values + if (*req->resultSetStartPoint > 0 + && npr->num_records + *req->resultSetStartPoint < INT_MAX) + { +#if 0 + yaz_log(YLOG_LOG, "Adding " ODR_INT_PRINTF "+" ODR_INT_PRINTF + " records to cache %p", + *req->resultSetStartPoint, + *f_resp->numberOfRecordsReturned, + &found_set->m_record_cache); +#endif + found_set->m_record_cache.add( + odr, npr, *req->resultSetStartPoint, + p_req->recordComposition); + } } bc->release_backend(found_backend); } @@ -1129,7 +1260,7 @@ void yf::SessionShared::Worker::operator() (void) m_p->expire(); } -void yf::SessionShared::BackendClass::expire_class() +bool yf::SessionShared::BackendClass::expire_instances() { time_t now; time(&now); @@ -1145,12 +1276,6 @@ void yf::SessionShared::BackendClass::expire_class() } else if (now < last_use || now - last_use > m_backend_expiry_ttl) { - mp::odr odr; - (*bit)->m_close_package->response() = odr.create_close( - 0, Z_Close_lackOfActivity, 0); - (*bit)->m_close_package->session().close(); - (*bit)->m_close_package->move(); - bit = m_backend_list.erase(bit); } else @@ -1158,6 +1283,25 @@ void yf::SessionShared::BackendClass::expire_class() bit++; } } + if (m_backend_list.empty()) + return true; + return false; +} + +void yf::SessionShared::Rep::expire_classes() +{ + boost::mutex::scoped_lock lock(m_mutex_backend_map); + BackendClassMap::iterator b_it = m_backend_map.begin(); + while (b_it != m_backend_map.end()) + { + if (b_it->second->expire_instances()) + { + m_backend_map.erase(b_it); + b_it = m_backend_map.begin(); + } + else + b_it++; + } } void yf::SessionShared::Rep::expire() @@ -1166,18 +1310,21 @@ void yf::SessionShared::Rep::expire() { boost::xtime xt; boost::xtime_get(&xt, -#if BOOST_VERSION >= 105000 +#if BOOST_VERSION >= 105000 boost::TIME_UTC_ #else boost::TIME_UTC #endif ); - xt.sec += m_session_ttl / 3; - boost::thread::sleep(xt); - - BackendClassMap::const_iterator b_it = m_backend_map.begin(); - for (; b_it != m_backend_map.end(); b_it++) - b_it->second->expire_class(); + xt.sec += m_session_ttl; + { + boost::mutex::scoped_lock lock(m_mutex); + m_cond_expire_ready.timed_wait(lock, xt); + if (close_down) + break; + } + stat(); + expire_classes(); } } @@ -1191,6 +1338,17 @@ yf::SessionShared::Rep::Rep() m_session_max = 100; m_preferredMessageSize = 0; m_maximumRecordSize = 0; + close_down = false; +} + +yf::SessionShared::Rep::~Rep() +{ + { + boost::mutex::scoped_lock lock(m_mutex); + close_down = true; + m_cond_expire_ready.notify_all(); + } + m_thrds.join_all(); } void yf::SessionShared::Rep::start()