GPL v2.
[metaproxy-moved-to-github.git] / src / filter_session_shared.cpp
1 /* $Id: filter_session_shared.cpp,v 1.18 2007-05-09 21:23:09 adam Exp $
2    Copyright (c) 2005-2007, Index Data.
3
4 This file is part of Metaproxy.
5
6 Metaproxy is free software; you can redistribute it and/or modify it under
7 the terms of the GNU General Public License as published by the Free
8 Software Foundation; either version 2, or (at your option) any later
9 version.
10
11 Metaproxy is distributed in the hope that it will be useful, but WITHOUT ANY
12 WARRANTY; without even the implied warranty of MERCHANTABILITY or
13 FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
14 for more details.
15
16 You should have received a copy of the GNU General Public License
17 along with Metaproxy; see the file LICENSE.  If not, write to the
18 Free Software Foundation, 59 Temple Place - Suite 330, Boston, MA
19 02111-1307, USA.
20  */
21
22 #include "config.hpp"
23
24 #include "filter.hpp"
25 #include "package.hpp"
26
27 #include <boost/thread/mutex.hpp>
28 #include <boost/thread/condition.hpp>
29 #include <boost/thread/thread.hpp>
30 #include <boost/thread/xtime.hpp>
31 #include <boost/shared_ptr.hpp>
32 #include <boost/format.hpp>
33
34 #include "util.hpp"
35 #include "filter_session_shared.hpp"
36
37 #include <yaz/log.h>
38 #include <yaz/zgdu.h>
39 #include <yaz/otherinfo.h>
40 #include <yaz/diagbib1.h>
41 #include <yazpp/z-query.h>
42 #include <map>
43 #include <iostream>
44 #include <time.h>
45
46 namespace mp = metaproxy_1;
47 namespace yf = metaproxy_1::filter;
48
49 namespace metaproxy_1 {
50
51     namespace filter {
52         class SessionShared::InitKey {
53         public:
54             bool operator < (const SessionShared::InitKey &k) const;
55             InitKey(Z_InitRequest *req);
56             InitKey(const InitKey &);
57             ~InitKey();
58         private:
59             InitKey &operator = (const InitKey &k);
60             char *m_idAuthentication_buf;
61             int m_idAuthentication_size;
62             char *m_otherInfo_buf;
63             int m_otherInfo_size;
64             ODR m_odr;
65         };
66         class SessionShared::Worker {
67         public:
68             Worker(SessionShared::Rep *rep);
69             void operator() (void);
70         private:
71             SessionShared::Rep *m_p;
72         };
73         class SessionShared::BackendSet {
74         public:
75             std::string m_result_set_id;
76             Databases m_databases;
77             int m_result_set_size;
78             yazpp_1::Yaz_Z_Query m_query;
79             time_t m_time_last_use;
80             void timestamp();
81             BackendSet(
82                 const std::string &result_set_id,
83                 const Databases &databases,
84                 const yazpp_1::Yaz_Z_Query &query);
85             bool search(
86                 Package &frontend_package,
87                 const Z_APDU *apdu_req,
88                 const BackendInstancePtr bp);
89         };
90         class SessionShared::BackendInstance {
91             friend class Rep;
92             friend class BackendClass;
93             friend class BackendSet;
94         public:
95             mp::Session m_session;
96             BackendSetList m_sets;
97             bool m_in_use;
98             int m_sequence_this;
99             int m_result_set_sequence;
100             time_t m_time_last_use;
101             mp::Package * m_close_package;
102             ~BackendInstance();
103         };
104         class SessionShared::BackendClass : boost::noncopyable {
105             friend class Rep;
106             friend struct Frontend;
107             bool m_named_result_sets;
108             BackendInstanceList m_backend_list;
109             BackendInstancePtr create_backend(const Package &package);
110             void remove_backend(BackendInstancePtr b);
111             BackendInstancePtr get_backend(const Package &package);
112             void use_backend(BackendInstancePtr b);
113             void release_backend(BackendInstancePtr b);
114             void expire();
115             yazpp_1::GDU m_init_request;
116             yazpp_1::GDU m_init_response;
117             boost::mutex m_mutex_backend_class;
118             int m_sequence_top;
119             time_t m_backend_set_ttl;
120             time_t m_backend_expiry_ttl;
121             size_t m_backend_set_max;
122         public:
123             BackendClass(const yazpp_1::GDU &init_request,
124                          int resultset_ttl,
125                          int resultset_max,
126                          int session_ttl);
127             ~BackendClass();
128         };
129         class SessionShared::FrontendSet {
130             Databases m_databases;
131             yazpp_1::Yaz_Z_Query m_query;
132         public:
133             const Databases &get_databases();
134             const yazpp_1::Yaz_Z_Query &get_query();
135             FrontendSet(
136                 const Databases &databases,
137                 const yazpp_1::Yaz_Z_Query &query);
138             FrontendSet();
139         };
140         struct SessionShared::Frontend {
141             Frontend(Rep *rep);
142             ~Frontend();
143             bool m_is_virtual;
144             bool m_in_use;
145             Z_Options m_init_options;
146             void search(Package &package, Z_APDU *apdu);
147             void present(Package &package, Z_APDU *apdu);
148             void scan(Package &package, Z_APDU *apdu);
149
150             void get_set(mp::Package &package,
151                          const Z_APDU *apdu_req,
152                          const Databases &databases,
153                          yazpp_1::Yaz_Z_Query &query,
154                          BackendInstancePtr &found_backend,
155                          BackendSetPtr &found_set);
156             void override_set(BackendInstancePtr &found_backend,
157                               std::string &result_set_id);
158
159             Rep *m_p;
160             BackendClassPtr m_backend_class;
161             FrontendSets m_frontend_sets;
162         };            
163         class SessionShared::Rep {
164             friend class SessionShared;
165             friend struct Frontend;
166             
167             FrontendPtr get_frontend(Package &package);
168             void release_frontend(Package &package);
169             Rep();
170         public:
171             void expire();
172         private:
173             void init(Package &package, const Z_GDU *gdu,
174                       FrontendPtr frontend);
175             boost::mutex m_mutex;
176             boost::condition m_cond_session_ready;
177             std::map<mp::Session, FrontendPtr> m_clients;
178
179             BackendClassMap m_backend_map;
180             boost::mutex m_mutex_backend_map;
181             boost::thread_group m_thrds;
182             int m_resultset_ttl;
183             int m_resultset_max;
184             int m_session_ttl;
185         };
186     }
187 }
188
189 yf::SessionShared::FrontendSet::FrontendSet(
190     const Databases &databases,
191     const yazpp_1::Yaz_Z_Query &query)
192     : m_databases(databases), m_query(query)
193 {
194 }
195
196 const yf::SessionShared::Databases & 
197 yf::SessionShared::FrontendSet::get_databases()
198 {
199     return m_databases;
200 }
201
202 const yazpp_1::Yaz_Z_Query& yf::SessionShared::FrontendSet::get_query()
203 {
204     return m_query;
205 }
206
207 yf::SessionShared::InitKey::InitKey(const InitKey &k)
208 {
209     m_odr = odr_createmem(ODR_ENCODE);
210     
211     m_idAuthentication_size =  k.m_idAuthentication_size;
212     m_idAuthentication_buf = (char*)odr_malloc(m_odr, m_idAuthentication_size);
213     memcpy(m_idAuthentication_buf, k.m_idAuthentication_buf,
214            m_idAuthentication_size);
215
216     m_otherInfo_size =  k.m_otherInfo_size;
217     m_otherInfo_buf = (char*)odr_malloc(m_odr, m_otherInfo_size);
218     memcpy(m_otherInfo_buf, k.m_otherInfo_buf,
219            m_otherInfo_size);
220 }
221
222 yf::SessionShared::InitKey::InitKey(Z_InitRequest *req)
223 {
224     m_odr = odr_createmem(ODR_ENCODE);
225
226     Z_IdAuthentication *t = req->idAuthentication;
227     z_IdAuthentication(m_odr, &t, 1, 0);
228     m_idAuthentication_buf =
229         odr_getbuf(m_odr, &m_idAuthentication_size, 0);
230
231     Z_OtherInformation *o = req->otherInfo;
232     z_OtherInformation(m_odr, &o, 1, 0);
233     m_otherInfo_buf = odr_getbuf(m_odr, &m_otherInfo_size, 0);
234 }
235
236 yf::SessionShared::InitKey::~InitKey()
237 {
238     odr_destroy(m_odr);
239 }
240
241 bool yf::SessionShared::InitKey::operator < (const SessionShared::InitKey &k)
242     const 
243 {
244     int c;
245     c = mp::util::memcmp2(
246         (void*) m_idAuthentication_buf, m_idAuthentication_size,
247         (void*) k.m_idAuthentication_buf, k.m_idAuthentication_size);
248     if (c < 0)
249         return true;
250     else if (c > 0)
251         return false;
252
253     c = mp::util::memcmp2((void*) m_otherInfo_buf, m_otherInfo_size,
254                           (void*) k.m_otherInfo_buf, k.m_otherInfo_size);
255     if (c < 0)
256         return true;
257     else if (c > 0)
258         return false;
259     return false;
260 }
261
262 void yf::SessionShared::BackendClass::release_backend(BackendInstancePtr b)
263 {
264     boost::mutex::scoped_lock lock(m_mutex_backend_class);
265     b->m_in_use = false;
266 }
267
268
269 void yf::SessionShared::BackendClass::remove_backend(BackendInstancePtr b)
270 {
271     BackendInstanceList::iterator it = m_backend_list.begin();
272     
273     while (it != m_backend_list.end())
274     {
275         if (*it == b)
276             it = m_backend_list.erase(it);
277         else
278             it++;
279     }
280 }
281
282
283
284 yf::SessionShared::BackendInstancePtr 
285 yf::SessionShared::BackendClass::get_backend(
286     const mp::Package &frontend_package)
287 {
288     {
289         boost::mutex::scoped_lock lock(m_mutex_backend_class);
290         
291         BackendInstanceList::const_iterator it = m_backend_list.begin();
292         
293         BackendInstancePtr backend1; // null
294         
295         for (; it != m_backend_list.end(); it++)
296         {
297             if (!(*it)->m_in_use)
298             {
299                 if (!backend1 
300                     || (*it)->m_sequence_this < backend1->m_sequence_this)
301                     backend1 = *it;
302             }
303         }
304         if (backend1)
305         {
306             use_backend(backend1);
307             return backend1;
308         }
309     }
310     return create_backend(frontend_package);
311 }
312
313 void yf::SessionShared::BackendClass::use_backend(BackendInstancePtr backend)
314 {
315     backend->m_in_use = true;
316     time(&backend->m_time_last_use);
317     backend->m_sequence_this = m_sequence_top++;
318 }
319
320 yf::SessionShared::BackendInstance::~BackendInstance()
321 {
322     delete m_close_package;
323 }
324
325 yf::SessionShared::BackendInstancePtr yf::SessionShared::BackendClass::create_backend(
326     const mp::Package &frontend_package)
327 {
328     BackendInstancePtr bp(new BackendInstance);
329     BackendInstancePtr null;
330
331     bp->m_close_package =
332         new mp::Package(bp->m_session, frontend_package.origin());
333     bp->m_close_package->copy_filter(frontend_package);
334
335     Package init_package(bp->m_session, frontend_package.origin());
336
337     init_package.copy_filter(frontend_package);
338
339     yazpp_1::GDU actual_init_request = m_init_request;
340     Z_GDU *init_pdu = actual_init_request.get();
341
342     assert(init_pdu->which == Z_GDU_Z3950);
343     assert(init_pdu->u.z3950->which == Z_APDU_initRequest);
344
345     Z_InitRequest *req = init_pdu->u.z3950->u.initRequest;
346     ODR_MASK_ZERO(req->options);
347
348     ODR_MASK_SET(req->options, Z_Options_search);
349     ODR_MASK_SET(req->options, Z_Options_present);
350     ODR_MASK_SET(req->options, Z_Options_namedResultSets);
351     ODR_MASK_SET(req->options, Z_Options_scan);
352
353     ODR_MASK_SET(req->protocolVersion, Z_ProtocolVersion_1);
354     ODR_MASK_SET(req->protocolVersion, Z_ProtocolVersion_2);
355     ODR_MASK_SET(req->protocolVersion, Z_ProtocolVersion_3);
356
357     init_package.request() = init_pdu;
358
359     init_package.move();
360
361     boost::mutex::scoped_lock lock(m_mutex_backend_class);
362
363     m_named_result_sets = false;
364     Z_GDU *gdu = init_package.response().get();
365     if (!init_package.session().is_closed()
366         && gdu && gdu->which == Z_GDU_Z3950 
367         && gdu->u.z3950->which == Z_APDU_initResponse)
368     {
369         Z_InitResponse *res = gdu->u.z3950->u.initResponse;
370         if (!*res->result)
371             return null;
372         m_init_response = gdu->u.z3950;
373         if (ODR_MASK_GET(res->options, Z_Options_namedResultSets))
374         {
375             m_named_result_sets = true;
376         }
377     }
378     else
379     {
380         // did not receive an init response or closed
381         return null;
382     }
383     bp->m_in_use = true;
384     time(&bp->m_time_last_use);
385     bp->m_sequence_this = 0;
386     bp->m_result_set_sequence = 0;
387     m_backend_list.push_back(bp);
388
389     return bp;
390 }
391
392
393 yf::SessionShared::BackendClass::BackendClass(const yazpp_1::GDU &init_request,
394                                               int resultset_ttl,
395                                               int resultset_max,
396                                               int session_ttl)
397     : m_named_result_sets(false), m_init_request(init_request),
398       m_sequence_top(0), m_backend_set_ttl(resultset_ttl),
399       m_backend_expiry_ttl(session_ttl), m_backend_set_max(resultset_max)
400 {}
401
402 yf::SessionShared::BackendClass::~BackendClass()
403 {}
404
405 void yf::SessionShared::Rep::init(mp::Package &package, const Z_GDU *gdu,
406                                   FrontendPtr frontend)
407 {
408     Z_InitRequest *req = gdu->u.z3950->u.initRequest;
409
410     frontend->m_is_virtual = true;
411     frontend->m_init_options = *req->options;
412     InitKey k(req);
413     {
414         boost::mutex::scoped_lock lock(m_mutex_backend_map);
415         BackendClassMap::const_iterator it;
416         it = m_backend_map.find(k);
417         if (it == m_backend_map.end())
418         {
419             BackendClassPtr b(new BackendClass(gdu->u.z3950,
420                                                m_resultset_ttl,
421                                                m_resultset_max,
422                                                m_session_ttl));
423             m_backend_map[k] = b;
424             frontend->m_backend_class = b;
425             std::cout << "SessionShared::Rep::init new session " 
426                       << frontend->m_backend_class << "\n";
427         }
428         else
429         {
430             frontend->m_backend_class = it->second;            
431             std::cout << "SessionShared::Rep::init existing session "
432                       << frontend->m_backend_class << "\n";
433         }
434     }
435     BackendClassPtr bc = frontend->m_backend_class;
436     BackendInstancePtr backend = bc->get_backend(package);
437     
438     mp::odr odr;
439     if (!backend)
440     {
441         Z_APDU *apdu = odr.create_initResponse(gdu->u.z3950, 0, 0);
442         *apdu->u.initResponse->result = 0;
443         package.response() = apdu;
444         package.session().close();
445     }
446     else
447     {
448         boost::mutex::scoped_lock lock(bc->m_mutex_backend_class);
449         yazpp_1::GDU init_response = bc->m_init_response;
450         Z_GDU *response_gdu = init_response.get();
451         mp::util::transfer_referenceId(odr, gdu->u.z3950,
452                                        response_gdu->u.z3950);
453
454         Z_Options *server_options =
455             response_gdu->u.z3950->u.initResponse->options;
456         Z_Options *client_options = &frontend->m_init_options;
457
458         int i;
459         for (i = 0; i<30; i++)
460             if (!ODR_MASK_GET(client_options, i))
461                 ODR_MASK_CLEAR(server_options, i);
462         package.response() = init_response;
463     }
464     if (backend)
465         bc->release_backend(backend);
466 }
467
468 void yf::SessionShared::BackendSet::timestamp()
469 {
470     time(&m_time_last_use);
471 }
472
473 yf::SessionShared::BackendSet::BackendSet(
474     const std::string &result_set_id,
475     const Databases &databases,
476     const yazpp_1::Yaz_Z_Query &query) :
477     m_result_set_id(result_set_id),
478     m_databases(databases), m_result_set_size(0), m_query(query) 
479 {
480     timestamp();
481 }
482
483 bool yf::SessionShared::BackendSet::search(
484     mp::Package &frontend_package,
485     const Z_APDU *frontend_apdu,
486     const BackendInstancePtr bp)
487 {
488     Package search_package(bp->m_session, frontend_package.origin());
489
490     search_package.copy_filter(frontend_package);
491
492     mp::odr odr;
493     Z_APDU *apdu_req = zget_APDU(odr, Z_APDU_searchRequest);
494     Z_SearchRequest *req = apdu_req->u.searchRequest;
495
496     req->resultSetName = odr_strdup(odr, m_result_set_id.c_str());
497     req->query = m_query.get_Z_Query();
498
499     req->num_databaseNames = m_databases.size();
500     req->databaseNames = (char**) 
501         odr_malloc(odr, req->num_databaseNames * sizeof(char *));
502     Databases::const_iterator it = m_databases.begin();
503     size_t i = 0;
504     for (; it != m_databases.end(); it++)
505         req->databaseNames[i++] = odr_strdup(odr, it->c_str());
506
507     search_package.request() = apdu_req;
508
509     search_package.move();
510     
511     Z_Records *z_records_diag = 0;
512     Z_GDU *gdu = search_package.response().get();
513     if (!search_package.session().is_closed()
514         && gdu && gdu->which == Z_GDU_Z3950 
515         && gdu->u.z3950->which == Z_APDU_searchResponse)
516     {
517         Z_SearchResponse *b_resp = gdu->u.z3950->u.searchResponse;
518         if (b_resp->records)
519         {
520             if (b_resp->records->which == Z_Records_NSD 
521                 || b_resp->records->which == Z_Records_multipleNSD)
522                 z_records_diag = b_resp->records;
523         }
524         if (z_records_diag)
525         {
526             if (frontend_apdu->which == Z_APDU_searchRequest)
527             {
528                 Z_APDU *f_apdu = odr.create_searchResponse(frontend_apdu, 
529                                                            0, 0);
530                 Z_SearchResponse *f_resp = f_apdu->u.searchResponse;
531                 f_resp->records = z_records_diag;
532                 frontend_package.response() = f_apdu;
533                 return false;
534             }
535             if (frontend_apdu->which == Z_APDU_presentRequest)
536             {
537                 Z_APDU *f_apdu = odr.create_presentResponse(frontend_apdu, 
538                                                             0, 0);
539                 Z_PresentResponse *f_resp = f_apdu->u.presentResponse;
540                 f_resp->records = z_records_diag;
541                 frontend_package.response() = f_apdu;
542                 return false;
543             }
544         }
545         m_result_set_size = *b_resp->resultCount;
546         return true;
547     }
548     Z_APDU *f_apdu = 0;
549     if (frontend_apdu->which == Z_APDU_searchRequest)
550         f_apdu = odr.create_searchResponse(
551             frontend_apdu, YAZ_BIB1_TEMPORARY_SYSTEM_ERROR, 0);
552     else if (frontend_apdu->which == Z_APDU_presentRequest)
553         f_apdu = odr.create_presentResponse(
554             frontend_apdu, YAZ_BIB1_TEMPORARY_SYSTEM_ERROR, 0);
555     else
556         f_apdu = odr.create_close(
557             frontend_apdu, YAZ_BIB1_TEMPORARY_SYSTEM_ERROR, 0);
558     frontend_package.response() = f_apdu;
559     return false;
560 }
561
562 void yf::SessionShared::Frontend::override_set(
563     BackendInstancePtr &found_backend,
564     std::string &result_set_id)
565 {
566     BackendClassPtr bc = m_backend_class;
567     BackendInstanceList::const_iterator it = bc->m_backend_list.begin();
568     time_t now;
569     time(&now);
570     
571     for (; it != bc->m_backend_list.end(); it++)
572     {
573         if (!(*it)->m_in_use)
574         {
575             BackendSetList::iterator set_it = (*it)->m_sets.begin();
576             for (; set_it != (*it)->m_sets.end(); set_it++)
577             {
578                 if (now >= (*set_it)->m_time_last_use &&
579                     now - (*set_it)->m_time_last_use > bc->m_backend_set_ttl)
580                 {
581                     found_backend = *it;
582                     result_set_id = (*set_it)->m_result_set_id;
583                     found_backend->m_sets.erase(set_it);
584                     std::cout << "REUSE TTL SET: " << result_set_id << "\n";
585                     return;
586                 }
587             }
588         }
589     }
590     size_t max_sets = bc->m_named_result_sets ? bc->m_backend_set_max : 1;
591     for (it = bc->m_backend_list.begin(); it != bc->m_backend_list.end(); it++)
592     {
593         if (!(*it)->m_in_use && (*it)->m_sets.size() < max_sets)
594         {
595             found_backend = *it;
596             if (bc->m_named_result_sets)
597             {
598                 result_set_id = boost::io::str(
599                     boost::format("%1%") % 
600                     found_backend->m_result_set_sequence);
601                 found_backend->m_result_set_sequence++;
602             }
603             else
604                 result_set_id = "default";
605             std::cout << "AVAILABLE SET: " << result_set_id << "\n";
606             return;
607         }
608     }
609 }
610
611 void yf::SessionShared::Frontend::get_set(mp::Package &package,
612                                           const Z_APDU *apdu_req,
613                                           const Databases &databases,
614                                           yazpp_1::Yaz_Z_Query &query,
615                                           BackendInstancePtr &found_backend,
616                                           BackendSetPtr &found_set)
617 {
618     std::string result_set_id;
619     BackendClassPtr bc = m_backend_class;
620     {
621         boost::mutex::scoped_lock lock(bc->m_mutex_backend_class);
622      
623         // look at each backend and see if we have a similar search
624         BackendInstanceList::const_iterator it = bc->m_backend_list.begin();
625         
626         for (; it != bc->m_backend_list.end(); it++)
627         {
628             if (!(*it)->m_in_use)
629             {
630                 BackendSetList::const_iterator set_it = (*it)->m_sets.begin();
631                 for (; set_it != (*it)->m_sets.end(); set_it++)
632                 {
633                     if ((*set_it)->m_databases == databases 
634                         && query.match(&(*set_it)->m_query))
635                     {
636                         found_set = *set_it;
637                         found_backend = *it;
638                         bc->use_backend(found_backend);
639                         found_set->timestamp();
640                         std::cout << "MATCH SET: " << 
641                             found_set->m_result_set_id << "\n";
642                         // found matching set. No need to search again
643                         return;
644                     }
645                 }
646             }
647         }
648         override_set(found_backend, result_set_id);
649         if (found_backend)
650             bc->use_backend(found_backend);
651     }
652     if (!found_backend)
653     {
654         // create a new backend set (and new set)
655         found_backend = bc->create_backend(package);
656
657         if (!found_backend)
658         {
659             Z_APDU *f_apdu = 0;
660             mp::odr odr;
661             if (apdu_req->which == Z_APDU_searchRequest)
662             {
663                 f_apdu = odr.create_searchResponse(
664                         apdu_req, YAZ_BIB1_TEMPORARY_SYSTEM_ERROR, 0);
665             }
666             else if (apdu_req->which == Z_APDU_presentRequest)
667             {
668                 f_apdu = odr.create_presentResponse(
669                     apdu_req, YAZ_BIB1_TEMPORARY_SYSTEM_ERROR, 0);
670             }
671             else
672             {
673                 f_apdu = odr.create_close(
674                     apdu_req, YAZ_BIB1_TEMPORARY_SYSTEM_ERROR, 0);
675             }
676             package.response() = f_apdu;
677             return;
678         }
679         std::cout << "NEW " << found_backend << "\n";
680         
681         if (bc->m_named_result_sets)
682         {
683             result_set_id = boost::io::str(
684                 boost::format("%1%") % found_backend->m_result_set_sequence);
685             found_backend->m_result_set_sequence++;
686         }
687         else
688             result_set_id = "default";
689         std::cout << "NEW SET: " << result_set_id << "\n";
690     }
691     // we must search ...
692     BackendSetPtr new_set(new BackendSet(result_set_id,
693                                          databases, query));
694     if (!new_set->search(package, apdu_req, found_backend))
695     {
696         std::cout << "search error\n";
697         bc->remove_backend(found_backend);
698         return; // search error 
699     }
700     found_set = new_set;
701     found_set->timestamp();
702     found_backend->m_sets.push_back(found_set);
703 }
704
705 void yf::SessionShared::Frontend::search(mp::Package &package,
706                                          Z_APDU *apdu_req)
707 {
708     Z_SearchRequest *req = apdu_req->u.searchRequest;
709     FrontendSets::iterator fset_it = 
710         m_frontend_sets.find(req->resultSetName);
711     if (fset_it != m_frontend_sets.end())
712     {
713         // result set already exist 
714         // if replace indicator is off: we return diagnostic if
715         // result set already exist.
716         if (*req->replaceIndicator == 0)
717         {
718             mp::odr odr;
719             Z_APDU *apdu = 
720                 odr.create_searchResponse(
721                     apdu_req,
722                     YAZ_BIB1_RESULT_SET_EXISTS_AND_REPLACE_INDICATOR_OFF,
723                     0);
724             package.response() = apdu;
725             
726             return;
727         }
728         m_frontend_sets.erase(fset_it);
729     }
730     
731     yazpp_1::Yaz_Z_Query query;
732     query.set_Z_Query(req->query);
733     Databases databases;
734     int i;
735     for (i = 0; i<req->num_databaseNames; i++)
736         databases.push_back(req->databaseNames[i]);
737
738     BackendSetPtr found_set; // null
739     BackendInstancePtr found_backend; // null
740
741     get_set(package, apdu_req, databases, query, found_backend, found_set);
742     if (!found_set)
743         return;
744
745     mp::odr odr;
746     Z_APDU *f_apdu = odr.create_searchResponse(apdu_req, 0, 0);
747     Z_SearchResponse *f_resp = f_apdu->u.searchResponse;
748     *f_resp->resultCount = found_set->m_result_set_size;
749     package.response() = f_apdu;
750
751     FrontendSetPtr fset(new FrontendSet(databases, query));
752     m_frontend_sets[req->resultSetName] = fset;
753
754     m_backend_class->release_backend(found_backend);
755 }
756
757 void yf::SessionShared::Frontend::present(mp::Package &package,
758                                           Z_APDU *apdu_req)
759 {
760     mp::odr odr;
761     Z_PresentRequest *req = apdu_req->u.presentRequest;
762
763     FrontendSets::iterator fset_it = 
764         m_frontend_sets.find(req->resultSetId);
765
766     if (fset_it == m_frontend_sets.end())
767     {
768         Z_APDU *apdu = 
769             odr.create_presentResponse(
770                 apdu_req,
771                 YAZ_BIB1_SPECIFIED_RESULT_SET_DOES_NOT_EXIST,
772                 req->resultSetId);
773         package.response() = apdu;
774         return;
775     }
776     FrontendSetPtr fset = fset_it->second;
777
778     Databases databases = fset->get_databases();
779     yazpp_1::Yaz_Z_Query query = fset->get_query();
780
781     BackendClassPtr bc = m_backend_class;
782     BackendSetPtr found_set; // null
783     BackendInstancePtr found_backend;
784
785     get_set(package, apdu_req, databases, query, found_backend, found_set);
786     if (!found_set)
787         return;
788
789     Z_APDU *p_apdu = zget_APDU(odr, Z_APDU_presentRequest);
790     Z_PresentRequest *p_req = p_apdu->u.presentRequest;
791     p_req->preferredRecordSyntax = req->preferredRecordSyntax;
792     p_req->resultSetId = odr_strdup(odr, found_set->m_result_set_id.c_str());
793     *p_req->resultSetStartPoint = *req->resultSetStartPoint;
794     *p_req->numberOfRecordsRequested = *req->numberOfRecordsRequested;
795     p_req->preferredRecordSyntax = req->preferredRecordSyntax;
796     p_req->recordComposition = req->recordComposition;
797
798     Package present_package(found_backend->m_session, package.origin());
799     present_package.copy_filter(package);
800
801     present_package.request() = p_apdu;
802
803     present_package.move();
804
805     Z_GDU *gdu = present_package.response().get();
806     if (!present_package.session().is_closed()
807         && gdu && gdu->which == Z_GDU_Z3950 
808         && gdu->u.z3950->which == Z_APDU_presentResponse)
809     {
810         Z_PresentResponse *b_resp = gdu->u.z3950->u.presentResponse;
811         Z_APDU *f_apdu_res = odr.create_presentResponse(apdu_req, 0, 0);
812         Z_PresentResponse *f_resp = f_apdu_res->u.presentResponse;
813
814         f_resp->numberOfRecordsReturned = b_resp->numberOfRecordsReturned;
815         f_resp->nextResultSetPosition = b_resp->nextResultSetPosition;
816         f_resp->presentStatus= b_resp->presentStatus;
817         f_resp->records = b_resp->records;
818         f_resp->otherInfo = b_resp->otherInfo;
819         package.response() = f_apdu_res;
820         bc->release_backend(found_backend);
821     }
822     else
823     {
824         bc->remove_backend(found_backend);
825         Z_APDU *f_apdu_res = 
826             odr.create_presentResponse(
827                 apdu_req, YAZ_BIB1_TEMPORARY_SYSTEM_ERROR, 0);
828         package.response() = f_apdu_res;
829     }
830 }
831
832 void yf::SessionShared::Frontend::scan(mp::Package &frontend_package,
833                                        Z_APDU *apdu_req)
834 {
835     BackendClassPtr bc = m_backend_class;
836     BackendInstancePtr backend = bc->get_backend(frontend_package);
837     if (!backend)
838     {
839         mp::odr odr;
840         Z_APDU *apdu = odr.create_scanResponse(
841             apdu_req, YAZ_BIB1_TEMPORARY_SYSTEM_ERROR, 0);
842         frontend_package.response() = apdu;
843     }
844     else
845     {
846         Package scan_package(backend->m_session, frontend_package.origin());
847         scan_package.copy_filter(frontend_package);
848         scan_package.request() = apdu_req;
849         scan_package.move();
850         frontend_package.response() = scan_package.response();
851         if (scan_package.session().is_closed())
852         {
853             frontend_package.session().close();
854             bc->remove_backend(backend);
855         }
856         else
857             bc->release_backend(backend);
858     }
859 }
860
861 yf::SessionShared::Worker::Worker(SessionShared::Rep *rep) : m_p(rep)
862 {
863 }
864
865 void yf::SessionShared::Worker::operator() (void)
866 {
867     m_p->expire();
868 }
869
870 void yf::SessionShared::BackendClass::expire()
871 {
872     time_t now;
873     time(&now);
874     boost::mutex::scoped_lock lock(m_mutex_backend_class);
875     BackendInstanceList::iterator bit = m_backend_list.begin();
876     while (bit != m_backend_list.end())
877     {
878         std::cout << "expiry ";
879         time_t last_use = (*bit)->m_time_last_use;
880         
881         if ((*bit)->m_in_use)
882         {
883             std::cout << "inuse";
884             bit++;
885         }
886         else if ((now >= last_use && now - last_use > m_backend_expiry_ttl)
887             || (now < last_use))
888         {
889             mp::odr odr;
890             (*bit)->m_close_package->response() = odr.create_close(
891                 0, Z_Close_lackOfActivity, 0);
892             (*bit)->m_close_package->session().close();
893             (*bit)->m_close_package->move();
894
895             bit = m_backend_list.erase(bit);
896             std::cout << "erase";
897         }
898         else
899         {
900             std::cout << "keep";
901             bit++;
902         }
903         std::cout << std::endl;
904     }
905 }
906
907 void yf::SessionShared::Rep::expire()
908 {
909     while (true)
910     {
911         boost::xtime xt;
912         boost::xtime_get(&xt, boost::TIME_UTC);
913         xt.sec += 30;
914         boost::thread::sleep(xt);
915         //std::cout << "." << std::endl;
916         
917         BackendClassMap::const_iterator b_it = m_backend_map.begin();
918         for (; b_it != m_backend_map.end(); b_it++)
919             b_it->second->expire();
920     }
921 }
922
923 yf::SessionShared::Rep::Rep()
924 {
925     m_resultset_ttl = 30;
926     m_resultset_max = 10;
927     m_session_ttl = 90;
928     yf::SessionShared::Worker w(this);
929     m_thrds.add_thread(new boost::thread(w));
930 }
931
932 yf::SessionShared::SessionShared() : m_p(new SessionShared::Rep)
933 {
934 }
935
936 yf::SessionShared::~SessionShared() {
937 }
938
939
940 yf::SessionShared::Frontend::Frontend(Rep *rep) : m_is_virtual(false), m_p(rep)
941 {
942 }
943
944 yf::SessionShared::Frontend::~Frontend()
945 {
946 }
947
948 yf::SessionShared::FrontendPtr yf::SessionShared::Rep::get_frontend(mp::Package &package)
949 {
950     boost::mutex::scoped_lock lock(m_mutex);
951
952     std::map<mp::Session,yf::SessionShared::FrontendPtr>::iterator it;
953     
954     while(true)
955     {
956         it = m_clients.find(package.session());
957         if (it == m_clients.end())
958             break;
959         
960         if (!it->second->m_in_use)
961         {
962             it->second->m_in_use = true;
963             return it->second;
964         }
965         m_cond_session_ready.wait(lock);
966     }
967     FrontendPtr f(new Frontend(this));
968     m_clients[package.session()] = f;
969     f->m_in_use = true;
970     return f;
971 }
972
973 void yf::SessionShared::Rep::release_frontend(mp::Package &package)
974 {
975     boost::mutex::scoped_lock lock(m_mutex);
976     std::map<mp::Session,yf::SessionShared::FrontendPtr>::iterator it;
977     
978     it = m_clients.find(package.session());
979     if (it != m_clients.end())
980     {
981         if (package.session().is_closed())
982         {
983             m_clients.erase(it);
984         }
985         else
986         {
987             it->second->m_in_use = false;
988         }
989         m_cond_session_ready.notify_all();
990     }
991 }
992
993
994 void yf::SessionShared::process(mp::Package &package) const
995 {
996     FrontendPtr f = m_p->get_frontend(package);
997
998     Z_GDU *gdu = package.request().get();
999     
1000     if (gdu && gdu->which == Z_GDU_Z3950 && gdu->u.z3950->which ==
1001         Z_APDU_initRequest && !f->m_is_virtual)
1002     {
1003         m_p->init(package, gdu, f);
1004     }
1005     else if (!f->m_is_virtual)
1006         package.move();
1007     else if (gdu && gdu->which == Z_GDU_Z3950)
1008     {
1009         Z_APDU *apdu = gdu->u.z3950;
1010         if (apdu->which == Z_APDU_initRequest)
1011         {
1012             mp::odr odr;
1013             
1014             package.response() = odr.create_close(
1015                 apdu,
1016                 Z_Close_protocolError,
1017                 "double init");
1018             
1019             package.session().close();
1020         }
1021         else if (apdu->which == Z_APDU_close)
1022         {
1023             mp::odr odr;
1024             
1025             package.response() = odr.create_close(
1026                 apdu,
1027                 Z_Close_peerAbort, "received close from client");
1028             package.session().close();
1029         }
1030         else if (apdu->which == Z_APDU_searchRequest)
1031         {
1032             f->search(package, apdu);
1033         }
1034         else if (apdu->which == Z_APDU_presentRequest)
1035         {
1036             f->present(package, apdu);
1037         }
1038         else if (apdu->which == Z_APDU_scanRequest)
1039         {
1040             f->scan(package, apdu);
1041         }
1042         else
1043         {
1044             mp::odr odr;
1045             
1046             package.response() = odr.create_close(
1047                 apdu, Z_Close_protocolError,
1048                 "unsupported APDU in filter_session_shared");
1049             
1050             package.session().close();
1051         }
1052     }
1053     m_p->release_frontend(package);
1054 }
1055
1056 void yf::SessionShared::configure(const xmlNode *ptr)
1057 {
1058     for (ptr = ptr->children; ptr; ptr = ptr->next)
1059     {
1060         if (ptr->type != XML_ELEMENT_NODE)
1061             continue;
1062         if (!strcmp((const char *) ptr->name, "resultset"))
1063         {
1064             const struct _xmlAttr *attr;
1065             for (attr = ptr->properties; attr; attr = attr->next)
1066             {
1067                 if (!strcmp((const char *) attr->name, "ttl"))
1068                     m_p->m_resultset_ttl = 
1069                         mp::xml::get_int(attr->children, 30);
1070                 else if (!strcmp((const char *) attr->name, "max"))
1071                 {
1072                     m_p->m_resultset_max = 
1073                         mp::xml::get_int(attr->children, 10);
1074                 }
1075                 else
1076                     throw mp::filter::FilterException(
1077                         "Bad attribute " + std::string((const char *)
1078                                                        attr->name));
1079             }
1080         }
1081         else if (!strcmp((const char *) ptr->name, "session"))
1082         {
1083             const struct _xmlAttr *attr;
1084             for (attr = ptr->properties; attr; attr = attr->next)
1085             {
1086                 if (!strcmp((const char *) attr->name, "ttl"))
1087                     m_p->m_session_ttl = 
1088                         mp::xml::get_int(attr->children, 120);
1089                 else
1090                     throw mp::filter::FilterException(
1091                         "Bad attribute " + std::string((const char *)
1092                                                        attr->name));
1093             }
1094         }
1095         else
1096         {
1097             throw mp::filter::FilterException("Bad element " 
1098                                                + std::string((const char *)
1099                                                              ptr->name));
1100         }
1101     }
1102 }
1103
1104 static mp::filter::Base* filter_creator()
1105 {
1106     return new mp::filter::SessionShared;
1107 }
1108
1109 extern "C" {
1110     struct metaproxy_1_filter_struct metaproxy_1_filter_session_shared = {
1111         0,
1112         "session_shared",
1113         filter_creator
1114     };
1115 }
1116
1117 /*
1118  * Local variables:
1119  * c-basic-offset: 4
1120  * indent-tabs-mode: nil
1121  * c-file-style: "stroustrup"
1122  * End:
1123  * vim: shiftwidth=4 tabstop=8 expandtab
1124  */