disable-zoom configure option
[yazpp-moved-to-github.git] / src / yaz-pdu-assoc.cpp
1 /*
2  * Copyright (c) 1998-2001, Index Data.
3  * See the file LICENSE for details.
4  * 
5  * $Id: yaz-pdu-assoc.cpp,v 1.28 2002-10-09 12:50:26 adam Exp $
6  */
7
8 #include <assert.h>
9 #include <string.h>
10 #include <yaz/log.h>
11 #include <yaz/tcpip.h>
12
13 #include <yaz++/pdu-assoc.h>
14
15
16 void Yaz_PDU_Assoc::init(IYazSocketObservable *socketObservable)
17 {
18     m_state = Closed;
19     m_cs = 0;
20     m_socketObservable = socketObservable;
21     m_PDU_Observer = 0;
22     m_queue_out = 0;
23     m_input_buf = 0;
24     m_input_len = 0;
25     m_children = 0;
26     m_parent = 0;
27     m_next = 0;
28     m_destroyed = 0;
29     m_idleTime = 0;
30     m_log = LOG_DEBUG;
31 }
32
33 Yaz_PDU_Assoc::Yaz_PDU_Assoc(IYazSocketObservable *socketObservable)
34 {
35     init (socketObservable);
36 }
37
38 Yaz_PDU_Assoc::Yaz_PDU_Assoc(IYazSocketObservable *socketObservable,
39                              COMSTACK cs)
40 {
41     init(socketObservable);
42     m_cs = cs;
43     unsigned mask = 0;
44     if (cs->io_pending & CS_WANT_WRITE)
45         mask |= YAZ_SOCKET_OBSERVE_WRITE;
46     if (cs->io_pending & CS_WANT_READ)
47         mask |= YAZ_SOCKET_OBSERVE_READ;
48     m_socketObservable->addObserver(cs_fileno(cs), this);
49     if (!mask)
50     {
51         yaz_log (m_log, "new PDU_Assoc. Ready");
52         m_state = Ready;
53         flush_PDU();
54     }
55     else
56     {
57         yaz_log (m_log, "new PDU_Assoc. Accepting");
58         // assume comstack is accepting...
59         m_state = Accepting;
60         m_socketObservable->addObserver(cs_fileno(cs), this);
61         m_socketObservable->maskObserver(this,
62                                          mask |YAZ_SOCKET_OBSERVE_EXCEPT);
63     }
64 }
65
66
67 IYaz_PDU_Observable *Yaz_PDU_Assoc::clone()
68 {
69     Yaz_PDU_Assoc *copy = new Yaz_PDU_Assoc(m_socketObservable);
70     return copy;
71 }
72
73 void Yaz_PDU_Assoc::socketNotify(int event)
74 {
75     yaz_log (m_log, "Yaz_PDU_Assoc::socketNotify p=%p state=%d event = %d",
76           this, m_state, event);
77     if (event & YAZ_SOCKET_OBSERVE_EXCEPT)
78     {
79         close();
80         m_PDU_Observer->failNotify();
81         return;
82     }
83     else if (event & YAZ_SOCKET_OBSERVE_TIMEOUT)
84     {
85         m_PDU_Observer->timeoutNotify();
86         return;
87     }
88     switch (m_state)
89     {
90     case Accepting:
91         if (!cs_accept (m_cs))
92         {
93             yaz_log (m_log, "Yaz_PDU_Assoc::cs_accept failed");
94             m_cs = 0;
95             close();
96             m_PDU_Observer->failNotify();
97         }
98         else
99         {
100             unsigned mask = 0;
101             if (m_cs->io_pending & CS_WANT_WRITE)
102                 mask |= YAZ_SOCKET_OBSERVE_WRITE;
103             if (m_cs->io_pending & CS_WANT_READ)
104                 mask |= YAZ_SOCKET_OBSERVE_READ;
105             if (!mask)
106             {   // accept is complete. turn to ready state and write if needed
107                 m_state = Ready;
108                 flush_PDU();
109             }
110             else  
111             {   // accept still incomplete.
112                 m_socketObservable->maskObserver(this,
113                                              mask|YAZ_SOCKET_OBSERVE_EXCEPT);
114             }
115         }
116         break;
117     case Connecting:
118         if (event & YAZ_SOCKET_OBSERVE_READ && 
119             event & YAZ_SOCKET_OBSERVE_WRITE)
120         {
121             // For Unix: if both read and write is set, then connect failed.
122             close();
123             m_PDU_Observer->failNotify();
124         }
125         else
126         {
127             yaz_log (m_log, "cs_rcvconnect");
128             int res = cs_rcvconnect (m_cs);
129             if (res == 1)
130             {
131                 unsigned mask = YAZ_SOCKET_OBSERVE_EXCEPT;
132                 if (m_cs->io_pending & CS_WANT_WRITE)
133                     mask |= YAZ_SOCKET_OBSERVE_WRITE;
134                 if (m_cs->io_pending & CS_WANT_READ)
135                     mask |= YAZ_SOCKET_OBSERVE_READ;
136                 m_socketObservable->maskObserver(this, mask);
137             }
138             else
139             {
140                 m_state = Ready;
141                 if (m_PDU_Observer)
142                     m_PDU_Observer->connectNotify();
143                 flush_PDU();
144             }
145         }
146         break;
147     case Listen:
148         if (event & YAZ_SOCKET_OBSERVE_READ)
149         {
150             int res;
151             COMSTACK new_line;
152             
153             if ((res = cs_listen(m_cs, 0, 0)) == 1)
154                 return;
155             if (res < 0)
156             {
157                 yaz_log(LOG_FATAL|LOG_ERRNO, "cs_listen failed");
158                 return;
159             }
160             if (!(new_line = cs_accept(m_cs)))
161                 return;
162             /* 1. create socket-manager 
163                2. create pdu-assoc
164                3. create top-level object
165                     setup observer for child fileid in pdu-assoc
166                4. start thread
167             */
168             yaz_log (m_log, "new session: parent fd=%d child fd=%d",
169                      cs_fileno(m_cs), cs_fileno(new_line));
170             childNotify (new_line);
171         }
172         break;
173     case Writing:
174         if (event & (YAZ_SOCKET_OBSERVE_READ|YAZ_SOCKET_OBSERVE_WRITE))
175             flush_PDU();
176         break;
177     case Ready:
178         if (event & (YAZ_SOCKET_OBSERVE_READ|YAZ_SOCKET_OBSERVE_WRITE))
179         {
180             do
181             {
182                 int res = cs_get (m_cs, &m_input_buf, &m_input_len);
183                 if (res == 1)
184                 {
185                     unsigned mask = YAZ_SOCKET_OBSERVE_EXCEPT;
186                     if (m_cs->io_pending & CS_WANT_WRITE)
187                         mask |= YAZ_SOCKET_OBSERVE_WRITE;
188                     if (m_cs->io_pending & CS_WANT_READ)
189                         mask |= YAZ_SOCKET_OBSERVE_READ;
190                     m_socketObservable->maskObserver(this, mask);
191                     return;
192                 }
193                 else if (res <= 0)
194                 {
195                     yaz_log (m_log, "Yaz_PDU_Assoc::Connection closed by peer");
196                     close();
197                     if (m_PDU_Observer)
198                         m_PDU_Observer->failNotify(); // problem here..
199                     return;
200                 }
201                 // lock it, so we know if recv_PDU deletes it.
202                 int destroyed = 0;
203                 m_destroyed = &destroyed;
204
205                 if (!m_PDU_Observer)
206                     return;
207
208                 m_PDU_Observer->recv_PDU(m_input_buf, res);
209                 m_destroyed = 0;
210                 if (destroyed)   // it really was destroyed, return now.
211                     return;
212             } while (m_cs && cs_more (m_cs));
213             if (m_cs)
214                 m_socketObservable->maskObserver(this,
215                                                  YAZ_SOCKET_OBSERVE_EXCEPT|
216                                                  YAZ_SOCKET_OBSERVE_READ);
217         }
218         break;
219     case Closed:
220         yaz_log (m_log, "CLOSING state=%d event was %d", m_state, event);
221         close();
222         m_PDU_Observer->failNotify();
223         break;
224     default:
225         yaz_log (m_log, "Unknown state=%d event was %d", m_state, event);
226         close();
227         m_PDU_Observer->failNotify();
228     }
229 }
230
231 void Yaz_PDU_Assoc::close()
232 {
233     Yaz_PDU_Assoc *ch;
234     for (ch = m_children; ch; ch = ch->m_next)
235         ch->close();
236
237     m_socketObservable->deleteObserver(this);
238     m_state = Closed;
239     if (m_cs)
240     {
241         yaz_log (m_log, "Yaz_PDU_Assoc::close fd=%d", cs_fileno(m_cs));
242         cs_close (m_cs);
243     }
244     m_cs = 0;
245     while (m_queue_out)
246     {
247         PDU_Queue *q_this = m_queue_out;
248         m_queue_out = m_queue_out->m_next;
249         delete q_this;
250     }
251     xfree (m_input_buf);
252     m_input_buf = 0;
253     m_input_len = 0;
254 }
255
256 void Yaz_PDU_Assoc::destroy()
257 {
258     close();
259
260     if (m_destroyed)
261         *m_destroyed = 1;
262     Yaz_PDU_Assoc **c;
263
264     // delete from parent's child list (if any)
265     if (m_parent)
266     {
267         c = &m_parent->m_children;
268         while (*c != this)
269         {
270             assert (*c);
271             c = &(*c)->m_next;
272         }
273         *c = (*c)->m_next;
274     }
275     // delete all children ...
276     c = &m_children;
277     while (*c)
278     {
279         Yaz_PDU_Assoc *here = *c;
280         *c = (*c)->m_next;
281         here->m_parent = 0;
282         delete here;
283     }
284     yaz_log (m_log, "Yaz_PDU_Assoc::destroy this=%p", this);
285 }
286
287 Yaz_PDU_Assoc::PDU_Queue::PDU_Queue(const char *buf, int len)
288 {
289     m_buf = (char *) xmalloc (len);
290     memcpy (m_buf, buf, len);
291     m_len = len;
292     m_next = 0;
293 }
294
295 Yaz_PDU_Assoc::PDU_Queue::~PDU_Queue()
296 {
297     xfree (m_buf);
298 }
299
300 int Yaz_PDU_Assoc::flush_PDU()
301 {
302     int r;
303     
304     if (m_state != Ready && m_state != Writing)
305     {
306         yaz_log (m_log, "YAZ_PDU_Assoc::flush_PDU, not ready");
307         return 1;
308     }
309     PDU_Queue *q = m_queue_out;
310     if (!q)
311     {
312         m_state = Ready;
313         yaz_log (m_log, "YAZ_PDU_Assoc::flush_PDU queue empty");
314         m_socketObservable->maskObserver(this, YAZ_SOCKET_OBSERVE_READ|
315                                          YAZ_SOCKET_OBSERVE_WRITE|
316                                          YAZ_SOCKET_OBSERVE_EXCEPT);
317         return 0;
318     }
319     r = cs_put (m_cs, q->m_buf, q->m_len);
320     if (r < 0)
321     {
322         yaz_log (m_log, "Yaz_PDU_Assoc::flush_PDU cs_put failed");
323         close();
324         m_PDU_Observer->failNotify();
325         return r;
326     }
327     if (r == 1)
328     {
329         unsigned mask = YAZ_SOCKET_OBSERVE_EXCEPT;
330         m_state = Writing;
331         if (m_cs->io_pending & CS_WANT_WRITE)
332             mask |= YAZ_SOCKET_OBSERVE_WRITE;
333         if (m_cs->io_pending & CS_WANT_READ)
334             mask |= YAZ_SOCKET_OBSERVE_READ;
335  
336         m_socketObservable->maskObserver(this, mask);
337         yaz_log (m_log, "Yaz_PDU_Assoc::flush_PDU cs_put %d bytes (incomp)",
338                  q->m_len);
339         return r;
340     } 
341     m_state = Ready;
342     yaz_log (m_log, "Yaz_PDU_Assoc::flush_PDU cs_put %d bytes", q->m_len);
343     // whole packet sent... delete this and proceed to next ...
344     m_queue_out = q->m_next;
345     delete q;
346     // don't select on write if queue is empty ...
347     if (!m_queue_out)
348         m_socketObservable->maskObserver(this, YAZ_SOCKET_OBSERVE_READ|
349                                          YAZ_SOCKET_OBSERVE_EXCEPT);
350     return r;
351 }
352
353 int Yaz_PDU_Assoc::send_PDU(const char *buf, int len)
354 {
355     yaz_log (m_log, "Yaz_PDU_Assoc::send_PDU");
356     PDU_Queue **pq = &m_queue_out;
357     int is_idle = (*pq ? 0 : 1);
358     
359     if (!m_cs)
360     {
361         yaz_log (m_log, "Yaz_PDU_Assoc::send_PDU failed, m_cs == 0");
362         return -1;
363     }
364     while (*pq)
365         pq = &(*pq)->m_next;
366     *pq = new PDU_Queue(buf, len);
367     if (is_idle)
368         return flush_PDU ();
369     else
370         yaz_log (m_log, "Yaz_PDU_Assoc::cannot send_PDU fd=%d",
371                  cs_fileno(m_cs));
372     return 0;
373 }
374
375 COMSTACK Yaz_PDU_Assoc::comstack(const char *type_and_host, void **vp)
376 {
377     return cs_create_host(type_and_host, 0, vp);
378 }
379
380 void Yaz_PDU_Assoc::listen(IYaz_PDU_Observer *observer,
381                            const char *addr)
382 {
383     close();
384
385     yaz_log (LOG_LOG, "Adding listener %s", addr);
386
387     m_PDU_Observer = observer;
388     void *ap;
389     m_cs = comstack(addr, &ap);
390
391     if (!m_cs)
392         return;
393     if (cs_bind(m_cs, ap, CS_SERVER) < 0)
394         return;
395     m_socketObservable->addObserver(cs_fileno(m_cs), this);
396     m_socketObservable->maskObserver(this, YAZ_SOCKET_OBSERVE_READ|
397                                      YAZ_SOCKET_OBSERVE_EXCEPT);
398     yaz_log (m_log, "Yaz_PDU_Assoc::listen ok fd=%d", cs_fileno(m_cs));
399     m_state = Listen;
400 }
401
402 void Yaz_PDU_Assoc::idleTime(int idleTime)
403 {
404     m_idleTime = idleTime;
405     yaz_log (m_log, "Yaz_PDU_Assoc::idleTime(%d)", idleTime);
406     m_socketObservable->timeoutObserver(this, m_idleTime);
407 }
408
409 void Yaz_PDU_Assoc::connect(IYaz_PDU_Observer *observer,
410                             const char *addr)
411 {
412     yaz_log (m_log, "Yaz_PDU_Assoc::connect %s", addr);
413     close();
414     m_PDU_Observer = observer;
415     void *ap;
416     m_cs = comstack(addr, &ap);
417     int res = cs_connect (m_cs, ap);
418     yaz_log (m_log, "Yaz_PDU_Assoc::connect fd=%d res=%d", cs_fileno(m_cs),
419              res);
420     m_socketObservable->addObserver(cs_fileno(m_cs), this);
421
422     if (res >= 0)
423     {   // Connect pending or complete
424         m_state = Connecting;
425         unsigned mask = YAZ_SOCKET_OBSERVE_EXCEPT;
426         if (m_cs->io_pending & CS_WANT_WRITE)
427             mask |= YAZ_SOCKET_OBSERVE_WRITE;
428         if (m_cs->io_pending & CS_WANT_READ)
429             mask |= YAZ_SOCKET_OBSERVE_READ;
430         m_socketObservable->maskObserver(this, mask);
431     }
432     else
433     {   // Connect failed immediately
434         // Since m_state is Closed we can distinguish this case from
435         // normal connect in socketNotify handler
436         m_socketObservable->maskObserver(this, YAZ_SOCKET_OBSERVE_WRITE|
437                                          YAZ_SOCKET_OBSERVE_EXCEPT);
438     }
439 }
440
441 // Single-threaded... Only useful for non-blocking handlers
442 void Yaz_PDU_Assoc::childNotify(COMSTACK cs)
443 {
444     Yaz_PDU_Assoc *new_observable =
445         new Yaz_PDU_Assoc (m_socketObservable, cs);
446     
447     new_observable->m_next = m_children;
448     m_children = new_observable;
449     new_observable->m_parent = this;
450
451     // Clone PDU Observer
452     new_observable->m_PDU_Observer = m_PDU_Observer->sessionNotify
453         (new_observable, cs_fileno(cs));
454 }