Version 1.0.23. Bump copyright year.
[metaproxy-moved-to-github.git] / src / filter_session_shared.cpp
1 /* This file is part of Metaproxy.
2    Copyright (C) 2005-2010 Index Data
3
4 Metaproxy is free software; you can redistribute it and/or modify it under
5 the terms of the GNU General Public License as published by the Free
6 Software Foundation; either version 2, or (at your option) any later
7 version.
8
9 Metaproxy is distributed in the hope that it will be useful, but WITHOUT ANY
10 WARRANTY; without even the implied warranty of MERCHANTABILITY or
11 FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
12 for more details.
13
14 You should have received a copy of the GNU General Public License
15 along with this program; if not, write to the Free Software
16 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
17 */
18
19 #include "config.hpp"
20
21 #include "filter.hpp"
22 #include "package.hpp"
23
24 #include <boost/thread/mutex.hpp>
25 #include <boost/thread/condition.hpp>
26 #include <boost/thread/thread.hpp>
27 #include <boost/thread/xtime.hpp>
28 #include <boost/shared_ptr.hpp>
29 #include <boost/format.hpp>
30
31 #include "util.hpp"
32 #include "filter_session_shared.hpp"
33
34 #include <yaz/log.h>
35 #include <yaz/zgdu.h>
36 #include <yaz/otherinfo.h>
37 #include <yaz/diagbib1.h>
38 #include <yazpp/z-query.h>
39 #include <yazpp/record-cache.h>
40 #include <map>
41 #include <iostream>
42 #include <time.h>
43
44 namespace mp = metaproxy_1;
45 namespace yf = metaproxy_1::filter;
46
47 namespace metaproxy_1 {
48
49     namespace filter {
50         // key for session.. We'll only share sessions with same InitKey
51         class SessionShared::InitKey {
52         public:
53             bool operator < (const SessionShared::InitKey &k) const;
54             InitKey(Z_InitRequest *req);
55             InitKey(const InitKey &);
56             ~InitKey();
57         private:
58             char *m_idAuthentication_buf;
59             int m_idAuthentication_size;
60             char *m_otherInfo_buf;
61             int m_otherInfo_size;
62             ODR m_odr;
63         };
64         // worker thread .. for expiry of sessions
65         class SessionShared::Worker {
66         public:
67             Worker(SessionShared::Rep *rep);
68             void operator() (void);
69         private:
70             SessionShared::Rep *m_p;
71         };
72         // backend result set
73         class SessionShared::BackendSet {
74         public:
75             std::string m_result_set_id;
76             Databases m_databases;
77             int m_result_set_size;
78             yazpp_1::Yaz_Z_Query m_query;
79             time_t m_time_last_use;
80             void timestamp();
81             yazpp_1::RecordCache m_record_cache;
82             BackendSet(
83                 const std::string &result_set_id,
84                 const Databases &databases,
85                 const yazpp_1::Yaz_Z_Query &query);
86             bool search(
87                 Package &frontend_package,
88                 const Z_APDU *apdu_req,
89                 const BackendInstancePtr bp,
90                 Z_Records **z_records);
91         };
92         // backend connection instance
93         class SessionShared::BackendInstance {
94             friend class Rep;
95             friend class BackendClass;
96             friend class BackendSet;
97         public:
98             mp::Session m_session;
99             BackendSetList m_sets;
100             bool m_in_use;
101             int m_sequence_this;
102             int m_result_set_sequence;
103             time_t m_time_last_use;
104             mp::Package * m_close_package;
105             ~BackendInstance();
106         };
107         // backends of some class (all with same InitKey)
108         class SessionShared::BackendClass : boost::noncopyable {
109             friend class Rep;
110             friend struct Frontend;
111             bool m_named_result_sets;
112             BackendInstanceList m_backend_list;
113             BackendInstancePtr create_backend(const Package &package);
114             void remove_backend(BackendInstancePtr b);
115             BackendInstancePtr get_backend(const Package &package);
116             void use_backend(BackendInstancePtr b);
117             void release_backend(BackendInstancePtr b);
118             void expire_class();
119             yazpp_1::GDU m_init_request;
120             yazpp_1::GDU m_init_response;
121             boost::mutex m_mutex_backend_class;
122             int m_sequence_top;
123             time_t m_backend_set_ttl;
124             time_t m_backend_expiry_ttl;
125             size_t m_backend_set_max;
126         public:
127             BackendClass(const yazpp_1::GDU &init_request,
128                          int resultset_ttl,
129                          int resultset_max,
130                          int session_ttl);
131             ~BackendClass();
132         };
133         // frontend result set
134         class SessionShared::FrontendSet {
135             Databases m_databases;
136             yazpp_1::Yaz_Z_Query m_query;
137         public:
138             const Databases &get_databases();
139             const yazpp_1::Yaz_Z_Query &get_query();
140             FrontendSet(
141                 const Databases &databases,
142                 const yazpp_1::Yaz_Z_Query &query);
143             FrontendSet();
144         };
145         // frontend session
146         struct SessionShared::Frontend {
147             Frontend(Rep *rep);
148             ~Frontend();
149             bool m_is_virtual;
150             bool m_in_use;
151             Z_Options m_init_options;
152             void search(Package &package, Z_APDU *apdu);
153             void present(Package &package, Z_APDU *apdu);
154             void scan(Package &package, Z_APDU *apdu);
155
156             void get_set(mp::Package &package,
157                          const Z_APDU *apdu_req,
158                          const Databases &databases,
159                          yazpp_1::Yaz_Z_Query &query,
160                          BackendInstancePtr &found_backend,
161                          BackendSetPtr &found_set);
162             void override_set(BackendInstancePtr &found_backend,
163                               std::string &result_set_id);
164
165             Rep *m_p;
166             BackendClassPtr m_backend_class;
167             FrontendSets m_frontend_sets;
168         };            
169         // representation
170         class SessionShared::Rep {
171             friend class SessionShared;
172             friend struct Frontend;
173             
174             FrontendPtr get_frontend(Package &package);
175             void release_frontend(Package &package);
176             Rep();
177         public:
178             void expire();
179         private:
180             void init(Package &package, const Z_GDU *gdu,
181                       FrontendPtr frontend);
182             boost::mutex m_mutex;
183             boost::condition m_cond_session_ready;
184             std::map<mp::Session, FrontendPtr> m_clients;
185
186             BackendClassMap m_backend_map;
187             boost::mutex m_mutex_backend_map;
188             boost::thread_group m_thrds;
189             int m_resultset_ttl;
190             int m_resultset_max;
191             int m_session_ttl;
192         };
193     }
194 }
195
196 yf::SessionShared::FrontendSet::FrontendSet(
197     const Databases &databases,
198     const yazpp_1::Yaz_Z_Query &query)
199     : m_databases(databases), m_query(query)
200 {
201 }
202
203 const yf::SessionShared::Databases & 
204 yf::SessionShared::FrontendSet::get_databases()
205 {
206     return m_databases;
207 }
208
209 const yazpp_1::Yaz_Z_Query& yf::SessionShared::FrontendSet::get_query()
210 {
211     return m_query;
212 }
213
214 yf::SessionShared::InitKey::InitKey(const InitKey &k)
215 {
216     m_odr = odr_createmem(ODR_ENCODE);
217     
218     m_idAuthentication_size =  k.m_idAuthentication_size;
219     m_idAuthentication_buf = (char*)odr_malloc(m_odr, m_idAuthentication_size);
220     memcpy(m_idAuthentication_buf, k.m_idAuthentication_buf,
221            m_idAuthentication_size);
222
223     m_otherInfo_size =  k.m_otherInfo_size;
224     m_otherInfo_buf = (char*)odr_malloc(m_odr, m_otherInfo_size);
225     memcpy(m_otherInfo_buf, k.m_otherInfo_buf,
226            m_otherInfo_size);
227 }
228
229 yf::SessionShared::InitKey::InitKey(Z_InitRequest *req)
230 {
231     m_odr = odr_createmem(ODR_ENCODE);
232
233     Z_IdAuthentication *t = req->idAuthentication;
234     z_IdAuthentication(m_odr, &t, 1, 0);
235     m_idAuthentication_buf =
236         odr_getbuf(m_odr, &m_idAuthentication_size, 0);
237
238     Z_OtherInformation *o = req->otherInfo;
239     z_OtherInformation(m_odr, &o, 1, 0);
240     m_otherInfo_buf = odr_getbuf(m_odr, &m_otherInfo_size, 0);
241 }
242
243 yf::SessionShared::InitKey::~InitKey()
244 {
245     odr_destroy(m_odr);
246 }
247
248 bool yf::SessionShared::InitKey::operator < (const SessionShared::InitKey &k)
249     const 
250 {
251     int c;
252     c = mp::util::memcmp2(
253         (void*) m_idAuthentication_buf, m_idAuthentication_size,
254         (void*) k.m_idAuthentication_buf, k.m_idAuthentication_size);
255     if (c < 0)
256         return true;
257     else if (c > 0)
258         return false;
259
260     c = mp::util::memcmp2((void*) m_otherInfo_buf, m_otherInfo_size,
261                           (void*) k.m_otherInfo_buf, k.m_otherInfo_size);
262     if (c < 0)
263         return true;
264     else if (c > 0)
265         return false;
266     return false;
267 }
268
269 void yf::SessionShared::BackendClass::release_backend(BackendInstancePtr b)
270 {
271     boost::mutex::scoped_lock lock(m_mutex_backend_class);
272     b->m_in_use = false;
273 }
274
275
276 void yf::SessionShared::BackendClass::remove_backend(BackendInstancePtr b)
277 {
278     BackendInstanceList::iterator it = m_backend_list.begin();
279     
280     while (it != m_backend_list.end())
281     {
282         if (*it == b)
283         {
284              mp::odr odr;
285             (*it)->m_close_package->response() = odr.create_close(
286                 0, Z_Close_lackOfActivity, 0);
287             (*it)->m_close_package->session().close();
288             (*it)->m_close_package->move();
289             
290             it = m_backend_list.erase(it);
291         }
292         else
293             it++;
294     }
295 }
296
297
298
299 yf::SessionShared::BackendInstancePtr 
300 yf::SessionShared::BackendClass::get_backend(
301     const mp::Package &frontend_package)
302 {
303     {
304         boost::mutex::scoped_lock lock(m_mutex_backend_class);
305         
306         BackendInstanceList::const_iterator it = m_backend_list.begin();
307         
308         BackendInstancePtr backend1; // null
309         
310         for (; it != m_backend_list.end(); it++)
311         {
312             if (!(*it)->m_in_use)
313             {
314                 if (!backend1 
315                     || (*it)->m_sequence_this < backend1->m_sequence_this)
316                     backend1 = *it;
317             }
318         }
319         if (backend1)
320         {
321             use_backend(backend1);
322             return backend1;
323         }
324     }
325     return create_backend(frontend_package);
326 }
327
328 void yf::SessionShared::BackendClass::use_backend(BackendInstancePtr backend)
329 {
330     backend->m_in_use = true;
331     time(&backend->m_time_last_use);
332     backend->m_sequence_this = m_sequence_top++;
333 }
334
335 yf::SessionShared::BackendInstance::~BackendInstance()
336 {
337     delete m_close_package;
338 }
339
340 yf::SessionShared::BackendInstancePtr yf::SessionShared::BackendClass::create_backend(
341     const mp::Package &frontend_package)
342 {
343     BackendInstancePtr bp(new BackendInstance);
344     BackendInstancePtr null;
345
346     bp->m_close_package =
347         new mp::Package(bp->m_session, frontend_package.origin());
348     bp->m_close_package->copy_filter(frontend_package);
349
350     Package init_package(bp->m_session, frontend_package.origin());
351
352     init_package.copy_filter(frontend_package);
353
354     yazpp_1::GDU actual_init_request = m_init_request;
355     Z_GDU *init_pdu = actual_init_request.get();
356
357     assert(init_pdu->which == Z_GDU_Z3950);
358     assert(init_pdu->u.z3950->which == Z_APDU_initRequest);
359
360     Z_InitRequest *req = init_pdu->u.z3950->u.initRequest;
361     ODR_MASK_ZERO(req->options);
362
363     ODR_MASK_SET(req->options, Z_Options_search);
364     ODR_MASK_SET(req->options, Z_Options_present);
365     ODR_MASK_SET(req->options, Z_Options_namedResultSets);
366     ODR_MASK_SET(req->options, Z_Options_scan);
367
368     ODR_MASK_SET(req->protocolVersion, Z_ProtocolVersion_1);
369     ODR_MASK_SET(req->protocolVersion, Z_ProtocolVersion_2);
370     ODR_MASK_SET(req->protocolVersion, Z_ProtocolVersion_3);
371
372     init_package.request() = init_pdu;
373
374     init_package.move();
375
376     boost::mutex::scoped_lock lock(m_mutex_backend_class);
377
378     m_named_result_sets = false;
379     Z_GDU *gdu = init_package.response().get();
380     if (!init_package.session().is_closed()
381         && gdu && gdu->which == Z_GDU_Z3950 
382         && gdu->u.z3950->which == Z_APDU_initResponse)
383     {
384         Z_InitResponse *res = gdu->u.z3950->u.initResponse;
385         if (!*res->result)
386             return null;
387         m_init_response = gdu->u.z3950;
388         if (ODR_MASK_GET(res->options, Z_Options_namedResultSets))
389         {
390             m_named_result_sets = true;
391         }
392     }
393     else
394     {
395         // did not receive an init response or closed
396         return null;
397     }
398     bp->m_in_use = true;
399     time(&bp->m_time_last_use);
400     bp->m_sequence_this = 0;
401     bp->m_result_set_sequence = 0;
402     m_backend_list.push_back(bp);
403
404     return bp;
405 }
406
407
408 yf::SessionShared::BackendClass::BackendClass(const yazpp_1::GDU &init_request,
409                                               int resultset_ttl,
410                                               int resultset_max,
411                                               int session_ttl)
412     : m_named_result_sets(false), m_init_request(init_request),
413       m_sequence_top(0), m_backend_set_ttl(resultset_ttl),
414       m_backend_expiry_ttl(session_ttl), m_backend_set_max(resultset_max)
415 {}
416
417 yf::SessionShared::BackendClass::~BackendClass()
418 {}
419
420 void yf::SessionShared::Rep::init(mp::Package &package, const Z_GDU *gdu,
421                                   FrontendPtr frontend)
422 {
423     Z_InitRequest *req = gdu->u.z3950->u.initRequest;
424
425     frontend->m_is_virtual = true;
426     frontend->m_init_options = *req->options;
427     InitKey k(req);
428     {
429         boost::mutex::scoped_lock lock(m_mutex_backend_map);
430         BackendClassMap::const_iterator it;
431         it = m_backend_map.find(k);
432         if (it == m_backend_map.end())
433         {
434             BackendClassPtr b(new BackendClass(gdu->u.z3950,
435                                                m_resultset_ttl,
436                                                m_resultset_max,
437                                                m_session_ttl));
438             m_backend_map[k] = b;
439             frontend->m_backend_class = b;
440         }
441         else
442         {
443             frontend->m_backend_class = it->second;            
444         }
445     }
446     BackendClassPtr bc = frontend->m_backend_class;
447     BackendInstancePtr backend = bc->get_backend(package);
448     
449     mp::odr odr;
450     if (!backend)
451     {
452         Z_APDU *apdu = odr.create_initResponse(gdu->u.z3950, 0, 0);
453         *apdu->u.initResponse->result = 0;
454         package.response() = apdu;
455         package.session().close();
456     }
457     else
458     {
459         boost::mutex::scoped_lock lock(bc->m_mutex_backend_class);
460         yazpp_1::GDU init_response = bc->m_init_response;
461         Z_GDU *response_gdu = init_response.get();
462         mp::util::transfer_referenceId(odr, gdu->u.z3950,
463                                        response_gdu->u.z3950);
464
465         Z_Options *server_options =
466             response_gdu->u.z3950->u.initResponse->options;
467         Z_Options *client_options = &frontend->m_init_options;
468
469         int i;
470         for (i = 0; i<30; i++)
471             if (!ODR_MASK_GET(client_options, i))
472                 ODR_MASK_CLEAR(server_options, i);
473         package.response() = init_response;
474     }
475     if (backend)
476         bc->release_backend(backend);
477 }
478
479 void yf::SessionShared::BackendSet::timestamp()
480 {
481     time(&m_time_last_use);
482 }
483
484 yf::SessionShared::BackendSet::BackendSet(
485     const std::string &result_set_id,
486     const Databases &databases,
487     const yazpp_1::Yaz_Z_Query &query) :
488     m_result_set_id(result_set_id),
489     m_databases(databases), m_result_set_size(0), m_query(query) 
490 {
491     timestamp();
492 }
493
494 static int get_diagnostic(Z_DefaultDiagFormat *r)
495 {
496     return *r->condition;
497 }
498
499 bool yf::SessionShared::BackendSet::search(
500     mp::Package &frontend_package,
501     const Z_APDU *frontend_apdu,
502     const BackendInstancePtr bp,
503     Z_Records **z_records)
504 {
505     Package search_package(bp->m_session, frontend_package.origin());
506
507     search_package.copy_filter(frontend_package);
508
509     mp::odr odr;
510     Z_APDU *apdu_req = zget_APDU(odr, Z_APDU_searchRequest);
511     Z_SearchRequest *req = apdu_req->u.searchRequest;
512
513     req->resultSetName = odr_strdup(odr, m_result_set_id.c_str());
514     req->query = m_query.get_Z_Query();
515
516     req->num_databaseNames = m_databases.size();
517     req->databaseNames = (char**) 
518         odr_malloc(odr, req->num_databaseNames * sizeof(char *));
519     Databases::const_iterator it = m_databases.begin();
520     size_t i = 0;
521     for (; it != m_databases.end(); it++)
522         req->databaseNames[i++] = odr_strdup(odr, it->c_str());
523
524     search_package.request() = apdu_req;
525
526     search_package.move();
527
528     Z_GDU *gdu = search_package.response().get();
529     if (!search_package.session().is_closed()
530         && gdu && gdu->which == Z_GDU_Z3950 
531         && gdu->u.z3950->which == Z_APDU_searchResponse)
532     {
533         Z_SearchResponse *b_resp = gdu->u.z3950->u.searchResponse;
534         *z_records = b_resp->records;
535         m_result_set_size = *b_resp->resultCount;
536         return true;
537     }
538     Z_APDU *f_apdu = 0;
539     if (frontend_apdu->which == Z_APDU_searchRequest)
540         f_apdu = odr.create_searchResponse(
541             frontend_apdu, YAZ_BIB1_TEMPORARY_SYSTEM_ERROR, 0);
542     else if (frontend_apdu->which == Z_APDU_presentRequest)
543         f_apdu = odr.create_presentResponse(
544             frontend_apdu, YAZ_BIB1_TEMPORARY_SYSTEM_ERROR, 0);
545     else
546         f_apdu = odr.create_close(
547             frontend_apdu, YAZ_BIB1_TEMPORARY_SYSTEM_ERROR, 0);
548     frontend_package.response() = f_apdu;
549     return false;
550 }
551
552 void yf::SessionShared::Frontend::override_set(
553     BackendInstancePtr &found_backend,
554     std::string &result_set_id)
555 {
556     BackendClassPtr bc = m_backend_class;
557     BackendInstanceList::const_iterator it = bc->m_backend_list.begin();
558     time_t now;
559     time(&now);
560     
561     for (; it != bc->m_backend_list.end(); it++)
562     {
563         if (!(*it)->m_in_use)
564         {
565             BackendSetList::iterator set_it = (*it)->m_sets.begin();
566             for (; set_it != (*it)->m_sets.end(); set_it++)
567             {
568                 if (now >= (*set_it)->m_time_last_use &&
569                     now - (*set_it)->m_time_last_use > bc->m_backend_set_ttl)
570                 {
571                     found_backend = *it;
572                     result_set_id = (*set_it)->m_result_set_id;
573                     found_backend->m_sets.erase(set_it);
574                     return;
575                 }
576             }
577         }
578     }
579     size_t max_sets = bc->m_named_result_sets ? bc->m_backend_set_max : 1;
580     for (it = bc->m_backend_list.begin(); it != bc->m_backend_list.end(); it++)
581     {
582         if (!(*it)->m_in_use && (*it)->m_sets.size() < max_sets)
583         {
584             found_backend = *it;
585             if (bc->m_named_result_sets)
586             {
587                 result_set_id = boost::io::str(
588                     boost::format("%1%") % 
589                     found_backend->m_result_set_sequence);
590                 found_backend->m_result_set_sequence++;
591             }
592             else
593                 result_set_id = "default";
594             return;
595         }
596     }
597 }
598
599 void yf::SessionShared::Frontend::get_set(mp::Package &package,
600                                           const Z_APDU *apdu_req,
601                                           const Databases &databases,
602                                           yazpp_1::Yaz_Z_Query &query,
603                                           BackendInstancePtr &found_backend,
604                                           BackendSetPtr &found_set)
605 {
606     bool session_restarted = false;
607
608 restart:
609     std::string result_set_id;
610     BackendClassPtr bc = m_backend_class;
611     {
612         boost::mutex::scoped_lock lock(bc->m_mutex_backend_class);
613      
614         // look at each backend and see if we have a similar search
615         BackendInstanceList::const_iterator it = bc->m_backend_list.begin();
616         
617         for (; it != bc->m_backend_list.end(); it++)
618         {
619             if (!(*it)->m_in_use)
620             {
621                 BackendSetList::const_iterator set_it = (*it)->m_sets.begin();
622                 for (; set_it != (*it)->m_sets.end(); set_it++)
623                 {
624                     if ((*set_it)->m_databases == databases 
625                         && query.match(&(*set_it)->m_query))
626                     {
627                         found_set = *set_it;
628                         found_backend = *it;
629                         bc->use_backend(found_backend);
630                         found_set->timestamp();
631                         // found matching set. No need to search again
632                         return;
633                     }
634                 }
635             }
636         }
637         override_set(found_backend, result_set_id);
638         if (found_backend)
639             bc->use_backend(found_backend);
640     }
641     if (!found_backend)
642     {
643         // create a new backend set (and new set)
644         found_backend = bc->create_backend(package);
645
646         if (!found_backend)
647         {
648             Z_APDU *f_apdu = 0;
649             mp::odr odr;
650             if (apdu_req->which == Z_APDU_searchRequest)
651             {
652                 f_apdu = odr.create_searchResponse(
653                         apdu_req, YAZ_BIB1_TEMPORARY_SYSTEM_ERROR, 0);
654             }
655             else if (apdu_req->which == Z_APDU_presentRequest)
656             {
657                 f_apdu = odr.create_presentResponse(
658                     apdu_req, YAZ_BIB1_TEMPORARY_SYSTEM_ERROR, 0);
659             }
660             else
661             {
662                 f_apdu = odr.create_close(
663                     apdu_req, YAZ_BIB1_TEMPORARY_SYSTEM_ERROR, 0);
664             }
665             package.response() = f_apdu;
666             return;
667         }
668         if (bc->m_named_result_sets)
669         {
670             result_set_id = boost::io::str(
671                 boost::format("%1%") % found_backend->m_result_set_sequence);
672             found_backend->m_result_set_sequence++;
673         }
674         else
675             result_set_id = "default";
676     }
677     // we must search ...
678     BackendSetPtr new_set(new BackendSet(result_set_id,
679                                          databases, query));
680     Z_Records *z_records = 0;
681     if (!new_set->search(package, apdu_req, found_backend, &z_records))
682     {
683         bc->remove_backend(found_backend);
684         return; // search error 
685     }
686
687     if (z_records)
688     {
689         int condition = 0;
690         if (z_records->which == Z_Records_NSD)
691         {
692             condition =
693                 get_diagnostic(z_records->u.nonSurrogateDiagnostic);
694         }
695         else if (z_records->which == Z_Records_multipleNSD)
696         {
697             if (z_records->u.multipleNonSurDiagnostics->num_diagRecs >= 1
698                 && 
699                 
700                 z_records->u.multipleNonSurDiagnostics->diagRecs[0]->which ==
701                 Z_DiagRec_defaultFormat)
702             {
703                 condition = get_diagnostic(
704                     z_records->u.multipleNonSurDiagnostics->diagRecs[0]->u.defaultFormat);
705                 
706             }
707         }
708         if (!session_restarted &&
709             condition == YAZ_BIB1_TEMPORARY_SYSTEM_ERROR)
710         {
711             bc->remove_backend(found_backend);
712             session_restarted = true;
713             found_backend.reset();
714             goto restart;
715
716         }
717
718         if (condition)
719         {
720             mp::odr odr;
721             if (apdu_req->which == Z_APDU_searchRequest)
722             {
723                 Z_APDU *f_apdu = odr.create_searchResponse(apdu_req, 
724                                                            0, 0);
725                 Z_SearchResponse *f_resp = f_apdu->u.searchResponse;
726                 *f_resp->searchStatus = Z_SearchResponse_none;
727                 f_resp->records = z_records;
728                 package.response() = f_apdu;
729             }
730             if (apdu_req->which == Z_APDU_presentRequest)
731             {
732                 Z_APDU *f_apdu = odr.create_presentResponse(apdu_req, 
733                                                             0, 0);
734                 Z_PresentResponse *f_resp = f_apdu->u.presentResponse;
735                 f_resp->records = z_records;
736                 package.response() = f_apdu;
737             }
738             bc->release_backend(found_backend);
739             return; // search error 
740         }
741     }
742     if (!session_restarted && new_set->m_result_set_size < 0)
743     {
744         bc->remove_backend(found_backend);
745         session_restarted = true;
746         found_backend.reset();
747         goto restart;
748     }
749
750     found_set = new_set;
751     found_set->timestamp();
752     found_backend->m_sets.push_back(found_set);
753 }
754
755 void yf::SessionShared::Frontend::search(mp::Package &package,
756                                          Z_APDU *apdu_req)
757 {
758     Z_SearchRequest *req = apdu_req->u.searchRequest;
759     FrontendSets::iterator fset_it = 
760         m_frontend_sets.find(req->resultSetName);
761     if (fset_it != m_frontend_sets.end())
762     {
763         // result set already exist 
764         // if replace indicator is off: we return diagnostic if
765         // result set already exist.
766         if (*req->replaceIndicator == 0)
767         {
768             mp::odr odr;
769             Z_APDU *apdu = 
770                 odr.create_searchResponse(
771                     apdu_req,
772                     YAZ_BIB1_RESULT_SET_EXISTS_AND_REPLACE_INDICATOR_OFF,
773                     0);
774             package.response() = apdu;
775             
776             return;
777         }
778         m_frontend_sets.erase(fset_it);
779     }
780     
781     yazpp_1::Yaz_Z_Query query;
782     query.set_Z_Query(req->query);
783     Databases databases;
784     int i;
785     for (i = 0; i<req->num_databaseNames; i++)
786         databases.push_back(req->databaseNames[i]);
787
788     BackendSetPtr found_set; // null
789     BackendInstancePtr found_backend; // null
790
791     get_set(package, apdu_req, databases, query, found_backend, found_set);
792     if (!found_set)
793         return;
794
795     mp::odr odr;
796     Z_APDU *f_apdu = odr.create_searchResponse(apdu_req, 0, 0);
797     Z_SearchResponse *f_resp = f_apdu->u.searchResponse;
798     *f_resp->resultCount = found_set->m_result_set_size;
799     package.response() = f_apdu;
800
801     FrontendSetPtr fset(new FrontendSet(databases, query));
802     m_frontend_sets[req->resultSetName] = fset;
803
804     m_backend_class->release_backend(found_backend);
805 }
806
807 void yf::SessionShared::Frontend::present(mp::Package &package,
808                                           Z_APDU *apdu_req)
809 {
810     mp::odr odr;
811     Z_PresentRequest *req = apdu_req->u.presentRequest;
812
813     FrontendSets::iterator fset_it = 
814         m_frontend_sets.find(req->resultSetId);
815
816     if (fset_it == m_frontend_sets.end())
817     {
818         Z_APDU *apdu = 
819             odr.create_presentResponse(
820                 apdu_req,
821                 YAZ_BIB1_SPECIFIED_RESULT_SET_DOES_NOT_EXIST,
822                 req->resultSetId);
823         package.response() = apdu;
824         return;
825     }
826     FrontendSetPtr fset = fset_it->second;
827
828     Databases databases = fset->get_databases();
829     yazpp_1::Yaz_Z_Query query = fset->get_query();
830
831     BackendClassPtr bc = m_backend_class;
832     BackendSetPtr found_set; // null
833     BackendInstancePtr found_backend;
834
835     get_set(package, apdu_req, databases, query, found_backend, found_set);
836     if (!found_set)
837         return;
838
839     Z_NamePlusRecordList *npr_res = 0;
840     if (found_set->m_record_cache.lookup(odr, &npr_res, 
841                                          *req->resultSetStartPoint,
842                                          *req->numberOfRecordsRequested,
843                                          req->preferredRecordSyntax,
844                                          req->recordComposition))
845     {
846         Z_APDU *f_apdu_res = odr.create_presentResponse(apdu_req, 0, 0);
847         Z_PresentResponse *f_resp = f_apdu_res->u.presentResponse;
848
849         yaz_log(YLOG_LOG, "Found " ODR_INT_PRINTF "+" ODR_INT_PRINTF 
850                 " records in cache %p",
851                 *req->resultSetStartPoint,                      
852                 *req->numberOfRecordsRequested,
853                 &found_set->m_record_cache);        
854
855         *f_resp->numberOfRecordsReturned = *req->numberOfRecordsRequested;
856         *f_resp->nextResultSetPosition = 
857             *req->resultSetStartPoint + *req->numberOfRecordsRequested;
858         // f_resp->presentStatus assumed OK.
859         f_resp->records = (Z_Records *) odr_malloc(odr, sizeof(Z_Records));
860         f_resp->records->which = Z_Records_DBOSD;
861         f_resp->records->u.databaseOrSurDiagnostics = npr_res;
862         package.response() = f_apdu_res;
863         bc->release_backend(found_backend);
864         return;
865     }
866                               
867     Z_APDU *p_apdu = zget_APDU(odr, Z_APDU_presentRequest);
868     Z_PresentRequest *p_req = p_apdu->u.presentRequest;
869     p_req->preferredRecordSyntax = req->preferredRecordSyntax;
870     p_req->resultSetId = odr_strdup(odr, found_set->m_result_set_id.c_str());
871     *p_req->resultSetStartPoint = *req->resultSetStartPoint;
872     *p_req->numberOfRecordsRequested = *req->numberOfRecordsRequested;
873     p_req->preferredRecordSyntax = req->preferredRecordSyntax;
874     p_req->recordComposition = req->recordComposition;
875
876     Package present_package(found_backend->m_session, package.origin());
877     present_package.copy_filter(package);
878
879     present_package.request() = p_apdu;
880
881     present_package.move();
882
883     Z_GDU *gdu = present_package.response().get();
884     if (!present_package.session().is_closed()
885         && gdu && gdu->which == Z_GDU_Z3950 
886         && gdu->u.z3950->which == Z_APDU_presentResponse)
887     {
888         Z_PresentResponse *b_resp = gdu->u.z3950->u.presentResponse;
889         Z_APDU *f_apdu_res = odr.create_presentResponse(apdu_req, 0, 0);
890         Z_PresentResponse *f_resp = f_apdu_res->u.presentResponse;
891
892         f_resp->numberOfRecordsReturned = b_resp->numberOfRecordsReturned;
893         f_resp->nextResultSetPosition = b_resp->nextResultSetPosition;
894         f_resp->presentStatus= b_resp->presentStatus;
895         f_resp->records = b_resp->records;
896         f_resp->otherInfo = b_resp->otherInfo;
897         package.response() = f_apdu_res;
898
899         if (b_resp->records && b_resp->records->which ==  Z_Records_DBOSD)
900         {
901             yaz_log(YLOG_LOG, "Adding " ODR_INT_PRINTF "+" ODR_INT_PRINTF
902                     " records to cache %p",
903                     *req->resultSetStartPoint,                      
904                     *f_resp->numberOfRecordsReturned,
905                     &found_set->m_record_cache);        
906             found_set->m_record_cache.add(
907                 odr,
908                 b_resp->records->u.databaseOrSurDiagnostics,
909                 *req->resultSetStartPoint,                      
910                 *f_resp->numberOfRecordsReturned);
911         }
912         bc->release_backend(found_backend);
913     }
914     else
915     {
916         bc->remove_backend(found_backend);
917         Z_APDU *f_apdu_res = 
918             odr.create_presentResponse(
919                 apdu_req, YAZ_BIB1_TEMPORARY_SYSTEM_ERROR, 0);
920         package.response() = f_apdu_res;
921     }
922 }
923
924 void yf::SessionShared::Frontend::scan(mp::Package &frontend_package,
925                                        Z_APDU *apdu_req)
926 {
927     BackendClassPtr bc = m_backend_class;
928     BackendInstancePtr backend = bc->get_backend(frontend_package);
929     if (!backend)
930     {
931         mp::odr odr;
932         Z_APDU *apdu = odr.create_scanResponse(
933             apdu_req, YAZ_BIB1_TEMPORARY_SYSTEM_ERROR, 0);
934         frontend_package.response() = apdu;
935     }
936     else
937     {
938         Package scan_package(backend->m_session, frontend_package.origin());
939         scan_package.copy_filter(frontend_package);
940         scan_package.request() = apdu_req;
941         scan_package.move();
942         frontend_package.response() = scan_package.response();
943         if (scan_package.session().is_closed())
944         {
945             frontend_package.session().close();
946             bc->remove_backend(backend);
947         }
948         else
949             bc->release_backend(backend);
950     }
951 }
952
953 yf::SessionShared::Worker::Worker(SessionShared::Rep *rep) : m_p(rep)
954 {
955 }
956
957 void yf::SessionShared::Worker::operator() (void)
958 {
959     m_p->expire();
960 }
961
962 void yf::SessionShared::BackendClass::expire_class()
963 {
964     time_t now;
965     time(&now);
966     boost::mutex::scoped_lock lock(m_mutex_backend_class);
967     BackendInstanceList::iterator bit = m_backend_list.begin();
968     while (bit != m_backend_list.end())
969     {
970         time_t last_use = (*bit)->m_time_last_use;
971         
972         if ((*bit)->m_in_use)
973         {
974             bit++;
975         }
976         else if ((now >= last_use && now - last_use > m_backend_expiry_ttl)
977             || (now < last_use))
978         {
979             mp::odr odr;
980             (*bit)->m_close_package->response() = odr.create_close(
981                 0, Z_Close_lackOfActivity, 0);
982             (*bit)->m_close_package->session().close();
983             (*bit)->m_close_package->move();
984
985             bit = m_backend_list.erase(bit);
986         }
987         else
988         {
989             bit++;
990         }
991     }
992 }
993
994 void yf::SessionShared::Rep::expire()
995 {
996     while (true)
997     {
998         boost::xtime xt;
999         boost::xtime_get(&xt, boost::TIME_UTC);
1000         xt.sec += 30;
1001         boost::thread::sleep(xt);
1002         
1003         BackendClassMap::const_iterator b_it = m_backend_map.begin();
1004         for (; b_it != m_backend_map.end(); b_it++)
1005             b_it->second->expire_class();
1006     }
1007 }
1008
1009 yf::SessionShared::Rep::Rep()
1010 {
1011     m_resultset_ttl = 30;
1012     m_resultset_max = 10;
1013     m_session_ttl = 90;
1014     yf::SessionShared::Worker w(this);
1015     m_thrds.add_thread(new boost::thread(w));
1016 }
1017
1018 yf::SessionShared::SessionShared() : m_p(new SessionShared::Rep)
1019 {
1020 }
1021
1022 yf::SessionShared::~SessionShared() {
1023 }
1024
1025
1026 yf::SessionShared::Frontend::Frontend(Rep *rep) : m_is_virtual(false), m_p(rep)
1027 {
1028 }
1029
1030 yf::SessionShared::Frontend::~Frontend()
1031 {
1032 }
1033
1034 yf::SessionShared::FrontendPtr yf::SessionShared::Rep::get_frontend(mp::Package &package)
1035 {
1036     boost::mutex::scoped_lock lock(m_mutex);
1037
1038     std::map<mp::Session,yf::SessionShared::FrontendPtr>::iterator it;
1039     
1040     while(true)
1041     {
1042         it = m_clients.find(package.session());
1043         if (it == m_clients.end())
1044             break;
1045         
1046         if (!it->second->m_in_use)
1047         {
1048             it->second->m_in_use = true;
1049             return it->second;
1050         }
1051         m_cond_session_ready.wait(lock);
1052     }
1053     FrontendPtr f(new Frontend(this));
1054     m_clients[package.session()] = f;
1055     f->m_in_use = true;
1056     return f;
1057 }
1058
1059 void yf::SessionShared::Rep::release_frontend(mp::Package &package)
1060 {
1061     boost::mutex::scoped_lock lock(m_mutex);
1062     std::map<mp::Session,yf::SessionShared::FrontendPtr>::iterator it;
1063     
1064     it = m_clients.find(package.session());
1065     if (it != m_clients.end())
1066     {
1067         if (package.session().is_closed())
1068         {
1069             m_clients.erase(it);
1070         }
1071         else
1072         {
1073             it->second->m_in_use = false;
1074         }
1075         m_cond_session_ready.notify_all();
1076     }
1077 }
1078
1079
1080 void yf::SessionShared::process(mp::Package &package) const
1081 {
1082     FrontendPtr f = m_p->get_frontend(package);
1083
1084     Z_GDU *gdu = package.request().get();
1085     
1086     if (gdu && gdu->which == Z_GDU_Z3950 && gdu->u.z3950->which ==
1087         Z_APDU_initRequest && !f->m_is_virtual)
1088     {
1089         m_p->init(package, gdu, f);
1090     }
1091     else if (!f->m_is_virtual)
1092         package.move();
1093     else if (gdu && gdu->which == Z_GDU_Z3950)
1094     {
1095         Z_APDU *apdu = gdu->u.z3950;
1096         if (apdu->which == Z_APDU_initRequest)
1097         {
1098             mp::odr odr;
1099             
1100             package.response() = odr.create_close(
1101                 apdu,
1102                 Z_Close_protocolError,
1103                 "double init");
1104             
1105             package.session().close();
1106         }
1107         else if (apdu->which == Z_APDU_close)
1108         {
1109             mp::odr odr;
1110             
1111             package.response() = odr.create_close(
1112                 apdu,
1113                 Z_Close_peerAbort, "received close from client");
1114             package.session().close();
1115         }
1116         else if (apdu->which == Z_APDU_searchRequest)
1117         {
1118             f->search(package, apdu);
1119         }
1120         else if (apdu->which == Z_APDU_presentRequest)
1121         {
1122             f->present(package, apdu);
1123         }
1124         else if (apdu->which == Z_APDU_scanRequest)
1125         {
1126             f->scan(package, apdu);
1127         }
1128         else
1129         {
1130             mp::odr odr;
1131             
1132             package.response() = odr.create_close(
1133                 apdu, Z_Close_protocolError,
1134                 "unsupported APDU in filter_session_shared");
1135             
1136             package.session().close();
1137         }
1138     }
1139     m_p->release_frontend(package);
1140 }
1141
1142 void yf::SessionShared::configure(const xmlNode *ptr, bool test_only)
1143 {
1144     for (ptr = ptr->children; ptr; ptr = ptr->next)
1145     {
1146         if (ptr->type != XML_ELEMENT_NODE)
1147             continue;
1148         if (!strcmp((const char *) ptr->name, "resultset"))
1149         {
1150             const struct _xmlAttr *attr;
1151             for (attr = ptr->properties; attr; attr = attr->next)
1152             {
1153                 if (!strcmp((const char *) attr->name, "ttl"))
1154                     m_p->m_resultset_ttl = 
1155                         mp::xml::get_int(attr->children, 30);
1156                 else if (!strcmp((const char *) attr->name, "max"))
1157                 {
1158                     m_p->m_resultset_max = 
1159                         mp::xml::get_int(attr->children, 10);
1160                 }
1161                 else
1162                     throw mp::filter::FilterException(
1163                         "Bad attribute " + std::string((const char *)
1164                                                        attr->name));
1165             }
1166         }
1167         else if (!strcmp((const char *) ptr->name, "session"))
1168         {
1169             const struct _xmlAttr *attr;
1170             for (attr = ptr->properties; attr; attr = attr->next)
1171             {
1172                 if (!strcmp((const char *) attr->name, "ttl"))
1173                     m_p->m_session_ttl = 
1174                         mp::xml::get_int(attr->children, 120);
1175                 else
1176                     throw mp::filter::FilterException(
1177                         "Bad attribute " + std::string((const char *)
1178                                                        attr->name));
1179             }
1180         }
1181         else
1182         {
1183             throw mp::filter::FilterException("Bad element " 
1184                                                + std::string((const char *)
1185                                                              ptr->name));
1186         }
1187     }
1188 }
1189
1190 static mp::filter::Base* filter_creator()
1191 {
1192     return new mp::filter::SessionShared;
1193 }
1194
1195 extern "C" {
1196     struct metaproxy_1_filter_struct metaproxy_1_filter_session_shared = {
1197         0,
1198         "session_shared",
1199         filter_creator
1200     };
1201 }
1202
1203 /*
1204  * Local variables:
1205  * c-basic-offset: 4
1206  * c-file-style: "Stroustrup"
1207  * indent-tabs-mode: nil
1208  * End:
1209  * vim: shiftwidth=4 tabstop=8 expandtab
1210  */
1211