Prepare for YAZ 3's new OID system
[pazpar2-moved-to-github.git] / src / pazpar2.c
1 /* $Id: pazpar2.c,v 1.77 2007-04-12 13:46:28 adam Exp $
2    Copyright (c) 2006-2007, Index Data.
3
4 This file is part of Pazpar2.
5
6 Pazpar2 is free software; you can redistribute it and/or modify it under
7 the terms of the GNU General Public License as published by the Free
8 Software Foundation; either version 2, or (at your option) any later
9 version.
10
11 Pazpar2 is distributed in the hope that it will be useful, but WITHOUT ANY
12 WARRANTY; without even the implied warranty of MERCHANTABILITY or
13 FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
14 for more details.
15
16 You should have received a copy of the GNU General Public License
17 along with Pazpar2; see the file LICENSE.  If not, write to the
18 Free Software Foundation, 59 Temple Place - Suite 330, Boston, MA
19 02111-1307, USA.
20  */
21
22 #include <stdlib.h>
23 #include <stdio.h>
24 #include <string.h>
25 #include <sys/time.h>
26 #include <unistd.h>
27 #include <sys/socket.h>
28 #include <netdb.h>
29 #include <signal.h>
30 #include <ctype.h>
31 #include <assert.h>
32
33 #include <yaz/marcdisp.h>
34 #include <yaz/comstack.h>
35 #include <yaz/tcpip.h>
36 #include <yaz/proto.h>
37 #include <yaz/readconf.h>
38 #include <yaz/pquery.h>
39 #include <yaz/otherinfo.h>
40 #include <yaz/yaz-util.h>
41 #include <yaz/nmem.h>
42 #include <yaz/querytowrbuf.h>
43 #if YAZ_VERSIONL >= 0x020163
44 #include <yaz/oid_db.h>
45 #endif
46
47 #if HAVE_CONFIG_H
48 #include "cconfig.h"
49 #endif
50
51 #define USE_TIMING 0
52 #if USE_TIMING
53 #include <yaz/timing.h>
54 #endif
55
56 #include <netinet/in.h>
57
58 #include "pazpar2.h"
59 #include "eventl.h"
60 #include "http.h"
61 #include "termlists.h"
62 #include "reclists.h"
63 #include "relevance.h"
64 #include "config.h"
65 #include "database.h"
66 #include "settings.h"
67
68 #define MAX_CHUNK 15
69
70 static void client_fatal(struct client *cl);
71 static void connection_destroy(struct connection *co);
72 static int client_prep_connection(struct client *cl);
73 static void ingest_records(struct client *cl, Z_Records *r);
74 void session_alert_watch(struct session *s, int what);
75
76 IOCHAN channel_list = 0;  // Master list of connections we're handling events to
77
78 static struct connection *connection_freelist = 0;
79 static struct client *client_freelist = 0;
80
/* Printable names of the client states.  Presumably indexed by the client
   state enumeration declared in a header -- keep the order in sync with it. */
