From 73bda3a639851ca17dd1449b94203600a32cd838 Mon Sep 17 00:00:00 2001 From: Adam Dickmeiss Date: Mon, 16 Apr 2012 14:56:48 +0200 Subject: [PATCH] sru_z3950: serialize requests This is to ensure that pipelined HTTP requests are handled properly. --- src/filter_sru_to_z3950.cpp | 127 ++++++++++++++++++++++++++++++++----------- src/filter_sru_to_z3950.hpp | 3 + 2 files changed, 98 insertions(+), 32 deletions(-) diff --git a/src/filter_sru_to_z3950.cpp b/src/filter_sru_to_z3950.cpp index a4aab88..75c0da4 100644 --- a/src/filter_sru_to_z3950.cpp +++ b/src/filter_sru_to_z3950.cpp @@ -48,18 +48,32 @@ namespace yf = mp::filter; namespace metaproxy_1 { namespace filter { + class SRUtoZ3950::Frontend : boost::noncopyable { + friend class Impl; + bool m_in_use; + public: + Frontend(); + ~Frontend(); + }; class SRUtoZ3950::Impl { public: void configure(const xmlNode *xmlnode); void process(metaproxy_1::Package &package); private: + FrontendPtr get_frontend(mp::Package &package); + void release_frontend(mp::Package &package); std::map m_database_explain; typedef std::map ActiveUrlMap; - boost::mutex m_mutex; + boost::mutex m_url_mutex; boost::condition m_cond_url_ready; ActiveUrlMap m_active_urls; + + + boost::mutex m_mutex_session; + boost::condition m_cond_session_ready; + std::map m_clients; private: void sru(metaproxy_1::Package &package, Z_GDU *zgdu_req); int z3950_build_query( @@ -319,51 +333,100 @@ void yf::SRUtoZ3950::Impl::sru(mp::Package &package, Z_GDU *zgdu_req) } -void yf::SRUtoZ3950::Impl::process(mp::Package &package) +yf::SRUtoZ3950::Frontend::Frontend() : m_in_use(true) { - Z_GDU *zgdu_req = package.request().get(); +} + +yf::SRUtoZ3950::Frontend::~Frontend() +{ +} + + +yf::SRUtoZ3950::FrontendPtr yf::SRUtoZ3950::Impl::get_frontend( + mp::Package &package) +{ + boost::mutex::scoped_lock lock(m_mutex_session); - // ignoring all non HTTP_Request packages - if (!zgdu_req || !(zgdu_req->which == Z_GDU_HTTP_Request)) + std::map::iterator it; + + while (true) { - package.move(); - return; + it = m_clients.find(package.session()); + if (it == m_clients.end()) + break; + + if (!it->second->m_in_use) + { + it->second->m_in_use = true; + return it->second; + } + m_cond_session_ready.wait(lock); } - - // only working on HTTP_Request packages now + FrontendPtr f(new Frontend); + m_clients[package.session()] = f; + f->m_in_use = true; + return f; +} - // see if HTTP request is already being executed.. - // we consider only the SRU - GET case.. - if (zgdu_req->u.HTTP_Request->content_len == 0) +void yf::SRUtoZ3950::Impl::release_frontend(mp::Package &package) +{ + boost::mutex::scoped_lock lock(m_mutex_session); + std::map::iterator it; + + it = m_clients.find(package.session()); + if (it != m_clients.end()) { - const char *path = zgdu_req->u.HTTP_Request->path; - boost::mutex::scoped_lock lock(m_mutex); - while (1) + if (package.session().is_closed()) { - ActiveUrlMap::iterator it = m_active_urls.find(path); - if (it == m_active_urls.end()) - { - m_active_urls[path] = 1; - break; - } - yaz_log(YLOG_LOG, "Waiting for %s to complete", path); - m_cond_url_ready.wait(lock); + m_clients.erase(it); + } + else + { + it->second->m_in_use = false; } + m_cond_session_ready.notify_all(); } - sru(package, zgdu_req); - if (zgdu_req->u.HTTP_Request->content_len == 0) - { - const char *path = zgdu_req->u.HTTP_Request->path; - boost::mutex::scoped_lock lock(m_mutex); +} - ActiveUrlMap::iterator it = m_active_urls.find(path); +void yf::SRUtoZ3950::Impl::process(mp::Package &package) +{ + FrontendPtr f = get_frontend(package); + + Z_GDU *zgdu_req = package.request().get(); - m_active_urls.erase(it); - m_cond_url_ready.notify_all(); + if (zgdu_req && zgdu_req->which == Z_GDU_HTTP_Request) + { + if (zgdu_req->u.HTTP_Request->content_len == 0) + { + const char *path = zgdu_req->u.HTTP_Request->path; + boost::mutex::scoped_lock lock(m_url_mutex); + while (1) + { + ActiveUrlMap::iterator it = m_active_urls.find(path); + if (it == m_active_urls.end()) + { + m_active_urls[path] = 1; + break; + } + yaz_log(YLOG_LOG, "Waiting for %s to complete", path); + m_cond_url_ready.wait(lock); + } + } + sru(package, zgdu_req); + if (zgdu_req && zgdu_req->u.HTTP_Request->content_len == 0) + { + const char *path = zgdu_req->u.HTTP_Request->path; + boost::mutex::scoped_lock lock(m_url_mutex); + + ActiveUrlMap::iterator it = m_active_urls.find(path); + + m_active_urls.erase(it); + m_cond_url_ready.notify_all(); + } } + release_frontend(package); } - bool yf::SRUtoZ3950::Impl::z3950_init_request(mp::Package &package, mp::odr &odr_en, diff --git a/src/filter_sru_to_z3950.hpp b/src/filter_sru_to_z3950.hpp index a4fde58..8d93233 100644 --- a/src/filter_sru_to_z3950.hpp +++ b/src/filter_sru_to_z3950.hpp @@ -20,14 +20,17 @@ Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA #define FILTER_SRU_TO_Z3950_HPP #include +#include #include namespace metaproxy_1 { namespace filter { class SRUtoZ3950 : public Base { + class Frontend; class Impl; boost::scoped_ptr m_p; + typedef boost::shared_ptr FrontendPtr; public: SRUtoZ3950(); ~SRUtoZ3950(); -- 1.7.10.4