log tv_sec in call to yaz_poll
[pazpar2-moved-to-github.git] / src / eventl.c
1 /* This file is part of Pazpar2.
2  Copyright (C) Index Data
3
4  Pazpar2 is free software; you can redistribute it and/or modify it under
5  the terms of the GNU General Public License as published by the Free
6  Software Foundation; either version 2, or (at your option) any later
7  version.
8
9  Pazpar2 is distributed in the hope that it will be useful, but WITHOUT ANY
10  WARRANTY; without even the implied warranty of MERCHANTABILITY or
11  FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
12  for more details.
13
14  You should have received a copy of the GNU General Public License
15  along with this program; if not, write to the Free Software
16  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
17
18  */
19
20 /*
21  * Based on  ParaZ - a simple tool for harvesting performance data for
22  * parallel operations using Z39.50.
23  * Copyright (C) Index Data ApS
24  * See LICENSE file for details.
25  */
26
27 /*
28  * Based on revision YAZ' server/eventl.c 1.29.
29  */
30
31 #if HAVE_CONFIG_H
32 #include <config.h>
33 #endif
34
35 #include <math.h>
36 #include <stdio.h>
37 #include <assert.h>
38
39 #if HAVE_UNISTD_H
40 #include <unistd.h>
41 #endif
42
43 #include <stdlib.h>
44 #include <errno.h>
45 #include <string.h>
46
47 #include <yaz/yconfig.h>
48 #include <yaz/log.h>
49 #include <yaz/comstack.h>
50 #include <yaz/xmalloc.h>
51 #include <yaz/mutex.h>
52 #include <yaz/poll.h>
53 #include "eventl.h"
54 #include "sel_thread.h"
55
56 #if 1
57 static YAZ_MUTEX g_mutex = 0;
58 static int no_iochans = 0;
59 static int no_iochans_total = 0;
60
61 static int iochan_use(int delta)
62 {
63     int iochans;
64     if (!g_mutex)
65         yaz_mutex_create(&g_mutex);
66     yaz_mutex_enter(g_mutex);
67     no_iochans += delta;
68     if (delta > 0)
69         no_iochans_total += delta;
70     iochans = no_iochans;
71     yaz_mutex_leave(g_mutex);
72     yaz_log(YLOG_DEBUG, "%s iochans=%d",
73             delta == 0 ? "" : (delta > 0 ? "INC" : "DEC"), iochans);
74     return iochans;
75 }
76
77 int iochans_count(void)
78 {
79     return iochan_use(0);
80 }
81
82 int iochans_count_total(void)
83 {
84     int total = 0;
85     if (!g_mutex)
86         return 0;
87     yaz_mutex_enter(g_mutex);
88     total = no_iochans_total;
89     yaz_mutex_leave(g_mutex);
90     return total;
91 }
92 #else
93 #define iochan_use(x)
94 #define iochans_count(x) 0
95 #define iochans_count_total(x) 0
96 #endif
97
98 struct iochan_man_s {
99     IOCHAN channel_list;
100     sel_thread_t sel_thread;
101     int sel_fd;
102     int no_threads;
103     int log_level;
104     YAZ_MUTEX iochan_mutex;
105     int size_fds;
106     struct yaz_poll_fd *fds;
107 };
108
109 iochan_man_t iochan_man_create(int no_threads)
110 {
111     iochan_man_t man = xmalloc(sizeof(*man));
112     man->channel_list = 0;
113     man->sel_thread = 0; /* can't create sel_thread yet because we may fork */
114     man->sel_fd = -1;
115     man->no_threads = no_threads;
116     man->log_level = yaz_log_module_level("iochan");
117     man->iochan_mutex = 0;
118     man->size_fds = 0;
119     man->fds = 0;
120     yaz_mutex_create(&man->iochan_mutex);
121     return man;
122 }
123
124 IOCHAN iochan_destroy_real(IOCHAN chan)
125 {
126     IOCHAN next = chan->next;
127     if (chan->name)
128         xfree(chan->name);
129     xfree(chan);
130     iochan_use(-1);
131     return next;
132 }
133
134 void iochan_man_destroy(iochan_man_t *mp)
135 {
136     if (*mp)
137     {
138         IOCHAN c;
139         if ((*mp)->sel_thread)
140             sel_thread_destroy((*mp)->sel_thread);
141
142         yaz_mutex_enter((*mp)->iochan_mutex);
143         c = (*mp)->channel_list;
144         (*mp)->channel_list = NULL;
145         yaz_mutex_leave((*mp)->iochan_mutex);
146         while (c)
147             c = iochan_destroy_real(c);
148         yaz_mutex_destroy(&(*mp)->iochan_mutex);
149         xfree((*mp)->fds);
150         xfree(*mp);
151         *mp = 0;
152     }
153 }
154
155 void iochan_add(iochan_man_t man, IOCHAN chan)
156 {
157     chan->man = man;
158     yaz_mutex_enter(man->iochan_mutex);
159     yaz_log(man->log_level, "iochan_add : chan=%p channel list=%p", chan,
160             man->channel_list);
161     chan->next = man->channel_list;
162     man->channel_list = chan;
163     yaz_mutex_leave(man->iochan_mutex);
164 }
165
166 IOCHAN iochan_create(int fd, IOC_CALLBACK cb, int flags, const char *name)
167 {
168     IOCHAN new_iochan;
169
170     if (!(new_iochan = (IOCHAN) xmalloc(sizeof(*new_iochan))))
171         return 0;
172     iochan_use(1);
173     new_iochan->destroyed = 0;
174     new_iochan->fd = fd;
175     new_iochan->flags = flags;
176     new_iochan->fun = cb;
177     new_iochan->last_event = new_iochan->max_idle = 0;
178     new_iochan->next = NULL;
179     new_iochan->man = 0;
180     new_iochan->thread_users = 0;
181     new_iochan->name = name ? xstrdup(name) : 0;
182     return new_iochan;
183 }
184
185 static void work_handler(void *work_data)
186 {
187     IOCHAN p = work_data;
188
189     yaz_log(p->man->log_level, "eventl: work begin chan=%p name=%s event=%d",
190             p, p->name ? p->name : "", p->this_event);
191
192     if (!p->destroyed && (p->this_event & EVENT_TIMEOUT))
193         (*p->fun)(p, EVENT_TIMEOUT);
194     if (!p->destroyed && (p->this_event & EVENT_INPUT))
195         (*p->fun)(p, EVENT_INPUT);
196     if (!p->destroyed && (p->this_event & EVENT_OUTPUT))
197         (*p->fun)(p, EVENT_OUTPUT);
198     if (!p->destroyed && (p->this_event & EVENT_EXCEPT))
199         (*p->fun)(p, EVENT_EXCEPT);
200
201     yaz_log(p->man->log_level, "eventl: work end chan=%p name=%s event=%d", p,
202             p->name ? p->name : "", p->this_event);
203 }
204
205 static void run_fun(iochan_man_t man, IOCHAN p)
206 {
207     if (man->sel_thread)
208     {
209         yaz_log(man->log_level,
210                 "eventl: work add chan=%p name=%s event=%d", p,
211                 p->name ? p->name : "", p->this_event);
212         p->thread_users++;
213         sel_thread_add(man->sel_thread, p);
214     }
215     else
216         work_handler(p);
217 }
218
219 static int event_loop(iochan_man_t man, IOCHAN *iochans)
220 {
221     do /* loop as long as there are active associations to process */
222     {
223         IOCHAN p, *nextp;
224         IOCHAN start;
225         IOCHAN inv_start;
226         int res;
227         struct yaz_poll_fd *fds;
228         int i, no_fds = 0;
229         int connection_fired = 0;
230         int tv_sec = 300;
231
232         yaz_mutex_enter(man->iochan_mutex);
233         start = man->channel_list;
234         yaz_mutex_leave(man->iochan_mutex);
235         inv_start = start;
236         for (p = start; p; p = p->next)
237             no_fds++;
238         if (man->sel_fd != -1)
239             no_fds++;
240         if (no_fds > man->size_fds)
241         {
242             man->size_fds = no_fds * 2;
243             man->fds = xrealloc(man->fds, man->size_fds * sizeof(*man->fds));
244         }
245         fds = man->fds;
246         i = 0;
247         if (man->sel_fd != -1)
248         {
249             fds[i].fd = man->sel_fd;
250             fds[i].input_mask = yaz_poll_read;
251             fds[i].client_data = 0;
252             i++;
253         }
254         for (p = start; p; p = p->next, i++)
255         {
256             p->poll_offset = i;
257             fds[i].client_data = p;
258             fds[i].fd = -1;
259             fds[i].input_mask = 0;
260             if (p->thread_users > 0)
261                 continue;
262             if (p->max_idle && p->max_idle < tv_sec)
263                 tv_sec = p->max_idle;
264             if (p->fd < 0)
265                 continue;
266             if (p->flags & EVENT_INPUT)
267                 fds[i].input_mask |= yaz_poll_read;
268             if (p->flags & EVENT_OUTPUT)
269                 fds[i].input_mask |= yaz_poll_write;
270             if (p->flags & EVENT_EXCEPT)
271                 fds[i].input_mask |= yaz_poll_except;
272             if (fds[i].input_mask)
273                 fds[i].fd = p->fd;
274         }
275         assert(i == no_fds);
276         yaz_log(man->log_level, "yaz_poll begin tv_sec=%d nofds=%d", tv_sec,
277                 no_fds);
278         res = yaz_poll(fds, no_fds, tv_sec, 0);
279         yaz_log(man->log_level, "yaz_poll returned res=%d", res);
280         if (res < 0)
281         {
282             if (errno == EINTR)
283                 continue;
284             else
285             {
286                 yaz_log(YLOG_ERRNO | YLOG_WARN, "poll");
287                 abort();
288             }
289         }
290         if (man->sel_fd != -1)
291         {
292             i = 0;
293             assert(fds[i].fd == man->sel_fd);
294             if (fds[i].output_mask)
295             {
296                 IOCHAN chan;
297
298                 yaz_log(man->log_level, "eventl: sel input on sel_fd=%d",
299                         man->sel_fd);
300                 while ((chan = sel_thread_result(man->sel_thread)))
301                 {
302                     yaz_log(man->log_level,
303                             "eventl: got thread result chan=%p name=%s", chan,
304                             chan->name ? chan->name : "");
305                     chan->thread_users--;
306                 }
307             }
308         }
309         if (man->log_level)
310         {
311             int no = 0;
312             for (p = start; p; p = p->next)
313                 no++;
314             yaz_log(man->log_level, "%d channels", no);
315         }
316         for (p = start; p; p = p->next)
317         {
318             time_t now = time(0);
319             i = p->poll_offset;
320
321             if (p->destroyed)
322             {
323                 yaz_log(man->log_level,
324                         "eventl: skip destroyed chan=%p name=%s", p,
325                         p->name ? p->name : "");
326                 continue;
327             }
328             if (p->thread_users > 0)
329             {
330                 yaz_log(man->log_level,
331                         "eventl: skip chan=%p name=%s users=%d", p,
332                         p->name ? p->name : "", p->thread_users);
333                 continue;
334             }
335             p->this_event = 0;
336             if (p->max_idle && now - p->last_event > p->max_idle)
337             {
338                 p->last_event = now;
339                 p->this_event |= EVENT_TIMEOUT;
340             }
341             if (fds[i].fd >= 0 && p->fd == fds[i].fd)
342             {
343                 if (fds[i].output_mask & yaz_poll_read)
344                 {
345                     p->last_event = now;
346                     p->this_event |= EVENT_INPUT;
347                 }
348                 if (fds[i].output_mask & yaz_poll_write)
349                 {
350                     p->last_event = now;
351                     p->this_event |= EVENT_OUTPUT;
352                 }
353                 if (fds[i].output_mask & yaz_poll_except)
354                 {
355                     p->last_event = now;
356                     p->this_event |= EVENT_EXCEPT;
357                 }
358             }
359             /* only fire one Z39.50/SRU socket event.. except for timeout */
360             if (p->this_event)
361             {
362                 if (!(p->this_event & EVENT_TIMEOUT) &&
363                     !strcmp(p->name, "connection_socket"))
364                 {
365                     /* not a timeout and we have a Z39.50/SRU socket */
366                     if (connection_fired == 0)
367                         run_fun(man, p);
368                     connection_fired++;
369                 }
370                 else
371                     run_fun(man, p);
372             }
373         }
374
375         assert(inv_start == start);
376         yaz_mutex_enter(man->iochan_mutex);
377         for (nextp = iochans; *nextp; )
378         {
379             IOCHAN p = *nextp;
380             if (p->destroyed && p->thread_users == 0)
381                 *nextp = iochan_destroy_real(p);
382             else
383                 nextp = &p->next;
384         }
385         yaz_mutex_leave(man->iochan_mutex);
386     } while (*iochans);
387     return 0;
388 }
389
390 void iochan_man_events(iochan_man_t man)
391 {
392     if (man->no_threads > 0 && !man->sel_thread)
393     {
394         man->sel_thread = sel_thread_create(work_handler, 0 /*work_destroy */,
395                 &man->sel_fd, man->no_threads);
396         yaz_log(man->log_level, "iochan_man_events. Using %d threads",
397                 man->no_threads);
398     }
399     event_loop(man, &man->channel_list);
400 }
401
402 /*
403  * Local variables:
404  * c-basic-offset: 4
405  * c-file-style: "Stroustrup"
406  * indent-tabs-mode: nil
407  * End:
408  * vim: shiftwidth=4 tabstop=8 expandtab
409  */
410