X-Git-Url: http://git.indexdata.com/?a=blobdiff_plain;f=src%2Ffilter_session_shared.cpp;h=da4d8e0b1cd2731c9eb5435406ced583aaf32b3f;hb=a258482a3b53b90b0932f4a789ba66e8e1576c05;hp=736ae69e40eb6f0643572366a48de7dca1a0d206;hpb=2e63f8129fbd6ff31b322590117c428c1b4afd3d;p=metaproxy-moved-to-github.git diff --git a/src/filter_session_shared.cpp b/src/filter_session_shared.cpp index 736ae69..da4d8e0 100644 --- a/src/filter_session_shared.cpp +++ b/src/filter_session_shared.cpp @@ -1,5 +1,5 @@ /* This file is part of Metaproxy. - Copyright (C) 2005-2012 Index Data + Copyright (C) 2005-2013 Index Data Metaproxy is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free @@ -169,12 +169,12 @@ namespace metaproxy_1 { Rep *m_p; BackendClassPtr m_backend_class; FrontendSets m_frontend_sets; - }; + }; // representation class SessionShared::Rep { friend class SessionShared; friend struct Frontend; - + FrontendPtr get_frontend(Package &package); void release_frontend(Package &package); Rep(); @@ -208,7 +208,7 @@ yf::SessionShared::FrontendSet::FrontendSet( { } -const yf::SessionShared::Databases & +const yf::SessionShared::Databases & yf::SessionShared::FrontendSet::get_databases() { return m_databases; @@ -222,7 +222,7 @@ const yazpp_1::Yaz_Z_Query& yf::SessionShared::FrontendSet::get_query() yf::SessionShared::InitKey::InitKey(const InitKey &k) { m_odr = odr_createmem(ODR_ENCODE); - + m_idAuthentication_size = k.m_idAuthentication_size; m_idAuthentication_buf = (char*)odr_malloc(m_odr, m_idAuthentication_size); memcpy(m_idAuthentication_buf, k.m_idAuthentication_buf, @@ -254,7 +254,7 @@ yf::SessionShared::InitKey::~InitKey() } bool yf::SessionShared::InitKey::operator < (const SessionShared::InitKey &k) - const + const { int c; c = mp::util::memcmp2( @@ -284,7 +284,7 @@ void yf::SessionShared::BackendClass::release_backend(BackendInstancePtr b) void yf::SessionShared::BackendClass::remove_backend(BackendInstancePtr b) { BackendInstanceList::iterator it = m_backend_list.begin(); - + while (it != m_backend_list.end()) { if (*it == b) @@ -294,7 +294,7 @@ void yf::SessionShared::BackendClass::remove_backend(BackendInstancePtr b) 0, Z_Close_lackOfActivity, 0); (*it)->m_close_package->session().close(); (*it)->m_close_package->move(); - + it = m_backend_list.erase(it); } else @@ -304,22 +304,22 @@ void yf::SessionShared::BackendClass::remove_backend(BackendInstancePtr b) -yf::SessionShared::BackendInstancePtr +yf::SessionShared::BackendInstancePtr yf::SessionShared::BackendClass::get_backend( const mp::Package &frontend_package) { { boost::mutex::scoped_lock lock(m_mutex_backend_class); - + BackendInstanceList::const_iterator it = m_backend_list.begin(); - + BackendInstancePtr backend1; // null - + for (; it != m_backend_list.end(); it++) { if (!(*it)->m_in_use) { - if (!backend1 + if (!backend1 || (*it)->m_sequence_this < backend1->m_sequence_this) backend1 = *it; } @@ -390,38 +390,40 @@ yf::SessionShared::BackendInstancePtr yf::SessionShared::BackendClass::create_ba m_named_result_sets = false; Z_GDU *gdu = init_package.response().get(); - if (init_package.session().is_closed()) - { - /* already closed. We don't know why */ - return null; - } - else if (gdu && gdu->which == Z_GDU_Z3950 - && gdu->u.z3950->which == Z_APDU_initResponse - && *gdu->u.z3950->u.initResponse->result) + + if (gdu && gdu->which == Z_GDU_Z3950 + && gdu->u.z3950->which == Z_APDU_initResponse) { - /* successful init response */ Z_InitResponse *res = gdu->u.z3950->u.initResponse; m_init_response = gdu->u.z3950; if (ODR_MASK_GET(res->options, Z_Options_namedResultSets)) { m_named_result_sets = true; } + if (*gdu->u.z3950->u.initResponse->result + && !init_package.session().is_closed()) + { + bp->m_in_use = true; + time(&bp->m_time_last_use); + bp->m_sequence_this = 0; + bp->m_result_set_sequence = 0; + m_backend_list.push_back(bp); + return bp; + } } else { - /* not init or init rejected */ + yazpp_1::GDU empty_gdu; + m_init_response = empty_gdu; + } + + if (!init_package.session().is_closed()) + { init_package.copy_filter(frontend_package); init_package.session().close(); init_package.move(); - return null; } - bp->m_in_use = true; - time(&bp->m_time_last_use); - bp->m_sequence_this = 0; - bp->m_result_set_sequence = 0; - m_backend_list.push_back(bp); - - return bp; + return null; } @@ -460,49 +462,56 @@ void yf::SessionShared::Rep::init(mp::Package &package, const Z_GDU *gdu, } else { - frontend->m_backend_class = it->second; + frontend->m_backend_class = it->second; } } BackendClassPtr bc = frontend->m_backend_class; - BackendInstancePtr backend; mp::odr odr; // we only need to get init response from "first" target in // backend class - the assumption being that init response is // same for all - if (bc->m_init_response.get() == 0) + if (bc->m_backend_list.size() == 0) { - backend = bc->get_backend(package); + BackendInstancePtr backend = bc->create_backend(package); + + if (backend) + bc->release_backend(backend); } + + yazpp_1::GDU init_response; { boost::mutex::scoped_lock lock(bc->m_mutex_backend_class); - if (bc->m_init_response.get() == 0) - { - Z_APDU *apdu = odr.create_initResponse(gdu->u.z3950, 0, 0); - *apdu->u.initResponse->result = 0; - package.response() = apdu; + + init_response = bc->m_init_response; + } + + if (init_response.get()) + { + Z_GDU *response_gdu = init_response.get(); + mp::util::transfer_referenceId(odr, gdu->u.z3950, + response_gdu->u.z3950); + Z_Options *server_options = + response_gdu->u.z3950->u.initResponse->options; + Z_Options *client_options = &frontend->m_init_options; + int i; + for (i = 0; i < 30; i++) + if (!ODR_MASK_GET(client_options, i)) + ODR_MASK_CLEAR(server_options, i); + package.response() = init_response; + if (!*response_gdu->u.z3950->u.initResponse->result) package.session().close(); - } - else - { - yazpp_1::GDU init_response = bc->m_init_response; - Z_GDU *response_gdu = init_response.get(); - mp::util::transfer_referenceId(odr, gdu->u.z3950, - response_gdu->u.z3950); - - Z_Options *server_options = - response_gdu->u.z3950->u.initResponse->options; - Z_Options *client_options = &frontend->m_init_options; - - int i; - for (i = 0; i < 30; i++) - if (!ODR_MASK_GET(client_options, i)) - ODR_MASK_CLEAR(server_options, i); - package.response() = init_response; - } } - if (backend) - bc->release_backend(backend); + else + { + Z_APDU *apdu = + odr.create_initResponse( + gdu->u.z3950, YAZ_BIB1_TEMPORARY_SYSTEM_ERROR, + "session_shared: target closed connection during init"); + *apdu->u.initResponse->result = 0; + package.response() = apdu; + package.session().close(); + } } void yf::SessionShared::BackendSet::timestamp() @@ -515,7 +524,7 @@ yf::SessionShared::BackendSet::BackendSet( const Databases &databases, const yazpp_1::Yaz_Z_Query &query) : m_result_set_id(result_set_id), - m_databases(databases), m_result_set_size(0), m_query(query) + m_databases(databases), m_result_set_size(0), m_query(query) { timestamp(); } @@ -540,7 +549,7 @@ bool yf::SessionShared::BackendSet::search( req->query = m_query.get_Z_Query(); req->num_databaseNames = m_databases.size(); - req->databaseNames = (char**) + req->databaseNames = (char**) odr_malloc(odr, req->num_databaseNames * sizeof(char *)); Databases::const_iterator it = m_databases.begin(); size_t i = 0; @@ -557,7 +566,7 @@ bool yf::SessionShared::BackendSet::search( Z_GDU *gdu = search_package.response().get(); if (!search_package.session().is_closed() - && gdu && gdu->which == Z_GDU_Z3950 + && gdu && gdu->which == Z_GDU_Z3950 && gdu->u.z3950->which == Z_APDU_searchResponse) { Z_SearchResponse *b_resp = gdu->u.z3950->u.searchResponse; @@ -566,15 +575,17 @@ bool yf::SessionShared::BackendSet::search( return true; } Z_APDU *f_apdu = 0; + const char *addinfo = "session_shared: " + "target closed connection during search"; if (frontend_apdu->which == Z_APDU_searchRequest) f_apdu = odr.create_searchResponse( - frontend_apdu, YAZ_BIB1_TEMPORARY_SYSTEM_ERROR, 0); + frontend_apdu, YAZ_BIB1_TEMPORARY_SYSTEM_ERROR, addinfo); else if (frontend_apdu->which == Z_APDU_presentRequest) f_apdu = odr.create_presentResponse( - frontend_apdu, YAZ_BIB1_TEMPORARY_SYSTEM_ERROR, 0); + frontend_apdu, YAZ_BIB1_TEMPORARY_SYSTEM_ERROR, addinfo); else f_apdu = odr.create_close( - frontend_apdu, YAZ_BIB1_TEMPORARY_SYSTEM_ERROR, 0); + frontend_apdu, YAZ_BIB1_TEMPORARY_SYSTEM_ERROR, addinfo); frontend_package.response() = f_apdu; return false; } @@ -620,7 +631,7 @@ void yf::SessionShared::Frontend::override_set( if (bc->m_named_result_sets) { result_set_id = boost::io::str( - boost::format("%1%") % + boost::format("%1%") % found_backend->m_result_set_sequence); found_backend->m_result_set_sequence++; } @@ -649,7 +660,7 @@ restart: if ((int) bc->m_backend_list.size() >= m_p->m_session_max) out_of_sessions = true; - + if (m_p->m_optimize_search) { // look at each backend and see if we have a similar search @@ -667,7 +678,6 @@ restart: found_set = *set_it; found_backend = *it; bc->use_backend(found_backend); - found_set->timestamp(); // found matching set. No need to search again return; } @@ -690,9 +700,11 @@ restart: Z_APDU *f_apdu = 0; mp::odr odr; const char *addinfo = 0; - + if (out_of_sessions) addinfo = "session_shared: all sessions in use"; + else + addinfo = "session_shared: could not create backend"; if (apdu_req->which == Z_APDU_searchRequest) { f_apdu = odr.create_searchResponse( @@ -734,7 +746,7 @@ restart: apdu_req, found_backend, &z_records)) { bc->remove_backend(found_backend); - return; // search error + return; // search error } if (z_records) @@ -748,14 +760,14 @@ restart: else if (z_records->which == Z_Records_multipleNSD) { if (z_records->u.multipleNonSurDiagnostics->num_diagRecs >= 1 - && - + && + z_records->u.multipleNonSurDiagnostics->diagRecs[0]->which == Z_DiagRec_defaultFormat) { condition = get_diagnostic( z_records->u.multipleNonSurDiagnostics->diagRecs[0]->u.defaultFormat); - + } } if (m_p->m_restart && !session_restarted && @@ -774,7 +786,7 @@ restart: mp::odr odr; if (apdu_req->which == Z_APDU_searchRequest) { - Z_APDU *f_apdu = odr.create_searchResponse(apdu_req, + Z_APDU *f_apdu = odr.create_searchResponse(apdu_req, 0, 0); Z_SearchResponse *f_resp = f_apdu->u.searchResponse; *f_resp->searchStatus = Z_SearchResponse_none; @@ -783,14 +795,14 @@ restart: } if (apdu_req->which == Z_APDU_presentRequest) { - Z_APDU *f_apdu = odr.create_presentResponse(apdu_req, + Z_APDU *f_apdu = odr.create_presentResponse(apdu_req, 0, 0); Z_PresentResponse *f_resp = f_apdu->u.presentResponse; f_resp->records = z_records; package.response() = f_apdu; } bc->release_backend(found_backend); - return; // search error + return; // search error } } if (m_p->m_restart && !session_restarted && new_set->m_result_set_size < 0) @@ -811,28 +823,28 @@ void yf::SessionShared::Frontend::search(mp::Package &package, Z_APDU *apdu_req) { Z_SearchRequest *req = apdu_req->u.searchRequest; - FrontendSets::iterator fset_it = + FrontendSets::iterator fset_it = m_frontend_sets.find(req->resultSetName); if (fset_it != m_frontend_sets.end()) { - // result set already exist + // result set already exist // if replace indicator is off: we return diagnostic if // result set already exist. if (*req->replaceIndicator == 0) { mp::odr odr; - Z_APDU *apdu = + Z_APDU *apdu = odr.create_searchResponse( apdu_req, YAZ_BIB1_RESULT_SET_EXISTS_AND_REPLACE_INDICATOR_OFF, 0); package.response() = apdu; - + return; } m_frontend_sets.erase(fset_it); } - + yazpp_1::Yaz_Z_Query query; query.set_Z_Query(req->query); Databases databases; @@ -865,12 +877,12 @@ void yf::SessionShared::Frontend::present(mp::Package &package, mp::odr odr; Z_PresentRequest *req = apdu_req->u.presentRequest; - FrontendSets::iterator fset_it = + FrontendSets::iterator fset_it = m_frontend_sets.find(req->resultSetId); if (fset_it == m_frontend_sets.end()) { - Z_APDU *apdu = + Z_APDU *apdu = odr.create_presentResponse( apdu_req, YAZ_BIB1_SPECIFIED_RESULT_SET_DOES_NOT_EXIST, @@ -892,7 +904,7 @@ void yf::SessionShared::Frontend::present(mp::Package &package, return; Z_NamePlusRecordList *npr_res = 0; - if (found_set->m_record_cache.lookup(odr, &npr_res, + if (found_set->m_record_cache.lookup(odr, &npr_res, *req->resultSetStartPoint, *req->numberOfRecordsRequested, req->preferredRecordSyntax, @@ -901,14 +913,14 @@ void yf::SessionShared::Frontend::present(mp::Package &package, Z_APDU *f_apdu_res = odr.create_presentResponse(apdu_req, 0, 0); Z_PresentResponse *f_resp = f_apdu_res->u.presentResponse; - yaz_log(YLOG_LOG, "Found " ODR_INT_PRINTF "+" ODR_INT_PRINTF + yaz_log(YLOG_LOG, "Found " ODR_INT_PRINTF "+" ODR_INT_PRINTF " records in cache %p", - *req->resultSetStartPoint, + *req->resultSetStartPoint, *req->numberOfRecordsRequested, - &found_set->m_record_cache); + &found_set->m_record_cache); *f_resp->numberOfRecordsReturned = *req->numberOfRecordsRequested; - *f_resp->nextResultSetPosition = + *f_resp->nextResultSetPosition = *req->resultSetStartPoint + *req->numberOfRecordsRequested; // f_resp->presentStatus assumed OK. f_resp->records = (Z_Records *) odr_malloc(odr, sizeof(Z_Records)); @@ -920,7 +932,7 @@ void yf::SessionShared::Frontend::present(mp::Package &package, } found_backend->timestamp(); - + Z_APDU *p_apdu = zget_APDU(odr, Z_APDU_presentRequest); Z_PresentRequest *p_req = p_apdu->u.presentRequest; p_req->preferredRecordSyntax = req->preferredRecordSyntax; @@ -939,7 +951,7 @@ void yf::SessionShared::Frontend::present(mp::Package &package, Z_GDU *gdu = present_package.response().get(); if (!present_package.session().is_closed() - && gdu && gdu->which == Z_GDU_Z3950 + && gdu && gdu->which == Z_GDU_Z3950 && gdu->u.z3950->which == Z_APDU_presentResponse) { Z_PresentResponse *b_resp = gdu->u.z3950->u.presentResponse; @@ -957,13 +969,13 @@ void yf::SessionShared::Frontend::present(mp::Package &package, { yaz_log(YLOG_LOG, "Adding " ODR_INT_PRINTF "+" ODR_INT_PRINTF " records to cache %p", - *req->resultSetStartPoint, + *req->resultSetStartPoint, *f_resp->numberOfRecordsReturned, - &found_set->m_record_cache); + &found_set->m_record_cache); found_set->m_record_cache.add( odr, b_resp->records->u.databaseOrSurDiagnostics, - *req->resultSetStartPoint, + *req->resultSetStartPoint, *f_resp->numberOfRecordsReturned); } bc->release_backend(found_backend); @@ -971,9 +983,10 @@ void yf::SessionShared::Frontend::present(mp::Package &package, else { bc->remove_backend(found_backend); - Z_APDU *f_apdu_res = + Z_APDU *f_apdu_res = odr.create_presentResponse( - apdu_req, YAZ_BIB1_TEMPORARY_SYSTEM_ERROR, 0); + apdu_req, YAZ_BIB1_TEMPORARY_SYSTEM_ERROR, + "session_shared: target closed connection during present"); package.response() = f_apdu_res; } } @@ -987,7 +1000,8 @@ void yf::SessionShared::Frontend::scan(mp::Package &frontend_package, { mp::odr odr; Z_APDU *apdu = odr.create_scanResponse( - apdu_req, YAZ_BIB1_TEMPORARY_SYSTEM_ERROR, 0); + apdu_req, YAZ_BIB1_TEMPORARY_SYSTEM_ERROR, + "session_shared: could not create backend"); frontend_package.response() = apdu; } else @@ -1026,7 +1040,7 @@ void yf::SessionShared::BackendClass::expire_class() while (bit != m_backend_list.end()) { time_t last_use = (*bit)->m_time_last_use; - + if ((*bit)->m_in_use) { bit++; @@ -1053,10 +1067,16 @@ void yf::SessionShared::Rep::expire() while (true) { boost::xtime xt; - boost::xtime_get(&xt, boost::TIME_UTC); + boost::xtime_get(&xt, +#if BOOST_VERSION >= 105000 + boost::TIME_UTC_ +#else + boost::TIME_UTC +#endif + ); xt.sec += m_session_ttl / 3; boost::thread::sleep(xt); - + BackendClassMap::const_iterator b_it = m_backend_map.begin(); for (; b_it != m_backend_map.end(); b_it++) b_it->second->expire_class(); @@ -1104,13 +1124,13 @@ yf::SessionShared::FrontendPtr yf::SessionShared::Rep::get_frontend(mp::Package boost::mutex::scoped_lock lock(m_mutex); std::map::iterator it; - + while(true) { it = m_clients.find(package.session()); if (it == m_clients.end()) break; - + if (!it->second->m_in_use) { it->second->m_in_use = true; @@ -1128,7 +1148,7 @@ void yf::SessionShared::Rep::release_frontend(mp::Package &package) { boost::mutex::scoped_lock lock(m_mutex); std::map::iterator it; - + it = m_clients.find(package.session()); if (it != m_clients.end()) { @@ -1150,7 +1170,7 @@ void yf::SessionShared::process(mp::Package &package) const FrontendPtr f = m_p->get_frontend(package); Z_GDU *gdu = package.request().get(); - + if (gdu && gdu->which == Z_GDU_Z3950 && gdu->u.z3950->which == Z_APDU_initRequest && !f->m_is_virtual) { @@ -1164,18 +1184,18 @@ void yf::SessionShared::process(mp::Package &package) const if (apdu->which == Z_APDU_initRequest) { mp::odr odr; - + package.response() = odr.create_close( apdu, Z_Close_protocolError, "double init"); - + package.session().close(); } else if (apdu->which == Z_APDU_close) { mp::odr odr; - + package.response() = odr.create_close( apdu, Z_Close_peerAbort, "received close from client"); @@ -1196,11 +1216,11 @@ void yf::SessionShared::process(mp::Package &package) const else { mp::odr odr; - + package.response() = odr.create_close( apdu, Z_Close_protocolError, "unsupported APDU in filter_session_shared"); - + package.session().close(); } } @@ -1220,11 +1240,11 @@ void yf::SessionShared::configure(const xmlNode *ptr, bool test_only, for (attr = ptr->properties; attr; attr = attr->next) { if (!strcmp((const char *) attr->name, "ttl")) - m_p->m_resultset_ttl = + m_p->m_resultset_ttl = mp::xml::get_int(attr->children, 30); else if (!strcmp((const char *) attr->name, "max")) { - m_p->m_resultset_max = + m_p->m_resultset_max = mp::xml::get_int(attr->children, 10); } else if (!strcmp((const char *) attr->name, "optimizesearch")) @@ -1248,10 +1268,10 @@ void yf::SessionShared::configure(const xmlNode *ptr, bool test_only, for (attr = ptr->properties; attr; attr = attr->next) { if (!strcmp((const char *) attr->name, "ttl")) - m_p->m_session_ttl = + m_p->m_session_ttl = mp::xml::get_int(attr->children, 90); else if (!strcmp((const char *) attr->name, "max")) - m_p->m_session_max = + m_p->m_session_max = mp::xml::get_int(attr->children, 100); else throw mp::filter::FilterException( @@ -1261,7 +1281,7 @@ void yf::SessionShared::configure(const xmlNode *ptr, bool test_only, } else { - throw mp::filter::FilterException("Bad element " + throw mp::filter::FilterException("Bad element " + std::string((const char *) ptr->name)); }