Log request number. More configurable keepalive with pdu/bw limits.
[yazpp-moved-to-github.git] / src / yaz-pdu-assoc.cpp
index 17dc1b0..c6b1668 100644 (file)
@@ -2,86 +2,7 @@
  * Copyright (c) 1998-2001, Index Data.
  * See the file LICENSE for details.
  * 
- * $Log: yaz-pdu-assoc.cpp,v $
- * Revision 1.24  2001-08-13 16:39:12  adam
- * PDU_Assoc keeps track of children. Using yaz_log instead of logf.
- *
- * Revision 1.23  2001/03/26 14:43:49  adam
- * New threaded PDU association.
- *
- * Revision 1.22  2001/01/29 11:18:24  adam
- * Server sets OPTIONS search and present.
- *
- * Revision 1.21  2000/11/20 14:17:36  adam
- * Yet another WIN32 fix for connect notify.
- *
- * Revision 1.20  2000/11/20 11:27:33  adam
- * Fixes for connect operation (timeout and notify fix).
- *
- * Revision 1.19  2000/11/01 14:22:59  adam
- * Added fd parameter for method IYaz_PDU_Observer::clone.
- *
- * Revision 1.18  2000/10/24 12:29:57  adam
- * Fixed bug in proxy where a Yaz_ProxyClient could be owned by
- * two Yaz_Proxy's (fatal).
- *
- * Revision 1.17  2000/10/11 11:58:16  adam
- * Moved header files to include/yaz++. Switched to libtool and automake.
- * Configure script creates yaz++-config script.
- *
- * Revision 1.16  2000/09/22 09:54:11  heikki
- * minor
- *
- * Revision 1.15  2000/09/21 21:43:20  adam
- * Better high-level server API.
- *
- * Revision 1.14  2000/09/12 12:09:53  adam
- * More work on high-level server.
- *
- * Revision 1.13  2000/09/08 10:23:42  adam
- * Added skeleton of yaz-z-server.
- *
- * Revision 1.12  2000/09/06 14:23:45  adam
- * WIN32 updates.
- *
- * Revision 1.11  2000/09/04 08:29:22  adam
- * Fixed memory leak(s). Added re-use of associations, rather than
- * re-init, when maximum number of targets are in use.
- *
- * Revision 1.10  2000/08/10 08:42:42  adam
- * Fixes for {set,get}_APDU_log.
- *
- * Revision 1.9  1999/12/06 13:52:45  adam
- * Modified for new location of YAZ header files. Experimental threaded
- * operation.
- *
- * Revision 1.8  1999/04/28 13:04:03  adam
- * Fixed setting of proxy otherInfo so that database(s) are removed.
- *
- * Revision 1.7  1999/04/21 12:09:01  adam
- * Many improvements. Modified to proxy server to work with "sessions"
- * based on cookies.
- *
- * Revision 1.6  1999/04/20 10:30:05  adam
- * Implemented various stuff for client and proxy. Updated calls
- * to ODR to reflect new name parameter.
- *g
- * Revision 1.5  1999/04/09 11:46:57  adam
- * Added object Yaz_Z_Assoc. Much more functional client.
- *
- * Revision 1.4  1999/03/23 14:17:57  adam
- * More work on timeout handling. Work on yaz-client.
- *
- * Revision 1.3  1999/02/02 14:01:20  adam
- * First WIN32 port of YAZ++.
- *
- * Revision 1.2  1999/01/28 13:08:44  adam
- * Yaz_PDU_Assoc better encapsulated. Memory leak fix in
- * yaz-socket-manager.cc.
- *
- * Revision 1.1.1.1  1999/01/28 09:41:07  adam
- * First implementation of YAZ++.
- *
+ * $Id: yaz-pdu-assoc.cpp,v 1.31 2003-10-09 12:11:10 adam Exp $
  */
 
 #include <assert.h>
@@ -89,7 +10,7 @@
 #include <yaz/log.h>
 #include <yaz/tcpip.h>
 
-#include <yaz++/yaz-pdu-assoc.h>
+#include <yaz++/pdu-assoc.h>
 
 
 void Yaz_PDU_Assoc::init(IYazSocketObservable *socketObservable)
