GPL v2.
[metaproxy-moved-to-github.git] / src / filter_load_balance.cpp
1 /* $Id: filter_load_balance.cpp,v 1.8 2007-05-09 21:23:09 adam Exp $
2    Copyright (c) 2005-2007, Index Data.
3
4 This file is part of Metaproxy.
5
6 Metaproxy is free software; you can redistribute it and/or modify it under
7 the terms of the GNU General Public License as published by the Free
8 Software Foundation; either version 2, or (at your option) any later
9 version.
10
11 Metaproxy is distributed in the hope that it will be useful, but WITHOUT ANY
12 WARRANTY; without even the implied warranty of MERCHANTABILITY or
13 FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
14 for more details.
15
16 You should have received a copy of the GNU General Public License
17 along with Metaproxy; see the file LICENSE.  If not, write to the
18 Free Software Foundation, 59 Temple Place - Suite 330, Boston, MA
19 02111-1307, USA.
20  */
21
22 #include "config.hpp"
23 #include "session.hpp"
24 #include "package.hpp"
25 #include "filter.hpp"
26 #include "filter_load_balance.hpp"
27 #include "util.hpp"
28
29
30 #include <boost/thread/mutex.hpp>
31 #include <boost/date_time/posix_time/posix_time.hpp>
32
33 #include <yaz/zgdu.h>
34
35 // remove max macro if already defined (defined later in <limits>)
36 #ifdef max
37 #undef max
38 #endif
39
40 //#include <iostream>
41 #include <list>
42 #include <map>
43 #include <limits>
44
45 namespace mp = metaproxy_1;
46 namespace yf = mp::filter;
47
48 namespace metaproxy_1 {
49     namespace filter {
50         class LoadBalance::Impl {
51         public:
52             Impl();
53             ~Impl();
54             void process(metaproxy_1::Package & package);
55             void configure(const xmlNode * ptr);
56         private:
57             // statistic manipulating functions, 
58             void add_dead(unsigned long session_id);
59             //void clear_dead(unsigned long session_id);
60             void add_package(unsigned long session_id);
61             void remove_package(unsigned long session_id);
62             void add_session(unsigned long session_id, std::string target);
63             void remove_session(unsigned long session_id);
64             std::string find_session_target(unsigned long session_id);
65
66             // cost functions
67             unsigned int cost(std::string target);
68             unsigned int dead(std::string target);
69
70             // local classes
71             class TargetStat {
72             public:
73                 unsigned int sessions;
74                 unsigned int packages;
75                 unsigned int deads;
76                 unsigned int cost() {
77                     unsigned int c = sessions + packages + deads;
78                     //std::cout << "stats  c:" << c 
79                     //          << " s:" << sessions 
80                     //          << " p:" << packages 
81                     //          << " d:" << deads 
82                     //          <<"\n";
83                     return c;
84                 }
85             };
86
87             // local protected databases
88             boost::mutex m_mutex;
89             std::map<std::string, TargetStat> m_target_stat;
90             std::map<unsigned long, std::string> m_session_target;
91         };
92     }
93 }
94
95 // define Pimpl wrapper forwarding to Impl
96  
97 yf::LoadBalance::LoadBalance() : m_p(new Impl)
98 {
99 }
100
101 yf::LoadBalance::~LoadBalance()
102 {  // must have a destructor because of boost::scoped_ptr
103 }
104
105 void yf::LoadBalance::configure(const xmlNode *xmlnode)
106 {
107     m_p->configure(xmlnode);
108 }
109
110 void yf::LoadBalance::process(mp::Package &package) const
111 {
112     m_p->process(package);
113 }
114
115
116 // define Implementation stuff
117
118
119
120 yf::LoadBalance::Impl::Impl()
121 {
122 }
123
124 yf::LoadBalance::Impl::~Impl()
125
126 }
127
128 void yf::LoadBalance::Impl::configure(const xmlNode *xmlnode)
129 {
130 }
131
132 void yf::LoadBalance::Impl::process(mp::Package &package)
133 {
134
135     bool is_closed_front = false;
136     bool is_closed_back = false;
137
138     // checking for closed front end packages
139     if (package.session().is_closed()){
140         is_closed_front = true;
141     }    
142
143     Z_GDU *gdu_req = package.request().get();
144
145     // passing anything but z3950 packages
146     if (gdu_req && gdu_req->which == Z_GDU_Z3950){
147
148         // target selecting only on Z39.50 init request
149         if (gdu_req->u.z3950->which == Z_APDU_initRequest){
150
151             mp::odr odr_en(ODR_ENCODE);
152             Z_InitRequest *org_init = gdu_req->u.z3950->u.initRequest;
153
154             // extracting virtual hosts
155             std::list<std::string> vhosts;
156             
157             mp::util::remove_vhost_otherinfo(&(org_init->otherInfo), vhosts);
158             
159             // choosing one target according to load-balancing algorithm  
160             
161             if (vhosts.size()){
162                 std::string target;
163                 unsigned int cost = std::numeric_limits<unsigned int>::max();
164                 
165                 { //locking scope for local databases
166                     boost::mutex::scoped_lock scoped_lock(m_mutex);
167                     
168                     // load-balancing algorithm goes here
169                     //target = *vhosts.begin();
170                     for(std::list<std::string>::const_iterator ivh 
171                             = vhosts.begin(); 
172                         ivh != vhosts.end();
173                         ivh++){
174                         if ((*ivh).size() != 0){
175                             unsigned int vhcost 
176                                 = yf::LoadBalance::Impl::cost(*ivh);
177                             if (cost > vhcost){
178                                 cost = vhcost;
179                                 target = *ivh;
180                             }
181                         }
182                      } 
183
184                     // updating local database
185                     add_session(package.session().id(), target);
186                     yf::LoadBalance::Impl::cost(target);
187                     add_package(package.session().id());
188                 }
189                 
190                 // copying new target into init package
191                 mp::util::set_vhost_otherinfo(&(org_init->otherInfo), 
192                                               odr_en, target); 
193                 package.request() = gdu_req;
194         }
195             
196         }
197         // frontend Z39.50 close request is added to statistics and marked
198         else if (gdu_req->u.z3950->which == Z_APDU_close){
199             is_closed_front = true;
200             boost::mutex::scoped_lock scoped_lock(m_mutex);        
201             add_package(package.session().id());
202         }    
203         // any other Z39.50 package is added to statistics
204         else {
205             boost::mutex::scoped_lock scoped_lock(m_mutex);        
206             add_package(package.session().id());
207         }
208     }
209         
210     // moving all package types 
211     package.move();
212
213
214     // checking for closed back end packages
215     if (package.session().is_closed())
216         is_closed_back = true;
217
218     Z_GDU *gdu_res = package.response().get();
219
220     // passing anything but z3950 packages
221     if (gdu_res && gdu_res->which == Z_GDU_Z3950){
222
223         // session closing only on Z39.50 close response
224         if (gdu_res->u.z3950->which == Z_APDU_close){
225             is_closed_back = true;
226             boost::mutex::scoped_lock scoped_lock(m_mutex);
227             remove_package(package.session().id());
228         } 
229         // any other Z39.50 package is removed from statistics
230         else {
231             boost::mutex::scoped_lock scoped_lock(m_mutex);
232             remove_package(package.session().id());
233         }
234     }
235
236     // finally removing sessions and marking deads
237     if (is_closed_back || is_closed_front){        
238         boost::mutex::scoped_lock scoped_lock(m_mutex);
239
240         // marking backend dead if backend closed without fronted close 
241         if (is_closed_front == false)
242             add_dead(package.session().id());
243
244         remove_session(package.session().id());
245
246         // making sure that package is closed
247         package.session().close();
248     }
249 }
250             
251 // getting timestamp for receiving of package
252 //boost::posix_time::ptime receive_time
253 //    = boost::posix_time::microsec_clock::local_time();
254 // //<< receive_time << " "
255 // //<< to_iso_string(receive_time) << " "
256 //<< to_iso_extended_string(receive_time) << " "
257
258
259 // statistic manipulating functions, 
260 void yf::LoadBalance::Impl::add_dead(unsigned long session_id){
261
262     
263     std::string target = find_session_target(session_id);
264
265     if (target.size() != 0){
266         std::map<std::string, TargetStat>::iterator itarg;        
267         itarg = m_target_stat.find(target);
268         if (itarg != m_target_stat.end()
269             && itarg->second.deads < std::numeric_limits<unsigned int>::max()){
270             itarg->second.deads += 1;
271             // std:.cout << "add_dead " << session_id << " " << target 
272             //          << " d:" << itarg->second.deads << "\n";
273         }
274     }
275 };
276
277 //void yf::LoadBalance::Impl::clear_dead(unsigned long session_id){
278 //    std::cout << "clear_dead " << session_id << "\n";
279 //};
280
281 void yf::LoadBalance::Impl::add_package(unsigned long session_id){
282
283     std::string target = find_session_target(session_id);
284
285     if (target.size() != 0){
286         std::map<std::string, TargetStat>::iterator itarg;        
287         itarg = m_target_stat.find(target);
288         if (itarg != m_target_stat.end()
289             && itarg->second.packages 
290                < std::numeric_limits<unsigned int>::max()){
291             itarg->second.packages += 1;
292             // std:.cout << "add_package " << session_id << " " << target 
293             //          << " p:" << itarg->second.packages << "\n";
294         }
295     }
296 };
297
298 void yf::LoadBalance::Impl::remove_package(unsigned long session_id){
299     std::string target = find_session_target(session_id);
300
301     if (target.size() != 0){
302         std::map<std::string, TargetStat>::iterator itarg;        
303         itarg = m_target_stat.find(target);
304         if (itarg != m_target_stat.end()
305             && itarg->second.packages > 0){
306             itarg->second.packages -= 1;
307             // std:.cout << "remove_package " << session_id << " " << target 
308             //          << " p:" << itarg->second.packages << "\n";
309         }
310     }
311 };
312
313 void yf::LoadBalance::Impl::add_session(unsigned long session_id, 
314                                         std::string target){
315
316     // finding and adding session
317     std::map<unsigned long, std::string>::iterator isess;
318     isess = m_session_target.find(session_id);
319     if (isess == m_session_target.end()){
320         m_session_target.insert(std::make_pair(session_id, target));
321     }
322
323     // finding and adding target statistics
324     std::map<std::string, TargetStat>::iterator itarg;
325     itarg = m_target_stat.find(target);
326     if (itarg == m_target_stat.end()){
327         TargetStat stat;
328         stat.sessions = 1;
329         stat.packages = 0;  // no idea why the defaut constructor TargetStat()
330         stat.deads = 0;     // is not initializig this correctly to zero ??
331        m_target_stat.insert(std::make_pair(target, stat));
332         // std:.cout << "add_session " << session_id << " " << target 
333         //          << " s:1\n";
334     } 
335     else if (itarg->second.sessions < std::numeric_limits<unsigned int>::max())
336     {
337         itarg->second.sessions += 1;
338         // std:.cout << "add_session " << session_id << " " << target 
339         //          << " s:" << itarg->second.sessions << "\n";
340     }
341 };
342
343 void yf::LoadBalance::Impl::remove_session(unsigned long session_id){
344
345     std::string target;
346
347     // finding session
348     std::map<unsigned long, std::string>::iterator isess;
349     isess = m_session_target.find(session_id);
350     if (isess == m_session_target.end())
351         return;
352     else
353         target = isess->second;
354
355     // finding target statistics
356     std::map<std::string, TargetStat>::iterator itarg;
357     itarg = m_target_stat.find(target);
358     if (itarg == m_target_stat.end()){
359         m_session_target.erase(isess);
360         return;
361     }
362     
363     // counting session down
364     if (itarg->second.sessions > 0)
365         itarg->second.sessions -= 1;
366
367     // std:.cout << "remove_session " << session_id << " " << target 
368     //          << " s:" << itarg->second.sessions << "\n";
369     
370     // clearing empty sessions and targets
371     if (itarg->second.sessions == 0 && itarg->second.deads == 0 ){
372         m_target_stat.erase(itarg);
373         m_session_target.erase(isess);
374     }
375 };
376
377 std::string 
378 yf::LoadBalance::Impl::find_session_target(unsigned long session_id){
379
380     std::string target;
381     std::map<unsigned long, std::string>::iterator isess;
382     isess = m_session_target.find(session_id);
383     if (isess != m_session_target.end())
384         target = isess->second;
385     return target;
386 }
387
388
389 // cost functions
390 unsigned int yf::LoadBalance::Impl::cost(std::string target){
391
392     unsigned int cost;
393
394     if (target.size() != 0){
395         std::map<std::string, TargetStat>::iterator itarg;        
396         itarg = m_target_stat.find(target);
397         if (itarg != m_target_stat.end()){
398             cost = itarg->second.cost();
399         }
400     }
401     
402     //std::cout << "cost " << target << " c:" << cost << "\n";
403     return cost;
404 };
405
406 unsigned int yf::LoadBalance::Impl::dead(std::string target){
407
408     unsigned int dead;
409
410     if (target.size() != 0){
411         std::map<std::string, TargetStat>::iterator itarg;        
412         itarg = m_target_stat.find(target);
413         if (itarg != m_target_stat.end()){
414             dead = itarg->second.deads;
415         }
416     }
417     
418     //std::cout << "dead " << target << " d:" << dead << "\n";
419     return dead;
420 };
421
422
423
424
425 static mp::filter::Base* filter_creator()
426 {
427     return new mp::filter::LoadBalance;
428 }
429
430 extern "C" {
431     struct metaproxy_1_filter_struct metaproxy_1_filter_load_balance = {
432         0,
433         "load_balance",
434         filter_creator
435     };
436 }
437
438
439 /*
440  * Local variables:
441  * c-basic-offset: 4
442  * indent-tabs-mode: nil
443  * c-file-style: "stroustrup"
444  * End:
445  * vim: shiftwidth=4 tabstop=8 expandtab
446  */