Added skeleton of yaz-z-server.
[yazpp-moved-to-github.git] / src / yaz-pdu-assoc.cpp
1 /*
2  * Copyright (c) 1998-2000, Index Data.
3  * See the file LICENSE for details.
4  * 
5  * $Log: yaz-pdu-assoc.cpp,v $
6  * Revision 1.13  2000-09-08 10:23:42  adam
7  * Added skeleton of yaz-z-server.
8  *
9  * Revision 1.12  2000/09/06 14:23:45  adam
10  * WIN32 updates.
11  *
12  * Revision 1.11  2000/09/04 08:29:22  adam
13  * Fixed memory leak(s). Added re-use of associations, rather than
14  * re-init, when maximum number of targets are in use.
15  *
16  * Revision 1.10  2000/08/10 08:42:42  adam
17  * Fixes for {set,get}_APDU_log.
18  *
19  * Revision 1.9  1999/12/06 13:52:45  adam
20  * Modified for new location of YAZ header files. Experimental threaded
21  * operation.
22  *
23  * Revision 1.8  1999/04/28 13:04:03  adam
24  * Fixed setting of proxy otherInfo so that database(s) are removed.
25  *
26  * Revision 1.7  1999/04/21 12:09:01  adam
27  * Many improvements. Modified to proxy server to work with "sessions"
28  * based on cookies.
29  *
30  * Revision 1.6  1999/04/20 10:30:05  adam
31  * Implemented various stuff for client and proxy. Updated calls
32  * to ODR to reflect new name parameter.
33  *
34  * Revision 1.5  1999/04/09 11:46:57  adam
35  * Added object Yaz_Z_Assoc. Much more functional client.
36  *
37  * Revision 1.4  1999/03/23 14:17:57  adam
38  * More work on timeout handling. Work on yaz-client.
39  *
40  * Revision 1.3  1999/02/02 14:01:20  adam
41  * First WIN32 port of YAZ++.
42  *
43  * Revision 1.2  1999/01/28 13:08:44  adam
44  * Yaz_PDU_Assoc better encapsulated. Memory leak fix in
45  * yaz-socket-manager.cc.
46  *
47  * Revision 1.1.1.1  1999/01/28 09:41:07  adam
48  * First implementation of YAZ++.
49  *
50  */
51
52 #include <assert.h>
53
54 #include <yaz-pdu-assoc.h>
55
56 #include <yaz/log.h>
57 #include <yaz/tcpip.h>
58
59 Yaz_PDU_Assoc::Yaz_PDU_Assoc(IYazSocketObservable *socketObservable)
60 {
61     m_state = Closed;
62     m_cs = 0;
63     m_socketObservable = socketObservable;
64     m_PDU_Observer = 0;
65     m_queue_out = 0;
66     m_input_buf = 0;
67     m_input_len = 0;
68     m_children = 0;
69     m_parent = 0;
70     m_next = 0;
71     m_destroyed = 0;
72     m_idleTime = 0;
73     m_log = LOG_DEBUG;
74 }
75
76 IYaz_PDU_Observable *Yaz_PDU_Assoc::clone()
77 {
78     Yaz_PDU_Assoc *copy = new Yaz_PDU_Assoc(m_socketObservable);
79     return copy;
80 }
81
82 Yaz_PDU_Assoc::~Yaz_PDU_Assoc()
83 {
84     destroy();
85 }
86
87 void Yaz_PDU_Assoc::socketNotify(int event)
88 {
89     logf (m_log, "Yaz_PDU_Assoc::socketNotify p=%p event = %d", this, event);
90     if (0 /* m_state == Connected */)
91     {
92         m_state = Ready;
93         m_socketObservable->maskObserver(this, YAZ_SOCKET_OBSERVE_READ|
94                                          YAZ_SOCKET_OBSERVE_EXCEPT);
95         m_PDU_Observer->connectNotify();
96         flush_PDU();
97     }
98     else if (m_state == Connecting)
99     {
100         if (event & YAZ_SOCKET_OBSERVE_READ)
101         {
102             close();
103             m_PDU_Observer->failNotify();
104         }
105         else if (event & YAZ_SOCKET_OBSERVE_TIMEOUT)
106         {
107             m_PDU_Observer->timeoutNotify();
108         }
109         else
110         {
111             m_state = Ready;
112             m_socketObservable->maskObserver(this, YAZ_SOCKET_OBSERVE_READ|
113                                              YAZ_SOCKET_OBSERVE_EXCEPT);
114             m_PDU_Observer->connectNotify();
115             flush_PDU();
116         }
117     }
118     else if (m_state == Listen)
119     {
120         if (event & YAZ_SOCKET_OBSERVE_READ)
121         {
122             int res;
123             COMSTACK new_line;
124             
125             if ((res = cs_listen(m_cs, 0, 0)) == 1)
126                 return;
127             if (res < 0)
128             {
129                 logf(LOG_FATAL|LOG_ERRNO, "cs_listen failed");
130                 return;
131             }
132             if (!(new_line = cs_accept(m_cs)))
133                 return;
134             
135             /* 1. create socket-manager 
136                2. create pdu-assoc
137                3. create top-level object
138                     setup observer for child fileid in pdu-assoc
139                4. start thread
140             */
141             int fd = cs_fileno(new_line);
142             cs_fileno(new_line) = -1;  
143             cs_close (new_line);        /* potential problem ... */
144 #if 1
145             childNotify(fd);
146 #else
147             Yaz_PDU_Assoc *assoc = new Yaz_PDU_Assoc (m_socketObservable);
148             assoc->m_parent = this;
149             assoc->m_next = m_children;
150             m_children = assoc;
151  
152             assoc->m_PDU_Observer = m_PDU_Observer->clone(assoc);
153             socket(fd);
154 #endif
155         }
156     }
157     else if (m_state == Ready)
158     {
159         if (event & YAZ_SOCKET_OBSERVE_WRITE)
160         {
161             flush_PDU();
162         }
163         if (event & YAZ_SOCKET_OBSERVE_READ)
164         {
165             do
166             {
167                 int res = cs_get (m_cs, &m_input_buf, &m_input_len);
168                 if (res == 1)
169                     return;
170                 else if (res <= 0)
171                 {
172                     logf (m_log, "Connection closed by peer");
173                     close();
174                     m_PDU_Observer->failNotify();
175                     return;
176                 }
177                 // lock it, so we know if recv_PDU deletes it.
178                 int destroyed = 0;
179                 m_destroyed = &destroyed;
180
181                 m_PDU_Observer->recv_PDU(m_input_buf, res);
182                 m_destroyed = 0;
183                 if (destroyed)   // it really was destroyed, return now.
184                     return;
185             } while (m_cs && cs_more (m_cs));
186         }
187         if (event & YAZ_SOCKET_OBSERVE_TIMEOUT)
188         {
189             m_PDU_Observer->timeoutNotify();
190         }
191     }
192 }
193
194 void Yaz_PDU_Assoc::close()
195 {
196     m_socketObservable->deleteObserver(this);
197     m_state = Closed;
198     if (m_cs)
199     {
200         logf (m_log, "Yaz_PDU_Assoc::close fd=%d", cs_fileno(m_cs));
201         cs_close (m_cs);
202     }
203     m_cs = 0;
204     while (m_queue_out)
205     {
206         PDU_Queue *q_this = m_queue_out;
207         m_queue_out = m_queue_out->m_next;
208         delete q_this;
209     }
210     xfree (m_input_buf);
211     m_input_buf = 0;
212     m_input_len = 0;
213 }
214
215 void Yaz_PDU_Assoc::destroy()
216 {
217     close();
218     if (m_destroyed)
219         *m_destroyed = 1;
220     Yaz_PDU_Assoc **c;
221
222     // delete from parent's child list (if any)
223     if (m_parent)
224     {
225         c = &m_parent->m_children;
226         while (*c != this)
227         {
228             assert (*c);
229             c = &(*c)->m_next;
230         }
231         *c = (*c)->m_next;
232     }
233     // delete all children ...
234     c = &m_children;
235     while (*c)
236     {
237         Yaz_PDU_Assoc *here = *c;
238         *c = (*c)->m_next;
239         here->m_parent = 0;
240         delete here;
241     }
242 }
243
244 Yaz_PDU_Assoc::PDU_Queue::PDU_Queue(const char *buf, int len)
245 {
246     m_buf = (char *) malloc (len);
247     memcpy (m_buf, buf, len);
248     m_len = len;
249     m_next = 0;
250 }
251
252 Yaz_PDU_Assoc::PDU_Queue::~PDU_Queue()
253 {
254     free (m_buf);
255 }
256
257 int Yaz_PDU_Assoc::flush_PDU()
258 {
259     int r;
260     
261     logf (m_log, "Yaz_PDU_Assoc::flush_PDU");
262     if (m_state != Ready)
263     {
264         logf (m_log, "YAZ_PDU_Assoc::flush_PDU, not ready");
265         return 1;
266     }
267     PDU_Queue *q = m_queue_out;
268     if (!q)
269     {
270         m_socketObservable->maskObserver(this, YAZ_SOCKET_OBSERVE_READ|
271                                          YAZ_SOCKET_OBSERVE_EXCEPT);
272         return 0;
273     }
274     r = cs_put (m_cs, q->m_buf, q->m_len);
275     if (r < 0)
276     {
277         close();
278         m_PDU_Observer->failNotify();
279         return r;
280     }
281     if (r == 1)
282     {
283         m_socketObservable->maskObserver(this, YAZ_SOCKET_OBSERVE_READ|
284                                          YAZ_SOCKET_OBSERVE_EXCEPT|
285                                          YAZ_SOCKET_OBSERVE_WRITE);
286         logf (m_log, "Yaz_PDU_Assoc::flush_PDU put %d bytes (incomplete)",
287               q->m_len);
288         return r;
289     }
290     logf (m_log, "Yaz_PDU_Assoc::flush_PDU put %d bytes", q->m_len);
291     // whole packet sent... delete this and proceed to next ...
292     m_queue_out = q->m_next;
293     delete q;
294     // don't select on write if queue is empty ...
295     if (!m_queue_out)
296         m_socketObservable->maskObserver(this, YAZ_SOCKET_OBSERVE_READ|
297                                          YAZ_SOCKET_OBSERVE_EXCEPT);
298     return r;
299 }
300
301 int Yaz_PDU_Assoc::send_PDU(const char *buf, int len)
302 {
303     logf (m_log, "Yaz_PDU_Assoc::send_PDU");
304     PDU_Queue **pq = &m_queue_out;
305     int is_idle = (*pq ? 0 : 1);
306     
307     if (!m_cs)
308     {
309         logf (m_log, "Yaz_PDU_Assoc::send_PDU failed, m_cs == 0");
310         return -1;
311     }
312     while (*pq)
313         pq = &(*pq)->m_next;
314     *pq = new PDU_Queue(buf, len);
315     if (is_idle)
316         return flush_PDU ();
317     else
318         logf (m_log, "Yaz_PDU_Assoc::cannot send_PDU fd=%d",
319               cs_fileno(m_cs));
320     return 0;
321 }
322
323 COMSTACK Yaz_PDU_Assoc::comstack()
324 {
325     if (!m_cs)
326     {
327         CS_TYPE cs_type = tcpip_type;
328         m_cs = cs_create (cs_type, 0, PROTO_Z3950);
329     }
330     return m_cs;
331 }
332
333 void Yaz_PDU_Assoc::listen(IYaz_PDU_Observer *observer,
334                            const char *addr)
335 {
336     close();
337     void *ap;
338     COMSTACK cs = comstack();
339
340     logf (m_log, "Yaz_PDU_Assoc::listen %s", addr);
341     m_PDU_Observer = observer;
342     if (!cs)
343         return;
344     ap = cs_straddr (cs, addr);
345     if (!ap)
346         return;
347     if (cs_bind(cs, ap, CS_SERVER) < 0)
348         return;
349     m_socketObservable->addObserver(cs_fileno(cs), this);
350     m_socketObservable->maskObserver(this, YAZ_SOCKET_OBSERVE_READ|
351                                      YAZ_SOCKET_OBSERVE_EXCEPT);
352     m_state = Listen;
353 }
354
355 void Yaz_PDU_Assoc::idleTime(int idleTime)
356 {
357     m_idleTime = idleTime;
358     logf (m_log, "Yaz_PDU_Assoc::idleTime(%d)", idleTime);
359     m_socketObservable->timeoutObserver(this, m_idleTime);
360 }
361
362 void Yaz_PDU_Assoc::connect(IYaz_PDU_Observer *observer,
363                             const char *addr)
364 {
365     logf (m_log, "Yaz_PDU_Assoc::connect %s", addr);
366     close();
367     m_PDU_Observer = observer;
368     COMSTACK cs = comstack();
369     void *ap = cs_straddr (cs, addr);
370     if (!ap)
371     {
372         logf (m_log, "cs_straddr failed");
373         return;
374     }
375     int res = cs_connect (cs, ap);
376     logf (m_log, "Yaz_PDU_Assoc::connect fd=%d res=%d", cs_fileno(cs), res);
377     m_socketObservable->addObserver(cs_fileno(cs), this);
378     m_socketObservable->maskObserver(this, YAZ_SOCKET_OBSERVE_READ|
379                                            YAZ_SOCKET_OBSERVE_EXCEPT|
380                                            YAZ_SOCKET_OBSERVE_WRITE);
381     m_state = Connecting;
382 }
383
384 void Yaz_PDU_Assoc::socket(IYaz_PDU_Observer *observer, int fd)
385 {
386     close();
387     m_PDU_Observer = observer;
388     if (fd >= 0)
389     {
390         CS_TYPE cs_type = tcpip_type;
391         m_cs = cs_createbysocket(fd, cs_type, 0, PROTO_Z3950);
392         m_state = Ready;
393         m_socketObservable->addObserver(fd, this);
394         m_socketObservable->maskObserver(this,
395                                          YAZ_SOCKET_OBSERVE_READ|
396                                          YAZ_SOCKET_OBSERVE_EXCEPT);
397         m_socketObservable->timeoutObserver(this, m_idleTime);
398     }
399 }
400
401 #if 0
402
403 // Single-threaded... Only useful for non-blocking handlers
404 void Yaz_PDU_Assoc::childNotify(int fd)
405 {
406     /// Clone PDU Observable (keep socket manager)
407     IYaz_PDU_Observable *new_observable = clone();
408
409     /// Clone PDU Observer
410     IYaz_PDU_Observer *observer = m_PDU_Observer->clone(new_observable);
411
412     /// Attach new socket to it
413     new_observable->socket(observer, fd);
414 }
415 #else
416
417 #include <yaz-socket-manager.h>
418
419 #ifdef WIN32
420 #include <process.h>
421 #else
422 #include <pthread.h>
423 #endif
424
425 #ifdef WIN32
426 void __cdecl
427 #else
428 void *
429 #endif 
430     events(void *p)
431 {
432     Yaz_SocketManager *s = (Yaz_SocketManager *) p;
433
434     logf (LOG_LOG, "thread started");
435     while (s->processEvent() > 0)
436         ;
437     logf (LOG_LOG, "thread finished");
438 #ifdef WIN32
439 #else
440     return 0;
441 #endif
442 }
443
444 void Yaz_PDU_Assoc::childNotify(int fd)
445 {
446     Yaz_SocketManager *socket_observable = new Yaz_SocketManager;
447     Yaz_PDU_Assoc *new_observable = new Yaz_PDU_Assoc (socket_observable);
448     
449     /// Clone PDU Observer
450     IYaz_PDU_Observer *observer = m_PDU_Observer->clone(new_observable);
451     
452     /// Attach new socket to it
453     new_observable->socket(observer, fd);
454
455 #ifdef WIN32
456     long t_id;
457     t_id = _beginthread (events, 0, socket_observable);
458     if (t_id == -1)
459     {
460         logf (LOG_FATAL|LOG_ERRNO, "_beginthread failed");
461         exit (1);
462     }
463 #else
464     pthread_t type;
465
466     int id = pthread_create (&type, 0, events, socket_observable);
467     logf (LOG_LOG, "pthread_create returned id=%d", id);
468 #endif
469 }
470 // Threads end
471 #endif