GPL v2.
[metaproxy-moved-to-github.git] / src / filter_multi.cpp
1 /* $Id: filter_multi.cpp,v 1.27 2007-05-09 21:23:09 adam Exp $
2    Copyright (c) 2005-2007, Index Data.
3
4 This file is part of Metaproxy.
5
6 Metaproxy is free software; you can redistribute it and/or modify it under
7 the terms of the GNU General Public License as published by the Free
8 Software Foundation; either version 2, or (at your option) any later
9 version.
10
11 Metaproxy is distributed in the hope that it will be useful, but WITHOUT ANY
12 WARRANTY; without even the implied warranty of MERCHANTABILITY or
13 FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
14 for more details.
15
16 You should have received a copy of the GNU General Public License
17 along with Metaproxy; see the file LICENSE.  If not, write to the
18 Free Software Foundation, 59 Temple Place - Suite 330, Boston, MA
19 02111-1307, USA.
20  */
21
22 #include "config.hpp"
23
24 #include "filter.hpp"
25 #include "package.hpp"
26
27 #include <boost/thread/thread.hpp>
28 #include <boost/thread/mutex.hpp>
29 #include <boost/thread/condition.hpp>
30 #include <boost/shared_ptr.hpp>
31
32 #include "util.hpp"
33 #include "filter_multi.hpp"
34
35 #include <yaz/zgdu.h>
36 #include <yaz/otherinfo.h>
37 #include <yaz/diagbib1.h>
38
39 #include <vector>
40 #include <algorithm>
41 #include <map>
42 #include <iostream>
43
44 namespace mp = metaproxy_1;
45 namespace yf = mp::filter;
46
47 namespace metaproxy_1 {
48     namespace filter {
49
50         struct Multi::BackendSet {
51             BackendPtr m_backend;
52             int m_count;
53             bool operator < (const BackendSet &k) const;
54             bool operator == (const BackendSet &k) const;
55         };
56         struct Multi::ScanTermInfo {
57             std::string m_norm_term;
58             std::string m_display_term;
59             int m_count;
60             bool operator < (const ScanTermInfo &) const;
61             bool operator == (const ScanTermInfo &) const;
62             Z_Entry *get_entry(ODR odr);
63         };
64         struct Multi::FrontendSet {
65             struct PresentJob {
66                 BackendPtr m_backend;
67                 int m_pos;
68                 int m_inside_pos;
69             };
70             FrontendSet(std::string setname);
71             FrontendSet();
72             ~FrontendSet();
73
74             void round_robin(int pos, int number, std::list<PresentJob> &job);
75
76             std::list<BackendSet> m_backend_sets;
77             std::string m_setname;
78         };
79         struct Multi::Backend {
80             PackagePtr m_package;
81             std::string m_backend_database;
82             std::string m_vhost;
83             std::string m_route;
84             void operator() (void);  // thread operation
85         };
86         struct Multi::Frontend {
87             Frontend(Rep *rep);
88             ~Frontend();
89             bool m_is_multi;
90             bool m_in_use;
91             std::list<BackendPtr> m_backend_list;
92             std::map<std::string,Multi::FrontendSet> m_sets;
93
94             void multi_move(std::list<BackendPtr> &blist);
95             void init(Package &package, Z_GDU *gdu);
96             void close(Package &package);
97             void search(Package &package, Z_APDU *apdu);
98             void present(Package &package, Z_APDU *apdu);
99             void scan1(Package &package, Z_APDU *apdu);
100             void scan2(Package &package, Z_APDU *apdu);
101             Rep *m_p;
102         };            
103         struct Multi::Map {
104             Map(std::list<std::string> hosts, std::string route);
105             Map();
106             std::list<std::string> m_hosts;
107             std::string m_route;
108         };
109         class Multi::Rep {
110             friend class Multi;
111             friend struct Frontend;
112             
113             Rep();
114             FrontendPtr get_frontend(Package &package);
115             void release_frontend(Package &package);
116         private:
117             std::map<std::string,std::string> m_target_route;
118             boost::mutex m_mutex;
119             boost::condition m_cond_session_ready;
120             std::map<mp::Session, FrontendPtr> m_clients;
121             bool m_hide_unavailable;
122         };
123     }
124 }
125
126 yf::Multi::Rep::Rep()
127 {
128     m_hide_unavailable = false;
129 }
130
131 bool yf::Multi::BackendSet::operator < (const BackendSet &k) const
132 {
133     return m_count < k.m_count;
134 }
135
136 yf::Multi::Frontend::Frontend(Rep *rep)
137 {
138     m_p = rep;
139     m_is_multi = false;
140 }
141
142 yf::Multi::Frontend::~Frontend()
143 {
144 }
145
146 yf::Multi::FrontendPtr yf::Multi::Rep::get_frontend(mp::Package &package)
147 {
148     boost::mutex::scoped_lock lock(m_mutex);
149
150     std::map<mp::Session,yf::Multi::FrontendPtr>::iterator it;
151     
152     while(true)
153     {
154         it = m_clients.find(package.session());
155         if (it == m_clients.end())
156             break;
157         
158         if (!it->second->m_in_use)
159         {
160             it->second->m_in_use = true;
161             return it->second;
162         }
163         m_cond_session_ready.wait(lock);
164     }
165     FrontendPtr f(new Frontend(this));
166     m_clients[package.session()] = f;
167     f->m_in_use = true;
168     return f;
169 }
170
171 void yf::Multi::Rep::release_frontend(mp::Package &package)
172 {
173     boost::mutex::scoped_lock lock(m_mutex);
174     std::map<mp::Session,yf::Multi::FrontendPtr>::iterator it;
175     
176     it = m_clients.find(package.session());
177     if (it != m_clients.end())
178     {
179         if (package.session().is_closed())
180         {
181             it->second->close(package);
182             m_clients.erase(it);
183         }
184         else
185         {
186             it->second->m_in_use = false;
187         }
188         m_cond_session_ready.notify_all();
189     }
190 }
191
192 yf::Multi::FrontendSet::FrontendSet(std::string setname)
193     :  m_setname(setname)
194 {
195 }
196
197
198 yf::Multi::FrontendSet::FrontendSet()
199 {
200 }
201
202
203 yf::Multi::FrontendSet::~FrontendSet()
204 {
205 }
206
207 yf::Multi::Map::Map(std::list<std::string> hosts, std::string route)
208     : m_hosts(hosts), m_route(route) 
209 {
210 }
211
212 yf::Multi::Map::Map()
213 {
214 }
215
216 yf::Multi::Multi() : m_p(new Multi::Rep)
217 {
218 }
219
220 yf::Multi::~Multi() {
221 }
222
223
224 void yf::Multi::Backend::operator() (void) 
225 {
226     m_package->move(m_route);
227 }
228
229
230 void yf::Multi::Frontend::close(mp::Package &package)
231 {
232     std::list<BackendPtr>::const_iterator bit;
233     for (bit = m_backend_list.begin(); bit != m_backend_list.end(); bit++)
234     {
235         BackendPtr b = *bit;
236
237         b->m_package->copy_filter(package);
238         b->m_package->request() = (Z_GDU *) 0;
239         b->m_package->session().close();
240         b->m_package->move(b->m_route);
241     }
242 }
243
244 void yf::Multi::Frontend::multi_move(std::list<BackendPtr> &blist)
245 {
246     std::list<BackendPtr>::const_iterator bit;
247     boost::thread_group g;
248     for (bit = blist.begin(); bit != blist.end(); bit++)
249     {
250         g.add_thread(new boost::thread(**bit));
251     }
252     g.join_all();
253 }
254
255 void yf::Multi::FrontendSet::round_robin(int start, int number,
256                                          std::list<PresentJob> &jobs)
257 {
258     std::list<int> pos;
259     std::list<int> inside_pos;
260     std::list<BackendSet>::const_iterator bsit;
261     for (bsit = m_backend_sets.begin(); bsit != m_backend_sets.end(); bsit++)
262     {
263         pos.push_back(1);
264         inside_pos.push_back(0);
265     }
266
267     int p = 1;
268 #if 1
269     // optimization step!
270     int omin = 0;
271     while(true)
272     {
273         int min = 0;
274         int no_left = 0;
275         // find min count for each set which is > omin
276         for (bsit = m_backend_sets.begin(); bsit != m_backend_sets.end(); bsit++)
277         {
278             if (bsit->m_count > omin)
279             {
280                 if (no_left == 0 || bsit->m_count < min)
281                     min = bsit->m_count;
282                 no_left++;
283             }
284         }
285         if (no_left == 0) // if nothing greater than omin, bail out.
286             break;
287         int skip = no_left * min;
288         if (p + skip > start)  // step gets us "into" present range?
289         {
290             // Yes. skip until start.. Rounding off is deliberate!
291             min = (start-p) / no_left;
292             p += no_left * min;
293             
294             // update positions in each set..
295             std::list<int>::iterator psit = pos.begin();
296             for (psit = pos.begin(); psit != pos.end(); psit++)
297                 *psit += min;
298             break;
299         }
300         // skip on each set.. before "present range"..
301         p = p + skip;
302         
303         std::cout << "\nSKIP min=" << min << " no_left=" << no_left << "\n\n";
304         
305         std::list<int>::iterator psit = pos.begin();
306         for (psit = pos.begin(); psit != pos.end(); psit++)
307             *psit += min;
308         
309         omin = min; // update so we consider next class (with higher count)
310     }
311 #endif
312     int fetched = 0;
313     bool more = true;
314     while (more)
315     {
316         more = false;
317         std::list<int>::iterator psit = pos.begin();
318         std::list<int>::iterator esit = inside_pos.begin();
319         bsit = m_backend_sets.begin();
320
321         for (; bsit != m_backend_sets.end(); psit++,esit++,bsit++)
322         {
323             if (fetched >= number)
324             {
325                 more = false;
326                 break;
327             }
328             if (*psit <= bsit->m_count)
329             {
330                 if (p >= start)
331                 {
332                     PresentJob job;
333                     job.m_backend = bsit->m_backend;
334                     job.m_pos = *psit;
335                     job.m_inside_pos = *esit;
336                     jobs.push_back(job);
337                     (*esit)++;
338                     fetched++;
339                 }
340                 (*psit)++;
341                 p++;
342                 more = true;
343             }
344         }
345     }
346 }
347
348 void yf::Multi::Frontend::init(mp::Package &package, Z_GDU *gdu)
349 {
350     Z_InitRequest *req = gdu->u.z3950->u.initRequest;
351
352     std::list<std::string> targets;
353
354     mp::util::get_vhost_otherinfo(req->otherInfo, targets);
355
356     if (targets.size() < 1)
357     {
358         package.move();
359         return;
360     }
361
362     std::list<std::string>::const_iterator t_it = targets.begin();
363     for (; t_it != targets.end(); t_it++)
364     {
365         Session s;
366         Backend *b = new Backend;
367         b->m_vhost = *t_it;
368
369         b->m_route = m_p->m_target_route[*t_it];
370         // b->m_route unset
371         b->m_package = PackagePtr(new Package(s, package.origin()));
372
373         m_backend_list.push_back(BackendPtr(b));
374     }
375     m_is_multi = true;
376
377     // create init request 
378     std::list<BackendPtr>::iterator bit;
379     for (bit = m_backend_list.begin(); bit != m_backend_list.end(); bit++)
380     {
381         mp::odr odr;
382         BackendPtr b = *bit;
383         Z_APDU *init_apdu = zget_APDU(odr, Z_APDU_initRequest);
384         
385         std::list<std::string>vhost_one;
386         vhost_one.push_back(b->m_vhost);
387         mp::util::set_vhost_otherinfo(&init_apdu->u.initRequest->otherInfo,
388                                        odr, vhost_one);
389
390         Z_InitRequest *req = init_apdu->u.initRequest;
391         
392         ODR_MASK_SET(req->options, Z_Options_search);
393         ODR_MASK_SET(req->options, Z_Options_present);
394         ODR_MASK_SET(req->options, Z_Options_namedResultSets);
395         ODR_MASK_SET(req->options, Z_Options_scan);
396         
397         ODR_MASK_SET(req->protocolVersion, Z_ProtocolVersion_1);
398         ODR_MASK_SET(req->protocolVersion, Z_ProtocolVersion_2);
399         ODR_MASK_SET(req->protocolVersion, Z_ProtocolVersion_3);
400         
401         b->m_package->request() = init_apdu;
402
403         b->m_package->copy_filter(package);
404     }
405     multi_move(m_backend_list);
406
407     // create the frontend init response based on each backend init response
408     mp::odr odr;
409
410     Z_APDU *f_apdu = odr.create_initResponse(gdu->u.z3950, 0, 0);
411     Z_InitResponse *f_resp = f_apdu->u.initResponse;
412
413     ODR_MASK_SET(f_resp->options, Z_Options_search);
414     ODR_MASK_SET(f_resp->options, Z_Options_present);
415     ODR_MASK_SET(f_resp->options, Z_Options_namedResultSets);
416     
417     ODR_MASK_SET(f_resp->protocolVersion, Z_ProtocolVersion_1);
418     ODR_MASK_SET(f_resp->protocolVersion, Z_ProtocolVersion_2);
419     ODR_MASK_SET(f_resp->protocolVersion, Z_ProtocolVersion_3);
420
421     int no_failed = 0;
422     int no_succeeded = 0;
423     for (bit = m_backend_list.begin(); bit != m_backend_list.end(); )
424     {
425         PackagePtr p = (*bit)->m_package;
426         
427         if (p->session().is_closed())
428         {
429             // failed. Remove from list and increment number of failed
430             no_failed++;
431             bit = m_backend_list.erase(bit);
432             continue;
433         }
434         no_succeeded++;
435
436         Z_GDU *gdu = p->response().get();
437         if (gdu && gdu->which == Z_GDU_Z3950 && gdu->u.z3950->which ==
438             Z_APDU_initResponse)
439         {
440             int i;
441             Z_APDU *b_apdu = gdu->u.z3950;
442             Z_InitResponse *b_resp = b_apdu->u.initResponse;
443
444             // common options for all backends
445             for (i = 0; i <= Z_Options_stringSchema; i++)
446             {
447                 if (!ODR_MASK_GET(b_resp->options, i))
448                     ODR_MASK_CLEAR(f_resp->options, i);
449             }
450             // common protocol version
451             for (i = 0; i <= Z_ProtocolVersion_3; i++)
452                 if (!ODR_MASK_GET(b_resp->protocolVersion, i))
453                     ODR_MASK_CLEAR(f_resp->protocolVersion, i);
454             // reject if any of the backends reject
455             if (!*b_resp->result)
456                 *f_resp->result = 0;
457         }
458         else
459         {
460             // if any target does not return init return that (close or
461             // similar )
462             package.response() = p->response();
463             return;
464         }
465         bit++;
466     }
467     if (m_p->m_hide_unavailable)
468     {
469         if (no_succeeded == 0)
470             package.session().close();
471     }
472     else
473     {
474         if (no_failed)
475             package.session().close();
476     }
477     package.response() = f_apdu;
478 }
479
480 void yf::Multi::Frontend::search(mp::Package &package, Z_APDU *apdu_req)
481 {
482     // create search request 
483     Z_SearchRequest *req = apdu_req->u.searchRequest;
484
485     // save these for later
486     int smallSetUpperBound = *req->smallSetUpperBound;
487     int largeSetLowerBound = *req->largeSetLowerBound;
488     int mediumSetPresentNumber = *req->mediumSetPresentNumber;
489     
490     // they are altered now - to disable piggyback
491     *req->smallSetUpperBound = 0;
492     *req->largeSetLowerBound = 1;
493     *req->mediumSetPresentNumber = 1;
494
495     int default_num_db = req->num_databaseNames;
496     char **default_db = req->databaseNames;
497
498     std::list<BackendPtr>::const_iterator bit;
499     for (bit = m_backend_list.begin(); bit != m_backend_list.end(); bit++)
500     {
501         PackagePtr p = (*bit)->m_package;
502         mp::odr odr;
503     
504         if (!mp::util::set_databases_from_zurl(odr, (*bit)->m_vhost,
505                                                 &req->num_databaseNames,
506                                                 &req->databaseNames))
507         {
508             req->num_databaseNames = default_num_db;
509             req->databaseNames = default_db;
510         }
511         p->request() = apdu_req;
512         p->copy_filter(package);
513     }
514     multi_move(m_backend_list);
515
516     // look at each response
517     FrontendSet resultSet(std::string(req->resultSetName));
518
519     int result_set_size = 0;
520     Z_Records *z_records_diag = 0;  // no diagnostics (yet)
521     for (bit = m_backend_list.begin(); bit != m_backend_list.end(); bit++)
522     {
523         PackagePtr p = (*bit)->m_package;
524         
525         if (p->session().is_closed()) // if any backend closes, close frontend
526             package.session().close();
527         
528         Z_GDU *gdu = p->response().get();
529         if (gdu && gdu->which == Z_GDU_Z3950 && gdu->u.z3950->which ==
530             Z_APDU_searchResponse)
531         {
532             Z_APDU *b_apdu = gdu->u.z3950;
533             Z_SearchResponse *b_resp = b_apdu->u.searchResponse;
534          
535             // see we get any errors (AKA diagnstics)
536             if (b_resp->records)
537             {
538                 if (b_resp->records->which == Z_Records_NSD
539                     || b_resp->records->which == Z_Records_multipleNSD)
540                     z_records_diag = b_resp->records;
541                 // we may set this multiple times (TOO BAD!)
542             }
543             BackendSet backendSet;
544             backendSet.m_backend = *bit;
545             backendSet.m_count = *b_resp->resultCount;
546             result_set_size += *b_resp->resultCount;
547             resultSet.m_backend_sets.push_back(backendSet);
548         }
549         else
550         {
551             // if any target does not return search response - return that 
552             package.response() = p->response();
553             return;
554         }
555     }
556
557     mp::odr odr;
558     Z_APDU *f_apdu = odr.create_searchResponse(apdu_req, 0, 0);
559     Z_SearchResponse *f_resp = f_apdu->u.searchResponse;
560
561     *f_resp->resultCount = result_set_size;
562     if (z_records_diag)
563     {
564         // search error
565         f_resp->records = z_records_diag;
566         package.response() = f_apdu;
567         return;
568     }
569     // assume OK
570     m_sets[resultSet.m_setname] = resultSet;
571
572     int number;
573     mp::util::piggyback(smallSetUpperBound,
574                          largeSetLowerBound,
575                          mediumSetPresentNumber,
576                          result_set_size,
577                          number);
578     Package pp(package.session(), package.origin());
579     if (number > 0)
580     {
581         pp.copy_filter(package);
582         Z_APDU *p_apdu = zget_APDU(odr, Z_APDU_presentRequest);
583         Z_PresentRequest *p_req = p_apdu->u.presentRequest;
584         p_req->preferredRecordSyntax = req->preferredRecordSyntax;
585         p_req->resultSetId = req->resultSetName;
586         *p_req->resultSetStartPoint = 1;
587         *p_req->numberOfRecordsRequested = number;
588         pp.request() = p_apdu;
589         present(pp, p_apdu);
590         
591         if (pp.session().is_closed())
592             package.session().close();
593         
594         Z_GDU *gdu = pp.response().get();
595         if (gdu && gdu->which == Z_GDU_Z3950 && gdu->u.z3950->which ==
596             Z_APDU_presentResponse)
597         {
598             Z_PresentResponse *p_res = gdu->u.z3950->u.presentResponse;
599             f_resp->records = p_res->records;
600             *f_resp->numberOfRecordsReturned = 
601                 *p_res->numberOfRecordsReturned;
602             *f_resp->nextResultSetPosition = 
603                 *p_res->nextResultSetPosition;
604         }
605         else 
606         {
607             package.response() = pp.response(); 
608             return;
609         }
610     }
611     package.response() = f_apdu; // in this scope because of p
612 }
613
614 void yf::Multi::Frontend::present(mp::Package &package, Z_APDU *apdu_req)
615 {
616     // create present request 
617     Z_PresentRequest *req = apdu_req->u.presentRequest;
618
619     Sets_it it;
620     it = m_sets.find(std::string(req->resultSetId));
621     if (it == m_sets.end())
622     {
623         mp::odr odr;
624         Z_APDU *apdu = 
625             odr.create_presentResponse(
626                 apdu_req,
627                 YAZ_BIB1_SPECIFIED_RESULT_SET_DOES_NOT_EXIST,
628                 req->resultSetId);
629         package.response() = apdu;
630         return;
631     }
632     std::list<Multi::FrontendSet::PresentJob> jobs;
633     int start = *req->resultSetStartPoint;
634     int number = *req->numberOfRecordsRequested;
635     it->second.round_robin(start, number, jobs);
636
637     std::list<BackendPtr> present_backend_list;
638
639     std::list<BackendSet>::const_iterator bsit;
640     bsit = it->second.m_backend_sets.begin();
641     for (; bsit != it->second.m_backend_sets.end(); bsit++)
642     {
643         std::list<Multi::FrontendSet::PresentJob>::const_iterator jit;
644         int start = -1;
645         int end = -1;
646         
647         for (jit = jobs.begin(); jit != jobs.end(); jit++)
648         {
649             if (jit->m_backend == bsit->m_backend)
650             {
651                 if (start == -1 || jit->m_pos < start)
652                     start = jit->m_pos;
653                 if (end == -1 || jit->m_pos > end)
654                     end = jit->m_pos;
655             }
656         }
657         if (start != -1)
658         {
659             PackagePtr p = bsit->m_backend->m_package;
660
661             *req->resultSetStartPoint = start;
662             *req->numberOfRecordsRequested = end - start + 1;
663             
664             p->request() = apdu_req;
665             p->copy_filter(package);
666
667             present_backend_list.push_back(bsit->m_backend);
668         }
669     }
670     multi_move(present_backend_list);
671
672     // look at each response
673     Z_Records *z_records_diag = 0;
674
675     std::list<BackendPtr>::const_iterator pbit = present_backend_list.begin();
676     for (; pbit != present_backend_list.end(); pbit++)
677     {
678         PackagePtr p = (*pbit)->m_package;
679         
680         if (p->session().is_closed()) // if any backend closes, close frontend
681             package.session().close();
682         
683         Z_GDU *gdu = p->response().get();
684         if (gdu && gdu->which == Z_GDU_Z3950 && gdu->u.z3950->which ==
685             Z_APDU_presentResponse)
686         {
687             Z_APDU *b_apdu = gdu->u.z3950;
688             Z_PresentResponse *b_resp = b_apdu->u.presentResponse;
689          
690             // see we get any errors (AKA diagnstics)
691             if (b_resp->records)
692             {
693                 if (b_resp->records->which != Z_Records_DBOSD)
694                     z_records_diag = b_resp->records;
695                 // we may set this multiple times (TOO BAD!)
696             }
697         }
698         else
699         {
700             // if any target does not return present response - return that 
701             package.response() = p->response();
702             return;
703         }
704     }
705
706     mp::odr odr;
707     Z_APDU *f_apdu = odr.create_presentResponse(apdu_req, 0, 0);
708     Z_PresentResponse *f_resp = f_apdu->u.presentResponse;
709
710     if (z_records_diag)
711     {
712         f_resp->records = z_records_diag;
713         *f_resp->presentStatus = Z_PresentStatus_failure;
714     }
715     else
716     {
717         f_resp->records = (Z_Records *) odr_malloc(odr, sizeof(Z_Records));
718         Z_Records * records = f_resp->records;
719         records->which = Z_Records_DBOSD;
720         records->u.databaseOrSurDiagnostics =
721             (Z_NamePlusRecordList *)
722             odr_malloc(odr, sizeof(Z_NamePlusRecordList));
723         Z_NamePlusRecordList *nprl = records->u.databaseOrSurDiagnostics;
724         nprl->num_records = jobs.size();
725         nprl->records = (Z_NamePlusRecord**)
726             odr_malloc(odr, sizeof(Z_NamePlusRecord *) * nprl->num_records);
727         int i = 0;
728         std::list<Multi::FrontendSet::PresentJob>::const_iterator jit;
729         for (jit = jobs.begin(); jit != jobs.end(); jit++, i++)
730         {
731             PackagePtr p = jit->m_backend->m_package;
732             
733             Z_GDU *gdu = p->response().get();
734             Z_APDU *b_apdu = gdu->u.z3950;
735             Z_PresentResponse *b_resp = b_apdu->u.presentResponse;
736
737             nprl->records[i] = (Z_NamePlusRecord*)
738                 odr_malloc(odr, sizeof(Z_NamePlusRecord));
739             int inside_pos = jit->m_inside_pos;
740             if (inside_pos >= b_resp->records->
741                 u.databaseOrSurDiagnostics->num_records)
742                 break;
743             *nprl->records[i] = *b_resp->records->
744                 u.databaseOrSurDiagnostics->records[inside_pos];
745             nprl->records[i]->databaseName =
746                     odr_strdup(odr, jit->m_backend->m_vhost.c_str());
747         }
748         nprl->num_records = i; // usually same as jobs.size();
749         *f_resp->nextResultSetPosition = start + i;
750         *f_resp->numberOfRecordsReturned = i;
751     }
752     package.response() = f_apdu;
753 }
754
755 void yf::Multi::Frontend::scan1(mp::Package &package, Z_APDU *apdu_req)
756 {
757     if (m_backend_list.size() > 1)
758     {
759         mp::odr odr;
760         Z_APDU *f_apdu = 
761             odr.create_scanResponse(
762                 apdu_req, YAZ_BIB1_COMBI_OF_SPECIFIED_DATABASES_UNSUPP, 0);
763         package.response() = f_apdu;
764         return;
765     }
766     Z_ScanRequest *req = apdu_req->u.scanRequest;
767
768     int default_num_db = req->num_databaseNames;
769     char **default_db = req->databaseNames;
770
771     std::list<BackendPtr>::const_iterator bit;
772     for (bit = m_backend_list.begin(); bit != m_backend_list.end(); bit++)
773     {
774         PackagePtr p = (*bit)->m_package;
775         mp::odr odr;
776     
777         if (!mp::util::set_databases_from_zurl(odr, (*bit)->m_vhost,
778                                                 &req->num_databaseNames,
779                                                 &req->databaseNames))
780         {
781             req->num_databaseNames = default_num_db;
782             req->databaseNames = default_db;
783         }
784         p->request() = apdu_req;
785         p->copy_filter(package);
786     }
787     multi_move(m_backend_list);
788
789     for (bit = m_backend_list.begin(); bit != m_backend_list.end(); bit++)
790     {
791         PackagePtr p = (*bit)->m_package;
792         
793         if (p->session().is_closed()) // if any backend closes, close frontend
794             package.session().close();
795         
796         Z_GDU *gdu = p->response().get();
797         if (gdu && gdu->which == Z_GDU_Z3950 && gdu->u.z3950->which ==
798             Z_APDU_scanResponse)
799         {
800             package.response() = p->response();
801             break;
802         }
803         else
804         {
805             // if any target does not return scan response - return that 
806             package.response() = p->response();
807             return;
808         }
809     }
810 }
811
812 bool yf::Multi::ScanTermInfo::operator < (const ScanTermInfo &k) const
813 {
814     return m_norm_term < k.m_norm_term;
815 }
816
817 bool yf::Multi::ScanTermInfo::operator == (const ScanTermInfo &k) const
818 {
819     return m_norm_term == k.m_norm_term;
820 }
821
822 Z_Entry *yf::Multi::ScanTermInfo::get_entry(ODR odr)
823 {
824     Z_Entry *e = (Z_Entry *)odr_malloc(odr, sizeof(*e));
825     e->which = Z_Entry_termInfo;
826     Z_TermInfo *t;
827     t = e->u.termInfo = (Z_TermInfo *) odr_malloc(odr, sizeof(*t));
828     t->suggestedAttributes = 0;
829     t->displayTerm = 0;
830     t->alternativeTerm = 0;
831     t->byAttributes = 0;
832     t->otherTermInfo = 0;
833     t->globalOccurrences = odr_intdup(odr, m_count);
834     t->term = (Z_Term *)
835         odr_malloc(odr, sizeof(*t->term));
836     t->term->which = Z_Term_general;
837     Odr_oct *o;
838     t->term->u.general = o = (Odr_oct *)odr_malloc(odr, sizeof(Odr_oct));
839
840     o->len = o->size = m_norm_term.size();
841     o->buf = (unsigned char *) odr_malloc(odr, o->len);
842     memcpy(o->buf, m_norm_term.c_str(), o->len);
843     return e;
844 }
845
846 void yf::Multi::Frontend::scan2(mp::Package &package, Z_APDU *apdu_req)
847 {
848     Z_ScanRequest *req = apdu_req->u.scanRequest;
849
850     int default_num_db = req->num_databaseNames;
851     char **default_db = req->databaseNames;
852
853     std::list<BackendPtr>::const_iterator bit;
854     for (bit = m_backend_list.begin(); bit != m_backend_list.end(); bit++)
855     {
856         PackagePtr p = (*bit)->m_package;
857         mp::odr odr;
858     
859         if (!mp::util::set_databases_from_zurl(odr, (*bit)->m_vhost,
860                                                 &req->num_databaseNames,
861                                                 &req->databaseNames))
862         {
863             req->num_databaseNames = default_num_db;
864             req->databaseNames = default_db;
865         }
866         p->request() = apdu_req;
867         p->copy_filter(package);
868     }
869     multi_move(m_backend_list);
870
871     ScanTermInfoList entries_before;
872     ScanTermInfoList entries_after;
873     int no_before = 0;
874     int no_after = 0;
875
876     for (bit = m_backend_list.begin(); bit != m_backend_list.end(); bit++)
877     {
878         PackagePtr p = (*bit)->m_package;
879         
880         if (p->session().is_closed()) // if any backend closes, close frontend
881             package.session().close();
882         
883         Z_GDU *gdu = p->response().get();
884         if (gdu && gdu->which == Z_GDU_Z3950 && gdu->u.z3950->which ==
885             Z_APDU_scanResponse)
886         {
887             Z_ScanResponse *res = gdu->u.z3950->u.scanResponse;
888
889             if (res->entries && res->entries->nonsurrogateDiagnostics)
890             {
891                 // failure
892                 mp::odr odr;
893                 Z_APDU *f_apdu = odr.create_scanResponse(apdu_req, 1, 0);
894                 Z_ScanResponse *f_res = f_apdu->u.scanResponse;
895
896                 f_res->entries->nonsurrogateDiagnostics = 
897                     res->entries->nonsurrogateDiagnostics;
898                 f_res->entries->num_nonsurrogateDiagnostics = 
899                     res->entries->num_nonsurrogateDiagnostics;
900
901                 package.response() = f_apdu;
902                 return;
903             }
904
905             if (res->entries && res->entries->entries)
906             {
907                 Z_Entry **entries = res->entries->entries;
908                 int num_entries = res->entries->num_entries;
909                 int position = 1;
910                 if (req->preferredPositionInResponse)
911                     position = *req->preferredPositionInResponse;
912                 if (res->positionOfTerm)
913                     position = *res->positionOfTerm;
914
915                 // before
916                 int i;
917                 for (i = 0; i<position-1 && i<num_entries; i++)
918                 {
919                     Z_Entry *ent = entries[i];
920
921                     if (ent->which == Z_Entry_termInfo)
922                     {
923                         ScanTermInfo my;
924
925                         int *occur = ent->u.termInfo->globalOccurrences;
926                         my.m_count = occur ? *occur : 0;
927
928                         if (ent->u.termInfo->term->which == Z_Term_general)
929                         {
930                             my.m_norm_term = std::string(
931                                 (const char *)
932                                 ent->u.termInfo->term->u.general->buf,
933                                 ent->u.termInfo->term->u.general->len);
934                         }
935                         if (my.m_norm_term.length())
936                         {
937                             ScanTermInfoList::iterator it = 
938                                 entries_before.begin();
939                             while (it != entries_before.end() && my <*it)
940                                 it++;
941                             if (my == *it)
942                             {
943                                 it->m_count += my.m_count;
944                             }
945                             else
946                             {
947                                 entries_before.insert(it, my);
948                                 no_before++;
949                             }
950                         }
951                     }
952                 }
953                 // after
954                 if (position <= 0)
955                     i = 0;
956                 else
957                     i = position-1;
958                 for ( ; i<num_entries; i++)
959                 {
960                     Z_Entry *ent = entries[i];
961
962                     if (ent->which == Z_Entry_termInfo)
963                     {
964                         ScanTermInfo my;
965
966                         int *occur = ent->u.termInfo->globalOccurrences;
967                         my.m_count = occur ? *occur : 0;
968
969                         if (ent->u.termInfo->term->which == Z_Term_general)
970                         {
971                             my.m_norm_term = std::string(
972                                 (const char *)
973                                 ent->u.termInfo->term->u.general->buf,
974                                 ent->u.termInfo->term->u.general->len);
975                         }
976                         if (my.m_norm_term.length())
977                         {
978                             ScanTermInfoList::iterator it = 
979                                 entries_after.begin();
980                             while (it != entries_after.end() && *it < my)
981                                 it++;
982                             if (my == *it)
983                             {
984                                 it->m_count += my.m_count;
985                             }
986                             else
987                             {
988                                 entries_after.insert(it, my);
989                                 no_after++;
990                             }
991                         }
992                     }
993                 }
994
995             }                
996         }
997         else
998         {
999             // if any target does not return scan response - return that 
1000             package.response() = p->response();
1001             return;
1002         }
1003     }
1004
1005     if (true)
1006     {
1007         std::cout << "BEFORE\n";
1008         ScanTermInfoList::iterator it = entries_before.begin();
1009         for(; it != entries_before.end(); it++)
1010         {
1011             std::cout << " " << it->m_norm_term << " " << it->m_count << "\n";
1012         }
1013         
1014         std::cout << "AFTER\n";
1015         it = entries_after.begin();
1016         for(; it != entries_after.end(); it++)
1017         {
1018             std::cout << " " << it->m_norm_term << " " << it->m_count << "\n";
1019         }
1020     }
1021
1022     if (false)
1023     {
1024         mp::odr odr;
1025         Z_APDU *f_apdu = odr.create_scanResponse(apdu_req, 1, "not implemented");
1026         package.response() = f_apdu;
1027     }
1028     else
1029     {
1030         mp::odr odr;
1031         Z_APDU *f_apdu = odr.create_scanResponse(apdu_req, 0, 0);
1032         Z_ScanResponse *resp = f_apdu->u.scanResponse;
1033         
1034         int number_returned = *req->numberOfTermsRequested;
1035         int position_returned = *req->preferredPositionInResponse;
1036         
1037         resp->entries->num_entries = number_returned;
1038         resp->entries->entries = (Z_Entry**)
1039             odr_malloc(odr, sizeof(Z_Entry*) * number_returned);
1040         int i;
1041
1042         int lbefore = entries_before.size();
1043         if (lbefore < position_returned-1)
1044             position_returned = lbefore+1;
1045
1046         ScanTermInfoList::iterator it = entries_before.begin();
1047         for (i = 0; i<position_returned-1 && it != entries_before.end(); i++, it++)
1048         {
1049             resp->entries->entries[position_returned-2-i] = it->get_entry(odr);
1050         }
1051
1052         it = entries_after.begin();
1053
1054         if (position_returned <= 0)
1055             i = 0;
1056         else
1057             i = position_returned-1;
1058         for (; i<number_returned && it != entries_after.end(); i++, it++)
1059         {
1060             resp->entries->entries[i] = it->get_entry(odr);
1061         }
1062
1063         number_returned = i;
1064
1065         resp->positionOfTerm = odr_intdup(odr, position_returned);
1066         resp->numberOfEntriesReturned = odr_intdup(odr, number_returned);
1067         resp->entries->num_entries = number_returned;
1068
1069         package.response() = f_apdu;
1070     }
1071 }
1072
1073
1074 void yf::Multi::process(mp::Package &package) const
1075 {
1076     FrontendPtr f = m_p->get_frontend(package);
1077
1078     Z_GDU *gdu = package.request().get();
1079     
1080     if (gdu && gdu->which == Z_GDU_Z3950 && gdu->u.z3950->which ==
1081         Z_APDU_initRequest && !f->m_is_multi)
1082     {
1083         f->init(package, gdu);
1084     }
1085     else if (!f->m_is_multi)
1086         package.move();
1087     else if (gdu && gdu->which == Z_GDU_Z3950)
1088     {
1089         Z_APDU *apdu = gdu->u.z3950;
1090         if (apdu->which == Z_APDU_initRequest)
1091         {
1092             mp::odr odr;
1093             
1094             package.response() = odr.create_close(
1095                 apdu,
1096                 Z_Close_protocolError,
1097                 "double init");
1098             
1099             package.session().close();
1100         }
1101         else if (apdu->which == Z_APDU_searchRequest)
1102         {
1103             f->search(package, apdu);
1104         }
1105         else if (apdu->which == Z_APDU_presentRequest)
1106         {
1107             f->present(package, apdu);
1108         }
1109         else if (apdu->which == Z_APDU_scanRequest)
1110         {
1111             f->scan2(package, apdu);
1112         }
1113         else
1114         {
1115             mp::odr odr;
1116             
1117             package.response() = odr.create_close(
1118                 apdu, Z_Close_protocolError,
1119                 "unsupported APDU in filter multi");
1120             
1121             package.session().close();
1122         }
1123     }
1124     m_p->release_frontend(package);
1125 }
1126
1127 void mp::filter::Multi::configure(const xmlNode * ptr)
1128 {
1129     for (ptr = ptr->children; ptr; ptr = ptr->next)
1130     {
1131         if (ptr->type != XML_ELEMENT_NODE)
1132             continue;
1133         if (!strcmp((const char *) ptr->name, "target"))
1134         {
1135             std::string route = mp::xml::get_route(ptr);
1136             std::string target = mp::xml::get_text(ptr);
1137             std::cout << "route=" << route << " target=" << target << "\n";
1138             m_p->m_target_route[target] = route;
1139         }
1140         else if (!strcmp((const char *) ptr->name, "hideunavailable"))
1141         {
1142             m_p->m_hide_unavailable = true;
1143         }
1144         else
1145         {
1146             throw mp::filter::FilterException
1147                 ("Bad element " 
1148                  + std::string((const char *) ptr->name)
1149                  + " in virt_db filter");
1150         }
1151     }
1152 }
1153
1154 static mp::filter::Base* filter_creator()
1155 {
1156     return new mp::filter::Multi;
1157 }
1158
1159 extern "C" {
1160     struct metaproxy_1_filter_struct metaproxy_1_filter_multi = {
1161         0,
1162         "multi",
1163         filter_creator
1164     };
1165 }
1166
1167
1168 /*
1169  * Local variables:
1170  * c-basic-offset: 4
1171  * indent-tabs-mode: nil
1172  * c-file-style: "stroustrup"
1173  * End:
1174  * vim: shiftwidth=4 tabstop=8 expandtab
1175  */