@@ -137,6 +58,7 @@ Yaz_PDU_Assoc::Yaz_PDU_Assoc(IYazSocketObservable *socketObservable,
        // assume comstack is accepting...
        m_state = Accepting;
        m_socketObservable->addObserver(cs_fileno(cs), this);
+       yaz_log(m_log, "maskObserver 1");
        m_socketObservable->maskObserver(this,
                                         mask |YAZ_SOCKET_OBSERVE_EXCEPT);
     }
@@ -188,6 +110,7 @@ void Yaz_PDU_Assoc::socketNotify(int event)
            }
            else  
            {   // accept still incomplete.
+               yaz_log(m_log, "maskObserver 2");
                m_socketObservable->maskObserver(this,
                                             mask|YAZ_SOCKET_OBSERVE_EXCEPT);
            }
@@ -203,8 +126,8 @@ void Yaz_PDU_Assoc::socketNotify(int event)
        }
        else
        {
-           yaz_log (m_log, "cs_connect again");
-           int res = cs_connect (m_cs, 0);
+           yaz_log (m_log, "cs_rcvconnect");
+           int res = cs_rcvconnect (m_cs);
            if (res == 1)
            {
                unsigned mask = YAZ_SOCKET_OBSERVE_EXCEPT;
@@ -212,6 +135,7 @@ void Yaz_PDU_Assoc::socketNotify(int event)
                    mask |= YAZ_SOCKET_OBSERVE_WRITE;
                if (m_cs->io_pending & CS_WANT_READ)
                    mask |= YAZ_SOCKET_OBSERVE_READ;
+               yaz_log(m_log, "maskObserver 3");
                m_socketObservable->maskObserver(this, mask);
            }
            else
@@ -266,6 +190,7 @@ void Yaz_PDU_Assoc::socketNotify(int event)
                         mask |= YAZ_SOCKET_OBSERVE_WRITE;
                     if (m_cs->io_pending & CS_WANT_READ)
                         mask |= YAZ_SOCKET_OBSERVE_READ;
+                   yaz_log(m_log, "maskObserver 4");
                    m_socketObservable->maskObserver(this, mask);
                    return;
                 }
@@ -289,10 +214,13 @@ void Yaz_PDU_Assoc::socketNotify(int event)
                if (destroyed)   // it really was destroyed, return now.
                    return;
            } while (m_cs && cs_more (m_cs));
-           if (m_cs)
+           if (m_cs && m_state == Ready)
+            {
+               yaz_log(m_log, "maskObserver 5");
                m_socketObservable->maskObserver(this,
                                                 YAZ_SOCKET_OBSERVE_EXCEPT|
                                                 YAZ_SOCKET_OBSERVE_READ);
+           }
        }
        break;
     case Closed:
@@ -365,7 +293,7 @@ void Yaz_PDU_Assoc::destroy()
 
 Yaz_PDU_Assoc::PDU_Queue::PDU_Queue(const char *buf, int len)
 {
-    m_buf = (char *) malloc (len);
+    m_buf = (char *) xmalloc (len);
     memcpy (m_buf, buf, len);
     m_len = len;
     m_next = 0;
@@ -373,7 +301,7 @@ Yaz_PDU_Assoc::PDU_Queue::PDU_Queue(const char *buf, int len)
 
 Yaz_PDU_Assoc::PDU_Queue::~PDU_Queue()
 {
-    free (m_buf);
+    xfree (m_buf);
 }
 
 int Yaz_PDU_Assoc::flush_PDU()
@@ -390,6 +318,7 @@ int Yaz_PDU_Assoc::flush_PDU()
     {
        m_state = Ready;
        yaz_log (m_log, "YAZ_PDU_Assoc::flush_PDU queue empty");
+       yaz_log(m_log, "maskObserver 6");
        m_socketObservable->maskObserver(this, YAZ_SOCKET_OBSERVE_READ|
                                         YAZ_SOCKET_OBSERVE_WRITE|
                                         YAZ_SOCKET_OBSERVE_EXCEPT);
@@ -411,21 +340,26 @@ int Yaz_PDU_Assoc::flush_PDU()
             mask |= YAZ_SOCKET_OBSERVE_WRITE;
         if (m_cs->io_pending & CS_WANT_READ)
             mask |= YAZ_SOCKET_OBSERVE_READ;
+
+       mask |= YAZ_SOCKET_OBSERVE_WRITE;
+       yaz_log(m_log, "maskObserver 7");
        m_socketObservable->maskObserver(this, mask);
-        yaz_log (m_log, "Yaz_PDU_Assoc::flush_PDU cs_put %d bytes (incomp)",
-                q->m_len);
+        yaz_log (m_log, "Yaz_PDU_Assoc::flush_PDU cs_put %d bytes fd=%d (inc)",
+                q->m_len, cs_fileno(m_cs));
         return r;
     } 
-    m_state = Ready;
     yaz_log (m_log, "Yaz_PDU_Assoc::flush_PDU cs_put %d bytes", q->m_len);
     // whole packet sent... delete this and proceed to next ...
     m_queue_out = q->m_next;
     delete q;
     // don't select on write if queue is empty ...
     if (!m_queue_out)
+    {
+        m_state = Ready;
+       yaz_log(m_log, "maskObserver 8");
        m_socketObservable->maskObserver(this, YAZ_SOCKET_OBSERVE_READ|
                                         YAZ_SOCKET_OBSERVE_EXCEPT);
+    }
     return r;
 }
 
