1 /* $Id: filter_session_shared.cpp,v 1.13 2006-06-20 22:27:45 adam Exp $
2 Copyright (c) 2005-2006, Index Data.
4 See the LICENSE file for details
10 #include "package.hpp"
12 #include <boost/thread/mutex.hpp>
13 #include <boost/thread/condition.hpp>
14 #include <boost/thread/thread.hpp>
15 #include <boost/thread/xtime.hpp>
16 #include <boost/shared_ptr.hpp>
17 #include <boost/format.hpp>
20 #include "filter_session_shared.hpp"
24 #include <yaz/otherinfo.h>
25 #include <yaz/diagbib1.h>
26 #include <yazpp/z-query.h>
31 namespace mp = metaproxy_1;
32 namespace yf = metaproxy_1::filter;
34 namespace metaproxy_1 {
37 class SessionShared::InitKey {
39 bool operator < (const SessionShared::InitKey &k) const;
40 InitKey(Z_InitRequest *req);
41 InitKey(const InitKey &);
44 InitKey &operator = (const InitKey &k);
45 char *m_idAuthentication_buf;
46 int m_idAuthentication_size;
47 char *m_otherInfo_buf;
51 class SessionShared::Worker {
53 Worker(SessionShared::Rep *rep);
54 void operator() (void);
56 SessionShared::Rep *m_p;
58 class SessionShared::BackendSet {
60 std::string m_result_set_id;
61 Databases m_databases;
62 int m_result_set_size;
63 yazpp_1::Yaz_Z_Query m_query;
64 time_t m_time_last_use;
67 const std::string &result_set_id,
68 const Databases &databases,
69 const yazpp_1::Yaz_Z_Query &query);
71 Package &frontend_package,
72 const Z_APDU *apdu_req,
73 const BackendInstancePtr bp);
75 class SessionShared::BackendInstance {
77 friend class BackendClass;
78 friend class BackendSet;
80 mp::Session m_session;
81 BackendSetList m_sets;
84 int m_result_set_sequence;
85 time_t m_time_last_use;
86 mp::Package * m_close_package;
89 class SessionShared::BackendClass : boost::noncopyable {
91 friend struct Frontend;
92 bool m_named_result_sets;
93 BackendInstanceList m_backend_list;
94 BackendInstancePtr create_backend(const Package &package);
95 void remove_backend(BackendInstancePtr b);
96 BackendInstancePtr get_backend(const Package &package);
97 void use_backend(BackendInstancePtr b);
98 void release_backend(BackendInstancePtr b);
100 yazpp_1::GDU m_init_request;
101 yazpp_1::GDU m_init_response;
102 boost::mutex m_mutex_backend_class;
104 time_t m_backend_set_ttl;
105 time_t m_backend_expiry_ttl;
106 size_t m_backend_set_max;
108 BackendClass(const yazpp_1::GDU &init_request);
111 class SessionShared::FrontendSet {
112 Databases m_databases;
113 yazpp_1::Yaz_Z_Query m_query;
115 const Databases &get_databases();
116 const yazpp_1::Yaz_Z_Query &get_query();
118 const Databases &databases,
119 const yazpp_1::Yaz_Z_Query &query);
122 struct SessionShared::Frontend {
127 Z_Options m_init_options;
128 void search(Package &package, Z_APDU *apdu);
129 void present(Package &package, Z_APDU *apdu);
130 void scan(Package &package, Z_APDU *apdu);
132 void get_set(mp::Package &package,
133 const Z_APDU *apdu_req,
134 const Databases &databases,
135 yazpp_1::Yaz_Z_Query &query,
136 BackendInstancePtr &found_backend,
137 BackendSetPtr &found_set);
138 void override_set(BackendInstancePtr &found_backend,
139 std::string &result_set_id);
142 BackendClassPtr m_backend_class;
143 FrontendSets m_frontend_sets;
145 class SessionShared::Rep {
146 friend class SessionShared;
147 friend struct Frontend;
149 FrontendPtr get_frontend(Package &package);
150 void release_frontend(Package &package);
155 void init(Package &package, const Z_GDU *gdu,
156 FrontendPtr frontend);
157 boost::mutex m_mutex;
158 boost::condition m_cond_session_ready;
159 std::map<mp::Session, FrontendPtr> m_clients;
161 BackendClassMap m_backend_map;
162 boost::mutex m_mutex_backend_map;
163 boost::thread_group m_thrds;
168 yf::SessionShared::FrontendSet::FrontendSet(
169 const Databases &databases,
170 const yazpp_1::Yaz_Z_Query &query)
171 : m_databases(databases), m_query(query)
175 const yf::SessionShared::Databases &
176 yf::SessionShared::FrontendSet::get_databases()
181 const yazpp_1::Yaz_Z_Query& yf::SessionShared::FrontendSet::get_query()
186 yf::SessionShared::InitKey::InitKey(const InitKey &k)
188 m_odr = odr_createmem(ODR_ENCODE);
190 m_idAuthentication_size = k.m_idAuthentication_size;
191 m_idAuthentication_buf = (char*)odr_malloc(m_odr, m_idAuthentication_size);
192 memcpy(m_idAuthentication_buf, k.m_idAuthentication_buf,
193 m_idAuthentication_size);
195 m_otherInfo_size = k.m_otherInfo_size;
196 m_otherInfo_buf = (char*)odr_malloc(m_odr, m_otherInfo_size);
197 memcpy(m_otherInfo_buf, k.m_otherInfo_buf,
201 yf::SessionShared::InitKey::InitKey(Z_InitRequest *req)
203 m_odr = odr_createmem(ODR_ENCODE);
205 Z_IdAuthentication *t = req->idAuthentication;
206 z_IdAuthentication(m_odr, &t, 1, 0);
207 m_idAuthentication_buf =
208 odr_getbuf(m_odr, &m_idAuthentication_size, 0);
210 Z_OtherInformation *o = req->otherInfo;
211 z_OtherInformation(m_odr, &o, 1, 0);
212 m_otherInfo_buf = odr_getbuf(m_odr, &m_otherInfo_size, 0);
215 yf::SessionShared::InitKey::~InitKey()
220 bool yf::SessionShared::InitKey::operator < (const SessionShared::InitKey &k)
224 c = mp::util::memcmp2(
225 (void*) m_idAuthentication_buf, m_idAuthentication_size,
226 (void*) k.m_idAuthentication_buf, k.m_idAuthentication_size);
232 c = mp::util::memcmp2((void*) m_otherInfo_buf, m_otherInfo_size,
233 (void*) k.m_otherInfo_buf, k.m_otherInfo_size);
241 void yf::SessionShared::BackendClass::release_backend(BackendInstancePtr b)
243 boost::mutex::scoped_lock lock(m_mutex_backend_class);
248 void yf::SessionShared::BackendClass::remove_backend(BackendInstancePtr b)
250 BackendInstanceList::iterator it = m_backend_list.begin();
252 while (it != m_backend_list.end())
255 it = m_backend_list.erase(it);
263 yf::SessionShared::BackendInstancePtr
264 yf::SessionShared::BackendClass::get_backend(
265 const mp::Package &frontend_package)
268 boost::mutex::scoped_lock lock(m_mutex_backend_class);
270 BackendInstanceList::const_iterator it = m_backend_list.begin();
272 BackendInstancePtr backend1; // null
274 for (; it != m_backend_list.end(); it++)
276 if (!(*it)->m_in_use)
279 || (*it)->m_sequence_this < backend1->m_sequence_this)
285 use_backend(backend1);
289 return create_backend(frontend_package);
292 void yf::SessionShared::BackendClass::use_backend(BackendInstancePtr backend)
294 backend->m_in_use = true;
295 time(&backend->m_time_last_use);
296 backend->m_sequence_this = m_sequence_top++;
299 yf::SessionShared::BackendInstance::~BackendInstance()
301 delete m_close_package;
304 yf::SessionShared::BackendInstancePtr yf::SessionShared::BackendClass::create_backend(
305 const mp::Package &frontend_package)
307 BackendInstancePtr bp(new BackendInstance);
308 BackendInstancePtr null;
310 bp->m_close_package =
311 new mp::Package(bp->m_session, frontend_package.origin());
312 bp->m_close_package->copy_filter(frontend_package);
314 Package init_package(bp->m_session, frontend_package.origin());
316 init_package.copy_filter(frontend_package);
318 yazpp_1::GDU actual_init_request = m_init_request;
319 Z_GDU *init_pdu = actual_init_request.get();
321 assert(init_pdu->which == Z_GDU_Z3950);
322 assert(init_pdu->u.z3950->which == Z_APDU_initRequest);
324 Z_InitRequest *req = init_pdu->u.z3950->u.initRequest;
325 ODR_MASK_ZERO(req->options);
327 ODR_MASK_SET(req->options, Z_Options_search);
328 ODR_MASK_SET(req->options, Z_Options_present);
329 ODR_MASK_SET(req->options, Z_Options_namedResultSets);
330 ODR_MASK_SET(req->options, Z_Options_scan);
332 ODR_MASK_SET(req->protocolVersion, Z_ProtocolVersion_1);
333 ODR_MASK_SET(req->protocolVersion, Z_ProtocolVersion_2);
334 ODR_MASK_SET(req->protocolVersion, Z_ProtocolVersion_3);
336 init_package.request() = init_pdu;
340 boost::mutex::scoped_lock lock(m_mutex_backend_class);
342 m_named_result_sets = false;
343 Z_GDU *gdu = init_package.response().get();
344 if (!init_package.session().is_closed()
345 && gdu && gdu->which == Z_GDU_Z3950
346 && gdu->u.z3950->which == Z_APDU_initResponse)
348 Z_InitResponse *res = gdu->u.z3950->u.initResponse;
351 m_init_response = gdu->u.z3950;
352 if (ODR_MASK_GET(res->options, Z_Options_namedResultSets))
354 m_named_result_sets = true;
359 // did not receive an init response or closed
363 time(&bp->m_time_last_use);
364 bp->m_sequence_this = 0;
365 bp->m_result_set_sequence = 0;
366 m_backend_list.push_back(bp);
372 yf::SessionShared::BackendClass::BackendClass(const yazpp_1::GDU &init_request)
373 : m_named_result_sets(false), m_init_request(init_request),
374 m_sequence_top(0), m_backend_set_ttl(30),
375 m_backend_expiry_ttl(30), m_backend_set_max(10)
378 yf::SessionShared::BackendClass::~BackendClass()
381 void yf::SessionShared::Rep::init(mp::Package &package, const Z_GDU *gdu,
382 FrontendPtr frontend)
384 Z_InitRequest *req = gdu->u.z3950->u.initRequest;
386 frontend->m_is_virtual = true;
387 frontend->m_init_options = *req->options;
390 boost::mutex::scoped_lock lock(m_mutex_backend_map);
391 BackendClassMap::const_iterator it;
392 it = m_backend_map.find(k);
393 if (it == m_backend_map.end())
395 BackendClassPtr b(new BackendClass(gdu->u.z3950));
396 m_backend_map[k] = b;
397 frontend->m_backend_class = b;
398 std::cout << "SessionShared::Rep::init new session "
399 << frontend->m_backend_class << "\n";
403 frontend->m_backend_class = it->second;
404 std::cout << "SessionShared::Rep::init existing session "
405 << frontend->m_backend_class << "\n";
408 BackendClassPtr bc = frontend->m_backend_class;
409 BackendInstancePtr backend = bc->get_backend(package);
414 Z_APDU *apdu = odr.create_initResponse(gdu->u.z3950, 0, 0);
415 *apdu->u.initResponse->result = 0;
416 package.response() = apdu;
417 package.session().close();
421 boost::mutex::scoped_lock lock(bc->m_mutex_backend_class);
422 yazpp_1::GDU init_response = bc->m_init_response;
423 Z_GDU *response_gdu = init_response.get();
424 mp::util::transfer_referenceId(odr, gdu->u.z3950,
425 response_gdu->u.z3950);
427 Z_Options *server_options =
428 response_gdu->u.z3950->u.initResponse->options;
429 Z_Options *client_options = &frontend->m_init_options;
432 for (i = 0; i<30; i++)
433 if (!ODR_MASK_GET(client_options, i))
434 ODR_MASK_CLEAR(server_options, i);
435 package.response() = init_response;
438 bc->release_backend(backend);
441 void yf::SessionShared::BackendSet::timestamp()
443 time(&m_time_last_use);
446 yf::SessionShared::BackendSet::BackendSet(
447 const std::string &result_set_id,
448 const Databases &databases,
449 const yazpp_1::Yaz_Z_Query &query) :
450 m_result_set_id(result_set_id),
451 m_databases(databases), m_result_set_size(0), m_query(query)
456 bool yf::SessionShared::BackendSet::search(
457 Package &frontend_package,
458 const Z_APDU *frontend_apdu,
459 const BackendInstancePtr bp)
461 Package search_package(bp->m_session, frontend_package.origin());
463 search_package.copy_filter(frontend_package);
466 Z_APDU *apdu_req = zget_APDU(odr, Z_APDU_searchRequest);
467 Z_SearchRequest *req = apdu_req->u.searchRequest;
469 req->resultSetName = odr_strdup(odr, m_result_set_id.c_str());
470 req->query = m_query.get_Z_Query();
472 req->num_databaseNames = m_databases.size();
473 req->databaseNames = (char**)
474 odr_malloc(odr, req->num_databaseNames * sizeof(char *));
475 Databases::const_iterator it = m_databases.begin();
477 for (; it != m_databases.end(); it++)
478 req->databaseNames[i++] = odr_strdup(odr, it->c_str());
480 search_package.request() = apdu_req;
482 search_package.move();
484 Z_Records *z_records_diag = 0;
485 Z_GDU *gdu = search_package.response().get();
486 if (!search_package.session().is_closed()
487 && gdu && gdu->which == Z_GDU_Z3950
488 && gdu->u.z3950->which == Z_APDU_searchResponse)
490 Z_SearchResponse *b_resp = gdu->u.z3950->u.searchResponse;
493 if (b_resp->records->which == Z_Records_NSD
494 || b_resp->records->which == Z_Records_multipleNSD)
495 z_records_diag = b_resp->records;
499 if (frontend_apdu->which == Z_APDU_searchRequest)
501 Z_APDU *f_apdu = odr.create_searchResponse(frontend_apdu,
503 Z_SearchResponse *f_resp = f_apdu->u.searchResponse;
504 f_resp->records = z_records_diag;
505 frontend_package.response() = f_apdu;
508 if (frontend_apdu->which == Z_APDU_presentRequest)
510 Z_APDU *f_apdu = odr.create_presentResponse(frontend_apdu,
512 Z_PresentResponse *f_resp = f_apdu->u.presentResponse;
513 f_resp->records = z_records_diag;
514 frontend_package.response() = f_apdu;
518 m_result_set_size = *b_resp->resultCount;
522 if (frontend_apdu->which == Z_APDU_searchRequest)
523 f_apdu = odr.create_searchResponse(
524 frontend_apdu, YAZ_BIB1_TEMPORARY_SYSTEM_ERROR, 0);
525 else if (frontend_apdu->which == Z_APDU_presentRequest)
526 f_apdu = odr.create_presentResponse(
527 frontend_apdu, YAZ_BIB1_TEMPORARY_SYSTEM_ERROR, 0);
529 f_apdu = odr.create_close(
530 frontend_apdu, YAZ_BIB1_TEMPORARY_SYSTEM_ERROR, 0);
531 frontend_package.response() = f_apdu;
535 void yf::SessionShared::Frontend::override_set(
536 BackendInstancePtr &found_backend,
537 std::string &result_set_id)
539 BackendClassPtr bc = m_backend_class;
540 BackendInstanceList::const_iterator it = bc->m_backend_list.begin();
544 for (; it != bc->m_backend_list.end(); it++)
546 if (!(*it)->m_in_use)
548 BackendSetList::iterator set_it = (*it)->m_sets.begin();
549 for (; set_it != (*it)->m_sets.end(); set_it++)
551 if (now >= (*set_it)->m_time_last_use &&
552 now - (*set_it)->m_time_last_use > bc->m_backend_set_ttl)
555 result_set_id = (*set_it)->m_result_set_id;
556 found_backend->m_sets.erase(set_it);
557 std::cout << "REUSE TTL SET: " << result_set_id << "\n";
563 size_t max_sets = bc->m_named_result_sets ? bc->m_backend_set_max : 1;
564 for (it = bc->m_backend_list.begin(); it != bc->m_backend_list.end(); it++)
566 if (!(*it)->m_in_use && (*it)->m_sets.size() < max_sets)
569 if (bc->m_named_result_sets)
571 result_set_id = boost::io::str(
572 boost::format("%1%") %
573 found_backend->m_result_set_sequence);
574 found_backend->m_result_set_sequence++;
577 result_set_id = "default";
578 std::cout << "AVAILABLE SET: " << result_set_id << "\n";
584 void yf::SessionShared::Frontend::get_set(mp::Package &package,
585 const Z_APDU *apdu_req,
586 const Databases &databases,
587 yazpp_1::Yaz_Z_Query &query,
588 BackendInstancePtr &found_backend,
589 BackendSetPtr &found_set)
591 std::string result_set_id;
592 BackendClassPtr bc = m_backend_class;
594 boost::mutex::scoped_lock lock(bc->m_mutex_backend_class);
596 // look at each backend and see if we have a similar search
597 BackendInstanceList::const_iterator it = bc->m_backend_list.begin();
599 for (; it != bc->m_backend_list.end(); it++)
601 if (!(*it)->m_in_use)
603 BackendSetList::const_iterator set_it = (*it)->m_sets.begin();
604 for (; set_it != (*it)->m_sets.end(); set_it++)
606 if ((*set_it)->m_databases == databases
607 && query.match(&(*set_it)->m_query))
611 bc->use_backend(found_backend);
612 found_set->timestamp();
613 std::cout << "MATCH SET: " <<
614 found_set->m_result_set_id << "\n";
615 // found matching set. No need to search again
621 override_set(found_backend, result_set_id);
623 bc->use_backend(found_backend);
627 // create a new backend set (and new set)
628 found_backend = bc->create_backend(package);
634 if (apdu_req->which == Z_APDU_searchRequest)
636 f_apdu = odr.create_searchResponse(
637 apdu_req, YAZ_BIB1_TEMPORARY_SYSTEM_ERROR, 0);
639 else if (apdu_req->which == Z_APDU_presentRequest)
641 f_apdu = odr.create_presentResponse(
642 apdu_req, YAZ_BIB1_TEMPORARY_SYSTEM_ERROR, 0);
646 f_apdu = odr.create_close(
647 apdu_req, YAZ_BIB1_TEMPORARY_SYSTEM_ERROR, 0);
649 package.response() = f_apdu;
652 std::cout << "NEW " << found_backend << "\n";
654 if (bc->m_named_result_sets)
656 result_set_id = boost::io::str(
657 boost::format("%1%") % found_backend->m_result_set_sequence);
658 found_backend->m_result_set_sequence++;
661 result_set_id = "default";
662 std::cout << "NEW SET: " << result_set_id << "\n";
664 // we must search ...
665 BackendSetPtr new_set(new BackendSet(result_set_id,
667 if (!new_set->search(package, apdu_req, found_backend))
669 std::cout << "search error\n";
670 bc->remove_backend(found_backend);
671 return; // search error
674 found_set->timestamp();
675 found_backend->m_sets.push_back(found_set);
678 void yf::SessionShared::Frontend::search(mp::Package &package,
681 Z_SearchRequest *req = apdu_req->u.searchRequest;
682 FrontendSets::iterator fset_it =
683 m_frontend_sets.find(req->resultSetName);
684 if (fset_it != m_frontend_sets.end())
686 // result set already exist
687 // if replace indicator is off: we return diagnostic if
688 // result set already exist.
689 if (*req->replaceIndicator == 0)
693 odr.create_searchResponse(
695 YAZ_BIB1_RESULT_SET_EXISTS_AND_REPLACE_INDICATOR_OFF,
697 package.response() = apdu;
701 m_frontend_sets.erase(fset_it);
704 yazpp_1::Yaz_Z_Query query;
705 query.set_Z_Query(req->query);
708 for (i = 0; i<req->num_databaseNames; i++)
709 databases.push_back(req->databaseNames[i]);
711 BackendSetPtr found_set; // null
712 BackendInstancePtr found_backend; // null
714 get_set(package, apdu_req, databases, query, found_backend, found_set);
719 Z_APDU *f_apdu = odr.create_searchResponse(apdu_req, 0, 0);
720 Z_SearchResponse *f_resp = f_apdu->u.searchResponse;
721 *f_resp->resultCount = found_set->m_result_set_size;
722 package.response() = f_apdu;
724 FrontendSetPtr fset(new FrontendSet(databases, query));
725 m_frontend_sets[req->resultSetName] = fset;
727 m_backend_class->release_backend(found_backend);
730 void yf::SessionShared::Frontend::present(mp::Package &package,
734 Z_PresentRequest *req = apdu_req->u.presentRequest;
736 FrontendSets::iterator fset_it =
737 m_frontend_sets.find(req->resultSetId);
739 if (fset_it == m_frontend_sets.end())
742 odr.create_presentResponse(
744 YAZ_BIB1_SPECIFIED_RESULT_SET_DOES_NOT_EXIST,
746 package.response() = apdu;
749 FrontendSetPtr fset = fset_it->second;
751 Databases databases = fset->get_databases();
752 yazpp_1::Yaz_Z_Query query = fset->get_query();
754 BackendClassPtr bc = m_backend_class;
755 BackendSetPtr found_set; // null
756 BackendInstancePtr found_backend;
758 get_set(package, apdu_req, databases, query, found_backend, found_set);
762 Z_APDU *p_apdu = zget_APDU(odr, Z_APDU_presentRequest);
763 Z_PresentRequest *p_req = p_apdu->u.presentRequest;
764 p_req->preferredRecordSyntax = req->preferredRecordSyntax;
765 p_req->resultSetId = odr_strdup(odr, found_set->m_result_set_id.c_str());
766 *p_req->resultSetStartPoint = *req->resultSetStartPoint;
767 *p_req->numberOfRecordsRequested = *req->numberOfRecordsRequested;
768 p_req->preferredRecordSyntax = req->preferredRecordSyntax;
769 p_req->recordComposition = req->recordComposition;
771 Package present_package(found_backend->m_session, package.origin());
772 present_package.copy_filter(package);
774 present_package.request() = p_apdu;
776 present_package.move();
778 Z_GDU *gdu = present_package.response().get();
779 if (!present_package.session().is_closed()
780 && gdu && gdu->which == Z_GDU_Z3950
781 && gdu->u.z3950->which == Z_APDU_presentResponse)
783 Z_PresentResponse *b_resp = gdu->u.z3950->u.presentResponse;
784 Z_APDU *f_apdu_res = odr.create_presentResponse(apdu_req, 0, 0);
785 Z_PresentResponse *f_resp = f_apdu_res->u.presentResponse;
787 f_resp->numberOfRecordsReturned = b_resp->numberOfRecordsReturned;
788 f_resp->nextResultSetPosition = b_resp->nextResultSetPosition;
789 f_resp->presentStatus= b_resp->presentStatus;
790 f_resp->records = b_resp->records;
791 f_resp->otherInfo = b_resp->otherInfo;
792 package.response() = f_apdu_res;
793 bc->release_backend(found_backend);
797 bc->remove_backend(found_backend);
799 odr.create_presentResponse(
800 apdu_req, YAZ_BIB1_TEMPORARY_SYSTEM_ERROR, 0);
801 package.response() = f_apdu_res;
805 void yf::SessionShared::Frontend::scan(mp::Package &frontend_package,
808 BackendClassPtr bc = m_backend_class;
809 BackendInstancePtr backend = bc->get_backend(frontend_package);
813 Z_APDU *apdu = odr.create_scanResponse(
814 apdu_req, YAZ_BIB1_TEMPORARY_SYSTEM_ERROR, 0);
815 frontend_package.response() = apdu;
819 Package scan_package(backend->m_session, frontend_package.origin());
820 scan_package.copy_filter(frontend_package);
821 scan_package.request() = apdu_req;
823 frontend_package.response() = scan_package.response();
824 if (scan_package.session().is_closed())
826 frontend_package.session().close();
827 bc->remove_backend(backend);
830 bc->release_backend(backend);
834 yf::SessionShared::Worker::Worker(SessionShared::Rep *rep) : m_p(rep)
838 void yf::SessionShared::Worker::operator() (void)
843 void yf::SessionShared::BackendClass::expire()
847 boost::mutex::scoped_lock lock(m_mutex_backend_class);
848 BackendInstanceList::iterator bit = m_backend_list.begin();
849 while (bit != m_backend_list.end())
851 std::cout << "expiry ";
852 time_t last_use = (*bit)->m_time_last_use;
854 if ((*bit)->m_in_use)
856 std::cout << "inuse";
859 else if ((now >= last_use && now - last_use > m_backend_expiry_ttl)
863 (*bit)->m_close_package->response() = odr.create_close(
864 0, Z_Close_lackOfActivity, 0);
865 (*bit)->m_close_package->session().close();
866 (*bit)->m_close_package->move();
868 bit = m_backend_list.erase(bit);
869 std::cout << "erase";
876 std::cout << std::endl;
880 void yf::SessionShared::Rep::expire()
885 boost::xtime_get(&xt, boost::TIME_UTC);
887 boost::thread::sleep(xt);
888 std::cout << "." << std::endl;
890 BackendClassMap::const_iterator b_it = m_backend_map.begin();
891 for (; b_it != m_backend_map.end(); b_it++)
892 b_it->second->expire();
896 yf::SessionShared::Rep::Rep()
898 yf::SessionShared::Worker w(this);
899 m_thrds.add_thread(new boost::thread(w));
902 yf::SessionShared::SessionShared() : m_p(new SessionShared::Rep)
906 yf::SessionShared::~SessionShared() {
910 yf::SessionShared::Frontend::Frontend(Rep *rep) : m_is_virtual(false), m_p(rep)
914 yf::SessionShared::Frontend::~Frontend()
918 yf::SessionShared::FrontendPtr yf::SessionShared::Rep::get_frontend(mp::Package &package)
920 boost::mutex::scoped_lock lock(m_mutex);
922 std::map<mp::Session,yf::SessionShared::FrontendPtr>::iterator it;
926 it = m_clients.find(package.session());
927 if (it == m_clients.end())
930 if (!it->second->m_in_use)
932 it->second->m_in_use = true;
935 m_cond_session_ready.wait(lock);
937 FrontendPtr f(new Frontend(this));
938 m_clients[package.session()] = f;
943 void yf::SessionShared::Rep::release_frontend(mp::Package &package)
945 boost::mutex::scoped_lock lock(m_mutex);
946 std::map<mp::Session,yf::SessionShared::FrontendPtr>::iterator it;
948 it = m_clients.find(package.session());
949 if (it != m_clients.end())
951 if (package.session().is_closed())
957 it->second->m_in_use = false;
959 m_cond_session_ready.notify_all();
964 void yf::SessionShared::process(mp::Package &package) const
966 FrontendPtr f = m_p->get_frontend(package);
968 Z_GDU *gdu = package.request().get();
970 if (gdu && gdu->which == Z_GDU_Z3950 && gdu->u.z3950->which ==
971 Z_APDU_initRequest && !f->m_is_virtual)
973 m_p->init(package, gdu, f);
975 else if (!f->m_is_virtual)
977 else if (gdu && gdu->which == Z_GDU_Z3950)
979 Z_APDU *apdu = gdu->u.z3950;
980 if (apdu->which == Z_APDU_initRequest)
984 package.response() = odr.create_close(
986 Z_Close_protocolError,
989 package.session().close();
991 else if (apdu->which == Z_APDU_close)
995 package.response() = odr.create_close(
997 Z_Close_peerAbort, "received close from client");
998 package.session().close();
1000 else if (apdu->which == Z_APDU_searchRequest)
1002 f->search(package, apdu);
1004 else if (apdu->which == Z_APDU_presentRequest)
1006 f->present(package, apdu);
1008 else if (apdu->which == Z_APDU_scanRequest)
1010 f->scan(package, apdu);
1016 package.response() = odr.create_close(
1017 apdu, Z_Close_protocolError,
1018 "unsupported APDU in filter_session_shared");
1020 package.session().close();
1023 m_p->release_frontend(package);
1026 static mp::filter::Base* filter_creator()
1028 return new mp::filter::SessionShared;
1032 struct metaproxy_1_filter_struct metaproxy_1_filter_session_shared = {
1042 * indent-tabs-mode: nil
1043 * c-file-style: "stroustrup"
1045 * vim: shiftwidth=4 tabstop=8 expandtab