1 /* This file is part of the yazpp toolkit.
2 * Copyright (C) 1998-2012 Index Data and Mike Taylor
3 * See the file LICENSE for details.
12 #include <yaz/tcpip.h>
14 #include <yazpp/pdu-assoc.h>
20 using namespace yazpp_1;
// Private implementation state of PDU_Assoc; the PDU_Assoc constructors
// allocate one instance and reach it through m_p.
// NOTE(review): only some lines of this class are visible in this excerpt;
// confirm the remaining members against the full file.
23 class PDU_Assoc_priv {
// Constructor of the PDU_Queue entry type: takes the PDU bytes and their length.
35 PDU_Queue(const char *buf, int len);
// Association this one was spawned from (set in childNotify(); reset to 0 on
// each child when its parent runs destroy()).
41 PDU_Assoc *pdu_parent;
// Head of this association's list of children, linked through pdu_next.
42 PDU_Assoc *pdu_children;
// Socket manager used to register, mask and remove this association as an observer.
45 yazpp_1::ISocketObservable *m_socketObservable;
// Stores the socket manager and resets the session-dead flag.
53 void init(yazpp_1::ISocketObservable *socketObservable);
// Set by close_session(); consulted by flush_PDU() when the output queue is empty.
54 bool m_session_is_dead;
// Initializes the private state for a new association.
//   socketObservable - socket manager the association will register with.
// NOTE(review): the statements between the two assignments below are not
// visible in this excerpt.
58 void PDU_Assoc_priv::init(ISocketObservable *socketObservable)
62 m_socketObservable = socketObservable;
// A fresh association is not marked dead; close_session() sets this later.
73 m_session_is_dead = false;
// Destructor. NOTE(review): the body is not visible in this excerpt.
76 PDU_Assoc::~PDU_Assoc()
// Creates an association bound to the given socket manager.
//   socketObservable - socket manager used for all later observer calls.
// Allocates the private state object and initializes it via init().
81 PDU_Assoc::PDU_Assoc(ISocketObservable *socketObservable)
84 m_p = new PDU_Assoc_priv;
85 m_p->init(socketObservable);
// Creates an association around an existing COMSTACK `cs` (the remainder of
// the parameter list is on a line not visible in this excerpt).
// The comstack's file descriptor is registered with the socket manager and
// the association starts either in the Ready or in the Accepting state.
// NOTE(review): the condition choosing between Ready and Accepting is not
// visible here; confirm against the full file.
88 PDU_Assoc::PDU_Assoc(ISocketObservable *socketObservable,
92 m_p = new PDU_Assoc_priv;
93 m_p->init(socketObservable);
// Translate the comstack's pending-I/O flags into observer mask bits.
96 if (cs->io_pending & CS_WANT_WRITE)
97 mask |= SOCKET_OBSERVE_WRITE;
98 if (cs->io_pending & CS_WANT_READ)
99 mask |= SOCKET_OBSERVE_READ;
// Register this object as the observer for the comstack's descriptor.
100 m_p->m_socketObservable->addObserver(cs_fileno(cs), this);
103 yaz_log(m_p->log, "new PDU_Assoc. Ready");
104 m_p->state = PDU_Assoc_priv::Ready;
109 yaz_log(m_p->log, "new PDU_Assoc. Accepting");
110 // assume comstack is accepting...
111 m_p->state = PDU_Assoc_priv::Accepting;
// NOTE(review): addObserver is also called above with the same arguments;
// check whether this second registration is intentional.
112 m_p->m_socketObservable->addObserver(cs_fileno(cs), this);
113 yaz_log(m_p->log, "maskObserver 1");
// Watch for whatever the comstack is waiting on, plus exceptions.
114 m_p->m_socketObservable->maskObserver(this,
115 mask|SOCKET_OBSERVE_EXCEPT);
// Creates a new PDU_Assoc that uses the same socket manager as this one.
// NOTE(review): the rest of the body, including the return statement, is not
// visible in this excerpt.
120 IPDU_Observable *PDU_Assoc::clone()
122 PDU_Assoc *copy = new PDU_Assoc(m_p->m_socketObservable);
// Socket-manager callback: handles an I/O event for this association.
//   event - bit mask of SOCKET_OBSERVE_* flags describing what happened.
// Exception and timeout events are forwarded to the PDU observer through
// failNotify()/timeoutNotify(); any other event is handled according to
// m_p->state (Accepting, Connecting, Listen, Writing, Ready, Closed).
// NOTE(review): braces, the state dispatch itself, and several statements
// (shutdown calls, returns, conditions) are not visible in this excerpt, so
// the exact control flow must be confirmed against the full file.
126 void PDU_Assoc::socketNotify(int event)
128 yaz_log(m_p->log, "PDU_Assoc::socketNotify p=%p state=%d event = %d",
129 this, m_p->state, event);
130 if (event & SOCKET_OBSERVE_EXCEPT)
133 m_PDU_Observer->failNotify();
136 else if (event & SOCKET_OBSERVE_TIMEOUT)
138 m_PDU_Observer->timeoutNotify();
// Accepting: continue the accept handshake on the comstack.
143 case PDU_Assoc_priv::Accepting:
144 if (!cs_accept(m_p->cs))
146 yaz_log(m_p->log, "PDU_Assoc::cs_accept failed");
149 m_PDU_Observer->failNotify();
// Rebuild the observer mask from what the comstack is still waiting for.
154 if (m_p->cs->io_pending & CS_WANT_WRITE)
155 mask |= SOCKET_OBSERVE_WRITE;
156 if (m_p->cs->io_pending & CS_WANT_READ)
157 mask |= SOCKET_OBSERVE_READ;
159 { // accept is complete. turn to ready state and write if needed
160 m_p->state = PDU_Assoc_priv::Ready;
164 { // accept still incomplete.
165 yaz_log(m_p->log, "maskObserver 2");
166 m_p->m_socketObservable->maskObserver(this,
167 mask|SOCKET_OBSERVE_EXCEPT);
// Connecting: a non-blocking connect is in progress.
171 case PDU_Assoc_priv::Connecting:
172 if (event & SOCKET_OBSERVE_READ &&
173 event & SOCKET_OBSERVE_WRITE)
175 // For Unix: if both read and write are set, then connect failed.
177 m_PDU_Observer->failNotify();
181 yaz_log(m_p->log, "cs_rcvconnect");
182 int res = cs_rcvconnect(m_p->cs);
// Keep observing whatever the comstack still needs, plus exceptions.
185 unsigned mask = SOCKET_OBSERVE_EXCEPT;
186 if (m_p->cs->io_pending & CS_WANT_WRITE)
187 mask |= SOCKET_OBSERVE_WRITE;
188 if (m_p->cs->io_pending & CS_WANT_READ)
189 mask |= SOCKET_OBSERVE_READ;
190 yaz_log(m_p->log, "maskObserver 3");
191 m_p->m_socketObservable->maskObserver(this, mask);
// Connection established: become Ready and tell the observer.
195 m_p->state = PDU_Assoc_priv::Ready;
197 m_PDU_Observer->connectNotify();
// Listen: a readable listening socket means a new incoming connection.
202 case PDU_Assoc_priv::Listen:
203 if (event & SOCKET_OBSERVE_READ)
208 if ((res = cs_listen(m_p->cs, 0, 0)) == 1)
212 yaz_log(YLOG_FATAL|YLOG_ERRNO, "cs_listen failed");
215 if (!(new_line = cs_accept(m_p->cs)))
217 /* 1. create socket-manager
219 3. create top-level object
220 setup observer for child fileid in pdu-assoc
223 yaz_log(m_p->log, "new session: parent fd=%d child fd=%d",
224 cs_fileno(m_p->cs), cs_fileno(new_line));
// Hand the accepted comstack to childNotify(), which builds the child association.
225 childNotify(new_line);
// Writing: an earlier write was incomplete.
// NOTE(review): the action taken under this condition is not visible here.
228 case PDU_Assoc_priv::Writing:
229 if (event & (SOCKET_OBSERVE_READ|SOCKET_OBSERVE_WRITE))
// Ready: read incoming data and deliver complete PDUs to the observer.
232 case PDU_Assoc_priv::Ready:
233 if (event & (SOCKET_OBSERVE_READ|SOCKET_OBSERVE_WRITE))
237 int res = cs_get(m_p->cs, &m_p->input_buf, &m_p->input_len);
240 unsigned mask = SOCKET_OBSERVE_EXCEPT;
241 if (m_p->cs->io_pending & CS_WANT_WRITE)
242 mask |= SOCKET_OBSERVE_WRITE;
243 if (m_p->cs->io_pending & CS_WANT_READ)
244 mask |= SOCKET_OBSERVE_READ;
245 yaz_log(m_p->log, "maskObserver 4");
246 m_p->m_socketObservable->maskObserver(this, mask);
251 yaz_log(m_p->log, "PDU_Assoc::Connection closed by peer");
254 m_PDU_Observer->failNotify(); // problem here..
257 // lock it, so we know if recv_PDU deletes it.
259 m_p->destroyed = &destroyed;
// NOTE(review): m_queue_in and m_input_buf are not referenced anywhere else in
// the visible code (other lines use input_buf); these two lines may be
// conditionally compiled out - confirm against the full file.
264 PDU_Assoc_priv::PDU_Queue **pq = &m_p->m_queue_in;
268 *pq = new PDU_Assoc_priv::PDU_Queue(m_p->m_input_buf, res);
270 m_PDU_Observer->recv_PDU(m_p->input_buf, res);
272 if (destroyed) // it really was destroyed, return now.
// Keep going while the comstack still exists and reports more buffered data.
275 } while (m_p->cs && cs_more(m_p->cs));
276 if (m_p->cs && m_p->state == PDU_Assoc_priv::Ready)
278 yaz_log(m_p->log, "maskObserver 5");
279 m_p->m_socketObservable->maskObserver(this,
280 SOCKET_OBSERVE_EXCEPT|
281 SOCKET_OBSERVE_READ);
// Closed: an event on a closed association is reported as a failure.
285 case PDU_Assoc_priv::Closed:
286 yaz_log(m_p->log, "CLOSING state=%d event was %d", m_p->state,
289 m_PDU_Observer->failNotify();
// Any other state value is logged and also reported as a failure.
292 yaz_log(m_p->log, "Unknown state=%d event was %d", m_p->state, event);
294 m_PDU_Observer->failNotify();
// Marks the session as dead so that flush_PDU() can finish it once the
// output queue is empty.
// NOTE(review): the condition under which failNotify() is called here is not
// visible in this excerpt.
298 void PDU_Assoc::close_session()
300 m_p->m_session_is_dead = true;
304 m_PDU_Observer->failNotify();
// Shuts this association down: walks the child list, removes this object from
// the socket manager, marks the state Closed, discards the pending output
// queue and frees the input buffer.
// NOTE(review): the per-child action, the comstack close and the deletion of
// queue entries are on lines not visible in this excerpt.
308 void PDU_Assoc::shutdown()
311 for (ch = m_p->pdu_children; ch; ch = ch->m_p->pdu_next)
314 m_p->m_socketObservable->deleteObserver(this);
315 m_p->state = PDU_Assoc_priv::Closed;
318 yaz_log(m_p->log, "PDU_Assoc::close fd=%d", cs_fileno(m_p->cs));
// Pop entries off the output queue one at a time.
322 while (m_p->queue_out)
324 PDU_Assoc_priv::PDU_Queue *q_this = m_p->queue_out;
325 m_p->queue_out = m_p->queue_out->m_next;
328 xfree(m_p->input_buf);
// Detaches this association from the parent/child structure: unlinks it from
// its parent's child list and clears the parent pointer of each of its own
// children while removing them from the list.
// NOTE(review): loop conditions and any further per-child action are not
// visible in this excerpt.
333 void PDU_Assoc::destroy()
341 // delete from parent's child list (if any)
// `c` walks the parent's list as a pointer to the link being examined.
344 c = &m_p->pdu_parent->m_p->pdu_children;
348 c = &(*c)->m_p->pdu_next;
// Splice the current entry out of the list.
350 *c = (*c)->m_p->pdu_next;
352 // delete all children ...
353 c = &m_p->pdu_children;
356 PDU_Assoc *here = *c;
357 *c = (*c)->m_p->pdu_next;
// The child no longer has a parent.
358 here->m_p->pdu_parent = 0;
361 yaz_log(m_p->log, "PDU_Assoc::destroy this=%p", this);
// Builds a queue entry holding a private copy of a PDU.
//   buf - bytes of the PDU to copy.
//   len - number of bytes in buf.
// NOTE(review): initialization of other members (e.g. length, next link) is
// not visible in this excerpt.
364 PDU_Assoc_priv::PDU_Queue::PDU_Queue(const char *buf, int len)
366 m_buf = (char *) xmalloc(len);
367 memcpy(m_buf, buf, len);
// Destructor of a queue entry. NOTE(review): the body is not visible in this
// excerpt; presumably it releases m_buf - confirm against the full file.
372 PDU_Assoc_priv::PDU_Queue::~PDU_Queue()
// Tries to send the PDU at the head of the output queue with cs_put().
// Only acts when the association is in the Ready or Writing state. When the
// queue is empty the state is set to Ready, the observer mask is updated and
// a session marked dead by close_session() is handled; when cs_put() fails the
// PDU observer is told through failNotify(); after an incomplete write the
// state becomes Writing; after a complete write the entry is dequeued.
// NOTE(review): return values, braces and the tests on `q` and `r` are not
// visible in this excerpt.
377 int PDU_Assoc::flush_PDU()
381 if (m_p->state != PDU_Assoc_priv::Ready && m_p->state != PDU_Assoc_priv::Writing)
383 yaz_log(m_p->log, "YAZ_PDU_Assoc::flush_PDU, not ready");
386 PDU_Assoc_priv::PDU_Queue *q = m_p->queue_out;
// Nothing queued: go back to Ready.
389 m_p->state = PDU_Assoc_priv::Ready;
390 yaz_log(m_p->log, "YAZ_PDU_Assoc::flush_PDU queue empty");
391 yaz_log(m_p->log, "maskObserver 6");
392 m_p->m_socketObservable->maskObserver(this, SOCKET_OBSERVE_READ|
393 SOCKET_OBSERVE_WRITE|
394 SOCKET_OBSERVE_EXCEPT);
395 if (m_p->m_session_is_dead)
398 m_PDU_Observer->failNotify();
// Attempt to write the head-of-queue PDU.
402 r = cs_put(m_p->cs, q->m_buf, q->m_len);
405 yaz_log(m_p->log, "PDU_Assoc::flush_PDU cs_put failed");
407 m_PDU_Observer->failNotify();
// Incomplete write: remember we are Writing and watch what the comstack needs.
412 unsigned mask = SOCKET_OBSERVE_EXCEPT;
413 m_p->state = PDU_Assoc_priv::Writing;
414 if (m_p->cs->io_pending & CS_WANT_WRITE)
415 mask |= SOCKET_OBSERVE_WRITE;
416 if (m_p->cs->io_pending & CS_WANT_READ)
417 mask |= SOCKET_OBSERVE_READ;
419 mask |= SOCKET_OBSERVE_WRITE;
420 yaz_log(m_p->log, "maskObserver 7");
421 m_p->m_socketObservable->maskObserver(this, mask);
422 yaz_log(m_p->log, "PDU_Assoc::flush_PDU cs_put %d bytes fd=%d (inc)",
423 q->m_len, cs_fileno(m_p->cs));
426 yaz_log(m_p->log, "PDU_Assoc::flush_PDU cs_put %d bytes", q->m_len);
427 // whole packet sent... delete this and proceed to next ...
428 m_p->queue_out = q->m_next;
430 // don't select on write if queue is empty ...
433 m_p->state = PDU_Assoc_priv::Ready;
434 yaz_log(m_p->log, "maskObserver 8");
435 m_p->m_socketObservable->maskObserver(this, SOCKET_OBSERVE_READ|
436 SOCKET_OBSERVE_EXCEPT);
437 if (m_p->m_session_is_dead)
// Queues a PDU for sending.
//   buf - bytes of the PDU; a copy is stored in a new PDU_Queue entry.
//   len - number of bytes in buf.
// Logs a failure when there is no comstack (cs == 0).
// NOTE(review): the walk to the queue tail, the use of is_idle and the return
// values are on lines not visible in this excerpt.
443 int PDU_Assoc::send_PDU(const char *buf, int len)
445 yaz_log(m_p->log, "PDU_Assoc::send_PDU");
446 PDU_Assoc_priv::PDU_Queue **pq = &m_p->queue_out;
// 1 when the output queue was empty on entry, 0 otherwise.
447 int is_idle = (*pq ? 0 : 1);
451 yaz_log(m_p->log, "PDU_Assoc::send_PDU failed, cs == 0");
456 *pq = new PDU_Assoc_priv::PDU_Queue(buf, len);
460 yaz_log(m_p->log, "PDU_Assoc::cannot send_PDU fd=%d",
// Creates a COMSTACK for the given address by delegating to cs_create_host().
//   type_and_host - address string passed through unchanged.
//   vp            - out-parameter passed through to cs_create_host().
// Returns whatever cs_create_host() returns.
// NOTE(review): the meaning of the literal 2 (second argument) should be
// checked against the YAZ COMSTACK documentation.
465 COMSTACK PDU_Assoc::comstack(const char *type_and_host, void **vp)
467 return cs_create_host(type_and_host, 2, vp);
// Starts listening on `addr` and reports events to `observer`.
//   observer - PDU observer stored in m_PDU_Observer.
//   addr     - address handed to comstack() to create the listening endpoint.
// The first group of statements tears down the current endpoint (observer
// removed, state Closed, output queue drained, input buffer freed); the rest
// creates and binds a server comstack, sets FD_CLOEXEC on its descriptor,
// registers it for read/exception events and enters the Listen state.
// NOTE(review): the condition guarding the teardown group, error handling and
// return values are not visible in this excerpt.
470 int PDU_Assoc::listen(IPDU_Observer *observer, const char *addr)
474 m_p->m_socketObservable->deleteObserver(this);
475 m_p->state = PDU_Assoc_priv::Closed;
478 yaz_log(m_p->log, "PDU_Assoc::close fd=%d", cs_fileno(m_p->cs));
// Pop entries off the output queue one at a time.
482 while (m_p->queue_out)
484 PDU_Assoc_priv::PDU_Queue *q_this = m_p->queue_out;
485 m_p->queue_out = m_p->queue_out->m_next;
488 xfree(m_p->input_buf);
497 m_PDU_Observer = observer;
499 m_p->cs = comstack(addr, &ap);
503 if (cs_bind(m_p->cs, ap, CS_SERVER) < 0)
506 int fd = cs_fileno(m_p->cs);
// Add the close-on-exec flag to the listening descriptor.
508 int oldflags = fcntl(fd, F_GETFD, 0);
511 oldflags |= FD_CLOEXEC;
512 fcntl(fd, F_SETFD, oldflags);
515 m_p->m_socketObservable->addObserver(fd, this);
516 yaz_log(m_p->log, "maskObserver 9");
517 m_p->m_socketObservable->maskObserver(this, SOCKET_OBSERVE_READ|
518 SOCKET_OBSERVE_EXCEPT);
519 yaz_log(m_p->log, "PDU_Assoc::listen ok fd=%d", fd);
520 m_p->state = PDU_Assoc_priv::Listen;
// Sets the idle timeout for this association.
//   idleTime - timeout value; stored in the private state and passed to the
//              socket manager's timeoutObserver() for this observer.
// NOTE(review): the unit of idleTime is not stated in the visible code.
524 void PDU_Assoc::idleTime(int idleTime)
526 m_p->idleTime = idleTime;
527 yaz_log(m_p->log, "PDU_Assoc::idleTime(%d)", idleTime);
528 m_p->m_socketObservable->timeoutObserver(this, m_p->idleTime);
// Starts a connection to `addr` and reports events to `observer`.
//   observer - PDU observer stored in m_PDU_Observer.
//   addr     - address handed to comstack() to create the endpoint.
// After cs_connect() the descriptor is registered with the socket manager and
// the observer mask is chosen according to the outcome of the connect call.
// NOTE(review): the tests on `res`, any teardown of a previous connection and
// the return values are not visible in this excerpt.
531 int PDU_Assoc::connect(IPDU_Observer *observer, const char *addr)
533 yaz_log(m_p->log, "PDU_Assoc::connect %s", addr);
535 m_PDU_Observer = observer;
537 m_p->cs = comstack(addr, &ap);
540 int res = cs_connect(m_p->cs, ap);
541 yaz_log(m_p->log, "PDU_Assoc::connect fd=%d res=%d", cs_fileno(m_p->cs),
543 m_p->m_socketObservable->addObserver(cs_fileno(m_p->cs), this);
546 { // Connect complete
// The state is still set to Connecting here; socketNotify() moves it to Ready.
547 m_p->state = PDU_Assoc_priv::Connecting;
548 unsigned mask = SOCKET_OBSERVE_EXCEPT;
549 mask |= SOCKET_OBSERVE_WRITE;
550 mask |= SOCKET_OBSERVE_READ;
551 yaz_log(m_p->log, "maskObserver 11");
552 m_p->m_socketObservable->maskObserver(this, mask);
// Second outcome (presumably connect still pending - confirm): observe
// whatever the comstack reports as pending.
556 m_p->state = PDU_Assoc_priv::Connecting;
557 unsigned mask = SOCKET_OBSERVE_EXCEPT;
558 if (m_p->cs->io_pending & CS_WANT_WRITE)
559 mask |= SOCKET_OBSERVE_WRITE;
560 if (m_p->cs->io_pending & CS_WANT_READ)
561 mask |= SOCKET_OBSERVE_READ;
// NOTE(review): this log label repeats "maskObserver 11" from the branch
// above, so the two call sites cannot be told apart in the log.
562 yaz_log(m_p->log, "maskObserver 11");
563 m_p->m_socketObservable->maskObserver(this, mask);
566 { // Connect failed immediately
567 // Since m_state is Closed we can distinguish this case from
568 // normal connect in socketNotify handler
569 yaz_log(m_p->log, "maskObserver 12");
570 m_p->m_socketObservable->maskObserver(this, SOCKET_OBSERVE_WRITE|
571 SOCKET_OBSERVE_EXCEPT);
// Creates a child association for a newly accepted connection.
//   cs - comstack of the accepted connection.
// A new PDU_Assoc is built on the same socket manager and the current PDU
// observer is asked, via sessionNotify(), for an observer for the new
// session. If none is returned the new association is shut down and deleted;
// otherwise it is pushed onto the front of this association's child list.
// NOTE(review): the return after the delete is on a line not visible here.
576 // Single-threaded... Only useful for non-blocking handlers
577 void PDU_Assoc::childNotify(COMSTACK cs)
579 PDU_Assoc *new_observable =
580 new PDU_Assoc(m_p->m_socketObservable, cs);
582 // Clone PDU Observer
583 new_observable->m_PDU_Observer = m_PDU_Observer->sessionNotify
584 (new_observable, cs_fileno(cs));
586 if (!new_observable->m_PDU_Observer)
588 new_observable->shutdown();
589 delete new_observable;
// Link the child at the head of the child list and record its parent.
592 new_observable->m_p->pdu_next = m_p->pdu_children;
593 m_p->pdu_children = new_observable;
594 new_observable->m_p->pdu_parent = this;
// Returns the peer address string of the current comstack, as produced by
// cs_addrstr().
// NOTE(review): any guard for a missing comstack is on lines not visible in
// this excerpt.
597 const char*PDU_Assoc::getpeername()
601 return cs_addrstr(m_p->cs);
606 * c-file-style: "Stroustrup"
607 * indent-tabs-mode: nil
609 * vim: shiftwidth=4 tabstop=8 expandtab