2e39b3ab122b6732ab36dd0a24f0402750ed131f
[pazpar2-moved-to-github.git] / src / eventl.c
1 /* This file is part of Pazpar2.
2  Copyright (C) Index Data
3
4  Pazpar2 is free software; you can redistribute it and/or modify it under
5  the terms of the GNU General Public License as published by the Free
6  Software Foundation; either version 2, or (at your option) any later
7  version.
8
9  Pazpar2 is distributed in the hope that it will be useful, but WITHOUT ANY
10  WARRANTY; without even the implied warranty of MERCHANTABILITY or
11  FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
12  for more details.
13
14  You should have received a copy of the GNU General Public License
15  along with this program; if not, write to the Free Software
16  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
17
18  */
19
20 /*
21  * Based on  ParaZ - a simple tool for harvesting performance data for
22  * parallel operations using Z39.50.
23  * Copyright (C) Index Data ApS
24  * See LICENSE file for details.
25  */
26
27 /*
28  * Based on revision YAZ' server/eventl.c 1.29.
29  */
30
31 #if HAVE_CONFIG_H
32 #include <config.h>
33 #endif
34
35 #include <math.h>
36 #include <stdio.h>
37 #include <assert.h>
38
39 #if HAVE_UNISTD_H
40 #include <unistd.h>
41 #endif
42
43 #include <stdlib.h>
44 #include <errno.h>
45 #include <string.h>
46
47 #include <yaz/yconfig.h>
48 #include <yaz/log.h>
49 #include <yaz/comstack.h>
50 #include <yaz/xmalloc.h>
51 #include <yaz/mutex.h>
52 #include <yaz/poll.h>
53 #include "eventl.h"
54 #include "sel_thread.h"
55
56 #if 1
57 static YAZ_MUTEX g_mutex = 0;
58 static int no_iochans = 0;
59 static int no_iochans_total = 0;
60
61 static int iochan_use(int delta)
62 {
63     int iochans;
64     if (!g_mutex)
65         yaz_mutex_create(&g_mutex);
66     yaz_mutex_enter(g_mutex);
67     no_iochans += delta;
68     if (delta > 0)
69         no_iochans_total += delta;
70     iochans = no_iochans;
71     yaz_mutex_leave(g_mutex);
72     yaz_log(YLOG_DEBUG, "%s iochans=%d",
73             delta == 0 ? "" : (delta > 0 ? "INC" : "DEC"), iochans);
74     return iochans;
75 }
76
77 int iochans_count(void)
78 {
79     return iochan_use(0);
80 }
81
82 int iochans_count_total(void)
83 {
84     int total = 0;
85     if (!g_mutex)
86         return 0;
87     yaz_mutex_enter(g_mutex);
88     total = no_iochans_total;
89     yaz_mutex_leave(g_mutex);
90     return total;
91 }
92 #else
93 #define iochan_use(x)
94 #define iochans_count(x) 0
95 #define iochans_count_total(x) 0
96 #endif
97
98 struct iochan_man_s {
99     IOCHAN channel_list;
100     sel_thread_t sel_thread;
101     int sel_fd;
102     int no_threads;
103     int log_level;
104     YAZ_MUTEX iochan_mutex;
105     int size_fds;
106     struct yaz_poll_fd *fds;
107 };
108
109 iochan_man_t iochan_man_create(int no_threads)
110 {
111     iochan_man_t man = xmalloc(sizeof(*man));
112     man->channel_list = 0;
113     man->sel_thread = 0; /* can't create sel_thread yet because we may fork */
114     man->sel_fd = -1;
115     man->no_threads = no_threads;
116     man->log_level = yaz_log_module_level("iochan");
117     man->iochan_mutex = 0;
118     man->size_fds = 0;
119     man->fds = 0;
120     yaz_mutex_create(&man->iochan_mutex);
121     return man;
122 }
123
124 IOCHAN iochan_destroy_real(IOCHAN chan)
125 {
126     IOCHAN next = chan->next;
127     if (chan->name)
128         xfree(chan->name);
129     xfree(chan);
130     iochan_use(-1);
131     return next;
132 }
133
134 void iochan_man_destroy(iochan_man_t *mp)
135 {
136     if (*mp)
137     {
138         IOCHAN c;
139         if ((*mp)->sel_thread)
140             sel_thread_destroy((*mp)->sel_thread);
141
142         yaz_mutex_enter((*mp)->iochan_mutex);
143         c = (*mp)->channel_list;
144         (*mp)->channel_list = NULL;
145         yaz_mutex_leave((*mp)->iochan_mutex);
146         while (c)
147             c = iochan_destroy_real(c);
148         yaz_mutex_destroy(&(*mp)->iochan_mutex);
149         xfree((*mp)->fds);
150         xfree(*mp);
151         *mp = 0;
152     }
153 }
154
155 void iochan_add(iochan_man_t man, IOCHAN chan)
156 {
157     chan->man = man;
158     yaz_mutex_enter(man->iochan_mutex);
159     yaz_log(man->log_level, "iochan_add : chan=%p channel list=%p", chan,
160             man->channel_list);
161     chan->next = man->channel_list;
162     man->channel_list = chan;
163     yaz_mutex_leave(man->iochan_mutex);
164 }
165
166 IOCHAN iochan_create(int fd, IOC_CALLBACK cb, int flags, const char *name)
167 {
168     IOCHAN new_iochan;
169
170     if (!(new_iochan = (IOCHAN) xmalloc(sizeof(*new_iochan))))
171         return 0;
172     iochan_use(1);
173     new_iochan->destroyed = 0;
174     new_iochan->fd = fd;
175     new_iochan->flags = flags;
176     new_iochan->fun = cb;
177     new_iochan->last_event = new_iochan->max_idle = 0;
178     new_iochan->next = NULL;
179     new_iochan->man = 0;
180     new_iochan->thread_users = 0;
181     new_iochan->name = name ? xstrdup(name) : 0;
182     return new_iochan;
183 }
184
185 static void work_handler(void *work_data)
186 {
187     IOCHAN p = work_data;
188
189     yaz_log(p->man->log_level, "eventl: work begin chan=%p name=%s event=%d",
190             p, p->name ? p->name : "", p->this_event);
191
192     if (!p->destroyed && (p->this_event & EVENT_TIMEOUT))
193         (*p->fun)(p, EVENT_TIMEOUT);
194     if (!p->destroyed && (p->this_event & EVENT_INPUT))
195         (*p->fun)(p, EVENT_INPUT);
196     if (!p->destroyed && (p->this_event & EVENT_OUTPUT))
197         (*p->fun)(p, EVENT_OUTPUT);
198     if (!p->destroyed && (p->this_event & EVENT_EXCEPT))
199         (*p->fun)(p, EVENT_EXCEPT);
200
201     yaz_log(p->man->log_level, "eventl: work end chan=%p name=%s event=%d", p,
202             p->name ? p->name : "", p->this_event);
203 }
204
205 static void run_fun(iochan_man_t man, IOCHAN p)
206 {
207     if (man->sel_thread)
208     {
209         yaz_log(man->log_level,
210                 "eventl: work add chan=%p name=%s event=%d", p,
211                 p->name ? p->name : "", p->this_event);
212         p->thread_users++;
213         sel_thread_add(man->sel_thread, p);
214     }
215     else
216         work_handler(p);
217 }
218
219 static int event_loop(iochan_man_t man, IOCHAN *iochans)
220 {
221     do /* loop as long as there are active associations to process */
222     {
223         IOCHAN p, *nextp;
224         IOCHAN start;
225         IOCHAN inv_start;
226         int res;
227         struct yaz_poll_fd *fds;
228         int i, no_fds = 0;
229         int connection_fired = 0;
230         int tv_sec = 300;
231
232         yaz_mutex_enter(man->iochan_mutex);
233         start = man->channel_list;
234         yaz_mutex_leave(man->iochan_mutex);
235         inv_start = start;
236         for (p = start; p; p = p->next)
237             no_fds++;
238         if (man->sel_fd != -1)
239             no_fds++;
240         if (no_fds > man->size_fds)
241         {
242             man->size_fds = no_fds * 2;
243             man->fds = xrealloc(man->fds, man->size_fds * sizeof(*man->fds));
244         }
245         fds = man->fds;
246         i = 0;
247         if (man->sel_fd != -1)
248         {
249             fds[i].fd = man->sel_fd;
250             fds[i].input_mask = yaz_poll_read;
251             fds[i].client_data = 0;
252             i++;
253         }
254         for (p = start; p; p = p->next, i++)
255         {
256             p->poll_offset = i;
257             fds[i].client_data = p;
258             fds[i].fd = -1;
259             fds[i].input_mask = 0;
260             if (p->thread_users > 0)
261                 continue;
262             if (p->max_idle && p->max_idle < tv_sec)
263                 tv_sec = p->max_idle;
264             if (p->fd < 0)
265                 continue;
266             if (p->flags & EVENT_INPUT)
267                 fds[i].input_mask |= yaz_poll_read;
268             if (p->flags & EVENT_OUTPUT)
269                 fds[i].input_mask |= yaz_poll_write;
270             if (p->flags & EVENT_EXCEPT)
271                 fds[i].input_mask |= yaz_poll_except;
272             if (fds[i].input_mask)
273                 fds[i].fd = p->fd;
274         }
275         assert(i == no_fds);
276         yaz_log(man->log_level, "yaz_poll begin nofds=%d", no_fds);
277         res = yaz_poll(fds, no_fds, tv_sec, 0);
278         yaz_log(man->log_level, "yaz_poll returned res=%d", res);
279         if (res < 0)
280         {
281             if (errno == EINTR)
282                 continue;
283             else
284             {
285                 yaz_log(YLOG_ERRNO | YLOG_WARN, "poll");
286                 abort();
287             }
288         }
289         if (man->sel_fd != -1)
290         {
291             i = 0;
292             assert(fds[i].fd == man->sel_fd);
293             if (fds[i].output_mask)
294             {
295                 IOCHAN chan;
296
297                 yaz_log(man->log_level, "eventl: sel input on sel_fd=%d",
298                         man->sel_fd);
299                 while ((chan = sel_thread_result(man->sel_thread)))
300                 {
301                     yaz_log(man->log_level,
302                             "eventl: got thread result chan=%p name=%s", chan,
303                             chan->name ? chan->name : "");
304                     chan->thread_users--;
305                 }
306             }
307         }
308         if (man->log_level)
309         {
310             int no = 0;
311             for (p = start; p; p = p->next)
312                 no++;
313             yaz_log(man->log_level, "%d channels", no);
314         }
315         for (p = start; p; p = p->next)
316         {
317             time_t now = time(0);
318             i = p->poll_offset;
319
320             if (p->destroyed)
321             {
322                 yaz_log(man->log_level,
323                         "eventl: skip destroyed chan=%p name=%s", p,
324                         p->name ? p->name : "");
325                 continue;
326             }
327             if (p->thread_users > 0)
328             {
329                 yaz_log(man->log_level,
330                         "eventl: skip chan=%p name=%s users=%d", p,
331                         p->name ? p->name : "", p->thread_users);
332                 continue;
333             }
334             p->this_event = 0;
335             if (p->max_idle && now - p->last_event > p->max_idle)
336             {
337                 p->last_event = now;
338                 p->this_event |= EVENT_TIMEOUT;
339             }
340             if (fds[i].fd >= 0 && p->fd == fds[i].fd)
341             {
342                 if (fds[i].output_mask & yaz_poll_read)
343                 {
344                     p->last_event = now;
345                     p->this_event |= EVENT_INPUT;
346                 }
347                 if (fds[i].output_mask & yaz_poll_write)
348                 {
349                     p->last_event = now;
350                     p->this_event |= EVENT_OUTPUT;
351                 }
352                 if (fds[i].output_mask & yaz_poll_except)
353                 {
354                     p->last_event = now;
355                     p->this_event |= EVENT_EXCEPT;
356                 }
357             }
358             /* only fire one Z39.50/SRU socket event.. except for timeout */
359             if (p->this_event)
360             {
361                 if (!(p->this_event & EVENT_TIMEOUT) &&
362                     !strcmp(p->name, "connection_socket"))
363                 {
364                     /* not a timeout and we have a Z39.50/SRU socket */
365                     if (connection_fired == 0)
366                         run_fun(man, p);
367                     connection_fired++;
368                 }
369                 else
370                     run_fun(man, p);
371             }
372         }
373
374         assert(inv_start == start);
375         yaz_mutex_enter(man->iochan_mutex);
376         for (nextp = iochans; *nextp; )
377         {
378             IOCHAN p = *nextp;
379             if (p->destroyed && p->thread_users == 0)
380                 *nextp = iochan_destroy_real(p);
381             else
382                 nextp = &p->next;
383         }
384         yaz_mutex_leave(man->iochan_mutex);
385     } while (*iochans);
386     return 0;
387 }
388
389 void iochan_man_events(iochan_man_t man)
390 {
391     if (man->no_threads > 0 && !man->sel_thread)
392     {
393         man->sel_thread = sel_thread_create(work_handler, 0 /*work_destroy */,
394                 &man->sel_fd, man->no_threads);
395         yaz_log(man->log_level, "iochan_man_events. Using %d threads",
396                 man->no_threads);
397     }
398     event_loop(man, &man->channel_list);
399 }
400
401 /*
402  * Local variables:
403  * c-basic-offset: 4
404  * c-file-style: "Stroustrup"
405  * indent-tabs-mode: nil
406  * End:
407  * vim: shiftwidth=4 tabstop=8 expandtab
408  */
409