From a715926f3a2dc33f605b119561ccac38322f1c7b Mon Sep 17 00:00:00 2001 From: Adam Dickmeiss Date: Wed, 18 Apr 2007 12:06:59 +0000 Subject: [PATCH] Fixed bug #1064: Test test_thread_pool_observer hangs. --- src/test_thread_pool_observer.cpp | 12 ++++++++++-- src/thread_pool_observer.cpp | 12 +++++++++--- 2 files changed, 19 insertions(+), 5 deletions(-) diff --git a/src/test_thread_pool_observer.cpp b/src/test_thread_pool_observer.cpp index 6138759..c5810c2 100644 --- a/src/test_thread_pool_observer.cpp +++ b/src/test_thread_pool_observer.cpp @@ -1,4 +1,4 @@ -/* $Id: test_thread_pool_observer.cpp,v 1.12 2007-01-25 14:05:54 adam Exp $ +/* $Id: test_thread_pool_observer.cpp,v 1.13 2007-04-18 12:06:59 adam Exp $ Copyright (c) 2005-2007, Index Data. See the LICENSE file for details @@ -82,6 +82,14 @@ void My_Timer_Thread::socketNotify(int event) m->m_val = m_requests++; m->m_timer = this; m_t->put(m); +#if 0 + // prevent input queue from being filled up.. + // bug #1064: Test test_thread_pool_observer hangs + // fortunately we don't need this hack. because put (ebove) + // will block itself if needed + if (m->m_val == 30) + m_obs->deleteObserver(this); +#endif } BOOST_AUTO_UNIT_TEST( thread_pool_observer1 ) @@ -93,7 +101,7 @@ BOOST_AUTO_UNIT_TEST( thread_pool_observer1 ) while (t.m_responses < 30 && mySocketManager.processEvent() > 0) ; BOOST_CHECK_EQUAL(t.m_responses, 30); - BOOST_CHECK(t.m_sum >= 435); + BOOST_CHECK(t.m_sum >= 435); // = 29*30/2 } /* diff --git a/src/thread_pool_observer.cpp b/src/thread_pool_observer.cpp index ab72255..36cf783 100644 --- a/src/thread_pool_observer.cpp +++ b/src/thread_pool_observer.cpp @@ -1,4 +1,4 @@ -/* $Id: thread_pool_observer.cpp,v 1.19 2007-02-19 12:51:08 adam Exp $ +/* $Id: thread_pool_observer.cpp,v 1.20 2007-04-18 12:06:59 adam Exp $ Copyright (c) 2005-2007, Index Data. See the LICENSE file for details @@ -52,15 +52,18 @@ namespace metaproxy_1 { boost::thread_group m_thrds; boost::mutex m_mutex_input_data; boost::condition m_cond_input_data; + boost::condition m_cond_input_full; boost::mutex m_mutex_output_data; std::deque m_input; std::deque m_output; bool m_stop_flag; - int m_no_threads; + unsigned m_no_threads; }; + const unsigned int queue_size_per_thread = 64; } + using namespace yazpp_1; using namespace metaproxy_1; @@ -141,7 +144,8 @@ void ThreadPoolSocketObserver::run(void *p) break; in = m_p->m_input.front(); - m_p->m_input.pop_front(); + m_p->m_input.pop_front(); + m_p->m_cond_input_full.notify_all(); } IThreadPoolMsg *out = in->handle(); { @@ -159,6 +163,8 @@ void ThreadPoolSocketObserver::run(void *p) void ThreadPoolSocketObserver::put(IThreadPoolMsg *m) { boost::mutex::scoped_lock input_lock(m_p->m_mutex_input_data); + while (m_p->m_input.size() >= m_p->m_no_threads * queue_size_per_thread) + m_p->m_cond_input_full.wait(input_lock); m_p->m_input.push_back(m); m_p->m_cond_input_data.notify_one(); } -- 1.7.10.4