-/* $Id: filter_load_balance.cpp,v 1.5 2007-01-04 13:22:56 marc Exp $
- Copyright (c) 2005-2006, Index Data.
+/* This file is part of Metaproxy.
+ Copyright (C) Index Data
- See the LICENSE file for details
- */
+Metaproxy is free software; you can redistribute it and/or modify it under
+the terms of the GNU General Public License as published by the Free
+Software Foundation; either version 2, or (at your option) any later
+version.
+
+Metaproxy is distributed in the hope that it will be useful, but WITHOUT ANY
+WARRANTY; without even the implied warranty of MERCHANTABILITY or
+FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
+for more details.
+
+You should have received a copy of the GNU General Public License
+along with this program; if not, write to the Free Software
+Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
+*/
#include "config.hpp"
-#include "session.hpp"
-#include "package.hpp"
-#include "filter.hpp"
+#include <metaproxy/package.hpp>
+#include <metaproxy/filter.hpp>
#include "filter_load_balance.hpp"
-#include "util.hpp"
+#include <metaproxy/util.hpp>
+
#include <boost/thread/mutex.hpp>
-#include <boost/date_time/posix_time/posix_time.hpp>
+#include <yaz/diagbib1.h>
+#include <yaz/log.h>
#include <yaz/zgdu.h>
-//#include <iostream>
+// remove max macro if already defined (defined later in <limits>)
+#ifdef max
+#undef max
+#endif
+
#include <list>
#include <map>
#include <limits>
namespace mp = metaproxy_1;
namespace yf = mp::filter;
-namespace metaproxy_1 {
- namespace filter {
- class LoadBalance::Impl {
+namespace metaproxy_1
+{
+ namespace filter
+ {
+ class LoadBalance::Impl
+ {
public:
Impl();
~Impl();
void process(metaproxy_1::Package & package);
void configure(const xmlNode * ptr);
private:
- // statistic manipulating functions,
+ // statistic manipulating functions,
void add_dead(unsigned long session_id);
//void clear_dead(unsigned long session_id);
void add_package(unsigned long session_id);
unsigned int deads;
unsigned int cost() {
unsigned int c = sessions + packages + deads;
- //std::cout << "stats c:" << c
- // << " s:" << sessions
- // << " p:" << packages
- // << " d:" << deads
- // <<"\n";
return c;
}
};
}
// define Pimpl wrapper forwarding to Impl
-
+
yf::LoadBalance::LoadBalance() : m_p(new Impl)
{
}
{ // must have a destructor because of boost::scoped_ptr
}
-void yf::LoadBalance::configure(const xmlNode *xmlnode)
+void yf::LoadBalance::configure(const xmlNode *xmlnode, bool test_only,
+ const char *path)
{
m_p->configure(xmlnode);
}
}
-// define Implementation stuff
-
-
-
yf::LoadBalance::Impl::Impl()
{
}
yf::LoadBalance::Impl::~Impl()
-{
+{
}
void yf::LoadBalance::Impl::configure(const xmlNode *xmlnode)
void yf::LoadBalance::Impl::process(mp::Package &package)
{
-
bool is_closed_front = false;
- bool is_closed_back = false;
// checking for closed front end packages
- if (package.session().is_closed()){
+ if (package.session().is_closed())
+ {
is_closed_front = true;
- }
+ }
Z_GDU *gdu_req = package.request().get();
// passing anything but z3950 packages
- if (gdu_req && gdu_req->which == Z_GDU_Z3950){
-
+ if (gdu_req && gdu_req->which == Z_GDU_Z3950)
+ {
// target selecting only on Z39.50 init request
- if (gdu_req->u.z3950->which == Z_APDU_initRequest){
+ if (gdu_req->u.z3950->which == Z_APDU_initRequest)
+ {
+ yazpp_1::GDU base_req(gdu_req);
+ Z_APDU *apdu = base_req.get()->u.z3950;
+ Z_InitRequest *org_init = base_req.get()->u.z3950->u.initRequest;
mp::odr odr_en(ODR_ENCODE);
- Z_InitRequest *org_init = gdu_req->u.z3950->u.initRequest;
- // extracting virtual hosts
std::list<std::string> vhosts;
-
mp::util::remove_vhost_otherinfo(&(org_init->otherInfo), vhosts);
-
- // choosing one target according to load-balancing algorithm
-
- if (vhosts.size()){
+ // get lowest of all vhosts.. Remove them if individually if
+ // they turn out to be bad..
+ while (1)
+ {
std::string target;
+ std::list<std::string>::iterator ivh = vhosts.begin();
+
+ Package init_pkg(package.session(), package.origin());
+ init_pkg.copy_filter(package);
+
unsigned int cost = std::numeric_limits<unsigned int>::max();
-
- { //locking scope for local databases
+ {
boost::mutex::scoped_lock scoped_lock(m_mutex);
-
- // load-balancing algorithm goes here
- //target = *vhosts.begin();
- for(std::list<std::string>::const_iterator ivh
- = vhosts.begin();
- ivh != vhosts.end();
- ivh++){
- if ((*ivh).size() != 0){
- unsigned int vhcost
+
+ for (; ivh != vhosts.end(); )
+ {
+ if ((*ivh).size() != 0)
+ {
+ unsigned int vhcost
= yf::LoadBalance::Impl::cost(*ivh);
- if (cost > vhcost){
+ yaz_log(YLOG_LOG, "Consider %s cost=%u vhcost=%u",
+ (*ivh).c_str(), cost, vhcost);
+ if (cost > vhcost)
+ {
cost = vhcost;
target = *ivh;
+ ivh = vhosts.erase(ivh);
}
+ else
+ ivh++;
}
- }
+ else
+ ivh++;
+ }
+ }
+ if (target.length() == 0)
+ break;
+ // copying new target into init package
+
+ yazpp_1::GDU init_gdu(base_req);
+ Z_InitRequest *init_req = init_gdu.get()->u.z3950->u.initRequest;
+
+ mp::util::set_vhost_otherinfo(&(init_req->otherInfo),
+ odr_en, target, 1);
- // updating local database
+ init_pkg.request() = init_gdu;
+
+ // moving all package types
+ init_pkg.move();
+
+ // checking for closed back end packages
+ if (!init_pkg.session().is_closed())
+ {
add_session(package.session().id(), target);
- yf::LoadBalance::Impl::cost(target);
- add_package(package.session().id());
+
+ package.response() = init_pkg.response();
+ return;
}
-
- // copying new target into init package
- mp::util::set_vhost_otherinfo(&(org_init->otherInfo),
- odr_en, target);
- package.request() = gdu_req;
- }
-
+ }
+ mp::odr odr;
+ package.response() = odr.create_initResponse(
+ apdu, YAZ_BIB1_TEMPORARY_SYSTEM_ERROR,
+ "load_balance: no available targets");
+ package.session().close();
+ return;
}
// frontend Z39.50 close request is added to statistics and marked
- else if (gdu_req->u.z3950->which == Z_APDU_close){
+ else if (gdu_req->u.z3950->which == Z_APDU_close)
+ {
is_closed_front = true;
- boost::mutex::scoped_lock scoped_lock(m_mutex);
+ boost::mutex::scoped_lock scoped_lock(m_mutex);
add_package(package.session().id());
- }
+ }
// any other Z39.50 package is added to statistics
- else {
- boost::mutex::scoped_lock scoped_lock(m_mutex);
+ else
+ {
+ boost::mutex::scoped_lock scoped_lock(m_mutex);
add_package(package.session().id());
}
}
-
- // moving all package types
+
+ // moving all package types
package.move();
+ bool is_closed_back = false;
// checking for closed back end packages
if (package.session().is_closed())
Z_GDU *gdu_res = package.response().get();
// passing anything but z3950 packages
- if (gdu_res && gdu_res->which == Z_GDU_Z3950){
-
+ if (gdu_res && gdu_res->which == Z_GDU_Z3950)
+ {
// session closing only on Z39.50 close response
- if (gdu_res->u.z3950->which == Z_APDU_close){
+ if (gdu_res->u.z3950->which == Z_APDU_close)
+ {
is_closed_back = true;
boost::mutex::scoped_lock scoped_lock(m_mutex);
remove_package(package.session().id());
- }
+ }
// any other Z39.50 package is removed from statistics
- else {
+ else
+ {
boost::mutex::scoped_lock scoped_lock(m_mutex);
remove_package(package.session().id());
}
}
// finally removing sessions and marking deads
- if (is_closed_back || is_closed_front){
+ if (is_closed_back || is_closed_front)
+ {
boost::mutex::scoped_lock scoped_lock(m_mutex);
- // marking backend dead if backend closed without fronted close
+ // marking backend dead if backend closed without fronted close
if (is_closed_front == false)
add_dead(package.session().id());
package.session().close();
}
}
-
-// getting timestamp for receiving of package
-//boost::posix_time::ptime receive_time
-// = boost::posix_time::microsec_clock::local_time();
-// //<< receive_time << " "
-// //<< to_iso_string(receive_time) << " "
-//<< to_iso_extended_string(receive_time) << " "
-
-// statistic manipulating functions,
-void yf::LoadBalance::Impl::add_dead(unsigned long session_id){
-
-
+// statistic manipulating functions,
+void yf::LoadBalance::Impl::add_dead(unsigned long session_id)
+{
std::string target = find_session_target(session_id);
- if (target.size() != 0){
- std::map<std::string, TargetStat>::iterator itarg;
+ if (target.size() != 0)
+ {
+ std::map<std::string, TargetStat>::iterator itarg;
itarg = m_target_stat.find(target);
if (itarg != m_target_stat.end()
- && itarg->second.deads < std::numeric_limits<unsigned int>::max()){
+ && itarg->second.deads < std::numeric_limits<unsigned int>::max())
+ {
itarg->second.deads += 1;
- // std:.cout << "add_dead " << session_id << " " << target
+ // std:.cout << "add_dead " << session_id << " " << target
// << " d:" << itarg->second.deads << "\n";
}
}
-};
-
-//void yf::LoadBalance::Impl::clear_dead(unsigned long session_id){
-// std::cout << "clear_dead " << session_id << "\n";
-//};
-
-void yf::LoadBalance::Impl::add_package(unsigned long session_id){
+}
+void yf::LoadBalance::Impl::add_package(unsigned long session_id)
+{
std::string target = find_session_target(session_id);
- if (target.size() != 0){
- std::map<std::string, TargetStat>::iterator itarg;
+ if (target.size() != 0)
+ {
+ std::map<std::string, TargetStat>::iterator itarg;
itarg = m_target_stat.find(target);
if (itarg != m_target_stat.end()
- && itarg->second.packages
- < std::numeric_limits<unsigned int>::max()){
+ && itarg->second.packages
+ < std::numeric_limits<unsigned int>::max())
+ {
itarg->second.packages += 1;
- // std:.cout << "add_package " << session_id << " " << target
- // << " p:" << itarg->second.packages << "\n";
}
}
-};
+}
-void yf::LoadBalance::Impl::remove_package(unsigned long session_id){
+void yf::LoadBalance::Impl::remove_package(unsigned long session_id)
+{
std::string target = find_session_target(session_id);
- if (target.size() != 0){
- std::map<std::string, TargetStat>::iterator itarg;
+ if (target.size() != 0)
+ {
+ std::map<std::string, TargetStat>::iterator itarg;
itarg = m_target_stat.find(target);
if (itarg != m_target_stat.end()
- && itarg->second.packages > 0){
+ && itarg->second.packages > 0)
+ {
itarg->second.packages -= 1;
- // std:.cout << "remove_package " << session_id << " " << target
- // << " p:" << itarg->second.packages << "\n";
}
}
-};
-
-void yf::LoadBalance::Impl::add_session(unsigned long session_id,
- std::string target){
+}
+void yf::LoadBalance::Impl::add_session(unsigned long session_id,
+ std::string target)
+{
// finding and adding session
std::map<unsigned long, std::string>::iterator isess;
isess = m_session_target.find(session_id);
- if (isess == m_session_target.end()){
+ if (isess == m_session_target.end())
+ {
m_session_target.insert(std::make_pair(session_id, target));
}
// finding and adding target statistics
std::map<std::string, TargetStat>::iterator itarg;
itarg = m_target_stat.find(target);
- if (itarg == m_target_stat.end()){
+ if (itarg == m_target_stat.end())
+ {
TargetStat stat;
stat.sessions = 1;
- stat.packages = 0; // no idea why the defaut constructor TargetStat()
- stat.deads = 0; // is not initializig this correctly to zero ??
- m_target_stat.insert(std::make_pair(target, stat));
- // std:.cout << "add_session " << session_id << " " << target
- // << " s:1\n";
- }
+ stat.packages = 0;
+ stat.deads = 0;
+ m_target_stat.insert(std::make_pair(target, stat));
+ }
else if (itarg->second.sessions < std::numeric_limits<unsigned int>::max())
{
itarg->second.sessions += 1;
- // std:.cout << "add_session " << session_id << " " << target
- // << " s:" << itarg->second.sessions << "\n";
}
-};
-
-void yf::LoadBalance::Impl::remove_session(unsigned long session_id){
+}
+void yf::LoadBalance::Impl::remove_session(unsigned long session_id)
+{
std::string target;
// finding session
// finding target statistics
std::map<std::string, TargetStat>::iterator itarg;
itarg = m_target_stat.find(target);
- if (itarg == m_target_stat.end()){
+ if (itarg == m_target_stat.end())
+ {
m_session_target.erase(isess);
return;
}
-
+
// counting session down
if (itarg->second.sessions > 0)
itarg->second.sessions -= 1;
- // std:.cout << "remove_session " << session_id << " " << target
- // << " s:" << itarg->second.sessions << "\n";
-
- // clearing empty sessions and targets
- if (itarg->second.sessions == 0 && itarg->second.deads == 0 ){
+ if (itarg->second.sessions == 0 && itarg->second.deads == 0)
+ {
m_target_stat.erase(itarg);
m_session_target.erase(isess);
}
-};
-
-std::string
-yf::LoadBalance::Impl::find_session_target(unsigned long session_id){
+}
+std::string yf::LoadBalance::Impl::find_session_target(unsigned long session_id)
+{
std::string target;
std::map<unsigned long, std::string>::iterator isess;
isess = m_session_target.find(session_id);
// cost functions
-unsigned int yf::LoadBalance::Impl::cost(std::string target){
-
- unsigned int cost;
+unsigned int yf::LoadBalance::Impl::cost(std::string target)
+{
+ unsigned int cost = 0;
- if (target.size() != 0){
- std::map<std::string, TargetStat>::iterator itarg;
+ if (target.size() != 0)
+ {
+ std::map<std::string, TargetStat>::iterator itarg;
itarg = m_target_stat.find(target);
- if (itarg != m_target_stat.end()){
+ if (itarg != m_target_stat.end())
cost = itarg->second.cost();
- }
}
-
- //std::cout << "cost " << target << " c:" << cost << "\n";
return cost;
-};
-
-unsigned int yf::LoadBalance::Impl::dead(std::string target){
+}
- unsigned int dead;
+unsigned int yf::LoadBalance::Impl::dead(std::string target)
+{
+ unsigned int dead = 0;
- if (target.size() != 0){
- std::map<std::string, TargetStat>::iterator itarg;
+ if (target.size() != 0)
+ {
+ std::map<std::string, TargetStat>::iterator itarg;
itarg = m_target_stat.find(target);
- if (itarg != m_target_stat.end()){
+ if (itarg != m_target_stat.end())
dead = itarg->second.deads;
- }
}
-
- //std::cout << "dead " << target << " d:" << dead << "\n";
return dead;
-};
-
-
+}
static mp::filter::Base* filter_creator()
/*
* Local variables:
* c-basic-offset: 4
+ * c-file-style: "Stroustrup"
* indent-tabs-mode: nil
- * c-file-style: "stroustrup"
* End:
* vim: shiftwidth=4 tabstop=8 expandtab
*/
+