c581f2d9f1b43591de8881313db286ba8b64abab
[pazpar2-moved-to-github.git] / src / logic.c
1 /* $Id: logic.c,v 1.9 2007-04-18 19:45:09 quinn Exp $
2    Copyright (c) 2006-2007, Index Data.
3
4 This file is part of Pazpar2.
5
6 Pazpar2 is free software; you can redistribute it and/or modify it under
7 the terms of the GNU General Public License as published by the Free
8 Software Foundation; either version 2, or (at your option) any later
9 version.
10
11 Pazpar2 is distributed in the hope that it will be useful, but WITHOUT ANY
12 WARRANTY; without even the implied warranty of MERCHANTABILITY or
13 FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
14 for more details.
15
16 You should have received a copy of the GNU General Public License
17 along with Pazpar2; see the file LICENSE.  If not, write to the
18 Free Software Foundation, 59 Temple Place - Suite 330, Boston, MA
19 02111-1307, USA.
20  */
21
22 #include <stdlib.h>
23 #include <stdio.h>
24 #include <string.h>
25 #include <sys/time.h>
26 #include <unistd.h>
27 #include <sys/socket.h>
28 #include <netdb.h>
29 #include <signal.h>
30 #include <ctype.h>
31 #include <assert.h>
32
33 #include <yaz/marcdisp.h>
34 #include <yaz/comstack.h>
35 #include <yaz/tcpip.h>
36 #include <yaz/proto.h>
37 #include <yaz/readconf.h>
38 #include <yaz/pquery.h>
39 #include <yaz/otherinfo.h>
40 #include <yaz/yaz-util.h>
41 #include <yaz/nmem.h>
42 #include <yaz/query-charset.h>
43 #include <yaz/querytowrbuf.h>
44 #if YAZ_VERSIONL >= 0x020163
45 #include <yaz/oid_db.h>
46 #endif
47
48 #if HAVE_CONFIG_H
49 #include "cconfig.h"
50 #endif
51
52 #define USE_TIMING 0
53 #if USE_TIMING
54 #include <yaz/timing.h>
55 #endif
56
57 #include <netinet/in.h>
58
59 #include "pazpar2.h"
60 #include "eventl.h"
61 #include "http.h"
62 #include "termlists.h"
63 #include "reclists.h"
64 #include "relevance.h"
65 #include "config.h"
66 #include "database.h"
67 #include "settings.h"
68
69 #define MAX_CHUNK 15
70
/* Prototypes for functions that are referenced before their definitions.
 * ingest_records() is defined further down; the definitions of the others
 * are not visible in this part of the file. */
static void client_fatal(struct client *cl);
static void connection_destroy(struct connection *co);
static int client_prep_connection(struct client *cl);
static void ingest_records(struct client *cl, Z_Records *r);
void session_alert_watch(struct session *s, int what);

IOCHAN channel_list = 0;  // Master list of connections we're handling events to

// NOTE(review): presumably heads of lists of recycled connection/client
// structures -- their use is outside this part of the file; confirm.
static struct connection *connection_freelist = 0;
static struct client *client_freelist = 0;
81
/* Printable names of the client states.
 * NOTE(review): presumably indexed by the client state value (cl->state),
 * in which case the order here must match the declaration of that
 * enumeration -- confirm against the header that declares it. */
