session_shared: restart (error recovery) configurable
[metaproxy-moved-to-github.git] / src / filter_session_shared.cpp
index 98d4497..d733f5e 100644 (file)
@@ -1,5 +1,5 @@
 /* This file is part of Metaproxy.
-   Copyright (C) 2005-2011 Index Data
+   Copyright (C) 2005-2012 Index Data
 
 Metaproxy is free software; you can redistribute it and/or modify it under
 the terms of the GNU General Public License as published by the Free
@@ -104,6 +104,7 @@ namespace metaproxy_1 {
             time_t m_time_last_use;
             mp::Package * m_close_package;
             ~BackendInstance();
+            void timestamp();
         };
         // backends of some class (all with same InitKey)
         class SessionShared::BackendClass : boost::noncopyable {
@@ -161,7 +162,9 @@ namespace metaproxy_1 {
                          BackendInstancePtr &found_backend,
                          BackendSetPtr &found_set);
             void override_set(BackendInstancePtr &found_backend,
-                              std::string &result_set_id);
+                              std::string &result_set_id,
+                              const Databases &databases,
+                              bool out_of_sessions);
 
             Rep *m_p;
             BackendClassPtr m_backend_class;
@@ -192,6 +195,8 @@ namespace metaproxy_1 {
             int m_resultset_max;
             int m_session_ttl;
             bool m_optimize_search;
+            bool m_restart;
+            int m_session_max;
         };
     }
 }
@@ -331,10 +336,15 @@ yf::SessionShared::BackendClass::get_backend(
 void yf::SessionShared::BackendClass::use_backend(BackendInstancePtr backend)
 {
     backend->m_in_use = true;
-    time(&backend->m_time_last_use);
     backend->m_sequence_this = m_sequence_top++;
 }
 
+void yf::SessionShared::BackendInstance::timestamp()
+{
+    assert(m_in_use);
+    time(&m_time_last_use);
+}
+
 yf::SessionShared::BackendInstance::~BackendInstance()
 {
     delete m_close_package;
@@ -454,33 +464,42 @@ void yf::SessionShared::Rep::init(mp::Package &package, const Z_GDU *gdu,
         }
     }
     BackendClassPtr bc = frontend->m_backend_class;
-    BackendInstancePtr backend = bc->get_backend(package);
-    
+    BackendInstancePtr backend;
     mp::odr odr;
-    if (!backend)
+
+    // we only need to get init response from "first" target in
+    // backend class - the assumption being that init response is
+    // same for all
+    if (bc->m_init_response.get() == 0)
     {
-        Z_APDU *apdu = odr.create_initResponse(gdu->u.z3950, 0, 0);
-        *apdu->u.initResponse->result = 0;
-        package.response() = apdu;
-        package.session().close();
+        backend = bc->get_backend(package);
     }
-    else
     {
         boost::mutex::scoped_lock lock(bc->m_mutex_backend_class);
-        yazpp_1::GDU init_response = bc->m_init_response;
-        Z_GDU *response_gdu = init_response.get();
-        mp::util::transfer_referenceId(odr, gdu->u.z3950,
-                                       response_gdu->u.z3950);
-
-        Z_Options *server_options =
-            response_gdu->u.z3950->u.initResponse->options;
-        Z_Options *client_options = &frontend->m_init_options;
-
-        int i;
-        for (i = 0; i<30; i++)
-            if (!ODR_MASK_GET(client_options, i))
-                ODR_MASK_CLEAR(server_options, i);
-        package.response() = init_response;
+        if (bc->m_init_response.get() == 0)
+        {
+            Z_APDU *apdu = odr.create_initResponse(gdu->u.z3950, 0, 0);
+            *apdu->u.initResponse->result = 0;
+            package.response() = apdu;
+            package.session().close();
+        }
+        else
+        {
+            yazpp_1::GDU init_response = bc->m_init_response;
+            Z_GDU *response_gdu = init_response.get();
+            mp::util::transfer_referenceId(odr, gdu->u.z3950,
+                                           response_gdu->u.z3950);
+            
+            Z_Options *server_options =
+                response_gdu->u.z3950->u.initResponse->options;
+            Z_Options *client_options = &frontend->m_init_options;
+            
+            int i;
+            for (i = 0; i < 30; i++)
+                if (!ODR_MASK_GET(client_options, i))
+                    ODR_MASK_CLEAR(server_options, i);
+            package.response() = init_response;
+        }
     }
     if (backend)
         bc->release_backend(backend);
@@ -562,13 +581,16 @@ bool yf::SessionShared::BackendSet::search(
 
 void yf::SessionShared::Frontend::override_set(
     BackendInstancePtr &found_backend,
-    std::string &result_set_id)
+    std::string &result_set_id,
+    const Databases &databases,
+    bool out_of_sessions)
 {
     BackendClassPtr bc = m_backend_class;
     BackendInstanceList::const_iterator it = bc->m_backend_list.begin();
     time_t now;
     time(&now);
-    
+
+    size_t max_sets = bc->m_named_result_sets ? bc->m_backend_set_max : 1;
     for (; it != bc->m_backend_list.end(); it++)
     {
         if (!(*it)->m_in_use)
@@ -576,8 +598,11 @@ void yf::SessionShared::Frontend::override_set(
             BackendSetList::iterator set_it = (*it)->m_sets.begin();
             for (; set_it != (*it)->m_sets.end(); set_it++)
             {
-                if (now >= (*set_it)->m_time_last_use &&
-                    now - (*set_it)->m_time_last_use > bc->m_backend_set_ttl)
+                if ((max_sets > 1 || (*set_it)->m_databases == databases)
+                    &&
+                    (out_of_sessions ||
+                     now < (*set_it)->m_time_last_use ||
+                     now - (*set_it)->m_time_last_use >= bc->m_backend_set_ttl))
                 {
                     found_backend = *it;
                     result_set_id = (*set_it)->m_result_set_id;
@@ -587,7 +612,6 @@ void yf::SessionShared::Frontend::override_set(
             }
         }
     }
-    size_t max_sets = bc->m_named_result_sets ? bc->m_backend_set_max : 1;
     for (it = bc->m_backend_list.begin(); it != bc->m_backend_list.end(); it++)
     {
         if (!(*it)->m_in_use && (*it)->m_sets.size() < max_sets)
@@ -618,9 +642,13 @@ void yf::SessionShared::Frontend::get_set(mp::Package &package,
 
 restart:
     std::string result_set_id;
+    bool out_of_sessions = false;
     BackendClassPtr bc = m_backend_class;
     {
         boost::mutex::scoped_lock lock(bc->m_mutex_backend_class);
+
+        if ((int) bc->m_backend_list.size() >= m_p->m_session_max)
+            out_of_sessions = true;
         
         if (m_p->m_optimize_search)
         {
@@ -647,33 +675,38 @@ restart:
                 }
             }
         }
-        override_set(found_backend, result_set_id);
+        override_set(found_backend, result_set_id, databases, out_of_sessions);
         if (found_backend)
             bc->use_backend(found_backend);
     }
     if (!found_backend)
     {
-        // create a new backend set (and new set)
-        found_backend = bc->create_backend(package);
+        // create a new backend set (and new set) if we're not out of sessions
+        if (!out_of_sessions)
+            found_backend = bc->create_backend(package);
 
         if (!found_backend)
         {
             Z_APDU *f_apdu = 0;
             mp::odr odr;
+            const char *addinfo = 0;
+            
+            if (out_of_sessions)
+                addinfo = "session_shared: all sessions in use";
             if (apdu_req->which == Z_APDU_searchRequest)
             {
                 f_apdu = odr.create_searchResponse(
-                        apdu_req, YAZ_BIB1_TEMPORARY_SYSTEM_ERROR, 0);
+                    apdu_req, YAZ_BIB1_TEMPORARY_SYSTEM_ERROR, addinfo);
             }
             else if (apdu_req->which == Z_APDU_presentRequest)
             {
                 f_apdu = odr.create_presentResponse(
-                    apdu_req, YAZ_BIB1_TEMPORARY_SYSTEM_ERROR, 0);
+                    apdu_req, YAZ_BIB1_TEMPORARY_SYSTEM_ERROR, addinfo);
             }
             else
             {
                 f_apdu = odr.create_close(
-                    apdu_req, YAZ_BIB1_TEMPORARY_SYSTEM_ERROR, 0);
+                    apdu_req, YAZ_BIB1_TEMPORARY_SYSTEM_ERROR, addinfo);
             }
             package.response() = f_apdu;
             return;
@@ -687,6 +720,8 @@ restart:
         else
             result_set_id = "default";
     }
+    found_backend->timestamp();
+
     // we must search ...
     BackendSetPtr new_set(new BackendSet(result_set_id,
                                          databases, query));
@@ -723,9 +758,10 @@ restart:
                 
             }
         }
-        if (!session_restarted &&
+        if (m_p->m_restart && !session_restarted &&
             condition == YAZ_BIB1_TEMPORARY_SYSTEM_ERROR)
         {
+            package.log("session_shared", YLOG_LOG, "restart");
             bc->remove_backend(found_backend);
             session_restarted = true;
             found_backend.reset();
@@ -757,8 +793,9 @@ restart:
             return; // search error 
         }
     }
-    if (!session_restarted && new_set->m_result_set_size < 0)
+    if (m_p->m_restart && !session_restarted && new_set->m_result_set_size < 0)
     {
+        package.log("session_shared", YLOG_LOG, "restart");
         bc->remove_backend(found_backend);
         session_restarted = true;
         found_backend.reset();
@@ -800,7 +837,7 @@ void yf::SessionShared::Frontend::search(mp::Package &package,
     query.set_Z_Query(req->query);
     Databases databases;
     int i;
-    for (i = 0; i<req->num_databaseNames; i++)
+    for (i = 0; i < req->num_databaseNames; i++)
         databases.push_back(req->databaseNames[i]);
 
     BackendSetPtr found_set; // null
@@ -881,6 +918,8 @@ void yf::SessionShared::Frontend::present(mp::Package &package,
         bc->release_backend(found_backend);
         return;
     }
+
+    found_backend->timestamp();
                               
     Z_APDU *p_apdu = zget_APDU(odr, Z_APDU_presentRequest);
     Z_PresentRequest *p_req = p_apdu->u.presentRequest;
@@ -954,6 +993,7 @@ void yf::SessionShared::Frontend::scan(mp::Package &frontend_package,
     else
     {
         Package scan_package(backend->m_session, frontend_package.origin());
+        backend->timestamp();
         scan_package.copy_filter(frontend_package);
         scan_package.request() = apdu_req;
         scan_package.move();
@@ -991,8 +1031,7 @@ void yf::SessionShared::BackendClass::expire_class()
         {
             bit++;
         }
-        else if ((now >= last_use && now - last_use > m_backend_expiry_ttl)
-            || (now < last_use))
+        else if (now < last_use || now - last_use > m_backend_expiry_ttl)
         {
             mp::odr odr;
             (*bit)->m_close_package->response() = odr.create_close(
@@ -1015,7 +1054,7 @@ void yf::SessionShared::Rep::expire()
     {
         boost::xtime xt;
         boost::xtime_get(&xt, boost::TIME_UTC);
-        xt.sec += 30;
+        xt.sec += m_session_ttl / 3;
         boost::thread::sleep(xt);
         
         BackendClassMap::const_iterator b_it = m_backend_map.begin();
@@ -1030,6 +1069,8 @@ yf::SessionShared::Rep::Rep()
     m_resultset_max = 10;
     m_session_ttl = 90;
     m_optimize_search = true;
+    m_restart = true;
+    m_session_max = 100;
 }
 
 void yf::SessionShared::Rep::start()
@@ -1191,6 +1232,10 @@ void yf::SessionShared::configure(const xmlNode *ptr, bool test_only,
                     m_p->m_optimize_search =
                         mp::xml::get_bool(attr->children, true);
                 }
+                else if (!strcmp((const char *) attr->name, "restart"))
+                {
+                    m_p->m_restart = mp::xml::get_bool(attr->children, true);
+                }
                 else
                     throw mp::filter::FilterException(
                         "Bad attribute " + std::string((const char *)
@@ -1204,7 +1249,10 @@ void yf::SessionShared::configure(const xmlNode *ptr, bool test_only,
             {
                 if (!strcmp((const char *) attr->name, "ttl"))
                     m_p->m_session_ttl = 
-                        mp::xml::get_int(attr->children, 120);
+                        mp::xml::get_int(attr->children, 90);
+                else if (!strcmp((const char *) attr->name, "max"))
+                    m_p->m_session_max = 
+                        mp::xml::get_int(attr->children, 100);
                 else
                     throw mp::filter::FilterException(
                         "Bad attribute " + std::string((const char *)