Added object Yaz_Z_Assoc. Much more functional client.
[yazpp-moved-to-github.git] / src / yaz-pdu-assoc.cpp
1 /*
2  * Copyright (c) 1998-1999, Index Data.
3  * See the file LICENSE for details.
4  * Sebastian Hammer, Adam Dickmeiss
5  * 
6  * $Log: yaz-pdu-assoc.cpp,v $
7  * Revision 1.5  1999-04-09 11:46:57  adam
8  * Added object Yaz_Z_Assoc. Much more functional client.
9  *
10  * Revision 1.4  1999/03/23 14:17:57  adam
11  * More work on timeout handling. Work on yaz-client.
12  *
13  * Revision 1.3  1999/02/02 14:01:20  adam
14  * First WIN32 port of YAZ++.
15  *
16  * Revision 1.2  1999/01/28 13:08:44  adam
17  * Yaz_PDU_Assoc better encapsulated. Memory leak fix in
18  * yaz-socket-manager.cc.
19  *
20  * Revision 1.1.1.1  1999/01/28 09:41:07  adam
21  * First implementation of YAZ++.
22  *
23  */
24
25 #include <assert.h>
26
27 #include <yaz-pdu-assoc.h>
28
29 #include <log.h>
30 #include <tcpip.h>
31
32 Yaz_PDU_Assoc::Yaz_PDU_Assoc(IYazSocketObservable *socketObservable,
33                              COMSTACK cs)
34 {
35     m_state = Closed;
36     m_cs = cs;
37     m_socketObservable = socketObservable;
38     m_PDU_Observer = 0;
39     m_queue_out = 0;
40     m_input_buf = 0;
41     m_input_len = 0;
42     m_children = 0;
43     m_parent = 0;
44     m_next = 0;
45     m_destroyed = 0;
46 }
47
48 IYaz_PDU_Observable *Yaz_PDU_Assoc::clone()
49 {
50     Yaz_PDU_Assoc *copy = new Yaz_PDU_Assoc(m_socketObservable, 0);
51     return copy;
52 }
53
54 Yaz_PDU_Assoc::~Yaz_PDU_Assoc()
55 {
56     destroy();
57 }
58
59 void Yaz_PDU_Assoc::socketNotify(int event)
60 {
61     logf (LOG_LOG, "Yaz_PDU_Assoc::socketNotify p=%p event = %d", this, event);
62     if (m_state == Connected)
63     {
64         m_state = Ready;
65         m_socketObservable->maskObserver(this, YAZ_SOCKET_OBSERVE_READ|
66                                          YAZ_SOCKET_OBSERVE_EXCEPT);
67         m_PDU_Observer->connectNotify();
68         flush_PDU();
69     }
70     else if (m_state == Connecting)
71     {
72         if (event & YAZ_SOCKET_OBSERVE_READ)
73         {
74             close();
75             m_PDU_Observer->failNotify();
76         }
77         else
78         {
79             m_state = Ready;
80             m_socketObservable->maskObserver(this, YAZ_SOCKET_OBSERVE_READ|
81                                              YAZ_SOCKET_OBSERVE_EXCEPT);
82             m_PDU_Observer->connectNotify();
83             flush_PDU();
84         }
85     }
86     else if (m_state == Listen)
87     {
88         if (event & YAZ_SOCKET_OBSERVE_READ)
89         {
90             int res;
91             COMSTACK new_line;
92             
93             if ((res = cs_listen(m_cs, 0, 0)) == 1)
94                 return;
95             if (res < 0)
96             {
97                 logf(LOG_FATAL, "cs_listen failed");
98                 return;
99             }
100             if (!(new_line = cs_accept(m_cs)))
101                 return;
102             
103             Yaz_PDU_Assoc *assoc = new Yaz_PDU_Assoc (m_socketObservable,
104                                                       new_line);
105             assoc->m_parent = this;
106             assoc->m_next = m_children;
107             m_children = assoc;
108             
109             assoc->m_PDU_Observer = m_PDU_Observer->clone(assoc);
110             assoc->m_state = Ready;
111             assoc->m_socketObservable->addObserver(cs_fileno(new_line), assoc);
112             assoc->m_socketObservable->maskObserver(assoc,
113                                                     YAZ_SOCKET_OBSERVE_READ|
114                                                     YAZ_SOCKET_OBSERVE_EXCEPT);
115             if (m_idleTime)
116                 assoc->m_socketObservable->timeoutObserver(assoc, m_idleTime);
117         }
118     }
119     else if (m_state == Ready)
120     {
121         if (event & YAZ_SOCKET_OBSERVE_WRITE)
122         {
123             flush_PDU();
124         }
125         if (event & YAZ_SOCKET_OBSERVE_READ)
126         {
127             do
128             {
129                 int res = cs_get (m_cs, &m_input_buf, &m_input_len);
130                 if (res == 1)
131                     return;
132                 else if (res <= 0)
133                 {
134                     logf (LOG_LOG, "Connection closed by client");
135                     close();
136                     m_PDU_Observer->failNotify();
137                     return;
138                 }
139                 // lock it, so we know if recv_PDU deletes it.
140                 int destroyed = 0;
141                 m_destroyed = &destroyed;
142
143                 m_PDU_Observer->recv_PDU(m_input_buf, res);
144                 if (destroyed)   // it really was destroyed, return now.
145                     return;
146             } while (m_cs && cs_more (m_cs));
147         }
148         if (event & YAZ_SOCKET_OBSERVE_TIMEOUT)
149         {
150             m_PDU_Observer->timeoutNotify();
151         }
152     }
153 }
154
155 void Yaz_PDU_Assoc::close()
156 {
157     m_socketObservable->deleteObserver(this);
158     m_state = Closed;
159     if (m_cs)
160     {
161         logf (LOG_LOG, "Yaz_PDU_Assoc::close fd=%d", cs_fileno(m_cs));
162         cs_close (m_cs);
163     }
164     m_cs = 0;
165     while (m_queue_out)
166     {
167         PDU_Queue *q_this = m_queue_out;
168         m_queue_out = m_queue_out->m_next;
169         delete q_this;
170     }
171 //   free (m_input_buf);
172     m_input_buf = 0;
173     m_input_len = 0;
174 }
175
176 void Yaz_PDU_Assoc::destroy()
177 {
178     close();
179     if (m_destroyed)
180         *m_destroyed = 1;
181     Yaz_PDU_Assoc **c;
182
183     // delete from parent's child list (if any)
184     if (m_parent)
185     {
186         c = &m_parent->m_children;
187         while (*c != this)
188         {
189             assert (*c);
190             c = &(*c)->m_next;
191         }
192         *c = (*c)->m_next;
193     }
194     // delete all children ...
195     c = &m_children;
196     while (*c)
197     {
198         Yaz_PDU_Assoc *here = *c;
199         *c = (*c)->m_next;
200         here->m_parent = 0;
201         delete here;
202     }
203 }
204
205 Yaz_PDU_Assoc::PDU_Queue::PDU_Queue(const char *buf, int len)
206 {
207     m_buf = (char *) malloc (len);
208     memcpy (m_buf, buf, len);
209     m_len = len;
210     m_next = 0;
211 }
212
213 Yaz_PDU_Assoc::PDU_Queue::~PDU_Queue()
214 {
215     free (m_buf);
216 }
217
218 int Yaz_PDU_Assoc::flush_PDU()
219 {
220     int r;
221     
222     logf (LOG_LOG, "Yaz_PDU_Assoc::flush_PDU");
223     if (m_state != Ready)
224     {
225         return 1;
226     }
227     PDU_Queue *q = m_queue_out;
228     if (!q)
229     {
230         m_socketObservable->maskObserver(this, YAZ_SOCKET_OBSERVE_READ|
231                                          YAZ_SOCKET_OBSERVE_EXCEPT);
232         return 0;
233     }
234     r = cs_put (m_cs, q->m_buf, q->m_len);
235     if (r < 0)
236     {
237         close();
238         m_PDU_Observer->failNotify();
239         return r;
240     }
241     if (r == 1)
242     {
243         m_socketObservable->maskObserver(this, YAZ_SOCKET_OBSERVE_READ|
244                                          YAZ_SOCKET_OBSERVE_EXCEPT|
245                                          YAZ_SOCKET_OBSERVE_WRITE);
246         logf (LOG_LOG, "flush_PDU put %d bytes (incomplete write)", q->m_len);
247         return r;
248     }
249     logf (LOG_LOG, "flush_PDU put %d bytes fd=%d", q->m_len, cs_fileno(m_cs));
250     // whole packet sent... delete this and proceed to next ...
251     m_queue_out = q->m_next;
252     delete q;
253     // don't select on write if queue is empty ...
254     if (!m_queue_out)
255         m_socketObservable->maskObserver(this, YAZ_SOCKET_OBSERVE_READ|
256                                          YAZ_SOCKET_OBSERVE_EXCEPT);
257     return r;
258 }
259
260 int Yaz_PDU_Assoc::send_PDU(const char *buf, int len)
261 {
262     logf (LOG_LOG, "Yaz_PDU_Assoc::send_PDU");
263     PDU_Queue **pq = &m_queue_out;
264     int is_idle = (*pq ? 0 : 1);
265     
266     if (!m_cs)
267     {
268         logf (LOG_LOG, "send_PDU failed, m_cs == 0");
269         return 0;
270     }
271     while (*pq)
272         pq = &(*pq)->m_next;
273     *pq = new PDU_Queue(buf, len);
274     if (is_idle)
275         return flush_PDU ();
276     else
277         logf (LOG_LOG, "cannot send_PDU fd=%d", cs_fileno(m_cs));
278     return 0;
279 }
280
281 COMSTACK Yaz_PDU_Assoc::comstack()
282 {
283     if (!m_cs)
284     {
285         CS_TYPE cs_type = tcpip_type;
286         int protocol = PROTO_Z3950;
287         m_cs = cs_create (cs_type, 0, protocol);
288     }
289     return m_cs;
290 }
291
292 void Yaz_PDU_Assoc::listen(IYaz_PDU_Observer *observer,
293                            const char *addr)
294 {
295     close();
296     void *ap;
297     COMSTACK cs = comstack();
298
299     logf (LOG_LOG, "Yaz_PDU_Assoc::listen %s", addr);
300     m_PDU_Observer = observer;
301     if (!cs)
302         return;
303     ap = cs_straddr (cs, addr);
304     if (!ap)
305         return;
306     if (cs_bind(cs, ap, CS_SERVER) < 0)
307         return;
308     m_socketObservable->addObserver(cs_fileno(cs), this);
309     m_socketObservable->maskObserver(this, YAZ_SOCKET_OBSERVE_READ|
310                                      YAZ_SOCKET_OBSERVE_EXCEPT);
311     m_state = Listen;
312 }
313
314 void Yaz_PDU_Assoc::idleTime(int idleTime)
315 {
316     m_idleTime = idleTime;
317 }
318
319 void Yaz_PDU_Assoc::connect(IYaz_PDU_Observer *observer,
320                             const char *addr)
321 {
322     logf (LOG_LOG, "Yaz_PDU_Assoc::connect %s", addr);
323     close();
324     m_PDU_Observer = observer;
325     COMSTACK cs = comstack();
326     void *ap = cs_straddr (cs, addr);
327     if (!ap)
328     {
329         logf (LOG_LOG, "cs_straddr failed");
330         return;
331     }
332     int res = cs_connect (cs, ap);
333     if (res < 0)
334     {
335         logf (LOG_DEBUG, "Yaz_PDU_Assoc::connect failed");
336         close ();
337     }
338     else
339     {
340         logf (LOG_LOG, "Yaz_PDU_Assoc::connect fd=%d", cs_fileno(cs));
341         m_socketObservable->addObserver(cs_fileno(cs), this);
342         m_socketObservable->maskObserver(this, YAZ_SOCKET_OBSERVE_READ|
343                                          YAZ_SOCKET_OBSERVE_EXCEPT|
344                                          YAZ_SOCKET_OBSERVE_WRITE);
345         if (res == 1)
346         {
347             logf (LOG_LOG, "Connect pending");
348             m_state = Connecting;
349         }
350         else
351         {
352             logf (LOG_LOG, "Connect complete");
353             m_state = Connected;
354         }
355     }
356 }