Release 1.6.4
[yazpp-moved-to-github.git] / src / yaz-socket-manager.cpp
index 4d15f30..e119735 100644 (file)
@@ -1,8 +1,11 @@
 /* This file is part of the yazpp toolkit.
- * Copyright (C) 1998-2009 Index Data and Mike Taylor
+ * Copyright (C) Index Data 
  * See the file LICENSE for details.
  */
 
+#if HAVE_CONFIG_H
+#include <config.h>
+#endif
 #if HAVE_SYS_TIME_H
 #include <sys/time.h>
 #endif
@@ -17,6 +20,7 @@
 #include <string.h>
 #include <assert.h>
 #include <stdlib.h>
+#include <time.h>
 
 #include <yaz/log.h>
 
 
 using namespace yazpp_1;
 
-SocketManager::SocketEntry **SocketManager::lookupObserver(
+struct SocketManager::SocketEntry {
+    ISocketObserver *observer;
+    int fd;
+    unsigned mask;
+    int timeout;
+    int timeout_this;
+    time_t last_activity;
+    SocketEntry *next;
+};
+
+struct SocketManager::SocketEvent {
+    ISocketObserver *observer;
+    int event;
+    SocketEvent *next;          // front in queue
+    SocketEvent *prev;          // back in queue
+};
+
+struct SocketManager::Rep {
+    void putEvent(SocketEvent *event);
+    SocketEvent *getEvent();
+    void removeEvent(ISocketObserver *observer);
+    void inspect_poll_result(int res, struct yaz_poll_fd *fds, int no_fds,
+                             int timeout);
+    SocketEntry **lookupObserver(ISocketObserver *observer);
+    SocketEntry *observers;       // all registered observers
+    SocketEvent *queue_front;
+    SocketEvent *queue_back;
+    int log;
+};
+
+SocketManager::SocketEntry **SocketManager::Rep::lookupObserver(
     ISocketObserver *observer)
 {
     SocketEntry **se;
-    
-    for (se = &m_observers; *se; se = &(*se)->next)
+
+    for (se = &observers; *se; se = &(*se)->next)
         if ((*se)->observer == observer)
             break;
     return se;
 }
 
+int SocketManager::getNumberOfObservers()
+{
+    int i = 0;
+    SocketEntry *se;
+    for (se = m_p->observers; se; se = se->next, i++)
+        ;
+    return i;
+}
+
 void SocketManager::addObserver(int fd, ISocketObserver *observer)
 {
     SocketEntry *se;
 
-    se = *lookupObserver(observer);
+    se = *m_p->lookupObserver(observer);
     if (!se)
     {
         se = new SocketEntry;
-        se->next= m_observers;
-        m_observers = se;
+        se->next= m_p->observers;
+        m_p->observers = se;
         se->observer = observer;
     }
     se->fd = fd;
@@ -56,10 +99,10 @@ void SocketManager::addObserver(int fd, ISocketObserver *observer)
 
 void SocketManager::deleteObserver(ISocketObserver *observer)
 {
-    SocketEntry **se = lookupObserver(observer);
+    SocketEntry **se = m_p->lookupObserver(observer);
     if (*se)
     {
-        removeEvent (observer);
+        m_p->removeEvent(observer);
         SocketEntry *se_tmp = *se;
         *se = (*se)->next;
         delete se_tmp;
@@ -68,27 +111,27 @@ void SocketManager::deleteObserver(ISocketObserver *observer)
 
 void SocketManager::deleteObservers()
 {
-    SocketEntry *se = m_observers;
-    
+    SocketEntry *se = m_p->observers;
+
     while (se)
     {
         SocketEntry *se_next = se->next;
         delete se;
         se = se_next;
     }
-    m_observers = 0;
+    m_p->observers = 0;
 }
 
 void SocketManager::maskObserver(ISocketObserver *observer, int mask)
 {
     SocketEntry *se;
 
-    yaz_log(m_log, "obs=%p read=%d write=%d except=%d", observer,
+    yaz_log(m_p->log, "obs=%p read=%d write=%d except=%d", observer,
                     mask & SOCKET_OBSERVE_READ,
                     mask & SOCKET_OBSERVE_WRITE,
                     mask & SOCKET_OBSERVE_EXCEPT);
 
-    se = *lookupObserver(observer);
+    se = *m_p->lookupObserver(observer);
     if (se)
         se->mask = mask;
 }
@@ -98,24 +141,34 @@ void SocketManager::timeoutObserver(ISocketObserver *observer,
 {
     SocketEntry *se;
 
-    se = *lookupObserver(observer);
+    se = *m_p->lookupObserver(observer);
     if (se)
         se->timeout = timeout;
 }
 
-
-void SocketManager::inspect_poll_result(int res, struct yaz_poll_fd *fds,
-                                        int no_fds, int timeout)
+void SocketManager::Rep::inspect_poll_result(int res, struct yaz_poll_fd *fds,
+                                             int no_fds, int timeout)
 
 {
-    yaz_log(m_log, "yaz_poll returned res=%d", res);
+    yaz_log(log, "yaz_poll returned res=%d", res);
     time_t now = time(0);
     int i;
     int no_put_events = 0;
-    SocketEntry *p;
+    int no_lost_observers = 0;
 
-    for (i = 0, p = m_observers; p; p = p->next, i++)
+    for (i = 0; i < no_fds; i++)
     {
+        SocketEntry *p;
+        for (p = observers; p; p = p->next)
+            if (p->fd == fds[i].fd)
+                break;
+        if (!p)
+        {
+            // m_p->observers list changed since poll started
+            no_lost_observers++;
+            continue;
+        }
+
         enum yaz_poll_mask output_mask = fds[i].output_mask;
 
         int mask = 0;
@@ -127,29 +180,30 @@ void SocketManager::inspect_poll_result(int res, struct yaz_poll_fd *fds,
 
         if (output_mask & yaz_poll_except)
             mask |= SOCKET_OBSERVE_EXCEPT;
-        
+
         if (mask)
         {
             SocketEvent *event = new SocketEvent;
             p->last_activity = now;
             event->observer = p->observer;
             event->event = mask;
-            putEvent (event);
+            putEvent(event);
             no_put_events++;
-            yaz_log (m_log, "putEvent I/O mask=%d", mask);
+            yaz_log(log, "putEvent I/O mask=%d", mask);
         }
         else if (res == 0 && p->timeout_this == timeout)
         {
             SocketEvent *event = new SocketEvent;
-            assert (p->last_activity);
-            yaz_log (m_log, "putEvent timeout fd=%d, now = %ld last_activity=%ld timeout=%d",
-                     p->fd, now, p->last_activity, p->timeout);
+            assert(p->last_activity);
+            yaz_log(log, "putEvent timeout fd=%d, now = %ld "
+                    "last_activity=%ld timeout=%d",
+                    p->fd, now, p->last_activity, p->timeout);
             p->last_activity = now;
             event->observer = p->observer;
             event->event = SOCKET_OBSERVE_TIMEOUT;
-            putEvent (event);
+            putEvent(event);
             no_put_events++;
-            
+
         }
     }
     SocketEvent *event = getEvent();
@@ -160,21 +214,23 @@ void SocketManager::inspect_poll_result(int res, struct yaz_poll_fd *fds,
     }
     else
     {
-        // bug #2035
-        
-        yaz_log(YLOG_WARN, "unhandled socket event. yaz_poll returned %d", res);
-        yaz_log(YLOG_WARN, "no_put_events=%d no_fds=%d i=%d timeout=%d",
-                no_put_events, no_fds, i, timeout);
-        abort();
+        if (no_lost_observers == 0)
+        {
+            // bug #2035
+            yaz_log(YLOG_WARN, "unhandled socket event. yaz_poll returned %d",
+                    res);
+            yaz_log(YLOG_WARN, "no_put_events=%d no_fds=%d i=%d timeout=%d",
+                    no_put_events, no_fds, i, timeout);
+        }
     }
 }
 
 int SocketManager::processEvent()
 {
     SocketEntry *p;
-    SocketEvent *event = getEvent();
+    SocketEvent *event = m_p->getEvent();
     int timeout = -1;
-    yaz_log (m_log, "SocketManager::processEvent manager=%p", this);
+    yaz_log(m_p->log, "SocketManager::processEvent manager=%p", this);
     if (event)
     {
         event->observer->socketNotify(event->event);
@@ -186,13 +242,13 @@ int SocketManager::processEvent()
     time_t now = time(0);
     int i;
     int no_fds = 0;
-    for (p = m_observers; p; p = p->next)
+    for (p = m_p->observers; p; p = p->next)
         no_fds++;
 
     if (!no_fds)
         return 0;
     struct yaz_poll_fd *fds = new yaz_poll_fd [no_fds];
-    for (i = 0, p = m_observers; p; p = p->next, i++)
+    for (i = 0, p = m_p->observers; p; p = p->next, i++)
     {
         fds[i].fd = p->fd;
         int input_mask = 0;
@@ -216,8 +272,8 @@ int SocketManager::processEvent()
             if (timeout == -1 || timeout_this < timeout)
                 timeout = timeout_this;
             p->timeout_this = timeout_this;
-            yaz_log (m_log, "SocketManager::select timeout_this=%d", 
-                     p->timeout_this);
+            yaz_log(m_p->log, "SocketManager::select timeout_this=%d",
+                    p->timeout_this);
         }
         else
             p->timeout_this = -1;
@@ -225,66 +281,66 @@ int SocketManager::processEvent()
     }
 
     int pass = 0;
-    while ((res = yaz_poll(fds, no_fds, timeout, 0)) < 0)
+    while ((res = yaz_poll(fds, no_fds, timeout, 0)) < 0 && pass < 10)
     {
-        if (errno != EINTR)
+        if (errno == EINTR)
         {
-            yaz_log(YLOG_ERRNO|YLOG_WARN, "yaz_poll");
-            yaz_log(YLOG_WARN, "errno=%d timeout=%d", errno, timeout);
-            if (++pass > 10)
-                return -1;
+            delete [] fds;
+            return 1;
         }
+        yaz_log(YLOG_ERRNO|YLOG_WARN, "yaz_poll");
+        yaz_log(YLOG_WARN, "errno=%d timeout=%d", errno, timeout);
     }
 
-    inspect_poll_result(res, fds, no_fds, timeout);
+    if (res >= 0)
+        m_p->inspect_poll_result(res, fds, no_fds, timeout);
 
     delete [] fds;
-    return 1;
+    return res >= 0 ? 1 : -1;
 }
 
-
 //    n p    n p  ......   n p    n p
 //   front                        back
 
-void SocketManager::putEvent(SocketEvent *event)
+void SocketManager::Rep::putEvent(SocketEvent *event)
 {
     // put in back of queue
-    if (m_queue_back)
+    if (queue_back)
     {
-        m_queue_back->prev = event;
-        assert (m_queue_front);
+        queue_back->prev = event;
+        assert(queue_front);
     }
     else
     {
-        assert (!m_queue_front);
-        m_queue_front = event;
+        assert(!queue_front);
+        queue_front = event;
     }
-    event->next = m_queue_back;
+    event->next = queue_back;
     event->prev = 0;
-    m_queue_back = event;
+    queue_back = event;
 }
 
-SocketManager::SocketEvent *SocketManager::getEvent()
+SocketManager::SocketEvent *SocketManager::Rep::getEvent()
 {
     // get from front of queue
-    SocketEvent *event = m_queue_front;
+    SocketEvent *event = queue_front;
     if (!event)
         return 0;
-    assert (m_queue_back);
-    m_queue_front = event->prev;
-    if (m_queue_front)
+    assert(queue_back);
+    queue_front = event->prev;
+    if (queue_front)
     {
-        assert (m_queue_back);
-        m_queue_front->next = 0;
+        assert(queue_back);
+        queue_front->next = 0;
     }
     else
-        m_queue_back = 0;
+        queue_back = 0;
     return event;
 }
 
-void SocketManager::removeEvent(ISocketObserver *observer)
+void SocketManager::Rep::removeEvent(ISocketObserver *observer)
 {
-    SocketEvent *ev = m_queue_back;
+    SocketEvent *ev = queue_back;
     while (ev)
     {
         SocketEvent *ev_next = ev->next;
@@ -293,11 +349,11 @@ void SocketManager::removeEvent(ISocketObserver *observer)
             if (ev->prev)
                 ev->prev->next = ev->next;
             else
-                m_queue_back = ev->next;
+                queue_back = ev->next;
             if (ev->next)
                 ev->next->prev = ev->prev;
             else
-                m_queue_front = ev->prev;
+                queue_front = ev->prev;
             delete ev;
         }
         ev = ev_next;
@@ -306,19 +362,22 @@ void SocketManager::removeEvent(ISocketObserver *observer)
 
 SocketManager::SocketManager()
 {
-    m_observers = 0;
-    m_queue_front = 0;
-    m_queue_back = 0;
-    m_log = YLOG_DEBUG;
+    m_p = new Rep;
+    m_p->observers = 0;
+    m_p->queue_front = 0;
+    m_p->queue_back = 0;
+    m_p->log = YLOG_DEBUG;
 }
 
 SocketManager::~SocketManager()
 {
     deleteObservers();
+    delete m_p;
 }
 /*
  * Local variables:
  * c-basic-offset: 4
+ * c-file-style: "Stroustrup"
  * indent-tabs-mode: nil
  * End:
  * vim: shiftwidth=4 tabstop=8 expandtab