Calls to pipe(2) replaced with usage of Pipe class. Now passes
authorAdam Dickmeiss <adam@indexdata.dk>
Mon, 7 Nov 2005 21:57:10 +0000 (21:57 +0000)
committerAdam Dickmeiss <adam@indexdata.dk>
Mon, 7 Nov 2005 21:57:10 +0000 (21:57 +0000)
all tests on Unix.

src/filter.cpp
src/filter_factory.hpp
src/filter_frontend_net.cpp
src/pipe.cpp
src/pipe.hpp
src/test_pipe.cpp
src/test_thread_pool_observer.cpp
src/thread_pool_observer.cpp

index 1b5ef43..f28700d 100644 (file)
@@ -1,4 +1,4 @@
-/* $Id: filter.cpp,v 1.2 2005-10-31 09:40:18 marc Exp $
+/* $Id: filter.cpp,v 1.3 2005-11-07 21:57:10 adam Exp $
    Copyright (c) 2005, Index Data.
 
 %LICENSE%
@@ -6,6 +6,7 @@
 
 #include <stdexcept>
 
+#include "config.hpp"
 #include "filter.hpp"
 
 
index 7ad4e19..29630a6 100644 (file)
@@ -1,4 +1,4 @@
-/* $Id: filter_factory.hpp,v 1.4 2005-10-31 09:40:18 marc Exp $
+/* $Id: filter_factory.hpp,v 1.5 2005-11-07 21:57:10 adam Exp $
    Copyright (c) 2005, Index Data.
 
 %LICENSE%
@@ -12,7 +12,6 @@
 #include <string>
 #include <map>
 
-#include "config.hpp"
 #include "filter.hpp"
 
 
index 5c1f119..68f28ba 100644 (file)
@@ -1,4 +1,4 @@
-/* $Id: filter_frontend_net.cpp,v 1.8 2005-11-07 12:31:43 adam Exp $
+/* $Id: filter_frontend_net.cpp,v 1.9 2005-11-07 21:57:10 adam Exp $
    Copyright (c) 2005, Index Data.
 
 %LICENSE%
@@ -7,6 +7,7 @@
 
 #include "config.hpp"
 
+#include "pipe.hpp"
 #include "filter.hpp"
 #include "router.hpp"
 #include "package.hpp"
@@ -23,7 +24,7 @@ namespace yp2 {
     class My_Timer_Thread : public yazpp_1::ISocketObserver {
     private:
         yazpp_1::ISocketObservable *m_obs;
-        int m_fd[2];
+        Pipe m_pipe;
         bool m_timeout;
     public:
         My_Timer_Thread(yazpp_1::ISocketObservable *obs, int duration);
@@ -227,10 +228,9 @@ bool yp2::My_Timer_Thread::timeout()
 
 yp2::My_Timer_Thread::My_Timer_Thread(yazpp_1::ISocketObservable *obs,
                                 int duration) : 
-    m_obs(obs), m_timeout(false)
+    m_obs(obs), m_pipe(9123), m_timeout(false)
 {
-    pipe(m_fd);
-    obs->addObserver(m_fd[0], this);
+    obs->addObserver(m_pipe.read_fd(), this);
     obs->maskObserver(this, yazpp_1::SOCKET_OBSERVE_READ);
     obs->timeoutObserver(this, duration);
 }
@@ -239,8 +239,6 @@ void yp2::My_Timer_Thread::socketNotify(int event)
 {
     m_timeout = true;
     m_obs->deleteObserver(this);
-    close(m_fd[0]);
-    close(m_fd[1]);
 }
 
 void yp2::filter::FrontendNet::process(Package &package) const {
index f0306f4..d45dee6 100644 (file)
@@ -1,5 +1,5 @@
 
-/* $Id: pipe.cpp,v 1.1 2005-11-07 12:32:01 adam Exp $
+/* $Id: pipe.cpp,v 1.2 2005-11-07 21:57:10 adam Exp $
    Copyright (c) 2005, Index Data.
 
 %LICENSE%
@@ -10,6 +10,7 @@
 #include <unistd.h>
 #endif
 
+#include <errno.h>
 #ifdef WIN32
 #include <winsock.h>
 #else
@@ -17,6 +18,8 @@
 #include <netdb.h>
 #include <arpa/inet.h>
 #include <netinet/tcp.h>
+
+#include <fcntl.h>
 #endif
 
 #if HAVE_SYS_SOCKET_H
@@ -45,6 +48,7 @@ namespace yp2 {
         Rep();
         int m_fd[2];
         int m_socket;
+        bool nonblock(int s);
     };
 }
 
@@ -56,42 +60,90 @@ Pipe::Rep::Rep()
     m_socket = -1;
 }
 
+bool Pipe::Rep::nonblock(int s)
+{
+#ifdef WIN32
+    if (ioctlsocket(s, FIONBIO, &tru) < 0)
+        return false;
+#else
+    if (fcntl(s, F_SETFL, O_NONBLOCK) < 0)
+        return false;
+#ifndef MSG_NOSIGNAL
+    signal (SIGPIPE, SIG_IGN);
+#endif
+#endif
+    return true;
+}
+
 Pipe::Pipe(int port_to_use) : m_p(new Rep)
 {
     if (port_to_use)
     {
+        // create server socket
         m_p->m_socket = socket(AF_INET, SOCK_STREAM, 0);
         if (m_p->m_socket < 0)
             throw Pipe::Error("could not create socket");
-
-        m_p->m_fd[1] = socket(AF_INET, SOCK_STREAM, 0);
-        if (m_p->m_fd[1] < 0)
-            throw Pipe::Error("could not create socket");
-        
+#ifndef WIN32
+        unsigned long one = 1;
+        if (setsockopt(m_p->m_socket, SOL_SOCKET, SO_REUSEADDR, (char*) 
+                       &one, sizeof(one)) < 0)
+            throw Pipe::Error("setsockopt error");
+#endif
+        // bind server socket
         struct sockaddr_in add;
         add.sin_family = AF_INET;
         add.sin_port = htons(port_to_use);
         add.sin_addr.s_addr = INADDR_ANY;
         struct sockaddr *addr = ( struct sockaddr *) &add;
-        
+      
         if (bind(m_p->m_socket, addr, sizeof(struct sockaddr_in)))
             throw Pipe::Error("could not bind on socket");
         
         if (listen(m_p->m_socket, 3) < 0)
             throw Pipe::Error("could not listen on socket");
 
+        // client socket
+        in_addr_t tmpadd;
+        tmpadd = (unsigned) inet_addr("127.0.0.1");
+        if (tmpadd)
+            memcpy(&add.sin_addr.s_addr, &tmpadd, sizeof(struct in_addr));
+        else
+            throw Pipe::Error("inet_addr failed");
+            
+        m_p->m_fd[1] = socket(AF_INET, SOCK_STREAM, 0);
+        if (m_p->m_fd[1] < 0)
+            throw Pipe::Error("could not create socket");
+        
+        m_p->nonblock(m_p->m_fd[1]);
+
+        if (connect(m_p->m_fd[1], addr, sizeof(*addr)) < 0 &&
+            errno != EINPROGRESS)
+        {
+            fprintf(stderr, "errno=%d[%s] tmpadd=%x\n", 
+                    errno, strerror(errno), tmpadd);
+            throw Pipe::Error("could not connect to socket");
+        }
+
+        // server accept
         struct sockaddr caddr;
         socklen_t caddr_len = sizeof(caddr);
         m_p->m_fd[0] = accept(m_p->m_socket, &caddr, &caddr_len);
         if (m_p->m_fd[0] < 0)
             throw Pipe::Error("could not accept on socket");
-        
-        if (connect(m_p->m_fd[1], addr, sizeof(addr)) < 0)
-            throw Pipe::Error("could not connect to socket");
+
+        // complete connect
+        fd_set write_set;
+        FD_ZERO(&write_set);
+        FD_SET(m_p->m_fd[1], &write_set);
+        int r = select(m_p->m_fd[1]+1, 0, &write_set, 0, 0);
+        if (r != 1)
+            throw Pipe::Error("could not complete connect");
+
+        close(m_p->m_socket);
+        m_p->m_socket = -1;
     }
     else
     {
-        m_p->m_socket = 0;
         pipe(m_p->m_fd);
     }
 }
index f5b2378..b883c39 100644 (file)
@@ -1,4 +1,4 @@
-/* $Id: pipe.hpp,v 1.1 2005-11-07 12:32:01 adam Exp $
+/* $Id: pipe.hpp,v 1.2 2005-11-07 21:57:10 adam Exp $
    Copyright (c) 2005, Index Data.
 
 %LICENSE%
@@ -7,6 +7,7 @@
 #ifndef YP2_PIPE_HPP
 #define YP2_PIPE_HPP
 
+#include <stdexcept>
 #include <boost/scoped_ptr.hpp>
 
 #include <yaz/yconfig.h>
index e4f09c8..d1f1233 100644 (file)
@@ -1,4 +1,4 @@
-/* $Id: test_pipe.cpp,v 1.1 2005-11-07 12:32:01 adam Exp $
+/* $Id: test_pipe.cpp,v 1.2 2005-11-07 21:57:10 adam Exp $
    Copyright (c) 2005, Index Data.
 
 %LICENSE%
 
 using namespace boost::unit_test;
 
-class My_Timer_Thread : public yazpp_1::ISocketObserver {
+class Timer : public yazpp_1::ISocketObserver {
 private:
     yazpp_1::ISocketObservable *m_obs;
     yp2::Pipe m_pipe;
     bool m_timeout;
 public:
-    My_Timer_Thread(yazpp_1::ISocketObservable *obs, int duration);
+    Timer(yazpp_1::ISocketObservable *obs, int duration);
     void socketNotify(int event);
     bool timeout() { return m_timeout; };
 };
 
 
-My_Timer_Thread::My_Timer_Thread(yazpp_1::ISocketObservable *obs,
+Timer::Timer(yazpp_1::ISocketObservable *obs,
                                 int duration) : 
     m_obs(obs), m_pipe(0), m_timeout(false)
 {
@@ -40,7 +40,7 @@ My_Timer_Thread::My_Timer_Thread(yazpp_1::ISocketObservable *obs,
     obs->timeoutObserver(this, duration);
 }
 
-void My_Timer_Thread::socketNotify(int event)
+void Timer::socketNotify(int event)
 {
     m_timeout = true;
     m_obs->deleteObserver(this);
@@ -50,14 +50,14 @@ BOOST_AUTO_TEST_CASE( test_pipe_1 )
 {
     yazpp_1::SocketManager mySocketManager;
     
-    yp2::Pipe pipe(0);
+    yp2::Pipe pipe(9999);
 
-    My_Timer_Thread t(&mySocketManager, 0);
+    Timer t(&mySocketManager, 0);
 
     while (mySocketManager.processEvent() > 0)
         if (t.timeout())
             break;
-    BOOST_CHECK (t.timeout());
+    BOOST_CHECK(t.timeout());
 }
 
 /*
index 38b4116..c496391 100644 (file)
@@ -1,9 +1,9 @@
-/* $Id: test_thread_pool_observer.cpp,v 1.6 2005-10-15 14:09:09 adam Exp $
+/* $Id: test_thread_pool_observer.cpp,v 1.7 2005-11-07 21:57:10 adam Exp $
    Copyright (c) 2005, Index Data.
 
 %LICENSE%
  */
