If the backend returns an HTTP status other than 200 (OK), produce a diagnostic
[mp-sparql-moved-to-github.git] / src / filter_sparql.cpp
1 /* This file is part of Metaproxy.
2    Copyright (C) Index Data
3
4 Metaproxy is free software; you can redistribute it and/or modify it under
5 the terms of the GNU General Public License as published by the Free
6 Software Foundation; either version 2, or (at your option) any later
7 version.
8
9 Metaproxy is distributed in the hope that it will be useful, but WITHOUT ANY
10 WARRANTY; without even the implied warranty of MERCHANTABILITY or
11 FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
12 for more details.
13
14 You should have received a copy of the GNU General Public License
15 along with this program; if not, write to the Free Software
16 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
17 */
18
19 #include <metaproxy/package.hpp>
20 #include <metaproxy/util.hpp>
21 #include <yaz/log.h>
22 #include <yaz/srw.h>
23 #include <yaz/diagbib1.h>
24 #include <yaz/match_glob.h>
25 #include <boost/scoped_ptr.hpp>
26 #include <boost/thread/mutex.hpp>
27 #include <boost/thread/condition.hpp>
28 #include "sparql.h"
29
30 #include <yaz/zgdu.h>
31
32 namespace mp = metaproxy_1;
33 namespace yf = mp::filter;
34
35 namespace metaproxy_1 {
36     namespace filter {
37         class SPARQL : public Base {
38             class Session;
39             class Rep;
40             class Conf;
41             class FrontendSet;
42
43             typedef boost::shared_ptr<Session> SessionPtr;
44             typedef boost::shared_ptr<Conf> ConfPtr;
45
46             typedef boost::shared_ptr<FrontendSet> FrontendSetPtr;
47             typedef std::map<std::string,FrontendSetPtr> FrontendSets;
48         public:
49             SPARQL();
50             ~SPARQL();
51             void process(metaproxy_1::Package & package) const;
52             void configure(const xmlNode * ptr, bool test_only,
53                            const char *path);
54             SessionPtr get_session(Package &package, Z_APDU **apdu) const;
55             void release_session(Package &package) const;
56             boost::scoped_ptr<Rep> m_p;
57             std::list<ConfPtr> db_conf;
58         };
59         class SPARQL::Conf {
60         public:
61             std::string db;
62             std::string uri;
63             yaz_sparql_t s;
64             ~Conf();
65         };
66         class SPARQL::Rep {
67             friend class SPARQL;
68             boost::condition m_cond_session_ready;
69             boost::mutex m_mutex;
70             std::map<mp::Session,SessionPtr> m_clients;
71         };
72         class SPARQL::FrontendSet {
73         public:
74             FrontendSet();
75             ~FrontendSet();
76         private:
77             friend class Session;
78             Odr_int hits;
79             std::string db;
80             xmlDoc *doc;
81         };
82         class SPARQL::Session {
83         public:
84             Session(const SPARQL *);
85             ~Session();
86             void handle_z(Package &package, Z_APDU *apdu);
87             Z_APDU *run_sparql(mp::Package &package,
88                                Z_APDU *apdu_req,
89                                mp::odr &odr,
90                                const char *sparql_query,
91                                const char *uri);
92             Z_Records *fetch(
93                 FrontendSetPtr fset,
94                 ODR odr, Odr_oid *preferredRecordSyntax,
95                 Z_ElementSetNames *esn,
96                 int start, int number, int &error_code, std::string &addinfo,
97                 int *number_returned, int *next_position);
98             bool m_in_use;
99         private:
100             bool m_support_named_result_sets;
101             FrontendSets m_frontend_sets;
102             const SPARQL *m_sparql;
103         };
104     }
105 }
106
107 yf::SPARQL::FrontendSet::~FrontendSet()
108 {
109     if (doc)
110         xmlFreeDoc(doc);
111 }
112
113 yf::SPARQL::FrontendSet::FrontendSet()
114 {
115     doc = 0;
116 }
117
118 yf::SPARQL::SPARQL() : m_p(new Rep)
119 {
120 }
121
122 yf::SPARQL::~SPARQL()
123 {
124 }
125
126 void yf::SPARQL::configure(const xmlNode *xmlnode, bool test_only,
127                            const char *path)
128 {
129     const xmlNode *ptr = xmlnode->children;
130
131     for (; ptr; ptr = ptr->next)
132     {
133         if (ptr->type != XML_ELEMENT_NODE)
134             continue;
135         if (!strcmp((const char *) ptr->name, "db"))
136         {
137             yaz_sparql_t s = yaz_sparql_create();
138             ConfPtr conf(new Conf);
139             conf->s = s;
140
141             const struct _xmlAttr *attr;
142             for (attr = ptr->properties; attr; attr = attr->next)
143             {
144                 if (!strcmp((const char *) attr->name, "path"))
145                     conf->db = mp::xml::get_text(attr->children);
146                 else if (!strcmp((const char *) attr->name, "uri"))
147                     conf->uri = mp::xml::get_text(attr->children);
148                 else
149                     throw mp::filter::FilterException(
150                         "Bad attribute " + std::string((const char *)
151                                                        attr->name));
152             }
153             xmlNode *p = ptr->children;
154             for (; p; p = p->next)
155             {
156                 if (p->type != XML_ELEMENT_NODE)
157                     continue;
158                 std::string name = (const char *) p->name;
159                 const struct _xmlAttr *attr;
160                 for (attr = p->properties; attr; attr = attr->next)
161                 {
162                     if (!strcmp((const char *) attr->name, "type"))
163                     {
164                         name.append(".");
165                         name.append(mp::xml::get_text(attr->children));
166                     }
167                     else
168                         throw mp::filter::FilterException(
169                             "Bad attribute " + std::string((const char *)
170                                                            attr->name));
171                 }
172                 std::string value = mp::xml::get_text(p);
173                 if (yaz_sparql_add_pattern(s, name.c_str(), value.c_str()))
174                 {
175                     throw mp::filter::FilterException(
176                         "Bad SPARQL config " + name);
177                 }
178             }
179             if (!conf->uri.length())
180             {
181                 throw mp::filter::FilterException("Missing uri");
182             }
183             if (!conf->db.length())
184             {
185                 throw mp::filter::FilterException("Missing path");
186             }
187             db_conf.push_back(conf);
188         }
189         else
190         {
191             throw mp::filter::FilterException
192                 ("Bad element "
193                  + std::string((const char *) ptr->name)
194                  + " in sparql filter");
195         }
196     }
197 }
198
199 yf::SPARQL::Conf::~Conf()
200 {
201     yaz_sparql_destroy(s);
202 }
203
204 yf::SPARQL::Session::Session(const SPARQL *sparql) :
205     m_in_use(true),
206     m_support_named_result_sets(false),
207     m_sparql(sparql)
208 {
209 }
210
211 yf::SPARQL::Session::~Session()
212 {
213 }
214
215 yf::SPARQL::SessionPtr yf::SPARQL::get_session(Package & package,
216                                                Z_APDU **apdu) const
217 {
218     SessionPtr ptr0;
219
220     Z_GDU *gdu = package.request().get();
221
222     boost::mutex::scoped_lock lock(m_p->m_mutex);
223
224     std::map<mp::Session,SPARQL::SessionPtr>::iterator it;
225
226     if (gdu && gdu->which == Z_GDU_Z3950)
227         *apdu = gdu->u.z3950;
228     else
229         *apdu = 0;
230
231     while (true)
232     {
233         it = m_p->m_clients.find(package.session());
234         if (it == m_p->m_clients.end())
235             break;
236         if (!it->second->m_in_use)
237         {
238             it->second->m_in_use = true;
239             return it->second;
240         }
241         m_p->m_cond_session_ready.wait(lock);
242     }
243     if (!*apdu)
244         return ptr0;
245
246     // new Z39.50 session ..
247     SessionPtr p(new Session(this));
248     m_p->m_clients[package.session()] = p;
249     return p;
250 }
251
252 void yf::SPARQL::release_session(Package &package) const
253 {
254     boost::mutex::scoped_lock lock(m_p->m_mutex);
255     std::map<mp::Session,SessionPtr>::iterator it;
256
257     it = m_p->m_clients.find(package.session());
258     if (it != m_p->m_clients.end())
259     {
260         it->second->m_in_use = false;
261
262         if (package.session().is_closed())
263             m_p->m_clients.erase(it);
264         m_p->m_cond_session_ready.notify_all();
265     }
266 }
267
268 static xmlNode *get_result(xmlDoc *doc, Odr_int *sz, Odr_int pos)
269 {
270     xmlNode *ptr = xmlDocGetRootElement(doc);
271     Odr_int cur = 0;
272     for (; ptr; ptr = ptr->next)
273         if (ptr->type == XML_ELEMENT_NODE &&
274             !strcmp((const char *) ptr->name, "sparql"))
275             break;
276     if (ptr)
277     {
278         for (ptr = ptr->children; ptr; ptr = ptr->next)
279             if (ptr->type == XML_ELEMENT_NODE &&
280                 !strcmp((const char *) ptr->name, "results"))
281                 break;
282     }
283     if (ptr)
284     {
285         for (ptr = ptr->children; ptr; ptr = ptr->next)
286             if (ptr->type == XML_ELEMENT_NODE &&
287                 !strcmp((const char *) ptr->name, "result"))
288             {
289                 if (cur++ == pos)
290                     break;
291             }
292     }
293     if (sz)
294         *sz = cur;
295     return ptr;
296 }
297
298 Z_Records *yf::SPARQL::Session::fetch(
299     FrontendSetPtr fset,
300     ODR odr, Odr_oid *preferredRecordSyntax,
301     Z_ElementSetNames *esn,
302     int start, int number, int &error_code, std::string &addinfo,
303     int *number_returned, int *next_position)
304 {
305     Z_Records *rec = (Z_Records *) odr_malloc(odr, sizeof(Z_Records));
306     rec->which = Z_Records_DBOSD;
307     rec->u.databaseOrSurDiagnostics = (Z_NamePlusRecordList *)
308         odr_malloc(odr, sizeof(Z_NamePlusRecordList));
309     rec->u.databaseOrSurDiagnostics->records = (Z_NamePlusRecord **)
310         odr_malloc(odr, sizeof(Z_NamePlusRecord *) * number);
311     int i;
312     for (i = 0; i < number; i++)
313     {
314         rec->u.databaseOrSurDiagnostics->records[i] = (Z_NamePlusRecord *)
315             odr_malloc(odr, sizeof(Z_NamePlusRecord));
316         Z_NamePlusRecord *npr = rec->u.databaseOrSurDiagnostics->records[i];
317         npr->databaseName = odr_strdup(odr, fset->db.c_str());
318         npr->which = Z_NamePlusRecord_databaseRecord;
319
320         xmlNode *node = get_result(fset->doc, 0, start - 1 + i);
321         if (!node)
322             break;
323         assert(node->type == XML_ELEMENT_NODE);
324         assert(!strcmp((const char *) node->name, "result"));
325         xmlNode *tmp = xmlCopyNode(node, 1);
326         xmlBufferPtr buf = xmlBufferCreate();
327         xmlNodeDump(buf, tmp->doc, tmp, 0, 0);
328         npr->u.databaseRecord =
329             z_ext_record_xml(odr, (const char *) buf->content, buf->use);
330         xmlFreeNode(tmp);
331         xmlBufferFree(buf);
332     }
333     rec->u.databaseOrSurDiagnostics->num_records = i;
334     *number_returned = i;
335     if (start + number > fset->hits)
336         *next_position = 0;
337     else
338         *next_position = start + number;
339     return rec;
340 }
341
342 Z_APDU *yf::SPARQL::Session::run_sparql(mp::Package &package,
343                                         Z_APDU *apdu_req,
344                                         mp::odr &odr,
345                                         const char *sparql_query,
346                                         const char *uri)
347 {
348     Z_SearchRequest *req = apdu_req->u.searchRequest;
349     Package http_package(package.session(), package.origin());
350
351     http_package.copy_filter(package);
352     Z_GDU *gdu = z_get_HTTP_Request_uri(odr, uri, 0, 1);
353
354     z_HTTP_header_add(odr, &gdu->u.HTTP_Request->headers,
355                       "Content-Type", "application/x-www-form-urlencoded");
356     const char *names[2];
357     names[0] = "query";
358     names[1] = 0;
359     const char *values[1];
360     values[0] = sparql_query;
361     char *path = 0;
362     yaz_array_to_uri(&path, odr, (char **) names, (char **) values);
363
364     gdu->u.HTTP_Request->content_buf = path;
365     gdu->u.HTTP_Request->content_len = strlen(path);
366
367     yaz_log(YLOG_LOG, "sparql: HTTP request\n%s", sparql_query);
368
369     http_package.request() = gdu;
370     http_package.move();
371
372     Z_GDU *gdu_resp = http_package.response().get();
373     Z_APDU *apdu_res = 0;
374     if (!gdu_resp || gdu_resp->which != Z_GDU_HTTP_Response)
375     {
376         yaz_log(YLOG_LOG, "sparql: no HTTP response");
377         apdu_res = odr.create_searchResponse(apdu_req,
378                                              YAZ_BIB1_TEMPORARY_SYSTEM_ERROR,
379                                              "no HTTP response from backend");
380     }
381     else if (gdu_resp->u.HTTP_Response->code != 200)
382     {
383         mp::wrbuf w;
384
385         wrbuf_printf(w, "sparql: HTTP error %d from backend",
386                      gdu_resp->u.HTTP_Response->code);
387         apdu_res = odr.create_searchResponse(apdu_req,
388                                              YAZ_BIB1_TEMPORARY_SYSTEM_ERROR,
389                                              w.c_str());
390     }
391     else
392     {
393         Z_HTTP_Response *resp = gdu_resp->u.HTTP_Response;
394         FrontendSetPtr fset(new FrontendSet);
395
396         fset->doc = xmlParseMemory(resp->content_buf, resp->content_len);
397         fset->db = req->databaseNames[0];
398         if (!fset->doc)
399             apdu_res = odr.create_searchResponse(apdu_req,
400                                              YAZ_BIB1_TEMPORARY_SYSTEM_ERROR,
401                                              "invalid XML from backendbackend");
402         else
403         {
404             Z_Records *records = 0;
405             int number_returned = 0;
406             int next_position = 0;
407             int error_code = 0;
408             std::string addinfo;
409
410             get_result(fset->doc, &fset->hits, -1);
411             m_frontend_sets[req->resultSetName] = fset;
412
413             Odr_int number = 0;
414             const char *element_set_name = 0;
415             mp::util::piggyback_sr(req, fset->hits, number, &element_set_name);
416             if (number)
417             {
418                 Z_ElementSetNames *esn;
419
420                 if (number > *req->smallSetUpperBound)
421                     esn = req->mediumSetElementSetNames;
422                 else
423                     esn = req->smallSetElementSetNames;
424                 records = fetch(fset,
425                                 odr, req->preferredRecordSyntax, esn,
426                                 1, number,
427                                 error_code, addinfo,
428                                 &number_returned,
429                                 &next_position);
430             }
431             if (error_code)
432             {
433                 apdu_res =
434                     odr.create_searchResponse(
435                         apdu_req, error_code, addinfo.c_str());
436             }
437             else
438             {
439                 apdu_res =
440                     odr.create_searchResponse(apdu_req, 0, 0);
441                 Z_SearchResponse *resp = apdu_res->u.searchResponse;
442                 *resp->resultCount = fset->hits;
443                 *resp->numberOfRecordsReturned = number_returned;
444                 *resp->nextResultSetPosition = next_position;
445                 resp->records = records;
446             }
447         }
448     }
449     return apdu_res;
450 }
451
452 void yf::SPARQL::Session::handle_z(mp::Package &package, Z_APDU *apdu_req)
453 {
454     mp::odr odr;
455     Z_APDU *apdu_res = 0;
456     if (apdu_req->which == Z_APDU_initRequest)
457     {
458         apdu_res = odr.create_initResponse(apdu_req, 0, 0);
459         Z_InitRequest *req = apdu_req->u.initRequest;
460         Z_InitResponse *resp = apdu_res->u.initResponse;
461
462         resp->implementationName = odr_strdup(odr, "sparql");
463         if (ODR_MASK_GET(req->options, Z_Options_namedResultSets))
464             m_support_named_result_sets = true;
465         int i;
466         static const int masks[] = {
467             Z_Options_search, Z_Options_present,
468             Z_Options_namedResultSets, -1
469         };
470         for (i = 0; masks[i] != -1; i++)
471             if (ODR_MASK_GET(req->options, masks[i]))
472                 ODR_MASK_SET(resp->options, masks[i]);
473         static const int versions[] = {
474             Z_ProtocolVersion_1,
475             Z_ProtocolVersion_2,
476             Z_ProtocolVersion_3,
477             -1
478         };
479         for (i = 0; versions[i] != -1; i++)
480             if (ODR_MASK_GET(req->protocolVersion, versions[i]))
481                 ODR_MASK_SET(resp->protocolVersion, versions[i]);
482             else
483                 break;
484         *resp->preferredMessageSize = *req->preferredMessageSize;
485         *resp->maximumRecordSize = *req->maximumRecordSize;
486     }
487     else if (apdu_req->which == Z_APDU_close)
488     {
489         apdu_res = odr.create_close(apdu_req,
490                                     Z_Close_finished, 0);
491         package.session().close();
492     }
493     else if (apdu_req->which == Z_APDU_searchRequest)
494     {
495         Z_SearchRequest *req = apdu_req->u.searchRequest;
496
497         FrontendSets::iterator fset_it =
498             m_frontend_sets.find(req->resultSetName);
499         if (fset_it != m_frontend_sets.end())
500         {
501             // result set already exist
502             // if replace indicator is off: we return diagnostic if
503             // result set already exist.
504             if (*req->replaceIndicator == 0)
505             {
506                 Z_APDU *apdu =
507                     odr.create_searchResponse(
508                         apdu_req,
509                         YAZ_BIB1_RESULT_SET_EXISTS_AND_REPLACE_INDICATOR_OFF,
510                         0);
511                 package.response() = apdu_res;
512             }
513             m_frontend_sets.erase(fset_it);
514         }
515         if (req->query->which != Z_Query_type_1)
516         {
517             apdu_res = odr.create_searchResponse(
518                 apdu_req, YAZ_BIB1_QUERY_TYPE_UNSUPP, 0);
519         }
520         else if (req->num_databaseNames != 1)
521         {
522             apdu_res = odr.create_searchResponse(
523                 apdu_req,
524                 YAZ_BIB1_ACCESS_TO_SPECIFIED_DATABASE_DENIED, 0);
525         }
526         else
527         {
528             std::string db = req->databaseNames[0];
529             std::list<ConfPtr>::const_iterator it;
530
531             it = m_sparql->db_conf.begin();
532             for (; it != m_sparql->db_conf.end(); it++)
533                 if (yaz_match_glob((*it)->db.c_str(), db.c_str()))
534                     break;
535             if (it == m_sparql->db_conf.end())
536             {
537                 apdu_res = odr.create_searchResponse(
538                     apdu_req, YAZ_BIB1_DATABASE_DOES_NOT_EXIST, db.c_str());
539             }
540             else
541             {
542                 WRBUF addinfo_wr = wrbuf_alloc();
543                 WRBUF sparql_wr = wrbuf_alloc();
544                 int error =
545                     yaz_sparql_from_rpn_wrbuf((*it)->s,
546                                               addinfo_wr, sparql_wr,
547                                               req->query->u.type_1);
548                 if (error)
549                 {
550                     apdu_res = odr.create_searchResponse(
551                         apdu_req, error,
552                         wrbuf_len(addinfo_wr) ?
553                         wrbuf_cstr(addinfo_wr) : 0);
554                 }
555                 else
556                 {
557                     apdu_res = run_sparql(package, apdu_req, odr,
558                                           wrbuf_cstr(sparql_wr),
559                                           (*it)->uri.c_str());
560                 }
561                 wrbuf_destroy(addinfo_wr);
562                 wrbuf_destroy(sparql_wr);
563             }
564         }
565     }
566     else if (apdu_req->which == Z_APDU_presentRequest)
567     {
568         Z_PresentRequest *req = apdu_req->u.presentRequest;
569         FrontendSets::iterator fset_it =
570             m_frontend_sets.find(req->resultSetId);
571         if (fset_it == m_frontend_sets.end())
572         {
573             apdu_res =
574                 odr.create_presentResponse(
575                     apdu_req, YAZ_BIB1_SPECIFIED_RESULT_SET_DOES_NOT_EXIST,
576                     req->resultSetId);
577             package.response() = apdu_res;
578             return;
579         }
580         int number_returned = 0;
581         int next_position = 0;
582         int error_code = 0;
583         std::string addinfo;
584         Z_ElementSetNames *esn = 0;
585         if (req->recordComposition)
586         {
587             if (req->recordComposition->which == Z_RecordComp_simple)
588                 esn = req->recordComposition->u.simple;
589             else
590             {
591                 apdu_res =
592                     odr.create_presentResponse(
593                         apdu_req,
594                         YAZ_BIB1_ONLY_A_SINGLE_ELEMENT_SET_NAME_SUPPORTED,
595                         0);
596                 package.response() = apdu_res;
597                 return;
598             }
599         }
600         Z_Records *records = fetch(
601             fset_it->second,
602             odr, req->preferredRecordSyntax, esn,
603             *req->resultSetStartPoint, *req->numberOfRecordsRequested,
604             error_code, addinfo,
605             &number_returned,
606             &next_position);
607         if (error_code)
608         {
609             apdu_res =
610                 odr.create_presentResponse(apdu_req, error_code,
611                                            addinfo.c_str());
612         }
613         else
614         {
615             apdu_res =
616                 odr.create_presentResponse(apdu_req, 0, 0);
617             Z_PresentResponse *resp = apdu_res->u.presentResponse;
618             resp->records = records;
619             *resp->numberOfRecordsReturned = number_returned;
620             *resp->nextResultSetPosition = next_position;
621         }
622     }
623     else
624     {
625         apdu_res = odr.create_close(apdu_req,
626                                     Z_Close_protocolError,
627                                     "sparql: unhandled APDU");
628         package.session().close();
629     }
630
631     assert(apdu_res);
632     package.response() = apdu_res;
633 }
634
635 void yf::SPARQL::process(mp::Package &package) const
636 {
637     Z_APDU *apdu;
638     SessionPtr p = get_session(package, &apdu);
639     if (p && apdu)
640     {
641         p->handle_z(package, apdu);
642     }
643     else
644         package.move();
645     release_session(package);
646 }
647
648 static mp::filter::Base* filter_creator()
649 {
650     return new mp::filter::SPARQL;
651 }
652
653 extern "C" {
654     struct metaproxy_1_filter_struct metaproxy_1_filter_sparql = {
655         0,
656         "sparql",
657         filter_creator
658     };
659 }
660
661
662 /*
663  * Local variables:
664  * c-basic-offset: 4
665  * c-file-style: "Stroustrup"
666  * indent-tabs-mode: nil
667  * End:
668  * vim: shiftwidth=4 tabstop=8 expandtab
669  */
670