session_shared: only relay init for first request
[metaproxy-moved-to-github.git] / src / filter_session_shared.cpp
1 /* This file is part of Metaproxy.
2    Copyright (C) 2005-2012 Index Data
3
4 Metaproxy is free software; you can redistribute it and/or modify it under
5 the terms of the GNU General Public License as published by the Free
6 Software Foundation; either version 2, or (at your option) any later
7 version.
8
9 Metaproxy is distributed in the hope that it will be useful, but WITHOUT ANY
10 WARRANTY; without even the implied warranty of MERCHANTABILITY or
11 FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
12 for more details.
13
14 You should have received a copy of the GNU General Public License
15 along with this program; if not, write to the Free Software
16 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
17 */
18
19 #include "config.hpp"
20
21 #include <metaproxy/filter.hpp>
22 #include <metaproxy/package.hpp>
23
24 #include <boost/thread/mutex.hpp>
25 #include <boost/thread/condition.hpp>
26 #include <boost/thread/thread.hpp>
27 #include <boost/thread/xtime.hpp>
28 #include <boost/shared_ptr.hpp>
29 #include <boost/format.hpp>
30
31 #include <metaproxy/util.hpp>
32 #include "filter_session_shared.hpp"
33
34 #include <yaz/log.h>
35 #include <yaz/zgdu.h>
36 #include <yaz/otherinfo.h>
37 #include <yaz/diagbib1.h>
38 #include <yazpp/z-query.h>
39 #include <yazpp/record-cache.h>
40 #include <map>
41 #include <iostream>
42 #include <time.h>
43
44 namespace mp = metaproxy_1;
45 namespace yf = metaproxy_1::filter;
46
47 namespace metaproxy_1 {
48
49     namespace filter {
50         // key for session.. We'll only share sessions with same InitKey
51         class SessionShared::InitKey {
52         public:
53             bool operator < (const SessionShared::InitKey &k) const;
54             InitKey(Z_InitRequest *req);
55             InitKey(const InitKey &);
56             ~InitKey();
57         private:
58             char *m_idAuthentication_buf;
59             int m_idAuthentication_size;
60             char *m_otherInfo_buf;
61             int m_otherInfo_size;
62             ODR m_odr;
63         };
64         // worker thread .. for expiry of sessions
65         class SessionShared::Worker {
66         public:
67             Worker(SessionShared::Rep *rep);
68             void operator() (void);
69         private:
70             SessionShared::Rep *m_p;
71         };
72         // backend result set
73         class SessionShared::BackendSet {
74         public:
75             std::string m_result_set_id;
76             Databases m_databases;
77             int m_result_set_size;
78             yazpp_1::Yaz_Z_Query m_query;
79             time_t m_time_last_use;
80             void timestamp();
81             yazpp_1::RecordCache m_record_cache;
82             BackendSet(
83                 const std::string &result_set_id,
84                 const Databases &databases,
85                 const yazpp_1::Yaz_Z_Query &query);
86             bool search(
87                 Package &frontend_package,
88                 Package &search_package,
89                 const Z_APDU *apdu_req,
90                 const BackendInstancePtr bp,
91                 Z_Records **z_records);
92         };
93         // backend connection instance
94         class SessionShared::BackendInstance {
95             friend class Rep;
96             friend class BackendClass;
97             friend class BackendSet;
98         public:
99             mp::Session m_session;
100             BackendSetList m_sets;
101             bool m_in_use;
102             int m_sequence_this;
103             int m_result_set_sequence;
104             time_t m_time_last_use;
105             mp::Package * m_close_package;
106             ~BackendInstance();
107             void timestamp();
108         };
109         // backends of some class (all with same InitKey)
110         class SessionShared::BackendClass : boost::noncopyable {
111             friend class Rep;
112             friend struct Frontend;
113             bool m_named_result_sets;
114             BackendInstanceList m_backend_list;
115             BackendInstancePtr create_backend(const Package &package);
116             void remove_backend(BackendInstancePtr b);
117             BackendInstancePtr get_backend(const Package &package);
118             void use_backend(BackendInstancePtr b);
119             void release_backend(BackendInstancePtr b);
120             void expire_class();
121             yazpp_1::GDU m_init_request;
122             yazpp_1::GDU m_init_response;
123             boost::mutex m_mutex_backend_class;
124             int m_sequence_top;
125             time_t m_backend_set_ttl;
126             time_t m_backend_expiry_ttl;
127             size_t m_backend_set_max;
128         public:
129             BackendClass(const yazpp_1::GDU &init_request,
130                          int resultset_ttl,
131                          int resultset_max,
132                          int session_ttl);
133             ~BackendClass();
134         };
135         // frontend result set
136         class SessionShared::FrontendSet {
137             Databases m_databases;
138             yazpp_1::Yaz_Z_Query m_query;
139         public:
140             const Databases &get_databases();
141             const yazpp_1::Yaz_Z_Query &get_query();
142             FrontendSet(
143                 const Databases &databases,
144                 const yazpp_1::Yaz_Z_Query &query);
145             FrontendSet();
146         };
147         // frontend session
148         struct SessionShared::Frontend {
149             Frontend(Rep *rep);
150             ~Frontend();
151             bool m_is_virtual;
152             bool m_in_use;
153             Z_Options m_init_options;
154             void search(Package &package, Z_APDU *apdu);
155             void present(Package &package, Z_APDU *apdu);
156             void scan(Package &package, Z_APDU *apdu);
157
158             void get_set(mp::Package &package,
159                          const Z_APDU *apdu_req,
160                          const Databases &databases,
161                          yazpp_1::Yaz_Z_Query &query,
162                          BackendInstancePtr &found_backend,
163                          BackendSetPtr &found_set);
164             void override_set(BackendInstancePtr &found_backend,
165                               std::string &result_set_id,
166                               const Databases &databases,
167                               bool out_of_sessions);
168
169             Rep *m_p;
170             BackendClassPtr m_backend_class;
171             FrontendSets m_frontend_sets;
172         };            
173         // representation
174         class SessionShared::Rep {
175             friend class SessionShared;
176             friend struct Frontend;
177             
178             FrontendPtr get_frontend(Package &package);
179             void release_frontend(Package &package);
180             Rep();
181         public:
182             void expire();
183         private:
184             void init(Package &package, const Z_GDU *gdu,
185                       FrontendPtr frontend);
186             void start();
187             boost::mutex m_mutex;
188             boost::condition m_cond_session_ready;
189             std::map<mp::Session, FrontendPtr> m_clients;
190
191             BackendClassMap m_backend_map;
192             boost::mutex m_mutex_backend_map;
193             boost::thread_group m_thrds;
194             int m_resultset_ttl;
195             int m_resultset_max;
196             int m_session_ttl;
197             bool m_optimize_search;
198             int m_session_max;
199         };
200     }
201 }
202
203 yf::SessionShared::FrontendSet::FrontendSet(
204     const Databases &databases,
205     const yazpp_1::Yaz_Z_Query &query)
206     : m_databases(databases), m_query(query)
207 {
208 }
209
210 const yf::SessionShared::Databases & 
211 yf::SessionShared::FrontendSet::get_databases()
212 {
213     return m_databases;
214 }
215
216 const yazpp_1::Yaz_Z_Query& yf::SessionShared::FrontendSet::get_query()
217 {
218     return m_query;
219 }
220
221 yf::SessionShared::InitKey::InitKey(const InitKey &k)
222 {
223     m_odr = odr_createmem(ODR_ENCODE);
224     
225     m_idAuthentication_size =  k.m_idAuthentication_size;
226     m_idAuthentication_buf = (char*)odr_malloc(m_odr, m_idAuthentication_size);
227     memcpy(m_idAuthentication_buf, k.m_idAuthentication_buf,
228            m_idAuthentication_size);
229
230     m_otherInfo_size =  k.m_otherInfo_size;
231     m_otherInfo_buf = (char*)odr_malloc(m_odr, m_otherInfo_size);
232     memcpy(m_otherInfo_buf, k.m_otherInfo_buf,
233            m_otherInfo_size);
234 }
235
236 yf::SessionShared::InitKey::InitKey(Z_InitRequest *req)
237 {
238     m_odr = odr_createmem(ODR_ENCODE);
239
240     Z_IdAuthentication *t = req->idAuthentication;
241     z_IdAuthentication(m_odr, &t, 1, 0);
242     m_idAuthentication_buf =
243         odr_getbuf(m_odr, &m_idAuthentication_size, 0);
244
245     Z_OtherInformation *o = req->otherInfo;
246     z_OtherInformation(m_odr, &o, 1, 0);
247     m_otherInfo_buf = odr_getbuf(m_odr, &m_otherInfo_size, 0);
248 }
249
250 yf::SessionShared::InitKey::~InitKey()
251 {
252     odr_destroy(m_odr);
253 }
254
255 bool yf::SessionShared::InitKey::operator < (const SessionShared::InitKey &k)
256     const 
257 {
258     int c;
259     c = mp::util::memcmp2(
260         (void*) m_idAuthentication_buf, m_idAuthentication_size,
261         (void*) k.m_idAuthentication_buf, k.m_idAuthentication_size);
262     if (c < 0)
263         return true;
264     else if (c > 0)
265         return false;
266
267     c = mp::util::memcmp2((void*) m_otherInfo_buf, m_otherInfo_size,
268                           (void*) k.m_otherInfo_buf, k.m_otherInfo_size);
269     if (c < 0)
270         return true;
271     else if (c > 0)
272         return false;
273     return false;
274 }
275
276 void yf::SessionShared::BackendClass::release_backend(BackendInstancePtr b)
277 {
278     boost::mutex::scoped_lock lock(m_mutex_backend_class);
279     b->m_in_use = false;
280 }
281
282
283 void yf::SessionShared::BackendClass::remove_backend(BackendInstancePtr b)
284 {
285     BackendInstanceList::iterator it = m_backend_list.begin();
286     
287     while (it != m_backend_list.end())
288     {
289         if (*it == b)
290         {
291              mp::odr odr;
292             (*it)->m_close_package->response() = odr.create_close(
293                 0, Z_Close_lackOfActivity, 0);
294             (*it)->m_close_package->session().close();
295             (*it)->m_close_package->move();
296             
297             it = m_backend_list.erase(it);
298         }
299         else
300             it++;
301     }
302 }
303
304
305
306 yf::SessionShared::BackendInstancePtr 
307 yf::SessionShared::BackendClass::get_backend(
308     const mp::Package &frontend_package)
309 {
310     {
311         boost::mutex::scoped_lock lock(m_mutex_backend_class);
312         
313         BackendInstanceList::const_iterator it = m_backend_list.begin();
314         
315         BackendInstancePtr backend1; // null
316         
317         for (; it != m_backend_list.end(); it++)
318         {
319             if (!(*it)->m_in_use)
320             {
321                 if (!backend1 
322                     || (*it)->m_sequence_this < backend1->m_sequence_this)
323                     backend1 = *it;
324             }
325         }
326         if (backend1)
327         {
328             use_backend(backend1);
329             return backend1;
330         }
331     }
332     return create_backend(frontend_package);
333 }
334
335 void yf::SessionShared::BackendClass::use_backend(BackendInstancePtr backend)
336 {
337     backend->m_in_use = true;
338     backend->m_sequence_this = m_sequence_top++;
339 }
340
341 void yf::SessionShared::BackendInstance::timestamp()
342 {
343     assert(m_in_use);
344     time(&m_time_last_use);
345 }
346
347 yf::SessionShared::BackendInstance::~BackendInstance()
348 {
349     delete m_close_package;
350 }
351
352 yf::SessionShared::BackendInstancePtr yf::SessionShared::BackendClass::create_backend(
353     const mp::Package &frontend_package)
354 {
355     BackendInstancePtr bp(new BackendInstance);
356     BackendInstancePtr null;
357
358     bp->m_close_package =
359         new mp::Package(bp->m_session, frontend_package.origin());
360     bp->m_close_package->copy_filter(frontend_package);
361
362     Package init_package(bp->m_session, frontend_package.origin());
363
364     init_package.copy_filter(frontend_package);
365
366     yazpp_1::GDU actual_init_request = m_init_request;
367     Z_GDU *init_pdu = actual_init_request.get();
368
369     assert(init_pdu->which == Z_GDU_Z3950);
370     assert(init_pdu->u.z3950->which == Z_APDU_initRequest);
371
372     Z_InitRequest *req = init_pdu->u.z3950->u.initRequest;
373     ODR_MASK_ZERO(req->options);
374
375     ODR_MASK_SET(req->options, Z_Options_search);
376     ODR_MASK_SET(req->options, Z_Options_present);
377     ODR_MASK_SET(req->options, Z_Options_namedResultSets);
378     ODR_MASK_SET(req->options, Z_Options_scan);
379
380     ODR_MASK_SET(req->protocolVersion, Z_ProtocolVersion_1);
381     ODR_MASK_SET(req->protocolVersion, Z_ProtocolVersion_2);
382     ODR_MASK_SET(req->protocolVersion, Z_ProtocolVersion_3);
383
384     init_package.request() = init_pdu;
385
386     init_package.move();
387
388     boost::mutex::scoped_lock lock(m_mutex_backend_class);
389
390     m_named_result_sets = false;
391     Z_GDU *gdu = init_package.response().get();
392     if (init_package.session().is_closed())
393     {
394         /* already closed. We don't know why */
395         return null;
396     }
397     else if (gdu && gdu->which == Z_GDU_Z3950 
398              && gdu->u.z3950->which == Z_APDU_initResponse
399              && *gdu->u.z3950->u.initResponse->result)
400     {
401         /* successful init response */
402         Z_InitResponse *res = gdu->u.z3950->u.initResponse;
403         m_init_response = gdu->u.z3950;
404         if (ODR_MASK_GET(res->options, Z_Options_namedResultSets))
405         {
406             m_named_result_sets = true;
407         }
408     }
409     else
410     {
411         /* not init or init rejected */
412         init_package.copy_filter(frontend_package);
413         init_package.session().close();
414         init_package.move();
415         return null;
416     }
417     bp->m_in_use = true;
418     time(&bp->m_time_last_use);
419     bp->m_sequence_this = 0;
420     bp->m_result_set_sequence = 0;
421     m_backend_list.push_back(bp);
422
423     return bp;
424 }
425
426
427 yf::SessionShared::BackendClass::BackendClass(const yazpp_1::GDU &init_request,
428                                               int resultset_ttl,
429                                               int resultset_max,
430                                               int session_ttl)
431     : m_named_result_sets(false), m_init_request(init_request),
432       m_sequence_top(0), m_backend_set_ttl(resultset_ttl),
433       m_backend_expiry_ttl(session_ttl), m_backend_set_max(resultset_max)
434 {}
435
436 yf::SessionShared::BackendClass::~BackendClass()
437 {}
438
439 void yf::SessionShared::Rep::init(mp::Package &package, const Z_GDU *gdu,
440                                   FrontendPtr frontend)
441 {
442     Z_InitRequest *req = gdu->u.z3950->u.initRequest;
443
444     frontend->m_is_virtual = true;
445     frontend->m_init_options = *req->options;
446     InitKey k(req);
447     {
448         boost::mutex::scoped_lock lock(m_mutex_backend_map);
449         BackendClassMap::const_iterator it;
450         it = m_backend_map.find(k);
451         if (it == m_backend_map.end())
452         {
453             BackendClassPtr b(new BackendClass(gdu->u.z3950,
454                                                m_resultset_ttl,
455                                                m_resultset_max,
456                                                m_session_ttl));
457             m_backend_map[k] = b;
458             frontend->m_backend_class = b;
459         }
460         else
461         {
462             frontend->m_backend_class = it->second;            
463         }
464     }
465     BackendClassPtr bc = frontend->m_backend_class;
466     BackendInstancePtr backend;
467     mp::odr odr;
468
469     // we only need to get init response from "first" target in
470     // backend class - the assumption being that init response is
471     // same for all
472     if (bc->m_init_response.get() == 0)
473     {
474         backend = bc->get_backend(package);
475     }
476     {
477         boost::mutex::scoped_lock lock(bc->m_mutex_backend_class);
478         if (bc->m_init_response.get() == 0)
479         {
480             Z_APDU *apdu = odr.create_initResponse(gdu->u.z3950, 0, 0);
481             *apdu->u.initResponse->result = 0;
482             package.response() = apdu;
483             package.session().close();
484         }
485         else
486         {
487             yazpp_1::GDU init_response = bc->m_init_response;
488             Z_GDU *response_gdu = init_response.get();
489             mp::util::transfer_referenceId(odr, gdu->u.z3950,
490                                            response_gdu->u.z3950);
491             
492             Z_Options *server_options =
493                 response_gdu->u.z3950->u.initResponse->options;
494             Z_Options *client_options = &frontend->m_init_options;
495             
496             int i;
497             for (i = 0; i < 30; i++)
498                 if (!ODR_MASK_GET(client_options, i))
499                     ODR_MASK_CLEAR(server_options, i);
500             package.response() = init_response;
501         }
502     }
503     if (backend)
504         bc->release_backend(backend);
505 }
506
507 void yf::SessionShared::BackendSet::timestamp()
508 {
509     time(&m_time_last_use);
510 }
511
512 yf::SessionShared::BackendSet::BackendSet(
513     const std::string &result_set_id,
514     const Databases &databases,
515     const yazpp_1::Yaz_Z_Query &query) :
516     m_result_set_id(result_set_id),
517     m_databases(databases), m_result_set_size(0), m_query(query) 
518 {
519     timestamp();
520 }
521
522 static int get_diagnostic(Z_DefaultDiagFormat *r)
523 {
524     return *r->condition;
525 }
526
527 bool yf::SessionShared::BackendSet::search(
528     mp::Package &frontend_package,
529     mp::Package &search_package,
530     const Z_APDU *frontend_apdu,
531     const BackendInstancePtr bp,
532     Z_Records **z_records)
533 {
534     mp::odr odr;
535     Z_APDU *apdu_req = zget_APDU(odr, Z_APDU_searchRequest);
536     Z_SearchRequest *req = apdu_req->u.searchRequest;
537
538     req->resultSetName = odr_strdup(odr, m_result_set_id.c_str());
539     req->query = m_query.get_Z_Query();
540
541     req->num_databaseNames = m_databases.size();
542     req->databaseNames = (char**) 
543         odr_malloc(odr, req->num_databaseNames * sizeof(char *));
544     Databases::const_iterator it = m_databases.begin();
545     size_t i = 0;
546     for (; it != m_databases.end(); it++)
547         req->databaseNames[i++] = odr_strdup(odr, it->c_str());
548
549     if (frontend_apdu->which == Z_APDU_searchRequest)
550         req->preferredRecordSyntax =
551             frontend_apdu->u.searchRequest->preferredRecordSyntax;
552
553     search_package.request() = apdu_req;
554
555     search_package.move();
556
557     Z_GDU *gdu = search_package.response().get();
558     if (!search_package.session().is_closed()
559         && gdu && gdu->which == Z_GDU_Z3950 
560         && gdu->u.z3950->which == Z_APDU_searchResponse)
561     {
562         Z_SearchResponse *b_resp = gdu->u.z3950->u.searchResponse;
563         *z_records = b_resp->records;
564         m_result_set_size = *b_resp->resultCount;
565         return true;
566     }
567     Z_APDU *f_apdu = 0;
568     if (frontend_apdu->which == Z_APDU_searchRequest)
569         f_apdu = odr.create_searchResponse(
570             frontend_apdu, YAZ_BIB1_TEMPORARY_SYSTEM_ERROR, 0);
571     else if (frontend_apdu->which == Z_APDU_presentRequest)
572         f_apdu = odr.create_presentResponse(
573             frontend_apdu, YAZ_BIB1_TEMPORARY_SYSTEM_ERROR, 0);
574     else
575         f_apdu = odr.create_close(
576             frontend_apdu, YAZ_BIB1_TEMPORARY_SYSTEM_ERROR, 0);
577     frontend_package.response() = f_apdu;
578     return false;
579 }
580
581 void yf::SessionShared::Frontend::override_set(
582     BackendInstancePtr &found_backend,
583     std::string &result_set_id,
584     const Databases &databases,
585     bool out_of_sessions)
586 {
587     BackendClassPtr bc = m_backend_class;
588     BackendInstanceList::const_iterator it = bc->m_backend_list.begin();
589     time_t now;
590     time(&now);
591
592     size_t max_sets = bc->m_named_result_sets ? bc->m_backend_set_max : 1;
593     for (; it != bc->m_backend_list.end(); it++)
594     {
595         if (!(*it)->m_in_use)
596         {
597             BackendSetList::iterator set_it = (*it)->m_sets.begin();
598             for (; set_it != (*it)->m_sets.end(); set_it++)
599             {
600                 if ((max_sets > 1 || (*set_it)->m_databases == databases)
601                     &&
602                     (out_of_sessions ||
603                      now < (*set_it)->m_time_last_use ||
604                      now - (*set_it)->m_time_last_use >= bc->m_backend_set_ttl))
605                 {
606                     found_backend = *it;
607                     result_set_id = (*set_it)->m_result_set_id;
608                     found_backend->m_sets.erase(set_it);
609                     return;
610                 }
611             }
612         }
613     }
614     for (it = bc->m_backend_list.begin(); it != bc->m_backend_list.end(); it++)
615     {
616         if (!(*it)->m_in_use && (*it)->m_sets.size() < max_sets)
617         {
618             found_backend = *it;
619             if (bc->m_named_result_sets)
620             {
621                 result_set_id = boost::io::str(
622                     boost::format("%1%") % 
623                     found_backend->m_result_set_sequence);
624                 found_backend->m_result_set_sequence++;
625             }
626             else
627                 result_set_id = "default";
628             return;
629         }
630     }
631 }
632
633 void yf::SessionShared::Frontend::get_set(mp::Package &package,
634                                           const Z_APDU *apdu_req,
635                                           const Databases &databases,
636                                           yazpp_1::Yaz_Z_Query &query,
637                                           BackendInstancePtr &found_backend,
638                                           BackendSetPtr &found_set)
639 {
640     bool session_restarted = false;
641
642 restart:
643     std::string result_set_id;
644     bool out_of_sessions = false;
645     BackendClassPtr bc = m_backend_class;
646     {
647         boost::mutex::scoped_lock lock(bc->m_mutex_backend_class);
648
649         if ((int) bc->m_backend_list.size() >= m_p->m_session_max)
650             out_of_sessions = true;
651         
652         if (m_p->m_optimize_search)
653         {
654             // look at each backend and see if we have a similar search
655             BackendInstanceList::const_iterator it = bc->m_backend_list.begin();
656             for (; it != bc->m_backend_list.end(); it++)
657             {
658                 if (!(*it)->m_in_use)
659                 {
660                     BackendSetList::const_iterator set_it = (*it)->m_sets.begin();
661                     for (; set_it != (*it)->m_sets.end(); set_it++)
662                     {
663                         if ((*set_it)->m_databases == databases
664                             && query.match(&(*set_it)->m_query))
665                         {
666                             found_set = *set_it;
667                             found_backend = *it;
668                             bc->use_backend(found_backend);
669                             found_set->timestamp();
670                             // found matching set. No need to search again
671                             return;
672                         }
673                     }
674                 }
675             }
676         }
677         override_set(found_backend, result_set_id, databases, out_of_sessions);
678         if (found_backend)
679             bc->use_backend(found_backend);
680     }
681     if (!found_backend)
682     {
683         // create a new backend set (and new set) if we're not out of sessions
684         if (!out_of_sessions)
685             found_backend = bc->create_backend(package);
686
687         if (!found_backend)
688         {
689             Z_APDU *f_apdu = 0;
690             mp::odr odr;
691             const char *addinfo = 0;
692             
693             if (out_of_sessions)
694                 addinfo = "session_shared: all sessions in use";
695             if (apdu_req->which == Z_APDU_searchRequest)
696             {
697                 f_apdu = odr.create_searchResponse(
698                     apdu_req, YAZ_BIB1_TEMPORARY_SYSTEM_ERROR, addinfo);
699             }
700             else if (apdu_req->which == Z_APDU_presentRequest)
701             {
702                 f_apdu = odr.create_presentResponse(
703                     apdu_req, YAZ_BIB1_TEMPORARY_SYSTEM_ERROR, addinfo);
704             }
705             else
706             {
707                 f_apdu = odr.create_close(
708                     apdu_req, YAZ_BIB1_TEMPORARY_SYSTEM_ERROR, addinfo);
709             }
710             package.response() = f_apdu;
711             return;
712         }
713         if (bc->m_named_result_sets)
714         {
715             result_set_id = boost::io::str(
716                 boost::format("%1%") % found_backend->m_result_set_sequence);
717             found_backend->m_result_set_sequence++;
718         }
719         else
720             result_set_id = "default";
721     }
722     found_backend->timestamp();
723
724     // we must search ...
725     BackendSetPtr new_set(new BackendSet(result_set_id,
726                                          databases, query));
727     Z_Records *z_records = 0;
728
729     Package search_package(found_backend->m_session, package.origin());
730     search_package.copy_filter(package);
731
732     if (!new_set->search(package, search_package,
733                          apdu_req, found_backend, &z_records))
734     {
735         bc->remove_backend(found_backend);
736         return; // search error 
737     }
738
739     if (z_records)
740     {
741         int condition = 0;
742         if (z_records->which == Z_Records_NSD)
743         {
744             condition =
745                 get_diagnostic(z_records->u.nonSurrogateDiagnostic);
746         }
747         else if (z_records->which == Z_Records_multipleNSD)
748         {
749             if (z_records->u.multipleNonSurDiagnostics->num_diagRecs >= 1
750                 && 
751                 
752                 z_records->u.multipleNonSurDiagnostics->diagRecs[0]->which ==
753                 Z_DiagRec_defaultFormat)
754             {
755                 condition = get_diagnostic(
756                     z_records->u.multipleNonSurDiagnostics->diagRecs[0]->u.defaultFormat);
757                 
758             }
759         }
760         if (!session_restarted &&
761             condition == YAZ_BIB1_TEMPORARY_SYSTEM_ERROR)
762         {
763             bc->remove_backend(found_backend);
764             session_restarted = true;
765             found_backend.reset();
766             goto restart;
767
768         }
769
770         if (condition)
771         {
772             mp::odr odr;
773             if (apdu_req->which == Z_APDU_searchRequest)
774             {
775                 Z_APDU *f_apdu = odr.create_searchResponse(apdu_req, 
776                                                            0, 0);
777                 Z_SearchResponse *f_resp = f_apdu->u.searchResponse;
778                 *f_resp->searchStatus = Z_SearchResponse_none;
779                 f_resp->records = z_records;
780                 package.response() = f_apdu;
781             }
782             if (apdu_req->which == Z_APDU_presentRequest)
783             {
784                 Z_APDU *f_apdu = odr.create_presentResponse(apdu_req, 
785                                                             0, 0);
786                 Z_PresentResponse *f_resp = f_apdu->u.presentResponse;
787                 f_resp->records = z_records;
788                 package.response() = f_apdu;
789             }
790             bc->release_backend(found_backend);
791             return; // search error 
792         }
793     }
794     if (!session_restarted && new_set->m_result_set_size < 0)
795     {
796         bc->remove_backend(found_backend);
797         session_restarted = true;
798         found_backend.reset();
799         goto restart;
800     }
801
802     found_set = new_set;
803     found_set->timestamp();
804     found_backend->m_sets.push_back(found_set);
805 }
806
807 void yf::SessionShared::Frontend::search(mp::Package &package,
808                                          Z_APDU *apdu_req)
809 {
810     Z_SearchRequest *req = apdu_req->u.searchRequest;
811     FrontendSets::iterator fset_it = 
812         m_frontend_sets.find(req->resultSetName);
813     if (fset_it != m_frontend_sets.end())
814     {
815         // result set already exist 
816         // if replace indicator is off: we return diagnostic if
817         // result set already exist.
818         if (*req->replaceIndicator == 0)
819         {
820             mp::odr odr;
821             Z_APDU *apdu = 
822                 odr.create_searchResponse(
823                     apdu_req,
824                     YAZ_BIB1_RESULT_SET_EXISTS_AND_REPLACE_INDICATOR_OFF,
825                     0);
826             package.response() = apdu;
827             
828             return;
829         }
830         m_frontend_sets.erase(fset_it);
831     }
832     
833     yazpp_1::Yaz_Z_Query query;
834     query.set_Z_Query(req->query);
835     Databases databases;
836     int i;
837     for (i = 0; i < req->num_databaseNames; i++)
838         databases.push_back(req->databaseNames[i]);
839
840     BackendSetPtr found_set; // null
841     BackendInstancePtr found_backend; // null
842
843     get_set(package, apdu_req, databases, query, found_backend, found_set);
844     if (!found_set)
845         return;
846
847     mp::odr odr;
848     Z_APDU *f_apdu = odr.create_searchResponse(apdu_req, 0, 0);
849     Z_SearchResponse *f_resp = f_apdu->u.searchResponse;
850     *f_resp->resultCount = found_set->m_result_set_size;
851     package.response() = f_apdu;
852
853     FrontendSetPtr fset(new FrontendSet(databases, query));
854     m_frontend_sets[req->resultSetName] = fset;
855
856     m_backend_class->release_backend(found_backend);
857 }
858
859 void yf::SessionShared::Frontend::present(mp::Package &package,
860                                           Z_APDU *apdu_req)
861 {
862     mp::odr odr;
863     Z_PresentRequest *req = apdu_req->u.presentRequest;
864
865     FrontendSets::iterator fset_it = 
866         m_frontend_sets.find(req->resultSetId);
867
868     if (fset_it == m_frontend_sets.end())
869     {
870         Z_APDU *apdu = 
871             odr.create_presentResponse(
872                 apdu_req,
873                 YAZ_BIB1_SPECIFIED_RESULT_SET_DOES_NOT_EXIST,
874                 req->resultSetId);
875         package.response() = apdu;
876         return;
877     }
878     FrontendSetPtr fset = fset_it->second;
879
880     Databases databases = fset->get_databases();
881     yazpp_1::Yaz_Z_Query query = fset->get_query();
882
883     BackendClassPtr bc = m_backend_class;
884     BackendSetPtr found_set; // null
885     BackendInstancePtr found_backend;
886
887     get_set(package, apdu_req, databases, query, found_backend, found_set);
888     if (!found_set)
889         return;
890
891     Z_NamePlusRecordList *npr_res = 0;
892     if (found_set->m_record_cache.lookup(odr, &npr_res, 
893                                          *req->resultSetStartPoint,
894                                          *req->numberOfRecordsRequested,
895                                          req->preferredRecordSyntax,
896                                          req->recordComposition))
897     {
898         Z_APDU *f_apdu_res = odr.create_presentResponse(apdu_req, 0, 0);
899         Z_PresentResponse *f_resp = f_apdu_res->u.presentResponse;
900
901         yaz_log(YLOG_LOG, "Found " ODR_INT_PRINTF "+" ODR_INT_PRINTF 
902                 " records in cache %p",
903                 *req->resultSetStartPoint,                      
904                 *req->numberOfRecordsRequested,
905                 &found_set->m_record_cache);        
906
907         *f_resp->numberOfRecordsReturned = *req->numberOfRecordsRequested;
908         *f_resp->nextResultSetPosition = 
909             *req->resultSetStartPoint + *req->numberOfRecordsRequested;
910         // f_resp->presentStatus assumed OK.
911         f_resp->records = (Z_Records *) odr_malloc(odr, sizeof(Z_Records));
912         f_resp->records->which = Z_Records_DBOSD;
913         f_resp->records->u.databaseOrSurDiagnostics = npr_res;
914         package.response() = f_apdu_res;
915         bc->release_backend(found_backend);
916         return;
917     }
918
919     found_backend->timestamp();
920                               
921     Z_APDU *p_apdu = zget_APDU(odr, Z_APDU_presentRequest);
922     Z_PresentRequest *p_req = p_apdu->u.presentRequest;
923     p_req->preferredRecordSyntax = req->preferredRecordSyntax;
924     p_req->resultSetId = odr_strdup(odr, found_set->m_result_set_id.c_str());
925     *p_req->resultSetStartPoint = *req->resultSetStartPoint;
926     *p_req->numberOfRecordsRequested = *req->numberOfRecordsRequested;
927     p_req->preferredRecordSyntax = req->preferredRecordSyntax;
928     p_req->recordComposition = req->recordComposition;
929
930     Package present_package(found_backend->m_session, package.origin());
931     present_package.copy_filter(package);
932
933     present_package.request() = p_apdu;
934
935     present_package.move();
936
937     Z_GDU *gdu = present_package.response().get();
938     if (!present_package.session().is_closed()
939         && gdu && gdu->which == Z_GDU_Z3950 
940         && gdu->u.z3950->which == Z_APDU_presentResponse)
941     {
942         Z_PresentResponse *b_resp = gdu->u.z3950->u.presentResponse;
943         Z_APDU *f_apdu_res = odr.create_presentResponse(apdu_req, 0, 0);
944         Z_PresentResponse *f_resp = f_apdu_res->u.presentResponse;
945
946         f_resp->numberOfRecordsReturned = b_resp->numberOfRecordsReturned;
947         f_resp->nextResultSetPosition = b_resp->nextResultSetPosition;
948         f_resp->presentStatus= b_resp->presentStatus;
949         f_resp->records = b_resp->records;
950         f_resp->otherInfo = b_resp->otherInfo;
951         package.response() = f_apdu_res;
952
953         if (b_resp->records && b_resp->records->which ==  Z_Records_DBOSD)
954         {
955             yaz_log(YLOG_LOG, "Adding " ODR_INT_PRINTF "+" ODR_INT_PRINTF
956                     " records to cache %p",
957                     *req->resultSetStartPoint,                      
958                     *f_resp->numberOfRecordsReturned,
959                     &found_set->m_record_cache);        
960             found_set->m_record_cache.add(
961                 odr,
962                 b_resp->records->u.databaseOrSurDiagnostics,
963                 *req->resultSetStartPoint,                      
964                 *f_resp->numberOfRecordsReturned);
965         }
966         bc->release_backend(found_backend);
967     }
968     else
969     {
970         bc->remove_backend(found_backend);
971         Z_APDU *f_apdu_res = 
972             odr.create_presentResponse(
973                 apdu_req, YAZ_BIB1_TEMPORARY_SYSTEM_ERROR, 0);
974         package.response() = f_apdu_res;
975     }
976 }
977
978 void yf::SessionShared::Frontend::scan(mp::Package &frontend_package,
979                                        Z_APDU *apdu_req)
980 {
981     BackendClassPtr bc = m_backend_class;
982     BackendInstancePtr backend = bc->get_backend(frontend_package);
983     if (!backend)
984     {
985         mp::odr odr;
986         Z_APDU *apdu = odr.create_scanResponse(
987             apdu_req, YAZ_BIB1_TEMPORARY_SYSTEM_ERROR, 0);
988         frontend_package.response() = apdu;
989     }
990     else
991     {
992         Package scan_package(backend->m_session, frontend_package.origin());
993         backend->timestamp();
994         scan_package.copy_filter(frontend_package);
995         scan_package.request() = apdu_req;
996         scan_package.move();
997         frontend_package.response() = scan_package.response();
998         if (scan_package.session().is_closed())
999         {
1000             frontend_package.session().close();
1001             bc->remove_backend(backend);
1002         }
1003         else
1004             bc->release_backend(backend);
1005     }
1006 }
1007
1008 yf::SessionShared::Worker::Worker(SessionShared::Rep *rep) : m_p(rep)
1009 {
1010 }
1011
1012 void yf::SessionShared::Worker::operator() (void)
1013 {
1014     m_p->expire();
1015 }
1016
1017 void yf::SessionShared::BackendClass::expire_class()
1018 {
1019     time_t now;
1020     time(&now);
1021     boost::mutex::scoped_lock lock(m_mutex_backend_class);
1022     BackendInstanceList::iterator bit = m_backend_list.begin();
1023     while (bit != m_backend_list.end())
1024     {
1025         time_t last_use = (*bit)->m_time_last_use;
1026         
1027         if ((*bit)->m_in_use)
1028         {
1029             bit++;
1030         }
1031         else if (now < last_use || now - last_use > m_backend_expiry_ttl)
1032         {
1033             mp::odr odr;
1034             (*bit)->m_close_package->response() = odr.create_close(
1035                 0, Z_Close_lackOfActivity, 0);
1036             (*bit)->m_close_package->session().close();
1037             (*bit)->m_close_package->move();
1038
1039             bit = m_backend_list.erase(bit);
1040         }
1041         else
1042         {
1043             bit++;
1044         }
1045     }
1046 }
1047
1048 void yf::SessionShared::Rep::expire()
1049 {
1050     while (true)
1051     {
1052         boost::xtime xt;
1053         boost::xtime_get(&xt, boost::TIME_UTC);
1054         xt.sec += m_session_ttl / 3;
1055         boost::thread::sleep(xt);
1056         
1057         BackendClassMap::const_iterator b_it = m_backend_map.begin();
1058         for (; b_it != m_backend_map.end(); b_it++)
1059             b_it->second->expire_class();
1060     }
1061 }
1062
1063 yf::SessionShared::Rep::Rep()
1064 {
1065     m_resultset_ttl = 30;
1066     m_resultset_max = 10;
1067     m_session_ttl = 90;
1068     m_optimize_search = true;
1069     m_session_max = 100;
1070 }
1071
1072 void yf::SessionShared::Rep::start()
1073 {
1074     yf::SessionShared::Worker w(this);
1075     m_thrds.add_thread(new boost::thread(w));
1076 }
1077
1078 yf::SessionShared::SessionShared() : m_p(new SessionShared::Rep)
1079 {
1080 }
1081
1082 yf::SessionShared::~SessionShared() {
1083 }
1084
1085 void yf::SessionShared::start() const
1086 {
1087     m_p->start();
1088 }
1089
1090 yf::SessionShared::Frontend::Frontend(Rep *rep) : m_is_virtual(false), m_p(rep)
1091 {
1092 }
1093
1094 yf::SessionShared::Frontend::~Frontend()
1095 {
1096 }
1097
1098 yf::SessionShared::FrontendPtr yf::SessionShared::Rep::get_frontend(mp::Package &package)
1099 {
1100     boost::mutex::scoped_lock lock(m_mutex);
1101
1102     std::map<mp::Session,yf::SessionShared::FrontendPtr>::iterator it;
1103     
1104     while(true)
1105     {
1106         it = m_clients.find(package.session());
1107         if (it == m_clients.end())
1108             break;
1109         
1110         if (!it->second->m_in_use)
1111         {
1112             it->second->m_in_use = true;
1113             return it->second;
1114         }
1115         m_cond_session_ready.wait(lock);
1116     }
1117     FrontendPtr f(new Frontend(this));
1118     m_clients[package.session()] = f;
1119     f->m_in_use = true;
1120     return f;
1121 }
1122
1123 void yf::SessionShared::Rep::release_frontend(mp::Package &package)
1124 {
1125     boost::mutex::scoped_lock lock(m_mutex);
1126     std::map<mp::Session,yf::SessionShared::FrontendPtr>::iterator it;
1127     
1128     it = m_clients.find(package.session());
1129     if (it != m_clients.end())
1130     {
1131         if (package.session().is_closed())
1132         {
1133             m_clients.erase(it);
1134         }
1135         else
1136         {
1137             it->second->m_in_use = false;
1138         }
1139         m_cond_session_ready.notify_all();
1140     }
1141 }
1142
1143
1144 void yf::SessionShared::process(mp::Package &package) const
1145 {
1146     FrontendPtr f = m_p->get_frontend(package);
1147
1148     Z_GDU *gdu = package.request().get();
1149     
1150     if (gdu && gdu->which == Z_GDU_Z3950 && gdu->u.z3950->which ==
1151         Z_APDU_initRequest && !f->m_is_virtual)
1152     {
1153         m_p->init(package, gdu, f);
1154     }
1155     else if (!f->m_is_virtual)
1156         package.move();
1157     else if (gdu && gdu->which == Z_GDU_Z3950)
1158     {
1159         Z_APDU *apdu = gdu->u.z3950;
1160         if (apdu->which == Z_APDU_initRequest)
1161         {
1162             mp::odr odr;
1163             
1164             package.response() = odr.create_close(
1165                 apdu,
1166                 Z_Close_protocolError,
1167                 "double init");
1168             
1169             package.session().close();
1170         }
1171         else if (apdu->which == Z_APDU_close)
1172         {
1173             mp::odr odr;
1174             
1175             package.response() = odr.create_close(
1176                 apdu,
1177                 Z_Close_peerAbort, "received close from client");
1178             package.session().close();
1179         }
1180         else if (apdu->which == Z_APDU_searchRequest)
1181         {
1182             f->search(package, apdu);
1183         }
1184         else if (apdu->which == Z_APDU_presentRequest)
1185         {
1186             f->present(package, apdu);
1187         }
1188         else if (apdu->which == Z_APDU_scanRequest)
1189         {
1190             f->scan(package, apdu);
1191         }
1192         else
1193         {
1194             mp::odr odr;
1195             
1196             package.response() = odr.create_close(
1197                 apdu, Z_Close_protocolError,
1198                 "unsupported APDU in filter_session_shared");
1199             
1200             package.session().close();
1201         }
1202     }
1203     m_p->release_frontend(package);
1204 }
1205
1206 void yf::SessionShared::configure(const xmlNode *ptr, bool test_only,
1207                                   const char *path)
1208 {
1209     for (ptr = ptr->children; ptr; ptr = ptr->next)
1210     {
1211         if (ptr->type != XML_ELEMENT_NODE)
1212             continue;
1213         if (!strcmp((const char *) ptr->name, "resultset"))
1214         {
1215             const struct _xmlAttr *attr;
1216             for (attr = ptr->properties; attr; attr = attr->next)
1217             {
1218                 if (!strcmp((const char *) attr->name, "ttl"))
1219                     m_p->m_resultset_ttl = 
1220                         mp::xml::get_int(attr->children, 30);
1221                 else if (!strcmp((const char *) attr->name, "max"))
1222                 {
1223                     m_p->m_resultset_max = 
1224                         mp::xml::get_int(attr->children, 10);
1225                 }
1226                 else if (!strcmp((const char *) attr->name, "optimizesearch"))
1227                 {
1228                     m_p->m_optimize_search =
1229                         mp::xml::get_bool(attr->children, true);
1230                 }
1231                 else
1232                     throw mp::filter::FilterException(
1233                         "Bad attribute " + std::string((const char *)
1234                                                        attr->name));
1235             }
1236         }
1237         else if (!strcmp((const char *) ptr->name, "session"))
1238         {
1239             const struct _xmlAttr *attr;
1240             for (attr = ptr->properties; attr; attr = attr->next)
1241             {
1242                 if (!strcmp((const char *) attr->name, "ttl"))
1243                     m_p->m_session_ttl = 
1244                         mp::xml::get_int(attr->children, 90);
1245                 else if (!strcmp((const char *) attr->name, "max"))
1246                     m_p->m_session_max = 
1247                         mp::xml::get_int(attr->children, 100);
1248                 else
1249                     throw mp::filter::FilterException(
1250                         "Bad attribute " + std::string((const char *)
1251                                                        attr->name));
1252             }
1253         }
1254         else
1255         {
1256             throw mp::filter::FilterException("Bad element " 
1257                                                + std::string((const char *)
1258                                                              ptr->name));
1259         }
1260     }
1261 }
1262
1263 static mp::filter::Base* filter_creator()
1264 {
1265     return new mp::filter::SessionShared;
1266 }
1267
1268 extern "C" {
1269     struct metaproxy_1_filter_struct metaproxy_1_filter_session_shared = {
1270         0,
1271         "session_shared",
1272         filter_creator
1273     };
1274 }
1275
1276 /*
1277  * Local variables:
1278  * c-basic-offset: 4
1279  * c-file-style: "Stroustrup"
1280  * indent-tabs-mode: nil
1281  * End:
1282  * vim: shiftwidth=4 tabstop=8 expandtab
1283  */
1284