Only inspect for proper FD
[pazpar2-moved-to-github.git] / src / eventl.c
1 /* This file is part of Pazpar2.
2  Copyright (C) Index Data
3
4  Pazpar2 is free software; you can redistribute it and/or modify it under
5  the terms of the GNU General Public License as published by the Free
6  Software Foundation; either version 2, or (at your option) any later
7  version.
8
9  Pazpar2 is distributed in the hope that it will be useful, but WITHOUT ANY
10  WARRANTY; without even the implied warranty of MERCHANTABILITY or
11  FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
12  for more details.
13
14  You should have received a copy of the GNU General Public License
15  along with this program; if not, write to the Free Software
16  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
17
18  */
19
20 /*
21  * Based on  ParaZ - a simple tool for harvesting performance data for
22  * parallel operations using Z39.50.
23  * Copyright (C) Index Data ApS
24  * See LICENSE file for details.
25  */
26
27 /*
28  * Based on revision YAZ' server/eventl.c 1.29.
29  */
30
31 #if HAVE_CONFIG_H
32 #include <config.h>
33 #endif
34
35 #include <math.h>
36 #include <stdio.h>
37 #include <assert.h>
38
39 #if HAVE_UNISTD_H
40 #include <unistd.h>
41 #endif
42
43 #include <stdlib.h>
44 #include <errno.h>
45 #include <string.h>
46
47 #include <yaz/yconfig.h>
48 #include <yaz/log.h>
49 #include <yaz/comstack.h>
50 #include <yaz/xmalloc.h>
51 #include <yaz/mutex.h>
52 #include <yaz/poll.h>
53 #include "eventl.h"
54 #include "sel_thread.h"
55
56 #if 1
57 static YAZ_MUTEX g_mutex = 0;
58 static int no_iochans = 0;
59 static int no_iochans_total = 0;
60
61 static int iochan_use(int delta)
62 {
63     int iochans;
64     if (!g_mutex)
65         yaz_mutex_create(&g_mutex);
66     yaz_mutex_enter(g_mutex);
67     no_iochans += delta;
68     if (delta > 0)
69         no_iochans_total += delta;
70     iochans = no_iochans;
71     yaz_mutex_leave(g_mutex);
72     yaz_log(YLOG_DEBUG, "%s iochans=%d",
73             delta == 0 ? "" : (delta > 0 ? "INC" : "DEC"), iochans);
74     return iochans;
75 }
76
77 int iochans_count(void)
78 {
79     return iochan_use(0);
80 }
81
82 int iochans_count_total(void)
83 {
84     int total = 0;
85     if (!g_mutex)
86         return 0;
87     yaz_mutex_enter(g_mutex);
88     total = no_iochans_total;
89     yaz_mutex_leave(g_mutex);
90     return total;
91 }
92 #else
93 #define iochan_use(x)
94 #define iochans_count(x) 0
95 #define iochans_count_total(x) 0
96 #endif
97
98 struct iochan_man_s {
99     IOCHAN channel_list;
100     sel_thread_t sel_thread;
101     int sel_fd;
102     int no_threads;
103     int log_level;
104     YAZ_MUTEX iochan_mutex;
105     int size_fds;
106     struct yaz_poll_fd *fds;
107 };
108
109 iochan_man_t iochan_man_create(int no_threads)
110 {
111     iochan_man_t man = xmalloc(sizeof(*man));
112     man->channel_list = 0;
113     man->sel_thread = 0; /* can't create sel_thread yet because we may fork */
114     man->sel_fd = -1;
115     man->no_threads = no_threads;
116     man->log_level = yaz_log_module_level("iochan");
117     man->iochan_mutex = 0;
118     man->size_fds = 0;
119     man->fds = 0;
120     yaz_mutex_create(&man->iochan_mutex);
121     return man;
122 }
123
124 IOCHAN iochan_destroy_real(IOCHAN chan)
125 {
126     IOCHAN next = chan->next;
127     if (chan->name)
128         xfree(chan->name);
129     xfree(chan);
130     iochan_use(-1);
131     return next;
132 }
133
134 void iochan_man_destroy(iochan_man_t *mp)
135 {
136     if (*mp)
137     {
138         IOCHAN c;
139         if ((*mp)->sel_thread)
140             sel_thread_destroy((*mp)->sel_thread);
141
142         yaz_mutex_enter((*mp)->iochan_mutex);
143         c = (*mp)->channel_list;
144         (*mp)->channel_list = NULL;
145         yaz_mutex_leave((*mp)->iochan_mutex);
146         while (c)
147             c = iochan_destroy_real(c);
148         yaz_mutex_destroy(&(*mp)->iochan_mutex);
149         xfree((*mp)->fds);
150         xfree(*mp);
151         *mp = 0;
152     }
153 }
154
155 void iochan_add(iochan_man_t man, IOCHAN chan)
156 {
157     chan->man = man;
158     yaz_mutex_enter(man->iochan_mutex);
159     yaz_log(man->log_level, "iochan_add : chan=%p channel list=%p", chan,
160             man->channel_list);
161     chan->next = man->channel_list;
162     man->channel_list = chan;
163     yaz_mutex_leave(man->iochan_mutex);
164 }
165
166 IOCHAN iochan_create(int fd, IOC_CALLBACK cb, int flags, const char *name)
167 {
168     IOCHAN new_iochan;
169
170     if (!(new_iochan = (IOCHAN) xmalloc(sizeof(*new_iochan))))
171         return 0;
172     iochan_use(1);
173     new_iochan->destroyed = 0;
174     new_iochan->fd = fd;
175     new_iochan->flags = flags;
176     new_iochan->fun = cb;
177     new_iochan->last_event = new_iochan->max_idle = 0;
178     new_iochan->next = NULL;
179     new_iochan->man = 0;
180     new_iochan->thread_users = 0;
181     new_iochan->name = name ? xstrdup(name) : 0;
182     return new_iochan;
183 }
184
185 static void work_handler(void *work_data)
186 {
187     IOCHAN p = work_data;
188
189     yaz_log(p->man->log_level, "eventl: work begin chan=%p name=%s event=%d",
190             p, p->name ? p->name : "", p->this_event);
191
192     if (!p->destroyed && (p->this_event & EVENT_TIMEOUT))
193         (*p->fun)(p, EVENT_TIMEOUT);
194     if (!p->destroyed && (p->this_event & EVENT_INPUT))
195         (*p->fun)(p, EVENT_INPUT);
196     if (!p->destroyed && (p->this_event & EVENT_OUTPUT))
197         (*p->fun)(p, EVENT_OUTPUT);
198     if (!p->destroyed && (p->this_event & EVENT_EXCEPT))
199         (*p->fun)(p, EVENT_EXCEPT);
200
201     yaz_log(p->man->log_level, "eventl: work end chan=%p name=%s event=%d", p,
202             p->name ? p->name : "", p->this_event);
203 }
204
205 static void run_fun(iochan_man_t man, IOCHAN p)
206 {
207     if (man->sel_thread)
208     {
209         yaz_log(man->log_level,
210                 "eventl: work add chan=%p name=%s event=%d", p,
211                 p->name ? p->name : "", p->this_event);
212         p->thread_users++;
213         sel_thread_add(man->sel_thread, p);
214     }
215     else
216         work_handler(p);
217 }
218
219 static int event_loop(iochan_man_t man, IOCHAN *iochans)
220 {
221     do /* loop as long as there are active associations to process */
222     {
223         IOCHAN p, *nextp;
224         IOCHAN start;
225         IOCHAN inv_start;
226         int res;
227         struct yaz_poll_fd *fds;
228         int i, no_fds = 0;
229         int connection_fired = 0;
230         int tv_sec = 300;
231
232         yaz_mutex_enter(man->iochan_mutex);
233         start = man->channel_list;
234         yaz_mutex_leave(man->iochan_mutex);
235         inv_start = start;
236         for (p = start; p; p = p->next)
237             no_fds++;
238         if (man->sel_fd != -1)
239             no_fds++;
240         if (no_fds > man->size_fds)
241         {
242             man->size_fds = no_fds * 2;
243             man->fds = xrealloc(man->fds, man->size_fds * sizeof(*man->fds));
244         }
245         fds = man->fds;
246         i = 0;
247         if (man->sel_fd != -1)
248         {
249             fds[i].fd = man->sel_fd;
250             fds[i].input_mask = yaz_poll_read;
251             fds[i].client_data = 0;
252             i++;
253         }
254         for (p = start; p; p = p->next, i++)
255         {
256             p->poll_offset = i;
257             fds[i].client_data = p;
258             fds[i].fd = p->fd;
259             fds[i].input_mask = 0;
260             if (p->thread_users > 0)
261                 continue;
262             if (p->max_idle && p->max_idle < tv_sec)
263                 tv_sec = p->max_idle;
264             if (p->fd < 0)
265                 continue;
266             if (p->flags & EVENT_INPUT)
267                 fds[i].input_mask |= yaz_poll_read;
268             if (p->flags & EVENT_OUTPUT)
269                 fds[i].input_mask |= yaz_poll_write;
270             if (p->flags & EVENT_EXCEPT)
271                 fds[i].input_mask |= yaz_poll_except;
272         }
273         yaz_log(man->log_level, "yaz_poll begin nofds=%d", no_fds);
274         res = yaz_poll(fds, no_fds, tv_sec, 0);
275         yaz_log(man->log_level, "yaz_poll returned res=%d", res);
276         if (res < 0)
277         {
278             if (errno == EINTR)
279                 continue;
280             else
281             {
282                 yaz_log(YLOG_ERRNO | YLOG_WARN, "poll");
283                 return 0;
284             }
285         }
286         if (man->sel_fd != -1)
287         {
288             i = 0;
289             assert(fds[i].fd == man->sel_fd);
290             if (fds[i].output_mask)
291             {
292                 IOCHAN chan;
293
294                 yaz_log(man->log_level, "eventl: sel input on sel_fd=%d",
295                         man->sel_fd);
296                 while ((chan = sel_thread_result(man->sel_thread)))
297                 {
298                     yaz_log(man->log_level,
299                             "eventl: got thread result chan=%p name=%s", chan,
300                             chan->name ? chan->name : "");
301                     chan->thread_users--;
302                 }
303             }
304         }
305         if (man->log_level)
306         {
307             int no = 0;
308             for (p = start; p; p = p->next)
309                 no++;
310             yaz_log(man->log_level, "%d channels", no);
311         }
312         for (p = start; p; p = p->next)
313         {
314             time_t now = time(0);
315             i = p->poll_offset;
316
317             if (p->destroyed)
318             {
319                 yaz_log(man->log_level,
320                         "eventl: skip destroyed chan=%p name=%s", p,
321                         p->name ? p->name : "");
322                 continue;
323             }
324             if (p->thread_users > 0)
325             {
326                 yaz_log(man->log_level,
327                         "eventl: skip chan=%p name=%s users=%d", p,
328                         p->name ? p->name : "", p->thread_users);
329                 continue;
330             }
331             p->this_event = 0;
332             if (p->max_idle && now - p->last_event > p->max_idle)
333             {
334                 p->last_event = now;
335                 p->this_event |= EVENT_TIMEOUT;
336             }
337             if (fds[i].fd >= 0 && p->fd == fds[i].fd)
338             {
339                 if (fds[i].output_mask & yaz_poll_read)
340                 {
341                     p->last_event = now;
342                     p->this_event |= EVENT_INPUT;
343                 }
344                 if (fds[i].output_mask & yaz_poll_write)
345                 {
346                     p->last_event = now;
347                     p->this_event |= EVENT_OUTPUT;
348                 }
349                 if (fds[i].output_mask & yaz_poll_except)
350                 {
351                     p->last_event = now;
352                     p->this_event |= EVENT_EXCEPT;
353                 }
354             }
355             /* only fire one Z39.50/SRU socket event.. except for timeout */
356             if (p->this_event)
357             {
358                 if (!(p->this_event & EVENT_TIMEOUT) &&
359                     !strcmp(p->name, "connection_socket"))
360                 {
361                     /* not a timeout and we have a Z39.50/SRU socket */
362                     if (connection_fired == 0)
363                         run_fun(man, p);
364                     connection_fired++;
365                 }
366                 else
367                     run_fun(man, p);
368             }
369         }
370
371         assert(inv_start == start);
372         yaz_mutex_enter(man->iochan_mutex);
373         for (nextp = iochans; *nextp; )
374         {
375             IOCHAN p = *nextp;
376             if (p->destroyed && p->thread_users == 0)
377                 *nextp = iochan_destroy_real(p);
378             else
379                 nextp = &p->next;
380         }
381         yaz_mutex_leave(man->iochan_mutex);
382     } while (*iochans);
383     return 0;
384 }
385
386 void iochan_man_events(iochan_man_t man)
387 {
388     if (man->no_threads > 0 && !man->sel_thread)
389     {
390         man->sel_thread = sel_thread_create(work_handler, 0 /*work_destroy */,
391                 &man->sel_fd, man->no_threads);
392         yaz_log(man->log_level, "iochan_man_events. Using %d threads",
393                 man->no_threads);
394     }
395     event_loop(man, &man->channel_list);
396 }
397
398 /*
399  * Local variables:
400  * c-basic-offset: 4
401  * c-file-style: "Stroustrup"
402  * indent-tabs-mode: nil
403  * End:
404  * vim: shiftwidth=4 tabstop=8 expandtab
405  */
406