Updated footer comment
[metaproxy-moved-to-github.git] / src / filter_multi.cpp
1 /* This file is part of Metaproxy.
2    Copyright (C) 2005-2008 Index Data
3
4 Metaproxy is free software; you can redistribute it and/or modify it under
5 the terms of the GNU General Public License as published by the Free
6 Software Foundation; either version 2, or (at your option) any later
7 version.
8
9 Metaproxy is distributed in the hope that it will be useful, but WITHOUT ANY
10 WARRANTY; without even the implied warranty of MERCHANTABILITY or
11 FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
12 for more details.
13
14 You should have received a copy of the GNU General Public License
15 along with this program; if not, write to the Free Software
16 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
17 */
18
19 #include <yaz/log.h>
20
21 #include "config.hpp"
22
23 #include "filter.hpp"
24 #include "package.hpp"
25
26 #include <boost/thread/thread.hpp>
27 #include <boost/thread/mutex.hpp>
28 #include <boost/thread/condition.hpp>
29 #include <boost/shared_ptr.hpp>
30
31 #include "util.hpp"
32 #include "filter_multi.hpp"
33
34 #include <yaz/zgdu.h>
35 #include <yaz/otherinfo.h>
36 #include <yaz/diagbib1.h>
37
38 #include <vector>
39 #include <algorithm>
40 #include <map>
41 #include <iostream>
42
43 namespace mp = metaproxy_1;
44 namespace yf = mp::filter;
45
46 namespace metaproxy_1 {
47     namespace filter {
48         enum multi_merge_type {
49             round_robin,
50             serve_order
51         };
52         struct Multi::BackendSet {
53             BackendPtr m_backend;
54             int m_count;
55             bool operator < (const BackendSet &k) const;
56             bool operator == (const BackendSet &k) const;
57         };
58         struct Multi::ScanTermInfo {
59             std::string m_norm_term;
60             std::string m_display_term;
61             int m_count;
62             bool operator < (const ScanTermInfo &) const;
63             bool operator == (const ScanTermInfo &) const;
64             Z_Entry *get_entry(ODR odr);
65         };
66         struct Multi::FrontendSet {
67             class PresentJob {
68             public:
69                 BackendPtr m_backend;
70                 int m_pos; // position for backend (1=first, 2=second,..
71                 int m_start; // present request start
72                 PresentJob(BackendPtr ptr, int pos) : 
73                     m_backend(ptr), m_pos(pos), m_start(0) {};
74             };
75             FrontendSet(std::string setname);
76             FrontendSet();
77             ~FrontendSet();
78
79             void round_robin(int pos, int number, std::list<PresentJob> &job);
80             void serve_order(int pos, int number, std::list<PresentJob> &job);
81
82             std::list<BackendSet> m_backend_sets;
83             std::string m_setname;
84         };
85         struct Multi::Backend {
86             PackagePtr m_package;
87             std::string m_backend_database;
88             std::string m_vhost;
89             std::string m_route;
90             void operator() (void);  // thread operation
91         };
92         struct Multi::Frontend {
93             Frontend(Rep *rep);
94             ~Frontend();
95             bool m_is_multi;
96             bool m_in_use;
97             std::list<BackendPtr> m_backend_list;
98             std::map<std::string,Multi::FrontendSet> m_sets;
99
100             void multi_move(std::list<BackendPtr> &blist);
101             void init(Package &package, Z_GDU *gdu);
102             void close(Package &package);
103             void search(Package &package, Z_APDU *apdu);
104             void present(Package &package, Z_APDU *apdu);
105             void scan1(Package &package, Z_APDU *apdu);
106             void scan2(Package &package, Z_APDU *apdu);
107             Rep *m_p;
108         };            
109         struct Multi::Map {
110             Map(std::list<std::string> hosts, std::string route);
111             Map();
112             std::list<std::string> m_hosts;
113             std::string m_route;
114         };
115         class Multi::Rep {
116             friend class Multi;
117             friend struct Frontend;
118             
119             Rep();
120             FrontendPtr get_frontend(Package &package);
121             void release_frontend(Package &package);
122         private:
123             std::map<std::string,std::string> m_target_route;
124             boost::mutex m_mutex;
125             boost::condition m_cond_session_ready;
126             std::map<mp::Session, FrontendPtr> m_clients;
127             bool m_hide_unavailable;
128             multi_merge_type m_merge_type;
129         };
130     }
131 }
132
133 yf::Multi::Rep::Rep()
134 {
135     m_hide_unavailable = false;
136     m_merge_type = round_robin;
137 }
138
139 bool yf::Multi::BackendSet::operator < (const BackendSet &k) const
140 {
141     return m_count < k.m_count;
142 }
143
144 yf::Multi::Frontend::Frontend(Rep *rep)
145 {
146     m_p = rep;
147     m_is_multi = false;
148 }
149
150 yf::Multi::Frontend::~Frontend()
151 {
152 }
153
154 yf::Multi::FrontendPtr yf::Multi::Rep::get_frontend(mp::Package &package)
155 {
156     boost::mutex::scoped_lock lock(m_mutex);
157
158     std::map<mp::Session,yf::Multi::FrontendPtr>::iterator it;
159     
160     while(true)
161     {
162         it = m_clients.find(package.session());
163         if (it == m_clients.end())
164             break;
165         
166         if (!it->second->m_in_use)
167         {
168             it->second->m_in_use = true;
169             return it->second;
170         }
171         m_cond_session_ready.wait(lock);
172     }
173     FrontendPtr f(new Frontend(this));
174     m_clients[package.session()] = f;
175     f->m_in_use = true;
176     return f;
177 }
178
179 void yf::Multi::Rep::release_frontend(mp::Package &package)
180 {
181     boost::mutex::scoped_lock lock(m_mutex);
182     std::map<mp::Session,yf::Multi::FrontendPtr>::iterator it;
183     
184     it = m_clients.find(package.session());
185     if (it != m_clients.end())
186     {
187         if (package.session().is_closed())
188         {
189             it->second->close(package);
190             m_clients.erase(it);
191         }
192         else
193         {
194             it->second->m_in_use = false;
195         }
196         m_cond_session_ready.notify_all();
197     }
198 }
199
200 yf::Multi::FrontendSet::FrontendSet(std::string setname)
201     :  m_setname(setname)
202 {
203 }
204
205
206 yf::Multi::FrontendSet::FrontendSet()
207 {
208 }
209
210
211 yf::Multi::FrontendSet::~FrontendSet()
212 {
213 }
214
215 yf::Multi::Map::Map(std::list<std::string> hosts, std::string route)
216     : m_hosts(hosts), m_route(route) 
217 {
218 }
219
220 yf::Multi::Map::Map()
221 {
222 }
223
224 yf::Multi::Multi() : m_p(new Multi::Rep)
225 {
226 }
227
228 yf::Multi::~Multi() {
229 }
230
231
232 void yf::Multi::Backend::operator() (void) 
233 {
234     m_package->move(m_route);
235 }
236
237
238 void yf::Multi::Frontend::close(mp::Package &package)
239 {
240     std::list<BackendPtr>::const_iterator bit;
241     for (bit = m_backend_list.begin(); bit != m_backend_list.end(); bit++)
242     {
243         BackendPtr b = *bit;
244
245         b->m_package->copy_filter(package);
246         b->m_package->request() = (Z_GDU *) 0;
247         b->m_package->session().close();
248         b->m_package->move(b->m_route);
249     }
250 }
251
252 void yf::Multi::Frontend::multi_move(std::list<BackendPtr> &blist)
253 {
254     std::list<BackendPtr>::const_iterator bit;
255     boost::thread_group g;
256     for (bit = blist.begin(); bit != blist.end(); bit++)
257     {
258         g.add_thread(new boost::thread(**bit));
259     }
260     g.join_all();
261 }
262
263 void yf::Multi::FrontendSet::serve_order(int start, int number,
264                                          std::list<PresentJob> &jobs)
265 {
266     int i;
267     for (i = 0; i < number; i++)
268     {
269         std::list<BackendSet>::const_iterator bsit;
270         int voffset = 0;
271         int offset = start + i - 1;
272         for (bsit = m_backend_sets.begin(); bsit != m_backend_sets.end(); 
273              bsit++)
274         {
275             if (offset >= voffset && offset < voffset + bsit->m_count)
276             {
277                 PresentJob job(bsit->m_backend, offset - voffset + 1);
278                 jobs.push_back(job);
279                 break;
280             }
281             voffset += bsit->m_count;
282         }
283     }
284 }
285
286 void yf::Multi::FrontendSet::round_robin(int start, int number,
287                                          std::list<PresentJob> &jobs)
288 {
289     std::list<int> pos;
290     std::list<BackendSet>::const_iterator bsit;
291     for (bsit = m_backend_sets.begin(); bsit != m_backend_sets.end(); bsit++)
292     {
293         pos.push_back(1);
294     }
295
296     int p = 1;
297 #if 1
298     // optimization step!
299     int omin = 0;
300     while(true)
301     {
302         int min = 0;
303         int no_left = 0;
304         // find min count for each set which is > omin
305         for (bsit = m_backend_sets.begin(); bsit != m_backend_sets.end(); bsit++)
306         {
307             if (bsit->m_count > omin)
308             {
309                 if (no_left == 0 || bsit->m_count < min)
310                     min = bsit->m_count;
311                 no_left++;
312             }
313         }
314         if (no_left == 0) // if nothing greater than omin, bail out.
315             break;
316         int skip = no_left * min;
317         if (p + skip > start)  // step gets us "into" present range?
318         {
319             // Yes. skip until start.. Rounding off is deliberate!
320             min = (start-p) / no_left;
321             p += no_left * min;
322             
323             // update positions in each set..
324             std::list<int>::iterator psit = pos.begin();
325             for (psit = pos.begin(); psit != pos.end(); psit++)
326                 *psit += min;
327             break;
328         }
329         // skip on each set.. before "present range"..
330         p = p + skip;
331         
332         std::list<int>::iterator psit = pos.begin();
333         for (psit = pos.begin(); psit != pos.end(); psit++)
334             *psit += min;
335         
336         omin = min; // update so we consider next class (with higher count)
337     }
338 #endif
339     int fetched = 0;
340     bool more = true;
341     while (more)
342     {
343         more = false;
344         std::list<int>::iterator psit = pos.begin();
345         bsit = m_backend_sets.begin();
346
347         for (; bsit != m_backend_sets.end(); psit++,bsit++)
348         {
349             if (fetched >= number)
350             {
351                 more = false;
352                 break;
353             }
354             if (*psit <= bsit->m_count)
355             {
356                 if (p >= start)
357                 {
358                     PresentJob job(bsit->m_backend, *psit);
359                     jobs.push_back(job);
360                     fetched++;
361                 }
362                 (*psit)++;
363                 p++;
364                 more = true;
365             }
366         }
367     }
368 }
369
370 void yf::Multi::Frontend::init(mp::Package &package, Z_GDU *gdu)
371 {
372     Z_InitRequest *req = gdu->u.z3950->u.initRequest;
373
374     std::list<std::string> targets;
375
376     mp::util::get_vhost_otherinfo(req->otherInfo, targets);
377
378     if (targets.size() < 1)
379     {
380         package.move();
381         return;
382     }
383
384     std::list<std::string>::const_iterator t_it = targets.begin();
385     for (; t_it != targets.end(); t_it++)
386     {
387         Session s;
388         Backend *b = new Backend;
389         b->m_vhost = *t_it;
390
391         b->m_route = m_p->m_target_route[*t_it];
392         // b->m_route unset
393         b->m_package = PackagePtr(new Package(s, package.origin()));
394
395         m_backend_list.push_back(BackendPtr(b));
396     }
397     m_is_multi = true;
398
399     // create init request 
400     std::list<BackendPtr>::iterator bit;
401     for (bit = m_backend_list.begin(); bit != m_backend_list.end(); bit++)
402     {
403         mp::odr odr;
404         BackendPtr b = *bit;
405         Z_APDU *init_apdu = zget_APDU(odr, Z_APDU_initRequest);
406         
407         std::list<std::string>vhost_one;
408         vhost_one.push_back(b->m_vhost);
409         mp::util::set_vhost_otherinfo(&init_apdu->u.initRequest->otherInfo,
410                                        odr, vhost_one);
411
412         Z_InitRequest *req = init_apdu->u.initRequest;
413         
414         ODR_MASK_SET(req->options, Z_Options_search);
415         ODR_MASK_SET(req->options, Z_Options_present);
416         ODR_MASK_SET(req->options, Z_Options_namedResultSets);
417         ODR_MASK_SET(req->options, Z_Options_scan);
418         
419         ODR_MASK_SET(req->protocolVersion, Z_ProtocolVersion_1);
420         ODR_MASK_SET(req->protocolVersion, Z_ProtocolVersion_2);
421         ODR_MASK_SET(req->protocolVersion, Z_ProtocolVersion_3);
422         
423         b->m_package->request() = init_apdu;
424
425         b->m_package->copy_filter(package);
426     }
427     multi_move(m_backend_list);
428
429     // create the frontend init response based on each backend init response
430     mp::odr odr;
431
432     Z_APDU *f_apdu = odr.create_initResponse(gdu->u.z3950, 0, 0);
433     Z_InitResponse *f_resp = f_apdu->u.initResponse;
434
435     ODR_MASK_SET(f_resp->options, Z_Options_search);
436     ODR_MASK_SET(f_resp->options, Z_Options_present);
437     ODR_MASK_SET(f_resp->options, Z_Options_namedResultSets);
438     
439     ODR_MASK_SET(f_resp->protocolVersion, Z_ProtocolVersion_1);
440     ODR_MASK_SET(f_resp->protocolVersion, Z_ProtocolVersion_2);
441     ODR_MASK_SET(f_resp->protocolVersion, Z_ProtocolVersion_3);
442
443     int no_failed = 0;
444     int no_succeeded = 0;
445     for (bit = m_backend_list.begin(); bit != m_backend_list.end(); )
446     {
447         PackagePtr p = (*bit)->m_package;
448         
449         if (p->session().is_closed())
450         {
451             // failed. Remove from list and increment number of failed
452             no_failed++;
453             bit = m_backend_list.erase(bit);
454             continue;
455         }
456         no_succeeded++;
457
458         Z_GDU *gdu = p->response().get();
459         if (gdu && gdu->which == Z_GDU_Z3950 && gdu->u.z3950->which ==
460             Z_APDU_initResponse)
461         {
462             int i;
463             Z_APDU *b_apdu = gdu->u.z3950;
464             Z_InitResponse *b_resp = b_apdu->u.initResponse;
465
466             // common options for all backends
467             for (i = 0; i <= Z_Options_stringSchema; i++)
468             {
469                 if (!ODR_MASK_GET(b_resp->options, i))
470                     ODR_MASK_CLEAR(f_resp->options, i);
471             }
472             // common protocol version
473             for (i = 0; i <= Z_ProtocolVersion_3; i++)
474                 if (!ODR_MASK_GET(b_resp->protocolVersion, i))
475                     ODR_MASK_CLEAR(f_resp->protocolVersion, i);
476             // reject if any of the backends reject
477             if (!*b_resp->result)
478                 *f_resp->result = 0;
479         }
480         else
481         {
482             // if any target does not return init return that (close or
483             // similar )
484             package.response() = p->response();
485             return;
486         }
487         bit++;
488     }
489     if (m_p->m_hide_unavailable)
490     {
491         if (no_succeeded == 0)
492             package.session().close();
493     }
494     else
495     {
496         if (no_failed)
497             package.session().close();
498     }
499     package.response() = f_apdu;
500 }
501
502 void yf::Multi::Frontend::search(mp::Package &package, Z_APDU *apdu_req)
503 {
504     // create search request 
505     Z_SearchRequest *req = apdu_req->u.searchRequest;
506
507     // save these for later
508     int smallSetUpperBound = *req->smallSetUpperBound;
509     int largeSetLowerBound = *req->largeSetLowerBound;
510     int mediumSetPresentNumber = *req->mediumSetPresentNumber;
511     
512     // they are altered now - to disable piggyback
513     *req->smallSetUpperBound = 0;
514     *req->largeSetLowerBound = 1;
515     *req->mediumSetPresentNumber = 1;
516
517     int default_num_db = req->num_databaseNames;
518     char **default_db = req->databaseNames;
519
520     std::list<BackendPtr>::const_iterator bit;
521     for (bit = m_backend_list.begin(); bit != m_backend_list.end(); bit++)
522     {
523         PackagePtr p = (*bit)->m_package;
524         mp::odr odr;
525     
526         if (!mp::util::set_databases_from_zurl(odr, (*bit)->m_vhost,
527                                                 &req->num_databaseNames,
528                                                 &req->databaseNames))
529         {
530             req->num_databaseNames = default_num_db;
531             req->databaseNames = default_db;
532         }
533         p->request() = apdu_req;
534         p->copy_filter(package);
535     }
536     multi_move(m_backend_list);
537
538     // look at each response
539     FrontendSet resultSet(std::string(req->resultSetName));
540
541     int result_set_size = 0;
542     Z_Records *z_records_diag = 0;  // no diagnostics (yet)
543     for (bit = m_backend_list.begin(); bit != m_backend_list.end(); bit++)
544     {
545         PackagePtr p = (*bit)->m_package;
546         
547         if (p->session().is_closed()) // if any backend closes, close frontend
548             package.session().close();
549         
550         Z_GDU *gdu = p->response().get();
551         if (gdu && gdu->which == Z_GDU_Z3950 && gdu->u.z3950->which ==
552             Z_APDU_searchResponse)
553         {
554             Z_APDU *b_apdu = gdu->u.z3950;
555             Z_SearchResponse *b_resp = b_apdu->u.searchResponse;
556          
557             // see we get any errors (AKA diagnstics)
558             if (b_resp->records)
559             {
560                 if (b_resp->records->which == Z_Records_NSD
561                     || b_resp->records->which == Z_Records_multipleNSD)
562                     z_records_diag = b_resp->records;
563                 // we may set this multiple times (TOO BAD!)
564             }
565             BackendSet backendSet;
566             backendSet.m_backend = *bit;
567             backendSet.m_count = *b_resp->resultCount;
568             result_set_size += *b_resp->resultCount;
569             resultSet.m_backend_sets.push_back(backendSet);
570         }
571         else
572         {
573             // if any target does not return search response - return that 
574             package.response() = p->response();
575             return;
576         }
577     }
578
579     mp::odr odr;
580     Z_APDU *f_apdu = odr.create_searchResponse(apdu_req, 0, 0);
581     Z_SearchResponse *f_resp = f_apdu->u.searchResponse;
582
583     *f_resp->resultCount = result_set_size;
584     if (z_records_diag)
585     {
586         // search error
587         f_resp->records = z_records_diag;
588         package.response() = f_apdu;
589         return;
590     }
591     // assume OK
592     m_sets[resultSet.m_setname] = resultSet;
593
594     int number;
595     mp::util::piggyback(smallSetUpperBound,
596                          largeSetLowerBound,
597                          mediumSetPresentNumber,
598                          result_set_size,
599                          number);
600     Package pp(package.session(), package.origin());
601     if (number > 0)
602     {
603         pp.copy_filter(package);
604         Z_APDU *p_apdu = zget_APDU(odr, Z_APDU_presentRequest);
605         Z_PresentRequest *p_req = p_apdu->u.presentRequest;
606         p_req->preferredRecordSyntax = req->preferredRecordSyntax;
607         p_req->resultSetId = req->resultSetName;
608         *p_req->resultSetStartPoint = 1;
609         *p_req->numberOfRecordsRequested = number;
610         pp.request() = p_apdu;
611         present(pp, p_apdu);
612         
613         if (pp.session().is_closed())
614             package.session().close();
615         
616         Z_GDU *gdu = pp.response().get();
617         if (gdu && gdu->which == Z_GDU_Z3950 && gdu->u.z3950->which ==
618             Z_APDU_presentResponse)
619         {
620             Z_PresentResponse *p_res = gdu->u.z3950->u.presentResponse;
621             f_resp->records = p_res->records;
622             *f_resp->numberOfRecordsReturned = 
623                 *p_res->numberOfRecordsReturned;
624             *f_resp->nextResultSetPosition = 
625                 *p_res->nextResultSetPosition;
626         }
627         else 
628         {
629             package.response() = pp.response(); 
630             return;
631         }
632     }
633     package.response() = f_apdu; // in this scope because of p
634 }
635
636 void yf::Multi::Frontend::present(mp::Package &package, Z_APDU *apdu_req)
637 {
638     // create present request 
639     Z_PresentRequest *req = apdu_req->u.presentRequest;
640
641     Sets_it it;
642     it = m_sets.find(std::string(req->resultSetId));
643     if (it == m_sets.end())
644     {
645         mp::odr odr;
646         Z_APDU *apdu = 
647             odr.create_presentResponse(
648                 apdu_req,
649                 YAZ_BIB1_SPECIFIED_RESULT_SET_DOES_NOT_EXIST,
650                 req->resultSetId);
651         package.response() = apdu;
652         return;
653     }
654     std::list<Multi::FrontendSet::PresentJob> jobs;
655     int start = *req->resultSetStartPoint;
656     int number = *req->numberOfRecordsRequested;
657
658     if (m_p->m_merge_type == round_robin)
659         it->second.round_robin(start, number, jobs);
660     else if (m_p->m_merge_type == serve_order)
661         it->second.serve_order(start, number, jobs);
662
663     if (0)
664     {
665         std::list<Multi::FrontendSet::PresentJob>::const_iterator jit;
666         for (jit = jobs.begin(); jit != jobs.end(); jit++)
667         {
668             yaz_log(YLOG_LOG, "job pos=%d", jit->m_pos);
669         }
670     }
671
672     std::list<BackendPtr> present_backend_list;
673
674     std::list<BackendSet>::const_iterator bsit;
675     bsit = it->second.m_backend_sets.begin();
676     for (; bsit != it->second.m_backend_sets.end(); bsit++)
677     {
678         int start = -1;
679         int end = -1;
680         {
681             std::list<Multi::FrontendSet::PresentJob>::const_iterator jit;
682             for (jit = jobs.begin(); jit != jobs.end(); jit++)
683             {
684                 if (jit->m_backend == bsit->m_backend)
685                 {
686                     if (start == -1 || jit->m_pos < start)
687                         start = jit->m_pos;
688                     if (end == -1 || jit->m_pos > end)
689                         end = jit->m_pos;
690                 }
691             }
692         }
693         if (start != -1)
694         {
695             std::list<Multi::FrontendSet::PresentJob>::iterator jit;
696             for (jit = jobs.begin(); jit != jobs.end(); jit++)
697             {
698                 if (jit->m_backend == bsit->m_backend)
699                 {
700                     if (jit->m_pos >= start && jit->m_pos <= end)
701                         jit->m_start = start;
702                 }
703             }
704
705             PackagePtr p = bsit->m_backend->m_package;
706
707             *req->resultSetStartPoint = start;
708             *req->numberOfRecordsRequested = end - start + 1;
709             
710             p->request() = apdu_req;
711             p->copy_filter(package);
712
713             present_backend_list.push_back(bsit->m_backend);
714         }
715     }
716     multi_move(present_backend_list);
717
718     // look at each response
719     Z_Records *z_records_diag = 0;
720
721     std::list<BackendPtr>::const_iterator pbit = present_backend_list.begin();
722     for (; pbit != present_backend_list.end(); pbit++)
723     {
724         PackagePtr p = (*pbit)->m_package;
725         
726         if (p->session().is_closed()) // if any backend closes, close frontend
727             package.session().close();
728         
729         Z_GDU *gdu = p->response().get();
730         if (gdu && gdu->which == Z_GDU_Z3950 && gdu->u.z3950->which ==
731             Z_APDU_presentResponse)
732         {
733             Z_APDU *b_apdu = gdu->u.z3950;
734             Z_PresentResponse *b_resp = b_apdu->u.presentResponse;
735          
736             // see we get any errors (AKA diagnstics)
737             if (b_resp->records)
738             {
739                 if (b_resp->records->which != Z_Records_DBOSD)
740                     z_records_diag = b_resp->records;
741                 // we may set this multiple times (TOO BAD!)
742             }
743         }
744         else
745         {
746             // if any target does not return present response - return that 
747             package.response() = p->response();
748             return;
749         }
750     }
751
752     mp::odr odr;
753     Z_APDU *f_apdu = odr.create_presentResponse(apdu_req, 0, 0);
754     Z_PresentResponse *f_resp = f_apdu->u.presentResponse;
755
756     if (z_records_diag)
757     {
758         f_resp->records = z_records_diag;
759         *f_resp->presentStatus = Z_PresentStatus_failure;
760     }
761     else
762     {
763         f_resp->records = (Z_Records *) odr_malloc(odr, sizeof(Z_Records));
764         Z_Records * records = f_resp->records;
765         records->which = Z_Records_DBOSD;
766         records->u.databaseOrSurDiagnostics =
767             (Z_NamePlusRecordList *)
768             odr_malloc(odr, sizeof(Z_NamePlusRecordList));
769         Z_NamePlusRecordList *nprl = records->u.databaseOrSurDiagnostics;
770         nprl->num_records = jobs.size();
771         nprl->records = (Z_NamePlusRecord**)
772             odr_malloc(odr, sizeof(Z_NamePlusRecord *) * nprl->num_records);
773         int i = 0;
774         std::list<Multi::FrontendSet::PresentJob>::const_iterator jit;
775         for (jit = jobs.begin(); jit != jobs.end(); jit++, i++)
776         {
777             PackagePtr p = jit->m_backend->m_package;
778             
779             Z_GDU *gdu = p->response().get();
780             Z_APDU *b_apdu = gdu->u.z3950;
781             Z_PresentResponse *b_resp = b_apdu->u.presentResponse;
782
783             nprl->records[i] = (Z_NamePlusRecord*)
784                 odr_malloc(odr, sizeof(Z_NamePlusRecord));
785             int inside_pos = jit->m_pos - jit->m_start;
786             if (inside_pos >= b_resp->records->
787                 u.databaseOrSurDiagnostics->num_records)
788                 break;
789             *nprl->records[i] = *b_resp->records->
790                 u.databaseOrSurDiagnostics->records[inside_pos];
791             nprl->records[i]->databaseName =
792                     odr_strdup(odr, jit->m_backend->m_vhost.c_str());
793         }
794         nprl->num_records = i; // usually same as jobs.size();
795         *f_resp->nextResultSetPosition = start + i;
796         *f_resp->numberOfRecordsReturned = i;
797     }
798     package.response() = f_apdu;
799 }
800
801 void yf::Multi::Frontend::scan1(mp::Package &package, Z_APDU *apdu_req)
802 {
803     if (m_backend_list.size() > 1)
804     {
805         mp::odr odr;
806         Z_APDU *f_apdu = 
807             odr.create_scanResponse(
808                 apdu_req, YAZ_BIB1_COMBI_OF_SPECIFIED_DATABASES_UNSUPP, 0);
809         package.response() = f_apdu;
810         return;
811     }
812     Z_ScanRequest *req = apdu_req->u.scanRequest;
813
814     int default_num_db = req->num_databaseNames;
815     char **default_db = req->databaseNames;
816
817     std::list<BackendPtr>::const_iterator bit;
818     for (bit = m_backend_list.begin(); bit != m_backend_list.end(); bit++)
819     {
820         PackagePtr p = (*bit)->m_package;
821         mp::odr odr;
822     
823         if (!mp::util::set_databases_from_zurl(odr, (*bit)->m_vhost,
824                                                 &req->num_databaseNames,
825                                                 &req->databaseNames))
826         {
827             req->num_databaseNames = default_num_db;
828             req->databaseNames = default_db;
829         }
830         p->request() = apdu_req;
831         p->copy_filter(package);
832     }
833     multi_move(m_backend_list);
834
835     for (bit = m_backend_list.begin(); bit != m_backend_list.end(); bit++)
836     {
837         PackagePtr p = (*bit)->m_package;
838         
839         if (p->session().is_closed()) // if any backend closes, close frontend
840             package.session().close();
841         
842         Z_GDU *gdu = p->response().get();
843         if (gdu && gdu->which == Z_GDU_Z3950 && gdu->u.z3950->which ==
844             Z_APDU_scanResponse)
845         {
846             package.response() = p->response();
847             break;
848         }
849         else
850         {
851             // if any target does not return scan response - return that 
852             package.response() = p->response();
853             return;
854         }
855     }
856 }
857
858 bool yf::Multi::ScanTermInfo::operator < (const ScanTermInfo &k) const
859 {
860     return m_norm_term < k.m_norm_term;
861 }
862
863 bool yf::Multi::ScanTermInfo::operator == (const ScanTermInfo &k) const
864 {
865     return m_norm_term == k.m_norm_term;
866 }
867
868 Z_Entry *yf::Multi::ScanTermInfo::get_entry(ODR odr)
869 {
870     Z_Entry *e = (Z_Entry *)odr_malloc(odr, sizeof(*e));
871     e->which = Z_Entry_termInfo;
872     Z_TermInfo *t;
873     t = e->u.termInfo = (Z_TermInfo *) odr_malloc(odr, sizeof(*t));
874     t->suggestedAttributes = 0;
875     t->displayTerm = 0;
876     t->alternativeTerm = 0;
877     t->byAttributes = 0;
878     t->otherTermInfo = 0;
879     t->globalOccurrences = odr_intdup(odr, m_count);
880     t->term = (Z_Term *)
881         odr_malloc(odr, sizeof(*t->term));
882     t->term->which = Z_Term_general;
883     Odr_oct *o;
884     t->term->u.general = o = (Odr_oct *)odr_malloc(odr, sizeof(Odr_oct));
885
886     o->len = o->size = m_norm_term.size();
887     o->buf = (unsigned char *) odr_malloc(odr, o->len);
888     memcpy(o->buf, m_norm_term.c_str(), o->len);
889     return e;
890 }
891
892 void yf::Multi::Frontend::scan2(mp::Package &package, Z_APDU *apdu_req)
893 {
894     Z_ScanRequest *req = apdu_req->u.scanRequest;
895
896     int default_num_db = req->num_databaseNames;
897     char **default_db = req->databaseNames;
898
899     std::list<BackendPtr>::const_iterator bit;
900     for (bit = m_backend_list.begin(); bit != m_backend_list.end(); bit++)
901     {
902         PackagePtr p = (*bit)->m_package;
903         mp::odr odr;
904     
905         if (!mp::util::set_databases_from_zurl(odr, (*bit)->m_vhost,
906                                                 &req->num_databaseNames,
907                                                 &req->databaseNames))
908         {
909             req->num_databaseNames = default_num_db;
910             req->databaseNames = default_db;
911         }
912         p->request() = apdu_req;
913         p->copy_filter(package);
914     }
915     multi_move(m_backend_list);
916
917     ScanTermInfoList entries_before;
918     ScanTermInfoList entries_after;
919     int no_before = 0;
920     int no_after = 0;
921
922     for (bit = m_backend_list.begin(); bit != m_backend_list.end(); bit++)
923     {
924         PackagePtr p = (*bit)->m_package;
925         
926         if (p->session().is_closed()) // if any backend closes, close frontend
927             package.session().close();
928         
929         Z_GDU *gdu = p->response().get();
930         if (gdu && gdu->which == Z_GDU_Z3950 && gdu->u.z3950->which ==
931             Z_APDU_scanResponse)
932         {
933             Z_ScanResponse *res = gdu->u.z3950->u.scanResponse;
934
935             if (res->entries && res->entries->nonsurrogateDiagnostics)
936             {
937                 // failure
938                 mp::odr odr;
939                 Z_APDU *f_apdu = odr.create_scanResponse(apdu_req, 1, 0);
940                 Z_ScanResponse *f_res = f_apdu->u.scanResponse;
941
942                 f_res->entries->nonsurrogateDiagnostics = 
943                     res->entries->nonsurrogateDiagnostics;
944                 f_res->entries->num_nonsurrogateDiagnostics = 
945                     res->entries->num_nonsurrogateDiagnostics;
946
947                 package.response() = f_apdu;
948                 return;
949             }
950
951             if (res->entries && res->entries->entries)
952             {
953                 Z_Entry **entries = res->entries->entries;
954                 int num_entries = res->entries->num_entries;
955                 int position = 1;
956                 if (req->preferredPositionInResponse)
957                     position = *req->preferredPositionInResponse;
958                 if (res->positionOfTerm)
959                     position = *res->positionOfTerm;
960
961                 // before
962                 int i;
963                 for (i = 0; i<position-1 && i<num_entries; i++)
964                 {
965                     Z_Entry *ent = entries[i];
966
967                     if (ent->which == Z_Entry_termInfo)
968                     {
969                         ScanTermInfo my;
970
971                         int *occur = ent->u.termInfo->globalOccurrences;
972                         my.m_count = occur ? *occur : 0;
973
974                         if (ent->u.termInfo->term->which == Z_Term_general)
975                         {
976                             my.m_norm_term = std::string(
977                                 (const char *)
978                                 ent->u.termInfo->term->u.general->buf,
979                                 ent->u.termInfo->term->u.general->len);
980                         }
981                         if (my.m_norm_term.length())
982                         {
983                             ScanTermInfoList::iterator it = 
984                                 entries_before.begin();
985                             while (it != entries_before.end() && my <*it)
986                                 it++;
987                             if (my == *it)
988                             {
989                                 it->m_count += my.m_count;
990                             }
991                             else
992                             {
993                                 entries_before.insert(it, my);
994                                 no_before++;
995                             }
996                         }
997                     }
998                 }
999                 // after
1000                 if (position <= 0)
1001                     i = 0;
1002                 else
1003                     i = position-1;
1004                 for ( ; i<num_entries; i++)
1005                 {
1006                     Z_Entry *ent = entries[i];
1007
1008                     if (ent->which == Z_Entry_termInfo)
1009                     {
1010                         ScanTermInfo my;
1011
1012                         int *occur = ent->u.termInfo->globalOccurrences;
1013                         my.m_count = occur ? *occur : 0;
1014
1015                         if (ent->u.termInfo->term->which == Z_Term_general)
1016                         {
1017                             my.m_norm_term = std::string(
1018                                 (const char *)
1019                                 ent->u.termInfo->term->u.general->buf,
1020                                 ent->u.termInfo->term->u.general->len);
1021                         }
1022                         if (my.m_norm_term.length())
1023                         {
1024                             ScanTermInfoList::iterator it = 
1025                                 entries_after.begin();
1026                             while (it != entries_after.end() && *it < my)
1027                                 it++;
1028                             if (my == *it)
1029                             {
1030                                 it->m_count += my.m_count;
1031                             }
1032                             else
1033                             {
1034                                 entries_after.insert(it, my);
1035                                 no_after++;
1036                             }
1037                         }
1038                     }
1039                 }
1040
1041             }                
1042         }
1043         else
1044         {
1045             // if any target does not return scan response - return that 
1046             package.response() = p->response();
1047             return;
1048         }
1049     }
1050
1051     if (false)
1052     {
1053         std::cout << "BEFORE\n";
1054         ScanTermInfoList::iterator it = entries_before.begin();
1055         for(; it != entries_before.end(); it++)
1056         {
1057             std::cout << " " << it->m_norm_term << " " << it->m_count << "\n";
1058         }
1059         
1060         std::cout << "AFTER\n";
1061         it = entries_after.begin();
1062         for(; it != entries_after.end(); it++)
1063         {
1064             std::cout << " " << it->m_norm_term << " " << it->m_count << "\n";
1065         }
1066     }
1067
1068     if (false)
1069     {
1070         mp::odr odr;
1071         Z_APDU *f_apdu = odr.create_scanResponse(apdu_req, 1, "not implemented");
1072         package.response() = f_apdu;
1073     }
1074     else
1075     {
1076         mp::odr odr;
1077         Z_APDU *f_apdu = odr.create_scanResponse(apdu_req, 0, 0);
1078         Z_ScanResponse *resp = f_apdu->u.scanResponse;
1079         
1080         int number_returned = *req->numberOfTermsRequested;
1081         int position_returned = *req->preferredPositionInResponse;
1082         
1083         resp->entries->num_entries = number_returned;
1084         resp->entries->entries = (Z_Entry**)
1085             odr_malloc(odr, sizeof(Z_Entry*) * number_returned);
1086         int i;
1087
1088         int lbefore = entries_before.size();
1089         if (lbefore < position_returned-1)
1090             position_returned = lbefore+1;
1091
1092         ScanTermInfoList::iterator it = entries_before.begin();
1093         for (i = 0; i<position_returned-1 && it != entries_before.end(); i++, it++)
1094         {
1095             resp->entries->entries[position_returned-2-i] = it->get_entry(odr);
1096         }
1097
1098         it = entries_after.begin();
1099
1100         if (position_returned <= 0)
1101             i = 0;
1102         else
1103             i = position_returned-1;
1104         for (; i<number_returned && it != entries_after.end(); i++, it++)
1105         {
1106             resp->entries->entries[i] = it->get_entry(odr);
1107         }
1108
1109         number_returned = i;
1110
1111         resp->positionOfTerm = odr_intdup(odr, position_returned);
1112         resp->numberOfEntriesReturned = odr_intdup(odr, number_returned);
1113         resp->entries->num_entries = number_returned;
1114
1115         package.response() = f_apdu;
1116     }
1117 }
1118
1119
1120 void yf::Multi::process(mp::Package &package) const
1121 {
1122     FrontendPtr f = m_p->get_frontend(package);
1123
1124     Z_GDU *gdu = package.request().get();
1125     
1126     if (gdu && gdu->which == Z_GDU_Z3950 && gdu->u.z3950->which ==
1127         Z_APDU_initRequest && !f->m_is_multi)
1128     {
1129         f->init(package, gdu);
1130     }
1131     else if (!f->m_is_multi)
1132         package.move();
1133     else if (gdu && gdu->which == Z_GDU_Z3950)
1134     {
1135         Z_APDU *apdu = gdu->u.z3950;
1136         if (apdu->which == Z_APDU_initRequest)
1137         {
1138             mp::odr odr;
1139             
1140             package.response() = odr.create_close(
1141                 apdu,
1142                 Z_Close_protocolError,
1143                 "double init");
1144             
1145             package.session().close();
1146         }
1147         else if (apdu->which == Z_APDU_searchRequest)
1148         {
1149             f->search(package, apdu);
1150         }
1151         else if (apdu->which == Z_APDU_presentRequest)
1152         {
1153             f->present(package, apdu);
1154         }
1155         else if (apdu->which == Z_APDU_scanRequest)
1156         {
1157             f->scan2(package, apdu);
1158         }
1159         else
1160         {
1161             mp::odr odr;
1162             
1163             package.response() = odr.create_close(
1164                 apdu, Z_Close_protocolError,
1165                 "unsupported APDU in filter multi");
1166             
1167             package.session().close();
1168         }
1169     }
1170     m_p->release_frontend(package);
1171 }
1172
1173 void mp::filter::Multi::configure(const xmlNode * ptr, bool test_only)
1174 {
1175     for (ptr = ptr->children; ptr; ptr = ptr->next)
1176     {
1177         if (ptr->type != XML_ELEMENT_NODE)
1178             continue;
1179         if (!strcmp((const char *) ptr->name, "target"))
1180         {
1181             std::string route = mp::xml::get_route(ptr);
1182             std::string target = mp::xml::get_text(ptr);
1183             m_p->m_target_route[target] = route;
1184         }
1185         else if (!strcmp((const char *) ptr->name, "hideunavailable"))
1186         {
1187             m_p->m_hide_unavailable = true;
1188         }
1189         else if (!strcmp((const char *) ptr->name, "mergetype"))
1190         {
1191             std::string mergetype = mp::xml::get_text(ptr);
1192             if (mergetype == "roundrobin")
1193                 m_p->m_merge_type = round_robin;
1194             else if (mergetype == "serveorder")
1195                 m_p->m_merge_type = serve_order;
1196             else
1197                 throw mp::filter::FilterException
1198                     ("Bad mergetype "  + mergetype + " in multi filter");
1199
1200         }
1201         else
1202         {
1203             throw mp::filter::FilterException
1204                 ("Bad element " 
1205                  + std::string((const char *) ptr->name)
1206                  + " in multi filter");
1207         }
1208     }
1209 }
1210
1211 static mp::filter::Base* filter_creator()
1212 {
1213     return new mp::filter::Multi;
1214 }
1215
// Registration record for the "multi" filter, defined with C linkage.
// It ties the name "multi" to filter_creator().
// NOTE(review): metaproxy_1_filter_struct is declared outside this file;
// the members are presumably (version, filter type name, creator
// function) - confirm against its declaration.
extern "C" {
    struct metaproxy_1_filter_struct metaproxy_1_filter_multi = {
        0,
        "multi",
        filter_creator
    };
}
1223
1224
1225 /*
1226  * Local variables:
1227  * c-basic-offset: 4
1228  * c-file-style: "Stroustrup"
1229  * indent-tabs-mode: nil
1230  * End:
1231  * vim: shiftwidth=4 tabstop=8 expandtab
1232  */
1233