Updates for yaz/ylog.h
[yazpp-moved-to-github.git] / src / yaz-pdu-assoc.cpp
index 203ba34..d888cef 100644 (file)
@@ -1,13 +1,13 @@
 /*
- * Copyright (c) 1998-2001, Index Data.
+ * Copyright (c) 1998-2003, Index Data.
  * See the file LICENSE for details.
  * 
- * $Id: yaz-pdu-assoc.cpp,v 1.28 2002-10-09 12:50:26 adam Exp $
+ * $Id: yaz-pdu-assoc.cpp,v 1.38 2004-11-30 21:10:31 adam Exp $
  */
 
 #include <assert.h>
 #include <string.h>
-#include <yaz/log.h>
+#include <yaz/ylog.h>
 #include <yaz/tcpip.h>
 
 #include <yaz++/pdu-assoc.h>
@@ -20,6 +20,7 @@ void Yaz_PDU_Assoc::init(IYazSocketObservable *socketObservable)
     m_socketObservable = socketObservable;
     m_PDU_Observer = 0;
     m_queue_out = 0;
+    m_queue_in = 0;
     m_input_buf = 0;
     m_input_len = 0;
     m_children = 0;
@@ -27,7 +28,7 @@ void Yaz_PDU_Assoc::init(IYazSocketObservable *socketObservable)
     m_next = 0;
     m_destroyed = 0;
     m_idleTime = 0;
-    m_log = LOG_DEBUG;
+    m_log = YLOG_DEBUG;
 }
 
 Yaz_PDU_Assoc::Yaz_PDU_Assoc(IYazSocketObservable *socketObservable)
@@ -58,6 +59,7 @@ Yaz_PDU_Assoc::Yaz_PDU_Assoc(IYazSocketObservable *socketObservable,
        // assume comstack is accepting...
        m_state = Accepting;
        m_socketObservable->addObserver(cs_fileno(cs), this);
+       yaz_log(m_log, "maskObserver 1");
        m_socketObservable->maskObserver(this,
                                         mask |YAZ_SOCKET_OBSERVE_EXCEPT);
     }
@@ -109,6 +111,7 @@ void Yaz_PDU_Assoc::socketNotify(int event)
            }
            else  
            {   // accept still incomplete.
+               yaz_log(m_log, "maskObserver 2");
                m_socketObservable->maskObserver(this,
                                             mask|YAZ_SOCKET_OBSERVE_EXCEPT);
            }
@@ -133,6 +136,7 @@ void Yaz_PDU_Assoc::socketNotify(int event)
                    mask |= YAZ_SOCKET_OBSERVE_WRITE;
                if (m_cs->io_pending & CS_WANT_READ)
                    mask |= YAZ_SOCKET_OBSERVE_READ;
+               yaz_log(m_log, "maskObserver 3");
                m_socketObservable->maskObserver(this, mask);
            }
            else
@@ -154,7 +158,7 @@ void Yaz_PDU_Assoc::socketNotify(int event)
                return;
            if (res < 0)
            {
-               yaz_log(LOG_FATAL|LOG_ERRNO, "cs_listen failed");
+               yaz_log(YLOG_FATAL|YLOG_ERRNO, "cs_listen failed");
                return;
            }
            if (!(new_line = cs_accept(m_cs)))
@@ -187,6 +191,7 @@ void Yaz_PDU_Assoc::socketNotify(int event)
                         mask |= YAZ_SOCKET_OBSERVE_WRITE;
                     if (m_cs->io_pending & CS_WANT_READ)
                         mask |= YAZ_SOCKET_OBSERVE_READ;
+                   yaz_log(m_log, "maskObserver 4");
                    m_socketObservable->maskObserver(this, mask);
                    return;
                 }
@@ -204,16 +209,26 @@ void Yaz_PDU_Assoc::socketNotify(int event)
 
                if (!m_PDU_Observer)
                    return;
-
+#if 0
+               PDU_Queue **pq = &m_queue_in;
+               while (*pq)
+                   pq = &(*pq)->m_next;
+               
+               *pq = new PDU_Queue(m_input_buf, res);
+#else
                m_PDU_Observer->recv_PDU(m_input_buf, res);
-                m_destroyed = 0;
+#endif
                if (destroyed)   // it really was destroyed, return now.
                    return;
+                m_destroyed = 0;
            } while (m_cs && cs_more (m_cs));
-           if (m_cs)
+           if (m_cs && m_state == Ready)
+            {
+               yaz_log(m_log, "maskObserver 5");
                m_socketObservable->maskObserver(this,
                                                 YAZ_SOCKET_OBSERVE_EXCEPT|
                                                 YAZ_SOCKET_OBSERVE_READ);
+           }
        }
        break;
     case Closed:
@@ -311,6 +326,7 @@ int Yaz_PDU_Assoc::flush_PDU()
     {
        m_state = Ready;
        yaz_log (m_log, "YAZ_PDU_Assoc::flush_PDU queue empty");
+       yaz_log(m_log, "maskObserver 6");
        m_socketObservable->maskObserver(this, YAZ_SOCKET_OBSERVE_READ|
                                         YAZ_SOCKET_OBSERVE_WRITE|
                                         YAZ_SOCKET_OBSERVE_EXCEPT);
@@ -332,21 +348,26 @@ int Yaz_PDU_Assoc::flush_PDU()
             mask |= YAZ_SOCKET_OBSERVE_WRITE;
         if (m_cs->io_pending & CS_WANT_READ)
             mask |= YAZ_SOCKET_OBSERVE_READ;
+
+       mask |= YAZ_SOCKET_OBSERVE_WRITE;
+       yaz_log(m_log, "maskObserver 7");
        m_socketObservable->maskObserver(this, mask);
-        yaz_log (m_log, "Yaz_PDU_Assoc::flush_PDU cs_put %d bytes (incomp)",
-                q->m_len);
+        yaz_log (m_log, "Yaz_PDU_Assoc::flush_PDU cs_put %d bytes fd=%d (inc)",
+                q->m_len, cs_fileno(m_cs));
         return r;
     } 
