Returning records
[mp-sparql-moved-to-github.git] / src / filter_sparql.cpp
1 /* This file is part of Metaproxy.
2    Copyright (C) Index Data
3
4 Metaproxy is free software; you can redistribute it and/or modify it under
5 the terms of the GNU General Public License as published by the Free
6 Software Foundation; either version 2, or (at your option) any later
7 version.
8
9 Metaproxy is distributed in the hope that it will be useful, but WITHOUT ANY
10 WARRANTY; without even the implied warranty of MERCHANTABILITY or
11 FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
12 for more details.
13
14 You should have received a copy of the GNU General Public License
15 along with this program; if not, write to the Free Software
16 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
17 */
18
19 #include <metaproxy/package.hpp>
20 #include <metaproxy/util.hpp>
21 #include <yaz/log.h>
22 #include <yaz/srw.h>
23 #include <yaz/diagbib1.h>
24 #include <yaz/match_glob.h>
25 #include <boost/scoped_ptr.hpp>
26 #include <boost/thread/mutex.hpp>
27 #include <boost/thread/condition.hpp>
28 #include "sparql.h"
29
30 #include <yaz/zgdu.h>
31
32 namespace mp = metaproxy_1;
33 namespace yf = mp::filter;
34
35 namespace metaproxy_1 {
36     namespace filter {
37         class SPARQL : public Base {
38             class Session;
39             class Rep;
40             class Conf;
41             class FrontendSet;
42
43             typedef boost::shared_ptr<Session> SessionPtr;
44             typedef boost::shared_ptr<Conf> ConfPtr;
45
46             typedef boost::shared_ptr<FrontendSet> FrontendSetPtr;
47             typedef std::map<std::string,FrontendSetPtr> FrontendSets;
48         public:
49             SPARQL();
50             ~SPARQL();
51             void process(metaproxy_1::Package & package) const;
52             void configure(const xmlNode * ptr, bool test_only,
53                            const char *path);
54             SessionPtr get_session(Package &package, Z_APDU **apdu) const;
55             void release_session(Package &package) const;
56             boost::scoped_ptr<Rep> m_p;
57             std::list<ConfPtr> db_conf;
58         };
59         class SPARQL::Conf {
60         public:
61             std::string db;
62             std::string uri;
63             yaz_sparql_t s;
64             ~Conf();
65         };
66         class SPARQL::Rep {
67             friend class SPARQL;
68             boost::condition m_cond_session_ready;
69             boost::mutex m_mutex;
70             std::map<mp::Session,SessionPtr> m_clients;
71         };
72         class SPARQL::FrontendSet {
73         public:
74             FrontendSet();
75             ~FrontendSet();
76         private:
77             friend class Session;
78             Odr_int hits;
79             std::string db;
80             xmlDoc *doc;
81         };
82         class SPARQL::Session {
83         public:
84             Session(const SPARQL *);
85             ~Session();
86             void handle_z(Package &package, Z_APDU *apdu);
87             Z_APDU *run_sparql(mp::Package &package,
88                                Z_APDU *apdu_req,
89                                mp::odr &odr,
90                                const char *sparql_query,
91                                const char *uri);
92             Z_Records *fetch(
93                 FrontendSetPtr fset,
94                 ODR odr, Odr_oid *preferredRecordSyntax,
95                 Z_ElementSetNames *esn,
96                 int start, int number, int &error_code, std::string &addinfo,
97                 int *number_returned, int *next_position);
98             bool m_in_use;
99         private:
100             bool m_support_named_result_sets;
101             FrontendSets m_frontend_sets;
102             const SPARQL *m_sparql;
103         };
104     }
105 }
106
107 yf::SPARQL::FrontendSet::~FrontendSet()
108 {
109     if (doc)
110         xmlFreeDoc(doc);
111 }
112
113 yf::SPARQL::FrontendSet::FrontendSet()
114 {
115     doc = 0;
116 }
117
118 yf::SPARQL::SPARQL() : m_p(new Rep)
119 {
120 }
121
122 yf::SPARQL::~SPARQL()
123 {
124 }
125
126 void yf::SPARQL::configure(const xmlNode *xmlnode, bool test_only,
127                            const char *path)
128 {
129     const xmlNode *ptr = xmlnode->children;
130
131     for (; ptr; ptr = ptr->next)
132     {
133         if (ptr->type != XML_ELEMENT_NODE)
134             continue;
135         if (!strcmp((const char *) ptr->name, "db"))
136         {
137             yaz_sparql_t s = yaz_sparql_create();
138             ConfPtr conf(new Conf);
139             conf->s = s;
140
141             const struct _xmlAttr *attr;
142             for (attr = ptr->properties; attr; attr = attr->next)
143             {
144                 if (!strcmp((const char *) attr->name, "path"))
145                     conf->db = mp::xml::get_text(attr->children);
146                 else if (!strcmp((const char *) attr->name, "uri"))
147                     conf->uri = mp::xml::get_text(attr->children);
148                 else
149                     throw mp::filter::FilterException(
150                         "Bad attribute " + std::string((const char *)
151                                                        attr->name));
152             }
153             xmlNode *p = ptr->children;
154             for (; p; p = p->next)
155             {
156                 if (p->type != XML_ELEMENT_NODE)
157                     continue;
158                 std::string name = (const char *) p->name;
159                 const struct _xmlAttr *attr;
160                 for (attr = p->properties; attr; attr = attr->next)
161                 {
162                     if (!strcmp((const char *) attr->name, "type"))
163                     {
164                         name.append(".");
165                         name.append(mp::xml::get_text(attr->children));
166                     }
167                     else
168                         throw mp::filter::FilterException(
169                             "Bad attribute " + std::string((const char *)
170                                                            attr->name));
171                 }
172                 std::string value = mp::xml::get_text(p);
173                 if (yaz_sparql_add_pattern(s, name.c_str(), value.c_str()))
174                 {
175                     throw mp::filter::FilterException(
176                         "Bad SPARQL config " + name);
177                 }
178             }
179             if (!conf->uri.length())
180             {
181                 throw mp::filter::FilterException("Missing uri");
182             }
183             if (!conf->db.length())
184             {
185                 throw mp::filter::FilterException("Missing path");
186             }
187             db_conf.push_back(conf);
188         }
189         else
190         {
191             throw mp::filter::FilterException
192                 ("Bad element "
193                  + std::string((const char *) ptr->name)
194                  + " in sparql filter");
195         }
196     }
197 }
198
199 yf::SPARQL::Conf::~Conf()
200 {
201     yaz_sparql_destroy(s);
202 }
203
204 yf::SPARQL::Session::Session(const SPARQL *sparql) :
205     m_in_use(true),
206     m_support_named_result_sets(false),
207     m_sparql(sparql)
208 {
209 }
210
211 yf::SPARQL::Session::~Session()
212 {
213 }
214
215 yf::SPARQL::SessionPtr yf::SPARQL::get_session(Package & package,
216                                                Z_APDU **apdu) const
217 {
218     SessionPtr ptr0;
219
220     Z_GDU *gdu = package.request().get();
221
222     boost::mutex::scoped_lock lock(m_p->m_mutex);
223
224     std::map<mp::Session,SPARQL::SessionPtr>::iterator it;
225
226     if (gdu && gdu->which == Z_GDU_Z3950)
227         *apdu = gdu->u.z3950;
228     else
229         *apdu = 0;
230
231     while (true)
232     {
233         it = m_p->m_clients.find(package.session());
234         if (it == m_p->m_clients.end())
235             break;
236         if (!it->second->m_in_use)
237         {
238             it->second->m_in_use = true;
239             return it->second;
240         }
241         m_p->m_cond_session_ready.wait(lock);
242     }
243     if (!*apdu)
244         return ptr0;
245
246     // new Z39.50 session ..
247     SessionPtr p(new Session(this));
248     m_p->m_clients[package.session()] = p;
249     return p;
250 }
251
252 void yf::SPARQL::release_session(Package &package) const
253 {
254     boost::mutex::scoped_lock lock(m_p->m_mutex);
255     std::map<mp::Session,SessionPtr>::iterator it;
256
257     it = m_p->m_clients.find(package.session());
258     if (it != m_p->m_clients.end())
259     {
260         it->second->m_in_use = false;
261
262         if (package.session().is_closed())
263             m_p->m_clients.erase(it);
264         m_p->m_cond_session_ready.notify_all();
265     }
266 }
267
268 static xmlNode *get_result(xmlDoc *doc, Odr_int *sz, Odr_int pos)
269 {
270     xmlNode *ptr = xmlDocGetRootElement(doc);
271     Odr_int cur = 0;
272     for (; ptr; ptr = ptr->next)
273         if (ptr->type == XML_ELEMENT_NODE &&
274             !strcmp((const char *) ptr->name, "sparql"))
275             break;
276     if (ptr)
277     {
278         for (ptr = ptr->children; ptr; ptr = ptr->next)
279             if (ptr->type == XML_ELEMENT_NODE &&
280                 !strcmp((const char *) ptr->name, "results"))
281                 break;
282     }
283     if (ptr)
284     {
285         for (ptr = ptr->children; ptr; ptr = ptr->next)
286             if (ptr->type == XML_ELEMENT_NODE &&
287                 !strcmp((const char *) ptr->name, "result"))
288             {
289                 if (cur++ == pos)
290                     break;
291             }
292     }
293     if (sz)
294         *sz = cur;
295     return ptr;
296 }
297
298 Z_Records *yf::SPARQL::Session::fetch(
299     FrontendSetPtr fset,
300     ODR odr, Odr_oid *preferredRecordSyntax,
301     Z_ElementSetNames *esn,
302     int start, int number, int &error_code, std::string &addinfo,
303     int *number_returned, int *next_position)
304 {
305     Z_Records *rec = (Z_Records *) odr_malloc(odr, sizeof(Z_Records));
306     rec->which = Z_Records_DBOSD;
307     rec->u.databaseOrSurDiagnostics = (Z_NamePlusRecordList *)
308         odr_malloc(odr, sizeof(Z_NamePlusRecordList));
309     rec->u.databaseOrSurDiagnostics->records = (Z_NamePlusRecord **)
310         odr_malloc(odr, sizeof(Z_NamePlusRecord *) * number);
311     int i;
312     for (i = 0; i < number; i++)
313     {
314         rec->u.databaseOrSurDiagnostics->records[i] = (Z_NamePlusRecord *)
315             odr_malloc(odr, sizeof(Z_NamePlusRecord));
316         Z_NamePlusRecord *npr = rec->u.databaseOrSurDiagnostics->records[i];
317         npr->databaseName = odr_strdup(odr, fset->db.c_str());
318         npr->which = Z_NamePlusRecord_databaseRecord;
319
320         xmlNode *node = get_result(fset->doc, 0, start - 1 + i);
321         if (!node)
322             break;
323         assert(node->type == XML_ELEMENT_NODE);
324         assert(!strcmp((const char *) node->name, "result"));
325         xmlNode *tmp = xmlCopyNode(node, 1);
326         xmlBufferPtr buf = xmlBufferCreate();
327         xmlNodeDump(buf, tmp->doc, tmp, 0, 0);
328         npr->u.databaseRecord =
329             z_ext_record_xml(odr, (const char *) buf->content, buf->use);
330         xmlFreeNode(tmp);
331         xmlBufferFree(buf);
332     }
333     rec->u.databaseOrSurDiagnostics->num_records = i;
334     *number_returned = i;
335     if (start + number > fset->hits)
336         *next_position = 0;
337     else
338         *next_position = start + number;
339     return rec;
340 }
341
342 Z_APDU *yf::SPARQL::Session::run_sparql(mp::Package &package,
343                                         Z_APDU *apdu_req,
344                                         mp::odr &odr,
345                                         const char *sparql_query,
346                                         const char *uri)
347 {
348     Z_SearchRequest *req = apdu_req->u.searchRequest;
349     Package http_package(package.session(), package.origin());
350
351     http_package.copy_filter(package);
352     Z_GDU *gdu = z_get_HTTP_Request_uri(odr, uri, 0, 1);
353
354     z_HTTP_header_add(odr, &gdu->u.HTTP_Request->headers,
355                       "Content-Type", "application/x-www-form-urlencoded");
356     const char *names[2];
357     names[0] = "query";
358     names[1] = 0;
359     const char *values[1];
360     values[0] = sparql_query;
361     char *path = 0;
362     yaz_array_to_uri(&path, odr, (char **) names, (char **) values);
363
364     gdu->u.HTTP_Request->content_buf = path;
365     gdu->u.HTTP_Request->content_len = strlen(path);
366
367     yaz_log(YLOG_LOG, "sparql: HTTP request\n%s", sparql_query);
368
369     http_package.request() = gdu;
370     http_package.move();
371
372     Z_GDU *gdu_resp = http_package.response().get();
373     Z_APDU *apdu_res = 0;
374     if (gdu_resp && gdu_resp->which == Z_GDU_HTTP_Response)
375     {
376         Z_HTTP_Response *resp = gdu_resp->u.HTTP_Response;
377         FrontendSetPtr fset(new FrontendSet);
378
379         fset->doc = xmlParseMemory(resp->content_buf, resp->content_len);
380         fset->db = req->databaseNames[0];
381         if (!fset->doc)
382             apdu_res = odr.create_searchResponse(apdu_req,
383                                              YAZ_BIB1_TEMPORARY_SYSTEM_ERROR,
384                                              "invalid XML from backendbackend");
385         else
386         {
387             Z_Records *records = 0;
388             int number_returned = 0;
389             int next_position = 0;
390             int error_code = 0;
391             std::string addinfo;
392
393             get_result(fset->doc, &fset->hits, -1);
394             m_frontend_sets[req->resultSetName] = fset;
395
396             Odr_int number = 0;
397             const char *element_set_name = 0;
398             mp::util::piggyback_sr(req, fset->hits, number, &element_set_name);
399             if (number)
400             {
401                 Z_ElementSetNames *esn;
402
403                 if (number > *req->smallSetUpperBound)
404                     esn = req->mediumSetElementSetNames;
405                 else
406                     esn = req->smallSetElementSetNames;
407                 records = fetch(fset,
408                                 odr, req->preferredRecordSyntax, esn,
409                                 1, number,
410                                 error_code, addinfo,
411                                 &number_returned,
412                                 &next_position);
413             }
414             if (error_code)
415             {
416                 apdu_res =
417                     odr.create_searchResponse(
418                         apdu_req, error_code, addinfo.c_str());
419             }
420             else
421             {
422                 apdu_res =
423                     odr.create_searchResponse(apdu_req, 0, 0);
424                 Z_SearchResponse *resp = apdu_res->u.searchResponse;
425                 *resp->resultCount = fset->hits;
426                 *resp->numberOfRecordsReturned = number_returned;
427                 *resp->nextResultSetPosition = next_position;
428                 resp->records = records;
429             }
430         }
431     }
432     else
433     {
434         yaz_log(YLOG_LOG, "sparql: no HTTP response");
435         apdu_res = odr.create_searchResponse(apdu_req,
436                                              YAZ_BIB1_TEMPORARY_SYSTEM_ERROR,
437                                              "no HTTP response from backend");
438     }
439     return apdu_res;
440 }
441
442 void yf::SPARQL::Session::handle_z(mp::Package &package, Z_APDU *apdu_req)
443 {
444     mp::odr odr;
445     Z_APDU *apdu_res = 0;
446     if (apdu_req->which == Z_APDU_initRequest)
447     {
448         apdu_res = odr.create_initResponse(apdu_req, 0, 0);
449         Z_InitRequest *req = apdu_req->u.initRequest;
450         Z_InitResponse *resp = apdu_res->u.initResponse;
451
452         resp->implementationName = odr_strdup(odr, "sparql");
453         if (ODR_MASK_GET(req->options, Z_Options_namedResultSets))
454             m_support_named_result_sets = true;
455         int i;
456         static const int masks[] = {
457             Z_Options_search, Z_Options_present,
458             Z_Options_namedResultSets, -1
459         };
460         for (i = 0; masks[i] != -1; i++)
461             if (ODR_MASK_GET(req->options, masks[i]))
462                 ODR_MASK_SET(resp->options, masks[i]);
463         static const int versions[] = {
464             Z_ProtocolVersion_1,
465             Z_ProtocolVersion_2,
466             Z_ProtocolVersion_3,
467             -1
468         };
469         for (i = 0; versions[i] != -1; i++)
470             if (ODR_MASK_GET(req->protocolVersion, versions[i]))
471                 ODR_MASK_SET(resp->protocolVersion, versions[i]);
472             else
473                 break;
474         *resp->preferredMessageSize = *req->preferredMessageSize;
475         *resp->maximumRecordSize = *req->maximumRecordSize;
476     }
477     else if (apdu_req->which == Z_APDU_close)
478     {
479         apdu_res = odr.create_close(apdu_req,
480                                     Z_Close_finished, 0);
481         package.session().close();
482     }
483     else if (apdu_req->which == Z_APDU_searchRequest)
484     {
485         Z_SearchRequest *req = apdu_req->u.searchRequest;
486
487         FrontendSets::iterator fset_it =
488             m_frontend_sets.find(req->resultSetName);
489         if (fset_it != m_frontend_sets.end())
490         {
491             // result set already exist
492             // if replace indicator is off: we return diagnostic if
493             // result set already exist.
494             if (*req->replaceIndicator == 0)
495             {
496                 Z_APDU *apdu =
497                     odr.create_searchResponse(
498                         apdu_req,
499                         YAZ_BIB1_RESULT_SET_EXISTS_AND_REPLACE_INDICATOR_OFF,
500                         0);
501                 package.response() = apdu_res;
502             }
503             m_frontend_sets.erase(fset_it);
504         }
505         if (req->query->which != Z_Query_type_1)
506         {
507             apdu_res = odr.create_searchResponse(
508                 apdu_req, YAZ_BIB1_QUERY_TYPE_UNSUPP, 0);
509         }
510         else if (req->num_databaseNames != 1)
511         {
512             apdu_res = odr.create_searchResponse(
513                 apdu_req,
514                 YAZ_BIB1_ACCESS_TO_SPECIFIED_DATABASE_DENIED, 0);
515         }
516         else
517         {
518             std::string db = req->databaseNames[0];
519             std::list<ConfPtr>::const_iterator it;
520
521             it = m_sparql->db_conf.begin();
522             for (; it != m_sparql->db_conf.end(); it++)
523                 if (yaz_match_glob((*it)->db.c_str(), db.c_str()))
524                     break;
525             if (it == m_sparql->db_conf.end())
526             {
527                 apdu_res = odr.create_searchResponse(
528                     apdu_req, YAZ_BIB1_DATABASE_DOES_NOT_EXIST, db.c_str());
529             }
530             else
531             {
532                 WRBUF addinfo_wr = wrbuf_alloc();
533                 WRBUF sparql_wr = wrbuf_alloc();
534                 int error =
535                     yaz_sparql_from_rpn_wrbuf((*it)->s,
536                                               addinfo_wr, sparql_wr,
537                                               req->query->u.type_1);
538                 if (error)
539                 {
540                     apdu_res = odr.create_searchResponse(
541                         apdu_req, error,
542                         wrbuf_len(addinfo_wr) ?
543                         wrbuf_cstr(addinfo_wr) : 0);
544                 }
545                 else
546                 {
547                     apdu_res = run_sparql(package, apdu_req, odr,
548                                           wrbuf_cstr(sparql_wr),
549                                           (*it)->uri.c_str());
550                 }
551                 wrbuf_destroy(addinfo_wr);
552                 wrbuf_destroy(sparql_wr);
553             }
554         }
555     }
556     else if (apdu_req->which == Z_APDU_presentRequest)
557     {
558         Z_PresentRequest *req = apdu_req->u.presentRequest;
559         FrontendSets::iterator fset_it =
560             m_frontend_sets.find(req->resultSetId);
561         if (fset_it == m_frontend_sets.end())
562         {
563             apdu_res =
564                 odr.create_presentResponse(
565                     apdu_req, YAZ_BIB1_SPECIFIED_RESULT_SET_DOES_NOT_EXIST,
566                     req->resultSetId);
567             package.response() = apdu_res;
568             return;
569         }
570         int number_returned = 0;
571         int next_position = 0;
572         int error_code = 0;
573         std::string addinfo;
574         Z_ElementSetNames *esn = 0;
575         if (req->recordComposition)
576         {
577             if (req->recordComposition->which == Z_RecordComp_simple)
578                 esn = req->recordComposition->u.simple;
579             else
580             {
581                 apdu_res =
582                     odr.create_presentResponse(
583                         apdu_req,
584                         YAZ_BIB1_ONLY_A_SINGLE_ELEMENT_SET_NAME_SUPPORTED,
585                         0);
586                 package.response() = apdu_res;
587                 return;
588             }
589         }
590         Z_Records *records = fetch(
591             fset_it->second,
592             odr, req->preferredRecordSyntax, esn,
593             *req->resultSetStartPoint, *req->numberOfRecordsRequested,
594             error_code, addinfo,
595             &number_returned,
596             &next_position);
597         if (error_code)
598         {
599             apdu_res =
600                 odr.create_presentResponse(apdu_req, error_code,
601                                            addinfo.c_str());
602         }
603         else
604         {
605             apdu_res =
606                 odr.create_presentResponse(apdu_req, 0, 0);
607             Z_PresentResponse *resp = apdu_res->u.presentResponse;
608             resp->records = records;
609             *resp->numberOfRecordsReturned = number_returned;
610             *resp->nextResultSetPosition = next_position;
611         }
612     }
613     else
614     {
615         apdu_res = odr.create_close(apdu_req,
616                                     Z_Close_protocolError,
617                                     "sparql: unhandled APDU");
618         package.session().close();
619     }
620
621     assert(apdu_res);
622     package.response() = apdu_res;
623 }
624
625 void yf::SPARQL::process(mp::Package &package) const
626 {
627     Z_APDU *apdu;
628     SessionPtr p = get_session(package, &apdu);
629     if (p && apdu)
630     {
631         p->handle_z(package, apdu);
632     }
633     else
634         package.move();
635     release_session(package);
636 }
637
638 static mp::filter::Base* filter_creator()
639 {
640     return new mp::filter::SPARQL;
641 }
642
643 extern "C" {
644     struct metaproxy_1_filter_struct metaproxy_1_filter_sparql = {
645         0,
646         "sparql",
647         filter_creator
648     };
649 }
650
651
652 /*
653  * Local variables:
654  * c-basic-offset: 4
655  * c-file-style: "Stroustrup"
656  * indent-tabs-mode: nil
657  * End:
658  * vim: shiftwidth=4 tabstop=8 expandtab
659  */
660