counting dead backends correctly
[metaproxy-moved-to-github.git] / src / filter_load_balance.cpp
1 /* $Id: filter_load_balance.cpp,v 1.3 2007-01-03 16:25:24 marc Exp $
2    Copyright (c) 2005-2006, Index Data.
3
4    See the LICENSE file for details
5  */
6
7 #include "config.hpp"
8 #include "session.hpp"
9 #include "package.hpp"
10 #include "filter.hpp"
11 #include "filter_load_balance.hpp"
12 #include "util.hpp"
13
14 #include <boost/thread/mutex.hpp>
15 #include <boost/date_time/posix_time/posix_time.hpp>
16
17 #include <yaz/zgdu.h>
18
19 #include <iostream>
20 #include <list>
21 #include <map>
22 #include <limits>
23
24 namespace mp = metaproxy_1;
25 namespace yf = mp::filter;
26
27 namespace metaproxy_1 {
28     namespace filter {
29         class LoadBalance::Impl {
30         public:
31             Impl();
32             ~Impl();
33             void process(metaproxy_1::Package & package);
34             void configure(const xmlNode * ptr);
35         private:
36             // statistic manipulating functions, 
37             void add_dead(unsigned long session_id);
38             //void clear_dead(unsigned long session_id);
39             void add_package(unsigned long session_id);
40             void remove_package(unsigned long session_id);
41             void add_session(unsigned long session_id, std::string target);
42             void remove_session(unsigned long session_id);
43             std::string find_session_target(unsigned long session_id);
44
45             // cost functions
46             unsigned int cost(std::string target);
47             unsigned int dead(std::string target);
48
49             // local classes
50             class TargetStat {
51             public:
52                 unsigned int sessions;
53                 unsigned int packages;
54                 unsigned int deads;
55                 unsigned int cost() {
56                     unsigned int c = sessions + packages + deads;
57                     std::cout << "stats  c:" << c 
58                               << " s:" << sessions 
59                               << " p:" << packages 
60                               << " d:" << deads 
61                               <<"\n";
62                     return c;
63                 }
64             };
65
66             // local protected databases
67             boost::mutex m_mutex;
68             std::map<std::string, TargetStat> m_target_stat;
69             std::map<unsigned long, std::string> m_session_target;
70         };
71     }
72 }
73
74 // define Pimpl wrapper forwarding to Impl
75  
76 yf::LoadBalance::LoadBalance() : m_p(new Impl)
77 {
78 }
79
80 yf::LoadBalance::~LoadBalance()
81 {  // must have a destructor because of boost::scoped_ptr
82 }
83
84 void yf::LoadBalance::configure(const xmlNode *xmlnode)
85 {
86     m_p->configure(xmlnode);
87 }
88
89 void yf::LoadBalance::process(mp::Package &package) const
90 {
91     m_p->process(package);
92 }
93
94
95 // define Implementation stuff
96
97
98
99 yf::LoadBalance::Impl::Impl()
100 {
101 }
102
103 yf::LoadBalance::Impl::~Impl()
104
105 }
106
107 void yf::LoadBalance::Impl::configure(const xmlNode *xmlnode)
108 {
109 }
110
111 void yf::LoadBalance::Impl::process(mp::Package &package)
112 {
113
114     bool is_closed_front = false;
115
116     // checking for closed front end packages
117     if (package.session().is_closed()){
118         is_closed_front = true;
119     }    
120
121     Z_GDU *gdu_req = package.request().get();
122
123     // passing anything but z3950 packages
124     if (gdu_req && gdu_req->which == Z_GDU_Z3950){
125
126         // target selecting only on Z39.50 init request
127         if (gdu_req->u.z3950->which == Z_APDU_initRequest){
128
129             mp::odr odr_en(ODR_ENCODE);
130             Z_InitRequest *org_init = gdu_req->u.z3950->u.initRequest;
131
132             // extracting virtual hosts
133             std::list<std::string> vhosts;
134             
135             mp::util::remove_vhost_otherinfo(&(org_init->otherInfo), vhosts);
136             
137             // choosing one target according to load-balancing algorithm  
138             
139             if (vhosts.size()){
140                 std::string target;
141                 unsigned int cost = std::numeric_limits<unsigned int>::max();
142                 
143                 { //locking scope for local databases
144                     boost::mutex::scoped_lock scoped_lock(m_mutex);
145                     
146                     // load-balancing algorithm goes here
147                     //target = *vhosts.begin();
148                     for(std::list<std::string>::const_iterator ivh 
149                             = vhosts.begin(); 
150                         ivh != vhosts.end();
151                         ivh++){
152                         if ((*ivh).size() != 0){
153                             unsigned int vhcost 
154                                 = yf::LoadBalance::Impl::cost(*ivh);
155                             if (cost > vhcost){
156                                 cost = vhcost;
157                                 target = *ivh;
158                             }
159                         }
160                      } 
161
162                     // updating local database
163                     add_session(package.session().id(), target);
164                     yf::LoadBalance::Impl::cost(target);
165                     add_package(package.session().id());
166                 }
167                 
168                 // copying new target into init package
169                 mp::util::set_vhost_otherinfo(&(org_init->otherInfo), 
170                                               odr_en, target); 
171                 package.request() = gdu_req;
172         }
173             
174         }
175         // frontend Z39.50 close request is added to statistics and marked
176         else if (gdu_req->u.z3950->which == Z_APDU_close){
177             is_closed_front = true;
178             boost::mutex::scoped_lock scoped_lock(m_mutex);        
179             add_package(package.session().id());
180         }    
181         // any other Z39.50 package is added to statistics
182         else {
183             boost::mutex::scoped_lock scoped_lock(m_mutex);        
184             add_package(package.session().id());
185         }
186     }
187         
188     // moving all package types 
189     package.move();
190
191
192     // checking for closed back end packages
193     if (package.session().is_closed()) {
194         boost::mutex::scoped_lock scoped_lock(m_mutex);
195
196         // marking backend dead if backend closed without fronted close 
197         if (is_closed_front == false)
198             add_dead(package.session().id());
199
200         remove_session(package.session().id());
201     }
202
203     Z_GDU *gdu_res = package.response().get();
204
205     // passing anything but z3950 packages
206     if (gdu_res && gdu_res->which == Z_GDU_Z3950){
207
208         // session closing only on Z39.50 close response
209         if (gdu_res->u.z3950->which == Z_APDU_close){
210             boost::mutex::scoped_lock scoped_lock(m_mutex);
211             remove_package(package.session().id());
212             
213             // marking backend dead if backend closed without fronted close 
214             if (is_closed_front == false)
215                 add_dead(package.session().id());
216             
217             //remove_session(package.session().id());
218         } 
219         // any other Z39.50 package is removed from statistics
220         else {
221             boost::mutex::scoped_lock scoped_lock(m_mutex);
222             remove_package(package.session().id());
223         }
224     }
225 }
226             
227 // getting timestamp for receiving of package
228 //boost::posix_time::ptime receive_time
229 //    = boost::posix_time::microsec_clock::local_time();
230 // //<< receive_time << " "
231 // //<< to_iso_string(receive_time) << " "
232 //<< to_iso_extended_string(receive_time) << " "
233
234
235 // statistic manipulating functions, 
236 void yf::LoadBalance::Impl::add_dead(unsigned long session_id){
237
238     
239     std::string target = find_session_target(session_id);
240
241     if (target.size() != 0){
242         std::map<std::string, TargetStat>::iterator itarg;        
243         itarg = m_target_stat.find(target);
244         if (itarg != m_target_stat.end()){
245             itarg->second.deads += 1;
246             std::cout << "add_dead " << session_id << " " << target 
247                       << " d:" << itarg->second.deads << "\n";
248         }
249     }
250 };
251
252 //void yf::LoadBalance::Impl::clear_dead(unsigned long session_id){
253 //    std::cout << "clear_dead " << session_id << "\n";
254 //};
255
256 void yf::LoadBalance::Impl::add_package(unsigned long session_id){
257
258     std::string target = find_session_target(session_id);
259
260     if (target.size() != 0){
261         std::map<std::string, TargetStat>::iterator itarg;        
262         itarg = m_target_stat.find(target);
263         if (itarg != m_target_stat.end()){
264             itarg->second.packages += 1;
265             std::cout << "add_package " << session_id << " " << target 
266                       << " p:" << itarg->second.packages << "\n";
267         }
268     }
269 };
270
271 void yf::LoadBalance::Impl::remove_package(unsigned long session_id){
272     std::string target = find_session_target(session_id);
273
274     if (target.size() != 0){
275         std::map<std::string, TargetStat>::iterator itarg;        
276         itarg = m_target_stat.find(target);
277         if (itarg != m_target_stat.end()
278             && itarg->second.packages > 0){
279             itarg->second.packages -= 1;
280             std::cout << "remove_package " << session_id << " " << target 
281                       << " p:" << itarg->second.packages << "\n";
282         }
283     }
284 };
285
286 void yf::LoadBalance::Impl::add_session(unsigned long session_id, 
287                                         std::string target){
288
289     // finding and adding session
290     std::map<unsigned long, std::string>::iterator isess;
291     isess = m_session_target.find(session_id);
292     if (isess == m_session_target.end()){
293         m_session_target.insert(std::make_pair(session_id, target));
294     }
295
296     // finding and adding target statistics
297     std::map<std::string, TargetStat>::iterator itarg;
298     itarg = m_target_stat.find(target);
299     if (itarg == m_target_stat.end()){
300         TargetStat stat;
301         stat.sessions = 1;
302         stat.packages = 0;  // no idea why the defaut constructor TargetStat()
303         stat.deads = 0;     // is not initializig this correctly to zero ??
304        m_target_stat.insert(std::make_pair(target, stat));
305         std::cout << "add_session " << session_id << " " << target 
306                   << " s:1\n";
307     } else {
308         itarg->second.sessions += 1;
309         std::cout << "add_session " << session_id << " " << target 
310                   << " s:" << itarg->second.sessions << "\n";
311     }
312     
313     
314 };
315
316 void yf::LoadBalance::Impl::remove_session(unsigned long session_id){
317
318     std::string target;
319
320     // finding session
321     std::map<unsigned long, std::string>::iterator isess;
322     isess = m_session_target.find(session_id);
323     if (isess == m_session_target.end())
324         return;
325     else
326         target = isess->second;
327
328     // finding target statistics
329     std::map<std::string, TargetStat>::iterator itarg;
330     itarg = m_target_stat.find(target);
331     if (itarg == m_target_stat.end()){
332         m_session_target.erase(isess);
333         return;
334     }
335     
336     // counting session down
337     if (itarg->second.sessions > 0)
338         itarg->second.sessions -= 1;
339
340     std::cout << "remove_session " << session_id << " " << target 
341               << " s:" << itarg->second.sessions << "\n";
342     
343     // clearing empty sessions and targets
344     if (itarg->second.sessions == 0 && itarg->second.deads == 0 ){
345         m_target_stat.erase(itarg);
346         m_session_target.erase(isess);
347     }
348 };
349
350 std::string 
351 yf::LoadBalance::Impl::find_session_target(unsigned long session_id){
352
353     std::string target;
354     std::map<unsigned long, std::string>::iterator isess;
355     isess = m_session_target.find(session_id);
356     if (isess != m_session_target.end())
357         target = isess->second;
358     return target;
359 }
360
361
362 // cost functions
363 unsigned int yf::LoadBalance::Impl::cost(std::string target){
364
365     unsigned int cost;
366
367     if (target.size() != 0){
368         std::map<std::string, TargetStat>::iterator itarg;        
369         itarg = m_target_stat.find(target);
370         if (itarg != m_target_stat.end()){
371             cost = itarg->second.cost();
372         }
373     }
374     
375     std::cout << "cost " << target << " c:" << cost << "\n";
376     return cost;
377 };
378
379 unsigned int yf::LoadBalance::Impl::dead(std::string target){
380     std::cout << "dead " << target << "\n";
381     return 0;
382 };
383
384
385
386
387 static mp::filter::Base* filter_creator()
388 {
389     return new mp::filter::LoadBalance;
390 }
391
392 extern "C" {
393     struct metaproxy_1_filter_struct metaproxy_1_filter_load_balance = {
394         0,
395         "load_balance",
396         filter_creator
397     };
398 }
399
400
401 /*
402  * Local variables:
403  * c-basic-offset: 4
404  * indent-tabs-mode: nil
405  * c-file-style: "stroustrup"
406  * End:
407  * vim: shiftwidth=4 tabstop=8 expandtab
408  */