*** empty log message ***
[pazpar2-moved-to-github.git] / src / logic.c
1 /* $Id: logic.c,v 1.13 2007-04-20 15:36:48 quinn Exp $
2    Copyright (c) 2006-2007, Index Data.
3
4 This file is part of Pazpar2.
5
6 Pazpar2 is free software; you can redistribute it and/or modify it under
7 the terms of the GNU General Public License as published by the Free
8 Software Foundation; either version 2, or (at your option) any later
9 version.
10
11 Pazpar2 is distributed in the hope that it will be useful, but WITHOUT ANY
12 WARRANTY; without even the implied warranty of MERCHANTABILITY or
13 FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
14 for more details.
15
16 You should have received a copy of the GNU General Public License
17 along with Pazpar2; see the file LICENSE.  If not, write to the
18 Free Software Foundation, 59 Temple Place - Suite 330, Boston, MA
19 02111-1307, USA.
20  */
21
22 #include <stdlib.h>
23 #include <stdio.h>
24 #include <string.h>
25 #include <sys/time.h>
26 #include <unistd.h>
27 #include <sys/socket.h>
28 #include <netdb.h>
29 #include <signal.h>
30 #include <ctype.h>
31 #include <assert.h>
32
33 #include <yaz/marcdisp.h>
34 #include <yaz/comstack.h>
35 #include <yaz/tcpip.h>
36 #include <yaz/proto.h>
37 #include <yaz/readconf.h>
38 #include <yaz/pquery.h>
39 #include <yaz/otherinfo.h>
40 #include <yaz/yaz-util.h>
41 #include <yaz/nmem.h>
42 #include <yaz/query-charset.h>
43 #include <yaz/querytowrbuf.h>
44 #if YAZ_VERSIONL >= 0x020163
45 #include <yaz/oid_db.h>
46 #endif
47
48 #if HAVE_CONFIG_H
49 #include "cconfig.h"
50 #endif
51
52 #define USE_TIMING 0
53 #if USE_TIMING
54 #include <yaz/timing.h>
55 #endif
56
57 #include <netinet/in.h>
58
59 #include "pazpar2.h"
60 #include "eventl.h"
61 #include "http.h"
62 #include "termlists.h"
63 #include "reclists.h"
64 #include "relevance.h"
65 #include "config.h"
66 #include "database.h"
67 #include "settings.h"
68
69 #define MAX_CHUNK 15
70
71 static void client_fatal(struct client *cl);
72 static void connection_destroy(struct connection *co);
73 static int client_prep_connection(struct client *cl);
74 static void ingest_records(struct client *cl, Z_Records *r);
75 void session_alert_watch(struct session *s, int what);
76
77 static struct connection *connection_freelist = 0;
78 static struct client *client_freelist = 0;
79
80 static char *client_states[] = {
81     "Client_Connecting",
82     "Client_Connected",
83     "Client_Idle",
84     "Client_Initializing",
85     "Client_Searching",
86     "Client_Presenting",
87     "Client_Error",
88     "Client_Failed",
89     "Client_Disconnected",
90     "Client_Stopped"
91 };
92
93 // Note: Some things in this structure will eventually move to configuration
94 struct parameters global_parameters = 
95 {
96     "",
97     "",
98     "",
99     "",
100     0,
101     0,
102     30,
103     "81",
104     "Index Data PazPar2",
105     VERSION,
106     600, // 10 minutes
107     60,
108     100,
109     MAX_CHUNK,
110     0,
111     0
112 };
113
114 static int send_apdu(struct client *c, Z_APDU *a)
115 {
116     struct connection *co = c->connection;
117     char *buf;
118     int len, r;
119
120     if (!z_APDU(global_parameters.odr_out, &a, 0, 0))
121     {
122         odr_perror(global_parameters.odr_out, "Encoding APDU");
123         abort();
124     }
125     buf = odr_getbuf(global_parameters.odr_out, &len, 0);
126     r = cs_put(co->link, buf, len);
127     if (r < 0)
128     {
129         yaz_log(YLOG_WARN, "cs_put: %s", cs_errmsg(cs_errno(co->link)));
130         return -1;
131     }
132     else if (r == 1)
133     {
134         fprintf(stderr, "cs_put incomplete (ParaZ does not handle that)\n");
135         exit(1);
136     }
137     odr_reset(global_parameters.odr_out); /* release the APDU structure  */
138     co->state = Conn_Waiting;
139     return 0;
140 }
141
142 // Set authentication token in init if one is set for the client
143 // TODO: Extend this to handle other schemes than open (should be simple)
144 static void init_authentication(struct client *cl, Z_InitRequest *req)
145 {
146     struct session_database *sdb = cl->database;
147     char *auth = session_setting_oneval(sdb, PZ_AUTHENTICATION);
148
149     if (auth)
150     {
151         Z_IdAuthentication *idAuth = odr_malloc(global_parameters.odr_out,
152                 sizeof(*idAuth));
153         idAuth->which = Z_IdAuthentication_open;
154         idAuth->u.open = auth;
155         req->idAuthentication = idAuth;
156     }
157 }
158
159 static void send_init(IOCHAN i)
160 {
161
162     struct connection *co = iochan_getdata(i);
163     struct client *cl = co->client;
164     Z_APDU *a = zget_APDU(global_parameters.odr_out, Z_APDU_initRequest);
165
166     a->u.initRequest->implementationId = global_parameters.implementationId;
167     a->u.initRequest->implementationName = global_parameters.implementationName;
168     a->u.initRequest->implementationVersion =
169         global_parameters.implementationVersion;
170     ODR_MASK_SET(a->u.initRequest->options, Z_Options_search);
171     ODR_MASK_SET(a->u.initRequest->options, Z_Options_present);
172     ODR_MASK_SET(a->u.initRequest->options, Z_Options_namedResultSets);
173
174     ODR_MASK_SET(a->u.initRequest->protocolVersion, Z_ProtocolVersion_1);
175     ODR_MASK_SET(a->u.initRequest->protocolVersion, Z_ProtocolVersion_2);
176     ODR_MASK_SET(a->u.initRequest->protocolVersion, Z_ProtocolVersion_3);
177
178     init_authentication(cl, a->u.initRequest);
179
180     /* add virtual host if tunneling through Z39.50 proxy */
181     
182     if (0 < strlen(global_parameters.zproxy_override) 
183         && 0 < strlen(cl->database->database->url))
184     {
185 #if YAZ_VERSIONL >= 0x020163
186         yaz_oi_set_string_oid(&a->u.initRequest->otherInfo,
187                               global_parameters.odr_out,
188                               yaz_oid_userinfo_proxy,
189                               1, cl->database->database->url);
190 #else
191         yaz_oi_set_string_oidval(&a->u.initRequest->otherInfo,
192                                  global_parameters.odr_out, VAL_PROXY,
193                                  1, cl->database->database->url);
194 #endif
195     }
196
197     if (send_apdu(cl, a) >= 0)
198     {
199         iochan_setflags(i, EVENT_INPUT);
200         cl->state = Client_Initializing;
201     }
202     else
203         cl->state = Client_Error;
204     odr_reset(global_parameters.odr_out);
205 }
206
207 // Recursively traverse query structure to extract terms.
208 static void pull_terms(NMEM nmem, struct ccl_rpn_node *n, char **termlist, int *num)
209 {
210     char **words;
211     int numwords;
212     int i;
213
214     switch (n->kind)
215     {
216         case CCL_RPN_AND:
217         case CCL_RPN_OR:
218         case CCL_RPN_NOT:
219         case CCL_RPN_PROX:
220             pull_terms(nmem, n->u.p[0], termlist, num);
221             pull_terms(nmem, n->u.p[1], termlist, num);
222             break;
223         case CCL_RPN_TERM:
224             nmem_strsplit(nmem, " ", n->u.t.term, &words, &numwords);
225             for (i = 0; i < numwords; i++)
226                 termlist[(*num)++] = words[i];
227             break;
228         default: // NOOP
229             break;
230     }
231 }
232
233 // Extract terms from query into null-terminated termlist
234 static void extract_terms(NMEM nmem, struct ccl_rpn_node *query, char **termlist)
235 {
236     int num = 0;
237
238     pull_terms(nmem, query, termlist, &num);
239     termlist[num] = 0;
240 }
241
242 static void send_search(IOCHAN i)
243 {
244     struct connection *co = iochan_getdata(i);
245     struct client *cl = co->client; 
246     struct session *se = cl->session;
247     struct session_database *sdb = cl->database;
248     Z_APDU *a = zget_APDU(global_parameters.odr_out, Z_APDU_searchRequest);
249     int ndb;
250     char **databaselist;
251     Z_Query *zquery;
252     int ssub = 0, lslb = 100000, mspn = 10;
253     char *recsyn = 0;
254     char *piggyback = 0;
255     char *queryenc = 0;
256     yaz_iconv_t iconv = 0;
257
258     yaz_log(YLOG_DEBUG, "Sending search to %s", cl->database->database->url);
259
260     // constructing RPN query
261     a->u.searchRequest->query = zquery = odr_malloc(global_parameters.odr_out,
262             sizeof(Z_Query));
263     zquery->which = Z_Query_type_1;
264     zquery->u.type_1 = p_query_rpn(global_parameters.odr_out, cl->pquery);
265
266     // converting to target encoding
267     if ((queryenc = session_setting_oneval(sdb, PZ_QUERYENCODING))){
268         iconv = yaz_iconv_open(queryenc, "UTF-8");
269         if (iconv){
270             yaz_query_charset_convert_rpnquery(zquery->u.type_1, 
271                                                global_parameters.odr_out, 
272                                                iconv);
273             yaz_iconv_close(iconv);
274         } else
275             yaz_log(YLOG_WARN, "Query encoding failed %s %s", 
276                     cl->database->database->url, queryenc);
277     }
278
279     for (ndb = 0; sdb->database->databases[ndb]; ndb++)
280         ;
281     databaselist = odr_malloc(global_parameters.odr_out, sizeof(char*) * ndb);
282     for (ndb = 0; sdb->database->databases[ndb]; ndb++)
283         databaselist[ndb] = sdb->database->databases[ndb];
284
285     if (!(piggyback = session_setting_oneval(sdb, PZ_PIGGYBACK)) || *piggyback == '1')
286     {
287         if ((recsyn = session_setting_oneval(sdb, PZ_REQUESTSYNTAX)))
288         {
289 #if YAZ_VERSIONL >= 0x020163
290             a->u.searchRequest->preferredRecordSyntax =
291                 yaz_string_to_oid_odr(yaz_oid_std(),
292                                       CLASS_RECSYN, recsyn,
293                                       global_parameters.odr_out);
294 #else
295             a->u.searchRequest->preferredRecordSyntax =
296                 yaz_str_to_z3950oid(global_parameters.odr_out,
297                                     CLASS_RECSYN, recsyn);
298 #endif
299         }
300         a->u.searchRequest->smallSetUpperBound = &ssub;
301         a->u.searchRequest->largeSetLowerBound = &lslb;
302         a->u.searchRequest->mediumSetPresentNumber = &mspn;
303     }
304     a->u.searchRequest->resultSetName = "Default";
305     a->u.searchRequest->databaseNames = databaselist;
306     a->u.searchRequest->num_databaseNames = ndb;
307
308     
309     {  //scope for sending and logging queries 
310         WRBUF wbquery = wrbuf_alloc();
311         yaz_query_to_wrbuf(wbquery, zquery);
312
313
314         if (send_apdu(cl, a) >= 0)
315             {
316                 iochan_setflags(i, EVENT_INPUT);
317                 cl->state = Client_Searching;
318                 cl->requestid = se->requestid;
319                 yaz_log(YLOG_LOG, "SearchRequest %s %s %s", 
320                          cl->database->database->url,
321                         queryenc ? queryenc : "UTF-8",
322                         wrbuf_cstr(wbquery));
323             }
324         else {
325             cl->state = Client_Error;
326                 yaz_log(YLOG_WARN, "Failed SearchRequest %s  %s %s", 
327                          cl->database->database->url, 
328                         queryenc ? queryenc : "UTF-8",
329                         wrbuf_cstr(wbquery));
330         }
331         
332         wrbuf_destroy(wbquery);
333     }    
334
335     odr_reset(global_parameters.odr_out);
336 }
337
338 static void send_present(IOCHAN i)
339 {
340     struct connection *co = iochan_getdata(i);
341     struct client *cl = co->client; 
342     struct session_database *sdb = cl->database;
343     Z_APDU *a = zget_APDU(global_parameters.odr_out, Z_APDU_presentRequest);
344     int toget;
345     int start = cl->records + 1;
346     char *recsyn;
347
348     toget = global_parameters.chunk;
349     if (toget > global_parameters.toget - cl->records)
350         toget = global_parameters.toget - cl->records;
351     if (toget > cl->hits - cl->records)
352         toget = cl->hits - cl->records;
353
354     yaz_log(YLOG_DEBUG, "Trying to present %d records\n", toget);
355
356     a->u.presentRequest->resultSetStartPoint = &start;
357     a->u.presentRequest->numberOfRecordsRequested = &toget;
358
359     a->u.presentRequest->resultSetId = "Default";
360
361     if ((recsyn = session_setting_oneval(sdb, PZ_REQUESTSYNTAX)))
362     {
363 #if YAZ_VERSIONL >= 0x020163
364         a->u.presentRequest->preferredRecordSyntax =
365             yaz_string_to_oid_odr(yaz_oid_std(),
366                                   CLASS_RECSYN, recsyn,
367                                   global_parameters.odr_out);
368 #else
369         a->u.presentRequest->preferredRecordSyntax =
370             yaz_str_to_z3950oid(global_parameters.odr_out,
371                                 CLASS_RECSYN, recsyn);
372 #endif
373     }
374
375     if (send_apdu(cl, a) >= 0)
376     {
377         iochan_setflags(i, EVENT_INPUT);
378         cl->state = Client_Presenting;
379     }
380     else
381         cl->state = Client_Error;
382     odr_reset(global_parameters.odr_out);
383 }
384
385 static void do_initResponse(IOCHAN i, Z_APDU *a)
386 {
387     struct connection *co = iochan_getdata(i);
388     struct client *cl = co->client;
389     Z_InitResponse *r = a->u.initResponse;
390
391     yaz_log(YLOG_DEBUG, "Init response %s", cl->database->database->url);
392
393     if (*r->result)
394     {
395         cl->state = Client_Idle;
396     }
397     else
398         cl->state = Client_Failed; // FIXME need to do something to the connection
399 }
400
401 static void do_searchResponse(IOCHAN i, Z_APDU *a)
402 {
403     struct connection *co = iochan_getdata(i);
404     struct client *cl = co->client;
405     struct session *se = cl->session;
406     Z_SearchResponse *r = a->u.searchResponse;
407
408     yaz_log(YLOG_DEBUG, "Search response %s (status=%d)", 
409             cl->database->database->url, *r->searchStatus);
410
411     if (*r->searchStatus)
412     {
413         cl->hits = *r->resultCount;
414         se->total_hits += cl->hits;
415         if (r->presentStatus && !*r->presentStatus && r->records)
416         {
417             yaz_log(YLOG_DEBUG, "Records in search response %s", 
418                     cl->database->database->url);
419             ingest_records(cl, r->records);
420         }
421         cl->state = Client_Idle;
422     }
423     else
424     {          /*"FAILED"*/
425         cl->hits = 0;
426         cl->state = Client_Error;
427         if (r->records) {
428             Z_Records *recs = r->records;
429             if (recs->which == Z_Records_NSD)
430             {
431                 yaz_log(YLOG_WARN, 
432                         "Search response: Non-surrogate diagnostic %s",
433                         cl->database->database->url);
434                 cl->diagnostic = *recs->u.nonSurrogateDiagnostic->condition;
435                 cl->state = Client_Error;
436             }
437         }
438     }
439 }
440
441 static void do_closeResponse(IOCHAN i, Z_APDU *a)
442 {
443     struct connection *co = iochan_getdata(i);
444     struct client *cl = co->client;
445     /* Z_Close *r = a->u.close; */
446
447     yaz_log(YLOG_WARN, "Close response %s", cl->database->database->url);
448
449     cl->state = Client_Failed;
450     connection_destroy(co);
451 }
452
453
454 char *normalize_mergekey(char *buf, int skiparticle)
455 {
456     char *p = buf, *pout = buf;
457
458     if (skiparticle)
459     {
460         char firstword[64];
461         char articles[] = "the den der die des an a "; // must end in space
462
463         while (*p && !isalnum(*p))
464             p++;
465         pout = firstword;
466         while (*p && *p != ' ' && pout - firstword < 62)
467             *(pout++) = tolower(*(p++));
468         *(pout++) = ' ';
469         *(pout++) = '\0';
470         if (!strstr(articles, firstword))
471             p = buf;
472         pout = buf;
473     }
474
475     while (*p)
476     {
477         while (*p && !isalnum(*p))
478             p++;
479         while (isalnum(*p))
480             *(pout++) = tolower(*(p++));
481         if (*p)
482             *(pout++) = ' ';
483         while (*p && !isalnum(*p))
484             p++;
485     }
486     if (buf != pout)
487         do {
488             *(pout--) = '\0';
489         }
490         while (pout > buf && *pout == ' ');
491
492     return buf;
493 }
494
495 static void add_facet(struct session *s, const char *type, const char *value)
496 {
497     int i;
498
499     if (!*value)
500         return;
501     for (i = 0; i < s->num_termlists; i++)
502         if (!strcmp(s->termlists[i].name, type))
503             break;
504     if (i == s->num_termlists)
505     {
506         if (i == SESSION_MAX_TERMLISTS)
507         {
508             yaz_log(YLOG_FATAL, "Too many termlists");
509             exit(1);
510         }
511         s->termlists[i].name = nmem_strdup(s->nmem, type);
512         s->termlists[i].termlist = termlist_create(s->nmem, s->expected_maxrecs, 15);
513         s->num_termlists = i + 1;
514     }
515     termlist_insert(s->termlists[i].termlist, value);
516 }
517
518 static xmlDoc *normalize_record(struct client *cl, Z_External *rec)
519 {
520     struct database_retrievalmap *m;
521     struct session_database *sdb = cl->database;
522     struct database *db = sdb->database;
523     xmlNode *res;
524     xmlDoc *rdoc;
525
526     // First normalize to XML
527     if (sdb->yaz_marc)
528     {
529         char *buf;
530         int len;
531         if (rec->which != Z_External_octet)
532         {
533             yaz_log(YLOG_WARN, "Unexpected external branch, probably BER %s",
534                     db->url);
535             return 0;
536         }
537         buf = (char*) rec->u.octet_aligned->buf;
538         len = rec->u.octet_aligned->len;
539         if (yaz_marc_read_iso2709(sdb->yaz_marc, buf, len) < 0)
540         {
541             yaz_log(YLOG_WARN, "Failed to decode MARC %s", db->url);
542             return 0;
543         }
544
545         yaz_marc_write_using_libxml2(sdb->yaz_marc, 1);
546         if (yaz_marc_write_xml(sdb->yaz_marc, &res,
547                     "http://www.loc.gov/MARC21/slim", 0, 0) < 0)
548         {
549             yaz_log(YLOG_WARN, "Failed to encode as XML %s",
550                     db->url);
551             return 0;
552         }
553         rdoc = xmlNewDoc((xmlChar *) "1.0");
554         xmlDocSetRootElement(rdoc, res);
555
556     }
557     else
558     {
559         yaz_log(YLOG_FATAL, 
560                 "Unknown native_syntax in normalize_record from %s",
561                 db->url);
562         exit(1);
563     }
564
565     if (global_parameters.dump_records){
566         fprintf(stderr, 
567                 "Input Record (normalized) from %s\n----------------\n",
568                 db->url);
569 #if LIBXML_VERSION >= 20600
570         xmlDocFormatDump(stderr, rdoc, 1);
571 #else
572         xmlDocDump(stderr, rdoc);
573 #endif
574     }
575
576     for (m = db->map; m; m = m->next){
577         xmlDoc *new = 0;
578
579 #if 1
580         {
581             xmlNodePtr root = 0;
582             new = xsltApplyStylesheet(m->stylesheet, rdoc, 0);
583             root= xmlDocGetRootElement(new);
584         if (!new || !root || !(root->children))
585         {
586             yaz_log(YLOG_WARN, "XSLT transformation failed from %s",
587                     cl->database->database->url);
588             xmlFreeDoc(new);
589             xmlFreeDoc(rdoc);
590             return 0;
591         }
592         }
593 #endif
594
595 #if 0
596         // do it another way to detect transformation errors right now
597         // but does not seem to work either!
598         {
599             xsltTransformContextPtr ctxt;
600             ctxt = xsltNewTransformContext(m->stylesheet, rdoc);
601             new = xsltApplyStylesheetUser(m->stylesheet, rdoc, 0, 0, 0, ctxt);
602             if ((ctxt->state == XSLT_STATE_ERROR) ||
603                 (ctxt->state == XSLT_STATE_STOPPED)){
604                 yaz_log(YLOG_WARN, "XSLT transformation failed from %s",
605                         cl->database->database->url);
606                 xmlFreeDoc(new);
607                 xmlFreeDoc(rdoc);
608                 return 0;
609             }
610         }
611 #endif      
612    
613         xmlFreeDoc(rdoc);
614         rdoc = new;
615     }
616     if (global_parameters.dump_records)
617     {
618         fprintf(stderr, "Record from %s\n----------------\n", 
619                 cl->database->database->url);
620 #if LIBXML_VERSION >= 20600
621         xmlDocFormatDump(stderr, rdoc, 1);
622 #else
623         xmlDocDump(stderr, rdoc);
624 #endif
625     }
626     return rdoc;
627 }
628
629 // Extract what appears to be years from buf, storing highest and
630 // lowest values.
631 static int extract_years(const char *buf, int *first, int *last)
632 {
633     *first = -1;
634     *last = -1;
635     while (*buf)
636     {
637         const char *e;
638         int len;
639
640         while (*buf && !isdigit(*buf))
641             buf++;
642         len = 0;
643         for (e = buf; *e && isdigit(*e); e++)
644             len++;
645         if (len == 4)
646         {
647             int value = atoi(buf);
648             if (*first < 0 || value < *first)
649                 *first = value;
650             if (*last < 0 || value > *last)
651                 *last = value;
652         }
653         buf = e;
654     }
655     return *first;
656 }
657
658 static struct record *ingest_record(struct client *cl, Z_External *rec)
659 {
660     xmlDoc *xdoc = normalize_record(cl, rec);
661     xmlNode *root, *n;
662     struct record *res;
663     struct record_cluster *cluster;
664     struct session *se = cl->session;
665     xmlChar *mergekey, *mergekey_norm;
666     xmlChar *type = 0;
667     xmlChar *value = 0;
668     struct conf_service *service = global_parameters.server->service;
669
670     if (!xdoc)
671         return 0;
672
673     root = xmlDocGetRootElement(xdoc);
674     if (!(mergekey = xmlGetProp(root, (xmlChar *) "mergekey")))
675     {
676         yaz_log(YLOG_WARN, "No mergekey found in record");
677         xmlFreeDoc(xdoc);
678         return 0;
679     }
680
681     res = nmem_malloc(se->nmem, sizeof(struct record));
682     res->next = 0;
683     res->client = cl;
684     res->metadata = nmem_malloc(se->nmem,
685             sizeof(struct record_metadata*) * service->num_metadata);
686     memset(res->metadata, 0, sizeof(struct record_metadata*) * service->num_metadata);
687
688     mergekey_norm = (xmlChar *) nmem_strdup(se->nmem, (char*) mergekey);
689     xmlFree(mergekey);
690     normalize_mergekey((char *) mergekey_norm, 0);
691
692     cluster = reclist_insert(se->reclist, 
693                              global_parameters.server->service, 
694                              res, (char *) mergekey_norm, 
695                              &se->total_merged);
696     if (global_parameters.dump_records)
697         yaz_log(YLOG_LOG, "Cluster id %d from %s (#%d)", cluster->recid,
698                 cl->database->database->url, cl->records);
699     if (!cluster)
700     {
701         /* no room for record */
702         xmlFreeDoc(xdoc);
703         return 0;
704     }
705     relevance_newrec(se->relevance, cluster);
706
707     for (n = root->children; n; n = n->next)
708     {
709         if (type)
710             xmlFree(type);
711         if (value)
712             xmlFree(value);
713         type = value = 0;
714
715         if (n->type != XML_ELEMENT_NODE)
716             continue;
717         if (!strcmp((const char *) n->name, "metadata"))
718         {
719             struct conf_metadata *md = 0;
720             struct conf_sortkey *sk = 0;
721             struct record_metadata **wheretoput, *newm;
722             int imeta;
723             int first, last;
724
725             type = xmlGetProp(n, (xmlChar *) "type");
726             value = xmlNodeListGetString(xdoc, n->children, 0);
727
728             if (!type || !value)
729                 continue;
730
731             // First, find out what field we're looking at
732             for (imeta = 0; imeta < service->num_metadata; imeta++)
733                 if (!strcmp((const char *) type, service->metadata[imeta].name))
734                 {
735                     md = &service->metadata[imeta];
736                     if (md->sortkey_offset >= 0)
737                         sk = &service->sortkeys[md->sortkey_offset];
738                     break;
739                 }
740             if (!md)
741             {
742                 yaz_log(YLOG_WARN, "Ignoring unknown metadata element: %s", type);
743                 continue;
744             }
745
746             // Find out where we are putting it
747             if (md->merge == Metadata_merge_no)
748                 wheretoput = &res->metadata[imeta];
749             else
750                 wheretoput = &cluster->metadata[imeta];
751             
752             // Put it there
753             newm = nmem_malloc(se->nmem, sizeof(struct record_metadata));
754             newm->next = 0;
755             if (md->type == Metadata_type_generic)
756             {
757                 char *p, *pe;
758                 for (p = (char *) value; *p && isspace(*p); p++)
759                     ;
760                 for (pe = p + strlen(p) - 1;
761                         pe > p && strchr(" ,/.:([", *pe); pe--)
762                     *pe = '\0';
763                 newm->data.text = nmem_strdup(se->nmem, p);
764
765             }
766             else if (md->type == Metadata_type_year)
767             {
768                 if (extract_years((char *) value, &first, &last) < 0)
769                     continue;
770             }
771             else
772             {
773                 yaz_log(YLOG_WARN, "Unknown type in metadata element %s", type);
774                 continue;
775             }
776             if (md->type == Metadata_type_year && md->merge != Metadata_merge_range)
777             {
778                 yaz_log(YLOG_WARN, "Only range merging supported for years");
779                 continue;
780             }
781             if (md->merge == Metadata_merge_unique)
782             {
783                 struct record_metadata *mnode;
784                 for (mnode = *wheretoput; mnode; mnode = mnode->next)
785                     if (!strcmp((const char *) mnode->data.text, newm->data.text))
786                         break;
787                 if (!mnode)
788                 {
789                     newm->next = *wheretoput;
790                     *wheretoput = newm;
791                 }
792             }
793             else if (md->merge == Metadata_merge_longest)
794             {
795                 if (!*wheretoput ||
796                         strlen(newm->data.text) > strlen((*wheretoput)->data.text))
797                 {
798                     *wheretoput = newm;
799                     if (sk)
800                     {
801                         char *s = nmem_strdup(se->nmem, newm->data.text);
802                         if (!cluster->sortkeys[md->sortkey_offset])
803                             cluster->sortkeys[md->sortkey_offset] = 
804                                 nmem_malloc(se->nmem, sizeof(union data_types));
805                         normalize_mergekey(s,
806                                 (sk->type == Metadata_sortkey_skiparticle));
807                         cluster->sortkeys[md->sortkey_offset]->text = s;
808                     }
809                 }
810             }
811             else if (md->merge == Metadata_merge_all || md->merge == Metadata_merge_no)
812             {
813                 newm->next = *wheretoput;
814                 *wheretoput = newm;
815             }
816             else if (md->merge == Metadata_merge_range)
817             {
818                 assert(md->type == Metadata_type_year);
819                 if (!*wheretoput)
820                 {
821                     *wheretoput = newm;
822                     (*wheretoput)->data.number.min = first;
823                     (*wheretoput)->data.number.max = last;
824                     if (sk)
825                         cluster->sortkeys[md->sortkey_offset] = &newm->data;
826                 }
827                 else
828                 {
829                     if (first < (*wheretoput)->data.number.min)
830                         (*wheretoput)->data.number.min = first;
831                     if (last > (*wheretoput)->data.number.max)
832                         (*wheretoput)->data.number.max = last;
833                 }
834 #ifdef GAGA
835                 if (sk)
836                 {
837                     union data_types *sdata = cluster->sortkeys[md->sortkey_offset];
838                     yaz_log(YLOG_LOG, "SK range: %d-%d", sdata->number.min, sdata->number.max);
839                 }
840 #endif
841             }
842             else
843                 yaz_log(YLOG_WARN, "Don't know how to merge on element name %s", md->name);
844
845             if (md->rank)
846                 relevance_countwords(se->relevance, cluster, 
847                                      (char *) value, md->rank);
848             if (md->termlist)
849             {
850                 if (md->type == Metadata_type_year)
851                 {
852                     char year[64];
853                     sprintf(year, "%d", last);
854                     add_facet(se, (char *) type, year);
855                     if (first != last)
856                     {
857                         sprintf(year, "%d", first);
858                         add_facet(se, (char *) type, year);
859                     }
860                 }
861                 else
862                     add_facet(se, (char *) type, (char *) value);
863             }
864             xmlFree(type);
865             xmlFree(value);
866             type = value = 0;
867         }
868         else
869             yaz_log(YLOG_WARN, "Unexpected element %s in internal record", n->name);
870     }
871     if (type)
872         xmlFree(type);
873     if (value)
874         xmlFree(value);
875
876     xmlFreeDoc(xdoc);
877
878     relevance_donerecord(se->relevance, cluster);
879     se->total_records++;
880
881     return res;
882 }
883
884 // Retrieve first defined value for 'name' for given database.
885 // Will be extended to take into account user associated with session
886 char *session_setting_oneval(struct session_database *db, int offset)
887 {
888     if (!db->settings[offset])
889         return "";
890     return db->settings[offset]->value;
891 }
892
893 static void ingest_records(struct client *cl, Z_Records *r)
894 {
895 #if USE_TIMING
896     yaz_timing_t t = yaz_timing_create();
897 #endif
898     struct record *rec;
899     struct session *s = cl->session;
900     Z_NamePlusRecordList *rlist;
901     int i;
902
903     if (r->which != Z_Records_DBOSD)
904         return;
905     rlist = r->u.databaseOrSurDiagnostics;
906     for (i = 0; i < rlist->num_records; i++)
907     {
908         Z_NamePlusRecord *npr = rlist->records[i];
909
910         cl->records++;
911         if (npr->which != Z_NamePlusRecord_databaseRecord)
912         {
913             yaz_log(YLOG_WARN, 
914                     "Unexpected record type, probably diagnostic %s",
915                     cl->database->database->url);
916             continue;
917         }
918
919         rec = ingest_record(cl, npr->u.databaseRecord);
920         if (!rec)
921             continue;
922     }
923     if (s->watchlist[SESSION_WATCH_RECORDS].fun && rlist->num_records)
924         session_alert_watch(s, SESSION_WATCH_RECORDS);
925
926 #if USE_TIMING
927     yaz_timing_stop(t);
928     yaz_log(YLOG_LOG, "ingest_records %6.5f %3.2f %3.2f", 
929             yaz_timing_get_real(t), yaz_timing_get_user(t),
930             yaz_timing_get_sys(t));
931     yaz_timing_destroy(&t);
932 #endif
933 }
934
935 static void do_presentResponse(IOCHAN i, Z_APDU *a)
936 {
937     struct connection *co = iochan_getdata(i);
938     struct client *cl = co->client;
939     Z_PresentResponse *r = a->u.presentResponse;
940
941     if (r->records) {
942         Z_Records *recs = r->records;
943         if (recs->which == Z_Records_NSD)
944         {
945             yaz_log(YLOG_WARN, "Non-surrogate diagnostic %s",
946                     cl->database->database->url);
947             cl->diagnostic = *recs->u.nonSurrogateDiagnostic->condition;
948             cl->state = Client_Error;
949         }
950     }
951
952     if (!*r->presentStatus && cl->state != Client_Error)
953     {
954         yaz_log(YLOG_DEBUG, "Good Present response %s",
955                 cl->database->database->url);
956         ingest_records(cl, r->records);
957         cl->state = Client_Idle;
958     }
959     else if (*r->presentStatus) 
960     {
961         yaz_log(YLOG_WARN, "Bad Present response %s",
962                 cl->database->database->url);
963         cl->state = Client_Error;
964     }
965 }
966
967 static void handler(IOCHAN i, int event)
968 {
969     struct connection *co = iochan_getdata(i);
970     struct client *cl = co->client;
971     struct session *se = 0;
972
973     if (cl)
974         se = cl->session;
975     else
976     {
977         yaz_log(YLOG_WARN, "Destroying orphan connection");
978         connection_destroy(co);
979         return;
980     }
981
982     if (co->state == Conn_Connecting && event & EVENT_OUTPUT)
983     {
984         int errcode;
985         socklen_t errlen = sizeof(errcode);
986
987         if (getsockopt(cs_fileno(co->link), SOL_SOCKET, SO_ERROR, &errcode,
988             &errlen) < 0 || errcode != 0)
989         {
990             client_fatal(cl);
991             return;
992         }
993         else
994         {
995             yaz_log(YLOG_DEBUG, "Connect OK");
996             co->state = Conn_Open;
997             if (cl)
998                 cl->state = Client_Connected;
999         }
1000     }
1001
1002     else if (event & EVENT_INPUT)
1003     {
1004         int len = cs_get(co->link, &co->ibuf, &co->ibufsize);
1005
1006         if (len < 0)
1007         {
1008             yaz_log(YLOG_WARN|YLOG_ERRNO, "Error reading from %s", 
1009                     cl->database->database->url);
1010             connection_destroy(co);
1011             return;
1012         }
1013         else if (len == 0)
1014         {
1015             yaz_log(YLOG_WARN, "EOF reading from %s", cl->database->database->url);
1016             connection_destroy(co);
1017             return;
1018         }
1019         else if (len > 1) // We discard input if we have no connection
1020         {
1021             co->state = Conn_Open;
1022
1023             if (cl && (cl->requestid == se->requestid || cl->state == Client_Initializing))
1024             {
1025                 Z_APDU *a;
1026
1027                 odr_reset(global_parameters.odr_in);
1028                 odr_setbuf(global_parameters.odr_in, co->ibuf, len, 0);
1029                 if (!z_APDU(global_parameters.odr_in, &a, 0, 0))
1030                 {
1031                     client_fatal(cl);
1032                     return;
1033                 }
1034                 switch (a->which)
1035                 {
1036                     case Z_APDU_initResponse:
1037                         do_initResponse(i, a);
1038                         break;
1039                     case Z_APDU_searchResponse:
1040                         do_searchResponse(i, a);
1041                         break;
1042                     case Z_APDU_presentResponse:
1043                         do_presentResponse(i, a);
1044                         break;
1045                     case Z_APDU_close:
1046                         do_closeResponse(i, a);
1047                         break;
1048                     default:
1049                         yaz_log(YLOG_WARN, 
1050                                 "Unexpected Z39.50 response from %s",  
1051                                 cl->database->database->url);
1052                         client_fatal(cl);
1053                         return;
1054                 }
1055                 // We aren't expecting staggered output from target
1056                 // if (cs_more(t->link))
1057                 //    iochan_setevent(i, EVENT_INPUT);
1058             }
1059             else  // we throw away response and go to idle mode
1060             {
1061                 yaz_log(YLOG_DEBUG, "Ignoring result of expired operation");
1062                 cl->state = Client_Idle;
1063             }
1064         }
1065         /* if len==1 we do nothing but wait for more input */
1066     }
1067
1068     if (cl->state == Client_Connected) {
1069         send_init(i);
1070     }
1071
1072     if (cl->state == Client_Idle)
1073     {
1074         if (cl->requestid != se->requestid && cl->pquery) {
1075             send_search(i);
1076         }
1077         else if (cl->hits > 0 && cl->records < global_parameters.toget &&
1078             cl->records < cl->hits) {
1079             send_present(i);
1080         }
1081     }
1082 }
1083
1084 // Disassociate connection from client
1085 static void connection_release(struct connection *co)
1086 {
1087     struct client *cl = co->client;
1088
1089     yaz_log(YLOG_DEBUG, "Connection release %s", co->host->hostport);
1090     if (!cl)
1091         return;
1092     cl->connection = 0;
1093     co->client = 0;
1094 }
1095
1096 // Close connection and recycle structure
1097 static void connection_destroy(struct connection *co)
1098 {
1099     struct host *h = co->host;
1100     cs_close(co->link);
1101     iochan_destroy(co->iochan);
1102
1103     yaz_log(YLOG_DEBUG, "Connection destroy %s", co->host->hostport);
1104     if (h->connections == co)
1105         h->connections = co->next;
1106     else
1107     {
1108         struct connection *pco;
1109         for (pco = h->connections; pco && pco->next != co; pco = pco->next)
1110             ;
1111         if (pco)
1112             pco->next = co->next;
1113         else
1114             abort();
1115     }
1116     if (co->client)
1117     {
1118         if (co->client->state != Client_Idle)
1119             co->client->state = Client_Disconnected;
1120         co->client->connection = 0;
1121     }
1122     co->next = connection_freelist;
1123     connection_freelist = co;
1124 }
1125
1126 // Creates a new connection for client, associated with the host of 
1127 // client's database
1128 static struct connection *connection_create(struct client *cl)
1129 {
1130     struct connection *new;
1131     COMSTACK link; 
1132     int res;
1133     void *addr;
1134
1135
1136     if (!(link = cs_create(tcpip_type, 0, PROTO_Z3950)))
1137         {
1138             yaz_log(YLOG_FATAL|YLOG_ERRNO, "Failed to create comstack");
1139             exit(1);
1140         }
1141     
1142     if (0 == strlen(global_parameters.zproxy_override)){
1143         /* no Z39.50 proxy needed - direct connect */
1144         yaz_log(YLOG_DEBUG, "Connection create %s", cl->database->database->url);
1145         
1146         if (!(addr = cs_straddr(link, cl->database->database->host->ipport)))
1147             {
1148                 yaz_log(YLOG_WARN|YLOG_ERRNO, 
1149                         "Lookup of IP address %s failed", 
1150                         cl->database->database->host->ipport);
1151                 return 0;
1152             }
1153     
1154     } else {
1155         /* Z39.50 proxy connect */
1156         yaz_log(YLOG_DEBUG, "Connection create %s proxy %s", 
1157                 cl->database->database->url, global_parameters.zproxy_override);
1158
1159         if (!(addr = cs_straddr(link, global_parameters.zproxy_override)))
1160             {
1161                 yaz_log(YLOG_WARN|YLOG_ERRNO, 
1162                         "Lookup of IP address %s failed", 
1163                         global_parameters.zproxy_override);
1164                 return 0;
1165             }
1166     }
1167
1168     res = cs_connect(link, addr);
1169     if (res < 0)
1170     {
1171         yaz_log(YLOG_WARN|YLOG_ERRNO, "cs_connect %s", cl->database->database->url);
1172         return 0;
1173     }
1174
1175     if ((new = connection_freelist))
1176         connection_freelist = new->next;
1177     else
1178     {
1179         new = xmalloc(sizeof (struct connection));
1180         new->ibuf = 0;
1181         new->ibufsize = 0;
1182     }
1183     new->state = Conn_Connecting;
1184     new->host = cl->database->database->host;
1185     new->next = new->host->connections;
1186     new->host->connections = new;
1187     new->client = cl;
1188     cl->connection = new;
1189     new->link = link;
1190
1191     new->iochan = iochan_create(cs_fileno(link), handler, 0);
1192     iochan_setdata(new->iochan, new);
1193     pazpar2_add_channel(new->iochan);
1194     return new;
1195 }
1196
1197 // Close connection and set state to error
1198 static void client_fatal(struct client *cl)
1199 {
1200     yaz_log(YLOG_WARN, "Fatal error from %s", cl->database->database->url);
1201     connection_destroy(cl->connection);
1202     cl->state = Client_Error;
1203 }
1204
1205 // Ensure that client has a connection associated
1206 static int client_prep_connection(struct client *cl)
1207 {
1208     struct connection *co;
1209     struct session *se = cl->session;
1210     struct host *host = cl->database->database->host;
1211
1212     co = cl->connection;
1213
1214     yaz_log(YLOG_DEBUG, "Client prep %s", cl->database->database->url);
1215
1216     if (!co)
1217     {
1218         // See if someone else has an idle connection
1219         // We should look at timestamps here to select the longest-idle connection
1220         for (co = host->connections; co; co = co->next)
1221             if (co->state == Conn_Open && (!co->client || co->client->session != se))
1222                 break;
1223         if (co)
1224         {
1225             connection_release(co);
1226             cl->connection = co;
1227             co->client = cl;
1228         }
1229         else
1230             co = connection_create(cl);
1231     }
1232     if (co)
1233     {
1234         if (co->state == Conn_Connecting)
1235         {
1236             cl->state = Client_Connecting;
1237             iochan_setflag(co->iochan, EVENT_OUTPUT);
1238         }
1239         else if (co->state == Conn_Open)
1240         {
1241             if (cl->state == Client_Error || cl->state == Client_Disconnected)
1242                 cl->state = Client_Idle;
1243             iochan_setflag(co->iochan, EVENT_OUTPUT);
1244         }
1245         return 1;
1246     }
1247     else
1248         return 0;
1249 }
1250
1251 // Initialize YAZ Map structures for MARC-based targets
1252 static int prepare_yazmarc(struct session_database *sdb)
1253 {
1254     struct setting *s;
1255
1256     if (!sdb->settings)
1257     {
1258         yaz_log(YLOG_WARN, "No settings for %s", sdb->database->url);
1259         return -1;
1260     }
1261     if ((s = sdb->settings[PZ_NATIVESYNTAX]) && !strncmp(s->value, "iso2709", 7))
1262     {
1263         char *encoding = "marc-8s", *e;
1264         yaz_iconv_t cm;
1265
1266         // See if a native encoding is specified
1267         if ((e = strchr(s->value, ';')))
1268             encoding = e + 1;
1269
1270         sdb->yaz_marc = yaz_marc_create();
1271         yaz_marc_subfield_str(sdb->yaz_marc, "\t");
1272         
1273         cm = yaz_iconv_open("utf-8", encoding);
1274         if (!cm)
1275         {
1276             yaz_log(YLOG_FATAL, 
1277                     "Unable to map from %s to UTF-8 for target %s", 
1278                     encoding, sdb->database->url);
1279             return -1;
1280         }
1281         yaz_marc_iconv(sdb->yaz_marc, cm);
1282     }
1283     return 0;
1284 }
1285
1286 // This analyzes settings and recomputes any supporting data structures
1287 // if necessary.
1288 static int prepare_session_database(struct session_database *sdb)
1289 {
1290     if (!sdb->settings)
1291     {
1292         yaz_log(YLOG_WARN, "No settings associates with %s", sdb->database->url);
1293         return -1;
1294     }
1295     if (sdb->settings[PZ_NATIVESYNTAX] && !sdb->yaz_marc)
1296     {
1297         if (prepare_yazmarc(sdb) < 0)
1298             return -1;
1299     }
1300     if (sdb->settings[PZ_XSLT] && !sdb->map)
1301     {
1302     }
1303     return 0;
1304 }
1305
1306 // Initialize CCL map for a target
1307 static CCL_bibset prepare_cclmap(struct client *cl)
1308 {
1309     struct session_database *sdb = cl->database;
1310     struct setting *s;
1311     CCL_bibset res;
1312
1313     if (!sdb->settings)
1314         return 0;
1315     res = ccl_qual_mk();
1316     for (s = sdb->settings[PZ_CCLMAP]; s; s = s->next)
1317     {
1318         char *p = strchr(s->name + 3, ':');
1319         if (!p)
1320         {
1321             yaz_log(YLOG_WARN, "Malformed cclmap name: %s", s->name);
1322             ccl_qual_rm(&res);
1323             return 0;
1324         }
1325         p++;
1326         ccl_qual_fitem(res, s->value, p);
1327     }
1328     return res;
1329 }
1330
1331 // Parse the query given the settings specific to this client
1332 static int client_parse_query(struct client *cl, const char *query)
1333 {
1334     struct session *se = cl->session;
1335     struct ccl_rpn_node *cn;
1336     int cerror, cpos;
1337     CCL_bibset ccl_map = prepare_cclmap(cl);
1338
1339     if (!ccl_map)
1340         return -1;
1341     cn = ccl_find_str(ccl_map, query, &cerror, &cpos);
1342     ccl_qual_rm(&ccl_map);
1343     if (!cn)
1344     {
1345         cl->state = Client_Error;
1346         yaz_log(YLOG_WARN, "Failed to parse query for %s",
1347                          cl->database->database->url);
1348         return -1;
1349     }
1350     wrbuf_rewind(se->wrbuf);
1351     ccl_pquery(se->wrbuf, cn);
1352     wrbuf_putc(se->wrbuf, '\0');
1353     if (cl->pquery)
1354         xfree(cl->pquery);
1355     cl->pquery = xstrdup(wrbuf_buf(se->wrbuf));
1356
1357     if (!se->relevance)
1358     {
1359         // Initialize relevance structure with query terms
1360         char *p[512];
1361         extract_terms(se->nmem, cn, p);
1362         se->relevance = relevance_create(se->nmem, (const char **) p,
1363                 se->expected_maxrecs);
1364     }
1365
1366     ccl_rpn_delete(cn);
1367     return 0;
1368 }
1369
1370 static struct client *client_create(void)
1371 {
1372     struct client *r;
1373     if (client_freelist)
1374     {
1375         r = client_freelist;
1376         client_freelist = client_freelist->next;
1377     }
1378     else
1379         r = xmalloc(sizeof(struct client));
1380     r->pquery = 0;
1381     r->database = 0;
1382     r->connection = 0;
1383     r->session = 0;
1384     r->hits = 0;
1385     r->records = 0;
1386     r->setno = 0;
1387     r->requestid = -1;
1388     r->diagnostic = 0;
1389     r->state = Client_Disconnected;
1390     r->next = 0;
1391     return r;
1392 }
1393
1394 void client_destroy(struct client *c)
1395 {
1396     struct session *se = c->session;
1397     if (c == se->clients)
1398         se->clients = c->next;
1399     else
1400     {
1401         struct client *cc;
1402         for (cc = se->clients; cc && cc->next != c; cc = cc->next)
1403             ;
1404         if (cc)
1405             cc->next = c->next;
1406     }
1407     if (c->connection)
1408         connection_release(c->connection);
1409     c->next = client_freelist;
1410     client_freelist = c;
1411 }
1412
1413 void session_set_watch(struct session *s, int what, session_watchfun fun, void *data)
1414 {
1415     s->watchlist[what].fun = fun;
1416     s->watchlist[what].data = data;
1417 }
1418
1419 void session_alert_watch(struct session *s, int what)
1420 {
1421     if (!s->watchlist[what].fun)
1422         return;
1423     (*s->watchlist[what].fun)(s->watchlist[what].data);
1424     s->watchlist[what].fun = 0;
1425     s->watchlist[what].data = 0;
1426 }
1427
1428 //callback for grep_databases
1429 static void select_targets_callback(void *context, struct session_database *db)
1430 {
1431     struct session *se = (struct session*) context;
1432     struct client *cl = client_create();
1433     cl->database = db;
1434     cl->session = se;
1435     cl->next = se->clients;
1436     se->clients = cl;
1437 }
1438
1439 // Associates a set of clients with a session;
1440 // Note: Session-databases represent databases with per-session setting overrides
1441 int select_targets(struct session *se, struct database_criterion *crit)
1442 {
1443     while (se->clients)
1444         client_destroy(se->clients);
1445
1446     return session_grep_databases(se, crit, select_targets_callback);
1447 }
1448
1449 int session_active_clients(struct session *s)
1450 {
1451     struct client *c;
1452     int res = 0;
1453
1454     for (c = s->clients; c; c = c->next)
1455         if (c->connection && (c->state == Client_Connecting ||
1456                     c->state == Client_Initializing ||
1457                     c->state == Client_Searching ||
1458                     c->state == Client_Presenting))
1459             res++;
1460
1461     return res;
1462 }
1463
1464 // parses crit1=val1,crit2=val2|val3,...
1465 static struct database_criterion *parse_filter(NMEM m, const char *buf)
1466 {
1467     struct database_criterion *res = 0;
1468     char **values;
1469     int num;
1470     int i;
1471
1472     if (!buf || !*buf)
1473         return 0;
1474     nmem_strsplit(m, ",", buf,  &values, &num);
1475     for (i = 0; i < num; i++)
1476     {
1477         char **subvalues;
1478         int subnum;
1479         int subi;
1480         struct database_criterion *new = nmem_malloc(m, sizeof(*new));
1481         char *eq = strchr(values[i], '=');
1482         if (!eq)
1483         {
1484             yaz_log(YLOG_WARN, "Missing equal-sign in filter");
1485             return 0;
1486         }
1487         *(eq++) = '\0';
1488         new->name = values[i];
1489         nmem_strsplit(m, "|", eq, &subvalues, &subnum);
1490         new->values = 0;
1491         for (subi = 0; subi < subnum; subi++)
1492         {
1493             struct database_criterion_value *newv = nmem_malloc(m, sizeof(*newv));
1494             newv->value = subvalues[subi];
1495             newv->next = new->values;
1496             new->values = newv;
1497         }
1498         new->next = res;
1499         res = new;
1500     }
1501     return res;
1502 }
1503
1504 char *search(struct session *se, char *query, char *filter)
1505 {
1506     int live_channels = 0;
1507     struct client *cl;
1508     struct database_criterion *criteria;
1509
1510     yaz_log(YLOG_DEBUG, "Search");
1511
1512     nmem_reset(se->nmem);
1513     criteria = parse_filter(se->nmem, filter);
1514     se->requestid++;
1515     live_channels = select_targets(se, criteria);
1516     if (live_channels)
1517     {
1518         int maxrecs = live_channels * global_parameters.toget;
1519         se->num_termlists = 0;
1520         se->reclist = reclist_create(se->nmem, maxrecs);
1521         // This will be initialized in send_search()
1522         se->total_records = se->total_hits = se->total_merged = 0;
1523         se->expected_maxrecs = maxrecs;
1524     }
1525     else
1526         return "NOTARGETS";
1527
1528     se->relevance = 0;
1529
1530     for (cl = se->clients; cl; cl = cl->next)
1531     {
1532         if (prepare_session_database(cl->database) < 0)
1533             return "CONFIG_ERROR";
1534         if (client_parse_query(cl, query) < 0)  // Query must parse for all targets
1535             return "QUERY";
1536     }
1537
1538     for (cl = se->clients; cl; cl = cl->next)
1539     {
1540         client_prep_connection(cl);
1541     }
1542
1543     return 0;
1544 }
1545
1546 // Apply a session override to a database
1547 void session_apply_setting(struct session *se, char *dbname, char *setting, char *value)
1548 {
1549     struct session_database *sdb;
1550
1551     for (sdb = se->databases; sdb; sdb = sdb->next)
1552         if (!strcmp(dbname, sdb->database->url))
1553         {
1554             struct setting *new = nmem_malloc(se->session_nmem, sizeof(*new));
1555             int offset = settings_offset(setting);
1556
1557             if (offset < 0)
1558             {
1559                 yaz_log(YLOG_WARN, "Unknown setting %s", setting);
1560                 return;
1561             }
1562             new->precedence = 0;
1563             new->target = dbname;
1564             new->name = setting;
1565             new->value = value;
1566             new->next = sdb->settings[offset];
1567             sdb->settings[offset] = new;
1568
1569             // Force later recompute of settings-driven data structures
1570             // (happens when a search starts and client connections are prepared)
1571             switch (offset)
1572             {
1573                 case PZ_NATIVESYNTAX:
1574                     if (sdb->yaz_marc)
1575                     {
1576                         yaz_marc_destroy(sdb->yaz_marc);
1577                         sdb->yaz_marc = 0;
1578                     }
1579                     break;
1580                 case PZ_XSLT:
1581                     if (sdb->map)
1582                     {
1583                         struct database_retrievalmap *m;
1584                         // We don't worry about the map structure -- it's in nmem
1585                         for (m = sdb->map; m; m = m->next)
1586                             xsltFreeStylesheet(m->stylesheet);
1587                         sdb->map = 0;
1588                     }
1589                     break;
1590             }
1591             break;
1592         }
1593     if (!sdb)
1594         yaz_log(YLOG_WARN, "Unknown database in setting override: %s", dbname);
1595 }
1596
1597 void session_init_databases_fun(void *context, struct database *db)
1598 {
1599     struct session *se = (struct session *) context;
1600     struct session_database *new = nmem_malloc(se->session_nmem, sizeof(*new));
1601     int num = settings_num();
1602     int i;
1603
1604     new->database = db;
1605     new->yaz_marc = 0;
1606     new->map = 0;
1607     new->settings = nmem_malloc(se->session_nmem, sizeof(struct settings *) * num);
1608     for (i = 0; i < num; i++)
1609         new->settings[i] = db->settings[i];
1610     new->next = se->databases;
1611     se->databases = new;
1612 }
1613
1614 // Initialize session_database list -- this represents this session's view
1615 // of the database list -- subject to modification by the settings ws command
1616 void session_init_databases(struct session *se)
1617 {
1618     se->databases = 0;
1619     grep_databases(se, 0, session_init_databases_fun);
1620 }
1621
1622 void destroy_session(struct session *s)
1623 {
1624     yaz_log(YLOG_LOG, "Destroying session");
1625     while (s->clients)
1626         client_destroy(s->clients);
1627     nmem_destroy(s->nmem);
1628     wrbuf_destroy(s->wrbuf);
1629 }
1630
1631 struct session *new_session(NMEM nmem) 
1632 {
1633     int i;
1634     struct session *session = nmem_malloc(nmem, sizeof(*session));
1635
1636     yaz_log(YLOG_DEBUG, "New Pazpar2 session");
1637     
1638     session->total_hits = 0;
1639     session->total_records = 0;
1640     session->num_termlists = 0;
1641     session->reclist = 0;
1642     session->requestid = -1;
1643     session->clients = 0;
1644     session->expected_maxrecs = 0;
1645     session->session_nmem = nmem;
1646     session->nmem = nmem_create();
1647     session->wrbuf = wrbuf_alloc();
1648     session_init_databases(session);
1649     for (i = 0; i <= SESSION_WATCH_MAX; i++)
1650     {
1651         session->watchlist[i].data = 0;
1652         session->watchlist[i].fun = 0;
1653     }
1654
1655     return session;
1656 }
1657
1658 struct hitsbytarget *hitsbytarget(struct session *se, int *count)
1659 {
1660     static struct hitsbytarget res[1000]; // FIXME MM
1661     struct client *cl;
1662
1663     *count = 0;
1664     for (cl = se->clients; cl; cl = cl->next)
1665     {
1666         char *name = session_setting_oneval(cl->database, PZ_NAME);
1667
1668         res[*count].id = cl->database->database->url;
1669         res[*count].name = *name ? name : "Unknown";
1670         res[*count].hits = cl->hits;
1671         res[*count].records = cl->records;
1672         res[*count].diagnostic = cl->diagnostic;
1673         res[*count].state = client_states[cl->state];
1674         res[*count].connected  = cl->connection ? 1 : 0;
1675         (*count)++;
1676     }
1677
1678     return res;
1679 }
1680
1681 struct termlist_score **termlist(struct session *s, const char *name, int *num)
1682 {
1683     int i;
1684
1685     for (i = 0; i < s->num_termlists; i++)
1686         if (!strcmp((const char *) s->termlists[i].name, name))
1687             return termlist_highscore(s->termlists[i].termlist, num);
1688     return 0;
1689 }
1690
1691 #ifdef MISSING_HEADERS
1692 void report_nmem_stats(void)
1693 {
1694     size_t in_use, is_free;
1695
1696     nmem_get_memory_in_use(&in_use);
1697     nmem_get_memory_free(&is_free);
1698
1699     yaz_log(YLOG_LOG, "nmem stat: use=%ld free=%ld", 
1700             (long) in_use, (long) is_free);
1701 }
1702 #endif
1703
1704 struct record_cluster *show_single(struct session *s, int id)
1705 {
1706     struct record_cluster *r;
1707
1708     reclist_rewind(s->reclist);
1709     while ((r = reclist_read_record(s->reclist)))
1710         if (r->recid == id)
1711             return r;
1712     return 0;
1713 }
1714
1715 struct record_cluster **show(struct session *s, struct reclist_sortparms *sp, int start,
1716         int *num, int *total, int *sumhits, NMEM nmem_show)
1717 {
1718     struct record_cluster **recs = nmem_malloc(nmem_show, *num 
1719                                        * sizeof(struct record_cluster *));
1720     struct reclist_sortparms *spp;
1721     int i;
1722 #if USE_TIMING    
1723     yaz_timing_t t = yaz_timing_create();
1724 #endif
1725
1726     for (spp = sp; spp; spp = spp->next)
1727         if (spp->type == Metadata_sortkey_relevance)
1728         {
1729             relevance_prepare_read(s->relevance, s->reclist);
1730             break;
1731         }
1732     reclist_sort(s->reclist, sp);
1733
1734     *total = s->reclist->num_records;
1735     *sumhits = s->total_hits;
1736
1737     for (i = 0; i < start; i++)
1738         if (!reclist_read_record(s->reclist))
1739         {
1740             *num = 0;
1741             recs = 0;
1742             break;
1743         }
1744
1745     for (i = 0; i < *num; i++)
1746     {
1747         struct record_cluster *r = reclist_read_record(s->reclist);
1748         if (!r)
1749         {
1750             *num = i;
1751             break;
1752         }
1753         recs[i] = r;
1754     }
1755 #if USE_TIMING
1756     yaz_timing_stop(t);
1757     yaz_log(YLOG_LOG, "show %6.5f %3.2f %3.2f", 
1758             yaz_timing_get_real(t), yaz_timing_get_user(t),
1759             yaz_timing_get_sys(t));
1760     yaz_timing_destroy(&t);
1761 #endif
1762     return recs;
1763 }
1764
1765 void statistics(struct session *se, struct statistics *stat)
1766 {
1767     struct client *cl;
1768     int count = 0;
1769
1770     memset(stat, 0, sizeof(*stat));
1771     for (cl = se->clients; cl; cl = cl->next)
1772     {
1773         if (!cl->connection)
1774             stat->num_no_connection++;
1775         switch (cl->state)
1776         {
1777             case Client_Connecting: stat->num_connecting++; break;
1778             case Client_Initializing: stat->num_initializing++; break;
1779             case Client_Searching: stat->num_searching++; break;
1780             case Client_Presenting: stat->num_presenting++; break;
1781             case Client_Idle: stat->num_idle++; break;
1782             case Client_Failed: stat->num_failed++; break;
1783             case Client_Error: stat->num_error++; break;
1784             default: break;
1785         }
1786         count++;
1787     }
1788     stat->num_hits = se->total_hits;
1789     stat->num_records = se->total_records;
1790
1791     stat->num_clients = count;
1792 }
1793
1794 void start_http_listener(void)
1795 {
1796     char hp[128] = "";
1797     struct conf_server *ser = global_parameters.server;
1798
1799     if (*global_parameters.listener_override)
1800         strcpy(hp, global_parameters.listener_override);
1801     else
1802     {
1803         strcpy(hp, ser->host ? ser->host : "");
1804         if (ser->port)
1805         {
1806             if (*hp)
1807                 strcat(hp, ":");
1808             sprintf(hp + strlen(hp), "%d", ser->port);
1809         }
1810     }
1811     http_init(hp);
1812 }
1813
1814 void start_proxy(void)
1815 {
1816     char hp[128] = "";
1817     struct conf_server *ser = global_parameters.server;
1818
1819     if (*global_parameters.proxy_override)
1820         strcpy(hp, global_parameters.proxy_override);
1821     else if (ser->proxy_host || ser->proxy_port)
1822     {
1823         strcpy(hp, ser->proxy_host ? ser->proxy_host : "");
1824         if (ser->proxy_port)
1825         {
1826             if (*hp)
1827                 strcat(hp, ":");
1828             sprintf(hp + strlen(hp), "%d", ser->proxy_port);
1829         }
1830     }
1831     else
1832         return;
1833
1834     http_set_proxyaddr(hp, ser->myurl ? ser->myurl : "");
1835 }
1836
1837 void start_zproxy(void)
1838 {
1839     struct conf_server *ser = global_parameters.server;
1840
1841     if (*global_parameters.zproxy_override){
1842         yaz_log(YLOG_LOG, "Z39.50 proxy  %s", 
1843                 global_parameters.zproxy_override);
1844         return;
1845     }
1846
1847     else if (ser->zproxy_host || ser->zproxy_port)
1848     {
1849         char hp[128] = "";
1850
1851         strcpy(hp, ser->zproxy_host ? ser->zproxy_host : "");
1852         if (ser->zproxy_port)
1853         {
1854             if (*hp)
1855                 strcat(hp, ":");
1856             else
1857                 strcat(hp, "@:");
1858
1859             sprintf(hp + strlen(hp), "%d", ser->zproxy_port);
1860         }
1861         strcpy(global_parameters.zproxy_override, hp);
1862         yaz_log(YLOG_LOG, "Z39.50 proxy  %s", 
1863                 global_parameters.zproxy_override);
1864
1865     }
1866     else
1867         return;
1868 }
1869
1870 // Master list of connections we're handling events to
1871 static IOCHAN channel_list = 0; 
1872 void pazpar2_add_channel(IOCHAN chan)
1873 {
1874     chan->next = channel_list;
1875     channel_list = chan;
1876 }
1877
1878 void pazpar2_event_loop()
1879 {
1880     event_loop(&channel_list);
1881 }
1882
1883 /*
1884  * Local variables:
1885  * c-basic-offset: 4
1886  * indent-tabs-mode: nil
1887  * End:
1888  * vim: shiftwidth=4 tabstop=8 expandtab
1889  */