X-Git-Url: http://git.indexdata.com/?a=blobdiff_plain;f=src%2Fyaz-pdu-assoc.cpp;h=17dc1b0c37dc9c276cc3ecf9f4e404d3dd7dc94f;hb=e55896f9cb43034c3e8743b51ef79936f5404bc3;hp=566f9b0bf251efea1e679f1f333e8e06b272be0a;hpb=bd71f8812ca0f38438733efc89ecce1f49dae9e7;p=yazpp-moved-to-github.git diff --git a/src/yaz-pdu-assoc.cpp b/src/yaz-pdu-assoc.cpp index 566f9b0..17dc1b0 100644 --- a/src/yaz-pdu-assoc.cpp +++ b/src/yaz-pdu-assoc.cpp @@ -3,7 +3,10 @@ * See the file LICENSE for details. * * $Log: yaz-pdu-assoc.cpp,v $ - * Revision 1.23 2001-03-26 14:43:49 adam + * Revision 1.24 2001-08-13 16:39:12 adam + * PDU_Assoc keeps track of children. Using yaz_log instead of logf. + * + * Revision 1.23 2001/03/26 14:43:49 adam * New threaded PDU association. * * Revision 1.22 2001/01/29 11:18:24 adam @@ -214,7 +217,8 @@ void Yaz_PDU_Assoc::socketNotify(int event) else { m_state = Ready; - m_PDU_Observer->connectNotify(); + if (m_PDU_Observer) + m_PDU_Observer->connectNotify(); flush_PDU(); } } @@ -240,6 +244,8 @@ void Yaz_PDU_Assoc::socketNotify(int event) setup observer for child fileid in pdu-assoc 4. start thread */ + yaz_log (m_log, "new session: parent fd=%d child fd=%d", + cs_fileno(m_cs), cs_fileno(new_line)); childNotify (new_line); } break; @@ -267,13 +273,17 @@ void Yaz_PDU_Assoc::socketNotify(int event) { yaz_log (m_log, "Yaz_PDU_Assoc::Connection closed by peer"); close(); - m_PDU_Observer->failNotify(); // problem here.. + if (m_PDU_Observer) + m_PDU_Observer->failNotify(); // problem here.. return; } // lock it, so we know if recv_PDU deletes it. int destroyed = 0; m_destroyed = &destroyed; + if (!m_PDU_Observer) + return; + m_PDU_Observer->recv_PDU(m_input_buf, res); m_destroyed = 0; if (destroyed) // it really was destroyed, return now. @@ -286,12 +296,12 @@ void Yaz_PDU_Assoc::socketNotify(int event) } break; case Closed: - logf (m_log, "CLOSING state=%d event was %d", m_state, event); + yaz_log (m_log, "CLOSING state=%d event was %d", m_state, event); close(); m_PDU_Observer->failNotify(); break; default: - logf (m_log, "Unknown state=%d event was %d", m_state, event); + yaz_log (m_log, "Unknown state=%d event was %d", m_state, event); close(); m_PDU_Observer->failNotify(); } @@ -299,11 +309,15 @@ void Yaz_PDU_Assoc::socketNotify(int event) void Yaz_PDU_Assoc::close() { + Yaz_PDU_Assoc *ch; + for (ch = m_children; ch; ch = ch->m_next) + ch->close(); + m_socketObservable->deleteObserver(this); m_state = Closed; if (m_cs) { - logf (m_log, "Yaz_PDU_Assoc::close fd=%d", cs_fileno(m_cs)); + yaz_log (m_log, "Yaz_PDU_Assoc::close fd=%d", cs_fileno(m_cs)); cs_close (m_cs); } m_cs = 0; @@ -321,6 +335,7 @@ void Yaz_PDU_Assoc::close() void Yaz_PDU_Assoc::destroy() { close(); + if (m_destroyed) *m_destroyed = 1; Yaz_PDU_Assoc **c; @@ -345,6 +360,7 @@ void Yaz_PDU_Assoc::destroy() here->m_parent = 0; delete here; } + yaz_log (m_log, "Yaz_PDU_Assoc::destroy this=%p", this); } Yaz_PDU_Assoc::PDU_Queue::PDU_Queue(const char *buf, int len) @@ -366,14 +382,14 @@ int Yaz_PDU_Assoc::flush_PDU() if (m_state != Ready && m_state != Writing) { - logf (m_log, "YAZ_PDU_Assoc::flush_PDU, not ready"); + yaz_log (m_log, "YAZ_PDU_Assoc::flush_PDU, not ready"); return 1; } PDU_Queue *q = m_queue_out; if (!q) { m_state = Ready; - logf (m_log, "YAZ_PDU_Assoc::flush_PDU queue empty"); + yaz_log (m_log, "YAZ_PDU_Assoc::flush_PDU queue empty"); m_socketObservable->maskObserver(this, YAZ_SOCKET_OBSERVE_READ| YAZ_SOCKET_OBSERVE_WRITE| YAZ_SOCKET_OBSERVE_EXCEPT); @@ -382,7 +398,7 @@ int Yaz_PDU_Assoc::flush_PDU() r = cs_put (m_cs, q->m_buf, q->m_len); if (r < 0) { - logf (m_log, "Yaz_PDU_Assoc::flush_PDU cs_put failed"); + yaz_log (m_log, "Yaz_PDU_Assoc::flush_PDU cs_put failed"); close(); m_PDU_Observer->failNotify(); return r; @@ -397,12 +413,12 @@ int Yaz_PDU_Assoc::flush_PDU() mask |= YAZ_SOCKET_OBSERVE_READ; m_socketObservable->maskObserver(this, mask); - logf (m_log, "Yaz_PDU_Assoc::flush_PDU cs_put %d bytes (incomplete)", - q->m_len); + yaz_log (m_log, "Yaz_PDU_Assoc::flush_PDU cs_put %d bytes (incomp)", + q->m_len); return r; } m_state = Ready; - logf (m_log, "Yaz_PDU_Assoc::flush_PDU cs_put %d bytes", q->m_len); + yaz_log (m_log, "Yaz_PDU_Assoc::flush_PDU cs_put %d bytes", q->m_len); // whole packet sent... delete this and proceed to next ... m_queue_out = q->m_next; delete q; @@ -415,13 +431,13 @@ int Yaz_PDU_Assoc::flush_PDU() int Yaz_PDU_Assoc::send_PDU(const char *buf, int len) { - logf (m_log, "Yaz_PDU_Assoc::send_PDU"); + yaz_log (m_log, "Yaz_PDU_Assoc::send_PDU"); PDU_Queue **pq = &m_queue_out; int is_idle = (*pq ? 0 : 1); if (!m_cs) { - logf (m_log, "Yaz_PDU_Assoc::send_PDU failed, m_cs == 0"); + yaz_log (m_log, "Yaz_PDU_Assoc::send_PDU failed, m_cs == 0"); return -1; } while (*pq) @@ -430,8 +446,8 @@ int Yaz_PDU_Assoc::send_PDU(const char *buf, int len) if (is_idle) return flush_PDU (); else - logf (m_log, "Yaz_PDU_Assoc::cannot send_PDU fd=%d", - cs_fileno(m_cs)); + yaz_log (m_log, "Yaz_PDU_Assoc::cannot send_PDU fd=%d", + cs_fileno(m_cs)); return 0; } @@ -445,7 +461,7 @@ void Yaz_PDU_Assoc::listen(IYaz_PDU_Observer *observer, { close(); - logf (LOG_LOG, "Adding listener %s", addr); + yaz_log (LOG_LOG, "Adding listener %s", addr); m_PDU_Observer = observer; void *ap; @@ -458,27 +474,28 @@ void Yaz_PDU_Assoc::listen(IYaz_PDU_Observer *observer, m_socketObservable->addObserver(cs_fileno(m_cs), this); m_socketObservable->maskObserver(this, YAZ_SOCKET_OBSERVE_READ| YAZ_SOCKET_OBSERVE_EXCEPT); - logf (m_log, "Yaz_PDU_Assoc::listen ok fd=%d", cs_fileno(m_cs)); + yaz_log (m_log, "Yaz_PDU_Assoc::listen ok fd=%d", cs_fileno(m_cs)); m_state = Listen; } void Yaz_PDU_Assoc::idleTime(int idleTime) { m_idleTime = idleTime; - logf (m_log, "Yaz_PDU_Assoc::idleTime(%d)", idleTime); + yaz_log (m_log, "Yaz_PDU_Assoc::idleTime(%d)", idleTime); m_socketObservable->timeoutObserver(this, m_idleTime); } void Yaz_PDU_Assoc::connect(IYaz_PDU_Observer *observer, const char *addr) { - logf (m_log, "Yaz_PDU_Assoc::connect %s", addr); + yaz_log (m_log, "Yaz_PDU_Assoc::connect %s", addr); close(); m_PDU_Observer = observer; void *ap; m_cs = comstack(addr, &ap); int res = cs_connect (m_cs, ap); - logf (m_log, "Yaz_PDU_Assoc::connect fd=%d res=%d", cs_fileno(m_cs), res); + yaz_log (m_log, "Yaz_PDU_Assoc::connect fd=%d res=%d", cs_fileno(m_cs), + res); m_socketObservable->addObserver(cs_fileno(m_cs), this); if (res >= 0) @@ -503,9 +520,15 @@ void Yaz_PDU_Assoc::connect(IYaz_PDU_Observer *observer, // Single-threaded... Only useful for non-blocking handlers void Yaz_PDU_Assoc::childNotify(COMSTACK cs) { + + Yaz_PDU_Assoc *new_observable = new Yaz_PDU_Assoc (m_socketObservable, cs); + new_observable->m_next = m_children; + m_children = new_observable; + new_observable->m_parent = this; + // Clone PDU Observer new_observable->m_PDU_Observer = m_PDU_Observer->sessionNotify (new_observable, cs_fileno(cs));