static char *client_states[] = {
    "Client_Connecting",   "Client_Connected",
    "Client_Idle",         "Client_Initializing",
    "Client_Searching",    "Client_Presenting",
    "Client_Error",        "Client_Failed",
    "Client_Disconnected", "Client_Stopped"
};
93
94 // Note: Some things in this structure will eventually move to configuration
95 struct parameters global_parameters = 
96 {
97     "",
98     "",
99     "",
100     "",
101     0,
102     0,
103     30,
104     "81",
105     "Index Data PazPar2 (MasterKey)",
106     VERSION,
107     600, // 10 minutes
108     60,
109     100,
110     MAX_CHUNK,
111     0,
112     0
113 };
114
115 static int send_apdu(struct client *c, Z_APDU *a)
116 {
117     struct connection *co = c->connection;
118     char *buf;
119     int len, r;
120
121     if (!z_APDU(global_parameters.odr_out, &a, 0, 0))
122     {
123         odr_perror(global_parameters.odr_out, "Encoding APDU");
124         abort();
125     }
126     buf = odr_getbuf(global_parameters.odr_out, &len, 0);
127     r = cs_put(co->link, buf, len);
128     if (r < 0)
129     {
130         yaz_log(YLOG_WARN, "cs_put: %s", cs_errmsg(cs_errno(co->link)));
131         return -1;
132     }
133     else if (r == 1)
134     {
135         fprintf(stderr, "cs_put incomplete (ParaZ does not handle that)\n");
136         exit(1);
137     }
138     odr_reset(global_parameters.odr_out); /* release the APDU structure  */
139     co->state = Conn_Waiting;
140     return 0;
141 }
142
143 // Set authentication token in init if one is set for the client
144 // TODO: Extend this to handle other schemes than open (should be simple)
145 static void init_authentication(struct client *cl, Z_InitRequest *req)
146 {
147     struct session_database *sdb = cl->database;
148     char *auth = session_setting_oneval(sdb, PZ_AUTHENTICATION);
149
150     if (auth)
151     {
152         Z_IdAuthentication *idAuth = odr_malloc(global_parameters.odr_out,
153                 sizeof(*idAuth));
154         idAuth->which = Z_IdAuthentication_open;
155         idAuth->u.open = auth;
156         req->idAuthentication = idAuth;
157     }
158 }
159
160 static void send_init(IOCHAN i)
161 {
162
163     struct connection *co = iochan_getdata(i);
164     struct client *cl = co->client;
165     Z_APDU *a = zget_APDU(global_parameters.odr_out, Z_APDU_initRequest);
166
167     a->u.initRequest->implementationId = global_parameters.implementationId;
168     a->u.initRequest->implementationName = global_parameters.implementationName;
169     a->u.initRequest->implementationVersion =
170         global_parameters.implementationVersion;
171     ODR_MASK_SET(a->u.initRequest->options, Z_Options_search);
172     ODR_MASK_SET(a->u.initRequest->options, Z_Options_present);
173     ODR_MASK_SET(a->u.initRequest->options, Z_Options_namedResultSets);
174
175     ODR_MASK_SET(a->u.initRequest->protocolVersion, Z_ProtocolVersion_1);
176     ODR_MASK_SET(a->u.initRequest->protocolVersion, Z_ProtocolVersion_2);
177     ODR_MASK_SET(a->u.initRequest->protocolVersion, Z_ProtocolVersion_3);
178
179     init_authentication(cl, a->u.initRequest);
180
181     /* add virtual host if tunneling through Z39.50 proxy */
182     
183     if (0 < strlen(global_parameters.zproxy_override) 
184         && 0 < strlen(cl->database->database->url))
185     {
186 #if YAZ_VERSIONL >= 0x020163
187         const int *oid_proxy = yaz_string_to_oid(yaz_oid_std(),
188                                                  CLASS_USERINFO, OID_STR_PROXY);
189         yaz_oi_set_string_oid(&a->u.initRequest->otherInfo,
190                               global_parameters.odr_out, oid_proxy,
191                               1, cl->database->database->url);
192 #else
193         yaz_oi_set_string_oidval(&a->u.initRequest->otherInfo,
194                                  global_parameters.odr_out, VAL_PROXY,
195                                  1, cl->database->database->url);
196 #endif
197     }
198
199     if (send_apdu(cl, a) >= 0)
200     {
201         iochan_setflags(i, EVENT_INPUT);
202         cl->state = Client_Initializing;
203     }
204     else
205         cl->state = Client_Error;
206     odr_reset(global_parameters.odr_out);
207 }
208
209 static void pull_terms(NMEM nmem, struct ccl_rpn_node *n, char **termlist, int *num)
210 {
211     switch (n->kind)
212     {
213         case CCL_RPN_AND:
214         case CCL_RPN_OR:
215         case CCL_RPN_NOT:
216         case CCL_RPN_PROX:
217             pull_terms(nmem, n->u.p[0], termlist, num);
218             pull_terms(nmem, n->u.p[1], termlist, num);
219             break;
220         case CCL_RPN_TERM:
221             termlist[(*num)++] = nmem_strdup(nmem, n->u.t.term);
222             break;
223         default: // NOOP
224             break;
225     }
226 }
227
228 // Extract terms from query into null-terminated termlist
229 static void extract_terms(NMEM nmem, struct ccl_rpn_node *query, char **termlist)
230 {
231     int num = 0;
232
233     pull_terms(nmem, query, termlist, &num);
234     termlist[num] = 0;
235 }
236
237 static void send_search(IOCHAN i)
238 {
239     struct connection *co = iochan_getdata(i);
240     struct client *cl = co->client; 
241     struct session *se = cl->session;
242     struct session_database *sdb = cl->database;
243     Z_APDU *a = zget_APDU(global_parameters.odr_out, Z_APDU_searchRequest);
244     int ndb, cerror, cpos;
245     char **databaselist;
246     Z_Query *zquery;
247     struct ccl_rpn_node *cn;
248     int ssub = 0, lslb = 100000, mspn = 10;
249     char *recsyn;
250     char *piggyback;
251
252     yaz_log(YLOG_DEBUG, "Sending search to %s", cl->database->database->url);
253
254     cn = ccl_find_str(sdb->database->ccl_map, se->query, &cerror, &cpos);
255     if (!cn)
256         return;
257
258     if (!se->relevance)
259     {
260         // Initialize relevance structure with query terms
261         char *p[512];
262         extract_terms(se->nmem, cn, p);
263         se->relevance = relevance_create(se->nmem, (const char **) p,
264                 se->expected_maxrecs);
265     }
266
267     a->u.searchRequest->query = zquery = odr_malloc(global_parameters.odr_out,
268             sizeof(Z_Query));
269     zquery->which = Z_Query_type_1;
270     zquery->u.type_1 = ccl_rpn_query(global_parameters.odr_out, cn);
271     ccl_rpn_delete(cn);
272
273     for (ndb = 0; sdb->database->databases[ndb]; ndb++)
274         ;
275     databaselist = odr_malloc(global_parameters.odr_out, sizeof(char*) * ndb);
276     for (ndb = 0; sdb->database->databases[ndb]; ndb++)
277         databaselist[ndb] = sdb->database->databases[ndb];
278
279     if (!(piggyback = session_setting_oneval(sdb, PZ_PIGGYBACK)) || *piggyback == '1')
280     {
281         if ((recsyn = session_setting_oneval(sdb, PZ_REQUESTSYNTAX)))
282         {
283 #if YAZ_VERSIONL >= 0x020163
284             a->u.searchRequest->preferredRecordSyntax =
285                 yaz_string_to_oid_odr(yaz_oid_std(),
286                                       CLASS_RECSYN, recsyn,
287                                       global_parameters.odr_out);
288 #else
289             a->u.searchRequest->preferredRecordSyntax =
290                 yaz_str_to_z3950oid(global_parameters.odr_out,
291                                     CLASS_RECSYN, recsyn);
292 #endif
293         }
294         a->u.searchRequest->smallSetUpperBound = &ssub;
295         a->u.searchRequest->largeSetLowerBound = &lslb;
296         a->u.searchRequest->mediumSetPresentNumber = &mspn;
297     }
298     a->u.searchRequest->resultSetName = "Default";
299     a->u.searchRequest->databaseNames = databaselist;
300     a->u.searchRequest->num_databaseNames = ndb;
301
302     
303     {  //scope for sending and logging queries 
304         WRBUF wbquery = wrbuf_alloc();
305         yaz_query_to_wrbuf(wbquery, zquery);
306
307         if (send_apdu(cl, a) >= 0)
308             {
309                 iochan_setflags(i, EVENT_INPUT);
310                 cl->state = Client_Searching;
311                 cl->requestid = se->requestid;
312                 yaz_log(YLOG_LOG, "SearchRequest %s  %s", 
313                          cl->database->database->url, wrbuf_cstr(wbquery));
314             }
315         else {
316             cl->state = Client_Error;
317                 yaz_log(YLOG_WARN, "Failed SearchRequest %s  %s", 
318                          cl->database->database->url, wrbuf_cstr(wbquery));
319         }
320         
321         wrbuf_destroy(wbquery);
322     }    
323
324     odr_reset(global_parameters.odr_out);
325 }
326
327 static void send_present(IOCHAN i)
328 {
329     struct connection *co = iochan_getdata(i);
330     struct client *cl = co->client; 
331     struct session_database *sdb = cl->database;
332     Z_APDU *a = zget_APDU(global_parameters.odr_out, Z_APDU_presentRequest);
333     int toget;
334     int start = cl->records + 1;
335     char *recsyn;
336
337     toget = global_parameters.chunk;
338     if (toget > global_parameters.toget - cl->records)
339         toget = global_parameters.toget - cl->records;
340     if (toget > cl->hits - cl->records)
341         toget = cl->hits - cl->records;
342
343     yaz_log(YLOG_DEBUG, "Trying to present %d records\n", toget);
344
345     a->u.presentRequest->resultSetStartPoint = &start;
346     a->u.presentRequest->numberOfRecordsRequested = &toget;
347
348     a->u.presentRequest->resultSetId = "Default";
349
350     if ((recsyn = session_setting_oneval(sdb, PZ_REQUESTSYNTAX)))
351     {
352 #if YAZ_VERSIONL >= 0x020163
353         a->u.presentRequest->preferredRecordSyntax =
354             yaz_string_to_oid_odr(yaz_oid_std(),
355                                   CLASS_RECSYN, recsyn,
356                                   global_parameters.odr_out);
357 #else
358         a->u.presentRequest->preferredRecordSyntax =
359             yaz_str_to_z3950oid(global_parameters.odr_out,
360                                 CLASS_RECSYN, recsyn);
361 #endif
362     }
363
364     if (send_apdu(cl, a) >= 0)
365     {
366         iochan_setflags(i, EVENT_INPUT);
367         cl->state = Client_Presenting;
368     }
369     else
370         cl->state = Client_Error;
371     odr_reset(global_parameters.odr_out);
372 }
373
374 static void do_initResponse(IOCHAN i, Z_APDU *a)
375 {
376     struct connection *co = iochan_getdata(i);
377     struct client *cl = co->client;
378     Z_InitResponse *r = a->u.initResponse;
379
380     yaz_log(YLOG_DEBUG, "Init response %s", cl->database->database->url);
381
382     if (*r->result)
383     {
384         cl->state = Client_Idle;
385     }
386     else
387         cl->state = Client_Failed; // FIXME need to do something to the connection
388 }
389
390 static void do_searchResponse(IOCHAN i, Z_APDU *a)
391 {
392     struct connection *co = iochan_getdata(i);
393     struct client *cl = co->client;
394     struct session *se = cl->session;
395     Z_SearchResponse *r = a->u.searchResponse;
396
397     yaz_log(YLOG_DEBUG, "Search response %s (status=%d)", 
398             cl->database->database->url, *r->searchStatus);
399
400     if (*r->searchStatus)
401     {
402         cl->hits = *r->resultCount;
403         se->total_hits += cl->hits;
404         if (r->presentStatus && !*r->presentStatus && r->records)
405         {
406             yaz_log(YLOG_DEBUG, "Records in search response %s", 
407                     cl->database->database->url);
408             ingest_records(cl, r->records);
409         }
410         cl->state = Client_Idle;
411     }
412     else
413     {          /*"FAILED"*/
414         cl->hits = 0;
415         cl->state = Client_Error;
416         if (r->records) {
417             Z_Records *recs = r->records;
418             if (recs->which == Z_Records_NSD)
419             {
420                 yaz_log(YLOG_WARN, 
421                         "Search response: Non-surrogate diagnostic %s",
422                         cl->database->database->url);
423                 cl->diagnostic = *recs->u.nonSurrogateDiagnostic->condition;
424                 cl->state = Client_Error;
425             }
426         }
427     }
428 }
429
430 static void do_closeResponse(IOCHAN i, Z_APDU *a)
431 {
432     struct connection *co = iochan_getdata(i);
433     struct client *cl = co->client;
434     /* Z_Close *r = a->u.close; */
435
436     yaz_log(YLOG_WARN, "Close response %s", cl->database->database->url);
437
438     cl->state = Client_Failed;
439     connection_destroy(co);
440 }
441
442
/* Return non-zero if the len characters at word are, ignoring case, exactly
   one of the leading articles that may be dropped from a sort key. */
