Remove unsed functuion pazpar2_sleep
[pazpar2-moved-to-github.git] / src / eventl.c
1 /* This file is part of Pazpar2.
2  Copyright (C) Index Data
3
4  Pazpar2 is free software; you can redistribute it and/or modify it under
5  the terms of the GNU General Public License as published by the Free
6  Software Foundation; either version 2, or (at your option) any later
7  version.
8
9  Pazpar2 is distributed in the hope that it will be useful, but WITHOUT ANY
10  WARRANTY; without even the implied warranty of MERCHANTABILITY or
11  FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
12  for more details.
13
14  You should have received a copy of the GNU General Public License
15  along with this program; if not, write to the Free Software
16  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
17
18  */
19
20 /*
21  * Based on  ParaZ - a simple tool for harvesting performance data for
22  * parallel operations using Z39.50.
23  * Copyright (C) Index Data ApS
24  * See LICENSE file for details.
25  */
26
27 /*
28  * Based on revision YAZ' server/eventl.c 1.29.
29  */
30
31 #if HAVE_CONFIG_H
32 #include <config.h>
33 #endif
34
35 #include <math.h>
36 #include <stdio.h>
37 #include <assert.h>
38
39 #include <unistd.h>
40
41 #include <stdlib.h>
42 #include <errno.h>
43 #include <string.h>
44
45 #include <yaz/yconfig.h>
46 #include <yaz/log.h>
47 #include <yaz/comstack.h>
48 #include <yaz/xmalloc.h>
49 #include <yaz/mutex.h>
50 #include <yaz/poll.h>
51 #include "eventl.h"
52 #include "sel_thread.h"
53
54 #if 1
55 static YAZ_MUTEX g_mutex = 0;
56 static int no_iochans = 0;
57 static int no_iochans_total = 0;
58
59 static int iochan_use(int delta)
60 {
61     int iochans;
62     if (!g_mutex)
63         yaz_mutex_create(&g_mutex);
64     yaz_mutex_enter(g_mutex);
65     no_iochans += delta;
66     if (delta > 0)
67         no_iochans_total += delta;
68     iochans = no_iochans;
69     yaz_mutex_leave(g_mutex);
70     yaz_log(YLOG_DEBUG, "%s iochans=%d",
71             delta == 0 ? "" : (delta > 0 ? "INC" : "DEC"), iochans);
72     return iochans;
73 }
74
75 int iochans_count(void)
76 {
77     return iochan_use(0);
78 }
79
80 int iochans_count_total(void)
81 {
82     int total = 0;
83     if (!g_mutex)
84         return 0;
85     yaz_mutex_enter(g_mutex);
86     total = no_iochans_total;
87     yaz_mutex_leave(g_mutex);
88     return total;
89 }
90 #else
91 #define iochan_use(x)
92 #define iochans_count(x) 0
93 #define iochans_count_total(x) 0
94 #endif
95
96 struct iochan_man_s {
97     IOCHAN channel_list;
98     sel_thread_t sel_thread;
99     int sel_fd;
100     int no_threads;
101     int log_level;
102     YAZ_MUTEX iochan_mutex;
103     int size_fds;
104     struct yaz_poll_fd *fds;
105 };
106
107 iochan_man_t iochan_man_create(int no_threads)
108 {
109     iochan_man_t man = xmalloc(sizeof(*man));
110     man->channel_list = 0;
111     man->sel_thread = 0; /* can't create sel_thread yet because we may fork */
112     man->sel_fd = -1;
113     man->no_threads = no_threads;
114     man->log_level = yaz_log_module_level("iochan");
115     man->iochan_mutex = 0;
116     man->size_fds = 0;
117     man->fds = 0;
118     yaz_mutex_create(&man->iochan_mutex);
119     return man;
120 }
121
122 IOCHAN iochan_destroy_real(IOCHAN chan)
123 {
124     IOCHAN next = chan->next;
125     if (chan->name)
126         xfree(chan->name);
127     xfree(chan);
128     iochan_use(-1);
129     return next;
130 }
131
132 void iochan_man_destroy(iochan_man_t *mp)
133 {
134     if (*mp)
135     {
136         IOCHAN c;
137         if ((*mp)->sel_thread)
138             sel_thread_destroy((*mp)->sel_thread);
139
140         yaz_mutex_enter((*mp)->iochan_mutex);
141         c = (*mp)->channel_list;
142         (*mp)->channel_list = NULL;
143         yaz_mutex_leave((*mp)->iochan_mutex);
144         while (c)
145             c = iochan_destroy_real(c);
146         yaz_mutex_destroy(&(*mp)->iochan_mutex);
147         xfree((*mp)->fds);
148         xfree(*mp);
149         *mp = 0;
150     }
151 }
152
153 void iochan_add(iochan_man_t man, IOCHAN chan)
154 {
155     chan->man = man;
156     yaz_mutex_enter(man->iochan_mutex);
157     yaz_log(man->log_level, "iochan_add : chan=%p channel list=%p", chan,
158             man->channel_list);
159     chan->next = man->channel_list;
160     man->channel_list = chan;
161     yaz_mutex_leave(man->iochan_mutex);
162 }
163
164 IOCHAN iochan_create(int fd, IOC_CALLBACK cb, int flags, const char *name)
165 {
166     IOCHAN new_iochan;
167
168     if (!(new_iochan = (IOCHAN) xmalloc(sizeof(*new_iochan))))
169         return 0;
170     iochan_use(1);
171     new_iochan->destroyed = 0;
172     new_iochan->fd = fd;
173     new_iochan->flags = flags;
174     new_iochan->fun = cb;
175     new_iochan->last_event = new_iochan->max_idle = 0;
176     new_iochan->next = NULL;
177     new_iochan->man = 0;
178     new_iochan->thread_users = 0;
179     new_iochan->name = name ? xstrdup(name) : 0;
180     return new_iochan;
181 }
182
183 static void work_handler(void *work_data)
184 {
185     IOCHAN p = work_data;
186
187     yaz_log(p->man->log_level, "eventl: work begin chan=%p name=%s event=%d",
188             p, p->name ? p->name : "", p->this_event);
189
190     if (!p->destroyed && (p->this_event & EVENT_TIMEOUT))
191         (*p->fun)(p, EVENT_TIMEOUT);
192     if (!p->destroyed && (p->this_event & EVENT_INPUT))
193         (*p->fun)(p, EVENT_INPUT);
194     if (!p->destroyed && (p->this_event & EVENT_OUTPUT))
195         (*p->fun)(p, EVENT_OUTPUT);
196     if (!p->destroyed && (p->this_event & EVENT_EXCEPT))
197         (*p->fun)(p, EVENT_EXCEPT);
198
199     yaz_log(p->man->log_level, "eventl: work end chan=%p name=%s event=%d", p,
200             p->name ? p->name : "", p->this_event);
201 }
202
203 static void run_fun(iochan_man_t man, IOCHAN p)
204 {
205     if (man->sel_thread)
206     {
207         yaz_log(man->log_level,
208                 "eventl: work add chan=%p name=%s event=%d", p,
209                 p->name ? p->name : "", p->this_event);
210         p->thread_users++;
211         sel_thread_add(man->sel_thread, p);
212     }
213     else
214         work_handler(p);
215 }
216
217 static int event_loop(iochan_man_t man, IOCHAN *iochans)
218 {
219     do /* loop as long as there are active associations to process */
220     {
221         IOCHAN p, *nextp;
222         IOCHAN start;
223         IOCHAN inv_start;
224         int res;
225         struct yaz_poll_fd *fds;
226         int i, no_fds = 0;
227         int connection_fired = 0;
228         int tv_sec = 300;
229
230         yaz_mutex_enter(man->iochan_mutex);
231         start = man->channel_list;
232         yaz_mutex_leave(man->iochan_mutex);
233         inv_start = start;
234         for (p = start; p; p = p->next)
235             no_fds++;
236         if (man->sel_fd != -1)
237             no_fds++;
238         if (no_fds > man->size_fds)
239         {
240             man->size_fds = no_fds * 2;
241             man->fds = xrealloc(man->fds, man->size_fds * sizeof(*man->fds));
242         }
243         fds = man->fds;
244         i = 0;
245         if (man->sel_fd != -1)
246         {
247             fds[i].fd = man->sel_fd;
248             fds[i].input_mask = yaz_poll_read;
249             fds[i].client_data = 0;
250             i++;
251         }
252         for (p = start; p; p = p->next, i++)
253         {
254             fds[i].client_data = p;
255             fds[i].fd = p->fd;
256             fds[i].input_mask = 0;
257             if (p->thread_users > 0)
258                 continue;
259             if (p->max_idle && p->max_idle < tv_sec)
260                 tv_sec = p->max_idle;
261             if (p->fd < 0)
262                 continue;
263             if (p->flags & EVENT_INPUT)
264                 fds[i].input_mask |= yaz_poll_read;
265             if (p->flags & EVENT_OUTPUT)
266                 fds[i].input_mask |= yaz_poll_write;
267             if (p->flags & EVENT_EXCEPT)
268                 fds[i].input_mask |= yaz_poll_except;
269         }
270         yaz_log(man->log_level, "yaz_poll begin nofds=%d", no_fds);
271         res = yaz_poll(fds, no_fds, tv_sec, 0);
272         yaz_log(man->log_level, "yaz_poll returned res=%d", res);
273         if (res < 0)
274         {
275             if (errno == EINTR)
276                 continue;
277             else
278             {
279                 yaz_log(YLOG_ERRNO | YLOG_WARN, "poll");
280                 return 0;
281             }
282         }
283         i = 0;
284         if (man->sel_fd != -1)
285         {
286             assert(fds[i].fd == man->sel_fd);
287             if (fds[i].output_mask)
288             {
289                 IOCHAN chan;
290
291                 yaz_log(man->log_level, "eventl: sel input on sel_fd=%d",
292                         man->sel_fd);
293                 while ((chan = sel_thread_result(man->sel_thread)))
294                 {
295                     yaz_log(man->log_level,
296                             "eventl: got thread result chan=%p name=%s", chan,
297                             chan->name ? chan->name : "");
298                     chan->thread_users--;
299                 }
300             }
301             i++;
302         }
303         if (man->log_level)
304         {
305             int no = 0;
306             for (p = start; p; p = p->next)
307                 no++;
308             yaz_log(man->log_level, "%d channels", no);
309         }
310         for (; i < no_fds; i++)
311         {
312             time_t now = time(0);
313             p = fds[i].client_data;
314
315             if (p->destroyed)
316             {
317                 yaz_log(man->log_level,
318                         "eventl: skip destroyed chan=%p name=%s", p,
319                         p->name ? p->name : "");
320                 continue;
321             }
322             if (p->thread_users > 0)
323             {
324                 yaz_log(man->log_level,
325                         "eventl: skip chan=%p name=%s users=%d", p,
326                         p->name ? p->name : "", p->thread_users);
327                 continue;
328             }
329             p->this_event = 0;
330             if (p->max_idle && now - p->last_event > p->max_idle)
331             {
332                 p->last_event = now;
333                 p->this_event |= EVENT_TIMEOUT;
334             }
335             if (fds[i].fd >= 0)
336             {
337                 if (fds[i].output_mask & yaz_poll_read)
338                 {
339                     p->last_event = now;
340                     p->this_event |= EVENT_INPUT;
341                 }
342                 if (fds[i].output_mask & yaz_poll_write)
343                 {
344                     p->last_event = now;
345                     p->this_event |= EVENT_OUTPUT;
346                 }
347                 if (fds[i].output_mask & yaz_poll_except)
348                 {
349                     p->last_event = now;
350                     p->this_event |= EVENT_EXCEPT;
351                 }
352             }
353             /* only fire one Z39.50/SRU socket event.. except for timeout */
354             if (p->this_event)
355             {
356                 if (!(p->this_event & EVENT_TIMEOUT) &&
357                     !strcmp(p->name, "connection_socket"))
358                 {
359                     /* not a timeout and we have a Z39.50/SRU socket */
360                     if (connection_fired == 0)
361                         run_fun(man, p);
362                     connection_fired++;
363                 }
364                 else
365                     run_fun(man, p);
366             }
367         }
368
369         assert(inv_start == start);
370         yaz_mutex_enter(man->iochan_mutex);
371         for (nextp = iochans; *nextp; )
372         {
373             IOCHAN p = *nextp;
374             if (p->destroyed && p->thread_users == 0)
375                 *nextp = iochan_destroy_real(p);
376             else
377                 nextp = &p->next;
378         }
379         yaz_mutex_leave(man->iochan_mutex);
380     } while (*iochans);
381     return 0;
382 }
383
384 void iochan_man_events(iochan_man_t man)
385 {
386     if (man->no_threads > 0 && !man->sel_thread)
387     {
388         man->sel_thread = sel_thread_create(work_handler, 0 /*work_destroy */,
389                 &man->sel_fd, man->no_threads);
390         yaz_log(man->log_level, "iochan_man_events. Using %d threads",
391                 man->no_threads);
392     }
393     event_loop(man, &man->channel_list);
394 }
395
396 /*
397  * Local variables:
398  * c-basic-offset: 4
399  * c-file-style: "Stroustrup"
400  * indent-tabs-mode: nil
401  * End:
402  * vim: shiftwidth=4 tabstop=8 expandtab
403  */
404