Updated footer comment
[metaproxy-moved-to-github.git] / src / filter_session_shared.cpp
1 /* This file is part of Metaproxy.
2    Copyright (C) 2005-2008 Index Data
3
4 Metaproxy is free software; you can redistribute it and/or modify it under
5 the terms of the GNU General Public License as published by the Free
6 Software Foundation; either version 2, or (at your option) any later
7 version.
8
9 Metaproxy is distributed in the hope that it will be useful, but WITHOUT ANY
10 WARRANTY; without even the implied warranty of MERCHANTABILITY or
11 FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
12 for more details.
13
14 You should have received a copy of the GNU General Public License
15 along with this program; if not, write to the Free Software
16 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
17 */
18
19 #include "config.hpp"
20
21 #include "filter.hpp"
22 #include "package.hpp"
23
24 #include <boost/thread/mutex.hpp>
25 #include <boost/thread/condition.hpp>
26 #include <boost/thread/thread.hpp>
27 #include <boost/thread/xtime.hpp>
28 #include <boost/shared_ptr.hpp>
29 #include <boost/format.hpp>
30
31 #include "util.hpp"
32 #include "filter_session_shared.hpp"
33
34 #include <yaz/log.h>
35 #include <yaz/zgdu.h>
36 #include <yaz/otherinfo.h>
37 #include <yaz/diagbib1.h>
38 #include <yazpp/z-query.h>
39 #include <map>
40 #include <iostream>
41 #include <time.h>
42
43 namespace mp = metaproxy_1;
44 namespace yf = metaproxy_1::filter;
45
46 namespace metaproxy_1 {
47
48     namespace filter {
49         // key for session.. We'll only share sessions with same InitKey
50         class SessionShared::InitKey {
51         public:
52             bool operator < (const SessionShared::InitKey &k) const;
53             InitKey(Z_InitRequest *req);
54             InitKey(const InitKey &);
55             ~InitKey();
56         private:
57             char *m_idAuthentication_buf;
58             int m_idAuthentication_size;
59             char *m_otherInfo_buf;
60             int m_otherInfo_size;
61             ODR m_odr;
62         };
63         // worker thread .. for expiry of sessions
64         class SessionShared::Worker {
65         public:
66             Worker(SessionShared::Rep *rep);
67             void operator() (void);
68         private:
69             SessionShared::Rep *m_p;
70         };
71         // backend result set
72         class SessionShared::BackendSet {
73         public:
74             std::string m_result_set_id;
75             Databases m_databases;
76             int m_result_set_size;
77             yazpp_1::Yaz_Z_Query m_query;
78             time_t m_time_last_use;
79             void timestamp();
80             BackendSet(
81                 const std::string &result_set_id,
82                 const Databases &databases,
83                 const yazpp_1::Yaz_Z_Query &query);
84             bool search(
85                 Package &frontend_package,
86                 const Z_APDU *apdu_req,
87                 const BackendInstancePtr bp,
88                 bool &fatal_error);
89         };
90         // backend connection instance
91         class SessionShared::BackendInstance {
92             friend class Rep;
93             friend class BackendClass;
94             friend class BackendSet;
95         public:
96             mp::Session m_session;
97             BackendSetList m_sets;
98             bool m_in_use;
99             int m_sequence_this;
100             int m_result_set_sequence;
101             time_t m_time_last_use;
102             mp::Package * m_close_package;
103             ~BackendInstance();
104         };
105         // backends of some class (all with same InitKey)
106         class SessionShared::BackendClass : boost::noncopyable {
107             friend class Rep;
108             friend struct Frontend;
109             bool m_named_result_sets;
110             BackendInstanceList m_backend_list;
111             BackendInstancePtr create_backend(const Package &package);
112             void remove_backend(BackendInstancePtr b);
113             BackendInstancePtr get_backend(const Package &package);
114             void use_backend(BackendInstancePtr b);
115             void release_backend(BackendInstancePtr b);
116             void expire_class();
117             yazpp_1::GDU m_init_request;
118             yazpp_1::GDU m_init_response;
119             boost::mutex m_mutex_backend_class;
120             int m_sequence_top;
121             time_t m_backend_set_ttl;
122             time_t m_backend_expiry_ttl;
123             size_t m_backend_set_max;
124         public:
125             BackendClass(const yazpp_1::GDU &init_request,
126                          int resultset_ttl,
127                          int resultset_max,
128                          int session_ttl);
129             ~BackendClass();
130         };
131         // frontend result set
132         class SessionShared::FrontendSet {
133             Databases m_databases;
134             yazpp_1::Yaz_Z_Query m_query;
135         public:
136             const Databases &get_databases();
137             const yazpp_1::Yaz_Z_Query &get_query();
138             FrontendSet(
139                 const Databases &databases,
140                 const yazpp_1::Yaz_Z_Query &query);
141             FrontendSet();
142         };
143         // frontend session
144         struct SessionShared::Frontend {
145             Frontend(Rep *rep);
146             ~Frontend();
147             bool m_is_virtual;
148             bool m_in_use;
149             Z_Options m_init_options;
150             void search(Package &package, Z_APDU *apdu);
151             void present(Package &package, Z_APDU *apdu);
152             void scan(Package &package, Z_APDU *apdu);
153
154             void get_set(mp::Package &package,
155                          const Z_APDU *apdu_req,
156                          const Databases &databases,
157                          yazpp_1::Yaz_Z_Query &query,
158                          BackendInstancePtr &found_backend,
159                          BackendSetPtr &found_set);
160             void override_set(BackendInstancePtr &found_backend,
161                               std::string &result_set_id);
162
163             Rep *m_p;
164             BackendClassPtr m_backend_class;
165             FrontendSets m_frontend_sets;
166         };            
167         // representation
168         class SessionShared::Rep {
169             friend class SessionShared;
170             friend struct Frontend;
171             
172             FrontendPtr get_frontend(Package &package);
173             void release_frontend(Package &package);
174             Rep();
175         public:
176             void expire();
177         private:
178             void init(Package &package, const Z_GDU *gdu,
179                       FrontendPtr frontend);
180             boost::mutex m_mutex;
181             boost::condition m_cond_session_ready;
182             std::map<mp::Session, FrontendPtr> m_clients;
183
184             BackendClassMap m_backend_map;
185             boost::mutex m_mutex_backend_map;
186             boost::thread_group m_thrds;
187             int m_resultset_ttl;
188             int m_resultset_max;
189             int m_session_ttl;
190         };
191     }
192 }
193
194 yf::SessionShared::FrontendSet::FrontendSet(
195     const Databases &databases,
196     const yazpp_1::Yaz_Z_Query &query)
197     : m_databases(databases), m_query(query)
198 {
199 }
200
201 const yf::SessionShared::Databases & 
202 yf::SessionShared::FrontendSet::get_databases()
203 {
204     return m_databases;
205 }
206
207 const yazpp_1::Yaz_Z_Query& yf::SessionShared::FrontendSet::get_query()
208 {
209     return m_query;
210 }
211
212 yf::SessionShared::InitKey::InitKey(const InitKey &k)
213 {
214     m_odr = odr_createmem(ODR_ENCODE);
215     
216     m_idAuthentication_size =  k.m_idAuthentication_size;
217     m_idAuthentication_buf = (char*)odr_malloc(m_odr, m_idAuthentication_size);
218     memcpy(m_idAuthentication_buf, k.m_idAuthentication_buf,
219            m_idAuthentication_size);
220
221     m_otherInfo_size =  k.m_otherInfo_size;
222     m_otherInfo_buf = (char*)odr_malloc(m_odr, m_otherInfo_size);
223     memcpy(m_otherInfo_buf, k.m_otherInfo_buf,
224            m_otherInfo_size);
225 }
226
227 yf::SessionShared::InitKey::InitKey(Z_InitRequest *req)
228 {
229     m_odr = odr_createmem(ODR_ENCODE);
230
231     Z_IdAuthentication *t = req->idAuthentication;
232     z_IdAuthentication(m_odr, &t, 1, 0);
233     m_idAuthentication_buf =
234         odr_getbuf(m_odr, &m_idAuthentication_size, 0);
235
236     Z_OtherInformation *o = req->otherInfo;
237     z_OtherInformation(m_odr, &o, 1, 0);
238     m_otherInfo_buf = odr_getbuf(m_odr, &m_otherInfo_size, 0);
239 }
240
241 yf::SessionShared::InitKey::~InitKey()
242 {
243     odr_destroy(m_odr);
244 }
245
246 bool yf::SessionShared::InitKey::operator < (const SessionShared::InitKey &k)
247     const 
248 {
249     int c;
250     c = mp::util::memcmp2(
251         (void*) m_idAuthentication_buf, m_idAuthentication_size,
252         (void*) k.m_idAuthentication_buf, k.m_idAuthentication_size);
253     if (c < 0)
254         return true;
255     else if (c > 0)
256         return false;
257
258     c = mp::util::memcmp2((void*) m_otherInfo_buf, m_otherInfo_size,
259                           (void*) k.m_otherInfo_buf, k.m_otherInfo_size);
260     if (c < 0)
261         return true;
262     else if (c > 0)
263         return false;
264     return false;
265 }
266
267 void yf::SessionShared::BackendClass::release_backend(BackendInstancePtr b)
268 {
269     boost::mutex::scoped_lock lock(m_mutex_backend_class);
270     b->m_in_use = false;
271 }
272
273
274 void yf::SessionShared::BackendClass::remove_backend(BackendInstancePtr b)
275 {
276     BackendInstanceList::iterator it = m_backend_list.begin();
277     
278     while (it != m_backend_list.end())
279     {
280         if (*it == b)
281         {
282              mp::odr odr;
283             (*it)->m_close_package->response() = odr.create_close(
284                 0, Z_Close_lackOfActivity, 0);
285             (*it)->m_close_package->session().close();
286             (*it)->m_close_package->move();
287             
288             it = m_backend_list.erase(it);
289         }
290         else
291             it++;
292     }
293 }
294
295
296
297 yf::SessionShared::BackendInstancePtr 
298 yf::SessionShared::BackendClass::get_backend(
299     const mp::Package &frontend_package)
300 {
301     {
302         boost::mutex::scoped_lock lock(m_mutex_backend_class);
303         
304         BackendInstanceList::const_iterator it = m_backend_list.begin();
305         
306         BackendInstancePtr backend1; // null
307         
308         for (; it != m_backend_list.end(); it++)
309         {
310             if (!(*it)->m_in_use)
311             {
312                 if (!backend1 
313                     || (*it)->m_sequence_this < backend1->m_sequence_this)
314                     backend1 = *it;
315             }
316         }
317         if (backend1)
318         {
319             use_backend(backend1);
320             return backend1;
321         }
322     }
323     return create_backend(frontend_package);
324 }
325
326 void yf::SessionShared::BackendClass::use_backend(BackendInstancePtr backend)
327 {
328     backend->m_in_use = true;
329     time(&backend->m_time_last_use);
330     backend->m_sequence_this = m_sequence_top++;
331 }
332
333 yf::SessionShared::BackendInstance::~BackendInstance()
334 {
335     delete m_close_package;
336 }
337
338 yf::SessionShared::BackendInstancePtr yf::SessionShared::BackendClass::create_backend(
339     const mp::Package &frontend_package)
340 {
341     BackendInstancePtr bp(new BackendInstance);
342     BackendInstancePtr null;
343
344     bp->m_close_package =
345         new mp::Package(bp->m_session, frontend_package.origin());
346     bp->m_close_package->copy_filter(frontend_package);
347
348     Package init_package(bp->m_session, frontend_package.origin());
349
350     init_package.copy_filter(frontend_package);
351
352     yazpp_1::GDU actual_init_request = m_init_request;
353     Z_GDU *init_pdu = actual_init_request.get();
354
355     assert(init_pdu->which == Z_GDU_Z3950);
356     assert(init_pdu->u.z3950->which == Z_APDU_initRequest);
357
358     Z_InitRequest *req = init_pdu->u.z3950->u.initRequest;
359     ODR_MASK_ZERO(req->options);
360
361     ODR_MASK_SET(req->options, Z_Options_search);
362     ODR_MASK_SET(req->options, Z_Options_present);
363     ODR_MASK_SET(req->options, Z_Options_namedResultSets);
364     ODR_MASK_SET(req->options, Z_Options_scan);
365
366     ODR_MASK_SET(req->protocolVersion, Z_ProtocolVersion_1);
367     ODR_MASK_SET(req->protocolVersion, Z_ProtocolVersion_2);
368     ODR_MASK_SET(req->protocolVersion, Z_ProtocolVersion_3);
369
370     init_package.request() = init_pdu;
371
372     init_package.move();
373
374     boost::mutex::scoped_lock lock(m_mutex_backend_class);
375
376     m_named_result_sets = false;
377     Z_GDU *gdu = init_package.response().get();
378     if (!init_package.session().is_closed()
379         && gdu && gdu->which == Z_GDU_Z3950 
380         && gdu->u.z3950->which == Z_APDU_initResponse)
381     {
382         Z_InitResponse *res = gdu->u.z3950->u.initResponse;
383         if (!*res->result)
384             return null;
385         m_init_response = gdu->u.z3950;
386         if (ODR_MASK_GET(res->options, Z_Options_namedResultSets))
387         {
388             m_named_result_sets = true;
389         }
390     }
391     else
392     {
393         // did not receive an init response or closed
394         return null;
395     }
396     bp->m_in_use = true;
397     time(&bp->m_time_last_use);
398     bp->m_sequence_this = 0;
399     bp->m_result_set_sequence = 0;
400     m_backend_list.push_back(bp);
401
402     return bp;
403 }
404
405
406 yf::SessionShared::BackendClass::BackendClass(const yazpp_1::GDU &init_request,
407                                               int resultset_ttl,
408                                               int resultset_max,
409                                               int session_ttl)
410     : m_named_result_sets(false), m_init_request(init_request),
411       m_sequence_top(0), m_backend_set_ttl(resultset_ttl),
412       m_backend_expiry_ttl(session_ttl), m_backend_set_max(resultset_max)
413 {}
414
415 yf::SessionShared::BackendClass::~BackendClass()
416 {}
417
418 void yf::SessionShared::Rep::init(mp::Package &package, const Z_GDU *gdu,
419                                   FrontendPtr frontend)
420 {
421     Z_InitRequest *req = gdu->u.z3950->u.initRequest;
422
423     frontend->m_is_virtual = true;
424     frontend->m_init_options = *req->options;
425     InitKey k(req);
426     {
427         boost::mutex::scoped_lock lock(m_mutex_backend_map);
428         BackendClassMap::const_iterator it;
429         it = m_backend_map.find(k);
430         if (it == m_backend_map.end())
431         {
432             BackendClassPtr b(new BackendClass(gdu->u.z3950,
433                                                m_resultset_ttl,
434                                                m_resultset_max,
435                                                m_session_ttl));
436             m_backend_map[k] = b;
437             frontend->m_backend_class = b;
438         }
439         else
440         {
441             frontend->m_backend_class = it->second;            
442         }
443     }
444     BackendClassPtr bc = frontend->m_backend_class;
445     BackendInstancePtr backend = bc->get_backend(package);
446     
447     mp::odr odr;
448     if (!backend)
449     {
450         Z_APDU *apdu = odr.create_initResponse(gdu->u.z3950, 0, 0);
451         *apdu->u.initResponse->result = 0;
452         package.response() = apdu;
453         package.session().close();
454     }
455     else
456     {
457         boost::mutex::scoped_lock lock(bc->m_mutex_backend_class);
458         yazpp_1::GDU init_response = bc->m_init_response;
459         Z_GDU *response_gdu = init_response.get();
460         mp::util::transfer_referenceId(odr, gdu->u.z3950,
461                                        response_gdu->u.z3950);
462
463         Z_Options *server_options =
464             response_gdu->u.z3950->u.initResponse->options;
465         Z_Options *client_options = &frontend->m_init_options;
466
467         int i;
468         for (i = 0; i<30; i++)
469             if (!ODR_MASK_GET(client_options, i))
470                 ODR_MASK_CLEAR(server_options, i);
471         package.response() = init_response;
472     }
473     if (backend)
474         bc->release_backend(backend);
475 }
476
477 void yf::SessionShared::BackendSet::timestamp()
478 {
479     time(&m_time_last_use);
480 }
481
482 yf::SessionShared::BackendSet::BackendSet(
483     const std::string &result_set_id,
484     const Databases &databases,
485     const yazpp_1::Yaz_Z_Query &query) :
486     m_result_set_id(result_set_id),
487     m_databases(databases), m_result_set_size(0), m_query(query) 
488 {
489     timestamp();
490 }
491
492 bool yf::SessionShared::BackendSet::search(
493     mp::Package &frontend_package,
494     const Z_APDU *frontend_apdu,
495     const BackendInstancePtr bp,
496     bool & fatal_error)
497 {
498     Package search_package(bp->m_session, frontend_package.origin());
499
500     search_package.copy_filter(frontend_package);
501
502     mp::odr odr;
503     Z_APDU *apdu_req = zget_APDU(odr, Z_APDU_searchRequest);
504     Z_SearchRequest *req = apdu_req->u.searchRequest;
505
506     req->resultSetName = odr_strdup(odr, m_result_set_id.c_str());
507     req->query = m_query.get_Z_Query();
508
509     req->num_databaseNames = m_databases.size();
510     req->databaseNames = (char**) 
511         odr_malloc(odr, req->num_databaseNames * sizeof(char *));
512     Databases::const_iterator it = m_databases.begin();
513     size_t i = 0;
514     for (; it != m_databases.end(); it++)
515         req->databaseNames[i++] = odr_strdup(odr, it->c_str());
516
517     search_package.request() = apdu_req;
518
519     search_package.move();
520     fatal_error = false;  // assume backend session is good
521
522     Z_Records *z_records_diag = 0;
523     Z_GDU *gdu = search_package.response().get();
524     if (!search_package.session().is_closed()
525         && gdu && gdu->which == Z_GDU_Z3950 
526         && gdu->u.z3950->which == Z_APDU_searchResponse)
527     {
528         Z_SearchResponse *b_resp = gdu->u.z3950->u.searchResponse;
529         if (b_resp->records)
530         {
531             if (b_resp->records->which == Z_Records_NSD 
532                 || b_resp->records->which == Z_Records_multipleNSD)
533                 z_records_diag = b_resp->records;
534         }
535         if (z_records_diag)
536         {
537             // there could be diagnostics that are so bad.. that 
538             // we simply mark the error as fatal..  For now we assume
539             // we can resume
540             if (frontend_apdu->which == Z_APDU_searchRequest)
541             {
542                 Z_APDU *f_apdu = odr.create_searchResponse(frontend_apdu, 
543                                                            0, 0);
544                 Z_SearchResponse *f_resp = f_apdu->u.searchResponse;
545                 *f_resp->searchStatus = *b_resp->searchStatus;
546                 f_resp->records = z_records_diag;
547                 frontend_package.response() = f_apdu;
548                 return false;
549             }
550             if (frontend_apdu->which == Z_APDU_presentRequest)
551             {
552                 Z_APDU *f_apdu = odr.create_presentResponse(frontend_apdu, 
553                                                             0, 0);
554                 Z_PresentResponse *f_resp = f_apdu->u.presentResponse;
555                 f_resp->records = z_records_diag;
556                 frontend_package.response() = f_apdu;
557                 return false;
558             }
559         }
560         m_result_set_size = *b_resp->resultCount;
561         return true;
562     }
563     Z_APDU *f_apdu = 0;
564     if (frontend_apdu->which == Z_APDU_searchRequest)
565         f_apdu = odr.create_searchResponse(
566             frontend_apdu, YAZ_BIB1_TEMPORARY_SYSTEM_ERROR, 0);
567     else if (frontend_apdu->which == Z_APDU_presentRequest)
568         f_apdu = odr.create_presentResponse(
569             frontend_apdu, YAZ_BIB1_TEMPORARY_SYSTEM_ERROR, 0);
570     else
571         f_apdu = odr.create_close(
572             frontend_apdu, YAZ_BIB1_TEMPORARY_SYSTEM_ERROR, 0);
573     frontend_package.response() = f_apdu;
574     fatal_error = true; // weired response.. bad backend
575     return false;
576 }
577
578 void yf::SessionShared::Frontend::override_set(
579     BackendInstancePtr &found_backend,
580     std::string &result_set_id)
581 {
582     BackendClassPtr bc = m_backend_class;
583     BackendInstanceList::const_iterator it = bc->m_backend_list.begin();
584     time_t now;
585     time(&now);
586     
587     for (; it != bc->m_backend_list.end(); it++)
588     {
589         if (!(*it)->m_in_use)
590         {
591             BackendSetList::iterator set_it = (*it)->m_sets.begin();
592             for (; set_it != (*it)->m_sets.end(); set_it++)
593             {
594                 if (now >= (*set_it)->m_time_last_use &&
595                     now - (*set_it)->m_time_last_use > bc->m_backend_set_ttl)
596                 {
597                     found_backend = *it;
598                     result_set_id = (*set_it)->m_result_set_id;
599                     found_backend->m_sets.erase(set_it);
600                     return;
601                 }
602             }
603         }
604     }
605     size_t max_sets = bc->m_named_result_sets ? bc->m_backend_set_max : 1;
606     for (it = bc->m_backend_list.begin(); it != bc->m_backend_list.end(); it++)
607     {
608         if (!(*it)->m_in_use && (*it)->m_sets.size() < max_sets)
609         {
610             found_backend = *it;
611             if (bc->m_named_result_sets)
612             {
613                 result_set_id = boost::io::str(
614                     boost::format("%1%") % 
615                     found_backend->m_result_set_sequence);
616                 found_backend->m_result_set_sequence++;
617             }
618             else
619                 result_set_id = "default";
620             return;
621         }
622     }
623 }
624
625 void yf::SessionShared::Frontend::get_set(mp::Package &package,
626                                           const Z_APDU *apdu_req,
627                                           const Databases &databases,
628                                           yazpp_1::Yaz_Z_Query &query,
629                                           BackendInstancePtr &found_backend,
630                                           BackendSetPtr &found_set)
631 {
632     std::string result_set_id;
633     BackendClassPtr bc = m_backend_class;
634     {
635         boost::mutex::scoped_lock lock(bc->m_mutex_backend_class);
636      
637         // look at each backend and see if we have a similar search
638         BackendInstanceList::const_iterator it = bc->m_backend_list.begin();
639         
640         for (; it != bc->m_backend_list.end(); it++)
641         {
642             if (!(*it)->m_in_use)
643             {
644                 BackendSetList::const_iterator set_it = (*it)->m_sets.begin();
645                 for (; set_it != (*it)->m_sets.end(); set_it++)
646                 {
647                     if ((*set_it)->m_databases == databases 
648                         && query.match(&(*set_it)->m_query))
649                     {
650                         found_set = *set_it;
651                         found_backend = *it;
652                         bc->use_backend(found_backend);
653                         found_set->timestamp();
654                         // found matching set. No need to search again
655                         return;
656                     }
657                 }
658             }
659         }
660         override_set(found_backend, result_set_id);
661         if (found_backend)
662             bc->use_backend(found_backend);
663     }
664     if (!found_backend)
665     {
666         // create a new backend set (and new set)
667         found_backend = bc->create_backend(package);
668
669         if (!found_backend)
670         {
671             Z_APDU *f_apdu = 0;
672             mp::odr odr;
673             if (apdu_req->which == Z_APDU_searchRequest)
674             {
675                 f_apdu = odr.create_searchResponse(
676                         apdu_req, YAZ_BIB1_TEMPORARY_SYSTEM_ERROR, 0);
677             }
678             else if (apdu_req->which == Z_APDU_presentRequest)
679             {
680                 f_apdu = odr.create_presentResponse(
681                     apdu_req, YAZ_BIB1_TEMPORARY_SYSTEM_ERROR, 0);
682             }
683             else
684             {
685                 f_apdu = odr.create_close(
686                     apdu_req, YAZ_BIB1_TEMPORARY_SYSTEM_ERROR, 0);
687             }
688             package.response() = f_apdu;
689             return;
690         }
691         if (bc->m_named_result_sets)
692         {
693             result_set_id = boost::io::str(
694                 boost::format("%1%") % found_backend->m_result_set_sequence);
695             found_backend->m_result_set_sequence++;
696         }
697         else
698             result_set_id = "default";
699     }
700     // we must search ...
701     BackendSetPtr new_set(new BackendSet(result_set_id,
702                                          databases, query));
703     bool fatal_error = false;
704     if (!new_set->search(package, apdu_req, found_backend, fatal_error))
705     {
706         if (fatal_error)
707             bc->remove_backend(found_backend);
708         else
709             bc->release_backend(found_backend);
710         return; // search error 
711     }
712     found_set = new_set;
713     found_set->timestamp();
714     found_backend->m_sets.push_back(found_set);
715 }
716
717 void yf::SessionShared::Frontend::search(mp::Package &package,
718                                          Z_APDU *apdu_req)
719 {
720     Z_SearchRequest *req = apdu_req->u.searchRequest;
721     FrontendSets::iterator fset_it = 
722         m_frontend_sets.find(req->resultSetName);
723     if (fset_it != m_frontend_sets.end())
724     {
725         // result set already exist 
726         // if replace indicator is off: we return diagnostic if
727         // result set already exist.
728         if (*req->replaceIndicator == 0)
729         {
730             mp::odr odr;
731             Z_APDU *apdu = 
732                 odr.create_searchResponse(
733                     apdu_req,
734                     YAZ_BIB1_RESULT_SET_EXISTS_AND_REPLACE_INDICATOR_OFF,
735                     0);
736             package.response() = apdu;
737             
738             return;
739         }
740         m_frontend_sets.erase(fset_it);
741     }
742     
743     yazpp_1::Yaz_Z_Query query;
744     query.set_Z_Query(req->query);
745     Databases databases;
746     int i;
747     for (i = 0; i<req->num_databaseNames; i++)
748         databases.push_back(req->databaseNames[i]);
749
750     BackendSetPtr found_set; // null
751     BackendInstancePtr found_backend; // null
752
753     get_set(package, apdu_req, databases, query, found_backend, found_set);
754     if (!found_set)
755         return;
756
757     mp::odr odr;
758     Z_APDU *f_apdu = odr.create_searchResponse(apdu_req, 0, 0);
759     Z_SearchResponse *f_resp = f_apdu->u.searchResponse;
760     *f_resp->resultCount = found_set->m_result_set_size;
761     package.response() = f_apdu;
762
763     FrontendSetPtr fset(new FrontendSet(databases, query));
764     m_frontend_sets[req->resultSetName] = fset;
765
766     m_backend_class->release_backend(found_backend);
767 }
768
769 void yf::SessionShared::Frontend::present(mp::Package &package,
770                                           Z_APDU *apdu_req)
771 {
772     mp::odr odr;
773     Z_PresentRequest *req = apdu_req->u.presentRequest;
774
775     FrontendSets::iterator fset_it = 
776         m_frontend_sets.find(req->resultSetId);
777
778     if (fset_it == m_frontend_sets.end())
779     {
780         Z_APDU *apdu = 
781             odr.create_presentResponse(
782                 apdu_req,
783                 YAZ_BIB1_SPECIFIED_RESULT_SET_DOES_NOT_EXIST,
784                 req->resultSetId);
785         package.response() = apdu;
786         return;
787     }
788     FrontendSetPtr fset = fset_it->second;
789
790     Databases databases = fset->get_databases();
791     yazpp_1::Yaz_Z_Query query = fset->get_query();
792
793     BackendClassPtr bc = m_backend_class;
794     BackendSetPtr found_set; // null
795     BackendInstancePtr found_backend;
796
797     get_set(package, apdu_req, databases, query, found_backend, found_set);
798     if (!found_set)
799         return;
800
801     Z_APDU *p_apdu = zget_APDU(odr, Z_APDU_presentRequest);
802     Z_PresentRequest *p_req = p_apdu->u.presentRequest;
803     p_req->preferredRecordSyntax = req->preferredRecordSyntax;
804     p_req->resultSetId = odr_strdup(odr, found_set->m_result_set_id.c_str());
805     *p_req->resultSetStartPoint = *req->resultSetStartPoint;
806     *p_req->numberOfRecordsRequested = *req->numberOfRecordsRequested;
807     p_req->preferredRecordSyntax = req->preferredRecordSyntax;
808     p_req->recordComposition = req->recordComposition;
809
810     Package present_package(found_backend->m_session, package.origin());
811     present_package.copy_filter(package);
812
813     present_package.request() = p_apdu;
814
815     present_package.move();
816
817     Z_GDU *gdu = present_package.response().get();
818     if (!present_package.session().is_closed()
819         && gdu && gdu->which == Z_GDU_Z3950 
820         && gdu->u.z3950->which == Z_APDU_presentResponse)
821     {
822         Z_PresentResponse *b_resp = gdu->u.z3950->u.presentResponse;
823         Z_APDU *f_apdu_res = odr.create_presentResponse(apdu_req, 0, 0);
824         Z_PresentResponse *f_resp = f_apdu_res->u.presentResponse;
825
826         f_resp->numberOfRecordsReturned = b_resp->numberOfRecordsReturned;
827         f_resp->nextResultSetPosition = b_resp->nextResultSetPosition;
828         f_resp->presentStatus= b_resp->presentStatus;
829         f_resp->records = b_resp->records;
830         f_resp->otherInfo = b_resp->otherInfo;
831         package.response() = f_apdu_res;
832         bc->release_backend(found_backend);
833     }
834     else
835     {
836         bc->remove_backend(found_backend);
837         Z_APDU *f_apdu_res = 
838             odr.create_presentResponse(
839                 apdu_req, YAZ_BIB1_TEMPORARY_SYSTEM_ERROR, 0);
840         package.response() = f_apdu_res;
841     }
842 }
843
844 void yf::SessionShared::Frontend::scan(mp::Package &frontend_package,
845                                        Z_APDU *apdu_req)
846 {
847     BackendClassPtr bc = m_backend_class;
848     BackendInstancePtr backend = bc->get_backend(frontend_package);
849     if (!backend)
850     {
851         mp::odr odr;
852         Z_APDU *apdu = odr.create_scanResponse(
853             apdu_req, YAZ_BIB1_TEMPORARY_SYSTEM_ERROR, 0);
854         frontend_package.response() = apdu;
855     }
856     else
857     {
858         Package scan_package(backend->m_session, frontend_package.origin());
859         scan_package.copy_filter(frontend_package);
860         scan_package.request() = apdu_req;
861         scan_package.move();
862         frontend_package.response() = scan_package.response();
863         if (scan_package.session().is_closed())
864         {
865             frontend_package.session().close();
866             bc->remove_backend(backend);
867         }
868         else
869             bc->release_backend(backend);
870     }
871 }
872
873 yf::SessionShared::Worker::Worker(SessionShared::Rep *rep) : m_p(rep)
874 {
875 }
876
877 void yf::SessionShared::Worker::operator() (void)
878 {
879     m_p->expire();
880 }
881
882 void yf::SessionShared::BackendClass::expire_class()
883 {
884     time_t now;
885     time(&now);
886     boost::mutex::scoped_lock lock(m_mutex_backend_class);
887     BackendInstanceList::iterator bit = m_backend_list.begin();
888     while (bit != m_backend_list.end())
889     {
890         time_t last_use = (*bit)->m_time_last_use;
891         
892         if ((*bit)->m_in_use)
893         {
894             bit++;
895         }
896         else if ((now >= last_use && now - last_use > m_backend_expiry_ttl)
897             || (now < last_use))
898         {
899             mp::odr odr;
900             (*bit)->m_close_package->response() = odr.create_close(
901                 0, Z_Close_lackOfActivity, 0);
902             (*bit)->m_close_package->session().close();
903             (*bit)->m_close_package->move();
904
905             bit = m_backend_list.erase(bit);
906         }
907         else
908         {
909             bit++;
910         }
911     }
912 }
913
914 void yf::SessionShared::Rep::expire()
915 {
916     while (true)
917     {
918         boost::xtime xt;
919         boost::xtime_get(&xt, boost::TIME_UTC);
920         xt.sec += 30;
921         boost::thread::sleep(xt);
922         
923         BackendClassMap::const_iterator b_it = m_backend_map.begin();
924         for (; b_it != m_backend_map.end(); b_it++)
925             b_it->second->expire_class();
926     }
927 }
928
929 yf::SessionShared::Rep::Rep()
930 {
931     m_resultset_ttl = 30;
932     m_resultset_max = 10;
933     m_session_ttl = 90;
934     yf::SessionShared::Worker w(this);
935     m_thrds.add_thread(new boost::thread(w));
936 }
937
938 yf::SessionShared::SessionShared() : m_p(new SessionShared::Rep)
939 {
940 }
941
942 yf::SessionShared::~SessionShared() {
943 }
944
945
946 yf::SessionShared::Frontend::Frontend(Rep *rep) : m_is_virtual(false), m_p(rep)
947 {
948 }
949
950 yf::SessionShared::Frontend::~Frontend()
951 {
952 }
953
954 yf::SessionShared::FrontendPtr yf::SessionShared::Rep::get_frontend(mp::Package &package)
955 {
956     boost::mutex::scoped_lock lock(m_mutex);
957
958     std::map<mp::Session,yf::SessionShared::FrontendPtr>::iterator it;
959     
960     while(true)
961     {
962         it = m_clients.find(package.session());
963         if (it == m_clients.end())
964             break;
965         
966         if (!it->second->m_in_use)
967         {
968             it->second->m_in_use = true;
969             return it->second;
970         }
971         m_cond_session_ready.wait(lock);
972     }
973     FrontendPtr f(new Frontend(this));
974     m_clients[package.session()] = f;
975     f->m_in_use = true;
976     return f;
977 }
978
979 void yf::SessionShared::Rep::release_frontend(mp::Package &package)
980 {
981     boost::mutex::scoped_lock lock(m_mutex);
982     std::map<mp::Session,yf::SessionShared::FrontendPtr>::iterator it;
983     
984     it = m_clients.find(package.session());
985     if (it != m_clients.end())
986     {
987         if (package.session().is_closed())
988         {
989             m_clients.erase(it);
990         }
991         else
992         {
993             it->second->m_in_use = false;
994         }
995         m_cond_session_ready.notify_all();
996     }
997 }
998
999
1000 void yf::SessionShared::process(mp::Package &package) const
1001 {
1002     FrontendPtr f = m_p->get_frontend(package);
1003
1004     Z_GDU *gdu = package.request().get();
1005     
1006     if (gdu && gdu->which == Z_GDU_Z3950 && gdu->u.z3950->which ==
1007         Z_APDU_initRequest && !f->m_is_virtual)
1008     {
1009         m_p->init(package, gdu, f);
1010     }
1011     else if (!f->m_is_virtual)
1012         package.move();
1013     else if (gdu && gdu->which == Z_GDU_Z3950)
1014     {
1015         Z_APDU *apdu = gdu->u.z3950;
1016         if (apdu->which == Z_APDU_initRequest)
1017         {
1018             mp::odr odr;
1019             
1020             package.response() = odr.create_close(
1021                 apdu,
1022                 Z_Close_protocolError,
1023                 "double init");
1024             
1025             package.session().close();
1026         }
1027         else if (apdu->which == Z_APDU_close)
1028         {
1029             mp::odr odr;
1030             
1031             package.response() = odr.create_close(
1032                 apdu,
1033                 Z_Close_peerAbort, "received close from client");
1034             package.session().close();
1035         }
1036         else if (apdu->which == Z_APDU_searchRequest)
1037         {
1038             f->search(package, apdu);
1039         }
1040         else if (apdu->which == Z_APDU_presentRequest)
1041         {
1042             f->present(package, apdu);
1043         }
1044         else if (apdu->which == Z_APDU_scanRequest)
1045         {
1046             f->scan(package, apdu);
1047         }
1048         else
1049         {
1050             mp::odr odr;
1051             
1052             package.response() = odr.create_close(
1053                 apdu, Z_Close_protocolError,
1054                 "unsupported APDU in filter_session_shared");
1055             
1056             package.session().close();
1057         }
1058     }
1059     m_p->release_frontend(package);
1060 }
1061
1062 void yf::SessionShared::configure(const xmlNode *ptr, bool test_only)
1063 {
1064     for (ptr = ptr->children; ptr; ptr = ptr->next)
1065     {
1066         if (ptr->type != XML_ELEMENT_NODE)
1067             continue;
1068         if (!strcmp((const char *) ptr->name, "resultset"))
1069         {
1070             const struct _xmlAttr *attr;
1071             for (attr = ptr->properties; attr; attr = attr->next)
1072             {
1073                 if (!strcmp((const char *) attr->name, "ttl"))
1074                     m_p->m_resultset_ttl = 
1075                         mp::xml::get_int(attr->children, 30);
1076                 else if (!strcmp((const char *) attr->name, "max"))
1077                 {
1078                     m_p->m_resultset_max = 
1079                         mp::xml::get_int(attr->children, 10);
1080                 }
1081                 else
1082                     throw mp::filter::FilterException(
1083                         "Bad attribute " + std::string((const char *)
1084                                                        attr->name));
1085             }
1086         }
1087         else if (!strcmp((const char *) ptr->name, "session"))
1088         {
1089             const struct _xmlAttr *attr;
1090             for (attr = ptr->properties; attr; attr = attr->next)
1091             {
1092                 if (!strcmp((const char *) attr->name, "ttl"))
1093                     m_p->m_session_ttl = 
1094                         mp::xml::get_int(attr->children, 120);
1095                 else
1096                     throw mp::filter::FilterException(
1097                         "Bad attribute " + std::string((const char *)
1098                                                        attr->name));
1099             }
1100         }
1101         else
1102         {
1103             throw mp::filter::FilterException("Bad element " 
1104                                                + std::string((const char *)
1105                                                              ptr->name));
1106         }
1107     }
1108 }
1109
1110 static mp::filter::Base* filter_creator()
1111 {
1112     return new mp::filter::SessionShared;
1113 }
1114
1115 extern "C" {
1116     struct metaproxy_1_filter_struct metaproxy_1_filter_session_shared = {
1117         0,
1118         "session_shared",
1119         filter_creator
1120     };
1121 }
1122
1123 /*
1124  * Local variables:
1125  * c-basic-offset: 4
1126  * c-file-style: "Stroustrup"
1127  * indent-tabs-mode: nil
1128  * End:
1129  * vim: shiftwidth=4 tabstop=8 expandtab
1130  */
1131