Allows definition/override of pz:cclmap:* settings via settings ws.
[pazpar2-moved-to-github.git] / src / logic.c
1 /* $Id: logic.c,v 1.12 2007-04-20 04:32:33 quinn Exp $
2    Copyright (c) 2006-2007, Index Data.
3
4 This file is part of Pazpar2.
5
6 Pazpar2 is free software; you can redistribute it and/or modify it under
7 the terms of the GNU General Public License as published by the Free
8 Software Foundation; either version 2, or (at your option) any later
9 version.
10
11 Pazpar2 is distributed in the hope that it will be useful, but WITHOUT ANY
12 WARRANTY; without even the implied warranty of MERCHANTABILITY or
13 FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
14 for more details.
15
16 You should have received a copy of the GNU General Public License
17 along with Pazpar2; see the file LICENSE.  If not, write to the
18 Free Software Foundation, 59 Temple Place - Suite 330, Boston, MA
19 02111-1307, USA.
20  */
21
22 #include <stdlib.h>
23 #include <stdio.h>
24 #include <string.h>
25 #include <sys/time.h>
26 #include <unistd.h>
27 #include <sys/socket.h>
28 #include <netdb.h>
29 #include <signal.h>
30 #include <ctype.h>
31 #include <assert.h>
32
33 #include <yaz/marcdisp.h>
34 #include <yaz/comstack.h>
35 #include <yaz/tcpip.h>
36 #include <yaz/proto.h>
37 #include <yaz/readconf.h>
38 #include <yaz/pquery.h>
39 #include <yaz/otherinfo.h>
40 #include <yaz/yaz-util.h>
41 #include <yaz/nmem.h>
42 #include <yaz/query-charset.h>
43 #include <yaz/querytowrbuf.h>
44 #if YAZ_VERSIONL >= 0x020163
45 #include <yaz/oid_db.h>
46 #endif
47
48 #if HAVE_CONFIG_H
49 #include "cconfig.h"
50 #endif
51
52 #define USE_TIMING 0
53 #if USE_TIMING
54 #include <yaz/timing.h>
55 #endif
56
57 #include <netinet/in.h>
58
59 #include "pazpar2.h"
60 #include "eventl.h"
61 #include "http.h"
62 #include "termlists.h"
63 #include "reclists.h"
64 #include "relevance.h"
65 #include "config.h"
66 #include "database.h"
67 #include "settings.h"
68
69 #define MAX_CHUNK 15
70
71 static void client_fatal(struct client *cl);
72 static void connection_destroy(struct connection *co);
73 static int client_prep_connection(struct client *cl);
74 static void ingest_records(struct client *cl, Z_Records *r);
75 void session_alert_watch(struct session *s, int what);
76
77 static struct connection *connection_freelist = 0;
78 static struct client *client_freelist = 0;
79
80 static char *client_states[] = {
81     "Client_Connecting",
82     "Client_Connected",
83     "Client_Idle",
84     "Client_Initializing",
85     "Client_Searching",
86     "Client_Presenting",
87     "Client_Error",
88     "Client_Failed",
89     "Client_Disconnected",
90     "Client_Stopped"
91 };
92
93 // Note: Some things in this structure will eventually move to configuration
94 struct parameters global_parameters = 
95 {
96     "",
97     "",
98     "",
99     "",
100     0,
101     0,
102     30,
103     "81",
104     "Index Data PazPar2",
105     VERSION,
106     600, // 10 minutes
107     60,
108     100,
109     MAX_CHUNK,
110     0,
111     0
112 };
113
114 static int send_apdu(struct client *c, Z_APDU *a)
115 {
116     struct connection *co = c->connection;
117     char *buf;
118     int len, r;
119
120     if (!z_APDU(global_parameters.odr_out, &a, 0, 0))
121     {
122         odr_perror(global_parameters.odr_out, "Encoding APDU");
123         abort();
124     }
125     buf = odr_getbuf(global_parameters.odr_out, &len, 0);
126     r = cs_put(co->link, buf, len);
127     if (r < 0)
128     {
129         yaz_log(YLOG_WARN, "cs_put: %s", cs_errmsg(cs_errno(co->link)));
130         return -1;
131     }
132     else if (r == 1)
133     {
134         fprintf(stderr, "cs_put incomplete (ParaZ does not handle that)\n");
135         exit(1);
136     }
137     odr_reset(global_parameters.odr_out); /* release the APDU structure  */
138     co->state = Conn_Waiting;
139     return 0;
140 }
141
142 // Set authentication token in init if one is set for the client
143 // TODO: Extend this to handle other schemes than open (should be simple)
144 static void init_authentication(struct client *cl, Z_InitRequest *req)
145 {
146     struct session_database *sdb = cl->database;
147     char *auth = session_setting_oneval(sdb, PZ_AUTHENTICATION);
148
149     if (auth)
150     {
151         Z_IdAuthentication *idAuth = odr_malloc(global_parameters.odr_out,
152                 sizeof(*idAuth));
153         idAuth->which = Z_IdAuthentication_open;
154         idAuth->u.open = auth;
155         req->idAuthentication = idAuth;
156     }
157 }
158
159 static void send_init(IOCHAN i)
160 {
161
162     struct connection *co = iochan_getdata(i);
163     struct client *cl = co->client;
164     Z_APDU *a = zget_APDU(global_parameters.odr_out, Z_APDU_initRequest);
165
166     a->u.initRequest->implementationId = global_parameters.implementationId;
167     a->u.initRequest->implementationName = global_parameters.implementationName;
168     a->u.initRequest->implementationVersion =
169         global_parameters.implementationVersion;
170     ODR_MASK_SET(a->u.initRequest->options, Z_Options_search);
171     ODR_MASK_SET(a->u.initRequest->options, Z_Options_present);
172     ODR_MASK_SET(a->u.initRequest->options, Z_Options_namedResultSets);
173
174     ODR_MASK_SET(a->u.initRequest->protocolVersion, Z_ProtocolVersion_1);
175     ODR_MASK_SET(a->u.initRequest->protocolVersion, Z_ProtocolVersion_2);
176     ODR_MASK_SET(a->u.initRequest->protocolVersion, Z_ProtocolVersion_3);
177
178     init_authentication(cl, a->u.initRequest);
179
180     /* add virtual host if tunneling through Z39.50 proxy */
181     
182     if (0 < strlen(global_parameters.zproxy_override) 
183         && 0 < strlen(cl->database->database->url))
184     {
185 #if YAZ_VERSIONL >= 0x020163
186         yaz_oi_set_string_oid(&a->u.initRequest->otherInfo,
187                               global_parameters.odr_out,
188                               yaz_oid_userinfo_proxy,
189                               1, cl->database->database->url);
190 #else
191         yaz_oi_set_string_oidval(&a->u.initRequest->otherInfo,
192                                  global_parameters.odr_out, VAL_PROXY,
193                                  1, cl->database->database->url);
194 #endif
195     }
196
197     if (send_apdu(cl, a) >= 0)
198     {
199         iochan_setflags(i, EVENT_INPUT);
200         cl->state = Client_Initializing;
201     }
202     else
203         cl->state = Client_Error;
204     odr_reset(global_parameters.odr_out);
205 }
206
207 // Recursively traverse query structure to extract terms.
208 static void pull_terms(NMEM nmem, struct ccl_rpn_node *n, char **termlist, int *num)
209 {
210     char **words;
211     int numwords;
212     int i;
213
214     switch (n->kind)
215     {
216         case CCL_RPN_AND:
217         case CCL_RPN_OR:
218         case CCL_RPN_NOT:
219         case CCL_RPN_PROX:
220             pull_terms(nmem, n->u.p[0], termlist, num);
221             pull_terms(nmem, n->u.p[1], termlist, num);
222             break;
223         case CCL_RPN_TERM:
224             nmem_strsplit(nmem, " ", n->u.t.term, &words, &numwords);
225             for (i = 0; i < numwords; i++)
226                 termlist[(*num)++] = words[i];
227             break;
228         default: // NOOP
229             break;
230     }
231 }
232
233 // Extract terms from query into null-terminated termlist
234 static void extract_terms(NMEM nmem, struct ccl_rpn_node *query, char **termlist)
235 {
236     int num = 0;
237
238     pull_terms(nmem, query, termlist, &num);
239     termlist[num] = 0;
240 }
241
242 static void send_search(IOCHAN i)
243 {
244     struct connection *co = iochan_getdata(i);
245     struct client *cl = co->client; 
246     struct session *se = cl->session;
247     struct session_database *sdb = cl->database;
248     Z_APDU *a = zget_APDU(global_parameters.odr_out, Z_APDU_searchRequest);
249     int ndb;
250     char **databaselist;
251     Z_Query *zquery;
252     int ssub = 0, lslb = 100000, mspn = 10;
253     char *recsyn = 0;
254     char *piggyback = 0;
255     char *queryenc = 0;
256     yaz_iconv_t iconv = 0;
257
258     yaz_log(YLOG_DEBUG, "Sending search to %s", cl->database->database->url);
259
260     // constructing RPN query
261     a->u.searchRequest->query = zquery = odr_malloc(global_parameters.odr_out,
262             sizeof(Z_Query));
263     zquery->which = Z_Query_type_1;
264     zquery->u.type_1 = p_query_rpn(global_parameters.odr_out, cl->pquery);
265
266     // converting to target encoding
267     if ((queryenc = session_setting_oneval(sdb, PZ_QUERYENCODING))){
268         iconv = yaz_iconv_open(queryenc, "UTF-8");
269         if (iconv){
270             yaz_query_charset_convert_rpnquery(zquery->u.type_1, 
271                                                global_parameters.odr_out, 
272                                                iconv);
273             yaz_iconv_close(iconv);
274         } else
275             yaz_log(YLOG_WARN, "Query encoding failed %s %s", 
276                     cl->database->database->url, queryenc);
277     }
278
279     for (ndb = 0; sdb->database->databases[ndb]; ndb++)
280         ;
281     databaselist = odr_malloc(global_parameters.odr_out, sizeof(char*) * ndb);
282     for (ndb = 0; sdb->database->databases[ndb]; ndb++)
283         databaselist[ndb] = sdb->database->databases[ndb];
284
285     if (!(piggyback = session_setting_oneval(sdb, PZ_PIGGYBACK)) || *piggyback == '1')
286     {
287         if ((recsyn = session_setting_oneval(sdb, PZ_REQUESTSYNTAX)))
288         {
289 #if YAZ_VERSIONL >= 0x020163
290             a->u.searchRequest->preferredRecordSyntax =
291                 yaz_string_to_oid_odr(yaz_oid_std(),
292                                       CLASS_RECSYN, recsyn,
293                                       global_parameters.odr_out);
294 #else
295             a->u.searchRequest->preferredRecordSyntax =
296                 yaz_str_to_z3950oid(global_parameters.odr_out,
297                                     CLASS_RECSYN, recsyn);
298 #endif
299         }
300         a->u.searchRequest->smallSetUpperBound = &ssub;
301         a->u.searchRequest->largeSetLowerBound = &lslb;
302         a->u.searchRequest->mediumSetPresentNumber = &mspn;
303     }
304     a->u.searchRequest->resultSetName = "Default";
305     a->u.searchRequest->databaseNames = databaselist;
306     a->u.searchRequest->num_databaseNames = ndb;
307
308     
309     {  //scope for sending and logging queries 
310         WRBUF wbquery = wrbuf_alloc();
311         yaz_query_to_wrbuf(wbquery, zquery);
312
313
314         if (send_apdu(cl, a) >= 0)
315             {
316                 iochan_setflags(i, EVENT_INPUT);
317                 cl->state = Client_Searching;
318                 cl->requestid = se->requestid;
319                 yaz_log(YLOG_LOG, "SearchRequest %s %s %s", 
320                          cl->database->database->url,
321                         queryenc ? queryenc : "UTF-8",
322                         wrbuf_cstr(wbquery));
323             }
324         else {
325             cl->state = Client_Error;
326                 yaz_log(YLOG_WARN, "Failed SearchRequest %s  %s %s", 
327                          cl->database->database->url, 
328                         queryenc ? queryenc : "UTF-8",
329                         wrbuf_cstr(wbquery));
330         }
331         
332         wrbuf_destroy(wbquery);
333     }    
334
335     odr_reset(global_parameters.odr_out);
336 }
337
338 static void send_present(IOCHAN i)
339 {
340     struct connection *co = iochan_getdata(i);
341     struct client *cl = co->client; 
342     struct session_database *sdb = cl->database;
343     Z_APDU *a = zget_APDU(global_parameters.odr_out, Z_APDU_presentRequest);
344     int toget;
345     int start = cl->records + 1;
346     char *recsyn;
347
348     toget = global_parameters.chunk;
349     if (toget > global_parameters.toget - cl->records)
350         toget = global_parameters.toget - cl->records;
351     if (toget > cl->hits - cl->records)
352         toget = cl->hits - cl->records;
353
354     yaz_log(YLOG_DEBUG, "Trying to present %d records\n", toget);
355
356     a->u.presentRequest->resultSetStartPoint = &start;
357     a->u.presentRequest->numberOfRecordsRequested = &toget;
358
359     a->u.presentRequest->resultSetId = "Default";
360
361     if ((recsyn = session_setting_oneval(sdb, PZ_REQUESTSYNTAX)))
362     {
363 #if YAZ_VERSIONL >= 0x020163
364         a->u.presentRequest->preferredRecordSyntax =
365             yaz_string_to_oid_odr(yaz_oid_std(),
366                                   CLASS_RECSYN, recsyn,
367                                   global_parameters.odr_out);
368 #else
369         a->u.presentRequest->preferredRecordSyntax =
370             yaz_str_to_z3950oid(global_parameters.odr_out,
371                                 CLASS_RECSYN, recsyn);
372 #endif
373     }
374
375     if (send_apdu(cl, a) >= 0)
376     {
377         iochan_setflags(i, EVENT_INPUT);
378         cl->state = Client_Presenting;
379     }
380     else
381         cl->state = Client_Error;
382     odr_reset(global_parameters.odr_out);
383 }
384
385 static void do_initResponse(IOCHAN i, Z_APDU *a)
386 {
387     struct connection *co = iochan_getdata(i);
388     struct client *cl = co->client;
389     Z_InitResponse *r = a->u.initResponse;
390
391     yaz_log(YLOG_DEBUG, "Init response %s", cl->database->database->url);
392
393     if (*r->result)
394     {
395         cl->state = Client_Idle;
396     }
397     else
398         cl->state = Client_Failed; // FIXME need to do something to the connection
399 }
400
401 static void do_searchResponse(IOCHAN i, Z_APDU *a)
402 {
403     struct connection *co = iochan_getdata(i);
404     struct client *cl = co->client;
405     struct session *se = cl->session;
406     Z_SearchResponse *r = a->u.searchResponse;
407
408     yaz_log(YLOG_DEBUG, "Search response %s (status=%d)", 
409             cl->database->database->url, *r->searchStatus);
410
411     if (*r->searchStatus)
412     {
413         cl->hits = *r->resultCount;
414         se->total_hits += cl->hits;
415         if (r->presentStatus && !*r->presentStatus && r->records)
416         {
417             yaz_log(YLOG_DEBUG, "Records in search response %s", 
418                     cl->database->database->url);
419             ingest_records(cl, r->records);
420         }
421         cl->state = Client_Idle;
422     }
423     else
424     {          /*"FAILED"*/
425         cl->hits = 0;
426         cl->state = Client_Error;
427         if (r->records) {
428             Z_Records *recs = r->records;
429             if (recs->which == Z_Records_NSD)
430             {
431                 yaz_log(YLOG_WARN, 
432                         "Search response: Non-surrogate diagnostic %s",
433                         cl->database->database->url);
434                 cl->diagnostic = *recs->u.nonSurrogateDiagnostic->condition;
435                 cl->state = Client_Error;
436             }
437         }
438     }
439 }
440
441 static void do_closeResponse(IOCHAN i, Z_APDU *a)
442 {
443     struct connection *co = iochan_getdata(i);
444     struct client *cl = co->client;
445     /* Z_Close *r = a->u.close; */
446
447     yaz_log(YLOG_WARN, "Close response %s", cl->database->database->url);
448
449     cl->state = Client_Failed;
450     connection_destroy(co);
451 }
452
453
454 char *normalize_mergekey(char *buf, int skiparticle)
455 {
456     char *p = buf, *pout = buf;
457
458     if (skiparticle)
459     {
460         char firstword[64];
461         char articles[] = "the den der die des an a "; // must end in space
462
463         while (*p && !isalnum(*p))
464             p++;
465         pout = firstword;
466         while (*p && *p != ' ' && pout - firstword < 62)
467             *(pout++) = tolower(*(p++));
468         *(pout++) = ' ';
469         *(pout++) = '\0';
470         if (!strstr(articles, firstword))
471             p = buf;
472         pout = buf;
473     }
474
475     while (*p)
476     {
477         while (*p && !isalnum(*p))
478             p++;
479         while (isalnum(*p))
480             *(pout++) = tolower(*(p++));
481         if (*p)
482             *(pout++) = ' ';
483         while (*p && !isalnum(*p))
484             p++;
485     }
486     if (buf != pout)
487         do {
488             *(pout--) = '\0';
489         }
490         while (pout > buf && *pout == ' ');
491
492     return buf;
493 }
494
495 static void add_facet(struct session *s, const char *type, const char *value)
496 {
497     int i;
498
499     if (!*value)
500         return;
501     for (i = 0; i < s->num_termlists; i++)
502         if (!strcmp(s->termlists[i].name, type))
503             break;
504     if (i == s->num_termlists)
505     {
506         if (i == SESSION_MAX_TERMLISTS)
507         {
508             yaz_log(YLOG_FATAL, "Too many termlists");
509             exit(1);
510         }
511         s->termlists[i].name = nmem_strdup(s->nmem, type);
512         s->termlists[i].termlist = termlist_create(s->nmem, s->expected_maxrecs, 15);
513         s->num_termlists = i + 1;
514     }
515     termlist_insert(s->termlists[i].termlist, value);
516 }
517
518 static xmlDoc *normalize_record(struct client *cl, Z_External *rec)
519 {
520     struct database_retrievalmap *m;
521     struct database *db = cl->database->database;
522     xmlNode *res;
523     xmlDoc *rdoc;
524
525     // First normalize to XML
526     if (db->yaz_marc)
527     {
528         char *buf;
529         int len;
530         if (rec->which != Z_External_octet)
531         {
532             yaz_log(YLOG_WARN, "Unexpected external branch, probably BER %s",
533                     cl->database->database->url);
534             return 0;
535         }
536         buf = (char*) rec->u.octet_aligned->buf;
537         len = rec->u.octet_aligned->len;
538         if (yaz_marc_read_iso2709(db->yaz_marc, buf, len) < 0)
539         {
540             yaz_log(YLOG_WARN, "Failed to decode MARC %s",
541                     cl->database->database->url);
542             return 0;
543         }
544
545         yaz_marc_write_using_libxml2(db->yaz_marc, 1);
546         if (yaz_marc_write_xml(db->yaz_marc, &res,
547                     "http://www.loc.gov/MARC21/slim", 0, 0) < 0)
548         {
549             yaz_log(YLOG_WARN, "Failed to encode as XML %s",
550                     cl->database->database->url);
551             return 0;
552         }
553         rdoc = xmlNewDoc((xmlChar *) "1.0");
554         xmlDocSetRootElement(rdoc, res);
555
556     }
557     else
558     {
559         yaz_log(YLOG_FATAL, 
560                 "Unknown native_syntax in normalize_record from %s",
561                 cl->database->database->url);
562         exit(1);
563     }
564
565     if (global_parameters.dump_records){
566         fprintf(stderr, 
567                 "Input Record (normalized) from %s\n----------------\n",
568                 cl->database->database->url);
569 #if LIBXML_VERSION >= 20600
570         xmlDocFormatDump(stderr, rdoc, 1);
571 #else
572         xmlDocDump(stderr, rdoc);
573 #endif
574     }
575
576     for (m = db->map; m; m = m->next){
577         xmlDoc *new = 0;
578
579 #if 1
580         {
581             xmlNodePtr root = 0;
582             new = xsltApplyStylesheet(m->stylesheet, rdoc, 0);
583             root= xmlDocGetRootElement(new);
584         if (!new || !root || !(root->children))
585         {
586             yaz_log(YLOG_WARN, "XSLT transformation failed from %s",
587                     cl->database->database->url);
588             xmlFreeDoc(new);
589             xmlFreeDoc(rdoc);
590             return 0;
591         }
592         }
593 #endif
594
595 #if 0
596         // do it another way to detect transformation errors right now
597         // but does not seem to work either!
598         {
599             xsltTransformContextPtr ctxt;
600             ctxt = xsltNewTransformContext(m->stylesheet, rdoc);
601             new = xsltApplyStylesheetUser(m->stylesheet, rdoc, 0, 0, 0, ctxt);
602             if ((ctxt->state == XSLT_STATE_ERROR) ||
603                 (ctxt->state == XSLT_STATE_STOPPED)){
604                 yaz_log(YLOG_WARN, "XSLT transformation failed from %s",
605                         cl->database->database->url);
606                 xmlFreeDoc(new);
607                 xmlFreeDoc(rdoc);
608                 return 0;
609             }
610         }
611 #endif      
612    
613         xmlFreeDoc(rdoc);
614         rdoc = new;
615     }
616     if (global_parameters.dump_records)
617     {
618         fprintf(stderr, "Record from %s\n----------------\n", 
619                 cl->database->database->url);
620 #if LIBXML_VERSION >= 20600
621         xmlDocFormatDump(stderr, rdoc, 1);
622 #else
623         xmlDocDump(stderr, rdoc);
624 #endif
625     }
626     return rdoc;
627 }
628
629 // Extract what appears to be years from buf, storing highest and
630 // lowest values.
631 static int extract_years(const char *buf, int *first, int *last)
632 {
633     *first = -1;
634     *last = -1;
635     while (*buf)
636     {
637         const char *e;
638         int len;
639
640         while (*buf && !isdigit(*buf))
641             buf++;
642         len = 0;
643         for (e = buf; *e && isdigit(*e); e++)
644             len++;
645         if (len == 4)
646         {
647             int value = atoi(buf);
648             if (*first < 0 || value < *first)
649                 *first = value;
650             if (*last < 0 || value > *last)
651                 *last = value;
652         }
653         buf = e;
654     }
655     return *first;
656 }
657
658 static struct record *ingest_record(struct client *cl, Z_External *rec)
659 {
660     xmlDoc *xdoc = normalize_record(cl, rec);
661     xmlNode *root, *n;
662     struct record *res;
663     struct record_cluster *cluster;
664     struct session *se = cl->session;
665     xmlChar *mergekey, *mergekey_norm;
666     xmlChar *type = 0;
667     xmlChar *value = 0;
668     struct conf_service *service = global_parameters.server->service;
669
670     if (!xdoc)
671         return 0;
672
673     root = xmlDocGetRootElement(xdoc);
674     if (!(mergekey = xmlGetProp(root, (xmlChar *) "mergekey")))
675     {
676         yaz_log(YLOG_WARN, "No mergekey found in record");
677         xmlFreeDoc(xdoc);
678         return 0;
679     }
680
681     res = nmem_malloc(se->nmem, sizeof(struct record));
682     res->next = 0;
683     res->client = cl;
684     res->metadata = nmem_malloc(se->nmem,
685             sizeof(struct record_metadata*) * service->num_metadata);
686     memset(res->metadata, 0, sizeof(struct record_metadata*) * service->num_metadata);
687
688     mergekey_norm = (xmlChar *) nmem_strdup(se->nmem, (char*) mergekey);
689     xmlFree(mergekey);
690     normalize_mergekey((char *) mergekey_norm, 0);
691
692     cluster = reclist_insert(se->reclist, 
693                              global_parameters.server->service, 
694                              res, (char *) mergekey_norm, 
695                              &se->total_merged);
696     if (global_parameters.dump_records)
697         yaz_log(YLOG_LOG, "Cluster id %d from %s (#%d)", cluster->recid,
698                 cl->database->database->url, cl->records);
699     if (!cluster)
700     {
701         /* no room for record */
702         xmlFreeDoc(xdoc);
703         return 0;
704     }
705     relevance_newrec(se->relevance, cluster);
706
707     for (n = root->children; n; n = n->next)
708     {
709         if (type)
710             xmlFree(type);
711         if (value)
712             xmlFree(value);
713         type = value = 0;
714
715         if (n->type != XML_ELEMENT_NODE)
716             continue;
717         if (!strcmp((const char *) n->name, "metadata"))
718         {
719             struct conf_metadata *md = 0;
720             struct conf_sortkey *sk = 0;
721             struct record_metadata **wheretoput, *newm;
722             int imeta;
723             int first, last;
724
725             type = xmlGetProp(n, (xmlChar *) "type");
726             value = xmlNodeListGetString(xdoc, n->children, 0);
727
728             if (!type || !value)
729                 continue;
730
731             // First, find out what field we're looking at
732             for (imeta = 0; imeta < service->num_metadata; imeta++)
733                 if (!strcmp((const char *) type, service->metadata[imeta].name))
734                 {
735                     md = &service->metadata[imeta];
736                     if (md->sortkey_offset >= 0)
737                         sk = &service->sortkeys[md->sortkey_offset];
738                     break;
739                 }
740             if (!md)
741             {
742                 yaz_log(YLOG_WARN, "Ignoring unknown metadata element: %s", type);
743                 continue;
744             }
745
746             // Find out where we are putting it
747             if (md->merge == Metadata_merge_no)
748                 wheretoput = &res->metadata[imeta];
749             else
750                 wheretoput = &cluster->metadata[imeta];
751             
752             // Put it there
753             newm = nmem_malloc(se->nmem, sizeof(struct record_metadata));
754             newm->next = 0;
755             if (md->type == Metadata_type_generic)
756             {
757                 char *p, *pe;
758                 for (p = (char *) value; *p && isspace(*p); p++)
759                     ;
760                 for (pe = p + strlen(p) - 1;
761                         pe > p && strchr(" ,/.:([", *pe); pe--)
762                     *pe = '\0';
763                 newm->data.text = nmem_strdup(se->nmem, p);
764
765             }
766             else if (md->type == Metadata_type_year)
767             {
768                 if (extract_years((char *) value, &first, &last) < 0)
769                     continue;
770             }
771             else
772             {
773                 yaz_log(YLOG_WARN, "Unknown type in metadata element %s", type);
774                 continue;
775             }
776             if (md->type == Metadata_type_year && md->merge != Metadata_merge_range)
777             {
778                 yaz_log(YLOG_WARN, "Only range merging supported for years");
779                 continue;
780             }
781             if (md->merge == Metadata_merge_unique)
782             {
783                 struct record_metadata *mnode;
784                 for (mnode = *wheretoput; mnode; mnode = mnode->next)
785                     if (!strcmp((const char *) mnode->data.text, newm->data.text))
786                         break;
787                 if (!mnode)
788                 {
789                     newm->next = *wheretoput;
790                     *wheretoput = newm;
791                 }
792             }
793             else if (md->merge == Metadata_merge_longest)
794             {
795                 if (!*wheretoput ||
796                         strlen(newm->data.text) > strlen((*wheretoput)->data.text))
797                 {
798                     *wheretoput = newm;
799                     if (sk)
800                     {
801                         char *s = nmem_strdup(se->nmem, newm->data.text);
802                         if (!cluster->sortkeys[md->sortkey_offset])
803                             cluster->sortkeys[md->sortkey_offset] = 
804                                 nmem_malloc(se->nmem, sizeof(union data_types));
805                         normalize_mergekey(s,
806                                 (sk->type == Metadata_sortkey_skiparticle));
807                         cluster->sortkeys[md->sortkey_offset]->text = s;
808                     }
809                 }
810             }
811             else if (md->merge == Metadata_merge_all || md->merge == Metadata_merge_no)
812             {
813                 newm->next = *wheretoput;
814                 *wheretoput = newm;
815             }
816             else if (md->merge == Metadata_merge_range)
817             {
818                 assert(md->type == Metadata_type_year);
819                 if (!*wheretoput)
820                 {
821                     *wheretoput = newm;
822                     (*wheretoput)->data.number.min = first;
823                     (*wheretoput)->data.number.max = last;
824                     if (sk)
825                         cluster->sortkeys[md->sortkey_offset] = &newm->data;
826                 }
827                 else
828                 {
829                     if (first < (*wheretoput)->data.number.min)
830                         (*wheretoput)->data.number.min = first;
831                     if (last > (*wheretoput)->data.number.max)
832                         (*wheretoput)->data.number.max = last;
833                 }
834 #ifdef GAGA
835                 if (sk)
836                 {
837                     union data_types *sdata = cluster->sortkeys[md->sortkey_offset];
838                     yaz_log(YLOG_LOG, "SK range: %d-%d", sdata->number.min, sdata->number.max);
839                 }
840 #endif
841             }
842             else
843                 yaz_log(YLOG_WARN, "Don't know how to merge on element name %s", md->name);
844
845             if (md->rank)
846                 relevance_countwords(se->relevance, cluster, 
847                                      (char *) value, md->rank);
848             if (md->termlist)
849             {
850                 if (md->type == Metadata_type_year)
851                 {
852                     char year[64];
853                     sprintf(year, "%d", last);
854                     add_facet(se, (char *) type, year);
855                     if (first != last)
856                     {
857                         sprintf(year, "%d", first);
858                         add_facet(se, (char *) type, year);
859                     }
860                 }
861                 else
862                     add_facet(se, (char *) type, (char *) value);
863             }
864             xmlFree(type);
865             xmlFree(value);
866             type = value = 0;
867         }
868         else
869             yaz_log(YLOG_WARN, "Unexpected element %s in internal record", n->name);
870     }
871     if (type)
872         xmlFree(type);
873     if (value)
874         xmlFree(value);
875
876     xmlFreeDoc(xdoc);
877
878     relevance_donerecord(se->relevance, cluster);
879     se->total_records++;
880
881     return res;
882 }
883
884 // Retrieve first defined value for 'name' for given database.
885 // Will be extended to take into account user associated with session
886 char *session_setting_oneval(struct session_database *db, int offset)
887 {
888     if (!db->settings[offset])
889         return "";
890     return db->settings[offset]->value;
891 }
892
893 static void ingest_records(struct client *cl, Z_Records *r)
894 {
895 #if USE_TIMING
896     yaz_timing_t t = yaz_timing_create();
897 #endif
898     struct record *rec;
899     struct session *s = cl->session;
900     Z_NamePlusRecordList *rlist;
901     int i;
902
903     if (r->which != Z_Records_DBOSD)
904         return;
905     rlist = r->u.databaseOrSurDiagnostics;
906     for (i = 0; i < rlist->num_records; i++)
907     {
908         Z_NamePlusRecord *npr = rlist->records[i];
909
910         cl->records++;
911         if (npr->which != Z_NamePlusRecord_databaseRecord)
912         {
913             yaz_log(YLOG_WARN, 
914                     "Unexpected record type, probably diagnostic %s",
915                     cl->database->database->url);
916             continue;
917         }
918
919         rec = ingest_record(cl, npr->u.databaseRecord);
920         if (!rec)
921             continue;
922     }
923     if (s->watchlist[SESSION_WATCH_RECORDS].fun && rlist->num_records)
924         session_alert_watch(s, SESSION_WATCH_RECORDS);
925
926 #if USE_TIMING
927     yaz_timing_stop(t);
928     yaz_log(YLOG_LOG, "ingest_records %6.5f %3.2f %3.2f", 
929             yaz_timing_get_real(t), yaz_timing_get_user(t),
930             yaz_timing_get_sys(t));
931     yaz_timing_destroy(&t);
932 #endif
933 }
934
935 static void do_presentResponse(IOCHAN i, Z_APDU *a)
936 {
937     struct connection *co = iochan_getdata(i);
938     struct client *cl = co->client;
939     Z_PresentResponse *r = a->u.presentResponse;
940
941     if (r->records) {
942         Z_Records *recs = r->records;
943         if (recs->which == Z_Records_NSD)
944         {
945             yaz_log(YLOG_WARN, "Non-surrogate diagnostic %s",
946                     cl->database->database->url);
947             cl->diagnostic = *recs->u.nonSurrogateDiagnostic->condition;
948             cl->state = Client_Error;
949         }
950     }
951
952     if (!*r->presentStatus && cl->state != Client_Error)
953     {
954         yaz_log(YLOG_DEBUG, "Good Present response %s",
955                 cl->database->database->url);
956         ingest_records(cl, r->records);
957         cl->state = Client_Idle;
958     }
959     else if (*r->presentStatus) 
960     {
961         yaz_log(YLOG_WARN, "Bad Present response %s",
962                 cl->database->database->url);
963         cl->state = Client_Error;
964     }
965 }
966
967 static void handler(IOCHAN i, int event)
968 {
969     struct connection *co = iochan_getdata(i);
970     struct client *cl = co->client;
971     struct session *se = 0;
972
973     if (cl)
974         se = cl->session;
975     else
976     {
977         yaz_log(YLOG_WARN, "Destroying orphan connection");
978         connection_destroy(co);
979         return;
980     }
981
982     if (co->state == Conn_Connecting && event & EVENT_OUTPUT)
983     {
984         int errcode;
985         socklen_t errlen = sizeof(errcode);
986
987         if (getsockopt(cs_fileno(co->link), SOL_SOCKET, SO_ERROR, &errcode,
988             &errlen) < 0 || errcode != 0)
989         {
990             client_fatal(cl);
991             return;
992         }
993         else
994         {
995             yaz_log(YLOG_DEBUG, "Connect OK");
996             co->state = Conn_Open;
997             if (cl)
998                 cl->state = Client_Connected;
999         }
1000     }
1001
1002     else if (event & EVENT_INPUT)
1003     {
1004         int len = cs_get(co->link, &co->ibuf, &co->ibufsize);
1005
1006         if (len < 0)
1007         {
1008             yaz_log(YLOG_WARN|YLOG_ERRNO, "Error reading from %s", 
1009                     cl->database->database->url);
1010             connection_destroy(co);
1011             return;
1012         }
1013         else if (len == 0)
1014         {
1015             yaz_log(YLOG_WARN, "EOF reading from %s", cl->database->database->url);
1016             connection_destroy(co);
1017             return;
1018         }
1019         else if (len > 1) // We discard input if we have no connection
1020         {
1021             co->state = Conn_Open;
1022
1023             if (cl && (cl->requestid == se->requestid || cl->state == Client_Initializing))
1024             {
1025                 Z_APDU *a;
1026
1027                 odr_reset(global_parameters.odr_in);
1028                 odr_setbuf(global_parameters.odr_in, co->ibuf, len, 0);
1029                 if (!z_APDU(global_parameters.odr_in, &a, 0, 0))
1030                 {
1031                     client_fatal(cl);
1032                     return;
1033                 }
1034                 switch (a->which)
1035                 {
1036                     case Z_APDU_initResponse:
1037                         do_initResponse(i, a);
1038                         break;
1039                     case Z_APDU_searchResponse:
1040                         do_searchResponse(i, a);
1041                         break;
1042                     case Z_APDU_presentResponse:
1043                         do_presentResponse(i, a);
1044                         break;
1045                     case Z_APDU_close:
1046                         do_closeResponse(i, a);
1047                         break;
1048                     default:
1049                         yaz_log(YLOG_WARN, 
1050                                 "Unexpected Z39.50 response from %s",  
1051                                 cl->database->database->url);
1052                         client_fatal(cl);
1053                         return;
1054                 }
1055                 // We aren't expecting staggered output from target
1056                 // if (cs_more(t->link))
1057                 //    iochan_setevent(i, EVENT_INPUT);
1058             }
1059             else  // we throw away response and go to idle mode
1060             {
1061                 yaz_log(YLOG_DEBUG, "Ignoring result of expired operation");
1062                 cl->state = Client_Idle;
1063             }
1064         }
1065         /* if len==1 we do nothing but wait for more input */
1066     }
1067
1068     if (cl->state == Client_Connected) {
1069         send_init(i);
1070     }
1071
1072     if (cl->state == Client_Idle)
1073     {
1074         if (cl->requestid != se->requestid && cl->pquery) {
1075             send_search(i);
1076         }
1077         else if (cl->hits > 0 && cl->records < global_parameters.toget &&
1078             cl->records < cl->hits) {
1079             send_present(i);
1080         }
1081     }
1082 }
1083
1084 // Disassociate connection from client
1085 static void connection_release(struct connection *co)
1086 {
1087     struct client *cl = co->client;
1088
1089     yaz_log(YLOG_DEBUG, "Connection release %s", co->host->hostport);
1090     if (!cl)
1091         return;
1092     cl->connection = 0;
1093     co->client = 0;
1094 }
1095
1096 // Close connection and recycle structure
1097 static void connection_destroy(struct connection *co)
1098 {
1099     struct host *h = co->host;
1100     cs_close(co->link);
1101     iochan_destroy(co->iochan);
1102
1103     yaz_log(YLOG_DEBUG, "Connection destroy %s", co->host->hostport);
1104     if (h->connections == co)
1105         h->connections = co->next;
1106     else
1107     {
1108         struct connection *pco;
1109         for (pco = h->connections; pco && pco->next != co; pco = pco->next)
1110             ;
1111         if (pco)
1112             pco->next = co->next;
1113         else
1114             abort();
1115     }
1116     if (co->client)
1117     {
1118         if (co->client->state != Client_Idle)
1119             co->client->state = Client_Disconnected;
1120         co->client->connection = 0;
1121     }
1122     co->next = connection_freelist;
1123     connection_freelist = co;
1124 }
1125
1126 // Creates a new connection for client, associated with the host of 
1127 // client's database
1128 static struct connection *connection_create(struct client *cl)
1129 {
1130     struct connection *new;
1131     COMSTACK link; 
1132     int res;
1133     void *addr;
1134
1135
1136     if (!(link = cs_create(tcpip_type, 0, PROTO_Z3950)))
1137         {
1138             yaz_log(YLOG_FATAL|YLOG_ERRNO, "Failed to create comstack");
1139             exit(1);
1140         }
1141     
1142     if (0 == strlen(global_parameters.zproxy_override)){
1143         /* no Z39.50 proxy needed - direct connect */
1144         yaz_log(YLOG_DEBUG, "Connection create %s", cl->database->database->url);
1145         
1146         if (!(addr = cs_straddr(link, cl->database->database->host->ipport)))
1147             {
1148                 yaz_log(YLOG_WARN|YLOG_ERRNO, 
1149                         "Lookup of IP address %s failed", 
1150                         cl->database->database->host->ipport);
1151                 return 0;
1152             }
1153     
1154     } else {
1155         /* Z39.50 proxy connect */
1156         yaz_log(YLOG_DEBUG, "Connection create %s proxy %s", 
1157                 cl->database->database->url, global_parameters.zproxy_override);
1158
1159         if (!(addr = cs_straddr(link, global_parameters.zproxy_override)))
1160             {
1161                 yaz_log(YLOG_WARN|YLOG_ERRNO, 
1162                         "Lookup of IP address %s failed", 
1163                         global_parameters.zproxy_override);
1164                 return 0;
1165             }
1166     }
1167
1168     res = cs_connect(link, addr);
1169     if (res < 0)
1170     {
1171         yaz_log(YLOG_WARN|YLOG_ERRNO, "cs_connect %s", cl->database->database->url);
1172         return 0;
1173     }
1174
1175     if ((new = connection_freelist))
1176         connection_freelist = new->next;
1177     else
1178     {
1179         new = xmalloc(sizeof (struct connection));
1180         new->ibuf = 0;
1181         new->ibufsize = 0;
1182     }
1183     new->state = Conn_Connecting;
1184     new->host = cl->database->database->host;
1185     new->next = new->host->connections;
1186     new->host->connections = new;
1187     new->client = cl;
1188     cl->connection = new;
1189     new->link = link;
1190
1191     new->iochan = iochan_create(cs_fileno(link), handler, 0);
1192     iochan_setdata(new->iochan, new);
1193     pazpar2_add_channel(new->iochan);
1194     return new;
1195 }
1196
1197 // Close connection and set state to error
1198 static void client_fatal(struct client *cl)
1199 {
1200     yaz_log(YLOG_WARN, "Fatal error from %s", cl->database->database->url);
1201     connection_destroy(cl->connection);
1202     cl->state = Client_Error;
1203 }
1204
1205 // Ensure that client has a connection associated
1206 static int client_prep_connection(struct client *cl)
1207 {
1208     struct connection *co;
1209     struct session *se = cl->session;
1210     struct host *host = cl->database->database->host;
1211
1212     co = cl->connection;
1213
1214     yaz_log(YLOG_DEBUG, "Client prep %s", cl->database->database->url);
1215
1216     if (!co)
1217     {
1218         // See if someone else has an idle connection
1219         // We should look at timestamps here to select the longest-idle connection
1220         for (co = host->connections; co; co = co->next)
1221             if (co->state == Conn_Open && (!co->client || co->client->session != se))
1222                 break;
1223         if (co)
1224         {
1225             connection_release(co);
1226             cl->connection = co;
1227             co->client = cl;
1228         }
1229         else
1230             co = connection_create(cl);
1231     }
1232     if (co)
1233     {
1234         if (co->state == Conn_Connecting)
1235         {
1236             cl->state = Client_Connecting;
1237             iochan_setflag(co->iochan, EVENT_OUTPUT);
1238         }
1239         else if (co->state == Conn_Open)
1240         {
1241             if (cl->state == Client_Error || cl->state == Client_Disconnected)
1242                 cl->state = Client_Idle;
1243             iochan_setflag(co->iochan, EVENT_OUTPUT);
1244         }
1245         return 1;
1246     }
1247     else
1248         return 0;
1249 }
1250
1251 // Initialize CCL map for a target
1252 static CCL_bibset prepare_cclmap(struct client *cl)
1253 {
1254     struct session_database *sdb = cl->database;
1255     struct setting *s;
1256     CCL_bibset res;
1257
1258     if (!sdb->settings)
1259         return 0;
1260     res = ccl_qual_mk();
1261     for (s = sdb->settings[PZ_CCLMAP]; s; s = s->next)
1262     {
1263         char *p = strchr(s->name + 3, ':');
1264         if (!p)
1265         {
1266             yaz_log(YLOG_WARN, "Malformed cclmap name: %s", s->name);
1267             ccl_qual_rm(&res);
1268             return 0;
1269         }
1270         p++;
1271         ccl_qual_fitem(res, s->value, p);
1272     }
1273     return res;
1274 }
1275
1276 // Parse the query given the settings specific to this client
1277 static int client_parse_query(struct client *cl, const char *query)
1278 {
1279     struct session *se = cl->session;
1280     struct ccl_rpn_node *cn;
1281     int cerror, cpos;
1282     CCL_bibset ccl_map = prepare_cclmap(cl);
1283
1284     if (!ccl_map)
1285         return -1;
1286     cn = ccl_find_str(ccl_map, query, &cerror, &cpos);
1287     ccl_qual_rm(&ccl_map);
1288     if (!cn)
1289     {
1290         cl->state = Client_Error;
1291         yaz_log(YLOG_WARN, "Failed to parse query for %s",
1292                          cl->database->database->url);
1293         return -1;
1294     }
1295     wrbuf_rewind(se->wrbuf);
1296     ccl_pquery(se->wrbuf, cn);
1297     wrbuf_putc(se->wrbuf, '\0');
1298     if (cl->pquery)
1299         xfree(cl->pquery);
1300     cl->pquery = xstrdup(wrbuf_buf(se->wrbuf));
1301
1302     if (!se->relevance)
1303     {
1304         // Initialize relevance structure with query terms
1305         char *p[512];
1306         extract_terms(se->nmem, cn, p);
1307         se->relevance = relevance_create(se->nmem, (const char **) p,
1308                 se->expected_maxrecs);
1309     }
1310
1311     ccl_rpn_delete(cn);
1312     return 0;
1313 }
1314
1315 static struct client *client_create(void)
1316 {
1317     struct client *r;
1318     if (client_freelist)
1319     {
1320         r = client_freelist;
1321         client_freelist = client_freelist->next;
1322     }
1323     else
1324         r = xmalloc(sizeof(struct client));
1325     r->pquery = 0;
1326     r->database = 0;
1327     r->connection = 0;
1328     r->session = 0;
1329     r->hits = 0;
1330     r->records = 0;
1331     r->setno = 0;
1332     r->requestid = -1;
1333     r->diagnostic = 0;
1334     r->state = Client_Disconnected;
1335     r->next = 0;
1336     return r;
1337 }
1338
1339 void client_destroy(struct client *c)
1340 {
1341     struct session *se = c->session;
1342     if (c == se->clients)
1343         se->clients = c->next;
1344     else
1345     {
1346         struct client *cc;
1347         for (cc = se->clients; cc && cc->next != c; cc = cc->next)
1348             ;
1349         if (cc)
1350             cc->next = c->next;
1351     }
1352     if (c->connection)
1353         connection_release(c->connection);
1354     c->next = client_freelist;
1355     client_freelist = c;
1356 }
1357
1358 void session_set_watch(struct session *s, int what, session_watchfun fun, void *data)
1359 {
1360     s->watchlist[what].fun = fun;
1361     s->watchlist[what].data = data;
1362 }
1363
1364 void session_alert_watch(struct session *s, int what)
1365 {
1366     if (!s->watchlist[what].fun)
1367         return;
1368     (*s->watchlist[what].fun)(s->watchlist[what].data);
1369     s->watchlist[what].fun = 0;
1370     s->watchlist[what].data = 0;
1371 }
1372
1373 //callback for grep_databases
1374 static void select_targets_callback(void *context, struct session_database *db)
1375 {
1376     struct session *se = (struct session*) context;
1377     struct client *cl = client_create();
1378     cl->database = db;
1379     cl->session = se;
1380     cl->next = se->clients;
1381     se->clients = cl;
1382 }
1383
1384 // Associates a set of clients with a session;
1385 // Note: Session-databases represent databases with per-session setting overrides
1386 int select_targets(struct session *se, struct database_criterion *crit)
1387 {
1388     while (se->clients)
1389         client_destroy(se->clients);
1390
1391     return session_grep_databases(se, crit, select_targets_callback);
1392 }
1393
1394 int session_active_clients(struct session *s)
1395 {
1396     struct client *c;
1397     int res = 0;
1398
1399     for (c = s->clients; c; c = c->next)
1400         if (c->connection && (c->state == Client_Connecting ||
1401                     c->state == Client_Initializing ||
1402                     c->state == Client_Searching ||
1403                     c->state == Client_Presenting))
1404             res++;
1405
1406     return res;
1407 }
1408
1409 // parses crit1=val1,crit2=val2|val3,...
1410 static struct database_criterion *parse_filter(NMEM m, const char *buf)
1411 {
1412     struct database_criterion *res = 0;
1413     char **values;
1414     int num;
1415     int i;
1416
1417     if (!buf || !*buf)
1418         return 0;
1419     nmem_strsplit(m, ",", buf,  &values, &num);
1420     for (i = 0; i < num; i++)
1421     {
1422         char **subvalues;
1423         int subnum;
1424         int subi;
1425         struct database_criterion *new = nmem_malloc(m, sizeof(*new));
1426         char *eq = strchr(values[i], '=');
1427         if (!eq)
1428         {
1429             yaz_log(YLOG_WARN, "Missing equal-sign in filter");
1430             return 0;
1431         }
1432         *(eq++) = '\0';
1433         new->name = values[i];
1434         nmem_strsplit(m, "|", eq, &subvalues, &subnum);
1435         new->values = 0;
1436         for (subi = 0; subi < subnum; subi++)
1437         {
1438             struct database_criterion_value *newv = nmem_malloc(m, sizeof(*newv));
1439             newv->value = subvalues[subi];
1440             newv->next = new->values;
1441             new->values = newv;
1442         }
1443         new->next = res;
1444         res = new;
1445     }
1446     return res;
1447 }
1448
1449 char *search(struct session *se, char *query, char *filter)
1450 {
1451     int live_channels = 0;
1452     struct client *cl;
1453     struct database_criterion *criteria;
1454
1455     yaz_log(YLOG_DEBUG, "Search");
1456
1457     nmem_reset(se->nmem);
1458     criteria = parse_filter(se->nmem, filter);
1459     se->requestid++;
1460     live_channels = select_targets(se, criteria);
1461     if (live_channels)
1462     {
1463         int maxrecs = live_channels * global_parameters.toget;
1464         se->num_termlists = 0;
1465         se->reclist = reclist_create(se->nmem, maxrecs);
1466         // This will be initialized in send_search()
1467         se->total_records = se->total_hits = se->total_merged = 0;
1468         se->expected_maxrecs = maxrecs;
1469     }
1470     else
1471         return "NOTARGETS";
1472
1473     se->relevance = 0;
1474     for (cl = se->clients; cl; cl = cl->next)
1475         if (client_parse_query(cl, query) < 0)  // Query must parse for all targets
1476             return "QUERY";
1477     for (cl = se->clients; cl; cl = cl->next)
1478         client_prep_connection(cl);
1479
1480     return 0;
1481 }
1482
1483 // Apply a session override to a database
1484 void session_apply_setting(struct session *se, char *dbname, char *setting, char *value)
1485 {
1486     struct session_database *sdb;
1487
1488     for (sdb = se->databases; sdb; sdb = sdb->next)
1489         if (!strcmp(dbname, sdb->database->url))
1490         {
1491             struct setting *new = nmem_malloc(se->session_nmem, sizeof(*new));
1492             int offset = settings_offset(setting);
1493
1494             if (offset < 0)
1495             {
1496                 yaz_log(YLOG_WARN, "Unknown setting %s", setting);
1497                 return;
1498             }
1499             new->precedence = 0;
1500             new->target = dbname;
1501             new->name = setting;
1502             new->value = value;
1503             new->next = sdb->settings[offset];
1504             sdb->settings[offset] = new;
1505             break;
1506         }
1507     if (!sdb)
1508         yaz_log(YLOG_WARN, "Unknown database in setting override: %s", dbname);
1509 }
1510
1511 void session_init_databases_fun(void *context, struct database *db)
1512 {
1513     struct session *se = (struct session *) context;
1514     struct session_database *new = nmem_malloc(se->session_nmem, sizeof(*new));
1515     int num = settings_num();
1516     int i;
1517
1518     new->database = db;
1519     new->settings = nmem_malloc(se->session_nmem, sizeof(struct settings *) * num);
1520     for (i = 0; i < num; i++)
1521         new->settings[i] = db->settings[i];
1522     new->next = se->databases;
1523     se->databases = new;
1524 }
1525
1526 // Initialize session_database list -- this represents this session's view
1527 // of the database list -- subject to modification by the settings ws command
1528 void session_init_databases(struct session *se)
1529 {
1530     se->databases = 0;
1531     grep_databases(se, 0, session_init_databases_fun);
1532 }
1533
1534 void destroy_session(struct session *s)
1535 {
1536     yaz_log(YLOG_LOG, "Destroying session");
1537     while (s->clients)
1538         client_destroy(s->clients);
1539     nmem_destroy(s->nmem);
1540     wrbuf_destroy(s->wrbuf);
1541 }
1542
1543 struct session *new_session(NMEM nmem) 
1544 {
1545     int i;
1546     struct session *session = nmem_malloc(nmem, sizeof(*session));
1547
1548     yaz_log(YLOG_DEBUG, "New Pazpar2 session");
1549     
1550     session->total_hits = 0;
1551     session->total_records = 0;
1552     session->num_termlists = 0;
1553     session->reclist = 0;
1554     session->requestid = -1;
1555     session->clients = 0;
1556     session->expected_maxrecs = 0;
1557     session->session_nmem = nmem;
1558     session->nmem = nmem_create();
1559     session->wrbuf = wrbuf_alloc();
1560     session_init_databases(session);
1561     for (i = 0; i <= SESSION_WATCH_MAX; i++)
1562     {
1563         session->watchlist[i].data = 0;
1564         session->watchlist[i].fun = 0;
1565     }
1566
1567     return session;
1568 }
1569
1570 struct hitsbytarget *hitsbytarget(struct session *se, int *count)
1571 {
1572     static struct hitsbytarget res[1000]; // FIXME MM
1573     struct client *cl;
1574
1575     *count = 0;
1576     for (cl = se->clients; cl; cl = cl->next)
1577     {
1578         char *name = session_setting_oneval(cl->database, PZ_NAME);
1579
1580         res[*count].id = cl->database->database->url;
1581         res[*count].name = *name ? name : "Unknown";
1582         res[*count].hits = cl->hits;
1583         res[*count].records = cl->records;
1584         res[*count].diagnostic = cl->diagnostic;
1585         res[*count].state = client_states[cl->state];
1586         res[*count].connected  = cl->connection ? 1 : 0;
1587         (*count)++;
1588     }
1589
1590     return res;
1591 }
1592
1593 struct termlist_score **termlist(struct session *s, const char *name, int *num)
1594 {
1595     int i;
1596
1597     for (i = 0; i < s->num_termlists; i++)
1598         if (!strcmp((const char *) s->termlists[i].name, name))
1599             return termlist_highscore(s->termlists[i].termlist, num);
1600     return 0;
1601 }
1602
1603 #ifdef MISSING_HEADERS
1604 void report_nmem_stats(void)
1605 {
1606     size_t in_use, is_free;
1607
1608     nmem_get_memory_in_use(&in_use);
1609     nmem_get_memory_free(&is_free);
1610
1611     yaz_log(YLOG_LOG, "nmem stat: use=%ld free=%ld", 
1612             (long) in_use, (long) is_free);
1613 }
1614 #endif
1615
1616 struct record_cluster *show_single(struct session *s, int id)
1617 {
1618     struct record_cluster *r;
1619
1620     reclist_rewind(s->reclist);
1621     while ((r = reclist_read_record(s->reclist)))
1622         if (r->recid == id)
1623             return r;
1624     return 0;
1625 }
1626
1627 struct record_cluster **show(struct session *s, struct reclist_sortparms *sp, int start,
1628         int *num, int *total, int *sumhits, NMEM nmem_show)
1629 {
1630     struct record_cluster **recs = nmem_malloc(nmem_show, *num 
1631                                        * sizeof(struct record_cluster *));
1632     struct reclist_sortparms *spp;
1633     int i;
1634 #if USE_TIMING    
1635     yaz_timing_t t = yaz_timing_create();
1636 #endif
1637
1638     for (spp = sp; spp; spp = spp->next)
1639         if (spp->type == Metadata_sortkey_relevance)
1640         {
1641             relevance_prepare_read(s->relevance, s->reclist);
1642             break;
1643         }
1644     reclist_sort(s->reclist, sp);
1645
1646     *total = s->reclist->num_records;
1647     *sumhits = s->total_hits;
1648
1649     for (i = 0; i < start; i++)
1650         if (!reclist_read_record(s->reclist))
1651         {
1652             *num = 0;
1653             recs = 0;
1654             break;
1655         }
1656
1657     for (i = 0; i < *num; i++)
1658     {
1659         struct record_cluster *r = reclist_read_record(s->reclist);
1660         if (!r)
1661         {
1662             *num = i;
1663             break;
1664         }
1665         recs[i] = r;
1666     }
1667 #if USE_TIMING
1668     yaz_timing_stop(t);
1669     yaz_log(YLOG_LOG, "show %6.5f %3.2f %3.2f", 
1670             yaz_timing_get_real(t), yaz_timing_get_user(t),
1671             yaz_timing_get_sys(t));
1672     yaz_timing_destroy(&t);
1673 #endif
1674     return recs;
1675 }
1676
1677 void statistics(struct session *se, struct statistics *stat)
1678 {
1679     struct client *cl;
1680     int count = 0;
1681
1682     memset(stat, 0, sizeof(*stat));
1683     for (cl = se->clients; cl; cl = cl->next)
1684     {
1685         if (!cl->connection)
1686             stat->num_no_connection++;
1687         switch (cl->state)
1688         {
1689             case Client_Connecting: stat->num_connecting++; break;
1690             case Client_Initializing: stat->num_initializing++; break;
1691             case Client_Searching: stat->num_searching++; break;
1692             case Client_Presenting: stat->num_presenting++; break;
1693             case Client_Idle: stat->num_idle++; break;
1694             case Client_Failed: stat->num_failed++; break;
1695             case Client_Error: stat->num_error++; break;
1696             default: break;
1697         }
1698         count++;
1699     }
1700     stat->num_hits = se->total_hits;
1701     stat->num_records = se->total_records;
1702
1703     stat->num_clients = count;
1704 }
1705
1706 void start_http_listener(void)
1707 {
1708     char hp[128] = "";
1709     struct conf_server *ser = global_parameters.server;
1710
1711     if (*global_parameters.listener_override)
1712         strcpy(hp, global_parameters.listener_override);
1713     else
1714     {
1715         strcpy(hp, ser->host ? ser->host : "");
1716         if (ser->port)
1717         {
1718             if (*hp)
1719                 strcat(hp, ":");
1720             sprintf(hp + strlen(hp), "%d", ser->port);
1721         }
1722     }
1723     http_init(hp);
1724 }
1725
1726 void start_proxy(void)
1727 {
1728     char hp[128] = "";
1729     struct conf_server *ser = global_parameters.server;
1730
1731     if (*global_parameters.proxy_override)
1732         strcpy(hp, global_parameters.proxy_override);
1733     else if (ser->proxy_host || ser->proxy_port)
1734     {
1735         strcpy(hp, ser->proxy_host ? ser->proxy_host : "");
1736         if (ser->proxy_port)
1737         {
1738             if (*hp)
1739                 strcat(hp, ":");
1740             sprintf(hp + strlen(hp), "%d", ser->proxy_port);
1741         }
1742     }
1743     else
1744         return;
1745
1746     http_set_proxyaddr(hp, ser->myurl ? ser->myurl : "");
1747 }
1748
1749 void start_zproxy(void)
1750 {
1751     struct conf_server *ser = global_parameters.server;
1752
1753     if (*global_parameters.zproxy_override){
1754         yaz_log(YLOG_LOG, "Z39.50 proxy  %s", 
1755                 global_parameters.zproxy_override);
1756         return;
1757     }
1758
1759     else if (ser->zproxy_host || ser->zproxy_port)
1760     {
1761         char hp[128] = "";
1762
1763         strcpy(hp, ser->zproxy_host ? ser->zproxy_host : "");
1764         if (ser->zproxy_port)
1765         {
1766             if (*hp)
1767                 strcat(hp, ":");
1768             else
1769                 strcat(hp, "@:");
1770
1771             sprintf(hp + strlen(hp), "%d", ser->zproxy_port);
1772         }
1773         strcpy(global_parameters.zproxy_override, hp);
1774         yaz_log(YLOG_LOG, "Z39.50 proxy  %s", 
1775                 global_parameters.zproxy_override);
1776
1777     }
1778     else
1779         return;
1780 }
1781
1782 // Master list of connections we're handling events to
1783 static IOCHAN channel_list = 0; 
1784 void pazpar2_add_channel(IOCHAN chan)
1785 {
1786     chan->next = channel_list;
1787     channel_list = chan;
1788 }
1789
1790 void pazpar2_event_loop()
1791 {
1792     event_loop(&channel_list);
1793 }
1794
1795 /*
1796  * Local variables:
1797  * c-basic-offset: 4
1798  * indent-tabs-mode: nil
1799  * End:
1800  * vim: shiftwidth=4 tabstop=8 expandtab
1801  */