Rename a few files
[yazpp-moved-to-github.git] / src / yaz-proxy.cpp
index 83fae8a..e7783f9 100644 (file)
@@ -2,11 +2,14 @@
  * Copyright (c) 1998-2004, Index Data.
  * See the file LICENSE for details.
  * 
- * $Id: yaz-proxy.cpp,v 1.100 2004-02-12 20:40:22 adam Exp $
+ * $Id: yaz-proxy.cpp,v 1.110 2004-03-17 10:49:22 adam Exp $
  */
 
+#include <unistd.h>
 #include <assert.h>
 #include <time.h>
+#include <sys/types.h>
+#include <fcntl.h>
 
 #include <yaz/srw.h>
 #include <yaz/marcdisp.h>
@@ -111,6 +114,8 @@ Yaz_Proxy::Yaz_Proxy(IYaz_PDU_Observable *the_PDU_Observable,
     m_schema = 0;
     m_initRequest_apdu = 0;
     m_initRequest_mem = 0;
+    m_initRequest_preferredMessageSize = 0;
+    m_initRequest_maximumRecordSize = 0;
     m_initRequest_options = 0;
     m_initRequest_version = 0;
     m_apdu_invalid_session = 0;
@@ -126,6 +131,8 @@ Yaz_Proxy::Yaz_Proxy(IYaz_PDU_Observable *the_PDU_Observable,
     m_s2z_packing = Z_SRW_recordPacking_string;
     m_time_tv.tv_sec = 0;
     m_time_tv.tv_usec = 0;
+    if (!m_parent)
+       low_socket_open();
 }
 
 Yaz_Proxy::~Yaz_Proxy()
@@ -149,6 +156,8 @@ Yaz_Proxy::~Yaz_Proxy()
        odr_destroy(m_s2z_odr_init);
     if (m_s2z_odr_search)
        odr_destroy(m_s2z_odr_search);
+    if (!m_parent)
+       low_socket_close();
     delete m_config;
 }
 
