session_shared: relay preferredRecordSyntax for search
[metaproxy-moved-to-github.git] / src / filter_session_shared.cpp
1 /* This file is part of Metaproxy.
2    Copyright (C) 2005-2011 Index Data
3
4 Metaproxy is free software; you can redistribute it and/or modify it under
5 the terms of the GNU General Public License as published by the Free
6 Software Foundation; either version 2, or (at your option) any later
7 version.
8
9 Metaproxy is distributed in the hope that it will be useful, but WITHOUT ANY
10 WARRANTY; without even the implied warranty of MERCHANTABILITY or
11 FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
12 for more details.
13
14 You should have received a copy of the GNU General Public License
15 along with this program; if not, write to the Free Software
16 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
17 */
18
19 #include "config.hpp"
20
21 #include <metaproxy/filter.hpp>
22 #include <metaproxy/package.hpp>
23
24 #include <boost/thread/mutex.hpp>
25 #include <boost/thread/condition.hpp>
26 #include <boost/thread/thread.hpp>
27 #include <boost/thread/xtime.hpp>
28 #include <boost/shared_ptr.hpp>
29 #include <boost/format.hpp>
30
31 #include <metaproxy/util.hpp>
32 #include "filter_session_shared.hpp"
33
34 #include <yaz/log.h>
35 #include <yaz/zgdu.h>
36 #include <yaz/otherinfo.h>
37 #include <yaz/diagbib1.h>
38 #include <yazpp/z-query.h>
39 #include <yazpp/record-cache.h>
40 #include <map>
41 #include <iostream>
42 #include <time.h>
43
44 namespace mp = metaproxy_1;
45 namespace yf = metaproxy_1::filter;
46
47 namespace metaproxy_1 {
48
49     namespace filter {
50         // key for session.. We'll only share sessions with same InitKey
51         class SessionShared::InitKey {
52         public:
53             bool operator < (const SessionShared::InitKey &k) const;
54             InitKey(Z_InitRequest *req);
55             InitKey(const InitKey &);
56             ~InitKey();
57         private:
58             char *m_idAuthentication_buf;
59             int m_idAuthentication_size;
60             char *m_otherInfo_buf;
61             int m_otherInfo_size;
62             ODR m_odr;
63         };
64         // worker thread .. for expiry of sessions
65         class SessionShared::Worker {
66         public:
67             Worker(SessionShared::Rep *rep);
68             void operator() (void);
69         private:
70             SessionShared::Rep *m_p;
71         };
72         // backend result set
73         class SessionShared::BackendSet {
74         public:
75             std::string m_result_set_id;
76             Databases m_databases;
77             int m_result_set_size;
78             yazpp_1::Yaz_Z_Query m_query;
79             time_t m_time_last_use;
80             void timestamp();
81             yazpp_1::RecordCache m_record_cache;
82             BackendSet(
83                 const std::string &result_set_id,
84                 const Databases &databases,
85                 const yazpp_1::Yaz_Z_Query &query);
86             bool search(
87                 Package &frontend_package,
88                 Package &search_package,
89                 const Z_APDU *apdu_req,
90                 const BackendInstancePtr bp,
91                 Z_Records **z_records);
92         };
93         // backend connection instance
94         class SessionShared::BackendInstance {
95             friend class Rep;
96             friend class BackendClass;
97             friend class BackendSet;
98         public:
99             mp::Session m_session;
100             BackendSetList m_sets;
101             bool m_in_use;
102             int m_sequence_this;
103             int m_result_set_sequence;
104             time_t m_time_last_use;
105             mp::Package * m_close_package;
106             ~BackendInstance();
107         };
108         // backends of some class (all with same InitKey)
109         class SessionShared::BackendClass : boost::noncopyable {
110             friend class Rep;
111             friend struct Frontend;
112             bool m_named_result_sets;
113             BackendInstanceList m_backend_list;
114             BackendInstancePtr create_backend(const Package &package);
115             void remove_backend(BackendInstancePtr b);
116             BackendInstancePtr get_backend(const Package &package);
117             void use_backend(BackendInstancePtr b);
118             void release_backend(BackendInstancePtr b);
119             void expire_class();
120             yazpp_1::GDU m_init_request;
121             yazpp_1::GDU m_init_response;
122             boost::mutex m_mutex_backend_class;
123             int m_sequence_top;
124             time_t m_backend_set_ttl;
125             time_t m_backend_expiry_ttl;
126             size_t m_backend_set_max;
127         public:
128             BackendClass(const yazpp_1::GDU &init_request,
129                          int resultset_ttl,
130                          int resultset_max,
131                          int session_ttl);
132             ~BackendClass();
133         };
134         // frontend result set
135         class SessionShared::FrontendSet {
136             Databases m_databases;
137             yazpp_1::Yaz_Z_Query m_query;
138         public:
139             const Databases &get_databases();
140             const yazpp_1::Yaz_Z_Query &get_query();
141             FrontendSet(
142                 const Databases &databases,
143                 const yazpp_1::Yaz_Z_Query &query);
144             FrontendSet();
145         };
146         // frontend session
147         struct SessionShared::Frontend {
148             Frontend(Rep *rep);
149             ~Frontend();
150             bool m_is_virtual;
151             bool m_in_use;
152             Z_Options m_init_options;
153             void search(Package &package, Z_APDU *apdu);
154             void present(Package &package, Z_APDU *apdu);
155             void scan(Package &package, Z_APDU *apdu);
156
157             void get_set(mp::Package &package,
158                          const Z_APDU *apdu_req,
159                          const Databases &databases,
160                          yazpp_1::Yaz_Z_Query &query,
161                          BackendInstancePtr &found_backend,
162                          BackendSetPtr &found_set);
163             void override_set(BackendInstancePtr &found_backend,
164                               std::string &result_set_id);
165
166             Rep *m_p;
167             BackendClassPtr m_backend_class;
168             FrontendSets m_frontend_sets;
169         };            
170         // representation
171         class SessionShared::Rep {
172             friend class SessionShared;
173             friend struct Frontend;
174             
175             FrontendPtr get_frontend(Package &package);
176             void release_frontend(Package &package);
177             Rep();
178         public:
179             void expire();
180         private:
181             void init(Package &package, const Z_GDU *gdu,
182                       FrontendPtr frontend);
183             void start();
184             boost::mutex m_mutex;
185             boost::condition m_cond_session_ready;
186             std::map<mp::Session, FrontendPtr> m_clients;
187
188             BackendClassMap m_backend_map;
189             boost::mutex m_mutex_backend_map;
190             boost::thread_group m_thrds;
191             int m_resultset_ttl;
192             int m_resultset_max;
193             int m_session_ttl;
194             bool m_optimize_search;
195         };
196     }
197 }
198
199 yf::SessionShared::FrontendSet::FrontendSet(
200     const Databases &databases,
201     const yazpp_1::Yaz_Z_Query &query)
202     : m_databases(databases), m_query(query)
203 {
204 }
205
206 const yf::SessionShared::Databases & 
207 yf::SessionShared::FrontendSet::get_databases()
208 {
209     return m_databases;
210 }
211
212 const yazpp_1::Yaz_Z_Query& yf::SessionShared::FrontendSet::get_query()
213 {
214     return m_query;
215 }
216
217 yf::SessionShared::InitKey::InitKey(const InitKey &k)
218 {
219     m_odr = odr_createmem(ODR_ENCODE);
220     
221     m_idAuthentication_size =  k.m_idAuthentication_size;
222     m_idAuthentication_buf = (char*)odr_malloc(m_odr, m_idAuthentication_size);
223     memcpy(m_idAuthentication_buf, k.m_idAuthentication_buf,
224            m_idAuthentication_size);
225
226     m_otherInfo_size =  k.m_otherInfo_size;
227     m_otherInfo_buf = (char*)odr_malloc(m_odr, m_otherInfo_size);
228     memcpy(m_otherInfo_buf, k.m_otherInfo_buf,
229            m_otherInfo_size);
230 }
231
232 yf::SessionShared::InitKey::InitKey(Z_InitRequest *req)
233 {
234     m_odr = odr_createmem(ODR_ENCODE);
235
236     Z_IdAuthentication *t = req->idAuthentication;
237     z_IdAuthentication(m_odr, &t, 1, 0);
238     m_idAuthentication_buf =
239         odr_getbuf(m_odr, &m_idAuthentication_size, 0);
240
241     Z_OtherInformation *o = req->otherInfo;
242     z_OtherInformation(m_odr, &o, 1, 0);
243     m_otherInfo_buf = odr_getbuf(m_odr, &m_otherInfo_size, 0);
244 }
245
246 yf::SessionShared::InitKey::~InitKey()
247 {
248     odr_destroy(m_odr);
249 }
250
251 bool yf::SessionShared::InitKey::operator < (const SessionShared::InitKey &k)
252     const 
253 {
254     int c;
255     c = mp::util::memcmp2(
256         (void*) m_idAuthentication_buf, m_idAuthentication_size,
257         (void*) k.m_idAuthentication_buf, k.m_idAuthentication_size);
258     if (c < 0)
259         return true;
260     else if (c > 0)
261         return false;
262
263     c = mp::util::memcmp2((void*) m_otherInfo_buf, m_otherInfo_size,
264                           (void*) k.m_otherInfo_buf, k.m_otherInfo_size);
265     if (c < 0)
266         return true;
267     else if (c > 0)
268         return false;
269     return false;
270 }
271
272 void yf::SessionShared::BackendClass::release_backend(BackendInstancePtr b)
273 {
274     boost::mutex::scoped_lock lock(m_mutex_backend_class);
275     b->m_in_use = false;
276 }
277
278
279 void yf::SessionShared::BackendClass::remove_backend(BackendInstancePtr b)
280 {
281     BackendInstanceList::iterator it = m_backend_list.begin();
282     
283     while (it != m_backend_list.end())
284     {
285         if (*it == b)
286         {
287              mp::odr odr;
288             (*it)->m_close_package->response() = odr.create_close(
289                 0, Z_Close_lackOfActivity, 0);
290             (*it)->m_close_package->session().close();
291             (*it)->m_close_package->move();
292             
293             it = m_backend_list.erase(it);
294         }
295         else
296             it++;
297     }
298 }
299
300
301
302 yf::SessionShared::BackendInstancePtr 
303 yf::SessionShared::BackendClass::get_backend(
304     const mp::Package &frontend_package)
305 {
306     {
307         boost::mutex::scoped_lock lock(m_mutex_backend_class);
308         
309         BackendInstanceList::const_iterator it = m_backend_list.begin();
310         
311         BackendInstancePtr backend1; // null
312         
313         for (; it != m_backend_list.end(); it++)
314         {
315             if (!(*it)->m_in_use)
316             {
317                 if (!backend1 
318                     || (*it)->m_sequence_this < backend1->m_sequence_this)
319                     backend1 = *it;
320             }
321         }
322         if (backend1)
323         {
324             use_backend(backend1);
325             return backend1;
326         }
327     }
328     return create_backend(frontend_package);
329 }
330
331 void yf::SessionShared::BackendClass::use_backend(BackendInstancePtr backend)
332 {
333     backend->m_in_use = true;
334     time(&backend->m_time_last_use);
335     backend->m_sequence_this = m_sequence_top++;
336 }
337
338 yf::SessionShared::BackendInstance::~BackendInstance()
339 {
340     delete m_close_package;
341 }
342
343 yf::SessionShared::BackendInstancePtr yf::SessionShared::BackendClass::create_backend(
344     const mp::Package &frontend_package)
345 {
346     BackendInstancePtr bp(new BackendInstance);
347     BackendInstancePtr null;
348
349     bp->m_close_package =
350         new mp::Package(bp->m_session, frontend_package.origin());
351     bp->m_close_package->copy_filter(frontend_package);
352
353     Package init_package(bp->m_session, frontend_package.origin());
354
355     init_package.copy_filter(frontend_package);
356
357     yazpp_1::GDU actual_init_request = m_init_request;
358     Z_GDU *init_pdu = actual_init_request.get();
359
360     assert(init_pdu->which == Z_GDU_Z3950);
361     assert(init_pdu->u.z3950->which == Z_APDU_initRequest);
362
363     Z_InitRequest *req = init_pdu->u.z3950->u.initRequest;
364     ODR_MASK_ZERO(req->options);
365
366     ODR_MASK_SET(req->options, Z_Options_search);
367     ODR_MASK_SET(req->options, Z_Options_present);
368     ODR_MASK_SET(req->options, Z_Options_namedResultSets);
369     ODR_MASK_SET(req->options, Z_Options_scan);
370
371     ODR_MASK_SET(req->protocolVersion, Z_ProtocolVersion_1);
372     ODR_MASK_SET(req->protocolVersion, Z_ProtocolVersion_2);
373     ODR_MASK_SET(req->protocolVersion, Z_ProtocolVersion_3);
374
375     init_package.request() = init_pdu;
376
377     init_package.move();
378
379     boost::mutex::scoped_lock lock(m_mutex_backend_class);
380
381     m_named_result_sets = false;
382     Z_GDU *gdu = init_package.response().get();
383     if (init_package.session().is_closed())
384     {
385         /* already closed. We don't know why */
386         return null;
387     }
388     else if (gdu && gdu->which == Z_GDU_Z3950 
389              && gdu->u.z3950->which == Z_APDU_initResponse
390              && *gdu->u.z3950->u.initResponse->result)
391     {
392         /* successful init response */
393         Z_InitResponse *res = gdu->u.z3950->u.initResponse;
394         m_init_response = gdu->u.z3950;
395         if (ODR_MASK_GET(res->options, Z_Options_namedResultSets))
396         {
397             m_named_result_sets = true;
398         }
399     }
400     else
401     {
402         /* not init or init rejected */
403         init_package.copy_filter(frontend_package);
404         init_package.session().close();
405         init_package.move();
406         return null;
407     }
408     bp->m_in_use = true;
409     time(&bp->m_time_last_use);
410     bp->m_sequence_this = 0;
411     bp->m_result_set_sequence = 0;
412     m_backend_list.push_back(bp);
413
414     return bp;
415 }
416
417
418 yf::SessionShared::BackendClass::BackendClass(const yazpp_1::GDU &init_request,
419                                               int resultset_ttl,
420                                               int resultset_max,
421                                               int session_ttl)
422     : m_named_result_sets(false), m_init_request(init_request),
423       m_sequence_top(0), m_backend_set_ttl(resultset_ttl),
424       m_backend_expiry_ttl(session_ttl), m_backend_set_max(resultset_max)
425 {}
426
427 yf::SessionShared::BackendClass::~BackendClass()
428 {}
429
430 void yf::SessionShared::Rep::init(mp::Package &package, const Z_GDU *gdu,
431                                   FrontendPtr frontend)
432 {
433     Z_InitRequest *req = gdu->u.z3950->u.initRequest;
434
435     frontend->m_is_virtual = true;
436     frontend->m_init_options = *req->options;
437     InitKey k(req);
438     {
439         boost::mutex::scoped_lock lock(m_mutex_backend_map);
440         BackendClassMap::const_iterator it;
441         it = m_backend_map.find(k);
442         if (it == m_backend_map.end())
443         {
444             BackendClassPtr b(new BackendClass(gdu->u.z3950,
445                                                m_resultset_ttl,
446                                                m_resultset_max,
447                                                m_session_ttl));
448             m_backend_map[k] = b;
449             frontend->m_backend_class = b;
450         }
451         else
452         {
453             frontend->m_backend_class = it->second;            
454         }
455     }
456     BackendClassPtr bc = frontend->m_backend_class;
457     BackendInstancePtr backend = bc->get_backend(package);
458     
459     mp::odr odr;
460     if (!backend)
461     {
462         Z_APDU *apdu = odr.create_initResponse(gdu->u.z3950, 0, 0);
463         *apdu->u.initResponse->result = 0;
464         package.response() = apdu;
465         package.session().close();
466     }
467     else
468     {
469         boost::mutex::scoped_lock lock(bc->m_mutex_backend_class);
470         yazpp_1::GDU init_response = bc->m_init_response;
471         Z_GDU *response_gdu = init_response.get();
472         mp::util::transfer_referenceId(odr, gdu->u.z3950,
473                                        response_gdu->u.z3950);
474
475         Z_Options *server_options =
476             response_gdu->u.z3950->u.initResponse->options;
477         Z_Options *client_options = &frontend->m_init_options;
478
479         int i;
480         for (i = 0; i<30; i++)
481             if (!ODR_MASK_GET(client_options, i))
482                 ODR_MASK_CLEAR(server_options, i);
483         package.response() = init_response;
484     }
485     if (backend)
486         bc->release_backend(backend);
487 }
488
489 void yf::SessionShared::BackendSet::timestamp()
490 {
491     time(&m_time_last_use);
492 }
493
494 yf::SessionShared::BackendSet::BackendSet(
495     const std::string &result_set_id,
496     const Databases &databases,
497     const yazpp_1::Yaz_Z_Query &query) :
498     m_result_set_id(result_set_id),
499     m_databases(databases), m_result_set_size(0), m_query(query) 
500 {
501     timestamp();
502 }
503
504 static int get_diagnostic(Z_DefaultDiagFormat *r)
505 {
506     return *r->condition;
507 }
508
509 bool yf::SessionShared::BackendSet::search(
510     mp::Package &frontend_package,
511     mp::Package &search_package,
512     const Z_APDU *frontend_apdu,
513     const BackendInstancePtr bp,
514     Z_Records **z_records)
515 {
516     mp::odr odr;
517     Z_APDU *apdu_req = zget_APDU(odr, Z_APDU_searchRequest);
518     Z_SearchRequest *req = apdu_req->u.searchRequest;
519
520     req->resultSetName = odr_strdup(odr, m_result_set_id.c_str());
521     req->query = m_query.get_Z_Query();
522
523     req->num_databaseNames = m_databases.size();
524     req->databaseNames = (char**) 
525         odr_malloc(odr, req->num_databaseNames * sizeof(char *));
526     Databases::const_iterator it = m_databases.begin();
527     size_t i = 0;
528     for (; it != m_databases.end(); it++)
529         req->databaseNames[i++] = odr_strdup(odr, it->c_str());
530
531     if (frontend_apdu->which == Z_APDU_searchRequest)
532         req->preferredRecordSyntax =
533             frontend_apdu->u.searchRequest->preferredRecordSyntax;
534
535     search_package.request() = apdu_req;
536
537     search_package.move();
538
539     Z_GDU *gdu = search_package.response().get();
540     if (!search_package.session().is_closed()
541         && gdu && gdu->which == Z_GDU_Z3950 
542         && gdu->u.z3950->which == Z_APDU_searchResponse)
543     {
544         Z_SearchResponse *b_resp = gdu->u.z3950->u.searchResponse;
545         *z_records = b_resp->records;
546         m_result_set_size = *b_resp->resultCount;
547         return true;
548     }
549     Z_APDU *f_apdu = 0;
550     if (frontend_apdu->which == Z_APDU_searchRequest)
551         f_apdu = odr.create_searchResponse(
552             frontend_apdu, YAZ_BIB1_TEMPORARY_SYSTEM_ERROR, 0);
553     else if (frontend_apdu->which == Z_APDU_presentRequest)
554         f_apdu = odr.create_presentResponse(
555             frontend_apdu, YAZ_BIB1_TEMPORARY_SYSTEM_ERROR, 0);
556     else
557         f_apdu = odr.create_close(
558             frontend_apdu, YAZ_BIB1_TEMPORARY_SYSTEM_ERROR, 0);
559     frontend_package.response() = f_apdu;
560     return false;
561 }
562
563 void yf::SessionShared::Frontend::override_set(
564     BackendInstancePtr &found_backend,
565     std::string &result_set_id)
566 {
567     BackendClassPtr bc = m_backend_class;
568     BackendInstanceList::const_iterator it = bc->m_backend_list.begin();
569     time_t now;
570     time(&now);
571     
572     for (; it != bc->m_backend_list.end(); it++)
573     {
574         if (!(*it)->m_in_use)
575         {
576             BackendSetList::iterator set_it = (*it)->m_sets.begin();
577             for (; set_it != (*it)->m_sets.end(); set_it++)
578             {
579                 if (now >= (*set_it)->m_time_last_use &&
580                     now - (*set_it)->m_time_last_use > bc->m_backend_set_ttl)
581                 {
582                     found_backend = *it;
583                     result_set_id = (*set_it)->m_result_set_id;
584                     found_backend->m_sets.erase(set_it);
585                     return;
586                 }
587             }
588         }
589     }
590     size_t max_sets = bc->m_named_result_sets ? bc->m_backend_set_max : 1;
591     for (it = bc->m_backend_list.begin(); it != bc->m_backend_list.end(); it++)
592     {
593         if (!(*it)->m_in_use && (*it)->m_sets.size() < max_sets)
594         {
595             found_backend = *it;
596             if (bc->m_named_result_sets)
597             {
598                 result_set_id = boost::io::str(
599                     boost::format("%1%") % 
600                     found_backend->m_result_set_sequence);
601                 found_backend->m_result_set_sequence++;
602             }
603             else
604                 result_set_id = "default";
605             return;
606         }
607     }
608 }
609
610 void yf::SessionShared::Frontend::get_set(mp::Package &package,
611                                           const Z_APDU *apdu_req,
612                                           const Databases &databases,
613                                           yazpp_1::Yaz_Z_Query &query,
614                                           BackendInstancePtr &found_backend,
615                                           BackendSetPtr &found_set)
616 {
617     bool session_restarted = false;
618
619 restart:
620     std::string result_set_id;
621     BackendClassPtr bc = m_backend_class;
622     {
623         boost::mutex::scoped_lock lock(bc->m_mutex_backend_class);
624         
625         if (m_p->m_optimize_search)
626         {
627             // look at each backend and see if we have a similar search
628             BackendInstanceList::const_iterator it = bc->m_backend_list.begin();
629             for (; it != bc->m_backend_list.end(); it++)
630             {
631                 if (!(*it)->m_in_use)
632                 {
633                     BackendSetList::const_iterator set_it = (*it)->m_sets.begin();
634                     for (; set_it != (*it)->m_sets.end(); set_it++)
635                     {
636                         if ((*set_it)->m_databases == databases
637                             && query.match(&(*set_it)->m_query))
638                         {
639                             found_set = *set_it;
640                             found_backend = *it;
641                             bc->use_backend(found_backend);
642                             found_set->timestamp();
643                             // found matching set. No need to search again
644                             return;
645                         }
646                     }
647                 }
648             }
649         }
650         override_set(found_backend, result_set_id);
651         if (found_backend)
652             bc->use_backend(found_backend);
653     }
654     if (!found_backend)
655     {
656         // create a new backend set (and new set)
657         found_backend = bc->create_backend(package);
658
659         if (!found_backend)
660         {
661             Z_APDU *f_apdu = 0;
662             mp::odr odr;
663             if (apdu_req->which == Z_APDU_searchRequest)
664             {
665                 f_apdu = odr.create_searchResponse(
666                         apdu_req, YAZ_BIB1_TEMPORARY_SYSTEM_ERROR, 0);
667             }
668             else if (apdu_req->which == Z_APDU_presentRequest)
669             {
670                 f_apdu = odr.create_presentResponse(
671                     apdu_req, YAZ_BIB1_TEMPORARY_SYSTEM_ERROR, 0);
672             }
673             else
674             {
675                 f_apdu = odr.create_close(
676                     apdu_req, YAZ_BIB1_TEMPORARY_SYSTEM_ERROR, 0);
677             }
678             package.response() = f_apdu;
679             return;
680         }
681         if (bc->m_named_result_sets)
682         {
683             result_set_id = boost::io::str(
684                 boost::format("%1%") % found_backend->m_result_set_sequence);
685             found_backend->m_result_set_sequence++;
686         }
687         else
688             result_set_id = "default";
689     }
690     // we must search ...
691     BackendSetPtr new_set(new BackendSet(result_set_id,
692                                          databases, query));
693     Z_Records *z_records = 0;
694
695     Package search_package(found_backend->m_session, package.origin());
696     search_package.copy_filter(package);
697
698     if (!new_set->search(package, search_package,
699                          apdu_req, found_backend, &z_records))
700     {
701         bc->remove_backend(found_backend);
702         return; // search error 
703     }
704
705     if (z_records)
706     {
707         int condition = 0;
708         if (z_records->which == Z_Records_NSD)
709         {
710             condition =
711                 get_diagnostic(z_records->u.nonSurrogateDiagnostic);
712         }
713         else if (z_records->which == Z_Records_multipleNSD)
714         {
715             if (z_records->u.multipleNonSurDiagnostics->num_diagRecs >= 1
716                 && 
717                 
718                 z_records->u.multipleNonSurDiagnostics->diagRecs[0]->which ==
719                 Z_DiagRec_defaultFormat)
720             {
721                 condition = get_diagnostic(
722                     z_records->u.multipleNonSurDiagnostics->diagRecs[0]->u.defaultFormat);
723                 
724             }
725         }
726         if (!session_restarted &&
727             condition == YAZ_BIB1_TEMPORARY_SYSTEM_ERROR)
728         {
729             bc->remove_backend(found_backend);
730             session_restarted = true;
731             found_backend.reset();
732             goto restart;
733
734         }
735
736         if (condition)
737         {
738             mp::odr odr;
739             if (apdu_req->which == Z_APDU_searchRequest)
740             {
741                 Z_APDU *f_apdu = odr.create_searchResponse(apdu_req, 
742                                                            0, 0);
743                 Z_SearchResponse *f_resp = f_apdu->u.searchResponse;
744                 *f_resp->searchStatus = Z_SearchResponse_none;
745                 f_resp->records = z_records;
746                 package.response() = f_apdu;
747             }
748             if (apdu_req->which == Z_APDU_presentRequest)
749             {
750                 Z_APDU *f_apdu = odr.create_presentResponse(apdu_req, 
751                                                             0, 0);
752                 Z_PresentResponse *f_resp = f_apdu->u.presentResponse;
753                 f_resp->records = z_records;
754                 package.response() = f_apdu;
755             }
756             bc->release_backend(found_backend);
757             return; // search error 
758         }
759     }
760     if (!session_restarted && new_set->m_result_set_size < 0)
761     {
762         bc->remove_backend(found_backend);
763         session_restarted = true;
764         found_backend.reset();
765         goto restart;
766     }
767
768     found_set = new_set;
769     found_set->timestamp();
770     found_backend->m_sets.push_back(found_set);
771 }
772
773 void yf::SessionShared::Frontend::search(mp::Package &package,
774                                          Z_APDU *apdu_req)
775 {
776     Z_SearchRequest *req = apdu_req->u.searchRequest;
777     FrontendSets::iterator fset_it = 
778         m_frontend_sets.find(req->resultSetName);
779     if (fset_it != m_frontend_sets.end())
780     {
781         // result set already exist 
782         // if replace indicator is off: we return diagnostic if
783         // result set already exist.
784         if (*req->replaceIndicator == 0)
785         {
786             mp::odr odr;
787             Z_APDU *apdu = 
788                 odr.create_searchResponse(
789                     apdu_req,
790                     YAZ_BIB1_RESULT_SET_EXISTS_AND_REPLACE_INDICATOR_OFF,
791                     0);
792             package.response() = apdu;
793             
794             return;
795         }
796         m_frontend_sets.erase(fset_it);
797     }
798     
799     yazpp_1::Yaz_Z_Query query;
800     query.set_Z_Query(req->query);
801     Databases databases;
802     int i;
803     for (i = 0; i<req->num_databaseNames; i++)
804         databases.push_back(req->databaseNames[i]);
805
806     BackendSetPtr found_set; // null
807     BackendInstancePtr found_backend; // null
808
809     get_set(package, apdu_req, databases, query, found_backend, found_set);
810     if (!found_set)
811         return;
812
813     mp::odr odr;
814     Z_APDU *f_apdu = odr.create_searchResponse(apdu_req, 0, 0);
815     Z_SearchResponse *f_resp = f_apdu->u.searchResponse;
816     *f_resp->resultCount = found_set->m_result_set_size;
817     package.response() = f_apdu;
818
819     FrontendSetPtr fset(new FrontendSet(databases, query));
820     m_frontend_sets[req->resultSetName] = fset;
821
822     m_backend_class->release_backend(found_backend);
823 }
824
825 void yf::SessionShared::Frontend::present(mp::Package &package,
826                                           Z_APDU *apdu_req)
827 {
828     mp::odr odr;
829     Z_PresentRequest *req = apdu_req->u.presentRequest;
830
831     FrontendSets::iterator fset_it = 
832         m_frontend_sets.find(req->resultSetId);
833
834     if (fset_it == m_frontend_sets.end())
835     {
836         Z_APDU *apdu = 
837             odr.create_presentResponse(
838                 apdu_req,
839                 YAZ_BIB1_SPECIFIED_RESULT_SET_DOES_NOT_EXIST,
840                 req->resultSetId);
841         package.response() = apdu;
842         return;
843     }
844     FrontendSetPtr fset = fset_it->second;
845
846     Databases databases = fset->get_databases();
847     yazpp_1::Yaz_Z_Query query = fset->get_query();
848
849     BackendClassPtr bc = m_backend_class;
850     BackendSetPtr found_set; // null
851     BackendInstancePtr found_backend;
852
853     get_set(package, apdu_req, databases, query, found_backend, found_set);
854     if (!found_set)
855         return;
856
857     Z_NamePlusRecordList *npr_res = 0;
858     if (found_set->m_record_cache.lookup(odr, &npr_res, 
859                                          *req->resultSetStartPoint,
860                                          *req->numberOfRecordsRequested,
861                                          req->preferredRecordSyntax,
862                                          req->recordComposition))
863     {
864         Z_APDU *f_apdu_res = odr.create_presentResponse(apdu_req, 0, 0);
865         Z_PresentResponse *f_resp = f_apdu_res->u.presentResponse;
866
867         yaz_log(YLOG_LOG, "Found " ODR_INT_PRINTF "+" ODR_INT_PRINTF 
868                 " records in cache %p",
869                 *req->resultSetStartPoint,                      
870                 *req->numberOfRecordsRequested,
871                 &found_set->m_record_cache);        
872
873         *f_resp->numberOfRecordsReturned = *req->numberOfRecordsRequested;
874         *f_resp->nextResultSetPosition = 
875             *req->resultSetStartPoint + *req->numberOfRecordsRequested;
876         // f_resp->presentStatus assumed OK.
877         f_resp->records = (Z_Records *) odr_malloc(odr, sizeof(Z_Records));
878         f_resp->records->which = Z_Records_DBOSD;
879         f_resp->records->u.databaseOrSurDiagnostics = npr_res;
880         package.response() = f_apdu_res;
881         bc->release_backend(found_backend);
882         return;
883     }
884                               
885     Z_APDU *p_apdu = zget_APDU(odr, Z_APDU_presentRequest);
886     Z_PresentRequest *p_req = p_apdu->u.presentRequest;
887     p_req->preferredRecordSyntax = req->preferredRecordSyntax;
888     p_req->resultSetId = odr_strdup(odr, found_set->m_result_set_id.c_str());
889     *p_req->resultSetStartPoint = *req->resultSetStartPoint;
890     *p_req->numberOfRecordsRequested = *req->numberOfRecordsRequested;
891     p_req->preferredRecordSyntax = req->preferredRecordSyntax;
892     p_req->recordComposition = req->recordComposition;
893
894     Package present_package(found_backend->m_session, package.origin());
895     present_package.copy_filter(package);
896
897     present_package.request() = p_apdu;
898
899     present_package.move();
900
901     Z_GDU *gdu = present_package.response().get();
902     if (!present_package.session().is_closed()
903         && gdu && gdu->which == Z_GDU_Z3950 
904         && gdu->u.z3950->which == Z_APDU_presentResponse)
905     {
906         Z_PresentResponse *b_resp = gdu->u.z3950->u.presentResponse;
907         Z_APDU *f_apdu_res = odr.create_presentResponse(apdu_req, 0, 0);
908         Z_PresentResponse *f_resp = f_apdu_res->u.presentResponse;
909
910         f_resp->numberOfRecordsReturned = b_resp->numberOfRecordsReturned;
911         f_resp->nextResultSetPosition = b_resp->nextResultSetPosition;
912         f_resp->presentStatus= b_resp->presentStatus;
913         f_resp->records = b_resp->records;
914         f_resp->otherInfo = b_resp->otherInfo;
915         package.response() = f_apdu_res;
916
917         if (b_resp->records && b_resp->records->which ==  Z_Records_DBOSD)
918         {
919             yaz_log(YLOG_LOG, "Adding " ODR_INT_PRINTF "+" ODR_INT_PRINTF
920                     " records to cache %p",
921                     *req->resultSetStartPoint,                      
922                     *f_resp->numberOfRecordsReturned,
923                     &found_set->m_record_cache);        
924             found_set->m_record_cache.add(
925                 odr,
926                 b_resp->records->u.databaseOrSurDiagnostics,
927                 *req->resultSetStartPoint,                      
928                 *f_resp->numberOfRecordsReturned);
929         }
930         bc->release_backend(found_backend);
931     }
932     else
933     {
934         bc->remove_backend(found_backend);
935         Z_APDU *f_apdu_res = 
936             odr.create_presentResponse(
937                 apdu_req, YAZ_BIB1_TEMPORARY_SYSTEM_ERROR, 0);
938         package.response() = f_apdu_res;
939     }
940 }
941
942 void yf::SessionShared::Frontend::scan(mp::Package &frontend_package,
943                                        Z_APDU *apdu_req)
944 {
945     BackendClassPtr bc = m_backend_class;
946     BackendInstancePtr backend = bc->get_backend(frontend_package);
947     if (!backend)
948     {
949         mp::odr odr;
950         Z_APDU *apdu = odr.create_scanResponse(
951             apdu_req, YAZ_BIB1_TEMPORARY_SYSTEM_ERROR, 0);
952         frontend_package.response() = apdu;
953     }
954     else
955     {
956         Package scan_package(backend->m_session, frontend_package.origin());
957         scan_package.copy_filter(frontend_package);
958         scan_package.request() = apdu_req;
959         scan_package.move();
960         frontend_package.response() = scan_package.response();
961         if (scan_package.session().is_closed())
962         {
963             frontend_package.session().close();
964             bc->remove_backend(backend);
965         }
966         else
967             bc->release_backend(backend);
968     }
969 }
970
971 yf::SessionShared::Worker::Worker(SessionShared::Rep *rep) : m_p(rep)
972 {
973 }
974
975 void yf::SessionShared::Worker::operator() (void)
976 {
977     m_p->expire();
978 }
979
980 void yf::SessionShared::BackendClass::expire_class()
981 {
982     time_t now;
983     time(&now);
984     boost::mutex::scoped_lock lock(m_mutex_backend_class);
985     BackendInstanceList::iterator bit = m_backend_list.begin();
986     while (bit != m_backend_list.end())
987     {
988         time_t last_use = (*bit)->m_time_last_use;
989         
990         if ((*bit)->m_in_use)
991         {
992             bit++;
993         }
994         else if ((now >= last_use && now - last_use > m_backend_expiry_ttl)
995             || (now < last_use))
996         {
997             mp::odr odr;
998             (*bit)->m_close_package->response() = odr.create_close(
999                 0, Z_Close_lackOfActivity, 0);
1000             (*bit)->m_close_package->session().close();
1001             (*bit)->m_close_package->move();
1002
1003             bit = m_backend_list.erase(bit);
1004         }
1005         else
1006         {
1007             bit++;
1008         }
1009     }
1010 }
1011
1012 void yf::SessionShared::Rep::expire()
1013 {
1014     while (true)
1015     {
1016         boost::xtime xt;
1017         boost::xtime_get(&xt, boost::TIME_UTC);
1018         xt.sec += 30;
1019         boost::thread::sleep(xt);
1020         
1021         BackendClassMap::const_iterator b_it = m_backend_map.begin();
1022         for (; b_it != m_backend_map.end(); b_it++)
1023             b_it->second->expire_class();
1024     }
1025 }
1026
1027 yf::SessionShared::Rep::Rep()
1028 {
1029     m_resultset_ttl = 30;
1030     m_resultset_max = 10;
1031     m_session_ttl = 90;
1032     m_optimize_search = true;
1033 }
1034
1035 void yf::SessionShared::Rep::start()
1036 {
1037     yf::SessionShared::Worker w(this);
1038     m_thrds.add_thread(new boost::thread(w));
1039 }
1040
1041 yf::SessionShared::SessionShared() : m_p(new SessionShared::Rep)
1042 {
1043 }
1044
1045 yf::SessionShared::~SessionShared() {
1046 }
1047
1048 void yf::SessionShared::start() const
1049 {
1050     m_p->start();
1051 }
1052
1053 yf::SessionShared::Frontend::Frontend(Rep *rep) : m_is_virtual(false), m_p(rep)
1054 {
1055 }
1056
1057 yf::SessionShared::Frontend::~Frontend()
1058 {
1059 }
1060
1061 yf::SessionShared::FrontendPtr yf::SessionShared::Rep::get_frontend(mp::Package &package)
1062 {
1063     boost::mutex::scoped_lock lock(m_mutex);
1064
1065     std::map<mp::Session,yf::SessionShared::FrontendPtr>::iterator it;
1066     
1067     while(true)
1068     {
1069         it = m_clients.find(package.session());
1070         if (it == m_clients.end())
1071             break;
1072         
1073         if (!it->second->m_in_use)
1074         {
1075             it->second->m_in_use = true;
1076             return it->second;
1077         }
1078         m_cond_session_ready.wait(lock);
1079     }
1080     FrontendPtr f(new Frontend(this));
1081     m_clients[package.session()] = f;
1082     f->m_in_use = true;
1083     return f;
1084 }
1085
1086 void yf::SessionShared::Rep::release_frontend(mp::Package &package)
1087 {
1088     boost::mutex::scoped_lock lock(m_mutex);
1089     std::map<mp::Session,yf::SessionShared::FrontendPtr>::iterator it;
1090     
1091     it = m_clients.find(package.session());
1092     if (it != m_clients.end())
1093     {
1094         if (package.session().is_closed())
1095         {
1096             m_clients.erase(it);
1097         }
1098         else
1099         {
1100             it->second->m_in_use = false;
1101         }
1102         m_cond_session_ready.notify_all();
1103     }
1104 }
1105
1106
1107 void yf::SessionShared::process(mp::Package &package) const
1108 {
1109     FrontendPtr f = m_p->get_frontend(package);
1110
1111     Z_GDU *gdu = package.request().get();
1112     
1113     if (gdu && gdu->which == Z_GDU_Z3950 && gdu->u.z3950->which ==
1114         Z_APDU_initRequest && !f->m_is_virtual)
1115     {
1116         m_p->init(package, gdu, f);
1117     }
1118     else if (!f->m_is_virtual)
1119         package.move();
1120     else if (gdu && gdu->which == Z_GDU_Z3950)
1121     {
1122         Z_APDU *apdu = gdu->u.z3950;
1123         if (apdu->which == Z_APDU_initRequest)
1124         {
1125             mp::odr odr;
1126             
1127             package.response() = odr.create_close(
1128                 apdu,
1129                 Z_Close_protocolError,
1130                 "double init");
1131             
1132             package.session().close();
1133         }
1134         else if (apdu->which == Z_APDU_close)
1135         {
1136             mp::odr odr;
1137             
1138             package.response() = odr.create_close(
1139                 apdu,
1140                 Z_Close_peerAbort, "received close from client");
1141             package.session().close();
1142         }
1143         else if (apdu->which == Z_APDU_searchRequest)
1144         {
1145             f->search(package, apdu);
1146         }
1147         else if (apdu->which == Z_APDU_presentRequest)
1148         {
1149             f->present(package, apdu);
1150         }
1151         else if (apdu->which == Z_APDU_scanRequest)
1152         {
1153             f->scan(package, apdu);
1154         }
1155         else
1156         {
1157             mp::odr odr;
1158             
1159             package.response() = odr.create_close(
1160                 apdu, Z_Close_protocolError,
1161                 "unsupported APDU in filter_session_shared");
1162             
1163             package.session().close();
1164         }
1165     }
1166     m_p->release_frontend(package);
1167 }
1168
1169 void yf::SessionShared::configure(const xmlNode *ptr, bool test_only,
1170                                   const char *path)
1171 {
1172     for (ptr = ptr->children; ptr; ptr = ptr->next)
1173     {
1174         if (ptr->type != XML_ELEMENT_NODE)
1175             continue;
1176         if (!strcmp((const char *) ptr->name, "resultset"))
1177         {
1178             const struct _xmlAttr *attr;
1179             for (attr = ptr->properties; attr; attr = attr->next)
1180             {
1181                 if (!strcmp((const char *) attr->name, "ttl"))
1182                     m_p->m_resultset_ttl = 
1183                         mp::xml::get_int(attr->children, 30);
1184                 else if (!strcmp((const char *) attr->name, "max"))
1185                 {
1186                     m_p->m_resultset_max = 
1187                         mp::xml::get_int(attr->children, 10);
1188                 }
1189                 else if (!strcmp((const char *) attr->name, "optimizesearch"))
1190                 {
1191                     m_p->m_optimize_search =
1192                         mp::xml::get_bool(attr->children, true);
1193                 }
1194                 else
1195                     throw mp::filter::FilterException(
1196                         "Bad attribute " + std::string((const char *)
1197                                                        attr->name));
1198             }
1199         }
1200         else if (!strcmp((const char *) ptr->name, "session"))
1201         {
1202             const struct _xmlAttr *attr;
1203             for (attr = ptr->properties; attr; attr = attr->next)
1204             {
1205                 if (!strcmp((const char *) attr->name, "ttl"))
1206                     m_p->m_session_ttl = 
1207                         mp::xml::get_int(attr->children, 120);
1208                 else
1209                     throw mp::filter::FilterException(
1210                         "Bad attribute " + std::string((const char *)
1211                                                        attr->name));
1212             }
1213         }
1214         else
1215         {
1216             throw mp::filter::FilterException("Bad element " 
1217                                                + std::string((const char *)
1218                                                              ptr->name));
1219         }
1220     }
1221 }
1222
1223 static mp::filter::Base* filter_creator()
1224 {
1225     return new mp::filter::SessionShared;
1226 }
1227
1228 extern "C" {
1229     struct metaproxy_1_filter_struct metaproxy_1_filter_session_shared = {
1230         0,
1231         "session_shared",
1232         filter_creator
1233     };
1234 }
1235
1236 /*
1237  * Local variables:
1238  * c-basic-offset: 4
1239  * c-file-style: "Stroustrup"
1240  * indent-tabs-mode: nil
1241  * End:
1242  * vim: shiftwidth=4 tabstop=8 expandtab
1243  */
1244