Fixed bug #948: SRU Request reliably coredumps metaproxy (possibly high
[metaproxy-moved-to-github.git] / src / filter_multi.cpp
1 /* $Id: filter_multi.cpp,v 1.26 2007-03-07 22:50:12 adam Exp $
2    Copyright (c) 2005-2007, Index Data.
3
4    See the LICENSE file for details
5  */
6
7 #include "config.hpp"
8
9 #include "filter.hpp"
10 #include "package.hpp"
11
12 #include <boost/thread/thread.hpp>
13 #include <boost/thread/mutex.hpp>
14 #include <boost/thread/condition.hpp>
15 #include <boost/shared_ptr.hpp>
16
17 #include "util.hpp"
18 #include "filter_multi.hpp"
19
20 #include <yaz/zgdu.h>
21 #include <yaz/otherinfo.h>
22 #include <yaz/diagbib1.h>
23
24 #include <vector>
25 #include <algorithm>
26 #include <map>
27 #include <iostream>
28
29 namespace mp = metaproxy_1;
30 namespace yf = mp::filter;
31
32 namespace metaproxy_1 {
33     namespace filter {
34
35         struct Multi::BackendSet {
36             BackendPtr m_backend;
37             int m_count;
38             bool operator < (const BackendSet &k) const;
39             bool operator == (const BackendSet &k) const;
40         };
41         struct Multi::ScanTermInfo {
42             std::string m_norm_term;
43             std::string m_display_term;
44             int m_count;
45             bool operator < (const ScanTermInfo &) const;
46             bool operator == (const ScanTermInfo &) const;
47             Z_Entry *get_entry(ODR odr);
48         };
49         struct Multi::FrontendSet {
50             struct PresentJob {
51                 BackendPtr m_backend;
52                 int m_pos;
53                 int m_inside_pos;
54             };
55             FrontendSet(std::string setname);
56             FrontendSet();
57             ~FrontendSet();
58
59             void round_robin(int pos, int number, std::list<PresentJob> &job);
60
61             std::list<BackendSet> m_backend_sets;
62             std::string m_setname;
63         };
64         struct Multi::Backend {
65             PackagePtr m_package;
66             std::string m_backend_database;
67             std::string m_vhost;
68             std::string m_route;
69             void operator() (void);  // thread operation
70         };
71         struct Multi::Frontend {
72             Frontend(Rep *rep);
73             ~Frontend();
74             bool m_is_multi;
75             bool m_in_use;
76             std::list<BackendPtr> m_backend_list;
77             std::map<std::string,Multi::FrontendSet> m_sets;
78
79             void multi_move(std::list<BackendPtr> &blist);
80             void init(Package &package, Z_GDU *gdu);
81             void close(Package &package);
82             void search(Package &package, Z_APDU *apdu);
83             void present(Package &package, Z_APDU *apdu);
84             void scan1(Package &package, Z_APDU *apdu);
85             void scan2(Package &package, Z_APDU *apdu);
86             Rep *m_p;
87         };            
88         struct Multi::Map {
89             Map(std::list<std::string> hosts, std::string route);
90             Map();
91             std::list<std::string> m_hosts;
92             std::string m_route;
93         };
94         class Multi::Rep {
95             friend class Multi;
96             friend struct Frontend;
97             
98             Rep();
99             FrontendPtr get_frontend(Package &package);
100             void release_frontend(Package &package);
101         private:
102             std::map<std::string,std::string> m_target_route;
103             boost::mutex m_mutex;
104             boost::condition m_cond_session_ready;
105             std::map<mp::Session, FrontendPtr> m_clients;
106             bool m_hide_unavailable;
107         };
108     }
109 }
110
111 yf::Multi::Rep::Rep()
112 {
113     m_hide_unavailable = false;
114 }
115
116 bool yf::Multi::BackendSet::operator < (const BackendSet &k) const
117 {
118     return m_count < k.m_count;
119 }
120
121 yf::Multi::Frontend::Frontend(Rep *rep)
122 {
123     m_p = rep;
124     m_is_multi = false;
125 }
126
127 yf::Multi::Frontend::~Frontend()
128 {
129 }
130
131 yf::Multi::FrontendPtr yf::Multi::Rep::get_frontend(mp::Package &package)
132 {
133     boost::mutex::scoped_lock lock(m_mutex);
134
135     std::map<mp::Session,yf::Multi::FrontendPtr>::iterator it;
136     
137     while(true)
138     {
139         it = m_clients.find(package.session());
140         if (it == m_clients.end())
141             break;
142         
143         if (!it->second->m_in_use)
144         {
145             it->second->m_in_use = true;
146             return it->second;
147         }
148         m_cond_session_ready.wait(lock);
149     }
150     FrontendPtr f(new Frontend(this));
151     m_clients[package.session()] = f;
152     f->m_in_use = true;
153     return f;
154 }
155
156 void yf::Multi::Rep::release_frontend(mp::Package &package)
157 {
158     boost::mutex::scoped_lock lock(m_mutex);
159     std::map<mp::Session,yf::Multi::FrontendPtr>::iterator it;
160     
161     it = m_clients.find(package.session());
162     if (it != m_clients.end())
163     {
164         if (package.session().is_closed())
165         {
166             it->second->close(package);
167             m_clients.erase(it);
168         }
169         else
170         {
171             it->second->m_in_use = false;
172         }
173         m_cond_session_ready.notify_all();
174     }
175 }
176
177 yf::Multi::FrontendSet::FrontendSet(std::string setname)
178     :  m_setname(setname)
179 {
180 }
181
182
183 yf::Multi::FrontendSet::FrontendSet()
184 {
185 }
186
187
188 yf::Multi::FrontendSet::~FrontendSet()
189 {
190 }
191
192 yf::Multi::Map::Map(std::list<std::string> hosts, std::string route)
193     : m_hosts(hosts), m_route(route) 
194 {
195 }
196
197 yf::Multi::Map::Map()
198 {
199 }
200
201 yf::Multi::Multi() : m_p(new Multi::Rep)
202 {
203 }
204
205 yf::Multi::~Multi() {
206 }
207
208
209 void yf::Multi::Backend::operator() (void) 
210 {
211     m_package->move(m_route);
212 }
213
214
215 void yf::Multi::Frontend::close(mp::Package &package)
216 {
217     std::list<BackendPtr>::const_iterator bit;
218     for (bit = m_backend_list.begin(); bit != m_backend_list.end(); bit++)
219     {
220         BackendPtr b = *bit;
221
222         b->m_package->copy_filter(package);
223         b->m_package->request() = (Z_GDU *) 0;
224         b->m_package->session().close();
225         b->m_package->move(b->m_route);
226     }
227 }
228
229 void yf::Multi::Frontend::multi_move(std::list<BackendPtr> &blist)
230 {
231     std::list<BackendPtr>::const_iterator bit;
232     boost::thread_group g;
233     for (bit = blist.begin(); bit != blist.end(); bit++)
234     {
235         g.add_thread(new boost::thread(**bit));
236     }
237     g.join_all();
238 }
239
240 void yf::Multi::FrontendSet::round_robin(int start, int number,
241                                          std::list<PresentJob> &jobs)
242 {
243     std::list<int> pos;
244     std::list<int> inside_pos;
245     std::list<BackendSet>::const_iterator bsit;
246     for (bsit = m_backend_sets.begin(); bsit != m_backend_sets.end(); bsit++)
247     {
248         pos.push_back(1);
249         inside_pos.push_back(0);
250     }
251
252     int p = 1;
253 #if 1
254     // optimization step!
255     int omin = 0;
256     while(true)
257     {
258         int min = 0;
259         int no_left = 0;
260         // find min count for each set which is > omin
261         for (bsit = m_backend_sets.begin(); bsit != m_backend_sets.end(); bsit++)
262         {
263             if (bsit->m_count > omin)
264             {
265                 if (no_left == 0 || bsit->m_count < min)
266                     min = bsit->m_count;
267                 no_left++;
268             }
269         }
270         if (no_left == 0) // if nothing greater than omin, bail out.
271             break;
272         int skip = no_left * min;
273         if (p + skip > start)  // step gets us "into" present range?
274         {
275             // Yes. skip until start.. Rounding off is deliberate!
276             min = (start-p) / no_left;
277             p += no_left * min;
278             
279             // update positions in each set..
280             std::list<int>::iterator psit = pos.begin();
281             for (psit = pos.begin(); psit != pos.end(); psit++)
282                 *psit += min;
283             break;
284         }
285         // skip on each set.. before "present range"..
286         p = p + skip;
287         
288         std::cout << "\nSKIP min=" << min << " no_left=" << no_left << "\n\n";
289         
290         std::list<int>::iterator psit = pos.begin();
291         for (psit = pos.begin(); psit != pos.end(); psit++)
292             *psit += min;
293         
294         omin = min; // update so we consider next class (with higher count)
295     }
296 #endif
297     int fetched = 0;
298     bool more = true;
299     while (more)
300     {
301         more = false;
302         std::list<int>::iterator psit = pos.begin();
303         std::list<int>::iterator esit = inside_pos.begin();
304         bsit = m_backend_sets.begin();
305
306         for (; bsit != m_backend_sets.end(); psit++,esit++,bsit++)
307         {
308             if (fetched >= number)
309             {
310                 more = false;
311                 break;
312             }
313             if (*psit <= bsit->m_count)
314             {
315                 if (p >= start)
316                 {
317                     PresentJob job;
318                     job.m_backend = bsit->m_backend;
319                     job.m_pos = *psit;
320                     job.m_inside_pos = *esit;
321                     jobs.push_back(job);
322                     (*esit)++;
323                     fetched++;
324                 }
325                 (*psit)++;
326                 p++;
327                 more = true;
328             }
329         }
330     }
331 }
332
333 void yf::Multi::Frontend::init(mp::Package &package, Z_GDU *gdu)
334 {
335     Z_InitRequest *req = gdu->u.z3950->u.initRequest;
336
337     std::list<std::string> targets;
338
339     mp::util::get_vhost_otherinfo(req->otherInfo, targets);
340
341     if (targets.size() < 1)
342     {
343         package.move();
344         return;
345     }
346
347     std::list<std::string>::const_iterator t_it = targets.begin();
348     for (; t_it != targets.end(); t_it++)
349     {
350         Session s;
351         Backend *b = new Backend;
352         b->m_vhost = *t_it;
353
354         b->m_route = m_p->m_target_route[*t_it];
355         // b->m_route unset
356         b->m_package = PackagePtr(new Package(s, package.origin()));
357
358         m_backend_list.push_back(BackendPtr(b));
359     }
360     m_is_multi = true;
361
362     // create init request 
363     std::list<BackendPtr>::iterator bit;
364     for (bit = m_backend_list.begin(); bit != m_backend_list.end(); bit++)
365     {
366         mp::odr odr;
367         BackendPtr b = *bit;
368         Z_APDU *init_apdu = zget_APDU(odr, Z_APDU_initRequest);
369         
370         std::list<std::string>vhost_one;
371         vhost_one.push_back(b->m_vhost);
372         mp::util::set_vhost_otherinfo(&init_apdu->u.initRequest->otherInfo,
373                                        odr, vhost_one);
374
375         Z_InitRequest *req = init_apdu->u.initRequest;
376         
377         ODR_MASK_SET(req->options, Z_Options_search);
378         ODR_MASK_SET(req->options, Z_Options_present);
379         ODR_MASK_SET(req->options, Z_Options_namedResultSets);
380         ODR_MASK_SET(req->options, Z_Options_scan);
381         
382         ODR_MASK_SET(req->protocolVersion, Z_ProtocolVersion_1);
383         ODR_MASK_SET(req->protocolVersion, Z_ProtocolVersion_2);
384         ODR_MASK_SET(req->protocolVersion, Z_ProtocolVersion_3);
385         
386         b->m_package->request() = init_apdu;
387
388         b->m_package->copy_filter(package);
389     }
390     multi_move(m_backend_list);
391
392     // create the frontend init response based on each backend init response
393     mp::odr odr;
394
395     Z_APDU *f_apdu = odr.create_initResponse(gdu->u.z3950, 0, 0);
396     Z_InitResponse *f_resp = f_apdu->u.initResponse;
397
398     ODR_MASK_SET(f_resp->options, Z_Options_search);
399     ODR_MASK_SET(f_resp->options, Z_Options_present);
400     ODR_MASK_SET(f_resp->options, Z_Options_namedResultSets);
401     
402     ODR_MASK_SET(f_resp->protocolVersion, Z_ProtocolVersion_1);
403     ODR_MASK_SET(f_resp->protocolVersion, Z_ProtocolVersion_2);
404     ODR_MASK_SET(f_resp->protocolVersion, Z_ProtocolVersion_3);
405
406     int no_failed = 0;
407     int no_succeeded = 0;
408     for (bit = m_backend_list.begin(); bit != m_backend_list.end(); )
409     {
410         PackagePtr p = (*bit)->m_package;
411         
412         if (p->session().is_closed())
413         {
414             // failed. Remove from list and increment number of failed
415             no_failed++;
416             bit = m_backend_list.erase(bit);
417             continue;
418         }
419         no_succeeded++;
420
421         Z_GDU *gdu = p->response().get();
422         if (gdu && gdu->which == Z_GDU_Z3950 && gdu->u.z3950->which ==
423             Z_APDU_initResponse)
424         {
425             int i;
426             Z_APDU *b_apdu = gdu->u.z3950;
427             Z_InitResponse *b_resp = b_apdu->u.initResponse;
428
429             // common options for all backends
430             for (i = 0; i <= Z_Options_stringSchema; i++)
431             {
432                 if (!ODR_MASK_GET(b_resp->options, i))
433                     ODR_MASK_CLEAR(f_resp->options, i);
434             }
435             // common protocol version
436             for (i = 0; i <= Z_ProtocolVersion_3; i++)
437                 if (!ODR_MASK_GET(b_resp->protocolVersion, i))
438                     ODR_MASK_CLEAR(f_resp->protocolVersion, i);
439             // reject if any of the backends reject
440             if (!*b_resp->result)
441                 *f_resp->result = 0;
442         }
443         else
444         {
445             // if any target does not return init return that (close or
446             // similar )
447             package.response() = p->response();
448             return;
449         }
450         bit++;
451     }
452     if (m_p->m_hide_unavailable)
453     {
454         if (no_succeeded == 0)
455             package.session().close();
456     }
457     else
458     {
459         if (no_failed)
460             package.session().close();
461     }
462     package.response() = f_apdu;
463 }
464
465 void yf::Multi::Frontend::search(mp::Package &package, Z_APDU *apdu_req)
466 {
467     // create search request 
468     Z_SearchRequest *req = apdu_req->u.searchRequest;
469
470     // save these for later
471     int smallSetUpperBound = *req->smallSetUpperBound;
472     int largeSetLowerBound = *req->largeSetLowerBound;
473     int mediumSetPresentNumber = *req->mediumSetPresentNumber;
474     
475     // they are altered now - to disable piggyback
476     *req->smallSetUpperBound = 0;
477     *req->largeSetLowerBound = 1;
478     *req->mediumSetPresentNumber = 1;
479
480     int default_num_db = req->num_databaseNames;
481     char **default_db = req->databaseNames;
482
483     std::list<BackendPtr>::const_iterator bit;
484     for (bit = m_backend_list.begin(); bit != m_backend_list.end(); bit++)
485     {
486         PackagePtr p = (*bit)->m_package;
487         mp::odr odr;
488     
489         if (!mp::util::set_databases_from_zurl(odr, (*bit)->m_vhost,
490                                                 &req->num_databaseNames,
491                                                 &req->databaseNames))
492         {
493             req->num_databaseNames = default_num_db;
494             req->databaseNames = default_db;
495         }
496         p->request() = apdu_req;
497         p->copy_filter(package);
498     }
499     multi_move(m_backend_list);
500
501     // look at each response
502     FrontendSet resultSet(std::string(req->resultSetName));
503
504     int result_set_size = 0;
505     Z_Records *z_records_diag = 0;  // no diagnostics (yet)
506     for (bit = m_backend_list.begin(); bit != m_backend_list.end(); bit++)
507     {
508         PackagePtr p = (*bit)->m_package;
509         
510         if (p->session().is_closed()) // if any backend closes, close frontend
511             package.session().close();
512         
513         Z_GDU *gdu = p->response().get();
514         if (gdu && gdu->which == Z_GDU_Z3950 && gdu->u.z3950->which ==
515             Z_APDU_searchResponse)
516         {
517             Z_APDU *b_apdu = gdu->u.z3950;
518             Z_SearchResponse *b_resp = b_apdu->u.searchResponse;
519          
520             // see we get any errors (AKA diagnstics)
521             if (b_resp->records)
522             {
523                 if (b_resp->records->which == Z_Records_NSD
524                     || b_resp->records->which == Z_Records_multipleNSD)
525                     z_records_diag = b_resp->records;
526                 // we may set this multiple times (TOO BAD!)
527             }
528             BackendSet backendSet;
529             backendSet.m_backend = *bit;
530             backendSet.m_count = *b_resp->resultCount;
531             result_set_size += *b_resp->resultCount;
532             resultSet.m_backend_sets.push_back(backendSet);
533         }
534         else
535         {
536             // if any target does not return search response - return that 
537             package.response() = p->response();
538             return;
539         }
540     }
541
542     mp::odr odr;
543     Z_APDU *f_apdu = odr.create_searchResponse(apdu_req, 0, 0);
544     Z_SearchResponse *f_resp = f_apdu->u.searchResponse;
545
546     *f_resp->resultCount = result_set_size;
547     if (z_records_diag)
548     {
549         // search error
550         f_resp->records = z_records_diag;
551         package.response() = f_apdu;
552         return;
553     }
554     // assume OK
555     m_sets[resultSet.m_setname] = resultSet;
556
557     int number;
558     mp::util::piggyback(smallSetUpperBound,
559                          largeSetLowerBound,
560                          mediumSetPresentNumber,
561                          result_set_size,
562                          number);
563     Package pp(package.session(), package.origin());
564     if (number > 0)
565     {
566         pp.copy_filter(package);
567         Z_APDU *p_apdu = zget_APDU(odr, Z_APDU_presentRequest);
568         Z_PresentRequest *p_req = p_apdu->u.presentRequest;
569         p_req->preferredRecordSyntax = req->preferredRecordSyntax;
570         p_req->resultSetId = req->resultSetName;
571         *p_req->resultSetStartPoint = 1;
572         *p_req->numberOfRecordsRequested = number;
573         pp.request() = p_apdu;
574         present(pp, p_apdu);
575         
576         if (pp.session().is_closed())
577             package.session().close();
578         
579         Z_GDU *gdu = pp.response().get();
580         if (gdu && gdu->which == Z_GDU_Z3950 && gdu->u.z3950->which ==
581             Z_APDU_presentResponse)
582         {
583             Z_PresentResponse *p_res = gdu->u.z3950->u.presentResponse;
584             f_resp->records = p_res->records;
585             *f_resp->numberOfRecordsReturned = 
586                 *p_res->numberOfRecordsReturned;
587             *f_resp->nextResultSetPosition = 
588                 *p_res->nextResultSetPosition;
589         }
590         else 
591         {
592             package.response() = pp.response(); 
593             return;
594         }
595     }
596     package.response() = f_apdu; // in this scope because of p
597 }
598
599 void yf::Multi::Frontend::present(mp::Package &package, Z_APDU *apdu_req)
600 {
601     // create present request 
602     Z_PresentRequest *req = apdu_req->u.presentRequest;
603
604     Sets_it it;
605     it = m_sets.find(std::string(req->resultSetId));
606     if (it == m_sets.end())
607     {
608         mp::odr odr;
609         Z_APDU *apdu = 
610             odr.create_presentResponse(
611                 apdu_req,
612                 YAZ_BIB1_SPECIFIED_RESULT_SET_DOES_NOT_EXIST,
613                 req->resultSetId);
614         package.response() = apdu;
615         return;
616     }
617     std::list<Multi::FrontendSet::PresentJob> jobs;
618     int start = *req->resultSetStartPoint;
619     int number = *req->numberOfRecordsRequested;
620     it->second.round_robin(start, number, jobs);
621
622     std::list<BackendPtr> present_backend_list;
623
624     std::list<BackendSet>::const_iterator bsit;
625     bsit = it->second.m_backend_sets.begin();
626     for (; bsit != it->second.m_backend_sets.end(); bsit++)
627     {
628         std::list<Multi::FrontendSet::PresentJob>::const_iterator jit;
629         int start = -1;
630         int end = -1;
631         
632         for (jit = jobs.begin(); jit != jobs.end(); jit++)
633         {
634             if (jit->m_backend == bsit->m_backend)
635             {
636                 if (start == -1 || jit->m_pos < start)
637                     start = jit->m_pos;
638                 if (end == -1 || jit->m_pos > end)
639                     end = jit->m_pos;
640             }
641         }
642         if (start != -1)
643         {
644             PackagePtr p = bsit->m_backend->m_package;
645
646             *req->resultSetStartPoint = start;
647             *req->numberOfRecordsRequested = end - start + 1;
648             
649             p->request() = apdu_req;
650             p->copy_filter(package);
651
652             present_backend_list.push_back(bsit->m_backend);
653         }
654     }
655     multi_move(present_backend_list);
656
657     // look at each response
658     Z_Records *z_records_diag = 0;
659
660     std::list<BackendPtr>::const_iterator pbit = present_backend_list.begin();
661     for (; pbit != present_backend_list.end(); pbit++)
662     {
663         PackagePtr p = (*pbit)->m_package;
664         
665         if (p->session().is_closed()) // if any backend closes, close frontend
666             package.session().close();
667         
668         Z_GDU *gdu = p->response().get();
669         if (gdu && gdu->which == Z_GDU_Z3950 && gdu->u.z3950->which ==
670             Z_APDU_presentResponse)
671         {
672             Z_APDU *b_apdu = gdu->u.z3950;
673             Z_PresentResponse *b_resp = b_apdu->u.presentResponse;
674          
675             // see we get any errors (AKA diagnstics)
676             if (b_resp->records)
677             {
678                 if (b_resp->records->which != Z_Records_DBOSD)
679                     z_records_diag = b_resp->records;
680                 // we may set this multiple times (TOO BAD!)
681             }
682         }
683         else
684         {
685             // if any target does not return present response - return that 
686             package.response() = p->response();
687             return;
688         }
689     }
690
691     mp::odr odr;
692     Z_APDU *f_apdu = odr.create_presentResponse(apdu_req, 0, 0);
693     Z_PresentResponse *f_resp = f_apdu->u.presentResponse;
694
695     if (z_records_diag)
696     {
697         f_resp->records = z_records_diag;
698         *f_resp->presentStatus = Z_PresentStatus_failure;
699     }
700     else
701     {
702         f_resp->records = (Z_Records *) odr_malloc(odr, sizeof(Z_Records));
703         Z_Records * records = f_resp->records;
704         records->which = Z_Records_DBOSD;
705         records->u.databaseOrSurDiagnostics =
706             (Z_NamePlusRecordList *)
707             odr_malloc(odr, sizeof(Z_NamePlusRecordList));
708         Z_NamePlusRecordList *nprl = records->u.databaseOrSurDiagnostics;
709         nprl->num_records = jobs.size();
710         nprl->records = (Z_NamePlusRecord**)
711             odr_malloc(odr, sizeof(Z_NamePlusRecord *) * nprl->num_records);
712         int i = 0;
713         std::list<Multi::FrontendSet::PresentJob>::const_iterator jit;
714         for (jit = jobs.begin(); jit != jobs.end(); jit++, i++)
715         {
716             PackagePtr p = jit->m_backend->m_package;
717             
718             Z_GDU *gdu = p->response().get();
719             Z_APDU *b_apdu = gdu->u.z3950;
720             Z_PresentResponse *b_resp = b_apdu->u.presentResponse;
721
722             nprl->records[i] = (Z_NamePlusRecord*)
723                 odr_malloc(odr, sizeof(Z_NamePlusRecord));
724             int inside_pos = jit->m_inside_pos;
725             if (inside_pos >= b_resp->records->
726                 u.databaseOrSurDiagnostics->num_records)
727                 break;
728             *nprl->records[i] = *b_resp->records->
729                 u.databaseOrSurDiagnostics->records[inside_pos];
730             nprl->records[i]->databaseName =
731                     odr_strdup(odr, jit->m_backend->m_vhost.c_str());
732         }
733         nprl->num_records = i; // usually same as jobs.size();
734         *f_resp->nextResultSetPosition = start + i;
735         *f_resp->numberOfRecordsReturned = i;
736     }
737     package.response() = f_apdu;
738 }
739
740 void yf::Multi::Frontend::scan1(mp::Package &package, Z_APDU *apdu_req)
741 {
742     if (m_backend_list.size() > 1)
743     {
744         mp::odr odr;
745         Z_APDU *f_apdu = 
746             odr.create_scanResponse(
747                 apdu_req, YAZ_BIB1_COMBI_OF_SPECIFIED_DATABASES_UNSUPP, 0);
748         package.response() = f_apdu;
749         return;
750     }
751     Z_ScanRequest *req = apdu_req->u.scanRequest;
752
753     int default_num_db = req->num_databaseNames;
754     char **default_db = req->databaseNames;
755
756     std::list<BackendPtr>::const_iterator bit;
757     for (bit = m_backend_list.begin(); bit != m_backend_list.end(); bit++)
758     {
759         PackagePtr p = (*bit)->m_package;
760         mp::odr odr;
761     
762         if (!mp::util::set_databases_from_zurl(odr, (*bit)->m_vhost,
763                                                 &req->num_databaseNames,
764                                                 &req->databaseNames))
765         {
766             req->num_databaseNames = default_num_db;
767             req->databaseNames = default_db;
768         }
769         p->request() = apdu_req;
770         p->copy_filter(package);
771     }
772     multi_move(m_backend_list);
773
774     for (bit = m_backend_list.begin(); bit != m_backend_list.end(); bit++)
775     {
776         PackagePtr p = (*bit)->m_package;
777         
778         if (p->session().is_closed()) // if any backend closes, close frontend
779             package.session().close();
780         
781         Z_GDU *gdu = p->response().get();
782         if (gdu && gdu->which == Z_GDU_Z3950 && gdu->u.z3950->which ==
783             Z_APDU_scanResponse)
784         {
785             package.response() = p->response();
786             break;
787         }
788         else
789         {
790             // if any target does not return scan response - return that 
791             package.response() = p->response();
792             return;
793         }
794     }
795 }
796
797 bool yf::Multi::ScanTermInfo::operator < (const ScanTermInfo &k) const
798 {
799     return m_norm_term < k.m_norm_term;
800 }
801
802 bool yf::Multi::ScanTermInfo::operator == (const ScanTermInfo &k) const
803 {
804     return m_norm_term == k.m_norm_term;
805 }
806
807 Z_Entry *yf::Multi::ScanTermInfo::get_entry(ODR odr)
808 {
809     Z_Entry *e = (Z_Entry *)odr_malloc(odr, sizeof(*e));
810     e->which = Z_Entry_termInfo;
811     Z_TermInfo *t;
812     t = e->u.termInfo = (Z_TermInfo *) odr_malloc(odr, sizeof(*t));
813     t->suggestedAttributes = 0;
814     t->displayTerm = 0;
815     t->alternativeTerm = 0;
816     t->byAttributes = 0;
817     t->otherTermInfo = 0;
818     t->globalOccurrences = odr_intdup(odr, m_count);
819     t->term = (Z_Term *)
820         odr_malloc(odr, sizeof(*t->term));
821     t->term->which = Z_Term_general;
822     Odr_oct *o;
823     t->term->u.general = o = (Odr_oct *)odr_malloc(odr, sizeof(Odr_oct));
824
825     o->len = o->size = m_norm_term.size();
826     o->buf = (unsigned char *) odr_malloc(odr, o->len);
827     memcpy(o->buf, m_norm_term.c_str(), o->len);
828     return e;
829 }
830
831 void yf::Multi::Frontend::scan2(mp::Package &package, Z_APDU *apdu_req)
832 {
833     Z_ScanRequest *req = apdu_req->u.scanRequest;
834
835     int default_num_db = req->num_databaseNames;
836     char **default_db = req->databaseNames;
837
838     std::list<BackendPtr>::const_iterator bit;
839     for (bit = m_backend_list.begin(); bit != m_backend_list.end(); bit++)
840     {
841         PackagePtr p = (*bit)->m_package;
842         mp::odr odr;
843     
844         if (!mp::util::set_databases_from_zurl(odr, (*bit)->m_vhost,
845                                                 &req->num_databaseNames,
846                                                 &req->databaseNames))
847         {
848             req->num_databaseNames = default_num_db;
849             req->databaseNames = default_db;
850         }
851         p->request() = apdu_req;
852         p->copy_filter(package);
853     }
854     multi_move(m_backend_list);
855
856     ScanTermInfoList entries_before;
857     ScanTermInfoList entries_after;
858     int no_before = 0;
859     int no_after = 0;
860
861     for (bit = m_backend_list.begin(); bit != m_backend_list.end(); bit++)
862     {
863         PackagePtr p = (*bit)->m_package;
864         
865         if (p->session().is_closed()) // if any backend closes, close frontend
866             package.session().close();
867         
868         Z_GDU *gdu = p->response().get();
869         if (gdu && gdu->which == Z_GDU_Z3950 && gdu->u.z3950->which ==
870             Z_APDU_scanResponse)
871         {
872             Z_ScanResponse *res = gdu->u.z3950->u.scanResponse;
873
874             if (res->entries && res->entries->nonsurrogateDiagnostics)
875             {
876                 // failure
877                 mp::odr odr;
878                 Z_APDU *f_apdu = odr.create_scanResponse(apdu_req, 1, 0);
879                 Z_ScanResponse *f_res = f_apdu->u.scanResponse;
880
881                 f_res->entries->nonsurrogateDiagnostics = 
882                     res->entries->nonsurrogateDiagnostics;
883                 f_res->entries->num_nonsurrogateDiagnostics = 
884                     res->entries->num_nonsurrogateDiagnostics;
885
886                 package.response() = f_apdu;
887                 return;
888             }
889
890             if (res->entries && res->entries->entries)
891             {
892                 Z_Entry **entries = res->entries->entries;
893                 int num_entries = res->entries->num_entries;
894                 int position = 1;
895                 if (req->preferredPositionInResponse)
896                     position = *req->preferredPositionInResponse;
897                 if (res->positionOfTerm)
898                     position = *res->positionOfTerm;
899
900                 // before
901                 int i;
902                 for (i = 0; i<position-1 && i<num_entries; i++)
903                 {
904                     Z_Entry *ent = entries[i];
905
906                     if (ent->which == Z_Entry_termInfo)
907                     {
908                         ScanTermInfo my;
909
910                         int *occur = ent->u.termInfo->globalOccurrences;
911                         my.m_count = occur ? *occur : 0;
912
913                         if (ent->u.termInfo->term->which == Z_Term_general)
914                         {
915                             my.m_norm_term = std::string(
916                                 (const char *)
917                                 ent->u.termInfo->term->u.general->buf,
918                                 ent->u.termInfo->term->u.general->len);
919                         }
920                         if (my.m_norm_term.length())
921                         {
922                             ScanTermInfoList::iterator it = 
923                                 entries_before.begin();
924                             while (it != entries_before.end() && my <*it)
925                                 it++;
926                             if (my == *it)
927                             {
928                                 it->m_count += my.m_count;
929                             }
930                             else
931                             {
932                                 entries_before.insert(it, my);
933                                 no_before++;
934                             }
935                         }
936                     }
937                 }
938                 // after
939                 if (position <= 0)
940                     i = 0;
941                 else
942                     i = position-1;
943                 for ( ; i<num_entries; i++)
944                 {
945                     Z_Entry *ent = entries[i];
946
947                     if (ent->which == Z_Entry_termInfo)
948                     {
949                         ScanTermInfo my;
950
951                         int *occur = ent->u.termInfo->globalOccurrences;
952                         my.m_count = occur ? *occur : 0;
953
954                         if (ent->u.termInfo->term->which == Z_Term_general)
955                         {
956                             my.m_norm_term = std::string(
957                                 (const char *)
958                                 ent->u.termInfo->term->u.general->buf,
959                                 ent->u.termInfo->term->u.general->len);
960                         }
961                         if (my.m_norm_term.length())
962                         {
963                             ScanTermInfoList::iterator it = 
964                                 entries_after.begin();
965                             while (it != entries_after.end() && *it < my)
966                                 it++;
967                             if (my == *it)
968                             {
969                                 it->m_count += my.m_count;
970                             }
971                             else
972                             {
973                                 entries_after.insert(it, my);
974                                 no_after++;
975                             }
976                         }
977                     }
978                 }
979
980             }                
981         }
982         else
983         {
984             // if any target does not return scan response - return that 
985             package.response() = p->response();
986             return;
987         }
988     }
989
990     if (true)
991     {
992         std::cout << "BEFORE\n";
993         ScanTermInfoList::iterator it = entries_before.begin();
994         for(; it != entries_before.end(); it++)
995         {
996             std::cout << " " << it->m_norm_term << " " << it->m_count << "\n";
997         }
998         
999         std::cout << "AFTER\n";
1000         it = entries_after.begin();
1001         for(; it != entries_after.end(); it++)
1002         {
1003             std::cout << " " << it->m_norm_term << " " << it->m_count << "\n";
1004         }
1005     }
1006
1007     if (false)
1008     {
1009         mp::odr odr;
1010         Z_APDU *f_apdu = odr.create_scanResponse(apdu_req, 1, "not implemented");
1011         package.response() = f_apdu;
1012     }
1013     else
1014     {
1015         mp::odr odr;
1016         Z_APDU *f_apdu = odr.create_scanResponse(apdu_req, 0, 0);
1017         Z_ScanResponse *resp = f_apdu->u.scanResponse;
1018         
1019         int number_returned = *req->numberOfTermsRequested;
1020         int position_returned = *req->preferredPositionInResponse;
1021         
1022         resp->entries->num_entries = number_returned;
1023         resp->entries->entries = (Z_Entry**)
1024             odr_malloc(odr, sizeof(Z_Entry*) * number_returned);
1025         int i;
1026
1027         int lbefore = entries_before.size();
1028         if (lbefore < position_returned-1)
1029             position_returned = lbefore+1;
1030
1031         ScanTermInfoList::iterator it = entries_before.begin();
1032         for (i = 0; i<position_returned-1 && it != entries_before.end(); i++, it++)
1033         {
1034             resp->entries->entries[position_returned-2-i] = it->get_entry(odr);
1035         }
1036
1037         it = entries_after.begin();
1038
1039         if (position_returned <= 0)
1040             i = 0;
1041         else
1042             i = position_returned-1;
1043         for (; i<number_returned && it != entries_after.end(); i++, it++)
1044         {
1045             resp->entries->entries[i] = it->get_entry(odr);
1046         }
1047
1048         number_returned = i;
1049
1050         resp->positionOfTerm = odr_intdup(odr, position_returned);
1051         resp->numberOfEntriesReturned = odr_intdup(odr, number_returned);
1052         resp->entries->num_entries = number_returned;
1053
1054         package.response() = f_apdu;
1055     }
1056 }
1057
1058
1059 void yf::Multi::process(mp::Package &package) const
1060 {
1061     FrontendPtr f = m_p->get_frontend(package);
1062
1063     Z_GDU *gdu = package.request().get();
1064     
1065     if (gdu && gdu->which == Z_GDU_Z3950 && gdu->u.z3950->which ==
1066         Z_APDU_initRequest && !f->m_is_multi)
1067     {
1068         f->init(package, gdu);
1069     }
1070     else if (!f->m_is_multi)
1071         package.move();
1072     else if (gdu && gdu->which == Z_GDU_Z3950)
1073     {
1074         Z_APDU *apdu = gdu->u.z3950;
1075         if (apdu->which == Z_APDU_initRequest)
1076         {
1077             mp::odr odr;
1078             
1079             package.response() = odr.create_close(
1080                 apdu,
1081                 Z_Close_protocolError,
1082                 "double init");
1083             
1084             package.session().close();
1085         }
1086         else if (apdu->which == Z_APDU_searchRequest)
1087         {
1088             f->search(package, apdu);
1089         }
1090         else if (apdu->which == Z_APDU_presentRequest)
1091         {
1092             f->present(package, apdu);
1093         }
1094         else if (apdu->which == Z_APDU_scanRequest)
1095         {
1096             f->scan2(package, apdu);
1097         }
1098         else
1099         {
1100             mp::odr odr;
1101             
1102             package.response() = odr.create_close(
1103                 apdu, Z_Close_protocolError,
1104                 "unsupported APDU in filter multi");
1105             
1106             package.session().close();
1107         }
1108     }
1109     m_p->release_frontend(package);
1110 }
1111
1112 void mp::filter::Multi::configure(const xmlNode * ptr)
1113 {
1114     for (ptr = ptr->children; ptr; ptr = ptr->next)
1115     {
1116         if (ptr->type != XML_ELEMENT_NODE)
1117             continue;
1118         if (!strcmp((const char *) ptr->name, "target"))
1119         {
1120             std::string route = mp::xml::get_route(ptr);
1121             std::string target = mp::xml::get_text(ptr);
1122             std::cout << "route=" << route << " target=" << target << "\n";
1123             m_p->m_target_route[target] = route;
1124         }
1125         else if (!strcmp((const char *) ptr->name, "hideunavailable"))
1126         {
1127             m_p->m_hide_unavailable = true;
1128         }
1129         else
1130         {
1131             throw mp::filter::FilterException
1132                 ("Bad element " 
1133                  + std::string((const char *) ptr->name)
1134                  + " in virt_db filter");
1135         }
1136     }
1137 }
1138
1139 static mp::filter::Base* filter_creator()
1140 {
1141     return new mp::filter::Multi;
1142 }
1143
1144 extern "C" {
1145     struct metaproxy_1_filter_struct metaproxy_1_filter_multi = {
1146         0,
1147         "multi",
1148         filter_creator
1149     };
1150 }
1151
1152
1153 /*
1154  * Local variables:
1155  * c-basic-offset: 4
1156  * indent-tabs-mode: nil
1157  * c-file-style: "stroustrup"
1158  * End:
1159  * vim: shiftwidth=4 tabstop=8 expandtab
1160  */