Many improvements. Modified to proxy server to work with "sessions"
[yazpp-moved-to-github.git] / src / yaz-pdu-assoc.cpp
1 /*
2  * Copyright (c) 1998-1999, Index Data.
3  * See the file LICENSE for details.
4  * Sebastian Hammer, Adam Dickmeiss
5  * 
6  * $Log: yaz-pdu-assoc.cpp,v $
7  * Revision 1.7  1999-04-21 12:09:01  adam
8  * Many improvements. Modified to proxy server to work with "sessions"
9  * based on cookies.
10  *
11  * Revision 1.6  1999/04/20 10:30:05  adam
12  * Implemented various stuff for client and proxy. Updated calls
13  * to ODR to reflect new name parameter.
14  *
15  * Revision 1.5  1999/04/09 11:46:57  adam
16  * Added object Yaz_Z_Assoc. Much more functional client.
17  *
18  * Revision 1.4  1999/03/23 14:17:57  adam
19  * More work on timeout handling. Work on yaz-client.
20  *
21  * Revision 1.3  1999/02/02 14:01:20  adam
22  * First WIN32 port of YAZ++.
23  *
24  * Revision 1.2  1999/01/28 13:08:44  adam
25  * Yaz_PDU_Assoc better encapsulated. Memory leak fix in
26  * yaz-socket-manager.cc.
27  *
28  * Revision 1.1.1.1  1999/01/28 09:41:07  adam
29  * First implementation of YAZ++.
30  *
31  */
32
33 #include <assert.h>
34
35 #include <yaz-pdu-assoc.h>
36
37 #include <log.h>
38 #include <tcpip.h>
39
40 Yaz_PDU_Assoc::Yaz_PDU_Assoc(IYazSocketObservable *socketObservable,
41                              COMSTACK cs)
42 {
43     m_state = Closed;
44     m_cs = cs;
45     m_socketObservable = socketObservable;
46     m_PDU_Observer = 0;
47     m_queue_out = 0;
48     m_input_buf = 0;
49     m_input_len = 0;
50     m_children = 0;
51     m_parent = 0;
52     m_next = 0;
53     m_destroyed = 0;
54 }
55
56 IYaz_PDU_Observable *Yaz_PDU_Assoc::clone()
57 {
58     Yaz_PDU_Assoc *copy = new Yaz_PDU_Assoc(m_socketObservable, 0);
59     return copy;
60 }
61
62 Yaz_PDU_Assoc::~Yaz_PDU_Assoc()
63 {
64     destroy();
65 }
66
67 void Yaz_PDU_Assoc::socketNotify(int event)
68 {
69     logf (LOG_LOG, "Yaz_PDU_Assoc::socketNotify p=%p event = %d", this, event);
70     if (m_state == Connected)
71     {
72         m_state = Ready;
73         m_socketObservable->maskObserver(this, YAZ_SOCKET_OBSERVE_READ|
74                                          YAZ_SOCKET_OBSERVE_EXCEPT);
75         m_PDU_Observer->connectNotify();
76         flush_PDU();
77     }
78     else if (m_state == Connecting)
79     {
80         if (event & YAZ_SOCKET_OBSERVE_READ)
81         {
82             close();
83             m_PDU_Observer->failNotify();
84         }
85         else
86         {
87             m_state = Ready;
88             m_socketObservable->maskObserver(this, YAZ_SOCKET_OBSERVE_READ|
89                                              YAZ_SOCKET_OBSERVE_EXCEPT);
90             m_PDU_Observer->connectNotify();
91             flush_PDU();
92         }
93     }
94     else if (m_state == Listen)
95     {
96         if (event & YAZ_SOCKET_OBSERVE_READ)
97         {
98             int res;
99             COMSTACK new_line;
100             
101             if ((res = cs_listen(m_cs, 0, 0)) == 1)
102                 return;
103             if (res < 0)
104             {
105                 logf(LOG_FATAL, "cs_listen failed");
106                 return;
107             }
108             if (!(new_line = cs_accept(m_cs)))
109                 return;
110             
111             Yaz_PDU_Assoc *assoc = new Yaz_PDU_Assoc (m_socketObservable,
112                                                       new_line);
113             assoc->m_parent = this;
114             assoc->m_next = m_children;
115             m_children = assoc;
116             
117             assoc->m_PDU_Observer = m_PDU_Observer->clone(assoc);
118             assoc->m_state = Ready;
119             assoc->m_socketObservable->addObserver(cs_fileno(new_line), assoc);
120             assoc->m_socketObservable->maskObserver(assoc,
121                                                     YAZ_SOCKET_OBSERVE_READ|
122                                                     YAZ_SOCKET_OBSERVE_EXCEPT);
123             assoc->m_socketObservable->timeoutObserver(assoc,
124                                                        assoc->m_idleTime);
125         }
126     }
127     else if (m_state == Ready)
128     {
129         if (event & YAZ_SOCKET_OBSERVE_WRITE)
130         {
131             flush_PDU();
132         }
133         if (event & YAZ_SOCKET_OBSERVE_READ)
134         {
135             do
136             {
137                 int res = cs_get (m_cs, &m_input_buf, &m_input_len);
138                 if (res == 1)
139                     return;
140                 else if (res <= 0)
141                 {
142                     logf (LOG_LOG, "Connection closed by client");
143                     close();
144                     m_PDU_Observer->failNotify();
145                     return;
146                 }
147                 // lock it, so we know if recv_PDU deletes it.
148                 int destroyed = 0;
149                 m_destroyed = &destroyed;
150
151                 m_PDU_Observer->recv_PDU(m_input_buf, res);
152                 if (destroyed)   // it really was destroyed, return now.
153                     return;
154             } while (m_cs && cs_more (m_cs));
155         }
156         if (event & YAZ_SOCKET_OBSERVE_TIMEOUT)
157         {
158             m_PDU_Observer->timeoutNotify();
159         }
160     }
161 }
162
163 void Yaz_PDU_Assoc::close()
164 {
165     m_socketObservable->deleteObserver(this);
166     m_state = Closed;
167     if (m_cs)
168     {
169         logf (LOG_LOG, "Yaz_PDU_Assoc::close fd=%d", cs_fileno(m_cs));
170         cs_close (m_cs);
171     }
172     m_cs = 0;
173     while (m_queue_out)
174     {
175         PDU_Queue *q_this = m_queue_out;
176         m_queue_out = m_queue_out->m_next;
177         delete q_this;
178     }
179 //   free (m_input_buf);
180     m_input_buf = 0;
181     m_input_len = 0;
182 }
183
184 void Yaz_PDU_Assoc::destroy()
185 {
186     close();
187     if (m_destroyed)
188         *m_destroyed = 1;
189     Yaz_PDU_Assoc **c;
190
191     // delete from parent's child list (if any)
192     if (m_parent)
193     {
194         c = &m_parent->m_children;
195         while (*c != this)
196         {
197             assert (*c);
198             c = &(*c)->m_next;
199         }
200         *c = (*c)->m_next;
201     }
202     // delete all children ...
203     c = &m_children;
204     while (*c)
205     {
206         Yaz_PDU_Assoc *here = *c;
207         *c = (*c)->m_next;
208         here->m_parent = 0;
209         delete here;
210     }
211 }
212
213 Yaz_PDU_Assoc::PDU_Queue::PDU_Queue(const char *buf, int len)
214 {
215     m_buf = (char *) malloc (len);
216     memcpy (m_buf, buf, len);
217     m_len = len;
218     m_next = 0;
219 }
220
221 Yaz_PDU_Assoc::PDU_Queue::~PDU_Queue()
222 {
223     free (m_buf);
224 }
225
226 int Yaz_PDU_Assoc::flush_PDU()
227 {
228     int r;
229     
230     logf (LOG_LOG, "Yaz_PDU_Assoc::flush_PDU");
231     if (m_state != Ready)
232     {
233         return 1;
234     }
235     PDU_Queue *q = m_queue_out;
236     if (!q)
237     {
238         m_socketObservable->maskObserver(this, YAZ_SOCKET_OBSERVE_READ|
239                                          YAZ_SOCKET_OBSERVE_EXCEPT);
240         return 0;
241     }
242     r = cs_put (m_cs, q->m_buf, q->m_len);
243     if (r < 0)
244     {
245         close();
246         m_PDU_Observer->failNotify();
247         return r;
248     }
249     if (r == 1)
250     {
251         m_socketObservable->maskObserver(this, YAZ_SOCKET_OBSERVE_READ|
252                                          YAZ_SOCKET_OBSERVE_EXCEPT|
253                                          YAZ_SOCKET_OBSERVE_WRITE);
254         logf (LOG_LOG, "Yaz_PDU_Assoc::flush_PDU put %d bytes (incomplete)",
255               q->m_len);
256         return r;
257     }
258     logf (LOG_LOG, "Yaz_PDU_Assoc::flush_PDU put %d bytes", q->m_len);
259     // whole packet sent... delete this and proceed to next ...
260     m_queue_out = q->m_next;
261     delete q;
262     // don't select on write if queue is empty ...
263     if (!m_queue_out)
264         m_socketObservable->maskObserver(this, YAZ_SOCKET_OBSERVE_READ|
265                                          YAZ_SOCKET_OBSERVE_EXCEPT);
266     return r;
267 }
268
269 int Yaz_PDU_Assoc::send_PDU(const char *buf, int len)
270 {
271     logf (LOG_LOG, "Yaz_PDU_Assoc::send_PDU");
272     PDU_Queue **pq = &m_queue_out;
273     int is_idle = (*pq ? 0 : 1);
274     
275     if (!m_cs)
276     {
277         logf (LOG_LOG, "Yaz_PDU_Assoc::send_PDU failed, m_cs == 0");
278         return -1;
279     }
280     while (*pq)
281         pq = &(*pq)->m_next;
282     *pq = new PDU_Queue(buf, len);
283     if (is_idle)
284         return flush_PDU ();
285     else
286         logf (LOG_LOG, "Yaz_PDU_Assoc::cannot send_PDU fd=%d",
287               cs_fileno(m_cs));
288     return 0;
289 }
290
291 COMSTACK Yaz_PDU_Assoc::comstack()
292 {
293     if (!m_cs)
294     {
295         CS_TYPE cs_type = tcpip_type;
296         int protocol = PROTO_Z3950;
297         m_cs = cs_create (cs_type, 0, protocol);
298     }
299     return m_cs;
300 }
301
302 void Yaz_PDU_Assoc::listen(IYaz_PDU_Observer *observer,
303                            const char *addr)
304 {
305     close();
306     void *ap;
307     COMSTACK cs = comstack();
308
309     logf (LOG_LOG, "Yaz_PDU_Assoc::listen %s", addr);
310     m_PDU_Observer = observer;
311     if (!cs)
312         return;
313     ap = cs_straddr (cs, addr);
314     if (!ap)
315         return;
316     if (cs_bind(cs, ap, CS_SERVER) < 0)
317         return;
318     m_socketObservable->addObserver(cs_fileno(cs), this);
319     m_socketObservable->maskObserver(this, YAZ_SOCKET_OBSERVE_READ|
320                                      YAZ_SOCKET_OBSERVE_EXCEPT);
321     m_state = Listen;
322 }
323
324 void Yaz_PDU_Assoc::idleTime(int idleTime)
325 {
326     m_idleTime = idleTime;
327     logf (LOG_LOG, "Yaz_PDU_Assoc::idleTime(%d)", idleTime);
328     m_socketObservable->timeoutObserver(this, m_idleTime);
329 }
330
331 void Yaz_PDU_Assoc::connect(IYaz_PDU_Observer *observer,
332                             const char *addr)
333 {
334     logf (LOG_LOG, "Yaz_PDU_Assoc::connect %s", addr);
335     close();
336     m_PDU_Observer = observer;
337     COMSTACK cs = comstack();
338     void *ap = cs_straddr (cs, addr);
339     if (!ap)
340     {
341         logf (LOG_LOG, "cs_straddr failed");
342         return;
343     }
344     int res = cs_connect (cs, ap);
345     if (res < 0)
346     {
347         logf (LOG_LOG, "Yaz_PDU_Assoc::connect failed");
348 #if 1
349         logf (LOG_LOG, "Yaz_PDU_Assoc::connect fd=%d", cs_fileno(cs));
350         m_socketObservable->addObserver(cs_fileno(cs), this);
351         m_socketObservable->maskObserver(this, YAZ_SOCKET_OBSERVE_READ|
352                                          YAZ_SOCKET_OBSERVE_EXCEPT|
353                                          YAZ_SOCKET_OBSERVE_WRITE);
354         m_state = Connecting;
355 #else
356         close ();
357 #endif
358     }
359     else
360     {
361         logf (LOG_LOG, "Yaz_PDU_Assoc::connect fd=%d", cs_fileno(cs));
362         m_socketObservable->addObserver(cs_fileno(cs), this);
363         m_socketObservable->maskObserver(this, YAZ_SOCKET_OBSERVE_READ|
364                                          YAZ_SOCKET_OBSERVE_EXCEPT|
365                                          YAZ_SOCKET_OBSERVE_WRITE);
366         if (res == 1)
367         {
368             logf (LOG_LOG, "Yaz_PDU_Assoc::connect pending");
369             m_state = Connecting;
370         }
371         else
372         {
373             logf (LOG_LOG, "Yaz_PDU_Assoc::Connect complete");
374             m_state = Connected;
375         }
376     }
377 }