-    m_state = Ready;
     yaz_log (m_log, "Yaz_PDU_Assoc::flush_PDU cs_put %d bytes", q->m_len);
     // whole packet sent... delete this and proceed to next ...
     m_queue_out = q->m_next;
     delete q;
     // don't select on write if queue is empty ...
     if (!m_queue_out)
+    {
+        m_state = Ready;
+       yaz_log(m_log, "maskObserver 8");
        m_socketObservable->maskObserver(this, YAZ_SOCKET_OBSERVE_READ|
                                         YAZ_SOCKET_OBSERVE_EXCEPT);
+    }
     return r;
 }
 
@@ -374,29 +395,29 @@ int Yaz_PDU_Assoc::send_PDU(const char *buf, int len)
 
 COMSTACK Yaz_PDU_Assoc::comstack(const char *type_and_host, void **vp)
 {
-    return cs_create_host(type_and_host, 0, vp);
+    return cs_create_host(type_and_host, 2, vp);
 }
 
-void Yaz_PDU_Assoc::listen(IYaz_PDU_Observer *observer,
-                          const char *addr)
+int Yaz_PDU_Assoc::listen(IYaz_PDU_Observer *observer,
+                         const char *addr)
 {
     close();
 
-    yaz_log (LOG_LOG, "Adding listener %s", addr);
-
     m_PDU_Observer = observer;
     void *ap;
     m_cs = comstack(addr, &ap);
 
     if (!m_cs)
-        return;
+        return -1;
     if (cs_bind(m_cs, ap, CS_SERVER) < 0)
-        return;
+        return -2;
     m_socketObservable->addObserver(cs_fileno(m_cs), this);
+    yaz_log(m_log, "maskObserver 9");
     m_socketObservable->maskObserver(this, YAZ_SOCKET_OBSERVE_READ|
                                     YAZ_SOCKET_OBSERVE_EXCEPT);
     yaz_log (m_log, "Yaz_PDU_Assoc::listen ok fd=%d", cs_fileno(m_cs));
     m_state = Listen;
+    return 0;
 }
 
 void Yaz_PDU_Assoc::idleTime(int idleTime)
@@ -406,7 +427,7 @@ void Yaz_PDU_Assoc::idleTime(int idleTime)
     m_socketObservable->timeoutObserver(this, m_idleTime);
 }
 
-void Yaz_PDU_Assoc::connect(IYaz_PDU_Observer *observer,
+int Yaz_PDU_Assoc::connect(IYaz_PDU_Observer *observer,
                            const char *addr)
 {
     yaz_log (m_log, "Yaz_PDU_Assoc::connect %s", addr);
@@ -414,28 +435,42 @@ void Yaz_PDU_Assoc::connect(IYaz_PDU_Observer *observer,
     m_PDU_Observer = observer;
     void *ap;
     m_cs = comstack(addr, &ap);
+    if (!m_cs)
+       return -1;
     int res = cs_connect (m_cs, ap);
     yaz_log (m_log, "Yaz_PDU_Assoc::connect fd=%d res=%d", cs_fileno(m_cs),
             res);
     m_socketObservable->addObserver(cs_fileno(m_cs), this);
 
-    if (res >= 0)
-    {   // Connect pending or complete
+    if (res == 0)
+    {   // Connect complete
+       m_state = Connecting;
+       unsigned mask = YAZ_SOCKET_OBSERVE_EXCEPT;
+       mask |= YAZ_SOCKET_OBSERVE_WRITE;
+       mask |= YAZ_SOCKET_OBSERVE_READ;
+       yaz_log(m_log, "maskObserver 11");
+       m_socketObservable->maskObserver(this, mask);
+    }
+    else if (res > 0)
+    {   // Connect pending
        m_state = Connecting;
        unsigned mask = YAZ_SOCKET_OBSERVE_EXCEPT;
        if (m_cs->io_pending & CS_WANT_WRITE)
            mask |= YAZ_SOCKET_OBSERVE_WRITE;
        if (m_cs->io_pending & CS_WANT_READ)
            mask |= YAZ_SOCKET_OBSERVE_READ;
+       yaz_log(m_log, "maskObserver 11");
        m_socketObservable->maskObserver(this, mask);
     }
     else
     {   // Connect failed immediately
        // Since m_state is Closed we can distinguish this case from
         // normal connect in socketNotify handler
+       yaz_log(m_log, "maskObserver 12");
        m_socketObservable->maskObserver(this, YAZ_SOCKET_OBSERVE_WRITE|
                                         YAZ_SOCKET_OBSERVE_EXCEPT);
     }
+    return 0;
 }
 
 // Single-threaded... Only useful for non-blocking handlers
@@ -452,3 +487,8 @@ void Yaz_PDU_Assoc::childNotify(COMSTACK cs)
     new_observable->m_PDU_Observer = m_PDU_Observer->sessionNotify
        (new_observable, cs_fileno(cs));
 }
+
+const char*Yaz_PDU_Assoc::getpeername()
+{
+    return cs_addrstr(m_cs);
+}