1 /* This file is part of the yazpp toolkit.
2 * Copyright (C) Index Data
3 * See the file LICENSE for details.
12 #include <yaz/tcpip.h>
14 #include <yazpp/pdu-assoc.h>
20 using namespace yazpp_1;
23 class PDU_Assoc_priv {
24 friend class PDU_Assoc;
36 PDU_Queue(const char *buf, int len);
42 PDU_Assoc *pdu_parent;
43 PDU_Assoc *pdu_children;
46 yazpp_1::ISocketObservable *m_socketObservable;
54 void init(yazpp_1::ISocketObservable *socketObservable);
55 COMSTACK comstack(const char *type_and_host, void **vp);
56 bool m_session_is_dead;
61 void PDU_Assoc_priv::init(ISocketObservable *socketObservable)
65 m_socketObservable = socketObservable;
76 m_session_is_dead = false;
80 PDU_Assoc::~PDU_Assoc()
82 xfree(m_p->cert_fname);
86 PDU_Assoc::PDU_Assoc(ISocketObservable *socketObservable)
89 m_p = new PDU_Assoc_priv;
90 m_p->init(socketObservable);
93 PDU_Assoc::PDU_Assoc(ISocketObservable *socketObservable,
97 m_p = new PDU_Assoc_priv;
98 m_p->init(socketObservable);
101 if (cs->io_pending & CS_WANT_WRITE)
102 mask |= SOCKET_OBSERVE_WRITE;
103 if (cs->io_pending & CS_WANT_READ)
104 mask |= SOCKET_OBSERVE_READ;
105 m_p->m_socketObservable->addObserver(cs_fileno(cs), this);
108 yaz_log(m_p->log, "new PDU_Assoc. Ready");
109 m_p->state = PDU_Assoc_priv::Ready;
114 yaz_log(m_p->log, "new PDU_Assoc. Accepting");
115 // assume comstack is accepting...
116 m_p->state = PDU_Assoc_priv::Accepting;
117 m_p->m_socketObservable->addObserver(cs_fileno(cs), this);
118 yaz_log(m_p->log, "maskObserver 1");
119 m_p->m_socketObservable->maskObserver(this,
120 mask|SOCKET_OBSERVE_EXCEPT);
125 IPDU_Observable *PDU_Assoc::clone()
127 PDU_Assoc *copy = new PDU_Assoc(m_p->m_socketObservable);
131 void PDU_Assoc::socketNotify(int event)
133 yaz_log(m_p->log, "PDU_Assoc::socketNotify p=%p state=%d event = %d",
134 this, m_p->state, event);
135 if ((event & SOCKET_OBSERVE_EXCEPT) &&
136 m_p->state != PDU_Assoc_priv::Connecting)
138 yaz_log(m_p->log, "PDU_Assoc::socketNotify except");
140 m_PDU_Observer->failNotify();
143 else if (event & SOCKET_OBSERVE_TIMEOUT)
145 yaz_log(m_p->log, "PDU_Assoc::socketNotify timeout");
146 m_PDU_Observer->timeoutNotify();
152 case PDU_Assoc_priv::Accepting:
153 if (!cs_accept(m_p->cs))
155 yaz_log(m_p->log, "PDU_Assoc::cs_accept failed");
158 m_PDU_Observer->failNotify();
163 if (m_p->cs->io_pending & CS_WANT_WRITE)
164 mask |= SOCKET_OBSERVE_WRITE;
165 if (m_p->cs->io_pending & CS_WANT_READ)
166 mask |= SOCKET_OBSERVE_READ;
168 { // accept is complete. turn to ready state and write if needed
169 m_p->state = PDU_Assoc_priv::Ready;
173 { // accept still incomplete.
174 yaz_log(m_p->log, "maskObserver 2");
175 m_p->m_socketObservable->maskObserver(this,
176 mask|SOCKET_OBSERVE_EXCEPT);
180 case PDU_Assoc_priv::Connecting:
181 yaz_log(m_p->log, "PDU_Assoc::socketNotify Connecting");
182 res = cs_rcvconnect(m_p->cs);
185 unsigned mask = SOCKET_OBSERVE_EXCEPT;
186 if (m_p->cs->io_pending & CS_WANT_WRITE)
187 mask |= SOCKET_OBSERVE_WRITE;
188 if (m_p->cs->io_pending & CS_WANT_READ)
189 mask |= SOCKET_OBSERVE_READ;
190 yaz_log(m_p->log, "maskObserver 3");
191 m_p->m_socketObservable->addObserver(cs_fileno(m_p->cs), this);
192 m_p->m_socketObservable->maskObserver(this, mask);
196 m_p->state = PDU_Assoc_priv::Ready;
198 m_PDU_Observer->connectNotify();
202 case PDU_Assoc_priv::Listen:
203 if (event & SOCKET_OBSERVE_READ)
207 if ((res = cs_listen(m_p->cs, 0, 0)) == 1)
211 yaz_log(YLOG_FATAL|YLOG_ERRNO, "cs_listen failed");
214 if (!(new_line = cs_accept(m_p->cs)))
216 /* 1. create socket-manager
218 3. create top-level object
219 setup observer for child fileid in pdu-assoc
222 yaz_log(m_p->log, "new session: parent fd=%d child fd=%d",
223 cs_fileno(m_p->cs), cs_fileno(new_line));
224 childNotify(new_line);
227 case PDU_Assoc_priv::Writing:
228 yaz_log(m_p->log, "PDU_Assoc::socketNotify writing");
229 if (event & (SOCKET_OBSERVE_READ|SOCKET_OBSERVE_WRITE))
232 case PDU_Assoc_priv::Ready:
233 yaz_log(m_p->log, "PDU_Assoc::socketNotify ready");
234 if (event & (SOCKET_OBSERVE_READ|SOCKET_OBSERVE_WRITE))
238 res = cs_get(m_p->cs, &m_p->input_buf, &m_p->input_len);
241 unsigned mask = SOCKET_OBSERVE_EXCEPT;
242 if (m_p->cs->io_pending & CS_WANT_WRITE)
243 mask |= SOCKET_OBSERVE_WRITE;
244 if (m_p->cs->io_pending & CS_WANT_READ)
245 mask |= SOCKET_OBSERVE_READ;
246 yaz_log(m_p->log, "maskObserver 4");
247 m_p->m_socketObservable->maskObserver(this, mask);
252 yaz_log(m_p->log, "PDU_Assoc::Connection closed by peer");
255 m_PDU_Observer->failNotify(); // problem here..
258 // lock it, so we know if recv_PDU deletes it.
260 m_p->destroyed = &destroyed;
265 PDU_Assoc_priv::PDU_Queue **pq = &m_p->m_queue_in;
269 *pq = new PDU_Assoc_priv::PDU_Queue(m_p->m_input_buf, res);
271 m_PDU_Observer->recv_PDU(m_p->input_buf, res);
273 if (destroyed) // it really was destroyed, return now.
276 } while (m_p->cs && cs_more(m_p->cs));
277 if (m_p->cs && m_p->state == PDU_Assoc_priv::Ready)
279 yaz_log(m_p->log, "maskObserver 5");
280 m_p->m_socketObservable->maskObserver(this,
281 SOCKET_OBSERVE_EXCEPT|
282 SOCKET_OBSERVE_READ);
286 case PDU_Assoc_priv::Closed:
287 yaz_log(m_p->log, "CLOSING state=%d event was %d", m_p->state,
290 m_PDU_Observer->failNotify();
293 yaz_log(m_p->log, "Unknown state=%d event was %d", m_p->state, event);
295 m_PDU_Observer->failNotify();
299 void PDU_Assoc::close_session()
301 m_p->m_session_is_dead = true;
305 m_PDU_Observer->failNotify();
309 void PDU_Assoc::shutdown()
312 for (ch = m_p->pdu_children; ch; ch = ch->m_p->pdu_next)
315 m_p->m_socketObservable->deleteObserver(this);
316 m_p->state = PDU_Assoc_priv::Closed;
319 yaz_log(m_p->log, "PDU_Assoc::close fd=%d", cs_fileno(m_p->cs));
323 while (m_p->queue_out)
325 PDU_Assoc_priv::PDU_Queue *q_this = m_p->queue_out;
326 m_p->queue_out = m_p->queue_out->m_next;
329 xfree(m_p->input_buf);
334 void PDU_Assoc::destroy()
342 // delete from parent's child list (if any)
345 c = &m_p->pdu_parent->m_p->pdu_children;
349 c = &(*c)->m_p->pdu_next;
351 *c = (*c)->m_p->pdu_next;
353 // delete all children ...
354 c = &m_p->pdu_children;
357 PDU_Assoc *here = *c;
358 *c = (*c)->m_p->pdu_next;
359 here->m_p->pdu_parent = 0;
362 yaz_log(m_p->log, "PDU_Assoc::destroy this=%p", this);
365 PDU_Assoc_priv::PDU_Queue::PDU_Queue(const char *buf, int len)
367 m_buf = (char *) xmalloc(len);
368 memcpy(m_buf, buf, len);
373 PDU_Assoc_priv::PDU_Queue::~PDU_Queue()
378 int PDU_Assoc::flush_PDU()
382 yaz_log(m_p->log, "PDU_Assoc::flush_PDU");
383 if (m_p->state != PDU_Assoc_priv::Ready && m_p->state != PDU_Assoc_priv::Writing)
385 yaz_log(m_p->log, "PDU_Assoc::flush_PDU, not ready");
388 PDU_Assoc_priv::PDU_Queue *q = m_p->queue_out;
391 m_p->state = PDU_Assoc_priv::Ready;
392 yaz_log(m_p->log, "PDU_Assoc::flush_PDU queue empty");
393 yaz_log(m_p->log, "maskObserver 6");
394 m_p->m_socketObservable->maskObserver(this, SOCKET_OBSERVE_READ|
395 SOCKET_OBSERVE_WRITE|
396 SOCKET_OBSERVE_EXCEPT);
397 if (m_p->m_session_is_dead)
400 m_PDU_Observer->failNotify();
404 r = cs_put(m_p->cs, q->m_buf, q->m_len);
407 yaz_log(m_p->log, "PDU_Assoc::flush_PDU cs_put failed");
409 m_PDU_Observer->failNotify();
412 m_p->m_socketObservable->addObserver(cs_fileno(m_p->cs), this);
415 unsigned mask = SOCKET_OBSERVE_EXCEPT;
416 m_p->state = PDU_Assoc_priv::Writing;
417 if (m_p->cs->io_pending & CS_WANT_WRITE)
418 mask |= SOCKET_OBSERVE_WRITE;
419 if (m_p->cs->io_pending & CS_WANT_READ)
420 mask |= SOCKET_OBSERVE_READ;
422 mask |= SOCKET_OBSERVE_WRITE;
423 yaz_log(m_p->log, "maskObserver 7");
424 m_p->m_socketObservable->maskObserver(this, mask);
425 yaz_log(m_p->log, "PDU_Assoc::flush_PDU cs_put %d bytes fd=%d (inc)",
426 q->m_len, cs_fileno(m_p->cs));
429 yaz_log(m_p->log, "PDU_Assoc::flush_PDU cs_put %d bytes", q->m_len);
430 // whole packet sent... delete this and proceed to next ...
431 m_p->queue_out = q->m_next;
433 // don't select on write if queue is empty ...
436 m_p->state = PDU_Assoc_priv::Ready;
437 yaz_log(m_p->log, "maskObserver 8");
438 m_p->m_socketObservable->maskObserver(this, SOCKET_OBSERVE_READ|
439 SOCKET_OBSERVE_EXCEPT);
440 if (m_p->m_session_is_dead)
446 int PDU_Assoc::send_PDU(const char *buf, int len)
448 yaz_log(m_p->log, "PDU_Assoc::send_PDU");
449 PDU_Assoc_priv::PDU_Queue **pq = &m_p->queue_out;
450 int is_idle = (*pq ? 0 : 1);
454 yaz_log(m_p->log, "PDU_Assoc::send_PDU failed, cs == 0");
459 *pq = new PDU_Assoc_priv::PDU_Queue(buf, len);
463 yaz_log(m_p->log, "PDU_Assoc::cannot send_PDU fd=%d",
468 COMSTACK PDU_Assoc_priv::comstack(const char *type_and_host, void **vp)
470 return cs_create_host(type_and_host, 2, vp);
473 int PDU_Assoc::listen(IPDU_Observer *observer, const char *addr)
477 m_p->m_socketObservable->deleteObserver(this);
478 m_p->state = PDU_Assoc_priv::Closed;
481 yaz_log(m_p->log, "PDU_Assoc::close fd=%d", cs_fileno(m_p->cs));
485 while (m_p->queue_out)
487 PDU_Assoc_priv::PDU_Queue *q_this = m_p->queue_out;
488 m_p->queue_out = m_p->queue_out->m_next;
491 xfree(m_p->input_buf);
500 m_PDU_Observer = observer;
502 m_p->cs = m_p->comstack(addr, &ap);
508 cs_set_ssl_certificate_file(m_p->cs, m_p->cert_fname);
510 if (cs_bind(m_p->cs, ap, CS_SERVER) < 0)
513 int fd = cs_fileno(m_p->cs);
515 int oldflags = fcntl(fd, F_GETFD, 0);
518 oldflags |= FD_CLOEXEC;
519 fcntl(fd, F_SETFD, oldflags);
522 m_p->m_socketObservable->addObserver(fd, this);
523 yaz_log(m_p->log, "maskObserver 9");
524 m_p->m_socketObservable->maskObserver(this, SOCKET_OBSERVE_READ|
525 SOCKET_OBSERVE_EXCEPT);
526 yaz_log(m_p->log, "PDU_Assoc::listen ok fd=%d", fd);
527 m_p->state = PDU_Assoc_priv::Listen;
531 COMSTACK PDU_Assoc::get_comstack()
536 void PDU_Assoc::idleTime(int idleTime)
538 m_p->idleTime = idleTime;
539 yaz_log(m_p->log, "PDU_Assoc::idleTime(%d)", idleTime);
540 m_p->m_socketObservable->timeoutObserver(this, m_p->idleTime);
543 int PDU_Assoc::connect(IPDU_Observer *observer, const char *addr)
545 yaz_log(m_p->log, "PDU_Assoc::connect %s", addr);
547 m_PDU_Observer = observer;
549 m_p->cs = m_p->comstack(addr, &ap);
552 int res = cs_connect(m_p->cs, ap);
553 yaz_log(m_p->log, "PDU_Assoc::connect fd=%d res=%d", cs_fileno(m_p->cs),
555 m_p->m_socketObservable->addObserver(cs_fileno(m_p->cs), this);
558 { // Connect complete
559 m_p->state = PDU_Assoc_priv::Connecting;
560 unsigned mask = SOCKET_OBSERVE_EXCEPT;
561 mask |= SOCKET_OBSERVE_WRITE;
562 mask |= SOCKET_OBSERVE_READ;
563 yaz_log(m_p->log, "maskObserver 11");
564 m_p->m_socketObservable->maskObserver(this, mask);
568 m_p->state = PDU_Assoc_priv::Connecting;
569 unsigned mask = SOCKET_OBSERVE_EXCEPT;
570 if (m_p->cs->io_pending & CS_WANT_WRITE)
571 mask |= SOCKET_OBSERVE_WRITE;
572 if (m_p->cs->io_pending & CS_WANT_READ)
573 mask |= SOCKET_OBSERVE_READ;
574 yaz_log(m_p->log, "maskObserver 11");
575 m_p->m_socketObservable->maskObserver(this, mask);
578 { // Connect failed immediately
579 // Since m_state is Closed we can distinguish this case from
580 // normal connect in socketNotify handler
581 yaz_log(m_p->log, "maskObserver 12");
582 m_p->m_socketObservable->maskObserver(this, SOCKET_OBSERVE_WRITE|
583 SOCKET_OBSERVE_EXCEPT);
588 // Single-threaded... Only useful for non-blocking handlers
589 void PDU_Assoc::childNotify(COMSTACK cs)
591 PDU_Assoc *new_observable =
592 new PDU_Assoc(m_p->m_socketObservable, cs);
594 // Clone PDU Observer
595 new_observable->m_PDU_Observer = m_PDU_Observer->sessionNotify
596 (new_observable, cs_fileno(cs));
598 if (!new_observable->m_PDU_Observer)
600 new_observable->shutdown();
601 delete new_observable;
604 new_observable->m_p->pdu_next = m_p->pdu_children;
605 m_p->pdu_children = new_observable;
606 new_observable->m_p->pdu_parent = this;
609 const char*PDU_Assoc::getpeername()
613 return cs_addrstr(m_p->cs);
616 void PDU_Assoc::set_cert_fname(const char *fname)
618 xfree(m_p->cert_fname);
621 m_p->cert_fname = xstrdup(fname);
627 * c-file-style: "Stroustrup"
628 * indent-tabs-mode: nil
630 * vim: shiftwidth=4 tabstop=8 expandtab