From cea1fb12604fd1ddbac6804b95c4aff078d30409 Mon Sep 17 00:00:00 2001 From: Adam Dickmeiss Date: Fri, 14 Oct 2005 10:08:40 +0000 Subject: [PATCH] Simplified process interface. Private sub class Worker. --- src/filter.hpp | 3 +-- src/filter_frontend_net.cpp | 28 ++++++++++++++-------------- src/filter_frontend_net.hpp | 2 +- src/package.hpp | 6 ++---- src/test_filter1.cpp | 4 +--- src/test_filter2.cpp | 22 ++++++++++------------ src/test_filter_frontend_net.cpp | 23 ++++++++++++----------- src/thread_pool_observer.cpp | 13 ++----------- src/thread_pool_observer.hpp | 15 +++++++++++++-- 9 files changed, 56 insertions(+), 60 deletions(-) diff --git a/src/filter.hpp b/src/filter.hpp index 4632a3b..f66457c 100644 --- a/src/filter.hpp +++ b/src/filter.hpp @@ -13,8 +13,7 @@ namespace yp2 { virtual ~Filter(){}; ///sends Package off to next Filter, returns altered Package - virtual Package & process(Package & package) const { - return package; + virtual void process(Package & package) const { }; virtual void configure(){}; diff --git a/src/filter_frontend_net.cpp b/src/filter_frontend_net.cpp index 6b2a46e..fbc48a0 100644 --- a/src/filter_frontend_net.cpp +++ b/src/filter_frontend_net.cpp @@ -20,7 +20,7 @@ class P2_Session : public yazpp_1::Z_Assoc { public: ~P2_Session(); P2_Session(yazpp_1::IPDU_Observable *the_PDU_Observable, - ThreadPoolSocketObserver *m_my_thread, + ThreadPoolSocketObserver *m_thread_pool_observer, const Package *package); int m_no_requests; private: @@ -33,7 +33,7 @@ private: void timeoutNotify(); void connectNotify(); private: - ThreadPoolSocketObserver *m_my_thread; + ThreadPoolSocketObserver *m_thread_pool_observer; Session m_session; Origin m_origin; bool m_delete_flag; @@ -87,7 +87,7 @@ P2_Session::P2_Session(yazpp_1::IPDU_Observable *the_PDU_Observable, const Package *package) : Z_Assoc(the_PDU_Observable) { - m_my_thread = my_thread_pool; + m_thread_pool_observer = my_thread_pool; m_no_requests = 0; m_delete_flag = false; m_package = package; @@ -110,10 +110,10 @@ void P2_Session::recv_GDU(Z_GDU *z_pdu, int len) Package *p = new Package(m_session, m_origin); - ThreadPoolPackage *m = new ThreadPoolPackage(p, this); + ThreadPoolPackage *tp = new ThreadPoolPackage(p, this); p->copy_filter(*m_package); p->request() = yazpp_1::GDU(z_pdu); - m_my_thread->put(m); + m_thread_pool_observer->put(tp); } void P2_Session::failNotify() @@ -127,9 +127,9 @@ void P2_Session::failNotify() Package *p = new Package(m_session, m_origin); - ThreadPoolPackage *m = new ThreadPoolPackage(p, this); + ThreadPoolPackage *tp = new ThreadPoolPackage(p, this); p->copy_filter(*m_package); - m_my_thread->put(m); + m_thread_pool_observer->put(tp); } void P2_Session::timeoutNotify() @@ -146,7 +146,7 @@ class P2_Server : public yazpp_1::Z_Assoc { public: ~P2_Server(); P2_Server(yazpp_1::IPDU_Observable *the_PDU_Observable, - ThreadPoolSocketObserver *m_my_thread, + ThreadPoolSocketObserver *m_thread_pool_observer, const Package *package); private: yazpp_1::IPDU_Observer* sessionNotify( @@ -158,17 +158,17 @@ private: void timeoutNotify(); void connectNotify(); private: - ThreadPoolSocketObserver *m_my_thread; + ThreadPoolSocketObserver *m_thread_pool_observer; const Package *m_package; }; P2_Server::P2_Server(yazpp_1::IPDU_Observable *the_PDU_Observable, - ThreadPoolSocketObserver *my_thread, + ThreadPoolSocketObserver *thread_pool_observer, const Package *package) : Z_Assoc(the_PDU_Observable) { - m_my_thread = my_thread; + m_thread_pool_observer = thread_pool_observer; m_package = package; } @@ -176,7 +176,7 @@ P2_Server::P2_Server(yazpp_1::IPDU_Observable *the_PDU_Observable, yazpp_1::IPDU_Observer *P2_Server::sessionNotify(yazpp_1::IPDU_Observable *the_PDU_Observable, int fd) { - P2_Session *my = new P2_Session(the_PDU_Observable, m_my_thread, + P2_Session *my = new P2_Session(the_PDU_Observable, m_thread_pool_observer, m_package); return my; } @@ -242,7 +242,7 @@ void My_Timer_Thread::socketNotify(int event) close(m_fd[1]); } -Package &FilterFrontendNet::process(Package &package) const { +void FilterFrontendNet::process(Package &package) const { yazpp_1::SocketManager mySocketManager; My_Timer_Thread *tt = 0; @@ -262,7 +262,7 @@ Package &FilterFrontendNet::process(Package &package) const { if (tt && tt->timeout()) break; } - return package; + delete tt; } std::string &FilterFrontendNet::listen_address() diff --git a/src/filter_frontend_net.hpp b/src/filter_frontend_net.hpp index b30fabe..d00945f 100644 --- a/src/filter_frontend_net.hpp +++ b/src/filter_frontend_net.hpp @@ -10,7 +10,7 @@ namespace yp2 { class FilterFrontendNet : public yp2::Filter { public: FilterFrontendNet::FilterFrontendNet(); - yp2::Package & process(yp2::Package & package) const; + void process(yp2::Package & package) const; private: int m_no_threads; std::string m_listen_address; diff --git a/src/package.hpp b/src/package.hpp index 014bfeb..808a23b 100644 --- a/src/package.hpp +++ b/src/package.hpp @@ -38,12 +38,10 @@ namespace yp2 { } /// send Package to it's next Filter defined in Router - Package & move() { + void move() { m_filter = m_router->move(m_filter, this); if (m_filter) - return m_filter->process(*this); - else - return *this; + m_filter->process(*this); } /// access session - left val in assignment diff --git a/src/test_filter1.cpp b/src/test_filter1.cpp index b78a18a..2983c8a 100644 --- a/src/test_filter1.cpp +++ b/src/test_filter1.cpp @@ -12,9 +12,7 @@ using namespace boost::unit_test; class TFilter: public yp2::Filter { public: - yp2::Package & process(yp2::Package & package) const { - return package; - }; + void process(yp2::Package & package) const {}; }; diff --git a/src/test_filter2.cpp b/src/test_filter2.cpp index e2ee322..f5cf20a 100644 --- a/src/test_filter2.cpp +++ b/src/test_filter2.cpp @@ -15,18 +15,18 @@ using namespace boost::unit_test; class FilterConstant: public yp2::Filter { public: - yp2::Package & process(yp2::Package & package) const { + void process(yp2::Package & package) const { package.data() = 1234; - return package.move(); + package.move(); }; }; class FilterDouble: public yp2::Filter { public: - yp2::Package & process(yp2::Package & package) const { + void process(yp2::Package & package) const { package.data() = package.data() * 2; - return package.move(); + package.move(); }; }; @@ -49,11 +49,11 @@ BOOST_AUTO_TEST_CASE( testfilter2 ) yp2::Session session; yp2::Origin origin; - yp2::Package pack_in(session, origin); + yp2::Package pack(session, origin); - yp2::Package pack_out = pack_in.router(router1).move(); + pack.router(router1).move(); - BOOST_CHECK (pack_out.data() == 2468); + BOOST_CHECK (pack.data() == 2468); } @@ -65,13 +65,11 @@ BOOST_AUTO_TEST_CASE( testfilter2 ) yp2::Session session; yp2::Origin origin; - yp2::Package pack_in(session, origin); + yp2::Package pack(session, origin); - yp2::Package pack_out(session, origin); - - pack_out = pack_in.router(router2).move(); + pack.router(router2).move(); - BOOST_CHECK (pack_out.data() == 1234); + BOOST_CHECK (pack.data() == 1234); } diff --git a/src/test_filter_frontend_net.cpp b/src/test_filter_frontend_net.cpp index a8e5116..abba95b 100644 --- a/src/test_filter_frontend_net.cpp +++ b/src/test_filter_frontend_net.cpp @@ -16,14 +16,14 @@ using namespace boost::unit_test; class FilterInit: public yp2::Filter { public: - yp2::Package & process(yp2::Package & package) const { + void process(yp2::Package & package) const { - Z_GDU *gdu = package.request().get(); if (package.session().is_closed()) { - // std::cout << "Got Close. Sending nothing\n"; + // std::cout << "Got Close.\n"; } + Z_GDU *gdu = package.request().get(); if (gdu) { // std::cout << "Got PDU. Sending init response\n"; @@ -47,7 +47,6 @@ BOOST_AUTO_TEST_CASE( test_filter_frontend_net_1 ) { yp2::FilterFrontendNet nf; } - BOOST_CHECK(true); } catch ( ... ) { BOOST_CHECK (false); @@ -68,20 +67,20 @@ BOOST_AUTO_TEST_CASE( test_filter_frontend_net_2 ) // Create package with Z39.50 init request in it yp2::Session session; yp2::Origin origin; - yp2::Package pack_in(session, origin); + yp2::Package pack(session, origin); ODR odr = odr_createmem(ODR_ENCODE); Z_APDU *apdu = zget_APDU(odr, Z_APDU_initRequest); - pack_in.request() = apdu; + pack.request() = apdu; odr_destroy(odr); // Done creating query. // Put it in router - pack_in.router(router).move(); + pack.router(router).move(); // Inspect that we got Z39.50 init response - yazpp_1::GDU *gdu = &pack_in.response(); + yazpp_1::GDU *gdu = &pack.response(); Z_GDU *z_gdu = gdu->get(); BOOST_CHECK(z_gdu); @@ -90,7 +89,6 @@ BOOST_AUTO_TEST_CASE( test_filter_frontend_net_2 ) BOOST_CHECK_EQUAL(z_gdu->u.z3950->which, Z_APDU_initResponse); } } - BOOST_CHECK(true); } catch ( ... ) { BOOST_CHECK (false); @@ -103,19 +101,22 @@ BOOST_AUTO_TEST_CASE( test_filter_frontend_net_3 ) { { yp2::RouterChain router; + + // put in frontend first yp2::FilterFrontendNet filter_front; filter_front.listen_address() = "unix:socket"; filter_front.listen_duration() = 2; // listen a short time only router.rule(filter_front); + // put in a backend FilterInit filter_init; router.rule(filter_init); yp2::Session session; yp2::Origin origin; - yp2::Package pack_in(session, origin); + yp2::Package pack(session, origin); - pack_in.router(router).move(); + pack.router(router).move(); } BOOST_CHECK(true); } diff --git a/src/thread_pool_observer.cpp b/src/thread_pool_observer.cpp index de0cf63..72325ce 100644 --- a/src/thread_pool_observer.cpp +++ b/src/thread_pool_observer.cpp @@ -1,4 +1,4 @@ -/* $Id: thread_pool_observer.cpp,v 1.4 2005-10-13 20:06:45 adam Exp $ +/* $Id: thread_pool_observer.cpp,v 1.5 2005-10-14 10:08:40 adam Exp $ Copyright (c) 1998-2005, Index Data. This file is part of the yaz-proxy. @@ -35,15 +35,6 @@ IThreadPoolMsg::~IThreadPoolMsg() } -class worker { -public: - worker(ThreadPoolSocketObserver *s) : m_s(s) {}; - ThreadPoolSocketObserver *m_s; - void operator() (void) { - m_s->run(0); - } -}; - ThreadPoolSocketObserver::ThreadPoolSocketObserver(ISocketObservable *obs, int no_threads) : m_SocketObservable(obs) { @@ -56,7 +47,7 @@ ThreadPoolSocketObserver::ThreadPoolSocketObserver(ISocketObservable *obs, int n int i; for (i = 0; irun(0); + } + }; +public: ThreadPoolSocketObserver(yazpp_1::ISocketObservable *obs, int no_threads); virtual ~ThreadPoolSocketObserver(); void socketNotify(int event); @@ -61,6 +70,8 @@ private: boost::condition m_cond_input_data; boost::mutex m_mutex_output_data; bool m_stop_flag; + + }; #endif -- 1.7.10.4