Thread pool for server
[yaz-moved-to-github.git] / src / srv.c
1 /* This file is part of the YAZ toolkit.
2  * Copyright (C) 1995-2009 Index Data
3  * See the file LICENSE for details.
4  */
5 /**
6  * \file 
7  * \brief Small HTTP server
8  */
9
10 #include <yaz/zgdu.h>
11 #include <yaz/comstack.h>
12 #include <yaz/nmem.h>
13 #include <yaz/log.h>
14 #include <yaz/sock_man.h>
15 #include <yaz/tpool.h>
16 #include <assert.h>
17 #include <yaz/srv.h>
18
19
/** \brief Role a session currently plays in the server event loop. */
enum cs_ses_type {
    /* bound COMSTACK on which cs_listen / cs_accept are called */
    cs_ses_type_listener = 0,
    /* accepted COMSTACK whose accept still reports pending I/O */
    cs_ses_type_accepting = 1,
    /* established connection that packages are read from with cs_get */
    cs_ses_type_normal = 2
};
25
/** \brief One endpoint handled by the server: a listener or a client connection. */
struct cs_session {
    enum cs_ses_type type;   /* current role; selects the branch taken in yaz_srv_run */
    COMSTACK cs;   /* attached COMSTACK, or 0 when none is attached */
    yaz_sock_chan_t chan;   /* channel created with yaz_sock_chan_new, or 0 */
    unsigned cs_put_mask;   /* poll bits that select the sending path in yaz_srv_run */
    unsigned cs_get_mask;   /* poll bits that select the receiving path in yaz_srv_run */
    char *input_buffer;   /* buffer passed to cs_get; released with xfree */
    int input_len;   /* length value passed to cs_get together with input_buffer */
    void *user;   /* value returned by the session handler; handed to the GDU handler */
};
36     
/** \brief A package: a GDU plus the ODR stream, session and server it is tied to. */
struct yaz_pkg_s {
    Z_GDU *gdu;   /* filled in by z_GDU when decoding; 0 for packages from yaz_pkg_create */
    ODR odr;   /* ODR stream created for this package; destroyed in yaz_pkg_destroy */
    struct cs_session *ses;   /* associated session; reset to 0 by yaz_pkg_close */
    yaz_srv_t srv;   /* server this package belongs to */
};
43
/** \brief Server state: listeners, socket manager, worker pool and callbacks. */
struct yaz_srv_s {
    struct cs_session *listeners;   /* array of num_listeners sessions, allocated from nmem */
    size_t num_listeners;   /* number of entries in listeners */
    NMEM nmem;   /* memory handle the server object itself is allocated from */
    yaz_sock_man_t sock_man;   /* socket manager all channels are registered with */
    yaz_tpool_t tpool;   /* 0 until yaz_srv_run creates it */
    int stop_flag;   /* set by yaz_pkg_stop_server; makes yaz_srv_run leave its loop */
    yaz_srv_session_handler_t *session_handler;   /* installed by yaz_srv_run */
    yaz_srv_gdu_handler_t *gdu_handler;   /* installed by yaz_srv_run; called from work_handler */
};
54
55 static void cs_session_init(struct cs_session *ses, enum cs_ses_type type)
56 {
57     ses->type = type;
58     ses->cs = 0;
59     ses->chan = 0;
60     ses->cs_put_mask = 0;
61     ses->cs_get_mask = yaz_poll_read;
62     ses->input_buffer = 0;
63     ses->input_len = 0;
64 }
65
66 static void cs_session_destroy(struct cs_session *ses)
67 {
68     xfree(ses->input_buffer);
69     if (ses->chan)
70         yaz_sock_chan_destroy(ses->chan);
71     if (ses->cs)
72         cs_close(ses->cs);
73 }
74
75 void yaz_srv_destroy(yaz_srv_t p)
76 {
77     if (p)
78     {
79         size_t i;
80
81         yaz_tpool_destroy(p->tpool);
82         for (i = 0; i < p->num_listeners; i++)
83         {
84             cs_session_destroy(p->listeners + i);
85         }
86         yaz_sock_man_destroy(p->sock_man);
87         nmem_destroy(p->nmem);
88     }
89 }
90
91 yaz_srv_t yaz_srv_create(const char **listeners_str)
92 {
93     NMEM nmem = nmem_create();
94     yaz_srv_t p = nmem_malloc(nmem, sizeof(*p));
95     size_t i;
96     for (i = 0; listeners_str[i]; i++)
97         ;
98     p->nmem = nmem;
99
100     p->stop_flag = 0;
101     p->session_handler = 0;
102     p->gdu_handler = 0;
103     p->num_listeners = i;
104     p->listeners = 
105         nmem_malloc(nmem, p->num_listeners * sizeof(*p->listeners));
106     p->sock_man = yaz_sock_man_new();
107     p->tpool = 0;
108     for (i = 0; i < p->num_listeners; i++)
109     {
110         void *ap;
111         const char *where = listeners_str[i];
112         COMSTACK l = cs_create_host(where, CS_FLAGS_NUMERICHOST, &ap);
113
114         cs_session_init(p->listeners +i, cs_ses_type_listener);
115         if (!l)
116         {
117             yaz_log(YLOG_WARN|YLOG_ERRNO, "cs_create_host(%s) failed", where);
118         }
119         else
120         {
121             if (cs_bind(l, ap, CS_SERVER) < 0)
122             {
123                 if (cs_errno(l) == CSYSERR)
124                     yaz_log(YLOG_FATAL|YLOG_ERRNO, "Failed to bind to %s", where);
125                 else
126                     yaz_log(YLOG_FATAL, "Failed to bind to %s: %s", where,
127                             cs_strerror(l));
128                 cs_close(l);
129             }
130             else
131             {
132                 p->listeners[i].cs = l; /* success */
133                 p->listeners[i].chan =
134                     yaz_sock_chan_new(p->sock_man,
135                                       cs_fileno(l),
136                                       p->listeners + i,
137                                       yaz_poll_read | yaz_poll_except);
138             }
139         }
140     }
141
142     /* check if all are OK */
143     for (i = 0; i < p->num_listeners; i++)
144         if (!p->listeners[i].cs)
145         {
146             yaz_srv_destroy(p);
147             return 0;
148         }
149     return p;
150 }
151
152 static void new_session(yaz_srv_t p, COMSTACK new_line)
153 {
154     struct cs_session *ses = xmalloc(sizeof(*ses));
155     unsigned mask =  
156         ((new_line->io_pending & CS_WANT_WRITE) ? yaz_poll_write : 0) |
157         ((new_line->io_pending & CS_WANT_READ) ? yaz_poll_read : 0);
158
159     if (mask)
160     {
161         yaz_log(YLOG_LOG, "type accepting");
162         cs_session_init(ses, cs_ses_type_accepting);
163     }
164     else
165     {
166         yaz_log(YLOG_LOG, "type normal");
167         cs_session_init(ses, cs_ses_type_normal);
168         mask = yaz_poll_read;
169         ses->user = p->session_handler(ses);
170     }
171     ses->cs = new_line;
172     ses->chan = yaz_sock_chan_new(p->sock_man, cs_fileno(new_line), ses, mask);
173 }
174
175 void yaz_pkg_destroy(yaz_pkg_t pkg)
176 {
177     if (pkg)
178     {
179         odr_destroy(pkg->odr);
180         xfree(pkg);
181     }
182 }
183
184 void work_handler(void *data)
185 {
186     yaz_pkg_t pkg = (yaz_pkg_t) data;
187
188     pkg->srv->gdu_handler(pkg, pkg->ses->user);
189     yaz_pkg_destroy(pkg);
190 }
191
192 void work_destroy(void *data)
193 {
194     yaz_pkg_t pkg = (yaz_pkg_t) data;
195     yaz_pkg_destroy(pkg);
196 }
197
198
199 void yaz_srv_run(yaz_srv_t p, yaz_srv_session_handler_t session_handler,
200                  yaz_srv_gdu_handler_t gdu_handler)
201 {
202     yaz_sock_chan_t chan;
203
204     p->session_handler = session_handler;
205     p->gdu_handler = gdu_handler;
206
207     assert(!p->tpool);
208     p->tpool = yaz_tpool_create(work_handler, work_destroy, 20);
209     while ((chan = yaz_sock_man_wait(p->sock_man)))
210     {
211         unsigned output_mask = yaz_sock_get_mask(chan);
212         struct cs_session *ses = yaz_sock_chan_get_data(chan);
213
214         if (p->stop_flag)
215             break;
216         switch (ses->type)
217         {
218         case cs_ses_type_listener:
219             if (yaz_sock_get_mask(chan) & yaz_poll_read)
220             {
221                 int ret = cs_listen(ses->cs, 0, 0);
222                 if (ret < 0)
223                 {
224                     yaz_log(YLOG_WARN|YLOG_ERRNO, "listen failed");
225                 }
226                 else if (ret == 1)
227                 {
228                     yaz_log(YLOG_WARN, "cs_listen incomplete");
229                 }
230                 else
231                 {
232                     COMSTACK new_line = cs_accept(ses->cs);
233                     if (new_line)
234                     {
235                         yaz_log(YLOG_LOG, "new session");
236                         new_session(p, new_line);
237                     }
238                     else
239                     {
240                         yaz_log(YLOG_WARN|YLOG_ERRNO, "accept failed");
241                     }
242                 }
243             }
244             break;
245         case cs_ses_type_accepting:
246             if (!cs_accept(ses->cs))
247             {
248                 yaz_log(YLOG_WARN|YLOG_ERRNO, "cs_accept failed");
249                 cs_session_destroy(ses);
250                 xfree(ses);
251             }
252             else
253             {
254                 unsigned mask =  
255                     ((ses->cs->io_pending & CS_WANT_WRITE) ? yaz_poll_write : 0) |
256                     ((ses->cs->io_pending & CS_WANT_READ) ? yaz_poll_read : 0);
257                 if (mask)
258                 {
259                     ses->type = cs_ses_type_accepting;
260                 }
261                 else
262                 {
263                     ses->type = cs_ses_type_normal;
264                     mask = yaz_poll_read;
265                 }
266                 yaz_sock_chan_set_mask(ses->chan, mask);
267             }
268             break;
269         case cs_ses_type_normal:
270             if ((ses->cs_put_mask & yaz_poll_read) == 0 &&
271                 output_mask & ses->cs_get_mask)
272             {
273                 /* receiving package */
274                 unsigned new_mask = yaz_poll_read;
275                 yaz_log(YLOG_LOG, "Receive");
276                 do
277                 {
278                     int res = cs_get(ses->cs, &ses->input_buffer, &ses->input_len);
279                     if (res <= 0)
280                     {
281                         yaz_log(YLOG_WARN, "Connection closed by client");
282                         cs_session_destroy(ses);
283                         xfree(ses);
284                         ses = 0;
285                         break;
286                     }
287                     else if (res == 1)
288                     {
289                         if (ses->cs->io_pending & CS_WANT_WRITE)
290                             new_mask |= yaz_poll_write;
291                         break;
292                     }
293                     else
294                     {  /* complete package */
295                         yaz_pkg_t pkg = xmalloc(sizeof(*pkg));
296                         yaz_log(YLOG_LOG, "COMPLETE PACKAGE");
297
298                         pkg->ses = ses;
299                         pkg->srv = p;
300                         pkg->odr = odr_createmem(ODR_DECODE);
301                         odr_setbuf(pkg->odr, ses->input_buffer, res, 0);
302                         if (!z_GDU(pkg->odr, &pkg->gdu, 0, 0))
303                         {
304                             yaz_log(YLOG_WARN, "decoding failed");
305                             odr_destroy(pkg->odr);
306                             xfree(pkg);
307                         }
308                         else
309                         {
310                             yaz_tpool_add(p->tpool, pkg);
311                         }
312                     }
313                 } while (cs_more(ses->cs));
314                 yaz_sock_chan_set_mask(chan, new_mask);
315             }
316             if (ses && (output_mask & ses->cs_put_mask))
317             {  /* sending package */
318                 yaz_log(YLOG_LOG, "Sending");
319             }
320         }
321     }
322 }
323
324 Z_GDU **yaz_pkg_get_gdu(yaz_pkg_t pkg)
325 {
326     return &pkg->gdu;
327 }
328
329 ODR yaz_pkg_get_odr(yaz_pkg_t pkg)
330 {
331     return pkg->odr;
332 }
333
334 void yaz_pkg_close(yaz_pkg_t pkg)
335 {
336     struct cs_session *ses = pkg->ses;
337     if (ses)
338     {
339         cs_session_destroy(ses);
340         xfree(ses);
341     }
342     pkg->ses = 0;
343 }
344
345 void yaz_pkg_stop_server(yaz_pkg_t pkg)
346 {
347     pkg->srv->stop_flag = 1;
348 }
349
350 yaz_pkg_t yaz_pkg_create(yaz_pkg_t request_pkg)
351 {
352     yaz_pkg_t pkg = xmalloc(sizeof(*pkg));
353
354     pkg->gdu = 0;
355     pkg->odr = odr_createmem(ODR_ENCODE);
356     pkg->ses = request_pkg->ses;
357     pkg->srv = request_pkg->srv;
358     return pkg;
359 }
360
361 Z_GDU *zget_wrap_APDU(ODR o, Z_APDU *apdu)
362 {
363     Z_GDU *gdu = odr_malloc(o, sizeof(*gdu));
364     gdu->which = Z_GDU_Z3950;
365     gdu->u.z3950 = apdu;
366     return gdu;
367 }
368
369 void yaz_pkg_send(yaz_pkg_t pkg)
370 {
371     yaz_log(YLOG_WARN, "send.. UNFINISHED");
372 }
373
374 /*
375  * Local variables:
376  * c-basic-offset: 4
377  * c-file-style: "Stroustrup"
378  * indent-tabs-mode: nil
379  * End:
380  * vim: shiftwidth=4 tabstop=8 expandtab
381  */
382