X-Git-Url: http://git.indexdata.com/?a=blobdiff_plain;f=src%2Fyaz-pdu-assoc.cpp;h=62e024796fa68551e202a6c13b7cec513098354c;hb=d3d51bee2100324025f7530d4e3ff3502397fd83;hp=203ba34024dc922aa22082a9eff74b0e86eb0e93;hpb=ceb226cb18d96a6b3aa2dd6cff94aa27850362cf;p=yazpp-moved-to-github.git diff --git a/src/yaz-pdu-assoc.cpp b/src/yaz-pdu-assoc.cpp index 203ba34..62e0247 100644 --- a/src/yaz-pdu-assoc.cpp +++ b/src/yaz-pdu-assoc.cpp @@ -1,8 +1,8 @@ /* - * Copyright (c) 1998-2001, Index Data. + * Copyright (c) 1998-2003, Index Data. * See the file LICENSE for details. * - * $Id: yaz-pdu-assoc.cpp,v 1.28 2002-10-09 12:50:26 adam Exp $ + * $Id: yaz-pdu-assoc.cpp,v 1.37 2003-10-23 11:45:08 adam Exp $ */ #include @@ -20,6 +20,7 @@ void Yaz_PDU_Assoc::init(IYazSocketObservable *socketObservable) m_socketObservable = socketObservable; m_PDU_Observer = 0; m_queue_out = 0; + m_queue_in = 0; m_input_buf = 0; m_input_len = 0; m_children = 0; @@ -58,6 +59,7 @@ Yaz_PDU_Assoc::Yaz_PDU_Assoc(IYazSocketObservable *socketObservable, // assume comstack is accepting... m_state = Accepting; m_socketObservable->addObserver(cs_fileno(cs), this); + yaz_log(m_log, "maskObserver 1"); m_socketObservable->maskObserver(this, mask |YAZ_SOCKET_OBSERVE_EXCEPT); } @@ -109,6 +111,7 @@ void Yaz_PDU_Assoc::socketNotify(int event) } else { // accept still incomplete. + yaz_log(m_log, "maskObserver 2"); m_socketObservable->maskObserver(this, mask|YAZ_SOCKET_OBSERVE_EXCEPT); } @@ -133,6 +136,7 @@ void Yaz_PDU_Assoc::socketNotify(int event) mask |= YAZ_SOCKET_OBSERVE_WRITE; if (m_cs->io_pending & CS_WANT_READ) mask |= YAZ_SOCKET_OBSERVE_READ; + yaz_log(m_log, "maskObserver 3"); m_socketObservable->maskObserver(this, mask); } else @@ -187,6 +191,7 @@ void Yaz_PDU_Assoc::socketNotify(int event) mask |= YAZ_SOCKET_OBSERVE_WRITE; if (m_cs->io_pending & CS_WANT_READ) mask |= YAZ_SOCKET_OBSERVE_READ; + yaz_log(m_log, "maskObserver 4"); m_socketObservable->maskObserver(this, mask); return; } @@ -204,16 +209,26 @@ void Yaz_PDU_Assoc::socketNotify(int event) if (!m_PDU_Observer) return; - +#if 0 + PDU_Queue **pq = &m_queue_in; + while (*pq) + pq = &(*pq)->m_next; + + *pq = new PDU_Queue(m_input_buf, res); +#else m_PDU_Observer->recv_PDU(m_input_buf, res); - m_destroyed = 0; +#endif if (destroyed) // it really was destroyed, return now. return; + m_destroyed = 0; } while (m_cs && cs_more (m_cs)); - if (m_cs) + if (m_cs && m_state == Ready) + { + yaz_log(m_log, "maskObserver 5"); m_socketObservable->maskObserver(this, YAZ_SOCKET_OBSERVE_EXCEPT| YAZ_SOCKET_OBSERVE_READ); + } } break; case Closed: @@ -311,6 +326,7 @@ int Yaz_PDU_Assoc::flush_PDU() { m_state = Ready; yaz_log (m_log, "YAZ_PDU_Assoc::flush_PDU queue empty"); + yaz_log(m_log, "maskObserver 6"); m_socketObservable->maskObserver(this, YAZ_SOCKET_OBSERVE_READ| YAZ_SOCKET_OBSERVE_WRITE| YAZ_SOCKET_OBSERVE_EXCEPT); @@ -332,21 +348,26 @@ int Yaz_PDU_Assoc::flush_PDU() mask |= YAZ_SOCKET_OBSERVE_WRITE; if (m_cs->io_pending & CS_WANT_READ) mask |= YAZ_SOCKET_OBSERVE_READ; - + + mask |= YAZ_SOCKET_OBSERVE_WRITE; + yaz_log(m_log, "maskObserver 7"); m_socketObservable->maskObserver(this, mask); - yaz_log (m_log, "Yaz_PDU_Assoc::flush_PDU cs_put %d bytes (incomp)", - q->m_len); + yaz_log (m_log, "Yaz_PDU_Assoc::flush_PDU cs_put %d bytes fd=%d (inc)", + q->m_len, cs_fileno(m_cs)); return r; } - m_state = Ready; yaz_log (m_log, "Yaz_PDU_Assoc::flush_PDU cs_put %d bytes", q->m_len); // whole packet sent... delete this and proceed to next ... m_queue_out = q->m_next; delete q; // don't select on write if queue is empty ... if (!m_queue_out) + { + m_state = Ready; + yaz_log(m_log, "maskObserver 8"); m_socketObservable->maskObserver(this, YAZ_SOCKET_OBSERVE_READ| YAZ_SOCKET_OBSERVE_EXCEPT); + } return r; } @@ -374,29 +395,29 @@ int Yaz_PDU_Assoc::send_PDU(const char *buf, int len) COMSTACK Yaz_PDU_Assoc::comstack(const char *type_and_host, void **vp) { - return cs_create_host(type_and_host, 0, vp); + return cs_create_host(type_and_host, 2, vp); } -void Yaz_PDU_Assoc::listen(IYaz_PDU_Observer *observer, - const char *addr) +int Yaz_PDU_Assoc::listen(IYaz_PDU_Observer *observer, + const char *addr) { close(); - yaz_log (LOG_LOG, "Adding listener %s", addr); - m_PDU_Observer = observer; void *ap; m_cs = comstack(addr, &ap); if (!m_cs) - return; + return -1; if (cs_bind(m_cs, ap, CS_SERVER) < 0) - return; + return -2; m_socketObservable->addObserver(cs_fileno(m_cs), this); + yaz_log(m_log, "maskObserver 9"); m_socketObservable->maskObserver(this, YAZ_SOCKET_OBSERVE_READ| YAZ_SOCKET_OBSERVE_EXCEPT); yaz_log (m_log, "Yaz_PDU_Assoc::listen ok fd=%d", cs_fileno(m_cs)); m_state = Listen; + return 0; } void Yaz_PDU_Assoc::idleTime(int idleTime) @@ -406,7 +427,7 @@ void Yaz_PDU_Assoc::idleTime(int idleTime) m_socketObservable->timeoutObserver(this, m_idleTime); } -void Yaz_PDU_Assoc::connect(IYaz_PDU_Observer *observer, +int Yaz_PDU_Assoc::connect(IYaz_PDU_Observer *observer, const char *addr) { yaz_log (m_log, "Yaz_PDU_Assoc::connect %s", addr); @@ -414,28 +435,42 @@ void Yaz_PDU_Assoc::connect(IYaz_PDU_Observer *observer, m_PDU_Observer = observer; void *ap; m_cs = comstack(addr, &ap); + if (!m_cs) + return -1; int res = cs_connect (m_cs, ap); yaz_log (m_log, "Yaz_PDU_Assoc::connect fd=%d res=%d", cs_fileno(m_cs), res); m_socketObservable->addObserver(cs_fileno(m_cs), this); - if (res >= 0) - { // Connect pending or complete + if (res == 0) + { // Connect complete + m_state = Connecting; + unsigned mask = YAZ_SOCKET_OBSERVE_EXCEPT; + mask |= YAZ_SOCKET_OBSERVE_WRITE; + mask |= YAZ_SOCKET_OBSERVE_READ; + yaz_log(m_log, "maskObserver 11"); + m_socketObservable->maskObserver(this, mask); + } + else if (res > 0) + { // Connect pending m_state = Connecting; unsigned mask = YAZ_SOCKET_OBSERVE_EXCEPT; if (m_cs->io_pending & CS_WANT_WRITE) mask |= YAZ_SOCKET_OBSERVE_WRITE; if (m_cs->io_pending & CS_WANT_READ) mask |= YAZ_SOCKET_OBSERVE_READ; + yaz_log(m_log, "maskObserver 11"); m_socketObservable->maskObserver(this, mask); } else { // Connect failed immediately // Since m_state is Closed we can distinguish this case from // normal connect in socketNotify handler + yaz_log(m_log, "maskObserver 12"); m_socketObservable->maskObserver(this, YAZ_SOCKET_OBSERVE_WRITE| YAZ_SOCKET_OBSERVE_EXCEPT); } + return 0; } // Single-threaded... Only useful for non-blocking handlers @@ -452,3 +487,8 @@ void Yaz_PDU_Assoc::childNotify(COMSTACK cs) new_observable->m_PDU_Observer = m_PDU_Observer->sessionNotify (new_observable, cs_fileno(cs)); } + +const char*Yaz_PDU_Assoc::getpeername() +{ + return cs_addrstr(m_cs); +}