dafe8f3a615a675d38029e3675b476fb5ee0802c
[metaproxy-moved-to-github.git] / src / filter_load_balance.cpp
1 /* This file is part of Metaproxy.
2    Copyright (C) Index Data
3
4 Metaproxy is free software; you can redistribute it and/or modify it under
5 the terms of the GNU General Public License as published by the Free
6 Software Foundation; either version 2, or (at your option) any later
7 version.
8
9 Metaproxy is distributed in the hope that it will be useful, but WITHOUT ANY
10 WARRANTY; without even the implied warranty of MERCHANTABILITY or
11 FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
12 for more details.
13
14 You should have received a copy of the GNU General Public License
15 along with this program; if not, write to the Free Software
16 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
17 */
18
19 #include "config.hpp"
20 #include <metaproxy/package.hpp>
21 #include <metaproxy/filter.hpp>
22 #include "filter_load_balance.hpp"
23 #include <metaproxy/util.hpp>
24
25
26 #include <boost/thread/mutex.hpp>
27
28 #include <yaz/diagbib1.h>
29 #include <yaz/log.h>
30 #include <yaz/zgdu.h>
31
32 // remove max macro if already defined (defined later in <limits>)
33 #ifdef max
34 #undef max
35 #endif
36
37 #include <list>
38 #include <map>
39 #include <limits>
40
41 namespace mp = metaproxy_1;
42 namespace yf = mp::filter;
43
44 namespace metaproxy_1
45 {
46     namespace filter
47     {
48         class LoadBalance::Impl
49         {
50         public:
51             Impl();
52             ~Impl();
53             void process(metaproxy_1::Package & package);
54             void configure(const xmlNode * ptr);
55         private:
56             // statistic manipulating functions,
57             void add_dead(unsigned long session_id);
58             //void clear_dead(unsigned long session_id);
59             void add_package(unsigned long session_id);
60             void remove_package(unsigned long session_id);
61             void add_session(unsigned long session_id, std::string target);
62             void remove_session(unsigned long session_id);
63             std::string find_session_target(unsigned long session_id);
64
65             // cost functions
66             unsigned int cost(std::string target);
67             unsigned int dead(std::string target);
68
69             // local classes
70             class TargetStat {
71             public:
72                 unsigned int sessions;
73                 unsigned int packages;
74                 unsigned int deads;
75                 unsigned int cost() {
76                     unsigned int c = sessions + packages + deads;
77                     return c;
78                 }
79             };
80
81             // local protected databases
82             boost::mutex m_mutex;
83             std::map<std::string, TargetStat> m_target_stat;
84             std::map<unsigned long, std::string> m_session_target;
85         };
86     }
87 }
88
89 // define Pimpl wrapper forwarding to Impl
90
91 yf::LoadBalance::LoadBalance() : m_p(new Impl)
92 {
93 }
94
95 yf::LoadBalance::~LoadBalance()
96 {  // must have a destructor because of boost::scoped_ptr
97 }
98
99 void yf::LoadBalance::configure(const xmlNode *xmlnode, bool test_only,
100                                 const char *path)
101 {
102     m_p->configure(xmlnode);
103 }
104
105 void yf::LoadBalance::process(mp::Package &package) const
106 {
107     m_p->process(package);
108 }
109
110
111 yf::LoadBalance::Impl::Impl()
112 {
113 }
114
115 yf::LoadBalance::Impl::~Impl()
116 {
117 }
118
119 void yf::LoadBalance::Impl::configure(const xmlNode *xmlnode)
120 {
121 }
122
123 void yf::LoadBalance::Impl::process(mp::Package &package)
124 {
125     bool is_closed_front = false;
126
127     // checking for closed front end packages
128     if (package.session().is_closed())
129     {
130         is_closed_front = true;
131     }
132
133     Z_GDU *gdu_req = package.request().get();
134
135     // passing anything but z3950 packages
136     if (gdu_req && gdu_req->which == Z_GDU_Z3950)
137     {
138         // target selecting only on Z39.50 init request
139         if (gdu_req->u.z3950->which == Z_APDU_initRequest)
140         {
141             yazpp_1::GDU base_req(gdu_req);
142             Z_APDU *apdu = base_req.get()->u.z3950;
143
144             Z_InitRequest *org_init = base_req.get()->u.z3950->u.initRequest;
145             mp::odr odr_en(ODR_ENCODE);
146
147             std::list<std::string> vhosts;
148             mp::util::remove_vhost_otherinfo(&(org_init->otherInfo), vhosts);
149             // get lowest of all vhosts.. Remove them if individually if
150             // they turn out to be bad..
151             while (1)
152             {
153                 std::string target;
154                 std::list<std::string>::iterator ivh = vhosts.begin();
155
156                 Package init_pkg(package.session(), package.origin());
157                 init_pkg.copy_filter(package);
158
159                 unsigned int cost = std::numeric_limits<unsigned int>::max();
160                 {
161                     boost::mutex::scoped_lock scoped_lock(m_mutex);
162
163                     for (; ivh != vhosts.end(); )
164                     {
165                         if ((*ivh).size() != 0)
166                         {
167                             unsigned int vhcost
168                                 = yf::LoadBalance::Impl::cost(*ivh);
169                             yaz_log(YLOG_LOG, "Consider %s cost=%u vhcost=%u",
170                                     (*ivh).c_str(), cost, vhcost);
171                             if (cost > vhcost)
172                             {
173                                 cost = vhcost;
174                                 target = *ivh;
175                                 ivh = vhosts.erase(ivh);
176                             }
177                             else
178                                 ivh++;
179                         }
180                         else
181                             ivh++;
182                     }
183                 }
184                 if (target.length() == 0)
185                     break;
186                 // copying new target into init package
187
188                 yazpp_1::GDU init_gdu(base_req);
189                 Z_InitRequest *init_req = init_gdu.get()->u.z3950->u.initRequest;
190
191                 mp::util::set_vhost_otherinfo(&(init_req->otherInfo),
192                                               odr_en, target, 1);
193
194                 init_pkg.request() = init_gdu;
195
196                 // moving all package types
197                 init_pkg.move();
198
199                 // checking for closed back end packages
200                 if (!init_pkg.session().is_closed())
201                 {
202                     add_session(package.session().id(), target);
203
204                     package.response() = init_pkg.response();
205                     return;
206                 }
207             }
208             mp::odr odr;
209             package.response() = odr.create_initResponse(
210                 apdu, YAZ_BIB1_TEMPORARY_SYSTEM_ERROR,
211                 "load_balance: no available targets");
212             package.session().close();
213             return;
214         }
215         // frontend Z39.50 close request is added to statistics and marked
216         else if (gdu_req->u.z3950->which == Z_APDU_close)
217         {
218             is_closed_front = true;
219             boost::mutex::scoped_lock scoped_lock(m_mutex);
220             add_package(package.session().id());
221         }
222         // any other Z39.50 package is added to statistics
223         else
224         {
225             boost::mutex::scoped_lock scoped_lock(m_mutex);
226             add_package(package.session().id());
227         }
228     }
229
230     // moving all package types
231     package.move();
232
233     bool is_closed_back = false;
234
235     // checking for closed back end packages
236     if (package.session().is_closed())
237         is_closed_back = true;
238
239     Z_GDU *gdu_res = package.response().get();
240
241     // passing anything but z3950 packages
242     if (gdu_res && gdu_res->which == Z_GDU_Z3950)
243     {
244         // session closing only on Z39.50 close response
245         if (gdu_res->u.z3950->which == Z_APDU_close)
246         {
247             is_closed_back = true;
248             boost::mutex::scoped_lock scoped_lock(m_mutex);
249             remove_package(package.session().id());
250         }
251         // any other Z39.50 package is removed from statistics
252         else
253         {
254             boost::mutex::scoped_lock scoped_lock(m_mutex);
255             remove_package(package.session().id());
256         }
257     }
258
259     // finally removing sessions and marking deads
260     if (is_closed_back || is_closed_front)
261     {
262         boost::mutex::scoped_lock scoped_lock(m_mutex);
263
264         // marking backend dead if backend closed without fronted close
265         if (is_closed_front == false)
266             add_dead(package.session().id());
267
268         remove_session(package.session().id());
269
270         // making sure that package is closed
271         package.session().close();
272     }
273 }
274
// statistics-manipulating functions
276 void yf::LoadBalance::Impl::add_dead(unsigned long session_id)
277 {
278     std::string target = find_session_target(session_id);
279
280     if (target.size() != 0)
281     {
282         std::map<std::string, TargetStat>::iterator itarg;
283         itarg = m_target_stat.find(target);
284         if (itarg != m_target_stat.end()
285             && itarg->second.deads < std::numeric_limits<unsigned int>::max())
286         {
287             itarg->second.deads += 1;
288             // std:.cout << "add_dead " << session_id << " " << target
289             //          << " d:" << itarg->second.deads << "\n";
290         }
291     }
292 }
293
294 void yf::LoadBalance::Impl::add_package(unsigned long session_id)
295 {
296     std::string target = find_session_target(session_id);
297
298     if (target.size() != 0)
299     {
300         std::map<std::string, TargetStat>::iterator itarg;
301         itarg = m_target_stat.find(target);
302         if (itarg != m_target_stat.end()
303             && itarg->second.packages
304                < std::numeric_limits<unsigned int>::max())
305         {
306             itarg->second.packages += 1;
307         }
308     }
309 }
310
311 void yf::LoadBalance::Impl::remove_package(unsigned long session_id)
312 {
313     std::string target = find_session_target(session_id);
314
315     if (target.size() != 0)
316     {
317         std::map<std::string, TargetStat>::iterator itarg;
318         itarg = m_target_stat.find(target);
319         if (itarg != m_target_stat.end()
320             && itarg->second.packages > 0)
321         {
322             itarg->second.packages -= 1;
323         }
324     }
325 }
326
327 void yf::LoadBalance::Impl::add_session(unsigned long session_id,
328                                         std::string target)
329 {
330     // finding and adding session
331     std::map<unsigned long, std::string>::iterator isess;
332     isess = m_session_target.find(session_id);
333     if (isess == m_session_target.end())
334     {
335         m_session_target.insert(std::make_pair(session_id, target));
336     }
337
338     // finding and adding target statistics
339     std::map<std::string, TargetStat>::iterator itarg;
340     itarg = m_target_stat.find(target);
341     if (itarg == m_target_stat.end())
342     {
343         TargetStat stat;
344         stat.sessions = 1;
345         stat.packages = 0;
346         stat.deads = 0;
347         m_target_stat.insert(std::make_pair(target, stat));
348     }
349     else if (itarg->second.sessions < std::numeric_limits<unsigned int>::max())
350     {
351         itarg->second.sessions += 1;
352     }
353 }
354
355 void yf::LoadBalance::Impl::remove_session(unsigned long session_id)
356 {
357     std::string target;
358
359     // finding session
360     std::map<unsigned long, std::string>::iterator isess;
361     isess = m_session_target.find(session_id);
362     if (isess == m_session_target.end())
363         return;
364     else
365         target = isess->second;
366
367     // finding target statistics
368     std::map<std::string, TargetStat>::iterator itarg;
369     itarg = m_target_stat.find(target);
370     if (itarg == m_target_stat.end())
371     {
372         m_session_target.erase(isess);
373         return;
374     }
375
376     // counting session down
377     if (itarg->second.sessions > 0)
378         itarg->second.sessions -= 1;
379
380     if (itarg->second.sessions == 0 && itarg->second.deads == 0)
381     {
382         m_target_stat.erase(itarg);
383         m_session_target.erase(isess);
384     }
385 }
386
387 std::string yf::LoadBalance::Impl::find_session_target(unsigned long session_id)
388 {
389     std::string target;
390     std::map<unsigned long, std::string>::iterator isess;
391     isess = m_session_target.find(session_id);
392     if (isess != m_session_target.end())
393         target = isess->second;
394     return target;
395 }
396
397
398 // cost functions
399 unsigned int yf::LoadBalance::Impl::cost(std::string target)
400 {
401     unsigned int cost = 0;
402
403     if (target.size() != 0)
404     {
405         std::map<std::string, TargetStat>::iterator itarg;
406         itarg = m_target_stat.find(target);
407         if (itarg != m_target_stat.end())
408             cost = itarg->second.cost();
409     }
410     return cost;
411 }
412
413 unsigned int yf::LoadBalance::Impl::dead(std::string target)
414 {
415     unsigned int dead = 0;
416
417     if (target.size() != 0)
418     {
419         std::map<std::string, TargetStat>::iterator itarg;
420         itarg = m_target_stat.find(target);
421         if (itarg != m_target_stat.end())
422             dead = itarg->second.deads;
423     }
424     return dead;
425 }
426
427
428 static mp::filter::Base* filter_creator()
429 {
430     return new mp::filter::LoadBalance;
431 }
432
433 extern "C" {
434     struct metaproxy_1_filter_struct metaproxy_1_filter_load_balance = {
435         0,
436         "load_balance",
437         filter_creator
438     };
439 }
440
441
442 /*
443  * Local variables:
444  * c-basic-offset: 4
445  * c-file-style: "Stroustrup"
446  * indent-tabs-mode: nil
447  * End:
448  * vim: shiftwidth=4 tabstop=8 expandtab
449  */
450