1 /* This file is part of the yazpp toolkit.
2 * Copyright (C) Index Data
3 * See the file LICENSE for details.
12 #include <yaz/tcpip.h>
14 #include <yazpp/pdu-assoc.h>
20 using namespace yazpp_1;
// Private implementation state of PDU_Assoc (allocated by the PDU_Assoc
// constructors and reached through PDU_Assoc::m_p).
// NOTE(review): only part of this declaration is visible in this view; other
// members used by the methods below (state, cs, queue_out, input_buf, ...) are
// declared on lines not shown here.
23 class PDU_Assoc_priv {
24 friend class PDU_Assoc;
// Constructor of the nested PDU_Queue node type; its definition copies len bytes from buf.
36 PDU_Queue(const char *buf, int len);
// Association that created this one in childNotify (reset to 0 in destroy).
42 PDU_Assoc *pdu_parent;
// Head of the list of child associations, linked through pdu_next.
43 PDU_Assoc *pdu_children;
// Socket observable this association registers with (addObserver/maskObserver/deleteObserver).
46 yazpp_1::ISocketObservable *m_socketObservable;
// Stores socketObservable and clears m_session_is_dead.
54 void init(yazpp_1::ISocketObservable *socketObservable);
// Creates a COMSTACK via cs_create_host for type_and_host; *vp is passed through to it.
55 COMSTACK comstack(const char *type_and_host, void **vp);
// Set by close_session(); consulted by flush_PDU() once the output queue has drained.
56 bool m_session_is_dead;
// Initialise the private state for a new association: remember the socket
// observable and mark the session as alive.
// NOTE(review): further member initialisation sits on lines not visible here.
61 void PDU_Assoc_priv::init(ISocketObservable *socketObservable)
65 m_socketObservable = socketObservable;
76 m_session_is_dead = false;
// Destructor. Releases the certificate file name string held in the private
// state (the copy made by set_cert_fname).
// NOTE(review): any further cleanup is on lines not visible here.
80 PDU_Assoc::~PDU_Assoc()
82 xfree(m_p->cert_fname);
// Construct an association that is not yet bound to a COMSTACK: allocates the
// private state and initialises it with the given socket observable.
86 PDU_Assoc::PDU_Assoc(ISocketObservable *socketObservable)
89 m_p = new PDU_Assoc_priv;
90 m_p->init(socketObservable);
// Construct an association around an existing COMSTACK (referred to as cs in
// the body; the rest of the parameter list is on a line not visible here).
// The observe mask is derived from cs->io_pending, and the object ends up in
// either the Ready or the Accepting state.
// NOTE(review): the condition selecting between those two states is not visible.
93 PDU_Assoc::PDU_Assoc(ISocketObservable *socketObservable,
97 m_p = new PDU_Assoc_priv;
98 m_p->init(socketObservable);
// Translate the comstack's pending I/O wishes into socket-observer flags.
101 if (cs->io_pending & CS_WANT_WRITE)
102 mask |= SOCKET_OBSERVE_WRITE;
103 if (cs->io_pending & CS_WANT_READ)
104 mask |= SOCKET_OBSERVE_READ;
105 m_p->m_socketObservable->addObserver(this);
108 yaz_log(m_p->log, "new PDU_Assoc. Ready");
109 m_p->state = PDU_Assoc_priv::Ready;
114 yaz_log(m_p->log, "new PDU_Assoc. Accepting");
115 // assume comstack is accepting...
116 m_p->state = PDU_Assoc_priv::Accepting;
117 m_p->m_socketObservable->addObserver(this);
118 yaz_log(m_p->log, "maskObserver 1");
// Exceptions are always observed in addition to the pending read/write flags.
119 m_p->m_socketObservable->maskObserver(this,
120 mask|SOCKET_OBSERVE_EXCEPT,
// Create a new PDU_Assoc that uses the same socket observable as this one.
// NOTE(review): the return statement is on a line not visible here.
126 IPDU_Observable *PDU_Assoc::clone()
128 PDU_Assoc *copy = new PDU_Assoc(m_p->m_socketObservable);
// Socket event callback. event is a bit mask of SOCKET_OBSERVE_* flags.
// An EXCEPT event outside the Connecting state, or a TIMEOUT event, is reported
// to m_PDU_Observer (failNotify / timeoutNotify); the remaining code is
// organised as per-state cases (Accepting, Connecting, Listen, Writing, Ready,
// Closed, plus an "Unknown state" fallback).
// NOTE(review): braces, several conditions and statements of this function are
// on lines not visible in this view; the comments below describe only what
// the visible lines show.
132 void PDU_Assoc::socketNotify(int event)
134 yaz_log(m_p->log, "PDU_Assoc::socketNotify p=%p state=%d event = %d",
135 this, m_p->state, event);
// Exceptions while Connecting are excluded here and left to the Connecting case.
136 if ((event & SOCKET_OBSERVE_EXCEPT) &&
137 m_p->state != PDU_Assoc_priv::Connecting)
139 yaz_log(m_p->log, "PDU_Assoc::socketNotify except");
141 m_PDU_Observer->failNotify();
144 else if (event & SOCKET_OBSERVE_TIMEOUT)
146 yaz_log(m_p->log, "PDU_Assoc::socketNotify timeout");
147 m_PDU_Observer->timeoutNotify();
// Accepting: continue the accept on this comstack; failure is reported to the observer.
153 case PDU_Assoc_priv::Accepting:
154 if (!cs_accept(m_p->cs))
156 yaz_log(m_p->log, "PDU_Assoc::cs_accept failed");
159 m_PDU_Observer->failNotify();
// Rebuild the observe mask from what the comstack is still waiting for.
164 if (m_p->cs->io_pending & CS_WANT_WRITE)
165 mask |= SOCKET_OBSERVE_WRITE;
166 if (m_p->cs->io_pending & CS_WANT_READ)
167 mask |= SOCKET_OBSERVE_READ;
169 { // accept is complete. turn to ready state and write if needed
170 m_p->state = PDU_Assoc_priv::Ready;
174 { // accept still incomplete.
175 yaz_log(m_p->log, "maskObserver 2");
176 m_p->m_socketObservable->maskObserver(
178 mask|SOCKET_OBSERVE_EXCEPT, cs_fileno(m_p->cs));
// Connecting: drive the pending connect with cs_rcvconnect; depending on res
// (tests not visible here) either keep waiting with an updated mask or switch
// to Ready and call connectNotify.
182 case PDU_Assoc_priv::Connecting:
183 yaz_log(m_p->log, "PDU_Assoc::socketNotify Connecting");
184 res = cs_rcvconnect(m_p->cs);
187 unsigned mask = SOCKET_OBSERVE_EXCEPT;
188 if (m_p->cs->io_pending & CS_WANT_WRITE)
189 mask |= SOCKET_OBSERVE_WRITE;
190 if (m_p->cs->io_pending & CS_WANT_READ)
191 mask |= SOCKET_OBSERVE_READ;
192 yaz_log(m_p->log, "maskObserver 3");
193 m_p->m_socketObservable->maskObserver(this, mask,
198 m_p->state = PDU_Assoc_priv::Ready;
200 m_PDU_Observer->connectNotify();
// Listen: a READ event triggers cs_listen followed by cs_accept; the resulting
// comstack (new_line) is handed to childNotify.
204 case PDU_Assoc_priv::Listen:
205 if (event & SOCKET_OBSERVE_READ)
209 if ((res = cs_listen(m_p->cs, 0, 0)) == 1)
213 yaz_log(YLOG_FATAL|YLOG_ERRNO, "cs_listen failed");
216 if (!(new_line = cs_accept(m_p->cs)))
218 /* 1. create socket-manager
220 3. create top-level object
221 setup observer for child fileid in pdu-assoc
224 yaz_log(m_p->log, "new session: parent fd=%d child fd=%d",
225 cs_fileno(m_p->cs), cs_fileno(new_line));
226 childNotify(new_line);
// Writing: reacts to READ or WRITE events (the action itself is on a line not visible here).
229 case PDU_Assoc_priv::Writing:
230 yaz_log(m_p->log, "PDU_Assoc::socketNotify writing");
231 if (event & (SOCKET_OBSERVE_READ|SOCKET_OBSERVE_WRITE))
// Ready: on READ or WRITE events, read with cs_get into m_p->input_buf and pass
// the data to the observer via recv_PDU, repeating while cs_more reports more.
234 case PDU_Assoc_priv::Ready:
235 yaz_log(m_p->log, "PDU_Assoc::socketNotify ready");
236 if (event & (SOCKET_OBSERVE_READ|SOCKET_OBSERVE_WRITE))
240 res = cs_get(m_p->cs, &m_p->input_buf, &m_p->input_len);
243 unsigned mask = SOCKET_OBSERVE_EXCEPT;
244 if (m_p->cs->io_pending & CS_WANT_WRITE)
245 mask |= SOCKET_OBSERVE_WRITE;
246 if (m_p->cs->io_pending & CS_WANT_READ)
247 mask |= SOCKET_OBSERVE_READ;
248 yaz_log(m_p->log, "maskObserver 4");
249 m_p->m_socketObservable->maskObserver(this, mask,
255 yaz_log(m_p->log, "PDU_Assoc::Connection closed by peer");
258 m_PDU_Observer->failNotify(); // problem here..
261 // lock it, so we know if recv_PDU deletes it.
// m_p->destroyed points at a local flag that is tested after the recv_PDU callback.
263 m_p->destroyed = &destroyed;
// NOTE(review): the next two statements use m_queue_in / m_input_buf, names that
// appear nowhere else in the visible code - presumably compiled out; confirm
// against the full file.
268 PDU_Assoc_priv::PDU_Queue **pq = &m_p->m_queue_in;
272 *pq = new PDU_Assoc_priv::PDU_Queue(m_p->m_input_buf, res);
274 m_PDU_Observer->recv_PDU(m_p->input_buf, res);
276 if (destroyed) // it really was destroyed, return now.
279 } while (m_p->cs && cs_more(m_p->cs));
280 if (m_p->cs && m_p->state == PDU_Assoc_priv::Ready)
282 yaz_log(m_p->log, "maskObserver 5");
283 m_p->m_socketObservable->maskObserver(this,
284 SOCKET_OBSERVE_EXCEPT|
// Closed: an event on a closed association is logged and reported as a failure.
290 case PDU_Assoc_priv::Closed:
291 yaz_log(m_p->log, "CLOSING state=%d event was %d", m_p->state,
294 m_PDU_Observer->failNotify();
// Fallback for a state value not handled above: log it and report failure.
297 yaz_log(m_p->log, "Unknown state=%d event was %d", m_p->state, event);
299 m_PDU_Observer->failNotify();
// Mark the session as dead and notify the observer of failure.
// NOTE(review): the condition guarding failNotify() is on a line not visible
// here; flush_PDU() also checks m_session_is_dead once its queue is empty.
303 void PDU_Assoc::close_session()
305 m_p->m_session_is_dead = true;
309 m_PDU_Observer->failNotify();
// Shut this association down: walk the child list, stop observing the socket,
// enter the Closed state, drop all queued outgoing PDUs and free the input buffer.
// NOTE(review): the per-child action and the comstack close call are on lines
// not visible here.
313 void PDU_Assoc::shutdown()
316 for (ch = m_p->pdu_children; ch; ch = ch->m_p->pdu_next)
319 m_p->m_socketObservable->deleteObserver(this);
320 m_p->state = PDU_Assoc_priv::Closed;
323 yaz_log(m_p->log, "PDU_Assoc::close fd=%d", cs_fileno(m_p->cs));
// Pop every node off the output queue.
327 while (m_p->queue_out)
329 PDU_Assoc_priv::PDU_Queue *q_this = m_p->queue_out;
330 m_p->queue_out = m_p->queue_out->m_next;
333 xfree(m_p->input_buf);
// Detach this association from the parent/child structure: unlink it from its
// parent's child list, then take each child off this object's list and clear
// the child's parent pointer.
// NOTE(review): loop headers and what is done with each removed child are on
// lines not visible here.
338 void PDU_Assoc::destroy()
346 // delete from parent's child list (if any)
349 c = &m_p->pdu_parent->m_p->pdu_children;
// c walks the links of the parent's list; the found link is then bypassed.
353 c = &(*c)->m_p->pdu_next;
355 *c = (*c)->m_p->pdu_next;
357 // delete all children ...
358 c = &m_p->pdu_children;
361 PDU_Assoc *here = *c;
362 *c = (*c)->m_p->pdu_next;
363 here->m_p->pdu_parent = 0;
366 yaz_log(m_p->log, "PDU_Assoc::destroy this=%p", this);
// Build a queue node holding a private copy of the PDU: allocates len bytes
// with xmalloc and copies buf into them.
// NOTE(review): initialisation of the remaining members is on lines not visible here.
369 PDU_Assoc_priv::PDU_Queue::PDU_Queue(const char *buf, int len)
371 m_buf = (char *) xmalloc(len);
372 memcpy(m_buf, buf, len);
// Destructor of a queue node.
// NOTE(review): the body is not visible in this view - presumably it releases
// the buffer allocated by the constructor; confirm against the full file.
377 PDU_Assoc_priv::PDU_Queue::~PDU_Queue()
// Try to write the PDU at the head of the output queue with cs_put.
// Only acts in the Ready or Writing state. An empty queue puts the association
// back into Ready; an incomplete write switches to Writing and updates the
// observe mask; a complete write advances the queue to the next PDU.
// NOTE(review): return statements and several conditions are on lines not
// visible here, so the meaning of the int result cannot be stated from this view.
382 int PDU_Assoc::flush_PDU()
386 yaz_log(m_p->log, "PDU_Assoc::flush_PDU");
387 if (m_p->state != PDU_Assoc_priv::Ready && m_p->state != PDU_Assoc_priv::Writing)
389 yaz_log(m_p->log, "PDU_Assoc::flush_PDU, not ready");
392 PDU_Assoc_priv::PDU_Queue *q = m_p->queue_out;
// Nothing queued: go back to Ready and observe read, write and exceptions.
395 m_p->state = PDU_Assoc_priv::Ready;
396 yaz_log(m_p->log, "PDU_Assoc::flush_PDU queue empty");
397 yaz_log(m_p->log, "maskObserver 6");
398 m_p->m_socketObservable->maskObserver(this, SOCKET_OBSERVE_READ|
399 SOCKET_OBSERVE_WRITE|
400 SOCKET_OBSERVE_EXCEPT,
// A session flagged by close_session() is reported as failed at this point.
402 if (m_p->m_session_is_dead)
405 m_PDU_Observer->failNotify();
409 r = cs_put(m_p->cs, q->m_buf, q->m_len);
412 yaz_log(m_p->log, "PDU_Assoc::flush_PDU cs_put failed");
414 m_PDU_Observer->failNotify();
// Incomplete write: remain in Writing and wait for whatever the comstack needs.
419 unsigned mask = SOCKET_OBSERVE_EXCEPT;
420 m_p->state = PDU_Assoc_priv::Writing;
421 if (m_p->cs->io_pending & CS_WANT_WRITE)
422 mask |= SOCKET_OBSERVE_WRITE;
423 if (m_p->cs->io_pending & CS_WANT_READ)
424 mask |= SOCKET_OBSERVE_READ;
426 mask |= SOCKET_OBSERVE_WRITE;
427 yaz_log(m_p->log, "maskObserver 7");
428 m_p->m_socketObservable->maskObserver(this, mask, cs_fileno(m_p->cs));
429 yaz_log(m_p->log, "PDU_Assoc::flush_PDU cs_put %d bytes fd=%d (inc)",
430 q->m_len, cs_fileno(m_p->cs));
433 yaz_log(m_p->log, "PDU_Assoc::flush_PDU cs_put %d bytes", q->m_len);
434 // whole packet sent... delete this and proceed to next ...
435 m_p->queue_out = q->m_next;
437 // don't select on write if queue is empty ...
440 m_p->state = PDU_Assoc_priv::Ready;
441 yaz_log(m_p->log, "maskObserver 8");
442 m_p->m_socketObservable->maskObserver(this, SOCKET_OBSERVE_READ|
443 SOCKET_OBSERVE_EXCEPT,
445 if (m_p->m_session_is_dead)
// Queue a PDU for sending: a PDU_Queue node holding a copy of buf (len bytes)
// is stored through pq into the output queue.
// NOTE(review): the walk to the queue tail, the use of is_idle and the return
// statements are on lines not visible here.
451 int PDU_Assoc::send_PDU(const char *buf, int len)
453 yaz_log(m_p->log, "PDU_Assoc::send_PDU");
454 PDU_Assoc_priv::PDU_Queue **pq = &m_p->queue_out;
// is_idle is 1 when the output queue was empty on entry.
455 int is_idle = (*pq ? 0 : 1);
459 yaz_log(m_p->log, "PDU_Assoc::send_PDU failed, cs == 0");
464 *pq = new PDU_Assoc_priv::PDU_Queue(buf, len);
468 yaz_log(m_p->log, "PDU_Assoc::cannot send_PDU fd=%d",
// Create a COMSTACK for type_and_host by delegating to cs_create_host; vp is
// forwarded unchanged (callers pass the address of their ap variable).
// NOTE(review): the literal 2 is the second argument of cs_create_host - its
// meaning is defined by the YAZ COMSTACK API, not by this file.
473 COMSTACK PDU_Assoc_priv::comstack(const char *type_and_host, void **vp)
475 return cs_create_host(type_and_host, 2, vp);
// Start listening on addr on behalf of observer.
// First tears down existing state (the same statements as in shutdown()), then
// creates a comstack for addr, applies the stored certificate file name, binds
// as a server, sets FD_CLOEXEC on the descriptor, registers for READ and EXCEPT
// events and enters the Listen state.
// NOTE(review): error returns and several conditions are on lines not visible
// here, so the meaning of the int result cannot be stated from this view.
478 int PDU_Assoc::listen(IPDU_Observer *observer, const char *addr)
482 m_p->m_socketObservable->deleteObserver(this);
483 m_p->state = PDU_Assoc_priv::Closed;
486 yaz_log(m_p->log, "PDU_Assoc::close fd=%d", cs_fileno(m_p->cs));
// Drop everything still waiting in the output queue.
490 while (m_p->queue_out)
492 PDU_Assoc_priv::PDU_Queue *q_this = m_p->queue_out;
493 m_p->queue_out = m_p->queue_out->m_next;
496 xfree(m_p->input_buf);
505 m_PDU_Observer = observer;
507 m_p->cs = m_p->comstack(addr, &ap);
// cert_fname is the value stored by set_cert_fname().
513 cs_set_ssl_certificate_file(m_p->cs, m_p->cert_fname);
515 if (cs_bind(m_p->cs, ap, CS_SERVER) < 0)
518 int fd = cs_fileno(m_p->cs);
// Add FD_CLOEXEC to the descriptor flags of the listening socket.
520 int oldflags = fcntl(fd, F_GETFD, 0);
523 oldflags |= FD_CLOEXEC;
524 fcntl(fd, F_SETFD, oldflags);
527 m_p->m_socketObservable->addObserver(this);
528 yaz_log(m_p->log, "maskObserver 9");
529 m_p->m_socketObservable->maskObserver(this, SOCKET_OBSERVE_READ|
530 SOCKET_OBSERVE_EXCEPT,
532 yaz_log(m_p->log, "PDU_Assoc::listen ok fd=%d", fd);
533 m_p->state = PDU_Assoc_priv::Listen;
// Accessor returning a COMSTACK.
// NOTE(review): the body is not visible in this view - presumably it returns
// m_p->cs; confirm against the full file.
537 COMSTACK PDU_Assoc::get_comstack()
// Store the idle timeout in the private state and pass it to the socket
// observable via timeoutObserver for this observer.
// NOTE(review): the unit of idleTime is not established by this file.
542 void PDU_Assoc::idleTime(int idleTime)
544 m_p->idleTime = idleTime;
545 yaz_log(m_p->log, "PDU_Assoc::idleTime(%d)", idleTime);
546 m_p->m_socketObservable->timeoutObserver(this, m_p->idleTime);
// Start a connection to addr on behalf of observer: creates a comstack, calls
// cs_connect and registers this object with the socket observable. Three
// outcomes are handled - connect complete, connect still pending (both leave
// the association in the Connecting state) and immediate failure.
// NOTE(review): the tests on res and the return statements are on lines not
// visible here.
549 int PDU_Assoc::connect(IPDU_Observer *observer, const char *addr)
551 yaz_log(m_p->log, "PDU_Assoc::connect %s", addr);
553 m_PDU_Observer = observer;
555 m_p->cs = m_p->comstack(addr, &ap);
558 int res = cs_connect(m_p->cs, ap);
559 yaz_log(m_p->log, "PDU_Assoc::connect fd=%d res=%d", cs_fileno(m_p->cs),
561 m_p->m_socketObservable->addObserver(this);
564 { // Connect complete
// The state is still Connecting here; the switch to Ready and connectNotify()
// happen in socketNotify's Connecting case.
565 m_p->state = PDU_Assoc_priv::Connecting;
566 unsigned mask = SOCKET_OBSERVE_EXCEPT;
567 mask |= SOCKET_OBSERVE_WRITE;
568 mask |= SOCKET_OBSERVE_READ;
569 yaz_log(m_p->log, "maskObserver 11");
570 m_p->m_socketObservable->maskObserver(this, mask, cs_fileno(m_p->cs));
// Second branch: same state, but the mask follows the comstack's pending I/O.
574 m_p->state = PDU_Assoc_priv::Connecting;
575 unsigned mask = SOCKET_OBSERVE_EXCEPT;
576 if (m_p->cs->io_pending & CS_WANT_WRITE)
577 mask |= SOCKET_OBSERVE_WRITE;
578 if (m_p->cs->io_pending & CS_WANT_READ)
579 mask |= SOCKET_OBSERVE_READ;
// NOTE(review): this log label repeats "maskObserver 11" from the branch above,
// so the two call sites cannot be told apart in the log.
580 yaz_log(m_p->log, "maskObserver 11");
581 m_p->m_socketObservable->maskObserver(this, mask, cs_fileno(m_p->cs));
584 { // Connect failed immediately
585 // Since m_state is Closed we can distinguish this case from
586 // normal connect in socketNotify handler
587 yaz_log(m_p->log, "maskObserver 12");
588 m_p->m_socketObservable->maskObserver(this, SOCKET_OBSERVE_WRITE|
589 SOCKET_OBSERVE_EXCEPT,
595 // Single-threaded... Only useful for non-blocking handlers
// Handle a newly accepted connection: wrap cs in a new PDU_Assoc that shares
// this object's socket observable, ask the current observer for a session
// observer via sessionNotify, and either discard the new association (no
// observer returned) or link it in as a child of this one.
// NOTE(review): the control flow after the delete is on lines not visible here.
596 void PDU_Assoc::childNotify(COMSTACK cs)
598 PDU_Assoc *new_observable =
599 new PDU_Assoc(m_p->m_socketObservable, cs);
601 // Clone PDU Observer
602 new_observable->m_PDU_Observer = m_PDU_Observer->sessionNotify
603 (new_observable, cs_fileno(cs));
// No observer for the session: shut the new association down and free it.
605 if (!new_observable->m_PDU_Observer)
607 new_observable->shutdown();
608 delete new_observable;
// Insert at the head of this object's child list and record the parent.
611 new_observable->m_p->pdu_next = m_p->pdu_children;
612 m_p->pdu_children = new_observable;
613 new_observable->m_p->pdu_parent = this;
// Return the address string that cs_addrstr reports for this association's comstack.
// NOTE(review): any guard for a missing comstack is on lines not visible here.
616 const char*PDU_Assoc::getpeername()
620 return cs_addrstr(m_p->cs);
// Replace the stored certificate file name: the previous string is freed and a
// copy of fname (xstrdup) is kept; listen() passes it to
// cs_set_ssl_certificate_file.
// NOTE(review): statements between the free and the copy are on lines not
// visible here.
623 void PDU_Assoc::set_cert_fname(const char *fname)
625 xfree(m_p->cert_fname);
628 m_p->cert_fname = xstrdup(fname);
634 * c-file-style: "Stroustrup"
635 * indent-tabs-mode: nil
637 * vim: shiftwidth=4 tabstop=8 expandtab