removed unnecessary debug output statements
[metaproxy-moved-to-github.git] / src / filter_load_balance.cpp
1 /* $Id: filter_load_balance.cpp,v 1.5 2007-01-04 13:22:56 marc Exp $
2    Copyright (c) 2005-2006, Index Data.
3
4    See the LICENSE file for details
5  */
6
7 #include "config.hpp"
8 #include "session.hpp"
9 #include "package.hpp"
10 #include "filter.hpp"
11 #include "filter_load_balance.hpp"
12 #include "util.hpp"
13
14 #include <boost/thread/mutex.hpp>
15 #include <boost/date_time/posix_time/posix_time.hpp>
16
17 #include <yaz/zgdu.h>
18
19 //#include <iostream>
20 #include <list>
21 #include <map>
22 #include <limits>
23
24 namespace mp = metaproxy_1;
25 namespace yf = mp::filter;
26
27 namespace metaproxy_1 {
28     namespace filter {
29         class LoadBalance::Impl {
30         public:
31             Impl();
32             ~Impl();
33             void process(metaproxy_1::Package & package);
34             void configure(const xmlNode * ptr);
35         private:
36             // statistic manipulating functions, 
37             void add_dead(unsigned long session_id);
38             //void clear_dead(unsigned long session_id);
39             void add_package(unsigned long session_id);
40             void remove_package(unsigned long session_id);
41             void add_session(unsigned long session_id, std::string target);
42             void remove_session(unsigned long session_id);
43             std::string find_session_target(unsigned long session_id);
44
45             // cost functions
46             unsigned int cost(std::string target);
47             unsigned int dead(std::string target);
48
49             // local classes
50             class TargetStat {
51             public:
52                 unsigned int sessions;
53                 unsigned int packages;
54                 unsigned int deads;
55                 unsigned int cost() {
56                     unsigned int c = sessions + packages + deads;
57                     //std::cout << "stats  c:" << c 
58                     //          << " s:" << sessions 
59                     //          << " p:" << packages 
60                     //          << " d:" << deads 
61                     //          <<"\n";
62                     return c;
63                 }
64             };
65
66             // local protected databases
67             boost::mutex m_mutex;
68             std::map<std::string, TargetStat> m_target_stat;
69             std::map<unsigned long, std::string> m_session_target;
70         };
71     }
72 }
73
74 // define Pimpl wrapper forwarding to Impl
75  
76 yf::LoadBalance::LoadBalance() : m_p(new Impl)
77 {
78 }
79
80 yf::LoadBalance::~LoadBalance()
81 {  // must have a destructor because of boost::scoped_ptr
82 }
83
84 void yf::LoadBalance::configure(const xmlNode *xmlnode)
85 {
86     m_p->configure(xmlnode);
87 }
88
89 void yf::LoadBalance::process(mp::Package &package) const
90 {
91     m_p->process(package);
92 }
93
94
95 // define Implementation stuff
96
97
98
99 yf::LoadBalance::Impl::Impl()
100 {
101 }
102
103 yf::LoadBalance::Impl::~Impl()
104
105 }
106
107 void yf::LoadBalance::Impl::configure(const xmlNode *xmlnode)
108 {
109 }
110
111 void yf::LoadBalance::Impl::process(mp::Package &package)
112 {
113
114     bool is_closed_front = false;
115     bool is_closed_back = false;
116
117     // checking for closed front end packages
118     if (package.session().is_closed()){
119         is_closed_front = true;
120     }    
121
122     Z_GDU *gdu_req = package.request().get();
123
124     // passing anything but z3950 packages
125     if (gdu_req && gdu_req->which == Z_GDU_Z3950){
126
127         // target selecting only on Z39.50 init request
128         if (gdu_req->u.z3950->which == Z_APDU_initRequest){
129
130             mp::odr odr_en(ODR_ENCODE);
131             Z_InitRequest *org_init = gdu_req->u.z3950->u.initRequest;
132
133             // extracting virtual hosts
134             std::list<std::string> vhosts;
135             
136             mp::util::remove_vhost_otherinfo(&(org_init->otherInfo), vhosts);
137             
138             // choosing one target according to load-balancing algorithm  
139             
140             if (vhosts.size()){
141                 std::string target;
142                 unsigned int cost = std::numeric_limits<unsigned int>::max();
143                 
144                 { //locking scope for local databases
145                     boost::mutex::scoped_lock scoped_lock(m_mutex);
146                     
147                     // load-balancing algorithm goes here
148                     //target = *vhosts.begin();
149                     for(std::list<std::string>::const_iterator ivh 
150                             = vhosts.begin(); 
151                         ivh != vhosts.end();
152                         ivh++){
153                         if ((*ivh).size() != 0){
154                             unsigned int vhcost 
155                                 = yf::LoadBalance::Impl::cost(*ivh);
156                             if (cost > vhcost){
157                                 cost = vhcost;
158                                 target = *ivh;
159                             }
160                         }
161                      } 
162
163                     // updating local database
164                     add_session(package.session().id(), target);
165                     yf::LoadBalance::Impl::cost(target);
166                     add_package(package.session().id());
167                 }
168                 
169                 // copying new target into init package
170                 mp::util::set_vhost_otherinfo(&(org_init->otherInfo), 
171                                               odr_en, target); 
172                 package.request() = gdu_req;
173         }
174             
175         }
176         // frontend Z39.50 close request is added to statistics and marked
177         else if (gdu_req->u.z3950->which == Z_APDU_close){
178             is_closed_front = true;
179             boost::mutex::scoped_lock scoped_lock(m_mutex);        
180             add_package(package.session().id());
181         }    
182         // any other Z39.50 package is added to statistics
183         else {
184             boost::mutex::scoped_lock scoped_lock(m_mutex);        
185             add_package(package.session().id());
186         }
187     }
188         
189     // moving all package types 
190     package.move();
191
192
193     // checking for closed back end packages
194     if (package.session().is_closed())
195         is_closed_back = true;
196
197     Z_GDU *gdu_res = package.response().get();
198
199     // passing anything but z3950 packages
200     if (gdu_res && gdu_res->which == Z_GDU_Z3950){
201
202         // session closing only on Z39.50 close response
203         if (gdu_res->u.z3950->which == Z_APDU_close){
204             is_closed_back = true;
205             boost::mutex::scoped_lock scoped_lock(m_mutex);
206             remove_package(package.session().id());
207         } 
208         // any other Z39.50 package is removed from statistics
209         else {
210             boost::mutex::scoped_lock scoped_lock(m_mutex);
211             remove_package(package.session().id());
212         }
213     }
214
215     // finally removing sessions and marking deads
216     if (is_closed_back || is_closed_front){        
217         boost::mutex::scoped_lock scoped_lock(m_mutex);
218
219         // marking backend dead if backend closed without fronted close 
220         if (is_closed_front == false)
221             add_dead(package.session().id());
222
223         remove_session(package.session().id());
224
225         // making sure that package is closed
226         package.session().close();
227     }
228 }
229             
230 // getting timestamp for receiving of package
231 //boost::posix_time::ptime receive_time
232 //    = boost::posix_time::microsec_clock::local_time();
233 // //<< receive_time << " "
234 // //<< to_iso_string(receive_time) << " "
235 //<< to_iso_extended_string(receive_time) << " "
236
237
238 // statistic manipulating functions, 
239 void yf::LoadBalance::Impl::add_dead(unsigned long session_id){
240
241     
242     std::string target = find_session_target(session_id);
243
244     if (target.size() != 0){
245         std::map<std::string, TargetStat>::iterator itarg;        
246         itarg = m_target_stat.find(target);
247         if (itarg != m_target_stat.end()
248             && itarg->second.deads < std::numeric_limits<unsigned int>::max()){
249             itarg->second.deads += 1;
250             // std:.cout << "add_dead " << session_id << " " << target 
251             //          << " d:" << itarg->second.deads << "\n";
252         }
253     }
254 };
255
256 //void yf::LoadBalance::Impl::clear_dead(unsigned long session_id){
257 //    std::cout << "clear_dead " << session_id << "\n";
258 //};
259
260 void yf::LoadBalance::Impl::add_package(unsigned long session_id){
261
262     std::string target = find_session_target(session_id);
263
264     if (target.size() != 0){
265         std::map<std::string, TargetStat>::iterator itarg;        
266         itarg = m_target_stat.find(target);
267         if (itarg != m_target_stat.end()
268             && itarg->second.packages 
269                < std::numeric_limits<unsigned int>::max()){
270             itarg->second.packages += 1;
271             // std:.cout << "add_package " << session_id << " " << target 
272             //          << " p:" << itarg->second.packages << "\n";
273         }
274     }
275 };
276
277 void yf::LoadBalance::Impl::remove_package(unsigned long session_id){
278     std::string target = find_session_target(session_id);
279
280     if (target.size() != 0){
281         std::map<std::string, TargetStat>::iterator itarg;        
282         itarg = m_target_stat.find(target);
283         if (itarg != m_target_stat.end()
284             && itarg->second.packages > 0){
285             itarg->second.packages -= 1;
286             // std:.cout << "remove_package " << session_id << " " << target 
287             //          << " p:" << itarg->second.packages << "\n";
288         }
289     }
290 };
291
292 void yf::LoadBalance::Impl::add_session(unsigned long session_id, 
293                                         std::string target){
294
295     // finding and adding session
296     std::map<unsigned long, std::string>::iterator isess;
297     isess = m_session_target.find(session_id);
298     if (isess == m_session_target.end()){
299         m_session_target.insert(std::make_pair(session_id, target));
300     }
301
302     // finding and adding target statistics
303     std::map<std::string, TargetStat>::iterator itarg;
304     itarg = m_target_stat.find(target);
305     if (itarg == m_target_stat.end()){
306         TargetStat stat;
307         stat.sessions = 1;
308         stat.packages = 0;  // no idea why the defaut constructor TargetStat()
309         stat.deads = 0;     // is not initializig this correctly to zero ??
310        m_target_stat.insert(std::make_pair(target, stat));
311         // std:.cout << "add_session " << session_id << " " << target 
312         //          << " s:1\n";
313     } 
314     else if (itarg->second.sessions < std::numeric_limits<unsigned int>::max())
315     {
316         itarg->second.sessions += 1;
317         // std:.cout << "add_session " << session_id << " " << target 
318         //          << " s:" << itarg->second.sessions << "\n";
319     }
320 };
321
322 void yf::LoadBalance::Impl::remove_session(unsigned long session_id){
323
324     std::string target;
325
326     // finding session
327     std::map<unsigned long, std::string>::iterator isess;
328     isess = m_session_target.find(session_id);
329     if (isess == m_session_target.end())
330         return;
331     else
332         target = isess->second;
333
334     // finding target statistics
335     std::map<std::string, TargetStat>::iterator itarg;
336     itarg = m_target_stat.find(target);
337     if (itarg == m_target_stat.end()){
338         m_session_target.erase(isess);
339         return;
340     }
341     
342     // counting session down
343     if (itarg->second.sessions > 0)
344         itarg->second.sessions -= 1;
345
346     // std:.cout << "remove_session " << session_id << " " << target 
347     //          << " s:" << itarg->second.sessions << "\n";
348     
349     // clearing empty sessions and targets
350     if (itarg->second.sessions == 0 && itarg->second.deads == 0 ){
351         m_target_stat.erase(itarg);
352         m_session_target.erase(isess);
353     }
354 };
355
356 std::string 
357 yf::LoadBalance::Impl::find_session_target(unsigned long session_id){
358
359     std::string target;
360     std::map<unsigned long, std::string>::iterator isess;
361     isess = m_session_target.find(session_id);
362     if (isess != m_session_target.end())
363         target = isess->second;
364     return target;
365 }
366
367
368 // cost functions
369 unsigned int yf::LoadBalance::Impl::cost(std::string target){
370
371     unsigned int cost;
372
373     if (target.size() != 0){
374         std::map<std::string, TargetStat>::iterator itarg;        
375         itarg = m_target_stat.find(target);
376         if (itarg != m_target_stat.end()){
377             cost = itarg->second.cost();
378         }
379     }
380     
381     //std::cout << "cost " << target << " c:" << cost << "\n";
382     return cost;
383 };
384
385 unsigned int yf::LoadBalance::Impl::dead(std::string target){
386
387     unsigned int dead;
388
389     if (target.size() != 0){
390         std::map<std::string, TargetStat>::iterator itarg;        
391         itarg = m_target_stat.find(target);
392         if (itarg != m_target_stat.end()){
393             dead = itarg->second.deads;
394         }
395     }
396     
397     //std::cout << "dead " << target << " d:" << dead << "\n";
398     return dead;
399 };
400
401
402
403
404 static mp::filter::Base* filter_creator()
405 {
406     return new mp::filter::LoadBalance;
407 }
408
409 extern "C" {
410     struct metaproxy_1_filter_struct metaproxy_1_filter_load_balance = {
411         0,
412         "load_balance",
413         filter_creator
414     };
415 }
416
417
418 /*
419  * Local variables:
420  * c-basic-offset: 4
421  * indent-tabs-mode: nil
422  * c-file-style: "stroustrup"
423  * End:
424  * vim: shiftwidth=4 tabstop=8 expandtab
425  */