Monitor observes death of child (email kernel). The number
[egate.git] / kernel / monitor.c
1 /* Gateway Resource Monitor
2  * Europagate, 1995
3  *
4  * $Log: monitor.c,v $
5  * Revision 1.4  1995/05/02 15:26:00  adam
6  * Monitor observes death of child (email kernel). The number
7  * of simultanous processes is controlled now. Email requests are
8  * queued if necessary. This scheme should only be forced if no kernels
9  * are idle.
10  *
11  * Revision 1.3  1995/05/02  07:20:10  adam
12  * Use pid of exited child to close fifos.
13  *
14  * Revision 1.2  1995/05/01  16:26:57  adam
15  * More work on resource monitor.
16  *
17  * Revision 1.1  1995/05/01  12:43:36  adam
18  * First work on resource monitor program.
19  *
20  */
21
22 #include <stdio.h>
23 #include <stdlib.h>
24 #include <assert.h>
25 #include <ctype.h>
26 #include <string.h>
27 #include <unistd.h>
28 #include <fcntl.h>
29 #include <setjmp.h>
30 #include <signal.h>
31
32 #include <sys/file.h>
33 #include <sys/stat.h>
34 #include <sys/types.h>
35 #include <sys/time.h>
36 #include <sys/wait.h>
37
38 #include <gw-log.h>
39 #include <gw-res.h>
40 #include <gip.h>
41 #include <strqueue.h>
42 #include <lgets.h>
43
44 #define LINE_MAX 1024
45
46 #define MONITOR_FIFO_S "fifo.s"
47 #define MONITOR_FIFO_C "fifo.c"
48
49 static char *module = "monitor";
50 static jmp_buf retry_jmp;
51
52 static GwRes monitor_res = NULL;
53 static int no_process = 0;
54 static int max_process = 1;
55 static int got_sighup = 0;
56 static int got_term = 0;
57 const char *default_res = "default.res";
58
59 static void reread_resources (void)
60 {
61     if (monitor_res)
62         gw_res_close (monitor_res);
63     monitor_res = gw_res_init ();
64     if (gw_res_merge (monitor_res, default_res))
65     {
66         gw_log (GW_LOG_WARN, module, "Couldn't read resource file %s",
67                 default_res);
68         exit (1);
69     }
70     max_process = atoi (gw_res_get (monitor_res, "gw.max.process", "10"));
71 }
72
73 struct ke_info {
74     pid_t pid;
75     int id;
76     GIP gip;
77     struct str_queue *queue;
78     struct ke_info *next;
79 };
80
81 struct ke_info *ke_info_list = NULL;
82
83 struct ke_info *ke_info_add (int id)
84 {
85     struct ke_info **kip;
86
87     for (kip = &ke_info_list; *kip; kip= &(*kip)->next)
88         if ((*kip)->id == id)
89             return *kip;
90     *kip = malloc (sizeof(**kip));
91     assert (*kip);
92     (*kip)->next = NULL;
93     (*kip)->id = id;
94     (*kip)->gip = NULL;
95     (*kip)->queue = NULL;
96     return *kip;
97 }
98
99 static void ke_info_del (void)
100 {
101     struct ke_info *ki;
102
103     assert (ke_info_list);
104     ki = ke_info_list;
105     str_queue_rm (&ki->queue);
106     ke_info_list = ki->next;
107     free (ki);
108 }
109
110 static void catch_child (int num)
111 {
112     pid_t pid;
113     struct ke_info *ki;
114
115     while ((pid=waitpid (-1, 0, WNOHANG)) > 0)
116     {
117         for (ki = ke_info_list; ki; ki = ki->next)
118             if (ki->pid == pid)
119                 ki->pid = -1;
120         --no_process;
121     }
122     signal (SIGCHLD, catch_child);
123 }
124
125 static void catch_hup (int num)
126 {
127     got_sighup = 1;
128     signal (SIGHUP, catch_hup);
129 }
130
131 static void catch_term (int num)
132 {
133     got_term = 1;
134     signal (SIGTERM, catch_term);
135 }
136
137 static void pipe_handle (int dummy)
138 {
139     longjmp (retry_jmp, 1);
140 }
141
142 static pid_t start_kernel (int argc, char **argv, int id)
143 {
144     pid_t pid;
145     int i;
146     char **argv_p;
147     char userid_option[20];
148
149     argv_p = malloc (sizeof(*argv_p)*(argc+2));
150     if (!argv_p)
151     {
152         gw_log (GW_LOG_FATAL|GW_LOG_ERRNO, module, "malloc fail");
153         exit (1);
154     }
155     argv_p[0] = "kernel";
156     for (i = 1; i<argc; i++)
157         argv_p[i] = argv[i];
158     sprintf (userid_option, "-i%d", id);
159     argv_p[i++] = userid_option;
160     argv_p[i++] = NULL;
161
162     gw_log (GW_LOG_DEBUG, module, "Starting kernel");
163     pid = fork ();
164     if (pid == -1)
165     {
166         gw_log (GW_LOG_FATAL|GW_LOG_ERRNO, module, "fork");
167         exit (1);
168     }
169     if (!pid)
170     {
171         execv ("kernel", argv_p);
172         gw_log (GW_LOG_FATAL|GW_LOG_ERRNO, module, "execvp");
173         exit (1);
174     }
175     return pid;
176 }
177
178 static int deliver (int argc, char **argv, int id, struct str_queue *queue,
179                     GIP *gip, pid_t *pidp, int dont_exec)
180 {
181     int pass = 0;
182     int r;
183     int index;
184     char fifo_server_name[128];
185     char fifo_client_name[128];
186     void (*oldsig)();
187     const char *msg;
188
189     sprintf (fifo_server_name, "fifo.s.%d", id);
190     sprintf (fifo_client_name, "fifo.c.%d", id);
191
192     assert (gip);
193     if (!*gip)
194         *gip = gipc_initialize (fifo_client_name);
195
196     oldsig = signal (SIGPIPE, pipe_handle);
197     setjmp (retry_jmp);
198     ++pass;
199     if (pass == 1)
200     {
201         gipc_close (*gip);
202         r = gipc_open (*gip, fifo_server_name, 0);
203     }
204     else if (pass == 2)
205     {
206         pid_t pid;
207
208         if (dont_exec)
209         {
210             signal (SIGPIPE, oldsig);
211             return 0;
212         }
213         mknod (fifo_server_name, S_IFIFO|0666, 0);
214         pid = start_kernel (argc, argv, id);
215         if (pidp)
216             *pidp = pid;
217         r = gipc_open (*gip, fifo_server_name, 1);
218     }
219     else
220     {
221         signal (SIGPIPE, oldsig);
222         gw_log (GW_LOG_WARN, module, "Cannot start kernel");
223         return 0;
224     }
225     if (r < 0)
226     {
227         if (r == -2)
228         {
229             gw_log (GW_LOG_DEBUG|GW_LOG_ERRNO, module, "r==-2");
230             longjmp (retry_jmp, 1);
231         }
232         else if (r == -1)
233         {
234             gw_log (GW_LOG_DEBUG|GW_LOG_ERRNO, module, "r==-1");
235             longjmp (retry_jmp, 1);
236         }
237         else
238         {
239             gw_log (GW_LOG_WARN|GW_LOG_ERRNO, module, "gipc_open");
240         }
241     }
242     index = 0;
243     while ((msg = str_queue_get (queue, index++)))
244         gip_wline (*gip, msg);
245     signal (SIGPIPE, oldsig);
246     return pass;
247 }
248
249 static void monitor_events (int argc, char **argv)
250 {
251     GIP gip_m;
252     int r, gip_m_fd;
253     char line_buf[1024];
254     fd_set set_r;
255     char command[128], *cp;
256
257     gip_m = gips_initialize (MONITOR_FIFO_S);
258     r = gips_open (gip_m, MONITOR_FIFO_C);
259     gip_m_fd = gip_infileno (gip_m);
260     open (MONITOR_FIFO_S, O_WRONLY);
261
262     while (1)
263     {
264         int fd_max;
265         struct ke_info *ki;
266
267         while (1)
268         {
269             if (got_sighup)
270             {
271                 gw_log (GW_LOG_STAT, module, "Got SIGHUP. Reading resources");
272                 reread_resources ();
273                 got_sighup = 0;
274             }
275             if (got_term)
276             {
277                 gw_log (GW_LOG_STAT, module, "Got SIGTERM. Exiting...");
278                 unlink (MONITOR_FIFO_S);
279                 unlink (MONITOR_FIFO_C);
280                 exit (0);
281             }
282             for (ki = ke_info_list; ki; ki = ki->next)
283             {
284                 if (!ki->queue)
285                     continue;
286                 gw_log (GW_LOG_DEBUG, module, "Transfer mail to %d", ki->id);
287                 r = deliver (argc, argv, ki->id, ki->queue, &ki->gip, &ki->pid,
288                              no_process >= max_process);
289                 if (r == 2)
290                     ++no_process;
291                 if (r == 1 || r == 2)
292                     str_queue_rm (&ki->queue);
293             }
294             FD_ZERO (&set_r);
295             FD_SET (gip_m_fd, &set_r);
296             gw_log (GW_LOG_DEBUG, module, "set gip_m_fd %d", gip_m_fd);
297             fd_max = gip_m_fd;
298             
299             for (ki = ke_info_list; ki; ki = ki->next)
300             {
301                 int fd;
302                 if (ki->gip)
303                 {
304                     if (ki->pid == -1)
305                     {
306                         gw_log (GW_LOG_DEBUG, module, "Close of %d", ki->id);
307                         gipc_close (ki->gip);
308                         gipc_destroy (ki->gip);
309                         ki->gip = NULL;
310                     }
311                     else if ((fd = gip_infileno (ki->gip)) != -1)
312                     {
313                         gw_log (GW_LOG_DEBUG, module, "set fd %d", fd);
314                         FD_SET (fd, &set_r);
315                         if (fd > fd_max)
316                             fd_max = fd;
317                     }
318                 }
319             }
320             gw_log (GW_LOG_DEBUG, module, "Cur/Max processes %d/%d",
321                     no_process, max_process);
322             gw_log (GW_LOG_DEBUG, module, "IPC select");
323             r = select (fd_max+1, &set_r, NULL, NULL, NULL);
324             if (r != -1)
325                 break;
326             if (errno != EINTR)
327             {
328                 gw_log (GW_LOG_FATAL|GW_LOG_ERRNO, module, "select");
329                 exit (1);
330             }
331             gw_log (GW_LOG_DEBUG|GW_LOG_ERRNO, module, "select");
332         }
333         gw_log (GW_LOG_DEBUG, module, "Testing ke_info_list");
334         for (ki = ke_info_list; ki; ki = ki->next)
335         {
336             int fd;
337             if (ki->gip && (fd = gip_infileno (ki->gip)) != -1)
338             {
339                 gw_log (GW_LOG_DEBUG, module, "Test of %d", fd);
340                 if (FD_ISSET (fd, &set_r))
341                 {
342                     if (lgets (line_buf, sizeof(line_buf)-1, fd))
343                     {
344                         gw_log (GW_LOG_DEBUG, module, "IPC: %s", line_buf);
345                     }
346                     else
347                     {
348                         gw_log (GW_LOG_DEBUG, module, "Close of %d", ki->id);
349                         gipc_close (ki->gip);
350                         gipc_destroy (ki->gip);
351                         ki->gip = NULL;
352                     }
353                 }
354             }
355         }
356         gw_log (GW_LOG_DEBUG, module, "Testing gip_m_fd %d", gip_m_fd);
357         if (FD_ISSET (gip_m_fd, &set_r))
358         {
359             gw_log (GW_LOG_DEBUG, module, "Reading from %d", gip_m_fd);
360             if (!(lgets (command, sizeof(command)-1, gip_m_fd)))
361             {
362                 gw_log (GW_LOG_FATAL, module, "Unexpected close");
363                 exit (1);
364             }
365             gw_log (GW_LOG_DEBUG, module, "Done");
366             if ((cp = strchr (command, '\n')))
367                 *cp = '\0';
368             gw_log (GW_LOG_DEBUG, module, "IPC: %s", command);
369             if (!memcmp (command, "eti ", 4))
370             {
371                 int id = atoi (command+4);
372                 struct ke_info *new_k;
373                 
374                 new_k = ke_info_add (id);
375                 gw_log (GW_LOG_DEBUG, module, "Incoming mail %d", id);
376                 
377                 if (!new_k->queue)
378                 {
379                     if (!(new_k->queue = str_queue_mk ()))
380                     {
381                         gw_log (GW_LOG_FATAL|GW_LOG_ERRNO, module,
382                                 "str_queue_mk");
383                         exit (1);
384                     }
385                 }
386                 str_queue_enq (new_k->queue, "mail\n");
387                 while (lgets (line_buf, sizeof(line_buf)-1, gip_m_fd))
388                     str_queue_enq (new_k->queue, line_buf);
389                 str_queue_enq (new_k->queue, "\001");
390             }
391         }
392     }
393 }
394
395 int main (int argc, char **argv)
396 {
397     int argno = 0;
398
399     gw_log_init (*argv);
400     while (++argno < argc)
401     {
402         if (argv[argno][0] == '-')
403         {
404             switch (argv[argno][1])
405             {
406             case 'H':
407                 fprintf (stderr, "monitor [option..] [resource]\n");
408                 fprintf (stderr, "If no resource file is given");
409                 fprintf (stderr, " default.res is used\n");
410                 fprintf (stderr, "Options are transferred to kernel\n");
411                 exit (1);
412             case 'd':
413                 gw_log_level (GW_LOG_ALL & ~RES_DEBUG);
414                 break;
415             case 'D':
416                 gw_log_level (GW_LOG_ALL);
417                 break;
418             }
419         }
420         else
421             default_res = argv[argno];
422     }
423     reread_resources ();
424     signal (SIGCHLD, catch_child);
425     signal (SIGHUP, catch_hup);
426     signal (SIGTERM, catch_term);
427 #if 0
428     gw_log_file (GW_LOG_ALL, "monitor.log");
429 #endif
430     monitor_events (argc, argv);
431     exit (0);
432 }