Better high-level server API.
[yazpp-moved-to-github.git] / src / yaz-pdu-assoc.cpp
1 /*
2  * Copyright (c) 1998-2000, Index Data.
3  * See the file LICENSE for details.
4  * 
5  * $Log: yaz-pdu-assoc.cpp,v $
6  * Revision 1.15  2000-09-21 21:43:20  adam
7  * Better high-level server API.
8  *
9  * Revision 1.14  2000/09/12 12:09:53  adam
10  * More work on high-level server.
11  *
12  * Revision 1.13  2000/09/08 10:23:42  adam
13  * Added skeleton of yaz-z-server.
14  *
15  * Revision 1.12  2000/09/06 14:23:45  adam
16  * WIN32 updates.
17  *
18  * Revision 1.11  2000/09/04 08:29:22  adam
19  * Fixed memory leak(s). Added re-use of associations, rather than
20  * re-init, when maximum number of targets are in use.
21  *
22  * Revision 1.10  2000/08/10 08:42:42  adam
23  * Fixes for {set,get}_APDU_log.
24  *
25  * Revision 1.9  1999/12/06 13:52:45  adam
26  * Modified for new location of YAZ header files. Experimental threaded
27  * operation.
28  *
29  * Revision 1.8  1999/04/28 13:04:03  adam
30  * Fixed setting of proxy otherInfo so that database(s) are removed.
31  *
32  * Revision 1.7  1999/04/21 12:09:01  adam
33  * Many improvements. Modified to proxy server to work with "sessions"
34  * based on cookies.
35  *
36  * Revision 1.6  1999/04/20 10:30:05  adam
37  * Implemented various stuff for client and proxy. Updated calls
38  * to ODR to reflect new name parameter.
39  *g
40  * Revision 1.5  1999/04/09 11:46:57  adam
41  * Added object Yaz_Z_Assoc. Much more functional client.
42  *
43  * Revision 1.4  1999/03/23 14:17:57  adam
44  * More work on timeout handling. Work on yaz-client.
45  *
46  * Revision 1.3  1999/02/02 14:01:20  adam
47  * First WIN32 port of YAZ++.
48  *
49  * Revision 1.2  1999/01/28 13:08:44  adam
50  * Yaz_PDU_Assoc better encapsulated. Memory leak fix in
51  * yaz-socket-manager.cc.
52  *
53  * Revision 1.1.1.1  1999/01/28 09:41:07  adam
54  * First implementation of YAZ++.
55  *
56  */
57
58 #include <assert.h>
59
60 #include <yaz-pdu-assoc.h>
61
62 #include <yaz/log.h>
63 #include <yaz/tcpip.h>
64
65 Yaz_PDU_Assoc::Yaz_PDU_Assoc(IYazSocketObservable *socketObservable)
66 {
67     m_state = Closed;
68     m_cs = 0;
69     m_socketObservable = socketObservable;
70     m_PDU_Observer = 0;
71     m_queue_out = 0;
72     m_input_buf = 0;
73     m_input_len = 0;
74     m_children = 0;
75     m_parent = 0;
76     m_next = 0;
77     m_destroyed = 0;
78     m_idleTime = 0;
79     m_log = LOG_DEBUG;
80 }
81
82 IYaz_PDU_Observable *Yaz_PDU_Assoc::clone()
83 {
84     Yaz_PDU_Assoc *copy = new Yaz_PDU_Assoc(m_socketObservable);
85     return copy;
86 }
87
88 void Yaz_PDU_Assoc::socketNotify(int event)
89 {
90     logf (m_log, "Yaz_PDU_Assoc::socketNotify p=%p event = %d", this, event);
91     if (0 /* m_state == Connected */)
92     {
93         m_state = Ready;
94         m_socketObservable->maskObserver(this, YAZ_SOCKET_OBSERVE_READ|
95                                          YAZ_SOCKET_OBSERVE_EXCEPT);
96         m_PDU_Observer->connectNotify();
97         flush_PDU();
98     }
99     else if (m_state == Connecting)
100     {
101         if (event & YAZ_SOCKET_OBSERVE_READ)
102         {
103             close();
104             m_PDU_Observer->failNotify();
105         }
106         else if (event & YAZ_SOCKET_OBSERVE_TIMEOUT)
107         {
108             m_PDU_Observer->timeoutNotify();
109         }
110         else
111         {
112             m_state = Ready;
113             m_socketObservable->maskObserver(this, YAZ_SOCKET_OBSERVE_READ|
114                                              YAZ_SOCKET_OBSERVE_EXCEPT);
115             m_PDU_Observer->connectNotify();
116             flush_PDU();
117         }
118     }
119     else if (m_state == Listen)
120     {
121         if (event & YAZ_SOCKET_OBSERVE_READ)
122         {
123             int res;
124             COMSTACK new_line;
125             
126             if ((res = cs_listen(m_cs, 0, 0)) == 1)
127                 return;
128             if (res < 0)
129             {
130                 logf(LOG_FATAL|LOG_ERRNO, "cs_listen failed");
131                 return;
132             }
133             if (!(new_line = cs_accept(m_cs)))
134                 return;
135             /* 1. create socket-manager 
136                2. create pdu-assoc
137                3. create top-level object
138                     setup observer for child fileid in pdu-assoc
139                4. start thread
140             */
141             int fd = cs_fileno(new_line);
142             logf (m_log, "accept ok fd = %d", fd);
143             cs_fileno(new_line) = -1;  
144             cs_close (new_line);
145             childNotify(fd);
146         }
147     }
148     else if (m_state == Ready)
149     {
150         if (event & YAZ_SOCKET_OBSERVE_WRITE)
151         {
152             flush_PDU();
153         }
154         if (event & YAZ_SOCKET_OBSERVE_READ)
155         {
156             do
157             {
158                 int res = cs_get (m_cs, &m_input_buf, &m_input_len);
159                 if (res == 1)
160                     return;
161                 else if (res <= 0)
162                 {
163                     logf (m_log, "Connection closed by peer");
164                     close();
165                     m_PDU_Observer->failNotify();
166                     return;
167                 }
168                 // lock it, so we know if recv_PDU deletes it.
169                 int destroyed = 0;
170                 m_destroyed = &destroyed;
171
172                 m_PDU_Observer->recv_PDU(m_input_buf, res);
173                 m_destroyed = 0;
174                 if (destroyed)   // it really was destroyed, return now.
175                     return;
176             } while (m_cs && cs_more (m_cs));
177         }
178         if (event & YAZ_SOCKET_OBSERVE_TIMEOUT)
179         {
180             m_PDU_Observer->timeoutNotify();
181         }
182     }
183 }
184
185 void Yaz_PDU_Assoc::close()
186 {
187     m_socketObservable->deleteObserver(this);
188     m_state = Closed;
189     if (m_cs)
190     {
191         logf (m_log, "Yaz_PDU_Assoc::close fd=%d", cs_fileno(m_cs));
192         cs_close (m_cs);
193     }
194     m_cs = 0;
195     while (m_queue_out)
196     {
197         PDU_Queue *q_this = m_queue_out;
198         m_queue_out = m_queue_out->m_next;
199         delete q_this;
200     }
201     xfree (m_input_buf);
202     m_input_buf = 0;
203     m_input_len = 0;
204 }
205
206 void Yaz_PDU_Assoc::destroy()
207 {
208     close();
209     if (m_destroyed)
210         *m_destroyed = 1;
211     Yaz_PDU_Assoc **c;
212
213     // delete from parent's child list (if any)
214     if (m_parent)
215     {
216         c = &m_parent->m_children;
217         while (*c != this)
218         {
219             assert (*c);
220             c = &(*c)->m_next;
221         }
222         *c = (*c)->m_next;
223     }
224     // delete all children ...
225     c = &m_children;
226     while (*c)
227     {
228         Yaz_PDU_Assoc *here = *c;
229         *c = (*c)->m_next;
230         here->m_parent = 0;
231         delete here;
232     }
233 }
234
235 Yaz_PDU_Assoc::PDU_Queue::PDU_Queue(const char *buf, int len)
236 {
237     m_buf = (char *) malloc (len);
238     memcpy (m_buf, buf, len);
239     m_len = len;
240     m_next = 0;
241 }
242
243 Yaz_PDU_Assoc::PDU_Queue::~PDU_Queue()
244 {
245     free (m_buf);
246 }
247
248 int Yaz_PDU_Assoc::flush_PDU()
249 {
250     int r;
251     
252     logf (m_log, "Yaz_PDU_Assoc::flush_PDU");
253     if (m_state != Ready)
254     {
255         logf (m_log, "YAZ_PDU_Assoc::flush_PDU, not ready");
256         return 1;
257     }
258     PDU_Queue *q = m_queue_out;
259     if (!q)
260     {
261         m_socketObservable->maskObserver(this, YAZ_SOCKET_OBSERVE_READ|
262                                          YAZ_SOCKET_OBSERVE_EXCEPT);
263         return 0;
264     }
265     r = cs_put (m_cs, q->m_buf, q->m_len);
266     if (r < 0)
267     {
268         close();
269         m_PDU_Observer->failNotify();
270         return r;
271     }
272     if (r == 1)
273     {
274         m_socketObservable->maskObserver(this, YAZ_SOCKET_OBSERVE_READ|
275                                          YAZ_SOCKET_OBSERVE_EXCEPT|
276                                          YAZ_SOCKET_OBSERVE_WRITE);
277         logf (m_log, "Yaz_PDU_Assoc::flush_PDU put %d bytes (incomplete)",
278               q->m_len);
279         return r;
280     }
281     logf (m_log, "Yaz_PDU_Assoc::flush_PDU put %d bytes", q->m_len);
282     // whole packet sent... delete this and proceed to next ...
283     m_queue_out = q->m_next;
284     delete q;
285     // don't select on write if queue is empty ...
286     if (!m_queue_out)
287         m_socketObservable->maskObserver(this, YAZ_SOCKET_OBSERVE_READ|
288                                          YAZ_SOCKET_OBSERVE_EXCEPT);
289     return r;
290 }
291
292 int Yaz_PDU_Assoc::send_PDU(const char *buf, int len)
293 {
294     logf (m_log, "Yaz_PDU_Assoc::send_PDU");
295     PDU_Queue **pq = &m_queue_out;
296     int is_idle = (*pq ? 0 : 1);
297     
298     if (!m_cs)
299     {
300         logf (m_log, "Yaz_PDU_Assoc::send_PDU failed, m_cs == 0");
301         return -1;
302     }
303     while (*pq)
304         pq = &(*pq)->m_next;
305     *pq = new PDU_Queue(buf, len);
306     if (is_idle)
307         return flush_PDU ();
308     else
309         logf (m_log, "Yaz_PDU_Assoc::cannot send_PDU fd=%d",
310               cs_fileno(m_cs));
311     return 0;
312 }
313
314 COMSTACK Yaz_PDU_Assoc::comstack()
315 {
316     if (!m_cs)
317     {
318         CS_TYPE cs_type = tcpip_type;
319         m_cs = cs_create (cs_type, 0, PROTO_Z3950);
320     }
321     return m_cs;
322 }
323
324 void Yaz_PDU_Assoc::listen(IYaz_PDU_Observer *observer,
325                            const char *addr)
326 {
327     close();
328     void *ap;
329     COMSTACK cs = comstack();
330
331     logf (m_log, "Yaz_PDU_Assoc::listen %s", addr);
332     m_PDU_Observer = observer;
333     if (!cs)
334         return;
335     ap = cs_straddr (cs, addr);
336     if (!ap)
337         return;
338     if (cs_bind(cs, ap, CS_SERVER) < 0)
339         return;
340     m_socketObservable->addObserver(cs_fileno(cs), this);
341     m_socketObservable->maskObserver(this, YAZ_SOCKET_OBSERVE_READ|
342                                      YAZ_SOCKET_OBSERVE_EXCEPT);
343     logf (m_log, "Yaz_PDU_Assoc::listen ok fd=%d", cs_fileno(cs));
344     m_state = Listen;
345 }
346
347 void Yaz_PDU_Assoc::idleTime(int idleTime)
348 {
349     m_idleTime = idleTime;
350     logf (m_log, "Yaz_PDU_Assoc::idleTime(%d)", idleTime);
351     m_socketObservable->timeoutObserver(this, m_idleTime);
352 }
353
354 void Yaz_PDU_Assoc::connect(IYaz_PDU_Observer *observer,
355                             const char *addr)
356 {
357     logf (m_log, "Yaz_PDU_Assoc::connect %s", addr);
358     close();
359     m_PDU_Observer = observer;
360     COMSTACK cs = comstack();
361     void *ap = cs_straddr (cs, addr);
362     if (!ap)
363     {
364         logf (m_log, "cs_straddr failed");
365         return;
366     }
367     int res = cs_connect (cs, ap);
368     logf (m_log, "Yaz_PDU_Assoc::connect fd=%d res=%d", cs_fileno(cs), res);
369     m_socketObservable->addObserver(cs_fileno(cs), this);
370     m_socketObservable->maskObserver(this, YAZ_SOCKET_OBSERVE_READ|
371                                            YAZ_SOCKET_OBSERVE_EXCEPT|
372                                            YAZ_SOCKET_OBSERVE_WRITE);
373     m_state = Connecting;
374 }
375
376 void Yaz_PDU_Assoc::socket(IYaz_PDU_Observer *observer, int fd)
377 {
378     close();
379     m_PDU_Observer = observer;
380     if (fd >= 0)
381     {
382         CS_TYPE cs_type = tcpip_type;
383         m_cs = cs_createbysocket(fd, cs_type, 0, PROTO_Z3950);
384         m_state = Ready;
385         m_socketObservable->addObserver(fd, this);
386         m_socketObservable->maskObserver(this,
387                                          YAZ_SOCKET_OBSERVE_READ|
388                                          YAZ_SOCKET_OBSERVE_EXCEPT);
389         m_socketObservable->timeoutObserver(this, m_idleTime);
390     }
391 }
392
393 #if 1
394
395 // Single-threaded... Only useful for non-blocking handlers
396 void Yaz_PDU_Assoc::childNotify(int fd)
397 {
398     // Clone PDU Observable (keep socket manager)
399     IYaz_PDU_Observable *new_observable = clone();
400
401     // Clone PDU Observer
402     IYaz_PDU_Observer *observer = m_PDU_Observer->clone(new_observable);
403
404     // Attach new socket to it
405     new_observable->socket(observer, fd);
406 }
407 #else
408
409 #include <yaz-socket-manager.h>
410
411 #ifdef WIN32
412 #include <process.h>
413 #else
414 #include <pthread.h>
415 #endif
416
417 #ifdef WIN32
418 void __cdecl
419 #else
420 void *
421 #endif 
422     events(void *p)
423 {
424     Yaz_SocketManager *s = (Yaz_SocketManager *) p;
425
426     logf (LOG_LOG, "thread started");
427     while (s->processEvent() > 0)
428         ;
429     logf (LOG_LOG, "thread finished");
430 #ifdef WIN32
431 #else
432     return 0;
433 #endif
434 }
435
436 void Yaz_PDU_Assoc::childNotify(int fd)
437 {
438     Yaz_SocketManager *socket_observable = new Yaz_SocketManager;
439     Yaz_PDU_Assoc *new_observable = new Yaz_PDU_Assoc (socket_observable);
440     
441     /// Clone PDU Observer
442     IYaz_PDU_Observer *observer = m_PDU_Observer->clone(new_observable);
443     
444     /// Attach new socket to it
445     new_observable->socket(observer, fd);
446
447 #ifdef WIN32
448     long t_id;
449     t_id = _beginthread (events, 0, socket_observable);
450     if (t_id == -1)
451     {
452         logf (LOG_FATAL|LOG_ERRNO, "_beginthread failed");
453         exit (1);
454     }
455 #else
456     pthread_t type;
457
458     int id = pthread_create (&type, 0, events, socket_observable);
459     logf (LOG_LOG, "pthread_create returned id=%d", id);
460 #endif
461 }
462 // Threads end
463 #endif