multi: do not alter scan package either
[metaproxy-moved-to-github.git] / src / filter_multi.cpp
1 /* This file is part of Metaproxy.
2    Copyright (C) 2005-2013 Index Data
3
4 Metaproxy is free software; you can redistribute it and/or modify it under
5 the terms of the GNU General Public License as published by the Free
6 Software Foundation; either version 2, or (at your option) any later
7 version.
8
9 Metaproxy is distributed in the hope that it will be useful, but WITHOUT ANY
10 WARRANTY; without even the implied warranty of MERCHANTABILITY or
11 FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
12 for more details.
13
14 You should have received a copy of the GNU General Public License
15 along with this program; if not, write to the Free Software
16 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
17 */
18
19 #include <yaz/log.h>
20
21 #include "config.hpp"
22
23 #include <metaproxy/filter.hpp>
24 #include <metaproxy/package.hpp>
25
26 #include <boost/thread/thread.hpp>
27 #include <boost/thread/mutex.hpp>
28 #include <boost/thread/condition.hpp>
29 #include <boost/shared_ptr.hpp>
30
31 #include <metaproxy/util.hpp>
32 #include "filter_multi.hpp"
33
34 #include <yaz/zgdu.h>
35 #include <yaz/otherinfo.h>
36 #include <yaz/diagbib1.h>
37 #include <yaz/match_glob.h>
38
39 #include <vector>
40 #include <algorithm>
41 #include <map>
42 #include <iostream>
43
44 namespace mp = metaproxy_1;
45 namespace yf = mp::filter;
46
47 namespace metaproxy_1 {
48     namespace filter {
49         enum multi_merge_type {
50             round_robin,
51             serve_order
52         };
53         struct Multi::BackendSet {
54             BackendPtr m_backend;
55             int m_count;
56             bool operator < (const BackendSet &k) const;
57             bool operator == (const BackendSet &k) const;
58         };
59         struct Multi::ScanTermInfo {
60             std::string m_norm_term;
61             std::string m_display_term;
62             int m_count;
63             bool operator < (const ScanTermInfo &) const;
64             bool operator == (const ScanTermInfo &) const;
65             Z_Entry *get_entry(ODR odr);
66         };
67         struct Multi::FrontendSet {
68             class PresentJob {
69             public:
70                 BackendPtr m_backend;
71                 int m_pos; // position for backend (1=first, 2=second,..
72                 int m_start; // present request start
73                 PresentJob(BackendPtr ptr, int pos) :
74                     m_backend(ptr), m_pos(pos), m_start(0) {};
75             };
76             FrontendSet(std::string setname);
77             FrontendSet();
78             ~FrontendSet();
79
80             void round_robin(int pos, int number, std::list<PresentJob> &job);
81             void serve_order(int pos, int number, std::list<PresentJob> &job);
82
83             std::list<BackendSet> m_backend_sets;
84             std::string m_setname;
85         };
86         struct Multi::Backend {
87             PackagePtr m_package;
88             std::string m_backend_database;
89             std::string m_vhost;
90             std::string m_route;
91             std::string m_auth;
92             void operator() (void);  // thread operation
93         };
94         struct Multi::Frontend {
95             Frontend(Rep *rep);
96             ~Frontend();
97             bool m_is_multi;
98             bool m_in_use;
99             std::list<BackendPtr> m_backend_list;
100             std::map<std::string,Multi::FrontendSet> m_sets;
101
102             void multi_move(std::list<BackendPtr> &blist);
103             void init(Package &package, Z_GDU *gdu);
104             void close(Package &package);
105             void search(Package &package, Z_APDU *apdu);
106             void present(Package &package, Z_APDU *apdu);
107             void scan(Package &package, Z_APDU *apdu);
108             void relay_apdu(Package &package, Z_APDU *apdu);
109             void record_diagnostics(Z_Records *records, 
110                                     Z_DiagRecs * &z_diag,
111                                     ODR odr,
112                                     int &no_successful);
113             Rep *m_p;
114         };
115         class Multi::Map {
116             std::string m_target_pattern;
117             std::string m_route;
118             std::string m_auth;
119         public:
120             Map(std::string pattern, std::string route, std::string auth) :
121                 m_target_pattern(pattern), m_route(route), m_auth(auth) {};
122             bool match(const std::string target, std::string *ret,
123                        std::string *auth) const {
124                 if (yaz_match_glob(m_target_pattern.c_str(), target.c_str()))
125                 {
126                     *ret = m_route;
127                     *auth = m_auth;
128                     return true;
129                 }
130                 return false;
131             };
132         };
133         class Multi::Rep {
134             friend class Multi;
135             friend struct Frontend;
136
137             Rep();
138             FrontendPtr get_frontend(Package &package);
139             void release_frontend(Package &package);
140         private:
141             std::list<Multi::Map> m_route_patterns;
142             boost::mutex m_mutex;
143             boost::condition m_cond_session_ready;
144             std::map<mp::Session, FrontendPtr> m_clients;
145             bool m_hide_unavailable;
146             bool m_hide_errors;
147             multi_merge_type m_merge_type;
148         };
149     }
150 }
151
152 yf::Multi::Rep::Rep()
153 {
154     m_hide_unavailable = false;
155     m_hide_errors = false;
156     m_merge_type = round_robin;
157 }
158
159 bool yf::Multi::BackendSet::operator < (const BackendSet &k) const
160 {
161     return m_count < k.m_count;
162 }
163
164 yf::Multi::Frontend::Frontend(Rep *rep)
165 {
166     m_p = rep;
167     m_is_multi = false;
168 }
169
170 yf::Multi::Frontend::~Frontend()
171 {
172 }
173
174 yf::Multi::FrontendPtr yf::Multi::Rep::get_frontend(mp::Package &package)
175 {
176     boost::mutex::scoped_lock lock(m_mutex);
177
178     std::map<mp::Session,yf::Multi::FrontendPtr>::iterator it;
179
180     while(true)
181     {
182         it = m_clients.find(package.session());
183         if (it == m_clients.end())
184             break;
185
186         if (!it->second->m_in_use)
187         {
188             it->second->m_in_use = true;
189             return it->second;
190         }
191         m_cond_session_ready.wait(lock);
192     }
193     FrontendPtr f(new Frontend(this));
194     m_clients[package.session()] = f;
195     f->m_in_use = true;
196     return f;
197 }
198
199 void yf::Multi::Rep::release_frontend(mp::Package &package)
200 {
201     boost::mutex::scoped_lock lock(m_mutex);
202     std::map<mp::Session,yf::Multi::FrontendPtr>::iterator it;
203
204     it = m_clients.find(package.session());
205     if (it != m_clients.end())
206     {
207         if (package.session().is_closed())
208         {
209             it->second->close(package);
210             m_clients.erase(it);
211         }
212         else
213         {
214             it->second->m_in_use = false;
215         }
216         m_cond_session_ready.notify_all();
217     }
218 }
219
220 yf::Multi::FrontendSet::FrontendSet(std::string setname)
221     :  m_setname(setname)
222 {
223 }
224
225
226 yf::Multi::FrontendSet::FrontendSet()
227 {
228 }
229
230
231 yf::Multi::FrontendSet::~FrontendSet()
232 {
233 }
234
235 yf::Multi::Multi() : m_p(new Multi::Rep)
236 {
237 }
238
239 yf::Multi::~Multi() {
240 }
241
242
243 void yf::Multi::Backend::operator() (void)
244 {
245     m_package->move(m_route);
246 }
247
248
249 void yf::Multi::Frontend::close(mp::Package &package)
250 {
251     std::list<BackendPtr>::const_iterator bit;
252     for (bit = m_backend_list.begin(); bit != m_backend_list.end(); bit++)
253     {
254         BackendPtr b = *bit;
255
256         b->m_package->copy_filter(package);
257         b->m_package->request() = (Z_GDU *) 0;
258         b->m_package->session().close();
259         b->m_package->move(b->m_route);
260     }
261 }
262
263 void yf::Multi::Frontend::multi_move(std::list<BackendPtr> &blist)
264 {
265     std::list<BackendPtr>::const_iterator bit;
266     boost::thread_group g;
267     for (bit = blist.begin(); bit != blist.end(); bit++)
268     {
269         g.add_thread(new boost::thread(**bit));
270     }
271     g.join_all();
272 }
273
274 void yf::Multi::FrontendSet::serve_order(int start, int number,
275                                          std::list<PresentJob> &jobs)
276 {
277     int i;
278     for (i = 0; i < number; i++)
279     {
280         std::list<BackendSet>::const_iterator bsit;
281         int voffset = 0;
282         int offset = start + i - 1;
283         for (bsit = m_backend_sets.begin(); bsit != m_backend_sets.end();
284              bsit++)
285         {
286             if (offset >= voffset && offset < voffset + bsit->m_count)
287             {
288                 PresentJob job(bsit->m_backend, offset - voffset + 1);
289                 jobs.push_back(job);
290                 break;
291             }
292             voffset += bsit->m_count;
293         }
294     }
295 }
296
297 void yf::Multi::FrontendSet::round_robin(int start, int number,
298                                          std::list<PresentJob> &jobs)
299 {
300     std::list<int> pos;
301     std::list<BackendSet>::const_iterator bsit;
302     for (bsit = m_backend_sets.begin(); bsit != m_backend_sets.end(); bsit++)
303     {
304         pos.push_back(1);
305     }
306
307     int p = 1;
308 #if 1
309     // optimization step!
310     int omin = 0;
311     while(true)
312     {
313         int min = 0;
314         int no_left = 0;
315         // find min count for each set which is > omin
316         for (bsit = m_backend_sets.begin(); bsit != m_backend_sets.end(); bsit++)
317         {
318             if (bsit->m_count > omin)
319             {
320                 if (no_left == 0 || bsit->m_count < min)
321                     min = bsit->m_count;
322                 no_left++;
323             }
324         }
325         if (no_left == 0) // if nothing greater than omin, bail out.
326             break;
327         int skip = no_left * min;
328         if (p + skip > start)  // step gets us "into" present range?
329         {
330             // Yes. skip until start.. Rounding off is deliberate!
331             min = (start-p) / no_left;
332             p += no_left * min;
333
334             // update positions in each set..
335             std::list<int>::iterator psit = pos.begin();
336             for (psit = pos.begin(); psit != pos.end(); psit++)
337                 *psit += min;
338             break;
339         }
340         // skip on each set.. before "present range"..
341         p = p + skip;
342
343         std::list<int>::iterator psit = pos.begin();
344         for (psit = pos.begin(); psit != pos.end(); psit++)
345             *psit += min;
346
347         omin = min; // update so we consider next class (with higher count)
348     }
349 #endif
350     int fetched = 0;
351     bool more = true;
352     while (more)
353     {
354         more = false;
355         std::list<int>::iterator psit = pos.begin();
356         bsit = m_backend_sets.begin();
357
358         for (; bsit != m_backend_sets.end(); psit++,bsit++)
359         {
360             if (fetched >= number)
361             {
362                 more = false;
363                 break;
364             }
365             if (*psit <= bsit->m_count)
366             {
367                 if (p >= start)
368                 {
369                     PresentJob job(bsit->m_backend, *psit);
370                     jobs.push_back(job);
371                     fetched++;
372                 }
373                 (*psit)++;
374                 p++;
375                 more = true;
376             }
377         }
378     }
379 }
380
381 void yf::Multi::Frontend::init(mp::Package &package, Z_GDU *gdu)
382 {
383     Z_InitRequest *req = gdu->u.z3950->u.initRequest;
384
385     std::list<std::string> targets;
386
387     mp::util::get_vhost_otherinfo(req->otherInfo, targets);
388
389     if (targets.size() < 1)
390     {
391         package.move();
392         return;
393     }
394
395     std::list<std::string>::const_iterator t_it = targets.begin();
396     for (; t_it != targets.end(); t_it++)
397     {
398         Session s;
399         Backend *b = new Backend;
400         b->m_vhost = *t_it;
401
402         std::list<Multi::Map>::const_iterator it =
403             m_p->m_route_patterns.begin();
404         while (it != m_p->m_route_patterns.end()) {
405             if (it->match(*t_it, &b->m_route, &b->m_auth))
406                 break;
407             it++;
408         }
409         // b->m_route = m_p->m_target_route[*t_it];
410         // b->m_route unset
411         b->m_package = PackagePtr(new Package(s, package.origin()));
412
413         m_backend_list.push_back(BackendPtr(b));
414     }
415     m_is_multi = true;
416
417     // create init request
418     std::list<BackendPtr>::iterator bit;
419     for (bit = m_backend_list.begin(); bit != m_backend_list.end(); bit++)
420     {
421         mp::odr odr;
422         BackendPtr b = *bit;
423         Z_APDU *init_apdu = zget_APDU(odr, Z_APDU_initRequest);
424
425         std::list<std::string>vhost_one;
426         vhost_one.push_back(b->m_vhost);
427         mp::util::set_vhost_otherinfo(&init_apdu->u.initRequest->otherInfo,
428                                        odr, vhost_one);
429
430
431         Z_InitRequest *breq = init_apdu->u.initRequest;
432
433         if (b->m_auth.length())
434         {
435             breq->idAuthentication =
436                 (Z_IdAuthentication *)
437                 odr_malloc(odr, sizeof(*breq->idAuthentication));
438             breq->idAuthentication->which = Z_IdAuthentication_open;
439             breq->idAuthentication->u.open = odr_strdup(odr, b->m_auth.c_str());
440         }
441         else
442             breq->idAuthentication = req->idAuthentication;
443
444         *breq->preferredMessageSize = *req->preferredMessageSize;
445         *breq->maximumRecordSize = *req->maximumRecordSize;
446
447         ODR_MASK_SET(breq->options, Z_Options_search);
448         ODR_MASK_SET(breq->options, Z_Options_present);
449         ODR_MASK_SET(breq->options, Z_Options_namedResultSets);
450         ODR_MASK_SET(breq->options, Z_Options_scan);
451
452         ODR_MASK_SET(breq->protocolVersion, Z_ProtocolVersion_1);
453         ODR_MASK_SET(breq->protocolVersion, Z_ProtocolVersion_2);
454         ODR_MASK_SET(breq->protocolVersion, Z_ProtocolVersion_3);
455
456         b->m_package->request() = init_apdu;
457
458         b->m_package->copy_filter(package);
459     }
460     multi_move(m_backend_list);
461
462     // create the frontend init response based on each backend init response
463     mp::odr odr;
464
465     Z_APDU *f_apdu = odr.create_initResponse(gdu->u.z3950, 0, 0);
466     Z_InitResponse *f_resp = f_apdu->u.initResponse;
467
468     ODR_MASK_SET(f_resp->options, Z_Options_search);
469     ODR_MASK_SET(f_resp->options, Z_Options_present);
470     ODR_MASK_SET(f_resp->options, Z_Options_namedResultSets);
471     ODR_MASK_SET(f_resp->options, Z_Options_scan);
472
473     ODR_MASK_SET(f_resp->protocolVersion, Z_ProtocolVersion_1);
474     ODR_MASK_SET(f_resp->protocolVersion, Z_ProtocolVersion_2);
475     ODR_MASK_SET(f_resp->protocolVersion, Z_ProtocolVersion_3);
476
477     int no_failed = 0;
478     int no_succeeded = 0;
479
480     Odr_int preferredMessageSize = *req->preferredMessageSize;
481     Odr_int maximumRecordSize = *req->maximumRecordSize;
482     for (bit = m_backend_list.begin(); bit != m_backend_list.end(); )
483     {
484         PackagePtr p = (*bit)->m_package;
485
486         if (p->session().is_closed())
487         {
488             // failed. Remove from list and increment number of failed
489             no_failed++;
490             bit = m_backend_list.erase(bit);
491             continue;
492         }
493         Z_GDU *gdu = p->response().get();
494         if (gdu && gdu->which == Z_GDU_Z3950 && gdu->u.z3950->which ==
495             Z_APDU_initResponse)
496         {
497             int i;
498             Z_APDU *b_apdu = gdu->u.z3950;
499             Z_InitResponse *b_resp = b_apdu->u.initResponse;
500
501             // common options for all backends
502             for (i = 0; i <= Z_Options_stringSchema; i++)
503             {
504                 if (!ODR_MASK_GET(b_resp->options, i))
505                     ODR_MASK_CLEAR(f_resp->options, i);
506             }
507             // common protocol version
508             for (i = 0; i <= Z_ProtocolVersion_3; i++)
509                 if (!ODR_MASK_GET(b_resp->protocolVersion, i))
510                     ODR_MASK_CLEAR(f_resp->protocolVersion, i);
511             if (*b_resp->result)
512             {
513                 no_succeeded++;
514                 if (preferredMessageSize > *b_resp->preferredMessageSize)
515                     preferredMessageSize = *b_resp->preferredMessageSize;
516                 if (maximumRecordSize > *b_resp->maximumRecordSize)
517                     maximumRecordSize = *b_resp->maximumRecordSize;
518             }
519             else
520             {
521                 if (!f_resp->userInformationField
522                     && b_resp->userInformationField)
523                     f_resp->userInformationField = b_resp->userInformationField;
524                 no_failed++;
525             }
526         }
527         else
528             no_failed++;
529         bit++;
530     }
531     *f_resp->preferredMessageSize = preferredMessageSize;
532     *f_resp->maximumRecordSize = maximumRecordSize;
533
534     if (m_p->m_hide_unavailable)
535     {
536         if (no_succeeded == 0)
537         {
538             *f_resp->result = 0;
539             package.session().close();
540         }
541     }
542     else
543     {
544         if (no_failed)
545         {
546             *f_resp->result = 0;
547             package.session().close();
548         }
549     }
550     package.response() = f_apdu;
551 }
552
553 void yf::Multi::Frontend::record_diagnostics(Z_Records *records, 
554                                              Z_DiagRecs * &z_diag,
555                                              ODR odr,
556                                              int &no_successful)
557 {
558     // see we get any errors (AKA diagnstics)
559     if (records)
560     {
561         if (records->which == Z_Records_NSD)
562         {
563             if (!z_diag)
564             {
565                 z_diag = (Z_DiagRecs *)
566                     odr_malloc(odr, sizeof(*z_diag));
567                 z_diag->num_diagRecs = 0;
568                 z_diag->diagRecs = (Z_DiagRec**)
569                     odr_malloc(odr, sizeof(*z_diag->diagRecs));
570             }
571             else
572             {
573                 Z_DiagRec **n = (Z_DiagRec **)
574                     odr_malloc(odr,
575                                (1 + z_diag->num_diagRecs) * sizeof(*n));
576                 memcpy(n, z_diag->diagRecs, z_diag->num_diagRecs
577                        * sizeof(*n));
578                 z_diag->diagRecs = n;
579             }
580             Z_DiagRec *nr = (Z_DiagRec *) odr_malloc(odr, sizeof(*nr));
581             nr->which = Z_DiagRec_defaultFormat;
582             nr->u.defaultFormat =
583                 records->u.nonSurrogateDiagnostic;
584             z_diag->diagRecs[z_diag->num_diagRecs++] = nr;
585         }
586         else if (records->which == Z_Records_multipleNSD)
587         {
588             Z_DiagRecs * dr =records->u.multipleNonSurDiagnostics;
589
590             if (!z_diag)
591             {
592                 z_diag = (Z_DiagRecs *) odr_malloc(odr, sizeof(*z_diag));
593                 z_diag->num_diagRecs = 0;
594                 z_diag->diagRecs = 0;
595             }
596             Z_DiagRec **n = (Z_DiagRec **)
597                 odr_malloc(odr,
598                            (dr->num_diagRecs + z_diag->num_diagRecs) * 
599                            sizeof(*n));
600             if (z_diag->num_diagRecs)
601                 memcpy(n, z_diag->diagRecs, z_diag->num_diagRecs * sizeof(*n));
602             memcpy(n + z_diag->num_diagRecs,
603                    dr->diagRecs, dr->num_diagRecs * sizeof(*n));
604             z_diag->diagRecs = n;
605             z_diag->num_diagRecs += dr->num_diagRecs;
606         }
607         else
608             no_successful++; // probably piggyback
609     }
610     else
611         no_successful++;  // no records and no diagnostics
612 }
613
614 void yf::Multi::Frontend::search(mp::Package &package, Z_APDU *apdu_req)
615 {
616     // create search request
617     Z_SearchRequest *req = apdu_req->u.searchRequest;
618
619     std::list<BackendPtr>::const_iterator bit;
620     for (bit = m_backend_list.begin(); bit != m_backend_list.end(); bit++)
621     {
622         PackagePtr p = (*bit)->m_package;
623         yazpp_1::GDU gdu1(apdu_req);
624         mp::odr odr;
625         Z_SearchRequest *req1 = gdu1.get()->u.z3950->u.searchRequest;
626
627         // they are altered now - to disable piggyback
628         *req1->smallSetUpperBound = 0;
629         *req1->largeSetLowerBound = 1;
630         *req1->mediumSetPresentNumber = 0;
631
632         if (!mp::util::set_databases_from_zurl(odr, (*bit)->m_vhost,
633                                                &req1->num_databaseNames,
634                                                &req1->databaseNames))
635         {
636             req1->num_databaseNames = req->num_databaseNames;
637             req1->databaseNames = req->databaseNames;
638         }
639         p->request() = gdu1;
640         p->copy_filter(package);
641     }
642     multi_move(m_backend_list);
643
644     // look at each response
645     FrontendSet resultSet(std::string(req->resultSetName));
646
647     mp::odr odr;
648     Odr_int result_set_size = 0;
649     Z_DiagRecs *z_diag = 0;
650     int no_successful = 0;
651     PackagePtr close_p;
652
653     for (bit = m_backend_list.begin(); bit != m_backend_list.end(); bit++)
654     {
655         PackagePtr p = (*bit)->m_package;
656
657         // save closing package for at least one target
658         if (p->session().is_closed())
659             close_p = p;
660
661         Z_GDU *gdu = p->response().get();
662         if (gdu && gdu->which == Z_GDU_Z3950 && gdu->u.z3950->which ==
663             Z_APDU_searchResponse)
664         {
665             Z_APDU *b_apdu = gdu->u.z3950;
666             Z_SearchResponse *b_resp = b_apdu->u.searchResponse;
667
668             record_diagnostics(b_resp->records, z_diag, odr, no_successful);
669
670             BackendSet backendSet;
671             backendSet.m_backend = *bit;
672             backendSet.m_count = *b_resp->resultCount;
673             result_set_size += *b_resp->resultCount;
674             resultSet.m_backend_sets.push_back(backendSet);
675         }
676     }
677
678     Z_APDU *f_apdu = odr.create_searchResponse(apdu_req, 0, 0);
679     Z_SearchResponse *f_resp = f_apdu->u.searchResponse;
680
681     yaz_log(YLOG_LOG, "no_successful=%d is_closed=%s hide_errors=%s",
682             no_successful,
683             close_p ? "true" : "false",
684             m_p->m_hide_errors ? "true" : "false");
685     *f_resp->resultCount = result_set_size;
686     if (close_p && (no_successful == 0 || !m_p->m_hide_errors))
687     {
688         package.session().close();
689         package.response() = close_p->response();
690         return;
691     }
692     if (z_diag && (no_successful == 0 || !m_p->m_hide_errors))
693     {
694         f_resp->records = (Z_Records *)
695             odr_malloc(odr, sizeof(*f_resp->records));
696         if (z_diag->num_diagRecs > 1)
697         {
698             f_resp->records->which = Z_Records_multipleNSD;
699             f_resp->records->u.multipleNonSurDiagnostics = z_diag;
700         }
701         else
702         {
703             f_resp->records->which = Z_Records_NSD;
704             f_resp->records->u.nonSurrogateDiagnostic =
705                 z_diag->diagRecs[0]->u.defaultFormat;
706         }
707     }
708     // assume OK
709     m_sets[resultSet.m_setname] = resultSet;
710
711     Odr_int number;
712     mp::util::piggyback(*req->smallSetUpperBound,
713                         *req->largeSetLowerBound,
714                         *req->mediumSetPresentNumber,
715                         0, 0,
716                         result_set_size,
717                         number, 0);
718     Package pp(package.session(), package.origin());
719     if (z_diag == 0 && number > 0)
720     {
721         pp.copy_filter(package);
722         Z_APDU *p_apdu = zget_APDU(odr, Z_APDU_presentRequest);
723         Z_PresentRequest *p_req = p_apdu->u.presentRequest;
724         p_req->preferredRecordSyntax = req->preferredRecordSyntax;
725         p_req->resultSetId = req->resultSetName;
726         *p_req->resultSetStartPoint = 1;
727         *p_req->numberOfRecordsRequested = number;
728         pp.request() = p_apdu;
729         present(pp, p_apdu);
730
731         if (pp.session().is_closed())
732             package.session().close();
733
734         Z_GDU *gdu = pp.response().get();
735         if (gdu && gdu->which == Z_GDU_Z3950 && gdu->u.z3950->which ==
736             Z_APDU_presentResponse)
737         {
738             Z_PresentResponse *p_res = gdu->u.z3950->u.presentResponse;
739             f_resp->records = p_res->records;
740             *f_resp->numberOfRecordsReturned =
741                 *p_res->numberOfRecordsReturned;
742             *f_resp->nextResultSetPosition =
743                 *p_res->nextResultSetPosition;
744         }
745         else
746         {
747             package.response() = pp.response();
748             return;
749         }
750     }
751     package.response() = f_apdu; // in this scope because of p
752 }
753
754 void yf::Multi::Frontend::present(mp::Package &package, Z_APDU *apdu_req)
755 {
756     // create present request
757     Z_PresentRequest *req = apdu_req->u.presentRequest;
758
759     Sets_it it;
760     it = m_sets.find(std::string(req->resultSetId));
761     if (it == m_sets.end())
762     {
763         mp::odr odr;
764         Z_APDU *apdu =
765             odr.create_presentResponse(
766                 apdu_req,
767                 YAZ_BIB1_SPECIFIED_RESULT_SET_DOES_NOT_EXIST,
768                 req->resultSetId);
769         package.response() = apdu;
770         return;
771     }
772     std::list<Multi::FrontendSet::PresentJob> jobs;
773     int start = *req->resultSetStartPoint;
774     int number = *req->numberOfRecordsRequested;
775
776     if (m_p->m_merge_type == round_robin)
777         it->second.round_robin(start, number, jobs);
778     else if (m_p->m_merge_type == serve_order)
779         it->second.serve_order(start, number, jobs);
780
781     if (0)
782     {
783         std::list<Multi::FrontendSet::PresentJob>::const_iterator jit;
784         for (jit = jobs.begin(); jit != jobs.end(); jit++)
785         {
786             yaz_log(YLOG_LOG, "job pos=%d", jit->m_pos);
787         }
788     }
789
790     std::list<BackendPtr> present_backend_list;
791
792     std::list<BackendSet>::const_iterator bsit;
793     bsit = it->second.m_backend_sets.begin();
794     for (; bsit != it->second.m_backend_sets.end(); bsit++)
795     {
796         int start = -1;
797         int end = -1;
798         {
799             std::list<Multi::FrontendSet::PresentJob>::const_iterator jit;
800             for (jit = jobs.begin(); jit != jobs.end(); jit++)
801             {
802                 if (jit->m_backend == bsit->m_backend)
803                 {
804                     if (start == -1 || jit->m_pos < start)
805                         start = jit->m_pos;
806                     if (end == -1 || jit->m_pos > end)
807                         end = jit->m_pos;
808                 }
809             }
810         }
811         if (start != -1)
812         {
813             std::list<Multi::FrontendSet::PresentJob>::iterator jit;
814             for (jit = jobs.begin(); jit != jobs.end(); jit++)
815             {
816                 if (jit->m_backend == bsit->m_backend)
817                 {
818                     if (jit->m_pos >= start && jit->m_pos <= end)
819                         jit->m_start = start;
820                 }
821             }
822
823             PackagePtr p = bsit->m_backend->m_package;
824
825             *req->resultSetStartPoint = start;
826             *req->numberOfRecordsRequested = end - start + 1;
827
828             p->request() = apdu_req;
829             p->copy_filter(package);
830
831             present_backend_list.push_back(bsit->m_backend);
832         }
833     }
834     multi_move(present_backend_list);
835
836     // look at each response
837     Z_DiagRecs *z_diag = 0;
838     int no_successful = 0;
839     mp::odr odr;
840     PackagePtr close_p;
841
842     std::list<BackendPtr>::const_iterator pbit = present_backend_list.begin();
843     for (; pbit != present_backend_list.end(); pbit++)
844     {
845         PackagePtr p = (*pbit)->m_package;
846
847         // save closing package for at least one target
848         if (p->session().is_closed())
849             close_p = p;
850
851         Z_GDU *gdu = p->response().get();
852         if (gdu && gdu->which == Z_GDU_Z3950 && gdu->u.z3950->which ==
853             Z_APDU_presentResponse)
854         {
855             Z_APDU *b_apdu = gdu->u.z3950;
856             Z_PresentResponse *b_resp = b_apdu->u.presentResponse;
857
858             record_diagnostics(b_resp->records, z_diag, odr, no_successful);
859         }
860     }
861
862     Z_APDU *f_apdu = odr.create_presentResponse(apdu_req, 0, 0);
863     Z_PresentResponse *f_resp = f_apdu->u.presentResponse;
864
865     if (close_p && (no_successful == 0 || !m_p->m_hide_errors))
866     {
867         package.session().close();
868         package.response() = close_p->response();
869         return;
870     }
871     if (z_diag && (no_successful == 0 || !m_p->m_hide_errors))
872     {
873         f_resp->records = (Z_Records *)
874             odr_malloc(odr, sizeof(*f_resp->records));
875         if (z_diag->num_diagRecs > 1)
876         {
877             f_resp->records->which = Z_Records_multipleNSD;
878             f_resp->records->u.multipleNonSurDiagnostics = z_diag;
879         }
880         else
881         {
882             f_resp->records->which = Z_Records_NSD;
883             f_resp->records->u.nonSurrogateDiagnostic =
884                 z_diag->diagRecs[0]->u.defaultFormat;
885         }
886     }
887     else if (number < 0 || (size_t) number > jobs.size())
888     {
889         f_apdu =
890             odr.create_presentResponse(
891                 apdu_req,
892                 YAZ_BIB1_PRESENT_REQUEST_OUT_OF_RANGE,
893                 0);
894     }
895     else
896     {
897         f_resp->records = (Z_Records *) odr_malloc(odr, sizeof(Z_Records));
898         Z_Records * records = f_resp->records;
899         records->which = Z_Records_DBOSD;
900         records->u.databaseOrSurDiagnostics =
901             (Z_NamePlusRecordList *)
902             odr_malloc(odr, sizeof(Z_NamePlusRecordList));
903         Z_NamePlusRecordList *nprl = records->u.databaseOrSurDiagnostics;
904         nprl->num_records = jobs.size();
905         nprl->records = (Z_NamePlusRecord**)
906             odr_malloc(odr, sizeof(Z_NamePlusRecord *) * nprl->num_records);
907         int i = 0;
908         std::list<Multi::FrontendSet::PresentJob>::const_iterator jit;
909         for (jit = jobs.begin(); jit != jobs.end(); jit++)
910         {
911             PackagePtr p = jit->m_backend->m_package;
912
913             Z_GDU *gdu = p->response().get();
914             Z_APDU *b_apdu = gdu->u.z3950;
915             int inside_pos = jit->m_pos - jit->m_start;
916             Z_Records *records = b_apdu->u.presentResponse->records;
917
918             if (records && records->which == Z_Records_DBOSD
919                 && inside_pos < 
920                 records->u.databaseOrSurDiagnostics->num_records)
921             {
922                 nprl->records[i] = (Z_NamePlusRecord*)
923                     odr_malloc(odr, sizeof(Z_NamePlusRecord));
924                 *nprl->records[i] = *records->
925                     u.databaseOrSurDiagnostics->records[inside_pos];
926                 nprl->records[i]->databaseName =
927                     odr_strdup(odr, jit->m_backend->m_vhost.c_str());
928                 i++;
929             }
930         }
931         nprl->num_records = i; // usually same as jobs.size();
932         *f_resp->nextResultSetPosition = start + i;
933         *f_resp->numberOfRecordsReturned = i;
934     }
935     package.response() = f_apdu;
936 }
937
938 bool yf::Multi::ScanTermInfo::operator < (const ScanTermInfo &k) const
939 {
940     return m_norm_term < k.m_norm_term;
941 }
942
943 bool yf::Multi::ScanTermInfo::operator == (const ScanTermInfo &k) const
944 {
945     return m_norm_term == k.m_norm_term;
946 }
947
948 Z_Entry *yf::Multi::ScanTermInfo::get_entry(ODR odr)
949 {
950     Z_Entry *e = (Z_Entry *)odr_malloc(odr, sizeof(*e));
951     e->which = Z_Entry_termInfo;
952     Z_TermInfo *t;
953     t = e->u.termInfo = (Z_TermInfo *) odr_malloc(odr, sizeof(*t));
954     t->suggestedAttributes = 0;
955     t->displayTerm = 0;
956     t->alternativeTerm = 0;
957     t->byAttributes = 0;
958     t->otherTermInfo = 0;
959     t->globalOccurrences = odr_intdup(odr, m_count);
960     t->term = (Z_Term *)
961         odr_malloc(odr, sizeof(*t->term));
962     t->term->which = Z_Term_general;
963     Odr_oct *o;
964     t->term->u.general = o = (Odr_oct *)odr_malloc(odr, sizeof(Odr_oct));
965
966     o->len = o->size = m_norm_term.size();
967     o->buf = (unsigned char *) odr_malloc(odr, o->len);
968     memcpy(o->buf, m_norm_term.c_str(), o->len);
969     return e;
970 }
971
972 void yf::Multi::Frontend::relay_apdu(mp::Package &package, Z_APDU *apdu_req)
973 {
974     std::list<BackendPtr>::const_iterator bit;
975     for (bit = m_backend_list.begin(); bit != m_backend_list.end(); bit++)
976     {
977         PackagePtr p = (*bit)->m_package;
978         mp::odr odr;
979
980         p->request() = apdu_req;
981         p->copy_filter(package);
982     }
983     multi_move(m_backend_list);
984     for (bit = m_backend_list.begin(); bit != m_backend_list.end(); bit++)
985     {
986         PackagePtr p = (*bit)->m_package;
987
988         if (p->session().is_closed()) // if any backend closes, close frontend
989             package.session().close();
990
991         package.response() = p->response();
992     }
993 }
994
995
996 void yf::Multi::Frontend::scan(mp::Package &package, Z_APDU *apdu_req)
997 {
998     Z_ScanRequest *req = apdu_req->u.scanRequest;
999
1000     std::list<BackendPtr>::const_iterator bit;
1001     for (bit = m_backend_list.begin(); bit != m_backend_list.end(); bit++)
1002     {
1003         PackagePtr p = (*bit)->m_package;
1004         yazpp_1::GDU gdu1(apdu_req);
1005         mp::odr odr;
1006         Z_ScanRequest *req1 = gdu1.get()->u.z3950->u.scanRequest;
1007
1008         if (!mp::util::set_databases_from_zurl(odr, (*bit)->m_vhost,
1009                                                &req1->num_databaseNames,
1010                                                &req1->databaseNames))
1011         {
1012             req1->num_databaseNames = req->num_databaseNames;
1013             req1->databaseNames = req->databaseNames;
1014         }
1015         p->request() = gdu1;
1016         p->copy_filter(package);
1017     }
1018     multi_move(m_backend_list);
1019
1020     ScanTermInfoList entries_before;
1021     ScanTermInfoList entries_after;
1022     int no_before = 0;
1023     int no_after = 0;
1024
1025     for (bit = m_backend_list.begin(); bit != m_backend_list.end(); bit++)
1026     {
1027         PackagePtr p = (*bit)->m_package;
1028
1029         if (p->session().is_closed()) // if any backend closes, close frontend
1030             package.session().close();
1031
1032         Z_GDU *gdu = p->response().get();
1033         if (gdu && gdu->which == Z_GDU_Z3950 && gdu->u.z3950->which ==
1034             Z_APDU_scanResponse)
1035         {
1036             Z_ScanResponse *res = gdu->u.z3950->u.scanResponse;
1037
1038             if (res->entries && res->entries->nonsurrogateDiagnostics)
1039             {
1040                 // failure
1041                 mp::odr odr;
1042                 Z_APDU *f_apdu = odr.create_scanResponse(apdu_req, 1, 0);
1043                 Z_ScanResponse *f_res = f_apdu->u.scanResponse;
1044
1045                 f_res->entries->nonsurrogateDiagnostics =
1046                     res->entries->nonsurrogateDiagnostics;
1047                 f_res->entries->num_nonsurrogateDiagnostics =
1048                     res->entries->num_nonsurrogateDiagnostics;
1049
1050                 package.response() = f_apdu;
1051                 return;
1052             }
1053
1054             if (res->entries && res->entries->entries)
1055             {
1056                 Z_Entry **entries = res->entries->entries;
1057                 int num_entries = res->entries->num_entries;
1058                 int position = 1;
1059                 if (req->preferredPositionInResponse)
1060                     position = *req->preferredPositionInResponse;
1061                 if (res->positionOfTerm)
1062                     position = *res->positionOfTerm;
1063
1064                 // before
1065                 int i;
1066                 for (i = 0; i<position-1 && i<num_entries; i++)
1067                 {
1068                     Z_Entry *ent = entries[i];
1069
1070                     if (ent->which == Z_Entry_termInfo)
1071                     {
1072                         ScanTermInfo my;
1073
1074                         Odr_int *occur = ent->u.termInfo->globalOccurrences;
1075                         my.m_count = occur ? *occur : 0;
1076
1077                         if (ent->u.termInfo->term->which == Z_Term_general)
1078                         {
1079                             my.m_norm_term = std::string(
1080                                 (const char *)
1081                                 ent->u.termInfo->term->u.general->buf,
1082                                 ent->u.termInfo->term->u.general->len);
1083                         }
1084                         if (my.m_norm_term.length())
1085                         {
1086                             ScanTermInfoList::iterator it =
1087                                 entries_before.begin();
1088                             while (it != entries_before.end() && my <*it)
1089                                 it++;
1090                             if (it != entries_before.end() && my == *it)
1091                             {
1092                                 it->m_count += my.m_count;
1093                             }
1094                             else
1095                             {
1096                                 entries_before.insert(it, my);
1097                                 no_before++;
1098                             }
1099                         }
1100                     }
1101                 }
1102                 // after
1103                 if (position <= 0)
1104                     i = 0;
1105                 else
1106                     i = position-1;
1107                 for ( ; i<num_entries; i++)
1108                 {
1109                     Z_Entry *ent = entries[i];
1110
1111                     if (ent->which == Z_Entry_termInfo)
1112                     {
1113                         ScanTermInfo my;
1114
1115                         Odr_int *occur = ent->u.termInfo->globalOccurrences;
1116                         my.m_count = occur ? *occur : 0;
1117
1118                         if (ent->u.termInfo->term->which == Z_Term_general)
1119                         {
1120                             my.m_norm_term = std::string(
1121                                 (const char *)
1122                                 ent->u.termInfo->term->u.general->buf,
1123                                 ent->u.termInfo->term->u.general->len);
1124                         }
1125                         if (my.m_norm_term.length())
1126                         {
1127                             ScanTermInfoList::iterator it =
1128                                 entries_after.begin();
1129                             while (it != entries_after.end() && *it < my)
1130                                 it++;
1131                             if (it != entries_after.end() && my == *it)
1132                             {
1133                                 it->m_count += my.m_count;
1134                             }
1135                             else
1136                             {
1137                                 entries_after.insert(it, my);
1138                                 no_after++;
1139                             }
1140                         }
1141                     }
1142                 }
1143
1144             }
1145         }
1146         else
1147         {
1148             // if any target does not return scan response - return that
1149             package.response() = p->response();
1150             return;
1151         }
1152     }
1153
1154     if (false)
1155     {
1156         std::cout << "BEFORE\n";
1157         ScanTermInfoList::iterator it = entries_before.begin();
1158         for(; it != entries_before.end(); it++)
1159         {
1160             std::cout << " " << it->m_norm_term << " " << it->m_count << "\n";
1161         }
1162
1163         std::cout << "AFTER\n";
1164         it = entries_after.begin();
1165         for(; it != entries_after.end(); it++)
1166         {
1167             std::cout << " " << it->m_norm_term << " " << it->m_count << "\n";
1168         }
1169     }
1170
1171     if (false)
1172     {
1173         mp::odr odr;
1174         Z_APDU *f_apdu = odr.create_scanResponse(apdu_req, 1, "not implemented");
1175         package.response() = f_apdu;
1176     }
1177     else
1178     {
1179         mp::odr odr;
1180         Z_APDU *f_apdu = odr.create_scanResponse(apdu_req, 0, 0);
1181         Z_ScanResponse *resp = f_apdu->u.scanResponse;
1182
1183         int number_returned = *req->numberOfTermsRequested;
1184         int position_returned = *req->preferredPositionInResponse;
1185
1186         resp->entries->num_entries = number_returned;
1187         resp->entries->entries = (Z_Entry**)
1188             odr_malloc(odr, sizeof(Z_Entry*) * number_returned);
1189         int i;
1190
1191         int lbefore = entries_before.size();
1192         if (lbefore < position_returned-1)
1193             position_returned = lbefore+1;
1194
1195         ScanTermInfoList::iterator it = entries_before.begin();
1196         for (i = 0; i<position_returned-1 && it != entries_before.end(); i++, it++)
1197         {
1198             resp->entries->entries[position_returned-2-i] = it->get_entry(odr);
1199         }
1200
1201         it = entries_after.begin();
1202
1203         if (position_returned <= 0)
1204             i = 0;
1205         else
1206             i = position_returned-1;
1207         for (; i<number_returned && it != entries_after.end(); i++, it++)
1208         {
1209             resp->entries->entries[i] = it->get_entry(odr);
1210         }
1211
1212         number_returned = i;
1213
1214         resp->positionOfTerm = odr_intdup(odr, position_returned);
1215         resp->numberOfEntriesReturned = odr_intdup(odr, number_returned);
1216         resp->entries->num_entries = number_returned;
1217
1218         package.response() = f_apdu;
1219     }
1220 }
1221
1222
1223 void yf::Multi::process(mp::Package &package) const
1224 {
1225     FrontendPtr f = m_p->get_frontend(package);
1226
1227     Z_GDU *gdu = package.request().get();
1228
1229     if (gdu && gdu->which == Z_GDU_Z3950 && gdu->u.z3950->which ==
1230         Z_APDU_initRequest && !f->m_is_multi)
1231     {
1232         f->init(package, gdu);
1233     }
1234     else if (!f->m_is_multi)
1235         package.move();
1236     else if (gdu && gdu->which == Z_GDU_Z3950)
1237     {
1238         Z_APDU *apdu = gdu->u.z3950;
1239         if (apdu->which == Z_APDU_initRequest)
1240         {
1241             mp::odr odr;
1242
1243             package.response() = odr.create_close(
1244                 apdu,
1245                 Z_Close_protocolError,
1246                 "double init");
1247
1248             package.session().close();
1249         }
1250         else if (apdu->which == Z_APDU_searchRequest)
1251         {
1252             f->search(package, apdu);
1253         }
1254         else if (apdu->which == Z_APDU_presentRequest)
1255         {
1256             f->present(package, apdu);
1257         }
1258         else if (apdu->which == Z_APDU_scanRequest)
1259         {
1260             f->scan(package, apdu);
1261         }
1262         else if (apdu->which == Z_APDU_close)
1263         {
1264             f->relay_apdu(package, apdu);
1265         }
1266         else
1267         {
1268             mp::odr odr;
1269
1270             package.response() = odr.create_close(
1271                 apdu, Z_Close_protocolError,
1272                 "unsupported APDU in filter multi");
1273
1274             package.session().close();
1275         }
1276     }
1277     m_p->release_frontend(package);
1278 }
1279
1280 void mp::filter::Multi::configure(const xmlNode * ptr, bool test_only,
1281                                   const char *path)
1282 {
1283     for (ptr = ptr->children; ptr; ptr = ptr->next)
1284     {
1285         if (ptr->type != XML_ELEMENT_NODE)
1286             continue;
1287         if (!strcmp((const char *) ptr->name, "target"))
1288         {
1289             std::string auth;
1290             std::string route = mp::xml::get_route(ptr, auth);
1291             std::string target = mp::xml::get_text(ptr);
1292             if (target.length() == 0)
1293                 target = route;
1294             m_p->m_route_patterns.push_back(Multi::Map(target, route, auth));
1295         }
1296         else if (!strcmp((const char *) ptr->name, "hideunavailable"))
1297         {
1298             m_p->m_hide_unavailable = true;
1299         }
1300         else if (!strcmp((const char *) ptr->name, "hideerrors"))
1301         {
1302             m_p->m_hide_errors = true;
1303         }
1304         else if (!strcmp((const char *) ptr->name, "mergetype"))
1305         {
1306             std::string mergetype = mp::xml::get_text(ptr);
1307             if (mergetype == "roundrobin")
1308                 m_p->m_merge_type = round_robin;
1309             else if (mergetype == "serveorder")
1310                 m_p->m_merge_type = serve_order;
1311             else
1312                 throw mp::filter::FilterException
1313                     ("Bad mergetype "  + mergetype + " in multi filter");
1314
1315         }
1316         else
1317         {
1318             throw mp::filter::FilterException
1319                 ("Bad element "
1320                  + std::string((const char *) ptr->name)
1321                  + " in multi filter");
1322         }
1323     }
1324 }
1325
1326 static mp::filter::Base* filter_creator()
1327 {
1328     return new mp::filter::Multi;
1329 }
1330
1331 extern "C" {
1332     struct metaproxy_1_filter_struct metaproxy_1_filter_multi = {
1333         0,
1334         "multi",
1335         filter_creator
1336     };
1337 }
1338
1339
1340 /*
1341  * Local variables:
1342  * c-basic-offset: 4
1343  * c-file-style: "Stroustrup"
1344  * indent-tabs-mode: nil
1345  * End:
1346  * vim: shiftwidth=4 tabstop=8 expandtab
1347  */
1348