Changed from 'class' to 'struct' in a few forward declarations
[metaproxy-moved-to-github.git] / src / filter_multi.cpp
1 /* $Id: filter_multi.cpp,v 1.14 2006-02-02 11:33:46 adam Exp $
2    Copyright (c) 2005, Index Data.
3
4 %LICENSE%
5  */
6
7 #include "config.hpp"
8
9 #include "filter.hpp"
10 #include "package.hpp"
11
12 #include <boost/thread/thread.hpp>
13 #include <boost/thread/mutex.hpp>
14 #include <boost/thread/condition.hpp>
15 #include <boost/shared_ptr.hpp>
16
17 #include "util.hpp"
18 #include "filter_multi.hpp"
19
20 #include <yaz/zgdu.h>
21 #include <yaz/otherinfo.h>
22 #include <yaz/diagbib1.h>
23
24 #include <vector>
25 #include <algorithm>
26 #include <map>
27 #include <iostream>
28
29 namespace yf = yp2::filter;
30
31 namespace yp2 {
32     namespace filter {
33
           // One backend's share of a merged result set: the backend and
           // the hit count taken from its search response.
           // operator< orders by m_count.
34         struct Multi::BackendSet {
35             BackendPtr m_backend;
36             int m_count;
37             bool operator < (const BackendSet &k) const;
38             bool operator == (const BackendSet &k) const;
39         };
           // One term of a merged scan response. Ordering and equality
           // compare m_norm_term only; m_count holds the occurrence count
           // that get_entry() emits as globalOccurrences.
40         struct Multi::ScanTermInfo {
41             std::string m_norm_term;
42             std::string m_display_term;
43             int m_count;
44             bool operator < (const ScanTermInfo &) const;
45             bool operator == (const ScanTermInfo &) const;
               // builds a Z_Entry for this term with memory taken from odr
46             Z_Entry *get_entry(ODR odr);
47         };
           // A named result set as seen by the client, made up of one
           // BackendSet per backend that answered the search.
48         struct Multi::FrontendSet {
               // One record to fetch: which backend, the position in that
               // backend's result set (m_pos) and the index of the record
               // inside that backend's present response (m_inside_pos).
49             struct PresentJob {
50                 BackendPtr m_backend;
51                 int m_pos;
52                 int m_inside_pos;
53             };
54             FrontendSet(std::string setname);
55             FrontendSet();
56             ~FrontendSet();
57
               // appends to job the backend records that make up frontend
               // positions pos .. pos+number-1
58             void round_robin(int pos, int number, std::list<PresentJob> &job);
59
60             std::list<BackendSet> m_backend_sets;
61             std::string m_setname;
62         };
           // One backend target of a session: its own package, the virtual
           // host it was created for and the route its package is moved on.
63         struct Multi::Backend {
64             PackagePtr m_package;
65             std::string m_backend_database;
66             std::string m_vhost;
67             std::string m_route;
68             void operator() (void);  // thread operation
69         };
           // Per-session state (Rep keeps one Frontend per yp2::Session).
           // m_in_use is the busy flag handled by Rep::get_frontend() and
           // Rep::release_frontend(); m_is_multi is set once init() has
           // created backends for the session.
70         struct Multi::Frontend {
71             Frontend(Rep *rep);
72             ~Frontend();
73             bool m_is_multi;
74             bool m_in_use;
75             std::list<BackendPtr> m_backend_list;
               // result sets keyed by result set name
76             std::map<std::string,Multi::FrontendSet> m_sets;
77
               // moves the packages of all backends in blist, one thread each
78             void multi_move(std::list<BackendPtr> &blist);
79             void init(Package &package, Z_GDU *gdu);
80             void close(Package &package);
81             void search(Package &package, Z_APDU *apdu);
82             void present(Package &package, Z_APDU *apdu);
83             void scan1(Package &package, Z_APDU *apdu);
84             void scan2(Package &package, Z_APDU *apdu);
               // shared filter state; set from the constructor argument
85             Rep *m_p;
86         };            
           // Value stored by add_map_host2hosts(): a list of hosts and a
           // route.
87         struct Multi::Map {
88             Map(std::list<std::string> hosts, std::string route);
89             Map();
90             std::list<std::string> m_hosts;
91             std::string m_route;
92         };
           // State shared by all sessions of the filter. Everything is
           // private (class default); Multi and Frontend are friends.
93         class Multi::Rep {
94             friend class Multi;
95             friend struct Frontend;
96             
               // get_frontend() and release_frontend() both lock m_mutex
               // and work on m_clients
97             FrontendPtr get_frontend(Package &package);
98             void release_frontend(Package &package);
99         private:
100             boost::mutex m_sessions_mutex;
                // filled by add_map_host2hosts(), keyed by host
101             std::map<std::string, Multi::Map>m_maps;
                // looked up by virtual host in Frontend::init()
102             std::map<std::string,std::string> m_target_route;
103             boost::mutex m_mutex;
                // signalled by release_frontend(), waited on by get_frontend()
104             boost::condition m_cond_session_ready;
105             std::map<yp2::Session, FrontendPtr> m_clients;
106         };
107     }
108 }
109
110 using namespace yp2;
111
112 bool yf::Multi::BackendSet::operator < (const BackendSet &k) const
113 {
114     return m_count < k.m_count;
115 }
116
117 yf::Multi::Frontend::Frontend(Rep *rep)
118 {
119     m_p = rep;
120     m_is_multi = false;
121 }
122
123 yf::Multi::Frontend::~Frontend()
124 {
125 }
126
127 yf::Multi::FrontendPtr yf::Multi::Rep::get_frontend(Package &package)
128 {
129     boost::mutex::scoped_lock lock(m_mutex);
130
131     std::map<yp2::Session,yf::Multi::FrontendPtr>::iterator it;
132     
133     while(true)
134     {
135         it = m_clients.find(package.session());
136         if (it == m_clients.end())
137             break;
138         
139         if (!it->second->m_in_use)
140         {
141             it->second->m_in_use = true;
142             return it->second;
143         }
144         m_cond_session_ready.wait(lock);
145     }
146     FrontendPtr f(new Frontend(this));
147     m_clients[package.session()] = f;
148     f->m_in_use = true;
149     return f;
150 }
151
152 void yf::Multi::Rep::release_frontend(Package &package)
153 {
154     boost::mutex::scoped_lock lock(m_mutex);
155     std::map<yp2::Session,yf::Multi::FrontendPtr>::iterator it;
156     
157     it = m_clients.find(package.session());
158     if (it != m_clients.end())
159     {
160         if (package.session().is_closed())
161         {
162             it->second->close(package);
163             m_clients.erase(it);
164         }
165         else
166         {
167             it->second->m_in_use = false;
168         }
169         m_cond_session_ready.notify_all();
170     }
171 }
172
173 yf::Multi::FrontendSet::FrontendSet(std::string setname)
174     :  m_setname(setname)
175 {
176 }
177
178
179 yf::Multi::FrontendSet::FrontendSet()
180 {
181 }
182
183
184 yf::Multi::FrontendSet::~FrontendSet()
185 {
186 }
187
188 yf::Multi::Map::Map(std::list<std::string> hosts, std::string route)
189     : m_hosts(hosts), m_route(route) 
190 {
191 }
192
193 yf::Multi::Map::Map()
194 {
195 }
196
197 yf::Multi::Multi() : m_p(new Multi::Rep)
198 {
199 }
200
201 yf::Multi::~Multi() {
202 }
203
204
205 void yf::Multi::add_map_host2hosts(std::string host,
206                                    std::list<std::string> hosts,
207                                    std::string route)
208 {
209     m_p->m_maps[host] = Multi::Map(hosts, route);
210 }
211
212 void yf::Multi::Backend::operator() (void) 
213 {
214     m_package->move(m_route);
215 }
216
217
218 void yf::Multi::Frontend::close(Package &package)
219 {
220     std::list<BackendPtr>::const_iterator bit;
221     for (bit = m_backend_list.begin(); bit != m_backend_list.end(); bit++)
222     {
223         BackendPtr b = *bit;
224
225         b->m_package->copy_filter(package);
226         b->m_package->request() = (Z_GDU *) 0;
227         b->m_package->session().close();
228         b->m_package->move(b->m_route);
229     }
230 }
231
232 void yf::Multi::Frontend::multi_move(std::list<BackendPtr> &blist)
233 {
234     std::list<BackendPtr>::const_iterator bit;
235     boost::thread_group g;
236     for (bit = blist.begin(); bit != blist.end(); bit++)
237     {
238         g.add_thread(new boost::thread(**bit));
239     }
240     g.join_all();
241 }
242
243 void yf::Multi::FrontendSet::round_robin(int start, int number,
244                                          std::list<PresentJob> &jobs)
245 {
246     std::list<int> pos;
247     std::list<int> inside_pos;
248     std::list<BackendSet>::const_iterator bsit;
249     for (bsit = m_backend_sets.begin(); bsit != m_backend_sets.end(); bsit++)
250     {
251         pos.push_back(1);
252         inside_pos.push_back(0);
253     }
254
255     int p = 1;
256 #if 1
257     // optimization step!
258     int omin = 0;
259     while(true)
260     {
261         int min = 0;
262         int no_left = 0;
263         // find min count for each set which is > omin
264         for (bsit = m_backend_sets.begin(); bsit != m_backend_sets.end(); bsit++)
265         {
266             if (bsit->m_count > omin)
267             {
268                 if (no_left == 0 || bsit->m_count < min)
269                     min = bsit->m_count;
270                 no_left++;
271             }
272         }
273         if (no_left == 0) // if nothing greater than omin, bail out.
274             break;
275         int skip = no_left * min;
276         if (p + skip > start)  // step gets us "into" present range?
277         {
278             // Yes. skip until start.. Rounding off is deliberate!
279             min = (start-p) / no_left;
280             p += no_left * min;
281             
282             // update positions in each set..
283             std::list<int>::iterator psit = pos.begin();
284             for (psit = pos.begin(); psit != pos.end(); psit++)
285                 *psit += min;
286             break;
287         }
288         // skip on each set.. before "present range"..
289         p = p + skip;
290         
291         std::cout << "\nSKIP min=" << min << " no_left=" << no_left << "\n\n";
292         
293         std::list<int>::iterator psit = pos.begin();
294         for (psit = pos.begin(); psit != pos.end(); psit++)
295             *psit += min;
296         
297         omin = min; // update so we consider next class (with higher count)
298     }
299 #endif
300     int fetched = 0;
301     bool more = true;
302     while (more)
303     {
304         more = false;
305         std::list<int>::iterator psit = pos.begin();
306         std::list<int>::iterator esit = inside_pos.begin();
307         bsit = m_backend_sets.begin();
308
309         for (; bsit != m_backend_sets.end(); psit++,esit++,bsit++)
310         {
311             if (fetched >= number)
312             {
313                 more = false;
314                 break;
315             }
316             if (*psit <= bsit->m_count)
317             {
318                 if (p >= start)
319                 {
320                     PresentJob job;
321                     job.m_backend = bsit->m_backend;
322                     job.m_pos = *psit;
323                     job.m_inside_pos = *esit;
324                     jobs.push_back(job);
325                     (*esit)++;
326                     fetched++;
327                 }
328                 (*psit)++;
329                 p++;
330                 more = true;
331             }
332         }
333     }
334 }
335
336 void yf::Multi::Frontend::init(Package &package, Z_GDU *gdu)
337 {
338     Z_InitRequest *req = gdu->u.z3950->u.initRequest;
339
340     std::list<std::string> targets;
341
342     yp2::util::get_vhost_otherinfo(&req->otherInfo, false, targets);
343
344     if (targets.size() < 1)
345     {
346         package.move();
347         return;
348     }
349
350     std::list<std::string>::const_iterator t_it = targets.begin();
351     for (; t_it != targets.end(); t_it++)
352     {
353         Session s;
354         Backend *b = new Backend;
355         b->m_vhost = *t_it;
356
357         b->m_route = m_p->m_target_route[*t_it];
358         // b->m_route unset
359         b->m_package = PackagePtr(new Package(s, package.origin()));
360
361         m_backend_list.push_back(BackendPtr(b));
362     }
363     m_is_multi = true;
364
365     // create init request 
366     std::list<BackendPtr>::const_iterator bit;
367     for (bit = m_backend_list.begin(); bit != m_backend_list.end(); bit++)
368     {
369         yp2::odr odr;
370         BackendPtr b = *bit;
371         Z_APDU *init_apdu = zget_APDU(odr, Z_APDU_initRequest);
372         
373         std::list<std::string>vhost_one;
374         vhost_one.push_back(b->m_vhost);
375         yp2::util::set_vhost_otherinfo(&init_apdu->u.initRequest->otherInfo,
376                                        odr, vhost_one);
377
378         Z_InitRequest *req = init_apdu->u.initRequest;
379         
380         ODR_MASK_SET(req->options, Z_Options_search);
381         ODR_MASK_SET(req->options, Z_Options_present);
382         ODR_MASK_SET(req->options, Z_Options_namedResultSets);
383         ODR_MASK_SET(req->options, Z_Options_scan);
384         
385         ODR_MASK_SET(req->protocolVersion, Z_ProtocolVersion_1);
386         ODR_MASK_SET(req->protocolVersion, Z_ProtocolVersion_2);
387         ODR_MASK_SET(req->protocolVersion, Z_ProtocolVersion_3);
388         
389         b->m_package->request() = init_apdu;
390
391         b->m_package->copy_filter(package);
392     }
393     multi_move(m_backend_list);
394
395     // create the frontend init response based on each backend init response
396     yp2::odr odr;
397
398     Z_APDU *f_apdu = odr.create_initResponse(gdu->u.z3950, 0, 0);
399     Z_InitResponse *f_resp = f_apdu->u.initResponse;
400
401     ODR_MASK_SET(f_resp->options, Z_Options_search);
402     ODR_MASK_SET(f_resp->options, Z_Options_present);
403     ODR_MASK_SET(f_resp->options, Z_Options_namedResultSets);
404     
405     ODR_MASK_SET(f_resp->protocolVersion, Z_ProtocolVersion_1);
406     ODR_MASK_SET(f_resp->protocolVersion, Z_ProtocolVersion_2);
407     ODR_MASK_SET(f_resp->protocolVersion, Z_ProtocolVersion_3);
408
409     for (bit = m_backend_list.begin(); bit != m_backend_list.end(); bit++)
410     {
411         PackagePtr p = (*bit)->m_package;
412         
413         if (p->session().is_closed()) // if any backend closes, close frontend
414             package.session().close();
415         Z_GDU *gdu = p->response().get();
416         if (gdu && gdu->which == Z_GDU_Z3950 && gdu->u.z3950->which ==
417             Z_APDU_initResponse)
418         {
419             int i;
420             Z_APDU *b_apdu = gdu->u.z3950;
421             Z_InitResponse *b_resp = b_apdu->u.initResponse;
422
423             // common options for all backends
424             for (i = 0; i <= Z_Options_stringSchema; i++)
425             {
426                 if (!ODR_MASK_GET(b_resp->options, i))
427                     ODR_MASK_CLEAR(f_resp->options, i);
428             }
429             // common protocol version
430             for (i = 0; i <= Z_ProtocolVersion_3; i++)
431                 if (!ODR_MASK_GET(b_resp->protocolVersion, i))
432                     ODR_MASK_CLEAR(f_resp->protocolVersion, i);
433             // reject if any of the backends reject
434             if (!*b_resp->result)
435                 *f_resp->result = 0;
436         }
437         else
438         {
439             // if any target does not return init return that (close or
440             // similar )
441             package.response() = p->response();
442             return;
443         }
444     }
445     package.response() = f_apdu;
446 }
447
448 void yf::Multi::Frontend::search(Package &package, Z_APDU *apdu_req)
449 {
450     // create search request 
451     Z_SearchRequest *req = apdu_req->u.searchRequest;
452
453     // save these for later
454     int smallSetUpperBound = *req->smallSetUpperBound;
455     int largeSetLowerBound = *req->largeSetLowerBound;
456     int mediumSetPresentNumber = *req->mediumSetPresentNumber;
457     
458     // they are altered now - to disable piggyback
459     *req->smallSetUpperBound = 0;
460     *req->largeSetLowerBound = 1;
461     *req->mediumSetPresentNumber = 1;
462
463     int default_num_db = req->num_databaseNames;
464     char **default_db = req->databaseNames;
465
466     std::list<BackendPtr>::const_iterator bit;
467     for (bit = m_backend_list.begin(); bit != m_backend_list.end(); bit++)
468     {
469         PackagePtr p = (*bit)->m_package;
470         yp2::odr odr;
471     
472         if (!yp2::util::set_databases_from_zurl(odr, (*bit)->m_vhost,
473                                                 &req->num_databaseNames,
474                                                 &req->databaseNames))
475         {
476             req->num_databaseNames = default_num_db;
477             req->databaseNames = default_db;
478         }
479         p->request() = apdu_req;
480         p->copy_filter(package);
481     }
482     multi_move(m_backend_list);
483
484     // look at each response
485     FrontendSet resultSet(std::string(req->resultSetName));
486
487     int result_set_size = 0;
488     Z_Records *z_records_diag = 0;  // no diagnostics (yet)
489     for (bit = m_backend_list.begin(); bit != m_backend_list.end(); bit++)
490     {
491         PackagePtr p = (*bit)->m_package;
492         
493         if (p->session().is_closed()) // if any backend closes, close frontend
494             package.session().close();
495         
496         Z_GDU *gdu = p->response().get();
497         if (gdu && gdu->which == Z_GDU_Z3950 && gdu->u.z3950->which ==
498             Z_APDU_searchResponse)
499         {
500             Z_APDU *b_apdu = gdu->u.z3950;
501             Z_SearchResponse *b_resp = b_apdu->u.searchResponse;
502          
503             // see we get any errors (AKA diagnstics)
504             if (b_resp->records)
505             {
506                 if (b_resp->records->which == Z_Records_NSD
507                     || b_resp->records->which == Z_Records_multipleNSD)
508                     z_records_diag = b_resp->records;
509                 // we may set this multiple times (TOO BAD!)
510             }
511             BackendSet backendSet;
512             backendSet.m_backend = *bit;
513             backendSet.m_count = *b_resp->resultCount;
514             result_set_size += *b_resp->resultCount;
515             resultSet.m_backend_sets.push_back(backendSet);
516         }
517         else
518         {
519             // if any target does not return search response - return that 
520             package.response() = p->response();
521             return;
522         }
523     }
524
525     yp2::odr odr;
526     Z_APDU *f_apdu = odr.create_searchResponse(apdu_req, 0, 0);
527     Z_SearchResponse *f_resp = f_apdu->u.searchResponse;
528
529     *f_resp->resultCount = result_set_size;
530     if (z_records_diag)
531     {
532         // search error
533         f_resp->records = z_records_diag;
534         package.response() = f_apdu;
535         return;
536     }
537     // assume OK
538     m_sets[resultSet.m_setname] = resultSet;
539
540     int number;
541     yp2::util::piggyback(smallSetUpperBound,
542                          largeSetLowerBound,
543                          mediumSetPresentNumber,
544                          result_set_size,
545                          number);
546     Package pp(package.session(), package.origin());
547     if (number > 0)
548     {
549         pp.copy_filter(package);
550         Z_APDU *p_apdu = zget_APDU(odr, Z_APDU_presentRequest);
551         Z_PresentRequest *p_req = p_apdu->u.presentRequest;
552         p_req->preferredRecordSyntax = req->preferredRecordSyntax;
553         p_req->resultSetId = req->resultSetName;
554         *p_req->resultSetStartPoint = 1;
555         *p_req->numberOfRecordsRequested = number;
556         pp.request() = p_apdu;
557         present(pp, p_apdu);
558         
559         if (pp.session().is_closed())
560             package.session().close();
561         
562         Z_GDU *gdu = pp.response().get();
563         if (gdu && gdu->which == Z_GDU_Z3950 && gdu->u.z3950->which ==
564             Z_APDU_presentResponse)
565         {
566             Z_PresentResponse *p_res = gdu->u.z3950->u.presentResponse;
567             f_resp->records = p_res->records;
568             *f_resp->numberOfRecordsReturned = 
569                 *p_res->numberOfRecordsReturned;
570             *f_resp->nextResultSetPosition = 
571                 *p_res->nextResultSetPosition;
572         }
573         else 
574         {
575             package.response() = pp.response(); 
576             return;
577         }
578     }
579     package.response() = f_apdu; // in this scope because of p
580 }
581
582 void yf::Multi::Frontend::present(Package &package, Z_APDU *apdu_req)
583 {
584     // create present request 
585     Z_PresentRequest *req = apdu_req->u.presentRequest;
586
587     Sets_it it;
588     it = m_sets.find(std::string(req->resultSetId));
589     if (it == m_sets.end())
590     {
591         yp2::odr odr;
592         Z_APDU *apdu = 
593             odr.create_presentResponse(
594                 apdu_req,
595                 YAZ_BIB1_SPECIFIED_RESULT_SET_DOES_NOT_EXIST,
596                 req->resultSetId);
597         package.response() = apdu;
598         return;
599     }
600     std::list<Multi::FrontendSet::PresentJob> jobs;
601     int start = *req->resultSetStartPoint;
602     int number = *req->numberOfRecordsRequested;
603     it->second.round_robin(start, number, jobs);
604
605     std::list<BackendPtr> present_backend_list;
606
607     std::list<BackendSet>::const_iterator bsit;
608     bsit = it->second.m_backend_sets.begin();
609     for (; bsit != it->second.m_backend_sets.end(); bsit++)
610     {
611         std::list<Multi::FrontendSet::PresentJob>::const_iterator jit;
612         int start = -1;
613         int end = -1;
614         
615         for (jit = jobs.begin(); jit != jobs.end(); jit++)
616         {
617             if (jit->m_backend == bsit->m_backend)
618             {
619                 if (start == -1 || jit->m_pos < start)
620                     start = jit->m_pos;
621                 if (end == -1 || jit->m_pos > end)
622                     end = jit->m_pos;
623             }
624         }
625         if (start != -1)
626         {
627             PackagePtr p = bsit->m_backend->m_package;
628
629             *req->resultSetStartPoint = start;
630             *req->numberOfRecordsRequested = end - start + 1;
631             
632             p->request() = apdu_req;
633             p->copy_filter(package);
634
635             present_backend_list.push_back(bsit->m_backend);
636         }
637     }
638     multi_move(present_backend_list);
639
640     // look at each response
641     Z_Records *z_records_diag = 0;
642
643     std::list<BackendPtr>::const_iterator pbit = present_backend_list.begin();
644     for (; pbit != present_backend_list.end(); pbit++)
645     {
646         PackagePtr p = (*pbit)->m_package;
647         
648         if (p->session().is_closed()) // if any backend closes, close frontend
649             package.session().close();
650         
651         Z_GDU *gdu = p->response().get();
652         if (gdu && gdu->which == Z_GDU_Z3950 && gdu->u.z3950->which ==
653             Z_APDU_presentResponse)
654         {
655             Z_APDU *b_apdu = gdu->u.z3950;
656             Z_PresentResponse *b_resp = b_apdu->u.presentResponse;
657          
658             // see we get any errors (AKA diagnstics)
659             if (b_resp->records)
660             {
661                 if (b_resp->records->which != Z_Records_DBOSD)
662                     z_records_diag = b_resp->records;
663                 // we may set this multiple times (TOO BAD!)
664             }
665         }
666         else
667         {
668             // if any target does not return present response - return that 
669             package.response() = p->response();
670             return;
671         }
672     }
673
674     yp2::odr odr;
675     Z_APDU *f_apdu = odr.create_presentResponse(apdu_req, 0, 0);
676     Z_PresentResponse *f_resp = f_apdu->u.presentResponse;
677
678     if (z_records_diag)
679     {
680         f_resp->records = z_records_diag;
681         *f_resp->presentStatus = Z_PresentStatus_failure;
682     }
683     else
684     {
685         f_resp->records = (Z_Records *) odr_malloc(odr, sizeof(Z_Records));
686         Z_Records * records = f_resp->records;
687         records->which = Z_Records_DBOSD;
688         records->u.databaseOrSurDiagnostics =
689             (Z_NamePlusRecordList *)
690             odr_malloc(odr, sizeof(Z_NamePlusRecordList));
691         Z_NamePlusRecordList *nprl = records->u.databaseOrSurDiagnostics;
692         nprl->num_records = jobs.size();
693         nprl->records = (Z_NamePlusRecord**)
694             odr_malloc(odr, sizeof(Z_NamePlusRecord *) * nprl->num_records);
695         int i = 0;
696         std::list<Multi::FrontendSet::PresentJob>::const_iterator jit;
697         for (jit = jobs.begin(); jit != jobs.end(); jit++)
698         {
699             PackagePtr p = jit->m_backend->m_package;
700             
701             Z_GDU *gdu = p->response().get();
702             Z_APDU *b_apdu = gdu->u.z3950;
703             Z_PresentResponse *b_resp = b_apdu->u.presentResponse;
704
705             nprl->records[i++] =
706                 b_resp->records->u.databaseOrSurDiagnostics->
707                 records[jit->m_inside_pos];
708         }
709         *f_resp->nextResultSetPosition = start + i;
710         *f_resp->numberOfRecordsReturned = i;
711     }
712     package.response() = f_apdu;
713 }
714
715 void yf::Multi::Frontend::scan1(Package &package, Z_APDU *apdu_req)
716 {
717     if (m_backend_list.size() > 1)
718     {
719         yp2::odr odr;
720         Z_APDU *f_apdu = 
721             odr.create_scanResponse(
722                 apdu_req, YAZ_BIB1_COMBI_OF_SPECIFIED_DATABASES_UNSUPP, 0);
723         package.response() = f_apdu;
724         return;
725     }
726     Z_ScanRequest *req = apdu_req->u.scanRequest;
727
728     int default_num_db = req->num_databaseNames;
729     char **default_db = req->databaseNames;
730
731     std::list<BackendPtr>::const_iterator bit;
732     for (bit = m_backend_list.begin(); bit != m_backend_list.end(); bit++)
733     {
734         PackagePtr p = (*bit)->m_package;
735         yp2::odr odr;
736     
737         if (!yp2::util::set_databases_from_zurl(odr, (*bit)->m_vhost,
738                                                 &req->num_databaseNames,
739                                                 &req->databaseNames))
740         {
741             req->num_databaseNames = default_num_db;
742             req->databaseNames = default_db;
743         }
744         p->request() = apdu_req;
745         p->copy_filter(package);
746     }
747     multi_move(m_backend_list);
748
749     for (bit = m_backend_list.begin(); bit != m_backend_list.end(); bit++)
750     {
751         PackagePtr p = (*bit)->m_package;
752         
753         if (p->session().is_closed()) // if any backend closes, close frontend
754             package.session().close();
755         
756         Z_GDU *gdu = p->response().get();
757         if (gdu && gdu->which == Z_GDU_Z3950 && gdu->u.z3950->which ==
758             Z_APDU_scanResponse)
759         {
760             package.response() = p->response();
761             break;
762         }
763         else
764         {
765             // if any target does not return scan response - return that 
766             package.response() = p->response();
767             return;
768         }
769     }
770 }
771
772 bool yf::Multi::ScanTermInfo::operator < (const ScanTermInfo &k) const
773 {
774     return m_norm_term < k.m_norm_term;
775 }
776
777 bool yf::Multi::ScanTermInfo::operator == (const ScanTermInfo &k) const
778 {
779     return m_norm_term == k.m_norm_term;
780 }
781
782 Z_Entry *yf::Multi::ScanTermInfo::get_entry(ODR odr)
783 {
784     Z_Entry *e = (Z_Entry *)odr_malloc(odr, sizeof(*e));
785     e->which = Z_Entry_termInfo;
786     Z_TermInfo *t;
787     t = e->u.termInfo = (Z_TermInfo *) odr_malloc(odr, sizeof(*t));
788     t->suggestedAttributes = 0;
789     t->displayTerm = 0;
790     t->alternativeTerm = 0;
791     t->byAttributes = 0;
792     t->otherTermInfo = 0;
793     t->globalOccurrences = odr_intdup(odr, m_count);
794     t->term = (Z_Term *)
795         odr_malloc(odr, sizeof(*t->term));
796     t->term->which = Z_Term_general;
797     Odr_oct *o;
798     t->term->u.general = o = (Odr_oct *)odr_malloc(odr, sizeof(Odr_oct));
799
800     o->len = o->size = m_norm_term.size();
801     o->buf = (unsigned char *) odr_malloc(odr, o->len);
802     memcpy(o->buf, m_norm_term.c_str(), o->len);
803     return e;
804 }
805
806 void yf::Multi::Frontend::scan2(Package &package, Z_APDU *apdu_req)
807 {
808     Z_ScanRequest *req = apdu_req->u.scanRequest;
809
810     int default_num_db = req->num_databaseNames;
811     char **default_db = req->databaseNames;
812
813     std::list<BackendPtr>::const_iterator bit;
814     for (bit = m_backend_list.begin(); bit != m_backend_list.end(); bit++)
815     {
816         PackagePtr p = (*bit)->m_package;
817         yp2::odr odr;
818     
819         if (!yp2::util::set_databases_from_zurl(odr, (*bit)->m_vhost,
820                                                 &req->num_databaseNames,
821                                                 &req->databaseNames))
822         {
823             req->num_databaseNames = default_num_db;
824             req->databaseNames = default_db;
825         }
826         p->request() = apdu_req;
827         p->copy_filter(package);
828     }
829     multi_move(m_backend_list);
830
831     ScanTermInfoList entries_before;
832     ScanTermInfoList entries_after;
833     int no_before = 0;
834     int no_after = 0;
835
836     for (bit = m_backend_list.begin(); bit != m_backend_list.end(); bit++)
837     {
838         PackagePtr p = (*bit)->m_package;
839         
840         if (p->session().is_closed()) // if any backend closes, close frontend
841             package.session().close();
842         
843         Z_GDU *gdu = p->response().get();
844         if (gdu && gdu->which == Z_GDU_Z3950 && gdu->u.z3950->which ==
845             Z_APDU_scanResponse)
846         {
847             Z_ScanResponse *res = gdu->u.z3950->u.scanResponse;
848
849             if (res->entries && res->entries->nonsurrogateDiagnostics)
850             {
851                 // failure
852                 yp2::odr odr;
853                 Z_APDU *f_apdu = odr.create_scanResponse(apdu_req, 1, 0);
854                 Z_ScanResponse *f_res = f_apdu->u.scanResponse;
855
856                 f_res->entries->nonsurrogateDiagnostics = 
857                     res->entries->nonsurrogateDiagnostics;
858                 f_res->entries->num_nonsurrogateDiagnostics = 
859                     res->entries->num_nonsurrogateDiagnostics;
860
861                 package.response() = f_apdu;
862                 return;
863             }
864
865             if (res->entries && res->entries->entries)
866             {
867                 Z_Entry **entries = res->entries->entries;
868                 int num_entries = res->entries->num_entries;
869                 int position = 1;
870                 if (req->preferredPositionInResponse)
871                     position = *req->preferredPositionInResponse;
872                 if (res->positionOfTerm)
873                     position = *res->positionOfTerm;
874
875                 // before
876                 int i;
877                 for (i = 0; i<position-1 && i<num_entries; i++)
878                 {
879                     Z_Entry *ent = entries[i];
880
881                     if (ent->which == Z_Entry_termInfo)
882                     {
883                         ScanTermInfo my;
884
885                         int *occur = ent->u.termInfo->globalOccurrences;
886                         my.m_count = occur ? *occur : 0;
887
888                         if (ent->u.termInfo->term->which == Z_Term_general)
889                         {
890                             my.m_norm_term = std::string(
891                                 (const char *)
892                                 ent->u.termInfo->term->u.general->buf,
893                                 ent->u.termInfo->term->u.general->len);
894                         }
895                         if (my.m_norm_term.length())
896                         {
897                             ScanTermInfoList::iterator it = 
898                                 entries_before.begin();
899                             while (it != entries_before.end() && my <*it)
900                                 it++;
901                             if (my == *it)
902                             {
903                                 it->m_count += my.m_count;
904                             }
905                             else
906                             {
907                                 entries_before.insert(it, my);
908                                 no_before++;
909                             }
910                         }
911                     }
912                 }
913                 // after
914                 if (position <= 0)
915                     i = 0;
916                 else
917                     i = position-1;
918                 for ( ; i<num_entries; i++)
919                 {
920                     Z_Entry *ent = entries[i];
921
922                     if (ent->which == Z_Entry_termInfo)
923                     {
924                         ScanTermInfo my;
925
926                         int *occur = ent->u.termInfo->globalOccurrences;
927                         my.m_count = occur ? *occur : 0;
928
929                         if (ent->u.termInfo->term->which == Z_Term_general)
930                         {
931                             my.m_norm_term = std::string(
932                                 (const char *)
933                                 ent->u.termInfo->term->u.general->buf,
934                                 ent->u.termInfo->term->u.general->len);
935                         }
936                         if (my.m_norm_term.length())
937                         {
938                             ScanTermInfoList::iterator it = 
939                                 entries_after.begin();
940                             while (it != entries_after.end() && *it < my)
941                                 it++;
942                             if (my == *it)
943                             {
944                                 it->m_count += my.m_count;
945                             }
946                             else
947                             {
948                                 entries_after.insert(it, my);
949                                 no_after++;
950                             }
951                         }
952                     }
953                 }
954
955             }                
956         }
957         else
958         {
959             // if any target does not return scan response - return that 
960             package.response() = p->response();
961             return;
962         }
963     }
964
965     if (true)
966     {
967         std::cout << "BEFORE\n";
968         ScanTermInfoList::iterator it = entries_before.begin();
969         for(; it != entries_before.end(); it++)
970         {
971             std::cout << " " << it->m_norm_term << " " << it->m_count << "\n";
972         }
973         
974         std::cout << "AFTER\n";
975         it = entries_after.begin();
976         for(; it != entries_after.end(); it++)
977         {
978             std::cout << " " << it->m_norm_term << " " << it->m_count << "\n";
979         }
980     }
981
982     if (false)
983     {
984         yp2::odr odr;
985         Z_APDU *f_apdu = odr.create_scanResponse(apdu_req, 1, "not implemented");
986         package.response() = f_apdu;
987     }
988     else
989     {
990         yp2::odr odr;
991         Z_APDU *f_apdu = odr.create_scanResponse(apdu_req, 0, 0);
992         Z_ScanResponse *resp = f_apdu->u.scanResponse;
993         
994         int number_returned = *req->numberOfTermsRequested;
995         int position_returned = *req->preferredPositionInResponse;
996         
997         resp->entries->num_entries = number_returned;
998         resp->entries->entries = (Z_Entry**)
999             odr_malloc(odr, sizeof(Z_Entry*) * number_returned);
1000         int i;
1001
1002         int lbefore = entries_before.size();
1003         if (lbefore < position_returned-1)
1004             position_returned = lbefore+1;
1005
1006         ScanTermInfoList::iterator it = entries_before.begin();
1007         for (i = 0; i<position_returned-1 && it != entries_before.end(); i++, it++)
1008         {
1009             resp->entries->entries[position_returned-2-i] = it->get_entry(odr);
1010         }
1011
1012         it = entries_after.begin();
1013
1014         if (position_returned <= 0)
1015             i = 0;
1016         else
1017             i = position_returned-1;
1018         for (; i<number_returned && it != entries_after.end(); i++, it++)
1019         {
1020             resp->entries->entries[i] = it->get_entry(odr);
1021         }
1022
1023         number_returned = i;
1024
1025         resp->positionOfTerm = odr_intdup(odr, position_returned);
1026         resp->numberOfEntriesReturned = odr_intdup(odr, number_returned);
1027         resp->entries->num_entries = number_returned;
1028
1029         package.response() = f_apdu;
1030     }
1031 }
1032
1033
1034 void yf::Multi::process(Package &package) const
1035 {
1036     FrontendPtr f = m_p->get_frontend(package);
1037
1038     Z_GDU *gdu = package.request().get();
1039     
1040     if (gdu && gdu->which == Z_GDU_Z3950 && gdu->u.z3950->which ==
1041         Z_APDU_initRequest && !f->m_is_multi)
1042     {
1043         f->init(package, gdu);
1044     }
1045     else if (!f->m_is_multi)
1046         package.move();
1047     else if (gdu && gdu->which == Z_GDU_Z3950)
1048     {
1049         Z_APDU *apdu = gdu->u.z3950;
1050         if (apdu->which == Z_APDU_initRequest)
1051         {
1052             yp2::odr odr;
1053             
1054             package.response() = odr.create_close(
1055                 apdu,
1056                 Z_Close_protocolError,
1057                 "double init");
1058             
1059             package.session().close();
1060         }
1061         else if (apdu->which == Z_APDU_searchRequest)
1062         {
1063             f->search(package, apdu);
1064         }
1065         else if (apdu->which == Z_APDU_presentRequest)
1066         {
1067             f->present(package, apdu);
1068         }
1069         else if (apdu->which == Z_APDU_scanRequest)
1070         {
1071             f->scan2(package, apdu);
1072         }
1073         else
1074         {
1075             yp2::odr odr;
1076             
1077             package.response() = odr.create_close(
1078                 apdu, Z_Close_protocolError,
1079                 "unsupported APDU in filter multi");
1080             
1081             package.session().close();
1082         }
1083     }
1084     m_p->release_frontend(package);
1085 }
1086
1087 void yp2::filter::Multi::configure(const xmlNode * ptr)
1088 {
1089     for (ptr = ptr->children; ptr; ptr = ptr->next)
1090     {
1091         if (ptr->type != XML_ELEMENT_NODE)
1092             continue;
1093         if (!strcmp((const char *) ptr->name, "target"))
1094         {
1095             std::string route = yp2::xml::get_route(ptr);
1096             std::string target = yp2::xml::get_text(ptr);
1097             std::cout << "route=" << route << " target=" << target << "\n";
1098             m_p->m_target_route[target] = route;
1099         }
1100         else if (!strcmp((const char *) ptr->name, "virtual"))
1101         {
1102             std::list<std::string> targets;
1103             std::string vhost;
1104             xmlNode *v_node = ptr->children;
1105             for (; v_node; v_node = v_node->next)
1106             {
1107                 if (v_node->type != XML_ELEMENT_NODE)
1108                     continue;
1109                 
1110                 if (yp2::xml::is_element_yp2(v_node, "vhost"))
1111                     vhost = yp2::xml::get_text(v_node);
1112                 else if (yp2::xml::is_element_yp2(v_node, "target"))
1113                     targets.push_back(yp2::xml::get_text(v_node));
1114                 else
1115                     throw yp2::filter::FilterException
1116                         ("Bad element " 
1117                          + std::string((const char *) v_node->name)
1118                          + " in virtual section"
1119                             );
1120             }
1121             std::string route = yp2::xml::get_route(ptr);
1122             add_map_host2hosts(vhost, targets, route);
1123             std::list<std::string>::const_iterator it;
1124             for (it = targets.begin(); it != targets.end(); it++)
1125             {
1126                 std::cout << "Add " << vhost << "->" << *it
1127                           << "," << route << "\n";
1128             }
1129         }
1130         else
1131         {
1132             throw yp2::filter::FilterException
1133                 ("Bad element " 
1134                  + std::string((const char *) ptr->name)
1135                  + " in virt_db filter");
1136         }
1137     }
1138 }
1139
1140 static yp2::filter::Base* filter_creator()
1141 {
1142     return new yp2::filter::Multi;
1143 }
1144
1145 extern "C" {
1146     struct yp2_filter_struct yp2_filter_multi = {
1147         0,
1148         "multi",
1149         filter_creator
1150     };
1151 }
1152
1153
1154 /*
1155  * Local variables:
1156  * c-basic-offset: 4
1157  * indent-tabs-mode: nil
1158  * c-file-style: "stroustrup"
1159  * End:
1160  * vim: shiftwidth=4 tabstop=8 expandtab
1161  */