multi: hideerrors also in use for present response
[metaproxy-moved-to-github.git] / src / filter_multi.cpp
1 /* This file is part of Metaproxy.
2    Copyright (C) 2005-2012 Index Data
3
4 Metaproxy is free software; you can redistribute it and/or modify it under
5 the terms of the GNU General Public License as published by the Free
6 Software Foundation; either version 2, or (at your option) any later
7 version.
8
9 Metaproxy is distributed in the hope that it will be useful, but WITHOUT ANY
10 WARRANTY; without even the implied warranty of MERCHANTABILITY or
11 FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
12 for more details.
13
14 You should have received a copy of the GNU General Public License
15 along with this program; if not, write to the Free Software
16 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
17 */
18
19 #include <yaz/log.h>
20
21 #include "config.hpp"
22
23 #include <metaproxy/filter.hpp>
24 #include <metaproxy/package.hpp>
25
26 #include <boost/thread/thread.hpp>
27 #include <boost/thread/mutex.hpp>
28 #include <boost/thread/condition.hpp>
29 #include <boost/shared_ptr.hpp>
30
31 #include <metaproxy/util.hpp>
32 #include "filter_multi.hpp"
33
34 #include <yaz/zgdu.h>
35 #include <yaz/otherinfo.h>
36 #include <yaz/diagbib1.h>
37 #include <yaz/match_glob.h>
38
39 #include <vector>
40 #include <algorithm>
41 #include <map>
42 #include <iostream>
43
44 namespace mp = metaproxy_1;
45 namespace yf = mp::filter;
46
47 namespace metaproxy_1 {
48     namespace filter {
49         enum multi_merge_type {
50             round_robin,
51             serve_order
52         };
53         struct Multi::BackendSet {
54             BackendPtr m_backend;
55             int m_count;
56             bool operator < (const BackendSet &k) const;
57             bool operator == (const BackendSet &k) const;
58         };
59         struct Multi::ScanTermInfo {
60             std::string m_norm_term;
61             std::string m_display_term;
62             int m_count;
63             bool operator < (const ScanTermInfo &) const;
64             bool operator == (const ScanTermInfo &) const;
65             Z_Entry *get_entry(ODR odr);
66         };
67         struct Multi::FrontendSet {
68             class PresentJob {
69             public:
70                 BackendPtr m_backend;
71                 int m_pos; // position for backend (1=first, 2=second,..
72                 int m_start; // present request start
73                 PresentJob(BackendPtr ptr, int pos) :
74                     m_backend(ptr), m_pos(pos), m_start(0) {};
75             };
76             FrontendSet(std::string setname);
77             FrontendSet();
78             ~FrontendSet();
79
80             void round_robin(int pos, int number, std::list<PresentJob> &job);
81             void serve_order(int pos, int number, std::list<PresentJob> &job);
82
83             std::list<BackendSet> m_backend_sets;
84             std::string m_setname;
85         };
86         struct Multi::Backend {
87             PackagePtr m_package;
88             std::string m_backend_database;
89             std::string m_vhost;
90             std::string m_route;
91             std::string m_auth;
92             void operator() (void);  // thread operation
93         };
94         struct Multi::Frontend {
95             Frontend(Rep *rep);
96             ~Frontend();
97             bool m_is_multi;
98             bool m_in_use;
99             std::list<BackendPtr> m_backend_list;
100             std::map<std::string,Multi::FrontendSet> m_sets;
101
102             void multi_move(std::list<BackendPtr> &blist);
103             void init(Package &package, Z_GDU *gdu);
104             void close(Package &package);
105             void search(Package &package, Z_APDU *apdu);
106             void present(Package &package, Z_APDU *apdu);
107             void scan1(Package &package, Z_APDU *apdu);
108             void scan2(Package &package, Z_APDU *apdu);
109             void relay_apdu(Package &package, Z_APDU *apdu);
110             void record_diagnostics(Z_Records *records, 
111                                     Z_DiagRecs * &z_diag,
112                                     ODR odr,
113                                     int &no_successful);
114             Rep *m_p;
115         };
116         class Multi::Map {
117             std::string m_target_pattern;
118             std::string m_route;
119             std::string m_auth;
120         public:
121             Map(std::string pattern, std::string route, std::string auth) :
122                 m_target_pattern(pattern), m_route(route), m_auth(auth) {};
123             bool match(const std::string target, std::string *ret,
124                        std::string *auth) const {
125                 if (yaz_match_glob(m_target_pattern.c_str(), target.c_str()))
126                 {
127                     *ret = m_route;
128                     *auth = m_auth;
129                     return true;
130                 }
131                 return false;
132             };
133         };
134         class Multi::Rep {
135             friend class Multi;
136             friend struct Frontend;
137
138             Rep();
139             FrontendPtr get_frontend(Package &package);
140             void release_frontend(Package &package);
141         private:
142             std::list<Multi::Map> m_route_patterns;
143             boost::mutex m_mutex;
144             boost::condition m_cond_session_ready;
145             std::map<mp::Session, FrontendPtr> m_clients;
146             bool m_hide_unavailable;
147             bool m_hide_errors;
148             multi_merge_type m_merge_type;
149         };
150     }
151 }
152
153 yf::Multi::Rep::Rep()
154 {
155     m_hide_unavailable = false;
156     m_hide_errors = false;
157     m_merge_type = round_robin;
158 }
159
160 bool yf::Multi::BackendSet::operator < (const BackendSet &k) const
161 {
162     return m_count < k.m_count;
163 }
164
165 yf::Multi::Frontend::Frontend(Rep *rep)
166 {
167     m_p = rep;
168     m_is_multi = false;
169 }
170
171 yf::Multi::Frontend::~Frontend()
172 {
173 }
174
175 yf::Multi::FrontendPtr yf::Multi::Rep::get_frontend(mp::Package &package)
176 {
177     boost::mutex::scoped_lock lock(m_mutex);
178
179     std::map<mp::Session,yf::Multi::FrontendPtr>::iterator it;
180
181     while(true)
182     {
183         it = m_clients.find(package.session());
184         if (it == m_clients.end())
185             break;
186
187         if (!it->second->m_in_use)
188         {
189             it->second->m_in_use = true;
190             return it->second;
191         }
192         m_cond_session_ready.wait(lock);
193     }
194     FrontendPtr f(new Frontend(this));
195     m_clients[package.session()] = f;
196     f->m_in_use = true;
197     return f;
198 }
199
200 void yf::Multi::Rep::release_frontend(mp::Package &package)
201 {
202     boost::mutex::scoped_lock lock(m_mutex);
203     std::map<mp::Session,yf::Multi::FrontendPtr>::iterator it;
204
205     it = m_clients.find(package.session());
206     if (it != m_clients.end())
207     {
208         if (package.session().is_closed())
209         {
210             it->second->close(package);
211             m_clients.erase(it);
212         }
213         else
214         {
215             it->second->m_in_use = false;
216         }
217         m_cond_session_ready.notify_all();
218     }
219 }
220
221 yf::Multi::FrontendSet::FrontendSet(std::string setname)
222     :  m_setname(setname)
223 {
224 }
225
226
227 yf::Multi::FrontendSet::FrontendSet()
228 {
229 }
230
231
232 yf::Multi::FrontendSet::~FrontendSet()
233 {
234 }
235
236 yf::Multi::Multi() : m_p(new Multi::Rep)
237 {
238 }
239
240 yf::Multi::~Multi() {
241 }
242
243
244 void yf::Multi::Backend::operator() (void)
245 {
246     m_package->move(m_route);
247 }
248
249
250 void yf::Multi::Frontend::close(mp::Package &package)
251 {
252     std::list<BackendPtr>::const_iterator bit;
253     for (bit = m_backend_list.begin(); bit != m_backend_list.end(); bit++)
254     {
255         BackendPtr b = *bit;
256
257         b->m_package->copy_filter(package);
258         b->m_package->request() = (Z_GDU *) 0;
259         b->m_package->session().close();
260         b->m_package->move(b->m_route);
261     }
262 }
263
264 void yf::Multi::Frontend::multi_move(std::list<BackendPtr> &blist)
265 {
266     std::list<BackendPtr>::const_iterator bit;
267     boost::thread_group g;
268     for (bit = blist.begin(); bit != blist.end(); bit++)
269     {
270         g.add_thread(new boost::thread(**bit));
271     }
272     g.join_all();
273 }
274
275 void yf::Multi::FrontendSet::serve_order(int start, int number,
276                                          std::list<PresentJob> &jobs)
277 {
278     int i;
279     for (i = 0; i < number; i++)
280     {
281         std::list<BackendSet>::const_iterator bsit;
282         int voffset = 0;
283         int offset = start + i - 1;
284         for (bsit = m_backend_sets.begin(); bsit != m_backend_sets.end();
285              bsit++)
286         {
287             if (offset >= voffset && offset < voffset + bsit->m_count)
288             {
289                 PresentJob job(bsit->m_backend, offset - voffset + 1);
290                 jobs.push_back(job);
291                 break;
292             }
293             voffset += bsit->m_count;
294         }
295     }
296 }
297
298 void yf::Multi::FrontendSet::round_robin(int start, int number,
299                                          std::list<PresentJob> &jobs)
300 {
301     std::list<int> pos;
302     std::list<BackendSet>::const_iterator bsit;
303     for (bsit = m_backend_sets.begin(); bsit != m_backend_sets.end(); bsit++)
304     {
305         pos.push_back(1);
306     }
307
308     int p = 1;
309 #if 1
310     // optimization step!
311     int omin = 0;
312     while(true)
313     {
314         int min = 0;
315         int no_left = 0;
316         // find min count for each set which is > omin
317         for (bsit = m_backend_sets.begin(); bsit != m_backend_sets.end(); bsit++)
318         {
319             if (bsit->m_count > omin)
320             {
321                 if (no_left == 0 || bsit->m_count < min)
322                     min = bsit->m_count;
323                 no_left++;
324             }
325         }
326         if (no_left == 0) // if nothing greater than omin, bail out.
327             break;
328         int skip = no_left * min;
329         if (p + skip > start)  // step gets us "into" present range?
330         {
331             // Yes. skip until start.. Rounding off is deliberate!
332             min = (start-p) / no_left;
333             p += no_left * min;
334
335             // update positions in each set..
336             std::list<int>::iterator psit = pos.begin();
337             for (psit = pos.begin(); psit != pos.end(); psit++)
338                 *psit += min;
339             break;
340         }
341         // skip on each set.. before "present range"..
342         p = p + skip;
343
344         std::list<int>::iterator psit = pos.begin();
345         for (psit = pos.begin(); psit != pos.end(); psit++)
346             *psit += min;
347
348         omin = min; // update so we consider next class (with higher count)
349     }
350 #endif
351     int fetched = 0;
352     bool more = true;
353     while (more)
354     {
355         more = false;
356         std::list<int>::iterator psit = pos.begin();
357         bsit = m_backend_sets.begin();
358
359         for (; bsit != m_backend_sets.end(); psit++,bsit++)
360         {
361             if (fetched >= number)
362             {
363                 more = false;
364                 break;
365             }
366             if (*psit <= bsit->m_count)
367             {
368                 if (p >= start)
369                 {
370                     PresentJob job(bsit->m_backend, *psit);
371                     jobs.push_back(job);
372                     fetched++;
373                 }
374                 (*psit)++;
375                 p++;
376                 more = true;
377             }
378         }
379     }
380 }
381
382 void yf::Multi::Frontend::init(mp::Package &package, Z_GDU *gdu)
383 {
384     Z_InitRequest *req = gdu->u.z3950->u.initRequest;
385
386     std::list<std::string> targets;
387
388     mp::util::get_vhost_otherinfo(req->otherInfo, targets);
389
390     if (targets.size() < 1)
391     {
392         package.move();
393         return;
394     }
395
396     std::list<std::string>::const_iterator t_it = targets.begin();
397     for (; t_it != targets.end(); t_it++)
398     {
399         Session s;
400         Backend *b = new Backend;
401         b->m_vhost = *t_it;
402
403         std::list<Multi::Map>::const_iterator it =
404             m_p->m_route_patterns.begin();
405         while (it != m_p->m_route_patterns.end()) {
406             if (it->match(*t_it, &b->m_route, &b->m_auth))
407                 break;
408             it++;
409         }
410         // b->m_route = m_p->m_target_route[*t_it];
411         // b->m_route unset
412         b->m_package = PackagePtr(new Package(s, package.origin()));
413
414         m_backend_list.push_back(BackendPtr(b));
415     }
416     m_is_multi = true;
417
418     // create init request
419     std::list<BackendPtr>::iterator bit;
420     for (bit = m_backend_list.begin(); bit != m_backend_list.end(); bit++)
421     {
422         mp::odr odr;
423         BackendPtr b = *bit;
424         Z_APDU *init_apdu = zget_APDU(odr, Z_APDU_initRequest);
425
426         std::list<std::string>vhost_one;
427         vhost_one.push_back(b->m_vhost);
428         mp::util::set_vhost_otherinfo(&init_apdu->u.initRequest->otherInfo,
429                                        odr, vhost_one);
430
431
432         Z_InitRequest *breq = init_apdu->u.initRequest;
433
434         if (b->m_auth.length())
435         {
436             breq->idAuthentication =
437                 (Z_IdAuthentication *)
438                 odr_malloc(odr, sizeof(*breq->idAuthentication));
439             breq->idAuthentication->which = Z_IdAuthentication_open;
440             breq->idAuthentication->u.open = odr_strdup(odr, b->m_auth.c_str());
441         }
442         else
443             breq->idAuthentication = req->idAuthentication;
444
445         *breq->preferredMessageSize = *req->preferredMessageSize;
446         *breq->maximumRecordSize = *req->maximumRecordSize;
447
448         ODR_MASK_SET(breq->options, Z_Options_search);
449         ODR_MASK_SET(breq->options, Z_Options_present);
450         ODR_MASK_SET(breq->options, Z_Options_namedResultSets);
451         ODR_MASK_SET(breq->options, Z_Options_scan);
452
453         ODR_MASK_SET(breq->protocolVersion, Z_ProtocolVersion_1);
454         ODR_MASK_SET(breq->protocolVersion, Z_ProtocolVersion_2);
455         ODR_MASK_SET(breq->protocolVersion, Z_ProtocolVersion_3);
456
457         b->m_package->request() = init_apdu;
458
459         b->m_package->copy_filter(package);
460     }
461     multi_move(m_backend_list);
462
463     // create the frontend init response based on each backend init response
464     mp::odr odr;
465
466     Z_APDU *f_apdu = odr.create_initResponse(gdu->u.z3950, 0, 0);
467     Z_InitResponse *f_resp = f_apdu->u.initResponse;
468
469     ODR_MASK_SET(f_resp->options, Z_Options_search);
470     ODR_MASK_SET(f_resp->options, Z_Options_present);
471     ODR_MASK_SET(f_resp->options, Z_Options_namedResultSets);
472     ODR_MASK_SET(f_resp->options, Z_Options_scan);
473
474     ODR_MASK_SET(f_resp->protocolVersion, Z_ProtocolVersion_1);
475     ODR_MASK_SET(f_resp->protocolVersion, Z_ProtocolVersion_2);
476     ODR_MASK_SET(f_resp->protocolVersion, Z_ProtocolVersion_3);
477
478     int no_failed = 0;
479     int no_succeeded = 0;
480
481     Odr_int preferredMessageSize = *req->preferredMessageSize;
482     Odr_int maximumRecordSize = *req->maximumRecordSize;
483     for (bit = m_backend_list.begin(); bit != m_backend_list.end(); )
484     {
485         PackagePtr p = (*bit)->m_package;
486
487         if (p->session().is_closed())
488         {
489             // failed. Remove from list and increment number of failed
490             no_failed++;
491             bit = m_backend_list.erase(bit);
492             continue;
493         }
494         Z_GDU *gdu = p->response().get();
495         if (gdu && gdu->which == Z_GDU_Z3950 && gdu->u.z3950->which ==
496             Z_APDU_initResponse)
497         {
498             int i;
499             Z_APDU *b_apdu = gdu->u.z3950;
500             Z_InitResponse *b_resp = b_apdu->u.initResponse;
501
502             // common options for all backends
503             for (i = 0; i <= Z_Options_stringSchema; i++)
504             {
505                 if (!ODR_MASK_GET(b_resp->options, i))
506                     ODR_MASK_CLEAR(f_resp->options, i);
507             }
508             // common protocol version
509             for (i = 0; i <= Z_ProtocolVersion_3; i++)
510                 if (!ODR_MASK_GET(b_resp->protocolVersion, i))
511                     ODR_MASK_CLEAR(f_resp->protocolVersion, i);
512             if (*b_resp->result)
513             {
514                 no_succeeded++;
515                 if (preferredMessageSize > *b_resp->preferredMessageSize)
516                     preferredMessageSize = *b_resp->preferredMessageSize;
517                 if (maximumRecordSize > *b_resp->maximumRecordSize)
518                     maximumRecordSize = *b_resp->maximumRecordSize;
519             }
520             else
521             {
522                 if (!f_resp->userInformationField
523                     && b_resp->userInformationField)
524                     f_resp->userInformationField = b_resp->userInformationField;
525                 no_failed++;
526             }
527         }
528         else
529             no_failed++;
530         bit++;
531     }
532     *f_resp->preferredMessageSize = preferredMessageSize;
533     *f_resp->maximumRecordSize = maximumRecordSize;
534
535     if (m_p->m_hide_unavailable)
536     {
537         if (no_succeeded == 0)
538         {
539             *f_resp->result = 0;
540             package.session().close();
541         }
542     }
543     else
544     {
545         if (no_failed)
546         {
547             *f_resp->result = 0;
548             package.session().close();
549         }
550     }
551     package.response() = f_apdu;
552 }
553
554 void yf::Multi::Frontend::record_diagnostics(Z_Records *records, 
555                                              Z_DiagRecs * &z_diag,
556                                              ODR odr,
557                                              int &no_successful)
558 {
559     // see we get any errors (AKA diagnstics)
560     if (records)
561     {
562         if (records->which == Z_Records_NSD)
563         {
564             if (!z_diag)
565             {
566                 z_diag = (Z_DiagRecs *)
567                     odr_malloc(odr, sizeof(*z_diag));
568                 z_diag->num_diagRecs = 0;
569                 z_diag->diagRecs = (Z_DiagRec**)
570                     odr_malloc(odr, sizeof(*z_diag->diagRecs));
571             }
572             else
573             {
574                 Z_DiagRec **n = (Z_DiagRec **)
575                     odr_malloc(odr,
576                                (1+z_diag->num_diagRecs) * sizeof(*n));
577                 memcpy(n, z_diag->diagRecs, z_diag->num_diagRecs
578                        * sizeof(*n));
579                 z_diag->diagRecs = n;
580             }
581             Z_DiagRec *nr = (Z_DiagRec *) odr_malloc(odr, sizeof(*nr));
582             nr->which = Z_DiagRec_defaultFormat;
583             nr->u.defaultFormat =
584                 records->u.nonSurrogateDiagnostic;
585             z_diag->diagRecs[z_diag->num_diagRecs++] = nr;
586         }
587         else if (records->which == Z_Records_multipleNSD)
588         {
589             // we may set this multiple times (TOO BAD!)
590             z_diag = records->u.multipleNonSurDiagnostics;
591         }
592         else
593             no_successful++; // probably piggyback
594     }
595     else
596         no_successful++;  // no records and no diagnostics
597 }
598
599 void yf::Multi::Frontend::search(mp::Package &package, Z_APDU *apdu_req)
600 {
601     // create search request
602     Z_SearchRequest *req = apdu_req->u.searchRequest;
603
604     // save these for later
605     Odr_int smallSetUpperBound = *req->smallSetUpperBound;
606     Odr_int largeSetLowerBound = *req->largeSetLowerBound;
607     Odr_int mediumSetPresentNumber = *req->mediumSetPresentNumber;
608
609     // they are altered now - to disable piggyback
610     *req->smallSetUpperBound = 0;
611     *req->largeSetLowerBound = 1;
612     *req->mediumSetPresentNumber = 0;
613
614     int default_num_db = req->num_databaseNames;
615     char **default_db = req->databaseNames;
616
617     std::list<BackendPtr>::const_iterator bit;
618     for (bit = m_backend_list.begin(); bit != m_backend_list.end(); bit++)
619     {
620         PackagePtr p = (*bit)->m_package;
621         mp::odr odr;
622
623         if (!mp::util::set_databases_from_zurl(odr, (*bit)->m_vhost,
624                                                 &req->num_databaseNames,
625                                                 &req->databaseNames))
626         {
627             req->num_databaseNames = default_num_db;
628             req->databaseNames = default_db;
629         }
630         p->request() = apdu_req;
631         p->copy_filter(package);
632     }
633     multi_move(m_backend_list);
634
635     // look at each response
636     FrontendSet resultSet(std::string(req->resultSetName));
637
638     mp::odr odr;
639     Odr_int result_set_size = 0;
640     Z_DiagRecs *z_diag = 0;
641     int no_successful = 0;
642     PackagePtr close_p;
643
644     for (bit = m_backend_list.begin(); bit != m_backend_list.end(); bit++)
645     {
646         PackagePtr p = (*bit)->m_package;
647
648         // save closing package for at least one target
649         if (p->session().is_closed())
650             close_p = p;
651
652         Z_GDU *gdu = p->response().get();
653         if (gdu && gdu->which == Z_GDU_Z3950 && gdu->u.z3950->which ==
654             Z_APDU_searchResponse)
655         {
656             Z_APDU *b_apdu = gdu->u.z3950;
657             Z_SearchResponse *b_resp = b_apdu->u.searchResponse;
658
659             record_diagnostics(b_resp->records, z_diag, odr, no_successful);
660
661             BackendSet backendSet;
662             backendSet.m_backend = *bit;
663             backendSet.m_count = *b_resp->resultCount;
664             result_set_size += *b_resp->resultCount;
665             resultSet.m_backend_sets.push_back(backendSet);
666         }
667     }
668
669     Z_APDU *f_apdu = odr.create_searchResponse(apdu_req, 0, 0);
670     Z_SearchResponse *f_resp = f_apdu->u.searchResponse;
671
672     yaz_log(YLOG_LOG, "no_successful=%d is_closed=%s hide_errors=%s",
673             no_successful,
674             close_p ? "true" : "false",
675             m_p->m_hide_errors ? "true" : "false");
676     *f_resp->resultCount = result_set_size;
677     if (close_p && (no_successful == 0 || !m_p->m_hide_errors))
678     {
679         package.session().close();
680         package.response() = close_p->response();
681         return;
682     }
683     if (z_diag && (no_successful == 0 || !m_p->m_hide_errors))
684     {
685         f_resp->records = (Z_Records *)
686             odr_malloc(odr, sizeof(*f_resp->records));
687         if (z_diag->num_diagRecs > 1)
688         {
689             f_resp->records->which = Z_Records_multipleNSD;
690             f_resp->records->u.multipleNonSurDiagnostics = z_diag;
691         }
692         else
693         {
694             f_resp->records->which = Z_Records_NSD;
695             f_resp->records->u.nonSurrogateDiagnostic =
696                 z_diag->diagRecs[0]->u.defaultFormat;
697         }
698     }
699     // assume OK
700     m_sets[resultSet.m_setname] = resultSet;
701
702     Odr_int number;
703     mp::util::piggyback(smallSetUpperBound,
704                         largeSetLowerBound,
705                         mediumSetPresentNumber,
706                         0, 0,
707                         result_set_size,
708                         number, 0);
709     Package pp(package.session(), package.origin());
710     if (z_diag == 0 && number > 0)
711     {
712         pp.copy_filter(package);
713         Z_APDU *p_apdu = zget_APDU(odr, Z_APDU_presentRequest);
714         Z_PresentRequest *p_req = p_apdu->u.presentRequest;
715         p_req->preferredRecordSyntax = req->preferredRecordSyntax;
716         p_req->resultSetId = req->resultSetName;
717         *p_req->resultSetStartPoint = 1;
718         *p_req->numberOfRecordsRequested = number;
719         pp.request() = p_apdu;
720         present(pp, p_apdu);
721
722         if (pp.session().is_closed())
723             package.session().close();
724
725         Z_GDU *gdu = pp.response().get();
726         if (gdu && gdu->which == Z_GDU_Z3950 && gdu->u.z3950->which ==
727             Z_APDU_presentResponse)
728         {
729             Z_PresentResponse *p_res = gdu->u.z3950->u.presentResponse;
730             f_resp->records = p_res->records;
731             *f_resp->numberOfRecordsReturned =
732                 *p_res->numberOfRecordsReturned;
733             *f_resp->nextResultSetPosition =
734                 *p_res->nextResultSetPosition;
735         }
736         else
737         {
738             package.response() = pp.response();
739             return;
740         }
741     }
742     package.response() = f_apdu; // in this scope because of p
743 }
744
745 void yf::Multi::Frontend::present(mp::Package &package, Z_APDU *apdu_req)
746 {
747     // create present request
748     Z_PresentRequest *req = apdu_req->u.presentRequest;
749
750     Sets_it it;
751     it = m_sets.find(std::string(req->resultSetId));
752     if (it == m_sets.end())
753     {
754         mp::odr odr;
755         Z_APDU *apdu =
756             odr.create_presentResponse(
757                 apdu_req,
758                 YAZ_BIB1_SPECIFIED_RESULT_SET_DOES_NOT_EXIST,
759                 req->resultSetId);
760         package.response() = apdu;
761         return;
762     }
763     std::list<Multi::FrontendSet::PresentJob> jobs;
764     int start = *req->resultSetStartPoint;
765     int number = *req->numberOfRecordsRequested;
766
767     if (m_p->m_merge_type == round_robin)
768         it->second.round_robin(start, number, jobs);
769     else if (m_p->m_merge_type == serve_order)
770         it->second.serve_order(start, number, jobs);
771
772     if (0)
773     {
774         std::list<Multi::FrontendSet::PresentJob>::const_iterator jit;
775         for (jit = jobs.begin(); jit != jobs.end(); jit++)
776         {
777             yaz_log(YLOG_LOG, "job pos=%d", jit->m_pos);
778         }
779     }
780
781     std::list<BackendPtr> present_backend_list;
782
783     std::list<BackendSet>::const_iterator bsit;
784     bsit = it->second.m_backend_sets.begin();
785     for (; bsit != it->second.m_backend_sets.end(); bsit++)
786     {
787         int start = -1;
788         int end = -1;
789         {
790             std::list<Multi::FrontendSet::PresentJob>::const_iterator jit;
791             for (jit = jobs.begin(); jit != jobs.end(); jit++)
792             {
793                 if (jit->m_backend == bsit->m_backend)
794                 {
795                     if (start == -1 || jit->m_pos < start)
796                         start = jit->m_pos;
797                     if (end == -1 || jit->m_pos > end)
798                         end = jit->m_pos;
799                 }
800             }
801         }
802         if (start != -1)
803         {
804             std::list<Multi::FrontendSet::PresentJob>::iterator jit;
805             for (jit = jobs.begin(); jit != jobs.end(); jit++)
806             {
807                 if (jit->m_backend == bsit->m_backend)
808                 {
809                     if (jit->m_pos >= start && jit->m_pos <= end)
810                         jit->m_start = start;
811                 }
812             }
813
814             PackagePtr p = bsit->m_backend->m_package;
815
816             *req->resultSetStartPoint = start;
817             *req->numberOfRecordsRequested = end - start + 1;
818
819             p->request() = apdu_req;
820             p->copy_filter(package);
821
822             present_backend_list.push_back(bsit->m_backend);
823         }
824     }
825     multi_move(present_backend_list);
826
827     // look at each response
828     Z_DiagRecs *z_diag = 0;
829     int no_successful = 0;
830     mp::odr odr;
831     PackagePtr close_p;
832
833     std::list<BackendPtr>::const_iterator pbit = present_backend_list.begin();
834     for (; pbit != present_backend_list.end(); pbit++)
835     {
836         PackagePtr p = (*pbit)->m_package;
837
838         // save closing package for at least one target
839         if (p->session().is_closed())
840             close_p = p;
841
842         Z_GDU *gdu = p->response().get();
843         if (gdu && gdu->which == Z_GDU_Z3950 && gdu->u.z3950->which ==
844             Z_APDU_presentResponse)
845         {
846             Z_APDU *b_apdu = gdu->u.z3950;
847             Z_PresentResponse *b_resp = b_apdu->u.presentResponse;
848
849             record_diagnostics(b_resp->records, z_diag, odr, no_successful);
850         }
851     }
852
853     Z_APDU *f_apdu = odr.create_presentResponse(apdu_req, 0, 0);
854     Z_PresentResponse *f_resp = f_apdu->u.presentResponse;
855
856     if (close_p && (no_successful == 0 || !m_p->m_hide_errors))
857     {
858         package.session().close();
859         package.response() = close_p->response();
860         return;
861     }
862     if (z_diag && (no_successful == 0 || !m_p->m_hide_errors))
863     {
864         f_resp->records = (Z_Records *)
865             odr_malloc(odr, sizeof(*f_resp->records));
866         if (z_diag->num_diagRecs > 1)
867         {
868             f_resp->records->which = Z_Records_multipleNSD;
869             f_resp->records->u.multipleNonSurDiagnostics = z_diag;
870         }
871         else
872         {
873             f_resp->records->which = Z_Records_NSD;
874             f_resp->records->u.nonSurrogateDiagnostic =
875                 z_diag->diagRecs[0]->u.defaultFormat;
876         }
877     }
878     else if (number < 0 || (size_t) number > jobs.size())
879     {
880         f_apdu =
881             odr.create_presentResponse(
882                 apdu_req,
883                 YAZ_BIB1_PRESENT_REQUEST_OUT_OF_RANGE,
884                 0);
885     }
886     else
887     {
888         f_resp->records = (Z_Records *) odr_malloc(odr, sizeof(Z_Records));
889         Z_Records * records = f_resp->records;
890         records->which = Z_Records_DBOSD;
891         records->u.databaseOrSurDiagnostics =
892             (Z_NamePlusRecordList *)
893             odr_malloc(odr, sizeof(Z_NamePlusRecordList));
894         Z_NamePlusRecordList *nprl = records->u.databaseOrSurDiagnostics;
895         nprl->num_records = jobs.size();
896         nprl->records = (Z_NamePlusRecord**)
897             odr_malloc(odr, sizeof(Z_NamePlusRecord *) * nprl->num_records);
898         int i = 0;
899         std::list<Multi::FrontendSet::PresentJob>::const_iterator jit;
900         for (jit = jobs.begin(); jit != jobs.end(); jit++)
901         {
902             PackagePtr p = jit->m_backend->m_package;
903
904             Z_GDU *gdu = p->response().get();
905             Z_APDU *b_apdu = gdu->u.z3950;
906             int inside_pos = jit->m_pos - jit->m_start;
907             Z_Records *records = b_apdu->u.presentResponse->records;
908
909             if (records && records->which == Z_Records_DBOSD
910                 && inside_pos < 
911                 records->u.databaseOrSurDiagnostics->num_records)
912             {
913                 nprl->records[i] = (Z_NamePlusRecord*)
914                     odr_malloc(odr, sizeof(Z_NamePlusRecord));
915                 *nprl->records[i] = *records->
916                     u.databaseOrSurDiagnostics->records[inside_pos];
917                 nprl->records[i]->databaseName =
918                     odr_strdup(odr, jit->m_backend->m_vhost.c_str());
919                 i++;
920             }
921         }
922         nprl->num_records = i; // usually same as jobs.size();
923         *f_resp->nextResultSetPosition = start + i;
924         *f_resp->numberOfRecordsReturned = i;
925     }
926     package.response() = f_apdu;
927 }
928
929 void yf::Multi::Frontend::scan1(mp::Package &package, Z_APDU *apdu_req)
930 {
931     if (m_backend_list.size() > 1)
932     {
933         mp::odr odr;
934         Z_APDU *f_apdu =
935             odr.create_scanResponse(
936                 apdu_req, YAZ_BIB1_COMBI_OF_SPECIFIED_DATABASES_UNSUPP, 0);
937         package.response() = f_apdu;
938         return;
939     }
940     Z_ScanRequest *req = apdu_req->u.scanRequest;
941
942     int default_num_db = req->num_databaseNames;
943     char **default_db = req->databaseNames;
944
945     std::list<BackendPtr>::const_iterator bit;
946     for (bit = m_backend_list.begin(); bit != m_backend_list.end(); bit++)
947     {
948         PackagePtr p = (*bit)->m_package;
949         mp::odr odr;
950
951         if (!mp::util::set_databases_from_zurl(odr, (*bit)->m_vhost,
952                                                 &req->num_databaseNames,
953                                                 &req->databaseNames))
954         {
955             req->num_databaseNames = default_num_db;
956             req->databaseNames = default_db;
957         }
958         p->request() = apdu_req;
959         p->copy_filter(package);
960     }
961     multi_move(m_backend_list);
962
963     for (bit = m_backend_list.begin(); bit != m_backend_list.end(); bit++)
964     {
965         PackagePtr p = (*bit)->m_package;
966
967         if (p->session().is_closed()) // if any backend closes, close frontend
968             package.session().close();
969
970         Z_GDU *gdu = p->response().get();
971         if (gdu && gdu->which == Z_GDU_Z3950 && gdu->u.z3950->which ==
972             Z_APDU_scanResponse)
973         {
974             package.response() = p->response();
975             break;
976         }
977         else
978         {
979             // if any target does not return scan response - return that
980             package.response() = p->response();
981             return;
982         }
983     }
984 }
985
986 bool yf::Multi::ScanTermInfo::operator < (const ScanTermInfo &k) const
987 {
988     return m_norm_term < k.m_norm_term;
989 }
990
991 bool yf::Multi::ScanTermInfo::operator == (const ScanTermInfo &k) const
992 {
993     return m_norm_term == k.m_norm_term;
994 }
995
996 Z_Entry *yf::Multi::ScanTermInfo::get_entry(ODR odr)
997 {
998     Z_Entry *e = (Z_Entry *)odr_malloc(odr, sizeof(*e));
999     e->which = Z_Entry_termInfo;
1000     Z_TermInfo *t;
1001     t = e->u.termInfo = (Z_TermInfo *) odr_malloc(odr, sizeof(*t));
1002     t->suggestedAttributes = 0;
1003     t->displayTerm = 0;
1004     t->alternativeTerm = 0;
1005     t->byAttributes = 0;
1006     t->otherTermInfo = 0;
1007     t->globalOccurrences = odr_intdup(odr, m_count);
1008     t->term = (Z_Term *)
1009         odr_malloc(odr, sizeof(*t->term));
1010     t->term->which = Z_Term_general;
1011     Odr_oct *o;
1012     t->term->u.general = o = (Odr_oct *)odr_malloc(odr, sizeof(Odr_oct));
1013
1014     o->len = o->size = m_norm_term.size();
1015     o->buf = (unsigned char *) odr_malloc(odr, o->len);
1016     memcpy(o->buf, m_norm_term.c_str(), o->len);
1017     return e;
1018 }
1019
1020 void yf::Multi::Frontend::relay_apdu(mp::Package &package, Z_APDU *apdu_req)
1021 {
1022     std::list<BackendPtr>::const_iterator bit;
1023     for (bit = m_backend_list.begin(); bit != m_backend_list.end(); bit++)
1024     {
1025         PackagePtr p = (*bit)->m_package;
1026         mp::odr odr;
1027
1028         p->request() = apdu_req;
1029         p->copy_filter(package);
1030     }
1031     multi_move(m_backend_list);
1032     for (bit = m_backend_list.begin(); bit != m_backend_list.end(); bit++)
1033     {
1034         PackagePtr p = (*bit)->m_package;
1035
1036         if (p->session().is_closed()) // if any backend closes, close frontend
1037             package.session().close();
1038
1039         package.response() = p->response();
1040     }
1041 }
1042
1043
1044 void yf::Multi::Frontend::scan2(mp::Package &package, Z_APDU *apdu_req)
1045 {
1046     Z_ScanRequest *req = apdu_req->u.scanRequest;
1047
1048     int default_num_db = req->num_databaseNames;
1049     char **default_db = req->databaseNames;
1050
1051     std::list<BackendPtr>::const_iterator bit;
1052     for (bit = m_backend_list.begin(); bit != m_backend_list.end(); bit++)
1053     {
1054         PackagePtr p = (*bit)->m_package;
1055         mp::odr odr;
1056
1057         if (!mp::util::set_databases_from_zurl(odr, (*bit)->m_vhost,
1058                                                 &req->num_databaseNames,
1059                                                 &req->databaseNames))
1060         {
1061             req->num_databaseNames = default_num_db;
1062             req->databaseNames = default_db;
1063         }
1064         p->request() = apdu_req;
1065         p->copy_filter(package);
1066     }
1067     multi_move(m_backend_list);
1068
1069     ScanTermInfoList entries_before;
1070     ScanTermInfoList entries_after;
1071     int no_before = 0;
1072     int no_after = 0;
1073
1074     for (bit = m_backend_list.begin(); bit != m_backend_list.end(); bit++)
1075     {
1076         PackagePtr p = (*bit)->m_package;
1077
1078         if (p->session().is_closed()) // if any backend closes, close frontend
1079             package.session().close();
1080
1081         Z_GDU *gdu = p->response().get();
1082         if (gdu && gdu->which == Z_GDU_Z3950 && gdu->u.z3950->which ==
1083             Z_APDU_scanResponse)
1084         {
1085             Z_ScanResponse *res = gdu->u.z3950->u.scanResponse;
1086
1087             if (res->entries && res->entries->nonsurrogateDiagnostics)
1088             {
1089                 // failure
1090                 mp::odr odr;
1091                 Z_APDU *f_apdu = odr.create_scanResponse(apdu_req, 1, 0);
1092                 Z_ScanResponse *f_res = f_apdu->u.scanResponse;
1093
1094                 f_res->entries->nonsurrogateDiagnostics =
1095                     res->entries->nonsurrogateDiagnostics;
1096                 f_res->entries->num_nonsurrogateDiagnostics =
1097                     res->entries->num_nonsurrogateDiagnostics;
1098
1099                 package.response() = f_apdu;
1100                 return;
1101             }
1102
1103             if (res->entries && res->entries->entries)
1104             {
1105                 Z_Entry **entries = res->entries->entries;
1106                 int num_entries = res->entries->num_entries;
1107                 int position = 1;
1108                 if (req->preferredPositionInResponse)
1109                     position = *req->preferredPositionInResponse;
1110                 if (res->positionOfTerm)
1111                     position = *res->positionOfTerm;
1112
1113                 // before
1114                 int i;
1115                 for (i = 0; i<position-1 && i<num_entries; i++)
1116                 {
1117                     Z_Entry *ent = entries[i];
1118
1119                     if (ent->which == Z_Entry_termInfo)
1120                     {
1121                         ScanTermInfo my;
1122
1123                         Odr_int *occur = ent->u.termInfo->globalOccurrences;
1124                         my.m_count = occur ? *occur : 0;
1125
1126                         if (ent->u.termInfo->term->which == Z_Term_general)
1127                         {
1128                             my.m_norm_term = std::string(
1129                                 (const char *)
1130                                 ent->u.termInfo->term->u.general->buf,
1131                                 ent->u.termInfo->term->u.general->len);
1132                         }
1133                         if (my.m_norm_term.length())
1134                         {
1135                             ScanTermInfoList::iterator it =
1136                                 entries_before.begin();
1137                             while (it != entries_before.end() && my <*it)
1138                                 it++;
1139                             if (it != entries_before.end() && my == *it)
1140                             {
1141                                 it->m_count += my.m_count;
1142                             }
1143                             else
1144                             {
1145                                 entries_before.insert(it, my);
1146                                 no_before++;
1147                             }
1148                         }
1149                     }
1150                 }
1151                 // after
1152                 if (position <= 0)
1153                     i = 0;
1154                 else
1155                     i = position-1;
1156                 for ( ; i<num_entries; i++)
1157                 {
1158                     Z_Entry *ent = entries[i];
1159
1160                     if (ent->which == Z_Entry_termInfo)
1161                     {
1162                         ScanTermInfo my;
1163
1164                         Odr_int *occur = ent->u.termInfo->globalOccurrences;
1165                         my.m_count = occur ? *occur : 0;
1166
1167                         if (ent->u.termInfo->term->which == Z_Term_general)
1168                         {
1169                             my.m_norm_term = std::string(
1170                                 (const char *)
1171                                 ent->u.termInfo->term->u.general->buf,
1172                                 ent->u.termInfo->term->u.general->len);
1173                         }
1174                         if (my.m_norm_term.length())
1175                         {
1176                             ScanTermInfoList::iterator it =
1177                                 entries_after.begin();
1178                             while (it != entries_after.end() && *it < my)
1179                                 it++;
1180                             if (it != entries_after.end() && my == *it)
1181                             {
1182                                 it->m_count += my.m_count;
1183                             }
1184                             else
1185                             {
1186                                 entries_after.insert(it, my);
1187                                 no_after++;
1188                             }
1189                         }
1190                     }
1191                 }
1192
1193             }
1194         }
1195         else
1196         {
1197             // if any target does not return scan response - return that
1198             package.response() = p->response();
1199             return;
1200         }
1201     }
1202
1203     if (false)
1204     {
1205         std::cout << "BEFORE\n";
1206         ScanTermInfoList::iterator it = entries_before.begin();
1207         for(; it != entries_before.end(); it++)
1208         {
1209             std::cout << " " << it->m_norm_term << " " << it->m_count << "\n";
1210         }
1211
1212         std::cout << "AFTER\n";
1213         it = entries_after.begin();
1214         for(; it != entries_after.end(); it++)
1215         {
1216             std::cout << " " << it->m_norm_term << " " << it->m_count << "\n";
1217         }
1218     }
1219
1220     if (false)
1221     {
1222         mp::odr odr;
1223         Z_APDU *f_apdu = odr.create_scanResponse(apdu_req, 1, "not implemented");
1224         package.response() = f_apdu;
1225     }
1226     else
1227     {
1228         mp::odr odr;
1229         Z_APDU *f_apdu = odr.create_scanResponse(apdu_req, 0, 0);
1230         Z_ScanResponse *resp = f_apdu->u.scanResponse;
1231
1232         int number_returned = *req->numberOfTermsRequested;
1233         int position_returned = *req->preferredPositionInResponse;
1234
1235         resp->entries->num_entries = number_returned;
1236         resp->entries->entries = (Z_Entry**)
1237             odr_malloc(odr, sizeof(Z_Entry*) * number_returned);
1238         int i;
1239
1240         int lbefore = entries_before.size();
1241         if (lbefore < position_returned-1)
1242             position_returned = lbefore+1;
1243
1244         ScanTermInfoList::iterator it = entries_before.begin();
1245         for (i = 0; i<position_returned-1 && it != entries_before.end(); i++, it++)
1246         {
1247             resp->entries->entries[position_returned-2-i] = it->get_entry(odr);
1248         }
1249
1250         it = entries_after.begin();
1251
1252         if (position_returned <= 0)
1253             i = 0;
1254         else
1255             i = position_returned-1;
1256         for (; i<number_returned && it != entries_after.end(); i++, it++)
1257         {
1258             resp->entries->entries[i] = it->get_entry(odr);
1259         }
1260
1261         number_returned = i;
1262
1263         resp->positionOfTerm = odr_intdup(odr, position_returned);
1264         resp->numberOfEntriesReturned = odr_intdup(odr, number_returned);
1265         resp->entries->num_entries = number_returned;
1266
1267         package.response() = f_apdu;
1268     }
1269 }
1270
1271
1272 void yf::Multi::process(mp::Package &package) const
1273 {
1274     FrontendPtr f = m_p->get_frontend(package);
1275
1276     Z_GDU *gdu = package.request().get();
1277
1278     if (gdu && gdu->which == Z_GDU_Z3950 && gdu->u.z3950->which ==
1279         Z_APDU_initRequest && !f->m_is_multi)
1280     {
1281         f->init(package, gdu);
1282     }
1283     else if (!f->m_is_multi)
1284         package.move();
1285     else if (gdu && gdu->which == Z_GDU_Z3950)
1286     {
1287         Z_APDU *apdu = gdu->u.z3950;
1288         if (apdu->which == Z_APDU_initRequest)
1289         {
1290             mp::odr odr;
1291
1292             package.response() = odr.create_close(
1293                 apdu,
1294                 Z_Close_protocolError,
1295                 "double init");
1296
1297             package.session().close();
1298         }
1299         else if (apdu->which == Z_APDU_searchRequest)
1300         {
1301             f->search(package, apdu);
1302         }
1303         else if (apdu->which == Z_APDU_presentRequest)
1304         {
1305             f->present(package, apdu);
1306         }
1307         else if (apdu->which == Z_APDU_scanRequest)
1308         {
1309             f->scan2(package, apdu);
1310         }
1311         else if (apdu->which == Z_APDU_close)
1312         {
1313             f->relay_apdu(package, apdu);
1314         }
1315         else
1316         {
1317             mp::odr odr;
1318
1319             package.response() = odr.create_close(
1320                 apdu, Z_Close_protocolError,
1321                 "unsupported APDU in filter multi");
1322
1323             package.session().close();
1324         }
1325     }
1326     m_p->release_frontend(package);
1327 }
1328
1329 void mp::filter::Multi::configure(const xmlNode * ptr, bool test_only,
1330                                   const char *path)
1331 {
1332     for (ptr = ptr->children; ptr; ptr = ptr->next)
1333     {
1334         if (ptr->type != XML_ELEMENT_NODE)
1335             continue;
1336         if (!strcmp((const char *) ptr->name, "target"))
1337         {
1338             std::string auth;
1339             std::string route = mp::xml::get_route(ptr, auth);
1340             std::string target = mp::xml::get_text(ptr);
1341             if (target.length() == 0)
1342                 target = route;
1343             m_p->m_route_patterns.push_back(Multi::Map(target, route, auth));
1344         }
1345         else if (!strcmp((const char *) ptr->name, "hideunavailable"))
1346         {
1347             m_p->m_hide_unavailable = true;
1348         }
1349         else if (!strcmp((const char *) ptr->name, "hideerrors"))
1350         {
1351             m_p->m_hide_errors = true;
1352         }
1353         else if (!strcmp((const char *) ptr->name, "mergetype"))
1354         {
1355             std::string mergetype = mp::xml::get_text(ptr);
1356             if (mergetype == "roundrobin")
1357                 m_p->m_merge_type = round_robin;
1358             else if (mergetype == "serveorder")
1359                 m_p->m_merge_type = serve_order;
1360             else
1361                 throw mp::filter::FilterException
1362                     ("Bad mergetype "  + mergetype + " in multi filter");
1363
1364         }
1365         else
1366         {
1367             throw mp::filter::FilterException
1368                 ("Bad element "
1369                  + std::string((const char *) ptr->name)
1370                  + " in multi filter");
1371         }
1372     }
1373 }
1374
1375 static mp::filter::Base* filter_creator()
1376 {
1377     return new mp::filter::Multi;
1378 }
1379
1380 extern "C" {
1381     struct metaproxy_1_filter_struct metaproxy_1_filter_multi = {
1382         0,
1383         "multi",
1384         filter_creator
1385     };
1386 }
1387
1388
1389 /*
1390  * Local variables:
1391  * c-basic-offset: 4
1392  * c-file-style: "Stroustrup"
1393  * indent-tabs-mode: nil
1394  * End:
1395  * vim: shiftwidth=4 tabstop=8 expandtab
1396  */
1397