Use simpler var for timeout in event loop
[pazpar2-moved-to-github.git] / src / eventl.c
1 /* This file is part of Pazpar2.
2  Copyright (C) Index Data
3
4  Pazpar2 is free software; you can redistribute it and/or modify it under
5  the terms of the GNU General Public License as published by the Free
6  Software Foundation; either version 2, or (at your option) any later
7  version.
8
9  Pazpar2 is distributed in the hope that it will be useful, but WITHOUT ANY
10  WARRANTY; without even the implied warranty of MERCHANTABILITY or
11  FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
12  for more details.
13
14  You should have received a copy of the GNU General Public License
15  along with this program; if not, write to the Free Software
16  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
17
18  */
19
20 /*
21  * Based on  ParaZ - a simple tool for harvesting performance data for
22  * parallel operations using Z39.50.
23  * Copyright (C) Index Data ApS
24  * See LICENSE file for details.
25  */
26
27 /*
28  * Based on revision YAZ' server/eventl.c 1.29.
29  */
30
31 #if HAVE_CONFIG_H
32 #include <config.h>
33 #endif
34
35 #include <math.h>
36 #include <stdio.h>
37 #include <assert.h>
38
39 #ifdef WIN32
40 #include <winsock.h>
41 #else
42 #include <unistd.h>
43 #endif
44 #if HAVE_SYS_TIME_H
45 #include <sys/time.h>
46 #endif
47
48 #include <stdlib.h>
49 #include <errno.h>
50 #include <string.h>
51
52 #include <yaz/yconfig.h>
53 #include <yaz/log.h>
54 #include <yaz/comstack.h>
55 #include <yaz/xmalloc.h>
56 #include <yaz/mutex.h>
57 #include <yaz/poll.h>
58 #include "eventl.h"
59 #include "sel_thread.h"
60
61 #if 1
62 static YAZ_MUTEX g_mutex = 0;
63 static int no_iochans = 0;
64 static int no_iochans_total = 0;
65
66 static int iochan_use(int delta)
67 {
68     int iochans;
69     if (!g_mutex)
70         yaz_mutex_create(&g_mutex);
71     yaz_mutex_enter(g_mutex);
72     no_iochans += delta;
73     if (delta > 0)
74         no_iochans_total += delta;
75     iochans = no_iochans;
76     yaz_mutex_leave(g_mutex);
77     yaz_log(YLOG_DEBUG, "%s iochans=%d",
78             delta == 0 ? "" : (delta > 0 ? "INC" : "DEC"), iochans);
79     return iochans;
80 }
81
82 int iochans_count(void)
83 {
84     return iochan_use(0);
85 }
86
87 int iochans_count_total(void)
88 {
89     int total = 0;
90     if (!g_mutex)
91         return 0;
92     yaz_mutex_enter(g_mutex);
93     total = no_iochans_total;
94     yaz_mutex_leave(g_mutex);
95     return total;
96 }
97 #else
98 #define iochan_use(x)
99 #define iochans_count(x) 0
100 #define iochans_count_total(x) 0
101 #endif
102
103 struct iochan_man_s {
104     IOCHAN channel_list;
105     sel_thread_t sel_thread;
106     int sel_fd;
107     int no_threads;
108     int log_level;
109     YAZ_MUTEX iochan_mutex;
110     int size_fds;
111     struct yaz_poll_fd *fds;
112 };
113
114 iochan_man_t iochan_man_create(int no_threads)
115 {
116     iochan_man_t man = xmalloc(sizeof(*man));
117     man->channel_list = 0;
118     man->sel_thread = 0; /* can't create sel_thread yet because we may fork */
119     man->sel_fd = -1;
120     man->no_threads = no_threads;
121     man->log_level = yaz_log_module_level("iochan");
122     man->iochan_mutex = 0;
123     man->size_fds = 0;
124     man->fds = 0;
125     yaz_mutex_create(&man->iochan_mutex);
126     return man;
127 }
128
129 IOCHAN iochan_destroy_real(IOCHAN chan)
130 {
131     IOCHAN next = chan->next;
132     if (chan->name)
133         xfree(chan->name);
134     xfree(chan);
135     iochan_use(-1);
136     return next;
137 }
138
139 void iochan_man_destroy(iochan_man_t *mp)
140 {
141     if (*mp)
142     {
143         IOCHAN c;
144         if ((*mp)->sel_thread)
145             sel_thread_destroy((*mp)->sel_thread);
146
147         yaz_mutex_enter((*mp)->iochan_mutex);
148         c = (*mp)->channel_list;
149         (*mp)->channel_list = NULL;
150         yaz_mutex_leave((*mp)->iochan_mutex);
151         while (c)
152             c = iochan_destroy_real(c);
153         yaz_mutex_destroy(&(*mp)->iochan_mutex);
154         xfree((*mp)->fds);
155         xfree(*mp);
156         *mp = 0;
157     }
158 }
159
160 void iochan_add(iochan_man_t man, IOCHAN chan)
161 {
162     chan->man = man;
163     yaz_mutex_enter(man->iochan_mutex);
164     yaz_log(man->log_level, "iochan_add : chan=%p channel list=%p", chan,
165             man->channel_list);
166     chan->next = man->channel_list;
167     man->channel_list = chan;
168     yaz_mutex_leave(man->iochan_mutex);
169 }
170
171 IOCHAN iochan_create(int fd, IOC_CALLBACK cb, int flags, const char *name)
172 {
173     IOCHAN new_iochan;
174
175     if (!(new_iochan = (IOCHAN) xmalloc(sizeof(*new_iochan))))
176         return 0;
177     iochan_use(1);
178     new_iochan->destroyed = 0;
179     new_iochan->fd = fd;
180     new_iochan->flags = flags;
181     new_iochan->fun = cb;
182     new_iochan->last_event = new_iochan->max_idle = 0;
183     new_iochan->next = NULL;
184     new_iochan->man = 0;
185     new_iochan->thread_users = 0;
186     new_iochan->name = name ? xstrdup(name) : 0;
187     return new_iochan;
188 }
189
190 static void work_handler(void *work_data)
191 {
192     IOCHAN p = work_data;
193
194     yaz_log(p->man->log_level, "eventl: work begin chan=%p name=%s event=%d",
195             p, p->name ? p->name : "", p->this_event);
196
197     if (!p->destroyed && (p->this_event & EVENT_TIMEOUT))
198         (*p->fun)(p, EVENT_TIMEOUT);
199     if (!p->destroyed && (p->this_event & EVENT_INPUT))
200         (*p->fun)(p, EVENT_INPUT);
201     if (!p->destroyed && (p->this_event & EVENT_OUTPUT))
202         (*p->fun)(p, EVENT_OUTPUT);
203     if (!p->destroyed && (p->this_event & EVENT_EXCEPT))
204         (*p->fun)(p, EVENT_EXCEPT);
205
206     yaz_log(p->man->log_level, "eventl: work end chan=%p name=%s event=%d", p,
207             p->name ? p->name : "", p->this_event);
208 }
209
210 static void run_fun(iochan_man_t man, IOCHAN p)
211 {
212     if (man->sel_thread)
213     {
214         yaz_log(man->log_level,
215                 "eventl: work add chan=%p name=%s event=%d", p,
216                 p->name ? p->name : "", p->this_event);
217         p->thread_users++;
218         sel_thread_add(man->sel_thread, p);
219     }
220     else
221         work_handler(p);
222 }
223
224 static int event_loop(iochan_man_t man, IOCHAN *iochans)
225 {
226     do /* loop as long as there are active associations to process */
227     {
228         IOCHAN p, *nextp;
229         IOCHAN start;
230         IOCHAN inv_start;
231         int res;
232         struct yaz_poll_fd *fds;
233         int i, no_fds = 0;
234         int connection_fired = 0;
235         int tv_sec = 300;
236
237         yaz_mutex_enter(man->iochan_mutex);
238         start = man->channel_list;
239         yaz_mutex_leave(man->iochan_mutex);
240         inv_start = start;
241         for (p = start; p; p = p->next)
242             no_fds++;
243         if (man->sel_fd != -1)
244             no_fds++;
245         if (no_fds > man->size_fds)
246         {
247             man->size_fds = no_fds * 2;
248             man->fds = xrealloc(man->fds, man->size_fds * sizeof(*man->fds));
249         }
250         fds = man->fds;
251         i = 0;
252         if (man->sel_fd != -1)
253         {
254             fds[i].fd = man->sel_fd;
255             fds[i].input_mask = yaz_poll_read;
256             fds[i].client_data = 0;
257             i++;
258         }
259         for (p = start; p; p = p->next, i++)
260         {
261             fds[i].client_data = p;
262             fds[i].fd = p->fd;
263             fds[i].input_mask = 0;
264             if (p->thread_users > 0)
265                 continue;
266             if (p->max_idle && p->max_idle < tv_sec)
267                 tv_sec = p->max_idle;
268             if (p->fd < 0)
269                 continue;
270             if (p->flags & EVENT_INPUT)
271                 fds[i].input_mask |= yaz_poll_read;
272             if (p->flags & EVENT_OUTPUT)
273                 fds[i].input_mask |= yaz_poll_write;
274             if (p->flags & EVENT_EXCEPT)
275                 fds[i].input_mask |= yaz_poll_except;
276         }
277         yaz_log(man->log_level, "yaz_poll begin nofds=%d", no_fds);
278         res = yaz_poll(fds, no_fds, tv_sec, 0);
279         yaz_log(man->log_level, "yaz_poll returned res=%d", res);
280         if (res < 0)
281         {
282             if (errno == EINTR)
283                 continue;
284             else
285             {
286                 yaz_log(YLOG_ERRNO | YLOG_WARN, "poll");
287                 return 0;
288             }
289         }
290         i = 0;
291         if (man->sel_fd != -1)
292         {
293             assert(fds[i].fd == man->sel_fd);
294             if (fds[i].output_mask)
295             {
296                 IOCHAN chan;
297
298                 yaz_log(man->log_level, "eventl: sel input on sel_fd=%d",
299                         man->sel_fd);
300                 while ((chan = sel_thread_result(man->sel_thread)))
301                 {
302                     yaz_log(man->log_level,
303                             "eventl: got thread result chan=%p name=%s", chan,
304                             chan->name ? chan->name : "");
305                     chan->thread_users--;
306                 }
307             }
308             i++;
309         }
310         if (man->log_level)
311         {
312             int no = 0;
313             for (p = start; p; p = p->next)
314                 no++;
315             yaz_log(man->log_level, "%d channels", no);
316         }
317         for (; i < no_fds; i++)
318         {
319             time_t now = time(0);
320             p = fds[i].client_data;
321
322             if (p->destroyed)
323             {
324                 yaz_log(man->log_level,
325                         "eventl: skip destroyed chan=%p name=%s", p,
326                         p->name ? p->name : "");
327                 continue;
328             }
329             if (p->thread_users > 0)
330             {
331                 yaz_log(man->log_level,
332                         "eventl: skip chan=%p name=%s users=%d", p,
333                         p->name ? p->name : "", p->thread_users);
334                 continue;
335             }
336             p->this_event = 0;
337             if (p->max_idle && now - p->last_event > p->max_idle)
338             {
339                 p->last_event = now;
340                 p->this_event |= EVENT_TIMEOUT;
341             }
342             if (fds[i].fd >= 0)
343             {
344                 if (fds[i].output_mask & yaz_poll_read)
345                 {
346                     p->last_event = now;
347                     p->this_event |= EVENT_INPUT;
348                 }
349                 if (fds[i].output_mask & yaz_poll_write)
350                 {
351                     p->last_event = now;
352                     p->this_event |= EVENT_OUTPUT;
353                 }
354                 if (fds[i].output_mask & yaz_poll_except)
355                 {
356                     p->last_event = now;
357                     p->this_event |= EVENT_EXCEPT;
358                 }
359             }
360             /* only fire one Z39.50/SRU socket event.. except for timeout */
361             if (p->this_event)
362             {
363                 if (!(p->this_event & EVENT_TIMEOUT) &&
364                     !strcmp(p->name, "connection_socket"))
365                 {
366                     /* not a timeout and we have a Z39.50/SRU socket */
367                     if (connection_fired == 0)
368                         run_fun(man, p);
369                     connection_fired++;
370                 }
371                 else
372                     run_fun(man, p);
373             }
374         }
375
376         assert(inv_start == start);
377         yaz_mutex_enter(man->iochan_mutex);
378         for (nextp = iochans; *nextp; )
379         {
380             IOCHAN p = *nextp;
381             if (p->destroyed && p->thread_users == 0)
382                 *nextp = iochan_destroy_real(p);
383             else
384                 nextp = &p->next;
385         }
386         yaz_mutex_leave(man->iochan_mutex);
387     } while (*iochans);
388     return 0;
389 }
390
391 void iochan_man_events(iochan_man_t man)
392 {
393     if (man->no_threads > 0 && !man->sel_thread)
394     {
395         man->sel_thread = sel_thread_create(work_handler, 0 /*work_destroy */,
396                 &man->sel_fd, man->no_threads);
397         yaz_log(man->log_level, "iochan_man_events. Using %d threads",
398                 man->no_threads);
399     }
400     event_loop(man, &man->channel_list);
401 }
402
403 void pazpar2_sleep(double d)
404 {
405 #ifdef WIN32
406     Sleep( (DWORD) (d * 1000));
407 #else
408     struct timeval tv;
409     tv.tv_sec = floor(d);
410     tv.tv_usec = (d - floor(d)) * 1000000;
411     select(0, 0, 0, 0, &tv);
412 #endif
413 }
414
415 /*
416  * Local variables:
417  * c-basic-offset: 4
418  * c-file-style: "Stroustrup"
419  * indent-tabs-mode: nil
420  * End:
421  * vim: shiftwidth=4 tabstop=8 expandtab
422  */
423