Happy new year
[metaproxy-moved-to-github.git] / src / filter_session_shared.cpp
1 /* This file is part of Metaproxy.
2    Copyright (C) 2005-2012 Index Data
3
4 Metaproxy is free software; you can redistribute it and/or modify it under
5 the terms of the GNU General Public License as published by the Free
6 Software Foundation; either version 2, or (at your option) any later
7 version.
8
9 Metaproxy is distributed in the hope that it will be useful, but WITHOUT ANY
10 WARRANTY; without even the implied warranty of MERCHANTABILITY or
11 FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
12 for more details.
13
14 You should have received a copy of the GNU General Public License
15 along with this program; if not, write to the Free Software
16 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
17 */
18
19 #include "config.hpp"
20
21 #include <metaproxy/filter.hpp>
22 #include <metaproxy/package.hpp>
23
24 #include <boost/thread/mutex.hpp>
25 #include <boost/thread/condition.hpp>
26 #include <boost/thread/thread.hpp>
27 #include <boost/thread/xtime.hpp>
28 #include <boost/shared_ptr.hpp>
29 #include <boost/format.hpp>
30
31 #include <metaproxy/util.hpp>
32 #include "filter_session_shared.hpp"
33
34 #include <yaz/log.h>
35 #include <yaz/zgdu.h>
36 #include <yaz/otherinfo.h>
37 #include <yaz/diagbib1.h>
38 #include <yazpp/z-query.h>
39 #include <yazpp/record-cache.h>
40 #include <map>
41 #include <iostream>
42 #include <time.h>
43
44 namespace mp = metaproxy_1;
45 namespace yf = metaproxy_1::filter;
46
47 namespace metaproxy_1 {
48
49     namespace filter {
50         // key for session.. We'll only share sessions with same InitKey
51         class SessionShared::InitKey {
52         public:
53             bool operator < (const SessionShared::InitKey &k) const;
54             InitKey(Z_InitRequest *req);
55             InitKey(const InitKey &);
56             ~InitKey();
57         private:
58             char *m_idAuthentication_buf;
59             int m_idAuthentication_size;
60             char *m_otherInfo_buf;
61             int m_otherInfo_size;
62             ODR m_odr;
63         };
64         // worker thread .. for expiry of sessions
65         class SessionShared::Worker {
66         public:
67             Worker(SessionShared::Rep *rep);
68             void operator() (void);
69         private:
70             SessionShared::Rep *m_p;
71         };
72         // backend result set
73         class SessionShared::BackendSet {
74         public:
75             std::string m_result_set_id;
76             Databases m_databases;
77             int m_result_set_size;
78             yazpp_1::Yaz_Z_Query m_query;
79             time_t m_time_last_use;
80             void timestamp();
81             yazpp_1::RecordCache m_record_cache;
82             BackendSet(
83                 const std::string &result_set_id,
84                 const Databases &databases,
85                 const yazpp_1::Yaz_Z_Query &query);
86             bool search(
87                 Package &frontend_package,
88                 Package &search_package,
89                 const Z_APDU *apdu_req,
90                 const BackendInstancePtr bp,
91                 Z_Records **z_records);
92         };
93         // backend connection instance
94         class SessionShared::BackendInstance {
95             friend class Rep;
96             friend class BackendClass;
97             friend class BackendSet;
98         public:
99             mp::Session m_session;
100             BackendSetList m_sets;
101             bool m_in_use;
102             int m_sequence_this;
103             int m_result_set_sequence;
104             time_t m_time_last_use;
105             mp::Package * m_close_package;
106             ~BackendInstance();
107             void timestamp();
108         };
109         // backends of some class (all with same InitKey)
110         class SessionShared::BackendClass : boost::noncopyable {
111             friend class Rep;
112             friend struct Frontend;
113             bool m_named_result_sets;
114             BackendInstanceList m_backend_list;
115             BackendInstancePtr create_backend(const Package &package);
116             void remove_backend(BackendInstancePtr b);
117             BackendInstancePtr get_backend(const Package &package);
118             void use_backend(BackendInstancePtr b);
119             void release_backend(BackendInstancePtr b);
120             void expire_class();
121             yazpp_1::GDU m_init_request;
122             yazpp_1::GDU m_init_response;
123             boost::mutex m_mutex_backend_class;
124             int m_sequence_top;
125             time_t m_backend_set_ttl;
126             time_t m_backend_expiry_ttl;
127             size_t m_backend_set_max;
128         public:
129             BackendClass(const yazpp_1::GDU &init_request,
130                          int resultset_ttl,
131                          int resultset_max,
132                          int session_ttl);
133             ~BackendClass();
134         };
135         // frontend result set
136         class SessionShared::FrontendSet {
137             Databases m_databases;
138             yazpp_1::Yaz_Z_Query m_query;
139         public:
140             const Databases &get_databases();
141             const yazpp_1::Yaz_Z_Query &get_query();
142             FrontendSet(
143                 const Databases &databases,
144                 const yazpp_1::Yaz_Z_Query &query);
145             FrontendSet();
146         };
147         // frontend session
148         struct SessionShared::Frontend {
149             Frontend(Rep *rep);
150             ~Frontend();
151             bool m_is_virtual;
152             bool m_in_use;
153             Z_Options m_init_options;
154             void search(Package &package, Z_APDU *apdu);
155             void present(Package &package, Z_APDU *apdu);
156             void scan(Package &package, Z_APDU *apdu);
157
158             void get_set(mp::Package &package,
159                          const Z_APDU *apdu_req,
160                          const Databases &databases,
161                          yazpp_1::Yaz_Z_Query &query,
162                          BackendInstancePtr &found_backend,
163                          BackendSetPtr &found_set);
164             void override_set(BackendInstancePtr &found_backend,
165                               std::string &result_set_id);
166
167             Rep *m_p;
168             BackendClassPtr m_backend_class;
169             FrontendSets m_frontend_sets;
170         };            
171         // representation
172         class SessionShared::Rep {
173             friend class SessionShared;
174             friend struct Frontend;
175             
176             FrontendPtr get_frontend(Package &package);
177             void release_frontend(Package &package);
178             Rep();
179         public:
180             void expire();
181         private:
182             void init(Package &package, const Z_GDU *gdu,
183                       FrontendPtr frontend);
184             void start();
185             boost::mutex m_mutex;
186             boost::condition m_cond_session_ready;
187             std::map<mp::Session, FrontendPtr> m_clients;
188
189             BackendClassMap m_backend_map;
190             boost::mutex m_mutex_backend_map;
191             boost::thread_group m_thrds;
192             int m_resultset_ttl;
193             int m_resultset_max;
194             int m_session_ttl;
195             bool m_optimize_search;
196         };
197     }
198 }
199
200 yf::SessionShared::FrontendSet::FrontendSet(
201     const Databases &databases,
202     const yazpp_1::Yaz_Z_Query &query)
203     : m_databases(databases), m_query(query)
204 {
205 }
206
207 const yf::SessionShared::Databases & 
208 yf::SessionShared::FrontendSet::get_databases()
209 {
210     return m_databases;
211 }
212
213 const yazpp_1::Yaz_Z_Query& yf::SessionShared::FrontendSet::get_query()
214 {
215     return m_query;
216 }
217
218 yf::SessionShared::InitKey::InitKey(const InitKey &k)
219 {
220     m_odr = odr_createmem(ODR_ENCODE);
221     
222     m_idAuthentication_size =  k.m_idAuthentication_size;
223     m_idAuthentication_buf = (char*)odr_malloc(m_odr, m_idAuthentication_size);
224     memcpy(m_idAuthentication_buf, k.m_idAuthentication_buf,
225            m_idAuthentication_size);
226
227     m_otherInfo_size =  k.m_otherInfo_size;
228     m_otherInfo_buf = (char*)odr_malloc(m_odr, m_otherInfo_size);
229     memcpy(m_otherInfo_buf, k.m_otherInfo_buf,
230            m_otherInfo_size);
231 }
232
233 yf::SessionShared::InitKey::InitKey(Z_InitRequest *req)
234 {
235     m_odr = odr_createmem(ODR_ENCODE);
236
237     Z_IdAuthentication *t = req->idAuthentication;
238     z_IdAuthentication(m_odr, &t, 1, 0);
239     m_idAuthentication_buf =
240         odr_getbuf(m_odr, &m_idAuthentication_size, 0);
241
242     Z_OtherInformation *o = req->otherInfo;
243     z_OtherInformation(m_odr, &o, 1, 0);
244     m_otherInfo_buf = odr_getbuf(m_odr, &m_otherInfo_size, 0);
245 }
246
247 yf::SessionShared::InitKey::~InitKey()
248 {
249     odr_destroy(m_odr);
250 }
251
252 bool yf::SessionShared::InitKey::operator < (const SessionShared::InitKey &k)
253     const 
254 {
255     int c;
256     c = mp::util::memcmp2(
257         (void*) m_idAuthentication_buf, m_idAuthentication_size,
258         (void*) k.m_idAuthentication_buf, k.m_idAuthentication_size);
259     if (c < 0)
260         return true;
261     else if (c > 0)
262         return false;
263
264     c = mp::util::memcmp2((void*) m_otherInfo_buf, m_otherInfo_size,
265                           (void*) k.m_otherInfo_buf, k.m_otherInfo_size);
266     if (c < 0)
267         return true;
268     else if (c > 0)
269         return false;
270     return false;
271 }
272
273 void yf::SessionShared::BackendClass::release_backend(BackendInstancePtr b)
274 {
275     boost::mutex::scoped_lock lock(m_mutex_backend_class);
276     b->m_in_use = false;
277 }
278
279
280 void yf::SessionShared::BackendClass::remove_backend(BackendInstancePtr b)
281 {
282     BackendInstanceList::iterator it = m_backend_list.begin();
283     
284     while (it != m_backend_list.end())
285     {
286         if (*it == b)
287         {
288              mp::odr odr;
289             (*it)->m_close_package->response() = odr.create_close(
290                 0, Z_Close_lackOfActivity, 0);
291             (*it)->m_close_package->session().close();
292             (*it)->m_close_package->move();
293             
294             it = m_backend_list.erase(it);
295         }
296         else
297             it++;
298     }
299 }
300
301
302
303 yf::SessionShared::BackendInstancePtr 
304 yf::SessionShared::BackendClass::get_backend(
305     const mp::Package &frontend_package)
306 {
307     {
308         boost::mutex::scoped_lock lock(m_mutex_backend_class);
309         
310         BackendInstanceList::const_iterator it = m_backend_list.begin();
311         
312         BackendInstancePtr backend1; // null
313         
314         for (; it != m_backend_list.end(); it++)
315         {
316             if (!(*it)->m_in_use)
317             {
318                 if (!backend1 
319                     || (*it)->m_sequence_this < backend1->m_sequence_this)
320                     backend1 = *it;
321             }
322         }
323         if (backend1)
324         {
325             use_backend(backend1);
326             return backend1;
327         }
328     }
329     return create_backend(frontend_package);
330 }
331
332 void yf::SessionShared::BackendClass::use_backend(BackendInstancePtr backend)
333 {
334     backend->m_in_use = true;
335     backend->m_sequence_this = m_sequence_top++;
336 }
337
338 void yf::SessionShared::BackendInstance::timestamp()
339 {
340     assert(m_in_use);
341     time(&m_time_last_use);
342 }
343
344 yf::SessionShared::BackendInstance::~BackendInstance()
345 {
346     delete m_close_package;
347 }
348
349 yf::SessionShared::BackendInstancePtr yf::SessionShared::BackendClass::create_backend(
350     const mp::Package &frontend_package)
351 {
352     BackendInstancePtr bp(new BackendInstance);
353     BackendInstancePtr null;
354
355     bp->m_close_package =
356         new mp::Package(bp->m_session, frontend_package.origin());
357     bp->m_close_package->copy_filter(frontend_package);
358
359     Package init_package(bp->m_session, frontend_package.origin());
360
361     init_package.copy_filter(frontend_package);
362
363     yazpp_1::GDU actual_init_request = m_init_request;
364     Z_GDU *init_pdu = actual_init_request.get();
365
366     assert(init_pdu->which == Z_GDU_Z3950);
367     assert(init_pdu->u.z3950->which == Z_APDU_initRequest);
368
369     Z_InitRequest *req = init_pdu->u.z3950->u.initRequest;
370     ODR_MASK_ZERO(req->options);
371
372     ODR_MASK_SET(req->options, Z_Options_search);
373     ODR_MASK_SET(req->options, Z_Options_present);
374     ODR_MASK_SET(req->options, Z_Options_namedResultSets);
375     ODR_MASK_SET(req->options, Z_Options_scan);
376
377     ODR_MASK_SET(req->protocolVersion, Z_ProtocolVersion_1);
378     ODR_MASK_SET(req->protocolVersion, Z_ProtocolVersion_2);
379     ODR_MASK_SET(req->protocolVersion, Z_ProtocolVersion_3);
380
381     init_package.request() = init_pdu;
382
383     init_package.move();
384
385     boost::mutex::scoped_lock lock(m_mutex_backend_class);
386
387     m_named_result_sets = false;
388     Z_GDU *gdu = init_package.response().get();
389     if (init_package.session().is_closed())
390     {
391         /* already closed. We don't know why */
392         return null;
393     }
394     else if (gdu && gdu->which == Z_GDU_Z3950 
395              && gdu->u.z3950->which == Z_APDU_initResponse
396              && *gdu->u.z3950->u.initResponse->result)
397     {
398         /* successful init response */
399         Z_InitResponse *res = gdu->u.z3950->u.initResponse;
400         m_init_response = gdu->u.z3950;
401         if (ODR_MASK_GET(res->options, Z_Options_namedResultSets))
402         {
403             m_named_result_sets = true;
404         }
405     }
406     else
407     {
408         /* not init or init rejected */
409         init_package.copy_filter(frontend_package);
410         init_package.session().close();
411         init_package.move();
412         return null;
413     }
414     bp->m_in_use = true;
415     time(&bp->m_time_last_use);
416     bp->m_sequence_this = 0;
417     bp->m_result_set_sequence = 0;
418     m_backend_list.push_back(bp);
419
420     return bp;
421 }
422
423
424 yf::SessionShared::BackendClass::BackendClass(const yazpp_1::GDU &init_request,
425                                               int resultset_ttl,
426                                               int resultset_max,
427                                               int session_ttl)
428     : m_named_result_sets(false), m_init_request(init_request),
429       m_sequence_top(0), m_backend_set_ttl(resultset_ttl),
430       m_backend_expiry_ttl(session_ttl), m_backend_set_max(resultset_max)
431 {}
432
433 yf::SessionShared::BackendClass::~BackendClass()
434 {}
435
436 void yf::SessionShared::Rep::init(mp::Package &package, const Z_GDU *gdu,
437                                   FrontendPtr frontend)
438 {
439     Z_InitRequest *req = gdu->u.z3950->u.initRequest;
440
441     frontend->m_is_virtual = true;
442     frontend->m_init_options = *req->options;
443     InitKey k(req);
444     {
445         boost::mutex::scoped_lock lock(m_mutex_backend_map);
446         BackendClassMap::const_iterator it;
447         it = m_backend_map.find(k);
448         if (it == m_backend_map.end())
449         {
450             BackendClassPtr b(new BackendClass(gdu->u.z3950,
451                                                m_resultset_ttl,
452                                                m_resultset_max,
453                                                m_session_ttl));
454             m_backend_map[k] = b;
455             frontend->m_backend_class = b;
456         }
457         else
458         {
459             frontend->m_backend_class = it->second;            
460         }
461     }
462     BackendClassPtr bc = frontend->m_backend_class;
463     BackendInstancePtr backend = bc->get_backend(package);
464     
465     mp::odr odr;
466     if (!backend)
467     {
468         Z_APDU *apdu = odr.create_initResponse(gdu->u.z3950, 0, 0);
469         *apdu->u.initResponse->result = 0;
470         package.response() = apdu;
471         package.session().close();
472     }
473     else
474     {
475         boost::mutex::scoped_lock lock(bc->m_mutex_backend_class);
476         yazpp_1::GDU init_response = bc->m_init_response;
477         Z_GDU *response_gdu = init_response.get();
478         mp::util::transfer_referenceId(odr, gdu->u.z3950,
479                                        response_gdu->u.z3950);
480
481         Z_Options *server_options =
482             response_gdu->u.z3950->u.initResponse->options;
483         Z_Options *client_options = &frontend->m_init_options;
484
485         int i;
486         for (i = 0; i<30; i++)
487             if (!ODR_MASK_GET(client_options, i))
488                 ODR_MASK_CLEAR(server_options, i);
489         package.response() = init_response;
490     }
491     if (backend)
492         bc->release_backend(backend);
493 }
494
495 void yf::SessionShared::BackendSet::timestamp()
496 {
497     time(&m_time_last_use);
498 }
499
500 yf::SessionShared::BackendSet::BackendSet(
501     const std::string &result_set_id,
502     const Databases &databases,
503     const yazpp_1::Yaz_Z_Query &query) :
504     m_result_set_id(result_set_id),
505     m_databases(databases), m_result_set_size(0), m_query(query) 
506 {
507     timestamp();
508 }
509
510 static int get_diagnostic(Z_DefaultDiagFormat *r)
511 {
512     return *r->condition;
513 }
514
515 bool yf::SessionShared::BackendSet::search(
516     mp::Package &frontend_package,
517     mp::Package &search_package,
518     const Z_APDU *frontend_apdu,
519     const BackendInstancePtr bp,
520     Z_Records **z_records)
521 {
522     mp::odr odr;
523     Z_APDU *apdu_req = zget_APDU(odr, Z_APDU_searchRequest);
524     Z_SearchRequest *req = apdu_req->u.searchRequest;
525
526     req->resultSetName = odr_strdup(odr, m_result_set_id.c_str());
527     req->query = m_query.get_Z_Query();
528
529     req->num_databaseNames = m_databases.size();
530     req->databaseNames = (char**) 
531         odr_malloc(odr, req->num_databaseNames * sizeof(char *));
532     Databases::const_iterator it = m_databases.begin();
533     size_t i = 0;
534     for (; it != m_databases.end(); it++)
535         req->databaseNames[i++] = odr_strdup(odr, it->c_str());
536
537     if (frontend_apdu->which == Z_APDU_searchRequest)
538         req->preferredRecordSyntax =
539             frontend_apdu->u.searchRequest->preferredRecordSyntax;
540
541     search_package.request() = apdu_req;
542
543     search_package.move();
544
545     Z_GDU *gdu = search_package.response().get();
546     if (!search_package.session().is_closed()
547         && gdu && gdu->which == Z_GDU_Z3950 
548         && gdu->u.z3950->which == Z_APDU_searchResponse)
549     {
550         Z_SearchResponse *b_resp = gdu->u.z3950->u.searchResponse;
551         *z_records = b_resp->records;
552         m_result_set_size = *b_resp->resultCount;
553         return true;
554     }
555     Z_APDU *f_apdu = 0;
556     if (frontend_apdu->which == Z_APDU_searchRequest)
557         f_apdu = odr.create_searchResponse(
558             frontend_apdu, YAZ_BIB1_TEMPORARY_SYSTEM_ERROR, 0);
559     else if (frontend_apdu->which == Z_APDU_presentRequest)
560         f_apdu = odr.create_presentResponse(
561             frontend_apdu, YAZ_BIB1_TEMPORARY_SYSTEM_ERROR, 0);
562     else
563         f_apdu = odr.create_close(
564             frontend_apdu, YAZ_BIB1_TEMPORARY_SYSTEM_ERROR, 0);
565     frontend_package.response() = f_apdu;
566     return false;
567 }
568
569 void yf::SessionShared::Frontend::override_set(
570     BackendInstancePtr &found_backend,
571     std::string &result_set_id)
572 {
573     BackendClassPtr bc = m_backend_class;
574     BackendInstanceList::const_iterator it = bc->m_backend_list.begin();
575     time_t now;
576     time(&now);
577     
578     for (; it != bc->m_backend_list.end(); it++)
579     {
580         if (!(*it)->m_in_use)
581         {
582             BackendSetList::iterator set_it = (*it)->m_sets.begin();
583             for (; set_it != (*it)->m_sets.end(); set_it++)
584             {
585                 if (now >= (*set_it)->m_time_last_use &&
586                     now - (*set_it)->m_time_last_use > bc->m_backend_set_ttl)
587                 {
588                     found_backend = *it;
589                     result_set_id = (*set_it)->m_result_set_id;
590                     found_backend->m_sets.erase(set_it);
591                     return;
592                 }
593             }
594         }
595     }
596     size_t max_sets = bc->m_named_result_sets ? bc->m_backend_set_max : 1;
597     for (it = bc->m_backend_list.begin(); it != bc->m_backend_list.end(); it++)
598     {
599         if (!(*it)->m_in_use && (*it)->m_sets.size() < max_sets)
600         {
601             found_backend = *it;
602             if (bc->m_named_result_sets)
603             {
604                 result_set_id = boost::io::str(
605                     boost::format("%1%") % 
606                     found_backend->m_result_set_sequence);
607                 found_backend->m_result_set_sequence++;
608             }
609             else
610                 result_set_id = "default";
611             return;
612         }
613     }
614 }
615
616 void yf::SessionShared::Frontend::get_set(mp::Package &package,
617                                           const Z_APDU *apdu_req,
618                                           const Databases &databases,
619                                           yazpp_1::Yaz_Z_Query &query,
620                                           BackendInstancePtr &found_backend,
621                                           BackendSetPtr &found_set)
622 {
623     bool session_restarted = false;
624
625 restart:
626     std::string result_set_id;
627     BackendClassPtr bc = m_backend_class;
628     {
629         boost::mutex::scoped_lock lock(bc->m_mutex_backend_class);
630         
631         if (m_p->m_optimize_search)
632         {
633             // look at each backend and see if we have a similar search
634             BackendInstanceList::const_iterator it = bc->m_backend_list.begin();
635             for (; it != bc->m_backend_list.end(); it++)
636             {
637                 if (!(*it)->m_in_use)
638                 {
639                     BackendSetList::const_iterator set_it = (*it)->m_sets.begin();
640                     for (; set_it != (*it)->m_sets.end(); set_it++)
641                     {
642                         if ((*set_it)->m_databases == databases
643                             && query.match(&(*set_it)->m_query))
644                         {
645                             found_set = *set_it;
646                             found_backend = *it;
647                             bc->use_backend(found_backend);
648                             found_set->timestamp();
649                             // found matching set. No need to search again
650                             return;
651                         }
652                     }
653                 }
654             }
655         }
656         override_set(found_backend, result_set_id);
657         if (found_backend)
658             bc->use_backend(found_backend);
659     }
660     if (!found_backend)
661     {
662         // create a new backend set (and new set)
663         found_backend = bc->create_backend(package);
664
665         if (!found_backend)
666         {
667             Z_APDU *f_apdu = 0;
668             mp::odr odr;
669             if (apdu_req->which == Z_APDU_searchRequest)
670             {
671                 f_apdu = odr.create_searchResponse(
672                         apdu_req, YAZ_BIB1_TEMPORARY_SYSTEM_ERROR, 0);
673             }
674             else if (apdu_req->which == Z_APDU_presentRequest)
675             {
676                 f_apdu = odr.create_presentResponse(
677                     apdu_req, YAZ_BIB1_TEMPORARY_SYSTEM_ERROR, 0);
678             }
679             else
680             {
681                 f_apdu = odr.create_close(
682                     apdu_req, YAZ_BIB1_TEMPORARY_SYSTEM_ERROR, 0);
683             }
684             package.response() = f_apdu;
685             return;
686         }
687         if (bc->m_named_result_sets)
688         {
689             result_set_id = boost::io::str(
690                 boost::format("%1%") % found_backend->m_result_set_sequence);
691             found_backend->m_result_set_sequence++;
692         }
693         else
694             result_set_id = "default";
695     }
696     found_backend->timestamp();
697
698     // we must search ...
699     BackendSetPtr new_set(new BackendSet(result_set_id,
700                                          databases, query));
701     Z_Records *z_records = 0;
702
703     Package search_package(found_backend->m_session, package.origin());
704     search_package.copy_filter(package);
705
706     if (!new_set->search(package, search_package,
707                          apdu_req, found_backend, &z_records))
708     {
709         bc->remove_backend(found_backend);
710         return; // search error 
711     }
712
713     if (z_records)
714     {
715         int condition = 0;
716         if (z_records->which == Z_Records_NSD)
717         {
718             condition =
719                 get_diagnostic(z_records->u.nonSurrogateDiagnostic);
720         }
721         else if (z_records->which == Z_Records_multipleNSD)
722         {
723             if (z_records->u.multipleNonSurDiagnostics->num_diagRecs >= 1
724                 && 
725                 
726                 z_records->u.multipleNonSurDiagnostics->diagRecs[0]->which ==
727                 Z_DiagRec_defaultFormat)
728             {
729                 condition = get_diagnostic(
730                     z_records->u.multipleNonSurDiagnostics->diagRecs[0]->u.defaultFormat);
731                 
732             }
733         }
734         if (!session_restarted &&
735             condition == YAZ_BIB1_TEMPORARY_SYSTEM_ERROR)
736         {
737             bc->remove_backend(found_backend);
738             session_restarted = true;
739             found_backend.reset();
740             goto restart;
741
742         }
743
744         if (condition)
745         {
746             mp::odr odr;
747             if (apdu_req->which == Z_APDU_searchRequest)
748             {
749                 Z_APDU *f_apdu = odr.create_searchResponse(apdu_req, 
750                                                            0, 0);
751                 Z_SearchResponse *f_resp = f_apdu->u.searchResponse;
752                 *f_resp->searchStatus = Z_SearchResponse_none;
753                 f_resp->records = z_records;
754                 package.response() = f_apdu;
755             }
756             if (apdu_req->which == Z_APDU_presentRequest)
757             {
758                 Z_APDU *f_apdu = odr.create_presentResponse(apdu_req, 
759                                                             0, 0);
760                 Z_PresentResponse *f_resp = f_apdu->u.presentResponse;
761                 f_resp->records = z_records;
762                 package.response() = f_apdu;
763             }
764             bc->release_backend(found_backend);
765             return; // search error 
766         }
767     }
768     if (!session_restarted && new_set->m_result_set_size < 0)
769     {
770         bc->remove_backend(found_backend);
771         session_restarted = true;
772         found_backend.reset();
773         goto restart;
774     }
775
776     found_set = new_set;
777     found_set->timestamp();
778     found_backend->m_sets.push_back(found_set);
779 }
780
781 void yf::SessionShared::Frontend::search(mp::Package &package,
782                                          Z_APDU *apdu_req)
783 {
784     Z_SearchRequest *req = apdu_req->u.searchRequest;
785     FrontendSets::iterator fset_it = 
786         m_frontend_sets.find(req->resultSetName);
787     if (fset_it != m_frontend_sets.end())
788     {
789         // result set already exist 
790         // if replace indicator is off: we return diagnostic if
791         // result set already exist.
792         if (*req->replaceIndicator == 0)
793         {
794             mp::odr odr;
795             Z_APDU *apdu = 
796                 odr.create_searchResponse(
797                     apdu_req,
798                     YAZ_BIB1_RESULT_SET_EXISTS_AND_REPLACE_INDICATOR_OFF,
799                     0);
800             package.response() = apdu;
801             
802             return;
803         }
804         m_frontend_sets.erase(fset_it);
805     }
806     
807     yazpp_1::Yaz_Z_Query query;
808     query.set_Z_Query(req->query);
809     Databases databases;
810     int i;
811     for (i = 0; i<req->num_databaseNames; i++)
812         databases.push_back(req->databaseNames[i]);
813
814     BackendSetPtr found_set; // null
815     BackendInstancePtr found_backend; // null
816
817     get_set(package, apdu_req, databases, query, found_backend, found_set);
818     if (!found_set)
819         return;
820
821     mp::odr odr;
822     Z_APDU *f_apdu = odr.create_searchResponse(apdu_req, 0, 0);
823     Z_SearchResponse *f_resp = f_apdu->u.searchResponse;
824     *f_resp->resultCount = found_set->m_result_set_size;
825     package.response() = f_apdu;
826
827     FrontendSetPtr fset(new FrontendSet(databases, query));
828     m_frontend_sets[req->resultSetName] = fset;
829
830     m_backend_class->release_backend(found_backend);
831 }
832
833 void yf::SessionShared::Frontend::present(mp::Package &package,
834                                           Z_APDU *apdu_req)
835 {
836     mp::odr odr;
837     Z_PresentRequest *req = apdu_req->u.presentRequest;
838
839     FrontendSets::iterator fset_it = 
840         m_frontend_sets.find(req->resultSetId);
841
842     if (fset_it == m_frontend_sets.end())
843     {
844         Z_APDU *apdu = 
845             odr.create_presentResponse(
846                 apdu_req,
847                 YAZ_BIB1_SPECIFIED_RESULT_SET_DOES_NOT_EXIST,
848                 req->resultSetId);
849         package.response() = apdu;
850         return;
851     }
852     FrontendSetPtr fset = fset_it->second;
853
854     Databases databases = fset->get_databases();
855     yazpp_1::Yaz_Z_Query query = fset->get_query();
856
857     BackendClassPtr bc = m_backend_class;
858     BackendSetPtr found_set; // null
859     BackendInstancePtr found_backend;
860
861     get_set(package, apdu_req, databases, query, found_backend, found_set);
862     if (!found_set)
863         return;
864
865     Z_NamePlusRecordList *npr_res = 0;
866     if (found_set->m_record_cache.lookup(odr, &npr_res, 
867                                          *req->resultSetStartPoint,
868                                          *req->numberOfRecordsRequested,
869                                          req->preferredRecordSyntax,
870                                          req->recordComposition))
871     {
872         Z_APDU *f_apdu_res = odr.create_presentResponse(apdu_req, 0, 0);
873         Z_PresentResponse *f_resp = f_apdu_res->u.presentResponse;
874
875         yaz_log(YLOG_LOG, "Found " ODR_INT_PRINTF "+" ODR_INT_PRINTF 
876                 " records in cache %p",
877                 *req->resultSetStartPoint,                      
878                 *req->numberOfRecordsRequested,
879                 &found_set->m_record_cache);        
880
881         *f_resp->numberOfRecordsReturned = *req->numberOfRecordsRequested;
882         *f_resp->nextResultSetPosition = 
883             *req->resultSetStartPoint + *req->numberOfRecordsRequested;
884         // f_resp->presentStatus assumed OK.
885         f_resp->records = (Z_Records *) odr_malloc(odr, sizeof(Z_Records));
886         f_resp->records->which = Z_Records_DBOSD;
887         f_resp->records->u.databaseOrSurDiagnostics = npr_res;
888         package.response() = f_apdu_res;
889         bc->release_backend(found_backend);
890         return;
891     }
892
893     found_backend->timestamp();
894                               
895     Z_APDU *p_apdu = zget_APDU(odr, Z_APDU_presentRequest);
896     Z_PresentRequest *p_req = p_apdu->u.presentRequest;
897     p_req->preferredRecordSyntax = req->preferredRecordSyntax;
898     p_req->resultSetId = odr_strdup(odr, found_set->m_result_set_id.c_str());
899     *p_req->resultSetStartPoint = *req->resultSetStartPoint;
900     *p_req->numberOfRecordsRequested = *req->numberOfRecordsRequested;
901     p_req->preferredRecordSyntax = req->preferredRecordSyntax;
902     p_req->recordComposition = req->recordComposition;
903
904     Package present_package(found_backend->m_session, package.origin());
905     present_package.copy_filter(package);
906
907     present_package.request() = p_apdu;
908
909     present_package.move();
910
911     Z_GDU *gdu = present_package.response().get();
912     if (!present_package.session().is_closed()
913         && gdu && gdu->which == Z_GDU_Z3950 
914         && gdu->u.z3950->which == Z_APDU_presentResponse)
915     {
916         Z_PresentResponse *b_resp = gdu->u.z3950->u.presentResponse;
917         Z_APDU *f_apdu_res = odr.create_presentResponse(apdu_req, 0, 0);
918         Z_PresentResponse *f_resp = f_apdu_res->u.presentResponse;
919
920         f_resp->numberOfRecordsReturned = b_resp->numberOfRecordsReturned;
921         f_resp->nextResultSetPosition = b_resp->nextResultSetPosition;
922         f_resp->presentStatus= b_resp->presentStatus;
923         f_resp->records = b_resp->records;
924         f_resp->otherInfo = b_resp->otherInfo;
925         package.response() = f_apdu_res;
926
927         if (b_resp->records && b_resp->records->which ==  Z_Records_DBOSD)
928         {
929             yaz_log(YLOG_LOG, "Adding " ODR_INT_PRINTF "+" ODR_INT_PRINTF
930                     " records to cache %p",
931                     *req->resultSetStartPoint,                      
932                     *f_resp->numberOfRecordsReturned,
933                     &found_set->m_record_cache);        
934             found_set->m_record_cache.add(
935                 odr,
936                 b_resp->records->u.databaseOrSurDiagnostics,
937                 *req->resultSetStartPoint,                      
938                 *f_resp->numberOfRecordsReturned);
939         }
940         bc->release_backend(found_backend);
941     }
942     else
943     {
944         bc->remove_backend(found_backend);
945         Z_APDU *f_apdu_res = 
946             odr.create_presentResponse(
947                 apdu_req, YAZ_BIB1_TEMPORARY_SYSTEM_ERROR, 0);
948         package.response() = f_apdu_res;
949     }
950 }
951
952 void yf::SessionShared::Frontend::scan(mp::Package &frontend_package,
953                                        Z_APDU *apdu_req)
954 {
955     BackendClassPtr bc = m_backend_class;
956     BackendInstancePtr backend = bc->get_backend(frontend_package);
957     if (!backend)
958     {
959         mp::odr odr;
960         Z_APDU *apdu = odr.create_scanResponse(
961             apdu_req, YAZ_BIB1_TEMPORARY_SYSTEM_ERROR, 0);
962         frontend_package.response() = apdu;
963     }
964     else
965     {
966         Package scan_package(backend->m_session, frontend_package.origin());
967         backend->timestamp();
968         scan_package.copy_filter(frontend_package);
969         scan_package.request() = apdu_req;
970         scan_package.move();
971         frontend_package.response() = scan_package.response();
972         if (scan_package.session().is_closed())
973         {
974             frontend_package.session().close();
975             bc->remove_backend(backend);
976         }
977         else
978             bc->release_backend(backend);
979     }
980 }
981
982 yf::SessionShared::Worker::Worker(SessionShared::Rep *rep) : m_p(rep)
983 {
984 }
985
986 void yf::SessionShared::Worker::operator() (void)
987 {
988     m_p->expire();
989 }
990
991 void yf::SessionShared::BackendClass::expire_class()
992 {
993     time_t now;
994     time(&now);
995     boost::mutex::scoped_lock lock(m_mutex_backend_class);
996     BackendInstanceList::iterator bit = m_backend_list.begin();
997     while (bit != m_backend_list.end())
998     {
999         time_t last_use = (*bit)->m_time_last_use;
1000         
1001         if ((*bit)->m_in_use)
1002         {
1003             bit++;
1004         }
1005         else if ((now >= last_use && now - last_use > m_backend_expiry_ttl)
1006             || (now < last_use))
1007         {
1008             mp::odr odr;
1009             (*bit)->m_close_package->response() = odr.create_close(
1010                 0, Z_Close_lackOfActivity, 0);
1011             (*bit)->m_close_package->session().close();
1012             (*bit)->m_close_package->move();
1013
1014             bit = m_backend_list.erase(bit);
1015         }
1016         else
1017         {
1018             bit++;
1019         }
1020     }
1021 }
1022
1023 void yf::SessionShared::Rep::expire()
1024 {
1025     while (true)
1026     {
1027         boost::xtime xt;
1028         boost::xtime_get(&xt, boost::TIME_UTC);
1029         xt.sec += m_session_ttl / 3;
1030         boost::thread::sleep(xt);
1031         
1032         BackendClassMap::const_iterator b_it = m_backend_map.begin();
1033         for (; b_it != m_backend_map.end(); b_it++)
1034             b_it->second->expire_class();
1035     }
1036 }
1037
1038 yf::SessionShared::Rep::Rep()
1039 {
1040     m_resultset_ttl = 30;
1041     m_resultset_max = 10;
1042     m_session_ttl = 90;
1043     m_optimize_search = true;
1044 }
1045
1046 void yf::SessionShared::Rep::start()
1047 {
1048     yf::SessionShared::Worker w(this);
1049     m_thrds.add_thread(new boost::thread(w));
1050 }
1051
1052 yf::SessionShared::SessionShared() : m_p(new SessionShared::Rep)
1053 {
1054 }
1055
1056 yf::SessionShared::~SessionShared() {
1057 }
1058
1059 void yf::SessionShared::start() const
1060 {
1061     m_p->start();
1062 }
1063
1064 yf::SessionShared::Frontend::Frontend(Rep *rep) : m_is_virtual(false), m_p(rep)
1065 {
1066 }
1067
1068 yf::SessionShared::Frontend::~Frontend()
1069 {
1070 }
1071
1072 yf::SessionShared::FrontendPtr yf::SessionShared::Rep::get_frontend(mp::Package &package)
1073 {
1074     boost::mutex::scoped_lock lock(m_mutex);
1075
1076     std::map<mp::Session,yf::SessionShared::FrontendPtr>::iterator it;
1077     
1078     while(true)
1079     {
1080         it = m_clients.find(package.session());
1081         if (it == m_clients.end())
1082             break;
1083         
1084         if (!it->second->m_in_use)
1085         {
1086             it->second->m_in_use = true;
1087             return it->second;
1088         }
1089         m_cond_session_ready.wait(lock);
1090     }
1091     FrontendPtr f(new Frontend(this));
1092     m_clients[package.session()] = f;
1093     f->m_in_use = true;
1094     return f;
1095 }
1096
1097 void yf::SessionShared::Rep::release_frontend(mp::Package &package)
1098 {
1099     boost::mutex::scoped_lock lock(m_mutex);
1100     std::map<mp::Session,yf::SessionShared::FrontendPtr>::iterator it;
1101     
1102     it = m_clients.find(package.session());
1103     if (it != m_clients.end())
1104     {
1105         if (package.session().is_closed())
1106         {
1107             m_clients.erase(it);
1108         }
1109         else
1110         {
1111             it->second->m_in_use = false;
1112         }
1113         m_cond_session_ready.notify_all();
1114     }
1115 }
1116
1117
1118 void yf::SessionShared::process(mp::Package &package) const
1119 {
1120     FrontendPtr f = m_p->get_frontend(package);
1121
1122     Z_GDU *gdu = package.request().get();
1123     
1124     if (gdu && gdu->which == Z_GDU_Z3950 && gdu->u.z3950->which ==
1125         Z_APDU_initRequest && !f->m_is_virtual)
1126     {
1127         m_p->init(package, gdu, f);
1128     }
1129     else if (!f->m_is_virtual)
1130         package.move();
1131     else if (gdu && gdu->which == Z_GDU_Z3950)
1132     {
1133         Z_APDU *apdu = gdu->u.z3950;
1134         if (apdu->which == Z_APDU_initRequest)
1135         {
1136             mp::odr odr;
1137             
1138             package.response() = odr.create_close(
1139                 apdu,
1140                 Z_Close_protocolError,
1141                 "double init");
1142             
1143             package.session().close();
1144         }
1145         else if (apdu->which == Z_APDU_close)
1146         {
1147             mp::odr odr;
1148             
1149             package.response() = odr.create_close(
1150                 apdu,
1151                 Z_Close_peerAbort, "received close from client");
1152             package.session().close();
1153         }
1154         else if (apdu->which == Z_APDU_searchRequest)
1155         {
1156             f->search(package, apdu);
1157         }
1158         else if (apdu->which == Z_APDU_presentRequest)
1159         {
1160             f->present(package, apdu);
1161         }
1162         else if (apdu->which == Z_APDU_scanRequest)
1163         {
1164             f->scan(package, apdu);
1165         }
1166         else
1167         {
1168             mp::odr odr;
1169             
1170             package.response() = odr.create_close(
1171                 apdu, Z_Close_protocolError,
1172                 "unsupported APDU in filter_session_shared");
1173             
1174             package.session().close();
1175         }
1176     }
1177     m_p->release_frontend(package);
1178 }
1179
1180 void yf::SessionShared::configure(const xmlNode *ptr, bool test_only,
1181                                   const char *path)
1182 {
1183     for (ptr = ptr->children; ptr; ptr = ptr->next)
1184     {
1185         if (ptr->type != XML_ELEMENT_NODE)
1186             continue;
1187         if (!strcmp((const char *) ptr->name, "resultset"))
1188         {
1189             const struct _xmlAttr *attr;
1190             for (attr = ptr->properties; attr; attr = attr->next)
1191             {
1192                 if (!strcmp((const char *) attr->name, "ttl"))
1193                     m_p->m_resultset_ttl = 
1194                         mp::xml::get_int(attr->children, 30);
1195                 else if (!strcmp((const char *) attr->name, "max"))
1196                 {
1197                     m_p->m_resultset_max = 
1198                         mp::xml::get_int(attr->children, 10);
1199                 }
1200                 else if (!strcmp((const char *) attr->name, "optimizesearch"))
1201                 {
1202                     m_p->m_optimize_search =
1203                         mp::xml::get_bool(attr->children, true);
1204                 }
1205                 else
1206                     throw mp::filter::FilterException(
1207                         "Bad attribute " + std::string((const char *)
1208                                                        attr->name));
1209             }
1210         }
1211         else if (!strcmp((const char *) ptr->name, "session"))
1212         {
1213             const struct _xmlAttr *attr;
1214             for (attr = ptr->properties; attr; attr = attr->next)
1215             {
1216                 if (!strcmp((const char *) attr->name, "ttl"))
1217                     m_p->m_session_ttl = 
1218                         mp::xml::get_int(attr->children, 120);
1219                 else
1220                     throw mp::filter::FilterException(
1221                         "Bad attribute " + std::string((const char *)
1222                                                        attr->name));
1223             }
1224         }
1225         else
1226         {
1227             throw mp::filter::FilterException("Bad element " 
1228                                                + std::string((const char *)
1229                                                              ptr->name));
1230         }
1231     }
1232 }
1233
1234 static mp::filter::Base* filter_creator()
1235 {
1236     return new mp::filter::SessionShared;
1237 }
1238
1239 extern "C" {
1240     struct metaproxy_1_filter_struct metaproxy_1_filter_session_shared = {
1241         0,
1242         "session_shared",
1243         filter_creator
1244     };
1245 }
1246
1247 /*
1248  * Local variables:
1249  * c-basic-offset: 4
1250  * c-file-style: "Stroustrup"
1251  * indent-tabs-mode: nil
1252  * End:
1253  * vim: shiftwidth=4 tabstop=8 expandtab
1254  */
1255