Refactor: hide reclist structure
[pazpar2-moved-to-github.git] / src / logic.c
1 /* This file is part of Pazpar2.
2    Copyright (C) 2006-2009 Index Data
3
4 Pazpar2 is free software; you can redistribute it and/or modify it under
5 the terms of the GNU General Public License as published by the Free
6 Software Foundation; either version 2, or (at your option) any later
7 version.
8
9 Pazpar2 is distributed in the hope that it will be useful, but WITHOUT ANY
10 WARRANTY; without even the implied warranty of MERCHANTABILITY or
11 FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
12 for more details.
13
14 You should have received a copy of the GNU General Public License
15 along with this program; if not, write to the Free Software
16 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
17
18 */
19
20 /** \file logic.c
21     \brief high-level logic; mostly user sessions and settings
22 */
23
24 #if HAVE_CONFIG_H
25 #include <config.h>
26 #endif
27
28 #include <stdlib.h>
29 #include <stdio.h>
30 #include <string.h>
31 #if HAVE_SYS_TIME_H
32 #include <sys/time.h>
33 #endif
34 #if HAVE_UNISTD_H
35 #include <unistd.h>
36 #endif
37 #include <signal.h>
38 #include <ctype.h>
39 #include <assert.h>
40
41 #include <yaz/marcdisp.h>
42 #include <yaz/comstack.h>
43 #include <yaz/tcpip.h>
44 #include <yaz/proto.h>
45 #include <yaz/readconf.h>
46 #include <yaz/pquery.h>
47 #include <yaz/otherinfo.h>
48 #include <yaz/yaz-util.h>
49 #include <yaz/nmem.h>
50 #include <yaz/query-charset.h>
51 #include <yaz/querytowrbuf.h>
52 #include <yaz/oid_db.h>
53 #include <yaz/snprintf.h>
54
55 #define USE_TIMING 0
56 #if USE_TIMING
57 #include <yaz/timing.h>
58 #endif
59
60 #include "parameters.h"
61 #include "pazpar2.h"
62 #include "eventl.h"
63 #include "http.h"
64 #include "termlists.h"
65 #include "reclists.h"
66 #include "relevance.h"
67 #include "database.h"
68 #include "client.h"
69 #include "settings.h"
70 #include "normalize7bit.h"
71
72 #define TERMLIST_HIGH_SCORE 25
73
74 #define MAX_CHUNK 15
75
76 // Note: Some things in this structure will eventually move to configuration
77 struct parameters global_parameters = 
78 {
79     0,   // dump_records
80     0,   // debug_mode
81     100,
82 };
83
84 static void log_xml_doc(xmlDoc *doc)
85 {
86     FILE *lf = yaz_log_file();
87     xmlChar *result = 0;
88     int len = 0;
89 #if LIBXML_VERSION >= 20600
90     xmlDocDumpFormatMemory(doc, &result, &len, 1);
91 #else
92     xmlDocDumpMemory(doc, &result, &len);
93 #endif
94     if (lf && len)
95     {
96         fwrite(result, 1, len, lf);
97         fprintf(lf, "\n");
98     }
99     xmlFree(result);
100 }
101
102 // Recursively traverse query structure to extract terms.
103 void pull_terms(NMEM nmem, struct ccl_rpn_node *n, char **termlist, int *num)
104 {
105     char **words;
106     int numwords;
107     int i;
108
109     switch (n->kind)
110     {
111     case CCL_RPN_AND:
112     case CCL_RPN_OR:
113     case CCL_RPN_NOT:
114     case CCL_RPN_PROX:
115         pull_terms(nmem, n->u.p[0], termlist, num);
116         pull_terms(nmem, n->u.p[1], termlist, num);
117         break;
118     case CCL_RPN_TERM:
119         nmem_strsplit(nmem, " ", n->u.t.term, &words, &numwords);
120         for (i = 0; i < numwords; i++)
121             termlist[(*num)++] = words[i];
122         break;
123     default: // NOOP
124         break;
125     }
126 }
127
128
129
130 static void add_facet(struct session *s, const char *type, const char *value)
131 {
132     int i;
133
134     if (!*value)
135         return;
136     for (i = 0; i < s->num_termlists; i++)
137         if (!strcmp(s->termlists[i].name, type))
138             break;
139     if (i == s->num_termlists)
140     {
141         if (i == SESSION_MAX_TERMLISTS)
142         {
143             yaz_log(YLOG_FATAL, "Too many termlists");
144             return;
145         }
146
147         s->termlists[i].name = nmem_strdup(s->nmem, type);
148         s->termlists[i].termlist 
149             = termlist_create(s->nmem, s->expected_maxrecs,
150                               TERMLIST_HIGH_SCORE);
151         s->num_termlists = i + 1;
152     }
153     termlist_insert(s->termlists[i].termlist, value);
154 }
155
156 static xmlDoc *record_to_xml(struct session_database *sdb, const char *rec)
157 {
158     struct database *db = sdb->database;
159     xmlDoc *rdoc = 0;
160
161     rdoc = xmlParseMemory(rec, strlen(rec));
162
163     if (!rdoc)
164     {
165         yaz_log(YLOG_FATAL, "Non-wellformed XML received from %s",
166                 db->url);
167         return 0;
168     }
169
170     if (global_parameters.dump_records)
171     {
172         yaz_log(YLOG_LOG, "Un-normalized record from %s", db->url);
173         log_xml_doc(rdoc);
174     }
175
176     return rdoc;
177 }
178
179 #define MAX_XSLT_ARGS 16
180
181 // Add static values from session database settings if applicable
182 static void insert_settings_parameters(struct session_database *sdb,
183                                        struct session *se, char **parms)
184 {
185     struct conf_service *service = se->service;
186     int i;
187     int nparms = 0;
188     int offset = 0;
189
190     for (i = 0; i < service->num_metadata; i++)
191     {
192         struct conf_metadata *md = &service->metadata[i];
193         int setting;
194
195         if (md->setting == Metadata_setting_parameter &&
196             (setting = settings_offset(service, md->name)) > 0)
197         {
198             const char *val = session_setting_oneval(sdb, setting);
199             if (val && nparms < MAX_XSLT_ARGS)
200             {
201                 char *buf;
202                 int len = strlen(val);
203                 buf = nmem_malloc(se->nmem, len + 3);
204                 buf[0] = '\'';
205                 strcpy(buf + 1, val);
206                 buf[len+1] = '\'';
207                 buf[len+2] = '\0';
208                 parms[offset++] = md->name;
209                 parms[offset++] = buf;
210                 nparms++;
211             }
212         }
213     }
214     parms[offset] = 0;
215 }
216
217 // Add static values from session database settings if applicable
218 static void insert_settings_values(struct session_database *sdb, xmlDoc *doc,
219     struct conf_service *service)
220 {
221     int i;
222
223     for (i = 0; i < service->num_metadata; i++)
224     {
225         struct conf_metadata *md = &service->metadata[i];
226         int offset;
227
228         if (md->setting == Metadata_setting_postproc &&
229             (offset = settings_offset(service, md->name)) > 0)
230         {
231             const char *val = session_setting_oneval(sdb, offset);
232             if (val)
233             {
234                 xmlNode *r = xmlDocGetRootElement(doc);
235                 xmlNode *n = xmlNewTextChild(r, 0, (xmlChar *) "metadata",
236                                              (xmlChar *) val);
237                 xmlSetProp(n, (xmlChar *) "type", (xmlChar *) md->name);
238             }
239         }
240     }
241 }
242
243 static xmlDoc *normalize_record(struct session_database *sdb,
244                                 struct session *se,
245                                 const char *rec)
246 {
247     xmlDoc *rdoc = record_to_xml(sdb, rec);
248
249     if (rdoc)
250     {
251         char *parms[MAX_XSLT_ARGS*2+1];
252         
253         insert_settings_parameters(sdb, se, parms);
254         
255         if (normalize_record_transform(sdb->map, &rdoc, (const char **)parms))
256         {
257             yaz_log(YLOG_WARN, "Normalize failed from %s", sdb->database->url);
258         }
259         else
260         {
261             insert_settings_values(sdb, rdoc, se->service);
262             
263             if (global_parameters.dump_records)
264             {
265                 yaz_log(YLOG_LOG, "Normalized record from %s", 
266                         sdb->database->url);
267                 log_xml_doc(rdoc);
268             }
269         }
270     }
271     return rdoc;
272 }
273
274 // Retrieve first defined value for 'name' for given database.
275 // Will be extended to take into account user associated with session
276 const char *session_setting_oneval(struct session_database *db, int offset)
277 {
278     if (!db->settings[offset])
279         return "";
280     return db->settings[offset]->value;
281 }
282
283 // Prepare XSLT stylesheets for record normalization
284 // Structures are allocated on the session_wide nmem to avoid having
285 // to recompute this for every search. This would lead
286 // to leaking if a single session was to repeatedly change the PZ_XSLT
287 // setting. However, this is not a realistic use scenario.
288 static int prepare_map(struct session *se, struct session_database *sdb)
289 {
290     const char *s;
291
292     if (!sdb->settings)
293     {
294         yaz_log(YLOG_WARN, "No settings on %s", sdb->database->url);
295         return -1;
296     }
297     if ((s = session_setting_oneval(sdb, PZ_XSLT)))
298     {
299         char auto_stylesheet[256];
300
301         if (!strcmp(s, "auto"))
302         {
303             const char *request_syntax = session_setting_oneval(
304                 sdb, PZ_REQUESTSYNTAX);
305             if (request_syntax)
306             {
307                 char *cp;
308                 yaz_snprintf(auto_stylesheet, sizeof(auto_stylesheet),
309                              "%s.xsl", request_syntax);
310                 for (cp = auto_stylesheet; *cp; cp++)
311                 {
312                     /* deliberately only consider ASCII */
313                     if (*cp > 32 && *cp < 127)
314                         *cp = tolower(*cp);
315                 }
316                 s = auto_stylesheet;
317             }
318             else
319             {
320                 yaz_log(YLOG_WARN, "No pz:requestsyntax for auto stylesheet");
321             }
322         }
323         sdb->map = normalize_cache_get(se->normalize_cache,
324                                        se->service, s);
325         if (!sdb->map)
326             return -1;
327     }
328     return 0;
329 }
330
331 // This analyzes settings and recomputes any supporting data structures
332 // if necessary.
333 static int prepare_session_database(struct session *se, 
334                                     struct session_database *sdb)
335 {
336     if (!sdb->settings)
337     {
338         yaz_log(YLOG_WARN, 
339                 "No settings associated with %s", sdb->database->url);
340         return -1;
341     }
342     if (sdb->settings[PZ_XSLT] && !sdb->map)
343     {
344         if (prepare_map(se, sdb) < 0)
345             return -1;
346     }
347     return 0;
348 }
349
350 // called if watch should be removed because http_channel is to be destroyed
351 static void session_watch_cancel(void *data, struct http_channel *c,
352                                  void *data2)
353 {
354     struct session_watchentry *ent = data;
355
356     ent->fun = 0;
357     ent->data = 0;
358     ent->obs = 0;
359 }
360
361 // set watch. Returns 0=OK, -1 if watch is already set
362 int session_set_watch(struct session *s, int what, 
363                       session_watchfun fun, void *data,
364                       struct http_channel *chan)
365 {
366     if (s->watchlist[what].fun)
367         return -1;
368     s->watchlist[what].fun = fun;
369     s->watchlist[what].data = data;
370     s->watchlist[what].obs = http_add_observer(chan, &s->watchlist[what],
371                                                session_watch_cancel);
372     return 0;
373 }
374
375 void session_alert_watch(struct session *s, int what)
376 {
377     if (s->watchlist[what].fun)
378     {
379         /* our watch is no longer associated with http_channel */
380         void *data;
381         session_watchfun fun;
382
383         http_remove_observer(s->watchlist[what].obs);
384         fun = s->watchlist[what].fun;
385         data = s->watchlist[what].data;
386
387         /* reset watch before fun is invoked - in case fun wants to set
388            it again */
389         s->watchlist[what].fun = 0;
390         s->watchlist[what].data = 0;
391         s->watchlist[what].obs = 0;
392
393         fun(data);
394     }
395 }
396
397 //callback for grep_databases
398 static void select_targets_callback(void *context, struct session_database *db)
399 {
400     struct session *se = (struct session*) context;
401     struct client *cl = client_create();
402     client_set_database(cl, db);
403     client_set_session(cl, se);
404 }
405
406 // Associates a set of clients with a session;
407 // Note: Session-databases represent databases with per-session 
408 // setting overrides
409 int select_targets(struct session *se, struct database_criterion *crit)
410 {
411     while (se->clients)
412         client_destroy(se->clients);
413
414     return session_grep_databases(se, crit, select_targets_callback);
415 }
416
417 int session_active_clients(struct session *s)
418 {
419     struct client *c;
420     int res = 0;
421
422     for (c = s->clients; c; c = client_next_in_session(c))
423         if (client_is_active(c))
424             res++;
425
426     return res;
427 }
428
429 // parses crit1=val1,crit2=val2|val3,...
430 static struct database_criterion *parse_filter(NMEM m, const char *buf)
431 {
432     struct database_criterion *res = 0;
433     char **values;
434     int num;
435     int i;
436
437     if (!buf || !*buf)
438         return 0;
439     nmem_strsplit(m, ",", buf,  &values, &num);
440     for (i = 0; i < num; i++)
441     {
442         char **subvalues;
443         int subnum;
444         int subi;
445         struct database_criterion *new = nmem_malloc(m, sizeof(*new));
446         char *eq;
447         if ((eq = strchr(values[i], '=')))
448             new->type = PAZPAR2_STRING_MATCH;
449         else if ((eq = strchr(values[i], '~')))
450             new->type = PAZPAR2_SUBSTRING_MATCH;
451         else
452         {
453             yaz_log(YLOG_WARN, "Missing equal-sign/tilde in filter");
454             return 0;
455         }
456         *(eq++) = '\0';
457         new->name = values[i];
458         nmem_strsplit(m, "|", eq, &subvalues, &subnum);
459         new->values = 0;
460         for (subi = 0; subi < subnum; subi++)
461         {
462             struct database_criterion_value *newv
463                 = nmem_malloc(m, sizeof(*newv));
464             newv->value = subvalues[subi];
465             newv->next = new->values;
466             new->values = newv;
467         }
468         new->next = res;
469         res = new;
470     }
471     return res;
472 }
473
474 enum pazpar2_error_code search(struct session *se,
475                                const char *query, const char *filter,
476                                const char **addinfo)
477 {
478     int live_channels = 0;
479     int no_working = 0;
480     int no_failed = 0;
481     struct client *cl;
482     struct database_criterion *criteria;
483
484     yaz_log(YLOG_DEBUG, "Search");
485
486     *addinfo = 0;
487     nmem_reset(se->nmem);
488     se->relevance = 0;
489     se->total_records = se->total_hits = se->total_merged = 0;
490     se->reclist = 0;
491     se->num_termlists = 0;
492     criteria = parse_filter(se->nmem, filter);
493     live_channels = select_targets(se, criteria);
494     if (live_channels)
495     {
496         int maxrecs = live_channels * global_parameters.toget; // This is buggy!!!
497         se->reclist = reclist_create(se->nmem, maxrecs);
498         se->expected_maxrecs = maxrecs;
499     }
500     else
501         return PAZPAR2_NO_TARGETS;
502
503     for (cl = se->clients; cl; cl = client_next_in_session(cl))
504     {
505         if (prepare_session_database(se, client_get_database(cl)) < 0)
506             continue;
507         // Parse query for target
508         if (client_parse_query(cl, query) < 0)
509             no_failed++;
510         else
511         {
512             no_working++;
513             if (client_prep_connection(cl, se->service->z3950_operation_timeout,
514                                        se->service->z3950_session_timeout))
515                 client_start_search(cl);
516         }
517     }
518     if (no_working == 0)
519     {
520         if (no_failed > 0)
521         {
522             *addinfo = "query";
523             return PAZPAR2_MALFORMED_PARAMETER_VALUE;
524         }
525         else
526             return PAZPAR2_NO_TARGETS;
527     }
528     return PAZPAR2_NO_ERROR;
529 }
530
531 // Creates a new session_database object for a database
532 static void session_init_databases_fun(void *context, struct database *db)
533 {
534     struct session *se = (struct session *) context;
535     struct conf_service *service = se->service;
536     struct session_database *new = nmem_malloc(se->session_nmem, sizeof(*new));
537     int num = settings_num(service);
538     int i;
539
540     new->database = db;
541     
542     new->map = 0;
543     new->settings 
544         = nmem_malloc(se->session_nmem, sizeof(struct settings *) * num);
545     memset(new->settings, 0, sizeof(struct settings*) * num);
546
547     if (db->settings)
548     {
549         for (i = 0; i < num; i++)
550             new->settings[i] = db->settings[i];
551     }
552     new->next = se->databases;
553     se->databases = new;
554 }
555
556 // Doesn't free memory associated with sdb -- nmem takes care of that
557 static void session_database_destroy(struct session_database *sdb)
558 {
559     sdb->map = 0;
560 }
561
562 // Initialize session_database list -- this represents this session's view
563 // of the database list -- subject to modification by the settings ws command
564 void session_init_databases(struct session *se)
565 {
566     se->databases = 0;
567     predef_grep_databases(se, se->service, 0, session_init_databases_fun);
568 }
569
570 // Probably session_init_databases_fun should be refactored instead of
571 // called here.
572 static struct session_database *load_session_database(struct session *se, 
573                                                       char *id)
574 {
575     struct database *db = find_database(id, 0, se->service);
576
577     resolve_database(db);
578
579     session_init_databases_fun((void*) se, db);
580     // New sdb is head of se->databases list
581     return se->databases;
582 }
583
584 // Find an existing session database. If not found, load it
585 static struct session_database *find_session_database(struct session *se, 
586                                                       char *id)
587 {
588     struct session_database *sdb;
589
590     for (sdb = se->databases; sdb; sdb = sdb->next)
591         if (!strcmp(sdb->database->url, id))
592             return sdb;
593     return load_session_database(se, id);
594 }
595
596 // Apply a session override to a database
597 void session_apply_setting(struct session *se, char *dbname, char *setting,
598                            char *value)
599 {
600     struct session_database *sdb = find_session_database(se, dbname);
601     struct conf_service *service = se->service;
602     struct setting *new = nmem_malloc(se->session_nmem, sizeof(*new));
603     int offset = settings_offset_cprefix(service, setting);
604
605     if (offset < 0)
606     {
607         yaz_log(YLOG_WARN, "Unknown setting %s", setting);
608         return;
609     }
610     // Jakub: This breaks the filter setting.
611     /*if (offset == PZ_ID)
612       {
613       yaz_log(YLOG_WARN, "No need to set pz:id setting. Ignoring");
614       return;
615       }*/
616     new->precedence = 0;
617     new->target = dbname;
618     new->name = setting;
619     new->value = value;
620     new->next = sdb->settings[offset];
621     sdb->settings[offset] = new;
622
623     // Force later recompute of settings-driven data structures
624     // (happens when a search starts and client connections are prepared)
625     switch (offset)
626     {
627     case PZ_XSLT:
628         if (sdb->map)
629         {
630             sdb->map = 0;
631         }
632         break;
633     }
634 }
635
636 void destroy_session(struct session *s)
637 {
638     struct session_database *sdb;
639
640     while (s->clients)
641         client_destroy(s->clients);
642     for (sdb = s->databases; sdb; sdb = sdb->next)
643         session_database_destroy(sdb);
644     normalize_cache_destroy(s->normalize_cache);
645     nmem_destroy(s->nmem);
646     service_destroy(s->service);
647     wrbuf_destroy(s->wrbuf);
648 }
649
650 struct session *new_session(NMEM nmem, struct conf_service *service) 
651 {
652     int i;
653     struct session *session = nmem_malloc(nmem, sizeof(*session));
654
655     yaz_log(YLOG_DEBUG, "New Pazpar2 session");
656
657     session->service = service;
658     session->relevance = 0;
659     session->total_hits = 0;
660     session->total_records = 0;
661     session->number_of_warnings_unknown_elements = 0;
662     session->number_of_warnings_unknown_metadata = 0;
663     session->num_termlists = 0;
664     session->reclist = 0;
665     session->clients = 0;
666     session->expected_maxrecs = 0;
667     session->session_nmem = nmem;
668     session->nmem = nmem_create();
669     session->wrbuf = wrbuf_alloc();
670     session->databases = 0;
671     for (i = 0; i <= SESSION_WATCH_MAX; i++)
672     {
673         session->watchlist[i].data = 0;
674         session->watchlist[i].fun = 0;
675     }
676     session->normalize_cache = normalize_cache_create();
677
678     return session;
679 }
680
681 struct hitsbytarget *hitsbytarget(struct session *se, int *count, NMEM nmem)
682 {
683     struct hitsbytarget *res = 0;
684     struct client *cl;
685     size_t sz = 0;
686
687     for (cl = se->clients; cl; cl = client_next_in_session(cl))
688         sz++;
689
690     res = nmem_malloc(nmem, sizeof(*res) * sz);
691     *count = 0;
692     for (cl = se->clients; cl; cl = client_next_in_session(cl))
693     {
694         const char *name = session_setting_oneval(client_get_database(cl),
695                                                   PZ_NAME);
696
697         res[*count].id = client_get_database(cl)->database->url;
698         res[*count].name = *name ? name : "Unknown";
699         res[*count].hits = client_get_hits(cl);
700         res[*count].records = client_get_num_records(cl);
701         res[*count].diagnostic = client_get_diagnostic(cl);
702         res[*count].state = client_get_state_str(cl);
703         res[*count].connected  = client_get_connection(cl) ? 1 : 0;
704         (*count)++;
705     }
706     return res;
707 }
708
709 struct termlist_score **termlist(struct session *s, const char *name, int *num)
710 {
711     int i;
712
713     for (i = 0; i < s->num_termlists; i++)
714         if (!strcmp((const char *) s->termlists[i].name, name))
715             return termlist_highscore(s->termlists[i].termlist, num);
716     return 0;
717 }
718
719 #ifdef MISSING_HEADERS
720 void report_nmem_stats(void)
721 {
722     size_t in_use, is_free;
723
724     nmem_get_memory_in_use(&in_use);
725     nmem_get_memory_free(&is_free);
726
727     yaz_log(YLOG_LOG, "nmem stat: use=%ld free=%ld", 
728             (long) in_use, (long) is_free);
729 }
730 #endif
731
732 struct record_cluster *show_single(struct session *s, const char *id,
733                                    struct record_cluster **prev_r,
734                                    struct record_cluster **next_r)
735 {
736     struct record_cluster *r;
737
738     reclist_rewind(s->reclist);
739     *prev_r = 0;
740     *next_r = 0;
741     while ((r = reclist_read_record(s->reclist)))
742     {
743         if (!strcmp(r->recid, id))
744         {
745             *next_r = reclist_read_record(s->reclist);
746             return r;
747         }
748         *prev_r = r;
749     }
750     return 0;
751 }
752
753 struct record_cluster **show(struct session *s, struct reclist_sortparms *sp, 
754                              int start, int *num, int *total, int *sumhits, 
755                              NMEM nmem_show)
756 {
757     struct record_cluster **recs = nmem_malloc(nmem_show, *num 
758                                                * sizeof(struct record_cluster *));
759     struct reclist_sortparms *spp;
760     int i;
761 #if USE_TIMING    
762     yaz_timing_t t = yaz_timing_create();
763 #endif
764
765     if (!s->relevance)
766     {
767         *num = 0;
768         *total = 0;
769         *sumhits = 0;
770         recs = 0;
771     }
772     else
773     {
774         for (spp = sp; spp; spp = spp->next)
775             if (spp->type == Metadata_sortkey_relevance)
776             {
777                 relevance_prepare_read(s->relevance, s->reclist);
778                 break;
779             }
780         reclist_sort(s->reclist, sp);
781         
782         *total = reclist_get_num_records(s->reclist);
783         *sumhits = s->total_hits;
784         
785         for (i = 0; i < start; i++)
786             if (!reclist_read_record(s->reclist))
787             {
788                 *num = 0;
789                 recs = 0;
790                 break;
791             }
792         
793         for (i = 0; i < *num; i++)
794         {
795             struct record_cluster *r = reclist_read_record(s->reclist);
796             if (!r)
797             {
798                 *num = i;
799                 break;
800             }
801             recs[i] = r;
802         }
803     }
804 #if USE_TIMING
805     yaz_timing_stop(t);
806     yaz_log(YLOG_LOG, "show %6.5f %3.2f %3.2f", 
807             yaz_timing_get_real(t), yaz_timing_get_user(t),
808             yaz_timing_get_sys(t));
809     yaz_timing_destroy(&t);
810 #endif
811     return recs;
812 }
813
814 void statistics(struct session *se, struct statistics *stat)
815 {
816     struct client *cl;
817     int count = 0;
818
819     memset(stat, 0, sizeof(*stat));
820     for (cl = se->clients; cl; cl = client_next_in_session(cl))
821     {
822         if (!client_get_connection(cl))
823             stat->num_no_connection++;
824         switch (client_get_state(cl))
825         {
826         case Client_Connecting: stat->num_connecting++; break;
827         case Client_Working: stat->num_working++; break;
828         case Client_Idle: stat->num_idle++; break;
829         case Client_Failed: stat->num_failed++; break;
830         case Client_Error: stat->num_error++; break;
831         default: break;
832         }
833         count++;
834     }
835     stat->num_hits = se->total_hits;
836     stat->num_records = se->total_records;
837
838     stat->num_clients = count;
839 }
840
841
842 // Master list of connections we're handling events to
843 static IOCHAN channel_list = 0;  /* thread pr */
844
845 void pazpar2_add_channel(IOCHAN chan)
846 {
847     chan->next = channel_list;
848     channel_list = chan;
849 }
850
851 void pazpar2_event_loop()
852 {
853     event_loop(&channel_list);
854 }
855
856 static struct record_metadata *record_metadata_init(
857     NMEM nmem, char *value, enum conf_metadata_type type)
858 {
859     struct record_metadata *rec_md = record_metadata_create(nmem);
860     if (type == Metadata_type_generic)
861     {
862         char * p = value;
863         p = normalize7bit_generic(p, " ,/.:([");
864         
865         rec_md->data.text.disp = nmem_strdup(nmem, p);
866         rec_md->data.text.sort = 0;
867     }
868     else if (type == Metadata_type_year || type == Metadata_type_date)
869     {
870         int first, last;
871         int longdate = 0;
872
873         if (type == Metadata_type_date)
874             longdate = 1;
875         if (extract7bit_dates((char *) value, &first, &last, longdate) < 0)
876             return 0;
877
878         rec_md->data.number.min = first;
879         rec_md->data.number.max = last;
880     }
881     else
882         return 0;
883     return rec_md;
884 }
885
886 static int get_mergekey_from_doc(xmlDoc *doc, xmlNode *root, const char *name,
887                                  struct conf_service *service, WRBUF norm_wr)
888 {
889     xmlNode *n;
890     int no_found = 0;
891     for (n = root->children; n; n = n->next)
892     {
893         if (n->type != XML_ELEMENT_NODE)
894             continue;
895         if (!strcmp((const char *) n->name, "metadata"))
896         {
897             xmlChar *type = xmlGetProp(n, (xmlChar *) "type");
898             if (!strcmp(name, (const char *) type))
899             {
900                 xmlChar *value = xmlNodeListGetString(doc, n->children, 1);
901                 if (value)
902                 {
903                     const char *norm_str;
904                     pp2_relevance_token_t prt =
905                         pp2_relevance_tokenize(
906                             service->mergekey_pct,
907                             (const char *) value);
908                     
909                     wrbuf_puts(norm_wr, name);
910                     wrbuf_puts(norm_wr, "=");
911                     while ((norm_str =
912                             pp2_relevance_token_next(prt)))
913                     {
914                         if (*norm_str)
915                         {
916                             if (wrbuf_len(norm_wr))
917                                 wrbuf_puts(norm_wr, " ");
918                             wrbuf_puts(norm_wr, norm_str);
919                         }
920                     }
921                     xmlFree(value);
922                     pp2_relevance_token_destroy(prt);
923                     no_found++;
924                 }
925             }
926             xmlFree(type);
927         }
928     }
929     return no_found;
930 }
931
932 static const char *get_mergekey(xmlDoc *doc, struct client *cl, int record_no,
933                                 struct conf_service *service, NMEM nmem)
934 {
935     char *mergekey_norm = 0;
936     xmlNode *root = xmlDocGetRootElement(doc);
937     WRBUF norm_wr = wrbuf_alloc();
938
939     /* consider mergekey from XSL first */
940     xmlChar *mergekey = xmlGetProp(root, (xmlChar *) "mergekey");
941     if (mergekey)
942     {
943         const char *norm_str;
944         pp2_relevance_token_t prt =
945             pp2_relevance_tokenize(
946                 service->mergekey_pct,
947                 (const char *) mergekey);
948         
949         while ((norm_str = pp2_relevance_token_next(prt)))
950         {
951             if (*norm_str)
952             {
953                 if (wrbuf_len(norm_wr))
954                     wrbuf_puts(norm_wr, " ");
955                 wrbuf_puts(norm_wr, norm_str);
956             }
957         }
958         pp2_relevance_token_destroy(prt);
959         xmlFree(mergekey);
960     }
961     else
962     {
963         /* no mergekey defined in XSL. Look for mergekey metadata instead */
964         int field_id;
965         for (field_id = 0; field_id < service->num_metadata; field_id++)
966         {
967             struct conf_metadata *ser_md = &service->metadata[field_id];
968             if (ser_md->mergekey != Metadata_mergekey_no)
969             {
970                 int r = get_mergekey_from_doc(doc, root, ser_md->name,
971                                               service, norm_wr);
972                 if (r == 0 && ser_md->mergekey == Metadata_mergekey_required)
973                 {
974                     /* no mergekey on this one and it is required.. 
975                        Generate unique key instead */
976                     wrbuf_rewind(norm_wr);
977                     break;
978                 }
979             }
980         }
981     }
982
983     /* generate unique key if none is not generated already or is empty */
984     if (wrbuf_len(norm_wr) == 0)
985     {
986         wrbuf_printf(norm_wr, "%s-%d",
987                      client_get_database(cl)->database->url, record_no);
988     }
989     if (wrbuf_len(norm_wr) > 0)
990         mergekey_norm = nmem_strdup(nmem, wrbuf_cstr(norm_wr));
991     wrbuf_destroy(norm_wr);
992     return mergekey_norm;
993 }
994
995 /** \brief see if metadata for pz:recordfilter exists 
996     \param root xml root element of normalized record
997     \param sdb session database for client
998     \retval 0 if there is no metadata for pz:recordfilter
999     \retval 1 if there is metadata for pz:recordfilter
1000
1001     If there is no pz:recordfilter defined, this function returns 1
1002     as well.
1003 */
1004     
1005 static int check_record_filter(xmlNode *root, struct session_database *sdb)
1006 {
1007     int match = 0;
1008     xmlNode *n;
1009     const char *s;
1010     s = session_setting_oneval(sdb, PZ_RECORDFILTER);
1011
1012     if (!s || !*s)
1013         return 1;
1014
1015     for (n = root->children; n; n = n->next)
1016     {
1017         if (n->type != XML_ELEMENT_NODE)
1018             continue;
1019         if (!strcmp((const char *) n->name, "metadata"))
1020         {
1021             xmlChar *type = xmlGetProp(n, (xmlChar *) "type");
1022             if (type)
1023             {
1024                 size_t len;
1025                 const char *eq = strchr(s, '=');
1026                 if (eq)
1027                     len = eq - s;
1028                 else
1029                     len = strlen(s);
1030                 if (len == strlen((const char *)type) &&
1031                     !memcmp((const char *) type, s, len))
1032                 {
1033                     xmlChar *value = xmlNodeGetContent(n);
1034                     if (value && *value)
1035                     {
1036                         if (!eq || strstr((const char *) value, eq+1))
1037                             match = 1;
1038                     }
1039                     xmlFree(value);
1040                 }
1041                 xmlFree(type);
1042             }
1043         }
1044     }
1045     return match;
1046 }
1047
1048
1049 /** \brief ingest XML record
1050     \param cl client holds the result set for record
1051     \param rec record buffer (0 terminated)
1052     \param record_no record position (1, 2, ..)
1053     \returns resulting record or NULL on failure
1054 */
1055 struct record *ingest_record(struct client *cl, const char *rec,
1056                              int record_no)
1057 {
1058     struct session_database *sdb = client_get_database(cl);
1059     struct session *se = client_get_session(cl);
1060     xmlDoc *xdoc = normalize_record(sdb, se, rec);
1061     xmlNode *root, *n;
1062     struct record *record;
1063     struct record_cluster *cluster;
1064     const char *mergekey_norm;
1065     xmlChar *type = 0;
1066     xmlChar *value = 0;
1067     struct conf_service *service = se->service;
1068
1069     if (!xdoc)
1070         return 0;
1071
1072     root = xmlDocGetRootElement(xdoc);
1073
1074     if (!check_record_filter(root, sdb))
1075     {
1076         yaz_log(YLOG_WARN, "Filtered out record no %d from %s", record_no,
1077             sdb->database->url);
1078         xmlFreeDoc(xdoc);
1079         return 0;
1080     }
1081
1082     mergekey_norm = get_mergekey(xdoc, cl, record_no, service, se->nmem);
1083     if (!mergekey_norm)
1084     {
1085         yaz_log(YLOG_WARN, "Got no mergekey");
1086         xmlFreeDoc(xdoc);
1087         return 0;
1088     }
1089     record = record_create(se->nmem, 
1090                            service->num_metadata, service->num_sortkeys, cl,
1091                            record_no);
1092
1093     cluster = reclist_insert(se->reclist, 
1094                              service, 
1095                              record, (char *) mergekey_norm, 
1096                              &se->total_merged);
1097     if (global_parameters.dump_records)
1098         yaz_log(YLOG_LOG, "Cluster id %s from %s (#%d)", cluster->recid,
1099                 sdb->database->url, record_no);
1100     if (!cluster)
1101     {
1102         /* no room for record */
1103         xmlFreeDoc(xdoc);
1104         return 0;
1105     }
1106     relevance_newrec(se->relevance, cluster);
1107     
1108     // now parsing XML record and adding data to cluster or record metadata
1109     for (n = root->children; n; n = n->next)
1110     {
1111         pp2_relevance_token_t prt;
1112         if (type)
1113             xmlFree(type);
1114         if (value)
1115             xmlFree(value);
1116         type = value = 0;
1117         
1118         if (n->type != XML_ELEMENT_NODE)
1119             continue;
1120         if (!strcmp((const char *) n->name, "metadata"))
1121         {
1122             struct conf_metadata *ser_md = 0;
1123             struct conf_sortkey *ser_sk = 0;
1124             struct record_metadata **wheretoput = 0;
1125             struct record_metadata *rec_md = 0;
1126             int md_field_id = -1;
1127             int sk_field_id = -1;
1128             
1129             type = xmlGetProp(n, (xmlChar *) "type");
1130             value = xmlNodeListGetString(xdoc, n->children, 1);
1131             
1132             if (!type || !value || !*value)
1133                 continue;
1134             
1135             md_field_id 
1136                 = conf_service_metadata_field_id(service, (const char *) type);
1137             if (md_field_id < 0)
1138             {
1139                 if (se->number_of_warnings_unknown_metadata == 0)
1140                 {
1141                     yaz_log(YLOG_WARN, 
1142                             "Ignoring unknown metadata element: %s", type);
1143                 }
1144                 se->number_of_warnings_unknown_metadata++;
1145                 continue;
1146             }
1147             
1148             ser_md = &service->metadata[md_field_id];
1149             
1150             if (ser_md->sortkey_offset >= 0){
1151                 sk_field_id = ser_md->sortkey_offset;
1152                 ser_sk = &service->sortkeys[sk_field_id];
1153             }
1154
1155             // non-merged metadata
1156             rec_md = record_metadata_init(se->nmem, (char *) value,
1157                                           ser_md->type);
1158             if (!rec_md)
1159             {
1160                 yaz_log(YLOG_WARN, "bad metadata data '%s' for element '%s'",
1161                         value, type);
1162                 continue;
1163             }
1164             wheretoput = &record->metadata[md_field_id];
1165             while (*wheretoput)
1166                 wheretoput = &(*wheretoput)->next;
1167             *wheretoput = rec_md;
1168
1169             // merged metadata
1170             rec_md = record_metadata_init(se->nmem, (char *) value,
1171                                           ser_md->type);
1172             wheretoput = &cluster->metadata[md_field_id];
1173
1174             // and polulate with data:
1175             // assign cluster or record based on merge action
1176             if (ser_md->merge == Metadata_merge_unique)
1177             {
1178                 struct record_metadata *mnode;
1179                 for (mnode = *wheretoput; mnode; mnode = mnode->next)
1180                     if (!strcmp((const char *) mnode->data.text.disp, 
1181                                 rec_md->data.text.disp))
1182                         break;
1183                 if (!mnode)
1184                 {
1185                     rec_md->next = *wheretoput;
1186                     *wheretoput = rec_md;
1187                 }
1188             }
1189             else if (ser_md->merge == Metadata_merge_longest)
1190             {
1191                 if (!*wheretoput 
1192                     || strlen(rec_md->data.text.disp) 
1193                     > strlen((*wheretoput)->data.text.disp))
1194                 {
1195                     *wheretoput = rec_md;
1196                     if (ser_sk)
1197                     {
1198                         const char *sort_str = 0;
1199                         int skip_article = 
1200                             ser_sk->type == Metadata_sortkey_skiparticle;
1201
1202                         if (!cluster->sortkeys[sk_field_id])
1203                             cluster->sortkeys[sk_field_id] = 
1204                                 nmem_malloc(se->nmem, 
1205                                             sizeof(union data_types));
1206                          
1207                         prt = pp2_relevance_tokenize(
1208                             service->sort_pct,
1209                             rec_md->data.text.disp);
1210
1211                         pp2_relevance_token_next(prt);
1212                          
1213                         sort_str = pp2_get_sort(prt, skip_article);
1214                          
1215                         cluster->sortkeys[sk_field_id]->text.disp = 
1216                             rec_md->data.text.disp;
1217                         if (!sort_str)
1218                         {
1219                             sort_str = rec_md->data.text.disp;
1220                             yaz_log(YLOG_WARN, 
1221                                     "Could not make sortkey. Bug #1858");
1222                         }
1223                         cluster->sortkeys[sk_field_id]->text.sort = 
1224                             nmem_strdup(se->nmem, sort_str);
1225 #if 0
1226                         yaz_log(YLOG_LOG, "text disp=%s",
1227                                 cluster->sortkeys[sk_field_id]->text.disp);
1228                         yaz_log(YLOG_LOG, "text sort=%s",
1229                                 cluster->sortkeys[sk_field_id]->text.sort);
1230 #endif
1231                         pp2_relevance_token_destroy(prt);
1232                     }
1233                 }
1234             }
1235             else if (ser_md->merge == Metadata_merge_all)
1236             {
1237                 rec_md->next = *wheretoput;
1238                 *wheretoput = rec_md;
1239             }
1240             else if (ser_md->merge == Metadata_merge_range)
1241             {
1242                 if (!*wheretoput)
1243                 {
1244                     *wheretoput = rec_md;
1245                     if (ser_sk)
1246                         cluster->sortkeys[sk_field_id] 
1247                             = &rec_md->data;
1248                 }
1249                 else
1250                 {
1251                     int this_min = rec_md->data.number.min;
1252                     int this_max = rec_md->data.number.max;
1253                     if (this_min < (*wheretoput)->data.number.min)
1254                         (*wheretoput)->data.number.min = this_min;
1255                     if (this_max > (*wheretoput)->data.number.max)
1256                         (*wheretoput)->data.number.max = this_max;
1257                 }
1258             }
1259
1260
1261             // ranking of _all_ fields enabled ... 
1262             if (ser_md->rank)
1263                 relevance_countwords(se->relevance, cluster, 
1264                                      (char *) value, ser_md->rank);
1265
1266             // construct facets ... 
1267             if (ser_md->termlist)
1268             {
1269                 if (ser_md->type == Metadata_type_year)
1270                 {
1271                     char year[64];
1272                     sprintf(year, "%d", rec_md->data.number.max);
1273                     add_facet(se, (char *) type, year);
1274                     if (rec_md->data.number.max != rec_md->data.number.min)
1275                     {
1276                         sprintf(year, "%d", rec_md->data.number.min);
1277                         add_facet(se, (char *) type, year);
1278                     }
1279                 }
1280                 else
1281                     add_facet(se, (char *) type, (char *) value);
1282             }
1283
1284             // cleaning up
1285             xmlFree(type);
1286             xmlFree(value);
1287             type = value = 0;
1288         }
1289         else
1290         {
1291             if (se->number_of_warnings_unknown_elements == 0)
1292                 yaz_log(YLOG_WARN,
1293                         "Unexpected element in internal record: %s", n->name);
1294             se->number_of_warnings_unknown_elements++;
1295         }
1296     }
1297     if (type)
1298         xmlFree(type);
1299     if (value)
1300         xmlFree(value);
1301
1302     xmlFreeDoc(xdoc);
1303
1304     relevance_donerecord(se->relevance, cluster);
1305     se->total_records++;
1306
1307     return record;
1308 }
1309
1310
1311
1312 /*
1313  * Local variables:
1314  * c-basic-offset: 4
1315  * c-file-style: "Stroustrup"
1316  * indent-tabs-mode: nil
1317  * End:
1318  * vim: shiftwidth=4 tabstop=8 expandtab
1319  */
1320