a7fc3497f860461d2607649362aa9cb8e38a9433
[yazpp-moved-to-github.git] / src / yaz-pdu-assoc.cpp
1 /*
2  * Copyright (c) 1998-2001, Index Data.
3  * See the file LICENSE for details.
4  * 
5  * $Id: yaz-pdu-assoc.cpp,v 1.30 2003-10-01 13:13:51 adam Exp $
6  */
7
8 #include <assert.h>
9 #include <string.h>
10 #include <yaz/log.h>
11 #include <yaz/tcpip.h>
12
13 #include <yaz++/pdu-assoc.h>
14
15
16 void Yaz_PDU_Assoc::init(IYazSocketObservable *socketObservable)
17 {
18     m_state = Closed;
19     m_cs = 0;
20     m_socketObservable = socketObservable;
21     m_PDU_Observer = 0;
22     m_queue_out = 0;
23     m_input_buf = 0;
24     m_input_len = 0;
25     m_children = 0;
26     m_parent = 0;
27     m_next = 0;
28     m_destroyed = 0;
29     m_idleTime = 0;
30     m_log = LOG_DEBUG;
31 }
32
33 Yaz_PDU_Assoc::Yaz_PDU_Assoc(IYazSocketObservable *socketObservable)
34 {
35     init (socketObservable);
36 }
37
38 Yaz_PDU_Assoc::Yaz_PDU_Assoc(IYazSocketObservable *socketObservable,
39                              COMSTACK cs)
40 {
41     init(socketObservable);
42     m_cs = cs;
43     unsigned mask = 0;
44     if (cs->io_pending & CS_WANT_WRITE)
45         mask |= YAZ_SOCKET_OBSERVE_WRITE;
46     if (cs->io_pending & CS_WANT_READ)
47         mask |= YAZ_SOCKET_OBSERVE_READ;
48     m_socketObservable->addObserver(cs_fileno(cs), this);
49     if (!mask)
50     {
51         yaz_log (m_log, "new PDU_Assoc. Ready");
52         m_state = Ready;
53         flush_PDU();
54     }
55     else
56     {
57         yaz_log (m_log, "new PDU_Assoc. Accepting");
58         // assume comstack is accepting...
59         m_state = Accepting;
60         m_socketObservable->addObserver(cs_fileno(cs), this);
61         yaz_log(m_log, "maskObserver 1");
62         m_socketObservable->maskObserver(this,
63                                          mask |YAZ_SOCKET_OBSERVE_EXCEPT);
64     }
65 }
66
67
68 IYaz_PDU_Observable *Yaz_PDU_Assoc::clone()
69 {
70     Yaz_PDU_Assoc *copy = new Yaz_PDU_Assoc(m_socketObservable);
71     return copy;
72 }
73
74 void Yaz_PDU_Assoc::socketNotify(int event)
75 {
76     yaz_log (m_log, "Yaz_PDU_Assoc::socketNotify p=%p state=%d event = %d",
77           this, m_state, event);
78     if (event & YAZ_SOCKET_OBSERVE_EXCEPT)
79     {
80         close();
81         m_PDU_Observer->failNotify();
82         return;
83     }
84     else if (event & YAZ_SOCKET_OBSERVE_TIMEOUT)
85     {
86         m_PDU_Observer->timeoutNotify();
87         return;
88     }
89     switch (m_state)
90     {
91     case Accepting:
92         if (!cs_accept (m_cs))
93         {
94             yaz_log (m_log, "Yaz_PDU_Assoc::cs_accept failed");
95             m_cs = 0;
96             close();
97             m_PDU_Observer->failNotify();
98         }
99         else
100         {
101             unsigned mask = 0;
102             if (m_cs->io_pending & CS_WANT_WRITE)
103                 mask |= YAZ_SOCKET_OBSERVE_WRITE;
104             if (m_cs->io_pending & CS_WANT_READ)
105                 mask |= YAZ_SOCKET_OBSERVE_READ;
106             if (!mask)
107             {   // accept is complete. turn to ready state and write if needed
108                 m_state = Ready;
109                 flush_PDU();
110             }
111             else  
112             {   // accept still incomplete.
113                 yaz_log(m_log, "maskObserver 2");
114                 m_socketObservable->maskObserver(this,
115                                              mask|YAZ_SOCKET_OBSERVE_EXCEPT);
116             }
117         }
118         break;
119     case Connecting:
120         if (event & YAZ_SOCKET_OBSERVE_READ && 
121             event & YAZ_SOCKET_OBSERVE_WRITE)
122         {
123             // For Unix: if both read and write is set, then connect failed.
124             close();
125             m_PDU_Observer->failNotify();
126         }
127         else
128         {
129             yaz_log (m_log, "cs_rcvconnect");
130             int res = cs_rcvconnect (m_cs);
131             if (res == 1)
132             {
133                 unsigned mask = YAZ_SOCKET_OBSERVE_EXCEPT;
134                 if (m_cs->io_pending & CS_WANT_WRITE)
135                     mask |= YAZ_SOCKET_OBSERVE_WRITE;
136                 if (m_cs->io_pending & CS_WANT_READ)
137                     mask |= YAZ_SOCKET_OBSERVE_READ;
138                 yaz_log(m_log, "maskObserver 3");
139                 m_socketObservable->maskObserver(this, mask);
140             }
141             else
142             {
143                 m_state = Ready;
144                 if (m_PDU_Observer)
145                     m_PDU_Observer->connectNotify();
146                 flush_PDU();
147             }
148         }
149         break;
150     case Listen:
151         if (event & YAZ_SOCKET_OBSERVE_READ)
152         {
153             int res;
154             COMSTACK new_line;
155             
156             if ((res = cs_listen(m_cs, 0, 0)) == 1)
157                 return;
158             if (res < 0)
159             {
160                 yaz_log(LOG_FATAL|LOG_ERRNO, "cs_listen failed");
161                 return;
162             }
163             if (!(new_line = cs_accept(m_cs)))
164                 return;
165             /* 1. create socket-manager 
166                2. create pdu-assoc
167                3. create top-level object
168                     setup observer for child fileid in pdu-assoc
169                4. start thread
170             */
171             yaz_log (m_log, "new session: parent fd=%d child fd=%d",
172                      cs_fileno(m_cs), cs_fileno(new_line));
173             childNotify (new_line);
174         }
175         break;
176     case Writing:
177         if (event & (YAZ_SOCKET_OBSERVE_READ|YAZ_SOCKET_OBSERVE_WRITE))
178             flush_PDU();
179         break;
180     case Ready:
181         if (event & (YAZ_SOCKET_OBSERVE_READ|YAZ_SOCKET_OBSERVE_WRITE))
182         {
183             do
184             {
185                 int res = cs_get (m_cs, &m_input_buf, &m_input_len);
186                 if (res == 1)
187                 {
188                     unsigned mask = YAZ_SOCKET_OBSERVE_EXCEPT;
189                     if (m_cs->io_pending & CS_WANT_WRITE)
190                         mask |= YAZ_SOCKET_OBSERVE_WRITE;
191                     if (m_cs->io_pending & CS_WANT_READ)
192                         mask |= YAZ_SOCKET_OBSERVE_READ;
193                     yaz_log(m_log, "maskObserver 4");
194                     m_socketObservable->maskObserver(this, mask);
195                     return;
196                 }
197                 else if (res <= 0)
198                 {
199                     yaz_log (m_log, "Yaz_PDU_Assoc::Connection closed by peer");
200                     close();
201                     if (m_PDU_Observer)
202                         m_PDU_Observer->failNotify(); // problem here..
203                     return;
204                 }
205                 // lock it, so we know if recv_PDU deletes it.
206                 int destroyed = 0;
207                 m_destroyed = &destroyed;
208
209                 if (!m_PDU_Observer)
210                     return;
211
212                 m_PDU_Observer->recv_PDU(m_input_buf, res);
213                 m_destroyed = 0;
214                 if (destroyed)   // it really was destroyed, return now.
215                     return;
216             } while (m_cs && cs_more (m_cs));
217             if (m_cs && m_state == Ready)
218             {
219                 yaz_log(m_log, "maskObserver 5");
220                 m_socketObservable->maskObserver(this,
221                                                  YAZ_SOCKET_OBSERVE_EXCEPT|
222                                                  YAZ_SOCKET_OBSERVE_READ);
223             }
224         }
225         break;
226     case Closed:
227         yaz_log (m_log, "CLOSING state=%d event was %d", m_state, event);
228         close();
229         m_PDU_Observer->failNotify();
230         break;
231     default:
232         yaz_log (m_log, "Unknown state=%d event was %d", m_state, event);
233         close();
234         m_PDU_Observer->failNotify();
235     }
236 }
237
238 void Yaz_PDU_Assoc::close()
239 {
240     Yaz_PDU_Assoc *ch;
241     for (ch = m_children; ch; ch = ch->m_next)
242         ch->close();
243
244     m_socketObservable->deleteObserver(this);
245     m_state = Closed;
246     if (m_cs)
247     {
248         yaz_log (m_log, "Yaz_PDU_Assoc::close fd=%d", cs_fileno(m_cs));
249         cs_close (m_cs);
250     }
251     m_cs = 0;
252     while (m_queue_out)
253     {
254         PDU_Queue *q_this = m_queue_out;
255         m_queue_out = m_queue_out->m_next;
256         delete q_this;
257     }
258     xfree (m_input_buf);
259     m_input_buf = 0;
260     m_input_len = 0;
261 }
262
263 void Yaz_PDU_Assoc::destroy()
264 {
265     close();
266
267     if (m_destroyed)
268         *m_destroyed = 1;
269     Yaz_PDU_Assoc **c;
270
271     // delete from parent's child list (if any)
272     if (m_parent)
273     {
274         c = &m_parent->m_children;
275         while (*c != this)
276         {
277             assert (*c);
278             c = &(*c)->m_next;
279         }
280         *c = (*c)->m_next;
281     }
282     // delete all children ...
283     c = &m_children;
284     while (*c)
285     {
286         Yaz_PDU_Assoc *here = *c;
287         *c = (*c)->m_next;
288         here->m_parent = 0;
289         delete here;
290     }
291     yaz_log (m_log, "Yaz_PDU_Assoc::destroy this=%p", this);
292 }
293
294 Yaz_PDU_Assoc::PDU_Queue::PDU_Queue(const char *buf, int len)
295 {
296     m_buf = (char *) xmalloc (len);
297     memcpy (m_buf, buf, len);
298     m_len = len;
299     m_next = 0;
300 }
301
302 Yaz_PDU_Assoc::PDU_Queue::~PDU_Queue()
303 {
304     xfree (m_buf);
305 }
306
307 int Yaz_PDU_Assoc::flush_PDU()
308 {
309     int r;
310     
311     if (m_state != Ready && m_state != Writing)
312     {
313         yaz_log (m_log, "YAZ_PDU_Assoc::flush_PDU, not ready");
314         return 1;
315     }
316     PDU_Queue *q = m_queue_out;
317     if (!q)
318     {
319         m_state = Ready;
320         yaz_log (m_log, "YAZ_PDU_Assoc::flush_PDU queue empty");
321         yaz_log(m_log, "maskObserver 6");
322         m_socketObservable->maskObserver(this, YAZ_SOCKET_OBSERVE_READ|
323                                          YAZ_SOCKET_OBSERVE_WRITE|
324                                          YAZ_SOCKET_OBSERVE_EXCEPT);
325         return 0;
326     }
327     r = cs_put (m_cs, q->m_buf, q->m_len);
328     if (r < 0)
329     {
330         yaz_log (m_log, "Yaz_PDU_Assoc::flush_PDU cs_put failed");
331         close();
332         m_PDU_Observer->failNotify();
333         return r;
334     }
335     if (r == 1)
336     {
337         unsigned mask = YAZ_SOCKET_OBSERVE_EXCEPT;
338         m_state = Writing;
339         if (m_cs->io_pending & CS_WANT_WRITE)
340             mask |= YAZ_SOCKET_OBSERVE_WRITE;
341         if (m_cs->io_pending & CS_WANT_READ)
342             mask |= YAZ_SOCKET_OBSERVE_READ;
343
344         mask |= YAZ_SOCKET_OBSERVE_WRITE;
345         yaz_log(m_log, "maskObserver 7");
346         m_socketObservable->maskObserver(this, mask);
347         yaz_log (m_log, "Yaz_PDU_Assoc::flush_PDU cs_put %d bytes fd=%d (inc)",
348                  q->m_len, cs_fileno(m_cs));
349         return r;
350     } 
351     yaz_log (m_log, "Yaz_PDU_Assoc::flush_PDU cs_put %d bytes", q->m_len);
352     // whole packet sent... delete this and proceed to next ...
353     m_queue_out = q->m_next;
354     delete q;
355     // don't select on write if queue is empty ...
356     if (!m_queue_out)
357     {
358         m_state = Ready;
359         yaz_log(m_log, "maskObserver 8");
360         m_socketObservable->maskObserver(this, YAZ_SOCKET_OBSERVE_READ|
361                                          YAZ_SOCKET_OBSERVE_EXCEPT);
362     }
363     return r;
364 }
365
366 int Yaz_PDU_Assoc::send_PDU(const char *buf, int len)
367 {
368     yaz_log (m_log, "Yaz_PDU_Assoc::send_PDU");
369     PDU_Queue **pq = &m_queue_out;
370     int is_idle = (*pq ? 0 : 1);
371     
372     if (!m_cs)
373     {
374         yaz_log (m_log, "Yaz_PDU_Assoc::send_PDU failed, m_cs == 0");
375         return -1;
376     }
377     while (*pq)
378         pq = &(*pq)->m_next;
379     *pq = new PDU_Queue(buf, len);
380     if (is_idle)
381         return flush_PDU ();
382     else
383         yaz_log (m_log, "Yaz_PDU_Assoc::cannot send_PDU fd=%d",
384                  cs_fileno(m_cs));
385     return 0;
386 }
387
388 COMSTACK Yaz_PDU_Assoc::comstack(const char *type_and_host, void **vp)
389 {
390     return cs_create_host(type_and_host, 0, vp);
391 }
392
393 void Yaz_PDU_Assoc::listen(IYaz_PDU_Observer *observer,
394                            const char *addr)
395 {
396     close();
397
398     yaz_log (LOG_LOG, "Adding listener %s", addr);
399
400     m_PDU_Observer = observer;
401     void *ap;
402     m_cs = comstack(addr, &ap);
403
404     if (!m_cs)
405         return;
406     if (cs_bind(m_cs, ap, CS_SERVER) < 0)
407         return;
408     m_socketObservable->addObserver(cs_fileno(m_cs), this);
409     yaz_log(m_log, "maskObserver 9");
410     m_socketObservable->maskObserver(this, YAZ_SOCKET_OBSERVE_READ|
411                                      YAZ_SOCKET_OBSERVE_EXCEPT);
412     yaz_log (m_log, "Yaz_PDU_Assoc::listen ok fd=%d", cs_fileno(m_cs));
413     m_state = Listen;
414 }
415
416 void Yaz_PDU_Assoc::idleTime(int idleTime)
417 {
418     m_idleTime = idleTime;
419     yaz_log (m_log, "Yaz_PDU_Assoc::idleTime(%d)", idleTime);
420     m_socketObservable->timeoutObserver(this, m_idleTime);
421 }
422
423 void Yaz_PDU_Assoc::connect(IYaz_PDU_Observer *observer,
424                             const char *addr)
425 {
426     yaz_log (m_log, "Yaz_PDU_Assoc::connect %s", addr);
427     close();
428     m_PDU_Observer = observer;
429     void *ap;
430     m_cs = comstack(addr, &ap);
431     int res = cs_connect (m_cs, ap);
432     yaz_log (m_log, "Yaz_PDU_Assoc::connect fd=%d res=%d", cs_fileno(m_cs),
433              res);
434     m_socketObservable->addObserver(cs_fileno(m_cs), this);
435
436     if (res == 0)
437     {   // Connect complete
438         m_state = Connecting;
439         unsigned mask = YAZ_SOCKET_OBSERVE_EXCEPT;
440         mask |= YAZ_SOCKET_OBSERVE_WRITE;
441         mask |= YAZ_SOCKET_OBSERVE_READ;
442         yaz_log(m_log, "maskObserver 11");
443         m_socketObservable->maskObserver(this, mask);
444     }
445     else if (res > 0)
446     {   // Connect pending
447         m_state = Connecting;
448         unsigned mask = YAZ_SOCKET_OBSERVE_EXCEPT;
449         if (m_cs->io_pending & CS_WANT_WRITE)
450             mask |= YAZ_SOCKET_OBSERVE_WRITE;
451         if (m_cs->io_pending & CS_WANT_READ)
452             mask |= YAZ_SOCKET_OBSERVE_READ;
453         yaz_log(m_log, "maskObserver 11");
454         m_socketObservable->maskObserver(this, mask);
455     }
456     else
457     {   // Connect failed immediately
458         // Since m_state is Closed we can distinguish this case from
459         // normal connect in socketNotify handler
460         yaz_log(m_log, "maskObserver 12");
461         m_socketObservable->maskObserver(this, YAZ_SOCKET_OBSERVE_WRITE|
462                                          YAZ_SOCKET_OBSERVE_EXCEPT);
463     }
464 }
465
466 // Single-threaded... Only useful for non-blocking handlers
467 void Yaz_PDU_Assoc::childNotify(COMSTACK cs)
468 {
469     Yaz_PDU_Assoc *new_observable =
470         new Yaz_PDU_Assoc (m_socketObservable, cs);
471     
472     new_observable->m_next = m_children;
473     m_children = new_observable;
474     new_observable->m_parent = this;
475
476     // Clone PDU Observer
477     new_observable->m_PDU_Observer = m_PDU_Observer->sessionNotify
478         (new_observable, cs_fileno(cs));
479 }
480
481 const char*Yaz_PDU_Assoc::getpeername()
482 {
483     return cs_addrstr(m_cs);
484 }