Inval ses for diagnostic 'temporary system error'
[metaproxy-moved-to-github.git] / src / filter_session_shared.cpp
1 /* This file is part of Metaproxy.
2    Copyright (C) 2005-2009 Index Data
3
4 Metaproxy is free software; you can redistribute it and/or modify it under
5 the terms of the GNU General Public License as published by the Free
6 Software Foundation; either version 2, or (at your option) any later
7 version.
8
9 Metaproxy is distributed in the hope that it will be useful, but WITHOUT ANY
10 WARRANTY; without even the implied warranty of MERCHANTABILITY or
11 FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
12 for more details.
13
14 You should have received a copy of the GNU General Public License
15 along with this program; if not, write to the Free Software
16 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
17 */
18
19 #include "config.hpp"
20
21 #include "filter.hpp"
22 #include "package.hpp"
23
24 #include <boost/thread/mutex.hpp>
25 #include <boost/thread/condition.hpp>
26 #include <boost/thread/thread.hpp>
27 #include <boost/thread/xtime.hpp>
28 #include <boost/shared_ptr.hpp>
29 #include <boost/format.hpp>
30
31 #include "util.hpp"
32 #include "filter_session_shared.hpp"
33
34 #include <yaz/log.h>
35 #include <yaz/zgdu.h>
36 #include <yaz/otherinfo.h>
37 #include <yaz/diagbib1.h>
38 #include <yazpp/z-query.h>
39 #include <yazpp/record-cache.h>
40 #include <map>
41 #include <iostream>
42 #include <time.h>
43
44 namespace mp = metaproxy_1;
45 namespace yf = metaproxy_1::filter;
46
47 namespace metaproxy_1 {
48
49     namespace filter {
50         // key for session.. We'll only share sessions with same InitKey
51         class SessionShared::InitKey {
52         public:
53             bool operator < (const SessionShared::InitKey &k) const;
54             InitKey(Z_InitRequest *req);
55             InitKey(const InitKey &);
56             ~InitKey();
57         private:
58             char *m_idAuthentication_buf;
59             int m_idAuthentication_size;
60             char *m_otherInfo_buf;
61             int m_otherInfo_size;
62             ODR m_odr;
63         };
64         // worker thread .. for expiry of sessions
65         class SessionShared::Worker {
66         public:
67             Worker(SessionShared::Rep *rep);
68             void operator() (void);
69         private:
70             SessionShared::Rep *m_p;
71         };
72         // backend result set
73         class SessionShared::BackendSet {
74         public:
75             std::string m_result_set_id;
76             Databases m_databases;
77             int m_result_set_size;
78             yazpp_1::Yaz_Z_Query m_query;
79             time_t m_time_last_use;
80             void timestamp();
81             yazpp_1::RecordCache m_record_cache;
82             BackendSet(
83                 const std::string &result_set_id,
84                 const Databases &databases,
85                 const yazpp_1::Yaz_Z_Query &query);
86             bool search(
87                 Package &frontend_package,
88                 const Z_APDU *apdu_req,
89                 const BackendInstancePtr bp,
90                 Z_Records **z_records);
91         };
92         // backend connection instance
93         class SessionShared::BackendInstance {
94             friend class Rep;
95             friend class BackendClass;
96             friend class BackendSet;
97         public:
98             mp::Session m_session;
99             BackendSetList m_sets;
100             bool m_in_use;
101             int m_sequence_this;
102             int m_result_set_sequence;
103             time_t m_time_last_use;
104             mp::Package * m_close_package;
105             ~BackendInstance();
106         };
107         // backends of some class (all with same InitKey)
108         class SessionShared::BackendClass : boost::noncopyable {
109             friend class Rep;
110             friend struct Frontend;
111             bool m_named_result_sets;
112             BackendInstanceList m_backend_list;
113             BackendInstancePtr create_backend(const Package &package);
114             void remove_backend(BackendInstancePtr b);
115             BackendInstancePtr get_backend(const Package &package);
116             void use_backend(BackendInstancePtr b);
117             void release_backend(BackendInstancePtr b);
118             void expire_class();
119             yazpp_1::GDU m_init_request;
120             yazpp_1::GDU m_init_response;
121             boost::mutex m_mutex_backend_class;
122             int m_sequence_top;
123             time_t m_backend_set_ttl;
124             time_t m_backend_expiry_ttl;
125             size_t m_backend_set_max;
126         public:
127             BackendClass(const yazpp_1::GDU &init_request,
128                          int resultset_ttl,
129                          int resultset_max,
130                          int session_ttl);
131             ~BackendClass();
132         };
133         // frontend result set
134         class SessionShared::FrontendSet {
135             Databases m_databases;
136             yazpp_1::Yaz_Z_Query m_query;
137         public:
138             const Databases &get_databases();
139             const yazpp_1::Yaz_Z_Query &get_query();
140             FrontendSet(
141                 const Databases &databases,
142                 const yazpp_1::Yaz_Z_Query &query);
143             FrontendSet();
144         };
145         // frontend session
146         struct SessionShared::Frontend {
147             Frontend(Rep *rep);
148             ~Frontend();
149             bool m_is_virtual;
150             bool m_in_use;
151             Z_Options m_init_options;
152             void search(Package &package, Z_APDU *apdu);
153             void present(Package &package, Z_APDU *apdu);
154             void scan(Package &package, Z_APDU *apdu);
155
156             void get_set(mp::Package &package,
157                          const Z_APDU *apdu_req,
158                          const Databases &databases,
159                          yazpp_1::Yaz_Z_Query &query,
160                          BackendInstancePtr &found_backend,
161                          BackendSetPtr &found_set);
162             void override_set(BackendInstancePtr &found_backend,
163                               std::string &result_set_id);
164
165             Rep *m_p;
166             BackendClassPtr m_backend_class;
167             FrontendSets m_frontend_sets;
168         };            
169         // representation
170         class SessionShared::Rep {
171             friend class SessionShared;
172             friend struct Frontend;
173             
174             FrontendPtr get_frontend(Package &package);
175             void release_frontend(Package &package);
176             Rep();
177         public:
178             void expire();
179         private:
180             void init(Package &package, const Z_GDU *gdu,
181                       FrontendPtr frontend);
182             boost::mutex m_mutex;
183             boost::condition m_cond_session_ready;
184             std::map<mp::Session, FrontendPtr> m_clients;
185
186             BackendClassMap m_backend_map;
187             boost::mutex m_mutex_backend_map;
188             boost::thread_group m_thrds;
189             int m_resultset_ttl;
190             int m_resultset_max;
191             int m_session_ttl;
192         };
193     }
194 }
195
196 yf::SessionShared::FrontendSet::FrontendSet(
197     const Databases &databases,
198     const yazpp_1::Yaz_Z_Query &query)
199     : m_databases(databases), m_query(query)
200 {
201 }
202
203 const yf::SessionShared::Databases & 
204 yf::SessionShared::FrontendSet::get_databases()
205 {
206     return m_databases;
207 }
208
209 const yazpp_1::Yaz_Z_Query& yf::SessionShared::FrontendSet::get_query()
210 {
211     return m_query;
212 }
213
214 yf::SessionShared::InitKey::InitKey(const InitKey &k)
215 {
216     m_odr = odr_createmem(ODR_ENCODE);
217     
218     m_idAuthentication_size =  k.m_idAuthentication_size;
219     m_idAuthentication_buf = (char*)odr_malloc(m_odr, m_idAuthentication_size);
220     memcpy(m_idAuthentication_buf, k.m_idAuthentication_buf,
221            m_idAuthentication_size);
222
223     m_otherInfo_size =  k.m_otherInfo_size;
224     m_otherInfo_buf = (char*)odr_malloc(m_odr, m_otherInfo_size);
225     memcpy(m_otherInfo_buf, k.m_otherInfo_buf,
226            m_otherInfo_size);
227 }
228
229 yf::SessionShared::InitKey::InitKey(Z_InitRequest *req)
230 {
231     m_odr = odr_createmem(ODR_ENCODE);
232
233     Z_IdAuthentication *t = req->idAuthentication;
234     z_IdAuthentication(m_odr, &t, 1, 0);
235     m_idAuthentication_buf =
236         odr_getbuf(m_odr, &m_idAuthentication_size, 0);
237
238     Z_OtherInformation *o = req->otherInfo;
239     z_OtherInformation(m_odr, &o, 1, 0);
240     m_otherInfo_buf = odr_getbuf(m_odr, &m_otherInfo_size, 0);
241 }
242
243 yf::SessionShared::InitKey::~InitKey()
244 {
245     odr_destroy(m_odr);
246 }
247
248 bool yf::SessionShared::InitKey::operator < (const SessionShared::InitKey &k)
249     const 
250 {
251     int c;
252     c = mp::util::memcmp2(
253         (void*) m_idAuthentication_buf, m_idAuthentication_size,
254         (void*) k.m_idAuthentication_buf, k.m_idAuthentication_size);
255     if (c < 0)
256         return true;
257     else if (c > 0)
258         return false;
259
260     c = mp::util::memcmp2((void*) m_otherInfo_buf, m_otherInfo_size,
261                           (void*) k.m_otherInfo_buf, k.m_otherInfo_size);
262     if (c < 0)
263         return true;
264     else if (c > 0)
265         return false;
266     return false;
267 }
268
269 void yf::SessionShared::BackendClass::release_backend(BackendInstancePtr b)
270 {
271     boost::mutex::scoped_lock lock(m_mutex_backend_class);
272     b->m_in_use = false;
273 }
274
275
276 void yf::SessionShared::BackendClass::remove_backend(BackendInstancePtr b)
277 {
278     BackendInstanceList::iterator it = m_backend_list.begin();
279     
280     while (it != m_backend_list.end())
281     {
282         if (*it == b)
283         {
284              mp::odr odr;
285             (*it)->m_close_package->response() = odr.create_close(
286                 0, Z_Close_lackOfActivity, 0);
287             (*it)->m_close_package->session().close();
288             (*it)->m_close_package->move();
289             
290             it = m_backend_list.erase(it);
291         }
292         else
293             it++;
294     }
295 }
296
297
298
299 yf::SessionShared::BackendInstancePtr 
300 yf::SessionShared::BackendClass::get_backend(
301     const mp::Package &frontend_package)
302 {
303     {
304         boost::mutex::scoped_lock lock(m_mutex_backend_class);
305         
306         BackendInstanceList::const_iterator it = m_backend_list.begin();
307         
308         BackendInstancePtr backend1; // null
309         
310         for (; it != m_backend_list.end(); it++)
311         {
312             if (!(*it)->m_in_use)
313             {
314                 if (!backend1 
315                     || (*it)->m_sequence_this < backend1->m_sequence_this)
316                     backend1 = *it;
317             }
318         }
319         if (backend1)
320         {
321             use_backend(backend1);
322             return backend1;
323         }
324     }
325     return create_backend(frontend_package);
326 }
327
328 void yf::SessionShared::BackendClass::use_backend(BackendInstancePtr backend)
329 {
330     backend->m_in_use = true;
331     time(&backend->m_time_last_use);
332     backend->m_sequence_this = m_sequence_top++;
333 }
334
335 yf::SessionShared::BackendInstance::~BackendInstance()
336 {
337     delete m_close_package;
338 }
339
340 yf::SessionShared::BackendInstancePtr yf::SessionShared::BackendClass::create_backend(
341     const mp::Package &frontend_package)
342 {
343     BackendInstancePtr bp(new BackendInstance);
344     BackendInstancePtr null;
345
346     bp->m_close_package =
347         new mp::Package(bp->m_session, frontend_package.origin());
348     bp->m_close_package->copy_filter(frontend_package);
349
350     Package init_package(bp->m_session, frontend_package.origin());
351
352     init_package.copy_filter(frontend_package);
353
354     yazpp_1::GDU actual_init_request = m_init_request;
355     Z_GDU *init_pdu = actual_init_request.get();
356
357     assert(init_pdu->which == Z_GDU_Z3950);
358     assert(init_pdu->u.z3950->which == Z_APDU_initRequest);
359
360     Z_InitRequest *req = init_pdu->u.z3950->u.initRequest;
361     ODR_MASK_ZERO(req->options);
362
363     ODR_MASK_SET(req->options, Z_Options_search);
364     ODR_MASK_SET(req->options, Z_Options_present);
365     ODR_MASK_SET(req->options, Z_Options_namedResultSets);
366     ODR_MASK_SET(req->options, Z_Options_scan);
367
368     ODR_MASK_SET(req->protocolVersion, Z_ProtocolVersion_1);
369     ODR_MASK_SET(req->protocolVersion, Z_ProtocolVersion_2);
370     ODR_MASK_SET(req->protocolVersion, Z_ProtocolVersion_3);
371
372     init_package.request() = init_pdu;
373
374     init_package.move();
375
376     boost::mutex::scoped_lock lock(m_mutex_backend_class);
377
378     m_named_result_sets = false;
379     Z_GDU *gdu = init_package.response().get();
380     if (!init_package.session().is_closed()
381         && gdu && gdu->which == Z_GDU_Z3950 
382         && gdu->u.z3950->which == Z_APDU_initResponse)
383     {
384         Z_InitResponse *res = gdu->u.z3950->u.initResponse;
385         if (!*res->result)
386             return null;
387         m_init_response = gdu->u.z3950;
388         if (ODR_MASK_GET(res->options, Z_Options_namedResultSets))
389         {
390             m_named_result_sets = true;
391         }
392     }
393     else
394     {
395         // did not receive an init response or closed
396         return null;
397     }
398     bp->m_in_use = true;
399     time(&bp->m_time_last_use);
400     bp->m_sequence_this = 0;
401     bp->m_result_set_sequence = 0;
402     m_backend_list.push_back(bp);
403
404     return bp;
405 }
406
407
408 yf::SessionShared::BackendClass::BackendClass(const yazpp_1::GDU &init_request,
409                                               int resultset_ttl,
410                                               int resultset_max,
411                                               int session_ttl)
412     : m_named_result_sets(false), m_init_request(init_request),
413       m_sequence_top(0), m_backend_set_ttl(resultset_ttl),
414       m_backend_expiry_ttl(session_ttl), m_backend_set_max(resultset_max)
415 {}
416
417 yf::SessionShared::BackendClass::~BackendClass()
418 {}
419
420 void yf::SessionShared::Rep::init(mp::Package &package, const Z_GDU *gdu,
421                                   FrontendPtr frontend)
422 {
423     Z_InitRequest *req = gdu->u.z3950->u.initRequest;
424
425     frontend->m_is_virtual = true;
426     frontend->m_init_options = *req->options;
427     InitKey k(req);
428     {
429         boost::mutex::scoped_lock lock(m_mutex_backend_map);
430         BackendClassMap::const_iterator it;
431         it = m_backend_map.find(k);
432         if (it == m_backend_map.end())
433         {
434             BackendClassPtr b(new BackendClass(gdu->u.z3950,
435                                                m_resultset_ttl,
436                                                m_resultset_max,
437                                                m_session_ttl));
438             m_backend_map[k] = b;
439             frontend->m_backend_class = b;
440         }
441         else
442         {
443             frontend->m_backend_class = it->second;            
444         }
445     }
446     BackendClassPtr bc = frontend->m_backend_class;
447     BackendInstancePtr backend = bc->get_backend(package);
448     
449     mp::odr odr;
450     if (!backend)
451     {
452         Z_APDU *apdu = odr.create_initResponse(gdu->u.z3950, 0, 0);
453         *apdu->u.initResponse->result = 0;
454         package.response() = apdu;
455         package.session().close();
456     }
457     else
458     {
459         boost::mutex::scoped_lock lock(bc->m_mutex_backend_class);
460         yazpp_1::GDU init_response = bc->m_init_response;
461         Z_GDU *response_gdu = init_response.get();
462         mp::util::transfer_referenceId(odr, gdu->u.z3950,
463                                        response_gdu->u.z3950);
464
465         Z_Options *server_options =
466             response_gdu->u.z3950->u.initResponse->options;
467         Z_Options *client_options = &frontend->m_init_options;
468
469         int i;
470         for (i = 0; i<30; i++)
471             if (!ODR_MASK_GET(client_options, i))
472                 ODR_MASK_CLEAR(server_options, i);
473         package.response() = init_response;
474     }
475     if (backend)
476         bc->release_backend(backend);
477 }
478
479 void yf::SessionShared::BackendSet::timestamp()
480 {
481     time(&m_time_last_use);
482 }
483
484 yf::SessionShared::BackendSet::BackendSet(
485     const std::string &result_set_id,
486     const Databases &databases,
487     const yazpp_1::Yaz_Z_Query &query) :
488     m_result_set_id(result_set_id),
489     m_databases(databases), m_result_set_size(0), m_query(query) 
490 {
491     timestamp();
492 }
493
494 static int get_diagnostic(Z_DefaultDiagFormat *r)
495 {
496     return *r->condition;
497 }
498
499 bool yf::SessionShared::BackendSet::search(
500     mp::Package &frontend_package,
501     const Z_APDU *frontend_apdu,
502     const BackendInstancePtr bp,
503     Z_Records **z_records)
504 {
505     Package search_package(bp->m_session, frontend_package.origin());
506
507     search_package.copy_filter(frontend_package);
508
509     mp::odr odr;
510     Z_APDU *apdu_req = zget_APDU(odr, Z_APDU_searchRequest);
511     Z_SearchRequest *req = apdu_req->u.searchRequest;
512
513     req->resultSetName = odr_strdup(odr, m_result_set_id.c_str());
514     req->query = m_query.get_Z_Query();
515
516     req->num_databaseNames = m_databases.size();
517     req->databaseNames = (char**) 
518         odr_malloc(odr, req->num_databaseNames * sizeof(char *));
519     Databases::const_iterator it = m_databases.begin();
520     size_t i = 0;
521     for (; it != m_databases.end(); it++)
522         req->databaseNames[i++] = odr_strdup(odr, it->c_str());
523
524     search_package.request() = apdu_req;
525
526     search_package.move();
527
528     Z_GDU *gdu = search_package.response().get();
529     if (!search_package.session().is_closed()
530         && gdu && gdu->which == Z_GDU_Z3950 
531         && gdu->u.z3950->which == Z_APDU_searchResponse)
532     {
533         Z_SearchResponse *b_resp = gdu->u.z3950->u.searchResponse;
534         *z_records = b_resp->records;
535         m_result_set_size = *b_resp->resultCount;
536         return true;
537     }
538     Z_APDU *f_apdu = 0;
539     if (frontend_apdu->which == Z_APDU_searchRequest)
540         f_apdu = odr.create_searchResponse(
541             frontend_apdu, YAZ_BIB1_TEMPORARY_SYSTEM_ERROR, 0);
542     else if (frontend_apdu->which == Z_APDU_presentRequest)
543         f_apdu = odr.create_presentResponse(
544             frontend_apdu, YAZ_BIB1_TEMPORARY_SYSTEM_ERROR, 0);
545     else
546         f_apdu = odr.create_close(
547             frontend_apdu, YAZ_BIB1_TEMPORARY_SYSTEM_ERROR, 0);
548     frontend_package.response() = f_apdu;
549     return false;
550 }
551
552 void yf::SessionShared::Frontend::override_set(
553     BackendInstancePtr &found_backend,
554     std::string &result_set_id)
555 {
556     BackendClassPtr bc = m_backend_class;
557     BackendInstanceList::const_iterator it = bc->m_backend_list.begin();
558     time_t now;
559     time(&now);
560     
561     for (; it != bc->m_backend_list.end(); it++)
562     {
563         if (!(*it)->m_in_use)
564         {
565             BackendSetList::iterator set_it = (*it)->m_sets.begin();
566             for (; set_it != (*it)->m_sets.end(); set_it++)
567             {
568                 if (now >= (*set_it)->m_time_last_use &&
569                     now - (*set_it)->m_time_last_use > bc->m_backend_set_ttl)
570                 {
571                     found_backend = *it;
572                     result_set_id = (*set_it)->m_result_set_id;
573                     found_backend->m_sets.erase(set_it);
574                     return;
575                 }
576             }
577         }
578     }
579     size_t max_sets = bc->m_named_result_sets ? bc->m_backend_set_max : 1;
580     for (it = bc->m_backend_list.begin(); it != bc->m_backend_list.end(); it++)
581     {
582         if (!(*it)->m_in_use && (*it)->m_sets.size() < max_sets)
583         {
584             found_backend = *it;
585             if (bc->m_named_result_sets)
586             {
587                 result_set_id = boost::io::str(
588                     boost::format("%1%") % 
589                     found_backend->m_result_set_sequence);
590                 found_backend->m_result_set_sequence++;
591             }
592             else
593                 result_set_id = "default";
594             return;
595         }
596     }
597 }
598
599 void yf::SessionShared::Frontend::get_set(mp::Package &package,
600                                           const Z_APDU *apdu_req,
601                                           const Databases &databases,
602                                           yazpp_1::Yaz_Z_Query &query,
603                                           BackendInstancePtr &found_backend,
604                                           BackendSetPtr &found_set)
605 {
606     bool session_restarted = false;
607
608 restart:
609     std::string result_set_id;
610     BackendClassPtr bc = m_backend_class;
611     {
612         boost::mutex::scoped_lock lock(bc->m_mutex_backend_class);
613      
614         // look at each backend and see if we have a similar search
615         BackendInstanceList::const_iterator it = bc->m_backend_list.begin();
616         
617         for (; it != bc->m_backend_list.end(); it++)
618         {
619             if (!(*it)->m_in_use)
620             {
621                 BackendSetList::const_iterator set_it = (*it)->m_sets.begin();
622                 for (; set_it != (*it)->m_sets.end(); set_it++)
623                 {
624                     if ((*set_it)->m_databases == databases 
625                         && query.match(&(*set_it)->m_query))
626                     {
627                         found_set = *set_it;
628                         found_backend = *it;
629                         bc->use_backend(found_backend);
630                         found_set->timestamp();
631                         // found matching set. No need to search again
632                         return;
633                     }
634                 }
635             }
636         }
637         override_set(found_backend, result_set_id);
638         if (found_backend)
639             bc->use_backend(found_backend);
640     }
641     if (!found_backend)
642     {
643         // create a new backend set (and new set)
644         found_backend = bc->create_backend(package);
645
646         if (!found_backend)
647         {
648             Z_APDU *f_apdu = 0;
649             mp::odr odr;
650             if (apdu_req->which == Z_APDU_searchRequest)
651             {
652                 f_apdu = odr.create_searchResponse(
653                         apdu_req, YAZ_BIB1_TEMPORARY_SYSTEM_ERROR, 0);
654             }
655             else if (apdu_req->which == Z_APDU_presentRequest)
656             {
657                 f_apdu = odr.create_presentResponse(
658                     apdu_req, YAZ_BIB1_TEMPORARY_SYSTEM_ERROR, 0);
659             }
660             else
661             {
662                 f_apdu = odr.create_close(
663                     apdu_req, YAZ_BIB1_TEMPORARY_SYSTEM_ERROR, 0);
664             }
665             package.response() = f_apdu;
666             return;
667         }
668         if (bc->m_named_result_sets)
669         {
670             result_set_id = boost::io::str(
671                 boost::format("%1%") % found_backend->m_result_set_sequence);
672             found_backend->m_result_set_sequence++;
673         }
674         else
675             result_set_id = "default";
676     }
677     // we must search ...
678     BackendSetPtr new_set(new BackendSet(result_set_id,
679                                          databases, query));
680     Z_Records *z_records = 0;
681     if (!new_set->search(package, apdu_req, found_backend, &z_records))
682     {
683         bc->remove_backend(found_backend);
684         return; // search error 
685     }
686
687     if (z_records)
688     {
689         int condition = 0;
690         if (z_records->which == Z_Records_NSD)
691         {
692             condition =
693                 get_diagnostic(z_records->u.nonSurrogateDiagnostic);
694         }
695         else if (z_records->which == Z_Records_multipleNSD)
696         {
697             if (z_records->u.multipleNonSurDiagnostics->num_diagRecs >= 1
698                 && 
699                 
700                 z_records->u.multipleNonSurDiagnostics->diagRecs[0]->which ==
701                 Z_DiagRec_defaultFormat)
702             {
703                 condition = get_diagnostic(
704                     z_records->u.multipleNonSurDiagnostics->diagRecs[0]->u.defaultFormat);
705                 
706             }
707         }
708         if (!session_restarted &&
709             condition == YAZ_BIB1_TEMPORARY_SYSTEM_ERROR)
710         {
711             bc->remove_backend(found_backend);
712             session_restarted = true;
713             found_backend.reset();
714             goto restart;
715
716         }
717
718         if (condition)
719         {
720             mp::odr odr;
721             if (apdu_req->which == Z_APDU_searchRequest)
722             {
723                 Z_APDU *f_apdu = odr.create_searchResponse(apdu_req, 
724                                                            0, 0);
725                 Z_SearchResponse *f_resp = f_apdu->u.searchResponse;
726                 *f_resp->searchStatus = Z_SearchResponse_none;
727                 f_resp->records = z_records;
728                 package.response() = f_apdu;
729             }
730             if (apdu_req->which == Z_APDU_presentRequest)
731             {
732                 Z_APDU *f_apdu = odr.create_presentResponse(apdu_req, 
733                                                             0, 0);
734                 Z_PresentResponse *f_resp = f_apdu->u.presentResponse;
735                 f_resp->records = z_records;
736                 package.response() = f_apdu;
737             }
738             bc->release_backend(found_backend);
739             return; // search error 
740         }
741     }
742     if (!session_restarted && new_set->m_result_set_size < 0)
743     {
744         bc->remove_backend(found_backend);
745         session_restarted = true;
746         found_backend.reset();
747         goto restart;
748     }
749
750     found_set = new_set;
751     found_set->timestamp();
752     found_backend->m_sets.push_back(found_set);
753 }
754
755 void yf::SessionShared::Frontend::search(mp::Package &package,
756                                          Z_APDU *apdu_req)
757 {
758     Z_SearchRequest *req = apdu_req->u.searchRequest;
759     FrontendSets::iterator fset_it = 
760         m_frontend_sets.find(req->resultSetName);
761     if (fset_it != m_frontend_sets.end())
762     {
763         // result set already exist 
764         // if replace indicator is off: we return diagnostic if
765         // result set already exist.
766         if (*req->replaceIndicator == 0)
767         {
768             mp::odr odr;
769             Z_APDU *apdu = 
770                 odr.create_searchResponse(
771                     apdu_req,
772                     YAZ_BIB1_RESULT_SET_EXISTS_AND_REPLACE_INDICATOR_OFF,
773                     0);
774             package.response() = apdu;
775             
776             return;
777         }
778         m_frontend_sets.erase(fset_it);
779     }
780     
781     yazpp_1::Yaz_Z_Query query;
782     query.set_Z_Query(req->query);
783     Databases databases;
784     int i;
785     for (i = 0; i<req->num_databaseNames; i++)
786         databases.push_back(req->databaseNames[i]);
787
788     BackendSetPtr found_set; // null
789     BackendInstancePtr found_backend; // null
790
791     get_set(package, apdu_req, databases, query, found_backend, found_set);
792     if (!found_set)
793         return;
794
795     mp::odr odr;
796     Z_APDU *f_apdu = odr.create_searchResponse(apdu_req, 0, 0);
797     Z_SearchResponse *f_resp = f_apdu->u.searchResponse;
798     *f_resp->resultCount = found_set->m_result_set_size;
799     package.response() = f_apdu;
800
801     FrontendSetPtr fset(new FrontendSet(databases, query));
802     m_frontend_sets[req->resultSetName] = fset;
803
804     m_backend_class->release_backend(found_backend);
805 }
806
807 void yf::SessionShared::Frontend::present(mp::Package &package,
808                                           Z_APDU *apdu_req)
809 {
810     mp::odr odr;
811     Z_PresentRequest *req = apdu_req->u.presentRequest;
812
813     FrontendSets::iterator fset_it = 
814         m_frontend_sets.find(req->resultSetId);
815
816     if (fset_it == m_frontend_sets.end())
817     {
818         Z_APDU *apdu = 
819             odr.create_presentResponse(
820                 apdu_req,
821                 YAZ_BIB1_SPECIFIED_RESULT_SET_DOES_NOT_EXIST,
822                 req->resultSetId);
823         package.response() = apdu;
824         return;
825     }
826     FrontendSetPtr fset = fset_it->second;
827
828     Databases databases = fset->get_databases();
829     yazpp_1::Yaz_Z_Query query = fset->get_query();
830
831     BackendClassPtr bc = m_backend_class;
832     BackendSetPtr found_set; // null
833     BackendInstancePtr found_backend;
834
835     get_set(package, apdu_req, databases, query, found_backend, found_set);
836     if (!found_set)
837         return;
838
839     Z_NamePlusRecordList *npr_res = 0;
840     if (found_set->m_record_cache.lookup(odr, &npr_res, 
841                                          *req->resultSetStartPoint,
842                                          *req->numberOfRecordsRequested,
843                                          req->preferredRecordSyntax,
844                                          req->recordComposition))
845     {
846         Z_APDU *f_apdu_res = odr.create_presentResponse(apdu_req, 0, 0);
847         Z_PresentResponse *f_resp = f_apdu_res->u.presentResponse;
848
849         yaz_log(YLOG_LOG, "Found %d+%d records in cache %p",
850                 *req->resultSetStartPoint,                      
851                 *req->numberOfRecordsRequested,
852                 &found_set->m_record_cache);        
853
854         *f_resp->numberOfRecordsReturned = *req->numberOfRecordsRequested;
855         *f_resp->nextResultSetPosition = 
856             *req->resultSetStartPoint + *req->numberOfRecordsRequested;
857         // f_resp->presentStatus assumed OK.
858         f_resp->records = (Z_Records *) odr_malloc(odr, sizeof(Z_Records));
859         f_resp->records->which = Z_Records_DBOSD;
860         f_resp->records->u.databaseOrSurDiagnostics = npr_res;
861         package.response() = f_apdu_res;
862         bc->release_backend(found_backend);
863         return;
864     }
865                               
866     Z_APDU *p_apdu = zget_APDU(odr, Z_APDU_presentRequest);
867     Z_PresentRequest *p_req = p_apdu->u.presentRequest;
868     p_req->preferredRecordSyntax = req->preferredRecordSyntax;
869     p_req->resultSetId = odr_strdup(odr, found_set->m_result_set_id.c_str());
870     *p_req->resultSetStartPoint = *req->resultSetStartPoint;
871     *p_req->numberOfRecordsRequested = *req->numberOfRecordsRequested;
872     p_req->preferredRecordSyntax = req->preferredRecordSyntax;
873     p_req->recordComposition = req->recordComposition;
874
875     Package present_package(found_backend->m_session, package.origin());
876     present_package.copy_filter(package);
877
878     present_package.request() = p_apdu;
879
880     present_package.move();
881
882     Z_GDU *gdu = present_package.response().get();
883     if (!present_package.session().is_closed()
884         && gdu && gdu->which == Z_GDU_Z3950 
885         && gdu->u.z3950->which == Z_APDU_presentResponse)
886     {
887         Z_PresentResponse *b_resp = gdu->u.z3950->u.presentResponse;
888         Z_APDU *f_apdu_res = odr.create_presentResponse(apdu_req, 0, 0);
889         Z_PresentResponse *f_resp = f_apdu_res->u.presentResponse;
890
891         f_resp->numberOfRecordsReturned = b_resp->numberOfRecordsReturned;
892         f_resp->nextResultSetPosition = b_resp->nextResultSetPosition;
893         f_resp->presentStatus= b_resp->presentStatus;
894         f_resp->records = b_resp->records;
895         f_resp->otherInfo = b_resp->otherInfo;
896         package.response() = f_apdu_res;
897
898         if (b_resp->records && b_resp->records->which ==  Z_Records_DBOSD)
899         {
900             yaz_log(YLOG_LOG, "Adding %d+%d records to cache %p",
901                     *req->resultSetStartPoint,                      
902                     *f_resp->numberOfRecordsReturned,
903                     &found_set->m_record_cache);        
904             found_set->m_record_cache.add(
905                 odr,
906                 b_resp->records->u.databaseOrSurDiagnostics,
907                 *req->resultSetStartPoint,                      
908                 *f_resp->numberOfRecordsReturned);
909         }
910         bc->release_backend(found_backend);
911     }
912     else
913     {
914         bc->remove_backend(found_backend);
915         Z_APDU *f_apdu_res = 
916             odr.create_presentResponse(
917                 apdu_req, YAZ_BIB1_TEMPORARY_SYSTEM_ERROR, 0);
918         package.response() = f_apdu_res;
919     }
920 }
921
922 void yf::SessionShared::Frontend::scan(mp::Package &frontend_package,
923                                        Z_APDU *apdu_req)
924 {
925     BackendClassPtr bc = m_backend_class;
926     BackendInstancePtr backend = bc->get_backend(frontend_package);
927     if (!backend)
928     {
929         mp::odr odr;
930         Z_APDU *apdu = odr.create_scanResponse(
931             apdu_req, YAZ_BIB1_TEMPORARY_SYSTEM_ERROR, 0);
932         frontend_package.response() = apdu;
933     }
934     else
935     {
936         Package scan_package(backend->m_session, frontend_package.origin());
937         scan_package.copy_filter(frontend_package);
938         scan_package.request() = apdu_req;
939         scan_package.move();
940         frontend_package.response() = scan_package.response();
941         if (scan_package.session().is_closed())
942         {
943             frontend_package.session().close();
944             bc->remove_backend(backend);
945         }
946         else
947             bc->release_backend(backend);
948     }
949 }
950
951 yf::SessionShared::Worker::Worker(SessionShared::Rep *rep) : m_p(rep)
952 {
953 }
954
955 void yf::SessionShared::Worker::operator() (void)
956 {
957     m_p->expire();
958 }
959
960 void yf::SessionShared::BackendClass::expire_class()
961 {
962     time_t now;
963     time(&now);
964     boost::mutex::scoped_lock lock(m_mutex_backend_class);
965     BackendInstanceList::iterator bit = m_backend_list.begin();
966     while (bit != m_backend_list.end())
967     {
968         time_t last_use = (*bit)->m_time_last_use;
969         
970         if ((*bit)->m_in_use)
971         {
972             bit++;
973         }
974         else if ((now >= last_use && now - last_use > m_backend_expiry_ttl)
975             || (now < last_use))
976         {
977             mp::odr odr;
978             (*bit)->m_close_package->response() = odr.create_close(
979                 0, Z_Close_lackOfActivity, 0);
980             (*bit)->m_close_package->session().close();
981             (*bit)->m_close_package->move();
982
983             bit = m_backend_list.erase(bit);
984         }
985         else
986         {
987             bit++;
988         }
989     }
990 }
991
992 void yf::SessionShared::Rep::expire()
993 {
994     while (true)
995     {
996         boost::xtime xt;
997         boost::xtime_get(&xt, boost::TIME_UTC);
998         xt.sec += 30;
999         boost::thread::sleep(xt);
1000         
1001         BackendClassMap::const_iterator b_it = m_backend_map.begin();
1002         for (; b_it != m_backend_map.end(); b_it++)
1003             b_it->second->expire_class();
1004     }
1005 }
1006
1007 yf::SessionShared::Rep::Rep()
1008 {
1009     m_resultset_ttl = 30;
1010     m_resultset_max = 10;
1011     m_session_ttl = 90;
1012     yf::SessionShared::Worker w(this);
1013     m_thrds.add_thread(new boost::thread(w));
1014 }
1015
1016 yf::SessionShared::SessionShared() : m_p(new SessionShared::Rep)
1017 {
1018 }
1019
1020 yf::SessionShared::~SessionShared() {
1021 }
1022
1023
1024 yf::SessionShared::Frontend::Frontend(Rep *rep) : m_is_virtual(false), m_p(rep)
1025 {
1026 }
1027
1028 yf::SessionShared::Frontend::~Frontend()
1029 {
1030 }
1031
1032 yf::SessionShared::FrontendPtr yf::SessionShared::Rep::get_frontend(mp::Package &package)
1033 {
1034     boost::mutex::scoped_lock lock(m_mutex);
1035
1036     std::map<mp::Session,yf::SessionShared::FrontendPtr>::iterator it;
1037     
1038     while(true)
1039     {
1040         it = m_clients.find(package.session());
1041         if (it == m_clients.end())
1042             break;
1043         
1044         if (!it->second->m_in_use)
1045         {
1046             it->second->m_in_use = true;
1047             return it->second;
1048         }
1049         m_cond_session_ready.wait(lock);
1050     }
1051     FrontendPtr f(new Frontend(this));
1052     m_clients[package.session()] = f;
1053     f->m_in_use = true;
1054     return f;
1055 }
1056
1057 void yf::SessionShared::Rep::release_frontend(mp::Package &package)
1058 {
1059     boost::mutex::scoped_lock lock(m_mutex);
1060     std::map<mp::Session,yf::SessionShared::FrontendPtr>::iterator it;
1061     
1062     it = m_clients.find(package.session());
1063     if (it != m_clients.end())
1064     {
1065         if (package.session().is_closed())
1066         {
1067             m_clients.erase(it);
1068         }
1069         else
1070         {
1071             it->second->m_in_use = false;
1072         }
1073         m_cond_session_ready.notify_all();
1074     }
1075 }
1076
1077
1078 void yf::SessionShared::process(mp::Package &package) const
1079 {
1080     FrontendPtr f = m_p->get_frontend(package);
1081
1082     Z_GDU *gdu = package.request().get();
1083     
1084     if (gdu && gdu->which == Z_GDU_Z3950 && gdu->u.z3950->which ==
1085         Z_APDU_initRequest && !f->m_is_virtual)
1086     {
1087         m_p->init(package, gdu, f);
1088     }
1089     else if (!f->m_is_virtual)
1090         package.move();
1091     else if (gdu && gdu->which == Z_GDU_Z3950)
1092     {
1093         Z_APDU *apdu = gdu->u.z3950;
1094         if (apdu->which == Z_APDU_initRequest)
1095         {
1096             mp::odr odr;
1097             
1098             package.response() = odr.create_close(
1099                 apdu,
1100                 Z_Close_protocolError,
1101                 "double init");
1102             
1103             package.session().close();
1104         }
1105         else if (apdu->which == Z_APDU_close)
1106         {
1107             mp::odr odr;
1108             
1109             package.response() = odr.create_close(
1110                 apdu,
1111                 Z_Close_peerAbort, "received close from client");
1112             package.session().close();
1113         }
1114         else if (apdu->which == Z_APDU_searchRequest)
1115         {
1116             f->search(package, apdu);
1117         }
1118         else if (apdu->which == Z_APDU_presentRequest)
1119         {
1120             f->present(package, apdu);
1121         }
1122         else if (apdu->which == Z_APDU_scanRequest)
1123         {
1124             f->scan(package, apdu);
1125         }
1126         else
1127         {
1128             mp::odr odr;
1129             
1130             package.response() = odr.create_close(
1131                 apdu, Z_Close_protocolError,
1132                 "unsupported APDU in filter_session_shared");
1133             
1134             package.session().close();
1135         }
1136     }
1137     m_p->release_frontend(package);
1138 }
1139
1140 void yf::SessionShared::configure(const xmlNode *ptr, bool test_only)
1141 {
1142     for (ptr = ptr->children; ptr; ptr = ptr->next)
1143     {
1144         if (ptr->type != XML_ELEMENT_NODE)
1145             continue;
1146         if (!strcmp((const char *) ptr->name, "resultset"))
1147         {
1148             const struct _xmlAttr *attr;
1149             for (attr = ptr->properties; attr; attr = attr->next)
1150             {
1151                 if (!strcmp((const char *) attr->name, "ttl"))
1152                     m_p->m_resultset_ttl = 
1153                         mp::xml::get_int(attr->children, 30);
1154                 else if (!strcmp((const char *) attr->name, "max"))
1155                 {
1156                     m_p->m_resultset_max = 
1157                         mp::xml::get_int(attr->children, 10);
1158                 }
1159                 else
1160                     throw mp::filter::FilterException(
1161                         "Bad attribute " + std::string((const char *)
1162                                                        attr->name));
1163             }
1164         }
1165         else if (!strcmp((const char *) ptr->name, "session"))
1166         {
1167             const struct _xmlAttr *attr;
1168             for (attr = ptr->properties; attr; attr = attr->next)
1169             {
1170                 if (!strcmp((const char *) attr->name, "ttl"))
1171                     m_p->m_session_ttl = 
1172                         mp::xml::get_int(attr->children, 120);
1173                 else
1174                     throw mp::filter::FilterException(
1175                         "Bad attribute " + std::string((const char *)
1176                                                        attr->name));
1177             }
1178         }
1179         else
1180         {
1181             throw mp::filter::FilterException("Bad element " 
1182                                                + std::string((const char *)
1183                                                              ptr->name));
1184         }
1185     }
1186 }
1187
1188 static mp::filter::Base* filter_creator()
1189 {
1190     return new mp::filter::SessionShared;
1191 }
1192
1193 extern "C" {
1194     struct metaproxy_1_filter_struct metaproxy_1_filter_session_shared = {
1195         0,
1196         "session_shared",
1197         filter_creator
1198     };
1199 }
1200
1201 /*
1202  * Local variables:
1203  * c-basic-offset: 4
1204  * c-file-style: "Stroustrup"
1205  * indent-tabs-mode: nil
1206  * End:
1207  * vim: shiftwidth=4 tabstop=8 expandtab
1208  */
1209