Custom time format for log filter using strftime.
[metaproxy-moved-to-github.git] / src / filter_load_balance.cpp
1 /* $Id: filter_load_balance.cpp,v 1.11 2008-02-27 11:08:49 adam Exp $
2    Copyright (c) 2005-2007, Index Data.
3
4 This file is part of Metaproxy.
5
6 Metaproxy is free software; you can redistribute it and/or modify it under
7 the terms of the GNU General Public License as published by the Free
8 Software Foundation; either version 2, or (at your option) any later
9 version.
10
11 Metaproxy is distributed in the hope that it will be useful, but WITHOUT ANY
12 WARRANTY; without even the implied warranty of MERCHANTABILITY or
13 FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
14 for more details.
15
16 You should have received a copy of the GNU General Public License
17 along with Metaproxy; see the file LICENSE.  If not, write to the
18 Free Software Foundation, 59 Temple Place - Suite 330, Boston, MA
19 02111-1307, USA.
20  */
21
22 #include "config.hpp"
23 #include "session.hpp"
24 #include "package.hpp"
25 #include "filter.hpp"
26 #include "filter_load_balance.hpp"
27 #include "util.hpp"
28
29
30 #include <boost/thread/mutex.hpp>
31
32 #include <yaz/zgdu.h>
33
34 // remove max macro if already defined (defined later in <limits>)
35 #ifdef max
36 #undef max
37 #endif
38
39 //#include <iostream>
40 #include <list>
41 #include <map>
42 #include <limits>
43
44 namespace mp = metaproxy_1;
45 namespace yf = mp::filter;
46
47 namespace metaproxy_1 {
48     namespace filter {
49         class LoadBalance::Impl {
50         public:
51             Impl();
52             ~Impl();
53             void process(metaproxy_1::Package & package);
54             void configure(const xmlNode * ptr);
55         private:
56             // statistic manipulating functions, 
57             void add_dead(unsigned long session_id);
58             //void clear_dead(unsigned long session_id);
59             void add_package(unsigned long session_id);
60             void remove_package(unsigned long session_id);
61             void add_session(unsigned long session_id, std::string target);
62             void remove_session(unsigned long session_id);
63             std::string find_session_target(unsigned long session_id);
64
65             // cost functions
66             unsigned int cost(std::string target);
67             unsigned int dead(std::string target);
68
69             // local classes
70             class TargetStat {
71             public:
72                 unsigned int sessions;
73                 unsigned int packages;
74                 unsigned int deads;
75                 unsigned int cost() {
76                     unsigned int c = sessions + packages + deads;
77                     //std::cout << "stats  c:" << c 
78                     //          << " s:" << sessions 
79                     //          << " p:" << packages 
80                     //          << " d:" << deads 
81                     //          <<"\n";
82                     return c;
83                 }
84             };
85
86             // local protected databases
87             boost::mutex m_mutex;
88             std::map<std::string, TargetStat> m_target_stat;
89             std::map<unsigned long, std::string> m_session_target;
90         };
91     }
92 }
93
94 // define Pimpl wrapper forwarding to Impl
95  
96 yf::LoadBalance::LoadBalance() : m_p(new Impl)
97 {
98 }
99
100 yf::LoadBalance::~LoadBalance()
101 {  // must have a destructor because of boost::scoped_ptr
102 }
103
104 void yf::LoadBalance::configure(const xmlNode *xmlnode, bool test_only)
105 {
106     m_p->configure(xmlnode);
107 }
108
109 void yf::LoadBalance::process(mp::Package &package) const
110 {
111     m_p->process(package);
112 }
113
114
115 // define Implementation stuff
116
117
118
119 yf::LoadBalance::Impl::Impl()
120 {
121 }
122
123 yf::LoadBalance::Impl::~Impl()
124
125 }
126
127 void yf::LoadBalance::Impl::configure(const xmlNode *xmlnode)
128 {
129 }
130
131 void yf::LoadBalance::Impl::process(mp::Package &package)
132 {
133
134     bool is_closed_front = false;
135     bool is_closed_back = false;
136
137     // checking for closed front end packages
138     if (package.session().is_closed()){
139         is_closed_front = true;
140     }    
141
142     Z_GDU *gdu_req = package.request().get();
143
144     // passing anything but z3950 packages
145     if (gdu_req && gdu_req->which == Z_GDU_Z3950){
146
147         // target selecting only on Z39.50 init request
148         if (gdu_req->u.z3950->which == Z_APDU_initRequest){
149
150             mp::odr odr_en(ODR_ENCODE);
151             Z_InitRequest *org_init = gdu_req->u.z3950->u.initRequest;
152
153             // extracting virtual hosts
154             std::list<std::string> vhosts;
155             
156             mp::util::remove_vhost_otherinfo(&(org_init->otherInfo), vhosts);
157             
158             // choosing one target according to load-balancing algorithm  
159             
160             if (vhosts.size()){
161                 std::string target;
162                 unsigned int cost = std::numeric_limits<unsigned int>::max();
163                 
164                 { //locking scope for local databases
165                     boost::mutex::scoped_lock scoped_lock(m_mutex);
166                     
167                     // load-balancing algorithm goes here
168                     //target = *vhosts.begin();
169                     for(std::list<std::string>::const_iterator ivh 
170                             = vhosts.begin(); 
171                         ivh != vhosts.end();
172                         ivh++){
173                         if ((*ivh).size() != 0){
174                             unsigned int vhcost 
175                                 = yf::LoadBalance::Impl::cost(*ivh);
176                             if (cost > vhcost){
177                                 cost = vhcost;
178                                 target = *ivh;
179                             }
180                         }
181                      } 
182
183                     // updating local database
184                     add_session(package.session().id(), target);
185                     yf::LoadBalance::Impl::cost(target);
186                     add_package(package.session().id());
187                 }
188                 
189                 // copying new target into init package
190                 mp::util::set_vhost_otherinfo(&(org_init->otherInfo), 
191                                               odr_en, target, 1);
192                 package.request() = gdu_req;
193         }
194             
195         }
196         // frontend Z39.50 close request is added to statistics and marked
197         else if (gdu_req->u.z3950->which == Z_APDU_close){
198             is_closed_front = true;
199             boost::mutex::scoped_lock scoped_lock(m_mutex);        
200             add_package(package.session().id());
201         }    
202         // any other Z39.50 package is added to statistics
203         else {
204             boost::mutex::scoped_lock scoped_lock(m_mutex);        
205             add_package(package.session().id());
206         }
207     }
208         
209     // moving all package types 
210     package.move();
211
212
213     // checking for closed back end packages
214     if (package.session().is_closed())
215         is_closed_back = true;
216
217     Z_GDU *gdu_res = package.response().get();
218
219     // passing anything but z3950 packages
220     if (gdu_res && gdu_res->which == Z_GDU_Z3950){
221
222         // session closing only on Z39.50 close response
223         if (gdu_res->u.z3950->which == Z_APDU_close){
224             is_closed_back = true;
225             boost::mutex::scoped_lock scoped_lock(m_mutex);
226             remove_package(package.session().id());
227         } 
228         // any other Z39.50 package is removed from statistics
229         else {
230             boost::mutex::scoped_lock scoped_lock(m_mutex);
231             remove_package(package.session().id());
232         }
233     }
234
235     // finally removing sessions and marking deads
236     if (is_closed_back || is_closed_front){        
237         boost::mutex::scoped_lock scoped_lock(m_mutex);
238
239         // marking backend dead if backend closed without fronted close 
240         if (is_closed_front == false)
241             add_dead(package.session().id());
242
243         remove_session(package.session().id());
244
245         // making sure that package is closed
246         package.session().close();
247     }
248 }
249             
250 // statistic manipulating functions, 
251 void yf::LoadBalance::Impl::add_dead(unsigned long session_id){
252
253     
254     std::string target = find_session_target(session_id);
255
256     if (target.size() != 0){
257         std::map<std::string, TargetStat>::iterator itarg;        
258         itarg = m_target_stat.find(target);
259         if (itarg != m_target_stat.end()
260             && itarg->second.deads < std::numeric_limits<unsigned int>::max()){
261             itarg->second.deads += 1;
262             // std:.cout << "add_dead " << session_id << " " << target 
263             //          << " d:" << itarg->second.deads << "\n";
264         }
265     }
266 };
267
268 //void yf::LoadBalance::Impl::clear_dead(unsigned long session_id){
269 //    std::cout << "clear_dead " << session_id << "\n";
270 //};
271
272 void yf::LoadBalance::Impl::add_package(unsigned long session_id){
273
274     std::string target = find_session_target(session_id);
275
276     if (target.size() != 0){
277         std::map<std::string, TargetStat>::iterator itarg;        
278         itarg = m_target_stat.find(target);
279         if (itarg != m_target_stat.end()
280             && itarg->second.packages 
281                < std::numeric_limits<unsigned int>::max()){
282             itarg->second.packages += 1;
283             // std:.cout << "add_package " << session_id << " " << target 
284             //          << " p:" << itarg->second.packages << "\n";
285         }
286     }
287 };
288
289 void yf::LoadBalance::Impl::remove_package(unsigned long session_id){
290     std::string target = find_session_target(session_id);
291
292     if (target.size() != 0){
293         std::map<std::string, TargetStat>::iterator itarg;        
294         itarg = m_target_stat.find(target);
295         if (itarg != m_target_stat.end()
296             && itarg->second.packages > 0){
297             itarg->second.packages -= 1;
298             // std:.cout << "remove_package " << session_id << " " << target 
299             //          << " p:" << itarg->second.packages << "\n";
300         }
301     }
302 };
303
304 void yf::LoadBalance::Impl::add_session(unsigned long session_id, 
305                                         std::string target){
306
307     // finding and adding session
308     std::map<unsigned long, std::string>::iterator isess;
309     isess = m_session_target.find(session_id);
310     if (isess == m_session_target.end()){
311         m_session_target.insert(std::make_pair(session_id, target));
312     }
313
314     // finding and adding target statistics
315     std::map<std::string, TargetStat>::iterator itarg;
316     itarg = m_target_stat.find(target);
317     if (itarg == m_target_stat.end()){
318         TargetStat stat;
319         stat.sessions = 1;
320         stat.packages = 0;  // no idea why the defaut constructor TargetStat()
321         stat.deads = 0;     // is not initializig this correctly to zero ??
322        m_target_stat.insert(std::make_pair(target, stat));
323         // std:.cout << "add_session " << session_id << " " << target 
324         //          << " s:1\n";
325     } 
326     else if (itarg->second.sessions < std::numeric_limits<unsigned int>::max())
327     {
328         itarg->second.sessions += 1;
329         // std:.cout << "add_session " << session_id << " " << target 
330         //          << " s:" << itarg->second.sessions << "\n";
331     }
332 };
333
334 void yf::LoadBalance::Impl::remove_session(unsigned long session_id){
335
336     std::string target;
337
338     // finding session
339     std::map<unsigned long, std::string>::iterator isess;
340     isess = m_session_target.find(session_id);
341     if (isess == m_session_target.end())
342         return;
343     else
344         target = isess->second;
345
346     // finding target statistics
347     std::map<std::string, TargetStat>::iterator itarg;
348     itarg = m_target_stat.find(target);
349     if (itarg == m_target_stat.end()){
350         m_session_target.erase(isess);
351         return;
352     }
353     
354     // counting session down
355     if (itarg->second.sessions > 0)
356         itarg->second.sessions -= 1;
357
358     // std:.cout << "remove_session " << session_id << " " << target 
359     //          << " s:" << itarg->second.sessions << "\n";
360     
361     // clearing empty sessions and targets
362     if (itarg->second.sessions == 0 && itarg->second.deads == 0 ){
363         m_target_stat.erase(itarg);
364         m_session_target.erase(isess);
365     }
366 };
367
368 std::string 
369 yf::LoadBalance::Impl::find_session_target(unsigned long session_id){
370
371     std::string target;
372     std::map<unsigned long, std::string>::iterator isess;
373     isess = m_session_target.find(session_id);
374     if (isess != m_session_target.end())
375         target = isess->second;
376     return target;
377 }
378
379
380 // cost functions
381 unsigned int yf::LoadBalance::Impl::cost(std::string target){
382
383     unsigned int cost;
384
385     if (target.size() != 0){
386         std::map<std::string, TargetStat>::iterator itarg;        
387         itarg = m_target_stat.find(target);
388         if (itarg != m_target_stat.end()){
389             cost = itarg->second.cost();
390         }
391     }
392     
393     //std::cout << "cost " << target << " c:" << cost << "\n";
394     return cost;
395 };
396
397 unsigned int yf::LoadBalance::Impl::dead(std::string target){
398
399     unsigned int dead;
400
401     if (target.size() != 0){
402         std::map<std::string, TargetStat>::iterator itarg;        
403         itarg = m_target_stat.find(target);
404         if (itarg != m_target_stat.end()){
405             dead = itarg->second.deads;
406         }
407     }
408     
409     //std::cout << "dead " << target << " d:" << dead << "\n";
410     return dead;
411 };
412
413
414
415
416 static mp::filter::Base* filter_creator()
417 {
418     return new mp::filter::LoadBalance;
419 }
420
421 extern "C" {
422     struct metaproxy_1_filter_struct metaproxy_1_filter_load_balance = {
423         0,
424         "load_balance",
425         filter_creator
426     };
427 }
428
429
430 /*
431  * Local variables:
432  * c-basic-offset: 4
433  * indent-tabs-mode: nil
434  * c-file-style: "stroustrup"
435  * End:
436  * vim: shiftwidth=4 tabstop=8 expandtab
437  */