X-Git-Url: http://git.indexdata.com/?a=blobdiff_plain;f=src%2Fyaz-socket-manager.cpp;h=c78c529ef27c81cd3a3d8f1b6e5b668134a85383;hb=805b4fdf7b70b0e9b298064f24ceff0c1d38e05b;hp=7d771ad9723a8582df5a0bec0daef3a4b4a39f2c;hpb=6ed58852b5c17bc8ac4124e916b9167af47db6b7;p=yazpp-moved-to-github.git diff --git a/src/yaz-socket-manager.cpp b/src/yaz-socket-manager.cpp index 7d771ad..c78c529 100644 --- a/src/yaz-socket-manager.cpp +++ b/src/yaz-socket-manager.cpp @@ -1,10 +1,11 @@ -/* - * Copyright (c) 1998-2005, Index Data. +/* This file is part of the yazpp toolkit. + * Copyright (C) 1998-2013 Index Data and Mike Taylor * See the file LICENSE for details. - * - * $Id: yaz-socket-manager.cpp,v 1.38 2007-11-09 22:10:10 adam Exp $ */ +#if HAVE_CONFIG_H +#include +#endif #if HAVE_SYS_TIME_H #include #endif @@ -19,6 +20,7 @@ #include #include #include +#include #include @@ -27,27 +29,66 @@ using namespace yazpp_1; -SocketManager::SocketEntry **SocketManager::lookupObserver( +struct SocketManager::SocketEntry { + ISocketObserver *observer; + int fd; + unsigned mask; + int timeout; + int timeout_this; + time_t last_activity; + SocketEntry *next; +}; + +struct SocketManager::SocketEvent { + ISocketObserver *observer; + int event; + SocketEvent *next; // front in queue + SocketEvent *prev; // back in queue +}; + +struct SocketManager::Rep { + void putEvent(SocketEvent *event); + SocketEvent *getEvent(); + void removeEvent(ISocketObserver *observer); + void inspect_poll_result(int res, struct yaz_poll_fd *fds, int no_fds, + int timeout); + SocketEntry **lookupObserver(ISocketObserver *observer); + SocketEntry *observers; // all registered observers + SocketEvent *queue_front; + SocketEvent *queue_back; + int log; +}; + +SocketManager::SocketEntry **SocketManager::Rep::lookupObserver( ISocketObserver *observer) { SocketEntry **se; - - for (se = &m_observers; *se; se = &(*se)->next) + + for (se = &observers; *se; se = &(*se)->next) if ((*se)->observer == observer) break; return se; } +int SocketManager::getNumberOfObservers() +{ + int i = 0; + SocketEntry *se; + for (se = m_p->observers; se; se = se->next, i++) + ; + return i; +} + void SocketManager::addObserver(int fd, ISocketObserver *observer) { SocketEntry *se; - se = *lookupObserver(observer); + se = *m_p->lookupObserver(observer); if (!se) { se = new SocketEntry; - se->next= m_observers; - m_observers = se; + se->next= m_p->observers; + m_p->observers = se; se->observer = observer; } se->fd = fd; @@ -58,10 +99,10 @@ void SocketManager::addObserver(int fd, ISocketObserver *observer) void SocketManager::deleteObserver(ISocketObserver *observer) { - SocketEntry **se = lookupObserver(observer); + SocketEntry **se = m_p->lookupObserver(observer); if (*se) { - removeEvent (observer); + m_p->removeEvent(observer); SocketEntry *se_tmp = *se; *se = (*se)->next; delete se_tmp; @@ -70,27 +111,27 @@ void SocketManager::deleteObserver(ISocketObserver *observer) void SocketManager::deleteObservers() { - SocketEntry *se = m_observers; - + SocketEntry *se = m_p->observers; + while (se) { SocketEntry *se_next = se->next; delete se; se = se_next; } - m_observers = 0; + m_p->observers = 0; } void SocketManager::maskObserver(ISocketObserver *observer, int mask) { SocketEntry *se; - yaz_log(m_log, "obs=%p read=%d write=%d except=%d", observer, + yaz_log(m_p->log, "obs=%p read=%d write=%d except=%d", observer, mask & SOCKET_OBSERVE_READ, mask & SOCKET_OBSERVE_WRITE, mask & SOCKET_OBSERVE_EXCEPT); - se = *lookupObserver(observer); + se = *m_p->lookupObserver(observer); if (se) se->mask = mask; } @@ -100,24 +141,34 @@ void SocketManager::timeoutObserver(ISocketObserver *observer, { SocketEntry *se; - se = *lookupObserver(observer); + se = *m_p->lookupObserver(observer); if (se) se->timeout = timeout; } - -void SocketManager::inspect_poll_result(int res, struct yaz_poll_fd *fds, - int no_fds, int timeout) +void SocketManager::Rep::inspect_poll_result(int res, struct yaz_poll_fd *fds, + int no_fds, int timeout) { - yaz_log(m_log, "yaz_poll returned res=%d", res); + yaz_log(log, "yaz_poll returned res=%d", res); time_t now = time(0); int i; int no_put_events = 0; - SocketEntry *p; + int no_lost_observers = 0; - for (i = 0, p = m_observers; p; p = p->next, i++) + for (i = 0; i < no_fds; i++) { + SocketEntry *p; + for (p = observers; p; p = p->next) + if (p->fd == fds[i].fd) + break; + if (!p) + { + // m_p->observers list changed since poll started + no_lost_observers++; + continue; + } + enum yaz_poll_mask output_mask = fds[i].output_mask; int mask = 0; @@ -129,29 +180,30 @@ void SocketManager::inspect_poll_result(int res, struct yaz_poll_fd *fds, if (output_mask & yaz_poll_except) mask |= SOCKET_OBSERVE_EXCEPT; - + if (mask) { SocketEvent *event = new SocketEvent; p->last_activity = now; event->observer = p->observer; event->event = mask; - putEvent (event); + putEvent(event); no_put_events++; - yaz_log (m_log, "putEvent I/O mask=%d", mask); + yaz_log(log, "putEvent I/O mask=%d", mask); } else if (res == 0 && p->timeout_this == timeout) { SocketEvent *event = new SocketEvent; - assert (p->last_activity); - yaz_log (m_log, "putEvent timeout fd=%d, now = %ld last_activity=%ld timeout=%d", - p->fd, now, p->last_activity, p->timeout); + assert(p->last_activity); + yaz_log(log, "putEvent timeout fd=%d, now = %ld " + "last_activity=%ld timeout=%d", + p->fd, now, p->last_activity, p->timeout); p->last_activity = now; event->observer = p->observer; event->event = SOCKET_OBSERVE_TIMEOUT; - putEvent (event); + putEvent(event); no_put_events++; - + } } SocketEvent *event = getEvent(); @@ -162,21 +214,23 @@ void SocketManager::inspect_poll_result(int res, struct yaz_poll_fd *fds, } else { - // bug #2035 - - yaz_log(YLOG_WARN, "unhandled socket event. yaz_poll returned %d", res); - yaz_log(YLOG_WARN, "no_put_events=%d no_fds=%d i=%d timeout=%d", - no_put_events, no_fds, i, timeout); - abort(); + if (no_lost_observers == 0) + { + // bug #2035 + yaz_log(YLOG_WARN, "unhandled socket event. yaz_poll returned %d", + res); + yaz_log(YLOG_WARN, "no_put_events=%d no_fds=%d i=%d timeout=%d", + no_put_events, no_fds, i, timeout); + } } } int SocketManager::processEvent() { SocketEntry *p; - SocketEvent *event = getEvent(); + SocketEvent *event = m_p->getEvent(); int timeout = -1; - yaz_log (m_log, "SocketManager::processEvent manager=%p", this); + yaz_log(m_p->log, "SocketManager::processEvent manager=%p", this); if (event) { event->observer->socketNotify(event->event); @@ -188,13 +242,13 @@ int SocketManager::processEvent() time_t now = time(0); int i; int no_fds = 0; - for (p = m_observers; p; p = p->next) + for (p = m_p->observers; p; p = p->next) no_fds++; if (!no_fds) return 0; struct yaz_poll_fd *fds = new yaz_poll_fd [no_fds]; - for (i = 0, p = m_observers; p; p = p->next, i++) + for (i = 0, p = m_p->observers; p; p = p->next, i++) { fds[i].fd = p->fd; int input_mask = 0; @@ -218,8 +272,8 @@ int SocketManager::processEvent() if (timeout == -1 || timeout_this < timeout) timeout = timeout_this; p->timeout_this = timeout_this; - yaz_log (m_log, "SocketManager::select timeout_this=%d", - p->timeout_this); + yaz_log(m_p->log, "SocketManager::select timeout_this=%d", + p->timeout_this); } else p->timeout_this = -1; @@ -227,66 +281,66 @@ int SocketManager::processEvent() } int pass = 0; - while ((res = yaz_poll(fds, no_fds, timeout, 0)) < 0) + while ((res = yaz_poll(fds, no_fds, timeout, 0)) < 0 && pass < 10) { - if (errno != EINTR) + if (errno == EINTR) { - yaz_log(YLOG_ERRNO|YLOG_WARN, "yaz_poll"); - yaz_log(YLOG_WARN, "errno=%d timeout=%d", errno, timeout); - if (++pass > 10) - return -1; + delete [] fds; + return 1; } + yaz_log(YLOG_ERRNO|YLOG_WARN, "yaz_poll"); + yaz_log(YLOG_WARN, "errno=%d timeout=%d", errno, timeout); } - inspect_poll_result(res, fds, no_fds, timeout); + if (res >= 0) + m_p->inspect_poll_result(res, fds, no_fds, timeout); delete [] fds; - return 1; + return res >= 0 ? 1 : -1; } - // n p n p ...... n p n p // front back -void SocketManager::putEvent(SocketEvent *event) +void SocketManager::Rep::putEvent(SocketEvent *event) { // put in back of queue - if (m_queue_back) + if (queue_back) { - m_queue_back->prev = event; - assert (m_queue_front); + queue_back->prev = event; + assert(queue_front); } else { - assert (!m_queue_front); - m_queue_front = event; + assert(!queue_front); + queue_front = event; } - event->next = m_queue_back; + event->next = queue_back; event->prev = 0; - m_queue_back = event; + queue_back = event; } -SocketManager::SocketEvent *SocketManager::getEvent() +SocketManager::SocketEvent *SocketManager::Rep::getEvent() { // get from front of queue - SocketEvent *event = m_queue_front; + SocketEvent *event = queue_front; if (!event) return 0; - assert (m_queue_back); - m_queue_front = event->prev; - if (m_queue_front) + assert(queue_back); + queue_front = event->prev; + if (queue_front) { - assert (m_queue_back); - m_queue_front->next = 0; + assert(queue_back); + queue_front->next = 0; } else - m_queue_back = 0; + queue_back = 0; return event; } -void SocketManager::removeEvent(ISocketObserver *observer) +void SocketManager::Rep::removeEvent(ISocketObserver *observer) { - SocketEvent *ev = m_queue_back; + SocketEvent *ev = queue_back; while (ev) { SocketEvent *ev_next = ev->next; @@ -295,11 +349,11 @@ void SocketManager::removeEvent(ISocketObserver *observer) if (ev->prev) ev->prev->next = ev->next; else - m_queue_back = ev->next; + queue_back = ev->next; if (ev->next) ev->next->prev = ev->prev; else - m_queue_front = ev->prev; + queue_front = ev->prev; delete ev; } ev = ev_next; @@ -308,19 +362,22 @@ void SocketManager::removeEvent(ISocketObserver *observer) SocketManager::SocketManager() { - m_observers = 0; - m_queue_front = 0; - m_queue_back = 0; - m_log = YLOG_DEBUG; + m_p = new Rep; + m_p->observers = 0; + m_p->queue_front = 0; + m_p->queue_back = 0; + m_p->log = YLOG_DEBUG; } SocketManager::~SocketManager() { deleteObservers(); + delete m_p; } /* * Local variables: * c-basic-offset: 4 + * c-file-style: "Stroustrup" * indent-tabs-mode: nil * End: * vim: shiftwidth=4 tabstop=8 expandtab