1 /* This file is part of the yazpp toolkit.
2 * Copyright (C) Index Data
3 * See the file LICENSE for details.
13 #include <sys/types.h>
27 #include <yazpp/socket-manager.h>
30 using namespace yazpp_1;
32 struct SocketManager::SocketEntry {
33 ISocketObserver *observer;
42 struct SocketManager::SocketEvent {
43 ISocketObserver *observer;
45 SocketEvent *next; // front in queue
46 SocketEvent *prev; // back in queue
49 struct SocketManager::Rep {
50 void putEvent(SocketEvent *event);
51 SocketEvent *getEvent();
52 void removeEvent(ISocketObserver *observer);
53 void inspect_poll_result(int res, struct yaz_poll_fd *fds, int no_fds,
55 SocketEntry **lookupObserver(ISocketObserver *observer);
56 SocketEntry *observers; // all registered observers
57 SocketEvent *queue_front;
58 SocketEvent *queue_back;
62 SocketManager::SocketEntry **SocketManager::Rep::lookupObserver(
63 ISocketObserver *observer)
67 for (se = &observers; *se; se = &(*se)->next)
68 if ((*se)->observer == observer)
73 int SocketManager::getNumberOfObservers()
77 for (se = m_p->observers; se; se = se->next, i++)
82 void SocketManager::addObserver(ISocketObserver *observer)
86 se = *m_p->lookupObserver(observer);
90 se->next= m_p->observers;
92 se->observer = observer;
96 se->last_activity = 0;
100 void SocketManager::deleteObserver(ISocketObserver *observer)
102 SocketEntry **se = m_p->lookupObserver(observer);
105 m_p->removeEvent(observer);
106 SocketEntry *se_tmp = *se;
112 void SocketManager::deleteObservers()
114 SocketEntry *se = m_p->observers;
118 SocketEntry *se_next = se->next;
125 void SocketManager::maskObserver(ISocketObserver *observer, int mask, int fd)
129 yaz_log(m_p->log, "obs=%p read=%d write=%d except=%d fd=%d", observer,
130 mask & SOCKET_OBSERVE_READ,
131 mask & SOCKET_OBSERVE_WRITE,
132 mask & SOCKET_OBSERVE_EXCEPT, fd);
134 se = *m_p->lookupObserver(observer);
142 void SocketManager::timeoutObserver(ISocketObserver *observer,
147 se = *m_p->lookupObserver(observer);
149 se->timeout = timeout;
152 void SocketManager::Rep::inspect_poll_result(int res, struct yaz_poll_fd *fds,
153 int no_fds, int timeout)
156 yaz_log(log, "yaz_poll returned res=%d", res);
157 time_t now = time(0);
159 int no_put_events = 0;
160 int no_lost_observers = 0;
162 for (i = 0; i < no_fds; i++)
165 for (p = observers; p; p = p->next)
166 if (p->fd == fds[i].fd)
170 // m_p->observers list changed since poll started
175 enum yaz_poll_mask output_mask = fds[i].output_mask;
178 if (output_mask & yaz_poll_read)
179 mask |= SOCKET_OBSERVE_READ;
181 if (output_mask & yaz_poll_write)
182 mask |= SOCKET_OBSERVE_WRITE;
184 if (output_mask & yaz_poll_except)
185 mask |= SOCKET_OBSERVE_EXCEPT;
189 SocketEvent *event = new SocketEvent;
190 p->last_activity = now;
191 event->observer = p->observer;
195 yaz_log(log, "putEvent I/O mask=%d", mask);
197 else if (res == 0 && p->timeout_this == timeout)
199 SocketEvent *event = new SocketEvent;
200 assert(p->last_activity);
201 yaz_log(log, "putEvent timeout fd=%d, now = %ld "
202 "last_activity=%ld timeout=%d",
203 p->fd, now, p->last_activity, p->timeout);
204 p->last_activity = now;
205 event->observer = p->observer;
206 event->event = SOCKET_OBSERVE_TIMEOUT;
212 SocketEvent *event = getEvent();
215 event->observer->socketNotify(event->event);
220 if (no_lost_observers == 0)
223 yaz_log(YLOG_WARN, "unhandled socket event. yaz_poll returned %d",
225 yaz_log(YLOG_WARN, "no_put_events=%d no_fds=%d i=%d timeout=%d",
226 no_put_events, no_fds, i, timeout);
231 int SocketManager::processEvent()
234 SocketEvent *event = m_p->getEvent();
236 yaz_log(m_p->log, "SocketManager::processEvent manager=%p", this);
239 event->observer->socketNotify(event->event);
245 time_t now = time(0);
248 for (p = m_p->observers; p; p = p->next)
253 struct yaz_poll_fd *fds = new yaz_poll_fd [no_fds];
254 for (i = 0, p = m_p->observers; p; p = p->next, i++)
258 if (p->mask & SOCKET_OBSERVE_READ)
259 input_mask += yaz_poll_read;
260 if (p->mask & SOCKET_OBSERVE_WRITE)
261 input_mask += yaz_poll_write;
262 if (p->mask & SOCKET_OBSERVE_EXCEPT)
263 input_mask += yaz_poll_except;
264 if (p->timeout > 0 ||
265 (p->timeout == 0 && (p->mask & SOCKET_OBSERVE_WRITE) == 0))
268 timeout_this = p->timeout;
269 if (p->last_activity)
270 timeout_this -= now - p->last_activity;
272 p->last_activity = now;
273 if (timeout_this < 0 || timeout_this > 2147483646)
275 if (timeout == -1 || timeout_this < timeout)
276 timeout = timeout_this;
277 p->timeout_this = timeout_this;
278 yaz_log(m_p->log, "SocketManager::select timeout_this=%d",
282 p->timeout_this = -1;
283 fds[i].input_mask = (enum yaz_poll_mask) input_mask;
287 while ((res = yaz_poll(fds, no_fds, timeout, 0)) < 0 && pass < 10)
294 yaz_log(YLOG_ERRNO|YLOG_WARN, "yaz_poll");
295 yaz_log(YLOG_WARN, "errno=%d timeout=%d", errno, timeout);
299 m_p->inspect_poll_result(res, fds, no_fds, timeout);
302 return res >= 0 ? 1 : -1;
305 // n p n p ...... n p n p
308 void SocketManager::Rep::putEvent(SocketEvent *event)
310 // put in back of queue
313 queue_back->prev = event;
318 assert(!queue_front);
321 event->next = queue_back;
326 SocketManager::SocketEvent *SocketManager::Rep::getEvent()
328 // get from front of queue
329 SocketEvent *event = queue_front;
333 queue_front = event->prev;
337 queue_front->next = 0;
344 void SocketManager::Rep::removeEvent(ISocketObserver *observer)
346 SocketEvent *ev = queue_back;
349 SocketEvent *ev_next = ev->next;
350 if (observer == ev->observer)
353 ev->prev->next = ev->next;
355 queue_back = ev->next;
357 ev->next->prev = ev->prev;
359 queue_front = ev->prev;
366 SocketManager::SocketManager()
370 m_p->queue_front = 0;
372 m_p->log = YLOG_DEBUG;
375 SocketManager::~SocketManager()
383 * c-file-style: "Stroustrup"
384 * indent-tabs-mode: nil
386 * vim: shiftwidth=4 tabstop=8 expandtab