Updated copyright headers. Omit CVS ID.
[metaproxy-moved-to-github.git] / src / filter_session_shared.cpp
1 /* This file is part of Metaproxy.
2    Copyright (C) 2005-2008 Index Data
3
4 Metaproxy is free software; you can redistribute it and/or modify it under
5 the terms of the GNU General Public License as published by the Free
6 Software Foundation; either version 2, or (at your option) any later
7 version.
8
9 Metaproxy is distributed in the hope that it will be useful, but WITHOUT ANY
10 WARRANTY; without even the implied warranty of MERCHANTABILITY or
11 FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
12 for more details.
13
14 You should have received a copy of the GNU General Public License
15 along with this program; if not, write to the Free Software
16 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
17 */
18
19 #include "config.hpp"
20
21 #include "filter.hpp"
22 #include "package.hpp"
23
24 #include <boost/thread/mutex.hpp>
25 #include <boost/thread/condition.hpp>
26 #include <boost/thread/thread.hpp>
27 #include <boost/thread/xtime.hpp>
28 #include <boost/shared_ptr.hpp>
29 #include <boost/format.hpp>
30
31 #include "util.hpp"
32 #include "filter_session_shared.hpp"
33
34 #include <yaz/log.h>
35 #include <yaz/zgdu.h>
36 #include <yaz/otherinfo.h>
37 #include <yaz/diagbib1.h>
38 #include <yazpp/z-query.h>
39 #include <map>
40 #include <iostream>
41 #include <time.h>
42
43 namespace mp = metaproxy_1;
44 namespace yf = metaproxy_1::filter;
45
46 namespace metaproxy_1 {
47
48     namespace filter {
49         class SessionShared::InitKey {
50         public:
51             bool operator < (const SessionShared::InitKey &k) const;
52             InitKey(Z_InitRequest *req);
53             InitKey(const InitKey &);
54             ~InitKey();
55         private:
56             InitKey &operator = (const InitKey &k);
57             char *m_idAuthentication_buf;
58             int m_idAuthentication_size;
59             char *m_otherInfo_buf;
60             int m_otherInfo_size;
61             ODR m_odr;
62         };
63         class SessionShared::Worker {
64         public:
65             Worker(SessionShared::Rep *rep);
66             void operator() (void);
67         private:
68             SessionShared::Rep *m_p;
69         };
70         class SessionShared::BackendSet {
71         public:
72             std::string m_result_set_id;
73             Databases m_databases;
74             int m_result_set_size;
75             yazpp_1::Yaz_Z_Query m_query;
76             time_t m_time_last_use;
77             void timestamp();
78             BackendSet(
79                 const std::string &result_set_id,
80                 const Databases &databases,
81                 const yazpp_1::Yaz_Z_Query &query);
82             bool search(
83                 Package &frontend_package,
84                 const Z_APDU *apdu_req,
85                 const BackendInstancePtr bp);
86         };
87         class SessionShared::BackendInstance {
88             friend class Rep;
89             friend class BackendClass;
90             friend class BackendSet;
91         public:
92             mp::Session m_session;
93             BackendSetList m_sets;
94             bool m_in_use;
95             int m_sequence_this;
96             int m_result_set_sequence;
97             time_t m_time_last_use;
98             mp::Package * m_close_package;
99             ~BackendInstance();
100         };
101         class SessionShared::BackendClass : boost::noncopyable {
102             friend class Rep;
103             friend struct Frontend;
104             bool m_named_result_sets;
105             BackendInstanceList m_backend_list;
106             BackendInstancePtr create_backend(const Package &package);
107             void remove_backend(BackendInstancePtr b);
108             BackendInstancePtr get_backend(const Package &package);
109             void use_backend(BackendInstancePtr b);
110             void release_backend(BackendInstancePtr b);
111             void expire();
112             yazpp_1::GDU m_init_request;
113             yazpp_1::GDU m_init_response;
114             boost::mutex m_mutex_backend_class;
115             int m_sequence_top;
116             time_t m_backend_set_ttl;
117             time_t m_backend_expiry_ttl;
118             size_t m_backend_set_max;
119         public:
120             BackendClass(const yazpp_1::GDU &init_request,
121                          int resultset_ttl,
122                          int resultset_max,
123                          int session_ttl);
124             ~BackendClass();
125         };
126         class SessionShared::FrontendSet {
127             Databases m_databases;
128             yazpp_1::Yaz_Z_Query m_query;
129         public:
130             const Databases &get_databases();
131             const yazpp_1::Yaz_Z_Query &get_query();
132             FrontendSet(
133                 const Databases &databases,
134                 const yazpp_1::Yaz_Z_Query &query);
135             FrontendSet();
136         };
137         struct SessionShared::Frontend {
138             Frontend(Rep *rep);
139             ~Frontend();
140             bool m_is_virtual;
141             bool m_in_use;
142             Z_Options m_init_options;
143             void search(Package &package, Z_APDU *apdu);
144             void present(Package &package, Z_APDU *apdu);
145             void scan(Package &package, Z_APDU *apdu);
146
147             void get_set(mp::Package &package,
148                          const Z_APDU *apdu_req,
149                          const Databases &databases,
150                          yazpp_1::Yaz_Z_Query &query,
151                          BackendInstancePtr &found_backend,
152                          BackendSetPtr &found_set);
153             void override_set(BackendInstancePtr &found_backend,
154                               std::string &result_set_id);
155
156             Rep *m_p;
157             BackendClassPtr m_backend_class;
158             FrontendSets m_frontend_sets;
159         };            
160         class SessionShared::Rep {
161             friend class SessionShared;
162             friend struct Frontend;
163             
164             FrontendPtr get_frontend(Package &package);
165             void release_frontend(Package &package);
166             Rep();
167         public:
168             void expire();
169         private:
170             void init(Package &package, const Z_GDU *gdu,
171                       FrontendPtr frontend);
172             boost::mutex m_mutex;
173             boost::condition m_cond_session_ready;
174             std::map<mp::Session, FrontendPtr> m_clients;
175
176             BackendClassMap m_backend_map;
177             boost::mutex m_mutex_backend_map;
178             boost::thread_group m_thrds;
179             int m_resultset_ttl;
180             int m_resultset_max;
181             int m_session_ttl;
182         };
183     }
184 }
185
186 yf::SessionShared::FrontendSet::FrontendSet(
187     const Databases &databases,
188     const yazpp_1::Yaz_Z_Query &query)
189     : m_databases(databases), m_query(query)
190 {
191 }
192
193 const yf::SessionShared::Databases & 
194 yf::SessionShared::FrontendSet::get_databases()
195 {
196     return m_databases;
197 }
198
199 const yazpp_1::Yaz_Z_Query& yf::SessionShared::FrontendSet::get_query()
200 {
201     return m_query;
202 }
203
204 yf::SessionShared::InitKey::InitKey(const InitKey &k)
205 {
206     m_odr = odr_createmem(ODR_ENCODE);
207     
208     m_idAuthentication_size =  k.m_idAuthentication_size;
209     m_idAuthentication_buf = (char*)odr_malloc(m_odr, m_idAuthentication_size);
210     memcpy(m_idAuthentication_buf, k.m_idAuthentication_buf,
211            m_idAuthentication_size);
212
213     m_otherInfo_size =  k.m_otherInfo_size;
214     m_otherInfo_buf = (char*)odr_malloc(m_odr, m_otherInfo_size);
215     memcpy(m_otherInfo_buf, k.m_otherInfo_buf,
216            m_otherInfo_size);
217 }
218
219 yf::SessionShared::InitKey::InitKey(Z_InitRequest *req)
220 {
221     m_odr = odr_createmem(ODR_ENCODE);
222
223     Z_IdAuthentication *t = req->idAuthentication;
224     z_IdAuthentication(m_odr, &t, 1, 0);
225     m_idAuthentication_buf =
226         odr_getbuf(m_odr, &m_idAuthentication_size, 0);
227
228     Z_OtherInformation *o = req->otherInfo;
229     z_OtherInformation(m_odr, &o, 1, 0);
230     m_otherInfo_buf = odr_getbuf(m_odr, &m_otherInfo_size, 0);
231 }
232
233 yf::SessionShared::InitKey::~InitKey()
234 {
235     odr_destroy(m_odr);
236 }
237
238 bool yf::SessionShared::InitKey::operator < (const SessionShared::InitKey &k)
239     const 
240 {
241     int c;
242     c = mp::util::memcmp2(
243         (void*) m_idAuthentication_buf, m_idAuthentication_size,
244         (void*) k.m_idAuthentication_buf, k.m_idAuthentication_size);
245     if (c < 0)
246         return true;
247     else if (c > 0)
248         return false;
249
250     c = mp::util::memcmp2((void*) m_otherInfo_buf, m_otherInfo_size,
251                           (void*) k.m_otherInfo_buf, k.m_otherInfo_size);
252     if (c < 0)
253         return true;
254     else if (c > 0)
255         return false;
256     return false;
257 }
258
259 void yf::SessionShared::BackendClass::release_backend(BackendInstancePtr b)
260 {
261     boost::mutex::scoped_lock lock(m_mutex_backend_class);
262     b->m_in_use = false;
263 }
264
265
266 void yf::SessionShared::BackendClass::remove_backend(BackendInstancePtr b)
267 {
268     BackendInstanceList::iterator it = m_backend_list.begin();
269     
270     while (it != m_backend_list.end())
271     {
272         if (*it == b)
273             it = m_backend_list.erase(it);
274         else
275             it++;
276     }
277 }
278
279
280
281 yf::SessionShared::BackendInstancePtr 
282 yf::SessionShared::BackendClass::get_backend(
283     const mp::Package &frontend_package)
284 {
285     {
286         boost::mutex::scoped_lock lock(m_mutex_backend_class);
287         
288         BackendInstanceList::const_iterator it = m_backend_list.begin();
289         
290         BackendInstancePtr backend1; // null
291         
292         for (; it != m_backend_list.end(); it++)
293         {
294             if (!(*it)->m_in_use)
295             {
296                 if (!backend1 
297                     || (*it)->m_sequence_this < backend1->m_sequence_this)
298                     backend1 = *it;
299             }
300         }
301         if (backend1)
302         {
303             use_backend(backend1);
304             return backend1;
305         }
306     }
307     return create_backend(frontend_package);
308 }
309
310 void yf::SessionShared::BackendClass::use_backend(BackendInstancePtr backend)
311 {
312     backend->m_in_use = true;
313     time(&backend->m_time_last_use);
314     backend->m_sequence_this = m_sequence_top++;
315 }
316
317 yf::SessionShared::BackendInstance::~BackendInstance()
318 {
319     delete m_close_package;
320 }
321
322 yf::SessionShared::BackendInstancePtr yf::SessionShared::BackendClass::create_backend(
323     const mp::Package &frontend_package)
324 {
325     BackendInstancePtr bp(new BackendInstance);
326     BackendInstancePtr null;
327
328     bp->m_close_package =
329         new mp::Package(bp->m_session, frontend_package.origin());
330     bp->m_close_package->copy_filter(frontend_package);
331
332     Package init_package(bp->m_session, frontend_package.origin());
333
334     init_package.copy_filter(frontend_package);
335
336     yazpp_1::GDU actual_init_request = m_init_request;
337     Z_GDU *init_pdu = actual_init_request.get();
338
339     assert(init_pdu->which == Z_GDU_Z3950);
340     assert(init_pdu->u.z3950->which == Z_APDU_initRequest);
341
342     Z_InitRequest *req = init_pdu->u.z3950->u.initRequest;
343     ODR_MASK_ZERO(req->options);
344
345     ODR_MASK_SET(req->options, Z_Options_search);
346     ODR_MASK_SET(req->options, Z_Options_present);
347     ODR_MASK_SET(req->options, Z_Options_namedResultSets);
348     ODR_MASK_SET(req->options, Z_Options_scan);
349
350     ODR_MASK_SET(req->protocolVersion, Z_ProtocolVersion_1);
351     ODR_MASK_SET(req->protocolVersion, Z_ProtocolVersion_2);
352     ODR_MASK_SET(req->protocolVersion, Z_ProtocolVersion_3);
353
354     init_package.request() = init_pdu;
355
356     init_package.move();
357
358     boost::mutex::scoped_lock lock(m_mutex_backend_class);
359
360     m_named_result_sets = false;
361     Z_GDU *gdu = init_package.response().get();
362     if (!init_package.session().is_closed()
363         && gdu && gdu->which == Z_GDU_Z3950 
364         && gdu->u.z3950->which == Z_APDU_initResponse)
365     {
366         Z_InitResponse *res = gdu->u.z3950->u.initResponse;
367         if (!*res->result)
368             return null;
369         m_init_response = gdu->u.z3950;
370         if (ODR_MASK_GET(res->options, Z_Options_namedResultSets))
371         {
372             m_named_result_sets = true;
373         }
374     }
375     else
376     {
377         // did not receive an init response or closed
378         return null;
379     }
380     bp->m_in_use = true;
381     time(&bp->m_time_last_use);
382     bp->m_sequence_this = 0;
383     bp->m_result_set_sequence = 0;
384     m_backend_list.push_back(bp);
385
386     return bp;
387 }
388
389
390 yf::SessionShared::BackendClass::BackendClass(const yazpp_1::GDU &init_request,
391                                               int resultset_ttl,
392                                               int resultset_max,
393                                               int session_ttl)
394     : m_named_result_sets(false), m_init_request(init_request),
395       m_sequence_top(0), m_backend_set_ttl(resultset_ttl),
396       m_backend_expiry_ttl(session_ttl), m_backend_set_max(resultset_max)
397 {}
398
399 yf::SessionShared::BackendClass::~BackendClass()
400 {}
401
402 void yf::SessionShared::Rep::init(mp::Package &package, const Z_GDU *gdu,
403                                   FrontendPtr frontend)
404 {
405     Z_InitRequest *req = gdu->u.z3950->u.initRequest;
406
407     frontend->m_is_virtual = true;
408     frontend->m_init_options = *req->options;
409     InitKey k(req);
410     {
411         boost::mutex::scoped_lock lock(m_mutex_backend_map);
412         BackendClassMap::const_iterator it;
413         it = m_backend_map.find(k);
414         if (it == m_backend_map.end())
415         {
416             BackendClassPtr b(new BackendClass(gdu->u.z3950,
417                                                m_resultset_ttl,
418                                                m_resultset_max,
419                                                m_session_ttl));
420             m_backend_map[k] = b;
421             frontend->m_backend_class = b;
422         }
423         else
424         {
425             frontend->m_backend_class = it->second;            
426         }
427     }
428     BackendClassPtr bc = frontend->m_backend_class;
429     BackendInstancePtr backend = bc->get_backend(package);
430     
431     mp::odr odr;
432     if (!backend)
433     {
434         Z_APDU *apdu = odr.create_initResponse(gdu->u.z3950, 0, 0);
435         *apdu->u.initResponse->result = 0;
436         package.response() = apdu;
437         package.session().close();
438     }
439     else
440     {
441         boost::mutex::scoped_lock lock(bc->m_mutex_backend_class);
442         yazpp_1::GDU init_response = bc->m_init_response;
443         Z_GDU *response_gdu = init_response.get();
444         mp::util::transfer_referenceId(odr, gdu->u.z3950,
445                                        response_gdu->u.z3950);
446
447         Z_Options *server_options =
448             response_gdu->u.z3950->u.initResponse->options;
449         Z_Options *client_options = &frontend->m_init_options;
450
451         int i;
452         for (i = 0; i<30; i++)
453             if (!ODR_MASK_GET(client_options, i))
454                 ODR_MASK_CLEAR(server_options, i);
455         package.response() = init_response;
456     }
457     if (backend)
458         bc->release_backend(backend);
459 }
460
461 void yf::SessionShared::BackendSet::timestamp()
462 {
463     time(&m_time_last_use);
464 }
465
466 yf::SessionShared::BackendSet::BackendSet(
467     const std::string &result_set_id,
468     const Databases &databases,
469     const yazpp_1::Yaz_Z_Query &query) :
470     m_result_set_id(result_set_id),
471     m_databases(databases), m_result_set_size(0), m_query(query) 
472 {
473     timestamp();
474 }
475
476 bool yf::SessionShared::BackendSet::search(
477     mp::Package &frontend_package,
478     const Z_APDU *frontend_apdu,
479     const BackendInstancePtr bp)
480 {
481     Package search_package(bp->m_session, frontend_package.origin());
482
483     search_package.copy_filter(frontend_package);
484
485     mp::odr odr;
486     Z_APDU *apdu_req = zget_APDU(odr, Z_APDU_searchRequest);
487     Z_SearchRequest *req = apdu_req->u.searchRequest;
488
489     req->resultSetName = odr_strdup(odr, m_result_set_id.c_str());
490     req->query = m_query.get_Z_Query();
491
492     req->num_databaseNames = m_databases.size();
493     req->databaseNames = (char**) 
494         odr_malloc(odr, req->num_databaseNames * sizeof(char *));
495     Databases::const_iterator it = m_databases.begin();
496     size_t i = 0;
497     for (; it != m_databases.end(); it++)
498         req->databaseNames[i++] = odr_strdup(odr, it->c_str());
499
500     search_package.request() = apdu_req;
501
502     search_package.move();
503     
504     Z_Records *z_records_diag = 0;
505     Z_GDU *gdu = search_package.response().get();
506     if (!search_package.session().is_closed()
507         && gdu && gdu->which == Z_GDU_Z3950 
508         && gdu->u.z3950->which == Z_APDU_searchResponse)
509     {
510         Z_SearchResponse *b_resp = gdu->u.z3950->u.searchResponse;
511         if (b_resp->records)
512         {
513             if (b_resp->records->which == Z_Records_NSD 
514                 || b_resp->records->which == Z_Records_multipleNSD)
515                 z_records_diag = b_resp->records;
516         }
517         if (z_records_diag)
518         {
519             if (frontend_apdu->which == Z_APDU_searchRequest)
520             {
521                 Z_APDU *f_apdu = odr.create_searchResponse(frontend_apdu, 
522                                                            0, 0);
523                 Z_SearchResponse *f_resp = f_apdu->u.searchResponse;
524                 f_resp->records = z_records_diag;
525                 frontend_package.response() = f_apdu;
526                 return false;
527             }
528             if (frontend_apdu->which == Z_APDU_presentRequest)
529             {
530                 Z_APDU *f_apdu = odr.create_presentResponse(frontend_apdu, 
531                                                             0, 0);
532                 Z_PresentResponse *f_resp = f_apdu->u.presentResponse;
533                 f_resp->records = z_records_diag;
534                 frontend_package.response() = f_apdu;
535                 return false;
536             }
537         }
538         m_result_set_size = *b_resp->resultCount;
539         return true;
540     }
541     Z_APDU *f_apdu = 0;
542     if (frontend_apdu->which == Z_APDU_searchRequest)
543         f_apdu = odr.create_searchResponse(
544             frontend_apdu, YAZ_BIB1_TEMPORARY_SYSTEM_ERROR, 0);
545     else if (frontend_apdu->which == Z_APDU_presentRequest)
546         f_apdu = odr.create_presentResponse(
547             frontend_apdu, YAZ_BIB1_TEMPORARY_SYSTEM_ERROR, 0);
548     else
549         f_apdu = odr.create_close(
550             frontend_apdu, YAZ_BIB1_TEMPORARY_SYSTEM_ERROR, 0);
551     frontend_package.response() = f_apdu;
552     return false;
553 }
554
555 void yf::SessionShared::Frontend::override_set(
556     BackendInstancePtr &found_backend,
557     std::string &result_set_id)
558 {
559     BackendClassPtr bc = m_backend_class;
560     BackendInstanceList::const_iterator it = bc->m_backend_list.begin();
561     time_t now;
562     time(&now);
563     
564     for (; it != bc->m_backend_list.end(); it++)
565     {
566         if (!(*it)->m_in_use)
567         {
568             BackendSetList::iterator set_it = (*it)->m_sets.begin();
569             for (; set_it != (*it)->m_sets.end(); set_it++)
570             {
571                 if (now >= (*set_it)->m_time_last_use &&
572                     now - (*set_it)->m_time_last_use > bc->m_backend_set_ttl)
573                 {
574                     found_backend = *it;
575                     result_set_id = (*set_it)->m_result_set_id;
576                     found_backend->m_sets.erase(set_it);
577                     return;
578                 }
579             }
580         }
581     }
582     size_t max_sets = bc->m_named_result_sets ? bc->m_backend_set_max : 1;
583     for (it = bc->m_backend_list.begin(); it != bc->m_backend_list.end(); it++)
584     {
585         if (!(*it)->m_in_use && (*it)->m_sets.size() < max_sets)
586         {
587             found_backend = *it;
588             if (bc->m_named_result_sets)
589             {
590                 result_set_id = boost::io::str(
591                     boost::format("%1%") % 
592                     found_backend->m_result_set_sequence);
593                 found_backend->m_result_set_sequence++;
594             }
595             else
596                 result_set_id = "default";
597             return;
598         }
599     }
600 }
601
602 void yf::SessionShared::Frontend::get_set(mp::Package &package,
603                                           const Z_APDU *apdu_req,
604                                           const Databases &databases,
605                                           yazpp_1::Yaz_Z_Query &query,
606                                           BackendInstancePtr &found_backend,
607                                           BackendSetPtr &found_set)
608 {
609     std::string result_set_id;
610     BackendClassPtr bc = m_backend_class;
611     {
612         boost::mutex::scoped_lock lock(bc->m_mutex_backend_class);
613      
614         // look at each backend and see if we have a similar search
615         BackendInstanceList::const_iterator it = bc->m_backend_list.begin();
616         
617         for (; it != bc->m_backend_list.end(); it++)
618         {
619             if (!(*it)->m_in_use)
620             {
621                 BackendSetList::const_iterator set_it = (*it)->m_sets.begin();
622                 for (; set_it != (*it)->m_sets.end(); set_it++)
623                 {
624                     if ((*set_it)->m_databases == databases 
625                         && query.match(&(*set_it)->m_query))
626                     {
627                         found_set = *set_it;
628                         found_backend = *it;
629                         bc->use_backend(found_backend);
630                         found_set->timestamp();
631                         // found matching set. No need to search again
632                         return;
633                     }
634                 }
635             }
636         }
637         override_set(found_backend, result_set_id);
638         if (found_backend)
639             bc->use_backend(found_backend);
640     }
641     if (!found_backend)
642     {
643         // create a new backend set (and new set)
644         found_backend = bc->create_backend(package);
645
646         if (!found_backend)
647         {
648             Z_APDU *f_apdu = 0;
649             mp::odr odr;
650             if (apdu_req->which == Z_APDU_searchRequest)
651             {
652                 f_apdu = odr.create_searchResponse(
653                         apdu_req, YAZ_BIB1_TEMPORARY_SYSTEM_ERROR, 0);
654             }
655             else if (apdu_req->which == Z_APDU_presentRequest)
656             {
657                 f_apdu = odr.create_presentResponse(
658                     apdu_req, YAZ_BIB1_TEMPORARY_SYSTEM_ERROR, 0);
659             }
660             else
661             {
662                 f_apdu = odr.create_close(
663                     apdu_req, YAZ_BIB1_TEMPORARY_SYSTEM_ERROR, 0);
664             }
665             package.response() = f_apdu;
666             return;
667         }
668         if (bc->m_named_result_sets)
669         {
670             result_set_id = boost::io::str(
671                 boost::format("%1%") % found_backend->m_result_set_sequence);
672             found_backend->m_result_set_sequence++;
673         }
674         else
675             result_set_id = "default";
676     }
677     // we must search ...
678     BackendSetPtr new_set(new BackendSet(result_set_id,
679                                          databases, query));
680     if (!new_set->search(package, apdu_req, found_backend))
681     {
682         bc->remove_backend(found_backend);
683         return; // search error 
684     }
685     found_set = new_set;
686     found_set->timestamp();
687     found_backend->m_sets.push_back(found_set);
688 }
689
690 void yf::SessionShared::Frontend::search(mp::Package &package,
691                                          Z_APDU *apdu_req)
692 {
693     Z_SearchRequest *req = apdu_req->u.searchRequest;
694     FrontendSets::iterator fset_it = 
695         m_frontend_sets.find(req->resultSetName);
696     if (fset_it != m_frontend_sets.end())
697     {
698         // result set already exist 
699         // if replace indicator is off: we return diagnostic if
700         // result set already exist.
701         if (*req->replaceIndicator == 0)
702         {
703             mp::odr odr;
704             Z_APDU *apdu = 
705                 odr.create_searchResponse(
706                     apdu_req,
707                     YAZ_BIB1_RESULT_SET_EXISTS_AND_REPLACE_INDICATOR_OFF,
708                     0);
709             package.response() = apdu;
710             
711             return;
712         }
713         m_frontend_sets.erase(fset_it);
714     }
715     
716     yazpp_1::Yaz_Z_Query query;
717     query.set_Z_Query(req->query);
718     Databases databases;
719     int i;
720     for (i = 0; i<req->num_databaseNames; i++)
721         databases.push_back(req->databaseNames[i]);
722
723     BackendSetPtr found_set; // null
724     BackendInstancePtr found_backend; // null
725
726     get_set(package, apdu_req, databases, query, found_backend, found_set);
727     if (!found_set)
728         return;
729
730     mp::odr odr;
731     Z_APDU *f_apdu = odr.create_searchResponse(apdu_req, 0, 0);
732     Z_SearchResponse *f_resp = f_apdu->u.searchResponse;
733     *f_resp->resultCount = found_set->m_result_set_size;
734     package.response() = f_apdu;
735
736     FrontendSetPtr fset(new FrontendSet(databases, query));
737     m_frontend_sets[req->resultSetName] = fset;
738
739     m_backend_class->release_backend(found_backend);
740 }
741
742 void yf::SessionShared::Frontend::present(mp::Package &package,
743                                           Z_APDU *apdu_req)
744 {
745     mp::odr odr;
746     Z_PresentRequest *req = apdu_req->u.presentRequest;
747
748     FrontendSets::iterator fset_it = 
749         m_frontend_sets.find(req->resultSetId);
750
751     if (fset_it == m_frontend_sets.end())
752     {
753         Z_APDU *apdu = 
754             odr.create_presentResponse(
755                 apdu_req,
756                 YAZ_BIB1_SPECIFIED_RESULT_SET_DOES_NOT_EXIST,
757                 req->resultSetId);
758         package.response() = apdu;
759         return;
760     }
761     FrontendSetPtr fset = fset_it->second;
762
763     Databases databases = fset->get_databases();
764     yazpp_1::Yaz_Z_Query query = fset->get_query();
765
766     BackendClassPtr bc = m_backend_class;
767     BackendSetPtr found_set; // null
768     BackendInstancePtr found_backend;
769
770     get_set(package, apdu_req, databases, query, found_backend, found_set);
771     if (!found_set)
772         return;
773
774     Z_APDU *p_apdu = zget_APDU(odr, Z_APDU_presentRequest);
775     Z_PresentRequest *p_req = p_apdu->u.presentRequest;
776     p_req->preferredRecordSyntax = req->preferredRecordSyntax;
777     p_req->resultSetId = odr_strdup(odr, found_set->m_result_set_id.c_str());
778     *p_req->resultSetStartPoint = *req->resultSetStartPoint;
779     *p_req->numberOfRecordsRequested = *req->numberOfRecordsRequested;
780     p_req->preferredRecordSyntax = req->preferredRecordSyntax;
781     p_req->recordComposition = req->recordComposition;
782
783     Package present_package(found_backend->m_session, package.origin());
784     present_package.copy_filter(package);
785
786     present_package.request() = p_apdu;
787
788     present_package.move();
789
790     Z_GDU *gdu = present_package.response().get();
791     if (!present_package.session().is_closed()
792         && gdu && gdu->which == Z_GDU_Z3950 
793         && gdu->u.z3950->which == Z_APDU_presentResponse)
794     {
795         Z_PresentResponse *b_resp = gdu->u.z3950->u.presentResponse;
796         Z_APDU *f_apdu_res = odr.create_presentResponse(apdu_req, 0, 0);
797         Z_PresentResponse *f_resp = f_apdu_res->u.presentResponse;
798
799         f_resp->numberOfRecordsReturned = b_resp->numberOfRecordsReturned;
800         f_resp->nextResultSetPosition = b_resp->nextResultSetPosition;
801         f_resp->presentStatus= b_resp->presentStatus;
802         f_resp->records = b_resp->records;
803         f_resp->otherInfo = b_resp->otherInfo;
804         package.response() = f_apdu_res;
805         bc->release_backend(found_backend);
806     }
807     else
808     {
809         bc->remove_backend(found_backend);
810         Z_APDU *f_apdu_res = 
811             odr.create_presentResponse(
812                 apdu_req, YAZ_BIB1_TEMPORARY_SYSTEM_ERROR, 0);
813         package.response() = f_apdu_res;
814     }
815 }
816
817 void yf::SessionShared::Frontend::scan(mp::Package &frontend_package,
818                                        Z_APDU *apdu_req)
819 {
820     BackendClassPtr bc = m_backend_class;
821     BackendInstancePtr backend = bc->get_backend(frontend_package);
822     if (!backend)
823     {
824         mp::odr odr;
825         Z_APDU *apdu = odr.create_scanResponse(
826             apdu_req, YAZ_BIB1_TEMPORARY_SYSTEM_ERROR, 0);
827         frontend_package.response() = apdu;
828     }
829     else
830     {
831         Package scan_package(backend->m_session, frontend_package.origin());
832         scan_package.copy_filter(frontend_package);
833         scan_package.request() = apdu_req;
834         scan_package.move();
835         frontend_package.response() = scan_package.response();
836         if (scan_package.session().is_closed())
837         {
838             frontend_package.session().close();
839             bc->remove_backend(backend);
840         }
841         else
842             bc->release_backend(backend);
843     }
844 }
845
846 yf::SessionShared::Worker::Worker(SessionShared::Rep *rep) : m_p(rep)
847 {
848 }
849
850 void yf::SessionShared::Worker::operator() (void)
851 {
852     m_p->expire();
853 }
854
855 void yf::SessionShared::BackendClass::expire()
856 {
857     time_t now;
858     time(&now);
859     boost::mutex::scoped_lock lock(m_mutex_backend_class);
860     BackendInstanceList::iterator bit = m_backend_list.begin();
861     while (bit != m_backend_list.end())
862     {
863         time_t last_use = (*bit)->m_time_last_use;
864         
865         if ((*bit)->m_in_use)
866         {
867             bit++;
868         }
869         else if ((now >= last_use && now - last_use > m_backend_expiry_ttl)
870             || (now < last_use))
871         {
872             mp::odr odr;
873             (*bit)->m_close_package->response() = odr.create_close(
874                 0, Z_Close_lackOfActivity, 0);
875             (*bit)->m_close_package->session().close();
876             (*bit)->m_close_package->move();
877
878             bit = m_backend_list.erase(bit);
879         }
880         else
881         {
882             bit++;
883         }
884     }
885 }
886
887 void yf::SessionShared::Rep::expire()
888 {
889     while (true)
890     {
891         boost::xtime xt;
892         boost::xtime_get(&xt, boost::TIME_UTC);
893         xt.sec += 30;
894         boost::thread::sleep(xt);
895         
896         BackendClassMap::const_iterator b_it = m_backend_map.begin();
897         for (; b_it != m_backend_map.end(); b_it++)
898             b_it->second->expire();
899     }
900 }
901
902 yf::SessionShared::Rep::Rep()
903 {
904     m_resultset_ttl = 30;
905     m_resultset_max = 10;
906     m_session_ttl = 90;
907     yf::SessionShared::Worker w(this);
908     m_thrds.add_thread(new boost::thread(w));
909 }
910
911 yf::SessionShared::SessionShared() : m_p(new SessionShared::Rep)
912 {
913 }
914
915 yf::SessionShared::~SessionShared() {
916 }
917
918
919 yf::SessionShared::Frontend::Frontend(Rep *rep) : m_is_virtual(false), m_p(rep)
920 {
921 }
922
923 yf::SessionShared::Frontend::~Frontend()
924 {
925 }
926
927 yf::SessionShared::FrontendPtr yf::SessionShared::Rep::get_frontend(mp::Package &package)
928 {
929     boost::mutex::scoped_lock lock(m_mutex);
930
931     std::map<mp::Session,yf::SessionShared::FrontendPtr>::iterator it;
932     
933     while(true)
934     {
935         it = m_clients.find(package.session());
936         if (it == m_clients.end())
937             break;
938         
939         if (!it->second->m_in_use)
940         {
941             it->second->m_in_use = true;
942             return it->second;
943         }
944         m_cond_session_ready.wait(lock);
945     }
946     FrontendPtr f(new Frontend(this));
947     m_clients[package.session()] = f;
948     f->m_in_use = true;
949     return f;
950 }
951
952 void yf::SessionShared::Rep::release_frontend(mp::Package &package)
953 {
954     boost::mutex::scoped_lock lock(m_mutex);
955     std::map<mp::Session,yf::SessionShared::FrontendPtr>::iterator it;
956     
957     it = m_clients.find(package.session());
958     if (it != m_clients.end())
959     {
960         if (package.session().is_closed())
961         {
962             m_clients.erase(it);
963         }
964         else
965         {
966             it->second->m_in_use = false;
967         }
968         m_cond_session_ready.notify_all();
969     }
970 }
971
972
973 void yf::SessionShared::process(mp::Package &package) const
974 {
975     FrontendPtr f = m_p->get_frontend(package);
976
977     Z_GDU *gdu = package.request().get();
978     
979     if (gdu && gdu->which == Z_GDU_Z3950 && gdu->u.z3950->which ==
980         Z_APDU_initRequest && !f->m_is_virtual)
981     {
982         m_p->init(package, gdu, f);
983     }
984     else if (!f->m_is_virtual)
985         package.move();
986     else if (gdu && gdu->which == Z_GDU_Z3950)
987     {
988         Z_APDU *apdu = gdu->u.z3950;
989         if (apdu->which == Z_APDU_initRequest)
990         {
991             mp::odr odr;
992             
993             package.response() = odr.create_close(
994                 apdu,
995                 Z_Close_protocolError,
996                 "double init");
997             
998             package.session().close();
999         }
1000         else if (apdu->which == Z_APDU_close)
1001         {
1002             mp::odr odr;
1003             
1004             package.response() = odr.create_close(
1005                 apdu,
1006                 Z_Close_peerAbort, "received close from client");
1007             package.session().close();
1008         }
1009         else if (apdu->which == Z_APDU_searchRequest)
1010         {
1011             f->search(package, apdu);
1012         }
1013         else if (apdu->which == Z_APDU_presentRequest)
1014         {
1015             f->present(package, apdu);
1016         }
1017         else if (apdu->which == Z_APDU_scanRequest)
1018         {
1019             f->scan(package, apdu);
1020         }
1021         else
1022         {
1023             mp::odr odr;
1024             
1025             package.response() = odr.create_close(
1026                 apdu, Z_Close_protocolError,
1027                 "unsupported APDU in filter_session_shared");
1028             
1029             package.session().close();
1030         }
1031     }
1032     m_p->release_frontend(package);
1033 }
1034
1035 void yf::SessionShared::configure(const xmlNode *ptr, bool test_only)
1036 {
1037     for (ptr = ptr->children; ptr; ptr = ptr->next)
1038     {
1039         if (ptr->type != XML_ELEMENT_NODE)
1040             continue;
1041         if (!strcmp((const char *) ptr->name, "resultset"))
1042         {
1043             const struct _xmlAttr *attr;
1044             for (attr = ptr->properties; attr; attr = attr->next)
1045             {
1046                 if (!strcmp((const char *) attr->name, "ttl"))
1047                     m_p->m_resultset_ttl = 
1048                         mp::xml::get_int(attr->children, 30);
1049                 else if (!strcmp((const char *) attr->name, "max"))
1050                 {
1051                     m_p->m_resultset_max = 
1052                         mp::xml::get_int(attr->children, 10);
1053                 }
1054                 else
1055                     throw mp::filter::FilterException(
1056                         "Bad attribute " + std::string((const char *)
1057                                                        attr->name));
1058             }
1059         }
1060         else if (!strcmp((const char *) ptr->name, "session"))
1061         {
1062             const struct _xmlAttr *attr;
1063             for (attr = ptr->properties; attr; attr = attr->next)
1064             {
1065                 if (!strcmp((const char *) attr->name, "ttl"))
1066                     m_p->m_session_ttl = 
1067                         mp::xml::get_int(attr->children, 120);
1068                 else
1069                     throw mp::filter::FilterException(
1070                         "Bad attribute " + std::string((const char *)
1071                                                        attr->name));
1072             }
1073         }
1074         else
1075         {
1076             throw mp::filter::FilterException("Bad element " 
1077                                                + std::string((const char *)
1078                                                              ptr->name));
1079         }
1080     }
1081 }
1082
1083 static mp::filter::Base* filter_creator()
1084 {
1085     return new mp::filter::SessionShared;
1086 }
1087
1088 extern "C" {
1089     struct metaproxy_1_filter_struct metaproxy_1_filter_session_shared = {
1090         0,
1091         "session_shared",
1092         filter_creator
1093     };
1094 }
1095
1096 /*
1097  * Local variables:
1098  * c-basic-offset: 4
1099  * indent-tabs-mode: nil
1100  * c-file-style: "stroustrup"
1101  * End:
1102  * vim: shiftwidth=4 tabstop=8 expandtab
1103  */