multi: empty route pattern uses route as default
[metaproxy-moved-to-github.git] / src / filter_multi.cpp
1 /* This file is part of Metaproxy.
2    Copyright (C) 2005-2012 Index Data
3
4 Metaproxy is free software; you can redistribute it and/or modify it under
5 the terms of the GNU General Public License as published by the Free
6 Software Foundation; either version 2, or (at your option) any later
7 version.
8
9 Metaproxy is distributed in the hope that it will be useful, but WITHOUT ANY
10 WARRANTY; without even the implied warranty of MERCHANTABILITY or
11 FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
12 for more details.
13
14 You should have received a copy of the GNU General Public License
15 along with this program; if not, write to the Free Software
16 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
17 */
18
19 #include <yaz/log.h>
20
21 #include "config.hpp"
22
23 #include <metaproxy/filter.hpp>
24 #include <metaproxy/package.hpp>
25
26 #include <boost/thread/thread.hpp>
27 #include <boost/thread/mutex.hpp>
28 #include <boost/thread/condition.hpp>
29 #include <boost/shared_ptr.hpp>
30
31 #include <metaproxy/util.hpp>
32 #include "filter_multi.hpp"
33
34 #include <yaz/zgdu.h>
35 #include <yaz/otherinfo.h>
36 #include <yaz/diagbib1.h>
37 #include <yaz/match_glob.h>
38
39 #include <vector>
40 #include <algorithm>
41 #include <map>
42 #include <iostream>
43
44 namespace mp = metaproxy_1;
45 namespace yf = mp::filter;
46
47 namespace metaproxy_1 {
48     namespace filter {
49         enum multi_merge_type {
50             round_robin,
51             serve_order
52         };
53         struct Multi::BackendSet {
54             BackendPtr m_backend;
55             int m_count;
56             bool operator < (const BackendSet &k) const;
57             bool operator == (const BackendSet &k) const;
58         };
59         struct Multi::ScanTermInfo {
60             std::string m_norm_term;
61             std::string m_display_term;
62             int m_count;
63             bool operator < (const ScanTermInfo &) const;
64             bool operator == (const ScanTermInfo &) const;
65             Z_Entry *get_entry(ODR odr);
66         };
67         struct Multi::FrontendSet {
68             class PresentJob {
69             public:
70                 BackendPtr m_backend;
71                 int m_pos; // position for backend (1=first, 2=second,..
72                 int m_start; // present request start
73                 PresentJob(BackendPtr ptr, int pos) : 
74                     m_backend(ptr), m_pos(pos), m_start(0) {};
75             };
76             FrontendSet(std::string setname);
77             FrontendSet();
78             ~FrontendSet();
79
80             void round_robin(int pos, int number, std::list<PresentJob> &job);
81             void serve_order(int pos, int number, std::list<PresentJob> &job);
82
83             std::list<BackendSet> m_backend_sets;
84             std::string m_setname;
85         };
86         struct Multi::Backend {
87             PackagePtr m_package;
88             std::string m_backend_database;
89             std::string m_vhost;
90             std::string m_route;
91             void operator() (void);  // thread operation
92         };
93         struct Multi::Frontend {
94             Frontend(Rep *rep);
95             ~Frontend();
96             bool m_is_multi;
97             bool m_in_use;
98             std::list<BackendPtr> m_backend_list;
99             std::map<std::string,Multi::FrontendSet> m_sets;
100
101             void multi_move(std::list<BackendPtr> &blist);
102             void init(Package &package, Z_GDU *gdu);
103             void close(Package &package);
104             void search(Package &package, Z_APDU *apdu);
105             void present(Package &package, Z_APDU *apdu);
106             void scan1(Package &package, Z_APDU *apdu);
107             void scan2(Package &package, Z_APDU *apdu);
108             void relay_apdu(Package &package, Z_APDU *apdu);
109             Rep *m_p;
110         };            
111         class Multi::Map {
112             std::string m_target_pattern;
113             std::string m_route;
114         public:
115             Map(std::string pattern, std::string route) : 
116                 m_target_pattern(pattern), m_route(route) {};
117             bool match(const std::string target, std::string *ret) const {
118                 if (yaz_match_glob(m_target_pattern.c_str(), target.c_str()))
119                 {
120                     *ret = m_route;
121                     return true;
122                 }
123                 return false;
124             };
125         };
126         class Multi::Rep {
127             friend class Multi;
128             friend struct Frontend;
129             
130             Rep();
131             FrontendPtr get_frontend(Package &package);
132             void release_frontend(Package &package);
133         private:
134             std::list<Multi::Map> m_route_patterns;
135             boost::mutex m_mutex;
136             boost::condition m_cond_session_ready;
137             std::map<mp::Session, FrontendPtr> m_clients;
138             bool m_hide_unavailable;
139             bool m_hide_errors;
140             multi_merge_type m_merge_type;
141         };
142     }
143 }
144
145 yf::Multi::Rep::Rep()
146 {
147     m_hide_unavailable = false;
148     m_hide_errors = false;
149     m_merge_type = round_robin;
150 }
151
152 bool yf::Multi::BackendSet::operator < (const BackendSet &k) const
153 {
154     return m_count < k.m_count;
155 }
156
157 yf::Multi::Frontend::Frontend(Rep *rep)
158 {
159     m_p = rep;
160     m_is_multi = false;
161 }
162
163 yf::Multi::Frontend::~Frontend()
164 {
165 }
166
167 yf::Multi::FrontendPtr yf::Multi::Rep::get_frontend(mp::Package &package)
168 {
169     boost::mutex::scoped_lock lock(m_mutex);
170
171     std::map<mp::Session,yf::Multi::FrontendPtr>::iterator it;
172     
173     while(true)
174     {
175         it = m_clients.find(package.session());
176         if (it == m_clients.end())
177             break;
178         
179         if (!it->second->m_in_use)
180         {
181             it->second->m_in_use = true;
182             return it->second;
183         }
184         m_cond_session_ready.wait(lock);
185     }
186     FrontendPtr f(new Frontend(this));
187     m_clients[package.session()] = f;
188     f->m_in_use = true;
189     return f;
190 }
191
192 void yf::Multi::Rep::release_frontend(mp::Package &package)
193 {
194     boost::mutex::scoped_lock lock(m_mutex);
195     std::map<mp::Session,yf::Multi::FrontendPtr>::iterator it;
196     
197     it = m_clients.find(package.session());
198     if (it != m_clients.end())
199     {
200         if (package.session().is_closed())
201         {
202             it->second->close(package);
203             m_clients.erase(it);
204         }
205         else
206         {
207             it->second->m_in_use = false;
208         }
209         m_cond_session_ready.notify_all();
210     }
211 }
212
213 yf::Multi::FrontendSet::FrontendSet(std::string setname)
214     :  m_setname(setname)
215 {
216 }
217
218
219 yf::Multi::FrontendSet::FrontendSet()
220 {
221 }
222
223
224 yf::Multi::FrontendSet::~FrontendSet()
225 {
226 }
227
228 yf::Multi::Multi() : m_p(new Multi::Rep)
229 {
230 }
231
232 yf::Multi::~Multi() {
233 }
234
235
236 void yf::Multi::Backend::operator() (void) 
237 {
238     m_package->move(m_route);
239 }
240
241
242 void yf::Multi::Frontend::close(mp::Package &package)
243 {
244     std::list<BackendPtr>::const_iterator bit;
245     for (bit = m_backend_list.begin(); bit != m_backend_list.end(); bit++)
246     {
247         BackendPtr b = *bit;
248
249         b->m_package->copy_filter(package);
250         b->m_package->request() = (Z_GDU *) 0;
251         b->m_package->session().close();
252         b->m_package->move(b->m_route);
253     }
254 }
255
256 void yf::Multi::Frontend::multi_move(std::list<BackendPtr> &blist)
257 {
258     std::list<BackendPtr>::const_iterator bit;
259     boost::thread_group g;
260     for (bit = blist.begin(); bit != blist.end(); bit++)
261     {
262         g.add_thread(new boost::thread(**bit));
263     }
264     g.join_all();
265 }
266
267 void yf::Multi::FrontendSet::serve_order(int start, int number,
268                                          std::list<PresentJob> &jobs)
269 {
270     int i;
271     for (i = 0; i < number; i++)
272     {
273         std::list<BackendSet>::const_iterator bsit;
274         int voffset = 0;
275         int offset = start + i - 1;
276         for (bsit = m_backend_sets.begin(); bsit != m_backend_sets.end(); 
277              bsit++)
278         {
279             if (offset >= voffset && offset < voffset + bsit->m_count)
280             {
281                 PresentJob job(bsit->m_backend, offset - voffset + 1);
282                 jobs.push_back(job);
283                 break;
284             }
285             voffset += bsit->m_count;
286         }
287     }
288 }
289
290 void yf::Multi::FrontendSet::round_robin(int start, int number,
291                                          std::list<PresentJob> &jobs)
292 {
293     std::list<int> pos;
294     std::list<BackendSet>::const_iterator bsit;
295     for (bsit = m_backend_sets.begin(); bsit != m_backend_sets.end(); bsit++)
296     {
297         pos.push_back(1);
298     }
299
300     int p = 1;
301 #if 1
302     // optimization step!
303     int omin = 0;
304     while(true)
305     {
306         int min = 0;
307         int no_left = 0;
308         // find min count for each set which is > omin
309         for (bsit = m_backend_sets.begin(); bsit != m_backend_sets.end(); bsit++)
310         {
311             if (bsit->m_count > omin)
312             {
313                 if (no_left == 0 || bsit->m_count < min)
314                     min = bsit->m_count;
315                 no_left++;
316             }
317         }
318         if (no_left == 0) // if nothing greater than omin, bail out.
319             break;
320         int skip = no_left * min;
321         if (p + skip > start)  // step gets us "into" present range?
322         {
323             // Yes. skip until start.. Rounding off is deliberate!
324             min = (start-p) / no_left;
325             p += no_left * min;
326             
327             // update positions in each set..
328             std::list<int>::iterator psit = pos.begin();
329             for (psit = pos.begin(); psit != pos.end(); psit++)
330                 *psit += min;
331             break;
332         }
333         // skip on each set.. before "present range"..
334         p = p + skip;
335         
336         std::list<int>::iterator psit = pos.begin();
337         for (psit = pos.begin(); psit != pos.end(); psit++)
338             *psit += min;
339         
340         omin = min; // update so we consider next class (with higher count)
341     }
342 #endif
343     int fetched = 0;
344     bool more = true;
345     while (more)
346     {
347         more = false;
348         std::list<int>::iterator psit = pos.begin();
349         bsit = m_backend_sets.begin();
350
351         for (; bsit != m_backend_sets.end(); psit++,bsit++)
352         {
353             if (fetched >= number)
354             {
355                 more = false;
356                 break;
357             }
358             if (*psit <= bsit->m_count)
359             {
360                 if (p >= start)
361                 {
362                     PresentJob job(bsit->m_backend, *psit);
363                     jobs.push_back(job);
364                     fetched++;
365                 }
366                 (*psit)++;
367                 p++;
368                 more = true;
369             }
370         }
371     }
372 }
373
374 void yf::Multi::Frontend::init(mp::Package &package, Z_GDU *gdu)
375 {
376     Z_InitRequest *req = gdu->u.z3950->u.initRequest;
377
378     std::list<std::string> targets;
379
380     mp::util::get_vhost_otherinfo(req->otherInfo, targets);
381
382     if (targets.size() < 1)
383     {
384         package.move();
385         return;
386     }
387
388     std::list<std::string>::const_iterator t_it = targets.begin();
389     for (; t_it != targets.end(); t_it++)
390     {
391         Session s;
392         Backend *b = new Backend;
393         b->m_vhost = *t_it;
394
395         std::list<Multi::Map>::const_iterator it =
396             m_p->m_route_patterns.begin();
397         while (it != m_p->m_route_patterns.end()) {
398             if (it->match(*t_it, &b->m_route))
399                 break;
400             it++;
401         }
402         // b->m_route = m_p->m_target_route[*t_it];
403         // b->m_route unset
404         b->m_package = PackagePtr(new Package(s, package.origin()));
405
406         m_backend_list.push_back(BackendPtr(b));
407     }
408     m_is_multi = true;
409
410     // create init request 
411     std::list<BackendPtr>::iterator bit;
412     for (bit = m_backend_list.begin(); bit != m_backend_list.end(); bit++)
413     {
414         mp::odr odr;
415         BackendPtr b = *bit;
416         Z_APDU *init_apdu = zget_APDU(odr, Z_APDU_initRequest);
417         
418         std::list<std::string>vhost_one;
419         vhost_one.push_back(b->m_vhost);
420         mp::util::set_vhost_otherinfo(&init_apdu->u.initRequest->otherInfo,
421                                        odr, vhost_one);
422
423
424         Z_InitRequest *breq = init_apdu->u.initRequest;
425
426         breq->idAuthentication = req->idAuthentication;
427         
428         *breq->preferredMessageSize = *req->preferredMessageSize;
429         *breq->maximumRecordSize = *req->maximumRecordSize;
430
431         ODR_MASK_SET(breq->options, Z_Options_search);
432         ODR_MASK_SET(breq->options, Z_Options_present);
433         ODR_MASK_SET(breq->options, Z_Options_namedResultSets);
434         ODR_MASK_SET(breq->options, Z_Options_scan);
435         
436         ODR_MASK_SET(breq->protocolVersion, Z_ProtocolVersion_1);
437         ODR_MASK_SET(breq->protocolVersion, Z_ProtocolVersion_2);
438         ODR_MASK_SET(breq->protocolVersion, Z_ProtocolVersion_3);
439         
440         b->m_package->request() = init_apdu;
441
442         b->m_package->copy_filter(package);
443     }
444     multi_move(m_backend_list);
445
446     // create the frontend init response based on each backend init response
447     mp::odr odr;
448
449     Z_APDU *f_apdu = odr.create_initResponse(gdu->u.z3950, 0, 0);
450     Z_InitResponse *f_resp = f_apdu->u.initResponse;
451
452     ODR_MASK_SET(f_resp->options, Z_Options_search);
453     ODR_MASK_SET(f_resp->options, Z_Options_present);
454     ODR_MASK_SET(f_resp->options, Z_Options_namedResultSets);
455     ODR_MASK_SET(f_resp->options, Z_Options_scan);
456     
457     ODR_MASK_SET(f_resp->protocolVersion, Z_ProtocolVersion_1);
458     ODR_MASK_SET(f_resp->protocolVersion, Z_ProtocolVersion_2);
459     ODR_MASK_SET(f_resp->protocolVersion, Z_ProtocolVersion_3);
460
461     int no_failed = 0;
462     int no_succeeded = 0;
463
464     Odr_int preferredMessageSize = *req->preferredMessageSize;
465     Odr_int maximumRecordSize = *req->maximumRecordSize;
466     for (bit = m_backend_list.begin(); bit != m_backend_list.end(); )
467     {
468         PackagePtr p = (*bit)->m_package;
469         
470         if (p->session().is_closed())
471         {
472             // failed. Remove from list and increment number of failed
473             no_failed++;
474             bit = m_backend_list.erase(bit);
475             continue;
476         }
477         Z_GDU *gdu = p->response().get();
478         if (gdu && gdu->which == Z_GDU_Z3950 && gdu->u.z3950->which ==
479             Z_APDU_initResponse)
480         {
481             int i;
482             Z_APDU *b_apdu = gdu->u.z3950;
483             Z_InitResponse *b_resp = b_apdu->u.initResponse;
484
485             // common options for all backends
486             for (i = 0; i <= Z_Options_stringSchema; i++)
487             {
488                 if (!ODR_MASK_GET(b_resp->options, i))
489                     ODR_MASK_CLEAR(f_resp->options, i);
490             }
491             // common protocol version
492             for (i = 0; i <= Z_ProtocolVersion_3; i++)
493                 if (!ODR_MASK_GET(b_resp->protocolVersion, i))
494                     ODR_MASK_CLEAR(f_resp->protocolVersion, i);
495             if (*b_resp->result)
496             {
497                 no_succeeded++;
498                 if (preferredMessageSize > *b_resp->preferredMessageSize)
499                     preferredMessageSize = *b_resp->preferredMessageSize;
500                 if (maximumRecordSize > *b_resp->maximumRecordSize)
501                     maximumRecordSize = *b_resp->maximumRecordSize;
502             }
503             else
504                 no_failed++;
505         }
506         else
507             no_failed++;
508         bit++;
509     }
510     *f_resp->preferredMessageSize = preferredMessageSize;
511     *f_resp->maximumRecordSize = maximumRecordSize;
512
513     if (m_p->m_hide_unavailable)
514     {
515         if (no_succeeded == 0)
516         {
517             *f_resp->result = 0;
518             package.session().close();
519         }
520     }
521     else
522     {
523         if (no_failed)
524         {
525             *f_resp->result = 0;
526             package.session().close();
527         }
528     }
529     package.response() = f_apdu;
530 }
531
532 void yf::Multi::Frontend::search(mp::Package &package, Z_APDU *apdu_req)
533 {
534     // create search request 
535     Z_SearchRequest *req = apdu_req->u.searchRequest;
536
537     // save these for later
538     Odr_int smallSetUpperBound = *req->smallSetUpperBound;
539     Odr_int largeSetLowerBound = *req->largeSetLowerBound;
540     Odr_int mediumSetPresentNumber = *req->mediumSetPresentNumber;
541     
542     // they are altered now - to disable piggyback
543     *req->smallSetUpperBound = 0;
544     *req->largeSetLowerBound = 1;
545     *req->mediumSetPresentNumber = 1;
546
547     int default_num_db = req->num_databaseNames;
548     char **default_db = req->databaseNames;
549
550     std::list<BackendPtr>::const_iterator bit;
551     for (bit = m_backend_list.begin(); bit != m_backend_list.end(); bit++)
552     {
553         PackagePtr p = (*bit)->m_package;
554         mp::odr odr;
555     
556         if (!mp::util::set_databases_from_zurl(odr, (*bit)->m_vhost,
557                                                 &req->num_databaseNames,
558                                                 &req->databaseNames))
559         {
560             req->num_databaseNames = default_num_db;
561             req->databaseNames = default_db;
562         }
563         p->request() = apdu_req;
564         p->copy_filter(package);
565     }
566     multi_move(m_backend_list);
567
568     // look at each response
569     FrontendSet resultSet(std::string(req->resultSetName));
570
571     mp::odr odr;
572     Odr_int result_set_size = 0;
573     Z_DiagRecs *z_diag = 0;
574     int no_successful = 0;
575     PackagePtr close_p;
576
577     for (bit = m_backend_list.begin(); bit != m_backend_list.end(); bit++)
578     {
579         PackagePtr p = (*bit)->m_package;
580         
581         // save closing package for at least one target
582         if (p->session().is_closed()) 
583             close_p = p;
584
585         Z_GDU *gdu = p->response().get();
586         if (gdu && gdu->which == Z_GDU_Z3950 && gdu->u.z3950->which ==
587             Z_APDU_searchResponse)
588         {
589             Z_APDU *b_apdu = gdu->u.z3950;
590             Z_SearchResponse *b_resp = b_apdu->u.searchResponse;
591             
592             // see we get any errors (AKA diagnstics)
593             if (b_resp->records)
594             {
595                 if (b_resp->records->which == Z_Records_NSD)
596                 {
597                     if (!z_diag)
598                     {
599                         z_diag = (Z_DiagRecs *)
600                             odr_malloc(odr, sizeof(*z_diag));
601                         z_diag->num_diagRecs = 0;
602                         z_diag->diagRecs = (Z_DiagRec**)
603                             odr_malloc(odr, sizeof(*z_diag->diagRecs));
604                     }
605                     else
606                     {
607                         Z_DiagRec **n = (Z_DiagRec **)
608                             odr_malloc(odr,
609                                        (1+z_diag->num_diagRecs) * sizeof(*n));
610                         memcpy(n, z_diag->diagRecs, z_diag->num_diagRecs
611                                * sizeof(*n));
612                         z_diag->diagRecs = n;
613                     }
614                     Z_DiagRec *nr = (Z_DiagRec *) odr_malloc(odr, sizeof(*nr));
615                     nr->which = Z_DiagRec_defaultFormat;
616                     nr->u.defaultFormat =
617                         b_resp->records->u.nonSurrogateDiagnostic;
618                     z_diag->diagRecs[z_diag->num_diagRecs++] = nr;
619                 }
620                 else if (b_resp->records->which == Z_Records_multipleNSD)
621                 {
622                     // we may set this multiple times (TOO BAD!)
623                     z_diag = b_resp->records->u.multipleNonSurDiagnostics;
624                 }
625                 else
626                     no_successful++; // probably piggyback
627             }
628             else
629                 no_successful++;  // no records and no diagnostics
630             BackendSet backendSet;
631             backendSet.m_backend = *bit;
632             backendSet.m_count = *b_resp->resultCount;
633             result_set_size += *b_resp->resultCount;
634             resultSet.m_backend_sets.push_back(backendSet);
635         }
636     }
637
638     Z_APDU *f_apdu = odr.create_searchResponse(apdu_req, 0, 0);
639     Z_SearchResponse *f_resp = f_apdu->u.searchResponse;
640
641     yaz_log(YLOG_LOG, "no_successful=%d is_closed=%s hide_errors=%s",
642             no_successful,
643             close_p ? "true" : "false",
644             m_p->m_hide_errors ? "true" : "false");
645     *f_resp->resultCount = result_set_size;
646     if (close_p && (no_successful == 0 || !m_p->m_hide_errors))
647     {
648         package.session().close();
649         package.response() = close_p->response();
650         return;
651     }
652     if (z_diag && (no_successful == 0 || !m_p->m_hide_errors))
653     {
654         f_resp->records = (Z_Records *)
655             odr_malloc(odr, sizeof(*f_resp->records));
656         if (z_diag->num_diagRecs > 1)
657         {
658             f_resp->records->which = Z_Records_multipleNSD;
659             f_resp->records->u.multipleNonSurDiagnostics = z_diag;
660         }
661         else
662         {
663             f_resp->records->which = Z_Records_NSD;
664             f_resp->records->u.nonSurrogateDiagnostic =
665                 z_diag->diagRecs[0]->u.defaultFormat;
666         }
667     }
668     // assume OK
669     m_sets[resultSet.m_setname] = resultSet;
670
671     Odr_int number;
672     mp::util::piggyback(smallSetUpperBound,
673                         largeSetLowerBound,
674                         mediumSetPresentNumber,
675                         0, 0,
676                         result_set_size,
677                         number, 0);
678     Package pp(package.session(), package.origin());
679     if (z_diag == 0 && number > 0)
680     {
681         pp.copy_filter(package);
682         Z_APDU *p_apdu = zget_APDU(odr, Z_APDU_presentRequest);
683         Z_PresentRequest *p_req = p_apdu->u.presentRequest;
684         p_req->preferredRecordSyntax = req->preferredRecordSyntax;
685         p_req->resultSetId = req->resultSetName;
686         *p_req->resultSetStartPoint = 1;
687         *p_req->numberOfRecordsRequested = number;
688         pp.request() = p_apdu;
689         present(pp, p_apdu);
690         
691         if (pp.session().is_closed())
692             package.session().close();
693         
694         Z_GDU *gdu = pp.response().get();
695         if (gdu && gdu->which == Z_GDU_Z3950 && gdu->u.z3950->which ==
696             Z_APDU_presentResponse)
697         {
698             Z_PresentResponse *p_res = gdu->u.z3950->u.presentResponse;
699             f_resp->records = p_res->records;
700             *f_resp->numberOfRecordsReturned = 
701                 *p_res->numberOfRecordsReturned;
702             *f_resp->nextResultSetPosition = 
703                 *p_res->nextResultSetPosition;
704         }
705         else 
706         {
707             package.response() = pp.response(); 
708             return;
709         }
710     }
711     package.response() = f_apdu; // in this scope because of p
712 }
713
714 void yf::Multi::Frontend::present(mp::Package &package, Z_APDU *apdu_req)
715 {
716     // create present request 
717     Z_PresentRequest *req = apdu_req->u.presentRequest;
718
719     Sets_it it;
720     it = m_sets.find(std::string(req->resultSetId));
721     if (it == m_sets.end())
722     {
723         mp::odr odr;
724         Z_APDU *apdu = 
725             odr.create_presentResponse(
726                 apdu_req,
727                 YAZ_BIB1_SPECIFIED_RESULT_SET_DOES_NOT_EXIST,
728                 req->resultSetId);
729         package.response() = apdu;
730         return;
731     }
732     std::list<Multi::FrontendSet::PresentJob> jobs;
733     int start = *req->resultSetStartPoint;
734     int number = *req->numberOfRecordsRequested;
735
736     if (m_p->m_merge_type == round_robin)
737         it->second.round_robin(start, number, jobs);
738     else if (m_p->m_merge_type == serve_order)
739         it->second.serve_order(start, number, jobs);
740
741     if (0)
742     {
743         std::list<Multi::FrontendSet::PresentJob>::const_iterator jit;
744         for (jit = jobs.begin(); jit != jobs.end(); jit++)
745         {
746             yaz_log(YLOG_LOG, "job pos=%d", jit->m_pos);
747         }
748     }
749
750     std::list<BackendPtr> present_backend_list;
751
752     std::list<BackendSet>::const_iterator bsit;
753     bsit = it->second.m_backend_sets.begin();
754     for (; bsit != it->second.m_backend_sets.end(); bsit++)
755     {
756         int start = -1;
757         int end = -1;
758         {
759             std::list<Multi::FrontendSet::PresentJob>::const_iterator jit;
760             for (jit = jobs.begin(); jit != jobs.end(); jit++)
761             {
762                 if (jit->m_backend == bsit->m_backend)
763                 {
764                     if (start == -1 || jit->m_pos < start)
765                         start = jit->m_pos;
766                     if (end == -1 || jit->m_pos > end)
767                         end = jit->m_pos;
768                 }
769             }
770         }
771         if (start != -1)
772         {
773             std::list<Multi::FrontendSet::PresentJob>::iterator jit;
774             for (jit = jobs.begin(); jit != jobs.end(); jit++)
775             {
776                 if (jit->m_backend == bsit->m_backend)
777                 {
778                     if (jit->m_pos >= start && jit->m_pos <= end)
779                         jit->m_start = start;
780                 }
781             }
782
783             PackagePtr p = bsit->m_backend->m_package;
784
785             *req->resultSetStartPoint = start;
786             *req->numberOfRecordsRequested = end - start + 1;
787             
788             p->request() = apdu_req;
789             p->copy_filter(package);
790
791             present_backend_list.push_back(bsit->m_backend);
792         }
793     }
794     multi_move(present_backend_list);
795
796     // look at each response
797     Z_Records *z_records_diag = 0;
798
799     std::list<BackendPtr>::const_iterator pbit = present_backend_list.begin();
800     for (; pbit != present_backend_list.end(); pbit++)
801     {
802         PackagePtr p = (*pbit)->m_package;
803         
804         if (p->session().is_closed()) // if any backend closes, close frontend
805             package.session().close();
806         
807         Z_GDU *gdu = p->response().get();
808         if (gdu && gdu->which == Z_GDU_Z3950 && gdu->u.z3950->which ==
809             Z_APDU_presentResponse)
810         {
811             Z_APDU *b_apdu = gdu->u.z3950;
812             Z_PresentResponse *b_resp = b_apdu->u.presentResponse;
813          
814             // see we get any errors (AKA diagnstics)
815             if (b_resp->records)
816             {
817                 if (b_resp->records->which != Z_Records_DBOSD)
818                     z_records_diag = b_resp->records;
819                 // we may set this multiple times (TOO BAD!)
820             }
821         }
822         else
823         {
824             // if any target does not return present response - return that 
825             package.response() = p->response();
826             return;
827         }
828     }
829
830     mp::odr odr;
831     Z_APDU *f_apdu = odr.create_presentResponse(apdu_req, 0, 0);
832     Z_PresentResponse *f_resp = f_apdu->u.presentResponse;
833
834     if (z_records_diag)
835     {
836         f_resp->records = z_records_diag;
837         *f_resp->presentStatus = Z_PresentStatus_failure;
838     }
839     else if (number < 0 || (size_t) number > jobs.size())
840     {
841         f_apdu = 
842             odr.create_presentResponse(
843                 apdu_req,
844                 YAZ_BIB1_PRESENT_REQUEST_OUT_OF_RANGE,
845                 0);
846     }
847     else
848     {
849         f_resp->records = (Z_Records *) odr_malloc(odr, sizeof(Z_Records));
850         Z_Records * records = f_resp->records;
851         records->which = Z_Records_DBOSD;
852         records->u.databaseOrSurDiagnostics =
853             (Z_NamePlusRecordList *)
854             odr_malloc(odr, sizeof(Z_NamePlusRecordList));
855         Z_NamePlusRecordList *nprl = records->u.databaseOrSurDiagnostics;
856         nprl->num_records = jobs.size();
857         nprl->records = (Z_NamePlusRecord**)
858             odr_malloc(odr, sizeof(Z_NamePlusRecord *) * nprl->num_records);
859         int i = 0;
860         std::list<Multi::FrontendSet::PresentJob>::const_iterator jit;
861         for (jit = jobs.begin(); jit != jobs.end(); jit++, i++)
862         {
863             PackagePtr p = jit->m_backend->m_package;
864             
865             Z_GDU *gdu = p->response().get();
866             Z_APDU *b_apdu = gdu->u.z3950;
867             Z_PresentResponse *b_resp = b_apdu->u.presentResponse;
868
869             nprl->records[i] = (Z_NamePlusRecord*)
870                 odr_malloc(odr, sizeof(Z_NamePlusRecord));
871             int inside_pos = jit->m_pos - jit->m_start;
872             if (inside_pos >= b_resp->records->
873                 u.databaseOrSurDiagnostics->num_records)
874                 break;
875             *nprl->records[i] = *b_resp->records->
876                 u.databaseOrSurDiagnostics->records[inside_pos];
877             nprl->records[i]->databaseName =
878                     odr_strdup(odr, jit->m_backend->m_vhost.c_str());
879         }
880         nprl->num_records = i; // usually same as jobs.size();
881         *f_resp->nextResultSetPosition = start + i;
882         *f_resp->numberOfRecordsReturned = i;
883     }
884     package.response() = f_apdu;
885 }
886
887 void yf::Multi::Frontend::scan1(mp::Package &package, Z_APDU *apdu_req)
888 {
889     if (m_backend_list.size() > 1)
890     {
891         mp::odr odr;
892         Z_APDU *f_apdu = 
893             odr.create_scanResponse(
894                 apdu_req, YAZ_BIB1_COMBI_OF_SPECIFIED_DATABASES_UNSUPP, 0);
895         package.response() = f_apdu;
896         return;
897     }
898     Z_ScanRequest *req = apdu_req->u.scanRequest;
899
900     int default_num_db = req->num_databaseNames;
901     char **default_db = req->databaseNames;
902
903     std::list<BackendPtr>::const_iterator bit;
904     for (bit = m_backend_list.begin(); bit != m_backend_list.end(); bit++)
905     {
906         PackagePtr p = (*bit)->m_package;
907         mp::odr odr;
908     
909         if (!mp::util::set_databases_from_zurl(odr, (*bit)->m_vhost,
910                                                 &req->num_databaseNames,
911                                                 &req->databaseNames))
912         {
913             req->num_databaseNames = default_num_db;
914             req->databaseNames = default_db;
915         }
916         p->request() = apdu_req;
917         p->copy_filter(package);
918     }
919     multi_move(m_backend_list);
920
921     for (bit = m_backend_list.begin(); bit != m_backend_list.end(); bit++)
922     {
923         PackagePtr p = (*bit)->m_package;
924         
925         if (p->session().is_closed()) // if any backend closes, close frontend
926             package.session().close();
927         
928         Z_GDU *gdu = p->response().get();
929         if (gdu && gdu->which == Z_GDU_Z3950 && gdu->u.z3950->which ==
930             Z_APDU_scanResponse)
931         {
932             package.response() = p->response();
933             break;
934         }
935         else
936         {
937             // if any target does not return scan response - return that 
938             package.response() = p->response();
939             return;
940         }
941     }
942 }
943
944 bool yf::Multi::ScanTermInfo::operator < (const ScanTermInfo &k) const
945 {
946     return m_norm_term < k.m_norm_term;
947 }
948
949 bool yf::Multi::ScanTermInfo::operator == (const ScanTermInfo &k) const
950 {
951     return m_norm_term == k.m_norm_term;
952 }
953
954 Z_Entry *yf::Multi::ScanTermInfo::get_entry(ODR odr)
955 {
956     Z_Entry *e = (Z_Entry *)odr_malloc(odr, sizeof(*e));
957     e->which = Z_Entry_termInfo;
958     Z_TermInfo *t;
959     t = e->u.termInfo = (Z_TermInfo *) odr_malloc(odr, sizeof(*t));
960     t->suggestedAttributes = 0;
961     t->displayTerm = 0;
962     t->alternativeTerm = 0;
963     t->byAttributes = 0;
964     t->otherTermInfo = 0;
965     t->globalOccurrences = odr_intdup(odr, m_count);
966     t->term = (Z_Term *)
967         odr_malloc(odr, sizeof(*t->term));
968     t->term->which = Z_Term_general;
969     Odr_oct *o;
970     t->term->u.general = o = (Odr_oct *)odr_malloc(odr, sizeof(Odr_oct));
971
972     o->len = o->size = m_norm_term.size();
973     o->buf = (unsigned char *) odr_malloc(odr, o->len);
974     memcpy(o->buf, m_norm_term.c_str(), o->len);
975     return e;
976 }
977
978 void yf::Multi::Frontend::relay_apdu(mp::Package &package, Z_APDU *apdu_req)
979 {
980     std::list<BackendPtr>::const_iterator bit;
981     for (bit = m_backend_list.begin(); bit != m_backend_list.end(); bit++)
982     {
983         PackagePtr p = (*bit)->m_package;
984         mp::odr odr;
985     
986         p->request() = apdu_req;
987         p->copy_filter(package);
988     }
989     multi_move(m_backend_list);
990     for (bit = m_backend_list.begin(); bit != m_backend_list.end(); bit++)
991     {
992         PackagePtr p = (*bit)->m_package;
993         
994         if (p->session().is_closed()) // if any backend closes, close frontend
995             package.session().close();
996         
997         package.response() = p->response();
998     }
999 }
1000
1001
1002 void yf::Multi::Frontend::scan2(mp::Package &package, Z_APDU *apdu_req)
1003 {
1004     Z_ScanRequest *req = apdu_req->u.scanRequest;
1005
1006     int default_num_db = req->num_databaseNames;
1007     char **default_db = req->databaseNames;
1008
1009     std::list<BackendPtr>::const_iterator bit;
1010     for (bit = m_backend_list.begin(); bit != m_backend_list.end(); bit++)
1011     {
1012         PackagePtr p = (*bit)->m_package;
1013         mp::odr odr;
1014     
1015         if (!mp::util::set_databases_from_zurl(odr, (*bit)->m_vhost,
1016                                                 &req->num_databaseNames,
1017                                                 &req->databaseNames))
1018         {
1019             req->num_databaseNames = default_num_db;
1020             req->databaseNames = default_db;
1021         }
1022         p->request() = apdu_req;
1023         p->copy_filter(package);
1024     }
1025     multi_move(m_backend_list);
1026
1027     ScanTermInfoList entries_before;
1028     ScanTermInfoList entries_after;
1029     int no_before = 0;
1030     int no_after = 0;
1031
1032     for (bit = m_backend_list.begin(); bit != m_backend_list.end(); bit++)
1033     {
1034         PackagePtr p = (*bit)->m_package;
1035         
1036         if (p->session().is_closed()) // if any backend closes, close frontend
1037             package.session().close();
1038         
1039         Z_GDU *gdu = p->response().get();
1040         if (gdu && gdu->which == Z_GDU_Z3950 && gdu->u.z3950->which ==
1041             Z_APDU_scanResponse)
1042         {
1043             Z_ScanResponse *res = gdu->u.z3950->u.scanResponse;
1044
1045             if (res->entries && res->entries->nonsurrogateDiagnostics)
1046             {
1047                 // failure
1048                 mp::odr odr;
1049                 Z_APDU *f_apdu = odr.create_scanResponse(apdu_req, 1, 0);
1050                 Z_ScanResponse *f_res = f_apdu->u.scanResponse;
1051
1052                 f_res->entries->nonsurrogateDiagnostics = 
1053                     res->entries->nonsurrogateDiagnostics;
1054                 f_res->entries->num_nonsurrogateDiagnostics = 
1055                     res->entries->num_nonsurrogateDiagnostics;
1056
1057                 package.response() = f_apdu;
1058                 return;
1059             }
1060
1061             if (res->entries && res->entries->entries)
1062             {
1063                 Z_Entry **entries = res->entries->entries;
1064                 int num_entries = res->entries->num_entries;
1065                 int position = 1;
1066                 if (req->preferredPositionInResponse)
1067                     position = *req->preferredPositionInResponse;
1068                 if (res->positionOfTerm)
1069                     position = *res->positionOfTerm;
1070
1071                 // before
1072                 int i;
1073                 for (i = 0; i<position-1 && i<num_entries; i++)
1074                 {
1075                     Z_Entry *ent = entries[i];
1076
1077                     if (ent->which == Z_Entry_termInfo)
1078                     {
1079                         ScanTermInfo my;
1080
1081                         Odr_int *occur = ent->u.termInfo->globalOccurrences;
1082                         my.m_count = occur ? *occur : 0;
1083
1084                         if (ent->u.termInfo->term->which == Z_Term_general)
1085                         {
1086                             my.m_norm_term = std::string(
1087                                 (const char *)
1088                                 ent->u.termInfo->term->u.general->buf,
1089                                 ent->u.termInfo->term->u.general->len);
1090                         }
1091                         if (my.m_norm_term.length())
1092                         {
1093                             ScanTermInfoList::iterator it = 
1094                                 entries_before.begin();
1095                             while (it != entries_before.end() && my <*it)
1096                                 it++;
1097                             if (my == *it)
1098                             {
1099                                 it->m_count += my.m_count;
1100                             }
1101                             else
1102                             {
1103                                 entries_before.insert(it, my);
1104                                 no_before++;
1105                             }
1106                         }
1107                     }
1108                 }
1109                 // after
1110                 if (position <= 0)
1111                     i = 0;
1112                 else
1113                     i = position-1;
1114                 for ( ; i<num_entries; i++)
1115                 {
1116                     Z_Entry *ent = entries[i];
1117
1118                     if (ent->which == Z_Entry_termInfo)
1119                     {
1120                         ScanTermInfo my;
1121
1122                         Odr_int *occur = ent->u.termInfo->globalOccurrences;
1123                         my.m_count = occur ? *occur : 0;
1124
1125                         if (ent->u.termInfo->term->which == Z_Term_general)
1126                         {
1127                             my.m_norm_term = std::string(
1128                                 (const char *)
1129                                 ent->u.termInfo->term->u.general->buf,
1130                                 ent->u.termInfo->term->u.general->len);
1131                         }
1132                         if (my.m_norm_term.length())
1133                         {
1134                             ScanTermInfoList::iterator it = 
1135                                 entries_after.begin();
1136                             while (it != entries_after.end() && *it < my)
1137                                 it++;
1138                             if (my == *it)
1139                             {
1140                                 it->m_count += my.m_count;
1141                             }
1142                             else
1143                             {
1144                                 entries_after.insert(it, my);
1145                                 no_after++;
1146                             }
1147                         }
1148                     }
1149                 }
1150
1151             }                
1152         }
1153         else
1154         {
1155             // if any target does not return scan response - return that 
1156             package.response() = p->response();
1157             return;
1158         }
1159     }
1160
1161     if (false)
1162     {
1163         std::cout << "BEFORE\n";
1164         ScanTermInfoList::iterator it = entries_before.begin();
1165         for(; it != entries_before.end(); it++)
1166         {
1167             std::cout << " " << it->m_norm_term << " " << it->m_count << "\n";
1168         }
1169         
1170         std::cout << "AFTER\n";
1171         it = entries_after.begin();
1172         for(; it != entries_after.end(); it++)
1173         {
1174             std::cout << " " << it->m_norm_term << " " << it->m_count << "\n";
1175         }
1176     }
1177
1178     if (false)
1179     {
1180         mp::odr odr;
1181         Z_APDU *f_apdu = odr.create_scanResponse(apdu_req, 1, "not implemented");
1182         package.response() = f_apdu;
1183     }
1184     else
1185     {
1186         mp::odr odr;
1187         Z_APDU *f_apdu = odr.create_scanResponse(apdu_req, 0, 0);
1188         Z_ScanResponse *resp = f_apdu->u.scanResponse;
1189         
1190         int number_returned = *req->numberOfTermsRequested;
1191         int position_returned = *req->preferredPositionInResponse;
1192         
1193         resp->entries->num_entries = number_returned;
1194         resp->entries->entries = (Z_Entry**)
1195             odr_malloc(odr, sizeof(Z_Entry*) * number_returned);
1196         int i;
1197
1198         int lbefore = entries_before.size();
1199         if (lbefore < position_returned-1)
1200             position_returned = lbefore+1;
1201
1202         ScanTermInfoList::iterator it = entries_before.begin();
1203         for (i = 0; i<position_returned-1 && it != entries_before.end(); i++, it++)
1204         {
1205             resp->entries->entries[position_returned-2-i] = it->get_entry(odr);
1206         }
1207
1208         it = entries_after.begin();
1209
1210         if (position_returned <= 0)
1211             i = 0;
1212         else
1213             i = position_returned-1;
1214         for (; i<number_returned && it != entries_after.end(); i++, it++)
1215         {
1216             resp->entries->entries[i] = it->get_entry(odr);
1217         }
1218
1219         number_returned = i;
1220
1221         resp->positionOfTerm = odr_intdup(odr, position_returned);
1222         resp->numberOfEntriesReturned = odr_intdup(odr, number_returned);
1223         resp->entries->num_entries = number_returned;
1224
1225         package.response() = f_apdu;
1226     }
1227 }
1228
1229
1230 void yf::Multi::process(mp::Package &package) const
1231 {
1232     FrontendPtr f = m_p->get_frontend(package);
1233
1234     Z_GDU *gdu = package.request().get();
1235     
1236     if (gdu && gdu->which == Z_GDU_Z3950 && gdu->u.z3950->which ==
1237         Z_APDU_initRequest && !f->m_is_multi)
1238     {
1239         f->init(package, gdu);
1240     }
1241     else if (!f->m_is_multi)
1242         package.move();
1243     else if (gdu && gdu->which == Z_GDU_Z3950)
1244     {
1245         Z_APDU *apdu = gdu->u.z3950;
1246         if (apdu->which == Z_APDU_initRequest)
1247         {
1248             mp::odr odr;
1249             
1250             package.response() = odr.create_close(
1251                 apdu,
1252                 Z_Close_protocolError,
1253                 "double init");
1254             
1255             package.session().close();
1256         }
1257         else if (apdu->which == Z_APDU_searchRequest)
1258         {
1259             f->search(package, apdu);
1260         }
1261         else if (apdu->which == Z_APDU_presentRequest)
1262         {
1263             f->present(package, apdu);
1264         }
1265         else if (apdu->which == Z_APDU_scanRequest)
1266         {
1267             f->scan2(package, apdu);
1268         }
1269         else if (apdu->which == Z_APDU_close)
1270         {
1271             f->relay_apdu(package, apdu);
1272         }
1273         else
1274         {
1275             mp::odr odr;
1276             
1277             package.response() = odr.create_close(
1278                 apdu, Z_Close_protocolError,
1279                 "unsupported APDU in filter multi");
1280             
1281             package.session().close();
1282         }
1283     }
1284     m_p->release_frontend(package);
1285 }
1286
1287 void mp::filter::Multi::configure(const xmlNode * ptr, bool test_only,
1288                                   const char *path)
1289 {
1290     for (ptr = ptr->children; ptr; ptr = ptr->next)
1291     {
1292         if (ptr->type != XML_ELEMENT_NODE)
1293             continue;
1294         if (!strcmp((const char *) ptr->name, "target"))
1295         {
1296             std::string route = mp::xml::get_route(ptr);
1297             std::string target = mp::xml::get_text(ptr);
1298             if (target.length() == 0)
1299                 target = route;
1300             m_p->m_route_patterns.push_back(Multi::Map(target, route));
1301         }
1302         else if (!strcmp((const char *) ptr->name, "hideunavailable"))
1303         {
1304             m_p->m_hide_unavailable = true;
1305         }
1306         else if (!strcmp((const char *) ptr->name, "hideerrors"))
1307         {
1308             m_p->m_hide_errors = true;
1309         }
1310         else if (!strcmp((const char *) ptr->name, "mergetype"))
1311         {
1312             std::string mergetype = mp::xml::get_text(ptr);
1313             if (mergetype == "roundrobin")
1314                 m_p->m_merge_type = round_robin;
1315             else if (mergetype == "serveorder")
1316                 m_p->m_merge_type = serve_order;
1317             else
1318                 throw mp::filter::FilterException
1319                     ("Bad mergetype "  + mergetype + " in multi filter");
1320
1321         }
1322         else
1323         {
1324             throw mp::filter::FilterException
1325                 ("Bad element " 
1326                  + std::string((const char *) ptr->name)
1327                  + " in multi filter");
1328         }
1329     }
1330 }
1331
1332 static mp::filter::Base* filter_creator()
1333 {
1334     return new mp::filter::Multi;
1335 }
1336
1337 extern "C" {
1338     struct metaproxy_1_filter_struct metaproxy_1_filter_multi = {
1339         0,
1340         "multi",
1341         filter_creator
1342     };
1343 }
1344
1345
1346 /*
1347  * Local variables:
1348  * c-basic-offset: 4
1349  * c-file-style: "Stroustrup"
1350  * indent-tabs-mode: nil
1351  * End:
1352  * vim: shiftwidth=4 tabstop=8 expandtab
1353  */
1354