Simplified process interface. Private sub class Worker.
authorAdam Dickmeiss <adam@indexdata.dk>
Fri, 14 Oct 2005 10:08:40 +0000 (10:08 +0000)
committerAdam Dickmeiss <adam@indexdata.dk>
Fri, 14 Oct 2005 10:08:40 +0000 (10:08 +0000)
src/filter.hpp
src/filter_frontend_net.cpp
src/filter_frontend_net.hpp
src/package.hpp
src/test_filter1.cpp
src/test_filter2.cpp
src/test_filter_frontend_net.cpp
src/thread_pool_observer.cpp
src/thread_pool_observer.hpp

index 4632a3b..f66457c 100644 (file)
@@ -13,8 +13,7 @@ namespace yp2 {
         virtual ~Filter(){};
 
         ///sends Package off to next Filter, returns altered Package
-        virtual  Package & process(Package & package) const {
-            return package;
+        virtual  void process(Package & package) const {
         };
         virtual  void configure(){};
 
index 6b2a46e..fbc48a0 100644 (file)
@@ -20,7 +20,7 @@ class P2_Session : public yazpp_1::Z_Assoc {
 public:
     ~P2_Session();
     P2_Session(yazpp_1::IPDU_Observable *the_PDU_Observable,
-              ThreadPoolSocketObserver *m_my_thread,
+              ThreadPoolSocketObserver *m_thread_pool_observer,
               const Package *package);
     int m_no_requests;
 private:
@@ -33,7 +33,7 @@ private:
     void timeoutNotify();
     void connectNotify();
 private:
-    ThreadPoolSocketObserver *m_my_thread;
+    ThreadPoolSocketObserver *m_thread_pool_observer;
     Session m_session;
     Origin m_origin;
     bool m_delete_flag;
@@ -87,7 +87,7 @@ P2_Session::P2_Session(yazpp_1::IPDU_Observable *the_PDU_Observable,
                       const Package *package)
     :  Z_Assoc(the_PDU_Observable)
 {
-    m_my_thread = my_thread_pool;
+    m_thread_pool_observer = my_thread_pool;
     m_no_requests = 0;
     m_delete_flag = false;
     m_package = package;
@@ -110,10 +110,10 @@ void P2_Session::recv_GDU(Z_GDU *z_pdu, int len)
 
     Package *p = new Package(m_session, m_origin);
 
-    ThreadPoolPackage *m = new ThreadPoolPackage(p, this);
+    ThreadPoolPackage *tp = new ThreadPoolPackage(p, this);
     p->copy_filter(*m_package);
     p->request() = yazpp_1::GDU(z_pdu);
-    m_my_thread->put(m);  
+    m_thread_pool_observer->put(tp);  
 }
 
 void P2_Session::failNotify()
@@ -127,9 +127,9 @@ void P2_Session::failNotify()
 
     Package *p = new Package(m_session, m_origin);
 
-    ThreadPoolPackage *m = new ThreadPoolPackage(p, this);
+    ThreadPoolPackage *tp = new ThreadPoolPackage(p, this);
     p->copy_filter(*m_package);
-    m_my_thread->put(m);  
+    m_thread_pool_observer->put(tp);  
 }
 
 void P2_Session::timeoutNotify()
@@ -146,7 +146,7 @@ class P2_Server : public yazpp_1::Z_Assoc {
 public:
     ~P2_Server();
     P2_Server(yazpp_1::IPDU_Observable *the_PDU_Observable,
-              ThreadPoolSocketObserver *m_my_thread,
+              ThreadPoolSocketObserver *m_thread_pool_observer,
              const Package *package);
 private:
     yazpp_1::IPDU_Observer* sessionNotify(
@@ -158,17 +158,17 @@ private:
     void timeoutNotify();
     void connectNotify();
 private:
-    ThreadPoolSocketObserver *m_my_thread;
+    ThreadPoolSocketObserver *m_thread_pool_observer;
     const Package *m_package;
 };
 
 
 P2_Server::P2_Server(yazpp_1::IPDU_Observable *the_PDU_Observable,
-                     ThreadPoolSocketObserver *my_thread,
+                     ThreadPoolSocketObserver *thread_pool_observer,
                     const Package *package)
     :  Z_Assoc(the_PDU_Observable)
 {
-    m_my_thread = my_thread;
+    m_thread_pool_observer = thread_pool_observer;
     m_package = package;
 
 }
@@ -176,7 +176,7 @@ P2_Server::P2_Server(yazpp_1::IPDU_Observable *the_PDU_Observable,
 yazpp_1::IPDU_Observer *P2_Server::sessionNotify(yazpp_1::IPDU_Observable
                                                 *the_PDU_Observable, int fd)
 {
-    P2_Session *my = new P2_Session(the_PDU_Observable, m_my_thread,
+    P2_Session *my = new P2_Session(the_PDU_Observable, m_thread_pool_observer,
                                    m_package);
     return my;
 }
@@ -242,7 +242,7 @@ void My_Timer_Thread::socketNotify(int event)
     close(m_fd[1]);
 }
 
-Package &FilterFrontendNet::process(Package &package) const {
+void FilterFrontendNet::process(Package &package) const {
     yazpp_1::SocketManager mySocketManager;
 
     My_Timer_Thread *tt = 0;
@@ -262,7 +262,7 @@ Package &FilterFrontendNet::process(Package &package) const {
        if (tt && tt->timeout())
            break;
     }
-    return package;
+    delete tt;
 }
 
 std::string &FilterFrontendNet::listen_address()
index b30fabe..d00945f 100644 (file)
@@ -10,7 +10,7 @@ namespace yp2 {
     class FilterFrontendNet : public yp2::Filter {
     public:
        FilterFrontendNet::FilterFrontendNet();
-       yp2::Package & process(yp2::Package & package) const;
+       void process(yp2::Package & package) const;
     private:
         int m_no_threads;
         std::string m_listen_address;
index 014bfeb..808a23b 100644 (file)
@@ -38,12 +38,10 @@ namespace yp2 {
         }
 
         /// send Package to it's next Filter defined in Router
-        Package & move() {
+        void move() {
             m_filter = m_router->move(m_filter, this);
             if (m_filter)
-                return m_filter->process(*this);
-            else
-                return *this;
+                m_filter->process(*this);
         }
         
         /// access session - left val in assignment
index b78a18a..2983c8a 100644 (file)
@@ -12,9 +12,7 @@ using namespace boost::unit_test;
 
 class TFilter: public yp2::Filter {
 public:
-    yp2::Package & process(yp2::Package & package) const {
-       return package;
-    };
+    void process(yp2::Package & package) const {};
 };
     
 
index e2ee322..f5cf20a 100644 (file)
@@ -15,18 +15,18 @@ using namespace boost::unit_test;
 
 class FilterConstant: public yp2::Filter {
 public:
-    yp2::Package & process(yp2::Package & package) const {
+    void process(yp2::Package & package) const {
        package.data() = 1234;
-       return package.move();
+       package.move();
     };
 };
 
 
 class FilterDouble: public yp2::Filter {
 public:
-    yp2::Package & process(yp2::Package & package) const {
+    void process(yp2::Package & package) const {
        package.data() = package.data() * 2;
-       return package.move();
+       package.move();
     };
 };
     
@@ -49,11 +49,11 @@ BOOST_AUTO_TEST_CASE( testfilter2 )
 
             yp2::Session session;
             yp2::Origin origin;
-           yp2::Package pack_in(session, origin);
+           yp2::Package pack(session, origin);
            
-           yp2::Package pack_out = pack_in.router(router1).move(); 
+           pack.router(router1).move(); 
            
-            BOOST_CHECK (pack_out.data() == 2468);
+            BOOST_CHECK (pack.data() == 2468);
             
         }
         
@@ -65,13 +65,11 @@ BOOST_AUTO_TEST_CASE( testfilter2 )
            
             yp2::Session session;
             yp2::Origin origin;
-           yp2::Package pack_in(session, origin);
+           yp2::Package pack(session, origin);
         
-           yp2::Package pack_out(session, origin);
-
-            pack_out = pack_in.router(router2).move();
+            pack.router(router2).move();
      
-            BOOST_CHECK (pack_out.data() == 1234);
+            BOOST_CHECK (pack.data() == 1234);
             
        }
 
index a8e5116..abba95b 100644 (file)
@@ -16,14 +16,14 @@ using namespace boost::unit_test;
 
 class FilterInit: public yp2::Filter {
 public:
-    yp2::Package & process(yp2::Package & package) const {
+    void process(yp2::Package & package) const {
 
-        Z_GDU *gdu = package.request().get();
         if (package.session().is_closed())
         {
-            // std::cout << "Got Close. Sending nothing\n";
+            // std::cout << "Got Close.\n";
         }
        
+        Z_GDU *gdu = package.request().get();
         if (gdu)
         {
             // std::cout << "Got PDU. Sending init response\n";
@@ -47,7 +47,6 @@ BOOST_AUTO_TEST_CASE( test_filter_frontend_net_1 )
         {
             yp2::FilterFrontendNet nf;
         }
-        BOOST_CHECK(true);
     }
     catch ( ... ) {
         BOOST_CHECK (false);
@@ -68,20 +67,20 @@ BOOST_AUTO_TEST_CASE( test_filter_frontend_net_2 )
             // Create package with Z39.50 init request in it
             yp2::Session session;
             yp2::Origin origin;
-           yp2::Package pack_in(session, origin);
+           yp2::Package pack(session, origin);
 
             ODR odr = odr_createmem(ODR_ENCODE);
             Z_APDU *apdu = zget_APDU(odr, Z_APDU_initRequest);
             
-            pack_in.request() = apdu;
+            pack.request() = apdu;
             odr_destroy(odr);
            // Done creating query. 
 
             // Put it in router
-           pack_in.router(router).move(); 
+           pack.router(router).move(); 
 
             // Inspect that we got Z39.50 init response
-            yazpp_1::GDU *gdu = &pack_in.response();
+            yazpp_1::GDU *gdu = &pack.response();
 
             Z_GDU *z_gdu = gdu->get();
             BOOST_CHECK(z_gdu);
@@ -90,7 +89,6 @@ BOOST_AUTO_TEST_CASE( test_filter_frontend_net_2 )
                 BOOST_CHECK_EQUAL(z_gdu->u.z3950->which, Z_APDU_initResponse);
             }
         }
-        BOOST_CHECK(true);
     }
     catch ( ... ) {
         BOOST_CHECK (false);
@@ -103,19 +101,22 @@ BOOST_AUTO_TEST_CASE( test_filter_frontend_net_3 )
     {
         {
            yp2::RouterChain router;
+
+            // put in frontend first
             yp2::FilterFrontendNet filter_front;
             filter_front.listen_address() = "unix:socket";
             filter_front.listen_duration() = 2;  // listen a short time only
            router.rule(filter_front);
 
+            // put in a backend
             FilterInit filter_init;
            router.rule(filter_init);
 
             yp2::Session session;
             yp2::Origin origin;
-           yp2::Package pack_in(session, origin);
+           yp2::Package pack(session, origin);
            
-           pack_in.router(router).move(); 
+           pack.router(router).move(); 
         }
         BOOST_CHECK(true);
     }
index de0cf63..72325ce 100644 (file)
@@ -1,4 +1,4 @@
-/* $Id: thread_pool_observer.cpp,v 1.4 2005-10-13 20:06:45 adam Exp $
+/* $Id: thread_pool_observer.cpp,v 1.5 2005-10-14 10:08:40 adam Exp $
    Copyright (c) 1998-2005, Index Data.
 
 This file is part of the yaz-proxy.
@@ -35,15 +35,6 @@ IThreadPoolMsg::~IThreadPoolMsg()
 
 }
 
-class worker {
-public:
-    worker(ThreadPoolSocketObserver *s) : m_s(s) {};
-    ThreadPoolSocketObserver *m_s;
-    void operator() (void) {
-        m_s->run(0);
-    }
-};
-
 ThreadPoolSocketObserver::ThreadPoolSocketObserver(ISocketObservable *obs, int no_threads)
     : m_SocketObservable(obs)
 {
@@ -56,7 +47,7 @@ ThreadPoolSocketObserver::ThreadPoolSocketObserver(ISocketObservable *obs, int n
     int i;
     for (i = 0; i<no_threads; i++)
     {
-        worker w(this);
+        Worker w(this);
         m_thrds.add_thread(new boost::thread(w));
     }
 }
index 0ff7a50..4f9636d 100644 (file)
@@ -1,4 +1,4 @@
-/* $Id: thread_pool_observer.hpp,v 1.1 2005-10-13 20:06:45 adam Exp $
+/* $Id: thread_pool_observer.hpp,v 1.2 2005-10-14 10:08:40 adam Exp $
    Copyright (c) 1998-2005, Index Data.
 
 This file is part of the yaz-proxy.
@@ -41,7 +41,16 @@ public:
 };
 
 class ThreadPoolSocketObserver : public yazpp_1::ISocketObserver {
- public:
+private:
+    class Worker {
+    public:
+        Worker(ThreadPoolSocketObserver *s) : m_s(s) {};
+        ThreadPoolSocketObserver *m_s;
+        void operator() (void) {
+            m_s->run(0);
+        }
+    };
+public:
     ThreadPoolSocketObserver(yazpp_1::ISocketObservable *obs, int no_threads);
     virtual ~ThreadPoolSocketObserver();
     void socketNotify(int event);
@@ -61,6 +70,8 @@ private:
     boost::condition m_cond_input_data;
     boost::mutex m_mutex_output_data;
     bool m_stop_flag;
+
+    
 };
 
 #endif