Rewrite the round_robin algorithm for multi target retrieval. The
[metaproxy-moved-to-github.git] / src / filter_multi.cpp
1 /* $Id: filter_multi.cpp,v 1.12 2006-01-19 09:35:43 adam Exp $
2    Copyright (c) 2005, Index Data.
3
4 %LICENSE%
5  */
6
7 #include "config.hpp"
8
9 #include "filter.hpp"
10 #include "package.hpp"
11
12 #include <boost/thread/thread.hpp>
13 #include <boost/thread/mutex.hpp>
14 #include <boost/thread/condition.hpp>
15 #include <boost/shared_ptr.hpp>
16
17 #include "util.hpp"
18 #include "filter_multi.hpp"
19
20 #include <yaz/zgdu.h>
21 #include <yaz/otherinfo.h>
22 #include <yaz/diagbib1.h>
23
24 #include <vector>
25 #include <algorithm>
26 #include <map>
27 #include <iostream>
28
29 namespace yf = yp2::filter;
30
31 namespace yp2 {
32     namespace filter {
33
34         struct Multi::BackendSet {
35             BackendPtr m_backend;
36             int m_count;
37             bool operator < (const BackendSet &k) const;
38             bool operator == (const BackendSet &k) const;
39         };
40         struct Multi::ScanTermInfo {
41             std::string m_norm_term;
42             std::string m_display_term;
43             int m_count;
44             bool operator < (const ScanTermInfo &) const;
45             bool operator == (const ScanTermInfo &) const;
46             Z_Entry *get_entry(ODR odr);
47         };
48         struct Multi::FrontendSet {
49             struct PresentJob {
50                 BackendPtr m_backend;
51                 int m_pos;
52                 int m_inside_pos;
53             };
54             FrontendSet(std::string setname);
55             FrontendSet();
56             ~FrontendSet();
57
58             void round_robin(int pos, int number, std::list<PresentJob> &job);
59
60             std::list<BackendSet> m_backend_sets;
61             std::string m_setname;
62         };
63         struct Multi::Backend {
64             PackagePtr m_package;
65             std::string m_backend_database;
66             std::string m_vhost;
67             std::string m_route;
68             void operator() (void);  // thread operation
69         };
70         struct Multi::Frontend {
71             Frontend(Rep *rep);
72             ~Frontend();
73             bool m_is_multi;
74             bool m_in_use;
75             std::list<BackendPtr> m_backend_list;
76             std::map<std::string,Multi::FrontendSet> m_sets;
77
78             void multi_move(std::list<BackendPtr> &blist);
79             void init(Package &package, Z_GDU *gdu);
80             void close(Package &package);
81             void search(Package &package, Z_APDU *apdu);
82             void present(Package &package, Z_APDU *apdu);
83             void scan1(Package &package, Z_APDU *apdu);
84             void scan2(Package &package, Z_APDU *apdu);
85             Rep *m_p;
86         };            
87         struct Multi::Map {
88             Map(std::list<std::string> hosts, std::string route);
89             Map();
90             std::list<std::string> m_hosts;
91             std::string m_route;
92         };
93         class Multi::Rep {
94             friend class Multi;
95             friend class Frontend;
96             
97             FrontendPtr get_frontend(Package &package);
98             void release_frontend(Package &package);
99         private:
100             boost::mutex m_sessions_mutex;
101             std::map<std::string, Multi::Map>m_maps;
102             std::map<std::string,std::string> m_target_route;
103             boost::mutex m_mutex;
104             boost::condition m_cond_session_ready;
105             std::map<yp2::Session, FrontendPtr> m_clients;
106         };
107     }
108 }
109
110 using namespace yp2;
111
112 bool yf::Multi::BackendSet::operator < (const BackendSet &k) const
113 {
114     return m_count < k.m_count;
115 }
116
117 yf::Multi::Frontend::Frontend(Rep *rep)
118 {
119     m_p = rep;
120     m_is_multi = false;
121 }
122
123 yf::Multi::Frontend::~Frontend()
124 {
125 }
126
127 yf::Multi::FrontendPtr yf::Multi::Rep::get_frontend(Package &package)
128 {
129     boost::mutex::scoped_lock lock(m_mutex);
130
131     std::map<yp2::Session,yf::Multi::FrontendPtr>::iterator it;
132     
133     while(true)
134     {
135         it = m_clients.find(package.session());
136         if (it == m_clients.end())
137             break;
138         
139         if (!it->second->m_in_use)
140         {
141             it->second->m_in_use = true;
142             return it->second;
143         }
144         m_cond_session_ready.wait(lock);
145     }
146     FrontendPtr f(new Frontend(this));
147     m_clients[package.session()] = f;
148     f->m_in_use = true;
149     return f;
150 }
151
152 void yf::Multi::Rep::release_frontend(Package &package)
153 {
154     boost::mutex::scoped_lock lock(m_mutex);
155     std::map<yp2::Session,yf::Multi::FrontendPtr>::iterator it;
156     
157     it = m_clients.find(package.session());
158     if (it != m_clients.end())
159     {
160         if (package.session().is_closed())
161         {
162             it->second->close(package);
163             m_clients.erase(it);
164         }
165         else
166         {
167             it->second->m_in_use = false;
168         }
169         m_cond_session_ready.notify_all();
170     }
171 }
172
173 yf::Multi::FrontendSet::FrontendSet(std::string setname)
174     :  m_setname(setname)
175 {
176 }
177
178
179 yf::Multi::FrontendSet::FrontendSet()
180 {
181 }
182
183
184 yf::Multi::FrontendSet::~FrontendSet()
185 {
186 }
187
188 yf::Multi::Map::Map(std::list<std::string> hosts, std::string route)
189     : m_hosts(hosts), m_route(route) 
190 {
191 }
192
193 yf::Multi::Map::Map()
194 {
195 }
196
197 yf::Multi::Multi() : m_p(new Multi::Rep)
198 {
199 }
200
201 yf::Multi::~Multi() {
202 }
203
204
205 void yf::Multi::add_map_host2hosts(std::string host,
206                                    std::list<std::string> hosts,
207                                    std::string route)
208 {
209     m_p->m_maps[host] = Multi::Map(hosts, route);
210 }
211
212 void yf::Multi::Backend::operator() (void) 
213 {
214     m_package->move(m_route);
215 }
216
217
218 void yf::Multi::Frontend::close(Package &package)
219 {
220     std::list<BackendPtr>::const_iterator bit;
221     for (bit = m_backend_list.begin(); bit != m_backend_list.end(); bit++)
222     {
223         BackendPtr b = *bit;
224
225         b->m_package->copy_filter(package);
226         b->m_package->request() = (Z_GDU *) 0;
227         b->m_package->session().close();
228         b->m_package->move(b->m_route);
229     }
230 }
231
232 void yf::Multi::Frontend::multi_move(std::list<BackendPtr> &blist)
233 {
234     std::list<BackendPtr>::const_iterator bit;
235     boost::thread_group g;
236     for (bit = blist.begin(); bit != blist.end(); bit++)
237     {
238         g.add_thread(new boost::thread(**bit));
239     }
240     g.join_all();
241 }
242
243 void yf::Multi::FrontendSet::round_robin(int start, int number,
244                                          std::list<PresentJob> &jobs)
245 {
246     std::list<int> pos;
247     std::list<int> inside_pos;
248     std::list<BackendSet>::const_iterator bsit;
249     for (bsit = m_backend_sets.begin(); bsit != m_backend_sets.end(); bsit++)
250     {
251         pos.push_back(1);
252         inside_pos.push_back(0);
253     }
254
255     int p = 1;
256 #if 1
257     // optimization step!
258     int omin = 0;
259     while(true)
260     {
261         int min = 0;
262         int no_left = 0;
263         // find min count for each set which is > omin
264         for (bsit = m_backend_sets.begin(); bsit != m_backend_sets.end(); bsit++)
265         {
266             if (bsit->m_count > omin)
267             {
268                 if (no_left == 0 || bsit->m_count < min)
269                     min = bsit->m_count;
270                 no_left++;
271             }
272         }
273         if (no_left == 0) // if nothing greater than omin, bail out.
274             break;
275         int skip = no_left * min;
276         if (p + skip > start)  // step gets us "into" present range?
277         {
278             // Yes. skip until start.. Rounding off is deliberate!
279             min = (start-p) / no_left;
280             p += no_left * min;
281             
282             std::cout << "\nBREAK min=" << min << " no_left=" << no_left << "\n\n";
283             // update positions in each set..
284             std::list<int>::iterator psit = pos.begin();
285             for (psit = pos.begin(); psit != pos.end(); psit++)
286                 *psit += min;
287             break;
288         }
289         // skip on each set.. before "present range"..
290         p = p + skip;
291         
292         std::cout << "\nSKIP min=" << min << " no_left=" << no_left << "\n\n";
293         
294         std::list<int>::iterator psit = pos.begin();
295         for (psit = pos.begin(); psit != pos.end(); psit++)
296             *psit += min;
297         
298         omin = min; // update so we consider next class (with higher count)
299     }
300 #endif
301     int fetched = 0;
302     bool more = true;
303     while (more)
304     {
305         more = false;
306         std::list<int>::iterator psit = pos.begin();
307         std::list<int>::iterator esit = inside_pos.begin();
308         bsit = m_backend_sets.begin();
309
310         for (; bsit != m_backend_sets.end(); psit++,esit++,bsit++)
311         {
312             if (fetched >= number)
313             {
314                 more = false;
315                 break;
316             }
317             if (*psit <= bsit->m_count)
318             {
319                 if (p >= start)
320                 {
321                     PresentJob job;
322                     job.m_backend = bsit->m_backend;
323                     job.m_pos = *psit;
324                     job.m_inside_pos = *esit;
325                     jobs.push_back(job);
326                     (*esit)++;
327                     fetched++;
328                 }
329                 (*psit)++;
330                 p++;
331                 more = true;
332             }
333         }
334     }
335 }
336
337 void yf::Multi::Frontend::init(Package &package, Z_GDU *gdu)
338 {
339     Z_InitRequest *req = gdu->u.z3950->u.initRequest;
340
341     std::list<std::string> targets;
342
343     yp2::util::get_vhost_otherinfo(&req->otherInfo, false, targets);
344
345     if (targets.size() < 1)
346     {
347         package.move();
348         return;
349     }
350
351     std::list<std::string>::const_iterator t_it = targets.begin();
352     for (; t_it != targets.end(); t_it++)
353     {
354         Session s;
355         Backend *b = new Backend;
356         b->m_vhost = *t_it;
357
358         b->m_route = m_p->m_target_route[*t_it];
359         // b->m_route unset
360         b->m_package = PackagePtr(new Package(s, package.origin()));
361
362         m_backend_list.push_back(BackendPtr(b));
363     }
364     m_is_multi = true;
365
366     // create init request 
367     std::list<BackendPtr>::const_iterator bit;
368     for (bit = m_backend_list.begin(); bit != m_backend_list.end(); bit++)
369     {
370         yp2::odr odr;
371         BackendPtr b = *bit;
372         Z_APDU *init_apdu = zget_APDU(odr, Z_APDU_initRequest);
373         
374         std::list<std::string>vhost_one;
375         vhost_one.push_back(b->m_vhost);
376         yp2::util::set_vhost_otherinfo(&init_apdu->u.initRequest->otherInfo,
377                                        odr, vhost_one);
378
379         Z_InitRequest *req = init_apdu->u.initRequest;
380         
381         ODR_MASK_SET(req->options, Z_Options_search);
382         ODR_MASK_SET(req->options, Z_Options_present);
383         ODR_MASK_SET(req->options, Z_Options_namedResultSets);
384         ODR_MASK_SET(req->options, Z_Options_scan);
385         
386         ODR_MASK_SET(req->protocolVersion, Z_ProtocolVersion_1);
387         ODR_MASK_SET(req->protocolVersion, Z_ProtocolVersion_2);
388         ODR_MASK_SET(req->protocolVersion, Z_ProtocolVersion_3);
389         
390         b->m_package->request() = init_apdu;
391
392         b->m_package->copy_filter(package);
393     }
394     multi_move(m_backend_list);
395
396     // create the frontend init response based on each backend init response
397     yp2::odr odr;
398
399     Z_APDU *f_apdu = odr.create_initResponse(gdu->u.z3950, 0, 0);
400     Z_InitResponse *f_resp = f_apdu->u.initResponse;
401
402     ODR_MASK_SET(f_resp->options, Z_Options_search);
403     ODR_MASK_SET(f_resp->options, Z_Options_present);
404     ODR_MASK_SET(f_resp->options, Z_Options_namedResultSets);
405     
406     ODR_MASK_SET(f_resp->protocolVersion, Z_ProtocolVersion_1);
407     ODR_MASK_SET(f_resp->protocolVersion, Z_ProtocolVersion_2);
408     ODR_MASK_SET(f_resp->protocolVersion, Z_ProtocolVersion_3);
409
410     for (bit = m_backend_list.begin(); bit != m_backend_list.end(); bit++)
411     {
412         PackagePtr p = (*bit)->m_package;
413         
414         if (p->session().is_closed()) // if any backend closes, close frontend
415             package.session().close();
416         Z_GDU *gdu = p->response().get();
417         if (gdu && gdu->which == Z_GDU_Z3950 && gdu->u.z3950->which ==
418             Z_APDU_initResponse)
419         {
420             int i;
421             Z_APDU *b_apdu = gdu->u.z3950;
422             Z_InitResponse *b_resp = b_apdu->u.initResponse;
423
424             // common options for all backends
425             for (i = 0; i <= Z_Options_stringSchema; i++)
426             {
427                 if (!ODR_MASK_GET(b_resp->options, i))
428                     ODR_MASK_CLEAR(f_resp->options, i);
429             }
430             // common protocol version
431             for (i = 0; i <= Z_ProtocolVersion_3; i++)
432                 if (!ODR_MASK_GET(b_resp->protocolVersion, i))
433                     ODR_MASK_CLEAR(f_resp->protocolVersion, i);
434             // reject if any of the backends reject
435             if (!*b_resp->result)
436                 *f_resp->result = 0;
437         }
438         else
439         {
440             // if any target does not return init return that (close or
441             // similar )
442             package.response() = p->response();
443             return;
444         }
445     }
446     package.response() = f_apdu;
447 }
448
449 void yf::Multi::Frontend::search(Package &package, Z_APDU *apdu_req)
450 {
451     // create search request 
452     Z_SearchRequest *req = apdu_req->u.searchRequest;
453
454     // save these for later
455     int smallSetUpperBound = *req->smallSetUpperBound;
456     int largeSetLowerBound = *req->largeSetLowerBound;
457     int mediumSetPresentNumber = *req->mediumSetPresentNumber;
458     
459     // they are altered now - to disable piggyback
460     *req->smallSetUpperBound = 0;
461     *req->largeSetLowerBound = 1;
462     *req->mediumSetPresentNumber = 1;
463
464     int default_num_db = req->num_databaseNames;
465     char **default_db = req->databaseNames;
466
467     std::list<BackendPtr>::const_iterator bit;
468     for (bit = m_backend_list.begin(); bit != m_backend_list.end(); bit++)
469     {
470         PackagePtr p = (*bit)->m_package;
471         yp2::odr odr;
472     
473         if (!yp2::util::set_databases_from_zurl(odr, (*bit)->m_vhost,
474                                                 &req->num_databaseNames,
475                                                 &req->databaseNames))
476         {
477             req->num_databaseNames = default_num_db;
478             req->databaseNames = default_db;
479         }
480         p->request() = apdu_req;
481         p->copy_filter(package);
482     }
483     multi_move(m_backend_list);
484
485     // look at each response
486     FrontendSet resultSet(std::string(req->resultSetName));
487
488     int result_set_size = 0;
489     Z_Records *z_records_diag = 0;  // no diagnostics (yet)
490     for (bit = m_backend_list.begin(); bit != m_backend_list.end(); bit++)
491     {
492         PackagePtr p = (*bit)->m_package;
493         
494         if (p->session().is_closed()) // if any backend closes, close frontend
495             package.session().close();
496         
497         Z_GDU *gdu = p->response().get();
498         if (gdu && gdu->which == Z_GDU_Z3950 && gdu->u.z3950->which ==
499             Z_APDU_searchResponse)
500         {
501             Z_APDU *b_apdu = gdu->u.z3950;
502             Z_SearchResponse *b_resp = b_apdu->u.searchResponse;
503          
504             // see we get any errors (AKA diagnstics)
505             if (b_resp->records)
506             {
507                 if (b_resp->records->which == Z_Records_NSD
508                     || b_resp->records->which == Z_Records_multipleNSD)
509                     z_records_diag = b_resp->records;
510                 // we may set this multiple times (TOO BAD!)
511             }
512             BackendSet backendSet;
513             backendSet.m_backend = *bit;
514             backendSet.m_count = *b_resp->resultCount;
515             result_set_size += *b_resp->resultCount;
516             resultSet.m_backend_sets.push_back(backendSet);
517         }
518         else
519         {
520             // if any target does not return search response - return that 
521             package.response() = p->response();
522             return;
523         }
524     }
525
526     yp2::odr odr;
527     Z_APDU *f_apdu = odr.create_searchResponse(apdu_req, 0, 0);
528     Z_SearchResponse *f_resp = f_apdu->u.searchResponse;
529
530     *f_resp->resultCount = result_set_size;
531     if (z_records_diag)
532     {
533         // search error
534         f_resp->records = z_records_diag;
535         package.response() = f_apdu;
536         return;
537     }
538     // assume OK
539     m_sets[resultSet.m_setname] = resultSet;
540
541     int number;
542     yp2::util::piggyback(smallSetUpperBound,
543                          largeSetLowerBound,
544                          mediumSetPresentNumber,
545                          result_set_size,
546                          number);
547     Package pp(package.session(), package.origin());
548     if (number > 0)
549     {
550         pp.copy_filter(package);
551         Z_APDU *p_apdu = zget_APDU(odr, Z_APDU_presentRequest);
552         Z_PresentRequest *p_req = p_apdu->u.presentRequest;
553         p_req->preferredRecordSyntax = req->preferredRecordSyntax;
554         p_req->resultSetId = req->resultSetName;
555         *p_req->resultSetStartPoint = 1;
556         *p_req->numberOfRecordsRequested = number;
557         pp.request() = p_apdu;
558         present(pp, p_apdu);
559         
560         if (pp.session().is_closed())
561             package.session().close();
562         
563         Z_GDU *gdu = pp.response().get();
564         if (gdu && gdu->which == Z_GDU_Z3950 && gdu->u.z3950->which ==
565             Z_APDU_presentResponse)
566         {
567             Z_PresentResponse *p_res = gdu->u.z3950->u.presentResponse;
568             f_resp->records = p_res->records;
569             *f_resp->numberOfRecordsReturned = 
570                 *p_res->numberOfRecordsReturned;
571             *f_resp->nextResultSetPosition = 
572                 *p_res->nextResultSetPosition;
573         }
574         else 
575         {
576             package.response() = pp.response(); 
577             return;
578         }
579     }
580     package.response() = f_apdu; // in this scope because of p
581 }
582
583 void yf::Multi::Frontend::present(Package &package, Z_APDU *apdu_req)
584 {
585     // create present request 
586     Z_PresentRequest *req = apdu_req->u.presentRequest;
587
588     Sets_it it;
589     it = m_sets.find(std::string(req->resultSetId));
590     if (it == m_sets.end())
591     {
592         yp2::odr odr;
593         Z_APDU *apdu = 
594             odr.create_presentResponse(
595                 apdu_req,
596                 YAZ_BIB1_SPECIFIED_RESULT_SET_DOES_NOT_EXIST,
597                 req->resultSetId);
598         package.response() = apdu;
599         return;
600     }
601     std::list<Multi::FrontendSet::PresentJob> jobs;
602     int start = *req->resultSetStartPoint;
603     int number = *req->numberOfRecordsRequested;
604     it->second.round_robin(start, number, jobs);
605
606     std::list<BackendPtr> present_backend_list;
607
608     std::list<BackendSet>::const_iterator bsit;
609     bsit = it->second.m_backend_sets.begin();
610     for (; bsit != it->second.m_backend_sets.end(); bsit++)
611     {
612         std::list<Multi::FrontendSet::PresentJob>::const_iterator jit;
613         int start = -1;
614         int end = -1;
615         
616         for (jit = jobs.begin(); jit != jobs.end(); jit++)
617         {
618             if (jit->m_backend == bsit->m_backend)
619             {
620                 if (start == -1 || jit->m_pos < start)
621                     start = jit->m_pos;
622                 if (end == -1 || jit->m_pos > end)
623                     end = jit->m_pos;
624             }
625         }
626         if (start != -1)
627         {
628             PackagePtr p = bsit->m_backend->m_package;
629
630             *req->resultSetStartPoint = start;
631             *req->numberOfRecordsRequested = end - start + 1;
632             
633             p->request() = apdu_req;
634             p->copy_filter(package);
635
636             present_backend_list.push_back(bsit->m_backend);
637         }
638     }
639     multi_move(present_backend_list);
640
641     // look at each response
642     Z_Records *z_records_diag = 0;
643
644     std::list<BackendPtr>::const_iterator pbit = present_backend_list.begin();
645     for (; pbit != present_backend_list.end(); pbit++)
646     {
647         PackagePtr p = (*pbit)->m_package;
648         
649         if (p->session().is_closed()) // if any backend closes, close frontend
650             package.session().close();
651         
652         Z_GDU *gdu = p->response().get();
653         if (gdu && gdu->which == Z_GDU_Z3950 && gdu->u.z3950->which ==
654             Z_APDU_presentResponse)
655         {
656             Z_APDU *b_apdu = gdu->u.z3950;
657             Z_PresentResponse *b_resp = b_apdu->u.presentResponse;
658          
659             // see we get any errors (AKA diagnstics)
660             if (b_resp->records)
661             {
662                 if (b_resp->records->which != Z_Records_DBOSD)
663                     z_records_diag = b_resp->records;
664                 // we may set this multiple times (TOO BAD!)
665             }
666         }
667         else
668         {
669             // if any target does not return present response - return that 
670             package.response() = p->response();
671             return;
672         }
673     }
674
675     yp2::odr odr;
676     Z_APDU *f_apdu = odr.create_presentResponse(apdu_req, 0, 0);
677     Z_PresentResponse *f_resp = f_apdu->u.presentResponse;
678
679     if (z_records_diag)
680     {
681         f_resp->records = z_records_diag;
682         *f_resp->presentStatus = Z_PresentStatus_failure;
683     }
684     else
685     {
686         f_resp->records = (Z_Records *) odr_malloc(odr, sizeof(Z_Records));
687         Z_Records * records = f_resp->records;
688         records->which = Z_Records_DBOSD;
689         records->u.databaseOrSurDiagnostics =
690             (Z_NamePlusRecordList *)
691             odr_malloc(odr, sizeof(Z_NamePlusRecordList));
692         Z_NamePlusRecordList *nprl = records->u.databaseOrSurDiagnostics;
693         nprl->num_records = jobs.size();
694         nprl->records = (Z_NamePlusRecord**)
695             odr_malloc(odr, sizeof(Z_NamePlusRecord *) * nprl->num_records);
696         int i = 0;
697         std::list<Multi::FrontendSet::PresentJob>::const_iterator jit;
698         for (jit = jobs.begin(); jit != jobs.end(); jit++)
699         {
700             PackagePtr p = jit->m_backend->m_package;
701             
702             Z_GDU *gdu = p->response().get();
703             Z_APDU *b_apdu = gdu->u.z3950;
704             Z_PresentResponse *b_resp = b_apdu->u.presentResponse;
705
706             nprl->records[i++] =
707                 b_resp->records->u.databaseOrSurDiagnostics->
708                 records[jit->m_inside_pos];
709         }
710         *f_resp->nextResultSetPosition = start + i;
711         *f_resp->numberOfRecordsReturned = i;
712     }
713     package.response() = f_apdu;
714 }
715
716 void yf::Multi::Frontend::scan1(Package &package, Z_APDU *apdu_req)
717 {
718     if (m_backend_list.size() > 1)
719     {
720         yp2::odr odr;
721         Z_APDU *f_apdu = 
722             odr.create_scanResponse(
723                 apdu_req, YAZ_BIB1_COMBI_OF_SPECIFIED_DATABASES_UNSUPP, 0);
724         package.response() = f_apdu;
725         return;
726     }
727     Z_ScanRequest *req = apdu_req->u.scanRequest;
728
729     int default_num_db = req->num_databaseNames;
730     char **default_db = req->databaseNames;
731
732     std::list<BackendPtr>::const_iterator bit;
733     for (bit = m_backend_list.begin(); bit != m_backend_list.end(); bit++)
734     {
735         PackagePtr p = (*bit)->m_package;
736         yp2::odr odr;
737     
738         if (!yp2::util::set_databases_from_zurl(odr, (*bit)->m_vhost,
739                                                 &req->num_databaseNames,
740                                                 &req->databaseNames))
741         {
742             req->num_databaseNames = default_num_db;
743             req->databaseNames = default_db;
744         }
745         p->request() = apdu_req;
746         p->copy_filter(package);
747     }
748     multi_move(m_backend_list);
749
750     for (bit = m_backend_list.begin(); bit != m_backend_list.end(); bit++)
751     {
752         PackagePtr p = (*bit)->m_package;
753         
754         if (p->session().is_closed()) // if any backend closes, close frontend
755             package.session().close();
756         
757         Z_GDU *gdu = p->response().get();
758         if (gdu && gdu->which == Z_GDU_Z3950 && gdu->u.z3950->which ==
759             Z_APDU_scanResponse)
760         {
761             package.response() = p->response();
762             break;
763         }
764         else
765         {
766             // if any target does not return scan response - return that 
767             package.response() = p->response();
768             return;
769         }
770     }
771 }
772
773 bool yf::Multi::ScanTermInfo::operator < (const ScanTermInfo &k) const
774 {
775     return m_norm_term < k.m_norm_term;
776 }
777
778 bool yf::Multi::ScanTermInfo::operator == (const ScanTermInfo &k) const
779 {
780     return m_norm_term == k.m_norm_term;
781 }
782
783 Z_Entry *yf::Multi::ScanTermInfo::get_entry(ODR odr)
784 {
785     Z_Entry *e = (Z_Entry *)odr_malloc(odr, sizeof(*e));
786     e->which = Z_Entry_termInfo;
787     Z_TermInfo *t;
788     t = e->u.termInfo = (Z_TermInfo *) odr_malloc(odr, sizeof(*t));
789     t->suggestedAttributes = 0;
790     t->displayTerm = 0;
791     t->alternativeTerm = 0;
792     t->byAttributes = 0;
793     t->otherTermInfo = 0;
794     t->globalOccurrences = odr_intdup(odr, m_count);
795     t->term = (Z_Term *)
796         odr_malloc(odr, sizeof(*t->term));
797     t->term->which = Z_Term_general;
798     Odr_oct *o;
799     t->term->u.general = o = (Odr_oct *)odr_malloc(odr, sizeof(Odr_oct));
800
801     o->len = o->size = m_norm_term.size();
802     o->buf = (unsigned char *) odr_malloc(odr, o->len);
803     memcpy(o->buf, m_norm_term.c_str(), o->len);
804     return e;
805 }
806
807 void yf::Multi::Frontend::scan2(Package &package, Z_APDU *apdu_req)
808 {
809     Z_ScanRequest *req = apdu_req->u.scanRequest;
810
811     int default_num_db = req->num_databaseNames;
812     char **default_db = req->databaseNames;
813
814     std::list<BackendPtr>::const_iterator bit;
815     for (bit = m_backend_list.begin(); bit != m_backend_list.end(); bit++)
816     {
817         PackagePtr p = (*bit)->m_package;
818         yp2::odr odr;
819     
820         if (!yp2::util::set_databases_from_zurl(odr, (*bit)->m_vhost,
821                                                 &req->num_databaseNames,
822                                                 &req->databaseNames))
823         {
824             req->num_databaseNames = default_num_db;
825             req->databaseNames = default_db;
826         }
827         p->request() = apdu_req;
828         p->copy_filter(package);
829     }
830     multi_move(m_backend_list);
831
832     ScanTermInfoList entries_before;
833     ScanTermInfoList entries_after;
834     int no_before = 0;
835     int no_after = 0;
836
837     for (bit = m_backend_list.begin(); bit != m_backend_list.end(); bit++)
838     {
839         PackagePtr p = (*bit)->m_package;
840         
841         if (p->session().is_closed()) // if any backend closes, close frontend
842             package.session().close();
843         
844         Z_GDU *gdu = p->response().get();
845         if (gdu && gdu->which == Z_GDU_Z3950 && gdu->u.z3950->which ==
846             Z_APDU_scanResponse)
847         {
848             Z_ScanResponse *res = gdu->u.z3950->u.scanResponse;
849
850             if (res->entries && res->entries->nonsurrogateDiagnostics)
851             {
852                 // failure
853                 yp2::odr odr;
854                 Z_APDU *f_apdu = odr.create_scanResponse(apdu_req, 1, 0);
855                 Z_ScanResponse *f_res = f_apdu->u.scanResponse;
856
857                 f_res->entries->nonsurrogateDiagnostics = 
858                     res->entries->nonsurrogateDiagnostics;
859                 f_res->entries->num_nonsurrogateDiagnostics = 
860                     res->entries->num_nonsurrogateDiagnostics;
861
862                 package.response() = f_apdu;
863                 return;
864             }
865
866             if (res->entries && res->entries->entries)
867             {
868                 Z_Entry **entries = res->entries->entries;
869                 int num_entries = res->entries->num_entries;
870                 int position = 1;
871                 if (req->preferredPositionInResponse)
872                     position = *req->preferredPositionInResponse;
873                 if (res->positionOfTerm)
874                     position = *res->positionOfTerm;
875
876                 // before
877                 int i;
878                 for (i = 0; i<position-1 && i<num_entries; i++)
879                 {
880                     Z_Entry *ent = entries[i];
881
882                     if (ent->which == Z_Entry_termInfo)
883                     {
884                         ScanTermInfo my;
885
886                         int *occur = ent->u.termInfo->globalOccurrences;
887                         my.m_count = occur ? *occur : 0;
888
889                         if (ent->u.termInfo->term->which == Z_Term_general)
890                         {
891                             my.m_norm_term = std::string(
892                                 (const char *)
893                                 ent->u.termInfo->term->u.general->buf,
894                                 ent->u.termInfo->term->u.general->len);
895                         }
896                         if (my.m_norm_term.length())
897                         {
898                             ScanTermInfoList::iterator it = 
899                                 entries_before.begin();
900                             while (it != entries_before.end() && my <*it)
901                                 it++;
902                             if (my == *it)
903                             {
904                                 it->m_count += my.m_count;
905                             }
906                             else
907                             {
908                                 entries_before.insert(it, my);
909                                 no_before++;
910                             }
911                         }
912                     }
913                 }
914                 // after
915                 if (position <= 0)
916                     i = 0;
917                 else
918                     i = position-1;
919                 for ( ; i<num_entries; i++)
920                 {
921                     Z_Entry *ent = entries[i];
922
923                     if (ent->which == Z_Entry_termInfo)
924                     {
925                         ScanTermInfo my;
926
927                         int *occur = ent->u.termInfo->globalOccurrences;
928                         my.m_count = occur ? *occur : 0;
929
930                         if (ent->u.termInfo->term->which == Z_Term_general)
931                         {
932                             my.m_norm_term = std::string(
933                                 (const char *)
934                                 ent->u.termInfo->term->u.general->buf,
935                                 ent->u.termInfo->term->u.general->len);
936                         }
937                         if (my.m_norm_term.length())
938                         {
939                             ScanTermInfoList::iterator it = 
940                                 entries_after.begin();
941                             while (it != entries_after.end() && *it < my)
942                                 it++;
943                             if (my == *it)
944                             {
945                                 it->m_count += my.m_count;
946                             }
947                             else
948                             {
949                                 entries_after.insert(it, my);
950                                 no_after++;
951                             }
952                         }
953                     }
954                 }
955
956             }                
957         }
958         else
959         {
960             // if any target does not return scan response - return that 
961             package.response() = p->response();
962             return;
963         }
964     }
965
966     if (true)
967     {
968         std::cout << "BEFORE\n";
969         ScanTermInfoList::iterator it = entries_before.begin();
970         for(; it != entries_before.end(); it++)
971         {
972             std::cout << " " << it->m_norm_term << " " << it->m_count << "\n";
973         }
974         
975         std::cout << "AFTER\n";
976         it = entries_after.begin();
977         for(; it != entries_after.end(); it++)
978         {
979             std::cout << " " << it->m_norm_term << " " << it->m_count << "\n";
980         }
981     }
982
983     if (false)
984     {
985         yp2::odr odr;
986         Z_APDU *f_apdu = odr.create_scanResponse(apdu_req, 1, "not implemented");
987         package.response() = f_apdu;
988     }
989     else
990     {
991         yp2::odr odr;
992         Z_APDU *f_apdu = odr.create_scanResponse(apdu_req, 0, 0);
993         Z_ScanResponse *resp = f_apdu->u.scanResponse;
994         
995         int number_returned = *req->numberOfTermsRequested;
996         int position_returned = *req->preferredPositionInResponse;
997         
998         resp->entries->num_entries = number_returned;
999         resp->entries->entries = (Z_Entry**)
1000             odr_malloc(odr, sizeof(Z_Entry*) * number_returned);
1001         int i;
1002
1003         int lbefore = entries_before.size();
1004         if (lbefore < position_returned-1)
1005             position_returned = lbefore+1;
1006
1007         ScanTermInfoList::iterator it = entries_before.begin();
1008         for (i = 0; i<position_returned-1 && it != entries_before.end(); i++, it++)
1009         {
1010             resp->entries->entries[position_returned-2-i] = it->get_entry(odr);
1011         }
1012
1013         it = entries_after.begin();
1014
1015         if (position_returned <= 0)
1016             i = 0;
1017         else
1018             i = position_returned-1;
1019         for (; i<number_returned && it != entries_after.end(); i++, it++)
1020         {
1021             resp->entries->entries[i] = it->get_entry(odr);
1022         }
1023
1024         number_returned = i;
1025
1026         resp->positionOfTerm = odr_intdup(odr, position_returned);
1027         resp->numberOfEntriesReturned = odr_intdup(odr, number_returned);
1028         resp->entries->num_entries = number_returned;
1029
1030         package.response() = f_apdu;
1031     }
1032 }
1033
1034
1035 void yf::Multi::process(Package &package) const
1036 {
1037     FrontendPtr f = m_p->get_frontend(package);
1038
1039     Z_GDU *gdu = package.request().get();
1040     
1041     if (gdu && gdu->which == Z_GDU_Z3950 && gdu->u.z3950->which ==
1042         Z_APDU_initRequest && !f->m_is_multi)
1043     {
1044         f->init(package, gdu);
1045     }
1046     else if (!f->m_is_multi)
1047         package.move();
1048     else if (gdu && gdu->which == Z_GDU_Z3950)
1049     {
1050         Z_APDU *apdu = gdu->u.z3950;
1051         if (apdu->which == Z_APDU_initRequest)
1052         {
1053             yp2::odr odr;
1054             
1055             package.response() = odr.create_close(
1056                 apdu,
1057                 Z_Close_protocolError,
1058                 "double init");
1059             
1060             package.session().close();
1061         }
1062         else if (apdu->which == Z_APDU_searchRequest)
1063         {
1064             f->search(package, apdu);
1065         }
1066         else if (apdu->which == Z_APDU_presentRequest)
1067         {
1068             f->present(package, apdu);
1069         }
1070         else if (apdu->which == Z_APDU_scanRequest)
1071         {
1072             f->scan2(package, apdu);
1073         }
1074         else
1075         {
1076             yp2::odr odr;
1077             
1078             package.response() = odr.create_close(
1079                 apdu, Z_Close_protocolError,
1080                 "unsupported APDU in filter multi");
1081             
1082             package.session().close();
1083         }
1084     }
1085     m_p->release_frontend(package);
1086 }
1087
1088 void yp2::filter::Multi::configure(const xmlNode * ptr)
1089 {
1090     for (ptr = ptr->children; ptr; ptr = ptr->next)
1091     {
1092         if (ptr->type != XML_ELEMENT_NODE)
1093             continue;
1094         if (!strcmp((const char *) ptr->name, "target"))
1095         {
1096             std::string route = yp2::xml::get_route(ptr);
1097             std::string target = yp2::xml::get_text(ptr);
1098             std::cout << "route=" << route << " target=" << target << "\n";
1099             m_p->m_target_route[target] = route;
1100         }
1101         else if (!strcmp((const char *) ptr->name, "virtual"))
1102         {
1103             std::list<std::string> targets;
1104             std::string vhost;
1105             xmlNode *v_node = ptr->children;
1106             for (; v_node; v_node = v_node->next)
1107             {
1108                 if (v_node->type != XML_ELEMENT_NODE)
1109                     continue;
1110                 
1111                 if (yp2::xml::is_element_yp2(v_node, "vhost"))
1112                     vhost = yp2::xml::get_text(v_node);
1113                 else if (yp2::xml::is_element_yp2(v_node, "target"))
1114                     targets.push_back(yp2::xml::get_text(v_node));
1115                 else
1116                     throw yp2::filter::FilterException
1117                         ("Bad element " 
1118                          + std::string((const char *) v_node->name)
1119                          + " in virtual section"
1120                             );
1121             }
1122             std::string route = yp2::xml::get_route(ptr);
1123             add_map_host2hosts(vhost, targets, route);
1124             std::list<std::string>::const_iterator it;
1125             for (it = targets.begin(); it != targets.end(); it++)
1126             {
1127                 std::cout << "Add " << vhost << "->" << *it
1128                           << "," << route << "\n";
1129             }
1130         }
1131         else
1132         {
1133             throw yp2::filter::FilterException
1134                 ("Bad element " 
1135                  + std::string((const char *) ptr->name)
1136                  + " in virt_db filter");
1137         }
1138     }
1139 }
1140
1141 static yp2::filter::Base* filter_creator()
1142 {
1143     return new yp2::filter::Multi;
1144 }
1145
1146 extern "C" {
1147     struct yp2_filter_struct yp2_filter_multi = {
1148         0,
1149         "multi",
1150         filter_creator
1151     };
1152 }
1153
1154
1155 /*
1156  * Local variables:
1157  * c-basic-offset: 4
1158  * indent-tabs-mode: nil
1159  * c-file-style: "stroustrup"
1160  * End:
1161  * vim: shiftwidth=4 tabstop=8 expandtab
1162  */