X-Git-Url: http://git.indexdata.com/?a=blobdiff_plain;f=src%2Fthread_pool_observer.cpp;h=ce372351b63c33f1dce625be64b60cb9504525c5;hb=722c498e016a6eb94044c608a7d1dcb21e5013cd;hp=d630fd953a5b4cbe2c5661b2861cf04c2a36c1f8;hpb=73bda3a639851ca17dd1449b94203600a32cd838;p=metaproxy-moved-to-github.git diff --git a/src/thread_pool_observer.cpp b/src/thread_pool_observer.cpp index d630fd9..ce37235 100644 --- a/src/thread_pool_observer.cpp +++ b/src/thread_pool_observer.cpp @@ -206,14 +206,30 @@ void ThreadPoolSocketObserver::run(void *p) } } +void ThreadPoolSocketObserver::cleanup(IThreadPoolMsg *m, void *info) +{ + boost::mutex::scoped_lock input_lock(m_p->m_mutex_input_data); + + std::deque::iterator it = m_p->m_input.begin(); + while (it != m_p->m_input.end()) + { + if ((*it)->cleanup(info)) + it = m_p->m_input.erase(it); + else + it++; + } +} + void ThreadPoolSocketObserver::put(IThreadPoolMsg *m) { boost::mutex::scoped_lock input_lock(m_p->m_mutex_input_data); + while (m_p->m_input.size() >= m_p->m_no_threads * queue_size_per_thread) m_p->m_cond_input_full.wait(input_lock); m_p->m_input.push_back(m); m_p->m_cond_input_data.notify_one(); } + /* * Local variables: * c-basic-offset: 4