Out defined DEBUG logging of recieved facet
[pazpar2-moved-to-github.git] / src / session.c
1 /* This file is part of Pazpar2.
2    Copyright (C) 2006-2011 Index Data
3
4 Pazpar2 is free software; you can redistribute it and/or modify it under
5 the terms of the GNU General Public License as published by the Free
6 Software Foundation; either version 2, or (at your option) any later
7 version.
8
9 Pazpar2 is distributed in the hope that it will be useful, but WITHOUT ANY
10 WARRANTY; without even the implied warranty of MERCHANTABILITY or
11 FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
12 for more details.
13
14 You should have received a copy of the GNU General Public License
15 along with this program; if not, write to the Free Software
16 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
17
18 */
19
20 /** \file session.c
21     \brief high-level logic; mostly user sessions and settings
22 */
23
24 #if HAVE_CONFIG_H
25 #include <config.h>
26 #endif
27
28 #include <time.h>
29 #include <stdlib.h>
30 #include <stdio.h>
31 #include <string.h>
32 #if HAVE_SYS_TIME_H
33 #include <sys/time.h>
34 #endif
35 #if HAVE_UNISTD_H
36 #include <unistd.h>
37 #endif
38 #ifdef WIN32
39 #include <windows.h>
40 #endif
41 #include <signal.h>
42 #include <ctype.h>
43 #include <assert.h>
44 #include <math.h>
45
46 #include <yaz/marcdisp.h>
47 #include <yaz/comstack.h>
48 #include <yaz/tcpip.h>
49 #include <yaz/proto.h>
50 #include <yaz/readconf.h>
51 #include <yaz/pquery.h>
52 #include <yaz/otherinfo.h>
53 #include <yaz/yaz-util.h>
54 #include <yaz/nmem.h>
55 #include <yaz/query-charset.h>
56 #include <yaz/querytowrbuf.h>
57 #include <yaz/oid_db.h>
58 #include <yaz/snprintf.h>
59 #include <yaz/gettimeofday.h>
60
61 #define USE_TIMING 0
62 #if USE_TIMING
63 #include <yaz/timing.h>
64 #endif
65
66 #include "ppmutex.h"
67 #include "parameters.h"
68 #include "session.h"
69 #include "eventl.h"
70 #include "http.h"
71 #include "termlists.h"
72 #include "reclists.h"
73 #include "relevance.h"
74 #include "database.h"
75 #include "client.h"
76 #include "settings.h"
77 #include "normalize7bit.h"
78
79 #define TERMLIST_HIGH_SCORE 25
80
81 #define MAX_CHUNK 15
82
83 #define MAX(a,b) ((a)>(b)?(a):(b))
84
85 // Note: Some things in this structure will eventually move to configuration
86 struct parameters global_parameters = 
87 {
88     0,   // dump_records
89     0    // debug_mode
90 };
91
92 struct client_list {
93     struct client *client;
94     struct client_list *next;
95 };
96
97 /* session counting (1) , disable client counting (0) */
98 static YAZ_MUTEX g_session_mutex = 0;
99 static int no_sessions = 0;
100
101 static int session_use(int delta)
102 {
103     int sessions;
104     if (!g_session_mutex)
105         yaz_mutex_create(&g_session_mutex);
106     yaz_mutex_enter(g_session_mutex);
107     no_sessions += delta;
108     sessions = no_sessions;
109     yaz_mutex_leave(g_session_mutex);
110     yaz_log(YLOG_DEBUG, "%s sesions=%d", delta == 0 ? "" : (delta > 0 ? "INC" : "DEC"), no_sessions);
111     return sessions;
112 }
113
114 int sessions_count(void) {
115     return session_use(0);
116 }
117
118 static void log_xml_doc(xmlDoc *doc)
119 {
120     FILE *lf = yaz_log_file();
121     xmlChar *result = 0;
122     int len = 0;
123 #if LIBXML_VERSION >= 20600
124     xmlDocDumpFormatMemory(doc, &result, &len, 1);
125 #else
126     xmlDocDumpMemory(doc, &result, &len);
127 #endif
128     if (lf && len)
129     {
130         (void) fwrite(result, 1, len, lf);
131         fprintf(lf, "\n");
132     }
133     xmlFree(result);
134 }
135
136 static void session_enter(struct session *s)
137 {
138     yaz_mutex_enter(s->session_mutex);
139 }
140
141 static void session_leave(struct session *s)
142 {
143     yaz_mutex_leave(s->session_mutex);
144 }
145
146 // Recursively traverse query structure to extract terms.
147 void pull_terms(NMEM nmem, struct ccl_rpn_node *n, char **termlist, int *num)
148 {
149     char **words;
150     int numwords;
151     int i;
152
153     switch (n->kind)
154     {
155     case CCL_RPN_AND:
156     case CCL_RPN_OR:
157     case CCL_RPN_NOT:
158     case CCL_RPN_PROX:
159         pull_terms(nmem, n->u.p[0], termlist, num);
160         pull_terms(nmem, n->u.p[1], termlist, num);
161         break;
162     case CCL_RPN_TERM:
163         nmem_strsplit(nmem, " ", n->u.t.term, &words, &numwords);
164         for (i = 0; i < numwords; i++)
165             termlist[(*num)++] = words[i];
166         break;
167     default: // NOOP
168         break;
169     }
170 }
171
172
173 void add_facet(struct session *s, const char *type, const char *value, int count)
174 {
175     struct conf_service *service = s->service;
176     pp2_relevance_token_t prt;
177     const char *facet_component;
178     WRBUF facet_wrbuf = wrbuf_alloc();
179     prt = pp2_relevance_tokenize(service->facet_pct);
180     
181     pp2_relevance_first(prt, value, 0);
182     while ((facet_component = pp2_relevance_token_next(prt)))
183     {
184         if (*facet_component)
185         {
186             if (wrbuf_len(facet_wrbuf))
187                 wrbuf_puts(facet_wrbuf, " ");
188             wrbuf_puts(facet_wrbuf, facet_component);
189         }
190     }
191     pp2_relevance_token_destroy(prt);
192     
193     if (wrbuf_len(facet_wrbuf))
194     {
195         int i;
196         for (i = 0; i < s->num_termlists; i++)
197             if (!strcmp(s->termlists[i].name, type))
198                 break;
199         if (i == s->num_termlists)
200         {
201             if (i == SESSION_MAX_TERMLISTS)
202             {
203                 session_log(s, YLOG_FATAL, "Too many termlists");
204                 wrbuf_destroy(facet_wrbuf);
205                 return;
206             }
207             
208             s->termlists[i].name = nmem_strdup(s->nmem, type);
209             s->termlists[i].termlist 
210                 = termlist_create(s->nmem, TERMLIST_HIGH_SCORE);
211             s->num_termlists = i + 1;
212         }
213         
214 #if
215         session_log(s, YLOG_DEBUG, "Facets for %s: %s norm:%s (%d)", type, value, wrbuf_cstr(facet_wrbuf), count);
216 #endif
217         termlist_insert(s->termlists[i].termlist, wrbuf_cstr(facet_wrbuf),
218                         count);
219     }
220     wrbuf_destroy(facet_wrbuf);
221 }
222
223 static xmlDoc *record_to_xml(struct session *se,
224                              struct session_database *sdb, const char *rec)
225 {
226     struct database *db = sdb->database;
227     xmlDoc *rdoc = 0;
228
229     rdoc = xmlParseMemory(rec, strlen(rec));
230
231     if (!rdoc)
232     {
233         session_log(se, YLOG_FATAL, "Non-wellformed XML received from %s",
234                     db->url);
235         return 0;
236     }
237
238     if (global_parameters.dump_records)
239     {
240         session_log(se, YLOG_LOG, "Un-normalized record from %s", db->url);
241         log_xml_doc(rdoc);
242     }
243
244     return rdoc;
245 }
246
247 #define MAX_XSLT_ARGS 16
248
249 // Add static values from session database settings if applicable
250 static void insert_settings_parameters(struct session_database *sdb,
251                                        struct conf_service *service,
252                                        char **parms,
253                                        NMEM nmem)
254 {
255     int i;
256     int nparms = 0;
257     int offset = 0;
258
259     for (i = 0; i < service->num_metadata; i++)
260     {
261         struct conf_metadata *md = &service->metadata[i];
262         int setting;
263
264         if (md->setting == Metadata_setting_parameter &&
265             (setting = settings_lookup_offset(service, md->name)) >= 0)
266         {
267             const char *val = session_setting_oneval(sdb, setting);
268             if (val && nparms < MAX_XSLT_ARGS)
269             {
270                 char *buf;
271                 int len = strlen(val);
272                 buf = nmem_malloc(nmem, len + 3);
273                 buf[0] = '\'';
274                 strcpy(buf + 1, val);
275                 buf[len+1] = '\'';
276                 buf[len+2] = '\0';
277                 parms[offset++] = md->name;
278                 parms[offset++] = buf;
279                 nparms++;
280             }
281         }
282     }
283     parms[offset] = 0;
284 }
285
286 // Add static values from session database settings if applicable
287 static void insert_settings_values(struct session_database *sdb, xmlDoc *doc,
288     struct conf_service *service)
289 {
290     int i;
291
292     for (i = 0; i < service->num_metadata; i++)
293     {
294         struct conf_metadata *md = &service->metadata[i];
295         int offset;
296
297         if (md->setting == Metadata_setting_postproc &&
298             (offset = settings_lookup_offset(service, md->name)) >= 0)
299         {
300             const char *val = session_setting_oneval(sdb, offset);
301             if (val)
302             {
303                 xmlNode *r = xmlDocGetRootElement(doc);
304                 xmlNode *n = xmlNewTextChild(r, 0, (xmlChar *) "metadata",
305                                              (xmlChar *) val);
306                 xmlSetProp(n, (xmlChar *) "type", (xmlChar *) md->name);
307             }
308         }
309     }
310 }
311
312 static xmlDoc *normalize_record(struct session *se,
313                                 struct session_database *sdb,
314                                 struct conf_service *service,
315                                 const char *rec, NMEM nmem)
316 {
317     xmlDoc *rdoc = record_to_xml(se, sdb, rec);
318
319     if (rdoc)
320     {
321         char *parms[MAX_XSLT_ARGS*2+1];
322         
323         insert_settings_parameters(sdb, service, parms, nmem);
324         
325         if (normalize_record_transform(sdb->map, &rdoc, (const char **)parms))
326         {
327             session_log(se, YLOG_WARN, "Normalize failed from %s",
328                         sdb->database->url);
329         }
330         else
331         {
332             insert_settings_values(sdb, rdoc, service);
333             
334             if (global_parameters.dump_records)
335             {
336                 session_log(se, YLOG_LOG, "Normalized record from %s", 
337                             sdb->database->url);
338                 log_xml_doc(rdoc);
339             }
340         }
341     }
342     return rdoc;
343 }
344
345 void session_settings_dump(struct session *se,
346                            struct session_database *db,
347                            WRBUF w)
348 {
349     if (db->settings)
350     {
351         int i, num = db->num_settings;
352         for (i = 0; i < num; i++)
353         {
354             struct setting *s = db->settings[i];
355             for (;s ; s = s->next)
356             {
357                 wrbuf_puts(w, "<set name=\"");
358                 wrbuf_xmlputs(w, s->name);
359                 wrbuf_puts(w, "\" value=\"");
360                 wrbuf_xmlputs(w, s->value);
361                 wrbuf_puts(w, "\"/>");
362             }
363             if (db->settings[i])
364                 wrbuf_puts(w, "\n");
365         }
366     }
367 }
368
369 // Retrieve first defined value for 'name' for given database.
370 // Will be extended to take into account user associated with session
371 const char *session_setting_oneval(struct session_database *db, int offset)
372 {
373     if (offset >= db->num_settings || !db->settings[offset])
374         return "";
375     return db->settings[offset]->value;
376 }
377
378 // Prepare XSLT stylesheets for record normalization
379 // Structures are allocated on the session_wide nmem to avoid having
380 // to recompute this for every search. This would lead
381 // to leaking if a single session was to repeatedly change the PZ_XSLT
382 // setting. However, this is not a realistic use scenario.
383 static int prepare_map(struct session *se, struct session_database *sdb)
384 {
385     const char *s;
386
387     if (!sdb->settings)
388     {
389         session_log(se, YLOG_WARN, "No settings on %s", sdb->database->url);
390         return -1;
391     }
392     if ((s = session_setting_oneval(sdb, PZ_XSLT)))
393     {
394         char auto_stylesheet[256];
395
396         if (!strcmp(s, "auto"))
397         {
398             const char *request_syntax = session_setting_oneval(
399                 sdb, PZ_REQUESTSYNTAX);
400             if (request_syntax)
401             {
402                 char *cp;
403                 yaz_snprintf(auto_stylesheet, sizeof(auto_stylesheet),
404                              "%s.xsl", request_syntax);
405                 for (cp = auto_stylesheet; *cp; cp++)
406                 {
407                     /* deliberately only consider ASCII */
408                     if (*cp > 32 && *cp < 127)
409                         *cp = tolower(*cp);
410                 }
411                 s = auto_stylesheet;
412             }
413             else
414             {
415                 session_log(se, YLOG_WARN,
416                             "No pz:requestsyntax for auto stylesheet");
417             }
418         }
419         sdb->map = normalize_cache_get(se->normalize_cache,
420                                        se->service->server->config, s);
421         if (!sdb->map)
422             return -1;
423     }
424     return 0;
425 }
426
427 // This analyzes settings and recomputes any supporting data structures
428 // if necessary.
429 static int prepare_session_database(struct session *se, 
430                                     struct session_database *sdb)
431 {
432     if (!sdb->settings)
433     {
434         session_log(se, YLOG_WARN, 
435                 "No settings associated with %s", sdb->database->url);
436         return -1;
437     }
438     if (sdb->settings[PZ_XSLT] && !sdb->map)
439     {
440         if (prepare_map(se, sdb) < 0)
441             return -1;
442     }
443     return 0;
444 }
445
446 // called if watch should be removed because http_channel is to be destroyed
447 static void session_watch_cancel(void *data, struct http_channel *c,
448                                  void *data2)
449 {
450     struct session_watchentry *ent = data;
451
452     ent->fun = 0;
453     ent->data = 0;
454     ent->obs = 0;
455 }
456
457 // set watch. Returns 0=OK, -1 if watch is already set
458 int session_set_watch(struct session *s, int what, 
459                       session_watchfun fun, void *data,
460                       struct http_channel *chan)
461 {
462     int ret;
463     session_enter(s);
464     if (s->watchlist[what].fun)
465         ret = -1;
466     else
467     {
468         
469         s->watchlist[what].fun = fun;
470         s->watchlist[what].data = data;
471         s->watchlist[what].obs = http_add_observer(chan, &s->watchlist[what],
472                                                    session_watch_cancel);
473         ret = 0;
474     }
475     session_leave(s);
476     return 0;
477 }
478
479 void session_alert_watch(struct session *s, int what)
480 {
481     assert(s);
482     session_enter(s);
483     if (s->watchlist[what].fun)
484     {
485         /* our watch is no longer associated with http_channel */
486         void *data;
487         session_watchfun fun;
488
489         http_remove_observer(s->watchlist[what].obs);
490         fun  = s->watchlist[what].fun;
491         data = s->watchlist[what].data;
492
493         /* reset watch before fun is invoked - in case fun wants to set
494            it again */
495         s->watchlist[what].fun = 0;
496         s->watchlist[what].data = 0;
497         s->watchlist[what].obs = 0;
498
499         session_leave(s);
500         session_log(s, YLOG_DEBUG,
501                     "Alert Watch: %d calling function: %p", what, fun);
502         fun(data);
503     }
504     else
505         session_leave(s);
506 }
507
508 //callback for grep_databases
509 static void select_targets_callback(void *context, struct session_database *db)
510 {
511     struct session *se = (struct session*) context;
512     struct client *cl = client_create();
513     struct client_list *l;
514     client_set_database(cl, db);
515
516     client_set_session(cl, se);
517
518     l = xmalloc(sizeof(*l));
519     l->client = cl;
520     l->next = se->clients;
521     se->clients = l;
522 }
523
524 static void session_remove_clients(struct session *se)
525 {
526     struct client_list *l;
527
528     session_enter(se);
529     l = se->clients;
530     se->clients = 0;
531     session_leave(se);
532
533     while (l)
534     {
535         struct client_list *l_next = l->next;
536         client_lock(l->client);
537         client_set_session(l->client, 0);
538         client_set_database(l->client, 0);
539         client_unlock(l->client);
540         client_destroy(l->client);
541         xfree(l);
542         l = l_next;
543     }
544 }
545
546 // Associates a set of clients with a session;
547 // Note: Session-databases represent databases with per-session 
548 // setting overrides
549 static int select_targets(struct session *se, const char *filter)
550 {
551     return session_grep_databases(se, filter, select_targets_callback);
552 }
553
554 int session_active_clients(struct session *s)
555 {
556     struct client_list *l;
557     int res = 0;
558
559     for (l = s->clients; l; l = l->next)
560         if (client_is_active(l->client))
561             res++;
562
563     return res;
564 }
565
566 int session_is_preferred_clients_ready(struct session *s)
567 {
568     struct client_list *l;
569     int res = 0;
570
571     for (l = s->clients; l; l = l->next)
572         if (client_is_active_preferred(l->client))
573             res++;
574     session_log(s, YLOG_DEBUG, "Has %d active preferred clients.", res);
575     return res == 0;
576 }
577
578
579
580 enum pazpar2_error_code search(struct session *se,
581                                const char *query,
582                                const char *startrecs, const char *maxrecs,
583                                const char *filter,
584                                const char **addinfo)
585 {
586     int live_channels = 0;
587     int no_working = 0;
588     int no_failed = 0;
589     struct client_list *l;
590     struct timeval tval;
591
592     session_log(se, YLOG_DEBUG, "Search");
593
594     *addinfo = 0;
595
596     session_remove_clients(se);
597     
598     session_enter(se);
599     reclist_destroy(se->reclist);
600     se->reclist = 0;
601     relevance_destroy(&se->relevance);
602     nmem_reset(se->nmem);
603     se->total_records = se->total_hits = se->total_merged = 0;
604     se->num_termlists = 0;
605     live_channels = select_targets(se, filter);
606     if (!live_channels)
607     {
608         session_leave(se);
609         return PAZPAR2_NO_TARGETS;
610     }
611     se->reclist = reclist_create(se->nmem);
612
613     yaz_gettimeofday(&tval);
614     
615     tval.tv_sec += 5;
616
617     for (l = se->clients; l; l = l->next)
618     {
619         struct client *cl = l->client;
620
621         if (maxrecs)
622             client_set_maxrecs(cl, atoi(maxrecs));
623         if (startrecs)
624             client_set_startrecs(cl, atoi(startrecs));
625         if (prepare_session_database(se, client_get_database(cl)) < 0)
626             ;
627         else if (client_parse_query(cl, query) < 0)
628             no_failed++;
629         else
630         {
631             no_working++;
632             if (client_prep_connection(cl, se->service->z3950_operation_timeout,
633                                        se->service->z3950_session_timeout,
634                                        se->service->server->iochan_man,
635                                        &tval))
636                 client_start_search(cl);
637         }
638     }
639     session_leave(se);
640     if (no_working == 0)
641     {
642         if (no_failed > 0)
643         {
644             *addinfo = "query";
645             return PAZPAR2_MALFORMED_PARAMETER_VALUE;
646         }
647         else
648             return PAZPAR2_NO_TARGETS;
649     }
650     return PAZPAR2_NO_ERROR;
651 }
652
653 // Creates a new session_database object for a database
654 static void session_init_databases_fun(void *context, struct database *db)
655 {
656     struct session *se = (struct session *) context;
657     struct session_database *new = nmem_malloc(se->session_nmem, sizeof(*new));
658     int i;
659
660     new->database = db;
661     
662     new->map = 0;
663     assert(db->settings);
664     new->settings = nmem_malloc(se->session_nmem,
665                                 sizeof(struct settings *) * db->num_settings);
666     new->num_settings = db->num_settings;
667     for (i = 0; i < db->num_settings; i++)
668     {
669         struct setting *setting = db->settings[i];
670         new->settings[i] = setting;
671     }
672     new->next = se->databases;
673     se->databases = new;
674 }
675
676 // Doesn't free memory associated with sdb -- nmem takes care of that
677 static void session_database_destroy(struct session_database *sdb)
678 {
679     sdb->map = 0;
680 }
681
682 // Initialize session_database list -- this represents this session's view
683 // of the database list -- subject to modification by the settings ws command
684 void session_init_databases(struct session *se)
685 {
686     se->databases = 0;
687     predef_grep_databases(se, se->service, session_init_databases_fun);
688 }
689
690 // Probably session_init_databases_fun should be refactored instead of
691 // called here.
692 static struct session_database *load_session_database(struct session *se, 
693                                                       char *id)
694 {
695     struct database *db = new_database(id, se->session_nmem);
696
697     resolve_database(se->service, db);
698
699     session_init_databases_fun((void*) se, db);
700
701     // New sdb is head of se->databases list
702     return se->databases;
703 }
704
705 // Find an existing session database. If not found, load it
706 static struct session_database *find_session_database(struct session *se, 
707                                                       char *id)
708 {
709     struct session_database *sdb;
710
711     for (sdb = se->databases; sdb; sdb = sdb->next)
712         if (!strcmp(sdb->database->url, id))
713             return sdb;
714     return load_session_database(se, id);
715 }
716
717 // Apply a session override to a database
718 void session_apply_setting(struct session *se, char *dbname, char *setting,
719                            char *value)
720 {
721     struct session_database *sdb = find_session_database(se, dbname);
722     struct conf_service *service = se->service;
723     struct setting *new = nmem_malloc(se->session_nmem, sizeof(*new));
724     int offset = settings_create_offset(service, setting);
725
726     expand_settings_array(&sdb->settings, &sdb->num_settings, offset,
727                           se->session_nmem);
728     new->precedence = 0;
729     new->target = dbname;
730     new->name = setting;
731     new->value = value;
732     new->next = sdb->settings[offset];
733     sdb->settings[offset] = new;
734
735     // Force later recompute of settings-driven data structures
736     // (happens when a search starts and client connections are prepared)
737     switch (offset)
738     {
739     case PZ_XSLT:
740         if (sdb->map)
741         {
742             sdb->map = 0;
743         }
744         break;
745     }
746 }
747
748 void destroy_session(struct session *se)
749 {
750     struct session_database *sdb;
751     session_log(se, YLOG_DEBUG, "Destroying");
752     session_use(-1);
753     session_remove_clients(se);
754
755     for (sdb = se->databases; sdb; sdb = sdb->next)
756         session_database_destroy(sdb);
757     normalize_cache_destroy(se->normalize_cache);
758     relevance_destroy(&se->relevance);
759     reclist_destroy(se->reclist);
760     nmem_destroy(se->nmem);
761     service_destroy(se->service);
762     yaz_mutex_destroy(&se->session_mutex);
763     wrbuf_destroy(se->wrbuf);
764 }
765
766 size_t session_get_memory_status(struct session *session) {
767     size_t session_nmem;
768     if (session == 0)
769         return 0;
770     session_enter(session);
771     session_nmem = nmem_total(session->nmem);
772     session_leave(session);
773     return session_nmem;
774 }
775
776
777 struct session *new_session(NMEM nmem, struct conf_service *service,
778                             unsigned session_id)
779 {
780     int i;
781     struct session *session = nmem_malloc(nmem, sizeof(*session));
782
783     char tmp_str[50];
784
785     sprintf(tmp_str, "session#%u", session_id);
786
787     session->session_id = session_id;
788     session_log(session, YLOG_DEBUG, "New");
789     session->service = service;
790     session->relevance = 0;
791     session->total_hits = 0;
792     session->total_records = 0;
793     session->number_of_warnings_unknown_elements = 0;
794     session->number_of_warnings_unknown_metadata = 0;
795     session->num_termlists = 0;
796     session->reclist = 0;
797     session->clients = 0;
798     session->session_nmem = nmem;
799     session->nmem = nmem_create();
800     session->wrbuf = wrbuf_alloc();
801     session->databases = 0;
802     for (i = 0; i <= SESSION_WATCH_MAX; i++)
803     {
804         session->watchlist[i].data = 0;
805         session->watchlist[i].fun = 0;
806     }
807     session->normalize_cache = normalize_cache_create();
808     session->session_mutex = 0;
809     pazpar2_mutex_create(&session->session_mutex, tmp_str);
810     session_use(1);
811     return session;
812 }
813
814 struct hitsbytarget *hitsbytarget(struct session *se, int *count, NMEM nmem)
815 {
816     struct hitsbytarget *res = 0;
817     struct client_list *l;
818     size_t sz = 0;
819
820     session_enter(se);
821     for (l = se->clients; l; l = l->next)
822         sz++;
823
824     res = nmem_malloc(nmem, sizeof(*res) * sz);
825     *count = 0;
826     for (l = se->clients; l; l = l->next)
827     {
828         struct client *cl = l->client;
829         WRBUF w = wrbuf_alloc();
830         const char *name = session_setting_oneval(client_get_database(cl),
831                                                   PZ_NAME);
832
833         res[*count].id = client_get_database(cl)->database->url;
834         res[*count].name = *name ? name : "Unknown";
835         res[*count].hits = client_get_hits(cl);
836         res[*count].records = client_get_num_records(cl);
837         res[*count].diagnostic = client_get_diagnostic(cl);
838         res[*count].state = client_get_state_str(cl);
839         res[*count].connected  = client_get_connection(cl) ? 1 : 0;
840         session_settings_dump(se, client_get_database(cl), w);
841         res[*count].settings_xml = w;
842         (*count)++;
843     }
844     session_leave(se);
845     return res;
846 }
847
848 struct termlist_score **termlist(struct session *se, const char *name, int *num)
849 {
850     int i;
851     struct termlist_score **tl = 0;
852
853     session_enter(se);
854     for (i = 0; i < se->num_termlists; i++)
855         if (!strcmp((const char *) se->termlists[i].name, name))
856         {
857             tl = termlist_highscore(se->termlists[i].termlist, num);
858             break;
859         }
860     session_leave(se);
861     return tl;
862 }
863
864 #ifdef MISSING_HEADERS
865 void report_nmem_stats(void)
866 {
867     size_t in_use, is_free;
868
869     nmem_get_memory_in_use(&in_use);
870     nmem_get_memory_free(&is_free);
871
872     yaz_log(YLOG_LOG, "nmem stat: use=%ld free=%ld", 
873             (long) in_use, (long) is_free);
874 }
875 #endif
876
877 struct record_cluster *show_single_start(struct session *se, const char *id,
878                                          struct record_cluster **prev_r,
879                                          struct record_cluster **next_r)
880 {
881     struct record_cluster *r = 0;
882
883     session_enter(se);
884     *prev_r = 0;
885     *next_r = 0;
886     if (se->reclist)
887     {
888         reclist_enter(se->reclist);
889         while ((r = reclist_read_record(se->reclist)))
890         {
891             if (!strcmp(r->recid, id))
892             {
893                 *next_r = reclist_read_record(se->reclist);
894                 break;
895             }
896             *prev_r = r;
897         }
898         reclist_leave(se->reclist);
899     }
900     if (!r)
901         session_leave(se);
902     return r;
903 }
904
905 void show_single_stop(struct session *se, struct record_cluster *rec)
906 {
907     session_leave(se);
908 }
909
910 struct record_cluster **show_range_start(struct session *se,
911                                          struct reclist_sortparms *sp, 
912                                          int start, int *num, int *total, Odr_int *sumhits)
913 {
914     struct record_cluster **recs;
915     struct reclist_sortparms *spp;
916     int i;
917 #if USE_TIMING    
918     yaz_timing_t t = yaz_timing_create();
919 #endif
920     session_enter(se);
921     recs = nmem_malloc(se->nmem, *num * sizeof(struct record_cluster *));
922     if (!se->relevance)
923     {
924         *num = 0;
925         *total = 0;
926         *sumhits = 0;
927         recs = 0;
928     }
929     else
930     {
931         for (spp = sp; spp; spp = spp->next)
932             if (spp->type == Metadata_sortkey_relevance)
933             {
934                 relevance_prepare_read(se->relevance, se->reclist);
935                 break;
936             }
937         reclist_sort(se->reclist, sp);
938         
939         reclist_enter(se->reclist);
940         *total = reclist_get_num_records(se->reclist);
941         *sumhits = se->total_hits;
942         
943         for (i = 0; i < start; i++)
944             if (!reclist_read_record(se->reclist))
945             {
946                 *num = 0;
947                 recs = 0;
948                 break;
949             }
950         
951         for (i = 0; i < *num; i++)
952         {
953             struct record_cluster *r = reclist_read_record(se->reclist);
954             if (!r)
955             {
956                 *num = i;
957                 break;
958             }
959             recs[i] = r;
960         }
961         reclist_leave(se->reclist);
962     }
963 #if USE_TIMING
964     yaz_timing_stop(t);
965     yaz_log(YLOG_LOG, "show %6.5f %3.2f %3.2f", 
966             yaz_timing_get_real(t), yaz_timing_get_user(t),
967             yaz_timing_get_sys(t));
968     yaz_timing_destroy(&t);
969 #endif
970     return recs;
971 }
972
973 void show_range_stop(struct session *se, struct record_cluster **recs)
974 {
975     session_leave(se);
976 }
977
978 void statistics(struct session *se, struct statistics *stat)
979 {
980     struct client_list *l;
981     int count = 0;
982
983     memset(stat, 0, sizeof(*stat));
984     for (l = se->clients; l; l = l->next)
985     {
986         struct client *cl = l->client;
987         if (!client_get_connection(cl))
988             stat->num_no_connection++;
989         switch (client_get_state(cl))
990         {
991         case Client_Connecting: stat->num_connecting++; break;
992         case Client_Working: stat->num_working++; break;
993         case Client_Idle: stat->num_idle++; break;
994         case Client_Failed: stat->num_failed++; break;
995         case Client_Error: stat->num_error++; break;
996         default: break;
997         }
998         count++;
999     }
1000     stat->num_hits = se->total_hits;
1001     stat->num_records = se->total_records;
1002
1003     stat->num_clients = count;
1004 }
1005
1006 static struct record_metadata *record_metadata_init(
1007     NMEM nmem, const char *value, enum conf_metadata_type type,
1008     struct _xmlAttr *attr)
1009 {
1010     struct record_metadata *rec_md = record_metadata_create(nmem);
1011     struct record_metadata_attr **attrp = &rec_md->attributes;
1012     
1013     for (; attr; attr = attr->next)
1014     {
1015         if (attr->children && attr->children->content)
1016         {
1017             if (strcmp((const char *) attr->name, "type"))
1018             {  /* skip the "type" attribute.. Its value is already part of
1019                   the element in output (md-%s) and so repeating it here
1020                   is redundant */
1021                 *attrp = nmem_malloc(nmem, sizeof(**attrp));
1022                 (*attrp)->name =
1023                     nmem_strdup(nmem, (const char *) attr->name);
1024                 (*attrp)->value =
1025                     nmem_strdup(nmem, (const char *) attr->children->content);
1026                 attrp = &(*attrp)->next;
1027             }
1028         }
1029     }
1030     *attrp = 0;
1031
1032     if (type == Metadata_type_generic)
1033     {
1034         char *p = nmem_strdup(nmem, value);
1035
1036         p = normalize7bit_generic(p, " ,/.:([");
1037         
1038         rec_md->data.text.disp = p;
1039         rec_md->data.text.sort = 0;
1040     }
1041     else if (type == Metadata_type_year || type == Metadata_type_date)
1042     {
1043         int first, last;
1044         int longdate = 0;
1045
1046         if (type == Metadata_type_date)
1047             longdate = 1;
1048         if (extract7bit_dates((char *) value, &first, &last, longdate) < 0)
1049             return 0;
1050
1051         rec_md->data.number.min = first;
1052         rec_md->data.number.max = last;
1053     }
1054     else
1055         return 0;
1056     return rec_md;
1057 }
1058
1059 static int get_mergekey_from_doc(xmlDoc *doc, xmlNode *root, const char *name,
1060                                  struct conf_service *service, WRBUF norm_wr)
1061 {
1062     xmlNode *n;
1063     int no_found = 0;
1064     for (n = root->children; n; n = n->next)
1065     {
1066         if (n->type != XML_ELEMENT_NODE)
1067             continue;
1068         if (!strcmp((const char *) n->name, "metadata"))
1069         {
1070             xmlChar *type = xmlGetProp(n, (xmlChar *) "type");
1071             if (type == NULL) {
1072                 yaz_log(YLOG_FATAL, "Missing type attribute on metadata element. Skipping!");
1073             }
1074             else if (!strcmp(name, (const char *) type))
1075             {
1076                 xmlChar *value = xmlNodeListGetString(doc, n->children, 1);
1077                 if (value)
1078                 {
1079                     const char *norm_str;
1080                     pp2_relevance_token_t prt =
1081                         pp2_relevance_tokenize(service->mergekey_pct);
1082                     
1083                     pp2_relevance_first(prt, (const char *) value, 0);
1084                     if (wrbuf_len(norm_wr) > 0)
1085                         wrbuf_puts(norm_wr, " ");
1086                     wrbuf_puts(norm_wr, name);
1087                     while ((norm_str =
1088                             pp2_relevance_token_next(prt)))
1089                     {
1090                         if (*norm_str)
1091                         {
1092                             wrbuf_puts(norm_wr, " ");
1093                             wrbuf_puts(norm_wr, norm_str);
1094                         }
1095                     }
1096                     xmlFree(value);
1097                     pp2_relevance_token_destroy(prt);
1098                     no_found++;
1099                 }
1100             }
1101             xmlFree(type);
1102         }
1103     }
1104     return no_found;
1105 }
1106
1107 static const char *get_mergekey(xmlDoc *doc, struct client *cl, int record_no,
1108                                 struct conf_service *service, NMEM nmem)
1109 {
1110     char *mergekey_norm = 0;
1111     xmlNode *root = xmlDocGetRootElement(doc);
1112     WRBUF norm_wr = wrbuf_alloc();
1113
1114     /* consider mergekey from XSL first */
1115     xmlChar *mergekey = xmlGetProp(root, (xmlChar *) "mergekey");
1116     if (mergekey)
1117     {
1118         const char *norm_str;
1119         pp2_relevance_token_t prt =
1120             pp2_relevance_tokenize(service->mergekey_pct);
1121
1122         pp2_relevance_first(prt, (const char *) mergekey, 0);
1123         while ((norm_str = pp2_relevance_token_next(prt)))
1124         {
1125             if (*norm_str)
1126             {
1127                 if (wrbuf_len(norm_wr))
1128                     wrbuf_puts(norm_wr, " ");
1129                 wrbuf_puts(norm_wr, norm_str);
1130             }
1131         }
1132         pp2_relevance_token_destroy(prt);
1133         xmlFree(mergekey);
1134     }
1135     else
1136     {
1137         /* no mergekey defined in XSL. Look for mergekey metadata instead */
1138         int field_id;
1139         for (field_id = 0; field_id < service->num_metadata; field_id++)
1140         {
1141             struct conf_metadata *ser_md = &service->metadata[field_id];
1142             if (ser_md->mergekey != Metadata_mergekey_no)
1143             {
1144                 int r = get_mergekey_from_doc(doc, root, ser_md->name,
1145                                               service, norm_wr);
1146                 if (r == 0 && ser_md->mergekey == Metadata_mergekey_required)
1147                 {
1148                     /* no mergekey on this one and it is required.. 
1149                        Generate unique key instead */
1150                     wrbuf_rewind(norm_wr);
1151                     break;
1152                 }
1153             }
1154         }
1155     }
1156
1157     /* generate unique key if none is not generated already or is empty */
1158     if (wrbuf_len(norm_wr) == 0)
1159     {
1160         wrbuf_printf(norm_wr, "%s-%d",
1161                      client_get_database(cl)->database->url, record_no);
1162     }
1163     if (wrbuf_len(norm_wr) > 0)
1164         mergekey_norm = nmem_strdup(nmem, wrbuf_cstr(norm_wr));
1165     wrbuf_destroy(norm_wr);
1166     return mergekey_norm;
1167 }
1168
1169 /** \brief see if metadata for pz:recordfilter exists 
1170     \param root xml root element of normalized record
1171     \param sdb session database for client
1172     \retval 0 if there is no metadata for pz:recordfilter
1173     \retval 1 if there is metadata for pz:recordfilter
1174
1175     If there is no pz:recordfilter defined, this function returns 1
1176     as well.
1177 */
1178     
1179 static int check_record_filter(xmlNode *root, struct session_database *sdb)
1180 {
1181     int match = 0;
1182     xmlNode *n;
1183     const char *s;
1184     s = session_setting_oneval(sdb, PZ_RECORDFILTER);
1185
1186     if (!s || !*s)
1187         return 1;
1188
1189     for (n = root->children; n; n = n->next)
1190     {
1191         if (n->type != XML_ELEMENT_NODE)
1192             continue;
1193         if (!strcmp((const char *) n->name, "metadata"))
1194         {
1195             xmlChar *type = xmlGetProp(n, (xmlChar *) "type");
1196             if (type)
1197             {
1198                 size_t len;
1199                 int substring;
1200                 const char *eq;
1201
1202                 if ((eq = strchr(s, '=')))
1203                     substring = 0;
1204                 else if ((eq = strchr(s, '~')))
1205                     substring = 1;
1206                 if (eq)
1207                     len = eq - s;
1208                 else
1209                     len = strlen(s);
1210                 if (len == strlen((const char *)type) &&
1211                     !memcmp((const char *) type, s, len))
1212                 {
1213                     xmlChar *value = xmlNodeGetContent(n);
1214                     if (value && *value)
1215                     {
1216                         if (!eq ||
1217                             (substring && strstr((const char *) value, eq+1)) ||
1218                             (!substring && !strcmp((const char *) value, eq + 1)))
1219                             match = 1;
1220                     }
1221                     xmlFree(value);
1222                 }
1223                 xmlFree(type);
1224             }
1225         }
1226     }
1227     return match;
1228 }
1229
1230
1231 static int ingest_to_cluster(struct client *cl,
1232                              xmlDoc *xdoc,
1233                              xmlNode *root,
1234                              int record_no,
1235                              const char *mergekey_norm);
1236
1237 /** \brief ingest XML record
1238     \param cl client holds the result set for record
1239     \param rec record buffer (0 terminated)
1240     \param record_no record position (1, 2, ..)
1241     \param nmem working NMEM
1242     \retval 0 OK
1243     \retval -1 failure
1244 */
1245 int ingest_record(struct client *cl, const char *rec,
1246                   int record_no, NMEM nmem)
1247 {
1248     struct session *se = client_get_session(cl);
1249     int ret = 0;
1250     struct session_database *sdb = client_get_database(cl);
1251     struct conf_service *service = se->service;
1252     xmlDoc *xdoc = normalize_record(se, sdb, service, rec, nmem);
1253     xmlNode *root;
1254     const char *mergekey_norm;
1255     
1256     if (!xdoc)
1257         return -1;
1258     
1259     root = xmlDocGetRootElement(xdoc);
1260     
1261     if (!check_record_filter(root, sdb))
1262     {
1263         session_log(se, YLOG_WARN, "Filtered out record no %d from %s",
1264                     record_no, sdb->database->url);
1265         xmlFreeDoc(xdoc);
1266         return -1;
1267     }
1268     
1269     mergekey_norm = get_mergekey(xdoc, cl, record_no, service, nmem);
1270     if (!mergekey_norm)
1271     {
1272         session_log(se, YLOG_WARN, "Got no mergekey");
1273         xmlFreeDoc(xdoc);
1274         return -1;
1275     }
1276     session_enter(se);
1277     if (client_get_session(cl) == se)
1278         ret = ingest_to_cluster(cl, xdoc, root, record_no, mergekey_norm);
1279     session_leave(se);
1280     
1281     xmlFreeDoc(xdoc);
1282     return ret;
1283 }
1284
1285 static int ingest_to_cluster(struct client *cl,
1286                              xmlDoc *xdoc,
1287                              xmlNode *root,
1288                              int record_no,
1289                              const char *mergekey_norm)
1290 {
1291     xmlNode *n;
1292     xmlChar *type = 0;
1293     xmlChar *value = 0;
1294     struct session_database *sdb = client_get_database(cl);
1295     struct session *se = client_get_session(cl);
1296     struct conf_service *service = se->service;
1297     struct record *record = record_create(se->nmem, 
1298                                           service->num_metadata,
1299                                           service->num_sortkeys, cl,
1300                                           record_no);
1301     struct record_cluster *cluster = reclist_insert(se->reclist,
1302                                                     service, 
1303                                                     record,
1304                                                     mergekey_norm,
1305                                                     &se->total_merged);
1306
1307     const char *use_term_factor_str = session_setting_oneval(sdb, PZ_TERMLIST_TERM_FACTOR);
1308     int use_term_factor = 0;
1309     int term_factor = 1; 
1310     if (use_term_factor_str && use_term_factor_str[0] != 0)
1311        use_term_factor =  atoi(use_term_factor_str);
1312     if (use_term_factor) {
1313         int maxrecs = client_get_maxrecs(cl);
1314         int hits = (int) client_get_hits(cl);
1315         term_factor = MAX(hits, maxrecs) /  MAX(1, maxrecs);
1316         assert(term_factor >= 1);
1317         yaz_log(YLOG_DEBUG, "Using term factor: %d (%d / %d)", term_factor, MAX(hits, maxrecs), MAX(1, maxrecs));
1318     }
1319
1320     if (!cluster)
1321         return -1;
1322     if (global_parameters.dump_records)
1323         session_log(se, YLOG_LOG, "Cluster id %s from %s (#%d)", cluster->recid,
1324                     sdb->database->url, record_no);
1325     relevance_newrec(se->relevance, cluster);
1326     
1327     // now parsing XML record and adding data to cluster or record metadata
1328     for (n = root->children; n; n = n->next)
1329     {
1330         pp2_relevance_token_t prt;
1331         if (type)
1332             xmlFree(type);
1333         if (value)
1334             xmlFree(value);
1335         type = value = 0;
1336         
1337         if (n->type != XML_ELEMENT_NODE)
1338             continue;
1339         if (!strcmp((const char *) n->name, "metadata"))
1340         {
1341             struct conf_metadata *ser_md = 0;
1342             struct conf_sortkey *ser_sk = 0;
1343             struct record_metadata **wheretoput = 0;
1344             struct record_metadata *rec_md = 0;
1345             int md_field_id = -1;
1346             int sk_field_id = -1;
1347             
1348             type = xmlGetProp(n, (xmlChar *) "type");
1349             value = xmlNodeListGetString(xdoc, n->children, 1);
1350             
1351             if (!type || !value || !*value)
1352                 continue;
1353             
1354             md_field_id 
1355                 = conf_service_metadata_field_id(service, (const char *) type);
1356             if (md_field_id < 0)
1357             {
1358                 if (se->number_of_warnings_unknown_metadata == 0)
1359                 {
1360                     session_log(se, YLOG_WARN, 
1361                             "Ignoring unknown metadata element: %s", type);
1362                 }
1363                 se->number_of_warnings_unknown_metadata++;
1364                 continue;
1365             }
1366             
1367             ser_md = &service->metadata[md_field_id];
1368             
1369             if (ser_md->sortkey_offset >= 0){
1370                 sk_field_id = ser_md->sortkey_offset;
1371                 ser_sk = &service->sortkeys[sk_field_id];
1372             }
1373
1374             // non-merged metadata
1375             rec_md = record_metadata_init(se->nmem, (const char *) value,
1376                                           ser_md->type, n->properties);
1377             if (!rec_md)
1378             {
1379                 session_log(se, YLOG_WARN, "bad metadata data '%s' "
1380                             "for element '%s'", value, type);
1381                 continue;
1382             }
1383             wheretoput = &record->metadata[md_field_id];
1384             while (*wheretoput)
1385                 wheretoput = &(*wheretoput)->next;
1386             *wheretoput = rec_md;
1387
1388             // merged metadata
1389             rec_md = record_metadata_init(se->nmem, (const char *) value,
1390                                           ser_md->type, 0);
1391             wheretoput = &cluster->metadata[md_field_id];
1392
1393             // and polulate with data:
1394             // assign cluster or record based on merge action
1395             if (ser_md->merge == Metadata_merge_unique)
1396             {
1397                 struct record_metadata *mnode;
1398                 for (mnode = *wheretoput; mnode; mnode = mnode->next)
1399                     if (!strcmp((const char *) mnode->data.text.disp, 
1400                                 rec_md->data.text.disp))
1401                         break;
1402                 if (!mnode)
1403                 {
1404                     rec_md->next = *wheretoput;
1405                     *wheretoput = rec_md;
1406                 }
1407             }
1408             else if (ser_md->merge == Metadata_merge_longest)
1409             {
1410                 if (!*wheretoput 
1411                     || strlen(rec_md->data.text.disp) 
1412                     > strlen((*wheretoput)->data.text.disp))
1413                 {
1414                     *wheretoput = rec_md;
1415                     if (ser_sk)
1416                     {
1417                         const char *sort_str = 0;
1418                         int skip_article = 
1419                             ser_sk->type == Metadata_sortkey_skiparticle;
1420
1421                         if (!cluster->sortkeys[sk_field_id])
1422                             cluster->sortkeys[sk_field_id] = 
1423                                 nmem_malloc(se->nmem, 
1424                                             sizeof(union data_types));
1425                          
1426                         prt = pp2_relevance_tokenize(service->sort_pct);
1427
1428                         pp2_relevance_first(prt, rec_md->data.text.disp,
1429                                             skip_article);
1430
1431                         pp2_relevance_token_next(prt);
1432                          
1433                         sort_str = pp2_get_sort(prt);
1434                          
1435                         cluster->sortkeys[sk_field_id]->text.disp = 
1436                             rec_md->data.text.disp;
1437                         if (!sort_str)
1438                         {
1439                             sort_str = rec_md->data.text.disp;
1440                             session_log(se, YLOG_WARN, 
1441                                     "Could not make sortkey. Bug #1858");
1442                         }
1443                         cluster->sortkeys[sk_field_id]->text.sort = 
1444                             nmem_strdup(se->nmem, sort_str);
1445                         pp2_relevance_token_destroy(prt);
1446                     }
1447                 }
1448             }
1449             else if (ser_md->merge == Metadata_merge_all)
1450             {
1451                 rec_md->next = *wheretoput;
1452                 *wheretoput = rec_md;
1453             }
1454             else if (ser_md->merge == Metadata_merge_range)
1455             {
1456                 if (!*wheretoput)
1457                 {
1458                     *wheretoput = rec_md;
1459                     if (ser_sk)
1460                         cluster->sortkeys[sk_field_id] 
1461                             = &rec_md->data;
1462                 }
1463                 else
1464                 {
1465                     int this_min = rec_md->data.number.min;
1466                     int this_max = rec_md->data.number.max;
1467                     if (this_min < (*wheretoput)->data.number.min)
1468                         (*wheretoput)->data.number.min = this_min;
1469                     if (this_max > (*wheretoput)->data.number.max)
1470                         (*wheretoput)->data.number.max = this_max;
1471                 }
1472             }
1473
1474
1475             // ranking of _all_ fields enabled ... 
1476             if (ser_md->rank)
1477                 relevance_countwords(se->relevance, cluster, 
1478                                      (char *) value, ser_md->rank,
1479                                      ser_md->name);
1480
1481             // construct facets ... unless the client already has reported them
1482             if (ser_md->termlist && !client_has_facet(cl, (char *) type))
1483             {
1484
1485                 if (ser_md->type == Metadata_type_year)
1486                 {
1487                     char year[64];
1488                     sprintf(year, "%d", rec_md->data.number.max);
1489
1490                     add_facet(se, (char *) type, year, term_factor);
1491                     if (rec_md->data.number.max != rec_md->data.number.min)
1492                     {
1493                         sprintf(year, "%d", rec_md->data.number.min);
1494                         add_facet(se, (char *) type, year, term_factor);
1495                     }
1496                 }
1497                 else
1498                     add_facet(se, (char *) type, (char *) value, term_factor);
1499             }
1500
1501             // cleaning up
1502             xmlFree(type);
1503             xmlFree(value);
1504             type = value = 0;
1505         }
1506         else
1507         {
1508             if (se->number_of_warnings_unknown_elements == 0)
1509                 session_log(se, YLOG_WARN,
1510                         "Unexpected element in internal record: %s", n->name);
1511             se->number_of_warnings_unknown_elements++;
1512         }
1513     }
1514     if (type)
1515         xmlFree(type);
1516     if (value)
1517         xmlFree(value);
1518
1519     relevance_donerecord(se->relevance, cluster);
1520     se->total_records++;
1521
1522     return 0;
1523 }
1524
1525 void session_log(struct session *s, int level, const char *fmt, ...)
1526 {
1527     char buf[1024];
1528     va_list ap;
1529     va_start(ap, fmt);
1530
1531     yaz_vsnprintf(buf, sizeof(buf)-30, fmt, ap);
1532     yaz_log(level, "Session (%u): %s", s->session_id, buf);
1533
1534     va_end(ap);
1535 }
1536
1537 /*
1538  * Local variables:
1539  * c-basic-offset: 4
1540  * c-file-style: "Stroustrup"
1541  * indent-tabs-mode: nil
1542  * End:
1543  * vim: shiftwidth=4 tabstop=8 expandtab
1544  */
1545