Reformat
[metaproxy-moved-to-github.git] / src / filter_load_balance.cpp
1 /* This file is part of Metaproxy.
2    Copyright (C) 2005-2009 Index Data
3
4 Metaproxy is free software; you can redistribute it and/or modify it under
5 the terms of the GNU General Public License as published by the Free
6 Software Foundation; either version 2, or (at your option) any later
7 version.
8
9 Metaproxy is distributed in the hope that it will be useful, but WITHOUT ANY
10 WARRANTY; without even the implied warranty of MERCHANTABILITY or
11 FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
12 for more details.
13
14 You should have received a copy of the GNU General Public License
15 along with this program; if not, write to the Free Software
16 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
17 */
18
19 #include "config.hpp"
20 #include "session.hpp"
21 #include "package.hpp"
22 #include "filter.hpp"
23 #include "filter_load_balance.hpp"
24 #include "util.hpp"
25
26
27 #include <boost/thread/mutex.hpp>
28
29 #include <yaz/zgdu.h>
30
31 // remove max macro if already defined (defined later in <limits>)
32 #ifdef max
33 #undef max
34 #endif
35
36 //#include <iostream>
37 #include <list>
38 #include <map>
39 #include <limits>
40
41 namespace mp = metaproxy_1;
42 namespace yf = mp::filter;
43
44 namespace metaproxy_1
45 {
46     namespace filter
47     {
48         class LoadBalance::Impl
49         {
50         public:
51             Impl();
52             ~Impl();
53             void process(metaproxy_1::Package & package);
54             void configure(const xmlNode * ptr);
55         private:
56             // statistic manipulating functions, 
57             void add_dead(unsigned long session_id);
58             //void clear_dead(unsigned long session_id);
59             void add_package(unsigned long session_id);
60             void remove_package(unsigned long session_id);
61             void add_session(unsigned long session_id, std::string target);
62             void remove_session(unsigned long session_id);
63             std::string find_session_target(unsigned long session_id);
64
65             // cost functions
66             unsigned int cost(std::string target);
67             unsigned int dead(std::string target);
68
69             // local classes
70             class TargetStat {
71             public:
72                 unsigned int sessions;
73                 unsigned int packages;
74                 unsigned int deads;
75                 unsigned int cost() {
76                     unsigned int c = sessions + packages + deads;
77                     //std::cout << "stats  c:" << c 
78                     //          << " s:" << sessions 
79                     //          << " p:" << packages 
80                     //          << " d:" << deads 
81                     //          <<"\n";
82                     return c;
83                 }
84             };
85
86             // local protected databases
87             boost::mutex m_mutex;
88             std::map<std::string, TargetStat> m_target_stat;
89             std::map<unsigned long, std::string> m_session_target;
90         };
91     }
92 }
93
94 // define Pimpl wrapper forwarding to Impl
95  
96 yf::LoadBalance::LoadBalance() : m_p(new Impl)
97 {
98 }
99
100 yf::LoadBalance::~LoadBalance()
101 {  // must have a destructor because of boost::scoped_ptr
102 }
103
104 void yf::LoadBalance::configure(const xmlNode *xmlnode, bool test_only)
105 {
106     m_p->configure(xmlnode);
107 }
108
109 void yf::LoadBalance::process(mp::Package &package) const
110 {
111     m_p->process(package);
112 }
113
114
115 yf::LoadBalance::Impl::Impl()
116 {
117 }
118
119 yf::LoadBalance::Impl::~Impl()
120
121 }
122
123 void yf::LoadBalance::Impl::configure(const xmlNode *xmlnode)
124 {
125 }
126
127 void yf::LoadBalance::Impl::process(mp::Package &package)
128 {
129
130     bool is_closed_front = false;
131     bool is_closed_back = false;
132
133     // checking for closed front end packages
134     if (package.session().is_closed())
135     {
136         is_closed_front = true;
137     }    
138
139     Z_GDU *gdu_req = package.request().get();
140
141     // passing anything but z3950 packages
142     if (gdu_req && gdu_req->which == Z_GDU_Z3950)
143     {
144         // target selecting only on Z39.50 init request
145         if (gdu_req->u.z3950->which == Z_APDU_initRequest)
146         {
147             mp::odr odr_en(ODR_ENCODE);
148             Z_InitRequest *org_init = gdu_req->u.z3950->u.initRequest;
149
150             // extracting virtual hosts
151             std::list<std::string> vhosts;
152             
153             mp::util::remove_vhost_otherinfo(&(org_init->otherInfo), vhosts);
154             
155             // choosing one target according to load-balancing algorithm  
156             
157             if (vhosts.size())
158             {
159                 std::string target;
160                 unsigned int cost = std::numeric_limits<unsigned int>::max();
161                 { //locking scope for local databases
162                     boost::mutex::scoped_lock scoped_lock(m_mutex);
163                     
164                     // load-balancing algorithm goes here
165                     //target = *vhosts.begin();
166                     for (std::list<std::string>::const_iterator ivh 
167                              = vhosts.begin(); 
168                          ivh != vhosts.end();
169                          ivh++)
170                     {
171                         if ((*ivh).size() != 0)
172                         {
173                             unsigned int vhcost 
174                                 = yf::LoadBalance::Impl::cost(*ivh);
175                             if (cost > vhcost)
176                             {
177                                 cost = vhcost;
178                                 target = *ivh;
179                             }
180                         }
181                      } 
182
183                     // updating local database
184                     add_session(package.session().id(), target);
185                     yf::LoadBalance::Impl::cost(target);
186                     add_package(package.session().id());
187                 }
188                 
189                 // copying new target into init package
190                 mp::util::set_vhost_otherinfo(&(org_init->otherInfo), 
191                                               odr_en, target, 1);
192                 package.request() = gdu_req;
193             }
194         }
195         // frontend Z39.50 close request is added to statistics and marked
196         else if (gdu_req->u.z3950->which == Z_APDU_close)
197         {
198             is_closed_front = true;
199             boost::mutex::scoped_lock scoped_lock(m_mutex);        
200             add_package(package.session().id());
201         }    
202         // any other Z39.50 package is added to statistics
203         else
204         {
205             boost::mutex::scoped_lock scoped_lock(m_mutex);        
206             add_package(package.session().id());
207         }
208     }
209         
210     // moving all package types 
211     package.move();
212
213     // checking for closed back end packages
214     if (package.session().is_closed())
215         is_closed_back = true;
216
217     Z_GDU *gdu_res = package.response().get();
218
219     // passing anything but z3950 packages
220     if (gdu_res && gdu_res->which == Z_GDU_Z3950)
221     {
222         // session closing only on Z39.50 close response
223         if (gdu_res->u.z3950->which == Z_APDU_close)
224         {
225             is_closed_back = true;
226             boost::mutex::scoped_lock scoped_lock(m_mutex);
227             remove_package(package.session().id());
228         } 
229         // any other Z39.50 package is removed from statistics
230         else
231         {
232             boost::mutex::scoped_lock scoped_lock(m_mutex);
233             remove_package(package.session().id());
234         }
235     }
236
237     // finally removing sessions and marking deads
238     if (is_closed_back || is_closed_front)
239     {
240         boost::mutex::scoped_lock scoped_lock(m_mutex);
241
242         // marking backend dead if backend closed without fronted close 
243         if (is_closed_front == false)
244             add_dead(package.session().id());
245
246         remove_session(package.session().id());
247
248         // making sure that package is closed
249         package.session().close();
250     }
251 }
252             
253 // statistic manipulating functions, 
254 void yf::LoadBalance::Impl::add_dead(unsigned long session_id)
255 {
256     std::string target = find_session_target(session_id);
257
258     if (target.size() != 0)
259     {
260         std::map<std::string, TargetStat>::iterator itarg;        
261         itarg = m_target_stat.find(target);
262         if (itarg != m_target_stat.end()
263             && itarg->second.deads < std::numeric_limits<unsigned int>::max())
264         {
265             itarg->second.deads += 1;
266             // std:.cout << "add_dead " << session_id << " " << target 
267             //          << " d:" << itarg->second.deads << "\n";
268         }
269     }
270 }
271
272 //void yf::LoadBalance::Impl::clear_dead(unsigned long session_id){
273 //    std::cout << "clear_dead " << session_id << "\n";
274 //};
275
276 void yf::LoadBalance::Impl::add_package(unsigned long session_id)
277 {
278     std::string target = find_session_target(session_id);
279
280     if (target.size() != 0)
281     {
282         std::map<std::string, TargetStat>::iterator itarg;        
283         itarg = m_target_stat.find(target);
284         if (itarg != m_target_stat.end()
285             && itarg->second.packages 
286                < std::numeric_limits<unsigned int>::max()){
287             itarg->second.packages += 1;
288             // std:.cout << "add_package " << session_id << " " << target 
289             //          << " p:" << itarg->second.packages << "\n";
290         }
291     }
292 }
293
294 void yf::LoadBalance::Impl::remove_package(unsigned long session_id)
295 {
296     std::string target = find_session_target(session_id);
297
298     if (target.size() != 0)
299     {
300         std::map<std::string, TargetStat>::iterator itarg;        
301         itarg = m_target_stat.find(target);
302         if (itarg != m_target_stat.end()
303             && itarg->second.packages > 0)
304         {
305             itarg->second.packages -= 1;
306             // std:.cout << "remove_package " << session_id << " " << target 
307             //          << " p:" << itarg->second.packages << "\n";
308         }
309     }
310 }
311
312 void yf::LoadBalance::Impl::add_session(unsigned long session_id, 
313                                         std::string target)
314 {
315     // finding and adding session
316     std::map<unsigned long, std::string>::iterator isess;
317     isess = m_session_target.find(session_id);
318     if (isess == m_session_target.end())
319     {
320         m_session_target.insert(std::make_pair(session_id, target));
321     }
322
323     // finding and adding target statistics
324     std::map<std::string, TargetStat>::iterator itarg;
325     itarg = m_target_stat.find(target);
326     if (itarg == m_target_stat.end())
327     {
328         TargetStat stat;
329         stat.sessions = 1;
330         stat.packages = 0;  // no idea why the defaut constructor TargetStat()
331         stat.deads = 0;     // is not initializig this correctly to zero ??
332         m_target_stat.insert(std::make_pair(target, stat));
333         // std:.cout << "add_session " << session_id << " " << target 
334         //          << " s:1\n";
335     } 
336     else if (itarg->second.sessions < std::numeric_limits<unsigned int>::max())
337     {
338         itarg->second.sessions += 1;
339         // std:.cout << "add_session " << session_id << " " << target 
340         //          << " s:" << itarg->second.sessions << "\n";
341     }
342 }
343
344 void yf::LoadBalance::Impl::remove_session(unsigned long session_id)
345 {
346     std::string target;
347
348     // finding session
349     std::map<unsigned long, std::string>::iterator isess;
350     isess = m_session_target.find(session_id);
351     if (isess == m_session_target.end())
352         return;
353     else
354         target = isess->second;
355
356     // finding target statistics
357     std::map<std::string, TargetStat>::iterator itarg;
358     itarg = m_target_stat.find(target);
359     if (itarg == m_target_stat.end())
360     {
361         m_session_target.erase(isess);
362         return;
363     }
364     
365     // counting session down
366     if (itarg->second.sessions > 0)
367         itarg->second.sessions -= 1;
368
369     // std:.cout << "remove_session " << session_id << " " << target 
370     //          << " s:" << itarg->second.sessions << "\n";
371     
372     // clearing empty sessions and targets
373     if (itarg->second.sessions == 0 && itarg->second.deads == 0 )
374     {
375         m_target_stat.erase(itarg);
376         m_session_target.erase(isess);
377     }
378 }
379
380 std::string yf::LoadBalance::Impl::find_session_target(unsigned long session_id)
381 {
382     std::string target;
383     std::map<unsigned long, std::string>::iterator isess;
384     isess = m_session_target.find(session_id);
385     if (isess != m_session_target.end())
386         target = isess->second;
387     return target;
388 }
389
390
391 // cost functions
392 unsigned int yf::LoadBalance::Impl::cost(std::string target)
393 {
394     unsigned int cost;
395
396     if (target.size() != 0){
397         std::map<std::string, TargetStat>::iterator itarg;        
398         itarg = m_target_stat.find(target);
399         if (itarg != m_target_stat.end()){
400             cost = itarg->second.cost();
401         }
402     }
403     
404     //std::cout << "cost " << target << " c:" << cost << "\n";
405     return cost;
406 }
407
408 unsigned int yf::LoadBalance::Impl::dead(std::string target)
409 {
410     unsigned int dead;
411
412     if (target.size() != 0){
413         std::map<std::string, TargetStat>::iterator itarg;        
414         itarg = m_target_stat.find(target);
415         if (itarg != m_target_stat.end()){
416             dead = itarg->second.deads;
417         }
418     }
419     
420     //std::cout << "dead " << target << " d:" << dead << "\n";
421     return dead;
422 }
423
424
425 static mp::filter::Base* filter_creator()
426 {
427     return new mp::filter::LoadBalance;
428 }
429
430 extern "C" {
431     struct metaproxy_1_filter_struct metaproxy_1_filter_load_balance = {
432         0,
433         "load_balance",
434         filter_creator
435     };
436 }
437
438
439 /*
440  * Local variables:
441  * c-basic-offset: 4
442  * c-file-style: "Stroustrup"
443  * indent-tabs-mode: nil
444  * End:
445  * vim: shiftwidth=4 tabstop=8 expandtab
446  */
447