Happy new year
[yazpp-moved-to-github.git] / src / yaz-pdu-assoc.cpp
1 /* This file is part of the yazpp toolkit.
2  * Copyright (C) 1998-2011 Index Data and Mike Taylor
3  * See the file LICENSE for details.
4  */
5
6 #include <assert.h>
7 #include <string.h>
8 #include <yaz/log.h>
9 #include <yaz/tcpip.h>
10
11 #include <yazpp/pdu-assoc.h>
12
13 #if HAVE_FCNTL_H
14 #include <fcntl.h>
15 #endif
16
17 using namespace yazpp_1;
18
19 void PDU_Assoc::init(ISocketObservable *socketObservable)
20 {
21     m_state = Closed;
22     m_cs = 0;
23     m_socketObservable = socketObservable;
24     m_PDU_Observer = 0;
25     m_queue_out = 0;
26     m_queue_in = 0;
27     m_input_buf = 0;
28     m_input_len = 0;
29     m_children = 0;
30     m_parent = 0;
31     m_next = 0;
32     m_destroyed = 0;
33     m_idleTime = 0;
34     m_log = YLOG_DEBUG;
35     m_session_is_dead = false;
36 }
37
38 PDU_Assoc::PDU_Assoc(ISocketObservable *socketObservable)
39 {
40     init (socketObservable);
41 }
42
43 PDU_Assoc::PDU_Assoc(ISocketObservable *socketObservable,
44                      COMSTACK cs)
45 {
46     init(socketObservable);
47     m_cs = cs;
48     unsigned mask = 0;
49     if (cs->io_pending & CS_WANT_WRITE)
50         mask |= SOCKET_OBSERVE_WRITE;
51     if (cs->io_pending & CS_WANT_READ)
52         mask |= SOCKET_OBSERVE_READ;
53     m_socketObservable->addObserver(cs_fileno(cs), this);
54     if (!mask)
55     {
56         yaz_log (m_log, "new PDU_Assoc. Ready");
57         m_state = Ready;
58         flush_PDU();
59     }
60     else
61     {
62         yaz_log (m_log, "new PDU_Assoc. Accepting");
63         // assume comstack is accepting...
64         m_state = Accepting;
65         m_socketObservable->addObserver(cs_fileno(cs), this);
66         yaz_log(m_log, "maskObserver 1");
67         m_socketObservable->maskObserver(this,
68                                          mask |SOCKET_OBSERVE_EXCEPT);
69     }
70 }
71
72
73 IPDU_Observable *PDU_Assoc::clone()
74 {
75     PDU_Assoc *copy = new PDU_Assoc(m_socketObservable);
76     return copy;
77 }
78
79 void PDU_Assoc::socketNotify(int event)
80 {
81     yaz_log (m_log, "PDU_Assoc::socketNotify p=%p state=%d event = %d",
82           this, m_state, event);
83     if (event & SOCKET_OBSERVE_EXCEPT)
84     {
85         shutdown();
86         m_PDU_Observer->failNotify();
87         return;
88     }
89     else if (event & SOCKET_OBSERVE_TIMEOUT)
90     {
91         m_PDU_Observer->timeoutNotify();
92         return;
93     }
94     switch (m_state)
95     {
96     case Accepting:
97         if (!cs_accept (m_cs))
98         {
99             yaz_log (m_log, "PDU_Assoc::cs_accept failed");
100             m_cs = 0;
101             shutdown();
102             m_PDU_Observer->failNotify();
103         }
104         else
105         {
106             unsigned mask = 0;
107             if (m_cs->io_pending & CS_WANT_WRITE)
108                 mask |= SOCKET_OBSERVE_WRITE;
109             if (m_cs->io_pending & CS_WANT_READ)
110                 mask |= SOCKET_OBSERVE_READ;
111             if (!mask)
112             {   // accept is complete. turn to ready state and write if needed
113                 m_state = Ready;
114                 flush_PDU();
115             }
116             else  
117             {   // accept still incomplete.
118                 yaz_log(m_log, "maskObserver 2");
119                 m_socketObservable->maskObserver(this,
120                                              mask|SOCKET_OBSERVE_EXCEPT);
121             }
122         }
123         break;
124     case Connecting:
125         if (event & SOCKET_OBSERVE_READ && 
126             event & SOCKET_OBSERVE_WRITE)
127         {
128             // For Unix: if both read and write is set, then connect failed.
129             shutdown();
130             m_PDU_Observer->failNotify();
131         }
132         else
133         {
134             yaz_log (m_log, "cs_rcvconnect");
135             int res = cs_rcvconnect (m_cs);
136             if (res == 1)
137             {
138                 unsigned mask = SOCKET_OBSERVE_EXCEPT;
139                 if (m_cs->io_pending & CS_WANT_WRITE)
140                     mask |= SOCKET_OBSERVE_WRITE;
141                 if (m_cs->io_pending & CS_WANT_READ)
142                     mask |= SOCKET_OBSERVE_READ;
143                 yaz_log(m_log, "maskObserver 3");
144                 m_socketObservable->maskObserver(this, mask);
145             }
146             else
147             {
148                 m_state = Ready;
149                 if (m_PDU_Observer)
150                     m_PDU_Observer->connectNotify();
151                 flush_PDU();
152             }
153         }
154         break;
155     case Listen:
156         if (event & SOCKET_OBSERVE_READ)
157         {
158             int res;
159             COMSTACK new_line;
160             
161             if ((res = cs_listen(m_cs, 0, 0)) == 1)
162                 return;
163             if (res < 0)
164             {
165                 yaz_log(YLOG_FATAL|YLOG_ERRNO, "cs_listen failed");
166                 return;
167             }
168             if (!(new_line = cs_accept(m_cs)))
169                 return;
170             /* 1. create socket-manager 
171                2. create pdu-assoc
172                3. create top-level object
173                     setup observer for child fileid in pdu-assoc
174                4. start thread
175             */
176             yaz_log (m_log, "new session: parent fd=%d child fd=%d",
177                      cs_fileno(m_cs), cs_fileno(new_line));
178             childNotify (new_line);
179         }
180         break;
181     case Writing:
182         if (event & (SOCKET_OBSERVE_READ|SOCKET_OBSERVE_WRITE))
183             flush_PDU();
184         break;
185     case Ready:
186         if (event & (SOCKET_OBSERVE_READ|SOCKET_OBSERVE_WRITE))
187         {
188             do
189             {
190                 int res = cs_get (m_cs, &m_input_buf, &m_input_len);
191                 if (res == 1)
192                 {
193                     unsigned mask = SOCKET_OBSERVE_EXCEPT;
194                     if (m_cs->io_pending & CS_WANT_WRITE)
195                         mask |= SOCKET_OBSERVE_WRITE;
196                     if (m_cs->io_pending & CS_WANT_READ)
197                         mask |= SOCKET_OBSERVE_READ;
198                     yaz_log(m_log, "maskObserver 4");
199                     m_socketObservable->maskObserver(this, mask);
200                     return;
201                 }
202                 else if (res <= 0)
203                 {
204                     yaz_log (m_log, "PDU_Assoc::Connection closed by peer");
205                     shutdown();
206                     if (m_PDU_Observer)
207                         m_PDU_Observer->failNotify(); // problem here..
208                     return;
209                 }
210                 // lock it, so we know if recv_PDU deletes it.
211                 int destroyed = 0;
212                 m_destroyed = &destroyed;
213
214                 if (!m_PDU_Observer)
215                     return;
216 #if 0
217                 PDU_Queue **pq = &m_queue_in;
218                 while (*pq)
219                     pq = &(*pq)->m_next;
220                 
221                 *pq = new PDU_Queue(m_input_buf, res);
222 #else
223                 m_PDU_Observer->recv_PDU(m_input_buf, res);
224 #endif
225                 if (destroyed)   // it really was destroyed, return now.
226                     return;
227                 m_destroyed = 0;
228             } while (m_cs && cs_more (m_cs));
229             if (m_cs && m_state == Ready)
230             {
231                 yaz_log(m_log, "maskObserver 5");
232                 m_socketObservable->maskObserver(this,
233                                                  SOCKET_OBSERVE_EXCEPT|
234                                                  SOCKET_OBSERVE_READ);
235             }
236         }
237         break;
238     case Closed:
239         yaz_log (m_log, "CLOSING state=%d event was %d", m_state, event);
240         shutdown();
241         m_PDU_Observer->failNotify();
242         break;
243     default:
244         yaz_log (m_log, "Unknown state=%d event was %d", m_state, event);
245         shutdown();
246         m_PDU_Observer->failNotify();
247     }
248 }
249
250 void PDU_Assoc::close_session()
251 {
252     m_session_is_dead = true;
253     if (!m_queue_out)
254     {
255         shutdown();
256         m_PDU_Observer->failNotify();
257     }
258 }
259
260 void PDU_Assoc::shutdown()
261 {
262     PDU_Assoc *ch;
263     for (ch = m_children; ch; ch = ch->m_next)
264         ch->shutdown();
265
266     m_socketObservable->deleteObserver(this);
267     m_state = Closed;
268     if (m_cs)
269     {
270         yaz_log (m_log, "PDU_Assoc::close fd=%d", cs_fileno(m_cs));
271         cs_close (m_cs);
272     }
273     m_cs = 0;
274     while (m_queue_out)
275     {
276         PDU_Queue *q_this = m_queue_out;
277         m_queue_out = m_queue_out->m_next;
278         delete q_this;
279     }
280     xfree (m_input_buf);
281     m_input_buf = 0;
282     m_input_len = 0;
283 }
284
285 void PDU_Assoc::destroy()
286 {
287     shutdown();
288
289     if (m_destroyed)
290         *m_destroyed = 1;
291     PDU_Assoc **c;
292
293     // delete from parent's child list (if any)
294     if (m_parent)
295     {
296         c = &m_parent->m_children;
297         while (*c != this)
298         {
299             assert (*c);
300             c = &(*c)->m_next;
301         }
302         *c = (*c)->m_next;
303     }
304     // delete all children ...
305     c = &m_children;
306     while (*c)
307     {
308         PDU_Assoc *here = *c;
309         *c = (*c)->m_next;
310         here->m_parent = 0;
311         delete here;
312     }
313     yaz_log (m_log, "PDU_Assoc::destroy this=%p", this);
314 }
315
316 PDU_Assoc::PDU_Queue::PDU_Queue(const char *buf, int len)
317 {
318     m_buf = (char *) xmalloc (len);
319     memcpy (m_buf, buf, len);
320     m_len = len;
321     m_next = 0;
322 }
323
324 PDU_Assoc::PDU_Queue::~PDU_Queue()
325 {
326     xfree (m_buf);
327 }
328
329 int PDU_Assoc::flush_PDU()
330 {
331     int r;
332     
333     if (m_state != Ready && m_state != Writing)
334     {
335         yaz_log (m_log, "YAZ_PDU_Assoc::flush_PDU, not ready");
336         return 1;
337     }
338     PDU_Queue *q = m_queue_out;
339     if (!q)
340     {
341         m_state = Ready;
342         yaz_log (m_log, "YAZ_PDU_Assoc::flush_PDU queue empty");
343         yaz_log(m_log, "maskObserver 6");
344         m_socketObservable->maskObserver(this, SOCKET_OBSERVE_READ|
345                                          SOCKET_OBSERVE_WRITE|
346                                          SOCKET_OBSERVE_EXCEPT);
347         if (m_session_is_dead)
348         {
349             shutdown();
350             m_PDU_Observer->failNotify();
351         }
352         return 0;
353     }
354     r = cs_put (m_cs, q->m_buf, q->m_len);
355     if (r < 0)
356     {
357         yaz_log (m_log, "PDU_Assoc::flush_PDU cs_put failed");
358         shutdown();
359         m_PDU_Observer->failNotify();
360         return r;
361     }
362     if (r == 1)
363     {
364         unsigned mask = SOCKET_OBSERVE_EXCEPT;
365         m_state = Writing;
366         if (m_cs->io_pending & CS_WANT_WRITE)
367             mask |= SOCKET_OBSERVE_WRITE;
368         if (m_cs->io_pending & CS_WANT_READ)
369             mask |= SOCKET_OBSERVE_READ;
370
371         mask |= SOCKET_OBSERVE_WRITE;
372         yaz_log(m_log, "maskObserver 7");
373         m_socketObservable->maskObserver(this, mask);
374         yaz_log (m_log, "PDU_Assoc::flush_PDU cs_put %d bytes fd=%d (inc)",
375                  q->m_len, cs_fileno(m_cs));
376         return r;
377     } 
378     yaz_log (m_log, "PDU_Assoc::flush_PDU cs_put %d bytes", q->m_len);
379     // whole packet sent... delete this and proceed to next ...
380     m_queue_out = q->m_next;
381     delete q;
382     // don't select on write if queue is empty ...
383     if (!m_queue_out)
384     {
385         m_state = Ready;
386         yaz_log(m_log, "maskObserver 8");
387         m_socketObservable->maskObserver(this, SOCKET_OBSERVE_READ|
388                                          SOCKET_OBSERVE_EXCEPT);
389     }
390     return r;
391 }
392
393 int PDU_Assoc::send_PDU(const char *buf, int len)
394 {
395     yaz_log (m_log, "PDU_Assoc::send_PDU");
396     PDU_Queue **pq = &m_queue_out;
397     int is_idle = (*pq ? 0 : 1);
398     
399     if (!m_cs)
400     {
401         yaz_log (m_log, "PDU_Assoc::send_PDU failed, m_cs == 0");
402         return -1;
403     }
404     while (*pq)
405         pq = &(*pq)->m_next;
406     *pq = new PDU_Queue(buf, len);
407     if (is_idle)
408         return flush_PDU ();
409     else
410         yaz_log (m_log, "PDU_Assoc::cannot send_PDU fd=%d",
411                  cs_fileno(m_cs));
412     return 0;
413 }
414
415 COMSTACK PDU_Assoc::comstack(const char *type_and_host, void **vp)
416 {
417     return cs_create_host(type_and_host, 2, vp);
418 }
419
420 int PDU_Assoc::listen(IPDU_Observer *observer, const char *addr)
421 {
422     shutdown();
423
424     m_PDU_Observer = observer;
425     void *ap;
426     m_cs = comstack(addr, &ap);
427
428     if (!m_cs)
429         return -1;
430     if (cs_bind(m_cs, ap, CS_SERVER) < 0)
431         return -2;
432
433     int fd = cs_fileno(m_cs);
434 #if HAVE_FCNTL_H
435     int oldflags = fcntl(fd, F_GETFD, 0);
436     if (oldflags >= 0)
437     {
438         oldflags |= FD_CLOEXEC;
439         fcntl(fd, F_SETFD, oldflags);
440     }
441 #endif
442     m_socketObservable->addObserver(fd, this);
443     yaz_log(m_log, "maskObserver 9");
444     m_socketObservable->maskObserver(this, SOCKET_OBSERVE_READ|
445                                      SOCKET_OBSERVE_EXCEPT);
446     yaz_log (m_log, "PDU_Assoc::listen ok fd=%d", fd);
447     m_state = Listen;
448     return 0;
449 }
450
451 void PDU_Assoc::idleTime(int idleTime)
452 {
453     m_idleTime = idleTime;
454     yaz_log (m_log, "PDU_Assoc::idleTime(%d)", idleTime);
455     m_socketObservable->timeoutObserver(this, m_idleTime);
456 }
457
458 int PDU_Assoc::connect(IPDU_Observer *observer, const char *addr)
459 {
460     yaz_log (m_log, "PDU_Assoc::connect %s", addr);
461     shutdown();
462     m_PDU_Observer = observer;
463     void *ap;
464     m_cs = comstack(addr, &ap);
465     if (!m_cs)
466         return -1;
467     int res = cs_connect (m_cs, ap);
468     yaz_log (m_log, "PDU_Assoc::connect fd=%d res=%d", cs_fileno(m_cs),
469              res);
470     m_socketObservable->addObserver(cs_fileno(m_cs), this);
471
472     if (res == 0)
473     {   // Connect complete
474         m_state = Connecting;
475         unsigned mask = SOCKET_OBSERVE_EXCEPT;
476         mask |= SOCKET_OBSERVE_WRITE;
477         mask |= SOCKET_OBSERVE_READ;
478         yaz_log(m_log, "maskObserver 11");
479         m_socketObservable->maskObserver(this, mask);
480     }
481     else if (res > 0)
482     {   // Connect pending
483         m_state = Connecting;
484         unsigned mask = SOCKET_OBSERVE_EXCEPT;
485         if (m_cs->io_pending & CS_WANT_WRITE)
486             mask |= SOCKET_OBSERVE_WRITE;
487         if (m_cs->io_pending & CS_WANT_READ)
488             mask |= SOCKET_OBSERVE_READ;
489         yaz_log(m_log, "maskObserver 11");
490         m_socketObservable->maskObserver(this, mask);
491     }
492     else
493     {   // Connect failed immediately
494         // Since m_state is Closed we can distinguish this case from
495         // normal connect in socketNotify handler
496         yaz_log(m_log, "maskObserver 12");
497         m_socketObservable->maskObserver(this, SOCKET_OBSERVE_WRITE|
498                                          SOCKET_OBSERVE_EXCEPT);
499     }
500     return 0;
501 }
502
503 // Single-threaded... Only useful for non-blocking handlers
504 void PDU_Assoc::childNotify(COMSTACK cs)
505 {
506     PDU_Assoc *new_observable =
507         new PDU_Assoc (m_socketObservable, cs);
508     
509     // Clone PDU Observer
510     new_observable->m_PDU_Observer = m_PDU_Observer->sessionNotify
511         (new_observable, cs_fileno(cs));
512
513     if (!new_observable->m_PDU_Observer)
514     {
515         new_observable->shutdown();
516         delete new_observable;
517         return;
518     }
519     new_observable->m_next = m_children;
520     m_children = new_observable;
521     new_observable->m_parent = this;
522 }
523
524 const char*PDU_Assoc::getpeername()
525 {
526     if (!m_cs)
527         return 0;
528     return cs_addrstr(m_cs);
529 }
530 /*
531  * Local variables:
532  * c-basic-offset: 4
533  * c-file-style: "Stroustrup"
534  * indent-tabs-mode: nil
535  * End:
536  * vim: shiftwidth=4 tabstop=8 expandtab
537  */
538