From: Adam Dickmeiss Date: Mon, 7 Nov 2005 21:57:10 +0000 (+0000) Subject: Calls to pipe(2) replaced with usage of Pipe class. Now passes X-Git-Tag: YP2.0.0.2~147 X-Git-Url: http://git.indexdata.com/?a=commitdiff_plain;h=d8ae25e485e2b560d7990fadbd061c45e70b69a6;p=metaproxy-moved-to-github.git Calls to pipe(2) replaced with usage of Pipe class. Now passes all tests on Unix. --- diff --git a/src/filter.cpp b/src/filter.cpp index 1b5ef43..f28700d 100644 --- a/src/filter.cpp +++ b/src/filter.cpp @@ -1,4 +1,4 @@ -/* $Id: filter.cpp,v 1.2 2005-10-31 09:40:18 marc Exp $ +/* $Id: filter.cpp,v 1.3 2005-11-07 21:57:10 adam Exp $ Copyright (c) 2005, Index Data. %LICENSE% @@ -6,6 +6,7 @@ #include +#include "config.hpp" #include "filter.hpp" diff --git a/src/filter_factory.hpp b/src/filter_factory.hpp index 7ad4e19..29630a6 100644 --- a/src/filter_factory.hpp +++ b/src/filter_factory.hpp @@ -1,4 +1,4 @@ -/* $Id: filter_factory.hpp,v 1.4 2005-10-31 09:40:18 marc Exp $ +/* $Id: filter_factory.hpp,v 1.5 2005-11-07 21:57:10 adam Exp $ Copyright (c) 2005, Index Data. %LICENSE% @@ -12,7 +12,6 @@ #include #include -#include "config.hpp" #include "filter.hpp" diff --git a/src/filter_frontend_net.cpp b/src/filter_frontend_net.cpp index 5c1f119..68f28ba 100644 --- a/src/filter_frontend_net.cpp +++ b/src/filter_frontend_net.cpp @@ -1,4 +1,4 @@ -/* $Id: filter_frontend_net.cpp,v 1.8 2005-11-07 12:31:43 adam Exp $ +/* $Id: filter_frontend_net.cpp,v 1.9 2005-11-07 21:57:10 adam Exp $ Copyright (c) 2005, Index Data. %LICENSE% @@ -7,6 +7,7 @@ #include "config.hpp" +#include "pipe.hpp" #include "filter.hpp" #include "router.hpp" #include "package.hpp" @@ -23,7 +24,7 @@ namespace yp2 { class My_Timer_Thread : public yazpp_1::ISocketObserver { private: yazpp_1::ISocketObservable *m_obs; - int m_fd[2]; + Pipe m_pipe; bool m_timeout; public: My_Timer_Thread(yazpp_1::ISocketObservable *obs, int duration); @@ -227,10 +228,9 @@ bool yp2::My_Timer_Thread::timeout() yp2::My_Timer_Thread::My_Timer_Thread(yazpp_1::ISocketObservable *obs, int duration) : - m_obs(obs), m_timeout(false) + m_obs(obs), m_pipe(9123), m_timeout(false) { - pipe(m_fd); - obs->addObserver(m_fd[0], this); + obs->addObserver(m_pipe.read_fd(), this); obs->maskObserver(this, yazpp_1::SOCKET_OBSERVE_READ); obs->timeoutObserver(this, duration); } @@ -239,8 +239,6 @@ void yp2::My_Timer_Thread::socketNotify(int event) { m_timeout = true; m_obs->deleteObserver(this); - close(m_fd[0]); - close(m_fd[1]); } void yp2::filter::FrontendNet::process(Package &package) const { diff --git a/src/pipe.cpp b/src/pipe.cpp index f0306f4..d45dee6 100644 --- a/src/pipe.cpp +++ b/src/pipe.cpp @@ -1,5 +1,5 @@ -/* $Id: pipe.cpp,v 1.1 2005-11-07 12:32:01 adam Exp $ +/* $Id: pipe.cpp,v 1.2 2005-11-07 21:57:10 adam Exp $ Copyright (c) 2005, Index Data. %LICENSE% @@ -10,6 +10,7 @@ #include #endif +#include #ifdef WIN32 #include #else @@ -17,6 +18,8 @@ #include #include #include + +#include #endif #if HAVE_SYS_SOCKET_H @@ -45,6 +48,7 @@ namespace yp2 { Rep(); int m_fd[2]; int m_socket; + bool nonblock(int s); }; } @@ -56,42 +60,90 @@ Pipe::Rep::Rep() m_socket = -1; } +bool Pipe::Rep::nonblock(int s) +{ +#ifdef WIN32 + if (ioctlsocket(s, FIONBIO, &tru) < 0) + return false; +#else + if (fcntl(s, F_SETFL, O_NONBLOCK) < 0) + return false; +#ifndef MSG_NOSIGNAL + signal (SIGPIPE, SIG_IGN); +#endif +#endif + return true; +} + Pipe::Pipe(int port_to_use) : m_p(new Rep) { if (port_to_use) { + // create server socket m_p->m_socket = socket(AF_INET, SOCK_STREAM, 0); if (m_p->m_socket < 0) throw Pipe::Error("could not create socket"); - - m_p->m_fd[1] = socket(AF_INET, SOCK_STREAM, 0); - if (m_p->m_fd[1] < 0) - throw Pipe::Error("could not create socket"); - +#ifndef WIN32 + unsigned long one = 1; + if (setsockopt(m_p->m_socket, SOL_SOCKET, SO_REUSEADDR, (char*) + &one, sizeof(one)) < 0) + throw Pipe::Error("setsockopt error"); +#endif + // bind server socket struct sockaddr_in add; add.sin_family = AF_INET; add.sin_port = htons(port_to_use); add.sin_addr.s_addr = INADDR_ANY; struct sockaddr *addr = ( struct sockaddr *) &add; - + if (bind(m_p->m_socket, addr, sizeof(struct sockaddr_in))) throw Pipe::Error("could not bind on socket"); if (listen(m_p->m_socket, 3) < 0) throw Pipe::Error("could not listen on socket"); + // client socket + in_addr_t tmpadd; + tmpadd = (unsigned) inet_addr("127.0.0.1"); + if (tmpadd) + memcpy(&add.sin_addr.s_addr, &tmpadd, sizeof(struct in_addr)); + else + throw Pipe::Error("inet_addr failed"); + + m_p->m_fd[1] = socket(AF_INET, SOCK_STREAM, 0); + if (m_p->m_fd[1] < 0) + throw Pipe::Error("could not create socket"); + + m_p->nonblock(m_p->m_fd[1]); + + if (connect(m_p->m_fd[1], addr, sizeof(*addr)) < 0 && + errno != EINPROGRESS) + { + fprintf(stderr, "errno=%d[%s] tmpadd=%x\n", + errno, strerror(errno), tmpadd); + throw Pipe::Error("could not connect to socket"); + } + + // server accept struct sockaddr caddr; socklen_t caddr_len = sizeof(caddr); m_p->m_fd[0] = accept(m_p->m_socket, &caddr, &caddr_len); if (m_p->m_fd[0] < 0) throw Pipe::Error("could not accept on socket"); - - if (connect(m_p->m_fd[1], addr, sizeof(addr)) < 0) - throw Pipe::Error("could not connect to socket"); + + // complete connect + fd_set write_set; + FD_ZERO(&write_set); + FD_SET(m_p->m_fd[1], &write_set); + int r = select(m_p->m_fd[1]+1, 0, &write_set, 0, 0); + if (r != 1) + throw Pipe::Error("could not complete connect"); + + close(m_p->m_socket); + m_p->m_socket = -1; } else { - m_p->m_socket = 0; pipe(m_p->m_fd); } } diff --git a/src/pipe.hpp b/src/pipe.hpp index f5b2378..b883c39 100644 --- a/src/pipe.hpp +++ b/src/pipe.hpp @@ -1,4 +1,4 @@ -/* $Id: pipe.hpp,v 1.1 2005-11-07 12:32:01 adam Exp $ +/* $Id: pipe.hpp,v 1.2 2005-11-07 21:57:10 adam Exp $ Copyright (c) 2005, Index Data. %LICENSE% @@ -7,6 +7,7 @@ #ifndef YP2_PIPE_HPP #define YP2_PIPE_HPP +#include #include #include diff --git a/src/test_pipe.cpp b/src/test_pipe.cpp index e4f09c8..d1f1233 100644 --- a/src/test_pipe.cpp +++ b/src/test_pipe.cpp @@ -1,4 +1,4 @@ -/* $Id: test_pipe.cpp,v 1.1 2005-11-07 12:32:01 adam Exp $ +/* $Id: test_pipe.cpp,v 1.2 2005-11-07 21:57:10 adam Exp $ Copyright (c) 2005, Index Data. %LICENSE% @@ -19,19 +19,19 @@ using namespace boost::unit_test; -class My_Timer_Thread : public yazpp_1::ISocketObserver { +class Timer : public yazpp_1::ISocketObserver { private: yazpp_1::ISocketObservable *m_obs; yp2::Pipe m_pipe; bool m_timeout; public: - My_Timer_Thread(yazpp_1::ISocketObservable *obs, int duration); + Timer(yazpp_1::ISocketObservable *obs, int duration); void socketNotify(int event); bool timeout() { return m_timeout; }; }; -My_Timer_Thread::My_Timer_Thread(yazpp_1::ISocketObservable *obs, +Timer::Timer(yazpp_1::ISocketObservable *obs, int duration) : m_obs(obs), m_pipe(0), m_timeout(false) { @@ -40,7 +40,7 @@ My_Timer_Thread::My_Timer_Thread(yazpp_1::ISocketObservable *obs, obs->timeoutObserver(this, duration); } -void My_Timer_Thread::socketNotify(int event) +void Timer::socketNotify(int event) { m_timeout = true; m_obs->deleteObserver(this); @@ -50,14 +50,14 @@ BOOST_AUTO_TEST_CASE( test_pipe_1 ) { yazpp_1::SocketManager mySocketManager; - yp2::Pipe pipe(0); + yp2::Pipe pipe(9999); - My_Timer_Thread t(&mySocketManager, 0); + Timer t(&mySocketManager, 0); while (mySocketManager.processEvent() > 0) if (t.timeout()) break; - BOOST_CHECK (t.timeout()); + BOOST_CHECK(t.timeout()); } /* diff --git a/src/test_thread_pool_observer.cpp b/src/test_thread_pool_observer.cpp index 38b4116..c496391 100644 --- a/src/test_thread_pool_observer.cpp +++ b/src/test_thread_pool_observer.cpp @@ -1,9 +1,9 @@ -/* $Id: test_thread_pool_observer.cpp,v 1.6 2005-10-15 14:09:09 adam Exp $ +/* $Id: test_thread_pool_observer.cpp,v 1.7 2005-11-07 21:57:10 adam Exp $ Copyright (c) 2005, Index Data. %LICENSE% */ -/* $Id: test_thread_pool_observer.cpp,v 1.6 2005-10-15 14:09:09 adam Exp $ +/* $Id: test_thread_pool_observer.cpp,v 1.7 2005-11-07 21:57:10 adam Exp $ Copyright (c) 1998-2005, Index Data. This file is part of the yaz-proxy. @@ -31,6 +31,7 @@ Free Software Foundation, 59 Temple Place - Suite 330, Boston, MA #include #include #include +#include "pipe.hpp" #include "thread_pool_observer.hpp" #define BOOST_AUTO_TEST_MAIN @@ -52,7 +53,7 @@ public: class My_Timer_Thread : public ISocketObserver { private: ISocketObservable *m_obs; - int m_fd[2]; + yp2::Pipe m_pipe; yp2::ThreadPoolSocketObserver *m_t; public: int m_sum; @@ -82,14 +83,14 @@ void My_Msg::result() } My_Timer_Thread::My_Timer_Thread(ISocketObservable *obs, - yp2::ThreadPoolSocketObserver *t) : m_obs(obs) + yp2::ThreadPoolSocketObserver *t) : + m_obs(obs), m_pipe(9123) { - pipe(m_fd); m_t = t; m_sum = 0; m_requests = 0; m_responses = 0; - obs->addObserver(m_fd[0], this); + obs->addObserver(m_pipe.read_fd(), this); obs->maskObserver(this, SOCKET_OBSERVE_READ); obs->timeoutObserver(this, 0); } diff --git a/src/thread_pool_observer.cpp b/src/thread_pool_observer.cpp index 180f385..2f3a0e0 100644 --- a/src/thread_pool_observer.cpp +++ b/src/thread_pool_observer.cpp @@ -1,5 +1,5 @@ -/* $Id: thread_pool_observer.cpp,v 1.10 2005-11-07 12:31:05 adam Exp $ +/* $Id: thread_pool_observer.cpp,v 1.11 2005-11-07 21:57:10 adam Exp $ Copyright (c) 2005, Index Data. %LICENSE% @@ -26,6 +26,7 @@ #include #include "thread_pool_observer.hpp" +#include "pipe.hpp" namespace yp2 { class ThreadPoolSocketObserver::Worker { @@ -44,7 +45,7 @@ namespace yp2 { ~Rep(); private: yazpp_1::ISocketObservable *m_socketObservable; - int m_fd[2]; + Pipe m_pipe; boost::thread_group m_thrds; boost::mutex m_mutex_input_data; boost::condition m_cond_input_data; @@ -61,7 +62,7 @@ using namespace yazpp_1; using namespace yp2; ThreadPoolSocketObserver::Rep::Rep(ISocketObservable *obs) - : m_socketObservable(obs) + : m_socketObservable(obs), m_pipe(9123) { } @@ -78,8 +79,7 @@ ThreadPoolSocketObserver::ThreadPoolSocketObserver(ISocketObservable *obs, int no_threads) : m_p(new Rep(obs)) { - pipe(m_p->m_fd); - obs->addObserver(m_p->m_fd[0], this); + obs->addObserver(m_p->m_pipe.read_fd(), this); obs->maskObserver(this, SOCKET_OBSERVE_READ); m_p->m_stop_flag = false; @@ -102,9 +102,6 @@ ThreadPoolSocketObserver::~ThreadPoolSocketObserver() m_p->m_thrds.join_all(); m_p->m_socketObservable->deleteObserver(this); - - close(m_p->m_fd[0]); - close(m_p->m_fd[1]); } void ThreadPoolSocketObserver::socketNotify(int event) @@ -112,7 +109,7 @@ void ThreadPoolSocketObserver::socketNotify(int event) if (event & SOCKET_OBSERVE_READ) { char buf[2]; - read(m_p->m_fd[0], buf, 1); + read(m_p->m_pipe.read_fd(), buf, 1); IThreadPoolMsg *out; { boost::mutex::scoped_lock output_lock(m_p->m_mutex_output_data); @@ -143,7 +140,7 @@ void ThreadPoolSocketObserver::run(void *p) { boost::mutex::scoped_lock output_lock(m_p->m_mutex_output_data); m_p->m_output.push_back(out); - write(m_p->m_fd[1], "", 1); + write(m_p->m_pipe.write_fd(), "", 1); } } }