PDU_Assoc keeps track of children. Using yaz_log instead of logf.
[yazpp-moved-to-github.git] / src / yaz-pdu-assoc.cpp
1 /*
2  * Copyright (c) 1998-2001, Index Data.
3  * See the file LICENSE for details.
4  * 
5  * $Log: yaz-pdu-assoc.cpp,v $
6  * Revision 1.24  2001-08-13 16:39:12  adam
7  * PDU_Assoc keeps track of children. Using yaz_log instead of logf.
8  *
9  * Revision 1.23  2001/03/26 14:43:49  adam
10  * New threaded PDU association.
11  *
12  * Revision 1.22  2001/01/29 11:18:24  adam
13  * Server sets OPTIONS search and present.
14  *
15  * Revision 1.21  2000/11/20 14:17:36  adam
16  * Yet another WIN32 fix for connect notify.
17  *
18  * Revision 1.20  2000/11/20 11:27:33  adam
19  * Fixes for connect operation (timeout and notify fix).
20  *
21  * Revision 1.19  2000/11/01 14:22:59  adam
22  * Added fd parameter for method IYaz_PDU_Observer::clone.
23  *
24  * Revision 1.18  2000/10/24 12:29:57  adam
25  * Fixed bug in proxy where a Yaz_ProxyClient could be owned by
26  * two Yaz_Proxy's (fatal).
27  *
28  * Revision 1.17  2000/10/11 11:58:16  adam
29  * Moved header files to include/yaz++. Switched to libtool and automake.
30  * Configure script creates yaz++-config script.
31  *
32  * Revision 1.16  2000/09/22 09:54:11  heikki
33  * minor
34  *
35  * Revision 1.15  2000/09/21 21:43:20  adam
36  * Better high-level server API.
37  *
38  * Revision 1.14  2000/09/12 12:09:53  adam
39  * More work on high-level server.
40  *
41  * Revision 1.13  2000/09/08 10:23:42  adam
42  * Added skeleton of yaz-z-server.
43  *
44  * Revision 1.12  2000/09/06 14:23:45  adam
45  * WIN32 updates.
46  *
47  * Revision 1.11  2000/09/04 08:29:22  adam
48  * Fixed memory leak(s). Added re-use of associations, rather than
49  * re-init, when maximum number of targets are in use.
50  *
51  * Revision 1.10  2000/08/10 08:42:42  adam
52  * Fixes for {set,get}_APDU_log.
53  *
54  * Revision 1.9  1999/12/06 13:52:45  adam
55  * Modified for new location of YAZ header files. Experimental threaded
56  * operation.
57  *
58  * Revision 1.8  1999/04/28 13:04:03  adam
59  * Fixed setting of proxy otherInfo so that database(s) are removed.
60  *
61  * Revision 1.7  1999/04/21 12:09:01  adam
62  * Many improvements. Modified to proxy server to work with "sessions"
63  * based on cookies.
64  *
65  * Revision 1.6  1999/04/20 10:30:05  adam
66  * Implemented various stuff for client and proxy. Updated calls
67  * to ODR to reflect new name parameter.
68  *g
69  * Revision 1.5  1999/04/09 11:46:57  adam
70  * Added object Yaz_Z_Assoc. Much more functional client.
71  *
72  * Revision 1.4  1999/03/23 14:17:57  adam
73  * More work on timeout handling. Work on yaz-client.
74  *
75  * Revision 1.3  1999/02/02 14:01:20  adam
76  * First WIN32 port of YAZ++.
77  *
78  * Revision 1.2  1999/01/28 13:08:44  adam
79  * Yaz_PDU_Assoc better encapsulated. Memory leak fix in
80  * yaz-socket-manager.cc.
81  *
82  * Revision 1.1.1.1  1999/01/28 09:41:07  adam
83  * First implementation of YAZ++.
84  *
85  */
86
87 #include <assert.h>
88 #include <string.h>
89 #include <yaz/log.h>
90 #include <yaz/tcpip.h>
91
92 #include <yaz++/yaz-pdu-assoc.h>
93
94
95 void Yaz_PDU_Assoc::init(IYazSocketObservable *socketObservable)
96 {
97     m_state = Closed;
98     m_cs = 0;
99     m_socketObservable = socketObservable;
100     m_PDU_Observer = 0;
101     m_queue_out = 0;
102     m_input_buf = 0;
103     m_input_len = 0;
104     m_children = 0;
105     m_parent = 0;
106     m_next = 0;
107     m_destroyed = 0;
108     m_idleTime = 0;
109     m_log = LOG_DEBUG;
110 }
111
112 Yaz_PDU_Assoc::Yaz_PDU_Assoc(IYazSocketObservable *socketObservable)
113 {
114     init (socketObservable);
115 }
116
117 Yaz_PDU_Assoc::Yaz_PDU_Assoc(IYazSocketObservable *socketObservable,
118                              COMSTACK cs)
119 {
120     init(socketObservable);
121     m_cs = cs;
122     unsigned mask = 0;
123     if (cs->io_pending & CS_WANT_WRITE)
124         mask |= YAZ_SOCKET_OBSERVE_WRITE;
125     if (cs->io_pending & CS_WANT_READ)
126         mask |= YAZ_SOCKET_OBSERVE_READ;
127     m_socketObservable->addObserver(cs_fileno(cs), this);
128     if (!mask)
129     {
130         yaz_log (m_log, "new PDU_Assoc. Ready");
131         m_state = Ready;
132         flush_PDU();
133     }
134     else
135     {
136         yaz_log (m_log, "new PDU_Assoc. Accepting");
137         // assume comstack is accepting...
138         m_state = Accepting;
139         m_socketObservable->addObserver(cs_fileno(cs), this);
140         m_socketObservable->maskObserver(this,
141                                          mask |YAZ_SOCKET_OBSERVE_EXCEPT);
142     }
143 }
144
145
146 IYaz_PDU_Observable *Yaz_PDU_Assoc::clone()
147 {
148     Yaz_PDU_Assoc *copy = new Yaz_PDU_Assoc(m_socketObservable);
149     return copy;
150 }
151
152 void Yaz_PDU_Assoc::socketNotify(int event)
153 {
154     yaz_log (m_log, "Yaz_PDU_Assoc::socketNotify p=%p state=%d event = %d",
155           this, m_state, event);
156     if (event & YAZ_SOCKET_OBSERVE_EXCEPT)
157     {
158         close();
159         m_PDU_Observer->failNotify();
160         return;
161     }
162     else if (event & YAZ_SOCKET_OBSERVE_TIMEOUT)
163     {
164         m_PDU_Observer->timeoutNotify();
165         return;
166     }
167     switch (m_state)
168     {
169     case Accepting:
170         if (!cs_accept (m_cs))
171         {
172             yaz_log (m_log, "Yaz_PDU_Assoc::cs_accept failed");
173             m_cs = 0;
174             close();
175             m_PDU_Observer->failNotify();
176         }
177         else
178         {
179             unsigned mask = 0;
180             if (m_cs->io_pending & CS_WANT_WRITE)
181                 mask |= YAZ_SOCKET_OBSERVE_WRITE;
182             if (m_cs->io_pending & CS_WANT_READ)
183                 mask |= YAZ_SOCKET_OBSERVE_READ;
184             if (!mask)
185             {   // accept is complete. turn to ready state and write if needed
186                 m_state = Ready;
187                 flush_PDU();
188             }
189             else  
190             {   // accept still incomplete.
191                 m_socketObservable->maskObserver(this,
192                                              mask|YAZ_SOCKET_OBSERVE_EXCEPT);
193             }
194         }
195         break;
196     case Connecting:
197         if (event & YAZ_SOCKET_OBSERVE_READ && 
198             event & YAZ_SOCKET_OBSERVE_WRITE)
199         {
200             // For Unix: if both read and write is set, then connect failed.
201             close();
202             m_PDU_Observer->failNotify();
203         }
204         else
205         {
206             yaz_log (m_log, "cs_connect again");
207             int res = cs_connect (m_cs, 0);
208             if (res == 1)
209             {
210                 unsigned mask = YAZ_SOCKET_OBSERVE_EXCEPT;
211                 if (m_cs->io_pending & CS_WANT_WRITE)
212                     mask |= YAZ_SOCKET_OBSERVE_WRITE;
213                 if (m_cs->io_pending & CS_WANT_READ)
214                     mask |= YAZ_SOCKET_OBSERVE_READ;
215                 m_socketObservable->maskObserver(this, mask);
216             }
217             else
218             {
219                 m_state = Ready;
220                 if (m_PDU_Observer)
221                     m_PDU_Observer->connectNotify();
222                 flush_PDU();
223             }
224         }
225         break;
226     case Listen:
227         if (event & YAZ_SOCKET_OBSERVE_READ)
228         {
229             int res;
230             COMSTACK new_line;
231             
232             if ((res = cs_listen(m_cs, 0, 0)) == 1)
233                 return;
234             if (res < 0)
235             {
236                 yaz_log(LOG_FATAL|LOG_ERRNO, "cs_listen failed");
237                 return;
238             }
239             if (!(new_line = cs_accept(m_cs)))
240                 return;
241             /* 1. create socket-manager 
242                2. create pdu-assoc
243                3. create top-level object
244                     setup observer for child fileid in pdu-assoc
245                4. start thread
246             */
247             yaz_log (m_log, "new session: parent fd=%d child fd=%d",
248                      cs_fileno(m_cs), cs_fileno(new_line));
249             childNotify (new_line);
250         }
251         break;
252     case Writing:
253         if (event & (YAZ_SOCKET_OBSERVE_READ|YAZ_SOCKET_OBSERVE_WRITE))
254             flush_PDU();
255         break;
256     case Ready:
257         if (event & (YAZ_SOCKET_OBSERVE_READ|YAZ_SOCKET_OBSERVE_WRITE))
258         {
259             do
260             {
261                 int res = cs_get (m_cs, &m_input_buf, &m_input_len);
262                 if (res == 1)
263                 {
264                     unsigned mask = YAZ_SOCKET_OBSERVE_EXCEPT;
265                     if (m_cs->io_pending & CS_WANT_WRITE)
266                         mask |= YAZ_SOCKET_OBSERVE_WRITE;
267                     if (m_cs->io_pending & CS_WANT_READ)
268                         mask |= YAZ_SOCKET_OBSERVE_READ;
269                     m_socketObservable->maskObserver(this, mask);
270                     return;
271                 }
272                 else if (res <= 0)
273                 {
274                     yaz_log (m_log, "Yaz_PDU_Assoc::Connection closed by peer");
275                     close();
276                     if (m_PDU_Observer)
277                         m_PDU_Observer->failNotify(); // problem here..
278                     return;
279                 }
280                 // lock it, so we know if recv_PDU deletes it.
281                 int destroyed = 0;
282                 m_destroyed = &destroyed;
283
284                 if (!m_PDU_Observer)
285                     return;
286
287                 m_PDU_Observer->recv_PDU(m_input_buf, res);
288                 m_destroyed = 0;
289                 if (destroyed)   // it really was destroyed, return now.
290                     return;
291             } while (m_cs && cs_more (m_cs));
292             if (m_cs)
293                 m_socketObservable->maskObserver(this,
294                                                  YAZ_SOCKET_OBSERVE_EXCEPT|
295                                                  YAZ_SOCKET_OBSERVE_READ);
296         }
297         break;
298     case Closed:
299         yaz_log (m_log, "CLOSING state=%d event was %d", m_state, event);
300         close();
301         m_PDU_Observer->failNotify();
302         break;
303     default:
304         yaz_log (m_log, "Unknown state=%d event was %d", m_state, event);
305         close();
306         m_PDU_Observer->failNotify();
307     }
308 }
309
310 void Yaz_PDU_Assoc::close()
311 {
312     Yaz_PDU_Assoc *ch;
313     for (ch = m_children; ch; ch = ch->m_next)
314         ch->close();
315
316     m_socketObservable->deleteObserver(this);
317     m_state = Closed;
318     if (m_cs)
319     {
320         yaz_log (m_log, "Yaz_PDU_Assoc::close fd=%d", cs_fileno(m_cs));
321         cs_close (m_cs);
322     }
323     m_cs = 0;
324     while (m_queue_out)
325     {
326         PDU_Queue *q_this = m_queue_out;
327         m_queue_out = m_queue_out->m_next;
328         delete q_this;
329     }
330     xfree (m_input_buf);
331     m_input_buf = 0;
332     m_input_len = 0;
333 }
334
335 void Yaz_PDU_Assoc::destroy()
336 {
337     close();
338
339     if (m_destroyed)
340         *m_destroyed = 1;
341     Yaz_PDU_Assoc **c;
342
343     // delete from parent's child list (if any)
344     if (m_parent)
345     {
346         c = &m_parent->m_children;
347         while (*c != this)
348         {
349             assert (*c);
350             c = &(*c)->m_next;
351         }
352         *c = (*c)->m_next;
353     }
354     // delete all children ...
355     c = &m_children;
356     while (*c)
357     {
358         Yaz_PDU_Assoc *here = *c;
359         *c = (*c)->m_next;
360         here->m_parent = 0;
361         delete here;
362     }
363     yaz_log (m_log, "Yaz_PDU_Assoc::destroy this=%p", this);
364 }
365
366 Yaz_PDU_Assoc::PDU_Queue::PDU_Queue(const char *buf, int len)
367 {
368     m_buf = (char *) malloc (len);
369     memcpy (m_buf, buf, len);
370     m_len = len;
371     m_next = 0;
372 }
373
374 Yaz_PDU_Assoc::PDU_Queue::~PDU_Queue()
375 {
376     free (m_buf);
377 }
378
379 int Yaz_PDU_Assoc::flush_PDU()
380 {
381     int r;
382     
383     if (m_state != Ready && m_state != Writing)
384     {
385         yaz_log (m_log, "YAZ_PDU_Assoc::flush_PDU, not ready");
386         return 1;
387     }
388     PDU_Queue *q = m_queue_out;
389     if (!q)
390     {
391         m_state = Ready;
392         yaz_log (m_log, "YAZ_PDU_Assoc::flush_PDU queue empty");
393         m_socketObservable->maskObserver(this, YAZ_SOCKET_OBSERVE_READ|
394                                          YAZ_SOCKET_OBSERVE_WRITE|
395                                          YAZ_SOCKET_OBSERVE_EXCEPT);
396         return 0;
397     }
398     r = cs_put (m_cs, q->m_buf, q->m_len);
399     if (r < 0)
400     {
401         yaz_log (m_log, "Yaz_PDU_Assoc::flush_PDU cs_put failed");
402         close();
403         m_PDU_Observer->failNotify();
404         return r;
405     }
406     if (r == 1)
407     {
408         unsigned mask = YAZ_SOCKET_OBSERVE_EXCEPT;
409         m_state = Writing;
410         if (m_cs->io_pending & CS_WANT_WRITE)
411             mask |= YAZ_SOCKET_OBSERVE_WRITE;
412         if (m_cs->io_pending & CS_WANT_READ)
413             mask |= YAZ_SOCKET_OBSERVE_READ;
414  
415         m_socketObservable->maskObserver(this, mask);
416         yaz_log (m_log, "Yaz_PDU_Assoc::flush_PDU cs_put %d bytes (incomp)",
417                  q->m_len);
418         return r;
419     } 
420     m_state = Ready;
421     yaz_log (m_log, "Yaz_PDU_Assoc::flush_PDU cs_put %d bytes", q->m_len);
422     // whole packet sent... delete this and proceed to next ...
423     m_queue_out = q->m_next;
424     delete q;
425     // don't select on write if queue is empty ...
426     if (!m_queue_out)
427         m_socketObservable->maskObserver(this, YAZ_SOCKET_OBSERVE_READ|
428                                          YAZ_SOCKET_OBSERVE_EXCEPT);
429     return r;
430 }
431
432 int Yaz_PDU_Assoc::send_PDU(const char *buf, int len)
433 {
434     yaz_log (m_log, "Yaz_PDU_Assoc::send_PDU");
435     PDU_Queue **pq = &m_queue_out;
436     int is_idle = (*pq ? 0 : 1);
437     
438     if (!m_cs)
439     {
440         yaz_log (m_log, "Yaz_PDU_Assoc::send_PDU failed, m_cs == 0");
441         return -1;
442     }
443     while (*pq)
444         pq = &(*pq)->m_next;
445     *pq = new PDU_Queue(buf, len);
446     if (is_idle)
447         return flush_PDU ();
448     else
449         yaz_log (m_log, "Yaz_PDU_Assoc::cannot send_PDU fd=%d",
450                  cs_fileno(m_cs));
451     return 0;
452 }
453
454 COMSTACK Yaz_PDU_Assoc::comstack(const char *type_and_host, void **vp)
455 {
456     return cs_create_host(type_and_host, 0, vp);
457 }
458
459 void Yaz_PDU_Assoc::listen(IYaz_PDU_Observer *observer,
460                            const char *addr)
461 {
462     close();
463
464     yaz_log (LOG_LOG, "Adding listener %s", addr);
465
466     m_PDU_Observer = observer;
467     void *ap;
468     m_cs = comstack(addr, &ap);
469
470     if (!m_cs)
471         return;
472     if (cs_bind(m_cs, ap, CS_SERVER) < 0)
473         return;
474     m_socketObservable->addObserver(cs_fileno(m_cs), this);
475     m_socketObservable->maskObserver(this, YAZ_SOCKET_OBSERVE_READ|
476                                      YAZ_SOCKET_OBSERVE_EXCEPT);
477     yaz_log (m_log, "Yaz_PDU_Assoc::listen ok fd=%d", cs_fileno(m_cs));
478     m_state = Listen;
479 }
480
481 void Yaz_PDU_Assoc::idleTime(int idleTime)
482 {
483     m_idleTime = idleTime;
484     yaz_log (m_log, "Yaz_PDU_Assoc::idleTime(%d)", idleTime);
485     m_socketObservable->timeoutObserver(this, m_idleTime);
486 }
487
488 void Yaz_PDU_Assoc::connect(IYaz_PDU_Observer *observer,
489                             const char *addr)
490 {
491     yaz_log (m_log, "Yaz_PDU_Assoc::connect %s", addr);
492     close();
493     m_PDU_Observer = observer;
494     void *ap;
495     m_cs = comstack(addr, &ap);
496     int res = cs_connect (m_cs, ap);
497     yaz_log (m_log, "Yaz_PDU_Assoc::connect fd=%d res=%d", cs_fileno(m_cs),
498              res);
499     m_socketObservable->addObserver(cs_fileno(m_cs), this);
500
501     if (res >= 0)
502     {   // Connect pending or complet
503         m_state = Connecting;
504         unsigned mask = YAZ_SOCKET_OBSERVE_EXCEPT;
505         if (m_cs->io_pending & CS_WANT_WRITE)
506             mask |= YAZ_SOCKET_OBSERVE_WRITE;
507         if (m_cs->io_pending & CS_WANT_READ)
508             mask |= YAZ_SOCKET_OBSERVE_READ;
509         m_socketObservable->maskObserver(this, mask);
510     }
511     else
512     {   // Connect failed immediately
513         // Since m_state is Closed we can distinguish this case from
514         // normal connect in socketNotify handler
515         m_socketObservable->maskObserver(this, YAZ_SOCKET_OBSERVE_WRITE|
516                                          YAZ_SOCKET_OBSERVE_EXCEPT);
517     }
518 }
519
520 // Single-threaded... Only useful for non-blocking handlers
521 void Yaz_PDU_Assoc::childNotify(COMSTACK cs)
522 {
523
524  
525     Yaz_PDU_Assoc *new_observable =
526         new Yaz_PDU_Assoc (m_socketObservable, cs);
527     
528     new_observable->m_next = m_children;
529     m_children = new_observable;
530     new_observable->m_parent = this;
531
532     // Clone PDU Observer
533     new_observable->m_PDU_Observer = m_PDU_Observer->sessionNotify
534         (new_observable, cs_fileno(cs));
535 }