static char *client_states[] = {
    "Client_Connecting",
    "Client_Connected",
    "Client_Idle",
    "Client_Initializing",
    "Client_Searching",
    "Client_Presenting",
    "Client_Error",
    "Client_Failed",
    "Client_Disconnected",
    "Client_Stopped"
};
94
// Note: Some things in this structure will eventually move to configuration
//
// The initializer is positional: each value is assigned to the members of
// struct parameters in declaration order, so this list must be kept in step
// with the structure declaration (not visible in this file).
// NOTE(review): the meaning of the individual values cannot be confirmed
// from here; check them against the struct declaration before editing.
struct parameters global_parameters = 
{
    "",
    "",
    "",
    "",
    0,
    0,
    30,
    "81",
    "Index Data PazPar2",
    VERSION,
    600, // 10 minutes
    60,
    100,
    MAX_CHUNK,
    0,
    0
};
115
116 static int send_apdu(struct client *c, Z_APDU *a)
117 {
118     struct connection *co = c->connection;
119     char *buf;
120     int len, r;
121
122     if (!z_APDU(global_parameters.odr_out, &a, 0, 0))
123     {
124         odr_perror(global_parameters.odr_out, "Encoding APDU");
125         abort();
126     }
127     buf = odr_getbuf(global_parameters.odr_out, &len, 0);
128     r = cs_put(co->link, buf, len);
129     if (r < 0)
130     {
131         yaz_log(YLOG_WARN, "cs_put: %s", cs_errmsg(cs_errno(co->link)));
132         return -1;
133     }
134     else if (r == 1)
135     {
136         fprintf(stderr, "cs_put incomplete (ParaZ does not handle that)\n");
137         exit(1);
138     }
139     odr_reset(global_parameters.odr_out); /* release the APDU structure  */
140     co->state = Conn_Waiting;
141     return 0;
142 }
143
144 // Set authentication token in init if one is set for the client
145 // TODO: Extend this to handle other schemes than open (should be simple)
146 static void init_authentication(struct client *cl, Z_InitRequest *req)
147 {
148     struct session_database *sdb = cl->database;
149     char *auth = session_setting_oneval(sdb, PZ_AUTHENTICATION);
150
151     if (auth)
152     {
153         Z_IdAuthentication *idAuth = odr_malloc(global_parameters.odr_out,
154                 sizeof(*idAuth));
155         idAuth->which = Z_IdAuthentication_open;
156         idAuth->u.open = auth;
157         req->idAuthentication = idAuth;
158     }
159 }
160
161 static void send_init(IOCHAN i)
162 {
163
164     struct connection *co = iochan_getdata(i);
165     struct client *cl = co->client;
166     Z_APDU *a = zget_APDU(global_parameters.odr_out, Z_APDU_initRequest);
167
168     a->u.initRequest->implementationId = global_parameters.implementationId;
169     a->u.initRequest->implementationName = global_parameters.implementationName;
170     a->u.initRequest->implementationVersion =
171         global_parameters.implementationVersion;
172     ODR_MASK_SET(a->u.initRequest->options, Z_Options_search);
173     ODR_MASK_SET(a->u.initRequest->options, Z_Options_present);
174     ODR_MASK_SET(a->u.initRequest->options, Z_Options_namedResultSets);
175
176     ODR_MASK_SET(a->u.initRequest->protocolVersion, Z_ProtocolVersion_1);
177     ODR_MASK_SET(a->u.initRequest->protocolVersion, Z_ProtocolVersion_2);
178     ODR_MASK_SET(a->u.initRequest->protocolVersion, Z_ProtocolVersion_3);
179
180     init_authentication(cl, a->u.initRequest);
181
182     /* add virtual host if tunneling through Z39.50 proxy */
183     
184     if (0 < strlen(global_parameters.zproxy_override) 
185         && 0 < strlen(cl->database->database->url))
186     {
187 #if YAZ_VERSIONL >= 0x020163
188         yaz_oi_set_string_oid(&a->u.initRequest->otherInfo,
189                               global_parameters.odr_out,
190                               yaz_oid_userinfo_proxy,
191                               1, cl->database->database->url);
192 #else
193         yaz_oi_set_string_oidval(&a->u.initRequest->otherInfo,
194                                  global_parameters.odr_out, VAL_PROXY,
195                                  1, cl->database->database->url);
196 #endif
197     }
198
199     if (send_apdu(cl, a) >= 0)
200     {
201         iochan_setflags(i, EVENT_INPUT);
202         cl->state = Client_Initializing;
203     }
204     else
205         cl->state = Client_Error;
206     odr_reset(global_parameters.odr_out);
207 }
208
209 // Recursively traverse query structure to extract terms.
210 static void pull_terms(NMEM nmem, struct ccl_rpn_node *n, char **termlist, int *num)
211 {
212     char **words;
213     int numwords;
214     int i;
215
216     switch (n->kind)
217     {
218         case CCL_RPN_AND:
219         case CCL_RPN_OR:
220         case CCL_RPN_NOT:
221         case CCL_RPN_PROX:
222             pull_terms(nmem, n->u.p[0], termlist, num);
223             pull_terms(nmem, n->u.p[1], termlist, num);
224             break;
225         case CCL_RPN_TERM:
226             nmem_strsplit(nmem, " ", n->u.t.term, &words, &numwords);
227             for (i = 0; i < numwords; i++)
228                 termlist[(*num)++] = words[i];
229             break;
230         default: // NOOP
231             break;
232     }
233 }
234
235 // Extract terms from query into null-terminated termlist
236 static void extract_terms(NMEM nmem, struct ccl_rpn_node *query, char **termlist)
237 {
238     int num = 0;
239
240     pull_terms(nmem, query, termlist, &num);
241     termlist[num] = 0;
242 }
243
244 static void send_search(IOCHAN i)
245 {
246     struct connection *co = iochan_getdata(i);
247     struct client *cl = co->client; 
248     struct session *se = cl->session;
249     struct session_database *sdb = cl->database;
250     Z_APDU *a = zget_APDU(global_parameters.odr_out, Z_APDU_searchRequest);
251     int ndb;
252     char **databaselist;
253     Z_Query *zquery;
254     int ssub = 0, lslb = 100000, mspn = 10;
255     char *recsyn = 0;
256     char *piggyback = 0;
257     char *queryenc = 0;
258     yaz_iconv_t iconv = 0;
259
260     yaz_log(YLOG_DEBUG, "Sending search to %s", cl->database->database->url);
261
262     // constructing RPN query
263     a->u.searchRequest->query = zquery = odr_malloc(global_parameters.odr_out,
264             sizeof(Z_Query));
265     zquery->which = Z_Query_type_1;
266     zquery->u.type_1 = p_query_rpn(global_parameters.odr_out, cl->pquery);
267
268     // converting to target encoding
269     if ((queryenc = session_setting_oneval(sdb, PZ_QUERYENCODING))){
270         iconv = yaz_iconv_open(queryenc, "UTF-8");
271         if (iconv){
272             yaz_query_charset_convert_rpnquery(zquery->u.type_1, 
273                                                global_parameters.odr_out, 
274                                                iconv);
275             yaz_iconv_close(iconv);
276         } else
277             yaz_log(YLOG_WARN, "Query encoding failed %s %s", 
278                     cl->database->database->url, queryenc);
279     }
280
281     for (ndb = 0; sdb->database->databases[ndb]; ndb++)
282         ;
283     databaselist = odr_malloc(global_parameters.odr_out, sizeof(char*) * ndb);
284     for (ndb = 0; sdb->database->databases[ndb]; ndb++)
285         databaselist[ndb] = sdb->database->databases[ndb];
286
287     if (!(piggyback = session_setting_oneval(sdb, PZ_PIGGYBACK)) || *piggyback == '1')
288     {
289         if ((recsyn = session_setting_oneval(sdb, PZ_REQUESTSYNTAX)))
290         {
291 #if YAZ_VERSIONL >= 0x020163
292             a->u.searchRequest->preferredRecordSyntax =
293                 yaz_string_to_oid_odr(yaz_oid_std(),
294                                       CLASS_RECSYN, recsyn,
295                                       global_parameters.odr_out);
296 #else
297             a->u.searchRequest->preferredRecordSyntax =
298                 yaz_str_to_z3950oid(global_parameters.odr_out,
299                                     CLASS_RECSYN, recsyn);
300 #endif
301         }
302         a->u.searchRequest->smallSetUpperBound = &ssub;
303         a->u.searchRequest->largeSetLowerBound = &lslb;
304         a->u.searchRequest->mediumSetPresentNumber = &mspn;
305     }
306     a->u.searchRequest->resultSetName = "Default";
307     a->u.searchRequest->databaseNames = databaselist;
308     a->u.searchRequest->num_databaseNames = ndb;
309
310     
311     {  //scope for sending and logging queries 
312         WRBUF wbquery = wrbuf_alloc();
313         yaz_query_to_wrbuf(wbquery, zquery);
314
315
316         if (send_apdu(cl, a) >= 0)
317             {
318                 iochan_setflags(i, EVENT_INPUT);
319                 cl->state = Client_Searching;
320                 cl->requestid = se->requestid;
321                 yaz_log(YLOG_LOG, "SearchRequest %s %s %s", 
322                          cl->database->database->url,
323                         queryenc ? queryenc : "UTF-8",
324                         wrbuf_cstr(wbquery));
325             }
326         else {
327             cl->state = Client_Error;
328                 yaz_log(YLOG_WARN, "Failed SearchRequest %s  %s %s", 
329                          cl->database->database->url, 
330                         queryenc ? queryenc : "UTF-8",
331                         wrbuf_cstr(wbquery));
332         }
333         
334         wrbuf_destroy(wbquery);
335     }    
336
337     odr_reset(global_parameters.odr_out);
338 }
339
340 static void send_present(IOCHAN i)
341 {
342     struct connection *co = iochan_getdata(i);
343     struct client *cl = co->client; 
344     struct session_database *sdb = cl->database;
345     Z_APDU *a = zget_APDU(global_parameters.odr_out, Z_APDU_presentRequest);
346     int toget;
347     int start = cl->records + 1;
348     char *recsyn;
349
350     toget = global_parameters.chunk;
351     if (toget > global_parameters.toget - cl->records)
352         toget = global_parameters.toget - cl->records;
353     if (toget > cl->hits - cl->records)
354         toget = cl->hits - cl->records;
355
356     yaz_log(YLOG_DEBUG, "Trying to present %d records\n", toget);
357
358     a->u.presentRequest->resultSetStartPoint = &start;
359     a->u.presentRequest->numberOfRecordsRequested = &toget;
360
361     a->u.presentRequest->resultSetId = "Default";
362
363     if ((recsyn = session_setting_oneval(sdb, PZ_REQUESTSYNTAX)))
364     {
365 #if YAZ_VERSIONL >= 0x020163
366         a->u.presentRequest->preferredRecordSyntax =
367             yaz_string_to_oid_odr(yaz_oid_std(),
368                                   CLASS_RECSYN, recsyn,
369                                   global_parameters.odr_out);
370 #else
371         a->u.presentRequest->preferredRecordSyntax =
372             yaz_str_to_z3950oid(global_parameters.odr_out,
373                                 CLASS_RECSYN, recsyn);
374 #endif
375     }
376
377     if (send_apdu(cl, a) >= 0)
378     {
379         iochan_setflags(i, EVENT_INPUT);
380         cl->state = Client_Presenting;
381     }
382     else
383         cl->state = Client_Error;
384     odr_reset(global_parameters.odr_out);
385 }
386
387 static void do_initResponse(IOCHAN i, Z_APDU *a)
388 {
389     struct connection *co = iochan_getdata(i);
390     struct client *cl = co->client;
391     Z_InitResponse *r = a->u.initResponse;
392
393     yaz_log(YLOG_DEBUG, "Init response %s", cl->database->database->url);
394
395     if (*r->result)
396     {
397         cl->state = Client_Idle;
398     }
399     else
400         cl->state = Client_Failed; // FIXME need to do something to the connection
401 }
402
403 static void do_searchResponse(IOCHAN i, Z_APDU *a)
404 {
405     struct connection *co = iochan_getdata(i);
406     struct client *cl = co->client;
407     struct session *se = cl->session;
408     Z_SearchResponse *r = a->u.searchResponse;
409
410     yaz_log(YLOG_DEBUG, "Search response %s (status=%d)", 
411             cl->database->database->url, *r->searchStatus);
412
413     if (*r->searchStatus)
414     {
415         cl->hits = *r->resultCount;
416         se->total_hits += cl->hits;
417         if (r->presentStatus && !*r->presentStatus && r->records)
418         {
419             yaz_log(YLOG_DEBUG, "Records in search response %s", 
420                     cl->database->database->url);
421             ingest_records(cl, r->records);
422         }
423         cl->state = Client_Idle;
424     }
425     else
426     {          /*"FAILED"*/
427         cl->hits = 0;
428         cl->state = Client_Error;
429         if (r->records) {
430             Z_Records *recs = r->records;
431             if (recs->which == Z_Records_NSD)
432             {
433                 yaz_log(YLOG_WARN, 
434                         "Search response: Non-surrogate diagnostic %s",
435                         cl->database->database->url);
436                 cl->diagnostic = *recs->u.nonSurrogateDiagnostic->condition;
437                 cl->state = Client_Error;
438             }
439         }
440     }
441 }
442
443 static void do_closeResponse(IOCHAN i, Z_APDU *a)
444 {
445     struct connection *co = iochan_getdata(i);
446     struct client *cl = co->client;
447     /* Z_Close *r = a->u.close; */
448
449     yaz_log(YLOG_WARN, "Close response %s", cl->database->database->url);
450
451     cl->state = Client_Failed;
452     connection_destroy(co);
453 }
454
455
/* Normalize a key string in place and return it.
 *
 * The result consists of the alphanumeric runs of the input, lower-cased
 * and separated by single spaces, with no leading or trailing space.
 *
 * buf:         NUL-terminated string, rewritten in place (the result is
 *              never longer than the input)
 * skiparticle: if non-zero and the first word is one of the articles
 *              listed below, that word is left out of the result
 *
 * If the normalization produces no output at all, buf is left untouched.
 */
