Add path to configure method of filter.
[metaproxy-moved-to-github.git] / src / filter_multi.cpp
1 /* This file is part of Metaproxy.
2    Copyright (C) 2005-2011 Index Data
3
4 Metaproxy is free software; you can redistribute it and/or modify it under
5 the terms of the GNU General Public License as published by the Free
6 Software Foundation; either version 2, or (at your option) any later
7 version.
8
9 Metaproxy is distributed in the hope that it will be useful, but WITHOUT ANY
10 WARRANTY; without even the implied warranty of MERCHANTABILITY or
11 FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
12 for more details.
13
14 You should have received a copy of the GNU General Public License
15 along with this program; if not, write to the Free Software
16 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
17 */
18
19 #include <yaz/log.h>
20
21 #include "config.hpp"
22
23 #include <metaproxy/filter.hpp>
24 #include <metaproxy/package.hpp>
25
26 #include <boost/thread/thread.hpp>
27 #include <boost/thread/mutex.hpp>
28 #include <boost/thread/condition.hpp>
29 #include <boost/shared_ptr.hpp>
30
31 #include <metaproxy/util.hpp>
32 #include "filter_multi.hpp"
33
34 #include <yaz/zgdu.h>
35 #include <yaz/otherinfo.h>
36 #include <yaz/diagbib1.h>
37 #include <yaz/match_glob.h>
38
39 #include <vector>
40 #include <algorithm>
41 #include <map>
42 #include <iostream>
43
44 namespace mp = metaproxy_1;
45 namespace yf = mp::filter;
46
47 namespace metaproxy_1 {
48     namespace filter {
49         enum multi_merge_type {
50             round_robin,
51             serve_order
52         };
53         struct Multi::BackendSet {
54             BackendPtr m_backend;
55             int m_count;
56             bool operator < (const BackendSet &k) const;
57             bool operator == (const BackendSet &k) const;
58         };
59         struct Multi::ScanTermInfo {
60             std::string m_norm_term;
61             std::string m_display_term;
62             int m_count;
63             bool operator < (const ScanTermInfo &) const;
64             bool operator == (const ScanTermInfo &) const;
65             Z_Entry *get_entry(ODR odr);
66         };
67         struct Multi::FrontendSet {
68             class PresentJob {
69             public:
70                 BackendPtr m_backend;
71                 int m_pos; // position for backend (1=first, 2=second,..
72                 int m_start; // present request start
73                 PresentJob(BackendPtr ptr, int pos) : 
74                     m_backend(ptr), m_pos(pos), m_start(0) {};
75             };
76             FrontendSet(std::string setname);
77             FrontendSet();
78             ~FrontendSet();
79
80             void round_robin(int pos, int number, std::list<PresentJob> &job);
81             void serve_order(int pos, int number, std::list<PresentJob> &job);
82
83             std::list<BackendSet> m_backend_sets;
84             std::string m_setname;
85         };
86         struct Multi::Backend {
87             PackagePtr m_package;
88             std::string m_backend_database;
89             std::string m_vhost;
90             std::string m_route;
91             void operator() (void);  // thread operation
92         };
93         struct Multi::Frontend {
94             Frontend(Rep *rep);
95             ~Frontend();
96             bool m_is_multi;
97             bool m_in_use;
98             std::list<BackendPtr> m_backend_list;
99             std::map<std::string,Multi::FrontendSet> m_sets;
100
101             void multi_move(std::list<BackendPtr> &blist);
102             void init(Package &package, Z_GDU *gdu);
103             void close(Package &package);
104             void search(Package &package, Z_APDU *apdu);
105             void present(Package &package, Z_APDU *apdu);
106             void scan1(Package &package, Z_APDU *apdu);
107             void scan2(Package &package, Z_APDU *apdu);
108             void relay_apdu(Package &package, Z_APDU *apdu);
109             Rep *m_p;
110         };            
111         class Multi::Map {
112             std::string m_target_pattern;
113             std::string m_route;
114         public:
115             Map(std::string pattern, std::string route) : 
116                 m_target_pattern(pattern), m_route(route) {};
117             bool match(const std::string target, std::string *ret) const {
118                 if (yaz_match_glob(m_target_pattern.c_str(), target.c_str()))
119                 {
120                     *ret = m_route;
121                     return true;
122                 }
123                 return false;
124             };
125         };
126         class Multi::Rep {
127             friend class Multi;
128             friend struct Frontend;
129             
130             Rep();
131             FrontendPtr get_frontend(Package &package);
132             void release_frontend(Package &package);
133         private:
134             std::list<Multi::Map> m_route_patterns;
135             boost::mutex m_mutex;
136             boost::condition m_cond_session_ready;
137             std::map<mp::Session, FrontendPtr> m_clients;
138             bool m_hide_unavailable;
139             multi_merge_type m_merge_type;
140         };
141     }
142 }
143
144 yf::Multi::Rep::Rep()
145 {
146     m_hide_unavailable = false;
147     m_merge_type = round_robin;
148 }
149
150 bool yf::Multi::BackendSet::operator < (const BackendSet &k) const
151 {
152     return m_count < k.m_count;
153 }
154
155 yf::Multi::Frontend::Frontend(Rep *rep)
156 {
157     m_p = rep;
158     m_is_multi = false;
159 }
160
161 yf::Multi::Frontend::~Frontend()
162 {
163 }
164
165 yf::Multi::FrontendPtr yf::Multi::Rep::get_frontend(mp::Package &package)
166 {
167     boost::mutex::scoped_lock lock(m_mutex);
168
169     std::map<mp::Session,yf::Multi::FrontendPtr>::iterator it;
170     
171     while(true)
172     {
173         it = m_clients.find(package.session());
174         if (it == m_clients.end())
175             break;
176         
177         if (!it->second->m_in_use)
178         {
179             it->second->m_in_use = true;
180             return it->second;
181         }
182         m_cond_session_ready.wait(lock);
183     }
184     FrontendPtr f(new Frontend(this));
185     m_clients[package.session()] = f;
186     f->m_in_use = true;
187     return f;
188 }
189
190 void yf::Multi::Rep::release_frontend(mp::Package &package)
191 {
192     boost::mutex::scoped_lock lock(m_mutex);
193     std::map<mp::Session,yf::Multi::FrontendPtr>::iterator it;
194     
195     it = m_clients.find(package.session());
196     if (it != m_clients.end())
197     {
198         if (package.session().is_closed())
199         {
200             it->second->close(package);
201             m_clients.erase(it);
202         }
203         else
204         {
205             it->second->m_in_use = false;
206         }
207         m_cond_session_ready.notify_all();
208     }
209 }
210
211 yf::Multi::FrontendSet::FrontendSet(std::string setname)
212     :  m_setname(setname)
213 {
214 }
215
216
217 yf::Multi::FrontendSet::FrontendSet()
218 {
219 }
220
221
222 yf::Multi::FrontendSet::~FrontendSet()
223 {
224 }
225
226 yf::Multi::Multi() : m_p(new Multi::Rep)
227 {
228 }
229
230 yf::Multi::~Multi() {
231 }
232
233
234 void yf::Multi::Backend::operator() (void) 
235 {
236     m_package->move(m_route);
237 }
238
239
240 void yf::Multi::Frontend::close(mp::Package &package)
241 {
242     std::list<BackendPtr>::const_iterator bit;
243     for (bit = m_backend_list.begin(); bit != m_backend_list.end(); bit++)
244     {
245         BackendPtr b = *bit;
246
247         b->m_package->copy_filter(package);
248         b->m_package->request() = (Z_GDU *) 0;
249         b->m_package->session().close();
250         b->m_package->move(b->m_route);
251     }
252 }
253
254 void yf::Multi::Frontend::multi_move(std::list<BackendPtr> &blist)
255 {
256     std::list<BackendPtr>::const_iterator bit;
257     boost::thread_group g;
258     for (bit = blist.begin(); bit != blist.end(); bit++)
259     {
260         g.add_thread(new boost::thread(**bit));
261     }
262     g.join_all();
263 }
264
265 void yf::Multi::FrontendSet::serve_order(int start, int number,
266                                          std::list<PresentJob> &jobs)
267 {
268     int i;
269     for (i = 0; i < number; i++)
270     {
271         std::list<BackendSet>::const_iterator bsit;
272         int voffset = 0;
273         int offset = start + i - 1;
274         for (bsit = m_backend_sets.begin(); bsit != m_backend_sets.end(); 
275              bsit++)
276         {
277             if (offset >= voffset && offset < voffset + bsit->m_count)
278             {
279                 PresentJob job(bsit->m_backend, offset - voffset + 1);
280                 jobs.push_back(job);
281                 break;
282             }
283             voffset += bsit->m_count;
284         }
285     }
286 }
287
288 void yf::Multi::FrontendSet::round_robin(int start, int number,
289                                          std::list<PresentJob> &jobs)
290 {
291     std::list<int> pos;
292     std::list<BackendSet>::const_iterator bsit;
293     for (bsit = m_backend_sets.begin(); bsit != m_backend_sets.end(); bsit++)
294     {
295         pos.push_back(1);
296     }
297
298     int p = 1;
299 #if 1
300     // optimization step!
301     int omin = 0;
302     while(true)
303     {
304         int min = 0;
305         int no_left = 0;
306         // find min count for each set which is > omin
307         for (bsit = m_backend_sets.begin(); bsit != m_backend_sets.end(); bsit++)
308         {
309             if (bsit->m_count > omin)
310             {
311                 if (no_left == 0 || bsit->m_count < min)
312                     min = bsit->m_count;
313                 no_left++;
314             }
315         }
316         if (no_left == 0) // if nothing greater than omin, bail out.
317             break;
318         int skip = no_left * min;
319         if (p + skip > start)  // step gets us "into" present range?
320         {
321             // Yes. skip until start.. Rounding off is deliberate!
322             min = (start-p) / no_left;
323             p += no_left * min;
324             
325             // update positions in each set..
326             std::list<int>::iterator psit = pos.begin();
327             for (psit = pos.begin(); psit != pos.end(); psit++)
328                 *psit += min;
329             break;
330         }
331         // skip on each set.. before "present range"..
332         p = p + skip;
333         
334         std::list<int>::iterator psit = pos.begin();
335         for (psit = pos.begin(); psit != pos.end(); psit++)
336             *psit += min;
337         
338         omin = min; // update so we consider next class (with higher count)
339     }
340 #endif
341     int fetched = 0;
342     bool more = true;
343     while (more)
344     {
345         more = false;
346         std::list<int>::iterator psit = pos.begin();
347         bsit = m_backend_sets.begin();
348
349         for (; bsit != m_backend_sets.end(); psit++,bsit++)
350         {
351             if (fetched >= number)
352             {
353                 more = false;
354                 break;
355             }
356             if (*psit <= bsit->m_count)
357             {
358                 if (p >= start)
359                 {
360                     PresentJob job(bsit->m_backend, *psit);
361                     jobs.push_back(job);
362                     fetched++;
363                 }
364                 (*psit)++;
365                 p++;
366                 more = true;
367             }
368         }
369     }
370 }
371
372 void yf::Multi::Frontend::init(mp::Package &package, Z_GDU *gdu)
373 {
374     Z_InitRequest *req = gdu->u.z3950->u.initRequest;
375
376     std::list<std::string> targets;
377
378     mp::util::get_vhost_otherinfo(req->otherInfo, targets);
379
380     if (targets.size() < 1)
381     {
382         package.move();
383         return;
384     }
385
386     std::list<std::string>::const_iterator t_it = targets.begin();
387     for (; t_it != targets.end(); t_it++)
388     {
389         Session s;
390         Backend *b = new Backend;
391         b->m_vhost = *t_it;
392
393         std::list<Multi::Map>::const_iterator it =
394             m_p->m_route_patterns.begin();
395         while (it != m_p->m_route_patterns.end()) {
396             if (it->match(*t_it, &b->m_route))
397                 break;
398             it++;
399         }
400         // b->m_route = m_p->m_target_route[*t_it];
401         // b->m_route unset
402         b->m_package = PackagePtr(new Package(s, package.origin()));
403
404         m_backend_list.push_back(BackendPtr(b));
405     }
406     m_is_multi = true;
407
408     // create init request 
409     std::list<BackendPtr>::iterator bit;
410     for (bit = m_backend_list.begin(); bit != m_backend_list.end(); bit++)
411     {
412         mp::odr odr;
413         BackendPtr b = *bit;
414         Z_APDU *init_apdu = zget_APDU(odr, Z_APDU_initRequest);
415         
416         std::list<std::string>vhost_one;
417         vhost_one.push_back(b->m_vhost);
418         mp::util::set_vhost_otherinfo(&init_apdu->u.initRequest->otherInfo,
419                                        odr, vhost_one);
420
421
422         Z_InitRequest *breq = init_apdu->u.initRequest;
423
424         breq->idAuthentication = req->idAuthentication;
425         
426         *breq->preferredMessageSize = *req->preferredMessageSize;
427         *breq->maximumRecordSize = *req->maximumRecordSize;
428
429         ODR_MASK_SET(breq->options, Z_Options_search);
430         ODR_MASK_SET(breq->options, Z_Options_present);
431         ODR_MASK_SET(breq->options, Z_Options_namedResultSets);
432         ODR_MASK_SET(breq->options, Z_Options_scan);
433         
434         ODR_MASK_SET(breq->protocolVersion, Z_ProtocolVersion_1);
435         ODR_MASK_SET(breq->protocolVersion, Z_ProtocolVersion_2);
436         ODR_MASK_SET(breq->protocolVersion, Z_ProtocolVersion_3);
437         
438         b->m_package->request() = init_apdu;
439
440         b->m_package->copy_filter(package);
441     }
442     multi_move(m_backend_list);
443
444     // create the frontend init response based on each backend init response
445     mp::odr odr;
446
447     Z_APDU *f_apdu = odr.create_initResponse(gdu->u.z3950, 0, 0);
448     Z_InitResponse *f_resp = f_apdu->u.initResponse;
449
450     ODR_MASK_SET(f_resp->options, Z_Options_search);
451     ODR_MASK_SET(f_resp->options, Z_Options_present);
452     ODR_MASK_SET(f_resp->options, Z_Options_namedResultSets);
453     ODR_MASK_SET(f_resp->options, Z_Options_scan);
454     
455     ODR_MASK_SET(f_resp->protocolVersion, Z_ProtocolVersion_1);
456     ODR_MASK_SET(f_resp->protocolVersion, Z_ProtocolVersion_2);
457     ODR_MASK_SET(f_resp->protocolVersion, Z_ProtocolVersion_3);
458
459     int no_failed = 0;
460     int no_succeeded = 0;
461
462     Odr_int preferredMessageSize = *req->preferredMessageSize;
463     Odr_int maximumRecordSize = *req->maximumRecordSize;
464     for (bit = m_backend_list.begin(); bit != m_backend_list.end(); )
465     {
466         PackagePtr p = (*bit)->m_package;
467         
468         if (p->session().is_closed())
469         {
470             // failed. Remove from list and increment number of failed
471             no_failed++;
472             bit = m_backend_list.erase(bit);
473             continue;
474         }
475         Z_GDU *gdu = p->response().get();
476         if (gdu && gdu->which == Z_GDU_Z3950 && gdu->u.z3950->which ==
477             Z_APDU_initResponse)
478         {
479             int i;
480             Z_APDU *b_apdu = gdu->u.z3950;
481             Z_InitResponse *b_resp = b_apdu->u.initResponse;
482
483             // common options for all backends
484             for (i = 0; i <= Z_Options_stringSchema; i++)
485             {
486                 if (!ODR_MASK_GET(b_resp->options, i))
487                     ODR_MASK_CLEAR(f_resp->options, i);
488             }
489             // common protocol version
490             for (i = 0; i <= Z_ProtocolVersion_3; i++)
491                 if (!ODR_MASK_GET(b_resp->protocolVersion, i))
492                     ODR_MASK_CLEAR(f_resp->protocolVersion, i);
493             if (*b_resp->result)
494             {
495                 no_succeeded++;
496                 if (preferredMessageSize > *b_resp->preferredMessageSize)
497                     preferredMessageSize = *b_resp->preferredMessageSize;
498                 if (maximumRecordSize > *b_resp->maximumRecordSize)
499                     maximumRecordSize = *b_resp->maximumRecordSize;
500             }
501             else
502                 no_failed++;
503         }
504         else
505             no_failed++;
506         bit++;
507     }
508     *f_resp->preferredMessageSize = preferredMessageSize;
509     *f_resp->maximumRecordSize = maximumRecordSize;
510
511     if (m_p->m_hide_unavailable)
512     {
513         if (no_succeeded == 0)
514         {
515             *f_resp->result = 0;
516             package.session().close();
517         }
518     }
519     else
520     {
521         if (no_failed)
522         {
523             *f_resp->result = 0;
524             package.session().close();
525         }
526     }
527     package.response() = f_apdu;
528 }
529
530 void yf::Multi::Frontend::search(mp::Package &package, Z_APDU *apdu_req)
531 {
532     // create search request 
533     Z_SearchRequest *req = apdu_req->u.searchRequest;
534
535     // save these for later
536     Odr_int smallSetUpperBound = *req->smallSetUpperBound;
537     Odr_int largeSetLowerBound = *req->largeSetLowerBound;
538     Odr_int mediumSetPresentNumber = *req->mediumSetPresentNumber;
539     
540     // they are altered now - to disable piggyback
541     *req->smallSetUpperBound = 0;
542     *req->largeSetLowerBound = 1;
543     *req->mediumSetPresentNumber = 1;
544
545     int default_num_db = req->num_databaseNames;
546     char **default_db = req->databaseNames;
547
548     std::list<BackendPtr>::const_iterator bit;
549     for (bit = m_backend_list.begin(); bit != m_backend_list.end(); bit++)
550     {
551         PackagePtr p = (*bit)->m_package;
552         mp::odr odr;
553     
554         if (!mp::util::set_databases_from_zurl(odr, (*bit)->m_vhost,
555                                                 &req->num_databaseNames,
556                                                 &req->databaseNames))
557         {
558             req->num_databaseNames = default_num_db;
559             req->databaseNames = default_db;
560         }
561         p->request() = apdu_req;
562         p->copy_filter(package);
563     }
564     multi_move(m_backend_list);
565
566     // look at each response
567     FrontendSet resultSet(std::string(req->resultSetName));
568
569     Odr_int result_set_size = 0;
570     Z_Records *z_records_diag = 0;  // no diagnostics (yet)
571     for (bit = m_backend_list.begin(); bit != m_backend_list.end(); bit++)
572     {
573         PackagePtr p = (*bit)->m_package;
574         
575         if (p->session().is_closed()) // if any backend closes, close frontend
576             package.session().close();
577         
578         Z_GDU *gdu = p->response().get();
579         if (gdu && gdu->which == Z_GDU_Z3950 && gdu->u.z3950->which ==
580             Z_APDU_searchResponse)
581         {
582             Z_APDU *b_apdu = gdu->u.z3950;
583             Z_SearchResponse *b_resp = b_apdu->u.searchResponse;
584          
585             // see we get any errors (AKA diagnstics)
586             if (b_resp->records)
587             {
588                 if (b_resp->records->which == Z_Records_NSD
589                     || b_resp->records->which == Z_Records_multipleNSD)
590                     z_records_diag = b_resp->records;
591                 // we may set this multiple times (TOO BAD!)
592             }
593             BackendSet backendSet;
594             backendSet.m_backend = *bit;
595             backendSet.m_count = *b_resp->resultCount;
596             result_set_size += *b_resp->resultCount;
597             resultSet.m_backend_sets.push_back(backendSet);
598         }
599         else
600         {
601             // if any target does not return search response - return that 
602             package.response() = p->response();
603             return;
604         }
605     }
606
607     mp::odr odr;
608     Z_APDU *f_apdu = odr.create_searchResponse(apdu_req, 0, 0);
609     Z_SearchResponse *f_resp = f_apdu->u.searchResponse;
610
611     *f_resp->resultCount = result_set_size;
612     if (z_records_diag)
613     {
614         // search error
615         f_resp->records = z_records_diag;
616         package.response() = f_apdu;
617         return;
618     }
619     // assume OK
620     m_sets[resultSet.m_setname] = resultSet;
621
622     Odr_int number;
623     mp::util::piggyback(smallSetUpperBound,
624                         largeSetLowerBound,
625                         mediumSetPresentNumber,
626                         0, 0,
627                         result_set_size,
628                         number, 0);
629     Package pp(package.session(), package.origin());
630     if (number > 0)
631     {
632         pp.copy_filter(package);
633         Z_APDU *p_apdu = zget_APDU(odr, Z_APDU_presentRequest);
634         Z_PresentRequest *p_req = p_apdu->u.presentRequest;
635         p_req->preferredRecordSyntax = req->preferredRecordSyntax;
636         p_req->resultSetId = req->resultSetName;
637         *p_req->resultSetStartPoint = 1;
638         *p_req->numberOfRecordsRequested = number;
639         pp.request() = p_apdu;
640         present(pp, p_apdu);
641         
642         if (pp.session().is_closed())
643             package.session().close();
644         
645         Z_GDU *gdu = pp.response().get();
646         if (gdu && gdu->which == Z_GDU_Z3950 && gdu->u.z3950->which ==
647             Z_APDU_presentResponse)
648         {
649             Z_PresentResponse *p_res = gdu->u.z3950->u.presentResponse;
650             f_resp->records = p_res->records;
651             *f_resp->numberOfRecordsReturned = 
652                 *p_res->numberOfRecordsReturned;
653             *f_resp->nextResultSetPosition = 
654                 *p_res->nextResultSetPosition;
655         }
656         else 
657         {
658             package.response() = pp.response(); 
659             return;
660         }
661     }
662     package.response() = f_apdu; // in this scope because of p
663 }
664
665 void yf::Multi::Frontend::present(mp::Package &package, Z_APDU *apdu_req)
666 {
667     // create present request 
668     Z_PresentRequest *req = apdu_req->u.presentRequest;
669
670     Sets_it it;
671     it = m_sets.find(std::string(req->resultSetId));
672     if (it == m_sets.end())
673     {
674         mp::odr odr;
675         Z_APDU *apdu = 
676             odr.create_presentResponse(
677                 apdu_req,
678                 YAZ_BIB1_SPECIFIED_RESULT_SET_DOES_NOT_EXIST,
679                 req->resultSetId);
680         package.response() = apdu;
681         return;
682     }
683     std::list<Multi::FrontendSet::PresentJob> jobs;
684     int start = *req->resultSetStartPoint;
685     int number = *req->numberOfRecordsRequested;
686
687     if (m_p->m_merge_type == round_robin)
688         it->second.round_robin(start, number, jobs);
689     else if (m_p->m_merge_type == serve_order)
690         it->second.serve_order(start, number, jobs);
691
692     if (0)
693     {
694         std::list<Multi::FrontendSet::PresentJob>::const_iterator jit;
695         for (jit = jobs.begin(); jit != jobs.end(); jit++)
696         {
697             yaz_log(YLOG_LOG, "job pos=%d", jit->m_pos);
698         }
699     }
700
701     std::list<BackendPtr> present_backend_list;
702
703     std::list<BackendSet>::const_iterator bsit;
704     bsit = it->second.m_backend_sets.begin();
705     for (; bsit != it->second.m_backend_sets.end(); bsit++)
706     {
707         int start = -1;
708         int end = -1;
709         {
710             std::list<Multi::FrontendSet::PresentJob>::const_iterator jit;
711             for (jit = jobs.begin(); jit != jobs.end(); jit++)
712             {
713                 if (jit->m_backend == bsit->m_backend)
714                 {
715                     if (start == -1 || jit->m_pos < start)
716                         start = jit->m_pos;
717                     if (end == -1 || jit->m_pos > end)
718                         end = jit->m_pos;
719                 }
720             }
721         }
722         if (start != -1)
723         {
724             std::list<Multi::FrontendSet::PresentJob>::iterator jit;
725             for (jit = jobs.begin(); jit != jobs.end(); jit++)
726             {
727                 if (jit->m_backend == bsit->m_backend)
728                 {
729                     if (jit->m_pos >= start && jit->m_pos <= end)
730                         jit->m_start = start;
731                 }
732             }
733
734             PackagePtr p = bsit->m_backend->m_package;
735
736             *req->resultSetStartPoint = start;
737             *req->numberOfRecordsRequested = end - start + 1;
738             
739             p->request() = apdu_req;
740             p->copy_filter(package);
741
742             present_backend_list.push_back(bsit->m_backend);
743         }
744     }
745     multi_move(present_backend_list);
746
747     // look at each response
748     Z_Records *z_records_diag = 0;
749
750     std::list<BackendPtr>::const_iterator pbit = present_backend_list.begin();
751     for (; pbit != present_backend_list.end(); pbit++)
752     {
753         PackagePtr p = (*pbit)->m_package;
754         
755         if (p->session().is_closed()) // if any backend closes, close frontend
756             package.session().close();
757         
758         Z_GDU *gdu = p->response().get();
759         if (gdu && gdu->which == Z_GDU_Z3950 && gdu->u.z3950->which ==
760             Z_APDU_presentResponse)
761         {
762             Z_APDU *b_apdu = gdu->u.z3950;
763             Z_PresentResponse *b_resp = b_apdu->u.presentResponse;
764          
765             // see we get any errors (AKA diagnstics)
766             if (b_resp->records)
767             {
768                 if (b_resp->records->which != Z_Records_DBOSD)
769                     z_records_diag = b_resp->records;
770                 // we may set this multiple times (TOO BAD!)
771             }
772         }
773         else
774         {
775             // if any target does not return present response - return that 
776             package.response() = p->response();
777             return;
778         }
779     }
780
781     mp::odr odr;
782     Z_APDU *f_apdu = odr.create_presentResponse(apdu_req, 0, 0);
783     Z_PresentResponse *f_resp = f_apdu->u.presentResponse;
784
785     if (z_records_diag)
786     {
787         f_resp->records = z_records_diag;
788         *f_resp->presentStatus = Z_PresentStatus_failure;
789     }
790     else if (number < 0 || (size_t) number > jobs.size())
791     {
792         f_apdu = 
793             odr.create_presentResponse(
794                 apdu_req,
795                 YAZ_BIB1_PRESENT_REQUEST_OUT_OF_RANGE,
796                 0);
797     }
798     else
799     {
800         f_resp->records = (Z_Records *) odr_malloc(odr, sizeof(Z_Records));
801         Z_Records * records = f_resp->records;
802         records->which = Z_Records_DBOSD;
803         records->u.databaseOrSurDiagnostics =
804             (Z_NamePlusRecordList *)
805             odr_malloc(odr, sizeof(Z_NamePlusRecordList));
806         Z_NamePlusRecordList *nprl = records->u.databaseOrSurDiagnostics;
807         nprl->num_records = jobs.size();
808         nprl->records = (Z_NamePlusRecord**)
809             odr_malloc(odr, sizeof(Z_NamePlusRecord *) * nprl->num_records);
810         int i = 0;
811         std::list<Multi::FrontendSet::PresentJob>::const_iterator jit;
812         for (jit = jobs.begin(); jit != jobs.end(); jit++, i++)
813         {
814             PackagePtr p = jit->m_backend->m_package;
815             
816             Z_GDU *gdu = p->response().get();
817             Z_APDU *b_apdu = gdu->u.z3950;
818             Z_PresentResponse *b_resp = b_apdu->u.presentResponse;
819
820             nprl->records[i] = (Z_NamePlusRecord*)
821                 odr_malloc(odr, sizeof(Z_NamePlusRecord));
822             int inside_pos = jit->m_pos - jit->m_start;
823             if (inside_pos >= b_resp->records->
824                 u.databaseOrSurDiagnostics->num_records)
825                 break;
826             *nprl->records[i] = *b_resp->records->
827                 u.databaseOrSurDiagnostics->records[inside_pos];
828             nprl->records[i]->databaseName =
829                     odr_strdup(odr, jit->m_backend->m_vhost.c_str());
830         }
831         nprl->num_records = i; // usually same as jobs.size();
832         *f_resp->nextResultSetPosition = start + i;
833         *f_resp->numberOfRecordsReturned = i;
834     }
835     package.response() = f_apdu;
836 }
837
838 void yf::Multi::Frontend::scan1(mp::Package &package, Z_APDU *apdu_req)
839 {
840     if (m_backend_list.size() > 1)
841     {
842         mp::odr odr;
843         Z_APDU *f_apdu = 
844             odr.create_scanResponse(
845                 apdu_req, YAZ_BIB1_COMBI_OF_SPECIFIED_DATABASES_UNSUPP, 0);
846         package.response() = f_apdu;
847         return;
848     }
849     Z_ScanRequest *req = apdu_req->u.scanRequest;
850
851     int default_num_db = req->num_databaseNames;
852     char **default_db = req->databaseNames;
853
854     std::list<BackendPtr>::const_iterator bit;
855     for (bit = m_backend_list.begin(); bit != m_backend_list.end(); bit++)
856     {
857         PackagePtr p = (*bit)->m_package;
858         mp::odr odr;
859     
860         if (!mp::util::set_databases_from_zurl(odr, (*bit)->m_vhost,
861                                                 &req->num_databaseNames,
862                                                 &req->databaseNames))
863         {
864             req->num_databaseNames = default_num_db;
865             req->databaseNames = default_db;
866         }
867         p->request() = apdu_req;
868         p->copy_filter(package);
869     }
870     multi_move(m_backend_list);
871
872     for (bit = m_backend_list.begin(); bit != m_backend_list.end(); bit++)
873     {
874         PackagePtr p = (*bit)->m_package;
875         
876         if (p->session().is_closed()) // if any backend closes, close frontend
877             package.session().close();
878         
879         Z_GDU *gdu = p->response().get();
880         if (gdu && gdu->which == Z_GDU_Z3950 && gdu->u.z3950->which ==
881             Z_APDU_scanResponse)
882         {
883             package.response() = p->response();
884             break;
885         }
886         else
887         {
888             // if any target does not return scan response - return that 
889             package.response() = p->response();
890             return;
891         }
892     }
893 }
894
895 bool yf::Multi::ScanTermInfo::operator < (const ScanTermInfo &k) const
896 {
897     return m_norm_term < k.m_norm_term;
898 }
899
900 bool yf::Multi::ScanTermInfo::operator == (const ScanTermInfo &k) const
901 {
902     return m_norm_term == k.m_norm_term;
903 }
904
905 Z_Entry *yf::Multi::ScanTermInfo::get_entry(ODR odr)
906 {
907     Z_Entry *e = (Z_Entry *)odr_malloc(odr, sizeof(*e));
908     e->which = Z_Entry_termInfo;
909     Z_TermInfo *t;
910     t = e->u.termInfo = (Z_TermInfo *) odr_malloc(odr, sizeof(*t));
911     t->suggestedAttributes = 0;
912     t->displayTerm = 0;
913     t->alternativeTerm = 0;
914     t->byAttributes = 0;
915     t->otherTermInfo = 0;
916     t->globalOccurrences = odr_intdup(odr, m_count);
917     t->term = (Z_Term *)
918         odr_malloc(odr, sizeof(*t->term));
919     t->term->which = Z_Term_general;
920     Odr_oct *o;
921     t->term->u.general = o = (Odr_oct *)odr_malloc(odr, sizeof(Odr_oct));
922
923     o->len = o->size = m_norm_term.size();
924     o->buf = (unsigned char *) odr_malloc(odr, o->len);
925     memcpy(o->buf, m_norm_term.c_str(), o->len);
926     return e;
927 }
928
929 void yf::Multi::Frontend::relay_apdu(mp::Package &package, Z_APDU *apdu_req)
930 {
931     std::list<BackendPtr>::const_iterator bit;
932     for (bit = m_backend_list.begin(); bit != m_backend_list.end(); bit++)
933     {
934         PackagePtr p = (*bit)->m_package;
935         mp::odr odr;
936     
937         p->request() = apdu_req;
938         p->copy_filter(package);
939     }
940     multi_move(m_backend_list);
941     for (bit = m_backend_list.begin(); bit != m_backend_list.end(); bit++)
942     {
943         PackagePtr p = (*bit)->m_package;
944         
945         if (p->session().is_closed()) // if any backend closes, close frontend
946             package.session().close();
947         
948         package.response() = p->response();
949     }
950 }
951
952
953 void yf::Multi::Frontend::scan2(mp::Package &package, Z_APDU *apdu_req)
954 {
955     Z_ScanRequest *req = apdu_req->u.scanRequest;
956
957     int default_num_db = req->num_databaseNames;
958     char **default_db = req->databaseNames;
959
960     std::list<BackendPtr>::const_iterator bit;
961     for (bit = m_backend_list.begin(); bit != m_backend_list.end(); bit++)
962     {
963         PackagePtr p = (*bit)->m_package;
964         mp::odr odr;
965     
966         if (!mp::util::set_databases_from_zurl(odr, (*bit)->m_vhost,
967                                                 &req->num_databaseNames,
968                                                 &req->databaseNames))
969         {
970             req->num_databaseNames = default_num_db;
971             req->databaseNames = default_db;
972         }
973         p->request() = apdu_req;
974         p->copy_filter(package);
975     }
976     multi_move(m_backend_list);
977
978     ScanTermInfoList entries_before;
979     ScanTermInfoList entries_after;
980     int no_before = 0;
981     int no_after = 0;
982
983     for (bit = m_backend_list.begin(); bit != m_backend_list.end(); bit++)
984     {
985         PackagePtr p = (*bit)->m_package;
986         
987         if (p->session().is_closed()) // if any backend closes, close frontend
988             package.session().close();
989         
990         Z_GDU *gdu = p->response().get();
991         if (gdu && gdu->which == Z_GDU_Z3950 && gdu->u.z3950->which ==
992             Z_APDU_scanResponse)
993         {
994             Z_ScanResponse *res = gdu->u.z3950->u.scanResponse;
995
996             if (res->entries && res->entries->nonsurrogateDiagnostics)
997             {
998                 // failure
999                 mp::odr odr;
1000                 Z_APDU *f_apdu = odr.create_scanResponse(apdu_req, 1, 0);
1001                 Z_ScanResponse *f_res = f_apdu->u.scanResponse;
1002
1003                 f_res->entries->nonsurrogateDiagnostics = 
1004                     res->entries->nonsurrogateDiagnostics;
1005                 f_res->entries->num_nonsurrogateDiagnostics = 
1006                     res->entries->num_nonsurrogateDiagnostics;
1007
1008                 package.response() = f_apdu;
1009                 return;
1010             }
1011
1012             if (res->entries && res->entries->entries)
1013             {
1014                 Z_Entry **entries = res->entries->entries;
1015                 int num_entries = res->entries->num_entries;
1016                 int position = 1;
1017                 if (req->preferredPositionInResponse)
1018                     position = *req->preferredPositionInResponse;
1019                 if (res->positionOfTerm)
1020                     position = *res->positionOfTerm;
1021
1022                 // before
1023                 int i;
1024                 for (i = 0; i<position-1 && i<num_entries; i++)
1025                 {
1026                     Z_Entry *ent = entries[i];
1027
1028                     if (ent->which == Z_Entry_termInfo)
1029                     {
1030                         ScanTermInfo my;
1031
1032                         Odr_int *occur = ent->u.termInfo->globalOccurrences;
1033                         my.m_count = occur ? *occur : 0;
1034
1035                         if (ent->u.termInfo->term->which == Z_Term_general)
1036                         {
1037                             my.m_norm_term = std::string(
1038                                 (const char *)
1039                                 ent->u.termInfo->term->u.general->buf,
1040                                 ent->u.termInfo->term->u.general->len);
1041                         }
1042                         if (my.m_norm_term.length())
1043                         {
1044                             ScanTermInfoList::iterator it = 
1045                                 entries_before.begin();
1046                             while (it != entries_before.end() && my <*it)
1047                                 it++;
1048                             if (my == *it)
1049                             {
1050                                 it->m_count += my.m_count;
1051                             }
1052                             else
1053                             {
1054                                 entries_before.insert(it, my);
1055                                 no_before++;
1056                             }
1057                         }
1058                     }
1059                 }
1060                 // after
1061                 if (position <= 0)
1062                     i = 0;
1063                 else
1064                     i = position-1;
1065                 for ( ; i<num_entries; i++)
1066                 {
1067                     Z_Entry *ent = entries[i];
1068
1069                     if (ent->which == Z_Entry_termInfo)
1070                     {
1071                         ScanTermInfo my;
1072
1073                         Odr_int *occur = ent->u.termInfo->globalOccurrences;
1074                         my.m_count = occur ? *occur : 0;
1075
1076                         if (ent->u.termInfo->term->which == Z_Term_general)
1077                         {
1078                             my.m_norm_term = std::string(
1079                                 (const char *)
1080                                 ent->u.termInfo->term->u.general->buf,
1081                                 ent->u.termInfo->term->u.general->len);
1082                         }
1083                         if (my.m_norm_term.length())
1084                         {
1085                             ScanTermInfoList::iterator it = 
1086                                 entries_after.begin();
1087                             while (it != entries_after.end() && *it < my)
1088                                 it++;
1089                             if (my == *it)
1090                             {
1091                                 it->m_count += my.m_count;
1092                             }
1093                             else
1094                             {
1095                                 entries_after.insert(it, my);
1096                                 no_after++;
1097                             }
1098                         }
1099                     }
1100                 }
1101
1102             }                
1103         }
1104         else
1105         {
1106             // if any target does not return scan response - return that 
1107             package.response() = p->response();
1108             return;
1109         }
1110     }
1111
1112     if (false)
1113     {
1114         std::cout << "BEFORE\n";
1115         ScanTermInfoList::iterator it = entries_before.begin();
1116         for(; it != entries_before.end(); it++)
1117         {
1118             std::cout << " " << it->m_norm_term << " " << it->m_count << "\n";
1119         }
1120         
1121         std::cout << "AFTER\n";
1122         it = entries_after.begin();
1123         for(; it != entries_after.end(); it++)
1124         {
1125             std::cout << " " << it->m_norm_term << " " << it->m_count << "\n";
1126         }
1127     }
1128
1129     if (false)
1130     {
1131         mp::odr odr;
1132         Z_APDU *f_apdu = odr.create_scanResponse(apdu_req, 1, "not implemented");
1133         package.response() = f_apdu;
1134     }
1135     else
1136     {
1137         mp::odr odr;
1138         Z_APDU *f_apdu = odr.create_scanResponse(apdu_req, 0, 0);
1139         Z_ScanResponse *resp = f_apdu->u.scanResponse;
1140         
1141         int number_returned = *req->numberOfTermsRequested;
1142         int position_returned = *req->preferredPositionInResponse;
1143         
1144         resp->entries->num_entries = number_returned;
1145         resp->entries->entries = (Z_Entry**)
1146             odr_malloc(odr, sizeof(Z_Entry*) * number_returned);
1147         int i;
1148
1149         int lbefore = entries_before.size();
1150         if (lbefore < position_returned-1)
1151             position_returned = lbefore+1;
1152
1153         ScanTermInfoList::iterator it = entries_before.begin();
1154         for (i = 0; i<position_returned-1 && it != entries_before.end(); i++, it++)
1155         {
1156             resp->entries->entries[position_returned-2-i] = it->get_entry(odr);
1157         }
1158
1159         it = entries_after.begin();
1160
1161         if (position_returned <= 0)
1162             i = 0;
1163         else
1164             i = position_returned-1;
1165         for (; i<number_returned && it != entries_after.end(); i++, it++)
1166         {
1167             resp->entries->entries[i] = it->get_entry(odr);
1168         }
1169
1170         number_returned = i;
1171
1172         resp->positionOfTerm = odr_intdup(odr, position_returned);
1173         resp->numberOfEntriesReturned = odr_intdup(odr, number_returned);
1174         resp->entries->num_entries = number_returned;
1175
1176         package.response() = f_apdu;
1177     }
1178 }
1179
1180
1181 void yf::Multi::process(mp::Package &package) const
1182 {
1183     FrontendPtr f = m_p->get_frontend(package);
1184
1185     Z_GDU *gdu = package.request().get();
1186     
1187     if (gdu && gdu->which == Z_GDU_Z3950 && gdu->u.z3950->which ==
1188         Z_APDU_initRequest && !f->m_is_multi)
1189     {
1190         f->init(package, gdu);
1191     }
1192     else if (!f->m_is_multi)
1193         package.move();
1194     else if (gdu && gdu->which == Z_GDU_Z3950)
1195     {
1196         Z_APDU *apdu = gdu->u.z3950;
1197         if (apdu->which == Z_APDU_initRequest)
1198         {
1199             mp::odr odr;
1200             
1201             package.response() = odr.create_close(
1202                 apdu,
1203                 Z_Close_protocolError,
1204                 "double init");
1205             
1206             package.session().close();
1207         }
1208         else if (apdu->which == Z_APDU_searchRequest)
1209         {
1210             f->search(package, apdu);
1211         }
1212         else if (apdu->which == Z_APDU_presentRequest)
1213         {
1214             f->present(package, apdu);
1215         }
1216         else if (apdu->which == Z_APDU_scanRequest)
1217         {
1218             f->scan2(package, apdu);
1219         }
1220         else if (apdu->which == Z_APDU_close)
1221         {
1222             f->relay_apdu(package, apdu);
1223         }
1224         else
1225         {
1226             mp::odr odr;
1227             
1228             package.response() = odr.create_close(
1229                 apdu, Z_Close_protocolError,
1230                 "unsupported APDU in filter multi");
1231             
1232             package.session().close();
1233         }
1234     }
1235     m_p->release_frontend(package);
1236 }
1237
1238 void mp::filter::Multi::configure(const xmlNode * ptr, bool test_only,
1239                                   const char *path)
1240 {
1241     for (ptr = ptr->children; ptr; ptr = ptr->next)
1242     {
1243         if (ptr->type != XML_ELEMENT_NODE)
1244             continue;
1245         if (!strcmp((const char *) ptr->name, "target"))
1246         {
1247             std::string route = mp::xml::get_route(ptr);
1248             std::string target = mp::xml::get_text(ptr);
1249             m_p->m_route_patterns.push_back(Multi::Map(target, route));
1250         }
1251         else if (!strcmp((const char *) ptr->name, "hideunavailable"))
1252         {
1253             m_p->m_hide_unavailable = true;
1254         }
1255         else if (!strcmp((const char *) ptr->name, "mergetype"))
1256         {
1257             std::string mergetype = mp::xml::get_text(ptr);
1258             if (mergetype == "roundrobin")
1259                 m_p->m_merge_type = round_robin;
1260             else if (mergetype == "serveorder")
1261                 m_p->m_merge_type = serve_order;
1262             else
1263                 throw mp::filter::FilterException
1264                     ("Bad mergetype "  + mergetype + " in multi filter");
1265
1266         }
1267         else
1268         {
1269             throw mp::filter::FilterException
1270                 ("Bad element " 
1271                  + std::string((const char *) ptr->name)
1272                  + " in multi filter");
1273         }
1274     }
1275 }
1276
1277 static mp::filter::Base* filter_creator()
1278 {
1279     return new mp::filter::Multi;
1280 }
1281
1282 extern "C" {
1283     struct metaproxy_1_filter_struct metaproxy_1_filter_multi = {
1284         0,
1285         "multi",
1286         filter_creator
1287     };
1288 }
1289
1290
1291 /*
1292  * Local variables:
1293  * c-basic-offset: 4
1294  * c-file-style: "Stroustrup"
1295  * indent-tabs-mode: nil
1296  * End:
1297  * vim: shiftwidth=4 tabstop=8 expandtab
1298  */
1299