Working on UrsulaRenewal, Request, and Update
[yazpp-moved-to-github.git] / src / yaz-pdu-assoc.cpp
1 /*
2  * Copyright (c) 1998-2001, Index Data.
3  * See the file LICENSE for details.
4  * 
5  * $Log: yaz-pdu-assoc.cpp,v $
6  * Revision 1.23  2001-03-26 14:43:49  adam
7  * New threaded PDU association.
8  *
9  * Revision 1.22  2001/01/29 11:18:24  adam
10  * Server sets OPTIONS search and present.
11  *
12  * Revision 1.21  2000/11/20 14:17:36  adam
13  * Yet another WIN32 fix for connect notify.
14  *
15  * Revision 1.20  2000/11/20 11:27:33  adam
16  * Fixes for connect operation (timeout and notify fix).
17  *
18  * Revision 1.19  2000/11/01 14:22:59  adam
19  * Added fd parameter for method IYaz_PDU_Observer::clone.
20  *
21  * Revision 1.18  2000/10/24 12:29:57  adam
22  * Fixed bug in proxy where a Yaz_ProxyClient could be owned by
23  * two Yaz_Proxy's (fatal).
24  *
25  * Revision 1.17  2000/10/11 11:58:16  adam
26  * Moved header files to include/yaz++. Switched to libtool and automake.
27  * Configure script creates yaz++-config script.
28  *
29  * Revision 1.16  2000/09/22 09:54:11  heikki
30  * minor
31  *
32  * Revision 1.15  2000/09/21 21:43:20  adam
33  * Better high-level server API.
34  *
35  * Revision 1.14  2000/09/12 12:09:53  adam
36  * More work on high-level server.
37  *
38  * Revision 1.13  2000/09/08 10:23:42  adam
39  * Added skeleton of yaz-z-server.
40  *
41  * Revision 1.12  2000/09/06 14:23:45  adam
42  * WIN32 updates.
43  *
44  * Revision 1.11  2000/09/04 08:29:22  adam
45  * Fixed memory leak(s). Added re-use of associations, rather than
46  * re-init, when maximum number of targets are in use.
47  *
48  * Revision 1.10  2000/08/10 08:42:42  adam
49  * Fixes for {set,get}_APDU_log.
50  *
51  * Revision 1.9  1999/12/06 13:52:45  adam
52  * Modified for new location of YAZ header files. Experimental threaded
53  * operation.
54  *
55  * Revision 1.8  1999/04/28 13:04:03  adam
56  * Fixed setting of proxy otherInfo so that database(s) are removed.
57  *
58  * Revision 1.7  1999/04/21 12:09:01  adam
59  * Many improvements. Modified to proxy server to work with "sessions"
60  * based on cookies.
61  *
62  * Revision 1.6  1999/04/20 10:30:05  adam
63  * Implemented various stuff for client and proxy. Updated calls
64  * to ODR to reflect new name parameter.
65  *g
66  * Revision 1.5  1999/04/09 11:46:57  adam
67  * Added object Yaz_Z_Assoc. Much more functional client.
68  *
69  * Revision 1.4  1999/03/23 14:17:57  adam
70  * More work on timeout handling. Work on yaz-client.
71  *
72  * Revision 1.3  1999/02/02 14:01:20  adam
73  * First WIN32 port of YAZ++.
74  *
75  * Revision 1.2  1999/01/28 13:08:44  adam
76  * Yaz_PDU_Assoc better encapsulated. Memory leak fix in
77  * yaz-socket-manager.cc.
78  *
79  * Revision 1.1.1.1  1999/01/28 09:41:07  adam
80  * First implementation of YAZ++.
81  *
82  */
83
84 #include <assert.h>
85 #include <string.h>
86 #include <yaz/log.h>
87 #include <yaz/tcpip.h>
88
89 #include <yaz++/yaz-pdu-assoc.h>
90
91
92 void Yaz_PDU_Assoc::init(IYazSocketObservable *socketObservable)
93 {
94     m_state = Closed;
95     m_cs = 0;
96     m_socketObservable = socketObservable;
97     m_PDU_Observer = 0;
98     m_queue_out = 0;
99     m_input_buf = 0;
100     m_input_len = 0;
101     m_children = 0;
102     m_parent = 0;
103     m_next = 0;
104     m_destroyed = 0;
105     m_idleTime = 0;
106     m_log = LOG_DEBUG;
107 }
108
109 Yaz_PDU_Assoc::Yaz_PDU_Assoc(IYazSocketObservable *socketObservable)
110 {
111     init (socketObservable);
112 }
113
114 Yaz_PDU_Assoc::Yaz_PDU_Assoc(IYazSocketObservable *socketObservable,
115                              COMSTACK cs)
116 {
117     init(socketObservable);
118     m_cs = cs;
119     unsigned mask = 0;
120     if (cs->io_pending & CS_WANT_WRITE)
121         mask |= YAZ_SOCKET_OBSERVE_WRITE;
122     if (cs->io_pending & CS_WANT_READ)
123         mask |= YAZ_SOCKET_OBSERVE_READ;
124     m_socketObservable->addObserver(cs_fileno(cs), this);
125     if (!mask)
126     {
127         yaz_log (m_log, "new PDU_Assoc. Ready");
128         m_state = Ready;
129         flush_PDU();
130     }
131     else
132     {
133         yaz_log (m_log, "new PDU_Assoc. Accepting");
134         // assume comstack is accepting...
135         m_state = Accepting;
136         m_socketObservable->addObserver(cs_fileno(cs), this);
137         m_socketObservable->maskObserver(this,
138                                          mask |YAZ_SOCKET_OBSERVE_EXCEPT);
139     }
140 }
141
142
143 IYaz_PDU_Observable *Yaz_PDU_Assoc::clone()
144 {
145     Yaz_PDU_Assoc *copy = new Yaz_PDU_Assoc(m_socketObservable);
146     return copy;
147 }
148
149 void Yaz_PDU_Assoc::socketNotify(int event)
150 {
151     yaz_log (m_log, "Yaz_PDU_Assoc::socketNotify p=%p state=%d event = %d",
152           this, m_state, event);
153     if (event & YAZ_SOCKET_OBSERVE_EXCEPT)
154     {
155         close();
156         m_PDU_Observer->failNotify();
157         return;
158     }
159     else if (event & YAZ_SOCKET_OBSERVE_TIMEOUT)
160     {
161         m_PDU_Observer->timeoutNotify();
162         return;
163     }
164     switch (m_state)
165     {
166     case Accepting:
167         if (!cs_accept (m_cs))
168         {
169             yaz_log (m_log, "Yaz_PDU_Assoc::cs_accept failed");
170             m_cs = 0;
171             close();
172             m_PDU_Observer->failNotify();
173         }
174         else
175         {
176             unsigned mask = 0;
177             if (m_cs->io_pending & CS_WANT_WRITE)
178                 mask |= YAZ_SOCKET_OBSERVE_WRITE;
179             if (m_cs->io_pending & CS_WANT_READ)
180                 mask |= YAZ_SOCKET_OBSERVE_READ;
181             if (!mask)
182             {   // accept is complete. turn to ready state and write if needed
183                 m_state = Ready;
184                 flush_PDU();
185             }
186             else  
187             {   // accept still incomplete.
188                 m_socketObservable->maskObserver(this,
189                                              mask|YAZ_SOCKET_OBSERVE_EXCEPT);
190             }
191         }
192         break;
193     case Connecting:
194         if (event & YAZ_SOCKET_OBSERVE_READ && 
195             event & YAZ_SOCKET_OBSERVE_WRITE)
196         {
197             // For Unix: if both read and write is set, then connect failed.
198             close();
199             m_PDU_Observer->failNotify();
200         }
201         else
202         {
203             yaz_log (m_log, "cs_connect again");
204             int res = cs_connect (m_cs, 0);
205             if (res == 1)
206             {
207                 unsigned mask = YAZ_SOCKET_OBSERVE_EXCEPT;
208                 if (m_cs->io_pending & CS_WANT_WRITE)
209                     mask |= YAZ_SOCKET_OBSERVE_WRITE;
210                 if (m_cs->io_pending & CS_WANT_READ)
211                     mask |= YAZ_SOCKET_OBSERVE_READ;
212                 m_socketObservable->maskObserver(this, mask);
213             }
214             else
215             {
216                 m_state = Ready;
217                 m_PDU_Observer->connectNotify();
218                 flush_PDU();
219             }
220         }
221         break;
222     case Listen:
223         if (event & YAZ_SOCKET_OBSERVE_READ)
224         {
225             int res;
226             COMSTACK new_line;
227             
228             if ((res = cs_listen(m_cs, 0, 0)) == 1)
229                 return;
230             if (res < 0)
231             {
232                 yaz_log(LOG_FATAL|LOG_ERRNO, "cs_listen failed");
233                 return;
234             }
235             if (!(new_line = cs_accept(m_cs)))
236                 return;
237             /* 1. create socket-manager 
238                2. create pdu-assoc
239                3. create top-level object
240                     setup observer for child fileid in pdu-assoc
241                4. start thread
242             */
243             childNotify (new_line);
244         }
245         break;
246     case Writing:
247         if (event & (YAZ_SOCKET_OBSERVE_READ|YAZ_SOCKET_OBSERVE_WRITE))
248             flush_PDU();
249         break;
250     case Ready:
251         if (event & (YAZ_SOCKET_OBSERVE_READ|YAZ_SOCKET_OBSERVE_WRITE))
252         {
253             do
254             {
255                 int res = cs_get (m_cs, &m_input_buf, &m_input_len);
256                 if (res == 1)
257                 {
258                     unsigned mask = YAZ_SOCKET_OBSERVE_EXCEPT;
259                     if (m_cs->io_pending & CS_WANT_WRITE)
260                         mask |= YAZ_SOCKET_OBSERVE_WRITE;
261                     if (m_cs->io_pending & CS_WANT_READ)
262                         mask |= YAZ_SOCKET_OBSERVE_READ;
263                     m_socketObservable->maskObserver(this, mask);
264                     return;
265                 }
266                 else if (res <= 0)
267                 {
268                     yaz_log (m_log, "Yaz_PDU_Assoc::Connection closed by peer");
269                     close();
270                     m_PDU_Observer->failNotify(); // problem here..
271                     return;
272                 }
273                 // lock it, so we know if recv_PDU deletes it.
274                 int destroyed = 0;
275                 m_destroyed = &destroyed;
276
277                 m_PDU_Observer->recv_PDU(m_input_buf, res);
278                 m_destroyed = 0;
279                 if (destroyed)   // it really was destroyed, return now.
280                     return;
281             } while (m_cs && cs_more (m_cs));
282             if (m_cs)
283                 m_socketObservable->maskObserver(this,
284                                                  YAZ_SOCKET_OBSERVE_EXCEPT|
285                                                  YAZ_SOCKET_OBSERVE_READ);
286         }
287         break;
288     case Closed:
289         logf (m_log, "CLOSING state=%d event was %d", m_state, event);
290         close();
291         m_PDU_Observer->failNotify();
292         break;
293     default:
294         logf (m_log, "Unknown state=%d event was %d", m_state, event);
295         close();
296         m_PDU_Observer->failNotify();
297     }
298 }
299
300 void Yaz_PDU_Assoc::close()
301 {
302     m_socketObservable->deleteObserver(this);
303     m_state = Closed;
304     if (m_cs)
305     {
306         logf (m_log, "Yaz_PDU_Assoc::close fd=%d", cs_fileno(m_cs));
307         cs_close (m_cs);
308     }
309     m_cs = 0;
310     while (m_queue_out)
311     {
312         PDU_Queue *q_this = m_queue_out;
313         m_queue_out = m_queue_out->m_next;
314         delete q_this;
315     }
316     xfree (m_input_buf);
317     m_input_buf = 0;
318     m_input_len = 0;
319 }
320
321 void Yaz_PDU_Assoc::destroy()
322 {
323     close();
324     if (m_destroyed)
325         *m_destroyed = 1;
326     Yaz_PDU_Assoc **c;
327
328     // delete from parent's child list (if any)
329     if (m_parent)
330     {
331         c = &m_parent->m_children;
332         while (*c != this)
333         {
334             assert (*c);
335             c = &(*c)->m_next;
336         }
337         *c = (*c)->m_next;
338     }
339     // delete all children ...
340     c = &m_children;
341     while (*c)
342     {
343         Yaz_PDU_Assoc *here = *c;
344         *c = (*c)->m_next;
345         here->m_parent = 0;
346         delete here;
347     }
348 }
349
350 Yaz_PDU_Assoc::PDU_Queue::PDU_Queue(const char *buf, int len)
351 {
352     m_buf = (char *) malloc (len);
353     memcpy (m_buf, buf, len);
354     m_len = len;
355     m_next = 0;
356 }
357
358 Yaz_PDU_Assoc::PDU_Queue::~PDU_Queue()
359 {
360     free (m_buf);
361 }
362
363 int Yaz_PDU_Assoc::flush_PDU()
364 {
365     int r;
366     
367     if (m_state != Ready && m_state != Writing)
368     {
369         logf (m_log, "YAZ_PDU_Assoc::flush_PDU, not ready");
370         return 1;
371     }
372     PDU_Queue *q = m_queue_out;
373     if (!q)
374     {
375         m_state = Ready;
376         logf (m_log, "YAZ_PDU_Assoc::flush_PDU queue empty");
377         m_socketObservable->maskObserver(this, YAZ_SOCKET_OBSERVE_READ|
378                                          YAZ_SOCKET_OBSERVE_WRITE|
379                                          YAZ_SOCKET_OBSERVE_EXCEPT);
380         return 0;
381     }
382     r = cs_put (m_cs, q->m_buf, q->m_len);
383     if (r < 0)
384     {
385         logf (m_log, "Yaz_PDU_Assoc::flush_PDU cs_put failed");
386         close();
387         m_PDU_Observer->failNotify();
388         return r;
389     }
390     if (r == 1)
391     {
392         unsigned mask = YAZ_SOCKET_OBSERVE_EXCEPT;
393         m_state = Writing;
394         if (m_cs->io_pending & CS_WANT_WRITE)
395             mask |= YAZ_SOCKET_OBSERVE_WRITE;
396         if (m_cs->io_pending & CS_WANT_READ)
397             mask |= YAZ_SOCKET_OBSERVE_READ;
398  
399         m_socketObservable->maskObserver(this, mask);
400         logf (m_log, "Yaz_PDU_Assoc::flush_PDU cs_put %d bytes (incomplete)",
401               q->m_len);
402         return r;
403     } 
404     m_state = Ready;
405     logf (m_log, "Yaz_PDU_Assoc::flush_PDU cs_put %d bytes", q->m_len);
406     // whole packet sent... delete this and proceed to next ...
407     m_queue_out = q->m_next;
408     delete q;
409     // don't select on write if queue is empty ...
410     if (!m_queue_out)
411         m_socketObservable->maskObserver(this, YAZ_SOCKET_OBSERVE_READ|
412                                          YAZ_SOCKET_OBSERVE_EXCEPT);
413     return r;
414 }
415
416 int Yaz_PDU_Assoc::send_PDU(const char *buf, int len)
417 {
418     logf (m_log, "Yaz_PDU_Assoc::send_PDU");
419     PDU_Queue **pq = &m_queue_out;
420     int is_idle = (*pq ? 0 : 1);
421     
422     if (!m_cs)
423     {
424         logf (m_log, "Yaz_PDU_Assoc::send_PDU failed, m_cs == 0");
425         return -1;
426     }
427     while (*pq)
428         pq = &(*pq)->m_next;
429     *pq = new PDU_Queue(buf, len);
430     if (is_idle)
431         return flush_PDU ();
432     else
433         logf (m_log, "Yaz_PDU_Assoc::cannot send_PDU fd=%d",
434               cs_fileno(m_cs));
435     return 0;
436 }
437
438 COMSTACK Yaz_PDU_Assoc::comstack(const char *type_and_host, void **vp)
439 {
440     return cs_create_host(type_and_host, 0, vp);
441 }
442
443 void Yaz_PDU_Assoc::listen(IYaz_PDU_Observer *observer,
444                            const char *addr)
445 {
446     close();
447
448     logf (LOG_LOG, "Adding listener %s", addr);
449
450     m_PDU_Observer = observer;
451     void *ap;
452     m_cs = comstack(addr, &ap);
453
454     if (!m_cs)
455         return;
456     if (cs_bind(m_cs, ap, CS_SERVER) < 0)
457         return;
458     m_socketObservable->addObserver(cs_fileno(m_cs), this);
459     m_socketObservable->maskObserver(this, YAZ_SOCKET_OBSERVE_READ|
460                                      YAZ_SOCKET_OBSERVE_EXCEPT);
461     logf (m_log, "Yaz_PDU_Assoc::listen ok fd=%d", cs_fileno(m_cs));
462     m_state = Listen;
463 }
464
465 void Yaz_PDU_Assoc::idleTime(int idleTime)
466 {
467     m_idleTime = idleTime;
468     logf (m_log, "Yaz_PDU_Assoc::idleTime(%d)", idleTime);
469     m_socketObservable->timeoutObserver(this, m_idleTime);
470 }
471
472 void Yaz_PDU_Assoc::connect(IYaz_PDU_Observer *observer,
473                             const char *addr)
474 {
475     logf (m_log, "Yaz_PDU_Assoc::connect %s", addr);
476     close();
477     m_PDU_Observer = observer;
478     void *ap;
479     m_cs = comstack(addr, &ap);
480     int res = cs_connect (m_cs, ap);
481     logf (m_log, "Yaz_PDU_Assoc::connect fd=%d res=%d", cs_fileno(m_cs), res);
482     m_socketObservable->addObserver(cs_fileno(m_cs), this);
483
484     if (res >= 0)
485     {   // Connect pending or complet
486         m_state = Connecting;
487         unsigned mask = YAZ_SOCKET_OBSERVE_EXCEPT;
488         if (m_cs->io_pending & CS_WANT_WRITE)
489             mask |= YAZ_SOCKET_OBSERVE_WRITE;
490         if (m_cs->io_pending & CS_WANT_READ)
491             mask |= YAZ_SOCKET_OBSERVE_READ;
492         m_socketObservable->maskObserver(this, mask);
493     }
494     else
495     {   // Connect failed immediately
496         // Since m_state is Closed we can distinguish this case from
497         // normal connect in socketNotify handler
498         m_socketObservable->maskObserver(this, YAZ_SOCKET_OBSERVE_WRITE|
499                                          YAZ_SOCKET_OBSERVE_EXCEPT);
500     }
501 }
502
503 // Single-threaded... Only useful for non-blocking handlers
504 void Yaz_PDU_Assoc::childNotify(COMSTACK cs)
505 {
506     Yaz_PDU_Assoc *new_observable =
507         new Yaz_PDU_Assoc (m_socketObservable, cs);
508     
509     // Clone PDU Observer
510     new_observable->m_PDU_Observer = m_PDU_Observer->sessionNotify
511         (new_observable, cs_fileno(cs));
512 }