session_shared: only reuse sets with matching db
[metaproxy-moved-to-github.git] / src / filter_session_shared.cpp
1 /* This file is part of Metaproxy.
2    Copyright (C) 2005-2012 Index Data
3
4 Metaproxy is free software; you can redistribute it and/or modify it under
5 the terms of the GNU General Public License as published by the Free
6 Software Foundation; either version 2, or (at your option) any later
7 version.
8
9 Metaproxy is distributed in the hope that it will be useful, but WITHOUT ANY
10 WARRANTY; without even the implied warranty of MERCHANTABILITY or
11 FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
12 for more details.
13
14 You should have received a copy of the GNU General Public License
15 along with this program; if not, write to the Free Software
16 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
17 */
18
19 #include "config.hpp"
20
21 #include <metaproxy/filter.hpp>
22 #include <metaproxy/package.hpp>
23
24 #include <boost/thread/mutex.hpp>
25 #include <boost/thread/condition.hpp>
26 #include <boost/thread/thread.hpp>
27 #include <boost/thread/xtime.hpp>
28 #include <boost/shared_ptr.hpp>
29 #include <boost/format.hpp>
30
31 #include <metaproxy/util.hpp>
32 #include "filter_session_shared.hpp"
33
34 #include <yaz/log.h>
35 #include <yaz/zgdu.h>
36 #include <yaz/otherinfo.h>
37 #include <yaz/diagbib1.h>
38 #include <yazpp/z-query.h>
39 #include <yazpp/record-cache.h>
40 #include <map>
41 #include <iostream>
42 #include <time.h>
43
44 namespace mp = metaproxy_1;
45 namespace yf = metaproxy_1::filter;
46
47 namespace metaproxy_1 {
48
49     namespace filter {
50         // key for session.. We'll only share sessions with same InitKey
51         class SessionShared::InitKey {
52         public:
53             bool operator < (const SessionShared::InitKey &k) const;
54             InitKey(Z_InitRequest *req);
55             InitKey(const InitKey &);
56             ~InitKey();
57         private:
58             char *m_idAuthentication_buf;
59             int m_idAuthentication_size;
60             char *m_otherInfo_buf;
61             int m_otherInfo_size;
62             ODR m_odr;
63         };
64         // worker thread .. for expiry of sessions
65         class SessionShared::Worker {
66         public:
67             Worker(SessionShared::Rep *rep);
68             void operator() (void);
69         private:
70             SessionShared::Rep *m_p;
71         };
72         // backend result set
73         class SessionShared::BackendSet {
74         public:
75             std::string m_result_set_id;
76             Databases m_databases;
77             int m_result_set_size;
78             yazpp_1::Yaz_Z_Query m_query;
79             time_t m_time_last_use;
80             void timestamp();
81             yazpp_1::RecordCache m_record_cache;
82             BackendSet(
83                 const std::string &result_set_id,
84                 const Databases &databases,
85                 const yazpp_1::Yaz_Z_Query &query);
86             bool search(
87                 Package &frontend_package,
88                 Package &search_package,
89                 const Z_APDU *apdu_req,
90                 const BackendInstancePtr bp,
91                 Z_Records **z_records);
92         };
93         // backend connection instance
94         class SessionShared::BackendInstance {
95             friend class Rep;
96             friend class BackendClass;
97             friend class BackendSet;
98         public:
99             mp::Session m_session;
100             BackendSetList m_sets;
101             bool m_in_use;
102             int m_sequence_this;
103             int m_result_set_sequence;
104             time_t m_time_last_use;
105             mp::Package * m_close_package;
106             ~BackendInstance();
107             void timestamp();
108         };
109         // backends of some class (all with same InitKey)
110         class SessionShared::BackendClass : boost::noncopyable {
111             friend class Rep;
112             friend struct Frontend;
113             bool m_named_result_sets;
114             BackendInstanceList m_backend_list;
115             BackendInstancePtr create_backend(const Package &package);
116             void remove_backend(BackendInstancePtr b);
117             BackendInstancePtr get_backend(const Package &package);
118             void use_backend(BackendInstancePtr b);
119             void release_backend(BackendInstancePtr b);
120             void expire_class();
121             yazpp_1::GDU m_init_request;
122             yazpp_1::GDU m_init_response;
123             boost::mutex m_mutex_backend_class;
124             int m_sequence_top;
125             time_t m_backend_set_ttl;
126             time_t m_backend_expiry_ttl;
127             size_t m_backend_set_max;
128         public:
129             BackendClass(const yazpp_1::GDU &init_request,
130                          int resultset_ttl,
131                          int resultset_max,
132                          int session_ttl);
133             ~BackendClass();
134         };
135         // frontend result set
136         class SessionShared::FrontendSet {
137             Databases m_databases;
138             yazpp_1::Yaz_Z_Query m_query;
139         public:
140             const Databases &get_databases();
141             const yazpp_1::Yaz_Z_Query &get_query();
142             FrontendSet(
143                 const Databases &databases,
144                 const yazpp_1::Yaz_Z_Query &query);
145             FrontendSet();
146         };
147         // frontend session
148         struct SessionShared::Frontend {
149             Frontend(Rep *rep);
150             ~Frontend();
151             bool m_is_virtual;
152             bool m_in_use;
153             Z_Options m_init_options;
154             void search(Package &package, Z_APDU *apdu);
155             void present(Package &package, Z_APDU *apdu);
156             void scan(Package &package, Z_APDU *apdu);
157
158             void get_set(mp::Package &package,
159                          const Z_APDU *apdu_req,
160                          const Databases &databases,
161                          yazpp_1::Yaz_Z_Query &query,
162                          BackendInstancePtr &found_backend,
163                          BackendSetPtr &found_set);
164             void override_set(BackendInstancePtr &found_backend,
165                               std::string &result_set_id,
166                               const Databases &databases);
167
168             Rep *m_p;
169             BackendClassPtr m_backend_class;
170             FrontendSets m_frontend_sets;
171         };            
172         // representation
173         class SessionShared::Rep {
174             friend class SessionShared;
175             friend struct Frontend;
176             
177             FrontendPtr get_frontend(Package &package);
178             void release_frontend(Package &package);
179             Rep();
180         public:
181             void expire();
182         private:
183             void init(Package &package, const Z_GDU *gdu,
184                       FrontendPtr frontend);
185             void start();
186             boost::mutex m_mutex;
187             boost::condition m_cond_session_ready;
188             std::map<mp::Session, FrontendPtr> m_clients;
189
190             BackendClassMap m_backend_map;
191             boost::mutex m_mutex_backend_map;
192             boost::thread_group m_thrds;
193             int m_resultset_ttl;
194             int m_resultset_max;
195             int m_session_ttl;
196             bool m_optimize_search;
197         };
198     }
199 }
200
201 yf::SessionShared::FrontendSet::FrontendSet(
202     const Databases &databases,
203     const yazpp_1::Yaz_Z_Query &query)
204     : m_databases(databases), m_query(query)
205 {
206 }
207
208 const yf::SessionShared::Databases & 
209 yf::SessionShared::FrontendSet::get_databases()
210 {
211     return m_databases;
212 }
213
214 const yazpp_1::Yaz_Z_Query& yf::SessionShared::FrontendSet::get_query()
215 {
216     return m_query;
217 }
218
219 yf::SessionShared::InitKey::InitKey(const InitKey &k)
220 {
221     m_odr = odr_createmem(ODR_ENCODE);
222     
223     m_idAuthentication_size =  k.m_idAuthentication_size;
224     m_idAuthentication_buf = (char*)odr_malloc(m_odr, m_idAuthentication_size);
225     memcpy(m_idAuthentication_buf, k.m_idAuthentication_buf,
226            m_idAuthentication_size);
227
228     m_otherInfo_size =  k.m_otherInfo_size;
229     m_otherInfo_buf = (char*)odr_malloc(m_odr, m_otherInfo_size);
230     memcpy(m_otherInfo_buf, k.m_otherInfo_buf,
231            m_otherInfo_size);
232 }
233
234 yf::SessionShared::InitKey::InitKey(Z_InitRequest *req)
235 {
236     m_odr = odr_createmem(ODR_ENCODE);
237
238     Z_IdAuthentication *t = req->idAuthentication;
239     z_IdAuthentication(m_odr, &t, 1, 0);
240     m_idAuthentication_buf =
241         odr_getbuf(m_odr, &m_idAuthentication_size, 0);
242
243     Z_OtherInformation *o = req->otherInfo;
244     z_OtherInformation(m_odr, &o, 1, 0);
245     m_otherInfo_buf = odr_getbuf(m_odr, &m_otherInfo_size, 0);
246 }
247
248 yf::SessionShared::InitKey::~InitKey()
249 {
250     odr_destroy(m_odr);
251 }
252
253 bool yf::SessionShared::InitKey::operator < (const SessionShared::InitKey &k)
254     const 
255 {
256     int c;
257     c = mp::util::memcmp2(
258         (void*) m_idAuthentication_buf, m_idAuthentication_size,
259         (void*) k.m_idAuthentication_buf, k.m_idAuthentication_size);
260     if (c < 0)
261         return true;
262     else if (c > 0)
263         return false;
264
265     c = mp::util::memcmp2((void*) m_otherInfo_buf, m_otherInfo_size,
266                           (void*) k.m_otherInfo_buf, k.m_otherInfo_size);
267     if (c < 0)
268         return true;
269     else if (c > 0)
270         return false;
271     return false;
272 }
273
274 void yf::SessionShared::BackendClass::release_backend(BackendInstancePtr b)
275 {
276     boost::mutex::scoped_lock lock(m_mutex_backend_class);
277     b->m_in_use = false;
278 }
279
280
281 void yf::SessionShared::BackendClass::remove_backend(BackendInstancePtr b)
282 {
283     BackendInstanceList::iterator it = m_backend_list.begin();
284     
285     while (it != m_backend_list.end())
286     {
287         if (*it == b)
288         {
289              mp::odr odr;
290             (*it)->m_close_package->response() = odr.create_close(
291                 0, Z_Close_lackOfActivity, 0);
292             (*it)->m_close_package->session().close();
293             (*it)->m_close_package->move();
294             
295             it = m_backend_list.erase(it);
296         }
297         else
298             it++;
299     }
300 }
301
302
303
304 yf::SessionShared::BackendInstancePtr 
305 yf::SessionShared::BackendClass::get_backend(
306     const mp::Package &frontend_package)
307 {
308     {
309         boost::mutex::scoped_lock lock(m_mutex_backend_class);
310         
311         BackendInstanceList::const_iterator it = m_backend_list.begin();
312         
313         BackendInstancePtr backend1; // null
314         
315         for (; it != m_backend_list.end(); it++)
316         {
317             if (!(*it)->m_in_use)
318             {
319                 if (!backend1 
320                     || (*it)->m_sequence_this < backend1->m_sequence_this)
321                     backend1 = *it;
322             }
323         }
324         if (backend1)
325         {
326             use_backend(backend1);
327             return backend1;
328         }
329     }
330     return create_backend(frontend_package);
331 }
332
333 void yf::SessionShared::BackendClass::use_backend(BackendInstancePtr backend)
334 {
335     backend->m_in_use = true;
336     backend->m_sequence_this = m_sequence_top++;
337 }
338
339 void yf::SessionShared::BackendInstance::timestamp()
340 {
341     assert(m_in_use);
342     time(&m_time_last_use);
343 }
344
345 yf::SessionShared::BackendInstance::~BackendInstance()
346 {
347     delete m_close_package;
348 }
349
350 yf::SessionShared::BackendInstancePtr yf::SessionShared::BackendClass::create_backend(
351     const mp::Package &frontend_package)
352 {
353     BackendInstancePtr bp(new BackendInstance);
354     BackendInstancePtr null;
355
356     bp->m_close_package =
357         new mp::Package(bp->m_session, frontend_package.origin());
358     bp->m_close_package->copy_filter(frontend_package);
359
360     Package init_package(bp->m_session, frontend_package.origin());
361
362     init_package.copy_filter(frontend_package);
363
364     yazpp_1::GDU actual_init_request = m_init_request;
365     Z_GDU *init_pdu = actual_init_request.get();
366
367     assert(init_pdu->which == Z_GDU_Z3950);
368     assert(init_pdu->u.z3950->which == Z_APDU_initRequest);
369
370     Z_InitRequest *req = init_pdu->u.z3950->u.initRequest;
371     ODR_MASK_ZERO(req->options);
372
373     ODR_MASK_SET(req->options, Z_Options_search);
374     ODR_MASK_SET(req->options, Z_Options_present);
375     ODR_MASK_SET(req->options, Z_Options_namedResultSets);
376     ODR_MASK_SET(req->options, Z_Options_scan);
377
378     ODR_MASK_SET(req->protocolVersion, Z_ProtocolVersion_1);
379     ODR_MASK_SET(req->protocolVersion, Z_ProtocolVersion_2);
380     ODR_MASK_SET(req->protocolVersion, Z_ProtocolVersion_3);
381
382     init_package.request() = init_pdu;
383
384     init_package.move();
385
386     boost::mutex::scoped_lock lock(m_mutex_backend_class);
387
388     m_named_result_sets = false;
389     Z_GDU *gdu = init_package.response().get();
390     if (init_package.session().is_closed())
391     {
392         /* already closed. We don't know why */
393         return null;
394     }
395     else if (gdu && gdu->which == Z_GDU_Z3950 
396              && gdu->u.z3950->which == Z_APDU_initResponse
397              && *gdu->u.z3950->u.initResponse->result)
398     {
399         /* successful init response */
400         Z_InitResponse *res = gdu->u.z3950->u.initResponse;
401         m_init_response = gdu->u.z3950;
402         if (ODR_MASK_GET(res->options, Z_Options_namedResultSets))
403         {
404             m_named_result_sets = true;
405         }
406     }
407     else
408     {
409         /* not init or init rejected */
410         init_package.copy_filter(frontend_package);
411         init_package.session().close();
412         init_package.move();
413         return null;
414     }
415     bp->m_in_use = true;
416     time(&bp->m_time_last_use);
417     bp->m_sequence_this = 0;
418     bp->m_result_set_sequence = 0;
419     m_backend_list.push_back(bp);
420
421     return bp;
422 }
423
424
425 yf::SessionShared::BackendClass::BackendClass(const yazpp_1::GDU &init_request,
426                                               int resultset_ttl,
427                                               int resultset_max,
428                                               int session_ttl)
429     : m_named_result_sets(false), m_init_request(init_request),
430       m_sequence_top(0), m_backend_set_ttl(resultset_ttl),
431       m_backend_expiry_ttl(session_ttl), m_backend_set_max(resultset_max)
432 {}
433
434 yf::SessionShared::BackendClass::~BackendClass()
435 {}
436
437 void yf::SessionShared::Rep::init(mp::Package &package, const Z_GDU *gdu,
438                                   FrontendPtr frontend)
439 {
440     Z_InitRequest *req = gdu->u.z3950->u.initRequest;
441
442     frontend->m_is_virtual = true;
443     frontend->m_init_options = *req->options;
444     InitKey k(req);
445     {
446         boost::mutex::scoped_lock lock(m_mutex_backend_map);
447         BackendClassMap::const_iterator it;
448         it = m_backend_map.find(k);
449         if (it == m_backend_map.end())
450         {
451             BackendClassPtr b(new BackendClass(gdu->u.z3950,
452                                                m_resultset_ttl,
453                                                m_resultset_max,
454                                                m_session_ttl));
455             m_backend_map[k] = b;
456             frontend->m_backend_class = b;
457         }
458         else
459         {
460             frontend->m_backend_class = it->second;            
461         }
462     }
463     BackendClassPtr bc = frontend->m_backend_class;
464     BackendInstancePtr backend = bc->get_backend(package);
465     
466     mp::odr odr;
467     if (!backend)
468     {
469         Z_APDU *apdu = odr.create_initResponse(gdu->u.z3950, 0, 0);
470         *apdu->u.initResponse->result = 0;
471         package.response() = apdu;
472         package.session().close();
473     }
474     else
475     {
476         boost::mutex::scoped_lock lock(bc->m_mutex_backend_class);
477         yazpp_1::GDU init_response = bc->m_init_response;
478         Z_GDU *response_gdu = init_response.get();
479         mp::util::transfer_referenceId(odr, gdu->u.z3950,
480                                        response_gdu->u.z3950);
481
482         Z_Options *server_options =
483             response_gdu->u.z3950->u.initResponse->options;
484         Z_Options *client_options = &frontend->m_init_options;
485
486         int i;
487         for (i = 0; i<30; i++)
488             if (!ODR_MASK_GET(client_options, i))
489                 ODR_MASK_CLEAR(server_options, i);
490         package.response() = init_response;
491     }
492     if (backend)
493         bc->release_backend(backend);
494 }
495
496 void yf::SessionShared::BackendSet::timestamp()
497 {
498     time(&m_time_last_use);
499 }
500
501 yf::SessionShared::BackendSet::BackendSet(
502     const std::string &result_set_id,
503     const Databases &databases,
504     const yazpp_1::Yaz_Z_Query &query) :
505     m_result_set_id(result_set_id),
506     m_databases(databases), m_result_set_size(0), m_query(query) 
507 {
508     timestamp();
509 }
510
511 static int get_diagnostic(Z_DefaultDiagFormat *r)
512 {
513     return *r->condition;
514 }
515
516 bool yf::SessionShared::BackendSet::search(
517     mp::Package &frontend_package,
518     mp::Package &search_package,
519     const Z_APDU *frontend_apdu,
520     const BackendInstancePtr bp,
521     Z_Records **z_records)
522 {
523     mp::odr odr;
524     Z_APDU *apdu_req = zget_APDU(odr, Z_APDU_searchRequest);
525     Z_SearchRequest *req = apdu_req->u.searchRequest;
526
527     req->resultSetName = odr_strdup(odr, m_result_set_id.c_str());
528     req->query = m_query.get_Z_Query();
529
530     req->num_databaseNames = m_databases.size();
531     req->databaseNames = (char**) 
532         odr_malloc(odr, req->num_databaseNames * sizeof(char *));
533     Databases::const_iterator it = m_databases.begin();
534     size_t i = 0;
535     for (; it != m_databases.end(); it++)
536         req->databaseNames[i++] = odr_strdup(odr, it->c_str());
537
538     if (frontend_apdu->which == Z_APDU_searchRequest)
539         req->preferredRecordSyntax =
540             frontend_apdu->u.searchRequest->preferredRecordSyntax;
541
542     search_package.request() = apdu_req;
543
544     search_package.move();
545
546     Z_GDU *gdu = search_package.response().get();
547     if (!search_package.session().is_closed()
548         && gdu && gdu->which == Z_GDU_Z3950 
549         && gdu->u.z3950->which == Z_APDU_searchResponse)
550     {
551         Z_SearchResponse *b_resp = gdu->u.z3950->u.searchResponse;
552         *z_records = b_resp->records;
553         m_result_set_size = *b_resp->resultCount;
554         return true;
555     }
556     Z_APDU *f_apdu = 0;
557     if (frontend_apdu->which == Z_APDU_searchRequest)
558         f_apdu = odr.create_searchResponse(
559             frontend_apdu, YAZ_BIB1_TEMPORARY_SYSTEM_ERROR, 0);
560     else if (frontend_apdu->which == Z_APDU_presentRequest)
561         f_apdu = odr.create_presentResponse(
562             frontend_apdu, YAZ_BIB1_TEMPORARY_SYSTEM_ERROR, 0);
563     else
564         f_apdu = odr.create_close(
565             frontend_apdu, YAZ_BIB1_TEMPORARY_SYSTEM_ERROR, 0);
566     frontend_package.response() = f_apdu;
567     return false;
568 }
569
570 void yf::SessionShared::Frontend::override_set(
571     BackendInstancePtr &found_backend,
572     std::string &result_set_id,
573     const Databases &databases)
574 {
575     BackendClassPtr bc = m_backend_class;
576     BackendInstanceList::const_iterator it = bc->m_backend_list.begin();
577     time_t now;
578     time(&now);
579
580     size_t max_sets = bc->m_named_result_sets ? bc->m_backend_set_max : 1;
581     for (; it != bc->m_backend_list.end(); it++)
582     {
583         if (!(*it)->m_in_use)
584         {
585             BackendSetList::iterator set_it = (*it)->m_sets.begin();
586             for (; set_it != (*it)->m_sets.end(); set_it++)
587             {
588                 if ((max_sets > 1 || (*set_it)->m_databases == databases)
589                     &&
590                     (now < (*set_it)->m_time_last_use ||
591                      now - (*set_it)->m_time_last_use >= bc->m_backend_set_ttl))
592                 {
593                     found_backend = *it;
594                     result_set_id = (*set_it)->m_result_set_id;
595                     found_backend->m_sets.erase(set_it);
596                     return;
597                 }
598             }
599         }
600     }
601     for (it = bc->m_backend_list.begin(); it != bc->m_backend_list.end(); it++)
602     {
603         if (!(*it)->m_in_use && (*it)->m_sets.size() < max_sets)
604         {
605             found_backend = *it;
606             if (bc->m_named_result_sets)
607             {
608                 result_set_id = boost::io::str(
609                     boost::format("%1%") % 
610                     found_backend->m_result_set_sequence);
611                 found_backend->m_result_set_sequence++;
612             }
613             else
614                 result_set_id = "default";
615             return;
616         }
617     }
618 }
619
620 void yf::SessionShared::Frontend::get_set(mp::Package &package,
621                                           const Z_APDU *apdu_req,
622                                           const Databases &databases,
623                                           yazpp_1::Yaz_Z_Query &query,
624                                           BackendInstancePtr &found_backend,
625                                           BackendSetPtr &found_set)
626 {
627     bool session_restarted = false;
628
629 restart:
630     std::string result_set_id;
631     BackendClassPtr bc = m_backend_class;
632     {
633         boost::mutex::scoped_lock lock(bc->m_mutex_backend_class);
634         
635         if (m_p->m_optimize_search)
636         {
637             // look at each backend and see if we have a similar search
638             BackendInstanceList::const_iterator it = bc->m_backend_list.begin();
639             for (; it != bc->m_backend_list.end(); it++)
640             {
641                 if (!(*it)->m_in_use)
642                 {
643                     BackendSetList::const_iterator set_it = (*it)->m_sets.begin();
644                     for (; set_it != (*it)->m_sets.end(); set_it++)
645                     {
646                         if ((*set_it)->m_databases == databases
647                             && query.match(&(*set_it)->m_query))
648                         {
649                             found_set = *set_it;
650                             found_backend = *it;
651                             bc->use_backend(found_backend);
652                             found_set->timestamp();
653                             // found matching set. No need to search again
654                             return;
655                         }
656                     }
657                 }
658             }
659         }
660         override_set(found_backend, result_set_id, databases);
661         if (found_backend)
662             bc->use_backend(found_backend);
663     }
664     if (!found_backend)
665     {
666         // create a new backend set (and new set)
667         found_backend = bc->create_backend(package);
668
669         if (!found_backend)
670         {
671             Z_APDU *f_apdu = 0;
672             mp::odr odr;
673             if (apdu_req->which == Z_APDU_searchRequest)
674             {
675                 f_apdu = odr.create_searchResponse(
676                         apdu_req, YAZ_BIB1_TEMPORARY_SYSTEM_ERROR, 0);
677             }
678             else if (apdu_req->which == Z_APDU_presentRequest)
679             {
680                 f_apdu = odr.create_presentResponse(
681                     apdu_req, YAZ_BIB1_TEMPORARY_SYSTEM_ERROR, 0);
682             }
683             else
684             {
685                 f_apdu = odr.create_close(
686                     apdu_req, YAZ_BIB1_TEMPORARY_SYSTEM_ERROR, 0);
687             }
688             package.response() = f_apdu;
689             return;
690         }
691         if (bc->m_named_result_sets)
692         {
693             result_set_id = boost::io::str(
694                 boost::format("%1%") % found_backend->m_result_set_sequence);
695             found_backend->m_result_set_sequence++;
696         }
697         else
698             result_set_id = "default";
699     }
700     found_backend->timestamp();
701
702     // we must search ...
703     BackendSetPtr new_set(new BackendSet(result_set_id,
704                                          databases, query));
705     Z_Records *z_records = 0;
706
707     Package search_package(found_backend->m_session, package.origin());
708     search_package.copy_filter(package);
709
710     if (!new_set->search(package, search_package,
711                          apdu_req, found_backend, &z_records))
712     {
713         bc->remove_backend(found_backend);
714         return; // search error 
715     }
716
717     if (z_records)
718     {
719         int condition = 0;
720         if (z_records->which == Z_Records_NSD)
721         {
722             condition =
723                 get_diagnostic(z_records->u.nonSurrogateDiagnostic);
724         }
725         else if (z_records->which == Z_Records_multipleNSD)
726         {
727             if (z_records->u.multipleNonSurDiagnostics->num_diagRecs >= 1
728                 && 
729                 
730                 z_records->u.multipleNonSurDiagnostics->diagRecs[0]->which ==
731                 Z_DiagRec_defaultFormat)
732             {
733                 condition = get_diagnostic(
734                     z_records->u.multipleNonSurDiagnostics->diagRecs[0]->u.defaultFormat);
735                 
736             }
737         }
738         if (!session_restarted &&
739             condition == YAZ_BIB1_TEMPORARY_SYSTEM_ERROR)
740         {
741             bc->remove_backend(found_backend);
742             session_restarted = true;
743             found_backend.reset();
744             goto restart;
745
746         }
747
748         if (condition)
749         {
750             mp::odr odr;
751             if (apdu_req->which == Z_APDU_searchRequest)
752             {
753                 Z_APDU *f_apdu = odr.create_searchResponse(apdu_req, 
754                                                            0, 0);
755                 Z_SearchResponse *f_resp = f_apdu->u.searchResponse;
756                 *f_resp->searchStatus = Z_SearchResponse_none;
757                 f_resp->records = z_records;
758                 package.response() = f_apdu;
759             }
760             if (apdu_req->which == Z_APDU_presentRequest)
761             {
762                 Z_APDU *f_apdu = odr.create_presentResponse(apdu_req, 
763                                                             0, 0);
764                 Z_PresentResponse *f_resp = f_apdu->u.presentResponse;
765                 f_resp->records = z_records;
766                 package.response() = f_apdu;
767             }
768             bc->release_backend(found_backend);
769             return; // search error 
770         }
771     }
772     if (!session_restarted && new_set->m_result_set_size < 0)
773     {
774         bc->remove_backend(found_backend);
775         session_restarted = true;
776         found_backend.reset();
777         goto restart;
778     }
779
780     found_set = new_set;
781     found_set->timestamp();
782     found_backend->m_sets.push_back(found_set);
783 }
784
785 void yf::SessionShared::Frontend::search(mp::Package &package,
786                                          Z_APDU *apdu_req)
787 {
788     Z_SearchRequest *req = apdu_req->u.searchRequest;
789     FrontendSets::iterator fset_it = 
790         m_frontend_sets.find(req->resultSetName);
791     if (fset_it != m_frontend_sets.end())
792     {
793         // result set already exist 
794         // if replace indicator is off: we return diagnostic if
795         // result set already exist.
796         if (*req->replaceIndicator == 0)
797         {
798             mp::odr odr;
799             Z_APDU *apdu = 
800                 odr.create_searchResponse(
801                     apdu_req,
802                     YAZ_BIB1_RESULT_SET_EXISTS_AND_REPLACE_INDICATOR_OFF,
803                     0);
804             package.response() = apdu;
805             
806             return;
807         }
808         m_frontend_sets.erase(fset_it);
809     }
810     
811     yazpp_1::Yaz_Z_Query query;
812     query.set_Z_Query(req->query);
813     Databases databases;
814     int i;
815     for (i = 0; i<req->num_databaseNames; i++)
816         databases.push_back(req->databaseNames[i]);
817
818     BackendSetPtr found_set; // null
819     BackendInstancePtr found_backend; // null
820
821     get_set(package, apdu_req, databases, query, found_backend, found_set);
822     if (!found_set)
823         return;
824
825     mp::odr odr;
826     Z_APDU *f_apdu = odr.create_searchResponse(apdu_req, 0, 0);
827     Z_SearchResponse *f_resp = f_apdu->u.searchResponse;
828     *f_resp->resultCount = found_set->m_result_set_size;
829     package.response() = f_apdu;
830
831     FrontendSetPtr fset(new FrontendSet(databases, query));
832     m_frontend_sets[req->resultSetName] = fset;
833
834     m_backend_class->release_backend(found_backend);
835 }
836
837 void yf::SessionShared::Frontend::present(mp::Package &package,
838                                           Z_APDU *apdu_req)
839 {
840     mp::odr odr;
841     Z_PresentRequest *req = apdu_req->u.presentRequest;
842
843     FrontendSets::iterator fset_it = 
844         m_frontend_sets.find(req->resultSetId);
845
846     if (fset_it == m_frontend_sets.end())
847     {
848         Z_APDU *apdu = 
849             odr.create_presentResponse(
850                 apdu_req,
851                 YAZ_BIB1_SPECIFIED_RESULT_SET_DOES_NOT_EXIST,
852                 req->resultSetId);
853         package.response() = apdu;
854         return;
855     }
856     FrontendSetPtr fset = fset_it->second;
857
858     Databases databases = fset->get_databases();
859     yazpp_1::Yaz_Z_Query query = fset->get_query();
860
861     BackendClassPtr bc = m_backend_class;
862     BackendSetPtr found_set; // null
863     BackendInstancePtr found_backend;
864
865     get_set(package, apdu_req, databases, query, found_backend, found_set);
866     if (!found_set)
867         return;
868
869     Z_NamePlusRecordList *npr_res = 0;
870     if (found_set->m_record_cache.lookup(odr, &npr_res, 
871                                          *req->resultSetStartPoint,
872                                          *req->numberOfRecordsRequested,
873                                          req->preferredRecordSyntax,
874                                          req->recordComposition))
875     {
876         Z_APDU *f_apdu_res = odr.create_presentResponse(apdu_req, 0, 0);
877         Z_PresentResponse *f_resp = f_apdu_res->u.presentResponse;
878
879         yaz_log(YLOG_LOG, "Found " ODR_INT_PRINTF "+" ODR_INT_PRINTF 
880                 " records in cache %p",
881                 *req->resultSetStartPoint,                      
882                 *req->numberOfRecordsRequested,
883                 &found_set->m_record_cache);        
884
885         *f_resp->numberOfRecordsReturned = *req->numberOfRecordsRequested;
886         *f_resp->nextResultSetPosition = 
887             *req->resultSetStartPoint + *req->numberOfRecordsRequested;
888         // f_resp->presentStatus assumed OK.
889         f_resp->records = (Z_Records *) odr_malloc(odr, sizeof(Z_Records));
890         f_resp->records->which = Z_Records_DBOSD;
891         f_resp->records->u.databaseOrSurDiagnostics = npr_res;
892         package.response() = f_apdu_res;
893         bc->release_backend(found_backend);
894         return;
895     }
896
897     found_backend->timestamp();
898                               
899     Z_APDU *p_apdu = zget_APDU(odr, Z_APDU_presentRequest);
900     Z_PresentRequest *p_req = p_apdu->u.presentRequest;
901     p_req->preferredRecordSyntax = req->preferredRecordSyntax;
902     p_req->resultSetId = odr_strdup(odr, found_set->m_result_set_id.c_str());
903     *p_req->resultSetStartPoint = *req->resultSetStartPoint;
904     *p_req->numberOfRecordsRequested = *req->numberOfRecordsRequested;
905     p_req->preferredRecordSyntax = req->preferredRecordSyntax;
906     p_req->recordComposition = req->recordComposition;
907
908     Package present_package(found_backend->m_session, package.origin());
909     present_package.copy_filter(package);
910
911     present_package.request() = p_apdu;
912
913     present_package.move();
914
915     Z_GDU *gdu = present_package.response().get();
916     if (!present_package.session().is_closed()
917         && gdu && gdu->which == Z_GDU_Z3950 
918         && gdu->u.z3950->which == Z_APDU_presentResponse)
919     {
920         Z_PresentResponse *b_resp = gdu->u.z3950->u.presentResponse;
921         Z_APDU *f_apdu_res = odr.create_presentResponse(apdu_req, 0, 0);
922         Z_PresentResponse *f_resp = f_apdu_res->u.presentResponse;
923
924         f_resp->numberOfRecordsReturned = b_resp->numberOfRecordsReturned;
925         f_resp->nextResultSetPosition = b_resp->nextResultSetPosition;
926         f_resp->presentStatus= b_resp->presentStatus;
927         f_resp->records = b_resp->records;
928         f_resp->otherInfo = b_resp->otherInfo;
929         package.response() = f_apdu_res;
930
931         if (b_resp->records && b_resp->records->which ==  Z_Records_DBOSD)
932         {
933             yaz_log(YLOG_LOG, "Adding " ODR_INT_PRINTF "+" ODR_INT_PRINTF
934                     " records to cache %p",
935                     *req->resultSetStartPoint,                      
936                     *f_resp->numberOfRecordsReturned,
937                     &found_set->m_record_cache);        
938             found_set->m_record_cache.add(
939                 odr,
940                 b_resp->records->u.databaseOrSurDiagnostics,
941                 *req->resultSetStartPoint,                      
942                 *f_resp->numberOfRecordsReturned);
943         }
944         bc->release_backend(found_backend);
945     }
946     else
947     {
948         bc->remove_backend(found_backend);
949         Z_APDU *f_apdu_res = 
950             odr.create_presentResponse(
951                 apdu_req, YAZ_BIB1_TEMPORARY_SYSTEM_ERROR, 0);
952         package.response() = f_apdu_res;
953     }
954 }
955
956 void yf::SessionShared::Frontend::scan(mp::Package &frontend_package,
957                                        Z_APDU *apdu_req)
958 {
959     BackendClassPtr bc = m_backend_class;
960     BackendInstancePtr backend = bc->get_backend(frontend_package);
961     if (!backend)
962     {
963         mp::odr odr;
964         Z_APDU *apdu = odr.create_scanResponse(
965             apdu_req, YAZ_BIB1_TEMPORARY_SYSTEM_ERROR, 0);
966         frontend_package.response() = apdu;
967     }
968     else
969     {
970         Package scan_package(backend->m_session, frontend_package.origin());
971         backend->timestamp();
972         scan_package.copy_filter(frontend_package);
973         scan_package.request() = apdu_req;
974         scan_package.move();
975         frontend_package.response() = scan_package.response();
976         if (scan_package.session().is_closed())
977         {
978             frontend_package.session().close();
979             bc->remove_backend(backend);
980         }
981         else
982             bc->release_backend(backend);
983     }
984 }
985
986 yf::SessionShared::Worker::Worker(SessionShared::Rep *rep) : m_p(rep)
987 {
988 }
989
990 void yf::SessionShared::Worker::operator() (void)
991 {
992     m_p->expire();
993 }
994
995 void yf::SessionShared::BackendClass::expire_class()
996 {
997     time_t now;
998     time(&now);
999     boost::mutex::scoped_lock lock(m_mutex_backend_class);
1000     BackendInstanceList::iterator bit = m_backend_list.begin();
1001     while (bit != m_backend_list.end())
1002     {
1003         time_t last_use = (*bit)->m_time_last_use;
1004         
1005         if ((*bit)->m_in_use)
1006         {
1007             bit++;
1008         }
1009         else if (now < last_use || now - last_use > m_backend_expiry_ttl)
1010         {
1011             mp::odr odr;
1012             (*bit)->m_close_package->response() = odr.create_close(
1013                 0, Z_Close_lackOfActivity, 0);
1014             (*bit)->m_close_package->session().close();
1015             (*bit)->m_close_package->move();
1016
1017             bit = m_backend_list.erase(bit);
1018         }
1019         else
1020         {
1021             bit++;
1022         }
1023     }
1024 }
1025
1026 void yf::SessionShared::Rep::expire()
1027 {
1028     while (true)
1029     {
1030         boost::xtime xt;
1031         boost::xtime_get(&xt, boost::TIME_UTC);
1032         xt.sec += m_session_ttl / 3;
1033         boost::thread::sleep(xt);
1034         
1035         BackendClassMap::const_iterator b_it = m_backend_map.begin();
1036         for (; b_it != m_backend_map.end(); b_it++)
1037             b_it->second->expire_class();
1038     }
1039 }
1040
1041 yf::SessionShared::Rep::Rep()
1042 {
1043     m_resultset_ttl = 30;
1044     m_resultset_max = 10;
1045     m_session_ttl = 90;
1046     m_optimize_search = true;
1047 }
1048
1049 void yf::SessionShared::Rep::start()
1050 {
1051     yf::SessionShared::Worker w(this);
1052     m_thrds.add_thread(new boost::thread(w));
1053 }
1054
1055 yf::SessionShared::SessionShared() : m_p(new SessionShared::Rep)
1056 {
1057 }
1058
1059 yf::SessionShared::~SessionShared() {
1060 }
1061
1062 void yf::SessionShared::start() const
1063 {
1064     m_p->start();
1065 }
1066
1067 yf::SessionShared::Frontend::Frontend(Rep *rep) : m_is_virtual(false), m_p(rep)
1068 {
1069 }
1070
1071 yf::SessionShared::Frontend::~Frontend()
1072 {
1073 }
1074
1075 yf::SessionShared::FrontendPtr yf::SessionShared::Rep::get_frontend(mp::Package &package)
1076 {
1077     boost::mutex::scoped_lock lock(m_mutex);
1078
1079     std::map<mp::Session,yf::SessionShared::FrontendPtr>::iterator it;
1080     
1081     while(true)
1082     {
1083         it = m_clients.find(package.session());
1084         if (it == m_clients.end())
1085             break;
1086         
1087         if (!it->second->m_in_use)
1088         {
1089             it->second->m_in_use = true;
1090             return it->second;
1091         }
1092         m_cond_session_ready.wait(lock);
1093     }
1094     FrontendPtr f(new Frontend(this));
1095     m_clients[package.session()] = f;
1096     f->m_in_use = true;
1097     return f;
1098 }
1099
1100 void yf::SessionShared::Rep::release_frontend(mp::Package &package)
1101 {
1102     boost::mutex::scoped_lock lock(m_mutex);
1103     std::map<mp::Session,yf::SessionShared::FrontendPtr>::iterator it;
1104     
1105     it = m_clients.find(package.session());
1106     if (it != m_clients.end())
1107     {
1108         if (package.session().is_closed())
1109         {
1110             m_clients.erase(it);
1111         }
1112         else
1113         {
1114             it->second->m_in_use = false;
1115         }
1116         m_cond_session_ready.notify_all();
1117     }
1118 }
1119
1120
1121 void yf::SessionShared::process(mp::Package &package) const
1122 {
1123     FrontendPtr f = m_p->get_frontend(package);
1124
1125     Z_GDU *gdu = package.request().get();
1126     
1127     if (gdu && gdu->which == Z_GDU_Z3950 && gdu->u.z3950->which ==
1128         Z_APDU_initRequest && !f->m_is_virtual)
1129     {
1130         m_p->init(package, gdu, f);
1131     }
1132     else if (!f->m_is_virtual)
1133         package.move();
1134     else if (gdu && gdu->which == Z_GDU_Z3950)
1135     {
1136         Z_APDU *apdu = gdu->u.z3950;
1137         if (apdu->which == Z_APDU_initRequest)
1138         {
1139             mp::odr odr;
1140             
1141             package.response() = odr.create_close(
1142                 apdu,
1143                 Z_Close_protocolError,
1144                 "double init");
1145             
1146             package.session().close();
1147         }
1148         else if (apdu->which == Z_APDU_close)
1149         {
1150             mp::odr odr;
1151             
1152             package.response() = odr.create_close(
1153                 apdu,
1154                 Z_Close_peerAbort, "received close from client");
1155             package.session().close();
1156         }
1157         else if (apdu->which == Z_APDU_searchRequest)
1158         {
1159             f->search(package, apdu);
1160         }
1161         else if (apdu->which == Z_APDU_presentRequest)
1162         {
1163             f->present(package, apdu);
1164         }
1165         else if (apdu->which == Z_APDU_scanRequest)
1166         {
1167             f->scan(package, apdu);
1168         }
1169         else
1170         {
1171             mp::odr odr;
1172             
1173             package.response() = odr.create_close(
1174                 apdu, Z_Close_protocolError,
1175                 "unsupported APDU in filter_session_shared");
1176             
1177             package.session().close();
1178         }
1179     }
1180     m_p->release_frontend(package);
1181 }
1182
1183 void yf::SessionShared::configure(const xmlNode *ptr, bool test_only,
1184                                   const char *path)
1185 {
1186     for (ptr = ptr->children; ptr; ptr = ptr->next)
1187     {
1188         if (ptr->type != XML_ELEMENT_NODE)
1189             continue;
1190         if (!strcmp((const char *) ptr->name, "resultset"))
1191         {
1192             const struct _xmlAttr *attr;
1193             for (attr = ptr->properties; attr; attr = attr->next)
1194             {
1195                 if (!strcmp((const char *) attr->name, "ttl"))
1196                     m_p->m_resultset_ttl = 
1197                         mp::xml::get_int(attr->children, 30);
1198                 else if (!strcmp((const char *) attr->name, "max"))
1199                 {
1200                     m_p->m_resultset_max = 
1201                         mp::xml::get_int(attr->children, 10);
1202                 }
1203                 else if (!strcmp((const char *) attr->name, "optimizesearch"))
1204                 {
1205                     m_p->m_optimize_search =
1206                         mp::xml::get_bool(attr->children, true);
1207                 }
1208                 else
1209                     throw mp::filter::FilterException(
1210                         "Bad attribute " + std::string((const char *)
1211                                                        attr->name));
1212             }
1213         }
1214         else if (!strcmp((const char *) ptr->name, "session"))
1215         {
1216             const struct _xmlAttr *attr;
1217             for (attr = ptr->properties; attr; attr = attr->next)
1218             {
1219                 if (!strcmp((const char *) attr->name, "ttl"))
1220                     m_p->m_session_ttl = 
1221                         mp::xml::get_int(attr->children, 120);
1222                 else
1223                     throw mp::filter::FilterException(
1224                         "Bad attribute " + std::string((const char *)
1225                                                        attr->name));
1226             }
1227         }
1228         else
1229         {
1230             throw mp::filter::FilterException("Bad element " 
1231                                                + std::string((const char *)
1232                                                              ptr->name));
1233         }
1234     }
1235 }
1236
1237 static mp::filter::Base* filter_creator()
1238 {
1239     return new mp::filter::SessionShared;
1240 }
1241
1242 extern "C" {
1243     struct metaproxy_1_filter_struct metaproxy_1_filter_session_shared = {
1244         0,
1245         "session_shared",
1246         filter_creator
1247     };
1248 }
1249
1250 /*
1251  * Local variables:
1252  * c-basic-offset: 4
1253  * c-file-style: "Stroustrup"
1254  * indent-tabs-mode: nil
1255  * End:
1256  * vim: shiftwidth=4 tabstop=8 expandtab
1257  */
1258