Pimpl PDU_Assoc class
[yazpp-moved-to-github.git] / src / yaz-pdu-assoc.cpp
1 /* This file is part of the yazpp toolkit.
2  * Copyright (C) 1998-2012 Index Data and Mike Taylor
3  * See the file LICENSE for details.
4  */
5
6 #if HAVE_CONFIG_H
7 #include <config.h>
8 #endif
9 #include <assert.h>
10 #include <string.h>
11 #include <yaz/log.h>
12 #include <yaz/tcpip.h>
13
14 #include <yazpp/pdu-assoc.h>
15
16 #if HAVE_FCNTL_H
17 #include <fcntl.h>
18 #endif
19
20 using namespace yazpp_1;
21
22 namespace yazpp_1 {
23     class PDU_Assoc_priv {
24     public:
25         enum {
26             Connecting,
27             Listen,
28             Ready,
29             Closed,
30             Writing,
31             Accepting
32         } state;
33         class PDU_Queue {
34         public:
35             PDU_Queue(const char *buf, int len);
36             ~PDU_Queue();
37             char *m_buf;
38             int m_len;
39             PDU_Queue *m_next;
40         };
41         PDU_Assoc *pdu_parent;
42         PDU_Assoc *pdu_children;
43         PDU_Assoc *pdu_next;
44         COMSTACK cs;
45         yazpp_1::ISocketObservable *m_socketObservable;
46         char *input_buf;
47         int input_len;
48         PDU_Queue *queue_out;
49         PDU_Queue *queue_in;
50         int *destroyed;
51         int idleTime;
52         int log;
53         void init(yazpp_1::ISocketObservable *socketObservable);
54         bool m_session_is_dead;
55     };
56 }
57
58 void PDU_Assoc_priv::init(ISocketObservable *socketObservable)
59 {
60     state = Closed;
61     cs = 0;
62     m_socketObservable = socketObservable;
63     queue_out = 0;
64     queue_in = 0;
65     input_buf = 0;
66     input_len = 0;
67     pdu_children = 0;
68     pdu_parent = 0;
69     pdu_next = 0;
70     destroyed = 0;
71     idleTime = 0;
72     log = YLOG_DEBUG;
73     m_session_is_dead = false;
74 }
75
76 PDU_Assoc::~PDU_Assoc()
77 {
78     delete m_p;
79 }
80
81 PDU_Assoc::PDU_Assoc(ISocketObservable *socketObservable)
82 {
83     m_PDU_Observer = 0;
84     m_p = new PDU_Assoc_priv;
85     m_p->init(socketObservable);
86 }
87
88 PDU_Assoc::PDU_Assoc(ISocketObservable *socketObservable,
89                      COMSTACK cs)
90 {
91     m_PDU_Observer = 0;
92     m_p = new PDU_Assoc_priv;
93     m_p->init(socketObservable);
94     m_p->cs = cs;
95     unsigned mask = 0;
96     if (cs->io_pending & CS_WANT_WRITE)
97         mask |= SOCKET_OBSERVE_WRITE;
98     if (cs->io_pending & CS_WANT_READ)
99         mask |= SOCKET_OBSERVE_READ;
100     m_p->m_socketObservable->addObserver(cs_fileno(cs), this);
101     if (!mask)
102     {
103         yaz_log(m_p->log, "new PDU_Assoc. Ready");
104         m_p->state = PDU_Assoc_priv::Ready;
105         flush_PDU();
106     }
107     else
108     {
109         yaz_log(m_p->log, "new PDU_Assoc. Accepting");
110         // assume comstack is accepting...
111         m_p->state = PDU_Assoc_priv::Accepting;
112         m_p->m_socketObservable->addObserver(cs_fileno(cs), this);
113         yaz_log(m_p->log, "maskObserver 1");
114         m_p->m_socketObservable->maskObserver(this,
115                                               mask|SOCKET_OBSERVE_EXCEPT);
116     }
117 }
118
119
120 IPDU_Observable *PDU_Assoc::clone()
121 {
122     PDU_Assoc *copy = new PDU_Assoc(m_p->m_socketObservable);
123     return copy;
124 }
125
126 void PDU_Assoc::socketNotify(int event)
127 {
128     yaz_log(m_p->log, "PDU_Assoc::socketNotify p=%p state=%d event = %d",
129             this, m_p->state, event);
130     if (event & SOCKET_OBSERVE_EXCEPT)
131     {
132         shutdown();
133         m_PDU_Observer->failNotify();
134         return;
135     }
136     else if (event & SOCKET_OBSERVE_TIMEOUT)
137     {
138         m_PDU_Observer->timeoutNotify();
139         return;
140     }
141     switch (m_p->state)
142     {
143     case PDU_Assoc_priv::Accepting:
144         if (!cs_accept(m_p->cs))
145         {
146             yaz_log(m_p->log, "PDU_Assoc::cs_accept failed");
147             m_p->cs = 0;
148             shutdown();
149             m_PDU_Observer->failNotify();
150         }
151         else
152         {
153             unsigned mask = 0;
154             if (m_p->cs->io_pending & CS_WANT_WRITE)
155                 mask |= SOCKET_OBSERVE_WRITE;
156             if (m_p->cs->io_pending & CS_WANT_READ)
157                 mask |= SOCKET_OBSERVE_READ;
158             if (!mask)
159             {   // accept is complete. turn to ready state and write if needed
160                 m_p->state = PDU_Assoc_priv::Ready;
161                 flush_PDU();
162             }
163             else
164             {   // accept still incomplete.
165                 yaz_log(m_p->log, "maskObserver 2");
166                 m_p->m_socketObservable->maskObserver(this,
167                                              mask|SOCKET_OBSERVE_EXCEPT);
168             }
169         }
170         break;
171     case PDU_Assoc_priv::Connecting:
172         if (event & SOCKET_OBSERVE_READ &&
173             event & SOCKET_OBSERVE_WRITE)
174         {
175             // For Unix: if both read and write is set, then connect failed.
176             shutdown();
177             m_PDU_Observer->failNotify();
178         }
179         else
180         {
181             yaz_log(m_p->log, "cs_rcvconnect");
182             int res = cs_rcvconnect(m_p->cs);
183             if (res == 1)
184             {
185                 unsigned mask = SOCKET_OBSERVE_EXCEPT;
186                 if (m_p->cs->io_pending & CS_WANT_WRITE)
187                     mask |= SOCKET_OBSERVE_WRITE;
188                 if (m_p->cs->io_pending & CS_WANT_READ)
189                     mask |= SOCKET_OBSERVE_READ;
190                 yaz_log(m_p->log, "maskObserver 3");
191                 m_p->m_socketObservable->maskObserver(this, mask);
192             }
193             else
194             {
195                 m_p->state = PDU_Assoc_priv::Ready;
196                 if (m_PDU_Observer)
197                     m_PDU_Observer->connectNotify();
198                 flush_PDU();
199             }
200         }
201         break;
202     case PDU_Assoc_priv::Listen:
203         if (event & SOCKET_OBSERVE_READ)
204         {
205             int res;
206             COMSTACK new_line;
207
208             if ((res = cs_listen(m_p->cs, 0, 0)) == 1)
209                 return;
210             if (res < 0)
211             {
212                 yaz_log(YLOG_FATAL|YLOG_ERRNO, "cs_listen failed");
213                 return;
214             }
215             if (!(new_line = cs_accept(m_p->cs)))
216                 return;
217             /* 1. create socket-manager
218                2. create pdu-assoc
219                3. create top-level object
220                     setup observer for child fileid in pdu-assoc
221                4. start thread
222             */
223             yaz_log(m_p->log, "new session: parent fd=%d child fd=%d",
224                     cs_fileno(m_p->cs), cs_fileno(new_line));
225             childNotify(new_line);
226         }
227         break;
228     case PDU_Assoc_priv::Writing:
229         if (event & (SOCKET_OBSERVE_READ|SOCKET_OBSERVE_WRITE))
230             flush_PDU();
231         break;
232     case PDU_Assoc_priv::Ready:
233         if (event & (SOCKET_OBSERVE_READ|SOCKET_OBSERVE_WRITE))
234         {
235             do
236             {
237                 int res = cs_get(m_p->cs, &m_p->input_buf, &m_p->input_len);
238                 if (res == 1)
239                 {
240                     unsigned mask = SOCKET_OBSERVE_EXCEPT;
241                     if (m_p->cs->io_pending & CS_WANT_WRITE)
242                         mask |= SOCKET_OBSERVE_WRITE;
243                     if (m_p->cs->io_pending & CS_WANT_READ)
244                         mask |= SOCKET_OBSERVE_READ;
245                     yaz_log(m_p->log, "maskObserver 4");
246                     m_p->m_socketObservable->maskObserver(this, mask);
247                     return;
248                 }
249                 else if (res <= 0)
250                 {
251                     yaz_log(m_p->log, "PDU_Assoc::Connection closed by peer");
252                     shutdown();
253                     if (m_PDU_Observer)
254                         m_PDU_Observer->failNotify(); // problem here..
255                     return;
256                 }
257                 // lock it, so we know if recv_PDU deletes it.
258                 int destroyed = 0;
259                 m_p->destroyed = &destroyed;
260
261                 if (!m_PDU_Observer)
262                     return;
263 #if 0
264                 PDU_Assoc_priv::PDU_Queue **pq = &m_p->m_queue_in;
265                 while (*pq)
266                     pq = &(*pq)->m_next;
267
268                 *pq = new PDU_Assoc_priv::PDU_Queue(m_p->m_input_buf, res);
269 #else
270                 m_PDU_Observer->recv_PDU(m_p->input_buf, res);
271 #endif
272                 if (destroyed)   // it really was destroyed, return now.
273                     return;
274                 m_p->destroyed = 0;
275             } while (m_p->cs && cs_more(m_p->cs));
276             if (m_p->cs && m_p->state == PDU_Assoc_priv::Ready)
277             {
278                 yaz_log(m_p->log, "maskObserver 5");
279                 m_p->m_socketObservable->maskObserver(this,
280                                                       SOCKET_OBSERVE_EXCEPT|
281                                                       SOCKET_OBSERVE_READ);
282             }
283         }
284         break;
285     case PDU_Assoc_priv::Closed:
286         yaz_log(m_p->log, "CLOSING state=%d event was %d", m_p->state,
287                 event);
288         shutdown();
289         m_PDU_Observer->failNotify();
290         break;
291     default:
292         yaz_log(m_p->log, "Unknown state=%d event was %d", m_p->state, event);
293         shutdown();
294         m_PDU_Observer->failNotify();
295     }
296 }
297
298 void PDU_Assoc::close_session()
299 {
300     m_p->m_session_is_dead = true;
301     if (!m_p->queue_out)
302     {
303         shutdown();
304         m_PDU_Observer->failNotify();
305     }
306 }
307
308 void PDU_Assoc::shutdown()
309 {
310     PDU_Assoc *ch;
311     for (ch = m_p->pdu_children; ch; ch = ch->m_p->pdu_next)
312         ch->shutdown();
313
314     m_p->m_socketObservable->deleteObserver(this);
315     m_p->state = PDU_Assoc_priv::Closed;
316     if (m_p->cs)
317     {
318         yaz_log(m_p->log, "PDU_Assoc::close fd=%d", cs_fileno(m_p->cs));
319         cs_close(m_p->cs);
320     }
321     m_p->cs = 0;
322     while (m_p->queue_out)
323     {
324         PDU_Assoc_priv::PDU_Queue *q_this = m_p->queue_out;
325         m_p->queue_out = m_p->queue_out->m_next;
326         delete q_this;
327     }
328     xfree(m_p->input_buf);
329     m_p->input_buf = 0;
330     m_p->input_len = 0;
331 }
332
333 void PDU_Assoc::destroy()
334 {
335     shutdown();
336
337     if (m_p->destroyed)
338         *m_p->destroyed = 1;
339     PDU_Assoc **c;
340
341     // delete from parent's child list (if any)
342     if (m_p->pdu_parent)
343     {
344         c = &m_p->pdu_parent->m_p->pdu_children;
345         while (*c != this)
346         {
347             assert (*c);
348             c = &(*c)->m_p->pdu_next;
349         }
350         *c = (*c)->m_p->pdu_next;
351     }
352     // delete all children ...
353     c = &m_p->pdu_children;
354     while (*c)
355     {
356         PDU_Assoc *here = *c;
357         *c = (*c)->m_p->pdu_next;
358         here->m_p->pdu_parent = 0;
359         delete here;
360     }
361     yaz_log(m_p->log, "PDU_Assoc::destroy this=%p", this);
362 }
363
364 PDU_Assoc_priv::PDU_Queue::PDU_Queue(const char *buf, int len)
365 {
366     m_buf = (char *) xmalloc(len);
367     memcpy(m_buf, buf, len);
368     m_len = len;
369     m_next = 0;
370 }
371
372 PDU_Assoc_priv::PDU_Queue::~PDU_Queue()
373 {
374     xfree(m_buf);
375 }
376
377 int PDU_Assoc::flush_PDU()
378 {
379     int r;
380
381     if (m_p->state != PDU_Assoc_priv::Ready && m_p->state != PDU_Assoc_priv::Writing)
382     {
383         yaz_log(m_p->log, "YAZ_PDU_Assoc::flush_PDU, not ready");
384         return 1;
385     }
386     PDU_Assoc_priv::PDU_Queue *q = m_p->queue_out;
387     if (!q)
388     {
389         m_p->state = PDU_Assoc_priv::Ready;
390         yaz_log(m_p->log, "YAZ_PDU_Assoc::flush_PDU queue empty");
391         yaz_log(m_p->log, "maskObserver 6");
392         m_p->m_socketObservable->maskObserver(this, SOCKET_OBSERVE_READ|
393                                               SOCKET_OBSERVE_WRITE|
394                                               SOCKET_OBSERVE_EXCEPT);
395         if (m_p->m_session_is_dead)
396         {
397             shutdown();
398             m_PDU_Observer->failNotify();
399         }
400         return 0;
401     }
402     r = cs_put(m_p->cs, q->m_buf, q->m_len);
403     if (r < 0)
404     {
405         yaz_log(m_p->log, "PDU_Assoc::flush_PDU cs_put failed");
406         shutdown();
407         m_PDU_Observer->failNotify();
408         return r;
409     }
410     if (r == 1)
411     {
412         unsigned mask = SOCKET_OBSERVE_EXCEPT;
413         m_p->state = PDU_Assoc_priv::Writing;
414         if (m_p->cs->io_pending & CS_WANT_WRITE)
415             mask |= SOCKET_OBSERVE_WRITE;
416         if (m_p->cs->io_pending & CS_WANT_READ)
417             mask |= SOCKET_OBSERVE_READ;
418
419         mask |= SOCKET_OBSERVE_WRITE;
420         yaz_log(m_p->log, "maskObserver 7");
421         m_p->m_socketObservable->maskObserver(this, mask);
422         yaz_log(m_p->log, "PDU_Assoc::flush_PDU cs_put %d bytes fd=%d (inc)",
423                 q->m_len, cs_fileno(m_p->cs));
424         return r;
425     }
426     yaz_log(m_p->log, "PDU_Assoc::flush_PDU cs_put %d bytes", q->m_len);
427     // whole packet sent... delete this and proceed to next ...
428     m_p->queue_out = q->m_next;
429     delete q;
430     // don't select on write if queue is empty ...
431     if (!m_p->queue_out)
432     {
433         m_p->state = PDU_Assoc_priv::Ready;
434         yaz_log(m_p->log, "maskObserver 8");
435         m_p->m_socketObservable->maskObserver(this, SOCKET_OBSERVE_READ|
436                                               SOCKET_OBSERVE_EXCEPT);
437         if (m_p->m_session_is_dead)
438             shutdown();
439     }
440     return r;
441 }
442
443 int PDU_Assoc::send_PDU(const char *buf, int len)
444 {
445     yaz_log(m_p->log, "PDU_Assoc::send_PDU");
446     PDU_Assoc_priv::PDU_Queue **pq = &m_p->queue_out;
447     int is_idle = (*pq ? 0 : 1);
448
449     if (!m_p->cs)
450     {
451         yaz_log(m_p->log, "PDU_Assoc::send_PDU failed, cs == 0");
452         return -1;
453     }
454     while (*pq)
455         pq = &(*pq)->m_next;
456     *pq = new PDU_Assoc_priv::PDU_Queue(buf, len);
457     if (is_idle)
458         return flush_PDU();
459     else
460         yaz_log(m_p->log, "PDU_Assoc::cannot send_PDU fd=%d",
461                 cs_fileno(m_p->cs));
462     return 0;
463 }
464
465 COMSTACK PDU_Assoc::comstack(const char *type_and_host, void **vp)
466 {
467     return cs_create_host(type_and_host, 2, vp);
468 }
469
470 int PDU_Assoc::listen(IPDU_Observer *observer, const char *addr)
471 {
472     if (*addr == '\0')
473     {
474         m_p->m_socketObservable->deleteObserver(this);
475         m_p->state = PDU_Assoc_priv::Closed;
476         if (m_p->cs)
477         {
478             yaz_log(m_p->log, "PDU_Assoc::close fd=%d", cs_fileno(m_p->cs));
479             cs_close(m_p->cs);
480         }
481         m_p->cs = 0;
482         while (m_p->queue_out)
483         {
484             PDU_Assoc_priv::PDU_Queue *q_this = m_p->queue_out;
485             m_p->queue_out = m_p->queue_out->m_next;
486             delete q_this;
487         }
488         xfree(m_p->input_buf);
489         m_p->input_buf = 0;
490         m_p->input_len = 0;
491
492         return 0;
493     }
494
495     shutdown();
496
497     m_PDU_Observer = observer;
498     void *ap;
499     m_p->cs = comstack(addr, &ap);
500
501     if (!m_p->cs)
502         return -1;
503     if (cs_bind(m_p->cs, ap, CS_SERVER) < 0)
504         return -2;
505
506     int fd = cs_fileno(m_p->cs);
507 #if HAVE_FCNTL_H
508     int oldflags = fcntl(fd, F_GETFD, 0);
509     if (oldflags >= 0)
510     {
511         oldflags |= FD_CLOEXEC;
512         fcntl(fd, F_SETFD, oldflags);
513     }
514 #endif
515     m_p->m_socketObservable->addObserver(fd, this);
516     yaz_log(m_p->log, "maskObserver 9");
517     m_p->m_socketObservable->maskObserver(this, SOCKET_OBSERVE_READ|
518                                           SOCKET_OBSERVE_EXCEPT);
519     yaz_log(m_p->log, "PDU_Assoc::listen ok fd=%d", fd);
520     m_p->state = PDU_Assoc_priv::Listen;
521     return 0;
522 }
523
524 void PDU_Assoc::idleTime(int idleTime)
525 {
526     m_p->idleTime = idleTime;
527     yaz_log(m_p->log, "PDU_Assoc::idleTime(%d)", idleTime);
528     m_p->m_socketObservable->timeoutObserver(this, m_p->idleTime);
529 }
530
531 int PDU_Assoc::connect(IPDU_Observer *observer, const char *addr)
532 {
533     yaz_log(m_p->log, "PDU_Assoc::connect %s", addr);
534     shutdown();
535     m_PDU_Observer = observer;
536     void *ap;
537     m_p->cs = comstack(addr, &ap);
538     if (!m_p->cs)
539         return -1;
540     int res = cs_connect(m_p->cs, ap);
541     yaz_log(m_p->log, "PDU_Assoc::connect fd=%d res=%d", cs_fileno(m_p->cs),
542             res);
543     m_p->m_socketObservable->addObserver(cs_fileno(m_p->cs), this);
544
545     if (res == 0)
546     {   // Connect complete
547         m_p->state = PDU_Assoc_priv::Connecting;
548         unsigned mask = SOCKET_OBSERVE_EXCEPT;
549         mask |= SOCKET_OBSERVE_WRITE;
550         mask |= SOCKET_OBSERVE_READ;
551         yaz_log(m_p->log, "maskObserver 11");
552         m_p->m_socketObservable->maskObserver(this, mask);
553     }
554     else if (res > 0)
555     {   // Connect pending
556         m_p->state = PDU_Assoc_priv::Connecting;
557         unsigned mask = SOCKET_OBSERVE_EXCEPT;
558         if (m_p->cs->io_pending & CS_WANT_WRITE)
559             mask |= SOCKET_OBSERVE_WRITE;
560         if (m_p->cs->io_pending & CS_WANT_READ)
561             mask |= SOCKET_OBSERVE_READ;
562         yaz_log(m_p->log, "maskObserver 11");
563         m_p->m_socketObservable->maskObserver(this, mask);
564     }
565     else
566     {   // Connect failed immediately
567         // Since m_state is Closed we can distinguish this case from
568         // normal connect in socketNotify handler
569         yaz_log(m_p->log, "maskObserver 12");
570         m_p->m_socketObservable->maskObserver(this, SOCKET_OBSERVE_WRITE|
571                                               SOCKET_OBSERVE_EXCEPT);
572     }
573     return 0;
574 }
575
576 // Single-threaded... Only useful for non-blocking handlers
577 void PDU_Assoc::childNotify(COMSTACK cs)
578 {
579     PDU_Assoc *new_observable =
580         new PDU_Assoc(m_p->m_socketObservable, cs);
581
582     // Clone PDU Observer
583     new_observable->m_PDU_Observer = m_PDU_Observer->sessionNotify
584         (new_observable, cs_fileno(cs));
585
586     if (!new_observable->m_PDU_Observer)
587     {
588         new_observable->shutdown();
589         delete new_observable;
590         return;
591     }
592     new_observable->m_p->pdu_next = m_p->pdu_children;
593     m_p->pdu_children = new_observable;
594     new_observable->m_p->pdu_parent = this;
595 }
596
597 const char*PDU_Assoc::getpeername()
598 {
599     if (!m_p->cs)
600         return 0;
601     return cs_addrstr(m_p->cs);
602 }
603 /*
604  * Local variables:
605  * c-basic-offset: 4
606  * c-file-style: "Stroustrup"
607  * indent-tabs-mode: nil
608  * End:
609  * vim: shiftwidth=4 tabstop=8 expandtab
610  */
611