static int mergekey_is_article(const char *word, size_t len)
{
    static const char *const articles[] = {
        "the", "den", "der", "die", "des", "an", "a"
    };
    size_t i, j;

    for (i = 0; i < sizeof(articles) / sizeof(articles[0]); i++)
    {
        if (strlen(articles[i]) != len)
            continue;
        for (j = 0; j < len; j++)
            if (tolower((unsigned char) word[j]) != articles[i][j])
                break;
        if (j == len)
            return 1;
    }
    return 0;
}

/*
 * Normalize a key in place and return buf.
 *
 * Alphanumeric runs are lower-cased, every run of other characters between
 * two words collapses to a single space, and leading/trailing separators
 * are removed.  A string with no alphanumeric characters becomes "".
 *
 * If skiparticle is non-zero and the first word (delimited by a space) is
 * one of the known articles, that word is dropped -- but only when it is a
 * whole-word match and something alphanumeric follows it, so "He said" and a
 * title consisting solely of "A" are left intact.
 *
 * The result is never longer than the input, so no extra space is needed.
 * Character classification casts to unsigned char because passing a negative
 * char (e.g. a UTF-8 byte) to the <ctype.h> functions is undefined.
 */
char *normalize_mergekey(char *buf, int skiparticle)
{
    const char *p = buf;
    char *pout = buf;

    if (skiparticle)
    {
        const char *word = buf;
        const char *end;
        const char *rest;

        while (*word && !isalnum((unsigned char) *word))
            word++;
        for (end = word; *end && *end != ' '; end++)
            ;
        for (rest = end; *rest && !isalnum((unsigned char) *rest); rest++)
            ;
        if (*rest && mergekey_is_article(word, (size_t) (end - word)))
            p = end;
    }

    // pout never overtakes p, so rewriting in place is safe
    while (*p)
    {
        while (*p && !isalnum((unsigned char) *p))
            p++;
        while (isalnum((unsigned char) *p))
            *(pout++) = (char) tolower((unsigned char) *(p++));
        if (*p)
            *(pout++) = ' ';
        while (*p && !isalnum((unsigned char) *p))
            p++;
    }

    // Drop the separator left behind by trailing punctuation, then terminate
    while (pout > buf && pout[-1] == ' ')
        pout--;
    *pout = '\0';

    return buf;
}
483
484 static void add_facet(struct session *s, const char *type, const char *value)
485 {
486     int i;
487
488     if (!*value)
489         return;
490     for (i = 0; i < s->num_termlists; i++)
491         if (!strcmp(s->termlists[i].name, type))
492             break;
493     if (i == s->num_termlists)
494     {
495         if (i == SESSION_MAX_TERMLISTS)
496         {
497             yaz_log(YLOG_FATAL, "Too many termlists");
498             exit(1);
499         }
500         s->termlists[i].name = nmem_strdup(s->nmem, type);
501         s->termlists[i].termlist = termlist_create(s->nmem, s->expected_maxrecs, 15);
502         s->num_termlists = i + 1;
503     }
504     termlist_insert(s->termlists[i].termlist, value);
505 }
506
507 static xmlDoc *normalize_record(struct client *cl, Z_External *rec)
508 {
509     struct database_retrievalmap *m;
510     struct database *db = cl->database->database;
511     xmlNode *res;
512     xmlDoc *rdoc;
513
514     // First normalize to XML
515     if (db->yaz_marc)
516     {
517         char *buf;
518         int len;
519         if (rec->which != Z_External_octet)
520         {
521             yaz_log(YLOG_WARN, "Unexpected external branch, probably BER %s",
522                     cl->database->database->url);
523             return 0;
524         }
525         buf = (char*) rec->u.octet_aligned->buf;
526         len = rec->u.octet_aligned->len;
527         if (yaz_marc_read_iso2709(db->yaz_marc, buf, len) < 0)
528         {
529             yaz_log(YLOG_WARN, "Failed to decode MARC %s",
530                     cl->database->database->url);
531             return 0;
532         }
533
534         yaz_marc_write_using_libxml2(db->yaz_marc, 1);
535         if (yaz_marc_write_xml(db->yaz_marc, &res,
536                     "http://www.loc.gov/MARC21/slim", 0, 0) < 0)
537         {
538             yaz_log(YLOG_WARN, "Failed to encode as XML %s",
539                     cl->database->database->url);
540             return 0;
541         }
542         rdoc = xmlNewDoc((xmlChar *) "1.0");
543         xmlDocSetRootElement(rdoc, res);
544
545     }
546     else
547     {
548         yaz_log(YLOG_FATAL, 
549                 "Unknown native_syntax in normalize_record from %s",
550                 cl->database->database->url);
551         exit(1);
552     }
553
554     if (global_parameters.dump_records){
555         fprintf(stderr, 
556                 "Input Record (normalized) from %s\n----------------\n",
557                 cl->database->database->url);
558 #if LIBXML_VERSION >= 20600
559         xmlDocFormatDump(stderr, rdoc, 1);
560 #else
561         xmlDocDump(stderr, rdoc);
562 #endif
563     }
564
565     for (m = db->map; m; m = m->next){
566         xmlDoc *new = 0;
567
568 #if 1
569         {
570             xmlNodePtr root = 0;
571             new = xsltApplyStylesheet(m->stylesheet, rdoc, 0);
572             root= xmlDocGetRootElement(new);
573         if (!new || !root || !(root->children))
574         {
575             yaz_log(YLOG_WARN, "XSLT transformation failed from %s",
576                     cl->database->database->url);
577             xmlFreeDoc(new);
578             xmlFreeDoc(rdoc);
579             return 0;
580         }
581         }
582 #endif
583
584 #if 0
585         // do it another way to detect transformation errors right now
586         // but does not seem to work either!
587         {
588             xsltTransformContextPtr ctxt;
589             ctxt = xsltNewTransformContext(m->stylesheet, rdoc);
590             new = xsltApplyStylesheetUser(m->stylesheet, rdoc, 0, 0, 0, ctxt);
591             if ((ctxt->state == XSLT_STATE_ERROR) ||
592                 (ctxt->state == XSLT_STATE_STOPPED)){
593                 yaz_log(YLOG_WARN, "XSLT transformation failed from %s",
594                         cl->database->database->url);
595                 xmlFreeDoc(new);
596                 xmlFreeDoc(rdoc);
597                 return 0;
598             }
599         }
600 #endif      
601    
602         xmlFreeDoc(rdoc);
603         rdoc = new;
604     }
605     if (global_parameters.dump_records)
606     {
607         fprintf(stderr, "Record from %s\n----------------\n", 
608                 cl->database->database->url);
609 #if LIBXML_VERSION >= 20600
610         xmlDocFormatDump(stderr, rdoc, 1);
611 #else
612         xmlDocDump(stderr, rdoc);
613 #endif
614     }
615     return rdoc;
616 }
617
// Extract what appears to be years from buf, storing highest and
// lowest values.
//
// A "year" is a run of exactly four decimal digits; shorter and longer
// digit runs are skipped.  *first receives the smallest year found and *last
// the largest; both are -1 when buf contains none.  Returns *first.
//
// Characters are cast to unsigned char before classification: handing a
// negative char (any byte >= 0x80 where char is signed) to isdigit() is
// undefined behaviour.
static int extract_years(const char *buf, int *first, int *last)
{
    *first = -1;
    *last = -1;
    while (*buf)
    {
        const char *e;
        int len = 0;

        while (*buf && !isdigit((unsigned char) *buf))
            buf++;
        for (e = buf; isdigit((unsigned char) *e); e++)
            len++;
        if (len == 4)
        {
            int value = (buf[0] - '0') * 1000 + (buf[1] - '0') * 100
                + (buf[2] - '0') * 10 + (buf[3] - '0');
            if (*first < 0 || value < *first)
                *first = value;
            if (*last < 0 || value > *last)
                *last = value;
        }
        buf = e;
    }
    return *first;
}
646
647 static struct record *ingest_record(struct client *cl, Z_External *rec)
648 {
649     xmlDoc *xdoc = normalize_record(cl, rec);
650     xmlNode *root, *n;
651     struct record *res;
652     struct record_cluster *cluster;
653     struct session *se = cl->session;
654     xmlChar *mergekey, *mergekey_norm;
655     xmlChar *type = 0;
656     xmlChar *value = 0;
657     struct conf_service *service = global_parameters.server->service;
658
659     if (!xdoc)
660         return 0;
661
662     root = xmlDocGetRootElement(xdoc);
663     if (!(mergekey = xmlGetProp(root, (xmlChar *) "mergekey")))
664     {
665         yaz_log(YLOG_WARN, "No mergekey found in record");
666         xmlFreeDoc(xdoc);
667         return 0;
668     }
669
670     res = nmem_malloc(se->nmem, sizeof(struct record));
671     res->next = 0;
672     res->client = cl;
673     res->metadata = nmem_malloc(se->nmem,
674             sizeof(struct record_metadata*) * service->num_metadata);
675     memset(res->metadata, 0, sizeof(struct record_metadata*) * service->num_metadata);
676
677     mergekey_norm = (xmlChar *) nmem_strdup(se->nmem, (char*) mergekey);
678     xmlFree(mergekey);
679     normalize_mergekey((char *) mergekey_norm, 0);
680
681     cluster = reclist_insert(se->reclist, res, (char *) mergekey_norm, 
682                              &se->total_merged);
683     if (global_parameters.dump_records)
684         yaz_log(YLOG_LOG, "Cluster id %d from %s (#%d)", cluster->recid,
685                 cl->database->database->url, cl->records);
686     if (!cluster)
687     {
688         /* no room for record */
689         xmlFreeDoc(xdoc);
690         return 0;
691     }
692     relevance_newrec(se->relevance, cluster);
693
694     for (n = root->children; n; n = n->next)
695     {
696         if (type)
697             xmlFree(type);
698         if (value)
699             xmlFree(value);
700         type = value = 0;
701
702         if (n->type != XML_ELEMENT_NODE)
703             continue;
704         if (!strcmp((const char *) n->name, "metadata"))
705         {
706             struct conf_metadata *md = 0;
707             struct conf_sortkey *sk = 0;
708             struct record_metadata **wheretoput, *newm;
709             int imeta;
710             int first, last;
711
712             type = xmlGetProp(n, (xmlChar *) "type");
713             value = xmlNodeListGetString(xdoc, n->children, 0);
714
715             if (!type || !value)
716                 continue;
717
718             // First, find out what field we're looking at
719             for (imeta = 0; imeta < service->num_metadata; imeta++)
720                 if (!strcmp((const char *) type, service->metadata[imeta].name))
721                 {
722                     md = &service->metadata[imeta];
723                     if (md->sortkey_offset >= 0)
724                         sk = &service->sortkeys[md->sortkey_offset];
725                     break;
726                 }
727             if (!md)
728             {
729                 yaz_log(YLOG_WARN, "Ignoring unknown metadata element: %s", type);
730                 continue;
731             }
732
733             // Find out where we are putting it
734             if (md->merge == Metadata_merge_no)
735                 wheretoput = &res->metadata[imeta];
736             else
737                 wheretoput = &cluster->metadata[imeta];
738             
739             // Put it there
740             newm = nmem_malloc(se->nmem, sizeof(struct record_metadata));
741             newm->next = 0;
742             if (md->type == Metadata_type_generic)
743             {
744                 char *p, *pe;
745                 for (p = (char *) value; *p && isspace(*p); p++)
746                     ;
747                 for (pe = p + strlen(p) - 1;
748                         pe > p && strchr(" ,/.:([", *pe); pe--)
749                     *pe = '\0';
750                 newm->data.text = nmem_strdup(se->nmem, p);
751
752             }
753             else if (md->type == Metadata_type_year)
754             {
755                 if (extract_years((char *) value, &first, &last) < 0)
756                     continue;
757             }
758             else
759             {
760                 yaz_log(YLOG_WARN, "Unknown type in metadata element %s", type);
761                 continue;
762             }
763             if (md->type == Metadata_type_year && md->merge != Metadata_merge_range)
764             {
765                 yaz_log(YLOG_WARN, "Only range merging supported for years");
766                 continue;
767             }
768             if (md->merge == Metadata_merge_unique)
769             {
770                 struct record_metadata *mnode;
771                 for (mnode = *wheretoput; mnode; mnode = mnode->next)
772                     if (!strcmp((const char *) mnode->data.text, newm->data.text))
773                         break;
774                 if (!mnode)
775                 {
776                     newm->next = *wheretoput;
777                     *wheretoput = newm;
778                 }
779             }
780             else if (md->merge == Metadata_merge_longest)
781             {
782                 if (!*wheretoput ||
783                         strlen(newm->data.text) > strlen((*wheretoput)->data.text))
784                 {
785                     *wheretoput = newm;
786                     if (sk)
787                     {
788                         char *s = nmem_strdup(se->nmem, newm->data.text);
789                         if (!cluster->sortkeys[md->sortkey_offset])
790                             cluster->sortkeys[md->sortkey_offset] = 
791                                 nmem_malloc(se->nmem, sizeof(union data_types));
792                         normalize_mergekey(s,
793                                 (sk->type == Metadata_sortkey_skiparticle));
794                         cluster->sortkeys[md->sortkey_offset]->text = s;
795                     }
796                 }
797             }
798             else if (md->merge == Metadata_merge_all || md->merge == Metadata_merge_no)
799             {
800                 newm->next = *wheretoput;
801                 *wheretoput = newm;
802             }
803             else if (md->merge == Metadata_merge_range)
804             {
805                 assert(md->type == Metadata_type_year);
806                 if (!*wheretoput)
807                 {
808                     *wheretoput = newm;
809                     (*wheretoput)->data.number.min = first;
810                     (*wheretoput)->data.number.max = last;
811                     if (sk)
812                         cluster->sortkeys[md->sortkey_offset] = &newm->data;
813                 }
814                 else
815                 {
816                     if (first < (*wheretoput)->data.number.min)
817                         (*wheretoput)->data.number.min = first;
818                     if (last > (*wheretoput)->data.number.max)
819                         (*wheretoput)->data.number.max = last;
820                 }
821 #ifdef GAGA
822                 if (sk)
823                 {
824                     union data_types *sdata = cluster->sortkeys[md->sortkey_offset];
825                     yaz_log(YLOG_LOG, "SK range: %d-%d", sdata->number.min, sdata->number.max);
826                 }
827 #endif
828             }
829             else
830                 yaz_log(YLOG_WARN, "Don't know how to merge on element name %s", md->name);
831
832             if (md->rank)
833                 relevance_countwords(se->relevance, cluster, 
834                                      (char *) value, md->rank);
835             if (md->termlist)
836             {
837                 if (md->type == Metadata_type_year)
838                 {
839                     char year[64];
840                     sprintf(year, "%d", last);
841                     add_facet(se, (char *) type, year);
842                     if (first != last)
843                     {
844                         sprintf(year, "%d", first);
845                         add_facet(se, (char *) type, year);
846                     }
847                 }
848                 else
849                     add_facet(se, (char *) type, (char *) value);
850             }
851             xmlFree(type);
852             xmlFree(value);
853             type = value = 0;
854         }
855         else
856             yaz_log(YLOG_WARN, "Unexpected element %s in internal record", n->name);
857     }
858     if (type)
859         xmlFree(type);
860     if (value)
861         xmlFree(value);
862
863     xmlFreeDoc(xdoc);
864
865     relevance_donerecord(se->relevance, cluster);
866     se->total_records++;
867
868     return res;
869 }
870
871 // Retrieve first defined value for 'name' for given database.
872 // Will be extended to take into account user associated with session
873 char *session_setting_oneval(struct session_database *db, int offset)
874 {
875     if (!db->settings[offset])
876         return "";
877     return db->settings[offset]->value;
878 }
879
880 static void ingest_records(struct client *cl, Z_Records *r)
881 {
882 #if USE_TIMING
883     yaz_timing_t t = yaz_timing_create();
884 #endif
885     struct record *rec;
886     struct session *s = cl->session;
887     Z_NamePlusRecordList *rlist;
888     int i;
889
890     if (r->which != Z_Records_DBOSD)
891         return;
892     rlist = r->u.databaseOrSurDiagnostics;
893     for (i = 0; i < rlist->num_records; i++)
894     {
895         Z_NamePlusRecord *npr = rlist->records[i];
896
897         cl->records++;
898         if (npr->which != Z_NamePlusRecord_databaseRecord)
899         {
900             yaz_log(YLOG_WARN, 
901                     "Unexpected record type, probably diagnostic %s",
902                     cl->database->database->url);
903             continue;
904         }
905
906         rec = ingest_record(cl, npr->u.databaseRecord);
907         if (!rec)
908             continue;
909     }
910     if (s->watchlist[SESSION_WATCH_RECORDS].fun && rlist->num_records)
911         session_alert_watch(s, SESSION_WATCH_RECORDS);
912
913 #if USE_TIMING
914     yaz_timing_stop(t);
915     yaz_log(YLOG_LOG, "ingest_records %6.5f %3.2f %3.2f", 
916             yaz_timing_get_real(t), yaz_timing_get_user(t),
917             yaz_timing_get_sys(t));
918     yaz_timing_destroy(&t);
919 #endif
920 }
921
922 static void do_presentResponse(IOCHAN i, Z_APDU *a)
923 {
924     struct connection *co = iochan_getdata(i);
925     struct client *cl = co->client;
926     Z_PresentResponse *r = a->u.presentResponse;
927
928     if (r->records) {
929         Z_Records *recs = r->records;
930         if (recs->which == Z_Records_NSD)
931         {
932             yaz_log(YLOG_WARN, "Non-surrogate diagnostic %s",
933                     cl->database->database->url);
934             cl->diagnostic = *recs->u.nonSurrogateDiagnostic->condition;
935             cl->state = Client_Error;
936         }
937     }
938
939     if (!*r->presentStatus && cl->state != Client_Error)
940     {
941         yaz_log(YLOG_DEBUG, "Good Present response %s",
942                 cl->database->database->url);
943         ingest_records(cl, r->records);
944         cl->state = Client_Idle;
945     }
946     else if (*r->presentStatus) 
947     {
948         yaz_log(YLOG_WARN, "Bad Present response %s",
949                 cl->database->database->url);
950         cl->state = Client_Error;
951     }
952 }
953
954 static void handler(IOCHAN i, int event)
955 {
956     struct connection *co = iochan_getdata(i);
957     struct client *cl = co->client;
958     struct session *se = 0;
959
960     if (cl)
961         se = cl->session;
962     else
963     {
964         yaz_log(YLOG_WARN, "Destroying orphan connection");
965         connection_destroy(co);
966         return;
967     }
968
969     if (co->state == Conn_Connecting && event & EVENT_OUTPUT)
970     {
971         int errcode;
972         socklen_t errlen = sizeof(errcode);
973
974         if (getsockopt(cs_fileno(co->link), SOL_SOCKET, SO_ERROR, &errcode,
975             &errlen) < 0 || errcode != 0)
976         {
977             client_fatal(cl);
978             return;
979         }
980         else
981         {
982             yaz_log(YLOG_DEBUG, "Connect OK");
983             co->state = Conn_Open;
984             if (cl)
985                 cl->state = Client_Connected;
986         }
987     }
988
989     else if (event & EVENT_INPUT)
990     {
991         int len = cs_get(co->link, &co->ibuf, &co->ibufsize);
992
993         if (len < 0)
994         {
995             yaz_log(YLOG_WARN|YLOG_ERRNO, "Error reading from %s", 
996                     cl->database->database->url);
997             connection_destroy(co);
998             return;
999         }
1000         else if (len == 0)
1001         {
1002             yaz_log(YLOG_WARN, "EOF reading from %s", cl->database->database->url);
1003             connection_destroy(co);
1004             return;
1005         }
1006         else if (len > 1) // We discard input if we have no connection
1007         {
1008             co->state = Conn_Open;
1009
1010             if (cl && (cl->requestid == se->requestid || cl->state == Client_Initializing))
1011             {
1012                 Z_APDU *a;
1013
1014                 odr_reset(global_parameters.odr_in);
1015                 odr_setbuf(global_parameters.odr_in, co->ibuf, len, 0);
1016                 if (!z_APDU(global_parameters.odr_in, &a, 0, 0))
1017                 {
1018                     client_fatal(cl);
1019                     return;
1020                 }
1021                 switch (a->which)
1022                 {
1023                     case Z_APDU_initResponse:
1024                         do_initResponse(i, a);
1025                         break;
1026                     case Z_APDU_searchResponse:
1027                         do_searchResponse(i, a);
1028                         break;
1029                     case Z_APDU_presentResponse:
1030                         do_presentResponse(i, a);
1031                         break;
1032                     case Z_APDU_close:
1033                         do_closeResponse(i, a);
1034                         break;
1035                     default:
1036                         yaz_log(YLOG_WARN, 
1037                                 "Unexpected Z39.50 response from %s",  
1038                                 cl->database->database->url);
1039                         client_fatal(cl);
1040                         return;
1041                 }
1042                 // We aren't expecting staggered output from target
1043                 // if (cs_more(t->link))
1044                 //    iochan_setevent(i, EVENT_INPUT);
1045             }
1046             else  // we throw away response and go to idle mode
1047             {
1048                 yaz_log(YLOG_DEBUG, "Ignoring result of expired operation");
1049                 cl->state = Client_Idle;
1050             }
1051         }
1052         /* if len==1 we do nothing but wait for more input */
1053     }
1054
1055     if (cl->state == Client_Connected) {
1056         send_init(i);
1057     }
1058
1059     if (cl->state == Client_Idle)
1060     {
1061         if (cl->requestid != se->requestid && *se->query) {
1062             send_search(i);
1063         }
1064         else if (cl->hits > 0 && cl->records < global_parameters.toget &&
1065             cl->records < cl->hits) {
1066             send_present(i);
1067         }
1068     }
1069 }
1070
1071 // Disassociate connection from client
1072 static void connection_release(struct connection *co)
1073 {
1074     struct client *cl = co->client;
1075
1076     yaz_log(YLOG_DEBUG, "Connection release %s", co->host->hostport);
1077     if (!cl)
1078         return;
1079     cl->connection = 0;
1080     co->client = 0;
1081 }
1082
1083 // Close connection and recycle structure
1084 static void connection_destroy(struct connection *co)
1085 {
1086     struct host *h = co->host;
1087     cs_close(co->link);
1088     iochan_destroy(co->iochan);
1089
1090     yaz_log(YLOG_DEBUG, "Connection destroy %s", co->host->hostport);
1091     if (h->connections == co)
1092         h->connections = co->next;
1093     else
1094     {
1095         struct connection *pco;
1096         for (pco = h->connections; pco && pco->next != co; pco = pco->next)
1097             ;
1098         if (pco)
1099             pco->next = co->next;
1100         else
1101             abort();
1102     }
1103     if (co->client)
1104     {
1105         if (co->client->state != Client_Idle)
1106             co->client->state = Client_Disconnected;
1107         co->client->connection = 0;
1108     }
1109     co->next = connection_freelist;
1110     connection_freelist = co;
1111 }
1112
1113 // Creates a new connection for client, associated with the host of 
1114 // client's database
1115 static struct connection *connection_create(struct client *cl)
1116 {
1117     struct connection *new;
1118     COMSTACK link; 
1119     int res;
1120     void *addr;
1121
1122
1123     if (!(link = cs_create(tcpip_type, 0, PROTO_Z3950)))
1124         {
1125             yaz_log(YLOG_FATAL|YLOG_ERRNO, "Failed to create comstack");
1126             exit(1);
1127         }
1128     
1129     if (0 == strlen(global_parameters.zproxy_override)){
1130         /* no Z39.50 proxy needed - direct connect */
1131         yaz_log(YLOG_DEBUG, "Connection create %s", cl->database->database->url);
1132         
1133         if (!(addr = cs_straddr(link, cl->database->database->host->ipport)))
1134             {
1135                 yaz_log(YLOG_WARN|YLOG_ERRNO, 
1136                         "Lookup of IP address %s failed", 
1137                         cl->database->database->host->ipport);
1138                 return 0;
1139             }
1140     
1141     } else {
1142         /* Z39.50 proxy connect */
1143         yaz_log(YLOG_DEBUG, "Connection create %s proxy %s", 
1144                 cl->database->database->url, global_parameters.zproxy_override);
1145
1146         if (!(addr = cs_straddr(link, global_parameters.zproxy_override)))
1147             {
1148                 yaz_log(YLOG_WARN|YLOG_ERRNO, 
1149                         "Lookup of IP address %s failed", 
1150                         global_parameters.zproxy_override);
1151                 return 0;
1152             }
1153     }
1154
1155     res = cs_connect(link, addr);
1156     if (res < 0)
1157     {
1158         yaz_log(YLOG_WARN|YLOG_ERRNO, "cs_connect %s", cl->database->database->url);
1159         return 0;
1160     }
1161
1162     if ((new = connection_freelist))
1163         connection_freelist = new->next;
1164     else
1165     {
1166         new = xmalloc(sizeof (struct connection));
1167         new->ibuf = 0;
1168         new->ibufsize = 0;
1169     }
1170     new->state = Conn_Connecting;
1171     new->host = cl->database->database->host;
1172     new->next = new->host->connections;
1173     new->host->connections = new;
1174     new->client = cl;
1175     cl->connection = new;
1176     new->link = link;
1177
1178     new->iochan = iochan_create(cs_fileno(link), handler, 0);
1179     iochan_setdata(new->iochan, new);
1180     new->iochan->next = channel_list;
1181     channel_list = new->iochan;
1182     return new;
1183 }
1184
1185 // Close connection and set state to error
1186 static void client_fatal(struct client *cl)
1187 {
1188     yaz_log(YLOG_WARN, "Fatal error from %s", cl->database->database->url);
1189     connection_destroy(cl->connection);
1190     cl->state = Client_Error;
1191 }
1192
1193 // Ensure that client has a connection associated
1194 static int client_prep_connection(struct client *cl)
1195 {
1196     struct connection *co;
1197     struct session *se = cl->session;
1198     struct host *host = cl->database->database->host;
1199
1200     co = cl->connection;
1201
1202     yaz_log(YLOG_DEBUG, "Client prep %s", cl->database->database->url);
1203
1204     if (!co)
1205     {
1206         // See if someone else has an idle connection
1207         // We should look at timestamps here to select the longest-idle connection
1208         for (co = host->connections; co; co = co->next)
1209             if (co->state == Conn_Open && (!co->client || co->client->session != se))
1210                 break;
1211         if (co)
1212         {
1213             connection_release(co);
1214             cl->connection = co;
1215             co->client = cl;
1216         }
1217         else
1218             co = connection_create(cl);
1219     }
1220     if (co)
1221     {
1222         if (co->state == Conn_Connecting)
1223         {
1224             cl->state = Client_Connecting;
1225             iochan_setflag(co->iochan, EVENT_OUTPUT);
1226         }
1227         else if (co->state == Conn_Open)
1228         {
1229             if (cl->state == Client_Error || cl->state == Client_Disconnected)
1230                 cl->state = Client_Idle;
1231             iochan_setflag(co->iochan, EVENT_OUTPUT);
1232         }
1233         return 1;
1234     }
1235     else
1236         return 0;
1237 }
1238
1239 static struct client *client_create(void)
1240 {
1241     struct client *r;
1242     if (client_freelist)
1243     {
1244         r = client_freelist;
1245         client_freelist = client_freelist->next;
1246     }
1247     else
1248         r = xmalloc(sizeof(struct client));
1249     r->database = 0;
1250     r->connection = 0;
1251     r->session = 0;
1252     r->hits = 0;
1253     r->records = 0;
1254     r->setno = 0;
1255     r->requestid = -1;
1256     r->diagnostic = 0;
1257     r->state = Client_Disconnected;
1258     r->next = 0;
1259     return r;
1260 }
1261
1262 void client_destroy(struct client *c)
1263 {
1264     struct session *se = c->session;
1265     if (c == se->clients)
1266         se->clients = c->next;
1267     else
1268     {
1269         struct client *cc;
1270         for (cc = se->clients; cc && cc->next != c; cc = cc->next)
1271             ;
1272         if (cc)
1273             cc->next = c->next;
1274     }
1275     if (c->connection)
1276         connection_release(c->connection);
1277     c->next = client_freelist;
1278     client_freelist = c;
1279 }
1280
1281 void session_set_watch(struct session *s, int what, session_watchfun fun, void *data)
1282 {
1283     s->watchlist[what].fun = fun;
1284     s->watchlist[what].data = data;
1285 }
1286
1287 void session_alert_watch(struct session *s, int what)
1288 {
1289     if (!s->watchlist[what].fun)
1290         return;
1291     (*s->watchlist[what].fun)(s->watchlist[what].data);
1292     s->watchlist[what].fun = 0;
1293     s->watchlist[what].data = 0;
1294 }
1295
1296 //callback for grep_databases
1297 static void select_targets_callback(void *context, struct session_database *db)
1298 {
1299     struct session *se = (struct session*) context;
1300     struct client *cl = client_create();
1301     cl->database = db;
1302     cl->session = se;
1303     cl->next = se->clients;
1304     se->clients = cl;
1305 }
1306
1307 // Associates a set of clients with a session;
1308 // Note: Session-databases represent databases with per-session setting overrides
1309 int select_targets(struct session *se, struct database_criterion *crit)
1310 {
1311     while (se->clients)
1312         client_destroy(se->clients);
1313
1314     return session_grep_databases(se, crit, select_targets_callback);
1315 }
1316
1317 int session_active_clients(struct session *s)
1318 {
1319     struct client *c;
1320     int res = 0;
1321
1322     for (c = s->clients; c; c = c->next)
1323         if (c->connection && (c->state == Client_Connecting ||
1324                     c->state == Client_Initializing ||
1325                     c->state == Client_Searching ||
1326                     c->state == Client_Presenting))
1327             res++;
1328
1329     return res;
1330 }
1331
1332 // parses crit1=val1,crit2=val2|val3,...
1333 static struct database_criterion *parse_filter(NMEM m, const char *buf)
1334 {
1335     struct database_criterion *res = 0;
1336     char **values;
1337     int num;
1338     int i;
1339
1340     if (!buf || !*buf)
1341         return 0;
1342     nmem_strsplit(m, ",", buf,  &values, &num);
1343     for (i = 0; i < num; i++)
1344     {
1345         char **subvalues;
1346         int subnum;
1347         int subi;
1348         struct database_criterion *new = nmem_malloc(m, sizeof(*new));
1349         char *eq = strchr(values[i], '=');
1350         if (!eq)
1351         {
1352             yaz_log(YLOG_WARN, "Missing equal-sign in filter");
1353             return 0;
1354         }
1355         *(eq++) = '\0';
1356         new->name = values[i];
1357         nmem_strsplit(m, "|", eq, &subvalues, &subnum);
1358         new->values = 0;
1359         for (subi = 0; subi < subnum; subi++)
1360         {
1361             struct database_criterion_value *newv = nmem_malloc(m, sizeof(*newv));
1362             newv->value = subvalues[subi];
1363             newv->next = new->values;
1364             new->values = newv;
1365         }
1366         new->next = res;
1367         res = new;
1368     }
1369     return res;
1370 }
1371
1372 char *search(struct session *se, char *query, char *filter)
1373 {
1374     int live_channels = 0;
1375     struct client *cl;
1376     struct database_criterion *criteria;
1377
1378     yaz_log(YLOG_DEBUG, "Search");
1379
1380     nmem_reset(se->nmem);
1381     criteria = parse_filter(se->nmem, filter);
1382     strcpy(se->query, query);
1383     se->requestid++;
1384     select_targets(se, criteria);
1385     for (cl = se->clients; cl; cl = cl->next)
1386     {
1387         if (client_prep_connection(cl))
1388             live_channels++;
1389     }
1390     if (live_channels)
1391     {
1392         int maxrecs = live_channels * global_parameters.toget;
1393         se->num_termlists = 0;
1394         se->reclist = reclist_create(se->nmem, maxrecs);
1395         // This will be initialized in send_search()
1396         se->relevance = 0;
1397         se->total_records = se->total_hits = se->total_merged = 0;
1398         se->expected_maxrecs = maxrecs;
1399     }
1400     else
1401         return "NOTARGETS";
1402
1403     return 0;
1404 }
1405
1406 // Apply a session override to a database
1407 void session_apply_setting(struct session *se, char *dbname, char *setting, char *value)
1408 {
1409     struct session_database *sdb;
1410
1411     for (sdb = se->databases; sdb; sdb = sdb->next)
1412         if (!strcmp(dbname, sdb->database->url))
1413         {
1414             struct setting *new = nmem_malloc(se->session_nmem, sizeof(*new));
1415             int offset = settings_offset(setting);
1416
1417             if (offset < 0)
1418             {
1419                 yaz_log(YLOG_WARN, "Unknown setting %s", setting);
1420                 return;
1421             }
1422             new->precedence = 0;
1423             new->target = dbname;
1424             new->name = setting;
1425             new->value = value;
1426             new->next = sdb->settings[offset];
1427             sdb->settings[offset] = new;
1428             break;
1429         }
1430     if (!sdb)
1431         yaz_log(YLOG_WARN, "Unknown database in setting override: %s", dbname);
1432 }
1433
1434 void session_init_databases_fun(void *context, struct database *db)
1435 {
1436     struct session *se = (struct session *) context;
1437     struct session_database *new = nmem_malloc(se->session_nmem, sizeof(*new));
1438     int num = settings_num();
1439     int i;
1440
1441     new->database = db;
1442     new->settings = nmem_malloc(se->session_nmem, sizeof(struct settings *) * num);
1443     for (i = 0; i < num; i++)
1444         new->settings[i] = db->settings[i];
1445     new->next = se->databases;
1446     se->databases = new;
1447 }
1448
1449 // Initialize session_database list -- this represents this session's view
1450 // of the database list -- subject to modification by the settings ws command
1451 void session_init_databases(struct session *se)
1452 {
1453     se->databases = 0;
1454     grep_databases(se, 0, session_init_databases_fun);
1455 }
1456
1457 void destroy_session(struct session *s)
1458 {
1459     yaz_log(YLOG_LOG, "Destroying session");
1460     while (s->clients)
1461         client_destroy(s->clients);
1462     nmem_destroy(s->nmem);
1463     wrbuf_destroy(s->wrbuf);
1464 }
1465
1466 struct session *new_session(NMEM nmem) 
1467 {
1468     int i;
1469     struct session *session = nmem_malloc(nmem, sizeof(*session));
1470
1471     yaz_log(YLOG_DEBUG, "New Pazpar2 session");
1472     
1473     session->total_hits = 0;
1474     session->total_records = 0;
1475     session->num_termlists = 0;
1476     session->reclist = 0;
1477     session->requestid = -1;
1478     session->clients = 0;
1479     session->expected_maxrecs = 0;
1480     session->query[0] = '\0';
1481     session->session_nmem = nmem;
1482     session->nmem = nmem_create();
1483     session->wrbuf = wrbuf_alloc();
1484     session_init_databases(session);
1485     for (i = 0; i <= SESSION_WATCH_MAX; i++)
1486     {
1487         session->watchlist[i].data = 0;
1488         session->watchlist[i].fun = 0;
1489     }
1490
1491     return session;
1492 }
1493
1494 struct hitsbytarget *hitsbytarget(struct session *se, int *count)
1495 {
1496     static struct hitsbytarget res[1000]; // FIXME MM
1497     struct client *cl;
1498
1499     *count = 0;
1500     for (cl = se->clients; cl; cl = cl->next)
1501     {
1502         char *name = session_setting_oneval(cl->database, PZ_NAME);
1503
1504         res[*count].id = cl->database->database->url;
1505         res[*count].name = *name ? name : "Unknown";
1506         res[*count].hits = cl->hits;
1507         res[*count].records = cl->records;
1508         res[*count].diagnostic = cl->diagnostic;
1509         res[*count].state = client_states[cl->state];
1510         res[*count].connected  = cl->connection ? 1 : 0;
1511         (*count)++;
1512     }
1513
1514     return res;
1515 }
1516
1517 struct termlist_score **termlist(struct session *s, const char *name, int *num)
1518 {
1519     int i;
1520
1521     for (i = 0; i < s->num_termlists; i++)
1522         if (!strcmp((const char *) s->termlists[i].name, name))
1523             return termlist_highscore(s->termlists[i].termlist, num);
1524     return 0;
1525 }
1526
#ifdef MISSING_HEADERS
// Log the two figures reported by nmem_get_memory_in_use() and
// nmem_get_memory_free(). Only compiled when MISSING_HEADERS is defined.
void report_nmem_stats(void)
{
    size_t bytes_used;
    size_t bytes_free;

    nmem_get_memory_in_use(&bytes_used);
    nmem_get_memory_free(&bytes_free);

    yaz_log(YLOG_LOG, "nmem stat: use=%ld free=%ld", 
            (long) bytes_used, (long) bytes_free);
}
#endif
1539
1540 struct record_cluster *show_single(struct session *s, int id)
1541 {
1542     struct record_cluster *r;
1543
1544     reclist_rewind(s->reclist);
1545     while ((r = reclist_read_record(s->reclist)))
1546         if (r->recid == id)
1547             return r;
1548     return 0;
1549 }
1550
1551 struct record_cluster **show(struct session *s, struct reclist_sortparms *sp, int start,
1552         int *num, int *total, int *sumhits, NMEM nmem_show)
1553 {
1554     struct record_cluster **recs = nmem_malloc(nmem_show, *num 
1555                                        * sizeof(struct record_cluster *));
1556     struct reclist_sortparms *spp;
1557     int i;
1558 #if USE_TIMING    
1559     yaz_timing_t t = yaz_timing_create();
1560 #endif
1561
1562     for (spp = sp; spp; spp = spp->next)
1563         if (spp->type == Metadata_sortkey_relevance)
1564         {
1565             relevance_prepare_read(s->relevance, s->reclist);
1566             break;
1567         }
1568     reclist_sort(s->reclist, sp);
1569
1570     *total = s->reclist->num_records;
1571     *sumhits = s->total_hits;
1572
1573     for (i = 0; i < start; i++)
1574         if (!reclist_read_record(s->reclist))
1575         {
1576             *num = 0;
1577             recs = 0;
1578             break;
1579         }
1580
1581     for (i = 0; i < *num; i++)
1582     {
1583         struct record_cluster *r = reclist_read_record(s->reclist);
1584         if (!r)
1585         {
1586             *num = i;
1587             break;
1588         }
1589         recs[i] = r;
1590     }
1591 #if USE_TIMING
1592     yaz_timing_stop(t);
1593     yaz_log(YLOG_LOG, "show %6.5f %3.2f %3.2f", 
1594             yaz_timing_get_real(t), yaz_timing_get_user(t),
1595             yaz_timing_get_sys(t));
1596     yaz_timing_destroy(&t);
1597 #endif
1598     return recs;
1599 }
1600
1601 void statistics(struct session *se, struct statistics *stat)
1602 {
1603     struct client *cl;
1604     int count = 0;
1605
1606     memset(stat, 0, sizeof(*stat));
1607     for (cl = se->clients; cl; cl = cl->next)
1608     {
1609         if (!cl->connection)
1610             stat->num_no_connection++;
1611         switch (cl->state)
1612         {
1613             case Client_Connecting: stat->num_connecting++; break;
1614             case Client_Initializing: stat->num_initializing++; break;
1615             case Client_Searching: stat->num_searching++; break;
1616             case Client_Presenting: stat->num_presenting++; break;
1617             case Client_Idle: stat->num_idle++; break;
1618             case Client_Failed: stat->num_failed++; break;
1619             case Client_Error: stat->num_error++; break;
1620             default: break;
1621         }
1622         count++;
1623     }
1624     stat->num_hits = se->total_hits;
1625     stat->num_records = se->total_records;
1626
1627     stat->num_clients = count;
1628 }
1629
1630 static void start_http_listener(void)
1631 {
1632     char hp[128] = "";
1633     struct conf_server *ser = global_parameters.server;
1634
1635     if (*global_parameters.listener_override)
1636         strcpy(hp, global_parameters.listener_override);
1637     else
1638     {
1639         strcpy(hp, ser->host ? ser->host : "");
1640         if (ser->port)
1641         {
1642             if (*hp)
1643                 strcat(hp, ":");
1644             sprintf(hp + strlen(hp), "%d", ser->port);
1645         }
1646     }
1647     http_init(hp);
1648 }
1649
1650 static void start_proxy(void)
1651 {
1652     char hp[128] = "";
1653     struct conf_server *ser = global_parameters.server;
1654
1655     if (*global_parameters.proxy_override)
1656         strcpy(hp, global_parameters.proxy_override);
1657     else if (ser->proxy_host || ser->proxy_port)
1658     {
1659         strcpy(hp, ser->proxy_host ? ser->proxy_host : "");
1660         if (ser->proxy_port)
1661         {
1662             if (*hp)
1663                 strcat(hp, ":");
1664             sprintf(hp + strlen(hp), "%d", ser->proxy_port);
1665         }
1666     }
1667     else
1668         return;
1669
1670     http_set_proxyaddr(hp, ser->myurl ? ser->myurl : "");
1671 }
1672
1673 static void start_zproxy(void)
1674 {
1675     struct conf_server *ser = global_parameters.server;
1676
1677     if (*global_parameters.zproxy_override){
1678         yaz_log(YLOG_LOG, "Z39.50 proxy  %s", 
1679                 global_parameters.zproxy_override);
1680         return;
1681     }
1682
1683     else if (ser->zproxy_host || ser->zproxy_port)
1684     {
1685         char hp[128] = "";
1686
1687         strcpy(hp, ser->zproxy_host ? ser->zproxy_host : "");
1688         if (ser->zproxy_port)
1689         {
1690             if (*hp)
1691                 strcat(hp, ":");
1692             else
1693                 strcat(hp, "@:");
1694
1695             sprintf(hp + strlen(hp), "%d", ser->zproxy_port);
1696         }
1697         strcpy(global_parameters.zproxy_override, hp);
1698         yaz_log(YLOG_LOG, "Z39.50 proxy  %s", 
1699                 global_parameters.zproxy_override);
1700
1701     }
1702     else
1703         return;
1704 }
1705
1706
1707
1708 int main(int argc, char **argv)
1709 {
1710     int ret;
1711     char *arg;
1712
1713     if (signal(SIGPIPE, SIG_IGN) == SIG_ERR)
1714         yaz_log(YLOG_WARN|YLOG_ERRNO, "signal");
1715
1716     yaz_log_init(YLOG_DEFAULT_LEVEL, "pazpar2", 0);
1717
1718     while ((ret = options("t:f:x:h:p:z:s:d", argv, argc, &arg)) != -2)
1719     {
1720         switch (ret) {
1721             case 'f':
1722                 if (!read_config(arg))
1723                     exit(1);
1724                 break;
1725             case 'h':
1726                 strcpy(global_parameters.listener_override, arg);
1727                 break;
1728             case 'p':
1729                 strcpy(global_parameters.proxy_override, arg);
1730                 break;
1731             case 'z':
1732                 strcpy(global_parameters.zproxy_override, arg);
1733                 break;
1734             case 't':
1735                 strcpy(global_parameters.settings_path_override, arg);
1736                 break;
1737             case 's':
1738                 load_simpletargets(arg);
1739                 break;
1740             case 'd':
1741                 global_parameters.dump_records = 1;
1742                 break;
1743             default:
1744                 fprintf(stderr, "Usage: pazpar2\n"
1745                         "    -f configfile\n"
1746                         "    -h [host:]port          (REST protocol listener)\n"
1747                         "    -C cclconfig\n"
1748                         "    -s simpletargetfile\n"
1749                         "    -p hostname[:portno]    (HTTP proxy)\n"
1750                         "    -z hostname[:portno]    (Z39.50 proxy)\n"
1751                         "    -d                      (show internal records)\n");
1752                 exit(1);
1753         }
1754     }
1755
1756     if (!config)
1757     {
1758         yaz_log(YLOG_FATAL, "Load config with -f");
1759         exit(1);
1760     }
1761     global_parameters.server = config->servers;
1762
1763     start_http_listener();
1764     start_proxy();
1765     start_zproxy();
1766
1767     if (*global_parameters.settings_path_override)
1768         settings_read(global_parameters.settings_path_override);
1769     else if (global_parameters.server->settings)
1770         settings_read(global_parameters.server->settings);
1771     else
1772         yaz_log(YLOG_WARN, "No settings-directory specified. Problems may well ensue!");
1773     prepare_databases();
1774     global_parameters.odr_in = odr_createmem(ODR_DECODE);
1775     global_parameters.odr_out = odr_createmem(ODR_ENCODE);
1776
1777     event_loop(&channel_list);
1778
1779     return 0;
1780 }
1781
1782 /*
1783  * Local variables:
1784  * c-basic-offset: 4
1785  * indent-tabs-mode: nil
1786  * End:
1787  * vim: shiftwidth=4 tabstop=8 expandtab
1788  */