-/* $Id: test_thread_pool_observer.cpp,v 1.6 2005-10-15 14:09:09 adam Exp $
+/* $Id: test_thread_pool_observer.cpp,v 1.7 2005-11-07 21:57:10 adam Exp $
    Copyright (c) 1998-2005, Index Data.
 
 This file is part of the yaz-proxy.
@@ -31,6 +31,7 @@ Free Software Foundation, 59 Temple Place - Suite 330, Boston, MA
 #include <yaz++/pdu-assoc.h>
 #include <yaz++/socket-manager.h>
 #include <yaz/log.h>
+#include "pipe.hpp"
 #include "thread_pool_observer.hpp"
 
 #define BOOST_AUTO_TEST_MAIN
@@ -52,7 +53,7 @@ public:
 class My_Timer_Thread : public ISocketObserver {
 private:
     ISocketObservable *m_obs;
-    int m_fd[2];
+    yp2::Pipe m_pipe;
     yp2::ThreadPoolSocketObserver *m_t;
 public:
     int m_sum;
@@ -82,14 +83,14 @@ void My_Msg::result()
 }
 
 My_Timer_Thread::My_Timer_Thread(ISocketObservable *obs,
-                                 yp2::ThreadPoolSocketObserver *t) : m_obs(obs) 
+                                 yp2::ThreadPoolSocketObserver *t) : 
+    m_obs(obs), m_pipe(9123) 
 {
-    pipe(m_fd);
     m_t = t;
     m_sum = 0;
     m_requests = 0;
     m_responses = 0;
-    obs->addObserver(m_fd[0], this);
+    obs->addObserver(m_pipe.read_fd(), this);
     obs->maskObserver(this, SOCKET_OBSERVE_READ);
     obs->timeoutObserver(this, 0);
 }
index 180f385..2f3a0e0 100644 (file)
@@ -1,5 +1,5 @@
 
-/* $Id: thread_pool_observer.cpp,v 1.10 2005-11-07 12:31:05 adam Exp $
+/* $Id: thread_pool_observer.cpp,v 1.11 2005-11-07 21:57:10 adam Exp $
    Copyright (c) 2005, Index Data.
 
 %LICENSE%
@@ -26,6 +26,7 @@
 #include <yaz/log.h>
 
 #include "thread_pool_observer.hpp"
+#include "pipe.hpp"
 
 namespace yp2 {
     class ThreadPoolSocketObserver::Worker {
@@ -44,7 +45,7 @@ namespace yp2 {
         ~Rep();
     private:
         yazpp_1::ISocketObservable *m_socketObservable;
-        int m_fd[2];
+        Pipe m_pipe;
         boost::thread_group m_thrds;
         boost::mutex m_mutex_input_data;
         boost::condition m_cond_input_data;
@@ -61,7 +62,7 @@ using namespace yazpp_1;
 using namespace yp2;
 
 ThreadPoolSocketObserver::Rep::Rep(ISocketObservable *obs)
-    : m_socketObservable(obs)
+    : m_socketObservable(obs), m_pipe(9123)
 {
 }
 
@@ -78,8 +79,7 @@ ThreadPoolSocketObserver::ThreadPoolSocketObserver(ISocketObservable *obs,
                                                    int no_threads)
     : m_p(new Rep(obs))
 {
-    pipe(m_p->m_fd);
-    obs->addObserver(m_p->m_fd[0], this);
+    obs->addObserver(m_p->m_pipe.read_fd(), this);
     obs->maskObserver(this, SOCKET_OBSERVE_READ);
 
     m_p->m_stop_flag = false;
@@ -102,9 +102,6 @@ ThreadPoolSocketObserver::~ThreadPoolSocketObserver()
     m_p->m_thrds.join_all();
 
     m_p->m_socketObservable->deleteObserver(this);
-
-    close(m_p->m_fd[0]);
-    close(m_p->m_fd[1]);
 }
 
 void ThreadPoolSocketObserver::socketNotify(int event)
@@ -112,7 +109,7 @@ void ThreadPoolSocketObserver::socketNotify(int event)
     if (event & SOCKET_OBSERVE_READ)
     {
         char buf[2];
-        read(m_p->m_fd[0], buf, 1);
+        read(m_p->m_pipe.read_fd(), buf, 1);
         IThreadPoolMsg *out;
         {
             boost::mutex::scoped_lock output_lock(m_p->m_mutex_output_data);
@@ -143,7 +140,7 @@ void ThreadPoolSocketObserver::run(void *p)
         {
             boost::mutex::scoped_lock output_lock(m_p->m_mutex_output_data);
             m_p->m_output.push_back(out);
-            write(m_p->m_fd[1], "", 1);
+            write(m_p->m_pipe.write_fd(), "", 1);
         }
     }
 }