Functional session_shared module. Require yazpp 1.0.1.
[metaproxy-moved-to-github.git] / src / filter_session_shared.cpp
1 /* $Id: filter_session_shared.cpp,v 1.12 2006-06-19 23:54:02 adam Exp $
2    Copyright (c) 2005-2006, Index Data.
3
4    See the LICENSE file for details
5  */
6
7 #include "config.hpp"
8
9 #include "filter.hpp"
10 #include "package.hpp"
11
12 #include <boost/thread/mutex.hpp>
13 #include <boost/thread/condition.hpp>
14 #include <boost/thread/thread.hpp>
15 #include <boost/thread/xtime.hpp>
16 #include <boost/shared_ptr.hpp>
17 #include <boost/format.hpp>
18
19 #include "util.hpp"
20 #include "filter_session_shared.hpp"
21
22 #include <yaz/log.h>
23 #include <yaz/zgdu.h>
24 #include <yaz/otherinfo.h>
25 #include <yaz/diagbib1.h>
26 #include <yazpp/z-query.h>
27 #include <map>
28 #include <iostream>
29 #include <time.h>
30
31 namespace mp = metaproxy_1;
32 namespace yf = metaproxy_1::filter;
33
34 namespace metaproxy_1 {
35
36     namespace filter {
37         class SessionShared::InitKey {
38         public:
39             bool operator < (const SessionShared::InitKey &k) const;
40             InitKey(Z_InitRequest *req);
41             InitKey(const InitKey &);
42             ~InitKey();
43         private:
44             InitKey &operator = (const InitKey &k);
45             char *m_idAuthentication_buf;
46             int m_idAuthentication_size;
47             char *m_otherInfo_buf;
48             int m_otherInfo_size;
49             ODR m_odr;
50         };
51         class SessionShared::Worker {
52         public:
53             Worker(SessionShared::Rep *rep);
54             void operator() (void);
55         private:
56             SessionShared::Rep *m_p;
57         };
58         class SessionShared::BackendSet {
59         public:
60             std::string m_result_set_id;
61             Databases m_databases;
62             int m_result_set_size;
63             yazpp_1::Yaz_Z_Query m_query;
64             time_t m_time_last_use;
65             void timestamp();
66             BackendSet(
67                 const std::string &result_set_id,
68                 const Databases &databases,
69                 const yazpp_1::Yaz_Z_Query &query);
70             bool search(
71                 Package &frontend_package,
72                 const Z_APDU *apdu_req,
73                 const BackendInstancePtr bp);
74         };
75         class SessionShared::BackendInstance {
76             friend class Rep;
77             friend class BackendClass;
78             friend class BackendSet;
79         public:
80             mp::Session m_session;
81             BackendSetList m_sets;
82             bool m_in_use;
83             int m_sequence_this;
84             int m_result_set_sequence;
85             time_t m_time_last_use;
86             mp::Package * m_close_package;
87             ~BackendInstance();
88         };
89         class SessionShared::BackendClass : boost::noncopyable {
90             friend class Rep;
91             friend struct Frontend;
92             bool m_named_result_sets;
93             BackendInstanceList m_backend_list;
94             BackendInstancePtr create_backend(const Package &package);
95             void remove_backend(BackendInstancePtr b);
96             BackendInstancePtr get_backend(const Package &package);
97             void use_backend(BackendInstancePtr b);
98             void release_backend(BackendInstancePtr b);
99             void expire();
100             yazpp_1::GDU m_init_request;
101             yazpp_1::GDU m_init_response;
102             boost::mutex m_mutex_backend_class;
103             int m_sequence_top;
104             time_t m_backend_set_ttl;
105             time_t m_backend_expiry_ttl;
106             size_t m_backend_set_max;
107         public:
108             BackendClass(const yazpp_1::GDU &init_request);
109             ~BackendClass();
110         };
111         class SessionShared::FrontendSet {
112             Databases m_databases;
113             yazpp_1::Yaz_Z_Query m_query;
114         public:
115             const Databases &get_databases();
116             const yazpp_1::Yaz_Z_Query &get_query();
117             FrontendSet(
118                 const Databases &databases,
119                 const yazpp_1::Yaz_Z_Query &query);
120             FrontendSet();
121         };
122         struct SessionShared::Frontend {
123             Frontend(Rep *rep);
124             ~Frontend();
125             bool m_is_virtual;
126             bool m_in_use;
127
128             void search(Package &package, Z_APDU *apdu);
129             void present(Package &package, Z_APDU *apdu);
130
131             void get_set(mp::Package &package,
132                          const Z_APDU *apdu_req,
133                          const Databases &databases,
134                          yazpp_1::Yaz_Z_Query &query,
135                          BackendInstancePtr &found_backend,
136                          BackendSetPtr &found_set);
137             void override_set(BackendInstancePtr &found_backend,
138                               std::string &result_set_id);
139
140             Rep *m_p;
141             BackendClassPtr m_backend_class;
142             FrontendSets m_frontend_sets;
143         };            
144         class SessionShared::Rep {
145             friend class SessionShared;
146             friend struct Frontend;
147             
148             FrontendPtr get_frontend(Package &package);
149             void release_frontend(Package &package);
150             Rep();
151         public:
152             void expire();
153         private:
154             void init(Package &package, const Z_GDU *gdu,
155                       FrontendPtr frontend);
156             boost::mutex m_mutex;
157             boost::condition m_cond_session_ready;
158             std::map<mp::Session, FrontendPtr> m_clients;
159
160             BackendClassMap m_backend_map;
161             boost::mutex m_mutex_backend_map;
162             boost::thread_group m_thrds;
163         };
164     }
165 }
166
167 yf::SessionShared::FrontendSet::FrontendSet(
168     const Databases &databases,
169     const yazpp_1::Yaz_Z_Query &query)
170     : m_databases(databases), m_query(query)
171 {
172 }
173
174 const yf::SessionShared::Databases & 
175 yf::SessionShared::FrontendSet::get_databases()
176 {
177     return m_databases;
178 }
179
180 const yazpp_1::Yaz_Z_Query& yf::SessionShared::FrontendSet::get_query()
181 {
182     return m_query;
183 }
184
185 yf::SessionShared::InitKey::InitKey(const InitKey &k)
186 {
187     m_odr = odr_createmem(ODR_ENCODE);
188     
189     m_idAuthentication_size =  k.m_idAuthentication_size;
190     m_idAuthentication_buf = (char*)odr_malloc(m_odr, m_idAuthentication_size);
191     memcpy(m_idAuthentication_buf, k.m_idAuthentication_buf,
192            m_idAuthentication_size);
193
194     m_otherInfo_size =  k.m_otherInfo_size;
195     m_otherInfo_buf = (char*)odr_malloc(m_odr, m_otherInfo_size);
196     memcpy(m_otherInfo_buf, k.m_otherInfo_buf,
197            m_otherInfo_size);
198 }
199
200 yf::SessionShared::InitKey::InitKey(Z_InitRequest *req)
201 {
202     m_odr = odr_createmem(ODR_ENCODE);
203
204     Z_IdAuthentication *t = req->idAuthentication;
205     z_IdAuthentication(m_odr, &t, 1, 0);
206     m_idAuthentication_buf =
207         odr_getbuf(m_odr, &m_idAuthentication_size, 0);
208
209     Z_OtherInformation *o = req->otherInfo;
210     z_OtherInformation(m_odr, &o, 1, 0);
211     m_otherInfo_buf = odr_getbuf(m_odr, &m_otherInfo_size, 0);
212 }
213
214 yf::SessionShared::InitKey::~InitKey()
215 {
216     odr_destroy(m_odr);
217 }
218
219 bool yf::SessionShared::InitKey::operator < (const SessionShared::InitKey &k)
220     const 
221 {
222     int c;
223     c = mp::util::memcmp2(
224         (void*) m_idAuthentication_buf, m_idAuthentication_size,
225         (void*) k.m_idAuthentication_buf, k.m_idAuthentication_size);
226     if (c < 0)
227         return true;
228     else if (c > 0)
229         return false;
230
231     c = mp::util::memcmp2((void*) m_otherInfo_buf, m_otherInfo_size,
232                           (void*) k.m_otherInfo_buf, k.m_otherInfo_size);
233     if (c < 0)
234         return true;
235     else if (c > 0)
236         return false;
237     return false;
238 }
239
240 void yf::SessionShared::BackendClass::release_backend(BackendInstancePtr b)
241 {
242     boost::mutex::scoped_lock lock(m_mutex_backend_class);
243     b->m_in_use = false;
244 }
245
246
247 void yf::SessionShared::BackendClass::remove_backend(BackendInstancePtr b)
248 {
249     BackendInstanceList::iterator it = m_backend_list.begin();
250     
251     while (it != m_backend_list.end())
252     {
253         if (*it == b)
254             it = m_backend_list.erase(it);
255         else
256             it++;
257     }
258 }
259
260
261
262 yf::SessionShared::BackendInstancePtr 
263 yf::SessionShared::BackendClass::get_backend(
264     const mp::Package &frontend_package)
265 {
266     {
267         boost::mutex::scoped_lock lock(m_mutex_backend_class);
268         
269         BackendInstanceList::const_iterator it = m_backend_list.begin();
270         
271         BackendInstancePtr backend1; // null
272         
273         for (; it != m_backend_list.end(); it++)
274         {
275             if (!(*it)->m_in_use)
276             {
277                 if (!backend1 
278                     || (*it)->m_sequence_this < backend1->m_sequence_this)
279                     backend1 = *it;
280             }
281         }
282         if (backend1)
283         {
284             use_backend(backend1);
285             return backend1;
286         }
287     }
288     return create_backend(frontend_package);
289 }
290
291 void yf::SessionShared::BackendClass::use_backend(BackendInstancePtr backend)
292 {
293     backend->m_in_use = true;
294     time(&backend->m_time_last_use);
295     backend->m_sequence_this = m_sequence_top++;
296 }
297
298 yf::SessionShared::BackendInstance::~BackendInstance()
299 {
300     delete m_close_package;
301 }
302
303 yf::SessionShared::BackendInstancePtr yf::SessionShared::BackendClass::create_backend(
304     const mp::Package &frontend_package)
305 {
306     BackendInstancePtr bp(new BackendInstance);
307     BackendInstancePtr null;
308
309     bp->m_close_package =
310         new mp::Package(bp->m_session, frontend_package.origin());
311     bp->m_close_package->copy_filter(frontend_package);
312
313     Package init_package(bp->m_session, frontend_package.origin());
314
315     init_package.copy_filter(frontend_package);
316
317     init_package.request() = m_init_request;
318
319     init_package.move();
320
321     boost::mutex::scoped_lock lock(m_mutex_backend_class);
322
323     m_named_result_sets = false;
324     Z_GDU *gdu = init_package.response().get();
325     if (!init_package.session().is_closed()
326         && gdu && gdu->which == Z_GDU_Z3950 
327         && gdu->u.z3950->which == Z_APDU_initResponse)
328     {
329         Z_InitResponse *res = gdu->u.z3950->u.initResponse;
330         if (!*res->result)
331             return null;
332         m_init_response = gdu->u.z3950;
333         if (ODR_MASK_GET(res->options, Z_Options_namedResultSets))
334         {
335             m_named_result_sets = true;
336         }
337     }
338     else
339     {
340         // did not receive an init response or closed
341         return null;
342     }
343     bp->m_in_use = true;
344     time(&bp->m_time_last_use);
345     bp->m_sequence_this = 0;
346     bp->m_result_set_sequence = 0;
347     m_backend_list.push_back(bp);
348
349     return bp;
350 }
351
352
353 yf::SessionShared::BackendClass::BackendClass(const yazpp_1::GDU &init_request)
354     : m_named_result_sets(false), m_init_request(init_request),
355       m_sequence_top(0), m_backend_set_ttl(30),
356       m_backend_expiry_ttl(90), m_backend_set_max(10)
357 {}
358
359 yf::SessionShared::BackendClass::~BackendClass()
360 {}
361
362 void yf::SessionShared::Rep::init(mp::Package &package, const Z_GDU *gdu,
363                                   FrontendPtr frontend)
364 {
365     Z_InitRequest *req = gdu->u.z3950->u.initRequest;
366
367     frontend->m_is_virtual = true;
368     InitKey k(req);
369     {
370         boost::mutex::scoped_lock lock(m_mutex_backend_map);
371         BackendClassMap::const_iterator it;
372         it = m_backend_map.find(k);
373         if (it == m_backend_map.end())
374         {
375             BackendClassPtr b(new BackendClass(gdu->u.z3950));
376             m_backend_map[k] = b;
377             frontend->m_backend_class = b;
378             std::cout << "SessionShared::Rep::init new session " 
379                       << frontend->m_backend_class << "\n";
380         }
381         else
382         {
383             frontend->m_backend_class = it->second;            
384             std::cout << "SessionShared::Rep::init existing session "
385                       << frontend->m_backend_class << "\n";
386         }
387     }
388     BackendClassPtr bc = frontend->m_backend_class;
389     BackendInstancePtr backend = bc->get_backend(package);
390     
391     mp::odr odr;
392     if (!backend)
393     {
394         Z_APDU *apdu = odr.create_initResponse(gdu->u.z3950, 0, 0);
395         *apdu->u.initResponse->result = 0;
396         package.response() = apdu;
397         package.session().close();
398     }
399     else
400     {
401         boost::mutex::scoped_lock lock(bc->m_mutex_backend_class);
402         Z_GDU *response_gdu = bc->m_init_response.get();
403         mp::util::transfer_referenceId(odr, gdu->u.z3950,
404                                        response_gdu->u.z3950);
405         package.response() = response_gdu;
406     }
407     if (backend)
408         bc->release_backend(backend);
409 }
410
411 void yf::SessionShared::BackendSet::timestamp()
412 {
413     time(&m_time_last_use);
414 }
415
416 yf::SessionShared::BackendSet::BackendSet(
417     const std::string &result_set_id,
418     const Databases &databases,
419     const yazpp_1::Yaz_Z_Query &query) :
420     m_result_set_id(result_set_id),
421     m_databases(databases), m_result_set_size(0), m_query(query) 
422 {
423     timestamp();
424 }
425
426 bool yf::SessionShared::BackendSet::search(
427     Package &frontend_package,
428     const Z_APDU *frontend_apdu,
429     const BackendInstancePtr bp)
430 {
431     Package search_package(bp->m_session, frontend_package.origin());
432
433     search_package.copy_filter(frontend_package);
434
435     mp::odr odr;
436     Z_APDU *apdu_req = zget_APDU(odr, Z_APDU_searchRequest);
437     Z_SearchRequest *req = apdu_req->u.searchRequest;
438
439     req->resultSetName = odr_strdup(odr, m_result_set_id.c_str());
440     req->query = m_query.get_Z_Query();
441
442     req->num_databaseNames = m_databases.size();
443     req->databaseNames = (char**) 
444         odr_malloc(odr, req->num_databaseNames * sizeof(char *));
445     Databases::const_iterator it = m_databases.begin();
446     size_t i = 0;
447     for (; it != m_databases.end(); it++)
448         req->databaseNames[i++] = odr_strdup(odr, it->c_str());
449
450     search_package.request() = apdu_req;
451
452     search_package.move();
453     
454     Z_Records *z_records_diag = 0;
455     Z_GDU *gdu = search_package.response().get();
456     if (!search_package.session().is_closed()
457         && gdu && gdu->which == Z_GDU_Z3950 
458         && gdu->u.z3950->which == Z_APDU_searchResponse)
459     {
460         Z_SearchResponse *b_resp = gdu->u.z3950->u.searchResponse;
461         if (b_resp->records)
462         {
463             if (b_resp->records->which == Z_Records_NSD 
464                 || b_resp->records->which == Z_Records_multipleNSD)
465                 z_records_diag = b_resp->records;
466         }
467         if (z_records_diag)
468         {
469             if (frontend_apdu->which == Z_APDU_searchRequest)
470             {
471                 Z_APDU *f_apdu = odr.create_searchResponse(frontend_apdu, 
472                                                            0, 0);
473                 Z_SearchResponse *f_resp = f_apdu->u.searchResponse;
474                 f_resp->records = z_records_diag;
475                 frontend_package.response() = f_apdu;
476                 return false;
477             }
478             if (frontend_apdu->which == Z_APDU_presentRequest)
479             {
480                 Z_APDU *f_apdu = odr.create_presentResponse(frontend_apdu, 
481                                                             0, 0);
482                 Z_PresentResponse *f_resp = f_apdu->u.presentResponse;
483                 f_resp->records = z_records_diag;
484                 frontend_package.response() = f_apdu;
485                 return false;
486             }
487         }
488         m_result_set_size = *b_resp->resultCount;
489         return true;
490     }
491     if (frontend_apdu->which == Z_APDU_searchRequest)
492     {
493         Z_APDU *f_apdu = 
494             odr.create_searchResponse(frontend_apdu, 1, "Search closed");
495         frontend_package.response() = f_apdu;
496     }
497     if (frontend_apdu->which == Z_APDU_presentRequest)
498     {
499         Z_APDU *f_apdu = 
500             odr.create_presentResponse(frontend_apdu, 1, "Search closed");
501         frontend_package.response() = f_apdu;
502     }
503     return false;
504 }
505
506 void yf::SessionShared::Frontend::override_set(
507     BackendInstancePtr &found_backend,
508     std::string &result_set_id)
509 {
510     BackendClassPtr bc = m_backend_class;
511     BackendInstanceList::const_iterator it = bc->m_backend_list.begin();
512     time_t now;
513     time(&now);
514     
515     for (; it != bc->m_backend_list.end(); it++)
516     {
517         if (!(*it)->m_in_use)
518         {
519             BackendSetList::iterator set_it = (*it)->m_sets.begin();
520             for (; set_it != (*it)->m_sets.end(); set_it++)
521             {
522                 if (now >= (*set_it)->m_time_last_use &&
523                     now - (*set_it)->m_time_last_use > bc->m_backend_set_ttl)
524                 {
525                     found_backend = *it;
526                     result_set_id = (*set_it)->m_result_set_id;
527                     found_backend->m_sets.erase(set_it);
528                     std::cout << "REUSE TTL SET: " << result_set_id << "\n";
529                     return;
530                 }
531             }
532         }
533     }
534     size_t max_sets = bc->m_named_result_sets ? bc->m_backend_set_max : 1;
535     for (it = bc->m_backend_list.begin(); it != bc->m_backend_list.end(); it++)
536     {
537         if (!(*it)->m_in_use && (*it)->m_sets.size() < max_sets)
538         {
539             found_backend = *it;
540             if (bc->m_named_result_sets)
541             {
542                 result_set_id = boost::io::str(
543                     boost::format("%1%") % 
544                     found_backend->m_result_set_sequence);
545                 found_backend->m_result_set_sequence++;
546             }
547             else
548                 result_set_id = "default";
549             std::cout << "AVAILABLE SET: " << result_set_id << "\n";
550             return;
551         }
552     }
553
554 }
555
556 void yf::SessionShared::Frontend::get_set(mp::Package &package,
557                                           const Z_APDU *apdu_req,
558                                           const Databases &databases,
559                                           yazpp_1::Yaz_Z_Query &query,
560                                           BackendInstancePtr &found_backend,
561                                           BackendSetPtr &found_set)
562 {
563     std::string result_set_id;
564     BackendClassPtr bc = m_backend_class;
565     {
566         boost::mutex::scoped_lock lock(bc->m_mutex_backend_class);
567      
568         // look at each backend and see if we have a similar search
569         BackendInstanceList::const_iterator it = bc->m_backend_list.begin();
570         
571         for (; it != bc->m_backend_list.end(); it++)
572         {
573             if (!(*it)->m_in_use)
574             {
575                 BackendSetList::const_iterator set_it = (*it)->m_sets.begin();
576                 for (; set_it != (*it)->m_sets.end(); set_it++)
577                 {
578                     if ((*set_it)->m_databases == databases 
579                         && query.match(&(*set_it)->m_query))
580                     {
581                         found_set = *set_it;
582                         found_backend = *it;
583                         bc->use_backend(found_backend);
584                         found_set->timestamp();
585                         std::cout << "MATCH SET: " << 
586                             found_set->m_result_set_id << "\n";
587                         // found matching set. No need to search again
588                         return;
589                     }
590                 }
591             }
592         }
593         override_set(found_backend, result_set_id);
594         if (found_backend)
595             bc->use_backend(found_backend);
596     }
597     if (!found_backend)
598     {
599         // create a new backend set (and new set)
600         found_backend = bc->create_backend(package);
601         assert(found_backend);
602         std::cout << "NEW " << found_backend << "\n";
603         
604         if (bc->m_named_result_sets)
605         {
606             result_set_id = boost::io::str(
607                 boost::format("%1%") % found_backend->m_result_set_sequence);
608             found_backend->m_result_set_sequence++;
609         }
610         else
611             result_set_id = "default";
612         std::cout << "NEW SET: " << result_set_id << "\n";
613     }
614     // we must search ...
615     BackendSetPtr new_set(new BackendSet(result_set_id,
616                                          databases, query));
617     if (!new_set->search(package, apdu_req, found_backend))
618     {
619         std::cout << "search error\n";
620         bc->release_backend(found_backend);
621         return; // search error 
622     }
623     found_set = new_set;
624     found_set->timestamp();
625     found_backend->m_sets.push_back(found_set);
626 }
627
628 void yf::SessionShared::Frontend::search(mp::Package &package,
629                                          Z_APDU *apdu_req)
630 {
631     Z_SearchRequest *req = apdu_req->u.searchRequest;
632     FrontendSets::iterator fset_it = 
633         m_frontend_sets.find(req->resultSetName);
634     if (fset_it != m_frontend_sets.end())
635     {
636         // result set already exist 
637         // if replace indicator is off: we return diagnostic if
638         // result set already exist.
639         if (*req->replaceIndicator == 0)
640         {
641             mp::odr odr;
642             Z_APDU *apdu = 
643                 odr.create_searchResponse(
644                     apdu_req,
645                     YAZ_BIB1_RESULT_SET_EXISTS_AND_REPLACE_INDICATOR_OFF,
646                     0);
647             package.response() = apdu;
648             
649             return;
650         }
651         m_frontend_sets.erase(fset_it);
652     }
653     
654     yazpp_1::Yaz_Z_Query query;
655     query.set_Z_Query(req->query);
656     Databases databases;
657     int i;
658     for (i = 0; i<req->num_databaseNames; i++)
659         databases.push_back(req->databaseNames[i]);
660
661     BackendSetPtr found_set; // null
662     BackendInstancePtr found_backend; // null
663
664     get_set(package, apdu_req, databases, query, found_backend, found_set);
665
666     if (!found_set)
667         return;
668     mp::odr odr;
669     Z_APDU *f_apdu = odr.create_searchResponse(apdu_req, 0, 0);
670     Z_SearchResponse *f_resp = f_apdu->u.searchResponse;
671     *f_resp->resultCount = found_set->m_result_set_size;
672     package.response() = f_apdu;
673
674     FrontendSetPtr fset(new FrontendSet(databases, query));
675     m_frontend_sets[req->resultSetName] = fset;
676
677     m_backend_class->release_backend(found_backend);
678 }
679
680 void yf::SessionShared::Frontend::present(mp::Package &package,
681                                           Z_APDU *apdu_req)
682 {
683     mp::odr odr;
684     Z_PresentRequest *req = apdu_req->u.presentRequest;
685
686     FrontendSets::iterator fset_it = 
687         m_frontend_sets.find(req->resultSetId);
688
689     if (fset_it == m_frontend_sets.end())
690     {
691         Z_APDU *apdu = 
692             odr.create_presentResponse(
693                 apdu_req,
694                 YAZ_BIB1_SPECIFIED_RESULT_SET_DOES_NOT_EXIST,
695                 req->resultSetId);
696         package.response() = apdu;
697         return;
698     }
699     FrontendSetPtr fset = fset_it->second;
700
701     Databases databases = fset->get_databases();
702     yazpp_1::Yaz_Z_Query query = fset->get_query();
703
704     BackendClassPtr bc = m_backend_class;
705     BackendSetPtr found_set; // null
706     BackendInstancePtr found_backend;
707
708     get_set(package, apdu_req, databases, query, found_backend, found_set);
709     if (!found_set)
710         return;
711
712     Z_APDU *p_apdu = zget_APDU(odr, Z_APDU_presentRequest);
713     Z_PresentRequest *p_req = p_apdu->u.presentRequest;
714     p_req->preferredRecordSyntax = req->preferredRecordSyntax;
715     p_req->resultSetId = odr_strdup(odr, found_set->m_result_set_id.c_str());
716     *p_req->resultSetStartPoint = *req->resultSetStartPoint;
717     *p_req->numberOfRecordsRequested = *req->numberOfRecordsRequested;
718     p_req->preferredRecordSyntax = req->preferredRecordSyntax;
719     p_req->recordComposition = req->recordComposition;
720
721     Package present_package(found_backend->m_session, package.origin());
722     present_package.copy_filter(package);
723
724     present_package.request() = p_apdu;
725
726     present_package.move();
727
728     Z_GDU *gdu = present_package.response().get();
729     if (!present_package.session().is_closed()
730         && gdu && gdu->which == Z_GDU_Z3950 
731         && gdu->u.z3950->which == Z_APDU_presentResponse)
732     {
733         Z_PresentResponse *b_resp = gdu->u.z3950->u.presentResponse;
734         Z_APDU *f_apdu_res = odr.create_presentResponse(apdu_req, 0, 0);
735         Z_PresentResponse *f_resp = f_apdu_res->u.presentResponse;
736
737         f_resp->numberOfRecordsReturned = b_resp->numberOfRecordsReturned;
738         f_resp->nextResultSetPosition = b_resp->nextResultSetPosition;
739         f_resp->presentStatus= b_resp->presentStatus;
740         f_resp->records = b_resp->records;
741         f_resp->otherInfo = b_resp->otherInfo;
742         package.response() = f_apdu_res;
743         bc->release_backend(found_backend);
744     }
745     else
746     {
747         bc->remove_backend(found_backend);
748         Z_APDU *f_apdu_res = 
749             odr.create_presentResponse(apdu_req, 1, "present error");
750         package.response() = f_apdu_res;
751     }
752 }
753
754 yf::SessionShared::Worker::Worker(SessionShared::Rep *rep) : m_p(rep)
755 {
756 }
757
758 void yf::SessionShared::Worker::operator() (void)
759 {
760     m_p->expire();
761 }
762
763 void yf::SessionShared::BackendClass::expire()
764 {
765     time_t now;
766     time(&now);
767     boost::mutex::scoped_lock lock(m_mutex_backend_class);
768     BackendInstanceList::iterator bit = m_backend_list.begin();
769     while (bit != m_backend_list.end())
770     {
771         std::cout << "expiry ";
772         time_t last_use = (*bit)->m_time_last_use;
773         if ((now >= last_use && now - last_use > m_backend_expiry_ttl)
774             || (now < last_use))
775         {
776             mp::odr odr;
777             (*bit)->m_close_package->response() = odr.create_close(
778                 0, Z_Close_lackOfActivity, "session expired");
779             (*bit)->m_close_package->session().close();
780             (*bit)->m_close_package->move();
781
782             bit = m_backend_list.erase(bit);
783             std::cout << "erase";
784         }
785         else
786         {
787             std::cout << "keep";
788             bit++;
789         }
790         std::cout << std::endl;
791     }
792 }
793
794 void yf::SessionShared::Rep::expire()
795 {
796     while (true)
797     {
798         boost::xtime xt;
799         boost::xtime_get(&xt, boost::TIME_UTC);
800         xt.sec += 30;
801         boost::thread::sleep(xt);
802         std::cout << "." << std::endl;
803         
804         BackendClassMap::const_iterator b_it = m_backend_map.begin();
805         for (; b_it != m_backend_map.end(); b_it++)
806             b_it->second->expire();
807     }
808 }
809
810 yf::SessionShared::Rep::Rep()
811 {
812     yf::SessionShared::Worker w(this);
813     m_thrds.add_thread(new boost::thread(w));
814 }
815
816 yf::SessionShared::SessionShared() : m_p(new SessionShared::Rep)
817 {
818 }
819
820 yf::SessionShared::~SessionShared() {
821 }
822
823
824 yf::SessionShared::Frontend::Frontend(Rep *rep) : m_is_virtual(false), m_p(rep)
825 {
826 }
827
828 yf::SessionShared::Frontend::~Frontend()
829 {
830 }
831
832 yf::SessionShared::FrontendPtr yf::SessionShared::Rep::get_frontend(mp::Package &package)
833 {
834     boost::mutex::scoped_lock lock(m_mutex);
835
836     std::map<mp::Session,yf::SessionShared::FrontendPtr>::iterator it;
837     
838     while(true)
839     {
840         it = m_clients.find(package.session());
841         if (it == m_clients.end())
842             break;
843         
844         if (!it->second->m_in_use)
845         {
846             it->second->m_in_use = true;
847             return it->second;
848         }
849         m_cond_session_ready.wait(lock);
850     }
851     FrontendPtr f(new Frontend(this));
852     m_clients[package.session()] = f;
853     f->m_in_use = true;
854     return f;
855 }
856
857 void yf::SessionShared::Rep::release_frontend(mp::Package &package)
858 {
859     boost::mutex::scoped_lock lock(m_mutex);
860     std::map<mp::Session,yf::SessionShared::FrontendPtr>::iterator it;
861     
862     it = m_clients.find(package.session());
863     if (it != m_clients.end())
864     {
865         if (package.session().is_closed())
866         {
867             m_clients.erase(it);
868         }
869         else
870         {
871             it->second->m_in_use = false;
872         }
873         m_cond_session_ready.notify_all();
874     }
875 }
876
877
878 void yf::SessionShared::process(mp::Package &package) const
879 {
880     FrontendPtr f = m_p->get_frontend(package);
881
882     Z_GDU *gdu = package.request().get();
883     
884     if (gdu && gdu->which == Z_GDU_Z3950 && gdu->u.z3950->which ==
885         Z_APDU_initRequest && !f->m_is_virtual)
886     {
887         m_p->init(package, gdu, f);
888     }
889     else if (!f->m_is_virtual)
890         package.move();
891     else if (gdu && gdu->which == Z_GDU_Z3950)
892     {
893         Z_APDU *apdu = gdu->u.z3950;
894         if (apdu->which == Z_APDU_initRequest)
895         {
896             mp::odr odr;
897             
898             package.response() = odr.create_close(
899                 apdu,
900                 Z_Close_protocolError,
901                 "double init");
902             
903             package.session().close();
904         }
905         else if (apdu->which == Z_APDU_close)
906         {
907             mp::odr odr;
908             
909             package.response() = odr.create_close(
910                 apdu,
911                 Z_Close_peerAbort, "received close from client");
912             package.session().close();
913         }
914         else if (apdu->which == Z_APDU_searchRequest)
915         {
916             f->search(package, apdu);
917         }
918         else if (apdu->which == Z_APDU_presentRequest)
919         {
920             f->present(package, apdu);
921         }
922         else
923         {
924             mp::odr odr;
925             
926             package.response() = odr.create_close(
927                 apdu, Z_Close_protocolError,
928                 "unsupported APDU in filter_session_shared");
929             
930             package.session().close();
931         }
932     }
933     m_p->release_frontend(package);
934 }
935
936 static mp::filter::Base* filter_creator()
937 {
938     return new mp::filter::SessionShared;
939 }
940
941 extern "C" {
942     struct metaproxy_1_filter_struct metaproxy_1_filter_session_shared = {
943         0,
944         "session_shared",
945         filter_creator
946     };
947 }
948
949 /*
950  * Local variables:
951  * c-basic-offset: 4
952  * indent-tabs-mode: nil
953  * c-file-style: "stroustrup"
954  * End:
955  * vim: shiftwidth=4 tabstop=8 expandtab
956  */