Removed a lot of std::cout messages
[metaproxy-moved-to-github.git] / src / filter_session_shared.cpp
1 /* $Id: filter_session_shared.cpp,v 1.19 2008-01-21 15:23:11 adam Exp $
2    Copyright (c) 2005-2007, Index Data.
3
4 This file is part of Metaproxy.
5
6 Metaproxy is free software; you can redistribute it and/or modify it under
7 the terms of the GNU General Public License as published by the Free
8 Software Foundation; either version 2, or (at your option) any later
9 version.
10
11 Metaproxy is distributed in the hope that it will be useful, but WITHOUT ANY
12 WARRANTY; without even the implied warranty of MERCHANTABILITY or
13 FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
14 for more details.
15
16 You should have received a copy of the GNU General Public License
17 along with Metaproxy; see the file LICENSE.  If not, write to the
18 Free Software Foundation, 59 Temple Place - Suite 330, Boston, MA
19 02111-1307, USA.
20  */
21
22 #include "config.hpp"
23
24 #include "filter.hpp"
25 #include "package.hpp"
26
27 #include <boost/thread/mutex.hpp>
28 #include <boost/thread/condition.hpp>
29 #include <boost/thread/thread.hpp>
30 #include <boost/thread/xtime.hpp>
31 #include <boost/shared_ptr.hpp>
32 #include <boost/format.hpp>
33
34 #include "util.hpp"
35 #include "filter_session_shared.hpp"
36
37 #include <yaz/log.h>
38 #include <yaz/zgdu.h>
39 #include <yaz/otherinfo.h>
40 #include <yaz/diagbib1.h>
41 #include <yazpp/z-query.h>
42 #include <map>
43 #include <iostream>
44 #include <time.h>
45
46 namespace mp = metaproxy_1;
47 namespace yf = metaproxy_1::filter;
48
49 namespace metaproxy_1 {
50
51     namespace filter {
52         class SessionShared::InitKey {
53         public:
54             bool operator < (const SessionShared::InitKey &k) const;
55             InitKey(Z_InitRequest *req);
56             InitKey(const InitKey &);
57             ~InitKey();
58         private:
59             InitKey &operator = (const InitKey &k);
60             char *m_idAuthentication_buf;
61             int m_idAuthentication_size;
62             char *m_otherInfo_buf;
63             int m_otherInfo_size;
64             ODR m_odr;
65         };
66         class SessionShared::Worker {
67         public:
68             Worker(SessionShared::Rep *rep);
69             void operator() (void);
70         private:
71             SessionShared::Rep *m_p;
72         };
73         class SessionShared::BackendSet {
74         public:
75             std::string m_result_set_id;
76             Databases m_databases;
77             int m_result_set_size;
78             yazpp_1::Yaz_Z_Query m_query;
79             time_t m_time_last_use;
80             void timestamp();
81             BackendSet(
82                 const std::string &result_set_id,
83                 const Databases &databases,
84                 const yazpp_1::Yaz_Z_Query &query);
85             bool search(
86                 Package &frontend_package,
87                 const Z_APDU *apdu_req,
88                 const BackendInstancePtr bp);
89         };
90         class SessionShared::BackendInstance {
91             friend class Rep;
92             friend class BackendClass;
93             friend class BackendSet;
94         public:
95             mp::Session m_session;
96             BackendSetList m_sets;
97             bool m_in_use;
98             int m_sequence_this;
99             int m_result_set_sequence;
100             time_t m_time_last_use;
101             mp::Package * m_close_package;
102             ~BackendInstance();
103         };
104         class SessionShared::BackendClass : boost::noncopyable {
105             friend class Rep;
106             friend struct Frontend;
107             bool m_named_result_sets;
108             BackendInstanceList m_backend_list;
109             BackendInstancePtr create_backend(const Package &package);
110             void remove_backend(BackendInstancePtr b);
111             BackendInstancePtr get_backend(const Package &package);
112             void use_backend(BackendInstancePtr b);
113             void release_backend(BackendInstancePtr b);
114             void expire();
115             yazpp_1::GDU m_init_request;
116             yazpp_1::GDU m_init_response;
117             boost::mutex m_mutex_backend_class;
118             int m_sequence_top;
119             time_t m_backend_set_ttl;
120             time_t m_backend_expiry_ttl;
121             size_t m_backend_set_max;
122         public:
123             BackendClass(const yazpp_1::GDU &init_request,
124                          int resultset_ttl,
125                          int resultset_max,
126                          int session_ttl);
127             ~BackendClass();
128         };
129         class SessionShared::FrontendSet {
130             Databases m_databases;
131             yazpp_1::Yaz_Z_Query m_query;
132         public:
133             const Databases &get_databases();
134             const yazpp_1::Yaz_Z_Query &get_query();
135             FrontendSet(
136                 const Databases &databases,
137                 const yazpp_1::Yaz_Z_Query &query);
138             FrontendSet();
139         };
140         struct SessionShared::Frontend {
141             Frontend(Rep *rep);
142             ~Frontend();
143             bool m_is_virtual;
144             bool m_in_use;
145             Z_Options m_init_options;
146             void search(Package &package, Z_APDU *apdu);
147             void present(Package &package, Z_APDU *apdu);
148             void scan(Package &package, Z_APDU *apdu);
149
150             void get_set(mp::Package &package,
151                          const Z_APDU *apdu_req,
152                          const Databases &databases,
153                          yazpp_1::Yaz_Z_Query &query,
154                          BackendInstancePtr &found_backend,
155                          BackendSetPtr &found_set);
156             void override_set(BackendInstancePtr &found_backend,
157                               std::string &result_set_id);
158
159             Rep *m_p;
160             BackendClassPtr m_backend_class;
161             FrontendSets m_frontend_sets;
162         };            
163         class SessionShared::Rep {
164             friend class SessionShared;
165             friend struct Frontend;
166             
167             FrontendPtr get_frontend(Package &package);
168             void release_frontend(Package &package);
169             Rep();
170         public:
171             void expire();
172         private:
173             void init(Package &package, const Z_GDU *gdu,
174                       FrontendPtr frontend);
175             boost::mutex m_mutex;
176             boost::condition m_cond_session_ready;
177             std::map<mp::Session, FrontendPtr> m_clients;
178
179             BackendClassMap m_backend_map;
180             boost::mutex m_mutex_backend_map;
181             boost::thread_group m_thrds;
182             int m_resultset_ttl;
183             int m_resultset_max;
184             int m_session_ttl;
185         };
186     }
187 }
188
189 yf::SessionShared::FrontendSet::FrontendSet(
190     const Databases &databases,
191     const yazpp_1::Yaz_Z_Query &query)
192     : m_databases(databases), m_query(query)
193 {
194 }
195
196 const yf::SessionShared::Databases & 
197 yf::SessionShared::FrontendSet::get_databases()
198 {
199     return m_databases;
200 }
201
202 const yazpp_1::Yaz_Z_Query& yf::SessionShared::FrontendSet::get_query()
203 {
204     return m_query;
205 }
206
207 yf::SessionShared::InitKey::InitKey(const InitKey &k)
208 {
209     m_odr = odr_createmem(ODR_ENCODE);
210     
211     m_idAuthentication_size =  k.m_idAuthentication_size;
212     m_idAuthentication_buf = (char*)odr_malloc(m_odr, m_idAuthentication_size);
213     memcpy(m_idAuthentication_buf, k.m_idAuthentication_buf,
214            m_idAuthentication_size);
215
216     m_otherInfo_size =  k.m_otherInfo_size;
217     m_otherInfo_buf = (char*)odr_malloc(m_odr, m_otherInfo_size);
218     memcpy(m_otherInfo_buf, k.m_otherInfo_buf,
219            m_otherInfo_size);
220 }
221
222 yf::SessionShared::InitKey::InitKey(Z_InitRequest *req)
223 {
224     m_odr = odr_createmem(ODR_ENCODE);
225
226     Z_IdAuthentication *t = req->idAuthentication;
227     z_IdAuthentication(m_odr, &t, 1, 0);
228     m_idAuthentication_buf =
229         odr_getbuf(m_odr, &m_idAuthentication_size, 0);
230
231     Z_OtherInformation *o = req->otherInfo;
232     z_OtherInformation(m_odr, &o, 1, 0);
233     m_otherInfo_buf = odr_getbuf(m_odr, &m_otherInfo_size, 0);
234 }
235
236 yf::SessionShared::InitKey::~InitKey()
237 {
238     odr_destroy(m_odr);
239 }
240
241 bool yf::SessionShared::InitKey::operator < (const SessionShared::InitKey &k)
242     const 
243 {
244     int c;
245     c = mp::util::memcmp2(
246         (void*) m_idAuthentication_buf, m_idAuthentication_size,
247         (void*) k.m_idAuthentication_buf, k.m_idAuthentication_size);
248     if (c < 0)
249         return true;
250     else if (c > 0)
251         return false;
252
253     c = mp::util::memcmp2((void*) m_otherInfo_buf, m_otherInfo_size,
254                           (void*) k.m_otherInfo_buf, k.m_otherInfo_size);
255     if (c < 0)
256         return true;
257     else if (c > 0)
258         return false;
259     return false;
260 }
261
262 void yf::SessionShared::BackendClass::release_backend(BackendInstancePtr b)
263 {
264     boost::mutex::scoped_lock lock(m_mutex_backend_class);
265     b->m_in_use = false;
266 }
267
268
269 void yf::SessionShared::BackendClass::remove_backend(BackendInstancePtr b)
270 {
271     BackendInstanceList::iterator it = m_backend_list.begin();
272     
273     while (it != m_backend_list.end())
274     {
275         if (*it == b)
276             it = m_backend_list.erase(it);
277         else
278             it++;
279     }
280 }
281
282
283
284 yf::SessionShared::BackendInstancePtr 
285 yf::SessionShared::BackendClass::get_backend(
286     const mp::Package &frontend_package)
287 {
288     {
289         boost::mutex::scoped_lock lock(m_mutex_backend_class);
290         
291         BackendInstanceList::const_iterator it = m_backend_list.begin();
292         
293         BackendInstancePtr backend1; // null
294         
295         for (; it != m_backend_list.end(); it++)
296         {
297             if (!(*it)->m_in_use)
298             {
299                 if (!backend1 
300                     || (*it)->m_sequence_this < backend1->m_sequence_this)
301                     backend1 = *it;
302             }
303         }
304         if (backend1)
305         {
306             use_backend(backend1);
307             return backend1;
308         }
309     }
310     return create_backend(frontend_package);
311 }
312
313 void yf::SessionShared::BackendClass::use_backend(BackendInstancePtr backend)
314 {
315     backend->m_in_use = true;
316     time(&backend->m_time_last_use);
317     backend->m_sequence_this = m_sequence_top++;
318 }
319
320 yf::SessionShared::BackendInstance::~BackendInstance()
321 {
322     delete m_close_package;
323 }
324
325 yf::SessionShared::BackendInstancePtr yf::SessionShared::BackendClass::create_backend(
326     const mp::Package &frontend_package)
327 {
328     BackendInstancePtr bp(new BackendInstance);
329     BackendInstancePtr null;
330
331     bp->m_close_package =
332         new mp::Package(bp->m_session, frontend_package.origin());
333     bp->m_close_package->copy_filter(frontend_package);
334
335     Package init_package(bp->m_session, frontend_package.origin());
336
337     init_package.copy_filter(frontend_package);
338
339     yazpp_1::GDU actual_init_request = m_init_request;
340     Z_GDU *init_pdu = actual_init_request.get();
341
342     assert(init_pdu->which == Z_GDU_Z3950);
343     assert(init_pdu->u.z3950->which == Z_APDU_initRequest);
344
345     Z_InitRequest *req = init_pdu->u.z3950->u.initRequest;
346     ODR_MASK_ZERO(req->options);
347
348     ODR_MASK_SET(req->options, Z_Options_search);
349     ODR_MASK_SET(req->options, Z_Options_present);
350     ODR_MASK_SET(req->options, Z_Options_namedResultSets);
351     ODR_MASK_SET(req->options, Z_Options_scan);
352
353     ODR_MASK_SET(req->protocolVersion, Z_ProtocolVersion_1);
354     ODR_MASK_SET(req->protocolVersion, Z_ProtocolVersion_2);
355     ODR_MASK_SET(req->protocolVersion, Z_ProtocolVersion_3);
356
357     init_package.request() = init_pdu;
358
359     init_package.move();
360
361     boost::mutex::scoped_lock lock(m_mutex_backend_class);
362
363     m_named_result_sets = false;
364     Z_GDU *gdu = init_package.response().get();
365     if (!init_package.session().is_closed()
366         && gdu && gdu->which == Z_GDU_Z3950 
367         && gdu->u.z3950->which == Z_APDU_initResponse)
368     {
369         Z_InitResponse *res = gdu->u.z3950->u.initResponse;
370         if (!*res->result)
371             return null;
372         m_init_response = gdu->u.z3950;
373         if (ODR_MASK_GET(res->options, Z_Options_namedResultSets))
374         {
375             m_named_result_sets = true;
376         }
377     }
378     else
379     {
380         // did not receive an init response or closed
381         return null;
382     }
383     bp->m_in_use = true;
384     time(&bp->m_time_last_use);
385     bp->m_sequence_this = 0;
386     bp->m_result_set_sequence = 0;
387     m_backend_list.push_back(bp);
388
389     return bp;
390 }
391
392
393 yf::SessionShared::BackendClass::BackendClass(const yazpp_1::GDU &init_request,
394                                               int resultset_ttl,
395                                               int resultset_max,
396                                               int session_ttl)
397     : m_named_result_sets(false), m_init_request(init_request),
398       m_sequence_top(0), m_backend_set_ttl(resultset_ttl),
399       m_backend_expiry_ttl(session_ttl), m_backend_set_max(resultset_max)
400 {}
401
402 yf::SessionShared::BackendClass::~BackendClass()
403 {}
404
405 void yf::SessionShared::Rep::init(mp::Package &package, const Z_GDU *gdu,
406                                   FrontendPtr frontend)
407 {
408     Z_InitRequest *req = gdu->u.z3950->u.initRequest;
409
410     frontend->m_is_virtual = true;
411     frontend->m_init_options = *req->options;
412     InitKey k(req);
413     {
414         boost::mutex::scoped_lock lock(m_mutex_backend_map);
415         BackendClassMap::const_iterator it;
416         it = m_backend_map.find(k);
417         if (it == m_backend_map.end())
418         {
419             BackendClassPtr b(new BackendClass(gdu->u.z3950,
420                                                m_resultset_ttl,
421                                                m_resultset_max,
422                                                m_session_ttl));
423             m_backend_map[k] = b;
424             frontend->m_backend_class = b;
425         }
426         else
427         {
428             frontend->m_backend_class = it->second;            
429         }
430     }
431     BackendClassPtr bc = frontend->m_backend_class;
432     BackendInstancePtr backend = bc->get_backend(package);
433     
434     mp::odr odr;
435     if (!backend)
436     {
437         Z_APDU *apdu = odr.create_initResponse(gdu->u.z3950, 0, 0);
438         *apdu->u.initResponse->result = 0;
439         package.response() = apdu;
440         package.session().close();
441     }
442     else
443     {
444         boost::mutex::scoped_lock lock(bc->m_mutex_backend_class);
445         yazpp_1::GDU init_response = bc->m_init_response;
446         Z_GDU *response_gdu = init_response.get();
447         mp::util::transfer_referenceId(odr, gdu->u.z3950,
448                                        response_gdu->u.z3950);
449
450         Z_Options *server_options =
451             response_gdu->u.z3950->u.initResponse->options;
452         Z_Options *client_options = &frontend->m_init_options;
453
454         int i;
455         for (i = 0; i<30; i++)
456             if (!ODR_MASK_GET(client_options, i))
457                 ODR_MASK_CLEAR(server_options, i);
458         package.response() = init_response;
459     }
460     if (backend)
461         bc->release_backend(backend);
462 }
463
464 void yf::SessionShared::BackendSet::timestamp()
465 {
466     time(&m_time_last_use);
467 }
468
469 yf::SessionShared::BackendSet::BackendSet(
470     const std::string &result_set_id,
471     const Databases &databases,
472     const yazpp_1::Yaz_Z_Query &query) :
473     m_result_set_id(result_set_id),
474     m_databases(databases), m_result_set_size(0), m_query(query) 
475 {
476     timestamp();
477 }
478
479 bool yf::SessionShared::BackendSet::search(
480     mp::Package &frontend_package,
481     const Z_APDU *frontend_apdu,
482     const BackendInstancePtr bp)
483 {
484     Package search_package(bp->m_session, frontend_package.origin());
485
486     search_package.copy_filter(frontend_package);
487
488     mp::odr odr;
489     Z_APDU *apdu_req = zget_APDU(odr, Z_APDU_searchRequest);
490     Z_SearchRequest *req = apdu_req->u.searchRequest;
491
492     req->resultSetName = odr_strdup(odr, m_result_set_id.c_str());
493     req->query = m_query.get_Z_Query();
494
495     req->num_databaseNames = m_databases.size();
496     req->databaseNames = (char**) 
497         odr_malloc(odr, req->num_databaseNames * sizeof(char *));
498     Databases::const_iterator it = m_databases.begin();
499     size_t i = 0;
500     for (; it != m_databases.end(); it++)
501         req->databaseNames[i++] = odr_strdup(odr, it->c_str());
502
503     search_package.request() = apdu_req;
504
505     search_package.move();
506     
507     Z_Records *z_records_diag = 0;
508     Z_GDU *gdu = search_package.response().get();
509     if (!search_package.session().is_closed()
510         && gdu && gdu->which == Z_GDU_Z3950 
511         && gdu->u.z3950->which == Z_APDU_searchResponse)
512     {
513         Z_SearchResponse *b_resp = gdu->u.z3950->u.searchResponse;
514         if (b_resp->records)
515         {
516             if (b_resp->records->which == Z_Records_NSD 
517                 || b_resp->records->which == Z_Records_multipleNSD)
518                 z_records_diag = b_resp->records;
519         }
520         if (z_records_diag)
521         {
522             if (frontend_apdu->which == Z_APDU_searchRequest)
523             {
524                 Z_APDU *f_apdu = odr.create_searchResponse(frontend_apdu, 
525                                                            0, 0);
526                 Z_SearchResponse *f_resp = f_apdu->u.searchResponse;
527                 f_resp->records = z_records_diag;
528                 frontend_package.response() = f_apdu;
529                 return false;
530             }
531             if (frontend_apdu->which == Z_APDU_presentRequest)
532             {
533                 Z_APDU *f_apdu = odr.create_presentResponse(frontend_apdu, 
534                                                             0, 0);
535                 Z_PresentResponse *f_resp = f_apdu->u.presentResponse;
536                 f_resp->records = z_records_diag;
537                 frontend_package.response() = f_apdu;
538                 return false;
539             }
540         }
541         m_result_set_size = *b_resp->resultCount;
542         return true;
543     }
544     Z_APDU *f_apdu = 0;
545     if (frontend_apdu->which == Z_APDU_searchRequest)
546         f_apdu = odr.create_searchResponse(
547             frontend_apdu, YAZ_BIB1_TEMPORARY_SYSTEM_ERROR, 0);
548     else if (frontend_apdu->which == Z_APDU_presentRequest)
549         f_apdu = odr.create_presentResponse(
550             frontend_apdu, YAZ_BIB1_TEMPORARY_SYSTEM_ERROR, 0);
551     else
552         f_apdu = odr.create_close(
553             frontend_apdu, YAZ_BIB1_TEMPORARY_SYSTEM_ERROR, 0);
554     frontend_package.response() = f_apdu;
555     return false;
556 }
557
558 void yf::SessionShared::Frontend::override_set(
559     BackendInstancePtr &found_backend,
560     std::string &result_set_id)
561 {
562     BackendClassPtr bc = m_backend_class;
563     BackendInstanceList::const_iterator it = bc->m_backend_list.begin();
564     time_t now;
565     time(&now);
566     
567     for (; it != bc->m_backend_list.end(); it++)
568     {
569         if (!(*it)->m_in_use)
570         {
571             BackendSetList::iterator set_it = (*it)->m_sets.begin();
572             for (; set_it != (*it)->m_sets.end(); set_it++)
573             {
574                 if (now >= (*set_it)->m_time_last_use &&
575                     now - (*set_it)->m_time_last_use > bc->m_backend_set_ttl)
576                 {
577                     found_backend = *it;
578                     result_set_id = (*set_it)->m_result_set_id;
579                     found_backend->m_sets.erase(set_it);
580                     return;
581                 }
582             }
583         }
584     }
585     size_t max_sets = bc->m_named_result_sets ? bc->m_backend_set_max : 1;
586     for (it = bc->m_backend_list.begin(); it != bc->m_backend_list.end(); it++)
587     {
588         if (!(*it)->m_in_use && (*it)->m_sets.size() < max_sets)
589         {
590             found_backend = *it;
591             if (bc->m_named_result_sets)
592             {
593                 result_set_id = boost::io::str(
594                     boost::format("%1%") % 
595                     found_backend->m_result_set_sequence);
596                 found_backend->m_result_set_sequence++;
597             }
598             else
599                 result_set_id = "default";
600             return;
601         }
602     }
603 }
604
605 void yf::SessionShared::Frontend::get_set(mp::Package &package,
606                                           const Z_APDU *apdu_req,
607                                           const Databases &databases,
608                                           yazpp_1::Yaz_Z_Query &query,
609                                           BackendInstancePtr &found_backend,
610                                           BackendSetPtr &found_set)
611 {
612     std::string result_set_id;
613     BackendClassPtr bc = m_backend_class;
614     {
615         boost::mutex::scoped_lock lock(bc->m_mutex_backend_class);
616      
617         // look at each backend and see if we have a similar search
618         BackendInstanceList::const_iterator it = bc->m_backend_list.begin();
619         
620         for (; it != bc->m_backend_list.end(); it++)
621         {
622             if (!(*it)->m_in_use)
623             {
624                 BackendSetList::const_iterator set_it = (*it)->m_sets.begin();
625                 for (; set_it != (*it)->m_sets.end(); set_it++)
626                 {
627                     if ((*set_it)->m_databases == databases 
628                         && query.match(&(*set_it)->m_query))
629                     {
630                         found_set = *set_it;
631                         found_backend = *it;
632                         bc->use_backend(found_backend);
633                         found_set->timestamp();
634                         // found matching set. No need to search again
635                         return;
636                     }
637                 }
638             }
639         }
640         override_set(found_backend, result_set_id);
641         if (found_backend)
642             bc->use_backend(found_backend);
643     }
644     if (!found_backend)
645     {
646         // create a new backend set (and new set)
647         found_backend = bc->create_backend(package);
648
649         if (!found_backend)
650         {
651             Z_APDU *f_apdu = 0;
652             mp::odr odr;
653             if (apdu_req->which == Z_APDU_searchRequest)
654             {
655                 f_apdu = odr.create_searchResponse(
656                         apdu_req, YAZ_BIB1_TEMPORARY_SYSTEM_ERROR, 0);
657             }
658             else if (apdu_req->which == Z_APDU_presentRequest)
659             {
660                 f_apdu = odr.create_presentResponse(
661                     apdu_req, YAZ_BIB1_TEMPORARY_SYSTEM_ERROR, 0);
662             }
663             else
664             {
665                 f_apdu = odr.create_close(
666                     apdu_req, YAZ_BIB1_TEMPORARY_SYSTEM_ERROR, 0);
667             }
668             package.response() = f_apdu;
669             return;
670         }
671         if (bc->m_named_result_sets)
672         {
673             result_set_id = boost::io::str(
674                 boost::format("%1%") % found_backend->m_result_set_sequence);
675             found_backend->m_result_set_sequence++;
676         }
677         else
678             result_set_id = "default";
679     }
680     // we must search ...
681     BackendSetPtr new_set(new BackendSet(result_set_id,
682                                          databases, query));
683     if (!new_set->search(package, apdu_req, found_backend))
684     {
685         bc->remove_backend(found_backend);
686         return; // search error 
687     }
688     found_set = new_set;
689     found_set->timestamp();
690     found_backend->m_sets.push_back(found_set);
691 }
692
693 void yf::SessionShared::Frontend::search(mp::Package &package,
694                                          Z_APDU *apdu_req)
695 {
696     Z_SearchRequest *req = apdu_req->u.searchRequest;
697     FrontendSets::iterator fset_it = 
698         m_frontend_sets.find(req->resultSetName);
699     if (fset_it != m_frontend_sets.end())
700     {
701         // result set already exist 
702         // if replace indicator is off: we return diagnostic if
703         // result set already exist.
704         if (*req->replaceIndicator == 0)
705         {
706             mp::odr odr;
707             Z_APDU *apdu = 
708                 odr.create_searchResponse(
709                     apdu_req,
710                     YAZ_BIB1_RESULT_SET_EXISTS_AND_REPLACE_INDICATOR_OFF,
711                     0);
712             package.response() = apdu;
713             
714             return;
715         }
716         m_frontend_sets.erase(fset_it);
717     }
718     
719     yazpp_1::Yaz_Z_Query query;
720     query.set_Z_Query(req->query);
721     Databases databases;
722     int i;
723     for (i = 0; i<req->num_databaseNames; i++)
724         databases.push_back(req->databaseNames[i]);
725
726     BackendSetPtr found_set; // null
727     BackendInstancePtr found_backend; // null
728
729     get_set(package, apdu_req, databases, query, found_backend, found_set);
730     if (!found_set)
731         return;
732
733     mp::odr odr;
734     Z_APDU *f_apdu = odr.create_searchResponse(apdu_req, 0, 0);
735     Z_SearchResponse *f_resp = f_apdu->u.searchResponse;
736     *f_resp->resultCount = found_set->m_result_set_size;
737     package.response() = f_apdu;
738
739     FrontendSetPtr fset(new FrontendSet(databases, query));
740     m_frontend_sets[req->resultSetName] = fset;
741
742     m_backend_class->release_backend(found_backend);
743 }
744
745 void yf::SessionShared::Frontend::present(mp::Package &package,
746                                           Z_APDU *apdu_req)
747 {
748     mp::odr odr;
749     Z_PresentRequest *req = apdu_req->u.presentRequest;
750
751     FrontendSets::iterator fset_it = 
752         m_frontend_sets.find(req->resultSetId);
753
754     if (fset_it == m_frontend_sets.end())
755     {
756         Z_APDU *apdu = 
757             odr.create_presentResponse(
758                 apdu_req,
759                 YAZ_BIB1_SPECIFIED_RESULT_SET_DOES_NOT_EXIST,
760                 req->resultSetId);
761         package.response() = apdu;
762         return;
763     }
764     FrontendSetPtr fset = fset_it->second;
765
766     Databases databases = fset->get_databases();
767     yazpp_1::Yaz_Z_Query query = fset->get_query();
768
769     BackendClassPtr bc = m_backend_class;
770     BackendSetPtr found_set; // null
771     BackendInstancePtr found_backend;
772
773     get_set(package, apdu_req, databases, query, found_backend, found_set);
774     if (!found_set)
775         return;
776
777     Z_APDU *p_apdu = zget_APDU(odr, Z_APDU_presentRequest);
778     Z_PresentRequest *p_req = p_apdu->u.presentRequest;
779     p_req->preferredRecordSyntax = req->preferredRecordSyntax;
780     p_req->resultSetId = odr_strdup(odr, found_set->m_result_set_id.c_str());
781     *p_req->resultSetStartPoint = *req->resultSetStartPoint;
782     *p_req->numberOfRecordsRequested = *req->numberOfRecordsRequested;
783     p_req->preferredRecordSyntax = req->preferredRecordSyntax;
784     p_req->recordComposition = req->recordComposition;
785
786     Package present_package(found_backend->m_session, package.origin());
787     present_package.copy_filter(package);
788
789     present_package.request() = p_apdu;
790
791     present_package.move();
792
793     Z_GDU *gdu = present_package.response().get();
794     if (!present_package.session().is_closed()
795         && gdu && gdu->which == Z_GDU_Z3950 
796         && gdu->u.z3950->which == Z_APDU_presentResponse)
797     {
798         Z_PresentResponse *b_resp = gdu->u.z3950->u.presentResponse;
799         Z_APDU *f_apdu_res = odr.create_presentResponse(apdu_req, 0, 0);
800         Z_PresentResponse *f_resp = f_apdu_res->u.presentResponse;
801
802         f_resp->numberOfRecordsReturned = b_resp->numberOfRecordsReturned;
803         f_resp->nextResultSetPosition = b_resp->nextResultSetPosition;
804         f_resp->presentStatus= b_resp->presentStatus;
805         f_resp->records = b_resp->records;
806         f_resp->otherInfo = b_resp->otherInfo;
807         package.response() = f_apdu_res;
808         bc->release_backend(found_backend);
809     }
810     else
811     {
812         bc->remove_backend(found_backend);
813         Z_APDU *f_apdu_res = 
814             odr.create_presentResponse(
815                 apdu_req, YAZ_BIB1_TEMPORARY_SYSTEM_ERROR, 0);
816         package.response() = f_apdu_res;
817     }
818 }
819
820 void yf::SessionShared::Frontend::scan(mp::Package &frontend_package,
821                                        Z_APDU *apdu_req)
822 {
823     BackendClassPtr bc = m_backend_class;
824     BackendInstancePtr backend = bc->get_backend(frontend_package);
825     if (!backend)
826     {
827         mp::odr odr;
828         Z_APDU *apdu = odr.create_scanResponse(
829             apdu_req, YAZ_BIB1_TEMPORARY_SYSTEM_ERROR, 0);
830         frontend_package.response() = apdu;
831     }
832     else
833     {
834         Package scan_package(backend->m_session, frontend_package.origin());
835         scan_package.copy_filter(frontend_package);
836         scan_package.request() = apdu_req;
837         scan_package.move();
838         frontend_package.response() = scan_package.response();
839         if (scan_package.session().is_closed())
840         {
841             frontend_package.session().close();
842             bc->remove_backend(backend);
843         }
844         else
845             bc->release_backend(backend);
846     }
847 }
848
849 yf::SessionShared::Worker::Worker(SessionShared::Rep *rep) : m_p(rep)
850 {
851 }
852
853 void yf::SessionShared::Worker::operator() (void)
854 {
855     m_p->expire();
856 }
857
858 void yf::SessionShared::BackendClass::expire()
859 {
860     time_t now;
861     time(&now);
862     boost::mutex::scoped_lock lock(m_mutex_backend_class);
863     BackendInstanceList::iterator bit = m_backend_list.begin();
864     while (bit != m_backend_list.end())
865     {
866         time_t last_use = (*bit)->m_time_last_use;
867         
868         if ((*bit)->m_in_use)
869         {
870             bit++;
871         }
872         else if ((now >= last_use && now - last_use > m_backend_expiry_ttl)
873             || (now < last_use))
874         {
875             mp::odr odr;
876             (*bit)->m_close_package->response() = odr.create_close(
877                 0, Z_Close_lackOfActivity, 0);
878             (*bit)->m_close_package->session().close();
879             (*bit)->m_close_package->move();
880
881             bit = m_backend_list.erase(bit);
882         }
883         else
884         {
885             bit++;
886         }
887     }
888 }
889
890 void yf::SessionShared::Rep::expire()
891 {
892     while (true)
893     {
894         boost::xtime xt;
895         boost::xtime_get(&xt, boost::TIME_UTC);
896         xt.sec += 30;
897         boost::thread::sleep(xt);
898         
899         BackendClassMap::const_iterator b_it = m_backend_map.begin();
900         for (; b_it != m_backend_map.end(); b_it++)
901             b_it->second->expire();
902     }
903 }
904
905 yf::SessionShared::Rep::Rep()
906 {
907     m_resultset_ttl = 30;
908     m_resultset_max = 10;
909     m_session_ttl = 90;
910     yf::SessionShared::Worker w(this);
911     m_thrds.add_thread(new boost::thread(w));
912 }
913
914 yf::SessionShared::SessionShared() : m_p(new SessionShared::Rep)
915 {
916 }
917
918 yf::SessionShared::~SessionShared() {
919 }
920
921
922 yf::SessionShared::Frontend::Frontend(Rep *rep) : m_is_virtual(false), m_p(rep)
923 {
924 }
925
926 yf::SessionShared::Frontend::~Frontend()
927 {
928 }
929
930 yf::SessionShared::FrontendPtr yf::SessionShared::Rep::get_frontend(mp::Package &package)
931 {
932     boost::mutex::scoped_lock lock(m_mutex);
933
934     std::map<mp::Session,yf::SessionShared::FrontendPtr>::iterator it;
935     
936     while(true)
937     {
938         it = m_clients.find(package.session());
939         if (it == m_clients.end())
940             break;
941         
942         if (!it->second->m_in_use)
943         {
944             it->second->m_in_use = true;
945             return it->second;
946         }
947         m_cond_session_ready.wait(lock);
948     }
949     FrontendPtr f(new Frontend(this));
950     m_clients[package.session()] = f;
951     f->m_in_use = true;
952     return f;
953 }
954
955 void yf::SessionShared::Rep::release_frontend(mp::Package &package)
956 {
957     boost::mutex::scoped_lock lock(m_mutex);
958     std::map<mp::Session,yf::SessionShared::FrontendPtr>::iterator it;
959     
960     it = m_clients.find(package.session());
961     if (it != m_clients.end())
962     {
963         if (package.session().is_closed())
964         {
965             m_clients.erase(it);
966         }
967         else
968         {
969             it->second->m_in_use = false;
970         }
971         m_cond_session_ready.notify_all();
972     }
973 }
974
975
976 void yf::SessionShared::process(mp::Package &package) const
977 {
978     FrontendPtr f = m_p->get_frontend(package);
979
980     Z_GDU *gdu = package.request().get();
981     
982     if (gdu && gdu->which == Z_GDU_Z3950 && gdu->u.z3950->which ==
983         Z_APDU_initRequest && !f->m_is_virtual)
984     {
985         m_p->init(package, gdu, f);
986     }
987     else if (!f->m_is_virtual)
988         package.move();
989     else if (gdu && gdu->which == Z_GDU_Z3950)
990     {
991         Z_APDU *apdu = gdu->u.z3950;
992         if (apdu->which == Z_APDU_initRequest)
993         {
994             mp::odr odr;
995             
996             package.response() = odr.create_close(
997                 apdu,
998                 Z_Close_protocolError,
999                 "double init");
1000             
1001             package.session().close();
1002         }
1003         else if (apdu->which == Z_APDU_close)
1004         {
1005             mp::odr odr;
1006             
1007             package.response() = odr.create_close(
1008                 apdu,
1009                 Z_Close_peerAbort, "received close from client");
1010             package.session().close();
1011         }
1012         else if (apdu->which == Z_APDU_searchRequest)
1013         {
1014             f->search(package, apdu);
1015         }
1016         else if (apdu->which == Z_APDU_presentRequest)
1017         {
1018             f->present(package, apdu);
1019         }
1020         else if (apdu->which == Z_APDU_scanRequest)
1021         {
1022             f->scan(package, apdu);
1023         }
1024         else
1025         {
1026             mp::odr odr;
1027             
1028             package.response() = odr.create_close(
1029                 apdu, Z_Close_protocolError,
1030                 "unsupported APDU in filter_session_shared");
1031             
1032             package.session().close();
1033         }
1034     }
1035     m_p->release_frontend(package);
1036 }
1037
1038 void yf::SessionShared::configure(const xmlNode *ptr)
1039 {
1040     for (ptr = ptr->children; ptr; ptr = ptr->next)
1041     {
1042         if (ptr->type != XML_ELEMENT_NODE)
1043             continue;
1044         if (!strcmp((const char *) ptr->name, "resultset"))
1045         {
1046             const struct _xmlAttr *attr;
1047             for (attr = ptr->properties; attr; attr = attr->next)
1048             {
1049                 if (!strcmp((const char *) attr->name, "ttl"))
1050                     m_p->m_resultset_ttl = 
1051                         mp::xml::get_int(attr->children, 30);
1052                 else if (!strcmp((const char *) attr->name, "max"))
1053                 {
1054                     m_p->m_resultset_max = 
1055                         mp::xml::get_int(attr->children, 10);
1056                 }
1057                 else
1058                     throw mp::filter::FilterException(
1059                         "Bad attribute " + std::string((const char *)
1060                                                        attr->name));
1061             }
1062         }
1063         else if (!strcmp((const char *) ptr->name, "session"))
1064         {
1065             const struct _xmlAttr *attr;
1066             for (attr = ptr->properties; attr; attr = attr->next)
1067             {
1068                 if (!strcmp((const char *) attr->name, "ttl"))
1069                     m_p->m_session_ttl = 
1070                         mp::xml::get_int(attr->children, 120);
1071                 else
1072                     throw mp::filter::FilterException(
1073                         "Bad attribute " + std::string((const char *)
1074                                                        attr->name));
1075             }
1076         }
1077         else
1078         {
1079             throw mp::filter::FilterException("Bad element " 
1080                                                + std::string((const char *)
1081                                                              ptr->name));
1082         }
1083     }
1084 }
1085
1086 static mp::filter::Base* filter_creator()
1087 {
1088     return new mp::filter::SessionShared;
1089 }
1090
1091 extern "C" {
1092     struct metaproxy_1_filter_struct metaproxy_1_filter_session_shared = {
1093         0,
1094         "session_shared",
1095         filter_creator
1096     };
1097 }
1098
1099 /*
1100  * Local variables:
1101  * c-basic-offset: 4
1102  * indent-tabs-mode: nil
1103  * c-file-style: "stroustrup"
1104  * End:
1105  * vim: shiftwidth=4 tabstop=8 expandtab
1106  */