Add support for thread config element which specifies number of
[yazproxy-moved-to-github.git] / src / yaz-proxy.cpp
index 476e045..194b973 100644 (file)
@@ -1,4 +1,4 @@
-/* $Id: yaz-proxy.cpp,v 1.61 2006-04-22 07:03:34 adam Exp $
+/* $Id: yaz-proxy.cpp,v 1.68 2006-06-09 09:35:14 adam Exp $
    Copyright (c) 1998-2006, Index Data.
 
 This file is part of the yazproxy.
@@ -59,9 +59,6 @@ using namespace yazpp_1;
 #define strncasecmp _strnicmp
 #endif
 
-#define USE_AUTH_MSG 1
-
-#if USE_AUTH_MSG
 class YAZ_EXPORT Auth_Msg : public IMsg_Thread {
 public:
     int m_ret;
@@ -106,7 +103,7 @@ IMsg_Thread *Auth_Msg::handle()
 
 void Auth_Msg::result()
 {
-    if (m_proxy->dec_ref(false))
+    if (m_proxy->dec_ref())
     {
         yaz_log(YLOG_LOG, "Auth_Msg::proxy deleted meanwhile");
     }
@@ -122,8 +119,6 @@ void Auth_Msg::result()
     delete this;
 }
 
-#endif
-
 void Yaz_Proxy::result_authentication(Z_APDU *apdu, int ret)
 {
     if (apdu == 0 || ret == 0)
@@ -131,7 +126,7 @@ void Yaz_Proxy::result_authentication(Z_APDU *apdu, int ret)
         Z_APDU *apdu_reject = zget_APDU(odr_encode(), Z_APDU_initResponse);
         *apdu_reject->u.initResponse->result = 0;
         send_to_client(apdu_reject);
-        dec_ref(false);
+        dec_ref();
     }
     else
     {
@@ -295,6 +290,7 @@ Yaz_Proxy::Yaz_Proxy(IPDU_Observable *the_PDU_Observable,
     m_ref_count = 1;
     m_main_ptr_dec = false;
     m_peername = 0;
+    m_num_msg_threads = 0;
 }
 
 void Yaz_Proxy::inc_ref()
@@ -358,7 +354,8 @@ int Yaz_Proxy::set_config(const char *config)
     {
         int period = 60;
         m_config->get_generic_info(&m_log_mask, &m_max_clients,
-                                   &m_max_connect, &m_limit_connect, &period);
+                                   &m_max_connect, &m_limit_connect, &period,
+                                   &m_num_msg_threads);
         m_connect.set_period(period);
     }
     return r;
@@ -414,7 +411,7 @@ Yaz_ProxyConfig *Yaz_Proxy::check_reconfigure()
                 int period = 60;
                 cfg->get_generic_info(&m_log_mask, &m_max_clients,
                                       &m_max_connect, &m_limit_connect,
-                                      &period);
+                                      &period, &m_num_msg_threads);
                 m_connect.set_period(period);
             }
         }
@@ -432,6 +429,9 @@ IPDU_Observer *Yaz_Proxy::sessionNotify(IPDU_Observable
 
     char session_str[200];
     const char *peername = the_PDU_Observable->getpeername();
+    if (!peername)
+        peername = "nullpeer";
+
     if (m_log_mask & PROXY_LOG_IP_CLIENT)
         sprintf(session_str, "%ld:%d %.80s %d ",
                 (long) time(0), m_session_no, peername, 0);
@@ -452,9 +452,14 @@ IPDU_Observer *Yaz_Proxy::sessionNotify(IPDU_Observable
     new_proxy->set_default_target(m_default_target);
     new_proxy->m_max_clients = m_max_clients;
     new_proxy->m_log_mask = m_log_mask;
+    new_proxy->m_session_no = m_session_no;
+    new_proxy->m_num_msg_threads = m_num_msg_threads;
 
+#if 0
+    // in case we want to watch a particular client..
     if (!strcmp(peername, "tcp:163.121.19.82")) // NIS GROUP
         new_proxy->m_log_mask = 255;
+#endif
 
     new_proxy->set_APDU_log(get_APDU_log());
     if (new_proxy->m_log_mask & PROXY_LOG_APDU_CLIENT)
@@ -466,8 +471,12 @@ IPDU_Observer *Yaz_Proxy::sessionNotify(IPDU_Observable
     new_proxy->set_proxy_negotiation(m_proxy_negotiation_charset,
         m_proxy_negotiation_lang, m_proxy_negotiation_default_charset);
     // create thread object the first time we get an incoming connection
-    if (!m_my_thread)
-        m_my_thread = new Msg_Thread(m_socket_observable, 1);
+    if (!m_my_thread && m_num_msg_threads > 0)
+    {
+        yaz_log (YLOG_LOG, "%sStarting message thread management. number=%d",
+                 session_str, m_num_msg_threads);
+        m_my_thread = new Msg_Thread(m_socket_observable, m_num_msg_threads);
+    }
     new_proxy->m_my_thread = m_my_thread;
     return new_proxy;
 }
@@ -601,8 +610,14 @@ Yaz_ProxyClient *Yaz_Proxy::get_client(Z_APDU *apdu, const char *cookie,
             m_client_idletime = client_idletime;
             timeout(m_client_idletime);
         }
+
+        // get those FILE descriptors available 
+        m_parent->low_socket_close();
         if (cql2rpn_fname)
             m_cql2rpn.set_pqf_file(cql2rpn_fname);
+        // reserve them again
+        m_parent->low_socket_open();
+        
         if (negotiation_charset || negotiation_lang || default_client_query_charset)
         {
             set_proxy_negotiation(negotiation_charset,
@@ -734,7 +749,7 @@ Yaz_ProxyClient *Yaz_Proxy::get_client(Z_APDU *apdu, const char *cookie,
                 yaz_log (YLOG_LOG, "%sMAXCLIENTS %d Destroy %d",
                          m_session_str, parent->m_max_clients, c->m_seqno);
                 if (c->m_server && c->m_server != this)
-                    c->m_server->dec_ref(true);
+                    c->m_server->dec_ref();
             }
             else
             {
@@ -749,7 +764,7 @@ Yaz_ProxyClient *Yaz_Proxy::get_client(Z_APDU *apdu, const char *cookie,
                 if (c->m_server && c->m_server != this)
                 {
                     c->m_server->m_client = 0;
-                    c->m_server->dec_ref(true);
+                    c->m_server->dec_ref();
                 }
                 (parent->m_seqno)++;
                 c->m_target_idletime = m_target_idletime;
@@ -1911,7 +1926,7 @@ void Yaz_Proxy::recv_GDU(Z_GDU *apdu, int len)
         delete gdu;
         yaz_log(YLOG_LOG, "%sUnable to encode package", m_session_str);
         m_in_queue.clear();
-        dec_ref(true);
+        dec_ref();
         return;
     }
     m_in_queue.enqueue(gdu);
@@ -2042,7 +2057,7 @@ void Yaz_Proxy::recv_GDU_more(bool normal)
         m_timeout_mode = timeout_busy;
         inc_ref();
         recv_GDU_reduce(g);
-        if (dec_ref(false))
+        if (dec_ref())
             break;
     }
 }
@@ -3122,28 +3137,32 @@ void Yaz_Proxy::handle_init(Z_APDU *apdu)
 
             handle_charset_lang_negotiation(apdu2);
 
+           if (m_timeout_mode == timeout_busy)
+               m_timeout_mode = timeout_normal;
             send_to_client(apdu2);
-            m_timeout_mode = timeout_normal;
             return;
         }
     }
     m_client->m_init_flag = 1;
 
-#if USE_AUTH_MSG
-    Auth_Msg *m = new Auth_Msg;
-    m->m_proxy = this;
-    z_APDU(odr_encode(), &apdu, 0, "encode");
-    char *apdu_buf = odr_getbuf(odr_encode(), &m->m_apdu_len, 0);
-    m->m_apdu_buf = (char*) nmem_malloc(m->m_nmem, m->m_apdu_len);
-    memcpy(m->m_apdu_buf, apdu_buf, m->m_apdu_len);
-    odr_reset(odr_encode());
-
-    inc_ref();
-    m_my_thread->put(m);
-#else
-    int ret = handle_authentication(apdu);
-    result_authentication(apdu, ret);
-#endif
+    if (m_num_msg_threads && m_my_thread)
+    {
+        Auth_Msg *m = new Auth_Msg;
+        m->m_proxy = this;
+        z_APDU(odr_encode(), &apdu, 0, "encode");
+        char *apdu_buf = odr_getbuf(odr_encode(), &m->m_apdu_len, 0);
+        m->m_apdu_buf = (char*) nmem_malloc(m->m_nmem, m->m_apdu_len);
+        memcpy(m->m_apdu_buf, apdu_buf, m->m_apdu_len);
+        odr_reset(odr_encode());
+        
+        inc_ref();
+        m_my_thread->put(m);
+    }
+    else
+    {
+        int ret = handle_authentication(apdu);
+        result_authentication(apdu, ret);
+    }
 }
 
 void Yaz_Proxy::handle_incoming_Z_PDU(Z_APDU *apdu)
@@ -3301,17 +3320,8 @@ void Yaz_Proxy::releaseClient()
         m_parent->pre_init();
 }
 
-bool Yaz_Proxy::dec_ref(bool main_ptr)
+bool Yaz_Proxy::dec_ref()
 {
-    main_ptr = false;
-    assert(m_ref_count > 0);
-    if (main_ptr)
-    {
-        if (m_main_ptr_dec)
-            return false;
-        m_main_ptr_dec = true;
-    }
-
     m_http_keepalive = 0;
 
     --m_ref_count;
@@ -3339,7 +3349,7 @@ void Yaz_ProxyClient::shutdown()
     if (m_server)
     {
         m_waiting = 1;   // ensure it's released from Yaz_Proxy::releaseClient
-        m_server->dec_ref(true);
+        m_server->dec_ref();
     }
     else
         delete this;
@@ -3349,7 +3359,7 @@ void Yaz_Proxy::failNotify()
 {
     inc_request_no();
     yaz_log (YLOG_LOG, "%sConnection closed by client", get_session_str());
-    dec_ref(true);
+    dec_ref();
 }
 
 void Yaz_Proxy::send_response_fail_client(const char *addr)
@@ -3550,7 +3560,7 @@ void Yaz_Proxy::timeoutNotify()
             inc_request_no();
             m_in_queue.clear();
             yaz_log (YLOG_LOG, "%sTimeout (client to proxy)", m_session_str);
-            dec_ref(true);
+            dec_ref();
             break;
         case timeout_reduce:
             timeout(m_client_idletime);
@@ -3700,7 +3710,9 @@ void Yaz_ProxyClient::recv_Z_PDU(Z_APDU *apdu, int len)
         *imv1 = '\0';
         if (imv0)
             strcat(imv1, imv0);
+#ifdef VERSION
         strcat(imv1, "/" VERSION);
+#endif
         ir->implementationVersion = imv1;
         
         // apply YAZ Proxy implementation name