@@ -453,7 +387,7 @@ int Yaz_PDU_Assoc::send_PDU(const char *buf, int len)
 
 COMSTACK Yaz_PDU_Assoc::comstack(const char *type_and_host, void **vp)
 {
-    return cs_create_host(type_and_host, 0, vp);
+    return cs_create_host(type_and_host, 2, vp);
 }
 
 void Yaz_PDU_Assoc::listen(IYaz_PDU_Observer *observer,
@@ -472,6 +406,7 @@ void Yaz_PDU_Assoc::listen(IYaz_PDU_Observer *observer,
     if (cs_bind(m_cs, ap, CS_SERVER) < 0)
         return;
     m_socketObservable->addObserver(cs_fileno(m_cs), this);
+    yaz_log(m_log, "maskObserver 9");
     m_socketObservable->maskObserver(this, YAZ_SOCKET_OBSERVE_READ|
                                     YAZ_SOCKET_OBSERVE_EXCEPT);
     yaz_log (m_log, "Yaz_PDU_Assoc::listen ok fd=%d", cs_fileno(m_cs));
@@ -498,20 +433,31 @@ void Yaz_PDU_Assoc::connect(IYaz_PDU_Observer *observer,
             res);
     m_socketObservable->addObserver(cs_fileno(m_cs), this);
 
-    if (res >= 0)
-    {   // Connect pending or complet
+    if (res == 0)
+    {   // Connect complete
+       m_state = Connecting;
+       unsigned mask = YAZ_SOCKET_OBSERVE_EXCEPT;
+       mask |= YAZ_SOCKET_OBSERVE_WRITE;
+       mask |= YAZ_SOCKET_OBSERVE_READ;
+       yaz_log(m_log, "maskObserver 11");
+       m_socketObservable->maskObserver(this, mask);
+    }
+    else if (res > 0)
+    {   // Connect pending
        m_state = Connecting;
        unsigned mask = YAZ_SOCKET_OBSERVE_EXCEPT;
        if (m_cs->io_pending & CS_WANT_WRITE)
            mask |= YAZ_SOCKET_OBSERVE_WRITE;
        if (m_cs->io_pending & CS_WANT_READ)
            mask |= YAZ_SOCKET_OBSERVE_READ;
+       yaz_log(m_log, "maskObserver 11");
        m_socketObservable->maskObserver(this, mask);
     }
     else
     {   // Connect failed immediately
        // Since m_state is Closed we can distinguish this case from
         // normal connect in socketNotify handler
+       yaz_log(m_log, "maskObserver 12");
        m_socketObservable->maskObserver(this, YAZ_SOCKET_OBSERVE_WRITE|
                                         YAZ_SOCKET_OBSERVE_EXCEPT);
     }
@@ -520,8 +466,6 @@ void Yaz_PDU_Assoc::connect(IYaz_PDU_Observer *observer,
 // Single-threaded... Only useful for non-blocking handlers
 void Yaz_PDU_Assoc::childNotify(COMSTACK cs)
 {
-
     Yaz_PDU_Assoc *new_observable =
        new Yaz_PDU_Assoc (m_socketObservable, cs);
     
@@ -533,3 +477,8 @@ void Yaz_PDU_Assoc::childNotify(COMSTACK cs)
     new_observable->m_PDU_Observer = m_PDU_Observer->sessionNotify
        (new_observable, cs_fileno(cs));
 }
+
+const char*Yaz_PDU_Assoc::getpeername()
+{
+    return cs_addrstr(m_cs);
+}