@@ -269,30 +278,53 @@ char *Yaz_Proxy::get_proxy(Z_OtherInformation **otherInfo)
 const char *Yaz_Proxy::load_balance(const char **url)
 {
     int zurl_in_use[MAX_ZURL_PLEX];
+    int zurl_in_spare[MAX_ZURL_PLEX];
     Yaz_ProxyClient *c;
     int i;
 
     for (i = 0; i<MAX_ZURL_PLEX; i++)
+    {
        zurl_in_use[i] = 0;
+       zurl_in_spare[i] = 0;
+    }
     for (c = m_parent->m_clientPool; c; c = c->m_next)
     {
        for (i = 0; url[i]; i++)
            if (!strcmp(url[i], c->get_hostname()))
+           {
                zurl_in_use[i]++;
+               if (c->m_cookie == 0 && c->m_server == 0 && c->m_waiting == 0)
+                   zurl_in_spare[i]++;
+           }
     }
-    int min = 100000;
-    const char *ret = 0;
+    int min_use = 100000;
+    int spare_for_min = 0;
+    int max_spare = 0;
+    const char *ret_min = 0;
+    const char *ret_spare = 0;
     for (i = 0; url[i]; i++)
     {
-       yaz_log(LOG_DEBUG, "%szurl=%s use=%d",
-               m_session_str, url[i], zurl_in_use[i]);
-       if (min > zurl_in_use[i])
+       yaz_log(LOG_DEBUG, "%szurl=%s use=%d spare=%d",
+               m_session_str, url[i], zurl_in_use[i], zurl_in_spare[i]);
+       if (min_use > zurl_in_use[i])
        {
-           ret = url[i];
-           min = zurl_in_use[i];
+           ret_min = url[i];
+           min_use = zurl_in_use[i];
+           spare_for_min = zurl_in_spare[i];
+       }
+       if (max_spare < zurl_in_spare[i]) 
+       {
+           ret_spare = url[i];
+           max_spare = zurl_in_spare[i];
        }
     }
-    return ret;
+    // use the one with minimum connections if spare is > 3
+    if (spare_for_min > 3)
+       return ret_min;
+    // use one with most spares (if any)
+    if (max_spare > 0)
+       return ret_spare;
+    return ret_min;
 }
 
 Yaz_ProxyClient *Yaz_Proxy::get_client(Z_APDU *apdu, const char *cookie,
@@ -409,8 +441,8 @@ Yaz_ProxyClient *Yaz_Proxy::get_client(Z_APDU *apdu, const char *cookie,
                !strcmp(m_proxyTarget, c->get_hostname()))
            {
                // found it in cache
-               yaz_log (LOG_LOG, "%sREUSE %s",
-                        m_session_str, c->get_hostname());
+               yaz_log (LOG_LOG, "%sREUSE %d %s",
+                        m_session_str, parent->m_seqno, c->get_hostname());
                
                c->m_seqno = parent->m_seqno;
                assert(c->m_server == 0);
@@ -475,16 +507,16 @@ Yaz_ProxyClient *Yaz_Proxy::get_client(Z_APDU *apdu, const char *cookie,
            c = c_min;
            if (c->m_waiting || strcmp(m_proxyTarget, c->get_hostname()))
            {
-               yaz_log (LOG_LOG, "%sMAXCLIENTS Destroy %d",
-                        m_session_str, c->m_seqno);
+               yaz_log (LOG_LOG, "%sMAXCLIENTS %d Destroy %d",
+                        m_session_str, parent->m_max_clients, c->m_seqno);
                if (c->m_server && c->m_server != this)
                    delete c->m_server;
                c->m_server = 0;
            }
            else
            {
-               yaz_log (LOG_LOG, "%sMAXCLIENTS Reuse %d %d %s",
-                        m_session_str,
+               yaz_log (LOG_LOG, "%sMAXCLIENTS %d Reuse %d %d %s",
+                        m_session_str, parent->m_max_clients,
                         c->m_seqno, parent->m_seqno, c->get_hostname());
                xfree (c->m_cookie);
                c->m_cookie = 0;
@@ -607,13 +639,18 @@ void Yaz_Proxy::convert_xsl_delay()
        Z_External *r = npr->u.databaseRecord;
        if (r->which == Z_External_octet)
        {
+#if 0
+           fwrite((char*) r->u.octet_aligned->buf, 1, r->u.octet_aligned->len, stdout);
+#endif
            xmlDocPtr res, doc = xmlParseMemory(
                (char*) r->u.octet_aligned->buf,
                r->u.octet_aligned->len);
+
            
            yaz_log(LOG_LOG, "%sXSLT convert %d",
                    m_session_str, m_stylesheet_offset);
            res = xsltApplyStylesheet(m_stylesheet_xsp, doc, 0);
+
            if (res)
            {
                xmlChar *out_buf;
@@ -627,6 +664,7 @@ void Yaz_Proxy::convert_xsl_delay()
                xmlFree(out_buf);
                xmlFreeDoc(res);
            }
+
            xmlFreeDoc(doc);
        }
     }
@@ -648,8 +686,10 @@ void Yaz_Proxy::convert_to_marcxml(Z_NamePlusRecordList *p)
 {
     int i;
 
+    yaz_iconv_t cd = yaz_iconv_open("UTF-8", "MARC-8");
     yaz_marc_t mt = yaz_marc_create();
     yaz_marc_xml(mt, YAZ_MARC_MARCXML);
+    yaz_marc_iconv(mt, cd);
     for (i = 0; i < p->num_records; i++)
     {
        Z_NamePlusRecord *npr = p->records[i];
@@ -664,44 +704,15 @@ void Yaz_Proxy::convert_to_marcxml(Z_NamePlusRecordList *p)
                                        r->u.octet_aligned->len,
                                        &result, &rlen))
                {
-                   yaz_iconv_t cd = yaz_iconv_open("UTF-8", "MARC-8");
-                   WRBUF wrbuf = wrbuf_alloc();
-                   
-                   char outbuf[120];
-                   size_t inbytesleft = rlen;
-                   const char *inp = result;
-                   while (cd && inbytesleft)
-                   {
-                       size_t outbytesleft = sizeof(outbuf);
-                       char *outp = outbuf;
-                       size_t r;
-                       
-                       r = yaz_iconv (cd, (char**) &inp,
-                                      &inbytesleft,
-                                      &outp, &outbytesleft);
-                       if (r == (size_t) (-1))
-                       {
-                           int e = yaz_iconv_error(cd);
-                           if (e != YAZ_ICONV_E2BIG)
-                           {
-                               yaz_log(LOG_WARN, "conversion failure");
-                               break;
-                           }
-                       }
-                       wrbuf_write(wrbuf, outbuf, outp - outbuf);
-                   }
-                   if (cd)
-                       yaz_iconv_close(cd);
-
                    npr->u.databaseRecord = z_ext_record(odr_encode(),
                                                         VAL_TEXT_XML,
-                                                        wrbuf_buf(wrbuf),
-                                                        wrbuf_len(wrbuf));
-                   wrbuf_free(wrbuf, 1);
+                                                        result, rlen);
                }
            }
        }
     }
