Updated footer comment
[metaproxy-moved-to-github.git] / src / filter_load_balance.cpp
1 /* This file is part of Metaproxy.
2    Copyright (C) 2005-2008 Index Data
3
4 Metaproxy is free software; you can redistribute it and/or modify it under
5 the terms of the GNU General Public License as published by the Free
6 Software Foundation; either version 2, or (at your option) any later
7 version.
8
9 Metaproxy is distributed in the hope that it will be useful, but WITHOUT ANY
10 WARRANTY; without even the implied warranty of MERCHANTABILITY or
11 FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
12 for more details.
13
14 You should have received a copy of the GNU General Public License
15 along with this program; if not, write to the Free Software
16 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
17 */
18
19 #include "config.hpp"
20 #include "session.hpp"
21 #include "package.hpp"
22 #include "filter.hpp"
23 #include "filter_load_balance.hpp"
24 #include "util.hpp"
25
26
27 #include <boost/thread/mutex.hpp>
28
29 #include <yaz/zgdu.h>
30
31 // remove max macro if already defined (defined later in <limits>)
32 #ifdef max
33 #undef max
34 #endif
35
36 //#include <iostream>
37 #include <list>
38 #include <map>
39 #include <limits>
40
41 namespace mp = metaproxy_1;
42 namespace yf = mp::filter;
43
44 namespace metaproxy_1 {
45     namespace filter {
46         class LoadBalance::Impl {
47         public:
48             Impl();
49             ~Impl();
50             void process(metaproxy_1::Package & package);
51             void configure(const xmlNode * ptr);
52         private:
53             // statistic manipulating functions, 
54             void add_dead(unsigned long session_id);
55             //void clear_dead(unsigned long session_id);
56             void add_package(unsigned long session_id);
57             void remove_package(unsigned long session_id);
58             void add_session(unsigned long session_id, std::string target);
59             void remove_session(unsigned long session_id);
60             std::string find_session_target(unsigned long session_id);
61
62             // cost functions
63             unsigned int cost(std::string target);
64             unsigned int dead(std::string target);
65
66             // local classes
67             class TargetStat {
68             public:
69                 unsigned int sessions;
70                 unsigned int packages;
71                 unsigned int deads;
72                 unsigned int cost() {
73                     unsigned int c = sessions + packages + deads;
74                     //std::cout << "stats  c:" << c 
75                     //          << " s:" << sessions 
76                     //          << " p:" << packages 
77                     //          << " d:" << deads 
78                     //          <<"\n";
79                     return c;
80                 }
81             };
82
83             // local protected databases
84             boost::mutex m_mutex;
85             std::map<std::string, TargetStat> m_target_stat;
86             std::map<unsigned long, std::string> m_session_target;
87         };
88     }
89 }
90
91 // define Pimpl wrapper forwarding to Impl
92  
93 yf::LoadBalance::LoadBalance() : m_p(new Impl)
94 {
95 }
96
97 yf::LoadBalance::~LoadBalance()
98 {  // must have a destructor because of boost::scoped_ptr
99 }
100
101 void yf::LoadBalance::configure(const xmlNode *xmlnode, bool test_only)
102 {
103     m_p->configure(xmlnode);
104 }
105
106 void yf::LoadBalance::process(mp::Package &package) const
107 {
108     m_p->process(package);
109 }
110
111
112 // define Implementation stuff
113
114
115
116 yf::LoadBalance::Impl::Impl()
117 {
118 }
119
120 yf::LoadBalance::Impl::~Impl()
121
122 }
123
124 void yf::LoadBalance::Impl::configure(const xmlNode *xmlnode)
125 {
126 }
127
128 void yf::LoadBalance::Impl::process(mp::Package &package)
129 {
130
131     bool is_closed_front = false;
132     bool is_closed_back = false;
133
134     // checking for closed front end packages
135     if (package.session().is_closed()){
136         is_closed_front = true;
137     }    
138
139     Z_GDU *gdu_req = package.request().get();
140
141     // passing anything but z3950 packages
142     if (gdu_req && gdu_req->which == Z_GDU_Z3950){
143
144         // target selecting only on Z39.50 init request
145         if (gdu_req->u.z3950->which == Z_APDU_initRequest){
146
147             mp::odr odr_en(ODR_ENCODE);
148             Z_InitRequest *org_init = gdu_req->u.z3950->u.initRequest;
149
150             // extracting virtual hosts
151             std::list<std::string> vhosts;
152             
153             mp::util::remove_vhost_otherinfo(&(org_init->otherInfo), vhosts);
154             
155             // choosing one target according to load-balancing algorithm  
156             
157             if (vhosts.size()){
158                 std::string target;
159                 unsigned int cost = std::numeric_limits<unsigned int>::max();
160                 
161                 { //locking scope for local databases
162                     boost::mutex::scoped_lock scoped_lock(m_mutex);
163                     
164                     // load-balancing algorithm goes here
165                     //target = *vhosts.begin();
166                     for(std::list<std::string>::const_iterator ivh 
167                             = vhosts.begin(); 
168                         ivh != vhosts.end();
169                         ivh++){
170                         if ((*ivh).size() != 0){
171                             unsigned int vhcost 
172                                 = yf::LoadBalance::Impl::cost(*ivh);
173                             if (cost > vhcost){
174                                 cost = vhcost;
175                                 target = *ivh;
176                             }
177                         }
178                      } 
179
180                     // updating local database
181                     add_session(package.session().id(), target);
182                     yf::LoadBalance::Impl::cost(target);
183                     add_package(package.session().id());
184                 }
185                 
186                 // copying new target into init package
187                 mp::util::set_vhost_otherinfo(&(org_init->otherInfo), 
188                                               odr_en, target, 1);
189                 package.request() = gdu_req;
190         }
191             
192         }
193         // frontend Z39.50 close request is added to statistics and marked
194         else if (gdu_req->u.z3950->which == Z_APDU_close){
195             is_closed_front = true;
196             boost::mutex::scoped_lock scoped_lock(m_mutex);        
197             add_package(package.session().id());
198         }    
199         // any other Z39.50 package is added to statistics
200         else {
201             boost::mutex::scoped_lock scoped_lock(m_mutex);        
202             add_package(package.session().id());
203         }
204     }
205         
206     // moving all package types 
207     package.move();
208
209
210     // checking for closed back end packages
211     if (package.session().is_closed())
212         is_closed_back = true;
213
214     Z_GDU *gdu_res = package.response().get();
215
216     // passing anything but z3950 packages
217     if (gdu_res && gdu_res->which == Z_GDU_Z3950){
218
219         // session closing only on Z39.50 close response
220         if (gdu_res->u.z3950->which == Z_APDU_close){
221             is_closed_back = true;
222             boost::mutex::scoped_lock scoped_lock(m_mutex);
223             remove_package(package.session().id());
224         } 
225         // any other Z39.50 package is removed from statistics
226         else {
227             boost::mutex::scoped_lock scoped_lock(m_mutex);
228             remove_package(package.session().id());
229         }
230     }
231
232     // finally removing sessions and marking deads
233     if (is_closed_back || is_closed_front){        
234         boost::mutex::scoped_lock scoped_lock(m_mutex);
235
236         // marking backend dead if backend closed without fronted close 
237         if (is_closed_front == false)
238             add_dead(package.session().id());
239
240         remove_session(package.session().id());
241
242         // making sure that package is closed
243         package.session().close();
244     }
245 }
246             
247 // statistic manipulating functions, 
248 void yf::LoadBalance::Impl::add_dead(unsigned long session_id){
249
250     
251     std::string target = find_session_target(session_id);
252
253     if (target.size() != 0){
254         std::map<std::string, TargetStat>::iterator itarg;        
255         itarg = m_target_stat.find(target);
256         if (itarg != m_target_stat.end()
257             && itarg->second.deads < std::numeric_limits<unsigned int>::max()){
258             itarg->second.deads += 1;
259             // std:.cout << "add_dead " << session_id << " " << target 
260             //          << " d:" << itarg->second.deads << "\n";
261         }
262     }
263 };
264
265 //void yf::LoadBalance::Impl::clear_dead(unsigned long session_id){
266 //    std::cout << "clear_dead " << session_id << "\n";
267 //};
268
269 void yf::LoadBalance::Impl::add_package(unsigned long session_id){
270
271     std::string target = find_session_target(session_id);
272
273     if (target.size() != 0){
274         std::map<std::string, TargetStat>::iterator itarg;        
275         itarg = m_target_stat.find(target);
276         if (itarg != m_target_stat.end()
277             && itarg->second.packages 
278                < std::numeric_limits<unsigned int>::max()){
279             itarg->second.packages += 1;
280             // std:.cout << "add_package " << session_id << " " << target 
281             //          << " p:" << itarg->second.packages << "\n";
282         }
283     }
284 };
285
286 void yf::LoadBalance::Impl::remove_package(unsigned long session_id){
287     std::string target = find_session_target(session_id);
288
289     if (target.size() != 0){
290         std::map<std::string, TargetStat>::iterator itarg;        
291         itarg = m_target_stat.find(target);
292         if (itarg != m_target_stat.end()
293             && itarg->second.packages > 0){
294             itarg->second.packages -= 1;
295             // std:.cout << "remove_package " << session_id << " " << target 
296             //          << " p:" << itarg->second.packages << "\n";
297         }
298     }
299 };
300
301 void yf::LoadBalance::Impl::add_session(unsigned long session_id, 
302                                         std::string target){
303
304     // finding and adding session
305     std::map<unsigned long, std::string>::iterator isess;
306     isess = m_session_target.find(session_id);
307     if (isess == m_session_target.end()){
308         m_session_target.insert(std::make_pair(session_id, target));
309     }
310
311     // finding and adding target statistics
312     std::map<std::string, TargetStat>::iterator itarg;
313     itarg = m_target_stat.find(target);
314     if (itarg == m_target_stat.end()){
315         TargetStat stat;
316         stat.sessions = 1;
317         stat.packages = 0;  // no idea why the defaut constructor TargetStat()
318         stat.deads = 0;     // is not initializig this correctly to zero ??
319        m_target_stat.insert(std::make_pair(target, stat));
320         // std:.cout << "add_session " << session_id << " " << target 
321         //          << " s:1\n";
322     } 
323     else if (itarg->second.sessions < std::numeric_limits<unsigned int>::max())
324     {
325         itarg->second.sessions += 1;
326         // std:.cout << "add_session " << session_id << " " << target 
327         //          << " s:" << itarg->second.sessions << "\n";
328     }
329 };
330
331 void yf::LoadBalance::Impl::remove_session(unsigned long session_id){
332
333     std::string target;
334
335     // finding session
336     std::map<unsigned long, std::string>::iterator isess;
337     isess = m_session_target.find(session_id);
338     if (isess == m_session_target.end())
339         return;
340     else
341         target = isess->second;
342
343     // finding target statistics
344     std::map<std::string, TargetStat>::iterator itarg;
345     itarg = m_target_stat.find(target);
346     if (itarg == m_target_stat.end()){
347         m_session_target.erase(isess);
348         return;
349     }
350     
351     // counting session down
352     if (itarg->second.sessions > 0)
353         itarg->second.sessions -= 1;
354
355     // std:.cout << "remove_session " << session_id << " " << target 
356     //          << " s:" << itarg->second.sessions << "\n";
357     
358     // clearing empty sessions and targets
359     if (itarg->second.sessions == 0 && itarg->second.deads == 0 ){
360         m_target_stat.erase(itarg);
361         m_session_target.erase(isess);
362     }
363 };
364
365 std::string 
366 yf::LoadBalance::Impl::find_session_target(unsigned long session_id){
367
368     std::string target;
369     std::map<unsigned long, std::string>::iterator isess;
370     isess = m_session_target.find(session_id);
371     if (isess != m_session_target.end())
372         target = isess->second;
373     return target;
374 }
375
376
377 // cost functions
378 unsigned int yf::LoadBalance::Impl::cost(std::string target){
379
380     unsigned int cost;
381
382     if (target.size() != 0){
383         std::map<std::string, TargetStat>::iterator itarg;        
384         itarg = m_target_stat.find(target);
385         if (itarg != m_target_stat.end()){
386             cost = itarg->second.cost();
387         }
388     }
389     
390     //std::cout << "cost " << target << " c:" << cost << "\n";
391     return cost;
392 };
393
394 unsigned int yf::LoadBalance::Impl::dead(std::string target){
395
396     unsigned int dead;
397
398     if (target.size() != 0){
399         std::map<std::string, TargetStat>::iterator itarg;        
400         itarg = m_target_stat.find(target);
401         if (itarg != m_target_stat.end()){
402             dead = itarg->second.deads;
403         }
404     }
405     
406     //std::cout << "dead " << target << " d:" << dead << "\n";
407     return dead;
408 };
409
410
411
412
413 static mp::filter::Base* filter_creator()
414 {
415     return new mp::filter::LoadBalance;
416 }
417
418 extern "C" {
419     struct metaproxy_1_filter_struct metaproxy_1_filter_load_balance = {
420         0,
421         "load_balance",
422         filter_creator
423     };
424 }
425
426
427 /*
428  * Local variables:
429  * c-basic-offset: 4
430  * c-file-style: "Stroustrup"
431  * indent-tabs-mode: nil
432  * End:
433  * vim: shiftwidth=4 tabstop=8 expandtab
434  */
435