Consider border conditions for scan
[metaproxy-moved-to-github.git] / src / filter_multi.cpp
1 /* $Id: filter_multi.cpp,v 1.11 2006-01-18 16:21:48 adam Exp $
2    Copyright (c) 2005, Index Data.
3
4 %LICENSE%
5  */
6
7 #include "config.hpp"
8
9 #include "filter.hpp"
10 #include "package.hpp"
11
12 #include <boost/thread/thread.hpp>
13 #include <boost/thread/mutex.hpp>
14 #include <boost/thread/condition.hpp>
15 #include <boost/shared_ptr.hpp>
16
17 #include "util.hpp"
18 #include "filter_multi.hpp"
19
20 #include <yaz/zgdu.h>
21 #include <yaz/otherinfo.h>
22 #include <yaz/diagbib1.h>
23
24 #include <vector>
25 #include <algorithm>
26 #include <map>
27 #include <iostream>
28
29 namespace yf = yp2::filter;
30
31 namespace yp2 {
32     namespace filter {
33
34         struct Multi::BackendSet {
35             BackendPtr m_backend;
36             int m_count;
37             bool operator < (const BackendSet &k) const;
38             bool operator == (const BackendSet &k) const;
39         };
40         struct Multi::ScanTermInfo {
41             std::string m_norm_term;
42             std::string m_display_term;
43             int m_count;
44             bool operator < (const ScanTermInfo &) const;
45             bool operator == (const ScanTermInfo &) const;
46             Z_Entry *get_entry(ODR odr);
47         };
48         struct Multi::FrontendSet {
49             struct PresentJob {
50                 BackendPtr m_backend;
51                 int m_pos;
52                 int m_inside_pos;
53             };
54             FrontendSet(std::string setname);
55             FrontendSet();
56             ~FrontendSet();
57
58             void round_robin(int pos, int number, std::list<PresentJob> &job);
59
60             std::list<BackendSet> m_backend_sets;
61             std::string m_setname;
62         };
63         struct Multi::Backend {
64             PackagePtr m_package;
65             std::string m_backend_database;
66             std::string m_vhost;
67             std::string m_route;
68             void operator() (void);  // thread operation
69         };
70         struct Multi::Frontend {
71             Frontend(Rep *rep);
72             ~Frontend();
73             bool m_is_multi;
74             bool m_in_use;
75             std::list<BackendPtr> m_backend_list;
76             std::map<std::string,Multi::FrontendSet> m_sets;
77
78             void multi_move(std::list<BackendPtr> &blist);
79             void init(Package &package, Z_GDU *gdu);
80             void close(Package &package);
81             void search(Package &package, Z_APDU *apdu);
82             void present(Package &package, Z_APDU *apdu);
83             void scan1(Package &package, Z_APDU *apdu);
84             void scan2(Package &package, Z_APDU *apdu);
85             Rep *m_p;
86         };            
87         struct Multi::Map {
88             Map(std::list<std::string> hosts, std::string route);
89             Map();
90             std::list<std::string> m_hosts;
91             std::string m_route;
92         };
93         class Multi::Rep {
94             friend class Multi;
95             friend class Frontend;
96             
97             FrontendPtr get_frontend(Package &package);
98             void release_frontend(Package &package);
99         private:
100             boost::mutex m_sessions_mutex;
101             std::map<std::string, Multi::Map>m_maps;
102             std::map<std::string,std::string> m_target_route;
103             boost::mutex m_mutex;
104             boost::condition m_cond_session_ready;
105             std::map<yp2::Session, FrontendPtr> m_clients;
106         };
107     }
108 }
109
110 using namespace yp2;
111
112 bool yf::Multi::BackendSet::operator < (const BackendSet &k) const
113 {
114     return m_count < k.m_count;
115 }
116
117 yf::Multi::Frontend::Frontend(Rep *rep)
118 {
119     m_p = rep;
120     m_is_multi = false;
121 }
122
123 yf::Multi::Frontend::~Frontend()
124 {
125 }
126
127 yf::Multi::FrontendPtr yf::Multi::Rep::get_frontend(Package &package)
128 {
129     boost::mutex::scoped_lock lock(m_mutex);
130
131     std::map<yp2::Session,yf::Multi::FrontendPtr>::iterator it;
132     
133     while(true)
134     {
135         it = m_clients.find(package.session());
136         if (it == m_clients.end())
137             break;
138         
139         if (!it->second->m_in_use)
140         {
141             it->second->m_in_use = true;
142             return it->second;
143         }
144         m_cond_session_ready.wait(lock);
145     }
146     FrontendPtr f(new Frontend(this));
147     m_clients[package.session()] = f;
148     f->m_in_use = true;
149     return f;
150 }
151
152 void yf::Multi::Rep::release_frontend(Package &package)
153 {
154     boost::mutex::scoped_lock lock(m_mutex);
155     std::map<yp2::Session,yf::Multi::FrontendPtr>::iterator it;
156     
157     it = m_clients.find(package.session());
158     if (it != m_clients.end())
159     {
160         if (package.session().is_closed())
161         {
162             it->second->close(package);
163             m_clients.erase(it);
164         }
165         else
166         {
167             it->second->m_in_use = false;
168         }
169         m_cond_session_ready.notify_all();
170     }
171 }
172
173 yf::Multi::FrontendSet::FrontendSet(std::string setname)
174     :  m_setname(setname)
175 {
176 }
177
178
179 yf::Multi::FrontendSet::FrontendSet()
180 {
181 }
182
183
184 yf::Multi::FrontendSet::~FrontendSet()
185 {
186 }
187
188 yf::Multi::Map::Map(std::list<std::string> hosts, std::string route)
189     : m_hosts(hosts), m_route(route) 
190 {
191 }
192
193 yf::Multi::Map::Map()
194 {
195 }
196
197 yf::Multi::Multi() : m_p(new Multi::Rep)
198 {
199 }
200
201 yf::Multi::~Multi() {
202 }
203
204
205 void yf::Multi::add_map_host2hosts(std::string host,
206                                    std::list<std::string> hosts,
207                                    std::string route)
208 {
209     m_p->m_maps[host] = Multi::Map(hosts, route);
210 }
211
212 void yf::Multi::Backend::operator() (void) 
213 {
214     m_package->move(m_route);
215 }
216
217
218 void yf::Multi::Frontend::close(Package &package)
219 {
220     std::list<BackendPtr>::const_iterator bit;
221     for (bit = m_backend_list.begin(); bit != m_backend_list.end(); bit++)
222     {
223         BackendPtr b = *bit;
224
225         b->m_package->copy_filter(package);
226         b->m_package->request() = (Z_GDU *) 0;
227         b->m_package->session().close();
228         b->m_package->move(b->m_route);
229     }
230 }
231
232 void yf::Multi::Frontend::multi_move(std::list<BackendPtr> &blist)
233 {
234     std::list<BackendPtr>::const_iterator bit;
235     boost::thread_group g;
236     for (bit = blist.begin(); bit != blist.end(); bit++)
237     {
238         g.add_thread(new boost::thread(**bit));
239     }
240     g.join_all();
241 }
242
243
244 void yf::Multi::FrontendSet::round_robin(int start, int number,
245                                          std::list<PresentJob> &jobs)
246 {
247     int fetched = 0;
248     int p = 1;
249     bool eof = true;
250
251     std::list<int> pos;
252     std::list<int> inside_pos;
253     std::list<BackendSet>::const_iterator bsit;
254     for (bsit = m_backend_sets.begin(); bsit != m_backend_sets.end(); bsit++)
255     {
256         pos.push_back(1);
257         inside_pos.push_back(0);
258     }
259
260     std::list<int>::iterator psit = pos.begin();
261     std::list<int>::iterator esit = inside_pos.begin();
262     bsit = m_backend_sets.begin();
263     while (fetched < number)
264     {
265         if (bsit == m_backend_sets.end())
266         {
267             psit = pos.begin();
268             esit = inside_pos.begin();
269             bsit = m_backend_sets.begin();
270             if (eof)
271                 break;
272             eof = true;
273         }
274         if (*psit <= bsit->m_count)
275         {
276             if (p >= start)
277             {
278                 PresentJob job;
279                 job.m_backend = bsit->m_backend;
280                 job.m_pos = *psit;
281                 job.m_inside_pos = *esit;
282                 jobs.push_back(job);
283                 (*esit)++;
284                 fetched++;
285             }
286             (*psit)++;
287             p++;
288             eof = false;
289         }
290         psit++;
291         esit++;
292         bsit++;
293     }
294 }
295
296 void yf::Multi::Frontend::init(Package &package, Z_GDU *gdu)
297 {
298     Z_InitRequest *req = gdu->u.z3950->u.initRequest;
299
300     std::list<std::string> targets;
301
302     yp2::util::get_vhost_otherinfo(&req->otherInfo, false, targets);
303
304     if (targets.size() < 1)
305     {
306         package.move();
307         return;
308     }
309
310     std::list<std::string>::const_iterator t_it = targets.begin();
311     for (; t_it != targets.end(); t_it++)
312     {
313         Session s;
314         Backend *b = new Backend;
315         b->m_vhost = *t_it;
316
317         b->m_route = m_p->m_target_route[*t_it];
318         // b->m_route unset
319         b->m_package = PackagePtr(new Package(s, package.origin()));
320
321         m_backend_list.push_back(BackendPtr(b));
322     }
323     m_is_multi = true;
324
325     // create init request 
326     std::list<BackendPtr>::const_iterator bit;
327     for (bit = m_backend_list.begin(); bit != m_backend_list.end(); bit++)
328     {
329         yp2::odr odr;
330         BackendPtr b = *bit;
331         Z_APDU *init_apdu = zget_APDU(odr, Z_APDU_initRequest);
332         
333         std::list<std::string>vhost_one;
334         vhost_one.push_back(b->m_vhost);
335         yp2::util::set_vhost_otherinfo(&init_apdu->u.initRequest->otherInfo,
336                                        odr, vhost_one);
337
338         Z_InitRequest *req = init_apdu->u.initRequest;
339         
340         ODR_MASK_SET(req->options, Z_Options_search);
341         ODR_MASK_SET(req->options, Z_Options_present);
342         ODR_MASK_SET(req->options, Z_Options_namedResultSets);
343         ODR_MASK_SET(req->options, Z_Options_scan);
344         
345         ODR_MASK_SET(req->protocolVersion, Z_ProtocolVersion_1);
346         ODR_MASK_SET(req->protocolVersion, Z_ProtocolVersion_2);
347         ODR_MASK_SET(req->protocolVersion, Z_ProtocolVersion_3);
348         
349         b->m_package->request() = init_apdu;
350
351         b->m_package->copy_filter(package);
352     }
353     multi_move(m_backend_list);
354
355     // create the frontend init response based on each backend init response
356     yp2::odr odr;
357
358     Z_APDU *f_apdu = odr.create_initResponse(gdu->u.z3950, 0, 0);
359     Z_InitResponse *f_resp = f_apdu->u.initResponse;
360
361     ODR_MASK_SET(f_resp->options, Z_Options_search);
362     ODR_MASK_SET(f_resp->options, Z_Options_present);
363     ODR_MASK_SET(f_resp->options, Z_Options_namedResultSets);
364     
365     ODR_MASK_SET(f_resp->protocolVersion, Z_ProtocolVersion_1);
366     ODR_MASK_SET(f_resp->protocolVersion, Z_ProtocolVersion_2);
367     ODR_MASK_SET(f_resp->protocolVersion, Z_ProtocolVersion_3);
368
369     for (bit = m_backend_list.begin(); bit != m_backend_list.end(); bit++)
370     {
371         PackagePtr p = (*bit)->m_package;
372         
373         if (p->session().is_closed()) // if any backend closes, close frontend
374             package.session().close();
375         Z_GDU *gdu = p->response().get();
376         if (gdu && gdu->which == Z_GDU_Z3950 && gdu->u.z3950->which ==
377             Z_APDU_initResponse)
378         {
379             int i;
380             Z_APDU *b_apdu = gdu->u.z3950;
381             Z_InitResponse *b_resp = b_apdu->u.initResponse;
382
383             // common options for all backends
384             for (i = 0; i <= Z_Options_stringSchema; i++)
385             {
386                 if (!ODR_MASK_GET(b_resp->options, i))
387                     ODR_MASK_CLEAR(f_resp->options, i);
388             }
389             // common protocol version
390             for (i = 0; i <= Z_ProtocolVersion_3; i++)
391                 if (!ODR_MASK_GET(b_resp->protocolVersion, i))
392                     ODR_MASK_CLEAR(f_resp->protocolVersion, i);
393             // reject if any of the backends reject
394             if (!*b_resp->result)
395                 *f_resp->result = 0;
396         }
397         else
398         {
399             // if any target does not return init return that (close or
400             // similar )
401             package.response() = p->response();
402             return;
403         }
404     }
405     package.response() = f_apdu;
406 }
407
408 void yf::Multi::Frontend::search(Package &package, Z_APDU *apdu_req)
409 {
410     // create search request 
411     Z_SearchRequest *req = apdu_req->u.searchRequest;
412
413     // save these for later
414     int smallSetUpperBound = *req->smallSetUpperBound;
415     int largeSetLowerBound = *req->largeSetLowerBound;
416     int mediumSetPresentNumber = *req->mediumSetPresentNumber;
417     
418     // they are altered now - to disable piggyback
419     *req->smallSetUpperBound = 0;
420     *req->largeSetLowerBound = 1;
421     *req->mediumSetPresentNumber = 1;
422
423     int default_num_db = req->num_databaseNames;
424     char **default_db = req->databaseNames;
425
426     std::list<BackendPtr>::const_iterator bit;
427     for (bit = m_backend_list.begin(); bit != m_backend_list.end(); bit++)
428     {
429         PackagePtr p = (*bit)->m_package;
430         yp2::odr odr;
431     
432         if (!yp2::util::set_databases_from_zurl(odr, (*bit)->m_vhost,
433                                                 &req->num_databaseNames,
434                                                 &req->databaseNames))
435         {
436             req->num_databaseNames = default_num_db;
437             req->databaseNames = default_db;
438         }
439         p->request() = apdu_req;
440         p->copy_filter(package);
441     }
442     multi_move(m_backend_list);
443
444     // look at each response
445     FrontendSet resultSet(std::string(req->resultSetName));
446
447     int result_set_size = 0;
448     Z_Records *z_records_diag = 0;  // no diagnostics (yet)
449     for (bit = m_backend_list.begin(); bit != m_backend_list.end(); bit++)
450     {
451         PackagePtr p = (*bit)->m_package;
452         
453         if (p->session().is_closed()) // if any backend closes, close frontend
454             package.session().close();
455         
456         Z_GDU *gdu = p->response().get();
457         if (gdu && gdu->which == Z_GDU_Z3950 && gdu->u.z3950->which ==
458             Z_APDU_searchResponse)
459         {
460             Z_APDU *b_apdu = gdu->u.z3950;
461             Z_SearchResponse *b_resp = b_apdu->u.searchResponse;
462          
463             // see we get any errors (AKA diagnstics)
464             if (b_resp->records)
465             {
466                 if (b_resp->records->which == Z_Records_NSD
467                     || b_resp->records->which == Z_Records_multipleNSD)
468                     z_records_diag = b_resp->records;
469                 // we may set this multiple times (TOO BAD!)
470             }
471             BackendSet backendSet;
472             backendSet.m_backend = *bit;
473             backendSet.m_count = *b_resp->resultCount;
474             result_set_size += *b_resp->resultCount;
475             resultSet.m_backend_sets.push_back(backendSet);
476         }
477         else
478         {
479             // if any target does not return search response - return that 
480             package.response() = p->response();
481             return;
482         }
483     }
484
485     yp2::odr odr;
486     Z_APDU *f_apdu = odr.create_searchResponse(apdu_req, 0, 0);
487     Z_SearchResponse *f_resp = f_apdu->u.searchResponse;
488
489     *f_resp->resultCount = result_set_size;
490     if (z_records_diag)
491     {
492         // search error
493         f_resp->records = z_records_diag;
494         package.response() = f_apdu;
495         return;
496     }
497     // assume OK
498     m_sets[resultSet.m_setname] = resultSet;
499
500     int number;
501     yp2::util::piggyback(smallSetUpperBound,
502                          largeSetLowerBound,
503                          mediumSetPresentNumber,
504                          result_set_size,
505                          number);
506     Package pp(package.session(), package.origin());
507     if (number > 0)
508     {
509         pp.copy_filter(package);
510         Z_APDU *p_apdu = zget_APDU(odr, Z_APDU_presentRequest);
511         Z_PresentRequest *p_req = p_apdu->u.presentRequest;
512         p_req->preferredRecordSyntax = req->preferredRecordSyntax;
513         p_req->resultSetId = req->resultSetName;
514         *p_req->resultSetStartPoint = 1;
515         *p_req->numberOfRecordsRequested = number;
516         pp.request() = p_apdu;
517         present(pp, p_apdu);
518         
519         if (pp.session().is_closed())
520             package.session().close();
521         
522         Z_GDU *gdu = pp.response().get();
523         if (gdu && gdu->which == Z_GDU_Z3950 && gdu->u.z3950->which ==
524             Z_APDU_presentResponse)
525         {
526             Z_PresentResponse *p_res = gdu->u.z3950->u.presentResponse;
527             f_resp->records = p_res->records;
528             *f_resp->numberOfRecordsReturned = 
529                 *p_res->numberOfRecordsReturned;
530             *f_resp->nextResultSetPosition = 
531                 *p_res->nextResultSetPosition;
532         }
533         else 
534         {
535             package.response() = pp.response(); 
536             return;
537         }
538     }
539     package.response() = f_apdu; // in this scope because of p
540 }
541
542 void yf::Multi::Frontend::present(Package &package, Z_APDU *apdu_req)
543 {
544     // create present request 
545     Z_PresentRequest *req = apdu_req->u.presentRequest;
546
547     Sets_it it;
548     it = m_sets.find(std::string(req->resultSetId));
549     if (it == m_sets.end())
550     {
551         yp2::odr odr;
552         Z_APDU *apdu = 
553             odr.create_presentResponse(
554                 apdu_req,
555                 YAZ_BIB1_SPECIFIED_RESULT_SET_DOES_NOT_EXIST,
556                 req->resultSetId);
557         package.response() = apdu;
558         return;
559     }
560     std::list<Multi::FrontendSet::PresentJob> jobs;
561     int start = *req->resultSetStartPoint;
562     int number = *req->numberOfRecordsRequested;
563     it->second.round_robin(start, number, jobs);
564
565     std::list<BackendPtr> present_backend_list;
566
567     std::list<BackendSet>::const_iterator bsit;
568     bsit = it->second.m_backend_sets.begin();
569     for (; bsit != it->second.m_backend_sets.end(); bsit++)
570     {
571         std::list<Multi::FrontendSet::PresentJob>::const_iterator jit;
572         int start = -1;
573         int end = -1;
574         
575         for (jit = jobs.begin(); jit != jobs.end(); jit++)
576         {
577             if (jit->m_backend == bsit->m_backend)
578             {
579                 if (start == -1 || jit->m_pos < start)
580                     start = jit->m_pos;
581                 if (end == -1 || jit->m_pos > end)
582                     end = jit->m_pos;
583             }
584         }
585         if (start != -1)
586         {
587             PackagePtr p = bsit->m_backend->m_package;
588
589             *req->resultSetStartPoint = start;
590             *req->numberOfRecordsRequested = end - start + 1;
591             
592             p->request() = apdu_req;
593             p->copy_filter(package);
594
595             present_backend_list.push_back(bsit->m_backend);
596         }
597     }
598     multi_move(present_backend_list);
599
600     // look at each response
601     Z_Records *z_records_diag = 0;
602
603     std::list<BackendPtr>::const_iterator pbit = present_backend_list.begin();
604     for (; pbit != present_backend_list.end(); pbit++)
605     {
606         PackagePtr p = (*pbit)->m_package;
607         
608         if (p->session().is_closed()) // if any backend closes, close frontend
609             package.session().close();
610         
611         Z_GDU *gdu = p->response().get();
612         if (gdu && gdu->which == Z_GDU_Z3950 && gdu->u.z3950->which ==
613             Z_APDU_presentResponse)
614         {
615             Z_APDU *b_apdu = gdu->u.z3950;
616             Z_PresentResponse *b_resp = b_apdu->u.presentResponse;
617          
618             // see we get any errors (AKA diagnstics)
619             if (b_resp->records)
620             {
621                 if (b_resp->records->which != Z_Records_DBOSD)
622                     z_records_diag = b_resp->records;
623                 // we may set this multiple times (TOO BAD!)
624             }
625         }
626         else
627         {
628             // if any target does not return present response - return that 
629             package.response() = p->response();
630             return;
631         }
632     }
633
634     yp2::odr odr;
635     Z_APDU *f_apdu = odr.create_presentResponse(apdu_req, 0, 0);
636     Z_PresentResponse *f_resp = f_apdu->u.presentResponse;
637
638     if (z_records_diag)
639     {
640         f_resp->records = z_records_diag;
641         *f_resp->presentStatus = Z_PresentStatus_failure;
642     }
643     else
644     {
645         f_resp->records = (Z_Records *) odr_malloc(odr, sizeof(Z_Records));
646         Z_Records * records = f_resp->records;
647         records->which = Z_Records_DBOSD;
648         records->u.databaseOrSurDiagnostics =
649             (Z_NamePlusRecordList *)
650             odr_malloc(odr, sizeof(Z_NamePlusRecordList));
651         Z_NamePlusRecordList *nprl = records->u.databaseOrSurDiagnostics;
652         nprl->num_records = jobs.size();
653         nprl->records = (Z_NamePlusRecord**)
654             odr_malloc(odr, sizeof(Z_NamePlusRecord *) * nprl->num_records);
655         int i = 0;
656         std::list<Multi::FrontendSet::PresentJob>::const_iterator jit;
657         for (jit = jobs.begin(); jit != jobs.end(); jit++)
658         {
659             PackagePtr p = jit->m_backend->m_package;
660             
661             Z_GDU *gdu = p->response().get();
662             Z_APDU *b_apdu = gdu->u.z3950;
663             Z_PresentResponse *b_resp = b_apdu->u.presentResponse;
664
665             nprl->records[i++] =
666                 b_resp->records->u.databaseOrSurDiagnostics->
667                 records[jit->m_inside_pos];
668         }
669         *f_resp->nextResultSetPosition = start + i;
670         *f_resp->numberOfRecordsReturned = i;
671     }
672     package.response() = f_apdu;
673 }
674
675 void yf::Multi::Frontend::scan1(Package &package, Z_APDU *apdu_req)
676 {
677     if (m_backend_list.size() > 1)
678     {
679         yp2::odr odr;
680         Z_APDU *f_apdu = 
681             odr.create_scanResponse(
682                 apdu_req, YAZ_BIB1_COMBI_OF_SPECIFIED_DATABASES_UNSUPP, 0);
683         package.response() = f_apdu;
684         return;
685     }
686     Z_ScanRequest *req = apdu_req->u.scanRequest;
687
688     int default_num_db = req->num_databaseNames;
689     char **default_db = req->databaseNames;
690
691     std::list<BackendPtr>::const_iterator bit;
692     for (bit = m_backend_list.begin(); bit != m_backend_list.end(); bit++)
693     {
694         PackagePtr p = (*bit)->m_package;
695         yp2::odr odr;
696     
697         if (!yp2::util::set_databases_from_zurl(odr, (*bit)->m_vhost,
698                                                 &req->num_databaseNames,
699                                                 &req->databaseNames))
700         {
701             req->num_databaseNames = default_num_db;
702             req->databaseNames = default_db;
703         }
704         p->request() = apdu_req;
705         p->copy_filter(package);
706     }
707     multi_move(m_backend_list);
708
709     for (bit = m_backend_list.begin(); bit != m_backend_list.end(); bit++)
710     {
711         PackagePtr p = (*bit)->m_package;
712         
713         if (p->session().is_closed()) // if any backend closes, close frontend
714             package.session().close();
715         
716         Z_GDU *gdu = p->response().get();
717         if (gdu && gdu->which == Z_GDU_Z3950 && gdu->u.z3950->which ==
718             Z_APDU_scanResponse)
719         {
720             package.response() = p->response();
721             break;
722         }
723         else
724         {
725             // if any target does not return scan response - return that 
726             package.response() = p->response();
727             return;
728         }
729     }
730 }
731
732 bool yf::Multi::ScanTermInfo::operator < (const ScanTermInfo &k) const
733 {
734     return m_norm_term < k.m_norm_term;
735 }
736
737 bool yf::Multi::ScanTermInfo::operator == (const ScanTermInfo &k) const
738 {
739     return m_norm_term == k.m_norm_term;
740 }
741
742 Z_Entry *yf::Multi::ScanTermInfo::get_entry(ODR odr)
743 {
744     Z_Entry *e = (Z_Entry *)odr_malloc(odr, sizeof(*e));
745     e->which = Z_Entry_termInfo;
746     Z_TermInfo *t;
747     t = e->u.termInfo = (Z_TermInfo *) odr_malloc(odr, sizeof(*t));
748     t->suggestedAttributes = 0;
749     t->displayTerm = 0;
750     t->alternativeTerm = 0;
751     t->byAttributes = 0;
752     t->otherTermInfo = 0;
753     t->globalOccurrences = odr_intdup(odr, m_count);
754     t->term = (Z_Term *)
755         odr_malloc(odr, sizeof(*t->term));
756     t->term->which = Z_Term_general;
757     Odr_oct *o;
758     t->term->u.general = o = (Odr_oct *)odr_malloc(odr, sizeof(Odr_oct));
759
760     o->len = o->size = m_norm_term.size();
761     o->buf = (unsigned char *) odr_malloc(odr, o->len);
762     memcpy(o->buf, m_norm_term.c_str(), o->len);
763     return e;
764 }
765
766 void yf::Multi::Frontend::scan2(Package &package, Z_APDU *apdu_req)
767 {
768     Z_ScanRequest *req = apdu_req->u.scanRequest;
769
770     int default_num_db = req->num_databaseNames;
771     char **default_db = req->databaseNames;
772
773     std::list<BackendPtr>::const_iterator bit;
774     for (bit = m_backend_list.begin(); bit != m_backend_list.end(); bit++)
775     {
776         PackagePtr p = (*bit)->m_package;
777         yp2::odr odr;
778     
779         if (!yp2::util::set_databases_from_zurl(odr, (*bit)->m_vhost,
780                                                 &req->num_databaseNames,
781                                                 &req->databaseNames))
782         {
783             req->num_databaseNames = default_num_db;
784             req->databaseNames = default_db;
785         }
786         p->request() = apdu_req;
787         p->copy_filter(package);
788     }
789     multi_move(m_backend_list);
790
791     ScanTermInfoList entries_before;
792     ScanTermInfoList entries_after;
793     int no_before = 0;
794     int no_after = 0;
795
796     for (bit = m_backend_list.begin(); bit != m_backend_list.end(); bit++)
797     {
798         PackagePtr p = (*bit)->m_package;
799         
800         if (p->session().is_closed()) // if any backend closes, close frontend
801             package.session().close();
802         
803         Z_GDU *gdu = p->response().get();
804         if (gdu && gdu->which == Z_GDU_Z3950 && gdu->u.z3950->which ==
805             Z_APDU_scanResponse)
806         {
807             Z_ScanResponse *res = gdu->u.z3950->u.scanResponse;
808
809             if (res->entries && res->entries->nonsurrogateDiagnostics)
810             {
811                 // failure
812                 yp2::odr odr;
813                 Z_APDU *f_apdu = odr.create_scanResponse(apdu_req, 1, 0);
814                 Z_ScanResponse *f_res = f_apdu->u.scanResponse;
815
816                 f_res->entries->nonsurrogateDiagnostics = 
817                     res->entries->nonsurrogateDiagnostics;
818                 f_res->entries->num_nonsurrogateDiagnostics = 
819                     res->entries->num_nonsurrogateDiagnostics;
820
821                 package.response() = f_apdu;
822                 return;
823             }
824
825             if (res->entries && res->entries->entries)
826             {
827                 Z_Entry **entries = res->entries->entries;
828                 int num_entries = res->entries->num_entries;
829                 int position = 1;
830                 if (req->preferredPositionInResponse)
831                     position = *req->preferredPositionInResponse;
832                 if (res->positionOfTerm)
833                     position = *res->positionOfTerm;
834
835                 // before
836                 int i;
837                 for (i = 0; i<position-1 && i<num_entries; i++)
838                 {
839                     Z_Entry *ent = entries[i];
840
841                     if (ent->which == Z_Entry_termInfo)
842                     {
843                         ScanTermInfo my;
844
845                         int *occur = ent->u.termInfo->globalOccurrences;
846                         my.m_count = occur ? *occur : 0;
847
848                         if (ent->u.termInfo->term->which == Z_Term_general)
849                         {
850                             my.m_norm_term = std::string(
851                                 (const char *)
852                                 ent->u.termInfo->term->u.general->buf,
853                                 ent->u.termInfo->term->u.general->len);
854                         }
855                         if (my.m_norm_term.length())
856                         {
857                             ScanTermInfoList::iterator it = 
858                                 entries_before.begin();
859                             while (it != entries_before.end() && my <*it)
860                                 it++;
861                             if (my == *it)
862                             {
863                                 it->m_count += my.m_count;
864                             }
865                             else
866                             {
867                                 entries_before.insert(it, my);
868                                 no_before++;
869                             }
870                         }
871                     }
872                 }
873                 // after
874                 if (position <= 0)
875                     i = 0;
876                 else
877                     i = position-1;
878                 for ( ; i<num_entries; i++)
879                 {
880                     Z_Entry *ent = entries[i];
881
882                     if (ent->which == Z_Entry_termInfo)
883                     {
884                         ScanTermInfo my;
885
886                         int *occur = ent->u.termInfo->globalOccurrences;
887                         my.m_count = occur ? *occur : 0;
888
889                         if (ent->u.termInfo->term->which == Z_Term_general)
890                         {
891                             my.m_norm_term = std::string(
892                                 (const char *)
893                                 ent->u.termInfo->term->u.general->buf,
894                                 ent->u.termInfo->term->u.general->len);
895                         }
896                         if (my.m_norm_term.length())
897                         {
898                             ScanTermInfoList::iterator it = 
899                                 entries_after.begin();
900                             while (it != entries_after.end() && *it < my)
901                                 it++;
902                             if (my == *it)
903                             {
904                                 it->m_count += my.m_count;
905                             }
906                             else
907                             {
908                                 entries_after.insert(it, my);
909                                 no_after++;
910                             }
911                         }
912                     }
913                 }
914
915             }                
916         }
917         else
918         {
919             // if any target does not return scan response - return that 
920             package.response() = p->response();
921             return;
922         }
923     }
924
925     if (true)
926     {
927         std::cout << "BEFORE\n";
928         ScanTermInfoList::iterator it = entries_before.begin();
929         for(; it != entries_before.end(); it++)
930         {
931             std::cout << " " << it->m_norm_term << " " << it->m_count << "\n";
932         }
933         
934         std::cout << "AFTER\n";
935         it = entries_after.begin();
936         for(; it != entries_after.end(); it++)
937         {
938             std::cout << " " << it->m_norm_term << " " << it->m_count << "\n";
939         }
940     }
941
942     if (false)
943     {
944         yp2::odr odr;
945         Z_APDU *f_apdu = odr.create_scanResponse(apdu_req, 1, "not implemented");
946         package.response() = f_apdu;
947     }
948     else
949     {
950         yp2::odr odr;
951         Z_APDU *f_apdu = odr.create_scanResponse(apdu_req, 0, 0);
952         Z_ScanResponse *resp = f_apdu->u.scanResponse;
953         
954         int number_returned = *req->numberOfTermsRequested;
955         int position_returned = *req->preferredPositionInResponse;
956         
957         resp->entries->num_entries = number_returned;
958         resp->entries->entries = (Z_Entry**)
959             odr_malloc(odr, sizeof(Z_Entry*) * number_returned);
960         int i;
961
962         int lbefore = entries_before.size();
963         if (lbefore < position_returned-1)
964             position_returned = lbefore+1;
965
966         ScanTermInfoList::iterator it = entries_before.begin();
967         for (i = 0; i<position_returned-1 && it != entries_before.end(); i++, it++)
968         {
969             resp->entries->entries[position_returned-2-i] = it->get_entry(odr);
970         }
971
972         it = entries_after.begin();
973
974         if (position_returned <= 0)
975             i = 0;
976         else
977             i = position_returned-1;
978         for (; i<number_returned && it != entries_after.end(); i++, it++)
979         {
980             resp->entries->entries[i] = it->get_entry(odr);
981         }
982
983         number_returned = i;
984
985         resp->positionOfTerm = odr_intdup(odr, position_returned);
986         resp->numberOfEntriesReturned = odr_intdup(odr, number_returned);
987         resp->entries->num_entries = number_returned;
988
989         package.response() = f_apdu;
990     }
991 }
992
993
994 void yf::Multi::process(Package &package) const
995 {
996     FrontendPtr f = m_p->get_frontend(package);
997
998     Z_GDU *gdu = package.request().get();
999     
1000     if (gdu && gdu->which == Z_GDU_Z3950 && gdu->u.z3950->which ==
1001         Z_APDU_initRequest && !f->m_is_multi)
1002     {
1003         f->init(package, gdu);
1004     }
1005     else if (!f->m_is_multi)
1006         package.move();
1007     else if (gdu && gdu->which == Z_GDU_Z3950)
1008     {
1009         Z_APDU *apdu = gdu->u.z3950;
1010         if (apdu->which == Z_APDU_initRequest)
1011         {
1012             yp2::odr odr;
1013             
1014             package.response() = odr.create_close(
1015                 apdu,
1016                 Z_Close_protocolError,
1017                 "double init");
1018             
1019             package.session().close();
1020         }
1021         else if (apdu->which == Z_APDU_searchRequest)
1022         {
1023             f->search(package, apdu);
1024         }
1025         else if (apdu->which == Z_APDU_presentRequest)
1026         {
1027             f->present(package, apdu);
1028         }
1029         else if (apdu->which == Z_APDU_scanRequest)
1030         {
1031             f->scan2(package, apdu);
1032         }
1033         else
1034         {
1035             yp2::odr odr;
1036             
1037             package.response() = odr.create_close(
1038                 apdu, Z_Close_protocolError,
1039                 "unsupported APDU in filter multi");
1040             
1041             package.session().close();
1042         }
1043     }
1044     m_p->release_frontend(package);
1045 }
1046
1047 void yp2::filter::Multi::configure(const xmlNode * ptr)
1048 {
1049     for (ptr = ptr->children; ptr; ptr = ptr->next)
1050     {
1051         if (ptr->type != XML_ELEMENT_NODE)
1052             continue;
1053         if (!strcmp((const char *) ptr->name, "target"))
1054         {
1055             std::string route = yp2::xml::get_route(ptr);
1056             std::string target = yp2::xml::get_text(ptr);
1057             std::cout << "route=" << route << " target=" << target << "\n";
1058             m_p->m_target_route[target] = route;
1059         }
1060         else if (!strcmp((const char *) ptr->name, "virtual"))
1061         {
1062             std::list<std::string> targets;
1063             std::string vhost;
1064             xmlNode *v_node = ptr->children;
1065             for (; v_node; v_node = v_node->next)
1066             {
1067                 if (v_node->type != XML_ELEMENT_NODE)
1068                     continue;
1069                 
1070                 if (yp2::xml::is_element_yp2(v_node, "vhost"))
1071                     vhost = yp2::xml::get_text(v_node);
1072                 else if (yp2::xml::is_element_yp2(v_node, "target"))
1073                     targets.push_back(yp2::xml::get_text(v_node));
1074                 else
1075                     throw yp2::filter::FilterException
1076                         ("Bad element " 
1077                          + std::string((const char *) v_node->name)
1078                          + " in virtual section"
1079                             );
1080             }
1081             std::string route = yp2::xml::get_route(ptr);
1082             add_map_host2hosts(vhost, targets, route);
1083             std::list<std::string>::const_iterator it;
1084             for (it = targets.begin(); it != targets.end(); it++)
1085             {
1086                 std::cout << "Add " << vhost << "->" << *it
1087                           << "," << route << "\n";
1088             }
1089         }
1090         else
1091         {
1092             throw yp2::filter::FilterException
1093                 ("Bad element " 
1094                  + std::string((const char *) ptr->name)
1095                  + " in virt_db filter");
1096         }
1097     }
1098 }
1099
1100 static yp2::filter::Base* filter_creator()
1101 {
1102     return new yp2::filter::Multi;
1103 }
1104
1105 extern "C" {
1106     struct yp2_filter_struct yp2_filter_multi = {
1107         0,
1108         "multi",
1109         filter_creator
1110     };
1111 }
1112
1113
1114 /*
1115  * Local variables:
1116  * c-basic-offset: 4
1117  * indent-tabs-mode: nil
1118  * c-file-style: "stroustrup"
1119  * End:
1120  * vim: shiftwidth=4 tabstop=8 expandtab
1121  */