First work at bug #573: Hide errors for individual servers.
[metaproxy-moved-to-github.git] / src / filter_multi.cpp
1 /* $Id: filter_multi.cpp,v 1.18 2006-05-15 13:53:37 adam Exp $
2    Copyright (c) 2005-2006, Index Data.
3
4 %LICENSE%
5  */
6
7 #include "config.hpp"
8
9 #include "filter.hpp"
10 #include "package.hpp"
11
12 #include <boost/thread/thread.hpp>
13 #include <boost/thread/mutex.hpp>
14 #include <boost/thread/condition.hpp>
15 #include <boost/shared_ptr.hpp>
16
17 #include "util.hpp"
18 #include "filter_multi.hpp"
19
20 #include <yaz/zgdu.h>
21 #include <yaz/otherinfo.h>
22 #include <yaz/diagbib1.h>
23
24 #include <vector>
25 #include <algorithm>
26 #include <map>
27 #include <iostream>
28
29 namespace mp = metaproxy_1;
30 namespace yf = mp::filter;
31
32 namespace metaproxy_1 {
33     namespace filter {
34
35         struct Multi::BackendSet {
36             BackendPtr m_backend;
37             int m_count;
38             bool operator < (const BackendSet &k) const;
39             bool operator == (const BackendSet &k) const;
40         };
41         struct Multi::ScanTermInfo {
42             std::string m_norm_term;
43             std::string m_display_term;
44             int m_count;
45             bool operator < (const ScanTermInfo &) const;
46             bool operator == (const ScanTermInfo &) const;
47             Z_Entry *get_entry(ODR odr);
48         };
49         struct Multi::FrontendSet {
50             struct PresentJob {
51                 BackendPtr m_backend;
52                 int m_pos;
53                 int m_inside_pos;
54             };
55             FrontendSet(std::string setname);
56             FrontendSet();
57             ~FrontendSet();
58
59             void round_robin(int pos, int number, std::list<PresentJob> &job);
60
61             std::list<BackendSet> m_backend_sets;
62             std::string m_setname;
63         };
64         struct Multi::Backend {
65             PackagePtr m_package;
66             std::string m_backend_database;
67             std::string m_vhost;
68             std::string m_route;
69             void operator() (void);  // thread operation
70         };
71         struct Multi::Frontend {
72             Frontend(Rep *rep);
73             ~Frontend();
74             bool m_is_multi;
75             bool m_in_use;
76             std::list<BackendPtr> m_backend_list;
77             std::map<std::string,Multi::FrontendSet> m_sets;
78
79             void multi_move(std::list<BackendPtr> &blist);
80             void init(Package &package, Z_GDU *gdu);
81             void close(Package &package);
82             void search(Package &package, Z_APDU *apdu);
83             void present(Package &package, Z_APDU *apdu);
84             void scan1(Package &package, Z_APDU *apdu);
85             void scan2(Package &package, Z_APDU *apdu);
86             Rep *m_p;
87         };            
88         struct Multi::Map {
89             Map(std::list<std::string> hosts, std::string route);
90             Map();
91             std::list<std::string> m_hosts;
92             std::string m_route;
93         };
94         class Multi::Rep {
95             friend class Multi;
96             friend struct Frontend;
97             
98             Rep();
99             FrontendPtr get_frontend(Package &package);
100             void release_frontend(Package &package);
101         private:
102             std::map<std::string,std::string> m_target_route;
103             boost::mutex m_mutex;
104             boost::condition m_cond_session_ready;
105             std::map<mp::Session, FrontendPtr> m_clients;
106             bool m_hide_unavailable;
107         };
108     }
109 }
110
111 yf::Multi::Rep::Rep()
112 {
113     m_hide_unavailable = false;
114 }
115
116 bool yf::Multi::BackendSet::operator < (const BackendSet &k) const
117 {
118     return m_count < k.m_count;
119 }
120
121 yf::Multi::Frontend::Frontend(Rep *rep)
122 {
123     m_p = rep;
124     m_is_multi = false;
125 }
126
127 yf::Multi::Frontend::~Frontend()
128 {
129 }
130
131 yf::Multi::FrontendPtr yf::Multi::Rep::get_frontend(Package &package)
132 {
133     boost::mutex::scoped_lock lock(m_mutex);
134
135     std::map<mp::Session,yf::Multi::FrontendPtr>::iterator it;
136     
137     while(true)
138     {
139         it = m_clients.find(package.session());
140         if (it == m_clients.end())
141             break;
142         
143         if (!it->second->m_in_use)
144         {
145             it->second->m_in_use = true;
146             return it->second;
147         }
148         m_cond_session_ready.wait(lock);
149     }
150     FrontendPtr f(new Frontend(this));
151     m_clients[package.session()] = f;
152     f->m_in_use = true;
153     return f;
154 }
155
156 void yf::Multi::Rep::release_frontend(Package &package)
157 {
158     boost::mutex::scoped_lock lock(m_mutex);
159     std::map<mp::Session,yf::Multi::FrontendPtr>::iterator it;
160     
161     it = m_clients.find(package.session());
162     if (it != m_clients.end())
163     {
164         if (package.session().is_closed())
165         {
166             it->second->close(package);
167             m_clients.erase(it);
168         }
169         else
170         {
171             it->second->m_in_use = false;
172         }
173         m_cond_session_ready.notify_all();
174     }
175 }
176
177 yf::Multi::FrontendSet::FrontendSet(std::string setname)
178     :  m_setname(setname)
179 {
180 }
181
182
183 yf::Multi::FrontendSet::FrontendSet()
184 {
185 }
186
187
188 yf::Multi::FrontendSet::~FrontendSet()
189 {
190 }
191
192 yf::Multi::Map::Map(std::list<std::string> hosts, std::string route)
193     : m_hosts(hosts), m_route(route) 
194 {
195 }
196
197 yf::Multi::Map::Map()
198 {
199 }
200
201 yf::Multi::Multi() : m_p(new Multi::Rep)
202 {
203 }
204
205 yf::Multi::~Multi() {
206 }
207
208
209 void yf::Multi::Backend::operator() (void) 
210 {
211     m_package->move(m_route);
212 }
213
214
215 void yf::Multi::Frontend::close(Package &package)
216 {
217     std::list<BackendPtr>::const_iterator bit;
218     for (bit = m_backend_list.begin(); bit != m_backend_list.end(); bit++)
219     {
220         BackendPtr b = *bit;
221
222         b->m_package->copy_filter(package);
223         b->m_package->request() = (Z_GDU *) 0;
224         b->m_package->session().close();
225         b->m_package->move(b->m_route);
226     }
227 }
228
229 void yf::Multi::Frontend::multi_move(std::list<BackendPtr> &blist)
230 {
231     std::list<BackendPtr>::const_iterator bit;
232     boost::thread_group g;
233     for (bit = blist.begin(); bit != blist.end(); bit++)
234     {
235         g.add_thread(new boost::thread(**bit));
236     }
237     g.join_all();
238 }
239
240 void yf::Multi::FrontendSet::round_robin(int start, int number,
241                                          std::list<PresentJob> &jobs)
242 {
243     std::list<int> pos;
244     std::list<int> inside_pos;
245     std::list<BackendSet>::const_iterator bsit;
246     for (bsit = m_backend_sets.begin(); bsit != m_backend_sets.end(); bsit++)
247     {
248         pos.push_back(1);
249         inside_pos.push_back(0);
250     }
251
252     int p = 1;
253 #if 1
254     // optimization step!
255     int omin = 0;
256     while(true)
257     {
258         int min = 0;
259         int no_left = 0;
260         // find min count for each set which is > omin
261         for (bsit = m_backend_sets.begin(); bsit != m_backend_sets.end(); bsit++)
262         {
263             if (bsit->m_count > omin)
264             {
265                 if (no_left == 0 || bsit->m_count < min)
266                     min = bsit->m_count;
267                 no_left++;
268             }
269         }
270         if (no_left == 0) // if nothing greater than omin, bail out.
271             break;
272         int skip = no_left * min;
273         if (p + skip > start)  // step gets us "into" present range?
274         {
275             // Yes. skip until start.. Rounding off is deliberate!
276             min = (start-p) / no_left;
277             p += no_left * min;
278             
279             // update positions in each set..
280             std::list<int>::iterator psit = pos.begin();
281             for (psit = pos.begin(); psit != pos.end(); psit++)
282                 *psit += min;
283             break;
284         }
285         // skip on each set.. before "present range"..
286         p = p + skip;
287         
288         std::cout << "\nSKIP min=" << min << " no_left=" << no_left << "\n\n";
289         
290         std::list<int>::iterator psit = pos.begin();
291         for (psit = pos.begin(); psit != pos.end(); psit++)
292             *psit += min;
293         
294         omin = min; // update so we consider next class (with higher count)
295     }
296 #endif
297     int fetched = 0;
298     bool more = true;
299     while (more)
300     {
301         more = false;
302         std::list<int>::iterator psit = pos.begin();
303         std::list<int>::iterator esit = inside_pos.begin();
304         bsit = m_backend_sets.begin();
305
306         for (; bsit != m_backend_sets.end(); psit++,esit++,bsit++)
307         {
308             if (fetched >= number)
309             {
310                 more = false;
311                 break;
312             }
313             if (*psit <= bsit->m_count)
314             {
315                 if (p >= start)
316                 {
317                     PresentJob job;
318                     job.m_backend = bsit->m_backend;
319                     job.m_pos = *psit;
320                     job.m_inside_pos = *esit;
321                     jobs.push_back(job);
322                     (*esit)++;
323                     fetched++;
324                 }
325                 (*psit)++;
326                 p++;
327                 more = true;
328             }
329         }
330     }
331 }
332
333 void yf::Multi::Frontend::init(Package &package, Z_GDU *gdu)
334 {
335     Z_InitRequest *req = gdu->u.z3950->u.initRequest;
336
337     std::list<std::string> targets;
338
339     mp::util::get_vhost_otherinfo(&req->otherInfo, false, targets);
340
341     if (targets.size() < 1)
342     {
343         package.move();
344         return;
345     }
346
347     std::list<std::string>::const_iterator t_it = targets.begin();
348     for (; t_it != targets.end(); t_it++)
349     {
350         Session s;
351         Backend *b = new Backend;
352         b->m_vhost = *t_it;
353
354         b->m_route = m_p->m_target_route[*t_it];
355         // b->m_route unset
356         b->m_package = PackagePtr(new Package(s, package.origin()));
357
358         m_backend_list.push_back(BackendPtr(b));
359     }
360     m_is_multi = true;
361
362     // create init request 
363     std::list<BackendPtr>::iterator bit;
364     for (bit = m_backend_list.begin(); bit != m_backend_list.end(); bit++)
365     {
366         mp::odr odr;
367         BackendPtr b = *bit;
368         Z_APDU *init_apdu = zget_APDU(odr, Z_APDU_initRequest);
369         
370         std::list<std::string>vhost_one;
371         vhost_one.push_back(b->m_vhost);
372         mp::util::set_vhost_otherinfo(&init_apdu->u.initRequest->otherInfo,
373                                        odr, vhost_one);
374
375         Z_InitRequest *req = init_apdu->u.initRequest;
376         
377         ODR_MASK_SET(req->options, Z_Options_search);
378         ODR_MASK_SET(req->options, Z_Options_present);
379         ODR_MASK_SET(req->options, Z_Options_namedResultSets);
380         ODR_MASK_SET(req->options, Z_Options_scan);
381         
382         ODR_MASK_SET(req->protocolVersion, Z_ProtocolVersion_1);
383         ODR_MASK_SET(req->protocolVersion, Z_ProtocolVersion_2);
384         ODR_MASK_SET(req->protocolVersion, Z_ProtocolVersion_3);
385         
386         b->m_package->request() = init_apdu;
387
388         b->m_package->copy_filter(package);
389     }
390     multi_move(m_backend_list);
391
392     // create the frontend init response based on each backend init response
393     mp::odr odr;
394
395     Z_APDU *f_apdu = odr.create_initResponse(gdu->u.z3950, 0, 0);
396     Z_InitResponse *f_resp = f_apdu->u.initResponse;
397
398     ODR_MASK_SET(f_resp->options, Z_Options_search);
399     ODR_MASK_SET(f_resp->options, Z_Options_present);
400     ODR_MASK_SET(f_resp->options, Z_Options_namedResultSets);
401     
402     ODR_MASK_SET(f_resp->protocolVersion, Z_ProtocolVersion_1);
403     ODR_MASK_SET(f_resp->protocolVersion, Z_ProtocolVersion_2);
404     ODR_MASK_SET(f_resp->protocolVersion, Z_ProtocolVersion_3);
405
406     int no_failed = 0;
407     int no_succeeded = 0;
408     for (bit = m_backend_list.begin(); bit != m_backend_list.end(); )
409     {
410         PackagePtr p = (*bit)->m_package;
411         
412         if (p->session().is_closed()) // if any backend closes, close frontend
413         {
414             no_failed++;
415             bit = m_backend_list.erase(bit);
416             continue;
417         }
418         no_succeeded++;
419
420         Z_GDU *gdu = p->response().get();
421         if (gdu && gdu->which == Z_GDU_Z3950 && gdu->u.z3950->which ==
422             Z_APDU_initResponse)
423         {
424             int i;
425             Z_APDU *b_apdu = gdu->u.z3950;
426             Z_InitResponse *b_resp = b_apdu->u.initResponse;
427
428             // common options for all backends
429             for (i = 0; i <= Z_Options_stringSchema; i++)
430             {
431                 if (!ODR_MASK_GET(b_resp->options, i))
432                     ODR_MASK_CLEAR(f_resp->options, i);
433             }
434             // common protocol version
435             for (i = 0; i <= Z_ProtocolVersion_3; i++)
436                 if (!ODR_MASK_GET(b_resp->protocolVersion, i))
437                     ODR_MASK_CLEAR(f_resp->protocolVersion, i);
438             // reject if any of the backends reject
439             if (!*b_resp->result)
440                 *f_resp->result = 0;
441         }
442         else
443         {
444             // if any target does not return init return that (close or
445             // similar )
446             package.response() = p->response();
447             return;
448         }
449         bit++;
450     }
451     if (m_p->m_hide_unavailable)
452     {
453         if (no_succeeded == 0)
454             package.session().close();
455     }
456     else
457     {
458         if (no_failed)
459             package.session().close();
460     }
461     package.response() = f_apdu;
462 }
463
464 void yf::Multi::Frontend::search(Package &package, Z_APDU *apdu_req)
465 {
466     // create search request 
467     Z_SearchRequest *req = apdu_req->u.searchRequest;
468
469     // save these for later
470     int smallSetUpperBound = *req->smallSetUpperBound;
471     int largeSetLowerBound = *req->largeSetLowerBound;
472     int mediumSetPresentNumber = *req->mediumSetPresentNumber;
473     
474     // they are altered now - to disable piggyback
475     *req->smallSetUpperBound = 0;
476     *req->largeSetLowerBound = 1;
477     *req->mediumSetPresentNumber = 1;
478
479     int default_num_db = req->num_databaseNames;
480     char **default_db = req->databaseNames;
481
482     std::list<BackendPtr>::const_iterator bit;
483     for (bit = m_backend_list.begin(); bit != m_backend_list.end(); bit++)
484     {
485         PackagePtr p = (*bit)->m_package;
486         mp::odr odr;
487     
488         if (!mp::util::set_databases_from_zurl(odr, (*bit)->m_vhost,
489                                                 &req->num_databaseNames,
490                                                 &req->databaseNames))
491         {
492             req->num_databaseNames = default_num_db;
493             req->databaseNames = default_db;
494         }
495         p->request() = apdu_req;
496         p->copy_filter(package);
497     }
498     multi_move(m_backend_list);
499
500     // look at each response
501     FrontendSet resultSet(std::string(req->resultSetName));
502
503     int result_set_size = 0;
504     Z_Records *z_records_diag = 0;  // no diagnostics (yet)
505     for (bit = m_backend_list.begin(); bit != m_backend_list.end(); bit++)
506     {
507         PackagePtr p = (*bit)->m_package;
508         
509         if (p->session().is_closed()) // if any backend closes, close frontend
510             package.session().close();
511         
512         Z_GDU *gdu = p->response().get();
513         if (gdu && gdu->which == Z_GDU_Z3950 && gdu->u.z3950->which ==
514             Z_APDU_searchResponse)
515         {
516             Z_APDU *b_apdu = gdu->u.z3950;
517             Z_SearchResponse *b_resp = b_apdu->u.searchResponse;
518          
519             // see we get any errors (AKA diagnstics)
520             if (b_resp->records)
521             {
522                 if (b_resp->records->which == Z_Records_NSD
523                     || b_resp->records->which == Z_Records_multipleNSD)
524                     z_records_diag = b_resp->records;
525                 // we may set this multiple times (TOO BAD!)
526             }
527             BackendSet backendSet;
528             backendSet.m_backend = *bit;
529             backendSet.m_count = *b_resp->resultCount;
530             result_set_size += *b_resp->resultCount;
531             resultSet.m_backend_sets.push_back(backendSet);
532         }
533         else
534         {
535             // if any target does not return search response - return that 
536             package.response() = p->response();
537             return;
538         }
539     }
540
541     mp::odr odr;
542     Z_APDU *f_apdu = odr.create_searchResponse(apdu_req, 0, 0);
543     Z_SearchResponse *f_resp = f_apdu->u.searchResponse;
544
545     *f_resp->resultCount = result_set_size;
546     if (z_records_diag)
547     {
548         // search error
549         f_resp->records = z_records_diag;
550         package.response() = f_apdu;
551         return;
552     }
553     // assume OK
554     m_sets[resultSet.m_setname] = resultSet;
555
556     int number;
557     mp::util::piggyback(smallSetUpperBound,
558                          largeSetLowerBound,
559                          mediumSetPresentNumber,
560                          result_set_size,
561                          number);
562     Package pp(package.session(), package.origin());
563     if (number > 0)
564     {
565         pp.copy_filter(package);
566         Z_APDU *p_apdu = zget_APDU(odr, Z_APDU_presentRequest);
567         Z_PresentRequest *p_req = p_apdu->u.presentRequest;
568         p_req->preferredRecordSyntax = req->preferredRecordSyntax;
569         p_req->resultSetId = req->resultSetName;
570         *p_req->resultSetStartPoint = 1;
571         *p_req->numberOfRecordsRequested = number;
572         pp.request() = p_apdu;
573         present(pp, p_apdu);
574         
575         if (pp.session().is_closed())
576             package.session().close();
577         
578         Z_GDU *gdu = pp.response().get();
579         if (gdu && gdu->which == Z_GDU_Z3950 && gdu->u.z3950->which ==
580             Z_APDU_presentResponse)
581         {
582             Z_PresentResponse *p_res = gdu->u.z3950->u.presentResponse;
583             f_resp->records = p_res->records;
584             *f_resp->numberOfRecordsReturned = 
585                 *p_res->numberOfRecordsReturned;
586             *f_resp->nextResultSetPosition = 
587                 *p_res->nextResultSetPosition;
588         }
589         else 
590         {
591             package.response() = pp.response(); 
592             return;
593         }
594     }
595     package.response() = f_apdu; // in this scope because of p
596 }
597
598 void yf::Multi::Frontend::present(Package &package, Z_APDU *apdu_req)
599 {
600     // create present request 
601     Z_PresentRequest *req = apdu_req->u.presentRequest;
602
603     Sets_it it;
604     it = m_sets.find(std::string(req->resultSetId));
605     if (it == m_sets.end())
606     {
607         mp::odr odr;
608         Z_APDU *apdu = 
609             odr.create_presentResponse(
610                 apdu_req,
611                 YAZ_BIB1_SPECIFIED_RESULT_SET_DOES_NOT_EXIST,
612                 req->resultSetId);
613         package.response() = apdu;
614         return;
615     }
616     std::list<Multi::FrontendSet::PresentJob> jobs;
617     int start = *req->resultSetStartPoint;
618     int number = *req->numberOfRecordsRequested;
619     it->second.round_robin(start, number, jobs);
620
621     std::list<BackendPtr> present_backend_list;
622
623     std::list<BackendSet>::const_iterator bsit;
624     bsit = it->second.m_backend_sets.begin();
625     for (; bsit != it->second.m_backend_sets.end(); bsit++)
626     {
627         std::list<Multi::FrontendSet::PresentJob>::const_iterator jit;
628         int start = -1;
629         int end = -1;
630         
631         for (jit = jobs.begin(); jit != jobs.end(); jit++)
632         {
633             if (jit->m_backend == bsit->m_backend)
634             {
635                 if (start == -1 || jit->m_pos < start)
636                     start = jit->m_pos;
637                 if (end == -1 || jit->m_pos > end)
638                     end = jit->m_pos;
639             }
640         }
641         if (start != -1)
642         {
643             PackagePtr p = bsit->m_backend->m_package;
644
645             *req->resultSetStartPoint = start;
646             *req->numberOfRecordsRequested = end - start + 1;
647             
648             p->request() = apdu_req;
649             p->copy_filter(package);
650
651             present_backend_list.push_back(bsit->m_backend);
652         }
653     }
654     multi_move(present_backend_list);
655
656     // look at each response
657     Z_Records *z_records_diag = 0;
658
659     std::list<BackendPtr>::const_iterator pbit = present_backend_list.begin();
660     for (; pbit != present_backend_list.end(); pbit++)
661     {
662         PackagePtr p = (*pbit)->m_package;
663         
664         if (p->session().is_closed()) // if any backend closes, close frontend
665             package.session().close();
666         
667         Z_GDU *gdu = p->response().get();
668         if (gdu && gdu->which == Z_GDU_Z3950 && gdu->u.z3950->which ==
669             Z_APDU_presentResponse)
670         {
671             Z_APDU *b_apdu = gdu->u.z3950;
672             Z_PresentResponse *b_resp = b_apdu->u.presentResponse;
673          
674             // see we get any errors (AKA diagnstics)
675             if (b_resp->records)
676             {
677                 if (b_resp->records->which != Z_Records_DBOSD)
678                     z_records_diag = b_resp->records;
679                 // we may set this multiple times (TOO BAD!)
680             }
681         }
682         else
683         {
684             // if any target does not return present response - return that 
685             package.response() = p->response();
686             return;
687         }
688     }
689
690     mp::odr odr;
691     Z_APDU *f_apdu = odr.create_presentResponse(apdu_req, 0, 0);
692     Z_PresentResponse *f_resp = f_apdu->u.presentResponse;
693
694     if (z_records_diag)
695     {
696         f_resp->records = z_records_diag;
697         *f_resp->presentStatus = Z_PresentStatus_failure;
698     }
699     else
700     {
701         f_resp->records = (Z_Records *) odr_malloc(odr, sizeof(Z_Records));
702         Z_Records * records = f_resp->records;
703         records->which = Z_Records_DBOSD;
704         records->u.databaseOrSurDiagnostics =
705             (Z_NamePlusRecordList *)
706             odr_malloc(odr, sizeof(Z_NamePlusRecordList));
707         Z_NamePlusRecordList *nprl = records->u.databaseOrSurDiagnostics;
708         nprl->num_records = jobs.size();
709         nprl->records = (Z_NamePlusRecord**)
710             odr_malloc(odr, sizeof(Z_NamePlusRecord *) * nprl->num_records);
711         int i = 0;
712         std::list<Multi::FrontendSet::PresentJob>::const_iterator jit;
713         for (jit = jobs.begin(); jit != jobs.end(); jit++)
714         {
715             PackagePtr p = jit->m_backend->m_package;
716             
717             Z_GDU *gdu = p->response().get();
718             Z_APDU *b_apdu = gdu->u.z3950;
719             Z_PresentResponse *b_resp = b_apdu->u.presentResponse;
720
721             nprl->records[i++] =
722                 b_resp->records->u.databaseOrSurDiagnostics->
723                 records[jit->m_inside_pos];
724         }
725         *f_resp->nextResultSetPosition = start + i;
726         *f_resp->numberOfRecordsReturned = i;
727     }
728     package.response() = f_apdu;
729 }
730
731 void yf::Multi::Frontend::scan1(Package &package, Z_APDU *apdu_req)
732 {
733     if (m_backend_list.size() > 1)
734     {
735         mp::odr odr;
736         Z_APDU *f_apdu = 
737             odr.create_scanResponse(
738                 apdu_req, YAZ_BIB1_COMBI_OF_SPECIFIED_DATABASES_UNSUPP, 0);
739         package.response() = f_apdu;
740         return;
741     }
742     Z_ScanRequest *req = apdu_req->u.scanRequest;
743
744     int default_num_db = req->num_databaseNames;
745     char **default_db = req->databaseNames;
746
747     std::list<BackendPtr>::const_iterator bit;
748     for (bit = m_backend_list.begin(); bit != m_backend_list.end(); bit++)
749     {
750         PackagePtr p = (*bit)->m_package;
751         mp::odr odr;
752     
753         if (!mp::util::set_databases_from_zurl(odr, (*bit)->m_vhost,
754                                                 &req->num_databaseNames,
755                                                 &req->databaseNames))
756         {
757             req->num_databaseNames = default_num_db;
758             req->databaseNames = default_db;
759         }
760         p->request() = apdu_req;
761         p->copy_filter(package);
762     }
763     multi_move(m_backend_list);
764
765     for (bit = m_backend_list.begin(); bit != m_backend_list.end(); bit++)
766     {
767         PackagePtr p = (*bit)->m_package;
768         
769         if (p->session().is_closed()) // if any backend closes, close frontend
770             package.session().close();
771         
772         Z_GDU *gdu = p->response().get();
773         if (gdu && gdu->which == Z_GDU_Z3950 && gdu->u.z3950->which ==
774             Z_APDU_scanResponse)
775         {
776             package.response() = p->response();
777             break;
778         }
779         else
780         {
781             // if any target does not return scan response - return that 
782             package.response() = p->response();
783             return;
784         }
785     }
786 }
787
788 bool yf::Multi::ScanTermInfo::operator < (const ScanTermInfo &k) const
789 {
790     return m_norm_term < k.m_norm_term;
791 }
792
793 bool yf::Multi::ScanTermInfo::operator == (const ScanTermInfo &k) const
794 {
795     return m_norm_term == k.m_norm_term;
796 }
797
798 Z_Entry *yf::Multi::ScanTermInfo::get_entry(ODR odr)
799 {
800     Z_Entry *e = (Z_Entry *)odr_malloc(odr, sizeof(*e));
801     e->which = Z_Entry_termInfo;
802     Z_TermInfo *t;
803     t = e->u.termInfo = (Z_TermInfo *) odr_malloc(odr, sizeof(*t));
804     t->suggestedAttributes = 0;
805     t->displayTerm = 0;
806     t->alternativeTerm = 0;
807     t->byAttributes = 0;
808     t->otherTermInfo = 0;
809     t->globalOccurrences = odr_intdup(odr, m_count);
810     t->term = (Z_Term *)
811         odr_malloc(odr, sizeof(*t->term));
812     t->term->which = Z_Term_general;
813     Odr_oct *o;
814     t->term->u.general = o = (Odr_oct *)odr_malloc(odr, sizeof(Odr_oct));
815
816     o->len = o->size = m_norm_term.size();
817     o->buf = (unsigned char *) odr_malloc(odr, o->len);
818     memcpy(o->buf, m_norm_term.c_str(), o->len);
819     return e;
820 }
821
822 void yf::Multi::Frontend::scan2(Package &package, Z_APDU *apdu_req)
823 {
824     Z_ScanRequest *req = apdu_req->u.scanRequest;
825
826     int default_num_db = req->num_databaseNames;
827     char **default_db = req->databaseNames;
828
829     std::list<BackendPtr>::const_iterator bit;
830     for (bit = m_backend_list.begin(); bit != m_backend_list.end(); bit++)
831     {
832         PackagePtr p = (*bit)->m_package;
833         mp::odr odr;
834     
835         if (!mp::util::set_databases_from_zurl(odr, (*bit)->m_vhost,
836                                                 &req->num_databaseNames,
837                                                 &req->databaseNames))
838         {
839             req->num_databaseNames = default_num_db;
840             req->databaseNames = default_db;
841         }
842         p->request() = apdu_req;
843         p->copy_filter(package);
844     }
845     multi_move(m_backend_list);
846
847     ScanTermInfoList entries_before;
848     ScanTermInfoList entries_after;
849     int no_before = 0;
850     int no_after = 0;
851
852     for (bit = m_backend_list.begin(); bit != m_backend_list.end(); bit++)
853     {
854         PackagePtr p = (*bit)->m_package;
855         
856         if (p->session().is_closed()) // if any backend closes, close frontend
857             package.session().close();
858         
859         Z_GDU *gdu = p->response().get();
860         if (gdu && gdu->which == Z_GDU_Z3950 && gdu->u.z3950->which ==
861             Z_APDU_scanResponse)
862         {
863             Z_ScanResponse *res = gdu->u.z3950->u.scanResponse;
864
865             if (res->entries && res->entries->nonsurrogateDiagnostics)
866             {
867                 // failure
868                 mp::odr odr;
869                 Z_APDU *f_apdu = odr.create_scanResponse(apdu_req, 1, 0);
870                 Z_ScanResponse *f_res = f_apdu->u.scanResponse;
871
872                 f_res->entries->nonsurrogateDiagnostics = 
873                     res->entries->nonsurrogateDiagnostics;
874                 f_res->entries->num_nonsurrogateDiagnostics = 
875                     res->entries->num_nonsurrogateDiagnostics;
876
877                 package.response() = f_apdu;
878                 return;
879             }
880
881             if (res->entries && res->entries->entries)
882             {
883                 Z_Entry **entries = res->entries->entries;
884                 int num_entries = res->entries->num_entries;
885                 int position = 1;
886                 if (req->preferredPositionInResponse)
887                     position = *req->preferredPositionInResponse;
888                 if (res->positionOfTerm)
889                     position = *res->positionOfTerm;
890
891                 // before
892                 int i;
893                 for (i = 0; i<position-1 && i<num_entries; i++)
894                 {
895                     Z_Entry *ent = entries[i];
896
897                     if (ent->which == Z_Entry_termInfo)
898                     {
899                         ScanTermInfo my;
900
901                         int *occur = ent->u.termInfo->globalOccurrences;
902                         my.m_count = occur ? *occur : 0;
903
904                         if (ent->u.termInfo->term->which == Z_Term_general)
905                         {
906                             my.m_norm_term = std::string(
907                                 (const char *)
908                                 ent->u.termInfo->term->u.general->buf,
909                                 ent->u.termInfo->term->u.general->len);
910                         }
911                         if (my.m_norm_term.length())
912                         {
913                             ScanTermInfoList::iterator it = 
914                                 entries_before.begin();
915                             while (it != entries_before.end() && my <*it)
916                                 it++;
917                             if (my == *it)
918                             {
919                                 it->m_count += my.m_count;
920                             }
921                             else
922                             {
923                                 entries_before.insert(it, my);
924                                 no_before++;
925                             }
926                         }
927                     }
928                 }
929                 // after
930                 if (position <= 0)
931                     i = 0;
932                 else
933                     i = position-1;
934                 for ( ; i<num_entries; i++)
935                 {
936                     Z_Entry *ent = entries[i];
937
938                     if (ent->which == Z_Entry_termInfo)
939                     {
940                         ScanTermInfo my;
941
942                         int *occur = ent->u.termInfo->globalOccurrences;
943                         my.m_count = occur ? *occur : 0;
944
945                         if (ent->u.termInfo->term->which == Z_Term_general)
946                         {
947                             my.m_norm_term = std::string(
948                                 (const char *)
949                                 ent->u.termInfo->term->u.general->buf,
950                                 ent->u.termInfo->term->u.general->len);
951                         }
952                         if (my.m_norm_term.length())
953                         {
954                             ScanTermInfoList::iterator it = 
955                                 entries_after.begin();
956                             while (it != entries_after.end() && *it < my)
957                                 it++;
958                             if (my == *it)
959                             {
960                                 it->m_count += my.m_count;
961                             }
962                             else
963                             {
964                                 entries_after.insert(it, my);
965                                 no_after++;
966                             }
967                         }
968                     }
969                 }
970
971             }                
972         }
973         else
974         {
975             // if any target does not return scan response - return that 
976             package.response() = p->response();
977             return;
978         }
979     }
980
981     if (true)
982     {
983         std::cout << "BEFORE\n";
984         ScanTermInfoList::iterator it = entries_before.begin();
985         for(; it != entries_before.end(); it++)
986         {
987             std::cout << " " << it->m_norm_term << " " << it->m_count << "\n";
988         }
989         
990         std::cout << "AFTER\n";
991         it = entries_after.begin();
992         for(; it != entries_after.end(); it++)
993         {
994             std::cout << " " << it->m_norm_term << " " << it->m_count << "\n";
995         }
996     }
997
998     if (false)
999     {
1000         mp::odr odr;
1001         Z_APDU *f_apdu = odr.create_scanResponse(apdu_req, 1, "not implemented");
1002         package.response() = f_apdu;
1003     }
1004     else
1005     {
1006         mp::odr odr;
1007         Z_APDU *f_apdu = odr.create_scanResponse(apdu_req, 0, 0);
1008         Z_ScanResponse *resp = f_apdu->u.scanResponse;
1009         
1010         int number_returned = *req->numberOfTermsRequested;
1011         int position_returned = *req->preferredPositionInResponse;
1012         
1013         resp->entries->num_entries = number_returned;
1014         resp->entries->entries = (Z_Entry**)
1015             odr_malloc(odr, sizeof(Z_Entry*) * number_returned);
1016         int i;
1017
1018         int lbefore = entries_before.size();
1019         if (lbefore < position_returned-1)
1020             position_returned = lbefore+1;
1021
1022         ScanTermInfoList::iterator it = entries_before.begin();
1023         for (i = 0; i<position_returned-1 && it != entries_before.end(); i++, it++)
1024         {
1025             resp->entries->entries[position_returned-2-i] = it->get_entry(odr);
1026         }
1027
1028         it = entries_after.begin();
1029
1030         if (position_returned <= 0)
1031             i = 0;
1032         else
1033             i = position_returned-1;
1034         for (; i<number_returned && it != entries_after.end(); i++, it++)
1035         {
1036             resp->entries->entries[i] = it->get_entry(odr);
1037         }
1038
1039         number_returned = i;
1040
1041         resp->positionOfTerm = odr_intdup(odr, position_returned);
1042         resp->numberOfEntriesReturned = odr_intdup(odr, number_returned);
1043         resp->entries->num_entries = number_returned;
1044
1045         package.response() = f_apdu;
1046     }
1047 }
1048
1049
1050 void yf::Multi::process(Package &package) const
1051 {
1052     FrontendPtr f = m_p->get_frontend(package);
1053
1054     Z_GDU *gdu = package.request().get();
1055     
1056     if (gdu && gdu->which == Z_GDU_Z3950 && gdu->u.z3950->which ==
1057         Z_APDU_initRequest && !f->m_is_multi)
1058     {
1059         f->init(package, gdu);
1060     }
1061     else if (!f->m_is_multi)
1062         package.move();
1063     else if (gdu && gdu->which == Z_GDU_Z3950)
1064     {
1065         Z_APDU *apdu = gdu->u.z3950;
1066         if (apdu->which == Z_APDU_initRequest)
1067         {
1068             mp::odr odr;
1069             
1070             package.response() = odr.create_close(
1071                 apdu,
1072                 Z_Close_protocolError,
1073                 "double init");
1074             
1075             package.session().close();
1076         }
1077         else if (apdu->which == Z_APDU_searchRequest)
1078         {
1079             f->search(package, apdu);
1080         }
1081         else if (apdu->which == Z_APDU_presentRequest)
1082         {
1083             f->present(package, apdu);
1084         }
1085         else if (apdu->which == Z_APDU_scanRequest)
1086         {
1087             f->scan2(package, apdu);
1088         }
1089         else
1090         {
1091             mp::odr odr;
1092             
1093             package.response() = odr.create_close(
1094                 apdu, Z_Close_protocolError,
1095                 "unsupported APDU in filter multi");
1096             
1097             package.session().close();
1098         }
1099     }
1100     m_p->release_frontend(package);
1101 }
1102
1103 void mp::filter::Multi::configure(const xmlNode * ptr)
1104 {
1105     for (ptr = ptr->children; ptr; ptr = ptr->next)
1106     {
1107         if (ptr->type != XML_ELEMENT_NODE)
1108             continue;
1109         if (!strcmp((const char *) ptr->name, "target"))
1110         {
1111             std::string route = mp::xml::get_route(ptr);
1112             std::string target = mp::xml::get_text(ptr);
1113             std::cout << "route=" << route << " target=" << target << "\n";
1114             m_p->m_target_route[target] = route;
1115         }
1116         else if (!strcmp((const char *) ptr->name, "hideunavailable"))
1117         {
1118             m_p->m_hide_unavailable = true;
1119         }
1120         else
1121         {
1122             throw mp::filter::FilterException
1123                 ("Bad element " 
1124                  + std::string((const char *) ptr->name)
1125                  + " in virt_db filter");
1126         }
1127     }
1128 }
1129
1130 static mp::filter::Base* filter_creator()
1131 {
1132     return new mp::filter::Multi;
1133 }
1134
1135 extern "C" {
1136     struct metaproxy_1_filter_struct metaproxy_1_filter_multi = {
1137         0,
1138         "multi",
1139         filter_creator
1140     };
1141 }
1142
1143
1144 /*
1145  * Local variables:
1146  * c-basic-offset: 4
1147  * indent-tabs-mode: nil
1148  * c-file-style: "stroustrup"
1149  * End:
1150  * vim: shiftwidth=4 tabstop=8 expandtab
1151  */