More work on timeout handling. Work on yaz-client.
[yazpp-moved-to-github.git] / src / yaz-pdu-assoc.cpp
1 /*
2  * Copyright (c) 1998-1999, Index Data.
3  * See the file LICENSE for details.
4  * Sebastian Hammer, Adam Dickmeiss
5  * 
6  * $Log: yaz-pdu-assoc.cpp,v $
7  * Revision 1.4  1999-03-23 14:17:57  adam
8  * More work on timeout handling. Work on yaz-client.
9  *
10  * Revision 1.3  1999/02/02 14:01:20  adam
11  * First WIN32 port of YAZ++.
12  *
13  * Revision 1.2  1999/01/28 13:08:44  adam
14  * Yaz_PDU_Assoc better encapsulated. Memory leak fix in
15  * yaz-socket-manager.cc.
16  *
17  * Revision 1.1.1.1  1999/01/28 09:41:07  adam
18  * First implementation of YAZ++.
19  *
20  */
21
22 #include <assert.h>
23
24 #include <yaz-pdu-assoc.h>
25
26 #include <log.h>
27 #include <tcpip.h>
28
29 Yaz_PDU_Assoc::Yaz_PDU_Assoc(IYazSocketObservable *socketObservable,
30                              COMSTACK cs)
31 {
32     m_state = Closed;
33     m_cs = cs;
34     m_socketObservable = socketObservable;
35     m_PDU_Observer = 0;
36     m_queue_out = 0;
37     m_input_buf = 0;
38     m_input_len = 0;
39     m_children = 0;
40     m_parent = 0;
41     m_next = 0;
42     m_destroyed = 0;
43 }
44
45 IYaz_PDU_Observable *Yaz_PDU_Assoc::clone()
46 {
47     Yaz_PDU_Assoc *copy = new Yaz_PDU_Assoc(m_socketObservable, 0);
48     return copy;
49 }
50
51 Yaz_PDU_Assoc::~Yaz_PDU_Assoc()
52 {
53     destroy();
54 }
55
56 void Yaz_PDU_Assoc::socketNotify(int event)
57 {
58     logf (LOG_LOG, "socketNotify p=%p event = %d", this, event);
59     if (m_state == Connected)
60     {
61         m_state = Ready;
62         m_socketObservable->maskObserver(this, YAZ_SOCKET_OBSERVE_READ|
63                                          YAZ_SOCKET_OBSERVE_EXCEPT);
64         m_PDU_Observer->connectNotify();
65         flush_PDU();
66     }
67     else if (m_state == Connecting)
68     {
69         if (event & YAZ_SOCKET_OBSERVE_READ)
70         {
71             close();
72             m_PDU_Observer->failNotify();
73         }
74         else
75         {
76             m_state = Ready;
77             m_socketObservable->maskObserver(this, YAZ_SOCKET_OBSERVE_READ|
78                                              YAZ_SOCKET_OBSERVE_EXCEPT);
79             m_PDU_Observer->connectNotify();
80             flush_PDU();
81         }
82     }
83     else if (m_state == Listen)
84     {
85         if (event & YAZ_SOCKET_OBSERVE_READ)
86         {
87             int res;
88             COMSTACK new_line;
89             
90             if ((res = cs_listen(m_cs, 0, 0)) == 1)
91                 return;
92             if (res < 0)
93             {
94                 logf(LOG_FATAL, "cs_listen failed");
95                 return;
96             }
97             if (!(new_line = cs_accept(m_cs)))
98                 return;
99             
100             Yaz_PDU_Assoc *assoc = new Yaz_PDU_Assoc (m_socketObservable,
101                                                       new_line);
102             assoc->m_parent = this;
103             assoc->m_next = m_children;
104             m_children = assoc;
105             
106             assoc->m_PDU_Observer = m_PDU_Observer->clone(assoc);
107             assoc->m_state = Ready;
108             assoc->m_socketObservable->addObserver(cs_fileno(new_line), assoc);
109             assoc->m_socketObservable->maskObserver(assoc,
110                                                     YAZ_SOCKET_OBSERVE_READ|
111                                                     YAZ_SOCKET_OBSERVE_EXCEPT);
112             if (m_idleTime)
113                 assoc->m_socketObservable->timeoutObserver(assoc, m_idleTime);
114         }
115     }
116     else if (m_state == Ready)
117     {
118         if (event & YAZ_SOCKET_OBSERVE_WRITE)
119         {
120             flush_PDU();
121         }
122         if (event & YAZ_SOCKET_OBSERVE_READ)
123         {
124             do
125             {
126                 int res = cs_get (m_cs, &m_input_buf, &m_input_len);
127                 if (res == 1)
128                     return;
129                 else if (res <= 0)
130                 {
131                     logf (LOG_LOG, "Connection closed by client");
132                     close();
133                     m_PDU_Observer->failNotify();
134                     return;
135                 }
136                 // lock it, so we know if recv_PDU deletes it.
137                 int destroyed = 0;
138                 m_destroyed = &destroyed;
139
140                 m_PDU_Observer->recv_PDU(m_input_buf, res);
141                 if (destroyed)   // it really was destroyed, return now.
142                     return;
143             } while (m_cs && cs_more (m_cs));
144         }
145         if (event & YAZ_SOCKET_OBSERVE_TIMEOUT)
146         {
147             m_PDU_Observer->timeoutNotify();
148         }
149     }
150 }
151
152 void Yaz_PDU_Assoc::close()
153 {
154     m_socketObservable->deleteObserver(this);
155     m_state = Closed;
156     if (m_cs)
157     {
158         logf (LOG_LOG, "Yaz_PDU_Assoc::close fd=%d", cs_fileno(m_cs));
159         cs_close (m_cs);
160     }
161     m_cs = 0;
162     while (m_queue_out)
163     {
164         PDU_Queue *q_this = m_queue_out;
165         m_queue_out = m_queue_out->m_next;
166         delete q_this;
167     }
168 //   free (m_input_buf);
169     m_input_buf = 0;
170     m_input_len = 0;
171 }
172
173 void Yaz_PDU_Assoc::destroy()
174 {
175     close();
176     if (m_destroyed)
177         *m_destroyed = 1;
178     Yaz_PDU_Assoc **c;
179
180     // delete from parent's child list (if any)
181     if (m_parent)
182     {
183         c = &m_parent->m_children;
184         while (*c != this)
185         {
186             assert (*c);
187             c = &(*c)->m_next;
188         }
189         *c = (*c)->m_next;
190     }
191     // delete all children ...
192     c = &m_children;
193     while (*c)
194     {
195         Yaz_PDU_Assoc *here = *c;
196         *c = (*c)->m_next;
197         here->m_parent = 0;
198         delete here;
199     }
200 }
201
202 Yaz_PDU_Assoc::PDU_Queue::PDU_Queue(const char *buf, int len)
203 {
204     m_buf = (char *) malloc (len);
205     memcpy (m_buf, buf, len);
206     m_len = len;
207     m_next = 0;
208 }
209
210 Yaz_PDU_Assoc::PDU_Queue::~PDU_Queue()
211 {
212     free (m_buf);
213 }
214
215 int Yaz_PDU_Assoc::flush_PDU()
216 {
217     int r;
218
219     if (m_state != Ready)
220         return 1;
221     PDU_Queue *q = m_queue_out;
222     if (!q)
223     {
224         m_socketObservable->maskObserver(this, YAZ_SOCKET_OBSERVE_READ|
225                                          YAZ_SOCKET_OBSERVE_EXCEPT);
226         return 0;
227     }
228     r = cs_put (m_cs, q->m_buf, q->m_len);
229     if (r < 0)
230     {
231         close();
232         m_PDU_Observer->failNotify();
233         return r;
234     }
235     if (r == 1)
236     {
237         m_socketObservable->maskObserver(this, YAZ_SOCKET_OBSERVE_READ|
238                                          YAZ_SOCKET_OBSERVE_EXCEPT|
239                                          YAZ_SOCKET_OBSERVE_WRITE);
240         logf (LOG_LOG, "put %d bytes (incomplete write)", q->m_len);
241         return r;
242     }
243     logf (LOG_LOG, "put %d bytes fd=%d", q->m_len, cs_fileno(m_cs));
244     // whole packet sent... delete this and proceed to next ...
245     m_queue_out = q->m_next;
246     logf (LOG_LOG, "m_queue_out = %p", m_queue_out);
247     delete q;
248     // don't select on write if queue is empty ...
249     if (!m_queue_out)
250         m_socketObservable->maskObserver(this, YAZ_SOCKET_OBSERVE_READ|
251                                          YAZ_SOCKET_OBSERVE_EXCEPT);
252     return r;
253 }
254
255 int Yaz_PDU_Assoc::send_PDU(const char *buf, int len)
256 {
257     logf (LOG_LOG, "send_PDU");
258     PDU_Queue **pq = &m_queue_out;
259     int is_idle = (*pq ? 0 : 1);
260     
261     if (!m_cs)
262     {
263         logf (LOG_LOG, "send_PDU failed, m_cs == 0");
264         return 0;
265     }
266     while (*pq)
267         pq = &(*pq)->m_next;
268     *pq = new PDU_Queue(buf, len);
269     if (is_idle)
270     {
271         return flush_PDU ();
272     }
273     else
274     {
275         logf (LOG_LOG, "cannot send_PDU fd=%d", cs_fileno(m_cs));
276     }
277     return 0;
278 }
279
280 COMSTACK Yaz_PDU_Assoc::comstack()
281 {
282     if (!m_cs)
283     {
284         CS_TYPE cs_type = tcpip_type;
285         int protocol = PROTO_Z3950;
286         m_cs = cs_create (cs_type, 0, protocol);
287     }
288     return m_cs;
289 }
290
291 void Yaz_PDU_Assoc::listen(IYaz_PDU_Observer *observer,
292                            const char *addr)
293 {
294     close();
295     void *ap;
296     COMSTACK cs = comstack();
297
298     logf (LOG_LOG, "Yaz_PDU_Assoc::listen %s", addr);
299     m_PDU_Observer = observer;
300     if (!cs)
301         return;
302     ap = cs_straddr (cs, addr);
303     if (!ap)
304         return;
305     if (cs_bind(cs, ap, CS_SERVER) < 0)
306         return;
307     m_socketObservable->addObserver(cs_fileno(cs), this);
308     m_socketObservable->maskObserver(this, YAZ_SOCKET_OBSERVE_READ|
309                                      YAZ_SOCKET_OBSERVE_EXCEPT);
310     m_state = Listen;
311 }
312
313 void Yaz_PDU_Assoc::idleTime(int idleTime)
314 {
315     m_idleTime = idleTime;
316 }
317
318 void Yaz_PDU_Assoc::connect(IYaz_PDU_Observer *observer,
319                             const char *addr)
320 {
321     logf (LOG_LOG, "Yaz_PDU_Assoc::connect %s", addr);
322     close();
323     m_PDU_Observer = observer;
324     COMSTACK cs = comstack();
325     void *ap = cs_straddr (cs, addr);
326     if (!ap)
327     {
328         logf (LOG_LOG, "cs_straddr failed");
329         return;
330     }
331     int res = cs_connect (cs, ap);
332     if (res < 0)
333     {
334         logf (LOG_DEBUG, "Yaz_PDU_Assoc::connect failed");
335         close ();
336     }
337     else
338     {
339         logf (LOG_LOG, "Yaz_PDU_Assoc::connect fd=%d", cs_fileno(cs));
340         m_socketObservable->addObserver(cs_fileno(cs), this);
341         m_socketObservable->maskObserver(this, YAZ_SOCKET_OBSERVE_READ|
342                                          YAZ_SOCKET_OBSERVE_EXCEPT|
343                                          YAZ_SOCKET_OBSERVE_WRITE);
344         if (res == 1)
345             m_state = Connecting;
346         else
347             m_state = Connected;
348     }
349 }