Implemented numeric sorting (sort by year). Bug #820.
[pazpar2-moved-to-github.git] / src / logic.c
1 /* $Id: logic.c,v 1.55 2007-07-18 13:37:30 adam Exp $
2    Copyright (c) 2006-2007, Index Data.
3
4 This file is part of Pazpar2.
5
6 Pazpar2 is free software; you can redistribute it and/or modify it under
7 the terms of the GNU General Public License as published by the Free
8 Software Foundation; either version 2, or (at your option) any later
9 version.
10
11 Pazpar2 is distributed in the hope that it will be useful, but WITHOUT ANY
12 WARRANTY; without even the implied warranty of MERCHANTABILITY or
13 FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
14 for more details.
15
16 You should have received a copy of the GNU General Public License
17 along with Pazpar2; see the file LICENSE.  If not, write to the
18 Free Software Foundation, 59 Temple Place - Suite 330, Boston, MA
19 02111-1307, USA.
20  */
21
22 /** \file logic.c
23     \brief high-level logic; mostly user sessions and settings
24 */
25
26 #include <stdlib.h>
27 #include <stdio.h>
28 #include <string.h>
29 #include <sys/time.h>
30 #include <unistd.h>
31 #include <sys/socket.h>
32 #include <netdb.h>
33 #include <signal.h>
34 #include <ctype.h>
35 #include <assert.h>
36
37 #include <yaz/marcdisp.h>
38 #include <yaz/comstack.h>
39 #include <yaz/tcpip.h>
40 #include <yaz/proto.h>
41 #include <yaz/readconf.h>
42 #include <yaz/pquery.h>
43 #include <yaz/otherinfo.h>
44 #include <yaz/yaz-util.h>
45 #include <yaz/nmem.h>
46 #include <yaz/query-charset.h>
47 #include <yaz/querytowrbuf.h>
48 #include <yaz/oid_db.h>
49 #include <yaz/snprintf.h>
50
51 #if HAVE_CONFIG_H
52 #include "cconfig.h"
53 #endif
54
55 #define USE_TIMING 0
56 #if USE_TIMING
57 #include <yaz/timing.h>
58 #endif
59
60 #include <netinet/in.h>
61
62 #include "pazpar2.h"
63 #include "eventl.h"
64 #include "http.h"
65 #include "termlists.h"
66 #include "reclists.h"
67 #include "relevance.h"
68 #include "config.h"
69 #include "database.h"
70 #include "client.h"
71 #include "settings.h"
72 #include "normalize7bit.h"
73
74 #define TERMLIST_HIGH_SCORE 25
75
76 #define MAX_CHUNK 15
77
78 // Note: Some things in this structure will eventually move to configuration
79 struct parameters global_parameters = 
80 {
81     "",
82     "",
83     "", 
84     0,
85     0, /* dump_records */
86     0, /* debug_mode */
87     30,
88     "81",
89     "Index Data PazPar2",
90     VERSION,
91     600, // 10 minutes
92     60,
93     100,
94     MAX_CHUNK,
95     0,
96     0
97 };
98
99
100 // Recursively traverse query structure to extract terms.
101 void pull_terms(NMEM nmem, struct ccl_rpn_node *n, char **termlist, int *num)
102 {
103     char **words;
104     int numwords;
105     int i;
106
107     switch (n->kind)
108     {
109         case CCL_RPN_AND:
110         case CCL_RPN_OR:
111         case CCL_RPN_NOT:
112         case CCL_RPN_PROX:
113             pull_terms(nmem, n->u.p[0], termlist, num);
114             pull_terms(nmem, n->u.p[1], termlist, num);
115             break;
116         case CCL_RPN_TERM:
117             nmem_strsplit(nmem, " ", n->u.t.term, &words, &numwords);
118             for (i = 0; i < numwords; i++)
119                 termlist[(*num)++] = words[i];
120             break;
121         default: // NOOP
122             break;
123     }
124 }
125
126
127
128 static void add_facet(struct session *s, const char *type, const char *value)
129 {
130     int i;
131
132     if (!*value)
133         return;
134     for (i = 0; i < s->num_termlists; i++)
135         if (!strcmp(s->termlists[i].name, type))
136             break;
137     if (i == s->num_termlists)
138     {
139         if (i == SESSION_MAX_TERMLISTS)
140         {
141             yaz_log(YLOG_FATAL, "Too many termlists");
142             return;
143         }
144
145         s->termlists[i].name = nmem_strdup(s->nmem, type);
146         s->termlists[i].termlist 
147             = termlist_create(s->nmem, s->expected_maxrecs,
148                               TERMLIST_HIGH_SCORE);
149         s->num_termlists = i + 1;
150     }
151     termlist_insert(s->termlists[i].termlist, value);
152 }
153
154 xmlDoc *record_to_xml(struct session_database *sdb, Z_External *rec)
155 {
156     struct database *db = sdb->database;
157     xmlDoc *rdoc = 0;
158     const Odr_oid *oid = rec->direct_reference;
159
160     /* convert response record to XML somehow */
161     if (rec->which == Z_External_octet && oid
162         && !oid_oidcmp(oid, yaz_oid_recsyn_xml))
163     {
164         /* xml already */
165         rdoc = xmlParseMemory((char*) rec->u.octet_aligned->buf,
166                               rec->u.octet_aligned->len);
167         if (!rdoc)
168         {
169             yaz_log(YLOG_FATAL, "Non-wellformed XML received from %s",
170                     db->url);
171             return 0;
172         }
173     }
174     else if (rec->which == Z_External_OPAC)
175     {
176         if (!sdb->yaz_marc)
177         {
178             yaz_log(YLOG_WARN, "MARC decoding not configured");
179             return 0;
180         }
181         else
182         {
183             /* OPAC gets converted to XML too */
184             WRBUF wrbuf_opac = wrbuf_alloc();
185             /* MARCXML inside the OPAC XML. Charset is in effect because we
186                use the yaz_marc handle */
187             yaz_marc_xml(sdb->yaz_marc, YAZ_MARC_MARCXML);
188             yaz_opac_decode_wrbuf(sdb->yaz_marc, rec->u.opac, wrbuf_opac);
189             
190             rdoc = xmlParseMemory((char*) wrbuf_buf(wrbuf_opac),
191                                   wrbuf_len(wrbuf_opac));
192             if (!rdoc)
193                 yaz_log(YLOG_WARN, "Unable to parse OPAC XML");
194             wrbuf_destroy(wrbuf_opac);
195         }
196     }
197     else if (oid && yaz_oid_is_iso2709(oid))
198     {
199         /* ISO2709 gets converted to MARCXML */
200         if (!sdb->yaz_marc)
201         {
202             yaz_log(YLOG_WARN, "MARC decoding not configured");
203             return 0;
204         }
205         else
206         {
207             xmlNode *res;
208             char *buf;
209             int len;
210             
211             if (rec->which != Z_External_octet)
212             {
213                 yaz_log(YLOG_WARN, "Unexpected external branch, probably BER %s",
214                         db->url);
215                 return 0;
216             }
217             buf = (char*) rec->u.octet_aligned->buf;
218             len = rec->u.octet_aligned->len;
219             if (yaz_marc_read_iso2709(sdb->yaz_marc, buf, len) < 0)
220             {
221                 yaz_log(YLOG_WARN, "Failed to decode MARC %s", db->url);
222                 return 0;
223             }
224             
225             yaz_marc_write_using_libxml2(sdb->yaz_marc, 1);
226             if (yaz_marc_write_xml(sdb->yaz_marc, &res,
227                                    "http://www.loc.gov/MARC21/slim", 0, 0) < 0)
228             {
229                 yaz_log(YLOG_WARN, "Failed to encode as XML %s",
230                         db->url);
231                 return 0;
232             }
233             rdoc = xmlNewDoc((xmlChar *) "1.0");
234             xmlDocSetRootElement(rdoc, res);
235         }
236     }
237     else
238     {
239         char oid_name_buf[OID_STR_MAX];
240         const char *oid_name = yaz_oid_to_string_buf(oid, 0, oid_name_buf);
241         yaz_log(YLOG_FATAL, 
242                 "Unable to handle record of type %s from %s", 
243                 oid_name, db->url);
244         return 0;
245     }
246     
247     if (global_parameters.dump_records)
248     {
249         FILE *lf = yaz_log_file();
250         if (lf)
251         {
252             yaz_log(YLOG_LOG, "Un-normalized record from %s", db->url);
253 #if LIBXML_VERSION >= 20600
254             xmlDocFormatDump(lf, rdoc, 1);
255 #else
256             xmlDocDump(lf, rdoc);
257 #endif
258             fprintf(lf, "\n");
259         }
260     }
261     return rdoc;
262 }
263
264 xmlDoc *normalize_record(struct session_database *sdb, Z_External *rec)
265 {
266     struct database_retrievalmap *m;
267     xmlDoc *rdoc = record_to_xml(sdb, rec);
268     if (rdoc)
269     {
270         for (m = sdb->map; m; m = m->next){
271             xmlDoc *new = 0;
272             
273             {
274                 xmlNodePtr root = 0;
275                 new = xsltApplyStylesheet(m->stylesheet, rdoc, 0);
276                 root= xmlDocGetRootElement(new);
277                 if (!new || !root || !(root->children))
278                 {
279                     yaz_log(YLOG_WARN, "XSLT transformation failed from %s",
280                             sdb->database->url);
281                     xmlFreeDoc(new);
282                     xmlFreeDoc(rdoc);
283                     return 0;
284                 }
285             }
286             
287             xmlFreeDoc(rdoc);
288             rdoc = new;
289         }
290         if (global_parameters.dump_records)
291         {
292             FILE *lf = yaz_log_file();
293             
294             if (lf)
295             {
296                 yaz_log(YLOG_LOG, "Normalized record from %s", 
297                         sdb->database->url);
298 #if LIBXML_VERSION >= 20600
299                 xmlDocFormatDump(lf, rdoc, 1);
300 #else
301                 xmlDocDump(lf, rdoc);
302 #endif
303                 fprintf(lf, "\n");
304             }
305         }
306     }
307     return rdoc;
308 }
309
310 // Retrieve first defined value for 'name' for given database.
311 // Will be extended to take into account user associated with session
312 char *session_setting_oneval(struct session_database *db, int offset)
313 {
314     if (!db->settings[offset])
315         return "";
316     return db->settings[offset]->value;
317 }
318
319
320
321 // Initialize YAZ Map structures for MARC-based targets
322 static int prepare_yazmarc(struct session_database *sdb)
323 {
324     char *s;
325
326     if (!sdb->settings)
327     {
328         yaz_log(YLOG_WARN, "No settings for %s", sdb->database->url);
329         return -1;
330     }
331     if ((s = session_setting_oneval(sdb, PZ_NATIVESYNTAX)) 
332         && !strncmp(s, "iso2709", 7))
333     {
334         char *encoding = "marc-8s", *e;
335         yaz_iconv_t cm;
336
337         // See if a native encoding is specified
338         if ((e = strchr(s, ';')))
339             encoding = e + 1;
340
341         sdb->yaz_marc = yaz_marc_create();
342         yaz_marc_subfield_str(sdb->yaz_marc, "\t");
343         
344         cm = yaz_iconv_open("utf-8", encoding);
345         if (!cm)
346         {
347             yaz_log(YLOG_FATAL, 
348                     "Unable to map from %s to UTF-8 for target %s", 
349                     encoding, sdb->database->url);
350             return -1;
351         }
352         yaz_marc_iconv(sdb->yaz_marc, cm);
353     }
354     return 0;
355 }
356
357 // Prepare XSLT stylesheets for record normalization
358 // Structures are allocated on the session_wide nmem to avoid having
359 // to recompute this for every search. This would lead
360 // to leaking if a single session was to repeatedly change the PZ_XSLT
361 // setting. However, this is not a realistic use scenario.
362 static int prepare_map(struct session *se, struct session_database *sdb)
363 {
364    char *s;
365
366     if (!sdb->settings)
367     {
368         yaz_log(YLOG_WARN, "No settings on %s", sdb->database->url);
369         return -1;
370     }
371     if ((s = session_setting_oneval(sdb, PZ_XSLT)))
372     {
373         char **stylesheets;
374         struct database_retrievalmap **m = &sdb->map;
375         int num, i;
376         char auto_stylesheet[256];
377
378         if (!strcmp(s, "auto"))
379         {
380             char *request_syntax = session_setting_oneval(sdb,
381                                                           PZ_REQUESTSYNTAX);
382             if (request_syntax)
383             {
384                 char *cp;
385                 yaz_snprintf(auto_stylesheet, sizeof(auto_stylesheet),
386                              "%s.xsl", request_syntax);
387                 for (cp = auto_stylesheet; *cp; cp++)
388                 {
389                     /* deliberately only consider ASCII */
390                     if (*cp > 32 && *cp < 127)
391                         *cp = tolower(*cp);
392                 }
393                 s = auto_stylesheet;
394             }
395             else
396             {
397                 yaz_log(YLOG_WARN, "No pz:requestsyntax for auto stylesheet");
398             }
399         }
400         nmem_strsplit(se->session_nmem, ",", s, &stylesheets, &num);
401         for (i = 0; i < num; i++)
402         {
403             (*m) = nmem_malloc(se->session_nmem, sizeof(**m));
404             (*m)->next = 0;
405             if (!((*m)->stylesheet = conf_load_stylesheet(stylesheets[i])))
406             {
407                 yaz_log(YLOG_FATAL|YLOG_ERRNO, "Unable to load stylesheet: %s",
408                         stylesheets[i]);
409                 return -1;
410             }
411             m = &(*m)->next;
412         }
413     }
414     if (!sdb->map)
415         yaz_log(YLOG_WARN, "No Normalization stylesheet for target %s",
416                 sdb->database->url);
417     return 0;
418 }
419
420 // This analyzes settings and recomputes any supporting data structures
421 // if necessary.
422 static int prepare_session_database(struct session *se, 
423                                     struct session_database *sdb)
424 {
425     if (!sdb->settings)
426     {
427         yaz_log(YLOG_WARN, 
428                 "No settings associated with %s", sdb->database->url);
429         return -1;
430     }
431     if (sdb->settings[PZ_NATIVESYNTAX] && !sdb->yaz_marc)
432     {
433         if (prepare_yazmarc(sdb) < 0)
434             return -1;
435     }
436     if (sdb->settings[PZ_XSLT] && !sdb->map)
437     {
438         if (prepare_map(se, sdb) < 0)
439             return -1;
440     }
441     return 0;
442 }
443
444 // called if watch should be removed because http_channel is to be destroyed
445 static void session_watch_cancel(void *data, struct http_channel *c)
446 {
447     struct session_watchentry *ent = data;
448
449     ent->fun = 0;
450     ent->data = 0;
451     ent->obs = 0;
452 }
453
454 // set watch. Returns 0=OK, -1 if watch is already set
455 int session_set_watch(struct session *s, int what, 
456                       session_watchfun fun, void *data,
457                       struct http_channel *chan)
458 {
459     if (s->watchlist[what].fun)
460         return -1;
461     s->watchlist[what].fun = fun;
462     s->watchlist[what].data = data;
463     s->watchlist[what].obs = http_add_observer(chan, &s->watchlist[what],
464                                                session_watch_cancel);
465     return 0;
466 }
467
468 void session_alert_watch(struct session *s, int what)
469 {
470     if (!s->watchlist[what].fun)
471         return;
472     http_remove_observer(s->watchlist[what].obs);
473     (*s->watchlist[what].fun)(s->watchlist[what].data);
474     s->watchlist[what].fun = 0;
475     s->watchlist[what].data = 0;
476     s->watchlist[what].obs = 0;
477 }
478
479 //callback for grep_databases
480 static void select_targets_callback(void *context, struct session_database *db)
481 {
482     struct session *se = (struct session*) context;
483     struct client *cl = client_create();
484     client_set_database(cl, db);
485     client_set_session(cl, se);
486 }
487
488 // Associates a set of clients with a session;
489 // Note: Session-databases represent databases with per-session 
490 // setting overrides
491 int select_targets(struct session *se, struct database_criterion *crit)
492 {
493     while (se->clients)
494         client_destroy(se->clients);
495
496     return session_grep_databases(se, crit, select_targets_callback);
497 }
498
499 int session_active_clients(struct session *s)
500 {
501     struct client *c;
502     int res = 0;
503
504     for (c = s->clients; c; c = client_next_in_session(c))
505         if (client_is_active(c))
506             res++;
507
508     return res;
509 }
510
511 // parses crit1=val1,crit2=val2|val3,...
512 static struct database_criterion *parse_filter(NMEM m, const char *buf)
513 {
514     struct database_criterion *res = 0;
515     char **values;
516     int num;
517     int i;
518
519     if (!buf || !*buf)
520         return 0;
521     nmem_strsplit(m, ",", buf,  &values, &num);
522     for (i = 0; i < num; i++)
523     {
524         char **subvalues;
525         int subnum;
526         int subi;
527         struct database_criterion *new = nmem_malloc(m, sizeof(*new));
528         char *eq = strchr(values[i], '=');
529         if (!eq)
530         {
531             yaz_log(YLOG_WARN, "Missing equal-sign in filter");
532             return 0;
533         }
534         *(eq++) = '\0';
535         new->name = values[i];
536         nmem_strsplit(m, "|", eq, &subvalues, &subnum);
537         new->values = 0;
538         for (subi = 0; subi < subnum; subi++)
539         {
540             struct database_criterion_value *newv
541                 = nmem_malloc(m, sizeof(*newv));
542             newv->value = subvalues[subi];
543             newv->next = new->values;
544             new->values = newv;
545         }
546         new->next = res;
547         res = new;
548     }
549     return res;
550 }
551
552 enum pazpar2_error_code search(struct session *se,
553                                char *query, char *filter,
554                                const char **addinfo)
555 {
556     int live_channels = 0;
557     int no_working = 0;
558     int no_failed = 0;
559     struct client *cl;
560     struct database_criterion *criteria;
561
562     yaz_log(YLOG_DEBUG, "Search");
563
564     *addinfo = 0;
565     nmem_reset(se->nmem);
566     criteria = parse_filter(se->nmem, filter);
567     se->requestid++;
568     live_channels = select_targets(se, criteria);
569     if (live_channels)
570     {
571         int maxrecs = live_channels * global_parameters.toget;
572         se->num_termlists = 0;
573         se->reclist = reclist_create(se->nmem, maxrecs);
574         // This will be initialized in send_search()
575         se->total_records = se->total_hits = se->total_merged = 0;
576         se->expected_maxrecs = maxrecs;
577     }
578     else
579         return PAZPAR2_NO_TARGETS;
580
581     se->relevance = 0;
582
583     for (cl = se->clients; cl; cl = client_next_in_session(cl))
584     {
585         if (prepare_session_database(se, client_get_database(cl)) < 0)
586         {
587             *addinfo = client_get_database(cl)->database->url;
588             return PAZPAR2_CONFIG_TARGET;
589         }
590         // Parse query for target
591         if (client_parse_query(cl, query) < 0)
592             no_failed++;
593         else
594         {
595             no_working++;
596             client_prep_connection(cl);
597         }
598     }
599
600     // If no queries could be mapped, we signal an error
601     if (no_working == 0)
602     {
603         *addinfo = "query";
604         return PAZPAR2_MALFORMED_PARAMETER_VALUE;
605     }
606     return PAZPAR2_NO_ERROR;
607 }
608
609 // Creates a new session_database object for a database
610 static void session_init_databases_fun(void *context, struct database *db)
611 {
612     struct session *se = (struct session *) context;
613     struct session_database *new = nmem_malloc(se->session_nmem, sizeof(*new));
614     int num = settings_num();
615     int i;
616
617     new->database = db;
618     new->yaz_marc = 0;
619     
620 #ifdef HAVE_ICU
621     if (global_parameters.server && global_parameters.server->icu_chn)
622         new->pct = pp2_charset_create(global_parameters.server->icu_chn);
623     else
624         new->pct = pp2_charset_create(0);
625 #else // HAVE_ICU
626     new->pct = pp2_charset_create(0);
627 #endif // HAVE_ICU
628
629     new->map = 0;
630     new->settings 
631         = nmem_malloc(se->session_nmem, sizeof(struct settings *) * num);
632     memset(new->settings, 0, sizeof(struct settings*) * num);
633
634     if (db->settings)
635     {
636         for (i = 0; i < num; i++)
637             new->settings[i] = db->settings[i];
638     }
639     new->next = se->databases;
640     se->databases = new;
641 }
642
643 // Doesn't free memory associated with sdb -- nmem takes care of that
644 static void session_database_destroy(struct session_database *sdb)
645 {
646     struct database_retrievalmap *m;
647
648     for (m = sdb->map; m; m = m->next)
649         xsltFreeStylesheet(m->stylesheet);
650     if (sdb->yaz_marc)
651         yaz_marc_destroy(sdb->yaz_marc);
652     if (sdb->pct)
653         pp2_charset_destroy(sdb->pct);
654 }
655
656 // Initialize session_database list -- this represents this session's view
657 // of the database list -- subject to modification by the settings ws command
658 void session_init_databases(struct session *se)
659 {
660     se->databases = 0;
661     predef_grep_databases(se, 0, session_init_databases_fun);
662 }
663
664 // Probably session_init_databases_fun should be refactored instead of
665 // called here.
666 static struct session_database *load_session_database(struct session *se, 
667                                                       char *id)
668 {
669     struct database *db = find_database(id, 0);
670
671     session_init_databases_fun((void*) se, db);
672     // New sdb is head of se->databases list
673     return se->databases;
674 }
675
676 // Find an existing session database. If not found, load it
677 static struct session_database *find_session_database(struct session *se, 
678                                                       char *id)
679 {
680     struct session_database *sdb;
681
682     for (sdb = se->databases; sdb; sdb = sdb->next)
683         if (!strcmp(sdb->database->url, id))
684             return sdb;
685     return load_session_database(se, id);
686 }
687
688 // Apply a session override to a database
689 void session_apply_setting(struct session *se, char *dbname, char *setting,
690                            char *value)
691 {
692     struct session_database *sdb = find_session_database(se, dbname);
693     struct setting *new = nmem_malloc(se->session_nmem, sizeof(*new));
694     int offset = settings_offset_cprefix(setting);
695
696     if (offset < 0)
697     {
698         yaz_log(YLOG_WARN, "Unknown setting %s", setting);
699         return;
700     }
701     // Jakub: This breaks the filter setting.
702     /*if (offset == PZ_ID)
703     {
704         yaz_log(YLOG_WARN, "No need to set pz:id setting. Ignoring");
705         return;
706     }*/
707     new->precedence = 0;
708     new->target = dbname;
709     new->name = setting;
710     new->value = value;
711     new->next = sdb->settings[offset];
712     sdb->settings[offset] = new;
713
714     // Force later recompute of settings-driven data structures
715     // (happens when a search starts and client connections are prepared)
716     switch (offset)
717     {
718         case PZ_NATIVESYNTAX:
719             if (sdb->yaz_marc)
720             {
721                 yaz_marc_destroy(sdb->yaz_marc);
722                 sdb->yaz_marc = 0;
723             }
724             break;
725         case PZ_XSLT:
726             if (sdb->map)
727             {
728                 struct database_retrievalmap *m;
729                 // We don't worry about the map structure -- it's in nmem
730                 for (m = sdb->map; m; m = m->next)
731                     xsltFreeStylesheet(m->stylesheet);
732                 sdb->map = 0;
733             }
734             break;
735     }
736 }
737
738 void destroy_session(struct session *s)
739 {
740     struct session_database *sdb;
741
742     while (s->clients)
743         client_destroy(s->clients);
744     for (sdb = s->databases; sdb; sdb = sdb->next)
745         session_database_destroy(sdb);
746     nmem_destroy(s->nmem);
747     wrbuf_destroy(s->wrbuf);
748 }
749
750 struct session *new_session(NMEM nmem) 
751 {
752     int i;
753     struct session *session = nmem_malloc(nmem, sizeof(*session));
754
755     yaz_log(YLOG_DEBUG, "New Pazpar2 session");
756     
757     session->relevance = 0;
758     session->total_hits = 0;
759     session->total_records = 0;
760     session->num_termlists = 0;
761     session->reclist = 0;
762     session->requestid = -1;
763     session->clients = 0;
764     session->expected_maxrecs = 0;
765     session->session_nmem = nmem;
766     session->nmem = nmem_create();
767     session->wrbuf = wrbuf_alloc();
768     session->databases = 0;
769     for (i = 0; i <= SESSION_WATCH_MAX; i++)
770     {
771         session->watchlist[i].data = 0;
772         session->watchlist[i].fun = 0;
773     }
774
775     return session;
776 }
777
778 struct hitsbytarget *hitsbytarget(struct session *se, int *count)
779 {
780     static struct hitsbytarget res[1000]; // FIXME MM
781     struct client *cl;
782
783     *count = 0;
784     for (cl = se->clients; cl; cl = client_next_in_session(cl))
785     {
786         char *name = session_setting_oneval(client_get_database(cl), PZ_NAME);
787
788         res[*count].id = client_get_database(cl)->database->url;
789         res[*count].name = *name ? name : "Unknown";
790         res[*count].hits = client_get_hits(cl);
791         res[*count].records = client_get_num_records(cl);
792         res[*count].diagnostic = client_get_diagnostic(cl);
793         res[*count].state = client_get_state_str(cl);
794         res[*count].connected  = client_get_connection(cl) ? 1 : 0;
795         (*count)++;
796     }
797
798     return res;
799 }
800
801 struct termlist_score **termlist(struct session *s, const char *name, int *num)
802 {
803     int i;
804
805     for (i = 0; i < s->num_termlists; i++)
806         if (!strcmp((const char *) s->termlists[i].name, name))
807             return termlist_highscore(s->termlists[i].termlist, num);
808     return 0;
809 }
810
811 #ifdef MISSING_HEADERS
812 void report_nmem_stats(void)
813 {
814     size_t in_use, is_free;
815
816     nmem_get_memory_in_use(&in_use);
817     nmem_get_memory_free(&is_free);
818
819     yaz_log(YLOG_LOG, "nmem stat: use=%ld free=%ld", 
820             (long) in_use, (long) is_free);
821 }
822 #endif
823
824 struct record_cluster *show_single(struct session *s, const char *id)
825 {
826     struct record_cluster *r;
827
828     reclist_rewind(s->reclist);
829     while ((r = reclist_read_record(s->reclist)))
830         if (!strcmp(r->recid, id))
831             return r;
832     return 0;
833 }
834
835 struct record_cluster **show(struct session *s, struct reclist_sortparms *sp, 
836                              int start, int *num, int *total, int *sumhits, 
837                              NMEM nmem_show)
838 {
839     struct record_cluster **recs = nmem_malloc(nmem_show, *num 
840                                        * sizeof(struct record_cluster *));
841     struct reclist_sortparms *spp;
842     int i;
843 #if USE_TIMING    
844     yaz_timing_t t = yaz_timing_create();
845 #endif
846
847     if (!s->relevance)
848     {
849         *num = 0;
850         *total = 0;
851         *sumhits = 0;
852         recs = 0;
853     }
854     else
855     {
856         for (spp = sp; spp; spp = spp->next)
857             if (spp->type == Metadata_sortkey_relevance)
858             {
859                 relevance_prepare_read(s->relevance, s->reclist);
860                 break;
861             }
862         reclist_sort(s->reclist, sp);
863         
864         *total = s->reclist->num_records;
865         *sumhits = s->total_hits;
866         
867         for (i = 0; i < start; i++)
868             if (!reclist_read_record(s->reclist))
869             {
870                 *num = 0;
871                 recs = 0;
872                 break;
873             }
874         
875         for (i = 0; i < *num; i++)
876         {
877             struct record_cluster *r = reclist_read_record(s->reclist);
878             if (!r)
879             {
880                 *num = i;
881                 break;
882             }
883             recs[i] = r;
884         }
885     }
886 #if USE_TIMING
887     yaz_timing_stop(t);
888     yaz_log(YLOG_LOG, "show %6.5f %3.2f %3.2f", 
889             yaz_timing_get_real(t), yaz_timing_get_user(t),
890             yaz_timing_get_sys(t));
891     yaz_timing_destroy(&t);
892 #endif
893     return recs;
894 }
895
896 void statistics(struct session *se, struct statistics *stat)
897 {
898     struct client *cl;
899     int count = 0;
900
901     memset(stat, 0, sizeof(*stat));
902     for (cl = se->clients; cl; cl = client_next_in_session(cl))
903     {
904         if (!client_get_connection(cl))
905             stat->num_no_connection++;
906         switch (client_get_state(cl))
907         {
908             case Client_Connecting: stat->num_connecting++; break;
909             case Client_Initializing: stat->num_initializing++; break;
910             case Client_Searching: stat->num_searching++; break;
911             case Client_Presenting: stat->num_presenting++; break;
912             case Client_Idle: stat->num_idle++; break;
913             case Client_Failed: stat->num_failed++; break;
914             case Client_Error: stat->num_error++; break;
915             default: break;
916         }
917         count++;
918     }
919     stat->num_hits = se->total_hits;
920     stat->num_records = se->total_records;
921
922     stat->num_clients = count;
923 }
924
925 void start_http_listener(void)
926 {
927     char hp[128] = "";
928     struct conf_server *ser = global_parameters.server;
929
930     if (*global_parameters.listener_override)
931         strcpy(hp, global_parameters.listener_override);
932     else
933     {
934         strcpy(hp, ser->host ? ser->host : "");
935         if (ser->port)
936         {
937             if (*hp)
938                 strcat(hp, ":");
939             sprintf(hp + strlen(hp), "%d", ser->port);
940         }
941     }
942     http_init(hp);
943 }
944
945 void start_proxy(void)
946 {
947     char hp[128] = "";
948     struct conf_server *ser = global_parameters.server;
949
950     if (*global_parameters.proxy_override)
951         strcpy(hp, global_parameters.proxy_override);
952     else if (ser->proxy_host || ser->proxy_port)
953     {
954         strcpy(hp, ser->proxy_host ? ser->proxy_host : "");
955         if (ser->proxy_port)
956         {
957             if (*hp)
958                 strcat(hp, ":");
959             sprintf(hp + strlen(hp), "%d", ser->proxy_port);
960         }
961     }
962     else
963         return;
964
965     http_set_proxyaddr(hp, ser->myurl ? ser->myurl : "");
966 }
967
968
969 // Master list of connections we're handling events to
970 static IOCHAN channel_list = 0; 
971 void pazpar2_add_channel(IOCHAN chan)
972 {
973     chan->next = channel_list;
974     channel_list = chan;
975 }
976
977 void pazpar2_event_loop()
978 {
979     event_loop(&channel_list);
980 }
981
982 static struct record_metadata *record_metadata_init(
983     NMEM nmem, char *value, enum conf_metadata_type type)
984 {
985     struct record_metadata *rec_md = record_metadata_create(nmem);
986     if (type == Metadata_type_generic)
987     {
988         char * p = value;
989         p = normalize7bit_generic(p, " ,/.:([");
990         
991         rec_md->data.text = nmem_strdup(nmem, p);
992     }
993     else if (type == Metadata_type_year)
994     {
995         int first, last;
996         if (extract7bit_years((char *) value, &first, &last) < 0)
997             return 0;
998         rec_md->data.number.min = first;
999         rec_md->data.number.max = last;
1000     }
1001     else
1002         return 0;
1003     return rec_md;
1004 }
1005
1006 struct record *ingest_record(struct client *cl, Z_External *rec,
1007                              int record_no)
1008 {
1009     xmlDoc *xdoc = normalize_record(client_get_database(cl), rec);
1010     xmlNode *root, *n;
1011     struct record *record;
1012     struct record_cluster *cluster;
1013     struct session *se = client_get_session(cl);
1014     xmlChar *mergekey, *mergekey_norm;
1015     xmlChar *type = 0;
1016     xmlChar *value = 0;
1017     struct conf_service *service = global_parameters.server->service;
1018
1019     if (!xdoc)
1020         return 0;
1021
1022     root = xmlDocGetRootElement(xdoc);
1023     if (!(mergekey = xmlGetProp(root, (xmlChar *) "mergekey")))
1024     {
1025         yaz_log(YLOG_WARN, "No mergekey found in record");
1026         xmlFreeDoc(xdoc);
1027         return 0;
1028     }
1029
1030     record = record_create(se->nmem, 
1031                            service->num_metadata, service->num_sortkeys, cl,
1032                            record_no);
1033
1034     mergekey_norm = (xmlChar *) nmem_strdup(se->nmem, (char*) mergekey);
1035     xmlFree(mergekey);
1036     normalize7bit_mergekey((char *) mergekey_norm, 0);
1037
1038     cluster = reclist_insert(se->reclist, 
1039                              global_parameters.server->service, 
1040                              record, (char *) mergekey_norm, 
1041                              &se->total_merged);
1042     if (global_parameters.dump_records)
1043         yaz_log(YLOG_LOG, "Cluster id %s from %s (#%d)", cluster->recid,
1044                 client_get_database(cl)->database->url, record_no);
1045     if (!cluster)
1046     {
1047         /* no room for record */
1048         xmlFreeDoc(xdoc);
1049         return 0;
1050     }
1051     relevance_newrec(se->relevance, cluster);
1052
1053
1054     // now parsing XML record and adding data to cluster or record metadata
1055     for (n = root->children; n; n = n->next)
1056     {
1057         if (type)
1058             xmlFree(type);
1059         if (value)
1060             xmlFree(value);
1061         type = value = 0;
1062
1063         if (n->type != XML_ELEMENT_NODE)
1064             continue;
1065         if (!strcmp((const char *) n->name, "metadata"))
1066         {
1067             struct conf_metadata *ser_md = 0;
1068             struct conf_sortkey *ser_sk = 0;
1069             struct record_metadata **wheretoput = 0;
1070             struct record_metadata *rec_md = 0;
1071             int md_field_id = -1;
1072             int sk_field_id = -1;
1073
1074             type = xmlGetProp(n, (xmlChar *) "type");
1075             value = xmlNodeListGetString(xdoc, n->children, 1);
1076
1077             if (!type || !value)
1078                 continue;
1079
1080             md_field_id 
1081                 = conf_service_metadata_field_id(service, (const char *) type);
1082             if (md_field_id < 0)
1083             {
1084                 yaz_log(YLOG_WARN, 
1085                         "Ignoring unknown metadata element: %s", type);
1086                 continue;
1087             }
1088
1089             ser_md = &service->metadata[md_field_id];
1090
1091             if (ser_md->sortkey_offset >= 0){
1092                 sk_field_id = ser_md->sortkey_offset;
1093                 ser_sk = &service->sortkeys[sk_field_id];
1094             }
1095
1096             // non-merged metadata
1097             rec_md = record_metadata_init(se->nmem, (char *) value,
1098                                           ser_md->type);
1099             if (!rec_md)
1100             {
1101                 yaz_log(YLOG_WARN, "bad metadata data '%s' for element '%s'",
1102                         value, type);
1103                 continue;
1104             }
1105             rec_md->next = record->metadata[md_field_id];
1106             record->metadata[md_field_id] = rec_md;
1107
1108             // merged metadata
1109             rec_md = record_metadata_init(se->nmem, (char *) value,
1110                                           ser_md->type);
1111             wheretoput = &cluster->metadata[md_field_id];
1112
1113             // and polulate with data:
1114             // assign cluster or record based on merge action
1115             if (ser_md->merge == Metadata_merge_unique)
1116             {
1117                 struct record_metadata *mnode;
1118                 for (mnode = *wheretoput; mnode; mnode = mnode->next)
1119                     if (!strcmp((const char *) mnode->data.text, 
1120                                 rec_md->data.text))
1121                         break;
1122                 if (!mnode)
1123                 {
1124                     rec_md->next = *wheretoput;
1125                     *wheretoput = rec_md;
1126                 }
1127             }
1128             else if (ser_md->merge == Metadata_merge_longest)
1129             {
1130                 if (!*wheretoput 
1131                     || strlen(rec_md->data.text) 
1132                        > strlen((*wheretoput)->data.text))
1133                 {
1134                     *wheretoput = rec_md;
1135                     if (ser_sk)
1136                     {
1137                         char *s = nmem_strdup(se->nmem, rec_md->data.text);
1138                         if (!cluster->sortkeys[sk_field_id])
1139                             cluster->sortkeys[sk_field_id] = 
1140                                 nmem_malloc(se->nmem, 
1141                                             sizeof(union data_types));
1142                         normalize7bit_mergekey(s,
1143                              (ser_sk->type == Metadata_sortkey_skiparticle));
1144                         cluster->sortkeys[sk_field_id]->text = s;
1145                     }
1146                 }
1147             }
1148             else if (ser_md->merge == Metadata_merge_all)
1149             {
1150                 rec_md->next = *wheretoput;
1151                 *wheretoput = rec_md;
1152             }
1153             else if (ser_md->merge == Metadata_merge_range)
1154             {
1155                 if (!*wheretoput)
1156                 {
1157                     *wheretoput = rec_md;
1158                     if (ser_sk)
1159                         cluster->sortkeys[sk_field_id] 
1160                             = &rec_md->data;
1161                 }
1162                 else
1163                 {
1164                     int this_min = rec_md->data.number.min;
1165                     int this_max = rec_md->data.number.max;
1166                     if (this_min < (*wheretoput)->data.number.min)
1167                         (*wheretoput)->data.number.min = this_min;
1168                     if (this_max > (*wheretoput)->data.number.max)
1169                         (*wheretoput)->data.number.max = this_max;
1170                 }
1171 #ifdef GAGA
1172                 if (ser_sk)
1173                 {
1174                     union data_types *sdata 
1175                         = cluster->sortkeys[sk_field_id];
1176                     yaz_log(YLOG_LOG, "SK range: %d-%d",
1177                             sdata->number.min, sdata->number.max);
1178                 }
1179 #endif
1180             }
1181
1182
1183             // ranking of _all_ fields enabled ... 
1184             if (ser_md->rank)
1185                 relevance_countwords(se->relevance, cluster, 
1186                                      (char *) value, ser_md->rank);
1187
1188             // construct facets ... 
1189             if (ser_md->termlist)
1190             {
1191                 if (ser_md->type == Metadata_type_year)
1192                 {
1193                     char year[64];
1194                     sprintf(year, "%d", rec_md->data.number.max);
1195                     add_facet(se, (char *) type, year);
1196                     if (rec_md->data.number.max != rec_md->data.number.min)
1197                     {
1198                         sprintf(year, "%d", rec_md->data.number.min);
1199                         add_facet(se, (char *) type, year);
1200                     }
1201                 }
1202                 else
1203                     add_facet(se, (char *) type, (char *) value);
1204             }
1205
1206             // cleaning up
1207             xmlFree(type);
1208             xmlFree(value);
1209             type = value = 0;
1210         }
1211         else
1212             yaz_log(YLOG_WARN,
1213                     "Unexpected element %s in internal record", n->name);
1214     }
1215     if (type)
1216         xmlFree(type);
1217     if (value)
1218         xmlFree(value);
1219
1220     xmlFreeDoc(xdoc);
1221
1222     relevance_donerecord(se->relevance, cluster);
1223     se->total_records++;
1224
1225     return record;
1226 }
1227
1228
1229
1230 /*
1231  * Local variables:
1232  * c-basic-offset: 4
1233  * indent-tabs-mode: nil
1234  * End:
1235  * vim: shiftwidth=4 tabstop=8 expandtab
1236  */