From 722c498e016a6eb94044c608a7d1dcb21e5013cd Mon Sep 17 00:00:00 2001 From: Adam Dickmeiss Date: Tue, 17 Apr 2012 10:33:12 +0200 Subject: [PATCH] Remove incoming requests that can not be handled Requests in queue for filter frontend_net are removed if client closes connection for the session and request is not yet handled. --- src/filter_frontend_net.cpp | 12 ++++++++++-- src/test_thread_pool_observer.cpp | 6 ++++++ src/thread_pool_observer.cpp | 16 ++++++++++++++++ src/thread_pool_observer.hpp | 2 ++ 4 files changed, 34 insertions(+), 2 deletions(-) diff --git a/src/filter_frontend_net.cpp b/src/filter_frontend_net.cpp index a3ab5f7..b347ad2 100644 --- a/src/filter_frontend_net.cpp +++ b/src/filter_frontend_net.cpp @@ -103,7 +103,7 @@ namespace metaproxy_1 { ~ThreadPoolPackage(); IThreadPoolMsg *handle(); void result(const char *t_info); - + bool cleanup(void *info); private: yaz_timing_t timer; mp::ZAssocChild *m_assoc_child; @@ -155,6 +155,13 @@ mp::ThreadPoolPackage::~ThreadPoolPackage() delete m_package; } +bool mp::ThreadPoolPackage::cleanup(void *info) +{ + mp::Session *ses = (mp::Session *) info; + + return *ses == m_package->session(); +} + void mp::ThreadPoolPackage::result(const char *t_info) { m_assoc_child->m_no_requests--; @@ -284,7 +291,8 @@ void mp::ZAssocChild::failNotify() mp::ThreadPoolPackage *tp = new mp::ThreadPoolPackage(p, this, m_msg_config); p->copy_route(*m_package); - m_thread_pool_observer->put(tp); + m_thread_pool_observer->cleanup(tp, &m_session); + m_thread_pool_observer->put(tp); } void mp::ZAssocChild::timeoutNotify() diff --git a/src/test_thread_pool_observer.cpp b/src/test_thread_pool_observer.cpp index 9700db1..89f8f04 100644 --- a/src/test_thread_pool_observer.cpp +++ b/src/test_thread_pool_observer.cpp @@ -40,6 +40,7 @@ class My_Msg : public mp::IThreadPoolMsg { public: mp::IThreadPoolMsg *handle(); void result(const char *t_info); + bool cleanup(void *info); int m_val; My_Timer_Thread *m_timer; }; @@ -70,6 +71,11 @@ mp::IThreadPoolMsg *My_Msg::handle() return res; } +bool My_Msg::cleanup(void *info) +{ + return false; +} + void My_Msg::result(const char *t_info) { m_timer->m_sum += m_val; diff --git a/src/thread_pool_observer.cpp b/src/thread_pool_observer.cpp index d630fd9..ce37235 100644 --- a/src/thread_pool_observer.cpp +++ b/src/thread_pool_observer.cpp @@ -206,14 +206,30 @@ void ThreadPoolSocketObserver::run(void *p) } } +void ThreadPoolSocketObserver::cleanup(IThreadPoolMsg *m, void *info) +{ + boost::mutex::scoped_lock input_lock(m_p->m_mutex_input_data); + + std::deque::iterator it = m_p->m_input.begin(); + while (it != m_p->m_input.end()) + { + if ((*it)->cleanup(info)) + it = m_p->m_input.erase(it); + else + it++; + } +} + void ThreadPoolSocketObserver::put(IThreadPoolMsg *m) { boost::mutex::scoped_lock input_lock(m_p->m_mutex_input_data); + while (m_p->m_input.size() >= m_p->m_no_threads * queue_size_per_thread) m_p->m_cond_input_full.wait(input_lock); m_p->m_input.push_back(m); m_p->m_cond_input_data.notify_one(); } + /* * Local variables: * c-basic-offset: 4 diff --git a/src/thread_pool_observer.hpp b/src/thread_pool_observer.hpp index 67aee0b..73ee78f 100644 --- a/src/thread_pool_observer.hpp +++ b/src/thread_pool_observer.hpp @@ -30,6 +30,7 @@ namespace metaproxy_1 { virtual IThreadPoolMsg *handle() = 0; virtual void result(const char *info) = 0; virtual ~IThreadPoolMsg(); + virtual bool cleanup(void *info) = 0; }; class ThreadPoolSocketObserver : public yazpp_1::ISocketObserver { @@ -40,6 +41,7 @@ namespace metaproxy_1 { int no_threads); virtual ~ThreadPoolSocketObserver(); void put(IThreadPoolMsg *m); + void cleanup(IThreadPoolMsg *m, void *info); IThreadPoolMsg *get(); void run(void *p); private: -- 1.7.10.4