325ba288cf8bba00352af1dda9ef65f03a8af8c1
[pazpar2-moved-to-github.git] / src / session.c
1 /* This file is part of Pazpar2.
2    Copyright (C) Index Data
3
4 Pazpar2 is free software; you can redistribute it and/or modify it under
5 the terms of the GNU General Public License as published by the Free
6 Software Foundation; either version 2, or (at your option) any later
7 version.
8
9 Pazpar2 is distributed in the hope that it will be useful, but WITHOUT ANY
10 WARRANTY; without even the implied warranty of MERCHANTABILITY or
11 FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
12 for more details.
13
14 You should have received a copy of the GNU General Public License
15 along with this program; if not, write to the Free Software
16 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
17
18 */
19
20 /** \file session.c
21     \brief high-level logic; mostly user sessions and settings
22 */
23
24 #if HAVE_CONFIG_H
25 #include <config.h>
26 #endif
27
28 #include <time.h>
29 #include <stdlib.h>
30 #include <stdio.h>
31 #include <string.h>
32 #if HAVE_SYS_TIME_H
33 #include <sys/time.h>
34 #endif
35 #if HAVE_UNISTD_H
36 #include <unistd.h>
37 #endif
38 #ifdef WIN32
39 #include <windows.h>
40 #endif
41 #include <signal.h>
42 #include <ctype.h>
43 #include <assert.h>
44 #include <math.h>
45
46 #include <yaz/marcdisp.h>
47 #include <yaz/comstack.h>
48 #include <yaz/tcpip.h>
49 #include <yaz/proto.h>
50 #include <yaz/readconf.h>
51 #include <yaz/pquery.h>
52 #include <yaz/otherinfo.h>
53 #include <yaz/yaz-util.h>
54 #include <yaz/nmem.h>
55 #include <yaz/query-charset.h>
56 #include <yaz/querytowrbuf.h>
57 #include <yaz/oid_db.h>
58 #include <yaz/snprintf.h>
59
60 #define USE_TIMING 0
61 #if USE_TIMING
62 #include <yaz/timing.h>
63 #endif
64
65 #include "ppmutex.h"
66 #include "parameters.h"
67 #include "session.h"
68 #include "eventl.h"
69 #include "http.h"
70 #include "termlists.h"
71 #include "reclists.h"
72 #include "relevance.h"
73 #include "database.h"
74 #include "client.h"
75 #include "settings.h"
76 #include "normalize7bit.h"
77
78 #include <libxml/tree.h>
79
/* NOTE(review): MAX_CHUNK is not referenced in the visible part of this
   file; its intended meaning cannot be confirmed from here */
#define MAX_CHUNK 15

/* Yields the larger of a and b. An argument may be evaluated more than
   once, so do not pass expressions with side effects. */
