Implement ICU normalization of facets, bug #3812
[pazpar2-moved-to-github.git] / src / session.c
1 /* This file is part of Pazpar2.
2    Copyright (C) 2006-2010 Index Data
3
4 Pazpar2 is free software; you can redistribute it and/or modify it under
5 the terms of the GNU General Public License as published by the Free
6 Software Foundation; either version 2, or (at your option) any later
7 version.
8
9 Pazpar2 is distributed in the hope that it will be useful, but WITHOUT ANY
10 WARRANTY; without even the implied warranty of MERCHANTABILITY or
11 FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
12 for more details.
13
14 You should have received a copy of the GNU General Public License
15 along with this program; if not, write to the Free Software
16 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
17
18 */
19
20 /** \file session.c
21     \brief high-level logic; mostly user sessions and settings
22 */
23
24 #if HAVE_CONFIG_H
25 #include <config.h>
26 #endif
27
28 #include <time.h>
29 #include <stdlib.h>
30 #include <stdio.h>
31 #include <string.h>
32 #if HAVE_SYS_TIME_H
33 #include <sys/time.h>
34 #endif
35 #if HAVE_UNISTD_H
36 #include <unistd.h>
37 #endif
38 #ifdef WIN32
39 #include <windows.h>
40 #endif
41 #include <signal.h>
42 #include <ctype.h>
43 #include <assert.h>
44
45 #include <yaz/marcdisp.h>
46 #include <yaz/comstack.h>
47 #include <yaz/tcpip.h>
48 #include <yaz/proto.h>
49 #include <yaz/readconf.h>
50 #include <yaz/pquery.h>
51 #include <yaz/otherinfo.h>
52 #include <yaz/yaz-util.h>
53 #include <yaz/nmem.h>
54 #include <yaz/query-charset.h>
55 #include <yaz/querytowrbuf.h>
56 #include <yaz/oid_db.h>
57 #include <yaz/snprintf.h>
58 #include <yaz/gettimeofday.h>
59
60 #define USE_TIMING 0
61 #if USE_TIMING
62 #include <yaz/timing.h>
63 #endif
64
65 #include "ppmutex.h"
66 #include "parameters.h"
67 #include "session.h"
68 #include "eventl.h"
69 #include "http.h"
70 #include "termlists.h"
71 #include "reclists.h"
72 #include "relevance.h"
73 #include "database.h"
74 #include "client.h"
75 #include "settings.h"
76 #include "normalize7bit.h"
77
78 #define TERMLIST_HIGH_SCORE 25
79
80 #define MAX_CHUNK 15
81
82 // Note: Some things in this structure will eventually move to configuration
83 struct parameters global_parameters = 
84 {
85     0,   // dump_records
86     0    // debug_mode
87 };
88
89 struct client_list {
90     struct client *client;
91     struct client_list *next;
92 };
93
94 static void log_xml_doc(xmlDoc *doc)
95 {
96     FILE *lf = yaz_log_file();
97     xmlChar *result = 0;
98     int len = 0;
99 #if LIBXML_VERSION >= 20600
100     xmlDocDumpFormatMemory(doc, &result, &len, 1);
101 #else
102     xmlDocDumpMemory(doc, &result, &len);
103 #endif
104     if (lf && len)
105     {
106         (void) fwrite(result, 1, len, lf);
107         fprintf(lf, "\n");
108     }
109     xmlFree(result);
110 }
111
112 static void session_enter(struct session *s)
113 {
114     yaz_mutex_enter(s->session_mutex);
115 }
116
117 static void session_leave(struct session *s)
118 {
119     yaz_mutex_leave(s->session_mutex);
120 }
121
122 // Recursively traverse query structure to extract terms.
123 void pull_terms(NMEM nmem, struct ccl_rpn_node *n, char **termlist, int *num)
124 {
125     char **words;
126     int numwords;
127     int i;
128
129     switch (n->kind)
130     {
131     case CCL_RPN_AND:
132     case CCL_RPN_OR:
133     case CCL_RPN_NOT:
134     case CCL_RPN_PROX:
135         pull_terms(nmem, n->u.p[0], termlist, num);
136         pull_terms(nmem, n->u.p[1], termlist, num);
137         break;
138     case CCL_RPN_TERM:
139         nmem_strsplit(nmem, " ", n->u.t.term, &words, &numwords);
140         for (i = 0; i < numwords; i++)
141             termlist[(*num)++] = words[i];
142         break;
143     default: // NOOP
144         break;
145     }
146 }
147
148
149 void add_facet(struct session *s, const char *type, const char *value, int count)
150 {
151     struct conf_service *service = s->service;
152     pp2_relevance_token_t prt;
153     const char *facet_component;
154     WRBUF facet_wrbuf = wrbuf_alloc();
155     prt = pp2_relevance_tokenize(service->facet_pct);
156     
157     pp2_relevance_first(prt, value, 0);
158     while ((facet_component = pp2_relevance_token_next(prt)))
159     {
160         if (*facet_component)
161         {
162             if (wrbuf_len(facet_wrbuf))
163                 wrbuf_puts(facet_wrbuf, " ");
164             wrbuf_puts(facet_wrbuf, facet_component);
165         }
166     }
167     pp2_relevance_token_destroy(prt);
168     
169     if (wrbuf_len(facet_wrbuf))
170     {
171         int i;
172         for (i = 0; i < s->num_termlists; i++)
173             if (!strcmp(s->termlists[i].name, type))
174                 break;
175         if (i == s->num_termlists)
176         {
177             if (i == SESSION_MAX_TERMLISTS)
178             {
179                 session_log(s, YLOG_FATAL, "Too many termlists");
180                 wrbuf_destroy(facet_wrbuf);
181                 return;
182             }
183             
184             s->termlists[i].name = nmem_strdup(s->nmem, type);
185             s->termlists[i].termlist 
186                 = termlist_create(s->nmem, TERMLIST_HIGH_SCORE);
187             s->num_termlists = i + 1;
188         }
189         
190         session_log(s, YLOG_DEBUG, "Session: facets for %s: %s norm:%s (%d)",
191                     type, value, wrbuf_cstr(facet_wrbuf), count);
192         termlist_insert(s->termlists[i].termlist, wrbuf_cstr(facet_wrbuf),
193                         count);
194     }
195     wrbuf_destroy(facet_wrbuf);
196 }
197
198 static xmlDoc *record_to_xml(struct session *se,
199                              struct session_database *sdb, const char *rec)
200 {
201     struct database *db = sdb->database;
202     xmlDoc *rdoc = 0;
203
204     rdoc = xmlParseMemory(rec, strlen(rec));
205
206     if (!rdoc)
207     {
208         session_log(se, YLOG_FATAL, "Non-wellformed XML received from %s",
209                     db->url);
210         return 0;
211     }
212
213     if (global_parameters.dump_records)
214     {
215         session_log(se, YLOG_LOG, "Un-normalized record from %s", db->url);
216         log_xml_doc(rdoc);
217     }
218
219     return rdoc;
220 }
221
222 #define MAX_XSLT_ARGS 16
223
224 // Add static values from session database settings if applicable
225 static void insert_settings_parameters(struct session_database *sdb,
226                                        struct conf_service *service,
227                                        char **parms,
228                                        NMEM nmem)
229 {
230     int i;
231     int nparms = 0;
232     int offset = 0;
233
234     for (i = 0; i < service->num_metadata; i++)
235     {
236         struct conf_metadata *md = &service->metadata[i];
237         int setting;
238
239         if (md->setting == Metadata_setting_parameter &&
240             (setting = settings_lookup_offset(service, md->name)) >= 0)
241         {
242             const char *val = session_setting_oneval(sdb, setting);
243             if (val && nparms < MAX_XSLT_ARGS)
244             {
245                 char *buf;
246                 int len = strlen(val);
247                 buf = nmem_malloc(nmem, len + 3);
248                 buf[0] = '\'';
249                 strcpy(buf + 1, val);
250                 buf[len+1] = '\'';
251                 buf[len+2] = '\0';
252                 parms[offset++] = md->name;
253                 parms[offset++] = buf;
254                 nparms++;
255             }
256         }
257     }
258     parms[offset] = 0;
259 }
260
261 // Add static values from session database settings if applicable
262 static void insert_settings_values(struct session_database *sdb, xmlDoc *doc,
263     struct conf_service *service)
264 {
265     int i;
266
267     for (i = 0; i < service->num_metadata; i++)
268     {
269         struct conf_metadata *md = &service->metadata[i];
270         int offset;
271
272         if (md->setting == Metadata_setting_postproc &&
273             (offset = settings_lookup_offset(service, md->name)) >= 0)
274         {
275             const char *val = session_setting_oneval(sdb, offset);
276             if (val)
277             {
278                 xmlNode *r = xmlDocGetRootElement(doc);
279                 xmlNode *n = xmlNewTextChild(r, 0, (xmlChar *) "metadata",
280                                              (xmlChar *) val);
281                 xmlSetProp(n, (xmlChar *) "type", (xmlChar *) md->name);
282             }
283         }
284     }
285 }
286
287 static xmlDoc *normalize_record(struct session *se,
288                                 struct session_database *sdb,
289                                 struct conf_service *service,
290                                 const char *rec, NMEM nmem)
291 {
292     xmlDoc *rdoc = record_to_xml(se, sdb, rec);
293
294     if (rdoc)
295     {
296         char *parms[MAX_XSLT_ARGS*2+1];
297         
298         insert_settings_parameters(sdb, service, parms, nmem);
299         
300         if (normalize_record_transform(sdb->map, &rdoc, (const char **)parms))
301         {
302             session_log(se, YLOG_WARN, "Normalize failed from %s",
303                         sdb->database->url);
304         }
305         else
306         {
307             insert_settings_values(sdb, rdoc, service);
308             
309             if (global_parameters.dump_records)
310             {
311                 session_log(se, YLOG_LOG, "Normalized record from %s", 
312                             sdb->database->url);
313                 log_xml_doc(rdoc);
314             }
315         }
316     }
317     return rdoc;
318 }
319
320 void session_settings_dump(struct session *se,
321                            struct session_database *db,
322                            WRBUF w)
323 {
324     if (db->settings)
325     {
326         int i, num = db->num_settings;
327         for (i = 0; i < num; i++)
328         {
329             struct setting *s = db->settings[i];
330             for (;s ; s = s->next)
331             {
332                 wrbuf_puts(w, "<set name=\"");
333                 wrbuf_xmlputs(w, s->name);
334                 wrbuf_puts(w, "\" value=\"");
335                 wrbuf_xmlputs(w, s->value);
336                 wrbuf_puts(w, "\"/>");
337             }
338             if (db->settings[i])
339                 wrbuf_puts(w, "\n");
340         }
341     }
342 }
343
344 // Retrieve first defined value for 'name' for given database.
345 // Will be extended to take into account user associated with session
346 const char *session_setting_oneval(struct session_database *db, int offset)
347 {
348     if (offset >= db->num_settings || !db->settings[offset])
349         return "";
350     return db->settings[offset]->value;
351 }
352
353 // Prepare XSLT stylesheets for record normalization
354 // Structures are allocated on the session_wide nmem to avoid having
355 // to recompute this for every search. This would lead
356 // to leaking if a single session was to repeatedly change the PZ_XSLT
357 // setting. However, this is not a realistic use scenario.
358 static int prepare_map(struct session *se, struct session_database *sdb)
359 {
360     const char *s;
361
362     if (!sdb->settings)
363     {
364         session_log(se, YLOG_WARN, "No settings on %s", sdb->database->url);
365         return -1;
366     }
367     if ((s = session_setting_oneval(sdb, PZ_XSLT)))
368     {
369         char auto_stylesheet[256];
370
371         if (!strcmp(s, "auto"))
372         {
373             const char *request_syntax = session_setting_oneval(
374                 sdb, PZ_REQUESTSYNTAX);
375             if (request_syntax)
376             {
377                 char *cp;
378                 yaz_snprintf(auto_stylesheet, sizeof(auto_stylesheet),
379                              "%s.xsl", request_syntax);
380                 for (cp = auto_stylesheet; *cp; cp++)
381                 {
382                     /* deliberately only consider ASCII */
383                     if (*cp > 32 && *cp < 127)
384                         *cp = tolower(*cp);
385                 }
386                 s = auto_stylesheet;
387             }
388             else
389             {
390                 session_log(se, YLOG_WARN,
391                             "No pz:requestsyntax for auto stylesheet");
392             }
393         }
394         sdb->map = normalize_cache_get(se->normalize_cache,
395                                        se->service->server->config, s);
396         if (!sdb->map)
397             return -1;
398     }
399     return 0;
400 }
401
402 // This analyzes settings and recomputes any supporting data structures
403 // if necessary.
404 static int prepare_session_database(struct session *se, 
405                                     struct session_database *sdb)
406 {
407     if (!sdb->settings)
408     {
409         session_log(se, YLOG_WARN, 
410                 "No settings associated with %s", sdb->database->url);
411         return -1;
412     }
413     if (sdb->settings[PZ_XSLT] && !sdb->map)
414     {
415         if (prepare_map(se, sdb) < 0)
416             return -1;
417     }
418     return 0;
419 }
420
421 // called if watch should be removed because http_channel is to be destroyed
422 static void session_watch_cancel(void *data, struct http_channel *c,
423                                  void *data2)
424 {
425     struct session_watchentry *ent = data;
426
427     ent->fun = 0;
428     ent->data = 0;
429     ent->obs = 0;
430 }
431
432 // set watch. Returns 0=OK, -1 if watch is already set
433 int session_set_watch(struct session *s, int what, 
434                       session_watchfun fun, void *data,
435                       struct http_channel *chan)
436 {
437     int ret;
438     session_enter(s);
439     if (s->watchlist[what].fun)
440         ret = -1;
441     else
442     {
443         
444         s->watchlist[what].fun = fun;
445         s->watchlist[what].data = data;
446         s->watchlist[what].obs = http_add_observer(chan, &s->watchlist[what],
447                                                    session_watch_cancel);
448         ret = 0;
449     }
450     session_leave(s);
451     return 0;
452 }
453
454 void session_alert_watch(struct session *s, int what)
455 {
456     assert(s);
457     session_enter(s);
458     if (s->watchlist[what].fun)
459     {
460         /* our watch is no longer associated with http_channel */
461         void *data;
462         session_watchfun fun;
463
464         http_remove_observer(s->watchlist[what].obs);
465         fun  = s->watchlist[what].fun;
466         data = s->watchlist[what].data;
467
468         /* reset watch before fun is invoked - in case fun wants to set
469            it again */
470         s->watchlist[what].fun = 0;
471         s->watchlist[what].data = 0;
472         s->watchlist[what].obs = 0;
473
474         session_leave(s);
475         session_log(s, YLOG_DEBUG,
476                     "session_alert_watch: %d calling function: %p", what, fun);
477         fun(data);
478     }
479     else
480         session_leave(s);
481 }
482
483 //callback for grep_databases
484 static void select_targets_callback(void *context, struct session_database *db)
485 {
486     struct session *se = (struct session*) context;
487     struct client *cl = client_create();
488     struct client_list *l;
489     client_set_database(cl, db);
490
491     client_set_session(cl, se);
492     l = xmalloc(sizeof(*l));
493     l->client = cl;
494     l->next = se->clients;
495     se->clients = l;
496 }
497
498 static void session_remove_clients(struct session *se)
499 {
500     struct client_list *l;
501
502     session_enter(se);
503     l = se->clients;
504     se->clients = 0;
505     session_leave(se);
506
507     while (l)
508     {
509         struct client_list *l_next = l->next;
510         client_lock(l->client);
511         client_set_session(l->client, 0);
512         client_set_database(l->client, 0);
513         client_unlock(l->client);
514         client_destroy(l->client);
515         xfree(l);
516         l = l_next;
517     }
518 }
519
520 // Associates a set of clients with a session;
521 // Note: Session-databases represent databases with per-session 
522 // setting overrides
523 static int select_targets(struct session *se, const char *filter)
524 {
525     return session_grep_databases(se, filter, select_targets_callback);
526 }
527
528 int session_active_clients(struct session *s)
529 {
530     struct client_list *l;
531     int res = 0;
532
533     for (l = s->clients; l; l = l->next)
534         if (client_is_active(l->client))
535             res++;
536
537     return res;
538 }
539
540 int session_is_preferred_clients_ready(struct session *s)
541 {
542     struct client_list *l;
543     int res = 0;
544
545     for (l = s->clients; l; l = l->next)
546         if (client_is_active_preferred(l->client))
547             res++;
548     session_log(s, YLOG_DEBUG, "Session has %d active preferred clients.", res);
549     return res == 0;
550 }
551
552
553
554 enum pazpar2_error_code search(struct session *se,
555                                const char *query,
556                                const char *startrecs, const char *maxrecs,
557                                const char *filter,
558                                const char **addinfo)
559 {
560     int live_channels = 0;
561     int no_working = 0;
562     int no_failed = 0;
563     struct client_list *l;
564     struct timeval tval;
565
566     session_log(se, YLOG_DEBUG, "Search");
567
568     *addinfo = 0;
569
570     session_remove_clients(se);
571     
572     session_enter(se);
573     reclist_destroy(se->reclist);
574     se->reclist = 0;
575     relevance_destroy(&se->relevance);
576     nmem_reset(se->nmem);
577     se->total_records = se->total_hits = se->total_merged = 0;
578     se->num_termlists = 0;
579     live_channels = select_targets(se, filter);
580     if (!live_channels)
581     {
582         session_leave(se);
583         return PAZPAR2_NO_TARGETS;
584     }
585     se->reclist = reclist_create(se->nmem);
586
587     yaz_gettimeofday(&tval);
588     
589     tval.tv_sec += 5;
590
591     for (l = se->clients; l; l = l->next)
592     {
593         struct client *cl = l->client;
594
595         if (maxrecs)
596             client_set_maxrecs(cl, atoi(maxrecs));
597         if (startrecs)
598             client_set_startrecs(cl, atoi(startrecs));
599         if (prepare_session_database(se, client_get_database(cl)) < 0)
600             ;
601         else if (client_parse_query(cl, query) < 0)
602             no_failed++;
603         else
604         {
605             no_working++;
606             if (client_prep_connection(cl, se->service->z3950_operation_timeout,
607                                        se->service->z3950_session_timeout,
608                                        se->service->server->iochan_man,
609                                        &tval))
610                 client_start_search(cl);
611         }
612     }
613     session_leave(se);
614     if (no_working == 0)
615     {
616         if (no_failed > 0)
617         {
618             *addinfo = "query";
619             return PAZPAR2_MALFORMED_PARAMETER_VALUE;
620         }
621         else
622             return PAZPAR2_NO_TARGETS;
623     }
624     return PAZPAR2_NO_ERROR;
625 }
626
627 // Creates a new session_database object for a database
628 static void session_init_databases_fun(void *context, struct database *db)
629 {
630     struct session *se = (struct session *) context;
631     struct session_database *new = nmem_malloc(se->session_nmem, sizeof(*new));
632     int i;
633
634     new->database = db;
635     
636     new->map = 0;
637     assert(db->settings);
638     new->settings = nmem_malloc(se->session_nmem,
639                                 sizeof(struct settings *) * db->num_settings);
640     new->num_settings = db->num_settings;
641     for (i = 0; i < db->num_settings; i++)
642     {
643         struct setting *setting = db->settings[i];
644         new->settings[i] = setting;
645     }
646     new->next = se->databases;
647     se->databases = new;
648 }
649
650 // Doesn't free memory associated with sdb -- nmem takes care of that
651 static void session_database_destroy(struct session_database *sdb)
652 {
653     sdb->map = 0;
654 }
655
656 // Initialize session_database list -- this represents this session's view
657 // of the database list -- subject to modification by the settings ws command
658 void session_init_databases(struct session *se)
659 {
660     se->databases = 0;
661     predef_grep_databases(se, se->service, session_init_databases_fun);
662 }
663
664 // Probably session_init_databases_fun should be refactored instead of
665 // called here.
666 static struct session_database *load_session_database(struct session *se, 
667                                                       char *id)
668 {
669     struct database *db = new_database(id, se->session_nmem);
670
671     resolve_database(se->service, db);
672
673     session_init_databases_fun((void*) se, db);
674
675     // New sdb is head of se->databases list
676     return se->databases;
677 }
678
679 // Find an existing session database. If not found, load it
680 static struct session_database *find_session_database(struct session *se, 
681                                                       char *id)
682 {
683     struct session_database *sdb;
684
685     for (sdb = se->databases; sdb; sdb = sdb->next)
686         if (!strcmp(sdb->database->url, id))
687             return sdb;
688     return load_session_database(se, id);
689 }
690
691 // Apply a session override to a database
692 void session_apply_setting(struct session *se, char *dbname, char *setting,
693                            char *value)
694 {
695     struct session_database *sdb = find_session_database(se, dbname);
696     struct conf_service *service = se->service;
697     struct setting *new = nmem_malloc(se->session_nmem, sizeof(*new));
698     int offset = settings_create_offset(service, setting);
699
700     expand_settings_array(&sdb->settings, &sdb->num_settings, offset,
701                           se->session_nmem);
702     new->precedence = 0;
703     new->target = dbname;
704     new->name = setting;
705     new->value = value;
706     new->next = sdb->settings[offset];
707     sdb->settings[offset] = new;
708
709     // Force later recompute of settings-driven data structures
710     // (happens when a search starts and client connections are prepared)
711     switch (offset)
712     {
713     case PZ_XSLT:
714         if (sdb->map)
715         {
716             sdb->map = 0;
717         }
718         break;
719     }
720 }
721
722 void destroy_session(struct session *se)
723 {
724     struct session_database *sdb;
725
726     session_log(se, YLOG_DEBUG, "Pazpar2 session destroy");
727     session_remove_clients(se);
728
729     for (sdb = se->databases; sdb; sdb = sdb->next)
730         session_database_destroy(sdb);
731     normalize_cache_destroy(se->normalize_cache);
732     relevance_destroy(&se->relevance);
733     reclist_destroy(se->reclist);
734     nmem_destroy(se->nmem);
735     service_destroy(se->service);
736     yaz_mutex_destroy(&se->session_mutex);
737     wrbuf_destroy(se->wrbuf);
738 }
739
740 struct session *new_session(NMEM nmem, struct conf_service *service,
741                             unsigned session_id)
742 {
743     int i;
744     struct session *session = nmem_malloc(nmem, sizeof(*session));
745
746     char tmp_str[50];
747
748     sprintf(tmp_str, "session#%u", session_id);
749
750     session->session_id = session_id;
751     session_log(session, YLOG_DEBUG, "New session");
752     session->service = service;
753     session->relevance = 0;
754     session->total_hits = 0;
755     session->total_records = 0;
756     session->number_of_warnings_unknown_elements = 0;
757     session->number_of_warnings_unknown_metadata = 0;
758     session->num_termlists = 0;
759     session->reclist = 0;
760     session->clients = 0;
761     session->session_nmem = nmem;
762     session->nmem = nmem_create();
763     session->wrbuf = wrbuf_alloc();
764     session->databases = 0;
765     for (i = 0; i <= SESSION_WATCH_MAX; i++)
766     {
767         session->watchlist[i].data = 0;
768         session->watchlist[i].fun = 0;
769     }
770     session->normalize_cache = normalize_cache_create();
771     session->session_mutex = 0;
772     pazpar2_mutex_create(&session->session_mutex, tmp_str);
773
774     return session;
775 }
776
777 struct hitsbytarget *hitsbytarget(struct session *se, int *count, NMEM nmem)
778 {
779     struct hitsbytarget *res = 0;
780     struct client_list *l;
781     size_t sz = 0;
782
783     session_enter(se);
784     for (l = se->clients; l; l = l->next)
785         sz++;
786
787     res = nmem_malloc(nmem, sizeof(*res) * sz);
788     *count = 0;
789     for (l = se->clients; l; l = l->next)
790     {
791         struct client *cl = l->client;
792         WRBUF w = wrbuf_alloc();
793         const char *name = session_setting_oneval(client_get_database(cl),
794                                                   PZ_NAME);
795
796         res[*count].id = client_get_database(cl)->database->url;
797         res[*count].name = *name ? name : "Unknown";
798         res[*count].hits = client_get_hits(cl);
799         res[*count].records = client_get_num_records(cl);
800         res[*count].diagnostic = client_get_diagnostic(cl);
801         res[*count].state = client_get_state_str(cl);
802         res[*count].connected  = client_get_connection(cl) ? 1 : 0;
803         session_settings_dump(se, client_get_database(cl), w);
804         res[*count].settings_xml = w;
805         (*count)++;
806     }
807     session_leave(se);
808     return res;
809 }
810
811 struct termlist_score **termlist(struct session *se, const char *name, int *num)
812 {
813     int i;
814     struct termlist_score **tl = 0;
815
816     session_enter(se);
817     for (i = 0; i < se->num_termlists; i++)
818         if (!strcmp((const char *) se->termlists[i].name, name))
819         {
820             tl = termlist_highscore(se->termlists[i].termlist, num);
821             break;
822         }
823     session_leave(se);
824     return tl;
825 }
826
827 #ifdef MISSING_HEADERS
828 void report_nmem_stats(void)
829 {
830     size_t in_use, is_free;
831
832     nmem_get_memory_in_use(&in_use);
833     nmem_get_memory_free(&is_free);
834
835     yaz_log(YLOG_LOG, "nmem stat: use=%ld free=%ld", 
836             (long) in_use, (long) is_free);
837 }
838 #endif
839
840 struct record_cluster *show_single_start(struct session *se, const char *id,
841                                          struct record_cluster **prev_r,
842                                          struct record_cluster **next_r)
843 {
844     struct record_cluster *r;
845
846     session_enter(se);
847     reclist_enter(se->reclist);
848     *prev_r = 0;
849     *next_r = 0;
850     while ((r = reclist_read_record(se->reclist)))
851     {
852         if (!strcmp(r->recid, id))
853         {
854             *next_r = reclist_read_record(se->reclist);
855             break;
856         }
857         *prev_r = r;
858     }
859     reclist_leave(se->reclist);
860     if (!r)
861         session_leave(se);
862     return r;
863 }
864
865 void show_single_stop(struct session *se, struct record_cluster *rec)
866 {
867     session_leave(se);
868 }
869
870 struct record_cluster **show_range_start(struct session *se,
871                                          struct reclist_sortparms *sp, 
872                                          int start, int *num, int *total, Odr_int *sumhits)
873 {
874     struct record_cluster **recs;
875     struct reclist_sortparms *spp;
876     int i;
877 #if USE_TIMING    
878     yaz_timing_t t = yaz_timing_create();
879 #endif
880     session_enter(se);
881     recs = nmem_malloc(se->nmem, *num * sizeof(struct record_cluster *));
882     if (!se->relevance)
883     {
884         *num = 0;
885         *total = 0;
886         *sumhits = 0;
887         recs = 0;
888     }
889     else
890     {
891         for (spp = sp; spp; spp = spp->next)
892             if (spp->type == Metadata_sortkey_relevance)
893             {
894                 relevance_prepare_read(se->relevance, se->reclist);
895                 break;
896             }
897         reclist_sort(se->reclist, sp);
898         
899         reclist_enter(se->reclist);
900         *total = reclist_get_num_records(se->reclist);
901         *sumhits = se->total_hits;
902         
903         for (i = 0; i < start; i++)
904             if (!reclist_read_record(se->reclist))
905             {
906                 *num = 0;
907                 recs = 0;
908                 break;
909             }
910         
911         for (i = 0; i < *num; i++)
912         {
913             struct record_cluster *r = reclist_read_record(se->reclist);
914             if (!r)
915             {
916                 *num = i;
917                 break;
918             }
919             recs[i] = r;
920         }
921         reclist_leave(se->reclist);
922     }
923 #if USE_TIMING
924     yaz_timing_stop(t);
925     yaz_log(YLOG_LOG, "show %6.5f %3.2f %3.2f", 
926             yaz_timing_get_real(t), yaz_timing_get_user(t),
927             yaz_timing_get_sys(t));
928     yaz_timing_destroy(&t);
929 #endif
930     return recs;
931 }
932
933 void show_range_stop(struct session *se, struct record_cluster **recs)
934 {
935     session_leave(se);
936 }
937
938 void statistics(struct session *se, struct statistics *stat)
939 {
940     struct client_list *l;
941     int count = 0;
942
943     memset(stat, 0, sizeof(*stat));
944     for (l = se->clients; l; l = l->next)
945     {
946         struct client *cl = l->client;
947         if (!client_get_connection(cl))
948             stat->num_no_connection++;
949         switch (client_get_state(cl))
950         {
951         case Client_Connecting: stat->num_connecting++; break;
952         case Client_Working: stat->num_working++; break;
953         case Client_Idle: stat->num_idle++; break;
954         case Client_Failed: stat->num_failed++; break;
955         case Client_Error: stat->num_error++; break;
956         default: break;
957         }
958         count++;
959     }
960     stat->num_hits = se->total_hits;
961     stat->num_records = se->total_records;
962
963     stat->num_clients = count;
964 }
965
966 static struct record_metadata *record_metadata_init(
967     NMEM nmem, const char *value, enum conf_metadata_type type,
968     struct _xmlAttr *attr)
969 {
970     struct record_metadata *rec_md = record_metadata_create(nmem);
971     struct record_metadata_attr **attrp = &rec_md->attributes;
972     
973     for (; attr; attr = attr->next)
974     {
975         if (attr->children && attr->children->content)
976         {
977             if (strcmp((const char *) attr->name, "type"))
978             {  /* skip the "type" attribute.. Its value is already part of
979                   the element in output (md-%s) and so repeating it here
980                   is redundant */
981                 *attrp = nmem_malloc(nmem, sizeof(**attrp));
982                 (*attrp)->name =
983                     nmem_strdup(nmem, (const char *) attr->name);
984                 (*attrp)->value =
985                     nmem_strdup(nmem, (const char *) attr->children->content);
986                 attrp = &(*attrp)->next;
987             }
988         }
989     }
990     *attrp = 0;
991
992     if (type == Metadata_type_generic)
993     {
994         char *p = nmem_strdup(nmem, value);
995
996         p = normalize7bit_generic(p, " ,/.:([");
997         
998         rec_md->data.text.disp = p;
999         rec_md->data.text.sort = 0;
1000     }
1001     else if (type == Metadata_type_year || type == Metadata_type_date)
1002     {
1003         int first, last;
1004         int longdate = 0;
1005
1006         if (type == Metadata_type_date)
1007             longdate = 1;
1008         if (extract7bit_dates((char *) value, &first, &last, longdate) < 0)
1009             return 0;
1010
1011         rec_md->data.number.min = first;
1012         rec_md->data.number.max = last;
1013     }
1014     else
1015         return 0;
1016     return rec_md;
1017 }
1018
1019 static int get_mergekey_from_doc(xmlDoc *doc, xmlNode *root, const char *name,
1020                                  struct conf_service *service, WRBUF norm_wr)
1021 {
1022     xmlNode *n;
1023     int no_found = 0;
1024     for (n = root->children; n; n = n->next)
1025     {
1026         if (n->type != XML_ELEMENT_NODE)
1027             continue;
1028         if (!strcmp((const char *) n->name, "metadata"))
1029         {
1030             xmlChar *type = xmlGetProp(n, (xmlChar *) "type");
1031             if (type == NULL) {
1032                 yaz_log(YLOG_FATAL, "Missing type attribute on metadata element. Skipping!");
1033             }
1034             else if (!strcmp(name, (const char *) type))
1035             {
1036                 xmlChar *value = xmlNodeListGetString(doc, n->children, 1);
1037                 if (value)
1038                 {
1039                     const char *norm_str;
1040                     pp2_relevance_token_t prt =
1041                         pp2_relevance_tokenize(service->mergekey_pct);
1042                     
1043                     pp2_relevance_first(prt, (const char *) value, 0);
1044                     if (wrbuf_len(norm_wr) > 0)
1045                         wrbuf_puts(norm_wr, " ");
1046                     wrbuf_puts(norm_wr, name);
1047                     while ((norm_str =
1048                             pp2_relevance_token_next(prt)))
1049                     {
1050                         if (*norm_str)
1051                         {
1052                             wrbuf_puts(norm_wr, " ");
1053                             wrbuf_puts(norm_wr, norm_str);
1054                         }
1055                     }
1056                     xmlFree(value);
1057                     pp2_relevance_token_destroy(prt);
1058                     no_found++;
1059                 }
1060             }
1061             xmlFree(type);
1062         }
1063     }
1064     return no_found;
1065 }
1066
1067 static const char *get_mergekey(xmlDoc *doc, struct client *cl, int record_no,
1068                                 struct conf_service *service, NMEM nmem)
1069 {
1070     char *mergekey_norm = 0;
1071     xmlNode *root = xmlDocGetRootElement(doc);
1072     WRBUF norm_wr = wrbuf_alloc();
1073
1074     /* consider mergekey from XSL first */
1075     xmlChar *mergekey = xmlGetProp(root, (xmlChar *) "mergekey");
1076     if (mergekey)
1077     {
1078         const char *norm_str;
1079         pp2_relevance_token_t prt =
1080             pp2_relevance_tokenize(service->mergekey_pct);
1081
1082         pp2_relevance_first(prt, (const char *) mergekey, 0);
1083         while ((norm_str = pp2_relevance_token_next(prt)))
1084         {
1085             if (*norm_str)
1086             {
1087                 if (wrbuf_len(norm_wr))
1088                     wrbuf_puts(norm_wr, " ");
1089                 wrbuf_puts(norm_wr, norm_str);
1090             }
1091         }
1092         pp2_relevance_token_destroy(prt);
1093         xmlFree(mergekey);
1094     }
1095     else
1096     {
1097         /* no mergekey defined in XSL. Look for mergekey metadata instead */
1098         int field_id;
1099         for (field_id = 0; field_id < service->num_metadata; field_id++)
1100         {
1101             struct conf_metadata *ser_md = &service->metadata[field_id];
1102             if (ser_md->mergekey != Metadata_mergekey_no)
1103             {
1104                 int r = get_mergekey_from_doc(doc, root, ser_md->name,
1105                                               service, norm_wr);
1106                 if (r == 0 && ser_md->mergekey == Metadata_mergekey_required)
1107                 {
1108                     /* no mergekey on this one and it is required.. 
1109                        Generate unique key instead */
1110                     wrbuf_rewind(norm_wr);
1111                     break;
1112                 }
1113             }
1114         }
1115     }
1116
1117     /* generate unique key if none is not generated already or is empty */
1118     if (wrbuf_len(norm_wr) == 0)
1119     {
1120         wrbuf_printf(norm_wr, "%s-%d",
1121                      client_get_database(cl)->database->url, record_no);
1122     }
1123     if (wrbuf_len(norm_wr) > 0)
1124         mergekey_norm = nmem_strdup(nmem, wrbuf_cstr(norm_wr));
1125     wrbuf_destroy(norm_wr);
1126     return mergekey_norm;
1127 }
1128
1129 /** \brief see if metadata for pz:recordfilter exists 
1130     \param root xml root element of normalized record
1131     \param sdb session database for client
1132     \retval 0 if there is no metadata for pz:recordfilter
1133     \retval 1 if there is metadata for pz:recordfilter
1134
1135     If there is no pz:recordfilter defined, this function returns 1
1136     as well.
1137 */
1138     
1139 static int check_record_filter(xmlNode *root, struct session_database *sdb)
1140 {
1141     int match = 0;
1142     xmlNode *n;
1143     const char *s;
1144     s = session_setting_oneval(sdb, PZ_RECORDFILTER);
1145
1146     if (!s || !*s)
1147         return 1;
1148
1149     for (n = root->children; n; n = n->next)
1150     {
1151         if (n->type != XML_ELEMENT_NODE)
1152             continue;
1153         if (!strcmp((const char *) n->name, "metadata"))
1154         {
1155             xmlChar *type = xmlGetProp(n, (xmlChar *) "type");
1156             if (type)
1157             {
1158                 size_t len;
1159                 int substring;
1160                 const char *eq;
1161
1162                 if ((eq = strchr(s, '=')))
1163                     substring = 0;
1164                 else if ((eq = strchr(s, '~')))
1165                     substring = 1;
1166                 if (eq)
1167                     len = eq - s;
1168                 else
1169                     len = strlen(s);
1170                 if (len == strlen((const char *)type) &&
1171                     !memcmp((const char *) type, s, len))
1172                 {
1173                     xmlChar *value = xmlNodeGetContent(n);
1174                     if (value && *value)
1175                     {
1176                         if (!eq ||
1177                             (substring && strstr((const char *) value, eq+1)) ||
1178                             (!substring && !strcmp((const char *) value, eq + 1)))
1179                             match = 1;
1180                     }
1181                     xmlFree(value);
1182                 }
1183                 xmlFree(type);
1184             }
1185         }
1186     }
1187     return match;
1188 }
1189
1190
1191 static int ingest_to_cluster(struct client *cl,
1192                              xmlDoc *xdoc,
1193                              xmlNode *root,
1194                              int record_no,
1195                              const char *mergekey_norm);
1196
1197 /** \brief ingest XML record
1198     \param cl client holds the result set for record
1199     \param rec record buffer (0 terminated)
1200     \param record_no record position (1, 2, ..)
1201     \param nmem working NMEM
1202     \retval 0 OK
1203     \retval -1 failure
1204 */
1205 int ingest_record(struct client *cl, const char *rec,
1206                   int record_no, NMEM nmem)
1207 {
1208     struct session *se = client_get_session(cl);
1209     int ret = 0;
1210     struct session_database *sdb = client_get_database(cl);
1211     struct conf_service *service = se->service;
1212     xmlDoc *xdoc = normalize_record(se, sdb, service, rec, nmem);
1213     xmlNode *root;
1214     const char *mergekey_norm;
1215     
1216     if (!xdoc)
1217         return -1;
1218     
1219     root = xmlDocGetRootElement(xdoc);
1220     
1221     if (!check_record_filter(root, sdb))
1222     {
1223         session_log(se, YLOG_WARN, "Filtered out record no %d from %s",
1224                     record_no, sdb->database->url);
1225         xmlFreeDoc(xdoc);
1226         return -1;
1227     }
1228     
1229     mergekey_norm = get_mergekey(xdoc, cl, record_no, service, nmem);
1230     if (!mergekey_norm)
1231     {
1232         session_log(se, YLOG_WARN, "Got no mergekey");
1233         xmlFreeDoc(xdoc);
1234         return -1;
1235     }
1236     session_enter(se);
1237     if (client_get_session(cl) == se)
1238         ret = ingest_to_cluster(cl, xdoc, root, record_no, mergekey_norm);
1239     session_leave(se);
1240     
1241     xmlFreeDoc(xdoc);
1242     return ret;
1243 }
1244
1245 static int ingest_to_cluster(struct client *cl,
1246                              xmlDoc *xdoc,
1247                              xmlNode *root,
1248                              int record_no,
1249                              const char *mergekey_norm)
1250 {
1251     xmlNode *n;
1252     xmlChar *type = 0;
1253     xmlChar *value = 0;
1254     struct session_database *sdb = client_get_database(cl);
1255     struct session *se = client_get_session(cl);
1256     struct conf_service *service = se->service;
1257     struct record *record = record_create(se->nmem, 
1258                                           service->num_metadata,
1259                                           service->num_sortkeys, cl,
1260                                           record_no);
1261     struct record_cluster *cluster = reclist_insert(se->reclist,
1262                                                     service, 
1263                                                     record,
1264                                                     mergekey_norm,
1265                                                     &se->total_merged);
1266     if (!cluster)
1267         return -1;
1268     if (global_parameters.dump_records)
1269         session_log(se, YLOG_LOG, "Cluster id %s from %s (#%d)", cluster->recid,
1270                     sdb->database->url, record_no);
1271     relevance_newrec(se->relevance, cluster);
1272     
1273     // now parsing XML record and adding data to cluster or record metadata
1274     for (n = root->children; n; n = n->next)
1275     {
1276         pp2_relevance_token_t prt;
1277         if (type)
1278             xmlFree(type);
1279         if (value)
1280             xmlFree(value);
1281         type = value = 0;
1282         
1283         if (n->type != XML_ELEMENT_NODE)
1284             continue;
1285         if (!strcmp((const char *) n->name, "metadata"))
1286         {
1287             struct conf_metadata *ser_md = 0;
1288             struct conf_sortkey *ser_sk = 0;
1289             struct record_metadata **wheretoput = 0;
1290             struct record_metadata *rec_md = 0;
1291             int md_field_id = -1;
1292             int sk_field_id = -1;
1293             
1294             type = xmlGetProp(n, (xmlChar *) "type");
1295             value = xmlNodeListGetString(xdoc, n->children, 1);
1296             
1297             if (!type || !value || !*value)
1298                 continue;
1299             
1300             md_field_id 
1301                 = conf_service_metadata_field_id(service, (const char *) type);
1302             if (md_field_id < 0)
1303             {
1304                 if (se->number_of_warnings_unknown_metadata == 0)
1305                 {
1306                     session_log(se, YLOG_WARN, 
1307                             "Ignoring unknown metadata element: %s", type);
1308                 }
1309                 se->number_of_warnings_unknown_metadata++;
1310                 continue;
1311             }
1312             
1313             ser_md = &service->metadata[md_field_id];
1314             
1315             if (ser_md->sortkey_offset >= 0){
1316                 sk_field_id = ser_md->sortkey_offset;
1317                 ser_sk = &service->sortkeys[sk_field_id];
1318             }
1319
1320             // non-merged metadata
1321             rec_md = record_metadata_init(se->nmem, (const char *) value,
1322                                           ser_md->type, n->properties);
1323             if (!rec_md)
1324             {
1325                 session_log(se, YLOG_WARN, "bad metadata data '%s' "
1326                             "for element '%s'", value, type);
1327                 continue;
1328             }
1329             wheretoput = &record->metadata[md_field_id];
1330             while (*wheretoput)
1331                 wheretoput = &(*wheretoput)->next;
1332             *wheretoput = rec_md;
1333
1334             // merged metadata
1335             rec_md = record_metadata_init(se->nmem, (const char *) value,
1336                                           ser_md->type, 0);
1337             wheretoput = &cluster->metadata[md_field_id];
1338
1339             // and polulate with data:
1340             // assign cluster or record based on merge action
1341             if (ser_md->merge == Metadata_merge_unique)
1342             {
1343                 struct record_metadata *mnode;
1344                 for (mnode = *wheretoput; mnode; mnode = mnode->next)
1345                     if (!strcmp((const char *) mnode->data.text.disp, 
1346                                 rec_md->data.text.disp))
1347                         break;
1348                 if (!mnode)
1349                 {
1350                     rec_md->next = *wheretoput;
1351                     *wheretoput = rec_md;
1352                 }
1353             }
1354             else if (ser_md->merge == Metadata_merge_longest)
1355             {
1356                 if (!*wheretoput 
1357                     || strlen(rec_md->data.text.disp) 
1358                     > strlen((*wheretoput)->data.text.disp))
1359                 {
1360                     *wheretoput = rec_md;
1361                     if (ser_sk)
1362                     {
1363                         const char *sort_str = 0;
1364                         int skip_article = 
1365                             ser_sk->type == Metadata_sortkey_skiparticle;
1366
1367                         if (!cluster->sortkeys[sk_field_id])
1368                             cluster->sortkeys[sk_field_id] = 
1369                                 nmem_malloc(se->nmem, 
1370                                             sizeof(union data_types));
1371                          
1372                         prt = pp2_relevance_tokenize(service->sort_pct);
1373
1374                         pp2_relevance_first(prt, rec_md->data.text.disp,
1375                                             skip_article);
1376
1377                         pp2_relevance_token_next(prt);
1378                          
1379                         sort_str = pp2_get_sort(prt);
1380                          
1381                         cluster->sortkeys[sk_field_id]->text.disp = 
1382                             rec_md->data.text.disp;
1383                         if (!sort_str)
1384                         {
1385                             sort_str = rec_md->data.text.disp;
1386                             session_log(se, YLOG_WARN, 
1387                                     "Could not make sortkey. Bug #1858");
1388                         }
1389                         cluster->sortkeys[sk_field_id]->text.sort = 
1390                             nmem_strdup(se->nmem, sort_str);
1391                         pp2_relevance_token_destroy(prt);
1392                     }
1393                 }
1394             }
1395             else if (ser_md->merge == Metadata_merge_all)
1396             {
1397                 rec_md->next = *wheretoput;
1398                 *wheretoput = rec_md;
1399             }
1400             else if (ser_md->merge == Metadata_merge_range)
1401             {
1402                 if (!*wheretoput)
1403                 {
1404                     *wheretoput = rec_md;
1405                     if (ser_sk)
1406                         cluster->sortkeys[sk_field_id] 
1407                             = &rec_md->data;
1408                 }
1409                 else
1410                 {
1411                     int this_min = rec_md->data.number.min;
1412                     int this_max = rec_md->data.number.max;
1413                     if (this_min < (*wheretoput)->data.number.min)
1414                         (*wheretoput)->data.number.min = this_min;
1415                     if (this_max > (*wheretoput)->data.number.max)
1416                         (*wheretoput)->data.number.max = this_max;
1417                 }
1418             }
1419
1420
1421             // ranking of _all_ fields enabled ... 
1422             if (ser_md->rank)
1423                 relevance_countwords(se->relevance, cluster, 
1424                                      (char *) value, ser_md->rank,
1425                                      ser_md->name);
1426
1427             // construct facets ... unless the client already has reported them
1428             if (ser_md->termlist && !client_has_facet(cl, (char *) type))
1429             {
1430
1431                 if (ser_md->type == Metadata_type_year)
1432                 {
1433                     char year[64];
1434                     sprintf(year, "%d", rec_md->data.number.max);
1435                     add_facet(se, (char *) type, year, 1);
1436                     if (rec_md->data.number.max != rec_md->data.number.min)
1437                     {
1438                         sprintf(year, "%d", rec_md->data.number.min);
1439                         add_facet(se, (char *) type, year, 1);
1440                     }
1441                 }
1442                 else
1443                     add_facet(se, (char *) type, (char *) value, 1);
1444             }
1445
1446             // cleaning up
1447             xmlFree(type);
1448             xmlFree(value);
1449             type = value = 0;
1450         }
1451         else
1452         {
1453             if (se->number_of_warnings_unknown_elements == 0)
1454                 session_log(se, YLOG_WARN,
1455                         "Unexpected element in internal record: %s", n->name);
1456             se->number_of_warnings_unknown_elements++;
1457         }
1458     }
1459     if (type)
1460         xmlFree(type);
1461     if (value)
1462         xmlFree(value);
1463
1464     relevance_donerecord(se->relevance, cluster);
1465     se->total_records++;
1466
1467     return 0;
1468 }
1469
1470 void session_log(struct session *s, int level, const char *fmt, ...)
1471 {
1472     char buf[1024];
1473     va_list ap;
1474     va_start(ap, fmt);
1475
1476     yaz_vsnprintf(buf, sizeof(buf)-30, fmt, ap);
1477     yaz_log(level, "%u %s", s->session_id, buf);
1478
1479     va_end(ap);
1480 }
1481
1482 /*
1483  * Local variables:
1484  * c-basic-offset: 4
1485  * c-file-style: "Stroustrup"
1486  * indent-tabs-mode: nil
1487  * End:
1488  * vim: shiftwidth=4 tabstop=8 expandtab
1489  */
1490