Merge branch 'master' into graceful_stop
authorAdam Dickmeiss <adam@indexdata.dk>
Tue, 17 Apr 2012 10:49:12 +0000 (12:49 +0200)
committerAdam Dickmeiss <adam@indexdata.dk>
Tue, 17 Apr 2012 10:49:12 +0000 (12:49 +0200)
src/filter_frontend_net.cpp
src/filter_sru_to_z3950.cpp
src/filter_sru_to_z3950.hpp
src/filter_zoom.cpp
src/test_thread_pool_observer.cpp
src/thread_pool_observer.cpp
src/thread_pool_observer.hpp

index 216d740..c7450c1 100644 (file)
@@ -103,7 +103,7 @@ namespace metaproxy_1 {
         ~ThreadPoolPackage();
         IThreadPoolMsg *handle();
         void result(const char *t_info);
-        
+        bool cleanup(void *info);
     private:
         yaz_timing_t timer;
         mp::ZAssocChild *m_assoc_child;
@@ -155,6 +155,13 @@ mp::ThreadPoolPackage::~ThreadPoolPackage()
     delete m_package;
 }
 
+bool mp::ThreadPoolPackage::cleanup(void *info)
+{
+    mp::Session *ses = (mp::Session *) info;
+
+    return *ses == m_package->session();
+}
+
 void mp::ThreadPoolPackage::result(const char *t_info)
 {
     m_assoc_child->m_no_requests--;
@@ -284,7 +291,8 @@ void mp::ZAssocChild::failNotify()
     mp::ThreadPoolPackage *tp = new mp::ThreadPoolPackage(p, this,
                                                           m_msg_config);
     p->copy_route(*m_package);
-    m_thread_pool_observer->put(tp);  
+    m_thread_pool_observer->cleanup(tp, &m_session);
+    m_thread_pool_observer->put(tp);
 }
 
 void mp::ZAssocChild::timeoutNotify()
