From e1563004f70ed8be4549db0ba3a20de902f98d4c Mon Sep 17 00:00:00 2001 From: Adam Dickmeiss Date: Fri, 9 Nov 2012 13:22:49 +0100 Subject: [PATCH] Pimpl PDU_Assoc class --- include/yazpp/pdu-assoc.h | 42 +---- src/yaz-pdu-assoc.cpp | 423 +++++++++++++++++++++++++-------------------- 2 files changed, 242 insertions(+), 223 deletions(-) diff --git a/include/yazpp/pdu-assoc.h b/include/yazpp/pdu-assoc.h index ac86329..eee460a 100644 --- a/include/yazpp/pdu-assoc.h +++ b/include/yazpp/pdu-assoc.h @@ -33,6 +33,8 @@ #include namespace yazpp_1 { + class PDU_Assoc_priv; + /** Simple Protocol Data Unit Assocation. This object sends - and receives PDU's using the COMSTACK network utility. To use the association in client role, use @@ -41,46 +43,18 @@ namespace yazpp_1 { */ class YAZ_EXPORT PDU_Assoc : public IPDU_Observable, yazpp_1::ISocketObserver { friend class PDU_AssocThread; - private: - enum { - Connecting, - Listen, - Ready, - Closed, - Writing, - Accepting - } m_state; - class PDU_Queue { - public: - PDU_Queue(const char *buf, int len); - ~PDU_Queue(); - char *m_buf; - int m_len; - PDU_Queue *m_next; - }; - PDU_Assoc *m_parent; - PDU_Assoc *m_children; - PDU_Assoc *m_next; - COMSTACK m_cs; - yazpp_1::ISocketObservable *m_socketObservable; + PDU_Assoc_priv *m_p; IPDU_Observer *m_PDU_Observer; - char *m_input_buf; - int m_input_len; - PDU_Queue *m_queue_out; - PDU_Queue *m_queue_in; + int flush_PDU(); - int *m_destroyed; - int m_idleTime; - int m_log; - void init(yazpp_1::ISocketObservable *socketObservable); - bool m_session_is_dead; - public: COMSTACK comstack(const char *type_and_host, void **vp); + public: /// Create object using specified socketObservable PDU_Assoc(yazpp_1::ISocketObservable *socketObservable); /// Create Object using existing comstack PDU_Assoc(yazpp_1::ISocketObservable *socketObservable, - COMSTACK cs); + COMSTACK cs); + virtual ~PDU_Assoc(); /// Close socket and destroy object. /// virtual ~PDU_Assoc(); /// Clone the object @@ -98,7 +72,7 @@ class YAZ_EXPORT PDU_Assoc : public IPDU_Observable, yazpp_1::ISocketObserver { /// Close and destroy void destroy(); /// Set Idle Time - void idleTime (int timeout); + void idleTime(int timeout); /// Child start... virtual void childNotify(COMSTACK cs); /// close session diff --git a/src/yaz-pdu-assoc.cpp b/src/yaz-pdu-assoc.cpp index 9af1ab7..2957f25 100644 --- a/src/yaz-pdu-assoc.cpp +++ b/src/yaz-pdu-assoc.cpp @@ -19,70 +19,114 @@ using namespace yazpp_1; -void PDU_Assoc::init(ISocketObservable *socketObservable) +namespace yazpp_1 { + class PDU_Assoc_priv { + public: + enum { + Connecting, + Listen, + Ready, + Closed, + Writing, + Accepting + } state; + class PDU_Queue { + public: + PDU_Queue(const char *buf, int len); + ~PDU_Queue(); + char *m_buf; + int m_len; + PDU_Queue *m_next; + }; + PDU_Assoc *pdu_parent; + PDU_Assoc *pdu_children; + PDU_Assoc *pdu_next; + COMSTACK cs; + yazpp_1::ISocketObservable *m_socketObservable; + char *input_buf; + int input_len; + PDU_Queue *queue_out; + PDU_Queue *queue_in; + int *destroyed; + int idleTime; + int log; + void init(yazpp_1::ISocketObservable *socketObservable); + bool m_session_is_dead; + }; +} + +void PDU_Assoc_priv::init(ISocketObservable *socketObservable) { - m_state = Closed; - m_cs = 0; + state = Closed; + cs = 0; m_socketObservable = socketObservable; - m_PDU_Observer = 0; - m_queue_out = 0; - m_queue_in = 0; - m_input_buf = 0; - m_input_len = 0; - m_children = 0; - m_parent = 0; - m_next = 0; - m_destroyed = 0; - m_idleTime = 0; - m_log = YLOG_DEBUG; + queue_out = 0; + queue_in = 0; + input_buf = 0; + input_len = 0; + pdu_children = 0; + pdu_parent = 0; + pdu_next = 0; + destroyed = 0; + idleTime = 0; + log = YLOG_DEBUG; m_session_is_dead = false; } +PDU_Assoc::~PDU_Assoc() +{ + delete m_p; +} + PDU_Assoc::PDU_Assoc(ISocketObservable *socketObservable) { - init (socketObservable); + m_PDU_Observer = 0; + m_p = new PDU_Assoc_priv; + m_p->init(socketObservable); } PDU_Assoc::PDU_Assoc(ISocketObservable *socketObservable, COMSTACK cs) { - init(socketObservable); - m_cs = cs; + m_PDU_Observer = 0; + m_p = new PDU_Assoc_priv; + m_p->init(socketObservable); + m_p->cs = cs; unsigned mask = 0; if (cs->io_pending & CS_WANT_WRITE) mask |= SOCKET_OBSERVE_WRITE; if (cs->io_pending & CS_WANT_READ) mask |= SOCKET_OBSERVE_READ; - m_socketObservable->addObserver(cs_fileno(cs), this); + m_p->m_socketObservable->addObserver(cs_fileno(cs), this); if (!mask) { - yaz_log (m_log, "new PDU_Assoc. Ready"); - m_state = Ready; + yaz_log(m_p->log, "new PDU_Assoc. Ready"); + m_p->state = PDU_Assoc_priv::Ready; flush_PDU(); } else { - yaz_log (m_log, "new PDU_Assoc. Accepting"); + yaz_log(m_p->log, "new PDU_Assoc. Accepting"); // assume comstack is accepting... - m_state = Accepting; - m_socketObservable->addObserver(cs_fileno(cs), this); - yaz_log(m_log, "maskObserver 1"); - m_socketObservable->maskObserver(this, - mask |SOCKET_OBSERVE_EXCEPT); + m_p->state = PDU_Assoc_priv::Accepting; + m_p->m_socketObservable->addObserver(cs_fileno(cs), this); + yaz_log(m_p->log, "maskObserver 1"); + m_p->m_socketObservable->maskObserver(this, + mask|SOCKET_OBSERVE_EXCEPT); } } IPDU_Observable *PDU_Assoc::clone() { - PDU_Assoc *copy = new PDU_Assoc(m_socketObservable); + PDU_Assoc *copy = new PDU_Assoc(m_p->m_socketObservable); return copy; } void PDU_Assoc::socketNotify(int event) { - yaz_log (m_log, "PDU_Assoc::socketNotify p=%p state=%d event = %d", - this, m_state, event); + yaz_log(m_p->log, "PDU_Assoc::socketNotify p=%p state=%d event = %d", + this, m_p->state, event); if (event & SOCKET_OBSERVE_EXCEPT) { shutdown(); @@ -94,37 +138,37 @@ void PDU_Assoc::socketNotify(int event) m_PDU_Observer->timeoutNotify(); return; } - switch (m_state) + switch (m_p->state) { - case Accepting: - if (!cs_accept (m_cs)) + case PDU_Assoc_priv::Accepting: + if (!cs_accept(m_p->cs)) { - yaz_log (m_log, "PDU_Assoc::cs_accept failed"); - m_cs = 0; + yaz_log(m_p->log, "PDU_Assoc::cs_accept failed"); + m_p->cs = 0; shutdown(); m_PDU_Observer->failNotify(); } else { unsigned mask = 0; - if (m_cs->io_pending & CS_WANT_WRITE) + if (m_p->cs->io_pending & CS_WANT_WRITE) mask |= SOCKET_OBSERVE_WRITE; - if (m_cs->io_pending & CS_WANT_READ) + if (m_p->cs->io_pending & CS_WANT_READ) mask |= SOCKET_OBSERVE_READ; if (!mask) { // accept is complete. turn to ready state and write if needed - m_state = Ready; + m_p->state = PDU_Assoc_priv::Ready; flush_PDU(); } else { // accept still incomplete. - yaz_log(m_log, "maskObserver 2"); - m_socketObservable->maskObserver(this, + yaz_log(m_p->log, "maskObserver 2"); + m_p->m_socketObservable->maskObserver(this, mask|SOCKET_OBSERVE_EXCEPT); } } break; - case Connecting: + case PDU_Assoc_priv::Connecting: if (event & SOCKET_OBSERVE_READ && event & SOCKET_OBSERVE_WRITE) { @@ -134,41 +178,41 @@ void PDU_Assoc::socketNotify(int event) } else { - yaz_log (m_log, "cs_rcvconnect"); - int res = cs_rcvconnect (m_cs); + yaz_log(m_p->log, "cs_rcvconnect"); + int res = cs_rcvconnect(m_p->cs); if (res == 1) { unsigned mask = SOCKET_OBSERVE_EXCEPT; - if (m_cs->io_pending & CS_WANT_WRITE) + if (m_p->cs->io_pending & CS_WANT_WRITE) mask |= SOCKET_OBSERVE_WRITE; - if (m_cs->io_pending & CS_WANT_READ) + if (m_p->cs->io_pending & CS_WANT_READ) mask |= SOCKET_OBSERVE_READ; - yaz_log(m_log, "maskObserver 3"); - m_socketObservable->maskObserver(this, mask); + yaz_log(m_p->log, "maskObserver 3"); + m_p->m_socketObservable->maskObserver(this, mask); } else { - m_state = Ready; + m_p->state = PDU_Assoc_priv::Ready; if (m_PDU_Observer) m_PDU_Observer->connectNotify(); flush_PDU(); } } break; - case Listen: + case PDU_Assoc_priv::Listen: if (event & SOCKET_OBSERVE_READ) { int res; COMSTACK new_line; - if ((res = cs_listen(m_cs, 0, 0)) == 1) + if ((res = cs_listen(m_p->cs, 0, 0)) == 1) return; if (res < 0) { yaz_log(YLOG_FATAL|YLOG_ERRNO, "cs_listen failed"); return; } - if (!(new_line = cs_accept(m_cs))) + if (!(new_line = cs_accept(m_p->cs))) return; /* 1. create socket-manager 2. create pdu-assoc @@ -176,35 +220,35 @@ void PDU_Assoc::socketNotify(int event) setup observer for child fileid in pdu-assoc 4. start thread */ - yaz_log (m_log, "new session: parent fd=%d child fd=%d", - cs_fileno(m_cs), cs_fileno(new_line)); - childNotify (new_line); + yaz_log(m_p->log, "new session: parent fd=%d child fd=%d", + cs_fileno(m_p->cs), cs_fileno(new_line)); + childNotify(new_line); } break; - case Writing: + case PDU_Assoc_priv::Writing: if (event & (SOCKET_OBSERVE_READ|SOCKET_OBSERVE_WRITE)) flush_PDU(); break; - case Ready: + case PDU_Assoc_priv::Ready: if (event & (SOCKET_OBSERVE_READ|SOCKET_OBSERVE_WRITE)) { do { - int res = cs_get (m_cs, &m_input_buf, &m_input_len); + int res = cs_get(m_p->cs, &m_p->input_buf, &m_p->input_len); if (res == 1) { unsigned mask = SOCKET_OBSERVE_EXCEPT; - if (m_cs->io_pending & CS_WANT_WRITE) + if (m_p->cs->io_pending & CS_WANT_WRITE) mask |= SOCKET_OBSERVE_WRITE; - if (m_cs->io_pending & CS_WANT_READ) + if (m_p->cs->io_pending & CS_WANT_READ) mask |= SOCKET_OBSERVE_READ; - yaz_log(m_log, "maskObserver 4"); - m_socketObservable->maskObserver(this, mask); + yaz_log(m_p->log, "maskObserver 4"); + m_p->m_socketObservable->maskObserver(this, mask); return; } else if (res <= 0) { - yaz_log (m_log, "PDU_Assoc::Connection closed by peer"); + yaz_log(m_p->log, "PDU_Assoc::Connection closed by peer"); shutdown(); if (m_PDU_Observer) m_PDU_Observer->failNotify(); // problem here.. @@ -212,39 +256,40 @@ void PDU_Assoc::socketNotify(int event) } // lock it, so we know if recv_PDU deletes it. int destroyed = 0; - m_destroyed = &destroyed; + m_p->destroyed = &destroyed; if (!m_PDU_Observer) return; #if 0 - PDU_Queue **pq = &m_queue_in; + PDU_Assoc_priv::PDU_Queue **pq = &m_p->m_queue_in; while (*pq) pq = &(*pq)->m_next; - *pq = new PDU_Queue(m_input_buf, res); + *pq = new PDU_Assoc_priv::PDU_Queue(m_p->m_input_buf, res); #else - m_PDU_Observer->recv_PDU(m_input_buf, res); + m_PDU_Observer->recv_PDU(m_p->input_buf, res); #endif if (destroyed) // it really was destroyed, return now. return; - m_destroyed = 0; - } while (m_cs && cs_more (m_cs)); - if (m_cs && m_state == Ready) + m_p->destroyed = 0; + } while (m_p->cs && cs_more(m_p->cs)); + if (m_p->cs && m_p->state == PDU_Assoc_priv::Ready) { - yaz_log(m_log, "maskObserver 5"); - m_socketObservable->maskObserver(this, - SOCKET_OBSERVE_EXCEPT| - SOCKET_OBSERVE_READ); + yaz_log(m_p->log, "maskObserver 5"); + m_p->m_socketObservable->maskObserver(this, + SOCKET_OBSERVE_EXCEPT| + SOCKET_OBSERVE_READ); } } break; - case Closed: - yaz_log (m_log, "CLOSING state=%d event was %d", m_state, event); + case PDU_Assoc_priv::Closed: + yaz_log(m_p->log, "CLOSING state=%d event was %d", m_p->state, + event); shutdown(); m_PDU_Observer->failNotify(); break; default: - yaz_log (m_log, "Unknown state=%d event was %d", m_state, event); + yaz_log(m_p->log, "Unknown state=%d event was %d", m_p->state, event); shutdown(); m_PDU_Observer->failNotify(); } @@ -252,8 +297,8 @@ void PDU_Assoc::socketNotify(int event) void PDU_Assoc::close_session() { - m_session_is_dead = true; - if (!m_queue_out) + m_p->m_session_is_dead = true; + if (!m_p->queue_out) { shutdown(); m_PDU_Observer->failNotify(); @@ -263,101 +308,101 @@ void PDU_Assoc::close_session() void PDU_Assoc::shutdown() { PDU_Assoc *ch; - for (ch = m_children; ch; ch = ch->m_next) + for (ch = m_p->pdu_children; ch; ch = ch->m_p->pdu_next) ch->shutdown(); - m_socketObservable->deleteObserver(this); - m_state = Closed; - if (m_cs) + m_p->m_socketObservable->deleteObserver(this); + m_p->state = PDU_Assoc_priv::Closed; + if (m_p->cs) { - yaz_log (m_log, "PDU_Assoc::close fd=%d", cs_fileno(m_cs)); - cs_close (m_cs); + yaz_log(m_p->log, "PDU_Assoc::close fd=%d", cs_fileno(m_p->cs)); + cs_close(m_p->cs); } - m_cs = 0; - while (m_queue_out) + m_p->cs = 0; + while (m_p->queue_out) { - PDU_Queue *q_this = m_queue_out; - m_queue_out = m_queue_out->m_next; + PDU_Assoc_priv::PDU_Queue *q_this = m_p->queue_out; + m_p->queue_out = m_p->queue_out->m_next; delete q_this; } - xfree (m_input_buf); - m_input_buf = 0; - m_input_len = 0; + xfree(m_p->input_buf); + m_p->input_buf = 0; + m_p->input_len = 0; } void PDU_Assoc::destroy() { shutdown(); - if (m_destroyed) - *m_destroyed = 1; + if (m_p->destroyed) + *m_p->destroyed = 1; PDU_Assoc **c; // delete from parent's child list (if any) - if (m_parent) + if (m_p->pdu_parent) { - c = &m_parent->m_children; + c = &m_p->pdu_parent->m_p->pdu_children; while (*c != this) { assert (*c); - c = &(*c)->m_next; + c = &(*c)->m_p->pdu_next; } - *c = (*c)->m_next; + *c = (*c)->m_p->pdu_next; } // delete all children ... - c = &m_children; + c = &m_p->pdu_children; while (*c) { PDU_Assoc *here = *c; - *c = (*c)->m_next; - here->m_parent = 0; + *c = (*c)->m_p->pdu_next; + here->m_p->pdu_parent = 0; delete here; } - yaz_log (m_log, "PDU_Assoc::destroy this=%p", this); + yaz_log(m_p->log, "PDU_Assoc::destroy this=%p", this); } -PDU_Assoc::PDU_Queue::PDU_Queue(const char *buf, int len) +PDU_Assoc_priv::PDU_Queue::PDU_Queue(const char *buf, int len) { - m_buf = (char *) xmalloc (len); - memcpy (m_buf, buf, len); + m_buf = (char *) xmalloc(len); + memcpy(m_buf, buf, len); m_len = len; m_next = 0; } -PDU_Assoc::PDU_Queue::~PDU_Queue() +PDU_Assoc_priv::PDU_Queue::~PDU_Queue() { - xfree (m_buf); + xfree(m_buf); } int PDU_Assoc::flush_PDU() { int r; - if (m_state != Ready && m_state != Writing) + if (m_p->state != PDU_Assoc_priv::Ready && m_p->state != PDU_Assoc_priv::Writing) { - yaz_log (m_log, "YAZ_PDU_Assoc::flush_PDU, not ready"); + yaz_log(m_p->log, "YAZ_PDU_Assoc::flush_PDU, not ready"); return 1; } - PDU_Queue *q = m_queue_out; + PDU_Assoc_priv::PDU_Queue *q = m_p->queue_out; if (!q) { - m_state = Ready; - yaz_log (m_log, "YAZ_PDU_Assoc::flush_PDU queue empty"); - yaz_log(m_log, "maskObserver 6"); - m_socketObservable->maskObserver(this, SOCKET_OBSERVE_READ| - SOCKET_OBSERVE_WRITE| - SOCKET_OBSERVE_EXCEPT); - if (m_session_is_dead) + m_p->state = PDU_Assoc_priv::Ready; + yaz_log(m_p->log, "YAZ_PDU_Assoc::flush_PDU queue empty"); + yaz_log(m_p->log, "maskObserver 6"); + m_p->m_socketObservable->maskObserver(this, SOCKET_OBSERVE_READ| + SOCKET_OBSERVE_WRITE| + SOCKET_OBSERVE_EXCEPT); + if (m_p->m_session_is_dead) { shutdown(); m_PDU_Observer->failNotify(); } return 0; } - r = cs_put (m_cs, q->m_buf, q->m_len); + r = cs_put(m_p->cs, q->m_buf, q->m_len); if (r < 0) { - yaz_log (m_log, "PDU_Assoc::flush_PDU cs_put failed"); + yaz_log(m_p->log, "PDU_Assoc::flush_PDU cs_put failed"); shutdown(); m_PDU_Observer->failNotify(); return r; @@ -365,31 +410,31 @@ int PDU_Assoc::flush_PDU() if (r == 1) { unsigned mask = SOCKET_OBSERVE_EXCEPT; - m_state = Writing; - if (m_cs->io_pending & CS_WANT_WRITE) + m_p->state = PDU_Assoc_priv::Writing; + if (m_p->cs->io_pending & CS_WANT_WRITE) mask |= SOCKET_OBSERVE_WRITE; - if (m_cs->io_pending & CS_WANT_READ) + if (m_p->cs->io_pending & CS_WANT_READ) mask |= SOCKET_OBSERVE_READ; mask |= SOCKET_OBSERVE_WRITE; - yaz_log(m_log, "maskObserver 7"); - m_socketObservable->maskObserver(this, mask); - yaz_log (m_log, "PDU_Assoc::flush_PDU cs_put %d bytes fd=%d (inc)", - q->m_len, cs_fileno(m_cs)); + yaz_log(m_p->log, "maskObserver 7"); + m_p->m_socketObservable->maskObserver(this, mask); + yaz_log(m_p->log, "PDU_Assoc::flush_PDU cs_put %d bytes fd=%d (inc)", + q->m_len, cs_fileno(m_p->cs)); return r; } - yaz_log (m_log, "PDU_Assoc::flush_PDU cs_put %d bytes", q->m_len); + yaz_log(m_p->log, "PDU_Assoc::flush_PDU cs_put %d bytes", q->m_len); // whole packet sent... delete this and proceed to next ... - m_queue_out = q->m_next; + m_p->queue_out = q->m_next; delete q; // don't select on write if queue is empty ... - if (!m_queue_out) + if (!m_p->queue_out) { - m_state = Ready; - yaz_log(m_log, "maskObserver 8"); - m_socketObservable->maskObserver(this, SOCKET_OBSERVE_READ| - SOCKET_OBSERVE_EXCEPT); - if (m_session_is_dead) + m_p->state = PDU_Assoc_priv::Ready; + yaz_log(m_p->log, "maskObserver 8"); + m_p->m_socketObservable->maskObserver(this, SOCKET_OBSERVE_READ| + SOCKET_OBSERVE_EXCEPT); + if (m_p->m_session_is_dead) shutdown(); } return r; @@ -397,23 +442,23 @@ int PDU_Assoc::flush_PDU() int PDU_Assoc::send_PDU(const char *buf, int len) { - yaz_log (m_log, "PDU_Assoc::send_PDU"); - PDU_Queue **pq = &m_queue_out; + yaz_log(m_p->log, "PDU_Assoc::send_PDU"); + PDU_Assoc_priv::PDU_Queue **pq = &m_p->queue_out; int is_idle = (*pq ? 0 : 1); - if (!m_cs) + if (!m_p->cs) { - yaz_log (m_log, "PDU_Assoc::send_PDU failed, m_cs == 0"); + yaz_log(m_p->log, "PDU_Assoc::send_PDU failed, cs == 0"); return -1; } while (*pq) pq = &(*pq)->m_next; - *pq = new PDU_Queue(buf, len); + *pq = new PDU_Assoc_priv::PDU_Queue(buf, len); if (is_idle) - return flush_PDU (); + return flush_PDU(); else - yaz_log (m_log, "PDU_Assoc::cannot send_PDU fd=%d", - cs_fileno(m_cs)); + yaz_log(m_p->log, "PDU_Assoc::cannot send_PDU fd=%d", + cs_fileno(m_p->cs)); return 0; } @@ -426,23 +471,23 @@ int PDU_Assoc::listen(IPDU_Observer *observer, const char *addr) { if (*addr == '\0') { - m_socketObservable->deleteObserver(this); - m_state = Closed; - if (m_cs) + m_p->m_socketObservable->deleteObserver(this); + m_p->state = PDU_Assoc_priv::Closed; + if (m_p->cs) { - yaz_log (m_log, "PDU_Assoc::close fd=%d", cs_fileno(m_cs)); - cs_close (m_cs); + yaz_log(m_p->log, "PDU_Assoc::close fd=%d", cs_fileno(m_p->cs)); + cs_close(m_p->cs); } - m_cs = 0; - while (m_queue_out) + m_p->cs = 0; + while (m_p->queue_out) { - PDU_Queue *q_this = m_queue_out; - m_queue_out = m_queue_out->m_next; + PDU_Assoc_priv::PDU_Queue *q_this = m_p->queue_out; + m_p->queue_out = m_p->queue_out->m_next; delete q_this; } - xfree (m_input_buf); - m_input_buf = 0; - m_input_len = 0; + xfree(m_p->input_buf); + m_p->input_buf = 0; + m_p->input_len = 0; return 0; } @@ -451,14 +496,14 @@ int PDU_Assoc::listen(IPDU_Observer *observer, const char *addr) m_PDU_Observer = observer; void *ap; - m_cs = comstack(addr, &ap); + m_p->cs = comstack(addr, &ap); - if (!m_cs) + if (!m_p->cs) return -1; - if (cs_bind(m_cs, ap, CS_SERVER) < 0) + if (cs_bind(m_p->cs, ap, CS_SERVER) < 0) return -2; - int fd = cs_fileno(m_cs); + int fd = cs_fileno(m_p->cs); #if HAVE_FCNTL_H int oldflags = fcntl(fd, F_GETFD, 0); if (oldflags >= 0) @@ -467,63 +512,63 @@ int PDU_Assoc::listen(IPDU_Observer *observer, const char *addr) fcntl(fd, F_SETFD, oldflags); } #endif - m_socketObservable->addObserver(fd, this); - yaz_log(m_log, "maskObserver 9"); - m_socketObservable->maskObserver(this, SOCKET_OBSERVE_READ| - SOCKET_OBSERVE_EXCEPT); - yaz_log (m_log, "PDU_Assoc::listen ok fd=%d", fd); - m_state = Listen; + m_p->m_socketObservable->addObserver(fd, this); + yaz_log(m_p->log, "maskObserver 9"); + m_p->m_socketObservable->maskObserver(this, SOCKET_OBSERVE_READ| + SOCKET_OBSERVE_EXCEPT); + yaz_log(m_p->log, "PDU_Assoc::listen ok fd=%d", fd); + m_p->state = PDU_Assoc_priv::Listen; return 0; } void PDU_Assoc::idleTime(int idleTime) { - m_idleTime = idleTime; - yaz_log (m_log, "PDU_Assoc::idleTime(%d)", idleTime); - m_socketObservable->timeoutObserver(this, m_idleTime); + m_p->idleTime = idleTime; + yaz_log(m_p->log, "PDU_Assoc::idleTime(%d)", idleTime); + m_p->m_socketObservable->timeoutObserver(this, m_p->idleTime); } int PDU_Assoc::connect(IPDU_Observer *observer, const char *addr) { - yaz_log (m_log, "PDU_Assoc::connect %s", addr); + yaz_log(m_p->log, "PDU_Assoc::connect %s", addr); shutdown(); m_PDU_Observer = observer; void *ap; - m_cs = comstack(addr, &ap); - if (!m_cs) + m_p->cs = comstack(addr, &ap); + if (!m_p->cs) return -1; - int res = cs_connect (m_cs, ap); - yaz_log (m_log, "PDU_Assoc::connect fd=%d res=%d", cs_fileno(m_cs), - res); - m_socketObservable->addObserver(cs_fileno(m_cs), this); + int res = cs_connect(m_p->cs, ap); + yaz_log(m_p->log, "PDU_Assoc::connect fd=%d res=%d", cs_fileno(m_p->cs), + res); + m_p->m_socketObservable->addObserver(cs_fileno(m_p->cs), this); if (res == 0) { // Connect complete - m_state = Connecting; + m_p->state = PDU_Assoc_priv::Connecting; unsigned mask = SOCKET_OBSERVE_EXCEPT; mask |= SOCKET_OBSERVE_WRITE; mask |= SOCKET_OBSERVE_READ; - yaz_log(m_log, "maskObserver 11"); - m_socketObservable->maskObserver(this, mask); + yaz_log(m_p->log, "maskObserver 11"); + m_p->m_socketObservable->maskObserver(this, mask); } else if (res > 0) { // Connect pending - m_state = Connecting; + m_p->state = PDU_Assoc_priv::Connecting; unsigned mask = SOCKET_OBSERVE_EXCEPT; - if (m_cs->io_pending & CS_WANT_WRITE) + if (m_p->cs->io_pending & CS_WANT_WRITE) mask |= SOCKET_OBSERVE_WRITE; - if (m_cs->io_pending & CS_WANT_READ) + if (m_p->cs->io_pending & CS_WANT_READ) mask |= SOCKET_OBSERVE_READ; - yaz_log(m_log, "maskObserver 11"); - m_socketObservable->maskObserver(this, mask); + yaz_log(m_p->log, "maskObserver 11"); + m_p->m_socketObservable->maskObserver(this, mask); } else { // Connect failed immediately // Since m_state is Closed we can distinguish this case from // normal connect in socketNotify handler - yaz_log(m_log, "maskObserver 12"); - m_socketObservable->maskObserver(this, SOCKET_OBSERVE_WRITE| - SOCKET_OBSERVE_EXCEPT); + yaz_log(m_p->log, "maskObserver 12"); + m_p->m_socketObservable->maskObserver(this, SOCKET_OBSERVE_WRITE| + SOCKET_OBSERVE_EXCEPT); } return 0; } @@ -532,7 +577,7 @@ int PDU_Assoc::connect(IPDU_Observer *observer, const char *addr) void PDU_Assoc::childNotify(COMSTACK cs) { PDU_Assoc *new_observable = - new PDU_Assoc (m_socketObservable, cs); + new PDU_Assoc(m_p->m_socketObservable, cs); // Clone PDU Observer new_observable->m_PDU_Observer = m_PDU_Observer->sessionNotify @@ -544,16 +589,16 @@ void PDU_Assoc::childNotify(COMSTACK cs) delete new_observable; return; } - new_observable->m_next = m_children; - m_children = new_observable; - new_observable->m_parent = this; + new_observable->m_p->pdu_next = m_p->pdu_children; + m_p->pdu_children = new_observable; + new_observable->m_p->pdu_parent = this; } const char*PDU_Assoc::getpeername() { - if (!m_cs) + if (!m_p->cs) return 0; - return cs_addrstr(m_cs); + return cs_addrstr(m_p->cs); } /* * Local variables: -- 1.7.10.4