Release 1.6.4
[yazpp-moved-to-github.git] / src / yaz-pdu-assoc.cpp
1 /* This file is part of the yazpp toolkit.
2  * Copyright (C) Index Data 
3  * See the file LICENSE for details.
4  */
5
6 #if HAVE_CONFIG_H
7 #include <config.h>
8 #endif
9 #include <assert.h>
10 #include <string.h>
11 #include <yaz/log.h>
12 #include <yaz/tcpip.h>
13
14 #include <yazpp/pdu-assoc.h>
15
16 #if HAVE_FCNTL_H
17 #include <fcntl.h>
18 #endif
19
20 using namespace yazpp_1;
21
22 namespace yazpp_1 {
23     class PDU_Assoc_priv {
24         friend class PDU_Assoc;
25     private:
26         enum {
27             Connecting,
28             Listen,
29             Ready,
30             Closed,
31             Writing,
32             Accepting
33         } state;
34         class PDU_Queue {
35         public:
36             PDU_Queue(const char *buf, int len);
37             ~PDU_Queue();
38             char *m_buf;
39             int m_len;
40             PDU_Queue *m_next;
41         };
42         PDU_Assoc *pdu_parent;
43         PDU_Assoc *pdu_children;
44         PDU_Assoc *pdu_next;
45         COMSTACK cs;
46         yazpp_1::ISocketObservable *m_socketObservable;
47         char *input_buf;
48         int input_len;
49         PDU_Queue *queue_out;
50         PDU_Queue *queue_in;
51         int *destroyed;
52         int idleTime;
53         int log;
54         void init(yazpp_1::ISocketObservable *socketObservable);
55         COMSTACK comstack(const char *type_and_host, void **vp);
56         bool m_session_is_dead;
57         char *cert_fname;
58     };
59 }
60
61 void PDU_Assoc_priv::init(ISocketObservable *socketObservable)
62 {
63     state = Closed;
64     cs = 0;
65     m_socketObservable = socketObservable;
66     queue_out = 0;
67     queue_in = 0;
68     input_buf = 0;
69     input_len = 0;
70     pdu_children = 0;
71     pdu_parent = 0;
72     pdu_next = 0;
73     destroyed = 0;
74     idleTime = 0;
75     log = YLOG_DEBUG;
76     m_session_is_dead = false;
77     cert_fname = 0;
78 }
79
80 PDU_Assoc::~PDU_Assoc()
81 {
82     xfree(m_p->cert_fname);
83     delete m_p;
84 }
85
86 PDU_Assoc::PDU_Assoc(ISocketObservable *socketObservable)
87 {
88     m_PDU_Observer = 0;
89     m_p = new PDU_Assoc_priv;
90     m_p->init(socketObservable);
91 }
92
93 PDU_Assoc::PDU_Assoc(ISocketObservable *socketObservable,
94                      COMSTACK cs)
95 {
96     m_PDU_Observer = 0;
97     m_p = new PDU_Assoc_priv;
98     m_p->init(socketObservable);
99     m_p->cs = cs;
100     unsigned mask = 0;
101     if (cs->io_pending & CS_WANT_WRITE)
102         mask |= SOCKET_OBSERVE_WRITE;
103     if (cs->io_pending & CS_WANT_READ)
104         mask |= SOCKET_OBSERVE_READ;
105     m_p->m_socketObservable->addObserver(cs_fileno(cs), this);
106     if (!mask)
107     {
108         yaz_log(m_p->log, "new PDU_Assoc. Ready");
109         m_p->state = PDU_Assoc_priv::Ready;
110         flush_PDU();
111     }
112     else
113     {
114         yaz_log(m_p->log, "new PDU_Assoc. Accepting");
115         // assume comstack is accepting...
116         m_p->state = PDU_Assoc_priv::Accepting;
117         m_p->m_socketObservable->addObserver(cs_fileno(cs), this);
118         yaz_log(m_p->log, "maskObserver 1");
119         m_p->m_socketObservable->maskObserver(this,
120                                               mask|SOCKET_OBSERVE_EXCEPT);
121     }
122 }
123
124
125 IPDU_Observable *PDU_Assoc::clone()
126 {
127     PDU_Assoc *copy = new PDU_Assoc(m_p->m_socketObservable);
128     return copy;
129 }
130
131 void PDU_Assoc::socketNotify(int event)
132 {
133     yaz_log(m_p->log, "PDU_Assoc::socketNotify p=%p state=%d event = %d",
134             this, m_p->state, event);
135     if (event & SOCKET_OBSERVE_EXCEPT)
136     {
137         shutdown();
138         m_PDU_Observer->failNotify();
139         return;
140     }
141     else if (event & SOCKET_OBSERVE_TIMEOUT)
142     {
143         m_PDU_Observer->timeoutNotify();
144         return;
145     }
146     switch (m_p->state)
147     {
148     case PDU_Assoc_priv::Accepting:
149         if (!cs_accept(m_p->cs))
150         {
151             yaz_log(m_p->log, "PDU_Assoc::cs_accept failed");
152             m_p->cs = 0;
153             shutdown();
154             m_PDU_Observer->failNotify();
155         }
156         else
157         {
158             unsigned mask = 0;
159             if (m_p->cs->io_pending & CS_WANT_WRITE)
160                 mask |= SOCKET_OBSERVE_WRITE;
161             if (m_p->cs->io_pending & CS_WANT_READ)
162                 mask |= SOCKET_OBSERVE_READ;
163             if (!mask)
164             {   // accept is complete. turn to ready state and write if needed
165                 m_p->state = PDU_Assoc_priv::Ready;
166                 flush_PDU();
167             }
168             else
169             {   // accept still incomplete.
170                 yaz_log(m_p->log, "maskObserver 2");
171                 m_p->m_socketObservable->maskObserver(this,
172                                              mask|SOCKET_OBSERVE_EXCEPT);
173             }
174         }
175         break;
176     case PDU_Assoc_priv::Connecting:
177         if (event & SOCKET_OBSERVE_READ &&
178             event & SOCKET_OBSERVE_WRITE)
179         {
180             // For Unix: if both read and write is set, then connect failed.
181             shutdown();
182             m_PDU_Observer->failNotify();
183         }
184         else
185         {
186             yaz_log(m_p->log, "cs_rcvconnect");
187             int res = cs_rcvconnect(m_p->cs);
188             if (res == 1)
189             {
190                 unsigned mask = SOCKET_OBSERVE_EXCEPT;
191                 if (m_p->cs->io_pending & CS_WANT_WRITE)
192                     mask |= SOCKET_OBSERVE_WRITE;
193                 if (m_p->cs->io_pending & CS_WANT_READ)
194                     mask |= SOCKET_OBSERVE_READ;
195                 yaz_log(m_p->log, "maskObserver 3");
196                 m_p->m_socketObservable->maskObserver(this, mask);
197             }
198             else
199             {
200                 m_p->state = PDU_Assoc_priv::Ready;
201                 if (m_PDU_Observer)
202                     m_PDU_Observer->connectNotify();
203                 flush_PDU();
204             }
205         }
206         break;
207     case PDU_Assoc_priv::Listen:
208         if (event & SOCKET_OBSERVE_READ)
209         {
210             int res;
211             COMSTACK new_line;
212
213             if ((res = cs_listen(m_p->cs, 0, 0)) == 1)
214                 return;
215             if (res < 0)
216             {
217                 yaz_log(YLOG_FATAL|YLOG_ERRNO, "cs_listen failed");
218                 return;
219             }
220             if (!(new_line = cs_accept(m_p->cs)))
221                 return;
222             /* 1. create socket-manager
223                2. create pdu-assoc
224                3. create top-level object
225                     setup observer for child fileid in pdu-assoc
226                4. start thread
227             */
228             yaz_log(m_p->log, "new session: parent fd=%d child fd=%d",
229                     cs_fileno(m_p->cs), cs_fileno(new_line));
230             childNotify(new_line);
231         }
232         break;
233     case PDU_Assoc_priv::Writing:
234         if (event & (SOCKET_OBSERVE_READ|SOCKET_OBSERVE_WRITE))
235             flush_PDU();
236         break;
237     case PDU_Assoc_priv::Ready:
238         if (event & (SOCKET_OBSERVE_READ|SOCKET_OBSERVE_WRITE))
239         {
240             do
241             {
242                 int res = cs_get(m_p->cs, &m_p->input_buf, &m_p->input_len);
243                 if (res == 1)
244                 {
245                     unsigned mask = SOCKET_OBSERVE_EXCEPT;
246                     if (m_p->cs->io_pending & CS_WANT_WRITE)
247                         mask |= SOCKET_OBSERVE_WRITE;
248                     if (m_p->cs->io_pending & CS_WANT_READ)
249                         mask |= SOCKET_OBSERVE_READ;
250                     yaz_log(m_p->log, "maskObserver 4");
251                     m_p->m_socketObservable->maskObserver(this, mask);
252                     return;
253                 }
254                 else if (res <= 0)
255                 {
256                     yaz_log(m_p->log, "PDU_Assoc::Connection closed by peer");
257                     shutdown();
258                     if (m_PDU_Observer)
259                         m_PDU_Observer->failNotify(); // problem here..
260                     return;
261                 }
262                 // lock it, so we know if recv_PDU deletes it.
263                 int destroyed = 0;
264                 m_p->destroyed = &destroyed;
265
266                 if (!m_PDU_Observer)
267                     return;
268 #if 0
269                 PDU_Assoc_priv::PDU_Queue **pq = &m_p->m_queue_in;
270                 while (*pq)
271                     pq = &(*pq)->m_next;
272
273                 *pq = new PDU_Assoc_priv::PDU_Queue(m_p->m_input_buf, res);
274 #else
275                 m_PDU_Observer->recv_PDU(m_p->input_buf, res);
276 #endif
277                 if (destroyed)   // it really was destroyed, return now.
278                     return;
279                 m_p->destroyed = 0;
280             } while (m_p->cs && cs_more(m_p->cs));
281             if (m_p->cs && m_p->state == PDU_Assoc_priv::Ready)
282             {
283                 yaz_log(m_p->log, "maskObserver 5");
284                 m_p->m_socketObservable->maskObserver(this,
285                                                       SOCKET_OBSERVE_EXCEPT|
286                                                       SOCKET_OBSERVE_READ);
287             }
288         }
289         break;
290     case PDU_Assoc_priv::Closed:
291         yaz_log(m_p->log, "CLOSING state=%d event was %d", m_p->state,
292                 event);
293         shutdown();
294         m_PDU_Observer->failNotify();
295         break;
296     default:
297         yaz_log(m_p->log, "Unknown state=%d event was %d", m_p->state, event);
298         shutdown();
299         m_PDU_Observer->failNotify();
300     }
301 }
302
303 void PDU_Assoc::close_session()
304 {
305     m_p->m_session_is_dead = true;
306     if (!m_p->queue_out)
307     {
308         shutdown();
309         m_PDU_Observer->failNotify();
310     }
311 }
312
313 void PDU_Assoc::shutdown()
314 {
315     PDU_Assoc *ch;
316     for (ch = m_p->pdu_children; ch; ch = ch->m_p->pdu_next)
317         ch->shutdown();
318
319     m_p->m_socketObservable->deleteObserver(this);
320     m_p->state = PDU_Assoc_priv::Closed;
321     if (m_p->cs)
322     {
323         yaz_log(m_p->log, "PDU_Assoc::close fd=%d", cs_fileno(m_p->cs));
324         cs_close(m_p->cs);
325     }
326     m_p->cs = 0;
327     while (m_p->queue_out)
328     {
329         PDU_Assoc_priv::PDU_Queue *q_this = m_p->queue_out;
330         m_p->queue_out = m_p->queue_out->m_next;
331         delete q_this;
332     }
333     xfree(m_p->input_buf);
334     m_p->input_buf = 0;
335     m_p->input_len = 0;
336 }
337
338 void PDU_Assoc::destroy()
339 {
340     shutdown();
341
342     if (m_p->destroyed)
343         *m_p->destroyed = 1;
344     PDU_Assoc **c;
345
346     // delete from parent's child list (if any)
347     if (m_p->pdu_parent)
348     {
349         c = &m_p->pdu_parent->m_p->pdu_children;
350         while (*c != this)
351         {
352             assert (*c);
353             c = &(*c)->m_p->pdu_next;
354         }
355         *c = (*c)->m_p->pdu_next;
356     }
357     // delete all children ...
358     c = &m_p->pdu_children;
359     while (*c)
360     {
361         PDU_Assoc *here = *c;
362         *c = (*c)->m_p->pdu_next;
363         here->m_p->pdu_parent = 0;
364         delete here;
365     }
366     yaz_log(m_p->log, "PDU_Assoc::destroy this=%p", this);
367 }
368
369 PDU_Assoc_priv::PDU_Queue::PDU_Queue(const char *buf, int len)
370 {
371     m_buf = (char *) xmalloc(len);
372     memcpy(m_buf, buf, len);
373     m_len = len;
374     m_next = 0;
375 }
376
377 PDU_Assoc_priv::PDU_Queue::~PDU_Queue()
378 {
379     xfree(m_buf);
380 }
381
382 int PDU_Assoc::flush_PDU()
383 {
384     int r;
385
386     if (m_p->state != PDU_Assoc_priv::Ready && m_p->state != PDU_Assoc_priv::Writing)
387     {
388         yaz_log(m_p->log, "YAZ_PDU_Assoc::flush_PDU, not ready");
389         return 1;
390     }
391     PDU_Assoc_priv::PDU_Queue *q = m_p->queue_out;
392     if (!q)
393     {
394         m_p->state = PDU_Assoc_priv::Ready;
395         yaz_log(m_p->log, "YAZ_PDU_Assoc::flush_PDU queue empty");
396         yaz_log(m_p->log, "maskObserver 6");
397         m_p->m_socketObservable->maskObserver(this, SOCKET_OBSERVE_READ|
398                                               SOCKET_OBSERVE_WRITE|
399                                               SOCKET_OBSERVE_EXCEPT);
400         if (m_p->m_session_is_dead)
401         {
402             shutdown();
403             m_PDU_Observer->failNotify();
404         }
405         return 0;
406     }
407     r = cs_put(m_p->cs, q->m_buf, q->m_len);
408     if (r < 0)
409     {
410         yaz_log(m_p->log, "PDU_Assoc::flush_PDU cs_put failed");
411         shutdown();
412         m_PDU_Observer->failNotify();
413         return r;
414     }
415     if (r == 1)
416     {
417         unsigned mask = SOCKET_OBSERVE_EXCEPT;
418         m_p->state = PDU_Assoc_priv::Writing;
419         if (m_p->cs->io_pending & CS_WANT_WRITE)
420             mask |= SOCKET_OBSERVE_WRITE;
421         if (m_p->cs->io_pending & CS_WANT_READ)
422             mask |= SOCKET_OBSERVE_READ;
423
424         mask |= SOCKET_OBSERVE_WRITE;
425         yaz_log(m_p->log, "maskObserver 7");
426         m_p->m_socketObservable->maskObserver(this, mask);
427         yaz_log(m_p->log, "PDU_Assoc::flush_PDU cs_put %d bytes fd=%d (inc)",
428                 q->m_len, cs_fileno(m_p->cs));
429         return r;
430     }
431     yaz_log(m_p->log, "PDU_Assoc::flush_PDU cs_put %d bytes", q->m_len);
432     // whole packet sent... delete this and proceed to next ...
433     m_p->queue_out = q->m_next;
434     delete q;
435     // don't select on write if queue is empty ...
436     if (!m_p->queue_out)
437     {
438         m_p->state = PDU_Assoc_priv::Ready;
439         yaz_log(m_p->log, "maskObserver 8");
440         m_p->m_socketObservable->maskObserver(this, SOCKET_OBSERVE_READ|
441                                               SOCKET_OBSERVE_EXCEPT);
442         if (m_p->m_session_is_dead)
443             shutdown();
444     }
445     return r;
446 }
447
448 int PDU_Assoc::send_PDU(const char *buf, int len)
449 {
450     yaz_log(m_p->log, "PDU_Assoc::send_PDU");
451     PDU_Assoc_priv::PDU_Queue **pq = &m_p->queue_out;
452     int is_idle = (*pq ? 0 : 1);
453
454     if (!m_p->cs)
455     {
456         yaz_log(m_p->log, "PDU_Assoc::send_PDU failed, cs == 0");
457         return -1;
458     }
459     while (*pq)
460         pq = &(*pq)->m_next;
461     *pq = new PDU_Assoc_priv::PDU_Queue(buf, len);
462     if (is_idle)
463         return flush_PDU();
464     else
465         yaz_log(m_p->log, "PDU_Assoc::cannot send_PDU fd=%d",
466                 cs_fileno(m_p->cs));
467     return 0;
468 }
469
470 COMSTACK PDU_Assoc_priv::comstack(const char *type_and_host, void **vp)
471 {
472     return cs_create_host(type_and_host, 2, vp);
473 }
474
475 int PDU_Assoc::listen(IPDU_Observer *observer, const char *addr)
476 {
477     if (*addr == '\0')
478     {
479         m_p->m_socketObservable->deleteObserver(this);
480         m_p->state = PDU_Assoc_priv::Closed;
481         if (m_p->cs)
482         {
483             yaz_log(m_p->log, "PDU_Assoc::close fd=%d", cs_fileno(m_p->cs));
484             cs_close(m_p->cs);
485         }
486         m_p->cs = 0;
487         while (m_p->queue_out)
488         {
489             PDU_Assoc_priv::PDU_Queue *q_this = m_p->queue_out;
490             m_p->queue_out = m_p->queue_out->m_next;
491             delete q_this;
492         }
493         xfree(m_p->input_buf);
494         m_p->input_buf = 0;
495         m_p->input_len = 0;
496
497         return 0;
498     }
499
500     shutdown();
501
502     m_PDU_Observer = observer;
503     void *ap;
504     m_p->cs = m_p->comstack(addr, &ap);
505
506     if (!m_p->cs)
507         return -1;
508
509     if (m_p->cert_fname)
510         cs_set_ssl_certificate_file(m_p->cs, m_p->cert_fname);
511
512     if (cs_bind(m_p->cs, ap, CS_SERVER) < 0)
513         return -2;
514
515     int fd = cs_fileno(m_p->cs);
516 #if HAVE_FCNTL_H
517     int oldflags = fcntl(fd, F_GETFD, 0);
518     if (oldflags >= 0)
519     {
520         oldflags |= FD_CLOEXEC;
521         fcntl(fd, F_SETFD, oldflags);
522     }
523 #endif
524     m_p->m_socketObservable->addObserver(fd, this);
525     yaz_log(m_p->log, "maskObserver 9");
526     m_p->m_socketObservable->maskObserver(this, SOCKET_OBSERVE_READ|
527                                           SOCKET_OBSERVE_EXCEPT);
528     yaz_log(m_p->log, "PDU_Assoc::listen ok fd=%d", fd);
529     m_p->state = PDU_Assoc_priv::Listen;
530     return 0;
531 }
532
533 COMSTACK PDU_Assoc::get_comstack()
534 {
535     return m_p->cs;
536 }
537
538 void PDU_Assoc::idleTime(int idleTime)
539 {
540     m_p->idleTime = idleTime;
541     yaz_log(m_p->log, "PDU_Assoc::idleTime(%d)", idleTime);
542     m_p->m_socketObservable->timeoutObserver(this, m_p->idleTime);
543 }
544
545 int PDU_Assoc::connect(IPDU_Observer *observer, const char *addr)
546 {
547     yaz_log(m_p->log, "PDU_Assoc::connect %s", addr);
548     shutdown();
549     m_PDU_Observer = observer;
550     void *ap;
551     m_p->cs = m_p->comstack(addr, &ap);
552     if (!m_p->cs)
553         return -1;
554     int res = cs_connect(m_p->cs, ap);
555     yaz_log(m_p->log, "PDU_Assoc::connect fd=%d res=%d", cs_fileno(m_p->cs),
556             res);
557     m_p->m_socketObservable->addObserver(cs_fileno(m_p->cs), this);
558
559     if (res == 0)
560     {   // Connect complete
561         m_p->state = PDU_Assoc_priv::Connecting;
562         unsigned mask = SOCKET_OBSERVE_EXCEPT;
563         mask |= SOCKET_OBSERVE_WRITE;
564         mask |= SOCKET_OBSERVE_READ;
565         yaz_log(m_p->log, "maskObserver 11");
566         m_p->m_socketObservable->maskObserver(this, mask);
567     }
568     else if (res > 0)
569     {   // Connect pending
570         m_p->state = PDU_Assoc_priv::Connecting;
571         unsigned mask = SOCKET_OBSERVE_EXCEPT;
572         if (m_p->cs->io_pending & CS_WANT_WRITE)
573             mask |= SOCKET_OBSERVE_WRITE;
574         if (m_p->cs->io_pending & CS_WANT_READ)
575             mask |= SOCKET_OBSERVE_READ;
576         yaz_log(m_p->log, "maskObserver 11");
577         m_p->m_socketObservable->maskObserver(this, mask);
578     }
579     else
580     {   // Connect failed immediately
581         // Since m_state is Closed we can distinguish this case from
582         // normal connect in socketNotify handler
583         yaz_log(m_p->log, "maskObserver 12");
584         m_p->m_socketObservable->maskObserver(this, SOCKET_OBSERVE_WRITE|
585                                               SOCKET_OBSERVE_EXCEPT);
586     }
587     return 0;
588 }
589
590 // Single-threaded... Only useful for non-blocking handlers
591 void PDU_Assoc::childNotify(COMSTACK cs)
592 {
593     PDU_Assoc *new_observable =
594         new PDU_Assoc(m_p->m_socketObservable, cs);
595
596     // Clone PDU Observer
597     new_observable->m_PDU_Observer = m_PDU_Observer->sessionNotify
598         (new_observable, cs_fileno(cs));
599
600     if (!new_observable->m_PDU_Observer)
601     {
602         new_observable->shutdown();
603         delete new_observable;
604         return;
605     }
606     new_observable->m_p->pdu_next = m_p->pdu_children;
607     m_p->pdu_children = new_observable;
608     new_observable->m_p->pdu_parent = this;
609 }
610
611 const char*PDU_Assoc::getpeername()
612 {
613     if (!m_p->cs)
614         return 0;
615     return cs_addrstr(m_p->cs);
616 }
617
618 void PDU_Assoc::set_cert_fname(const char *fname)
619 {
620     xfree(m_p->cert_fname);
621     m_p->cert_fname = 0;
622     if (fname)
623         m_p->cert_fname = xstrdup(fname);
624 }
625
626 /*
627  * Local variables:
628  * c-basic-offset: 4
629  * c-file-style: "Stroustrup"
630  * indent-tabs-mode: nil
631  * End:
632  * vim: shiftwidth=4 tabstop=8 expandtab
633  */
634