aligning log messages for nice look-and-feel
[pazpar2-moved-to-github.git] / src / pazpar2.c
1 /* $Id: pazpar2.c,v 1.58 2007-03-31 20:06:18 marc Exp $ */
2
3 #include <stdlib.h>
4 #include <stdio.h>
5 #include <string.h>
6 #include <sys/time.h>
7 #include <unistd.h>
8 #include <sys/socket.h>
9 #include <netdb.h>
10 #include <signal.h>
11 #include <ctype.h>
12 #include <assert.h>
13
14 #include <yaz/marcdisp.h>
15 #include <yaz/comstack.h>
16 #include <yaz/tcpip.h>
17 #include <yaz/proto.h>
18 #include <yaz/readconf.h>
19 #include <yaz/pquery.h>
20 #include <yaz/otherinfo.h>
21 #include <yaz/yaz-util.h>
22 #include <yaz/nmem.h>
23
24 #if HAVE_CONFIG_H
25 #include "cconfig.h"
26 #endif
27
28 #define USE_TIMING 0
29 #if USE_TIMING
30 #include <yaz/timing.h>
31 #endif
32
33 #include <netinet/in.h>
34
35 #include "pazpar2.h"
36 #include "eventl.h"
37 #include "http.h"
38 #include "termlists.h"
39 #include "reclists.h"
40 #include "relevance.h"
41 #include "config.h"
42 #include "database.h"
43 #include "settings.h"
44
45 #define MAX_CHUNK 15
46
47 static void client_fatal(struct client *cl);
48 static void connection_destroy(struct connection *co);
49 static int client_prep_connection(struct client *cl);
50 static void ingest_records(struct client *cl, Z_Records *r);
51 //static struct conf_retrievalprofile *database_retrieval_profile(struct database *db);
52 void session_alert_watch(struct session *s, int what);
53
54 IOCHAN channel_list = 0;  // Master list of connections we're handling events to
55
56 static struct connection *connection_freelist = 0;
57 static struct client *client_freelist = 0;
58
59 static char *client_states[] = {
60     "Client_Connecting",
61     "Client_Connected",
62     "Client_Idle",
63     "Client_Initializing",
64     "Client_Searching",
65     "Client_Presenting",
66     "Client_Error",
67     "Client_Failed",
68     "Client_Disconnected",
69     "Client_Stopped"
70 };
71
72 // Note: Some things in this structure will eventually move to configuration
73 struct parameters global_parameters = 
74 {
75     "",
76     "",
77     "",
78     "",
79     0,
80     0,
81     30,
82     "81",
83     "Index Data PazPar2 (MasterKey)",
84     VERSION,
85     600, // 10 minutes
86     60,
87     100,
88     MAX_CHUNK,
89     0,
90     0,
91     0
92 };
93
94 static int send_apdu(struct client *c, Z_APDU *a)
95 {
96     struct connection *co = c->connection;
97     char *buf;
98     int len, r;
99
100     if (!z_APDU(global_parameters.odr_out, &a, 0, 0))
101     {
102         odr_perror(global_parameters.odr_out, "Encoding APDU");
103         abort();
104     }
105     buf = odr_getbuf(global_parameters.odr_out, &len, 0);
106     r = cs_put(co->link, buf, len);
107     if (r < 0)
108     {
109         yaz_log(YLOG_WARN, "cs_put: %s", cs_errmsg(cs_errno(co->link)));
110         return -1;
111     }
112     else if (r == 1)
113     {
114         fprintf(stderr, "cs_put incomplete (ParaZ does not handle that)\n");
115         exit(1);
116     }
117     odr_reset(global_parameters.odr_out); /* release the APDU structure  */
118     co->state = Conn_Waiting;
119     return 0;
120 }
121
122
123 static void send_init(IOCHAN i)
124 {
125
126     struct connection *co = iochan_getdata(i);
127     struct client *cl = co->client;
128     Z_APDU *a = zget_APDU(global_parameters.odr_out, Z_APDU_initRequest);
129
130     a->u.initRequest->implementationId = global_parameters.implementationId;
131     a->u.initRequest->implementationName = global_parameters.implementationName;
132     a->u.initRequest->implementationVersion =
133         global_parameters.implementationVersion;
134     ODR_MASK_SET(a->u.initRequest->options, Z_Options_search);
135     ODR_MASK_SET(a->u.initRequest->options, Z_Options_present);
136     ODR_MASK_SET(a->u.initRequest->options, Z_Options_namedResultSets);
137
138     ODR_MASK_SET(a->u.initRequest->protocolVersion, Z_ProtocolVersion_1);
139     ODR_MASK_SET(a->u.initRequest->protocolVersion, Z_ProtocolVersion_2);
140     ODR_MASK_SET(a->u.initRequest->protocolVersion, Z_ProtocolVersion_3);
141
142
143     /* add virtual host if tunneling through Z39.50 proxy */
144     
145     if (0 < strlen(global_parameters.zproxy_override) 
146         && 0 < strlen(cl->database->url))
147         yaz_oi_set_string_oidval(&a->u.initRequest->otherInfo, 
148                                  global_parameters.odr_out, VAL_PROXY,
149                                  1, cl->database->url);
150     
151
152
153     if (send_apdu(cl, a) >= 0)
154     {
155         iochan_setflags(i, EVENT_INPUT);
156         cl->state = Client_Initializing;
157     }
158     else
159         cl->state = Client_Error;
160     odr_reset(global_parameters.odr_out);
161 }
162
163 static void pull_terms(NMEM nmem, struct ccl_rpn_node *n, char **termlist, int *num)
164 {
165     switch (n->kind)
166     {
167         case CCL_RPN_AND:
168         case CCL_RPN_OR:
169         case CCL_RPN_NOT:
170         case CCL_RPN_PROX:
171             pull_terms(nmem, n->u.p[0], termlist, num);
172             pull_terms(nmem, n->u.p[1], termlist, num);
173             break;
174         case CCL_RPN_TERM:
175             termlist[(*num)++] = nmem_strdup(nmem, n->u.t.term);
176             break;
177         default: // NOOP
178             break;
179     }
180 }
181
182 // Extract terms from query into null-terminated termlist
183 static void extract_terms(NMEM nmem, struct ccl_rpn_node *query, char **termlist)
184 {
185     int num = 0;
186
187     pull_terms(nmem, query, termlist, &num);
188     termlist[num] = 0;
189 }
190
191 static void send_search(IOCHAN i)
192 {
193     struct connection *co = iochan_getdata(i);
194     struct client *cl = co->client; 
195     struct session *se = cl->session;
196     struct database *db = cl->database;
197     Z_APDU *a = zget_APDU(global_parameters.odr_out, Z_APDU_searchRequest);
198     int ndb, cerror, cpos;
199     char **databaselist;
200     Z_Query *zquery;
201     struct ccl_rpn_node *cn;
202     int ssub = 0, lslb = 100000, mspn = 10;
203
204     yaz_log(YLOG_DEBUG, "Sending search");
205
206     cn = ccl_find_str(db->ccl_map, se->query, &cerror, &cpos);
207     if (!cn)
208         return;
209
210     if (!se->relevance)
211     {
212         // Initialize relevance structure with query terms
213         char *p[512];
214         extract_terms(se->nmem, cn, p);
215         se->relevance = relevance_create(se->nmem, (const char **) p,
216                 se->expected_maxrecs);
217     }
218
219     a->u.searchRequest->query = zquery = odr_malloc(global_parameters.odr_out,
220             sizeof(Z_Query));
221     zquery->which = Z_Query_type_1;
222     zquery->u.type_1 = ccl_rpn_query(global_parameters.odr_out, cn);
223     ccl_rpn_delete(cn);
224
225     for (ndb = 0; db->databases[ndb]; ndb++)
226         ;
227     databaselist = odr_malloc(global_parameters.odr_out, sizeof(char*) * ndb);
228     for (ndb = 0; db->databases[ndb]; ndb++)
229         databaselist[ndb] = db->databases[ndb];
230
231     a->u.searchRequest->preferredRecordSyntax =
232             yaz_oidval_to_z3950oid(global_parameters.odr_out,
233             CLASS_RECSYN, VAL_USMARC);
234     a->u.searchRequest->smallSetUpperBound = &ssub;
235     a->u.searchRequest->largeSetLowerBound = &lslb;
236     a->u.searchRequest->mediumSetPresentNumber = &mspn;
237     a->u.searchRequest->resultSetName = "Default";
238     a->u.searchRequest->databaseNames = databaselist;
239     a->u.searchRequest->num_databaseNames = ndb;
240
241     if (send_apdu(cl, a) >= 0)
242     {
243         iochan_setflags(i, EVENT_INPUT);
244         cl->state = Client_Searching;
245         cl->requestid = se->requestid;
246     }
247     else
248         cl->state = Client_Error;
249
250     odr_reset(global_parameters.odr_out);
251 }
252
253 static void send_present(IOCHAN i)
254 {
255     struct connection *co = iochan_getdata(i);
256     struct client *cl = co->client; 
257     Z_APDU *a = zget_APDU(global_parameters.odr_out, Z_APDU_presentRequest);
258     int toget;
259     int start = cl->records + 1;
260
261     toget = global_parameters.chunk;
262     if (toget > global_parameters.toget - cl->records)
263         toget = global_parameters.toget - cl->records;
264     if (toget > cl->hits - cl->records)
265         toget = cl->hits - cl->records;
266
267     yaz_log(YLOG_DEBUG, "Trying to present %d records\n", toget);
268
269     a->u.presentRequest->resultSetStartPoint = &start;
270     a->u.presentRequest->numberOfRecordsRequested = &toget;
271
272     a->u.presentRequest->resultSetId = "Default";
273
274     a->u.presentRequest->preferredRecordSyntax =
275             yaz_oidval_to_z3950oid(global_parameters.odr_out,
276             CLASS_RECSYN, VAL_USMARC);
277
278     if (send_apdu(cl, a) >= 0)
279     {
280         iochan_setflags(i, EVENT_INPUT);
281         cl->state = Client_Presenting;
282     }
283     else
284         cl->state = Client_Error;
285     odr_reset(global_parameters.odr_out);
286 }
287
288 static void do_initResponse(IOCHAN i, Z_APDU *a)
289 {
290     struct connection *co = iochan_getdata(i);
291     struct client *cl = co->client;
292     Z_InitResponse *r = a->u.initResponse;
293
294     yaz_log(YLOG_DEBUG, "Received init response");
295
296     if (*r->result)
297     {
298         cl->state = Client_Idle;
299     }
300     else
301         cl->state = Client_Failed; // FIXME need to do something to the connection
302 }
303
304 static void do_searchResponse(IOCHAN i, Z_APDU *a)
305 {
306     struct connection *co = iochan_getdata(i);
307     struct client *cl = co->client;
308     struct session *se = cl->session;
309     Z_SearchResponse *r = a->u.searchResponse;
310
311     yaz_log(YLOG_DEBUG, "Searchresponse (status=%d)", *r->searchStatus);
312
313     if (*r->searchStatus)
314     {
315         cl->hits = *r->resultCount;
316         se->total_hits += cl->hits;
317         if (r->presentStatus && !*r->presentStatus && r->records)
318         {
319             yaz_log(YLOG_DEBUG, "Records in search response");
320             ingest_records(cl, r->records);
321         }
322         cl->state = Client_Idle;
323     }
324     else
325     {          /*"FAILED"*/
326         cl->hits = 0;
327         cl->state = Client_Error;
328         if (r->records) {
329             Z_Records *recs = r->records;
330             if (recs->which == Z_Records_NSD)
331             {
332                 yaz_log(YLOG_WARN, "Non-surrogate diagnostic");
333                 cl->diagnostic = *recs->u.nonSurrogateDiagnostic->condition;
334                 cl->state = Client_Error;
335             }
336         }
337     }
338 }
339
340 char *normalize_mergekey(char *buf, int skiparticle)
341 {
342     char *p = buf, *pout = buf;
343
344     if (skiparticle)
345     {
346         char firstword[64];
347         char articles[] = "the den der die des an a "; // must end in space
348
349         while (*p && !isalnum(*p))
350             p++;
351         pout = firstword;
352         while (*p && *p != ' ' && pout - firstword < 62)
353             *(pout++) = tolower(*(p++));
354         *(pout++) = ' ';
355         *(pout++) = '\0';
356         if (!strstr(articles, firstword))
357             p = buf;
358         pout = buf;
359     }
360
361     while (*p)
362     {
363         while (*p && !isalnum(*p))
364             p++;
365         while (isalnum(*p))
366             *(pout++) = tolower(*(p++));
367         if (*p)
368             *(pout++) = ' ';
369         while (*p && !isalnum(*p))
370             p++;
371     }
372     if (buf != pout)
373         do {
374             *(pout--) = '\0';
375         }
376         while (pout > buf && *pout == ' ');
377
378     return buf;
379 }
380
381
382 #ifdef GAGA
383 // FIXME needs to be generalized. Should flexibly generate X lists per search
384 static void extract_subject(struct session *s, const char *rec)
385 {
386     const char *field, *subfield;
387
388     while ((field = find_field(rec, "650")))
389     {
390         rec = field; 
391         if ((subfield = find_subfield(field, 'a')))
392         {
393             char *e, *ef;
394             char buf[1024];
395             int len;
396
397             ef = index(subfield, '\n');
398             if (!ef)
399                 return;
400             if ((e = index(subfield, '\t')) && e < ef)
401                 ef = e;
402             while (ef > subfield && !isalpha(*(ef - 1)) && *(ef - 1) != ')')
403                 ef--;
404             len = ef - subfield;
405             assert(len < 1023);
406             memcpy(buf, subfield, len);
407             buf[len] = '\0';
408 #ifdef FIXME
409             if (*buf)
410                 termlist_insert(s->termlist, buf);
411 #endif
412         }
413     }
414 }
415 #endif
416
417 static void add_facet(struct session *s, const char *type, const char *value)
418 {
419     int i;
420
421     if (!*value)
422         return;
423     for (i = 0; i < s->num_termlists; i++)
424         if (!strcmp(s->termlists[i].name, type))
425             break;
426     if (i == s->num_termlists)
427     {
428         if (i == SESSION_MAX_TERMLISTS)
429         {
430             yaz_log(YLOG_FATAL, "Too many termlists");
431             exit(1);
432         }
433         s->termlists[i].name = nmem_strdup(s->nmem, type);
434         s->termlists[i].termlist = termlist_create(s->nmem, s->expected_maxrecs, 15);
435         s->num_termlists = i + 1;
436     }
437     termlist_insert(s->termlists[i].termlist, value);
438 }
439
440 static xmlDoc *normalize_record(struct client *cl, Z_External *rec)
441 {
442     struct conf_retrievalprofile *rprofile = cl->database->rprofile;
443     struct conf_retrievalmap *m;
444     xmlNode *res;
445     xmlDoc *rdoc;
446
447     // First normalize to XML
448     if (rprofile->native_syntax == Nativesyn_iso2709)
449     {
450         char *buf;
451         int len;
452         if (rec->which != Z_External_octet)
453         {
454             yaz_log(YLOG_WARN, "Unexpected external branch, probably BER");
455             return 0;
456         }
457         buf = (char*) rec->u.octet_aligned->buf;
458         len = rec->u.octet_aligned->len;
459         if (yaz_marc_read_iso2709(rprofile->yaz_marc, buf, len) < 0)
460         {
461             yaz_log(YLOG_WARN, "Failed to decode MARC");
462             return 0;
463         }
464         if (yaz_marc_write_xml(rprofile->yaz_marc, &res,
465                     "http://www.loc.gov/MARC21/slim", 0, 0) < 0)
466         {
467             yaz_log(YLOG_WARN, "Failed to encode as XML");
468             return 0;
469         }
470         rdoc = xmlNewDoc((xmlChar *) "1.0");
471         xmlDocSetRootElement(rdoc, res);
472     }
473     else
474     {
475         yaz_log(YLOG_FATAL, "Unknown native_syntax in normalize_record");
476         exit(1);
477     }
478
479     if (global_parameters.dump_records)
480     {
481         fprintf(stderr, "Input Record (normalized):\n----------------\n");
482 #if LIBXML_VERSION >= 20600
483         xmlDocFormatDump(stderr, rdoc, 1);
484 #else
485         xmlDocDump(stderr, rdoc);
486 #endif
487     }
488
489     for (m = rprofile->maplist; m; m = m->next)
490     {
491         xmlDoc *new;
492         if (m->type != Map_xslt)
493         {
494             yaz_log(YLOG_WARN, "Unknown map type");
495             return 0;
496         }
497         if (!(new = xsltApplyStylesheet(m->stylesheet, rdoc, 0)))
498         {
499             yaz_log(YLOG_WARN, "XSLT transformation failed");
500             return 0;
501         }
502         xmlFreeDoc(rdoc);
503         rdoc = new;
504     }
505     if (global_parameters.dump_records)
506     {
507         fprintf(stderr, "Record:\n----------------\n");
508 #if LIBXML_VERSION >= 20600
509         xmlDocFormatDump(stderr, rdoc, 1);
510 #else
511         xmlDocDump(stderr, rdoc);
512 #endif
513     }
514     return rdoc;
515 }
516
517 // Extract what appears to be years from buf, storing highest and
518 // lowest values.
519 static int extract_years(const char *buf, int *first, int *last)
520 {
521     *first = -1;
522     *last = -1;
523     while (*buf)
524     {
525         const char *e;
526         int len;
527
528         while (*buf && !isdigit(*buf))
529             buf++;
530         len = 0;
531         for (e = buf; *e && isdigit(*e); e++)
532             len++;
533         if (len == 4)
534         {
535             int value = atoi(buf);
536             if (*first < 0 || value < *first)
537                 *first = value;
538             if (*last < 0 || value > *last)
539                 *last = value;
540         }
541         buf = e;
542     }
543     return *first;
544 }
545
546 static struct record *ingest_record(struct client *cl, Z_External *rec)
547 {
548     xmlDoc *xdoc = normalize_record(cl, rec);
549     xmlNode *root, *n;
550     struct record *res;
551     struct record_cluster *cluster;
552     struct session *se = cl->session;
553     xmlChar *mergekey, *mergekey_norm;
554     xmlChar *type = 0;
555     xmlChar *value = 0;
556     struct conf_service *service = global_parameters.server->service;
557
558     if (!xdoc)
559         return 0;
560
561     root = xmlDocGetRootElement(xdoc);
562     if (!(mergekey = xmlGetProp(root, (xmlChar *) "mergekey")))
563     {
564         yaz_log(YLOG_WARN, "No mergekey found in record");
565         xmlFreeDoc(xdoc);
566         return 0;
567     }
568
569     res = nmem_malloc(se->nmem, sizeof(struct record));
570     res->next = 0;
571     res->client = cl;
572     res->metadata = nmem_malloc(se->nmem,
573             sizeof(struct record_metadata*) * service->num_metadata);
574     memset(res->metadata, 0, sizeof(struct record_metadata*) * service->num_metadata);
575
576     mergekey_norm = (xmlChar *) nmem_strdup(se->nmem, (char*) mergekey);
577     xmlFree(mergekey);
578     normalize_mergekey((char *) mergekey_norm, 0);
579
580     cluster = reclist_insert(se->reclist, res, (char *) mergekey_norm, 
581                              &se->total_merged);
582     if (global_parameters.dump_records)
583         yaz_log(YLOG_LOG, "Cluster id %d from %s (#%d)", cluster->recid,
584                 cl->database->url, cl->records);
585     if (!cluster)
586     {
587         /* no room for record */
588         xmlFreeDoc(xdoc);
589         return 0;
590     }
591     relevance_newrec(se->relevance, cluster);
592
593     for (n = root->children; n; n = n->next)
594     {
595         if (type)
596             xmlFree(type);
597         if (value)
598             xmlFree(value);
599         type = value = 0;
600
601         if (n->type != XML_ELEMENT_NODE)
602             continue;
603         if (!strcmp((const char *) n->name, "metadata"))
604         {
605             struct conf_metadata *md = 0;
606             struct conf_sortkey *sk = 0;
607             struct record_metadata **wheretoput, *newm;
608             int imeta;
609             int first, last;
610
611             type = xmlGetProp(n, (xmlChar *) "type");
612             value = xmlNodeListGetString(xdoc, n->children, 0);
613
614             if (!type || !value)
615                 continue;
616
617             // First, find out what field we're looking at
618             for (imeta = 0; imeta < service->num_metadata; imeta++)
619                 if (!strcmp((const char *) type, service->metadata[imeta].name))
620                 {
621                     md = &service->metadata[imeta];
622                     if (md->sortkey_offset >= 0)
623                         sk = &service->sortkeys[md->sortkey_offset];
624                     break;
625                 }
626             if (!md)
627             {
628                 yaz_log(YLOG_WARN, "Ignoring unknown metadata element: %s", type);
629                 continue;
630             }
631
632             // Find out where we are putting it
633             if (md->merge == Metadata_merge_no)
634                 wheretoput = &res->metadata[imeta];
635             else
636                 wheretoput = &cluster->metadata[imeta];
637             
638             // Put it there
639             newm = nmem_malloc(se->nmem, sizeof(struct record_metadata));
640             newm->next = 0;
641             if (md->type == Metadata_type_generic)
642             {
643                 char *p, *pe;
644                 for (p = (char *) value; *p && isspace(*p); p++)
645                     ;
646                 for (pe = p + strlen(p) - 1;
647                         pe > p && strchr(" ,/.:([", *pe); pe--)
648                     *pe = '\0';
649                 newm->data.text = nmem_strdup(se->nmem, p);
650
651             }
652             else if (md->type == Metadata_type_year)
653             {
654                 if (extract_years((char *) value, &first, &last) < 0)
655                     continue;
656             }
657             else
658             {
659                 yaz_log(YLOG_WARN, "Unknown type in metadata element %s", type);
660                 continue;
661             }
662             if (md->type == Metadata_type_year && md->merge != Metadata_merge_range)
663             {
664                 yaz_log(YLOG_WARN, "Only range merging supported for years");
665                 continue;
666             }
667             if (md->merge == Metadata_merge_unique)
668             {
669                 struct record_metadata *mnode;
670                 for (mnode = *wheretoput; mnode; mnode = mnode->next)
671                     if (!strcmp((const char *) mnode->data.text, newm->data.text))
672                         break;
673                 if (!mnode)
674                 {
675                     newm->next = *wheretoput;
676                     *wheretoput = newm;
677                 }
678             }
679             else if (md->merge == Metadata_merge_longest)
680             {
681                 if (!*wheretoput ||
682                         strlen(newm->data.text) > strlen((*wheretoput)->data.text))
683                 {
684                     *wheretoput = newm;
685                     if (sk)
686                     {
687                         char *s = nmem_strdup(se->nmem, newm->data.text);
688                         if (!cluster->sortkeys[md->sortkey_offset])
689                             cluster->sortkeys[md->sortkey_offset] = 
690                                 nmem_malloc(se->nmem, sizeof(union data_types));
691                         normalize_mergekey(s,
692                                 (sk->type == Metadata_sortkey_skiparticle));
693                         cluster->sortkeys[md->sortkey_offset]->text = s;
694                     }
695                 }
696             }
697             else if (md->merge == Metadata_merge_all || md->merge == Metadata_merge_no)
698             {
699                 newm->next = *wheretoput;
700                 *wheretoput = newm;
701             }
702             else if (md->merge == Metadata_merge_range)
703             {
704                 assert(md->type == Metadata_type_year);
705                 if (!*wheretoput)
706                 {
707                     *wheretoput = newm;
708                     (*wheretoput)->data.number.min = first;
709                     (*wheretoput)->data.number.max = last;
710                     if (sk)
711                         cluster->sortkeys[md->sortkey_offset] = &newm->data;
712                 }
713                 else
714                 {
715                     if (first < (*wheretoput)->data.number.min)
716                         (*wheretoput)->data.number.min = first;
717                     if (last > (*wheretoput)->data.number.max)
718                         (*wheretoput)->data.number.max = last;
719                 }
720 #ifdef GAGA
721                 if (sk)
722                 {
723                     union data_types *sdata = cluster->sortkeys[md->sortkey_offset];
724                     yaz_log(YLOG_LOG, "SK range: %d-%d", sdata->number.min, sdata->number.max);
725                 }
726 #endif
727             }
728             else
729                 yaz_log(YLOG_WARN, "Don't know how to merge on element name %s", md->name);
730
731             if (md->rank)
732                 relevance_countwords(se->relevance, cluster, 
733                                      (char *) value, md->rank);
734             if (md->termlist)
735             {
736                 if (md->type == Metadata_type_year)
737                 {
738                     char year[64];
739                     sprintf(year, "%d", last);
740                     add_facet(se, (char *) type, year);
741                     if (first != last)
742                     {
743                         sprintf(year, "%d", first);
744                         add_facet(se, (char *) type, year);
745                     }
746                 }
747                 else
748                     add_facet(se, (char *) type, (char *) value);
749             }
750             xmlFree(type);
751             xmlFree(value);
752             type = value = 0;
753         }
754         else
755             yaz_log(YLOG_WARN, "Unexpected element %s in internal record", n->name);
756     }
757     if (type)
758         xmlFree(type);
759     if (value)
760         xmlFree(value);
761
762     xmlFreeDoc(xdoc);
763
764     relevance_donerecord(se->relevance, cluster);
765     se->total_records++;
766
767     return res;
768 }
769
770 static void ingest_records(struct client *cl, Z_Records *r)
771 {
772 #if USE_TIMING
773     yaz_timing_t t = yaz_timing_create();
774 #endif
775     struct record *rec;
776     struct session *s = cl->session;
777     Z_NamePlusRecordList *rlist;
778     int i;
779
780     if (r->which != Z_Records_DBOSD)
781         return;
782     rlist = r->u.databaseOrSurDiagnostics;
783     for (i = 0; i < rlist->num_records; i++)
784     {
785         Z_NamePlusRecord *npr = rlist->records[i];
786
787         cl->records++;
788         if (npr->which != Z_NamePlusRecord_databaseRecord)
789         {
790             yaz_log(YLOG_WARN, "Unexpected record type, probably diagnostic");
791             continue;
792         }
793
794         rec = ingest_record(cl, npr->u.databaseRecord);
795         if (!rec)
796             continue;
797     }
798     if (s->watchlist[SESSION_WATCH_RECORDS].fun && rlist->num_records)
799         session_alert_watch(s, SESSION_WATCH_RECORDS);
800
801 #if USE_TIMING
802     yaz_timing_stop(t);
803     yaz_log(YLOG_LOG, "ingest_records %6.5f %3.2f %3.2f", 
804             yaz_timing_get_real(t), yaz_timing_get_user(t),
805             yaz_timing_get_sys(t));
806     yaz_timing_destroy(&t);
807 #endif
808 }
809
810 static void do_presentResponse(IOCHAN i, Z_APDU *a)
811 {
812     struct connection *co = iochan_getdata(i);
813     struct client *cl = co->client;
814     Z_PresentResponse *r = a->u.presentResponse;
815
816     if (r->records) {
817         Z_Records *recs = r->records;
818         if (recs->which == Z_Records_NSD)
819         {
820             yaz_log(YLOG_WARN, "Non-surrogate diagnostic");
821             cl->diagnostic = *recs->u.nonSurrogateDiagnostic->condition;
822             cl->state = Client_Error;
823         }
824     }
825
826     if (!*r->presentStatus && cl->state != Client_Error)
827     {
828         yaz_log(YLOG_DEBUG, "Good Present response");
829         ingest_records(cl, r->records);
830         cl->state = Client_Idle;
831     }
832     else if (*r->presentStatus) 
833     {
834         yaz_log(YLOG_WARN, "Bad Present response");
835         cl->state = Client_Error;
836     }
837 }
838
839 static void handler(IOCHAN i, int event)
840 {
841     struct connection *co = iochan_getdata(i);
842     struct client *cl = co->client;
843     struct session *se = 0;
844
845     if (cl)
846         se = cl->session;
847     else
848     {
849         yaz_log(YLOG_WARN, "Destroying orphan connection");
850         connection_destroy(co);
851         return;
852     }
853
854     if (co->state == Conn_Connecting && event & EVENT_OUTPUT)
855     {
856         int errcode;
857         socklen_t errlen = sizeof(errcode);
858
859         if (getsockopt(cs_fileno(co->link), SOL_SOCKET, SO_ERROR, &errcode,
860             &errlen) < 0 || errcode != 0)
861         {
862             client_fatal(cl);
863             return;
864         }
865         else
866         {
867             yaz_log(YLOG_DEBUG, "Connect OK");
868             co->state = Conn_Open;
869             if (cl)
870                 cl->state = Client_Connected;
871         }
872     }
873
874     else if (event & EVENT_INPUT)
875     {
876         int len = cs_get(co->link, &co->ibuf, &co->ibufsize);
877
878         if (len < 0)
879         {
880             yaz_log(YLOG_WARN|YLOG_ERRNO, "Error reading from Z server");
881             connection_destroy(co);
882             return;
883         }
884         else if (len == 0)
885         {
886             yaz_log(YLOG_WARN, "EOF reading from Z server");
887             connection_destroy(co);
888             return;
889         }
890         else if (len > 1) // We discard input if we have no connection
891         {
892             co->state = Conn_Open;
893
894             if (cl && (cl->requestid == se->requestid || cl->state == Client_Initializing))
895             {
896                 Z_APDU *a;
897
898                 odr_reset(global_parameters.odr_in);
899                 odr_setbuf(global_parameters.odr_in, co->ibuf, len, 0);
900                 if (!z_APDU(global_parameters.odr_in, &a, 0, 0))
901                 {
902                     client_fatal(cl);
903                     return;
904                 }
905                 switch (a->which)
906                 {
907                     case Z_APDU_initResponse:
908                         do_initResponse(i, a);
909                         break;
910                     case Z_APDU_searchResponse:
911                         do_searchResponse(i, a);
912                         break;
913                     case Z_APDU_presentResponse:
914                         do_presentResponse(i, a);
915                         break;
916                     default:
917                         yaz_log(YLOG_WARN, "Unexpected result from server");
918                         client_fatal(cl);
919                         return;
920                 }
921                 // We aren't expecting staggered output from target
922                 // if (cs_more(t->link))
923                 //    iochan_setevent(i, EVENT_INPUT);
924             }
925             else  // we throw away response and go to idle mode
926             {
927                 yaz_log(YLOG_DEBUG, "Ignoring result of expired operation");
928                 cl->state = Client_Idle;
929             }
930         }
931         /* if len==1 we do nothing but wait for more input */
932     }
933
934     if (cl->state == Client_Connected) {
935         send_init(i);
936     }
937
938     if (cl->state == Client_Idle)
939     {
940         if (cl->requestid != se->requestid && *se->query) {
941             send_search(i);
942         }
943         else if (cl->hits > 0 && cl->records < global_parameters.toget &&
944             cl->records < cl->hits) {
945             send_present(i);
946         }
947     }
948 }
949
950 // Disassociate connection from client
951 static void connection_release(struct connection *co)
952 {
953     struct client *cl = co->client;
954
955     yaz_log(YLOG_DEBUG, "Connection release %s", co->host->hostport);
956     if (!cl)
957         return;
958     cl->connection = 0;
959     co->client = 0;
960 }
961
962 // Close connection and recycle structure
963 static void connection_destroy(struct connection *co)
964 {
965     struct host *h = co->host;
966     cs_close(co->link);
967     iochan_destroy(co->iochan);
968
969     yaz_log(YLOG_DEBUG, "Connection destroy %s", co->host->hostport);
970     if (h->connections == co)
971         h->connections = co->next;
972     else
973     {
974         struct connection *pco;
975         for (pco = h->connections; pco && pco->next != co; pco = pco->next)
976             ;
977         if (pco)
978             pco->next = co->next;
979         else
980             abort();
981     }
982     if (co->client)
983     {
984         if (co->client->state != Client_Idle)
985             co->client->state = Client_Disconnected;
986         co->client->connection = 0;
987     }
988     co->next = connection_freelist;
989     connection_freelist = co;
990 }
991
992 // Creates a new connection for client, associated with the host of 
993 // client's database
994 static struct connection *connection_create(struct client *cl)
995 {
996     struct connection *new;
997     COMSTACK link; 
998     int res;
999     void *addr;
1000
1001
1002     if (!(link = cs_create(tcpip_type, 0, PROTO_Z3950)))
1003         {
1004             yaz_log(YLOG_FATAL|YLOG_ERRNO, "Failed to create comstack");
1005             exit(1);
1006         }
1007     
1008     if (0 == strlen(global_parameters.zproxy_override)){
1009         /* no Z39.50 proxy needed - direct connect */
1010         yaz_log(YLOG_DEBUG, "Connection create %s", cl->database->url);
1011         
1012         if (!(addr = cs_straddr(link, cl->database->host->ipport)))
1013             {
1014                 yaz_log(YLOG_WARN|YLOG_ERRNO, 
1015                         "Lookup of IP address %s failed", 
1016                         cl->database->host->ipport);
1017                 return 0;
1018             }
1019     
1020     } else {
1021         /* Z39.50 proxy connect */
1022         yaz_log(YLOG_DEBUG, "Connection create %s proxy %s", 
1023                 cl->database->url, global_parameters.zproxy_override);
1024
1025         yaz_log(YLOG_LOG, "Connection cs_create_host %s proxy %s", 
1026                 cl->database->url, global_parameters.zproxy_override);
1027         
1028         if (!(addr = cs_straddr(link, global_parameters.zproxy_override)))
1029             {
1030                 yaz_log(YLOG_WARN|YLOG_ERRNO, 
1031                         "Lookup of IP address %s failed", 
1032                         global_parameters.zproxy_override);
1033                 return 0;
1034             }
1035     }
1036
1037     res = cs_connect(link, addr);
1038     if (res < 0)
1039     {
1040         yaz_log(YLOG_WARN|YLOG_ERRNO, "cs_connect %s", cl->database->url);
1041         return 0;
1042     }
1043
1044     if ((new = connection_freelist))
1045         connection_freelist = new->next;
1046     else
1047     {
1048         new = xmalloc(sizeof (struct connection));
1049         new->ibuf = 0;
1050         new->ibufsize = 0;
1051     }
1052     new->state = Conn_Connecting;
1053     new->host = cl->database->host;
1054     new->next = new->host->connections;
1055     new->host->connections = new;
1056     new->client = cl;
1057     cl->connection = new;
1058     new->link = link;
1059
1060     new->iochan = iochan_create(cs_fileno(link), 0, handler, 0);
1061     iochan_setdata(new->iochan, new);
1062     new->iochan->next = channel_list;
1063     channel_list = new->iochan;
1064     return new;
1065 }
1066
1067 // Close connection and set state to error
1068 static void client_fatal(struct client *cl)
1069 {
1070     yaz_log(YLOG_WARN, "Fatal error from %s", cl->database->url);
1071     connection_destroy(cl->connection);
1072     cl->state = Client_Error;
1073 }
1074
1075 // Ensure that client has a connection associated
1076 static int client_prep_connection(struct client *cl)
1077 {
1078     struct connection *co;
1079     struct session *se = cl->session;
1080     struct host *host = cl->database->host;
1081
1082     co = cl->connection;
1083
1084     yaz_log(YLOG_DEBUG, "Client prep %s", cl->database->url);
1085
1086     if (!co)
1087     {
1088         // See if someone else has an idle connection
1089         // We should look at timestamps here to select the longest-idle connection
1090         for (co = host->connections; co; co = co->next)
1091             if (co->state == Conn_Open && (!co->client || co->client->session != se))
1092                 break;
1093         if (co)
1094         {
1095             connection_release(co);
1096             cl->connection = co;
1097             co->client = cl;
1098         }
1099         else
1100             co = connection_create(cl);
1101     }
1102     if (co)
1103     {
1104         if (co->state == Conn_Connecting)
1105         {
1106             cl->state = Client_Connecting;
1107             iochan_setflag(co->iochan, EVENT_OUTPUT);
1108         }
1109         else if (co->state == Conn_Open)
1110         {
1111             if (cl->state == Client_Error || cl->state == Client_Disconnected)
1112                 cl->state = Client_Idle;
1113             iochan_setflag(co->iochan, EVENT_OUTPUT);
1114         }
1115         return 1;
1116     }
1117     else
1118         return 0;
1119 }
1120
1121 #ifdef GAGA // Moved to database.c
1122
1123 // This function will most likely vanish when a proper target profile mechanism is
1124 // introduced.
1125 void load_simpletargets(const char *fn)
1126 {
1127     FILE *f = fopen(fn, "r");
1128     char line[256];
1129
1130     if (!f)
1131     {
1132         yaz_log(YLOG_WARN|YLOG_ERRNO, "open %s", fn);
1133         exit(1);
1134     }
1135
1136     while (fgets(line, 255, f))
1137     {
1138         char *url, *db;
1139         char *name;
1140         struct host *host;
1141         struct database *database;
1142
1143         if (strncmp(line, "target ", 7))
1144             continue;
1145         line[strlen(line) - 1] = '\0';
1146
1147         if ((name = strchr(line, ';')))
1148             *(name++) = '\0';
1149
1150         url = line + 7;
1151         if ((db = strchr(url, '/')))
1152             *(db++) = '\0';
1153         else
1154             db = "Default";
1155
1156         yaz_log(YLOG_LOG, "Target: %s, '%s'", url, db);
1157         for (host = hosts; host; host = host->next)
1158             if (!strcmp((const char *) url, host->hostport))
1159                 break;
1160         if (!host)
1161         {
1162             struct addrinfo *addrinfo, hints;
1163             char *port;
1164             char ipport[128];
1165             unsigned char addrbuf[4];
1166             int res;
1167
1168             host = xmalloc(sizeof(struct host));
1169             host->hostport = xstrdup(url);
1170             host->connections = 0;
1171
1172             if ((port = strchr(url, ':')))
1173                 *(port++) = '\0';
1174             else
1175                 port = "210";
1176
1177             hints.ai_flags = 0;
1178             hints.ai_family = PF_INET;
1179             hints.ai_socktype = SOCK_STREAM;
1180             hints.ai_protocol = IPPROTO_TCP;
1181             hints.ai_addrlen = 0;
1182             hints.ai_addr = 0;
1183             hints.ai_canonname = 0;
1184             hints.ai_next = 0;
1185             // This is not robust code. It assumes that getaddrinfo returns AF_INET
1186             // address.
1187             if ((res = getaddrinfo(url, port, &hints, &addrinfo)))
1188             {
1189                 yaz_log(YLOG_WARN, "Failed to resolve %s: %s", url, gai_strerror(res));
1190                 xfree(host->hostport);
1191                 xfree(host);
1192                 continue;
1193             }
1194             assert(addrinfo->ai_family == PF_INET);
1195             memcpy(addrbuf, &((struct sockaddr_in*)addrinfo->ai_addr)->sin_addr.s_addr, 4);
1196             sprintf(ipport, "%u.%u.%u.%u:%s",
1197                     addrbuf[0], addrbuf[1], addrbuf[2], addrbuf[3], port);
1198             host->ipport = xstrdup(ipport);
1199             freeaddrinfo(addrinfo);
1200             host->next = hosts;
1201             hosts = host;
1202         }
1203         database = xmalloc(sizeof(struct database));
1204         database->host = host;
1205         database->url = xmalloc(strlen(url) + strlen(db) + 2);
1206         strcpy(database->url, url);
1207         strcat(database->url, "/");
1208         strcat(database->url, db);
1209         if (name)
1210             database->name = xstrdup(name);
1211         else
1212             database->name = 0;
1213         
1214         database->databases = xmalloc(2 * sizeof(char *));
1215         database->databases[0] = xstrdup(db);
1216         database->databases[1] = 0;
1217         database->errors = 0;
1218         database->qprofile = 0;
1219         database->rprofile = database_retrieval_profile(database);
1220         database->next = databases;
1221         databases = database;
1222
1223     }
1224     fclose(f);
1225 }
1226
1227 #endif
1228
1229 static struct client *client_create(void)
1230 {
1231     struct client *r;
1232     if (client_freelist)
1233     {
1234         r = client_freelist;
1235         client_freelist = client_freelist->next;
1236     }
1237     else
1238         r = xmalloc(sizeof(struct client));
1239     r->database = 0;
1240     r->connection = 0;
1241     r->session = 0;
1242     r->hits = 0;
1243     r->records = 0;
1244     r->setno = 0;
1245     r->requestid = -1;
1246     r->diagnostic = 0;
1247     r->state = Client_Disconnected;
1248     r->next = 0;
1249     return r;
1250 }
1251
1252 void client_destroy(struct client *c)
1253 {
1254     struct session *se = c->session;
1255     if (c == se->clients)
1256         se->clients = c->next;
1257     else
1258     {
1259         struct client *cc;
1260         for (cc = se->clients; cc && cc->next != c; cc = cc->next)
1261             ;
1262         if (cc)
1263             cc->next = c->next;
1264     }
1265     if (c->connection)
1266         connection_release(c->connection);
1267     c->next = client_freelist;
1268     client_freelist = c;
1269 }
1270
1271 void session_set_watch(struct session *s, int what, session_watchfun fun, void *data)
1272 {
1273     s->watchlist[what].fun = fun;
1274     s->watchlist[what].data = data;
1275 }
1276
1277 void session_alert_watch(struct session *s, int what)
1278 {
1279     if (!s->watchlist[what].fun)
1280         return;
1281     (*s->watchlist[what].fun)(s->watchlist[what].data);
1282     s->watchlist[what].fun = 0;
1283     s->watchlist[what].data = 0;
1284 }
1285
1286 //callback for grep_databases
1287 static void select_targets_callback(void *context, struct database *db)
1288 {
1289     struct session *se = (struct session*) context;
1290     struct client *cl = client_create();
1291     cl->database = db;
1292     cl->session = se;
1293     cl->next = se->clients;
1294     se->clients = cl;
1295 }
1296
1297 // This should be extended with parameters to control selection criteria
1298 // Associates a set of clients with a session;
1299 int select_targets(struct session *se, struct database_criterion *crit)
1300 {
1301     while (se->clients)
1302         client_destroy(se->clients);
1303
1304     return grep_databases(se, crit, select_targets_callback);
1305 }
1306
1307 int session_active_clients(struct session *s)
1308 {
1309     struct client *c;
1310     int res = 0;
1311
1312     for (c = s->clients; c; c = c->next)
1313         if (c->connection && (c->state == Client_Connecting ||
1314                     c->state == Client_Initializing ||
1315                     c->state == Client_Searching ||
1316                     c->state == Client_Presenting))
1317             res++;
1318
1319     return res;
1320 }
1321
1322 // parses crit1=val1,crit2=val2|val3,...
1323 static struct database_criterion *parse_filter(NMEM m, const char *buf)
1324 {
1325     struct database_criterion *res = 0;
1326     char **values;
1327     int num;
1328     int i;
1329
1330     if (!buf || !*buf)
1331         return 0;
1332     nmem_strsplit(m, ",", buf,  &values, &num);
1333     for (i = 0; i < num; i++)
1334     {
1335         char **subvalues;
1336         int subnum;
1337         int subi;
1338         struct database_criterion *new = nmem_malloc(m, sizeof(*new));
1339         char *eq = strchr(values[i], '=');
1340         if (!eq)
1341         {
1342             yaz_log(YLOG_WARN, "Missing equal-sign in filter");
1343             return 0;
1344         }
1345         *(eq++) = '\0';
1346         new->name = values[i];
1347         nmem_strsplit(m, "|", eq, &subvalues, &subnum);
1348         new->values = 0;
1349         for (subi = 0; subi < subnum; subi++)
1350         {
1351             struct database_criterion_value *newv = nmem_malloc(m, sizeof(*newv));
1352             newv->value = subvalues[subi];
1353             newv->next = new->values;
1354             new->values = newv;
1355         }
1356         new->next = res;
1357         res = new;
1358     }
1359     return res;
1360 }
1361
1362 char *search(struct session *se, char *query, char *filter)
1363 {
1364     int live_channels = 0;
1365     struct client *cl;
1366     struct database_criterion *criteria;
1367
1368     yaz_log(YLOG_DEBUG, "Search");
1369
1370     nmem_reset(se->nmem);
1371     criteria = parse_filter(se->nmem, filter);
1372     strcpy(se->query, query);
1373     se->requestid++;
1374     // Release any existing clients
1375     select_targets(se, criteria);
1376     for (cl = se->clients; cl; cl = cl->next)
1377     {
1378         if (client_prep_connection(cl))
1379             live_channels++;
1380     }
1381     if (live_channels)
1382     {
1383         int maxrecs = live_channels * global_parameters.toget;
1384         se->num_termlists = 0;
1385         se->reclist = reclist_create(se->nmem, maxrecs);
1386         // This will be initialized in send_search()
1387         se->relevance = 0;
1388         se->total_records = se->total_hits = se->total_merged = 0;
1389         se->expected_maxrecs = maxrecs;
1390     }
1391     else
1392         return "NOTARGETS";
1393
1394     return 0;
1395 }
1396
1397 void destroy_session(struct session *s)
1398 {
1399     yaz_log(YLOG_LOG, "Destroying session");
1400     while (s->clients)
1401         client_destroy(s->clients);
1402     nmem_destroy(s->nmem);
1403     wrbuf_destroy(s->wrbuf);
1404 }
1405
1406 struct session *new_session() 
1407 {
1408     int i;
1409     struct session *session = xmalloc(sizeof(*session));
1410
1411     yaz_log(YLOG_DEBUG, "New pazpar2 session");
1412     
1413     session->total_hits = 0;
1414     session->total_records = 0;
1415     session->num_termlists = 0;
1416     session->reclist = 0;
1417     session->requestid = -1;
1418     session->clients = 0;
1419     session->expected_maxrecs = 0;
1420     session->query[0] = '\0';
1421     session->nmem = nmem_create();
1422     session->wrbuf = wrbuf_alloc();
1423     for (i = 0; i <= SESSION_WATCH_MAX; i++)
1424     {
1425         session->watchlist[i].data = 0;
1426         session->watchlist[i].fun = 0;
1427     }
1428
1429     return session;
1430 }
1431
1432 struct hitsbytarget *hitsbytarget(struct session *se, int *count)
1433 {
1434     static struct hitsbytarget res[1000]; // FIXME MM
1435     struct client *cl;
1436
1437     *count = 0;
1438     for (cl = se->clients; cl; cl = cl->next)
1439     {
1440         res[*count].id = cl->database->url;
1441         res[*count].name = cl->database->name;
1442         res[*count].hits = cl->hits;
1443         res[*count].records = cl->records;
1444         res[*count].diagnostic = cl->diagnostic;
1445         res[*count].state = client_states[cl->state];
1446         res[*count].connected  = cl->connection ? 1 : 0;
1447         (*count)++;
1448     }
1449
1450     return res;
1451 }
1452
1453 struct termlist_score **termlist(struct session *s, const char *name, int *num)
1454 {
1455     int i;
1456
1457     for (i = 0; i < s->num_termlists; i++)
1458         if (!strcmp((const char *) s->termlists[i].name, name))
1459             return termlist_highscore(s->termlists[i].termlist, num);
1460     return 0;
1461 }
1462
1463 #ifdef MISSING_HEADERS
1464 void report_nmem_stats(void)
1465 {
1466     size_t in_use, is_free;
1467
1468     nmem_get_memory_in_use(&in_use);
1469     nmem_get_memory_free(&is_free);
1470
1471     yaz_log(YLOG_LOG, "nmem stat: use=%ld free=%ld", 
1472             (long) in_use, (long) is_free);
1473 }
1474 #endif
1475
1476 struct record_cluster *show_single(struct session *s, int id)
1477 {
1478     struct record_cluster *r;
1479
1480     reclist_rewind(s->reclist);
1481     while ((r = reclist_read_record(s->reclist)))
1482         if (r->recid == id)
1483             return r;
1484     return 0;
1485 }
1486
1487 struct record_cluster **show(struct session *s, struct reclist_sortparms *sp, int start,
1488         int *num, int *total, int *sumhits, NMEM nmem_show)
1489 {
1490     struct record_cluster **recs = nmem_malloc(nmem_show, *num 
1491                                        * sizeof(struct record_cluster *));
1492     struct reclist_sortparms *spp;
1493     int i;
1494 #if USE_TIMING    
1495     yaz_timing_t t = yaz_timing_create();
1496 #endif
1497
1498     for (spp = sp; spp; spp = spp->next)
1499         if (spp->type == Metadata_sortkey_relevance)
1500         {
1501             relevance_prepare_read(s->relevance, s->reclist);
1502             break;
1503         }
1504     reclist_sort(s->reclist, sp);
1505
1506     *total = s->reclist->num_records;
1507     *sumhits = s->total_hits;
1508
1509     for (i = 0; i < start; i++)
1510         if (!reclist_read_record(s->reclist))
1511         {
1512             *num = 0;
1513             recs = 0;
1514             break;
1515         }
1516
1517     for (i = 0; i < *num; i++)
1518     {
1519         struct record_cluster *r = reclist_read_record(s->reclist);
1520         if (!r)
1521         {
1522             *num = i;
1523             break;
1524         }
1525         recs[i] = r;
1526     }
1527 #if USE_TIMING
1528     yaz_timing_stop(t);
1529     yaz_log(YLOG_LOG, "show %6.5f %3.2f %3.2f", 
1530             yaz_timing_get_real(t), yaz_timing_get_user(t),
1531             yaz_timing_get_sys(t));
1532     yaz_timing_destroy(&t);
1533 #endif
1534     return recs;
1535 }
1536
1537 void statistics(struct session *se, struct statistics *stat)
1538 {
1539     struct client *cl;
1540     int count = 0;
1541
1542     memset(stat, 0, sizeof(*stat));
1543     for (cl = se->clients; cl; cl = cl->next)
1544     {
1545         if (!cl->connection)
1546             stat->num_no_connection++;
1547         switch (cl->state)
1548         {
1549             case Client_Connecting: stat->num_connecting++; break;
1550             case Client_Initializing: stat->num_initializing++; break;
1551             case Client_Searching: stat->num_searching++; break;
1552             case Client_Presenting: stat->num_presenting++; break;
1553             case Client_Idle: stat->num_idle++; break;
1554             case Client_Failed: stat->num_failed++; break;
1555             case Client_Error: stat->num_error++; break;
1556             default: break;
1557         }
1558         count++;
1559     }
1560     stat->num_hits = se->total_hits;
1561     stat->num_records = se->total_records;
1562
1563     stat->num_clients = count;
1564 }
1565
1566 static void start_http_listener(void)
1567 {
1568     char hp[128] = "";
1569     struct conf_server *ser = global_parameters.server;
1570
1571     if (*global_parameters.listener_override)
1572         strcpy(hp, global_parameters.listener_override);
1573     else
1574     {
1575         strcpy(hp, ser->host ? ser->host : "");
1576         if (ser->port)
1577         {
1578             if (*hp)
1579                 strcat(hp, ":");
1580             sprintf(hp + strlen(hp), "%d", ser->port);
1581         }
1582     }
1583     http_init(hp);
1584 }
1585
1586 // Initialize CCL map for a target
1587 // Note: This approach ignores user-specific CCL maps, for which I
1588 // don't presently see any application.
1589 static void prepare_cclmap(void *context, struct database *db)
1590 {
1591     struct setting *s;
1592
1593     if (!db->settings)
1594         return;
1595     db->ccl_map = ccl_qual_mk();
1596     for (s = db->settings[PZ_CCLMAP]; s; s = s->next)
1597         if (!*s->user)
1598         {
1599             char *p = strchr(s->name + 3, ':');
1600             if (!p)
1601             {
1602                 yaz_log(YLOG_FATAL, "Malformed cclmap name: %s", s->name);
1603                 exit(1);
1604             }
1605             p++;
1606             ccl_qual_fitem(db->ccl_map, s->value, p);
1607         }
1608 }
1609
1610 // Read settings for each database, and prepare a CCL map for that database
1611 static void prepare_cclmaps(void)
1612 {
1613     grep_databases(0, 0, prepare_cclmap);
1614 }
1615
1616 static void start_proxy(void)
1617 {
1618     char hp[128] = "";
1619     struct conf_server *ser = global_parameters.server;
1620
1621     if (*global_parameters.proxy_override)
1622         strcpy(hp, global_parameters.proxy_override);
1623     else if (ser->proxy_host || ser->proxy_port)
1624     {
1625         strcpy(hp, ser->proxy_host ? ser->proxy_host : "");
1626         if (ser->proxy_port)
1627         {
1628             if (*hp)
1629                 strcat(hp, ":");
1630             sprintf(hp + strlen(hp), "%d", ser->proxy_port);
1631         }
1632     }
1633     else
1634         return;
1635
1636     http_set_proxyaddr(hp, ser->myurl ? ser->myurl : "");
1637 }
1638
1639 static void start_zproxy(void)
1640 {
1641     struct conf_server *ser = global_parameters.server;
1642
1643     if (*global_parameters.zproxy_override){
1644         yaz_log(YLOG_LOG, "Z39.50 proxy  %s", 
1645                 global_parameters.zproxy_override);
1646         return;
1647     }
1648
1649     else if (ser->zproxy_host || ser->zproxy_port)
1650     {
1651         char hp[128] = "";
1652
1653         strcpy(hp, ser->zproxy_host ? ser->zproxy_host : "");
1654         if (ser->zproxy_port)
1655         {
1656             if (*hp)
1657                 strcat(hp, ":");
1658             else
1659                 strcat(hp, "@:");
1660
1661             sprintf(hp + strlen(hp), "%d", ser->zproxy_port);
1662         }
1663         strcpy(global_parameters.zproxy_override, hp);
1664         yaz_log(YLOG_LOG, "Z39.50 proxy %s", 
1665                 global_parameters.zproxy_override);
1666
1667     }
1668     else
1669         return;
1670 }
1671
1672
1673
1674 int main(int argc, char **argv)
1675 {
1676     int ret;
1677     char *arg;
1678
1679     if (signal(SIGPIPE, SIG_IGN) == SIG_ERR)
1680         yaz_log(YLOG_WARN|YLOG_ERRNO, "signal");
1681
1682     yaz_log_init(YLOG_DEFAULT_LEVEL, "pazpar2", 0);
1683
1684     while ((ret = options("t:f:x:h:p:z:s:d", argv, argc, &arg)) != -2)
1685     {
1686         switch (ret) {
1687             case 'f':
1688                 if (!read_config(arg))
1689                     exit(1);
1690                 break;
1691             case 'h':
1692                 strcpy(global_parameters.listener_override, arg);
1693                 break;
1694             case 'p':
1695                 strcpy(global_parameters.proxy_override, arg);
1696                 break;
1697             case 'z':
1698                 strcpy(global_parameters.zproxy_override, arg);
1699                 break;
1700             case 't':
1701                 strcpy(global_parameters.settings_path_override, arg);
1702                 break;
1703             case 's':
1704                 load_simpletargets(arg);
1705                 break;
1706             case 'd':
1707                 global_parameters.dump_records = 1;
1708                 break;
1709             default:
1710                 fprintf(stderr, "Usage: pazpar2\n"
1711                         "    -f configfile\n"
1712                         "    -h [host:]port          (REST protocol listener)\n"
1713                         "    -C cclconfig\n"
1714                         "    -s simpletargetfile\n"
1715                         "    -p hostname[:portno]    (HTTP proxy)\n"
1716                         "    -z hostname[:portno]    (Z39.50 proxy)\n"
1717                         "    -d                      (show internal records)\n");
1718                 exit(1);
1719         }
1720     }
1721
1722     if (!config)
1723     {
1724         yaz_log(YLOG_FATAL, "Load config with -f");
1725         exit(1);
1726     }
1727     global_parameters.server = config->servers;
1728
1729     start_http_listener();
1730     start_proxy();
1731     start_zproxy();
1732
1733     if (*global_parameters.settings_path_override)
1734         settings_read(global_parameters.settings_path_override);
1735     else if (global_parameters.server->settings)
1736         settings_read(global_parameters.server->settings);
1737     else
1738         yaz_log(YLOG_WARN, "No settings-directory specified. Problems may ensue!");
1739     prepare_cclmaps();
1740     global_parameters.yaz_marc = yaz_marc_create();
1741     yaz_marc_subfield_str(global_parameters.yaz_marc, "\t");
1742     global_parameters.odr_in = odr_createmem(ODR_DECODE);
1743     global_parameters.odr_out = odr_createmem(ODR_ENCODE);
1744
1745     event_loop(&channel_list);
1746
1747     return 0;
1748 }
1749
1750 /*
1751  * Local variables:
1752  * c-basic-offset: 4
1753  * indent-tabs-mode: nil
1754  * End:
1755  * vim: shiftwidth=4 tabstop=8 expandtab
1756  */