1cd189f2cc2b4cba7de614dca4fa96a2542c8105
[metaproxy-moved-to-github.git] / src / filter_load_balance.cpp
1 /* $Id: filter_load_balance.cpp,v 1.7 2007-01-25 14:05:54 adam Exp $
2    Copyright (c) 2005-2007, Index Data.
3
4    See the LICENSE file for details
5  */
6
7 #include "config.hpp"
8 #include "session.hpp"
9 #include "package.hpp"
10 #include "filter.hpp"
11 #include "filter_load_balance.hpp"
12 #include "util.hpp"
13
14
15 #include <boost/thread/mutex.hpp>
16 #include <boost/date_time/posix_time/posix_time.hpp>
17
18 #include <yaz/zgdu.h>
19
20 // remove max macro if already defined (defined later in <limits>)
21 #ifdef max
22 #undef max
23 #endif
24
25 //#include <iostream>
26 #include <list>
27 #include <map>
28 #include <limits>
29
30 namespace mp = metaproxy_1;
31 namespace yf = mp::filter;
32
33 namespace metaproxy_1 {
34     namespace filter {
35         class LoadBalance::Impl {
36         public:
37             Impl();
38             ~Impl();
39             void process(metaproxy_1::Package & package);
40             void configure(const xmlNode * ptr);
41         private:
42             // statistic manipulating functions, 
43             void add_dead(unsigned long session_id);
44             //void clear_dead(unsigned long session_id);
45             void add_package(unsigned long session_id);
46             void remove_package(unsigned long session_id);
47             void add_session(unsigned long session_id, std::string target);
48             void remove_session(unsigned long session_id);
49             std::string find_session_target(unsigned long session_id);
50
51             // cost functions
52             unsigned int cost(std::string target);
53             unsigned int dead(std::string target);
54
55             // local classes
56             class TargetStat {
57             public:
58                 unsigned int sessions;
59                 unsigned int packages;
60                 unsigned int deads;
61                 unsigned int cost() {
62                     unsigned int c = sessions + packages + deads;
63                     //std::cout << "stats  c:" << c 
64                     //          << " s:" << sessions 
65                     //          << " p:" << packages 
66                     //          << " d:" << deads 
67                     //          <<"\n";
68                     return c;
69                 }
70             };
71
72             // local protected databases
73             boost::mutex m_mutex;
74             std::map<std::string, TargetStat> m_target_stat;
75             std::map<unsigned long, std::string> m_session_target;
76         };
77     }
78 }
79
80 // define Pimpl wrapper forwarding to Impl
81  
82 yf::LoadBalance::LoadBalance() : m_p(new Impl)
83 {
84 }
85
86 yf::LoadBalance::~LoadBalance()
87 {  // must have a destructor because of boost::scoped_ptr
88 }
89
90 void yf::LoadBalance::configure(const xmlNode *xmlnode)
91 {
92     m_p->configure(xmlnode);
93 }
94
95 void yf::LoadBalance::process(mp::Package &package) const
96 {
97     m_p->process(package);
98 }
99
100
101 // define Implementation stuff
102
103
104
105 yf::LoadBalance::Impl::Impl()
106 {
107 }
108
109 yf::LoadBalance::Impl::~Impl()
110
111 }
112
113 void yf::LoadBalance::Impl::configure(const xmlNode *xmlnode)
114 {
115 }
116
117 void yf::LoadBalance::Impl::process(mp::Package &package)
118 {
119
120     bool is_closed_front = false;
121     bool is_closed_back = false;
122
123     // checking for closed front end packages
124     if (package.session().is_closed()){
125         is_closed_front = true;
126     }    
127
128     Z_GDU *gdu_req = package.request().get();
129
130     // passing anything but z3950 packages
131     if (gdu_req && gdu_req->which == Z_GDU_Z3950){
132
133         // target selecting only on Z39.50 init request
134         if (gdu_req->u.z3950->which == Z_APDU_initRequest){
135
136             mp::odr odr_en(ODR_ENCODE);
137             Z_InitRequest *org_init = gdu_req->u.z3950->u.initRequest;
138
139             // extracting virtual hosts
140             std::list<std::string> vhosts;
141             
142             mp::util::remove_vhost_otherinfo(&(org_init->otherInfo), vhosts);
143             
144             // choosing one target according to load-balancing algorithm  
145             
146             if (vhosts.size()){
147                 std::string target;
148                 unsigned int cost = std::numeric_limits<unsigned int>::max();
149                 
150                 { //locking scope for local databases
151                     boost::mutex::scoped_lock scoped_lock(m_mutex);
152                     
153                     // load-balancing algorithm goes here
154                     //target = *vhosts.begin();
155                     for(std::list<std::string>::const_iterator ivh 
156                             = vhosts.begin(); 
157                         ivh != vhosts.end();
158                         ivh++){
159                         if ((*ivh).size() != 0){
160                             unsigned int vhcost 
161                                 = yf::LoadBalance::Impl::cost(*ivh);
162                             if (cost > vhcost){
163                                 cost = vhcost;
164                                 target = *ivh;
165                             }
166                         }
167                      } 
168
169                     // updating local database
170                     add_session(package.session().id(), target);
171                     yf::LoadBalance::Impl::cost(target);
172                     add_package(package.session().id());
173                 }
174                 
175                 // copying new target into init package
176                 mp::util::set_vhost_otherinfo(&(org_init->otherInfo), 
177                                               odr_en, target); 
178                 package.request() = gdu_req;
179         }
180             
181         }
182         // frontend Z39.50 close request is added to statistics and marked
183         else if (gdu_req->u.z3950->which == Z_APDU_close){
184             is_closed_front = true;
185             boost::mutex::scoped_lock scoped_lock(m_mutex);        
186             add_package(package.session().id());
187         }    
188         // any other Z39.50 package is added to statistics
189         else {
190             boost::mutex::scoped_lock scoped_lock(m_mutex);        
191             add_package(package.session().id());
192         }
193     }
194         
195     // moving all package types 
196     package.move();
197
198
199     // checking for closed back end packages
200     if (package.session().is_closed())
201         is_closed_back = true;
202
203     Z_GDU *gdu_res = package.response().get();
204
205     // passing anything but z3950 packages
206     if (gdu_res && gdu_res->which == Z_GDU_Z3950){
207
208         // session closing only on Z39.50 close response
209         if (gdu_res->u.z3950->which == Z_APDU_close){
210             is_closed_back = true;
211             boost::mutex::scoped_lock scoped_lock(m_mutex);
212             remove_package(package.session().id());
213         } 
214         // any other Z39.50 package is removed from statistics
215         else {
216             boost::mutex::scoped_lock scoped_lock(m_mutex);
217             remove_package(package.session().id());
218         }
219     }
220
221     // finally removing sessions and marking deads
222     if (is_closed_back || is_closed_front){        
223         boost::mutex::scoped_lock scoped_lock(m_mutex);
224
225         // marking backend dead if backend closed without fronted close 
226         if (is_closed_front == false)
227             add_dead(package.session().id());
228
229         remove_session(package.session().id());
230
231         // making sure that package is closed
232         package.session().close();
233     }
234 }
235             
236 // getting timestamp for receiving of package
237 //boost::posix_time::ptime receive_time
238 //    = boost::posix_time::microsec_clock::local_time();
239 // //<< receive_time << " "
240 // //<< to_iso_string(receive_time) << " "
241 //<< to_iso_extended_string(receive_time) << " "
242
243
244 // statistic manipulating functions, 
245 void yf::LoadBalance::Impl::add_dead(unsigned long session_id){
246
247     
248     std::string target = find_session_target(session_id);
249
250     if (target.size() != 0){
251         std::map<std::string, TargetStat>::iterator itarg;        
252         itarg = m_target_stat.find(target);
253         if (itarg != m_target_stat.end()
254             && itarg->second.deads < std::numeric_limits<unsigned int>::max()){
255             itarg->second.deads += 1;
256             // std:.cout << "add_dead " << session_id << " " << target 
257             //          << " d:" << itarg->second.deads << "\n";
258         }
259     }
260 };
261
262 //void yf::LoadBalance::Impl::clear_dead(unsigned long session_id){
263 //    std::cout << "clear_dead " << session_id << "\n";
264 //};
265
266 void yf::LoadBalance::Impl::add_package(unsigned long session_id){
267
268     std::string target = find_session_target(session_id);
269
270     if (target.size() != 0){
271         std::map<std::string, TargetStat>::iterator itarg;        
272         itarg = m_target_stat.find(target);
273         if (itarg != m_target_stat.end()
274             && itarg->second.packages 
275                < std::numeric_limits<unsigned int>::max()){
276             itarg->second.packages += 1;
277             // std:.cout << "add_package " << session_id << " " << target 
278             //          << " p:" << itarg->second.packages << "\n";
279         }
280     }
281 };
282
283 void yf::LoadBalance::Impl::remove_package(unsigned long session_id){
284     std::string target = find_session_target(session_id);
285
286     if (target.size() != 0){
287         std::map<std::string, TargetStat>::iterator itarg;        
288         itarg = m_target_stat.find(target);
289         if (itarg != m_target_stat.end()
290             && itarg->second.packages > 0){
291             itarg->second.packages -= 1;
292             // std:.cout << "remove_package " << session_id << " " << target 
293             //          << " p:" << itarg->second.packages << "\n";
294         }
295     }
296 };
297
298 void yf::LoadBalance::Impl::add_session(unsigned long session_id, 
299                                         std::string target){
300
301     // finding and adding session
302     std::map<unsigned long, std::string>::iterator isess;
303     isess = m_session_target.find(session_id);
304     if (isess == m_session_target.end()){
305         m_session_target.insert(std::make_pair(session_id, target));
306     }
307
308     // finding and adding target statistics
309     std::map<std::string, TargetStat>::iterator itarg;
310     itarg = m_target_stat.find(target);
311     if (itarg == m_target_stat.end()){
312         TargetStat stat;
313         stat.sessions = 1;
314         stat.packages = 0;  // no idea why the defaut constructor TargetStat()
315         stat.deads = 0;     // is not initializig this correctly to zero ??
316        m_target_stat.insert(std::make_pair(target, stat));
317         // std:.cout << "add_session " << session_id << " " << target 
318         //          << " s:1\n";
319     } 
320     else if (itarg->second.sessions < std::numeric_limits<unsigned int>::max())
321     {
322         itarg->second.sessions += 1;
323         // std:.cout << "add_session " << session_id << " " << target 
324         //          << " s:" << itarg->second.sessions << "\n";
325     }
326 };
327
328 void yf::LoadBalance::Impl::remove_session(unsigned long session_id){
329
330     std::string target;
331
332     // finding session
333     std::map<unsigned long, std::string>::iterator isess;
334     isess = m_session_target.find(session_id);
335     if (isess == m_session_target.end())
336         return;
337     else
338         target = isess->second;
339
340     // finding target statistics
341     std::map<std::string, TargetStat>::iterator itarg;
342     itarg = m_target_stat.find(target);
343     if (itarg == m_target_stat.end()){
344         m_session_target.erase(isess);
345         return;
346     }
347     
348     // counting session down
349     if (itarg->second.sessions > 0)
350         itarg->second.sessions -= 1;
351
352     // std:.cout << "remove_session " << session_id << " " << target 
353     //          << " s:" << itarg->second.sessions << "\n";
354     
355     // clearing empty sessions and targets
356     if (itarg->second.sessions == 0 && itarg->second.deads == 0 ){
357         m_target_stat.erase(itarg);
358         m_session_target.erase(isess);
359     }
360 };
361
362 std::string 
363 yf::LoadBalance::Impl::find_session_target(unsigned long session_id){
364
365     std::string target;
366     std::map<unsigned long, std::string>::iterator isess;
367     isess = m_session_target.find(session_id);
368     if (isess != m_session_target.end())
369         target = isess->second;
370     return target;
371 }
372
373
374 // cost functions
375 unsigned int yf::LoadBalance::Impl::cost(std::string target){
376
377     unsigned int cost;
378
379     if (target.size() != 0){
380         std::map<std::string, TargetStat>::iterator itarg;        
381         itarg = m_target_stat.find(target);
382         if (itarg != m_target_stat.end()){
383             cost = itarg->second.cost();
384         }
385     }
386     
387     //std::cout << "cost " << target << " c:" << cost << "\n";
388     return cost;
389 };
390
391 unsigned int yf::LoadBalance::Impl::dead(std::string target){
392
393     unsigned int dead;
394
395     if (target.size() != 0){
396         std::map<std::string, TargetStat>::iterator itarg;        
397         itarg = m_target_stat.find(target);
398         if (itarg != m_target_stat.end()){
399             dead = itarg->second.deads;
400         }
401     }
402     
403     //std::cout << "dead " << target << " d:" << dead << "\n";
404     return dead;
405 };
406
407
408
409
410 static mp::filter::Base* filter_creator()
411 {
412     return new mp::filter::LoadBalance;
413 }
414
415 extern "C" {
416     struct metaproxy_1_filter_struct metaproxy_1_filter_load_balance = {
417         0,
418         "load_balance",
419         filter_creator
420     };
421 }
422
423
424 /*
425  * Local variables:
426  * c-basic-offset: 4
427  * indent-tabs-mode: nil
428  * c-file-style: "stroustrup"
429  * End:
430  * vim: shiftwidth=4 tabstop=8 expandtab
431  */