Change ISocketObservable interface
[yazpp-moved-to-github.git] / src / yaz-pdu-assoc.cpp
1 /* This file is part of the yazpp toolkit.
2  * Copyright (C) Index Data 
3  * See the file LICENSE for details.
4  */
5
6 #if HAVE_CONFIG_H
7 #include <config.h>
8 #endif
9 #include <assert.h>
10 #include <string.h>
11 #include <yaz/log.h>
12 #include <yaz/tcpip.h>
13
14 #include <yazpp/pdu-assoc.h>
15
16 #if HAVE_FCNTL_H
17 #include <fcntl.h>
18 #endif
19
20 using namespace yazpp_1;
21
22 namespace yazpp_1 {
23     class PDU_Assoc_priv {
24         friend class PDU_Assoc;
25     private:
26         enum {
27             Connecting,
28             Listen,
29             Ready,
30             Closed,
31             Writing,
32             Accepting
33         } state;
34         class PDU_Queue {
35         public:
36             PDU_Queue(const char *buf, int len);
37             ~PDU_Queue();
38             char *m_buf;
39             int m_len;
40             PDU_Queue *m_next;
41         };
42         PDU_Assoc *pdu_parent;
43         PDU_Assoc *pdu_children;
44         PDU_Assoc *pdu_next;
45         COMSTACK cs;
46         yazpp_1::ISocketObservable *m_socketObservable;
47         char *input_buf;
48         int input_len;
49         PDU_Queue *queue_out;
50         PDU_Queue *queue_in;
51         int *destroyed;
52         int idleTime;
53         int log;
54         void init(yazpp_1::ISocketObservable *socketObservable);
55         COMSTACK comstack(const char *type_and_host, void **vp);
56         bool m_session_is_dead;
57         char *cert_fname;
58     };
59 }
60
61 void PDU_Assoc_priv::init(ISocketObservable *socketObservable)
62 {
63     state = Closed;
64     cs = 0;
65     m_socketObservable = socketObservable;
66     queue_out = 0;
67     queue_in = 0;
68     input_buf = 0;
69     input_len = 0;
70     pdu_children = 0;
71     pdu_parent = 0;
72     pdu_next = 0;
73     destroyed = 0;
74     idleTime = 0;
75     log = YLOG_DEBUG;
76     m_session_is_dead = false;
77     cert_fname = 0;
78 }
79
80 PDU_Assoc::~PDU_Assoc()
81 {
82     xfree(m_p->cert_fname);
83     delete m_p;
84 }
85
86 PDU_Assoc::PDU_Assoc(ISocketObservable *socketObservable)
87 {
88     m_PDU_Observer = 0;
89     m_p = new PDU_Assoc_priv;
90     m_p->init(socketObservable);
91 }
92
93 PDU_Assoc::PDU_Assoc(ISocketObservable *socketObservable,
94                      COMSTACK cs)
95 {
96     m_PDU_Observer = 0;
97     m_p = new PDU_Assoc_priv;
98     m_p->init(socketObservable);
99     m_p->cs = cs;
100     unsigned mask = 0;
101     if (cs->io_pending & CS_WANT_WRITE)
102         mask |= SOCKET_OBSERVE_WRITE;
103     if (cs->io_pending & CS_WANT_READ)
104         mask |= SOCKET_OBSERVE_READ;
105     m_p->m_socketObservable->addObserver(this);
106     if (!mask)
107     {
108         yaz_log(m_p->log, "new PDU_Assoc. Ready");
109         m_p->state = PDU_Assoc_priv::Ready;
110         flush_PDU();
111     }
112     else
113     {
114         yaz_log(m_p->log, "new PDU_Assoc. Accepting");
115         // assume comstack is accepting...
116         m_p->state = PDU_Assoc_priv::Accepting;
117         m_p->m_socketObservable->addObserver(this);
118         yaz_log(m_p->log, "maskObserver 1");
119         m_p->m_socketObservable->maskObserver(this,
120                                               mask|SOCKET_OBSERVE_EXCEPT,
121                                               cs_fileno(cs));
122     }
123 }
124
125
126 IPDU_Observable *PDU_Assoc::clone()
127 {
128     PDU_Assoc *copy = new PDU_Assoc(m_p->m_socketObservable);
129     return copy;
130 }
131
132 void PDU_Assoc::socketNotify(int event)
133 {
134     yaz_log(m_p->log, "PDU_Assoc::socketNotify p=%p state=%d event = %d",
135             this, m_p->state, event);
136     if ((event & SOCKET_OBSERVE_EXCEPT) &&
137         m_p->state != PDU_Assoc_priv::Connecting)
138     {
139         yaz_log(m_p->log, "PDU_Assoc::socketNotify except");
140         shutdown();
141         m_PDU_Observer->failNotify();
142         return;
143     }
144     else if (event & SOCKET_OBSERVE_TIMEOUT)
145     {
146         yaz_log(m_p->log, "PDU_Assoc::socketNotify timeout");
147         m_PDU_Observer->timeoutNotify();
148         return;
149     }
150     int res;
151     switch (m_p->state)
152     {
153     case PDU_Assoc_priv::Accepting:
154         if (!cs_accept(m_p->cs))
155         {
156             yaz_log(m_p->log, "PDU_Assoc::cs_accept failed");
157             m_p->cs = 0;
158             shutdown();
159             m_PDU_Observer->failNotify();
160         }
161         else
162         {
163             unsigned mask = 0;
164             if (m_p->cs->io_pending & CS_WANT_WRITE)
165                 mask |= SOCKET_OBSERVE_WRITE;
166             if (m_p->cs->io_pending & CS_WANT_READ)
167                 mask |= SOCKET_OBSERVE_READ;
168             if (!mask)
169             {   // accept is complete. turn to ready state and write if needed
170                 m_p->state = PDU_Assoc_priv::Ready;
171                 flush_PDU();
172             }
173             else
174             {   // accept still incomplete.
175                 yaz_log(m_p->log, "maskObserver 2");
176                 m_p->m_socketObservable->maskObserver(
177                     this,
178                     mask|SOCKET_OBSERVE_EXCEPT, cs_fileno(m_p->cs));
179             }
180         }
181         break;
182     case PDU_Assoc_priv::Connecting:
183         yaz_log(m_p->log, "PDU_Assoc::socketNotify Connecting");
184         res = cs_rcvconnect(m_p->cs);
185         if (res == 1)
186         {
187             unsigned mask = SOCKET_OBSERVE_EXCEPT;
188             if (m_p->cs->io_pending & CS_WANT_WRITE)
189                 mask |= SOCKET_OBSERVE_WRITE;
190             if (m_p->cs->io_pending & CS_WANT_READ)
191                 mask |= SOCKET_OBSERVE_READ;
192             yaz_log(m_p->log, "maskObserver 3");
193             m_p->m_socketObservable->maskObserver(this, mask,
194                                                   cs_fileno(m_p->cs));
195         }
196         else
197         {
198             m_p->state = PDU_Assoc_priv::Ready;
199             if (m_PDU_Observer)
200                 m_PDU_Observer->connectNotify();
201             flush_PDU();
202         }
203         break;
204     case PDU_Assoc_priv::Listen:
205         if (event & SOCKET_OBSERVE_READ)
206         {
207             COMSTACK new_line;
208
209             if ((res = cs_listen(m_p->cs, 0, 0)) == 1)
210                 return;
211             if (res < 0)
212             {
213                 yaz_log(YLOG_FATAL|YLOG_ERRNO, "cs_listen failed");
214                 return;
215             }
216             if (!(new_line = cs_accept(m_p->cs)))
217                 return;
218             /* 1. create socket-manager
219                2. create pdu-assoc
220                3. create top-level object
221                     setup observer for child fileid in pdu-assoc
222                4. start thread
223             */
224             yaz_log(m_p->log, "new session: parent fd=%d child fd=%d",
225                     cs_fileno(m_p->cs), cs_fileno(new_line));
226             childNotify(new_line);
227         }
228         break;
229     case PDU_Assoc_priv::Writing:
230         yaz_log(m_p->log, "PDU_Assoc::socketNotify writing");
231         if (event & (SOCKET_OBSERVE_READ|SOCKET_OBSERVE_WRITE))
232             flush_PDU();
233         break;
234     case PDU_Assoc_priv::Ready:
235         yaz_log(m_p->log, "PDU_Assoc::socketNotify ready");
236         if (event & (SOCKET_OBSERVE_READ|SOCKET_OBSERVE_WRITE))
237         {
238             do
239             {
240                 res = cs_get(m_p->cs, &m_p->input_buf, &m_p->input_len);
241                 if (res == 1)
242                 {
243                     unsigned mask = SOCKET_OBSERVE_EXCEPT;
244                     if (m_p->cs->io_pending & CS_WANT_WRITE)
245                         mask |= SOCKET_OBSERVE_WRITE;
246                     if (m_p->cs->io_pending & CS_WANT_READ)
247                         mask |= SOCKET_OBSERVE_READ;
248                     yaz_log(m_p->log, "maskObserver 4");
249                     m_p->m_socketObservable->maskObserver(this, mask,
250                                                           cs_fileno(m_p->cs));
251                     return;
252                 }
253                 else if (res <= 0)
254                 {
255                     yaz_log(m_p->log, "PDU_Assoc::Connection closed by peer");
256                     shutdown();
257                     if (m_PDU_Observer)
258                         m_PDU_Observer->failNotify(); // problem here..
259                     return;
260                 }
261                 // lock it, so we know if recv_PDU deletes it.
262                 int destroyed = 0;
263                 m_p->destroyed = &destroyed;
264
265                 if (!m_PDU_Observer)
266                     return;
267 #if 0
268                 PDU_Assoc_priv::PDU_Queue **pq = &m_p->m_queue_in;
269                 while (*pq)
270                     pq = &(*pq)->m_next;
271
272                 *pq = new PDU_Assoc_priv::PDU_Queue(m_p->m_input_buf, res);
273 #else
274                 m_PDU_Observer->recv_PDU(m_p->input_buf, res);
275 #endif
276                 if (destroyed)   // it really was destroyed, return now.
277                     return;
278                 m_p->destroyed = 0;
279             } while (m_p->cs && cs_more(m_p->cs));
280             if (m_p->cs && m_p->state == PDU_Assoc_priv::Ready)
281             {
282                 yaz_log(m_p->log, "maskObserver 5");
283                 m_p->m_socketObservable->maskObserver(this,
284                                                       SOCKET_OBSERVE_EXCEPT|
285                                                       SOCKET_OBSERVE_READ,
286                                                       cs_fileno(m_p->cs));
287             }
288         }
289         break;
290     case PDU_Assoc_priv::Closed:
291         yaz_log(m_p->log, "CLOSING state=%d event was %d", m_p->state,
292                 event);
293         shutdown();
294         m_PDU_Observer->failNotify();
295         break;
296     default:
297         yaz_log(m_p->log, "Unknown state=%d event was %d", m_p->state, event);
298         shutdown();
299         m_PDU_Observer->failNotify();
300     }
301 }
302
303 void PDU_Assoc::close_session()
304 {
305     m_p->m_session_is_dead = true;
306     if (!m_p->queue_out)
307     {
308         shutdown();
309         m_PDU_Observer->failNotify();
310     }
311 }
312
313 void PDU_Assoc::shutdown()
314 {
315     PDU_Assoc *ch;
316     for (ch = m_p->pdu_children; ch; ch = ch->m_p->pdu_next)
317         ch->shutdown();
318
319     m_p->m_socketObservable->deleteObserver(this);
320     m_p->state = PDU_Assoc_priv::Closed;
321     if (m_p->cs)
322     {
323         yaz_log(m_p->log, "PDU_Assoc::close fd=%d", cs_fileno(m_p->cs));
324         cs_close(m_p->cs);
325     }
326     m_p->cs = 0;
327     while (m_p->queue_out)
328     {
329         PDU_Assoc_priv::PDU_Queue *q_this = m_p->queue_out;
330         m_p->queue_out = m_p->queue_out->m_next;
331         delete q_this;
332     }
333     xfree(m_p->input_buf);
334     m_p->input_buf = 0;
335     m_p->input_len = 0;
336 }
337
338 void PDU_Assoc::destroy()
339 {
340     shutdown();
341
342     if (m_p->destroyed)
343         *m_p->destroyed = 1;
344     PDU_Assoc **c;
345
346     // delete from parent's child list (if any)
347     if (m_p->pdu_parent)
348     {
349         c = &m_p->pdu_parent->m_p->pdu_children;
350         while (*c != this)
351         {
352             assert (*c);
353             c = &(*c)->m_p->pdu_next;
354         }
355         *c = (*c)->m_p->pdu_next;
356     }
357     // delete all children ...
358     c = &m_p->pdu_children;
359     while (*c)
360     {
361         PDU_Assoc *here = *c;
362         *c = (*c)->m_p->pdu_next;
363         here->m_p->pdu_parent = 0;
364         delete here;
365     }
366     yaz_log(m_p->log, "PDU_Assoc::destroy this=%p", this);
367 }
368
369 PDU_Assoc_priv::PDU_Queue::PDU_Queue(const char *buf, int len)
370 {
371     m_buf = (char *) xmalloc(len);
372     memcpy(m_buf, buf, len);
373     m_len = len;
374     m_next = 0;
375 }
376
377 PDU_Assoc_priv::PDU_Queue::~PDU_Queue()
378 {
379     xfree(m_buf);
380 }
381
382 int PDU_Assoc::flush_PDU()
383 {
384     int r;
385
386     yaz_log(m_p->log, "PDU_Assoc::flush_PDU");
387     if (m_p->state != PDU_Assoc_priv::Ready && m_p->state != PDU_Assoc_priv::Writing)
388     {
389         yaz_log(m_p->log, "PDU_Assoc::flush_PDU, not ready");
390         return 1;
391     }
392     PDU_Assoc_priv::PDU_Queue *q = m_p->queue_out;
393     if (!q)
394     {
395         m_p->state = PDU_Assoc_priv::Ready;
396         yaz_log(m_p->log, "PDU_Assoc::flush_PDU queue empty");
397         yaz_log(m_p->log, "maskObserver 6");
398         m_p->m_socketObservable->maskObserver(this, SOCKET_OBSERVE_READ|
399                                               SOCKET_OBSERVE_WRITE|
400                                               SOCKET_OBSERVE_EXCEPT,
401                                               cs_fileno(m_p->cs));
402         if (m_p->m_session_is_dead)
403         {
404             shutdown();
405             m_PDU_Observer->failNotify();
406         }
407         return 0;
408     }
409     r = cs_put(m_p->cs, q->m_buf, q->m_len);
410     if (r < 0)
411     {
412         yaz_log(m_p->log, "PDU_Assoc::flush_PDU cs_put failed");
413         shutdown();
414         m_PDU_Observer->failNotify();
415         return r;
416     }
417     if (r == 1)
418     {
419         unsigned mask = SOCKET_OBSERVE_EXCEPT;
420         m_p->state = PDU_Assoc_priv::Writing;
421         if (m_p->cs->io_pending & CS_WANT_WRITE)
422             mask |= SOCKET_OBSERVE_WRITE;
423         if (m_p->cs->io_pending & CS_WANT_READ)
424             mask |= SOCKET_OBSERVE_READ;
425
426         mask |= SOCKET_OBSERVE_WRITE;
427         yaz_log(m_p->log, "maskObserver 7");
428         m_p->m_socketObservable->maskObserver(this, mask, cs_fileno(m_p->cs));
429         yaz_log(m_p->log, "PDU_Assoc::flush_PDU cs_put %d bytes fd=%d (inc)",
430                 q->m_len, cs_fileno(m_p->cs));
431         return r;
432     }
433     yaz_log(m_p->log, "PDU_Assoc::flush_PDU cs_put %d bytes", q->m_len);
434     // whole packet sent... delete this and proceed to next ...
435     m_p->queue_out = q->m_next;
436     delete q;
437     // don't select on write if queue is empty ...
438     if (!m_p->queue_out)
439     {
440         m_p->state = PDU_Assoc_priv::Ready;
441         yaz_log(m_p->log, "maskObserver 8");
442         m_p->m_socketObservable->maskObserver(this, SOCKET_OBSERVE_READ|
443                                               SOCKET_OBSERVE_EXCEPT,
444                                               cs_fileno(m_p->cs));
445         if (m_p->m_session_is_dead)
446             shutdown();
447     }
448     return r;
449 }
450
451 int PDU_Assoc::send_PDU(const char *buf, int len)
452 {
453     yaz_log(m_p->log, "PDU_Assoc::send_PDU");
454     PDU_Assoc_priv::PDU_Queue **pq = &m_p->queue_out;
455     int is_idle = (*pq ? 0 : 1);
456
457     if (!m_p->cs)
458     {
459         yaz_log(m_p->log, "PDU_Assoc::send_PDU failed, cs == 0");
460         return -1;
461     }
462     while (*pq)
463         pq = &(*pq)->m_next;
464     *pq = new PDU_Assoc_priv::PDU_Queue(buf, len);
465     if (is_idle)
466         return flush_PDU();
467     else
468         yaz_log(m_p->log, "PDU_Assoc::cannot send_PDU fd=%d",
469                 cs_fileno(m_p->cs));
470     return 0;
471 }
472
473 COMSTACK PDU_Assoc_priv::comstack(const char *type_and_host, void **vp)
474 {
475     return cs_create_host(type_and_host, 2, vp);
476 }
477
478 int PDU_Assoc::listen(IPDU_Observer *observer, const char *addr)
479 {
480     if (*addr == '\0')
481     {
482         m_p->m_socketObservable->deleteObserver(this);
483         m_p->state = PDU_Assoc_priv::Closed;
484         if (m_p->cs)
485         {
486             yaz_log(m_p->log, "PDU_Assoc::close fd=%d", cs_fileno(m_p->cs));
487             cs_close(m_p->cs);
488         }
489         m_p->cs = 0;
490         while (m_p->queue_out)
491         {
492             PDU_Assoc_priv::PDU_Queue *q_this = m_p->queue_out;
493             m_p->queue_out = m_p->queue_out->m_next;
494             delete q_this;
495         }
496         xfree(m_p->input_buf);
497         m_p->input_buf = 0;
498         m_p->input_len = 0;
499
500         return 0;
501     }
502
503     shutdown();
504
505     m_PDU_Observer = observer;
506     void *ap;
507     m_p->cs = m_p->comstack(addr, &ap);
508
509     if (!m_p->cs)
510         return -1;
511
512     if (m_p->cert_fname)
513         cs_set_ssl_certificate_file(m_p->cs, m_p->cert_fname);
514
515     if (cs_bind(m_p->cs, ap, CS_SERVER) < 0)
516         return -2;
517
518     int fd = cs_fileno(m_p->cs);
519 #if HAVE_FCNTL_H
520     int oldflags = fcntl(fd, F_GETFD, 0);
521     if (oldflags >= 0)
522     {
523         oldflags |= FD_CLOEXEC;
524         fcntl(fd, F_SETFD, oldflags);
525     }
526 #endif
527     m_p->m_socketObservable->addObserver(this);
528     yaz_log(m_p->log, "maskObserver 9");
529     m_p->m_socketObservable->maskObserver(this, SOCKET_OBSERVE_READ|
530                                           SOCKET_OBSERVE_EXCEPT,
531                                           cs_fileno(m_p->cs));
532     yaz_log(m_p->log, "PDU_Assoc::listen ok fd=%d", fd);
533     m_p->state = PDU_Assoc_priv::Listen;
534     return 0;
535 }
536
537 COMSTACK PDU_Assoc::get_comstack()
538 {
539     return m_p->cs;
540 }
541
542 void PDU_Assoc::idleTime(int idleTime)
543 {
544     m_p->idleTime = idleTime;
545     yaz_log(m_p->log, "PDU_Assoc::idleTime(%d)", idleTime);
546     m_p->m_socketObservable->timeoutObserver(this, m_p->idleTime);
547 }
548
549 int PDU_Assoc::connect(IPDU_Observer *observer, const char *addr)
550 {
551     yaz_log(m_p->log, "PDU_Assoc::connect %s", addr);
552     shutdown();
553     m_PDU_Observer = observer;
554     void *ap;
555     m_p->cs = m_p->comstack(addr, &ap);
556     if (!m_p->cs)
557         return -1;
558     int res = cs_connect(m_p->cs, ap);
559     yaz_log(m_p->log, "PDU_Assoc::connect fd=%d res=%d", cs_fileno(m_p->cs),
560             res);
561     m_p->m_socketObservable->addObserver(this);
562
563     if (res == 0)
564     {   // Connect complete
565         m_p->state = PDU_Assoc_priv::Connecting;
566         unsigned mask = SOCKET_OBSERVE_EXCEPT;
567         mask |= SOCKET_OBSERVE_WRITE;
568         mask |= SOCKET_OBSERVE_READ;
569         yaz_log(m_p->log, "maskObserver 11");
570         m_p->m_socketObservable->maskObserver(this, mask, cs_fileno(m_p->cs));
571     }
572     else if (res > 0)
573     {   // Connect pending
574         m_p->state = PDU_Assoc_priv::Connecting;
575         unsigned mask = SOCKET_OBSERVE_EXCEPT;
576         if (m_p->cs->io_pending & CS_WANT_WRITE)
577             mask |= SOCKET_OBSERVE_WRITE;
578         if (m_p->cs->io_pending & CS_WANT_READ)
579             mask |= SOCKET_OBSERVE_READ;
580         yaz_log(m_p->log, "maskObserver 11");
581         m_p->m_socketObservable->maskObserver(this, mask, cs_fileno(m_p->cs));
582     }
583     else
584     {   // Connect failed immediately
585         // Since m_state is Closed we can distinguish this case from
586         // normal connect in socketNotify handler
587         yaz_log(m_p->log, "maskObserver 12");
588         m_p->m_socketObservable->maskObserver(this, SOCKET_OBSERVE_WRITE|
589                                               SOCKET_OBSERVE_EXCEPT,
590                                               cs_fileno(m_p->cs));
591     }
592     return 0;
593 }
594
595 // Single-threaded... Only useful for non-blocking handlers
596 void PDU_Assoc::childNotify(COMSTACK cs)
597 {
598     PDU_Assoc *new_observable =
599         new PDU_Assoc(m_p->m_socketObservable, cs);
600
601     // Clone PDU Observer
602     new_observable->m_PDU_Observer = m_PDU_Observer->sessionNotify
603         (new_observable, cs_fileno(cs));
604
605     if (!new_observable->m_PDU_Observer)
606     {
607         new_observable->shutdown();
608         delete new_observable;
609         return;
610     }
611     new_observable->m_p->pdu_next = m_p->pdu_children;
612     m_p->pdu_children = new_observable;
613     new_observable->m_p->pdu_parent = this;
614 }
615
616 const char*PDU_Assoc::getpeername()
617 {
618     if (!m_p->cs)
619         return 0;
620     return cs_addrstr(m_p->cs);
621 }
622
623 void PDU_Assoc::set_cert_fname(const char *fname)
624 {
625     xfree(m_p->cert_fname);
626     m_p->cert_fname = 0;
627     if (fname)
628         m_p->cert_fname = xstrdup(fname);
629 }
630
631 /*
632  * Local variables:
633  * c-basic-offset: 4
634  * c-file-style: "Stroustrup"
635  * indent-tabs-mode: nil
636  * End:
637  * vim: shiftwidth=4 tabstop=8 expandtab
638  */
639