Databases per-service.
[pazpar2-moved-to-github.git] / src / logic.c
1 /* This file is part of Pazpar2.
2    Copyright (C) 2006-2009 Index Data
3
4 Pazpar2 is free software; you can redistribute it and/or modify it under
5 the terms of the GNU General Public License as published by the Free
6 Software Foundation; either version 2, or (at your option) any later
7 version.
8
9 Pazpar2 is distributed in the hope that it will be useful, but WITHOUT ANY
10 WARRANTY; without even the implied warranty of MERCHANTABILITY or
11 FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
12 for more details.
13
14 You should have received a copy of the GNU General Public License
15 along with this program; if not, write to the Free Software
16 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
17
18 */
19
20 /** \file logic.c
21     \brief high-level logic; mostly user sessions and settings
22 */
23
24 #if HAVE_CONFIG_H
25 #include <config.h>
26 #endif
27
28 #include <stdlib.h>
29 #include <stdio.h>
30 #include <string.h>
31 #if HAVE_SYS_TIME_H
32 #include <sys/time.h>
33 #endif
34 #if HAVE_UNISTD_H
35 #include <unistd.h>
36 #endif
37 #include <signal.h>
38 #include <ctype.h>
39 #include <assert.h>
40
41 #include <yaz/marcdisp.h>
42 #include <yaz/comstack.h>
43 #include <yaz/tcpip.h>
44 #include <yaz/proto.h>
45 #include <yaz/readconf.h>
46 #include <yaz/pquery.h>
47 #include <yaz/otherinfo.h>
48 #include <yaz/yaz-util.h>
49 #include <yaz/nmem.h>
50 #include <yaz/query-charset.h>
51 #include <yaz/querytowrbuf.h>
52 #include <yaz/oid_db.h>
53 #include <yaz/snprintf.h>
54
55 #define USE_TIMING 0
56 #if USE_TIMING
57 #include <yaz/timing.h>
58 #endif
59
60
61 #include "pazpar2.h"
62 #include "eventl.h"
63 #include "http.h"
64 #include "termlists.h"
65 #include "reclists.h"
66 #include "relevance.h"
67 #include "database.h"
68 #include "client.h"
69 #include "settings.h"
70 #include "normalize7bit.h"
71
72 #define TERMLIST_HIGH_SCORE 25
73
74 #define MAX_CHUNK 15
75
76 // Note: Some things in this structure will eventually move to configuration
77 struct parameters global_parameters = 
78 {
79     "",
80     "",
81     0,
82     0,   // dump_records
83     0,   // debug_mode
84     30,  // operations timeout 
85     "81",
86     "Index Data PazPar2",
87     VERSION,
88     60,   // session timeout 
89     100,
90     MAX_CHUNK,
91     0,
92     0,
93     180, // Z39.50 session timeout
94     15   // Connect timeout
95 };
96
97 static void log_xml_doc(xmlDoc *doc)
98 {
99     FILE *lf = yaz_log_file();
100     xmlChar *result = 0;
101     int len = 0;
102 #if LIBXML_VERSION >= 20600
103     xmlDocDumpFormatMemory(doc, &result, &len, 1);
104 #else
105     xmlDocDumpMemory(doc, &result, &len);
106 #endif
107     if (lf && len)
108     {
109         fwrite(result, 1, len, lf);
110         fprintf(lf, "\n");
111     }
112     xmlFree(result);
113 }
114
115 // Recursively traverse query structure to extract terms.
116 void pull_terms(NMEM nmem, struct ccl_rpn_node *n, char **termlist, int *num)
117 {
118     char **words;
119     int numwords;
120     int i;
121
122     switch (n->kind)
123     {
124     case CCL_RPN_AND:
125     case CCL_RPN_OR:
126     case CCL_RPN_NOT:
127     case CCL_RPN_PROX:
128         pull_terms(nmem, n->u.p[0], termlist, num);
129         pull_terms(nmem, n->u.p[1], termlist, num);
130         break;
131     case CCL_RPN_TERM:
132         nmem_strsplit(nmem, " ", n->u.t.term, &words, &numwords);
133         for (i = 0; i < numwords; i++)
134             termlist[(*num)++] = words[i];
135         break;
136     default: // NOOP
137         break;
138     }
139 }
140
141
142
143 static void add_facet(struct session *s, const char *type, const char *value)
144 {
145     int i;
146
147     if (!*value)
148         return;
149     for (i = 0; i < s->num_termlists; i++)
150         if (!strcmp(s->termlists[i].name, type))
151             break;
152     if (i == s->num_termlists)
153     {
154         if (i == SESSION_MAX_TERMLISTS)
155         {
156             yaz_log(YLOG_FATAL, "Too many termlists");
157             return;
158         }
159
160         s->termlists[i].name = nmem_strdup(s->nmem, type);
161         s->termlists[i].termlist 
162             = termlist_create(s->nmem, s->expected_maxrecs,
163                               TERMLIST_HIGH_SCORE);
164         s->num_termlists = i + 1;
165     }
166     termlist_insert(s->termlists[i].termlist, value);
167 }
168
169 xmlDoc *record_to_xml(struct session_database *sdb, const char *rec)
170 {
171     struct database *db = sdb->database;
172     xmlDoc *rdoc = 0;
173
174     rdoc = xmlParseMemory(rec, strlen(rec));
175
176     if (!rdoc)
177     {
178         yaz_log(YLOG_FATAL, "Non-wellformed XML received from %s",
179                 db->url);
180         return 0;
181     }
182
183     if (global_parameters.dump_records)
184     {
185         yaz_log(YLOG_LOG, "Un-normalized record from %s", db->url);
186         log_xml_doc(rdoc);
187     }
188
189     return rdoc;
190 }
191
192 #define MAX_XSLT_ARGS 16
193
194 // Add static values from session database settings if applicable
195 static void insert_settings_parameters(struct session_database *sdb,
196                                        struct session *se, char **parms)
197 {
198     struct conf_service *service = se->service;
199     int i;
200     int nparms = 0;
201     int offset = 0;
202
203     for (i = 0; i < service->num_metadata; i++)
204     {
205         struct conf_metadata *md = &service->metadata[i];
206         int setting;
207
208         if (md->setting == Metadata_setting_parameter &&
209             (setting = settings_offset(service, md->name)) > 0)
210         {
211             const char *val = session_setting_oneval(sdb, setting);
212             if (val && nparms < MAX_XSLT_ARGS)
213             {
214                 char *buf;
215                 int len = strlen(val);
216                 buf = nmem_malloc(se->nmem, len + 3);
217                 buf[0] = '\'';
218                 strcpy(buf + 1, val);
219                 buf[len+1] = '\'';
220                 buf[len+2] = '\0';
221                 parms[offset++] = md->name;
222                 parms[offset++] = buf;
223                 nparms++;
224             }
225         }
226     }
227     parms[offset] = 0;
228 }
229
230 // Add static values from session database settings if applicable
231 static void insert_settings_values(struct session_database *sdb, xmlDoc *doc,
232     struct conf_service *service)
233 {
234     int i;
235
236     for (i = 0; i < service->num_metadata; i++)
237     {
238         struct conf_metadata *md = &service->metadata[i];
239         int offset;
240
241         if (md->setting == Metadata_setting_postproc &&
242             (offset = settings_offset(service, md->name)) > 0)
243         {
244             const char *val = session_setting_oneval(sdb, offset);
245             if (val)
246             {
247                 xmlNode *r = xmlDocGetRootElement(doc);
248                 xmlNode *n = xmlNewTextChild(r, 0, (xmlChar *) "metadata",
249                                              (xmlChar *) val);
250                 xmlSetProp(n, (xmlChar *) "type", (xmlChar *) md->name);
251             }
252         }
253     }
254 }
255
256 xmlDoc *normalize_record(struct session_database *sdb, struct session *se,
257                          const char *rec)
258 {
259     struct database_retrievalmap *m;
260     xmlDoc *rdoc = record_to_xml(sdb, rec);
261     if (rdoc)
262     {
263         for (m = sdb->map; m; m = m->next)
264         {
265             xmlDoc *new = 0;
266             
267             {
268                 xmlNodePtr root = 0;
269                 char *parms[MAX_XSLT_ARGS*2+1];
270
271                 insert_settings_parameters(sdb, se, parms);
272
273                 new = xsltApplyStylesheet(m->stylesheet, rdoc, (const char **) parms);
274                 root= xmlDocGetRootElement(new);
275                 if (!new || !root || !(root->children))
276                 {
277                     yaz_log(YLOG_WARN, "XSLT transformation failed from %s",
278                             sdb->database->url);
279                     xmlFreeDoc(new);
280                     xmlFreeDoc(rdoc);
281                     return 0;
282                 }
283             }
284             
285             xmlFreeDoc(rdoc);
286             rdoc = new;
287         }
288
289         insert_settings_values(sdb, rdoc, se->service);
290
291         if (global_parameters.dump_records)
292         {
293             yaz_log(YLOG_LOG, "Normalized record from %s", 
294                     sdb->database->url);
295             log_xml_doc(rdoc);
296         }
297     }
298     return rdoc;
299 }
300
301 // Retrieve first defined value for 'name' for given database.
302 // Will be extended to take into account user associated with session
303 const char *session_setting_oneval(struct session_database *db, int offset)
304 {
305     if (!db->settings[offset])
306         return "";
307     return db->settings[offset]->value;
308 }
309
310 // Prepare XSLT stylesheets for record normalization
311 // Structures are allocated on the session_wide nmem to avoid having
312 // to recompute this for every search. This would lead
313 // to leaking if a single session was to repeatedly change the PZ_XSLT
314 // setting. However, this is not a realistic use scenario.
315 static int prepare_map(struct session *se, struct session_database *sdb)
316 {
317     const char *s;
318
319     if (!sdb->settings)
320     {
321         yaz_log(YLOG_WARN, "No settings on %s", sdb->database->url);
322         return -1;
323     }
324     if ((s = session_setting_oneval(sdb, PZ_XSLT)))
325     {
326         char **stylesheets;
327         struct database_retrievalmap **m = &sdb->map;
328         int num, i;
329         char auto_stylesheet[256];
330
331         if (!strcmp(s, "auto"))
332         {
333             const char *request_syntax = session_setting_oneval(
334                 sdb, PZ_REQUESTSYNTAX);
335             if (request_syntax)
336             {
337                 char *cp;
338                 yaz_snprintf(auto_stylesheet, sizeof(auto_stylesheet),
339                              "%s.xsl", request_syntax);
340                 for (cp = auto_stylesheet; *cp; cp++)
341                 {
342                     /* deliberately only consider ASCII */
343                     if (*cp > 32 && *cp < 127)
344                         *cp = tolower(*cp);
345                 }
346                 s = auto_stylesheet;
347             }
348             else
349             {
350                 yaz_log(YLOG_WARN, "No pz:requestsyntax for auto stylesheet");
351             }
352         }
353         nmem_strsplit(se->session_nmem, ",", s, &stylesheets, &num);
354         for (i = 0; i < num; i++)
355         {
356             (*m) = nmem_malloc(se->session_nmem, sizeof(**m));
357             (*m)->next = 0;
358             if (!((*m)->stylesheet = conf_load_stylesheet(stylesheets[i])))
359             {
360                 yaz_log(YLOG_FATAL|YLOG_ERRNO, "Unable to load stylesheet: %s",
361                         stylesheets[i]);
362                 return -1;
363             }
364             m = &(*m)->next;
365         }
366     }
367     if (!sdb->map)
368         yaz_log(YLOG_WARN, "No Normalization stylesheet for target %s",
369                 sdb->database->url);
370     return 0;
371 }
372
373 // This analyzes settings and recomputes any supporting data structures
374 // if necessary.
375 static int prepare_session_database(struct session *se, 
376                                     struct session_database *sdb)
377 {
378     if (!sdb->settings)
379     {
380         yaz_log(YLOG_WARN, 
381                 "No settings associated with %s", sdb->database->url);
382         return -1;
383     }
384     if (sdb->settings[PZ_XSLT] && !sdb->map)
385     {
386         if (prepare_map(se, sdb) < 0)
387             return -1;
388     }
389     return 0;
390 }
391
392 // called if watch should be removed because http_channel is to be destroyed
393 static void session_watch_cancel(void *data, struct http_channel *c,
394                                  void *data2)
395 {
396     struct session_watchentry *ent = data;
397
398     ent->fun = 0;
399     ent->data = 0;
400     ent->obs = 0;
401 }
402
403 // set watch. Returns 0=OK, -1 if watch is already set
404 int session_set_watch(struct session *s, int what, 
405                       session_watchfun fun, void *data,
406                       struct http_channel *chan)
407 {
408     if (s->watchlist[what].fun)
409         return -1;
410     s->watchlist[what].fun = fun;
411     s->watchlist[what].data = data;
412     s->watchlist[what].obs = http_add_observer(chan, &s->watchlist[what],
413                                                session_watch_cancel);
414     return 0;
415 }
416
417 void session_alert_watch(struct session *s, int what)
418 {
419     if (s->watchlist[what].fun)
420     {
421         /* our watch is no longer associated with http_channel */
422         void *data;
423         session_watchfun fun;
424
425         http_remove_observer(s->watchlist[what].obs);
426         fun = s->watchlist[what].fun;
427         data = s->watchlist[what].data;
428
429         /* reset watch before fun is invoked - in case fun wants to set
430            it again */
431         s->watchlist[what].fun = 0;
432         s->watchlist[what].data = 0;
433         s->watchlist[what].obs = 0;
434
435         fun(data);
436     }
437 }
438
439 //callback for grep_databases
440 static void select_targets_callback(void *context, struct session_database *db)
441 {
442     struct session *se = (struct session*) context;
443     struct client *cl = client_create();
444     client_set_database(cl, db);
445     client_set_session(cl, se);
446 }
447
448 // Associates a set of clients with a session;
449 // Note: Session-databases represent databases with per-session 
450 // setting overrides
451 int select_targets(struct session *se, struct database_criterion *crit)
452 {
453     while (se->clients)
454         client_destroy(se->clients);
455
456     return session_grep_databases(se, crit, select_targets_callback);
457 }
458
459 int session_active_clients(struct session *s)
460 {
461     struct client *c;
462     int res = 0;
463
464     for (c = s->clients; c; c = client_next_in_session(c))
465         if (client_is_active(c))
466             res++;
467
468     return res;
469 }
470
471 // parses crit1=val1,crit2=val2|val3,...
472 static struct database_criterion *parse_filter(NMEM m, const char *buf)
473 {
474     struct database_criterion *res = 0;
475     char **values;
476     int num;
477     int i;
478
479     if (!buf || !*buf)
480         return 0;
481     nmem_strsplit(m, ",", buf,  &values, &num);
482     for (i = 0; i < num; i++)
483     {
484         char **subvalues;
485         int subnum;
486         int subi;
487         struct database_criterion *new = nmem_malloc(m, sizeof(*new));
488         char *eq = strchr(values[i], '=');
489         if (!eq)
490         {
491             yaz_log(YLOG_WARN, "Missing equal-sign in filter");
492             return 0;
493         }
494         *(eq++) = '\0';
495         new->name = values[i];
496         nmem_strsplit(m, "|", eq, &subvalues, &subnum);
497         new->values = 0;
498         for (subi = 0; subi < subnum; subi++)
499         {
500             struct database_criterion_value *newv
501                 = nmem_malloc(m, sizeof(*newv));
502             newv->value = subvalues[subi];
503             newv->next = new->values;
504             new->values = newv;
505         }
506         new->next = res;
507         res = new;
508     }
509     return res;
510 }
511
512 enum pazpar2_error_code search(struct session *se,
513                                char *query, char *filter,
514                                const char **addinfo)
515 {
516     int live_channels = 0;
517     int no_working = 0;
518     int no_failed = 0;
519     struct client *cl;
520     struct database_criterion *criteria;
521
522     yaz_log(YLOG_DEBUG, "Search");
523
524     *addinfo = 0;
525     nmem_reset(se->nmem);
526     se->relevance = 0;
527     se->total_records = se->total_hits = se->total_merged = 0;
528     se->reclist = 0;
529     se->num_termlists = 0;
530     criteria = parse_filter(se->nmem, filter);
531     live_channels = select_targets(se, criteria);
532     if (live_channels)
533     {
534         int maxrecs = live_channels * global_parameters.toget; // This is buggy!!!
535         se->reclist = reclist_create(se->nmem, maxrecs);
536         se->expected_maxrecs = maxrecs;
537     }
538     else
539         return PAZPAR2_NO_TARGETS;
540
541     for (cl = se->clients; cl; cl = client_next_in_session(cl))
542     {
543         if (prepare_session_database(se, client_get_database(cl)) < 0)
544         {
545             *addinfo = client_get_database(cl)->database->url;
546             return PAZPAR2_CONFIG_TARGET;
547         }
548         // Parse query for target
549         if (client_parse_query(cl, query) < 0)
550             no_failed++;
551         else
552         {
553             no_working++;
554             if (client_prep_connection(cl))
555                 client_start_search(cl);
556         }
557     }
558
559     // If no queries could be mapped, we signal an error
560     if (no_working == 0)
561     {
562         *addinfo = "query";
563         return PAZPAR2_MALFORMED_PARAMETER_VALUE;
564     }
565     return PAZPAR2_NO_ERROR;
566 }
567
568 // Creates a new session_database object for a database
569 static void session_init_databases_fun(void *context, struct database *db)
570 {
571     struct session *se = (struct session *) context;
572     struct conf_service *service = se->service;
573     struct session_database *new = nmem_malloc(se->session_nmem, sizeof(*new));
574     int num = settings_num(service);
575     int i;
576
577     new->database = db;
578     
579     new->map = 0;
580     new->settings 
581         = nmem_malloc(se->session_nmem, sizeof(struct settings *) * num);
582     memset(new->settings, 0, sizeof(struct settings*) * num);
583
584     if (db->settings)
585     {
586         for (i = 0; i < num; i++)
587             new->settings[i] = db->settings[i];
588     }
589     new->next = se->databases;
590     se->databases = new;
591 }
592
593 // Doesn't free memory associated with sdb -- nmem takes care of that
594 static void session_database_destroy(struct session_database *sdb)
595 {
596     struct database_retrievalmap *m;
597
598     for (m = sdb->map; m; m = m->next)
599         xsltFreeStylesheet(m->stylesheet);
600 }
601
602 // Initialize session_database list -- this represents this session's view
603 // of the database list -- subject to modification by the settings ws command
604 void session_init_databases(struct session *se)
605 {
606     se->databases = 0;
607     predef_grep_databases(se, se->service, 0, session_init_databases_fun);
608 }
609
610 // Probably session_init_databases_fun should be refactored instead of
611 // called here.
612 static struct session_database *load_session_database(struct session *se, 
613                                                       char *id)
614 {
615     struct database *db = find_database(id, 0, se->service);
616
617     session_init_databases_fun((void*) se, db);
618     // New sdb is head of se->databases list
619     return se->databases;
620 }
621
622 // Find an existing session database. If not found, load it
623 static struct session_database *find_session_database(struct session *se, 
624                                                       char *id)
625 {
626     struct session_database *sdb;
627
628     for (sdb = se->databases; sdb; sdb = sdb->next)
629         if (!strcmp(sdb->database->url, id))
630             return sdb;
631     return load_session_database(se, id);
632 }
633
634 // Apply a session override to a database
635 void session_apply_setting(struct session *se, char *dbname, char *setting,
636                            char *value)
637 {
638     struct session_database *sdb = find_session_database(se, dbname);
639     struct conf_service *service = se->service;
640     struct setting *new = nmem_malloc(se->session_nmem, sizeof(*new));
641     int offset = settings_offset_cprefix(service, setting);
642
643     if (offset < 0)
644     {
645         yaz_log(YLOG_WARN, "Unknown setting %s", setting);
646         return;
647     }
648     // Jakub: This breaks the filter setting.
649     /*if (offset == PZ_ID)
650       {
651       yaz_log(YLOG_WARN, "No need to set pz:id setting. Ignoring");
652       return;
653       }*/
654     new->precedence = 0;
655     new->target = dbname;
656     new->name = setting;
657     new->value = value;
658     new->next = sdb->settings[offset];
659     sdb->settings[offset] = new;
660
661     // Force later recompute of settings-driven data structures
662     // (happens when a search starts and client connections are prepared)
663     switch (offset)
664     {
665     case PZ_XSLT:
666         if (sdb->map)
667         {
668             struct database_retrievalmap *m;
669             // We don't worry about the map structure -- it's in nmem
670             for (m = sdb->map; m; m = m->next)
671                 xsltFreeStylesheet(m->stylesheet);
672             sdb->map = 0;
673         }
674         break;
675     }
676 }
677
678 void destroy_session(struct session *s)
679 {
680     struct session_database *sdb;
681
682     while (s->clients)
683         client_destroy(s->clients);
684     for (sdb = s->databases; sdb; sdb = sdb->next)
685         session_database_destroy(sdb);
686     nmem_destroy(s->nmem);
687     wrbuf_destroy(s->wrbuf);
688 }
689
690 struct session *new_session(NMEM nmem, struct conf_service *service) 
691 {
692     int i;
693     struct session *session = nmem_malloc(nmem, sizeof(*session));
694
695     yaz_log(YLOG_DEBUG, "New Pazpar2 session");
696
697     session->service = service;
698     session->relevance = 0;
699     session->total_hits = 0;
700     session->total_records = 0;
701     session->number_of_warnings_unknown_elements = 0;
702     session->number_of_warnings_unknown_metadata = 0;
703     session->num_termlists = 0;
704     session->reclist = 0;
705     session->clients = 0;
706     session->expected_maxrecs = 0;
707     session->session_nmem = nmem;
708     session->nmem = nmem_create();
709     session->wrbuf = wrbuf_alloc();
710     session->databases = 0;
711     for (i = 0; i <= SESSION_WATCH_MAX; i++)
712     {
713         session->watchlist[i].data = 0;
714         session->watchlist[i].fun = 0;
715     }
716     return session;
717 }
718
719 struct hitsbytarget *hitsbytarget(struct session *se, int *count, NMEM nmem)
720 {
721     struct hitsbytarget *res = 0;
722     struct client *cl;
723     size_t sz = 0;
724
725     for (cl = se->clients; cl; cl = client_next_in_session(cl))
726         sz++;
727
728     res = nmem_malloc(nmem, sizeof(*res) * sz);
729     *count = 0;
730     for (cl = se->clients; cl; cl = client_next_in_session(cl))
731     {
732         const char *name = session_setting_oneval(client_get_database(cl),
733                                                   PZ_NAME);
734
735         res[*count].id = client_get_database(cl)->database->url;
736         res[*count].name = *name ? name : "Unknown";
737         res[*count].hits = client_get_hits(cl);
738         res[*count].records = client_get_num_records(cl);
739         res[*count].diagnostic = client_get_diagnostic(cl);
740         res[*count].state = client_get_state_str(cl);
741         res[*count].connected  = client_get_connection(cl) ? 1 : 0;
742         (*count)++;
743     }
744     return res;
745 }
746
747 struct termlist_score **termlist(struct session *s, const char *name, int *num)
748 {
749     int i;
750
751     for (i = 0; i < s->num_termlists; i++)
752         if (!strcmp((const char *) s->termlists[i].name, name))
753             return termlist_highscore(s->termlists[i].termlist, num);
754     return 0;
755 }
756
757 #ifdef MISSING_HEADERS
758 void report_nmem_stats(void)
759 {
760     size_t in_use, is_free;
761
762     nmem_get_memory_in_use(&in_use);
763     nmem_get_memory_free(&is_free);
764
765     yaz_log(YLOG_LOG, "nmem stat: use=%ld free=%ld", 
766             (long) in_use, (long) is_free);
767 }
768 #endif
769
770 struct record_cluster *show_single(struct session *s, const char *id,
771                                    struct record_cluster **prev_r,
772                                    struct record_cluster **next_r)
773 {
774     struct record_cluster *r;
775
776     reclist_rewind(s->reclist);
777     *prev_r = 0;
778     *next_r = 0;
779     while ((r = reclist_read_record(s->reclist)))
780     {
781         if (!strcmp(r->recid, id))
782         {
783             *next_r = reclist_read_record(s->reclist);
784             return r;
785         }
786         *prev_r = r;
787     }
788     return 0;
789 }
790
791 struct record_cluster **show(struct session *s, struct reclist_sortparms *sp, 
792                              int start, int *num, int *total, int *sumhits, 
793                              NMEM nmem_show)
794 {
795     struct record_cluster **recs = nmem_malloc(nmem_show, *num 
796                                                * sizeof(struct record_cluster *));
797     struct reclist_sortparms *spp;
798     int i;
799 #if USE_TIMING    
800     yaz_timing_t t = yaz_timing_create();
801 #endif
802
803     if (!s->relevance)
804     {
805         *num = 0;
806         *total = 0;
807         *sumhits = 0;
808         recs = 0;
809     }
810     else
811     {
812         for (spp = sp; spp; spp = spp->next)
813             if (spp->type == Metadata_sortkey_relevance)
814             {
815                 relevance_prepare_read(s->relevance, s->reclist);
816                 break;
817             }
818         reclist_sort(s->reclist, sp);
819         
820         *total = s->reclist->num_records;
821         *sumhits = s->total_hits;
822         
823         for (i = 0; i < start; i++)
824             if (!reclist_read_record(s->reclist))
825             {
826                 *num = 0;
827                 recs = 0;
828                 break;
829             }
830         
831         for (i = 0; i < *num; i++)
832         {
833             struct record_cluster *r = reclist_read_record(s->reclist);
834             if (!r)
835             {
836                 *num = i;
837                 break;
838             }
839             recs[i] = r;
840         }
841     }
842 #if USE_TIMING
843     yaz_timing_stop(t);
844     yaz_log(YLOG_LOG, "show %6.5f %3.2f %3.2f", 
845             yaz_timing_get_real(t), yaz_timing_get_user(t),
846             yaz_timing_get_sys(t));
847     yaz_timing_destroy(&t);
848 #endif
849     return recs;
850 }
851
852 void statistics(struct session *se, struct statistics *stat)
853 {
854     struct client *cl;
855     int count = 0;
856
857     memset(stat, 0, sizeof(*stat));
858     for (cl = se->clients; cl; cl = client_next_in_session(cl))
859     {
860         if (!client_get_connection(cl))
861             stat->num_no_connection++;
862         switch (client_get_state(cl))
863         {
864         case Client_Connecting: stat->num_connecting++; break;
865         case Client_Working: stat->num_working++; break;
866         case Client_Idle: stat->num_idle++; break;
867         case Client_Failed: stat->num_failed++; break;
868         case Client_Error: stat->num_error++; break;
869         default: break;
870         }
871         count++;
872     }
873     stat->num_hits = se->total_hits;
874     stat->num_records = se->total_records;
875
876     stat->num_clients = count;
877 }
878
879 int start_http_listener(void)
880 {
881     char hp[128] = "";
882     struct conf_server *ser = global_parameters.server;
883
884     if (*global_parameters.listener_override)
885         strcpy(hp, global_parameters.listener_override);
886     else
887     {
888         strcpy(hp, ser->host ? ser->host : "");
889         if (ser->port)
890         {
891             if (*hp)
892                 strcat(hp, ":");
893             sprintf(hp + strlen(hp), "%d", ser->port);
894         }
895     }
896     return http_init(hp);
897 }
898
899 void start_proxy(void)
900 {
901     char hp[128] = "";
902     struct conf_server *ser = global_parameters.server;
903
904     if (*global_parameters.proxy_override)
905         strcpy(hp, global_parameters.proxy_override);
906     else if (ser->proxy_host || ser->proxy_port)
907     {
908         strcpy(hp, ser->proxy_host ? ser->proxy_host : "");
909         if (ser->proxy_port)
910         {
911             if (*hp)
912                 strcat(hp, ":");
913             sprintf(hp + strlen(hp), "%d", ser->proxy_port);
914         }
915     }
916     else
917         return;
918
919     http_set_proxyaddr(hp, ser->myurl ? ser->myurl : "");
920 }
921
922
923 // Master list of connections we're handling events to
924 static IOCHAN channel_list = 0; 
925 void pazpar2_add_channel(IOCHAN chan)
926 {
927     chan->next = channel_list;
928     channel_list = chan;
929 }
930
931 void pazpar2_event_loop()
932 {
933     event_loop(&channel_list);
934 }
935
936 static struct record_metadata *record_metadata_init(
937     NMEM nmem, char *value, enum conf_metadata_type type)
938 {
939     struct record_metadata *rec_md = record_metadata_create(nmem);
940     if (type == Metadata_type_generic)
941     {
942         char * p = value;
943         p = normalize7bit_generic(p, " ,/.:([");
944         
945         rec_md->data.text.disp = nmem_strdup(nmem, p);
946         rec_md->data.text.sort = 0;
947     }
948     else if (type == Metadata_type_year || type == Metadata_type_date)
949     {
950         int first, last;
951         int longdate = 0;
952
953         if (type == Metadata_type_date)
954             longdate = 1;
955         if (extract7bit_dates((char *) value, &first, &last, longdate) < 0)
956             return 0;
957
958         rec_md->data.number.min = first;
959         rec_md->data.number.max = last;
960     }
961     else
962         return 0;
963     return rec_md;
964 }
965
966 const char *get_mergekey(xmlDoc *doc, struct client *cl, int record_no,
967                          struct conf_service *service, NMEM nmem)
968 {
969     char *mergekey_norm = 0;
970     xmlNode *root = xmlDocGetRootElement(doc);
971     WRBUF norm_wr = wrbuf_alloc();
972     xmlNode *n;
973
974     /* create mergekey based on mergekey attribute from XSL (if any) */
975     xmlChar *mergekey = xmlGetProp(root, (xmlChar *) "mergekey");
976     if (mergekey)
977     {
978         const char *norm_str;
979         pp2_relevance_token_t prt =
980             pp2_relevance_tokenize(
981                 global_parameters.server->mergekey_pct,
982                 (const char *) mergekey);
983         
984         while ((norm_str = pp2_relevance_token_next(prt)))
985         {
986             if (*norm_str)
987             {
988                 if (wrbuf_len(norm_wr))
989                     wrbuf_puts(norm_wr, " ");
990                 wrbuf_puts(norm_wr, norm_str);
991             }
992         }
993         pp2_relevance_token_destroy(prt);
994         xmlFree(mergekey);
995     }
996     /* append (if any) mergekey=yes metadata values */
997     for (n = root->children; n; n = n->next)
998     {
999         if (n->type != XML_ELEMENT_NODE)
1000             continue;
1001         if (!strcmp((const char *) n->name, "metadata"))
1002         {
1003             struct conf_metadata *ser_md = 0;
1004             int md_field_id = -1;
1005             
1006             xmlChar *type = xmlGetProp(n, (xmlChar *) "type");
1007             
1008             if (!type)
1009                 continue;
1010                 
1011             md_field_id 
1012                 = conf_service_metadata_field_id(service, 
1013                                                  (const char *) type);
1014             if (md_field_id >= 0)
1015             {
1016                 ser_md = &service->metadata[md_field_id];
1017                 if (ser_md->mergekey == Metadata_mergekey_yes)
1018                 {
1019                     xmlChar *value = xmlNodeListGetString(doc, n->children, 1);
1020                     if (value)
1021                     {
1022                         const char *norm_str;
1023                         pp2_relevance_token_t prt =
1024                             pp2_relevance_tokenize(
1025                                 global_parameters.server->mergekey_pct,
1026                                 (const char *) value);
1027                         
1028                         while ((norm_str = pp2_relevance_token_next(prt)))
1029                         {
1030                             if (*norm_str)
1031                             {
1032                                 if (wrbuf_len(norm_wr))
1033                                     wrbuf_puts(norm_wr, " ");
1034                                 wrbuf_puts(norm_wr, norm_str);
1035                             }
1036                         }
1037                         xmlFree(value);
1038                         pp2_relevance_token_destroy(prt);
1039                     }
1040                 }
1041             }
1042             xmlFree(type);
1043         }
1044     }
1045
1046     /* generate unique key if none is not generated already or is empty */
1047     if (wrbuf_len(norm_wr) == 0)
1048     {
1049         wrbuf_printf(norm_wr, "%s-%d",
1050                      client_get_database(cl)->database->url, record_no);
1051     }
1052     if (wrbuf_len(norm_wr) > 0)
1053         mergekey_norm = nmem_strdup(nmem, wrbuf_cstr(norm_wr));
1054     wrbuf_destroy(norm_wr);
1055     return mergekey_norm;
1056 }
1057
1058
1059
1060 /** \brief ingest XML record
1061     \param cl client holds the result set for record
1062     \param rec record buffer (0 terminated)
1063     \param record_no record position (1, 2, ..)
1064     \returns resulting record or NULL on failure
1065 */
1066 struct record *ingest_record(struct client *cl, const char *rec,
1067                              int record_no)
1068 {
1069     xmlDoc *xdoc = normalize_record(client_get_database(cl),
1070                                     client_get_session(cl), rec);
1071     xmlNode *root, *n;
1072     struct record *record;
1073     struct record_cluster *cluster;
1074     struct session *se = client_get_session(cl);
1075     const char *mergekey_norm;
1076     xmlChar *type = 0;
1077     xmlChar *value = 0;
1078     struct conf_service *service = se->service;
1079
1080     if (!xdoc)
1081         return 0;
1082
1083     root = xmlDocGetRootElement(xdoc);
1084
1085     mergekey_norm = get_mergekey(xdoc, cl, record_no, service, se->nmem);
1086     if (!mergekey_norm)
1087     {
1088         yaz_log(YLOG_WARN, "Got no mergekey");
1089         xmlFreeDoc(xdoc);
1090         return 0;
1091     }
1092     record = record_create(se->nmem, 
1093                            service->num_metadata, service->num_sortkeys, cl,
1094                            record_no);
1095
1096     cluster = reclist_insert(se->reclist, 
1097                              service, 
1098                              record, (char *) mergekey_norm, 
1099                              &se->total_merged);
1100     if (global_parameters.dump_records)
1101         yaz_log(YLOG_LOG, "Cluster id %s from %s (#%d)", cluster->recid,
1102                 client_get_database(cl)->database->url, record_no);
1103     if (!cluster)
1104     {
1105         /* no room for record */
1106         xmlFreeDoc(xdoc);
1107         return 0;
1108     }
1109     relevance_newrec(se->relevance, cluster);
1110     
1111     // now parsing XML record and adding data to cluster or record metadata
1112     for (n = root->children; n; n = n->next)
1113     {
1114         pp2_relevance_token_t prt;
1115         if (type)
1116             xmlFree(type);
1117         if (value)
1118             xmlFree(value);
1119         type = value = 0;
1120         
1121         if (n->type != XML_ELEMENT_NODE)
1122             continue;
1123         if (!strcmp((const char *) n->name, "metadata"))
1124         {
1125             struct conf_metadata *ser_md = 0;
1126             struct conf_sortkey *ser_sk = 0;
1127             struct record_metadata **wheretoput = 0;
1128             struct record_metadata *rec_md = 0;
1129             int md_field_id = -1;
1130             int sk_field_id = -1;
1131             
1132             type = xmlGetProp(n, (xmlChar *) "type");
1133             value = xmlNodeListGetString(xdoc, n->children, 1);
1134             
1135             if (!type || !value || !*value)
1136                 continue;
1137             
1138             md_field_id 
1139                 = conf_service_metadata_field_id(service, (const char *) type);
1140             if (md_field_id < 0)
1141             {
1142                 if (se->number_of_warnings_unknown_metadata == 0)
1143                 {
1144                     yaz_log(YLOG_WARN, 
1145                             "Ignoring unknown metadata element: %s", type);
1146                 }
1147                 se->number_of_warnings_unknown_metadata++;
1148                 continue;
1149             }
1150             
1151             ser_md = &service->metadata[md_field_id];
1152             
1153             if (ser_md->sortkey_offset >= 0){
1154                 sk_field_id = ser_md->sortkey_offset;
1155                 ser_sk = &service->sortkeys[sk_field_id];
1156             }
1157
1158             // non-merged metadata
1159             rec_md = record_metadata_init(se->nmem, (char *) value,
1160                                           ser_md->type);
1161             if (!rec_md)
1162             {
1163                 yaz_log(YLOG_WARN, "bad metadata data '%s' for element '%s'",
1164                         value, type);
1165                 continue;
1166             }
1167             wheretoput = &record->metadata[md_field_id];
1168             while (*wheretoput)
1169                 wheretoput = &(*wheretoput)->next;
1170             *wheretoput = rec_md;
1171
1172             // merged metadata
1173             rec_md = record_metadata_init(se->nmem, (char *) value,
1174                                           ser_md->type);
1175             wheretoput = &cluster->metadata[md_field_id];
1176
1177             // and polulate with data:
1178             // assign cluster or record based on merge action
1179             if (ser_md->merge == Metadata_merge_unique)
1180             {
1181                 struct record_metadata *mnode;
1182                 for (mnode = *wheretoput; mnode; mnode = mnode->next)
1183                     if (!strcmp((const char *) mnode->data.text.disp, 
1184                                 rec_md->data.text.disp))
1185                         break;
1186                 if (!mnode)
1187                 {
1188                     rec_md->next = *wheretoput;
1189                     *wheretoput = rec_md;
1190                 }
1191             }
1192             else if (ser_md->merge == Metadata_merge_longest)
1193             {
1194                 if (!*wheretoput 
1195                     || strlen(rec_md->data.text.disp) 
1196                     > strlen((*wheretoput)->data.text.disp))
1197                 {
1198                     *wheretoput = rec_md;
1199                     if (ser_sk)
1200                     {
1201                         const char *sort_str = 0;
1202                         int skip_article = 
1203                             ser_sk->type == Metadata_sortkey_skiparticle;
1204
1205                         if (!cluster->sortkeys[sk_field_id])
1206                             cluster->sortkeys[sk_field_id] = 
1207                                 nmem_malloc(se->nmem, 
1208                                             sizeof(union data_types));
1209                          
1210                         prt = pp2_relevance_tokenize(
1211                             global_parameters.server->sort_pct,
1212                             rec_md->data.text.disp);
1213
1214                         pp2_relevance_token_next(prt);
1215                          
1216                         sort_str = pp2_get_sort(prt, skip_article);
1217                          
1218                         cluster->sortkeys[sk_field_id]->text.disp = 
1219                             rec_md->data.text.disp;
1220                         if (!sort_str)
1221                         {
1222                             sort_str = rec_md->data.text.disp;
1223                             yaz_log(YLOG_WARN, 
1224                                     "Could not make sortkey. Bug #1858");
1225                         }
1226                         cluster->sortkeys[sk_field_id]->text.sort = 
1227                             nmem_strdup(se->nmem, sort_str);
1228 #if 0
1229                         yaz_log(YLOG_LOG, "text disp=%s",
1230                                 cluster->sortkeys[sk_field_id]->text.disp);
1231                         yaz_log(YLOG_LOG, "text sort=%s",
1232                                 cluster->sortkeys[sk_field_id]->text.sort);
1233 #endif
1234                         pp2_relevance_token_destroy(prt);
1235                     }
1236                 }
1237             }
1238             else if (ser_md->merge == Metadata_merge_all)
1239             {
1240                 rec_md->next = *wheretoput;
1241                 *wheretoput = rec_md;
1242             }
1243             else if (ser_md->merge == Metadata_merge_range)
1244             {
1245                 if (!*wheretoput)
1246                 {
1247                     *wheretoput = rec_md;
1248                     if (ser_sk)
1249                         cluster->sortkeys[sk_field_id] 
1250                             = &rec_md->data;
1251                 }
1252                 else
1253                 {
1254                     int this_min = rec_md->data.number.min;
1255                     int this_max = rec_md->data.number.max;
1256                     if (this_min < (*wheretoput)->data.number.min)
1257                         (*wheretoput)->data.number.min = this_min;
1258                     if (this_max > (*wheretoput)->data.number.max)
1259                         (*wheretoput)->data.number.max = this_max;
1260                 }
1261             }
1262
1263
1264             // ranking of _all_ fields enabled ... 
1265             if (ser_md->rank)
1266                 relevance_countwords(se->relevance, cluster, 
1267                                      (char *) value, ser_md->rank);
1268
1269             // construct facets ... 
1270             if (ser_md->termlist)
1271             {
1272                 if (ser_md->type == Metadata_type_year)
1273                 {
1274                     char year[64];
1275                     sprintf(year, "%d", rec_md->data.number.max);
1276                     add_facet(se, (char *) type, year);
1277                     if (rec_md->data.number.max != rec_md->data.number.min)
1278                     {
1279                         sprintf(year, "%d", rec_md->data.number.min);
1280                         add_facet(se, (char *) type, year);
1281                     }
1282                 }
1283                 else
1284                     add_facet(se, (char *) type, (char *) value);
1285             }
1286
1287             // cleaning up
1288             xmlFree(type);
1289             xmlFree(value);
1290             type = value = 0;
1291         }
1292         else
1293         {
1294             if (se->number_of_warnings_unknown_elements == 0)
1295                 yaz_log(YLOG_WARN,
1296                         "Unexpected element in internal record: %s", n->name);
1297             se->number_of_warnings_unknown_elements++;
1298         }
1299     }
1300     if (type)
1301         xmlFree(type);
1302     if (value)
1303         xmlFree(value);
1304
1305     xmlFreeDoc(xdoc);
1306
1307     relevance_donerecord(se->relevance, cluster);
1308     se->total_records++;
1309
1310     return record;
1311 }
1312
1313
1314
1315 /*
1316  * Local variables:
1317  * c-basic-offset: 4
1318  * c-file-style: "Stroustrup"
1319  * indent-tabs-mode: nil
1320  * End:
1321  * vim: shiftwidth=4 tabstop=8 expandtab
1322  */
1323