Reformat: delete trailing whitespace
[yazpp-moved-to-github.git] / src / yaz-pdu-assoc.cpp
1 /* This file is part of the yazpp toolkit.
2  * Copyright (C) 1998-2012 Index Data and Mike Taylor
3  * See the file LICENSE for details.
4  */
5
6 #if HAVE_CONFIG_H
7 #include <config.h>
8 #endif
9 #include <assert.h>
10 #include <string.h>
11 #include <yaz/log.h>
12 #include <yaz/tcpip.h>
13
14 #include <yazpp/pdu-assoc.h>
15
16 #if HAVE_FCNTL_H
17 #include <fcntl.h>
18 #endif
19
20 using namespace yazpp_1;
21
22 void PDU_Assoc::init(ISocketObservable *socketObservable)
23 {
24     m_state = Closed;
25     m_cs = 0;
26     m_socketObservable = socketObservable;
27     m_PDU_Observer = 0;
28     m_queue_out = 0;
29     m_queue_in = 0;
30     m_input_buf = 0;
31     m_input_len = 0;
32     m_children = 0;
33     m_parent = 0;
34     m_next = 0;
35     m_destroyed = 0;
36     m_idleTime = 0;
37     m_log = YLOG_DEBUG;
38     m_session_is_dead = false;
39 }
40
41 PDU_Assoc::PDU_Assoc(ISocketObservable *socketObservable)
42 {
43     init (socketObservable);
44 }
45
46 PDU_Assoc::PDU_Assoc(ISocketObservable *socketObservable,
47                      COMSTACK cs)
48 {
49     init(socketObservable);
50     m_cs = cs;
51     unsigned mask = 0;
52     if (cs->io_pending & CS_WANT_WRITE)
53         mask |= SOCKET_OBSERVE_WRITE;
54     if (cs->io_pending & CS_WANT_READ)
55         mask |= SOCKET_OBSERVE_READ;
56     m_socketObservable->addObserver(cs_fileno(cs), this);
57     if (!mask)
58     {
59         yaz_log (m_log, "new PDU_Assoc. Ready");
60         m_state = Ready;
61         flush_PDU();
62     }
63     else
64     {
65         yaz_log (m_log, "new PDU_Assoc. Accepting");
66         // assume comstack is accepting...
67         m_state = Accepting;
68         m_socketObservable->addObserver(cs_fileno(cs), this);
69         yaz_log(m_log, "maskObserver 1");
70         m_socketObservable->maskObserver(this,
71                                          mask |SOCKET_OBSERVE_EXCEPT);
72     }
73 }
74
75
76 IPDU_Observable *PDU_Assoc::clone()
77 {
78     PDU_Assoc *copy = new PDU_Assoc(m_socketObservable);
79     return copy;
80 }
81
82 void PDU_Assoc::socketNotify(int event)
83 {
84     yaz_log (m_log, "PDU_Assoc::socketNotify p=%p state=%d event = %d",
85           this, m_state, event);
86     if (event & SOCKET_OBSERVE_EXCEPT)
87     {
88         shutdown();
89         m_PDU_Observer->failNotify();
90         return;
91     }
92     else if (event & SOCKET_OBSERVE_TIMEOUT)
93     {
94         m_PDU_Observer->timeoutNotify();
95         return;
96     }
97     switch (m_state)
98     {
99     case Accepting:
100         if (!cs_accept (m_cs))
101         {
102             yaz_log (m_log, "PDU_Assoc::cs_accept failed");
103             m_cs = 0;
104             shutdown();
105             m_PDU_Observer->failNotify();
106         }
107         else
108         {
109             unsigned mask = 0;
110             if (m_cs->io_pending & CS_WANT_WRITE)
111                 mask |= SOCKET_OBSERVE_WRITE;
112             if (m_cs->io_pending & CS_WANT_READ)
113                 mask |= SOCKET_OBSERVE_READ;
114             if (!mask)
115             {   // accept is complete. turn to ready state and write if needed
116                 m_state = Ready;
117                 flush_PDU();
118             }
119             else
120             {   // accept still incomplete.
121                 yaz_log(m_log, "maskObserver 2");
122                 m_socketObservable->maskObserver(this,
123                                              mask|SOCKET_OBSERVE_EXCEPT);
124             }
125         }
126         break;
127     case Connecting:
128         if (event & SOCKET_OBSERVE_READ &&
129             event & SOCKET_OBSERVE_WRITE)
130         {
131             // For Unix: if both read and write is set, then connect failed.
132             shutdown();
133             m_PDU_Observer->failNotify();
134         }
135         else
136         {
137             yaz_log (m_log, "cs_rcvconnect");
138             int res = cs_rcvconnect (m_cs);
139             if (res == 1)
140             {
141                 unsigned mask = SOCKET_OBSERVE_EXCEPT;
142                 if (m_cs->io_pending & CS_WANT_WRITE)
143                     mask |= SOCKET_OBSERVE_WRITE;
144                 if (m_cs->io_pending & CS_WANT_READ)
145                     mask |= SOCKET_OBSERVE_READ;
146                 yaz_log(m_log, "maskObserver 3");
147                 m_socketObservable->maskObserver(this, mask);
148             }
149             else
150             {
151                 m_state = Ready;
152                 if (m_PDU_Observer)
153                     m_PDU_Observer->connectNotify();
154                 flush_PDU();
155             }
156         }
157         break;
158     case Listen:
159         if (event & SOCKET_OBSERVE_READ)
160         {
161             int res;
162             COMSTACK new_line;
163
164             if ((res = cs_listen(m_cs, 0, 0)) == 1)
165                 return;
166             if (res < 0)
167             {
168                 yaz_log(YLOG_FATAL|YLOG_ERRNO, "cs_listen failed");
169                 return;
170             }
171             if (!(new_line = cs_accept(m_cs)))
172                 return;
173             /* 1. create socket-manager
174                2. create pdu-assoc
175                3. create top-level object
176                     setup observer for child fileid in pdu-assoc
177                4. start thread
178             */
179             yaz_log (m_log, "new session: parent fd=%d child fd=%d",
180                      cs_fileno(m_cs), cs_fileno(new_line));
181             childNotify (new_line);
182         }
183         break;
184     case Writing:
185         if (event & (SOCKET_OBSERVE_READ|SOCKET_OBSERVE_WRITE))
186             flush_PDU();
187         break;
188     case Ready:
189         if (event & (SOCKET_OBSERVE_READ|SOCKET_OBSERVE_WRITE))
190         {
191             do
192             {
193                 int res = cs_get (m_cs, &m_input_buf, &m_input_len);
194                 if (res == 1)
195                 {
196                     unsigned mask = SOCKET_OBSERVE_EXCEPT;
197                     if (m_cs->io_pending & CS_WANT_WRITE)
198                         mask |= SOCKET_OBSERVE_WRITE;
199                     if (m_cs->io_pending & CS_WANT_READ)
200                         mask |= SOCKET_OBSERVE_READ;
201                     yaz_log(m_log, "maskObserver 4");
202                     m_socketObservable->maskObserver(this, mask);
203                     return;
204                 }
205                 else if (res <= 0)
206                 {
207                     yaz_log (m_log, "PDU_Assoc::Connection closed by peer");
208                     shutdown();
209                     if (m_PDU_Observer)
210                         m_PDU_Observer->failNotify(); // problem here..
211                     return;
212                 }
213                 // lock it, so we know if recv_PDU deletes it.
214                 int destroyed = 0;
215                 m_destroyed = &destroyed;
216
217                 if (!m_PDU_Observer)
218                     return;
219 #if 0
220                 PDU_Queue **pq = &m_queue_in;
221                 while (*pq)
222                     pq = &(*pq)->m_next;
223
224                 *pq = new PDU_Queue(m_input_buf, res);
225 #else
226                 m_PDU_Observer->recv_PDU(m_input_buf, res);
227 #endif
228                 if (destroyed)   // it really was destroyed, return now.
229                     return;
230                 m_destroyed = 0;
231             } while (m_cs && cs_more (m_cs));
232             if (m_cs && m_state == Ready)
233             {
234                 yaz_log(m_log, "maskObserver 5");
235                 m_socketObservable->maskObserver(this,
236                                                  SOCKET_OBSERVE_EXCEPT|
237                                                  SOCKET_OBSERVE_READ);
238             }
239         }
240         break;
241     case Closed:
242         yaz_log (m_log, "CLOSING state=%d event was %d", m_state, event);
243         shutdown();
244         m_PDU_Observer->failNotify();
245         break;
246     default:
247         yaz_log (m_log, "Unknown state=%d event was %d", m_state, event);
248         shutdown();
249         m_PDU_Observer->failNotify();
250     }
251 }
252
253 void PDU_Assoc::close_session()
254 {
255     m_session_is_dead = true;
256     if (!m_queue_out)
257     {
258         shutdown();
259         m_PDU_Observer->failNotify();
260     }
261 }
262
263 void PDU_Assoc::shutdown()
264 {
265     PDU_Assoc *ch;
266     for (ch = m_children; ch; ch = ch->m_next)
267         ch->shutdown();
268
269     m_socketObservable->deleteObserver(this);
270     m_state = Closed;
271     if (m_cs)
272     {
273         yaz_log (m_log, "PDU_Assoc::close fd=%d", cs_fileno(m_cs));
274         cs_close (m_cs);
275     }
276     m_cs = 0;
277     while (m_queue_out)
278     {
279         PDU_Queue *q_this = m_queue_out;
280         m_queue_out = m_queue_out->m_next;
281         delete q_this;
282     }
283     xfree (m_input_buf);
284     m_input_buf = 0;
285     m_input_len = 0;
286 }
287
288 void PDU_Assoc::destroy()
289 {
290     shutdown();
291
292     if (m_destroyed)
293         *m_destroyed = 1;
294     PDU_Assoc **c;
295
296     // delete from parent's child list (if any)
297     if (m_parent)
298     {
299         c = &m_parent->m_children;
300         while (*c != this)
301         {
302             assert (*c);
303             c = &(*c)->m_next;
304         }
305         *c = (*c)->m_next;
306     }
307     // delete all children ...
308     c = &m_children;
309     while (*c)
310     {
311         PDU_Assoc *here = *c;
312         *c = (*c)->m_next;
313         here->m_parent = 0;
314         delete here;
315     }
316     yaz_log (m_log, "PDU_Assoc::destroy this=%p", this);
317 }
318
319 PDU_Assoc::PDU_Queue::PDU_Queue(const char *buf, int len)
320 {
321     m_buf = (char *) xmalloc (len);
322     memcpy (m_buf, buf, len);
323     m_len = len;
324     m_next = 0;
325 }
326
327 PDU_Assoc::PDU_Queue::~PDU_Queue()
328 {
329     xfree (m_buf);
330 }
331
332 int PDU_Assoc::flush_PDU()
333 {
334     int r;
335
336     if (m_state != Ready && m_state != Writing)
337     {
338         yaz_log (m_log, "YAZ_PDU_Assoc::flush_PDU, not ready");
339         return 1;
340     }
341     PDU_Queue *q = m_queue_out;
342     if (!q)
343     {
344         m_state = Ready;
345         yaz_log (m_log, "YAZ_PDU_Assoc::flush_PDU queue empty");
346         yaz_log(m_log, "maskObserver 6");
347         m_socketObservable->maskObserver(this, SOCKET_OBSERVE_READ|
348                                          SOCKET_OBSERVE_WRITE|
349                                          SOCKET_OBSERVE_EXCEPT);
350         if (m_session_is_dead)
351         {
352             shutdown();
353             m_PDU_Observer->failNotify();
354         }
355         return 0;
356     }
357     r = cs_put (m_cs, q->m_buf, q->m_len);
358     if (r < 0)
359     {
360         yaz_log (m_log, "PDU_Assoc::flush_PDU cs_put failed");
361         shutdown();
362         m_PDU_Observer->failNotify();
363         return r;
364     }
365     if (r == 1)
366     {
367         unsigned mask = SOCKET_OBSERVE_EXCEPT;
368         m_state = Writing;
369         if (m_cs->io_pending & CS_WANT_WRITE)
370             mask |= SOCKET_OBSERVE_WRITE;
371         if (m_cs->io_pending & CS_WANT_READ)
372             mask |= SOCKET_OBSERVE_READ;
373
374         mask |= SOCKET_OBSERVE_WRITE;
375         yaz_log(m_log, "maskObserver 7");
376         m_socketObservable->maskObserver(this, mask);
377         yaz_log (m_log, "PDU_Assoc::flush_PDU cs_put %d bytes fd=%d (inc)",
378                  q->m_len, cs_fileno(m_cs));
379         return r;
380     }
381     yaz_log (m_log, "PDU_Assoc::flush_PDU cs_put %d bytes", q->m_len);
382     // whole packet sent... delete this and proceed to next ...
383     m_queue_out = q->m_next;
384     delete q;
385     // don't select on write if queue is empty ...
386     if (!m_queue_out)
387     {
388         m_state = Ready;
389         yaz_log(m_log, "maskObserver 8");
390         m_socketObservable->maskObserver(this, SOCKET_OBSERVE_READ|
391                                          SOCKET_OBSERVE_EXCEPT);
392         if (m_session_is_dead)
393             shutdown();
394     }
395     return r;
396 }
397
398 int PDU_Assoc::send_PDU(const char *buf, int len)
399 {
400     yaz_log (m_log, "PDU_Assoc::send_PDU");
401     PDU_Queue **pq = &m_queue_out;
402     int is_idle = (*pq ? 0 : 1);
403
404     if (!m_cs)
405     {
406         yaz_log (m_log, "PDU_Assoc::send_PDU failed, m_cs == 0");
407         return -1;
408     }
409     while (*pq)
410         pq = &(*pq)->m_next;
411     *pq = new PDU_Queue(buf, len);
412     if (is_idle)
413         return flush_PDU ();
414     else
415         yaz_log (m_log, "PDU_Assoc::cannot send_PDU fd=%d",
416                  cs_fileno(m_cs));
417     return 0;
418 }
419
420 COMSTACK PDU_Assoc::comstack(const char *type_and_host, void **vp)
421 {
422     return cs_create_host(type_and_host, 2, vp);
423 }
424
425 int PDU_Assoc::listen(IPDU_Observer *observer, const char *addr)
426 {
427     if (*addr == '\0')
428     {
429         m_socketObservable->deleteObserver(this);
430         m_state = Closed;
431         if (m_cs)
432         {
433             yaz_log (m_log, "PDU_Assoc::close fd=%d", cs_fileno(m_cs));
434             cs_close (m_cs);
435         }
436         m_cs = 0;
437         while (m_queue_out)
438         {
439             PDU_Queue *q_this = m_queue_out;
440             m_queue_out = m_queue_out->m_next;
441             delete q_this;
442         }
443         xfree (m_input_buf);
444         m_input_buf = 0;
445         m_input_len = 0;
446
447         return 0;
448     }
449
450     shutdown();
451
452     m_PDU_Observer = observer;
453     void *ap;
454     m_cs = comstack(addr, &ap);
455
456     if (!m_cs)
457         return -1;
458     if (cs_bind(m_cs, ap, CS_SERVER) < 0)
459         return -2;
460
461     int fd = cs_fileno(m_cs);
462 #if HAVE_FCNTL_H
463     int oldflags = fcntl(fd, F_GETFD, 0);
464     if (oldflags >= 0)
465     {
466         oldflags |= FD_CLOEXEC;
467         fcntl(fd, F_SETFD, oldflags);
468     }
469 #endif
470     m_socketObservable->addObserver(fd, this);
471     yaz_log(m_log, "maskObserver 9");
472     m_socketObservable->maskObserver(this, SOCKET_OBSERVE_READ|
473                                      SOCKET_OBSERVE_EXCEPT);
474     yaz_log (m_log, "PDU_Assoc::listen ok fd=%d", fd);
475     m_state = Listen;
476     return 0;
477 }
478
479 void PDU_Assoc::idleTime(int idleTime)
480 {
481     m_idleTime = idleTime;
482     yaz_log (m_log, "PDU_Assoc::idleTime(%d)", idleTime);
483     m_socketObservable->timeoutObserver(this, m_idleTime);
484 }
485
486 int PDU_Assoc::connect(IPDU_Observer *observer, const char *addr)
487 {
488     yaz_log (m_log, "PDU_Assoc::connect %s", addr);
489     shutdown();
490     m_PDU_Observer = observer;
491     void *ap;
492     m_cs = comstack(addr, &ap);
493     if (!m_cs)
494         return -1;
495     int res = cs_connect (m_cs, ap);
496     yaz_log (m_log, "PDU_Assoc::connect fd=%d res=%d", cs_fileno(m_cs),
497              res);
498     m_socketObservable->addObserver(cs_fileno(m_cs), this);
499
500     if (res == 0)
501     {   // Connect complete
502         m_state = Connecting;
503         unsigned mask = SOCKET_OBSERVE_EXCEPT;
504         mask |= SOCKET_OBSERVE_WRITE;
505         mask |= SOCKET_OBSERVE_READ;
506         yaz_log(m_log, "maskObserver 11");
507         m_socketObservable->maskObserver(this, mask);
508     }
509     else if (res > 0)
510     {   // Connect pending
511         m_state = Connecting;
512         unsigned mask = SOCKET_OBSERVE_EXCEPT;
513         if (m_cs->io_pending & CS_WANT_WRITE)
514             mask |= SOCKET_OBSERVE_WRITE;
515         if (m_cs->io_pending & CS_WANT_READ)
516             mask |= SOCKET_OBSERVE_READ;
517         yaz_log(m_log, "maskObserver 11");
518         m_socketObservable->maskObserver(this, mask);
519     }
520     else
521     {   // Connect failed immediately
522         // Since m_state is Closed we can distinguish this case from
523         // normal connect in socketNotify handler
524         yaz_log(m_log, "maskObserver 12");
525         m_socketObservable->maskObserver(this, SOCKET_OBSERVE_WRITE|
526                                          SOCKET_OBSERVE_EXCEPT);
527     }
528     return 0;
529 }
530
531 // Single-threaded... Only useful for non-blocking handlers
532 void PDU_Assoc::childNotify(COMSTACK cs)
533 {
534     PDU_Assoc *new_observable =
535         new PDU_Assoc (m_socketObservable, cs);
536
537     // Clone PDU Observer
538     new_observable->m_PDU_Observer = m_PDU_Observer->sessionNotify
539         (new_observable, cs_fileno(cs));
540
541     if (!new_observable->m_PDU_Observer)
542     {
543         new_observable->shutdown();
544         delete new_observable;
545         return;
546     }
547     new_observable->m_next = m_children;
548     m_children = new_observable;
549     new_observable->m_parent = this;
550 }
551
552 const char*PDU_Assoc::getpeername()
553 {
554     if (!m_cs)
555         return 0;
556     return cs_addrstr(m_cs);
557 }
558 /*
559  * Local variables:
560  * c-basic-offset: 4
561  * c-file-style: "Stroustrup"
562  * indent-tabs-mode: nil
563  * End:
564  * vim: shiftwidth=4 tabstop=8 expandtab
565  */
566