369dfcf4289837e47ccd20693d12ac29932da706
[yazpp-moved-to-github.git] / src / yaz-pdu-assoc.cpp
1 /* This file is part of the yazpp toolkit.
2  * Copyright (C) Index Data 
3  * See the file LICENSE for details.
4  */
5
6 #if HAVE_CONFIG_H
7 #include <config.h>
8 #endif
9 #include <assert.h>
10 #include <string.h>
11 #include <yaz/log.h>
12 #include <yaz/tcpip.h>
13
14 #include <yazpp/pdu-assoc.h>
15
16 #if HAVE_FCNTL_H
17 #include <fcntl.h>
18 #endif
19
20 using namespace yazpp_1;
21
22 namespace yazpp_1 {
23     class PDU_Assoc_priv {
24         friend class PDU_Assoc;
25     private:
26         enum {
27             Connecting,
28             Listen,
29             Ready,
30             Closed,
31             Writing,
32             Accepting
33         } state;
34         class PDU_Queue {
35         public:
36             PDU_Queue(const char *buf, int len);
37             ~PDU_Queue();
38             char *m_buf;
39             int m_len;
40             PDU_Queue *m_next;
41         };
42         PDU_Assoc *pdu_parent;
43         PDU_Assoc *pdu_children;
44         PDU_Assoc *pdu_next;
45         COMSTACK cs;
46         yazpp_1::ISocketObservable *m_socketObservable;
47         char *input_buf;
48         int input_len;
49         PDU_Queue *queue_out;
50         PDU_Queue *queue_in;
51         int *destroyed;
52         int idleTime;
53         int log;
54         void init(yazpp_1::ISocketObservable *socketObservable);
55         COMSTACK comstack(const char *type_and_host, void **vp);
56         bool m_session_is_dead;
57         char *cert_fname;
58     };
59 }
60
61 void PDU_Assoc_priv::init(ISocketObservable *socketObservable)
62 {
63     state = Closed;
64     cs = 0;
65     m_socketObservable = socketObservable;
66     queue_out = 0;
67     queue_in = 0;
68     input_buf = 0;
69     input_len = 0;
70     pdu_children = 0;
71     pdu_parent = 0;
72     pdu_next = 0;
73     destroyed = 0;
74     idleTime = 0;
75     log = YLOG_DEBUG;
76     m_session_is_dead = false;
77     cert_fname = 0;
78 }
79
80 PDU_Assoc::~PDU_Assoc()
81 {
82     xfree(m_p->cert_fname);
83     delete m_p;
84 }
85
86 PDU_Assoc::PDU_Assoc(ISocketObservable *socketObservable)
87 {
88     m_PDU_Observer = 0;
89     m_p = new PDU_Assoc_priv;
90     m_p->init(socketObservable);
91 }
92
93 PDU_Assoc::PDU_Assoc(ISocketObservable *socketObservable,
94                      COMSTACK cs)
95 {
96     m_PDU_Observer = 0;
97     m_p = new PDU_Assoc_priv;
98     m_p->init(socketObservable);
99     m_p->cs = cs;
100     unsigned mask = 0;
101     if (cs->io_pending & CS_WANT_WRITE)
102         mask |= SOCKET_OBSERVE_WRITE;
103     if (cs->io_pending & CS_WANT_READ)
104         mask |= SOCKET_OBSERVE_READ;
105     m_p->m_socketObservable->addObserver(cs_fileno(cs), this);
106     if (!mask)
107     {
108         yaz_log(m_p->log, "new PDU_Assoc. Ready");
109         m_p->state = PDU_Assoc_priv::Ready;
110         flush_PDU();
111     }
112     else
113     {
114         yaz_log(m_p->log, "new PDU_Assoc. Accepting");
115         // assume comstack is accepting...
116         m_p->state = PDU_Assoc_priv::Accepting;
117         m_p->m_socketObservable->addObserver(cs_fileno(cs), this);
118         yaz_log(m_p->log, "maskObserver 1");
119         m_p->m_socketObservable->maskObserver(this,
120                                               mask|SOCKET_OBSERVE_EXCEPT);
121     }
122 }
123
124
125 IPDU_Observable *PDU_Assoc::clone()
126 {
127     PDU_Assoc *copy = new PDU_Assoc(m_p->m_socketObservable);
128     return copy;
129 }
130
131 void PDU_Assoc::socketNotify(int event)
132 {
133     yaz_log(m_p->log, "PDU_Assoc::socketNotify p=%p state=%d event = %d",
134             this, m_p->state, event);
135     if ((event & SOCKET_OBSERVE_EXCEPT) &&
136         m_p->state != PDU_Assoc_priv::Connecting)
137     {
138         yaz_log(m_p->log, "PDU_Assoc::socketNotify except");
139         shutdown();
140         m_PDU_Observer->failNotify();
141         return;
142     }
143     else if (event & SOCKET_OBSERVE_TIMEOUT)
144     {
145         yaz_log(m_p->log, "PDU_Assoc::socketNotify timeout");
146         m_PDU_Observer->timeoutNotify();
147         return;
148     }
149     int res;
150     switch (m_p->state)
151     {
152     case PDU_Assoc_priv::Accepting:
153         if (!cs_accept(m_p->cs))
154         {
155             yaz_log(m_p->log, "PDU_Assoc::cs_accept failed");
156             m_p->cs = 0;
157             shutdown();
158             m_PDU_Observer->failNotify();
159         }
160         else
161         {
162             unsigned mask = 0;
163             if (m_p->cs->io_pending & CS_WANT_WRITE)
164                 mask |= SOCKET_OBSERVE_WRITE;
165             if (m_p->cs->io_pending & CS_WANT_READ)
166                 mask |= SOCKET_OBSERVE_READ;
167             if (!mask)
168             {   // accept is complete. turn to ready state and write if needed
169                 m_p->state = PDU_Assoc_priv::Ready;
170                 flush_PDU();
171             }
172             else
173             {   // accept still incomplete.
174                 yaz_log(m_p->log, "maskObserver 2");
175                 m_p->m_socketObservable->maskObserver(this,
176                                              mask|SOCKET_OBSERVE_EXCEPT);
177             }
178         }
179         break;
180     case PDU_Assoc_priv::Connecting:
181         yaz_log(m_p->log, "PDU_Assoc::socketNotify Connecting");
182         res = cs_rcvconnect(m_p->cs);
183         if (res == 1)
184         {
185             unsigned mask = SOCKET_OBSERVE_EXCEPT;
186             if (m_p->cs->io_pending & CS_WANT_WRITE)
187                 mask |= SOCKET_OBSERVE_WRITE;
188             if (m_p->cs->io_pending & CS_WANT_READ)
189                 mask |= SOCKET_OBSERVE_READ;
190             yaz_log(m_p->log, "maskObserver 3");
191             m_p->m_socketObservable->addObserver(cs_fileno(m_p->cs), this);
192             m_p->m_socketObservable->maskObserver(this, mask);
193         }
194         else
195         {
196             m_p->state = PDU_Assoc_priv::Ready;
197             if (m_PDU_Observer)
198                 m_PDU_Observer->connectNotify();
199             flush_PDU();
200         }
201         break;
202     case PDU_Assoc_priv::Listen:
203         if (event & SOCKET_OBSERVE_READ)
204         {
205             COMSTACK new_line;
206
207             if ((res = cs_listen(m_p->cs, 0, 0)) == 1)
208                 return;
209             if (res < 0)
210             {
211                 yaz_log(YLOG_FATAL|YLOG_ERRNO, "cs_listen failed");
212                 return;
213             }
214             if (!(new_line = cs_accept(m_p->cs)))
215                 return;
216             /* 1. create socket-manager
217                2. create pdu-assoc
218                3. create top-level object
219                     setup observer for child fileid in pdu-assoc
220                4. start thread
221             */
222             yaz_log(m_p->log, "new session: parent fd=%d child fd=%d",
223                     cs_fileno(m_p->cs), cs_fileno(new_line));
224             childNotify(new_line);
225         }
226         break;
227     case PDU_Assoc_priv::Writing:
228         yaz_log(m_p->log, "PDU_Assoc::socketNotify writing");
229         if (event & (SOCKET_OBSERVE_READ|SOCKET_OBSERVE_WRITE))
230             flush_PDU();
231         break;
232     case PDU_Assoc_priv::Ready:
233         yaz_log(m_p->log, "PDU_Assoc::socketNotify ready");
234         if (event & (SOCKET_OBSERVE_READ|SOCKET_OBSERVE_WRITE))
235         {
236             do
237             {
238                 res = cs_get(m_p->cs, &m_p->input_buf, &m_p->input_len);
239                 if (res == 1)
240                 {
241                     unsigned mask = SOCKET_OBSERVE_EXCEPT;
242                     if (m_p->cs->io_pending & CS_WANT_WRITE)
243                         mask |= SOCKET_OBSERVE_WRITE;
244                     if (m_p->cs->io_pending & CS_WANT_READ)
245                         mask |= SOCKET_OBSERVE_READ;
246                     yaz_log(m_p->log, "maskObserver 4");
247                     m_p->m_socketObservable->maskObserver(this, mask);
248                     return;
249                 }
250                 else if (res <= 0)
251                 {
252                     yaz_log(m_p->log, "PDU_Assoc::Connection closed by peer");
253                     shutdown();
254                     if (m_PDU_Observer)
255                         m_PDU_Observer->failNotify(); // problem here..
256                     return;
257                 }
258                 // lock it, so we know if recv_PDU deletes it.
259                 int destroyed = 0;
260                 m_p->destroyed = &destroyed;
261
262                 if (!m_PDU_Observer)
263                     return;
264 #if 0
265                 PDU_Assoc_priv::PDU_Queue **pq = &m_p->m_queue_in;
266                 while (*pq)
267                     pq = &(*pq)->m_next;
268
269                 *pq = new PDU_Assoc_priv::PDU_Queue(m_p->m_input_buf, res);
270 #else
271                 m_PDU_Observer->recv_PDU(m_p->input_buf, res);
272 #endif
273                 if (destroyed)   // it really was destroyed, return now.
274                     return;
275                 m_p->destroyed = 0;
276             } while (m_p->cs && cs_more(m_p->cs));
277             if (m_p->cs && m_p->state == PDU_Assoc_priv::Ready)
278             {
279                 yaz_log(m_p->log, "maskObserver 5");
280                 m_p->m_socketObservable->maskObserver(this,
281                                                       SOCKET_OBSERVE_EXCEPT|
282                                                       SOCKET_OBSERVE_READ);
283             }
284         }
285         break;
286     case PDU_Assoc_priv::Closed:
287         yaz_log(m_p->log, "CLOSING state=%d event was %d", m_p->state,
288                 event);
289         shutdown();
290         m_PDU_Observer->failNotify();
291         break;
292     default:
293         yaz_log(m_p->log, "Unknown state=%d event was %d", m_p->state, event);
294         shutdown();
295         m_PDU_Observer->failNotify();
296     }
297 }
298
299 void PDU_Assoc::close_session()
300 {
301     m_p->m_session_is_dead = true;
302     if (!m_p->queue_out)
303     {
304         shutdown();
305         m_PDU_Observer->failNotify();
306     }
307 }
308
309 void PDU_Assoc::shutdown()
310 {
311     PDU_Assoc *ch;
312     for (ch = m_p->pdu_children; ch; ch = ch->m_p->pdu_next)
313         ch->shutdown();
314
315     m_p->m_socketObservable->deleteObserver(this);
316     m_p->state = PDU_Assoc_priv::Closed;
317     if (m_p->cs)
318     {
319         yaz_log(m_p->log, "PDU_Assoc::close fd=%d", cs_fileno(m_p->cs));
320         cs_close(m_p->cs);
321     }
322     m_p->cs = 0;
323     while (m_p->queue_out)
324     {
325         PDU_Assoc_priv::PDU_Queue *q_this = m_p->queue_out;
326         m_p->queue_out = m_p->queue_out->m_next;
327         delete q_this;
328     }
329     xfree(m_p->input_buf);
330     m_p->input_buf = 0;
331     m_p->input_len = 0;
332 }
333
334 void PDU_Assoc::destroy()
335 {
336     shutdown();
337
338     if (m_p->destroyed)
339         *m_p->destroyed = 1;
340     PDU_Assoc **c;
341
342     // delete from parent's child list (if any)
343     if (m_p->pdu_parent)
344     {
345         c = &m_p->pdu_parent->m_p->pdu_children;
346         while (*c != this)
347         {
348             assert (*c);
349             c = &(*c)->m_p->pdu_next;
350         }
351         *c = (*c)->m_p->pdu_next;
352     }
353     // delete all children ...
354     c = &m_p->pdu_children;
355     while (*c)
356     {
357         PDU_Assoc *here = *c;
358         *c = (*c)->m_p->pdu_next;
359         here->m_p->pdu_parent = 0;
360         delete here;
361     }
362     yaz_log(m_p->log, "PDU_Assoc::destroy this=%p", this);
363 }
364
365 PDU_Assoc_priv::PDU_Queue::PDU_Queue(const char *buf, int len)
366 {
367     m_buf = (char *) xmalloc(len);
368     memcpy(m_buf, buf, len);
369     m_len = len;
370     m_next = 0;
371 }
372
373 PDU_Assoc_priv::PDU_Queue::~PDU_Queue()
374 {
375     xfree(m_buf);
376 }
377
378 int PDU_Assoc::flush_PDU()
379 {
380     int r;
381
382     yaz_log(m_p->log, "PDU_Assoc::flush_PDU");
383     if (m_p->state != PDU_Assoc_priv::Ready && m_p->state != PDU_Assoc_priv::Writing)
384     {
385         yaz_log(m_p->log, "PDU_Assoc::flush_PDU, not ready");
386         return 1;
387     }
388     PDU_Assoc_priv::PDU_Queue *q = m_p->queue_out;
389     if (!q)
390     {
391         m_p->state = PDU_Assoc_priv::Ready;
392         yaz_log(m_p->log, "PDU_Assoc::flush_PDU queue empty");
393         yaz_log(m_p->log, "maskObserver 6");
394         m_p->m_socketObservable->maskObserver(this, SOCKET_OBSERVE_READ|
395                                               SOCKET_OBSERVE_WRITE|
396                                               SOCKET_OBSERVE_EXCEPT);
397         if (m_p->m_session_is_dead)
398         {
399             shutdown();
400             m_PDU_Observer->failNotify();
401         }
402         return 0;
403     }
404     r = cs_put(m_p->cs, q->m_buf, q->m_len);
405     if (r < 0)
406     {
407         yaz_log(m_p->log, "PDU_Assoc::flush_PDU cs_put failed");
408         shutdown();
409         m_PDU_Observer->failNotify();
410         return r;
411     }
412     m_p->m_socketObservable->addObserver(cs_fileno(m_p->cs), this);
413     if (r == 1)
414     {
415         unsigned mask = SOCKET_OBSERVE_EXCEPT;
416         m_p->state = PDU_Assoc_priv::Writing;
417         if (m_p->cs->io_pending & CS_WANT_WRITE)
418             mask |= SOCKET_OBSERVE_WRITE;
419         if (m_p->cs->io_pending & CS_WANT_READ)
420             mask |= SOCKET_OBSERVE_READ;
421
422         mask |= SOCKET_OBSERVE_WRITE;
423         yaz_log(m_p->log, "maskObserver 7");
424         m_p->m_socketObservable->maskObserver(this, mask);
425         yaz_log(m_p->log, "PDU_Assoc::flush_PDU cs_put %d bytes fd=%d (inc)",
426                 q->m_len, cs_fileno(m_p->cs));
427         return r;
428     }
429     yaz_log(m_p->log, "PDU_Assoc::flush_PDU cs_put %d bytes", q->m_len);
430     // whole packet sent... delete this and proceed to next ...
431     m_p->queue_out = q->m_next;
432     delete q;
433     // don't select on write if queue is empty ...
434     if (!m_p->queue_out)
435     {
436         m_p->state = PDU_Assoc_priv::Ready;
437         yaz_log(m_p->log, "maskObserver 8");
438         m_p->m_socketObservable->maskObserver(this, SOCKET_OBSERVE_READ|
439                                               SOCKET_OBSERVE_EXCEPT);
440         if (m_p->m_session_is_dead)
441             shutdown();
442     }
443     return r;
444 }
445
446 int PDU_Assoc::send_PDU(const char *buf, int len)
447 {
448     yaz_log(m_p->log, "PDU_Assoc::send_PDU");
449     PDU_Assoc_priv::PDU_Queue **pq = &m_p->queue_out;
450     int is_idle = (*pq ? 0 : 1);
451
452     if (!m_p->cs)
453     {
454         yaz_log(m_p->log, "PDU_Assoc::send_PDU failed, cs == 0");
455         return -1;
456     }
457     while (*pq)
458         pq = &(*pq)->m_next;
459     *pq = new PDU_Assoc_priv::PDU_Queue(buf, len);
460     if (is_idle)
461         return flush_PDU();
462     else
463         yaz_log(m_p->log, "PDU_Assoc::cannot send_PDU fd=%d",
464                 cs_fileno(m_p->cs));
465     return 0;
466 }
467
468 COMSTACK PDU_Assoc_priv::comstack(const char *type_and_host, void **vp)
469 {
470     return cs_create_host(type_and_host, 2, vp);
471 }
472
473 int PDU_Assoc::listen(IPDU_Observer *observer, const char *addr)
474 {
475     if (*addr == '\0')
476     {
477         m_p->m_socketObservable->deleteObserver(this);
478         m_p->state = PDU_Assoc_priv::Closed;
479         if (m_p->cs)
480         {
481             yaz_log(m_p->log, "PDU_Assoc::close fd=%d", cs_fileno(m_p->cs));
482             cs_close(m_p->cs);
483         }
484         m_p->cs = 0;
485         while (m_p->queue_out)
486         {
487             PDU_Assoc_priv::PDU_Queue *q_this = m_p->queue_out;
488             m_p->queue_out = m_p->queue_out->m_next;
489             delete q_this;
490         }
491         xfree(m_p->input_buf);
492         m_p->input_buf = 0;
493         m_p->input_len = 0;
494
495         return 0;
496     }
497
498     shutdown();
499
500     m_PDU_Observer = observer;
501     void *ap;
502     m_p->cs = m_p->comstack(addr, &ap);
503
504     if (!m_p->cs)
505         return -1;
506
507     if (m_p->cert_fname)
508         cs_set_ssl_certificate_file(m_p->cs, m_p->cert_fname);
509
510     if (cs_bind(m_p->cs, ap, CS_SERVER) < 0)
511         return -2;
512
513     int fd = cs_fileno(m_p->cs);
514 #if HAVE_FCNTL_H
515     int oldflags = fcntl(fd, F_GETFD, 0);
516     if (oldflags >= 0)
517     {
518         oldflags |= FD_CLOEXEC;
519         fcntl(fd, F_SETFD, oldflags);
520     }
521 #endif
522     m_p->m_socketObservable->addObserver(fd, this);
523     yaz_log(m_p->log, "maskObserver 9");
524     m_p->m_socketObservable->maskObserver(this, SOCKET_OBSERVE_READ|
525                                           SOCKET_OBSERVE_EXCEPT);
526     yaz_log(m_p->log, "PDU_Assoc::listen ok fd=%d", fd);
527     m_p->state = PDU_Assoc_priv::Listen;
528     return 0;
529 }
530
531 COMSTACK PDU_Assoc::get_comstack()
532 {
533     return m_p->cs;
534 }
535
536 void PDU_Assoc::idleTime(int idleTime)
537 {
538     m_p->idleTime = idleTime;
539     yaz_log(m_p->log, "PDU_Assoc::idleTime(%d)", idleTime);
540     m_p->m_socketObservable->timeoutObserver(this, m_p->idleTime);
541 }
542
543 int PDU_Assoc::connect(IPDU_Observer *observer, const char *addr)
544 {
545     yaz_log(m_p->log, "PDU_Assoc::connect %s", addr);
546     shutdown();
547     m_PDU_Observer = observer;
548     void *ap;
549     m_p->cs = m_p->comstack(addr, &ap);
550     if (!m_p->cs)
551         return -1;
552     int res = cs_connect(m_p->cs, ap);
553     yaz_log(m_p->log, "PDU_Assoc::connect fd=%d res=%d", cs_fileno(m_p->cs),
554             res);
555     m_p->m_socketObservable->addObserver(cs_fileno(m_p->cs), this);
556
557     if (res == 0)
558     {   // Connect complete
559         m_p->state = PDU_Assoc_priv::Connecting;
560         unsigned mask = SOCKET_OBSERVE_EXCEPT;
561         mask |= SOCKET_OBSERVE_WRITE;
562         mask |= SOCKET_OBSERVE_READ;
563         yaz_log(m_p->log, "maskObserver 11");
564         m_p->m_socketObservable->maskObserver(this, mask);
565     }
566     else if (res > 0)
567     {   // Connect pending
568         m_p->state = PDU_Assoc_priv::Connecting;
569         unsigned mask = SOCKET_OBSERVE_EXCEPT;
570         if (m_p->cs->io_pending & CS_WANT_WRITE)
571             mask |= SOCKET_OBSERVE_WRITE;
572         if (m_p->cs->io_pending & CS_WANT_READ)
573             mask |= SOCKET_OBSERVE_READ;
574         yaz_log(m_p->log, "maskObserver 11");
575         m_p->m_socketObservable->maskObserver(this, mask);
576     }
577     else
578     {   // Connect failed immediately
579         // Since m_state is Closed we can distinguish this case from
580         // normal connect in socketNotify handler
581         yaz_log(m_p->log, "maskObserver 12");
582         m_p->m_socketObservable->maskObserver(this, SOCKET_OBSERVE_WRITE|
583                                               SOCKET_OBSERVE_EXCEPT);
584     }
585     return 0;
586 }
587
588 // Single-threaded... Only useful for non-blocking handlers
589 void PDU_Assoc::childNotify(COMSTACK cs)
590 {
591     PDU_Assoc *new_observable =
592         new PDU_Assoc(m_p->m_socketObservable, cs);
593
594     // Clone PDU Observer
595     new_observable->m_PDU_Observer = m_PDU_Observer->sessionNotify
596         (new_observable, cs_fileno(cs));
597
598     if (!new_observable->m_PDU_Observer)
599     {
600         new_observable->shutdown();
601         delete new_observable;
602         return;
603     }
604     new_observable->m_p->pdu_next = m_p->pdu_children;
605     m_p->pdu_children = new_observable;
606     new_observable->m_p->pdu_parent = this;
607 }
608
609 const char*PDU_Assoc::getpeername()
610 {
611     if (!m_p->cs)
612         return 0;
613     return cs_addrstr(m_p->cs);
614 }
615
616 void PDU_Assoc::set_cert_fname(const char *fname)
617 {
618     xfree(m_p->cert_fname);
619     m_p->cert_fname = 0;
620     if (fname)
621         m_p->cert_fname = xstrdup(fname);
622 }
623
624 /*
625  * Local variables:
626  * c-basic-offset: 4
627  * c-file-style: "Stroustrup"
628  * indent-tabs-mode: nil
629  * End:
630  * vim: shiftwidth=4 tabstop=8 expandtab
631  */
632