Implement Session invalidate for -1 hits (bug #2696).
[metaproxy-moved-to-github.git] / src / filter_session_shared.cpp
1 /* This file is part of Metaproxy.
2    Copyright (C) 2005-2008 Index Data
3
4 Metaproxy is free software; you can redistribute it and/or modify it under
5 the terms of the GNU General Public License as published by the Free
6 Software Foundation; either version 2, or (at your option) any later
7 version.
8
9 Metaproxy is distributed in the hope that it will be useful, but WITHOUT ANY
10 WARRANTY; without even the implied warranty of MERCHANTABILITY or
11 FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
12 for more details.
13
14 You should have received a copy of the GNU General Public License
15 along with this program; if not, write to the Free Software
16 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
17 */
18
19 #include "config.hpp"
20
21 #include "filter.hpp"
22 #include "package.hpp"
23
24 #include <boost/thread/mutex.hpp>
25 #include <boost/thread/condition.hpp>
26 #include <boost/thread/thread.hpp>
27 #include <boost/thread/xtime.hpp>
28 #include <boost/shared_ptr.hpp>
29 #include <boost/format.hpp>
30
31 #include "util.hpp"
32 #include "filter_session_shared.hpp"
33
34 #include <yaz/log.h>
35 #include <yaz/zgdu.h>
36 #include <yaz/otherinfo.h>
37 #include <yaz/diagbib1.h>
38 #include <yazpp/z-query.h>
39 #include <map>
40 #include <iostream>
41 #include <time.h>
42
43 namespace mp = metaproxy_1;
44 namespace yf = metaproxy_1::filter;
45
46 namespace metaproxy_1 {
47
48     namespace filter {
49         // key for session.. We'll only share sessions with same InitKey
50         class SessionShared::InitKey {
51         public:
52             bool operator < (const SessionShared::InitKey &k) const;
53             InitKey(Z_InitRequest *req);
54             InitKey(const InitKey &);
55             ~InitKey();
56         private:
57             char *m_idAuthentication_buf;
58             int m_idAuthentication_size;
59             char *m_otherInfo_buf;
60             int m_otherInfo_size;
61             ODR m_odr;
62         };
63         // worker thread .. for expiry of sessions
64         class SessionShared::Worker {
65         public:
66             Worker(SessionShared::Rep *rep);
67             void operator() (void);
68         private:
69             SessionShared::Rep *m_p;
70         };
71         // backend result set
72         class SessionShared::BackendSet {
73         public:
74             std::string m_result_set_id;
75             Databases m_databases;
76             int m_result_set_size;
77             yazpp_1::Yaz_Z_Query m_query;
78             time_t m_time_last_use;
79             void timestamp();
80             BackendSet(
81                 const std::string &result_set_id,
82                 const Databases &databases,
83                 const yazpp_1::Yaz_Z_Query &query);
84             bool search(
85                 Package &frontend_package,
86                 const Z_APDU *apdu_req,
87                 const BackendInstancePtr bp,
88                 bool &fatal_error);
89         };
90         // backend connection instance
91         class SessionShared::BackendInstance {
92             friend class Rep;
93             friend class BackendClass;
94             friend class BackendSet;
95         public:
96             mp::Session m_session;
97             BackendSetList m_sets;
98             bool m_in_use;
99             int m_sequence_this;
100             int m_result_set_sequence;
101             time_t m_time_last_use;
102             mp::Package * m_close_package;
103             ~BackendInstance();
104         };
105         // backends of some class (all with same InitKey)
106         class SessionShared::BackendClass : boost::noncopyable {
107             friend class Rep;
108             friend struct Frontend;
109             bool m_named_result_sets;
110             BackendInstanceList m_backend_list;
111             BackendInstancePtr create_backend(const Package &package);
112             void remove_backend(BackendInstancePtr b);
113             BackendInstancePtr get_backend(const Package &package);
114             void use_backend(BackendInstancePtr b);
115             void release_backend(BackendInstancePtr b);
116             void expire_class();
117             yazpp_1::GDU m_init_request;
118             yazpp_1::GDU m_init_response;
119             boost::mutex m_mutex_backend_class;
120             int m_sequence_top;
121             time_t m_backend_set_ttl;
122             time_t m_backend_expiry_ttl;
123             size_t m_backend_set_max;
124         public:
125             BackendClass(const yazpp_1::GDU &init_request,
126                          int resultset_ttl,
127                          int resultset_max,
128                          int session_ttl);
129             ~BackendClass();
130         };
131         // frontend result set
132         class SessionShared::FrontendSet {
133             Databases m_databases;
134             yazpp_1::Yaz_Z_Query m_query;
135         public:
136             const Databases &get_databases();
137             const yazpp_1::Yaz_Z_Query &get_query();
138             FrontendSet(
139                 const Databases &databases,
140                 const yazpp_1::Yaz_Z_Query &query);
141             FrontendSet();
142         };
143         // frontend session
144         struct SessionShared::Frontend {
145             Frontend(Rep *rep);
146             ~Frontend();
147             bool m_is_virtual;
148             bool m_in_use;
149             Z_Options m_init_options;
150             void search(Package &package, Z_APDU *apdu);
151             void present(Package &package, Z_APDU *apdu);
152             void scan(Package &package, Z_APDU *apdu);
153
154             void get_set(mp::Package &package,
155                          const Z_APDU *apdu_req,
156                          const Databases &databases,
157                          yazpp_1::Yaz_Z_Query &query,
158                          BackendInstancePtr &found_backend,
159                          BackendSetPtr &found_set);
160             void override_set(BackendInstancePtr &found_backend,
161                               std::string &result_set_id);
162
163             Rep *m_p;
164             BackendClassPtr m_backend_class;
165             FrontendSets m_frontend_sets;
166         };            
167         // representation
168         class SessionShared::Rep {
169             friend class SessionShared;
170             friend struct Frontend;
171             
172             FrontendPtr get_frontend(Package &package);
173             void release_frontend(Package &package);
174             Rep();
175         public:
176             void expire();
177         private:
178             void init(Package &package, const Z_GDU *gdu,
179                       FrontendPtr frontend);
180             boost::mutex m_mutex;
181             boost::condition m_cond_session_ready;
182             std::map<mp::Session, FrontendPtr> m_clients;
183
184             BackendClassMap m_backend_map;
185             boost::mutex m_mutex_backend_map;
186             boost::thread_group m_thrds;
187             int m_resultset_ttl;
188             int m_resultset_max;
189             int m_session_ttl;
190         };
191     }
192 }
193
194 yf::SessionShared::FrontendSet::FrontendSet(
195     const Databases &databases,
196     const yazpp_1::Yaz_Z_Query &query)
197     : m_databases(databases), m_query(query)
198 {
199 }
200
201 const yf::SessionShared::Databases & 
202 yf::SessionShared::FrontendSet::get_databases()
203 {
204     return m_databases;
205 }
206
207 const yazpp_1::Yaz_Z_Query& yf::SessionShared::FrontendSet::get_query()
208 {
209     return m_query;
210 }
211
212 yf::SessionShared::InitKey::InitKey(const InitKey &k)
213 {
214     m_odr = odr_createmem(ODR_ENCODE);
215     
216     m_idAuthentication_size =  k.m_idAuthentication_size;
217     m_idAuthentication_buf = (char*)odr_malloc(m_odr, m_idAuthentication_size);
218     memcpy(m_idAuthentication_buf, k.m_idAuthentication_buf,
219            m_idAuthentication_size);
220
221     m_otherInfo_size =  k.m_otherInfo_size;
222     m_otherInfo_buf = (char*)odr_malloc(m_odr, m_otherInfo_size);
223     memcpy(m_otherInfo_buf, k.m_otherInfo_buf,
224            m_otherInfo_size);
225 }
226
227 yf::SessionShared::InitKey::InitKey(Z_InitRequest *req)
228 {
229     m_odr = odr_createmem(ODR_ENCODE);
230
231     Z_IdAuthentication *t = req->idAuthentication;
232     z_IdAuthentication(m_odr, &t, 1, 0);
233     m_idAuthentication_buf =
234         odr_getbuf(m_odr, &m_idAuthentication_size, 0);
235
236     Z_OtherInformation *o = req->otherInfo;
237     z_OtherInformation(m_odr, &o, 1, 0);
238     m_otherInfo_buf = odr_getbuf(m_odr, &m_otherInfo_size, 0);
239 }
240
241 yf::SessionShared::InitKey::~InitKey()
242 {
243     odr_destroy(m_odr);
244 }
245
246 bool yf::SessionShared::InitKey::operator < (const SessionShared::InitKey &k)
247     const 
248 {
249     int c;
250     c = mp::util::memcmp2(
251         (void*) m_idAuthentication_buf, m_idAuthentication_size,
252         (void*) k.m_idAuthentication_buf, k.m_idAuthentication_size);
253     if (c < 0)
254         return true;
255     else if (c > 0)
256         return false;
257
258     c = mp::util::memcmp2((void*) m_otherInfo_buf, m_otherInfo_size,
259                           (void*) k.m_otherInfo_buf, k.m_otherInfo_size);
260     if (c < 0)
261         return true;
262     else if (c > 0)
263         return false;
264     return false;
265 }
266
267 void yf::SessionShared::BackendClass::release_backend(BackendInstancePtr b)
268 {
269     boost::mutex::scoped_lock lock(m_mutex_backend_class);
270     b->m_in_use = false;
271 }
272
273
274 void yf::SessionShared::BackendClass::remove_backend(BackendInstancePtr b)
275 {
276     BackendInstanceList::iterator it = m_backend_list.begin();
277     
278     while (it != m_backend_list.end())
279     {
280         if (*it == b)
281         {
282              mp::odr odr;
283             (*it)->m_close_package->response() = odr.create_close(
284                 0, Z_Close_lackOfActivity, 0);
285             (*it)->m_close_package->session().close();
286             (*it)->m_close_package->move();
287             
288             it = m_backend_list.erase(it);
289         }
290         else
291             it++;
292     }
293 }
294
295
296
297 yf::SessionShared::BackendInstancePtr 
298 yf::SessionShared::BackendClass::get_backend(
299     const mp::Package &frontend_package)
300 {
301     {
302         boost::mutex::scoped_lock lock(m_mutex_backend_class);
303         
304         BackendInstanceList::const_iterator it = m_backend_list.begin();
305         
306         BackendInstancePtr backend1; // null
307         
308         for (; it != m_backend_list.end(); it++)
309         {
310             if (!(*it)->m_in_use)
311             {
312                 if (!backend1 
313                     || (*it)->m_sequence_this < backend1->m_sequence_this)
314                     backend1 = *it;
315             }
316         }
317         if (backend1)
318         {
319             use_backend(backend1);
320             return backend1;
321         }
322     }
323     return create_backend(frontend_package);
324 }
325
326 void yf::SessionShared::BackendClass::use_backend(BackendInstancePtr backend)
327 {
328     backend->m_in_use = true;
329     time(&backend->m_time_last_use);
330     backend->m_sequence_this = m_sequence_top++;
331 }
332
333 yf::SessionShared::BackendInstance::~BackendInstance()
334 {
335     delete m_close_package;
336 }
337
338 yf::SessionShared::BackendInstancePtr yf::SessionShared::BackendClass::create_backend(
339     const mp::Package &frontend_package)
340 {
341     BackendInstancePtr bp(new BackendInstance);
342     BackendInstancePtr null;
343
344     bp->m_close_package =
345         new mp::Package(bp->m_session, frontend_package.origin());
346     bp->m_close_package->copy_filter(frontend_package);
347
348     Package init_package(bp->m_session, frontend_package.origin());
349
350     init_package.copy_filter(frontend_package);
351
352     yazpp_1::GDU actual_init_request = m_init_request;
353     Z_GDU *init_pdu = actual_init_request.get();
354
355     assert(init_pdu->which == Z_GDU_Z3950);
356     assert(init_pdu->u.z3950->which == Z_APDU_initRequest);
357
358     Z_InitRequest *req = init_pdu->u.z3950->u.initRequest;
359     ODR_MASK_ZERO(req->options);
360
361     ODR_MASK_SET(req->options, Z_Options_search);
362     ODR_MASK_SET(req->options, Z_Options_present);
363     ODR_MASK_SET(req->options, Z_Options_namedResultSets);
364     ODR_MASK_SET(req->options, Z_Options_scan);
365
366     ODR_MASK_SET(req->protocolVersion, Z_ProtocolVersion_1);
367     ODR_MASK_SET(req->protocolVersion, Z_ProtocolVersion_2);
368     ODR_MASK_SET(req->protocolVersion, Z_ProtocolVersion_3);
369
370     init_package.request() = init_pdu;
371
372     init_package.move();
373
374     boost::mutex::scoped_lock lock(m_mutex_backend_class);
375
376     m_named_result_sets = false;
377     Z_GDU *gdu = init_package.response().get();
378     if (!init_package.session().is_closed()
379         && gdu && gdu->which == Z_GDU_Z3950 
380         && gdu->u.z3950->which == Z_APDU_initResponse)
381     {
382         Z_InitResponse *res = gdu->u.z3950->u.initResponse;
383         if (!*res->result)
384             return null;
385         m_init_response = gdu->u.z3950;
386         if (ODR_MASK_GET(res->options, Z_Options_namedResultSets))
387         {
388             m_named_result_sets = true;
389         }
390     }
391     else
392     {
393         // did not receive an init response or closed
394         return null;
395     }
396     bp->m_in_use = true;
397     time(&bp->m_time_last_use);
398     bp->m_sequence_this = 0;
399     bp->m_result_set_sequence = 0;
400     m_backend_list.push_back(bp);
401
402     return bp;
403 }
404
405
406 yf::SessionShared::BackendClass::BackendClass(const yazpp_1::GDU &init_request,
407                                               int resultset_ttl,
408                                               int resultset_max,
409                                               int session_ttl)
410     : m_named_result_sets(false), m_init_request(init_request),
411       m_sequence_top(0), m_backend_set_ttl(resultset_ttl),
412       m_backend_expiry_ttl(session_ttl), m_backend_set_max(resultset_max)
413 {}
414
415 yf::SessionShared::BackendClass::~BackendClass()
416 {}
417
418 void yf::SessionShared::Rep::init(mp::Package &package, const Z_GDU *gdu,
419                                   FrontendPtr frontend)
420 {
421     Z_InitRequest *req = gdu->u.z3950->u.initRequest;
422
423     frontend->m_is_virtual = true;
424     frontend->m_init_options = *req->options;
425     InitKey k(req);
426     {
427         boost::mutex::scoped_lock lock(m_mutex_backend_map);
428         BackendClassMap::const_iterator it;
429         it = m_backend_map.find(k);
430         if (it == m_backend_map.end())
431         {
432             BackendClassPtr b(new BackendClass(gdu->u.z3950,
433                                                m_resultset_ttl,
434                                                m_resultset_max,
435                                                m_session_ttl));
436             m_backend_map[k] = b;
437             frontend->m_backend_class = b;
438         }
439         else
440         {
441             frontend->m_backend_class = it->second;            
442         }
443     }
444     BackendClassPtr bc = frontend->m_backend_class;
445     BackendInstancePtr backend = bc->get_backend(package);
446     
447     mp::odr odr;
448     if (!backend)
449     {
450         Z_APDU *apdu = odr.create_initResponse(gdu->u.z3950, 0, 0);
451         *apdu->u.initResponse->result = 0;
452         package.response() = apdu;
453         package.session().close();
454     }
455     else
456     {
457         boost::mutex::scoped_lock lock(bc->m_mutex_backend_class);
458         yazpp_1::GDU init_response = bc->m_init_response;
459         Z_GDU *response_gdu = init_response.get();
460         mp::util::transfer_referenceId(odr, gdu->u.z3950,
461                                        response_gdu->u.z3950);
462
463         Z_Options *server_options =
464             response_gdu->u.z3950->u.initResponse->options;
465         Z_Options *client_options = &frontend->m_init_options;
466
467         int i;
468         for (i = 0; i<30; i++)
469             if (!ODR_MASK_GET(client_options, i))
470                 ODR_MASK_CLEAR(server_options, i);
471         package.response() = init_response;
472     }
473     if (backend)
474         bc->release_backend(backend);
475 }
476
477 void yf::SessionShared::BackendSet::timestamp()
478 {
479     time(&m_time_last_use);
480 }
481
482 yf::SessionShared::BackendSet::BackendSet(
483     const std::string &result_set_id,
484     const Databases &databases,
485     const yazpp_1::Yaz_Z_Query &query) :
486     m_result_set_id(result_set_id),
487     m_databases(databases), m_result_set_size(0), m_query(query) 
488 {
489     timestamp();
490 }
491
492 bool yf::SessionShared::BackendSet::search(
493     mp::Package &frontend_package,
494     const Z_APDU *frontend_apdu,
495     const BackendInstancePtr bp,
496     bool & fatal_error)
497 {
498     Package search_package(bp->m_session, frontend_package.origin());
499
500     search_package.copy_filter(frontend_package);
501
502     mp::odr odr;
503     Z_APDU *apdu_req = zget_APDU(odr, Z_APDU_searchRequest);
504     Z_SearchRequest *req = apdu_req->u.searchRequest;
505
506     req->resultSetName = odr_strdup(odr, m_result_set_id.c_str());
507     req->query = m_query.get_Z_Query();
508
509     req->num_databaseNames = m_databases.size();
510     req->databaseNames = (char**) 
511         odr_malloc(odr, req->num_databaseNames * sizeof(char *));
512     Databases::const_iterator it = m_databases.begin();
513     size_t i = 0;
514     for (; it != m_databases.end(); it++)
515         req->databaseNames[i++] = odr_strdup(odr, it->c_str());
516
517     search_package.request() = apdu_req;
518
519     search_package.move();
520     fatal_error = false;  // assume backend session is good
521
522     Z_Records *z_records_diag = 0;
523     Z_GDU *gdu = search_package.response().get();
524     if (!search_package.session().is_closed()
525         && gdu && gdu->which == Z_GDU_Z3950 
526         && gdu->u.z3950->which == Z_APDU_searchResponse)
527     {
528         Z_SearchResponse *b_resp = gdu->u.z3950->u.searchResponse;
529         if (b_resp->records)
530         {
531             if (b_resp->records->which == Z_Records_NSD 
532                 || b_resp->records->which == Z_Records_multipleNSD)
533                 z_records_diag = b_resp->records;
534         }
535         if (z_records_diag)
536         {
537             // there could be diagnostics that are so bad.. that 
538             // we simply mark the error as fatal..  For now we assume
539             // we can resume
540             if (frontend_apdu->which == Z_APDU_searchRequest)
541             {
542                 Z_APDU *f_apdu = odr.create_searchResponse(frontend_apdu, 
543                                                            0, 0);
544                 Z_SearchResponse *f_resp = f_apdu->u.searchResponse;
545                 *f_resp->searchStatus = *b_resp->searchStatus;
546                 f_resp->records = z_records_diag;
547                 frontend_package.response() = f_apdu;
548                 return false;
549             }
550             if (frontend_apdu->which == Z_APDU_presentRequest)
551             {
552                 Z_APDU *f_apdu = odr.create_presentResponse(frontend_apdu, 
553                                                             0, 0);
554                 Z_PresentResponse *f_resp = f_apdu->u.presentResponse;
555                 f_resp->records = z_records_diag;
556                 frontend_package.response() = f_apdu;
557                 return false;
558             }
559         }
560         m_result_set_size = *b_resp->resultCount;
561         return true;
562     }
563     Z_APDU *f_apdu = 0;
564     if (frontend_apdu->which == Z_APDU_searchRequest)
565         f_apdu = odr.create_searchResponse(
566             frontend_apdu, YAZ_BIB1_TEMPORARY_SYSTEM_ERROR, 0);
567     else if (frontend_apdu->which == Z_APDU_presentRequest)
568         f_apdu = odr.create_presentResponse(
569             frontend_apdu, YAZ_BIB1_TEMPORARY_SYSTEM_ERROR, 0);
570     else
571         f_apdu = odr.create_close(
572             frontend_apdu, YAZ_BIB1_TEMPORARY_SYSTEM_ERROR, 0);
573     frontend_package.response() = f_apdu;
574     fatal_error = true; // weired response.. bad backend
575     return false;
576 }
577
578 void yf::SessionShared::Frontend::override_set(
579     BackendInstancePtr &found_backend,
580     std::string &result_set_id)
581 {
582     BackendClassPtr bc = m_backend_class;
583     BackendInstanceList::const_iterator it = bc->m_backend_list.begin();
584     time_t now;
585     time(&now);
586     
587     for (; it != bc->m_backend_list.end(); it++)
588     {
589         if (!(*it)->m_in_use)
590         {
591             BackendSetList::iterator set_it = (*it)->m_sets.begin();
592             for (; set_it != (*it)->m_sets.end(); set_it++)
593             {
594                 if (now >= (*set_it)->m_time_last_use &&
595                     now - (*set_it)->m_time_last_use > bc->m_backend_set_ttl)
596                 {
597                     found_backend = *it;
598                     result_set_id = (*set_it)->m_result_set_id;
599                     found_backend->m_sets.erase(set_it);
600                     return;
601                 }
602             }
603         }
604     }
605     size_t max_sets = bc->m_named_result_sets ? bc->m_backend_set_max : 1;
606     for (it = bc->m_backend_list.begin(); it != bc->m_backend_list.end(); it++)
607     {
608         if (!(*it)->m_in_use && (*it)->m_sets.size() < max_sets)
609         {
610             found_backend = *it;
611             if (bc->m_named_result_sets)
612             {
613                 result_set_id = boost::io::str(
614                     boost::format("%1%") % 
615                     found_backend->m_result_set_sequence);
616                 found_backend->m_result_set_sequence++;
617             }
618             else
619                 result_set_id = "default";
620             return;
621         }
622     }
623 }
624
625 void yf::SessionShared::Frontend::get_set(mp::Package &package,
626                                           const Z_APDU *apdu_req,
627                                           const Databases &databases,
628                                           yazpp_1::Yaz_Z_Query &query,
629                                           BackendInstancePtr &found_backend,
630                                           BackendSetPtr &found_set)
631 {
632     bool session_restarted = false;
633
634 restart:
635     std::string result_set_id;
636     BackendClassPtr bc = m_backend_class;
637     {
638         boost::mutex::scoped_lock lock(bc->m_mutex_backend_class);
639      
640         // look at each backend and see if we have a similar search
641         BackendInstanceList::const_iterator it = bc->m_backend_list.begin();
642         
643         for (; it != bc->m_backend_list.end(); it++)
644         {
645             if (!(*it)->m_in_use)
646             {
647                 BackendSetList::const_iterator set_it = (*it)->m_sets.begin();
648                 for (; set_it != (*it)->m_sets.end(); set_it++)
649                 {
650                     if ((*set_it)->m_databases == databases 
651                         && query.match(&(*set_it)->m_query))
652                     {
653                         found_set = *set_it;
654                         found_backend = *it;
655                         bc->use_backend(found_backend);
656                         found_set->timestamp();
657                         // found matching set. No need to search again
658                         return;
659                     }
660                 }
661             }
662         }
663         override_set(found_backend, result_set_id);
664         if (found_backend)
665             bc->use_backend(found_backend);
666     }
667     if (!found_backend)
668     {
669         // create a new backend set (and new set)
670         found_backend = bc->create_backend(package);
671
672         if (!found_backend)
673         {
674             Z_APDU *f_apdu = 0;
675             mp::odr odr;
676             if (apdu_req->which == Z_APDU_searchRequest)
677             {
678                 f_apdu = odr.create_searchResponse(
679                         apdu_req, YAZ_BIB1_TEMPORARY_SYSTEM_ERROR, 0);
680             }
681             else if (apdu_req->which == Z_APDU_presentRequest)
682             {
683                 f_apdu = odr.create_presentResponse(
684                     apdu_req, YAZ_BIB1_TEMPORARY_SYSTEM_ERROR, 0);
685             }
686             else
687             {
688                 f_apdu = odr.create_close(
689                     apdu_req, YAZ_BIB1_TEMPORARY_SYSTEM_ERROR, 0);
690             }
691             package.response() = f_apdu;
692             return;
693         }
694         if (bc->m_named_result_sets)
695         {
696             result_set_id = boost::io::str(
697                 boost::format("%1%") % found_backend->m_result_set_sequence);
698             found_backend->m_result_set_sequence++;
699         }
700         else
701             result_set_id = "default";
702     }
703     // we must search ...
704     BackendSetPtr new_set(new BackendSet(result_set_id,
705                                          databases, query));
706     bool fatal_error = false;
707     if (!new_set->search(package, apdu_req, found_backend, fatal_error))
708     {
709         if (fatal_error)
710             bc->remove_backend(found_backend);
711         else
712             bc->release_backend(found_backend);
713         return; // search error 
714     }
715     if (!session_restarted && new_set->m_result_set_size < 0)
716     {
717         bc->remove_backend(found_backend);
718         session_restarted = true;
719         found_backend.reset();
720         goto restart;
721     }
722
723     found_set = new_set;
724     found_set->timestamp();
725     found_backend->m_sets.push_back(found_set);
726 }
727
728 void yf::SessionShared::Frontend::search(mp::Package &package,
729                                          Z_APDU *apdu_req)
730 {
731     Z_SearchRequest *req = apdu_req->u.searchRequest;
732     FrontendSets::iterator fset_it = 
733         m_frontend_sets.find(req->resultSetName);
734     if (fset_it != m_frontend_sets.end())
735     {
736         // result set already exist 
737         // if replace indicator is off: we return diagnostic if
738         // result set already exist.
739         if (*req->replaceIndicator == 0)
740         {
741             mp::odr odr;
742             Z_APDU *apdu = 
743                 odr.create_searchResponse(
744                     apdu_req,
745                     YAZ_BIB1_RESULT_SET_EXISTS_AND_REPLACE_INDICATOR_OFF,
746                     0);
747             package.response() = apdu;
748             
749             return;
750         }
751         m_frontend_sets.erase(fset_it);
752     }
753     
754     yazpp_1::Yaz_Z_Query query;
755     query.set_Z_Query(req->query);
756     Databases databases;
757     int i;
758     for (i = 0; i<req->num_databaseNames; i++)
759         databases.push_back(req->databaseNames[i]);
760
761     BackendSetPtr found_set; // null
762     BackendInstancePtr found_backend; // null
763
764     get_set(package, apdu_req, databases, query, found_backend, found_set);
765     if (!found_set)
766         return;
767
768     mp::odr odr;
769     Z_APDU *f_apdu = odr.create_searchResponse(apdu_req, 0, 0);
770     Z_SearchResponse *f_resp = f_apdu->u.searchResponse;
771     *f_resp->resultCount = found_set->m_result_set_size;
772     package.response() = f_apdu;
773
774     FrontendSetPtr fset(new FrontendSet(databases, query));
775     m_frontend_sets[req->resultSetName] = fset;
776
777     m_backend_class->release_backend(found_backend);
778 }
779
780 void yf::SessionShared::Frontend::present(mp::Package &package,
781                                           Z_APDU *apdu_req)
782 {
783     mp::odr odr;
784     Z_PresentRequest *req = apdu_req->u.presentRequest;
785
786     FrontendSets::iterator fset_it = 
787         m_frontend_sets.find(req->resultSetId);
788
789     if (fset_it == m_frontend_sets.end())
790     {
791         Z_APDU *apdu = 
792             odr.create_presentResponse(
793                 apdu_req,
794                 YAZ_BIB1_SPECIFIED_RESULT_SET_DOES_NOT_EXIST,
795                 req->resultSetId);
796         package.response() = apdu;
797         return;
798     }
799     FrontendSetPtr fset = fset_it->second;
800
801     Databases databases = fset->get_databases();
802     yazpp_1::Yaz_Z_Query query = fset->get_query();
803
804     BackendClassPtr bc = m_backend_class;
805     BackendSetPtr found_set; // null
806     BackendInstancePtr found_backend;
807
808     get_set(package, apdu_req, databases, query, found_backend, found_set);
809     if (!found_set)
810         return;
811
812     Z_APDU *p_apdu = zget_APDU(odr, Z_APDU_presentRequest);
813     Z_PresentRequest *p_req = p_apdu->u.presentRequest;
814     p_req->preferredRecordSyntax = req->preferredRecordSyntax;
815     p_req->resultSetId = odr_strdup(odr, found_set->m_result_set_id.c_str());
816     *p_req->resultSetStartPoint = *req->resultSetStartPoint;
817     *p_req->numberOfRecordsRequested = *req->numberOfRecordsRequested;
818     p_req->preferredRecordSyntax = req->preferredRecordSyntax;
819     p_req->recordComposition = req->recordComposition;
820
821     Package present_package(found_backend->m_session, package.origin());
822     present_package.copy_filter(package);
823
824     present_package.request() = p_apdu;
825
826     present_package.move();
827
828     Z_GDU *gdu = present_package.response().get();
829     if (!present_package.session().is_closed()
830         && gdu && gdu->which == Z_GDU_Z3950 
831         && gdu->u.z3950->which == Z_APDU_presentResponse)
832     {
833         Z_PresentResponse *b_resp = gdu->u.z3950->u.presentResponse;
834         Z_APDU *f_apdu_res = odr.create_presentResponse(apdu_req, 0, 0);
835         Z_PresentResponse *f_resp = f_apdu_res->u.presentResponse;
836
837         f_resp->numberOfRecordsReturned = b_resp->numberOfRecordsReturned;
838         f_resp->nextResultSetPosition = b_resp->nextResultSetPosition;
839         f_resp->presentStatus= b_resp->presentStatus;
840         f_resp->records = b_resp->records;
841         f_resp->otherInfo = b_resp->otherInfo;
842         package.response() = f_apdu_res;
843         bc->release_backend(found_backend);
844     }
845     else
846     {
847         bc->remove_backend(found_backend);
848         Z_APDU *f_apdu_res = 
849             odr.create_presentResponse(
850                 apdu_req, YAZ_BIB1_TEMPORARY_SYSTEM_ERROR, 0);
851         package.response() = f_apdu_res;
852     }
853 }
854
855 void yf::SessionShared::Frontend::scan(mp::Package &frontend_package,
856                                        Z_APDU *apdu_req)
857 {
858     BackendClassPtr bc = m_backend_class;
859     BackendInstancePtr backend = bc->get_backend(frontend_package);
860     if (!backend)
861     {
862         mp::odr odr;
863         Z_APDU *apdu = odr.create_scanResponse(
864             apdu_req, YAZ_BIB1_TEMPORARY_SYSTEM_ERROR, 0);
865         frontend_package.response() = apdu;
866     }
867     else
868     {
869         Package scan_package(backend->m_session, frontend_package.origin());
870         scan_package.copy_filter(frontend_package);
871         scan_package.request() = apdu_req;
872         scan_package.move();
873         frontend_package.response() = scan_package.response();
874         if (scan_package.session().is_closed())
875         {
876             frontend_package.session().close();
877             bc->remove_backend(backend);
878         }
879         else
880             bc->release_backend(backend);
881     }
882 }
883
884 yf::SessionShared::Worker::Worker(SessionShared::Rep *rep) : m_p(rep)
885 {
886 }
887
888 void yf::SessionShared::Worker::operator() (void)
889 {
890     m_p->expire();
891 }
892
893 void yf::SessionShared::BackendClass::expire_class()
894 {
895     time_t now;
896     time(&now);
897     boost::mutex::scoped_lock lock(m_mutex_backend_class);
898     BackendInstanceList::iterator bit = m_backend_list.begin();
899     while (bit != m_backend_list.end())
900     {
901         time_t last_use = (*bit)->m_time_last_use;
902         
903         if ((*bit)->m_in_use)
904         {
905             bit++;
906         }
907         else if ((now >= last_use && now - last_use > m_backend_expiry_ttl)
908             || (now < last_use))
909         {
910             mp::odr odr;
911             (*bit)->m_close_package->response() = odr.create_close(
912                 0, Z_Close_lackOfActivity, 0);
913             (*bit)->m_close_package->session().close();
914             (*bit)->m_close_package->move();
915
916             bit = m_backend_list.erase(bit);
917         }
918         else
919         {
920             bit++;
921         }
922     }
923 }
924
925 void yf::SessionShared::Rep::expire()
926 {
927     while (true)
928     {
929         boost::xtime xt;
930         boost::xtime_get(&xt, boost::TIME_UTC);
931         xt.sec += 30;
932         boost::thread::sleep(xt);
933         
934         BackendClassMap::const_iterator b_it = m_backend_map.begin();
935         for (; b_it != m_backend_map.end(); b_it++)
936             b_it->second->expire_class();
937     }
938 }
939
940 yf::SessionShared::Rep::Rep()
941 {
942     m_resultset_ttl = 30;
943     m_resultset_max = 10;
944     m_session_ttl = 90;
945     yf::SessionShared::Worker w(this);
946     m_thrds.add_thread(new boost::thread(w));
947 }
948
949 yf::SessionShared::SessionShared() : m_p(new SessionShared::Rep)
950 {
951 }
952
953 yf::SessionShared::~SessionShared() {
954 }
955
956
957 yf::SessionShared::Frontend::Frontend(Rep *rep) : m_is_virtual(false), m_p(rep)
958 {
959 }
960
961 yf::SessionShared::Frontend::~Frontend()
962 {
963 }
964
965 yf::SessionShared::FrontendPtr yf::SessionShared::Rep::get_frontend(mp::Package &package)
966 {
967     boost::mutex::scoped_lock lock(m_mutex);
968
969     std::map<mp::Session,yf::SessionShared::FrontendPtr>::iterator it;
970     
971     while(true)
972     {
973         it = m_clients.find(package.session());
974         if (it == m_clients.end())
975             break;
976         
977         if (!it->second->m_in_use)
978         {
979             it->second->m_in_use = true;
980             return it->second;
981         }
982         m_cond_session_ready.wait(lock);
983     }
984     FrontendPtr f(new Frontend(this));
985     m_clients[package.session()] = f;
986     f->m_in_use = true;
987     return f;
988 }
989
990 void yf::SessionShared::Rep::release_frontend(mp::Package &package)
991 {
992     boost::mutex::scoped_lock lock(m_mutex);
993     std::map<mp::Session,yf::SessionShared::FrontendPtr>::iterator it;
994     
995     it = m_clients.find(package.session());
996     if (it != m_clients.end())
997     {
998         if (package.session().is_closed())
999         {
1000             m_clients.erase(it);
1001         }
1002         else
1003         {
1004             it->second->m_in_use = false;
1005         }
1006         m_cond_session_ready.notify_all();
1007     }
1008 }
1009
1010
1011 void yf::SessionShared::process(mp::Package &package) const
1012 {
1013     FrontendPtr f = m_p->get_frontend(package);
1014
1015     Z_GDU *gdu = package.request().get();
1016     
1017     if (gdu && gdu->which == Z_GDU_Z3950 && gdu->u.z3950->which ==
1018         Z_APDU_initRequest && !f->m_is_virtual)
1019     {
1020         m_p->init(package, gdu, f);
1021     }
1022     else if (!f->m_is_virtual)
1023         package.move();
1024     else if (gdu && gdu->which == Z_GDU_Z3950)
1025     {
1026         Z_APDU *apdu = gdu->u.z3950;
1027         if (apdu->which == Z_APDU_initRequest)
1028         {
1029             mp::odr odr;
1030             
1031             package.response() = odr.create_close(
1032                 apdu,
1033                 Z_Close_protocolError,
1034                 "double init");
1035             
1036             package.session().close();
1037         }
1038         else if (apdu->which == Z_APDU_close)
1039         {
1040             mp::odr odr;
1041             
1042             package.response() = odr.create_close(
1043                 apdu,
1044                 Z_Close_peerAbort, "received close from client");
1045             package.session().close();
1046         }
1047         else if (apdu->which == Z_APDU_searchRequest)
1048         {
1049             f->search(package, apdu);
1050         }
1051         else if (apdu->which == Z_APDU_presentRequest)
1052         {
1053             f->present(package, apdu);
1054         }
1055         else if (apdu->which == Z_APDU_scanRequest)
1056         {
1057             f->scan(package, apdu);
1058         }
1059         else
1060         {
1061             mp::odr odr;
1062             
1063             package.response() = odr.create_close(
1064                 apdu, Z_Close_protocolError,
1065                 "unsupported APDU in filter_session_shared");
1066             
1067             package.session().close();
1068         }
1069     }
1070     m_p->release_frontend(package);
1071 }
1072
1073 void yf::SessionShared::configure(const xmlNode *ptr, bool test_only)
1074 {
1075     for (ptr = ptr->children; ptr; ptr = ptr->next)
1076     {
1077         if (ptr->type != XML_ELEMENT_NODE)
1078             continue;
1079         if (!strcmp((const char *) ptr->name, "resultset"))
1080         {
1081             const struct _xmlAttr *attr;
1082             for (attr = ptr->properties; attr; attr = attr->next)
1083             {
1084                 if (!strcmp((const char *) attr->name, "ttl"))
1085                     m_p->m_resultset_ttl = 
1086                         mp::xml::get_int(attr->children, 30);
1087                 else if (!strcmp((const char *) attr->name, "max"))
1088                 {
1089                     m_p->m_resultset_max = 
1090                         mp::xml::get_int(attr->children, 10);
1091                 }
1092                 else
1093                     throw mp::filter::FilterException(
1094                         "Bad attribute " + std::string((const char *)
1095                                                        attr->name));
1096             }
1097         }
1098         else if (!strcmp((const char *) ptr->name, "session"))
1099         {
1100             const struct _xmlAttr *attr;
1101             for (attr = ptr->properties; attr; attr = attr->next)
1102             {
1103                 if (!strcmp((const char *) attr->name, "ttl"))
1104                     m_p->m_session_ttl = 
1105                         mp::xml::get_int(attr->children, 120);
1106                 else
1107                     throw mp::filter::FilterException(
1108                         "Bad attribute " + std::string((const char *)
1109                                                        attr->name));
1110             }
1111         }
1112         else
1113         {
1114             throw mp::filter::FilterException("Bad element " 
1115                                                + std::string((const char *)
1116                                                              ptr->name));
1117         }
1118     }
1119 }
1120
1121 static mp::filter::Base* filter_creator()
1122 {
1123     return new mp::filter::SessionShared;
1124 }
1125
1126 extern "C" {
1127     struct metaproxy_1_filter_struct metaproxy_1_filter_session_shared = {
1128         0,
1129         "session_shared",
1130         filter_creator
1131     };
1132 }
1133
1134 /*
1135  * Local variables:
1136  * c-basic-offset: 4
1137  * c-file-style: "Stroustrup"
1138  * indent-tabs-mode: nil
1139  * End:
1140  * vim: shiftwidth=4 tabstop=8 expandtab
1141  */
1142