Comments. Removed unused definition.
[metaproxy-moved-to-github.git] / src / filter_session_shared.cpp
1 /* This file is part of Metaproxy.
2    Copyright (C) 2005-2008 Index Data
3
4 Metaproxy is free software; you can redistribute it and/or modify it under
5 the terms of the GNU General Public License as published by the Free
6 Software Foundation; either version 2, or (at your option) any later
7 version.
8
9 Metaproxy is distributed in the hope that it will be useful, but WITHOUT ANY
10 WARRANTY; without even the implied warranty of MERCHANTABILITY or
11 FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
12 for more details.
13
14 You should have received a copy of the GNU General Public License
15 along with this program; if not, write to the Free Software
16 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
17 */
18
19 #include "config.hpp"
20
21 #include "filter.hpp"
22 #include "package.hpp"
23
24 #include <boost/thread/mutex.hpp>
25 #include <boost/thread/condition.hpp>
26 #include <boost/thread/thread.hpp>
27 #include <boost/thread/xtime.hpp>
28 #include <boost/shared_ptr.hpp>
29 #include <boost/format.hpp>
30
31 #include "util.hpp"
32 #include "filter_session_shared.hpp"
33
34 #include <yaz/log.h>
35 #include <yaz/zgdu.h>
36 #include <yaz/otherinfo.h>
37 #include <yaz/diagbib1.h>
38 #include <yazpp/z-query.h>
39 #include <map>
40 #include <iostream>
41 #include <time.h>
42
43 namespace mp = metaproxy_1;
44 namespace yf = metaproxy_1::filter;
45
46 namespace metaproxy_1 {
47
48     namespace filter {
49         // key for session.. We'll only share sessions with same InitKey
50         class SessionShared::InitKey {
51         public:
52             bool operator < (const SessionShared::InitKey &k) const;
53             InitKey(Z_InitRequest *req);
54             InitKey(const InitKey &);
55             ~InitKey();
56         private:
57             char *m_idAuthentication_buf;
58             int m_idAuthentication_size;
59             char *m_otherInfo_buf;
60             int m_otherInfo_size;
61             ODR m_odr;
62         };
63         // worker thread .. for expiry of sessions
64         class SessionShared::Worker {
65         public:
66             Worker(SessionShared::Rep *rep);
67             void operator() (void);
68         private:
69             SessionShared::Rep *m_p;
70         };
71         // backend result set
72         class SessionShared::BackendSet {
73         public:
74             std::string m_result_set_id;
75             Databases m_databases;
76             int m_result_set_size;
77             yazpp_1::Yaz_Z_Query m_query;
78             time_t m_time_last_use;
79             void timestamp();
80             BackendSet(
81                 const std::string &result_set_id,
82                 const Databases &databases,
83                 const yazpp_1::Yaz_Z_Query &query);
84             bool search(
85                 Package &frontend_package,
86                 const Z_APDU *apdu_req,
87                 const BackendInstancePtr bp);
88         };
89         // backend connection instance
90         class SessionShared::BackendInstance {
91             friend class Rep;
92             friend class BackendClass;
93             friend class BackendSet;
94         public:
95             mp::Session m_session;
96             BackendSetList m_sets;
97             bool m_in_use;
98             int m_sequence_this;
99             int m_result_set_sequence;
100             time_t m_time_last_use;
101             mp::Package * m_close_package;
102             ~BackendInstance();
103         };
104         // backends of some class (all with same InitKey)
105         class SessionShared::BackendClass : boost::noncopyable {
106             friend class Rep;
107             friend struct Frontend;
108             bool m_named_result_sets;
109             BackendInstanceList m_backend_list;
110             BackendInstancePtr create_backend(const Package &package);
111             void remove_backend(BackendInstancePtr b);
112             BackendInstancePtr get_backend(const Package &package);
113             void use_backend(BackendInstancePtr b);
114             void release_backend(BackendInstancePtr b);
115             void expire_class();
116             yazpp_1::GDU m_init_request;
117             yazpp_1::GDU m_init_response;
118             boost::mutex m_mutex_backend_class;
119             int m_sequence_top;
120             time_t m_backend_set_ttl;
121             time_t m_backend_expiry_ttl;
122             size_t m_backend_set_max;
123         public:
124             BackendClass(const yazpp_1::GDU &init_request,
125                          int resultset_ttl,
126                          int resultset_max,
127                          int session_ttl);
128             ~BackendClass();
129         };
130         // frontend result set
131         class SessionShared::FrontendSet {
132             Databases m_databases;
133             yazpp_1::Yaz_Z_Query m_query;
134         public:
135             const Databases &get_databases();
136             const yazpp_1::Yaz_Z_Query &get_query();
137             FrontendSet(
138                 const Databases &databases,
139                 const yazpp_1::Yaz_Z_Query &query);
140             FrontendSet();
141         };
142         // frontend session
143         struct SessionShared::Frontend {
144             Frontend(Rep *rep);
145             ~Frontend();
146             bool m_is_virtual;
147             bool m_in_use;
148             Z_Options m_init_options;
149             void search(Package &package, Z_APDU *apdu);
150             void present(Package &package, Z_APDU *apdu);
151             void scan(Package &package, Z_APDU *apdu);
152
153             void get_set(mp::Package &package,
154                          const Z_APDU *apdu_req,
155                          const Databases &databases,
156                          yazpp_1::Yaz_Z_Query &query,
157                          BackendInstancePtr &found_backend,
158                          BackendSetPtr &found_set);
159             void override_set(BackendInstancePtr &found_backend,
160                               std::string &result_set_id);
161
162             Rep *m_p;
163             BackendClassPtr m_backend_class;
164             FrontendSets m_frontend_sets;
165         };            
166         // representation
167         class SessionShared::Rep {
168             friend class SessionShared;
169             friend struct Frontend;
170             
171             FrontendPtr get_frontend(Package &package);
172             void release_frontend(Package &package);
173             Rep();
174         public:
175             void expire();
176         private:
177             void init(Package &package, const Z_GDU *gdu,
178                       FrontendPtr frontend);
179             boost::mutex m_mutex;
180             boost::condition m_cond_session_ready;
181             std::map<mp::Session, FrontendPtr> m_clients;
182
183             BackendClassMap m_backend_map;
184             boost::mutex m_mutex_backend_map;
185             boost::thread_group m_thrds;
186             int m_resultset_ttl;
187             int m_resultset_max;
188             int m_session_ttl;
189         };
190     }
191 }
192
193 yf::SessionShared::FrontendSet::FrontendSet(
194     const Databases &databases,
195     const yazpp_1::Yaz_Z_Query &query)
196     : m_databases(databases), m_query(query)
197 {
198 }
199
200 const yf::SessionShared::Databases & 
201 yf::SessionShared::FrontendSet::get_databases()
202 {
203     return m_databases;
204 }
205
206 const yazpp_1::Yaz_Z_Query& yf::SessionShared::FrontendSet::get_query()
207 {
208     return m_query;
209 }
210
211 yf::SessionShared::InitKey::InitKey(const InitKey &k)
212 {
213     m_odr = odr_createmem(ODR_ENCODE);
214     
215     m_idAuthentication_size =  k.m_idAuthentication_size;
216     m_idAuthentication_buf = (char*)odr_malloc(m_odr, m_idAuthentication_size);
217     memcpy(m_idAuthentication_buf, k.m_idAuthentication_buf,
218            m_idAuthentication_size);
219
220     m_otherInfo_size =  k.m_otherInfo_size;
221     m_otherInfo_buf = (char*)odr_malloc(m_odr, m_otherInfo_size);
222     memcpy(m_otherInfo_buf, k.m_otherInfo_buf,
223            m_otherInfo_size);
224 }
225
226 yf::SessionShared::InitKey::InitKey(Z_InitRequest *req)
227 {
228     m_odr = odr_createmem(ODR_ENCODE);
229
230     Z_IdAuthentication *t = req->idAuthentication;
231     z_IdAuthentication(m_odr, &t, 1, 0);
232     m_idAuthentication_buf =
233         odr_getbuf(m_odr, &m_idAuthentication_size, 0);
234
235     Z_OtherInformation *o = req->otherInfo;
236     z_OtherInformation(m_odr, &o, 1, 0);
237     m_otherInfo_buf = odr_getbuf(m_odr, &m_otherInfo_size, 0);
238 }
239
240 yf::SessionShared::InitKey::~InitKey()
241 {
242     odr_destroy(m_odr);
243 }
244
245 bool yf::SessionShared::InitKey::operator < (const SessionShared::InitKey &k)
246     const 
247 {
248     int c;
249     c = mp::util::memcmp2(
250         (void*) m_idAuthentication_buf, m_idAuthentication_size,
251         (void*) k.m_idAuthentication_buf, k.m_idAuthentication_size);
252     if (c < 0)
253         return true;
254     else if (c > 0)
255         return false;
256
257     c = mp::util::memcmp2((void*) m_otherInfo_buf, m_otherInfo_size,
258                           (void*) k.m_otherInfo_buf, k.m_otherInfo_size);
259     if (c < 0)
260         return true;
261     else if (c > 0)
262         return false;
263     return false;
264 }
265
266 void yf::SessionShared::BackendClass::release_backend(BackendInstancePtr b)
267 {
268     boost::mutex::scoped_lock lock(m_mutex_backend_class);
269     b->m_in_use = false;
270 }
271
272
273 void yf::SessionShared::BackendClass::remove_backend(BackendInstancePtr b)
274 {
275     BackendInstanceList::iterator it = m_backend_list.begin();
276     
277     while (it != m_backend_list.end())
278     {
279         if (*it == b)
280             it = m_backend_list.erase(it);
281         else
282             it++;
283     }
284 }
285
286
287
288 yf::SessionShared::BackendInstancePtr 
289 yf::SessionShared::BackendClass::get_backend(
290     const mp::Package &frontend_package)
291 {
292     {
293         boost::mutex::scoped_lock lock(m_mutex_backend_class);
294         
295         BackendInstanceList::const_iterator it = m_backend_list.begin();
296         
297         BackendInstancePtr backend1; // null
298         
299         for (; it != m_backend_list.end(); it++)
300         {
301             if (!(*it)->m_in_use)
302             {
303                 if (!backend1 
304                     || (*it)->m_sequence_this < backend1->m_sequence_this)
305                     backend1 = *it;
306             }
307         }
308         if (backend1)
309         {
310             use_backend(backend1);
311             return backend1;
312         }
313     }
314     return create_backend(frontend_package);
315 }
316
317 void yf::SessionShared::BackendClass::use_backend(BackendInstancePtr backend)
318 {
319     backend->m_in_use = true;
320     time(&backend->m_time_last_use);
321     backend->m_sequence_this = m_sequence_top++;
322 }
323
324 yf::SessionShared::BackendInstance::~BackendInstance()
325 {
326     delete m_close_package;
327 }
328
329 yf::SessionShared::BackendInstancePtr yf::SessionShared::BackendClass::create_backend(
330     const mp::Package &frontend_package)
331 {
332     BackendInstancePtr bp(new BackendInstance);
333     BackendInstancePtr null;
334
335     bp->m_close_package =
336         new mp::Package(bp->m_session, frontend_package.origin());
337     bp->m_close_package->copy_filter(frontend_package);
338
339     Package init_package(bp->m_session, frontend_package.origin());
340
341     init_package.copy_filter(frontend_package);
342
343     yazpp_1::GDU actual_init_request = m_init_request;
344     Z_GDU *init_pdu = actual_init_request.get();
345
346     assert(init_pdu->which == Z_GDU_Z3950);
347     assert(init_pdu->u.z3950->which == Z_APDU_initRequest);
348
349     Z_InitRequest *req = init_pdu->u.z3950->u.initRequest;
350     ODR_MASK_ZERO(req->options);
351
352     ODR_MASK_SET(req->options, Z_Options_search);
353     ODR_MASK_SET(req->options, Z_Options_present);
354     ODR_MASK_SET(req->options, Z_Options_namedResultSets);
355     ODR_MASK_SET(req->options, Z_Options_scan);
356
357     ODR_MASK_SET(req->protocolVersion, Z_ProtocolVersion_1);
358     ODR_MASK_SET(req->protocolVersion, Z_ProtocolVersion_2);
359     ODR_MASK_SET(req->protocolVersion, Z_ProtocolVersion_3);
360
361     init_package.request() = init_pdu;
362
363     init_package.move();
364
365     boost::mutex::scoped_lock lock(m_mutex_backend_class);
366
367     m_named_result_sets = false;
368     Z_GDU *gdu = init_package.response().get();
369     if (!init_package.session().is_closed()
370         && gdu && gdu->which == Z_GDU_Z3950 
371         && gdu->u.z3950->which == Z_APDU_initResponse)
372     {
373         Z_InitResponse *res = gdu->u.z3950->u.initResponse;
374         if (!*res->result)
375             return null;
376         m_init_response = gdu->u.z3950;
377         if (ODR_MASK_GET(res->options, Z_Options_namedResultSets))
378         {
379             m_named_result_sets = true;
380         }
381     }
382     else
383     {
384         // did not receive an init response or closed
385         return null;
386     }
387     bp->m_in_use = true;
388     time(&bp->m_time_last_use);
389     bp->m_sequence_this = 0;
390     bp->m_result_set_sequence = 0;
391     m_backend_list.push_back(bp);
392
393     return bp;
394 }
395
396
397 yf::SessionShared::BackendClass::BackendClass(const yazpp_1::GDU &init_request,
398                                               int resultset_ttl,
399                                               int resultset_max,
400                                               int session_ttl)
401     : m_named_result_sets(false), m_init_request(init_request),
402       m_sequence_top(0), m_backend_set_ttl(resultset_ttl),
403       m_backend_expiry_ttl(session_ttl), m_backend_set_max(resultset_max)
404 {}
405
406 yf::SessionShared::BackendClass::~BackendClass()
407 {}
408
409 void yf::SessionShared::Rep::init(mp::Package &package, const Z_GDU *gdu,
410                                   FrontendPtr frontend)
411 {
412     Z_InitRequest *req = gdu->u.z3950->u.initRequest;
413
414     frontend->m_is_virtual = true;
415     frontend->m_init_options = *req->options;
416     InitKey k(req);
417     {
418         boost::mutex::scoped_lock lock(m_mutex_backend_map);
419         BackendClassMap::const_iterator it;
420         it = m_backend_map.find(k);
421         if (it == m_backend_map.end())
422         {
423             BackendClassPtr b(new BackendClass(gdu->u.z3950,
424                                                m_resultset_ttl,
425                                                m_resultset_max,
426                                                m_session_ttl));
427             m_backend_map[k] = b;
428             frontend->m_backend_class = b;
429         }
430         else
431         {
432             frontend->m_backend_class = it->second;            
433         }
434     }
435     BackendClassPtr bc = frontend->m_backend_class;
436     BackendInstancePtr backend = bc->get_backend(package);
437     
438     mp::odr odr;
439     if (!backend)
440     {
441         Z_APDU *apdu = odr.create_initResponse(gdu->u.z3950, 0, 0);
442         *apdu->u.initResponse->result = 0;
443         package.response() = apdu;
444         package.session().close();
445     }
446     else
447     {
448         boost::mutex::scoped_lock lock(bc->m_mutex_backend_class);
449         yazpp_1::GDU init_response = bc->m_init_response;
450         Z_GDU *response_gdu = init_response.get();
451         mp::util::transfer_referenceId(odr, gdu->u.z3950,
452                                        response_gdu->u.z3950);
453
454         Z_Options *server_options =
455             response_gdu->u.z3950->u.initResponse->options;
456         Z_Options *client_options = &frontend->m_init_options;
457
458         int i;
459         for (i = 0; i<30; i++)
460             if (!ODR_MASK_GET(client_options, i))
461                 ODR_MASK_CLEAR(server_options, i);
462         package.response() = init_response;
463     }
464     if (backend)
465         bc->release_backend(backend);
466 }
467
468 void yf::SessionShared::BackendSet::timestamp()
469 {
470     time(&m_time_last_use);
471 }
472
473 yf::SessionShared::BackendSet::BackendSet(
474     const std::string &result_set_id,
475     const Databases &databases,
476     const yazpp_1::Yaz_Z_Query &query) :
477     m_result_set_id(result_set_id),
478     m_databases(databases), m_result_set_size(0), m_query(query) 
479 {
480     timestamp();
481 }
482
483 bool yf::SessionShared::BackendSet::search(
484     mp::Package &frontend_package,
485     const Z_APDU *frontend_apdu,
486     const BackendInstancePtr bp)
487 {
488     Package search_package(bp->m_session, frontend_package.origin());
489
490     search_package.copy_filter(frontend_package);
491
492     mp::odr odr;
493     Z_APDU *apdu_req = zget_APDU(odr, Z_APDU_searchRequest);
494     Z_SearchRequest *req = apdu_req->u.searchRequest;
495
496     req->resultSetName = odr_strdup(odr, m_result_set_id.c_str());
497     req->query = m_query.get_Z_Query();
498
499     req->num_databaseNames = m_databases.size();
500     req->databaseNames = (char**) 
501         odr_malloc(odr, req->num_databaseNames * sizeof(char *));
502     Databases::const_iterator it = m_databases.begin();
503     size_t i = 0;
504     for (; it != m_databases.end(); it++)
505         req->databaseNames[i++] = odr_strdup(odr, it->c_str());
506
507     search_package.request() = apdu_req;
508
509     search_package.move();
510     
511     Z_Records *z_records_diag = 0;
512     Z_GDU *gdu = search_package.response().get();
513     if (!search_package.session().is_closed()
514         && gdu && gdu->which == Z_GDU_Z3950 
515         && gdu->u.z3950->which == Z_APDU_searchResponse)
516     {
517         Z_SearchResponse *b_resp = gdu->u.z3950->u.searchResponse;
518         if (b_resp->records)
519         {
520             if (b_resp->records->which == Z_Records_NSD 
521                 || b_resp->records->which == Z_Records_multipleNSD)
522                 z_records_diag = b_resp->records;
523         }
524         if (z_records_diag)
525         {
526             if (frontend_apdu->which == Z_APDU_searchRequest)
527             {
528                 Z_APDU *f_apdu = odr.create_searchResponse(frontend_apdu, 
529                                                            0, 0);
530                 Z_SearchResponse *f_resp = f_apdu->u.searchResponse;
531                 f_resp->records = z_records_diag;
532                 frontend_package.response() = f_apdu;
533                 return false;
534             }
535             if (frontend_apdu->which == Z_APDU_presentRequest)
536             {
537                 Z_APDU *f_apdu = odr.create_presentResponse(frontend_apdu, 
538                                                             0, 0);
539                 Z_PresentResponse *f_resp = f_apdu->u.presentResponse;
540                 f_resp->records = z_records_diag;
541                 frontend_package.response() = f_apdu;
542                 return false;
543             }
544         }
545         m_result_set_size = *b_resp->resultCount;
546         return true;
547     }
548     Z_APDU *f_apdu = 0;
549     if (frontend_apdu->which == Z_APDU_searchRequest)
550         f_apdu = odr.create_searchResponse(
551             frontend_apdu, YAZ_BIB1_TEMPORARY_SYSTEM_ERROR, 0);
552     else if (frontend_apdu->which == Z_APDU_presentRequest)
553         f_apdu = odr.create_presentResponse(
554             frontend_apdu, YAZ_BIB1_TEMPORARY_SYSTEM_ERROR, 0);
555     else
556         f_apdu = odr.create_close(
557             frontend_apdu, YAZ_BIB1_TEMPORARY_SYSTEM_ERROR, 0);
558     frontend_package.response() = f_apdu;
559     return false;
560 }
561
562 void yf::SessionShared::Frontend::override_set(
563     BackendInstancePtr &found_backend,
564     std::string &result_set_id)
565 {
566     BackendClassPtr bc = m_backend_class;
567     BackendInstanceList::const_iterator it = bc->m_backend_list.begin();
568     time_t now;
569     time(&now);
570     
571     for (; it != bc->m_backend_list.end(); it++)
572     {
573         if (!(*it)->m_in_use)
574         {
575             BackendSetList::iterator set_it = (*it)->m_sets.begin();
576             for (; set_it != (*it)->m_sets.end(); set_it++)
577             {
578                 if (now >= (*set_it)->m_time_last_use &&
579                     now - (*set_it)->m_time_last_use > bc->m_backend_set_ttl)
580                 {
581                     found_backend = *it;
582                     result_set_id = (*set_it)->m_result_set_id;
583                     found_backend->m_sets.erase(set_it);
584                     return;
585                 }
586             }
587         }
588     }
589     size_t max_sets = bc->m_named_result_sets ? bc->m_backend_set_max : 1;
590     for (it = bc->m_backend_list.begin(); it != bc->m_backend_list.end(); it++)
591     {
592         if (!(*it)->m_in_use && (*it)->m_sets.size() < max_sets)
593         {
594             found_backend = *it;
595             if (bc->m_named_result_sets)
596             {
597                 result_set_id = boost::io::str(
598                     boost::format("%1%") % 
599                     found_backend->m_result_set_sequence);
600                 found_backend->m_result_set_sequence++;
601             }
602             else
603                 result_set_id = "default";
604             return;
605         }
606     }
607 }
608
609 void yf::SessionShared::Frontend::get_set(mp::Package &package,
610                                           const Z_APDU *apdu_req,
611                                           const Databases &databases,
612                                           yazpp_1::Yaz_Z_Query &query,
613                                           BackendInstancePtr &found_backend,
614                                           BackendSetPtr &found_set)
615 {
616     std::string result_set_id;
617     BackendClassPtr bc = m_backend_class;
618     {
619         boost::mutex::scoped_lock lock(bc->m_mutex_backend_class);
620      
621         // look at each backend and see if we have a similar search
622         BackendInstanceList::const_iterator it = bc->m_backend_list.begin();
623         
624         for (; it != bc->m_backend_list.end(); it++)
625         {
626             if (!(*it)->m_in_use)
627             {
628                 BackendSetList::const_iterator set_it = (*it)->m_sets.begin();
629                 for (; set_it != (*it)->m_sets.end(); set_it++)
630                 {
631                     if ((*set_it)->m_databases == databases 
632                         && query.match(&(*set_it)->m_query))
633                     {
634                         found_set = *set_it;
635                         found_backend = *it;
636                         bc->use_backend(found_backend);
637                         found_set->timestamp();
638                         // found matching set. No need to search again
639                         return;
640                     }
641                 }
642             }
643         }
644         override_set(found_backend, result_set_id);
645         if (found_backend)
646             bc->use_backend(found_backend);
647     }
648     if (!found_backend)
649     {
650         // create a new backend set (and new set)
651         found_backend = bc->create_backend(package);
652
653         if (!found_backend)
654         {
655             Z_APDU *f_apdu = 0;
656             mp::odr odr;
657             if (apdu_req->which == Z_APDU_searchRequest)
658             {
659                 f_apdu = odr.create_searchResponse(
660                         apdu_req, YAZ_BIB1_TEMPORARY_SYSTEM_ERROR, 0);
661             }
662             else if (apdu_req->which == Z_APDU_presentRequest)
663             {
664                 f_apdu = odr.create_presentResponse(
665                     apdu_req, YAZ_BIB1_TEMPORARY_SYSTEM_ERROR, 0);
666             }
667             else
668             {
669                 f_apdu = odr.create_close(
670                     apdu_req, YAZ_BIB1_TEMPORARY_SYSTEM_ERROR, 0);
671             }
672             package.response() = f_apdu;
673             return;
674         }
675         if (bc->m_named_result_sets)
676         {
677             result_set_id = boost::io::str(
678                 boost::format("%1%") % found_backend->m_result_set_sequence);
679             found_backend->m_result_set_sequence++;
680         }
681         else
682             result_set_id = "default";
683     }
684     // we must search ...
685     BackendSetPtr new_set(new BackendSet(result_set_id,
686                                          databases, query));
687     if (!new_set->search(package, apdu_req, found_backend))
688     {
689         bc->remove_backend(found_backend);
690         return; // search error 
691     }
692     found_set = new_set;
693     found_set->timestamp();
694     found_backend->m_sets.push_back(found_set);
695 }
696
697 void yf::SessionShared::Frontend::search(mp::Package &package,
698                                          Z_APDU *apdu_req)
699 {
700     Z_SearchRequest *req = apdu_req->u.searchRequest;
701     FrontendSets::iterator fset_it = 
702         m_frontend_sets.find(req->resultSetName);
703     if (fset_it != m_frontend_sets.end())
704     {
705         // result set already exist 
706         // if replace indicator is off: we return diagnostic if
707         // result set already exist.
708         if (*req->replaceIndicator == 0)
709         {
710             mp::odr odr;
711             Z_APDU *apdu = 
712                 odr.create_searchResponse(
713                     apdu_req,
714                     YAZ_BIB1_RESULT_SET_EXISTS_AND_REPLACE_INDICATOR_OFF,
715                     0);
716             package.response() = apdu;
717             
718             return;
719         }
720         m_frontend_sets.erase(fset_it);
721     }
722     
723     yazpp_1::Yaz_Z_Query query;
724     query.set_Z_Query(req->query);
725     Databases databases;
726     int i;
727     for (i = 0; i<req->num_databaseNames; i++)
728         databases.push_back(req->databaseNames[i]);
729
730     BackendSetPtr found_set; // null
731     BackendInstancePtr found_backend; // null
732
733     get_set(package, apdu_req, databases, query, found_backend, found_set);
734     if (!found_set)
735         return;
736
737     mp::odr odr;
738     Z_APDU *f_apdu = odr.create_searchResponse(apdu_req, 0, 0);
739     Z_SearchResponse *f_resp = f_apdu->u.searchResponse;
740     *f_resp->resultCount = found_set->m_result_set_size;
741     package.response() = f_apdu;
742
743     FrontendSetPtr fset(new FrontendSet(databases, query));
744     m_frontend_sets[req->resultSetName] = fset;
745
746     m_backend_class->release_backend(found_backend);
747 }
748
749 void yf::SessionShared::Frontend::present(mp::Package &package,
750                                           Z_APDU *apdu_req)
751 {
752     mp::odr odr;
753     Z_PresentRequest *req = apdu_req->u.presentRequest;
754
755     FrontendSets::iterator fset_it = 
756         m_frontend_sets.find(req->resultSetId);
757
758     if (fset_it == m_frontend_sets.end())
759     {
760         Z_APDU *apdu = 
761             odr.create_presentResponse(
762                 apdu_req,
763                 YAZ_BIB1_SPECIFIED_RESULT_SET_DOES_NOT_EXIST,
764                 req->resultSetId);
765         package.response() = apdu;
766         return;
767     }
768     FrontendSetPtr fset = fset_it->second;
769
770     Databases databases = fset->get_databases();
771     yazpp_1::Yaz_Z_Query query = fset->get_query();
772
773     BackendClassPtr bc = m_backend_class;
774     BackendSetPtr found_set; // null
775     BackendInstancePtr found_backend;
776
777     get_set(package, apdu_req, databases, query, found_backend, found_set);
778     if (!found_set)
779         return;
780
781     Z_APDU *p_apdu = zget_APDU(odr, Z_APDU_presentRequest);
782     Z_PresentRequest *p_req = p_apdu->u.presentRequest;
783     p_req->preferredRecordSyntax = req->preferredRecordSyntax;
784     p_req->resultSetId = odr_strdup(odr, found_set->m_result_set_id.c_str());
785     *p_req->resultSetStartPoint = *req->resultSetStartPoint;
786     *p_req->numberOfRecordsRequested = *req->numberOfRecordsRequested;
787     p_req->preferredRecordSyntax = req->preferredRecordSyntax;
788     p_req->recordComposition = req->recordComposition;
789
790     Package present_package(found_backend->m_session, package.origin());
791     present_package.copy_filter(package);
792
793     present_package.request() = p_apdu;
794
795     present_package.move();
796
797     Z_GDU *gdu = present_package.response().get();
798     if (!present_package.session().is_closed()
799         && gdu && gdu->which == Z_GDU_Z3950 
800         && gdu->u.z3950->which == Z_APDU_presentResponse)
801     {
802         Z_PresentResponse *b_resp = gdu->u.z3950->u.presentResponse;
803         Z_APDU *f_apdu_res = odr.create_presentResponse(apdu_req, 0, 0);
804         Z_PresentResponse *f_resp = f_apdu_res->u.presentResponse;
805
806         f_resp->numberOfRecordsReturned = b_resp->numberOfRecordsReturned;
807         f_resp->nextResultSetPosition = b_resp->nextResultSetPosition;
808         f_resp->presentStatus= b_resp->presentStatus;
809         f_resp->records = b_resp->records;
810         f_resp->otherInfo = b_resp->otherInfo;
811         package.response() = f_apdu_res;
812         bc->release_backend(found_backend);
813     }
814     else
815     {
816         bc->remove_backend(found_backend);
817         Z_APDU *f_apdu_res = 
818             odr.create_presentResponse(
819                 apdu_req, YAZ_BIB1_TEMPORARY_SYSTEM_ERROR, 0);
820         package.response() = f_apdu_res;
821     }
822 }
823
824 void yf::SessionShared::Frontend::scan(mp::Package &frontend_package,
825                                        Z_APDU *apdu_req)
826 {
827     BackendClassPtr bc = m_backend_class;
828     BackendInstancePtr backend = bc->get_backend(frontend_package);
829     if (!backend)
830     {
831         mp::odr odr;
832         Z_APDU *apdu = odr.create_scanResponse(
833             apdu_req, YAZ_BIB1_TEMPORARY_SYSTEM_ERROR, 0);
834         frontend_package.response() = apdu;
835     }
836     else
837     {
838         Package scan_package(backend->m_session, frontend_package.origin());
839         scan_package.copy_filter(frontend_package);
840         scan_package.request() = apdu_req;
841         scan_package.move();
842         frontend_package.response() = scan_package.response();
843         if (scan_package.session().is_closed())
844         {
845             frontend_package.session().close();
846             bc->remove_backend(backend);
847         }
848         else
849             bc->release_backend(backend);
850     }
851 }
852
853 yf::SessionShared::Worker::Worker(SessionShared::Rep *rep) : m_p(rep)
854 {
855 }
856
857 void yf::SessionShared::Worker::operator() (void)
858 {
859     m_p->expire();
860 }
861
862 void yf::SessionShared::BackendClass::expire_class()
863 {
864     time_t now;
865     time(&now);
866     boost::mutex::scoped_lock lock(m_mutex_backend_class);
867     BackendInstanceList::iterator bit = m_backend_list.begin();
868     while (bit != m_backend_list.end())
869     {
870         time_t last_use = (*bit)->m_time_last_use;
871         
872         if ((*bit)->m_in_use)
873         {
874             bit++;
875         }
876         else if ((now >= last_use && now - last_use > m_backend_expiry_ttl)
877             || (now < last_use))
878         {
879             mp::odr odr;
880             (*bit)->m_close_package->response() = odr.create_close(
881                 0, Z_Close_lackOfActivity, 0);
882             (*bit)->m_close_package->session().close();
883             (*bit)->m_close_package->move();
884
885             bit = m_backend_list.erase(bit);
886         }
887         else
888         {
889             bit++;
890         }
891     }
892 }
893
894 void yf::SessionShared::Rep::expire()
895 {
896     while (true)
897     {
898         boost::xtime xt;
899         boost::xtime_get(&xt, boost::TIME_UTC);
900         xt.sec += 30;
901         boost::thread::sleep(xt);
902         
903         BackendClassMap::const_iterator b_it = m_backend_map.begin();
904         for (; b_it != m_backend_map.end(); b_it++)
905             b_it->second->expire_class();
906     }
907 }
908
909 yf::SessionShared::Rep::Rep()
910 {
911     m_resultset_ttl = 30;
912     m_resultset_max = 10;
913     m_session_ttl = 90;
914     yf::SessionShared::Worker w(this);
915     m_thrds.add_thread(new boost::thread(w));
916 }
917
918 yf::SessionShared::SessionShared() : m_p(new SessionShared::Rep)
919 {
920 }
921
922 yf::SessionShared::~SessionShared() {
923 }
924
925
926 yf::SessionShared::Frontend::Frontend(Rep *rep) : m_is_virtual(false), m_p(rep)
927 {
928 }
929
930 yf::SessionShared::Frontend::~Frontend()
931 {
932 }
933
934 yf::SessionShared::FrontendPtr yf::SessionShared::Rep::get_frontend(mp::Package &package)
935 {
936     boost::mutex::scoped_lock lock(m_mutex);
937
938     std::map<mp::Session,yf::SessionShared::FrontendPtr>::iterator it;
939     
940     while(true)
941     {
942         it = m_clients.find(package.session());
943         if (it == m_clients.end())
944             break;
945         
946         if (!it->second->m_in_use)
947         {
948             it->second->m_in_use = true;
949             return it->second;
950         }
951         m_cond_session_ready.wait(lock);
952     }
953     FrontendPtr f(new Frontend(this));
954     m_clients[package.session()] = f;
955     f->m_in_use = true;
956     return f;
957 }
958
959 void yf::SessionShared::Rep::release_frontend(mp::Package &package)
960 {
961     boost::mutex::scoped_lock lock(m_mutex);
962     std::map<mp::Session,yf::SessionShared::FrontendPtr>::iterator it;
963     
964     it = m_clients.find(package.session());
965     if (it != m_clients.end())
966     {
967         if (package.session().is_closed())
968         {
969             m_clients.erase(it);
970         }
971         else
972         {
973             it->second->m_in_use = false;
974         }
975         m_cond_session_ready.notify_all();
976     }
977 }
978
979
980 void yf::SessionShared::process(mp::Package &package) const
981 {
982     FrontendPtr f = m_p->get_frontend(package);
983
984     Z_GDU *gdu = package.request().get();
985     
986     if (gdu && gdu->which == Z_GDU_Z3950 && gdu->u.z3950->which ==
987         Z_APDU_initRequest && !f->m_is_virtual)
988     {
989         m_p->init(package, gdu, f);
990     }
991     else if (!f->m_is_virtual)
992         package.move();
993     else if (gdu && gdu->which == Z_GDU_Z3950)
994     {
995         Z_APDU *apdu = gdu->u.z3950;
996         if (apdu->which == Z_APDU_initRequest)
997         {
998             mp::odr odr;
999             
1000             package.response() = odr.create_close(
1001                 apdu,
1002                 Z_Close_protocolError,
1003                 "double init");
1004             
1005             package.session().close();
1006         }
1007         else if (apdu->which == Z_APDU_close)
1008         {
1009             mp::odr odr;
1010             
1011             package.response() = odr.create_close(
1012                 apdu,
1013                 Z_Close_peerAbort, "received close from client");
1014             package.session().close();
1015         }
1016         else if (apdu->which == Z_APDU_searchRequest)
1017         {
1018             f->search(package, apdu);
1019         }
1020         else if (apdu->which == Z_APDU_presentRequest)
1021         {
1022             f->present(package, apdu);
1023         }
1024         else if (apdu->which == Z_APDU_scanRequest)
1025         {
1026             f->scan(package, apdu);
1027         }
1028         else
1029         {
1030             mp::odr odr;
1031             
1032             package.response() = odr.create_close(
1033                 apdu, Z_Close_protocolError,
1034                 "unsupported APDU in filter_session_shared");
1035             
1036             package.session().close();
1037         }
1038     }
1039     m_p->release_frontend(package);
1040 }
1041
1042 void yf::SessionShared::configure(const xmlNode *ptr, bool test_only)
1043 {
1044     for (ptr = ptr->children; ptr; ptr = ptr->next)
1045     {
1046         if (ptr->type != XML_ELEMENT_NODE)
1047             continue;
1048         if (!strcmp((const char *) ptr->name, "resultset"))
1049         {
1050             const struct _xmlAttr *attr;
1051             for (attr = ptr->properties; attr; attr = attr->next)
1052             {
1053                 if (!strcmp((const char *) attr->name, "ttl"))
1054                     m_p->m_resultset_ttl = 
1055                         mp::xml::get_int(attr->children, 30);
1056                 else if (!strcmp((const char *) attr->name, "max"))
1057                 {
1058                     m_p->m_resultset_max = 
1059                         mp::xml::get_int(attr->children, 10);
1060                 }
1061                 else
1062                     throw mp::filter::FilterException(
1063                         "Bad attribute " + std::string((const char *)
1064                                                        attr->name));
1065             }
1066         }
1067         else if (!strcmp((const char *) ptr->name, "session"))
1068         {
1069             const struct _xmlAttr *attr;
1070             for (attr = ptr->properties; attr; attr = attr->next)
1071             {
1072                 if (!strcmp((const char *) attr->name, "ttl"))
1073                     m_p->m_session_ttl = 
1074                         mp::xml::get_int(attr->children, 120);
1075                 else
1076                     throw mp::filter::FilterException(
1077                         "Bad attribute " + std::string((const char *)
1078                                                        attr->name));
1079             }
1080         }
1081         else
1082         {
1083             throw mp::filter::FilterException("Bad element " 
1084                                                + std::string((const char *)
1085                                                              ptr->name));
1086         }
1087     }
1088 }
1089
1090 static mp::filter::Base* filter_creator()
1091 {
1092     return new mp::filter::SessionShared;
1093 }
1094
1095 extern "C" {
1096     struct metaproxy_1_filter_struct metaproxy_1_filter_session_shared = {
1097         0,
1098         "session_shared",
1099         filter_creator
1100     };
1101 }
1102
1103 /*
1104  * Local variables:
1105  * c-basic-offset: 4
1106  * indent-tabs-mode: nil
1107  * c-file-style: "stroustrup"
1108  * End:
1109  * vim: shiftwidth=4 tabstop=8 expandtab
1110  */