New block: Wait for preferred targets to be "ready"
[pazpar2-moved-to-github.git] / src / session.c
1 /* This file is part of Pazpar2.
2    Copyright (C) 2006-2010 Index Data
3
4 Pazpar2 is free software; you can redistribute it and/or modify it under
5 the terms of the GNU General Public License as published by the Free
6 Software Foundation; either version 2, or (at your option) any later
7 version.
8
9 Pazpar2 is distributed in the hope that it will be useful, but WITHOUT ANY
10 WARRANTY; without even the implied warranty of MERCHANTABILITY or
11 FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
12 for more details.
13
14 You should have received a copy of the GNU General Public License
15 along with this program; if not, write to the Free Software
16 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
17
18 */
19
20 /** \file session.c
21     \brief high-level logic; mostly user sessions and settings
22 */
23
24 #if HAVE_CONFIG_H
25 #include <config.h>
26 #endif
27
28 #include <time.h>
29 #include <stdlib.h>
30 #include <stdio.h>
31 #include <string.h>
32 #if HAVE_SYS_TIME_H
33 #include <sys/time.h>
34 #endif
35 #if HAVE_UNISTD_H
36 #include <unistd.h>
37 #endif
38 #ifdef WIN32
39 #include <windows.h>
40 #endif
41 #include <signal.h>
42 #include <ctype.h>
43 #include <assert.h>
44
45 #include <yaz/marcdisp.h>
46 #include <yaz/comstack.h>
47 #include <yaz/tcpip.h>
48 #include <yaz/proto.h>
49 #include <yaz/readconf.h>
50 #include <yaz/pquery.h>
51 #include <yaz/otherinfo.h>
52 #include <yaz/yaz-util.h>
53 #include <yaz/nmem.h>
54 #include <yaz/query-charset.h>
55 #include <yaz/querytowrbuf.h>
56 #include <yaz/oid_db.h>
57 #include <yaz/snprintf.h>
58 #include <yaz/gettimeofday.h>
59
60 #define USE_TIMING 0
61 #if USE_TIMING
62 #include <yaz/timing.h>
63 #endif
64
65 #include "ppmutex.h"
66 #include "parameters.h"
67 #include "session.h"
68 #include "eventl.h"
69 #include "http.h"
70 #include "termlists.h"
71 #include "reclists.h"
72 #include "relevance.h"
73 #include "database.h"
74 #include "client.h"
75 #include "settings.h"
76 #include "normalize7bit.h"
77
78 #define TERMLIST_HIGH_SCORE 25
79
80 #define MAX_CHUNK 15
81
82 // Note: Some things in this structure will eventually move to configuration
83 struct parameters global_parameters = 
84 {
85     0,   // dump_records
86     0    // debug_mode
87 };
88
89 struct client_list {
90     struct client *client;
91     struct client_list *next;
92 };
93
94 static void log_xml_doc(xmlDoc *doc)
95 {
96     FILE *lf = yaz_log_file();
97     xmlChar *result = 0;
98     int len = 0;
99 #if LIBXML_VERSION >= 20600
100     xmlDocDumpFormatMemory(doc, &result, &len, 1);
101 #else
102     xmlDocDumpMemory(doc, &result, &len);
103 #endif
104     if (lf && len)
105     {
106         (void) fwrite(result, 1, len, lf);
107         fprintf(lf, "\n");
108     }
109     xmlFree(result);
110 }
111
112 static void session_enter(struct session *s)
113 {
114     yaz_mutex_enter(s->session_mutex);
115 }
116
117 static void session_leave(struct session *s)
118 {
119     yaz_mutex_leave(s->session_mutex);
120 }
121
122 // Recursively traverse query structure to extract terms.
123 void pull_terms(NMEM nmem, struct ccl_rpn_node *n, char **termlist, int *num)
124 {
125     char **words;
126     int numwords;
127     int i;
128
129     switch (n->kind)
130     {
131     case CCL_RPN_AND:
132     case CCL_RPN_OR:
133     case CCL_RPN_NOT:
134     case CCL_RPN_PROX:
135         pull_terms(nmem, n->u.p[0], termlist, num);
136         pull_terms(nmem, n->u.p[1], termlist, num);
137         break;
138     case CCL_RPN_TERM:
139         nmem_strsplit(nmem, " ", n->u.t.term, &words, &numwords);
140         for (i = 0; i < numwords; i++)
141             termlist[(*num)++] = words[i];
142         break;
143     default: // NOOP
144         break;
145     }
146 }
147
148
149 void add_facet(struct session *s, const char *type, const char *value, int count)
150 {
151     int i;
152
153     if (!*value)
154         return;
155     for (i = 0; i < s->num_termlists; i++)
156         if (!strcmp(s->termlists[i].name, type))
157             break;
158     if (i == s->num_termlists)
159     {
160         if (i == SESSION_MAX_TERMLISTS)
161         {
162             yaz_log(YLOG_FATAL, "Too many termlists");
163             return;
164         }
165
166         s->termlists[i].name = nmem_strdup(s->nmem, type);
167         s->termlists[i].termlist 
168             = termlist_create(s->nmem, TERMLIST_HIGH_SCORE);
169         s->num_termlists = i + 1;
170     }
171     yaz_log(YLOG_DEBUG, "Session: facets for %s: %s (%d)", type, value, count);
172
173     termlist_insert(s->termlists[i].termlist, value, count);
174 }
175
176 static xmlDoc *record_to_xml(struct session_database *sdb, const char *rec)
177 {
178     struct database *db = sdb->database;
179     xmlDoc *rdoc = 0;
180
181     rdoc = xmlParseMemory(rec, strlen(rec));
182
183     if (!rdoc)
184     {
185         yaz_log(YLOG_FATAL, "Non-wellformed XML received from %s",
186                 db->url);
187         return 0;
188     }
189
190     if (global_parameters.dump_records)
191     {
192         yaz_log(YLOG_LOG, "Un-normalized record from %s", db->url);
193         log_xml_doc(rdoc);
194     }
195
196     return rdoc;
197 }
198
199 #define MAX_XSLT_ARGS 16
200
201 // Add static values from session database settings if applicable
202 static void insert_settings_parameters(struct session_database *sdb,
203                                        struct conf_service *service,
204                                        char **parms,
205                                        NMEM nmem)
206 {
207     int i;
208     int nparms = 0;
209     int offset = 0;
210
211     for (i = 0; i < service->num_metadata; i++)
212     {
213         struct conf_metadata *md = &service->metadata[i];
214         int setting;
215
216         if (md->setting == Metadata_setting_parameter &&
217             (setting = settings_lookup_offset(service, md->name)) >= 0)
218         {
219             const char *val = session_setting_oneval(sdb, setting);
220             if (val && nparms < MAX_XSLT_ARGS)
221             {
222                 char *buf;
223                 int len = strlen(val);
224                 buf = nmem_malloc(nmem, len + 3);
225                 buf[0] = '\'';
226                 strcpy(buf + 1, val);
227                 buf[len+1] = '\'';
228                 buf[len+2] = '\0';
229                 parms[offset++] = md->name;
230                 parms[offset++] = buf;
231                 nparms++;
232             }
233         }
234     }
235     parms[offset] = 0;
236 }
237
238 // Add static values from session database settings if applicable
239 static void insert_settings_values(struct session_database *sdb, xmlDoc *doc,
240     struct conf_service *service)
241 {
242     int i;
243
244     for (i = 0; i < service->num_metadata; i++)
245     {
246         struct conf_metadata *md = &service->metadata[i];
247         int offset;
248
249         if (md->setting == Metadata_setting_postproc &&
250             (offset = settings_lookup_offset(service, md->name)) >= 0)
251         {
252             const char *val = session_setting_oneval(sdb, offset);
253             if (val)
254             {
255                 xmlNode *r = xmlDocGetRootElement(doc);
256                 xmlNode *n = xmlNewTextChild(r, 0, (xmlChar *) "metadata",
257                                              (xmlChar *) val);
258                 xmlSetProp(n, (xmlChar *) "type", (xmlChar *) md->name);
259             }
260         }
261     }
262 }
263
264 static xmlDoc *normalize_record(struct session_database *sdb,
265                                 struct conf_service *service,
266                                 const char *rec, NMEM nmem)
267 {
268     xmlDoc *rdoc = record_to_xml(sdb, rec);
269
270     if (rdoc)
271     {
272         char *parms[MAX_XSLT_ARGS*2+1];
273         
274         insert_settings_parameters(sdb, service, parms, nmem);
275         
276         if (normalize_record_transform(sdb->map, &rdoc, (const char **)parms))
277         {
278             yaz_log(YLOG_WARN, "Normalize failed from %s", sdb->database->url);
279         }
280         else
281         {
282             insert_settings_values(sdb, rdoc, service);
283             
284             if (global_parameters.dump_records)
285             {
286                 yaz_log(YLOG_LOG, "Normalized record from %s", 
287                         sdb->database->url);
288                 log_xml_doc(rdoc);
289             }
290         }
291     }
292     return rdoc;
293 }
294
295 void session_settings_dump(struct session *se,
296                            struct session_database *db,
297                            WRBUF w)
298 {
299     if (db->settings)
300     {
301         int i, num = db->num_settings;
302         for (i = 0; i < num; i++)
303         {
304             struct setting *s = db->settings[i];
305             for (;s ; s = s->next)
306             {
307                 wrbuf_puts(w, "<set name=\"");
308                 wrbuf_xmlputs(w, s->name);
309                 wrbuf_puts(w, "\" value=\"");
310                 wrbuf_xmlputs(w, s->value);
311                 wrbuf_puts(w, "\"/>");
312             }
313             if (db->settings[i])
314                 wrbuf_puts(w, "\n");
315         }
316     }
317 }
318
319 // Retrieve first defined value for 'name' for given database.
320 // Will be extended to take into account user associated with session
321 const char *session_setting_oneval(struct session_database *db, int offset)
322 {
323     if (offset >= db->num_settings || !db->settings[offset])
324         return "";
325     return db->settings[offset]->value;
326 }
327
328 // Prepare XSLT stylesheets for record normalization
329 // Structures are allocated on the session_wide nmem to avoid having
330 // to recompute this for every search. This would lead
331 // to leaking if a single session was to repeatedly change the PZ_XSLT
332 // setting. However, this is not a realistic use scenario.
333 static int prepare_map(struct session *se, struct session_database *sdb)
334 {
335     const char *s;
336
337     if (!sdb->settings)
338     {
339         yaz_log(YLOG_WARN, "No settings on %s", sdb->database->url);
340         return -1;
341     }
342     if ((s = session_setting_oneval(sdb, PZ_XSLT)))
343     {
344         char auto_stylesheet[256];
345
346         if (!strcmp(s, "auto"))
347         {
348             const char *request_syntax = session_setting_oneval(
349                 sdb, PZ_REQUESTSYNTAX);
350             if (request_syntax)
351             {
352                 char *cp;
353                 yaz_snprintf(auto_stylesheet, sizeof(auto_stylesheet),
354                              "%s.xsl", request_syntax);
355                 for (cp = auto_stylesheet; *cp; cp++)
356                 {
357                     /* deliberately only consider ASCII */
358                     if (*cp > 32 && *cp < 127)
359                         *cp = tolower(*cp);
360                 }
361                 s = auto_stylesheet;
362             }
363             else
364             {
365                 yaz_log(YLOG_WARN, "No pz:requestsyntax for auto stylesheet");
366             }
367         }
368         sdb->map = normalize_cache_get(se->normalize_cache,
369                                        se->service->server->config, s);
370         if (!sdb->map)
371             return -1;
372     }
373     return 0;
374 }
375
376 // This analyzes settings and recomputes any supporting data structures
377 // if necessary.
378 static int prepare_session_database(struct session *se, 
379                                     struct session_database *sdb)
380 {
381     if (!sdb->settings)
382     {
383         yaz_log(YLOG_WARN, 
384                 "No settings associated with %s", sdb->database->url);
385         return -1;
386     }
387     if (sdb->settings[PZ_XSLT] && !sdb->map)
388     {
389         if (prepare_map(se, sdb) < 0)
390             return -1;
391     }
392     return 0;
393 }
394
395 // called if watch should be removed because http_channel is to be destroyed
396 static void session_watch_cancel(void *data, struct http_channel *c,
397                                  void *data2)
398 {
399     struct session_watchentry *ent = data;
400
401     ent->fun = 0;
402     ent->data = 0;
403     ent->obs = 0;
404 }
405
406 // set watch. Returns 0=OK, -1 if watch is already set
407 int session_set_watch(struct session *s, int what, 
408                       session_watchfun fun, void *data,
409                       struct http_channel *chan)
410 {
411     int ret;
412     session_enter(s);
413     if (s->watchlist[what].fun)
414         ret = -1;
415     else
416     {
417         
418         s->watchlist[what].fun = fun;
419         s->watchlist[what].data = data;
420         s->watchlist[what].obs = http_add_observer(chan, &s->watchlist[what],
421                                                    session_watch_cancel);
422         ret = 0;
423     }
424     session_leave(s);
425     return 0;
426 }
427
428 void session_alert_watch(struct session *s, int what)
429 {
430     assert(s);
431     session_enter(s);
432     if (s->watchlist[what].fun)
433     {
434         /* our watch is no longer associated with http_channel */
435         void *data;
436         session_watchfun fun;
437
438         http_remove_observer(s->watchlist[what].obs);
439         fun  = s->watchlist[what].fun;
440         data = s->watchlist[what].data;
441
442         /* reset watch before fun is invoked - in case fun wants to set
443            it again */
444         s->watchlist[what].fun = 0;
445         s->watchlist[what].data = 0;
446         s->watchlist[what].obs = 0;
447
448         session_leave(s);
449         fun(data);
450     }
451     else
452         session_leave(s);
453 }
454
455 //callback for grep_databases
456 static void select_targets_callback(void *context, struct session_database *db)
457 {
458     struct session *se = (struct session*) context;
459     struct client *cl = client_create();
460     struct client_list *l;
461     client_set_database(cl, db);
462
463     client_set_session(cl, se);
464     l = xmalloc(sizeof(*l));
465     l->client = cl;
466     l->next = se->clients;
467     se->clients = l;
468 }
469
470 static void session_remove_clients(struct session *se)
471 {
472     struct client_list *l;
473
474     session_enter(se);
475     l = se->clients;
476     se->clients = 0;
477     session_leave(se);
478
479     while (l)
480     {
481         struct client_list *l_next = l->next;
482         client_lock(l->client);
483         client_set_session(l->client, 0);
484         client_set_database(l->client, 0);
485         client_unlock(l->client);
486         client_destroy(l->client);
487         xfree(l);
488         l = l_next;
489     }
490 }
491
492 // Associates a set of clients with a session;
493 // Note: Session-databases represent databases with per-session 
494 // setting overrides
495 static int select_targets(struct session *se, const char *filter)
496 {
497     return session_grep_databases(se, filter, select_targets_callback);
498 }
499
500 int session_active_clients(struct session *s)
501 {
502     struct client_list *l;
503     int res = 0;
504
505     for (l = s->clients; l; l = l->next)
506         if (client_is_active(l->client))
507             res++;
508
509     return res;
510 }
511
512 int session_preferred_clients_ready(struct session *s)
513 {
514     struct client_list *l;
515     int res = 0;
516
517     for (l = s->clients; l; l = l->next)
518         if (client_is_active_preferred(l->client))
519             res++;
520
521     return res == 0;
522 }
523
524
525
526 enum pazpar2_error_code search(struct session *se,
527                                const char *query,
528                                const char *startrecs, const char *maxrecs,
529                                const char *filter,
530                                const char **addinfo)
531 {
532     int live_channels = 0;
533     int no_working = 0;
534     int no_failed = 0;
535     struct client_list *l;
536     struct timeval tval;
537
538     yaz_log(YLOG_DEBUG, "Search");
539
540     *addinfo = 0;
541
542     session_remove_clients(se);
543     
544     session_enter(se);
545     reclist_destroy(se->reclist);
546     se->reclist = 0;
547     relevance_destroy(&se->relevance);
548     nmem_reset(se->nmem);
549     se->total_records = se->total_hits = se->total_merged = 0;
550     se->num_termlists = 0;
551     live_channels = select_targets(se, filter);
552     if (!live_channels)
553     {
554         session_leave(se);
555         return PAZPAR2_NO_TARGETS;
556     }
557     se->reclist = reclist_create(se->nmem);
558
559     yaz_gettimeofday(&tval);
560     
561     tval.tv_sec += 5;
562
563     for (l = se->clients; l; l = l->next)
564     {
565         struct client *cl = l->client;
566
567         if (maxrecs)
568             client_set_maxrecs(cl, atoi(maxrecs));
569         if (startrecs)
570             client_set_startrecs(cl, atoi(startrecs));
571         if (prepare_session_database(se, client_get_database(cl)) < 0)
572             ;
573         else if (client_parse_query(cl, query) < 0)
574             no_failed++;
575         else
576         {
577             no_working++;
578             if (client_prep_connection(cl, se->service->z3950_operation_timeout,
579                                        se->service->z3950_session_timeout,
580                                        se->service->server->iochan_man,
581                                        &tval))
582                 client_start_search(cl);
583         }
584     }
585     session_leave(se);
586     if (no_working == 0)
587     {
588         if (no_failed > 0)
589         {
590             *addinfo = "query";
591             return PAZPAR2_MALFORMED_PARAMETER_VALUE;
592         }
593         else
594             return PAZPAR2_NO_TARGETS;
595     }
596     return PAZPAR2_NO_ERROR;
597 }
598
599 // Creates a new session_database object for a database
600 static void session_init_databases_fun(void *context, struct database *db)
601 {
602     struct session *se = (struct session *) context;
603     struct session_database *new = nmem_malloc(se->session_nmem, sizeof(*new));
604     int i;
605
606     new->database = db;
607     
608     new->map = 0;
609     assert(db->settings);
610     new->settings = nmem_malloc(se->session_nmem,
611                                 sizeof(struct settings *) * db->num_settings);
612     new->num_settings = db->num_settings;
613     for (i = 0; i < db->num_settings; i++)
614     {
615         struct setting *setting = db->settings[i];
616         new->settings[i] = setting;
617     }
618     new->next = se->databases;
619     se->databases = new;
620 }
621
622 // Doesn't free memory associated with sdb -- nmem takes care of that
623 static void session_database_destroy(struct session_database *sdb)
624 {
625     sdb->map = 0;
626 }
627
628 // Initialize session_database list -- this represents this session's view
629 // of the database list -- subject to modification by the settings ws command
630 void session_init_databases(struct session *se)
631 {
632     se->databases = 0;
633     predef_grep_databases(se, se->service, session_init_databases_fun);
634 }
635
636 // Probably session_init_databases_fun should be refactored instead of
637 // called here.
638 static struct session_database *load_session_database(struct session *se, 
639                                                       char *id)
640 {
641     struct database *db = new_database(id, se->session_nmem);
642
643     resolve_database(se->service, db);
644
645     session_init_databases_fun((void*) se, db);
646
647     // New sdb is head of se->databases list
648     return se->databases;
649 }
650
651 // Find an existing session database. If not found, load it
652 static struct session_database *find_session_database(struct session *se, 
653                                                       char *id)
654 {
655     struct session_database *sdb;
656
657     for (sdb = se->databases; sdb; sdb = sdb->next)
658         if (!strcmp(sdb->database->url, id))
659             return sdb;
660     return load_session_database(se, id);
661 }
662
663 // Apply a session override to a database
664 void session_apply_setting(struct session *se, char *dbname, char *setting,
665                            char *value)
666 {
667     struct session_database *sdb = find_session_database(se, dbname);
668     struct conf_service *service = se->service;
669     struct setting *new = nmem_malloc(se->session_nmem, sizeof(*new));
670     int offset = settings_create_offset(service, setting);
671
672     expand_settings_array(&sdb->settings, &sdb->num_settings, offset,
673                           se->session_nmem);
674     new->precedence = 0;
675     new->target = dbname;
676     new->name = setting;
677     new->value = value;
678     new->next = sdb->settings[offset];
679     sdb->settings[offset] = new;
680
681     // Force later recompute of settings-driven data structures
682     // (happens when a search starts and client connections are prepared)
683     switch (offset)
684     {
685     case PZ_XSLT:
686         if (sdb->map)
687         {
688             sdb->map = 0;
689         }
690         break;
691     }
692 }
693
694 void destroy_session(struct session *se)
695 {
696     struct session_database *sdb;
697
698     yaz_log(YLOG_DEBUG, "%p Pazpar2 session destroy", se);
699     session_remove_clients(se);
700
701     for (sdb = se->databases; sdb; sdb = sdb->next)
702         session_database_destroy(sdb);
703     normalize_cache_destroy(se->normalize_cache);
704     relevance_destroy(&se->relevance);
705     reclist_destroy(se->reclist);
706     nmem_destroy(se->nmem);
707     service_destroy(se->service);
708     yaz_mutex_destroy(&se->session_mutex);
709     wrbuf_destroy(se->wrbuf);
710 }
711
712 struct session *new_session(NMEM nmem, struct conf_service *service,
713                             const char *name)
714 {
715     int i;
716     struct session *session = nmem_malloc(nmem, sizeof(*session));
717
718     yaz_log(YLOG_DEBUG, "%p New Pazpar2 session", session);
719
720     session->service = service;
721     session->relevance = 0;
722     session->total_hits = 0;
723     session->total_records = 0;
724     session->number_of_warnings_unknown_elements = 0;
725     session->number_of_warnings_unknown_metadata = 0;
726     session->num_termlists = 0;
727     session->reclist = 0;
728     session->clients = 0;
729     session->session_nmem = nmem;
730     session->nmem = nmem_create();
731     session->wrbuf = wrbuf_alloc();
732     session->databases = 0;
733     for (i = 0; i <= SESSION_WATCH_MAX; i++)
734     {
735         session->watchlist[i].data = 0;
736         session->watchlist[i].fun = 0;
737     }
738     session->normalize_cache = normalize_cache_create();
739     session->session_mutex = 0;
740     pazpar2_mutex_create(&session->session_mutex, name);
741
742     return session;
743 }
744
745 struct hitsbytarget *hitsbytarget(struct session *se, int *count, NMEM nmem)
746 {
747     struct hitsbytarget *res = 0;
748     struct client_list *l;
749     size_t sz = 0;
750
751     session_enter(se);
752     for (l = se->clients; l; l = l->next)
753         sz++;
754
755     res = nmem_malloc(nmem, sizeof(*res) * sz);
756     *count = 0;
757     for (l = se->clients; l; l = l->next)
758     {
759         struct client *cl = l->client;
760         WRBUF w = wrbuf_alloc();
761         const char *name = session_setting_oneval(client_get_database(cl),
762                                                   PZ_NAME);
763
764         res[*count].id = client_get_database(cl)->database->url;
765         res[*count].name = *name ? name : "Unknown";
766         res[*count].hits = client_get_hits(cl);
767         res[*count].records = client_get_num_records(cl);
768         res[*count].diagnostic = client_get_diagnostic(cl);
769         res[*count].state = client_get_state_str(cl);
770         res[*count].connected  = client_get_connection(cl) ? 1 : 0;
771         session_settings_dump(se, client_get_database(cl), w);
772         res[*count].settings_xml = w;
773         (*count)++;
774     }
775     session_leave(se);
776     return res;
777 }
778
779 struct termlist_score **termlist(struct session *se, const char *name, int *num)
780 {
781     int i;
782     struct termlist_score **tl = 0;
783
784     session_enter(se);
785     for (i = 0; i < se->num_termlists; i++)
786         if (!strcmp((const char *) se->termlists[i].name, name))
787         {
788             tl = termlist_highscore(se->termlists[i].termlist, num);
789             break;
790         }
791     session_leave(se);
792     return tl;
793 }
794
795 #ifdef MISSING_HEADERS
796 void report_nmem_stats(void)
797 {
798     size_t in_use, is_free;
799
800     nmem_get_memory_in_use(&in_use);
801     nmem_get_memory_free(&is_free);
802
803     yaz_log(YLOG_LOG, "nmem stat: use=%ld free=%ld", 
804             (long) in_use, (long) is_free);
805 }
806 #endif
807
808 struct record_cluster *show_single_start(struct session *se, const char *id,
809                                          struct record_cluster **prev_r,
810                                          struct record_cluster **next_r)
811 {
812     struct record_cluster *r;
813
814     session_enter(se);
815     reclist_enter(se->reclist);
816     *prev_r = 0;
817     *next_r = 0;
818     while ((r = reclist_read_record(se->reclist)))
819     {
820         if (!strcmp(r->recid, id))
821         {
822             *next_r = reclist_read_record(se->reclist);
823             break;
824         }
825         *prev_r = r;
826     }
827     reclist_leave(se->reclist);
828     if (!r)
829         session_leave(se);
830     return r;
831 }
832
833 void show_single_stop(struct session *se, struct record_cluster *rec)
834 {
835     session_leave(se);
836 }
837
838 struct record_cluster **show_range_start(struct session *se,
839                                          struct reclist_sortparms *sp, 
840                                          int start, int *num, int *total, Odr_int *sumhits)
841 {
842     struct record_cluster **recs;
843     struct reclist_sortparms *spp;
844     int i;
845 #if USE_TIMING    
846     yaz_timing_t t = yaz_timing_create();
847 #endif
848     session_enter(se);
849     recs = nmem_malloc(se->nmem, *num * sizeof(struct record_cluster *));
850     if (!se->relevance)
851     {
852         *num = 0;
853         *total = 0;
854         *sumhits = 0;
855         recs = 0;
856     }
857     else
858     {
859         for (spp = sp; spp; spp = spp->next)
860             if (spp->type == Metadata_sortkey_relevance)
861             {
862                 relevance_prepare_read(se->relevance, se->reclist);
863                 break;
864             }
865         reclist_sort(se->reclist, sp);
866         
867         reclist_enter(se->reclist);
868         *total = reclist_get_num_records(se->reclist);
869         *sumhits = se->total_hits;
870         
871         for (i = 0; i < start; i++)
872             if (!reclist_read_record(se->reclist))
873             {
874                 *num = 0;
875                 recs = 0;
876                 break;
877             }
878         
879         for (i = 0; i < *num; i++)
880         {
881             struct record_cluster *r = reclist_read_record(se->reclist);
882             if (!r)
883             {
884                 *num = i;
885                 break;
886             }
887             recs[i] = r;
888         }
889         reclist_leave(se->reclist);
890     }
891 #if USE_TIMING
892     yaz_timing_stop(t);
893     yaz_log(YLOG_LOG, "show %6.5f %3.2f %3.2f", 
894             yaz_timing_get_real(t), yaz_timing_get_user(t),
895             yaz_timing_get_sys(t));
896     yaz_timing_destroy(&t);
897 #endif
898     return recs;
899 }
900
901 void show_range_stop(struct session *se, struct record_cluster **recs)
902 {
903     session_leave(se);
904 }
905
906 void statistics(struct session *se, struct statistics *stat)
907 {
908     struct client_list *l;
909     int count = 0;
910
911     memset(stat, 0, sizeof(*stat));
912     for (l = se->clients; l; l = l->next)
913     {
914         struct client *cl = l->client;
915         if (!client_get_connection(cl))
916             stat->num_no_connection++;
917         switch (client_get_state(cl))
918         {
919         case Client_Connecting: stat->num_connecting++; break;
920         case Client_Working: stat->num_working++; break;
921         case Client_Idle: stat->num_idle++; break;
922         case Client_Failed: stat->num_failed++; break;
923         case Client_Error: stat->num_error++; break;
924         default: break;
925         }
926         count++;
927     }
928     stat->num_hits = se->total_hits;
929     stat->num_records = se->total_records;
930
931     stat->num_clients = count;
932 }
933
934 static struct record_metadata *record_metadata_init(
935     NMEM nmem, const char *value, enum conf_metadata_type type,
936     struct _xmlAttr *attr)
937 {
938     struct record_metadata *rec_md = record_metadata_create(nmem);
939     struct record_metadata_attr **attrp = &rec_md->attributes;
940     
941     for (; attr; attr = attr->next)
942     {
943         if (attr->children && attr->children->content)
944         {
945             if (strcmp((const char *) attr->name, "type"))
946             {  /* skip the "type" attribute.. Its value is already part of
947                   the element in output (md-%s) and so repeating it here
948                   is redundant */
949                 *attrp = nmem_malloc(nmem, sizeof(**attrp));
950                 (*attrp)->name =
951                     nmem_strdup(nmem, (const char *) attr->name);
952                 (*attrp)->value =
953                     nmem_strdup(nmem, (const char *) attr->children->content);
954                 attrp = &(*attrp)->next;
955             }
956         }
957     }
958     *attrp = 0;
959
960     if (type == Metadata_type_generic)
961     {
962         char *p = nmem_strdup(nmem, value);
963
964         p = normalize7bit_generic(p, " ,/.:([");
965         
966         rec_md->data.text.disp = p;
967         rec_md->data.text.sort = 0;
968     }
969     else if (type == Metadata_type_year || type == Metadata_type_date)
970     {
971         int first, last;
972         int longdate = 0;
973
974         if (type == Metadata_type_date)
975             longdate = 1;
976         if (extract7bit_dates((char *) value, &first, &last, longdate) < 0)
977             return 0;
978
979         rec_md->data.number.min = first;
980         rec_md->data.number.max = last;
981     }
982     else
983         return 0;
984     return rec_md;
985 }
986
987 static int get_mergekey_from_doc(xmlDoc *doc, xmlNode *root, const char *name,
988                                  struct conf_service *service, WRBUF norm_wr)
989 {
990     xmlNode *n;
991     int no_found = 0;
992     for (n = root->children; n; n = n->next)
993     {
994         if (n->type != XML_ELEMENT_NODE)
995             continue;
996         if (!strcmp((const char *) n->name, "metadata"))
997         {
998             xmlChar *type = xmlGetProp(n, (xmlChar *) "type");
999             if (type == NULL) {
1000                 yaz_log(YLOG_FATAL, "Missing type attribute on metadata element. Skipping!");
1001             }
1002             else if (!strcmp(name, (const char *) type))
1003             {
1004                 xmlChar *value = xmlNodeListGetString(doc, n->children, 1);
1005                 if (value)
1006                 {
1007                     const char *norm_str;
1008                     pp2_relevance_token_t prt =
1009                         pp2_relevance_tokenize(service->mergekey_pct);
1010                     
1011                     pp2_relevance_first(prt, (const char *) value, 0);
1012                     if (wrbuf_len(norm_wr) > 0)
1013                         wrbuf_puts(norm_wr, " ");
1014                     wrbuf_puts(norm_wr, name);
1015                     while ((norm_str =
1016                             pp2_relevance_token_next(prt)))
1017                     {
1018                         if (*norm_str)
1019                         {
1020                             wrbuf_puts(norm_wr, " ");
1021                             wrbuf_puts(norm_wr, norm_str);
1022                         }
1023                     }
1024                     xmlFree(value);
1025                     pp2_relevance_token_destroy(prt);
1026                     no_found++;
1027                 }
1028             }
1029             xmlFree(type);
1030         }
1031     }
1032     return no_found;
1033 }
1034
1035 static const char *get_mergekey(xmlDoc *doc, struct client *cl, int record_no,
1036                                 struct conf_service *service, NMEM nmem)
1037 {
1038     char *mergekey_norm = 0;
1039     xmlNode *root = xmlDocGetRootElement(doc);
1040     WRBUF norm_wr = wrbuf_alloc();
1041
1042     /* consider mergekey from XSL first */
1043     xmlChar *mergekey = xmlGetProp(root, (xmlChar *) "mergekey");
1044     if (mergekey)
1045     {
1046         const char *norm_str;
1047         pp2_relevance_token_t prt =
1048             pp2_relevance_tokenize(service->mergekey_pct);
1049
1050         pp2_relevance_first(prt, (const char *) mergekey, 0);
1051         while ((norm_str = pp2_relevance_token_next(prt)))
1052         {
1053             if (*norm_str)
1054             {
1055                 if (wrbuf_len(norm_wr))
1056                     wrbuf_puts(norm_wr, " ");
1057                 wrbuf_puts(norm_wr, norm_str);
1058             }
1059         }
1060         pp2_relevance_token_destroy(prt);
1061         xmlFree(mergekey);
1062     }
1063     else
1064     {
1065         /* no mergekey defined in XSL. Look for mergekey metadata instead */
1066         int field_id;
1067         for (field_id = 0; field_id < service->num_metadata; field_id++)
1068         {
1069             struct conf_metadata *ser_md = &service->metadata[field_id];
1070             if (ser_md->mergekey != Metadata_mergekey_no)
1071             {
1072                 int r = get_mergekey_from_doc(doc, root, ser_md->name,
1073                                               service, norm_wr);
1074                 if (r == 0 && ser_md->mergekey == Metadata_mergekey_required)
1075                 {
1076                     /* no mergekey on this one and it is required.. 
1077                        Generate unique key instead */
1078                     wrbuf_rewind(norm_wr);
1079                     break;
1080                 }
1081             }
1082         }
1083     }
1084
1085     /* generate unique key if none is not generated already or is empty */
1086     if (wrbuf_len(norm_wr) == 0)
1087     {
1088         wrbuf_printf(norm_wr, "%s-%d",
1089                      client_get_database(cl)->database->url, record_no);
1090     }
1091     if (wrbuf_len(norm_wr) > 0)
1092         mergekey_norm = nmem_strdup(nmem, wrbuf_cstr(norm_wr));
1093     wrbuf_destroy(norm_wr);
1094     return mergekey_norm;
1095 }
1096
1097 /** \brief see if metadata for pz:recordfilter exists 
1098     \param root xml root element of normalized record
1099     \param sdb session database for client
1100     \retval 0 if there is no metadata for pz:recordfilter
1101     \retval 1 if there is metadata for pz:recordfilter
1102
1103     If there is no pz:recordfilter defined, this function returns 1
1104     as well.
1105 */
1106     
1107 static int check_record_filter(xmlNode *root, struct session_database *sdb)
1108 {
1109     int match = 0;
1110     xmlNode *n;
1111     const char *s;
1112     s = session_setting_oneval(sdb, PZ_RECORDFILTER);
1113
1114     if (!s || !*s)
1115         return 1;
1116
1117     for (n = root->children; n; n = n->next)
1118     {
1119         if (n->type != XML_ELEMENT_NODE)
1120             continue;
1121         if (!strcmp((const char *) n->name, "metadata"))
1122         {
1123             xmlChar *type = xmlGetProp(n, (xmlChar *) "type");
1124             if (type)
1125             {
1126                 size_t len;
1127                 const char *eq = strchr(s, '~');
1128                 if (eq)
1129                     len = eq - s;
1130                 else
1131                     len = strlen(s);
1132                 if (len == strlen((const char *)type) &&
1133                     !memcmp((const char *) type, s, len))
1134                 {
1135                     xmlChar *value = xmlNodeGetContent(n);
1136                     if (value && *value)
1137                     {
1138                         if (!eq || strstr((const char *) value, eq+1))
1139                             match = 1;
1140                     }
1141                     xmlFree(value);
1142                 }
1143                 xmlFree(type);
1144             }
1145         }
1146     }
1147     return match;
1148 }
1149
1150
1151 static int ingest_to_cluster(struct client *cl,
1152                              xmlDoc *xdoc,
1153                              xmlNode *root,
1154                              int record_no,
1155                              const char *mergekey_norm);
1156
1157 /** \brief ingest XML record
1158     \param cl client holds the result set for record
1159     \param rec record buffer (0 terminated)
1160     \param record_no record position (1, 2, ..)
1161     \param nmem working NMEM
1162     \retval 0 OK
1163     \retval -1 failure
1164 */
1165 int ingest_record(struct client *cl, const char *rec,
1166                   int record_no, NMEM nmem)
1167 {
1168     struct session *se = client_get_session(cl);
1169     int ret = 0;
1170     struct session_database *sdb = client_get_database(cl);
1171     struct conf_service *service = se->service;
1172     xmlDoc *xdoc = normalize_record(sdb, service, rec, nmem);
1173     xmlNode *root;
1174     const char *mergekey_norm;
1175     
1176     if (!xdoc)
1177         return -1;
1178     
1179     root = xmlDocGetRootElement(xdoc);
1180     
1181     if (!check_record_filter(root, sdb))
1182     {
1183         yaz_log(YLOG_WARN, "Filtered out record no %d from %s", record_no,
1184                 sdb->database->url);
1185         xmlFreeDoc(xdoc);
1186         return -1;
1187     }
1188     
1189     mergekey_norm = get_mergekey(xdoc, cl, record_no, service, nmem);
1190     if (!mergekey_norm)
1191     {
1192         yaz_log(YLOG_WARN, "Got no mergekey");
1193         xmlFreeDoc(xdoc);
1194         return -1;
1195     }
1196     session_enter(se);
1197     if (client_get_session(cl) == se)
1198         ret = ingest_to_cluster(cl, xdoc, root, record_no, mergekey_norm);
1199     session_leave(se);
1200     
1201     xmlFreeDoc(xdoc);
1202     return ret;
1203 }
1204
1205 static int ingest_to_cluster(struct client *cl,
1206                              xmlDoc *xdoc,
1207                              xmlNode *root,
1208                              int record_no,
1209                              const char *mergekey_norm)
1210 {
1211     xmlNode *n;
1212     xmlChar *type = 0;
1213     xmlChar *value = 0;
1214     struct session_database *sdb = client_get_database(cl);
1215     struct session *se = client_get_session(cl);
1216     struct conf_service *service = se->service;
1217     struct record *record = record_create(se->nmem, 
1218                                           service->num_metadata,
1219                                           service->num_sortkeys, cl,
1220                                           record_no);
1221     struct record_cluster *cluster = reclist_insert(se->reclist,
1222                                                     service, 
1223                                                     record,
1224                                                     mergekey_norm,
1225                                                     &se->total_merged);
1226     if (!cluster)
1227         return -1;
1228     if (global_parameters.dump_records)
1229         yaz_log(YLOG_LOG, "Cluster id %s from %s (#%d)", cluster->recid,
1230                 sdb->database->url, record_no);
1231     relevance_newrec(se->relevance, cluster);
1232     
1233     // now parsing XML record and adding data to cluster or record metadata
1234     for (n = root->children; n; n = n->next)
1235     {
1236         pp2_relevance_token_t prt;
1237         if (type)
1238             xmlFree(type);
1239         if (value)
1240             xmlFree(value);
1241         type = value = 0;
1242         
1243         if (n->type != XML_ELEMENT_NODE)
1244             continue;
1245         if (!strcmp((const char *) n->name, "metadata"))
1246         {
1247             struct conf_metadata *ser_md = 0;
1248             struct conf_sortkey *ser_sk = 0;
1249             struct record_metadata **wheretoput = 0;
1250             struct record_metadata *rec_md = 0;
1251             int md_field_id = -1;
1252             int sk_field_id = -1;
1253             
1254             type = xmlGetProp(n, (xmlChar *) "type");
1255             value = xmlNodeListGetString(xdoc, n->children, 1);
1256             
1257             if (!type || !value || !*value)
1258                 continue;
1259             
1260             md_field_id 
1261                 = conf_service_metadata_field_id(service, (const char *) type);
1262             if (md_field_id < 0)
1263             {
1264                 if (se->number_of_warnings_unknown_metadata == 0)
1265                 {
1266                     yaz_log(YLOG_WARN, 
1267                             "Ignoring unknown metadata element: %s", type);
1268                 }
1269                 se->number_of_warnings_unknown_metadata++;
1270                 continue;
1271             }
1272             
1273             ser_md = &service->metadata[md_field_id];
1274             
1275             if (ser_md->sortkey_offset >= 0){
1276                 sk_field_id = ser_md->sortkey_offset;
1277                 ser_sk = &service->sortkeys[sk_field_id];
1278             }
1279
1280             // non-merged metadata
1281             rec_md = record_metadata_init(se->nmem, (const char *) value,
1282                                           ser_md->type, n->properties);
1283             if (!rec_md)
1284             {
1285                 yaz_log(YLOG_WARN, "bad metadata data '%s' for element '%s'",
1286                         value, type);
1287                 continue;
1288             }
1289             wheretoput = &record->metadata[md_field_id];
1290             while (*wheretoput)
1291                 wheretoput = &(*wheretoput)->next;
1292             *wheretoput = rec_md;
1293
1294             // merged metadata
1295             rec_md = record_metadata_init(se->nmem, (const char *) value,
1296                                           ser_md->type, 0);
1297             wheretoput = &cluster->metadata[md_field_id];
1298
1299             // and polulate with data:
1300             // assign cluster or record based on merge action
1301             if (ser_md->merge == Metadata_merge_unique)
1302             {
1303                 struct record_metadata *mnode;
1304                 for (mnode = *wheretoput; mnode; mnode = mnode->next)
1305                     if (!strcmp((const char *) mnode->data.text.disp, 
1306                                 rec_md->data.text.disp))
1307                         break;
1308                 if (!mnode)
1309                 {
1310                     rec_md->next = *wheretoput;
1311                     *wheretoput = rec_md;
1312                 }
1313             }
1314             else if (ser_md->merge == Metadata_merge_longest)
1315             {
1316                 if (!*wheretoput 
1317                     || strlen(rec_md->data.text.disp) 
1318                     > strlen((*wheretoput)->data.text.disp))
1319                 {
1320                     *wheretoput = rec_md;
1321                     if (ser_sk)
1322                     {
1323                         const char *sort_str = 0;
1324                         int skip_article = 
1325                             ser_sk->type == Metadata_sortkey_skiparticle;
1326
1327                         if (!cluster->sortkeys[sk_field_id])
1328                             cluster->sortkeys[sk_field_id] = 
1329                                 nmem_malloc(se->nmem, 
1330                                             sizeof(union data_types));
1331                          
1332                         prt = pp2_relevance_tokenize(service->sort_pct);
1333
1334                         pp2_relevance_first(prt, rec_md->data.text.disp,
1335                                             skip_article);
1336
1337                         pp2_relevance_token_next(prt);
1338                          
1339                         sort_str = pp2_get_sort(prt);
1340                          
1341                         cluster->sortkeys[sk_field_id]->text.disp = 
1342                             rec_md->data.text.disp;
1343                         if (!sort_str)
1344                         {
1345                             sort_str = rec_md->data.text.disp;
1346                             yaz_log(YLOG_WARN, 
1347                                     "Could not make sortkey. Bug #1858");
1348                         }
1349                         cluster->sortkeys[sk_field_id]->text.sort = 
1350                             nmem_strdup(se->nmem, sort_str);
1351 #if 0
1352                         yaz_log(YLOG_LOG, "text disp=%s",
1353                                 cluster->sortkeys[sk_field_id]->text.disp);
1354                         yaz_log(YLOG_LOG, "text sort=%s",
1355                                 cluster->sortkeys[sk_field_id]->text.sort);
1356 #endif
1357                         pp2_relevance_token_destroy(prt);
1358                     }
1359                 }
1360             }
1361             else if (ser_md->merge == Metadata_merge_all)
1362             {
1363                 rec_md->next = *wheretoput;
1364                 *wheretoput = rec_md;
1365             }
1366             else if (ser_md->merge == Metadata_merge_range)
1367             {
1368                 if (!*wheretoput)
1369                 {
1370                     *wheretoput = rec_md;
1371                     if (ser_sk)
1372                         cluster->sortkeys[sk_field_id] 
1373                             = &rec_md->data;
1374                 }
1375                 else
1376                 {
1377                     int this_min = rec_md->data.number.min;
1378                     int this_max = rec_md->data.number.max;
1379                     if (this_min < (*wheretoput)->data.number.min)
1380                         (*wheretoput)->data.number.min = this_min;
1381                     if (this_max > (*wheretoput)->data.number.max)
1382                         (*wheretoput)->data.number.max = this_max;
1383                 }
1384             }
1385
1386
1387             // ranking of _all_ fields enabled ... 
1388             if (ser_md->rank)
1389                 relevance_countwords(se->relevance, cluster, 
1390                                      (char *) value, ser_md->rank,
1391                                      ser_md->name);
1392
1393             // construct facets ... unless the client already has reported them
1394             if (ser_md->termlist && !client_has_facet(cl, (char *) type))
1395             {
1396
1397                 if (ser_md->type == Metadata_type_year)
1398                 {
1399                     char year[64];
1400                     sprintf(year, "%d", rec_md->data.number.max);
1401                     add_facet(se, (char *) type, year, 1);
1402                     if (rec_md->data.number.max != rec_md->data.number.min)
1403                     {
1404                         sprintf(year, "%d", rec_md->data.number.min);
1405                         add_facet(se, (char *) type, year, 1);
1406                     }
1407                 }
1408                 else
1409                     add_facet(se, (char *) type, (char *) value, 1);
1410             }
1411
1412             // cleaning up
1413             xmlFree(type);
1414             xmlFree(value);
1415             type = value = 0;
1416         }
1417         else
1418         {
1419             if (se->number_of_warnings_unknown_elements == 0)
1420                 yaz_log(YLOG_WARN,
1421                         "Unexpected element in internal record: %s", n->name);
1422             se->number_of_warnings_unknown_elements++;
1423         }
1424     }
1425     if (type)
1426         xmlFree(type);
1427     if (value)
1428         xmlFree(value);
1429
1430     relevance_donerecord(se->relevance, cluster);
1431     se->total_records++;
1432
1433     return 0;
1434 }
1435
1436 /*
1437  * Local variables:
1438  * c-basic-offset: 4
1439  * c-file-style: "Stroustrup"
1440  * indent-tabs-mode: nil
1441  * End:
1442  * vim: shiftwidth=4 tabstop=8 expandtab
1443  */
1444