Fixed bug in proxy where a Yaz_ProxyClient could be owned by
[yazpp-moved-to-github.git] / src / yaz-pdu-assoc.cpp
1 /*
2  * Copyright (c) 1998-2000, Index Data.
3  * See the file LICENSE for details.
4  * 
5  * $Log: yaz-pdu-assoc.cpp,v $
6  * Revision 1.18  2000-10-24 12:29:57  adam
7  * Fixed bug in proxy where a Yaz_ProxyClient could be owned by
8  * two Yaz_Proxy's (fatal).
9  *
10  * Revision 1.17  2000/10/11 11:58:16  adam
11  * Moved header files to include/yaz++. Switched to libtool and automake.
12  * Configure script creates yaz++-config script.
13  *
14  * Revision 1.16  2000/09/22 09:54:11  heikki
15  * minor
16  *
17  * Revision 1.15  2000/09/21 21:43:20  adam
18  * Better high-level server API.
19  *
20  * Revision 1.14  2000/09/12 12:09:53  adam
21  * More work on high-level server.
22  *
23  * Revision 1.13  2000/09/08 10:23:42  adam
24  * Added skeleton of yaz-z-server.
25  *
26  * Revision 1.12  2000/09/06 14:23:45  adam
27  * WIN32 updates.
28  *
29  * Revision 1.11  2000/09/04 08:29:22  adam
30  * Fixed memory leak(s). Added re-use of associations, rather than
31  * re-init, when maximum number of targets are in use.
32  *
33  * Revision 1.10  2000/08/10 08:42:42  adam
34  * Fixes for {set,get}_APDU_log.
35  *
36  * Revision 1.9  1999/12/06 13:52:45  adam
37  * Modified for new location of YAZ header files. Experimental threaded
38  * operation.
39  *
40  * Revision 1.8  1999/04/28 13:04:03  adam
41  * Fixed setting of proxy otherInfo so that database(s) are removed.
42  *
43  * Revision 1.7  1999/04/21 12:09:01  adam
44  * Many improvements. Modified to proxy server to work with "sessions"
45  * based on cookies.
46  *
47  * Revision 1.6  1999/04/20 10:30:05  adam
48  * Implemented various stuff for client and proxy. Updated calls
49  * to ODR to reflect new name parameter.
50  *g
51  * Revision 1.5  1999/04/09 11:46:57  adam
52  * Added object Yaz_Z_Assoc. Much more functional client.
53  *
54  * Revision 1.4  1999/03/23 14:17:57  adam
55  * More work on timeout handling. Work on yaz-client.
56  *
57  * Revision 1.3  1999/02/02 14:01:20  adam
58  * First WIN32 port of YAZ++.
59  *
60  * Revision 1.2  1999/01/28 13:08:44  adam
61  * Yaz_PDU_Assoc better encapsulated. Memory leak fix in
62  * yaz-socket-manager.cc.
63  *
64  * Revision 1.1.1.1  1999/01/28 09:41:07  adam
65  * First implementation of YAZ++.
66  *
67  */
68
69 #include <assert.h>
70
71 #include <yaz++/yaz-pdu-assoc.h>
72
73 #include <yaz/log.h>
74 #include <yaz/tcpip.h>
75
76 Yaz_PDU_Assoc::Yaz_PDU_Assoc(IYazSocketObservable *socketObservable)
77 {
78     m_state = Closed;
79     m_cs = 0;
80     m_socketObservable = socketObservable;
81     m_PDU_Observer = 0;
82     m_queue_out = 0;
83     m_input_buf = 0;
84     m_input_len = 0;
85     m_children = 0;
86     m_parent = 0;
87     m_next = 0;
88     m_destroyed = 0;
89     m_idleTime = 0;
90     m_log = LOG_DEBUG;
91 }
92
93 IYaz_PDU_Observable *Yaz_PDU_Assoc::clone()
94 {
95     Yaz_PDU_Assoc *copy = new Yaz_PDU_Assoc(m_socketObservable);
96     return copy;
97 }
98
99 void Yaz_PDU_Assoc::socketNotify(int event)
100 {
101     logf (m_log, "Yaz_PDU_Assoc::socketNotify p=%p event = %d", this, event);
102     if (0 /* m_state == Connected */)
103     {
104         m_state = Ready;
105         m_socketObservable->maskObserver(this, YAZ_SOCKET_OBSERVE_READ|
106                                          YAZ_SOCKET_OBSERVE_EXCEPT);
107         m_PDU_Observer->connectNotify();
108         flush_PDU();
109     }
110     else if (m_state == Connecting)
111     {
112         if (event & YAZ_SOCKET_OBSERVE_READ)
113         {
114             close();
115             m_PDU_Observer->failNotify();
116         }
117         else if (event & YAZ_SOCKET_OBSERVE_TIMEOUT)
118         {
119             m_PDU_Observer->timeoutNotify();
120         }
121         else
122         {
123             m_state = Ready;
124             m_socketObservable->maskObserver(this, YAZ_SOCKET_OBSERVE_READ|
125                                              YAZ_SOCKET_OBSERVE_EXCEPT);
126             m_PDU_Observer->connectNotify();
127             flush_PDU();
128         }
129     }
130     else if (m_state == Listen)
131     {
132         if (event & YAZ_SOCKET_OBSERVE_READ)
133         {
134             int res;
135             COMSTACK new_line;
136             
137             if ((res = cs_listen(m_cs, 0, 0)) == 1)
138                 return;
139             if (res < 0)
140             {
141                 logf(LOG_FATAL|LOG_ERRNO, "cs_listen failed");
142                 return;
143             }
144             if (!(new_line = cs_accept(m_cs)))
145                 return;
146             /* 1. create socket-manager 
147                2. create pdu-assoc
148                3. create top-level object
149                     setup observer for child fileid in pdu-assoc
150                4. start thread
151             */
152             int fd = cs_fileno(new_line);
153             logf (m_log, "accept ok fd = %d", fd);
154             cs_fileno(new_line) = -1;  
155             cs_close (new_line);
156             childNotify(fd);
157         }
158     }
159     else if (m_state == Ready)
160     {
161         if (event & YAZ_SOCKET_OBSERVE_WRITE)
162         {
163             flush_PDU();
164         }
165         if (event & YAZ_SOCKET_OBSERVE_READ)
166         {
167             do
168             {
169                 int res = cs_get (m_cs, &m_input_buf, &m_input_len);
170                 if (res == 1)
171                     return;
172                 else if (res <= 0)
173                 {
174                     logf (m_log, "Connection closed by peer");
175                     close();
176                     m_PDU_Observer->failNotify(); // problem here..
177                     return;
178                 }
179                 // lock it, so we know if recv_PDU deletes it.
180                 int destroyed = 0;
181                 m_destroyed = &destroyed;
182
183                 m_PDU_Observer->recv_PDU(m_input_buf, res);
184                 m_destroyed = 0;
185                 if (destroyed)   // it really was destroyed, return now.
186                     return;
187             } while (m_cs && cs_more (m_cs));
188         }
189         if (event & YAZ_SOCKET_OBSERVE_TIMEOUT)
190         {
191             m_PDU_Observer->timeoutNotify();
192         }
193     }
194 }
195
196 void Yaz_PDU_Assoc::close()
197 {
198     m_socketObservable->deleteObserver(this);
199     m_state = Closed;
200     if (m_cs)
201     {
202         logf (m_log, "Yaz_PDU_Assoc::close fd=%d", cs_fileno(m_cs));
203         cs_close (m_cs);
204     }
205     m_cs = 0;
206     while (m_queue_out)
207     {
208         PDU_Queue *q_this = m_queue_out;
209         m_queue_out = m_queue_out->m_next;
210         delete q_this;
211     }
212     xfree (m_input_buf);
213     m_input_buf = 0;
214     m_input_len = 0;
215 }
216
217 void Yaz_PDU_Assoc::destroy()
218 {
219     close();
220     if (m_destroyed)
221         *m_destroyed = 1;
222     Yaz_PDU_Assoc **c;
223
224     // delete from parent's child list (if any)
225     if (m_parent)
226     {
227         c = &m_parent->m_children;
228         while (*c != this)
229         {
230             assert (*c);
231             c = &(*c)->m_next;
232         }
233         *c = (*c)->m_next;
234     }
235     // delete all children ...
236     c = &m_children;
237     while (*c)
238     {
239         Yaz_PDU_Assoc *here = *c;
240         *c = (*c)->m_next;
241         here->m_parent = 0;
242         delete here;
243     }
244 }
245
246 Yaz_PDU_Assoc::PDU_Queue::PDU_Queue(const char *buf, int len)
247 {
248     m_buf = (char *) malloc (len);
249     memcpy (m_buf, buf, len);
250     m_len = len;
251     m_next = 0;
252 }
253
254 Yaz_PDU_Assoc::PDU_Queue::~PDU_Queue()
255 {
256     free (m_buf);
257 }
258
259 int Yaz_PDU_Assoc::flush_PDU()
260 {
261     int r;
262     
263     logf (m_log, "Yaz_PDU_Assoc::flush_PDU");
264     if (m_state != Ready)
265     {
266         logf (m_log, "YAZ_PDU_Assoc::flush_PDU, not ready");
267         return 1;
268     }
269     PDU_Queue *q = m_queue_out;
270     if (!q)
271     {
272         m_socketObservable->maskObserver(this, YAZ_SOCKET_OBSERVE_READ|
273                                          YAZ_SOCKET_OBSERVE_EXCEPT);
274         return 0;
275     }
276     r = cs_put (m_cs, q->m_buf, q->m_len);
277     if (r < 0)
278     {
279         close();
280         m_PDU_Observer->failNotify();
281         return r;
282     }
283     if (r == 1)
284     {
285         m_socketObservable->maskObserver(this, YAZ_SOCKET_OBSERVE_READ|
286                                          YAZ_SOCKET_OBSERVE_EXCEPT|
287                                          YAZ_SOCKET_OBSERVE_WRITE);
288         logf (m_log, "Yaz_PDU_Assoc::flush_PDU put %d bytes (incomplete)",
289               q->m_len);
290         return r;
291     }
292     logf (m_log, "Yaz_PDU_Assoc::flush_PDU put %d bytes", q->m_len);
293     // whole packet sent... delete this and proceed to next ...
294     m_queue_out = q->m_next;
295     delete q;
296     // don't select on write if queue is empty ...
297     if (!m_queue_out)
298         m_socketObservable->maskObserver(this, YAZ_SOCKET_OBSERVE_READ|
299                                          YAZ_SOCKET_OBSERVE_EXCEPT);
300     return r;
301 }
302
303 int Yaz_PDU_Assoc::send_PDU(const char *buf, int len)
304 {
305     logf (m_log, "Yaz_PDU_Assoc::send_PDU");
306     PDU_Queue **pq = &m_queue_out;
307     int is_idle = (*pq ? 0 : 1);
308     
309     if (!m_cs)
310     {
311         logf (m_log, "Yaz_PDU_Assoc::send_PDU failed, m_cs == 0");
312         return -1;
313     }
314     while (*pq)
315         pq = &(*pq)->m_next;
316     *pq = new PDU_Queue(buf, len);
317     if (is_idle)
318         return flush_PDU ();
319     else
320         logf (m_log, "Yaz_PDU_Assoc::cannot send_PDU fd=%d",
321               cs_fileno(m_cs));
322     return 0;
323 }
324
325 COMSTACK Yaz_PDU_Assoc::comstack()
326 {
327     if (!m_cs)
328     {
329         CS_TYPE cs_type = tcpip_type;
330         m_cs = cs_create (cs_type, 0, PROTO_Z3950);
331     }
332     return m_cs;
333 }
334
335 void Yaz_PDU_Assoc::listen(IYaz_PDU_Observer *observer,
336                            const char *addr)
337 {
338     close();
339     void *ap;
340     COMSTACK cs = comstack();
341
342     logf (m_log, "Yaz_PDU_Assoc::listen %s", addr);
343     m_PDU_Observer = observer;
344     if (!cs)
345         return;
346     ap = cs_straddr (cs, addr);
347     if (!ap)
348         return;
349     if (cs_bind(cs, ap, CS_SERVER) < 0)
350         return;
351     m_socketObservable->addObserver(cs_fileno(cs), this);
352     m_socketObservable->maskObserver(this, YAZ_SOCKET_OBSERVE_READ|
353                                      YAZ_SOCKET_OBSERVE_EXCEPT);
354     logf (m_log, "Yaz_PDU_Assoc::listen ok fd=%d", cs_fileno(cs));
355     m_state = Listen;
356 }
357
358 void Yaz_PDU_Assoc::idleTime(int idleTime)
359 {
360     m_idleTime = idleTime;
361     logf (m_log, "Yaz_PDU_Assoc::idleTime(%d)", idleTime);
362     m_socketObservable->timeoutObserver(this, m_idleTime);
363 }
364
365 void Yaz_PDU_Assoc::connect(IYaz_PDU_Observer *observer,
366                             const char *addr)
367 {
368     logf (m_log, "Yaz_PDU_Assoc::connect %s", addr);
369     close();
370     m_PDU_Observer = observer;
371     COMSTACK cs = comstack();
372     void *ap = cs_straddr (cs, addr);
373     if (!ap)
374     {
375         logf (m_log, "cs_straddr failed");
376         return;
377     }
378     int res = cs_connect (cs, ap);
379     logf (m_log, "Yaz_PDU_Assoc::connect fd=%d res=%d", cs_fileno(cs), res);
380     m_socketObservable->addObserver(cs_fileno(cs), this);
381     m_socketObservable->maskObserver(this, YAZ_SOCKET_OBSERVE_READ|
382                                            YAZ_SOCKET_OBSERVE_EXCEPT|
383                                            YAZ_SOCKET_OBSERVE_WRITE);
384     m_state = Connecting;
385 }
386
387 void Yaz_PDU_Assoc::socket(IYaz_PDU_Observer *observer, int fd)
388 {
389     close();
390     m_PDU_Observer = observer;
391     if (fd >= 0)
392     {
393         CS_TYPE cs_type = tcpip_type;
394         m_cs = cs_createbysocket(fd, cs_type, 0, PROTO_Z3950);
395         m_state = Ready;
396         m_socketObservable->addObserver(fd, this);
397         m_socketObservable->maskObserver(this,
398                                          YAZ_SOCKET_OBSERVE_READ|
399                                          YAZ_SOCKET_OBSERVE_EXCEPT);
400         m_socketObservable->timeoutObserver(this, m_idleTime);
401     }
402 }
403
404 #if 1
405  // 1 = single-threaded
406  // 0 = multi-threaded
407
408 // Single-threaded... Only useful for non-blocking handlers
409 void Yaz_PDU_Assoc::childNotify(int fd)
410 {
411     // Clone PDU Observable (keep socket manager)
412     IYaz_PDU_Observable *new_observable = clone();
413
414     // Clone PDU Observer
415     IYaz_PDU_Observer *observer = m_PDU_Observer->clone(new_observable);
416
417     // Attach new socket to it
418     new_observable->socket(observer, fd);
419 }
420 #else
421
422 #include <yaz++/yaz-socket-manager.h>
423
424 #ifdef WIN32
425 #include <process.h>
426 #else
427 #include <pthread.h>
428 #endif
429
430 #ifdef WIN32
431 void __cdecl
432 #else
433 void *
434 #endif 
435     events(void *p)
436 {
437     Yaz_SocketManager *s = (Yaz_SocketManager *) p;
438
439     logf (LOG_LOG, "thread started");
440     while (s->processEvent() > 0)
441         ;
442     logf (LOG_LOG, "thread finished");
443 #ifdef WIN32
444 #else
445     return 0;
446 #endif
447 }
448
449 void Yaz_PDU_Assoc::childNotify(int fd)
450 {
451     Yaz_SocketManager *socket_observable = new Yaz_SocketManager;
452     Yaz_PDU_Assoc *new_observable = new Yaz_PDU_Assoc (socket_observable);
453     
454     /// Clone PDU Observer
455     IYaz_PDU_Observer *observer = m_PDU_Observer->clone(new_observable);
456     
457     /// Attach new socket to it
458     new_observable->socket(observer, fd);
459
460 #ifdef WIN32
461     long t_id;
462     t_id = _beginthread (events, 0, socket_observable);
463     if (t_id == -1)
464     {
465         logf (LOG_FATAL|LOG_ERRNO, "_beginthread failed");
466         exit (1);
467     }
468 #else
469     pthread_t type;
470
471     int id = pthread_create (&type, 0, events, socket_observable);
472     logf (LOG_LOG, "pthread_create returned id=%d", id);
473 #endif
474 }
475 // Threads end
476 #endif