#define MAX(a,b) ((a)>(b)?(a):(b))
83
// Global run-time parameters, initialised positionally (the trailing
// comments name the fields). dump_records is consulted by record_to_xml
// to decide whether incoming records are written to the log.
// Note: Some things in this structure will eventually move to configuration
struct parameters global_parameters =
{
    0,   // dump_records
    0,   // debug_mode
    0,   // predictable sessions
};
91
// Node of a singly linked list of clients. The session keeps two such
// lists: clients_cached and clients_active.
struct client_list {
    struct client *client;      // client referenced by this node
    struct client_list *next;   // following node, 0 at the end of the list
};
96
97 /* session counting (1) , disable client counting (0) */
98 static YAZ_MUTEX g_session_mutex = 0;
99 static int no_sessions = 0;
100
101 static int session_use(int delta)
102 {
103     int sessions;
104     if (!g_session_mutex)
105         yaz_mutex_create(&g_session_mutex);
106     yaz_mutex_enter(g_session_mutex);
107     no_sessions += delta;
108     sessions = no_sessions;
109     yaz_mutex_leave(g_session_mutex);
110     yaz_log(YLOG_DEBUG, "%s sessions=%d", delta == 0 ? "" :
111             (delta > 0 ? "INC" : "DEC"), no_sessions);
112     return sessions;
113 }
114
/* Returns the current number of sessions; the count is not changed */
int sessions_count(void)
{
    const int current = session_use(0);

    return current;
}
119
120 static void log_xml_doc(xmlDoc *doc)
121 {
122     FILE *lf = yaz_log_file();
123     xmlChar *result = 0;
124     int len = 0;
125 #if LIBXML_VERSION >= 20600
126     xmlDocDumpFormatMemory(doc, &result, &len, 1);
127 #else
128     xmlDocDumpMemory(doc, &result, &len);
129 #endif
130     if (lf && len)
131     {
132         (void) fwrite(result, 1, len, lf);
133         fprintf(lf, "\n");
134     }
135     xmlFree(result);
136 }
137
138 static void session_enter(struct session *s, const char *caller)
139 {
140     if (caller)
141         session_log(s, YLOG_DEBUG, "Session lock by %s", caller);
142     yaz_mutex_enter(s->session_mutex);
143 }
144
145 static void session_leave(struct session *s, const char *caller)
146 {
147     yaz_mutex_leave(s->session_mutex);
148     if (caller)
149         session_log(s, YLOG_DEBUG, "Session unlock by %s", caller);
150 }
151
152 static void session_normalize_facet(struct session *s,
153                                     const char *type, const char *value,
154                                     WRBUF display_wrbuf, WRBUF facet_wrbuf)
155 {
156     struct conf_service *service = s->service;
157     pp2_charset_token_t prt;
158     const char *facet_component;
159     int i;
160     const char *icu_chain_id = 0;
161
162     for (i = 0; i < service->num_metadata; i++)
163         if (!strcmp((service->metadata + i)->name, type))
164             icu_chain_id = (service->metadata + i)->facetrule;
165     if (!icu_chain_id)
166         icu_chain_id = "facet";
167     prt = pp2_charset_token_create(service->charsets, icu_chain_id);
168     if (!prt)
169     {
170         session_log(s, YLOG_FATAL,
171                     "Unknown ICU chain '%s' for facet of type '%s'",
172                 icu_chain_id, type);
173         wrbuf_destroy(facet_wrbuf);
174         wrbuf_destroy(display_wrbuf);
175         return;
176     }
177     pp2_charset_token_first(prt, value, 0);
178     while ((facet_component = pp2_charset_token_next(prt)))
179     {
180         const char *display_component;
181         if (*facet_component)
182         {
183             if (wrbuf_len(facet_wrbuf))
184                 wrbuf_puts(facet_wrbuf, " ");
185             wrbuf_puts(facet_wrbuf, facet_component);
186         }
187         display_component = pp2_get_display(prt);
188         if (display_component)
189         {
190             if (wrbuf_len(display_wrbuf))
191                 wrbuf_puts(display_wrbuf, " ");
192             wrbuf_puts(display_wrbuf, display_component);
193         }
194     }
195     pp2_charset_token_destroy(prt);
196 }
197
198 void add_facet(struct session *s, const char *type, const char *value, int count)
199 {
200     WRBUF facet_wrbuf = wrbuf_alloc();
201     WRBUF display_wrbuf = wrbuf_alloc();
202
203     session_normalize_facet(s, type, value, display_wrbuf, facet_wrbuf);
204
205     if (wrbuf_len(facet_wrbuf))
206     {
207         struct named_termlist **tp = &s->termlists;
208         for (; (*tp); tp = &(*tp)->next)
209             if (!strcmp((*tp)->name, type))
210                 break;
211         if (!*tp)
212         {
213             *tp = nmem_malloc(s->nmem, sizeof(**tp));
214             (*tp)->name = nmem_strdup(s->nmem, type);
215             (*tp)->termlist = termlist_create(s->nmem);
216             (*tp)->next = 0;
217         }
218         termlist_insert((*tp)->termlist, wrbuf_cstr(display_wrbuf),
219                         wrbuf_cstr(facet_wrbuf), count);
220     }
221     wrbuf_destroy(facet_wrbuf);
222     wrbuf_destroy(display_wrbuf);
223 }
224
225 static xmlDoc *record_to_xml(struct session *se,
226                              struct session_database *sdb, const char *rec)
227 {
228     struct database *db = sdb->database;
229     xmlDoc *rdoc = 0;
230
231     rdoc = xmlParseMemory(rec, strlen(rec));
232
233     if (!rdoc)
234     {
235         session_log(se, YLOG_WARN, "Non-wellformed XML");
236         return 0;
237     }
238
239     if (global_parameters.dump_records)
240     {
241         session_log(se, YLOG_LOG, "Un-normalized record from %s", db->id);
242         log_xml_doc(rdoc);
243     }
244
245     return rdoc;
246 }
247
248 #define MAX_XSLT_ARGS 16
249
250 // Add static values from session database settings if applicable
251 static void insert_settings_parameters(struct session_database *sdb,
252                                        struct conf_service *service,
253                                        char **parms,
254                                        NMEM nmem)
255 {
256     int i;
257     int nparms = 0;
258     int offset = 0;
259
260     for (i = 0; i < service->num_metadata; i++)
261     {
262         struct conf_metadata *md = &service->metadata[i];
263         int setting;
264
265         if (md->setting == Metadata_setting_parameter &&
266             (setting = settings_lookup_offset(service, md->name)) >= 0)
267         {
268             const char *val = session_setting_oneval(sdb, setting);
269             if (val && nparms < MAX_XSLT_ARGS)
270             {
271                 char *buf;
272                 int len = strlen(val);
273                 buf = nmem_malloc(nmem, len + 3);
274                 buf[0] = '\'';
275                 strcpy(buf + 1, val);
276                 buf[len+1] = '\'';
277                 buf[len+2] = '\0';
278                 parms[offset++] = md->name;
279                 parms[offset++] = buf;
280                 nparms++;
281             }
282         }
283     }
284     parms[offset] = 0;
285 }
286
287 // Add static values from session database settings if applicable
288 static void insert_settings_values(struct session_database *sdb, xmlDoc *doc,
289                                    xmlNode *root,
290                                    struct conf_service *service)
291 {
292     int i;
293
294     for (i = 0; i < service->num_metadata; i++)
295     {
296         struct conf_metadata *md = &service->metadata[i];
297         int offset;
298
299         if (md->setting == Metadata_setting_postproc &&
300             (offset = settings_lookup_offset(service, md->name)) >= 0)
301         {
302             const char *val = session_setting_oneval(sdb, offset);
303             if (val)
304             {
305                 xmlNode *n = xmlNewTextChild(root, 0, (xmlChar *) "metadata",
306                                              (xmlChar *) val);
307                 xmlSetProp(n, (xmlChar *) "type", (xmlChar *) md->name);
308             }
309         }
310     }
311 }
312
313 static xmlDoc *normalize_record(struct session *se,
314                                 struct session_database *sdb,
315                                 struct conf_service *service,
316                                 const char *rec, NMEM nmem)
317 {
318     xmlDoc *rdoc = record_to_xml(se, sdb, rec);
319
320     if (rdoc)
321     {
322         char *parms[MAX_XSLT_ARGS*2+1];
323
324         insert_settings_parameters(sdb, service, parms, nmem);
325
326         if (normalize_record_transform(sdb->map, &rdoc, (const char **)parms))
327         {
328             session_log(se, YLOG_WARN, "Normalize failed");
329         }
330     }
331     return rdoc;
332 }
333
334 void session_settings_dump(struct session *se,
335                            struct session_database *db,
336                            WRBUF w)
337 {
338     if (db->settings)
339     {
340         int i, num = db->num_settings;
341         for (i = 0; i < num; i++)
342         {
343             struct setting *s = db->settings[i];
344             for (;s ; s = s->next)
345             {
346                 wrbuf_puts(w, "<set name=\"");
347                 wrbuf_xmlputs(w, s->name);
348                 wrbuf_puts(w, "\" value=\"");
349                 wrbuf_xmlputs(w, s->value);
350                 wrbuf_puts(w, "\"/>");
351             }
352             if (db->settings[i])
353                 wrbuf_puts(w, "\n");
354         }
355     }
356 }
357
358 // Retrieve first defined value for 'name' for given database.
359 // Will be extended to take into account user associated with session
360 const char *session_setting_oneval(struct session_database *db, int offset)
361 {
362     if (offset >= db->num_settings || !db->settings[offset])
363         return "";
364     return db->settings[offset]->value;
365 }
366
367 // Prepare XSLT stylesheets for record normalization
368 // Structures are allocated on the session_wide nmem to avoid having
369 // to recompute this for every search. This would lead
370 // to leaking if a single session was to repeatedly change the PZ_XSLT
371 // setting. However, this is not a realistic use scenario.
372 static int prepare_map(struct session *se, struct session_database *sdb)
373 {
374     if (sdb->settings && !sdb->map)
375     {
376         const char *s;
377
378         if (sdb->settings[PZ_XSLT] &&
379             (s = session_setting_oneval(sdb, PZ_XSLT)))
380         {
381             char auto_stylesheet[256];
382
383             if (!strcmp(s, "auto"))
384             {
385                 const char *request_syntax = session_setting_oneval(
386                     sdb, PZ_REQUESTSYNTAX);
387                 if (request_syntax)
388                 {
389                     char *cp;
390                     yaz_snprintf(auto_stylesheet, sizeof(auto_stylesheet),
391                                  "%s.xsl", request_syntax);
392                     for (cp = auto_stylesheet; *cp; cp++)
393                     {
394                         /* deliberately only consider ASCII */
395                         if (*cp > 32 && *cp < 127)
396                             *cp = tolower(*cp);
397                     }
398                     s = auto_stylesheet;
399                 }
400                 else
401                 {
402                     session_log(se, YLOG_WARN,
403                                 "No pz:requestsyntax for auto stylesheet");
404                 }
405             }
406             sdb->map = normalize_cache_get(se->normalize_cache,
407                                            se->service, s);
408             if (!sdb->map)
409                 return -1;
410         }
411     }
412     return 0;
413 }
414
415 // called if watch should be removed because http_channel is to be destroyed
416 static void session_watch_cancel(void *data, struct http_channel *c,
417                                  void *data2)
418 {
419     struct session_watchentry *ent = data;
420
421     ent->fun = 0;
422     ent->data = 0;
423     ent->obs = 0;
424 }
425
426 // set watch. Returns 0=OK, -1 if watch is already set
427 int session_set_watch(struct session *s, int what,
428                       session_watchfun fun, void *data,
429                       struct http_channel *chan)
430 {
431     int ret;
432     session_enter(s, "session_set_watch");
433     if (s->watchlist[what].fun)
434         ret = -1;
435     else
436     {
437
438         s->watchlist[what].fun = fun;
439         s->watchlist[what].data = data;
440         s->watchlist[what].obs = http_add_observer(chan, &s->watchlist[what],
441                                                    session_watch_cancel);
442         ret = 0;
443     }
444     session_leave(s, "session_set_watch");
445     return ret;
446 }
447
448 void session_alert_watch(struct session *s, int what)
449 {
450     assert(s);
451     session_enter(s, "session_alert_watch");
452     if (s->watchlist[what].fun)
453     {
454         /* our watch is no longer associated with http_channel */
455         void *data;
456         session_watchfun fun;
457
458         http_remove_observer(s->watchlist[what].obs);
459         fun  = s->watchlist[what].fun;
460         data = s->watchlist[what].data;
461
462         /* reset watch before fun is invoked - in case fun wants to set
463            it again */
464         s->watchlist[what].fun = 0;
465         s->watchlist[what].data = 0;
466         s->watchlist[what].obs = 0;
467
468         session_leave(s, "session_alert_watch");
469         session_log(s, YLOG_DEBUG,
470                     "Alert Watch: %d calling function: %p", what, fun);
471         fun(data);
472     }
473     else
474         session_leave(s,"session_alert_watch");
475 }
476
477 //callback for grep_databases
478 static void select_targets_callback(struct session *se,
479                                     struct session_database *db)
480 {
481     struct client *cl;
482     struct client_list *l;
483
484     for (l = se->clients_cached; l; l = l->next)
485         if (client_get_database(l->client) == db)
486             break;
487
488     if (l)
489         cl = l->client;
490     else
491     {
492         cl = client_create(db->database->id);
493         client_set_database(cl, db);
494
495         l = xmalloc(sizeof(*l));
496         l->client = cl;
497         l->next = se->clients_cached;
498         se->clients_cached = l;
499     }
500     /* set session always. If may be 0 if client is not active */
501     client_set_session(cl, se);
502
503     l = xmalloc(sizeof(*l));
504     l->client = cl;
505     l->next = se->clients_active;
506     se->clients_active = l;
507 }
508
509 static void session_reset_active_clients(struct session *se,
510                                          struct client_list *new_list)
511 {
512     struct client_list *l;
513
514     session_enter(se, "session_reset_active_clients");
515     l = se->clients_active;
516     se->clients_active = new_list;
517     session_leave(se, "session_reset_active_clients");
518
519     while (l)
520     {
521         struct client_list *l_next = l->next;
522
523         client_lock(l->client);
524         client_set_session(l->client, 0); /* mark client inactive */
525         client_unlock(l->client);
526
527         xfree(l);
528         l = l_next;
529     }
530 }
531
532 static void session_remove_cached_clients(struct session *se)
533 {
534     struct client_list *l;
535
536     session_reset_active_clients(se, 0);
537
538     session_enter(se, "session_remove_cached_clients");
539     l = se->clients_cached;
540     se->clients_cached = 0;
541     session_leave(se, "session_remove_cached_clients");
542
543     while (l)
544     {
545         struct client_list *l_next = l->next;
546         client_lock(l->client);
547         client_set_session(l->client, 0);
548         client_set_database(l->client, 0);
549         client_unlock(l->client);
550         client_destroy(l->client);
551         xfree(l);
552         l = l_next;
553     }
554 }
555
556 // Associates a set of clients with a session;
557 // Note: Session-databases represent databases with per-session
558 // setting overrides
559 static int select_targets(struct session *se, const char *filter)
560 {
561     return session_grep_databases(se, filter, select_targets_callback);
562 }
563
564 int session_active_clients(struct session *s)
565 {
566     struct client_list *l;
567     int res = 0;
568
569     for (l = s->clients_active; l; l = l->next)
570         if (client_is_active(l->client))
571             res++;
572
573     return res;
574 }
575
576 int session_is_preferred_clients_ready(struct session *s)
577 {
578     struct client_list *l;
579     int res = 0;
580
581     for (l = s->clients_active; l; l = l->next)
582         if (client_is_active_preferred(l->client))
583             res++;
584     session_log(s, YLOG_DEBUG, "Has %d active preferred clients.", res);
585     return res == 0;
586 }
587
588 static void session_clear_set(struct session *se, struct reclist_sortparms *sp)
589 {
590     reclist_destroy(se->reclist);
591     if (nmem_total(se->nmem))
592         session_log(se, YLOG_DEBUG, "NMEN operation usage %zd",
593                     nmem_total(se->nmem));
594     nmem_reset(se->nmem);
595     se->total_records = se->total_merged = 0;
596     se->termlists = 0;
597     relevance_clear(se->relevance);
598
599     /* reset list of sorted results and clear to relevance search */
600     se->sorted_results = nmem_malloc(se->nmem, sizeof(*se->sorted_results));
601     se->sorted_results->name = nmem_strdup(se->nmem, sp->name);
602     se->sorted_results->increasing = sp->increasing;
603     se->sorted_results->type = sp->type;
604     se->sorted_results->next = 0;
605
606     session_log(se, YLOG_DEBUG, "clear_set session_sort: field=%s increasing=%d type=%d configured",
607             sp->name, sp->increasing, sp->type);
608
609     se->reclist = reclist_create(se->nmem);
610 }
611
// Applies the sort parameters sp (and optionally a new mergekey / rank)
// to the session.
//  - A non-null rank or mergekey that differs from the stored one replaces
//    it (an empty string clears it) and forces the re-search path to be
//    considered.
//  - If nothing was replaced and sp already is in se->sorted_results, the
//    function returns without further work.
//  - Otherwise every active client is asked via client_parse_sort; the
//    results are summed into clients_research. If the sum is 0, or a
//    search is already being started (se->clients_starting), sp is only
//    recorded in sorted_results. Else the result set is cleared and the
//    clients in state Connecting, Idle or Working are searched again.
// The session mutex is held except while clients are being restarted.
void session_sort(struct session *se, struct reclist_sortparms *sp,
                  const char *mergekey, const char *rank)
{
    struct client_list *l;
    const char *field = sp->name;
    int increasing = sp->increasing;
    int type  = sp->type;
    int clients_research = 0;

    session_enter(se, "session_sort");
    session_log(se, YLOG_DEBUG, "session_sort field=%s increasing=%d type=%d",
                field, increasing, type);

    if (rank && (!se->rank || strcmp(se->rank, rank)))
    {
        /* new rank must research/reingest anyway */
        assert(rank);
        xfree(se->rank);
        /* an empty rank string means: no rank */
        se->rank = *rank ? xstrdup(rank) : 0;
        clients_research = 1;
        session_log(se, YLOG_DEBUG, "session_sort: new rank = %s",
                    rank);
    }
    if (mergekey && (!se->mergekey || strcmp(se->mergekey, mergekey)))
    {
        /* new mergekey must research/reingest anyway */
        assert(mergekey);
        xfree(se->mergekey);
        /* an empty mergekey string means: no mergekey */
        se->mergekey = *mergekey ? xstrdup(mergekey) : 0;
        clients_research = 1;
        session_log(se, YLOG_DEBUG, "session_sort: new mergekey = %s",
                    mergekey);
    }
    if (clients_research == 0)
    {
        /* rank and mergekey unchanged: nothing to do if this sort spec
           has been applied to the current result set before */
        struct reclist_sortparms *sr;
        for (sr = se->sorted_results; sr; sr = sr->next)
            if (!reclist_sortparms_cmp(sr, sp))
                break;
        if (sr)
        {
            session_log(se, YLOG_DEBUG, "session_sort: field=%s increasing=%d type=%d already fetched",
                        field, increasing, type);
            session_leave(se, "session_sort");
            return;
        }
    }
    session_log(se, YLOG_DEBUG, "session_sort: field=%s increasing=%d type=%d must fetch",
                field, increasing, type);

    // We need to reset reclist on every sort that changes the records, not just for position
    // So if just one client requires new searching, we need to clear set.
    // Ask each of the clients whether sorting requires a re-search due to
    // native sort; the answers are added up in clients_research.
    for (l = se->clients_active; l; l = l->next)
    {
        struct client *cl = l->client;
        // Assume no re-search is required.
        client_parse_init(cl, 1);
        clients_research += client_parse_sort(cl, sp);
    }
    if (!clients_research || se->clients_starting)
    {
        // A new sorting based on same record set
        struct reclist_sortparms *sr = nmem_malloc(se->nmem, sizeof(*sr));
        sr->name = nmem_strdup(se->nmem, field);
        sr->increasing = increasing;
        sr->type = type;
        sr->next = se->sorted_results;
        se->sorted_results = sr;
        session_log(se, YLOG_DEBUG, "session_sort: no research/ingesting done");
        session_leave(se, "session_sort");
    }
    else
    {
        /* flag that clients are being started; session_search and
           session_stop return early while this flag is set */
        se->clients_starting = 1;
        session_log(se, YLOG_DEBUG,
                    "session_sort: reset results due to %d clients researching",
                    clients_research);
        session_clear_set(se, sp);
        session_log(se, YLOG_DEBUG, "Re- search/ingesting for clients due to change in sort order");

        /* the session mutex is released while the clients are restarted */
        session_leave(se, "session_sort");
        for (l = se->clients_active; l; l = l->next)
        {
            struct client *cl = l->client;
            if (client_get_state(cl) == Client_Connecting ||
                client_get_state(cl) == Client_Idle ||
                client_get_state(cl) == Client_Working) {
                client_start_search(cl);
            }
            else
            {
                session_log(se, YLOG_DEBUG,
                            "session_sort: %s: No re-start/ingest in show. "
                            "Wrong client state: %d",
                            client_get_id(cl), client_get_state(cl));
            }
        }
        /* clear the flag again under the mutex */
        session_enter(se, "session_sort");
        se->clients_starting = 0;
        session_leave(se, "session_sort");
    }
}
716
717 void session_stop(struct session *se)
718 {
719     struct client_list *l;
720     session_enter(se, "session_stop1");
721     if (se->clients_starting)
722     {
723         session_leave(se, "session_stop1");
724         return;
725     }
726     se->clients_starting = 1;
727     session_leave(se, "session_stop1");
728
729     session_alert_watch(se, SESSION_WATCH_SHOW);
730     session_alert_watch(se, SESSION_WATCH_BYTARGET);
731     session_alert_watch(se, SESSION_WATCH_TERMLIST);
732     session_alert_watch(se, SESSION_WATCH_SHOW_PREF);
733
734     for (l = se->clients_active; l; l = l->next)
735     {
736         struct client *cl = l->client;
737         client_stop(cl);
738     }
739     session_enter(se, "session_stop2");
740     se->clients_starting = 0;
741     session_leave(se, "session_stop2");
742 }
743
744 enum pazpar2_error_code session_search(struct session *se,
745                                        const char *query,
746                                        const char *startrecs,
747                                        const char *maxrecs,
748                                        const char *filter,
749                                        const char *limit,
750                                        const char **addinfo,
751                                        const char **addinfo2,
752                                        struct reclist_sortparms *sp,
753                                        const char *mergekey,
754                                        const char *rank)
755 {
756     int live_channels = 0;
757     int no_working = 0;
758     int no_failed_query = 0;
759     int no_failed_limit = 0;
760     struct client_list *l;
761
762     session_log(se, YLOG_DEBUG, "Search");
763
764     *addinfo = 0;
765
766     session_enter(se, "session_search0");
767     if (se->clients_starting)
768     {
769         session_leave(se, "session_search0");
770         return PAZPAR2_NO_ERROR;
771     }
772     se->clients_starting = 1;
773     session_leave(se, "session_search0");
774
775     if (se->settings_modified) {
776         session_remove_cached_clients(se);
777     }
778     else
779         session_reset_active_clients(se, 0);
780
781     session_enter(se, "session_search");
782     se->settings_modified = 0;
783
784     if (mergekey)
785     {
786         xfree(se->mergekey);
787         se->mergekey = *mergekey ? xstrdup(mergekey) : 0;
788     }
789     if (rank)
790     {
791         xfree(se->rank);
792         se->rank = *rank ? xstrdup(rank) : 0;
793     }
794
795     session_clear_set(se, sp);
796     relevance_destroy(&se->relevance);
797
798     live_channels = select_targets(se, filter);
799     if (!live_channels)
800     {
801         session_leave(se, "session_search");
802         se->clients_starting = 0;
803         return PAZPAR2_NO_TARGETS;
804     }
805
806     facet_limits_destroy(se->facet_limits);
807     se->facet_limits = facet_limits_create(limit);
808     if (!se->facet_limits)
809     {
810         *addinfo = "limit";
811         session_leave(se, "session_search");
812         se->clients_starting = 0;
813         return PAZPAR2_MALFORMED_PARAMETER_VALUE;
814     }
815
816     session_leave(se, "session_search");
817
818     session_alert_watch(se, SESSION_WATCH_SHOW);
819     session_alert_watch(se, SESSION_WATCH_BYTARGET);
820     session_alert_watch(se, SESSION_WATCH_TERMLIST);
821     session_alert_watch(se, SESSION_WATCH_SHOW_PREF);
822
823     for (l = se->clients_active; l; l = l->next)
824     {
825         int parse_ret;
826         struct client *cl = l->client;
827         client_parse_init(cl, 1);
828         if (prepare_map(se, client_get_database(cl)) < 0)
829             continue;
830
831         parse_ret = client_parse_query(cl, query, se->facet_limits, addinfo2);
832         if (parse_ret == -1)
833             no_failed_query++;
834         else if (parse_ret == -2)
835             no_failed_limit++;
836         else if (parse_ret < 0)
837             no_working++; /* other error, such as bad CCL map */
838         else
839         {
840             client_parse_range(cl, startrecs, maxrecs);
841             client_parse_sort(cl, sp);
842             client_start_search(cl);
843             no_working++;
844         }
845     }
846     session_enter(se, "session_search2");
847     se->clients_starting = 0;
848     session_leave(se, "session_search2");
849     if (no_working == 0)
850     {
851         if (no_failed_query > 0)
852         {
853             *addinfo = "query";
854             return PAZPAR2_MALFORMED_PARAMETER_VALUE;
855         }
856         else if (no_failed_limit > 0)
857         {
858             *addinfo = "limit";
859             return PAZPAR2_MALFORMED_PARAMETER_VALUE;
860         }
861         else
862             return PAZPAR2_NO_TARGETS;
863     }
864     return PAZPAR2_NO_ERROR;
865 }
866
867 // Creates a new session_database object for a database
868 static void session_init_databases_fun(void *context, struct database *db)
869 {
870     struct session *se = (struct session *) context;
871     struct session_database *new = nmem_malloc(se->session_nmem, sizeof(*new));
872     int i;
873
874     new->database = db;
875
876     new->map = 0;
877     assert(db->settings);
878     new->settings = nmem_malloc(se->session_nmem,
879                                 sizeof(struct settings *) * db->num_settings);
880     new->num_settings = db->num_settings;
881     for (i = 0; i < db->num_settings; i++)
882     {
883         struct setting *setting = db->settings[i];
884         new->settings[i] = setting;
885     }
886     new->next = se->databases;
887     se->databases = new;
888 }
889
890 // Doesn't free memory associated with sdb -- nmem takes care of that
891 static void session_database_destroy(struct session_database *sdb)
892 {
893     sdb->map = 0;
894 }
895
896 // Initialize session_database list -- this represents this session's view
897 // of the database list -- subject to modification by the settings ws command
898 void session_init_databases(struct session *se)
899 {
900     se->databases = 0;
901     predef_grep_databases(se, se->service, session_init_databases_fun);
902 }
903
904 // Probably session_init_databases_fun should be refactored instead of
905 // called here.
906 static struct session_database *load_session_database(struct session *se,
907                                                       const char *id)
908 {
909     struct database *db = new_database_inherit_settings(id, se->session_nmem, se->service->settings);
910     session_init_databases_fun((void*) se, db);
911
912     // New sdb is head of se->databases list
913     return se->databases;
914 }
915
916 // Find an existing session database. If not found, load it
917 static struct session_database *find_session_database(struct session *se,
918                                                       const char *id)
919 {
920     struct session_database *sdb;
921
922     for (sdb = se->databases; sdb; sdb = sdb->next)
923         if (!strcmp(sdb->database->id, id))
924             return sdb;
925     return load_session_database(se, id);
926 }
927
928 // Apply a session override to a database
929 void session_apply_setting(struct session *se, const char *dbname,
930                            const char *name, const char *value)
931 {
932     session_enter(se, "session_apply_setting");
933     {
934         struct session_database *sdb = find_session_database(se, dbname);
935         struct conf_service *service = se->service;
936         struct setting *s;
937         int offset = settings_create_offset(service, name);
938
939         expand_settings_array(&sdb->settings, &sdb->num_settings, offset,
940                               se->session_nmem);
941         // Force later recompute of settings-driven data structures
942         // (happens when a search starts and client connections are prepared)
943         if (offset == PZ_XSLT)
944             sdb->map = 0;
945         se->settings_modified = 1;
946         for (s = sdb->settings[offset]; s; s = s->next)
947             if (!strcmp(s->name, name) &&
948                 dbname && s->target && !strcmp(dbname, s->target))
949                 break;
950         if (!s)
951         {
952             s = nmem_malloc(se->session_nmem, sizeof(*s));
953             s->precedence = 0;
954             s->target = nmem_strdup(se->session_nmem, dbname);
955             s->name = nmem_strdup(se->session_nmem, name);
956             s->next = sdb->settings[offset];
957             sdb->settings[offset] = s;
958         }
959         s->value = nmem_strdup(se->session_nmem, value);
960     }
961     session_leave(se, "session_apply_setting");
962 }
963
964 void session_destroy(struct session *se)
965 {
966     struct session_database *sdb;
967     session_log(se, YLOG_LOG, "destroy");
968     session_use(-1);
969     session_remove_cached_clients(se);
970
971     for (sdb = se->databases; sdb; sdb = sdb->next)
972         session_database_destroy(sdb);
973     normalize_cache_destroy(se->normalize_cache);
974     relevance_destroy(&se->relevance);
975     reclist_destroy(se->reclist);
976     xfree(se->mergekey);
977     xfree(se->rank);
978     if (nmem_total(se->nmem))
979         session_log(se, YLOG_DEBUG, "NMEN operation usage %zd", nmem_total(se->nmem));
980     if (nmem_total(se->session_nmem))
981         session_log(se, YLOG_DEBUG, "NMEN session usage %zd", nmem_total(se->session_nmem));
982     facet_limits_destroy(se->facet_limits);
983     nmem_destroy(se->nmem);
984     service_destroy(se->service);
985     yaz_mutex_destroy(&se->session_mutex);
986 }
987
988 size_t session_get_memory_status(struct session *session) {
989     size_t session_nmem;
990     if (session == 0)
991         return 0;
992     session_enter(session, "session_get_memory_status");
993     session_nmem = nmem_total(session->nmem);
994     session_leave(session, "session_get_memory_status");
995     return session_nmem;
996 }
997
998
999 struct session *new_session(NMEM nmem, struct conf_service *service,
1000                             unsigned session_id)
1001 {
1002     int i;
1003     struct session *session = nmem_malloc(nmem, sizeof(*session));
1004
1005     char tmp_str[50];
1006
1007     sprintf(tmp_str, "session#%u", session_id);
1008
1009     session->session_id = session_id;
1010     session_log(session, YLOG_DEBUG, "New");
1011     session->service = service;
1012     session->relevance = 0;
1013     session->total_records = 0;
1014     session->number_of_warnings_unknown_elements = 0;
1015     session->number_of_warnings_unknown_metadata = 0;
1016     session->termlists = 0;
1017     session->reclist = reclist_create(nmem);
1018     session->clients_active = 0;
1019     session->clients_cached = 0;
1020     session->settings_modified = 0;
1021     session->session_nmem = nmem;
1022     session->nmem = nmem_create();
1023     session->databases = 0;
1024     session->sorted_results = 0;
1025     session->facet_limits = 0;
1026     session->mergekey = 0;
1027     session->rank = 0;
1028     session->clients_starting = 0;
1029
1030     for (i = 0; i <= SESSION_WATCH_MAX; i++)
1031     {
1032         session->watchlist[i].data = 0;
1033         session->watchlist[i].fun = 0;
1034     }
1035     session->normalize_cache = normalize_cache_create();
1036     session->session_mutex = 0;
1037     pazpar2_mutex_create(&session->session_mutex, tmp_str);
1038     session_log(session, YLOG_LOG, "create");
1039
1040     session_use(1);
1041     return session;
1042 }
1043
1044 const char * client_get_suggestions_xml(struct client *cl, WRBUF wrbuf);
1045
1046 static struct hitsbytarget *hitsbytarget_nb(struct session *se,
1047                                             int *count, NMEM nmem)
1048 {
1049     struct hitsbytarget *res = 0;
1050     struct client_list *l;
1051     size_t sz = 0;
1052
1053     for (l = se->clients_active; l; l = l->next)
1054         sz++;
1055
1056     res = nmem_malloc(nmem, sizeof(*res) * sz);
1057     *count = 0;
1058     for (l = se->clients_active; l; l = l->next)
1059     {
1060         struct client *cl = l->client;
1061         WRBUF w = wrbuf_alloc();
1062         const char *name = session_setting_oneval(client_get_database(cl),
1063                                                   PZ_NAME);
1064         res[*count].id = client_get_id(cl);
1065         res[*count].name = *name ? name : "Unknown";
1066         res[*count].hits = client_get_hits(cl);
1067         res[*count].approximation = client_get_approximation(cl);
1068         res[*count].records = client_get_num_records(cl,
1069                                                      &res[*count].filtered,
1070                                                      0, 0);
1071         res[*count].diagnostic =
1072             client_get_diagnostic(cl, &res[*count].message,
1073                                   &res[*count].addinfo);
1074         res[*count].state = client_get_state_str(cl);
1075         res[*count].connected  = client_get_connection(cl) ? 1 : 0;
1076         session_settings_dump(se, client_get_database(cl), w);
1077         res[*count].settings_xml = nmem_strdup(nmem, wrbuf_cstr(w));
1078         wrbuf_rewind(w);
1079         wrbuf_puts(w, "");
1080         res[*count].suggestions_xml = nmem_strdup(nmem, client_get_suggestions_xml(cl, w));
1081         wrbuf_destroy(w);
1082         (*count)++;
1083     }
1084     return res;
1085 }
1086
1087 struct hitsbytarget *get_hitsbytarget(struct session *se, int *count, NMEM nmem)
1088 {
1089     struct hitsbytarget *p;
1090     session_enter(se, "get_hitsbytarget");
1091     p = hitsbytarget_nb(se, count, nmem);
1092     session_leave(se, "get_hitsbytarget");
1093     return p;
1094 }
1095
1096 // Compares two hitsbytarget nodes by hitcount
1097 static int cmp_ht(const void *p1, const void *p2)
1098 {
1099     const struct hitsbytarget *h1 = p1;
1100     const struct hitsbytarget *h2 = p2;
1101     return h2->hits - h1->hits;
1102 }
1103
1104 // Compares two hitsbytarget nodes by hitcount
1105 static int cmp_ht_approx(const void *p1, const void *p2)
1106 {
1107     const struct hitsbytarget *h1 = p1;
1108     const struct hitsbytarget *h2 = p2;
1109     return h2->approximation - h1->approximation;
1110 }
1111
1112 static int targets_termlist_nb(WRBUF wrbuf, struct session *se, int num,
1113                                NMEM nmem, int version)
1114 {
1115     struct hitsbytarget *ht;
1116     int count, i;
1117
1118     ht = hitsbytarget_nb(se, &count, nmem);
1119     if (version >= 2)
1120         qsort(ht, count, sizeof(struct hitsbytarget), cmp_ht_approx);
1121     else
1122         qsort(ht, count, sizeof(struct hitsbytarget), cmp_ht);
1123     for (i = 0; i < count && i < num && ht[i].hits > 0; i++)
1124     {
1125
1126         // do only print terms which have display names
1127
1128         wrbuf_puts(wrbuf, "<term>\n");
1129
1130         wrbuf_puts(wrbuf, "<id>");
1131         wrbuf_xmlputs(wrbuf, ht[i].id);
1132         wrbuf_puts(wrbuf, "</id>\n");
1133
1134         wrbuf_puts(wrbuf, "<name>");
1135         if (!ht[i].name || !ht[i].name[0])
1136             wrbuf_xmlputs(wrbuf, "NO TARGET NAME");
1137         else
1138             wrbuf_xmlputs(wrbuf, ht[i].name);
1139         wrbuf_puts(wrbuf, "</name>\n");
1140
1141         wrbuf_printf(wrbuf, "<frequency>" ODR_INT_PRINTF "</frequency>\n",
1142                      ht[i].hits);
1143
1144         if (version >= 2) {
1145             // Should not print if we know it isn't a approximation.
1146             wrbuf_printf(wrbuf, "<approximation>" ODR_INT_PRINTF "</approximation>\n", ht[i].approximation);
1147             wrbuf_printf(wrbuf, "<records>%d</records>\n", ht[i].records - ht[i].filtered);
1148             wrbuf_printf(wrbuf, "<filtered>%d</filtered>\n", ht[i].filtered);
1149         }
1150
1151         wrbuf_puts(wrbuf, "<state>");
1152         wrbuf_xmlputs(wrbuf, ht[i].state);
1153         wrbuf_puts(wrbuf, "</state>\n");
1154
1155         wrbuf_printf(wrbuf, "<diagnostic>%d</diagnostic>\n",
1156                      ht[i].diagnostic);
1157         wrbuf_puts(wrbuf, "</term>\n");
1158     }
1159     return count;
1160 }
1161
1162 void perform_termlist(struct http_channel *c, struct session *se,
1163                       const char *name, int num, int version)
1164 {
1165     int j;
1166     NMEM nmem_tmp = nmem_create();
1167     char **names;
1168     int num_names = 0;
1169
1170     if (!name)
1171         name = "*";
1172
1173     nmem_strsplit(nmem_tmp, ",", name, &names, &num_names);
1174
1175     session_enter(se, "perform_termlist");
1176
1177     for (j = 0; j < num_names; j++)
1178     {
1179         const char *tname;
1180         int must_generate_empty = 1; /* bug 5350 */
1181
1182         struct named_termlist *t = se->termlists;
1183         for (; t; t = t->next)
1184         {
1185             tname = t->name;
1186             if (!strcmp(names[j], tname) || !strcmp(names[j], "*"))
1187             {
1188                 struct termlist_score **p = 0;
1189                 int len;
1190
1191                 wrbuf_puts(c->wrbuf, "<list name=\"");
1192                 wrbuf_xmlputs(c->wrbuf, tname);
1193                 wrbuf_puts(c->wrbuf, "\">\n");
1194                 must_generate_empty = 0;
1195
1196                 p = termlist_highscore(t->termlist, &len, nmem_tmp);
1197                 if (p)
1198                 {
1199                     int i;
1200                     for (i = 0; i < len && i < num; i++)
1201                     {
1202                         // prevent sending empty term elements
1203                         if (!p[i]->display_term || !p[i]->display_term[0])
1204                             continue;
1205
1206                         wrbuf_puts(c->wrbuf, "<term>");
1207                         wrbuf_puts(c->wrbuf, "<name>");
1208                         wrbuf_xmlputs(c->wrbuf, p[i]->display_term);
1209                         wrbuf_puts(c->wrbuf, "</name>");
1210
1211                         wrbuf_printf(c->wrbuf,
1212                                      "<frequency>%d</frequency>",
1213                                      p[i]->frequency);
1214                         wrbuf_puts(c->wrbuf, "</term>\n");
1215                     }
1216                 }
1217                 wrbuf_puts(c->wrbuf, "</list>\n");
1218             }
1219         }
1220         tname = "xtargets";
1221         if (!strcmp(names[j], tname) || !strcmp(names[j], "*"))
1222         {
1223             wrbuf_puts(c->wrbuf, "<list name=\"");
1224             wrbuf_xmlputs(c->wrbuf, tname);
1225             wrbuf_puts(c->wrbuf, "\">\n");
1226
1227             targets_termlist_nb(c->wrbuf, se, num, c->nmem, version);
1228             wrbuf_puts(c->wrbuf, "</list>\n");
1229             must_generate_empty = 0;
1230         }
1231         if (must_generate_empty)
1232         {
1233             wrbuf_puts(c->wrbuf, "<list name=\"");
1234             wrbuf_xmlputs(c->wrbuf, names[j]);
1235             wrbuf_puts(c->wrbuf, "\"/>\n");
1236         }
1237     }
1238     session_leave(se, "perform_termlist");
1239     nmem_destroy(nmem_tmp);
1240 }
1241
#ifdef MISSING_HEADERS
// Log the NMEM memory figures (in use / free) at YLOG_LOG level.
// Only compiled when MISSING_HEADERS is defined.
void report_nmem_stats(void)
{
    size_t bytes_in_use;
    size_t bytes_free;

    nmem_get_memory_in_use(&bytes_in_use);
    nmem_get_memory_free(&bytes_free);

    yaz_log(YLOG_LOG, "nmem stat: use=%ld free=%ld",
            (long) bytes_in_use, (long) bytes_free);
}
#endif
1254
1255 struct record_cluster *show_single_start(struct session *se, const char *id,
1256                                          struct record_cluster **prev_r,
1257                                          struct record_cluster **next_r)
1258 {
1259     struct record_cluster *r = 0;
1260
1261     session_enter(se, "show_single_start");
1262     *prev_r = 0;
1263     *next_r = 0;
1264     reclist_limit(se->reclist, se, 1);
1265
1266     reclist_enter(se->reclist);
1267     while ((r = reclist_read_record(se->reclist)))
1268     {
1269         if (!strcmp(r->recid, id))
1270         {
1271             *next_r = reclist_read_record(se->reclist);
1272             break;
1273         }
1274         *prev_r = r;
1275     }
1276     reclist_leave(se->reclist);
1277     if (!r)
1278         session_leave(se, "show_single_start");
1279     return r;
1280 }
1281
// Counterpart of a successful show_single_start: calls session_leave for
// the session. 'rec' is accepted for symmetry but not used.
void show_single_stop(struct session *se, struct record_cluster *rec)
{
    (void) rec;
    session_leave(se, "show_single_stop");
}
1286
1287
1288 int session_fetch_more(struct session *se)
1289 {
1290     struct client_list *l;
1291     int ret = 0;
1292
1293     for (l = se->clients_active; l; l = l->next)
1294     {
1295         struct client *cl = l->client;
1296         if (client_get_state(cl) == Client_Idle)
1297         {
1298             if (client_fetch_more(cl))
1299             {
1300                 session_log(se, YLOG_LOG, "%s: more to fetch",
1301                             client_get_id(cl));
1302                 ret = 1;
1303             }
1304             else
1305             {
1306                 int filtered;
1307                 int ingest_failures;
1308                 int record_failures;
1309                 int num = client_get_num_records(
1310                     cl, &filtered, &ingest_failures, &record_failures);
1311
1312                 session_log(se, YLOG_LOG, "%s: hits=" ODR_INT_PRINTF
1313                             " fetched=%d filtered=%d",
1314                             client_get_id(cl),
1315                             client_get_hits(cl),
1316                             num, filtered);
1317                 if (ingest_failures || record_failures)
1318                 {
1319                     session_log(se, YLOG_WARN, "%s:"
1320                                 " ingest failures=%d record failures=%d",
1321                                 client_get_id(cl),
1322                                 ingest_failures, record_failures);
1323                 }
1324             }
1325         }
1326         else
1327         {
1328             session_log(se, YLOG_LOG, "%s: no fetch due to state=%s",
1329                         client_get_id(cl), client_get_state_str(cl));
1330         }
1331
1332     }
1333     return ret;
1334 }
1335
1336 struct record_cluster **show_range_start(struct session *se,
1337                                          struct reclist_sortparms *sp,
1338                                          int start, int *num, int *total,
1339                                          Odr_int *sumhits, Odr_int *approx_hits,
1340                                          void (*show_records_ready)(void *data),
1341                                          struct http_channel *chan)
1342 {
1343     struct record_cluster **recs = 0;
1344     struct reclist_sortparms *spp;
1345     struct client_list *l;
1346     int i;
1347 #if USE_TIMING
1348     yaz_timing_t t = yaz_timing_create();
1349 #endif
1350     session_enter(se, "show_range_start");
1351     *sumhits = 0;
1352     *approx_hits = 0;
1353     *total = 0;
1354     reclist_limit(se->reclist, se, 0);
1355     if (se->relevance)
1356     {
1357         for (spp = sp; spp; spp = spp->next)
1358             if (spp->type == Metadata_type_relevance)
1359             {
1360                 relevance_prepare_read(se->relevance, se->reclist);
1361                 break;
1362             }
1363         for (l = se->clients_active; l; l = l->next) {
1364             *sumhits += client_get_hits(l->client);
1365             *approx_hits += client_get_approximation(l->client);
1366         }
1367     }
1368     reclist_sort(se->reclist, sp);
1369
1370     reclist_enter(se->reclist);
1371     *total = reclist_get_num_records(se->reclist);
1372
1373     for (l = se->clients_active; l; l = l->next)
1374         client_update_show_stat(l->client, 0);
1375
1376     for (i = 0; i < start; i++)
1377     {
1378         struct record_cluster *r = reclist_read_record(se->reclist);
1379         if (!r)
1380         {
1381             *num = 0;
1382             break;
1383         }
1384         else
1385         {
1386             struct record *rec = r->records;
1387             for (;rec; rec = rec->next)
1388                 client_update_show_stat(rec->client, 1);
1389         }
1390     }
1391     recs = nmem_malloc(se->nmem, (*num > 0 ? *num : 1) * sizeof(*recs));
1392     for (i = 0; i < *num; i++)
1393     {
1394         struct record_cluster *r = reclist_read_record(se->reclist);
1395         if (!r)
1396         {
1397             *num = i;
1398             break;
1399         }
1400         else
1401         {
1402             struct record *rec = r->records;
1403             for (;rec; rec = rec->next)
1404                 client_update_show_stat(rec->client, 1);
1405             recs[i] = r;
1406         }
1407     }
1408     reclist_leave(se->reclist);
1409 #if USE_TIMING
1410     yaz_timing_stop(t);
1411     session_log(se, YLOG_LOG, "show %6.5f %3.2f %3.2f",
1412             yaz_timing_get_real(t), yaz_timing_get_user(t),
1413             yaz_timing_get_sys(t));
1414     yaz_timing_destroy(&t);
1415 #endif
1416
1417     if (!session_fetch_more(se))
1418         session_log(se, YLOG_LOG, "can not fetch more");
1419     else
1420     {
1421         show_range_stop(se, recs);
1422         session_log(se, YLOG_LOG, "fetching more in progress");
1423         if (session_set_watch(se, SESSION_WATCH_SHOW,
1424                               show_records_ready, chan, chan))
1425         {
1426             session_log(se, YLOG_WARN, "Ignoring show block");
1427             session_enter(se, "show_range_start");
1428         }
1429         else
1430         {
1431             session_log(se, YLOG_LOG, "session watch OK");
1432             return 0;
1433         }
1434     }
1435     return recs;
1436 }
1437
// Counterpart of show_range_start: calls session_leave for the session.
// 'recs' is accepted for symmetry but not used.
void show_range_stop(struct session *se, struct record_cluster **recs)
{
    (void) recs;
    session_leave(se, "show_range_stop");
}
1442
1443 void statistics(struct session *se, struct statistics *stat)
1444 {
1445     struct client_list *l;
1446     int count = 0;
1447
1448     memset(stat, 0, sizeof(*stat));
1449     stat->num_hits = 0;
1450     for (l = se->clients_active; l; l = l->next)
1451     {
1452         struct client *cl = l->client;
1453         if (!client_get_connection(cl))
1454             stat->num_no_connection++;
1455         stat->num_hits += client_get_hits(cl);
1456         switch (client_get_state(cl))
1457         {
1458         case Client_Connecting: stat->num_connecting++; break;
1459         case Client_Working: stat->num_working++; break;
1460         case Client_Idle: stat->num_idle++; break;
1461         case Client_Failed: stat->num_failed++; break;
1462         case Client_Error: stat->num_error++; break;
1463         default: break;
1464         }
1465         count++;
1466     }
1467     stat->num_records = se->total_records;
1468
1469     stat->num_clients = count;
1470 }
1471
1472 static struct record_metadata *record_metadata_init(
1473     NMEM nmem, const char *value, enum conf_metadata_type type,
1474     struct _xmlAttr *attr)
1475 {
1476     struct record_metadata *rec_md = record_metadata_create(nmem);
1477     struct record_metadata_attr **attrp = &rec_md->attributes;
1478
1479     for (; attr; attr = attr->next)
1480     {
1481         if (attr->children && attr->children->content)
1482         {
1483             if (strcmp((const char *) attr->name, "type")
1484                 && strcmp((const char *) attr->name, "empty"))
1485             {  /* skip the "type" + "empty" attribute..
1486                   The "Type" is already part of the element in output
1487                   (md-%s) and so repeating it here is redundant */
1488                 *attrp = nmem_malloc(nmem, sizeof(**attrp));
1489                 (*attrp)->name =
1490                     nmem_strdup(nmem, (const char *) attr->name);
1491                 (*attrp)->value =
1492                     nmem_strdup(nmem, (const char *) attr->children->content);
1493                 attrp = &(*attrp)->next;
1494             }
1495         }
1496     }
1497     *attrp = 0;
1498
1499     switch (type)
1500     {
1501     case Metadata_type_generic:
1502     case Metadata_type_skiparticle:
1503         if (strstr(value, "://")) /* looks like a URL */
1504             rec_md->data.text.disp = nmem_strdup(nmem, value);
1505         else
1506             rec_md->data.text.disp =
1507                 normalize7bit_generic(nmem_strdup(nmem, value), " ,/.:([");
1508         rec_md->data.text.sort = 0;
1509         rec_md->data.text.snippet = 0;
1510         break;
1511     case Metadata_type_year:
1512     case Metadata_type_date:
1513     {
1514         int first, last;
1515         int longdate = 0;
1516
1517         if (type == Metadata_type_date)
1518             longdate = 1;
1519         if (extract7bit_dates((char *) value, &first, &last, longdate) < 0)
1520             return 0;
1521
1522         rec_md->data.number.min = first;
1523         rec_md->data.number.max = last;
1524     }
1525     break;
1526     case Metadata_type_float:
1527         rec_md->data.fnumber = atof(value);
1528         break;
1529     case Metadata_type_relevance:
1530     case Metadata_type_position:
1531         return 0;
1532     }
1533     return rec_md;
1534 }
1535
1536 static void mergekey_norm_wr(pp2_charset_fact_t charsets,
1537                              WRBUF norm_wr, const char *value)
1538 {
1539     const char *norm_str;
1540     pp2_charset_token_t prt =
1541         pp2_charset_token_create(charsets, "mergekey");
1542
1543     pp2_charset_token_first(prt, value, 0);
1544     while ((norm_str = pp2_charset_token_next(prt)))
1545     {
1546         if (*norm_str)
1547         {
1548             if (wrbuf_len(norm_wr))
1549                 wrbuf_puts(norm_wr, " ");
1550             wrbuf_puts(norm_wr, norm_str);
1551         }
1552     }
1553     pp2_charset_token_destroy(prt);
1554 }
1555
1556 static int get_mergekey_from_doc(xmlDoc *doc, xmlNode *root, const char *name,
1557                                  struct conf_service *service, WRBUF norm_wr)
1558 {
1559     xmlNode *n;
1560     int no_found = 0;
1561     for (n = root->children; n; n = n->next)
1562     {
1563         if (n->type != XML_ELEMENT_NODE)
1564             continue;
1565         if (!strcmp((const char *) n->name, "metadata"))
1566         {
1567             xmlChar *type = xmlGetProp(n, (xmlChar *) "type");
1568             if (type == NULL) {
1569                 yaz_log(YLOG_FATAL, "Missing type attribute on metadata element. Skipping!");
1570             }
1571             else if (!strcmp(name, (const char *) type))
1572             {
1573                 xmlChar *value = xmlNodeListGetString(doc, n->children, 1);
1574                 if (value && *value)
1575                 {
1576                     if (wrbuf_len(norm_wr) > 0)
1577                         wrbuf_puts(norm_wr, " ");
1578                     wrbuf_puts(norm_wr, name);
1579                     mergekey_norm_wr(service->charsets, norm_wr,
1580                                      (const char *) value);
1581                     no_found++;
1582                 }
1583                 if (value)
1584                     xmlFree(value);
1585             }
1586             xmlFree(type);
1587         }
1588     }
1589     return no_found;
1590 }
1591
1592 static const char *get_mergekey(xmlDoc *doc, xmlNode *root, 
1593                                 struct client *cl, int record_no,
1594                                 struct conf_service *service, NMEM nmem,
1595                                 const char *session_mergekey)
1596 {
1597     char *mergekey_norm = 0;
1598     WRBUF norm_wr = wrbuf_alloc();
1599     xmlChar *mergekey;
1600
1601     if (session_mergekey)
1602     {
1603         int i, num = 0;
1604         char **values = 0;
1605         nmem_strsplit_escape2(nmem, ",", session_mergekey, &values,
1606                               &num, 1, '\\', 1);
1607
1608         for (i = 0; i < num; i++)
1609             get_mergekey_from_doc(doc, root, values[i], service, norm_wr);
1610     }
1611     else if ((mergekey = xmlGetProp(root, (xmlChar *) "mergekey")))
1612     {
1613         mergekey_norm_wr(service->charsets, norm_wr, (const char *) mergekey);
1614         xmlFree(mergekey);
1615     }
1616     else
1617     {
1618         /* no mergekey defined in XSL. Look for mergekey metadata instead */
1619         int field_id;
1620         for (field_id = 0; field_id < service->num_metadata; field_id++)
1621         {
1622             struct conf_metadata *ser_md = &service->metadata[field_id];
1623             if (ser_md->mergekey != Metadata_mergekey_no)
1624             {
1625                 int r = get_mergekey_from_doc(doc, root, ser_md->name,
1626                                               service, norm_wr);
1627                 if (r == 0 && ser_md->mergekey == Metadata_mergekey_required)
1628                 {
1629                     /* no mergekey on this one and it is required..
1630                        Generate unique key instead */
1631                     wrbuf_rewind(norm_wr);
1632                     break;
1633                 }
1634             }
1635         }
1636     }
1637
1638     /* generate unique key if none is not generated already or is empty */
1639     if (wrbuf_len(norm_wr) == 0)
1640     {
1641         wrbuf_printf(norm_wr, "position: %s-%d",
1642                      client_get_id(cl), record_no);
1643     }
1644     else
1645     {
1646         const char *lead = "content: ";
1647         wrbuf_insert(norm_wr, 0, lead, strlen(lead));
1648     }
1649     if (wrbuf_len(norm_wr) > 0)
1650         mergekey_norm = nmem_strdup(nmem, wrbuf_cstr(norm_wr));
1651     wrbuf_destroy(norm_wr);
1652     return mergekey_norm;
1653 }
1654
1655 /** \brief see if metadata for pz:recordfilter exists
1656     \param root xml root element of normalized record
1657     \param sdb session database for client
1658     \retval 0 if there is no metadata for pz:recordfilter
1659     \retval 1 if there is metadata for pz:recordfilter
1660
1661     If there is no pz:recordfilter defined, this function returns 1
1662     as well.
1663 */
1664
1665 static int check_record_filter(xmlNode *root, struct session_database *sdb)
1666 {
1667     int match = 0;
1668     xmlNode *n;
1669     const char *s;
1670     s = session_setting_oneval(sdb, PZ_RECORDFILTER);
1671
1672     if (!s || !*s)
1673         return 1;
1674
1675     for (n = root->children; n; n = n->next)
1676     {
1677         if (n->type != XML_ELEMENT_NODE)
1678             continue;
1679         if (!strcmp((const char *) n->name, "metadata"))
1680         {
1681             xmlChar *type = xmlGetProp(n, (xmlChar *) "type");
1682             if (type)
1683             {
1684                 size_t len;
1685                 int substring;
1686                 const char *eq;
1687
1688                 if ((eq = strchr(s, '=')))
1689                     substring = 0;
1690                 else if ((eq = strchr(s, '~')))
1691                     substring = 1;
1692                 if (eq)
1693                     len = eq - s;
1694                 else
1695                     len = strlen(s);
1696                 if (len == strlen((const char *)type) &&
1697                     !memcmp((const char *) type, s, len))
1698                 {
1699                     xmlChar *value = xmlNodeGetContent(n);
1700                     if (value && *value)
1701                     {
1702                         if (!eq ||
1703                             (substring && strstr((const char *) value, eq+1)) ||
1704                             (!substring && !strcmp((const char *) value, eq + 1)))
1705                             match = 1;
1706                     }
1707                     xmlFree(value);
1708                 }
1709                 xmlFree(type);
1710             }
1711         }
1712     }
1713     return match;
1714 }
1715
1716 static int ingest_to_cluster(struct client *cl,
1717                              xmlDoc *xdoc,
1718                              xmlNode *root,
1719                              int record_no,
1720                              struct record_metadata_attr *mergekey);
1721
1722 static int ingest_sub_record(struct client *cl, xmlDoc *xdoc, xmlNode *root,
1723                              int record_no, NMEM nmem,
1724                              struct session_database *sdb,
1725                              struct record_metadata_attr *mergekeys)
1726 {
1727     int ret = 0;
1728     struct session *se = client_get_session(cl);
1729
1730     if (!check_record_filter(root, sdb))
1731     {
1732         session_log(se, YLOG_LOG,
1733                     "Filtered out record no %d from %s",
1734                     record_no, sdb->database->id);
1735         return 0;
1736     }
1737     session_enter(se, "ingest_sub_record");
1738     if (client_get_session(cl) == se && se->relevance)
1739         ret = ingest_to_cluster(cl, xdoc, root, record_no, mergekeys);
1740     session_leave(se, "ingest_sub_record");
1741
1742     return ret;
1743 }
1744
1745 /** \brief ingest XML record
1746     \param cl client holds the result set for record
1747     \param rec record buffer (0 terminated)
1748     \param record_no record position (1, 2, ..)
1749     \param nmem working NMEM
1750     \retval 0 OK
1751     \retval -1 failure
1752     \retval -2 Filtered
1753 */
1754 int ingest_record(struct client *cl, const char *rec,
1755                   int record_no, NMEM nmem)
1756 {
1757     struct session *se = client_get_session(cl);
1758     struct session_database *sdb = client_get_database(cl);
1759     struct conf_service *service = se->service;
1760     xmlDoc *xdoc = normalize_record(se, sdb, service, rec, nmem);
1761     int r = ingest_xml_record(cl, xdoc, record_no, nmem, 0);
1762     client_store_xdoc(cl, record_no, xdoc);
1763     return r;
1764 }
1765
1766 int ingest_xml_record(struct client *cl, xmlDoc *xdoc,
1767                       int record_no, NMEM nmem, int cached_copy)
1768 {
1769     struct session *se = client_get_session(cl);
1770     struct session_database *sdb = client_get_database(cl);
1771     struct conf_service *service = se->service;
1772     xmlNode *root;
1773     int r = 0;
1774     if (!xdoc)
1775         return -1;
1776
1777     if (global_parameters.dump_records)
1778     {
1779         session_log(se, YLOG_LOG, "Normalized record from %s",
1780                     sdb->database->id);
1781         log_xml_doc(xdoc);
1782     }
1783
1784     root = xmlDocGetRootElement(xdoc);
1785
1786     if (!strcmp((const char *) root->name, "cluster"))
1787     {
1788         int no_merge_keys = 0;
1789         int no_merge_dups = 0;
1790         xmlNode *sroot;
1791         struct record_metadata_attr *mk = 0;
1792
1793         for (sroot = root->children; sroot; sroot = sroot->next)
1794             if (sroot->type == XML_ELEMENT_NODE &&
1795                 !strcmp((const char *) sroot->name, "record"))
1796             {
1797                 struct record_metadata_attr **mkp;
1798                 const char *mergekey_norm =
1799                     get_mergekey(xdoc, sroot, cl, record_no, service, nmem,
1800                                  se->mergekey);
1801                 if (!mergekey_norm)
1802                 {
1803                     r = -1;
1804                     break;
1805                 }
1806                 for (mkp = &mk; *mkp; mkp = &(*mkp)->next)
1807                     if (!strcmp((*mkp)->value, mergekey_norm))
1808                         break;
1809                 if (!*mkp)
1810                 {
1811                     *mkp = (struct record_metadata_attr*)
1812                         nmem_malloc(nmem, sizeof(**mkp));
1813                     (*mkp)->name = 0;
1814                     (*mkp)->value = nmem_strdup(nmem, mergekey_norm);
1815                     (*mkp)->next = 0;
1816                     no_merge_keys++;
1817                 }
1818                 else
1819                     no_merge_dups++;
1820             }
1821         if (no_merge_keys > 1 || no_merge_dups > 0)
1822         {
1823             yaz_log(YLOG_LOG, "Got %d mergekeys, %d dups for position %d",
1824                     no_merge_keys, no_merge_dups, record_no);
1825         }
1826         for (sroot = root->children; !r && sroot; sroot = sroot->next)
1827             if (sroot->type == XML_ELEMENT_NODE &&
1828                 !strcmp((const char *) sroot->name, "record"))
1829             {
1830                 if (!cached_copy)
1831                     insert_settings_values(sdb, xdoc, root, service);
1832                 r = ingest_sub_record(cl, xdoc, sroot, record_no, nmem, sdb,
1833                                       mk);
1834             }
1835     }
1836     else if (!strcmp((const char *) root->name, "record"))
1837     {
1838         const char *mergekey_norm =
1839             get_mergekey(xdoc, root, cl, record_no, service, nmem,
1840                          se->mergekey);
1841         if (mergekey_norm)
1842         {
1843             struct record_metadata_attr *mk = (struct record_metadata_attr*)
1844                 nmem_malloc(nmem, sizeof(*mk));
1845             mk->name = 0;
1846             mk->value = nmem_strdup(nmem, mergekey_norm);
1847             mk->next = 0;
1848
1849             if (!cached_copy)
1850                 insert_settings_values(sdb, xdoc, root, service);
1851             r = ingest_sub_record(cl, xdoc, root, record_no, nmem, sdb, mk);
1852         }
1853     }
1854     else
1855     {
1856         session_log(se, YLOG_WARN, "Bad pz root element: %s",
1857                     (const char *) root->name);
1858         r = -1;
1859     }
1860     return r;
1861 }
1862
1863
1864 //    struct conf_metadata *ser_md = &service->metadata[md_field_id];
1865 //    struct record_metadata *rec_md = record->metadata[md_field_id];
1866 static int match_metadata_local(struct conf_service *service,
1867                                 struct conf_metadata *ser_md,
1868                                 struct record_metadata *rec_md0,
1869                                 char **values, int num_v)
1870 {
1871     int i;
1872     struct record_metadata *rec_md = rec_md0;
1873     WRBUF val_wr = 0;
1874     WRBUF text_wr = wrbuf_alloc();
1875     for (i = 0; i < num_v; )
1876     {
1877         if (rec_md)
1878         {
1879             if (ser_md->type == Metadata_type_year
1880                 || ser_md->type == Metadata_type_date)
1881             {
1882                 int y = atoi(values[i]);
1883                 if (y >= rec_md->data.number.min
1884                     && y <= rec_md->data.number.max)
1885                     break;
1886             }
1887             else
1888             {
1889                 if (!val_wr)
1890                 {
1891                     val_wr = wrbuf_alloc();
1892                     mergekey_norm_wr(service->charsets, val_wr, values[i]);
1893                 }
1894                 wrbuf_rewind(text_wr);
1895                 mergekey_norm_wr(service->charsets, text_wr,
1896                                  rec_md->data.text.disp);
1897                 if (!strcmp(wrbuf_cstr(val_wr), wrbuf_cstr(text_wr)))
1898                     break;
1899             }
1900             rec_md = rec_md->next;
1901         }
1902         else
1903         {
1904             rec_md = rec_md0;
1905             wrbuf_destroy(val_wr);
1906             val_wr = 0;
1907             i++;
1908         }
1909     }
1910     wrbuf_destroy(val_wr);
1911     wrbuf_destroy(text_wr);
1912     return i < num_v ? 1 : 0;
1913 }
1914
1915 int session_check_cluster_limit(struct session *se, struct record_cluster *rec)
1916 {
1917     int i;
1918     struct conf_service *service = se->service;
1919     int ret = 1;
1920     const char *name;
1921     const char *value;
1922     NMEM nmem_tmp = nmem_create();
1923
1924     for (i = 0; (name = facet_limits_get(se->facet_limits, i, &value)); i++)
1925     {
1926         int j;
1927         for (j = 0; j < service->num_metadata; j++)
1928         {
1929             struct conf_metadata *md = service->metadata + j;
1930             if (!strcmp(md->name, name) && md->limitcluster)
1931             {
1932                 char **values = 0;
1933                 int num = 0;
1934                 int md_field_id =
1935                     conf_service_metadata_field_id(service,
1936                                                    md->limitcluster);
1937
1938                 if (md_field_id < 0)
1939                 {
1940                     ret = 0;
1941                     break;
1942                 }
1943
1944                 nmem_strsplit_escape2(nmem_tmp, "|", value, &values,
1945                                       &num, 1, '\\', 1);
1946
1947                 if (!match_metadata_local(service,
1948                                           &service->metadata[md_field_id],
1949                                           rec->metadata[md_field_id],
1950                                           values, num))
1951                 {
1952                     ret = 0;
1953                     break;
1954                 }
1955             }
1956         }
1957     }
1958     nmem_destroy(nmem_tmp);
1959     return ret;
1960 }
1961
1962 // Skip record on non-zero
1963 static int check_limit_local(struct client *cl,
1964                              struct record *record,
1965                              int record_no)
1966 {
1967     int skip_record = 0;
1968     struct session *se = client_get_session(cl);
1969     struct conf_service *service = se->service;
1970     NMEM nmem_tmp = nmem_create();
1971     struct session_database *sdb = client_get_database(cl);
1972     int l = 0;
1973     while (!skip_record)
1974     {
1975         int md_field_id;
1976         char **values = 0;
1977         int num_v = 0;
1978         const char *name =
1979             client_get_facet_limit_local(cl, sdb, &l, nmem_tmp,
1980                                          &num_v, &values);
1981         if (!name)
1982             break;
1983
1984         if (!strcmp(name, "*"))
1985         {
1986             for (md_field_id = 0; md_field_id < service->num_metadata;
1987                  md_field_id++)
1988             {
1989                 if (match_metadata_local(
1990                         service,
1991                         &service->metadata[md_field_id],
1992                         record->metadata[md_field_id],
1993                         values, num_v))
1994                     break;
1995             }
1996             if (md_field_id == service->num_metadata)
1997                 skip_record = 1;
1998         }
1999         else
2000         {
2001             md_field_id = conf_service_metadata_field_id(service, name);
2002             if (md_field_id < 0)
2003             {
2004                 skip_record = 1;
2005                 break;
2006             }
2007             if (!match_metadata_local(
2008                     service,
2009                     &service->metadata[md_field_id],
2010                     record->metadata[md_field_id],
2011                     values, num_v))
2012             {
2013                 skip_record = 1;
2014             }
2015         }
2016     }
2017     nmem_destroy(nmem_tmp);
2018     return skip_record;
2019 }
2020
2021 static int ingest_to_cluster(struct client *cl,
2022                              xmlDoc *xdoc,
2023                              xmlNode *root,
2024                              int record_no,
2025                              struct record_metadata_attr *merge_keys)
2026 {
2027     xmlNode *n;
2028     xmlChar *type = 0;
2029     xmlChar *value = 0;
2030     struct session *se = client_get_session(cl);
2031     struct conf_service *service = se->service;
2032     int term_factor = 1;
2033     struct record_cluster *cluster;
2034     struct record_metadata **metadata0;
2035     struct session_database *sdb = client_get_database(cl);
2036     NMEM ingest_nmem = 0;
2037     char **rank_values = 0;
2038     int rank_num = 0;
2039     struct record *record = record_create(se->nmem,
2040                                           service->num_metadata,
2041                                           service->num_sortkeys, cl,
2042                                           record_no);
2043
2044     for (n = root->children; n; n = n->next)
2045     {
2046         if (type)
2047             xmlFree(type);
2048         if (value)
2049             xmlFree(value);
2050         type = value = 0;
2051
2052         if (n->type != XML_ELEMENT_NODE)
2053             continue;
2054         if (!strcmp((const char *) n->name, "metadata"))
2055         {
2056             struct conf_metadata *ser_md = 0;
2057             struct record_metadata **wheretoput = 0;
2058             struct record_metadata *rec_md = 0;
2059             int md_field_id = -1;
2060
2061             type = xmlGetProp(n, (xmlChar *) "type");
2062             value = xmlNodeListGetString(xdoc, n->children, 1);
2063             if (!type)
2064                 continue;
2065             if (!value || !*value)
2066             {
2067                 xmlChar *empty = xmlGetProp(n, (xmlChar *) "empty");
2068                 if (!empty)
2069                     continue;
2070                 if (value)
2071                     xmlFree(value);
2072                 value = empty;
2073             }
2074             md_field_id
2075                 = conf_service_metadata_field_id(service, (const char *) type);
2076             if (md_field_id < 0)
2077             {
2078                 if (se->number_of_warnings_unknown_metadata == 0)
2079                 {
2080                     session_log(se, YLOG_WARN,
2081                             "Ignoring unknown metadata element: %s", type);
2082                 }
2083                 se->number_of_warnings_unknown_metadata++;
2084                 continue;
2085             }
2086
2087             ser_md = &service->metadata[md_field_id];
2088
2089             // non-merged metadata
2090             rec_md = record_metadata_init(se->nmem, (const char *) value,
2091                                           ser_md->type, n->properties);
2092             if (!rec_md)
2093             {
2094                 session_log(se, YLOG_WARN, "bad metadata data '%s' "
2095                             "for element '%s'", value, type);
2096                 continue;
2097             }
2098
2099             if (ser_md->type == Metadata_type_generic)
2100             {
2101                 WRBUF w = wrbuf_alloc();
2102                 if (relevance_snippet(se->relevance,
2103                                       (char*) value, ser_md->name, w))
2104                     rec_md->data.text.snippet = nmem_strdup(se->nmem,
2105                                                             wrbuf_cstr(w));
2106                 wrbuf_destroy(w);
2107             }
2108
2109
2110             wheretoput = &record->metadata[md_field_id];
2111             while (*wheretoput)
2112                 wheretoput = &(*wheretoput)->next;
2113             *wheretoput = rec_md;
2114         }
2115     }
2116
2117     if (check_limit_local(cl, record, record_no))
2118     {
2119         if (type)
2120             xmlFree(type);
2121         if (value)
2122             xmlFree(value);
2123         return -2;
2124     }
2125     cluster = reclist_insert(se->reclist, se->relevance, service, record,
2126                              merge_keys, &se->total_merged);
2127     if (!cluster)
2128     {
2129         if (type)
2130             xmlFree(type);
2131         if (value)
2132             xmlFree(value);
2133         return 0; // complete match with existing record
2134     }
2135
2136     {
2137         const char *use_term_factor_str =
2138             session_setting_oneval(sdb, PZ_TERMLIST_TERM_FACTOR);
2139         if (use_term_factor_str && use_term_factor_str[0] == '1')
2140         {
2141             int maxrecs = client_get_maxrecs(cl);
2142             int hits = (int) client_get_hits(cl);
2143             term_factor = MAX(hits, maxrecs) /  MAX(1, maxrecs);
2144             assert(term_factor >= 1);
2145             session_log(se, YLOG_DEBUG, "Using term factor: %d (%d / %d)",
2146                         term_factor, MAX(hits, maxrecs), MAX(1, maxrecs));
2147         }
2148     }
2149
2150     if (global_parameters.dump_records)
2151         session_log(se, YLOG_LOG, "Cluster id %s from %s (#%d)", cluster->recid,
2152                     sdb->database->id, record_no);
2153
2154     // original metadata, to check if first existence of a field
2155     metadata0 = xmalloc(sizeof(*metadata0) * service->num_metadata);
2156     memcpy(metadata0, cluster->metadata,
2157            sizeof(*metadata0) * service->num_metadata);
2158
2159     ingest_nmem = nmem_create();
2160     if (se->rank)
2161     {
2162         yaz_log(YLOG_LOG, "local in sort : %s", se->rank);
2163         nmem_strsplit_escape2(ingest_nmem, ",", se->rank, &rank_values,
2164                               &rank_num, 1, '\\', 1);
2165     }
2166
2167     // now parsing XML record and adding data to cluster or record metadata
2168     for (n = root->children; n; n = n->next)
2169     {
2170         pp2_charset_token_t prt;
2171         if (type)
2172             xmlFree(type);
2173         if (value)
2174             xmlFree(value);
2175         type = value = 0;
2176
2177         if (n->type != XML_ELEMENT_NODE)
2178             continue;
2179         if (!strcmp((const char *) n->name, "metadata"))
2180         {
2181             struct conf_metadata *ser_md = 0;
2182             struct conf_sortkey *ser_sk = 0;
2183             struct record_metadata **wheretoput = 0;
2184             struct record_metadata *rec_md = 0;
2185             int md_field_id = -1;
2186             int sk_field_id = -1;
2187             const char *rank = 0;
2188             xmlChar *xml_rank = 0;
2189
2190             type = xmlGetProp(n, (xmlChar *) "type");
2191             value = xmlNodeListGetString(xdoc, n->children, 1);
2192
2193             if (!type || !value || !*value)
2194                 continue;
2195
2196             md_field_id
2197                 = conf_service_metadata_field_id(service, (const char *) type);
2198             if (md_field_id < 0)
2199                 continue;
2200
2201             ser_md = &service->metadata[md_field_id];
2202
2203             if (ser_md->sortkey_offset >= 0)
2204             {
2205                 sk_field_id = ser_md->sortkey_offset;
2206                 ser_sk = &service->sortkeys[sk_field_id];
2207             }
2208
2209             // merged metadata
2210             rec_md = record_metadata_init(se->nmem, (const char *) value,
2211                                           ser_md->type, 0);
2212
2213             // see if the field was not in cluster already (from beginning)
2214
2215             if (!rec_md)
2216                 continue;
2217
2218             if (rank_num)
2219             {
2220                 int i;
2221                 for (i = 0; i < rank_num; i++)
2222                 {
2223                     const char *val = rank_values[i];
2224                     const char *cp = strchr(val, '=');
2225                     if (!cp)
2226                         continue;
2227                     if ((cp - val) == strlen((const char *) type)
2228                         && !memcmp(val, type, cp - val))
2229                     {
2230                         rank = cp + 1;
2231                         break;
2232                     }
2233                 }
2234             }
2235             else
2236             {
2237                 xml_rank = xmlGetProp(n, (xmlChar *) "rank");
2238                 rank = xml_rank ? (const char *) xml_rank : ser_md->rank;
2239             }
2240
2241             wheretoput = &cluster->metadata[md_field_id];
2242
2243             if (ser_md->merge == Metadata_merge_first)
2244             {
2245                 if (!metadata0[md_field_id])
2246                 {
2247                     while (*wheretoput)
2248                         wheretoput = &(*wheretoput)->next;
2249                     *wheretoput = rec_md;
2250                 }
2251             }
2252             else if (ser_md->merge == Metadata_merge_unique)
2253             {
2254                 while (*wheretoput)
2255                 {
2256                     if (!strcmp((const char *) (*wheretoput)->data.text.disp,
2257                                 rec_md->data.text.disp))
2258                         break;
2259                     wheretoput = &(*wheretoput)->next;
2260                 }
2261                 if (!*wheretoput)
2262                     *wheretoput = rec_md;
2263             }
2264             else if (ser_md->merge == Metadata_merge_longest)
2265             {
2266                 if (!*wheretoput
2267                     || strlen(rec_md->data.text.disp)
2268                     > strlen((*wheretoput)->data.text.disp))
2269                 {
2270                     *wheretoput = rec_md;
2271                     if (ser_sk)
2272                     {
2273                         const char *sort_str = 0;
2274                         int skip_article =
2275                             ser_sk->type == Metadata_type_skiparticle;
2276
2277                         if (!cluster->sortkeys[sk_field_id])
2278                             cluster->sortkeys[sk_field_id] =
2279                                 nmem_malloc(se->nmem,
2280                                             sizeof(union data_types));
2281
2282                         prt =
2283                             pp2_charset_token_create(service->charsets, "sort");
2284
2285                         pp2_charset_token_first(prt, rec_md->data.text.disp,
2286                                                 skip_article);
2287
2288                         pp2_charset_token_next(prt);
2289
2290                         sort_str = pp2_get_sort(prt);
2291
2292                         cluster->sortkeys[sk_field_id]->text.disp =
2293                             rec_md->data.text.disp;
2294                         if (!sort_str)
2295                         {
2296                             sort_str = rec_md->data.text.disp;
2297                             session_log(se, YLOG_WARN,
2298                                     "Could not make sortkey. Bug #1858");
2299                         }
2300                         cluster->sortkeys[sk_field_id]->text.sort =
2301                             nmem_strdup(se->nmem, sort_str);
2302                         pp2_charset_token_destroy(prt);
2303                     }
2304                 }
2305             }
2306             else if (ser_md->merge == Metadata_merge_all)
2307             {
2308                 while (*wheretoput)
2309                     wheretoput = &(*wheretoput)->next;
2310                 *wheretoput = rec_md;
2311             }
2312             else if (ser_md->merge == Metadata_merge_range)
2313             {
2314                 if (!*wheretoput)
2315                 {
2316                     *wheretoput = rec_md;
2317                     if (ser_sk)
2318                         cluster->sortkeys[sk_field_id]
2319                             = &rec_md->data;
2320                 }
2321                 else
2322                 {
2323                     int this_min = rec_md->data.number.min;
2324                     int this_max = rec_md->data.number.max;
2325                     if (this_min < (*wheretoput)->data.number.min)
2326                         (*wheretoput)->data.number.min = this_min;
2327                     if (this_max > (*wheretoput)->data.number.max)
2328                         (*wheretoput)->data.number.max = this_max;
2329                 }
2330             }
2331
2332             // ranking of _all_ fields enabled ...
2333             if (rank)
2334             {
2335                 relevance_countwords(se->relevance, cluster,
2336                                      (char *) value, rank, ser_md->name);
2337             }
2338             // construct facets ... unless the client already has reported them
2339             if (ser_md->termlist && !client_has_facet(cl, (char *) type))
2340             {
2341                 if (ser_md->type == Metadata_type_year)
2342                 {
2343                     char year[64];
2344                     sprintf(year, "%d", rec_md->data.number.max);
2345
2346                     add_facet(se, (char *) type, year, term_factor);
2347                     if (rec_md->data.number.max != rec_md->data.number.min)
2348                     {
2349                         sprintf(year, "%d", rec_md->data.number.min);
2350                         add_facet(se, (char *) type, year, term_factor);
2351                     }
2352                 }
2353                 else
2354                     add_facet(se, (char *) type, (char *) value, term_factor);
2355             }
2356
2357             // cleaning up
2358             if (xml_rank)
2359                 xmlFree(xml_rank);
2360             xmlFree(type);
2361             xmlFree(value);
2362             type = value = 0;
2363         }
2364         else
2365         {
2366             if (se->number_of_warnings_unknown_elements == 0)
2367                 session_log(se, YLOG_WARN,
2368                         "Unexpected element in internal record: %s", n->name);
2369             se->number_of_warnings_unknown_elements++;
2370         }
2371     }
2372     if (type)
2373         xmlFree(type);
2374     if (value)
2375         xmlFree(value);
2376
2377     nmem_destroy(ingest_nmem);
2378     xfree(metadata0);
2379     relevance_donerecord(se->relevance, cluster);
2380     se->total_records++;
2381
2382     return 0;
2383 }
2384
2385 void session_log(struct session *s, int level, const char *fmt, ...)
2386 {
2387     char buf[1024];
2388     va_list ap;
2389     va_start(ap, fmt);
2390
2391     yaz_vsnprintf(buf, sizeof(buf)-30, fmt, ap);
2392     yaz_log(level, "Session %u: %s", s ? s->session_id : 0, buf);
2393
2394     va_end(ap);
2395 }
2396
2397 /*
2398  * Local variables:
2399  * c-basic-offset: 4
2400  * c-file-style: "Stroustrup"
2401  * indent-tabs-mode: nil
2402  * End:
2403  * vim: shiftwidth=4 tabstop=8 expandtab
2404  */
2405