Fix compilation for VS 2005
[metaproxy-moved-to-github.git] / src / filter_session_shared.cpp
1 /* $Id: filter_session_shared.cpp,v 1.15 2006-06-21 10:06:07 adam Exp $
2    Copyright (c) 2005-2006, Index Data.
3
4    See the LICENSE file for details
5  */
6
7 #include "config.hpp"
8
9 #include "filter.hpp"
10 #include "package.hpp"
11
12 #include <boost/thread/mutex.hpp>
13 #include <boost/thread/condition.hpp>
14 #include <boost/thread/thread.hpp>
15 #include <boost/thread/xtime.hpp>
16 #include <boost/shared_ptr.hpp>
17 #include <boost/format.hpp>
18
19 #include "util.hpp"
20 #include "filter_session_shared.hpp"
21
22 #include <yaz/log.h>
23 #include <yaz/zgdu.h>
24 #include <yaz/otherinfo.h>
25 #include <yaz/diagbib1.h>
26 #include <yazpp/z-query.h>
27 #include <map>
28 #include <iostream>
29 #include <time.h>
30
31 namespace mp = metaproxy_1;
32 namespace yf = metaproxy_1::filter;
33
34 namespace metaproxy_1 {
35
36     namespace filter {
37         class SessionShared::InitKey {
38         public:
39             bool operator < (const SessionShared::InitKey &k) const;
40             InitKey(Z_InitRequest *req);
41             InitKey(const InitKey &);
42             ~InitKey();
43         private:
44             InitKey &operator = (const InitKey &k);
45             char *m_idAuthentication_buf;
46             int m_idAuthentication_size;
47             char *m_otherInfo_buf;
48             int m_otherInfo_size;
49             ODR m_odr;
50         };
51         class SessionShared::Worker {
52         public:
53             Worker(SessionShared::Rep *rep);
54             void operator() (void);
55         private:
56             SessionShared::Rep *m_p;
57         };
58         class SessionShared::BackendSet {
59         public:
60             std::string m_result_set_id;
61             Databases m_databases;
62             int m_result_set_size;
63             yazpp_1::Yaz_Z_Query m_query;
64             time_t m_time_last_use;
65             void timestamp();
66             BackendSet(
67                 const std::string &result_set_id,
68                 const Databases &databases,
69                 const yazpp_1::Yaz_Z_Query &query);
70             bool search(
71                 Package &frontend_package,
72                 const Z_APDU *apdu_req,
73                 const BackendInstancePtr bp);
74         };
75         class SessionShared::BackendInstance {
76             friend class Rep;
77             friend class BackendClass;
78             friend class BackendSet;
79         public:
80             mp::Session m_session;
81             BackendSetList m_sets;
82             bool m_in_use;
83             int m_sequence_this;
84             int m_result_set_sequence;
85             time_t m_time_last_use;
86             mp::Package * m_close_package;
87             ~BackendInstance();
88         };
89         class SessionShared::BackendClass : boost::noncopyable {
90             friend class Rep;
91             friend struct Frontend;
92             bool m_named_result_sets;
93             BackendInstanceList m_backend_list;
94             BackendInstancePtr create_backend(const Package &package);
95             void remove_backend(BackendInstancePtr b);
96             BackendInstancePtr get_backend(const Package &package);
97             void use_backend(BackendInstancePtr b);
98             void release_backend(BackendInstancePtr b);
99             void expire();
100             yazpp_1::GDU m_init_request;
101             yazpp_1::GDU m_init_response;
102             boost::mutex m_mutex_backend_class;
103             int m_sequence_top;
104             time_t m_backend_set_ttl;
105             time_t m_backend_expiry_ttl;
106             size_t m_backend_set_max;
107         public:
108             BackendClass(const yazpp_1::GDU &init_request,
109                          int resultset_ttl,
110                          int resultset_max,
111                          int session_ttl);
112             ~BackendClass();
113         };
114         class SessionShared::FrontendSet {
115             Databases m_databases;
116             yazpp_1::Yaz_Z_Query m_query;
117         public:
118             const Databases &get_databases();
119             const yazpp_1::Yaz_Z_Query &get_query();
120             FrontendSet(
121                 const Databases &databases,
122                 const yazpp_1::Yaz_Z_Query &query);
123             FrontendSet();
124         };
125         struct SessionShared::Frontend {
126             Frontend(Rep *rep);
127             ~Frontend();
128             bool m_is_virtual;
129             bool m_in_use;
130             Z_Options m_init_options;
131             void search(Package &package, Z_APDU *apdu);
132             void present(Package &package, Z_APDU *apdu);
133             void scan(Package &package, Z_APDU *apdu);
134
135             void get_set(mp::Package &package,
136                          const Z_APDU *apdu_req,
137                          const Databases &databases,
138                          yazpp_1::Yaz_Z_Query &query,
139                          BackendInstancePtr &found_backend,
140                          BackendSetPtr &found_set);
141             void override_set(BackendInstancePtr &found_backend,
142                               std::string &result_set_id);
143
144             Rep *m_p;
145             BackendClassPtr m_backend_class;
146             FrontendSets m_frontend_sets;
147         };            
148         class SessionShared::Rep {
149             friend class SessionShared;
150             friend struct Frontend;
151             
152             FrontendPtr get_frontend(Package &package);
153             void release_frontend(Package &package);
154             Rep();
155         public:
156             void expire();
157         private:
158             void init(Package &package, const Z_GDU *gdu,
159                       FrontendPtr frontend);
160             boost::mutex m_mutex;
161             boost::condition m_cond_session_ready;
162             std::map<mp::Session, FrontendPtr> m_clients;
163
164             BackendClassMap m_backend_map;
165             boost::mutex m_mutex_backend_map;
166             boost::thread_group m_thrds;
167             int m_resultset_ttl;
168             int m_resultset_max;
169             int m_session_ttl;
170         };
171     }
172 }
173
174 yf::SessionShared::FrontendSet::FrontendSet(
175     const Databases &databases,
176     const yazpp_1::Yaz_Z_Query &query)
177     : m_databases(databases), m_query(query)
178 {
179 }
180
181 const yf::SessionShared::Databases & 
182 yf::SessionShared::FrontendSet::get_databases()
183 {
184     return m_databases;
185 }
186
187 const yazpp_1::Yaz_Z_Query& yf::SessionShared::FrontendSet::get_query()
188 {
189     return m_query;
190 }
191
192 yf::SessionShared::InitKey::InitKey(const InitKey &k)
193 {
194     m_odr = odr_createmem(ODR_ENCODE);
195     
196     m_idAuthentication_size =  k.m_idAuthentication_size;
197     m_idAuthentication_buf = (char*)odr_malloc(m_odr, m_idAuthentication_size);
198     memcpy(m_idAuthentication_buf, k.m_idAuthentication_buf,
199            m_idAuthentication_size);
200
201     m_otherInfo_size =  k.m_otherInfo_size;
202     m_otherInfo_buf = (char*)odr_malloc(m_odr, m_otherInfo_size);
203     memcpy(m_otherInfo_buf, k.m_otherInfo_buf,
204            m_otherInfo_size);
205 }
206
207 yf::SessionShared::InitKey::InitKey(Z_InitRequest *req)
208 {
209     m_odr = odr_createmem(ODR_ENCODE);
210
211     Z_IdAuthentication *t = req->idAuthentication;
212     z_IdAuthentication(m_odr, &t, 1, 0);
213     m_idAuthentication_buf =
214         odr_getbuf(m_odr, &m_idAuthentication_size, 0);
215
216     Z_OtherInformation *o = req->otherInfo;
217     z_OtherInformation(m_odr, &o, 1, 0);
218     m_otherInfo_buf = odr_getbuf(m_odr, &m_otherInfo_size, 0);
219 }
220
221 yf::SessionShared::InitKey::~InitKey()
222 {
223     odr_destroy(m_odr);
224 }
225
226 bool yf::SessionShared::InitKey::operator < (const SessionShared::InitKey &k)
227     const 
228 {
229     int c;
230     c = mp::util::memcmp2(
231         (void*) m_idAuthentication_buf, m_idAuthentication_size,
232         (void*) k.m_idAuthentication_buf, k.m_idAuthentication_size);
233     if (c < 0)
234         return true;
235     else if (c > 0)
236         return false;
237
238     c = mp::util::memcmp2((void*) m_otherInfo_buf, m_otherInfo_size,
239                           (void*) k.m_otherInfo_buf, k.m_otherInfo_size);
240     if (c < 0)
241         return true;
242     else if (c > 0)
243         return false;
244     return false;
245 }
246
247 void yf::SessionShared::BackendClass::release_backend(BackendInstancePtr b)
248 {
249     boost::mutex::scoped_lock lock(m_mutex_backend_class);
250     b->m_in_use = false;
251 }
252
253
254 void yf::SessionShared::BackendClass::remove_backend(BackendInstancePtr b)
255 {
256     BackendInstanceList::iterator it = m_backend_list.begin();
257     
258     while (it != m_backend_list.end())
259     {
260         if (*it == b)
261             it = m_backend_list.erase(it);
262         else
263             it++;
264     }
265 }
266
267
268
269 yf::SessionShared::BackendInstancePtr 
270 yf::SessionShared::BackendClass::get_backend(
271     const mp::Package &frontend_package)
272 {
273     {
274         boost::mutex::scoped_lock lock(m_mutex_backend_class);
275         
276         BackendInstanceList::const_iterator it = m_backend_list.begin();
277         
278         BackendInstancePtr backend1; // null
279         
280         for (; it != m_backend_list.end(); it++)
281         {
282             if (!(*it)->m_in_use)
283             {
284                 if (!backend1 
285                     || (*it)->m_sequence_this < backend1->m_sequence_this)
286                     backend1 = *it;
287             }
288         }
289         if (backend1)
290         {
291             use_backend(backend1);
292             return backend1;
293         }
294     }
295     return create_backend(frontend_package);
296 }
297
298 void yf::SessionShared::BackendClass::use_backend(BackendInstancePtr backend)
299 {
300     backend->m_in_use = true;
301     time(&backend->m_time_last_use);
302     backend->m_sequence_this = m_sequence_top++;
303 }
304
305 yf::SessionShared::BackendInstance::~BackendInstance()
306 {
307     delete m_close_package;
308 }
309
310 yf::SessionShared::BackendInstancePtr yf::SessionShared::BackendClass::create_backend(
311     const mp::Package &frontend_package)
312 {
313     BackendInstancePtr bp(new BackendInstance);
314     BackendInstancePtr null;
315
316     bp->m_close_package =
317         new mp::Package(bp->m_session, frontend_package.origin());
318     bp->m_close_package->copy_filter(frontend_package);
319
320     Package init_package(bp->m_session, frontend_package.origin());
321
322     init_package.copy_filter(frontend_package);
323
324     yazpp_1::GDU actual_init_request = m_init_request;
325     Z_GDU *init_pdu = actual_init_request.get();
326
327     assert(init_pdu->which == Z_GDU_Z3950);
328     assert(init_pdu->u.z3950->which == Z_APDU_initRequest);
329
330     Z_InitRequest *req = init_pdu->u.z3950->u.initRequest;
331     ODR_MASK_ZERO(req->options);
332
333     ODR_MASK_SET(req->options, Z_Options_search);
334     ODR_MASK_SET(req->options, Z_Options_present);
335     ODR_MASK_SET(req->options, Z_Options_namedResultSets);
336     ODR_MASK_SET(req->options, Z_Options_scan);
337
338     ODR_MASK_SET(req->protocolVersion, Z_ProtocolVersion_1);
339     ODR_MASK_SET(req->protocolVersion, Z_ProtocolVersion_2);
340     ODR_MASK_SET(req->protocolVersion, Z_ProtocolVersion_3);
341
342     init_package.request() = init_pdu;
343
344     init_package.move();
345
346     boost::mutex::scoped_lock lock(m_mutex_backend_class);
347
348     m_named_result_sets = false;
349     Z_GDU *gdu = init_package.response().get();
350     if (!init_package.session().is_closed()
351         && gdu && gdu->which == Z_GDU_Z3950 
352         && gdu->u.z3950->which == Z_APDU_initResponse)
353     {
354         Z_InitResponse *res = gdu->u.z3950->u.initResponse;
355         if (!*res->result)
356             return null;
357         m_init_response = gdu->u.z3950;
358         if (ODR_MASK_GET(res->options, Z_Options_namedResultSets))
359         {
360             m_named_result_sets = true;
361         }
362     }
363     else
364     {
365         // did not receive an init response or closed
366         return null;
367     }
368     bp->m_in_use = true;
369     time(&bp->m_time_last_use);
370     bp->m_sequence_this = 0;
371     bp->m_result_set_sequence = 0;
372     m_backend_list.push_back(bp);
373
374     return bp;
375 }
376
377
378 yf::SessionShared::BackendClass::BackendClass(const yazpp_1::GDU &init_request,
379                                               int resultset_ttl,
380                                               int resultset_max,
381                                               int session_ttl)
382     : m_named_result_sets(false), m_init_request(init_request),
383       m_sequence_top(0), m_backend_set_ttl(resultset_ttl),
384       m_backend_expiry_ttl(session_ttl), m_backend_set_max(resultset_max)
385 {}
386
387 yf::SessionShared::BackendClass::~BackendClass()
388 {}
389
390 void yf::SessionShared::Rep::init(mp::Package &package, const Z_GDU *gdu,
391                                   FrontendPtr frontend)
392 {
393     Z_InitRequest *req = gdu->u.z3950->u.initRequest;
394
395     frontend->m_is_virtual = true;
396     frontend->m_init_options = *req->options;
397     InitKey k(req);
398     {
399         boost::mutex::scoped_lock lock(m_mutex_backend_map);
400         BackendClassMap::const_iterator it;
401         it = m_backend_map.find(k);
402         if (it == m_backend_map.end())
403         {
404             BackendClassPtr b(new BackendClass(gdu->u.z3950,
405                                                m_resultset_ttl,
406                                                m_resultset_max,
407                                                m_session_ttl));
408             m_backend_map[k] = b;
409             frontend->m_backend_class = b;
410             std::cout << "SessionShared::Rep::init new session " 
411                       << frontend->m_backend_class << "\n";
412         }
413         else
414         {
415             frontend->m_backend_class = it->second;            
416             std::cout << "SessionShared::Rep::init existing session "
417                       << frontend->m_backend_class << "\n";
418         }
419     }
420     BackendClassPtr bc = frontend->m_backend_class;
421     BackendInstancePtr backend = bc->get_backend(package);
422     
423     mp::odr odr;
424     if (!backend)
425     {
426         Z_APDU *apdu = odr.create_initResponse(gdu->u.z3950, 0, 0);
427         *apdu->u.initResponse->result = 0;
428         package.response() = apdu;
429         package.session().close();
430     }
431     else
432     {
433         boost::mutex::scoped_lock lock(bc->m_mutex_backend_class);
434         yazpp_1::GDU init_response = bc->m_init_response;
435         Z_GDU *response_gdu = init_response.get();
436         mp::util::transfer_referenceId(odr, gdu->u.z3950,
437                                        response_gdu->u.z3950);
438
439         Z_Options *server_options =
440             response_gdu->u.z3950->u.initResponse->options;
441         Z_Options *client_options = &frontend->m_init_options;
442
443         int i;
444         for (i = 0; i<30; i++)
445             if (!ODR_MASK_GET(client_options, i))
446                 ODR_MASK_CLEAR(server_options, i);
447         package.response() = init_response;
448     }
449     if (backend)
450         bc->release_backend(backend);
451 }
452
453 void yf::SessionShared::BackendSet::timestamp()
454 {
455     time(&m_time_last_use);
456 }
457
458 yf::SessionShared::BackendSet::BackendSet(
459     const std::string &result_set_id,
460     const Databases &databases,
461     const yazpp_1::Yaz_Z_Query &query) :
462     m_result_set_id(result_set_id),
463     m_databases(databases), m_result_set_size(0), m_query(query) 
464 {
465     timestamp();
466 }
467
468 bool yf::SessionShared::BackendSet::search(
469     mp::Package &frontend_package,
470     const Z_APDU *frontend_apdu,
471     const BackendInstancePtr bp)
472 {
473     Package search_package(bp->m_session, frontend_package.origin());
474
475     search_package.copy_filter(frontend_package);
476
477     mp::odr odr;
478     Z_APDU *apdu_req = zget_APDU(odr, Z_APDU_searchRequest);
479     Z_SearchRequest *req = apdu_req->u.searchRequest;
480
481     req->resultSetName = odr_strdup(odr, m_result_set_id.c_str());
482     req->query = m_query.get_Z_Query();
483
484     req->num_databaseNames = m_databases.size();
485     req->databaseNames = (char**) 
486         odr_malloc(odr, req->num_databaseNames * sizeof(char *));
487     Databases::const_iterator it = m_databases.begin();
488     size_t i = 0;
489     for (; it != m_databases.end(); it++)
490         req->databaseNames[i++] = odr_strdup(odr, it->c_str());
491
492     search_package.request() = apdu_req;
493
494     search_package.move();
495     
496     Z_Records *z_records_diag = 0;
497     Z_GDU *gdu = search_package.response().get();
498     if (!search_package.session().is_closed()
499         && gdu && gdu->which == Z_GDU_Z3950 
500         && gdu->u.z3950->which == Z_APDU_searchResponse)
501     {
502         Z_SearchResponse *b_resp = gdu->u.z3950->u.searchResponse;
503         if (b_resp->records)
504         {
505             if (b_resp->records->which == Z_Records_NSD 
506                 || b_resp->records->which == Z_Records_multipleNSD)
507                 z_records_diag = b_resp->records;
508         }
509         if (z_records_diag)
510         {
511             if (frontend_apdu->which == Z_APDU_searchRequest)
512             {
513                 Z_APDU *f_apdu = odr.create_searchResponse(frontend_apdu, 
514                                                            0, 0);
515                 Z_SearchResponse *f_resp = f_apdu->u.searchResponse;
516                 f_resp->records = z_records_diag;
517                 frontend_package.response() = f_apdu;
518                 return false;
519             }
520             if (frontend_apdu->which == Z_APDU_presentRequest)
521             {
522                 Z_APDU *f_apdu = odr.create_presentResponse(frontend_apdu, 
523                                                             0, 0);
524                 Z_PresentResponse *f_resp = f_apdu->u.presentResponse;
525                 f_resp->records = z_records_diag;
526                 frontend_package.response() = f_apdu;
527                 return false;
528             }
529         }
530         m_result_set_size = *b_resp->resultCount;
531         return true;
532     }
533     Z_APDU *f_apdu = 0;
534     if (frontend_apdu->which == Z_APDU_searchRequest)
535         f_apdu = odr.create_searchResponse(
536             frontend_apdu, YAZ_BIB1_TEMPORARY_SYSTEM_ERROR, 0);
537     else if (frontend_apdu->which == Z_APDU_presentRequest)
538         f_apdu = odr.create_presentResponse(
539             frontend_apdu, YAZ_BIB1_TEMPORARY_SYSTEM_ERROR, 0);
540     else
541         f_apdu = odr.create_close(
542             frontend_apdu, YAZ_BIB1_TEMPORARY_SYSTEM_ERROR, 0);
543     frontend_package.response() = f_apdu;
544     return false;
545 }
546
547 void yf::SessionShared::Frontend::override_set(
548     BackendInstancePtr &found_backend,
549     std::string &result_set_id)
550 {
551     BackendClassPtr bc = m_backend_class;
552     BackendInstanceList::const_iterator it = bc->m_backend_list.begin();
553     time_t now;
554     time(&now);
555     
556     for (; it != bc->m_backend_list.end(); it++)
557     {
558         if (!(*it)->m_in_use)
559         {
560             BackendSetList::iterator set_it = (*it)->m_sets.begin();
561             for (; set_it != (*it)->m_sets.end(); set_it++)
562             {
563                 if (now >= (*set_it)->m_time_last_use &&
564                     now - (*set_it)->m_time_last_use > bc->m_backend_set_ttl)
565                 {
566                     found_backend = *it;
567                     result_set_id = (*set_it)->m_result_set_id;
568                     found_backend->m_sets.erase(set_it);
569                     std::cout << "REUSE TTL SET: " << result_set_id << "\n";
570                     return;
571                 }
572             }
573         }
574     }
575     size_t max_sets = bc->m_named_result_sets ? bc->m_backend_set_max : 1;
576     for (it = bc->m_backend_list.begin(); it != bc->m_backend_list.end(); it++)
577     {
578         if (!(*it)->m_in_use && (*it)->m_sets.size() < max_sets)
579         {
580             found_backend = *it;
581             if (bc->m_named_result_sets)
582             {
583                 result_set_id = boost::io::str(
584                     boost::format("%1%") % 
585                     found_backend->m_result_set_sequence);
586                 found_backend->m_result_set_sequence++;
587             }
588             else
589                 result_set_id = "default";
590             std::cout << "AVAILABLE SET: " << result_set_id << "\n";
591             return;
592         }
593     }
594 }
595
596 void yf::SessionShared::Frontend::get_set(mp::Package &package,
597                                           const Z_APDU *apdu_req,
598                                           const Databases &databases,
599                                           yazpp_1::Yaz_Z_Query &query,
600                                           BackendInstancePtr &found_backend,
601                                           BackendSetPtr &found_set)
602 {
603     std::string result_set_id;
604     BackendClassPtr bc = m_backend_class;
605     {
606         boost::mutex::scoped_lock lock(bc->m_mutex_backend_class);
607      
608         // look at each backend and see if we have a similar search
609         BackendInstanceList::const_iterator it = bc->m_backend_list.begin();
610         
611         for (; it != bc->m_backend_list.end(); it++)
612         {
613             if (!(*it)->m_in_use)
614             {
615                 BackendSetList::const_iterator set_it = (*it)->m_sets.begin();
616                 for (; set_it != (*it)->m_sets.end(); set_it++)
617                 {
618                     if ((*set_it)->m_databases == databases 
619                         && query.match(&(*set_it)->m_query))
620                     {
621                         found_set = *set_it;
622                         found_backend = *it;
623                         bc->use_backend(found_backend);
624                         found_set->timestamp();
625                         std::cout << "MATCH SET: " << 
626                             found_set->m_result_set_id << "\n";
627                         // found matching set. No need to search again
628                         return;
629                     }
630                 }
631             }
632         }
633         override_set(found_backend, result_set_id);
634         if (found_backend)
635             bc->use_backend(found_backend);
636     }
637     if (!found_backend)
638     {
639         // create a new backend set (and new set)
640         found_backend = bc->create_backend(package);
641
642         if (!found_backend)
643         {
644             Z_APDU *f_apdu = 0;
645             mp::odr odr;
646             if (apdu_req->which == Z_APDU_searchRequest)
647             {
648                 f_apdu = odr.create_searchResponse(
649                         apdu_req, YAZ_BIB1_TEMPORARY_SYSTEM_ERROR, 0);
650             }
651             else if (apdu_req->which == Z_APDU_presentRequest)
652             {
653                 f_apdu = odr.create_presentResponse(
654                     apdu_req, YAZ_BIB1_TEMPORARY_SYSTEM_ERROR, 0);
655             }
656             else
657             {
658                 f_apdu = odr.create_close(
659                     apdu_req, YAZ_BIB1_TEMPORARY_SYSTEM_ERROR, 0);
660             }
661             package.response() = f_apdu;
662             return;
663         }
664         std::cout << "NEW " << found_backend << "\n";
665         
666         if (bc->m_named_result_sets)
667         {
668             result_set_id = boost::io::str(
669                 boost::format("%1%") % found_backend->m_result_set_sequence);
670             found_backend->m_result_set_sequence++;
671         }
672         else
673             result_set_id = "default";
674         std::cout << "NEW SET: " << result_set_id << "\n";
675     }
676     // we must search ...
677     BackendSetPtr new_set(new BackendSet(result_set_id,
678                                          databases, query));
679     if (!new_set->search(package, apdu_req, found_backend))
680     {
681         std::cout << "search error\n";
682         bc->remove_backend(found_backend);
683         return; // search error 
684     }
685     found_set = new_set;
686     found_set->timestamp();
687     found_backend->m_sets.push_back(found_set);
688 }
689
690 void yf::SessionShared::Frontend::search(mp::Package &package,
691                                          Z_APDU *apdu_req)
692 {
693     Z_SearchRequest *req = apdu_req->u.searchRequest;
694     FrontendSets::iterator fset_it = 
695         m_frontend_sets.find(req->resultSetName);
696     if (fset_it != m_frontend_sets.end())
697     {
698         // result set already exist 
699         // if replace indicator is off: we return diagnostic if
700         // result set already exist.
701         if (*req->replaceIndicator == 0)
702         {
703             mp::odr odr;
704             Z_APDU *apdu = 
705                 odr.create_searchResponse(
706                     apdu_req,
707                     YAZ_BIB1_RESULT_SET_EXISTS_AND_REPLACE_INDICATOR_OFF,
708                     0);
709             package.response() = apdu;
710             
711             return;
712         }
713         m_frontend_sets.erase(fset_it);
714     }
715     
716     yazpp_1::Yaz_Z_Query query;
717     query.set_Z_Query(req->query);
718     Databases databases;
719     int i;
720     for (i = 0; i<req->num_databaseNames; i++)
721         databases.push_back(req->databaseNames[i]);
722
723     BackendSetPtr found_set; // null
724     BackendInstancePtr found_backend; // null
725
726     get_set(package, apdu_req, databases, query, found_backend, found_set);
727     if (!found_set)
728         return;
729
730     mp::odr odr;
731     Z_APDU *f_apdu = odr.create_searchResponse(apdu_req, 0, 0);
732     Z_SearchResponse *f_resp = f_apdu->u.searchResponse;
733     *f_resp->resultCount = found_set->m_result_set_size;
734     package.response() = f_apdu;
735
736     FrontendSetPtr fset(new FrontendSet(databases, query));
737     m_frontend_sets[req->resultSetName] = fset;
738
739     m_backend_class->release_backend(found_backend);
740 }
741
742 void yf::SessionShared::Frontend::present(mp::Package &package,
743                                           Z_APDU *apdu_req)
744 {
745     mp::odr odr;
746     Z_PresentRequest *req = apdu_req->u.presentRequest;
747
748     FrontendSets::iterator fset_it = 
749         m_frontend_sets.find(req->resultSetId);
750
751     if (fset_it == m_frontend_sets.end())
752     {
753         Z_APDU *apdu = 
754             odr.create_presentResponse(
755                 apdu_req,
756                 YAZ_BIB1_SPECIFIED_RESULT_SET_DOES_NOT_EXIST,
757                 req->resultSetId);
758         package.response() = apdu;
759         return;
760     }
761     FrontendSetPtr fset = fset_it->second;
762
763     Databases databases = fset->get_databases();
764     yazpp_1::Yaz_Z_Query query = fset->get_query();
765
766     BackendClassPtr bc = m_backend_class;
767     BackendSetPtr found_set; // null
768     BackendInstancePtr found_backend;
769
770     get_set(package, apdu_req, databases, query, found_backend, found_set);
771     if (!found_set)
772         return;
773
774     Z_APDU *p_apdu = zget_APDU(odr, Z_APDU_presentRequest);
775     Z_PresentRequest *p_req = p_apdu->u.presentRequest;
776     p_req->preferredRecordSyntax = req->preferredRecordSyntax;
777     p_req->resultSetId = odr_strdup(odr, found_set->m_result_set_id.c_str());
778     *p_req->resultSetStartPoint = *req->resultSetStartPoint;
779     *p_req->numberOfRecordsRequested = *req->numberOfRecordsRequested;
780     p_req->preferredRecordSyntax = req->preferredRecordSyntax;
781     p_req->recordComposition = req->recordComposition;
782
783     Package present_package(found_backend->m_session, package.origin());
784     present_package.copy_filter(package);
785
786     present_package.request() = p_apdu;
787
788     present_package.move();
789
790     Z_GDU *gdu = present_package.response().get();
791     if (!present_package.session().is_closed()
792         && gdu && gdu->which == Z_GDU_Z3950 
793         && gdu->u.z3950->which == Z_APDU_presentResponse)
794     {
795         Z_PresentResponse *b_resp = gdu->u.z3950->u.presentResponse;
796         Z_APDU *f_apdu_res = odr.create_presentResponse(apdu_req, 0, 0);
797         Z_PresentResponse *f_resp = f_apdu_res->u.presentResponse;
798
799         f_resp->numberOfRecordsReturned = b_resp->numberOfRecordsReturned;
800         f_resp->nextResultSetPosition = b_resp->nextResultSetPosition;
801         f_resp->presentStatus= b_resp->presentStatus;
802         f_resp->records = b_resp->records;
803         f_resp->otherInfo = b_resp->otherInfo;
804         package.response() = f_apdu_res;
805         bc->release_backend(found_backend);
806     }
807     else
808     {
809         bc->remove_backend(found_backend);
810         Z_APDU *f_apdu_res = 
811             odr.create_presentResponse(
812                 apdu_req, YAZ_BIB1_TEMPORARY_SYSTEM_ERROR, 0);
813         package.response() = f_apdu_res;
814     }
815 }
816
817 void yf::SessionShared::Frontend::scan(mp::Package &frontend_package,
818                                        Z_APDU *apdu_req)
819 {
820     BackendClassPtr bc = m_backend_class;
821     BackendInstancePtr backend = bc->get_backend(frontend_package);
822     if (!backend)
823     {
824         mp::odr odr;
825         Z_APDU *apdu = odr.create_scanResponse(
826             apdu_req, YAZ_BIB1_TEMPORARY_SYSTEM_ERROR, 0);
827         frontend_package.response() = apdu;
828     }
829     else
830     {
831         Package scan_package(backend->m_session, frontend_package.origin());
832         scan_package.copy_filter(frontend_package);
833         scan_package.request() = apdu_req;
834         scan_package.move();
835         frontend_package.response() = scan_package.response();
836         if (scan_package.session().is_closed())
837         {
838             frontend_package.session().close();
839             bc->remove_backend(backend);
840         }
841         else
842             bc->release_backend(backend);
843     }
844 }
845
846 yf::SessionShared::Worker::Worker(SessionShared::Rep *rep) : m_p(rep)
847 {
848 }
849
850 void yf::SessionShared::Worker::operator() (void)
851 {
852     m_p->expire();
853 }
854
855 void yf::SessionShared::BackendClass::expire()
856 {
857     time_t now;
858     time(&now);
859     boost::mutex::scoped_lock lock(m_mutex_backend_class);
860     BackendInstanceList::iterator bit = m_backend_list.begin();
861     while (bit != m_backend_list.end())
862     {
863         std::cout << "expiry ";
864         time_t last_use = (*bit)->m_time_last_use;
865         
866         if ((*bit)->m_in_use)
867         {
868             std::cout << "inuse";
869             bit++;
870         }
871         else if ((now >= last_use && now - last_use > m_backend_expiry_ttl)
872             || (now < last_use))
873         {
874             mp::odr odr;
875             (*bit)->m_close_package->response() = odr.create_close(
876                 0, Z_Close_lackOfActivity, 0);
877             (*bit)->m_close_package->session().close();
878             (*bit)->m_close_package->move();
879
880             bit = m_backend_list.erase(bit);
881             std::cout << "erase";
882         }
883         else
884         {
885             std::cout << "keep";
886             bit++;
887         }
888         std::cout << std::endl;
889     }
890 }
891
892 void yf::SessionShared::Rep::expire()
893 {
894     while (true)
895     {
896         boost::xtime xt;
897         boost::xtime_get(&xt, boost::TIME_UTC);
898         xt.sec += 30;
899         boost::thread::sleep(xt);
900         std::cout << "." << std::endl;
901         
902         BackendClassMap::const_iterator b_it = m_backend_map.begin();
903         for (; b_it != m_backend_map.end(); b_it++)
904             b_it->second->expire();
905     }
906 }
907
908 yf::SessionShared::Rep::Rep()
909 {
910     m_resultset_ttl = 30;
911     m_resultset_max = 10;
912     m_session_ttl = 90;
913     yf::SessionShared::Worker w(this);
914     m_thrds.add_thread(new boost::thread(w));
915 }
916
917 yf::SessionShared::SessionShared() : m_p(new SessionShared::Rep)
918 {
919 }
920
921 yf::SessionShared::~SessionShared() {
922 }
923
924
925 yf::SessionShared::Frontend::Frontend(Rep *rep) : m_is_virtual(false), m_p(rep)
926 {
927 }
928
929 yf::SessionShared::Frontend::~Frontend()
930 {
931 }
932
933 yf::SessionShared::FrontendPtr yf::SessionShared::Rep::get_frontend(mp::Package &package)
934 {
935     boost::mutex::scoped_lock lock(m_mutex);
936
937     std::map<mp::Session,yf::SessionShared::FrontendPtr>::iterator it;
938     
939     while(true)
940     {
941         it = m_clients.find(package.session());
942         if (it == m_clients.end())
943             break;
944         
945         if (!it->second->m_in_use)
946         {
947             it->second->m_in_use = true;
948             return it->second;
949         }
950         m_cond_session_ready.wait(lock);
951     }
952     FrontendPtr f(new Frontend(this));
953     m_clients[package.session()] = f;
954     f->m_in_use = true;
955     return f;
956 }
957
958 void yf::SessionShared::Rep::release_frontend(mp::Package &package)
959 {
960     boost::mutex::scoped_lock lock(m_mutex);
961     std::map<mp::Session,yf::SessionShared::FrontendPtr>::iterator it;
962     
963     it = m_clients.find(package.session());
964     if (it != m_clients.end())
965     {
966         if (package.session().is_closed())
967         {
968             m_clients.erase(it);
969         }
970         else
971         {
972             it->second->m_in_use = false;
973         }
974         m_cond_session_ready.notify_all();
975     }
976 }
977
978
979 void yf::SessionShared::process(mp::Package &package) const
980 {
981     FrontendPtr f = m_p->get_frontend(package);
982
983     Z_GDU *gdu = package.request().get();
984     
985     if (gdu && gdu->which == Z_GDU_Z3950 && gdu->u.z3950->which ==
986         Z_APDU_initRequest && !f->m_is_virtual)
987     {
988         m_p->init(package, gdu, f);
989     }
990     else if (!f->m_is_virtual)
991         package.move();
992     else if (gdu && gdu->which == Z_GDU_Z3950)
993     {
994         Z_APDU *apdu = gdu->u.z3950;
995         if (apdu->which == Z_APDU_initRequest)
996         {
997             mp::odr odr;
998             
999             package.response() = odr.create_close(
1000                 apdu,
1001                 Z_Close_protocolError,
1002                 "double init");
1003             
1004             package.session().close();
1005         }
1006         else if (apdu->which == Z_APDU_close)
1007         {
1008             mp::odr odr;
1009             
1010             package.response() = odr.create_close(
1011                 apdu,
1012                 Z_Close_peerAbort, "received close from client");
1013             package.session().close();
1014         }
1015         else if (apdu->which == Z_APDU_searchRequest)
1016         {
1017             f->search(package, apdu);
1018         }
1019         else if (apdu->which == Z_APDU_presentRequest)
1020         {
1021             f->present(package, apdu);
1022         }
1023         else if (apdu->which == Z_APDU_scanRequest)
1024         {
1025             f->scan(package, apdu);
1026         }
1027         else
1028         {
1029             mp::odr odr;
1030             
1031             package.response() = odr.create_close(
1032                 apdu, Z_Close_protocolError,
1033                 "unsupported APDU in filter_session_shared");
1034             
1035             package.session().close();
1036         }
1037     }
1038     m_p->release_frontend(package);
1039 }
1040
1041 void yf::SessionShared::configure(const xmlNode *ptr)
1042 {
1043     for (ptr = ptr->children; ptr; ptr = ptr->next)
1044     {
1045         if (ptr->type != XML_ELEMENT_NODE)
1046             continue;
1047         if (!strcmp((const char *) ptr->name, "resultset"))
1048         {
1049             const struct _xmlAttr *attr;
1050             for (attr = ptr->properties; attr; attr = attr->next)
1051             {
1052                 if (!strcmp((const char *) attr->name, "ttl"))
1053                     m_p->m_resultset_ttl = 
1054                         mp::xml::get_int(attr->children, 30);
1055                 else if (!strcmp((const char *) attr->name, "max"))
1056                 {
1057                     m_p->m_resultset_max = 
1058                         mp::xml::get_int(attr->children, 10);
1059                 }
1060                 else
1061                     throw mp::filter::FilterException(
1062                         "Bad attribute " + std::string((const char *)
1063                                                        attr->name));
1064             }
1065         }
1066         else if (!strcmp((const char *) ptr->name, "session"))
1067         {
1068             const struct _xmlAttr *attr;
1069             for (attr = ptr->properties; attr; attr = attr->next)
1070             {
1071                 if (!strcmp((const char *) attr->name, "ttl"))
1072                     m_p->m_session_ttl = 
1073                         mp::xml::get_int(attr->children, 120);
1074                 else
1075                     throw mp::filter::FilterException(
1076                         "Bad attribute " + std::string((const char *)
1077                                                        attr->name));
1078             }
1079         }
1080         else
1081         {
1082             throw mp::filter::FilterException("Bad element " 
1083                                                + std::string((const char *)
1084                                                              ptr->name));
1085         }
1086     }
1087 }
1088
1089 static mp::filter::Base* filter_creator()
1090 {
1091     return new mp::filter::SessionShared;
1092 }
1093
1094 extern "C" {
1095     struct metaproxy_1_filter_struct metaproxy_1_filter_session_shared = {
1096         0,
1097         "session_shared",
1098         filter_creator
1099     };
1100 }
1101
1102 /*
1103  * Local variables:
1104  * c-basic-offset: 4
1105  * indent-tabs-mode: nil
1106  * c-file-style: "stroustrup"
1107  * End:
1108  * vim: shiftwidth=4 tabstop=8 expandtab
1109  */