Merger scan returns diagnostics
[metaproxy-moved-to-github.git] / src / filter_multi.cpp
1 /* $Id: filter_multi.cpp,v 1.10 2006-01-18 14:36:15 adam Exp $
2    Copyright (c) 2005, Index Data.
3
4 %LICENSE%
5  */
6
7 #include "config.hpp"
8
9 #include "filter.hpp"
10 #include "package.hpp"
11
12 #include <boost/thread/thread.hpp>
13 #include <boost/thread/mutex.hpp>
14 #include <boost/thread/condition.hpp>
15 #include <boost/shared_ptr.hpp>
16
17 #include "util.hpp"
18 #include "filter_multi.hpp"
19
20 #include <yaz/zgdu.h>
21 #include <yaz/otherinfo.h>
22 #include <yaz/diagbib1.h>
23
24 #include <vector>
25 #include <algorithm>
26 #include <map>
27 #include <iostream>
28
29 namespace yf = yp2::filter;
30
31 namespace yp2 {
32     namespace filter {
33
34         struct Multi::BackendSet {
35             BackendPtr m_backend;
36             int m_count;
37             bool operator < (const BackendSet &k) const;
38             bool operator == (const BackendSet &k) const;
39         };
40         struct Multi::ScanTermInfo {
41             std::string m_norm_term;
42             std::string m_display_term;
43             int m_count;
44             bool operator < (const ScanTermInfo &) const;
45             bool operator == (const ScanTermInfo &) const;
46             Z_Entry *get_entry(ODR odr);
47         };
48         struct Multi::FrontendSet {
49             struct PresentJob {
50                 BackendPtr m_backend;
51                 int m_pos;
52                 int m_inside_pos;
53             };
54             FrontendSet(std::string setname);
55             FrontendSet();
56             ~FrontendSet();
57
58             void round_robin(int pos, int number, std::list<PresentJob> &job);
59
60             std::list<BackendSet> m_backend_sets;
61             std::string m_setname;
62         };
63         struct Multi::Backend {
64             PackagePtr m_package;
65             std::string m_backend_database;
66             std::string m_vhost;
67             std::string m_route;
68             void operator() (void);  // thread operation
69         };
70         struct Multi::Frontend {
71             Frontend(Rep *rep);
72             ~Frontend();
73             bool m_is_multi;
74             bool m_in_use;
75             std::list<BackendPtr> m_backend_list;
76             std::map<std::string,Multi::FrontendSet> m_sets;
77
78             void multi_move(std::list<BackendPtr> &blist);
79             void init(Package &package, Z_GDU *gdu);
80             void close(Package &package);
81             void search(Package &package, Z_APDU *apdu);
82             void present(Package &package, Z_APDU *apdu);
83             void scan1(Package &package, Z_APDU *apdu);
84             void scan2(Package &package, Z_APDU *apdu);
85             Rep *m_p;
86         };            
87         struct Multi::Map {
88             Map(std::list<std::string> hosts, std::string route);
89             Map();
90             std::list<std::string> m_hosts;
91             std::string m_route;
92         };
93         class Multi::Rep {
94             friend class Multi;
95             friend class Frontend;
96             
97             FrontendPtr get_frontend(Package &package);
98             void release_frontend(Package &package);
99         private:
100             boost::mutex m_sessions_mutex;
101             std::map<std::string, Multi::Map>m_maps;
102             std::map<std::string,std::string> m_target_route;
103             boost::mutex m_mutex;
104             boost::condition m_cond_session_ready;
105             std::map<yp2::Session, FrontendPtr> m_clients;
106         };
107     }
108 }
109
110 using namespace yp2;
111
112 bool yf::Multi::BackendSet::operator < (const BackendSet &k) const
113 {
114     return m_count < k.m_count;
115 }
116
117 yf::Multi::Frontend::Frontend(Rep *rep)
118 {
119     m_p = rep;
120     m_is_multi = false;
121 }
122
123 yf::Multi::Frontend::~Frontend()
124 {
125 }
126
127 yf::Multi::FrontendPtr yf::Multi::Rep::get_frontend(Package &package)
128 {
129     boost::mutex::scoped_lock lock(m_mutex);
130
131     std::map<yp2::Session,yf::Multi::FrontendPtr>::iterator it;
132     
133     while(true)
134     {
135         it = m_clients.find(package.session());
136         if (it == m_clients.end())
137             break;
138         
139         if (!it->second->m_in_use)
140         {
141             it->second->m_in_use = true;
142             return it->second;
143         }
144         m_cond_session_ready.wait(lock);
145     }
146     FrontendPtr f(new Frontend(this));
147     m_clients[package.session()] = f;
148     f->m_in_use = true;
149     return f;
150 }
151
152 void yf::Multi::Rep::release_frontend(Package &package)
153 {
154     boost::mutex::scoped_lock lock(m_mutex);
155     std::map<yp2::Session,yf::Multi::FrontendPtr>::iterator it;
156     
157     it = m_clients.find(package.session());
158     if (it != m_clients.end())
159     {
160         if (package.session().is_closed())
161         {
162             it->second->close(package);
163             m_clients.erase(it);
164         }
165         else
166         {
167             it->second->m_in_use = false;
168         }
169         m_cond_session_ready.notify_all();
170     }
171 }
172
173 yf::Multi::FrontendSet::FrontendSet(std::string setname)
174     :  m_setname(setname)
175 {
176 }
177
178
179 yf::Multi::FrontendSet::FrontendSet()
180 {
181 }
182
183
184 yf::Multi::FrontendSet::~FrontendSet()
185 {
186 }
187
188 yf::Multi::Map::Map(std::list<std::string> hosts, std::string route)
189     : m_hosts(hosts), m_route(route) 
190 {
191 }
192
193 yf::Multi::Map::Map()
194 {
195 }
196
197 yf::Multi::Multi() : m_p(new Multi::Rep)
198 {
199 }
200
201 yf::Multi::~Multi() {
202 }
203
204
205 void yf::Multi::add_map_host2hosts(std::string host,
206                                    std::list<std::string> hosts,
207                                    std::string route)
208 {
209     m_p->m_maps[host] = Multi::Map(hosts, route);
210 }
211
212 void yf::Multi::Backend::operator() (void) 
213 {
214     m_package->move(m_route);
215 }
216
217
218 void yf::Multi::Frontend::close(Package &package)
219 {
220     std::list<BackendPtr>::const_iterator bit;
221     for (bit = m_backend_list.begin(); bit != m_backend_list.end(); bit++)
222     {
223         BackendPtr b = *bit;
224
225         b->m_package->copy_filter(package);
226         b->m_package->request() = (Z_GDU *) 0;
227         b->m_package->session().close();
228         b->m_package->move(b->m_route);
229     }
230 }
231
232 void yf::Multi::Frontend::multi_move(std::list<BackendPtr> &blist)
233 {
234     std::list<BackendPtr>::const_iterator bit;
235     boost::thread_group g;
236     for (bit = blist.begin(); bit != blist.end(); bit++)
237     {
238         g.add_thread(new boost::thread(**bit));
239     }
240     g.join_all();
241 }
242
243
244 void yf::Multi::FrontendSet::round_robin(int start, int number,
245                                          std::list<PresentJob> &jobs)
246 {
247     int fetched = 0;
248     int p = 1;
249     bool eof = true;
250
251     std::list<int> pos;
252     std::list<int> inside_pos;
253     std::list<BackendSet>::const_iterator bsit;
254     for (bsit = m_backend_sets.begin(); bsit != m_backend_sets.end(); bsit++)
255     {
256         pos.push_back(1);
257         inside_pos.push_back(0);
258     }
259
260     std::list<int>::iterator psit = pos.begin();
261     std::list<int>::iterator esit = inside_pos.begin();
262     bsit = m_backend_sets.begin();
263     while (fetched < number)
264     {
265         if (bsit == m_backend_sets.end())
266         {
267             psit = pos.begin();
268             esit = inside_pos.begin();
269             bsit = m_backend_sets.begin();
270             if (eof)
271                 break;
272             eof = true;
273         }
274         if (*psit <= bsit->m_count)
275         {
276             if (p >= start)
277             {
278                 PresentJob job;
279                 job.m_backend = bsit->m_backend;
280                 job.m_pos = *psit;
281                 job.m_inside_pos = *esit;
282                 jobs.push_back(job);
283                 (*esit)++;
284                 fetched++;
285             }
286             (*psit)++;
287             p++;
288             eof = false;
289         }
290         psit++;
291         esit++;
292         bsit++;
293     }
294 }
295
296 void yf::Multi::Frontend::init(Package &package, Z_GDU *gdu)
297 {
298     Z_InitRequest *req = gdu->u.z3950->u.initRequest;
299
300     std::list<std::string> targets;
301
302     yp2::util::get_vhost_otherinfo(&req->otherInfo, false, targets);
303
304     if (targets.size() < 1)
305     {
306         package.move();
307         return;
308     }
309
310     std::list<std::string>::const_iterator t_it = targets.begin();
311     for (; t_it != targets.end(); t_it++)
312     {
313         Session s;
314         Backend *b = new Backend;
315         b->m_vhost = *t_it;
316
317         b->m_route = m_p->m_target_route[*t_it];
318         // b->m_route unset
319         b->m_package = PackagePtr(new Package(s, package.origin()));
320
321         m_backend_list.push_back(BackendPtr(b));
322     }
323     m_is_multi = true;
324
325     // create init request 
326     std::list<BackendPtr>::const_iterator bit;
327     for (bit = m_backend_list.begin(); bit != m_backend_list.end(); bit++)
328     {
329         yp2::odr odr;
330         BackendPtr b = *bit;
331         Z_APDU *init_apdu = zget_APDU(odr, Z_APDU_initRequest);
332         
333         std::list<std::string>vhost_one;
334         vhost_one.push_back(b->m_vhost);
335         yp2::util::set_vhost_otherinfo(&init_apdu->u.initRequest->otherInfo,
336                                        odr, vhost_one);
337
338         Z_InitRequest *req = init_apdu->u.initRequest;
339         
340         ODR_MASK_SET(req->options, Z_Options_search);
341         ODR_MASK_SET(req->options, Z_Options_present);
342         ODR_MASK_SET(req->options, Z_Options_namedResultSets);
343         ODR_MASK_SET(req->options, Z_Options_scan);
344         
345         ODR_MASK_SET(req->protocolVersion, Z_ProtocolVersion_1);
346         ODR_MASK_SET(req->protocolVersion, Z_ProtocolVersion_2);
347         ODR_MASK_SET(req->protocolVersion, Z_ProtocolVersion_3);
348         
349         b->m_package->request() = init_apdu;
350
351         b->m_package->copy_filter(package);
352     }
353     multi_move(m_backend_list);
354
355     // create the frontend init response based on each backend init response
356     yp2::odr odr;
357
358     Z_APDU *f_apdu = odr.create_initResponse(gdu->u.z3950, 0, 0);
359     Z_InitResponse *f_resp = f_apdu->u.initResponse;
360
361     ODR_MASK_SET(f_resp->options, Z_Options_search);
362     ODR_MASK_SET(f_resp->options, Z_Options_present);
363     ODR_MASK_SET(f_resp->options, Z_Options_namedResultSets);
364     
365     ODR_MASK_SET(f_resp->protocolVersion, Z_ProtocolVersion_1);
366     ODR_MASK_SET(f_resp->protocolVersion, Z_ProtocolVersion_2);
367     ODR_MASK_SET(f_resp->protocolVersion, Z_ProtocolVersion_3);
368
369     for (bit = m_backend_list.begin(); bit != m_backend_list.end(); bit++)
370     {
371         PackagePtr p = (*bit)->m_package;
372         
373         if (p->session().is_closed()) // if any backend closes, close frontend
374             package.session().close();
375         Z_GDU *gdu = p->response().get();
376         if (gdu && gdu->which == Z_GDU_Z3950 && gdu->u.z3950->which ==
377             Z_APDU_initResponse)
378         {
379             int i;
380             Z_APDU *b_apdu = gdu->u.z3950;
381             Z_InitResponse *b_resp = b_apdu->u.initResponse;
382
383             // common options for all backends
384             for (i = 0; i <= Z_Options_stringSchema; i++)
385             {
386                 if (!ODR_MASK_GET(b_resp->options, i))
387                     ODR_MASK_CLEAR(f_resp->options, i);
388             }
389             // common protocol version
390             for (i = 0; i <= Z_ProtocolVersion_3; i++)
391                 if (!ODR_MASK_GET(b_resp->protocolVersion, i))
392                     ODR_MASK_CLEAR(f_resp->protocolVersion, i);
393             // reject if any of the backends reject
394             if (!*b_resp->result)
395                 *f_resp->result = 0;
396         }
397         else
398         {
399             // if any target does not return init return that (close or
400             // similar )
401             package.response() = p->response();
402             return;
403         }
404     }
405     package.response() = f_apdu;
406 }
407
408 void yf::Multi::Frontend::search(Package &package, Z_APDU *apdu_req)
409 {
410     // create search request 
411     Z_SearchRequest *req = apdu_req->u.searchRequest;
412
413     // save these for later
414     int smallSetUpperBound = *req->smallSetUpperBound;
415     int largeSetLowerBound = *req->largeSetLowerBound;
416     int mediumSetPresentNumber = *req->mediumSetPresentNumber;
417     
418     // they are altered now - to disable piggyback
419     *req->smallSetUpperBound = 0;
420     *req->largeSetLowerBound = 1;
421     *req->mediumSetPresentNumber = 1;
422
423     int default_num_db = req->num_databaseNames;
424     char **default_db = req->databaseNames;
425
426     std::list<BackendPtr>::const_iterator bit;
427     for (bit = m_backend_list.begin(); bit != m_backend_list.end(); bit++)
428     {
429         PackagePtr p = (*bit)->m_package;
430         yp2::odr odr;
431     
432         if (!yp2::util::set_databases_from_zurl(odr, (*bit)->m_vhost,
433                                                 &req->num_databaseNames,
434                                                 &req->databaseNames))
435         {
436             req->num_databaseNames = default_num_db;
437             req->databaseNames = default_db;
438         }
439         p->request() = apdu_req;
440         p->copy_filter(package);
441     }
442     multi_move(m_backend_list);
443
444     // look at each response
445     FrontendSet resultSet(std::string(req->resultSetName));
446
447     int result_set_size = 0;
448     Z_Records *z_records_diag = 0;  // no diagnostics (yet)
449     for (bit = m_backend_list.begin(); bit != m_backend_list.end(); bit++)
450     {
451         PackagePtr p = (*bit)->m_package;
452         
453         if (p->session().is_closed()) // if any backend closes, close frontend
454             package.session().close();
455         
456         Z_GDU *gdu = p->response().get();
457         if (gdu && gdu->which == Z_GDU_Z3950 && gdu->u.z3950->which ==
458             Z_APDU_searchResponse)
459         {
460             Z_APDU *b_apdu = gdu->u.z3950;
461             Z_SearchResponse *b_resp = b_apdu->u.searchResponse;
462          
463             // see we get any errors (AKA diagnstics)
464             if (b_resp->records)
465             {
466                 if (b_resp->records->which == Z_Records_NSD
467                     || b_resp->records->which == Z_Records_multipleNSD)
468                     z_records_diag = b_resp->records;
469                 // we may set this multiple times (TOO BAD!)
470             }
471             BackendSet backendSet;
472             backendSet.m_backend = *bit;
473             backendSet.m_count = *b_resp->resultCount;
474             result_set_size += *b_resp->resultCount;
475             resultSet.m_backend_sets.push_back(backendSet);
476         }
477         else
478         {
479             // if any target does not return search response - return that 
480             package.response() = p->response();
481             return;
482         }
483     }
484
485     yp2::odr odr;
486     Z_APDU *f_apdu = odr.create_searchResponse(apdu_req, 0, 0);
487     Z_SearchResponse *f_resp = f_apdu->u.searchResponse;
488
489     *f_resp->resultCount = result_set_size;
490     if (z_records_diag)
491     {
492         // search error
493         f_resp->records = z_records_diag;
494         package.response() = f_apdu;
495         return;
496     }
497     // assume OK
498     m_sets[resultSet.m_setname] = resultSet;
499
500     int number;
501     yp2::util::piggyback(smallSetUpperBound,
502                          largeSetLowerBound,
503                          mediumSetPresentNumber,
504                          result_set_size,
505                          number);
506     Package pp(package.session(), package.origin());
507     if (number > 0)
508     {
509         pp.copy_filter(package);
510         Z_APDU *p_apdu = zget_APDU(odr, Z_APDU_presentRequest);
511         Z_PresentRequest *p_req = p_apdu->u.presentRequest;
512         p_req->preferredRecordSyntax = req->preferredRecordSyntax;
513         p_req->resultSetId = req->resultSetName;
514         *p_req->resultSetStartPoint = 1;
515         *p_req->numberOfRecordsRequested = number;
516         pp.request() = p_apdu;
517         present(pp, p_apdu);
518         
519         if (pp.session().is_closed())
520             package.session().close();
521         
522         Z_GDU *gdu = pp.response().get();
523         if (gdu && gdu->which == Z_GDU_Z3950 && gdu->u.z3950->which ==
524             Z_APDU_presentResponse)
525         {
526             Z_PresentResponse *p_res = gdu->u.z3950->u.presentResponse;
527             f_resp->records = p_res->records;
528             *f_resp->numberOfRecordsReturned = 
529                 *p_res->numberOfRecordsReturned;
530             *f_resp->nextResultSetPosition = 
531                 *p_res->nextResultSetPosition;
532         }
533         else 
534         {
535             package.response() = pp.response(); 
536             return;
537         }
538     }
539     package.response() = f_apdu; // in this scope because of p
540 }
541
542 void yf::Multi::Frontend::present(Package &package, Z_APDU *apdu_req)
543 {
544     // create present request 
545     Z_PresentRequest *req = apdu_req->u.presentRequest;
546
547     Sets_it it;
548     it = m_sets.find(std::string(req->resultSetId));
549     if (it == m_sets.end())
550     {
551         yp2::odr odr;
552         Z_APDU *apdu = 
553             odr.create_presentResponse(
554                 apdu_req,
555                 YAZ_BIB1_SPECIFIED_RESULT_SET_DOES_NOT_EXIST,
556                 req->resultSetId);
557         package.response() = apdu;
558         return;
559     }
560     std::list<Multi::FrontendSet::PresentJob> jobs;
561     int start = *req->resultSetStartPoint;
562     int number = *req->numberOfRecordsRequested;
563     it->second.round_robin(start, number, jobs);
564
565     std::list<BackendPtr> present_backend_list;
566
567     std::list<BackendSet>::const_iterator bsit;
568     bsit = it->second.m_backend_sets.begin();
569     for (; bsit != it->second.m_backend_sets.end(); bsit++)
570     {
571         std::list<Multi::FrontendSet::PresentJob>::const_iterator jit;
572         int start = -1;
573         int end = -1;
574         
575         for (jit = jobs.begin(); jit != jobs.end(); jit++)
576         {
577             if (jit->m_backend == bsit->m_backend)
578             {
579                 if (start == -1 || jit->m_pos < start)
580                     start = jit->m_pos;
581                 if (end == -1 || jit->m_pos > end)
582                     end = jit->m_pos;
583             }
584         }
585         if (start != -1)
586         {
587             PackagePtr p = bsit->m_backend->m_package;
588
589             *req->resultSetStartPoint = start;
590             *req->numberOfRecordsRequested = end - start + 1;
591             
592             p->request() = apdu_req;
593             p->copy_filter(package);
594
595             present_backend_list.push_back(bsit->m_backend);
596         }
597     }
598     multi_move(present_backend_list);
599
600     // look at each response
601     Z_Records *z_records_diag = 0;
602
603     std::list<BackendPtr>::const_iterator pbit = present_backend_list.begin();
604     for (; pbit != present_backend_list.end(); pbit++)
605     {
606         PackagePtr p = (*pbit)->m_package;
607         
608         if (p->session().is_closed()) // if any backend closes, close frontend
609             package.session().close();
610         
611         Z_GDU *gdu = p->response().get();
612         if (gdu && gdu->which == Z_GDU_Z3950 && gdu->u.z3950->which ==
613             Z_APDU_presentResponse)
614         {
615             Z_APDU *b_apdu = gdu->u.z3950;
616             Z_PresentResponse *b_resp = b_apdu->u.presentResponse;
617          
618             // see we get any errors (AKA diagnstics)
619             if (b_resp->records)
620             {
621                 if (b_resp->records->which != Z_Records_DBOSD)
622                     z_records_diag = b_resp->records;
623                 // we may set this multiple times (TOO BAD!)
624             }
625         }
626         else
627         {
628             // if any target does not return present response - return that 
629             package.response() = p->response();
630             return;
631         }
632     }
633
634     yp2::odr odr;
635     Z_APDU *f_apdu = odr.create_presentResponse(apdu_req, 0, 0);
636     Z_PresentResponse *f_resp = f_apdu->u.presentResponse;
637
638     if (z_records_diag)
639     {
640         f_resp->records = z_records_diag;
641         *f_resp->presentStatus = Z_PresentStatus_failure;
642     }
643     else
644     {
645         f_resp->records = (Z_Records *) odr_malloc(odr, sizeof(Z_Records));
646         Z_Records * records = f_resp->records;
647         records->which = Z_Records_DBOSD;
648         records->u.databaseOrSurDiagnostics =
649             (Z_NamePlusRecordList *)
650             odr_malloc(odr, sizeof(Z_NamePlusRecordList));
651         Z_NamePlusRecordList *nprl = records->u.databaseOrSurDiagnostics;
652         nprl->num_records = jobs.size();
653         nprl->records = (Z_NamePlusRecord**)
654             odr_malloc(odr, sizeof(Z_NamePlusRecord *) * nprl->num_records);
655         int i = 0;
656         std::list<Multi::FrontendSet::PresentJob>::const_iterator jit;
657         for (jit = jobs.begin(); jit != jobs.end(); jit++)
658         {
659             PackagePtr p = jit->m_backend->m_package;
660             
661             Z_GDU *gdu = p->response().get();
662             Z_APDU *b_apdu = gdu->u.z3950;
663             Z_PresentResponse *b_resp = b_apdu->u.presentResponse;
664
665             nprl->records[i++] =
666                 b_resp->records->u.databaseOrSurDiagnostics->
667                 records[jit->m_inside_pos];
668         }
669         *f_resp->nextResultSetPosition = start + i;
670         *f_resp->numberOfRecordsReturned = i;
671     }
672     package.response() = f_apdu;
673 }
674
675 void yf::Multi::Frontend::scan1(Package &package, Z_APDU *apdu_req)
676 {
677     if (m_backend_list.size() > 1)
678     {
679         yp2::odr odr;
680         Z_APDU *f_apdu = 
681             odr.create_scanResponse(
682                 apdu_req, YAZ_BIB1_COMBI_OF_SPECIFIED_DATABASES_UNSUPP, 0);
683         package.response() = f_apdu;
684         return;
685     }
686     Z_ScanRequest *req = apdu_req->u.scanRequest;
687
688     int default_num_db = req->num_databaseNames;
689     char **default_db = req->databaseNames;
690
691     std::list<BackendPtr>::const_iterator bit;
692     for (bit = m_backend_list.begin(); bit != m_backend_list.end(); bit++)
693     {
694         PackagePtr p = (*bit)->m_package;
695         yp2::odr odr;
696     
697         if (!yp2::util::set_databases_from_zurl(odr, (*bit)->m_vhost,
698                                                 &req->num_databaseNames,
699                                                 &req->databaseNames))
700         {
701             req->num_databaseNames = default_num_db;
702             req->databaseNames = default_db;
703         }
704         p->request() = apdu_req;
705         p->copy_filter(package);
706     }
707     multi_move(m_backend_list);
708
709     for (bit = m_backend_list.begin(); bit != m_backend_list.end(); bit++)
710     {
711         PackagePtr p = (*bit)->m_package;
712         
713         if (p->session().is_closed()) // if any backend closes, close frontend
714             package.session().close();
715         
716         Z_GDU *gdu = p->response().get();
717         if (gdu && gdu->which == Z_GDU_Z3950 && gdu->u.z3950->which ==
718             Z_APDU_scanResponse)
719         {
720             package.response() = p->response();
721             break;
722         }
723         else
724         {
725             // if any target does not return scan response - return that 
726             package.response() = p->response();
727             return;
728         }
729     }
730 }
731
732 bool yf::Multi::ScanTermInfo::operator < (const ScanTermInfo &k) const
733 {
734     return m_norm_term < k.m_norm_term;
735 }
736
737 bool yf::Multi::ScanTermInfo::operator == (const ScanTermInfo &k) const
738 {
739     return m_norm_term == k.m_norm_term;
740 }
741
742 Z_Entry *yf::Multi::ScanTermInfo::get_entry(ODR odr)
743 {
744     Z_Entry *e = (Z_Entry *)odr_malloc(odr, sizeof(*e));
745     e->which = Z_Entry_termInfo;
746     Z_TermInfo *t;
747     t = e->u.termInfo = (Z_TermInfo *) odr_malloc(odr, sizeof(*t));
748     t->suggestedAttributes = 0;
749     t->displayTerm = 0;
750     t->alternativeTerm = 0;
751     t->byAttributes = 0;
752     t->otherTermInfo = 0;
753     t->globalOccurrences = odr_intdup(odr, m_count);
754     t->term = (Z_Term *)
755         odr_malloc(odr, sizeof(*t->term));
756     t->term->which = Z_Term_general;
757     Odr_oct *o;
758     t->term->u.general = o = (Odr_oct *)odr_malloc(odr, sizeof(Odr_oct));
759
760     o->len = o->size = m_norm_term.size();
761     o->buf = (unsigned char *) odr_malloc(odr, o->len);
762     memcpy(o->buf, m_norm_term.c_str(), o->len);
763     return e;
764 }
765
766 void yf::Multi::Frontend::scan2(Package &package, Z_APDU *apdu_req)
767 {
768     Z_ScanRequest *req = apdu_req->u.scanRequest;
769     int no_targets = 0;
770
771     int default_num_db = req->num_databaseNames;
772     char **default_db = req->databaseNames;
773
774     std::list<BackendPtr>::const_iterator bit;
775     for (bit = m_backend_list.begin(); bit != m_backend_list.end(); bit++)
776     {
777         PackagePtr p = (*bit)->m_package;
778         yp2::odr odr;
779     
780         if (!yp2::util::set_databases_from_zurl(odr, (*bit)->m_vhost,
781                                                 &req->num_databaseNames,
782                                                 &req->databaseNames))
783         {
784             req->num_databaseNames = default_num_db;
785             req->databaseNames = default_db;
786         }
787         p->request() = apdu_req;
788         p->copy_filter(package);
789         no_targets++;
790     }
791     multi_move(m_backend_list);
792
793     ScanTermInfoList entries_before;
794     ScanTermInfoList entries_after;
795     int no_before = 0;
796     int no_after = 0;
797
798     for (bit = m_backend_list.begin(); bit != m_backend_list.end(); bit++)
799     {
800         PackagePtr p = (*bit)->m_package;
801         
802         if (p->session().is_closed()) // if any backend closes, close frontend
803             package.session().close();
804         
805         Z_GDU *gdu = p->response().get();
806         if (gdu && gdu->which == Z_GDU_Z3950 && gdu->u.z3950->which ==
807             Z_APDU_scanResponse)
808         {
809             Z_ScanResponse *res = gdu->u.z3950->u.scanResponse;
810
811             if (res->entries && res->entries->nonsurrogateDiagnostics)
812             {
813                 // failure
814                 yp2::odr odr;
815                 Z_APDU *f_apdu = odr.create_scanResponse(apdu_req, 1, 0);
816                 Z_ScanResponse *f_res = f_apdu->u.scanResponse;
817
818                 f_res->entries->nonsurrogateDiagnostics = 
819                     res->entries->nonsurrogateDiagnostics;
820                 f_res->entries->num_nonsurrogateDiagnostics = 
821                     res->entries->num_nonsurrogateDiagnostics;
822
823                 package.response() = f_apdu;
824                 return;
825             }
826
827             if (res->entries && res->entries->entries)
828             {
829                 Z_Entry **entries = res->entries->entries;
830                 int num_entries = res->entries->num_entries;
831                 int position = 1;
832                 if (req->preferredPositionInResponse)
833                     position = *req->preferredPositionInResponse;
834                 if (res->positionOfTerm)
835                     position = *res->positionOfTerm;
836
837                 // before
838                 int i;
839                 for (i = 0; i<position-1 && i<num_entries; i++)
840                 {
841                     Z_Entry *ent = entries[i];
842
843                     if (ent->which == Z_Entry_termInfo)
844                     {
845                         ScanTermInfo my;
846
847                         int *occur = ent->u.termInfo->globalOccurrences;
848                         my.m_count = occur ? *occur : 0;
849
850                         if (ent->u.termInfo->term->which == Z_Term_general)
851                         {
852                             my.m_norm_term = std::string(
853                                 (const char *)
854                                 ent->u.termInfo->term->u.general->buf,
855                                 ent->u.termInfo->term->u.general->len);
856                         }
857                         if (my.m_norm_term.length())
858                         {
859                             ScanTermInfoList::iterator it = 
860                                 entries_before.begin();
861                             while (it != entries_before.end() && my <*it)
862                                 it++;
863                             if (my == *it)
864                             {
865                                 it->m_count += my.m_count;
866                             }
867                             else
868                             {
869                                 entries_before.insert(it, my);
870                                 no_before++;
871                             }
872                         }
873                     }
874                 }
875                 // after
876                 for (i = position-1; i<num_entries; i++)
877                 {
878                     Z_Entry *ent = entries[i];
879
880                     if (ent->which == Z_Entry_termInfo)
881                     {
882                         ScanTermInfo my;
883
884                         int *occur = ent->u.termInfo->globalOccurrences;
885                         my.m_count = occur ? *occur : 0;
886
887                         if (ent->u.termInfo->term->which == Z_Term_general)
888                         {
889                             my.m_norm_term = std::string(
890                                 (const char *)
891                                 ent->u.termInfo->term->u.general->buf,
892                                 ent->u.termInfo->term->u.general->len);
893                         }
894                         if (my.m_norm_term.length())
895                         {
896                             ScanTermInfoList::iterator it = 
897                                 entries_after.begin();
898                             while (it != entries_after.end() && *it < my)
899                                 it++;
900                             if (my == *it)
901                             {
902                                 it->m_count += my.m_count;
903                             }
904                             else
905                             {
906                                 entries_after.insert(it, my);
907                                 no_after++;
908                             }
909                         }
910                     }
911                 }
912
913             }                
914         }
915         else
916         {
917             // if any target does not return scan response - return that 
918             package.response() = p->response();
919             return;
920         }
921     }
922
923
924     if (false)
925     {
926         std::cout << "BEFORE\n";
927         ScanTermInfoList::iterator it = entries_before.begin();
928         for(; it != entries_before.end(); it++)
929         {
930             std::cout << " " << it->m_norm_term << " " << it->m_count << "\n";
931         }
932         
933         std::cout << "AFTER\n";
934         it = entries_after.begin();
935         for(; it != entries_after.end(); it++)
936         {
937             std::cout << " " << it->m_norm_term << " " << it->m_count << "\n";
938         }
939     }
940
941     if (false)
942     {
943         yp2::odr odr;
944         Z_APDU *f_apdu = odr.create_scanResponse(apdu_req, 1, "not implemented");
945         package.response() = f_apdu;
946     }
947     else
948     {
949         yp2::odr odr;
950         Z_APDU *f_apdu = odr.create_scanResponse(apdu_req, 0, 0);
951         Z_ScanResponse *resp = f_apdu->u.scanResponse;
952         
953         int number_returned = *req->numberOfTermsRequested;
954         int position_returned = *req->preferredPositionInResponse;
955         
956         resp->positionOfTerm = odr_intdup(odr, position_returned);
957         resp->numberOfEntriesReturned = odr_intdup(odr, number_returned);
958         
959         resp->entries->num_entries = number_returned;
960         resp->entries->entries = (Z_Entry**)
961             odr_malloc(odr, sizeof(Z_Entry*) * number_returned);
962         int i;
963         
964         ScanTermInfoList::iterator it = entries_before.begin();
965         for (i = 0; i<position_returned-1; i++, it++)
966         {
967             resp->entries->entries[i] = it->get_entry(odr);
968         }
969         it = entries_after.begin();
970         for (i = position_returned-1; i<number_returned; i++, it++)
971         {
972             resp->entries->entries[i] = it->get_entry(odr);
973         }
974         package.response() = f_apdu;
975     }
976 }
977
978
979 void yf::Multi::process(Package &package) const
980 {
981     FrontendPtr f = m_p->get_frontend(package);
982
983     Z_GDU *gdu = package.request().get();
984     
985     if (gdu && gdu->which == Z_GDU_Z3950 && gdu->u.z3950->which ==
986         Z_APDU_initRequest && !f->m_is_multi)
987     {
988         f->init(package, gdu);
989     }
990     else if (!f->m_is_multi)
991         package.move();
992     else if (gdu && gdu->which == Z_GDU_Z3950)
993     {
994         Z_APDU *apdu = gdu->u.z3950;
995         if (apdu->which == Z_APDU_initRequest)
996         {
997             yp2::odr odr;
998             
999             package.response() = odr.create_close(
1000                 apdu,
1001                 Z_Close_protocolError,
1002                 "double init");
1003             
1004             package.session().close();
1005         }
1006         else if (apdu->which == Z_APDU_searchRequest)
1007         {
1008             f->search(package, apdu);
1009         }
1010         else if (apdu->which == Z_APDU_presentRequest)
1011         {
1012             f->present(package, apdu);
1013         }
1014         else if (apdu->which == Z_APDU_scanRequest)
1015         {
1016             f->scan2(package, apdu);
1017         }
1018         else
1019         {
1020             yp2::odr odr;
1021             
1022             package.response() = odr.create_close(
1023                 apdu, Z_Close_protocolError,
1024                 "unsupported APDU in filter multi");
1025             
1026             package.session().close();
1027         }
1028     }
1029     m_p->release_frontend(package);
1030 }
1031
1032 void yp2::filter::Multi::configure(const xmlNode * ptr)
1033 {
1034     for (ptr = ptr->children; ptr; ptr = ptr->next)
1035     {
1036         if (ptr->type != XML_ELEMENT_NODE)
1037             continue;
1038         if (!strcmp((const char *) ptr->name, "target"))
1039         {
1040             std::string route = yp2::xml::get_route(ptr);
1041             std::string target = yp2::xml::get_text(ptr);
1042             std::cout << "route=" << route << " target=" << target << "\n";
1043             m_p->m_target_route[target] = route;
1044         }
1045         else if (!strcmp((const char *) ptr->name, "virtual"))
1046         {
1047             std::list<std::string> targets;
1048             std::string vhost;
1049             xmlNode *v_node = ptr->children;
1050             for (; v_node; v_node = v_node->next)
1051             {
1052                 if (v_node->type != XML_ELEMENT_NODE)
1053                     continue;
1054                 
1055                 if (yp2::xml::is_element_yp2(v_node, "vhost"))
1056                     vhost = yp2::xml::get_text(v_node);
1057                 else if (yp2::xml::is_element_yp2(v_node, "target"))
1058                     targets.push_back(yp2::xml::get_text(v_node));
1059                 else
1060                     throw yp2::filter::FilterException
1061                         ("Bad element " 
1062                          + std::string((const char *) v_node->name)
1063                          + " in virtual section"
1064                             );
1065             }
1066             std::string route = yp2::xml::get_route(ptr);
1067             add_map_host2hosts(vhost, targets, route);
1068             std::list<std::string>::const_iterator it;
1069             for (it = targets.begin(); it != targets.end(); it++)
1070             {
1071                 std::cout << "Add " << vhost << "->" << *it
1072                           << "," << route << "\n";
1073             }
1074         }
1075         else
1076         {
1077             throw yp2::filter::FilterException
1078                 ("Bad element " 
1079                  + std::string((const char *) ptr->name)
1080                  + " in virt_db filter");
1081         }
1082     }
1083 }
1084
1085 static yp2::filter::Base* filter_creator()
1086 {
1087     return new yp2::filter::Multi;
1088 }
1089
1090 extern "C" {
1091     struct yp2_filter_struct yp2_filter_multi = {
1092         0,
1093         "multi",
1094         filter_creator
1095     };
1096 }
1097
1098
1099 /*
1100  * Local variables:
1101  * c-basic-offset: 4
1102  * indent-tabs-mode: nil
1103  * c-file-style: "stroustrup"
1104  * End:
1105  * vim: shiftwidth=4 tabstop=8 expandtab
1106  */