char *normalize_mergekey(char *buf, int skiparticle)
{
    char *p = buf, *pout = buf;

    if (skiparticle)
    {
        char firstword[64];
        static const char articles[] = "the den der die des an a "; // must end in space
        const char *hit;
        int is_article = 0;

        // <ctype.h> functions need unsigned char values: plain char may be
        // negative for non-ASCII bytes, which is undefined behavior
        while (*p && !isalnum((unsigned char) *p))
            p++;
        // Copy the first word, lower-cased and followed by a space
        pout = firstword;
        while (*p && *p != ' ' && pout - firstword < 62)
            *(pout++) = (char) tolower((unsigned char) *(p++));
        *(pout++) = ' ';
        *(pout++) = '\0';

        // The word only counts as an article if it matches a whole entry,
        // i.e. the match starts the list or follows a space.  A plain
        // substring search would also accept "he ", "en ", "n " etc.
        for (hit = strstr(articles, firstword); hit;
             hit = strstr(hit + 1, firstword))
            if (hit == articles || hit[-1] == ' ')
            {
                is_article = 1;
                break;
            }
        if (!is_article)
            p = buf;            // not an article: start over from the top
        pout = buf;
    }

    while (*p)
    {
        while (*p && !isalnum((unsigned char) *p))
            p++;
        while (isalnum((unsigned char) *p))
            *(pout++) = (char) tolower((unsigned char) *(p++));
        if (*p)
            *(pout++) = ' ';    // one separator per run of non-alphanumerics
        while (*p && !isalnum((unsigned char) *p))
            p++;
    }
    // Terminate the result and strip a trailing separator, if any
    if (buf != pout)
        do {
            *(pout--) = '\0';
        }
        while (pout > buf && *pout == ' ');

    return buf;
}
496
497 static void add_facet(struct session *s, const char *type, const char *value)
498 {
499     int i;
500
501     if (!*value)
502         return;
503     for (i = 0; i < s->num_termlists; i++)
504         if (!strcmp(s->termlists[i].name, type))
505             break;
506     if (i == s->num_termlists)
507     {
508         if (i == SESSION_MAX_TERMLISTS)
509         {
510             yaz_log(YLOG_FATAL, "Too many termlists");
511             exit(1);
512         }
513         s->termlists[i].name = nmem_strdup(s->nmem, type);
514         s->termlists[i].termlist = termlist_create(s->nmem, s->expected_maxrecs, 15);
515         s->num_termlists = i + 1;
516     }
517     termlist_insert(s->termlists[i].termlist, value);
518 }
519
520 static xmlDoc *normalize_record(struct client *cl, Z_External *rec)
521 {
522     struct database_retrievalmap *m;
523     struct database *db = cl->database->database;
524     xmlNode *res;
525     xmlDoc *rdoc;
526
527     // First normalize to XML
528     if (db->yaz_marc)
529     {
530         char *buf;
531         int len;
532         if (rec->which != Z_External_octet)
533         {
534             yaz_log(YLOG_WARN, "Unexpected external branch, probably BER %s",
535                     cl->database->database->url);
536             return 0;
537         }
538         buf = (char*) rec->u.octet_aligned->buf;
539         len = rec->u.octet_aligned->len;
540         if (yaz_marc_read_iso2709(db->yaz_marc, buf, len) < 0)
541         {
542             yaz_log(YLOG_WARN, "Failed to decode MARC %s",
543                     cl->database->database->url);
544             return 0;
545         }
546
547         yaz_marc_write_using_libxml2(db->yaz_marc, 1);
548         if (yaz_marc_write_xml(db->yaz_marc, &res,
549                     "http://www.loc.gov/MARC21/slim", 0, 0) < 0)
550         {
551             yaz_log(YLOG_WARN, "Failed to encode as XML %s",
552                     cl->database->database->url);
553             return 0;
554         }
555         rdoc = xmlNewDoc((xmlChar *) "1.0");
556         xmlDocSetRootElement(rdoc, res);
557
558     }
559     else
560     {
561         yaz_log(YLOG_FATAL, 
562                 "Unknown native_syntax in normalize_record from %s",
563                 cl->database->database->url);
564         exit(1);
565     }
566
567     if (global_parameters.dump_records){
568         fprintf(stderr, 
569                 "Input Record (normalized) from %s\n----------------\n",
570                 cl->database->database->url);
571 #if LIBXML_VERSION >= 20600
572         xmlDocFormatDump(stderr, rdoc, 1);
573 #else
574         xmlDocDump(stderr, rdoc);
575 #endif
576     }
577
578     for (m = db->map; m; m = m->next){
579         xmlDoc *new = 0;
580
581 #if 1
582         {
583             xmlNodePtr root = 0;
584             new = xsltApplyStylesheet(m->stylesheet, rdoc, 0);
585             root= xmlDocGetRootElement(new);
586         if (!new || !root || !(root->children))
587         {
588             yaz_log(YLOG_WARN, "XSLT transformation failed from %s",
589                     cl->database->database->url);
590             xmlFreeDoc(new);
591             xmlFreeDoc(rdoc);
592             return 0;
593         }
594         }
595 #endif
596
597 #if 0
598         // do it another way to detect transformation errors right now
599         // but does not seem to work either!
600         {
601             xsltTransformContextPtr ctxt;
602             ctxt = xsltNewTransformContext(m->stylesheet, rdoc);
603             new = xsltApplyStylesheetUser(m->stylesheet, rdoc, 0, 0, 0, ctxt);
604             if ((ctxt->state == XSLT_STATE_ERROR) ||
605                 (ctxt->state == XSLT_STATE_STOPPED)){
606                 yaz_log(YLOG_WARN, "XSLT transformation failed from %s",
607                         cl->database->database->url);
608                 xmlFreeDoc(new);
609                 xmlFreeDoc(rdoc);
610                 return 0;
611             }
612         }
613 #endif      
614    
615         xmlFreeDoc(rdoc);
616         rdoc = new;
617     }
618     if (global_parameters.dump_records)
619     {
620         fprintf(stderr, "Record from %s\n----------------\n", 
621                 cl->database->database->url);
622 #if LIBXML_VERSION >= 20600
623         xmlDocFormatDump(stderr, rdoc, 1);
624 #else
625         xmlDocDump(stderr, rdoc);
626 #endif
627     }
628     return rdoc;
629 }
630
// Extract what appears to be years from buf, storing highest and
// lowest values.
//
// A "year" is a run of exactly four decimal digits; longer or shorter
// digit runs are ignored.  *first receives the smallest year found and
// *last the largest; both are set to -1 when buf contains no year.
// Returns *first, i.e. -1 when nothing was found.
static int extract_years(const char *buf, int *first, int *last)
{
    *first = -1;
    *last = -1;
    while (*buf)
    {
        const char *e;
        int len;

        // isdigit() requires an unsigned char value; plain char may be
        // negative for non-ASCII bytes, which is undefined behavior
        while (*buf && !isdigit((unsigned char) *buf))
            buf++;
        len = 0;
        for (e = buf; isdigit((unsigned char) *e); e++)
            len++;
        if (len == 4)
        {
            // the digit run is followed by a non-digit, so atoi() reads
            // exactly these four digits
            int value = atoi(buf);
            if (*first < 0 || value < *first)
                *first = value;
            if (*last < 0 || value > *last)
                *last = value;
        }
        buf = e;
    }
    return *first;
}
659
660 static struct record *ingest_record(struct client *cl, Z_External *rec)
661 {
662     xmlDoc *xdoc = normalize_record(cl, rec);
663     xmlNode *root, *n;
664     struct record *res;
665     struct record_cluster *cluster;
666     struct session *se = cl->session;
667     xmlChar *mergekey, *mergekey_norm;
668     xmlChar *type = 0;
669     xmlChar *value = 0;
670     struct conf_service *service = global_parameters.server->service;
671
672     if (!xdoc)
673         return 0;
674
675     root = xmlDocGetRootElement(xdoc);
676     if (!(mergekey = xmlGetProp(root, (xmlChar *) "mergekey")))
677     {
678         yaz_log(YLOG_WARN, "No mergekey found in record");
679         xmlFreeDoc(xdoc);
680         return 0;
681     }
682
683     res = nmem_malloc(se->nmem, sizeof(struct record));
684     res->next = 0;
685     res->client = cl;
686     res->metadata = nmem_malloc(se->nmem,
687             sizeof(struct record_metadata*) * service->num_metadata);
688     memset(res->metadata, 0, sizeof(struct record_metadata*) * service->num_metadata);
689
690     mergekey_norm = (xmlChar *) nmem_strdup(se->nmem, (char*) mergekey);
691     xmlFree(mergekey);
692     normalize_mergekey((char *) mergekey_norm, 0);
693
694     cluster = reclist_insert(se->reclist, 
695                              global_parameters.server->service, 
696                              res, (char *) mergekey_norm, 
697                              &se->total_merged);
698     if (global_parameters.dump_records)
699         yaz_log(YLOG_LOG, "Cluster id %d from %s (#%d)", cluster->recid,
700                 cl->database->database->url, cl->records);
701     if (!cluster)
702     {
703         /* no room for record */
704         xmlFreeDoc(xdoc);
705         return 0;
706     }
707     relevance_newrec(se->relevance, cluster);
708
709     for (n = root->children; n; n = n->next)
710     {
711         if (type)
712             xmlFree(type);
713         if (value)
714             xmlFree(value);
715         type = value = 0;
716
717         if (n->type != XML_ELEMENT_NODE)
718             continue;
719         if (!strcmp((const char *) n->name, "metadata"))
720         {
721             struct conf_metadata *md = 0;
722             struct conf_sortkey *sk = 0;
723             struct record_metadata **wheretoput, *newm;
724             int imeta;
725             int first, last;
726
727             type = xmlGetProp(n, (xmlChar *) "type");
728             value = xmlNodeListGetString(xdoc, n->children, 0);
729
730             if (!type || !value)
731                 continue;
732
733             // First, find out what field we're looking at
734             for (imeta = 0; imeta < service->num_metadata; imeta++)
735                 if (!strcmp((const char *) type, service->metadata[imeta].name))
736                 {
737                     md = &service->metadata[imeta];
738                     if (md->sortkey_offset >= 0)
739                         sk = &service->sortkeys[md->sortkey_offset];
740                     break;
741                 }
742             if (!md)
743             {
744                 yaz_log(YLOG_WARN, "Ignoring unknown metadata element: %s", type);
745                 continue;
746             }
747
748             // Find out where we are putting it
749             if (md->merge == Metadata_merge_no)
750                 wheretoput = &res->metadata[imeta];
751             else
752                 wheretoput = &cluster->metadata[imeta];
753             
754             // Put it there
755             newm = nmem_malloc(se->nmem, sizeof(struct record_metadata));
756             newm->next = 0;
757             if (md->type == Metadata_type_generic)
758             {
759                 char *p, *pe;
760                 for (p = (char *) value; *p && isspace(*p); p++)
761                     ;
762                 for (pe = p + strlen(p) - 1;
763                         pe > p && strchr(" ,/.:([", *pe); pe--)
764                     *pe = '\0';
765                 newm->data.text = nmem_strdup(se->nmem, p);
766
767             }
768             else if (md->type == Metadata_type_year)
769             {
770                 if (extract_years((char *) value, &first, &last) < 0)
771                     continue;
772             }
773             else
774             {
775                 yaz_log(YLOG_WARN, "Unknown type in metadata element %s", type);
776                 continue;
777             }
778             if (md->type == Metadata_type_year && md->merge != Metadata_merge_range)
779             {
780                 yaz_log(YLOG_WARN, "Only range merging supported for years");
781                 continue;
782             }
783             if (md->merge == Metadata_merge_unique)
784             {
785                 struct record_metadata *mnode;
786                 for (mnode = *wheretoput; mnode; mnode = mnode->next)
787                     if (!strcmp((const char *) mnode->data.text, newm->data.text))
788                         break;
789                 if (!mnode)
790                 {
791                     newm->next = *wheretoput;
792                     *wheretoput = newm;
793                 }
794             }
795             else if (md->merge == Metadata_merge_longest)
796             {
797                 if (!*wheretoput ||
798                         strlen(newm->data.text) > strlen((*wheretoput)->data.text))
799                 {
800                     *wheretoput = newm;
801                     if (sk)
802                     {
803                         char *s = nmem_strdup(se->nmem, newm->data.text);
804                         if (!cluster->sortkeys[md->sortkey_offset])
805                             cluster->sortkeys[md->sortkey_offset] = 
806                                 nmem_malloc(se->nmem, sizeof(union data_types));
807                         normalize_mergekey(s,
808                                 (sk->type == Metadata_sortkey_skiparticle));
809                         cluster->sortkeys[md->sortkey_offset]->text = s;
810                     }
811                 }
812             }
813             else if (md->merge == Metadata_merge_all || md->merge == Metadata_merge_no)
814             {
815                 newm->next = *wheretoput;
816                 *wheretoput = newm;
817             }
818             else if (md->merge == Metadata_merge_range)
819             {
820                 assert(md->type == Metadata_type_year);
821                 if (!*wheretoput)
822                 {
823                     *wheretoput = newm;
824                     (*wheretoput)->data.number.min = first;
825                     (*wheretoput)->data.number.max = last;
826                     if (sk)
827                         cluster->sortkeys[md->sortkey_offset] = &newm->data;
828                 }
829                 else
830                 {
831                     if (first < (*wheretoput)->data.number.min)
832                         (*wheretoput)->data.number.min = first;
833                     if (last > (*wheretoput)->data.number.max)
834                         (*wheretoput)->data.number.max = last;
835                 }
836 #ifdef GAGA
837                 if (sk)
838                 {
839                     union data_types *sdata = cluster->sortkeys[md->sortkey_offset];
840                     yaz_log(YLOG_LOG, "SK range: %d-%d", sdata->number.min, sdata->number.max);
841                 }
842 #endif
843             }
844             else
845                 yaz_log(YLOG_WARN, "Don't know how to merge on element name %s", md->name);
846
847             if (md->rank)
848                 relevance_countwords(se->relevance, cluster, 
849                                      (char *) value, md->rank);
850             if (md->termlist)
851             {
852                 if (md->type == Metadata_type_year)
853                 {
854                     char year[64];
855                     sprintf(year, "%d", last);
856                     add_facet(se, (char *) type, year);
857                     if (first != last)
858                     {
859                         sprintf(year, "%d", first);
860                         add_facet(se, (char *) type, year);
861                     }
862                 }
863                 else
864                     add_facet(se, (char *) type, (char *) value);
865             }
866             xmlFree(type);
867             xmlFree(value);
868             type = value = 0;
869         }
870         else
871             yaz_log(YLOG_WARN, "Unexpected element %s in internal record", n->name);
872     }
873     if (type)
874         xmlFree(type);
875     if (value)
876         xmlFree(value);
877
878     xmlFreeDoc(xdoc);
879
880     relevance_donerecord(se->relevance, cluster);
881     se->total_records++;
882
883     return res;
884 }
885
886 // Retrieve first defined value for 'name' for given database.
887 // Will be extended to take into account user associated with session
888 char *session_setting_oneval(struct session_database *db, int offset)
889 {
890     if (!db->settings[offset])
891         return "";
892     return db->settings[offset]->value;
893 }
894
895 static void ingest_records(struct client *cl, Z_Records *r)
896 {
897 #if USE_TIMING
898     yaz_timing_t t = yaz_timing_create();
899 #endif
900     struct record *rec;
901     struct session *s = cl->session;
902     Z_NamePlusRecordList *rlist;
903     int i;
904
905     if (r->which != Z_Records_DBOSD)
906         return;
907     rlist = r->u.databaseOrSurDiagnostics;
908     for (i = 0; i < rlist->num_records; i++)
909     {
910         Z_NamePlusRecord *npr = rlist->records[i];
911
912         cl->records++;
913         if (npr->which != Z_NamePlusRecord_databaseRecord)
914         {
915             yaz_log(YLOG_WARN, 
916                     "Unexpected record type, probably diagnostic %s",
917                     cl->database->database->url);
918             continue;
919         }
920
921         rec = ingest_record(cl, npr->u.databaseRecord);
922         if (!rec)
923             continue;
924     }
925     if (s->watchlist[SESSION_WATCH_RECORDS].fun && rlist->num_records)
926         session_alert_watch(s, SESSION_WATCH_RECORDS);
927
928 #if USE_TIMING
929     yaz_timing_stop(t);
930     yaz_log(YLOG_LOG, "ingest_records %6.5f %3.2f %3.2f", 
931             yaz_timing_get_real(t), yaz_timing_get_user(t),
932             yaz_timing_get_sys(t));
933     yaz_timing_destroy(&t);
934 #endif
935 }
936
937 static void do_presentResponse(IOCHAN i, Z_APDU *a)
938 {
939     struct connection *co = iochan_getdata(i);
940     struct client *cl = co->client;
941     Z_PresentResponse *r = a->u.presentResponse;
942
943     if (r->records) {
944         Z_Records *recs = r->records;
945         if (recs->which == Z_Records_NSD)
946         {
947             yaz_log(YLOG_WARN, "Non-surrogate diagnostic %s",
948                     cl->database->database->url);
949             cl->diagnostic = *recs->u.nonSurrogateDiagnostic->condition;
950             cl->state = Client_Error;
951         }
952     }
953
954     if (!*r->presentStatus && cl->state != Client_Error)
955     {
956         yaz_log(YLOG_DEBUG, "Good Present response %s",
957                 cl->database->database->url);
958         ingest_records(cl, r->records);
959         cl->state = Client_Idle;
960     }
961     else if (*r->presentStatus) 
962     {
963         yaz_log(YLOG_WARN, "Bad Present response %s",
964                 cl->database->database->url);
965         cl->state = Client_Error;
966     }
967 }
968
969 static void handler(IOCHAN i, int event)
970 {
971     struct connection *co = iochan_getdata(i);
972     struct client *cl = co->client;
973     struct session *se = 0;
974
975     if (cl)
976         se = cl->session;
977     else
978     {
979         yaz_log(YLOG_WARN, "Destroying orphan connection");
980         connection_destroy(co);
981         return;
982     }
983
984     if (co->state == Conn_Connecting && event & EVENT_OUTPUT)
985     {
986         int errcode;
987         socklen_t errlen = sizeof(errcode);
988
989         if (getsockopt(cs_fileno(co->link), SOL_SOCKET, SO_ERROR, &errcode,
990             &errlen) < 0 || errcode != 0)
991         {
992             client_fatal(cl);
993             return;
994         }
995         else
996         {
997             yaz_log(YLOG_DEBUG, "Connect OK");
998             co->state = Conn_Open;
999             if (cl)
1000                 cl->state = Client_Connected;
1001         }
1002     }
1003
1004     else if (event & EVENT_INPUT)
1005     {
1006         int len = cs_get(co->link, &co->ibuf, &co->ibufsize);
1007
1008         if (len < 0)
1009         {
1010             yaz_log(YLOG_WARN|YLOG_ERRNO, "Error reading from %s", 
1011                     cl->database->database->url);
1012             connection_destroy(co);
1013             return;
1014         }
1015         else if (len == 0)
1016         {
1017             yaz_log(YLOG_WARN, "EOF reading from %s", cl->database->database->url);
1018             connection_destroy(co);
1019             return;
1020         }
1021         else if (len > 1) // We discard input if we have no connection
1022         {
1023             co->state = Conn_Open;
1024
1025             if (cl && (cl->requestid == se->requestid || cl->state == Client_Initializing))
1026             {
1027                 Z_APDU *a;
1028
1029                 odr_reset(global_parameters.odr_in);
1030                 odr_setbuf(global_parameters.odr_in, co->ibuf, len, 0);
1031                 if (!z_APDU(global_parameters.odr_in, &a, 0, 0))
1032                 {
1033                     client_fatal(cl);
1034                     return;
1035                 }
1036                 switch (a->which)
1037                 {
1038                     case Z_APDU_initResponse:
1039                         do_initResponse(i, a);
1040                         break;
1041                     case Z_APDU_searchResponse:
1042                         do_searchResponse(i, a);
1043                         break;
1044                     case Z_APDU_presentResponse:
1045                         do_presentResponse(i, a);
1046                         break;
1047                     case Z_APDU_close:
1048                         do_closeResponse(i, a);
1049                         break;
1050                     default:
1051                         yaz_log(YLOG_WARN, 
1052                                 "Unexpected Z39.50 response from %s",  
1053                                 cl->database->database->url);
1054                         client_fatal(cl);
1055                         return;
1056                 }
1057                 // We aren't expecting staggered output from target
1058                 // if (cs_more(t->link))
1059                 //    iochan_setevent(i, EVENT_INPUT);
1060             }
1061             else  // we throw away response and go to idle mode
1062             {
1063                 yaz_log(YLOG_DEBUG, "Ignoring result of expired operation");
1064                 cl->state = Client_Idle;
1065             }
1066         }
1067         /* if len==1 we do nothing but wait for more input */
1068     }
1069
1070     if (cl->state == Client_Connected) {
1071         send_init(i);
1072     }
1073
1074     if (cl->state == Client_Idle)
1075     {
1076         if (cl->requestid != se->requestid && *se->query) {
1077             send_search(i);
1078         }
1079         else if (cl->hits > 0 && cl->records < global_parameters.toget &&
1080             cl->records < cl->hits) {
1081             send_present(i);
1082         }
1083     }
1084 }
1085
1086 // Disassociate connection from client
1087 static void connection_release(struct connection *co)
1088 {
1089     struct client *cl = co->client;
1090
1091     yaz_log(YLOG_DEBUG, "Connection release %s", co->host->hostport);
1092     if (!cl)
1093         return;
1094     cl->connection = 0;
1095     co->client = 0;
1096 }
1097
1098 // Close connection and recycle structure
1099 static void connection_destroy(struct connection *co)
1100 {
1101     struct host *h = co->host;
1102     cs_close(co->link);
1103     iochan_destroy(co->iochan);
1104
1105     yaz_log(YLOG_DEBUG, "Connection destroy %s", co->host->hostport);
1106     if (h->connections == co)
1107         h->connections = co->next;
1108     else
1109     {
1110         struct connection *pco;
1111         for (pco = h->connections; pco && pco->next != co; pco = pco->next)
1112             ;
1113         if (pco)
1114             pco->next = co->next;
1115         else
1116             abort();
1117     }
1118     if (co->client)
1119     {
1120         if (co->client->state != Client_Idle)
1121             co->client->state = Client_Disconnected;
1122         co->client->connection = 0;
1123     }
1124     co->next = connection_freelist;
1125     connection_freelist = co;
1126 }
1127
1128 // Creates a new connection for client, associated with the host of 
1129 // client's database
1130 static struct connection *connection_create(struct client *cl)
1131 {
1132     struct connection *new;
1133     COMSTACK link; 
1134     int res;
1135     void *addr;
1136
1137
1138     if (!(link = cs_create(tcpip_type, 0, PROTO_Z3950)))
1139         {
1140             yaz_log(YLOG_FATAL|YLOG_ERRNO, "Failed to create comstack");
1141             exit(1);
1142         }
1143     
1144     if (0 == strlen(global_parameters.zproxy_override)){
1145         /* no Z39.50 proxy needed - direct connect */
1146         yaz_log(YLOG_DEBUG, "Connection create %s", cl->database->database->url);
1147         
1148         if (!(addr = cs_straddr(link, cl->database->database->host->ipport)))
1149             {
1150                 yaz_log(YLOG_WARN|YLOG_ERRNO, 
1151                         "Lookup of IP address %s failed", 
1152                         cl->database->database->host->ipport);
1153                 return 0;
1154             }
1155     
1156     } else {
1157         /* Z39.50 proxy connect */
1158         yaz_log(YLOG_DEBUG, "Connection create %s proxy %s", 
1159                 cl->database->database->url, global_parameters.zproxy_override);
1160
1161         if (!(addr = cs_straddr(link, global_parameters.zproxy_override)))
1162             {
1163                 yaz_log(YLOG_WARN|YLOG_ERRNO, 
1164                         "Lookup of IP address %s failed", 
1165                         global_parameters.zproxy_override);
1166                 return 0;
1167             }
1168     }
1169
1170     res = cs_connect(link, addr);
1171     if (res < 0)
1172     {
1173         yaz_log(YLOG_WARN|YLOG_ERRNO, "cs_connect %s", cl->database->database->url);
1174         return 0;
1175     }
1176
1177     if ((new = connection_freelist))
1178         connection_freelist = new->next;
1179     else
1180     {
1181         new = xmalloc(sizeof (struct connection));
1182         new->ibuf = 0;
1183         new->ibufsize = 0;
1184     }
1185     new->state = Conn_Connecting;
1186     new->host = cl->database->database->host;
1187     new->next = new->host->connections;
1188     new->host->connections = new;
1189     new->client = cl;
1190     cl->connection = new;
1191     new->link = link;
1192
1193     new->iochan = iochan_create(cs_fileno(link), handler, 0);
1194     iochan_setdata(new->iochan, new);
1195     new->iochan->next = channel_list;
1196     channel_list = new->iochan;
1197     return new;
1198 }
1199
1200 // Close connection and set state to error
1201 static void client_fatal(struct client *cl)
1202 {
1203     yaz_log(YLOG_WARN, "Fatal error from %s", cl->database->database->url);
1204     connection_destroy(cl->connection);
1205     cl->state = Client_Error;
1206 }
1207
1208 // Ensure that client has a connection associated
1209 static int client_prep_connection(struct client *cl)
1210 {
1211     struct connection *co;
1212     struct session *se = cl->session;
1213     struct host *host = cl->database->database->host;
1214
1215     co = cl->connection;
1216
1217     yaz_log(YLOG_DEBUG, "Client prep %s", cl->database->database->url);
1218
1219     if (!co)
1220     {
1221         // See if someone else has an idle connection
1222         // We should look at timestamps here to select the longest-idle connection
1223         for (co = host->connections; co; co = co->next)
1224             if (co->state == Conn_Open && (!co->client || co->client->session != se))
1225                 break;
1226         if (co)
1227         {
1228             connection_release(co);
1229             cl->connection = co;
1230             co->client = cl;
1231         }
1232         else
1233             co = connection_create(cl);
1234     }
1235     if (co)
1236     {
1237         if (co->state == Conn_Connecting)
1238         {
1239             cl->state = Client_Connecting;
1240             iochan_setflag(co->iochan, EVENT_OUTPUT);
1241         }
1242         else if (co->state == Conn_Open)
1243         {
1244             if (cl->state == Client_Error || cl->state == Client_Disconnected)
1245                 cl->state = Client_Idle;
1246             iochan_setflag(co->iochan, EVENT_OUTPUT);
1247         }
1248         return 1;
1249     }
1250     else
1251         return 0;
1252 }
1253
1254 // Parse the query given the settings specific to this client
1255 static int client_parse_query(struct client *cl)
1256 {
1257     struct session *se = cl->session;
1258     struct ccl_rpn_node *cn;
1259     int cerror, cpos;
1260
1261     cn = ccl_find_str(cl->database->database->ccl_map, se->query, &cerror, &cpos);
1262     if (!cn)
1263     {
1264         cl->state = Client_Error;
1265         yaz_log(YLOG_WARN, "Failed to parse query for %s",
1266                          cl->database->database->url);
1267         return -1;
1268     }
1269     wrbuf_rewind(se->wrbuf);
1270     ccl_pquery(se->wrbuf, cn);
1271     wrbuf_putc(se->wrbuf, '\0');
1272     if (cl->pquery)
1273         xfree(cl->pquery);
1274     cl->pquery = xstrdup(wrbuf_buf(se->wrbuf));
1275
1276     if (!se->relevance)
1277     {
1278         // Initialize relevance structure with query terms
1279         char *p[512];
1280         extract_terms(se->nmem, cn, p);
1281         se->relevance = relevance_create(se->nmem, (const char **) p,
1282                 se->expected_maxrecs);
1283     }
1284
1285     ccl_rpn_delete(cn);
1286     return 0;
1287 }
1288
1289 static struct client *client_create(void)
1290 {
1291     struct client *r;
1292     if (client_freelist)
1293     {
1294         r = client_freelist;
1295         client_freelist = client_freelist->next;
1296     }
1297     else
1298         r = xmalloc(sizeof(struct client));
1299     r->pquery = 0;
1300     r->database = 0;
1301     r->connection = 0;
1302     r->session = 0;
1303     r->hits = 0;
1304     r->records = 0;
1305     r->setno = 0;
1306     r->requestid = -1;
1307     r->diagnostic = 0;
1308     r->state = Client_Disconnected;
1309     r->next = 0;
1310     return r;
1311 }
1312
1313 void client_destroy(struct client *c)
1314 {
1315     struct session *se = c->session;
1316     if (c == se->clients)
1317         se->clients = c->next;
1318     else
1319     {
1320         struct client *cc;
1321         for (cc = se->clients; cc && cc->next != c; cc = cc->next)
1322             ;
1323         if (cc)
1324             cc->next = c->next;
1325     }
1326     if (c->connection)
1327         connection_release(c->connection);
1328     c->next = client_freelist;
1329     client_freelist = c;
1330 }
1331
1332 void session_set_watch(struct session *s, int what, session_watchfun fun, void *data)
1333 {
1334     s->watchlist[what].fun = fun;
1335     s->watchlist[what].data = data;
1336 }
1337
1338 void session_alert_watch(struct session *s, int what)
1339 {
1340     if (!s->watchlist[what].fun)
1341         return;
1342     (*s->watchlist[what].fun)(s->watchlist[what].data);
1343     s->watchlist[what].fun = 0;
1344     s->watchlist[what].data = 0;
1345 }
1346
1347 //callback for grep_databases
1348 static void select_targets_callback(void *context, struct session_database *db)
1349 {
1350     struct session *se = (struct session*) context;
1351     struct client *cl = client_create();
1352     cl->database = db;
1353     cl->session = se;
1354     cl->next = se->clients;
1355     se->clients = cl;
1356 }
1357
1358 // Associates a set of clients with a session;
1359 // Note: Session-databases represent databases with per-session setting overrides
1360 int select_targets(struct session *se, struct database_criterion *crit)
1361 {
1362     while (se->clients)
1363         client_destroy(se->clients);
1364
1365     return session_grep_databases(se, crit, select_targets_callback);
1366 }
1367
1368 int session_active_clients(struct session *s)
1369 {
1370     struct client *c;
1371     int res = 0;
1372
1373     for (c = s->clients; c; c = c->next)
1374         if (c->connection && (c->state == Client_Connecting ||
1375                     c->state == Client_Initializing ||
1376                     c->state == Client_Searching ||
1377                     c->state == Client_Presenting))
1378             res++;
1379
1380     return res;
1381 }
1382
1383 // parses crit1=val1,crit2=val2|val3,...
1384 static struct database_criterion *parse_filter(NMEM m, const char *buf)
1385 {
1386     struct database_criterion *res = 0;
1387     char **values;
1388     int num;
1389     int i;
1390
1391     if (!buf || !*buf)
1392         return 0;
1393     nmem_strsplit(m, ",", buf,  &values, &num);
1394     for (i = 0; i < num; i++)
1395     {
1396         char **subvalues;
1397         int subnum;
1398         int subi;
1399         struct database_criterion *new = nmem_malloc(m, sizeof(*new));
1400         char *eq = strchr(values[i], '=');
1401         if (!eq)
1402         {
1403             yaz_log(YLOG_WARN, "Missing equal-sign in filter");
1404             return 0;
1405         }
1406         *(eq++) = '\0';
1407         new->name = values[i];
1408         nmem_strsplit(m, "|", eq, &subvalues, &subnum);
1409         new->values = 0;
1410         for (subi = 0; subi < subnum; subi++)
1411         {
1412             struct database_criterion_value *newv = nmem_malloc(m, sizeof(*newv));
1413             newv->value = subvalues[subi];
1414             newv->next = new->values;
1415             new->values = newv;
1416         }
1417         new->next = res;
1418         res = new;
1419     }
1420     return res;
1421 }
1422
1423 char *search(struct session *se, char *query, char *filter)
1424 {
1425     int live_channels = 0;
1426     struct client *cl;
1427     struct database_criterion *criteria;
1428
1429     yaz_log(YLOG_DEBUG, "Search");
1430
1431     nmem_reset(se->nmem);
1432     criteria = parse_filter(se->nmem, filter);
1433     strcpy(se->query, query);
1434     se->requestid++;
1435     live_channels = select_targets(se, criteria);
1436     if (live_channels)
1437     {
1438         int maxrecs = live_channels * global_parameters.toget;
1439         se->num_termlists = 0;
1440         se->reclist = reclist_create(se->nmem, maxrecs);
1441         // This will be initialized in send_search()
1442         se->total_records = se->total_hits = se->total_merged = 0;
1443         se->expected_maxrecs = maxrecs;
1444     }
1445     else
1446         return "NOTARGETS";
1447
1448     se->relevance = 0;
1449     for (cl = se->clients; cl; cl = cl->next)
1450         if (client_parse_query(cl) < 0)  // Query must parse for all targets
1451             return "QUERY";
1452     for (cl = se->clients; cl; cl = cl->next)
1453         client_prep_connection(cl);
1454
1455     return 0;
1456 }
1457
1458 // Apply a session override to a database
1459 void session_apply_setting(struct session *se, char *dbname, char *setting, char *value)
1460 {
1461     struct session_database *sdb;
1462
1463     for (sdb = se->databases; sdb; sdb = sdb->next)
1464         if (!strcmp(dbname, sdb->database->url))
1465         {
1466             struct setting *new = nmem_malloc(se->session_nmem, sizeof(*new));
1467             int offset = settings_offset(setting);
1468
1469             if (offset < 0)
1470             {
1471                 yaz_log(YLOG_WARN, "Unknown setting %s", setting);
1472                 return;
1473             }
1474             new->precedence = 0;
1475             new->target = dbname;
1476             new->name = setting;
1477             new->value = value;
1478             new->next = sdb->settings[offset];
1479             sdb->settings[offset] = new;
1480             break;
1481         }
1482     if (!sdb)
1483         yaz_log(YLOG_WARN, "Unknown database in setting override: %s", dbname);
1484 }
1485
1486 void session_init_databases_fun(void *context, struct database *db)
1487 {
1488     struct session *se = (struct session *) context;
1489     struct session_database *new = nmem_malloc(se->session_nmem, sizeof(*new));
1490     int num = settings_num();
1491     int i;
1492
1493     new->database = db;
1494     new->settings = nmem_malloc(se->session_nmem, sizeof(struct settings *) * num);
1495     for (i = 0; i < num; i++)
1496         new->settings[i] = db->settings[i];
1497     new->next = se->databases;
1498     se->databases = new;
1499 }
1500
1501 // Initialize session_database list -- this represents this session's view
1502 // of the database list -- subject to modification by the settings ws command
1503 void session_init_databases(struct session *se)
1504 {
1505     se->databases = 0;
1506     grep_databases(se, 0, session_init_databases_fun);
1507 }
1508
1509 void destroy_session(struct session *s)
1510 {
1511     yaz_log(YLOG_LOG, "Destroying session");
1512     while (s->clients)
1513         client_destroy(s->clients);
1514     nmem_destroy(s->nmem);
1515     wrbuf_destroy(s->wrbuf);
1516 }
1517
1518 struct session *new_session(NMEM nmem) 
1519 {
1520     int i;
1521     struct session *session = nmem_malloc(nmem, sizeof(*session));
1522
1523     yaz_log(YLOG_DEBUG, "New Pazpar2 session");
1524     
1525     session->total_hits = 0;
1526     session->total_records = 0;
1527     session->num_termlists = 0;
1528     session->reclist = 0;
1529     session->requestid = -1;
1530     session->clients = 0;
1531     session->expected_maxrecs = 0;
1532     session->query[0] = '\0';
1533     session->session_nmem = nmem;
1534     session->nmem = nmem_create();
1535     session->wrbuf = wrbuf_alloc();
1536     session_init_databases(session);
1537     for (i = 0; i <= SESSION_WATCH_MAX; i++)
1538     {
1539         session->watchlist[i].data = 0;
1540         session->watchlist[i].fun = 0;
1541     }
1542
1543     return session;
1544 }
1545
1546 struct hitsbytarget *hitsbytarget(struct session *se, int *count)
1547 {
1548     static struct hitsbytarget res[1000]; // FIXME MM
1549     struct client *cl;
1550
1551     *count = 0;
1552     for (cl = se->clients; cl; cl = cl->next)
1553     {
1554         char *name = session_setting_oneval(cl->database, PZ_NAME);
1555
1556         res[*count].id = cl->database->database->url;
1557         res[*count].name = *name ? name : "Unknown";
1558         res[*count].hits = cl->hits;
1559         res[*count].records = cl->records;
1560         res[*count].diagnostic = cl->diagnostic;
1561         res[*count].state = client_states[cl->state];
1562         res[*count].connected  = cl->connection ? 1 : 0;
1563         (*count)++;
1564     }
1565
1566     return res;
1567 }
1568
1569 struct termlist_score **termlist(struct session *s, const char *name, int *num)
1570 {
1571     int i;
1572
1573     for (i = 0; i < s->num_termlists; i++)
1574         if (!strcmp((const char *) s->termlists[i].name, name))
1575             return termlist_highscore(s->termlists[i].termlist, num);
1576     return 0;
1577 }
1578
#ifdef MISSING_HEADERS
// Log the amounts of memory that NMEM reports as in use and as free.
// Only compiled when MISSING_HEADERS is defined.
void report_nmem_stats(void)
{
    size_t used = 0;
    size_t unused = 0;

    nmem_get_memory_in_use(&used);
    nmem_get_memory_free(&unused);

    yaz_log(YLOG_LOG, "nmem stat: use=%ld free=%ld", (long) used, (long) unused);
}
#endif
1591
1592 struct record_cluster *show_single(struct session *s, int id)
1593 {
1594     struct record_cluster *r;
1595
1596     reclist_rewind(s->reclist);
1597     while ((r = reclist_read_record(s->reclist)))
1598         if (r->recid == id)
1599             return r;
1600     return 0;
1601 }
1602
1603 struct record_cluster **show(struct session *s, struct reclist_sortparms *sp, int start,
1604         int *num, int *total, int *sumhits, NMEM nmem_show)
1605 {
1606     struct record_cluster **recs = nmem_malloc(nmem_show, *num 
1607                                        * sizeof(struct record_cluster *));
1608     struct reclist_sortparms *spp;
1609     int i;
1610 #if USE_TIMING    
1611     yaz_timing_t t = yaz_timing_create();
1612 #endif
1613
1614     for (spp = sp; spp; spp = spp->next)
1615         if (spp->type == Metadata_sortkey_relevance)
1616         {
1617             relevance_prepare_read(s->relevance, s->reclist);
1618             break;
1619         }
1620     reclist_sort(s->reclist, sp);
1621
1622     *total = s->reclist->num_records;
1623     *sumhits = s->total_hits;
1624
1625     for (i = 0; i < start; i++)
1626         if (!reclist_read_record(s->reclist))
1627         {
1628             *num = 0;
1629             recs = 0;
1630             break;
1631         }
1632
1633     for (i = 0; i < *num; i++)
1634     {
1635         struct record_cluster *r = reclist_read_record(s->reclist);
1636         if (!r)
1637         {
1638             *num = i;
1639             break;
1640         }
1641         recs[i] = r;
1642     }
1643 #if USE_TIMING
1644     yaz_timing_stop(t);
1645     yaz_log(YLOG_LOG, "show %6.5f %3.2f %3.2f", 
1646             yaz_timing_get_real(t), yaz_timing_get_user(t),
1647             yaz_timing_get_sys(t));
1648     yaz_timing_destroy(&t);
1649 #endif
1650     return recs;
1651 }
1652
1653 void statistics(struct session *se, struct statistics *stat)
1654 {
1655     struct client *cl;
1656     int count = 0;
1657
1658     memset(stat, 0, sizeof(*stat));
1659     for (cl = se->clients; cl; cl = cl->next)
1660     {
1661         if (!cl->connection)
1662             stat->num_no_connection++;
1663         switch (cl->state)
1664         {
1665             case Client_Connecting: stat->num_connecting++; break;
1666             case Client_Initializing: stat->num_initializing++; break;
1667             case Client_Searching: stat->num_searching++; break;
1668             case Client_Presenting: stat->num_presenting++; break;
1669             case Client_Idle: stat->num_idle++; break;
1670             case Client_Failed: stat->num_failed++; break;
1671             case Client_Error: stat->num_error++; break;
1672             default: break;
1673         }
1674         count++;
1675     }
1676     stat->num_hits = se->total_hits;
1677     stat->num_records = se->total_records;
1678
1679     stat->num_clients = count;
1680 }
1681
1682 void start_http_listener(void)
1683 {
1684     char hp[128] = "";
1685     struct conf_server *ser = global_parameters.server;
1686
1687     if (*global_parameters.listener_override)
1688         strcpy(hp, global_parameters.listener_override);
1689     else
1690     {
1691         strcpy(hp, ser->host ? ser->host : "");
1692         if (ser->port)
1693         {
1694             if (*hp)
1695                 strcat(hp, ":");
1696             sprintf(hp + strlen(hp), "%d", ser->port);
1697         }
1698     }
1699     http_init(hp);
1700 }
1701
1702 void start_proxy(void)
1703 {
1704     char hp[128] = "";
1705     struct conf_server *ser = global_parameters.server;
1706
1707     if (*global_parameters.proxy_override)
1708         strcpy(hp, global_parameters.proxy_override);
1709     else if (ser->proxy_host || ser->proxy_port)
1710     {
1711         strcpy(hp, ser->proxy_host ? ser->proxy_host : "");
1712         if (ser->proxy_port)
1713         {
1714             if (*hp)
1715                 strcat(hp, ":");
1716             sprintf(hp + strlen(hp), "%d", ser->proxy_port);
1717         }
1718     }
1719     else
1720         return;
1721
1722     http_set_proxyaddr(hp, ser->myurl ? ser->myurl : "");
1723 }
1724
1725 void start_zproxy(void)
1726 {
1727     struct conf_server *ser = global_parameters.server;
1728
1729     if (*global_parameters.zproxy_override){
1730         yaz_log(YLOG_LOG, "Z39.50 proxy  %s", 
1731                 global_parameters.zproxy_override);
1732         return;
1733     }
1734
1735     else if (ser->zproxy_host || ser->zproxy_port)
1736     {
1737         char hp[128] = "";
1738
1739         strcpy(hp, ser->zproxy_host ? ser->zproxy_host : "");
1740         if (ser->zproxy_port)
1741         {
1742             if (*hp)
1743                 strcat(hp, ":");
1744             else
1745                 strcat(hp, "@:");
1746
1747             sprintf(hp + strlen(hp), "%d", ser->zproxy_port);
1748         }
1749         strcpy(global_parameters.zproxy_override, hp);
1750         yaz_log(YLOG_LOG, "Z39.50 proxy  %s", 
1751                 global_parameters.zproxy_override);
1752
1753     }
1754     else
1755         return;
1756 }
1757
1758
1759
1760 /*
1761  * Local variables:
1762  * c-basic-offset: 4
1763  * indent-tabs-mode: nil
1764  * End:
1765  * vim: shiftwidth=4 tabstop=8 expandtab
1766  */