X-Git-Url: http://git.indexdata.com/?a=blobdiff_plain;f=src%2Ffilter_session_shared.cpp;h=dbfa491dfa5b62391a74c5c3b275ce6c1281c9c3;hb=953e615164c4c47ee871ef7901e90d6fcffd8473;hp=eb8b23a0b33005f97b564376606cf3d7a67e2559;hpb=a395969830b02ecb29a24d455d60edb394af77e4;p=metaproxy-moved-to-github.git diff --git a/src/filter_session_shared.cpp b/src/filter_session_shared.cpp index eb8b23a..dbfa491 100644 --- a/src/filter_session_shared.cpp +++ b/src/filter_session_shared.cpp @@ -129,12 +129,16 @@ namespace metaproxy_1 { yazpp_1::GDU m_init_request; yazpp_1::GDU m_init_response; boost::mutex m_mutex_backend_class; + boost::condition m_cond_set_ready; int m_sequence_top; time_t m_backend_set_ttl; time_t m_backend_expiry_ttl; size_t m_backend_set_max; Odr_int m_preferredMessageSize; Odr_int m_maximumRecordSize; + int m_no_failed; + int m_no_succeeded; + int m_no_init; public: BackendClass(const yazpp_1::GDU &init_request, int resultset_ttl, @@ -299,6 +303,7 @@ bool yf::SessionShared::InitKey::operator < (const SessionShared::InitKey &k) void yf::SessionShared::BackendClass::release_backend(BackendInstancePtr b) { boost::mutex::scoped_lock lock(m_mutex_backend_class); + m_cond_set_ready.notify_all(); b->m_in_use = false; } @@ -371,15 +376,30 @@ void yf::SessionShared::BackendInstance::timestamp() yf::SessionShared::BackendInstance::~BackendInstance() { + if (m_close_package) + { + mp::odr odr; + m_close_package->response() = odr.create_close( + 0, Z_Close_lackOfActivity, 0); + m_close_package->session().close(); + m_close_package->move(); + } delete m_close_package; } yf::SessionShared::BackendInstancePtr yf::SessionShared::BackendClass::create_backend( const mp::Package &frontend_package) { - BackendInstancePtr bp(new BackendInstance); BackendInstancePtr null; - + { + boost::mutex::scoped_lock lock(m_mutex_backend_class); + if (m_no_failed && !m_no_succeeded) + { + m_no_failed++; + return null; + } + } + BackendInstancePtr bp(new BackendInstance); bp->m_close_package = new mp::Package(bp->m_session, frontend_package.origin()); bp->m_close_package->copy_filter(frontend_package); @@ -413,6 +433,11 @@ yf::SessionShared::BackendInstancePtr yf::SessionShared::BackendClass::create_ba init_package.request() = init_pdu; + { + boost::mutex::scoped_lock lock(m_mutex_backend_class); + m_no_init++; + } + init_package.move(); boost::mutex::scoped_lock lock(m_mutex_backend_class); @@ -437,6 +462,7 @@ yf::SessionShared::BackendInstancePtr yf::SessionShared::BackendClass::create_ba bp->m_sequence_this = 0; bp->m_result_set_sequence = 0; m_backend_list.push_back(bp); + m_no_succeeded++; return bp; } } @@ -452,6 +478,7 @@ yf::SessionShared::BackendInstancePtr yf::SessionShared::BackendClass::create_ba init_package.session().close(); init_package.move(); } + m_no_failed++; return null; } @@ -466,7 +493,8 @@ yf::SessionShared::BackendClass::BackendClass(const yazpp_1::GDU &init_request, m_sequence_top(0), m_backend_set_ttl(resultset_ttl), m_backend_expiry_ttl(session_ttl), m_backend_set_max(resultset_max), m_preferredMessageSize(preferredMessageSize), - m_maximumRecordSize(maximumRecordSize) + m_maximumRecordSize(maximumRecordSize), + m_no_failed(0), m_no_succeeded(0), m_no_init(0) {} yf::SessionShared::BackendClass::~BackendClass() @@ -524,13 +552,26 @@ void yf::SessionShared::Rep::init(mp::Package &package, const Z_GDU *gdu, // we only need to get init response from "first" target in // backend class - the assumption being that init response is // same for all - if (bc->m_backend_list.size() == 0) + bool create_first_one = false; + { + boost::mutex::scoped_lock lock(bc->m_mutex_backend_class); + if (!bc->m_no_failed && !bc->m_no_succeeded && !bc->m_no_init) + create_first_one = true; + else + { + // first for first one to finish + while (!bc->m_no_failed && !bc->m_no_succeeded && bc->m_no_init) + { + bc->m_cond_set_ready.wait(lock); + } + } + } + if (create_first_one) { BackendInstancePtr backend = bc->create_backend(package); if (backend) bc->release_backend(backend); } - yazpp_1::GDU init_response; { boost::mutex::scoped_lock lock(bc->m_mutex_backend_class); @@ -745,21 +786,28 @@ restart: { // look at each backend and see if we have a similar search BackendInstanceList::const_iterator it = bc->m_backend_list.begin(); - for (; it != bc->m_backend_list.end(); it++) + while (it != bc->m_backend_list.end()) { - if (!(*it)->m_in_use) + bool restart = false; + BackendSetList::const_iterator set_it = (*it)->m_sets.begin(); + for (; set_it != (*it)->m_sets.end(); set_it++) { - BackendSetList::const_iterator set_it = (*it)->m_sets.begin(); - for (; set_it != (*it)->m_sets.end(); set_it++) - { - // for real present request we don't care - // if additionalSearchInfo matches: same records - if ((*set_it)->m_databases == databases - && query.match(&(*set_it)->m_query) - && (apdu_req->which != Z_APDU_searchRequest || - yaz_compare_z_OtherInformation( - additionalSearchInfo, + // for real present request we don't care + // if additionalSearchInfo matches: same records + if ((*set_it)->m_databases == databases + && query.match(&(*set_it)->m_query) + && (apdu_req->which != Z_APDU_searchRequest || + yaz_compare_z_OtherInformation( + additionalSearchInfo, (*set_it)->additionalSearchInfoRequest))) + { + if ((*it)->m_in_use) + { + bc->m_cond_set_ready.wait(lock); + restart = true; + break; + } + else { found_set = *set_it; found_backend = *it; @@ -769,6 +817,10 @@ restart: } } } + if (restart) + it = bc->m_backend_list.begin(); + else + it++; } } override_set(found_backend, result_set_id, databases, out_of_sessions); @@ -824,6 +876,11 @@ restart: BackendSetPtr new_set(new BackendSet(result_set_id, databases, query, additionalSearchInfo)); + + found_set = new_set; + found_set->timestamp(); + found_backend->m_sets.push_back(found_set); + Z_Records *z_records = 0; Package search_package(found_backend->m_session, package.origin()); @@ -889,6 +946,7 @@ restart: package.response() = f_apdu; } bc->release_backend(found_backend); + found_set.reset(); return; // search error } } @@ -901,9 +959,11 @@ restart: goto restart; } +#if 0 found_set = new_set; found_set->timestamp(); found_backend->m_sets.push_back(found_set); +#endif } int yf::SessionShared::Frontend::result_set_ref(ODR o, @@ -1216,12 +1276,6 @@ bool yf::SessionShared::BackendClass::expire_instances() } else if (now < last_use || now - last_use > m_backend_expiry_ttl) { - mp::odr odr; - (*bit)->m_close_package->response() = odr.create_close( - 0, Z_Close_lackOfActivity, 0); - (*bit)->m_close_package->session().close(); - (*bit)->m_close_package->move(); - bit = m_backend_list.erase(bit); } else @@ -1262,7 +1316,7 @@ void yf::SessionShared::Rep::expire() boost::TIME_UTC #endif ); - xt.sec += m_session_ttl / 3; + xt.sec += m_session_ttl; { boost::mutex::scoped_lock lock(m_mutex); m_cond_expire_ready.timed_wait(lock, xt);