Filter multi: deals with close APDU
[metaproxy-moved-to-github.git] / src / filter_multi.cpp
1 /* This file is part of Metaproxy.
2    Copyright (C) 2005-2011 Index Data
3
4 Metaproxy is free software; you can redistribute it and/or modify it under
5 the terms of the GNU General Public License as published by the Free
6 Software Foundation; either version 2, or (at your option) any later
7 version.
8
9 Metaproxy is distributed in the hope that it will be useful, but WITHOUT ANY
10 WARRANTY; without even the implied warranty of MERCHANTABILITY or
11 FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
12 for more details.
13
14 You should have received a copy of the GNU General Public License
15 along with this program; if not, write to the Free Software
16 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
17 */
18
19 #include <yaz/log.h>
20
21 #include "config.hpp"
22
23 #include <metaproxy/filter.hpp>
24 #include <metaproxy/package.hpp>
25
26 #include <boost/thread/thread.hpp>
27 #include <boost/thread/mutex.hpp>
28 #include <boost/thread/condition.hpp>
29 #include <boost/shared_ptr.hpp>
30
31 #include <metaproxy/util.hpp>
32 #include "filter_multi.hpp"
33
34 #include <yaz/zgdu.h>
35 #include <yaz/otherinfo.h>
36 #include <yaz/diagbib1.h>
37 #include <yaz/match_glob.h>
38
39 #include <vector>
40 #include <algorithm>
41 #include <map>
42 #include <iostream>
43
44 namespace mp = metaproxy_1;
45 namespace yf = mp::filter;
46
47 namespace metaproxy_1 {
48     namespace filter {
49         enum multi_merge_type {
50             round_robin,
51             serve_order
52         };
53         struct Multi::BackendSet {
54             BackendPtr m_backend;
55             int m_count;
56             bool operator < (const BackendSet &k) const;
57             bool operator == (const BackendSet &k) const;
58         };
59         struct Multi::ScanTermInfo {
60             std::string m_norm_term;
61             std::string m_display_term;
62             int m_count;
63             bool operator < (const ScanTermInfo &) const;
64             bool operator == (const ScanTermInfo &) const;
65             Z_Entry *get_entry(ODR odr);
66         };
67         struct Multi::FrontendSet {
68             class PresentJob {
69             public:
70                 BackendPtr m_backend;
71                 int m_pos; // position for backend (1=first, 2=second,..
72                 int m_start; // present request start
73                 PresentJob(BackendPtr ptr, int pos) : 
74                     m_backend(ptr), m_pos(pos), m_start(0) {};
75             };
76             FrontendSet(std::string setname);
77             FrontendSet();
78             ~FrontendSet();
79
80             void round_robin(int pos, int number, std::list<PresentJob> &job);
81             void serve_order(int pos, int number, std::list<PresentJob> &job);
82
83             std::list<BackendSet> m_backend_sets;
84             std::string m_setname;
85         };
86         struct Multi::Backend {
87             PackagePtr m_package;
88             std::string m_backend_database;
89             std::string m_vhost;
90             std::string m_route;
91             void operator() (void);  // thread operation
92         };
93         struct Multi::Frontend {
94             Frontend(Rep *rep);
95             ~Frontend();
96             bool m_is_multi;
97             bool m_in_use;
98             std::list<BackendPtr> m_backend_list;
99             std::map<std::string,Multi::FrontendSet> m_sets;
100
101             void multi_move(std::list<BackendPtr> &blist);
102             void init(Package &package, Z_GDU *gdu);
103             void close(Package &package);
104             void search(Package &package, Z_APDU *apdu);
105             void present(Package &package, Z_APDU *apdu);
106             void scan1(Package &package, Z_APDU *apdu);
107             void scan2(Package &package, Z_APDU *apdu);
108             void relay_apdu(Package &package, Z_APDU *apdu);
109             Rep *m_p;
110         };            
111         class Multi::Map {
112             std::string m_target_pattern;
113             std::string m_route;
114         public:
115             Map(std::string pattern, std::string route) : 
116                 m_target_pattern(pattern), m_route(route) {};
117             bool match(const std::string target, std::string *ret) const {
118                 if (yaz_match_glob(m_target_pattern.c_str(), target.c_str()))
119                 {
120                     *ret = m_route;
121                     return true;
122                 }
123                 return false;
124             };
125         };
126         class Multi::Rep {
127             friend class Multi;
128             friend struct Frontend;
129             
130             Rep();
131             FrontendPtr get_frontend(Package &package);
132             void release_frontend(Package &package);
133         private:
134             std::list<Multi::Map> m_route_patterns;
135             boost::mutex m_mutex;
136             boost::condition m_cond_session_ready;
137             std::map<mp::Session, FrontendPtr> m_clients;
138             bool m_hide_unavailable;
139             multi_merge_type m_merge_type;
140         };
141     }
142 }
143
144 yf::Multi::Rep::Rep()
145 {
146     m_hide_unavailable = false;
147     m_merge_type = round_robin;
148 }
149
150 bool yf::Multi::BackendSet::operator < (const BackendSet &k) const
151 {
152     return m_count < k.m_count;
153 }
154
155 yf::Multi::Frontend::Frontend(Rep *rep)
156 {
157     m_p = rep;
158     m_is_multi = false;
159 }
160
161 yf::Multi::Frontend::~Frontend()
162 {
163 }
164
165 yf::Multi::FrontendPtr yf::Multi::Rep::get_frontend(mp::Package &package)
166 {
167     boost::mutex::scoped_lock lock(m_mutex);
168
169     std::map<mp::Session,yf::Multi::FrontendPtr>::iterator it;
170     
171     while(true)
172     {
173         it = m_clients.find(package.session());
174         if (it == m_clients.end())
175             break;
176         
177         if (!it->second->m_in_use)
178         {
179             it->second->m_in_use = true;
180             return it->second;
181         }
182         m_cond_session_ready.wait(lock);
183     }
184     FrontendPtr f(new Frontend(this));
185     m_clients[package.session()] = f;
186     f->m_in_use = true;
187     return f;
188 }
189
190 void yf::Multi::Rep::release_frontend(mp::Package &package)
191 {
192     boost::mutex::scoped_lock lock(m_mutex);
193     std::map<mp::Session,yf::Multi::FrontendPtr>::iterator it;
194     
195     it = m_clients.find(package.session());
196     if (it != m_clients.end())
197     {
198         if (package.session().is_closed())
199         {
200             it->second->close(package);
201             m_clients.erase(it);
202         }
203         else
204         {
205             it->second->m_in_use = false;
206         }
207         m_cond_session_ready.notify_all();
208     }
209 }
210
211 yf::Multi::FrontendSet::FrontendSet(std::string setname)
212     :  m_setname(setname)
213 {
214 }
215
216
217 yf::Multi::FrontendSet::FrontendSet()
218 {
219 }
220
221
222 yf::Multi::FrontendSet::~FrontendSet()
223 {
224 }
225
226 yf::Multi::Multi() : m_p(new Multi::Rep)
227 {
228 }
229
230 yf::Multi::~Multi() {
231 }
232
233
234 void yf::Multi::Backend::operator() (void) 
235 {
236     m_package->move(m_route);
237 }
238
239
240 void yf::Multi::Frontend::close(mp::Package &package)
241 {
242     std::list<BackendPtr>::const_iterator bit;
243     for (bit = m_backend_list.begin(); bit != m_backend_list.end(); bit++)
244     {
245         BackendPtr b = *bit;
246
247         b->m_package->copy_filter(package);
248         b->m_package->request() = (Z_GDU *) 0;
249         b->m_package->session().close();
250         b->m_package->move(b->m_route);
251     }
252 }
253
254 void yf::Multi::Frontend::multi_move(std::list<BackendPtr> &blist)
255 {
256     std::list<BackendPtr>::const_iterator bit;
257     boost::thread_group g;
258     for (bit = blist.begin(); bit != blist.end(); bit++)
259     {
260         g.add_thread(new boost::thread(**bit));
261     }
262     g.join_all();
263 }
264
265 void yf::Multi::FrontendSet::serve_order(int start, int number,
266                                          std::list<PresentJob> &jobs)
267 {
268     int i;
269     for (i = 0; i < number; i++)
270     {
271         std::list<BackendSet>::const_iterator bsit;
272         int voffset = 0;
273         int offset = start + i - 1;
274         for (bsit = m_backend_sets.begin(); bsit != m_backend_sets.end(); 
275              bsit++)
276         {
277             if (offset >= voffset && offset < voffset + bsit->m_count)
278             {
279                 PresentJob job(bsit->m_backend, offset - voffset + 1);
280                 jobs.push_back(job);
281                 break;
282             }
283             voffset += bsit->m_count;
284         }
285     }
286 }
287
288 void yf::Multi::FrontendSet::round_robin(int start, int number,
289                                          std::list<PresentJob> &jobs)
290 {
291     std::list<int> pos;
292     std::list<BackendSet>::const_iterator bsit;
293     for (bsit = m_backend_sets.begin(); bsit != m_backend_sets.end(); bsit++)
294     {
295         pos.push_back(1);
296     }
297
298     int p = 1;
299 #if 1
300     // optimization step!
301     int omin = 0;
302     while(true)
303     {
304         int min = 0;
305         int no_left = 0;
306         // find min count for each set which is > omin
307         for (bsit = m_backend_sets.begin(); bsit != m_backend_sets.end(); bsit++)
308         {
309             if (bsit->m_count > omin)
310             {
311                 if (no_left == 0 || bsit->m_count < min)
312                     min = bsit->m_count;
313                 no_left++;
314             }
315         }
316         if (no_left == 0) // if nothing greater than omin, bail out.
317             break;
318         int skip = no_left * min;
319         if (p + skip > start)  // step gets us "into" present range?
320         {
321             // Yes. skip until start.. Rounding off is deliberate!
322             min = (start-p) / no_left;
323             p += no_left * min;
324             
325             // update positions in each set..
326             std::list<int>::iterator psit = pos.begin();
327             for (psit = pos.begin(); psit != pos.end(); psit++)
328                 *psit += min;
329             break;
330         }
331         // skip on each set.. before "present range"..
332         p = p + skip;
333         
334         std::list<int>::iterator psit = pos.begin();
335         for (psit = pos.begin(); psit != pos.end(); psit++)
336             *psit += min;
337         
338         omin = min; // update so we consider next class (with higher count)
339     }
340 #endif
341     int fetched = 0;
342     bool more = true;
343     while (more)
344     {
345         more = false;
346         std::list<int>::iterator psit = pos.begin();
347         bsit = m_backend_sets.begin();
348
349         for (; bsit != m_backend_sets.end(); psit++,bsit++)
350         {
351             if (fetched >= number)
352             {
353                 more = false;
354                 break;
355             }
356             if (*psit <= bsit->m_count)
357             {
358                 if (p >= start)
359                 {
360                     PresentJob job(bsit->m_backend, *psit);
361                     jobs.push_back(job);
362                     fetched++;
363                 }
364                 (*psit)++;
365                 p++;
366                 more = true;
367             }
368         }
369     }
370 }
371
372 void yf::Multi::Frontend::init(mp::Package &package, Z_GDU *gdu)
373 {
374     Z_InitRequest *req = gdu->u.z3950->u.initRequest;
375
376     std::list<std::string> targets;
377
378     mp::util::get_vhost_otherinfo(req->otherInfo, targets);
379
380     if (targets.size() < 1)
381     {
382         package.move();
383         return;
384     }
385
386     std::list<std::string>::const_iterator t_it = targets.begin();
387     for (; t_it != targets.end(); t_it++)
388     {
389         Session s;
390         Backend *b = new Backend;
391         b->m_vhost = *t_it;
392
393         std::list<Multi::Map>::const_iterator it =
394             m_p->m_route_patterns.begin();
395         while (it != m_p->m_route_patterns.end()) {
396             if (it->match(*t_it, &b->m_route))
397                 break;
398             it++;
399         }
400         // b->m_route = m_p->m_target_route[*t_it];
401         // b->m_route unset
402         b->m_package = PackagePtr(new Package(s, package.origin()));
403
404         m_backend_list.push_back(BackendPtr(b));
405     }
406     m_is_multi = true;
407
408     // create init request 
409     std::list<BackendPtr>::iterator bit;
410     for (bit = m_backend_list.begin(); bit != m_backend_list.end(); bit++)
411     {
412         mp::odr odr;
413         BackendPtr b = *bit;
414         Z_APDU *init_apdu = zget_APDU(odr, Z_APDU_initRequest);
415         
416         std::list<std::string>vhost_one;
417         vhost_one.push_back(b->m_vhost);
418         mp::util::set_vhost_otherinfo(&init_apdu->u.initRequest->otherInfo,
419                                        odr, vhost_one);
420
421
422         Z_InitRequest *breq = init_apdu->u.initRequest;
423
424         breq->idAuthentication = req->idAuthentication;
425         
426         *breq->preferredMessageSize = *req->preferredMessageSize;
427         *breq->maximumRecordSize = *req->maximumRecordSize;
428
429         ODR_MASK_SET(breq->options, Z_Options_search);
430         ODR_MASK_SET(breq->options, Z_Options_present);
431         ODR_MASK_SET(breq->options, Z_Options_namedResultSets);
432         ODR_MASK_SET(breq->options, Z_Options_scan);
433         
434         ODR_MASK_SET(breq->protocolVersion, Z_ProtocolVersion_1);
435         ODR_MASK_SET(breq->protocolVersion, Z_ProtocolVersion_2);
436         ODR_MASK_SET(breq->protocolVersion, Z_ProtocolVersion_3);
437         
438         b->m_package->request() = init_apdu;
439
440         b->m_package->copy_filter(package);
441     }
442     multi_move(m_backend_list);
443
444     // create the frontend init response based on each backend init response
445     mp::odr odr;
446
447     Z_APDU *f_apdu = odr.create_initResponse(gdu->u.z3950, 0, 0);
448     Z_InitResponse *f_resp = f_apdu->u.initResponse;
449
450     ODR_MASK_SET(f_resp->options, Z_Options_search);
451     ODR_MASK_SET(f_resp->options, Z_Options_present);
452     ODR_MASK_SET(f_resp->options, Z_Options_namedResultSets);
453     ODR_MASK_SET(f_resp->options, Z_Options_scan);
454     
455     ODR_MASK_SET(f_resp->protocolVersion, Z_ProtocolVersion_1);
456     ODR_MASK_SET(f_resp->protocolVersion, Z_ProtocolVersion_2);
457     ODR_MASK_SET(f_resp->protocolVersion, Z_ProtocolVersion_3);
458
459     int no_failed = 0;
460     int no_succeeded = 0;
461
462     Odr_int preferredMessageSize = *req->preferredMessageSize;
463     Odr_int maximumRecordSize = *req->maximumRecordSize;
464     for (bit = m_backend_list.begin(); bit != m_backend_list.end(); )
465     {
466         PackagePtr p = (*bit)->m_package;
467         
468         if (p->session().is_closed())
469         {
470             // failed. Remove from list and increment number of failed
471             no_failed++;
472             bit = m_backend_list.erase(bit);
473             continue;
474         }
475         Z_GDU *gdu = p->response().get();
476         if (gdu && gdu->which == Z_GDU_Z3950 && gdu->u.z3950->which ==
477             Z_APDU_initResponse)
478         {
479             int i;
480             Z_APDU *b_apdu = gdu->u.z3950;
481             Z_InitResponse *b_resp = b_apdu->u.initResponse;
482
483             // common options for all backends
484             for (i = 0; i <= Z_Options_stringSchema; i++)
485             {
486                 if (!ODR_MASK_GET(b_resp->options, i))
487                     ODR_MASK_CLEAR(f_resp->options, i);
488             }
489             // common protocol version
490             for (i = 0; i <= Z_ProtocolVersion_3; i++)
491                 if (!ODR_MASK_GET(b_resp->protocolVersion, i))
492                     ODR_MASK_CLEAR(f_resp->protocolVersion, i);
493             if (*b_resp->result)
494             {
495                 no_succeeded++;
496                 if (preferredMessageSize > *b_resp->preferredMessageSize)
497                     preferredMessageSize = *b_resp->preferredMessageSize;
498                 if (maximumRecordSize > *b_resp->maximumRecordSize)
499                     maximumRecordSize = *b_resp->maximumRecordSize;
500             }
501             else
502                 no_failed++;
503         }
504         else
505             no_failed++;
506         bit++;
507     }
508     *f_resp->preferredMessageSize = preferredMessageSize;
509     *f_resp->maximumRecordSize = maximumRecordSize;
510
511     if (m_p->m_hide_unavailable)
512     {
513         if (no_succeeded == 0)
514         {
515             *f_resp->result = 0;
516             package.session().close();
517         }
518     }
519     else
520     {
521         if (no_failed)
522         {
523             *f_resp->result = 0;
524             package.session().close();
525         }
526     }
527     package.response() = f_apdu;
528 }
529
530 void yf::Multi::Frontend::search(mp::Package &package, Z_APDU *apdu_req)
531 {
532     // create search request 
533     Z_SearchRequest *req = apdu_req->u.searchRequest;
534
535     // save these for later
536     int smallSetUpperBound = *req->smallSetUpperBound;
537     int largeSetLowerBound = *req->largeSetLowerBound;
538     int mediumSetPresentNumber = *req->mediumSetPresentNumber;
539     
540     // they are altered now - to disable piggyback
541     *req->smallSetUpperBound = 0;
542     *req->largeSetLowerBound = 1;
543     *req->mediumSetPresentNumber = 1;
544
545     int default_num_db = req->num_databaseNames;
546     char **default_db = req->databaseNames;
547
548     std::list<BackendPtr>::const_iterator bit;
549     for (bit = m_backend_list.begin(); bit != m_backend_list.end(); bit++)
550     {
551         PackagePtr p = (*bit)->m_package;
552         mp::odr odr;
553     
554         if (!mp::util::set_databases_from_zurl(odr, (*bit)->m_vhost,
555                                                 &req->num_databaseNames,
556                                                 &req->databaseNames))
557         {
558             req->num_databaseNames = default_num_db;
559             req->databaseNames = default_db;
560         }
561         p->request() = apdu_req;
562         p->copy_filter(package);
563     }
564     multi_move(m_backend_list);
565
566     // look at each response
567     FrontendSet resultSet(std::string(req->resultSetName));
568
569     int result_set_size = 0;
570     Z_Records *z_records_diag = 0;  // no diagnostics (yet)
571     for (bit = m_backend_list.begin(); bit != m_backend_list.end(); bit++)
572     {
573         PackagePtr p = (*bit)->m_package;
574         
575         if (p->session().is_closed()) // if any backend closes, close frontend
576             package.session().close();
577         
578         Z_GDU *gdu = p->response().get();
579         if (gdu && gdu->which == Z_GDU_Z3950 && gdu->u.z3950->which ==
580             Z_APDU_searchResponse)
581         {
582             Z_APDU *b_apdu = gdu->u.z3950;
583             Z_SearchResponse *b_resp = b_apdu->u.searchResponse;
584          
585             // see we get any errors (AKA diagnstics)
586             if (b_resp->records)
587             {
588                 if (b_resp->records->which == Z_Records_NSD
589                     || b_resp->records->which == Z_Records_multipleNSD)
590                     z_records_diag = b_resp->records;
591                 // we may set this multiple times (TOO BAD!)
592             }
593             BackendSet backendSet;
594             backendSet.m_backend = *bit;
595             backendSet.m_count = *b_resp->resultCount;
596             result_set_size += *b_resp->resultCount;
597             resultSet.m_backend_sets.push_back(backendSet);
598         }
599         else
600         {
601             // if any target does not return search response - return that 
602             package.response() = p->response();
603             return;
604         }
605     }
606
607     mp::odr odr;
608     Z_APDU *f_apdu = odr.create_searchResponse(apdu_req, 0, 0);
609     Z_SearchResponse *f_resp = f_apdu->u.searchResponse;
610
611     *f_resp->resultCount = result_set_size;
612     if (z_records_diag)
613     {
614         // search error
615         f_resp->records = z_records_diag;
616         package.response() = f_apdu;
617         return;
618     }
619     // assume OK
620     m_sets[resultSet.m_setname] = resultSet;
621
622     int number;
623     mp::util::piggyback(smallSetUpperBound,
624                          largeSetLowerBound,
625                          mediumSetPresentNumber,
626                          result_set_size,
627                          number);
628     Package pp(package.session(), package.origin());
629     if (number > 0)
630     {
631         pp.copy_filter(package);
632         Z_APDU *p_apdu = zget_APDU(odr, Z_APDU_presentRequest);
633         Z_PresentRequest *p_req = p_apdu->u.presentRequest;
634         p_req->preferredRecordSyntax = req->preferredRecordSyntax;
635         p_req->resultSetId = req->resultSetName;
636         *p_req->resultSetStartPoint = 1;
637         *p_req->numberOfRecordsRequested = number;
638         pp.request() = p_apdu;
639         present(pp, p_apdu);
640         
641         if (pp.session().is_closed())
642             package.session().close();
643         
644         Z_GDU *gdu = pp.response().get();
645         if (gdu && gdu->which == Z_GDU_Z3950 && gdu->u.z3950->which ==
646             Z_APDU_presentResponse)
647         {
648             Z_PresentResponse *p_res = gdu->u.z3950->u.presentResponse;
649             f_resp->records = p_res->records;
650             *f_resp->numberOfRecordsReturned = 
651                 *p_res->numberOfRecordsReturned;
652             *f_resp->nextResultSetPosition = 
653                 *p_res->nextResultSetPosition;
654         }
655         else 
656         {
657             package.response() = pp.response(); 
658             return;
659         }
660     }
661     package.response() = f_apdu; // in this scope because of p
662 }
663
664 void yf::Multi::Frontend::present(mp::Package &package, Z_APDU *apdu_req)
665 {
666     // create present request 
667     Z_PresentRequest *req = apdu_req->u.presentRequest;
668
669     Sets_it it;
670     it = m_sets.find(std::string(req->resultSetId));
671     if (it == m_sets.end())
672     {
673         mp::odr odr;
674         Z_APDU *apdu = 
675             odr.create_presentResponse(
676                 apdu_req,
677                 YAZ_BIB1_SPECIFIED_RESULT_SET_DOES_NOT_EXIST,
678                 req->resultSetId);
679         package.response() = apdu;
680         return;
681     }
682     std::list<Multi::FrontendSet::PresentJob> jobs;
683     int start = *req->resultSetStartPoint;
684     int number = *req->numberOfRecordsRequested;
685
686     if (m_p->m_merge_type == round_robin)
687         it->second.round_robin(start, number, jobs);
688     else if (m_p->m_merge_type == serve_order)
689         it->second.serve_order(start, number, jobs);
690
691     if (0)
692     {
693         std::list<Multi::FrontendSet::PresentJob>::const_iterator jit;
694         for (jit = jobs.begin(); jit != jobs.end(); jit++)
695         {
696             yaz_log(YLOG_LOG, "job pos=%d", jit->m_pos);
697         }
698     }
699
700     std::list<BackendPtr> present_backend_list;
701
702     std::list<BackendSet>::const_iterator bsit;
703     bsit = it->second.m_backend_sets.begin();
704     for (; bsit != it->second.m_backend_sets.end(); bsit++)
705     {
706         int start = -1;
707         int end = -1;
708         {
709             std::list<Multi::FrontendSet::PresentJob>::const_iterator jit;
710             for (jit = jobs.begin(); jit != jobs.end(); jit++)
711             {
712                 if (jit->m_backend == bsit->m_backend)
713                 {
714                     if (start == -1 || jit->m_pos < start)
715                         start = jit->m_pos;
716                     if (end == -1 || jit->m_pos > end)
717                         end = jit->m_pos;
718                 }
719             }
720         }
721         if (start != -1)
722         {
723             std::list<Multi::FrontendSet::PresentJob>::iterator jit;
724             for (jit = jobs.begin(); jit != jobs.end(); jit++)
725             {
726                 if (jit->m_backend == bsit->m_backend)
727                 {
728                     if (jit->m_pos >= start && jit->m_pos <= end)
729                         jit->m_start = start;
730                 }
731             }
732
733             PackagePtr p = bsit->m_backend->m_package;
734
735             *req->resultSetStartPoint = start;
736             *req->numberOfRecordsRequested = end - start + 1;
737             
738             p->request() = apdu_req;
739             p->copy_filter(package);
740
741             present_backend_list.push_back(bsit->m_backend);
742         }
743     }
744     multi_move(present_backend_list);
745
746     // look at each response
747     Z_Records *z_records_diag = 0;
748
749     std::list<BackendPtr>::const_iterator pbit = present_backend_list.begin();
750     for (; pbit != present_backend_list.end(); pbit++)
751     {
752         PackagePtr p = (*pbit)->m_package;
753         
754         if (p->session().is_closed()) // if any backend closes, close frontend
755             package.session().close();
756         
757         Z_GDU *gdu = p->response().get();
758         if (gdu && gdu->which == Z_GDU_Z3950 && gdu->u.z3950->which ==
759             Z_APDU_presentResponse)
760         {
761             Z_APDU *b_apdu = gdu->u.z3950;
762             Z_PresentResponse *b_resp = b_apdu->u.presentResponse;
763          
764             // see we get any errors (AKA diagnstics)
765             if (b_resp->records)
766             {
767                 if (b_resp->records->which != Z_Records_DBOSD)
768                     z_records_diag = b_resp->records;
769                 // we may set this multiple times (TOO BAD!)
770             }
771         }
772         else
773         {
774             // if any target does not return present response - return that 
775             package.response() = p->response();
776             return;
777         }
778     }
779
780     mp::odr odr;
781     Z_APDU *f_apdu = odr.create_presentResponse(apdu_req, 0, 0);
782     Z_PresentResponse *f_resp = f_apdu->u.presentResponse;
783
784     if (z_records_diag)
785     {
786         f_resp->records = z_records_diag;
787         *f_resp->presentStatus = Z_PresentStatus_failure;
788     }
789     else if (number < 0 || (size_t) number > jobs.size())
790     {
791         f_apdu = 
792             odr.create_presentResponse(
793                 apdu_req,
794                 YAZ_BIB1_PRESENT_REQUEST_OUT_OF_RANGE,
795                 0);
796     }
797     else
798     {
799         f_resp->records = (Z_Records *) odr_malloc(odr, sizeof(Z_Records));
800         Z_Records * records = f_resp->records;
801         records->which = Z_Records_DBOSD;
802         records->u.databaseOrSurDiagnostics =
803             (Z_NamePlusRecordList *)
804             odr_malloc(odr, sizeof(Z_NamePlusRecordList));
805         Z_NamePlusRecordList *nprl = records->u.databaseOrSurDiagnostics;
806         nprl->num_records = jobs.size();
807         nprl->records = (Z_NamePlusRecord**)
808             odr_malloc(odr, sizeof(Z_NamePlusRecord *) * nprl->num_records);
809         int i = 0;
810         std::list<Multi::FrontendSet::PresentJob>::const_iterator jit;
811         for (jit = jobs.begin(); jit != jobs.end(); jit++, i++)
812         {
813             PackagePtr p = jit->m_backend->m_package;
814             
815             Z_GDU *gdu = p->response().get();
816             Z_APDU *b_apdu = gdu->u.z3950;
817             Z_PresentResponse *b_resp = b_apdu->u.presentResponse;
818
819             nprl->records[i] = (Z_NamePlusRecord*)
820                 odr_malloc(odr, sizeof(Z_NamePlusRecord));
821             int inside_pos = jit->m_pos - jit->m_start;
822             if (inside_pos >= b_resp->records->
823                 u.databaseOrSurDiagnostics->num_records)
824                 break;
825             *nprl->records[i] = *b_resp->records->
826                 u.databaseOrSurDiagnostics->records[inside_pos];
827             nprl->records[i]->databaseName =
828                     odr_strdup(odr, jit->m_backend->m_vhost.c_str());
829         }
830         nprl->num_records = i; // usually same as jobs.size();
831         *f_resp->nextResultSetPosition = start + i;
832         *f_resp->numberOfRecordsReturned = i;
833     }
834     package.response() = f_apdu;
835 }
836
837 void yf::Multi::Frontend::scan1(mp::Package &package, Z_APDU *apdu_req)
838 {
839     if (m_backend_list.size() > 1)
840     {
841         mp::odr odr;
842         Z_APDU *f_apdu = 
843             odr.create_scanResponse(
844                 apdu_req, YAZ_BIB1_COMBI_OF_SPECIFIED_DATABASES_UNSUPP, 0);
845         package.response() = f_apdu;
846         return;
847     }
848     Z_ScanRequest *req = apdu_req->u.scanRequest;
849
850     int default_num_db = req->num_databaseNames;
851     char **default_db = req->databaseNames;
852
853     std::list<BackendPtr>::const_iterator bit;
854     for (bit = m_backend_list.begin(); bit != m_backend_list.end(); bit++)
855     {
856         PackagePtr p = (*bit)->m_package;
857         mp::odr odr;
858     
859         if (!mp::util::set_databases_from_zurl(odr, (*bit)->m_vhost,
860                                                 &req->num_databaseNames,
861                                                 &req->databaseNames))
862         {
863             req->num_databaseNames = default_num_db;
864             req->databaseNames = default_db;
865         }
866         p->request() = apdu_req;
867         p->copy_filter(package);
868     }
869     multi_move(m_backend_list);
870
871     for (bit = m_backend_list.begin(); bit != m_backend_list.end(); bit++)
872     {
873         PackagePtr p = (*bit)->m_package;
874         
875         if (p->session().is_closed()) // if any backend closes, close frontend
876             package.session().close();
877         
878         Z_GDU *gdu = p->response().get();
879         if (gdu && gdu->which == Z_GDU_Z3950 && gdu->u.z3950->which ==
880             Z_APDU_scanResponse)
881         {
882             package.response() = p->response();
883             break;
884         }
885         else
886         {
887             // if any target does not return scan response - return that 
888             package.response() = p->response();
889             return;
890         }
891     }
892 }
893
894 bool yf::Multi::ScanTermInfo::operator < (const ScanTermInfo &k) const
895 {
896     return m_norm_term < k.m_norm_term;
897 }
898
899 bool yf::Multi::ScanTermInfo::operator == (const ScanTermInfo &k) const
900 {
901     return m_norm_term == k.m_norm_term;
902 }
903
904 Z_Entry *yf::Multi::ScanTermInfo::get_entry(ODR odr)
905 {
906     Z_Entry *e = (Z_Entry *)odr_malloc(odr, sizeof(*e));
907     e->which = Z_Entry_termInfo;
908     Z_TermInfo *t;
909     t = e->u.termInfo = (Z_TermInfo *) odr_malloc(odr, sizeof(*t));
910     t->suggestedAttributes = 0;
911     t->displayTerm = 0;
912     t->alternativeTerm = 0;
913     t->byAttributes = 0;
914     t->otherTermInfo = 0;
915     t->globalOccurrences = odr_intdup(odr, m_count);
916     t->term = (Z_Term *)
917         odr_malloc(odr, sizeof(*t->term));
918     t->term->which = Z_Term_general;
919     Odr_oct *o;
920     t->term->u.general = o = (Odr_oct *)odr_malloc(odr, sizeof(Odr_oct));
921
922     o->len = o->size = m_norm_term.size();
923     o->buf = (unsigned char *) odr_malloc(odr, o->len);
924     memcpy(o->buf, m_norm_term.c_str(), o->len);
925     return e;
926 }
927
928 void yf::Multi::Frontend::relay_apdu(mp::Package &package, Z_APDU *apdu_req)
929 {
930     std::list<BackendPtr>::const_iterator bit;
931     for (bit = m_backend_list.begin(); bit != m_backend_list.end(); bit++)
932     {
933         PackagePtr p = (*bit)->m_package;
934         mp::odr odr;
935     
936         p->request() = apdu_req;
937         p->copy_filter(package);
938     }
939     multi_move(m_backend_list);
940     for (bit = m_backend_list.begin(); bit != m_backend_list.end(); bit++)
941     {
942         PackagePtr p = (*bit)->m_package;
943         
944         if (p->session().is_closed()) // if any backend closes, close frontend
945             package.session().close();
946         
947         package.response() = p->response();
948     }
949 }
950
951
952 void yf::Multi::Frontend::scan2(mp::Package &package, Z_APDU *apdu_req)
953 {
954     Z_ScanRequest *req = apdu_req->u.scanRequest;
955
956     int default_num_db = req->num_databaseNames;
957     char **default_db = req->databaseNames;
958
959     std::list<BackendPtr>::const_iterator bit;
960     for (bit = m_backend_list.begin(); bit != m_backend_list.end(); bit++)
961     {
962         PackagePtr p = (*bit)->m_package;
963         mp::odr odr;
964     
965         if (!mp::util::set_databases_from_zurl(odr, (*bit)->m_vhost,
966                                                 &req->num_databaseNames,
967                                                 &req->databaseNames))
968         {
969             req->num_databaseNames = default_num_db;
970             req->databaseNames = default_db;
971         }
972         p->request() = apdu_req;
973         p->copy_filter(package);
974     }
975     multi_move(m_backend_list);
976
977     ScanTermInfoList entries_before;
978     ScanTermInfoList entries_after;
979     int no_before = 0;
980     int no_after = 0;
981
982     for (bit = m_backend_list.begin(); bit != m_backend_list.end(); bit++)
983     {
984         PackagePtr p = (*bit)->m_package;
985         
986         if (p->session().is_closed()) // if any backend closes, close frontend
987             package.session().close();
988         
989         Z_GDU *gdu = p->response().get();
990         if (gdu && gdu->which == Z_GDU_Z3950 && gdu->u.z3950->which ==
991             Z_APDU_scanResponse)
992         {
993             Z_ScanResponse *res = gdu->u.z3950->u.scanResponse;
994
995             if (res->entries && res->entries->nonsurrogateDiagnostics)
996             {
997                 // failure
998                 mp::odr odr;
999                 Z_APDU *f_apdu = odr.create_scanResponse(apdu_req, 1, 0);
1000                 Z_ScanResponse *f_res = f_apdu->u.scanResponse;
1001
1002                 f_res->entries->nonsurrogateDiagnostics = 
1003                     res->entries->nonsurrogateDiagnostics;
1004                 f_res->entries->num_nonsurrogateDiagnostics = 
1005                     res->entries->num_nonsurrogateDiagnostics;
1006
1007                 package.response() = f_apdu;
1008                 return;
1009             }
1010
1011             if (res->entries && res->entries->entries)
1012             {
1013                 Z_Entry **entries = res->entries->entries;
1014                 int num_entries = res->entries->num_entries;
1015                 int position = 1;
1016                 if (req->preferredPositionInResponse)
1017                     position = *req->preferredPositionInResponse;
1018                 if (res->positionOfTerm)
1019                     position = *res->positionOfTerm;
1020
1021                 // before
1022                 int i;
1023                 for (i = 0; i<position-1 && i<num_entries; i++)
1024                 {
1025                     Z_Entry *ent = entries[i];
1026
1027                     if (ent->which == Z_Entry_termInfo)
1028                     {
1029                         ScanTermInfo my;
1030
1031                         Odr_int *occur = ent->u.termInfo->globalOccurrences;
1032                         my.m_count = occur ? *occur : 0;
1033
1034                         if (ent->u.termInfo->term->which == Z_Term_general)
1035                         {
1036                             my.m_norm_term = std::string(
1037                                 (const char *)
1038                                 ent->u.termInfo->term->u.general->buf,
1039                                 ent->u.termInfo->term->u.general->len);
1040                         }
1041                         if (my.m_norm_term.length())
1042                         {
1043                             ScanTermInfoList::iterator it = 
1044                                 entries_before.begin();
1045                             while (it != entries_before.end() && my <*it)
1046                                 it++;
1047                             if (my == *it)
1048                             {
1049                                 it->m_count += my.m_count;
1050                             }
1051                             else
1052                             {
1053                                 entries_before.insert(it, my);
1054                                 no_before++;
1055                             }
1056                         }
1057                     }
1058                 }
1059                 // after
1060                 if (position <= 0)
1061                     i = 0;
1062                 else
1063                     i = position-1;
1064                 for ( ; i<num_entries; i++)
1065                 {
1066                     Z_Entry *ent = entries[i];
1067
1068                     if (ent->which == Z_Entry_termInfo)
1069                     {
1070                         ScanTermInfo my;
1071
1072                         Odr_int *occur = ent->u.termInfo->globalOccurrences;
1073                         my.m_count = occur ? *occur : 0;
1074
1075                         if (ent->u.termInfo->term->which == Z_Term_general)
1076                         {
1077                             my.m_norm_term = std::string(
1078                                 (const char *)
1079                                 ent->u.termInfo->term->u.general->buf,
1080                                 ent->u.termInfo->term->u.general->len);
1081                         }
1082                         if (my.m_norm_term.length())
1083                         {
1084                             ScanTermInfoList::iterator it = 
1085                                 entries_after.begin();
1086                             while (it != entries_after.end() && *it < my)
1087                                 it++;
1088                             if (my == *it)
1089                             {
1090                                 it->m_count += my.m_count;
1091                             }
1092                             else
1093                             {
1094                                 entries_after.insert(it, my);
1095                                 no_after++;
1096                             }
1097                         }
1098                     }
1099                 }
1100
1101             }                
1102         }
1103         else
1104         {
1105             // if any target does not return scan response - return that 
1106             package.response() = p->response();
1107             return;
1108         }
1109     }
1110
1111     if (false)
1112     {
1113         std::cout << "BEFORE\n";
1114         ScanTermInfoList::iterator it = entries_before.begin();
1115         for(; it != entries_before.end(); it++)
1116         {
1117             std::cout << " " << it->m_norm_term << " " << it->m_count << "\n";
1118         }
1119         
1120         std::cout << "AFTER\n";
1121         it = entries_after.begin();
1122         for(; it != entries_after.end(); it++)
1123         {
1124             std::cout << " " << it->m_norm_term << " " << it->m_count << "\n";
1125         }
1126     }
1127
1128     if (false)
1129     {
1130         mp::odr odr;
1131         Z_APDU *f_apdu = odr.create_scanResponse(apdu_req, 1, "not implemented");
1132         package.response() = f_apdu;
1133     }
1134     else
1135     {
1136         mp::odr odr;
1137         Z_APDU *f_apdu = odr.create_scanResponse(apdu_req, 0, 0);
1138         Z_ScanResponse *resp = f_apdu->u.scanResponse;
1139         
1140         int number_returned = *req->numberOfTermsRequested;
1141         int position_returned = *req->preferredPositionInResponse;
1142         
1143         resp->entries->num_entries = number_returned;
1144         resp->entries->entries = (Z_Entry**)
1145             odr_malloc(odr, sizeof(Z_Entry*) * number_returned);
1146         int i;
1147
1148         int lbefore = entries_before.size();
1149         if (lbefore < position_returned-1)
1150             position_returned = lbefore+1;
1151
1152         ScanTermInfoList::iterator it = entries_before.begin();
1153         for (i = 0; i<position_returned-1 && it != entries_before.end(); i++, it++)
1154         {
1155             resp->entries->entries[position_returned-2-i] = it->get_entry(odr);
1156         }
1157
1158         it = entries_after.begin();
1159
1160         if (position_returned <= 0)
1161             i = 0;
1162         else
1163             i = position_returned-1;
1164         for (; i<number_returned && it != entries_after.end(); i++, it++)
1165         {
1166             resp->entries->entries[i] = it->get_entry(odr);
1167         }
1168
1169         number_returned = i;
1170
1171         resp->positionOfTerm = odr_intdup(odr, position_returned);
1172         resp->numberOfEntriesReturned = odr_intdup(odr, number_returned);
1173         resp->entries->num_entries = number_returned;
1174
1175         package.response() = f_apdu;
1176     }
1177 }
1178
1179
1180 void yf::Multi::process(mp::Package &package) const
1181 {
1182     FrontendPtr f = m_p->get_frontend(package);
1183
1184     Z_GDU *gdu = package.request().get();
1185     
1186     if (gdu && gdu->which == Z_GDU_Z3950 && gdu->u.z3950->which ==
1187         Z_APDU_initRequest && !f->m_is_multi)
1188     {
1189         f->init(package, gdu);
1190     }
1191     else if (!f->m_is_multi)
1192         package.move();
1193     else if (gdu && gdu->which == Z_GDU_Z3950)
1194     {
1195         Z_APDU *apdu = gdu->u.z3950;
1196         if (apdu->which == Z_APDU_initRequest)
1197         {
1198             mp::odr odr;
1199             
1200             package.response() = odr.create_close(
1201                 apdu,
1202                 Z_Close_protocolError,
1203                 "double init");
1204             
1205             package.session().close();
1206         }
1207         else if (apdu->which == Z_APDU_searchRequest)
1208         {
1209             f->search(package, apdu);
1210         }
1211         else if (apdu->which == Z_APDU_presentRequest)
1212         {
1213             f->present(package, apdu);
1214         }
1215         else if (apdu->which == Z_APDU_scanRequest)
1216         {
1217             f->scan2(package, apdu);
1218         }
1219         else if (apdu->which == Z_APDU_close)
1220         {
1221             f->relay_apdu(package, apdu);
1222         }
1223         else
1224         {
1225             mp::odr odr;
1226             
1227             package.response() = odr.create_close(
1228                 apdu, Z_Close_protocolError,
1229                 "unsupported APDU in filter multi");
1230             
1231             package.session().close();
1232         }
1233     }
1234     m_p->release_frontend(package);
1235 }
1236
1237 void mp::filter::Multi::configure(const xmlNode * ptr, bool test_only)
1238 {
1239     for (ptr = ptr->children; ptr; ptr = ptr->next)
1240     {
1241         if (ptr->type != XML_ELEMENT_NODE)
1242             continue;
1243         if (!strcmp((const char *) ptr->name, "target"))
1244         {
1245             std::string route = mp::xml::get_route(ptr);
1246             std::string target = mp::xml::get_text(ptr);
1247             m_p->m_route_patterns.push_back(Multi::Map(target, route));
1248         }
1249         else if (!strcmp((const char *) ptr->name, "hideunavailable"))
1250         {
1251             m_p->m_hide_unavailable = true;
1252         }
1253         else if (!strcmp((const char *) ptr->name, "mergetype"))
1254         {
1255             std::string mergetype = mp::xml::get_text(ptr);
1256             if (mergetype == "roundrobin")
1257                 m_p->m_merge_type = round_robin;
1258             else if (mergetype == "serveorder")
1259                 m_p->m_merge_type = serve_order;
1260             else
1261                 throw mp::filter::FilterException
1262                     ("Bad mergetype "  + mergetype + " in multi filter");
1263
1264         }
1265         else
1266         {
1267             throw mp::filter::FilterException
1268                 ("Bad element " 
1269                  + std::string((const char *) ptr->name)
1270                  + " in multi filter");
1271         }
1272     }
1273 }
1274
1275 static mp::filter::Base* filter_creator()
1276 {
1277     return new mp::filter::Multi;
1278 }
1279
1280 extern "C" {
1281     struct metaproxy_1_filter_struct metaproxy_1_filter_multi = {
1282         0,
1283         "multi",
1284         filter_creator
1285     };
1286 }
1287
1288
1289 /*
1290  * Local variables:
1291  * c-basic-offset: 4
1292  * c-file-style: "Stroustrup"
1293  * indent-tabs-mode: nil
1294  * End:
1295  * vim: shiftwidth=4 tabstop=8 expandtab
1296  */
1297