Experiments with session map template.
[metaproxy-moved-to-github.git] / src / filter_virt_db.cpp
1 /* $Id: filter_virt_db.cpp,v 1.9 2005-10-26 18:53:49 adam Exp $
2    Copyright (c) 2005, Index Data.
3
4 %LICENSE%
5  */
6
7 #include "config.hpp"
8
9 #include "filter.hpp"
10 #include "router.hpp"
11 #include "package.hpp"
12
13 #include <boost/thread/mutex.hpp>
14
15 #include "filter_virt_db.hpp"
16
17 #include <yaz/zgdu.h>
18 #include <yaz/log.h>
19 #include <yaz/otherinfo.h>
20 #include <yaz/diagbib1.h>
21
22 #include <list>
23 #include <map>
24 #include <iostream>
25
26 namespace yf = yp2::filter;
27
28 namespace yp2 {
29     namespace filter {
30         struct Virt_db_set {
31             Virt_db_set(yp2::Session &id, std::string setname,
32                         std::string vhost, bool named_result_sets);
33             Virt_db_set();
34             ~Virt_db_set();
35
36             yp2::Session m_backend_session;
37             std::string m_backend_setname;
38             std::string m_vhost;
39             bool m_named_result_sets;
40         };
41         struct Virt_db_session {
42             Virt_db_session(yp2::Session &id, bool use_vhost);
43             Virt_db_session();
44             yp2::Session m_session;
45             bool m_use_vhost;
46             std::map<std::string,Virt_db_set> m_sets;
47         };
48         struct Virt_db_map {
49             Virt_db_map(std::string vhost);
50             Virt_db_map();
51             std::string m_vhost;
52         };
53         class Virt_db::Rep {
54             friend class Virt_db;
55             
56             void release_session(Package &package);
57             void init(Package &package, Z_APDU *apdu, bool &move_later);
58             void search(Package &package, Z_APDU *apdu, bool &move_later);
59             void present(Package &package, Z_APDU *apdu, bool &move_later);
60         private:
61             boost::mutex m_sessions_mutex;
62             std::map<yp2::Session,Virt_db_session>m_sessions;
63             std::map<std::string, Virt_db_map>m_maps;
64
65             typedef std::map<yp2::Session,Virt_db_session>::iterator Ses_it;
66             typedef std::map<std::string,Virt_db_set>::iterator Sets_it;
67         };
68     }
69 }
70
71 yf::Virt_db_set::Virt_db_set(yp2::Session &id, std::string setname,
72                              std::string vhost, bool named_result_sets)
73     :   m_backend_session(id), m_backend_setname(setname), m_vhost(vhost),
74         m_named_result_sets(named_result_sets)
75 {
76 }
77
78
79 yf::Virt_db_set::Virt_db_set()
80 {
81 }
82
83
84 yf::Virt_db_set::~Virt_db_set()
85 {
86 }
87
88 yf::Virt_db_map::Virt_db_map(std::string vhost)
89     : m_vhost(vhost) 
90 {
91 }
92
93 yf::Virt_db_map::Virt_db_map()
94 {
95 }
96
97 yf::Virt_db_session::Virt_db_session()
98     : m_use_vhost(false)
99 {
100
101 }
102
103 yf::Virt_db_session::Virt_db_session(yp2::Session &id,
104                                      bool use_vhost) :
105     m_session(id) , m_use_vhost(use_vhost)
106 {
107
108 }
109
110 yf::Virt_db::Virt_db() {
111     m_p = new Virt_db::Rep;
112 }
113
114 yf::Virt_db::~Virt_db() {
115     delete m_p;
116 }
117
118 void yf::Virt_db::Rep::release_session(Package &package)
119 {
120     boost::mutex::scoped_lock lock(m_sessions_mutex);
121     
122     m_sessions.erase(package.session());
123 }
124
125 void yf::Virt_db::Rep::present(Package &package, Z_APDU *apdu, bool &move_later){
126     Session *id = 0;
127     Z_PresentRequest *req = apdu->u.presentRequest;
128     std::string resultSetId = req->resultSetId;
129     {
130         boost::mutex::scoped_lock lock(m_sessions_mutex);
131         
132         Ses_it it = m_sessions.find(package.session());
133         if (it == m_sessions.end())
134         {
135             ODR odr = odr_createmem(ODR_ENCODE);
136             
137             Z_APDU *apdu = zget_APDU(odr, Z_APDU_close);
138             
139             *apdu->u.close->closeReason = Z_Close_protocolError;
140             apdu->u.close->diagnosticInformation =
141                 odr_strdup(odr, "no session for present request");
142             
143             package.response() = apdu;
144             package.session().close();
145             odr_destroy(odr);
146             assert(false);
147             return;
148         }
149         if (it->second.m_use_vhost)
150         {
151             move_later = true;
152             return;
153         }
154         Sets_it sets_it = it->second.m_sets.find(resultSetId);
155         if (sets_it == it->second.m_sets.end())
156         {
157             ODR odr = odr_createmem(ODR_ENCODE);
158             Z_APDU *apdu = zget_APDU(odr, Z_APDU_presentResponse);
159             
160             Z_Records *rec = (Z_Records *) odr_malloc(odr, sizeof(Z_Records));
161             apdu->u.presentResponse->records = rec;
162             rec->which = Z_Records_NSD;
163             rec->u.nonSurrogateDiagnostic =
164                 zget_DefaultDiagFormat(
165                     odr,
166                     YAZ_BIB1_SPECIFIED_RESULT_SET_DOES_NOT_EXIST,
167                     resultSetId.c_str());
168             package.response() = apdu;
169             odr_destroy(odr);
170             return;
171         }
172         id = new yp2::Session(sets_it->second.m_backend_session);
173     }
174     ODR odr = odr_createmem(ODR_ENCODE);
175     
176     // sending present to backend
177     Package present_package(*id, package.origin());
178     present_package.copy_filter(package);
179     
180     req->resultSetId = odr_strdup(odr, "default");
181     present_package.request() = yazpp_1::GDU(apdu);
182
183     odr_destroy(odr);
184
185     present_package.move();
186
187     if (present_package.session().is_closed())
188     {
189         ODR odr = odr_createmem(ODR_ENCODE);
190         Z_APDU *apdu = zget_APDU(odr, Z_APDU_presentResponse);
191         
192         Z_Records *rec = (Z_Records *) odr_malloc(odr, sizeof(Z_Records));
193         apdu->u.presentResponse->records = rec;
194         rec->which = Z_Records_NSD;
195         rec->u.nonSurrogateDiagnostic =
196             zget_DefaultDiagFormat(
197                 odr,
198                 YAZ_BIB1_RESULT_SET_NO_LONGER_EXISTS_UNILATERALLY_DELETED_BY_,
199                 resultSetId.c_str());
200         package.response() = apdu;
201         
202         odr_destroy(odr);
203
204         boost::mutex::scoped_lock lock(m_sessions_mutex);
205         Ses_it it = m_sessions.find(package.session());
206         if (it != m_sessions.end())
207             it->second.m_sets.erase(resultSetId);
208     }
209     else
210     {
211         package.response() = present_package.response();
212     }
213     delete id;
214 }
215
216 void yf::Virt_db::Rep::search(Package &package, Z_APDU *apdu, bool &move_later)
217 {
218     Z_SearchRequest *req = apdu->u.searchRequest;
219     std::string vhost;
220     std::string database;
221     std::string resultSetId = req->resultSetName;
222     bool support_named_result_sets = false;  // whether backend supports it
223     {
224         boost::mutex::scoped_lock lock(m_sessions_mutex);
225
226         Ses_it it = m_sessions.find(package.session());
227         if (it == m_sessions.end())
228         {
229             ODR odr = odr_createmem(ODR_ENCODE);
230             
231             Z_APDU *apdu = zget_APDU(odr, Z_APDU_close);
232             
233             *apdu->u.close->closeReason = Z_Close_protocolError;
234             apdu->u.close->diagnosticInformation =
235                 odr_strdup(odr, "no session for search request");
236             
237             package.response() = apdu;
238             package.session().close();
239             odr_destroy(odr);
240             return;
241         }
242         if (it->second.m_use_vhost)
243         {
244             move_later = true;
245             return;
246         }
247         if (req->num_databaseNames != 1)
248         {   // exactly one database must be specified
249             ODR odr = odr_createmem(ODR_ENCODE);
250             Z_APDU *apdu = zget_APDU(odr, Z_APDU_searchResponse);
251             
252             Z_Records *rec = (Z_Records *) odr_malloc(odr, sizeof(Z_Records));
253             apdu->u.searchResponse->records = rec;
254             rec->which = Z_Records_NSD;
255             rec->u.nonSurrogateDiagnostic =
256                 zget_DefaultDiagFormat(
257                     odr, YAZ_BIB1_TOO_MANY_DATABASES_SPECIFIED, 0);
258             package.response() = apdu;
259             
260             odr_destroy(odr);
261             return;
262         }
263         database = req->databaseNames[0];
264         std::map<std::string, Virt_db_map>::iterator map_it;
265         map_it = m_maps.find(database);
266         if (map_it == m_maps.end()) 
267         {   // no map for database: return diagnostic
268             ODR odr = odr_createmem(ODR_ENCODE);
269             Z_APDU *apdu = zget_APDU(odr, Z_APDU_searchResponse);
270             
271             Z_Records *rec = (Z_Records *) odr_malloc(odr, sizeof(Z_Records));
272             apdu->u.searchResponse->records = rec;
273             rec->which = Z_Records_NSD;
274             rec->u.nonSurrogateDiagnostic =
275                 zget_DefaultDiagFormat(
276                     odr, YAZ_BIB1_DATABASE_DOES_NOT_EXIST, database.c_str());
277             package.response() = apdu;
278             
279             odr_destroy(odr);
280             return;
281         }
282         if (*req->replaceIndicator == 0)
283         {
284             Sets_it sets_it = it->second.m_sets.find(req->resultSetName);
285             if (sets_it != it->second.m_sets.end())
286             {
287                 ODR odr = odr_createmem(ODR_ENCODE);
288                 Z_APDU *apdu = zget_APDU(odr, Z_APDU_searchResponse);
289                 
290                 Z_Records *rec = (Z_Records *) odr_malloc(odr, sizeof(Z_Records));
291                 apdu->u.searchResponse->records = rec;
292                 rec->which = Z_Records_NSD;
293                 rec->u.nonSurrogateDiagnostic =
294                     zget_DefaultDiagFormat(
295                         odr,
296                         YAZ_BIB1_RESULT_SET_EXISTS_AND_REPLACE_INDICATOR_OFF,
297                         0);
298                 package.response() = apdu;
299                 
300                 odr_destroy(odr);
301                 return;
302             }
303         }
304         it->second.m_sets.erase(req->resultSetName);
305         vhost = map_it->second.m_vhost;
306     }
307     // we might look for an existing session with same vhost
308     Session id;
309     const char *vhost_cstr = vhost.c_str();
310     if (true)
311     {  // sending init to backend
312         Package init_package(id, package.origin());
313         init_package.copy_filter(package);
314         
315         ODR odr = odr_createmem(ODR_ENCODE);
316         Z_APDU *init_apdu = zget_APDU(odr, Z_APDU_initRequest);
317         
318         yaz_oi_set_string_oidval(&init_apdu->u.initRequest->otherInfo, odr,
319                                  VAL_PROXY, 1, vhost_cstr);
320         
321         init_package.request() = init_apdu;
322         odr_destroy(odr);
323
324         init_package.move();  // sending init 
325
326         if (init_package.session().is_closed())
327         {
328             ODR odr = odr_createmem(ODR_ENCODE);
329             Z_APDU *apdu = zget_APDU(odr, Z_APDU_searchResponse);
330             
331             Z_Records *rec = (Z_Records *) odr_malloc(odr, sizeof(Z_Records));
332             apdu->u.searchResponse->records = rec;
333             rec->which = Z_Records_NSD;
334             rec->u.nonSurrogateDiagnostic =
335                 zget_DefaultDiagFormat(
336                     odr, YAZ_BIB1_DATABASE_UNAVAILABLE, database.c_str());
337             package.response() = apdu;
338             
339             odr_destroy(odr);
340         }
341         Z_GDU *gdu = init_package.response().get();
342         // we hope to get an init response
343         if (gdu && gdu->which == Z_GDU_Z3950 && gdu->u.z3950->which ==
344             Z_APDU_initResponse)
345         {
346             if (ODR_MASK_GET(gdu->u.z3950->u.initResponse->options,
347                              Z_Options_namedResultSets))
348                 support_named_result_sets = true;
349         }
350         else
351         {
352             ODR odr = odr_createmem(ODR_ENCODE);
353             Z_APDU *apdu = zget_APDU(odr, Z_APDU_searchResponse);
354             
355             Z_Records *rec = (Z_Records *) odr_malloc(odr, sizeof(Z_Records));
356             apdu->u.searchResponse->records = rec;
357             rec->which = Z_Records_NSD;
358             rec->u.nonSurrogateDiagnostic =
359                 zget_DefaultDiagFormat(
360                     odr, YAZ_BIB1_DATABASE_UNAVAILABLE, database.c_str());
361             package.response() = apdu;
362             
363             odr_destroy(odr);
364             return;
365         }
366     }
367     // sending search to backend
368     Package search_package(id, package.origin());
369
370     search_package.copy_filter(package);
371     const char *sep = strchr(vhost_cstr, '/');
372     ODR odr = odr_createmem(ODR_ENCODE);
373     if (sep)
374         req->databaseNames[0] = odr_strdup(odr, sep+1);
375
376     *req->replaceIndicator = 1;
377
378     std::string backend_resultSetId = "default";
379     req->resultSetName = odr_strdup(odr, backend_resultSetId.c_str());
380     search_package.request() = yazpp_1::GDU(apdu);
381     
382     odr_destroy(odr);
383     
384     search_package.move();
385
386     if (search_package.session().is_closed())
387     {
388         ODR odr = odr_createmem(ODR_ENCODE);
389         Z_APDU *apdu = zget_APDU(odr, Z_APDU_searchResponse);
390         
391         Z_Records *rec = (Z_Records *) odr_malloc(odr, sizeof(Z_Records));
392         apdu->u.searchResponse->records = rec;
393         rec->which = Z_Records_NSD;
394         rec->u.nonSurrogateDiagnostic =
395             zget_DefaultDiagFormat(
396                 odr, YAZ_BIB1_DATABASE_UNAVAILABLE, database.c_str());
397         package.response() = apdu;
398         
399         odr_destroy(odr);
400         return;
401     }
402     package.response() = search_package.response();
403     
404     boost::mutex::scoped_lock lock(m_sessions_mutex);
405     Ses_it it = m_sessions.find(package.session());
406     if (it != m_sessions.end())
407         it->second.m_sets[resultSetId] =
408             Virt_db_set(id, backend_resultSetId, vhost,
409                         support_named_result_sets);
410 }
411
412 void yf::Virt_db::Rep::init(Package &package, Z_APDU *apdu, bool &move_later)
413 {
414     release_session(package);
415     boost::mutex::scoped_lock lock(m_sessions_mutex);
416
417     Z_InitRequest *req = apdu->u.initRequest;
418     
419     const char *vhost =
420         yaz_oi_get_string_oidval(&req->otherInfo, VAL_PROXY, 1, 0);
421     if (!vhost)
422     {
423         ODR odr = odr_createmem(ODR_ENCODE);
424         
425         Z_APDU *apdu = zget_APDU(odr, Z_APDU_initResponse);
426         Z_InitResponse *resp = apdu->u.initResponse;
427         
428         int i;
429         static const int masks[] = {
430             Z_Options_search, Z_Options_present, Z_Options_namedResultSets, -1 
431         };
432         for (i = 0; masks[i] != -1; i++)
433             if (ODR_MASK_GET(req->options, masks[i]))
434                 ODR_MASK_SET(resp->options, masks[i]);
435         
436         static const int versions[] = {
437             Z_ProtocolVersion_1,
438             Z_ProtocolVersion_2,
439             Z_ProtocolVersion_3,
440             -1
441         };
442         for (i = 0; versions[i] != -1; i++)
443             if (ODR_MASK_GET(req->protocolVersion, versions[i]))
444                 ODR_MASK_SET(resp->protocolVersion, versions[i]);
445             else
446                 break;
447
448         package.response() = apdu;
449         
450         odr_destroy(odr);
451
452         m_sessions[package.session()] = Virt_db_session(package.session(), false);
453     }
454     else
455     {
456         m_sessions[package.session()] = Virt_db_session(package.session(), true);
457         move_later = true;
458     }
459 }
460
461 void yf::Virt_db::add_map_db2vhost(std::string db, std::string vhost)
462 {
463     m_p->m_maps[db] = Virt_db_map(vhost);
464 }
465
466 void yf::Virt_db::process(Package &package) const
467 {
468     Z_GDU *gdu = package.request().get();
469
470     if (!gdu || gdu->which != Z_GDU_Z3950)
471         package.move();
472     else
473     {
474         bool move_later = false;
475         Z_APDU *apdu = gdu->u.z3950;
476         if (apdu->which == Z_APDU_initRequest)
477         {
478             m_p->init(package, apdu, move_later);
479         }
480         else if (apdu->which == Z_APDU_searchRequest)
481         {
482             m_p->search(package, apdu, move_later);
483         }
484         else if (apdu->which == Z_APDU_presentRequest)
485         {
486             m_p->present(package, apdu, move_later);
487         }
488         else
489         {
490             ODR odr = odr_createmem(ODR_ENCODE);
491             
492             Z_APDU *apdu = zget_APDU(odr, Z_APDU_close);
493             
494             *apdu->u.close->closeReason = Z_Close_protocolError;
495
496             apdu->u.close->diagnosticInformation =
497                 odr_strdup(odr, "unsupported APDU in filter_virt_db");
498             
499             package.response() = apdu;
500             package.session().close();
501             odr_destroy(odr);
502         }
503         if (move_later)
504             package.move();
505     }
506     if (package.session().is_closed())
507         m_p->release_session(package);
508 }
509
510
511 /*
512  * Local variables:
513  * c-basic-offset: 4
514  * indent-tabs-mode: nil
515  * c-file-style: "stroustrup"
516  * End:
517  * vim: shiftwidth=4 tabstop=8 expandtab
518  */