removed some of the most obvious exit() statements, which are now with the dynamic...
[pazpar2-moved-to-github.git] / src / logic.c
1 /* $Id: logic.c,v 1.19 2007-04-23 08:15:22 marc Exp $
2    Copyright (c) 2006-2007, Index Data.
3
4 This file is part of Pazpar2.
5
6 Pazpar2 is free software; you can redistribute it and/or modify it under
7 the terms of the GNU General Public License as published by the Free
8 Software Foundation; either version 2, or (at your option) any later
9 version.
10
11 Pazpar2 is distributed in the hope that it will be useful, but WITHOUT ANY
12 WARRANTY; without even the implied warranty of MERCHANTABILITY or
13 FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
14 for more details.
15
16 You should have received a copy of the GNU General Public License
17 along with Pazpar2; see the file LICENSE.  If not, write to the
18 Free Software Foundation, 59 Temple Place - Suite 330, Boston, MA
19 02111-1307, USA.
20  */
21
22 // This file contains the primary business logic. Several parts of it should
23 // Eventually be factored into separate modules.
24
25 #include <stdlib.h>
26 #include <stdio.h>
27 #include <string.h>
28 #include <sys/time.h>
29 #include <unistd.h>
30 #include <sys/socket.h>
31 #include <netdb.h>
32 #include <signal.h>
33 #include <ctype.h>
34 #include <assert.h>
35
36 #include <yaz/marcdisp.h>
37 #include <yaz/comstack.h>
38 #include <yaz/tcpip.h>
39 #include <yaz/proto.h>
40 #include <yaz/readconf.h>
41 #include <yaz/pquery.h>
42 #include <yaz/otherinfo.h>
43 #include <yaz/yaz-util.h>
44 #include <yaz/nmem.h>
45 #include <yaz/query-charset.h>
46 #include <yaz/querytowrbuf.h>
47 #if YAZ_VERSIONL >= 0x020163
48 #include <yaz/oid_db.h>
49 #endif
50
51 #if HAVE_CONFIG_H
52 #include "cconfig.h"
53 #endif
54
55 #define USE_TIMING 0
56 #if USE_TIMING
57 #include <yaz/timing.h>
58 #endif
59
60 #include <netinet/in.h>
61
62 #include "pazpar2.h"
63 #include "eventl.h"
64 #include "http.h"
65 #include "termlists.h"
66 #include "reclists.h"
67 #include "relevance.h"
68 #include "config.h"
69 #include "database.h"
70 #include "settings.h"
71
72 #define MAX_CHUNK 15
73
74 static void client_fatal(struct client *cl);
75 static void connection_destroy(struct connection *co);
76 static int client_prep_connection(struct client *cl);
77 static void ingest_records(struct client *cl, Z_Records *r);
78 void session_alert_watch(struct session *s, int what);
79
80 static struct connection *connection_freelist = 0;
81 static struct client *client_freelist = 0;
82
83 static char *client_states[] = {
84     "Client_Connecting",
85     "Client_Connected",
86     "Client_Idle",
87     "Client_Initializing",
88     "Client_Searching",
89     "Client_Presenting",
90     "Client_Error",
91     "Client_Failed",
92     "Client_Disconnected",
93     "Client_Stopped"
94 };
95
96 // Note: Some things in this structure will eventually move to configuration
97 struct parameters global_parameters = 
98 {
99     "",
100     "",
101     "",
102     "",
103     0,
104     0,
105     30,
106     "81",
107     "Index Data PazPar2",
108     VERSION,
109     600, // 10 minutes
110     60,
111     100,
112     MAX_CHUNK,
113     0,
114     0
115 };
116
117 static int send_apdu(struct client *c, Z_APDU *a)
118 {
119     struct connection *co = c->connection;
120     char *buf;
121     int len, r;
122
123     if (!z_APDU(global_parameters.odr_out, &a, 0, 0))
124     {
125         odr_perror(global_parameters.odr_out, "Encoding APDU");
126         abort();
127     }
128     buf = odr_getbuf(global_parameters.odr_out, &len, 0);
129     r = cs_put(co->link, buf, len);
130     if (r < 0)
131     {
132         yaz_log(YLOG_WARN, "cs_put: %s", cs_errmsg(cs_errno(co->link)));
133         return -1;
134     }
135     else if (r == 1)
136     {
137         fprintf(stderr, "cs_put incomplete (ParaZ does not handle that)\n");
138         exit(1);
139     }
140     odr_reset(global_parameters.odr_out); /* release the APDU structure  */
141     co->state = Conn_Waiting;
142     return 0;
143 }
144
145 // Set authentication token in init if one is set for the client
146 // TODO: Extend this to handle other schemes than open (should be simple)
147 static void init_authentication(struct client *cl, Z_InitRequest *req)
148 {
149     struct session_database *sdb = cl->database;
150     char *auth = session_setting_oneval(sdb, PZ_AUTHENTICATION);
151
152     if (auth)
153     {
154         Z_IdAuthentication *idAuth = odr_malloc(global_parameters.odr_out,
155                 sizeof(*idAuth));
156         idAuth->which = Z_IdAuthentication_open;
157         idAuth->u.open = auth;
158         req->idAuthentication = idAuth;
159     }
160 }
161
162 static void send_init(IOCHAN i)
163 {
164
165     struct connection *co = iochan_getdata(i);
166     struct client *cl = co->client;
167     Z_APDU *a = zget_APDU(global_parameters.odr_out, Z_APDU_initRequest);
168
169     a->u.initRequest->implementationId = global_parameters.implementationId;
170     a->u.initRequest->implementationName = global_parameters.implementationName;
171     a->u.initRequest->implementationVersion =
172         global_parameters.implementationVersion;
173     ODR_MASK_SET(a->u.initRequest->options, Z_Options_search);
174     ODR_MASK_SET(a->u.initRequest->options, Z_Options_present);
175     ODR_MASK_SET(a->u.initRequest->options, Z_Options_namedResultSets);
176
177     ODR_MASK_SET(a->u.initRequest->protocolVersion, Z_ProtocolVersion_1);
178     ODR_MASK_SET(a->u.initRequest->protocolVersion, Z_ProtocolVersion_2);
179     ODR_MASK_SET(a->u.initRequest->protocolVersion, Z_ProtocolVersion_3);
180
181     init_authentication(cl, a->u.initRequest);
182
183     /* add virtual host if tunneling through Z39.50 proxy */
184     
185     if (0 < strlen(global_parameters.zproxy_override) 
186         && 0 < strlen(cl->database->database->url))
187     {
188 #if YAZ_VERSIONL >= 0x020163
189         yaz_oi_set_string_oid(&a->u.initRequest->otherInfo,
190                               global_parameters.odr_out,
191                               yaz_oid_userinfo_proxy,
192                               1, cl->database->database->url);
193 #else
194         yaz_oi_set_string_oidval(&a->u.initRequest->otherInfo,
195                                  global_parameters.odr_out, VAL_PROXY,
196                                  1, cl->database->database->url);
197 #endif
198     }
199
200     if (send_apdu(cl, a) >= 0)
201     {
202         iochan_setflags(i, EVENT_INPUT);
203         cl->state = Client_Initializing;
204     }
205     else
206         cl->state = Client_Error;
207     odr_reset(global_parameters.odr_out);
208 }
209
210 // Recursively traverse query structure to extract terms.
211 static void pull_terms(NMEM nmem, struct ccl_rpn_node *n, char **termlist, int *num)
212 {
213     char **words;
214     int numwords;
215     int i;
216
217     switch (n->kind)
218     {
219         case CCL_RPN_AND:
220         case CCL_RPN_OR:
221         case CCL_RPN_NOT:
222         case CCL_RPN_PROX:
223             pull_terms(nmem, n->u.p[0], termlist, num);
224             pull_terms(nmem, n->u.p[1], termlist, num);
225             break;
226         case CCL_RPN_TERM:
227             nmem_strsplit(nmem, " ", n->u.t.term, &words, &numwords);
228             for (i = 0; i < numwords; i++)
229                 termlist[(*num)++] = words[i];
230             break;
231         default: // NOOP
232             break;
233     }
234 }
235
236 // Extract terms from query into null-terminated termlist
237 static void extract_terms(NMEM nmem, struct ccl_rpn_node *query, char **termlist)
238 {
239     int num = 0;
240
241     pull_terms(nmem, query, termlist, &num);
242     termlist[num] = 0;
243 }
244
245 static void send_search(IOCHAN i)
246 {
247     struct connection *co = iochan_getdata(i);
248     struct client *cl = co->client; 
249     struct session *se = cl->session;
250     struct session_database *sdb = cl->database;
251     Z_APDU *a = zget_APDU(global_parameters.odr_out, Z_APDU_searchRequest);
252     int ndb;
253     char **databaselist;
254     Z_Query *zquery;
255     int ssub = 0, lslb = 100000, mspn = 10;
256     char *recsyn = 0;
257     char *piggyback = 0;
258     char *queryenc = 0;
259     yaz_iconv_t iconv = 0;
260
261     yaz_log(YLOG_DEBUG, "Sending search to %s", cl->database->database->url);
262
263     // constructing RPN query
264     a->u.searchRequest->query = zquery = odr_malloc(global_parameters.odr_out,
265             sizeof(Z_Query));
266     zquery->which = Z_Query_type_1;
267     zquery->u.type_1 = p_query_rpn(global_parameters.odr_out, cl->pquery);
268
269     // converting to target encoding
270     if ((queryenc = session_setting_oneval(sdb, PZ_QUERYENCODING))){
271         iconv = yaz_iconv_open(queryenc, "UTF-8");
272         if (iconv){
273             yaz_query_charset_convert_rpnquery(zquery->u.type_1, 
274                                                global_parameters.odr_out, 
275                                                iconv);
276             yaz_iconv_close(iconv);
277         } else
278             yaz_log(YLOG_WARN, "Query encoding failed %s %s", 
279                     cl->database->database->url, queryenc);
280     }
281
282     for (ndb = 0; sdb->database->databases[ndb]; ndb++)
283         ;
284     databaselist = odr_malloc(global_parameters.odr_out, sizeof(char*) * ndb);
285     for (ndb = 0; sdb->database->databases[ndb]; ndb++)
286         databaselist[ndb] = sdb->database->databases[ndb];
287
288     if (!(piggyback = session_setting_oneval(sdb, PZ_PIGGYBACK)) || *piggyback == '1')
289     {
290         if ((recsyn = session_setting_oneval(sdb, PZ_REQUESTSYNTAX)))
291         {
292 #if YAZ_VERSIONL >= 0x020163
293             a->u.searchRequest->preferredRecordSyntax =
294                 yaz_string_to_oid_odr(yaz_oid_std(),
295                                       CLASS_RECSYN, recsyn,
296                                       global_parameters.odr_out);
297 #else
298             a->u.searchRequest->preferredRecordSyntax =
299                 yaz_str_to_z3950oid(global_parameters.odr_out,
300                                     CLASS_RECSYN, recsyn);
301 #endif
302         }
303         a->u.searchRequest->smallSetUpperBound = &ssub;
304         a->u.searchRequest->largeSetLowerBound = &lslb;
305         a->u.searchRequest->mediumSetPresentNumber = &mspn;
306     }
307     a->u.searchRequest->resultSetName = "Default";
308     a->u.searchRequest->databaseNames = databaselist;
309     a->u.searchRequest->num_databaseNames = ndb;
310
311     
312     {  //scope for sending and logging queries 
313         WRBUF wbquery = wrbuf_alloc();
314         yaz_query_to_wrbuf(wbquery, zquery);
315
316
317         if (send_apdu(cl, a) >= 0)
318             {
319                 iochan_setflags(i, EVENT_INPUT);
320                 cl->state = Client_Searching;
321                 cl->requestid = se->requestid;
322                 yaz_log(YLOG_LOG, "SearchRequest %s %s %s", 
323                          cl->database->database->url,
324                         queryenc ? queryenc : "UTF-8",
325                         wrbuf_cstr(wbquery));
326             }
327         else {
328             cl->state = Client_Error;
329                 yaz_log(YLOG_WARN, "Failed SearchRequest %s  %s %s", 
330                          cl->database->database->url, 
331                         queryenc ? queryenc : "UTF-8",
332                         wrbuf_cstr(wbquery));
333         }
334         
335         wrbuf_destroy(wbquery);
336     }    
337
338     odr_reset(global_parameters.odr_out);
339 }
340
341 static void send_present(IOCHAN i)
342 {
343     struct connection *co = iochan_getdata(i);
344     struct client *cl = co->client; 
345     struct session_database *sdb = cl->database;
346     Z_APDU *a = zget_APDU(global_parameters.odr_out, Z_APDU_presentRequest);
347     int toget;
348     int start = cl->records + 1;
349     char *recsyn;
350
351     toget = global_parameters.chunk;
352     if (toget > global_parameters.toget - cl->records)
353         toget = global_parameters.toget - cl->records;
354     if (toget > cl->hits - cl->records)
355         toget = cl->hits - cl->records;
356
357     yaz_log(YLOG_DEBUG, "Trying to present %d records\n", toget);
358
359     a->u.presentRequest->resultSetStartPoint = &start;
360     a->u.presentRequest->numberOfRecordsRequested = &toget;
361
362     a->u.presentRequest->resultSetId = "Default";
363
364     if ((recsyn = session_setting_oneval(sdb, PZ_REQUESTSYNTAX)))
365     {
366 #if YAZ_VERSIONL >= 0x020163
367         a->u.presentRequest->preferredRecordSyntax =
368             yaz_string_to_oid_odr(yaz_oid_std(),
369                                   CLASS_RECSYN, recsyn,
370                                   global_parameters.odr_out);
371 #else
372         a->u.presentRequest->preferredRecordSyntax =
373             yaz_str_to_z3950oid(global_parameters.odr_out,
374                                 CLASS_RECSYN, recsyn);
375 #endif
376     }
377
378     if (send_apdu(cl, a) >= 0)
379     {
380         iochan_setflags(i, EVENT_INPUT);
381         cl->state = Client_Presenting;
382     }
383     else
384         cl->state = Client_Error;
385     odr_reset(global_parameters.odr_out);
386 }
387
388 static void do_initResponse(IOCHAN i, Z_APDU *a)
389 {
390     struct connection *co = iochan_getdata(i);
391     struct client *cl = co->client;
392     Z_InitResponse *r = a->u.initResponse;
393
394     yaz_log(YLOG_DEBUG, "Init response %s", cl->database->database->url);
395
396     if (*r->result)
397     {
398         cl->state = Client_Idle;
399     }
400     else
401         cl->state = Client_Failed; // FIXME need to do something to the connection
402 }
403
404 static void do_searchResponse(IOCHAN i, Z_APDU *a)
405 {
406     struct connection *co = iochan_getdata(i);
407     struct client *cl = co->client;
408     struct session *se = cl->session;
409     Z_SearchResponse *r = a->u.searchResponse;
410
411     yaz_log(YLOG_DEBUG, "Search response %s (status=%d)", 
412             cl->database->database->url, *r->searchStatus);
413
414     if (*r->searchStatus)
415     {
416         cl->hits = *r->resultCount;
417         se->total_hits += cl->hits;
418         if (r->presentStatus && !*r->presentStatus && r->records)
419         {
420             yaz_log(YLOG_DEBUG, "Records in search response %s", 
421                     cl->database->database->url);
422             ingest_records(cl, r->records);
423         }
424         cl->state = Client_Idle;
425     }
426     else
427     {          /*"FAILED"*/
428         cl->hits = 0;
429         cl->state = Client_Error;
430         if (r->records) {
431             Z_Records *recs = r->records;
432             if (recs->which == Z_Records_NSD)
433             {
434                 yaz_log(YLOG_WARN, 
435                         "Search response: Non-surrogate diagnostic %s",
436                         cl->database->database->url);
437                 cl->diagnostic = *recs->u.nonSurrogateDiagnostic->condition;
438                 cl->state = Client_Error;
439             }
440         }
441     }
442 }
443
444 static void do_closeResponse(IOCHAN i, Z_APDU *a)
445 {
446     struct connection *co = iochan_getdata(i);
447     struct client *cl = co->client;
448     /* Z_Close *r = a->u.close; */
449
450     yaz_log(YLOG_WARN, "Close response %s", cl->database->database->url);
451
452     cl->state = Client_Failed;
453     connection_destroy(co);
454 }
455
456
457 char *normalize_mergekey(char *buf, int skiparticle)
458 {
459     char *p = buf, *pout = buf;
460
461     if (skiparticle)
462     {
463         char firstword[64];
464         char articles[] = "the den der die des an a "; // must end in space
465
466         while (*p && !isalnum(*p))
467             p++;
468         pout = firstword;
469         while (*p && *p != ' ' && pout - firstword < 62)
470             *(pout++) = tolower(*(p++));
471         *(pout++) = ' ';
472         *(pout++) = '\0';
473         if (!strstr(articles, firstword))
474             p = buf;
475         pout = buf;
476     }
477
478     while (*p)
479     {
480         while (*p && !isalnum(*p))
481             p++;
482         while (isalnum(*p))
483             *(pout++) = tolower(*(p++));
484         if (*p)
485             *(pout++) = ' ';
486         while (*p && !isalnum(*p))
487             p++;
488     }
489     if (buf != pout)
490         do {
491             *(pout--) = '\0';
492         }
493         while (pout > buf && *pout == ' ');
494
495     return buf;
496 }
497
498 static void add_facet(struct session *s, const char *type, const char *value)
499 {
500     int i;
501
502     if (!*value)
503         return;
504     for (i = 0; i < s->num_termlists; i++)
505         if (!strcmp(s->termlists[i].name, type))
506             break;
507     if (i == s->num_termlists)
508     {
509         if (i == SESSION_MAX_TERMLISTS)
510         {
511             yaz_log(YLOG_FATAL, "Too many termlists");
512             return;
513         }
514
515         s->termlists[i].name = nmem_strdup(s->nmem, type);
516         s->termlists[i].termlist = termlist_create(s->nmem, s->expected_maxrecs, 15);
517         s->num_termlists = i + 1;
518     }
519     termlist_insert(s->termlists[i].termlist, value);
520 }
521
522 static xmlDoc *normalize_record(struct client *cl, Z_External *rec)
523 {
524     struct database_retrievalmap *m;
525     struct session_database *sdb = cl->database;
526     struct database *db = sdb->database;
527     xmlNode *res;
528     xmlDoc *rdoc;
529
530     // First normalize to XML
531     if (sdb->yaz_marc)
532     {
533         char *buf;
534         int len;
535         if (rec->which != Z_External_octet)
536         {
537             yaz_log(YLOG_WARN, "Unexpected external branch, probably BER %s",
538                     db->url);
539             return 0;
540         }
541         buf = (char*) rec->u.octet_aligned->buf;
542         len = rec->u.octet_aligned->len;
543         if (yaz_marc_read_iso2709(sdb->yaz_marc, buf, len) < 0)
544         {
545             yaz_log(YLOG_WARN, "Failed to decode MARC %s", db->url);
546             return 0;
547         }
548
549         yaz_marc_write_using_libxml2(sdb->yaz_marc, 1);
550         if (yaz_marc_write_xml(sdb->yaz_marc, &res,
551                     "http://www.loc.gov/MARC21/slim", 0, 0) < 0)
552         {
553             yaz_log(YLOG_WARN, "Failed to encode as XML %s",
554                     db->url);
555             return 0;
556         }
557         rdoc = xmlNewDoc((xmlChar *) "1.0");
558         xmlDocSetRootElement(rdoc, res);
559
560     }
561     else
562     {
563         yaz_log(YLOG_FATAL, 
564                 "Unknown native_syntax in normalize_record from %s",
565                 db->url);
566         return 0;
567     }
568
569     if (global_parameters.dump_records){
570         fprintf(stderr, 
571                 "Input Record (normalized) from %s\n----------------\n",
572                 db->url);
573 #if LIBXML_VERSION >= 20600
574         xmlDocFormatDump(stderr, rdoc, 1);
575 #else
576         xmlDocDump(stderr, rdoc);
577 #endif
578     }
579
580     for (m = sdb->map; m; m = m->next){
581         xmlDoc *new = 0;
582
583 #if 1
584         {
585             xmlNodePtr root = 0;
586             new = xsltApplyStylesheet(m->stylesheet, rdoc, 0);
587             root= xmlDocGetRootElement(new);
588         if (!new || !root || !(root->children))
589         {
590             yaz_log(YLOG_WARN, "XSLT transformation failed from %s",
591                     cl->database->database->url);
592             xmlFreeDoc(new);
593             xmlFreeDoc(rdoc);
594             return 0;
595         }
596         }
597 #endif
598
599 #if 0
600         // do it another way to detect transformation errors right now
601         // but does not seem to work either!
602         {
603             xsltTransformContextPtr ctxt;
604             ctxt = xsltNewTransformContext(m->stylesheet, rdoc);
605             new = xsltApplyStylesheetUser(m->stylesheet, rdoc, 0, 0, 0, ctxt);
606             if ((ctxt->state == XSLT_STATE_ERROR) ||
607                 (ctxt->state == XSLT_STATE_STOPPED)){
608                 yaz_log(YLOG_WARN, "XSLT transformation failed from %s",
609                         cl->database->database->url);
610                 xmlFreeDoc(new);
611                 xmlFreeDoc(rdoc);
612                 return 0;
613             }
614         }
615 #endif      
616    
617         xmlFreeDoc(rdoc);
618         rdoc = new;
619     }
620     if (global_parameters.dump_records)
621     {
622         fprintf(stderr, "Record from %s\n----------------\n", 
623                 cl->database->database->url);
624 #if LIBXML_VERSION >= 20600
625         xmlDocFormatDump(stderr, rdoc, 1);
626 #else
627         xmlDocDump(stderr, rdoc);
628 #endif
629     }
630     return rdoc;
631 }
632
633 // Extract what appears to be years from buf, storing highest and
634 // lowest values.
635 static int extract_years(const char *buf, int *first, int *last)
636 {
637     *first = -1;
638     *last = -1;
639     while (*buf)
640     {
641         const char *e;
642         int len;
643
644         while (*buf && !isdigit(*buf))
645             buf++;
646         len = 0;
647         for (e = buf; *e && isdigit(*e); e++)
648             len++;
649         if (len == 4)
650         {
651             int value = atoi(buf);
652             if (*first < 0 || value < *first)
653                 *first = value;
654             if (*last < 0 || value > *last)
655                 *last = value;
656         }
657         buf = e;
658     }
659     return *first;
660 }
661
662 static struct record *ingest_record(struct client *cl, Z_External *rec)
663 {
664     xmlDoc *xdoc = normalize_record(cl, rec);
665     xmlNode *root, *n;
666     struct record *res;
667     struct record_cluster *cluster;
668     struct session *se = cl->session;
669     xmlChar *mergekey, *mergekey_norm;
670     xmlChar *type = 0;
671     xmlChar *value = 0;
672     struct conf_service *service = global_parameters.server->service;
673
674     if (!xdoc)
675         return 0;
676
677     root = xmlDocGetRootElement(xdoc);
678     if (!(mergekey = xmlGetProp(root, (xmlChar *) "mergekey")))
679     {
680         yaz_log(YLOG_WARN, "No mergekey found in record");
681         xmlFreeDoc(xdoc);
682         return 0;
683     }
684
685     res = nmem_malloc(se->nmem, sizeof(struct record));
686     res->next = 0;
687     res->client = cl;
688     res->metadata = nmem_malloc(se->nmem,
689             sizeof(struct record_metadata*) * service->num_metadata);
690     memset(res->metadata, 0, sizeof(struct record_metadata*) * service->num_metadata);
691
692     mergekey_norm = (xmlChar *) nmem_strdup(se->nmem, (char*) mergekey);
693     xmlFree(mergekey);
694     normalize_mergekey((char *) mergekey_norm, 0);
695
696     cluster = reclist_insert(se->reclist, 
697                              global_parameters.server->service, 
698                              res, (char *) mergekey_norm, 
699                              &se->total_merged);
700     if (global_parameters.dump_records)
701         yaz_log(YLOG_LOG, "Cluster id %d from %s (#%d)", cluster->recid,
702                 cl->database->database->url, cl->records);
703     if (!cluster)
704     {
705         /* no room for record */
706         xmlFreeDoc(xdoc);
707         return 0;
708     }
709     relevance_newrec(se->relevance, cluster);
710
711     for (n = root->children; n; n = n->next)
712     {
713         if (type)
714             xmlFree(type);
715         if (value)
716             xmlFree(value);
717         type = value = 0;
718
719         if (n->type != XML_ELEMENT_NODE)
720             continue;
721         if (!strcmp((const char *) n->name, "metadata"))
722         {
723             struct conf_metadata *md = 0;
724             struct conf_sortkey *sk = 0;
725             struct record_metadata **wheretoput, *newm;
726             int imeta;
727             int first, last;
728
729             type = xmlGetProp(n, (xmlChar *) "type");
730             value = xmlNodeListGetString(xdoc, n->children, 0);
731
732             if (!type || !value)
733                 continue;
734
735             // First, find out what field we're looking at
736             for (imeta = 0; imeta < service->num_metadata; imeta++)
737                 if (!strcmp((const char *) type, service->metadata[imeta].name))
738                 {
739                     md = &service->metadata[imeta];
740                     if (md->sortkey_offset >= 0)
741                         sk = &service->sortkeys[md->sortkey_offset];
742                     break;
743                 }
744             if (!md)
745             {
746                 yaz_log(YLOG_WARN, "Ignoring unknown metadata element: %s", type);
747                 continue;
748             }
749
750             // Find out where we are putting it
751             if (md->merge == Metadata_merge_no)
752                 wheretoput = &res->metadata[imeta];
753             else
754                 wheretoput = &cluster->metadata[imeta];
755             
756             // Put it there
757             newm = nmem_malloc(se->nmem, sizeof(struct record_metadata));
758             newm->next = 0;
759             if (md->type == Metadata_type_generic)
760             {
761                 char *p, *pe;
762                 for (p = (char *) value; *p && isspace(*p); p++)
763                     ;
764                 for (pe = p + strlen(p) - 1;
765                         pe > p && strchr(" ,/.:([", *pe); pe--)
766                     *pe = '\0';
767                 newm->data.text = nmem_strdup(se->nmem, p);
768
769             }
770             else if (md->type == Metadata_type_year)
771             {
772                 if (extract_years((char *) value, &first, &last) < 0)
773                     continue;
774             }
775             else
776             {
777                 yaz_log(YLOG_WARN, "Unknown type in metadata element %s", type);
778                 continue;
779             }
780             if (md->type == Metadata_type_year && md->merge != Metadata_merge_range)
781             {
782                 yaz_log(YLOG_WARN, "Only range merging supported for years");
783                 continue;
784             }
785             if (md->merge == Metadata_merge_unique)
786             {
787                 struct record_metadata *mnode;
788                 for (mnode = *wheretoput; mnode; mnode = mnode->next)
789                     if (!strcmp((const char *) mnode->data.text, newm->data.text))
790                         break;
791                 if (!mnode)
792                 {
793                     newm->next = *wheretoput;
794                     *wheretoput = newm;
795                 }
796             }
797             else if (md->merge == Metadata_merge_longest)
798             {
799                 if (!*wheretoput ||
800                         strlen(newm->data.text) > strlen((*wheretoput)->data.text))
801                 {
802                     *wheretoput = newm;
803                     if (sk)
804                     {
805                         char *s = nmem_strdup(se->nmem, newm->data.text);
806                         if (!cluster->sortkeys[md->sortkey_offset])
807                             cluster->sortkeys[md->sortkey_offset] = 
808                                 nmem_malloc(se->nmem, sizeof(union data_types));
809                         normalize_mergekey(s,
810                                 (sk->type == Metadata_sortkey_skiparticle));
811                         cluster->sortkeys[md->sortkey_offset]->text = s;
812                     }
813                 }
814             }
815             else if (md->merge == Metadata_merge_all || md->merge == Metadata_merge_no)
816             {
817                 newm->next = *wheretoput;
818                 *wheretoput = newm;
819             }
820             else if (md->merge == Metadata_merge_range)
821             {
822                 assert(md->type == Metadata_type_year);
823                 if (!*wheretoput)
824                 {
825                     *wheretoput = newm;
826                     (*wheretoput)->data.number.min = first;
827                     (*wheretoput)->data.number.max = last;
828                     if (sk)
829                         cluster->sortkeys[md->sortkey_offset] = &newm->data;
830                 }
831                 else
832                 {
833                     if (first < (*wheretoput)->data.number.min)
834                         (*wheretoput)->data.number.min = first;
835                     if (last > (*wheretoput)->data.number.max)
836                         (*wheretoput)->data.number.max = last;
837                 }
838 #ifdef GAGA
839                 if (sk)
840                 {
841                     union data_types *sdata = cluster->sortkeys[md->sortkey_offset];
842                     yaz_log(YLOG_LOG, "SK range: %d-%d", sdata->number.min, sdata->number.max);
843                 }
844 #endif
845             }
846             else
847                 yaz_log(YLOG_WARN, "Don't know how to merge on element name %s", md->name);
848
849             if (md->rank)
850                 relevance_countwords(se->relevance, cluster, 
851                                      (char *) value, md->rank);
852             if (md->termlist)
853             {
854                 if (md->type == Metadata_type_year)
855                 {
856                     char year[64];
857                     sprintf(year, "%d", last);
858                     add_facet(se, (char *) type, year);
859                     if (first != last)
860                     {
861                         sprintf(year, "%d", first);
862                         add_facet(se, (char *) type, year);
863                     }
864                 }
865                 else
866                     add_facet(se, (char *) type, (char *) value);
867             }
868             xmlFree(type);
869             xmlFree(value);
870             type = value = 0;
871         }
872         else
873             yaz_log(YLOG_WARN, "Unexpected element %s in internal record", n->name);
874     }
875     if (type)
876         xmlFree(type);
877     if (value)
878         xmlFree(value);
879
880     xmlFreeDoc(xdoc);
881
882     relevance_donerecord(se->relevance, cluster);
883     se->total_records++;
884
885     return res;
886 }
887
888 // Retrieve first defined value for 'name' for given database.
889 // Will be extended to take into account user associated with session
890 char *session_setting_oneval(struct session_database *db, int offset)
891 {
892     if (!db->settings[offset])
893         return "";
894     return db->settings[offset]->value;
895 }
896
897 static void ingest_records(struct client *cl, Z_Records *r)
898 {
899 #if USE_TIMING
900     yaz_timing_t t = yaz_timing_create();
901 #endif
902     struct record *rec;
903     struct session *s = cl->session;
904     Z_NamePlusRecordList *rlist;
905     int i;
906
907     if (r->which != Z_Records_DBOSD)
908         return;
909     rlist = r->u.databaseOrSurDiagnostics;
910     for (i = 0; i < rlist->num_records; i++)
911     {
912         Z_NamePlusRecord *npr = rlist->records[i];
913
914         cl->records++;
915         if (npr->which != Z_NamePlusRecord_databaseRecord)
916         {
917             yaz_log(YLOG_WARN, 
918                     "Unexpected record type, probably diagnostic %s",
919                     cl->database->database->url);
920             continue;
921         }
922
923         rec = ingest_record(cl, npr->u.databaseRecord);
924         if (!rec)
925             continue;
926     }
927     if (s->watchlist[SESSION_WATCH_RECORDS].fun && rlist->num_records)
928         session_alert_watch(s, SESSION_WATCH_RECORDS);
929
930 #if USE_TIMING
931     yaz_timing_stop(t);
932     yaz_log(YLOG_LOG, "ingest_records %6.5f %3.2f %3.2f", 
933             yaz_timing_get_real(t), yaz_timing_get_user(t),
934             yaz_timing_get_sys(t));
935     yaz_timing_destroy(&t);
936 #endif
937 }
938
939 static void do_presentResponse(IOCHAN i, Z_APDU *a)
940 {
941     struct connection *co = iochan_getdata(i);
942     struct client *cl = co->client;
943     Z_PresentResponse *r = a->u.presentResponse;
944
945     if (r->records) {
946         Z_Records *recs = r->records;
947         if (recs->which == Z_Records_NSD)
948         {
949             yaz_log(YLOG_WARN, "Non-surrogate diagnostic %s",
950                     cl->database->database->url);
951             cl->diagnostic = *recs->u.nonSurrogateDiagnostic->condition;
952             cl->state = Client_Error;
953         }
954     }
955
956     if (!*r->presentStatus && cl->state != Client_Error)
957     {
958         yaz_log(YLOG_DEBUG, "Good Present response %s",
959                 cl->database->database->url);
960         ingest_records(cl, r->records);
961         cl->state = Client_Idle;
962     }
963     else if (*r->presentStatus) 
964     {
965         yaz_log(YLOG_WARN, "Bad Present response %s",
966                 cl->database->database->url);
967         cl->state = Client_Error;
968     }
969 }
970
971 void connection_handler(IOCHAN i, int event)
972 {
973     struct connection *co = iochan_getdata(i);
974     struct client *cl = co->client;
975     struct session *se = 0;
976
977     if (cl)
978         se = cl->session;
979     else
980     {
981         yaz_log(YLOG_WARN, "Destroying orphan connection");
982         connection_destroy(co);
983         return;
984     }
985
986     if (co->state == Conn_Connecting && event & EVENT_OUTPUT)
987     {
988         int errcode;
989         socklen_t errlen = sizeof(errcode);
990
991         if (getsockopt(cs_fileno(co->link), SOL_SOCKET, SO_ERROR, &errcode,
992             &errlen) < 0 || errcode != 0)
993         {
994             client_fatal(cl);
995             return;
996         }
997         else
998         {
999             yaz_log(YLOG_DEBUG, "Connect OK");
1000             co->state = Conn_Open;
1001             if (cl)
1002                 cl->state = Client_Connected;
1003         }
1004     }
1005
1006     else if (event & EVENT_INPUT)
1007     {
1008         int len = cs_get(co->link, &co->ibuf, &co->ibufsize);
1009
1010         if (len < 0)
1011         {
1012             yaz_log(YLOG_WARN|YLOG_ERRNO, "Error reading from %s", 
1013                     cl->database->database->url);
1014             connection_destroy(co);
1015             return;
1016         }
1017         else if (len == 0)
1018         {
1019             yaz_log(YLOG_WARN, "EOF reading from %s", cl->database->database->url);
1020             connection_destroy(co);
1021             return;
1022         }
1023         else if (len > 1) // We discard input if we have no connection
1024         {
1025             co->state = Conn_Open;
1026
1027             if (cl && (cl->requestid == se->requestid || cl->state == Client_Initializing))
1028             {
1029                 Z_APDU *a;
1030
1031                 odr_reset(global_parameters.odr_in);
1032                 odr_setbuf(global_parameters.odr_in, co->ibuf, len, 0);
1033                 if (!z_APDU(global_parameters.odr_in, &a, 0, 0))
1034                 {
1035                     client_fatal(cl);
1036                     return;
1037                 }
1038                 switch (a->which)
1039                 {
1040                     case Z_APDU_initResponse:
1041                         do_initResponse(i, a);
1042                         break;
1043                     case Z_APDU_searchResponse:
1044                         do_searchResponse(i, a);
1045                         break;
1046                     case Z_APDU_presentResponse:
1047                         do_presentResponse(i, a);
1048                         break;
1049                     case Z_APDU_close:
1050                         do_closeResponse(i, a);
1051                         break;
1052                     default:
1053                         yaz_log(YLOG_WARN, 
1054                                 "Unexpected Z39.50 response from %s",  
1055                                 cl->database->database->url);
1056                         client_fatal(cl);
1057                         return;
1058                 }
1059                 // We aren't expecting staggered output from target
1060                 // if (cs_more(t->link))
1061                 //    iochan_setevent(i, EVENT_INPUT);
1062             }
1063             else  // we throw away response and go to idle mode
1064             {
1065                 yaz_log(YLOG_DEBUG, "Ignoring result of expired operation");
1066                 cl->state = Client_Idle;
1067             }
1068         }
1069         /* if len==1 we do nothing but wait for more input */
1070     }
1071
1072     if (cl->state == Client_Connected) {
1073         send_init(i);
1074     }
1075
1076     if (cl->state == Client_Idle)
1077     {
1078         if (cl->requestid != se->requestid && cl->pquery) {
1079             send_search(i);
1080         }
1081         else if (cl->hits > 0 && cl->records < global_parameters.toget &&
1082             cl->records < cl->hits) {
1083             send_present(i);
1084         }
1085     }
1086 }
1087
1088 // Disassociate connection from client
1089 static void connection_release(struct connection *co)
1090 {
1091     struct client *cl = co->client;
1092
1093     yaz_log(YLOG_DEBUG, "Connection release %s", co->host->hostport);
1094     if (!cl)
1095         return;
1096     cl->connection = 0;
1097     co->client = 0;
1098 }
1099
1100 // Close connection and recycle structure
1101 static void connection_destroy(struct connection *co)
1102 {
1103     struct host *h = co->host;
1104     
1105     if (co->link)
1106     {
1107         cs_close(co->link);
1108         iochan_destroy(co->iochan);
1109     }
1110
1111     yaz_log(YLOG_DEBUG, "Connection destroy %s", co->host->hostport);
1112     if (h->connections == co)
1113         h->connections = co->next;
1114     else
1115     {
1116         struct connection *pco;
1117         for (pco = h->connections; pco && pco->next != co; pco = pco->next)
1118             ;
1119         if (pco)
1120             pco->next = co->next;
1121         else
1122             abort();
1123     }
1124     if (co->client)
1125     {
1126         if (co->client->state != Client_Idle)
1127             co->client->state = Client_Disconnected;
1128         co->client->connection = 0;
1129     }
1130     co->next = connection_freelist;
1131     connection_freelist = co;
1132 }
1133
1134 static int connection_connect(struct connection *con)
1135 {
1136     COMSTACK link = 0;
1137     struct client *cl = con->client;
1138     struct host *host = con->host;
1139     void *addr;
1140     int res;
1141
1142     assert(host->ipport);
1143     assert(cl);
1144
1145     if (!(link = cs_create(tcpip_type, 0, PROTO_Z3950)))
1146     {
1147         yaz_log(YLOG_FATAL|YLOG_ERRNO, "Failed to create comstack");
1148         return -1;
1149     }
1150     
1151     if (0 == strlen(global_parameters.zproxy_override)){
1152         /* no Z39.50 proxy needed - direct connect */
1153         yaz_log(YLOG_DEBUG, "Connection create %s", cl->database->database->url);
1154         
1155         if (!(addr = cs_straddr(link, host->ipport)))
1156         {
1157             yaz_log(YLOG_WARN|YLOG_ERRNO, 
1158                     "Lookup of IP address %s failed", host->ipport);
1159             return -1;
1160         }
1161         
1162     } else {
1163         /* Z39.50 proxy connect */
1164         yaz_log(YLOG_DEBUG, "Connection create %s proxy %s", 
1165                 cl->database->database->url, global_parameters.zproxy_override);
1166         
1167         if (!(addr = cs_straddr(link, global_parameters.zproxy_override)))
1168         {
1169             yaz_log(YLOG_WARN|YLOG_ERRNO, 
1170                     "Lookup of IP address %s failed", 
1171                     global_parameters.zproxy_override);
1172             return -1;
1173         }
1174     }
1175     
1176     res = cs_connect(link, addr);
1177     if (res < 0)
1178     {
1179         yaz_log(YLOG_WARN|YLOG_ERRNO, "cs_connect %s", cl->database->database->url);
1180         return -1;
1181     }
1182     con->link = link;
1183     con->state = Conn_Connecting;
1184     con->iochan = iochan_create(cs_fileno(link), connection_handler, 0);
1185     iochan_setdata(con->iochan, con);
1186     pazpar2_add_channel(con->iochan);
1187
1188     /* this fragment is bad DRY: from client_prep_connection */
1189     cl->state = Client_Connecting;
1190     iochan_setflag(con->iochan, EVENT_OUTPUT);
1191     return 0;
1192 }
1193
1194 void connect_resolver_host(struct host *host)
1195 {
1196     struct connection *con = host->connections;
1197
1198     while (con)
1199     {
1200         if (con->state == Conn_Resolving)
1201         {
1202             if (!host->ipport) /* unresolved */
1203             {
1204                 connection_destroy(con);
1205                 /* start all over .. at some point it will be NULL */
1206                 con = host->connections;
1207             }
1208             else if (!con->client)
1209             {
1210                 yaz_log(YLOG_WARN, "connect_unresolved_host : ophan client");
1211                 connection_destroy(con);
1212                 /* start all over .. at some point it will be NULL */
1213                 con = host->connections;
1214             }
1215             else
1216             {
1217                 connection_connect(con);
1218                 con = con->next;
1219             }
1220         }
1221     }
1222 }
1223
1224
1225 // Creates a new connection for client, associated with the host of 
1226 // client's database
1227 static struct connection *connection_create(struct client *cl)
1228 {
1229     struct connection *new;
1230     struct host *host = cl->database->database->host;
1231
1232     if ((new = connection_freelist))
1233         connection_freelist = new->next;
1234     else
1235     {
1236         new = xmalloc(sizeof (struct connection));
1237         new->ibuf = 0;
1238         new->ibufsize = 0;
1239     }
1240     new->host = host;
1241     new->next = new->host->connections;
1242     new->host->connections = new;
1243     new->client = cl;
1244     cl->connection = new;
1245     new->link = 0;
1246     new->state = Conn_Resolving;
1247     if (host->ipport)
1248         connection_connect(new);
1249     return new;
1250 }
1251
1252 // Close connection and set state to error
1253 static void client_fatal(struct client *cl)
1254 {
1255     yaz_log(YLOG_WARN, "Fatal error from %s", cl->database->database->url);
1256     connection_destroy(cl->connection);
1257     cl->state = Client_Error;
1258 }
1259
1260 // Ensure that client has a connection associated
1261 static int client_prep_connection(struct client *cl)
1262 {
1263     struct connection *co;
1264     struct session *se = cl->session;
1265     struct host *host = cl->database->database->host;
1266
1267     co = cl->connection;
1268
1269     yaz_log(YLOG_DEBUG, "Client prep %s", cl->database->database->url);
1270
1271     if (!co)
1272     {
1273         // See if someone else has an idle connection
1274         // We should look at timestamps here to select the longest-idle connection
1275         for (co = host->connections; co; co = co->next)
1276             if (co->state == Conn_Open && (!co->client || co->client->session != se))
1277                 break;
1278         if (co)
1279         {
1280             connection_release(co);
1281             cl->connection = co;
1282             co->client = cl;
1283         }
1284         else
1285             co = connection_create(cl);
1286     }
1287     if (co)
1288     {
1289         if (co->state == Conn_Connecting)
1290         {
1291             cl->state = Client_Connecting;
1292             iochan_setflag(co->iochan, EVENT_OUTPUT);
1293         }
1294         else if (co->state == Conn_Open)
1295         {
1296             if (cl->state == Client_Error || cl->state == Client_Disconnected)
1297                 cl->state = Client_Idle;
1298             iochan_setflag(co->iochan, EVENT_OUTPUT);
1299         }
1300         return 1;
1301     }
1302     else
1303         return 0;
1304 }
1305
1306 // Initialize YAZ Map structures for MARC-based targets
1307 static int prepare_yazmarc(struct session_database *sdb)
1308 {
1309     char *s;
1310
1311     if (!sdb->settings)
1312     {
1313         yaz_log(YLOG_WARN, "No settings for %s", sdb->database->url);
1314         return -1;
1315     }
1316     if ((s = session_setting_oneval(sdb, PZ_NATIVESYNTAX)) && !strncmp(s, "iso2709", 7))
1317     {
1318         char *encoding = "marc-8s", *e;
1319         yaz_iconv_t cm;
1320
1321         // See if a native encoding is specified
1322         if ((e = strchr(s, ';')))
1323             encoding = e + 1;
1324
1325         sdb->yaz_marc = yaz_marc_create();
1326         yaz_marc_subfield_str(sdb->yaz_marc, "\t");
1327         
1328         cm = yaz_iconv_open("utf-8", encoding);
1329         if (!cm)
1330         {
1331             yaz_log(YLOG_FATAL, 
1332                     "Unable to map from %s to UTF-8 for target %s", 
1333                     encoding, sdb->database->url);
1334             return -1;
1335         }
1336         yaz_marc_iconv(sdb->yaz_marc, cm);
1337     }
1338     return 0;
1339 }
1340
1341 // Prepare XSLT stylesheets for record normalization
1342 // Structures are allocated on the session_wide nmem to avoid having
1343 // to recompute this for every search. This would lead
1344 // to leaking if a single session was to repeatedly change the PZ_XSLT
1345 // setting. However, this is not a realistic use scenario.
1346 static int prepare_map(struct session *se, struct session_database *sdb)
1347 {
1348    char *s;
1349
1350     if (!sdb->settings)
1351     {
1352         yaz_log(YLOG_WARN, "No settings on %s", sdb->database->url);
1353         return -1;
1354     }
1355     if ((s = session_setting_oneval(sdb, PZ_XSLT)))
1356     {
1357         char **stylesheets;
1358         struct database_retrievalmap **m = &sdb->map;
1359         int num, i;
1360
1361         nmem_strsplit(se->session_nmem, ",", s, &stylesheets, &num);
1362         for (i = 0; i < num; i++)
1363         {
1364             (*m) = nmem_malloc(se->session_nmem, sizeof(**m));
1365             (*m)->next = 0;
1366             if (!((*m)->stylesheet = conf_load_stylesheet(stylesheets[i])))
1367             {
1368                 yaz_log(YLOG_FATAL, "Unable to load stylesheet: %s",
1369                         stylesheets[i]);
1370                 return -1;
1371             }
1372             m = &(*m)->next;
1373         }
1374     }
1375     if (!sdb->map)
1376         yaz_log(YLOG_WARN, "No Normalization stylesheet for target %s",
1377                 sdb->database->url);
1378     return 0;
1379 }
1380
1381 // This analyzes settings and recomputes any supporting data structures
1382 // if necessary.
1383 static int prepare_session_database(struct session *se, struct session_database *sdb)
1384 {
1385     if (!sdb->settings)
1386     {
1387         yaz_log(YLOG_WARN, "No settings associated with %s", sdb->database->url);
1388         return -1;
1389     }
1390     if (sdb->settings[PZ_NATIVESYNTAX] && !sdb->yaz_marc)
1391     {
1392         if (prepare_yazmarc(sdb) < 0)
1393             return -1;
1394     }
1395     if (sdb->settings[PZ_XSLT] && !sdb->map)
1396     {
1397         if (prepare_map(se, sdb) < 0)
1398             return -1;
1399     }
1400     return 0;
1401 }
1402
1403 // Initialize CCL map for a target
1404 static CCL_bibset prepare_cclmap(struct client *cl)
1405 {
1406     struct session_database *sdb = cl->database;
1407     struct setting *s;
1408     CCL_bibset res;
1409
1410     if (!sdb->settings)
1411         return 0;
1412     res = ccl_qual_mk();
1413     for (s = sdb->settings[PZ_CCLMAP]; s; s = s->next)
1414     {
1415         char *p = strchr(s->name + 3, ':');
1416         if (!p)
1417         {
1418             yaz_log(YLOG_WARN, "Malformed cclmap name: %s", s->name);
1419             ccl_qual_rm(&res);
1420             return 0;
1421         }
1422         p++;
1423         ccl_qual_fitem(res, s->value, p);
1424     }
1425     return res;
1426 }
1427
1428 // Parse the query given the settings specific to this client
1429 static int client_parse_query(struct client *cl, const char *query)
1430 {
1431     struct session *se = cl->session;
1432     struct ccl_rpn_node *cn;
1433     int cerror, cpos;
1434     CCL_bibset ccl_map = prepare_cclmap(cl);
1435
1436     if (!ccl_map)
1437         return -1;
1438     cn = ccl_find_str(ccl_map, query, &cerror, &cpos);
1439     ccl_qual_rm(&ccl_map);
1440     if (!cn)
1441     {
1442         cl->state = Client_Error;
1443         yaz_log(YLOG_WARN, "Failed to parse query for %s",
1444                          cl->database->database->url);
1445         return -1;
1446     }
1447     wrbuf_rewind(se->wrbuf);
1448     ccl_pquery(se->wrbuf, cn);
1449     wrbuf_putc(se->wrbuf, '\0');
1450     if (cl->pquery)
1451         xfree(cl->pquery);
1452     cl->pquery = xstrdup(wrbuf_buf(se->wrbuf));
1453
1454     if (!se->relevance)
1455     {
1456         // Initialize relevance structure with query terms
1457         char *p[512];
1458         extract_terms(se->nmem, cn, p);
1459         se->relevance = relevance_create(se->nmem, (const char **) p,
1460                 se->expected_maxrecs);
1461     }
1462
1463     ccl_rpn_delete(cn);
1464     return 0;
1465 }
1466
1467 static struct client *client_create(void)
1468 {
1469     struct client *r;
1470     if (client_freelist)
1471     {
1472         r = client_freelist;
1473         client_freelist = client_freelist->next;
1474     }
1475     else
1476         r = xmalloc(sizeof(struct client));
1477     r->pquery = 0;
1478     r->database = 0;
1479     r->connection = 0;
1480     r->session = 0;
1481     r->hits = 0;
1482     r->records = 0;
1483     r->setno = 0;
1484     r->requestid = -1;
1485     r->diagnostic = 0;
1486     r->state = Client_Disconnected;
1487     r->next = 0;
1488     return r;
1489 }
1490
1491 void client_destroy(struct client *c)
1492 {
1493     struct session *se = c->session;
1494     if (c == se->clients)
1495         se->clients = c->next;
1496     else
1497     {
1498         struct client *cc;
1499         for (cc = se->clients; cc && cc->next != c; cc = cc->next)
1500             ;
1501         if (cc)
1502             cc->next = c->next;
1503     }
1504     if (c->connection)
1505         connection_release(c->connection);
1506     c->next = client_freelist;
1507     client_freelist = c;
1508 }
1509
1510 void session_set_watch(struct session *s, int what, session_watchfun fun, void *data)
1511 {
1512     s->watchlist[what].fun = fun;
1513     s->watchlist[what].data = data;
1514 }
1515
1516 void session_alert_watch(struct session *s, int what)
1517 {
1518     if (!s->watchlist[what].fun)
1519         return;
1520     (*s->watchlist[what].fun)(s->watchlist[what].data);
1521     s->watchlist[what].fun = 0;
1522     s->watchlist[what].data = 0;
1523 }
1524
1525 //callback for grep_databases
1526 static void select_targets_callback(void *context, struct session_database *db)
1527 {
1528     struct session *se = (struct session*) context;
1529     struct client *cl = client_create();
1530     cl->database = db;
1531     cl->session = se;
1532     cl->next = se->clients;
1533     se->clients = cl;
1534 }
1535
1536 // Associates a set of clients with a session;
1537 // Note: Session-databases represent databases with per-session setting overrides
1538 int select_targets(struct session *se, struct database_criterion *crit)
1539 {
1540     while (se->clients)
1541         client_destroy(se->clients);
1542
1543     return session_grep_databases(se, crit, select_targets_callback);
1544 }
1545
1546 int session_active_clients(struct session *s)
1547 {
1548     struct client *c;
1549     int res = 0;
1550
1551     for (c = s->clients; c; c = c->next)
1552         if (c->connection && (c->state == Client_Connecting ||
1553                     c->state == Client_Initializing ||
1554                     c->state == Client_Searching ||
1555                     c->state == Client_Presenting))
1556             res++;
1557
1558     return res;
1559 }
1560
1561 // parses crit1=val1,crit2=val2|val3,...
1562 static struct database_criterion *parse_filter(NMEM m, const char *buf)
1563 {
1564     struct database_criterion *res = 0;
1565     char **values;
1566     int num;
1567     int i;
1568
1569     if (!buf || !*buf)
1570         return 0;
1571     nmem_strsplit(m, ",", buf,  &values, &num);
1572     for (i = 0; i < num; i++)
1573     {
1574         char **subvalues;
1575         int subnum;
1576         int subi;
1577         struct database_criterion *new = nmem_malloc(m, sizeof(*new));
1578         char *eq = strchr(values[i], '=');
1579         if (!eq)
1580         {
1581             yaz_log(YLOG_WARN, "Missing equal-sign in filter");
1582             return 0;
1583         }
1584         *(eq++) = '\0';
1585         new->name = values[i];
1586         nmem_strsplit(m, "|", eq, &subvalues, &subnum);
1587         new->values = 0;
1588         for (subi = 0; subi < subnum; subi++)
1589         {
1590             struct database_criterion_value *newv = nmem_malloc(m, sizeof(*newv));
1591             newv->value = subvalues[subi];
1592             newv->next = new->values;
1593             new->values = newv;
1594         }
1595         new->next = res;
1596         res = new;
1597     }
1598     return res;
1599 }
1600
1601 char *search(struct session *se, char *query, char *filter)
1602 {
1603     int live_channels = 0;
1604     struct client *cl;
1605     struct database_criterion *criteria;
1606
1607     yaz_log(YLOG_DEBUG, "Search");
1608
1609     nmem_reset(se->nmem);
1610     criteria = parse_filter(se->nmem, filter);
1611     se->requestid++;
1612     live_channels = select_targets(se, criteria);
1613     if (live_channels)
1614     {
1615         int maxrecs = live_channels * global_parameters.toget;
1616         se->num_termlists = 0;
1617         se->reclist = reclist_create(se->nmem, maxrecs);
1618         // This will be initialized in send_search()
1619         se->total_records = se->total_hits = se->total_merged = 0;
1620         se->expected_maxrecs = maxrecs;
1621     }
1622     else
1623         return "NOTARGETS";
1624
1625     se->relevance = 0;
1626
1627     for (cl = se->clients; cl; cl = cl->next)
1628     {
1629         if (prepare_session_database(se, cl->database) < 0)
1630             return "CONFIG_ERROR";
1631         if (client_parse_query(cl, query) < 0)  // Query must parse for all targets
1632             return "QUERY";
1633     }
1634
1635     for (cl = se->clients; cl; cl = cl->next)
1636     {
1637         client_prep_connection(cl);
1638     }
1639
1640     return 0;
1641 }
1642
1643 // Creates a new session_database object for a database
1644 static void session_init_databases_fun(void *context, struct database *db)
1645 {
1646     struct session *se = (struct session *) context;
1647     struct session_database *new = nmem_malloc(se->session_nmem, sizeof(*new));
1648     int num = settings_num();
1649     int i;
1650
1651     new->database = db;
1652     new->yaz_marc = 0;
1653     new->map = 0;
1654     new->settings = nmem_malloc(se->session_nmem, sizeof(struct settings *) * num);
1655     memset(new->settings, 0, sizeof(struct settings*) * num);
1656     if (db->settings)
1657     {
1658         for (i = 0; i < num; i++)
1659             new->settings[i] = db->settings[i];
1660     }
1661     new->next = se->databases;
1662     se->databases = new;
1663 }
1664
1665 // Doesn't free memory associated with sdb -- nmem takes care of that
1666 static void session_database_destroy(struct session_database *sdb)
1667 {
1668     struct database_retrievalmap *m;
1669
1670     for (m = sdb->map; m; m = m->next)
1671         xsltFreeStylesheet(m->stylesheet);
1672     if (sdb->yaz_marc)
1673         yaz_marc_destroy(sdb->yaz_marc);
1674 }
1675
1676 // Initialize session_database list -- this represents this session's view
1677 // of the database list -- subject to modification by the settings ws command
1678 void session_init_databases(struct session *se)
1679 {
1680     se->databases = 0;
1681     grep_databases(se, 0, session_init_databases_fun);
1682 }
1683
1684 // Probably session_init_databases_fun should be refactored instead of
1685 // called here.
1686 static struct session_database *load_session_database(struct session *se, char *id)
1687 {
1688     struct database *db = find_database(id, 0);
1689
1690     session_init_databases_fun((void*) se, db);
1691     // New sdb is head of se->databases list
1692     return se->databases;
1693 }
1694
1695 // Find an existing session database. If not found, load it
1696 static struct session_database *find_session_database(struct session *se, char *id)
1697 {
1698     struct session_database *sdb;
1699
1700     for (sdb = se->databases; sdb; sdb = sdb->next)
1701         if (!strcmp(sdb->database->url, id))
1702             return sdb;
1703     return load_session_database(se, id);
1704 }
1705
1706 // Apply a session override to a database
1707 void session_apply_setting(struct session *se, char *dbname, char *setting, char *value)
1708 {
1709     struct session_database *sdb = find_session_database(se, dbname);
1710     struct setting *new = nmem_malloc(se->session_nmem, sizeof(*new));
1711     int offset = settings_offset(setting);
1712
1713     if (offset < 0)
1714     {
1715         yaz_log(YLOG_WARN, "Unknown setting %s", setting);
1716         return;
1717     }
1718     new->precedence = 0;
1719     new->target = dbname;
1720     new->name = setting;
1721     new->value = value;
1722     new->next = sdb->settings[offset];
1723     sdb->settings[offset] = new;
1724
1725     // Force later recompute of settings-driven data structures
1726     // (happens when a search starts and client connections are prepared)
1727     switch (offset)
1728     {
1729         case PZ_NATIVESYNTAX:
1730             if (sdb->yaz_marc)
1731             {
1732                 yaz_marc_destroy(sdb->yaz_marc);
1733                 sdb->yaz_marc = 0;
1734             }
1735             break;
1736         case PZ_XSLT:
1737             if (sdb->map)
1738             {
1739                 struct database_retrievalmap *m;
1740                 // We don't worry about the map structure -- it's in nmem
1741                 for (m = sdb->map; m; m = m->next)
1742                     xsltFreeStylesheet(m->stylesheet);
1743                 sdb->map = 0;
1744             }
1745             break;
1746     }
1747 }
1748
1749 void destroy_session(struct session *s)
1750 {
1751     struct session_database *sdb;
1752
1753     yaz_log(YLOG_LOG, "Destroying session");
1754     while (s->clients)
1755         client_destroy(s->clients);
1756     for (sdb = s->databases; sdb; sdb = sdb->next)
1757         session_database_destroy(sdb);
1758     nmem_destroy(s->nmem);
1759     wrbuf_destroy(s->wrbuf);
1760 }
1761
1762 struct session *new_session(NMEM nmem) 
1763 {
1764     int i;
1765     struct session *session = nmem_malloc(nmem, sizeof(*session));
1766
1767     yaz_log(YLOG_DEBUG, "New Pazpar2 session");
1768     
1769     session->total_hits = 0;
1770     session->total_records = 0;
1771     session->num_termlists = 0;
1772     session->reclist = 0;
1773     session->requestid = -1;
1774     session->clients = 0;
1775     session->expected_maxrecs = 0;
1776     session->session_nmem = nmem;
1777     session->nmem = nmem_create();
1778     session->wrbuf = wrbuf_alloc();
1779     session_init_databases(session);
1780     for (i = 0; i <= SESSION_WATCH_MAX; i++)
1781     {
1782         session->watchlist[i].data = 0;
1783         session->watchlist[i].fun = 0;
1784     }
1785
1786     return session;
1787 }
1788
1789 struct hitsbytarget *hitsbytarget(struct session *se, int *count)
1790 {
1791     static struct hitsbytarget res[1000]; // FIXME MM
1792     struct client *cl;
1793
1794     *count = 0;
1795     for (cl = se->clients; cl; cl = cl->next)
1796     {
1797         char *name = session_setting_oneval(cl->database, PZ_NAME);
1798
1799         res[*count].id = cl->database->database->url;
1800         res[*count].name = *name ? name : "Unknown";
1801         res[*count].hits = cl->hits;
1802         res[*count].records = cl->records;
1803         res[*count].diagnostic = cl->diagnostic;
1804         res[*count].state = client_states[cl->state];
1805         res[*count].connected  = cl->connection ? 1 : 0;
1806         (*count)++;
1807     }
1808
1809     return res;
1810 }
1811
1812 struct termlist_score **termlist(struct session *s, const char *name, int *num)
1813 {
1814     int i;
1815
1816     for (i = 0; i < s->num_termlists; i++)
1817         if (!strcmp((const char *) s->termlists[i].name, name))
1818             return termlist_highscore(s->termlists[i].termlist, num);
1819     return 0;
1820 }
1821
1822 #ifdef MISSING_HEADERS
1823 void report_nmem_stats(void)
1824 {
1825     size_t in_use, is_free;
1826
1827     nmem_get_memory_in_use(&in_use);
1828     nmem_get_memory_free(&is_free);
1829
1830     yaz_log(YLOG_LOG, "nmem stat: use=%ld free=%ld", 
1831             (long) in_use, (long) is_free);
1832 }
1833 #endif
1834
1835 struct record_cluster *show_single(struct session *s, int id)
1836 {
1837     struct record_cluster *r;
1838
1839     reclist_rewind(s->reclist);
1840     while ((r = reclist_read_record(s->reclist)))
1841         if (r->recid == id)
1842             return r;
1843     return 0;
1844 }
1845
1846 struct record_cluster **show(struct session *s, struct reclist_sortparms *sp, int start,
1847         int *num, int *total, int *sumhits, NMEM nmem_show)
1848 {
1849     struct record_cluster **recs = nmem_malloc(nmem_show, *num 
1850                                        * sizeof(struct record_cluster *));
1851     struct reclist_sortparms *spp;
1852     int i;
1853 #if USE_TIMING    
1854     yaz_timing_t t = yaz_timing_create();
1855 #endif
1856
1857     for (spp = sp; spp; spp = spp->next)
1858         if (spp->type == Metadata_sortkey_relevance)
1859         {
1860             relevance_prepare_read(s->relevance, s->reclist);
1861             break;
1862         }
1863     reclist_sort(s->reclist, sp);
1864
1865     *total = s->reclist->num_records;
1866     *sumhits = s->total_hits;
1867
1868     for (i = 0; i < start; i++)
1869         if (!reclist_read_record(s->reclist))
1870         {
1871             *num = 0;
1872             recs = 0;
1873             break;
1874         }
1875
1876     for (i = 0; i < *num; i++)
1877     {
1878         struct record_cluster *r = reclist_read_record(s->reclist);
1879         if (!r)
1880         {
1881             *num = i;
1882             break;
1883         }
1884         recs[i] = r;
1885     }
1886 #if USE_TIMING
1887     yaz_timing_stop(t);
1888     yaz_log(YLOG_LOG, "show %6.5f %3.2f %3.2f", 
1889             yaz_timing_get_real(t), yaz_timing_get_user(t),
1890             yaz_timing_get_sys(t));
1891     yaz_timing_destroy(&t);
1892 #endif
1893     return recs;
1894 }
1895
1896 void statistics(struct session *se, struct statistics *stat)
1897 {
1898     struct client *cl;
1899     int count = 0;
1900
1901     memset(stat, 0, sizeof(*stat));
1902     for (cl = se->clients; cl; cl = cl->next)
1903     {
1904         if (!cl->connection)
1905             stat->num_no_connection++;
1906         switch (cl->state)
1907         {
1908             case Client_Connecting: stat->num_connecting++; break;
1909             case Client_Initializing: stat->num_initializing++; break;
1910             case Client_Searching: stat->num_searching++; break;
1911             case Client_Presenting: stat->num_presenting++; break;
1912             case Client_Idle: stat->num_idle++; break;
1913             case Client_Failed: stat->num_failed++; break;
1914             case Client_Error: stat->num_error++; break;
1915             default: break;
1916         }
1917         count++;
1918     }
1919     stat->num_hits = se->total_hits;
1920     stat->num_records = se->total_records;
1921
1922     stat->num_clients = count;
1923 }
1924
1925 void start_http_listener(void)
1926 {
1927     char hp[128] = "";
1928     struct conf_server *ser = global_parameters.server;
1929
1930     if (*global_parameters.listener_override)
1931         strcpy(hp, global_parameters.listener_override);
1932     else
1933     {
1934         strcpy(hp, ser->host ? ser->host : "");
1935         if (ser->port)
1936         {
1937             if (*hp)
1938                 strcat(hp, ":");
1939             sprintf(hp + strlen(hp), "%d", ser->port);
1940         }
1941     }
1942     http_init(hp);
1943 }
1944
1945 void start_proxy(void)
1946 {
1947     char hp[128] = "";
1948     struct conf_server *ser = global_parameters.server;
1949
1950     if (*global_parameters.proxy_override)
1951         strcpy(hp, global_parameters.proxy_override);
1952     else if (ser->proxy_host || ser->proxy_port)
1953     {
1954         strcpy(hp, ser->proxy_host ? ser->proxy_host : "");
1955         if (ser->proxy_port)
1956         {
1957             if (*hp)
1958                 strcat(hp, ":");
1959             sprintf(hp + strlen(hp), "%d", ser->proxy_port);
1960         }
1961     }
1962     else
1963         return;
1964
1965     http_set_proxyaddr(hp, ser->myurl ? ser->myurl : "");
1966 }
1967
1968 void start_zproxy(void)
1969 {
1970     struct conf_server *ser = global_parameters.server;
1971
1972     if (*global_parameters.zproxy_override){
1973         yaz_log(YLOG_LOG, "Z39.50 proxy  %s", 
1974                 global_parameters.zproxy_override);
1975         return;
1976     }
1977
1978     else if (ser->zproxy_host || ser->zproxy_port)
1979     {
1980         char hp[128] = "";
1981
1982         strcpy(hp, ser->zproxy_host ? ser->zproxy_host : "");
1983         if (ser->zproxy_port)
1984         {
1985             if (*hp)
1986                 strcat(hp, ":");
1987             else
1988                 strcat(hp, "@:");
1989
1990             sprintf(hp + strlen(hp), "%d", ser->zproxy_port);
1991         }
1992         strcpy(global_parameters.zproxy_override, hp);
1993         yaz_log(YLOG_LOG, "Z39.50 proxy  %s", 
1994                 global_parameters.zproxy_override);
1995
1996     }
1997     else
1998         return;
1999 }
2000
2001 // Master list of connections we're handling events to
2002 static IOCHAN channel_list = 0; 
2003 void pazpar2_add_channel(IOCHAN chan)
2004 {
2005     chan->next = channel_list;
2006     channel_list = chan;
2007 }
2008
2009 void pazpar2_event_loop()
2010 {
2011     event_loop(&channel_list);
2012 }
2013
2014 /*
2015  * Local variables:
2016  * c-basic-offset: 4
2017  * indent-tabs-mode: nil
2018  * End:
2019  * vim: shiftwidth=4 tabstop=8 expandtab
2020  */