Version 1.0.23. Bump copyright year.
[metaproxy-moved-to-github.git] / src / filter_load_balance.cpp
1 /* This file is part of Metaproxy.
2    Copyright (C) 2005-2010 Index Data
3
4 Metaproxy is free software; you can redistribute it and/or modify it under
5 the terms of the GNU General Public License as published by the Free
6 Software Foundation; either version 2, or (at your option) any later
7 version.
8
9 Metaproxy is distributed in the hope that it will be useful, but WITHOUT ANY
10 WARRANTY; without even the implied warranty of MERCHANTABILITY or
11 FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
12 for more details.
13
14 You should have received a copy of the GNU General Public License
15 along with this program; if not, write to the Free Software
16 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
17 */
18
19 #include "config.hpp"
20 #include "session.hpp"
21 #include "package.hpp"
22 #include "filter.hpp"
23 #include "filter_load_balance.hpp"
24 #include "util.hpp"
25
26
27 #include <boost/thread/mutex.hpp>
28
29 #include <yaz/diagbib1.h>
30 #include <yaz/log.h>
31 #include <yaz/zgdu.h>
32
33 // remove max macro if already defined (defined later in <limits>)
34 #ifdef max
35 #undef max
36 #endif
37
38 //#include <iostream>
39 #include <list>
40 #include <map>
41 #include <limits>
42
43 namespace mp = metaproxy_1;
44 namespace yf = mp::filter;
45
46 namespace metaproxy_1
47 {
48     namespace filter
49     {
50         class LoadBalance::Impl
51         {
52         public:
53             Impl();
54             ~Impl();
55             void process(metaproxy_1::Package & package);
56             void configure(const xmlNode * ptr);
57         private:
58             // statistic manipulating functions, 
59             void add_dead(unsigned long session_id);
60             //void clear_dead(unsigned long session_id);
61             void add_package(unsigned long session_id);
62             void remove_package(unsigned long session_id);
63             void add_session(unsigned long session_id, std::string target);
64             void remove_session(unsigned long session_id);
65             std::string find_session_target(unsigned long session_id);
66
67             // cost functions
68             unsigned int cost(std::string target);
69             unsigned int dead(std::string target);
70
71             // local classes
72             class TargetStat {
73             public:
74                 unsigned int sessions;
75                 unsigned int packages;
76                 unsigned int deads;
77                 unsigned int cost() {
78                     unsigned int c = sessions + packages + deads;
79                     return c;
80                 }
81             };
82
83             // local protected databases
84             boost::mutex m_mutex;
85             std::map<std::string, TargetStat> m_target_stat;
86             std::map<unsigned long, std::string> m_session_target;
87         };
88     }
89 }
90
91 // define Pimpl wrapper forwarding to Impl
92  
93 yf::LoadBalance::LoadBalance() : m_p(new Impl)
94 {
95 }
96
97 yf::LoadBalance::~LoadBalance()
98 {  // must have a destructor because of boost::scoped_ptr
99 }
100
101 void yf::LoadBalance::configure(const xmlNode *xmlnode, bool test_only)
102 {
103     m_p->configure(xmlnode);
104 }
105
106 void yf::LoadBalance::process(mp::Package &package) const
107 {
108     m_p->process(package);
109 }
110
111
112 yf::LoadBalance::Impl::Impl()
113 {
114 }
115
116 yf::LoadBalance::Impl::~Impl()
117
118 }
119
120 void yf::LoadBalance::Impl::configure(const xmlNode *xmlnode)
121 {
122 }
123
124 void yf::LoadBalance::Impl::process(mp::Package &package)
125 {
126     bool is_closed_front = false;
127     
128     // checking for closed front end packages
129     if (package.session().is_closed())
130     {
131         is_closed_front = true;
132     }    
133
134     Z_GDU *gdu_req = package.request().get();
135
136     // passing anything but z3950 packages
137     if (gdu_req && gdu_req->which == Z_GDU_Z3950)
138     {
139         // target selecting only on Z39.50 init request
140         if (gdu_req->u.z3950->which == Z_APDU_initRequest)
141         {
142             yazpp_1::GDU base_req(gdu_req);
143             Z_APDU *apdu = base_req.get()->u.z3950;
144
145             Z_InitRequest *org_init = base_req.get()->u.z3950->u.initRequest;
146             mp::odr odr_en(ODR_ENCODE);
147
148             std::list<std::string> vhosts;
149             mp::util::remove_vhost_otherinfo(&(org_init->otherInfo), vhosts);
150             // get lowest of all vhosts.. Remove them if individually if
151             // they turn out to be bad..
152             while (1)
153             {
154                 std::string target;
155                 std::list<std::string>::iterator ivh = vhosts.begin();
156
157                 Package init_pkg(package.session(), package.origin());
158                 init_pkg.copy_filter(package);
159
160                 unsigned int cost = std::numeric_limits<unsigned int>::max();
161                 { 
162                     boost::mutex::scoped_lock scoped_lock(m_mutex);
163                     
164                     for (; ivh != vhosts.end(); )
165                     {
166                         if ((*ivh).size() != 0)
167                         {
168                             unsigned int vhcost 
169                                 = yf::LoadBalance::Impl::cost(*ivh);
170                             yaz_log(YLOG_LOG, "Consider %s cost=%u vhcost=%u",
171                                     (*ivh).c_str(), cost, vhcost);
172                             if (cost > vhcost)
173                             {
174                                 cost = vhcost;
175                                 target = *ivh;
176                                 ivh = vhosts.erase(ivh);
177                             }
178                             else
179                                 ivh++;
180                         }
181                         else
182                             ivh++;
183                     }
184                 }
185                 if (target.length() == 0)
186                     break;
187                 // copying new target into init package
188                 
189                 yazpp_1::GDU init_gdu(base_req);
190                 Z_InitRequest *init_req = init_gdu.get()->u.z3950->u.initRequest;
191                 
192                 mp::util::set_vhost_otherinfo(&(init_req->otherInfo), 
193                                               odr_en, target, 1);
194                 
195                 init_pkg.request() = init_gdu;
196                 
197                 // moving all package types 
198                 init_pkg.move();
199                 
200                 // checking for closed back end packages
201                 if (!init_pkg.session().is_closed())
202                 {
203                     add_session(package.session().id(), target);
204
205                     package.response() = init_pkg.response();
206                     return;
207                 }
208             }
209             mp::odr odr;
210             package.response() = odr.create_initResponse(
211                 apdu, YAZ_BIB1_TEMPORARY_SYSTEM_ERROR,
212                 "load_balance: no available targets");
213             package.session().close();
214             return;
215         }
216         // frontend Z39.50 close request is added to statistics and marked
217         else if (gdu_req->u.z3950->which == Z_APDU_close)
218         {
219             is_closed_front = true;
220             boost::mutex::scoped_lock scoped_lock(m_mutex);        
221             add_package(package.session().id());
222         }    
223         // any other Z39.50 package is added to statistics
224         else
225         {
226             boost::mutex::scoped_lock scoped_lock(m_mutex);        
227             add_package(package.session().id());
228         }
229     }
230
231     // moving all package types 
232     package.move();
233
234     bool is_closed_back = false;
235
236     // checking for closed back end packages
237     if (package.session().is_closed())
238         is_closed_back = true;
239
240     Z_GDU *gdu_res = package.response().get();
241
242     // passing anything but z3950 packages
243     if (gdu_res && gdu_res->which == Z_GDU_Z3950)
244     {
245         // session closing only on Z39.50 close response
246         if (gdu_res->u.z3950->which == Z_APDU_close)
247         {
248             is_closed_back = true;
249             boost::mutex::scoped_lock scoped_lock(m_mutex);
250             remove_package(package.session().id());
251         } 
252         // any other Z39.50 package is removed from statistics
253         else
254         {
255             boost::mutex::scoped_lock scoped_lock(m_mutex);
256             remove_package(package.session().id());
257         }
258     }
259
260     // finally removing sessions and marking deads
261     if (is_closed_back || is_closed_front)
262     {
263         boost::mutex::scoped_lock scoped_lock(m_mutex);
264
265         // marking backend dead if backend closed without fronted close 
266         if (is_closed_front == false)
267             add_dead(package.session().id());
268
269         remove_session(package.session().id());
270
271         // making sure that package is closed
272         package.session().close();
273     }
274 }
275             
276 // statistic manipulating functions, 
277 void yf::LoadBalance::Impl::add_dead(unsigned long session_id)
278 {
279     std::string target = find_session_target(session_id);
280
281     if (target.size() != 0)
282     {
283         std::map<std::string, TargetStat>::iterator itarg;        
284         itarg = m_target_stat.find(target);
285         if (itarg != m_target_stat.end()
286             && itarg->second.deads < std::numeric_limits<unsigned int>::max())
287         {
288             itarg->second.deads += 1;
289             // std:.cout << "add_dead " << session_id << " " << target 
290             //          << " d:" << itarg->second.deads << "\n";
291         }
292     }
293 }
294
295 void yf::LoadBalance::Impl::add_package(unsigned long session_id)
296 {
297     std::string target = find_session_target(session_id);
298
299     if (target.size() != 0)
300     {
301         std::map<std::string, TargetStat>::iterator itarg;        
302         itarg = m_target_stat.find(target);
303         if (itarg != m_target_stat.end()
304             && itarg->second.packages 
305                < std::numeric_limits<unsigned int>::max())
306         {
307             itarg->second.packages += 1;
308         }
309     }
310 }
311
312 void yf::LoadBalance::Impl::remove_package(unsigned long session_id)
313 {
314     std::string target = find_session_target(session_id);
315
316     if (target.size() != 0)
317     {
318         std::map<std::string, TargetStat>::iterator itarg;        
319         itarg = m_target_stat.find(target);
320         if (itarg != m_target_stat.end()
321             && itarg->second.packages > 0)
322         {
323             itarg->second.packages -= 1;
324         }
325     }
326 }
327
328 void yf::LoadBalance::Impl::add_session(unsigned long session_id, 
329                                         std::string target)
330 {
331     // finding and adding session
332     std::map<unsigned long, std::string>::iterator isess;
333     isess = m_session_target.find(session_id);
334     if (isess == m_session_target.end())
335     {
336         m_session_target.insert(std::make_pair(session_id, target));
337     }
338
339     // finding and adding target statistics
340     std::map<std::string, TargetStat>::iterator itarg;
341     itarg = m_target_stat.find(target);
342     if (itarg == m_target_stat.end())
343     {
344         TargetStat stat;
345         stat.sessions = 1;
346         stat.packages = 0;
347         stat.deads = 0;
348         m_target_stat.insert(std::make_pair(target, stat));
349     } 
350     else if (itarg->second.sessions < std::numeric_limits<unsigned int>::max())
351     {
352         itarg->second.sessions += 1;
353     }
354 }
355
356 void yf::LoadBalance::Impl::remove_session(unsigned long session_id)
357 {
358     std::string target;
359
360     // finding session
361     std::map<unsigned long, std::string>::iterator isess;
362     isess = m_session_target.find(session_id);
363     if (isess == m_session_target.end())
364         return;
365     else
366         target = isess->second;
367
368     // finding target statistics
369     std::map<std::string, TargetStat>::iterator itarg;
370     itarg = m_target_stat.find(target);
371     if (itarg == m_target_stat.end())
372     {
373         m_session_target.erase(isess);
374         return;
375     }
376     
377     // counting session down
378     if (itarg->second.sessions > 0)
379         itarg->second.sessions -= 1;
380
381     if (itarg->second.sessions == 0 && itarg->second.deads == 0)
382     {
383         m_target_stat.erase(itarg);
384         m_session_target.erase(isess);
385     }
386 }
387
388 std::string yf::LoadBalance::Impl::find_session_target(unsigned long session_id)
389 {
390     std::string target;
391     std::map<unsigned long, std::string>::iterator isess;
392     isess = m_session_target.find(session_id);
393     if (isess != m_session_target.end())
394         target = isess->second;
395     return target;
396 }
397
398
399 // cost functions
400 unsigned int yf::LoadBalance::Impl::cost(std::string target)
401 {
402     unsigned int cost = 0;
403
404     if (target.size() != 0)
405     {
406         std::map<std::string, TargetStat>::iterator itarg;        
407         itarg = m_target_stat.find(target);
408         if (itarg != m_target_stat.end())
409             cost = itarg->second.cost();
410     }
411     return cost;
412 }
413
414 unsigned int yf::LoadBalance::Impl::dead(std::string target)
415 {
416     unsigned int dead = 0;
417
418     if (target.size() != 0)
419     {
420         std::map<std::string, TargetStat>::iterator itarg;        
421         itarg = m_target_stat.find(target);
422         if (itarg != m_target_stat.end())
423             dead = itarg->second.deads;
424     }
425     return dead;
426 }
427
428
429 static mp::filter::Base* filter_creator()
430 {
431     return new mp::filter::LoadBalance;
432 }
433
434 extern "C" {
435     struct metaproxy_1_filter_struct metaproxy_1_filter_load_balance = {
436         0,
437         "load_balance",
438         filter_creator
439     };
440 }
441
442
443 /*
444  * Local variables:
445  * c-basic-offset: 4
446  * c-file-style: "Stroustrup"
447  * indent-tabs-mode: nil
448  * End:
449  * vim: shiftwidth=4 tabstop=8 expandtab
450  */
451