added initialization of char *recsyn = 0; and char *piggyback = 0;
[pazpar2-moved-to-github.git] / src / pazpar2.c
index ea3af8a..99cc6a7 100644 (file)
@@ -1,4 +1,23 @@
-/* $Id: pazpar2.c,v 1.17 2007-01-06 04:54:58 quinn Exp $ */;
+/* $Id: pazpar2.c,v 1.78 2007-04-13 11:13:08 marc Exp $
+   Copyright (c) 2006-2007, Index Data.
+
+This file is part of Pazpar2.
+
+Pazpar2 is free software; you can redistribute it and/or modify it under
+the terms of the GNU General Public License as published by the Free
+Software Foundation; either version 2, or (at your option) any later
+version.
+
+Pazpar2 is distributed in the hope that it will be useful, but WITHOUT ANY
+WARRANTY; without even the implied warranty of MERCHANTABILITY or
+FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
+for more details.
+
+You should have received a copy of the GNU General Public License
+along with Pazpar2; see the file LICENSE.  If not, write to the
+Free Software Foundation, 59 Temple Place - Suite 330, Boston, MA
+02111-1307, USA.
+ */
 
 #include <stdlib.h>
 #include <stdio.h>
 #include <yaz/proto.h>
 #include <yaz/readconf.h>
 #include <yaz/pquery.h>
+#include <yaz/otherinfo.h>
 #include <yaz/yaz-util.h>
 #include <yaz/nmem.h>
+#include <yaz/query-charset.h>
+#include <yaz/querytowrbuf.h>
+#if YAZ_VERSIONL >= 0x020163
+#include <yaz/oid_db.h>
+#endif
+
+#if HAVE_CONFIG_H
+#include "cconfig.h"
+#endif
 
 #define USE_TIMING 0
 #if USE_TIMING
 #include <yaz/timing.h>
 #endif
 
+#include <netinet/in.h>
+
 #include "pazpar2.h"
 #include "eventl.h"
 #include "http.h"
 #include "reclists.h"
 #include "relevance.h"
 #include "config.h"
+#include "database.h"
+#include "settings.h"
 
-#define PAZPAR2_VERSION "0.1"
 #define MAX_CHUNK 15
 
 static void client_fatal(struct client *cl);
 static void connection_destroy(struct connection *co);
 static int client_prep_connection(struct client *cl);
 static void ingest_records(struct client *cl, Z_Records *r);
-static struct conf_retrievalprofile *database_retrieval_profile(struct database *db);
 void session_alert_watch(struct session *s, int what);
 
 IOCHAN channel_list = 0;  // Master list of connections we're handling events to
@@ -48,9 +79,6 @@ IOCHAN channel_list = 0;  // Master list of connections we're handling events to
 static struct connection *connection_freelist = 0;
 static struct client *client_freelist = 0;
 
-static struct host *hosts = 0;  // The hosts we know about 
-static struct database *databases = 0; // The databases we know about
-
 static char *client_states[] = {
     "Client_Connecting",
     "Client_Connected",
@@ -64,20 +92,24 @@ static char *client_states[] = {
     "Client_Stopped"
 };
 
+// Note: Some things in this structure will eventually move to configuration
 struct parameters global_parameters = 
 {
+    "",
+    "",
+    "",
+    "",
+    0,
     0,
     30,
     "81",
     "Index Data PazPar2 (MasterKey)",
-    PAZPAR2_VERSION,
+    VERSION,
     600, // 10 minutes
     60,
     100,
     MAX_CHUNK,
     0,
-    0,
-    0,
     0
 };
 
@@ -109,9 +141,26 @@ static int send_apdu(struct client *c, Z_APDU *a)
     return 0;
 }
 