+    if (cd)
+       yaz_iconv_close(cd);
     yaz_marc_destroy(mt);
 }
 
@@ -896,6 +907,7 @@ int Yaz_Proxy::send_srw_explain_response(Z_SRW_diagnostic *diagnostics,
            er->record.recordData_buf = b;
            er->record.recordData_len = len;
            er->record.recordPacking = m_s2z_packing;
+           er->record.recordSchema = "http://explain.z3950.org/dtd/2.0/";
 
            er->diagnostics = diagnostics;
            er->num_diagnostics = num_diagnostics;
@@ -984,7 +996,7 @@ int Yaz_Proxy::send_to_client(Z_APDU *apdu)
     int kill_session = 0;
     Z_ReferenceId **new_id = get_referenceIdP(apdu);
 
-    if (new_id && m_referenceId)
+    if (new_id)
        *new_id = m_referenceId;
     
     if (apdu->which == Z_APDU_searchResponse)
@@ -1078,6 +1090,18 @@ int Yaz_Proxy::send_to_client(Z_APDU *apdu)
                    ODR_MASK_SET(nopt, i);
            apdu->u.initResponse->protocolVersion = nopt;           
        }
+       apdu->u.initResponse->preferredMessageSize =
+           odr_intdup(odr_encode(),
+                      m_client->m_initResponse_preferredMessageSize >
+                      m_initRequest_preferredMessageSize ?
+                      m_initRequest_preferredMessageSize :
+                      m_client->m_initResponse_preferredMessageSize);
+       apdu->u.initResponse->maximumRecordSize =
+           odr_intdup(odr_encode(),
+                      m_client->m_initResponse_maximumRecordSize >
+                      m_initRequest_maximumRecordSize ?
+                      m_initRequest_maximumRecordSize :
+                      m_client->m_initResponse_maximumRecordSize);
     }
     int r = send_PDU_convert(apdu);
     if (r)
@@ -1346,8 +1370,8 @@ void Yaz_Proxy::recv_GDU(Z_GDU *apdu, int len)
 
     m_bytes_recv += len;
     
