X-Git-Url: http://git.indexdata.com/?a=blobdiff_plain;f=src%2Fyaz-proxy.cpp;h=1caa1ea73b99f665453fe40b3759bc43afa58678;hb=76cd9bbe1a562c561e1ac35b24c8f1fe508bac93;hp=4aff7673593119ea24169fc4ff9c36f668ab2a62;hpb=3eb9c4a4462d9ea7630750b541c253b145309521;p=yazpp-moved-to-github.git diff --git a/src/yaz-proxy.cpp b/src/yaz-proxy.cpp index 4aff767..1caa1ea 100644 --- a/src/yaz-proxy.cpp +++ b/src/yaz-proxy.cpp @@ -2,11 +2,14 @@ * Copyright (c) 1998-2004, Index Data. * See the file LICENSE for details. * - * $Id: yaz-proxy.cpp,v 1.96 2004-02-02 11:17:45 adam Exp $ + * $Id: yaz-proxy.cpp,v 1.106 2004-03-01 17:01:52 adam Exp $ */ +#include #include #include +#include +#include #include #include @@ -100,6 +103,8 @@ Yaz_Proxy::Yaz_Proxy(IYaz_PDU_Observable *the_PDU_Observable, m_config_fname = 0; m_request_no = 0; m_invalid_session = 0; + m_referenceId = 0; + m_referenceId_mem = nmem_create(); m_config = 0; m_marcxml_flag = 0; m_stylesheet_xsp = 0; @@ -109,6 +114,8 @@ Yaz_Proxy::Yaz_Proxy(IYaz_PDU_Observable *the_PDU_Observable, m_schema = 0; m_initRequest_apdu = 0; m_initRequest_mem = 0; + m_initRequest_preferredMessageSize = 0; + m_initRequest_maximumRecordSize = 0; m_initRequest_options = 0; m_initRequest_version = 0; m_apdu_invalid_session = 0; @@ -124,6 +131,8 @@ Yaz_Proxy::Yaz_Proxy(IYaz_PDU_Observable *the_PDU_Observable, m_s2z_packing = Z_SRW_recordPacking_string; m_time_tv.tv_sec = 0; m_time_tv.tv_usec = 0; + if (!m_parent) + low_socket_open(); } Yaz_Proxy::~Yaz_Proxy() @@ -132,6 +141,8 @@ Yaz_Proxy::~Yaz_Proxy() m_bytes_sent, m_bytes_recv); nmem_destroy(m_initRequest_mem); nmem_destroy(m_mem_invalid_session); + nmem_destroy(m_referenceId_mem); + xfree (m_proxyTarget); xfree (m_default_target); xfree (m_proxy_authentication); @@ -145,6 +156,8 @@ Yaz_Proxy::~Yaz_Proxy() odr_destroy(m_s2z_odr_init); if (m_s2z_odr_search) odr_destroy(m_s2z_odr_search); + if (!m_parent) + low_socket_close(); delete m_config; } @@ -265,30 +278,53 @@ char *Yaz_Proxy::get_proxy(Z_OtherInformation **otherInfo) const char *Yaz_Proxy::load_balance(const char **url) { int zurl_in_use[MAX_ZURL_PLEX]; + int zurl_in_spare[MAX_ZURL_PLEX]; Yaz_ProxyClient *c; int i; for (i = 0; im_clientPool; c; c = c->m_next) { for (i = 0; url[i]; i++) if (!strcmp(url[i], c->get_hostname())) + { zurl_in_use[i]++; + if (c->m_cookie == 0 && c->m_server == 0 && c->m_waiting == 0) + zurl_in_spare[i]++; + } } - int min = 100000; - const char *ret = 0; + int min_use = 100000; + int spare_for_min = 0; + int max_spare = 0; + const char *ret_min = 0; + const char *ret_spare = 0; for (i = 0; url[i]; i++) { - yaz_log(LOG_DEBUG, "%szurl=%s use=%d", - m_session_str, url[i], zurl_in_use[i]); - if (min > zurl_in_use[i]) + yaz_log(LOG_DEBUG, "%szurl=%s use=%d spare=%d", + m_session_str, url[i], zurl_in_use[i], zurl_in_spare[i]); + if (min_use > zurl_in_use[i]) { - ret = url[i]; - min = zurl_in_use[i]; + ret_min = url[i]; + min_use = zurl_in_use[i]; + spare_for_min = zurl_in_spare[i]; + } + if (max_spare < zurl_in_spare[i]) + { + ret_spare = url[i]; + max_spare = zurl_in_spare[i]; } } - return ret; + // use the one with minimum connections if spare is > 3 + if (spare_for_min > 3) + return ret_min; + // use one with most spares (if any) + if (max_spare > 0) + return ret_spare; + return ret_min; } Yaz_ProxyClient *Yaz_Proxy::get_client(Z_APDU *apdu, const char *cookie, @@ -405,8 +441,8 @@ Yaz_ProxyClient *Yaz_Proxy::get_client(Z_APDU *apdu, const char *cookie, !strcmp(m_proxyTarget, c->get_hostname())) { // found it in cache - yaz_log (LOG_LOG, "%sREUSE %s", - m_session_str, c->get_hostname()); + yaz_log (LOG_LOG, "%sREUSE %d %s", + m_session_str, parent->m_seqno, c->get_hostname()); c->m_seqno = parent->m_seqno; assert(c->m_server == 0); @@ -471,16 +507,16 @@ Yaz_ProxyClient *Yaz_Proxy::get_client(Z_APDU *apdu, const char *cookie, c = c_min; if (c->m_waiting || strcmp(m_proxyTarget, c->get_hostname())) { - yaz_log (LOG_LOG, "%sMAXCLIENTS Destroy %d", - m_session_str, c->m_seqno); + yaz_log (LOG_LOG, "%sMAXCLIENTS %d Destroy %d", + m_session_str, parent->m_max_clients, c->m_seqno); if (c->m_server && c->m_server != this) delete c->m_server; c->m_server = 0; } else { - yaz_log (LOG_LOG, "%sMAXCLIENTS Reuse %d %d %s", - m_session_str, + yaz_log (LOG_LOG, "%sMAXCLIENTS %d Reuse %d %d %s", + m_session_str, parent->m_max_clients, c->m_seqno, parent->m_seqno, c->get_hostname()); xfree (c->m_cookie); c->m_cookie = 0; @@ -610,6 +646,7 @@ void Yaz_Proxy::convert_xsl_delay() yaz_log(LOG_LOG, "%sXSLT convert %d", m_session_str, m_stylesheet_offset); res = xsltApplyStylesheet(m_stylesheet_xsp, doc, 0); + if (res) { xmlChar *out_buf; @@ -623,6 +660,7 @@ void Yaz_Proxy::convert_xsl_delay() xmlFree(out_buf); xmlFreeDoc(res); } + xmlFreeDoc(doc); } } @@ -634,7 +672,7 @@ void Yaz_Proxy::convert_xsl_delay() xsltFreeStylesheet(m_stylesheet_xsp); m_stylesheet_xsp = 0; timeout(m_client_idletime); - int r = send_PDU_convert(m_stylesheet_apdu); + send_PDU_convert(m_stylesheet_apdu); } else timeout(0); @@ -892,6 +930,7 @@ int Yaz_Proxy::send_srw_explain_response(Z_SRW_diagnostic *diagnostics, er->record.recordData_buf = b; er->record.recordData_len = len; er->record.recordPacking = m_s2z_packing; + er->record.recordSchema = "http://explain.z3950.org/dtd/2.0/"; er->diagnostics = diagnostics; er->num_diagnostics = num_diagnostics; @@ -978,6 +1017,11 @@ int Yaz_Proxy::send_PDU_convert(Z_APDU *apdu) int Yaz_Proxy::send_to_client(Z_APDU *apdu) { int kill_session = 0; + Z_ReferenceId **new_id = get_referenceIdP(apdu); + + if (new_id && m_referenceId) + *new_id = m_referenceId; + if (apdu->which == Z_APDU_searchResponse) { Z_SearchResponse *sr = apdu->u.searchResponse; @@ -1069,6 +1113,18 @@ int Yaz_Proxy::send_to_client(Z_APDU *apdu) ODR_MASK_SET(nopt, i); apdu->u.initResponse->protocolVersion = nopt; } + apdu->u.initResponse->preferredMessageSize = + odr_intdup(odr_encode(), + m_client->m_initResponse_preferredMessageSize > + m_initRequest_preferredMessageSize ? + m_initRequest_preferredMessageSize : + m_client->m_initResponse_preferredMessageSize); + apdu->u.initResponse->maximumRecordSize = + odr_intdup(odr_encode(), + m_client->m_initResponse_maximumRecordSize > + m_initRequest_maximumRecordSize ? + m_initRequest_maximumRecordSize : + m_client->m_initResponse_maximumRecordSize); } int r = send_PDU_convert(apdu); if (r) @@ -1520,12 +1576,17 @@ Z_APDU *Yaz_Proxy::handle_syntax_validation(Z_APDU *apdu) &addinfo, &stylesheet_name, &m_schema); if (stylesheet_name) { + m_parent->low_socket_close(); + if (m_stylesheet_xsp) xsltFreeStylesheet(m_stylesheet_xsp); + m_stylesheet_xsp = xsltParseStylesheetFile((const xmlChar*) stylesheet_name); m_stylesheet_offset = 0; xfree(stylesheet_name); + + m_parent->low_socket_open(); } if (err == -1) { @@ -1562,12 +1623,17 @@ Z_APDU *Yaz_Proxy::handle_syntax_validation(Z_APDU *apdu) &addinfo, &stylesheet_name, &m_schema); if (stylesheet_name) { + m_parent->low_socket_close(); + if (m_stylesheet_xsp) xsltFreeStylesheet(m_stylesheet_xsp); + m_stylesheet_xsp = xsltParseStylesheetFile((const xmlChar*) stylesheet_name); m_stylesheet_offset = 0; xfree(stylesheet_name); + + m_parent->low_socket_open(); } if (err == -1) { @@ -1911,13 +1977,27 @@ void Yaz_Proxy::handle_incoming_HTTP(Z_HTTP_Request *hreq) void Yaz_Proxy::handle_incoming_Z_PDU(Z_APDU *apdu) { + Z_ReferenceId **refid = get_referenceIdP(apdu); + nmem_reset(m_referenceId_mem); + if (refid && *refid) + { + m_referenceId = (Z_ReferenceId *) + nmem_malloc(m_referenceId_mem, sizeof(*m_referenceId)); + m_referenceId->len = m_referenceId->size = (*refid)->len; + m_referenceId->buf = (unsigned char *) + nmem_malloc(m_referenceId_mem, (*refid)->len); + memcpy(m_referenceId->buf, (*refid)->buf, (*refid)->len); + } + else + m_referenceId = 0; + if (!m_client && m_invalid_session) { m_apdu_invalid_session = apdu; m_mem_invalid_session = odr_extract_mem(odr_decode()); apdu = m_initRequest_apdu; } - + // Determine our client. Z_OtherInformation **oi; get_otherInfoAPDU(apdu, &oi); @@ -1947,6 +2027,13 @@ void Yaz_Proxy::handle_incoming_Z_PDU(Z_APDU *apdu) m_initRequest_apdu = apdu; m_initRequest_mem = odr_extract_mem(odr_decode()); + m_initRequest_preferredMessageSize = *apdu->u.initRequest-> + preferredMessageSize; + *apdu->u.initRequest->preferredMessageSize = 1024*1024; + m_initRequest_maximumRecordSize = *apdu->u.initRequest-> + maximumRecordSize; + *apdu->u.initRequest->maximumRecordSize = 1024*1024; + // save init options for the response.. m_initRequest_options = apdu->u.initRequest->options; @@ -1959,6 +2046,8 @@ void Yaz_Proxy::handle_incoming_Z_PDU(Z_APDU *apdu) ODR_MASK_SET(apdu->u.initRequest->options, i); ODR_MASK_CLEAR(apdu->u.initRequest->options, Z_Options_negotiationModel); + ODR_MASK_CLEAR(apdu->u.initRequest->options, + Z_Options_concurrentOperations); // make new version m_initRequest_version = apdu->u.initRequest->protocolVersion; @@ -1974,19 +2063,22 @@ void Yaz_Proxy::handle_incoming_Z_PDU(Z_APDU *apdu) { if (handle_init_response_for_invalid_session(apdu)) return; - Z_APDU *apdu2 = m_client->m_initResponse; - apdu2->u.initResponse->otherInfo = 0; - if (m_client->m_cookie && *m_client->m_cookie) - set_otherInformationString(apdu2, VAL_COOKIE, 1, - m_client->m_cookie); - apdu2->u.initResponse->referenceId = - apdu->u.initRequest->referenceId; - apdu2->u.initResponse->options = m_client->m_initResponse_options; - apdu2->u.initResponse->protocolVersion = - m_client->m_initResponse_version; - - send_to_client(apdu2); - return; + if (m_client->m_initResponse) + { + Z_APDU *apdu2 = m_client->m_initResponse; + apdu2->u.initResponse->otherInfo = 0; + if (m_client->m_cookie && *m_client->m_cookie) + set_otherInformationString(apdu2, VAL_COOKIE, 1, + m_client->m_cookie); + apdu2->u.initResponse->referenceId = + apdu->u.initRequest->referenceId; + apdu2->u.initResponse->options = m_client->m_initResponse_options; + apdu2->u.initResponse->protocolVersion = + m_client->m_initResponse_version; + + send_to_client(apdu2); + return; + } } m_client->m_init_flag = 1; } @@ -2155,6 +2247,8 @@ void Yaz_ProxyClient::pre_init_client() ODR_MASK_SET(req->options, i); ODR_MASK_CLEAR(apdu->u.initRequest->options, Z_Options_negotiationModel); + ODR_MASK_CLEAR(apdu->u.initRequest->options, + Z_Options_concurrentOperations); for (i = 0; i<= 10; i++) ODR_MASK_SET(req->protocolVersion, i); @@ -2206,6 +2300,7 @@ void Yaz_Proxy::pre_init() { Yaz_ProxyClient *c; int spare = 0; + int spare_waiting = 0; int in_use = 0; int other = 0; for (c = m_clientPool; c; c = c->m_next) @@ -2215,7 +2310,10 @@ void Yaz_Proxy::pre_init() if (c->m_cookie == 0) { if (c->m_server == 0) - spare++; + if (c->m_waiting) + spare_waiting; + else + spare++; else in_use++; } @@ -2224,9 +2322,10 @@ void Yaz_Proxy::pre_init() } } yaz_log(LOG_LOG, "%spre-init %s %s use=%d other=%d spare=%d " - "preinit=%d",m_session_str, - name, zurl_in_use[j], in_use, other, spare, pre_init); - if (spare < pre_init) + "sparew=%d preinit=%d",m_session_str, + name, zurl_in_use[j], in_use, other, + spare, spare_waiting, pre_init); + if (spare + spare_waiting < pre_init) { c = new Yaz_ProxyClient(m_PDU_Observable->clone(), this); c->m_next = m_clientPool; @@ -2332,6 +2431,8 @@ Yaz_ProxyClient::Yaz_ProxyClient(IYaz_PDU_Observable *the_PDU_Observable, m_initResponse = 0; m_initResponse_options = 0; m_initResponse_version = 0; + m_initResponse_preferredMessageSize = 0; + m_initResponse_maximumRecordSize = 0; m_resultSetStartPoint = 0; m_bytes_sent = m_bytes_recv = 0; m_pdu_recv = 0; @@ -2399,6 +2500,10 @@ void Yaz_ProxyClient::recv_Z_PDU(Z_APDU *apdu, int len) m_initResponse = apdu; m_initResponse_options = apdu->u.initResponse->options; m_initResponse_version = apdu->u.initResponse->protocolVersion; + m_initResponse_preferredMessageSize = + *apdu->u.initResponse->preferredMessageSize; + m_initResponse_maximumRecordSize = + *apdu->u.initResponse->maximumRecordSize; Z_InitResponse *ir = apdu->u.initResponse; char *im0 = ir->implementationName; @@ -2472,6 +2577,21 @@ void Yaz_ProxyClient::recv_Z_PDU(Z_APDU *apdu, int len) } } +void Yaz_Proxy::low_socket_close() +{ + int i; + for (i = 0; i= 0) + ::close(m_lo_fd[i]); +} + +void Yaz_Proxy::low_socket_open() +{ + int i; + for (i = 0; i