Implemented various stuff for client and proxy. Updated calls
[yazpp-moved-to-github.git] / src / yaz-pdu-assoc.cpp
1 /*
2  * Copyright (c) 1998-1999, Index Data.
3  * See the file LICENSE for details.
4  * Sebastian Hammer, Adam Dickmeiss
5  * 
6  * $Log: yaz-pdu-assoc.cpp,v $
7  * Revision 1.6  1999-04-20 10:30:05  adam
8  * Implemented various stuff for client and proxy. Updated calls
9  * to ODR to reflect new name parameter.
10  *
11  * Revision 1.5  1999/04/09 11:46:57  adam
12  * Added object Yaz_Z_Assoc. Much more functional client.
13  *
14  * Revision 1.4  1999/03/23 14:17:57  adam
15  * More work on timeout handling. Work on yaz-client.
16  *
17  * Revision 1.3  1999/02/02 14:01:20  adam
18  * First WIN32 port of YAZ++.
19  *
20  * Revision 1.2  1999/01/28 13:08:44  adam
21  * Yaz_PDU_Assoc better encapsulated. Memory leak fix in
22  * yaz-socket-manager.cc.
23  *
24  * Revision 1.1.1.1  1999/01/28 09:41:07  adam
25  * First implementation of YAZ++.
26  *
27  */
28
29 #include <assert.h>
30
31 #include <yaz-pdu-assoc.h>
32
33 #include <log.h>
34 #include <tcpip.h>
35
36 Yaz_PDU_Assoc::Yaz_PDU_Assoc(IYazSocketObservable *socketObservable,
37                              COMSTACK cs)
38 {
39     m_state = Closed;
40     m_cs = cs;
41     m_socketObservable = socketObservable;
42     m_PDU_Observer = 0;
43     m_queue_out = 0;
44     m_input_buf = 0;
45     m_input_len = 0;
46     m_children = 0;
47     m_parent = 0;
48     m_next = 0;
49     m_destroyed = 0;
50 }
51
52 IYaz_PDU_Observable *Yaz_PDU_Assoc::clone()
53 {
54     Yaz_PDU_Assoc *copy = new Yaz_PDU_Assoc(m_socketObservable, 0);
55     return copy;
56 }
57
58 Yaz_PDU_Assoc::~Yaz_PDU_Assoc()
59 {
60     destroy();
61 }
62
63 void Yaz_PDU_Assoc::socketNotify(int event)
64 {
65     logf (LOG_LOG, "Yaz_PDU_Assoc::socketNotify p=%p event = %d", this, event);
66     if (m_state == Connected)
67     {
68         m_state = Ready;
69         m_socketObservable->maskObserver(this, YAZ_SOCKET_OBSERVE_READ|
70                                          YAZ_SOCKET_OBSERVE_EXCEPT);
71         m_PDU_Observer->connectNotify();
72         flush_PDU();
73     }
74     else if (m_state == Connecting)
75     {
76         if (event & YAZ_SOCKET_OBSERVE_READ)
77         {
78             close();
79             m_PDU_Observer->failNotify();
80         }
81         else
82         {
83             m_state = Ready;
84             m_socketObservable->maskObserver(this, YAZ_SOCKET_OBSERVE_READ|
85                                              YAZ_SOCKET_OBSERVE_EXCEPT);
86             m_PDU_Observer->connectNotify();
87             flush_PDU();
88         }
89     }
90     else if (m_state == Listen)
91     {
92         if (event & YAZ_SOCKET_OBSERVE_READ)
93         {
94             int res;
95             COMSTACK new_line;
96             
97             if ((res = cs_listen(m_cs, 0, 0)) == 1)
98                 return;
99             if (res < 0)
100             {
101                 logf(LOG_FATAL, "cs_listen failed");
102                 return;
103             }
104             if (!(new_line = cs_accept(m_cs)))
105                 return;
106             
107             Yaz_PDU_Assoc *assoc = new Yaz_PDU_Assoc (m_socketObservable,
108                                                       new_line);
109             assoc->m_parent = this;
110             assoc->m_next = m_children;
111             m_children = assoc;
112             
113             assoc->m_PDU_Observer = m_PDU_Observer->clone(assoc);
114             assoc->m_state = Ready;
115             assoc->m_socketObservable->addObserver(cs_fileno(new_line), assoc);
116             assoc->m_socketObservable->maskObserver(assoc,
117                                                     YAZ_SOCKET_OBSERVE_READ|
118                                                     YAZ_SOCKET_OBSERVE_EXCEPT);
119             if (m_idleTime)
120                 assoc->m_socketObservable->timeoutObserver(assoc, m_idleTime);
121         }
122     }
123     else if (m_state == Ready)
124     {
125         if (event & YAZ_SOCKET_OBSERVE_WRITE)
126         {
127             flush_PDU();
128         }
129         if (event & YAZ_SOCKET_OBSERVE_READ)
130         {
131             do
132             {
133                 int res = cs_get (m_cs, &m_input_buf, &m_input_len);
134                 if (res == 1)
135                     return;
136                 else if (res <= 0)
137                 {
138                     logf (LOG_LOG, "Connection closed by client");
139                     close();
140                     m_PDU_Observer->failNotify();
141                     return;
142                 }
143                 // lock it, so we know if recv_PDU deletes it.
144                 int destroyed = 0;
145                 m_destroyed = &destroyed;
146
147                 m_PDU_Observer->recv_PDU(m_input_buf, res);
148                 if (destroyed)   // it really was destroyed, return now.
149                     return;
150             } while (m_cs && cs_more (m_cs));
151         }
152         if (event & YAZ_SOCKET_OBSERVE_TIMEOUT)
153         {
154             m_PDU_Observer->timeoutNotify();
155         }
156     }
157 }
158
159 void Yaz_PDU_Assoc::close()
160 {
161     m_socketObservable->deleteObserver(this);
162     m_state = Closed;
163     if (m_cs)
164     {
165         logf (LOG_LOG, "Yaz_PDU_Assoc::close fd=%d", cs_fileno(m_cs));
166         cs_close (m_cs);
167     }
168     m_cs = 0;
169     while (m_queue_out)
170     {
171         PDU_Queue *q_this = m_queue_out;
172         m_queue_out = m_queue_out->m_next;
173         delete q_this;
174     }
175 //   free (m_input_buf);
176     m_input_buf = 0;
177     m_input_len = 0;
178 }
179
180 void Yaz_PDU_Assoc::destroy()
181 {
182     close();
183     if (m_destroyed)
184         *m_destroyed = 1;
185     Yaz_PDU_Assoc **c;
186
187     // delete from parent's child list (if any)
188     if (m_parent)
189     {
190         c = &m_parent->m_children;
191         while (*c != this)
192         {
193             assert (*c);
194             c = &(*c)->m_next;
195         }
196         *c = (*c)->m_next;
197     }
198     // delete all children ...
199     c = &m_children;
200     while (*c)
201     {
202         Yaz_PDU_Assoc *here = *c;
203         *c = (*c)->m_next;
204         here->m_parent = 0;
205         delete here;
206     }
207 }
208
209 Yaz_PDU_Assoc::PDU_Queue::PDU_Queue(const char *buf, int len)
210 {
211     m_buf = (char *) malloc (len);
212     memcpy (m_buf, buf, len);
213     m_len = len;
214     m_next = 0;
215 }
216
217 Yaz_PDU_Assoc::PDU_Queue::~PDU_Queue()
218 {
219     free (m_buf);
220 }
221
222 int Yaz_PDU_Assoc::flush_PDU()
223 {
224     int r;
225     
226     logf (LOG_LOG, "Yaz_PDU_Assoc::flush_PDU");
227     if (m_state != Ready)
228     {
229         return 1;
230     }
231     PDU_Queue *q = m_queue_out;
232     if (!q)
233     {
234         m_socketObservable->maskObserver(this, YAZ_SOCKET_OBSERVE_READ|
235                                          YAZ_SOCKET_OBSERVE_EXCEPT);
236         return 0;
237     }
238     r = cs_put (m_cs, q->m_buf, q->m_len);
239     if (r < 0)
240     {
241         close();
242         m_PDU_Observer->failNotify();
243         return r;
244     }
245     if (r == 1)
246     {
247         m_socketObservable->maskObserver(this, YAZ_SOCKET_OBSERVE_READ|
248                                          YAZ_SOCKET_OBSERVE_EXCEPT|
249                                          YAZ_SOCKET_OBSERVE_WRITE);
250         logf (LOG_LOG, "Yaz_PDU_Assoc::flush_PDU put %d bytes (incomplete)",
251               q->m_len);
252         return r;
253     }
254     logf (LOG_LOG, "Yaz_PDU_Assoc::flush_PDU put %d bytes", q->m_len);
255     // whole packet sent... delete this and proceed to next ...
256     m_queue_out = q->m_next;
257     delete q;
258     // don't select on write if queue is empty ...
259     if (!m_queue_out)
260         m_socketObservable->maskObserver(this, YAZ_SOCKET_OBSERVE_READ|
261                                          YAZ_SOCKET_OBSERVE_EXCEPT);
262     return r;
263 }
264
265 int Yaz_PDU_Assoc::send_PDU(const char *buf, int len)
266 {
267     logf (LOG_LOG, "Yaz_PDU_Assoc::send_PDU");
268     PDU_Queue **pq = &m_queue_out;
269     int is_idle = (*pq ? 0 : 1);
270     
271     if (!m_cs)
272     {
273         logf (LOG_LOG, "Yaz_PDU_Assoc::send_PDU failed, m_cs == 0");
274         return 0;
275     }
276     while (*pq)
277         pq = &(*pq)->m_next;
278     *pq = new PDU_Queue(buf, len);
279     if (is_idle)
280         return flush_PDU ();
281     else
282         logf (LOG_LOG, "Yaz_PDU_Assoc::cannot send_PDU fd=%d",
283               cs_fileno(m_cs));
284     return 0;
285 }
286
287 COMSTACK Yaz_PDU_Assoc::comstack()
288 {
289     if (!m_cs)
290     {
291         CS_TYPE cs_type = tcpip_type;
292         int protocol = PROTO_Z3950;
293         m_cs = cs_create (cs_type, 0, protocol);
294     }
295     return m_cs;
296 }
297
298 void Yaz_PDU_Assoc::listen(IYaz_PDU_Observer *observer,
299                            const char *addr)
300 {
301     close();
302     void *ap;
303     COMSTACK cs = comstack();
304
305     logf (LOG_LOG, "Yaz_PDU_Assoc::listen %s", addr);
306     m_PDU_Observer = observer;
307     if (!cs)
308         return;
309     ap = cs_straddr (cs, addr);
310     if (!ap)
311         return;
312     if (cs_bind(cs, ap, CS_SERVER) < 0)
313         return;
314     m_socketObservable->addObserver(cs_fileno(cs), this);
315     m_socketObservable->maskObserver(this, YAZ_SOCKET_OBSERVE_READ|
316                                      YAZ_SOCKET_OBSERVE_EXCEPT);
317     m_state = Listen;
318 }
319
320 void Yaz_PDU_Assoc::idleTime(int idleTime)
321 {
322     m_idleTime = idleTime;
323 }
324
325 void Yaz_PDU_Assoc::connect(IYaz_PDU_Observer *observer,
326                             const char *addr)
327 {
328     logf (LOG_LOG, "Yaz_PDU_Assoc::connect %s", addr);
329     close();
330     m_PDU_Observer = observer;
331     COMSTACK cs = comstack();
332     void *ap = cs_straddr (cs, addr);
333     if (!ap)
334     {
335         logf (LOG_LOG, "cs_straddr failed");
336         return;
337     }
338     int res = cs_connect (cs, ap);
339     if (res < 0)
340     {
341         logf (LOG_LOG, "Yaz_PDU_Assoc::connect failed");
342 #if 1
343         logf (LOG_LOG, "Yaz_PDU_Assoc::connect fd=%d", cs_fileno(cs));
344         m_socketObservable->addObserver(cs_fileno(cs), this);
345         m_socketObservable->maskObserver(this, YAZ_SOCKET_OBSERVE_READ|
346                                          YAZ_SOCKET_OBSERVE_EXCEPT|
347                                          YAZ_SOCKET_OBSERVE_WRITE);
348         m_state = Connecting;
349 #else
350         close ();
351 #endif
352     }
353     else
354     {
355         logf (LOG_LOG, "Yaz_PDU_Assoc::connect fd=%d", cs_fileno(cs));
356         m_socketObservable->addObserver(cs_fileno(cs), this);
357         m_socketObservable->maskObserver(this, YAZ_SOCKET_OBSERVE_READ|
358                                          YAZ_SOCKET_OBSERVE_EXCEPT|
359                                          YAZ_SOCKET_OBSERVE_WRITE);
360         if (res == 1)
361         {
362             logf (LOG_LOG, "Yaz_PDU_Assoc::connect pending");
363             m_state = Connecting;
364         }
365         else
366         {
367             logf (LOG_LOG, "Yaz_PDU_Assoc::Connect complete");
368             m_state = Connected;
369         }
370     }
371 }