Removed unused code in filter_multi
[metaproxy-moved-to-github.git] / src / filter_multi.cpp
1 /* $Id: filter_multi.cpp,v 1.17 2006-05-15 13:22:02 adam Exp $
2    Copyright (c) 2005-2006, Index Data.
3
4 %LICENSE%
5  */
6
7 #include "config.hpp"
8
9 #include "filter.hpp"
10 #include "package.hpp"
11
12 #include <boost/thread/thread.hpp>
13 #include <boost/thread/mutex.hpp>
14 #include <boost/thread/condition.hpp>
15 #include <boost/shared_ptr.hpp>
16
17 #include "util.hpp"
18 #include "filter_multi.hpp"
19
20 #include <yaz/zgdu.h>
21 #include <yaz/otherinfo.h>
22 #include <yaz/diagbib1.h>
23
24 #include <vector>
25 #include <algorithm>
26 #include <map>
27 #include <iostream>
28
29 namespace mp = metaproxy_1;
30 namespace yf = mp::filter;
31
32 namespace metaproxy_1 {
33     namespace filter {
34
35         struct Multi::BackendSet {
36             BackendPtr m_backend;
37             int m_count;
38             bool operator < (const BackendSet &k) const;
39             bool operator == (const BackendSet &k) const;
40         };
41         struct Multi::ScanTermInfo {
42             std::string m_norm_term;
43             std::string m_display_term;
44             int m_count;
45             bool operator < (const ScanTermInfo &) const;
46             bool operator == (const ScanTermInfo &) const;
47             Z_Entry *get_entry(ODR odr);
48         };
49         struct Multi::FrontendSet {
50             struct PresentJob {
51                 BackendPtr m_backend;
52                 int m_pos;
53                 int m_inside_pos;
54             };
55             FrontendSet(std::string setname);
56             FrontendSet();
57             ~FrontendSet();
58
59             void round_robin(int pos, int number, std::list<PresentJob> &job);
60
61             std::list<BackendSet> m_backend_sets;
62             std::string m_setname;
63         };
64         struct Multi::Backend {
65             PackagePtr m_package;
66             std::string m_backend_database;
67             std::string m_vhost;
68             std::string m_route;
69             void operator() (void);  // thread operation
70         };
71         struct Multi::Frontend {
72             Frontend(Rep *rep);
73             ~Frontend();
74             bool m_is_multi;
75             bool m_in_use;
76             std::list<BackendPtr> m_backend_list;
77             std::map<std::string,Multi::FrontendSet> m_sets;
78
79             void multi_move(std::list<BackendPtr> &blist);
80             void init(Package &package, Z_GDU *gdu);
81             void close(Package &package);
82             void search(Package &package, Z_APDU *apdu);
83             void present(Package &package, Z_APDU *apdu);
84             void scan1(Package &package, Z_APDU *apdu);
85             void scan2(Package &package, Z_APDU *apdu);
86             Rep *m_p;
87         };            
88         struct Multi::Map {
89             Map(std::list<std::string> hosts, std::string route);
90             Map();
91             std::list<std::string> m_hosts;
92             std::string m_route;
93         };
94         class Multi::Rep {
95             friend class Multi;
96             friend struct Frontend;
97             
98             FrontendPtr get_frontend(Package &package);
99             void release_frontend(Package &package);
100         private:
101             std::map<std::string,std::string> m_target_route;
102             boost::mutex m_mutex;
103             boost::condition m_cond_session_ready;
104             std::map<mp::Session, FrontendPtr> m_clients;
105         };
106     }
107 }
108
109 bool yf::Multi::BackendSet::operator < (const BackendSet &k) const
110 {
111     return m_count < k.m_count;
112 }
113
114 yf::Multi::Frontend::Frontend(Rep *rep)
115 {
116     m_p = rep;
117     m_is_multi = false;
118 }
119
120 yf::Multi::Frontend::~Frontend()
121 {
122 }
123
124 yf::Multi::FrontendPtr yf::Multi::Rep::get_frontend(Package &package)
125 {
126     boost::mutex::scoped_lock lock(m_mutex);
127
128     std::map<mp::Session,yf::Multi::FrontendPtr>::iterator it;
129     
130     while(true)
131     {
132         it = m_clients.find(package.session());
133         if (it == m_clients.end())
134             break;
135         
136         if (!it->second->m_in_use)
137         {
138             it->second->m_in_use = true;
139             return it->second;
140         }
141         m_cond_session_ready.wait(lock);
142     }
143     FrontendPtr f(new Frontend(this));
144     m_clients[package.session()] = f;
145     f->m_in_use = true;
146     return f;
147 }
148
149 void yf::Multi::Rep::release_frontend(Package &package)
150 {
151     boost::mutex::scoped_lock lock(m_mutex);
152     std::map<mp::Session,yf::Multi::FrontendPtr>::iterator it;
153     
154     it = m_clients.find(package.session());
155     if (it != m_clients.end())
156     {
157         if (package.session().is_closed())
158         {
159             it->second->close(package);
160             m_clients.erase(it);
161         }
162         else
163         {
164             it->second->m_in_use = false;
165         }
166         m_cond_session_ready.notify_all();
167     }
168 }
169
170 yf::Multi::FrontendSet::FrontendSet(std::string setname)
171     :  m_setname(setname)
172 {
173 }
174
175
176 yf::Multi::FrontendSet::FrontendSet()
177 {
178 }
179
180
181 yf::Multi::FrontendSet::~FrontendSet()
182 {
183 }
184
185 yf::Multi::Map::Map(std::list<std::string> hosts, std::string route)
186     : m_hosts(hosts), m_route(route) 
187 {
188 }
189
190 yf::Multi::Map::Map()
191 {
192 }
193
194 yf::Multi::Multi() : m_p(new Multi::Rep)
195 {
196 }
197
198 yf::Multi::~Multi() {
199 }
200
201
202 void yf::Multi::Backend::operator() (void) 
203 {
204     m_package->move(m_route);
205 }
206
207
208 void yf::Multi::Frontend::close(Package &package)
209 {
210     std::list<BackendPtr>::const_iterator bit;
211     for (bit = m_backend_list.begin(); bit != m_backend_list.end(); bit++)
212     {
213         BackendPtr b = *bit;
214
215         b->m_package->copy_filter(package);
216         b->m_package->request() = (Z_GDU *) 0;
217         b->m_package->session().close();
218         b->m_package->move(b->m_route);
219     }
220 }
221
222 void yf::Multi::Frontend::multi_move(std::list<BackendPtr> &blist)
223 {
224     std::list<BackendPtr>::const_iterator bit;
225     boost::thread_group g;
226     for (bit = blist.begin(); bit != blist.end(); bit++)
227     {
228         g.add_thread(new boost::thread(**bit));
229     }
230     g.join_all();
231 }
232
233 void yf::Multi::FrontendSet::round_robin(int start, int number,
234                                          std::list<PresentJob> &jobs)
235 {
236     std::list<int> pos;
237     std::list<int> inside_pos;
238     std::list<BackendSet>::const_iterator bsit;
239     for (bsit = m_backend_sets.begin(); bsit != m_backend_sets.end(); bsit++)
240     {
241         pos.push_back(1);
242         inside_pos.push_back(0);
243     }
244
245     int p = 1;
246 #if 1
247     // optimization step!
248     int omin = 0;
249     while(true)
250     {
251         int min = 0;
252         int no_left = 0;
253         // find min count for each set which is > omin
254         for (bsit = m_backend_sets.begin(); bsit != m_backend_sets.end(); bsit++)
255         {
256             if (bsit->m_count > omin)
257             {
258                 if (no_left == 0 || bsit->m_count < min)
259                     min = bsit->m_count;
260                 no_left++;
261             }
262         }
263         if (no_left == 0) // if nothing greater than omin, bail out.
264             break;
265         int skip = no_left * min;
266         if (p + skip > start)  // step gets us "into" present range?
267         {
268             // Yes. skip until start.. Rounding off is deliberate!
269             min = (start-p) / no_left;
270             p += no_left * min;
271             
272             // update positions in each set..
273             std::list<int>::iterator psit = pos.begin();
274             for (psit = pos.begin(); psit != pos.end(); psit++)
275                 *psit += min;
276             break;
277         }
278         // skip on each set.. before "present range"..
279         p = p + skip;
280         
281         std::cout << "\nSKIP min=" << min << " no_left=" << no_left << "\n\n";
282         
283         std::list<int>::iterator psit = pos.begin();
284         for (psit = pos.begin(); psit != pos.end(); psit++)
285             *psit += min;
286         
287         omin = min; // update so we consider next class (with higher count)
288     }
289 #endif
290     int fetched = 0;
291     bool more = true;
292     while (more)
293     {
294         more = false;
295         std::list<int>::iterator psit = pos.begin();
296         std::list<int>::iterator esit = inside_pos.begin();
297         bsit = m_backend_sets.begin();
298
299         for (; bsit != m_backend_sets.end(); psit++,esit++,bsit++)
300         {
301             if (fetched >= number)
302             {
303                 more = false;
304                 break;
305             }
306             if (*psit <= bsit->m_count)
307             {
308                 if (p >= start)
309                 {
310                     PresentJob job;
311                     job.m_backend = bsit->m_backend;
312                     job.m_pos = *psit;
313                     job.m_inside_pos = *esit;
314                     jobs.push_back(job);
315                     (*esit)++;
316                     fetched++;
317                 }
318                 (*psit)++;
319                 p++;
320                 more = true;
321             }
322         }
323     }
324 }
325
326 void yf::Multi::Frontend::init(Package &package, Z_GDU *gdu)
327 {
328     Z_InitRequest *req = gdu->u.z3950->u.initRequest;
329
330     std::list<std::string> targets;
331
332     mp::util::get_vhost_otherinfo(&req->otherInfo, false, targets);
333
334     if (targets.size() < 1)
335     {
336         package.move();
337         return;
338     }
339
340     std::list<std::string>::const_iterator t_it = targets.begin();
341     for (; t_it != targets.end(); t_it++)
342     {
343         Session s;
344         Backend *b = new Backend;
345         b->m_vhost = *t_it;
346
347         b->m_route = m_p->m_target_route[*t_it];
348         // b->m_route unset
349         b->m_package = PackagePtr(new Package(s, package.origin()));
350
351         m_backend_list.push_back(BackendPtr(b));
352     }
353     m_is_multi = true;
354
355     // create init request 
356     std::list<BackendPtr>::const_iterator bit;
357     for (bit = m_backend_list.begin(); bit != m_backend_list.end(); bit++)
358     {
359         mp::odr odr;
360         BackendPtr b = *bit;
361         Z_APDU *init_apdu = zget_APDU(odr, Z_APDU_initRequest);
362         
363         std::list<std::string>vhost_one;
364         vhost_one.push_back(b->m_vhost);
365         mp::util::set_vhost_otherinfo(&init_apdu->u.initRequest->otherInfo,
366                                        odr, vhost_one);
367
368         Z_InitRequest *req = init_apdu->u.initRequest;
369         
370         ODR_MASK_SET(req->options, Z_Options_search);
371         ODR_MASK_SET(req->options, Z_Options_present);
372         ODR_MASK_SET(req->options, Z_Options_namedResultSets);
373         ODR_MASK_SET(req->options, Z_Options_scan);
374         
375         ODR_MASK_SET(req->protocolVersion, Z_ProtocolVersion_1);
376         ODR_MASK_SET(req->protocolVersion, Z_ProtocolVersion_2);
377         ODR_MASK_SET(req->protocolVersion, Z_ProtocolVersion_3);
378         
379         b->m_package->request() = init_apdu;
380
381         b->m_package->copy_filter(package);
382     }
383     multi_move(m_backend_list);
384
385     // create the frontend init response based on each backend init response
386     mp::odr odr;
387
388     Z_APDU *f_apdu = odr.create_initResponse(gdu->u.z3950, 0, 0);
389     Z_InitResponse *f_resp = f_apdu->u.initResponse;
390
391     ODR_MASK_SET(f_resp->options, Z_Options_search);
392     ODR_MASK_SET(f_resp->options, Z_Options_present);
393     ODR_MASK_SET(f_resp->options, Z_Options_namedResultSets);
394     
395     ODR_MASK_SET(f_resp->protocolVersion, Z_ProtocolVersion_1);
396     ODR_MASK_SET(f_resp->protocolVersion, Z_ProtocolVersion_2);
397     ODR_MASK_SET(f_resp->protocolVersion, Z_ProtocolVersion_3);
398
399     for (bit = m_backend_list.begin(); bit != m_backend_list.end(); bit++)
400     {
401         PackagePtr p = (*bit)->m_package;
402         
403         if (p->session().is_closed()) // if any backend closes, close frontend
404             package.session().close();
405         Z_GDU *gdu = p->response().get();
406         if (gdu && gdu->which == Z_GDU_Z3950 && gdu->u.z3950->which ==
407             Z_APDU_initResponse)
408         {
409             int i;
410             Z_APDU *b_apdu = gdu->u.z3950;
411             Z_InitResponse *b_resp = b_apdu->u.initResponse;
412
413             // common options for all backends
414             for (i = 0; i <= Z_Options_stringSchema; i++)
415             {
416                 if (!ODR_MASK_GET(b_resp->options, i))
417                     ODR_MASK_CLEAR(f_resp->options, i);
418             }
419             // common protocol version
420             for (i = 0; i <= Z_ProtocolVersion_3; i++)
421                 if (!ODR_MASK_GET(b_resp->protocolVersion, i))
422                     ODR_MASK_CLEAR(f_resp->protocolVersion, i);
423             // reject if any of the backends reject
424             if (!*b_resp->result)
425                 *f_resp->result = 0;
426         }
427         else
428         {
429             // if any target does not return init return that (close or
430             // similar )
431             package.response() = p->response();
432             return;
433         }
434     }
435     package.response() = f_apdu;
436 }
437
438 void yf::Multi::Frontend::search(Package &package, Z_APDU *apdu_req)
439 {
440     // create search request 
441     Z_SearchRequest *req = apdu_req->u.searchRequest;
442
443     // save these for later
444     int smallSetUpperBound = *req->smallSetUpperBound;
445     int largeSetLowerBound = *req->largeSetLowerBound;
446     int mediumSetPresentNumber = *req->mediumSetPresentNumber;
447     
448     // they are altered now - to disable piggyback
449     *req->smallSetUpperBound = 0;
450     *req->largeSetLowerBound = 1;
451     *req->mediumSetPresentNumber = 1;
452
453     int default_num_db = req->num_databaseNames;
454     char **default_db = req->databaseNames;
455
456     std::list<BackendPtr>::const_iterator bit;
457     for (bit = m_backend_list.begin(); bit != m_backend_list.end(); bit++)
458     {
459         PackagePtr p = (*bit)->m_package;
460         mp::odr odr;
461     
462         if (!mp::util::set_databases_from_zurl(odr, (*bit)->m_vhost,
463                                                 &req->num_databaseNames,
464                                                 &req->databaseNames))
465         {
466             req->num_databaseNames = default_num_db;
467             req->databaseNames = default_db;
468         }
469         p->request() = apdu_req;
470         p->copy_filter(package);
471     }
472     multi_move(m_backend_list);
473
474     // look at each response
475     FrontendSet resultSet(std::string(req->resultSetName));
476
477     int result_set_size = 0;
478     Z_Records *z_records_diag = 0;  // no diagnostics (yet)
479     for (bit = m_backend_list.begin(); bit != m_backend_list.end(); bit++)
480     {
481         PackagePtr p = (*bit)->m_package;
482         
483         if (p->session().is_closed()) // if any backend closes, close frontend
484             package.session().close();
485         
486         Z_GDU *gdu = p->response().get();
487         if (gdu && gdu->which == Z_GDU_Z3950 && gdu->u.z3950->which ==
488             Z_APDU_searchResponse)
489         {
490             Z_APDU *b_apdu = gdu->u.z3950;
491             Z_SearchResponse *b_resp = b_apdu->u.searchResponse;
492          
493             // see we get any errors (AKA diagnstics)
494             if (b_resp->records)
495             {
496                 if (b_resp->records->which == Z_Records_NSD
497                     || b_resp->records->which == Z_Records_multipleNSD)
498                     z_records_diag = b_resp->records;
499                 // we may set this multiple times (TOO BAD!)
500             }
501             BackendSet backendSet;
502             backendSet.m_backend = *bit;
503             backendSet.m_count = *b_resp->resultCount;
504             result_set_size += *b_resp->resultCount;
505             resultSet.m_backend_sets.push_back(backendSet);
506         }
507         else
508         {
509             // if any target does not return search response - return that 
510             package.response() = p->response();
511             return;
512         }
513     }
514
515     mp::odr odr;
516     Z_APDU *f_apdu = odr.create_searchResponse(apdu_req, 0, 0);
517     Z_SearchResponse *f_resp = f_apdu->u.searchResponse;
518
519     *f_resp->resultCount = result_set_size;
520     if (z_records_diag)
521     {
522         // search error
523         f_resp->records = z_records_diag;
524         package.response() = f_apdu;
525         return;
526     }
527     // assume OK
528     m_sets[resultSet.m_setname] = resultSet;
529
530     int number;
531     mp::util::piggyback(smallSetUpperBound,
532                          largeSetLowerBound,
533                          mediumSetPresentNumber,
534                          result_set_size,
535                          number);
536     Package pp(package.session(), package.origin());
537     if (number > 0)
538     {
539         pp.copy_filter(package);
540         Z_APDU *p_apdu = zget_APDU(odr, Z_APDU_presentRequest);
541         Z_PresentRequest *p_req = p_apdu->u.presentRequest;
542         p_req->preferredRecordSyntax = req->preferredRecordSyntax;
543         p_req->resultSetId = req->resultSetName;
544         *p_req->resultSetStartPoint = 1;
545         *p_req->numberOfRecordsRequested = number;
546         pp.request() = p_apdu;
547         present(pp, p_apdu);
548         
549         if (pp.session().is_closed())
550             package.session().close();
551         
552         Z_GDU *gdu = pp.response().get();
553         if (gdu && gdu->which == Z_GDU_Z3950 && gdu->u.z3950->which ==
554             Z_APDU_presentResponse)
555         {
556             Z_PresentResponse *p_res = gdu->u.z3950->u.presentResponse;
557             f_resp->records = p_res->records;
558             *f_resp->numberOfRecordsReturned = 
559                 *p_res->numberOfRecordsReturned;
560             *f_resp->nextResultSetPosition = 
561                 *p_res->nextResultSetPosition;
562         }
563         else 
564         {
565             package.response() = pp.response(); 
566             return;
567         }
568     }
569     package.response() = f_apdu; // in this scope because of p
570 }
571
572 void yf::Multi::Frontend::present(Package &package, Z_APDU *apdu_req)
573 {
574     // create present request 
575     Z_PresentRequest *req = apdu_req->u.presentRequest;
576
577     Sets_it it;
578     it = m_sets.find(std::string(req->resultSetId));
579     if (it == m_sets.end())
580     {
581         mp::odr odr;
582         Z_APDU *apdu = 
583             odr.create_presentResponse(
584                 apdu_req,
585                 YAZ_BIB1_SPECIFIED_RESULT_SET_DOES_NOT_EXIST,
586                 req->resultSetId);
587         package.response() = apdu;
588         return;
589     }
590     std::list<Multi::FrontendSet::PresentJob> jobs;
591     int start = *req->resultSetStartPoint;
592     int number = *req->numberOfRecordsRequested;
593     it->second.round_robin(start, number, jobs);
594
595     std::list<BackendPtr> present_backend_list;
596
597     std::list<BackendSet>::const_iterator bsit;
598     bsit = it->second.m_backend_sets.begin();
599     for (; bsit != it->second.m_backend_sets.end(); bsit++)
600     {
601         std::list<Multi::FrontendSet::PresentJob>::const_iterator jit;
602         int start = -1;
603         int end = -1;
604         
605         for (jit = jobs.begin(); jit != jobs.end(); jit++)
606         {
607             if (jit->m_backend == bsit->m_backend)
608             {
609                 if (start == -1 || jit->m_pos < start)
610                     start = jit->m_pos;
611                 if (end == -1 || jit->m_pos > end)
612                     end = jit->m_pos;
613             }
614         }
615         if (start != -1)
616         {
617             PackagePtr p = bsit->m_backend->m_package;
618
619             *req->resultSetStartPoint = start;
620             *req->numberOfRecordsRequested = end - start + 1;
621             
622             p->request() = apdu_req;
623             p->copy_filter(package);
624
625             present_backend_list.push_back(bsit->m_backend);
626         }
627     }
628     multi_move(present_backend_list);
629
630     // look at each response
631     Z_Records *z_records_diag = 0;
632
633     std::list<BackendPtr>::const_iterator pbit = present_backend_list.begin();
634     for (; pbit != present_backend_list.end(); pbit++)
635     {
636         PackagePtr p = (*pbit)->m_package;
637         
638         if (p->session().is_closed()) // if any backend closes, close frontend
639             package.session().close();
640         
641         Z_GDU *gdu = p->response().get();
642         if (gdu && gdu->which == Z_GDU_Z3950 && gdu->u.z3950->which ==
643             Z_APDU_presentResponse)
644         {
645             Z_APDU *b_apdu = gdu->u.z3950;
646             Z_PresentResponse *b_resp = b_apdu->u.presentResponse;
647          
648             // see we get any errors (AKA diagnstics)
649             if (b_resp->records)
650             {
651                 if (b_resp->records->which != Z_Records_DBOSD)
652                     z_records_diag = b_resp->records;
653                 // we may set this multiple times (TOO BAD!)
654             }
655         }
656         else
657         {
658             // if any target does not return present response - return that 
659             package.response() = p->response();
660             return;
661         }
662     }
663
664     mp::odr odr;
665     Z_APDU *f_apdu = odr.create_presentResponse(apdu_req, 0, 0);
666     Z_PresentResponse *f_resp = f_apdu->u.presentResponse;
667
668     if (z_records_diag)
669     {
670         f_resp->records = z_records_diag;
671         *f_resp->presentStatus = Z_PresentStatus_failure;
672     }
673     else
674     {
675         f_resp->records = (Z_Records *) odr_malloc(odr, sizeof(Z_Records));
676         Z_Records * records = f_resp->records;
677         records->which = Z_Records_DBOSD;
678         records->u.databaseOrSurDiagnostics =
679             (Z_NamePlusRecordList *)
680             odr_malloc(odr, sizeof(Z_NamePlusRecordList));
681         Z_NamePlusRecordList *nprl = records->u.databaseOrSurDiagnostics;
682         nprl->num_records = jobs.size();
683         nprl->records = (Z_NamePlusRecord**)
684             odr_malloc(odr, sizeof(Z_NamePlusRecord *) * nprl->num_records);
685         int i = 0;
686         std::list<Multi::FrontendSet::PresentJob>::const_iterator jit;
687         for (jit = jobs.begin(); jit != jobs.end(); jit++)
688         {
689             PackagePtr p = jit->m_backend->m_package;
690             
691             Z_GDU *gdu = p->response().get();
692             Z_APDU *b_apdu = gdu->u.z3950;
693             Z_PresentResponse *b_resp = b_apdu->u.presentResponse;
694
695             nprl->records[i++] =
696                 b_resp->records->u.databaseOrSurDiagnostics->
697                 records[jit->m_inside_pos];
698         }
699         *f_resp->nextResultSetPosition = start + i;
700         *f_resp->numberOfRecordsReturned = i;
701     }
702     package.response() = f_apdu;
703 }
704
705 void yf::Multi::Frontend::scan1(Package &package, Z_APDU *apdu_req)
706 {
707     if (m_backend_list.size() > 1)
708     {
709         mp::odr odr;
710         Z_APDU *f_apdu = 
711             odr.create_scanResponse(
712                 apdu_req, YAZ_BIB1_COMBI_OF_SPECIFIED_DATABASES_UNSUPP, 0);
713         package.response() = f_apdu;
714         return;
715     }
716     Z_ScanRequest *req = apdu_req->u.scanRequest;
717
718     int default_num_db = req->num_databaseNames;
719     char **default_db = req->databaseNames;
720
721     std::list<BackendPtr>::const_iterator bit;
722     for (bit = m_backend_list.begin(); bit != m_backend_list.end(); bit++)
723     {
724         PackagePtr p = (*bit)->m_package;
725         mp::odr odr;
726     
727         if (!mp::util::set_databases_from_zurl(odr, (*bit)->m_vhost,
728                                                 &req->num_databaseNames,
729                                                 &req->databaseNames))
730         {
731             req->num_databaseNames = default_num_db;
732             req->databaseNames = default_db;
733         }
734         p->request() = apdu_req;
735         p->copy_filter(package);
736     }
737     multi_move(m_backend_list);
738
739     for (bit = m_backend_list.begin(); bit != m_backend_list.end(); bit++)
740     {
741         PackagePtr p = (*bit)->m_package;
742         
743         if (p->session().is_closed()) // if any backend closes, close frontend
744             package.session().close();
745         
746         Z_GDU *gdu = p->response().get();
747         if (gdu && gdu->which == Z_GDU_Z3950 && gdu->u.z3950->which ==
748             Z_APDU_scanResponse)
749         {
750             package.response() = p->response();
751             break;
752         }
753         else
754         {
755             // if any target does not return scan response - return that 
756             package.response() = p->response();
757             return;
758         }
759     }
760 }
761
762 bool yf::Multi::ScanTermInfo::operator < (const ScanTermInfo &k) const
763 {
764     return m_norm_term < k.m_norm_term;
765 }
766
767 bool yf::Multi::ScanTermInfo::operator == (const ScanTermInfo &k) const
768 {
769     return m_norm_term == k.m_norm_term;
770 }
771
772 Z_Entry *yf::Multi::ScanTermInfo::get_entry(ODR odr)
773 {
774     Z_Entry *e = (Z_Entry *)odr_malloc(odr, sizeof(*e));
775     e->which = Z_Entry_termInfo;
776     Z_TermInfo *t;
777     t = e->u.termInfo = (Z_TermInfo *) odr_malloc(odr, sizeof(*t));
778     t->suggestedAttributes = 0;
779     t->displayTerm = 0;
780     t->alternativeTerm = 0;
781     t->byAttributes = 0;
782     t->otherTermInfo = 0;
783     t->globalOccurrences = odr_intdup(odr, m_count);
784     t->term = (Z_Term *)
785         odr_malloc(odr, sizeof(*t->term));
786     t->term->which = Z_Term_general;
787     Odr_oct *o;
788     t->term->u.general = o = (Odr_oct *)odr_malloc(odr, sizeof(Odr_oct));
789
790     o->len = o->size = m_norm_term.size();
791     o->buf = (unsigned char *) odr_malloc(odr, o->len);
792     memcpy(o->buf, m_norm_term.c_str(), o->len);
793     return e;
794 }
795
796 void yf::Multi::Frontend::scan2(Package &package, Z_APDU *apdu_req)
797 {
798     Z_ScanRequest *req = apdu_req->u.scanRequest;
799
800     int default_num_db = req->num_databaseNames;
801     char **default_db = req->databaseNames;
802
803     std::list<BackendPtr>::const_iterator bit;
804     for (bit = m_backend_list.begin(); bit != m_backend_list.end(); bit++)
805     {
806         PackagePtr p = (*bit)->m_package;
807         mp::odr odr;
808     
809         if (!mp::util::set_databases_from_zurl(odr, (*bit)->m_vhost,
810                                                 &req->num_databaseNames,
811                                                 &req->databaseNames))
812         {
813             req->num_databaseNames = default_num_db;
814             req->databaseNames = default_db;
815         }
816         p->request() = apdu_req;
817         p->copy_filter(package);
818     }
819     multi_move(m_backend_list);
820
821     ScanTermInfoList entries_before;
822     ScanTermInfoList entries_after;
823     int no_before = 0;
824     int no_after = 0;
825
826     for (bit = m_backend_list.begin(); bit != m_backend_list.end(); bit++)
827     {
828         PackagePtr p = (*bit)->m_package;
829         
830         if (p->session().is_closed()) // if any backend closes, close frontend
831             package.session().close();
832         
833         Z_GDU *gdu = p->response().get();
834         if (gdu && gdu->which == Z_GDU_Z3950 && gdu->u.z3950->which ==
835             Z_APDU_scanResponse)
836         {
837             Z_ScanResponse *res = gdu->u.z3950->u.scanResponse;
838
839             if (res->entries && res->entries->nonsurrogateDiagnostics)
840             {
841                 // failure
842                 mp::odr odr;
843                 Z_APDU *f_apdu = odr.create_scanResponse(apdu_req, 1, 0);
844                 Z_ScanResponse *f_res = f_apdu->u.scanResponse;
845
846                 f_res->entries->nonsurrogateDiagnostics = 
847                     res->entries->nonsurrogateDiagnostics;
848                 f_res->entries->num_nonsurrogateDiagnostics = 
849                     res->entries->num_nonsurrogateDiagnostics;
850
851                 package.response() = f_apdu;
852                 return;
853             }
854
855             if (res->entries && res->entries->entries)
856             {
857                 Z_Entry **entries = res->entries->entries;
858                 int num_entries = res->entries->num_entries;
859                 int position = 1;
860                 if (req->preferredPositionInResponse)
861                     position = *req->preferredPositionInResponse;
862                 if (res->positionOfTerm)
863                     position = *res->positionOfTerm;
864
865                 // before
866                 int i;
867                 for (i = 0; i<position-1 && i<num_entries; i++)
868                 {
869                     Z_Entry *ent = entries[i];
870
871                     if (ent->which == Z_Entry_termInfo)
872                     {
873                         ScanTermInfo my;
874
875                         int *occur = ent->u.termInfo->globalOccurrences;
876                         my.m_count = occur ? *occur : 0;
877
878                         if (ent->u.termInfo->term->which == Z_Term_general)
879                         {
880                             my.m_norm_term = std::string(
881                                 (const char *)
882                                 ent->u.termInfo->term->u.general->buf,
883                                 ent->u.termInfo->term->u.general->len);
884                         }
885                         if (my.m_norm_term.length())
886                         {
887                             ScanTermInfoList::iterator it = 
888                                 entries_before.begin();
889                             while (it != entries_before.end() && my <*it)
890                                 it++;
891                             if (my == *it)
892                             {
893                                 it->m_count += my.m_count;
894                             }
895                             else
896                             {
897                                 entries_before.insert(it, my);
898                                 no_before++;
899                             }
900                         }
901                     }
902                 }
903                 // after
904                 if (position <= 0)
905                     i = 0;
906                 else
907                     i = position-1;
908                 for ( ; i<num_entries; i++)
909                 {
910                     Z_Entry *ent = entries[i];
911
912                     if (ent->which == Z_Entry_termInfo)
913                     {
914                         ScanTermInfo my;
915
916                         int *occur = ent->u.termInfo->globalOccurrences;
917                         my.m_count = occur ? *occur : 0;
918
919                         if (ent->u.termInfo->term->which == Z_Term_general)
920                         {
921                             my.m_norm_term = std::string(
922                                 (const char *)
923                                 ent->u.termInfo->term->u.general->buf,
924                                 ent->u.termInfo->term->u.general->len);
925                         }
926                         if (my.m_norm_term.length())
927                         {
928                             ScanTermInfoList::iterator it = 
929                                 entries_after.begin();
930                             while (it != entries_after.end() && *it < my)
931                                 it++;
932                             if (my == *it)
933                             {
934                                 it->m_count += my.m_count;
935                             }
936                             else
937                             {
938                                 entries_after.insert(it, my);
939                                 no_after++;
940                             }
941                         }
942                     }
943                 }
944
945             }                
946         }
947         else
948         {
949             // if any target does not return scan response - return that 
950             package.response() = p->response();
951             return;
952         }
953     }
954
955     if (true)
956     {
957         std::cout << "BEFORE\n";
958         ScanTermInfoList::iterator it = entries_before.begin();
959         for(; it != entries_before.end(); it++)
960         {
961             std::cout << " " << it->m_norm_term << " " << it->m_count << "\n";
962         }
963         
964         std::cout << "AFTER\n";
965         it = entries_after.begin();
966         for(; it != entries_after.end(); it++)
967         {
968             std::cout << " " << it->m_norm_term << " " << it->m_count << "\n";
969         }
970     }
971
972     if (false)
973     {
974         mp::odr odr;
975         Z_APDU *f_apdu = odr.create_scanResponse(apdu_req, 1, "not implemented");
976         package.response() = f_apdu;
977     }
978     else
979     {
980         mp::odr odr;
981         Z_APDU *f_apdu = odr.create_scanResponse(apdu_req, 0, 0);
982         Z_ScanResponse *resp = f_apdu->u.scanResponse;
983         
984         int number_returned = *req->numberOfTermsRequested;
985         int position_returned = *req->preferredPositionInResponse;
986         
987         resp->entries->num_entries = number_returned;
988         resp->entries->entries = (Z_Entry**)
989             odr_malloc(odr, sizeof(Z_Entry*) * number_returned);
990         int i;
991
992         int lbefore = entries_before.size();
993         if (lbefore < position_returned-1)
994             position_returned = lbefore+1;
995
996         ScanTermInfoList::iterator it = entries_before.begin();
997         for (i = 0; i<position_returned-1 && it != entries_before.end(); i++, it++)
998         {
999             resp->entries->entries[position_returned-2-i] = it->get_entry(odr);
1000         }
1001
1002         it = entries_after.begin();
1003
1004         if (position_returned <= 0)
1005             i = 0;
1006         else
1007             i = position_returned-1;
1008         for (; i<number_returned && it != entries_after.end(); i++, it++)
1009         {
1010             resp->entries->entries[i] = it->get_entry(odr);
1011         }
1012
1013         number_returned = i;
1014
1015         resp->positionOfTerm = odr_intdup(odr, position_returned);
1016         resp->numberOfEntriesReturned = odr_intdup(odr, number_returned);
1017         resp->entries->num_entries = number_returned;
1018
1019         package.response() = f_apdu;
1020     }
1021 }
1022
1023
1024 void yf::Multi::process(Package &package) const
1025 {
1026     FrontendPtr f = m_p->get_frontend(package);
1027
1028     Z_GDU *gdu = package.request().get();
1029     
1030     if (gdu && gdu->which == Z_GDU_Z3950 && gdu->u.z3950->which ==
1031         Z_APDU_initRequest && !f->m_is_multi)
1032     {
1033         f->init(package, gdu);
1034     }
1035     else if (!f->m_is_multi)
1036         package.move();
1037     else if (gdu && gdu->which == Z_GDU_Z3950)
1038     {
1039         Z_APDU *apdu = gdu->u.z3950;
1040         if (apdu->which == Z_APDU_initRequest)
1041         {
1042             mp::odr odr;
1043             
1044             package.response() = odr.create_close(
1045                 apdu,
1046                 Z_Close_protocolError,
1047                 "double init");
1048             
1049             package.session().close();
1050         }
1051         else if (apdu->which == Z_APDU_searchRequest)
1052         {
1053             f->search(package, apdu);
1054         }
1055         else if (apdu->which == Z_APDU_presentRequest)
1056         {
1057             f->present(package, apdu);
1058         }
1059         else if (apdu->which == Z_APDU_scanRequest)
1060         {
1061             f->scan2(package, apdu);
1062         }
1063         else
1064         {
1065             mp::odr odr;
1066             
1067             package.response() = odr.create_close(
1068                 apdu, Z_Close_protocolError,
1069                 "unsupported APDU in filter multi");
1070             
1071             package.session().close();
1072         }
1073     }
1074     m_p->release_frontend(package);
1075 }
1076
1077 void mp::filter::Multi::configure(const xmlNode * ptr)
1078 {
1079     for (ptr = ptr->children; ptr; ptr = ptr->next)
1080     {
1081         if (ptr->type != XML_ELEMENT_NODE)
1082             continue;
1083         if (!strcmp((const char *) ptr->name, "target"))
1084         {
1085             std::string route = mp::xml::get_route(ptr);
1086             std::string target = mp::xml::get_text(ptr);
1087             std::cout << "route=" << route << " target=" << target << "\n";
1088             m_p->m_target_route[target] = route;
1089         }
1090         else
1091         {
1092             throw mp::filter::FilterException
1093                 ("Bad element " 
1094                  + std::string((const char *) ptr->name)
1095                  + " in virt_db filter");
1096         }
1097     }
1098 }
1099
1100 static mp::filter::Base* filter_creator()
1101 {
1102     return new mp::filter::Multi;
1103 }
1104
1105 extern "C" {
1106     struct metaproxy_1_filter_struct metaproxy_1_filter_multi = {
1107         0,
1108         "multi",
1109         filter_creator
1110     };
1111 }
1112
1113
1114 /*
1115  * Local variables:
1116  * c-basic-offset: 4
1117  * indent-tabs-mode: nil
1118  * c-file-style: "stroustrup"
1119  * End:
1120  * vim: shiftwidth=4 tabstop=8 expandtab
1121  */