index 93196e1..75c0da4 100644 (file)
@@ -48,18 +48,32 @@ namespace yf = mp::filter;
 
 namespace metaproxy_1 {
     namespace filter {
+        class SRUtoZ3950::Frontend : boost::noncopyable {
+            friend class Impl;
+            bool m_in_use;
+        public:
+            Frontend();
+            ~Frontend();
+        };
         class SRUtoZ3950::Impl {
         public:
             void configure(const xmlNode *xmlnode);
             void process(metaproxy_1::Package &package);
         private:
+            FrontendPtr get_frontend(mp::Package &package);
+            void release_frontend(mp::Package &package);
             std::map<std::string, const xmlNode *> m_database_explain;
 
             typedef std::map<std::string, int> ActiveUrlMap;
 
-            boost::mutex m_mutex;
+            boost::mutex m_url_mutex;
             boost::condition m_cond_url_ready;
             ActiveUrlMap m_active_urls;
+
+
+            boost::mutex m_mutex_session;
+            boost::condition m_cond_session_ready;
+            std::map<mp::Session, FrontendPtr> m_clients;            
         private:
             void sru(metaproxy_1::Package &package, Z_GDU *zgdu_req);
             int z3950_build_query(
@@ -242,8 +256,6 @@ void yf::SRUtoZ3950::Impl::sru(mp::Package &package, Z_GDU *zgdu_req)
     if (sru_pdu_req->which == Z_SRW_explain_request)
     {
         Z_SRW_explainRequest *er_req = sru_pdu_req->u.explain_request;
-        //mp_util::build_simple_explain(package, odr_en, sru_pdu_res, 
-        //                           sruinfo, er_req);
         mp_util::build_sru_explain(package, odr_en, sru_pdu_res, 
                                    sruinfo, explainnode, er_req);
     }
@@ -321,51 +333,100 @@ void yf::SRUtoZ3950::Impl::sru(mp::Package &package, Z_GDU *zgdu_req)
 }
 
 
-void yf::SRUtoZ3950::Impl::process(mp::Package &package)
+yf::SRUtoZ3950::Frontend::Frontend() :  m_in_use(true)
 {
-    Z_GDU *zgdu_req = package.request().get();
+}
+
+yf::SRUtoZ3950::Frontend::~Frontend()
+{
+}
+
+
+yf::SRUtoZ3950::FrontendPtr yf::SRUtoZ3950::Impl::get_frontend(
+    mp::Package &package)
+{
+    boost::mutex::scoped_lock lock(m_mutex_session);
 
-    // ignoring all non HTTP_Request packages
-    if (!zgdu_req || !(zgdu_req->which == Z_GDU_HTTP_Request))
+    std::map<mp::Session,yf::SRUtoZ3950::FrontendPtr>::iterator it;
+    
+    while (true)
     {
-        package.move();
-        return;
+        it = m_clients.find(package.session());
+        if (it == m_clients.end())
+            break;
+        
+        if (!it->second->m_in_use)
+        {
+            it->second->m_in_use = true;
+            return it->second;
+        }
+        m_cond_session_ready.wait(lock);
     }
-    
-    // only working on HTTP_Request packages now
+    FrontendPtr f(new Frontend);
+    m_clients[package.session()] = f;
+    f->m_in_use = true;
+    return f;
+}
 
-    // see if HTTP request is already being executed..
-    // we consider only the SRU - GET case..
-    if (zgdu_req->u.HTTP_Request->content_len == 0)
+void yf::SRUtoZ3950::Impl::release_frontend(mp::Package &package)
+{
+    boost::mutex::scoped_lock lock(m_mutex_session);
+    std::map<mp::Session,FrontendPtr>::iterator it;
+    
+    it = m_clients.find(package.session());
+    if (it != m_clients.end())
     {
-        const char *path = zgdu_req->u.HTTP_Request->path;
-        boost::mutex::scoped_lock lock(m_mutex);
-        while (1)
+        if (package.session().is_closed())
         {
-            ActiveUrlMap::iterator it = m_active_urls.find(path);
-            if (it == m_active_urls.end())
-            {
-                m_active_urls[path] = 1;
-                break;
-            }
-            yaz_log(YLOG_LOG, "Waiting for %s to complete", path);
-            m_cond_url_ready.wait(lock);
+            m_clients.erase(it);
         }
+        else
+        {
+            it->second->m_in_use = false;
+        }
+        m_cond_session_ready.notify_all();
     }
-    sru(package, zgdu_req);
-    if (zgdu_req->u.HTTP_Request->content_len == 0)
-    {
-        const char *path = zgdu_req->u.HTTP_Request->path;
-        boost::mutex::scoped_lock lock(m_mutex);
+}
 
-        ActiveUrlMap::iterator it = m_active_urls.find(path);
+void yf::SRUtoZ3950::Impl::process(mp::Package &package)
+{
+    FrontendPtr f = get_frontend(package);
+
+    Z_GDU *zgdu_req = package.request().get();
 
-        m_active_urls.erase(it);
-        m_cond_url_ready.notify_all();
+    if (zgdu_req && zgdu_req->which == Z_GDU_HTTP_Request)
+    {
+        if (zgdu_req->u.HTTP_Request->content_len == 0)
+        {
+            const char *path = zgdu_req->u.HTTP_Request->path;
+            boost::mutex::scoped_lock lock(m_url_mutex);
+            while (1)
+            {
+                ActiveUrlMap::iterator it = m_active_urls.find(path);
+                if (it == m_active_urls.end())
+                {
+                    m_active_urls[path] = 1;
+                    break;
+                }
+                yaz_log(YLOG_LOG, "Waiting for %s to complete", path);
+                m_cond_url_ready.wait(lock);
+            }
+        }
+        sru(package, zgdu_req);
+        if (zgdu_req && zgdu_req->u.HTTP_Request->content_len == 0)
+        {
+            const char *path = zgdu_req->u.HTTP_Request->path;
+            boost::mutex::scoped_lock lock(m_url_mutex);
+            
+            ActiveUrlMap::iterator it = m_active_urls.find(path);
+            
+            m_active_urls.erase(it);
+            m_cond_url_ready.notify_all();
+        }
     }
+    release_frontend(package);
 }
 
-
 bool 
 yf::SRUtoZ3950::Impl::z3950_init_request(mp::Package &package, 
                                          mp::odr &odr_en,
@@ -450,27 +511,14 @@ yf::SRUtoZ3950::Impl::z3950_init_request(mp::Package &package,
     return false;
 }
 
-bool 
-yf::SRUtoZ3950::Impl::z3950_close_request(mp::Package &package) const
+bool yf::SRUtoZ3950::Impl::z3950_close_request(mp::Package &package) const
 {
-    // prepare and close Z3950 package 
     Package z3950_package(package.session(), package.origin());
     z3950_package.copy_filter(package);
     z3950_package.session().close();
 
-    // set close APDU
-    //mp::odr odr_en(ODR_ENCODE);
-    //Z_APDU *apdu = zget_APDU(odr_en, Z_APDU_close);
-    //z3950_package.request() = apdu;
-
     z3950_package.move();
 
-    // check successful close response
-    //Z_GDU *z3950_gdu = z3950_package.response().get();
-    //if (z3950_gdu && z3950_gdu->which == Z_GDU_Z3950 
-    //    && z3950_gdu->u.z3950->which == Z_APDU_close)
-    //    return true;
-
     if (z3950_package.session().is_closed())
     {
         return true;
@@ -491,7 +539,6 @@ bool yf::SRUtoZ3950::Impl::z3950_search_request(mp::Package &package,
     Package z3950_package(package.session(), package.origin());
     z3950_package.copy_filter(package);
 
-    //add stuff in z3950 apdu
     Z_APDU *apdu = zget_APDU(odr_en, Z_APDU_searchRequest);
     Z_SearchRequest *z_searchRequest = apdu->u.searchRequest;
 
@@ -515,7 +562,6 @@ bool yf::SRUtoZ3950::Impl::z3950_search_request(mp::Package &package,
                 = odr_strdup(odr_en, "Default");
     }
 
-    // z3950'fy query
     Z_Query *z_query = (Z_Query *) odr_malloc(odr_en, sizeof(Z_Query));
     z_searchRequest->query = z_query;
  
@@ -532,17 +578,10 @@ bool yf::SRUtoZ3950::Impl::z3950_search_request(mp::Package &package,
 
     z3950_package.request() = apdu;
         
-    // send Z39.50 package off to backend
     z3950_package.move();
 
-
     Z_GDU *z3950_gdu = z3950_package.response().get();
 
-    //TODO: check success condition
-    //int yaz_diag_bib1_to_srw (int bib1_code);
-    //int yaz_diag_srw_to_bib1(int srw_code);
-    //Se kode i src/seshigh.c (srw_bend_search, srw_bend_init).
-
     if (!z3950_gdu || z3950_gdu->which != Z_GDU_Z3950 
         || z3950_gdu->u.z3950->which != Z_APDU_searchResponse
         || !z3950_gdu->u.z3950->u.searchResponse
@@ -555,30 +594,20 @@ bool yf::SRUtoZ3950::Impl::z3950_search_request(mp::Package &package,
         return false;
     }
     
-    // everything fine, continuing
     Z_SearchResponse *sr = z3950_gdu->u.z3950->u.searchResponse;
 
-    // checking non surrogate diagnostics in Z3950 search response package
     if (!z3950_to_srw_diagnostics_ok(odr_en, sru_pdu_res->u.response, 
                                      sr->records))
     {
         return false;
     }
 
-    // Finally, roll on and srw'fy number of records
-    sru_pdu_res->u.response->numberOfRecords 
+    sru_pdu_res->u.response->numberOfRecords
         = odr_intdup(odr_en, *sr->resultCount);
-    
-    // srw'fy nextRecordPosition
-    //sru_pdu_res->u.response->nextRecordPosition 
-    //    = (int *) odr_malloc(odr_en, sizeof(int *));
-    //*(sru_pdu_res->u.response->nextRecordPosition) = 1;
-
     return true;
 }
 
-bool 
-yf::SRUtoZ3950::Impl::z3950_present_request(
+bool yf::SRUtoZ3950::Impl::z3950_present_request(
     mp::Package &package, 
     mp::odr &odr_en,
     Z_SRW_PDU *sru_pdu_res,
@@ -780,8 +809,9 @@ yf::SRUtoZ3950::Impl::z3950_present_request(
     return true;
 }
 
-int yf::SRUtoZ3950::Impl::z3950_build_query(mp::odr &odr_en, Z_Query *z_query, 
-                                            const Z_SRW_searchRetrieveRequest *req
+int yf::SRUtoZ3950::Impl::z3950_build_query(
+    mp::odr &odr_en, Z_Query *z_query, 
+    const Z_SRW_searchRetrieveRequest *req
     ) const
 {        
     if (req->query_type == Z_SRW_query_type_cql)
@@ -822,12 +852,11 @@ int yf::SRUtoZ3950::Impl::z3950_build_query(mp::odr &odr_en, Z_Query *z_query,
     return YAZ_SRW_MANDATORY_PARAMETER_NOT_SUPPLIED;
 }
 
-
-bool 
-yf::SRUtoZ3950::Impl::z3950_to_srw_diagnostics_ok(mp::odr &odr_en, 
-                                                  Z_SRW_searchRetrieveResponse 
-                                                  *sru_res,
-                                                  Z_Records *records) const
+bool yf::SRUtoZ3950::Impl::z3950_to_srw_diagnostics_ok(
+    mp::odr &odr_en, 
+    Z_SRW_searchRetrieveResponse 
+    *sru_res,
+    Z_Records *records) const
 {
     // checking non surrogate diagnostics in Z3950 present response package
     if (records 
@@ -841,11 +870,10 @@ yf::SRUtoZ3950::Impl::z3950_to_srw_diagnostics_ok(mp::odr &odr_en,
     return true;
 }
 
-
-int 
-yf::SRUtoZ3950::Impl::z3950_to_srw_diag(mp::odr &odr_en, 
-                                        Z_SRW_searchRetrieveResponse *sru_res,
-                                        Z_DefaultDiagFormat *ddf) const
+int yf::SRUtoZ3950::Impl::z3950_to_srw_diag(
+    mp::odr &odr_en, 
+    Z_SRW_searchRetrieveResponse *sru_res,
+    Z_DefaultDiagFormat *ddf) const
 {
     int bib1_code = *ddf->condition;
     sru_res->num_diagnostics = 1;
@@ -857,8 +885,6 @@ yf::SRUtoZ3950::Impl::z3950_to_srw_diag(mp::odr &odr_en,
     return 0;
 }
 
-
-
 static mp::filter::Base* filter_creator()
 {
     return new mp::filter::SRUtoZ3950;
index 16fb7e7..8d93233 100644 (file)
@@ -16,19 +16,21 @@ along with this program; if not, write to the Free Software
 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
 */
 
-// Filter that does nothing. Use as sru_to_z3950 for new filters 
 #ifndef FILTER_SRU_TO_Z3950_HPP
 #define FILTER_SRU_TO_Z3950_HPP
 
 #include <boost/scoped_ptr.hpp>
+#include <boost/shared_ptr.hpp>
 
 #include <metaproxy/filter.hpp>
 
 namespace metaproxy_1 {
     namespace filter {
         class SRUtoZ3950 : public Base {
+            class Frontend;
             class Impl;
             boost::scoped_ptr<Impl> m_p;
+            typedef boost::shared_ptr<Frontend> FrontendPtr;
         public:
             SRUtoZ3950();
             ~SRUtoZ3950();
index b3c989f..bf6f4ab 100644 (file)
@@ -1407,28 +1407,20 @@ Z_Records *yf::Zoom::Frontend::get_records(Package &package,
     {  // only return records if no error and at least one record
 
         const char *xsl_parms[3];
-        char cproxy_host[1024];
-
+        mp::wrbuf cproxy_host;
+        
         if (b->enable_cproxy && b->content_session_id.length())
         {
-            sprintf(cproxy_host, "%s.%s/",
-                    b->content_session_id.c_str(),
-                    m_p->content_proxy_server.c_str());
-            
-            char *q_cproxy_host = (char *) 
-                odr_malloc(odr, strlen(cproxy_host) + 3);
-            strcpy(q_cproxy_host, "\"");
-            strcat(q_cproxy_host, cproxy_host);
-            strcat(q_cproxy_host, "\"");
-
+            wrbuf_printf(cproxy_host, "\"%s.%s/\"",
+                         b->content_session_id.c_str(),
+                         m_p->content_proxy_server.c_str());
             xsl_parms[0] = "cproxyhost";
-            xsl_parms[1] = q_cproxy_host;
+            xsl_parms[1] = wrbuf_cstr(cproxy_host);
             xsl_parms[2] = 0;
         }
         else
         {
             xsl_parms[0] = 0;
-            *cproxy_host = '\0';
         }
 
         char *odr_database = odr_strdup(odr,
index 9700db1..89f8f04 100644 (file)
@@ -40,6 +40,7 @@ class My_Msg : public mp::IThreadPoolMsg {
 public:
     mp::IThreadPoolMsg *handle();
     void result(const char *t_info);
+    bool cleanup(void *info);
     int m_val;
     My_Timer_Thread *m_timer;
 };
@@ -70,6 +71,11 @@ mp::IThreadPoolMsg *My_Msg::handle()
     return res;
 }
 
+bool My_Msg::cleanup(void *info)
+{
+    return false;
+}
+
 void My_Msg::result(const char *t_info)
 {
     m_timer->m_sum += m_val;
index d630fd9..ce37235 100644 (file)
@@ -206,14 +206,30 @@ void ThreadPoolSocketObserver::run(void *p)
     }
 }
 
+void ThreadPoolSocketObserver::cleanup(IThreadPoolMsg *m, void *info)
+{
+    boost::mutex::scoped_lock input_lock(m_p->m_mutex_input_data);
+
+    std::deque<IThreadPoolMsg *>::iterator it = m_p->m_input.begin();
+    while (it != m_p->m_input.end())
+    {
+        if ((*it)->cleanup(info))
+            it = m_p->m_input.erase(it);
+        else
+            it++;
+    }
+}
+
 void ThreadPoolSocketObserver::put(IThreadPoolMsg *m)
 {
     boost::mutex::scoped_lock input_lock(m_p->m_mutex_input_data);
+
     while (m_p->m_input.size() >= m_p->m_no_threads * queue_size_per_thread)
         m_p->m_cond_input_full.wait(input_lock);
     m_p->m_input.push_back(m);
     m_p->m_cond_input_data.notify_one();
 }
+
 /*
  * Local variables:
  * c-basic-offset: 4
index 67aee0b..73ee78f 100644 (file)
@@ -30,6 +30,7 @@ namespace metaproxy_1 {
         virtual IThreadPoolMsg *handle() = 0;
         virtual void result(const char *info) = 0;
         virtual ~IThreadPoolMsg();
+        virtual bool cleanup(void *info) = 0;
     };
 
     class ThreadPoolSocketObserver : public yazpp_1::ISocketObserver {
@@ -40,6 +41,7 @@ namespace metaproxy_1 {
                                  int no_threads);
         virtual ~ThreadPoolSocketObserver();
         void put(IThreadPoolMsg *m);
+        void cleanup(IThreadPoolMsg *m, void *info);
         IThreadPoolMsg *get();
         void run(void *p);
     private: