frontend_net: refactor scope of helper classes
[metaproxy-moved-to-github.git] / src / filter_frontend_net.cpp
index 1a91c07..f7b1bbb 100644 (file)
@@ -1,5 +1,5 @@
 /* This file is part of Metaproxy.
-   Copyright (C) 2005-2009 Index Data
+   Copyright (C) 2005-2012 Index Data
 
 Metaproxy is free software; you can redistribute it and/or modify it under
 the terms of the GNU General Public License as published by the Free
@@ -18,114 +18,152 @@ Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
 
 #include "config.hpp"
 
-#include "util.hpp"
+#include <sstream>
+#include <iomanip>
+#include <metaproxy/util.hpp>
 #include "pipe.hpp"
-#include "filter.hpp"
-#include "package.hpp"
+#include <metaproxy/filter.hpp>
+#include <metaproxy/package.hpp>
 #include "thread_pool_observer.hpp"
 #include "filter_frontend_net.hpp"
 #include <yazpp/z-assoc.h>
 #include <yazpp/pdu-assoc.h>
 #include <yazpp/socket-manager.h>
 #include <yazpp/limit-connect.h>
+#include <yaz/timing.h>
 #include <yaz/log.h>
+#include "gduutil.hpp"
 
 #include <iostream>
 
 namespace mp = metaproxy_1;
+namespace yf = metaproxy_1::filter;
 
 namespace metaproxy_1 {
-    class My_Timer_Thread;
-    class ZAssocServer;
     namespace filter {
+        class FrontendNet::Port {
+            friend class Rep;
+            friend class FrontendNet;
+            std::string port;
+            std::string route;
+        };
         class FrontendNet::Rep {
             friend class FrontendNet;
             int m_no_threads;
-            std::vector<std::string> m_ports;
+            std::vector<Port> m_ports;
             int m_listen_duration;
             int m_session_timeout;
             int m_connect_max;
+            std::string m_msg_config;
             yazpp_1::SocketManager mySocketManager;
             ZAssocServer **az;
         };
+        class FrontendNet::My_Timer_Thread : public yazpp_1::ISocketObserver {
+        private:
+            yazpp_1::ISocketObservable *m_obs;
+            Pipe m_pipe;
+            bool m_timeout;
+        public:
+            My_Timer_Thread(yazpp_1::ISocketObservable *obs, int duration);
+            void socketNotify(int event);
+            bool timeout();
+        };
+        class FrontendNet::ZAssocChild : public yazpp_1::Z_Assoc {
+        public:
+            ~ZAssocChild();
+            ZAssocChild(yazpp_1::IPDU_Observable *the_PDU_Observable,
+                        mp::ThreadPoolSocketObserver *m_thread_pool_observer,
+                        const mp::Package *package,
+                        std::string route,
+                        const char *msg_config);
+            int m_no_requests;
+            std::string m_route;
+        private:
+            yazpp_1::IPDU_Observer* sessionNotify(
+                yazpp_1::IPDU_Observable *the_PDU_Observable,
+                int fd);
+            void recv_GDU(Z_GDU *apdu, int len);
+            
+            void failNotify();
+            void timeoutNotify();
+            void connectNotify();
+        private:
+            mp::ThreadPoolSocketObserver *m_thread_pool_observer;
+            mp::Session m_session;
+            mp::Origin m_origin;
+            bool m_delete_flag;
+            const mp::Package *m_package;
+            const char *m_msg_config;
+        };
+        class FrontendNet::ThreadPoolPackage : public mp::IThreadPoolMsg {
+        public:
+            ThreadPoolPackage(mp::Package *package,
+                              yf::FrontendNet::ZAssocChild *ses,
+                              const char *msg_config);
+            ~ThreadPoolPackage();
+            IThreadPoolMsg *handle();
+            void result(const char *t_info);
+            bool cleanup(void *info);
+        private:
+            yaz_timing_t timer;
+            ZAssocChild *m_assoc_child;
+            mp::Package *m_package;
+            const char *m_msg_config;        
+        }; 
+        class FrontendNet::ZAssocServer : public yazpp_1::Z_Assoc {
+        public:
+            ~ZAssocServer();
+            ZAssocServer(yazpp_1::IPDU_Observable *PDU_Observable, int timeout,
+                         int connect_max, std::string route,
+                         const char *msg_config);
+            void set_package(const mp::Package *package);
+            void set_thread_pool(ThreadPoolSocketObserver *m_thread_pool_observer);
+        private:
+            yazpp_1::IPDU_Observer* sessionNotify(
+                yazpp_1::IPDU_Observable *the_PDU_Observable,
+                int fd);
+            void recv_GDU(Z_GDU *apdu, int len);
+            
+            void failNotify();
+            void timeoutNotify();
+            void connectNotify();
+        private:
+            mp::ThreadPoolSocketObserver *m_thread_pool_observer;
+            const mp::Package *m_package;
+            int m_session_timeout;
+            int m_connect_max;
+            yazpp_1::LimitConnect limit_connect;
+            std::string m_route;
+            const char *m_msg_config;
+        };
     }
-    class My_Timer_Thread : public yazpp_1::ISocketObserver {
-    private:
-        yazpp_1::ISocketObservable *m_obs;
-        Pipe m_pipe;
-        bool m_timeout;
-    public:
-        My_Timer_Thread(yazpp_1::ISocketObservable *obs, int duration);
-        void socketNotify(int event);
-        bool timeout();
-    };
-    class ZAssocChild : public yazpp_1::Z_Assoc {
-    public:
-        ~ZAssocChild();
-        ZAssocChild(yazpp_1::IPDU_Observable *the_PDU_Observable,
-                          mp::ThreadPoolSocketObserver *m_thread_pool_observer,
-                          const mp::Package *package);
-        int m_no_requests;
-    private:
-        yazpp_1::IPDU_Observer* sessionNotify(
-            yazpp_1::IPDU_Observable *the_PDU_Observable,
-            int fd);
-        void recv_GDU(Z_GDU *apdu, int len);
-        
-        void failNotify();
-        void timeoutNotify();
-        void connectNotify();
-    private:
-        mp::ThreadPoolSocketObserver *m_thread_pool_observer;
-        mp::Session m_session;
-        mp::Origin m_origin;
-        bool m_delete_flag;
-        const mp::Package *m_package;
-    };
-    class ThreadPoolPackage : public mp::IThreadPoolMsg {
-    public:
-        ThreadPoolPackage(mp::Package *package, mp::ZAssocChild *ses) :
-            m_assoc_child(ses), m_package(package) { };
-        ~ThreadPoolPackage();
-        IThreadPoolMsg *handle();
-        void result();
-        
-    private:
-        mp::ZAssocChild *m_assoc_child;
-        mp::Package *m_package;
-        
-    };
-    class ZAssocServer : public yazpp_1::Z_Assoc {
-    public:
-        ~ZAssocServer();
-        ZAssocServer(yazpp_1::IPDU_Observable *PDU_Observable, int timeout,
-                     int connect_max);
-        void set_package(const mp::Package *package);
-        void set_thread_pool(ThreadPoolSocketObserver *m_thread_pool_observer);
-    private:
-        yazpp_1::IPDU_Observer* sessionNotify(
-            yazpp_1::IPDU_Observable *the_PDU_Observable,
-            int fd);
-        void recv_GDU(Z_GDU *apdu, int len);
-        
-        void failNotify();
-        void timeoutNotify();
-    void connectNotify();
-    private:
-        mp::ThreadPoolSocketObserver *m_thread_pool_observer;
-        const mp::Package *m_package;
-        int m_session_timeout;
-        int m_connect_max;
-        yazpp_1::LimitConnect limit_connect;
-    };
 }
 
-mp::ThreadPoolPackage::~ThreadPoolPackage()
+yf::FrontendNet::ThreadPoolPackage::ThreadPoolPackage(mp::Package *package,
+                                                      ZAssocChild *ses,
+                                                      const char *msg_config) :
+    m_assoc_child(ses), m_package(package), m_msg_config(msg_config)
+{
+    if (msg_config)
+        timer = yaz_timing_create();
+    else
+        timer = 0;
+}
+
+yf::FrontendNet::ThreadPoolPackage::~ThreadPoolPackage()
 {
+    yaz_timing_destroy(&timer); // timer may be NULL
     delete m_package;
 }
 
-void mp::ThreadPoolPackage::result()
+bool yf::FrontendNet::ThreadPoolPackage::cleanup(void *info)
+{
+    mp::Session *ses = (mp::Session *) info;
+
+    return *ses == m_package->session();
+}
+
+void yf::FrontendNet::ThreadPoolPackage::result(const char *t_info)
 {
     m_assoc_child->m_no_requests--;
 
@@ -170,25 +208,49 @@ void mp::ThreadPoolPackage::result()
     {
         m_assoc_child->close();
     }
+
+    if (m_msg_config)
+    {
+        yaz_timing_stop(timer);
+        double duration = yaz_timing_get_real(timer);
+        Z_GDU *z_gdu = gdu->get();
+
+        std::ostringstream os;
+        os  << m_msg_config << " "
+            << *m_package << " "
+            << std::fixed << std::setprecision (6) << duration << " ";
+
+        if (z_gdu) 
+            os << *z_gdu;
+        else
+            os << "-";
+        
+        yaz_log(YLOG_LOG, "%s %s", os.str().c_str(), t_info);
+    }
+
     delete this;
 }
 
-mp::IThreadPoolMsg *mp::ThreadPoolPackage::handle() 
+mp::IThreadPoolMsg *yf::FrontendNet::ThreadPoolPackage::handle() 
 {
-    m_package->move();
+    m_package->move(m_assoc_child->m_route);
     return this;
 }
 
 
-mp::ZAssocChild::ZAssocChild(yazpp_1::IPDU_Observable *PDU_Observable,
-                                    mp::ThreadPoolSocketObserver *my_thread_pool,
-                                    const mp::Package *package)
-    :  Z_Assoc(PDU_Observable)
+yf::FrontendNet::ZAssocChild::ZAssocChild(
+    yazpp_1::IPDU_Observable *PDU_Observable,
+    mp::ThreadPoolSocketObserver *my_thread_pool,
+    const mp::Package *package,
+    std::string route,
+    const char *msg_config)
+    :  Z_Assoc(PDU_Observable), m_msg_config(msg_config)
 {
     m_thread_pool_observer = my_thread_pool;
     m_no_requests = 0;
     m_delete_flag = false;
     m_package = package;
+    m_route = route;
     const char *peername = PDU_Observable->getpeername();
     if (!peername)
         peername = "unknown";
@@ -196,29 +258,43 @@ mp::ZAssocChild::ZAssocChild(yazpp_1::IPDU_Observable *PDU_Observable,
 }
 
 
-yazpp_1::IPDU_Observer *mp::ZAssocChild::sessionNotify(yazpp_1::IPDU_Observable
-                                                 *the_PDU_Observable, int fd)
+yazpp_1::IPDU_Observer *yf::FrontendNet::ZAssocChild::sessionNotify(
+    yazpp_1::IPDU_Observable
+    *the_PDU_Observable, int fd)
 {
     return 0;
 }
 
-mp::ZAssocChild::~ZAssocChild()
+yf::FrontendNet::ZAssocChild::~ZAssocChild()
 {
 }
 
-void mp::ZAssocChild::recv_GDU(Z_GDU *z_pdu, int len)
+void yf::FrontendNet::ZAssocChild::recv_GDU(Z_GDU *z_pdu, int len)
 {
     m_no_requests++;
 
     mp::Package *p = new mp::Package(m_session, m_origin);
 
-    mp::ThreadPoolPackage *tp = new mp::ThreadPoolPackage(p, this);
-    p->copy_filter(*m_package);
+    ThreadPoolPackage *tp = new ThreadPoolPackage(p, this, m_msg_config);
+    p->copy_route(*m_package);
     p->request() = yazpp_1::GDU(z_pdu);
+
+    if (m_msg_config)
+    {
+        if (z_pdu)          
+        {
+            std::ostringstream os;
+            os  << m_msg_config << " "
+                << *p << " "
+                << "0.000000" << " " 
+                << *z_pdu;
+            yaz_log(YLOG_LOG, "%s", os.str().c_str());
+        }
+    }
     m_thread_pool_observer->put(tp);  
 }
 
-void mp::ZAssocChild::failNotify()
+void yf::FrontendNet::ZAssocChild::failNotify()
 {
     // TODO: send Package to signal "close"
     if (m_session.is_closed())
@@ -233,41 +309,45 @@ void mp::ZAssocChild::failNotify()
 
     mp::Package *p = new mp::Package(m_session, m_origin);
 
-    mp::ThreadPoolPackage *tp = new mp::ThreadPoolPackage(p, this);
-    p->copy_filter(*m_package);
-    m_thread_pool_observer->put(tp);  
+    ThreadPoolPackage *tp = new ThreadPoolPackage(p, this, m_msg_config);
+    p->copy_route(*m_package);
+    m_thread_pool_observer->cleanup(tp, &m_session);
+    m_thread_pool_observer->put(tp);
 }
 
-void mp::ZAssocChild::timeoutNotify()
+void yf::FrontendNet::ZAssocChild::timeoutNotify()
 {
     failNotify();
 }
 
-void mp::ZAssocChild::connectNotify()
+void yf::FrontendNet::ZAssocChild::connectNotify()
 {
 
 }
 
-mp::ZAssocServer::ZAssocServer(yazpp_1::IPDU_Observable *PDU_Observable,
-                               int timeout, int connect_max)
-    :  Z_Assoc(PDU_Observable), m_session_timeout(timeout),
-       m_connect_max(connect_max)
+yf::FrontendNet::ZAssocServer::ZAssocServer(
+    yazpp_1::IPDU_Observable *PDU_Observable,
+    int timeout, int connect_max,
+    std::string route, const char *msg_config)
+    : 
+    Z_Assoc(PDU_Observable), m_session_timeout(timeout),
+    m_connect_max(connect_max), m_route(route), m_msg_config(msg_config)
 {
     m_package = 0;
 }
 
 
-void mp::ZAssocServer::set_package(const mp::Package *package)
+void yf::FrontendNet::ZAssocServer::set_package(const mp::Package *package)
 {
     m_package = package;
 }
 
-void mp::ZAssocServer::set_thread_pool(ThreadPoolSocketObserver *observer)
+void yf::FrontendNet::ZAssocServer::set_thread_pool(ThreadPoolSocketObserver *observer)
 {
     m_thread_pool_observer = observer;
 }
 
-yazpp_1::IPDU_Observer *mp::ZAssocServer::sessionNotify(yazpp_1::IPDU_Observable
+yazpp_1::IPDU_Observer *yf::FrontendNet::ZAssocServer::sessionNotify(yazpp_1::IPDU_Observable
                                                 *the_PDU_Observable, int fd)
 {
 
@@ -280,42 +360,43 @@ yazpp_1::IPDU_Observer *mp::ZAssocServer::sessionNotify(yazpp_1::IPDU_Observable
         if (m_connect_max && con_sz > m_connect_max)
             return 0;
     }
-    mp::ZAssocChild *my =
-       new mp::ZAssocChild(the_PDU_Observable, m_thread_pool_observer,
-                             m_package);
+    ZAssocChild *my = new ZAssocChild(the_PDU_Observable,
+                                      m_thread_pool_observer,
+                                      m_package, m_route, m_msg_config);
     my->timeout(m_session_timeout);
     return my;
 }
 
-mp::ZAssocServer::~ZAssocServer()
+yf::FrontendNet::ZAssocServer::~ZAssocServer()
 {
 }
 
-void mp::ZAssocServer::recv_GDU(Z_GDU *apdu, int len)
+void yf::FrontendNet::ZAssocServer::recv_GDU(Z_GDU *apdu, int len)
 {
 }
 
-void mp::ZAssocServer::failNotify()
+void yf::FrontendNet::ZAssocServer::failNotify()
 {
 }
 
-void mp::ZAssocServer::timeoutNotify()
+void yf::FrontendNet::ZAssocServer::timeoutNotify()
 {
 }
 
-void mp::ZAssocServer::connectNotify()
+void yf::FrontendNet::ZAssocServer::connectNotify()
 {
 }
 
-mp::filter::FrontendNet::FrontendNet() : m_p(new Rep)
+yf::FrontendNet::FrontendNet() : m_p(new Rep)
 {
     m_p->m_no_threads = 5;
     m_p->m_listen_duration = 0;
     m_p->m_session_timeout = 300; // 5 minutes
+    m_p->m_connect_max = 0;
     m_p->az = 0;
 }
 
-mp::filter::FrontendNet::~FrontendNet()
+yf::FrontendNet::~FrontendNet()
 {
     if (m_p->az)
     {
@@ -324,15 +405,27 @@ mp::filter::FrontendNet::~FrontendNet()
             delete m_p->az[i];
         delete [] m_p->az;
     }
+    m_p->az = 0;
 }
 
-bool mp::My_Timer_Thread::timeout()
+void yf::FrontendNet::stop() const
+{
+    if (m_p->az)
+    {
+        size_t i;
+        for (i = 0; i<m_p->m_ports.size(); i++)
+            m_p->az[i]->server("");
+    }
+}
+
+bool yf::FrontendNet::My_Timer_Thread::timeout()
 {
     return m_timeout;
 }
 
-mp::My_Timer_Thread::My_Timer_Thread(yazpp_1::ISocketObservable *obs,
-                                int duration) : 
+yf::FrontendNet::My_Timer_Thread::My_Timer_Thread(
+    yazpp_1::ISocketObservable *obs,
+    int duration) : 
     m_obs(obs), m_pipe(9123), m_timeout(false)
 {
     obs->addObserver(m_pipe.read_fd(), this);
@@ -340,13 +433,13 @@ mp::My_Timer_Thread::My_Timer_Thread(yazpp_1::ISocketObservable *obs,
     obs->timeoutObserver(this, duration);
 }
 
-void mp::My_Timer_Thread::socketNotify(int event)
+void yf::FrontendNet::My_Timer_Thread::socketNotify(int event)
 {
     m_timeout = true;
     m_obs->deleteObserver(this);
 }
 
-void mp::filter::FrontendNet::process(Package &package) const
+void yf::FrontendNet::process(Package &package) const
 {
     if (m_p->az == 0)
         return;
@@ -366,26 +459,37 @@ void mp::filter::FrontendNet::process(Package &package) const
     }
     while (m_p->mySocketManager.processEvent() > 0)
     {
+        int no = m_p->mySocketManager.getNumberOfObservers();
+        if (no <= 1)
+            break;
        if (tt && tt->timeout())
            break;
     }
     delete tt;
 }
 
-void mp::filter::FrontendNet::configure(const xmlNode * ptr, bool test_only)
+void yf::FrontendNet::configure(const xmlNode * ptr, bool test_only,
+                                const char *path)
 {
     if (!ptr || !ptr->children)
     {
-        throw mp::filter::FilterException("No ports for Frontend");
+        throw yf::FilterException("No ports for Frontend");
     }
-    std::vector<std::string> ports;
+    std::vector<Port> ports;
     for (ptr = ptr->children; ptr; ptr = ptr->next)
     {
         if (ptr->type != XML_ELEMENT_NODE)
             continue;
         if (!strcmp((const char *) ptr->name, "port"))
         {
-            std::string port = mp::xml::get_text(ptr);
+            Port port;
+            const struct _xmlAttr *attr;
+            for (attr = ptr->properties; attr; attr = attr->next)
+            {
+                if (!strcmp((const char *) attr->name, "route"))
+                    port.route = mp::xml::get_text(attr);
+            }
+            port.port = mp::xml::get_text(ptr);
             ports.push_back(port);
             
         }
@@ -394,7 +498,7 @@ void mp::filter::FrontendNet::configure(const xmlNode * ptr, bool test_only)
             std::string threads_str = mp::xml::get_text(ptr);
             int threads = atoi(threads_str.c_str());
             if (threads < 1)
-                throw mp::filter::FilterException("Bad value for threads: " 
+                throw yf::FilterException("Bad value for threads: " 
                                                    + threads_str);
             m_p->m_no_threads = threads;
         }
@@ -403,19 +507,23 @@ void mp::filter::FrontendNet::configure(const xmlNode * ptr, bool test_only)
             std::string timeout_str = mp::xml::get_text(ptr);
             int timeout = atoi(timeout_str.c_str());
             if (timeout < 1)
-                throw mp::filter::FilterException("Bad value for timeout: " 
+                throw yf::FilterException("Bad value for timeout: " 
                                                    + timeout_str);
             m_p->m_session_timeout = timeout;
         }
         else if (!strcmp((const char *) ptr->name, "connect-max"))
         {
-            m_p->m_connect_max = mp::xml::get_int(ptr->children, 0);
+            m_p->m_connect_max = mp::xml::get_int(ptr, 0);
+        }
+        else if (!strcmp((const char *) ptr->name, "message"))
+        {
+            m_p->m_msg_config = mp::xml::get_text(ptr);
         }
         else
         {
-            throw mp::filter::FilterException("Bad element " 
-                                               + std::string((const char *)
-                                                             ptr->name));
+            throw yf::FilterException("Bad element " 
+                                      + std::string((const char *)
+                                                    ptr->name));
         }
     }
     if (test_only)
@@ -423,39 +531,59 @@ void mp::filter::FrontendNet::configure(const xmlNode * ptr, bool test_only)
     set_ports(ports);
 }
 
-void mp::filter::FrontendNet::set_ports(std::vector<std::string> &ports)
+void yf::FrontendNet::set_ports(std::vector<std::string> &ports)
+{
+    std::vector<Port> nports;
+    size_t i;
+
+    for (i = 0; i < ports.size(); i++)
+    {
+        Port nport;
+
+        nport.port = ports[i];
+
+        nports.push_back(nport);
+    }
+    set_ports(nports);
+}
+
+
+void yf::FrontendNet::set_ports(std::vector<Port> &ports)
 {
     m_p->m_ports = ports;
     
-    m_p->az = new mp::ZAssocServer *[m_p->m_ports.size()];
+    m_p->az = new yf::FrontendNet::ZAssocServer *[m_p->m_ports.size()];
     
-    // Create mp::ZAssocServer for each port
+    // Create yf::FrontendNet::ZAssocServer for each port
     size_t i;
     for (i = 0; i<m_p->m_ports.size(); i++)
     {
-        // create a PDU assoc object (one per mp::ZAssocServer)
+        // create a PDU assoc object (one per yf::FrontendNet::ZAssocServer)
         yazpp_1::PDU_Assoc *as = new yazpp_1::PDU_Assoc(&m_p->mySocketManager);
         
         // create ZAssoc with PDU Assoc
-        m_p->az[i] = new mp::ZAssocServer(as, 
+        m_p->az[i] = new yf::FrontendNet::ZAssocServer(as, 
                                           m_p->m_session_timeout,
-                                          m_p->m_connect_max);
-        if (m_p->az[i]->server(m_p->m_ports[i].c_str()))
+                                          m_p->m_connect_max,
+                                          m_p->m_ports[i].route,
+                                          m_p->m_msg_config.length() > 0 ?
+                                          m_p->m_msg_config.c_str() : 0);
+        if (m_p->az[i]->server(m_p->m_ports[i].port.c_str()))
         {
-            throw mp::filter::FilterException("Unable to bind to address " 
-                                              + std::string(m_p->m_ports[i]));
+            throw yf::FilterException("Unable to bind to address " 
+                                      + std::string(m_p->m_ports[i].port));
         }
     }
 }
 
-void mp::filter::FrontendNet::set_listen_duration(int d)
+void yf::FrontendNet::set_listen_duration(int d)
 {
     m_p->m_listen_duration = d;
 }
 
-static mp::filter::Base* filter_creator()
+static yf::Base* filter_creator()
 {
-    return new mp::filter::FrontendNet;
+    return new yf::FrontendNet;
 }
 
 extern "C" {