Work on load balancer
[metaproxy-moved-to-github.git] / src / filter_load_balance.cpp
1 /* This file is part of Metaproxy.
2    Copyright (C) 2005-2009 Index Data
3
4 Metaproxy is free software; you can redistribute it and/or modify it under
5 the terms of the GNU General Public License as published by the Free
6 Software Foundation; either version 2, or (at your option) any later
7 version.
8
9 Metaproxy is distributed in the hope that it will be useful, but WITHOUT ANY
10 WARRANTY; without even the implied warranty of MERCHANTABILITY or
11 FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
12 for more details.
13
14 You should have received a copy of the GNU General Public License
15 along with this program; if not, write to the Free Software
16 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
17 */
18
19 #include "config.hpp"
20 #include "session.hpp"
21 #include "package.hpp"
22 #include "filter.hpp"
23 #include "filter_load_balance.hpp"
24 #include "util.hpp"
25
26
27 #include <boost/thread/mutex.hpp>
28
29 #include <yaz/diagbib1.h>
30 #include <yaz/log.h>
31 #include <yaz/zgdu.h>
32
33 // remove max macro if already defined (defined later in <limits>)
34 #ifdef max
35 #undef max
36 #endif
37
38 //#include <iostream>
39 #include <list>
40 #include <map>
41 #include <limits>
42
43 namespace mp = metaproxy_1;
44 namespace yf = mp::filter;
45
46 namespace metaproxy_1
47 {
48     namespace filter
49     {
50         class LoadBalance::Impl
51         {
52         public:
53             Impl();
54             ~Impl();
55             void process(metaproxy_1::Package & package);
56             void configure(const xmlNode * ptr);
57         private:
58             // statistic manipulating functions, 
59             void add_dead(unsigned long session_id);
60             //void clear_dead(unsigned long session_id);
61             void add_package(unsigned long session_id);
62             void remove_package(unsigned long session_id);
63             void add_session(unsigned long session_id, std::string target);
64             void remove_session(unsigned long session_id);
65             std::string find_session_target(unsigned long session_id);
66
67             // cost functions
68             unsigned int cost(std::string target);
69             unsigned int dead(std::string target);
70
71             // local classes
72             class TargetStat {
73             public:
74                 unsigned int sessions;
75                 unsigned int packages;
76                 unsigned int deads;
77                 unsigned int cost() {
78                     unsigned int c = sessions + packages + deads;
79                     //std::cout << "stats  c:" << c 
80                     //          << " s:" << sessions 
81                     //          << " p:" << packages 
82                     //          << " d:" << deads 
83                     //          <<"\n";
84                     return c;
85                 }
86             };
87
88             // local protected databases
89             boost::mutex m_mutex;
90             std::map<std::string, TargetStat> m_target_stat;
91             std::map<unsigned long, std::string> m_session_target;
92         };
93     }
94 }
95
96 // define Pimpl wrapper forwarding to Impl
97  
98 yf::LoadBalance::LoadBalance() : m_p(new Impl)
99 {
100 }
101
102 yf::LoadBalance::~LoadBalance()
103 {  // must have a destructor because of boost::scoped_ptr
104 }
105
106 void yf::LoadBalance::configure(const xmlNode *xmlnode, bool test_only)
107 {
108     m_p->configure(xmlnode);
109 }
110
111 void yf::LoadBalance::process(mp::Package &package) const
112 {
113     m_p->process(package);
114 }
115
116
117 yf::LoadBalance::Impl::Impl()
118 {
119 }
120
121 yf::LoadBalance::Impl::~Impl()
122
123 }
124
125 void yf::LoadBalance::Impl::configure(const xmlNode *xmlnode)
126 {
127 }
128
129 void yf::LoadBalance::Impl::process(mp::Package &package)
130 {
131     bool is_closed_front = false;
132     
133     // checking for closed front end packages
134     if (package.session().is_closed())
135     {
136         is_closed_front = true;
137     }    
138
139     Z_GDU *gdu_req = package.request().get();
140
141     // passing anything but z3950 packages
142     if (gdu_req && gdu_req->which == Z_GDU_Z3950)
143     {
144         // target selecting only on Z39.50 init request
145         if (gdu_req->u.z3950->which == Z_APDU_initRequest)
146         {
147             yazpp_1::GDU base_req(gdu_req);
148             Z_APDU *apdu = base_req.get()->u.z3950;
149
150             Z_InitRequest *org_init = base_req.get()->u.z3950->u.initRequest;
151             mp::odr odr_en(ODR_ENCODE);
152
153             std::list<std::string> vhosts;
154             mp::util::remove_vhost_otherinfo(&(org_init->otherInfo), vhosts);
155             // get lowest of all vhosts.. Remove them if individually if
156             // they turn out to be bad..
157             while (1)
158             {
159                 std::string target;
160                 std::list<std::string>::iterator ivh = vhosts.begin();
161                 unsigned int cost = std::numeric_limits<unsigned int>::max();
162                 { 
163                     boost::mutex::scoped_lock scoped_lock(m_mutex);
164                     
165                     for (; ivh != vhosts.end(); )
166                     {
167                         if ((*ivh).size() != 0)
168                         {
169                             unsigned int vhcost 
170                                 = yf::LoadBalance::Impl::cost(*ivh);
171                             yaz_log(YLOG_LOG, "Consider %s cost=%u vhcost=%u",
172                                     (*ivh).c_str(), cost, vhcost);
173                             if (cost > vhcost)
174                             {
175                                 cost = vhcost;
176                                 target = *ivh;
177                                 ivh = vhosts.erase(ivh);
178                             }
179                             else
180                                 ivh++;
181                         }
182                         else
183                             ivh++;
184                     }
185                 }
186                 if (target.length() == 0)
187                     break;
188                 // copying new target into init package
189
190                 yazpp_1::GDU init_gdu(base_req);
191                 Z_InitRequest *init_req = init_gdu.get()->u.z3950->u.initRequest;
192                 
193                 mp::util::set_vhost_otherinfo(&(init_req->otherInfo), 
194                                               odr_en, target, 1);
195                 
196                 package.request() = init_gdu;
197                 
198                 // moving all package types 
199                 package.move();
200                 
201                 // checking for closed back end packages
202                 if (!package.session().is_closed())
203                 {
204                     add_session(package.session().id(), target);
205                     return;
206                 }
207                 yaz_log(YLOG_LOG, "Other round..");
208             }
209             mp::odr odr;
210             package.response() = odr.create_initResponse(
211                 apdu, YAZ_BIB1_TEMPORARY_SYSTEM_ERROR,
212                 "load_balance: no available targets");
213             package.session().close();
214             return;
215         }
216         // frontend Z39.50 close request is added to statistics and marked
217         else if (gdu_req->u.z3950->which == Z_APDU_close)
218         {
219             is_closed_front = true;
220             boost::mutex::scoped_lock scoped_lock(m_mutex);        
221             add_package(package.session().id());
222         }    
223         // any other Z39.50 package is added to statistics
224         else
225         {
226             boost::mutex::scoped_lock scoped_lock(m_mutex);        
227             add_package(package.session().id());
228         }
229     }
230
231     // moving all package types 
232     package.move();
233
234     bool is_closed_back = false;
235
236     // checking for closed back end packages
237     if (package.session().is_closed())
238         is_closed_back = true;
239
240     Z_GDU *gdu_res = package.response().get();
241
242     // passing anything but z3950 packages
243     if (gdu_res && gdu_res->which == Z_GDU_Z3950)
244     {
245         // session closing only on Z39.50 close response
246         if (gdu_res->u.z3950->which == Z_APDU_close)
247         {
248             is_closed_back = true;
249             boost::mutex::scoped_lock scoped_lock(m_mutex);
250             remove_package(package.session().id());
251         } 
252         // any other Z39.50 package is removed from statistics
253         else
254         {
255             boost::mutex::scoped_lock scoped_lock(m_mutex);
256             remove_package(package.session().id());
257         }
258     }
259
260     // finally removing sessions and marking deads
261     if (is_closed_back || is_closed_front)
262     {
263         boost::mutex::scoped_lock scoped_lock(m_mutex);
264
265         // marking backend dead if backend closed without fronted close 
266         if (is_closed_front == false)
267             add_dead(package.session().id());
268
269         remove_session(package.session().id());
270
271         // making sure that package is closed
272         package.session().close();
273     }
274 }
275             
276 // statistic manipulating functions, 
277 void yf::LoadBalance::Impl::add_dead(unsigned long session_id)
278 {
279     std::string target = find_session_target(session_id);
280
281     if (target.size() != 0)
282     {
283         std::map<std::string, TargetStat>::iterator itarg;        
284         itarg = m_target_stat.find(target);
285         if (itarg != m_target_stat.end()
286             && itarg->second.deads < std::numeric_limits<unsigned int>::max())
287         {
288             itarg->second.deads += 1;
289             // std:.cout << "add_dead " << session_id << " " << target 
290             //          << " d:" << itarg->second.deads << "\n";
291         }
292     }
293 }
294
295 //void yf::LoadBalance::Impl::clear_dead(unsigned long session_id){
296 //    std::cout << "clear_dead " << session_id << "\n";
297 //};
298
299 void yf::LoadBalance::Impl::add_package(unsigned long session_id)
300 {
301     std::string target = find_session_target(session_id);
302
303     if (target.size() != 0)
304     {
305         std::map<std::string, TargetStat>::iterator itarg;        
306         itarg = m_target_stat.find(target);
307         if (itarg != m_target_stat.end()
308             && itarg->second.packages 
309                < std::numeric_limits<unsigned int>::max())
310         {
311             itarg->second.packages += 1;
312         }
313     }
314 }
315
316 void yf::LoadBalance::Impl::remove_package(unsigned long session_id)
317 {
318     std::string target = find_session_target(session_id);
319
320     if (target.size() != 0)
321     {
322         std::map<std::string, TargetStat>::iterator itarg;        
323         itarg = m_target_stat.find(target);
324         if (itarg != m_target_stat.end()
325             && itarg->second.packages > 0)
326         {
327             itarg->second.packages -= 1;
328         }
329     }
330 }
331
332 void yf::LoadBalance::Impl::add_session(unsigned long session_id, 
333                                         std::string target)
334 {
335     // finding and adding session
336     std::map<unsigned long, std::string>::iterator isess;
337     isess = m_session_target.find(session_id);
338     if (isess == m_session_target.end())
339     {
340         m_session_target.insert(std::make_pair(session_id, target));
341     }
342
343     // finding and adding target statistics
344     std::map<std::string, TargetStat>::iterator itarg;
345     itarg = m_target_stat.find(target);
346     if (itarg == m_target_stat.end())
347     {
348         TargetStat stat;
349         stat.sessions = 1;
350         stat.packages = 0;
351         stat.deads = 0;
352         m_target_stat.insert(std::make_pair(target, stat));
353     } 
354     else if (itarg->second.sessions < std::numeric_limits<unsigned int>::max())
355     {
356         itarg->second.sessions += 1;
357     }
358 }
359
360 void yf::LoadBalance::Impl::remove_session(unsigned long session_id)
361 {
362     std::string target;
363
364     // finding session
365     std::map<unsigned long, std::string>::iterator isess;
366     isess = m_session_target.find(session_id);
367     if (isess == m_session_target.end())
368         return;
369     else
370         target = isess->second;
371
372     // finding target statistics
373     std::map<std::string, TargetStat>::iterator itarg;
374     itarg = m_target_stat.find(target);
375     if (itarg == m_target_stat.end())
376     {
377         m_session_target.erase(isess);
378         return;
379     }
380     
381     // counting session down
382     if (itarg->second.sessions > 0)
383         itarg->second.sessions -= 1;
384
385     // std:.cout << "remove_session " << session_id << " " << target 
386     //          << " s:" << itarg->second.sessions << "\n";
387     
388     // clearing empty sessions and targets
389     if (itarg->second.sessions == 0 && itarg->second.deads == 0)
390     {
391         m_target_stat.erase(itarg);
392         m_session_target.erase(isess);
393     }
394 }
395
396 std::string yf::LoadBalance::Impl::find_session_target(unsigned long session_id)
397 {
398     std::string target;
399     std::map<unsigned long, std::string>::iterator isess;
400     isess = m_session_target.find(session_id);
401     if (isess != m_session_target.end())
402         target = isess->second;
403     return target;
404 }
405
406
407 // cost functions
408 unsigned int yf::LoadBalance::Impl::cost(std::string target)
409 {
410     unsigned int cost = 0;
411
412     if (target.size() != 0)
413     {
414         std::map<std::string, TargetStat>::iterator itarg;        
415         itarg = m_target_stat.find(target);
416         if (itarg != m_target_stat.end())
417             cost = itarg->second.cost();
418     }
419     
420     //std::cout << "cost " << target << " c:" << cost << "\n";
421     return cost;
422 }
423
424 unsigned int yf::LoadBalance::Impl::dead(std::string target)
425 {
426     unsigned int dead = 0;
427
428     if (target.size() != 0)
429     {
430         std::map<std::string, TargetStat>::iterator itarg;        
431         itarg = m_target_stat.find(target);
432         if (itarg != m_target_stat.end())
433             dead = itarg->second.deads;
434     }
435     
436     //std::cout << "dead " << target << " d:" << dead << "\n";
437     return dead;
438 }
439
440
441 static mp::filter::Base* filter_creator()
442 {
443     return new mp::filter::LoadBalance;
444 }
445
446 extern "C" {
447     struct metaproxy_1_filter_struct metaproxy_1_filter_load_balance = {
448         0,
449         "load_balance",
450         filter_creator
451     };
452 }
453
454
455 /*
456  * Local variables:
457  * c-basic-offset: 4
458  * c-file-style: "Stroustrup"
459  * indent-tabs-mode: nil
460  * End:
461  * vim: shiftwidth=4 tabstop=8 expandtab
462  */
463