+// Set authentication token in init if one is set for the client
+// TODO: Extend this to handle other schemes than open (should be simple)
+static void init_authentication(struct client *cl, Z_InitRequest *req)
+{
+    struct session_database *sdb = cl->database;
+    char *auth = session_setting_oneval(sdb, PZ_AUTHENTICATION);
+
+    if (auth)
+    {
+        Z_IdAuthentication *idAuth = odr_malloc(global_parameters.odr_out,
+                sizeof(*idAuth));
+        idAuth->which = Z_IdAuthentication_open;
+        idAuth->u.open = auth;
+        req->idAuthentication = idAuth;
+    }
+}
 
 static void send_init(IOCHAN i)
 {
+
     struct connection *co = iochan_getdata(i);
     struct client *cl = co->client;
     Z_APDU *a = zget_APDU(global_parameters.odr_out, Z_APDU_initRequest);
@@ -127,6 +176,27 @@ static void send_init(IOCHAN i)
     ODR_MASK_SET(a->u.initRequest->protocolVersion, Z_ProtocolVersion_1);
     ODR_MASK_SET(a->u.initRequest->protocolVersion, Z_ProtocolVersion_2);
     ODR_MASK_SET(a->u.initRequest->protocolVersion, Z_ProtocolVersion_3);
+
+    init_authentication(cl, a->u.initRequest);
+
+    /* add virtual host if tunneling through Z39.50 proxy */
+    
+    if (0 < strlen(global_parameters.zproxy_override) 
+        && 0 < strlen(cl->database->database->url))
+    {
+#if YAZ_VERSIONL >= 0x020163
+        const int *oid_proxy = yaz_string_to_oid(yaz_oid_std(),
+                                                 CLASS_USERINFO, OID_STR_PROXY);
+        yaz_oi_set_string_oid(&a->u.initRequest->otherInfo,
+                              global_parameters.odr_out, oid_proxy,
+                              1, cl->database->database->url);
+#else
+        yaz_oi_set_string_oidval(&a->u.initRequest->otherInfo,
+                                 global_parameters.odr_out, VAL_PROXY,
+                                 1, cl->database->database->url);
+#endif
+    }
+
     if (send_apdu(cl, a) >= 0)
     {
        iochan_setflags(i, EVENT_INPUT);
@@ -137,54 +207,141 @@ static void send_init(IOCHAN i)
     odr_reset(global_parameters.odr_out);
 }
 
+static void pull_terms(NMEM nmem, struct ccl_rpn_node *n, char **termlist, int *num)
+{
+    switch (n->kind)
+    {
+        case CCL_RPN_AND:
+        case CCL_RPN_OR:
+        case CCL_RPN_NOT:
+        case CCL_RPN_PROX:
+            pull_terms(nmem, n->u.p[0], termlist, num);
+            pull_terms(nmem, n->u.p[1], termlist, num);
+            break;
+        case CCL_RPN_TERM:
+            termlist[(*num)++] = nmem_strdup(nmem, n->u.t.term);
+            break;
+        default: // NOOP
+            break;
+    }
+}
+
+// Extract terms from query into null-terminated termlist
+static void extract_terms(NMEM nmem, struct ccl_rpn_node *query, char **termlist)
+{
+    int num = 0;
+
+    pull_terms(nmem, query, termlist, &num);
+    termlist[num] = 0;
+}
+
 static void send_search(IOCHAN i)
 {
     struct connection *co = iochan_getdata(i);
     struct client *cl = co->client; 
     struct session *se = cl->session;
-    struct database *db = cl->database;
+    struct session_database *sdb = cl->database;
     Z_APDU *a = zget_APDU(global_parameters.odr_out, Z_APDU_searchRequest);
     int ndb, cerror, cpos;
     char **databaselist;
     Z_Query *zquery;
     struct ccl_rpn_node *cn;
     int ssub = 0, lslb = 100000, mspn = 10;
+    char *recsyn = 0;
+    char *piggyback = 0;
+    char *queryenc = 0;
+    yaz_iconv_t iconv = 0;
 
-    yaz_log(YLOG_DEBUG, "Sending search");
+    yaz_log(YLOG_DEBUG, "Sending search to %s", cl->database->database->url);
 
-    cn = ccl_find_str(global_parameters.ccl_filter, se->query, &cerror, &cpos);
+    cn = ccl_find_str(sdb->database->ccl_map, se->query, &cerror, &cpos);
     if (!cn)
         return;
+
+    if (!se->relevance)
+    {
+        // Initialize relevance structure with query terms
+        char *p[512];
+        extract_terms(se->nmem, cn, p);
+        se->relevance = relevance_create(se->nmem, (const char **) p,
+                se->expected_maxrecs);
+    }
+
+    // constructing RPN query
     a->u.searchRequest->query = zquery = odr_malloc(global_parameters.odr_out,
             sizeof(Z_Query));
     zquery->which = Z_Query_type_1;
     zquery->u.type_1 = ccl_rpn_query(global_parameters.odr_out, cn);
     ccl_rpn_delete(cn);
 
-    for (ndb = 0; db->databases[ndb]; ndb++)
+    // converting to target encoding
+    if ((queryenc = session_setting_oneval(sdb, PZ_QUERYENCODING))){
+        iconv = yaz_iconv_open(queryenc, "UTF-8");
+        if (iconv){
+            yaz_query_charset_convert_rpnquery(zquery->u.type_1, 
+                                               global_parameters.odr_out, 
+                                               iconv);
+            yaz_iconv_close(iconv);
+        } else
+            yaz_log(YLOG_WARN, "Query encoding failed %s %s", 
+                    cl->database->database->url, queryenc);
+    }
+
+    for (ndb = 0; sdb->database->databases[ndb]; ndb++)
        ;
     databaselist = odr_malloc(global_parameters.odr_out, sizeof(char*) * ndb);
-    for (ndb = 0; db->databases[ndb]; ndb++)
-       databaselist[ndb] = db->databases[ndb];
-
-    a->u.presentRequest->preferredRecordSyntax =
-            yaz_oidval_to_z3950oid(global_parameters.odr_out,
-            CLASS_RECSYN, VAL_USMARC);
-    a->u.searchRequest->smallSetUpperBound = &ssub;
-    a->u.searchRequest->largeSetLowerBound = &lslb;
-    a->u.searchRequest->mediumSetPresentNumber = &mspn;
+    for (ndb = 0; sdb->database->databases[ndb]; ndb++)
+       databaselist[ndb] = sdb->database->databases[ndb];
+
+    if (!(piggyback = session_setting_oneval(sdb, PZ_PIGGYBACK)) || *piggyback == '1')
+    {
+        if ((recsyn = session_setting_oneval(sdb, PZ_REQUESTSYNTAX)))
+        {
+#if YAZ_VERSIONL >= 0x020163
+            a->u.searchRequest->preferredRecordSyntax =
+                yaz_string_to_oid_odr(yaz_oid_std(),
+                                      CLASS_RECSYN, recsyn,
+                                      global_parameters.odr_out);
+#else
+            a->u.searchRequest->preferredRecordSyntax =
+                yaz_str_to_z3950oid(global_parameters.odr_out,
+                                    CLASS_RECSYN, recsyn);
+#endif
+        }
+        a->u.searchRequest->smallSetUpperBound = &ssub;
+        a->u.searchRequest->largeSetLowerBound = &lslb;
+        a->u.searchRequest->mediumSetPresentNumber = &mspn;
+    }
     a->u.searchRequest->resultSetName = "Default";
     a->u.searchRequest->databaseNames = databaselist;
     a->u.searchRequest->num_databaseNames = ndb;
 
-    if (send_apdu(cl, a) >= 0)
-    {
-       iochan_setflags(i, EVENT_INPUT);
-       cl->state = Client_Searching;
-        cl->requestid = se->requestid;
-    }
-    else
-        cl->state = Client_Error;
+    
+    {  //scope for sending and logging queries 
+        WRBUF wbquery = wrbuf_alloc();
+        yaz_query_to_wrbuf(wbquery, zquery);
+
+
+        if (send_apdu(cl, a) >= 0)
+            {
+                iochan_setflags(i, EVENT_INPUT);
+                cl->state = Client_Searching;
+                cl->requestid = se->requestid;
+                yaz_log(YLOG_LOG, "SearchRequest %s %s %s", 
+                         cl->database->database->url,
+                        queryenc ? queryenc : "UTF-8",
+                        wrbuf_cstr(wbquery));
+            }
+        else {
+            cl->state = Client_Error;
+                yaz_log(YLOG_WARN, "Failed SearchRequest %s  %s %s", 
+                         cl->database->database->url, 
+                        queryenc ? queryenc : "UTF-8",
+                        wrbuf_cstr(wbquery));
+        }
+        
+        wrbuf_destroy(wbquery);
+    }    
 
     odr_reset(global_parameters.odr_out);
 }
@@ -193,9 +350,11 @@ static void send_present(IOCHAN i)
 {
     struct connection *co = iochan_getdata(i);
     struct client *cl = co->client; 
+    struct session_database *sdb = cl->database;
     Z_APDU *a = zget_APDU(global_parameters.odr_out, Z_APDU_presentRequest);
     int toget;
     int start = cl->records + 1;
+    char *recsyn;
 
     toget = global_parameters.chunk;
     if (toget > global_parameters.toget - cl->records)
@@ -210,9 +369,19 @@ static void send_present(IOCHAN i)
 
     a->u.presentRequest->resultSetId = "Default";
 
-    a->u.presentRequest->preferredRecordSyntax =
-            yaz_oidval_to_z3950oid(global_parameters.odr_out,
-            CLASS_RECSYN, VAL_USMARC);
+    if ((recsyn = session_setting_oneval(sdb, PZ_REQUESTSYNTAX)))
+    {
+#if YAZ_VERSIONL >= 0x020163
+        a->u.presentRequest->preferredRecordSyntax =
+            yaz_string_to_oid_odr(yaz_oid_std(),
+                                  CLASS_RECSYN, recsyn,
+                                  global_parameters.odr_out);
+#else
+        a->u.presentRequest->preferredRecordSyntax =
+            yaz_str_to_z3950oid(global_parameters.odr_out,
+                                CLASS_RECSYN, recsyn);
+#endif
+    }
 
     if (send_apdu(cl, a) >= 0)
     {
@@ -230,7 +399,7 @@ static void do_initResponse(IOCHAN i, Z_APDU *a)
     struct client *cl = co->client;
     Z_InitResponse *r = a->u.initResponse;
 
-    yaz_log(YLOG_DEBUG, "Received init response");
+    yaz_log(YLOG_DEBUG, "Init response %s", cl->database->database->url);
 
     if (*r->result)
     {
@@ -247,7 +416,8 @@ static void do_searchResponse(IOCHAN i, Z_APDU *a)
     struct session *se = cl->session;
     Z_SearchResponse *r = a->u.searchResponse;
 
-    yaz_log(YLOG_DEBUG, "Searchresponse (status=%d)", *r->searchStatus);
+    yaz_log(YLOG_DEBUG, "Search response %s (status=%d)", 
+            cl->database->database->url, *r->searchStatus);
 
     if (*r->searchStatus)
     {
@@ -255,8 +425,8 @@ static void do_searchResponse(IOCHAN i, Z_APDU *a)
         se->total_hits += cl->hits;
         if (r->presentStatus && !*r->presentStatus && r->records)
         {
-            yaz_log(YLOG_DEBUG, "Records in search response");
-            cl->records += *r->numberOfRecordsReturned;
+            yaz_log(YLOG_DEBUG, "Records in search response %s", 
+                    cl->database->database->url);
             ingest_records(cl, r->records);
         }
         cl->state = Client_Idle;
@@ -269,7 +439,9 @@ static void do_searchResponse(IOCHAN i, Z_APDU *a)
             Z_Records *recs = r->records;
             if (recs->which == Z_Records_NSD)
             {
-                yaz_log(YLOG_WARN, "Non-surrogate diagnostic");
+                yaz_log(YLOG_WARN, 
+                        "Search response: Non-surrogate diagnostic %s",
+                        cl->database->database->url);
                 cl->diagnostic = *recs->u.nonSurrogateDiagnostic->condition;
                 cl->state = Client_Error;
             }
@@ -277,10 +449,40 @@ static void do_searchResponse(IOCHAN i, Z_APDU *a)
     }
 }
 
-char *normalize_mergekey(char *buf)
+static void do_closeResponse(IOCHAN i, Z_APDU *a)
+{
+    struct connection *co = iochan_getdata(i);
+    struct client *cl = co->client;
+    /* Z_Close *r = a->u.close; */
+
+    yaz_log(YLOG_WARN, "Close response %s", cl->database->database->url);
+
+    cl->state = Client_Failed;
+    connection_destroy(co);
+}
+
+
+char *normalize_mergekey(char *buf, int skiparticle)
 {
     char *p = buf, *pout = buf;
 
+    if (skiparticle)
+    {
+        char firstword[64];
+        char articles[] = "the den der die des an a "; // must end in space
+
+        while (*p && !isalnum(*p))
+            p++;
+        pout = firstword;
+        while (*p && *p != ' ' && pout - firstword < 62)
+            *(pout++) = tolower(*(p++));
+        *(pout++) = ' ';
+        *(pout++) = '\0';
+        if (!strstr(articles, firstword))
+            p = buf;
+        pout = buf;
+    }
+
     while (*p)
     {
         while (*p && !isalnum(*p))
@@ -293,51 +495,20 @@ char *normalize_mergekey(char *buf)
             p++;
     }
     if (buf != pout)
-        *pout = '\0';
+        do {
+            *(pout--) = '\0';
+        }
+        while (pout > buf && *pout == ' ');
 
     return buf;
 }
 
-
-#ifdef GAGA
-// FIXME needs to be generalized. Should flexibly generate X lists per search
-static void extract_subject(struct session *s, const char *rec)
-{
-    const char *field, *subfield;
-
-    while ((field = find_field(rec, "650")))
-    {
-        rec = field; 
-        if ((subfield = find_subfield(field, 'a')))
-        {
-            char *e, *ef;
-            char buf[1024];
-            int len;
-
-            ef = index(subfield, '\n');
-            if (!ef)
-                return;
-            if ((e = index(subfield, '\t')) && e < ef)
-                ef = e;
-            while (ef > subfield && !isalpha(*(ef - 1)) && *(ef - 1) != ')')
-                ef--;
-            len = ef - subfield;
-            assert(len < 1023);
-            memcpy(buf, subfield, len);
-            buf[len] = '\0';
-#ifdef FIXME
-            if (*buf)
-                termlist_insert(s->termlist, buf);
-#endif
-        }
-    }
-}
-#endif
-
 static void add_facet(struct session *s, const char *type, const char *value)
 {
     int i;
 
+    if (!*value)
+        return;
     for (i = 0; i < s->num_termlists; i++)
         if (!strcmp(s->termlists[i].name, type))
             break;
@@ -357,79 +528,161 @@ static void add_facet(struct session *s, const char *type, const char *value)
 
 static xmlDoc *normalize_record(struct client *cl, Z_External *rec)
 {
-    struct conf_retrievalprofile *rprofile = cl->database->rprofile;
-    struct conf_retrievalmap *m;
+    struct database_retrievalmap *m;
+    struct database *db = cl->database->database;
     xmlNode *res;
     xmlDoc *rdoc;
 
     // First normalize to XML
-    if (rprofile->native_syntax == Nativesyn_iso2709)
+    if (db->yaz_marc)
     {
         char *buf;
         int len;
         if (rec->which != Z_External_octet)
         {
-            yaz_log(YLOG_WARN, "Unexpected external branch, probably BER");
+            yaz_log(YLOG_WARN, "Unexpected external branch, probably BER %s",
+                    cl->database->database->url);
             return 0;
         }
         buf = (char*) rec->u.octet_aligned->buf;
         len = rec->u.octet_aligned->len;
-        if (yaz_marc_read_iso2709(rprofile->yaz_marc, buf, len) < 0)
+        if (yaz_marc_read_iso2709(db->yaz_marc, buf, len) < 0)
         {
-            yaz_log(YLOG_WARN, "Failed to decode MARC");
+            yaz_log(YLOG_WARN, "Failed to decode MARC %s",
+                    cl->database->database->url);
             return 0;
         }
-        if (yaz_marc_write_xml(rprofile->yaz_marc, &res,
+
+        yaz_marc_write_using_libxml2(db->yaz_marc, 1);
+        if (yaz_marc_write_xml(db->yaz_marc, &res,
                     "http://www.loc.gov/MARC21/slim", 0, 0) < 0)
         {
-            yaz_log(YLOG_WARN, "Failed to encode as XML");
+            yaz_log(YLOG_WARN, "Failed to encode as XML %s",
+                    cl->database->database->url);
             return 0;
         }
-        rdoc = xmlNewDoc("1.0");
+        rdoc = xmlNewDoc((xmlChar *) "1.0");
         xmlDocSetRootElement(rdoc, res);
+
     }
     else
     {
-        yaz_log(YLOG_FATAL, "Unknown native_syntax in normalize_record");
+        yaz_log(YLOG_FATAL, 
+                "Unknown native_syntax in normalize_record from %s",
+                cl->database->database->url);
         exit(1);
     }
-    for (m = rprofile->maplist; m; m = m->next)
-    {
-        xmlDoc *new;
-        if (m->type != Map_xslt)
+
+    if (global_parameters.dump_records){
+        fprintf(stderr, 
+                "Input Record (normalized) from %s\n----------------\n",
+                cl->database->database->url);
+#if LIBXML_VERSION >= 20600
+        xmlDocFormatDump(stderr, rdoc, 1);
+#else
+        xmlDocDump(stderr, rdoc);
+#endif
+    }
+
+    for (m = db->map; m; m = m->next){
+        xmlDoc *new = 0;
+
+#if 1
+        {
+            xmlNodePtr root = 0;
+            new = xsltApplyStylesheet(m->stylesheet, rdoc, 0);
+            root= xmlDocGetRootElement(new);
+        if (!new || !root || !(root->children))
         {
-            yaz_log(YLOG_WARN, "Unknown map type");
+            yaz_log(YLOG_WARN, "XSLT transformation failed from %s",
+                    cl->database->database->url);
+            xmlFreeDoc(new);
+            xmlFreeDoc(rdoc);
             return 0;
         }
-        if (!(new = xsltApplyStylesheet(m->stylesheet, rdoc, 0)))
+        }
+#endif
+
+#if 0
+        // do it another way to detect transformation errors right now
+        // but does not seem to work either!
         {
-            yaz_log(YLOG_WARN, "XSLT transformation failed");
-            return 0;
+            xsltTransformContextPtr ctxt;
+            ctxt = xsltNewTransformContext(m->stylesheet, rdoc);
+            new = xsltApplyStylesheetUser(m->stylesheet, rdoc, 0, 0, 0, ctxt);
+            if ((ctxt->state == XSLT_STATE_ERROR) ||
+                (ctxt->state == XSLT_STATE_STOPPED)){
+                yaz_log(YLOG_WARN, "XSLT transformation failed from %s",
+                        cl->database->database->url);
+                xmlFreeDoc(new);
+                xmlFreeDoc(rdoc);
+                return 0;
+            }
         }
+#endif      
+   
         xmlFreeDoc(rdoc);
         rdoc = new;
     }
     if (global_parameters.dump_records)
     {
-        fprintf(stderr, "Record:\n----------------\n");
+        fprintf(stderr, "Record from %s\n----------------\n", 
+                cl->database->database->url);
+#if LIBXML_VERSION >= 20600
         xmlDocFormatDump(stderr, rdoc, 1);
+#else
+        xmlDocDump(stderr, rdoc);
+#endif
     }
     return rdoc;
 }
 
+// Extract what appears to be years from buf, storing highest and
+// lowest values.
+static int extract_years(const char *buf, int *first, int *last)
+{
+    *first = -1;
+    *last = -1;
+    while (*buf)
+    {
+        const char *e;
+        int len;
+
+        while (*buf && !isdigit(*buf))
+            buf++;
+        len = 0;
+        for (e = buf; *e && isdigit(*e); e++)
+            len++;
+        if (len == 4)
+        {
+            int value = atoi(buf);
+            if (*first < 0 || value < *first)
+                *first = value;
+            if (*last < 0 || value > *last)
+                *last = value;
+        }
+        buf = e;
+    }
+    return *first;
+}
+
 static struct record *ingest_record(struct client *cl, Z_External *rec)
 {
     xmlDoc *xdoc = normalize_record(cl, rec);
     xmlNode *root, *n;
-    struct record *res, *head;
+    struct record *res;
+    struct record_cluster *cluster;
     struct session *se = cl->session;
     xmlChar *mergekey, *mergekey_norm;
+    xmlChar *type = 0;
+    xmlChar *value = 0;
+    struct conf_service *service = global_parameters.server->service;
 
     if (!xdoc)
         return 0;
 
     root = xmlDocGetRootElement(xdoc);
-    if (!(mergekey = xmlGetProp(root, "mergekey")))
+    if (!(mergekey = xmlGetProp(root, (xmlChar *) "mergekey")))
     {
         yaz_log(YLOG_WARN, "No mergekey found in record");
         xmlFreeDoc(xdoc);
@@ -437,68 +690,215 @@ static struct record *ingest_record(struct client *cl, Z_External *rec)
     }
 
     res = nmem_malloc(se->nmem, sizeof(struct record));
-    res->next_cluster = 0;
-    res->target_offset = -1;
-    res->term_frequency_vec = 0;
-    res->title = "Unknown";
-    res->relevance = 0;
+    res->next = 0;
+    res->client = cl;
+    res->metadata = nmem_malloc(se->nmem,
+            sizeof(struct record_metadata*) * service->num_metadata);
+    memset(res->metadata, 0, sizeof(struct record_metadata*) * service->num_metadata);
 
-    mergekey_norm = nmem_strdup(se->nmem, (char*) mergekey);
+    mergekey_norm = (xmlChar *) nmem_strdup(se->nmem, (char*) mergekey);
     xmlFree(mergekey);
-    res->merge_key = normalize_mergekey(mergekey_norm);
+    normalize_mergekey((char *) mergekey_norm, 0);
 
-    head = reclist_insert(se->reclist, res);
-    if (!head)
+    cluster = reclist_insert(se->reclist, res, (char *) mergekey_norm, 
+                             &se->total_merged);
+    if (global_parameters.dump_records)
+        yaz_log(YLOG_LOG, "Cluster id %d from %s (#%d)", cluster->recid,
+                cl->database->database->url, cl->records);
+    if (!cluster)
     {
         /* no room for record */
         xmlFreeDoc(xdoc);
         return 0;
     }
-    relevance_newrec(se->relevance, head);
+    relevance_newrec(se->relevance, cluster);
 
     for (n = root->children; n; n = n->next)
     {
+        if (type)
+            xmlFree(type);
+        if (value)
+            xmlFree(value);
+        type = value = 0;
+
         if (n->type != XML_ELEMENT_NODE)
             continue;
-        if (!strcmp(n->name, "facet"))
+        if (!strcmp((const char *) n->name, "metadata"))
         {
-            xmlChar *type = xmlGetProp(n, "type");
-            xmlChar *value = xmlNodeListGetString(xdoc, n->children, 0);
-            if (type && value)
+            struct conf_metadata *md = 0;
+            struct conf_sortkey *sk = 0;
+            struct record_metadata **wheretoput, *newm;
+            int imeta;
+            int first, last;
+
+            type = xmlGetProp(n, (xmlChar *) "type");
+            value = xmlNodeListGetString(xdoc, n->children, 0);
+
+            if (!type || !value)
+                continue;
+
+            // First, find out what field we're looking at
+            for (imeta = 0; imeta < service->num_metadata; imeta++)
+                if (!strcmp((const char *) type, service->metadata[imeta].name))
+                {
+                    md = &service->metadata[imeta];
+                    if (md->sortkey_offset >= 0)
+                        sk = &service->sortkeys[md->sortkey_offset];
+                    break;
+                }
+            if (!md)
             {
-                add_facet(se, type, value);
-                relevance_countwords(se->relevance, head, value, 1);
+                yaz_log(YLOG_WARN, "Ignoring unknown metadata element: %s", type);
+                continue;
             }
-            xmlFree(type);
-            xmlFree(value);
-        }
-        else if (!strcmp(n->name, "metadata"))
-        {
-            xmlChar *type = xmlGetProp(n, "type");
-            if (type && !strcmp(type, "title"))
+
+            // Find out where we are putting it
+            if (md->merge == Metadata_merge_no)
+                wheretoput = &res->metadata[imeta];
+            else
+                wheretoput = &cluster->metadata[imeta];
+            
+            // Put it there
+            newm = nmem_malloc(se->nmem, sizeof(struct record_metadata));
+            newm->next = 0;
+            if (md->type == Metadata_type_generic)
+            {
+                char *p, *pe;
+                for (p = (char *) value; *p && isspace(*p); p++)
+                    ;
+                for (pe = p + strlen(p) - 1;
+                        pe > p && strchr(" ,/.:([", *pe); pe--)
+                    *pe = '\0';
+                newm->data.text = nmem_strdup(se->nmem, p);
+
+            }
+            else if (md->type == Metadata_type_year)
+            {
+                if (extract_years((char *) value, &first, &last) < 0)
+                    continue;
+            }
+            else
+            {
+                yaz_log(YLOG_WARN, "Unknown type in metadata element %s", type);
+                continue;
+            }
+            if (md->type == Metadata_type_year && md->merge != Metadata_merge_range)
+            {
+                yaz_log(YLOG_WARN, "Only range merging supported for years");
+                continue;
+            }
+            if (md->merge == Metadata_merge_unique)
+            {
+                struct record_metadata *mnode;
+                for (mnode = *wheretoput; mnode; mnode = mnode->next)
+                    if (!strcmp((const char *) mnode->data.text, newm->data.text))
+                        break;
+                if (!mnode)
+                {
+                    newm->next = *wheretoput;
+                    *wheretoput = newm;
+                }
+            }
+            else if (md->merge == Metadata_merge_longest)
+            {
+                if (!*wheretoput ||
+                        strlen(newm->data.text) > strlen((*wheretoput)->data.text))
+                {
+                    *wheretoput = newm;
+                    if (sk)
+                    {
+                        char *s = nmem_strdup(se->nmem, newm->data.text);
+                        if (!cluster->sortkeys[md->sortkey_offset])
+                            cluster->sortkeys[md->sortkey_offset] = 
+                                nmem_malloc(se->nmem, sizeof(union data_types));
+                        normalize_mergekey(s,
+                                (sk->type == Metadata_sortkey_skiparticle));
+                        cluster->sortkeys[md->sortkey_offset]->text = s;
+                    }
+                }
+            }
+            else if (md->merge == Metadata_merge_all || md->merge == Metadata_merge_no)
+            {
+                newm->next = *wheretoput;
+                *wheretoput = newm;
+            }
+            else if (md->merge == Metadata_merge_range)
+            {
+                assert(md->type == Metadata_type_year);
+                if (!*wheretoput)
+                {
+                    *wheretoput = newm;
+                    (*wheretoput)->data.number.min = first;
+                    (*wheretoput)->data.number.max = last;
+                    if (sk)
+                        cluster->sortkeys[md->sortkey_offset] = &newm->data;
+                }
+                else
+                {
+                    if (first < (*wheretoput)->data.number.min)
+                        (*wheretoput)->data.number.min = first;
+                    if (last > (*wheretoput)->data.number.max)
+                        (*wheretoput)->data.number.max = last;
+                }
+#ifdef GAGA
+                if (sk)
+                {
+                    union data_types *sdata = cluster->sortkeys[md->sortkey_offset];
+                    yaz_log(YLOG_LOG, "SK range: %d-%d", sdata->number.min, sdata->number.max);
+                }
+#endif
+            }
+            else
+                yaz_log(YLOG_WARN, "Don't know how to merge on element name %s", md->name);
+
+            if (md->rank)
+                relevance_countwords(se->relevance, cluster, 
+                                     (char *) value, md->rank);
+            if (md->termlist)
             {
-                xmlChar *value = xmlNodeListGetString(xdoc, n->children, 0);
-                if (value)
+                if (md->type == Metadata_type_year)
                 {
-                    res->title = nmem_strdup(se->nmem, value);
-                    relevance_countwords(se->relevance, head, value, 4);
-                    xmlFree(value);
+                    char year[64];
+                    sprintf(year, "%d", last);
+                    add_facet(se, (char *) type, year);
+                    if (first != last)
+                    {
+                        sprintf(year, "%d", first);
+                        add_facet(se, (char *) type, year);
+                    }
                 }
+                else
+                    add_facet(se, (char *) type, (char *) value);
             }
             xmlFree(type);
+            xmlFree(value);
+            type = value = 0;
         }
         else
             yaz_log(YLOG_WARN, "Unexpected element %s in internal record", n->name);
     }
+    if (type)
+        xmlFree(type);
+    if (value)
+        xmlFree(value);
 
     xmlFreeDoc(xdoc);
 
-    relevance_donerecord(se->relevance, head);
+    relevance_donerecord(se->relevance, cluster);
     se->total_records++;
 
     return res;
 }
 
+// Retrieve first defined value for 'name' for given database.
+// Will be extended to take into account user associated with session
+char *session_setting_oneval(struct session_database *db, int offset)
+{
+    if (!db->settings[offset])
+        return "";
+    return db->settings[offset]->value;
+}
+
 static void ingest_records(struct client *cl, Z_Records *r)
 {
 #if USE_TIMING
@@ -516,9 +916,12 @@ static void ingest_records(struct client *cl, Z_Records *r)
     {
         Z_NamePlusRecord *npr = rlist->records[i];
 
+        cl->records++;
         if (npr->which != Z_NamePlusRecord_databaseRecord)
         {
-            yaz_log(YLOG_WARN, "Unexpected record type, probably diagnostic");
+            yaz_log(YLOG_WARN, 
+                    "Unexpected record type, probably diagnostic %s",
+                    cl->database->database->url);
             continue;
         }
 
@@ -548,7 +951,8 @@ static void do_presentResponse(IOCHAN i, Z_APDU *a)
         Z_Records *recs = r->records;
         if (recs->which == Z_Records_NSD)
         {
-            yaz_log(YLOG_WARN, "Non-surrogate diagnostic");
+            yaz_log(YLOG_WARN, "Non-surrogate diagnostic %s",
+                    cl->database->database->url);
             cl->diagnostic = *recs->u.nonSurrogateDiagnostic->condition;
             cl->state = Client_Error;
         }
@@ -556,14 +960,15 @@ static void do_presentResponse(IOCHAN i, Z_APDU *a)
 
     if (!*r->presentStatus && cl->state != Client_Error)
     {
-        yaz_log(YLOG_DEBUG, "Good Present response");
-        cl->records += *r->numberOfRecordsReturned;
+        yaz_log(YLOG_DEBUG, "Good Present response %s",
+                cl->database->database->url);
         ingest_records(cl, r->records);
         cl->state = Client_Idle;
     }
     else if (*r->presentStatus) 
     {
-        yaz_log(YLOG_WARN, "Bad Present response");
+        yaz_log(YLOG_WARN, "Bad Present response %s",
+                cl->database->database->url);
         cl->state = Client_Error;
     }
 }
@@ -609,13 +1014,14 @@ static void handler(IOCHAN i, int event)
 
        if (len < 0)
        {
-            yaz_log(YLOG_WARN|YLOG_ERRNO, "Error reading from Z server");
+            yaz_log(YLOG_WARN|YLOG_ERRNO, "Error reading from %s", 
+                    cl->database->database->url);
             connection_destroy(co);
            return;
        }
         else if (len == 0)
        {
-            yaz_log(YLOG_WARN, "EOF reading from Z server");
+            yaz_log(YLOG_WARN, "EOF reading from %s", cl->database->database->url);
             connection_destroy(co);
            return;
        }
@@ -645,8 +1051,13 @@ static void handler(IOCHAN i, int event)
                     case Z_APDU_presentResponse:
                         do_presentResponse(i, a);
                         break;
+                    case Z_APDU_close:
+                        do_closeResponse(i, a);
+                        break;
                     default:
-                        yaz_log(YLOG_WARN, "Unexpected result from server");
+                        yaz_log(YLOG_WARN, 
+                                "Unexpected Z39.50 response from %s",  
+                                cl->database->database->url);
                         client_fatal(cl);
                         return;
                 }
@@ -730,23 +1141,43 @@ static struct connection *connection_create(struct client *cl)
     int res;
     void *addr;
 
-    yaz_log(YLOG_DEBUG, "Connection create %s", cl->database->url);
+
     if (!(link = cs_create(tcpip_type, 0, PROTO_Z3950)))
-    {
-        yaz_log(YLOG_FATAL|YLOG_ERRNO, "Failed to create comstack");
-        exit(1);
-    }
+        {
+            yaz_log(YLOG_FATAL|YLOG_ERRNO, "Failed to create comstack");
+            exit(1);
+        }
+    
+    if (0 == strlen(global_parameters.zproxy_override)){
+        /* no Z39.50 proxy needed - direct connect */
+        yaz_log(YLOG_DEBUG, "Connection create %s", cl->database->database->url);
+        
+        if (!(addr = cs_straddr(link, cl->database->database->host->ipport)))
+            {
+                yaz_log(YLOG_WARN|YLOG_ERRNO, 
+                        "Lookup of IP address %s failed", 
+                        cl->database->database->host->ipport);
+                return 0;
+            }
+    
+    } else {
+        /* Z39.50 proxy connect */
+        yaz_log(YLOG_DEBUG, "Connection create %s proxy %s", 
+                cl->database->database->url, global_parameters.zproxy_override);
 
-    if (!(addr = cs_straddr(link, cl->database->host->ipport)))
-    {
-        yaz_log(YLOG_WARN|YLOG_ERRNO, "Lookup of IP address failed?");
-        return 0;
+        if (!(addr = cs_straddr(link, global_parameters.zproxy_override)))
+            {
+                yaz_log(YLOG_WARN|YLOG_ERRNO, 
+                        "Lookup of IP address %s failed", 
+                        global_parameters.zproxy_override);
+                return 0;
+            }
     }
 
     res = cs_connect(link, addr);
     if (res < 0)
     {
-        yaz_log(YLOG_WARN|YLOG_ERRNO, "cs_connect %s", cl->database->url);
+        yaz_log(YLOG_WARN|YLOG_ERRNO, "cs_connect %s", cl->database->database->url);
         return 0;
     }
 
@@ -759,7 +1190,7 @@ static struct connection *connection_create(struct client *cl)
         new->ibufsize = 0;
     }
     new->state = Conn_Connecting;
-    new->host = cl->database->host;
+    new->host = cl->database->database->host;
     new->next = new->host->connections;
     new->host->connections = new;
     new->client = cl;
@@ -776,7 +1207,7 @@ static struct connection *connection_create(struct client *cl)
 // Close connection and set state to error
 static void client_fatal(struct client *cl)
 {
-    yaz_log(YLOG_WARN, "Fatal error from %s", cl->database->url);
+    yaz_log(YLOG_WARN, "Fatal error from %s", cl->database->database->url);
     connection_destroy(cl->connection);
     cl->state = Client_Error;
 }
@@ -786,11 +1217,11 @@ static int client_prep_connection(struct client *cl)
 {
     struct connection *co;
     struct session *se = cl->session;
-    struct host *host = cl->database->host;
+    struct host *host = cl->database->database->host;
 
     co = cl->connection;
 
-    yaz_log(YLOG_DEBUG, "Client prep %s", cl->database->url);
+    yaz_log(YLOG_DEBUG, "Client prep %s", cl->database->database->url);
 
     if (!co)
     {
@@ -827,136 +1258,6 @@ static int client_prep_connection(struct client *cl)
         return 0;
 }
 
-// This function will most likely vanish when a proper target profile mechanism is
-// introduced.
-void load_simpletargets(const char *fn)
-{
-    FILE *f = fopen(fn, "r");
-    char line[256];
-
-    if (!f)
-    {
-        yaz_log(YLOG_WARN|YLOG_ERRNO, "open %s", fn);
-        exit(1);
-    }
-
-    while (fgets(line, 255, f))
-    {
-        char *url, *db;
-        struct host *host;
-        struct database *database;
-
-        if (strncmp(line, "target ", 7))
-            continue;
-        url = line + 7;
-        url[strlen(url) - 1] = '\0';
-        yaz_log(YLOG_DEBUG, "Target: %s", url);
-        if ((db = strchr(url, '/')))
-            *(db++) = '\0';
-        else
-            db = "Default";
-
-        for (host = hosts; host; host = host->next)
-            if (!strcmp(url, host->hostport))
-                break;
-        if (!host)
-        {
-            struct addrinfo *addrinfo, hints;
-            char *port;
-            char ipport[128];
-            unsigned char addrbuf[4];
-            int res;
-
-            host = xmalloc(sizeof(struct host));
-            host->hostport = xstrdup(url);
-            host->connections = 0;
-
-            if ((port = strchr(url, ':')))
-                *(port++) = '\0';
-            else
-                port = "210";
-
-            hints.ai_flags = 0;
-            hints.ai_family = PF_INET;
-            hints.ai_socktype = SOCK_STREAM;
-            hints.ai_protocol = IPPROTO_TCP;
-            hints.ai_addrlen = 0;
-            hints.ai_addr = 0;
-            hints.ai_canonname = 0;
-            hints.ai_next = 0;
-            // This is not robust code. It assumes that getaddrinfo returns AF_INET
-            // address.
-            if ((res = getaddrinfo(url, port, &hints, &addrinfo)))
-            {
-                yaz_log(YLOG_WARN, "Failed to resolve %s: %s", url, gai_strerror(res));
-                xfree(host->hostport);
-                xfree(host);
-                continue;
-            }
-            assert(addrinfo->ai_family == PF_INET);
-            memcpy(addrbuf, &((struct sockaddr_in*)addrinfo->ai_addr)->sin_addr.s_addr, 4);
-            sprintf(ipport, "%hhd.%hhd.%hhd.%hhd:%s",
-                    addrbuf[0], addrbuf[1], addrbuf[2], addrbuf[3], port);
-            host->ipport = xstrdup(ipport);
-            freeaddrinfo(addrinfo);
-            host->next = hosts;
-            hosts = host;
-        }
-        database = xmalloc(sizeof(struct database));
-        database->host = host;
-        database->url = xmalloc(strlen(url) + strlen(db) + 2);
-        strcpy(database->url, url);
-        strcat(database->url, "/");
-        strcat(database->url, db);
-        
-        database->databases = xmalloc(2 * sizeof(char *));
-        database->databases[0] = xstrdup(db);
-        database->databases[1] = 0;
-        database->errors = 0;
-        database->qprofile = 0;
-        database->rprofile = database_retrieval_profile(database);
-        database->next = databases;
-        databases = database;
-
-    }
-    fclose(f);
-}
-
-static void pull_terms(NMEM nmem, struct ccl_rpn_node *n, char **termlist, int *num)
-{
-    switch (n->kind)
-    {
-        case CCL_RPN_AND:
-        case CCL_RPN_OR:
-        case CCL_RPN_NOT:
-        case CCL_RPN_PROX:
-            pull_terms(nmem, n->u.p[0], termlist, num);
-            pull_terms(nmem, n->u.p[1], termlist, num);
-            break;
-        case CCL_RPN_TERM:
-            termlist[(*num)++] = nmem_strdup(nmem, n->u.t.term);
-            break;
-        default: // NOOP
-            break;
-    }
-}
-
-// Extract terms from query into null-terminated termlist
-static int extract_terms(NMEM nmem, char *query, char **termlist)
-{
-    int error, pos;
-    struct ccl_rpn_node *n;
-    int num = 0;
-
-    n = ccl_find_str(global_parameters.ccl_filter, query, &error, &pos);
-    if (!n)
-        return -1;
-    pull_terms(nmem, n, termlist, &num);
-    termlist[num] = 0;
-    ccl_rpn_delete(n);
-    return 0;
-}
-
 static struct client *client_create(void)
 {
     struct client *r;
@@ -1014,40 +1315,25 @@ void session_alert_watch(struct session *s, int what)
     s->watchlist[what].data = 0;
 }
 
-// This needs to be extended with selection criteria
-static struct conf_retrievalprofile *database_retrieval_profile(struct database *db)
+//callback for grep_databases
+static void select_targets_callback(void *context, struct session_database *db)
 {
-    if (!config)
-    {
-        yaz_log(YLOG_FATAL, "Must load configuration (-f)");
-        exit(1);
-    }
-    if (!config->retrievalprofiles)
-    {
-        yaz_log(YLOG_FATAL, "No retrieval profiles defined");
-    }
-    return config->retrievalprofiles;
+    struct session *se = (struct session*) context;
+    struct client *cl = client_create();
+    cl->database = db;
+    cl->session = se;
+    cl->next = se->clients;
+    se->clients = cl;
 }
 
-// This should be extended with parameters to control selection criteria
 // Associates a set of clients with a session;
-int select_targets(struct session *se)
+// Note: Session-databases represent databases with per-session setting overrides
+int select_targets(struct session *se, struct database_criterion *crit)
 {
-    struct database *db;
-    int c = 0;
-
     while (se->clients)
         client_destroy(se->clients);
-    for (db = databases; db; db = db->next)
-    {
-        struct client *cl = client_create();
-        cl->database = db;
-        cl->session = se;
-        cl->next = se->clients;
-        se->clients = cl;
-        c++;
-    }
-    return c;
+
+    return session_grep_databases(se, crit, select_targets_callback);
 }
 
 int session_active_clients(struct session *s)
@@ -1065,34 +1351,72 @@ int session_active_clients(struct session *s)
     return res;
 }
 
-char *search(struct session *se, char *query)
+// parses crit1=val1,crit2=val2|val3,...
+static struct database_criterion *parse_filter(NMEM m, const char *buf)
+{
+    struct database_criterion *res = 0;
+    char **values;
+    int num;
+    int i;
+
+    if (!buf || !*buf)
+        return 0;
+    nmem_strsplit(m, ",", buf,  &values, &num);
+    for (i = 0; i < num; i++)
+    {
+        char **subvalues;
+        int subnum;
+        int subi;
+        struct database_criterion *new = nmem_malloc(m, sizeof(*new));
+        char *eq = strchr(values[i], '=');
+        if (!eq)
+        {
+            yaz_log(YLOG_WARN, "Missing equal-sign in filter");
+            return 0;
+        }
+        *(eq++) = '\0';
+        new->name = values[i];
+        nmem_strsplit(m, "|", eq, &subvalues, &subnum);
+        new->values = 0;
+        for (subi = 0; subi < subnum; subi++)
+        {
+            struct database_criterion_value *newv = nmem_malloc(m, sizeof(*newv));
+            newv->value = subvalues[subi];
+            newv->next = new->values;
+            new->values = newv;
+        }
+        new->next = res;
+        res = new;
+    }
+    return res;
+}
+
+char *search(struct session *se, char *query, char *filter)
 {
     int live_channels = 0;
     struct client *cl;
+    struct database_criterion *criteria;
 
     yaz_log(YLOG_DEBUG, "Search");
 
+    nmem_reset(se->nmem);
+    criteria = parse_filter(se->nmem, filter);
     strcpy(se->query, query);
     se->requestid++;
-    nmem_reset(se->nmem);
+    select_targets(se, criteria);
     for (cl = se->clients; cl; cl = cl->next)
     {
-        cl->hits = -1;
-        cl->records = 0;
-        cl->diagnostic = 0;
-
         if (client_prep_connection(cl))
             live_channels++;
     }
     if (live_channels)
     {
-        char *p[512];
         int maxrecs = live_channels * global_parameters.toget;
         se->num_termlists = 0;
         se->reclist = reclist_create(se->nmem, maxrecs);
-        extract_terms(se->nmem, query, p);
-        se->relevance = relevance_create(se->nmem, (const char **) p, maxrecs);
-        se->total_records = se->total_hits = 0;
+        // This will be initialized in send_search()
+        se->relevance = 0;
+        se->total_records = se->total_hits = se->total_merged = 0;
         se->expected_maxrecs = maxrecs;
     }
     else
@@ -1101,21 +1425,72 @@ char *search(struct session *se, char *query)
     return 0;
 }
 
+// Apply a session override to a database
+void session_apply_setting(struct session *se, char *dbname, char *setting, char *value)
+{
+    struct session_database *sdb;
+
+    for (sdb = se->databases; sdb; sdb = sdb->next)
+        if (!strcmp(dbname, sdb->database->url))
+        {
+            struct setting *new = nmem_malloc(se->session_nmem, sizeof(*new));
+            int offset = settings_offset(setting);
+
+            if (offset < 0)
+            {
+                yaz_log(YLOG_WARN, "Unknown setting %s", setting);
+                return;
+            }
+            new->precedence = 0;
+            new->target = dbname;
+            new->name = setting;
+            new->value = value;
+            new->next = sdb->settings[offset];
+            sdb->settings[offset] = new;
+            break;
+        }
+    if (!sdb)
+        yaz_log(YLOG_WARN, "Unknown database in setting override: %s", dbname);
+}
+
+void session_init_databases_fun(void *context, struct database *db)
+{
+    struct session *se = (struct session *) context;
+    struct session_database *new = nmem_malloc(se->session_nmem, sizeof(*new));
+    int num = settings_num();
+    int i;
+
+    new->database = db;
+    new->settings = nmem_malloc(se->session_nmem, sizeof(struct settings *) * num);
+    for (i = 0; i < num; i++)
+        new->settings[i] = db->settings[i];
+    new->next = se->databases;
+    se->databases = new;
+}
+
+// Initialize session_database list -- this represents this session's view
+// of the database list -- subject to modification by the settings ws command
+void session_init_databases(struct session *se)
+{
+    se->databases = 0;
+    grep_databases(se, 0, session_init_databases_fun);
+}
+
 void destroy_session(struct session *s)
 {
     yaz_log(YLOG_LOG, "Destroying session");
     while (s->clients)
         client_destroy(s->clients);
     nmem_destroy(s->nmem);
-    wrbuf_free(s->wrbuf, 1);
+    wrbuf_destroy(s->wrbuf);
 }
 
-struct session *new_session() 
+struct session *new_session(NMEM nmem) 
 {
     int i;
-    struct session *session = xmalloc(sizeof(*session));
+    struct session *session = nmem_malloc(nmem, sizeof(*session));
 
-    yaz_log(YLOG_DEBUG, "New pazpar2 session");
+    yaz_log(YLOG_DEBUG, "New Pazpar2 session");
     
     session->total_hits = 0;
     session->total_records = 0;
@@ -1125,16 +1500,16 @@ struct session *new_session()
     session->clients = 0;
     session->expected_maxrecs = 0;
     session->query[0] = '\0';
+    session->session_nmem = nmem;
     session->nmem = nmem_create();
     session->wrbuf = wrbuf_alloc();
+    session_init_databases(session);
     for (i = 0; i <= SESSION_WATCH_MAX; i++)
     {
         session->watchlist[i].data = 0;
         session->watchlist[i].fun = 0;
     }
 
-    select_targets(session);
-
     return session;
 }
 
@@ -1146,7 +1521,10 @@ struct hitsbytarget *hitsbytarget(struct session *se, int *count)
     *count = 0;
     for (cl = se->clients; cl; cl = cl->next)
     {
-        strcpy(res[*count].id, cl->database->host->hostport);
+        char *name = session_setting_oneval(cl->database, PZ_NAME);
+
+        res[*count].id = cl->database->database->url;
+        res[*count].name = *name ? name : "Unknown";
         res[*count].hits = cl->hits;
         res[*count].records = cl->records;
         res[*count].diagnostic = cl->diagnostic;
@@ -1163,7 +1541,7 @@ struct termlist_score **termlist(struct session *s, const char *name, int *num)
     int i;
 
     for (i = 0; i < s->num_termlists; i++)
-        if (!strcmp(s->termlists[i].name, name))
+        if (!strcmp((const char *) s->termlists[i].name, name))
             return termlist_highscore(s->termlists[i].termlist, num);
     return 0;
 }
@@ -1181,16 +1559,35 @@ void report_nmem_stats(void)
 }
 #endif
 
-struct record **show(struct session *s, int start, int *num, int *total,
-                     int *sumhits, NMEM nmem_show)
+struct record_cluster *show_single(struct session *s, int id)
 {
-    struct record **recs = nmem_malloc(nmem_show, *num 
-                                       * sizeof(struct record *));
+    struct record_cluster *r;
+
+    reclist_rewind(s->reclist);
+    while ((r = reclist_read_record(s->reclist)))
+        if (r->recid == id)
+            return r;
+    return 0;
+}
+
+struct record_cluster **show(struct session *s, struct reclist_sortparms *sp, int start,
+        int *num, int *total, int *sumhits, NMEM nmem_show)
+{
+    struct record_cluster **recs = nmem_malloc(nmem_show, *num 
+                                       * sizeof(struct record_cluster *));
+    struct reclist_sortparms *spp;
     int i;
 #if USE_TIMING    
     yaz_timing_t t = yaz_timing_create();
 #endif
-    relevance_prepare_read(s->relevance, s->reclist);
+
+    for (spp = sp; spp; spp = spp->next)
+        if (spp->type == Metadata_sortkey_relevance)
+        {
+            relevance_prepare_read(s->relevance, s->reclist);
+            break;
+        }
+    reclist_sort(s->reclist, sp);
 
     *total = s->reclist->num_records;
     *sumhits = s->total_hits;
@@ -1205,7 +1602,7 @@ struct record **show(struct session *s, int start, int *num, int *total,
 
     for (i = 0; i < *num; i++)
     {
-        struct record *r = reclist_read_record(s->reclist);
+        struct record_cluster *r = reclist_read_record(s->reclist);
         if (!r)
         {
             *num = i;
@@ -1228,7 +1625,7 @@ void statistics(struct session *se, struct statistics *stat)
     struct client *cl;
     int count = 0;
 
-    bzero(stat, sizeof(*stat));
+    memset(stat, 0, sizeof(*stat));
     for (cl = se->clients; cl; cl = cl->next)
     {
         if (!cl->connection)
@@ -1252,29 +1649,95 @@ void statistics(struct session *se, struct statistics *stat)
     stat->num_clients = count;
 }
 
-static CCL_bibset load_cclfile(const char *fn)
+static void start_http_listener(void)
 {
-    CCL_bibset res = ccl_qual_mk();
-    if (ccl_qual_fname(res, fn) < 0)
+    char hp[128] = "";
+    struct conf_server *ser = global_parameters.server;
+
+    if (*global_parameters.listener_override)
+        strcpy(hp, global_parameters.listener_override);
+    else
     {
-        yaz_log(YLOG_FATAL|YLOG_ERRNO, "%s", fn);
-        exit(1);
+        strcpy(hp, ser->host ? ser->host : "");
+        if (ser->port)
+        {
+            if (*hp)
+                strcat(hp, ":");
+            sprintf(hp + strlen(hp), "%d", ser->port);
+        }
     }
-    return res;
+    http_init(hp);
 }
 
+static void start_proxy(void)
+{
+    char hp[128] = "";
+    struct conf_server *ser = global_parameters.server;
+
+    if (*global_parameters.proxy_override)
+        strcpy(hp, global_parameters.proxy_override);
+    else if (ser->proxy_host || ser->proxy_port)
+    {
+        strcpy(hp, ser->proxy_host ? ser->proxy_host : "");
+        if (ser->proxy_port)
+        {
+            if (*hp)
+                strcat(hp, ":");
+            sprintf(hp + strlen(hp), "%d", ser->proxy_port);
+        }
+    }
+    else
+        return;
+
+    http_set_proxyaddr(hp, ser->myurl ? ser->myurl : "");
+}
+
+static void start_zproxy(void)
+{
+    struct conf_server *ser = global_parameters.server;
+
+    if (*global_parameters.zproxy_override){
+        yaz_log(YLOG_LOG, "Z39.50 proxy  %s", 
+                global_parameters.zproxy_override);
+        return;
+    }
+
+    else if (ser->zproxy_host || ser->zproxy_port)
+    {
+        char hp[128] = "";
+
+        strcpy(hp, ser->zproxy_host ? ser->zproxy_host : "");
+        if (ser->zproxy_port)
+        {
+            if (*hp)
+                strcat(hp, ":");
+            else
+                strcat(hp, "@:");
+
+            sprintf(hp + strlen(hp), "%d", ser->zproxy_port);
+        }
+        strcpy(global_parameters.zproxy_override, hp);
+        yaz_log(YLOG_LOG, "Z39.50 proxy  %s", 
+                global_parameters.zproxy_override);
+
+    }
+    else
+        return;
+}
+
+
+
 int main(int argc, char **argv)
 {
     int ret;
     char *arg;
-    int setport = 0;
 
-    if (signal(SIGPIPE, SIG_IGN) < 0)
+    if (signal(SIGPIPE, SIG_IGN) == SIG_ERR)
         yaz_log(YLOG_WARN|YLOG_ERRNO, "signal");
 
     yaz_log_init(YLOG_DEFAULT_LEVEL, "pazpar2", 0);
 
-    while ((ret = options("f:x:h:p:C:s:d", argv, argc, &arg)) != -2)
+    while ((ret = options("t:f:x:h:p:z:s:d", argv, argc, &arg)) != -2)
     {
        switch (ret) {
             case 'f':
@@ -1282,14 +1745,16 @@ int main(int argc, char **argv)
                     exit(1);
                 break;
             case 'h':
-                http_init(arg);
-                setport++;
-                break;
-            case 'C':
-                global_parameters.ccl_filter = load_cclfile(arg);
+                strcpy(global_parameters.listener_override, arg);
                 break;
             case 'p':
-                http_set_proxyaddr(arg);
+                strcpy(global_parameters.proxy_override, arg);
+                break;
+            case 'z':
+                strcpy(global_parameters.zproxy_override, arg);
+                break;
+            case 't':
+                strcpy(global_parameters.settings_path_override, arg);
                 break;
             case 's':
                 load_simpletargets(arg);
@@ -1303,20 +1768,31 @@ int main(int argc, char **argv)
                         "    -h [host:]port          (REST protocol listener)\n"
                         "    -C cclconfig\n"
                         "    -s simpletargetfile\n"
-                        "    -p hostname[:portno]    (HTTP proxy)\n");
+                        "    -p hostname[:portno]    (HTTP proxy)\n"
+                        "    -z hostname[:portno]    (Z39.50 proxy)\n"
+                        "    -d                      (show internal records)\n");
                exit(1);
        }
     }
 
-    if (!setport)
+    if (!config)
     {
-        fprintf(stderr, "Set command port with -h\n");
+        yaz_log(YLOG_FATAL, "Load config with -f");
         exit(1);
     }
+    global_parameters.server = config->servers;
 
-    global_parameters.ccl_filter = load_cclfile("../etc/default.bib");
-    global_parameters.yaz_marc = yaz_marc_create();
-    yaz_marc_subfield_str(global_parameters.yaz_marc, "\t");
+    start_http_listener();
+    start_proxy();
+    start_zproxy();
+
+    if (*global_parameters.settings_path_override)
+        settings_read(global_parameters.settings_path_override);
+    else if (global_parameters.server->settings)
+        settings_read(global_parameters.server->settings);
+    else
+        yaz_log(YLOG_WARN, "No settings-directory specified. Problems may well ensue!");
+    prepare_databases();
     global_parameters.odr_in = odr_createmem(ODR_DECODE);
     global_parameters.odr_out = odr_createmem(ODR_ENCODE);