sort: cache sorted records
[metaproxy-moved-to-github.git] / src / filter_sort.cpp
1 /* This file is part of Metaproxy.
2    Copyright (C) 2005-2012 Index Data
3
4 Metaproxy is free software; you can redistribute it and/or modify it under
5 the terms of the GNU General Public License as published by the Free
6 Software Foundation; either version 2, or (at your option) any later
7 version.
8
9 Metaproxy is distributed in the hope that it will be useful, but WITHOUT ANY
10 WARRANTY; without even the implied warranty of MERCHANTABILITY or
11 FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
12 for more details.
13
14 You should have received a copy of the GNU General Public License
15 along with this program; if not, write to the Free Software
16 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
17 */
18
19 #include "config.hpp"
20 #include "filter_sort.hpp"
21 #include <metaproxy/package.hpp>
22 #include <metaproxy/util.hpp>
23
24 #include <yaz/diagbib1.h>
25 #include <yaz/copy_types.h>
26 #include <yaz/log.h>
27 #include <yaz/oid_std.h>
28 #include <libxml/xpath.h>
29 #include <libxml/xpathInternals.h>
30
31 #include <boost/thread/mutex.hpp>
32 #include <boost/thread/condition.hpp>
33
34 #include <yaz/zgdu.h>
35
36 namespace mp = metaproxy_1;
37 namespace yf = mp::filter;
38
39 namespace metaproxy_1 {
40     namespace filter {
41         class Sort::Impl {
42             friend class Frontend;
43         public:
44             Impl();
45             ~Impl();
46             void process(metaproxy_1::Package & package);
47             void configure(const xmlNode * ptr, bool test_only,
48                            const char *path);
49         private:
50             int m_prefetch;
51             std::string m_xpath_expr;
52             std::string m_namespaces;
53             bool m_ascending;
54             boost::mutex m_mutex;
55             boost::condition m_cond_session_ready;
56             std::map<mp::Session, FrontendPtr> m_clients;
57             FrontendPtr get_frontend(mp::Package &package);
58             void release_frontend(mp::Package &package);
59         };
60         class Sort::Record {
61             friend class RecordList;
62             Z_NamePlusRecord *npr;
63             std::string score;
64             void get_xpath(xmlDoc *doc, const char *namespaces,
65                            const char *expr);
66             bool register_namespaces(xmlXPathContextPtr xpathCtx,
67                                      const char *nsList);
68         public:
69             Record(Z_NamePlusRecord *n, const char *namespaces,
70                    const char *expr);
71             ~Record();
72             bool operator < (const Record &rhs);
73         };
74         class Sort::RecordList : boost::noncopyable {
75             Odr_oid *syntax;
76             std::list<Record> npr_list;
77             mp::odr m_odr;
78             std::string namespaces;
79             std::string xpath_expr;
80         public:
81             bool cmp(Odr_oid *syntax);
82             void add(Z_NamePlusRecord *s);
83             int size();
84             Z_NamePlusRecord *get(int i, bool ascending);
85             void sort();
86             RecordList(Odr_oid *, std::string namespaces,
87                        std::string xpath_expr);
88             ~RecordList();
89         };
90         class Sort::ResultSet : boost::noncopyable {
91             friend class Frontend;
92             Odr_int hit_count;
93             std::list<RecordListPtr> record_lists;
94         };
95         class Sort::Frontend : boost::noncopyable {
96             friend class Impl;
97             Impl *m_p;
98             bool m_is_virtual;
99             bool m_in_use;
100             std::map<std::string, ResultSetPtr> m_sets;
101             typedef std::map<std::string, ResultSetPtr>::iterator Sets_it;
102             void handle_package(mp::Package &package);
103             void handle_search(mp::Package &package, Z_APDU *apdu_req);
104             void handle_present(mp::Package &package, Z_APDU *apdu_req);
105
106             void handle_records(mp::Package &package,
107                                 Z_APDU *apdu_reqq,
108                                 Z_Records *records,
109                                 Odr_int start_pos,
110                                 ResultSetPtr s,
111                                 Odr_oid *syntax,
112                                 const char *resultSetId);
113         public:
114             Frontend(Impl *impl);
115             ~Frontend();
116         };
117     }
118 }
119
120 static void print_xpath_nodes(xmlNodeSetPtr nodes, FILE* output)
121 {
122     xmlNodePtr cur;
123     int size;
124     int i;
125     
126     assert(output);
127     size = nodes ? nodes->nodeNr : 0;
128     
129     fprintf(output, "Result (%d nodes):\n", size);
130     for (i = 0; i < size; ++i) {
131         assert(nodes->nodeTab[i]);
132         
133         if (nodes->nodeTab[i]->type == XML_NAMESPACE_DECL)
134         {
135             xmlNsPtr ns = (xmlNsPtr)nodes->nodeTab[i];
136             cur = (xmlNodePtr)ns->next;
137             if (cur->ns)
138                 fprintf(output, "= namespace \"%s\"=\"%s\" for node %s:%s\n", 
139                         ns->prefix, ns->href, cur->ns->href, cur->name);
140             else
141                 fprintf(output, "= namespace \"%s\"=\"%s\" for node %s\n", 
142                         ns->prefix, ns->href, cur->name);
143         }
144         else if (nodes->nodeTab[i]->type == XML_ELEMENT_NODE)
145         {
146             cur = nodes->nodeTab[i];        
147             if (cur->ns)
148                 fprintf(output, "= element node \"%s:%s\"\n", 
149                         cur->ns->href, cur->name);
150             else
151                 fprintf(output, "= element node \"%s\"\n",  cur->name);
152         }
153         else
154         {
155             cur = nodes->nodeTab[i];    
156             fprintf(output, "= node \"%s\": type %d\n", cur->name, cur->type);
157         }
158     }
159 }
160
161 bool yf::Sort::Record::register_namespaces(xmlXPathContextPtr xpathCtx,
162                                            const char *nsList)
163 {
164     xmlChar* nsListDup;
165     xmlChar* prefix;
166     xmlChar* href;
167     xmlChar* next;
168     
169     assert(xpathCtx);
170     assert(nsList);
171
172     nsListDup = xmlStrdup((const xmlChar *) nsList);
173     if (!nsListDup)
174         return false;
175     
176     next = nsListDup; 
177     while (next)
178     {
179         /* skip spaces */
180         while (*next == ' ')
181             next++;
182         if (*next == '\0')
183             break;
184
185         /* find prefix */
186         prefix = next;
187         next = (xmlChar *) xmlStrchr(next, '=');
188         if (next == NULL)
189         {
190             xmlFree(nsListDup);
191             return false;
192         }
193         *next++ = '\0'; 
194         
195         /* find href */
196         href = next;
197         next = (xmlChar*)xmlStrchr(next, ' ');
198         if (next)
199             *next++ = '\0';     
200
201         /* do register namespace */
202         if (xmlXPathRegisterNs(xpathCtx, prefix, href) != 0)
203         {
204             xmlFree(nsListDup);
205             return false;
206         }
207     }
208     
209     xmlFree(nsListDup);
210     return true;
211 }
212
213
214
215 void yf::Sort::Record::get_xpath(xmlDoc *doc, const char *namespaces,
216                                  const char *expr)
217 {
218     xmlXPathContextPtr xpathCtx = xmlXPathNewContext(doc);
219     if (xpathCtx)
220     { 
221         register_namespaces(xpathCtx, namespaces);
222         xmlXPathObjectPtr xpathObj =
223             xmlXPathEvalExpression((const xmlChar *) expr, xpathCtx);
224         if (xpathObj)
225         {
226             xmlNodeSetPtr nodes = xpathObj->nodesetval;
227             if (nodes)
228             {
229                 int i;
230                 for (i = 0; i < nodes->nodeNr; i++)
231                 {
232                     std::string content;
233                     xmlNode *ptr = nodes->nodeTab[i];
234                     if (ptr->type == XML_ELEMENT_NODE ||
235                         ptr->type == XML_ATTRIBUTE_NODE)
236                     {
237                         content = mp::xml::get_text(ptr->children);
238                     }
239                     else if (ptr->type == XML_TEXT_NODE)
240                     {
241                         content = mp::xml::get_text(ptr);
242                     }
243                     if (content.c_str())
244                     {
245                         score = content;
246                         break;
247                     }
248                 }
249             }
250             xmlXPathFreeObject(xpathObj);
251         }
252         xmlXPathFreeContext(xpathCtx);
253     }
254 }
255
256 yf::Sort::Record::Record(Z_NamePlusRecord *n,
257                          const char *namespaces,
258                          const char *expr) : npr(n) 
259 {
260     if (npr->which == Z_NamePlusRecord_databaseRecord)
261     {
262         Z_External *ext = npr->u.databaseRecord;
263
264         if (ext->which == Z_External_octet &&
265             !oid_oidcmp(ext->direct_reference, yaz_oid_recsyn_xml))
266         {
267             xmlDoc *doc = xmlParseMemory(
268                 (const char *) ext->u.octet_aligned->buf,
269                 ext->u.octet_aligned->len);
270             if (doc)
271             {
272                 get_xpath(doc, namespaces, expr);
273                 xmlFreeDoc(doc);
274             }
275         }
276     }
277 }
278
279 yf::Sort::Record::~Record()
280 {
281 }
282
283 bool yf::Sort::Record::operator < (const Record &rhs)
284 {
285     if (strcmp(this->score.c_str(), rhs.score.c_str()) < 0)
286         return true;
287     return false;
288 }
289
290 yf::Sort::RecordList::RecordList(Odr_oid *syntax,
291                                  std::string a_namespaces,
292                                  std::string a_xpath_expr)
293     : namespaces(a_namespaces), xpath_expr(a_xpath_expr)
294
295 {
296     if (syntax)
297         this->syntax = odr_oiddup(m_odr, syntax);
298     else
299         this->syntax = 0;
300 }
301
302 yf::Sort::RecordList::~RecordList()
303 {
304     
305 }
306  
307 bool yf::Sort::RecordList::cmp(Odr_oid *syntax)
308 {
309     if ((!this->syntax && !syntax)
310         ||
311         (this->syntax && syntax && !oid_oidcmp(this->syntax, syntax)))
312         return true;
313     return false;
314 }
315
316 int yf::Sort::RecordList::size()
317 {
318     return npr_list.size();
319 }
320
321 void yf::Sort::RecordList::add(Z_NamePlusRecord *s)
322 {
323     ODR oi = m_odr;
324     Z_NamePlusRecord *npr = yaz_clone_z_NamePlusRecord(s, oi->mem);
325     Record record(npr, namespaces.c_str(), xpath_expr.c_str());
326     npr_list.push_back(record);
327 }
328
329 Z_NamePlusRecord *yf::Sort::RecordList::get(int pos, bool ascending)
330 {
331     std::list<Record>::const_iterator it = npr_list.begin();
332     int i = pos;
333     if (!ascending)
334         i = npr_list.size() - pos - 1;
335     for (; it != npr_list.end(); it++, --i)
336         if (i <= 0)
337         {
338             return it->npr;
339         }
340     return 0;
341 }
342
343 void yf::Sort::RecordList::sort()
344 {
345     npr_list.sort();
346 }
347
348 yf::Sort::Sort() : m_p(new Impl)
349 {
350 }
351
352 yf::Sort::~Sort()
353 {  // must have a destructor because of boost::scoped_ptr
354 }
355
356 void yf::Sort::configure(const xmlNode *xmlnode, bool test_only,
357                          const char *path)
358 {
359     m_p->configure(xmlnode, test_only, path);
360 }
361
362 void yf::Sort::process(mp::Package &package) const
363 {
364     m_p->process(package);
365 }
366
367
368 yf::Sort::Frontend::Frontend(Impl *impl) : 
369     m_p(impl), m_is_virtual(false), m_in_use(true)
370 {
371 }
372
373 yf::Sort::Frontend::~Frontend()
374 {
375 }
376
377
378 yf::Sort::Impl::Impl() : m_prefetch(20), m_ascending(true)
379 {
380 }
381
382 yf::Sort::Impl::~Impl()
383
384 }
385
386 yf::Sort::FrontendPtr yf::Sort::Impl::get_frontend(mp::Package &package)
387 {
388     boost::mutex::scoped_lock lock(m_mutex);
389
390     std::map<mp::Session,yf::Sort::FrontendPtr>::iterator it;
391     
392     while(true)
393     {
394         it = m_clients.find(package.session());
395         if (it == m_clients.end())
396             break;
397         
398         if (!it->second->m_in_use)
399         {
400             it->second->m_in_use = true;
401             return it->second;
402         }
403         m_cond_session_ready.wait(lock);
404     }
405     FrontendPtr f(new Frontend(this));
406     m_clients[package.session()] = f;
407     f->m_in_use = true;
408     return f;
409 }
410
411 void yf::Sort::Impl::release_frontend(mp::Package &package)
412 {
413     boost::mutex::scoped_lock lock(m_mutex);
414     std::map<mp::Session,yf::Sort::FrontendPtr>::iterator it;
415     
416     it = m_clients.find(package.session());
417     if (it != m_clients.end())
418     {
419         if (package.session().is_closed())
420         {
421             m_clients.erase(it);
422         }
423         else
424         {
425             it->second->m_in_use = false;
426         }
427         m_cond_session_ready.notify_all();
428     }
429 }
430
431 void yf::Sort::Impl::configure(const xmlNode *ptr, bool test_only,
432                                const char *path)
433 {
434     for (ptr = ptr->children; ptr; ptr = ptr->next)
435     {
436         if (ptr->type != XML_ELEMENT_NODE)
437             continue;
438         if (!strcmp((const char *) ptr->name, "config"))
439         {            
440             const struct _xmlAttr *attr;
441             for (attr = ptr->properties; attr; attr = attr->next)
442             {
443                 if (!strcmp((const char *) attr->name, "prefetch"))
444                 {
445                     m_prefetch = mp::xml::get_int(attr->children, -1);
446                     if (m_prefetch < 0)
447                     {
448                         throw mp::filter::FilterException(
449                             "Bad or missing value for attribute " + 
450                             std::string((const char *) attr->name));
451                     }
452                 }
453                 else if (!strcmp((const char *) attr->name, "xpath"))
454                 {
455                     m_xpath_expr = mp::xml::get_text(attr->children);
456                 }
457                 else if (!strcmp((const char *) attr->name, "namespaces"))
458                 {
459                     m_namespaces = mp::xml::get_text(attr->children);
460                 }
461                 else if (!strcmp((const char *) attr->name, "sortorder"))
462                 {
463                     std::string t = mp::xml::get_text(attr->children);
464                     if (t == "ascending")
465                         m_ascending = true;
466                     else if (t == "descending")
467                         m_ascending = false;
468                     else
469                         throw mp::filter::FilterException(
470                             "Bad attribute value " + t + " for attribute " +
471                             std::string((const char *) attr->name));
472
473                 }
474                 else
475                     throw mp::filter::FilterException(
476                         "Bad attribute " +
477                         std::string((const char *) attr->name));
478             }
479         }
480         else
481         {
482             throw mp::filter::FilterException
483                 ("Bad element " 
484                  + std::string((const char *) ptr->name)
485                  + " in sort filter");
486         }
487     }
488     if (m_xpath_expr.length() == 0)
489     {
490         throw mp::filter::FilterException
491             ("Missing xpath attribute for config element in sort filter");
492     }
493
494 }
495
496 void yf::Sort::Frontend::handle_records(mp::Package &package,
497                                         Z_APDU *apdu_req,
498                                         Z_Records *records,
499                                         Odr_int start_pos,
500                                         ResultSetPtr s,
501                                         Odr_oid *syntax,
502                                         const char *resultSetId)
503 {
504     if (records && records->which == Z_Records_DBOSD && start_pos == 1)
505     {
506         std::list<RecordListPtr>::const_iterator it = s->record_lists.begin();
507
508         for (; it != s->record_lists.end(); it++)
509             if ((*it)->cmp(syntax))
510                 return;
511
512         Z_NamePlusRecordList *nprl = records->u.databaseOrSurDiagnostics;
513         int i;    // i is number of records fetched in last response
514
515         int pos = 1;
516         RecordListPtr rlp(new RecordList(syntax,
517                                          m_p->m_namespaces.c_str(),
518                                          m_p->m_xpath_expr.c_str()));
519         for (i = 0; i < nprl->num_records; i++, pos++)
520             rlp->add(nprl->records[i]);
521
522         int end_pos = m_p->m_prefetch;
523         if (end_pos > s->hit_count)
524             end_pos = s->hit_count;
525         while (i && pos <= end_pos)
526         {
527             mp::odr odr;
528             i = 0;
529
530             Package present_package(package.session(), package.origin());
531             present_package.copy_filter(package);
532
533             Z_APDU *p_apdu = zget_APDU(odr, Z_APDU_presentRequest);
534             Z_PresentRequest *p_req = p_apdu->u.presentRequest;
535
536             *p_req->resultSetStartPoint = pos;
537             *p_req->numberOfRecordsRequested = end_pos - pos + 1;
538             p_req->preferredRecordSyntax = syntax;
539             p_req->resultSetId = odr_strdup(odr, resultSetId);
540
541             present_package.request() = p_apdu;
542             present_package.move();
543
544             Z_GDU *gdu_res = present_package.response().get();
545             if (gdu_res && gdu_res->which == Z_GDU_Z3950 &&
546                 gdu_res->u.z3950->which == Z_APDU_presentResponse)
547             {
548                 Z_PresentResponse *res = gdu_res->u.z3950->u.presentResponse;
549                 Z_Records *records = res->records;
550                 if (records && records->which == Z_Records_DBOSD)
551                 {
552                     Z_NamePlusRecordList *nprl =
553                         records->u.databaseOrSurDiagnostics;
554                     for (i = 0; i < nprl->num_records; i++, pos++)
555                         rlp->add(nprl->records[i]);
556                 }
557             }
558         }
559         s->record_lists.push_back(rlp);
560         rlp->sort();
561
562         for (i = 0; i < nprl->num_records; i++)
563             nprl->records[i] = rlp->get(i, m_p->m_ascending);
564     }
565 }
566
567 void yf::Sort::Frontend::handle_search(mp::Package &package, Z_APDU *apdu_req)
568 {
569     Z_SearchRequest *req = apdu_req->u.searchRequest;    
570     std::string resultSetId = req->resultSetName;
571     Package b_package(package.session(), package.origin());
572     mp::odr odr;
573
574     b_package.copy_filter(package);
575     Sets_it sets_it = m_sets.find(req->resultSetName);
576     if (sets_it != m_sets.end())
577     {
578         // result set already exist 
579         // if replace indicator is off: we return diagnostic if
580         // result set already exist.
581         if (*req->replaceIndicator == 0)
582         {
583             Z_APDU *apdu = 
584                 odr.create_searchResponse(
585                     apdu_req,
586                     YAZ_BIB1_RESULT_SET_EXISTS_AND_REPLACE_INDICATOR_OFF,
587                     0);
588             package.response() = apdu;
589             return;
590         } 
591         m_sets.erase(resultSetId);
592     }
593     ResultSetPtr s(new ResultSet);
594     m_sets[resultSetId] = s;
595     package.move();
596     Z_GDU *gdu_res = package.response().get();
597     if (gdu_res && gdu_res->which == Z_GDU_Z3950 && gdu_res->u.z3950->which ==
598         Z_APDU_searchResponse)
599     {
600         Z_SearchResponse *res = gdu_res->u.z3950->u.searchResponse;
601         s->hit_count = *res->resultCount;
602         handle_records(b_package, apdu_req, res->records, 1, s,
603                        req->preferredRecordSyntax, resultSetId.c_str());
604         package.response() = gdu_res;
605     }
606 }
607
608 void yf::Sort::Frontend::handle_present(mp::Package &package, Z_APDU *apdu_req)
609 {
610     Z_PresentRequest *req = apdu_req->u.presentRequest;    
611     std::string resultSetId = req->resultSetId;
612     Package b_package(package.session(), package.origin());
613     mp::odr odr;
614
615     b_package.copy_filter(package);
616     Sets_it sets_it = m_sets.find(resultSetId);
617     if (sets_it == m_sets.end())
618     {
619         Z_APDU *apdu = 
620             odr.create_presentResponse(
621                 apdu_req,
622                 YAZ_BIB1_SPECIFIED_RESULT_SET_DOES_NOT_EXIST,
623                 resultSetId.c_str());
624         package.response() = apdu;
625         return;
626     }
627     ResultSetPtr rset = sets_it->second;
628     std::list<RecordListPtr>::const_iterator it = rset->record_lists.begin();
629     for (; it != rset->record_lists.end(); it++)
630         if ((*it)->cmp(req->preferredRecordSyntax))
631         {
632             if (*req->resultSetStartPoint - 1 + *req->numberOfRecordsRequested 
633                 <= (*it)->size())
634             {
635                 int i;
636                 Z_APDU *p_apdu = zget_APDU(odr, Z_APDU_presentResponse);
637                 Z_PresentResponse *p_res = p_apdu->u.presentResponse;
638
639                 *p_res->nextResultSetPosition = *req->resultSetStartPoint +
640                     *req->numberOfRecordsRequested;
641                 *p_res->numberOfRecordsReturned = 
642                     *req->numberOfRecordsRequested;
643                 p_res->records = (Z_Records *) 
644                     odr_malloc(odr, sizeof(*p_res->records));
645                 p_res->records->which = Z_Records_DBOSD;
646                 Z_NamePlusRecordList *nprl =  (Z_NamePlusRecordList *)
647                     odr_malloc(odr, sizeof(*nprl));
648                 p_res->records->u.databaseOrSurDiagnostics = nprl;
649                 nprl->num_records = *req->numberOfRecordsRequested;
650                 nprl->records = (Z_NamePlusRecord **)
651                     odr_malloc(odr, nprl->num_records * sizeof(*nprl->records));
652                 for (i = 0; i < nprl->num_records; i++)
653                 {
654                     int pos = i + *req->resultSetStartPoint - 1;
655                     nprl->records[i] = (*it)->get(pos, m_p->m_ascending);
656                 }
657                 package.response() = p_apdu;
658                 return;
659             }
660             break;
661         }
662     package.move();
663     Z_GDU *gdu_res = package.response().get();
664     if (gdu_res && gdu_res->which == Z_GDU_Z3950 && gdu_res->u.z3950->which ==
665         Z_APDU_presentResponse)
666     {
667         Z_PresentResponse *res = gdu_res->u.z3950->u.presentResponse;
668         handle_records(b_package, apdu_req, res->records, 
669                        *req->resultSetStartPoint, rset,
670                        req->preferredRecordSyntax, resultSetId.c_str());
671         package.response() = gdu_res;
672     }
673 }
674
675 void yf::Sort::Frontend::handle_package(mp::Package &package)
676 {
677     Z_GDU *gdu = package.request().get();
678     if (gdu && gdu->which == Z_GDU_Z3950)
679     {
680         Z_APDU *apdu_req = gdu->u.z3950;
681         switch (apdu_req->which)
682         {
683         case Z_APDU_searchRequest:
684             handle_search(package, apdu_req);
685             return;
686         case Z_APDU_presentRequest:
687             handle_present(package, apdu_req);
688             return;
689         }
690     }
691     package.move();
692 }
693
694 void yf::Sort::Impl::process(mp::Package &package)
695 {
696     FrontendPtr f = get_frontend(package);
697     Z_GDU *gdu = package.request().get();
698
699     if (f->m_is_virtual)
700     {
701         f->handle_package(package);
702     }
703     else if (gdu && gdu->which == Z_GDU_Z3950 && gdu->u.z3950->which ==
704              Z_APDU_initRequest)
705     {
706         package.move();
707         f->m_is_virtual = true;
708     }
709     else
710         package.move();
711
712     release_frontend(package);
713 }
714
715
716 static mp::filter::Base* filter_creator()
717 {
718     return new mp::filter::Sort;
719 }
720
721 extern "C" {
722     struct metaproxy_1_filter_struct metaproxy_1_filter_sort = {
723         0,
724         "sort",
725         filter_creator
726     };
727 }
728
729
730 /*
731  * Local variables:
732  * c-basic-offset: 4
733  * c-file-style: "Stroustrup"
734  * indent-tabs-mode: nil
735  * End:
736  * vim: shiftwidth=4 tabstop=8 expandtab
737  */
738