First WIN32 port of YAZ++.
[yazpp-moved-to-github.git] / src / yaz-pdu-assoc.cpp
1 /*
2  * Copyright (c) 1998-1999, Index Data.
3  * See the file LICENSE for details.
4  * Sebastian Hammer, Adam Dickmeiss
5  * 
6  * $Log: yaz-pdu-assoc.cpp,v $
7  * Revision 1.3  1999-02-02 14:01:20  adam
8  * First WIN32 port of YAZ++.
9  *
10  * Revision 1.2  1999/01/28 13:08:44  adam
11  * Yaz_PDU_Assoc better encapsulated. Memory leak fix in
12  * yaz-socket-manager.cc.
13  *
14  * Revision 1.1.1.1  1999/01/28 09:41:07  adam
15  * First implementation of YAZ++.
16  *
17  */
18
19 #include <assert.h>
20
21 #include <yaz-pdu-assoc.h>
22
23 #include <log.h>
24 #include <tcpip.h>
25
26 Yaz_PDU_Assoc::Yaz_PDU_Assoc(IYazSocketObservable *socketObservable,
27                              COMSTACK cs)
28 {
29     m_state = Closed;
30     m_cs = cs;
31     m_socketObservable = socketObservable;
32     m_PDU_Observer = 0;
33     m_queue_out = 0;
34     m_input_buf = 0;
35     m_input_len = 0;
36     m_children = 0;
37     m_parent = 0;
38     m_next = 0;
39     m_destroyed = 0;
40 }
41
42 IYaz_PDU_Observable *Yaz_PDU_Assoc::clone()
43 {
44     Yaz_PDU_Assoc *copy = new Yaz_PDU_Assoc(m_socketObservable, 0);
45     return copy;
46 }
47
48 Yaz_PDU_Assoc::~Yaz_PDU_Assoc()
49 {
50     destroy();
51 }
52
53 void Yaz_PDU_Assoc::socketNotify(int event)
54 {
55     logf (LOG_LOG, "socketNotify p=%p event = %d", this, event);
56     if (m_state == Connected)
57     {
58         m_state = Ready;
59         m_socketObservable->maskObserver(this, YAZ_SOCKET_OBSERVE_READ|
60                                          YAZ_SOCKET_OBSERVE_EXCEPT);
61         m_PDU_Observer->connectNotify();
62         flush_PDU();
63     }
64     else if (m_state == Connecting)
65     {
66         if (event & YAZ_SOCKET_OBSERVE_READ)
67         {
68             close();
69             m_PDU_Observer->failNotify();
70         }
71         else
72         {
73             m_state = Ready;
74             m_socketObservable->maskObserver(this, YAZ_SOCKET_OBSERVE_READ|
75                                              YAZ_SOCKET_OBSERVE_EXCEPT);
76             m_PDU_Observer->connectNotify();
77             flush_PDU();
78         }
79     }
80     else if (m_state == Listen)
81     {
82         if (event & YAZ_SOCKET_OBSERVE_READ)
83         {
84             int res;
85             COMSTACK new_line;
86             
87             if ((res = cs_listen(m_cs, 0, 0)) == 1)
88                 return;
89             if (res < 0)
90             {
91                 logf(LOG_FATAL, "cs_listen failed");
92                 return;
93             }
94             if (!(new_line = cs_accept(m_cs)))
95                 return;
96             
97             Yaz_PDU_Assoc *assoc = new Yaz_PDU_Assoc (m_socketObservable,
98                                                       new_line);
99             assoc->m_parent = this;
100             assoc->m_next = m_children;
101             m_children = assoc;
102             
103             assoc->m_PDU_Observer = m_PDU_Observer->clone(assoc);
104             assoc->m_state = Ready;
105             assoc->m_socketObservable->addObserver(cs_fileno(new_line), assoc);
106             assoc->m_socketObservable->maskObserver(assoc,
107                                                     YAZ_SOCKET_OBSERVE_READ|
108                                                     YAZ_SOCKET_OBSERVE_EXCEPT);
109         }
110     }
111     else if (m_state == Ready)
112     {
113         if (event & YAZ_SOCKET_OBSERVE_WRITE)
114         {
115             flush_PDU();
116         }
117         if (event & YAZ_SOCKET_OBSERVE_READ)
118         {
119             do
120             {
121                 int res = cs_get (m_cs, &m_input_buf, &m_input_len);
122                 if (res == 1)
123                     return;
124                 else if (res <= 0)
125                 {
126                     logf (LOG_LOG, "Connection closed by client");
127                     close();
128                     m_PDU_Observer->failNotify();
129                     return;
130                 }
131                 // lock it, so we know if recv_PDU deletes it.
132                 int destroyed = 0;
133                 m_destroyed = &destroyed;
134
135                 m_PDU_Observer->recv_PDU(m_input_buf, res);
136                 if (destroyed)   // it really was destroyed, return now.
137                     return;
138             } while (m_cs && cs_more (m_cs));
139         }
140     }
141 }
142
143 void Yaz_PDU_Assoc::close()
144 {
145     m_socketObservable->deleteObserver(this);
146     m_state = Closed;
147     if (m_cs)
148     {
149         logf (LOG_LOG, "Yaz_PDU_Assoc::close fd=%d", cs_fileno(m_cs));
150         cs_close (m_cs);
151     }
152     m_cs = 0;
153     while (m_queue_out)
154     {
155         PDU_Queue *q_this = m_queue_out;
156         m_queue_out = m_queue_out->m_next;
157         delete q_this;
158     }
159 //   free (m_input_buf);
160     m_input_buf = 0;
161     m_input_len = 0;
162 }
163
164 void Yaz_PDU_Assoc::destroy()
165 {
166     close();
167     if (m_destroyed)
168         *m_destroyed = 1;
169     Yaz_PDU_Assoc **c;
170
171     // delete from parent's child list (if any)
172     if (m_parent)
173     {
174         c = &m_parent->m_children;
175         while (*c != this)
176         {
177             assert (*c);
178             c = &(*c)->m_next;
179         }
180         *c = (*c)->m_next;
181     }
182     // delete all children ...
183     c = &m_children;
184     while (*c)
185     {
186         Yaz_PDU_Assoc *here = *c;
187         *c = (*c)->m_next;
188         here->m_parent = 0;
189         delete here;
190     }
191 }
192
193 Yaz_PDU_Assoc::PDU_Queue::PDU_Queue(const char *buf, int len)
194 {
195     m_buf = (char *) malloc (len);
196     memcpy (m_buf, buf, len);
197     m_len = len;
198     m_next = 0;
199 }
200
201 Yaz_PDU_Assoc::PDU_Queue::~PDU_Queue()
202 {
203     free (m_buf);
204 }
205
206 int Yaz_PDU_Assoc::flush_PDU()
207 {
208     int r;
209
210     if (m_state != Ready)
211         return 1;
212     PDU_Queue *q = m_queue_out;
213     if (!q)
214     {
215         m_socketObservable->maskObserver(this, YAZ_SOCKET_OBSERVE_READ|
216                                          YAZ_SOCKET_OBSERVE_EXCEPT);
217         return 0;
218     }
219     r = cs_put (m_cs, q->m_buf, q->m_len);
220     if (r < 0)
221     {
222         close();
223         m_PDU_Observer->failNotify();
224         return r;
225     }
226     if (r == 1)
227     {
228         m_socketObservable->maskObserver(this, YAZ_SOCKET_OBSERVE_READ|
229                                          YAZ_SOCKET_OBSERVE_EXCEPT|
230                                          YAZ_SOCKET_OBSERVE_WRITE);
231         logf (LOG_LOG, "put %d bytes (incomplete write)", q->m_len);
232         return r;
233     }
234     logf (LOG_LOG, "put %d bytes fd=%d", q->m_len, cs_fileno(m_cs));
235     // whole packet sent... delete this and proceed to next ...
236     m_queue_out = q->m_next;
237     logf (LOG_LOG, "m_queue_out = %p", m_queue_out);
238     delete q;
239     // don't select on write if queue is empty ...
240     if (!m_queue_out)
241         m_socketObservable->maskObserver(this, YAZ_SOCKET_OBSERVE_READ|
242                                          YAZ_SOCKET_OBSERVE_EXCEPT);
243     return r;
244 }
245
246 int Yaz_PDU_Assoc::send_PDU(const char *buf, int len)
247 {
248     logf (LOG_LOG, "send_PDU");
249     PDU_Queue **pq = &m_queue_out;
250     int is_idle = (*pq ? 0 : 1);
251     
252     if (!m_cs)
253     {
254         logf (LOG_LOG, "send_PDU failed, m_cs == 0");
255         return 0;
256     }
257     while (*pq)
258         pq = &(*pq)->m_next;
259     *pq = new PDU_Queue(buf, len);
260     if (is_idle)
261     {
262         return flush_PDU ();
263     }
264     else
265     {
266         logf (LOG_LOG, "cannot send_PDU fd=%d", cs_fileno(m_cs));
267     }
268     return 0;
269 }
270
271 COMSTACK Yaz_PDU_Assoc::comstack()
272 {
273     if (!m_cs)
274     {
275         CS_TYPE cs_type = tcpip_type;
276         int protocol = PROTO_Z3950;
277         m_cs = cs_create (cs_type, 0, protocol);
278     }
279     return m_cs;
280 }
281
282 void Yaz_PDU_Assoc::listen(IYaz_PDU_Observer *observer,
283                            const char *addr)
284 {
285     close();
286     void *ap;
287     COMSTACK cs = comstack();
288
289     logf (LOG_LOG, "Yaz_PDU_Assoc::listen %s", addr);
290     m_PDU_Observer = observer;
291     if (!cs)
292         return;
293     ap = cs_straddr (cs, addr);
294     if (!ap)
295         return;
296     if (cs_bind(cs, ap, CS_SERVER) < 0)
297         return;
298     m_socketObservable->addObserver(cs_fileno(cs), this);
299     m_socketObservable->maskObserver(this, YAZ_SOCKET_OBSERVE_READ|
300                                      YAZ_SOCKET_OBSERVE_EXCEPT);
301     m_state = Listen;
302 }
303
304 void Yaz_PDU_Assoc::connect(IYaz_PDU_Observer *observer,
305                             const char *addr)
306 {
307     logf (LOG_LOG, "Yaz_PDU_Assoc::connect %s", addr);
308     close();
309     m_PDU_Observer = observer;
310     COMSTACK cs = comstack();
311     void *ap = cs_straddr (cs, addr);
312     if (!ap)
313     {
314         logf (LOG_LOG, "cs_straddr failed");
315         return;
316     }
317     int res = cs_connect (cs, ap);
318     if (res < 0)
319     {
320         logf (LOG_DEBUG, "Yaz_PDU_Assoc::connect failed");
321         close ();
322     }
323     else
324     {
325         logf (LOG_LOG, "Yaz_PDU_Assoc::connect fd=%d", cs_fileno(cs));
326         m_socketObservable->addObserver(cs_fileno(cs), this);
327         m_socketObservable->maskObserver(this, YAZ_SOCKET_OBSERVE_READ|
328                                          YAZ_SOCKET_OBSERVE_EXCEPT|
329                                          YAZ_SOCKET_OBSERVE_WRITE);
330         if (res == 1)
331             m_state = Connecting;
332         else
333             m_state = Connected;
334     }
335 }