session_shared: allow max sessions to be given
[metaproxy-moved-to-github.git] / src / filter_session_shared.cpp
1 /* This file is part of Metaproxy.
2    Copyright (C) 2005-2012 Index Data
3
4 Metaproxy is free software; you can redistribute it and/or modify it under
5 the terms of the GNU General Public License as published by the Free
6 Software Foundation; either version 2, or (at your option) any later
7 version.
8
9 Metaproxy is distributed in the hope that it will be useful, but WITHOUT ANY
10 WARRANTY; without even the implied warranty of MERCHANTABILITY or
11 FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
12 for more details.
13
14 You should have received a copy of the GNU General Public License
15 along with this program; if not, write to the Free Software
16 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
17 */
18
19 #include "config.hpp"
20
21 #include <metaproxy/filter.hpp>
22 #include <metaproxy/package.hpp>
23
24 #include <boost/thread/mutex.hpp>
25 #include <boost/thread/condition.hpp>
26 #include <boost/thread/thread.hpp>
27 #include <boost/thread/xtime.hpp>
28 #include <boost/shared_ptr.hpp>
29 #include <boost/format.hpp>
30
31 #include <metaproxy/util.hpp>
32 #include "filter_session_shared.hpp"
33
34 #include <yaz/log.h>
35 #include <yaz/zgdu.h>
36 #include <yaz/otherinfo.h>
37 #include <yaz/diagbib1.h>
38 #include <yazpp/z-query.h>
39 #include <yazpp/record-cache.h>
40 #include <map>
41 #include <iostream>
42 #include <time.h>
43
44 namespace mp = metaproxy_1;
45 namespace yf = metaproxy_1::filter;
46
47 namespace metaproxy_1 {
48
49     namespace filter {
50         // key for session.. We'll only share sessions with same InitKey
51         class SessionShared::InitKey {
52         public:
53             bool operator < (const SessionShared::InitKey &k) const;
54             InitKey(Z_InitRequest *req);
55             InitKey(const InitKey &);
56             ~InitKey();
57         private:
58             char *m_idAuthentication_buf;
59             int m_idAuthentication_size;
60             char *m_otherInfo_buf;
61             int m_otherInfo_size;
62             ODR m_odr;
63         };
64         // worker thread .. for expiry of sessions
65         class SessionShared::Worker {
66         public:
67             Worker(SessionShared::Rep *rep);
68             void operator() (void);
69         private:
70             SessionShared::Rep *m_p;
71         };
72         // backend result set
73         class SessionShared::BackendSet {
74         public:
75             std::string m_result_set_id;
76             Databases m_databases;
77             int m_result_set_size;
78             yazpp_1::Yaz_Z_Query m_query;
79             time_t m_time_last_use;
80             void timestamp();
81             yazpp_1::RecordCache m_record_cache;
82             BackendSet(
83                 const std::string &result_set_id,
84                 const Databases &databases,
85                 const yazpp_1::Yaz_Z_Query &query);
86             bool search(
87                 Package &frontend_package,
88                 Package &search_package,
89                 const Z_APDU *apdu_req,
90                 const BackendInstancePtr bp,
91                 Z_Records **z_records);
92         };
93         // backend connection instance
94         class SessionShared::BackendInstance {
95             friend class Rep;
96             friend class BackendClass;
97             friend class BackendSet;
98         public:
99             mp::Session m_session;
100             BackendSetList m_sets;
101             bool m_in_use;
102             int m_sequence_this;
103             int m_result_set_sequence;
104             time_t m_time_last_use;
105             mp::Package * m_close_package;
106             ~BackendInstance();
107             void timestamp();
108         };
109         // backends of some class (all with same InitKey)
110         class SessionShared::BackendClass : boost::noncopyable {
111             friend class Rep;
112             friend struct Frontend;
113             bool m_named_result_sets;
114             BackendInstanceList m_backend_list;
115             BackendInstancePtr create_backend(const Package &package);
116             void remove_backend(BackendInstancePtr b);
117             BackendInstancePtr get_backend(const Package &package);
118             void use_backend(BackendInstancePtr b);
119             void release_backend(BackendInstancePtr b);
120             void expire_class();
121             yazpp_1::GDU m_init_request;
122             yazpp_1::GDU m_init_response;
123             boost::mutex m_mutex_backend_class;
124             int m_sequence_top;
125             time_t m_backend_set_ttl;
126             time_t m_backend_expiry_ttl;
127             size_t m_backend_set_max;
128         public:
129             BackendClass(const yazpp_1::GDU &init_request,
130                          int resultset_ttl,
131                          int resultset_max,
132                          int session_ttl);
133             ~BackendClass();
134         };
135         // frontend result set
136         class SessionShared::FrontendSet {
137             Databases m_databases;
138             yazpp_1::Yaz_Z_Query m_query;
139         public:
140             const Databases &get_databases();
141             const yazpp_1::Yaz_Z_Query &get_query();
142             FrontendSet(
143                 const Databases &databases,
144                 const yazpp_1::Yaz_Z_Query &query);
145             FrontendSet();
146         };
147         // frontend session
148         struct SessionShared::Frontend {
149             Frontend(Rep *rep);
150             ~Frontend();
151             bool m_is_virtual;
152             bool m_in_use;
153             Z_Options m_init_options;
154             void search(Package &package, Z_APDU *apdu);
155             void present(Package &package, Z_APDU *apdu);
156             void scan(Package &package, Z_APDU *apdu);
157
158             void get_set(mp::Package &package,
159                          const Z_APDU *apdu_req,
160                          const Databases &databases,
161                          yazpp_1::Yaz_Z_Query &query,
162                          BackendInstancePtr &found_backend,
163                          BackendSetPtr &found_set);
164             void override_set(BackendInstancePtr &found_backend,
165                               std::string &result_set_id,
166                               const Databases &databases,
167                               bool out_of_sessions);
168
169             Rep *m_p;
170             BackendClassPtr m_backend_class;
171             FrontendSets m_frontend_sets;
172         };            
173         // representation
174         class SessionShared::Rep {
175             friend class SessionShared;
176             friend struct Frontend;
177             
178             FrontendPtr get_frontend(Package &package);
179             void release_frontend(Package &package);
180             Rep();
181         public:
182             void expire();
183         private:
184             void init(Package &package, const Z_GDU *gdu,
185                       FrontendPtr frontend);
186             void start();
187             boost::mutex m_mutex;
188             boost::condition m_cond_session_ready;
189             std::map<mp::Session, FrontendPtr> m_clients;
190
191             BackendClassMap m_backend_map;
192             boost::mutex m_mutex_backend_map;
193             boost::thread_group m_thrds;
194             int m_resultset_ttl;
195             int m_resultset_max;
196             int m_session_ttl;
197             bool m_optimize_search;
198             int m_session_max;
199         };
200     }
201 }
202
203 yf::SessionShared::FrontendSet::FrontendSet(
204     const Databases &databases,
205     const yazpp_1::Yaz_Z_Query &query)
206     : m_databases(databases), m_query(query)
207 {
208 }
209
210 const yf::SessionShared::Databases & 
211 yf::SessionShared::FrontendSet::get_databases()
212 {
213     return m_databases;
214 }
215
216 const yazpp_1::Yaz_Z_Query& yf::SessionShared::FrontendSet::get_query()
217 {
218     return m_query;
219 }
220
221 yf::SessionShared::InitKey::InitKey(const InitKey &k)
222 {
223     m_odr = odr_createmem(ODR_ENCODE);
224     
225     m_idAuthentication_size =  k.m_idAuthentication_size;
226     m_idAuthentication_buf = (char*)odr_malloc(m_odr, m_idAuthentication_size);
227     memcpy(m_idAuthentication_buf, k.m_idAuthentication_buf,
228            m_idAuthentication_size);
229
230     m_otherInfo_size =  k.m_otherInfo_size;
231     m_otherInfo_buf = (char*)odr_malloc(m_odr, m_otherInfo_size);
232     memcpy(m_otherInfo_buf, k.m_otherInfo_buf,
233            m_otherInfo_size);
234 }
235
236 yf::SessionShared::InitKey::InitKey(Z_InitRequest *req)
237 {
238     m_odr = odr_createmem(ODR_ENCODE);
239
240     Z_IdAuthentication *t = req->idAuthentication;
241     z_IdAuthentication(m_odr, &t, 1, 0);
242     m_idAuthentication_buf =
243         odr_getbuf(m_odr, &m_idAuthentication_size, 0);
244
245     Z_OtherInformation *o = req->otherInfo;
246     z_OtherInformation(m_odr, &o, 1, 0);
247     m_otherInfo_buf = odr_getbuf(m_odr, &m_otherInfo_size, 0);
248 }
249
250 yf::SessionShared::InitKey::~InitKey()
251 {
252     odr_destroy(m_odr);
253 }
254
255 bool yf::SessionShared::InitKey::operator < (const SessionShared::InitKey &k)
256     const 
257 {
258     int c;
259     c = mp::util::memcmp2(
260         (void*) m_idAuthentication_buf, m_idAuthentication_size,
261         (void*) k.m_idAuthentication_buf, k.m_idAuthentication_size);
262     if (c < 0)
263         return true;
264     else if (c > 0)
265         return false;
266
267     c = mp::util::memcmp2((void*) m_otherInfo_buf, m_otherInfo_size,
268                           (void*) k.m_otherInfo_buf, k.m_otherInfo_size);
269     if (c < 0)
270         return true;
271     else if (c > 0)
272         return false;
273     return false;
274 }
275
276 void yf::SessionShared::BackendClass::release_backend(BackendInstancePtr b)
277 {
278     boost::mutex::scoped_lock lock(m_mutex_backend_class);
279     b->m_in_use = false;
280 }
281
282
283 void yf::SessionShared::BackendClass::remove_backend(BackendInstancePtr b)
284 {
285     BackendInstanceList::iterator it = m_backend_list.begin();
286     
287     while (it != m_backend_list.end())
288     {
289         if (*it == b)
290         {
291              mp::odr odr;
292             (*it)->m_close_package->response() = odr.create_close(
293                 0, Z_Close_lackOfActivity, 0);
294             (*it)->m_close_package->session().close();
295             (*it)->m_close_package->move();
296             
297             it = m_backend_list.erase(it);
298         }
299         else
300             it++;
301     }
302 }
303
304
305
306 yf::SessionShared::BackendInstancePtr 
307 yf::SessionShared::BackendClass::get_backend(
308     const mp::Package &frontend_package)
309 {
310     {
311         boost::mutex::scoped_lock lock(m_mutex_backend_class);
312         
313         BackendInstanceList::const_iterator it = m_backend_list.begin();
314         
315         BackendInstancePtr backend1; // null
316         
317         for (; it != m_backend_list.end(); it++)
318         {
319             if (!(*it)->m_in_use)
320             {
321                 if (!backend1 
322                     || (*it)->m_sequence_this < backend1->m_sequence_this)
323                     backend1 = *it;
324             }
325         }
326         if (backend1)
327         {
328             use_backend(backend1);
329             return backend1;
330         }
331     }
332     return create_backend(frontend_package);
333 }
334
335 void yf::SessionShared::BackendClass::use_backend(BackendInstancePtr backend)
336 {
337     backend->m_in_use = true;
338     backend->m_sequence_this = m_sequence_top++;
339 }
340
341 void yf::SessionShared::BackendInstance::timestamp()
342 {
343     assert(m_in_use);
344     time(&m_time_last_use);
345 }
346
347 yf::SessionShared::BackendInstance::~BackendInstance()
348 {
349     delete m_close_package;
350 }
351
352 yf::SessionShared::BackendInstancePtr yf::SessionShared::BackendClass::create_backend(
353     const mp::Package &frontend_package)
354 {
355     BackendInstancePtr bp(new BackendInstance);
356     BackendInstancePtr null;
357
358     bp->m_close_package =
359         new mp::Package(bp->m_session, frontend_package.origin());
360     bp->m_close_package->copy_filter(frontend_package);
361
362     Package init_package(bp->m_session, frontend_package.origin());
363
364     init_package.copy_filter(frontend_package);
365
366     yazpp_1::GDU actual_init_request = m_init_request;
367     Z_GDU *init_pdu = actual_init_request.get();
368
369     assert(init_pdu->which == Z_GDU_Z3950);
370     assert(init_pdu->u.z3950->which == Z_APDU_initRequest);
371
372     Z_InitRequest *req = init_pdu->u.z3950->u.initRequest;
373     ODR_MASK_ZERO(req->options);
374
375     ODR_MASK_SET(req->options, Z_Options_search);
376     ODR_MASK_SET(req->options, Z_Options_present);
377     ODR_MASK_SET(req->options, Z_Options_namedResultSets);
378     ODR_MASK_SET(req->options, Z_Options_scan);
379
380     ODR_MASK_SET(req->protocolVersion, Z_ProtocolVersion_1);
381     ODR_MASK_SET(req->protocolVersion, Z_ProtocolVersion_2);
382     ODR_MASK_SET(req->protocolVersion, Z_ProtocolVersion_3);
383
384     init_package.request() = init_pdu;
385
386     init_package.move();
387
388     boost::mutex::scoped_lock lock(m_mutex_backend_class);
389
390     m_named_result_sets = false;
391     Z_GDU *gdu = init_package.response().get();
392     if (init_package.session().is_closed())
393     {
394         /* already closed. We don't know why */
395         return null;
396     }
397     else if (gdu && gdu->which == Z_GDU_Z3950 
398              && gdu->u.z3950->which == Z_APDU_initResponse
399              && *gdu->u.z3950->u.initResponse->result)
400     {
401         /* successful init response */
402         Z_InitResponse *res = gdu->u.z3950->u.initResponse;
403         m_init_response = gdu->u.z3950;
404         if (ODR_MASK_GET(res->options, Z_Options_namedResultSets))
405         {
406             m_named_result_sets = true;
407         }
408     }
409     else
410     {
411         /* not init or init rejected */
412         init_package.copy_filter(frontend_package);
413         init_package.session().close();
414         init_package.move();
415         return null;
416     }
417     bp->m_in_use = true;
418     time(&bp->m_time_last_use);
419     bp->m_sequence_this = 0;
420     bp->m_result_set_sequence = 0;
421     m_backend_list.push_back(bp);
422
423     return bp;
424 }
425
426
427 yf::SessionShared::BackendClass::BackendClass(const yazpp_1::GDU &init_request,
428                                               int resultset_ttl,
429                                               int resultset_max,
430                                               int session_ttl)
431     : m_named_result_sets(false), m_init_request(init_request),
432       m_sequence_top(0), m_backend_set_ttl(resultset_ttl),
433       m_backend_expiry_ttl(session_ttl), m_backend_set_max(resultset_max)
434 {}
435
436 yf::SessionShared::BackendClass::~BackendClass()
437 {}
438
439 void yf::SessionShared::Rep::init(mp::Package &package, const Z_GDU *gdu,
440                                   FrontendPtr frontend)
441 {
442     Z_InitRequest *req = gdu->u.z3950->u.initRequest;
443
444     frontend->m_is_virtual = true;
445     frontend->m_init_options = *req->options;
446     InitKey k(req);
447     {
448         boost::mutex::scoped_lock lock(m_mutex_backend_map);
449         BackendClassMap::const_iterator it;
450         it = m_backend_map.find(k);
451         if (it == m_backend_map.end())
452         {
453             BackendClassPtr b(new BackendClass(gdu->u.z3950,
454                                                m_resultset_ttl,
455                                                m_resultset_max,
456                                                m_session_ttl));
457             m_backend_map[k] = b;
458             frontend->m_backend_class = b;
459         }
460         else
461         {
462             frontend->m_backend_class = it->second;            
463         }
464     }
465     BackendClassPtr bc = frontend->m_backend_class;
466     BackendInstancePtr backend = bc->get_backend(package);
467     
468     mp::odr odr;
469     if (!backend)
470     {
471         Z_APDU *apdu = odr.create_initResponse(gdu->u.z3950, 0, 0);
472         *apdu->u.initResponse->result = 0;
473         package.response() = apdu;
474         package.session().close();
475     }
476     else
477     {
478         boost::mutex::scoped_lock lock(bc->m_mutex_backend_class);
479         yazpp_1::GDU init_response = bc->m_init_response;
480         Z_GDU *response_gdu = init_response.get();
481         mp::util::transfer_referenceId(odr, gdu->u.z3950,
482                                        response_gdu->u.z3950);
483
484         Z_Options *server_options =
485             response_gdu->u.z3950->u.initResponse->options;
486         Z_Options *client_options = &frontend->m_init_options;
487
488         int i;
489         for (i = 0; i<30; i++)
490             if (!ODR_MASK_GET(client_options, i))
491                 ODR_MASK_CLEAR(server_options, i);
492         package.response() = init_response;
493     }
494     if (backend)
495         bc->release_backend(backend);
496 }
497
498 void yf::SessionShared::BackendSet::timestamp()
499 {
500     time(&m_time_last_use);
501 }
502
503 yf::SessionShared::BackendSet::BackendSet(
504     const std::string &result_set_id,
505     const Databases &databases,
506     const yazpp_1::Yaz_Z_Query &query) :
507     m_result_set_id(result_set_id),
508     m_databases(databases), m_result_set_size(0), m_query(query) 
509 {
510     timestamp();
511 }
512
513 static int get_diagnostic(Z_DefaultDiagFormat *r)
514 {
515     return *r->condition;
516 }
517
518 bool yf::SessionShared::BackendSet::search(
519     mp::Package &frontend_package,
520     mp::Package &search_package,
521     const Z_APDU *frontend_apdu,
522     const BackendInstancePtr bp,
523     Z_Records **z_records)
524 {
525     mp::odr odr;
526     Z_APDU *apdu_req = zget_APDU(odr, Z_APDU_searchRequest);
527     Z_SearchRequest *req = apdu_req->u.searchRequest;
528
529     req->resultSetName = odr_strdup(odr, m_result_set_id.c_str());
530     req->query = m_query.get_Z_Query();
531
532     req->num_databaseNames = m_databases.size();
533     req->databaseNames = (char**) 
534         odr_malloc(odr, req->num_databaseNames * sizeof(char *));
535     Databases::const_iterator it = m_databases.begin();
536     size_t i = 0;
537     for (; it != m_databases.end(); it++)
538         req->databaseNames[i++] = odr_strdup(odr, it->c_str());
539
540     if (frontend_apdu->which == Z_APDU_searchRequest)
541         req->preferredRecordSyntax =
542             frontend_apdu->u.searchRequest->preferredRecordSyntax;
543
544     search_package.request() = apdu_req;
545
546     search_package.move();
547
548     Z_GDU *gdu = search_package.response().get();
549     if (!search_package.session().is_closed()
550         && gdu && gdu->which == Z_GDU_Z3950 
551         && gdu->u.z3950->which == Z_APDU_searchResponse)
552     {
553         Z_SearchResponse *b_resp = gdu->u.z3950->u.searchResponse;
554         *z_records = b_resp->records;
555         m_result_set_size = *b_resp->resultCount;
556         return true;
557     }
558     Z_APDU *f_apdu = 0;
559     if (frontend_apdu->which == Z_APDU_searchRequest)
560         f_apdu = odr.create_searchResponse(
561             frontend_apdu, YAZ_BIB1_TEMPORARY_SYSTEM_ERROR, 0);
562     else if (frontend_apdu->which == Z_APDU_presentRequest)
563         f_apdu = odr.create_presentResponse(
564             frontend_apdu, YAZ_BIB1_TEMPORARY_SYSTEM_ERROR, 0);
565     else
566         f_apdu = odr.create_close(
567             frontend_apdu, YAZ_BIB1_TEMPORARY_SYSTEM_ERROR, 0);
568     frontend_package.response() = f_apdu;
569     return false;
570 }
571
572 void yf::SessionShared::Frontend::override_set(
573     BackendInstancePtr &found_backend,
574     std::string &result_set_id,
575     const Databases &databases,
576     bool out_of_sessions)
577 {
578     BackendClassPtr bc = m_backend_class;
579     BackendInstanceList::const_iterator it = bc->m_backend_list.begin();
580     time_t now;
581     time(&now);
582
583     size_t max_sets = bc->m_named_result_sets ? bc->m_backend_set_max : 1;
584     for (; it != bc->m_backend_list.end(); it++)
585     {
586         if (!(*it)->m_in_use)
587         {
588             BackendSetList::iterator set_it = (*it)->m_sets.begin();
589             for (; set_it != (*it)->m_sets.end(); set_it++)
590             {
591                 if ((max_sets > 1 || (*set_it)->m_databases == databases)
592                     &&
593                     (out_of_sessions ||
594                      now < (*set_it)->m_time_last_use ||
595                      now - (*set_it)->m_time_last_use >= bc->m_backend_set_ttl))
596                 {
597                     found_backend = *it;
598                     result_set_id = (*set_it)->m_result_set_id;
599                     found_backend->m_sets.erase(set_it);
600                     return;
601                 }
602             }
603         }
604     }
605     for (it = bc->m_backend_list.begin(); it != bc->m_backend_list.end(); it++)
606     {
607         if (!(*it)->m_in_use && (*it)->m_sets.size() < max_sets)
608         {
609             found_backend = *it;
610             if (bc->m_named_result_sets)
611             {
612                 result_set_id = boost::io::str(
613                     boost::format("%1%") % 
614                     found_backend->m_result_set_sequence);
615                 found_backend->m_result_set_sequence++;
616             }
617             else
618                 result_set_id = "default";
619             return;
620         }
621     }
622 }
623
624 void yf::SessionShared::Frontend::get_set(mp::Package &package,
625                                           const Z_APDU *apdu_req,
626                                           const Databases &databases,
627                                           yazpp_1::Yaz_Z_Query &query,
628                                           BackendInstancePtr &found_backend,
629                                           BackendSetPtr &found_set)
630 {
631     bool session_restarted = false;
632
633 restart:
634     std::string result_set_id;
635     bool out_of_sessions = false;
636     BackendClassPtr bc = m_backend_class;
637     {
638         boost::mutex::scoped_lock lock(bc->m_mutex_backend_class);
639
640         if ((int) bc->m_backend_list.size() >= m_p->m_session_max)
641             out_of_sessions = true;
642         
643         if (m_p->m_optimize_search)
644         {
645             // look at each backend and see if we have a similar search
646             BackendInstanceList::const_iterator it = bc->m_backend_list.begin();
647             for (; it != bc->m_backend_list.end(); it++)
648             {
649                 if (!(*it)->m_in_use)
650                 {
651                     BackendSetList::const_iterator set_it = (*it)->m_sets.begin();
652                     for (; set_it != (*it)->m_sets.end(); set_it++)
653                     {
654                         if ((*set_it)->m_databases == databases
655                             && query.match(&(*set_it)->m_query))
656                         {
657                             found_set = *set_it;
658                             found_backend = *it;
659                             bc->use_backend(found_backend);
660                             found_set->timestamp();
661                             // found matching set. No need to search again
662                             return;
663                         }
664                     }
665                 }
666             }
667         }
668         override_set(found_backend, result_set_id, databases, out_of_sessions);
669         if (found_backend)
670             bc->use_backend(found_backend);
671     }
672     if (!found_backend)
673     {
674         // create a new backend set (and new set) if we're not out of sessions
675         if (!out_of_sessions)
676             found_backend = bc->create_backend(package);
677
678         if (!found_backend)
679         {
680             Z_APDU *f_apdu = 0;
681             mp::odr odr;
682             const char *addinfo = 0;
683             
684             if (out_of_sessions)
685                 addinfo = "session_shared: all sessions in use";
686             if (apdu_req->which == Z_APDU_searchRequest)
687             {
688                 f_apdu = odr.create_searchResponse(
689                     apdu_req, YAZ_BIB1_TEMPORARY_SYSTEM_ERROR, addinfo);
690             }
691             else if (apdu_req->which == Z_APDU_presentRequest)
692             {
693                 f_apdu = odr.create_presentResponse(
694                     apdu_req, YAZ_BIB1_TEMPORARY_SYSTEM_ERROR, addinfo);
695             }
696             else
697             {
698                 f_apdu = odr.create_close(
699                     apdu_req, YAZ_BIB1_TEMPORARY_SYSTEM_ERROR, addinfo);
700             }
701             package.response() = f_apdu;
702             return;
703         }
704         if (bc->m_named_result_sets)
705         {
706             result_set_id = boost::io::str(
707                 boost::format("%1%") % found_backend->m_result_set_sequence);
708             found_backend->m_result_set_sequence++;
709         }
710         else
711             result_set_id = "default";
712     }
713     found_backend->timestamp();
714
715     // we must search ...
716     BackendSetPtr new_set(new BackendSet(result_set_id,
717                                          databases, query));
718     Z_Records *z_records = 0;
719
720     Package search_package(found_backend->m_session, package.origin());
721     search_package.copy_filter(package);
722
723     if (!new_set->search(package, search_package,
724                          apdu_req, found_backend, &z_records))
725     {
726         bc->remove_backend(found_backend);
727         return; // search error 
728     }
729
730     if (z_records)
731     {
732         int condition = 0;
733         if (z_records->which == Z_Records_NSD)
734         {
735             condition =
736                 get_diagnostic(z_records->u.nonSurrogateDiagnostic);
737         }
738         else if (z_records->which == Z_Records_multipleNSD)
739         {
740             if (z_records->u.multipleNonSurDiagnostics->num_diagRecs >= 1
741                 && 
742                 
743                 z_records->u.multipleNonSurDiagnostics->diagRecs[0]->which ==
744                 Z_DiagRec_defaultFormat)
745             {
746                 condition = get_diagnostic(
747                     z_records->u.multipleNonSurDiagnostics->diagRecs[0]->u.defaultFormat);
748                 
749             }
750         }
751         if (!session_restarted &&
752             condition == YAZ_BIB1_TEMPORARY_SYSTEM_ERROR)
753         {
754             bc->remove_backend(found_backend);
755             session_restarted = true;
756             found_backend.reset();
757             goto restart;
758
759         }
760
761         if (condition)
762         {
763             mp::odr odr;
764             if (apdu_req->which == Z_APDU_searchRequest)
765             {
766                 Z_APDU *f_apdu = odr.create_searchResponse(apdu_req, 
767                                                            0, 0);
768                 Z_SearchResponse *f_resp = f_apdu->u.searchResponse;
769                 *f_resp->searchStatus = Z_SearchResponse_none;
770                 f_resp->records = z_records;
771                 package.response() = f_apdu;
772             }
773             if (apdu_req->which == Z_APDU_presentRequest)
774             {
775                 Z_APDU *f_apdu = odr.create_presentResponse(apdu_req, 
776                                                             0, 0);
777                 Z_PresentResponse *f_resp = f_apdu->u.presentResponse;
778                 f_resp->records = z_records;
779                 package.response() = f_apdu;
780             }
781             bc->release_backend(found_backend);
782             return; // search error 
783         }
784     }
785     if (!session_restarted && new_set->m_result_set_size < 0)
786     {
787         bc->remove_backend(found_backend);
788         session_restarted = true;
789         found_backend.reset();
790         goto restart;
791     }
792
793     found_set = new_set;
794     found_set->timestamp();
795     found_backend->m_sets.push_back(found_set);
796 }
797
798 void yf::SessionShared::Frontend::search(mp::Package &package,
799                                          Z_APDU *apdu_req)
800 {
801     Z_SearchRequest *req = apdu_req->u.searchRequest;
802     FrontendSets::iterator fset_it = 
803         m_frontend_sets.find(req->resultSetName);
804     if (fset_it != m_frontend_sets.end())
805     {
806         // result set already exist 
807         // if replace indicator is off: we return diagnostic if
808         // result set already exist.
809         if (*req->replaceIndicator == 0)
810         {
811             mp::odr odr;
812             Z_APDU *apdu = 
813                 odr.create_searchResponse(
814                     apdu_req,
815                     YAZ_BIB1_RESULT_SET_EXISTS_AND_REPLACE_INDICATOR_OFF,
816                     0);
817             package.response() = apdu;
818             
819             return;
820         }
821         m_frontend_sets.erase(fset_it);
822     }
823     
824     yazpp_1::Yaz_Z_Query query;
825     query.set_Z_Query(req->query);
826     Databases databases;
827     int i;
828     for (i = 0; i<req->num_databaseNames; i++)
829         databases.push_back(req->databaseNames[i]);
830
831     BackendSetPtr found_set; // null
832     BackendInstancePtr found_backend; // null
833
834     get_set(package, apdu_req, databases, query, found_backend, found_set);
835     if (!found_set)
836         return;
837
838     mp::odr odr;
839     Z_APDU *f_apdu = odr.create_searchResponse(apdu_req, 0, 0);
840     Z_SearchResponse *f_resp = f_apdu->u.searchResponse;
841     *f_resp->resultCount = found_set->m_result_set_size;
842     package.response() = f_apdu;
843
844     FrontendSetPtr fset(new FrontendSet(databases, query));
845     m_frontend_sets[req->resultSetName] = fset;
846
847     m_backend_class->release_backend(found_backend);
848 }
849
850 void yf::SessionShared::Frontend::present(mp::Package &package,
851                                           Z_APDU *apdu_req)
852 {
853     mp::odr odr;
854     Z_PresentRequest *req = apdu_req->u.presentRequest;
855
856     FrontendSets::iterator fset_it = 
857         m_frontend_sets.find(req->resultSetId);
858
859     if (fset_it == m_frontend_sets.end())
860     {
861         Z_APDU *apdu = 
862             odr.create_presentResponse(
863                 apdu_req,
864                 YAZ_BIB1_SPECIFIED_RESULT_SET_DOES_NOT_EXIST,
865                 req->resultSetId);
866         package.response() = apdu;
867         return;
868     }
869     FrontendSetPtr fset = fset_it->second;
870
871     Databases databases = fset->get_databases();
872     yazpp_1::Yaz_Z_Query query = fset->get_query();
873
874     BackendClassPtr bc = m_backend_class;
875     BackendSetPtr found_set; // null
876     BackendInstancePtr found_backend;
877
878     get_set(package, apdu_req, databases, query, found_backend, found_set);
879     if (!found_set)
880         return;
881
882     Z_NamePlusRecordList *npr_res = 0;
883     if (found_set->m_record_cache.lookup(odr, &npr_res, 
884                                          *req->resultSetStartPoint,
885                                          *req->numberOfRecordsRequested,
886                                          req->preferredRecordSyntax,
887                                          req->recordComposition))
888     {
889         Z_APDU *f_apdu_res = odr.create_presentResponse(apdu_req, 0, 0);
890         Z_PresentResponse *f_resp = f_apdu_res->u.presentResponse;
891
892         yaz_log(YLOG_LOG, "Found " ODR_INT_PRINTF "+" ODR_INT_PRINTF 
893                 " records in cache %p",
894                 *req->resultSetStartPoint,                      
895                 *req->numberOfRecordsRequested,
896                 &found_set->m_record_cache);        
897
898         *f_resp->numberOfRecordsReturned = *req->numberOfRecordsRequested;
899         *f_resp->nextResultSetPosition = 
900             *req->resultSetStartPoint + *req->numberOfRecordsRequested;
901         // f_resp->presentStatus assumed OK.
902         f_resp->records = (Z_Records *) odr_malloc(odr, sizeof(Z_Records));
903         f_resp->records->which = Z_Records_DBOSD;
904         f_resp->records->u.databaseOrSurDiagnostics = npr_res;
905         package.response() = f_apdu_res;
906         bc->release_backend(found_backend);
907         return;
908     }
909
910     found_backend->timestamp();
911                               
912     Z_APDU *p_apdu = zget_APDU(odr, Z_APDU_presentRequest);
913     Z_PresentRequest *p_req = p_apdu->u.presentRequest;
914     p_req->preferredRecordSyntax = req->preferredRecordSyntax;
915     p_req->resultSetId = odr_strdup(odr, found_set->m_result_set_id.c_str());
916     *p_req->resultSetStartPoint = *req->resultSetStartPoint;
917     *p_req->numberOfRecordsRequested = *req->numberOfRecordsRequested;
918     p_req->preferredRecordSyntax = req->preferredRecordSyntax;
919     p_req->recordComposition = req->recordComposition;
920
921     Package present_package(found_backend->m_session, package.origin());
922     present_package.copy_filter(package);
923
924     present_package.request() = p_apdu;
925
926     present_package.move();
927
928     Z_GDU *gdu = present_package.response().get();
929     if (!present_package.session().is_closed()
930         && gdu && gdu->which == Z_GDU_Z3950 
931         && gdu->u.z3950->which == Z_APDU_presentResponse)
932     {
933         Z_PresentResponse *b_resp = gdu->u.z3950->u.presentResponse;
934         Z_APDU *f_apdu_res = odr.create_presentResponse(apdu_req, 0, 0);
935         Z_PresentResponse *f_resp = f_apdu_res->u.presentResponse;
936
937         f_resp->numberOfRecordsReturned = b_resp->numberOfRecordsReturned;
938         f_resp->nextResultSetPosition = b_resp->nextResultSetPosition;
939         f_resp->presentStatus= b_resp->presentStatus;
940         f_resp->records = b_resp->records;
941         f_resp->otherInfo = b_resp->otherInfo;
942         package.response() = f_apdu_res;
943
944         if (b_resp->records && b_resp->records->which ==  Z_Records_DBOSD)
945         {
946             yaz_log(YLOG_LOG, "Adding " ODR_INT_PRINTF "+" ODR_INT_PRINTF
947                     " records to cache %p",
948                     *req->resultSetStartPoint,                      
949                     *f_resp->numberOfRecordsReturned,
950                     &found_set->m_record_cache);        
951             found_set->m_record_cache.add(
952                 odr,
953                 b_resp->records->u.databaseOrSurDiagnostics,
954                 *req->resultSetStartPoint,                      
955                 *f_resp->numberOfRecordsReturned);
956         }
957         bc->release_backend(found_backend);
958     }
959     else
960     {
961         bc->remove_backend(found_backend);
962         Z_APDU *f_apdu_res = 
963             odr.create_presentResponse(
964                 apdu_req, YAZ_BIB1_TEMPORARY_SYSTEM_ERROR, 0);
965         package.response() = f_apdu_res;
966     }
967 }
968
969 void yf::SessionShared::Frontend::scan(mp::Package &frontend_package,
970                                        Z_APDU *apdu_req)
971 {
972     BackendClassPtr bc = m_backend_class;
973     BackendInstancePtr backend = bc->get_backend(frontend_package);
974     if (!backend)
975     {
976         mp::odr odr;
977         Z_APDU *apdu = odr.create_scanResponse(
978             apdu_req, YAZ_BIB1_TEMPORARY_SYSTEM_ERROR, 0);
979         frontend_package.response() = apdu;
980     }
981     else
982     {
983         Package scan_package(backend->m_session, frontend_package.origin());
984         backend->timestamp();
985         scan_package.copy_filter(frontend_package);
986         scan_package.request() = apdu_req;
987         scan_package.move();
988         frontend_package.response() = scan_package.response();
989         if (scan_package.session().is_closed())
990         {
991             frontend_package.session().close();
992             bc->remove_backend(backend);
993         }
994         else
995             bc->release_backend(backend);
996     }
997 }
998
999 yf::SessionShared::Worker::Worker(SessionShared::Rep *rep) : m_p(rep)
1000 {
1001 }
1002
1003 void yf::SessionShared::Worker::operator() (void)
1004 {
1005     m_p->expire();
1006 }
1007
1008 void yf::SessionShared::BackendClass::expire_class()
1009 {
1010     time_t now;
1011     time(&now);
1012     boost::mutex::scoped_lock lock(m_mutex_backend_class);
1013     BackendInstanceList::iterator bit = m_backend_list.begin();
1014     while (bit != m_backend_list.end())
1015     {
1016         time_t last_use = (*bit)->m_time_last_use;
1017         
1018         if ((*bit)->m_in_use)
1019         {
1020             bit++;
1021         }
1022         else if (now < last_use || now - last_use > m_backend_expiry_ttl)
1023         {
1024             mp::odr odr;
1025             (*bit)->m_close_package->response() = odr.create_close(
1026                 0, Z_Close_lackOfActivity, 0);
1027             (*bit)->m_close_package->session().close();
1028             (*bit)->m_close_package->move();
1029
1030             bit = m_backend_list.erase(bit);
1031         }
1032         else
1033         {
1034             bit++;
1035         }
1036     }
1037 }
1038
1039 void yf::SessionShared::Rep::expire()
1040 {
1041     while (true)
1042     {
1043         boost::xtime xt;
1044         boost::xtime_get(&xt, boost::TIME_UTC);
1045         xt.sec += m_session_ttl / 3;
1046         boost::thread::sleep(xt);
1047         
1048         BackendClassMap::const_iterator b_it = m_backend_map.begin();
1049         for (; b_it != m_backend_map.end(); b_it++)
1050             b_it->second->expire_class();
1051     }
1052 }
1053
1054 yf::SessionShared::Rep::Rep()
1055 {
1056     m_resultset_ttl = 30;
1057     m_resultset_max = 10;
1058     m_session_ttl = 90;
1059     m_optimize_search = true;
1060     m_session_max = 100;
1061 }
1062
1063 void yf::SessionShared::Rep::start()
1064 {
1065     yf::SessionShared::Worker w(this);
1066     m_thrds.add_thread(new boost::thread(w));
1067 }
1068
1069 yf::SessionShared::SessionShared() : m_p(new SessionShared::Rep)
1070 {
1071 }
1072
1073 yf::SessionShared::~SessionShared() {
1074 }
1075
1076 void yf::SessionShared::start() const
1077 {
1078     m_p->start();
1079 }
1080
1081 yf::SessionShared::Frontend::Frontend(Rep *rep) : m_is_virtual(false), m_p(rep)
1082 {
1083 }
1084
1085 yf::SessionShared::Frontend::~Frontend()
1086 {
1087 }
1088
1089 yf::SessionShared::FrontendPtr yf::SessionShared::Rep::get_frontend(mp::Package &package)
1090 {
1091     boost::mutex::scoped_lock lock(m_mutex);
1092
1093     std::map<mp::Session,yf::SessionShared::FrontendPtr>::iterator it;
1094     
1095     while(true)
1096     {
1097         it = m_clients.find(package.session());
1098         if (it == m_clients.end())
1099             break;
1100         
1101         if (!it->second->m_in_use)
1102         {
1103             it->second->m_in_use = true;
1104             return it->second;
1105         }
1106         m_cond_session_ready.wait(lock);
1107     }
1108     FrontendPtr f(new Frontend(this));
1109     m_clients[package.session()] = f;
1110     f->m_in_use = true;
1111     return f;
1112 }
1113
1114 void yf::SessionShared::Rep::release_frontend(mp::Package &package)
1115 {
1116     boost::mutex::scoped_lock lock(m_mutex);
1117     std::map<mp::Session,yf::SessionShared::FrontendPtr>::iterator it;
1118     
1119     it = m_clients.find(package.session());
1120     if (it != m_clients.end())
1121     {
1122         if (package.session().is_closed())
1123         {
1124             m_clients.erase(it);
1125         }
1126         else
1127         {
1128             it->second->m_in_use = false;
1129         }
1130         m_cond_session_ready.notify_all();
1131     }
1132 }
1133
1134
1135 void yf::SessionShared::process(mp::Package &package) const
1136 {
1137     FrontendPtr f = m_p->get_frontend(package);
1138
1139     Z_GDU *gdu = package.request().get();
1140     
1141     if (gdu && gdu->which == Z_GDU_Z3950 && gdu->u.z3950->which ==
1142         Z_APDU_initRequest && !f->m_is_virtual)
1143     {
1144         m_p->init(package, gdu, f);
1145     }
1146     else if (!f->m_is_virtual)
1147         package.move();
1148     else if (gdu && gdu->which == Z_GDU_Z3950)
1149     {
1150         Z_APDU *apdu = gdu->u.z3950;
1151         if (apdu->which == Z_APDU_initRequest)
1152         {
1153             mp::odr odr;
1154             
1155             package.response() = odr.create_close(
1156                 apdu,
1157                 Z_Close_protocolError,
1158                 "double init");
1159             
1160             package.session().close();
1161         }
1162         else if (apdu->which == Z_APDU_close)
1163         {
1164             mp::odr odr;
1165             
1166             package.response() = odr.create_close(
1167                 apdu,
1168                 Z_Close_peerAbort, "received close from client");
1169             package.session().close();
1170         }
1171         else if (apdu->which == Z_APDU_searchRequest)
1172         {
1173             f->search(package, apdu);
1174         }
1175         else if (apdu->which == Z_APDU_presentRequest)
1176         {
1177             f->present(package, apdu);
1178         }
1179         else if (apdu->which == Z_APDU_scanRequest)
1180         {
1181             f->scan(package, apdu);
1182         }
1183         else
1184         {
1185             mp::odr odr;
1186             
1187             package.response() = odr.create_close(
1188                 apdu, Z_Close_protocolError,
1189                 "unsupported APDU in filter_session_shared");
1190             
1191             package.session().close();
1192         }
1193     }
1194     m_p->release_frontend(package);
1195 }
1196
1197 void yf::SessionShared::configure(const xmlNode *ptr, bool test_only,
1198                                   const char *path)
1199 {
1200     for (ptr = ptr->children; ptr; ptr = ptr->next)
1201     {
1202         if (ptr->type != XML_ELEMENT_NODE)
1203             continue;
1204         if (!strcmp((const char *) ptr->name, "resultset"))
1205         {
1206             const struct _xmlAttr *attr;
1207             for (attr = ptr->properties; attr; attr = attr->next)
1208             {
1209                 if (!strcmp((const char *) attr->name, "ttl"))
1210                     m_p->m_resultset_ttl = 
1211                         mp::xml::get_int(attr->children, 30);
1212                 else if (!strcmp((const char *) attr->name, "max"))
1213                 {
1214                     m_p->m_resultset_max = 
1215                         mp::xml::get_int(attr->children, 10);
1216                 }
1217                 else if (!strcmp((const char *) attr->name, "optimizesearch"))
1218                 {
1219                     m_p->m_optimize_search =
1220                         mp::xml::get_bool(attr->children, true);
1221                 }
1222                 else
1223                     throw mp::filter::FilterException(
1224                         "Bad attribute " + std::string((const char *)
1225                                                        attr->name));
1226             }
1227         }
1228         else if (!strcmp((const char *) ptr->name, "session"))
1229         {
1230             const struct _xmlAttr *attr;
1231             for (attr = ptr->properties; attr; attr = attr->next)
1232             {
1233                 if (!strcmp((const char *) attr->name, "ttl"))
1234                     m_p->m_session_ttl = 
1235                         mp::xml::get_int(attr->children, 90);
1236                 else if (!strcmp((const char *) attr->name, "max"))
1237                     m_p->m_session_max = 
1238                         mp::xml::get_int(attr->children, 100);
1239                 else
1240                     throw mp::filter::FilterException(
1241                         "Bad attribute " + std::string((const char *)
1242                                                        attr->name));
1243             }
1244         }
1245         else
1246         {
1247             throw mp::filter::FilterException("Bad element " 
1248                                                + std::string((const char *)
1249                                                              ptr->name));
1250         }
1251     }
1252 }
1253
1254 static mp::filter::Base* filter_creator()
1255 {
1256     return new mp::filter::SessionShared;
1257 }
1258
1259 extern "C" {
1260     struct metaproxy_1_filter_struct metaproxy_1_filter_session_shared = {
1261         0,
1262         "session_shared",
1263         filter_creator
1264     };
1265 }
1266
1267 /*
1268  * Local variables:
1269  * c-basic-offset: 4
1270  * c-file-style: "Stroustrup"
1271  * indent-tabs-mode: nil
1272  * End:
1273  * vim: shiftwidth=4 tabstop=8 expandtab
1274  */
1275