client_destroy destroys ZOOM_resultset (bug #3489).
[pazpar2-moved-to-github.git] / src / session.c
1 /* This file is part of Pazpar2.
2    Copyright (C) 2006-2010 Index Data
3
4 Pazpar2 is free software; you can redistribute it and/or modify it under
5 the terms of the GNU General Public License as published by the Free
6 Software Foundation; either version 2, or (at your option) any later
7 version.
8
9 Pazpar2 is distributed in the hope that it will be useful, but WITHOUT ANY
10 WARRANTY; without even the implied warranty of MERCHANTABILITY or
11 FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
12 for more details.
13
14 You should have received a copy of the GNU General Public License
15 along with this program; if not, write to the Free Software
16 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
17
18 */
19
20 /** \file session.c
21     \brief high-level logic; mostly user sessions and settings
22 */
23
24 #if HAVE_CONFIG_H
25 #include <config.h>
26 #endif
27
28 #include <time.h>
29 #include <stdlib.h>
30 #include <stdio.h>
31 #include <string.h>
32 #if HAVE_SYS_TIME_H
33 #include <sys/time.h>
34 #endif
35 #if HAVE_UNISTD_H
36 #include <unistd.h>
37 #endif
38 #ifdef WIN32
39 #include <windows.h>
40 #endif
41 #include <signal.h>
42 #include <ctype.h>
43 #include <assert.h>
44
45 #include <yaz/marcdisp.h>
46 #include <yaz/comstack.h>
47 #include <yaz/tcpip.h>
48 #include <yaz/proto.h>
49 #include <yaz/readconf.h>
50 #include <yaz/pquery.h>
51 #include <yaz/otherinfo.h>
52 #include <yaz/yaz-util.h>
53 #include <yaz/nmem.h>
54 #include <yaz/query-charset.h>
55 #include <yaz/querytowrbuf.h>
56 #include <yaz/oid_db.h>
57 #include <yaz/snprintf.h>
58 #include <yaz/gettimeofday.h>
59
60 #define USE_TIMING 0
61 #if USE_TIMING
62 #include <yaz/timing.h>
63 #endif
64
65 #include "ppmutex.h"
66 #include "parameters.h"
67 #include "session.h"
68 #include "eventl.h"
69 #include "http.h"
70 #include "termlists.h"
71 #include "reclists.h"
72 #include "relevance.h"
73 #include "database.h"
74 #include "client.h"
75 #include "settings.h"
76 #include "normalize7bit.h"
77
78 #define TERMLIST_HIGH_SCORE 25
79
80 #define MAX_CHUNK 15
81
82 // Note: Some things in this structure will eventually move to configuration
83 struct parameters global_parameters = 
84 {
85     0,   // dump_records
86     0    // debug_mode
87 };
88
89 struct client_list {
90     struct client *client;
91     struct client_list *next;
92 };
93
94 static void log_xml_doc(xmlDoc *doc)
95 {
96     FILE *lf = yaz_log_file();
97     xmlChar *result = 0;
98     int len = 0;
99 #if LIBXML_VERSION >= 20600
100     xmlDocDumpFormatMemory(doc, &result, &len, 1);
101 #else
102     xmlDocDumpMemory(doc, &result, &len);
103 #endif
104     if (lf && len)
105     {
106         (void) fwrite(result, 1, len, lf);
107         fprintf(lf, "\n");
108     }
109     xmlFree(result);
110 }
111
112 static void session_enter(struct session *s)
113 {
114     yaz_mutex_enter(s->session_mutex);
115 }
116
117 static void session_leave(struct session *s)
118 {
119     yaz_mutex_leave(s->session_mutex);
120 }
121
122 // Recursively traverse query structure to extract terms.
123 void pull_terms(NMEM nmem, struct ccl_rpn_node *n, char **termlist, int *num)
124 {
125     char **words;
126     int numwords;
127     int i;
128
129     switch (n->kind)
130     {
131     case CCL_RPN_AND:
132     case CCL_RPN_OR:
133     case CCL_RPN_NOT:
134     case CCL_RPN_PROX:
135         pull_terms(nmem, n->u.p[0], termlist, num);
136         pull_terms(nmem, n->u.p[1], termlist, num);
137         break;
138     case CCL_RPN_TERM:
139         nmem_strsplit(nmem, " ", n->u.t.term, &words, &numwords);
140         for (i = 0; i < numwords; i++)
141             termlist[(*num)++] = words[i];
142         break;
143     default: // NOOP
144         break;
145     }
146 }
147
148
149 static void add_facet(struct session *s, const char *type, const char *value)
150 {
151     int i;
152
153     if (!*value)
154         return;
155     for (i = 0; i < s->num_termlists; i++)
156         if (!strcmp(s->termlists[i].name, type))
157             break;
158     if (i == s->num_termlists)
159     {
160         if (i == SESSION_MAX_TERMLISTS)
161         {
162             yaz_log(YLOG_FATAL, "Too many termlists");
163             return;
164         }
165
166         s->termlists[i].name = nmem_strdup(s->nmem, type);
167         s->termlists[i].termlist 
168             = termlist_create(s->nmem, TERMLIST_HIGH_SCORE);
169         s->num_termlists = i + 1;
170     }
171     termlist_insert(s->termlists[i].termlist, value);
172 }
173
174 static xmlDoc *record_to_xml(struct session_database *sdb, const char *rec)
175 {
176     struct database *db = sdb->database;
177     xmlDoc *rdoc = 0;
178
179     rdoc = xmlParseMemory(rec, strlen(rec));
180
181     if (!rdoc)
182     {
183         yaz_log(YLOG_FATAL, "Non-wellformed XML received from %s",
184                 db->url);
185         return 0;
186     }
187
188     if (global_parameters.dump_records)
189     {
190         yaz_log(YLOG_LOG, "Un-normalized record from %s", db->url);
191         log_xml_doc(rdoc);
192     }
193
194     return rdoc;
195 }
196
197 #define MAX_XSLT_ARGS 16
198
199 // Add static values from session database settings if applicable
200 static void insert_settings_parameters(struct session_database *sdb,
201                                        struct conf_service *service,
202                                        char **parms,
203                                        NMEM nmem)
204 {
205     int i;
206     int nparms = 0;
207     int offset = 0;
208
209     for (i = 0; i < service->num_metadata; i++)
210     {
211         struct conf_metadata *md = &service->metadata[i];
212         int setting;
213
214         if (md->setting == Metadata_setting_parameter &&
215             (setting = settings_lookup_offset(service, md->name)) >= 0)
216         {
217             const char *val = session_setting_oneval(sdb, setting);
218             if (val && nparms < MAX_XSLT_ARGS)
219             {
220                 char *buf;
221                 int len = strlen(val);
222                 buf = nmem_malloc(nmem, len + 3);
223                 buf[0] = '\'';
224                 strcpy(buf + 1, val);
225                 buf[len+1] = '\'';
226                 buf[len+2] = '\0';
227                 parms[offset++] = md->name;
228                 parms[offset++] = buf;
229                 nparms++;
230             }
231         }
232     }
233     parms[offset] = 0;
234 }
235
236 // Add static values from session database settings if applicable
237 static void insert_settings_values(struct session_database *sdb, xmlDoc *doc,
238     struct conf_service *service)
239 {
240     int i;
241
242     for (i = 0; i < service->num_metadata; i++)
243     {
244         struct conf_metadata *md = &service->metadata[i];
245         int offset;
246
247         if (md->setting == Metadata_setting_postproc &&
248             (offset = settings_lookup_offset(service, md->name)) >= 0)
249         {
250             const char *val = session_setting_oneval(sdb, offset);
251             if (val)
252             {
253                 xmlNode *r = xmlDocGetRootElement(doc);
254                 xmlNode *n = xmlNewTextChild(r, 0, (xmlChar *) "metadata",
255                                              (xmlChar *) val);
256                 xmlSetProp(n, (xmlChar *) "type", (xmlChar *) md->name);
257             }
258         }
259     }
260 }
261
262 static xmlDoc *normalize_record(struct session_database *sdb,
263                                 struct conf_service *service,
264                                 const char *rec, NMEM nmem)
265 {
266     xmlDoc *rdoc = record_to_xml(sdb, rec);
267
268     if (rdoc)
269     {
270         char *parms[MAX_XSLT_ARGS*2+1];
271         
272         insert_settings_parameters(sdb, service, parms, nmem);
273         
274         if (normalize_record_transform(sdb->map, &rdoc, (const char **)parms))
275         {
276             yaz_log(YLOG_WARN, "Normalize failed from %s", sdb->database->url);
277         }
278         else
279         {
280             insert_settings_values(sdb, rdoc, service);
281             
282             if (global_parameters.dump_records)
283             {
284                 yaz_log(YLOG_LOG, "Normalized record from %s", 
285                         sdb->database->url);
286                 log_xml_doc(rdoc);
287             }
288         }
289     }
290     return rdoc;
291 }
292
293 void session_settings_dump(struct session *se,
294                            struct session_database *db,
295                            WRBUF w)
296 {
297     if (db->settings)
298     {
299         int i, num = db->num_settings;
300         for (i = 0; i < num; i++)
301         {
302             struct setting *s = db->settings[i];
303             for (;s ; s = s->next)
304             {
305                 wrbuf_puts(w, "<set name=\"");
306                 wrbuf_xmlputs(w, s->name);
307                 wrbuf_puts(w, "\" value=\"");
308                 wrbuf_xmlputs(w, s->value);
309                 wrbuf_puts(w, "\"/>");
310             }
311             if (db->settings[i])
312                 wrbuf_puts(w, "\n");
313         }
314     }
315 }
316
317 // Retrieve first defined value for 'name' for given database.
318 // Will be extended to take into account user associated with session
319 const char *session_setting_oneval(struct session_database *db, int offset)
320 {
321     if (offset >= db->num_settings || !db->settings[offset])
322         return "";
323     return db->settings[offset]->value;
324 }
325
326 // Prepare XSLT stylesheets for record normalization
327 // Structures are allocated on the session_wide nmem to avoid having
328 // to recompute this for every search. This would lead
329 // to leaking if a single session was to repeatedly change the PZ_XSLT
330 // setting. However, this is not a realistic use scenario.
331 static int prepare_map(struct session *se, struct session_database *sdb)
332 {
333     const char *s;
334
335     if (!sdb->settings)
336     {
337         yaz_log(YLOG_WARN, "No settings on %s", sdb->database->url);
338         return -1;
339     }
340     if ((s = session_setting_oneval(sdb, PZ_XSLT)))
341     {
342         char auto_stylesheet[256];
343
344         if (!strcmp(s, "auto"))
345         {
346             const char *request_syntax = session_setting_oneval(
347                 sdb, PZ_REQUESTSYNTAX);
348             if (request_syntax)
349             {
350                 char *cp;
351                 yaz_snprintf(auto_stylesheet, sizeof(auto_stylesheet),
352                              "%s.xsl", request_syntax);
353                 for (cp = auto_stylesheet; *cp; cp++)
354                 {
355                     /* deliberately only consider ASCII */
356                     if (*cp > 32 && *cp < 127)
357                         *cp = tolower(*cp);
358                 }
359                 s = auto_stylesheet;
360             }
361             else
362             {
363                 yaz_log(YLOG_WARN, "No pz:requestsyntax for auto stylesheet");
364             }
365         }
366         sdb->map = normalize_cache_get(se->normalize_cache,
367                                        se->service->server->config, s);
368         if (!sdb->map)
369             return -1;
370     }
371     return 0;
372 }
373
374 // This analyzes settings and recomputes any supporting data structures
375 // if necessary.
376 static int prepare_session_database(struct session *se, 
377                                     struct session_database *sdb)
378 {
379     if (!sdb->settings)
380     {
381         yaz_log(YLOG_WARN, 
382                 "No settings associated with %s", sdb->database->url);
383         return -1;
384     }
385     if (sdb->settings[PZ_XSLT] && !sdb->map)
386     {
387         if (prepare_map(se, sdb) < 0)
388             return -1;
389     }
390     return 0;
391 }
392
393 // called if watch should be removed because http_channel is to be destroyed
394 static void session_watch_cancel(void *data, struct http_channel *c,
395                                  void *data2)
396 {
397     struct session_watchentry *ent = data;
398
399     ent->fun = 0;
400     ent->data = 0;
401     ent->obs = 0;
402 }
403
404 // set watch. Returns 0=OK, -1 if watch is already set
405 int session_set_watch(struct session *s, int what, 
406                       session_watchfun fun, void *data,
407                       struct http_channel *chan)
408 {
409     int ret;
410     session_enter(s);
411     if (s->watchlist[what].fun)
412         ret = -1;
413     else
414     {
415         
416         s->watchlist[what].fun = fun;
417         s->watchlist[what].data = data;
418         s->watchlist[what].obs = http_add_observer(chan, &s->watchlist[what],
419                                                    session_watch_cancel);
420         ret = 0;
421     }
422     session_leave(s);
423     return 0;
424 }
425
426 void session_alert_watch(struct session *s, int what)
427 {
428     assert(s);
429     session_enter(s);
430     if (s->watchlist[what].fun)
431     {
432         /* our watch is no longer associated with http_channel */
433         void *data;
434         session_watchfun fun;
435
436         http_remove_observer(s->watchlist[what].obs);
437         fun = s->watchlist[what].fun;
438         data = s->watchlist[what].data;
439
440         /* reset watch before fun is invoked - in case fun wants to set
441            it again */
442         s->watchlist[what].fun = 0;
443         s->watchlist[what].data = 0;
444         s->watchlist[what].obs = 0;
445
446         session_leave(s);
447         fun(data);
448     }
449     else
450         session_leave(s);
451 }
452
453 //callback for grep_databases
454 static void select_targets_callback(void *context, struct session_database *db)
455 {
456     struct session *se = (struct session*) context;
457     struct client *cl = client_create();
458     struct client_list *l;
459     client_set_database(cl, db);
460
461     client_set_session(cl, se);
462     l = xmalloc(sizeof(*l));
463     l->client = cl;
464     l->next = se->clients;
465     se->clients = l;
466 }
467
468 static void session_remove_clients(struct session *se)
469 {
470     struct client_list *l;
471
472     session_enter(se);
473     l = se->clients;
474     se->clients = 0;
475     session_leave(se);
476
477     while (l)
478     {
479         struct client_list *l_next = l->next;
480         client_lock(l->client);
481         client_set_session(l->client, 0);
482         client_set_database(l->client, 0);
483         client_unlock(l->client);
484         client_destroy(l->client);
485         xfree(l);
486         l = l_next;
487     }
488 }
489
490 // Associates a set of clients with a session;
491 // Note: Session-databases represent databases with per-session 
492 // setting overrides
493 static int select_targets(struct session *se, const char *filter)
494 {
495     return session_grep_databases(se, filter, select_targets_callback);
496 }
497
498 int session_active_clients(struct session *s)
499 {
500     struct client_list *l;
501     int res = 0;
502
503     for (l = s->clients; l; l = l->next)
504         if (client_is_active(l->client))
505             res++;
506
507     return res;
508 }
509
510
511 enum pazpar2_error_code search(struct session *se,
512                                const char *query,
513                                const char *startrecs, const char *maxrecs,
514                                const char *filter,
515                                const char **addinfo)
516 {
517     int live_channels = 0;
518     int no_working = 0;
519     int no_failed = 0;
520     struct client_list *l;
521     struct timeval tval;
522
523     yaz_log(YLOG_DEBUG, "Search");
524
525     *addinfo = 0;
526
527     session_remove_clients(se);
528     
529     session_enter(se);
530     reclist_destroy(se->reclist);
531     se->reclist = 0;
532     relevance_destroy(&se->relevance);
533     nmem_reset(se->nmem);
534     se->total_records = se->total_hits = se->total_merged = 0;
535     se->num_termlists = 0;
536     live_channels = select_targets(se, filter);
537     if (!live_channels)
538     {
539         session_leave(se);
540         return PAZPAR2_NO_TARGETS;
541     }
542     se->reclist = reclist_create(se->nmem);
543
544     yaz_gettimeofday(&tval);
545     
546     tval.tv_sec += 5;
547
548     for (l = se->clients; l; l = l->next)
549     {
550         struct client *cl = l->client;
551
552         if (maxrecs)
553             client_set_maxrecs(cl, atoi(maxrecs));
554         if (startrecs)
555             client_set_startrecs(cl, atoi(startrecs));
556         if (prepare_session_database(se, client_get_database(cl)) < 0)
557             ;
558         else if (client_parse_query(cl, query) < 0)
559             no_failed++;
560         else
561         {
562             no_working++;
563             if (client_prep_connection(cl, se->service->z3950_operation_timeout,
564                                        se->service->z3950_session_timeout,
565                                        se->service->server->iochan_man,
566                                        &tval))
567                 client_start_search(cl);
568         }
569     }
570     session_leave(se);
571     if (no_working == 0)
572     {
573         if (no_failed > 0)
574         {
575             *addinfo = "query";
576             return PAZPAR2_MALFORMED_PARAMETER_VALUE;
577         }
578         else
579             return PAZPAR2_NO_TARGETS;
580     }
581     return PAZPAR2_NO_ERROR;
582 }
583
584 // Creates a new session_database object for a database
585 static void session_init_databases_fun(void *context, struct database *db)
586 {
587     struct session *se = (struct session *) context;
588     struct session_database *new = nmem_malloc(se->session_nmem, sizeof(*new));
589     int i;
590
591     new->database = db;
592     
593     new->map = 0;
594     assert(db->settings);
595     new->settings = nmem_malloc(se->session_nmem,
596                                 sizeof(struct settings *) * db->num_settings);
597     new->num_settings = db->num_settings;
598     for (i = 0; i < db->num_settings; i++)
599     {
600         struct setting *setting = db->settings[i];
601         new->settings[i] = setting;
602     }
603     new->next = se->databases;
604     se->databases = new;
605 }
606
607 // Doesn't free memory associated with sdb -- nmem takes care of that
608 static void session_database_destroy(struct session_database *sdb)
609 {
610     sdb->map = 0;
611 }
612
613 // Initialize session_database list -- this represents this session's view
614 // of the database list -- subject to modification by the settings ws command
615 void session_init_databases(struct session *se)
616 {
617     se->databases = 0;
618     predef_grep_databases(se, se->service, session_init_databases_fun);
619 }
620
621 // Probably session_init_databases_fun should be refactored instead of
622 // called here.
623 static struct session_database *load_session_database(struct session *se, 
624                                                       char *id)
625 {
626     struct database *db = new_database(id, se->session_nmem);
627
628     resolve_database(se->service, db);
629
630     session_init_databases_fun((void*) se, db);
631
632     // New sdb is head of se->databases list
633     return se->databases;
634 }
635
636 // Find an existing session database. If not found, load it
637 static struct session_database *find_session_database(struct session *se, 
638                                                       char *id)
639 {
640     struct session_database *sdb;
641
642     for (sdb = se->databases; sdb; sdb = sdb->next)
643         if (!strcmp(sdb->database->url, id))
644             return sdb;
645     return load_session_database(se, id);
646 }
647
648 // Apply a session override to a database
649 void session_apply_setting(struct session *se, char *dbname, char *setting,
650                            char *value)
651 {
652     struct session_database *sdb = find_session_database(se, dbname);
653     struct conf_service *service = se->service;
654     struct setting *new = nmem_malloc(se->session_nmem, sizeof(*new));
655     int offset = settings_create_offset(service, setting);
656
657     expand_settings_array(&sdb->settings, &sdb->num_settings, offset,
658                           se->session_nmem);
659     new->precedence = 0;
660     new->target = dbname;
661     new->name = setting;
662     new->value = value;
663     new->next = sdb->settings[offset];
664     sdb->settings[offset] = new;
665
666     // Force later recompute of settings-driven data structures
667     // (happens when a search starts and client connections are prepared)
668     switch (offset)
669     {
670     case PZ_XSLT:
671         if (sdb->map)
672         {
673             sdb->map = 0;
674         }
675         break;
676     }
677 }
678
679 void destroy_session(struct session *se)
680 {
681     struct session_database *sdb;
682
683     yaz_log(YLOG_DEBUG, "%p Pazpar2 session destroy", se);
684     session_remove_clients(se);
685
686     for (sdb = se->databases; sdb; sdb = sdb->next)
687         session_database_destroy(sdb);
688     normalize_cache_destroy(se->normalize_cache);
689     relevance_destroy(&se->relevance);
690     reclist_destroy(se->reclist);
691     nmem_destroy(se->nmem);
692     service_destroy(se->service);
693     yaz_mutex_destroy(&se->session_mutex);
694     wrbuf_destroy(se->wrbuf);
695 }
696
697 struct session *new_session(NMEM nmem, struct conf_service *service,
698                             const char *name)
699 {
700     int i;
701     struct session *session = nmem_malloc(nmem, sizeof(*session));
702
703     yaz_log(YLOG_DEBUG, "%p New Pazpar2 session", session);
704
705     session->service = service;
706     session->relevance = 0;
707     session->total_hits = 0;
708     session->total_records = 0;
709     session->number_of_warnings_unknown_elements = 0;
710     session->number_of_warnings_unknown_metadata = 0;
711     session->num_termlists = 0;
712     session->reclist = 0;
713     session->clients = 0;
714     session->session_nmem = nmem;
715     session->nmem = nmem_create();
716     session->wrbuf = wrbuf_alloc();
717     session->databases = 0;
718     for (i = 0; i <= SESSION_WATCH_MAX; i++)
719     {
720         session->watchlist[i].data = 0;
721         session->watchlist[i].fun = 0;
722     }
723     session->normalize_cache = normalize_cache_create();
724     session->session_mutex = 0;
725     pazpar2_mutex_create(&session->session_mutex, name);
726
727     return session;
728 }
729
730 struct hitsbytarget *hitsbytarget(struct session *se, int *count, NMEM nmem)
731 {
732     struct hitsbytarget *res = 0;
733     struct client_list *l;
734     size_t sz = 0;
735
736     session_enter(se);
737     for (l = se->clients; l; l = l->next)
738         sz++;
739
740     res = nmem_malloc(nmem, sizeof(*res) * sz);
741     *count = 0;
742     for (l = se->clients; l; l = l->next)
743     {
744         struct client *cl = l->client;
745         WRBUF w = wrbuf_alloc();
746         const char *name = session_setting_oneval(client_get_database(cl),
747                                                   PZ_NAME);
748
749         res[*count].id = client_get_database(cl)->database->url;
750         res[*count].name = *name ? name : "Unknown";
751         res[*count].hits = client_get_hits(cl);
752         res[*count].records = client_get_num_records(cl);
753         res[*count].diagnostic = client_get_diagnostic(cl);
754         res[*count].state = client_get_state_str(cl);
755         res[*count].connected  = client_get_connection(cl) ? 1 : 0;
756         session_settings_dump(se, client_get_database(cl), w);
757         res[*count].settings_xml = w;
758         (*count)++;
759     }
760     session_leave(se);
761     return res;
762 }
763
764 struct termlist_score **termlist(struct session *se, const char *name, int *num)
765 {
766     int i;
767     struct termlist_score **tl = 0;
768
769     session_enter(se);
770     for (i = 0; i < se->num_termlists; i++)
771         if (!strcmp((const char *) se->termlists[i].name, name))
772         {
773             tl = termlist_highscore(se->termlists[i].termlist, num);
774             break;
775         }
776     session_leave(se);
777     return tl;
778 }
779
780 #ifdef MISSING_HEADERS
781 void report_nmem_stats(void)
782 {
783     size_t in_use, is_free;
784
785     nmem_get_memory_in_use(&in_use);
786     nmem_get_memory_free(&is_free);
787
788     yaz_log(YLOG_LOG, "nmem stat: use=%ld free=%ld", 
789             (long) in_use, (long) is_free);
790 }
791 #endif
792
793 struct record_cluster *show_single_start(struct session *se, const char *id,
794                                          struct record_cluster **prev_r,
795                                          struct record_cluster **next_r)
796 {
797     struct record_cluster *r;
798
799     session_enter(se);
800     reclist_enter(se->reclist);
801     *prev_r = 0;
802     *next_r = 0;
803     while ((r = reclist_read_record(se->reclist)))
804     {
805         if (!strcmp(r->recid, id))
806         {
807             *next_r = reclist_read_record(se->reclist);
808             break;
809         }
810         *prev_r = r;
811     }
812     reclist_leave(se->reclist);
813     if (!r)
814         session_leave(se);
815     return r;
816 }
817
818 void show_single_stop(struct session *se, struct record_cluster *rec)
819 {
820     session_leave(se);
821 }
822
823 struct record_cluster **show_range_start(struct session *se,
824                                          struct reclist_sortparms *sp, 
825                                          int start, int *num, int *total, Odr_int *sumhits)
826 {
827     struct record_cluster **recs;
828     struct reclist_sortparms *spp;
829     int i;
830 #if USE_TIMING    
831     yaz_timing_t t = yaz_timing_create();
832 #endif
833     session_enter(se);
834     recs = nmem_malloc(se->nmem, *num * sizeof(struct record_cluster *));
835     if (!se->relevance)
836     {
837         *num = 0;
838         *total = 0;
839         *sumhits = 0;
840         recs = 0;
841     }
842     else
843     {
844         for (spp = sp; spp; spp = spp->next)
845             if (spp->type == Metadata_sortkey_relevance)
846             {
847                 relevance_prepare_read(se->relevance, se->reclist);
848                 break;
849             }
850         reclist_sort(se->reclist, sp);
851         
852         reclist_enter(se->reclist);
853         *total = reclist_get_num_records(se->reclist);
854         *sumhits = se->total_hits;
855         
856         for (i = 0; i < start; i++)
857             if (!reclist_read_record(se->reclist))
858             {
859                 *num = 0;
860                 recs = 0;
861                 break;
862             }
863         
864         for (i = 0; i < *num; i++)
865         {
866             struct record_cluster *r = reclist_read_record(se->reclist);
867             if (!r)
868             {
869                 *num = i;
870                 break;
871             }
872             recs[i] = r;
873         }
874         reclist_leave(se->reclist);
875     }
876 #if USE_TIMING
877     yaz_timing_stop(t);
878     yaz_log(YLOG_LOG, "show %6.5f %3.2f %3.2f", 
879             yaz_timing_get_real(t), yaz_timing_get_user(t),
880             yaz_timing_get_sys(t));
881     yaz_timing_destroy(&t);
882 #endif
883     return recs;
884 }
885
886 void show_range_stop(struct session *se, struct record_cluster **recs)
887 {
888     session_leave(se);
889 }
890
891 void statistics(struct session *se, struct statistics *stat)
892 {
893     struct client_list *l;
894     int count = 0;
895
896     memset(stat, 0, sizeof(*stat));
897     for (l = se->clients; l; l = l->next)
898     {
899         struct client *cl = l->client;
900         if (!client_get_connection(cl))
901             stat->num_no_connection++;
902         switch (client_get_state(cl))
903         {
904         case Client_Connecting: stat->num_connecting++; break;
905         case Client_Working: stat->num_working++; break;
906         case Client_Idle: stat->num_idle++; break;
907         case Client_Failed: stat->num_failed++; break;
908         case Client_Error: stat->num_error++; break;
909         default: break;
910         }
911         count++;
912     }
913     stat->num_hits = se->total_hits;
914     stat->num_records = se->total_records;
915
916     stat->num_clients = count;
917 }
918
919 static struct record_metadata *record_metadata_init(
920     NMEM nmem, const char *value, enum conf_metadata_type type,
921     struct _xmlAttr *attr)
922 {
923     struct record_metadata *rec_md = record_metadata_create(nmem);
924     struct record_metadata_attr **attrp = &rec_md->attributes;
925     
926     for (; attr; attr = attr->next)
927     {
928         if (attr->children && attr->children->content)
929         {
930             if (strcmp((const char *) attr->name, "type"))
931             {  /* skip the "type" attribute.. Its value is already part of
932                   the element in output (md-%s) and so repeating it here
933                   is redundant */
934                 *attrp = nmem_malloc(nmem, sizeof(**attrp));
935                 (*attrp)->name =
936                     nmem_strdup(nmem, (const char *) attr->name);
937                 (*attrp)->value =
938                     nmem_strdup(nmem, (const char *) attr->children->content);
939                 attrp = &(*attrp)->next;
940             }
941         }
942     }
943     *attrp = 0;
944
945     if (type == Metadata_type_generic)
946     {
947         char *p = nmem_strdup(nmem, value);
948
949         p = normalize7bit_generic(p, " ,/.:([");
950         
951         rec_md->data.text.disp = p;
952         rec_md->data.text.sort = 0;
953     }
954     else if (type == Metadata_type_year || type == Metadata_type_date)
955     {
956         int first, last;
957         int longdate = 0;
958
959         if (type == Metadata_type_date)
960             longdate = 1;
961         if (extract7bit_dates((char *) value, &first, &last, longdate) < 0)
962             return 0;
963
964         rec_md->data.number.min = first;
965         rec_md->data.number.max = last;
966     }
967     else
968         return 0;
969     return rec_md;
970 }
971
972 static int get_mergekey_from_doc(xmlDoc *doc, xmlNode *root, const char *name,
973                                  struct conf_service *service, WRBUF norm_wr)
974 {
975     xmlNode *n;
976     int no_found = 0;
977     for (n = root->children; n; n = n->next)
978     {
979         if (n->type != XML_ELEMENT_NODE)
980             continue;
981         if (!strcmp((const char *) n->name, "metadata"))
982         {
983             xmlChar *type = xmlGetProp(n, (xmlChar *) "type");
984             if (type == NULL) {
985                 yaz_log(YLOG_FATAL, "Missing type attribute on metadata element. Skipping!");
986             }
987             else if (!strcmp(name, (const char *) type))
988             {
989                 xmlChar *value = xmlNodeListGetString(doc, n->children, 1);
990                 if (value)
991                 {
992                     const char *norm_str;
993                     pp2_relevance_token_t prt =
994                         pp2_relevance_tokenize(service->mergekey_pct);
995                     
996                     pp2_relevance_first(prt, (const char *) value, 0);
997                     if (wrbuf_len(norm_wr) > 0)
998                         wrbuf_puts(norm_wr, " ");
999                     wrbuf_puts(norm_wr, name);
1000                     while ((norm_str =
1001                             pp2_relevance_token_next(prt)))
1002                     {
1003                         if (*norm_str)
1004                         {
1005                             wrbuf_puts(norm_wr, " ");
1006                             wrbuf_puts(norm_wr, norm_str);
1007                         }
1008                     }
1009                     xmlFree(value);
1010                     pp2_relevance_token_destroy(prt);
1011                     no_found++;
1012                 }
1013             }
1014             xmlFree(type);
1015         }
1016     }
1017     return no_found;
1018 }
1019
1020 static const char *get_mergekey(xmlDoc *doc, struct client *cl, int record_no,
1021                                 struct conf_service *service, NMEM nmem)
1022 {
1023     char *mergekey_norm = 0;
1024     xmlNode *root = xmlDocGetRootElement(doc);
1025     WRBUF norm_wr = wrbuf_alloc();
1026
1027     /* consider mergekey from XSL first */
1028     xmlChar *mergekey = xmlGetProp(root, (xmlChar *) "mergekey");
1029     if (mergekey)
1030     {
1031         const char *norm_str;
1032         pp2_relevance_token_t prt =
1033             pp2_relevance_tokenize(service->mergekey_pct);
1034
1035         pp2_relevance_first(prt, (const char *) mergekey, 0);
1036         while ((norm_str = pp2_relevance_token_next(prt)))
1037         {
1038             if (*norm_str)
1039             {
1040                 if (wrbuf_len(norm_wr))
1041                     wrbuf_puts(norm_wr, " ");
1042                 wrbuf_puts(norm_wr, norm_str);
1043             }
1044         }
1045         pp2_relevance_token_destroy(prt);
1046         xmlFree(mergekey);
1047     }
1048     else
1049     {
1050         /* no mergekey defined in XSL. Look for mergekey metadata instead */
1051         int field_id;
1052         for (field_id = 0; field_id < service->num_metadata; field_id++)
1053         {
1054             struct conf_metadata *ser_md = &service->metadata[field_id];
1055             if (ser_md->mergekey != Metadata_mergekey_no)
1056             {
1057                 int r = get_mergekey_from_doc(doc, root, ser_md->name,
1058                                               service, norm_wr);
1059                 if (r == 0 && ser_md->mergekey == Metadata_mergekey_required)
1060                 {
1061                     /* no mergekey on this one and it is required.. 
1062                        Generate unique key instead */
1063                     wrbuf_rewind(norm_wr);
1064                     break;
1065                 }
1066             }
1067         }
1068     }
1069
1070     /* generate unique key if none is not generated already or is empty */
1071     if (wrbuf_len(norm_wr) == 0)
1072     {
1073         wrbuf_printf(norm_wr, "%s-%d",
1074                      client_get_database(cl)->database->url, record_no);
1075     }
1076     if (wrbuf_len(norm_wr) > 0)
1077         mergekey_norm = nmem_strdup(nmem, wrbuf_cstr(norm_wr));
1078     wrbuf_destroy(norm_wr);
1079     return mergekey_norm;
1080 }
1081
1082 /** \brief see if metadata for pz:recordfilter exists 
1083     \param root xml root element of normalized record
1084     \param sdb session database for client
1085     \retval 0 if there is no metadata for pz:recordfilter
1086     \retval 1 if there is metadata for pz:recordfilter
1087
1088     If there is no pz:recordfilter defined, this function returns 1
1089     as well.
1090 */
1091     
1092 static int check_record_filter(xmlNode *root, struct session_database *sdb)
1093 {
1094     int match = 0;
1095     xmlNode *n;
1096     const char *s;
1097     s = session_setting_oneval(sdb, PZ_RECORDFILTER);
1098
1099     if (!s || !*s)
1100         return 1;
1101
1102     for (n = root->children; n; n = n->next)
1103     {
1104         if (n->type != XML_ELEMENT_NODE)
1105             continue;
1106         if (!strcmp((const char *) n->name, "metadata"))
1107         {
1108             xmlChar *type = xmlGetProp(n, (xmlChar *) "type");
1109             if (type)
1110             {
1111                 size_t len;
1112                 const char *eq = strchr(s, '~');
1113                 if (eq)
1114                     len = eq - s;
1115                 else
1116                     len = strlen(s);
1117                 if (len == strlen((const char *)type) &&
1118                     !memcmp((const char *) type, s, len))
1119                 {
1120                     xmlChar *value = xmlNodeGetContent(n);
1121                     if (value && *value)
1122                     {
1123                         if (!eq || strstr((const char *) value, eq+1))
1124                             match = 1;
1125                     }
1126                     xmlFree(value);
1127                 }
1128                 xmlFree(type);
1129             }
1130         }
1131     }
1132     return match;
1133 }
1134
1135
1136 static int ingest_to_cluster(struct client *cl,
1137                              xmlDoc *xdoc,
1138                              xmlNode *root,
1139                              int record_no,
1140                              const char *mergekey_norm);
1141
1142 /** \brief ingest XML record
1143     \param cl client holds the result set for record
1144     \param rec record buffer (0 terminated)
1145     \param record_no record position (1, 2, ..)
1146     \param nmem working NMEM
1147     \retval 0 OK
1148     \retval -1 failure
1149 */
1150 int ingest_record(struct client *cl, const char *rec,
1151                   int record_no, NMEM nmem)
1152 {
1153     struct session *se = client_get_session(cl);
1154     int ret = 0;
1155     struct session_database *sdb = client_get_database(cl);
1156     struct conf_service *service = se->service;
1157     xmlDoc *xdoc = normalize_record(sdb, service, rec, nmem);
1158     xmlNode *root;
1159     const char *mergekey_norm;
1160     
1161     if (!xdoc)
1162         return -1;
1163     
1164     root = xmlDocGetRootElement(xdoc);
1165     
1166     if (!check_record_filter(root, sdb))
1167     {
1168         yaz_log(YLOG_WARN, "Filtered out record no %d from %s", record_no,
1169                 sdb->database->url);
1170         xmlFreeDoc(xdoc);
1171         return -1;
1172     }
1173     
1174     mergekey_norm = get_mergekey(xdoc, cl, record_no, service, nmem);
1175     if (!mergekey_norm)
1176     {
1177         yaz_log(YLOG_WARN, "Got no mergekey");
1178         xmlFreeDoc(xdoc);
1179         return -1;
1180     }
1181     session_enter(se);
1182     if (client_get_session(cl) == se)
1183         ret = ingest_to_cluster(cl, xdoc, root, record_no, mergekey_norm);
1184     session_leave(se);
1185     
1186     xmlFreeDoc(xdoc);
1187     return ret;
1188 }
1189
1190 static int ingest_to_cluster(struct client *cl,
1191                              xmlDoc *xdoc,
1192                              xmlNode *root,
1193                              int record_no,
1194                              const char *mergekey_norm)
1195 {
1196     xmlNode *n;
1197     xmlChar *type = 0;
1198     xmlChar *value = 0;
1199     struct session_database *sdb = client_get_database(cl);
1200     struct session *se = client_get_session(cl);
1201     struct conf_service *service = se->service;
1202     struct record *record = record_create(se->nmem, 
1203                                           service->num_metadata,
1204                                           service->num_sortkeys, cl,
1205                                           record_no);
1206     struct record_cluster *cluster = reclist_insert(se->reclist,
1207                                                     service, 
1208                                                     record,
1209                                                     mergekey_norm,
1210                                                     &se->total_merged);
1211     if (!cluster)
1212         return -1;
1213     if (global_parameters.dump_records)
1214         yaz_log(YLOG_LOG, "Cluster id %s from %s (#%d)", cluster->recid,
1215                 sdb->database->url, record_no);
1216     relevance_newrec(se->relevance, cluster);
1217     
1218     // now parsing XML record and adding data to cluster or record metadata
1219     for (n = root->children; n; n = n->next)
1220     {
1221         pp2_relevance_token_t prt;
1222         if (type)
1223             xmlFree(type);
1224         if (value)
1225             xmlFree(value);
1226         type = value = 0;
1227         
1228         if (n->type != XML_ELEMENT_NODE)
1229             continue;
1230         if (!strcmp((const char *) n->name, "metadata"))
1231         {
1232             struct conf_metadata *ser_md = 0;
1233             struct conf_sortkey *ser_sk = 0;
1234             struct record_metadata **wheretoput = 0;
1235             struct record_metadata *rec_md = 0;
1236             int md_field_id = -1;
1237             int sk_field_id = -1;
1238             
1239             type = xmlGetProp(n, (xmlChar *) "type");
1240             value = xmlNodeListGetString(xdoc, n->children, 1);
1241             
1242             if (!type || !value || !*value)
1243                 continue;
1244             
1245             md_field_id 
1246                 = conf_service_metadata_field_id(service, (const char *) type);
1247             if (md_field_id < 0)
1248             {
1249                 if (se->number_of_warnings_unknown_metadata == 0)
1250                 {
1251                     yaz_log(YLOG_WARN, 
1252                             "Ignoring unknown metadata element: %s", type);
1253                 }
1254                 se->number_of_warnings_unknown_metadata++;
1255                 continue;
1256             }
1257             
1258             ser_md = &service->metadata[md_field_id];
1259             
1260             if (ser_md->sortkey_offset >= 0){
1261                 sk_field_id = ser_md->sortkey_offset;
1262                 ser_sk = &service->sortkeys[sk_field_id];
1263             }
1264
1265             // non-merged metadata
1266             rec_md = record_metadata_init(se->nmem, (const char *) value,
1267                                           ser_md->type, n->properties);
1268             if (!rec_md)
1269             {
1270                 yaz_log(YLOG_WARN, "bad metadata data '%s' for element '%s'",
1271                         value, type);
1272                 continue;
1273             }
1274             wheretoput = &record->metadata[md_field_id];
1275             while (*wheretoput)
1276                 wheretoput = &(*wheretoput)->next;
1277             *wheretoput = rec_md;
1278
1279             // merged metadata
1280             rec_md = record_metadata_init(se->nmem, (const char *) value,
1281                                           ser_md->type, 0);
1282             wheretoput = &cluster->metadata[md_field_id];
1283
1284             // and polulate with data:
1285             // assign cluster or record based on merge action
1286             if (ser_md->merge == Metadata_merge_unique)
1287             {
1288                 struct record_metadata *mnode;
1289                 for (mnode = *wheretoput; mnode; mnode = mnode->next)
1290                     if (!strcmp((const char *) mnode->data.text.disp, 
1291                                 rec_md->data.text.disp))
1292                         break;
1293                 if (!mnode)
1294                 {
1295                     rec_md->next = *wheretoput;
1296                     *wheretoput = rec_md;
1297                 }
1298             }
1299             else if (ser_md->merge == Metadata_merge_longest)
1300             {
1301                 if (!*wheretoput 
1302                     || strlen(rec_md->data.text.disp) 
1303                     > strlen((*wheretoput)->data.text.disp))
1304                 {
1305                     *wheretoput = rec_md;
1306                     if (ser_sk)
1307                     {
1308                         const char *sort_str = 0;
1309                         int skip_article = 
1310                             ser_sk->type == Metadata_sortkey_skiparticle;
1311
1312                         if (!cluster->sortkeys[sk_field_id])
1313                             cluster->sortkeys[sk_field_id] = 
1314                                 nmem_malloc(se->nmem, 
1315                                             sizeof(union data_types));
1316                          
1317                         prt = pp2_relevance_tokenize(service->sort_pct);
1318
1319                         pp2_relevance_first(prt, rec_md->data.text.disp,
1320                                             skip_article);
1321
1322                         pp2_relevance_token_next(prt);
1323                          
1324                         sort_str = pp2_get_sort(prt);
1325                          
1326                         cluster->sortkeys[sk_field_id]->text.disp = 
1327                             rec_md->data.text.disp;
1328                         if (!sort_str)
1329                         {
1330                             sort_str = rec_md->data.text.disp;
1331                             yaz_log(YLOG_WARN, 
1332                                     "Could not make sortkey. Bug #1858");
1333                         }
1334                         cluster->sortkeys[sk_field_id]->text.sort = 
1335                             nmem_strdup(se->nmem, sort_str);
1336 #if 0
1337                         yaz_log(YLOG_LOG, "text disp=%s",
1338                                 cluster->sortkeys[sk_field_id]->text.disp);
1339                         yaz_log(YLOG_LOG, "text sort=%s",
1340                                 cluster->sortkeys[sk_field_id]->text.sort);
1341 #endif
1342                         pp2_relevance_token_destroy(prt);
1343                     }
1344                 }
1345             }
1346             else if (ser_md->merge == Metadata_merge_all)
1347             {
1348                 rec_md->next = *wheretoput;
1349                 *wheretoput = rec_md;
1350             }
1351             else if (ser_md->merge == Metadata_merge_range)
1352             {
1353                 if (!*wheretoput)
1354                 {
1355                     *wheretoput = rec_md;
1356                     if (ser_sk)
1357                         cluster->sortkeys[sk_field_id] 
1358                             = &rec_md->data;
1359                 }
1360                 else
1361                 {
1362                     int this_min = rec_md->data.number.min;
1363                     int this_max = rec_md->data.number.max;
1364                     if (this_min < (*wheretoput)->data.number.min)
1365                         (*wheretoput)->data.number.min = this_min;
1366                     if (this_max > (*wheretoput)->data.number.max)
1367                         (*wheretoput)->data.number.max = this_max;
1368                 }
1369             }
1370
1371
1372             // ranking of _all_ fields enabled ... 
1373             if (ser_md->rank)
1374                 relevance_countwords(se->relevance, cluster, 
1375                                      (char *) value, ser_md->rank,
1376                                      ser_md->name);
1377
1378             // construct facets ... 
1379             if (ser_md->termlist)
1380             {
1381                 if (ser_md->type == Metadata_type_year)
1382                 {
1383                     char year[64];
1384                     sprintf(year, "%d", rec_md->data.number.max);
1385                     add_facet(se, (char *) type, year);
1386                     if (rec_md->data.number.max != rec_md->data.number.min)
1387                     {
1388                         sprintf(year, "%d", rec_md->data.number.min);
1389                         add_facet(se, (char *) type, year);
1390                     }
1391                 }
1392                 else
1393                     add_facet(se, (char *) type, (char *) value);
1394             }
1395
1396             // cleaning up
1397             xmlFree(type);
1398             xmlFree(value);
1399             type = value = 0;
1400         }
1401         else
1402         {
1403             if (se->number_of_warnings_unknown_elements == 0)
1404                 yaz_log(YLOG_WARN,
1405                         "Unexpected element in internal record: %s", n->name);
1406             se->number_of_warnings_unknown_elements++;
1407         }
1408     }
1409     if (type)
1410         xmlFree(type);
1411     if (value)
1412         xmlFree(value);
1413
1414     relevance_donerecord(se->relevance, cluster);
1415     se->total_records++;
1416
1417     return 0;
1418 }
1419
1420 /*
1421  * Local variables:
1422  * c-basic-offset: 4
1423  * c-file-style: "Stroustrup"
1424  * indent-tabs-mode: nil
1425  * End:
1426  * vim: shiftwidth=4 tabstop=8 expandtab
1427  */
1428