Bump year
[yazpp-moved-to-github.git] / src / yaz-pdu-assoc.cpp
1 /* This file is part of the yazpp toolkit.
2  * Copyright (C) 1998-2013 Index Data and Mike Taylor
3  * See the file LICENSE for details.
4  */
5
6 #if HAVE_CONFIG_H
7 #include <config.h>
8 #endif
9 #include <assert.h>
10 #include <string.h>
11 #include <yaz/log.h>
12 #include <yaz/tcpip.h>
13
14 #include <yazpp/pdu-assoc.h>
15
16 #if HAVE_FCNTL_H
17 #include <fcntl.h>
18 #endif
19
20 using namespace yazpp_1;
21
22 namespace yazpp_1 {
23     class PDU_Assoc_priv {
24         friend class PDU_Assoc;
25     private:
26         enum {
27             Connecting,
28             Listen,
29             Ready,
30             Closed,
31             Writing,
32             Accepting
33         } state;
34         class PDU_Queue {
35         public:
36             PDU_Queue(const char *buf, int len);
37             ~PDU_Queue();
38             char *m_buf;
39             int m_len;
40             PDU_Queue *m_next;
41         };
42         PDU_Assoc *pdu_parent;
43         PDU_Assoc *pdu_children;
44         PDU_Assoc *pdu_next;
45         COMSTACK cs;
46         yazpp_1::ISocketObservable *m_socketObservable;
47         char *input_buf;
48         int input_len;
49         PDU_Queue *queue_out;
50         PDU_Queue *queue_in;
51         int *destroyed;
52         int idleTime;
53         int log;
54         void init(yazpp_1::ISocketObservable *socketObservable);
55         COMSTACK comstack(const char *type_and_host, void **vp);
56         bool m_session_is_dead;
57     };
58 }
59
60 void PDU_Assoc_priv::init(ISocketObservable *socketObservable)
61 {
62     state = Closed;
63     cs = 0;
64     m_socketObservable = socketObservable;
65     queue_out = 0;
66     queue_in = 0;
67     input_buf = 0;
68     input_len = 0;
69     pdu_children = 0;
70     pdu_parent = 0;
71     pdu_next = 0;
72     destroyed = 0;
73     idleTime = 0;
74     log = YLOG_DEBUG;
75     m_session_is_dead = false;
76 }
77
78 PDU_Assoc::~PDU_Assoc()
79 {
80     delete m_p;
81 }
82
83 PDU_Assoc::PDU_Assoc(ISocketObservable *socketObservable)
84 {
85     m_PDU_Observer = 0;
86     m_p = new PDU_Assoc_priv;
87     m_p->init(socketObservable);
88 }
89
90 PDU_Assoc::PDU_Assoc(ISocketObservable *socketObservable,
91                      COMSTACK cs)
92 {
93     m_PDU_Observer = 0;
94     m_p = new PDU_Assoc_priv;
95     m_p->init(socketObservable);
96     m_p->cs = cs;
97     unsigned mask = 0;
98     if (cs->io_pending & CS_WANT_WRITE)
99         mask |= SOCKET_OBSERVE_WRITE;
100     if (cs->io_pending & CS_WANT_READ)
101         mask |= SOCKET_OBSERVE_READ;
102     m_p->m_socketObservable->addObserver(cs_fileno(cs), this);
103     if (!mask)
104     {
105         yaz_log(m_p->log, "new PDU_Assoc. Ready");
106         m_p->state = PDU_Assoc_priv::Ready;
107         flush_PDU();
108     }
109     else
110     {
111         yaz_log(m_p->log, "new PDU_Assoc. Accepting");
112         // assume comstack is accepting...
113         m_p->state = PDU_Assoc_priv::Accepting;
114         m_p->m_socketObservable->addObserver(cs_fileno(cs), this);
115         yaz_log(m_p->log, "maskObserver 1");
116         m_p->m_socketObservable->maskObserver(this,
117                                               mask|SOCKET_OBSERVE_EXCEPT);
118     }
119 }
120
121
122 IPDU_Observable *PDU_Assoc::clone()
123 {
124     PDU_Assoc *copy = new PDU_Assoc(m_p->m_socketObservable);
125     return copy;
126 }
127
128 void PDU_Assoc::socketNotify(int event)
129 {
130     yaz_log(m_p->log, "PDU_Assoc::socketNotify p=%p state=%d event = %d",
131             this, m_p->state, event);
132     if (event & SOCKET_OBSERVE_EXCEPT)
133     {
134         shutdown();
135         m_PDU_Observer->failNotify();
136         return;
137     }
138     else if (event & SOCKET_OBSERVE_TIMEOUT)
139     {
140         m_PDU_Observer->timeoutNotify();
141         return;
142     }
143     switch (m_p->state)
144     {
145     case PDU_Assoc_priv::Accepting:
146         if (!cs_accept(m_p->cs))
147         {
148             yaz_log(m_p->log, "PDU_Assoc::cs_accept failed");
149             m_p->cs = 0;
150             shutdown();
151             m_PDU_Observer->failNotify();
152         }
153         else
154         {
155             unsigned mask = 0;
156             if (m_p->cs->io_pending & CS_WANT_WRITE)
157                 mask |= SOCKET_OBSERVE_WRITE;
158             if (m_p->cs->io_pending & CS_WANT_READ)
159                 mask |= SOCKET_OBSERVE_READ;
160             if (!mask)
161             {   // accept is complete. turn to ready state and write if needed
162                 m_p->state = PDU_Assoc_priv::Ready;
163                 flush_PDU();
164             }
165             else
166             {   // accept still incomplete.
167                 yaz_log(m_p->log, "maskObserver 2");
168                 m_p->m_socketObservable->maskObserver(this,
169                                              mask|SOCKET_OBSERVE_EXCEPT);
170             }
171         }
172         break;
173     case PDU_Assoc_priv::Connecting:
174         if (event & SOCKET_OBSERVE_READ &&
175             event & SOCKET_OBSERVE_WRITE)
176         {
177             // For Unix: if both read and write is set, then connect failed.
178             shutdown();
179             m_PDU_Observer->failNotify();
180         }
181         else
182         {
183             yaz_log(m_p->log, "cs_rcvconnect");
184             int res = cs_rcvconnect(m_p->cs);
185             if (res == 1)
186             {
187                 unsigned mask = SOCKET_OBSERVE_EXCEPT;
188                 if (m_p->cs->io_pending & CS_WANT_WRITE)
189                     mask |= SOCKET_OBSERVE_WRITE;
190                 if (m_p->cs->io_pending & CS_WANT_READ)
191                     mask |= SOCKET_OBSERVE_READ;
192                 yaz_log(m_p->log, "maskObserver 3");
193                 m_p->m_socketObservable->maskObserver(this, mask);
194             }
195             else
196             {
197                 m_p->state = PDU_Assoc_priv::Ready;
198                 if (m_PDU_Observer)
199                     m_PDU_Observer->connectNotify();
200                 flush_PDU();
201             }
202         }
203         break;
204     case PDU_Assoc_priv::Listen:
205         if (event & SOCKET_OBSERVE_READ)
206         {
207             int res;
208             COMSTACK new_line;
209
210             if ((res = cs_listen(m_p->cs, 0, 0)) == 1)
211                 return;
212             if (res < 0)
213             {
214                 yaz_log(YLOG_FATAL|YLOG_ERRNO, "cs_listen failed");
215                 return;
216             }
217             if (!(new_line = cs_accept(m_p->cs)))
218                 return;
219             /* 1. create socket-manager
220                2. create pdu-assoc
221                3. create top-level object
222                     setup observer for child fileid in pdu-assoc
223                4. start thread
224             */
225             yaz_log(m_p->log, "new session: parent fd=%d child fd=%d",
226                     cs_fileno(m_p->cs), cs_fileno(new_line));
227             childNotify(new_line);
228         }
229         break;
230     case PDU_Assoc_priv::Writing:
231         if (event & (SOCKET_OBSERVE_READ|SOCKET_OBSERVE_WRITE))
232             flush_PDU();
233         break;
234     case PDU_Assoc_priv::Ready:
235         if (event & (SOCKET_OBSERVE_READ|SOCKET_OBSERVE_WRITE))
236         {
237             do
238             {
239                 int res = cs_get(m_p->cs, &m_p->input_buf, &m_p->input_len);
240                 if (res == 1)
241                 {
242                     unsigned mask = SOCKET_OBSERVE_EXCEPT;
243                     if (m_p->cs->io_pending & CS_WANT_WRITE)
244                         mask |= SOCKET_OBSERVE_WRITE;
245                     if (m_p->cs->io_pending & CS_WANT_READ)
246                         mask |= SOCKET_OBSERVE_READ;
247                     yaz_log(m_p->log, "maskObserver 4");
248                     m_p->m_socketObservable->maskObserver(this, mask);
249                     return;
250                 }
251                 else if (res <= 0)
252                 {
253                     yaz_log(m_p->log, "PDU_Assoc::Connection closed by peer");
254                     shutdown();
255                     if (m_PDU_Observer)
256                         m_PDU_Observer->failNotify(); // problem here..
257                     return;
258                 }
259                 // lock it, so we know if recv_PDU deletes it.
260                 int destroyed = 0;
261                 m_p->destroyed = &destroyed;
262
263                 if (!m_PDU_Observer)
264                     return;
265 #if 0
266                 PDU_Assoc_priv::PDU_Queue **pq = &m_p->m_queue_in;
267                 while (*pq)
268                     pq = &(*pq)->m_next;
269
270                 *pq = new PDU_Assoc_priv::PDU_Queue(m_p->m_input_buf, res);
271 #else
272                 m_PDU_Observer->recv_PDU(m_p->input_buf, res);
273 #endif
274                 if (destroyed)   // it really was destroyed, return now.
275                     return;
276                 m_p->destroyed = 0;
277             } while (m_p->cs && cs_more(m_p->cs));
278             if (m_p->cs && m_p->state == PDU_Assoc_priv::Ready)
279             {
280                 yaz_log(m_p->log, "maskObserver 5");
281                 m_p->m_socketObservable->maskObserver(this,
282                                                       SOCKET_OBSERVE_EXCEPT|
283                                                       SOCKET_OBSERVE_READ);
284             }
285         }
286         break;
287     case PDU_Assoc_priv::Closed:
288         yaz_log(m_p->log, "CLOSING state=%d event was %d", m_p->state,
289                 event);
290         shutdown();
291         m_PDU_Observer->failNotify();
292         break;
293     default:
294         yaz_log(m_p->log, "Unknown state=%d event was %d", m_p->state, event);
295         shutdown();
296         m_PDU_Observer->failNotify();
297     }
298 }
299
300 void PDU_Assoc::close_session()
301 {
302     m_p->m_session_is_dead = true;
303     if (!m_p->queue_out)
304     {
305         shutdown();
306         m_PDU_Observer->failNotify();
307     }
308 }
309
310 void PDU_Assoc::shutdown()
311 {
312     PDU_Assoc *ch;
313     for (ch = m_p->pdu_children; ch; ch = ch->m_p->pdu_next)
314         ch->shutdown();
315
316     m_p->m_socketObservable->deleteObserver(this);
317     m_p->state = PDU_Assoc_priv::Closed;
318     if (m_p->cs)
319     {
320         yaz_log(m_p->log, "PDU_Assoc::close fd=%d", cs_fileno(m_p->cs));
321         cs_close(m_p->cs);
322     }
323     m_p->cs = 0;
324     while (m_p->queue_out)
325     {
326         PDU_Assoc_priv::PDU_Queue *q_this = m_p->queue_out;
327         m_p->queue_out = m_p->queue_out->m_next;
328         delete q_this;
329     }
330     xfree(m_p->input_buf);
331     m_p->input_buf = 0;
332     m_p->input_len = 0;
333 }
334
335 void PDU_Assoc::destroy()
336 {
337     shutdown();
338
339     if (m_p->destroyed)
340         *m_p->destroyed = 1;
341     PDU_Assoc **c;
342
343     // delete from parent's child list (if any)
344     if (m_p->pdu_parent)
345     {
346         c = &m_p->pdu_parent->m_p->pdu_children;
347         while (*c != this)
348         {
349             assert (*c);
350             c = &(*c)->m_p->pdu_next;
351         }
352         *c = (*c)->m_p->pdu_next;
353     }
354     // delete all children ...
355     c = &m_p->pdu_children;
356     while (*c)
357     {
358         PDU_Assoc *here = *c;
359         *c = (*c)->m_p->pdu_next;
360         here->m_p->pdu_parent = 0;
361         delete here;
362     }
363     yaz_log(m_p->log, "PDU_Assoc::destroy this=%p", this);
364 }
365
366 PDU_Assoc_priv::PDU_Queue::PDU_Queue(const char *buf, int len)
367 {
368     m_buf = (char *) xmalloc(len);
369     memcpy(m_buf, buf, len);
370     m_len = len;
371     m_next = 0;
372 }
373
374 PDU_Assoc_priv::PDU_Queue::~PDU_Queue()
375 {
376     xfree(m_buf);
377 }
378
379 int PDU_Assoc::flush_PDU()
380 {
381     int r;
382
383     if (m_p->state != PDU_Assoc_priv::Ready && m_p->state != PDU_Assoc_priv::Writing)
384     {
385         yaz_log(m_p->log, "YAZ_PDU_Assoc::flush_PDU, not ready");
386         return 1;
387     }
388     PDU_Assoc_priv::PDU_Queue *q = m_p->queue_out;
389     if (!q)
390     {
391         m_p->state = PDU_Assoc_priv::Ready;
392         yaz_log(m_p->log, "YAZ_PDU_Assoc::flush_PDU queue empty");
393         yaz_log(m_p->log, "maskObserver 6");
394         m_p->m_socketObservable->maskObserver(this, SOCKET_OBSERVE_READ|
395                                               SOCKET_OBSERVE_WRITE|
396                                               SOCKET_OBSERVE_EXCEPT);
397         if (m_p->m_session_is_dead)
398         {
399             shutdown();
400             m_PDU_Observer->failNotify();
401         }
402         return 0;
403     }
404     r = cs_put(m_p->cs, q->m_buf, q->m_len);
405     if (r < 0)
406     {
407         yaz_log(m_p->log, "PDU_Assoc::flush_PDU cs_put failed");
408         shutdown();
409         m_PDU_Observer->failNotify();
410         return r;
411     }
412     if (r == 1)
413     {
414         unsigned mask = SOCKET_OBSERVE_EXCEPT;
415         m_p->state = PDU_Assoc_priv::Writing;
416         if (m_p->cs->io_pending & CS_WANT_WRITE)
417             mask |= SOCKET_OBSERVE_WRITE;
418         if (m_p->cs->io_pending & CS_WANT_READ)
419             mask |= SOCKET_OBSERVE_READ;
420
421         mask |= SOCKET_OBSERVE_WRITE;
422         yaz_log(m_p->log, "maskObserver 7");
423         m_p->m_socketObservable->maskObserver(this, mask);
424         yaz_log(m_p->log, "PDU_Assoc::flush_PDU cs_put %d bytes fd=%d (inc)",
425                 q->m_len, cs_fileno(m_p->cs));
426         return r;
427     }
428     yaz_log(m_p->log, "PDU_Assoc::flush_PDU cs_put %d bytes", q->m_len);
429     // whole packet sent... delete this and proceed to next ...
430     m_p->queue_out = q->m_next;
431     delete q;
432     // don't select on write if queue is empty ...
433     if (!m_p->queue_out)
434     {
435         m_p->state = PDU_Assoc_priv::Ready;
436         yaz_log(m_p->log, "maskObserver 8");
437         m_p->m_socketObservable->maskObserver(this, SOCKET_OBSERVE_READ|
438                                               SOCKET_OBSERVE_EXCEPT);
439         if (m_p->m_session_is_dead)
440             shutdown();
441     }
442     return r;
443 }
444
445 int PDU_Assoc::send_PDU(const char *buf, int len)
446 {
447     yaz_log(m_p->log, "PDU_Assoc::send_PDU");
448     PDU_Assoc_priv::PDU_Queue **pq = &m_p->queue_out;
449     int is_idle = (*pq ? 0 : 1);
450
451     if (!m_p->cs)
452     {
453         yaz_log(m_p->log, "PDU_Assoc::send_PDU failed, cs == 0");
454         return -1;
455     }
456     while (*pq)
457         pq = &(*pq)->m_next;
458     *pq = new PDU_Assoc_priv::PDU_Queue(buf, len);
459     if (is_idle)
460         return flush_PDU();
461     else
462         yaz_log(m_p->log, "PDU_Assoc::cannot send_PDU fd=%d",
463                 cs_fileno(m_p->cs));
464     return 0;
465 }
466
467 COMSTACK PDU_Assoc_priv::comstack(const char *type_and_host, void **vp)
468 {
469     return cs_create_host(type_and_host, 2, vp);
470 }
471
472 int PDU_Assoc::listen(IPDU_Observer *observer, const char *addr)
473 {
474     if (*addr == '\0')
475     {
476         m_p->m_socketObservable->deleteObserver(this);
477         m_p->state = PDU_Assoc_priv::Closed;
478         if (m_p->cs)
479         {
480             yaz_log(m_p->log, "PDU_Assoc::close fd=%d", cs_fileno(m_p->cs));
481             cs_close(m_p->cs);
482         }
483         m_p->cs = 0;
484         while (m_p->queue_out)
485         {
486             PDU_Assoc_priv::PDU_Queue *q_this = m_p->queue_out;
487             m_p->queue_out = m_p->queue_out->m_next;
488             delete q_this;
489         }
490         xfree(m_p->input_buf);
491         m_p->input_buf = 0;
492         m_p->input_len = 0;
493
494         return 0;
495     }
496
497     shutdown();
498
499     m_PDU_Observer = observer;
500     void *ap;
501     m_p->cs = m_p->comstack(addr, &ap);
502
503     if (!m_p->cs)
504         return -1;
505     if (cs_bind(m_p->cs, ap, CS_SERVER) < 0)
506         return -2;
507
508     int fd = cs_fileno(m_p->cs);
509 #if HAVE_FCNTL_H
510     int oldflags = fcntl(fd, F_GETFD, 0);
511     if (oldflags >= 0)
512     {
513         oldflags |= FD_CLOEXEC;
514         fcntl(fd, F_SETFD, oldflags);
515     }
516 #endif
517     m_p->m_socketObservable->addObserver(fd, this);
518     yaz_log(m_p->log, "maskObserver 9");
519     m_p->m_socketObservable->maskObserver(this, SOCKET_OBSERVE_READ|
520                                           SOCKET_OBSERVE_EXCEPT);
521     yaz_log(m_p->log, "PDU_Assoc::listen ok fd=%d", fd);
522     m_p->state = PDU_Assoc_priv::Listen;
523     return 0;
524 }
525
526 COMSTACK PDU_Assoc::get_comstack()
527 {
528     return m_p->cs;
529 }
530
531 void PDU_Assoc::idleTime(int idleTime)
532 {
533     m_p->idleTime = idleTime;
534     yaz_log(m_p->log, "PDU_Assoc::idleTime(%d)", idleTime);
535     m_p->m_socketObservable->timeoutObserver(this, m_p->idleTime);
536 }
537
538 int PDU_Assoc::connect(IPDU_Observer *observer, const char *addr)
539 {
540     yaz_log(m_p->log, "PDU_Assoc::connect %s", addr);
541     shutdown();
542     m_PDU_Observer = observer;
543     void *ap;
544     m_p->cs = m_p->comstack(addr, &ap);
545     if (!m_p->cs)
546         return -1;
547     int res = cs_connect(m_p->cs, ap);
548     yaz_log(m_p->log, "PDU_Assoc::connect fd=%d res=%d", cs_fileno(m_p->cs),
549             res);
550     m_p->m_socketObservable->addObserver(cs_fileno(m_p->cs), this);
551
552     if (res == 0)
553     {   // Connect complete
554         m_p->state = PDU_Assoc_priv::Connecting;
555         unsigned mask = SOCKET_OBSERVE_EXCEPT;
556         mask |= SOCKET_OBSERVE_WRITE;
557         mask |= SOCKET_OBSERVE_READ;
558         yaz_log(m_p->log, "maskObserver 11");
559         m_p->m_socketObservable->maskObserver(this, mask);
560     }
561     else if (res > 0)
562     {   // Connect pending
563         m_p->state = PDU_Assoc_priv::Connecting;
564         unsigned mask = SOCKET_OBSERVE_EXCEPT;
565         if (m_p->cs->io_pending & CS_WANT_WRITE)
566             mask |= SOCKET_OBSERVE_WRITE;
567         if (m_p->cs->io_pending & CS_WANT_READ)
568             mask |= SOCKET_OBSERVE_READ;
569         yaz_log(m_p->log, "maskObserver 11");
570         m_p->m_socketObservable->maskObserver(this, mask);
571     }
572     else
573     {   // Connect failed immediately
574         // Since m_state is Closed we can distinguish this case from
575         // normal connect in socketNotify handler
576         yaz_log(m_p->log, "maskObserver 12");
577         m_p->m_socketObservable->maskObserver(this, SOCKET_OBSERVE_WRITE|
578                                               SOCKET_OBSERVE_EXCEPT);
579     }
580     return 0;
581 }
582
583 // Single-threaded... Only useful for non-blocking handlers
584 void PDU_Assoc::childNotify(COMSTACK cs)
585 {
586     PDU_Assoc *new_observable =
587         new PDU_Assoc(m_p->m_socketObservable, cs);
588
589     // Clone PDU Observer
590     new_observable->m_PDU_Observer = m_PDU_Observer->sessionNotify
591         (new_observable, cs_fileno(cs));
592
593     if (!new_observable->m_PDU_Observer)
594     {
595         new_observable->shutdown();
596         delete new_observable;
597         return;
598     }
599     new_observable->m_p->pdu_next = m_p->pdu_children;
600     m_p->pdu_children = new_observable;
601     new_observable->m_p->pdu_parent = this;
602 }
603
604 const char*PDU_Assoc::getpeername()
605 {
606     if (!m_p->cs)
607         return 0;
608     return cs_addrstr(m_p->cs);
609 }
610 /*
611  * Local variables:
612  * c-basic-offset: 4
613  * c-file-style: "Stroustrup"
614  * indent-tabs-mode: nil
615  * End:
616  * vim: shiftwidth=4 tabstop=8 expandtab
617  */
618