Reformat
[pazpar2-moved-to-github.git] / src / eventl.c
1 /* This file is part of Pazpar2.
2  Copyright (C) Index Data
3
4  Pazpar2 is free software; you can redistribute it and/or modify it under
5  the terms of the GNU General Public License as published by the Free
6  Software Foundation; either version 2, or (at your option) any later
7  version.
8
9  Pazpar2 is distributed in the hope that it will be useful, but WITHOUT ANY
10  WARRANTY; without even the implied warranty of MERCHANTABILITY or
11  FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
12  for more details.
13
14  You should have received a copy of the GNU General Public License
15  along with this program; if not, write to the Free Software
16  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
17
18  */
19
20 /*
21  * Based on  ParaZ - a simple tool for harvesting performance data for
22  * parallel operations using Z39.50.
23  * Copyright (C) Index Data ApS
24  * See LICENSE file for details.
25  */
26
27 /*
28  * Based on revision YAZ' server/eventl.c 1.29.
29  */
30
31 #if HAVE_CONFIG_H
32 #include <config.h>
33 #endif
34
35 #include <math.h>
36 #include <stdio.h>
37 #include <assert.h>
38
39 #ifdef WIN32
40 #include <winsock.h>
41 #else
42 #include <unistd.h>
43 #endif
44 #if HAVE_SYS_TIME_H
45 #include <sys/time.h>
46 #endif
47
48 #include <stdlib.h>
49 #include <errno.h>
50 #include <string.h>
51
52 #include <yaz/yconfig.h>
53 #include <yaz/log.h>
54 #include <yaz/comstack.h>
55 #include <yaz/xmalloc.h>
56 #include <yaz/mutex.h>
57 #include <yaz/poll.h>
58 #include "eventl.h"
59 #include "sel_thread.h"
60
61 #if 1
62 static YAZ_MUTEX g_mutex = 0;
63 static int no_iochans = 0;
64 static int no_iochans_total = 0;
65
66 static int iochan_use(int delta)
67 {
68     int iochans;
69     if (!g_mutex)
70         yaz_mutex_create(&g_mutex);
71     yaz_mutex_enter(g_mutex);
72     no_iochans += delta;
73     if (delta > 0)
74         no_iochans_total += delta;
75     iochans = no_iochans;
76     yaz_mutex_leave(g_mutex);
77     yaz_log(YLOG_DEBUG, "%s iochans=%d",
78             delta == 0 ? "" : (delta > 0 ? "INC" : "DEC"), iochans);
79     return iochans;
80 }
81
82 int iochans_count(void)
83 {
84     return iochan_use(0);
85 }
86
87 int iochans_count_total(void)
88 {
89     int total = 0;
90     if (!g_mutex)
91         return 0;
92     yaz_mutex_enter(g_mutex);
93     total = no_iochans_total;
94     yaz_mutex_leave(g_mutex);
95     return total;
96 }
97 #else
98 #define iochan_use(x)
99 #define iochans_count(x) 0
100 #define iochans_count_total(x) 0
101 #endif
102
103 struct iochan_man_s {
104     IOCHAN channel_list;
105     sel_thread_t sel_thread;
106     int sel_fd;
107     int no_threads;
108     int log_level;
109     YAZ_MUTEX iochan_mutex;
110     int size_fds;
111     struct yaz_poll_fd *fds;
112 };
113
114 iochan_man_t iochan_man_create(int no_threads)
115 {
116     iochan_man_t man = xmalloc(sizeof(*man));
117     man->channel_list = 0;
118     man->sel_thread = 0; /* can't create sel_thread yet because we may fork */
119     man->sel_fd = -1;
120     man->no_threads = no_threads;
121     man->log_level = yaz_log_module_level("iochan");
122     man->iochan_mutex = 0;
123     man->size_fds = 0;
124     man->fds = 0;
125     yaz_mutex_create(&man->iochan_mutex);
126     return man;
127 }
128
129 IOCHAN iochan_destroy_real(IOCHAN chan)
130 {
131     IOCHAN next = chan->next;
132     if (chan->name)
133         xfree(chan->name);
134     xfree(chan);
135     iochan_use(-1);
136     return next;
137 }
138
139 void iochan_man_destroy(iochan_man_t *mp)
140 {
141     if (*mp)
142     {
143         IOCHAN c;
144         if ((*mp)->sel_thread)
145             sel_thread_destroy((*mp)->sel_thread);
146
147         yaz_mutex_enter((*mp)->iochan_mutex);
148         c = (*mp)->channel_list;
149         (*mp)->channel_list = NULL;
150         yaz_mutex_leave((*mp)->iochan_mutex);
151         while (c)
152             c = iochan_destroy_real(c);
153         yaz_mutex_destroy(&(*mp)->iochan_mutex);
154         xfree((*mp)->fds);
155         xfree(*mp);
156         *mp = 0;
157     }
158 }
159
160 void iochan_add(iochan_man_t man, IOCHAN chan)
161 {
162     chan->man = man;
163     yaz_mutex_enter(man->iochan_mutex);
164     yaz_log(man->log_level, "iochan_add : chan=%p channel list=%p", chan,
165             man->channel_list);
166     chan->next = man->channel_list;
167     man->channel_list = chan;
168     yaz_mutex_leave(man->iochan_mutex);
169 }
170
171 IOCHAN iochan_create(int fd, IOC_CALLBACK cb, int flags, const char *name)
172 {
173     IOCHAN new_iochan;
174
175     if (!(new_iochan = (IOCHAN) xmalloc(sizeof(*new_iochan))))
176         return 0;
177     iochan_use(1);
178     new_iochan->destroyed = 0;
179     new_iochan->fd = fd;
180     new_iochan->flags = flags;
181     new_iochan->fun = cb;
182     new_iochan->last_event = new_iochan->max_idle = 0;
183     new_iochan->next = NULL;
184     new_iochan->man = 0;
185     new_iochan->thread_users = 0;
186     new_iochan->name = name ? xstrdup(name) : 0;
187     return new_iochan;
188 }
189
190 static void work_handler(void *work_data)
191 {
192     IOCHAN p = work_data;
193
194     yaz_log(p->man->log_level, "eventl: work begin chan=%p name=%s event=%d",
195             p, p->name ? p->name : "", p->this_event);
196
197     if (!p->destroyed && (p->this_event & EVENT_TIMEOUT))
198         (*p->fun)(p, EVENT_TIMEOUT);
199     if (!p->destroyed && (p->this_event & EVENT_INPUT))
200         (*p->fun)(p, EVENT_INPUT);
201     if (!p->destroyed && (p->this_event & EVENT_OUTPUT))
202         (*p->fun)(p, EVENT_OUTPUT);
203     if (!p->destroyed && (p->this_event & EVENT_EXCEPT))
204         (*p->fun)(p, EVENT_EXCEPT);
205
206     yaz_log(p->man->log_level, "eventl: work end chan=%p name=%s event=%d", p,
207             p->name ? p->name : "", p->this_event);
208 }
209
210 static void run_fun(iochan_man_t man, IOCHAN p)
211 {
212     if (man->sel_thread)
213     {
214         yaz_log(man->log_level,
215                 "eventl: work add chan=%p name=%s event=%d", p,
216                 p->name ? p->name : "", p->this_event);
217         p->thread_users++;
218         sel_thread_add(man->sel_thread, p);
219     }
220     else
221         work_handler(p);
222 }
223
224 static int event_loop(iochan_man_t man, IOCHAN *iochans)
225 {
226     do /* loop as long as there are active associations to process */
227     {
228         IOCHAN p, *nextp;
229         IOCHAN start;
230         IOCHAN inv_start;
231         int res;
232         static struct timeval to;
233         struct yaz_poll_fd *fds;
234         int i, no_fds = 0;
235         int connection_fired = 0;
236         to.tv_sec = 300;
237         to.tv_usec = 0;
238
239         yaz_mutex_enter(man->iochan_mutex);
240         start = man->channel_list;
241         yaz_mutex_leave(man->iochan_mutex);
242         inv_start = start;
243         for (p = start; p; p = p->next)
244             no_fds++;
245         if (man->sel_fd != -1)
246             no_fds++;
247         if (no_fds > man->size_fds)
248         {
249             man->size_fds = no_fds * 2;
250             man->fds = xrealloc(man->fds, man->size_fds * sizeof(*man->fds));
251         }
252         fds = man->fds;
253         i = 0;
254         if (man->sel_fd != -1)
255         {
256             fds[i].fd = man->sel_fd;
257             fds[i].input_mask = yaz_poll_read;
258             fds[i].client_data = 0;
259             i++;
260         }
261         for (p = start; p; p = p->next, i++)
262         {
263             fds[i].client_data = p;
264             fds[i].fd = p->fd;
265             fds[i].input_mask = 0;
266             if (p->thread_users > 0)
267                 continue;
268             if (p->max_idle && p->max_idle < to.tv_sec)
269                 to.tv_sec = p->max_idle;
270             if (p->fd < 0)
271                 continue;
272             if (p->flags & EVENT_INPUT)
273                 fds[i].input_mask |= yaz_poll_read;
274             if (p->flags & EVENT_OUTPUT)
275                 fds[i].input_mask |= yaz_poll_write;
276             if (p->flags & EVENT_EXCEPT)
277                 fds[i].input_mask |= yaz_poll_except;
278         }
279         yaz_log(man->log_level, "yaz_poll begin nofds=%d", no_fds);
280         res = yaz_poll(fds, no_fds, to.tv_sec, 0);
281         yaz_log(man->log_level, "yaz_poll returned res=%d", res);
282         if (res < 0)
283         {
284             if (errno == EINTR)
285                 continue;
286             else
287             {
288                 yaz_log(YLOG_ERRNO | YLOG_WARN, "poll");
289                 return 0;
290             }
291         }
292         i = 0;
293         if (man->sel_fd != -1)
294         {
295             assert(fds[i].fd == man->sel_fd);
296             if (fds[i].output_mask)
297             {
298                 IOCHAN chan;
299
300                 yaz_log(man->log_level, "eventl: sel input on sel_fd=%d",
301                         man->sel_fd);
302                 while ((chan = sel_thread_result(man->sel_thread)))
303                 {
304                     yaz_log(man->log_level,
305                             "eventl: got thread result chan=%p name=%s", chan,
306                             chan->name ? chan->name : "");
307                     chan->thread_users--;
308                 }
309             }
310             i++;
311         }
312         if (man->log_level)
313         {
314             int no = 0;
315             for (p = start; p; p = p->next)
316                 no++;
317             yaz_log(man->log_level, "%d channels", no);
318         }
319         for (; i < no_fds; i++)
320         {
321             time_t now = time(0);
322             p = fds[i].client_data;
323
324             if (p->destroyed)
325             {
326                 yaz_log(man->log_level,
327                         "eventl: skip destroyed chan=%p name=%s", p,
328                         p->name ? p->name : "");
329                 continue;
330             }
331             if (p->thread_users > 0)
332             {
333                 yaz_log(man->log_level,
334                         "eventl: skip chan=%p name=%s users=%d", p,
335                         p->name ? p->name : "", p->thread_users);
336                 continue;
337             }
338             p->this_event = 0;
339             if (p->max_idle && now - p->last_event > p->max_idle)
340             {
341                 p->last_event = now;
342                 p->this_event |= EVENT_TIMEOUT;
343             }
344             if (fds[i].fd >= 0)
345             {
346                 if (fds[i].output_mask & yaz_poll_read)
347                 {
348                     p->last_event = now;
349                     p->this_event |= EVENT_INPUT;
350                 }
351                 if (fds[i].output_mask & yaz_poll_write)
352                 {
353                     p->last_event = now;
354                     p->this_event |= EVENT_OUTPUT;
355                 }
356                 if (fds[i].output_mask & yaz_poll_except)
357                 {
358                     p->last_event = now;
359                     p->this_event |= EVENT_EXCEPT;
360                 }
361             }
362             /* only fire one Z39.50/SRU socket event.. except for timeout */
363             if (p->this_event)
364             {
365                 if (!(p->this_event & EVENT_TIMEOUT) &&
366                     !strcmp(p->name, "connection_socket"))
367                 {
368                     /* not a timeout and we have a Z39.50/SRU socket */
369                     if (connection_fired == 0)
370                         run_fun(man, p);
371                     connection_fired++;
372                 }
373                 else
374                     run_fun(man, p);
375             }
376         }
377
378         assert(inv_start == start);
379         yaz_mutex_enter(man->iochan_mutex);
380         for (nextp = iochans; *nextp; )
381         {
382             IOCHAN p = *nextp;
383             if (p->destroyed && p->thread_users == 0)
384                 *nextp = iochan_destroy_real(p);
385             else
386                 nextp = &p->next;
387         }
388         yaz_mutex_leave(man->iochan_mutex);
389     } while (*iochans);
390     return 0;
391 }
392
393 void iochan_man_events(iochan_man_t man)
394 {
395     if (man->no_threads > 0 && !man->sel_thread)
396     {
397         man->sel_thread = sel_thread_create(work_handler, 0 /*work_destroy */,
398                 &man->sel_fd, man->no_threads);
399         yaz_log(man->log_level, "iochan_man_events. Using %d threads",
400                 man->no_threads);
401     }
402     event_loop(man, &man->channel_list);
403 }
404
405 void pazpar2_sleep(double d)
406 {
407 #ifdef WIN32
408     Sleep( (DWORD) (d * 1000));
409 #else
410     struct timeval tv;
411     tv.tv_sec = floor(d);
412     tv.tv_usec = (d - floor(d)) * 1000000;
413     select(0, 0, 0, 0, &tv);
414 #endif
415 }
416
417 /*
418  * Local variables:
419  * c-basic-offset: 4
420  * c-file-style: "Stroustrup"
421  * indent-tabs-mode: nil
422  * End:
423  * vim: shiftwidth=4 tabstop=8 expandtab
424  */
425