X-Git-Url: http://git.indexdata.com/?a=blobdiff_plain;f=src%2Ffilter_load_balance.cpp;h=dafe8f3a615a675d38029e3675b476fb5ee0802c;hb=586d78659d671683f33ec55f4a7d32b28e345ccd;hp=57bd3901402857e5853e07885f20c118a5f8f057;hpb=451ffea184560b0d10f369dfc3fccb0fa86b91a2;p=metaproxy-moved-to-github.git diff --git a/src/filter_load_balance.cpp b/src/filter_load_balance.cpp index 57bd390..dafe8f3 100644 --- a/src/filter_load_balance.cpp +++ b/src/filter_load_balance.cpp @@ -1,41 +1,93 @@ -/* $Id: filter_load_balance.cpp,v 1.1 2007-01-02 15:35:36 marc Exp $ - Copyright (c) 2005-2006, Index Data. +/* This file is part of Metaproxy. + Copyright (C) Index Data - See the LICENSE file for details - */ +Metaproxy is free software; you can redistribute it and/or modify it under +the terms of the GNU General Public License as published by the Free +Software Foundation; either version 2, or (at your option) any later +version. + +Metaproxy is distributed in the hope that it will be useful, but WITHOUT ANY +WARRANTY; without even the implied warranty of MERCHANTABILITY or +FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License +for more details. + +You should have received a copy of the GNU General Public License +along with this program; if not, write to the Free Software +Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA +*/ #include "config.hpp" -#include "filter.hpp" +#include +#include #include "filter_load_balance.hpp" -#include "package.hpp" -#include "util.hpp" +#include + #include -#include +#include +#include #include -#include +// remove max macro if already defined (defined later in ) +#ifdef max +#undef max +#endif + +#include +#include +#include namespace mp = metaproxy_1; namespace yf = mp::filter; -namespace metaproxy_1 { - namespace filter { - class LoadBalance::Impl { +namespace metaproxy_1 +{ + namespace filter + { + class LoadBalance::Impl + { public: Impl(); ~Impl(); void process(metaproxy_1::Package & package); void configure(const xmlNode * ptr); private: + // statistic manipulating functions, + void add_dead(unsigned long session_id); + //void clear_dead(unsigned long session_id); + void add_package(unsigned long session_id); + void remove_package(unsigned long session_id); + void add_session(unsigned long session_id, std::string target); + void remove_session(unsigned long session_id); + std::string find_session_target(unsigned long session_id); + + // cost functions + unsigned int cost(std::string target); + unsigned int dead(std::string target); + + // local classes + class TargetStat { + public: + unsigned int sessions; + unsigned int packages; + unsigned int deads; + unsigned int cost() { + unsigned int c = sessions + packages + deads; + return c; + } + }; + + // local protected databases boost::mutex m_mutex; + std::map m_target_stat; + std::map m_session_target; }; } } // define Pimpl wrapper forwarding to Impl - + yf::LoadBalance::LoadBalance() : m_p(new Impl) { } @@ -44,7 +96,8 @@ yf::LoadBalance::~LoadBalance() { // must have a destructor because of boost::scoped_ptr } -void yf::LoadBalance::configure(const xmlNode *xmlnode) +void yf::LoadBalance::configure(const xmlNode *xmlnode, bool test_only, + const char *path) { m_p->configure(xmlnode); } @@ -55,16 +108,12 @@ void yf::LoadBalance::process(mp::Package &package) const } -// define Implementation stuff - - - yf::LoadBalance::Impl::Impl() { } yf::LoadBalance::Impl::~Impl() -{ +{ } void yf::LoadBalance::Impl::configure(const xmlNode *xmlnode) @@ -73,78 +122,306 @@ void yf::LoadBalance::Impl::configure(const xmlNode *xmlnode) void yf::LoadBalance::Impl::process(mp::Package &package) { + bool is_closed_front = false; + + // checking for closed front end packages + if (package.session().is_closed()) + { + is_closed_front = true; + } + Z_GDU *gdu_req = package.request().get(); + // passing anything but z3950 packages + if (gdu_req && gdu_req->which == Z_GDU_Z3950) + { + // target selecting only on Z39.50 init request + if (gdu_req->u.z3950->which == Z_APDU_initRequest) + { + yazpp_1::GDU base_req(gdu_req); + Z_APDU *apdu = base_req.get()->u.z3950; + + Z_InitRequest *org_init = base_req.get()->u.z3950->u.initRequest; + mp::odr odr_en(ODR_ENCODE); + + std::list vhosts; + mp::util::remove_vhost_otherinfo(&(org_init->otherInfo), vhosts); + // get lowest of all vhosts.. Remove them if individually if + // they turn out to be bad.. + while (1) + { + std::string target; + std::list::iterator ivh = vhosts.begin(); + + Package init_pkg(package.session(), package.origin()); + init_pkg.copy_filter(package); + + unsigned int cost = std::numeric_limits::max(); + { + boost::mutex::scoped_lock scoped_lock(m_mutex); + + for (; ivh != vhosts.end(); ) + { + if ((*ivh).size() != 0) + { + unsigned int vhcost + = yf::LoadBalance::Impl::cost(*ivh); + yaz_log(YLOG_LOG, "Consider %s cost=%u vhcost=%u", + (*ivh).c_str(), cost, vhcost); + if (cost > vhcost) + { + cost = vhcost; + target = *ivh; + ivh = vhosts.erase(ivh); + } + else + ivh++; + } + else + ivh++; + } + } + if (target.length() == 0) + break; + // copying new target into init package + + yazpp_1::GDU init_gdu(base_req); + Z_InitRequest *init_req = init_gdu.get()->u.z3950->u.initRequest; + + mp::util::set_vhost_otherinfo(&(init_req->otherInfo), + odr_en, target, 1); + + init_pkg.request() = init_gdu; + + // moving all package types + init_pkg.move(); + + // checking for closed back end packages + if (!init_pkg.session().is_closed()) + { + add_session(package.session().id(), target); + + package.response() = init_pkg.response(); + return; + } + } + mp::odr odr; + package.response() = odr.create_initResponse( + apdu, YAZ_BIB1_TEMPORARY_SYSTEM_ERROR, + "load_balance: no available targets"); + package.session().close(); + return; + } + // frontend Z39.50 close request is added to statistics and marked + else if (gdu_req->u.z3950->which == Z_APDU_close) + { + is_closed_front = true; + boost::mutex::scoped_lock scoped_lock(m_mutex); + add_package(package.session().id()); + } + // any other Z39.50 package is added to statistics + else + { + boost::mutex::scoped_lock scoped_lock(m_mutex); + add_package(package.session().id()); + } + } + + // moving all package types + package.move(); + + bool is_closed_back = false; + + // checking for closed back end packages + if (package.session().is_closed()) + is_closed_back = true; + + Z_GDU *gdu_res = package.response().get(); // passing anything but z3950 packages - if (!gdu_req - || !(gdu_req->which == Z_GDU_Z3950)) + if (gdu_res && gdu_res->which == Z_GDU_Z3950) { - package.move(); - return; + // session closing only on Z39.50 close response + if (gdu_res->u.z3950->which == Z_APDU_close) + { + is_closed_back = true; + boost::mutex::scoped_lock scoped_lock(m_mutex); + remove_package(package.session().id()); + } + // any other Z39.50 package is removed from statistics + else + { + boost::mutex::scoped_lock scoped_lock(m_mutex); + remove_package(package.session().id()); + } } + // finally removing sessions and marking deads + if (is_closed_back || is_closed_front) + { + boost::mutex::scoped_lock scoped_lock(m_mutex); - // target selecting only on Z39.50 init request - if (gdu_req->u.z3950->which == Z_APDU_initRequest){ - - mp::odr odr_en(ODR_ENCODE); - Z_InitRequest *org_init = gdu_req->u.z3950->u.initRequest; - - // extracting virtual hosts - std::list vhosts; - - mp::util::remove_vhost_otherinfo(&(org_init->otherInfo), vhosts); - - //std::cout << "LoadBalance::Impl::process() vhosts: " - // << vhosts.size() << "\n"; - //std::cout << "LoadBalance::Impl::process()" << *gdu_req << "\n"; - - // choosing one target according to load-balancing algorithm - - if (vhosts.size()){ - std::string target; - - // getting timestamp for receiving of package - boost::posix_time::ptime receive_time - = boost::posix_time::microsec_clock::local_time(); - - // //<< receive_time << " " - // //<< to_iso_string(receive_time) << " " - //<< to_iso_extended_string(receive_time) << " " - // package.session().id(); - - { // scope for locking local target database - boost::mutex::scoped_lock scoped_lock(m_mutex); - target = *vhosts.begin(); - } - - - // copying new target into init package - mp::util::set_vhost_otherinfo(&(org_init->otherInfo), odr_en, target); - package.request() = gdu_req; + // marking backend dead if backend closed without fronted close + if (is_closed_front == false) + add_dead(package.session().id()); + + remove_session(package.session().id()); + + // making sure that package is closed + package.session().close(); + } +} + +// statistic manipulating functions, +void yf::LoadBalance::Impl::add_dead(unsigned long session_id) +{ + std::string target = find_session_target(session_id); + + if (target.size() != 0) + { + std::map::iterator itarg; + itarg = m_target_stat.find(target); + if (itarg != m_target_stat.end() + && itarg->second.deads < std::numeric_limits::max()) + { + itarg->second.deads += 1; + // std:.cout << "add_dead " << session_id << " " << target + // << " d:" << itarg->second.deads << "\n"; } - } - - - // moving all Z39.50 package typess - package.move(); - +} +void yf::LoadBalance::Impl::add_package(unsigned long session_id) +{ + std::string target = find_session_target(session_id); - //boost::posix_time::ptime send_time - // = boost::posix_time::microsec_clock::local_time(); + if (target.size() != 0) + { + std::map::iterator itarg; + itarg = m_target_stat.find(target); + if (itarg != m_target_stat.end() + && itarg->second.packages + < std::numeric_limits::max()) + { + itarg->second.packages += 1; + } + } +} + +void yf::LoadBalance::Impl::remove_package(unsigned long session_id) +{ + std::string target = find_session_target(session_id); + + if (target.size() != 0) + { + std::map::iterator itarg; + itarg = m_target_stat.find(target); + if (itarg != m_target_stat.end() + && itarg->second.packages > 0) + { + itarg->second.packages -= 1; + } + } +} + +void yf::LoadBalance::Impl::add_session(unsigned long session_id, + std::string target) +{ + // finding and adding session + std::map::iterator isess; + isess = m_session_target.find(session_id); + if (isess == m_session_target.end()) + { + m_session_target.insert(std::make_pair(session_id, target)); + } + + // finding and adding target statistics + std::map::iterator itarg; + itarg = m_target_stat.find(target); + if (itarg == m_target_stat.end()) + { + TargetStat stat; + stat.sessions = 1; + stat.packages = 0; + stat.deads = 0; + m_target_stat.insert(std::make_pair(target, stat)); + } + else if (itarg->second.sessions < std::numeric_limits::max()) + { + itarg->second.sessions += 1; + } +} + +void yf::LoadBalance::Impl::remove_session(unsigned long session_id) +{ + std::string target; - //boost::posix_time::time_duration duration = send_time - receive_time; + // finding session + std::map::iterator isess; + isess = m_session_target.find(session_id); + if (isess == m_session_target.end()) + return; + else + target = isess->second; + + // finding target statistics + std::map::iterator itarg; + itarg = m_target_stat.find(target); + if (itarg == m_target_stat.end()) + { + m_session_target.erase(isess); + return; + } + + // counting session down + if (itarg->second.sessions > 0) + itarg->second.sessions -= 1; + + if (itarg->second.sessions == 0 && itarg->second.deads == 0) + { + m_target_stat.erase(itarg); + m_session_target.erase(isess); + } +} + +std::string yf::LoadBalance::Impl::find_session_target(unsigned long session_id) +{ + std::string target; + std::map::iterator isess; + isess = m_session_target.find(session_id); + if (isess != m_session_target.end()) + target = isess->second; + return target; +} - // { // scope for locking local target database - // boost::mutex::scoped_lock scoped_lock(m_mutex); - // target = *vhosts.begin(); - // } +// cost functions +unsigned int yf::LoadBalance::Impl::cost(std::string target) +{ + unsigned int cost = 0; + if (target.size() != 0) + { + std::map::iterator itarg; + itarg = m_target_stat.find(target); + if (itarg != m_target_stat.end()) + cost = itarg->second.cost(); + } + return cost; +} +unsigned int yf::LoadBalance::Impl::dead(std::string target) +{ + unsigned int dead = 0; + + if (target.size() != 0) + { + std::map::iterator itarg; + itarg = m_target_stat.find(target); + if (itarg != m_target_stat.end()) + dead = itarg->second.deads; + } + return dead; } @@ -165,8 +442,9 @@ extern "C" { /* * Local variables: * c-basic-offset: 4 + * c-file-style: "Stroustrup" * indent-tabs-mode: nil - * c-file-style: "stroustrup" * End: * vim: shiftwidth=4 tabstop=8 expandtab */ +