1 /* This file is part of the yazpp toolkit.
2 * Copyright (C) 1998-2012 Index Data and Mike Taylor
3 * See the file LICENSE for details.
12 #include <yaz/tcpip.h>
14 #include <yazpp/pdu-assoc.h>
20 using namespace yazpp_1;
// Private implementation state for PDU_Assoc. PDU_Assoc is a friend and
// allocates one instance of this class in each of its constructors.
23 class PDU_Assoc_priv {
24 friend class PDU_Assoc;
// Queue entry constructor; takes a buffer and its length in bytes.
36 PDU_Queue(const char *buf, int len);
// pdu_parent is set by childNotify() to the association that created this
// one; pdu_children is the head of that parent's list of child associations.
42 PDU_Assoc *pdu_parent;
43 PDU_Assoc *pdu_children;
// Socket observable this association registers with (addObserver,
// maskObserver, timeoutObserver, deleteObserver).
46 yazpp_1::ISocketObservable *m_socketObservable;
54 void init(yazpp_1::ISocketObservable *socketObservable);
// Set to true by PDU_Assoc::close_session(); tested in flush_PDU().
55 bool m_session_is_dead;
// Initializes the private state: stores the socket observable and marks the
// session as not dead.
59 void PDU_Assoc_priv::init(ISocketObservable *socketObservable)
63 m_socketObservable = socketObservable;
74 m_session_is_dead = false;
77 PDU_Assoc::~PDU_Assoc()
// Constructs an association bound to the given socket observable by
// allocating the private state and initializing it.
82 PDU_Assoc::PDU_Assoc(ISocketObservable *socketObservable)
85 m_p = new PDU_Assoc_priv;
86 m_p->init(socketObservable);
// Constructs an association around an existing comstack (`cs` below) and
// registers it with the socket observable. One path marks the association
// Ready, the other marks it Accepting.
89 PDU_Assoc::PDU_Assoc(ISocketObservable *socketObservable,
93 m_p = new PDU_Assoc_priv;
94 m_p->init(socketObservable);
// Translate the comstack's pending I/O flags into observer mask bits.
97 if (cs->io_pending & CS_WANT_WRITE)
98 mask |= SOCKET_OBSERVE_WRITE;
99 if (cs->io_pending & CS_WANT_READ)
100 mask |= SOCKET_OBSERVE_READ;
101 m_p->m_socketObservable->addObserver(cs_fileno(cs), this);
104 yaz_log(m_p->log, "new PDU_Assoc. Ready");
105 m_p->state = PDU_Assoc_priv::Ready;
110 yaz_log(m_p->log, "new PDU_Assoc. Accepting");
111 // assume comstack is accepting...
112 m_p->state = PDU_Assoc_priv::Accepting;
113 m_p->m_socketObservable->addObserver(cs_fileno(cs), this);
114 yaz_log(m_p->log, "maskObserver 1");
// Exception events are always observed in addition to the pending I/O bits.
115 m_p->m_socketObservable->maskObserver(this,
116 mask|SOCKET_OBSERVE_EXCEPT);
// Creates a new PDU_Assoc that uses the same socket observable as this one.
121 IPDU_Observable *PDU_Assoc::clone()
123 PDU_Assoc *copy = new PDU_Assoc(m_p->m_socketObservable);
// Socket event callback. Logs the event, reports exception and timeout
// events to the PDU observer, and otherwise dispatches on the association
// state (Accepting, Connecting, Listen, Writing, Ready, Closed).
127 void PDU_Assoc::socketNotify(int event)
129 yaz_log(m_p->log, "PDU_Assoc::socketNotify p=%p state=%d event = %d",
130 this, m_p->state, event);
131 if (event & SOCKET_OBSERVE_EXCEPT)
134 m_PDU_Observer->failNotify();
137 else if (event & SOCKET_OBSERVE_TIMEOUT)
139 m_PDU_Observer->timeoutNotify();
// Accepting: call cs_accept(); a failure is reported to the observer.
144 case PDU_Assoc_priv::Accepting:
145 if (!cs_accept(m_p->cs))
147 yaz_log(m_p->log, "PDU_Assoc::cs_accept failed");
150 m_PDU_Observer->failNotify();
// Translate the comstack's pending I/O flags into observer mask bits.
155 if (m_p->cs->io_pending & CS_WANT_WRITE)
156 mask |= SOCKET_OBSERVE_WRITE;
157 if (m_p->cs->io_pending & CS_WANT_READ)
158 mask |= SOCKET_OBSERVE_READ;
160 { // accept is complete. turn to ready state and write if needed
161 m_p->state = PDU_Assoc_priv::Ready;
165 { // accept still incomplete.
166 yaz_log(m_p->log, "maskObserver 2");
167 m_p->m_socketObservable->maskObserver(this,
168 mask|SOCKET_OBSERVE_EXCEPT);
// Connecting: finish a pending connect with cs_rcvconnect().
172 case PDU_Assoc_priv::Connecting:
173 if (event & SOCKET_OBSERVE_READ &&
174 event & SOCKET_OBSERVE_WRITE)
176 // For Unix: if both read and write are set, then connect failed.
178 m_PDU_Observer->failNotify();
182 yaz_log(m_p->log, "cs_rcvconnect");
183 int res = cs_rcvconnect(m_p->cs);
186 unsigned mask = SOCKET_OBSERVE_EXCEPT;
187 if (m_p->cs->io_pending & CS_WANT_WRITE)
188 mask |= SOCKET_OBSERVE_WRITE;
189 if (m_p->cs->io_pending & CS_WANT_READ)
190 mask |= SOCKET_OBSERVE_READ;
191 yaz_log(m_p->log, "maskObserver 3");
192 m_p->m_socketObservable->maskObserver(this, mask);
196 m_p->state = PDU_Assoc_priv::Ready;
198 m_PDU_Observer->connectNotify();
// Listen: on a read event, call cs_listen() and cs_accept() and hand the
// resulting comstack to childNotify().
203 case PDU_Assoc_priv::Listen:
204 if (event & SOCKET_OBSERVE_READ)
209 if ((res = cs_listen(m_p->cs, 0, 0)) == 1)
213 yaz_log(YLOG_FATAL|YLOG_ERRNO, "cs_listen failed");
216 if (!(new_line = cs_accept(m_p->cs)))
218 /* 1. create socket-manager
220 3. create top-level object
221 setup observer for child fileid in pdu-assoc
224 yaz_log(m_p->log, "new session: parent fd=%d child fd=%d",
225 cs_fileno(m_p->cs), cs_fileno(new_line));
226 childNotify(new_line);
229 case PDU_Assoc_priv::Writing:
230 if (event & (SOCKET_OBSERVE_READ|SOCKET_OBSERVE_WRITE))
// Ready: read with cs_get() and pass the received data to the observer's
// recv_PDU(), repeating while cs_more() reports more input.
233 case PDU_Assoc_priv::Ready:
234 if (event & (SOCKET_OBSERVE_READ|SOCKET_OBSERVE_WRITE))
238 int res = cs_get(m_p->cs, &m_p->input_buf, &m_p->input_len);
241 unsigned mask = SOCKET_OBSERVE_EXCEPT;
242 if (m_p->cs->io_pending & CS_WANT_WRITE)
243 mask |= SOCKET_OBSERVE_WRITE;
244 if (m_p->cs->io_pending & CS_WANT_READ)
245 mask |= SOCKET_OBSERVE_READ;
246 yaz_log(m_p->log, "maskObserver 4");
247 m_p->m_socketObservable->maskObserver(this, mask);
252 yaz_log(m_p->log, "PDU_Assoc::Connection closed by peer");
255 m_PDU_Observer->failNotify(); // problem here..
258 // lock it, so we know if recv_PDU deletes it.
260 m_p->destroyed = &destroyed;
265 PDU_Assoc_priv::PDU_Queue **pq = &m_p->m_queue_in;
269 *pq = new PDU_Assoc_priv::PDU_Queue(m_p->m_input_buf, res);
271 m_PDU_Observer->recv_PDU(m_p->input_buf, res);
273 if (destroyed) // it really was destroyed, return now.
276 } while (m_p->cs && cs_more(m_p->cs));
// Still connected and Ready: go back to observing read and exception events.
277 if (m_p->cs && m_p->state == PDU_Assoc_priv::Ready)
279 yaz_log(m_p->log, "maskObserver 5");
280 m_p->m_socketObservable->maskObserver(this,
281 SOCKET_OBSERVE_EXCEPT|
282 SOCKET_OBSERVE_READ);
// Closed and unknown states are logged and reported to the observer as failures.
286 case PDU_Assoc_priv::Closed:
287 yaz_log(m_p->log, "CLOSING state=%d event was %d", m_p->state,
290 m_PDU_Observer->failNotify();
293 yaz_log(m_p->log, "Unknown state=%d event was %d", m_p->state, event);
295 m_PDU_Observer->failNotify();
// Marks the session as dead (m_session_is_dead), which flush_PDU() tests.
// NOTE(review): failNotify() is also called here — confirm under which
// condition the observer is notified immediately.
299 void PDU_Assoc::close_session()
301 m_p->m_session_is_dead = true;
305 m_PDU_Observer->failNotify();
// Shuts the association down: iterates over the child list, removes this
// object from the socket observable, sets the state to Closed, unlinks every
// entry of the outgoing queue and frees the input buffer.
309 void PDU_Assoc::shutdown()
312 for (ch = m_p->pdu_children; ch; ch = ch->m_p->pdu_next)
315 m_p->m_socketObservable->deleteObserver(this);
316 m_p->state = PDU_Assoc_priv::Closed;
319 yaz_log(m_p->log, "PDU_Assoc::close fd=%d", cs_fileno(m_p->cs));
// Pop queued outgoing PDUs one at a time until the queue is empty.
323 while (m_p->queue_out)
325 PDU_Assoc_priv::PDU_Queue *q_this = m_p->queue_out;
326 m_p->queue_out = m_p->queue_out->m_next;
329 xfree(m_p->input_buf);
// Unlinks this association from its parent's child list, detaches its own
// children (clearing their pdu_parent), and logs the destruction.
334 void PDU_Assoc::destroy()
342 // delete from parent's child list (if any)
345 c = &m_p->pdu_parent->m_p->pdu_children;
// c walks the pdu_next links; the matching entry is spliced out of the list.
349 c = &(*c)->m_p->pdu_next;
351 *c = (*c)->m_p->pdu_next;
353 // delete all children ...
354 c = &m_p->pdu_children;
357 PDU_Assoc *here = *c;
358 *c = (*c)->m_p->pdu_next;
359 here->m_p->pdu_parent = 0;
362 yaz_log(m_p->log, "PDU_Assoc::destroy this=%p", this);
// Builds a queue entry holding its own copy of buf: allocates len bytes with
// xmalloc() and copies the data into m_buf.
365 PDU_Assoc_priv::PDU_Queue::PDU_Queue(const char *buf, int len)
367 m_buf = (char *) xmalloc(len);
368 memcpy(m_buf, buf, len);
373 PDU_Assoc_priv::PDU_Queue::~PDU_Queue()
// Tries to send the entry at the head of the outgoing queue with cs_put().
// Only proceeds when the state is Ready or Writing; otherwise logs
// "not ready".
378 int PDU_Assoc::flush_PDU()
382 if (m_p->state != PDU_Assoc_priv::Ready && m_p->state != PDU_Assoc_priv::Writing)
384 yaz_log(m_p->log, "YAZ_PDU_Assoc::flush_PDU, not ready");
387 PDU_Assoc_priv::PDU_Queue *q = m_p->queue_out;
// Empty queue: return to Ready and observe read, write and exception events.
390 m_p->state = PDU_Assoc_priv::Ready;
391 yaz_log(m_p->log, "YAZ_PDU_Assoc::flush_PDU queue empty");
392 yaz_log(m_p->log, "maskObserver 6");
393 m_p->m_socketObservable->maskObserver(this, SOCKET_OBSERVE_READ|
394 SOCKET_OBSERVE_WRITE|
395 SOCKET_OBSERVE_EXCEPT);
396 if (m_p->m_session_is_dead)
399 m_PDU_Observer->failNotify();
403 r = cs_put(m_p->cs, q->m_buf, q->m_len);
// cs_put failure is logged and reported to the observer.
406 yaz_log(m_p->log, "PDU_Assoc::flush_PDU cs_put failed");
408 m_PDU_Observer->failNotify();
// Incomplete write: switch to Writing and build the observer mask from the
// comstack's pending I/O flags.
413 unsigned mask = SOCKET_OBSERVE_EXCEPT;
414 m_p->state = PDU_Assoc_priv::Writing;
415 if (m_p->cs->io_pending & CS_WANT_WRITE)
416 mask |= SOCKET_OBSERVE_WRITE;
417 if (m_p->cs->io_pending & CS_WANT_READ)
418 mask |= SOCKET_OBSERVE_READ;
420 mask |= SOCKET_OBSERVE_WRITE;
421 yaz_log(m_p->log, "maskObserver 7");
422 m_p->m_socketObservable->maskObserver(this, mask);
423 yaz_log(m_p->log, "PDU_Assoc::flush_PDU cs_put %d bytes fd=%d (inc)",
424 q->m_len, cs_fileno(m_p->cs));
427 yaz_log(m_p->log, "PDU_Assoc::flush_PDU cs_put %d bytes", q->m_len);
428 // whole packet sent... delete this and proceed to next ...
429 m_p->queue_out = q->m_next;
431 // don't select on write if queue is empty ...
434 m_p->state = PDU_Assoc_priv::Ready;
435 yaz_log(m_p->log, "maskObserver 8");
436 m_p->m_socketObservable->maskObserver(this, SOCKET_OBSERVE_READ|
437 SOCKET_OBSERVE_EXCEPT);
438 if (m_p->m_session_is_dead)
// Queues a copy of buf (len bytes) on the outgoing queue for sending.
444 int PDU_Assoc::send_PDU(const char *buf, int len)
446 yaz_log(m_p->log, "PDU_Assoc::send_PDU");
447 PDU_Assoc_priv::PDU_Queue **pq = &m_p->queue_out;
// is_idle is 1 when the outgoing queue was empty on entry, 0 otherwise.
448 int is_idle = (*pq ? 0 : 1);
452 yaz_log(m_p->log, "PDU_Assoc::send_PDU failed, cs == 0");
// The new entry is stored through pq; PDU_Queue copies the caller's buffer.
457 *pq = new PDU_Assoc_priv::PDU_Queue(buf, len);
461 yaz_log(m_p->log, "PDU_Assoc::cannot send_PDU fd=%d",
// Creates a COMSTACK for type_and_host via cs_create_host(); vp is passed
// through to that call.
// NOTE(review): the literal 2 is cs_create_host's second argument —
// presumably a blocking-mode selector; confirm against the YAZ COMSTACK docs.
466 COMSTACK PDU_Assoc::comstack(const char *type_and_host, void **vp)
468 return cs_create_host(type_and_host, 2, vp);
// Starts listening on addr on behalf of observer: tears down existing state,
// creates a comstack and binds it with CS_SERVER, sets FD_CLOEXEC on its
// descriptor, observes it for read and exception events, and sets the state
// to Listen.
471 int PDU_Assoc::listen(IPDU_Observer *observer, const char *addr)
// NOTE(review): the teardown statements below repeat those in shutdown().
475 m_p->m_socketObservable->deleteObserver(this);
476 m_p->state = PDU_Assoc_priv::Closed;
479 yaz_log(m_p->log, "PDU_Assoc::close fd=%d", cs_fileno(m_p->cs));
483 while (m_p->queue_out)
485 PDU_Assoc_priv::PDU_Queue *q_this = m_p->queue_out;
486 m_p->queue_out = m_p->queue_out->m_next;
489 xfree(m_p->input_buf);
498 m_PDU_Observer = observer;
500 m_p->cs = comstack(addr, &ap);
504 if (cs_bind(m_p->cs, ap, CS_SERVER) < 0)
507 int fd = cs_fileno(m_p->cs);
// Add FD_CLOEXEC to the descriptor's existing flags.
509 int oldflags = fcntl(fd, F_GETFD, 0);
512 oldflags |= FD_CLOEXEC;
513 fcntl(fd, F_SETFD, oldflags);
516 m_p->m_socketObservable->addObserver(fd, this);
517 yaz_log(m_p->log, "maskObserver 9");
518 m_p->m_socketObservable->maskObserver(this, SOCKET_OBSERVE_READ|
519 SOCKET_OBSERVE_EXCEPT);
520 yaz_log(m_p->log, "PDU_Assoc::listen ok fd=%d", fd);
521 m_p->state = PDU_Assoc_priv::Listen;
// Records the idle timeout in the private state, logs it, and forwards it to
// the socket observable's timeoutObserver() for this association.
525 void PDU_Assoc::idleTime(int idleTime)
527 m_p->idleTime = idleTime;
528 yaz_log(m_p->log, "PDU_Assoc::idleTime(%d)", idleTime);
529 m_p->m_socketObservable->timeoutObserver(this, m_p->idleTime);
// Starts a connection to addr on behalf of observer: creates a comstack,
// calls cs_connect() and registers with the socket observable. The observer
// mask that is installed depends on the cs_connect() result.
532 int PDU_Assoc::connect(IPDU_Observer *observer, const char *addr)
534 yaz_log(m_p->log, "PDU_Assoc::connect %s", addr);
536 m_PDU_Observer = observer;
538 m_p->cs = comstack(addr, &ap);
541 int res = cs_connect(m_p->cs, ap);
542 yaz_log(m_p->log, "PDU_Assoc::connect fd=%d res=%d", cs_fileno(m_p->cs),
544 m_p->m_socketObservable->addObserver(cs_fileno(m_p->cs), this);
547 { // Connect complete
// NOTE(review): the state is set to Connecting even though this branch is
// labelled "Connect complete"; socketNotify()'s Connecting case is what
// sets Ready — confirm this is intended.
548 m_p->state = PDU_Assoc_priv::Connecting;
549 unsigned mask = SOCKET_OBSERVE_EXCEPT;
550 mask |= SOCKET_OBSERVE_WRITE;
551 mask |= SOCKET_OBSERVE_READ;
552 yaz_log(m_p->log, "maskObserver 11");
553 m_p->m_socketObservable->maskObserver(this, mask);
// Connect still in progress: observe whatever the comstack is waiting for.
557 m_p->state = PDU_Assoc_priv::Connecting;
558 unsigned mask = SOCKET_OBSERVE_EXCEPT;
559 if (m_p->cs->io_pending & CS_WANT_WRITE)
560 mask |= SOCKET_OBSERVE_WRITE;
561 if (m_p->cs->io_pending & CS_WANT_READ)
562 mask |= SOCKET_OBSERVE_READ;
// NOTE(review): this logs the same "maskObserver 11" label as the branch
// above, so the two call sites cannot be told apart in the log.
563 yaz_log(m_p->log, "maskObserver 11");
564 m_p->m_socketObservable->maskObserver(this, mask);
567 { // Connect failed immediately
568 // Since m_state is Closed we can distinguish this case from
569 // normal connect in socketNotify handler
570 yaz_log(m_p->log, "maskObserver 12");
571 m_p->m_socketObservable->maskObserver(this, SOCKET_OBSERVE_WRITE|
572 SOCKET_OBSERVE_EXCEPT);
// Wraps an accepted comstack in a new PDU_Assoc and asks the PDU observer
// for a session observer via sessionNotify(). Without an observer the child
// is shut down and deleted; the link statements below put a child at the
// head of this association's child list.
577 // Single-threaded... Only useful for non-blocking handlers
578 void PDU_Assoc::childNotify(COMSTACK cs)
580 PDU_Assoc *new_observable =
581 new PDU_Assoc(m_p->m_socketObservable, cs);
583 // Clone PDU Observer
584 new_observable->m_PDU_Observer = m_PDU_Observer->sessionNotify
585 (new_observable, cs_fileno(cs));
587 if (!new_observable->m_PDU_Observer)
589 new_observable->shutdown();
590 delete new_observable;
// Link the child at the head of the child list and record this as its parent.
593 new_observable->m_p->pdu_next = m_p->pdu_children;
594 m_p->pdu_children = new_observable;
595 new_observable->m_p->pdu_parent = this;
// Returns the address string that cs_addrstr() reports for this
// association's comstack.
598 const char*PDU_Assoc::getpeername()
602 return cs_addrstr(m_p->cs);
607 * c-file-style: "Stroustrup"
608 * indent-tabs-mode: nil
610 * vim: shiftwidth=4 tabstop=8 expandtab