-    if (m_log_mask & PROXY_LOG_APDU_CLIENT)
-       yaz_log (LOG_DEBUG, "%sReceiving %s from client %d bytes",
+    if (m_log_mask & PROXY_LOG_REQ_CLIENT)
+       yaz_log (LOG_LOG, "%sReceiving %s from client %d bytes",
                 m_session_str, gdu_name(apdu), len);
 
     if (m_bw_hold_PDU)     // double incoming PDU. shutdown now.
@@ -1529,12 +1553,17 @@ Z_APDU *Yaz_Proxy::handle_syntax_validation(Z_APDU *apdu)
                                    &addinfo, &stylesheet_name, &m_schema);
        if (stylesheet_name)
        {
+           m_parent->low_socket_close();
+
            if (m_stylesheet_xsp)
                xsltFreeStylesheet(m_stylesheet_xsp);
+
            m_stylesheet_xsp = xsltParseStylesheetFile((const xmlChar*)
                                                       stylesheet_name);
            m_stylesheet_offset = 0;
            xfree(stylesheet_name);
+
+           m_parent->low_socket_open();
        }
        if (err == -1)
        {
@@ -1571,12 +1600,17 @@ Z_APDU *Yaz_Proxy::handle_syntax_validation(Z_APDU *apdu)
                                    &addinfo, &stylesheet_name, &m_schema);
        if (stylesheet_name)
        {
+           m_parent->low_socket_close();
+
            if (m_stylesheet_xsp)
                xsltFreeStylesheet(m_stylesheet_xsp);
+
            m_stylesheet_xsp = xsltParseStylesheetFile((const xmlChar*)
                                                       stylesheet_name);
            m_stylesheet_offset = 0;
            xfree(stylesheet_name);
+
+           m_parent->low_socket_open();
        }
        if (err == -1)
        {
@@ -1970,6 +2004,13 @@ void Yaz_Proxy::handle_incoming_Z_PDU(Z_APDU *apdu)
            m_initRequest_apdu = apdu;
            m_initRequest_mem = odr_extract_mem(odr_decode());
 
+           m_initRequest_preferredMessageSize = *apdu->u.initRequest->
+               preferredMessageSize;
+           *apdu->u.initRequest->preferredMessageSize = 1024*1024;
+           m_initRequest_maximumRecordSize = *apdu->u.initRequest->
+               maximumRecordSize;
+           *apdu->u.initRequest->maximumRecordSize = 1024*1024;
+
            // save init options for the response..
            m_initRequest_options = apdu->u.initRequest->options;
            
@@ -1982,6 +2023,8 @@ void Yaz_Proxy::handle_incoming_Z_PDU(Z_APDU *apdu)
                ODR_MASK_SET(apdu->u.initRequest->options, i);
            ODR_MASK_CLEAR(apdu->u.initRequest->options,
                           Z_Options_negotiationModel);
+           ODR_MASK_CLEAR(apdu->u.initRequest->options,
+                          Z_Options_concurrentOperations);
 
            // make new version
            m_initRequest_version = apdu->u.initRequest->protocolVersion;
@@ -2181,6 +2224,8 @@ void Yaz_ProxyClient::pre_init_client()
        ODR_MASK_SET(req->options, i);
     ODR_MASK_CLEAR(apdu->u.initRequest->options,
                   Z_Options_negotiationModel);
+    ODR_MASK_CLEAR(apdu->u.initRequest->options,
+                  Z_Options_concurrentOperations);
     for (i = 0; i<= 10; i++)
        ODR_MASK_SET(req->protocolVersion, i);
 
@@ -2232,6 +2277,7 @@ void Yaz_Proxy::pre_init()
            {
                Yaz_ProxyClient *c;
                int spare = 0;
+               int spare_waiting = 0;
                int in_use = 0;
                int other = 0;
                for (c = m_clientPool; c; c = c->m_next)
@@ -2241,7 +2287,10 @@ void Yaz_Proxy::pre_init()
                        if (c->m_cookie == 0)
                        {
                            if (c->m_server == 0)
-                               spare++;
+                               if (c->m_waiting)
+                                   spare_waiting++;
+                               else
+                                   spare++;
                            else
                                in_use++;
                        }
@@ -2250,9 +2299,10 @@ void Yaz_Proxy::pre_init()
                    }
                }
                yaz_log(LOG_LOG, "%spre-init %s %s use=%d other=%d spare=%d "
-                       "preinit=%d",m_session_str,
-                       name, zurl_in_use[j], in_use, other, spare, pre_init);
-               if (spare < pre_init)
+                       "sparew=%d preinit=%d",m_session_str,
+                       name, zurl_in_use[j], in_use, other,
+                       spare, spare_waiting, pre_init);
+               if (spare + spare_waiting < pre_init)
                {
                    c = new Yaz_ProxyClient(m_PDU_Observable->clone(), this);
                    c->m_next = m_clientPool;
@@ -2358,6 +2408,8 @@ Yaz_ProxyClient::Yaz_ProxyClient(IYaz_PDU_Observable *the_PDU_Observable,
     m_initResponse = 0;
     m_initResponse_options = 0;
     m_initResponse_version = 0;
+    m_initResponse_preferredMessageSize = 0;
+    m_initResponse_maximumRecordSize = 0;
     m_resultSetStartPoint = 0;
     m_bytes_sent = m_bytes_recv = 0;
     m_pdu_recv = 0;
@@ -2425,6 +2477,10 @@ void Yaz_ProxyClient::recv_Z_PDU(Z_APDU *apdu, int len)
         m_initResponse = apdu;
        m_initResponse_options = apdu->u.initResponse->options;
        m_initResponse_version = apdu->u.initResponse->protocolVersion;
+       m_initResponse_preferredMessageSize = 
+           *apdu->u.initResponse->preferredMessageSize;
+       m_initResponse_maximumRecordSize = 
+           *apdu->u.initResponse->maximumRecordSize;
 
        Z_InitResponse *ir = apdu->u.initResponse;
        char *im0 = ir->implementationName;
@@ -2498,6 +2554,21 @@ void Yaz_ProxyClient::recv_Z_PDU(Z_APDU *apdu, int len)
     }
 }
 
+void Yaz_Proxy::low_socket_close()
+{
+    int i;
+    for (i = 0; i<NO_SPARE_SOLARIS_FD; i++)
+       if  (m_lo_fd[i] >= 0)
+           ::close(m_lo_fd[i]);
+}
+
+void Yaz_Proxy::low_socket_open()
+{
+    int i;
+    for (i = 0; i<NO_SPARE_SOLARIS_FD; i++)
+       m_lo_fd[i] = open("/dev/null", O_RDONLY);
+}
+
 int Yaz_Proxy::server(const char *addr)
 {
     int r = Yaz_Z_Assoc::server(addr);