X-Git-Url: http://git.indexdata.com/?a=blobdiff_plain;f=src%2Ffilter_load_balance.cpp;h=1ad761c5f4c4ce4d4c73b7ee19757bc5474b6304;hb=HEAD;hp=a534f601587be9a6e6e0d14f1495c056b7e7e965;hpb=665559cbc22546e8df69be33a7d492294cab9fb1;p=metaproxy-moved-to-github.git diff --git a/src/filter_load_balance.cpp b/src/filter_load_balance.cpp index a534f60..1ad761c 100644 --- a/src/filter_load_balance.cpp +++ b/src/filter_load_balance.cpp @@ -1,5 +1,5 @@ /* This file is part of Metaproxy. - Copyright (C) 2005-2008 Index Data + Copyright (C) Index Data Metaproxy is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free @@ -17,15 +17,16 @@ Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */ #include "config.hpp" -#include "session.hpp" -#include "package.hpp" -#include "filter.hpp" +#include +#include #include "filter_load_balance.hpp" -#include "util.hpp" +#include #include +#include +#include #include // remove max macro if already defined (defined later in ) @@ -33,7 +34,6 @@ Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA #undef max #endif -//#include #include #include #include @@ -41,16 +41,19 @@ Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA namespace mp = metaproxy_1; namespace yf = mp::filter; -namespace metaproxy_1 { - namespace filter { - class LoadBalance::Impl { +namespace metaproxy_1 +{ + namespace filter + { + class LoadBalance::Impl + { public: Impl(); ~Impl(); void process(metaproxy_1::Package & package); void configure(const xmlNode * ptr); private: - // statistic manipulating functions, + // statistic manipulating functions, void add_dead(unsigned long session_id); //void clear_dead(unsigned long session_id); void add_package(unsigned long session_id); @@ -71,11 +74,6 @@ namespace metaproxy_1 { unsigned int deads; unsigned int cost() { unsigned int c = sessions + packages + deads; - //std::cout << "stats c:" << c - // << " s:" << sessions - // << " p:" << packages - // << " d:" << deads - // <<"\n"; return c; } }; @@ -89,7 +87,7 @@ namespace metaproxy_1 { } // define Pimpl wrapper forwarding to Impl - + yf::LoadBalance::LoadBalance() : m_p(new Impl) { } @@ -98,7 +96,8 @@ yf::LoadBalance::~LoadBalance() { // must have a destructor because of boost::scoped_ptr } -void yf::LoadBalance::configure(const xmlNode *xmlnode, bool test_only) +void yf::LoadBalance::configure(const xmlNode *xmlnode, bool test_only, + const char *path) { m_p->configure(xmlnode); } @@ -109,16 +108,12 @@ void yf::LoadBalance::process(mp::Package &package) const } -// define Implementation stuff - - - yf::LoadBalance::Impl::Impl() { } yf::LoadBalance::Impl::~Impl() -{ +{ } void yf::LoadBalance::Impl::configure(const xmlNode *xmlnode) @@ -127,85 +122,111 @@ void yf::LoadBalance::Impl::configure(const xmlNode *xmlnode) void yf::LoadBalance::Impl::process(mp::Package &package) { - bool is_closed_front = false; - bool is_closed_back = false; // checking for closed front end packages - if (package.session().is_closed()){ + if (package.session().is_closed()) + { is_closed_front = true; - } + } Z_GDU *gdu_req = package.request().get(); // passing anything but z3950 packages - if (gdu_req && gdu_req->which == Z_GDU_Z3950){ - + if (gdu_req && gdu_req->which == Z_GDU_Z3950) + { // target selecting only on Z39.50 init request - if (gdu_req->u.z3950->which == Z_APDU_initRequest){ + if (gdu_req->u.z3950->which == Z_APDU_initRequest) + { + yazpp_1::GDU base_req(gdu_req); + Z_APDU *apdu = base_req.get()->u.z3950; + Z_InitRequest *org_init = base_req.get()->u.z3950->u.initRequest; mp::odr odr_en(ODR_ENCODE); - Z_InitRequest *org_init = gdu_req->u.z3950->u.initRequest; - // extracting virtual hosts std::list vhosts; - mp::util::remove_vhost_otherinfo(&(org_init->otherInfo), vhosts); - - // choosing one target according to load-balancing algorithm - - if (vhosts.size()){ - std::string target; + // get lowest of all vhosts.. Remove them if individually if + // they turn out to be bad.. + while (1) + { + std::list::iterator ivh = vhosts.begin(); + std::list::iterator ivh_pick = vhosts.end(); + + Package init_pkg(package.session(), package.origin()); + init_pkg.copy_filter(package); + unsigned int cost = std::numeric_limits::max(); - - { //locking scope for local databases + { boost::mutex::scoped_lock scoped_lock(m_mutex); - - // load-balancing algorithm goes here - //target = *vhosts.begin(); - for(std::list::const_iterator ivh - = vhosts.begin(); - ivh != vhosts.end(); - ivh++){ - if ((*ivh).size() != 0){ - unsigned int vhcost + + for (; ivh != vhosts.end(); ivh++) + { + if ((*ivh).size() != 0) + { + unsigned int vhcost = yf::LoadBalance::Impl::cost(*ivh); - if (cost > vhcost){ + yaz_log(YLOG_LOG, "Consider %s cost=%u vhcost=%u", + (*ivh).c_str(), cost, vhcost); + if (cost > vhcost) + { + ivh_pick = ivh; cost = vhcost; - target = *ivh; } } - } - - // updating local database - add_session(package.session().id(), target); - yf::LoadBalance::Impl::cost(target); - add_package(package.session().id()); + } } - + if (ivh_pick == vhosts.end()) + break; + std::string target = *ivh_pick; + vhosts.erase(ivh_pick); // copying new target into init package - mp::util::set_vhost_otherinfo(&(org_init->otherInfo), + yazpp_1::GDU init_gdu(base_req); + Z_InitRequest *init_req = init_gdu.get()->u.z3950->u.initRequest; + + mp::util::set_vhost_otherinfo(&(init_req->otherInfo), odr_en, target, 1); - package.request() = gdu_req; - } - + + init_pkg.request() = init_gdu; + + // moving all package types + init_pkg.move(); + + // checking for closed back end packages + if (!init_pkg.session().is_closed()) + { + add_session(package.session().id(), target); + + package.response() = init_pkg.response(); + return; + } + } + mp::odr odr; + package.response() = odr.create_initResponse( + apdu, YAZ_BIB1_TEMPORARY_SYSTEM_ERROR, + "load_balance: no available targets"); + package.session().close(); + return; } // frontend Z39.50 close request is added to statistics and marked - else if (gdu_req->u.z3950->which == Z_APDU_close){ + else if (gdu_req->u.z3950->which == Z_APDU_close) + { is_closed_front = true; - boost::mutex::scoped_lock scoped_lock(m_mutex); + boost::mutex::scoped_lock scoped_lock(m_mutex); add_package(package.session().id()); - } + } // any other Z39.50 package is added to statistics - else { - boost::mutex::scoped_lock scoped_lock(m_mutex); + else + { + boost::mutex::scoped_lock scoped_lock(m_mutex); add_package(package.session().id()); } } - - // moving all package types + + // moving all package types package.move(); + bool is_closed_back = false; // checking for closed back end packages if (package.session().is_closed()) @@ -214,26 +235,29 @@ void yf::LoadBalance::Impl::process(mp::Package &package) Z_GDU *gdu_res = package.response().get(); // passing anything but z3950 packages - if (gdu_res && gdu_res->which == Z_GDU_Z3950){ - + if (gdu_res && gdu_res->which == Z_GDU_Z3950) + { // session closing only on Z39.50 close response - if (gdu_res->u.z3950->which == Z_APDU_close){ + if (gdu_res->u.z3950->which == Z_APDU_close) + { is_closed_back = true; boost::mutex::scoped_lock scoped_lock(m_mutex); remove_package(package.session().id()); - } + } // any other Z39.50 package is removed from statistics - else { + else + { boost::mutex::scoped_lock scoped_lock(m_mutex); remove_package(package.session().id()); } } // finally removing sessions and marking deads - if (is_closed_back || is_closed_front){ + if (is_closed_back || is_closed_front) + { boost::mutex::scoped_lock scoped_lock(m_mutex); - // marking backend dead if backend closed without fronted close + // marking backend dead if backend closed without fronted close if (is_closed_front == false) add_dead(package.session().id()); @@ -243,93 +267,89 @@ void yf::LoadBalance::Impl::process(mp::Package &package) package.session().close(); } } - -// statistic manipulating functions, -void yf::LoadBalance::Impl::add_dead(unsigned long session_id){ - +// statistic manipulating functions, +void yf::LoadBalance::Impl::add_dead(unsigned long session_id) +{ std::string target = find_session_target(session_id); - if (target.size() != 0){ - std::map::iterator itarg; + if (target.size() != 0) + { + std::map::iterator itarg; itarg = m_target_stat.find(target); if (itarg != m_target_stat.end() - && itarg->second.deads < std::numeric_limits::max()){ + && itarg->second.deads < std::numeric_limits::max()) + { itarg->second.deads += 1; - // std:.cout << "add_dead " << session_id << " " << target + // std:.cout << "add_dead " << session_id << " " << target // << " d:" << itarg->second.deads << "\n"; } } -}; - -//void yf::LoadBalance::Impl::clear_dead(unsigned long session_id){ -// std::cout << "clear_dead " << session_id << "\n"; -//}; - -void yf::LoadBalance::Impl::add_package(unsigned long session_id){ +} +void yf::LoadBalance::Impl::add_package(unsigned long session_id) +{ std::string target = find_session_target(session_id); - if (target.size() != 0){ - std::map::iterator itarg; + if (target.size() != 0) + { + std::map::iterator itarg; itarg = m_target_stat.find(target); if (itarg != m_target_stat.end() - && itarg->second.packages - < std::numeric_limits::max()){ + && itarg->second.packages + < std::numeric_limits::max()) + { itarg->second.packages += 1; - // std:.cout << "add_package " << session_id << " " << target - // << " p:" << itarg->second.packages << "\n"; } } -}; +} -void yf::LoadBalance::Impl::remove_package(unsigned long session_id){ +void yf::LoadBalance::Impl::remove_package(unsigned long session_id) +{ std::string target = find_session_target(session_id); - if (target.size() != 0){ - std::map::iterator itarg; + if (target.size() != 0) + { + std::map::iterator itarg; itarg = m_target_stat.find(target); if (itarg != m_target_stat.end() - && itarg->second.packages > 0){ + && itarg->second.packages > 0) + { itarg->second.packages -= 1; - // std:.cout << "remove_package " << session_id << " " << target - // << " p:" << itarg->second.packages << "\n"; } } -}; - -void yf::LoadBalance::Impl::add_session(unsigned long session_id, - std::string target){ +} +void yf::LoadBalance::Impl::add_session(unsigned long session_id, + std::string target) +{ // finding and adding session std::map::iterator isess; isess = m_session_target.find(session_id); - if (isess == m_session_target.end()){ + if (isess == m_session_target.end()) + { m_session_target.insert(std::make_pair(session_id, target)); } // finding and adding target statistics std::map::iterator itarg; itarg = m_target_stat.find(target); - if (itarg == m_target_stat.end()){ + if (itarg == m_target_stat.end()) + { TargetStat stat; stat.sessions = 1; - stat.packages = 0; // no idea why the defaut constructor TargetStat() - stat.deads = 0; // is not initializig this correctly to zero ?? - m_target_stat.insert(std::make_pair(target, stat)); - // std:.cout << "add_session " << session_id << " " << target - // << " s:1\n"; - } + stat.packages = 0; + stat.deads = 0; + m_target_stat.insert(std::make_pair(target, stat)); + } else if (itarg->second.sessions < std::numeric_limits::max()) { itarg->second.sessions += 1; - // std:.cout << "add_session " << session_id << " " << target - // << " s:" << itarg->second.sessions << "\n"; } -}; - -void yf::LoadBalance::Impl::remove_session(unsigned long session_id){ +} +void yf::LoadBalance::Impl::remove_session(unsigned long session_id) +{ std::string target; // finding session @@ -343,28 +363,25 @@ void yf::LoadBalance::Impl::remove_session(unsigned long session_id){ // finding target statistics std::map::iterator itarg; itarg = m_target_stat.find(target); - if (itarg == m_target_stat.end()){ + if (itarg == m_target_stat.end()) + { m_session_target.erase(isess); return; } - + // counting session down if (itarg->second.sessions > 0) itarg->second.sessions -= 1; - // std:.cout << "remove_session " << session_id << " " << target - // << " s:" << itarg->second.sessions << "\n"; - - // clearing empty sessions and targets - if (itarg->second.sessions == 0 && itarg->second.deads == 0 ){ + if (itarg->second.sessions == 0 && itarg->second.deads == 0) + { m_target_stat.erase(itarg); m_session_target.erase(isess); } -}; - -std::string -yf::LoadBalance::Impl::find_session_target(unsigned long session_id){ +} +std::string yf::LoadBalance::Impl::find_session_target(unsigned long session_id) +{ std::string target; std::map::iterator isess; isess = m_session_target.find(session_id); @@ -375,39 +392,33 @@ yf::LoadBalance::Impl::find_session_target(unsigned long session_id){ // cost functions -unsigned int yf::LoadBalance::Impl::cost(std::string target){ - - unsigned int cost; +unsigned int yf::LoadBalance::Impl::cost(std::string target) +{ + unsigned int cost = 0; - if (target.size() != 0){ - std::map::iterator itarg; + if (target.size() != 0) + { + std::map::iterator itarg; itarg = m_target_stat.find(target); - if (itarg != m_target_stat.end()){ + if (itarg != m_target_stat.end()) cost = itarg->second.cost(); - } } - - //std::cout << "cost " << target << " c:" << cost << "\n"; return cost; -}; - -unsigned int yf::LoadBalance::Impl::dead(std::string target){ +} - unsigned int dead; +unsigned int yf::LoadBalance::Impl::dead(std::string target) +{ + unsigned int dead = 0; - if (target.size() != 0){ - std::map::iterator itarg; + if (target.size() != 0) + { + std::map::iterator itarg; itarg = m_target_stat.find(target); - if (itarg != m_target_stat.end()){ + if (itarg != m_target_stat.end()) dead = itarg->second.deads; - } } - - //std::cout << "dead " << target << " d:" << dead << "\n"; return dead; -}; - - +} static mp::filter::Base* filter_creator()