Handling growing free http buffers and channels
[pazpar2-moved-to-github.git] / src / session.c
1 /* This file is part of Pazpar2.
2    Copyright (C) 2006-2011 Index Data
3
4 Pazpar2 is free software; you can redistribute it and/or modify it under
5 the terms of the GNU General Public License as published by the Free
6 Software Foundation; either version 2, or (at your option) any later
7 version.
8
9 Pazpar2 is distributed in the hope that it will be useful, but WITHOUT ANY
10 WARRANTY; without even the implied warranty of MERCHANTABILITY or
11 FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
12 for more details.
13
14 You should have received a copy of the GNU General Public License
15 along with this program; if not, write to the Free Software
16 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
17
18 */
19
20 /** \file session.c
21     \brief high-level logic; mostly user sessions and settings
22 */
23
24 #if HAVE_CONFIG_H
25 #include <config.h>
26 #endif
27
28 #include <time.h>
29 #include <stdlib.h>
30 #include <stdio.h>
31 #include <string.h>
32 #if HAVE_SYS_TIME_H
33 #include <sys/time.h>
34 #endif
35 #if HAVE_UNISTD_H
36 #include <unistd.h>
37 #endif
38 #ifdef WIN32
39 #include <windows.h>
40 #endif
41 #include <signal.h>
42 #include <ctype.h>
43 #include <assert.h>
44 #include <math.h>
45
46 #include <yaz/marcdisp.h>
47 #include <yaz/comstack.h>
48 #include <yaz/tcpip.h>
49 #include <yaz/proto.h>
50 #include <yaz/readconf.h>
51 #include <yaz/pquery.h>
52 #include <yaz/otherinfo.h>
53 #include <yaz/yaz-util.h>
54 #include <yaz/nmem.h>
55 #include <yaz/query-charset.h>
56 #include <yaz/querytowrbuf.h>
57 #include <yaz/oid_db.h>
58 #include <yaz/snprintf.h>
59 #include <yaz/gettimeofday.h>
60
61 #define USE_TIMING 0
62 #if USE_TIMING
63 #include <yaz/timing.h>
64 #endif
65
66 #include "ppmutex.h"
67 #include "parameters.h"
68 #include "session.h"
69 #include "eventl.h"
70 #include "http.h"
71 #include "termlists.h"
72 #include "reclists.h"
73 #include "relevance.h"
74 #include "database.h"
75 #include "client.h"
76 #include "settings.h"
77 #include "normalize7bit.h"
78
79 #define TERMLIST_HIGH_SCORE 25
80
81 #define MAX_CHUNK 15
82
83 #define MAX(a,b) ((a)>(b)?(a):(b))
84
85 // Note: Some things in this structure will eventually move to configuration
86 struct parameters global_parameters = 
87 {
88     0,   // dump_records
89     0    // debug_mode
90 };
91
92 struct client_list {
93     struct client *client;
94     struct client_list *next;
95 };
96
97 /* session counting (1) , disable client counting (0) */
98 static YAZ_MUTEX g_session_mutex = 0;
99 static int no_sessions = 0;
100
101 static int session_use(int delta)
102 {
103     int sessions;
104     if (!g_session_mutex)
105         yaz_mutex_create(&g_session_mutex);
106     yaz_mutex_enter(g_session_mutex);
107     no_sessions += delta;
108     sessions = no_sessions;
109     yaz_mutex_leave(g_session_mutex);
110     yaz_log(YLOG_DEBUG, "%s sesions=%d", delta == 0 ? "" : (delta > 0 ? "INC" : "DEC"), no_sessions);
111     return sessions;
112 }
113
114 int sessions_count(void) {
115     return session_use(0);
116 }
117
118 static void log_xml_doc(xmlDoc *doc)
119 {
120     FILE *lf = yaz_log_file();
121     xmlChar *result = 0;
122     int len = 0;
123 #if LIBXML_VERSION >= 20600
124     xmlDocDumpFormatMemory(doc, &result, &len, 1);
125 #else
126     xmlDocDumpMemory(doc, &result, &len);
127 #endif
128     if (lf && len)
129     {
130         (void) fwrite(result, 1, len, lf);
131         fprintf(lf, "\n");
132     }
133     xmlFree(result);
134 }
135
136 static void session_enter(struct session *s)
137 {
138     yaz_mutex_enter(s->session_mutex);
139 }
140
141 static void session_leave(struct session *s)
142 {
143     yaz_mutex_leave(s->session_mutex);
144 }
145
146 // Recursively traverse query structure to extract terms.
147 void pull_terms(NMEM nmem, struct ccl_rpn_node *n, char **termlist, int *num)
148 {
149     char **words;
150     int numwords;
151     int i;
152
153     switch (n->kind)
154     {
155     case CCL_RPN_AND:
156     case CCL_RPN_OR:
157     case CCL_RPN_NOT:
158     case CCL_RPN_PROX:
159         pull_terms(nmem, n->u.p[0], termlist, num);
160         pull_terms(nmem, n->u.p[1], termlist, num);
161         break;
162     case CCL_RPN_TERM:
163         nmem_strsplit(nmem, " ", n->u.t.term, &words, &numwords);
164         for (i = 0; i < numwords; i++)
165             termlist[(*num)++] = words[i];
166         break;
167     default: // NOOP
168         break;
169     }
170 }
171
172
173 void add_facet(struct session *s, const char *type, const char *value, int count)
174 {
175     struct conf_service *service = s->service;
176     pp2_relevance_token_t prt;
177     const char *facet_component;
178     WRBUF facet_wrbuf = wrbuf_alloc();
179     prt = pp2_relevance_tokenize(service->facet_pct);
180     
181     pp2_relevance_first(prt, value, 0);
182     while ((facet_component = pp2_relevance_token_next(prt)))
183     {
184         if (*facet_component)
185         {
186             if (wrbuf_len(facet_wrbuf))
187                 wrbuf_puts(facet_wrbuf, " ");
188             wrbuf_puts(facet_wrbuf, facet_component);
189         }
190     }
191     pp2_relevance_token_destroy(prt);
192     
193     if (wrbuf_len(facet_wrbuf))
194     {
195         int i;
196         for (i = 0; i < s->num_termlists; i++)
197             if (!strcmp(s->termlists[i].name, type))
198                 break;
199         if (i == s->num_termlists)
200         {
201             if (i == SESSION_MAX_TERMLISTS)
202             {
203                 session_log(s, YLOG_FATAL, "Too many termlists");
204                 wrbuf_destroy(facet_wrbuf);
205                 return;
206             }
207             
208             s->termlists[i].name = nmem_strdup(s->nmem, type);
209             s->termlists[i].termlist 
210                 = termlist_create(s->nmem, TERMLIST_HIGH_SCORE);
211             s->num_termlists = i + 1;
212         }
213         
214         session_log(s, YLOG_DEBUG, "Facets for %s: %s norm:%s (%d)",
215                     type, value, wrbuf_cstr(facet_wrbuf), count);
216         termlist_insert(s->termlists[i].termlist, wrbuf_cstr(facet_wrbuf),
217                         count);
218     }
219     wrbuf_destroy(facet_wrbuf);
220 }
221
222 static xmlDoc *record_to_xml(struct session *se,
223                              struct session_database *sdb, const char *rec)
224 {
225     struct database *db = sdb->database;
226     xmlDoc *rdoc = 0;
227
228     rdoc = xmlParseMemory(rec, strlen(rec));
229
230     if (!rdoc)
231     {
232         session_log(se, YLOG_FATAL, "Non-wellformed XML received from %s",
233                     db->url);
234         return 0;
235     }
236
237     if (global_parameters.dump_records)
238     {
239         session_log(se, YLOG_LOG, "Un-normalized record from %s", db->url);
240         log_xml_doc(rdoc);
241     }
242
243     return rdoc;
244 }
245
246 #define MAX_XSLT_ARGS 16
247
248 // Add static values from session database settings if applicable
249 static void insert_settings_parameters(struct session_database *sdb,
250                                        struct conf_service *service,
251                                        char **parms,
252                                        NMEM nmem)
253 {
254     int i;
255     int nparms = 0;
256     int offset = 0;
257
258     for (i = 0; i < service->num_metadata; i++)
259     {
260         struct conf_metadata *md = &service->metadata[i];
261         int setting;
262
263         if (md->setting == Metadata_setting_parameter &&
264             (setting = settings_lookup_offset(service, md->name)) >= 0)
265         {
266             const char *val = session_setting_oneval(sdb, setting);
267             if (val && nparms < MAX_XSLT_ARGS)
268             {
269                 char *buf;
270                 int len = strlen(val);
271                 buf = nmem_malloc(nmem, len + 3);
272                 buf[0] = '\'';
273                 strcpy(buf + 1, val);
274                 buf[len+1] = '\'';
275                 buf[len+2] = '\0';
276                 parms[offset++] = md->name;
277                 parms[offset++] = buf;
278                 nparms++;
279             }
280         }
281     }
282     parms[offset] = 0;
283 }
284
285 // Add static values from session database settings if applicable
286 static void insert_settings_values(struct session_database *sdb, xmlDoc *doc,
287     struct conf_service *service)
288 {
289     int i;
290
291     for (i = 0; i < service->num_metadata; i++)
292     {
293         struct conf_metadata *md = &service->metadata[i];
294         int offset;
295
296         if (md->setting == Metadata_setting_postproc &&
297             (offset = settings_lookup_offset(service, md->name)) >= 0)
298         {
299             const char *val = session_setting_oneval(sdb, offset);
300             if (val)
301             {
302                 xmlNode *r = xmlDocGetRootElement(doc);
303                 xmlNode *n = xmlNewTextChild(r, 0, (xmlChar *) "metadata",
304                                              (xmlChar *) val);
305                 xmlSetProp(n, (xmlChar *) "type", (xmlChar *) md->name);
306             }
307         }
308     }
309 }
310
311 static xmlDoc *normalize_record(struct session *se,
312                                 struct session_database *sdb,
313                                 struct conf_service *service,
314                                 const char *rec, NMEM nmem)
315 {
316     xmlDoc *rdoc = record_to_xml(se, sdb, rec);
317
318     if (rdoc)
319     {
320         char *parms[MAX_XSLT_ARGS*2+1];
321         
322         insert_settings_parameters(sdb, service, parms, nmem);
323         
324         if (normalize_record_transform(sdb->map, &rdoc, (const char **)parms))
325         {
326             session_log(se, YLOG_WARN, "Normalize failed from %s",
327                         sdb->database->url);
328         }
329         else
330         {
331             insert_settings_values(sdb, rdoc, service);
332             
333             if (global_parameters.dump_records)
334             {
335                 session_log(se, YLOG_LOG, "Normalized record from %s", 
336                             sdb->database->url);
337                 log_xml_doc(rdoc);
338             }
339         }
340     }
341     return rdoc;
342 }
343
344 void session_settings_dump(struct session *se,
345                            struct session_database *db,
346                            WRBUF w)
347 {
348     if (db->settings)
349     {
350         int i, num = db->num_settings;
351         for (i = 0; i < num; i++)
352         {
353             struct setting *s = db->settings[i];
354             for (;s ; s = s->next)
355             {
356                 wrbuf_puts(w, "<set name=\"");
357                 wrbuf_xmlputs(w, s->name);
358                 wrbuf_puts(w, "\" value=\"");
359                 wrbuf_xmlputs(w, s->value);
360                 wrbuf_puts(w, "\"/>");
361             }
362             if (db->settings[i])
363                 wrbuf_puts(w, "\n");
364         }
365     }
366 }
367
368 // Retrieve first defined value for 'name' for given database.
369 // Will be extended to take into account user associated with session
370 const char *session_setting_oneval(struct session_database *db, int offset)
371 {
372     if (offset >= db->num_settings || !db->settings[offset])
373         return "";
374     return db->settings[offset]->value;
375 }
376
377 // Prepare XSLT stylesheets for record normalization
378 // Structures are allocated on the session_wide nmem to avoid having
379 // to recompute this for every search. This would lead
380 // to leaking if a single session was to repeatedly change the PZ_XSLT
381 // setting. However, this is not a realistic use scenario.
382 static int prepare_map(struct session *se, struct session_database *sdb)
383 {
384     const char *s;
385
386     if (!sdb->settings)
387     {
388         session_log(se, YLOG_WARN, "No settings on %s", sdb->database->url);
389         return -1;
390     }
391     if ((s = session_setting_oneval(sdb, PZ_XSLT)))
392     {
393         char auto_stylesheet[256];
394
395         if (!strcmp(s, "auto"))
396         {
397             const char *request_syntax = session_setting_oneval(
398                 sdb, PZ_REQUESTSYNTAX);
399             if (request_syntax)
400             {
401                 char *cp;
402                 yaz_snprintf(auto_stylesheet, sizeof(auto_stylesheet),
403                              "%s.xsl", request_syntax);
404                 for (cp = auto_stylesheet; *cp; cp++)
405                 {
406                     /* deliberately only consider ASCII */
407                     if (*cp > 32 && *cp < 127)
408                         *cp = tolower(*cp);
409                 }
410                 s = auto_stylesheet;
411             }
412             else
413             {
414                 session_log(se, YLOG_WARN,
415                             "No pz:requestsyntax for auto stylesheet");
416             }
417         }
418         sdb->map = normalize_cache_get(se->normalize_cache,
419                                        se->service->server->config, s);
420         if (!sdb->map)
421             return -1;
422     }
423     return 0;
424 }
425
426 // This analyzes settings and recomputes any supporting data structures
427 // if necessary.
428 static int prepare_session_database(struct session *se, 
429                                     struct session_database *sdb)
430 {
431     if (!sdb->settings)
432     {
433         session_log(se, YLOG_WARN, 
434                 "No settings associated with %s", sdb->database->url);
435         return -1;
436     }
437     if (sdb->settings[PZ_XSLT] && !sdb->map)
438     {
439         if (prepare_map(se, sdb) < 0)
440             return -1;
441     }
442     return 0;
443 }
444
445 // called if watch should be removed because http_channel is to be destroyed
446 static void session_watch_cancel(void *data, struct http_channel *c,
447                                  void *data2)
448 {
449     struct session_watchentry *ent = data;
450
451     ent->fun = 0;
452     ent->data = 0;
453     ent->obs = 0;
454 }
455
456 // set watch. Returns 0=OK, -1 if watch is already set
457 int session_set_watch(struct session *s, int what, 
458                       session_watchfun fun, void *data,
459                       struct http_channel *chan)
460 {
461     int ret;
462     session_enter(s);
463     if (s->watchlist[what].fun)
464         ret = -1;
465     else
466     {
467         
468         s->watchlist[what].fun = fun;
469         s->watchlist[what].data = data;
470         s->watchlist[what].obs = http_add_observer(chan, &s->watchlist[what],
471                                                    session_watch_cancel);
472         ret = 0;
473     }
474     session_leave(s);
475     return 0;
476 }
477
478 void session_alert_watch(struct session *s, int what)
479 {
480     assert(s);
481     session_enter(s);
482     if (s->watchlist[what].fun)
483     {
484         /* our watch is no longer associated with http_channel */
485         void *data;
486         session_watchfun fun;
487
488         http_remove_observer(s->watchlist[what].obs);
489         fun  = s->watchlist[what].fun;
490         data = s->watchlist[what].data;
491
492         /* reset watch before fun is invoked - in case fun wants to set
493            it again */
494         s->watchlist[what].fun = 0;
495         s->watchlist[what].data = 0;
496         s->watchlist[what].obs = 0;
497
498         session_leave(s);
499         session_log(s, YLOG_DEBUG,
500                     "Alert Watch: %d calling function: %p", what, fun);
501         fun(data);
502     }
503     else
504         session_leave(s);
505 }
506
507 //callback for grep_databases
508 static void select_targets_callback(void *context, struct session_database *db)
509 {
510     struct session *se = (struct session*) context;
511     struct client *cl = client_create();
512     struct client_list *l;
513     client_set_database(cl, db);
514
515     client_set_session(cl, se);
516
517     l = xmalloc(sizeof(*l));
518     l->client = cl;
519     l->next = se->clients;
520     se->clients = l;
521 }
522
523 static void session_remove_clients(struct session *se)
524 {
525     struct client_list *l;
526
527     session_enter(se);
528     l = se->clients;
529     se->clients = 0;
530     session_leave(se);
531
532     while (l)
533     {
534         struct client_list *l_next = l->next;
535         client_lock(l->client);
536         client_set_session(l->client, 0);
537         client_set_database(l->client, 0);
538         client_unlock(l->client);
539         client_destroy(l->client);
540         xfree(l);
541         l = l_next;
542     }
543 }
544
545 // Associates a set of clients with a session;
546 // Note: Session-databases represent databases with per-session 
547 // setting overrides
548 static int select_targets(struct session *se, const char *filter)
549 {
550     return session_grep_databases(se, filter, select_targets_callback);
551 }
552
553 int session_active_clients(struct session *s)
554 {
555     struct client_list *l;
556     int res = 0;
557
558     for (l = s->clients; l; l = l->next)
559         if (client_is_active(l->client))
560             res++;
561
562     return res;
563 }
564
565 int session_is_preferred_clients_ready(struct session *s)
566 {
567     struct client_list *l;
568     int res = 0;
569
570     for (l = s->clients; l; l = l->next)
571         if (client_is_active_preferred(l->client))
572             res++;
573     session_log(s, YLOG_DEBUG, "Has %d active preferred clients.", res);
574     return res == 0;
575 }
576
577
578
579 enum pazpar2_error_code search(struct session *se,
580                                const char *query,
581                                const char *startrecs, const char *maxrecs,
582                                const char *filter,
583                                const char **addinfo)
584 {
585     int live_channels = 0;
586     int no_working = 0;
587     int no_failed = 0;
588     struct client_list *l;
589     struct timeval tval;
590
591     session_log(se, YLOG_DEBUG, "Search");
592
593     *addinfo = 0;
594
595     session_remove_clients(se);
596     
597     session_enter(se);
598     reclist_destroy(se->reclist);
599     se->reclist = 0;
600     relevance_destroy(&se->relevance);
601     nmem_reset(se->nmem);
602     se->total_records = se->total_hits = se->total_merged = 0;
603     se->num_termlists = 0;
604     live_channels = select_targets(se, filter);
605     if (!live_channels)
606     {
607         session_leave(se);
608         return PAZPAR2_NO_TARGETS;
609     }
610     se->reclist = reclist_create(se->nmem);
611
612     yaz_gettimeofday(&tval);
613     
614     tval.tv_sec += 5;
615
616     for (l = se->clients; l; l = l->next)
617     {
618         struct client *cl = l->client;
619
620         if (maxrecs)
621             client_set_maxrecs(cl, atoi(maxrecs));
622         if (startrecs)
623             client_set_startrecs(cl, atoi(startrecs));
624         if (prepare_session_database(se, client_get_database(cl)) < 0)
625             ;
626         else if (client_parse_query(cl, query) < 0)
627             no_failed++;
628         else
629         {
630             no_working++;
631             if (client_prep_connection(cl, se->service->z3950_operation_timeout,
632                                        se->service->z3950_session_timeout,
633                                        se->service->server->iochan_man,
634                                        &tval))
635                 client_start_search(cl);
636         }
637     }
638     session_leave(se);
639     if (no_working == 0)
640     {
641         if (no_failed > 0)
642         {
643             *addinfo = "query";
644             return PAZPAR2_MALFORMED_PARAMETER_VALUE;
645         }
646         else
647             return PAZPAR2_NO_TARGETS;
648     }
649     return PAZPAR2_NO_ERROR;
650 }
651
652 // Creates a new session_database object for a database
653 static void session_init_databases_fun(void *context, struct database *db)
654 {
655     struct session *se = (struct session *) context;
656     struct session_database *new = nmem_malloc(se->session_nmem, sizeof(*new));
657     int i;
658
659     new->database = db;
660     
661     new->map = 0;
662     assert(db->settings);
663     new->settings = nmem_malloc(se->session_nmem,
664                                 sizeof(struct settings *) * db->num_settings);
665     new->num_settings = db->num_settings;
666     for (i = 0; i < db->num_settings; i++)
667     {
668         struct setting *setting = db->settings[i];
669         new->settings[i] = setting;
670     }
671     new->next = se->databases;
672     se->databases = new;
673 }
674
675 // Doesn't free memory associated with sdb -- nmem takes care of that
676 static void session_database_destroy(struct session_database *sdb)
677 {
678     sdb->map = 0;
679 }
680
681 // Initialize session_database list -- this represents this session's view
682 // of the database list -- subject to modification by the settings ws command
683 void session_init_databases(struct session *se)
684 {
685     se->databases = 0;
686     predef_grep_databases(se, se->service, session_init_databases_fun);
687 }
688
689 // Probably session_init_databases_fun should be refactored instead of
690 // called here.
691 static struct session_database *load_session_database(struct session *se, 
692                                                       char *id)
693 {
694     struct database *db = new_database(id, se->session_nmem);
695
696     resolve_database(se->service, db);
697
698     session_init_databases_fun((void*) se, db);
699
700     // New sdb is head of se->databases list
701     return se->databases;
702 }
703
704 // Find an existing session database. If not found, load it
705 static struct session_database *find_session_database(struct session *se, 
706                                                       char *id)
707 {
708     struct session_database *sdb;
709
710     for (sdb = se->databases; sdb; sdb = sdb->next)
711         if (!strcmp(sdb->database->url, id))
712             return sdb;
713     return load_session_database(se, id);
714 }
715
716 // Apply a session override to a database
717 void session_apply_setting(struct session *se, char *dbname, char *setting,
718                            char *value)
719 {
720     struct session_database *sdb = find_session_database(se, dbname);
721     struct conf_service *service = se->service;
722     struct setting *new = nmem_malloc(se->session_nmem, sizeof(*new));
723     int offset = settings_create_offset(service, setting);
724
725     expand_settings_array(&sdb->settings, &sdb->num_settings, offset,
726                           se->session_nmem);
727     new->precedence = 0;
728     new->target = dbname;
729     new->name = setting;
730     new->value = value;
731     new->next = sdb->settings[offset];
732     sdb->settings[offset] = new;
733
734     // Force later recompute of settings-driven data structures
735     // (happens when a search starts and client connections are prepared)
736     switch (offset)
737     {
738     case PZ_XSLT:
739         if (sdb->map)
740         {
741             sdb->map = 0;
742         }
743         break;
744     }
745 }
746
747 void destroy_session(struct session *se)
748 {
749     struct session_database *sdb;
750     session_log(se, YLOG_DEBUG, "Destroying");
751     session_use(-1);
752     session_remove_clients(se);
753
754     for (sdb = se->databases; sdb; sdb = sdb->next)
755         session_database_destroy(sdb);
756     normalize_cache_destroy(se->normalize_cache);
757     relevance_destroy(&se->relevance);
758     reclist_destroy(se->reclist);
759     nmem_destroy(se->nmem);
760     service_destroy(se->service);
761     yaz_mutex_destroy(&se->session_mutex);
762     wrbuf_destroy(se->wrbuf);
763 }
764
765 size_t session_get_memory_status(struct session *session) {
766     size_t session_nmem;
767     if (session == 0)
768         return 0;
769     session_enter(session);
770     session_nmem = nmem_total(session->nmem);
771     session_leave(session);
772     return session_nmem;
773 }
774
775
776 struct session *new_session(NMEM nmem, struct conf_service *service,
777                             unsigned session_id)
778 {
779     int i;
780     struct session *session = nmem_malloc(nmem, sizeof(*session));
781
782     char tmp_str[50];
783
784     sprintf(tmp_str, "session#%u", session_id);
785
786     session->session_id = session_id;
787     session_log(session, YLOG_DEBUG, "New");
788     session->service = service;
789     session->relevance = 0;
790     session->total_hits = 0;
791     session->total_records = 0;
792     session->number_of_warnings_unknown_elements = 0;
793     session->number_of_warnings_unknown_metadata = 0;
794     session->num_termlists = 0;
795     session->reclist = 0;
796     session->clients = 0;
797     session->session_nmem = nmem;
798     session->nmem = nmem_create();
799     session->wrbuf = wrbuf_alloc();
800     session->databases = 0;
801     for (i = 0; i <= SESSION_WATCH_MAX; i++)
802     {
803         session->watchlist[i].data = 0;
804         session->watchlist[i].fun = 0;
805     }
806     session->normalize_cache = normalize_cache_create();
807     session->session_mutex = 0;
808     pazpar2_mutex_create(&session->session_mutex, tmp_str);
809     session_use(1);
810     return session;
811 }
812
813 struct hitsbytarget *hitsbytarget(struct session *se, int *count, NMEM nmem)
814 {
815     struct hitsbytarget *res = 0;
816     struct client_list *l;
817     size_t sz = 0;
818
819     session_enter(se);
820     for (l = se->clients; l; l = l->next)
821         sz++;
822
823     res = nmem_malloc(nmem, sizeof(*res) * sz);
824     *count = 0;
825     for (l = se->clients; l; l = l->next)
826     {
827         struct client *cl = l->client;
828         WRBUF w = wrbuf_alloc();
829         const char *name = session_setting_oneval(client_get_database(cl),
830                                                   PZ_NAME);
831
832         res[*count].id = client_get_database(cl)->database->url;
833         res[*count].name = *name ? name : "Unknown";
834         res[*count].hits = client_get_hits(cl);
835         res[*count].records = client_get_num_records(cl);
836         res[*count].diagnostic = client_get_diagnostic(cl);
837         res[*count].state = client_get_state_str(cl);
838         res[*count].connected  = client_get_connection(cl) ? 1 : 0;
839         session_settings_dump(se, client_get_database(cl), w);
840         res[*count].settings_xml = w;
841         (*count)++;
842     }
843     session_leave(se);
844     return res;
845 }
846
847 struct termlist_score **termlist(struct session *se, const char *name, int *num)
848 {
849     int i;
850     struct termlist_score **tl = 0;
851
852     session_enter(se);
853     for (i = 0; i < se->num_termlists; i++)
854         if (!strcmp((const char *) se->termlists[i].name, name))
855         {
856             tl = termlist_highscore(se->termlists[i].termlist, num);
857             break;
858         }
859     session_leave(se);
860     return tl;
861 }
862
863 #ifdef MISSING_HEADERS
864 void report_nmem_stats(void)
865 {
866     size_t in_use, is_free;
867
868     nmem_get_memory_in_use(&in_use);
869     nmem_get_memory_free(&is_free);
870
871     yaz_log(YLOG_LOG, "nmem stat: use=%ld free=%ld", 
872             (long) in_use, (long) is_free);
873 }
874 #endif
875
876 struct record_cluster *show_single_start(struct session *se, const char *id,
877                                          struct record_cluster **prev_r,
878                                          struct record_cluster **next_r)
879 {
880     struct record_cluster *r = 0;
881
882     session_enter(se);
883     *prev_r = 0;
884     *next_r = 0;
885     if (se->reclist)
886     {
887         reclist_enter(se->reclist);
888         while ((r = reclist_read_record(se->reclist)))
889         {
890             if (!strcmp(r->recid, id))
891             {
892                 *next_r = reclist_read_record(se->reclist);
893                 break;
894             }
895             *prev_r = r;
896         }
897         reclist_leave(se->reclist);
898     }
899     if (!r)
900         session_leave(se);
901     return r;
902 }
903
904 void show_single_stop(struct session *se, struct record_cluster *rec)
905 {
906     session_leave(se);
907 }
908
909 struct record_cluster **show_range_start(struct session *se,
910                                          struct reclist_sortparms *sp, 
911                                          int start, int *num, int *total, Odr_int *sumhits)
912 {
913     struct record_cluster **recs;
914     struct reclist_sortparms *spp;
915     int i;
916 #if USE_TIMING    
917     yaz_timing_t t = yaz_timing_create();
918 #endif
919     session_enter(se);
920     recs = nmem_malloc(se->nmem, *num * sizeof(struct record_cluster *));
921     if (!se->relevance)
922     {
923         *num = 0;
924         *total = 0;
925         *sumhits = 0;
926         recs = 0;
927     }
928     else
929     {
930         for (spp = sp; spp; spp = spp->next)
931             if (spp->type == Metadata_sortkey_relevance)
932             {
933                 relevance_prepare_read(se->relevance, se->reclist);
934                 break;
935             }
936         reclist_sort(se->reclist, sp);
937         
938         reclist_enter(se->reclist);
939         *total = reclist_get_num_records(se->reclist);
940         *sumhits = se->total_hits;
941         
942         for (i = 0; i < start; i++)
943             if (!reclist_read_record(se->reclist))
944             {
945                 *num = 0;
946                 recs = 0;
947                 break;
948             }
949         
950         for (i = 0; i < *num; i++)
951         {
952             struct record_cluster *r = reclist_read_record(se->reclist);
953             if (!r)
954             {
955                 *num = i;
956                 break;
957             }
958             recs[i] = r;
959         }
960         reclist_leave(se->reclist);
961     }
962 #if USE_TIMING
963     yaz_timing_stop(t);
964     yaz_log(YLOG_LOG, "show %6.5f %3.2f %3.2f", 
965             yaz_timing_get_real(t), yaz_timing_get_user(t),
966             yaz_timing_get_sys(t));
967     yaz_timing_destroy(&t);
968 #endif
969     return recs;
970 }
971
972 void show_range_stop(struct session *se, struct record_cluster **recs)
973 {
974     session_leave(se);
975 }
976
977 void statistics(struct session *se, struct statistics *stat)
978 {
979     struct client_list *l;
980     int count = 0;
981
982     memset(stat, 0, sizeof(*stat));
983     for (l = se->clients; l; l = l->next)
984     {
985         struct client *cl = l->client;
986         if (!client_get_connection(cl))
987             stat->num_no_connection++;
988         switch (client_get_state(cl))
989         {
990         case Client_Connecting: stat->num_connecting++; break;
991         case Client_Working: stat->num_working++; break;
992         case Client_Idle: stat->num_idle++; break;
993         case Client_Failed: stat->num_failed++; break;
994         case Client_Error: stat->num_error++; break;
995         default: break;
996         }
997         count++;
998     }
999     stat->num_hits = se->total_hits;
1000     stat->num_records = se->total_records;
1001
1002     stat->num_clients = count;
1003 }
1004
1005 static struct record_metadata *record_metadata_init(
1006     NMEM nmem, const char *value, enum conf_metadata_type type,
1007     struct _xmlAttr *attr)
1008 {
1009     struct record_metadata *rec_md = record_metadata_create(nmem);
1010     struct record_metadata_attr **attrp = &rec_md->attributes;
1011     
1012     for (; attr; attr = attr->next)
1013     {
1014         if (attr->children && attr->children->content)
1015         {
1016             if (strcmp((const char *) attr->name, "type"))
1017             {  /* skip the "type" attribute.. Its value is already part of
1018                   the element in output (md-%s) and so repeating it here
1019                   is redundant */
1020                 *attrp = nmem_malloc(nmem, sizeof(**attrp));
1021                 (*attrp)->name =
1022                     nmem_strdup(nmem, (const char *) attr->name);
1023                 (*attrp)->value =
1024                     nmem_strdup(nmem, (const char *) attr->children->content);
1025                 attrp = &(*attrp)->next;
1026             }
1027         }
1028     }
1029     *attrp = 0;
1030
1031     if (type == Metadata_type_generic)
1032     {
1033         char *p = nmem_strdup(nmem, value);
1034
1035         p = normalize7bit_generic(p, " ,/.:([");
1036         
1037         rec_md->data.text.disp = p;
1038         rec_md->data.text.sort = 0;
1039     }
1040     else if (type == Metadata_type_year || type == Metadata_type_date)
1041     {
1042         int first, last;
1043         int longdate = 0;
1044
1045         if (type == Metadata_type_date)
1046             longdate = 1;
1047         if (extract7bit_dates((char *) value, &first, &last, longdate) < 0)
1048             return 0;
1049
1050         rec_md->data.number.min = first;
1051         rec_md->data.number.max = last;
1052     }
1053     else
1054         return 0;
1055     return rec_md;
1056 }
1057
1058 static int get_mergekey_from_doc(xmlDoc *doc, xmlNode *root, const char *name,
1059                                  struct conf_service *service, WRBUF norm_wr)
1060 {
1061     xmlNode *n;
1062     int no_found = 0;
1063     for (n = root->children; n; n = n->next)
1064     {
1065         if (n->type != XML_ELEMENT_NODE)
1066             continue;
1067         if (!strcmp((const char *) n->name, "metadata"))
1068         {
1069             xmlChar *type = xmlGetProp(n, (xmlChar *) "type");
1070             if (type == NULL) {
1071                 yaz_log(YLOG_FATAL, "Missing type attribute on metadata element. Skipping!");
1072             }
1073             else if (!strcmp(name, (const char *) type))
1074             {
1075                 xmlChar *value = xmlNodeListGetString(doc, n->children, 1);
1076                 if (value)
1077                 {
1078                     const char *norm_str;
1079                     pp2_relevance_token_t prt =
1080                         pp2_relevance_tokenize(service->mergekey_pct);
1081                     
1082                     pp2_relevance_first(prt, (const char *) value, 0);
1083                     if (wrbuf_len(norm_wr) > 0)
1084                         wrbuf_puts(norm_wr, " ");
1085                     wrbuf_puts(norm_wr, name);
1086                     while ((norm_str =
1087                             pp2_relevance_token_next(prt)))
1088                     {
1089                         if (*norm_str)
1090                         {
1091                             wrbuf_puts(norm_wr, " ");
1092                             wrbuf_puts(norm_wr, norm_str);
1093                         }
1094                     }
1095                     xmlFree(value);
1096                     pp2_relevance_token_destroy(prt);
1097                     no_found++;
1098                 }
1099             }
1100             xmlFree(type);
1101         }
1102     }
1103     return no_found;
1104 }
1105
1106 static const char *get_mergekey(xmlDoc *doc, struct client *cl, int record_no,
1107                                 struct conf_service *service, NMEM nmem)
1108 {
1109     char *mergekey_norm = 0;
1110     xmlNode *root = xmlDocGetRootElement(doc);
1111     WRBUF norm_wr = wrbuf_alloc();
1112
1113     /* consider mergekey from XSL first */
1114     xmlChar *mergekey = xmlGetProp(root, (xmlChar *) "mergekey");
1115     if (mergekey)
1116     {
1117         const char *norm_str;
1118         pp2_relevance_token_t prt =
1119             pp2_relevance_tokenize(service->mergekey_pct);
1120
1121         pp2_relevance_first(prt, (const char *) mergekey, 0);
1122         while ((norm_str = pp2_relevance_token_next(prt)))
1123         {
1124             if (*norm_str)
1125             {
1126                 if (wrbuf_len(norm_wr))
1127                     wrbuf_puts(norm_wr, " ");
1128                 wrbuf_puts(norm_wr, norm_str);
1129             }
1130         }
1131         pp2_relevance_token_destroy(prt);
1132         xmlFree(mergekey);
1133     }
1134     else
1135     {
1136         /* no mergekey defined in XSL. Look for mergekey metadata instead */
1137         int field_id;
1138         for (field_id = 0; field_id < service->num_metadata; field_id++)
1139         {
1140             struct conf_metadata *ser_md = &service->metadata[field_id];
1141             if (ser_md->mergekey != Metadata_mergekey_no)
1142             {
1143                 int r = get_mergekey_from_doc(doc, root, ser_md->name,
1144                                               service, norm_wr);
1145                 if (r == 0 && ser_md->mergekey == Metadata_mergekey_required)
1146                 {
1147                     /* no mergekey on this one and it is required.. 
1148                        Generate unique key instead */
1149                     wrbuf_rewind(norm_wr);
1150                     break;
1151                 }
1152             }
1153         }
1154     }
1155
1156     /* generate unique key if none is not generated already or is empty */
1157     if (wrbuf_len(norm_wr) == 0)
1158     {
1159         wrbuf_printf(norm_wr, "%s-%d",
1160                      client_get_database(cl)->database->url, record_no);
1161     }
1162     if (wrbuf_len(norm_wr) > 0)
1163         mergekey_norm = nmem_strdup(nmem, wrbuf_cstr(norm_wr));
1164     wrbuf_destroy(norm_wr);
1165     return mergekey_norm;
1166 }
1167
1168 /** \brief see if metadata for pz:recordfilter exists 
1169     \param root xml root element of normalized record
1170     \param sdb session database for client
1171     \retval 0 if there is no metadata for pz:recordfilter
1172     \retval 1 if there is metadata for pz:recordfilter
1173
1174     If there is no pz:recordfilter defined, this function returns 1
1175     as well.
1176 */
1177     
1178 static int check_record_filter(xmlNode *root, struct session_database *sdb)
1179 {
1180     int match = 0;
1181     xmlNode *n;
1182     const char *s;
1183     s = session_setting_oneval(sdb, PZ_RECORDFILTER);
1184
1185     if (!s || !*s)
1186         return 1;
1187
1188     for (n = root->children; n; n = n->next)
1189     {
1190         if (n->type != XML_ELEMENT_NODE)
1191             continue;
1192         if (!strcmp((const char *) n->name, "metadata"))
1193         {
1194             xmlChar *type = xmlGetProp(n, (xmlChar *) "type");
1195             if (type)
1196             {
1197                 size_t len;
1198                 int substring;
1199                 const char *eq;
1200
1201                 if ((eq = strchr(s, '=')))
1202                     substring = 0;
1203                 else if ((eq = strchr(s, '~')))
1204                     substring = 1;
1205                 if (eq)
1206                     len = eq - s;
1207                 else
1208                     len = strlen(s);
1209                 if (len == strlen((const char *)type) &&
1210                     !memcmp((const char *) type, s, len))
1211                 {
1212                     xmlChar *value = xmlNodeGetContent(n);
1213                     if (value && *value)
1214                     {
1215                         if (!eq ||
1216                             (substring && strstr((const char *) value, eq+1)) ||
1217                             (!substring && !strcmp((const char *) value, eq + 1)))
1218                             match = 1;
1219                     }
1220                     xmlFree(value);
1221                 }
1222                 xmlFree(type);
1223             }
1224         }
1225     }
1226     return match;
1227 }
1228
1229
1230 static int ingest_to_cluster(struct client *cl,
1231                              xmlDoc *xdoc,
1232                              xmlNode *root,
1233                              int record_no,
1234                              const char *mergekey_norm);
1235
1236 /** \brief ingest XML record
1237     \param cl client holds the result set for record
1238     \param rec record buffer (0 terminated)
1239     \param record_no record position (1, 2, ..)
1240     \param nmem working NMEM
1241     \retval 0 OK
1242     \retval -1 failure
1243 */
1244 int ingest_record(struct client *cl, const char *rec,
1245                   int record_no, NMEM nmem)
1246 {
1247     struct session *se = client_get_session(cl);
1248     int ret = 0;
1249     struct session_database *sdb = client_get_database(cl);
1250     struct conf_service *service = se->service;
1251     xmlDoc *xdoc = normalize_record(se, sdb, service, rec, nmem);
1252     xmlNode *root;
1253     const char *mergekey_norm;
1254     
1255     if (!xdoc)
1256         return -1;
1257     
1258     root = xmlDocGetRootElement(xdoc);
1259     
1260     if (!check_record_filter(root, sdb))
1261     {
1262         session_log(se, YLOG_WARN, "Filtered out record no %d from %s",
1263                     record_no, sdb->database->url);
1264         xmlFreeDoc(xdoc);
1265         return -1;
1266     }
1267     
1268     mergekey_norm = get_mergekey(xdoc, cl, record_no, service, nmem);
1269     if (!mergekey_norm)
1270     {
1271         session_log(se, YLOG_WARN, "Got no mergekey");
1272         xmlFreeDoc(xdoc);
1273         return -1;
1274     }
1275     session_enter(se);
1276     if (client_get_session(cl) == se)
1277         ret = ingest_to_cluster(cl, xdoc, root, record_no, mergekey_norm);
1278     session_leave(se);
1279     
1280     xmlFreeDoc(xdoc);
1281     return ret;
1282 }
1283
1284 static int ingest_to_cluster(struct client *cl,
1285                              xmlDoc *xdoc,
1286                              xmlNode *root,
1287                              int record_no,
1288                              const char *mergekey_norm)
1289 {
1290     xmlNode *n;
1291     xmlChar *type = 0;
1292     xmlChar *value = 0;
1293     struct session_database *sdb = client_get_database(cl);
1294     struct session *se = client_get_session(cl);
1295     struct conf_service *service = se->service;
1296     struct record *record = record_create(se->nmem, 
1297                                           service->num_metadata,
1298                                           service->num_sortkeys, cl,
1299                                           record_no);
1300     struct record_cluster *cluster = reclist_insert(se->reclist,
1301                                                     service, 
1302                                                     record,
1303                                                     mergekey_norm,
1304                                                     &se->total_merged);
1305
1306     const char *use_term_factor_str = session_setting_oneval(sdb, PZ_TERMLIST_TERM_FACTOR);
1307     // TODO: Work-around to default to use term factor, until other MK2 components supports it
1308     int use_term_factor = 1;
1309     int term_factor = 1; 
1310     if (use_term_factor_str && use_term_factor_str[0] != 0)
1311        use_term_factor =  atoi(use_term_factor_str);
1312     if (use_term_factor) {
1313         int maxrecs = client_get_maxrecs(cl);
1314         int hits = (int) client_get_hits(cl);
1315         term_factor = MAX(hits, maxrecs) /  MAX(1, maxrecs);
1316         assert(term_factor >= 1);
1317         yaz_log(YLOG_DEBUG, "Using term factor: %d (%d / %d)", term_factor, MAX(hits, maxrecs), MAX(1, maxrecs));
1318     }
1319
1320     if (!cluster)
1321         return -1;
1322     if (global_parameters.dump_records)
1323         session_log(se, YLOG_LOG, "Cluster id %s from %s (#%d)", cluster->recid,
1324                     sdb->database->url, record_no);
1325     relevance_newrec(se->relevance, cluster);
1326     
1327     // now parsing XML record and adding data to cluster or record metadata
1328     for (n = root->children; n; n = n->next)
1329     {
1330         pp2_relevance_token_t prt;
1331         if (type)
1332             xmlFree(type);
1333         if (value)
1334             xmlFree(value);
1335         type = value = 0;
1336         
1337         if (n->type != XML_ELEMENT_NODE)
1338             continue;
1339         if (!strcmp((const char *) n->name, "metadata"))
1340         {
1341             struct conf_metadata *ser_md = 0;
1342             struct conf_sortkey *ser_sk = 0;
1343             struct record_metadata **wheretoput = 0;
1344             struct record_metadata *rec_md = 0;
1345             int md_field_id = -1;
1346             int sk_field_id = -1;
1347             
1348             type = xmlGetProp(n, (xmlChar *) "type");
1349             value = xmlNodeListGetString(xdoc, n->children, 1);
1350             
1351             if (!type || !value || !*value)
1352                 continue;
1353             
1354             md_field_id 
1355                 = conf_service_metadata_field_id(service, (const char *) type);
1356             if (md_field_id < 0)
1357             {
1358                 if (se->number_of_warnings_unknown_metadata == 0)
1359                 {
1360                     session_log(se, YLOG_WARN, 
1361                             "Ignoring unknown metadata element: %s", type);
1362                 }
1363                 se->number_of_warnings_unknown_metadata++;
1364                 continue;
1365             }
1366             
1367             ser_md = &service->metadata[md_field_id];
1368             
1369             if (ser_md->sortkey_offset >= 0){
1370                 sk_field_id = ser_md->sortkey_offset;
1371                 ser_sk = &service->sortkeys[sk_field_id];
1372             }
1373
1374             // non-merged metadata
1375             rec_md = record_metadata_init(se->nmem, (const char *) value,
1376                                           ser_md->type, n->properties);
1377             if (!rec_md)
1378             {
1379                 session_log(se, YLOG_WARN, "bad metadata data '%s' "
1380                             "for element '%s'", value, type);
1381                 continue;
1382             }
1383             wheretoput = &record->metadata[md_field_id];
1384             while (*wheretoput)
1385                 wheretoput = &(*wheretoput)->next;
1386             *wheretoput = rec_md;
1387
1388             // merged metadata
1389             rec_md = record_metadata_init(se->nmem, (const char *) value,
1390                                           ser_md->type, 0);
1391             wheretoput = &cluster->metadata[md_field_id];
1392
1393             // and polulate with data:
1394             // assign cluster or record based on merge action
1395             if (ser_md->merge == Metadata_merge_unique)
1396             {
1397                 struct record_metadata *mnode;
1398                 for (mnode = *wheretoput; mnode; mnode = mnode->next)
1399                     if (!strcmp((const char *) mnode->data.text.disp, 
1400                                 rec_md->data.text.disp))
1401                         break;
1402                 if (!mnode)
1403                 {
1404                     rec_md->next = *wheretoput;
1405                     *wheretoput = rec_md;
1406                 }
1407             }
1408             else if (ser_md->merge == Metadata_merge_longest)
1409             {
1410                 if (!*wheretoput 
1411                     || strlen(rec_md->data.text.disp) 
1412                     > strlen((*wheretoput)->data.text.disp))
1413                 {
1414                     *wheretoput = rec_md;
1415                     if (ser_sk)
1416                     {
1417                         const char *sort_str = 0;
1418                         int skip_article = 
1419                             ser_sk->type == Metadata_sortkey_skiparticle;
1420
1421                         if (!cluster->sortkeys[sk_field_id])
1422                             cluster->sortkeys[sk_field_id] = 
1423                                 nmem_malloc(se->nmem, 
1424                                             sizeof(union data_types));
1425                          
1426                         prt = pp2_relevance_tokenize(service->sort_pct);
1427
1428                         pp2_relevance_first(prt, rec_md->data.text.disp,
1429                                             skip_article);
1430
1431                         pp2_relevance_token_next(prt);
1432                          
1433                         sort_str = pp2_get_sort(prt);
1434                          
1435                         cluster->sortkeys[sk_field_id]->text.disp = 
1436                             rec_md->data.text.disp;
1437                         if (!sort_str)
1438                         {
1439                             sort_str = rec_md->data.text.disp;
1440                             session_log(se, YLOG_WARN, 
1441                                     "Could not make sortkey. Bug #1858");
1442                         }
1443                         cluster->sortkeys[sk_field_id]->text.sort = 
1444                             nmem_strdup(se->nmem, sort_str);
1445                         pp2_relevance_token_destroy(prt);
1446                     }
1447                 }
1448             }
1449             else if (ser_md->merge == Metadata_merge_all)
1450             {
1451                 rec_md->next = *wheretoput;
1452                 *wheretoput = rec_md;
1453             }
1454             else if (ser_md->merge == Metadata_merge_range)
1455             {
1456                 if (!*wheretoput)
1457                 {
1458                     *wheretoput = rec_md;
1459                     if (ser_sk)
1460                         cluster->sortkeys[sk_field_id] 
1461                             = &rec_md->data;
1462                 }
1463                 else
1464                 {
1465                     int this_min = rec_md->data.number.min;
1466                     int this_max = rec_md->data.number.max;
1467                     if (this_min < (*wheretoput)->data.number.min)
1468                         (*wheretoput)->data.number.min = this_min;
1469                     if (this_max > (*wheretoput)->data.number.max)
1470                         (*wheretoput)->data.number.max = this_max;
1471                 }
1472             }
1473
1474
1475             // ranking of _all_ fields enabled ... 
1476             if (ser_md->rank)
1477                 relevance_countwords(se->relevance, cluster, 
1478                                      (char *) value, ser_md->rank,
1479                                      ser_md->name);
1480
1481             // construct facets ... unless the client already has reported them
1482             if (ser_md->termlist && !client_has_facet(cl, (char *) type))
1483             {
1484
1485                 if (ser_md->type == Metadata_type_year)
1486                 {
1487                     char year[64];
1488                     sprintf(year, "%d", rec_md->data.number.max);
1489
1490                     add_facet(se, (char *) type, year, term_factor);
1491                     if (rec_md->data.number.max != rec_md->data.number.min)
1492                     {
1493                         sprintf(year, "%d", rec_md->data.number.min);
1494                         add_facet(se, (char *) type, year, term_factor);
1495                     }
1496                 }
1497                 else
1498                     add_facet(se, (char *) type, (char *) value, term_factor);
1499             }
1500
1501             // cleaning up
1502             xmlFree(type);
1503             xmlFree(value);
1504             type = value = 0;
1505         }
1506         else
1507         {
1508             if (se->number_of_warnings_unknown_elements == 0)
1509                 session_log(se, YLOG_WARN,
1510                         "Unexpected element in internal record: %s", n->name);
1511             se->number_of_warnings_unknown_elements++;
1512         }
1513     }
1514     if (type)
1515         xmlFree(type);
1516     if (value)
1517         xmlFree(value);
1518
1519     relevance_donerecord(se->relevance, cluster);
1520     se->total_records++;
1521
1522     return 0;
1523 }
1524
1525 void session_log(struct session *s, int level, const char *fmt, ...)
1526 {
1527     char buf[1024];
1528     va_list ap;
1529     va_start(ap, fmt);
1530
1531     yaz_vsnprintf(buf, sizeof(buf)-30, fmt, ap);
1532     yaz_log(level, "Session (%u): %s", s->session_id, buf);
1533
1534     va_end(ap);
1535 }
1536
1537 /*
1538  * Local variables:
1539  * c-basic-offset: 4
1540  * c-file-style: "Stroustrup"
1541  * indent-tabs-mode: nil
1542  * End:
1543  * vim: shiftwidth=4 tabstop=8 expandtab
1544  */
1545