sru_z3950: serialize requests
authorAdam Dickmeiss <adam@indexdata.dk>
Mon, 16 Apr 2012 12:56:48 +0000 (14:56 +0200)
committerAdam Dickmeiss <adam@indexdata.dk>
Tue, 17 Apr 2012 07:59:00 +0000 (09:59 +0200)
This is to ensure that pipelined HTTP requests are handled
properly.

src/filter_sru_to_z3950.cpp
src/filter_sru_to_z3950.hpp

index a4aab88..75c0da4 100644 (file)
@@ -48,18 +48,32 @@ namespace yf = mp::filter;
 
 namespace metaproxy_1 {
     namespace filter {
+        class SRUtoZ3950::Frontend : boost::noncopyable {
+            friend class Impl;
+            bool m_in_use;
+        public:
+            Frontend();
+            ~Frontend();
+        };
         class SRUtoZ3950::Impl {
         public:
             void configure(const xmlNode *xmlnode);
             void process(metaproxy_1::Package &package);
         private:
+            FrontendPtr get_frontend(mp::Package &package);
+            void release_frontend(mp::Package &package);
             std::map<std::string, const xmlNode *> m_database_explain;
 
             typedef std::map<std::string, int> ActiveUrlMap;
 
-            boost::mutex m_mutex;
+            boost::mutex m_url_mutex;
             boost::condition m_cond_url_ready;
             ActiveUrlMap m_active_urls;
+
+
+            boost::mutex m_mutex_session;
+            boost::condition m_cond_session_ready;
+            std::map<mp::Session, FrontendPtr> m_clients;            
         private:
             void sru(metaproxy_1::Package &package, Z_GDU *zgdu_req);
             int z3950_build_query(
@@ -319,51 +333,100 @@ void yf::SRUtoZ3950::Impl::sru(mp::Package &package, Z_GDU *zgdu_req)
 }
 
 
-void yf::SRUtoZ3950::Impl::process(mp::Package &package)
+yf::SRUtoZ3950::Frontend::Frontend() :  m_in_use(true)
 {
-    Z_GDU *zgdu_req = package.request().get();
+}
+
+yf::SRUtoZ3950::Frontend::~Frontend()
+{
+}
+
+
+yf::SRUtoZ3950::FrontendPtr yf::SRUtoZ3950::Impl::get_frontend(
+    mp::Package &package)
+{
+    boost::mutex::scoped_lock lock(m_mutex_session);
 
-    // ignoring all non HTTP_Request packages
-    if (!zgdu_req || !(zgdu_req->which == Z_GDU_HTTP_Request))
+    std::map<mp::Session,yf::SRUtoZ3950::FrontendPtr>::iterator it;
+    
+    while (true)
     {
-        package.move();
-        return;
+        it = m_clients.find(package.session());
+        if (it == m_clients.end())
+            break;
+        
+        if (!it->second->m_in_use)
+        {
+            it->second->m_in_use = true;
+            return it->second;
+        }
+        m_cond_session_ready.wait(lock);
     }
-    
-    // only working on HTTP_Request packages now
+    FrontendPtr f(new Frontend);
+    m_clients[package.session()] = f;
+    f->m_in_use = true;
+    return f;
+}
 
-    // see if HTTP request is already being executed..
-    // we consider only the SRU - GET case..
-    if (zgdu_req->u.HTTP_Request->content_len == 0)
+void yf::SRUtoZ3950::Impl::release_frontend(mp::Package &package)
+{
+    boost::mutex::scoped_lock lock(m_mutex_session);
+    std::map<mp::Session,FrontendPtr>::iterator it;
+    
+    it = m_clients.find(package.session());
+    if (it != m_clients.end())
     {
-        const char *path = zgdu_req->u.HTTP_Request->path;
-        boost::mutex::scoped_lock lock(m_mutex);
-        while (1)
+        if (package.session().is_closed())
         {
-            ActiveUrlMap::iterator it = m_active_urls.find(path);
-            if (it == m_active_urls.end())
-            {
-                m_active_urls[path] = 1;
-                break;
-            }
-            yaz_log(YLOG_LOG, "Waiting for %s to complete", path);
-            m_cond_url_ready.wait(lock);
+            m_clients.erase(it);
+        }
+        else
+        {
+            it->second->m_in_use = false;
         }
+        m_cond_session_ready.notify_all();
     }
-    sru(package, zgdu_req);
-    if (zgdu_req->u.HTTP_Request->content_len == 0)
-    {
-        const char *path = zgdu_req->u.HTTP_Request->path;
-        boost::mutex::scoped_lock lock(m_mutex);
+}
 
-        ActiveUrlMap::iterator it = m_active_urls.find(path);
+void yf::SRUtoZ3950::Impl::process(mp::Package &package)
+{
+    FrontendPtr f = get_frontend(package);
+
+    Z_GDU *zgdu_req = package.request().get();
 
-        m_active_urls.erase(it);
-        m_cond_url_ready.notify_all();
+    if (zgdu_req && zgdu_req->which == Z_GDU_HTTP_Request)
+    {
+        if (zgdu_req->u.HTTP_Request->content_len == 0)
+        {
+            const char *path = zgdu_req->u.HTTP_Request->path;
+            boost::mutex::scoped_lock lock(m_url_mutex);
+            while (1)
+            {
+                ActiveUrlMap::iterator it = m_active_urls.find(path);
+                if (it == m_active_urls.end())
+                {
+                    m_active_urls[path] = 1;
+                    break;
+                }
+                yaz_log(YLOG_LOG, "Waiting for %s to complete", path);
+                m_cond_url_ready.wait(lock);
+            }
+        }
+        sru(package, zgdu_req);
+        if (zgdu_req && zgdu_req->u.HTTP_Request->content_len == 0)
+        {
+            const char *path = zgdu_req->u.HTTP_Request->path;
+            boost::mutex::scoped_lock lock(m_url_mutex);
+            
+            ActiveUrlMap::iterator it = m_active_urls.find(path);
+            
+            m_active_urls.erase(it);
+            m_cond_url_ready.notify_all();
+        }
     }
+    release_frontend(package);
 }
 
-
 bool 
 yf::SRUtoZ3950::Impl::z3950_init_request(mp::Package &package, 
                                          mp::odr &odr_en,
index a4fde58..8d93233 100644 (file)
@@ -20,14 +20,17 @@ Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
 #define FILTER_SRU_TO_Z3950_HPP
 
 #include <boost/scoped_ptr.hpp>
+#include <boost/shared_ptr.hpp>
 
 #include <metaproxy/filter.hpp>
 
 namespace metaproxy_1 {
     namespace filter {
         class SRUtoZ3950 : public Base {
+            class Frontend;
             class Impl;
             boost::scoped_ptr<Impl> m_p;
+            typedef boost::shared_ptr<Frontend> FrontendPtr;
         public:
             SRUtoZ3950();
             ~SRUtoZ3950();