Implemented record caching (bug #2699).
[metaproxy-moved-to-github.git] / src / filter_session_shared.cpp
1 /* This file is part of Metaproxy.
2    Copyright (C) 2005-2008 Index Data
3
4 Metaproxy is free software; you can redistribute it and/or modify it under
5 the terms of the GNU General Public License as published by the Free
6 Software Foundation; either version 2, or (at your option) any later
7 version.
8
9 Metaproxy is distributed in the hope that it will be useful, but WITHOUT ANY
10 WARRANTY; without even the implied warranty of MERCHANTABILITY or
11 FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
12 for more details.
13
14 You should have received a copy of the GNU General Public License
15 along with this program; if not, write to the Free Software
16 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
17 */
18
19 #include "config.hpp"
20
21 #include "filter.hpp"
22 #include "package.hpp"
23
24 #include <boost/thread/mutex.hpp>
25 #include <boost/thread/condition.hpp>
26 #include <boost/thread/thread.hpp>
27 #include <boost/thread/xtime.hpp>
28 #include <boost/shared_ptr.hpp>
29 #include <boost/format.hpp>
30
31 #include "util.hpp"
32 #include "filter_session_shared.hpp"
33
34 #include <yaz/log.h>
35 #include <yaz/zgdu.h>
36 #include <yaz/otherinfo.h>
37 #include <yaz/diagbib1.h>
38 #include <yazpp/z-query.h>
39 #include <yazpp/record-cache.h>
40 #include <map>
41 #include <iostream>
42 #include <time.h>
43
44 namespace mp = metaproxy_1;
45 namespace yf = metaproxy_1::filter;
46
47 namespace metaproxy_1 {
48
49     namespace filter {
50         // key for session.. We'll only share sessions with same InitKey
51         class SessionShared::InitKey {
52         public:
53             bool operator < (const SessionShared::InitKey &k) const;
54             InitKey(Z_InitRequest *req);
55             InitKey(const InitKey &);
56             ~InitKey();
57         private:
58             char *m_idAuthentication_buf;
59             int m_idAuthentication_size;
60             char *m_otherInfo_buf;
61             int m_otherInfo_size;
62             ODR m_odr;
63         };
64         // worker thread .. for expiry of sessions
65         class SessionShared::Worker {
66         public:
67             Worker(SessionShared::Rep *rep);
68             void operator() (void);
69         private:
70             SessionShared::Rep *m_p;
71         };
72         // backend result set
73         class SessionShared::BackendSet {
74         public:
75             std::string m_result_set_id;
76             Databases m_databases;
77             int m_result_set_size;
78             yazpp_1::Yaz_Z_Query m_query;
79             time_t m_time_last_use;
80             void timestamp();
81             yazpp_1::RecordCache m_record_cache;
82             BackendSet(
83                 const std::string &result_set_id,
84                 const Databases &databases,
85                 const yazpp_1::Yaz_Z_Query &query);
86             bool search(
87                 Package &frontend_package,
88                 const Z_APDU *apdu_req,
89                 const BackendInstancePtr bp,
90                 bool &fatal_error);
91         };
92         // backend connection instance
93         class SessionShared::BackendInstance {
94             friend class Rep;
95             friend class BackendClass;
96             friend class BackendSet;
97         public:
98             mp::Session m_session;
99             BackendSetList m_sets;
100             bool m_in_use;
101             int m_sequence_this;
102             int m_result_set_sequence;
103             time_t m_time_last_use;
104             mp::Package * m_close_package;
105             ~BackendInstance();
106         };
107         // backends of some class (all with same InitKey)
108         class SessionShared::BackendClass : boost::noncopyable {
109             friend class Rep;
110             friend struct Frontend;
111             bool m_named_result_sets;
112             BackendInstanceList m_backend_list;
113             BackendInstancePtr create_backend(const Package &package);
114             void remove_backend(BackendInstancePtr b);
115             BackendInstancePtr get_backend(const Package &package);
116             void use_backend(BackendInstancePtr b);
117             void release_backend(BackendInstancePtr b);
118             void expire_class();
119             yazpp_1::GDU m_init_request;
120             yazpp_1::GDU m_init_response;
121             boost::mutex m_mutex_backend_class;
122             int m_sequence_top;
123             time_t m_backend_set_ttl;
124             time_t m_backend_expiry_ttl;
125             size_t m_backend_set_max;
126         public:
127             BackendClass(const yazpp_1::GDU &init_request,
128                          int resultset_ttl,
129                          int resultset_max,
130                          int session_ttl);
131             ~BackendClass();
132         };
133         // frontend result set
134         class SessionShared::FrontendSet {
135             Databases m_databases;
136             yazpp_1::Yaz_Z_Query m_query;
137         public:
138             const Databases &get_databases();
139             const yazpp_1::Yaz_Z_Query &get_query();
140             FrontendSet(
141                 const Databases &databases,
142                 const yazpp_1::Yaz_Z_Query &query);
143             FrontendSet();
144         };
145         // frontend session
146         struct SessionShared::Frontend {
147             Frontend(Rep *rep);
148             ~Frontend();
149             bool m_is_virtual;
150             bool m_in_use;
151             Z_Options m_init_options;
152             void search(Package &package, Z_APDU *apdu);
153             void present(Package &package, Z_APDU *apdu);
154             void scan(Package &package, Z_APDU *apdu);
155
156             void get_set(mp::Package &package,
157                          const Z_APDU *apdu_req,
158                          const Databases &databases,
159                          yazpp_1::Yaz_Z_Query &query,
160                          BackendInstancePtr &found_backend,
161                          BackendSetPtr &found_set);
162             void override_set(BackendInstancePtr &found_backend,
163                               std::string &result_set_id);
164
165             Rep *m_p;
166             BackendClassPtr m_backend_class;
167             FrontendSets m_frontend_sets;
168         };            
169         // representation
170         class SessionShared::Rep {
171             friend class SessionShared;
172             friend struct Frontend;
173             
174             FrontendPtr get_frontend(Package &package);
175             void release_frontend(Package &package);
176             Rep();
177         public:
178             void expire();
179         private:
180             void init(Package &package, const Z_GDU *gdu,
181                       FrontendPtr frontend);
182             boost::mutex m_mutex;
183             boost::condition m_cond_session_ready;
184             std::map<mp::Session, FrontendPtr> m_clients;
185
186             BackendClassMap m_backend_map;
187             boost::mutex m_mutex_backend_map;
188             boost::thread_group m_thrds;
189             int m_resultset_ttl;
190             int m_resultset_max;
191             int m_session_ttl;
192         };
193     }
194 }
195
196 yf::SessionShared::FrontendSet::FrontendSet(
197     const Databases &databases,
198     const yazpp_1::Yaz_Z_Query &query)
199     : m_databases(databases), m_query(query)
200 {
201 }
202
203 const yf::SessionShared::Databases & 
204 yf::SessionShared::FrontendSet::get_databases()
205 {
206     return m_databases;
207 }
208
209 const yazpp_1::Yaz_Z_Query& yf::SessionShared::FrontendSet::get_query()
210 {
211     return m_query;
212 }
213
214 yf::SessionShared::InitKey::InitKey(const InitKey &k)
215 {
216     m_odr = odr_createmem(ODR_ENCODE);
217     
218     m_idAuthentication_size =  k.m_idAuthentication_size;
219     m_idAuthentication_buf = (char*)odr_malloc(m_odr, m_idAuthentication_size);
220     memcpy(m_idAuthentication_buf, k.m_idAuthentication_buf,
221            m_idAuthentication_size);
222
223     m_otherInfo_size =  k.m_otherInfo_size;
224     m_otherInfo_buf = (char*)odr_malloc(m_odr, m_otherInfo_size);
225     memcpy(m_otherInfo_buf, k.m_otherInfo_buf,
226            m_otherInfo_size);
227 }
228
229 yf::SessionShared::InitKey::InitKey(Z_InitRequest *req)
230 {
231     m_odr = odr_createmem(ODR_ENCODE);
232
233     Z_IdAuthentication *t = req->idAuthentication;
234     z_IdAuthentication(m_odr, &t, 1, 0);
235     m_idAuthentication_buf =
236         odr_getbuf(m_odr, &m_idAuthentication_size, 0);
237
238     Z_OtherInformation *o = req->otherInfo;
239     z_OtherInformation(m_odr, &o, 1, 0);
240     m_otherInfo_buf = odr_getbuf(m_odr, &m_otherInfo_size, 0);
241 }
242
243 yf::SessionShared::InitKey::~InitKey()
244 {
245     odr_destroy(m_odr);
246 }
247
248 bool yf::SessionShared::InitKey::operator < (const SessionShared::InitKey &k)
249     const 
250 {
251     int c;
252     c = mp::util::memcmp2(
253         (void*) m_idAuthentication_buf, m_idAuthentication_size,
254         (void*) k.m_idAuthentication_buf, k.m_idAuthentication_size);
255     if (c < 0)
256         return true;
257     else if (c > 0)
258         return false;
259
260     c = mp::util::memcmp2((void*) m_otherInfo_buf, m_otherInfo_size,
261                           (void*) k.m_otherInfo_buf, k.m_otherInfo_size);
262     if (c < 0)
263         return true;
264     else if (c > 0)
265         return false;
266     return false;
267 }
268
269 void yf::SessionShared::BackendClass::release_backend(BackendInstancePtr b)
270 {
271     boost::mutex::scoped_lock lock(m_mutex_backend_class);
272     b->m_in_use = false;
273 }
274
275
276 void yf::SessionShared::BackendClass::remove_backend(BackendInstancePtr b)
277 {
278     BackendInstanceList::iterator it = m_backend_list.begin();
279     
280     while (it != m_backend_list.end())
281     {
282         if (*it == b)
283         {
284              mp::odr odr;
285             (*it)->m_close_package->response() = odr.create_close(
286                 0, Z_Close_lackOfActivity, 0);
287             (*it)->m_close_package->session().close();
288             (*it)->m_close_package->move();
289             
290             it = m_backend_list.erase(it);
291         }
292         else
293             it++;
294     }
295 }
296
297
298
299 yf::SessionShared::BackendInstancePtr 
300 yf::SessionShared::BackendClass::get_backend(
301     const mp::Package &frontend_package)
302 {
303     {
304         boost::mutex::scoped_lock lock(m_mutex_backend_class);
305         
306         BackendInstanceList::const_iterator it = m_backend_list.begin();
307         
308         BackendInstancePtr backend1; // null
309         
310         for (; it != m_backend_list.end(); it++)
311         {
312             if (!(*it)->m_in_use)
313             {
314                 if (!backend1 
315                     || (*it)->m_sequence_this < backend1->m_sequence_this)
316                     backend1 = *it;
317             }
318         }
319         if (backend1)
320         {
321             use_backend(backend1);
322             return backend1;
323         }
324     }
325     return create_backend(frontend_package);
326 }
327
328 void yf::SessionShared::BackendClass::use_backend(BackendInstancePtr backend)
329 {
330     backend->m_in_use = true;
331     time(&backend->m_time_last_use);
332     backend->m_sequence_this = m_sequence_top++;
333 }
334
335 yf::SessionShared::BackendInstance::~BackendInstance()
336 {
337     delete m_close_package;
338 }
339
340 yf::SessionShared::BackendInstancePtr yf::SessionShared::BackendClass::create_backend(
341     const mp::Package &frontend_package)
342 {
343     BackendInstancePtr bp(new BackendInstance);
344     BackendInstancePtr null;
345
346     bp->m_close_package =
347         new mp::Package(bp->m_session, frontend_package.origin());
348     bp->m_close_package->copy_filter(frontend_package);
349
350     Package init_package(bp->m_session, frontend_package.origin());
351
352     init_package.copy_filter(frontend_package);
353
354     yazpp_1::GDU actual_init_request = m_init_request;
355     Z_GDU *init_pdu = actual_init_request.get();
356
357     assert(init_pdu->which == Z_GDU_Z3950);
358     assert(init_pdu->u.z3950->which == Z_APDU_initRequest);
359
360     Z_InitRequest *req = init_pdu->u.z3950->u.initRequest;
361     ODR_MASK_ZERO(req->options);
362
363     ODR_MASK_SET(req->options, Z_Options_search);
364     ODR_MASK_SET(req->options, Z_Options_present);
365     ODR_MASK_SET(req->options, Z_Options_namedResultSets);
366     ODR_MASK_SET(req->options, Z_Options_scan);
367
368     ODR_MASK_SET(req->protocolVersion, Z_ProtocolVersion_1);
369     ODR_MASK_SET(req->protocolVersion, Z_ProtocolVersion_2);
370     ODR_MASK_SET(req->protocolVersion, Z_ProtocolVersion_3);
371
372     init_package.request() = init_pdu;
373
374     init_package.move();
375
376     boost::mutex::scoped_lock lock(m_mutex_backend_class);
377
378     m_named_result_sets = false;
379     Z_GDU *gdu = init_package.response().get();
380     if (!init_package.session().is_closed()
381         && gdu && gdu->which == Z_GDU_Z3950 
382         && gdu->u.z3950->which == Z_APDU_initResponse)
383     {
384         Z_InitResponse *res = gdu->u.z3950->u.initResponse;
385         if (!*res->result)
386             return null;
387         m_init_response = gdu->u.z3950;
388         if (ODR_MASK_GET(res->options, Z_Options_namedResultSets))
389         {
390             m_named_result_sets = true;
391         }
392     }
393     else
394     {
395         // did not receive an init response or closed
396         return null;
397     }
398     bp->m_in_use = true;
399     time(&bp->m_time_last_use);
400     bp->m_sequence_this = 0;
401     bp->m_result_set_sequence = 0;
402     m_backend_list.push_back(bp);
403
404     return bp;
405 }
406
407
408 yf::SessionShared::BackendClass::BackendClass(const yazpp_1::GDU &init_request,
409                                               int resultset_ttl,
410                                               int resultset_max,
411                                               int session_ttl)
412     : m_named_result_sets(false), m_init_request(init_request),
413       m_sequence_top(0), m_backend_set_ttl(resultset_ttl),
414       m_backend_expiry_ttl(session_ttl), m_backend_set_max(resultset_max)
415 {}
416
417 yf::SessionShared::BackendClass::~BackendClass()
418 {}
419
420 void yf::SessionShared::Rep::init(mp::Package &package, const Z_GDU *gdu,
421                                   FrontendPtr frontend)
422 {
423     Z_InitRequest *req = gdu->u.z3950->u.initRequest;
424
425     frontend->m_is_virtual = true;
426     frontend->m_init_options = *req->options;
427     InitKey k(req);
428     {
429         boost::mutex::scoped_lock lock(m_mutex_backend_map);
430         BackendClassMap::const_iterator it;
431         it = m_backend_map.find(k);
432         if (it == m_backend_map.end())
433         {
434             BackendClassPtr b(new BackendClass(gdu->u.z3950,
435                                                m_resultset_ttl,
436                                                m_resultset_max,
437                                                m_session_ttl));
438             m_backend_map[k] = b;
439             frontend->m_backend_class = b;
440         }
441         else
442         {
443             frontend->m_backend_class = it->second;            
444         }
445     }
446     BackendClassPtr bc = frontend->m_backend_class;
447     BackendInstancePtr backend = bc->get_backend(package);
448     
449     mp::odr odr;
450     if (!backend)
451     {
452         Z_APDU *apdu = odr.create_initResponse(gdu->u.z3950, 0, 0);
453         *apdu->u.initResponse->result = 0;
454         package.response() = apdu;
455         package.session().close();
456     }
457     else
458     {
459         boost::mutex::scoped_lock lock(bc->m_mutex_backend_class);
460         yazpp_1::GDU init_response = bc->m_init_response;
461         Z_GDU *response_gdu = init_response.get();
462         mp::util::transfer_referenceId(odr, gdu->u.z3950,
463                                        response_gdu->u.z3950);
464
465         Z_Options *server_options =
466             response_gdu->u.z3950->u.initResponse->options;
467         Z_Options *client_options = &frontend->m_init_options;
468
469         int i;
470         for (i = 0; i<30; i++)
471             if (!ODR_MASK_GET(client_options, i))
472                 ODR_MASK_CLEAR(server_options, i);
473         package.response() = init_response;
474     }
475     if (backend)
476         bc->release_backend(backend);
477 }
478
479 void yf::SessionShared::BackendSet::timestamp()
480 {
481     time(&m_time_last_use);
482 }
483
484 yf::SessionShared::BackendSet::BackendSet(
485     const std::string &result_set_id,
486     const Databases &databases,
487     const yazpp_1::Yaz_Z_Query &query) :
488     m_result_set_id(result_set_id),
489     m_databases(databases), m_result_set_size(0), m_query(query) 
490 {
491     timestamp();
492 }
493
494 bool yf::SessionShared::BackendSet::search(
495     mp::Package &frontend_package,
496     const Z_APDU *frontend_apdu,
497     const BackendInstancePtr bp,
498     bool & fatal_error)
499 {
500     Package search_package(bp->m_session, frontend_package.origin());
501
502     search_package.copy_filter(frontend_package);
503
504     mp::odr odr;
505     Z_APDU *apdu_req = zget_APDU(odr, Z_APDU_searchRequest);
506     Z_SearchRequest *req = apdu_req->u.searchRequest;
507
508     req->resultSetName = odr_strdup(odr, m_result_set_id.c_str());
509     req->query = m_query.get_Z_Query();
510
511     req->num_databaseNames = m_databases.size();
512     req->databaseNames = (char**) 
513         odr_malloc(odr, req->num_databaseNames * sizeof(char *));
514     Databases::const_iterator it = m_databases.begin();
515     size_t i = 0;
516     for (; it != m_databases.end(); it++)
517         req->databaseNames[i++] = odr_strdup(odr, it->c_str());
518
519     search_package.request() = apdu_req;
520
521     search_package.move();
522     fatal_error = false;  // assume backend session is good
523
524     Z_Records *z_records_diag = 0;
525     Z_GDU *gdu = search_package.response().get();
526     if (!search_package.session().is_closed()
527         && gdu && gdu->which == Z_GDU_Z3950 
528         && gdu->u.z3950->which == Z_APDU_searchResponse)
529     {
530         Z_SearchResponse *b_resp = gdu->u.z3950->u.searchResponse;
531         if (b_resp->records)
532         {
533             if (b_resp->records->which == Z_Records_NSD 
534                 || b_resp->records->which == Z_Records_multipleNSD)
535                 z_records_diag = b_resp->records;
536         }
537         if (z_records_diag)
538         {
539             // there could be diagnostics that are so bad.. that 
540             // we simply mark the error as fatal..  For now we assume
541             // we can resume
542             if (frontend_apdu->which == Z_APDU_searchRequest)
543             {
544                 Z_APDU *f_apdu = odr.create_searchResponse(frontend_apdu, 
545                                                            0, 0);
546                 Z_SearchResponse *f_resp = f_apdu->u.searchResponse;
547                 *f_resp->searchStatus = *b_resp->searchStatus;
548                 f_resp->records = z_records_diag;
549                 frontend_package.response() = f_apdu;
550                 return false;
551             }
552             if (frontend_apdu->which == Z_APDU_presentRequest)
553             {
554                 Z_APDU *f_apdu = odr.create_presentResponse(frontend_apdu, 
555                                                             0, 0);
556                 Z_PresentResponse *f_resp = f_apdu->u.presentResponse;
557                 f_resp->records = z_records_diag;
558                 frontend_package.response() = f_apdu;
559                 return false;
560             }
561         }
562         m_result_set_size = *b_resp->resultCount;
563         return true;
564     }
565     Z_APDU *f_apdu = 0;
566     if (frontend_apdu->which == Z_APDU_searchRequest)
567         f_apdu = odr.create_searchResponse(
568             frontend_apdu, YAZ_BIB1_TEMPORARY_SYSTEM_ERROR, 0);
569     else if (frontend_apdu->which == Z_APDU_presentRequest)
570         f_apdu = odr.create_presentResponse(
571             frontend_apdu, YAZ_BIB1_TEMPORARY_SYSTEM_ERROR, 0);
572     else
573         f_apdu = odr.create_close(
574             frontend_apdu, YAZ_BIB1_TEMPORARY_SYSTEM_ERROR, 0);
575     frontend_package.response() = f_apdu;
576     fatal_error = true; // weired response.. bad backend
577     return false;
578 }
579
580 void yf::SessionShared::Frontend::override_set(
581     BackendInstancePtr &found_backend,
582     std::string &result_set_id)
583 {
584     BackendClassPtr bc = m_backend_class;
585     BackendInstanceList::const_iterator it = bc->m_backend_list.begin();
586     time_t now;
587     time(&now);
588     
589     for (; it != bc->m_backend_list.end(); it++)
590     {
591         if (!(*it)->m_in_use)
592         {
593             BackendSetList::iterator set_it = (*it)->m_sets.begin();
594             for (; set_it != (*it)->m_sets.end(); set_it++)
595             {
596                 if (now >= (*set_it)->m_time_last_use &&
597                     now - (*set_it)->m_time_last_use > bc->m_backend_set_ttl)
598                 {
599                     found_backend = *it;
600                     result_set_id = (*set_it)->m_result_set_id;
601                     found_backend->m_sets.erase(set_it);
602                     return;
603                 }
604             }
605         }
606     }
607     size_t max_sets = bc->m_named_result_sets ? bc->m_backend_set_max : 1;
608     for (it = bc->m_backend_list.begin(); it != bc->m_backend_list.end(); it++)
609     {
610         if (!(*it)->m_in_use && (*it)->m_sets.size() < max_sets)
611         {
612             found_backend = *it;
613             if (bc->m_named_result_sets)
614             {
615                 result_set_id = boost::io::str(
616                     boost::format("%1%") % 
617                     found_backend->m_result_set_sequence);
618                 found_backend->m_result_set_sequence++;
619             }
620             else
621                 result_set_id = "default";
622             return;
623         }
624     }
625 }
626
627 void yf::SessionShared::Frontend::get_set(mp::Package &package,
628                                           const Z_APDU *apdu_req,
629                                           const Databases &databases,
630                                           yazpp_1::Yaz_Z_Query &query,
631                                           BackendInstancePtr &found_backend,
632                                           BackendSetPtr &found_set)
633 {
634     bool session_restarted = false;
635
636 restart:
637     std::string result_set_id;
638     BackendClassPtr bc = m_backend_class;
639     {
640         boost::mutex::scoped_lock lock(bc->m_mutex_backend_class);
641      
642         // look at each backend and see if we have a similar search
643         BackendInstanceList::const_iterator it = bc->m_backend_list.begin();
644         
645         for (; it != bc->m_backend_list.end(); it++)
646         {
647             if (!(*it)->m_in_use)
648             {
649                 BackendSetList::const_iterator set_it = (*it)->m_sets.begin();
650                 for (; set_it != (*it)->m_sets.end(); set_it++)
651                 {
652                     if ((*set_it)->m_databases == databases 
653                         && query.match(&(*set_it)->m_query))
654                     {
655                         found_set = *set_it;
656                         found_backend = *it;
657                         bc->use_backend(found_backend);
658                         found_set->timestamp();
659                         // found matching set. No need to search again
660                         return;
661                     }
662                 }
663             }
664         }
665         override_set(found_backend, result_set_id);
666         if (found_backend)
667             bc->use_backend(found_backend);
668     }
669     if (!found_backend)
670     {
671         // create a new backend set (and new set)
672         found_backend = bc->create_backend(package);
673
674         if (!found_backend)
675         {
676             Z_APDU *f_apdu = 0;
677             mp::odr odr;
678             if (apdu_req->which == Z_APDU_searchRequest)
679             {
680                 f_apdu = odr.create_searchResponse(
681                         apdu_req, YAZ_BIB1_TEMPORARY_SYSTEM_ERROR, 0);
682             }
683             else if (apdu_req->which == Z_APDU_presentRequest)
684             {
685                 f_apdu = odr.create_presentResponse(
686                     apdu_req, YAZ_BIB1_TEMPORARY_SYSTEM_ERROR, 0);
687             }
688             else
689             {
690                 f_apdu = odr.create_close(
691                     apdu_req, YAZ_BIB1_TEMPORARY_SYSTEM_ERROR, 0);
692             }
693             package.response() = f_apdu;
694             return;
695         }
696         if (bc->m_named_result_sets)
697         {
698             result_set_id = boost::io::str(
699                 boost::format("%1%") % found_backend->m_result_set_sequence);
700             found_backend->m_result_set_sequence++;
701         }
702         else
703             result_set_id = "default";
704     }
705     // we must search ...
706     BackendSetPtr new_set(new BackendSet(result_set_id,
707                                          databases, query));
708     bool fatal_error = false;
709     if (!new_set->search(package, apdu_req, found_backend, fatal_error))
710     {
711         if (fatal_error)
712             bc->remove_backend(found_backend);
713         else
714             bc->release_backend(found_backend);
715         return; // search error 
716     }
717     if (!session_restarted && new_set->m_result_set_size < 0)
718     {
719         bc->remove_backend(found_backend);
720         session_restarted = true;
721         found_backend.reset();
722         goto restart;
723     }
724
725     found_set = new_set;
726     found_set->timestamp();
727     found_backend->m_sets.push_back(found_set);
728 }
729
730 void yf::SessionShared::Frontend::search(mp::Package &package,
731                                          Z_APDU *apdu_req)
732 {
733     Z_SearchRequest *req = apdu_req->u.searchRequest;
734     FrontendSets::iterator fset_it = 
735         m_frontend_sets.find(req->resultSetName);
736     if (fset_it != m_frontend_sets.end())
737     {
738         // result set already exist 
739         // if replace indicator is off: we return diagnostic if
740         // result set already exist.
741         if (*req->replaceIndicator == 0)
742         {
743             mp::odr odr;
744             Z_APDU *apdu = 
745                 odr.create_searchResponse(
746                     apdu_req,
747                     YAZ_BIB1_RESULT_SET_EXISTS_AND_REPLACE_INDICATOR_OFF,
748                     0);
749             package.response() = apdu;
750             
751             return;
752         }
753         m_frontend_sets.erase(fset_it);
754     }
755     
756     yazpp_1::Yaz_Z_Query query;
757     query.set_Z_Query(req->query);
758     Databases databases;
759     int i;
760     for (i = 0; i<req->num_databaseNames; i++)
761         databases.push_back(req->databaseNames[i]);
762
763     BackendSetPtr found_set; // null
764     BackendInstancePtr found_backend; // null
765
766     get_set(package, apdu_req, databases, query, found_backend, found_set);
767     if (!found_set)
768         return;
769
770     mp::odr odr;
771     Z_APDU *f_apdu = odr.create_searchResponse(apdu_req, 0, 0);
772     Z_SearchResponse *f_resp = f_apdu->u.searchResponse;
773     *f_resp->resultCount = found_set->m_result_set_size;
774     package.response() = f_apdu;
775
776     FrontendSetPtr fset(new FrontendSet(databases, query));
777     m_frontend_sets[req->resultSetName] = fset;
778
779     m_backend_class->release_backend(found_backend);
780 }
781
782 void yf::SessionShared::Frontend::present(mp::Package &package,
783                                           Z_APDU *apdu_req)
784 {
785     mp::odr odr;
786     Z_PresentRequest *req = apdu_req->u.presentRequest;
787
788     FrontendSets::iterator fset_it = 
789         m_frontend_sets.find(req->resultSetId);
790
791     if (fset_it == m_frontend_sets.end())
792     {
793         Z_APDU *apdu = 
794             odr.create_presentResponse(
795                 apdu_req,
796                 YAZ_BIB1_SPECIFIED_RESULT_SET_DOES_NOT_EXIST,
797                 req->resultSetId);
798         package.response() = apdu;
799         return;
800     }
801     FrontendSetPtr fset = fset_it->second;
802
803     Databases databases = fset->get_databases();
804     yazpp_1::Yaz_Z_Query query = fset->get_query();
805
806     BackendClassPtr bc = m_backend_class;
807     BackendSetPtr found_set; // null
808     BackendInstancePtr found_backend;
809
810     get_set(package, apdu_req, databases, query, found_backend, found_set);
811     if (!found_set)
812         return;
813
814     Z_NamePlusRecordList *npr_res = 0;
815     if (found_set->m_record_cache.lookup(odr, &npr_res, 
816                                          *req->resultSetStartPoint,
817                                          *req->numberOfRecordsRequested,
818                                          req->preferredRecordSyntax,
819                                          req->recordComposition))
820     {
821         Z_APDU *f_apdu_res = odr.create_presentResponse(apdu_req, 0, 0);
822         Z_PresentResponse *f_resp = f_apdu_res->u.presentResponse;
823
824         yaz_log(YLOG_LOG, "Found %d+%d records in cache %p",
825                 *req->resultSetStartPoint,                      
826                 *req->numberOfRecordsRequested,
827                 &found_set->m_record_cache);        
828
829         *f_resp->numberOfRecordsReturned = *req->numberOfRecordsRequested;
830         *f_resp->nextResultSetPosition = 
831             *req->resultSetStartPoint + *req->numberOfRecordsRequested;
832         // f_resp->presentStatus assumed OK.
833         f_resp->records = (Z_Records *) odr_malloc(odr, sizeof(Z_Records));
834         f_resp->records->which = Z_Records_DBOSD;
835         f_resp->records->u.databaseOrSurDiagnostics = npr_res;
836         package.response() = f_apdu_res;
837         bc->release_backend(found_backend);
838         return;
839     }
840                               
841     Z_APDU *p_apdu = zget_APDU(odr, Z_APDU_presentRequest);
842     Z_PresentRequest *p_req = p_apdu->u.presentRequest;
843     p_req->preferredRecordSyntax = req->preferredRecordSyntax;
844     p_req->resultSetId = odr_strdup(odr, found_set->m_result_set_id.c_str());
845     *p_req->resultSetStartPoint = *req->resultSetStartPoint;
846     *p_req->numberOfRecordsRequested = *req->numberOfRecordsRequested;
847     p_req->preferredRecordSyntax = req->preferredRecordSyntax;
848     p_req->recordComposition = req->recordComposition;
849
850     Package present_package(found_backend->m_session, package.origin());
851     present_package.copy_filter(package);
852
853     present_package.request() = p_apdu;
854
855     present_package.move();
856
857     Z_GDU *gdu = present_package.response().get();
858     if (!present_package.session().is_closed()
859         && gdu && gdu->which == Z_GDU_Z3950 
860         && gdu->u.z3950->which == Z_APDU_presentResponse)
861     {
862         Z_PresentResponse *b_resp = gdu->u.z3950->u.presentResponse;
863         Z_APDU *f_apdu_res = odr.create_presentResponse(apdu_req, 0, 0);
864         Z_PresentResponse *f_resp = f_apdu_res->u.presentResponse;
865
866         f_resp->numberOfRecordsReturned = b_resp->numberOfRecordsReturned;
867         f_resp->nextResultSetPosition = b_resp->nextResultSetPosition;
868         f_resp->presentStatus= b_resp->presentStatus;
869         f_resp->records = b_resp->records;
870         f_resp->otherInfo = b_resp->otherInfo;
871         package.response() = f_apdu_res;
872
873         if (b_resp->records && b_resp->records->which ==  Z_Records_DBOSD)
874         {
875             yaz_log(YLOG_LOG, "Adding %d+%d records to cache %p",
876                     *req->resultSetStartPoint,                      
877                     *f_resp->numberOfRecordsReturned,
878                     &found_set->m_record_cache);        
879             found_set->m_record_cache.add(
880                 odr,
881                 b_resp->records->u.databaseOrSurDiagnostics,
882                 *req->resultSetStartPoint,                      
883                 *f_resp->numberOfRecordsReturned);
884         }
885         bc->release_backend(found_backend);
886     }
887     else
888     {
889         bc->remove_backend(found_backend);
890         Z_APDU *f_apdu_res = 
891             odr.create_presentResponse(
892                 apdu_req, YAZ_BIB1_TEMPORARY_SYSTEM_ERROR, 0);
893         package.response() = f_apdu_res;
894     }
895 }
896
897 void yf::SessionShared::Frontend::scan(mp::Package &frontend_package,
898                                        Z_APDU *apdu_req)
899 {
900     BackendClassPtr bc = m_backend_class;
901     BackendInstancePtr backend = bc->get_backend(frontend_package);
902     if (!backend)
903     {
904         mp::odr odr;
905         Z_APDU *apdu = odr.create_scanResponse(
906             apdu_req, YAZ_BIB1_TEMPORARY_SYSTEM_ERROR, 0);
907         frontend_package.response() = apdu;
908     }
909     else
910     {
911         Package scan_package(backend->m_session, frontend_package.origin());
912         scan_package.copy_filter(frontend_package);
913         scan_package.request() = apdu_req;
914         scan_package.move();
915         frontend_package.response() = scan_package.response();
916         if (scan_package.session().is_closed())
917         {
918             frontend_package.session().close();
919             bc->remove_backend(backend);
920         }
921         else
922             bc->release_backend(backend);
923     }
924 }
925
926 yf::SessionShared::Worker::Worker(SessionShared::Rep *rep) : m_p(rep)
927 {
928 }
929
930 void yf::SessionShared::Worker::operator() (void)
931 {
932     m_p->expire();
933 }
934
935 void yf::SessionShared::BackendClass::expire_class()
936 {
937     time_t now;
938     time(&now);
939     boost::mutex::scoped_lock lock(m_mutex_backend_class);
940     BackendInstanceList::iterator bit = m_backend_list.begin();
941     while (bit != m_backend_list.end())
942     {
943         time_t last_use = (*bit)->m_time_last_use;
944         
945         if ((*bit)->m_in_use)
946         {
947             bit++;
948         }
949         else if ((now >= last_use && now - last_use > m_backend_expiry_ttl)
950             || (now < last_use))
951         {
952             mp::odr odr;
953             (*bit)->m_close_package->response() = odr.create_close(
954                 0, Z_Close_lackOfActivity, 0);
955             (*bit)->m_close_package->session().close();
956             (*bit)->m_close_package->move();
957
958             bit = m_backend_list.erase(bit);
959         }
960         else
961         {
962             bit++;
963         }
964     }
965 }
966
967 void yf::SessionShared::Rep::expire()
968 {
969     while (true)
970     {
971         boost::xtime xt;
972         boost::xtime_get(&xt, boost::TIME_UTC);
973         xt.sec += 30;
974         boost::thread::sleep(xt);
975         
976         BackendClassMap::const_iterator b_it = m_backend_map.begin();
977         for (; b_it != m_backend_map.end(); b_it++)
978             b_it->second->expire_class();
979     }
980 }
981
982 yf::SessionShared::Rep::Rep()
983 {
984     m_resultset_ttl = 30;
985     m_resultset_max = 10;
986     m_session_ttl = 90;
987     yf::SessionShared::Worker w(this);
988     m_thrds.add_thread(new boost::thread(w));
989 }
990
991 yf::SessionShared::SessionShared() : m_p(new SessionShared::Rep)
992 {
993 }
994
995 yf::SessionShared::~SessionShared() {
996 }
997
998
999 yf::SessionShared::Frontend::Frontend(Rep *rep) : m_is_virtual(false), m_p(rep)
1000 {
1001 }
1002
1003 yf::SessionShared::Frontend::~Frontend()
1004 {
1005 }
1006
1007 yf::SessionShared::FrontendPtr yf::SessionShared::Rep::get_frontend(mp::Package &package)
1008 {
1009     boost::mutex::scoped_lock lock(m_mutex);
1010
1011     std::map<mp::Session,yf::SessionShared::FrontendPtr>::iterator it;
1012     
1013     while(true)
1014     {
1015         it = m_clients.find(package.session());
1016         if (it == m_clients.end())
1017             break;
1018         
1019         if (!it->second->m_in_use)
1020         {
1021             it->second->m_in_use = true;
1022             return it->second;
1023         }
1024         m_cond_session_ready.wait(lock);
1025     }
1026     FrontendPtr f(new Frontend(this));
1027     m_clients[package.session()] = f;
1028     f->m_in_use = true;
1029     return f;
1030 }
1031
1032 void yf::SessionShared::Rep::release_frontend(mp::Package &package)
1033 {
1034     boost::mutex::scoped_lock lock(m_mutex);
1035     std::map<mp::Session,yf::SessionShared::FrontendPtr>::iterator it;
1036     
1037     it = m_clients.find(package.session());
1038     if (it != m_clients.end())
1039     {
1040         if (package.session().is_closed())
1041         {
1042             m_clients.erase(it);
1043         }
1044         else
1045         {
1046             it->second->m_in_use = false;
1047         }
1048         m_cond_session_ready.notify_all();
1049     }
1050 }
1051
1052
1053 void yf::SessionShared::process(mp::Package &package) const
1054 {
1055     FrontendPtr f = m_p->get_frontend(package);
1056
1057     Z_GDU *gdu = package.request().get();
1058     
1059     if (gdu && gdu->which == Z_GDU_Z3950 && gdu->u.z3950->which ==
1060         Z_APDU_initRequest && !f->m_is_virtual)
1061     {
1062         m_p->init(package, gdu, f);
1063     }
1064     else if (!f->m_is_virtual)
1065         package.move();
1066     else if (gdu && gdu->which == Z_GDU_Z3950)
1067     {
1068         Z_APDU *apdu = gdu->u.z3950;
1069         if (apdu->which == Z_APDU_initRequest)
1070         {
1071             mp::odr odr;
1072             
1073             package.response() = odr.create_close(
1074                 apdu,
1075                 Z_Close_protocolError,
1076                 "double init");
1077             
1078             package.session().close();
1079         }
1080         else if (apdu->which == Z_APDU_close)
1081         {
1082             mp::odr odr;
1083             
1084             package.response() = odr.create_close(
1085                 apdu,
1086                 Z_Close_peerAbort, "received close from client");
1087             package.session().close();
1088         }
1089         else if (apdu->which == Z_APDU_searchRequest)
1090         {
1091             f->search(package, apdu);
1092         }
1093         else if (apdu->which == Z_APDU_presentRequest)
1094         {
1095             f->present(package, apdu);
1096         }
1097         else if (apdu->which == Z_APDU_scanRequest)
1098         {
1099             f->scan(package, apdu);
1100         }
1101         else
1102         {
1103             mp::odr odr;
1104             
1105             package.response() = odr.create_close(
1106                 apdu, Z_Close_protocolError,
1107                 "unsupported APDU in filter_session_shared");
1108             
1109             package.session().close();
1110         }
1111     }
1112     m_p->release_frontend(package);
1113 }
1114
1115 void yf::SessionShared::configure(const xmlNode *ptr, bool test_only)
1116 {
1117     for (ptr = ptr->children; ptr; ptr = ptr->next)
1118     {
1119         if (ptr->type != XML_ELEMENT_NODE)
1120             continue;
1121         if (!strcmp((const char *) ptr->name, "resultset"))
1122         {
1123             const struct _xmlAttr *attr;
1124             for (attr = ptr->properties; attr; attr = attr->next)
1125             {
1126                 if (!strcmp((const char *) attr->name, "ttl"))
1127                     m_p->m_resultset_ttl = 
1128                         mp::xml::get_int(attr->children, 30);
1129                 else if (!strcmp((const char *) attr->name, "max"))
1130                 {
1131                     m_p->m_resultset_max = 
1132                         mp::xml::get_int(attr->children, 10);
1133                 }
1134                 else
1135                     throw mp::filter::FilterException(
1136                         "Bad attribute " + std::string((const char *)
1137                                                        attr->name));
1138             }
1139         }
1140         else if (!strcmp((const char *) ptr->name, "session"))
1141         {
1142             const struct _xmlAttr *attr;
1143             for (attr = ptr->properties; attr; attr = attr->next)
1144             {
1145                 if (!strcmp((const char *) attr->name, "ttl"))
1146                     m_p->m_session_ttl = 
1147                         mp::xml::get_int(attr->children, 120);
1148                 else
1149                     throw mp::filter::FilterException(
1150                         "Bad attribute " + std::string((const char *)
1151                                                        attr->name));
1152             }
1153         }
1154         else
1155         {
1156             throw mp::filter::FilterException("Bad element " 
1157                                                + std::string((const char *)
1158                                                              ptr->name));
1159         }
1160     }
1161 }
1162
1163 static mp::filter::Base* filter_creator()
1164 {
1165     return new mp::filter::SessionShared;
1166 }
1167
1168 extern "C" {
1169     struct metaproxy_1_filter_struct metaproxy_1_filter_session_shared = {
1170         0,
1171         "session_shared",
1172         filter_creator
1173     };
1174 }
1175
1176 /*
1177  * Local variables:
1178  * c-basic-offset: 4
1179  * c-file-style: "Stroustrup"
1180  * indent-tabs-mode: nil
1181  * End:
1182  * vim: shiftwidth=4 tabstop=8 expandtab
1183  */
1184