Work on session_shared - scan support, better error handling, Z39.50
[metaproxy-moved-to-github.git] / src / filter_session_shared.cpp
1 /* $Id: filter_session_shared.cpp,v 1.13 2006-06-20 22:27:45 adam Exp $
2    Copyright (c) 2005-2006, Index Data.
3
4    See the LICENSE file for details
5  */
6
7 #include "config.hpp"
8
9 #include "filter.hpp"
10 #include "package.hpp"
11
12 #include <boost/thread/mutex.hpp>
13 #include <boost/thread/condition.hpp>
14 #include <boost/thread/thread.hpp>
15 #include <boost/thread/xtime.hpp>
16 #include <boost/shared_ptr.hpp>
17 #include <boost/format.hpp>
18
19 #include "util.hpp"
20 #include "filter_session_shared.hpp"
21
22 #include <yaz/log.h>
23 #include <yaz/zgdu.h>
24 #include <yaz/otherinfo.h>
25 #include <yaz/diagbib1.h>
26 #include <yazpp/z-query.h>
27 #include <map>
28 #include <iostream>
29 #include <time.h>
30
31 namespace mp = metaproxy_1;
32 namespace yf = metaproxy_1::filter;
33
34 namespace metaproxy_1 {
35
36     namespace filter {
37         class SessionShared::InitKey {
38         public:
39             bool operator < (const SessionShared::InitKey &k) const;
40             InitKey(Z_InitRequest *req);
41             InitKey(const InitKey &);
42             ~InitKey();
43         private:
44             InitKey &operator = (const InitKey &k);
45             char *m_idAuthentication_buf;
46             int m_idAuthentication_size;
47             char *m_otherInfo_buf;
48             int m_otherInfo_size;
49             ODR m_odr;
50         };
51         class SessionShared::Worker {
52         public:
53             Worker(SessionShared::Rep *rep);
54             void operator() (void);
55         private:
56             SessionShared::Rep *m_p;
57         };
58         class SessionShared::BackendSet {
59         public:
60             std::string m_result_set_id;
61             Databases m_databases;
62             int m_result_set_size;
63             yazpp_1::Yaz_Z_Query m_query;
64             time_t m_time_last_use;
65             void timestamp();
66             BackendSet(
67                 const std::string &result_set_id,
68                 const Databases &databases,
69                 const yazpp_1::Yaz_Z_Query &query);
70             bool search(
71                 Package &frontend_package,
72                 const Z_APDU *apdu_req,
73                 const BackendInstancePtr bp);
74         };
75         class SessionShared::BackendInstance {
76             friend class Rep;
77             friend class BackendClass;
78             friend class BackendSet;
79         public:
80             mp::Session m_session;
81             BackendSetList m_sets;
82             bool m_in_use;
83             int m_sequence_this;
84             int m_result_set_sequence;
85             time_t m_time_last_use;
86             mp::Package * m_close_package;
87             ~BackendInstance();
88         };
89         class SessionShared::BackendClass : boost::noncopyable {
90             friend class Rep;
91             friend struct Frontend;
92             bool m_named_result_sets;
93             BackendInstanceList m_backend_list;
94             BackendInstancePtr create_backend(const Package &package);
95             void remove_backend(BackendInstancePtr b);
96             BackendInstancePtr get_backend(const Package &package);
97             void use_backend(BackendInstancePtr b);
98             void release_backend(BackendInstancePtr b);
99             void expire();
100             yazpp_1::GDU m_init_request;
101             yazpp_1::GDU m_init_response;
102             boost::mutex m_mutex_backend_class;
103             int m_sequence_top;
104             time_t m_backend_set_ttl;
105             time_t m_backend_expiry_ttl;
106             size_t m_backend_set_max;
107         public:
108             BackendClass(const yazpp_1::GDU &init_request);
109             ~BackendClass();
110         };
111         class SessionShared::FrontendSet {
112             Databases m_databases;
113             yazpp_1::Yaz_Z_Query m_query;
114         public:
115             const Databases &get_databases();
116             const yazpp_1::Yaz_Z_Query &get_query();
117             FrontendSet(
118                 const Databases &databases,
119                 const yazpp_1::Yaz_Z_Query &query);
120             FrontendSet();
121         };
122         struct SessionShared::Frontend {
123             Frontend(Rep *rep);
124             ~Frontend();
125             bool m_is_virtual;
126             bool m_in_use;
127             Z_Options m_init_options;
128             void search(Package &package, Z_APDU *apdu);
129             void present(Package &package, Z_APDU *apdu);
130             void scan(Package &package, Z_APDU *apdu);
131
132             void get_set(mp::Package &package,
133                          const Z_APDU *apdu_req,
134                          const Databases &databases,
135                          yazpp_1::Yaz_Z_Query &query,
136                          BackendInstancePtr &found_backend,
137                          BackendSetPtr &found_set);
138             void override_set(BackendInstancePtr &found_backend,
139                               std::string &result_set_id);
140
141             Rep *m_p;
142             BackendClassPtr m_backend_class;
143             FrontendSets m_frontend_sets;
144         };            
145         class SessionShared::Rep {
146             friend class SessionShared;
147             friend struct Frontend;
148             
149             FrontendPtr get_frontend(Package &package);
150             void release_frontend(Package &package);
151             Rep();
152         public:
153             void expire();
154         private:
155             void init(Package &package, const Z_GDU *gdu,
156                       FrontendPtr frontend);
157             boost::mutex m_mutex;
158             boost::condition m_cond_session_ready;
159             std::map<mp::Session, FrontendPtr> m_clients;
160
161             BackendClassMap m_backend_map;
162             boost::mutex m_mutex_backend_map;
163             boost::thread_group m_thrds;
164         };
165     }
166 }
167
168 yf::SessionShared::FrontendSet::FrontendSet(
169     const Databases &databases,
170     const yazpp_1::Yaz_Z_Query &query)
171     : m_databases(databases), m_query(query)
172 {
173 }
174
175 const yf::SessionShared::Databases & 
176 yf::SessionShared::FrontendSet::get_databases()
177 {
178     return m_databases;
179 }
180
181 const yazpp_1::Yaz_Z_Query& yf::SessionShared::FrontendSet::get_query()
182 {
183     return m_query;
184 }
185
186 yf::SessionShared::InitKey::InitKey(const InitKey &k)
187 {
188     m_odr = odr_createmem(ODR_ENCODE);
189     
190     m_idAuthentication_size =  k.m_idAuthentication_size;
191     m_idAuthentication_buf = (char*)odr_malloc(m_odr, m_idAuthentication_size);
192     memcpy(m_idAuthentication_buf, k.m_idAuthentication_buf,
193            m_idAuthentication_size);
194
195     m_otherInfo_size =  k.m_otherInfo_size;
196     m_otherInfo_buf = (char*)odr_malloc(m_odr, m_otherInfo_size);
197     memcpy(m_otherInfo_buf, k.m_otherInfo_buf,
198            m_otherInfo_size);
199 }
200
201 yf::SessionShared::InitKey::InitKey(Z_InitRequest *req)
202 {
203     m_odr = odr_createmem(ODR_ENCODE);
204
205     Z_IdAuthentication *t = req->idAuthentication;
206     z_IdAuthentication(m_odr, &t, 1, 0);
207     m_idAuthentication_buf =
208         odr_getbuf(m_odr, &m_idAuthentication_size, 0);
209
210     Z_OtherInformation *o = req->otherInfo;
211     z_OtherInformation(m_odr, &o, 1, 0);
212     m_otherInfo_buf = odr_getbuf(m_odr, &m_otherInfo_size, 0);
213 }
214
215 yf::SessionShared::InitKey::~InitKey()
216 {
217     odr_destroy(m_odr);
218 }
219
220 bool yf::SessionShared::InitKey::operator < (const SessionShared::InitKey &k)
221     const 
222 {
223     int c;
224     c = mp::util::memcmp2(
225         (void*) m_idAuthentication_buf, m_idAuthentication_size,
226         (void*) k.m_idAuthentication_buf, k.m_idAuthentication_size);
227     if (c < 0)
228         return true;
229     else if (c > 0)
230         return false;
231
232     c = mp::util::memcmp2((void*) m_otherInfo_buf, m_otherInfo_size,
233                           (void*) k.m_otherInfo_buf, k.m_otherInfo_size);
234     if (c < 0)
235         return true;
236     else if (c > 0)
237         return false;
238     return false;
239 }
240
241 void yf::SessionShared::BackendClass::release_backend(BackendInstancePtr b)
242 {
243     boost::mutex::scoped_lock lock(m_mutex_backend_class);
244     b->m_in_use = false;
245 }
246
247
248 void yf::SessionShared::BackendClass::remove_backend(BackendInstancePtr b)
249 {
250     BackendInstanceList::iterator it = m_backend_list.begin();
251     
252     while (it != m_backend_list.end())
253     {
254         if (*it == b)
255             it = m_backend_list.erase(it);
256         else
257             it++;
258     }
259 }
260
261
262
263 yf::SessionShared::BackendInstancePtr 
264 yf::SessionShared::BackendClass::get_backend(
265     const mp::Package &frontend_package)
266 {
267     {
268         boost::mutex::scoped_lock lock(m_mutex_backend_class);
269         
270         BackendInstanceList::const_iterator it = m_backend_list.begin();
271         
272         BackendInstancePtr backend1; // null
273         
274         for (; it != m_backend_list.end(); it++)
275         {
276             if (!(*it)->m_in_use)
277             {
278                 if (!backend1 
279                     || (*it)->m_sequence_this < backend1->m_sequence_this)
280                     backend1 = *it;
281             }
282         }
283         if (backend1)
284         {
285             use_backend(backend1);
286             return backend1;
287         }
288     }
289     return create_backend(frontend_package);
290 }
291
292 void yf::SessionShared::BackendClass::use_backend(BackendInstancePtr backend)
293 {
294     backend->m_in_use = true;
295     time(&backend->m_time_last_use);
296     backend->m_sequence_this = m_sequence_top++;
297 }
298
299 yf::SessionShared::BackendInstance::~BackendInstance()
300 {
301     delete m_close_package;
302 }
303
304 yf::SessionShared::BackendInstancePtr yf::SessionShared::BackendClass::create_backend(
305     const mp::Package &frontend_package)
306 {
307     BackendInstancePtr bp(new BackendInstance);
308     BackendInstancePtr null;
309
310     bp->m_close_package =
311         new mp::Package(bp->m_session, frontend_package.origin());
312     bp->m_close_package->copy_filter(frontend_package);
313
314     Package init_package(bp->m_session, frontend_package.origin());
315
316     init_package.copy_filter(frontend_package);
317
318     yazpp_1::GDU actual_init_request = m_init_request;
319     Z_GDU *init_pdu = actual_init_request.get();
320
321     assert(init_pdu->which == Z_GDU_Z3950);
322     assert(init_pdu->u.z3950->which == Z_APDU_initRequest);
323
324     Z_InitRequest *req = init_pdu->u.z3950->u.initRequest;
325     ODR_MASK_ZERO(req->options);
326
327     ODR_MASK_SET(req->options, Z_Options_search);
328     ODR_MASK_SET(req->options, Z_Options_present);
329     ODR_MASK_SET(req->options, Z_Options_namedResultSets);
330     ODR_MASK_SET(req->options, Z_Options_scan);
331
332     ODR_MASK_SET(req->protocolVersion, Z_ProtocolVersion_1);
333     ODR_MASK_SET(req->protocolVersion, Z_ProtocolVersion_2);
334     ODR_MASK_SET(req->protocolVersion, Z_ProtocolVersion_3);
335
336     init_package.request() = init_pdu;
337
338     init_package.move();
339
340     boost::mutex::scoped_lock lock(m_mutex_backend_class);
341
342     m_named_result_sets = false;
343     Z_GDU *gdu = init_package.response().get();
344     if (!init_package.session().is_closed()
345         && gdu && gdu->which == Z_GDU_Z3950 
346         && gdu->u.z3950->which == Z_APDU_initResponse)
347     {
348         Z_InitResponse *res = gdu->u.z3950->u.initResponse;
349         if (!*res->result)
350             return null;
351         m_init_response = gdu->u.z3950;
352         if (ODR_MASK_GET(res->options, Z_Options_namedResultSets))
353         {
354             m_named_result_sets = true;
355         }
356     }
357     else
358     {
359         // did not receive an init response or closed
360         return null;
361     }
362     bp->m_in_use = true;
363     time(&bp->m_time_last_use);
364     bp->m_sequence_this = 0;
365     bp->m_result_set_sequence = 0;
366     m_backend_list.push_back(bp);
367
368     return bp;
369 }
370
371
372 yf::SessionShared::BackendClass::BackendClass(const yazpp_1::GDU &init_request)
373     : m_named_result_sets(false), m_init_request(init_request),
374       m_sequence_top(0), m_backend_set_ttl(30),
375       m_backend_expiry_ttl(30), m_backend_set_max(10)
376 {}
377
378 yf::SessionShared::BackendClass::~BackendClass()
379 {}
380
381 void yf::SessionShared::Rep::init(mp::Package &package, const Z_GDU *gdu,
382                                   FrontendPtr frontend)
383 {
384     Z_InitRequest *req = gdu->u.z3950->u.initRequest;
385
386     frontend->m_is_virtual = true;
387     frontend->m_init_options = *req->options;
388     InitKey k(req);
389     {
390         boost::mutex::scoped_lock lock(m_mutex_backend_map);
391         BackendClassMap::const_iterator it;
392         it = m_backend_map.find(k);
393         if (it == m_backend_map.end())
394         {
395             BackendClassPtr b(new BackendClass(gdu->u.z3950));
396             m_backend_map[k] = b;
397             frontend->m_backend_class = b;
398             std::cout << "SessionShared::Rep::init new session " 
399                       << frontend->m_backend_class << "\n";
400         }
401         else
402         {
403             frontend->m_backend_class = it->second;            
404             std::cout << "SessionShared::Rep::init existing session "
405                       << frontend->m_backend_class << "\n";
406         }
407     }
408     BackendClassPtr bc = frontend->m_backend_class;
409     BackendInstancePtr backend = bc->get_backend(package);
410     
411     mp::odr odr;
412     if (!backend)
413     {
414         Z_APDU *apdu = odr.create_initResponse(gdu->u.z3950, 0, 0);
415         *apdu->u.initResponse->result = 0;
416         package.response() = apdu;
417         package.session().close();
418     }
419     else
420     {
421         boost::mutex::scoped_lock lock(bc->m_mutex_backend_class);
422         yazpp_1::GDU init_response = bc->m_init_response;
423         Z_GDU *response_gdu = init_response.get();
424         mp::util::transfer_referenceId(odr, gdu->u.z3950,
425                                        response_gdu->u.z3950);
426
427         Z_Options *server_options =
428             response_gdu->u.z3950->u.initResponse->options;
429         Z_Options *client_options = &frontend->m_init_options;
430
431         int i;
432         for (i = 0; i<30; i++)
433             if (!ODR_MASK_GET(client_options, i))
434                 ODR_MASK_CLEAR(server_options, i);
435         package.response() = init_response;
436     }
437     if (backend)
438         bc->release_backend(backend);
439 }
440
441 void yf::SessionShared::BackendSet::timestamp()
442 {
443     time(&m_time_last_use);
444 }
445
446 yf::SessionShared::BackendSet::BackendSet(
447     const std::string &result_set_id,
448     const Databases &databases,
449     const yazpp_1::Yaz_Z_Query &query) :
450     m_result_set_id(result_set_id),
451     m_databases(databases), m_result_set_size(0), m_query(query) 
452 {
453     timestamp();
454 }
455
456 bool yf::SessionShared::BackendSet::search(
457     Package &frontend_package,
458     const Z_APDU *frontend_apdu,
459     const BackendInstancePtr bp)
460 {
461     Package search_package(bp->m_session, frontend_package.origin());
462
463     search_package.copy_filter(frontend_package);
464
465     mp::odr odr;
466     Z_APDU *apdu_req = zget_APDU(odr, Z_APDU_searchRequest);
467     Z_SearchRequest *req = apdu_req->u.searchRequest;
468
469     req->resultSetName = odr_strdup(odr, m_result_set_id.c_str());
470     req->query = m_query.get_Z_Query();
471
472     req->num_databaseNames = m_databases.size();
473     req->databaseNames = (char**) 
474         odr_malloc(odr, req->num_databaseNames * sizeof(char *));
475     Databases::const_iterator it = m_databases.begin();
476     size_t i = 0;
477     for (; it != m_databases.end(); it++)
478         req->databaseNames[i++] = odr_strdup(odr, it->c_str());
479
480     search_package.request() = apdu_req;
481
482     search_package.move();
483     
484     Z_Records *z_records_diag = 0;
485     Z_GDU *gdu = search_package.response().get();
486     if (!search_package.session().is_closed()
487         && gdu && gdu->which == Z_GDU_Z3950 
488         && gdu->u.z3950->which == Z_APDU_searchResponse)
489     {
490         Z_SearchResponse *b_resp = gdu->u.z3950->u.searchResponse;
491         if (b_resp->records)
492         {
493             if (b_resp->records->which == Z_Records_NSD 
494                 || b_resp->records->which == Z_Records_multipleNSD)
495                 z_records_diag = b_resp->records;
496         }
497         if (z_records_diag)
498         {
499             if (frontend_apdu->which == Z_APDU_searchRequest)
500             {
501                 Z_APDU *f_apdu = odr.create_searchResponse(frontend_apdu, 
502                                                            0, 0);
503                 Z_SearchResponse *f_resp = f_apdu->u.searchResponse;
504                 f_resp->records = z_records_diag;
505                 frontend_package.response() = f_apdu;
506                 return false;
507             }
508             if (frontend_apdu->which == Z_APDU_presentRequest)
509             {
510                 Z_APDU *f_apdu = odr.create_presentResponse(frontend_apdu, 
511                                                             0, 0);
512                 Z_PresentResponse *f_resp = f_apdu->u.presentResponse;
513                 f_resp->records = z_records_diag;
514                 frontend_package.response() = f_apdu;
515                 return false;
516             }
517         }
518         m_result_set_size = *b_resp->resultCount;
519         return true;
520     }
521     Z_APDU *f_apdu = 0;
522     if (frontend_apdu->which == Z_APDU_searchRequest)
523         f_apdu = odr.create_searchResponse(
524             frontend_apdu, YAZ_BIB1_TEMPORARY_SYSTEM_ERROR, 0);
525     else if (frontend_apdu->which == Z_APDU_presentRequest)
526         f_apdu = odr.create_presentResponse(
527             frontend_apdu, YAZ_BIB1_TEMPORARY_SYSTEM_ERROR, 0);
528     else
529         f_apdu = odr.create_close(
530             frontend_apdu, YAZ_BIB1_TEMPORARY_SYSTEM_ERROR, 0);
531     frontend_package.response() = f_apdu;
532     return false;
533 }
534
535 void yf::SessionShared::Frontend::override_set(
536     BackendInstancePtr &found_backend,
537     std::string &result_set_id)
538 {
539     BackendClassPtr bc = m_backend_class;
540     BackendInstanceList::const_iterator it = bc->m_backend_list.begin();
541     time_t now;
542     time(&now);
543     
544     for (; it != bc->m_backend_list.end(); it++)
545     {
546         if (!(*it)->m_in_use)
547         {
548             BackendSetList::iterator set_it = (*it)->m_sets.begin();
549             for (; set_it != (*it)->m_sets.end(); set_it++)
550             {
551                 if (now >= (*set_it)->m_time_last_use &&
552                     now - (*set_it)->m_time_last_use > bc->m_backend_set_ttl)
553                 {
554                     found_backend = *it;
555                     result_set_id = (*set_it)->m_result_set_id;
556                     found_backend->m_sets.erase(set_it);
557                     std::cout << "REUSE TTL SET: " << result_set_id << "\n";
558                     return;
559                 }
560             }
561         }
562     }
563     size_t max_sets = bc->m_named_result_sets ? bc->m_backend_set_max : 1;
564     for (it = bc->m_backend_list.begin(); it != bc->m_backend_list.end(); it++)
565     {
566         if (!(*it)->m_in_use && (*it)->m_sets.size() < max_sets)
567         {
568             found_backend = *it;
569             if (bc->m_named_result_sets)
570             {
571                 result_set_id = boost::io::str(
572                     boost::format("%1%") % 
573                     found_backend->m_result_set_sequence);
574                 found_backend->m_result_set_sequence++;
575             }
576             else
577                 result_set_id = "default";
578             std::cout << "AVAILABLE SET: " << result_set_id << "\n";
579             return;
580         }
581     }
582 }
583
584 void yf::SessionShared::Frontend::get_set(mp::Package &package,
585                                           const Z_APDU *apdu_req,
586                                           const Databases &databases,
587                                           yazpp_1::Yaz_Z_Query &query,
588                                           BackendInstancePtr &found_backend,
589                                           BackendSetPtr &found_set)
590 {
591     std::string result_set_id;
592     BackendClassPtr bc = m_backend_class;
593     {
594         boost::mutex::scoped_lock lock(bc->m_mutex_backend_class);
595      
596         // look at each backend and see if we have a similar search
597         BackendInstanceList::const_iterator it = bc->m_backend_list.begin();
598         
599         for (; it != bc->m_backend_list.end(); it++)
600         {
601             if (!(*it)->m_in_use)
602             {
603                 BackendSetList::const_iterator set_it = (*it)->m_sets.begin();
604                 for (; set_it != (*it)->m_sets.end(); set_it++)
605                 {
606                     if ((*set_it)->m_databases == databases 
607                         && query.match(&(*set_it)->m_query))
608                     {
609                         found_set = *set_it;
610                         found_backend = *it;
611                         bc->use_backend(found_backend);
612                         found_set->timestamp();
613                         std::cout << "MATCH SET: " << 
614                             found_set->m_result_set_id << "\n";
615                         // found matching set. No need to search again
616                         return;
617                     }
618                 }
619             }
620         }
621         override_set(found_backend, result_set_id);
622         if (found_backend)
623             bc->use_backend(found_backend);
624     }
625     if (!found_backend)
626     {
627         // create a new backend set (and new set)
628         found_backend = bc->create_backend(package);
629
630         if (!found_backend)
631         {
632             Z_APDU *f_apdu = 0;
633             mp::odr odr;
634             if (apdu_req->which == Z_APDU_searchRequest)
635             {
636                 f_apdu = odr.create_searchResponse(
637                         apdu_req, YAZ_BIB1_TEMPORARY_SYSTEM_ERROR, 0);
638             }
639             else if (apdu_req->which == Z_APDU_presentRequest)
640             {
641                 f_apdu = odr.create_presentResponse(
642                     apdu_req, YAZ_BIB1_TEMPORARY_SYSTEM_ERROR, 0);
643             }
644             else
645             {
646                 f_apdu = odr.create_close(
647                     apdu_req, YAZ_BIB1_TEMPORARY_SYSTEM_ERROR, 0);
648             }
649             package.response() = f_apdu;
650             return;
651         }
652         std::cout << "NEW " << found_backend << "\n";
653         
654         if (bc->m_named_result_sets)
655         {
656             result_set_id = boost::io::str(
657                 boost::format("%1%") % found_backend->m_result_set_sequence);
658             found_backend->m_result_set_sequence++;
659         }
660         else
661             result_set_id = "default";
662         std::cout << "NEW SET: " << result_set_id << "\n";
663     }
664     // we must search ...
665     BackendSetPtr new_set(new BackendSet(result_set_id,
666                                          databases, query));
667     if (!new_set->search(package, apdu_req, found_backend))
668     {
669         std::cout << "search error\n";
670         bc->remove_backend(found_backend);
671         return; // search error 
672     }
673     found_set = new_set;
674     found_set->timestamp();
675     found_backend->m_sets.push_back(found_set);
676 }
677
678 void yf::SessionShared::Frontend::search(mp::Package &package,
679                                          Z_APDU *apdu_req)
680 {
681     Z_SearchRequest *req = apdu_req->u.searchRequest;
682     FrontendSets::iterator fset_it = 
683         m_frontend_sets.find(req->resultSetName);
684     if (fset_it != m_frontend_sets.end())
685     {
686         // result set already exist 
687         // if replace indicator is off: we return diagnostic if
688         // result set already exist.
689         if (*req->replaceIndicator == 0)
690         {
691             mp::odr odr;
692             Z_APDU *apdu = 
693                 odr.create_searchResponse(
694                     apdu_req,
695                     YAZ_BIB1_RESULT_SET_EXISTS_AND_REPLACE_INDICATOR_OFF,
696                     0);
697             package.response() = apdu;
698             
699             return;
700         }
701         m_frontend_sets.erase(fset_it);
702     }
703     
704     yazpp_1::Yaz_Z_Query query;
705     query.set_Z_Query(req->query);
706     Databases databases;
707     int i;
708     for (i = 0; i<req->num_databaseNames; i++)
709         databases.push_back(req->databaseNames[i]);
710
711     BackendSetPtr found_set; // null
712     BackendInstancePtr found_backend; // null
713
714     get_set(package, apdu_req, databases, query, found_backend, found_set);
715     if (!found_set)
716         return;
717
718     mp::odr odr;
719     Z_APDU *f_apdu = odr.create_searchResponse(apdu_req, 0, 0);
720     Z_SearchResponse *f_resp = f_apdu->u.searchResponse;
721     *f_resp->resultCount = found_set->m_result_set_size;
722     package.response() = f_apdu;
723
724     FrontendSetPtr fset(new FrontendSet(databases, query));
725     m_frontend_sets[req->resultSetName] = fset;
726
727     m_backend_class->release_backend(found_backend);
728 }
729
730 void yf::SessionShared::Frontend::present(mp::Package &package,
731                                           Z_APDU *apdu_req)
732 {
733     mp::odr odr;
734     Z_PresentRequest *req = apdu_req->u.presentRequest;
735
736     FrontendSets::iterator fset_it = 
737         m_frontend_sets.find(req->resultSetId);
738
739     if (fset_it == m_frontend_sets.end())
740     {
741         Z_APDU *apdu = 
742             odr.create_presentResponse(
743                 apdu_req,
744                 YAZ_BIB1_SPECIFIED_RESULT_SET_DOES_NOT_EXIST,
745                 req->resultSetId);
746         package.response() = apdu;
747         return;
748     }
749     FrontendSetPtr fset = fset_it->second;
750
751     Databases databases = fset->get_databases();
752     yazpp_1::Yaz_Z_Query query = fset->get_query();
753
754     BackendClassPtr bc = m_backend_class;
755     BackendSetPtr found_set; // null
756     BackendInstancePtr found_backend;
757
758     get_set(package, apdu_req, databases, query, found_backend, found_set);
759     if (!found_set)
760         return;
761
762     Z_APDU *p_apdu = zget_APDU(odr, Z_APDU_presentRequest);
763     Z_PresentRequest *p_req = p_apdu->u.presentRequest;
764     p_req->preferredRecordSyntax = req->preferredRecordSyntax;
765     p_req->resultSetId = odr_strdup(odr, found_set->m_result_set_id.c_str());
766     *p_req->resultSetStartPoint = *req->resultSetStartPoint;
767     *p_req->numberOfRecordsRequested = *req->numberOfRecordsRequested;
768     p_req->preferredRecordSyntax = req->preferredRecordSyntax;
769     p_req->recordComposition = req->recordComposition;
770
771     Package present_package(found_backend->m_session, package.origin());
772     present_package.copy_filter(package);
773
774     present_package.request() = p_apdu;
775
776     present_package.move();
777
778     Z_GDU *gdu = present_package.response().get();
779     if (!present_package.session().is_closed()
780         && gdu && gdu->which == Z_GDU_Z3950 
781         && gdu->u.z3950->which == Z_APDU_presentResponse)
782     {
783         Z_PresentResponse *b_resp = gdu->u.z3950->u.presentResponse;
784         Z_APDU *f_apdu_res = odr.create_presentResponse(apdu_req, 0, 0);
785         Z_PresentResponse *f_resp = f_apdu_res->u.presentResponse;
786
787         f_resp->numberOfRecordsReturned = b_resp->numberOfRecordsReturned;
788         f_resp->nextResultSetPosition = b_resp->nextResultSetPosition;
789         f_resp->presentStatus= b_resp->presentStatus;
790         f_resp->records = b_resp->records;
791         f_resp->otherInfo = b_resp->otherInfo;
792         package.response() = f_apdu_res;
793         bc->release_backend(found_backend);
794     }
795     else
796     {
797         bc->remove_backend(found_backend);
798         Z_APDU *f_apdu_res = 
799             odr.create_presentResponse(
800                 apdu_req, YAZ_BIB1_TEMPORARY_SYSTEM_ERROR, 0);
801         package.response() = f_apdu_res;
802     }
803 }
804
805 void yf::SessionShared::Frontend::scan(mp::Package &frontend_package,
806                                        Z_APDU *apdu_req)
807 {
808     BackendClassPtr bc = m_backend_class;
809     BackendInstancePtr backend = bc->get_backend(frontend_package);
810     if (!backend)
811     {
812         mp::odr odr;
813         Z_APDU *apdu = odr.create_scanResponse(
814             apdu_req, YAZ_BIB1_TEMPORARY_SYSTEM_ERROR, 0);
815         frontend_package.response() = apdu;
816     }
817     else
818     {
819         Package scan_package(backend->m_session, frontend_package.origin());
820         scan_package.copy_filter(frontend_package);
821         scan_package.request() = apdu_req;
822         scan_package.move();
823         frontend_package.response() = scan_package.response();
824         if (scan_package.session().is_closed())
825         {
826             frontend_package.session().close();
827             bc->remove_backend(backend);
828         }
829         else
830             bc->release_backend(backend);
831     }
832 }
833
834 yf::SessionShared::Worker::Worker(SessionShared::Rep *rep) : m_p(rep)
835 {
836 }
837
838 void yf::SessionShared::Worker::operator() (void)
839 {
840     m_p->expire();
841 }
842
843 void yf::SessionShared::BackendClass::expire()
844 {
845     time_t now;
846     time(&now);
847     boost::mutex::scoped_lock lock(m_mutex_backend_class);
848     BackendInstanceList::iterator bit = m_backend_list.begin();
849     while (bit != m_backend_list.end())
850     {
851         std::cout << "expiry ";
852         time_t last_use = (*bit)->m_time_last_use;
853         
854         if ((*bit)->m_in_use)
855         {
856             std::cout << "inuse";
857             bit++;
858         }
859         else if ((now >= last_use && now - last_use > m_backend_expiry_ttl)
860             || (now < last_use))
861         {
862             mp::odr odr;
863             (*bit)->m_close_package->response() = odr.create_close(
864                 0, Z_Close_lackOfActivity, 0);
865             (*bit)->m_close_package->session().close();
866             (*bit)->m_close_package->move();
867
868             bit = m_backend_list.erase(bit);
869             std::cout << "erase";
870         }
871         else
872         {
873             std::cout << "keep";
874             bit++;
875         }
876         std::cout << std::endl;
877     }
878 }
879
880 void yf::SessionShared::Rep::expire()
881 {
882     while (true)
883     {
884         boost::xtime xt;
885         boost::xtime_get(&xt, boost::TIME_UTC);
886         xt.sec += 30;
887         boost::thread::sleep(xt);
888         std::cout << "." << std::endl;
889         
890         BackendClassMap::const_iterator b_it = m_backend_map.begin();
891         for (; b_it != m_backend_map.end(); b_it++)
892             b_it->second->expire();
893     }
894 }
895
896 yf::SessionShared::Rep::Rep()
897 {
898     yf::SessionShared::Worker w(this);
899     m_thrds.add_thread(new boost::thread(w));
900 }
901
902 yf::SessionShared::SessionShared() : m_p(new SessionShared::Rep)
903 {
904 }
905
906 yf::SessionShared::~SessionShared() {
907 }
908
909
910 yf::SessionShared::Frontend::Frontend(Rep *rep) : m_is_virtual(false), m_p(rep)
911 {
912 }
913
914 yf::SessionShared::Frontend::~Frontend()
915 {
916 }
917
918 yf::SessionShared::FrontendPtr yf::SessionShared::Rep::get_frontend(mp::Package &package)
919 {
920     boost::mutex::scoped_lock lock(m_mutex);
921
922     std::map<mp::Session,yf::SessionShared::FrontendPtr>::iterator it;
923     
924     while(true)
925     {
926         it = m_clients.find(package.session());
927         if (it == m_clients.end())
928             break;
929         
930         if (!it->second->m_in_use)
931         {
932             it->second->m_in_use = true;
933             return it->second;
934         }
935         m_cond_session_ready.wait(lock);
936     }
937     FrontendPtr f(new Frontend(this));
938     m_clients[package.session()] = f;
939     f->m_in_use = true;
940     return f;
941 }
942
943 void yf::SessionShared::Rep::release_frontend(mp::Package &package)
944 {
945     boost::mutex::scoped_lock lock(m_mutex);
946     std::map<mp::Session,yf::SessionShared::FrontendPtr>::iterator it;
947     
948     it = m_clients.find(package.session());
949     if (it != m_clients.end())
950     {
951         if (package.session().is_closed())
952         {
953             m_clients.erase(it);
954         }
955         else
956         {
957             it->second->m_in_use = false;
958         }
959         m_cond_session_ready.notify_all();
960     }
961 }
962
963
964 void yf::SessionShared::process(mp::Package &package) const
965 {
966     FrontendPtr f = m_p->get_frontend(package);
967
968     Z_GDU *gdu = package.request().get();
969     
970     if (gdu && gdu->which == Z_GDU_Z3950 && gdu->u.z3950->which ==
971         Z_APDU_initRequest && !f->m_is_virtual)
972     {
973         m_p->init(package, gdu, f);
974     }
975     else if (!f->m_is_virtual)
976         package.move();
977     else if (gdu && gdu->which == Z_GDU_Z3950)
978     {
979         Z_APDU *apdu = gdu->u.z3950;
980         if (apdu->which == Z_APDU_initRequest)
981         {
982             mp::odr odr;
983             
984             package.response() = odr.create_close(
985                 apdu,
986                 Z_Close_protocolError,
987                 "double init");
988             
989             package.session().close();
990         }
991         else if (apdu->which == Z_APDU_close)
992         {
993             mp::odr odr;
994             
995             package.response() = odr.create_close(
996                 apdu,
997                 Z_Close_peerAbort, "received close from client");
998             package.session().close();
999         }
1000         else if (apdu->which == Z_APDU_searchRequest)
1001         {
1002             f->search(package, apdu);
1003         }
1004         else if (apdu->which == Z_APDU_presentRequest)
1005         {
1006             f->present(package, apdu);
1007         }
1008         else if (apdu->which == Z_APDU_scanRequest)
1009         {
1010             f->scan(package, apdu);
1011         }
1012         else
1013         {
1014             mp::odr odr;
1015             
1016             package.response() = odr.create_close(
1017                 apdu, Z_Close_protocolError,
1018                 "unsupported APDU in filter_session_shared");
1019             
1020             package.session().close();
1021         }
1022     }
1023     m_p->release_frontend(package);
1024 }
1025
1026 static mp::filter::Base* filter_creator()
1027 {
1028     return new mp::filter::SessionShared;
1029 }
1030
1031 extern "C" {
1032     struct metaproxy_1_filter_struct metaproxy_1_filter_session_shared = {
1033         0,
1034         "session_shared",
1035         filter_creator
1036     };
1037 }
1038
1039 /*
1040  * Local variables:
1041  * c-basic-offset: 4
1042  * indent-tabs-mode: nil
1043  * c-file-style: "stroustrup"
1044  * End:
1045  * vim: shiftwidth=4 tabstop=8 expandtab
1046  */