Removed unused mutex
[metaproxy-moved-to-github.git] / src / filter_multi.cpp
1 /* $Id: filter_multi.cpp,v 1.16 2006-05-15 11:40:26 adam Exp $
2    Copyright (c) 2005-2006, Index Data.
3
4 %LICENSE%
5  */
6
7 #include "config.hpp"
8
9 #include "filter.hpp"
10 #include "package.hpp"
11
12 #include <boost/thread/thread.hpp>
13 #include <boost/thread/mutex.hpp>
14 #include <boost/thread/condition.hpp>
15 #include <boost/shared_ptr.hpp>
16
17 #include "util.hpp"
18 #include "filter_multi.hpp"
19
20 #include <yaz/zgdu.h>
21 #include <yaz/otherinfo.h>
22 #include <yaz/diagbib1.h>
23
24 #include <vector>
25 #include <algorithm>
26 #include <map>
27 #include <iostream>
28
29 namespace mp = metaproxy_1;
30 namespace yf = mp::filter;
31
32 namespace metaproxy_1 {
33     namespace filter {
34
35         struct Multi::BackendSet {
36             BackendPtr m_backend;
37             int m_count;
38             bool operator < (const BackendSet &k) const;
39             bool operator == (const BackendSet &k) const;
40         };
41         struct Multi::ScanTermInfo {
42             std::string m_norm_term;
43             std::string m_display_term;
44             int m_count;
45             bool operator < (const ScanTermInfo &) const;
46             bool operator == (const ScanTermInfo &) const;
47             Z_Entry *get_entry(ODR odr);
48         };
49         struct Multi::FrontendSet {
50             struct PresentJob {
51                 BackendPtr m_backend;
52                 int m_pos;
53                 int m_inside_pos;
54             };
55             FrontendSet(std::string setname);
56             FrontendSet();
57             ~FrontendSet();
58
59             void round_robin(int pos, int number, std::list<PresentJob> &job);
60
61             std::list<BackendSet> m_backend_sets;
62             std::string m_setname;
63         };
64         struct Multi::Backend {
65             PackagePtr m_package;
66             std::string m_backend_database;
67             std::string m_vhost;
68             std::string m_route;
69             void operator() (void);  // thread operation
70         };
71         struct Multi::Frontend {
72             Frontend(Rep *rep);
73             ~Frontend();
74             bool m_is_multi;
75             bool m_in_use;
76             std::list<BackendPtr> m_backend_list;
77             std::map<std::string,Multi::FrontendSet> m_sets;
78
79             void multi_move(std::list<BackendPtr> &blist);
80             void init(Package &package, Z_GDU *gdu);
81             void close(Package &package);
82             void search(Package &package, Z_APDU *apdu);
83             void present(Package &package, Z_APDU *apdu);
84             void scan1(Package &package, Z_APDU *apdu);
85             void scan2(Package &package, Z_APDU *apdu);
86             Rep *m_p;
87         };            
88         struct Multi::Map {
89             Map(std::list<std::string> hosts, std::string route);
90             Map();
91             std::list<std::string> m_hosts;
92             std::string m_route;
93         };
94         class Multi::Rep {
95             friend class Multi;
96             friend struct Frontend;
97             
98             FrontendPtr get_frontend(Package &package);
99             void release_frontend(Package &package);
100         private:
101             std::map<std::string, Multi::Map>m_maps;
102             std::map<std::string,std::string> m_target_route;
103             boost::mutex m_mutex;
104             boost::condition m_cond_session_ready;
105             std::map<mp::Session, FrontendPtr> m_clients;
106         };
107     }
108 }
109
110 bool yf::Multi::BackendSet::operator < (const BackendSet &k) const
111 {
112     return m_count < k.m_count;
113 }
114
115 yf::Multi::Frontend::Frontend(Rep *rep)
116 {
117     m_p = rep;
118     m_is_multi = false;
119 }
120
121 yf::Multi::Frontend::~Frontend()
122 {
123 }
124
125 yf::Multi::FrontendPtr yf::Multi::Rep::get_frontend(Package &package)
126 {
127     boost::mutex::scoped_lock lock(m_mutex);
128
129     std::map<mp::Session,yf::Multi::FrontendPtr>::iterator it;
130     
131     while(true)
132     {
133         it = m_clients.find(package.session());
134         if (it == m_clients.end())
135             break;
136         
137         if (!it->second->m_in_use)
138         {
139             it->second->m_in_use = true;
140             return it->second;
141         }
142         m_cond_session_ready.wait(lock);
143     }
144     FrontendPtr f(new Frontend(this));
145     m_clients[package.session()] = f;
146     f->m_in_use = true;
147     return f;
148 }
149
150 void yf::Multi::Rep::release_frontend(Package &package)
151 {
152     boost::mutex::scoped_lock lock(m_mutex);
153     std::map<mp::Session,yf::Multi::FrontendPtr>::iterator it;
154     
155     it = m_clients.find(package.session());
156     if (it != m_clients.end())
157     {
158         if (package.session().is_closed())
159         {
160             it->second->close(package);
161             m_clients.erase(it);
162         }
163         else
164         {
165             it->second->m_in_use = false;
166         }
167         m_cond_session_ready.notify_all();
168     }
169 }
170
171 yf::Multi::FrontendSet::FrontendSet(std::string setname)
172     :  m_setname(setname)
173 {
174 }
175
176
177 yf::Multi::FrontendSet::FrontendSet()
178 {
179 }
180
181
182 yf::Multi::FrontendSet::~FrontendSet()
183 {
184 }
185
186 yf::Multi::Map::Map(std::list<std::string> hosts, std::string route)
187     : m_hosts(hosts), m_route(route) 
188 {
189 }
190
191 yf::Multi::Map::Map()
192 {
193 }
194
195 yf::Multi::Multi() : m_p(new Multi::Rep)
196 {
197 }
198
199 yf::Multi::~Multi() {
200 }
201
202
203 void yf::Multi::add_map_host2hosts(std::string host,
204                                    std::list<std::string> hosts,
205                                    std::string route)
206 {
207     m_p->m_maps[host] = Multi::Map(hosts, route);
208 }
209
210 void yf::Multi::Backend::operator() (void) 
211 {
212     m_package->move(m_route);
213 }
214
215
216 void yf::Multi::Frontend::close(Package &package)
217 {
218     std::list<BackendPtr>::const_iterator bit;
219     for (bit = m_backend_list.begin(); bit != m_backend_list.end(); bit++)
220     {
221         BackendPtr b = *bit;
222
223         b->m_package->copy_filter(package);
224         b->m_package->request() = (Z_GDU *) 0;
225         b->m_package->session().close();
226         b->m_package->move(b->m_route);
227     }
228 }
229
230 void yf::Multi::Frontend::multi_move(std::list<BackendPtr> &blist)
231 {
232     std::list<BackendPtr>::const_iterator bit;
233     boost::thread_group g;
234     for (bit = blist.begin(); bit != blist.end(); bit++)
235     {
236         g.add_thread(new boost::thread(**bit));
237     }
238     g.join_all();
239 }
240
241 void yf::Multi::FrontendSet::round_robin(int start, int number,
242                                          std::list<PresentJob> &jobs)
243 {
244     std::list<int> pos;
245     std::list<int> inside_pos;
246     std::list<BackendSet>::const_iterator bsit;
247     for (bsit = m_backend_sets.begin(); bsit != m_backend_sets.end(); bsit++)
248     {
249         pos.push_back(1);
250         inside_pos.push_back(0);
251     }
252
253     int p = 1;
254 #if 1
255     // optimization step!
256     int omin = 0;
257     while(true)
258     {
259         int min = 0;
260         int no_left = 0;
261         // find min count for each set which is > omin
262         for (bsit = m_backend_sets.begin(); bsit != m_backend_sets.end(); bsit++)
263         {
264             if (bsit->m_count > omin)
265             {
266                 if (no_left == 0 || bsit->m_count < min)
267                     min = bsit->m_count;
268                 no_left++;
269             }
270         }
271         if (no_left == 0) // if nothing greater than omin, bail out.
272             break;
273         int skip = no_left * min;
274         if (p + skip > start)  // step gets us "into" present range?
275         {
276             // Yes. skip until start.. Rounding off is deliberate!
277             min = (start-p) / no_left;
278             p += no_left * min;
279             
280             // update positions in each set..
281             std::list<int>::iterator psit = pos.begin();
282             for (psit = pos.begin(); psit != pos.end(); psit++)
283                 *psit += min;
284             break;
285         }
286         // skip on each set.. before "present range"..
287         p = p + skip;
288         
289         std::cout << "\nSKIP min=" << min << " no_left=" << no_left << "\n\n";
290         
291         std::list<int>::iterator psit = pos.begin();
292         for (psit = pos.begin(); psit != pos.end(); psit++)
293             *psit += min;
294         
295         omin = min; // update so we consider next class (with higher count)
296     }
297 #endif
298     int fetched = 0;
299     bool more = true;
300     while (more)
301     {
302         more = false;
303         std::list<int>::iterator psit = pos.begin();
304         std::list<int>::iterator esit = inside_pos.begin();
305         bsit = m_backend_sets.begin();
306
307         for (; bsit != m_backend_sets.end(); psit++,esit++,bsit++)
308         {
309             if (fetched >= number)
310             {
311                 more = false;
312                 break;
313             }
314             if (*psit <= bsit->m_count)
315             {
316                 if (p >= start)
317                 {
318                     PresentJob job;
319                     job.m_backend = bsit->m_backend;
320                     job.m_pos = *psit;
321                     job.m_inside_pos = *esit;
322                     jobs.push_back(job);
323                     (*esit)++;
324                     fetched++;
325                 }
326                 (*psit)++;
327                 p++;
328                 more = true;
329             }
330         }
331     }
332 }
333
334 void yf::Multi::Frontend::init(Package &package, Z_GDU *gdu)
335 {
336     Z_InitRequest *req = gdu->u.z3950->u.initRequest;
337
338     std::list<std::string> targets;
339
340     mp::util::get_vhost_otherinfo(&req->otherInfo, false, targets);
341
342     if (targets.size() < 1)
343     {
344         package.move();
345         return;
346     }
347
348     std::list<std::string>::const_iterator t_it = targets.begin();
349     for (; t_it != targets.end(); t_it++)
350     {
351         Session s;
352         Backend *b = new Backend;
353         b->m_vhost = *t_it;
354
355         b->m_route = m_p->m_target_route[*t_it];
356         // b->m_route unset
357         b->m_package = PackagePtr(new Package(s, package.origin()));
358
359         m_backend_list.push_back(BackendPtr(b));
360     }
361     m_is_multi = true;
362
363     // create init request 
364     std::list<BackendPtr>::const_iterator bit;
365     for (bit = m_backend_list.begin(); bit != m_backend_list.end(); bit++)
366     {
367         mp::odr odr;
368         BackendPtr b = *bit;
369         Z_APDU *init_apdu = zget_APDU(odr, Z_APDU_initRequest);
370         
371         std::list<std::string>vhost_one;
372         vhost_one.push_back(b->m_vhost);
373         mp::util::set_vhost_otherinfo(&init_apdu->u.initRequest->otherInfo,
374                                        odr, vhost_one);
375
376         Z_InitRequest *req = init_apdu->u.initRequest;
377         
378         ODR_MASK_SET(req->options, Z_Options_search);
379         ODR_MASK_SET(req->options, Z_Options_present);
380         ODR_MASK_SET(req->options, Z_Options_namedResultSets);
381         ODR_MASK_SET(req->options, Z_Options_scan);
382         
383         ODR_MASK_SET(req->protocolVersion, Z_ProtocolVersion_1);
384         ODR_MASK_SET(req->protocolVersion, Z_ProtocolVersion_2);
385         ODR_MASK_SET(req->protocolVersion, Z_ProtocolVersion_3);
386         
387         b->m_package->request() = init_apdu;
388
389         b->m_package->copy_filter(package);
390     }
391     multi_move(m_backend_list);
392
393     // create the frontend init response based on each backend init response
394     mp::odr odr;
395
396     Z_APDU *f_apdu = odr.create_initResponse(gdu->u.z3950, 0, 0);
397     Z_InitResponse *f_resp = f_apdu->u.initResponse;
398
399     ODR_MASK_SET(f_resp->options, Z_Options_search);
400     ODR_MASK_SET(f_resp->options, Z_Options_present);
401     ODR_MASK_SET(f_resp->options, Z_Options_namedResultSets);
402     
403     ODR_MASK_SET(f_resp->protocolVersion, Z_ProtocolVersion_1);
404     ODR_MASK_SET(f_resp->protocolVersion, Z_ProtocolVersion_2);
405     ODR_MASK_SET(f_resp->protocolVersion, Z_ProtocolVersion_3);
406
407     for (bit = m_backend_list.begin(); bit != m_backend_list.end(); bit++)
408     {
409         PackagePtr p = (*bit)->m_package;
410         
411         if (p->session().is_closed()) // if any backend closes, close frontend
412             package.session().close();
413         Z_GDU *gdu = p->response().get();
414         if (gdu && gdu->which == Z_GDU_Z3950 && gdu->u.z3950->which ==
415             Z_APDU_initResponse)
416         {
417             int i;
418             Z_APDU *b_apdu = gdu->u.z3950;
419             Z_InitResponse *b_resp = b_apdu->u.initResponse;
420
421             // common options for all backends
422             for (i = 0; i <= Z_Options_stringSchema; i++)
423             {
424                 if (!ODR_MASK_GET(b_resp->options, i))
425                     ODR_MASK_CLEAR(f_resp->options, i);
426             }
427             // common protocol version
428             for (i = 0; i <= Z_ProtocolVersion_3; i++)
429                 if (!ODR_MASK_GET(b_resp->protocolVersion, i))
430                     ODR_MASK_CLEAR(f_resp->protocolVersion, i);
431             // reject if any of the backends reject
432             if (!*b_resp->result)
433                 *f_resp->result = 0;
434         }
435         else
436         {
437             // if any target does not return init return that (close or
438             // similar )
439             package.response() = p->response();
440             return;
441         }
442     }
443     package.response() = f_apdu;
444 }
445
446 void yf::Multi::Frontend::search(Package &package, Z_APDU *apdu_req)
447 {
448     // create search request 
449     Z_SearchRequest *req = apdu_req->u.searchRequest;
450
451     // save these for later
452     int smallSetUpperBound = *req->smallSetUpperBound;
453     int largeSetLowerBound = *req->largeSetLowerBound;
454     int mediumSetPresentNumber = *req->mediumSetPresentNumber;
455     
456     // they are altered now - to disable piggyback
457     *req->smallSetUpperBound = 0;
458     *req->largeSetLowerBound = 1;
459     *req->mediumSetPresentNumber = 1;
460
461     int default_num_db = req->num_databaseNames;
462     char **default_db = req->databaseNames;
463
464     std::list<BackendPtr>::const_iterator bit;
465     for (bit = m_backend_list.begin(); bit != m_backend_list.end(); bit++)
466     {
467         PackagePtr p = (*bit)->m_package;
468         mp::odr odr;
469     
470         if (!mp::util::set_databases_from_zurl(odr, (*bit)->m_vhost,
471                                                 &req->num_databaseNames,
472                                                 &req->databaseNames))
473         {
474             req->num_databaseNames = default_num_db;
475             req->databaseNames = default_db;
476         }
477         p->request() = apdu_req;
478         p->copy_filter(package);
479     }
480     multi_move(m_backend_list);
481
482     // look at each response
483     FrontendSet resultSet(std::string(req->resultSetName));
484
485     int result_set_size = 0;
486     Z_Records *z_records_diag = 0;  // no diagnostics (yet)
487     for (bit = m_backend_list.begin(); bit != m_backend_list.end(); bit++)
488     {
489         PackagePtr p = (*bit)->m_package;
490         
491         if (p->session().is_closed()) // if any backend closes, close frontend
492             package.session().close();
493         
494         Z_GDU *gdu = p->response().get();
495         if (gdu && gdu->which == Z_GDU_Z3950 && gdu->u.z3950->which ==
496             Z_APDU_searchResponse)
497         {
498             Z_APDU *b_apdu = gdu->u.z3950;
499             Z_SearchResponse *b_resp = b_apdu->u.searchResponse;
500          
501             // see we get any errors (AKA diagnstics)
502             if (b_resp->records)
503             {
504                 if (b_resp->records->which == Z_Records_NSD
505                     || b_resp->records->which == Z_Records_multipleNSD)
506                     z_records_diag = b_resp->records;
507                 // we may set this multiple times (TOO BAD!)
508             }
509             BackendSet backendSet;
510             backendSet.m_backend = *bit;
511             backendSet.m_count = *b_resp->resultCount;
512             result_set_size += *b_resp->resultCount;
513             resultSet.m_backend_sets.push_back(backendSet);
514         }
515         else
516         {
517             // if any target does not return search response - return that 
518             package.response() = p->response();
519             return;
520         }
521     }
522
523     mp::odr odr;
524     Z_APDU *f_apdu = odr.create_searchResponse(apdu_req, 0, 0);
525     Z_SearchResponse *f_resp = f_apdu->u.searchResponse;
526
527     *f_resp->resultCount = result_set_size;
528     if (z_records_diag)
529     {
530         // search error
531         f_resp->records = z_records_diag;
532         package.response() = f_apdu;
533         return;
534     }
535     // assume OK
536     m_sets[resultSet.m_setname] = resultSet;
537
538     int number;
539     mp::util::piggyback(smallSetUpperBound,
540                          largeSetLowerBound,
541                          mediumSetPresentNumber,
542                          result_set_size,
543                          number);
544     Package pp(package.session(), package.origin());
545     if (number > 0)
546     {
547         pp.copy_filter(package);
548         Z_APDU *p_apdu = zget_APDU(odr, Z_APDU_presentRequest);
549         Z_PresentRequest *p_req = p_apdu->u.presentRequest;
550         p_req->preferredRecordSyntax = req->preferredRecordSyntax;
551         p_req->resultSetId = req->resultSetName;
552         *p_req->resultSetStartPoint = 1;
553         *p_req->numberOfRecordsRequested = number;
554         pp.request() = p_apdu;
555         present(pp, p_apdu);
556         
557         if (pp.session().is_closed())
558             package.session().close();
559         
560         Z_GDU *gdu = pp.response().get();
561         if (gdu && gdu->which == Z_GDU_Z3950 && gdu->u.z3950->which ==
562             Z_APDU_presentResponse)
563         {
564             Z_PresentResponse *p_res = gdu->u.z3950->u.presentResponse;
565             f_resp->records = p_res->records;
566             *f_resp->numberOfRecordsReturned = 
567                 *p_res->numberOfRecordsReturned;
568             *f_resp->nextResultSetPosition = 
569                 *p_res->nextResultSetPosition;
570         }
571         else 
572         {
573             package.response() = pp.response(); 
574             return;
575         }
576     }
577     package.response() = f_apdu; // in this scope because of p
578 }
579
580 void yf::Multi::Frontend::present(Package &package, Z_APDU *apdu_req)
581 {
582     // create present request 
583     Z_PresentRequest *req = apdu_req->u.presentRequest;
584
585     Sets_it it;
586     it = m_sets.find(std::string(req->resultSetId));
587     if (it == m_sets.end())
588     {
589         mp::odr odr;
590         Z_APDU *apdu = 
591             odr.create_presentResponse(
592                 apdu_req,
593                 YAZ_BIB1_SPECIFIED_RESULT_SET_DOES_NOT_EXIST,
594                 req->resultSetId);
595         package.response() = apdu;
596         return;
597     }
598     std::list<Multi::FrontendSet::PresentJob> jobs;
599     int start = *req->resultSetStartPoint;
600     int number = *req->numberOfRecordsRequested;
601     it->second.round_robin(start, number, jobs);
602
603     std::list<BackendPtr> present_backend_list;
604
605     std::list<BackendSet>::const_iterator bsit;
606     bsit = it->second.m_backend_sets.begin();
607     for (; bsit != it->second.m_backend_sets.end(); bsit++)
608     {
609         std::list<Multi::FrontendSet::PresentJob>::const_iterator jit;
610         int start = -1;
611         int end = -1;
612         
613         for (jit = jobs.begin(); jit != jobs.end(); jit++)
614         {
615             if (jit->m_backend == bsit->m_backend)
616             {
617                 if (start == -1 || jit->m_pos < start)
618                     start = jit->m_pos;
619                 if (end == -1 || jit->m_pos > end)
620                     end = jit->m_pos;
621             }
622         }
623         if (start != -1)
624         {
625             PackagePtr p = bsit->m_backend->m_package;
626
627             *req->resultSetStartPoint = start;
628             *req->numberOfRecordsRequested = end - start + 1;
629             
630             p->request() = apdu_req;
631             p->copy_filter(package);
632
633             present_backend_list.push_back(bsit->m_backend);
634         }
635     }
636     multi_move(present_backend_list);
637
638     // look at each response
639     Z_Records *z_records_diag = 0;
640
641     std::list<BackendPtr>::const_iterator pbit = present_backend_list.begin();
642     for (; pbit != present_backend_list.end(); pbit++)
643     {
644         PackagePtr p = (*pbit)->m_package;
645         
646         if (p->session().is_closed()) // if any backend closes, close frontend
647             package.session().close();
648         
649         Z_GDU *gdu = p->response().get();
650         if (gdu && gdu->which == Z_GDU_Z3950 && gdu->u.z3950->which ==
651             Z_APDU_presentResponse)
652         {
653             Z_APDU *b_apdu = gdu->u.z3950;
654             Z_PresentResponse *b_resp = b_apdu->u.presentResponse;
655          
656             // see we get any errors (AKA diagnstics)
657             if (b_resp->records)
658             {
659                 if (b_resp->records->which != Z_Records_DBOSD)
660                     z_records_diag = b_resp->records;
661                 // we may set this multiple times (TOO BAD!)
662             }
663         }
664         else
665         {
666             // if any target does not return present response - return that 
667             package.response() = p->response();
668             return;
669         }
670     }
671
672     mp::odr odr;
673     Z_APDU *f_apdu = odr.create_presentResponse(apdu_req, 0, 0);
674     Z_PresentResponse *f_resp = f_apdu->u.presentResponse;
675
676     if (z_records_diag)
677     {
678         f_resp->records = z_records_diag;
679         *f_resp->presentStatus = Z_PresentStatus_failure;
680     }
681     else
682     {
683         f_resp->records = (Z_Records *) odr_malloc(odr, sizeof(Z_Records));
684         Z_Records * records = f_resp->records;
685         records->which = Z_Records_DBOSD;
686         records->u.databaseOrSurDiagnostics =
687             (Z_NamePlusRecordList *)
688             odr_malloc(odr, sizeof(Z_NamePlusRecordList));
689         Z_NamePlusRecordList *nprl = records->u.databaseOrSurDiagnostics;
690         nprl->num_records = jobs.size();
691         nprl->records = (Z_NamePlusRecord**)
692             odr_malloc(odr, sizeof(Z_NamePlusRecord *) * nprl->num_records);
693         int i = 0;
694         std::list<Multi::FrontendSet::PresentJob>::const_iterator jit;
695         for (jit = jobs.begin(); jit != jobs.end(); jit++)
696         {
697             PackagePtr p = jit->m_backend->m_package;
698             
699             Z_GDU *gdu = p->response().get();
700             Z_APDU *b_apdu = gdu->u.z3950;
701             Z_PresentResponse *b_resp = b_apdu->u.presentResponse;
702
703             nprl->records[i++] =
704                 b_resp->records->u.databaseOrSurDiagnostics->
705                 records[jit->m_inside_pos];
706         }
707         *f_resp->nextResultSetPosition = start + i;
708         *f_resp->numberOfRecordsReturned = i;
709     }
710     package.response() = f_apdu;
711 }
712
713 void yf::Multi::Frontend::scan1(Package &package, Z_APDU *apdu_req)
714 {
715     if (m_backend_list.size() > 1)
716     {
717         mp::odr odr;
718         Z_APDU *f_apdu = 
719             odr.create_scanResponse(
720                 apdu_req, YAZ_BIB1_COMBI_OF_SPECIFIED_DATABASES_UNSUPP, 0);
721         package.response() = f_apdu;
722         return;
723     }
724     Z_ScanRequest *req = apdu_req->u.scanRequest;
725
726     int default_num_db = req->num_databaseNames;
727     char **default_db = req->databaseNames;
728
729     std::list<BackendPtr>::const_iterator bit;
730     for (bit = m_backend_list.begin(); bit != m_backend_list.end(); bit++)
731     {
732         PackagePtr p = (*bit)->m_package;
733         mp::odr odr;
734     
735         if (!mp::util::set_databases_from_zurl(odr, (*bit)->m_vhost,
736                                                 &req->num_databaseNames,
737                                                 &req->databaseNames))
738         {
739             req->num_databaseNames = default_num_db;
740             req->databaseNames = default_db;
741         }
742         p->request() = apdu_req;
743         p->copy_filter(package);
744     }
745     multi_move(m_backend_list);
746
747     for (bit = m_backend_list.begin(); bit != m_backend_list.end(); bit++)
748     {
749         PackagePtr p = (*bit)->m_package;
750         
751         if (p->session().is_closed()) // if any backend closes, close frontend
752             package.session().close();
753         
754         Z_GDU *gdu = p->response().get();
755         if (gdu && gdu->which == Z_GDU_Z3950 && gdu->u.z3950->which ==
756             Z_APDU_scanResponse)
757         {
758             package.response() = p->response();
759             break;
760         }
761         else
762         {
763             // if any target does not return scan response - return that 
764             package.response() = p->response();
765             return;
766         }
767     }
768 }
769
770 bool yf::Multi::ScanTermInfo::operator < (const ScanTermInfo &k) const
771 {
772     return m_norm_term < k.m_norm_term;
773 }
774
775 bool yf::Multi::ScanTermInfo::operator == (const ScanTermInfo &k) const
776 {
777     return m_norm_term == k.m_norm_term;
778 }
779
780 Z_Entry *yf::Multi::ScanTermInfo::get_entry(ODR odr)
781 {
782     Z_Entry *e = (Z_Entry *)odr_malloc(odr, sizeof(*e));
783     e->which = Z_Entry_termInfo;
784     Z_TermInfo *t;
785     t = e->u.termInfo = (Z_TermInfo *) odr_malloc(odr, sizeof(*t));
786     t->suggestedAttributes = 0;
787     t->displayTerm = 0;
788     t->alternativeTerm = 0;
789     t->byAttributes = 0;
790     t->otherTermInfo = 0;
791     t->globalOccurrences = odr_intdup(odr, m_count);
792     t->term = (Z_Term *)
793         odr_malloc(odr, sizeof(*t->term));
794     t->term->which = Z_Term_general;
795     Odr_oct *o;
796     t->term->u.general = o = (Odr_oct *)odr_malloc(odr, sizeof(Odr_oct));
797
798     o->len = o->size = m_norm_term.size();
799     o->buf = (unsigned char *) odr_malloc(odr, o->len);
800     memcpy(o->buf, m_norm_term.c_str(), o->len);
801     return e;
802 }
803
804 void yf::Multi::Frontend::scan2(Package &package, Z_APDU *apdu_req)
805 {
806     Z_ScanRequest *req = apdu_req->u.scanRequest;
807
808     int default_num_db = req->num_databaseNames;
809     char **default_db = req->databaseNames;
810
811     std::list<BackendPtr>::const_iterator bit;
812     for (bit = m_backend_list.begin(); bit != m_backend_list.end(); bit++)
813     {
814         PackagePtr p = (*bit)->m_package;
815         mp::odr odr;
816     
817         if (!mp::util::set_databases_from_zurl(odr, (*bit)->m_vhost,
818                                                 &req->num_databaseNames,
819                                                 &req->databaseNames))
820         {
821             req->num_databaseNames = default_num_db;
822             req->databaseNames = default_db;
823         }
824         p->request() = apdu_req;
825         p->copy_filter(package);
826     }
827     multi_move(m_backend_list);
828
829     ScanTermInfoList entries_before;
830     ScanTermInfoList entries_after;
831     int no_before = 0;
832     int no_after = 0;
833
834     for (bit = m_backend_list.begin(); bit != m_backend_list.end(); bit++)
835     {
836         PackagePtr p = (*bit)->m_package;
837         
838         if (p->session().is_closed()) // if any backend closes, close frontend
839             package.session().close();
840         
841         Z_GDU *gdu = p->response().get();
842         if (gdu && gdu->which == Z_GDU_Z3950 && gdu->u.z3950->which ==
843             Z_APDU_scanResponse)
844         {
845             Z_ScanResponse *res = gdu->u.z3950->u.scanResponse;
846
847             if (res->entries && res->entries->nonsurrogateDiagnostics)
848             {
849                 // failure
850                 mp::odr odr;
851                 Z_APDU *f_apdu = odr.create_scanResponse(apdu_req, 1, 0);
852                 Z_ScanResponse *f_res = f_apdu->u.scanResponse;
853
854                 f_res->entries->nonsurrogateDiagnostics = 
855                     res->entries->nonsurrogateDiagnostics;
856                 f_res->entries->num_nonsurrogateDiagnostics = 
857                     res->entries->num_nonsurrogateDiagnostics;
858
859                 package.response() = f_apdu;
860                 return;
861             }
862
863             if (res->entries && res->entries->entries)
864             {
865                 Z_Entry **entries = res->entries->entries;
866                 int num_entries = res->entries->num_entries;
867                 int position = 1;
868                 if (req->preferredPositionInResponse)
869                     position = *req->preferredPositionInResponse;
870                 if (res->positionOfTerm)
871                     position = *res->positionOfTerm;
872
873                 // before
874                 int i;
875                 for (i = 0; i<position-1 && i<num_entries; i++)
876                 {
877                     Z_Entry *ent = entries[i];
878
879                     if (ent->which == Z_Entry_termInfo)
880                     {
881                         ScanTermInfo my;
882
883                         int *occur = ent->u.termInfo->globalOccurrences;
884                         my.m_count = occur ? *occur : 0;
885
886                         if (ent->u.termInfo->term->which == Z_Term_general)
887                         {
888                             my.m_norm_term = std::string(
889                                 (const char *)
890                                 ent->u.termInfo->term->u.general->buf,
891                                 ent->u.termInfo->term->u.general->len);
892                         }
893                         if (my.m_norm_term.length())
894                         {
895                             ScanTermInfoList::iterator it = 
896                                 entries_before.begin();
897                             while (it != entries_before.end() && my <*it)
898                                 it++;
899                             if (my == *it)
900                             {
901                                 it->m_count += my.m_count;
902                             }
903                             else
904                             {
905                                 entries_before.insert(it, my);
906                                 no_before++;
907                             }
908                         }
909                     }
910                 }
911                 // after
912                 if (position <= 0)
913                     i = 0;
914                 else
915                     i = position-1;
916                 for ( ; i<num_entries; i++)
917                 {
918                     Z_Entry *ent = entries[i];
919
920                     if (ent->which == Z_Entry_termInfo)
921                     {
922                         ScanTermInfo my;
923
924                         int *occur = ent->u.termInfo->globalOccurrences;
925                         my.m_count = occur ? *occur : 0;
926
927                         if (ent->u.termInfo->term->which == Z_Term_general)
928                         {
929                             my.m_norm_term = std::string(
930                                 (const char *)
931                                 ent->u.termInfo->term->u.general->buf,
932                                 ent->u.termInfo->term->u.general->len);
933                         }
934                         if (my.m_norm_term.length())
935                         {
936                             ScanTermInfoList::iterator it = 
937                                 entries_after.begin();
938                             while (it != entries_after.end() && *it < my)
939                                 it++;
940                             if (my == *it)
941                             {
942                                 it->m_count += my.m_count;
943                             }
944                             else
945                             {
946                                 entries_after.insert(it, my);
947                                 no_after++;
948                             }
949                         }
950                     }
951                 }
952
953             }                
954         }
955         else
956         {
957             // if any target does not return scan response - return that 
958             package.response() = p->response();
959             return;
960         }
961     }
962
963     if (true)
964     {
965         std::cout << "BEFORE\n";
966         ScanTermInfoList::iterator it = entries_before.begin();
967         for(; it != entries_before.end(); it++)
968         {
969             std::cout << " " << it->m_norm_term << " " << it->m_count << "\n";
970         }
971         
972         std::cout << "AFTER\n";
973         it = entries_after.begin();
974         for(; it != entries_after.end(); it++)
975         {
976             std::cout << " " << it->m_norm_term << " " << it->m_count << "\n";
977         }
978     }
979
980     if (false)
981     {
982         mp::odr odr;
983         Z_APDU *f_apdu = odr.create_scanResponse(apdu_req, 1, "not implemented");
984         package.response() = f_apdu;
985     }
986     else
987     {
988         mp::odr odr;
989         Z_APDU *f_apdu = odr.create_scanResponse(apdu_req, 0, 0);
990         Z_ScanResponse *resp = f_apdu->u.scanResponse;
991         
992         int number_returned = *req->numberOfTermsRequested;
993         int position_returned = *req->preferredPositionInResponse;
994         
995         resp->entries->num_entries = number_returned;
996         resp->entries->entries = (Z_Entry**)
997             odr_malloc(odr, sizeof(Z_Entry*) * number_returned);
998         int i;
999
1000         int lbefore = entries_before.size();
1001         if (lbefore < position_returned-1)
1002             position_returned = lbefore+1;
1003
1004         ScanTermInfoList::iterator it = entries_before.begin();
1005         for (i = 0; i<position_returned-1 && it != entries_before.end(); i++, it++)
1006         {
1007             resp->entries->entries[position_returned-2-i] = it->get_entry(odr);
1008         }
1009
1010         it = entries_after.begin();
1011
1012         if (position_returned <= 0)
1013             i = 0;
1014         else
1015             i = position_returned-1;
1016         for (; i<number_returned && it != entries_after.end(); i++, it++)
1017         {
1018             resp->entries->entries[i] = it->get_entry(odr);
1019         }
1020
1021         number_returned = i;
1022
1023         resp->positionOfTerm = odr_intdup(odr, position_returned);
1024         resp->numberOfEntriesReturned = odr_intdup(odr, number_returned);
1025         resp->entries->num_entries = number_returned;
1026
1027         package.response() = f_apdu;
1028     }
1029 }
1030
1031
1032 void yf::Multi::process(Package &package) const
1033 {
1034     FrontendPtr f = m_p->get_frontend(package);
1035
1036     Z_GDU *gdu = package.request().get();
1037     
1038     if (gdu && gdu->which == Z_GDU_Z3950 && gdu->u.z3950->which ==
1039         Z_APDU_initRequest && !f->m_is_multi)
1040     {
1041         f->init(package, gdu);
1042     }
1043     else if (!f->m_is_multi)
1044         package.move();
1045     else if (gdu && gdu->which == Z_GDU_Z3950)
1046     {
1047         Z_APDU *apdu = gdu->u.z3950;
1048         if (apdu->which == Z_APDU_initRequest)
1049         {
1050             mp::odr odr;
1051             
1052             package.response() = odr.create_close(
1053                 apdu,
1054                 Z_Close_protocolError,
1055                 "double init");
1056             
1057             package.session().close();
1058         }
1059         else if (apdu->which == Z_APDU_searchRequest)
1060         {
1061             f->search(package, apdu);
1062         }
1063         else if (apdu->which == Z_APDU_presentRequest)
1064         {
1065             f->present(package, apdu);
1066         }
1067         else if (apdu->which == Z_APDU_scanRequest)
1068         {
1069             f->scan2(package, apdu);
1070         }
1071         else
1072         {
1073             mp::odr odr;
1074             
1075             package.response() = odr.create_close(
1076                 apdu, Z_Close_protocolError,
1077                 "unsupported APDU in filter multi");
1078             
1079             package.session().close();
1080         }
1081     }
1082     m_p->release_frontend(package);
1083 }
1084
1085 void mp::filter::Multi::configure(const xmlNode * ptr)
1086 {
1087     for (ptr = ptr->children; ptr; ptr = ptr->next)
1088     {
1089         if (ptr->type != XML_ELEMENT_NODE)
1090             continue;
1091         if (!strcmp((const char *) ptr->name, "target"))
1092         {
1093             std::string route = mp::xml::get_route(ptr);
1094             std::string target = mp::xml::get_text(ptr);
1095             std::cout << "route=" << route << " target=" << target << "\n";
1096             m_p->m_target_route[target] = route;
1097         }
1098         else if (!strcmp((const char *) ptr->name, "virtual"))
1099         {
1100             std::list<std::string> targets;
1101             std::string vhost;
1102             xmlNode *v_node = ptr->children;
1103             for (; v_node; v_node = v_node->next)
1104             {
1105                 if (v_node->type != XML_ELEMENT_NODE)
1106                     continue;
1107                 
1108                 if (mp::xml::is_element_yp2(v_node, "vhost"))
1109                     vhost = mp::xml::get_text(v_node);
1110                 else if (mp::xml::is_element_yp2(v_node, "target"))
1111                     targets.push_back(mp::xml::get_text(v_node));
1112                 else
1113                     throw mp::filter::FilterException
1114                         ("Bad element " 
1115                          + std::string((const char *) v_node->name)
1116                          + " in virtual section"
1117                             );
1118             }
1119             std::string route = mp::xml::get_route(ptr);
1120             add_map_host2hosts(vhost, targets, route);
1121             std::list<std::string>::const_iterator it;
1122             for (it = targets.begin(); it != targets.end(); it++)
1123             {
1124                 std::cout << "Add " << vhost << "->" << *it
1125                           << "," << route << "\n";
1126             }
1127         }
1128         else
1129         {
1130             throw mp::filter::FilterException
1131                 ("Bad element " 
1132                  + std::string((const char *) ptr->name)
1133                  + " in virt_db filter");
1134         }
1135     }
1136 }
1137
1138 static mp::filter::Base* filter_creator()
1139 {
1140     return new mp::filter::Multi;
1141 }
1142
1143 extern "C" {
1144     struct metaproxy_1_filter_struct metaproxy_1_filter_multi = {
1145         0,
1146         "multi",
1147         filter_creator
1148     };
1149 }
1150
1151
1152 /*
1153  * Local variables:
1154  * c-basic-offset: 4
1155  * indent-tabs-mode: nil
1156  * c-file-style: "stroustrup"
1157  * End:
1158  * vim: shiftwidth=4 tabstop=8 expandtab
1159  */