Make PDU_Assoc_priv members private
[yazpp-moved-to-github.git] / src / yaz-pdu-assoc.cpp
1 /* This file is part of the yazpp toolkit.
2  * Copyright (C) 1998-2012 Index Data and Mike Taylor
3  * See the file LICENSE for details.
4  */
5
6 #if HAVE_CONFIG_H
7 #include <config.h>
8 #endif
9 #include <assert.h>
10 #include <string.h>
11 #include <yaz/log.h>
12 #include <yaz/tcpip.h>
13
14 #include <yazpp/pdu-assoc.h>
15
16 #if HAVE_FCNTL_H
17 #include <fcntl.h>
18 #endif
19
20 using namespace yazpp_1;
21
22 namespace yazpp_1 {
23     class PDU_Assoc_priv {
24         friend class PDU_Assoc;
25     private:
26         enum {
27             Connecting,
28             Listen,
29             Ready,
30             Closed,
31             Writing,
32             Accepting
33         } state;
34         class PDU_Queue {
35         public:
36             PDU_Queue(const char *buf, int len);
37             ~PDU_Queue();
38             char *m_buf;
39             int m_len;
40             PDU_Queue *m_next;
41         };
42         PDU_Assoc *pdu_parent;
43         PDU_Assoc *pdu_children;
44         PDU_Assoc *pdu_next;
45         COMSTACK cs;
46         yazpp_1::ISocketObservable *m_socketObservable;
47         char *input_buf;
48         int input_len;
49         PDU_Queue *queue_out;
50         PDU_Queue *queue_in;
51         int *destroyed;
52         int idleTime;
53         int log;
54         void init(yazpp_1::ISocketObservable *socketObservable);
55         bool m_session_is_dead;
56     };
57 }
58
59 void PDU_Assoc_priv::init(ISocketObservable *socketObservable)
60 {
61     state = Closed;
62     cs = 0;
63     m_socketObservable = socketObservable;
64     queue_out = 0;
65     queue_in = 0;
66     input_buf = 0;
67     input_len = 0;
68     pdu_children = 0;
69     pdu_parent = 0;
70     pdu_next = 0;
71     destroyed = 0;
72     idleTime = 0;
73     log = YLOG_DEBUG;
74     m_session_is_dead = false;
75 }
76
77 PDU_Assoc::~PDU_Assoc()
78 {
79     delete m_p;
80 }
81
82 PDU_Assoc::PDU_Assoc(ISocketObservable *socketObservable)
83 {
84     m_PDU_Observer = 0;
85     m_p = new PDU_Assoc_priv;
86     m_p->init(socketObservable);
87 }
88
89 PDU_Assoc::PDU_Assoc(ISocketObservable *socketObservable,
90                      COMSTACK cs)
91 {
92     m_PDU_Observer = 0;
93     m_p = new PDU_Assoc_priv;
94     m_p->init(socketObservable);
95     m_p->cs = cs;
96     unsigned mask = 0;
97     if (cs->io_pending & CS_WANT_WRITE)
98         mask |= SOCKET_OBSERVE_WRITE;
99     if (cs->io_pending & CS_WANT_READ)
100         mask |= SOCKET_OBSERVE_READ;
101     m_p->m_socketObservable->addObserver(cs_fileno(cs), this);
102     if (!mask)
103     {
104         yaz_log(m_p->log, "new PDU_Assoc. Ready");
105         m_p->state = PDU_Assoc_priv::Ready;
106         flush_PDU();
107     }
108     else
109     {
110         yaz_log(m_p->log, "new PDU_Assoc. Accepting");
111         // assume comstack is accepting...
112         m_p->state = PDU_Assoc_priv::Accepting;
113         m_p->m_socketObservable->addObserver(cs_fileno(cs), this);
114         yaz_log(m_p->log, "maskObserver 1");
115         m_p->m_socketObservable->maskObserver(this,
116                                               mask|SOCKET_OBSERVE_EXCEPT);
117     }
118 }
119
120
121 IPDU_Observable *PDU_Assoc::clone()
122 {
123     PDU_Assoc *copy = new PDU_Assoc(m_p->m_socketObservable);
124     return copy;
125 }
126
127 void PDU_Assoc::socketNotify(int event)
128 {
129     yaz_log(m_p->log, "PDU_Assoc::socketNotify p=%p state=%d event = %d",
130             this, m_p->state, event);
131     if (event & SOCKET_OBSERVE_EXCEPT)
132     {
133         shutdown();
134         m_PDU_Observer->failNotify();
135         return;
136     }
137     else if (event & SOCKET_OBSERVE_TIMEOUT)
138     {
139         m_PDU_Observer->timeoutNotify();
140         return;
141     }
142     switch (m_p->state)
143     {
144     case PDU_Assoc_priv::Accepting:
145         if (!cs_accept(m_p->cs))
146         {
147             yaz_log(m_p->log, "PDU_Assoc::cs_accept failed");
148             m_p->cs = 0;
149             shutdown();
150             m_PDU_Observer->failNotify();
151         }
152         else
153         {
154             unsigned mask = 0;
155             if (m_p->cs->io_pending & CS_WANT_WRITE)
156                 mask |= SOCKET_OBSERVE_WRITE;
157             if (m_p->cs->io_pending & CS_WANT_READ)
158                 mask |= SOCKET_OBSERVE_READ;
159             if (!mask)
160             {   // accept is complete. turn to ready state and write if needed
161                 m_p->state = PDU_Assoc_priv::Ready;
162                 flush_PDU();
163             }
164             else
165             {   // accept still incomplete.
166                 yaz_log(m_p->log, "maskObserver 2");
167                 m_p->m_socketObservable->maskObserver(this,
168                                              mask|SOCKET_OBSERVE_EXCEPT);
169             }
170         }
171         break;
172     case PDU_Assoc_priv::Connecting:
173         if (event & SOCKET_OBSERVE_READ &&
174             event & SOCKET_OBSERVE_WRITE)
175         {
176             // For Unix: if both read and write is set, then connect failed.
177             shutdown();
178             m_PDU_Observer->failNotify();
179         }
180         else
181         {
182             yaz_log(m_p->log, "cs_rcvconnect");
183             int res = cs_rcvconnect(m_p->cs);
184             if (res == 1)
185             {
186                 unsigned mask = SOCKET_OBSERVE_EXCEPT;
187                 if (m_p->cs->io_pending & CS_WANT_WRITE)
188                     mask |= SOCKET_OBSERVE_WRITE;
189                 if (m_p->cs->io_pending & CS_WANT_READ)
190                     mask |= SOCKET_OBSERVE_READ;
191                 yaz_log(m_p->log, "maskObserver 3");
192                 m_p->m_socketObservable->maskObserver(this, mask);
193             }
194             else
195             {
196                 m_p->state = PDU_Assoc_priv::Ready;
197                 if (m_PDU_Observer)
198                     m_PDU_Observer->connectNotify();
199                 flush_PDU();
200             }
201         }
202         break;
203     case PDU_Assoc_priv::Listen:
204         if (event & SOCKET_OBSERVE_READ)
205         {
206             int res;
207             COMSTACK new_line;
208
209             if ((res = cs_listen(m_p->cs, 0, 0)) == 1)
210                 return;
211             if (res < 0)
212             {
213                 yaz_log(YLOG_FATAL|YLOG_ERRNO, "cs_listen failed");
214                 return;
215             }
216             if (!(new_line = cs_accept(m_p->cs)))
217                 return;
218             /* 1. create socket-manager
219                2. create pdu-assoc
220                3. create top-level object
221                     setup observer for child fileid in pdu-assoc
222                4. start thread
223             */
224             yaz_log(m_p->log, "new session: parent fd=%d child fd=%d",
225                     cs_fileno(m_p->cs), cs_fileno(new_line));
226             childNotify(new_line);
227         }
228         break;
229     case PDU_Assoc_priv::Writing:
230         if (event & (SOCKET_OBSERVE_READ|SOCKET_OBSERVE_WRITE))
231             flush_PDU();
232         break;
233     case PDU_Assoc_priv::Ready:
234         if (event & (SOCKET_OBSERVE_READ|SOCKET_OBSERVE_WRITE))
235         {
236             do
237             {
238                 int res = cs_get(m_p->cs, &m_p->input_buf, &m_p->input_len);
239                 if (res == 1)
240                 {
241                     unsigned mask = SOCKET_OBSERVE_EXCEPT;
242                     if (m_p->cs->io_pending & CS_WANT_WRITE)
243                         mask |= SOCKET_OBSERVE_WRITE;
244                     if (m_p->cs->io_pending & CS_WANT_READ)
245                         mask |= SOCKET_OBSERVE_READ;
246                     yaz_log(m_p->log, "maskObserver 4");
247                     m_p->m_socketObservable->maskObserver(this, mask);
248                     return;
249                 }
250                 else if (res <= 0)
251                 {
252                     yaz_log(m_p->log, "PDU_Assoc::Connection closed by peer");
253                     shutdown();
254                     if (m_PDU_Observer)
255                         m_PDU_Observer->failNotify(); // problem here..
256                     return;
257                 }
258                 // lock it, so we know if recv_PDU deletes it.
259                 int destroyed = 0;
260                 m_p->destroyed = &destroyed;
261
262                 if (!m_PDU_Observer)
263                     return;
264 #if 0
265                 PDU_Assoc_priv::PDU_Queue **pq = &m_p->m_queue_in;
266                 while (*pq)
267                     pq = &(*pq)->m_next;
268
269                 *pq = new PDU_Assoc_priv::PDU_Queue(m_p->m_input_buf, res);
270 #else
271                 m_PDU_Observer->recv_PDU(m_p->input_buf, res);
272 #endif
273                 if (destroyed)   // it really was destroyed, return now.
274                     return;
275                 m_p->destroyed = 0;
276             } while (m_p->cs && cs_more(m_p->cs));
277             if (m_p->cs && m_p->state == PDU_Assoc_priv::Ready)
278             {
279                 yaz_log(m_p->log, "maskObserver 5");
280                 m_p->m_socketObservable->maskObserver(this,
281                                                       SOCKET_OBSERVE_EXCEPT|
282                                                       SOCKET_OBSERVE_READ);
283             }
284         }
285         break;
286     case PDU_Assoc_priv::Closed:
287         yaz_log(m_p->log, "CLOSING state=%d event was %d", m_p->state,
288                 event);
289         shutdown();
290         m_PDU_Observer->failNotify();
291         break;
292     default:
293         yaz_log(m_p->log, "Unknown state=%d event was %d", m_p->state, event);
294         shutdown();
295         m_PDU_Observer->failNotify();
296     }
297 }
298
299 void PDU_Assoc::close_session()
300 {
301     m_p->m_session_is_dead = true;
302     if (!m_p->queue_out)
303     {
304         shutdown();
305         m_PDU_Observer->failNotify();
306     }
307 }
308
309 void PDU_Assoc::shutdown()
310 {
311     PDU_Assoc *ch;
312     for (ch = m_p->pdu_children; ch; ch = ch->m_p->pdu_next)
313         ch->shutdown();
314
315     m_p->m_socketObservable->deleteObserver(this);
316     m_p->state = PDU_Assoc_priv::Closed;
317     if (m_p->cs)
318     {
319         yaz_log(m_p->log, "PDU_Assoc::close fd=%d", cs_fileno(m_p->cs));
320         cs_close(m_p->cs);
321     }
322     m_p->cs = 0;
323     while (m_p->queue_out)
324     {
325         PDU_Assoc_priv::PDU_Queue *q_this = m_p->queue_out;
326         m_p->queue_out = m_p->queue_out->m_next;
327         delete q_this;
328     }
329     xfree(m_p->input_buf);
330     m_p->input_buf = 0;
331     m_p->input_len = 0;
332 }
333
334 void PDU_Assoc::destroy()
335 {
336     shutdown();
337
338     if (m_p->destroyed)
339         *m_p->destroyed = 1;
340     PDU_Assoc **c;
341
342     // delete from parent's child list (if any)
343     if (m_p->pdu_parent)
344     {
345         c = &m_p->pdu_parent->m_p->pdu_children;
346         while (*c != this)
347         {
348             assert (*c);
349             c = &(*c)->m_p->pdu_next;
350         }
351         *c = (*c)->m_p->pdu_next;
352     }
353     // delete all children ...
354     c = &m_p->pdu_children;
355     while (*c)
356     {
357         PDU_Assoc *here = *c;
358         *c = (*c)->m_p->pdu_next;
359         here->m_p->pdu_parent = 0;
360         delete here;
361     }
362     yaz_log(m_p->log, "PDU_Assoc::destroy this=%p", this);
363 }
364
365 PDU_Assoc_priv::PDU_Queue::PDU_Queue(const char *buf, int len)
366 {
367     m_buf = (char *) xmalloc(len);
368     memcpy(m_buf, buf, len);
369     m_len = len;
370     m_next = 0;
371 }
372
373 PDU_Assoc_priv::PDU_Queue::~PDU_Queue()
374 {
375     xfree(m_buf);
376 }
377
378 int PDU_Assoc::flush_PDU()
379 {
380     int r;
381
382     if (m_p->state != PDU_Assoc_priv::Ready && m_p->state != PDU_Assoc_priv::Writing)
383     {
384         yaz_log(m_p->log, "YAZ_PDU_Assoc::flush_PDU, not ready");
385         return 1;
386     }
387     PDU_Assoc_priv::PDU_Queue *q = m_p->queue_out;
388     if (!q)
389     {
390         m_p->state = PDU_Assoc_priv::Ready;
391         yaz_log(m_p->log, "YAZ_PDU_Assoc::flush_PDU queue empty");
392         yaz_log(m_p->log, "maskObserver 6");
393         m_p->m_socketObservable->maskObserver(this, SOCKET_OBSERVE_READ|
394                                               SOCKET_OBSERVE_WRITE|
395                                               SOCKET_OBSERVE_EXCEPT);
396         if (m_p->m_session_is_dead)
397         {
398             shutdown();
399             m_PDU_Observer->failNotify();
400         }
401         return 0;
402     }
403     r = cs_put(m_p->cs, q->m_buf, q->m_len);
404     if (r < 0)
405     {
406         yaz_log(m_p->log, "PDU_Assoc::flush_PDU cs_put failed");
407         shutdown();
408         m_PDU_Observer->failNotify();
409         return r;
410     }
411     if (r == 1)
412     {
413         unsigned mask = SOCKET_OBSERVE_EXCEPT;
414         m_p->state = PDU_Assoc_priv::Writing;
415         if (m_p->cs->io_pending & CS_WANT_WRITE)
416             mask |= SOCKET_OBSERVE_WRITE;
417         if (m_p->cs->io_pending & CS_WANT_READ)
418             mask |= SOCKET_OBSERVE_READ;
419
420         mask |= SOCKET_OBSERVE_WRITE;
421         yaz_log(m_p->log, "maskObserver 7");
422         m_p->m_socketObservable->maskObserver(this, mask);
423         yaz_log(m_p->log, "PDU_Assoc::flush_PDU cs_put %d bytes fd=%d (inc)",
424                 q->m_len, cs_fileno(m_p->cs));
425         return r;
426     }
427     yaz_log(m_p->log, "PDU_Assoc::flush_PDU cs_put %d bytes", q->m_len);
428     // whole packet sent... delete this and proceed to next ...
429     m_p->queue_out = q->m_next;
430     delete q;
431     // don't select on write if queue is empty ...
432     if (!m_p->queue_out)
433     {
434         m_p->state = PDU_Assoc_priv::Ready;
435         yaz_log(m_p->log, "maskObserver 8");
436         m_p->m_socketObservable->maskObserver(this, SOCKET_OBSERVE_READ|
437                                               SOCKET_OBSERVE_EXCEPT);
438         if (m_p->m_session_is_dead)
439             shutdown();
440     }
441     return r;
442 }
443
444 int PDU_Assoc::send_PDU(const char *buf, int len)
445 {
446     yaz_log(m_p->log, "PDU_Assoc::send_PDU");
447     PDU_Assoc_priv::PDU_Queue **pq = &m_p->queue_out;
448     int is_idle = (*pq ? 0 : 1);
449
450     if (!m_p->cs)
451     {
452         yaz_log(m_p->log, "PDU_Assoc::send_PDU failed, cs == 0");
453         return -1;
454     }
455     while (*pq)
456         pq = &(*pq)->m_next;
457     *pq = new PDU_Assoc_priv::PDU_Queue(buf, len);
458     if (is_idle)
459         return flush_PDU();
460     else
461         yaz_log(m_p->log, "PDU_Assoc::cannot send_PDU fd=%d",
462                 cs_fileno(m_p->cs));
463     return 0;
464 }
465
466 COMSTACK PDU_Assoc::comstack(const char *type_and_host, void **vp)
467 {
468     return cs_create_host(type_and_host, 2, vp);
469 }
470
471 int PDU_Assoc::listen(IPDU_Observer *observer, const char *addr)
472 {
473     if (*addr == '\0')
474     {
475         m_p->m_socketObservable->deleteObserver(this);
476         m_p->state = PDU_Assoc_priv::Closed;
477         if (m_p->cs)
478         {
479             yaz_log(m_p->log, "PDU_Assoc::close fd=%d", cs_fileno(m_p->cs));
480             cs_close(m_p->cs);
481         }
482         m_p->cs = 0;
483         while (m_p->queue_out)
484         {
485             PDU_Assoc_priv::PDU_Queue *q_this = m_p->queue_out;
486             m_p->queue_out = m_p->queue_out->m_next;
487             delete q_this;
488         }
489         xfree(m_p->input_buf);
490         m_p->input_buf = 0;
491         m_p->input_len = 0;
492
493         return 0;
494     }
495
496     shutdown();
497
498     m_PDU_Observer = observer;
499     void *ap;
500     m_p->cs = comstack(addr, &ap);
501
502     if (!m_p->cs)
503         return -1;
504     if (cs_bind(m_p->cs, ap, CS_SERVER) < 0)
505         return -2;
506
507     int fd = cs_fileno(m_p->cs);
508 #if HAVE_FCNTL_H
509     int oldflags = fcntl(fd, F_GETFD, 0);
510     if (oldflags >= 0)
511     {
512         oldflags |= FD_CLOEXEC;
513         fcntl(fd, F_SETFD, oldflags);
514     }
515 #endif
516     m_p->m_socketObservable->addObserver(fd, this);
517     yaz_log(m_p->log, "maskObserver 9");
518     m_p->m_socketObservable->maskObserver(this, SOCKET_OBSERVE_READ|
519                                           SOCKET_OBSERVE_EXCEPT);
520     yaz_log(m_p->log, "PDU_Assoc::listen ok fd=%d", fd);
521     m_p->state = PDU_Assoc_priv::Listen;
522     return 0;
523 }
524
525 void PDU_Assoc::idleTime(int idleTime)
526 {
527     m_p->idleTime = idleTime;
528     yaz_log(m_p->log, "PDU_Assoc::idleTime(%d)", idleTime);
529     m_p->m_socketObservable->timeoutObserver(this, m_p->idleTime);
530 }
531
532 int PDU_Assoc::connect(IPDU_Observer *observer, const char *addr)
533 {
534     yaz_log(m_p->log, "PDU_Assoc::connect %s", addr);
535     shutdown();
536     m_PDU_Observer = observer;
537     void *ap;
538     m_p->cs = comstack(addr, &ap);
539     if (!m_p->cs)
540         return -1;
541     int res = cs_connect(m_p->cs, ap);
542     yaz_log(m_p->log, "PDU_Assoc::connect fd=%d res=%d", cs_fileno(m_p->cs),
543             res);
544     m_p->m_socketObservable->addObserver(cs_fileno(m_p->cs), this);
545
546     if (res == 0)
547     {   // Connect complete
548         m_p->state = PDU_Assoc_priv::Connecting;
549         unsigned mask = SOCKET_OBSERVE_EXCEPT;
550         mask |= SOCKET_OBSERVE_WRITE;
551         mask |= SOCKET_OBSERVE_READ;
552         yaz_log(m_p->log, "maskObserver 11");
553         m_p->m_socketObservable->maskObserver(this, mask);
554     }
555     else if (res > 0)
556     {   // Connect pending
557         m_p->state = PDU_Assoc_priv::Connecting;
558         unsigned mask = SOCKET_OBSERVE_EXCEPT;
559         if (m_p->cs->io_pending & CS_WANT_WRITE)
560             mask |= SOCKET_OBSERVE_WRITE;
561         if (m_p->cs->io_pending & CS_WANT_READ)
562             mask |= SOCKET_OBSERVE_READ;
563         yaz_log(m_p->log, "maskObserver 11");
564         m_p->m_socketObservable->maskObserver(this, mask);
565     }
566     else
567     {   // Connect failed immediately
568         // Since m_state is Closed we can distinguish this case from
569         // normal connect in socketNotify handler
570         yaz_log(m_p->log, "maskObserver 12");
571         m_p->m_socketObservable->maskObserver(this, SOCKET_OBSERVE_WRITE|
572                                               SOCKET_OBSERVE_EXCEPT);
573     }
574     return 0;
575 }
576
577 // Single-threaded... Only useful for non-blocking handlers
578 void PDU_Assoc::childNotify(COMSTACK cs)
579 {
580     PDU_Assoc *new_observable =
581         new PDU_Assoc(m_p->m_socketObservable, cs);
582
583     // Clone PDU Observer
584     new_observable->m_PDU_Observer = m_PDU_Observer->sessionNotify
585         (new_observable, cs_fileno(cs));
586
587     if (!new_observable->m_PDU_Observer)
588     {
589         new_observable->shutdown();
590         delete new_observable;
591         return;
592     }
593     new_observable->m_p->pdu_next = m_p->pdu_children;
594     m_p->pdu_children = new_observable;
595     new_observable->m_p->pdu_parent = this;
596 }
597
598 const char*PDU_Assoc::getpeername()
599 {
600     if (!m_p->cs)
601         return 0;
602     return cs_addrstr(m_p->cs);
603 }
604 /*
605  * Local variables:
606  * c-basic-offset: 4
607  * c-file-style: "Stroustrup"
608  * indent-tabs-mode: nil
609  * End:
610  * vim: shiftwidth=4 tabstop=8 expandtab
611  */
612