Change the way NamePlusRecord struct is modified for database fixup.
[metaproxy-moved-to-github.git] / src / filter_multi.cpp
1 /* $Id: filter_multi.cpp,v 1.23 2006-08-09 12:27:18 adam Exp $
2    Copyright (c) 2005-2006, Index Data.
3
4    See the LICENSE file for details
5  */
6
7 #include "config.hpp"
8
9 #include "filter.hpp"
10 #include "package.hpp"
11
12 #include <boost/thread/thread.hpp>
13 #include <boost/thread/mutex.hpp>
14 #include <boost/thread/condition.hpp>
15 #include <boost/shared_ptr.hpp>
16
17 #include "util.hpp"
18 #include "filter_multi.hpp"
19
20 #include <yaz/zgdu.h>
21 #include <yaz/otherinfo.h>
22 #include <yaz/diagbib1.h>
23
24 #include <vector>
25 #include <algorithm>
26 #include <map>
27 #include <iostream>
28
29 namespace mp = metaproxy_1;
30 namespace yf = mp::filter;
31
32 namespace metaproxy_1 {
33     namespace filter {
34
35         struct Multi::BackendSet {
36             BackendPtr m_backend;
37             int m_count;
38             bool operator < (const BackendSet &k) const;
39             bool operator == (const BackendSet &k) const;
40         };
41         struct Multi::ScanTermInfo {
42             std::string m_norm_term;
43             std::string m_display_term;
44             int m_count;
45             bool operator < (const ScanTermInfo &) const;
46             bool operator == (const ScanTermInfo &) const;
47             Z_Entry *get_entry(ODR odr);
48         };
49         struct Multi::FrontendSet {
50             struct PresentJob {
51                 BackendPtr m_backend;
52                 int m_pos;
53                 int m_inside_pos;
54             };
55             FrontendSet(std::string setname);
56             FrontendSet();
57             ~FrontendSet();
58
59             void round_robin(int pos, int number, std::list<PresentJob> &job);
60
61             std::list<BackendSet> m_backend_sets;
62             std::string m_setname;
63         };
64         struct Multi::Backend {
65             PackagePtr m_package;
66             std::string m_backend_database;
67             std::string m_vhost;
68             std::string m_route;
69             void operator() (void);  // thread operation
70         };
71         struct Multi::Frontend {
72             Frontend(Rep *rep);
73             ~Frontend();
74             bool m_is_multi;
75             bool m_in_use;
76             std::list<BackendPtr> m_backend_list;
77             std::map<std::string,Multi::FrontendSet> m_sets;
78
79             void multi_move(std::list<BackendPtr> &blist);
80             void init(Package &package, Z_GDU *gdu);
81             void close(Package &package);
82             void search(Package &package, Z_APDU *apdu);
83             void present(Package &package, Z_APDU *apdu);
84             void scan1(Package &package, Z_APDU *apdu);
85             void scan2(Package &package, Z_APDU *apdu);
86             Rep *m_p;
87         };            
88         struct Multi::Map {
89             Map(std::list<std::string> hosts, std::string route);
90             Map();
91             std::list<std::string> m_hosts;
92             std::string m_route;
93         };
94         class Multi::Rep {
95             friend class Multi;
96             friend struct Frontend;
97             
98             Rep();
99             FrontendPtr get_frontend(Package &package);
100             void release_frontend(Package &package);
101         private:
102             std::map<std::string,std::string> m_target_route;
103             boost::mutex m_mutex;
104             boost::condition m_cond_session_ready;
105             std::map<mp::Session, FrontendPtr> m_clients;
106             bool m_hide_unavailable;
107         };
108     }
109 }
110
111 yf::Multi::Rep::Rep()
112 {
113     m_hide_unavailable = false;
114 }
115
116 bool yf::Multi::BackendSet::operator < (const BackendSet &k) const
117 {
118     return m_count < k.m_count;
119 }
120
121 yf::Multi::Frontend::Frontend(Rep *rep)
122 {
123     m_p = rep;
124     m_is_multi = false;
125 }
126
127 yf::Multi::Frontend::~Frontend()
128 {
129 }
130
131 yf::Multi::FrontendPtr yf::Multi::Rep::get_frontend(mp::Package &package)
132 {
133     boost::mutex::scoped_lock lock(m_mutex);
134
135     std::map<mp::Session,yf::Multi::FrontendPtr>::iterator it;
136     
137     while(true)
138     {
139         it = m_clients.find(package.session());
140         if (it == m_clients.end())
141             break;
142         
143         if (!it->second->m_in_use)
144         {
145             it->second->m_in_use = true;
146             return it->second;
147         }
148         m_cond_session_ready.wait(lock);
149     }
150     FrontendPtr f(new Frontend(this));
151     m_clients[package.session()] = f;
152     f->m_in_use = true;
153     return f;
154 }
155
156 void yf::Multi::Rep::release_frontend(mp::Package &package)
157 {
158     boost::mutex::scoped_lock lock(m_mutex);
159     std::map<mp::Session,yf::Multi::FrontendPtr>::iterator it;
160     
161     it = m_clients.find(package.session());
162     if (it != m_clients.end())
163     {
164         if (package.session().is_closed())
165         {
166             it->second->close(package);
167             m_clients.erase(it);
168         }
169         else
170         {
171             it->second->m_in_use = false;
172         }
173         m_cond_session_ready.notify_all();
174     }
175 }
176
177 yf::Multi::FrontendSet::FrontendSet(std::string setname)
178     :  m_setname(setname)
179 {
180 }
181
182
183 yf::Multi::FrontendSet::FrontendSet()
184 {
185 }
186
187
188 yf::Multi::FrontendSet::~FrontendSet()
189 {
190 }
191
192 yf::Multi::Map::Map(std::list<std::string> hosts, std::string route)
193     : m_hosts(hosts), m_route(route) 
194 {
195 }
196
197 yf::Multi::Map::Map()
198 {
199 }
200
201 yf::Multi::Multi() : m_p(new Multi::Rep)
202 {
203 }
204
205 yf::Multi::~Multi() {
206 }
207
208
209 void yf::Multi::Backend::operator() (void) 
210 {
211     m_package->move(m_route);
212 }
213
214
215 void yf::Multi::Frontend::close(mp::Package &package)
216 {
217     std::list<BackendPtr>::const_iterator bit;
218     for (bit = m_backend_list.begin(); bit != m_backend_list.end(); bit++)
219     {
220         BackendPtr b = *bit;
221
222         b->m_package->copy_filter(package);
223         b->m_package->request() = (Z_GDU *) 0;
224         b->m_package->session().close();
225         b->m_package->move(b->m_route);
226     }
227 }
228
229 void yf::Multi::Frontend::multi_move(std::list<BackendPtr> &blist)
230 {
231     std::list<BackendPtr>::const_iterator bit;
232     boost::thread_group g;
233     for (bit = blist.begin(); bit != blist.end(); bit++)
234     {
235         g.add_thread(new boost::thread(**bit));
236     }
237     g.join_all();
238 }
239
240 void yf::Multi::FrontendSet::round_robin(int start, int number,
241                                          std::list<PresentJob> &jobs)
242 {
243     std::list<int> pos;
244     std::list<int> inside_pos;
245     std::list<BackendSet>::const_iterator bsit;
246     for (bsit = m_backend_sets.begin(); bsit != m_backend_sets.end(); bsit++)
247     {
248         pos.push_back(1);
249         inside_pos.push_back(0);
250     }
251
252     int p = 1;
253 #if 1
254     // optimization step!
255     int omin = 0;
256     while(true)
257     {
258         int min = 0;
259         int no_left = 0;
260         // find min count for each set which is > omin
261         for (bsit = m_backend_sets.begin(); bsit != m_backend_sets.end(); bsit++)
262         {
263             if (bsit->m_count > omin)
264             {
265                 if (no_left == 0 || bsit->m_count < min)
266                     min = bsit->m_count;
267                 no_left++;
268             }
269         }
270         if (no_left == 0) // if nothing greater than omin, bail out.
271             break;
272         int skip = no_left * min;
273         if (p + skip > start)  // step gets us "into" present range?
274         {
275             // Yes. skip until start.. Rounding off is deliberate!
276             min = (start-p) / no_left;
277             p += no_left * min;
278             
279             // update positions in each set..
280             std::list<int>::iterator psit = pos.begin();
281             for (psit = pos.begin(); psit != pos.end(); psit++)
282                 *psit += min;
283             break;
284         }
285         // skip on each set.. before "present range"..
286         p = p + skip;
287         
288         std::cout << "\nSKIP min=" << min << " no_left=" << no_left << "\n\n";
289         
290         std::list<int>::iterator psit = pos.begin();
291         for (psit = pos.begin(); psit != pos.end(); psit++)
292             *psit += min;
293         
294         omin = min; // update so we consider next class (with higher count)
295     }
296 #endif
297     int fetched = 0;
298     bool more = true;
299     while (more)
300     {
301         more = false;
302         std::list<int>::iterator psit = pos.begin();
303         std::list<int>::iterator esit = inside_pos.begin();
304         bsit = m_backend_sets.begin();
305
306         for (; bsit != m_backend_sets.end(); psit++,esit++,bsit++)
307         {
308             if (fetched >= number)
309             {
310                 more = false;
311                 break;
312             }
313             if (*psit <= bsit->m_count)
314             {
315                 if (p >= start)
316                 {
317                     PresentJob job;
318                     job.m_backend = bsit->m_backend;
319                     job.m_pos = *psit;
320                     job.m_inside_pos = *esit;
321                     jobs.push_back(job);
322                     (*esit)++;
323                     fetched++;
324                 }
325                 (*psit)++;
326                 p++;
327                 more = true;
328             }
329         }
330     }
331 }
332
333 void yf::Multi::Frontend::init(mp::Package &package, Z_GDU *gdu)
334 {
335     Z_InitRequest *req = gdu->u.z3950->u.initRequest;
336
337     std::list<std::string> targets;
338
339     mp::util::get_vhost_otherinfo(&req->otherInfo, false, targets);
340
341     if (targets.size() < 1)
342     {
343         package.move();
344         return;
345     }
346
347     std::list<std::string>::const_iterator t_it = targets.begin();
348     for (; t_it != targets.end(); t_it++)
349     {
350         Session s;
351         Backend *b = new Backend;
352         b->m_vhost = *t_it;
353
354         b->m_route = m_p->m_target_route[*t_it];
355         // b->m_route unset
356         b->m_package = PackagePtr(new Package(s, package.origin()));
357
358         m_backend_list.push_back(BackendPtr(b));
359     }
360     m_is_multi = true;
361
362     // create init request 
363     std::list<BackendPtr>::iterator bit;
364     for (bit = m_backend_list.begin(); bit != m_backend_list.end(); bit++)
365     {
366         mp::odr odr;
367         BackendPtr b = *bit;
368         Z_APDU *init_apdu = zget_APDU(odr, Z_APDU_initRequest);
369         
370         std::list<std::string>vhost_one;
371         vhost_one.push_back(b->m_vhost);
372         mp::util::set_vhost_otherinfo(&init_apdu->u.initRequest->otherInfo,
373                                        odr, vhost_one);
374
375         Z_InitRequest *req = init_apdu->u.initRequest;
376         
377         ODR_MASK_SET(req->options, Z_Options_search);
378         ODR_MASK_SET(req->options, Z_Options_present);
379         ODR_MASK_SET(req->options, Z_Options_namedResultSets);
380         ODR_MASK_SET(req->options, Z_Options_scan);
381         
382         ODR_MASK_SET(req->protocolVersion, Z_ProtocolVersion_1);
383         ODR_MASK_SET(req->protocolVersion, Z_ProtocolVersion_2);
384         ODR_MASK_SET(req->protocolVersion, Z_ProtocolVersion_3);
385         
386         b->m_package->request() = init_apdu;
387
388         b->m_package->copy_filter(package);
389     }
390     multi_move(m_backend_list);
391
392     // create the frontend init response based on each backend init response
393     mp::odr odr;
394
395     Z_APDU *f_apdu = odr.create_initResponse(gdu->u.z3950, 0, 0);
396     Z_InitResponse *f_resp = f_apdu->u.initResponse;
397
398     ODR_MASK_SET(f_resp->options, Z_Options_search);
399     ODR_MASK_SET(f_resp->options, Z_Options_present);
400     ODR_MASK_SET(f_resp->options, Z_Options_namedResultSets);
401     
402     ODR_MASK_SET(f_resp->protocolVersion, Z_ProtocolVersion_1);
403     ODR_MASK_SET(f_resp->protocolVersion, Z_ProtocolVersion_2);
404     ODR_MASK_SET(f_resp->protocolVersion, Z_ProtocolVersion_3);
405
406     int no_failed = 0;
407     int no_succeeded = 0;
408     for (bit = m_backend_list.begin(); bit != m_backend_list.end(); )
409     {
410         PackagePtr p = (*bit)->m_package;
411         
412         if (p->session().is_closed())
413         {
414             // failed. Remove from list and increment number of failed
415             no_failed++;
416             bit = m_backend_list.erase(bit);
417             continue;
418         }
419         no_succeeded++;
420
421         Z_GDU *gdu = p->response().get();
422         if (gdu && gdu->which == Z_GDU_Z3950 && gdu->u.z3950->which ==
423             Z_APDU_initResponse)
424         {
425             int i;
426             Z_APDU *b_apdu = gdu->u.z3950;
427             Z_InitResponse *b_resp = b_apdu->u.initResponse;
428
429             // common options for all backends
430             for (i = 0; i <= Z_Options_stringSchema; i++)
431             {
432                 if (!ODR_MASK_GET(b_resp->options, i))
433                     ODR_MASK_CLEAR(f_resp->options, i);
434             }
435             // common protocol version
436             for (i = 0; i <= Z_ProtocolVersion_3; i++)
437                 if (!ODR_MASK_GET(b_resp->protocolVersion, i))
438                     ODR_MASK_CLEAR(f_resp->protocolVersion, i);
439             // reject if any of the backends reject
440             if (!*b_resp->result)
441                 *f_resp->result = 0;
442         }
443         else
444         {
445             // if any target does not return init return that (close or
446             // similar )
447             package.response() = p->response();
448             return;
449         }
450         bit++;
451     }
452     if (m_p->m_hide_unavailable)
453     {
454         if (no_succeeded == 0)
455             package.session().close();
456     }
457     else
458     {
459         if (no_failed)
460             package.session().close();
461     }
462     package.response() = f_apdu;
463 }
464
465 void yf::Multi::Frontend::search(mp::Package &package, Z_APDU *apdu_req)
466 {
467     // create search request 
468     Z_SearchRequest *req = apdu_req->u.searchRequest;
469
470     // save these for later
471     int smallSetUpperBound = *req->smallSetUpperBound;
472     int largeSetLowerBound = *req->largeSetLowerBound;
473     int mediumSetPresentNumber = *req->mediumSetPresentNumber;
474     
475     // they are altered now - to disable piggyback
476     *req->smallSetUpperBound = 0;
477     *req->largeSetLowerBound = 1;
478     *req->mediumSetPresentNumber = 1;
479
480     int default_num_db = req->num_databaseNames;
481     char **default_db = req->databaseNames;
482
483     std::list<BackendPtr>::const_iterator bit;
484     for (bit = m_backend_list.begin(); bit != m_backend_list.end(); bit++)
485     {
486         PackagePtr p = (*bit)->m_package;
487         mp::odr odr;
488     
489         if (!mp::util::set_databases_from_zurl(odr, (*bit)->m_vhost,
490                                                 &req->num_databaseNames,
491                                                 &req->databaseNames))
492         {
493             req->num_databaseNames = default_num_db;
494             req->databaseNames = default_db;
495         }
496         p->request() = apdu_req;
497         p->copy_filter(package);
498     }
499     multi_move(m_backend_list);
500
501     // look at each response
502     FrontendSet resultSet(std::string(req->resultSetName));
503
504     int result_set_size = 0;
505     Z_Records *z_records_diag = 0;  // no diagnostics (yet)
506     for (bit = m_backend_list.begin(); bit != m_backend_list.end(); bit++)
507     {
508         PackagePtr p = (*bit)->m_package;
509         
510         if (p->session().is_closed()) // if any backend closes, close frontend
511             package.session().close();
512         
513         Z_GDU *gdu = p->response().get();
514         if (gdu && gdu->which == Z_GDU_Z3950 && gdu->u.z3950->which ==
515             Z_APDU_searchResponse)
516         {
517             Z_APDU *b_apdu = gdu->u.z3950;
518             Z_SearchResponse *b_resp = b_apdu->u.searchResponse;
519          
520             // see we get any errors (AKA diagnstics)
521             if (b_resp->records)
522             {
523                 if (b_resp->records->which == Z_Records_NSD
524                     || b_resp->records->which == Z_Records_multipleNSD)
525                     z_records_diag = b_resp->records;
526                 // we may set this multiple times (TOO BAD!)
527             }
528             BackendSet backendSet;
529             backendSet.m_backend = *bit;
530             backendSet.m_count = *b_resp->resultCount;
531             result_set_size += *b_resp->resultCount;
532             resultSet.m_backend_sets.push_back(backendSet);
533         }
534         else
535         {
536             // if any target does not return search response - return that 
537             package.response() = p->response();
538             return;
539         }
540     }
541
542     mp::odr odr;
543     Z_APDU *f_apdu = odr.create_searchResponse(apdu_req, 0, 0);
544     Z_SearchResponse *f_resp = f_apdu->u.searchResponse;
545
546     *f_resp->resultCount = result_set_size;
547     if (z_records_diag)
548     {
549         // search error
550         f_resp->records = z_records_diag;
551         package.response() = f_apdu;
552         return;
553     }
554     // assume OK
555     m_sets[resultSet.m_setname] = resultSet;
556
557     int number;
558     mp::util::piggyback(smallSetUpperBound,
559                          largeSetLowerBound,
560                          mediumSetPresentNumber,
561                          result_set_size,
562                          number);
563     Package pp(package.session(), package.origin());
564     if (number > 0)
565     {
566         pp.copy_filter(package);
567         Z_APDU *p_apdu = zget_APDU(odr, Z_APDU_presentRequest);
568         Z_PresentRequest *p_req = p_apdu->u.presentRequest;
569         p_req->preferredRecordSyntax = req->preferredRecordSyntax;
570         p_req->resultSetId = req->resultSetName;
571         *p_req->resultSetStartPoint = 1;
572         *p_req->numberOfRecordsRequested = number;
573         pp.request() = p_apdu;
574         present(pp, p_apdu);
575         
576         if (pp.session().is_closed())
577             package.session().close();
578         
579         Z_GDU *gdu = pp.response().get();
580         if (gdu && gdu->which == Z_GDU_Z3950 && gdu->u.z3950->which ==
581             Z_APDU_presentResponse)
582         {
583             Z_PresentResponse *p_res = gdu->u.z3950->u.presentResponse;
584             f_resp->records = p_res->records;
585             *f_resp->numberOfRecordsReturned = 
586                 *p_res->numberOfRecordsReturned;
587             *f_resp->nextResultSetPosition = 
588                 *p_res->nextResultSetPosition;
589         }
590         else 
591         {
592             package.response() = pp.response(); 
593             return;
594         }
595     }
596     package.response() = f_apdu; // in this scope because of p
597 }
598
599 void yf::Multi::Frontend::present(mp::Package &package, Z_APDU *apdu_req)
600 {
601     // create present request 
602     Z_PresentRequest *req = apdu_req->u.presentRequest;
603
604     Sets_it it;
605     it = m_sets.find(std::string(req->resultSetId));
606     if (it == m_sets.end())
607     {
608         mp::odr odr;
609         Z_APDU *apdu = 
610             odr.create_presentResponse(
611                 apdu_req,
612                 YAZ_BIB1_SPECIFIED_RESULT_SET_DOES_NOT_EXIST,
613                 req->resultSetId);
614         package.response() = apdu;
615         return;
616     }
617     std::list<Multi::FrontendSet::PresentJob> jobs;
618     int start = *req->resultSetStartPoint;
619     int number = *req->numberOfRecordsRequested;
620     it->second.round_robin(start, number, jobs);
621
622     std::list<BackendPtr> present_backend_list;
623
624     std::list<BackendSet>::const_iterator bsit;
625     bsit = it->second.m_backend_sets.begin();
626     for (; bsit != it->second.m_backend_sets.end(); bsit++)
627     {
628         std::list<Multi::FrontendSet::PresentJob>::const_iterator jit;
629         int start = -1;
630         int end = -1;
631         
632         for (jit = jobs.begin(); jit != jobs.end(); jit++)
633         {
634             if (jit->m_backend == bsit->m_backend)
635             {
636                 if (start == -1 || jit->m_pos < start)
637                     start = jit->m_pos;
638                 if (end == -1 || jit->m_pos > end)
639                     end = jit->m_pos;
640             }
641         }
642         if (start != -1)
643         {
644             PackagePtr p = bsit->m_backend->m_package;
645
646             *req->resultSetStartPoint = start;
647             *req->numberOfRecordsRequested = end - start + 1;
648             
649             p->request() = apdu_req;
650             p->copy_filter(package);
651
652             present_backend_list.push_back(bsit->m_backend);
653         }
654     }
655     multi_move(present_backend_list);
656
657     // look at each response
658     Z_Records *z_records_diag = 0;
659
660     std::list<BackendPtr>::const_iterator pbit = present_backend_list.begin();
661     for (; pbit != present_backend_list.end(); pbit++)
662     {
663         PackagePtr p = (*pbit)->m_package;
664         
665         if (p->session().is_closed()) // if any backend closes, close frontend
666             package.session().close();
667         
668         Z_GDU *gdu = p->response().get();
669         if (gdu && gdu->which == Z_GDU_Z3950 && gdu->u.z3950->which ==
670             Z_APDU_presentResponse)
671         {
672             Z_APDU *b_apdu = gdu->u.z3950;
673             Z_PresentResponse *b_resp = b_apdu->u.presentResponse;
674          
675             // see we get any errors (AKA diagnstics)
676             if (b_resp->records)
677             {
678                 if (b_resp->records->which != Z_Records_DBOSD)
679                     z_records_diag = b_resp->records;
680                 // we may set this multiple times (TOO BAD!)
681             }
682         }
683         else
684         {
685             // if any target does not return present response - return that 
686             package.response() = p->response();
687             return;
688         }
689     }
690
691     mp::odr odr;
692     Z_APDU *f_apdu = odr.create_presentResponse(apdu_req, 0, 0);
693     Z_PresentResponse *f_resp = f_apdu->u.presentResponse;
694
695     if (z_records_diag)
696     {
697         f_resp->records = z_records_diag;
698         *f_resp->presentStatus = Z_PresentStatus_failure;
699     }
700     else
701     {
702         f_resp->records = (Z_Records *) odr_malloc(odr, sizeof(Z_Records));
703         Z_Records * records = f_resp->records;
704         records->which = Z_Records_DBOSD;
705         records->u.databaseOrSurDiagnostics =
706             (Z_NamePlusRecordList *)
707             odr_malloc(odr, sizeof(Z_NamePlusRecordList));
708         Z_NamePlusRecordList *nprl = records->u.databaseOrSurDiagnostics;
709         nprl->num_records = jobs.size();
710         nprl->records = (Z_NamePlusRecord**)
711             odr_malloc(odr, sizeof(Z_NamePlusRecord *) * nprl->num_records);
712         int i = 0;
713         std::list<Multi::FrontendSet::PresentJob>::const_iterator jit;
714         for (jit = jobs.begin(); jit != jobs.end(); jit++, i++)
715         {
716             PackagePtr p = jit->m_backend->m_package;
717             
718             Z_GDU *gdu = p->response().get();
719             Z_APDU *b_apdu = gdu->u.z3950;
720             Z_PresentResponse *b_resp = b_apdu->u.presentResponse;
721
722             nprl->records[i] = (Z_NamePlusRecord*)
723                 odr_malloc(odr, sizeof(Z_NamePlusRecord));
724             *nprl->records[i] = *b_resp->records->
725                 u.databaseOrSurDiagnostics->records[jit->m_inside_pos];
726             nprl->records[i]->databaseName =
727                     odr_strdup(odr, jit->m_backend->m_vhost.c_str());
728         }
729         *f_resp->nextResultSetPosition = start + i;
730         *f_resp->numberOfRecordsReturned = i;
731     }
732     package.response() = f_apdu;
733 }
734
735 void yf::Multi::Frontend::scan1(mp::Package &package, Z_APDU *apdu_req)
736 {
737     if (m_backend_list.size() > 1)
738     {
739         mp::odr odr;
740         Z_APDU *f_apdu = 
741             odr.create_scanResponse(
742                 apdu_req, YAZ_BIB1_COMBI_OF_SPECIFIED_DATABASES_UNSUPP, 0);
743         package.response() = f_apdu;
744         return;
745     }
746     Z_ScanRequest *req = apdu_req->u.scanRequest;
747
748     int default_num_db = req->num_databaseNames;
749     char **default_db = req->databaseNames;
750
751     std::list<BackendPtr>::const_iterator bit;
752     for (bit = m_backend_list.begin(); bit != m_backend_list.end(); bit++)
753     {
754         PackagePtr p = (*bit)->m_package;
755         mp::odr odr;
756     
757         if (!mp::util::set_databases_from_zurl(odr, (*bit)->m_vhost,
758                                                 &req->num_databaseNames,
759                                                 &req->databaseNames))
760         {
761             req->num_databaseNames = default_num_db;
762             req->databaseNames = default_db;
763         }
764         p->request() = apdu_req;
765         p->copy_filter(package);
766     }
767     multi_move(m_backend_list);
768
769     for (bit = m_backend_list.begin(); bit != m_backend_list.end(); bit++)
770     {
771         PackagePtr p = (*bit)->m_package;
772         
773         if (p->session().is_closed()) // if any backend closes, close frontend
774             package.session().close();
775         
776         Z_GDU *gdu = p->response().get();
777         if (gdu && gdu->which == Z_GDU_Z3950 && gdu->u.z3950->which ==
778             Z_APDU_scanResponse)
779         {
780             package.response() = p->response();
781             break;
782         }
783         else
784         {
785             // if any target does not return scan response - return that 
786             package.response() = p->response();
787             return;
788         }
789     }
790 }
791
792 bool yf::Multi::ScanTermInfo::operator < (const ScanTermInfo &k) const
793 {
794     return m_norm_term < k.m_norm_term;
795 }
796
797 bool yf::Multi::ScanTermInfo::operator == (const ScanTermInfo &k) const
798 {
799     return m_norm_term == k.m_norm_term;
800 }
801
802 Z_Entry *yf::Multi::ScanTermInfo::get_entry(ODR odr)
803 {
804     Z_Entry *e = (Z_Entry *)odr_malloc(odr, sizeof(*e));
805     e->which = Z_Entry_termInfo;
806     Z_TermInfo *t;
807     t = e->u.termInfo = (Z_TermInfo *) odr_malloc(odr, sizeof(*t));
808     t->suggestedAttributes = 0;
809     t->displayTerm = 0;
810     t->alternativeTerm = 0;
811     t->byAttributes = 0;
812     t->otherTermInfo = 0;
813     t->globalOccurrences = odr_intdup(odr, m_count);
814     t->term = (Z_Term *)
815         odr_malloc(odr, sizeof(*t->term));
816     t->term->which = Z_Term_general;
817     Odr_oct *o;
818     t->term->u.general = o = (Odr_oct *)odr_malloc(odr, sizeof(Odr_oct));
819
820     o->len = o->size = m_norm_term.size();
821     o->buf = (unsigned char *) odr_malloc(odr, o->len);
822     memcpy(o->buf, m_norm_term.c_str(), o->len);
823     return e;
824 }
825
826 void yf::Multi::Frontend::scan2(mp::Package &package, Z_APDU *apdu_req)
827 {
828     Z_ScanRequest *req = apdu_req->u.scanRequest;
829
830     int default_num_db = req->num_databaseNames;
831     char **default_db = req->databaseNames;
832
833     std::list<BackendPtr>::const_iterator bit;
834     for (bit = m_backend_list.begin(); bit != m_backend_list.end(); bit++)
835     {
836         PackagePtr p = (*bit)->m_package;
837         mp::odr odr;
838     
839         if (!mp::util::set_databases_from_zurl(odr, (*bit)->m_vhost,
840                                                 &req->num_databaseNames,
841                                                 &req->databaseNames))
842         {
843             req->num_databaseNames = default_num_db;
844             req->databaseNames = default_db;
845         }
846         p->request() = apdu_req;
847         p->copy_filter(package);
848     }
849     multi_move(m_backend_list);
850
851     ScanTermInfoList entries_before;
852     ScanTermInfoList entries_after;
853     int no_before = 0;
854     int no_after = 0;
855
856     for (bit = m_backend_list.begin(); bit != m_backend_list.end(); bit++)
857     {
858         PackagePtr p = (*bit)->m_package;
859         
860         if (p->session().is_closed()) // if any backend closes, close frontend
861             package.session().close();
862         
863         Z_GDU *gdu = p->response().get();
864         if (gdu && gdu->which == Z_GDU_Z3950 && gdu->u.z3950->which ==
865             Z_APDU_scanResponse)
866         {
867             Z_ScanResponse *res = gdu->u.z3950->u.scanResponse;
868
869             if (res->entries && res->entries->nonsurrogateDiagnostics)
870             {
871                 // failure
872                 mp::odr odr;
873                 Z_APDU *f_apdu = odr.create_scanResponse(apdu_req, 1, 0);
874                 Z_ScanResponse *f_res = f_apdu->u.scanResponse;
875
876                 f_res->entries->nonsurrogateDiagnostics = 
877                     res->entries->nonsurrogateDiagnostics;
878                 f_res->entries->num_nonsurrogateDiagnostics = 
879                     res->entries->num_nonsurrogateDiagnostics;
880
881                 package.response() = f_apdu;
882                 return;
883             }
884
885             if (res->entries && res->entries->entries)
886             {
887                 Z_Entry **entries = res->entries->entries;
888                 int num_entries = res->entries->num_entries;
889                 int position = 1;
890                 if (req->preferredPositionInResponse)
891                     position = *req->preferredPositionInResponse;
892                 if (res->positionOfTerm)
893                     position = *res->positionOfTerm;
894
895                 // before
896                 int i;
897                 for (i = 0; i<position-1 && i<num_entries; i++)
898                 {
899                     Z_Entry *ent = entries[i];
900
901                     if (ent->which == Z_Entry_termInfo)
902                     {
903                         ScanTermInfo my;
904
905                         int *occur = ent->u.termInfo->globalOccurrences;
906                         my.m_count = occur ? *occur : 0;
907
908                         if (ent->u.termInfo->term->which == Z_Term_general)
909                         {
910                             my.m_norm_term = std::string(
911                                 (const char *)
912                                 ent->u.termInfo->term->u.general->buf,
913                                 ent->u.termInfo->term->u.general->len);
914                         }
915                         if (my.m_norm_term.length())
916                         {
917                             ScanTermInfoList::iterator it = 
918                                 entries_before.begin();
919                             while (it != entries_before.end() && my <*it)
920                                 it++;
921                             if (my == *it)
922                             {
923                                 it->m_count += my.m_count;
924                             }
925                             else
926                             {
927                                 entries_before.insert(it, my);
928                                 no_before++;
929                             }
930                         }
931                     }
932                 }
933                 // after
934                 if (position <= 0)
935                     i = 0;
936                 else
937                     i = position-1;
938                 for ( ; i<num_entries; i++)
939                 {
940                     Z_Entry *ent = entries[i];
941
942                     if (ent->which == Z_Entry_termInfo)
943                     {
944                         ScanTermInfo my;
945
946                         int *occur = ent->u.termInfo->globalOccurrences;
947                         my.m_count = occur ? *occur : 0;
948
949                         if (ent->u.termInfo->term->which == Z_Term_general)
950                         {
951                             my.m_norm_term = std::string(
952                                 (const char *)
953                                 ent->u.termInfo->term->u.general->buf,
954                                 ent->u.termInfo->term->u.general->len);
955                         }
956                         if (my.m_norm_term.length())
957                         {
958                             ScanTermInfoList::iterator it = 
959                                 entries_after.begin();
960                             while (it != entries_after.end() && *it < my)
961                                 it++;
962                             if (my == *it)
963                             {
964                                 it->m_count += my.m_count;
965                             }
966                             else
967                             {
968                                 entries_after.insert(it, my);
969                                 no_after++;
970                             }
971                         }
972                     }
973                 }
974
975             }                
976         }
977         else
978         {
979             // if any target does not return scan response - return that 
980             package.response() = p->response();
981             return;
982         }
983     }
984
985     if (true)
986     {
987         std::cout << "BEFORE\n";
988         ScanTermInfoList::iterator it = entries_before.begin();
989         for(; it != entries_before.end(); it++)
990         {
991             std::cout << " " << it->m_norm_term << " " << it->m_count << "\n";
992         }
993         
994         std::cout << "AFTER\n";
995         it = entries_after.begin();
996         for(; it != entries_after.end(); it++)
997         {
998             std::cout << " " << it->m_norm_term << " " << it->m_count << "\n";
999         }
1000     }
1001
1002     if (false)
1003     {
1004         mp::odr odr;
1005         Z_APDU *f_apdu = odr.create_scanResponse(apdu_req, 1, "not implemented");
1006         package.response() = f_apdu;
1007     }
1008     else
1009     {
1010         mp::odr odr;
1011         Z_APDU *f_apdu = odr.create_scanResponse(apdu_req, 0, 0);
1012         Z_ScanResponse *resp = f_apdu->u.scanResponse;
1013         
1014         int number_returned = *req->numberOfTermsRequested;
1015         int position_returned = *req->preferredPositionInResponse;
1016         
1017         resp->entries->num_entries = number_returned;
1018         resp->entries->entries = (Z_Entry**)
1019             odr_malloc(odr, sizeof(Z_Entry*) * number_returned);
1020         int i;
1021
1022         int lbefore = entries_before.size();
1023         if (lbefore < position_returned-1)
1024             position_returned = lbefore+1;
1025
1026         ScanTermInfoList::iterator it = entries_before.begin();
1027         for (i = 0; i<position_returned-1 && it != entries_before.end(); i++, it++)
1028         {
1029             resp->entries->entries[position_returned-2-i] = it->get_entry(odr);
1030         }
1031
1032         it = entries_after.begin();
1033
1034         if (position_returned <= 0)
1035             i = 0;
1036         else
1037             i = position_returned-1;
1038         for (; i<number_returned && it != entries_after.end(); i++, it++)
1039         {
1040             resp->entries->entries[i] = it->get_entry(odr);
1041         }
1042
1043         number_returned = i;
1044
1045         resp->positionOfTerm = odr_intdup(odr, position_returned);
1046         resp->numberOfEntriesReturned = odr_intdup(odr, number_returned);
1047         resp->entries->num_entries = number_returned;
1048
1049         package.response() = f_apdu;
1050     }
1051 }
1052
1053
1054 void yf::Multi::process(mp::Package &package) const
1055 {
1056     FrontendPtr f = m_p->get_frontend(package);
1057
1058     Z_GDU *gdu = package.request().get();
1059     
1060     if (gdu && gdu->which == Z_GDU_Z3950 && gdu->u.z3950->which ==
1061         Z_APDU_initRequest && !f->m_is_multi)
1062     {
1063         f->init(package, gdu);
1064     }
1065     else if (!f->m_is_multi)
1066         package.move();
1067     else if (gdu && gdu->which == Z_GDU_Z3950)
1068     {
1069         Z_APDU *apdu = gdu->u.z3950;
1070         if (apdu->which == Z_APDU_initRequest)
1071         {
1072             mp::odr odr;
1073             
1074             package.response() = odr.create_close(
1075                 apdu,
1076                 Z_Close_protocolError,
1077                 "double init");
1078             
1079             package.session().close();
1080         }
1081         else if (apdu->which == Z_APDU_searchRequest)
1082         {
1083             f->search(package, apdu);
1084         }
1085         else if (apdu->which == Z_APDU_presentRequest)
1086         {
1087             f->present(package, apdu);
1088         }
1089         else if (apdu->which == Z_APDU_scanRequest)
1090         {
1091             f->scan2(package, apdu);
1092         }
1093         else
1094         {
1095             mp::odr odr;
1096             
1097             package.response() = odr.create_close(
1098                 apdu, Z_Close_protocolError,
1099                 "unsupported APDU in filter multi");
1100             
1101             package.session().close();
1102         }
1103     }
1104     m_p->release_frontend(package);
1105 }
1106
1107 void mp::filter::Multi::configure(const xmlNode * ptr)
1108 {
1109     for (ptr = ptr->children; ptr; ptr = ptr->next)
1110     {
1111         if (ptr->type != XML_ELEMENT_NODE)
1112             continue;
1113         if (!strcmp((const char *) ptr->name, "target"))
1114         {
1115             std::string route = mp::xml::get_route(ptr);
1116             std::string target = mp::xml::get_text(ptr);
1117             std::cout << "route=" << route << " target=" << target << "\n";
1118             m_p->m_target_route[target] = route;
1119         }
1120         else if (!strcmp((const char *) ptr->name, "hideunavailable"))
1121         {
1122             m_p->m_hide_unavailable = true;
1123         }
1124         else
1125         {
1126             throw mp::filter::FilterException
1127                 ("Bad element " 
1128                  + std::string((const char *) ptr->name)
1129                  + " in virt_db filter");
1130         }
1131     }
1132 }
1133
1134 static mp::filter::Base* filter_creator()
1135 {
1136     return new mp::filter::Multi;
1137 }
1138
1139 extern "C" {
1140     struct metaproxy_1_filter_struct metaproxy_1_filter_multi = {
1141         0,
1142         "multi",
1143         filter_creator
1144     };
1145 }
1146
1147
1148 /*
1149  * Local variables:
1150  * c-basic-offset: 4
1151  * indent-tabs-mode: nil
1152  * c-file-style: "stroustrup"
1153  * End:
1154  * vim: shiftwidth=4 tabstop=8 expandtab
1155  */