X-Git-Url: http://git.indexdata.com/?a=blobdiff_plain;f=src%2Fthread_pool_observer.cpp;h=e87090c851f3ec093a7170d8ec41bf5bd3deea87;hb=b80f4b3d531f36265ea46d337cb7e1000604cafb;hp=180f385aae736516706be355ad853c7445fdbe2e;hpb=323af6850b8aba94fb86edb303867dc55dc2eb7f;p=metaproxy-moved-to-github.git diff --git a/src/thread_pool_observer.cpp b/src/thread_pool_observer.cpp index 180f385..e87090c 100644 --- a/src/thread_pool_observer.cpp +++ b/src/thread_pool_observer.cpp @@ -1,6 +1,6 @@ -/* $Id: thread_pool_observer.cpp,v 1.10 2005-11-07 12:31:05 adam Exp $ - Copyright (c) 2005, Index Data. +/* $Id: thread_pool_observer.cpp,v 1.15 2006-03-29 13:44:45 adam Exp $ + Copyright (c) 2005-2006, Index Data. %LICENSE% */ @@ -13,6 +13,10 @@ #include #endif +#if HAVE_SYS_SOCKET_H +#include +#endif + #include #include #include @@ -22,12 +26,15 @@ #include -#include +#include #include #include "thread_pool_observer.hpp" +#include "pipe.hpp" -namespace yp2 { +namespace mp = metaproxy_1; + +namespace metaproxy_1 { class ThreadPoolSocketObserver::Worker { public: Worker(ThreadPoolSocketObserver *s) : m_s(s) {}; @@ -44,7 +51,7 @@ namespace yp2 { ~Rep(); private: yazpp_1::ISocketObservable *m_socketObservable; - int m_fd[2]; + Pipe m_pipe; boost::thread_group m_thrds; boost::mutex m_mutex_input_data; boost::condition m_cond_input_data; @@ -58,10 +65,10 @@ namespace yp2 { using namespace yazpp_1; -using namespace yp2; +using namespace mp; ThreadPoolSocketObserver::Rep::Rep(ISocketObservable *obs) - : m_socketObservable(obs) + : m_socketObservable(obs), m_pipe(9123) { } @@ -78,8 +85,7 @@ ThreadPoolSocketObserver::ThreadPoolSocketObserver(ISocketObservable *obs, int no_threads) : m_p(new Rep(obs)) { - pipe(m_p->m_fd); - obs->addObserver(m_p->m_fd[0], this); + obs->addObserver(m_p->m_pipe.read_fd(), this); obs->maskObserver(this, SOCKET_OBSERVE_READ); m_p->m_stop_flag = false; @@ -102,9 +108,6 @@ ThreadPoolSocketObserver::~ThreadPoolSocketObserver() m_p->m_thrds.join_all(); m_p->m_socketObservable->deleteObserver(this); - - close(m_p->m_fd[0]); - close(m_p->m_fd[1]); } void ThreadPoolSocketObserver::socketNotify(int event) @@ -112,7 +115,7 @@ void ThreadPoolSocketObserver::socketNotify(int event) if (event & SOCKET_OBSERVE_READ) { char buf[2]; - read(m_p->m_fd[0], buf, 1); + recv(m_p->m_pipe.read_fd(), buf, 1, 0); IThreadPoolMsg *out; { boost::mutex::scoped_lock output_lock(m_p->m_mutex_output_data); @@ -143,7 +146,7 @@ void ThreadPoolSocketObserver::run(void *p) { boost::mutex::scoped_lock output_lock(m_p->m_mutex_output_data); m_p->m_output.push_back(out); - write(m_p->m_fd[1], "", 1); + send(m_p->m_pipe.write_fd(), "", 1, 0); } } }