1 /* This file is part of Metaproxy.
2 Copyright (C) Index Data
4 Metaproxy is free software; you can redistribute it and/or modify it under
5 the terms of the GNU General Public License as published by the Free
6 Software Foundation; either version 2, or (at your option) any later
9 Metaproxy is distributed in the hope that it will be useful, but WITHOUT ANY
10 WARRANTY; without even the implied warranty of MERCHANTABILITY or
11 FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
14 You should have received a copy of the GNU General Public License
15 along with this program; if not, write to the Free Software
16 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
20 #include <metaproxy/package.hpp>
21 #include <metaproxy/filter.hpp>
22 #include "filter_load_balance.hpp"
23 #include <metaproxy/util.hpp>
26 #include <boost/thread/mutex.hpp>
28 #include <yaz/diagbib1.h>
32 // remove max macro if already defined (defined later in <limits>)
41 namespace mp = metaproxy_1;
42 namespace yf = mp::filter;
48 class LoadBalance::Impl
53 void process(metaproxy_1::Package & package);
54 void configure(const xmlNode * ptr);
56 // statistic manipulating functions,
57 void add_dead(unsigned long session_id);
58 //void clear_dead(unsigned long session_id);
59 void add_package(unsigned long session_id);
60 void remove_package(unsigned long session_id);
61 void add_session(unsigned long session_id, std::string target);
62 void remove_session(unsigned long session_id);
63 std::string find_session_target(unsigned long session_id);
66 unsigned int cost(std::string target);
67 unsigned int dead(std::string target);
72 unsigned int sessions;
73 unsigned int packages;
76 unsigned int c = sessions + packages + deads;
81 // local protected databases
83 std::map<std::string, TargetStat> m_target_stat;
84 std::map<unsigned long, std::string> m_session_target;
89 // define Pimpl wrapper forwarding to Impl
91 yf::LoadBalance::LoadBalance() : m_p(new Impl)
95 yf::LoadBalance::~LoadBalance()
96 { // must have a destructor because of boost::scoped_ptr
99 void yf::LoadBalance::configure(const xmlNode *xmlnode, bool test_only,
102 m_p->configure(xmlnode);
105 void yf::LoadBalance::process(mp::Package &package) const
107 m_p->process(package);
111 yf::LoadBalance::Impl::Impl()
115 yf::LoadBalance::Impl::~Impl()
119 void yf::LoadBalance::Impl::configure(const xmlNode *xmlnode)
123 void yf::LoadBalance::Impl::process(mp::Package &package)
125 bool is_closed_front = false;
127 // checking for closed front end packages
128 if (package.session().is_closed())
130 is_closed_front = true;
133 Z_GDU *gdu_req = package.request().get();
135 // passing anything but z3950 packages
136 if (gdu_req && gdu_req->which == Z_GDU_Z3950)
138 // target selecting only on Z39.50 init request
139 if (gdu_req->u.z3950->which == Z_APDU_initRequest)
141 yazpp_1::GDU base_req(gdu_req);
142 Z_APDU *apdu = base_req.get()->u.z3950;
144 Z_InitRequest *org_init = base_req.get()->u.z3950->u.initRequest;
145 mp::odr odr_en(ODR_ENCODE);
147 std::list<std::string> vhosts;
148 mp::util::remove_vhost_otherinfo(&(org_init->otherInfo), vhosts);
149 // get lowest of all vhosts.. Remove them if individually if
150 // they turn out to be bad..
153 std::list<std::string>::iterator ivh = vhosts.begin();
154 std::list<std::string>::iterator ivh_pick = vhosts.end();
156 Package init_pkg(package.session(), package.origin());
157 init_pkg.copy_filter(package);
159 unsigned int cost = std::numeric_limits<unsigned int>::max();
161 boost::mutex::scoped_lock scoped_lock(m_mutex);
163 for (; ivh != vhosts.end(); ivh++)
165 if ((*ivh).size() != 0)
168 = yf::LoadBalance::Impl::cost(*ivh);
169 yaz_log(YLOG_LOG, "Consider %s cost=%u vhcost=%u",
170 (*ivh).c_str(), cost, vhcost);
179 if (ivh_pick == vhosts.end())
181 std::string target = *ivh_pick;
182 vhosts.erase(ivh_pick);
183 // copying new target into init package
184 yazpp_1::GDU init_gdu(base_req);
185 Z_InitRequest *init_req = init_gdu.get()->u.z3950->u.initRequest;
187 mp::util::set_vhost_otherinfo(&(init_req->otherInfo),
190 init_pkg.request() = init_gdu;
192 // moving all package types
195 // checking for closed back end packages
196 if (!init_pkg.session().is_closed())
198 add_session(package.session().id(), target);
200 package.response() = init_pkg.response();
205 package.response() = odr.create_initResponse(
206 apdu, YAZ_BIB1_TEMPORARY_SYSTEM_ERROR,
207 "load_balance: no available targets");
208 package.session().close();
211 // frontend Z39.50 close request is added to statistics and marked
212 else if (gdu_req->u.z3950->which == Z_APDU_close)
214 is_closed_front = true;
215 boost::mutex::scoped_lock scoped_lock(m_mutex);
216 add_package(package.session().id());
218 // any other Z39.50 package is added to statistics
221 boost::mutex::scoped_lock scoped_lock(m_mutex);
222 add_package(package.session().id());
226 // moving all package types
229 bool is_closed_back = false;
231 // checking for closed back end packages
232 if (package.session().is_closed())
233 is_closed_back = true;
235 Z_GDU *gdu_res = package.response().get();
237 // passing anything but z3950 packages
238 if (gdu_res && gdu_res->which == Z_GDU_Z3950)
240 // session closing only on Z39.50 close response
241 if (gdu_res->u.z3950->which == Z_APDU_close)
243 is_closed_back = true;
244 boost::mutex::scoped_lock scoped_lock(m_mutex);
245 remove_package(package.session().id());
247 // any other Z39.50 package is removed from statistics
250 boost::mutex::scoped_lock scoped_lock(m_mutex);
251 remove_package(package.session().id());
255 // finally removing sessions and marking deads
256 if (is_closed_back || is_closed_front)
258 boost::mutex::scoped_lock scoped_lock(m_mutex);
260 // marking backend dead if backend closed without fronted close
261 if (is_closed_front == false)
262 add_dead(package.session().id());
264 remove_session(package.session().id());
266 // making sure that package is closed
267 package.session().close();
271 // statistic manipulating functions,
272 void yf::LoadBalance::Impl::add_dead(unsigned long session_id)
274 std::string target = find_session_target(session_id);
276 if (target.size() != 0)
278 std::map<std::string, TargetStat>::iterator itarg;
279 itarg = m_target_stat.find(target);
280 if (itarg != m_target_stat.end()
281 && itarg->second.deads < std::numeric_limits<unsigned int>::max())
283 itarg->second.deads += 1;
284 // std:.cout << "add_dead " << session_id << " " << target
285 // << " d:" << itarg->second.deads << "\n";
290 void yf::LoadBalance::Impl::add_package(unsigned long session_id)
292 std::string target = find_session_target(session_id);
294 if (target.size() != 0)
296 std::map<std::string, TargetStat>::iterator itarg;
297 itarg = m_target_stat.find(target);
298 if (itarg != m_target_stat.end()
299 && itarg->second.packages
300 < std::numeric_limits<unsigned int>::max())
302 itarg->second.packages += 1;
307 void yf::LoadBalance::Impl::remove_package(unsigned long session_id)
309 std::string target = find_session_target(session_id);
311 if (target.size() != 0)
313 std::map<std::string, TargetStat>::iterator itarg;
314 itarg = m_target_stat.find(target);
315 if (itarg != m_target_stat.end()
316 && itarg->second.packages > 0)
318 itarg->second.packages -= 1;
323 void yf::LoadBalance::Impl::add_session(unsigned long session_id,
326 // finding and adding session
327 std::map<unsigned long, std::string>::iterator isess;
328 isess = m_session_target.find(session_id);
329 if (isess == m_session_target.end())
331 m_session_target.insert(std::make_pair(session_id, target));
334 // finding and adding target statistics
335 std::map<std::string, TargetStat>::iterator itarg;
336 itarg = m_target_stat.find(target);
337 if (itarg == m_target_stat.end())
343 m_target_stat.insert(std::make_pair(target, stat));
345 else if (itarg->second.sessions < std::numeric_limits<unsigned int>::max())
347 itarg->second.sessions += 1;
351 void yf::LoadBalance::Impl::remove_session(unsigned long session_id)
356 std::map<unsigned long, std::string>::iterator isess;
357 isess = m_session_target.find(session_id);
358 if (isess == m_session_target.end())
361 target = isess->second;
363 // finding target statistics
364 std::map<std::string, TargetStat>::iterator itarg;
365 itarg = m_target_stat.find(target);
366 if (itarg == m_target_stat.end())
368 m_session_target.erase(isess);
372 // counting session down
373 if (itarg->second.sessions > 0)
374 itarg->second.sessions -= 1;
376 if (itarg->second.sessions == 0 && itarg->second.deads == 0)
378 m_target_stat.erase(itarg);
379 m_session_target.erase(isess);
383 std::string yf::LoadBalance::Impl::find_session_target(unsigned long session_id)
386 std::map<unsigned long, std::string>::iterator isess;
387 isess = m_session_target.find(session_id);
388 if (isess != m_session_target.end())
389 target = isess->second;
395 unsigned int yf::LoadBalance::Impl::cost(std::string target)
397 unsigned int cost = 0;
399 if (target.size() != 0)
401 std::map<std::string, TargetStat>::iterator itarg;
402 itarg = m_target_stat.find(target);
403 if (itarg != m_target_stat.end())
404 cost = itarg->second.cost();
409 unsigned int yf::LoadBalance::Impl::dead(std::string target)
411 unsigned int dead = 0;
413 if (target.size() != 0)
415 std::map<std::string, TargetStat>::iterator itarg;
416 itarg = m_target_stat.find(target);
417 if (itarg != m_target_stat.end())
418 dead = itarg->second.deads;
424 static mp::filter::Base* filter_creator()
426 return new mp::filter::LoadBalance;
430 struct metaproxy_1_filter_struct metaproxy_1_filter_load_balance = {
441 * c-file-style: "Stroustrup"
442 * indent-tabs-mode: nil
444 * vim: shiftwidth=4 tabstop=8 expandtab