Fixed comment
[metaproxy-moved-to-github.git] / src / filter_multi.cpp
1 /* $Id: filter_multi.cpp,v 1.19 2006-05-15 20:47:26 adam Exp $
2    Copyright (c) 2005-2006, Index Data.
3
4 %LICENSE%
5  */
6
7 #include "config.hpp"
8
9 #include "filter.hpp"
10 #include "package.hpp"
11
12 #include <boost/thread/thread.hpp>
13 #include <boost/thread/mutex.hpp>
14 #include <boost/thread/condition.hpp>
15 #include <boost/shared_ptr.hpp>
16
17 #include "util.hpp"
18 #include "filter_multi.hpp"
19
20 #include <yaz/zgdu.h>
21 #include <yaz/otherinfo.h>
22 #include <yaz/diagbib1.h>
23
24 #include <vector>
25 #include <algorithm>
26 #include <map>
27 #include <iostream>
28
29 namespace mp = metaproxy_1;
30 namespace yf = mp::filter;
31
32 namespace metaproxy_1 {
33     namespace filter {
34
35         struct Multi::BackendSet {
36             BackendPtr m_backend;
37             int m_count;
38             bool operator < (const BackendSet &k) const;
39             bool operator == (const BackendSet &k) const;
40         };
41         struct Multi::ScanTermInfo {
42             std::string m_norm_term;
43             std::string m_display_term;
44             int m_count;
45             bool operator < (const ScanTermInfo &) const;
46             bool operator == (const ScanTermInfo &) const;
47             Z_Entry *get_entry(ODR odr);
48         };
49         struct Multi::FrontendSet {
50             struct PresentJob {
51                 BackendPtr m_backend;
52                 int m_pos;
53                 int m_inside_pos;
54             };
55             FrontendSet(std::string setname);
56             FrontendSet();
57             ~FrontendSet();
58
59             void round_robin(int pos, int number, std::list<PresentJob> &job);
60
61             std::list<BackendSet> m_backend_sets;
62             std::string m_setname;
63         };
64         struct Multi::Backend {
65             PackagePtr m_package;
66             std::string m_backend_database;
67             std::string m_vhost;
68             std::string m_route;
69             void operator() (void);  // thread operation
70         };
71         struct Multi::Frontend {
72             Frontend(Rep *rep);
73             ~Frontend();
74             bool m_is_multi;
75             bool m_in_use;
76             std::list<BackendPtr> m_backend_list;
77             std::map<std::string,Multi::FrontendSet> m_sets;
78
79             void multi_move(std::list<BackendPtr> &blist);
80             void init(Package &package, Z_GDU *gdu);
81             void close(Package &package);
82             void search(Package &package, Z_APDU *apdu);
83             void present(Package &package, Z_APDU *apdu);
84             void scan1(Package &package, Z_APDU *apdu);
85             void scan2(Package &package, Z_APDU *apdu);
86             Rep *m_p;
87         };            
88         struct Multi::Map {
89             Map(std::list<std::string> hosts, std::string route);
90             Map();
91             std::list<std::string> m_hosts;
92             std::string m_route;
93         };
94         class Multi::Rep {
95             friend class Multi;
96             friend struct Frontend;
97             
98             Rep();
99             FrontendPtr get_frontend(Package &package);
100             void release_frontend(Package &package);
101         private:
102             std::map<std::string,std::string> m_target_route;
103             boost::mutex m_mutex;
104             boost::condition m_cond_session_ready;
105             std::map<mp::Session, FrontendPtr> m_clients;
106             bool m_hide_unavailable;
107         };
108     }
109 }
110
111 yf::Multi::Rep::Rep()
112 {
113     m_hide_unavailable = false;
114 }
115
116 bool yf::Multi::BackendSet::operator < (const BackendSet &k) const
117 {
118     return m_count < k.m_count;
119 }
120
121 yf::Multi::Frontend::Frontend(Rep *rep)
122 {
123     m_p = rep;
124     m_is_multi = false;
125 }
126
127 yf::Multi::Frontend::~Frontend()
128 {
129 }
130
131 yf::Multi::FrontendPtr yf::Multi::Rep::get_frontend(Package &package)
132 {
133     boost::mutex::scoped_lock lock(m_mutex);
134
135     std::map<mp::Session,yf::Multi::FrontendPtr>::iterator it;
136     
137     while(true)
138     {
139         it = m_clients.find(package.session());
140         if (it == m_clients.end())
141             break;
142         
143         if (!it->second->m_in_use)
144         {
145             it->second->m_in_use = true;
146             return it->second;
147         }
148         m_cond_session_ready.wait(lock);
149     }
150     FrontendPtr f(new Frontend(this));
151     m_clients[package.session()] = f;
152     f->m_in_use = true;
153     return f;
154 }
155
156 void yf::Multi::Rep::release_frontend(Package &package)
157 {
158     boost::mutex::scoped_lock lock(m_mutex);
159     std::map<mp::Session,yf::Multi::FrontendPtr>::iterator it;
160     
161     it = m_clients.find(package.session());
162     if (it != m_clients.end())
163     {
164         if (package.session().is_closed())
165         {
166             it->second->close(package);
167             m_clients.erase(it);
168         }
169         else
170         {
171             it->second->m_in_use = false;
172         }
173         m_cond_session_ready.notify_all();
174     }
175 }
176
177 yf::Multi::FrontendSet::FrontendSet(std::string setname)
178     :  m_setname(setname)
179 {
180 }
181
182
183 yf::Multi::FrontendSet::FrontendSet()
184 {
185 }
186
187
188 yf::Multi::FrontendSet::~FrontendSet()
189 {
190 }
191
192 yf::Multi::Map::Map(std::list<std::string> hosts, std::string route)
193     : m_hosts(hosts), m_route(route) 
194 {
195 }
196
197 yf::Multi::Map::Map()
198 {
199 }
200
201 yf::Multi::Multi() : m_p(new Multi::Rep)
202 {
203 }
204
205 yf::Multi::~Multi() {
206 }
207
208
209 void yf::Multi::Backend::operator() (void) 
210 {
211     m_package->move(m_route);
212 }
213
214
215 void yf::Multi::Frontend::close(Package &package)
216 {
217     std::list<BackendPtr>::const_iterator bit;
218     for (bit = m_backend_list.begin(); bit != m_backend_list.end(); bit++)
219     {
220         BackendPtr b = *bit;
221
222         b->m_package->copy_filter(package);
223         b->m_package->request() = (Z_GDU *) 0;
224         b->m_package->session().close();
225         b->m_package->move(b->m_route);
226     }
227 }
228
229 void yf::Multi::Frontend::multi_move(std::list<BackendPtr> &blist)
230 {
231     std::list<BackendPtr>::const_iterator bit;
232     boost::thread_group g;
233     for (bit = blist.begin(); bit != blist.end(); bit++)
234     {
235         g.add_thread(new boost::thread(**bit));
236     }
237     g.join_all();
238 }
239
240 void yf::Multi::FrontendSet::round_robin(int start, int number,
241                                          std::list<PresentJob> &jobs)
242 {
243     std::list<int> pos;
244     std::list<int> inside_pos;
245     std::list<BackendSet>::const_iterator bsit;
246     for (bsit = m_backend_sets.begin(); bsit != m_backend_sets.end(); bsit++)
247     {
248         pos.push_back(1);
249         inside_pos.push_back(0);
250     }
251
252     int p = 1;
253 #if 1
254     // optimization step!
255     int omin = 0;
256     while(true)
257     {
258         int min = 0;
259         int no_left = 0;
260         // find min count for each set which is > omin
261         for (bsit = m_backend_sets.begin(); bsit != m_backend_sets.end(); bsit++)
262         {
263             if (bsit->m_count > omin)
264             {
265                 if (no_left == 0 || bsit->m_count < min)
266                     min = bsit->m_count;
267                 no_left++;
268             }
269         }
270         if (no_left == 0) // if nothing greater than omin, bail out.
271             break;
272         int skip = no_left * min;
273         if (p + skip > start)  // step gets us "into" present range?
274         {
275             // Yes. skip until start.. Rounding off is deliberate!
276             min = (start-p) / no_left;
277             p += no_left * min;
278             
279             // update positions in each set..
280             std::list<int>::iterator psit = pos.begin();
281             for (psit = pos.begin(); psit != pos.end(); psit++)
282                 *psit += min;
283             break;
284         }
285         // skip on each set.. before "present range"..
286         p = p + skip;
287         
288         std::cout << "\nSKIP min=" << min << " no_left=" << no_left << "\n\n";
289         
290         std::list<int>::iterator psit = pos.begin();
291         for (psit = pos.begin(); psit != pos.end(); psit++)
292             *psit += min;
293         
294         omin = min; // update so we consider next class (with higher count)
295     }
296 #endif
297     int fetched = 0;
298     bool more = true;
299     while (more)
300     {
301         more = false;
302         std::list<int>::iterator psit = pos.begin();
303         std::list<int>::iterator esit = inside_pos.begin();
304         bsit = m_backend_sets.begin();
305
306         for (; bsit != m_backend_sets.end(); psit++,esit++,bsit++)
307         {
308             if (fetched >= number)
309             {
310                 more = false;
311                 break;
312             }
313             if (*psit <= bsit->m_count)
314             {
315                 if (p >= start)
316                 {
317                     PresentJob job;
318                     job.m_backend = bsit->m_backend;
319                     job.m_pos = *psit;
320                     job.m_inside_pos = *esit;
321                     jobs.push_back(job);
322                     (*esit)++;
323                     fetched++;
324                 }
325                 (*psit)++;
326                 p++;
327                 more = true;
328             }
329         }
330     }
331 }
332
333 void yf::Multi::Frontend::init(Package &package, Z_GDU *gdu)
334 {
335     Z_InitRequest *req = gdu->u.z3950->u.initRequest;
336
337     std::list<std::string> targets;
338
339     mp::util::get_vhost_otherinfo(&req->otherInfo, false, targets);
340
341     if (targets.size() < 1)
342     {
343         package.move();
344         return;
345     }
346
347     std::list<std::string>::const_iterator t_it = targets.begin();
348     for (; t_it != targets.end(); t_it++)
349     {
350         Session s;
351         Backend *b = new Backend;
352         b->m_vhost = *t_it;
353
354         b->m_route = m_p->m_target_route[*t_it];
355         // b->m_route unset
356         b->m_package = PackagePtr(new Package(s, package.origin()));
357
358         m_backend_list.push_back(BackendPtr(b));
359     }
360     m_is_multi = true;
361
362     // create init request 
363     std::list<BackendPtr>::iterator bit;
364     for (bit = m_backend_list.begin(); bit != m_backend_list.end(); bit++)
365     {
366         mp::odr odr;
367         BackendPtr b = *bit;
368         Z_APDU *init_apdu = zget_APDU(odr, Z_APDU_initRequest);
369         
370         std::list<std::string>vhost_one;
371         vhost_one.push_back(b->m_vhost);
372         mp::util::set_vhost_otherinfo(&init_apdu->u.initRequest->otherInfo,
373                                        odr, vhost_one);
374
375         Z_InitRequest *req = init_apdu->u.initRequest;
376         
377         ODR_MASK_SET(req->options, Z_Options_search);
378         ODR_MASK_SET(req->options, Z_Options_present);
379         ODR_MASK_SET(req->options, Z_Options_namedResultSets);
380         ODR_MASK_SET(req->options, Z_Options_scan);
381         
382         ODR_MASK_SET(req->protocolVersion, Z_ProtocolVersion_1);
383         ODR_MASK_SET(req->protocolVersion, Z_ProtocolVersion_2);
384         ODR_MASK_SET(req->protocolVersion, Z_ProtocolVersion_3);
385         
386         b->m_package->request() = init_apdu;
387
388         b->m_package->copy_filter(package);
389     }
390     multi_move(m_backend_list);
391
392     // create the frontend init response based on each backend init response
393     mp::odr odr;
394
395     Z_APDU *f_apdu = odr.create_initResponse(gdu->u.z3950, 0, 0);
396     Z_InitResponse *f_resp = f_apdu->u.initResponse;
397
398     ODR_MASK_SET(f_resp->options, Z_Options_search);
399     ODR_MASK_SET(f_resp->options, Z_Options_present);
400     ODR_MASK_SET(f_resp->options, Z_Options_namedResultSets);
401     
402     ODR_MASK_SET(f_resp->protocolVersion, Z_ProtocolVersion_1);
403     ODR_MASK_SET(f_resp->protocolVersion, Z_ProtocolVersion_2);
404     ODR_MASK_SET(f_resp->protocolVersion, Z_ProtocolVersion_3);
405
406     int no_failed = 0;
407     int no_succeeded = 0;
408     for (bit = m_backend_list.begin(); bit != m_backend_list.end(); )
409     {
410         PackagePtr p = (*bit)->m_package;
411         
412         if (p->session().is_closed())
413         {
414             // failed. Remove from list and increment number of failed
415             no_failed++;
416             bit = m_backend_list.erase(bit);
417             continue;
418         }
419         no_succeeded++;
420
421         Z_GDU *gdu = p->response().get();
422         if (gdu && gdu->which == Z_GDU_Z3950 && gdu->u.z3950->which ==
423             Z_APDU_initResponse)
424         {
425             int i;
426             Z_APDU *b_apdu = gdu->u.z3950;
427             Z_InitResponse *b_resp = b_apdu->u.initResponse;
428
429             // common options for all backends
430             for (i = 0; i <= Z_Options_stringSchema; i++)
431             {
432                 if (!ODR_MASK_GET(b_resp->options, i))
433                     ODR_MASK_CLEAR(f_resp->options, i);
434             }
435             // common protocol version
436             for (i = 0; i <= Z_ProtocolVersion_3; i++)
437                 if (!ODR_MASK_GET(b_resp->protocolVersion, i))
438                     ODR_MASK_CLEAR(f_resp->protocolVersion, i);
439             // reject if any of the backends reject
440             if (!*b_resp->result)
441                 *f_resp->result = 0;
442         }
443         else
444         {
445             // if any target does not return init return that (close or
446             // similar )
447             package.response() = p->response();
448             return;
449         }
450         bit++;
451     }
452     if (m_p->m_hide_unavailable)
453     {
454         if (no_succeeded == 0)
455             package.session().close();
456     }
457     else
458     {
459         if (no_failed)
460             package.session().close();
461     }
462     package.response() = f_apdu;
463 }
464
465 void yf::Multi::Frontend::search(Package &package, Z_APDU *apdu_req)
466 {
467     // create search request 
468     Z_SearchRequest *req = apdu_req->u.searchRequest;
469
470     // save these for later
471     int smallSetUpperBound = *req->smallSetUpperBound;
472     int largeSetLowerBound = *req->largeSetLowerBound;
473     int mediumSetPresentNumber = *req->mediumSetPresentNumber;
474     
475     // they are altered now - to disable piggyback
476     *req->smallSetUpperBound = 0;
477     *req->largeSetLowerBound = 1;
478     *req->mediumSetPresentNumber = 1;
479
480     int default_num_db = req->num_databaseNames;
481     char **default_db = req->databaseNames;
482
483     std::list<BackendPtr>::const_iterator bit;
484     for (bit = m_backend_list.begin(); bit != m_backend_list.end(); bit++)
485     {
486         PackagePtr p = (*bit)->m_package;
487         mp::odr odr;
488     
489         if (!mp::util::set_databases_from_zurl(odr, (*bit)->m_vhost,
490                                                 &req->num_databaseNames,
491                                                 &req->databaseNames))
492         {
493             req->num_databaseNames = default_num_db;
494             req->databaseNames = default_db;
495         }
496         p->request() = apdu_req;
497         p->copy_filter(package);
498     }
499     multi_move(m_backend_list);
500
501     // look at each response
502     FrontendSet resultSet(std::string(req->resultSetName));
503
504     int result_set_size = 0;
505     Z_Records *z_records_diag = 0;  // no diagnostics (yet)
506     for (bit = m_backend_list.begin(); bit != m_backend_list.end(); bit++)
507     {
508         PackagePtr p = (*bit)->m_package;
509         
510         if (p->session().is_closed()) // if any backend closes, close frontend
511             package.session().close();
512         
513         Z_GDU *gdu = p->response().get();
514         if (gdu && gdu->which == Z_GDU_Z3950 && gdu->u.z3950->which ==
515             Z_APDU_searchResponse)
516         {
517             Z_APDU *b_apdu = gdu->u.z3950;
518             Z_SearchResponse *b_resp = b_apdu->u.searchResponse;
519          
520             // see we get any errors (AKA diagnstics)
521             if (b_resp->records)
522             {
523                 if (b_resp->records->which == Z_Records_NSD
524                     || b_resp->records->which == Z_Records_multipleNSD)
525                     z_records_diag = b_resp->records;
526                 // we may set this multiple times (TOO BAD!)
527             }
528             BackendSet backendSet;
529             backendSet.m_backend = *bit;
530             backendSet.m_count = *b_resp->resultCount;
531             result_set_size += *b_resp->resultCount;
532             resultSet.m_backend_sets.push_back(backendSet);
533         }
534         else
535         {
536             // if any target does not return search response - return that 
537             package.response() = p->response();
538             return;
539         }
540     }
541
542     mp::odr odr;
543     Z_APDU *f_apdu = odr.create_searchResponse(apdu_req, 0, 0);
544     Z_SearchResponse *f_resp = f_apdu->u.searchResponse;
545
546     *f_resp->resultCount = result_set_size;
547     if (z_records_diag)
548     {
549         // search error
550         f_resp->records = z_records_diag;
551         package.response() = f_apdu;
552         return;
553     }
554     // assume OK
555     m_sets[resultSet.m_setname] = resultSet;
556
557     int number;
558     mp::util::piggyback(smallSetUpperBound,
559                          largeSetLowerBound,
560                          mediumSetPresentNumber,
561                          result_set_size,
562                          number);
563     Package pp(package.session(), package.origin());
564     if (number > 0)
565     {
566         pp.copy_filter(package);
567         Z_APDU *p_apdu = zget_APDU(odr, Z_APDU_presentRequest);
568         Z_PresentRequest *p_req = p_apdu->u.presentRequest;
569         p_req->preferredRecordSyntax = req->preferredRecordSyntax;
570         p_req->resultSetId = req->resultSetName;
571         *p_req->resultSetStartPoint = 1;
572         *p_req->numberOfRecordsRequested = number;
573         pp.request() = p_apdu;
574         present(pp, p_apdu);
575         
576         if (pp.session().is_closed())
577             package.session().close();
578         
579         Z_GDU *gdu = pp.response().get();
580         if (gdu && gdu->which == Z_GDU_Z3950 && gdu->u.z3950->which ==
581             Z_APDU_presentResponse)
582         {
583             Z_PresentResponse *p_res = gdu->u.z3950->u.presentResponse;
584             f_resp->records = p_res->records;
585             *f_resp->numberOfRecordsReturned = 
586                 *p_res->numberOfRecordsReturned;
587             *f_resp->nextResultSetPosition = 
588                 *p_res->nextResultSetPosition;
589         }
590         else 
591         {
592             package.response() = pp.response(); 
593             return;
594         }
595     }
596     package.response() = f_apdu; // in this scope because of p
597 }
598
599 void yf::Multi::Frontend::present(Package &package, Z_APDU *apdu_req)
600 {
601     // create present request 
602     Z_PresentRequest *req = apdu_req->u.presentRequest;
603
604     Sets_it it;
605     it = m_sets.find(std::string(req->resultSetId));
606     if (it == m_sets.end())
607     {
608         mp::odr odr;
609         Z_APDU *apdu = 
610             odr.create_presentResponse(
611                 apdu_req,
612                 YAZ_BIB1_SPECIFIED_RESULT_SET_DOES_NOT_EXIST,
613                 req->resultSetId);
614         package.response() = apdu;
615         return;
616     }
617     std::list<Multi::FrontendSet::PresentJob> jobs;
618     int start = *req->resultSetStartPoint;
619     int number = *req->numberOfRecordsRequested;
620     it->second.round_robin(start, number, jobs);
621
622     std::list<BackendPtr> present_backend_list;
623
624     std::list<BackendSet>::const_iterator bsit;
625     bsit = it->second.m_backend_sets.begin();
626     for (; bsit != it->second.m_backend_sets.end(); bsit++)
627     {
628         std::list<Multi::FrontendSet::PresentJob>::const_iterator jit;
629         int start = -1;
630         int end = -1;
631         
632         for (jit = jobs.begin(); jit != jobs.end(); jit++)
633         {
634             if (jit->m_backend == bsit->m_backend)
635             {
636                 if (start == -1 || jit->m_pos < start)
637                     start = jit->m_pos;
638                 if (end == -1 || jit->m_pos > end)
639                     end = jit->m_pos;
640             }
641         }
642         if (start != -1)
643         {
644             PackagePtr p = bsit->m_backend->m_package;
645
646             *req->resultSetStartPoint = start;
647             *req->numberOfRecordsRequested = end - start + 1;
648             
649             p->request() = apdu_req;
650             p->copy_filter(package);
651
652             present_backend_list.push_back(bsit->m_backend);
653         }
654     }
655     multi_move(present_backend_list);
656
657     // look at each response
658     Z_Records *z_records_diag = 0;
659
660     std::list<BackendPtr>::const_iterator pbit = present_backend_list.begin();
661     for (; pbit != present_backend_list.end(); pbit++)
662     {
663         PackagePtr p = (*pbit)->m_package;
664         
665         if (p->session().is_closed()) // if any backend closes, close frontend
666             package.session().close();
667         
668         Z_GDU *gdu = p->response().get();
669         if (gdu && gdu->which == Z_GDU_Z3950 && gdu->u.z3950->which ==
670             Z_APDU_presentResponse)
671         {
672             Z_APDU *b_apdu = gdu->u.z3950;
673             Z_PresentResponse *b_resp = b_apdu->u.presentResponse;
674          
675             // see we get any errors (AKA diagnstics)
676             if (b_resp->records)
677             {
678                 if (b_resp->records->which != Z_Records_DBOSD)
679                     z_records_diag = b_resp->records;
680                 // we may set this multiple times (TOO BAD!)
681             }
682         }
683         else
684         {
685             // if any target does not return present response - return that 
686             package.response() = p->response();
687             return;
688         }
689     }
690
691     mp::odr odr;
692     Z_APDU *f_apdu = odr.create_presentResponse(apdu_req, 0, 0);
693     Z_PresentResponse *f_resp = f_apdu->u.presentResponse;
694
695     if (z_records_diag)
696     {
697         f_resp->records = z_records_diag;
698         *f_resp->presentStatus = Z_PresentStatus_failure;
699     }
700     else
701     {
702         f_resp->records = (Z_Records *) odr_malloc(odr, sizeof(Z_Records));
703         Z_Records * records = f_resp->records;
704         records->which = Z_Records_DBOSD;
705         records->u.databaseOrSurDiagnostics =
706             (Z_NamePlusRecordList *)
707             odr_malloc(odr, sizeof(Z_NamePlusRecordList));
708         Z_NamePlusRecordList *nprl = records->u.databaseOrSurDiagnostics;
709         nprl->num_records = jobs.size();
710         nprl->records = (Z_NamePlusRecord**)
711             odr_malloc(odr, sizeof(Z_NamePlusRecord *) * nprl->num_records);
712         int i = 0;
713         std::list<Multi::FrontendSet::PresentJob>::const_iterator jit;
714         for (jit = jobs.begin(); jit != jobs.end(); jit++)
715         {
716             PackagePtr p = jit->m_backend->m_package;
717             
718             Z_GDU *gdu = p->response().get();
719             Z_APDU *b_apdu = gdu->u.z3950;
720             Z_PresentResponse *b_resp = b_apdu->u.presentResponse;
721
722             nprl->records[i++] =
723                 b_resp->records->u.databaseOrSurDiagnostics->
724                 records[jit->m_inside_pos];
725         }
726         *f_resp->nextResultSetPosition = start + i;
727         *f_resp->numberOfRecordsReturned = i;
728     }
729     package.response() = f_apdu;
730 }
731
732 void yf::Multi::Frontend::scan1(Package &package, Z_APDU *apdu_req)
733 {
734     if (m_backend_list.size() > 1)
735     {
736         mp::odr odr;
737         Z_APDU *f_apdu = 
738             odr.create_scanResponse(
739                 apdu_req, YAZ_BIB1_COMBI_OF_SPECIFIED_DATABASES_UNSUPP, 0);
740         package.response() = f_apdu;
741         return;
742     }
743     Z_ScanRequest *req = apdu_req->u.scanRequest;
744
745     int default_num_db = req->num_databaseNames;
746     char **default_db = req->databaseNames;
747
748     std::list<BackendPtr>::const_iterator bit;
749     for (bit = m_backend_list.begin(); bit != m_backend_list.end(); bit++)
750     {
751         PackagePtr p = (*bit)->m_package;
752         mp::odr odr;
753     
754         if (!mp::util::set_databases_from_zurl(odr, (*bit)->m_vhost,
755                                                 &req->num_databaseNames,
756                                                 &req->databaseNames))
757         {
758             req->num_databaseNames = default_num_db;
759             req->databaseNames = default_db;
760         }
761         p->request() = apdu_req;
762         p->copy_filter(package);
763     }
764     multi_move(m_backend_list);
765
766     for (bit = m_backend_list.begin(); bit != m_backend_list.end(); bit++)
767     {
768         PackagePtr p = (*bit)->m_package;
769         
770         if (p->session().is_closed()) // if any backend closes, close frontend
771             package.session().close();
772         
773         Z_GDU *gdu = p->response().get();
774         if (gdu && gdu->which == Z_GDU_Z3950 && gdu->u.z3950->which ==
775             Z_APDU_scanResponse)
776         {
777             package.response() = p->response();
778             break;
779         }
780         else
781         {
782             // if any target does not return scan response - return that 
783             package.response() = p->response();
784             return;
785         }
786     }
787 }
788
789 bool yf::Multi::ScanTermInfo::operator < (const ScanTermInfo &k) const
790 {
791     return m_norm_term < k.m_norm_term;
792 }
793
794 bool yf::Multi::ScanTermInfo::operator == (const ScanTermInfo &k) const
795 {
796     return m_norm_term == k.m_norm_term;
797 }
798
799 Z_Entry *yf::Multi::ScanTermInfo::get_entry(ODR odr)
800 {
801     Z_Entry *e = (Z_Entry *)odr_malloc(odr, sizeof(*e));
802     e->which = Z_Entry_termInfo;
803     Z_TermInfo *t;
804     t = e->u.termInfo = (Z_TermInfo *) odr_malloc(odr, sizeof(*t));
805     t->suggestedAttributes = 0;
806     t->displayTerm = 0;
807     t->alternativeTerm = 0;
808     t->byAttributes = 0;
809     t->otherTermInfo = 0;
810     t->globalOccurrences = odr_intdup(odr, m_count);
811     t->term = (Z_Term *)
812         odr_malloc(odr, sizeof(*t->term));
813     t->term->which = Z_Term_general;
814     Odr_oct *o;
815     t->term->u.general = o = (Odr_oct *)odr_malloc(odr, sizeof(Odr_oct));
816
817     o->len = o->size = m_norm_term.size();
818     o->buf = (unsigned char *) odr_malloc(odr, o->len);
819     memcpy(o->buf, m_norm_term.c_str(), o->len);
820     return e;
821 }
822
823 void yf::Multi::Frontend::scan2(Package &package, Z_APDU *apdu_req)
824 {
825     Z_ScanRequest *req = apdu_req->u.scanRequest;
826
827     int default_num_db = req->num_databaseNames;
828     char **default_db = req->databaseNames;
829
830     std::list<BackendPtr>::const_iterator bit;
831     for (bit = m_backend_list.begin(); bit != m_backend_list.end(); bit++)
832     {
833         PackagePtr p = (*bit)->m_package;
834         mp::odr odr;
835     
836         if (!mp::util::set_databases_from_zurl(odr, (*bit)->m_vhost,
837                                                 &req->num_databaseNames,
838                                                 &req->databaseNames))
839         {
840             req->num_databaseNames = default_num_db;
841             req->databaseNames = default_db;
842         }
843         p->request() = apdu_req;
844         p->copy_filter(package);
845     }
846     multi_move(m_backend_list);
847
848     ScanTermInfoList entries_before;
849     ScanTermInfoList entries_after;
850     int no_before = 0;
851     int no_after = 0;
852
853     for (bit = m_backend_list.begin(); bit != m_backend_list.end(); bit++)
854     {
855         PackagePtr p = (*bit)->m_package;
856         
857         if (p->session().is_closed()) // if any backend closes, close frontend
858             package.session().close();
859         
860         Z_GDU *gdu = p->response().get();
861         if (gdu && gdu->which == Z_GDU_Z3950 && gdu->u.z3950->which ==
862             Z_APDU_scanResponse)
863         {
864             Z_ScanResponse *res = gdu->u.z3950->u.scanResponse;
865
866             if (res->entries && res->entries->nonsurrogateDiagnostics)
867             {
868                 // failure
869                 mp::odr odr;
870                 Z_APDU *f_apdu = odr.create_scanResponse(apdu_req, 1, 0);
871                 Z_ScanResponse *f_res = f_apdu->u.scanResponse;
872
873                 f_res->entries->nonsurrogateDiagnostics = 
874                     res->entries->nonsurrogateDiagnostics;
875                 f_res->entries->num_nonsurrogateDiagnostics = 
876                     res->entries->num_nonsurrogateDiagnostics;
877
878                 package.response() = f_apdu;
879                 return;
880             }
881
882             if (res->entries && res->entries->entries)
883             {
884                 Z_Entry **entries = res->entries->entries;
885                 int num_entries = res->entries->num_entries;
886                 int position = 1;
887                 if (req->preferredPositionInResponse)
888                     position = *req->preferredPositionInResponse;
889                 if (res->positionOfTerm)
890                     position = *res->positionOfTerm;
891
892                 // before
893                 int i;
894                 for (i = 0; i<position-1 && i<num_entries; i++)
895                 {
896                     Z_Entry *ent = entries[i];
897
898                     if (ent->which == Z_Entry_termInfo)
899                     {
900                         ScanTermInfo my;
901
902                         int *occur = ent->u.termInfo->globalOccurrences;
903                         my.m_count = occur ? *occur : 0;
904
905                         if (ent->u.termInfo->term->which == Z_Term_general)
906                         {
907                             my.m_norm_term = std::string(
908                                 (const char *)
909                                 ent->u.termInfo->term->u.general->buf,
910                                 ent->u.termInfo->term->u.general->len);
911                         }
912                         if (my.m_norm_term.length())
913                         {
914                             ScanTermInfoList::iterator it = 
915                                 entries_before.begin();
916                             while (it != entries_before.end() && my <*it)
917                                 it++;
918                             if (my == *it)
919                             {
920                                 it->m_count += my.m_count;
921                             }
922                             else
923                             {
924                                 entries_before.insert(it, my);
925                                 no_before++;
926                             }
927                         }
928                     }
929                 }
930                 // after
931                 if (position <= 0)
932                     i = 0;
933                 else
934                     i = position-1;
935                 for ( ; i<num_entries; i++)
936                 {
937                     Z_Entry *ent = entries[i];
938
939                     if (ent->which == Z_Entry_termInfo)
940                     {
941                         ScanTermInfo my;
942
943                         int *occur = ent->u.termInfo->globalOccurrences;
944                         my.m_count = occur ? *occur : 0;
945
946                         if (ent->u.termInfo->term->which == Z_Term_general)
947                         {
948                             my.m_norm_term = std::string(
949                                 (const char *)
950                                 ent->u.termInfo->term->u.general->buf,
951                                 ent->u.termInfo->term->u.general->len);
952                         }
953                         if (my.m_norm_term.length())
954                         {
955                             ScanTermInfoList::iterator it = 
956                                 entries_after.begin();
957                             while (it != entries_after.end() && *it < my)
958                                 it++;
959                             if (my == *it)
960                             {
961                                 it->m_count += my.m_count;
962                             }
963                             else
964                             {
965                                 entries_after.insert(it, my);
966                                 no_after++;
967                             }
968                         }
969                     }
970                 }
971
972             }                
973         }
974         else
975         {
976             // if any target does not return scan response - return that 
977             package.response() = p->response();
978             return;
979         }
980     }
981
982     if (true)
983     {
984         std::cout << "BEFORE\n";
985         ScanTermInfoList::iterator it = entries_before.begin();
986         for(; it != entries_before.end(); it++)
987         {
988             std::cout << " " << it->m_norm_term << " " << it->m_count << "\n";
989         }
990         
991         std::cout << "AFTER\n";
992         it = entries_after.begin();
993         for(; it != entries_after.end(); it++)
994         {
995             std::cout << " " << it->m_norm_term << " " << it->m_count << "\n";
996         }
997     }
998
999     if (false)
1000     {
1001         mp::odr odr;
1002         Z_APDU *f_apdu = odr.create_scanResponse(apdu_req, 1, "not implemented");
1003         package.response() = f_apdu;
1004     }
1005     else
1006     {
1007         mp::odr odr;
1008         Z_APDU *f_apdu = odr.create_scanResponse(apdu_req, 0, 0);
1009         Z_ScanResponse *resp = f_apdu->u.scanResponse;
1010         
1011         int number_returned = *req->numberOfTermsRequested;
1012         int position_returned = *req->preferredPositionInResponse;
1013         
1014         resp->entries->num_entries = number_returned;
1015         resp->entries->entries = (Z_Entry**)
1016             odr_malloc(odr, sizeof(Z_Entry*) * number_returned);
1017         int i;
1018
1019         int lbefore = entries_before.size();
1020         if (lbefore < position_returned-1)
1021             position_returned = lbefore+1;
1022
1023         ScanTermInfoList::iterator it = entries_before.begin();
1024         for (i = 0; i<position_returned-1 && it != entries_before.end(); i++, it++)
1025         {
1026             resp->entries->entries[position_returned-2-i] = it->get_entry(odr);
1027         }
1028
1029         it = entries_after.begin();
1030
1031         if (position_returned <= 0)
1032             i = 0;
1033         else
1034             i = position_returned-1;
1035         for (; i<number_returned && it != entries_after.end(); i++, it++)
1036         {
1037             resp->entries->entries[i] = it->get_entry(odr);
1038         }
1039
1040         number_returned = i;
1041
1042         resp->positionOfTerm = odr_intdup(odr, position_returned);
1043         resp->numberOfEntriesReturned = odr_intdup(odr, number_returned);
1044         resp->entries->num_entries = number_returned;
1045
1046         package.response() = f_apdu;
1047     }
1048 }
1049
1050
1051 void yf::Multi::process(Package &package) const
1052 {
1053     FrontendPtr f = m_p->get_frontend(package);
1054
1055     Z_GDU *gdu = package.request().get();
1056     
1057     if (gdu && gdu->which == Z_GDU_Z3950 && gdu->u.z3950->which ==
1058         Z_APDU_initRequest && !f->m_is_multi)
1059     {
1060         f->init(package, gdu);
1061     }
1062     else if (!f->m_is_multi)
1063         package.move();
1064     else if (gdu && gdu->which == Z_GDU_Z3950)
1065     {
1066         Z_APDU *apdu = gdu->u.z3950;
1067         if (apdu->which == Z_APDU_initRequest)
1068         {
1069             mp::odr odr;
1070             
1071             package.response() = odr.create_close(
1072                 apdu,
1073                 Z_Close_protocolError,
1074                 "double init");
1075             
1076             package.session().close();
1077         }
1078         else if (apdu->which == Z_APDU_searchRequest)
1079         {
1080             f->search(package, apdu);
1081         }
1082         else if (apdu->which == Z_APDU_presentRequest)
1083         {
1084             f->present(package, apdu);
1085         }
1086         else if (apdu->which == Z_APDU_scanRequest)
1087         {
1088             f->scan2(package, apdu);
1089         }
1090         else
1091         {
1092             mp::odr odr;
1093             
1094             package.response() = odr.create_close(
1095                 apdu, Z_Close_protocolError,
1096                 "unsupported APDU in filter multi");
1097             
1098             package.session().close();
1099         }
1100     }
1101     m_p->release_frontend(package);
1102 }
1103
1104 void mp::filter::Multi::configure(const xmlNode * ptr)
1105 {
1106     for (ptr = ptr->children; ptr; ptr = ptr->next)
1107     {
1108         if (ptr->type != XML_ELEMENT_NODE)
1109             continue;
1110         if (!strcmp((const char *) ptr->name, "target"))
1111         {
1112             std::string route = mp::xml::get_route(ptr);
1113             std::string target = mp::xml::get_text(ptr);
1114             std::cout << "route=" << route << " target=" << target << "\n";
1115             m_p->m_target_route[target] = route;
1116         }
1117         else if (!strcmp((const char *) ptr->name, "hideunavailable"))
1118         {
1119             m_p->m_hide_unavailable = true;
1120         }
1121         else
1122         {
1123             throw mp::filter::FilterException
1124                 ("Bad element " 
1125                  + std::string((const char *) ptr->name)
1126                  + " in virt_db filter");
1127         }
1128     }
1129 }
1130
1131 static mp::filter::Base* filter_creator()
1132 {
1133     return new mp::filter::Multi;
1134 }
1135
1136 extern "C" {
1137     struct metaproxy_1_filter_struct metaproxy_1_filter_multi = {
1138         0,
1139         "multi",
1140         filter_creator
1141     };
1142 }
1143
1144
1145 /*
1146  * Local variables:
1147  * c-basic-offset: 4
1148  * indent-tabs-mode: nil
1149  * c-file-style: "stroustrup"
1150  * End:
1151  * vim: shiftwidth=4 tabstop=8 expandtab
1152  */