multi: combines diagnostics from all backends
[metaproxy-moved-to-github.git] / src / filter_multi.cpp
1 /* This file is part of Metaproxy.
2    Copyright (C) 2005-2012 Index Data
3
4 Metaproxy is free software; you can redistribute it and/or modify it under
5 the terms of the GNU General Public License as published by the Free
6 Software Foundation; either version 2, or (at your option) any later
7 version.
8
9 Metaproxy is distributed in the hope that it will be useful, but WITHOUT ANY
10 WARRANTY; without even the implied warranty of MERCHANTABILITY or
11 FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
12 for more details.
13
14 You should have received a copy of the GNU General Public License
15 along with this program; if not, write to the Free Software
16 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
17 */
18
19 #include <yaz/log.h>
20
21 #include "config.hpp"
22
23 #include <metaproxy/filter.hpp>
24 #include <metaproxy/package.hpp>
25
26 #include <boost/thread/thread.hpp>
27 #include <boost/thread/mutex.hpp>
28 #include <boost/thread/condition.hpp>
29 #include <boost/shared_ptr.hpp>
30
31 #include <metaproxy/util.hpp>
32 #include "filter_multi.hpp"
33
34 #include <yaz/zgdu.h>
35 #include <yaz/otherinfo.h>
36 #include <yaz/diagbib1.h>
37 #include <yaz/match_glob.h>
38
39 #include <vector>
40 #include <algorithm>
41 #include <map>
42 #include <iostream>
43
44 namespace mp = metaproxy_1;
45 namespace yf = mp::filter;
46
47 namespace metaproxy_1 {
48     namespace filter {
49         enum multi_merge_type {
50             round_robin,
51             serve_order
52         };
53         struct Multi::BackendSet {
54             BackendPtr m_backend;
55             int m_count;
56             bool operator < (const BackendSet &k) const;
57             bool operator == (const BackendSet &k) const;
58         };
59         struct Multi::ScanTermInfo {
60             std::string m_norm_term;
61             std::string m_display_term;
62             int m_count;
63             bool operator < (const ScanTermInfo &) const;
64             bool operator == (const ScanTermInfo &) const;
65             Z_Entry *get_entry(ODR odr);
66         };
67         struct Multi::FrontendSet {
68             class PresentJob {
69             public:
70                 BackendPtr m_backend;
71                 int m_pos; // position for backend (1=first, 2=second,..
72                 int m_start; // present request start
73                 PresentJob(BackendPtr ptr, int pos) : 
74                     m_backend(ptr), m_pos(pos), m_start(0) {};
75             };
76             FrontendSet(std::string setname);
77             FrontendSet();
78             ~FrontendSet();
79
80             void round_robin(int pos, int number, std::list<PresentJob> &job);
81             void serve_order(int pos, int number, std::list<PresentJob> &job);
82
83             std::list<BackendSet> m_backend_sets;
84             std::string m_setname;
85         };
86         struct Multi::Backend {
87             PackagePtr m_package;
88             std::string m_backend_database;
89             std::string m_vhost;
90             std::string m_route;
91             void operator() (void);  // thread operation
92         };
93         struct Multi::Frontend {
94             Frontend(Rep *rep);
95             ~Frontend();
96             bool m_is_multi;
97             bool m_in_use;
98             std::list<BackendPtr> m_backend_list;
99             std::map<std::string,Multi::FrontendSet> m_sets;
100
101             void multi_move(std::list<BackendPtr> &blist);
102             void init(Package &package, Z_GDU *gdu);
103             void close(Package &package);
104             void search(Package &package, Z_APDU *apdu);
105             void present(Package &package, Z_APDU *apdu);
106             void scan1(Package &package, Z_APDU *apdu);
107             void scan2(Package &package, Z_APDU *apdu);
108             void relay_apdu(Package &package, Z_APDU *apdu);
109             Rep *m_p;
110         };            
111         class Multi::Map {
112             std::string m_target_pattern;
113             std::string m_route;
114         public:
115             Map(std::string pattern, std::string route) : 
116                 m_target_pattern(pattern), m_route(route) {};
117             bool match(const std::string target, std::string *ret) const {
118                 if (yaz_match_glob(m_target_pattern.c_str(), target.c_str()))
119                 {
120                     *ret = m_route;
121                     return true;
122                 }
123                 return false;
124             };
125         };
126         class Multi::Rep {
127             friend class Multi;
128             friend struct Frontend;
129             
130             Rep();
131             FrontendPtr get_frontend(Package &package);
132             void release_frontend(Package &package);
133         private:
134             std::list<Multi::Map> m_route_patterns;
135             boost::mutex m_mutex;
136             boost::condition m_cond_session_ready;
137             std::map<mp::Session, FrontendPtr> m_clients;
138             bool m_hide_unavailable;
139             multi_merge_type m_merge_type;
140         };
141     }
142 }
143
144 yf::Multi::Rep::Rep()
145 {
146     m_hide_unavailable = false;
147     m_merge_type = round_robin;
148 }
149
150 bool yf::Multi::BackendSet::operator < (const BackendSet &k) const
151 {
152     return m_count < k.m_count;
153 }
154
155 yf::Multi::Frontend::Frontend(Rep *rep)
156 {
157     m_p = rep;
158     m_is_multi = false;
159 }
160
161 yf::Multi::Frontend::~Frontend()
162 {
163 }
164
165 yf::Multi::FrontendPtr yf::Multi::Rep::get_frontend(mp::Package &package)
166 {
167     boost::mutex::scoped_lock lock(m_mutex);
168
169     std::map<mp::Session,yf::Multi::FrontendPtr>::iterator it;
170     
171     while(true)
172     {
173         it = m_clients.find(package.session());
174         if (it == m_clients.end())
175             break;
176         
177         if (!it->second->m_in_use)
178         {
179             it->second->m_in_use = true;
180             return it->second;
181         }
182         m_cond_session_ready.wait(lock);
183     }
184     FrontendPtr f(new Frontend(this));
185     m_clients[package.session()] = f;
186     f->m_in_use = true;
187     return f;
188 }
189
190 void yf::Multi::Rep::release_frontend(mp::Package &package)
191 {
192     boost::mutex::scoped_lock lock(m_mutex);
193     std::map<mp::Session,yf::Multi::FrontendPtr>::iterator it;
194     
195     it = m_clients.find(package.session());
196     if (it != m_clients.end())
197     {
198         if (package.session().is_closed())
199         {
200             it->second->close(package);
201             m_clients.erase(it);
202         }
203         else
204         {
205             it->second->m_in_use = false;
206         }
207         m_cond_session_ready.notify_all();
208     }
209 }
210
211 yf::Multi::FrontendSet::FrontendSet(std::string setname)
212     :  m_setname(setname)
213 {
214 }
215
216
217 yf::Multi::FrontendSet::FrontendSet()
218 {
219 }
220
221
222 yf::Multi::FrontendSet::~FrontendSet()
223 {
224 }
225
226 yf::Multi::Multi() : m_p(new Multi::Rep)
227 {
228 }
229
230 yf::Multi::~Multi() {
231 }
232
233
234 void yf::Multi::Backend::operator() (void) 
235 {
236     m_package->move(m_route);
237 }
238
239
240 void yf::Multi::Frontend::close(mp::Package &package)
241 {
242     std::list<BackendPtr>::const_iterator bit;
243     for (bit = m_backend_list.begin(); bit != m_backend_list.end(); bit++)
244     {
245         BackendPtr b = *bit;
246
247         b->m_package->copy_filter(package);
248         b->m_package->request() = (Z_GDU *) 0;
249         b->m_package->session().close();
250         b->m_package->move(b->m_route);
251     }
252 }
253
254 void yf::Multi::Frontend::multi_move(std::list<BackendPtr> &blist)
255 {
256     std::list<BackendPtr>::const_iterator bit;
257     boost::thread_group g;
258     for (bit = blist.begin(); bit != blist.end(); bit++)
259     {
260         g.add_thread(new boost::thread(**bit));
261     }
262     g.join_all();
263 }
264
265 void yf::Multi::FrontendSet::serve_order(int start, int number,
266                                          std::list<PresentJob> &jobs)
267 {
268     int i;
269     for (i = 0; i < number; i++)
270     {
271         std::list<BackendSet>::const_iterator bsit;
272         int voffset = 0;
273         int offset = start + i - 1;
274         for (bsit = m_backend_sets.begin(); bsit != m_backend_sets.end(); 
275              bsit++)
276         {
277             if (offset >= voffset && offset < voffset + bsit->m_count)
278             {
279                 PresentJob job(bsit->m_backend, offset - voffset + 1);
280                 jobs.push_back(job);
281                 break;
282             }
283             voffset += bsit->m_count;
284         }
285     }
286 }
287
288 void yf::Multi::FrontendSet::round_robin(int start, int number,
289                                          std::list<PresentJob> &jobs)
290 {
291     std::list<int> pos;
292     std::list<BackendSet>::const_iterator bsit;
293     for (bsit = m_backend_sets.begin(); bsit != m_backend_sets.end(); bsit++)
294     {
295         pos.push_back(1);
296     }
297
298     int p = 1;
299 #if 1
300     // optimization step!
301     int omin = 0;
302     while(true)
303     {
304         int min = 0;
305         int no_left = 0;
306         // find min count for each set which is > omin
307         for (bsit = m_backend_sets.begin(); bsit != m_backend_sets.end(); bsit++)
308         {
309             if (bsit->m_count > omin)
310             {
311                 if (no_left == 0 || bsit->m_count < min)
312                     min = bsit->m_count;
313                 no_left++;
314             }
315         }
316         if (no_left == 0) // if nothing greater than omin, bail out.
317             break;
318         int skip = no_left * min;
319         if (p + skip > start)  // step gets us "into" present range?
320         {
321             // Yes. skip until start.. Rounding off is deliberate!
322             min = (start-p) / no_left;
323             p += no_left * min;
324             
325             // update positions in each set..
326             std::list<int>::iterator psit = pos.begin();
327             for (psit = pos.begin(); psit != pos.end(); psit++)
328                 *psit += min;
329             break;
330         }
331         // skip on each set.. before "present range"..
332         p = p + skip;
333         
334         std::list<int>::iterator psit = pos.begin();
335         for (psit = pos.begin(); psit != pos.end(); psit++)
336             *psit += min;
337         
338         omin = min; // update so we consider next class (with higher count)
339     }
340 #endif
341     int fetched = 0;
342     bool more = true;
343     while (more)
344     {
345         more = false;
346         std::list<int>::iterator psit = pos.begin();
347         bsit = m_backend_sets.begin();
348
349         for (; bsit != m_backend_sets.end(); psit++,bsit++)
350         {
351             if (fetched >= number)
352             {
353                 more = false;
354                 break;
355             }
356             if (*psit <= bsit->m_count)
357             {
358                 if (p >= start)
359                 {
360                     PresentJob job(bsit->m_backend, *psit);
361                     jobs.push_back(job);
362                     fetched++;
363                 }
364                 (*psit)++;
365                 p++;
366                 more = true;
367             }
368         }
369     }
370 }
371
372 void yf::Multi::Frontend::init(mp::Package &package, Z_GDU *gdu)
373 {
374     Z_InitRequest *req = gdu->u.z3950->u.initRequest;
375
376     std::list<std::string> targets;
377
378     mp::util::get_vhost_otherinfo(req->otherInfo, targets);
379
380     if (targets.size() < 1)
381     {
382         package.move();
383         return;
384     }
385
386     std::list<std::string>::const_iterator t_it = targets.begin();
387     for (; t_it != targets.end(); t_it++)
388     {
389         Session s;
390         Backend *b = new Backend;
391         b->m_vhost = *t_it;
392
393         std::list<Multi::Map>::const_iterator it =
394             m_p->m_route_patterns.begin();
395         while (it != m_p->m_route_patterns.end()) {
396             if (it->match(*t_it, &b->m_route))
397                 break;
398             it++;
399         }
400         // b->m_route = m_p->m_target_route[*t_it];
401         // b->m_route unset
402         b->m_package = PackagePtr(new Package(s, package.origin()));
403
404         m_backend_list.push_back(BackendPtr(b));
405     }
406     m_is_multi = true;
407
408     // create init request 
409     std::list<BackendPtr>::iterator bit;
410     for (bit = m_backend_list.begin(); bit != m_backend_list.end(); bit++)
411     {
412         mp::odr odr;
413         BackendPtr b = *bit;
414         Z_APDU *init_apdu = zget_APDU(odr, Z_APDU_initRequest);
415         
416         std::list<std::string>vhost_one;
417         vhost_one.push_back(b->m_vhost);
418         mp::util::set_vhost_otherinfo(&init_apdu->u.initRequest->otherInfo,
419                                        odr, vhost_one);
420
421
422         Z_InitRequest *breq = init_apdu->u.initRequest;
423
424         breq->idAuthentication = req->idAuthentication;
425         
426         *breq->preferredMessageSize = *req->preferredMessageSize;
427         *breq->maximumRecordSize = *req->maximumRecordSize;
428
429         ODR_MASK_SET(breq->options, Z_Options_search);
430         ODR_MASK_SET(breq->options, Z_Options_present);
431         ODR_MASK_SET(breq->options, Z_Options_namedResultSets);
432         ODR_MASK_SET(breq->options, Z_Options_scan);
433         
434         ODR_MASK_SET(breq->protocolVersion, Z_ProtocolVersion_1);
435         ODR_MASK_SET(breq->protocolVersion, Z_ProtocolVersion_2);
436         ODR_MASK_SET(breq->protocolVersion, Z_ProtocolVersion_3);
437         
438         b->m_package->request() = init_apdu;
439
440         b->m_package->copy_filter(package);
441     }
442     multi_move(m_backend_list);
443
444     // create the frontend init response based on each backend init response
445     mp::odr odr;
446
447     Z_APDU *f_apdu = odr.create_initResponse(gdu->u.z3950, 0, 0);
448     Z_InitResponse *f_resp = f_apdu->u.initResponse;
449
450     ODR_MASK_SET(f_resp->options, Z_Options_search);
451     ODR_MASK_SET(f_resp->options, Z_Options_present);
452     ODR_MASK_SET(f_resp->options, Z_Options_namedResultSets);
453     ODR_MASK_SET(f_resp->options, Z_Options_scan);
454     
455     ODR_MASK_SET(f_resp->protocolVersion, Z_ProtocolVersion_1);
456     ODR_MASK_SET(f_resp->protocolVersion, Z_ProtocolVersion_2);
457     ODR_MASK_SET(f_resp->protocolVersion, Z_ProtocolVersion_3);
458
459     int no_failed = 0;
460     int no_succeeded = 0;
461
462     Odr_int preferredMessageSize = *req->preferredMessageSize;
463     Odr_int maximumRecordSize = *req->maximumRecordSize;
464     for (bit = m_backend_list.begin(); bit != m_backend_list.end(); )
465     {
466         PackagePtr p = (*bit)->m_package;
467         
468         if (p->session().is_closed())
469         {
470             // failed. Remove from list and increment number of failed
471             no_failed++;
472             bit = m_backend_list.erase(bit);
473             continue;
474         }
475         Z_GDU *gdu = p->response().get();
476         if (gdu && gdu->which == Z_GDU_Z3950 && gdu->u.z3950->which ==
477             Z_APDU_initResponse)
478         {
479             int i;
480             Z_APDU *b_apdu = gdu->u.z3950;
481             Z_InitResponse *b_resp = b_apdu->u.initResponse;
482
483             // common options for all backends
484             for (i = 0; i <= Z_Options_stringSchema; i++)
485             {
486                 if (!ODR_MASK_GET(b_resp->options, i))
487                     ODR_MASK_CLEAR(f_resp->options, i);
488             }
489             // common protocol version
490             for (i = 0; i <= Z_ProtocolVersion_3; i++)
491                 if (!ODR_MASK_GET(b_resp->protocolVersion, i))
492                     ODR_MASK_CLEAR(f_resp->protocolVersion, i);
493             if (*b_resp->result)
494             {
495                 no_succeeded++;
496                 if (preferredMessageSize > *b_resp->preferredMessageSize)
497                     preferredMessageSize = *b_resp->preferredMessageSize;
498                 if (maximumRecordSize > *b_resp->maximumRecordSize)
499                     maximumRecordSize = *b_resp->maximumRecordSize;
500             }
501             else
502                 no_failed++;
503         }
504         else
505             no_failed++;
506         bit++;
507     }
508     *f_resp->preferredMessageSize = preferredMessageSize;
509     *f_resp->maximumRecordSize = maximumRecordSize;
510
511     if (m_p->m_hide_unavailable)
512     {
513         if (no_succeeded == 0)
514         {
515             *f_resp->result = 0;
516             package.session().close();
517         }
518     }
519     else
520     {
521         if (no_failed)
522         {
523             *f_resp->result = 0;
524             package.session().close();
525         }
526     }
527     package.response() = f_apdu;
528 }
529
530 void yf::Multi::Frontend::search(mp::Package &package, Z_APDU *apdu_req)
531 {
532     // create search request 
533     Z_SearchRequest *req = apdu_req->u.searchRequest;
534
535     // save these for later
536     Odr_int smallSetUpperBound = *req->smallSetUpperBound;
537     Odr_int largeSetLowerBound = *req->largeSetLowerBound;
538     Odr_int mediumSetPresentNumber = *req->mediumSetPresentNumber;
539     
540     // they are altered now - to disable piggyback
541     *req->smallSetUpperBound = 0;
542     *req->largeSetLowerBound = 1;
543     *req->mediumSetPresentNumber = 1;
544
545     int default_num_db = req->num_databaseNames;
546     char **default_db = req->databaseNames;
547
548     std::list<BackendPtr>::const_iterator bit;
549     for (bit = m_backend_list.begin(); bit != m_backend_list.end(); bit++)
550     {
551         PackagePtr p = (*bit)->m_package;
552         mp::odr odr;
553     
554         if (!mp::util::set_databases_from_zurl(odr, (*bit)->m_vhost,
555                                                 &req->num_databaseNames,
556                                                 &req->databaseNames))
557         {
558             req->num_databaseNames = default_num_db;
559             req->databaseNames = default_db;
560         }
561         p->request() = apdu_req;
562         p->copy_filter(package);
563     }
564     multi_move(m_backend_list);
565
566     // look at each response
567     FrontendSet resultSet(std::string(req->resultSetName));
568
569     mp::odr odr;
570     Odr_int result_set_size = 0;
571     Z_DiagRecs *z_diag = 0;
572     for (bit = m_backend_list.begin(); bit != m_backend_list.end(); bit++)
573     {
574         PackagePtr p = (*bit)->m_package;
575         
576         if (p->session().is_closed()) // if any backend closes, close frontend
577             package.session().close();
578         
579         Z_GDU *gdu = p->response().get();
580         if (gdu && gdu->which == Z_GDU_Z3950 && gdu->u.z3950->which ==
581             Z_APDU_searchResponse)
582         {
583             Z_APDU *b_apdu = gdu->u.z3950;
584             Z_SearchResponse *b_resp = b_apdu->u.searchResponse;
585          
586             // see we get any errors (AKA diagnstics)
587             if (b_resp->records)
588             {
589                 if (b_resp->records->which == Z_Records_NSD)
590                 {
591                     if (!z_diag)
592                     {
593                         z_diag = (Z_DiagRecs *)
594                             odr_malloc(odr, sizeof(*z_diag));
595                         z_diag->num_diagRecs = 0;
596                         z_diag->diagRecs = (Z_DiagRec**)
597                             odr_malloc(odr, sizeof(*z_diag->diagRecs));
598                     }
599                     else
600                     {
601                         Z_DiagRec **n = (Z_DiagRec **)
602                             odr_malloc(odr,
603                                        (1+z_diag->num_diagRecs) * sizeof(*n));
604                         memcpy(n, z_diag->diagRecs, z_diag->num_diagRecs
605                                * sizeof(*n));
606                         z_diag->diagRecs = n;
607                     }
608                     Z_DiagRec *nr = (Z_DiagRec *) odr_malloc(odr, sizeof(*nr));
609                     nr->which = Z_DiagRec_defaultFormat;
610                     nr->u.defaultFormat =
611                         b_resp->records->u.nonSurrogateDiagnostic;
612                     z_diag->diagRecs[z_diag->num_diagRecs++] = nr;
613                 }
614                 if (b_resp->records->which == Z_Records_multipleNSD)
615                 {
616                     // we may set this multiple times (TOO BAD!)
617                     z_diag = b_resp->records->u.multipleNonSurDiagnostics;
618                 }
619             }
620             BackendSet backendSet;
621             backendSet.m_backend = *bit;
622             backendSet.m_count = *b_resp->resultCount;
623             result_set_size += *b_resp->resultCount;
624             resultSet.m_backend_sets.push_back(backendSet);
625         }
626         else
627         {
628             // if any target does not return search response - return that 
629             package.response() = p->response();
630             return;
631         }
632     }
633
634     Z_APDU *f_apdu = odr.create_searchResponse(apdu_req, 0, 0);
635     Z_SearchResponse *f_resp = f_apdu->u.searchResponse;
636
637     *f_resp->resultCount = result_set_size;
638     if (z_diag)
639     {
640         f_resp->records = (Z_Records *)
641             odr_malloc(odr, sizeof(*f_resp->records));
642         if (z_diag->num_diagRecs > 1)
643         {
644             f_resp->records->which = Z_Records_multipleNSD;
645             f_resp->records->u.multipleNonSurDiagnostics = z_diag;
646         }
647         else
648         {
649             f_resp->records->which = Z_Records_NSD;
650             f_resp->records->u.nonSurrogateDiagnostic =
651                 z_diag->diagRecs[0]->u.defaultFormat;
652         }
653     }
654     // assume OK
655     m_sets[resultSet.m_setname] = resultSet;
656
657     Odr_int number;
658     mp::util::piggyback(smallSetUpperBound,
659                         largeSetLowerBound,
660                         mediumSetPresentNumber,
661                         0, 0,
662                         result_set_size,
663                         number, 0);
664     Package pp(package.session(), package.origin());
665     if (z_diag == 0 && number > 0)
666     {
667         pp.copy_filter(package);
668         Z_APDU *p_apdu = zget_APDU(odr, Z_APDU_presentRequest);
669         Z_PresentRequest *p_req = p_apdu->u.presentRequest;
670         p_req->preferredRecordSyntax = req->preferredRecordSyntax;
671         p_req->resultSetId = req->resultSetName;
672         *p_req->resultSetStartPoint = 1;
673         *p_req->numberOfRecordsRequested = number;
674         pp.request() = p_apdu;
675         present(pp, p_apdu);
676         
677         if (pp.session().is_closed())
678             package.session().close();
679         
680         Z_GDU *gdu = pp.response().get();
681         if (gdu && gdu->which == Z_GDU_Z3950 && gdu->u.z3950->which ==
682             Z_APDU_presentResponse)
683         {
684             Z_PresentResponse *p_res = gdu->u.z3950->u.presentResponse;
685             f_resp->records = p_res->records;
686             *f_resp->numberOfRecordsReturned = 
687                 *p_res->numberOfRecordsReturned;
688             *f_resp->nextResultSetPosition = 
689                 *p_res->nextResultSetPosition;
690         }
691         else 
692         {
693             package.response() = pp.response(); 
694             return;
695         }
696     }
697     package.response() = f_apdu; // in this scope because of p
698 }
699
700 void yf::Multi::Frontend::present(mp::Package &package, Z_APDU *apdu_req)
701 {
702     // create present request 
703     Z_PresentRequest *req = apdu_req->u.presentRequest;
704
705     Sets_it it;
706     it = m_sets.find(std::string(req->resultSetId));
707     if (it == m_sets.end())
708     {
709         mp::odr odr;
710         Z_APDU *apdu = 
711             odr.create_presentResponse(
712                 apdu_req,
713                 YAZ_BIB1_SPECIFIED_RESULT_SET_DOES_NOT_EXIST,
714                 req->resultSetId);
715         package.response() = apdu;
716         return;
717     }
718     std::list<Multi::FrontendSet::PresentJob> jobs;
719     int start = *req->resultSetStartPoint;
720     int number = *req->numberOfRecordsRequested;
721
722     if (m_p->m_merge_type == round_robin)
723         it->second.round_robin(start, number, jobs);
724     else if (m_p->m_merge_type == serve_order)
725         it->second.serve_order(start, number, jobs);
726
727     if (0)
728     {
729         std::list<Multi::FrontendSet::PresentJob>::const_iterator jit;
730         for (jit = jobs.begin(); jit != jobs.end(); jit++)
731         {
732             yaz_log(YLOG_LOG, "job pos=%d", jit->m_pos);
733         }
734     }
735
736     std::list<BackendPtr> present_backend_list;
737
738     std::list<BackendSet>::const_iterator bsit;
739     bsit = it->second.m_backend_sets.begin();
740     for (; bsit != it->second.m_backend_sets.end(); bsit++)
741     {
742         int start = -1;
743         int end = -1;
744         {
745             std::list<Multi::FrontendSet::PresentJob>::const_iterator jit;
746             for (jit = jobs.begin(); jit != jobs.end(); jit++)
747             {
748                 if (jit->m_backend == bsit->m_backend)
749                 {
750                     if (start == -1 || jit->m_pos < start)
751                         start = jit->m_pos;
752                     if (end == -1 || jit->m_pos > end)
753                         end = jit->m_pos;
754                 }
755             }
756         }
757         if (start != -1)
758         {
759             std::list<Multi::FrontendSet::PresentJob>::iterator jit;
760             for (jit = jobs.begin(); jit != jobs.end(); jit++)
761             {
762                 if (jit->m_backend == bsit->m_backend)
763                 {
764                     if (jit->m_pos >= start && jit->m_pos <= end)
765                         jit->m_start = start;
766                 }
767             }
768
769             PackagePtr p = bsit->m_backend->m_package;
770
771             *req->resultSetStartPoint = start;
772             *req->numberOfRecordsRequested = end - start + 1;
773             
774             p->request() = apdu_req;
775             p->copy_filter(package);
776
777             present_backend_list.push_back(bsit->m_backend);
778         }
779     }
780     multi_move(present_backend_list);
781
782     // look at each response
783     Z_Records *z_records_diag = 0;
784
785     std::list<BackendPtr>::const_iterator pbit = present_backend_list.begin();
786     for (; pbit != present_backend_list.end(); pbit++)
787     {
788         PackagePtr p = (*pbit)->m_package;
789         
790         if (p->session().is_closed()) // if any backend closes, close frontend
791             package.session().close();
792         
793         Z_GDU *gdu = p->response().get();
794         if (gdu && gdu->which == Z_GDU_Z3950 && gdu->u.z3950->which ==
795             Z_APDU_presentResponse)
796         {
797             Z_APDU *b_apdu = gdu->u.z3950;
798             Z_PresentResponse *b_resp = b_apdu->u.presentResponse;
799          
800             // see we get any errors (AKA diagnstics)
801             if (b_resp->records)
802             {
803                 if (b_resp->records->which != Z_Records_DBOSD)
804                     z_records_diag = b_resp->records;
805                 // we may set this multiple times (TOO BAD!)
806             }
807         }
808         else
809         {
810             // if any target does not return present response - return that 
811             package.response() = p->response();
812             return;
813         }
814     }
815
816     mp::odr odr;
817     Z_APDU *f_apdu = odr.create_presentResponse(apdu_req, 0, 0);
818     Z_PresentResponse *f_resp = f_apdu->u.presentResponse;
819
820     if (z_records_diag)
821     {
822         f_resp->records = z_records_diag;
823         *f_resp->presentStatus = Z_PresentStatus_failure;
824     }
825     else if (number < 0 || (size_t) number > jobs.size())
826     {
827         f_apdu = 
828             odr.create_presentResponse(
829                 apdu_req,
830                 YAZ_BIB1_PRESENT_REQUEST_OUT_OF_RANGE,
831                 0);
832     }
833     else
834     {
835         f_resp->records = (Z_Records *) odr_malloc(odr, sizeof(Z_Records));
836         Z_Records * records = f_resp->records;
837         records->which = Z_Records_DBOSD;
838         records->u.databaseOrSurDiagnostics =
839             (Z_NamePlusRecordList *)
840             odr_malloc(odr, sizeof(Z_NamePlusRecordList));
841         Z_NamePlusRecordList *nprl = records->u.databaseOrSurDiagnostics;
842         nprl->num_records = jobs.size();
843         nprl->records = (Z_NamePlusRecord**)
844             odr_malloc(odr, sizeof(Z_NamePlusRecord *) * nprl->num_records);
845         int i = 0;
846         std::list<Multi::FrontendSet::PresentJob>::const_iterator jit;
847         for (jit = jobs.begin(); jit != jobs.end(); jit++, i++)
848         {
849             PackagePtr p = jit->m_backend->m_package;
850             
851             Z_GDU *gdu = p->response().get();
852             Z_APDU *b_apdu = gdu->u.z3950;
853             Z_PresentResponse *b_resp = b_apdu->u.presentResponse;
854
855             nprl->records[i] = (Z_NamePlusRecord*)
856                 odr_malloc(odr, sizeof(Z_NamePlusRecord));
857             int inside_pos = jit->m_pos - jit->m_start;
858             if (inside_pos >= b_resp->records->
859                 u.databaseOrSurDiagnostics->num_records)
860                 break;
861             *nprl->records[i] = *b_resp->records->
862                 u.databaseOrSurDiagnostics->records[inside_pos];
863             nprl->records[i]->databaseName =
864                     odr_strdup(odr, jit->m_backend->m_vhost.c_str());
865         }
866         nprl->num_records = i; // usually same as jobs.size();
867         *f_resp->nextResultSetPosition = start + i;
868         *f_resp->numberOfRecordsReturned = i;
869     }
870     package.response() = f_apdu;
871 }
872
873 void yf::Multi::Frontend::scan1(mp::Package &package, Z_APDU *apdu_req)
874 {
875     if (m_backend_list.size() > 1)
876     {
877         mp::odr odr;
878         Z_APDU *f_apdu = 
879             odr.create_scanResponse(
880                 apdu_req, YAZ_BIB1_COMBI_OF_SPECIFIED_DATABASES_UNSUPP, 0);
881         package.response() = f_apdu;
882         return;
883     }
884     Z_ScanRequest *req = apdu_req->u.scanRequest;
885
886     int default_num_db = req->num_databaseNames;
887     char **default_db = req->databaseNames;
888
889     std::list<BackendPtr>::const_iterator bit;
890     for (bit = m_backend_list.begin(); bit != m_backend_list.end(); bit++)
891     {
892         PackagePtr p = (*bit)->m_package;
893         mp::odr odr;
894     
895         if (!mp::util::set_databases_from_zurl(odr, (*bit)->m_vhost,
896                                                 &req->num_databaseNames,
897                                                 &req->databaseNames))
898         {
899             req->num_databaseNames = default_num_db;
900             req->databaseNames = default_db;
901         }
902         p->request() = apdu_req;
903         p->copy_filter(package);
904     }
905     multi_move(m_backend_list);
906
907     for (bit = m_backend_list.begin(); bit != m_backend_list.end(); bit++)
908     {
909         PackagePtr p = (*bit)->m_package;
910         
911         if (p->session().is_closed()) // if any backend closes, close frontend
912             package.session().close();
913         
914         Z_GDU *gdu = p->response().get();
915         if (gdu && gdu->which == Z_GDU_Z3950 && gdu->u.z3950->which ==
916             Z_APDU_scanResponse)
917         {
918             package.response() = p->response();
919             break;
920         }
921         else
922         {
923             // if any target does not return scan response - return that 
924             package.response() = p->response();
925             return;
926         }
927     }
928 }
929
930 bool yf::Multi::ScanTermInfo::operator < (const ScanTermInfo &k) const
931 {
932     return m_norm_term < k.m_norm_term;
933 }
934
935 bool yf::Multi::ScanTermInfo::operator == (const ScanTermInfo &k) const
936 {
937     return m_norm_term == k.m_norm_term;
938 }
939
940 Z_Entry *yf::Multi::ScanTermInfo::get_entry(ODR odr)
941 {
942     Z_Entry *e = (Z_Entry *)odr_malloc(odr, sizeof(*e));
943     e->which = Z_Entry_termInfo;
944     Z_TermInfo *t;
945     t = e->u.termInfo = (Z_TermInfo *) odr_malloc(odr, sizeof(*t));
946     t->suggestedAttributes = 0;
947     t->displayTerm = 0;
948     t->alternativeTerm = 0;
949     t->byAttributes = 0;
950     t->otherTermInfo = 0;
951     t->globalOccurrences = odr_intdup(odr, m_count);
952     t->term = (Z_Term *)
953         odr_malloc(odr, sizeof(*t->term));
954     t->term->which = Z_Term_general;
955     Odr_oct *o;
956     t->term->u.general = o = (Odr_oct *)odr_malloc(odr, sizeof(Odr_oct));
957
958     o->len = o->size = m_norm_term.size();
959     o->buf = (unsigned char *) odr_malloc(odr, o->len);
960     memcpy(o->buf, m_norm_term.c_str(), o->len);
961     return e;
962 }
963
964 void yf::Multi::Frontend::relay_apdu(mp::Package &package, Z_APDU *apdu_req)
965 {
966     std::list<BackendPtr>::const_iterator bit;
967     for (bit = m_backend_list.begin(); bit != m_backend_list.end(); bit++)
968     {
969         PackagePtr p = (*bit)->m_package;
970         mp::odr odr;
971     
972         p->request() = apdu_req;
973         p->copy_filter(package);
974     }
975     multi_move(m_backend_list);
976     for (bit = m_backend_list.begin(); bit != m_backend_list.end(); bit++)
977     {
978         PackagePtr p = (*bit)->m_package;
979         
980         if (p->session().is_closed()) // if any backend closes, close frontend
981             package.session().close();
982         
983         package.response() = p->response();
984     }
985 }
986
987
988 void yf::Multi::Frontend::scan2(mp::Package &package, Z_APDU *apdu_req)
989 {
990     Z_ScanRequest *req = apdu_req->u.scanRequest;
991
992     int default_num_db = req->num_databaseNames;
993     char **default_db = req->databaseNames;
994
995     std::list<BackendPtr>::const_iterator bit;
996     for (bit = m_backend_list.begin(); bit != m_backend_list.end(); bit++)
997     {
998         PackagePtr p = (*bit)->m_package;
999         mp::odr odr;
1000     
1001         if (!mp::util::set_databases_from_zurl(odr, (*bit)->m_vhost,
1002                                                 &req->num_databaseNames,
1003                                                 &req->databaseNames))
1004         {
1005             req->num_databaseNames = default_num_db;
1006             req->databaseNames = default_db;
1007         }
1008         p->request() = apdu_req;
1009         p->copy_filter(package);
1010     }
1011     multi_move(m_backend_list);
1012
1013     ScanTermInfoList entries_before;
1014     ScanTermInfoList entries_after;
1015     int no_before = 0;
1016     int no_after = 0;
1017
1018     for (bit = m_backend_list.begin(); bit != m_backend_list.end(); bit++)
1019     {
1020         PackagePtr p = (*bit)->m_package;
1021         
1022         if (p->session().is_closed()) // if any backend closes, close frontend
1023             package.session().close();
1024         
1025         Z_GDU *gdu = p->response().get();
1026         if (gdu && gdu->which == Z_GDU_Z3950 && gdu->u.z3950->which ==
1027             Z_APDU_scanResponse)
1028         {
1029             Z_ScanResponse *res = gdu->u.z3950->u.scanResponse;
1030
1031             if (res->entries && res->entries->nonsurrogateDiagnostics)
1032             {
1033                 // failure
1034                 mp::odr odr;
1035                 Z_APDU *f_apdu = odr.create_scanResponse(apdu_req, 1, 0);
1036                 Z_ScanResponse *f_res = f_apdu->u.scanResponse;
1037
1038                 f_res->entries->nonsurrogateDiagnostics = 
1039                     res->entries->nonsurrogateDiagnostics;
1040                 f_res->entries->num_nonsurrogateDiagnostics = 
1041                     res->entries->num_nonsurrogateDiagnostics;
1042
1043                 package.response() = f_apdu;
1044                 return;
1045             }
1046
1047             if (res->entries && res->entries->entries)
1048             {
1049                 Z_Entry **entries = res->entries->entries;
1050                 int num_entries = res->entries->num_entries;
1051                 int position = 1;
1052                 if (req->preferredPositionInResponse)
1053                     position = *req->preferredPositionInResponse;
1054                 if (res->positionOfTerm)
1055                     position = *res->positionOfTerm;
1056
1057                 // before
1058                 int i;
1059                 for (i = 0; i<position-1 && i<num_entries; i++)
1060                 {
1061                     Z_Entry *ent = entries[i];
1062
1063                     if (ent->which == Z_Entry_termInfo)
1064                     {
1065                         ScanTermInfo my;
1066
1067                         Odr_int *occur = ent->u.termInfo->globalOccurrences;
1068                         my.m_count = occur ? *occur : 0;
1069
1070                         if (ent->u.termInfo->term->which == Z_Term_general)
1071                         {
1072                             my.m_norm_term = std::string(
1073                                 (const char *)
1074                                 ent->u.termInfo->term->u.general->buf,
1075                                 ent->u.termInfo->term->u.general->len);
1076                         }
1077                         if (my.m_norm_term.length())
1078                         {
1079                             ScanTermInfoList::iterator it = 
1080                                 entries_before.begin();
1081                             while (it != entries_before.end() && my <*it)
1082                                 it++;
1083                             if (my == *it)
1084                             {
1085                                 it->m_count += my.m_count;
1086                             }
1087                             else
1088                             {
1089                                 entries_before.insert(it, my);
1090                                 no_before++;
1091                             }
1092                         }
1093                     }
1094                 }
1095                 // after
1096                 if (position <= 0)
1097                     i = 0;
1098                 else
1099                     i = position-1;
1100                 for ( ; i<num_entries; i++)
1101                 {
1102                     Z_Entry *ent = entries[i];
1103
1104                     if (ent->which == Z_Entry_termInfo)
1105                     {
1106                         ScanTermInfo my;
1107
1108                         Odr_int *occur = ent->u.termInfo->globalOccurrences;
1109                         my.m_count = occur ? *occur : 0;
1110
1111                         if (ent->u.termInfo->term->which == Z_Term_general)
1112                         {
1113                             my.m_norm_term = std::string(
1114                                 (const char *)
1115                                 ent->u.termInfo->term->u.general->buf,
1116                                 ent->u.termInfo->term->u.general->len);
1117                         }
1118                         if (my.m_norm_term.length())
1119                         {
1120                             ScanTermInfoList::iterator it = 
1121                                 entries_after.begin();
1122                             while (it != entries_after.end() && *it < my)
1123                                 it++;
1124                             if (my == *it)
1125                             {
1126                                 it->m_count += my.m_count;
1127                             }
1128                             else
1129                             {
1130                                 entries_after.insert(it, my);
1131                                 no_after++;
1132                             }
1133                         }
1134                     }
1135                 }
1136
1137             }                
1138         }
1139         else
1140         {
1141             // if any target does not return scan response - return that 
1142             package.response() = p->response();
1143             return;
1144         }
1145     }
1146
1147     if (false)
1148     {
1149         std::cout << "BEFORE\n";
1150         ScanTermInfoList::iterator it = entries_before.begin();
1151         for(; it != entries_before.end(); it++)
1152         {
1153             std::cout << " " << it->m_norm_term << " " << it->m_count << "\n";
1154         }
1155         
1156         std::cout << "AFTER\n";
1157         it = entries_after.begin();
1158         for(; it != entries_after.end(); it++)
1159         {
1160             std::cout << " " << it->m_norm_term << " " << it->m_count << "\n";
1161         }
1162     }
1163
1164     if (false)
1165     {
1166         mp::odr odr;
1167         Z_APDU *f_apdu = odr.create_scanResponse(apdu_req, 1, "not implemented");
1168         package.response() = f_apdu;
1169     }
1170     else
1171     {
1172         mp::odr odr;
1173         Z_APDU *f_apdu = odr.create_scanResponse(apdu_req, 0, 0);
1174         Z_ScanResponse *resp = f_apdu->u.scanResponse;
1175         
1176         int number_returned = *req->numberOfTermsRequested;
1177         int position_returned = *req->preferredPositionInResponse;
1178         
1179         resp->entries->num_entries = number_returned;
1180         resp->entries->entries = (Z_Entry**)
1181             odr_malloc(odr, sizeof(Z_Entry*) * number_returned);
1182         int i;
1183
1184         int lbefore = entries_before.size();
1185         if (lbefore < position_returned-1)
1186             position_returned = lbefore+1;
1187
1188         ScanTermInfoList::iterator it = entries_before.begin();
1189         for (i = 0; i<position_returned-1 && it != entries_before.end(); i++, it++)
1190         {
1191             resp->entries->entries[position_returned-2-i] = it->get_entry(odr);
1192         }
1193
1194         it = entries_after.begin();
1195
1196         if (position_returned <= 0)
1197             i = 0;
1198         else
1199             i = position_returned-1;
1200         for (; i<number_returned && it != entries_after.end(); i++, it++)
1201         {
1202             resp->entries->entries[i] = it->get_entry(odr);
1203         }
1204
1205         number_returned = i;
1206
1207         resp->positionOfTerm = odr_intdup(odr, position_returned);
1208         resp->numberOfEntriesReturned = odr_intdup(odr, number_returned);
1209         resp->entries->num_entries = number_returned;
1210
1211         package.response() = f_apdu;
1212     }
1213 }
1214
1215
1216 void yf::Multi::process(mp::Package &package) const
1217 {
1218     FrontendPtr f = m_p->get_frontend(package);
1219
1220     Z_GDU *gdu = package.request().get();
1221     
1222     if (gdu && gdu->which == Z_GDU_Z3950 && gdu->u.z3950->which ==
1223         Z_APDU_initRequest && !f->m_is_multi)
1224     {
1225         f->init(package, gdu);
1226     }
1227     else if (!f->m_is_multi)
1228         package.move();
1229     else if (gdu && gdu->which == Z_GDU_Z3950)
1230     {
1231         Z_APDU *apdu = gdu->u.z3950;
1232         if (apdu->which == Z_APDU_initRequest)
1233         {
1234             mp::odr odr;
1235             
1236             package.response() = odr.create_close(
1237                 apdu,
1238                 Z_Close_protocolError,
1239                 "double init");
1240             
1241             package.session().close();
1242         }
1243         else if (apdu->which == Z_APDU_searchRequest)
1244         {
1245             f->search(package, apdu);
1246         }
1247         else if (apdu->which == Z_APDU_presentRequest)
1248         {
1249             f->present(package, apdu);
1250         }
1251         else if (apdu->which == Z_APDU_scanRequest)
1252         {
1253             f->scan2(package, apdu);
1254         }
1255         else if (apdu->which == Z_APDU_close)
1256         {
1257             f->relay_apdu(package, apdu);
1258         }
1259         else
1260         {
1261             mp::odr odr;
1262             
1263             package.response() = odr.create_close(
1264                 apdu, Z_Close_protocolError,
1265                 "unsupported APDU in filter multi");
1266             
1267             package.session().close();
1268         }
1269     }
1270     m_p->release_frontend(package);
1271 }
1272
1273 void mp::filter::Multi::configure(const xmlNode * ptr, bool test_only,
1274                                   const char *path)
1275 {
1276     for (ptr = ptr->children; ptr; ptr = ptr->next)
1277     {
1278         if (ptr->type != XML_ELEMENT_NODE)
1279             continue;
1280         if (!strcmp((const char *) ptr->name, "target"))
1281         {
1282             std::string route = mp::xml::get_route(ptr);
1283             std::string target = mp::xml::get_text(ptr);
1284             m_p->m_route_patterns.push_back(Multi::Map(target, route));
1285         }
1286         else if (!strcmp((const char *) ptr->name, "hideunavailable"))
1287         {
1288             m_p->m_hide_unavailable = true;
1289         }
1290         else if (!strcmp((const char *) ptr->name, "mergetype"))
1291         {
1292             std::string mergetype = mp::xml::get_text(ptr);
1293             if (mergetype == "roundrobin")
1294                 m_p->m_merge_type = round_robin;
1295             else if (mergetype == "serveorder")
1296                 m_p->m_merge_type = serve_order;
1297             else
1298                 throw mp::filter::FilterException
1299                     ("Bad mergetype "  + mergetype + " in multi filter");
1300
1301         }
1302         else
1303         {
1304             throw mp::filter::FilterException
1305                 ("Bad element " 
1306                  + std::string((const char *) ptr->name)
1307                  + " in multi filter");
1308         }
1309     }
1310 }
1311
1312 static mp::filter::Base* filter_creator()
1313 {
1314     return new mp::filter::Multi;
1315 }
1316
1317 extern "C" {
1318     struct metaproxy_1_filter_struct metaproxy_1_filter_multi = {
1319         0,
1320         "multi",
1321         filter_creator
1322     };
1323 }
1324
1325
1326 /*
1327  * Local variables:
1328  * c-basic-offset: 4
1329  * c-file-style: "Stroustrup"
1330  * indent-tabs-mode: nil
1331  * End:
1332  * vim: shiftwidth=4 tabstop=8 expandtab
1333  */
1334