db5b06d8d3e9bf9f87598f428593b04be3a718e6
[pazpar2-moved-to-github.git] / src / logic.c
1 /* This file is part of Pazpar2.
2    Copyright (C) 2006-2009 Index Data
3
4 Pazpar2 is free software; you can redistribute it and/or modify it under
5 the terms of the GNU General Public License as published by the Free
6 Software Foundation; either version 2, or (at your option) any later
7 version.
8
9 Pazpar2 is distributed in the hope that it will be useful, but WITHOUT ANY
10 WARRANTY; without even the implied warranty of MERCHANTABILITY or
11 FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
12 for more details.
13
14 You should have received a copy of the GNU General Public License
15 along with this program; if not, write to the Free Software
16 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
17
18 */
19
20 /** \file logic.c
21     \brief high-level logic; mostly user sessions and settings
22 */
23
24 #if HAVE_CONFIG_H
25 #include <config.h>
26 #endif
27
28 #include <stdlib.h>
29 #include <stdio.h>
30 #include <string.h>
31 #if HAVE_SYS_TIME_H
32 #include <sys/time.h>
33 #endif
34 #if HAVE_UNISTD_H
35 #include <unistd.h>
36 #endif
37 #include <signal.h>
38 #include <ctype.h>
39 #include <assert.h>
40
41 #include <yaz/marcdisp.h>
42 #include <yaz/comstack.h>
43 #include <yaz/tcpip.h>
44 #include <yaz/proto.h>
45 #include <yaz/readconf.h>
46 #include <yaz/pquery.h>
47 #include <yaz/otherinfo.h>
48 #include <yaz/yaz-util.h>
49 #include <yaz/nmem.h>
50 #include <yaz/query-charset.h>
51 #include <yaz/querytowrbuf.h>
52 #include <yaz/oid_db.h>
53 #include <yaz/snprintf.h>
54
55 #define USE_TIMING 0
56 #if USE_TIMING
57 #include <yaz/timing.h>
58 #endif
59
60 #include "parameters.h"
61 #include "pazpar2.h"
62 #include "eventl.h"
63 #include "http.h"
64 #include "termlists.h"
65 #include "reclists.h"
66 #include "relevance.h"
67 #include "database.h"
68 #include "client.h"
69 #include "settings.h"
70 #include "normalize7bit.h"
71
72 #define TERMLIST_HIGH_SCORE 25
73
74 #define MAX_CHUNK 15
75
76 // Note: Some things in this structure will eventually move to configuration
77 struct parameters global_parameters = 
78 {
79     0,   // dump_records
80     0    // debug_mode
81 };
82
83 static void log_xml_doc(xmlDoc *doc)
84 {
85     FILE *lf = yaz_log_file();
86     xmlChar *result = 0;
87     int len = 0;
88 #if LIBXML_VERSION >= 20600
89     xmlDocDumpFormatMemory(doc, &result, &len, 1);
90 #else
91     xmlDocDumpMemory(doc, &result, &len);
92 #endif
93     if (lf && len)
94     {
95         fwrite(result, 1, len, lf);
96         fprintf(lf, "\n");
97     }
98     xmlFree(result);
99 }
100
101 // Recursively traverse query structure to extract terms.
102 void pull_terms(NMEM nmem, struct ccl_rpn_node *n, char **termlist, int *num)
103 {
104     char **words;
105     int numwords;
106     int i;
107
108     switch (n->kind)
109     {
110     case CCL_RPN_AND:
111     case CCL_RPN_OR:
112     case CCL_RPN_NOT:
113     case CCL_RPN_PROX:
114         pull_terms(nmem, n->u.p[0], termlist, num);
115         pull_terms(nmem, n->u.p[1], termlist, num);
116         break;
117     case CCL_RPN_TERM:
118         nmem_strsplit(nmem, " ", n->u.t.term, &words, &numwords);
119         for (i = 0; i < numwords; i++)
120             termlist[(*num)++] = words[i];
121         break;
122     default: // NOOP
123         break;
124     }
125 }
126
127
128
129 static void add_facet(struct session *s, const char *type, const char *value)
130 {
131     int i;
132
133     if (!*value)
134         return;
135     for (i = 0; i < s->num_termlists; i++)
136         if (!strcmp(s->termlists[i].name, type))
137             break;
138     if (i == s->num_termlists)
139     {
140         if (i == SESSION_MAX_TERMLISTS)
141         {
142             yaz_log(YLOG_FATAL, "Too many termlists");
143             return;
144         }
145
146         s->termlists[i].name = nmem_strdup(s->nmem, type);
147         s->termlists[i].termlist 
148             = termlist_create(s->nmem, TERMLIST_HIGH_SCORE);
149         s->num_termlists = i + 1;
150     }
151     termlist_insert(s->termlists[i].termlist, value);
152 }
153
154 static xmlDoc *record_to_xml(struct session_database *sdb, const char *rec)
155 {
156     struct database *db = sdb->database;
157     xmlDoc *rdoc = 0;
158
159     rdoc = xmlParseMemory(rec, strlen(rec));
160
161     if (!rdoc)
162     {
163         yaz_log(YLOG_FATAL, "Non-wellformed XML received from %s",
164                 db->url);
165         return 0;
166     }
167
168     if (global_parameters.dump_records)
169     {
170         yaz_log(YLOG_LOG, "Un-normalized record from %s", db->url);
171         log_xml_doc(rdoc);
172     }
173
174     return rdoc;
175 }
176
177 #define MAX_XSLT_ARGS 16
178
179 // Add static values from session database settings if applicable
180 static void insert_settings_parameters(struct session_database *sdb,
181                                        struct session *se, char **parms)
182 {
183     struct conf_service *service = se->service;
184     int i;
185     int nparms = 0;
186     int offset = 0;
187
188     for (i = 0; i < service->num_metadata; i++)
189     {
190         struct conf_metadata *md = &service->metadata[i];
191         int setting;
192
193         if (md->setting == Metadata_setting_parameter &&
194             (setting = settings_offset(service, md->name)) > 0)
195         {
196             const char *val = session_setting_oneval(sdb, setting);
197             if (val && nparms < MAX_XSLT_ARGS)
198             {
199                 char *buf;
200                 int len = strlen(val);
201                 buf = nmem_malloc(se->nmem, len + 3);
202                 buf[0] = '\'';
203                 strcpy(buf + 1, val);
204                 buf[len+1] = '\'';
205                 buf[len+2] = '\0';
206                 parms[offset++] = md->name;
207                 parms[offset++] = buf;
208                 nparms++;
209             }
210         }
211     }
212     parms[offset] = 0;
213 }
214
215 // Add static values from session database settings if applicable
216 static void insert_settings_values(struct session_database *sdb, xmlDoc *doc,
217     struct conf_service *service)
218 {
219     int i;
220
221     for (i = 0; i < service->num_metadata; i++)
222     {
223         struct conf_metadata *md = &service->metadata[i];
224         int offset;
225
226         if (md->setting == Metadata_setting_postproc &&
227             (offset = settings_offset(service, md->name)) > 0)
228         {
229             const char *val = session_setting_oneval(sdb, offset);
230             if (val)
231             {
232                 xmlNode *r = xmlDocGetRootElement(doc);
233                 xmlNode *n = xmlNewTextChild(r, 0, (xmlChar *) "metadata",
234                                              (xmlChar *) val);
235                 xmlSetProp(n, (xmlChar *) "type", (xmlChar *) md->name);
236             }
237         }
238     }
239 }
240
241 static xmlDoc *normalize_record(struct session_database *sdb,
242                                 struct session *se,
243                                 const char *rec)
244 {
245     xmlDoc *rdoc = record_to_xml(sdb, rec);
246
247     if (rdoc)
248     {
249         char *parms[MAX_XSLT_ARGS*2+1];
250         
251         insert_settings_parameters(sdb, se, parms);
252         
253         if (normalize_record_transform(sdb->map, &rdoc, (const char **)parms))
254         {
255             yaz_log(YLOG_WARN, "Normalize failed from %s", sdb->database->url);
256         }
257         else
258         {
259             insert_settings_values(sdb, rdoc, se->service);
260             
261             if (global_parameters.dump_records)
262             {
263                 yaz_log(YLOG_LOG, "Normalized record from %s", 
264                         sdb->database->url);
265                 log_xml_doc(rdoc);
266             }
267         }
268     }
269     return rdoc;
270 }
271
272 // Retrieve first defined value for 'name' for given database.
273 // Will be extended to take into account user associated with session
274 const char *session_setting_oneval(struct session_database *db, int offset)
275 {
276     if (!db->settings[offset])
277         return "";
278     return db->settings[offset]->value;
279 }
280
281 // Prepare XSLT stylesheets for record normalization
282 // Structures are allocated on the session_wide nmem to avoid having
283 // to recompute this for every search. This would lead
284 // to leaking if a single session was to repeatedly change the PZ_XSLT
285 // setting. However, this is not a realistic use scenario.
286 static int prepare_map(struct session *se, struct session_database *sdb)
287 {
288     const char *s;
289
290     if (!sdb->settings)
291     {
292         yaz_log(YLOG_WARN, "No settings on %s", sdb->database->url);
293         return -1;
294     }
295     if ((s = session_setting_oneval(sdb, PZ_XSLT)))
296     {
297         char auto_stylesheet[256];
298
299         if (!strcmp(s, "auto"))
300         {
301             const char *request_syntax = session_setting_oneval(
302                 sdb, PZ_REQUESTSYNTAX);
303             if (request_syntax)
304             {
305                 char *cp;
306                 yaz_snprintf(auto_stylesheet, sizeof(auto_stylesheet),
307                              "%s.xsl", request_syntax);
308                 for (cp = auto_stylesheet; *cp; cp++)
309                 {
310                     /* deliberately only consider ASCII */
311                     if (*cp > 32 && *cp < 127)
312                         *cp = tolower(*cp);
313                 }
314                 s = auto_stylesheet;
315             }
316             else
317             {
318                 yaz_log(YLOG_WARN, "No pz:requestsyntax for auto stylesheet");
319             }
320         }
321         sdb->map = normalize_cache_get(se->normalize_cache,
322                                        se->service, s);
323         if (!sdb->map)
324             return -1;
325     }
326     return 0;
327 }
328
329 // This analyzes settings and recomputes any supporting data structures
330 // if necessary.
331 static int prepare_session_database(struct session *se, 
332                                     struct session_database *sdb)
333 {
334     if (!sdb->settings)
335     {
336         yaz_log(YLOG_WARN, 
337                 "No settings associated with %s", sdb->database->url);
338         return -1;
339     }
340     if (sdb->settings[PZ_XSLT] && !sdb->map)
341     {
342         if (prepare_map(se, sdb) < 0)
343             return -1;
344     }
345     return 0;
346 }
347
348 // called if watch should be removed because http_channel is to be destroyed
349 static void session_watch_cancel(void *data, struct http_channel *c,
350                                  void *data2)
351 {
352     struct session_watchentry *ent = data;
353
354     ent->fun = 0;
355     ent->data = 0;
356     ent->obs = 0;
357 }
358
359 // set watch. Returns 0=OK, -1 if watch is already set
360 int session_set_watch(struct session *s, int what, 
361                       session_watchfun fun, void *data,
362                       struct http_channel *chan)
363 {
364     if (s->watchlist[what].fun)
365         return -1;
366     s->watchlist[what].fun = fun;
367     s->watchlist[what].data = data;
368     s->watchlist[what].obs = http_add_observer(chan, &s->watchlist[what],
369                                                session_watch_cancel);
370     return 0;
371 }
372
373 void session_alert_watch(struct session *s, int what)
374 {
375     if (s->watchlist[what].fun)
376     {
377         /* our watch is no longer associated with http_channel */
378         void *data;
379         session_watchfun fun;
380
381         http_remove_observer(s->watchlist[what].obs);
382         fun = s->watchlist[what].fun;
383         data = s->watchlist[what].data;
384
385         /* reset watch before fun is invoked - in case fun wants to set
386            it again */
387         s->watchlist[what].fun = 0;
388         s->watchlist[what].data = 0;
389         s->watchlist[what].obs = 0;
390
391         fun(data);
392     }
393 }
394
395 //callback for grep_databases
396 static void select_targets_callback(void *context, struct session_database *db)
397 {
398     struct session *se = (struct session*) context;
399     struct client *cl = client_create();
400     client_set_database(cl, db);
401     client_set_session(cl, se);
402 }
403
404 // Associates a set of clients with a session;
405 // Note: Session-databases represent databases with per-session 
406 // setting overrides
407 int select_targets(struct session *se, struct database_criterion *crit)
408 {
409     while (se->clients)
410         client_destroy(se->clients);
411
412     return session_grep_databases(se, crit, select_targets_callback);
413 }
414
415 int session_active_clients(struct session *s)
416 {
417     struct client *c;
418     int res = 0;
419
420     for (c = s->clients; c; c = client_next_in_session(c))
421         if (client_is_active(c))
422             res++;
423
424     return res;
425 }
426
427 // parses crit1=val1,crit2=val2|val3,...
428 static struct database_criterion *parse_filter(NMEM m, const char *buf)
429 {
430     struct database_criterion *res = 0;
431     char **values;
432     int num;
433     int i;
434
435     if (!buf || !*buf)
436         return 0;
437     nmem_strsplit(m, ",", buf,  &values, &num);
438     for (i = 0; i < num; i++)
439     {
440         char **subvalues;
441         int subnum;
442         int subi;
443         struct database_criterion *new = nmem_malloc(m, sizeof(*new));
444         char *eq;
445         if ((eq = strchr(values[i], '=')))
446             new->type = PAZPAR2_STRING_MATCH;
447         else if ((eq = strchr(values[i], '~')))
448             new->type = PAZPAR2_SUBSTRING_MATCH;
449         else
450         {
451             yaz_log(YLOG_WARN, "Missing equal-sign/tilde in filter");
452             return 0;
453         }
454         *(eq++) = '\0';
455         new->name = values[i];
456         nmem_strsplit(m, "|", eq, &subvalues, &subnum);
457         new->values = 0;
458         for (subi = 0; subi < subnum; subi++)
459         {
460             struct database_criterion_value *newv
461                 = nmem_malloc(m, sizeof(*newv));
462             newv->value = subvalues[subi];
463             newv->next = new->values;
464             new->values = newv;
465         }
466         new->next = res;
467         res = new;
468     }
469     return res;
470 }
471
472 enum pazpar2_error_code search(struct session *se,
473                                const char *query, const char *filter,
474                                const char **addinfo)
475 {
476     int live_channels = 0;
477     int no_working = 0;
478     int no_failed = 0;
479     struct client *cl;
480     struct database_criterion *criteria;
481
482     yaz_log(YLOG_DEBUG, "Search");
483
484     *addinfo = 0;
485     nmem_reset(se->nmem);
486     se->relevance = 0;
487     se->total_records = se->total_hits = se->total_merged = 0;
488     se->reclist = 0;
489     se->num_termlists = 0;
490     criteria = parse_filter(se->nmem, filter);
491     live_channels = select_targets(se, criteria);
492     if (live_channels)
493     {
494         se->reclist = reclist_create(se->nmem);
495     }
496     else
497         return PAZPAR2_NO_TARGETS;
498
499     for (cl = se->clients; cl; cl = client_next_in_session(cl))
500     {
501         if (prepare_session_database(se, client_get_database(cl)) < 0)
502             continue;
503         // Parse query for target
504         if (client_parse_query(cl, query) < 0)
505             no_failed++;
506         else
507         {
508             no_working++;
509             if (client_prep_connection(cl, se->service->z3950_operation_timeout,
510                                        se->service->z3950_session_timeout))
511                 client_start_search(cl);
512         }
513     }
514     if (no_working == 0)
515     {
516         if (no_failed > 0)
517         {
518             *addinfo = "query";
519             return PAZPAR2_MALFORMED_PARAMETER_VALUE;
520         }
521         else
522             return PAZPAR2_NO_TARGETS;
523     }
524     return PAZPAR2_NO_ERROR;
525 }
526
527 // Creates a new session_database object for a database
528 static void session_init_databases_fun(void *context, struct database *db)
529 {
530     struct session *se = (struct session *) context;
531     struct conf_service *service = se->service;
532     struct session_database *new = nmem_malloc(se->session_nmem, sizeof(*new));
533     int num = settings_num(service);
534     int i;
535
536     new->database = db;
537     
538     new->map = 0;
539     new->settings 
540         = nmem_malloc(se->session_nmem, sizeof(struct settings *) * num);
541     memset(new->settings, 0, sizeof(struct settings*) * num);
542
543     if (db->settings)
544     {
545         for (i = 0; i < num; i++)
546             new->settings[i] = db->settings[i];
547     }
548     new->next = se->databases;
549     se->databases = new;
550 }
551
552 // Doesn't free memory associated with sdb -- nmem takes care of that
553 static void session_database_destroy(struct session_database *sdb)
554 {
555     sdb->map = 0;
556 }
557
558 // Initialize session_database list -- this represents this session's view
559 // of the database list -- subject to modification by the settings ws command
560 void session_init_databases(struct session *se)
561 {
562     se->databases = 0;
563     predef_grep_databases(se, se->service, 0, session_init_databases_fun);
564 }
565
566 // Probably session_init_databases_fun should be refactored instead of
567 // called here.
568 static struct session_database *load_session_database(struct session *se, 
569                                                       char *id)
570 {
571     struct database *db = find_database(id, 0, se->service);
572
573     resolve_database(db);
574
575     session_init_databases_fun((void*) se, db);
576     // New sdb is head of se->databases list
577     return se->databases;
578 }
579
580 // Find an existing session database. If not found, load it
581 static struct session_database *find_session_database(struct session *se, 
582                                                       char *id)
583 {
584     struct session_database *sdb;
585
586     for (sdb = se->databases; sdb; sdb = sdb->next)
587         if (!strcmp(sdb->database->url, id))
588             return sdb;
589     return load_session_database(se, id);
590 }
591
592 // Apply a session override to a database
593 void session_apply_setting(struct session *se, char *dbname, char *setting,
594                            char *value)
595 {
596     struct session_database *sdb = find_session_database(se, dbname);
597     struct conf_service *service = se->service;
598     struct setting *new = nmem_malloc(se->session_nmem, sizeof(*new));
599     int offset = settings_offset_cprefix(service, setting);
600
601     if (offset < 0)
602     {
603         yaz_log(YLOG_WARN, "Unknown setting %s", setting);
604         return;
605     }
606     // Jakub: This breaks the filter setting.
607     /*if (offset == PZ_ID)
608       {
609       yaz_log(YLOG_WARN, "No need to set pz:id setting. Ignoring");
610       return;
611       }*/
612     new->precedence = 0;
613     new->target = dbname;
614     new->name = setting;
615     new->value = value;
616     new->next = sdb->settings[offset];
617     sdb->settings[offset] = new;
618
619     // Force later recompute of settings-driven data structures
620     // (happens when a search starts and client connections are prepared)
621     switch (offset)
622     {
623     case PZ_XSLT:
624         if (sdb->map)
625         {
626             sdb->map = 0;
627         }
628         break;
629     }
630 }
631
632 void destroy_session(struct session *s)
633 {
634     struct session_database *sdb;
635
636     while (s->clients)
637         client_destroy(s->clients);
638     for (sdb = s->databases; sdb; sdb = sdb->next)
639         session_database_destroy(sdb);
640     normalize_cache_destroy(s->normalize_cache);
641     nmem_destroy(s->nmem);
642     service_destroy(s->service);
643     wrbuf_destroy(s->wrbuf);
644 }
645
646 struct session *new_session(NMEM nmem, struct conf_service *service) 
647 {
648     int i;
649     struct session *session = nmem_malloc(nmem, sizeof(*session));
650
651     yaz_log(YLOG_DEBUG, "New Pazpar2 session");
652
653     session->service = service;
654     session->relevance = 0;
655     session->total_hits = 0;
656     session->total_records = 0;
657     session->number_of_warnings_unknown_elements = 0;
658     session->number_of_warnings_unknown_metadata = 0;
659     session->num_termlists = 0;
660     session->reclist = 0;
661     session->clients = 0;
662     session->session_nmem = nmem;
663     session->nmem = nmem_create();
664     session->wrbuf = wrbuf_alloc();
665     session->databases = 0;
666     for (i = 0; i <= SESSION_WATCH_MAX; i++)
667     {
668         session->watchlist[i].data = 0;
669         session->watchlist[i].fun = 0;
670     }
671     session->normalize_cache = normalize_cache_create();
672
673     return session;
674 }
675
676 struct hitsbytarget *hitsbytarget(struct session *se, int *count, NMEM nmem)
677 {
678     struct hitsbytarget *res = 0;
679     struct client *cl;
680     size_t sz = 0;
681
682     for (cl = se->clients; cl; cl = client_next_in_session(cl))
683         sz++;
684
685     res = nmem_malloc(nmem, sizeof(*res) * sz);
686     *count = 0;
687     for (cl = se->clients; cl; cl = client_next_in_session(cl))
688     {
689         const char *name = session_setting_oneval(client_get_database(cl),
690                                                   PZ_NAME);
691
692         res[*count].id = client_get_database(cl)->database->url;
693         res[*count].name = *name ? name : "Unknown";
694         res[*count].hits = client_get_hits(cl);
695         res[*count].records = client_get_num_records(cl);
696         res[*count].diagnostic = client_get_diagnostic(cl);
697         res[*count].state = client_get_state_str(cl);
698         res[*count].connected  = client_get_connection(cl) ? 1 : 0;
699         (*count)++;
700     }
701     return res;
702 }
703
704 struct termlist_score **termlist(struct session *s, const char *name, int *num)
705 {
706     int i;
707
708     for (i = 0; i < s->num_termlists; i++)
709         if (!strcmp((const char *) s->termlists[i].name, name))
710             return termlist_highscore(s->termlists[i].termlist, num);
711     return 0;
712 }
713
714 #ifdef MISSING_HEADERS
715 void report_nmem_stats(void)
716 {
717     size_t in_use, is_free;
718
719     nmem_get_memory_in_use(&in_use);
720     nmem_get_memory_free(&is_free);
721
722     yaz_log(YLOG_LOG, "nmem stat: use=%ld free=%ld", 
723             (long) in_use, (long) is_free);
724 }
725 #endif
726
727 struct record_cluster *show_single(struct session *s, const char *id,
728                                    struct record_cluster **prev_r,
729                                    struct record_cluster **next_r)
730 {
731     struct record_cluster *r;
732
733     reclist_rewind(s->reclist);
734     *prev_r = 0;
735     *next_r = 0;
736     while ((r = reclist_read_record(s->reclist)))
737     {
738         if (!strcmp(r->recid, id))
739         {
740             *next_r = reclist_read_record(s->reclist);
741             return r;
742         }
743         *prev_r = r;
744     }
745     return 0;
746 }
747
748 struct record_cluster **show(struct session *s, struct reclist_sortparms *sp, 
749                              int start, int *num, int *total, int *sumhits, 
750                              NMEM nmem_show)
751 {
752     struct record_cluster **recs = nmem_malloc(nmem_show, *num 
753                                                * sizeof(struct record_cluster *));
754     struct reclist_sortparms *spp;
755     int i;
756 #if USE_TIMING    
757     yaz_timing_t t = yaz_timing_create();
758 #endif
759
760     if (!s->relevance)
761     {
762         *num = 0;
763         *total = 0;
764         *sumhits = 0;
765         recs = 0;
766     }
767     else
768     {
769         for (spp = sp; spp; spp = spp->next)
770             if (spp->type == Metadata_sortkey_relevance)
771             {
772                 relevance_prepare_read(s->relevance, s->reclist);
773                 break;
774             }
775         reclist_sort(s->reclist, sp);
776         
777         *total = reclist_get_num_records(s->reclist);
778         *sumhits = s->total_hits;
779         
780         for (i = 0; i < start; i++)
781             if (!reclist_read_record(s->reclist))
782             {
783                 *num = 0;
784                 recs = 0;
785                 break;
786             }
787         
788         for (i = 0; i < *num; i++)
789         {
790             struct record_cluster *r = reclist_read_record(s->reclist);
791             if (!r)
792             {
793                 *num = i;
794                 break;
795             }
796             recs[i] = r;
797         }
798     }
799 #if USE_TIMING
800     yaz_timing_stop(t);
801     yaz_log(YLOG_LOG, "show %6.5f %3.2f %3.2f", 
802             yaz_timing_get_real(t), yaz_timing_get_user(t),
803             yaz_timing_get_sys(t));
804     yaz_timing_destroy(&t);
805 #endif
806     return recs;
807 }
808
809 void statistics(struct session *se, struct statistics *stat)
810 {
811     struct client *cl;
812     int count = 0;
813
814     memset(stat, 0, sizeof(*stat));
815     for (cl = se->clients; cl; cl = client_next_in_session(cl))
816     {
817         if (!client_get_connection(cl))
818             stat->num_no_connection++;
819         switch (client_get_state(cl))
820         {
821         case Client_Connecting: stat->num_connecting++; break;
822         case Client_Working: stat->num_working++; break;
823         case Client_Idle: stat->num_idle++; break;
824         case Client_Failed: stat->num_failed++; break;
825         case Client_Error: stat->num_error++; break;
826         default: break;
827         }
828         count++;
829     }
830     stat->num_hits = se->total_hits;
831     stat->num_records = se->total_records;
832
833     stat->num_clients = count;
834 }
835
836
837 // Master list of connections we're handling events to
838 static IOCHAN channel_list = 0;  /* thread pr */
839
840 void pazpar2_add_channel(IOCHAN chan)
841 {
842     chan->next = channel_list;
843     channel_list = chan;
844 }
845
846 void pazpar2_event_loop()
847 {
848     event_loop(&channel_list);
849 }
850
851 static struct record_metadata *record_metadata_init(
852     NMEM nmem, char *value, enum conf_metadata_type type)
853 {
854     struct record_metadata *rec_md = record_metadata_create(nmem);
855     if (type == Metadata_type_generic)
856     {
857         char * p = value;
858         p = normalize7bit_generic(p, " ,/.:([");
859         
860         rec_md->data.text.disp = nmem_strdup(nmem, p);
861         rec_md->data.text.sort = 0;
862     }
863     else if (type == Metadata_type_year || type == Metadata_type_date)
864     {
865         int first, last;
866         int longdate = 0;
867
868         if (type == Metadata_type_date)
869             longdate = 1;
870         if (extract7bit_dates((char *) value, &first, &last, longdate) < 0)
871             return 0;
872
873         rec_md->data.number.min = first;
874         rec_md->data.number.max = last;
875     }
876     else
877         return 0;
878     return rec_md;
879 }
880
881 static int get_mergekey_from_doc(xmlDoc *doc, xmlNode *root, const char *name,
882                                  struct conf_service *service, WRBUF norm_wr)
883 {
884     xmlNode *n;
885     int no_found = 0;
886     for (n = root->children; n; n = n->next)
887     {
888         if (n->type != XML_ELEMENT_NODE)
889             continue;
890         if (!strcmp((const char *) n->name, "metadata"))
891         {
892             xmlChar *type = xmlGetProp(n, (xmlChar *) "type");
893             if (!strcmp(name, (const char *) type))
894             {
895                 xmlChar *value = xmlNodeListGetString(doc, n->children, 1);
896                 if (value)
897                 {
898                     const char *norm_str;
899                     pp2_relevance_token_t prt =
900                         pp2_relevance_tokenize(
901                             service->mergekey_pct,
902                             (const char *) value);
903                     
904                     wrbuf_puts(norm_wr, name);
905                     wrbuf_puts(norm_wr, "=");
906                     while ((norm_str =
907                             pp2_relevance_token_next(prt)))
908                     {
909                         if (*norm_str)
910                         {
911                             if (wrbuf_len(norm_wr))
912                                 wrbuf_puts(norm_wr, " ");
913                             wrbuf_puts(norm_wr, norm_str);
914                         }
915                     }
916                     xmlFree(value);
917                     pp2_relevance_token_destroy(prt);
918                     no_found++;
919                 }
920             }
921             xmlFree(type);
922         }
923     }
924     return no_found;
925 }
926
927 static const char *get_mergekey(xmlDoc *doc, struct client *cl, int record_no,
928                                 struct conf_service *service, NMEM nmem)
929 {
930     char *mergekey_norm = 0;
931     xmlNode *root = xmlDocGetRootElement(doc);
932     WRBUF norm_wr = wrbuf_alloc();
933
934     /* consider mergekey from XSL first */
935     xmlChar *mergekey = xmlGetProp(root, (xmlChar *) "mergekey");
936     if (mergekey)
937     {
938         const char *norm_str;
939         pp2_relevance_token_t prt =
940             pp2_relevance_tokenize(
941                 service->mergekey_pct,
942                 (const char *) mergekey);
943         
944         while ((norm_str = pp2_relevance_token_next(prt)))
945         {
946             if (*norm_str)
947             {
948                 if (wrbuf_len(norm_wr))
949                     wrbuf_puts(norm_wr, " ");
950                 wrbuf_puts(norm_wr, norm_str);
951             }
952         }
953         pp2_relevance_token_destroy(prt);
954         xmlFree(mergekey);
955     }
956     else
957     {
958         /* no mergekey defined in XSL. Look for mergekey metadata instead */
959         int field_id;
960         for (field_id = 0; field_id < service->num_metadata; field_id++)
961         {
962             struct conf_metadata *ser_md = &service->metadata[field_id];
963             if (ser_md->mergekey != Metadata_mergekey_no)
964             {
965                 int r = get_mergekey_from_doc(doc, root, ser_md->name,
966                                               service, norm_wr);
967                 if (r == 0 && ser_md->mergekey == Metadata_mergekey_required)
968                 {
969                     /* no mergekey on this one and it is required.. 
970                        Generate unique key instead */
971                     wrbuf_rewind(norm_wr);
972                     break;
973                 }
974             }
975         }
976     }
977
978     /* generate unique key if none is not generated already or is empty */
979     if (wrbuf_len(norm_wr) == 0)
980     {
981         wrbuf_printf(norm_wr, "%s-%d",
982                      client_get_database(cl)->database->url, record_no);
983     }
984     if (wrbuf_len(norm_wr) > 0)
985         mergekey_norm = nmem_strdup(nmem, wrbuf_cstr(norm_wr));
986     wrbuf_destroy(norm_wr);
987     return mergekey_norm;
988 }
989
990 /** \brief see if metadata for pz:recordfilter exists 
991     \param root xml root element of normalized record
992     \param sdb session database for client
993     \retval 0 if there is no metadata for pz:recordfilter
994     \retval 1 if there is metadata for pz:recordfilter
995
996     If there is no pz:recordfilter defined, this function returns 1
997     as well.
998 */
999     
1000 static int check_record_filter(xmlNode *root, struct session_database *sdb)
1001 {
1002     int match = 0;
1003     xmlNode *n;
1004     const char *s;
1005     s = session_setting_oneval(sdb, PZ_RECORDFILTER);
1006
1007     if (!s || !*s)
1008         return 1;
1009
1010     for (n = root->children; n; n = n->next)
1011     {
1012         if (n->type != XML_ELEMENT_NODE)
1013             continue;
1014         if (!strcmp((const char *) n->name, "metadata"))
1015         {
1016             xmlChar *type = xmlGetProp(n, (xmlChar *) "type");
1017             if (type)
1018             {
1019                 size_t len;
1020                 const char *eq = strchr(s, '=');
1021                 if (eq)
1022                     len = eq - s;
1023                 else
1024                     len = strlen(s);
1025                 if (len == strlen((const char *)type) &&
1026                     !memcmp((const char *) type, s, len))
1027                 {
1028                     xmlChar *value = xmlNodeGetContent(n);
1029                     if (value && *value)
1030                     {
1031                         if (!eq || strstr((const char *) value, eq+1))
1032                             match = 1;
1033                     }
1034                     xmlFree(value);
1035                 }
1036                 xmlFree(type);
1037             }
1038         }
1039     }
1040     return match;
1041 }
1042
1043
1044 /** \brief ingest XML record
1045     \param cl client holds the result set for record
1046     \param rec record buffer (0 terminated)
1047     \param record_no record position (1, 2, ..)
1048     \returns resulting record or NULL on failure
1049 */
1050 struct record *ingest_record(struct client *cl, const char *rec,
1051                              int record_no)
1052 {
1053     struct session_database *sdb = client_get_database(cl);
1054     struct session *se = client_get_session(cl);
1055     xmlDoc *xdoc = normalize_record(sdb, se, rec);
1056     xmlNode *root, *n;
1057     struct record *record;
1058     struct record_cluster *cluster;
1059     const char *mergekey_norm;
1060     xmlChar *type = 0;
1061     xmlChar *value = 0;
1062     struct conf_service *service = se->service;
1063
1064     if (!xdoc)
1065         return 0;
1066
1067     root = xmlDocGetRootElement(xdoc);
1068
1069     if (!check_record_filter(root, sdb))
1070     {
1071         yaz_log(YLOG_WARN, "Filtered out record no %d from %s", record_no,
1072             sdb->database->url);
1073         xmlFreeDoc(xdoc);
1074         return 0;
1075     }
1076
1077     mergekey_norm = get_mergekey(xdoc, cl, record_no, service, se->nmem);
1078     if (!mergekey_norm)
1079     {
1080         yaz_log(YLOG_WARN, "Got no mergekey");
1081         xmlFreeDoc(xdoc);
1082         return 0;
1083     }
1084     record = record_create(se->nmem, 
1085                            service->num_metadata, service->num_sortkeys, cl,
1086                            record_no);
1087
1088     cluster = reclist_insert(se->reclist, 
1089                              service, 
1090                              record, (char *) mergekey_norm, 
1091                              &se->total_merged);
1092     if (!cluster)
1093     {
1094         /* no room for record */
1095         xmlFreeDoc(xdoc);
1096         return 0;
1097     }
1098     if (global_parameters.dump_records)
1099         yaz_log(YLOG_LOG, "Cluster id %s from %s (#%d)", cluster->recid,
1100                 sdb->database->url, record_no);
1101     relevance_newrec(se->relevance, cluster);
1102     
1103     // now parsing XML record and adding data to cluster or record metadata
1104     for (n = root->children; n; n = n->next)
1105     {
1106         pp2_relevance_token_t prt;
1107         if (type)
1108             xmlFree(type);
1109         if (value)
1110             xmlFree(value);
1111         type = value = 0;
1112         
1113         if (n->type != XML_ELEMENT_NODE)
1114             continue;
1115         if (!strcmp((const char *) n->name, "metadata"))
1116         {
1117             struct conf_metadata *ser_md = 0;
1118             struct conf_sortkey *ser_sk = 0;
1119             struct record_metadata **wheretoput = 0;
1120             struct record_metadata *rec_md = 0;
1121             int md_field_id = -1;
1122             int sk_field_id = -1;
1123             
1124             type = xmlGetProp(n, (xmlChar *) "type");
1125             value = xmlNodeListGetString(xdoc, n->children, 1);
1126             
1127             if (!type || !value || !*value)
1128                 continue;
1129             
1130             md_field_id 
1131                 = conf_service_metadata_field_id(service, (const char *) type);
1132             if (md_field_id < 0)
1133             {
1134                 if (se->number_of_warnings_unknown_metadata == 0)
1135                 {
1136                     yaz_log(YLOG_WARN, 
1137                             "Ignoring unknown metadata element: %s", type);
1138                 }
1139                 se->number_of_warnings_unknown_metadata++;
1140                 continue;
1141             }
1142             
1143             ser_md = &service->metadata[md_field_id];
1144             
1145             if (ser_md->sortkey_offset >= 0){
1146                 sk_field_id = ser_md->sortkey_offset;
1147                 ser_sk = &service->sortkeys[sk_field_id];
1148             }
1149
1150             // non-merged metadata
1151             rec_md = record_metadata_init(se->nmem, (char *) value,
1152                                           ser_md->type);
1153             if (!rec_md)
1154             {
1155                 yaz_log(YLOG_WARN, "bad metadata data '%s' for element '%s'",
1156                         value, type);
1157                 continue;
1158             }
1159             wheretoput = &record->metadata[md_field_id];
1160             while (*wheretoput)
1161                 wheretoput = &(*wheretoput)->next;
1162             *wheretoput = rec_md;
1163
1164             // merged metadata
1165             rec_md = record_metadata_init(se->nmem, (char *) value,
1166                                           ser_md->type);
1167             wheretoput = &cluster->metadata[md_field_id];
1168
1169             // and polulate with data:
1170             // assign cluster or record based on merge action
1171             if (ser_md->merge == Metadata_merge_unique)
1172             {
1173                 struct record_metadata *mnode;
1174                 for (mnode = *wheretoput; mnode; mnode = mnode->next)
1175                     if (!strcmp((const char *) mnode->data.text.disp, 
1176                                 rec_md->data.text.disp))
1177                         break;
1178                 if (!mnode)
1179                 {
1180                     rec_md->next = *wheretoput;
1181                     *wheretoput = rec_md;
1182                 }
1183             }
1184             else if (ser_md->merge == Metadata_merge_longest)
1185             {
1186                 if (!*wheretoput 
1187                     || strlen(rec_md->data.text.disp) 
1188                     > strlen((*wheretoput)->data.text.disp))
1189                 {
1190                     *wheretoput = rec_md;
1191                     if (ser_sk)
1192                     {
1193                         const char *sort_str = 0;
1194                         int skip_article = 
1195                             ser_sk->type == Metadata_sortkey_skiparticle;
1196
1197                         if (!cluster->sortkeys[sk_field_id])
1198                             cluster->sortkeys[sk_field_id] = 
1199                                 nmem_malloc(se->nmem, 
1200                                             sizeof(union data_types));
1201                          
1202                         prt = pp2_relevance_tokenize(
1203                             service->sort_pct,
1204                             rec_md->data.text.disp);
1205
1206                         pp2_relevance_token_next(prt);
1207                          
1208                         sort_str = pp2_get_sort(prt, skip_article);
1209                          
1210                         cluster->sortkeys[sk_field_id]->text.disp = 
1211                             rec_md->data.text.disp;
1212                         if (!sort_str)
1213                         {
1214                             sort_str = rec_md->data.text.disp;
1215                             yaz_log(YLOG_WARN, 
1216                                     "Could not make sortkey. Bug #1858");
1217                         }
1218                         cluster->sortkeys[sk_field_id]->text.sort = 
1219                             nmem_strdup(se->nmem, sort_str);
1220 #if 0
1221                         yaz_log(YLOG_LOG, "text disp=%s",
1222                                 cluster->sortkeys[sk_field_id]->text.disp);
1223                         yaz_log(YLOG_LOG, "text sort=%s",
1224                                 cluster->sortkeys[sk_field_id]->text.sort);
1225 #endif
1226                         pp2_relevance_token_destroy(prt);
1227                     }
1228                 }
1229             }
1230             else if (ser_md->merge == Metadata_merge_all)
1231             {
1232                 rec_md->next = *wheretoput;
1233                 *wheretoput = rec_md;
1234             }
1235             else if (ser_md->merge == Metadata_merge_range)
1236             {
1237                 if (!*wheretoput)
1238                 {
1239                     *wheretoput = rec_md;
1240                     if (ser_sk)
1241                         cluster->sortkeys[sk_field_id] 
1242                             = &rec_md->data;
1243                 }
1244                 else
1245                 {
1246                     int this_min = rec_md->data.number.min;
1247                     int this_max = rec_md->data.number.max;
1248                     if (this_min < (*wheretoput)->data.number.min)
1249                         (*wheretoput)->data.number.min = this_min;
1250                     if (this_max > (*wheretoput)->data.number.max)
1251                         (*wheretoput)->data.number.max = this_max;
1252                 }
1253             }
1254
1255
1256             // ranking of _all_ fields enabled ... 
1257             if (ser_md->rank)
1258                 relevance_countwords(se->relevance, cluster, 
1259                                      (char *) value, ser_md->rank);
1260
1261             // construct facets ... 
1262             if (ser_md->termlist)
1263             {
1264                 if (ser_md->type == Metadata_type_year)
1265                 {
1266                     char year[64];
1267                     sprintf(year, "%d", rec_md->data.number.max);
1268                     add_facet(se, (char *) type, year);
1269                     if (rec_md->data.number.max != rec_md->data.number.min)
1270                     {
1271                         sprintf(year, "%d", rec_md->data.number.min);
1272                         add_facet(se, (char *) type, year);
1273                     }
1274                 }
1275                 else
1276                     add_facet(se, (char *) type, (char *) value);
1277             }
1278
1279             // cleaning up
1280             xmlFree(type);
1281             xmlFree(value);
1282             type = value = 0;
1283         }
1284         else
1285         {
1286             if (se->number_of_warnings_unknown_elements == 0)
1287                 yaz_log(YLOG_WARN,
1288                         "Unexpected element in internal record: %s", n->name);
1289             se->number_of_warnings_unknown_elements++;
1290         }
1291     }
1292     if (type)
1293         xmlFree(type);
1294     if (value)
1295         xmlFree(value);
1296
1297     xmlFreeDoc(xdoc);
1298
1299     relevance_donerecord(se->relevance, cluster);
1300     se->total_records++;
1301
1302     return record;
1303 }
1304
1305
1306
1307 /*
1308  * Local variables:
1309  * c-basic-offset: 4
1310  * c-file-style: "Stroustrup"
1311  * indent-tabs-mode: nil
1312  * End:
1313  * vim: shiftwidth=4 tabstop=8 expandtab
1314  */
1315