session_shared: only reuse sets with matching db
[metaproxy-moved-to-github.git] / src / filter_session_shared.cpp
index 720a71d..55c45a7 100644 (file)
@@ -1,5 +1,5 @@
 /* This file is part of Metaproxy.
-   Copyright (C) 2005-2009 Index Data
+   Copyright (C) 2005-2012 Index Data
 
 Metaproxy is free software; you can redistribute it and/or modify it under
 the terms of the GNU General Public License as published by the Free
@@ -18,8 +18,8 @@ Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
 
 #include "config.hpp"
 
-#include "filter.hpp"
-#include "package.hpp"
+#include <metaproxy/filter.hpp>
+#include <metaproxy/package.hpp>
 
 #include <boost/thread/mutex.hpp>
 #include <boost/thread/condition.hpp>
@@ -28,7 +28,7 @@ Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
 #include <boost/shared_ptr.hpp>
 #include <boost/format.hpp>
 
-#include "util.hpp"
+#include <metaproxy/util.hpp>
 #include "filter_session_shared.hpp"
 
 #include <yaz/log.h>
@@ -85,9 +85,10 @@ namespace metaproxy_1 {
                 const yazpp_1::Yaz_Z_Query &query);
             bool search(
                 Package &frontend_package,
+                Package &search_package,
                 const Z_APDU *apdu_req,
                 const BackendInstancePtr bp,
-                bool &fatal_error);
+                Z_Records **z_records);
         };
         // backend connection instance
         class SessionShared::BackendInstance {
@@ -103,6 +104,7 @@ namespace metaproxy_1 {
             time_t m_time_last_use;
             mp::Package * m_close_package;
             ~BackendInstance();
+            void timestamp();
         };
         // backends of some class (all with same InitKey)
         class SessionShared::BackendClass : boost::noncopyable {
@@ -160,7 +162,8 @@ namespace metaproxy_1 {
                          BackendInstancePtr &found_backend,
                          BackendSetPtr &found_set);
             void override_set(BackendInstancePtr &found_backend,
-                              std::string &result_set_id);
+                              std::string &result_set_id,
+                              const Databases &databases);
 
             Rep *m_p;
             BackendClassPtr m_backend_class;
@@ -179,6 +182,7 @@ namespace metaproxy_1 {
         private:
             void init(Package &package, const Z_GDU *gdu,
                       FrontendPtr frontend);
+            void start();
             boost::mutex m_mutex;
             boost::condition m_cond_session_ready;
             std::map<mp::Session, FrontendPtr> m_clients;
@@ -189,6 +193,7 @@ namespace metaproxy_1 {
             int m_resultset_ttl;
             int m_resultset_max;
             int m_session_ttl;
+            bool m_optimize_search;
         };
     }
 }
@@ -328,10 +333,15 @@ yf::SessionShared::BackendClass::get_backend(
 void yf::SessionShared::BackendClass::use_backend(BackendInstancePtr backend)
 {
     backend->m_in_use = true;
-    time(&backend->m_time_last_use);
     backend->m_sequence_this = m_sequence_top++;
 }
 
+void yf::SessionShared::BackendInstance::timestamp()
+{
+    assert(m_in_use);
+    time(&m_time_last_use);
+}
+
 yf::SessionShared::BackendInstance::~BackendInstance()
 {
     delete m_close_package;
@@ -377,13 +387,17 @@ yf::SessionShared::BackendInstancePtr yf::SessionShared::BackendClass::create_ba
 
     m_named_result_sets = false;
     Z_GDU *gdu = init_package.response().get();
-    if (!init_package.session().is_closed()
-        && gdu && gdu->which == Z_GDU_Z3950 
-        && gdu->u.z3950->which == Z_APDU_initResponse)
+    if (init_package.session().is_closed())
     {
+        /* already closed. We don't know why */
+        return null;
+    }
+    else if (gdu && gdu->which == Z_GDU_Z3950 
+             && gdu->u.z3950->which == Z_APDU_initResponse
+             && *gdu->u.z3950->u.initResponse->result)
+    {
+        /* successful init response */
         Z_InitResponse *res = gdu->u.z3950->u.initResponse;
-        if (!*res->result)
-            return null;
         m_init_response = gdu->u.z3950;
         if (ODR_MASK_GET(res->options, Z_Options_namedResultSets))
         {
@@ -392,7 +406,10 @@ yf::SessionShared::BackendInstancePtr yf::SessionShared::BackendClass::create_ba
     }
     else
     {
-        // did not receive an init response or closed
+        /* not init or init rejected */
+        init_package.copy_filter(frontend_package);
+        init_package.session().close();
+        init_package.move();
         return null;
     }
     bp->m_in_use = true;
@@ -491,16 +508,18 @@ yf::SessionShared::BackendSet::BackendSet(
     timestamp();
 }
 
+static int get_diagnostic(Z_DefaultDiagFormat *r)
+{
+    return *r->condition;
+}
+
 bool yf::SessionShared::BackendSet::search(
     mp::Package &frontend_package,
+    mp::Package &search_package,
     const Z_APDU *frontend_apdu,
     const BackendInstancePtr bp,
-    bool & fatal_error)
+    Z_Records **z_records)
 {
-    Package search_package(bp->m_session, frontend_package.origin());
-
-    search_package.copy_filter(frontend_package);
-
     mp::odr odr;
     Z_APDU *apdu_req = zget_APDU(odr, Z_APDU_searchRequest);
     Z_SearchRequest *req = apdu_req->u.searchRequest;
@@ -516,49 +535,21 @@ bool yf::SessionShared::BackendSet::search(
     for (; it != m_databases.end(); it++)
         req->databaseNames[i++] = odr_strdup(odr, it->c_str());
 
+    if (frontend_apdu->which == Z_APDU_searchRequest)
+        req->preferredRecordSyntax =
+            frontend_apdu->u.searchRequest->preferredRecordSyntax;
+
     search_package.request() = apdu_req;
 
     search_package.move();
-    fatal_error = false;  // assume backend session is good
 
-    Z_Records *z_records_diag = 0;
     Z_GDU *gdu = search_package.response().get();
     if (!search_package.session().is_closed()
         && gdu && gdu->which == Z_GDU_Z3950 
         && gdu->u.z3950->which == Z_APDU_searchResponse)
     {
         Z_SearchResponse *b_resp = gdu->u.z3950->u.searchResponse;
-        if (b_resp->records)
-        {
-            if (b_resp->records->which == Z_Records_NSD 
-                || b_resp->records->which == Z_Records_multipleNSD)
-                z_records_diag = b_resp->records;
-        }
-        if (z_records_diag)
-        {
-            // there could be diagnostics that are so bad.. that 
-            // we simply mark the error as fatal..  For now we assume
-            // we can resume
-            if (frontend_apdu->which == Z_APDU_searchRequest)
-            {
-                Z_APDU *f_apdu = odr.create_searchResponse(frontend_apdu, 
-                                                           0, 0);
-                Z_SearchResponse *f_resp = f_apdu->u.searchResponse;
-                *f_resp->searchStatus = *b_resp->searchStatus;
-                f_resp->records = z_records_diag;
-                frontend_package.response() = f_apdu;
-                return false;
-            }
-            if (frontend_apdu->which == Z_APDU_presentRequest)
-            {
-                Z_APDU *f_apdu = odr.create_presentResponse(frontend_apdu, 
-                                                            0, 0);
-                Z_PresentResponse *f_resp = f_apdu->u.presentResponse;
-                f_resp->records = z_records_diag;
-                frontend_package.response() = f_apdu;
-                return false;
-            }
-        }
+        *z_records = b_resp->records;
         m_result_set_size = *b_resp->resultCount;
         return true;
     }
@@ -573,19 +564,20 @@ bool yf::SessionShared::BackendSet::search(
         f_apdu = odr.create_close(
             frontend_apdu, YAZ_BIB1_TEMPORARY_SYSTEM_ERROR, 0);
     frontend_package.response() = f_apdu;
-    fatal_error = true; // weired response.. bad backend
     return false;
 }
 
 void yf::SessionShared::Frontend::override_set(
     BackendInstancePtr &found_backend,
-    std::string &result_set_id)
+    std::string &result_set_id,
+    const Databases &databases)
 {
     BackendClassPtr bc = m_backend_class;
     BackendInstanceList::const_iterator it = bc->m_backend_list.begin();
     time_t now;
     time(&now);
-    
+
+    size_t max_sets = bc->m_named_result_sets ? bc->m_backend_set_max : 1;
     for (; it != bc->m_backend_list.end(); it++)
     {
         if (!(*it)->m_in_use)
@@ -593,8 +585,10 @@ void yf::SessionShared::Frontend::override_set(
             BackendSetList::iterator set_it = (*it)->m_sets.begin();
             for (; set_it != (*it)->m_sets.end(); set_it++)
             {
-                if (now >= (*set_it)->m_time_last_use &&
-                    now - (*set_it)->m_time_last_use > bc->m_backend_set_ttl)
+                if ((max_sets > 1 || (*set_it)->m_databases == databases)
+                    &&
+                    (now < (*set_it)->m_time_last_use ||
+                     now - (*set_it)->m_time_last_use >= bc->m_backend_set_ttl))
                 {
                     found_backend = *it;
                     result_set_id = (*set_it)->m_result_set_id;
@@ -604,7 +598,6 @@ void yf::SessionShared::Frontend::override_set(
             }
         }
     }
-    size_t max_sets = bc->m_named_result_sets ? bc->m_backend_set_max : 1;
     for (it = bc->m_backend_list.begin(); it != bc->m_backend_list.end(); it++)
     {
         if (!(*it)->m_in_use && (*it)->m_sets.size() < max_sets)
@@ -638,31 +631,33 @@ restart:
     BackendClassPtr bc = m_backend_class;
     {
         boost::mutex::scoped_lock lock(bc->m_mutex_backend_class);
-     
-        // look at each backend and see if we have a similar search
-        BackendInstanceList::const_iterator it = bc->m_backend_list.begin();
         
-        for (; it != bc->m_backend_list.end(); it++)
+        if (m_p->m_optimize_search)
         {
-            if (!(*it)->m_in_use)
+            // look at each backend and see if we have a similar search
+            BackendInstanceList::const_iterator it = bc->m_backend_list.begin();
+            for (; it != bc->m_backend_list.end(); it++)
             {
-                BackendSetList::const_iterator set_it = (*it)->m_sets.begin();
-                for (; set_it != (*it)->m_sets.end(); set_it++)
+                if (!(*it)->m_in_use)
                 {
-                    if ((*set_it)->m_databases == databases 
-                        && query.match(&(*set_it)->m_query))
+                    BackendSetList::const_iterator set_it = (*it)->m_sets.begin();
+                    for (; set_it != (*it)->m_sets.end(); set_it++)
                     {
-                        found_set = *set_it;
-                        found_backend = *it;
-                        bc->use_backend(found_backend);
-                        found_set->timestamp();
-                        // found matching set. No need to search again
-                        return;
+                        if ((*set_it)->m_databases == databases
+                            && query.match(&(*set_it)->m_query))
+                        {
+                            found_set = *set_it;
+                            found_backend = *it;
+                            bc->use_backend(found_backend);
+                            found_set->timestamp();
+                            // found matching set. No need to search again
+                            return;
+                        }
                     }
                 }
             }
         }
-        override_set(found_backend, result_set_id);
+        override_set(found_backend, result_set_id, databases);
         if (found_backend)
             bc->use_backend(found_backend);
     }
@@ -702,17 +697,77 @@ restart:
         else
             result_set_id = "default";
     }
+    found_backend->timestamp();
+
     // we must search ...
     BackendSetPtr new_set(new BackendSet(result_set_id,
                                          databases, query));
-    bool fatal_error = false;
-    if (!new_set->search(package, apdu_req, found_backend, fatal_error))
+    Z_Records *z_records = 0;
+
+    Package search_package(found_backend->m_session, package.origin());
+    search_package.copy_filter(package);
+
+    if (!new_set->search(package, search_package,
+                         apdu_req, found_backend, &z_records))
     {
-        if (fatal_error)
+        bc->remove_backend(found_backend);
+        return; // search error 
+    }
+
+    if (z_records)
+    {
+        int condition = 0;
+        if (z_records->which == Z_Records_NSD)
+        {
+            condition =
+                get_diagnostic(z_records->u.nonSurrogateDiagnostic);
+        }
+        else if (z_records->which == Z_Records_multipleNSD)
+        {
+            if (z_records->u.multipleNonSurDiagnostics->num_diagRecs >= 1
+                && 
+                
+                z_records->u.multipleNonSurDiagnostics->diagRecs[0]->which ==
+                Z_DiagRec_defaultFormat)
+            {
+                condition = get_diagnostic(
+                    z_records->u.multipleNonSurDiagnostics->diagRecs[0]->u.defaultFormat);
+                
+            }
+        }
+        if (!session_restarted &&
+            condition == YAZ_BIB1_TEMPORARY_SYSTEM_ERROR)
+        {
             bc->remove_backend(found_backend);
-        else
+            session_restarted = true;
+            found_backend.reset();
+            goto restart;
+
+        }
+
+        if (condition)
+        {
+            mp::odr odr;
+            if (apdu_req->which == Z_APDU_searchRequest)
+            {
+                Z_APDU *f_apdu = odr.create_searchResponse(apdu_req, 
+                                                           0, 0);
+                Z_SearchResponse *f_resp = f_apdu->u.searchResponse;
+                *f_resp->searchStatus = Z_SearchResponse_none;
+                f_resp->records = z_records;
+                package.response() = f_apdu;
+            }
+            if (apdu_req->which == Z_APDU_presentRequest)
+            {
+                Z_APDU *f_apdu = odr.create_presentResponse(apdu_req, 
+                                                            0, 0);
+                Z_PresentResponse *f_resp = f_apdu->u.presentResponse;
+                f_resp->records = z_records;
+                package.response() = f_apdu;
+            }
             bc->release_backend(found_backend);
-        return; // search error 
+            return; // search error 
+        }
     }
     if (!session_restarted && new_set->m_result_set_size < 0)
     {
@@ -821,7 +876,8 @@ void yf::SessionShared::Frontend::present(mp::Package &package,
         Z_APDU *f_apdu_res = odr.create_presentResponse(apdu_req, 0, 0);
         Z_PresentResponse *f_resp = f_apdu_res->u.presentResponse;
 
-        yaz_log(YLOG_LOG, "Found %d+%d records in cache %p",
+        yaz_log(YLOG_LOG, "Found " ODR_INT_PRINTF "+" ODR_INT_PRINTF 
+                " records in cache %p",
                 *req->resultSetStartPoint,                      
                 *req->numberOfRecordsRequested,
                 &found_set->m_record_cache);        
@@ -837,6 +893,8 @@ void yf::SessionShared::Frontend::present(mp::Package &package,
         bc->release_backend(found_backend);
         return;
     }
+
+    found_backend->timestamp();
                               
     Z_APDU *p_apdu = zget_APDU(odr, Z_APDU_presentRequest);
     Z_PresentRequest *p_req = p_apdu->u.presentRequest;
@@ -872,7 +930,8 @@ void yf::SessionShared::Frontend::present(mp::Package &package,
 
         if (b_resp->records && b_resp->records->which ==  Z_Records_DBOSD)
         {
-            yaz_log(YLOG_LOG, "Adding %d+%d records to cache %p",
+            yaz_log(YLOG_LOG, "Adding " ODR_INT_PRINTF "+" ODR_INT_PRINTF
+                    " records to cache %p",
                     *req->resultSetStartPoint,                      
                     *f_resp->numberOfRecordsReturned,
                     &found_set->m_record_cache);        
@@ -909,6 +968,7 @@ void yf::SessionShared::Frontend::scan(mp::Package &frontend_package,
     else
     {
         Package scan_package(backend->m_session, frontend_package.origin());
+        backend->timestamp();
         scan_package.copy_filter(frontend_package);
         scan_package.request() = apdu_req;
         scan_package.move();
@@ -946,8 +1006,7 @@ void yf::SessionShared::BackendClass::expire_class()
         {
             bit++;
         }
-        else if ((now >= last_use && now - last_use > m_backend_expiry_ttl)
-            || (now < last_use))
+        else if (now < last_use || now - last_use > m_backend_expiry_ttl)
         {
             mp::odr odr;
             (*bit)->m_close_package->response() = odr.create_close(
@@ -970,7 +1029,7 @@ void yf::SessionShared::Rep::expire()
     {
         boost::xtime xt;
         boost::xtime_get(&xt, boost::TIME_UTC);
-        xt.sec += 30;
+        xt.sec += m_session_ttl / 3;
         boost::thread::sleep(xt);
         
         BackendClassMap::const_iterator b_it = m_backend_map.begin();
@@ -984,6 +1043,11 @@ yf::SessionShared::Rep::Rep()
     m_resultset_ttl = 30;
     m_resultset_max = 10;
     m_session_ttl = 90;
+    m_optimize_search = true;
+}
+
+void yf::SessionShared::Rep::start()
+{
     yf::SessionShared::Worker w(this);
     m_thrds.add_thread(new boost::thread(w));
 }
@@ -995,6 +1059,10 @@ yf::SessionShared::SessionShared() : m_p(new SessionShared::Rep)
 yf::SessionShared::~SessionShared() {
 }
 
+void yf::SessionShared::start() const
+{
+    m_p->start();
+}
 
 yf::SessionShared::Frontend::Frontend(Rep *rep) : m_is_virtual(false), m_p(rep)
 {
@@ -1112,7 +1180,8 @@ void yf::SessionShared::process(mp::Package &package) const
     m_p->release_frontend(package);
 }
 
-void yf::SessionShared::configure(const xmlNode *ptr, bool test_only)
+void yf::SessionShared::configure(const xmlNode *ptr, bool test_only,
+                                  const char *path)
 {
     for (ptr = ptr->children; ptr; ptr = ptr->next)
     {
@@ -1131,6 +1200,11 @@ void yf::SessionShared::configure(const xmlNode *ptr, bool test_only)
                     m_p->m_resultset_max = 
                         mp::xml::get_int(attr->children, 10);
                 }
+                else if (!strcmp((const char *) attr->name, "optimizesearch"))
+                {
+                    m_p->m_optimize_search =
+                        mp::xml::get_bool(attr->children, true);
+                }
                 else
                     throw mp::filter::FilterException(
                         "Bad attribute " + std::string((const char *)