Fix load_balance: does not retry dead target MP-626
[metaproxy-moved-to-github.git] / src / filter_load_balance.cpp
1 /* This file is part of Metaproxy.
2    Copyright (C) Index Data
3
4 Metaproxy is free software; you can redistribute it and/or modify it under
5 the terms of the GNU General Public License as published by the Free
6 Software Foundation; either version 2, or (at your option) any later
7 version.
8
9 Metaproxy is distributed in the hope that it will be useful, but WITHOUT ANY
10 WARRANTY; without even the implied warranty of MERCHANTABILITY or
11 FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
12 for more details.
13
14 You should have received a copy of the GNU General Public License
15 along with this program; if not, write to the Free Software
16 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
17 */
18
19 #include "config.hpp"
20 #include <metaproxy/package.hpp>
21 #include <metaproxy/filter.hpp>
22 #include "filter_load_balance.hpp"
23 #include <metaproxy/util.hpp>
24
25
26 #include <boost/thread/mutex.hpp>
27
28 #include <yaz/diagbib1.h>
29 #include <yaz/log.h>
30 #include <yaz/zgdu.h>
31
32 // remove max macro if already defined (defined later in <limits>)
33 #ifdef max
34 #undef max
35 #endif
36
37 #include <list>
38 #include <map>
39 #include <limits>
40
41 namespace mp = metaproxy_1;
42 namespace yf = mp::filter;
43
44 namespace metaproxy_1
45 {
46     namespace filter
47     {
48         class LoadBalance::Impl
49         {
50         public:
51             Impl();
52             ~Impl();
53             void process(metaproxy_1::Package & package);
54             void configure(const xmlNode * ptr);
55         private:
56             // statistic manipulating functions,
57             void add_dead(unsigned long session_id);
58             //void clear_dead(unsigned long session_id);
59             void add_package(unsigned long session_id);
60             void remove_package(unsigned long session_id);
61             void add_session(unsigned long session_id, std::string target);
62             void remove_session(unsigned long session_id);
63             std::string find_session_target(unsigned long session_id);
64
65             // cost functions
66             unsigned int cost(std::string target);
67             unsigned int dead(std::string target);
68
69             // local classes
70             class TargetStat {
71             public:
72                 unsigned int sessions;
73                 unsigned int packages;
74                 unsigned int deads;
75                 unsigned int cost() {
76                     unsigned int c = sessions + packages + deads;
77                     return c;
78                 }
79             };
80
81             // local protected databases
82             boost::mutex m_mutex;
83             std::map<std::string, TargetStat> m_target_stat;
84             std::map<unsigned long, std::string> m_session_target;
85         };
86     }
87 }
88
89 // define Pimpl wrapper forwarding to Impl
90
91 yf::LoadBalance::LoadBalance() : m_p(new Impl)
92 {
93 }
94
95 yf::LoadBalance::~LoadBalance()
96 {  // must have a destructor because of boost::scoped_ptr
97 }
98
99 void yf::LoadBalance::configure(const xmlNode *xmlnode, bool test_only,
100                                 const char *path)
101 {
102     m_p->configure(xmlnode);
103 }
104
105 void yf::LoadBalance::process(mp::Package &package) const
106 {
107     m_p->process(package);
108 }
109
110
111 yf::LoadBalance::Impl::Impl()
112 {
113 }
114
115 yf::LoadBalance::Impl::~Impl()
116 {
117 }
118
119 void yf::LoadBalance::Impl::configure(const xmlNode *xmlnode)
120 {
121 }
122
123 void yf::LoadBalance::Impl::process(mp::Package &package)
124 {
125     bool is_closed_front = false;
126
127     // checking for closed front end packages
128     if (package.session().is_closed())
129     {
130         is_closed_front = true;
131     }
132
133     Z_GDU *gdu_req = package.request().get();
134
135     // passing anything but z3950 packages
136     if (gdu_req && gdu_req->which == Z_GDU_Z3950)
137     {
138         // target selecting only on Z39.50 init request
139         if (gdu_req->u.z3950->which == Z_APDU_initRequest)
140         {
141             yazpp_1::GDU base_req(gdu_req);
142             Z_APDU *apdu = base_req.get()->u.z3950;
143
144             Z_InitRequest *org_init = base_req.get()->u.z3950->u.initRequest;
145             mp::odr odr_en(ODR_ENCODE);
146
147             std::list<std::string> vhosts;
148             mp::util::remove_vhost_otherinfo(&(org_init->otherInfo), vhosts);
149             // get lowest of all vhosts.. Remove them if individually if
150             // they turn out to be bad..
151             while (1)
152             {
153                 std::list<std::string>::iterator ivh = vhosts.begin();
154                 std::list<std::string>::iterator ivh_pick = vhosts.end();
155
156                 Package init_pkg(package.session(), package.origin());
157                 init_pkg.copy_filter(package);
158
159                 unsigned int cost = std::numeric_limits<unsigned int>::max();
160                 {
161                     boost::mutex::scoped_lock scoped_lock(m_mutex);
162
163                     for (; ivh != vhosts.end(); ivh++)
164                     {
165                         if ((*ivh).size() != 0)
166                         {
167                             unsigned int vhcost
168                                 = yf::LoadBalance::Impl::cost(*ivh);
169                             yaz_log(YLOG_LOG, "Consider %s cost=%u vhcost=%u",
170                                     (*ivh).c_str(), cost, vhcost);
171                             if (cost > vhcost)
172                             {
173                                 ivh_pick = ivh;
174                                 cost = vhcost;
175                             }
176                         }
177                     }
178                 }
179                 if (ivh_pick == vhosts.end())
180                     break;
181                 std::string target = *ivh_pick;
182                 vhosts.erase(ivh_pick);
183                 // copying new target into init package
184                 yazpp_1::GDU init_gdu(base_req);
185                 Z_InitRequest *init_req = init_gdu.get()->u.z3950->u.initRequest;
186
187                 mp::util::set_vhost_otherinfo(&(init_req->otherInfo),
188                                               odr_en, target, 1);
189
190                 init_pkg.request() = init_gdu;
191
192                 // moving all package types
193                 init_pkg.move();
194
195                 // checking for closed back end packages
196                 if (!init_pkg.session().is_closed())
197                 {
198                     add_session(package.session().id(), target);
199
200                     package.response() = init_pkg.response();
201                     return;
202                 }
203             }
204             mp::odr odr;
205             package.response() = odr.create_initResponse(
206                 apdu, YAZ_BIB1_TEMPORARY_SYSTEM_ERROR,
207                 "load_balance: no available targets");
208             package.session().close();
209             return;
210         }
211         // frontend Z39.50 close request is added to statistics and marked
212         else if (gdu_req->u.z3950->which == Z_APDU_close)
213         {
214             is_closed_front = true;
215             boost::mutex::scoped_lock scoped_lock(m_mutex);
216             add_package(package.session().id());
217         }
218         // any other Z39.50 package is added to statistics
219         else
220         {
221             boost::mutex::scoped_lock scoped_lock(m_mutex);
222             add_package(package.session().id());
223         }
224     }
225
226     // moving all package types
227     package.move();
228
229     bool is_closed_back = false;
230
231     // checking for closed back end packages
232     if (package.session().is_closed())
233         is_closed_back = true;
234
235     Z_GDU *gdu_res = package.response().get();
236
237     // passing anything but z3950 packages
238     if (gdu_res && gdu_res->which == Z_GDU_Z3950)
239     {
240         // session closing only on Z39.50 close response
241         if (gdu_res->u.z3950->which == Z_APDU_close)
242         {
243             is_closed_back = true;
244             boost::mutex::scoped_lock scoped_lock(m_mutex);
245             remove_package(package.session().id());
246         }
247         // any other Z39.50 package is removed from statistics
248         else
249         {
250             boost::mutex::scoped_lock scoped_lock(m_mutex);
251             remove_package(package.session().id());
252         }
253     }
254
255     // finally removing sessions and marking deads
256     if (is_closed_back || is_closed_front)
257     {
258         boost::mutex::scoped_lock scoped_lock(m_mutex);
259
260         // marking backend dead if backend closed without fronted close
261         if (is_closed_front == false)
262             add_dead(package.session().id());
263
264         remove_session(package.session().id());
265
266         // making sure that package is closed
267         package.session().close();
268     }
269 }
270
271 // statistic manipulating functions,
272 void yf::LoadBalance::Impl::add_dead(unsigned long session_id)
273 {
274     std::string target = find_session_target(session_id);
275
276     if (target.size() != 0)
277     {
278         std::map<std::string, TargetStat>::iterator itarg;
279         itarg = m_target_stat.find(target);
280         if (itarg != m_target_stat.end()
281             && itarg->second.deads < std::numeric_limits<unsigned int>::max())
282         {
283             itarg->second.deads += 1;
284             // std:.cout << "add_dead " << session_id << " " << target
285             //          << " d:" << itarg->second.deads << "\n";
286         }
287     }
288 }
289
290 void yf::LoadBalance::Impl::add_package(unsigned long session_id)
291 {
292     std::string target = find_session_target(session_id);
293
294     if (target.size() != 0)
295     {
296         std::map<std::string, TargetStat>::iterator itarg;
297         itarg = m_target_stat.find(target);
298         if (itarg != m_target_stat.end()
299             && itarg->second.packages
300                < std::numeric_limits<unsigned int>::max())
301         {
302             itarg->second.packages += 1;
303         }
304     }
305 }
306
307 void yf::LoadBalance::Impl::remove_package(unsigned long session_id)
308 {
309     std::string target = find_session_target(session_id);
310
311     if (target.size() != 0)
312     {
313         std::map<std::string, TargetStat>::iterator itarg;
314         itarg = m_target_stat.find(target);
315         if (itarg != m_target_stat.end()
316             && itarg->second.packages > 0)
317         {
318             itarg->second.packages -= 1;
319         }
320     }
321 }
322
323 void yf::LoadBalance::Impl::add_session(unsigned long session_id,
324                                         std::string target)
325 {
326     // finding and adding session
327     std::map<unsigned long, std::string>::iterator isess;
328     isess = m_session_target.find(session_id);
329     if (isess == m_session_target.end())
330     {
331         m_session_target.insert(std::make_pair(session_id, target));
332     }
333
334     // finding and adding target statistics
335     std::map<std::string, TargetStat>::iterator itarg;
336     itarg = m_target_stat.find(target);
337     if (itarg == m_target_stat.end())
338     {
339         TargetStat stat;
340         stat.sessions = 1;
341         stat.packages = 0;
342         stat.deads = 0;
343         m_target_stat.insert(std::make_pair(target, stat));
344     }
345     else if (itarg->second.sessions < std::numeric_limits<unsigned int>::max())
346     {
347         itarg->second.sessions += 1;
348     }
349 }
350
351 void yf::LoadBalance::Impl::remove_session(unsigned long session_id)
352 {
353     std::string target;
354
355     // finding session
356     std::map<unsigned long, std::string>::iterator isess;
357     isess = m_session_target.find(session_id);
358     if (isess == m_session_target.end())
359         return;
360     else
361         target = isess->second;
362
363     // finding target statistics
364     std::map<std::string, TargetStat>::iterator itarg;
365     itarg = m_target_stat.find(target);
366     if (itarg == m_target_stat.end())
367     {
368         m_session_target.erase(isess);
369         return;
370     }
371
372     // counting session down
373     if (itarg->second.sessions > 0)
374         itarg->second.sessions -= 1;
375
376     if (itarg->second.sessions == 0 && itarg->second.deads == 0)
377     {
378         m_target_stat.erase(itarg);
379         m_session_target.erase(isess);
380     }
381 }
382
383 std::string yf::LoadBalance::Impl::find_session_target(unsigned long session_id)
384 {
385     std::string target;
386     std::map<unsigned long, std::string>::iterator isess;
387     isess = m_session_target.find(session_id);
388     if (isess != m_session_target.end())
389         target = isess->second;
390     return target;
391 }
392
393
394 // cost functions
395 unsigned int yf::LoadBalance::Impl::cost(std::string target)
396 {
397     unsigned int cost = 0;
398
399     if (target.size() != 0)
400     {
401         std::map<std::string, TargetStat>::iterator itarg;
402         itarg = m_target_stat.find(target);
403         if (itarg != m_target_stat.end())
404             cost = itarg->second.cost();
405     }
406     return cost;
407 }
408
409 unsigned int yf::LoadBalance::Impl::dead(std::string target)
410 {
411     unsigned int dead = 0;
412
413     if (target.size() != 0)
414     {
415         std::map<std::string, TargetStat>::iterator itarg;
416         itarg = m_target_stat.find(target);
417         if (itarg != m_target_stat.end())
418             dead = itarg->second.deads;
419     }
420     return dead;
421 }
422
423
424 static mp::filter::Base* filter_creator()
425 {
426     return new mp::filter::LoadBalance;
427 }
428
429 extern "C" {
430     struct metaproxy_1_filter_struct metaproxy_1_filter_load_balance = {
431         0,
432         "load_balance",
433         filter_creator
434     };
435 }
436
437
438 /*
439  * Local variables:
440  * c-basic-offset: 4
441  * c-file-style: "Stroustrup"
442  * indent-tabs-mode: nil
443  * End:
444  * vim: shiftwidth=4 tabstop=8 expandtab
445  */
446