multi: initial work on hide_errors tweak
[metaproxy-moved-to-github.git] / src / filter_multi.cpp
1 /* This file is part of Metaproxy.
2    Copyright (C) 2005-2012 Index Data
3
4 Metaproxy is free software; you can redistribute it and/or modify it under
5 the terms of the GNU General Public License as published by the Free
6 Software Foundation; either version 2, or (at your option) any later
7 version.
8
9 Metaproxy is distributed in the hope that it will be useful, but WITHOUT ANY
10 WARRANTY; without even the implied warranty of MERCHANTABILITY or
11 FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
12 for more details.
13
14 You should have received a copy of the GNU General Public License
15 along with this program; if not, write to the Free Software
16 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
17 */
18
19 #include <yaz/log.h>
20
21 #include "config.hpp"
22
23 #include <metaproxy/filter.hpp>
24 #include <metaproxy/package.hpp>
25
26 #include <boost/thread/thread.hpp>
27 #include <boost/thread/mutex.hpp>
28 #include <boost/thread/condition.hpp>
29 #include <boost/shared_ptr.hpp>
30
31 #include <metaproxy/util.hpp>
32 #include "filter_multi.hpp"
33
34 #include <yaz/zgdu.h>
35 #include <yaz/otherinfo.h>
36 #include <yaz/diagbib1.h>
37 #include <yaz/match_glob.h>
38
39 #include <vector>
40 #include <algorithm>
41 #include <map>
42 #include <iostream>
43
44 namespace mp = metaproxy_1;
45 namespace yf = mp::filter;
46
47 namespace metaproxy_1 {
48     namespace filter {
49         enum multi_merge_type {
50             round_robin,
51             serve_order
52         };
53         struct Multi::BackendSet {
54             BackendPtr m_backend;
55             int m_count;
56             bool operator < (const BackendSet &k) const;
57             bool operator == (const BackendSet &k) const;
58         };
59         struct Multi::ScanTermInfo {
60             std::string m_norm_term;
61             std::string m_display_term;
62             int m_count;
63             bool operator < (const ScanTermInfo &) const;
64             bool operator == (const ScanTermInfo &) const;
65             Z_Entry *get_entry(ODR odr);
66         };
67         struct Multi::FrontendSet {
68             class PresentJob {
69             public:
70                 BackendPtr m_backend;
71                 int m_pos; // position for backend (1=first, 2=second,..
72                 int m_start; // present request start
73                 PresentJob(BackendPtr ptr, int pos) : 
74                     m_backend(ptr), m_pos(pos), m_start(0) {};
75             };
76             FrontendSet(std::string setname);
77             FrontendSet();
78             ~FrontendSet();
79
80             void round_robin(int pos, int number, std::list<PresentJob> &job);
81             void serve_order(int pos, int number, std::list<PresentJob> &job);
82
83             std::list<BackendSet> m_backend_sets;
84             std::string m_setname;
85         };
86         struct Multi::Backend {
87             PackagePtr m_package;
88             std::string m_backend_database;
89             std::string m_vhost;
90             std::string m_route;
91             void operator() (void);  // thread operation
92         };
93         struct Multi::Frontend {
94             Frontend(Rep *rep);
95             ~Frontend();
96             bool m_is_multi;
97             bool m_in_use;
98             std::list<BackendPtr> m_backend_list;
99             std::map<std::string,Multi::FrontendSet> m_sets;
100
101             void multi_move(std::list<BackendPtr> &blist);
102             void init(Package &package, Z_GDU *gdu);
103             void close(Package &package);
104             void search(Package &package, Z_APDU *apdu);
105             void present(Package &package, Z_APDU *apdu);
106             void scan1(Package &package, Z_APDU *apdu);
107             void scan2(Package &package, Z_APDU *apdu);
108             void relay_apdu(Package &package, Z_APDU *apdu);
109             Rep *m_p;
110         };            
111         class Multi::Map {
112             std::string m_target_pattern;
113             std::string m_route;
114         public:
115             Map(std::string pattern, std::string route) : 
116                 m_target_pattern(pattern), m_route(route) {};
117             bool match(const std::string target, std::string *ret) const {
118                 if (yaz_match_glob(m_target_pattern.c_str(), target.c_str()))
119                 {
120                     *ret = m_route;
121                     return true;
122                 }
123                 return false;
124             };
125         };
126         class Multi::Rep {
127             friend class Multi;
128             friend struct Frontend;
129             
130             Rep();
131             FrontendPtr get_frontend(Package &package);
132             void release_frontend(Package &package);
133         private:
134             std::list<Multi::Map> m_route_patterns;
135             boost::mutex m_mutex;
136             boost::condition m_cond_session_ready;
137             std::map<mp::Session, FrontendPtr> m_clients;
138             bool m_hide_unavailable;
139             bool m_hide_errors;
140             multi_merge_type m_merge_type;
141         };
142     }
143 }
144
145 yf::Multi::Rep::Rep()
146 {
147     m_hide_unavailable = false;
148     m_hide_errors = false;
149     m_merge_type = round_robin;
150 }
151
152 bool yf::Multi::BackendSet::operator < (const BackendSet &k) const
153 {
154     return m_count < k.m_count;
155 }
156
157 yf::Multi::Frontend::Frontend(Rep *rep)
158 {
159     m_p = rep;
160     m_is_multi = false;
161 }
162
163 yf::Multi::Frontend::~Frontend()
164 {
165 }
166
167 yf::Multi::FrontendPtr yf::Multi::Rep::get_frontend(mp::Package &package)
168 {
169     boost::mutex::scoped_lock lock(m_mutex);
170
171     std::map<mp::Session,yf::Multi::FrontendPtr>::iterator it;
172     
173     while(true)
174     {
175         it = m_clients.find(package.session());
176         if (it == m_clients.end())
177             break;
178         
179         if (!it->second->m_in_use)
180         {
181             it->second->m_in_use = true;
182             return it->second;
183         }
184         m_cond_session_ready.wait(lock);
185     }
186     FrontendPtr f(new Frontend(this));
187     m_clients[package.session()] = f;
188     f->m_in_use = true;
189     return f;
190 }
191
192 void yf::Multi::Rep::release_frontend(mp::Package &package)
193 {
194     boost::mutex::scoped_lock lock(m_mutex);
195     std::map<mp::Session,yf::Multi::FrontendPtr>::iterator it;
196     
197     it = m_clients.find(package.session());
198     if (it != m_clients.end())
199     {
200         if (package.session().is_closed())
201         {
202             it->second->close(package);
203             m_clients.erase(it);
204         }
205         else
206         {
207             it->second->m_in_use = false;
208         }
209         m_cond_session_ready.notify_all();
210     }
211 }
212
213 yf::Multi::FrontendSet::FrontendSet(std::string setname)
214     :  m_setname(setname)
215 {
216 }
217
218
219 yf::Multi::FrontendSet::FrontendSet()
220 {
221 }
222
223
224 yf::Multi::FrontendSet::~FrontendSet()
225 {
226 }
227
228 yf::Multi::Multi() : m_p(new Multi::Rep)
229 {
230 }
231
232 yf::Multi::~Multi() {
233 }
234
235
236 void yf::Multi::Backend::operator() (void) 
237 {
238     m_package->move(m_route);
239 }
240
241
242 void yf::Multi::Frontend::close(mp::Package &package)
243 {
244     std::list<BackendPtr>::const_iterator bit;
245     for (bit = m_backend_list.begin(); bit != m_backend_list.end(); bit++)
246     {
247         BackendPtr b = *bit;
248
249         b->m_package->copy_filter(package);
250         b->m_package->request() = (Z_GDU *) 0;
251         b->m_package->session().close();
252         b->m_package->move(b->m_route);
253     }
254 }
255
256 void yf::Multi::Frontend::multi_move(std::list<BackendPtr> &blist)
257 {
258     std::list<BackendPtr>::const_iterator bit;
259     boost::thread_group g;
260     for (bit = blist.begin(); bit != blist.end(); bit++)
261     {
262         g.add_thread(new boost::thread(**bit));
263     }
264     g.join_all();
265 }
266
267 void yf::Multi::FrontendSet::serve_order(int start, int number,
268                                          std::list<PresentJob> &jobs)
269 {
270     int i;
271     for (i = 0; i < number; i++)
272     {
273         std::list<BackendSet>::const_iterator bsit;
274         int voffset = 0;
275         int offset = start + i - 1;
276         for (bsit = m_backend_sets.begin(); bsit != m_backend_sets.end(); 
277              bsit++)
278         {
279             if (offset >= voffset && offset < voffset + bsit->m_count)
280             {
281                 PresentJob job(bsit->m_backend, offset - voffset + 1);
282                 jobs.push_back(job);
283                 break;
284             }
285             voffset += bsit->m_count;
286         }
287     }
288 }
289
290 void yf::Multi::FrontendSet::round_robin(int start, int number,
291                                          std::list<PresentJob> &jobs)
292 {
293     std::list<int> pos;
294     std::list<BackendSet>::const_iterator bsit;
295     for (bsit = m_backend_sets.begin(); bsit != m_backend_sets.end(); bsit++)
296     {
297         pos.push_back(1);
298     }
299
300     int p = 1;
301 #if 1
302     // optimization step!
303     int omin = 0;
304     while(true)
305     {
306         int min = 0;
307         int no_left = 0;
308         // find min count for each set which is > omin
309         for (bsit = m_backend_sets.begin(); bsit != m_backend_sets.end(); bsit++)
310         {
311             if (bsit->m_count > omin)
312             {
313                 if (no_left == 0 || bsit->m_count < min)
314                     min = bsit->m_count;
315                 no_left++;
316             }
317         }
318         if (no_left == 0) // if nothing greater than omin, bail out.
319             break;
320         int skip = no_left * min;
321         if (p + skip > start)  // step gets us "into" present range?
322         {
323             // Yes. skip until start.. Rounding off is deliberate!
324             min = (start-p) / no_left;
325             p += no_left * min;
326             
327             // update positions in each set..
328             std::list<int>::iterator psit = pos.begin();
329             for (psit = pos.begin(); psit != pos.end(); psit++)
330                 *psit += min;
331             break;
332         }
333         // skip on each set.. before "present range"..
334         p = p + skip;
335         
336         std::list<int>::iterator psit = pos.begin();
337         for (psit = pos.begin(); psit != pos.end(); psit++)
338             *psit += min;
339         
340         omin = min; // update so we consider next class (with higher count)
341     }
342 #endif
343     int fetched = 0;
344     bool more = true;
345     while (more)
346     {
347         more = false;
348         std::list<int>::iterator psit = pos.begin();
349         bsit = m_backend_sets.begin();
350
351         for (; bsit != m_backend_sets.end(); psit++,bsit++)
352         {
353             if (fetched >= number)
354             {
355                 more = false;
356                 break;
357             }
358             if (*psit <= bsit->m_count)
359             {
360                 if (p >= start)
361                 {
362                     PresentJob job(bsit->m_backend, *psit);
363                     jobs.push_back(job);
364                     fetched++;
365                 }
366                 (*psit)++;
367                 p++;
368                 more = true;
369             }
370         }
371     }
372 }
373
374 void yf::Multi::Frontend::init(mp::Package &package, Z_GDU *gdu)
375 {
376     Z_InitRequest *req = gdu->u.z3950->u.initRequest;
377
378     std::list<std::string> targets;
379
380     mp::util::get_vhost_otherinfo(req->otherInfo, targets);
381
382     if (targets.size() < 1)
383     {
384         package.move();
385         return;
386     }
387
388     std::list<std::string>::const_iterator t_it = targets.begin();
389     for (; t_it != targets.end(); t_it++)
390     {
391         Session s;
392         Backend *b = new Backend;
393         b->m_vhost = *t_it;
394
395         std::list<Multi::Map>::const_iterator it =
396             m_p->m_route_patterns.begin();
397         while (it != m_p->m_route_patterns.end()) {
398             if (it->match(*t_it, &b->m_route))
399                 break;
400             it++;
401         }
402         // b->m_route = m_p->m_target_route[*t_it];
403         // b->m_route unset
404         b->m_package = PackagePtr(new Package(s, package.origin()));
405
406         m_backend_list.push_back(BackendPtr(b));
407     }
408     m_is_multi = true;
409
410     // create init request 
411     std::list<BackendPtr>::iterator bit;
412     for (bit = m_backend_list.begin(); bit != m_backend_list.end(); bit++)
413     {
414         mp::odr odr;
415         BackendPtr b = *bit;
416         Z_APDU *init_apdu = zget_APDU(odr, Z_APDU_initRequest);
417         
418         std::list<std::string>vhost_one;
419         vhost_one.push_back(b->m_vhost);
420         mp::util::set_vhost_otherinfo(&init_apdu->u.initRequest->otherInfo,
421                                        odr, vhost_one);
422
423
424         Z_InitRequest *breq = init_apdu->u.initRequest;
425
426         breq->idAuthentication = req->idAuthentication;
427         
428         *breq->preferredMessageSize = *req->preferredMessageSize;
429         *breq->maximumRecordSize = *req->maximumRecordSize;
430
431         ODR_MASK_SET(breq->options, Z_Options_search);
432         ODR_MASK_SET(breq->options, Z_Options_present);
433         ODR_MASK_SET(breq->options, Z_Options_namedResultSets);
434         ODR_MASK_SET(breq->options, Z_Options_scan);
435         
436         ODR_MASK_SET(breq->protocolVersion, Z_ProtocolVersion_1);
437         ODR_MASK_SET(breq->protocolVersion, Z_ProtocolVersion_2);
438         ODR_MASK_SET(breq->protocolVersion, Z_ProtocolVersion_3);
439         
440         b->m_package->request() = init_apdu;
441
442         b->m_package->copy_filter(package);
443     }
444     multi_move(m_backend_list);
445
446     // create the frontend init response based on each backend init response
447     mp::odr odr;
448
449     Z_APDU *f_apdu = odr.create_initResponse(gdu->u.z3950, 0, 0);
450     Z_InitResponse *f_resp = f_apdu->u.initResponse;
451
452     ODR_MASK_SET(f_resp->options, Z_Options_search);
453     ODR_MASK_SET(f_resp->options, Z_Options_present);
454     ODR_MASK_SET(f_resp->options, Z_Options_namedResultSets);
455     ODR_MASK_SET(f_resp->options, Z_Options_scan);
456     
457     ODR_MASK_SET(f_resp->protocolVersion, Z_ProtocolVersion_1);
458     ODR_MASK_SET(f_resp->protocolVersion, Z_ProtocolVersion_2);
459     ODR_MASK_SET(f_resp->protocolVersion, Z_ProtocolVersion_3);
460
461     int no_failed = 0;
462     int no_succeeded = 0;
463
464     Odr_int preferredMessageSize = *req->preferredMessageSize;
465     Odr_int maximumRecordSize = *req->maximumRecordSize;
466     for (bit = m_backend_list.begin(); bit != m_backend_list.end(); )
467     {
468         PackagePtr p = (*bit)->m_package;
469         
470         if (p->session().is_closed())
471         {
472             // failed. Remove from list and increment number of failed
473             no_failed++;
474             bit = m_backend_list.erase(bit);
475             continue;
476         }
477         Z_GDU *gdu = p->response().get();
478         if (gdu && gdu->which == Z_GDU_Z3950 && gdu->u.z3950->which ==
479             Z_APDU_initResponse)
480         {
481             int i;
482             Z_APDU *b_apdu = gdu->u.z3950;
483             Z_InitResponse *b_resp = b_apdu->u.initResponse;
484
485             // common options for all backends
486             for (i = 0; i <= Z_Options_stringSchema; i++)
487             {
488                 if (!ODR_MASK_GET(b_resp->options, i))
489                     ODR_MASK_CLEAR(f_resp->options, i);
490             }
491             // common protocol version
492             for (i = 0; i <= Z_ProtocolVersion_3; i++)
493                 if (!ODR_MASK_GET(b_resp->protocolVersion, i))
494                     ODR_MASK_CLEAR(f_resp->protocolVersion, i);
495             if (*b_resp->result)
496             {
497                 no_succeeded++;
498                 if (preferredMessageSize > *b_resp->preferredMessageSize)
499                     preferredMessageSize = *b_resp->preferredMessageSize;
500                 if (maximumRecordSize > *b_resp->maximumRecordSize)
501                     maximumRecordSize = *b_resp->maximumRecordSize;
502             }
503             else
504                 no_failed++;
505         }
506         else
507             no_failed++;
508         bit++;
509     }
510     *f_resp->preferredMessageSize = preferredMessageSize;
511     *f_resp->maximumRecordSize = maximumRecordSize;
512
513     if (m_p->m_hide_unavailable)
514     {
515         if (no_succeeded == 0)
516         {
517             *f_resp->result = 0;
518             package.session().close();
519         }
520     }
521     else
522     {
523         if (no_failed)
524         {
525             *f_resp->result = 0;
526             package.session().close();
527         }
528     }
529     package.response() = f_apdu;
530 }
531
532 void yf::Multi::Frontend::search(mp::Package &package, Z_APDU *apdu_req)
533 {
534     // create search request 
535     Z_SearchRequest *req = apdu_req->u.searchRequest;
536
537     // save these for later
538     Odr_int smallSetUpperBound = *req->smallSetUpperBound;
539     Odr_int largeSetLowerBound = *req->largeSetLowerBound;
540     Odr_int mediumSetPresentNumber = *req->mediumSetPresentNumber;
541     
542     // they are altered now - to disable piggyback
543     *req->smallSetUpperBound = 0;
544     *req->largeSetLowerBound = 1;
545     *req->mediumSetPresentNumber = 1;
546
547     int default_num_db = req->num_databaseNames;
548     char **default_db = req->databaseNames;
549
550     std::list<BackendPtr>::const_iterator bit;
551     for (bit = m_backend_list.begin(); bit != m_backend_list.end(); bit++)
552     {
553         PackagePtr p = (*bit)->m_package;
554         mp::odr odr;
555     
556         if (!mp::util::set_databases_from_zurl(odr, (*bit)->m_vhost,
557                                                 &req->num_databaseNames,
558                                                 &req->databaseNames))
559         {
560             req->num_databaseNames = default_num_db;
561             req->databaseNames = default_db;
562         }
563         p->request() = apdu_req;
564         p->copy_filter(package);
565     }
566     multi_move(m_backend_list);
567
568     // look at each response
569     FrontendSet resultSet(std::string(req->resultSetName));
570
571     mp::odr odr;
572     Odr_int result_set_size = 0;
573     Z_DiagRecs *z_diag = 0;
574     for (bit = m_backend_list.begin(); bit != m_backend_list.end(); bit++)
575     {
576         PackagePtr p = (*bit)->m_package;
577         
578         if (p->session().is_closed()) // if any backend closes, close frontend
579             package.session().close();
580         
581         Z_GDU *gdu = p->response().get();
582         if (gdu && gdu->which == Z_GDU_Z3950 && gdu->u.z3950->which ==
583             Z_APDU_searchResponse)
584         {
585             Z_APDU *b_apdu = gdu->u.z3950;
586             Z_SearchResponse *b_resp = b_apdu->u.searchResponse;
587          
588             // see we get any errors (AKA diagnstics)
589             if (b_resp->records)
590             {
591                 if (b_resp->records->which == Z_Records_NSD)
592                 {
593                     if (!z_diag)
594                     {
595                         z_diag = (Z_DiagRecs *)
596                             odr_malloc(odr, sizeof(*z_diag));
597                         z_diag->num_diagRecs = 0;
598                         z_diag->diagRecs = (Z_DiagRec**)
599                             odr_malloc(odr, sizeof(*z_diag->diagRecs));
600                     }
601                     else
602                     {
603                         Z_DiagRec **n = (Z_DiagRec **)
604                             odr_malloc(odr,
605                                        (1+z_diag->num_diagRecs) * sizeof(*n));
606                         memcpy(n, z_diag->diagRecs, z_diag->num_diagRecs
607                                * sizeof(*n));
608                         z_diag->diagRecs = n;
609                     }
610                     Z_DiagRec *nr = (Z_DiagRec *) odr_malloc(odr, sizeof(*nr));
611                     nr->which = Z_DiagRec_defaultFormat;
612                     nr->u.defaultFormat =
613                         b_resp->records->u.nonSurrogateDiagnostic;
614                     z_diag->diagRecs[z_diag->num_diagRecs++] = nr;
615                 }
616                 if (b_resp->records->which == Z_Records_multipleNSD)
617                 {
618                     // we may set this multiple times (TOO BAD!)
619                     z_diag = b_resp->records->u.multipleNonSurDiagnostics;
620                 }
621             }
622             BackendSet backendSet;
623             backendSet.m_backend = *bit;
624             backendSet.m_count = *b_resp->resultCount;
625             result_set_size += *b_resp->resultCount;
626             resultSet.m_backend_sets.push_back(backendSet);
627         }
628         else
629         {
630             // if any target does not return search response - return that 
631             package.response() = p->response();
632             return;
633         }
634     }
635
636     Z_APDU *f_apdu = odr.create_searchResponse(apdu_req, 0, 0);
637     Z_SearchResponse *f_resp = f_apdu->u.searchResponse;
638
639     *f_resp->resultCount = result_set_size;
640     if (z_diag)
641     {
642         f_resp->records = (Z_Records *)
643             odr_malloc(odr, sizeof(*f_resp->records));
644         if (z_diag->num_diagRecs > 1)
645         {
646             f_resp->records->which = Z_Records_multipleNSD;
647             f_resp->records->u.multipleNonSurDiagnostics = z_diag;
648         }
649         else
650         {
651             f_resp->records->which = Z_Records_NSD;
652             f_resp->records->u.nonSurrogateDiagnostic =
653                 z_diag->diagRecs[0]->u.defaultFormat;
654         }
655     }
656     // assume OK
657     m_sets[resultSet.m_setname] = resultSet;
658
659     Odr_int number;
660     mp::util::piggyback(smallSetUpperBound,
661                         largeSetLowerBound,
662                         mediumSetPresentNumber,
663                         0, 0,
664                         result_set_size,
665                         number, 0);
666     Package pp(package.session(), package.origin());
667     if (z_diag == 0 && number > 0)
668     {
669         pp.copy_filter(package);
670         Z_APDU *p_apdu = zget_APDU(odr, Z_APDU_presentRequest);
671         Z_PresentRequest *p_req = p_apdu->u.presentRequest;
672         p_req->preferredRecordSyntax = req->preferredRecordSyntax;
673         p_req->resultSetId = req->resultSetName;
674         *p_req->resultSetStartPoint = 1;
675         *p_req->numberOfRecordsRequested = number;
676         pp.request() = p_apdu;
677         present(pp, p_apdu);
678         
679         if (pp.session().is_closed())
680             package.session().close();
681         
682         Z_GDU *gdu = pp.response().get();
683         if (gdu && gdu->which == Z_GDU_Z3950 && gdu->u.z3950->which ==
684             Z_APDU_presentResponse)
685         {
686             Z_PresentResponse *p_res = gdu->u.z3950->u.presentResponse;
687             f_resp->records = p_res->records;
688             *f_resp->numberOfRecordsReturned = 
689                 *p_res->numberOfRecordsReturned;
690             *f_resp->nextResultSetPosition = 
691                 *p_res->nextResultSetPosition;
692         }
693         else 
694         {
695             package.response() = pp.response(); 
696             return;
697         }
698     }
699     package.response() = f_apdu; // in this scope because of p
700 }
701
702 void yf::Multi::Frontend::present(mp::Package &package, Z_APDU *apdu_req)
703 {
704     // create present request 
705     Z_PresentRequest *req = apdu_req->u.presentRequest;
706
707     Sets_it it;
708     it = m_sets.find(std::string(req->resultSetId));
709     if (it == m_sets.end())
710     {
711         mp::odr odr;
712         Z_APDU *apdu = 
713             odr.create_presentResponse(
714                 apdu_req,
715                 YAZ_BIB1_SPECIFIED_RESULT_SET_DOES_NOT_EXIST,
716                 req->resultSetId);
717         package.response() = apdu;
718         return;
719     }
720     std::list<Multi::FrontendSet::PresentJob> jobs;
721     int start = *req->resultSetStartPoint;
722     int number = *req->numberOfRecordsRequested;
723
724     if (m_p->m_merge_type == round_robin)
725         it->second.round_robin(start, number, jobs);
726     else if (m_p->m_merge_type == serve_order)
727         it->second.serve_order(start, number, jobs);
728
729     if (0)
730     {
731         std::list<Multi::FrontendSet::PresentJob>::const_iterator jit;
732         for (jit = jobs.begin(); jit != jobs.end(); jit++)
733         {
734             yaz_log(YLOG_LOG, "job pos=%d", jit->m_pos);
735         }
736     }
737
738     std::list<BackendPtr> present_backend_list;
739
740     std::list<BackendSet>::const_iterator bsit;
741     bsit = it->second.m_backend_sets.begin();
742     for (; bsit != it->second.m_backend_sets.end(); bsit++)
743     {
744         int start = -1;
745         int end = -1;
746         {
747             std::list<Multi::FrontendSet::PresentJob>::const_iterator jit;
748             for (jit = jobs.begin(); jit != jobs.end(); jit++)
749             {
750                 if (jit->m_backend == bsit->m_backend)
751                 {
752                     if (start == -1 || jit->m_pos < start)
753                         start = jit->m_pos;
754                     if (end == -1 || jit->m_pos > end)
755                         end = jit->m_pos;
756                 }
757             }
758         }
759         if (start != -1)
760         {
761             std::list<Multi::FrontendSet::PresentJob>::iterator jit;
762             for (jit = jobs.begin(); jit != jobs.end(); jit++)
763             {
764                 if (jit->m_backend == bsit->m_backend)
765                 {
766                     if (jit->m_pos >= start && jit->m_pos <= end)
767                         jit->m_start = start;
768                 }
769             }
770
771             PackagePtr p = bsit->m_backend->m_package;
772
773             *req->resultSetStartPoint = start;
774             *req->numberOfRecordsRequested = end - start + 1;
775             
776             p->request() = apdu_req;
777             p->copy_filter(package);
778
779             present_backend_list.push_back(bsit->m_backend);
780         }
781     }
782     multi_move(present_backend_list);
783
784     // look at each response
785     Z_Records *z_records_diag = 0;
786
787     std::list<BackendPtr>::const_iterator pbit = present_backend_list.begin();
788     for (; pbit != present_backend_list.end(); pbit++)
789     {
790         PackagePtr p = (*pbit)->m_package;
791         
792         if (p->session().is_closed()) // if any backend closes, close frontend
793             package.session().close();
794         
795         Z_GDU *gdu = p->response().get();
796         if (gdu && gdu->which == Z_GDU_Z3950 && gdu->u.z3950->which ==
797             Z_APDU_presentResponse)
798         {
799             Z_APDU *b_apdu = gdu->u.z3950;
800             Z_PresentResponse *b_resp = b_apdu->u.presentResponse;
801          
802             // see we get any errors (AKA diagnstics)
803             if (b_resp->records)
804             {
805                 if (b_resp->records->which != Z_Records_DBOSD)
806                     z_records_diag = b_resp->records;
807                 // we may set this multiple times (TOO BAD!)
808             }
809         }
810         else
811         {
812             // if any target does not return present response - return that 
813             package.response() = p->response();
814             return;
815         }
816     }
817
818     mp::odr odr;
819     Z_APDU *f_apdu = odr.create_presentResponse(apdu_req, 0, 0);
820     Z_PresentResponse *f_resp = f_apdu->u.presentResponse;
821
822     if (z_records_diag)
823     {
824         f_resp->records = z_records_diag;
825         *f_resp->presentStatus = Z_PresentStatus_failure;
826     }
827     else if (number < 0 || (size_t) number > jobs.size())
828     {
829         f_apdu = 
830             odr.create_presentResponse(
831                 apdu_req,
832                 YAZ_BIB1_PRESENT_REQUEST_OUT_OF_RANGE,
833                 0);
834     }
835     else
836     {
837         f_resp->records = (Z_Records *) odr_malloc(odr, sizeof(Z_Records));
838         Z_Records * records = f_resp->records;
839         records->which = Z_Records_DBOSD;
840         records->u.databaseOrSurDiagnostics =
841             (Z_NamePlusRecordList *)
842             odr_malloc(odr, sizeof(Z_NamePlusRecordList));
843         Z_NamePlusRecordList *nprl = records->u.databaseOrSurDiagnostics;
844         nprl->num_records = jobs.size();
845         nprl->records = (Z_NamePlusRecord**)
846             odr_malloc(odr, sizeof(Z_NamePlusRecord *) * nprl->num_records);
847         int i = 0;
848         std::list<Multi::FrontendSet::PresentJob>::const_iterator jit;
849         for (jit = jobs.begin(); jit != jobs.end(); jit++, i++)
850         {
851             PackagePtr p = jit->m_backend->m_package;
852             
853             Z_GDU *gdu = p->response().get();
854             Z_APDU *b_apdu = gdu->u.z3950;
855             Z_PresentResponse *b_resp = b_apdu->u.presentResponse;
856
857             nprl->records[i] = (Z_NamePlusRecord*)
858                 odr_malloc(odr, sizeof(Z_NamePlusRecord));
859             int inside_pos = jit->m_pos - jit->m_start;
860             if (inside_pos >= b_resp->records->
861                 u.databaseOrSurDiagnostics->num_records)
862                 break;
863             *nprl->records[i] = *b_resp->records->
864                 u.databaseOrSurDiagnostics->records[inside_pos];
865             nprl->records[i]->databaseName =
866                     odr_strdup(odr, jit->m_backend->m_vhost.c_str());
867         }
868         nprl->num_records = i; // usually same as jobs.size();
869         *f_resp->nextResultSetPosition = start + i;
870         *f_resp->numberOfRecordsReturned = i;
871     }
872     package.response() = f_apdu;
873 }
874
875 void yf::Multi::Frontend::scan1(mp::Package &package, Z_APDU *apdu_req)
876 {
877     if (m_backend_list.size() > 1)
878     {
879         mp::odr odr;
880         Z_APDU *f_apdu = 
881             odr.create_scanResponse(
882                 apdu_req, YAZ_BIB1_COMBI_OF_SPECIFIED_DATABASES_UNSUPP, 0);
883         package.response() = f_apdu;
884         return;
885     }
886     Z_ScanRequest *req = apdu_req->u.scanRequest;
887
888     int default_num_db = req->num_databaseNames;
889     char **default_db = req->databaseNames;
890
891     std::list<BackendPtr>::const_iterator bit;
892     for (bit = m_backend_list.begin(); bit != m_backend_list.end(); bit++)
893     {
894         PackagePtr p = (*bit)->m_package;
895         mp::odr odr;
896     
897         if (!mp::util::set_databases_from_zurl(odr, (*bit)->m_vhost,
898                                                 &req->num_databaseNames,
899                                                 &req->databaseNames))
900         {
901             req->num_databaseNames = default_num_db;
902             req->databaseNames = default_db;
903         }
904         p->request() = apdu_req;
905         p->copy_filter(package);
906     }
907     multi_move(m_backend_list);
908
909     for (bit = m_backend_list.begin(); bit != m_backend_list.end(); bit++)
910     {
911         PackagePtr p = (*bit)->m_package;
912         
913         if (p->session().is_closed()) // if any backend closes, close frontend
914             package.session().close();
915         
916         Z_GDU *gdu = p->response().get();
917         if (gdu && gdu->which == Z_GDU_Z3950 && gdu->u.z3950->which ==
918             Z_APDU_scanResponse)
919         {
920             package.response() = p->response();
921             break;
922         }
923         else
924         {
925             // if any target does not return scan response - return that 
926             package.response() = p->response();
927             return;
928         }
929     }
930 }
931
932 bool yf::Multi::ScanTermInfo::operator < (const ScanTermInfo &k) const
933 {
934     return m_norm_term < k.m_norm_term;
935 }
936
937 bool yf::Multi::ScanTermInfo::operator == (const ScanTermInfo &k) const
938 {
939     return m_norm_term == k.m_norm_term;
940 }
941
942 Z_Entry *yf::Multi::ScanTermInfo::get_entry(ODR odr)
943 {
944     Z_Entry *e = (Z_Entry *)odr_malloc(odr, sizeof(*e));
945     e->which = Z_Entry_termInfo;
946     Z_TermInfo *t;
947     t = e->u.termInfo = (Z_TermInfo *) odr_malloc(odr, sizeof(*t));
948     t->suggestedAttributes = 0;
949     t->displayTerm = 0;
950     t->alternativeTerm = 0;
951     t->byAttributes = 0;
952     t->otherTermInfo = 0;
953     t->globalOccurrences = odr_intdup(odr, m_count);
954     t->term = (Z_Term *)
955         odr_malloc(odr, sizeof(*t->term));
956     t->term->which = Z_Term_general;
957     Odr_oct *o;
958     t->term->u.general = o = (Odr_oct *)odr_malloc(odr, sizeof(Odr_oct));
959
960     o->len = o->size = m_norm_term.size();
961     o->buf = (unsigned char *) odr_malloc(odr, o->len);
962     memcpy(o->buf, m_norm_term.c_str(), o->len);
963     return e;
964 }
965
966 void yf::Multi::Frontend::relay_apdu(mp::Package &package, Z_APDU *apdu_req)
967 {
968     std::list<BackendPtr>::const_iterator bit;
969     for (bit = m_backend_list.begin(); bit != m_backend_list.end(); bit++)
970     {
971         PackagePtr p = (*bit)->m_package;
972         mp::odr odr;
973     
974         p->request() = apdu_req;
975         p->copy_filter(package);
976     }
977     multi_move(m_backend_list);
978     for (bit = m_backend_list.begin(); bit != m_backend_list.end(); bit++)
979     {
980         PackagePtr p = (*bit)->m_package;
981         
982         if (p->session().is_closed()) // if any backend closes, close frontend
983             package.session().close();
984         
985         package.response() = p->response();
986     }
987 }
988
989
990 void yf::Multi::Frontend::scan2(mp::Package &package, Z_APDU *apdu_req)
991 {
992     Z_ScanRequest *req = apdu_req->u.scanRequest;
993
994     int default_num_db = req->num_databaseNames;
995     char **default_db = req->databaseNames;
996
997     std::list<BackendPtr>::const_iterator bit;
998     for (bit = m_backend_list.begin(); bit != m_backend_list.end(); bit++)
999     {
1000         PackagePtr p = (*bit)->m_package;
1001         mp::odr odr;
1002     
1003         if (!mp::util::set_databases_from_zurl(odr, (*bit)->m_vhost,
1004                                                 &req->num_databaseNames,
1005                                                 &req->databaseNames))
1006         {
1007             req->num_databaseNames = default_num_db;
1008             req->databaseNames = default_db;
1009         }
1010         p->request() = apdu_req;
1011         p->copy_filter(package);
1012     }
1013     multi_move(m_backend_list);
1014
1015     ScanTermInfoList entries_before;
1016     ScanTermInfoList entries_after;
1017     int no_before = 0;
1018     int no_after = 0;
1019
1020     for (bit = m_backend_list.begin(); bit != m_backend_list.end(); bit++)
1021     {
1022         PackagePtr p = (*bit)->m_package;
1023         
1024         if (p->session().is_closed()) // if any backend closes, close frontend
1025             package.session().close();
1026         
1027         Z_GDU *gdu = p->response().get();
1028         if (gdu && gdu->which == Z_GDU_Z3950 && gdu->u.z3950->which ==
1029             Z_APDU_scanResponse)
1030         {
1031             Z_ScanResponse *res = gdu->u.z3950->u.scanResponse;
1032
1033             if (res->entries && res->entries->nonsurrogateDiagnostics)
1034             {
1035                 // failure
1036                 mp::odr odr;
1037                 Z_APDU *f_apdu = odr.create_scanResponse(apdu_req, 1, 0);
1038                 Z_ScanResponse *f_res = f_apdu->u.scanResponse;
1039
1040                 f_res->entries->nonsurrogateDiagnostics = 
1041                     res->entries->nonsurrogateDiagnostics;
1042                 f_res->entries->num_nonsurrogateDiagnostics = 
1043                     res->entries->num_nonsurrogateDiagnostics;
1044
1045                 package.response() = f_apdu;
1046                 return;
1047             }
1048
1049             if (res->entries && res->entries->entries)
1050             {
1051                 Z_Entry **entries = res->entries->entries;
1052                 int num_entries = res->entries->num_entries;
1053                 int position = 1;
1054                 if (req->preferredPositionInResponse)
1055                     position = *req->preferredPositionInResponse;
1056                 if (res->positionOfTerm)
1057                     position = *res->positionOfTerm;
1058
1059                 // before
1060                 int i;
1061                 for (i = 0; i<position-1 && i<num_entries; i++)
1062                 {
1063                     Z_Entry *ent = entries[i];
1064
1065                     if (ent->which == Z_Entry_termInfo)
1066                     {
1067                         ScanTermInfo my;
1068
1069                         Odr_int *occur = ent->u.termInfo->globalOccurrences;
1070                         my.m_count = occur ? *occur : 0;
1071
1072                         if (ent->u.termInfo->term->which == Z_Term_general)
1073                         {
1074                             my.m_norm_term = std::string(
1075                                 (const char *)
1076                                 ent->u.termInfo->term->u.general->buf,
1077                                 ent->u.termInfo->term->u.general->len);
1078                         }
1079                         if (my.m_norm_term.length())
1080                         {
1081                             ScanTermInfoList::iterator it = 
1082                                 entries_before.begin();
1083                             while (it != entries_before.end() && my <*it)
1084                                 it++;
1085                             if (my == *it)
1086                             {
1087                                 it->m_count += my.m_count;
1088                             }
1089                             else
1090                             {
1091                                 entries_before.insert(it, my);
1092                                 no_before++;
1093                             }
1094                         }
1095                     }
1096                 }
1097                 // after
1098                 if (position <= 0)
1099                     i = 0;
1100                 else
1101                     i = position-1;
1102                 for ( ; i<num_entries; i++)
1103                 {
1104                     Z_Entry *ent = entries[i];
1105
1106                     if (ent->which == Z_Entry_termInfo)
1107                     {
1108                         ScanTermInfo my;
1109
1110                         Odr_int *occur = ent->u.termInfo->globalOccurrences;
1111                         my.m_count = occur ? *occur : 0;
1112
1113                         if (ent->u.termInfo->term->which == Z_Term_general)
1114                         {
1115                             my.m_norm_term = std::string(
1116                                 (const char *)
1117                                 ent->u.termInfo->term->u.general->buf,
1118                                 ent->u.termInfo->term->u.general->len);
1119                         }
1120                         if (my.m_norm_term.length())
1121                         {
1122                             ScanTermInfoList::iterator it = 
1123                                 entries_after.begin();
1124                             while (it != entries_after.end() && *it < my)
1125                                 it++;
1126                             if (my == *it)
1127                             {
1128                                 it->m_count += my.m_count;
1129                             }
1130                             else
1131                             {
1132                                 entries_after.insert(it, my);
1133                                 no_after++;
1134                             }
1135                         }
1136                     }
1137                 }
1138
1139             }                
1140         }
1141         else
1142         {
1143             // if any target does not return scan response - return that 
1144             package.response() = p->response();
1145             return;
1146         }
1147     }
1148
1149     if (false)
1150     {
1151         std::cout << "BEFORE\n";
1152         ScanTermInfoList::iterator it = entries_before.begin();
1153         for(; it != entries_before.end(); it++)
1154         {
1155             std::cout << " " << it->m_norm_term << " " << it->m_count << "\n";
1156         }
1157         
1158         std::cout << "AFTER\n";
1159         it = entries_after.begin();
1160         for(; it != entries_after.end(); it++)
1161         {
1162             std::cout << " " << it->m_norm_term << " " << it->m_count << "\n";
1163         }
1164     }
1165
1166     if (false)
1167     {
1168         mp::odr odr;
1169         Z_APDU *f_apdu = odr.create_scanResponse(apdu_req, 1, "not implemented");
1170         package.response() = f_apdu;
1171     }
1172     else
1173     {
1174         mp::odr odr;
1175         Z_APDU *f_apdu = odr.create_scanResponse(apdu_req, 0, 0);
1176         Z_ScanResponse *resp = f_apdu->u.scanResponse;
1177         
1178         int number_returned = *req->numberOfTermsRequested;
1179         int position_returned = *req->preferredPositionInResponse;
1180         
1181         resp->entries->num_entries = number_returned;
1182         resp->entries->entries = (Z_Entry**)
1183             odr_malloc(odr, sizeof(Z_Entry*) * number_returned);
1184         int i;
1185
1186         int lbefore = entries_before.size();
1187         if (lbefore < position_returned-1)
1188             position_returned = lbefore+1;
1189
1190         ScanTermInfoList::iterator it = entries_before.begin();
1191         for (i = 0; i<position_returned-1 && it != entries_before.end(); i++, it++)
1192         {
1193             resp->entries->entries[position_returned-2-i] = it->get_entry(odr);
1194         }
1195
1196         it = entries_after.begin();
1197
1198         if (position_returned <= 0)
1199             i = 0;
1200         else
1201             i = position_returned-1;
1202         for (; i<number_returned && it != entries_after.end(); i++, it++)
1203         {
1204             resp->entries->entries[i] = it->get_entry(odr);
1205         }
1206
1207         number_returned = i;
1208
1209         resp->positionOfTerm = odr_intdup(odr, position_returned);
1210         resp->numberOfEntriesReturned = odr_intdup(odr, number_returned);
1211         resp->entries->num_entries = number_returned;
1212
1213         package.response() = f_apdu;
1214     }
1215 }
1216
1217
1218 void yf::Multi::process(mp::Package &package) const
1219 {
1220     FrontendPtr f = m_p->get_frontend(package);
1221
1222     Z_GDU *gdu = package.request().get();
1223     
1224     if (gdu && gdu->which == Z_GDU_Z3950 && gdu->u.z3950->which ==
1225         Z_APDU_initRequest && !f->m_is_multi)
1226     {
1227         f->init(package, gdu);
1228     }
1229     else if (!f->m_is_multi)
1230         package.move();
1231     else if (gdu && gdu->which == Z_GDU_Z3950)
1232     {
1233         Z_APDU *apdu = gdu->u.z3950;
1234         if (apdu->which == Z_APDU_initRequest)
1235         {
1236             mp::odr odr;
1237             
1238             package.response() = odr.create_close(
1239                 apdu,
1240                 Z_Close_protocolError,
1241                 "double init");
1242             
1243             package.session().close();
1244         }
1245         else if (apdu->which == Z_APDU_searchRequest)
1246         {
1247             f->search(package, apdu);
1248         }
1249         else if (apdu->which == Z_APDU_presentRequest)
1250         {
1251             f->present(package, apdu);
1252         }
1253         else if (apdu->which == Z_APDU_scanRequest)
1254         {
1255             f->scan2(package, apdu);
1256         }
1257         else if (apdu->which == Z_APDU_close)
1258         {
1259             f->relay_apdu(package, apdu);
1260         }
1261         else
1262         {
1263             mp::odr odr;
1264             
1265             package.response() = odr.create_close(
1266                 apdu, Z_Close_protocolError,
1267                 "unsupported APDU in filter multi");
1268             
1269             package.session().close();
1270         }
1271     }
1272     m_p->release_frontend(package);
1273 }
1274
1275 void mp::filter::Multi::configure(const xmlNode * ptr, bool test_only,
1276                                   const char *path)
1277 {
1278     for (ptr = ptr->children; ptr; ptr = ptr->next)
1279     {
1280         if (ptr->type != XML_ELEMENT_NODE)
1281             continue;
1282         if (!strcmp((const char *) ptr->name, "target"))
1283         {
1284             std::string route = mp::xml::get_route(ptr);
1285             std::string target = mp::xml::get_text(ptr);
1286             m_p->m_route_patterns.push_back(Multi::Map(target, route));
1287         }
1288         else if (!strcmp((const char *) ptr->name, "hideunavailable"))
1289         {
1290             m_p->m_hide_unavailable = true;
1291         }
1292         else if (!strcmp((const char *) ptr->name, "hideerrors"))
1293         {
1294             m_p->m_hide_errors = true;
1295         }
1296         else if (!strcmp((const char *) ptr->name, "mergetype"))
1297         {
1298             std::string mergetype = mp::xml::get_text(ptr);
1299             if (mergetype == "roundrobin")
1300                 m_p->m_merge_type = round_robin;
1301             else if (mergetype == "serveorder")
1302                 m_p->m_merge_type = serve_order;
1303             else
1304                 throw mp::filter::FilterException
1305                     ("Bad mergetype "  + mergetype + " in multi filter");
1306
1307         }
1308         else
1309         {
1310             throw mp::filter::FilterException
1311                 ("Bad element " 
1312                  + std::string((const char *) ptr->name)
1313                  + " in multi filter");
1314         }
1315     }
1316 }
1317
1318 static mp::filter::Base* filter_creator()
1319 {
1320     return new mp::filter::Multi;
1321 }
1322
// Registration record for this filter, exported with C linkage.
extern "C" {
    struct metaproxy_1_filter_struct metaproxy_1_filter_multi = {
        0,              // NOTE(review): presumably a struct/ABI version — confirm in filter.hpp
        "multi",        // filter type name (presumably what configuration refers to)
        filter_creator  // factory returning a new mp::filter::Multi
    };
}
1330
1331
1332 /*
1333  * Local variables:
1334  * c-basic-offset: 4
1335  * c-file-style: "Stroustrup"
1336  * indent-tabs-mode: nil
1337  * End:
1338  * vim: shiftwidth=4 tabstop=8 expandtab
1339  */
1340