Moved CCL Map, record syntax, charset normalization, record syntax normalization...
[pazpar2-moved-to-github.git] / src / pazpar2.c
1 /* $Id: pazpar2.c,v 1.64 2007-04-08 20:52:09 quinn Exp $ */
2
3 #include <stdlib.h>
4 #include <stdio.h>
5 #include <string.h>
6 #include <sys/time.h>
7 #include <unistd.h>
8 #include <sys/socket.h>
9 #include <netdb.h>
10 #include <signal.h>
11 #include <ctype.h>
12 #include <assert.h>
13
14 #include <yaz/marcdisp.h>
15 #include <yaz/comstack.h>
16 #include <yaz/tcpip.h>
17 #include <yaz/proto.h>
18 #include <yaz/readconf.h>
19 #include <yaz/pquery.h>
20 #include <yaz/otherinfo.h>
21 #include <yaz/yaz-util.h>
22 #include <yaz/nmem.h>
23
24 #if HAVE_CONFIG_H
25 #include "cconfig.h"
26 #endif
27
28 #define USE_TIMING 0
29 #if USE_TIMING
30 #include <yaz/timing.h>
31 #endif
32
33 #include <netinet/in.h>
34
35 #include "pazpar2.h"
36 #include "eventl.h"
37 #include "http.h"
38 #include "termlists.h"
39 #include "reclists.h"
40 #include "relevance.h"
41 #include "config.h"
42 #include "database.h"
43 #include "settings.h"
44
45 #define MAX_CHUNK 15
46
47 static void client_fatal(struct client *cl);
48 static void connection_destroy(struct connection *co);
49 static int client_prep_connection(struct client *cl);
50 static void ingest_records(struct client *cl, Z_Records *r);
51 //static struct conf_retrievalprofile *database_retrieval_profile(struct database *db);
52 void session_alert_watch(struct session *s, int what);
53 char *session_setting_oneval(struct session *s, struct database *db, int offset);
54
55 IOCHAN channel_list = 0;  // Master list of connections we're handling events to
56
57 static struct connection *connection_freelist = 0;
58 static struct client *client_freelist = 0;
59
60 static char *client_states[] = {
61     "Client_Connecting",
62     "Client_Connected",
63     "Client_Idle",
64     "Client_Initializing",
65     "Client_Searching",
66     "Client_Presenting",
67     "Client_Error",
68     "Client_Failed",
69     "Client_Disconnected",
70     "Client_Stopped"
71 };
72
73 // Note: Some things in this structure will eventually move to configuration
74 struct parameters global_parameters = 
75 {
76     "",
77     "",
78     "",
79     "",
80     0,
81     0,
82     30,
83     "81",
84     "Index Data PazPar2 (MasterKey)",
85     VERSION,
86     600, // 10 minutes
87     60,
88     100,
89     MAX_CHUNK,
90     0,
91     0
92 };
93
94 static int send_apdu(struct client *c, Z_APDU *a)
95 {
96     struct connection *co = c->connection;
97     char *buf;
98     int len, r;
99
100     if (!z_APDU(global_parameters.odr_out, &a, 0, 0))
101     {
102         odr_perror(global_parameters.odr_out, "Encoding APDU");
103         abort();
104     }
105     buf = odr_getbuf(global_parameters.odr_out, &len, 0);
106     r = cs_put(co->link, buf, len);
107     if (r < 0)
108     {
109         yaz_log(YLOG_WARN, "cs_put: %s", cs_errmsg(cs_errno(co->link)));
110         return -1;
111     }
112     else if (r == 1)
113     {
114         fprintf(stderr, "cs_put incomplete (ParaZ does not handle that)\n");
115         exit(1);
116     }
117     odr_reset(global_parameters.odr_out); /* release the APDU structure  */
118     co->state = Conn_Waiting;
119     return 0;
120 }
121
122
123 static void send_init(IOCHAN i)
124 {
125
126     struct connection *co = iochan_getdata(i);
127     struct client *cl = co->client;
128     Z_APDU *a = zget_APDU(global_parameters.odr_out, Z_APDU_initRequest);
129
130     a->u.initRequest->implementationId = global_parameters.implementationId;
131     a->u.initRequest->implementationName = global_parameters.implementationName;
132     a->u.initRequest->implementationVersion =
133         global_parameters.implementationVersion;
134     ODR_MASK_SET(a->u.initRequest->options, Z_Options_search);
135     ODR_MASK_SET(a->u.initRequest->options, Z_Options_present);
136     ODR_MASK_SET(a->u.initRequest->options, Z_Options_namedResultSets);
137
138     ODR_MASK_SET(a->u.initRequest->protocolVersion, Z_ProtocolVersion_1);
139     ODR_MASK_SET(a->u.initRequest->protocolVersion, Z_ProtocolVersion_2);
140     ODR_MASK_SET(a->u.initRequest->protocolVersion, Z_ProtocolVersion_3);
141
142
143     /* add virtual host if tunneling through Z39.50 proxy */
144     
145     if (0 < strlen(global_parameters.zproxy_override) 
146         && 0 < strlen(cl->database->url))
147         yaz_oi_set_string_oidval(&a->u.initRequest->otherInfo, 
148                                  global_parameters.odr_out, VAL_PROXY,
149                                  1, cl->database->url);
150     
151
152
153     if (send_apdu(cl, a) >= 0)
154     {
155         iochan_setflags(i, EVENT_INPUT);
156         cl->state = Client_Initializing;
157     }
158     else
159         cl->state = Client_Error;
160     odr_reset(global_parameters.odr_out);
161 }
162
163 static void pull_terms(NMEM nmem, struct ccl_rpn_node *n, char **termlist, int *num)
164 {
165     switch (n->kind)
166     {
167         case CCL_RPN_AND:
168         case CCL_RPN_OR:
169         case CCL_RPN_NOT:
170         case CCL_RPN_PROX:
171             pull_terms(nmem, n->u.p[0], termlist, num);
172             pull_terms(nmem, n->u.p[1], termlist, num);
173             break;
174         case CCL_RPN_TERM:
175             termlist[(*num)++] = nmem_strdup(nmem, n->u.t.term);
176             break;
177         default: // NOOP
178             break;
179     }
180 }
181
182 // Extract terms from query into null-terminated termlist
183 static void extract_terms(NMEM nmem, struct ccl_rpn_node *query, char **termlist)
184 {
185     int num = 0;
186
187     pull_terms(nmem, query, termlist, &num);
188     termlist[num] = 0;
189 }
190
191 static void send_search(IOCHAN i)
192 {
193     struct connection *co = iochan_getdata(i);
194     struct client *cl = co->client; 
195     struct session *se = cl->session;
196     struct database *db = cl->database;
197     Z_APDU *a = zget_APDU(global_parameters.odr_out, Z_APDU_searchRequest);
198     int ndb, cerror, cpos;
199     char **databaselist;
200     Z_Query *zquery;
201     struct ccl_rpn_node *cn;
202     int ssub = 0, lslb = 100000, mspn = 10;
203     char *recsyn;
204     char *piggyback;
205
206     yaz_log(YLOG_DEBUG, "Sending search");
207
208     cn = ccl_find_str(db->ccl_map, se->query, &cerror, &cpos);
209     if (!cn)
210         return;
211
212     if (!se->relevance)
213     {
214         // Initialize relevance structure with query terms
215         char *p[512];
216         extract_terms(se->nmem, cn, p);
217         se->relevance = relevance_create(se->nmem, (const char **) p,
218                 se->expected_maxrecs);
219     }
220
221     a->u.searchRequest->query = zquery = odr_malloc(global_parameters.odr_out,
222             sizeof(Z_Query));
223     zquery->which = Z_Query_type_1;
224     zquery->u.type_1 = ccl_rpn_query(global_parameters.odr_out, cn);
225     ccl_rpn_delete(cn);
226
227     for (ndb = 0; db->databases[ndb]; ndb++)
228         ;
229     databaselist = odr_malloc(global_parameters.odr_out, sizeof(char*) * ndb);
230     for (ndb = 0; db->databases[ndb]; ndb++)
231         databaselist[ndb] = db->databases[ndb];
232
233     if (!(piggyback = session_setting_oneval(se, db, PZ_PIGGYBACK)) || *piggyback == '1')
234     {
235         if ((recsyn = session_setting_oneval(se, db, PZ_NATIVESYNTAX)))
236             a->u.searchRequest->preferredRecordSyntax =
237                     yaz_str_to_z3950oid(global_parameters.odr_out,
238                     CLASS_RECSYN, recsyn);
239         a->u.searchRequest->smallSetUpperBound = &ssub;
240         a->u.searchRequest->largeSetLowerBound = &lslb;
241         a->u.searchRequest->mediumSetPresentNumber = &mspn;
242     }
243     a->u.searchRequest->resultSetName = "Default";
244     a->u.searchRequest->databaseNames = databaselist;
245     a->u.searchRequest->num_databaseNames = ndb;
246
247     if (send_apdu(cl, a) >= 0)
248     {
249         iochan_setflags(i, EVENT_INPUT);
250         cl->state = Client_Searching;
251         cl->requestid = se->requestid;
252     }
253     else
254         cl->state = Client_Error;
255
256     odr_reset(global_parameters.odr_out);
257 }
258
259 static void send_present(IOCHAN i)
260 {
261     struct connection *co = iochan_getdata(i);
262     struct client *cl = co->client; 
263     struct session *se = cl->session;
264     struct database *db = cl->database;
265     Z_APDU *a = zget_APDU(global_parameters.odr_out, Z_APDU_presentRequest);
266     int toget;
267     int start = cl->records + 1;
268     char *recsyn;
269
270     toget = global_parameters.chunk;
271     if (toget > global_parameters.toget - cl->records)
272         toget = global_parameters.toget - cl->records;
273     if (toget > cl->hits - cl->records)
274         toget = cl->hits - cl->records;
275
276     yaz_log(YLOG_DEBUG, "Trying to present %d records\n", toget);
277
278     a->u.presentRequest->resultSetStartPoint = &start;
279     a->u.presentRequest->numberOfRecordsRequested = &toget;
280
281     a->u.presentRequest->resultSetId = "Default";
282
283     if ((recsyn = session_setting_oneval(se, db, PZ_NATIVESYNTAX)))
284         a->u.presentRequest->preferredRecordSyntax =
285                 yaz_str_to_z3950oid(global_parameters.odr_out,
286                 CLASS_RECSYN, recsyn);
287
288     if (send_apdu(cl, a) >= 0)
289     {
290         iochan_setflags(i, EVENT_INPUT);
291         cl->state = Client_Presenting;
292     }
293     else
294         cl->state = Client_Error;
295     odr_reset(global_parameters.odr_out);
296 }
297
298 static void do_initResponse(IOCHAN i, Z_APDU *a)
299 {
300     struct connection *co = iochan_getdata(i);
301     struct client *cl = co->client;
302     Z_InitResponse *r = a->u.initResponse;
303
304     yaz_log(YLOG_DEBUG, "Init response %s", cl->database->url);
305
306     if (*r->result)
307     {
308         cl->state = Client_Idle;
309     }
310     else
311         cl->state = Client_Failed; // FIXME need to do something to the connection
312 }
313
314 static void do_searchResponse(IOCHAN i, Z_APDU *a)
315 {
316     struct connection *co = iochan_getdata(i);
317     struct client *cl = co->client;
318     struct session *se = cl->session;
319     Z_SearchResponse *r = a->u.searchResponse;
320
321     yaz_log(YLOG_DEBUG, "Search response %s (status=%d)", 
322             cl->database->url, *r->searchStatus);
323
324     if (*r->searchStatus)
325     {
326         cl->hits = *r->resultCount;
327         se->total_hits += cl->hits;
328         if (r->presentStatus && !*r->presentStatus && r->records)
329         {
330             yaz_log(YLOG_DEBUG, "Records in search response %s", 
331                     cl->database->url);
332             ingest_records(cl, r->records);
333         }
334         cl->state = Client_Idle;
335     }
336     else
337     {          /*"FAILED"*/
338         cl->hits = 0;
339         cl->state = Client_Error;
340         if (r->records) {
341             Z_Records *recs = r->records;
342             if (recs->which == Z_Records_NSD)
343             {
344                 yaz_log(YLOG_WARN, 
345                         "Search response: Non-surrogate diagnostic %s",
346                         cl->database->url);
347                 cl->diagnostic = *recs->u.nonSurrogateDiagnostic->condition;
348                 cl->state = Client_Error;
349             }
350         }
351     }
352 }
353
354 static void do_closeResponse(IOCHAN i, Z_APDU *a)
355 {
356     struct connection *co = iochan_getdata(i);
357     struct client *cl = co->client;
358     /* Z_Close *r = a->u.close; */
359
360     yaz_log(YLOG_WARN, "Close response %s", cl->database->url);
361
362     cl->state = Client_Failed;
363     connection_destroy(co);
364 }
365
366
367 char *normalize_mergekey(char *buf, int skiparticle)
368 {
369     char *p = buf, *pout = buf;
370
371     if (skiparticle)
372     {
373         char firstword[64];
374         char articles[] = "the den der die des an a "; // must end in space
375
376         while (*p && !isalnum(*p))
377             p++;
378         pout = firstword;
379         while (*p && *p != ' ' && pout - firstword < 62)
380             *(pout++) = tolower(*(p++));
381         *(pout++) = ' ';
382         *(pout++) = '\0';
383         if (!strstr(articles, firstword))
384             p = buf;
385         pout = buf;
386     }
387
388     while (*p)
389     {
390         while (*p && !isalnum(*p))
391             p++;
392         while (isalnum(*p))
393             *(pout++) = tolower(*(p++));
394         if (*p)
395             *(pout++) = ' ';
396         while (*p && !isalnum(*p))
397             p++;
398     }
399     if (buf != pout)
400         do {
401             *(pout--) = '\0';
402         }
403         while (pout > buf && *pout == ' ');
404
405     return buf;
406 }
407
408 static void add_facet(struct session *s, const char *type, const char *value)
409 {
410     int i;
411
412     if (!*value)
413         return;
414     for (i = 0; i < s->num_termlists; i++)
415         if (!strcmp(s->termlists[i].name, type))
416             break;
417     if (i == s->num_termlists)
418     {
419         if (i == SESSION_MAX_TERMLISTS)
420         {
421             yaz_log(YLOG_FATAL, "Too many termlists");
422             exit(1);
423         }
424         s->termlists[i].name = nmem_strdup(s->nmem, type);
425         s->termlists[i].termlist = termlist_create(s->nmem, s->expected_maxrecs, 15);
426         s->num_termlists = i + 1;
427     }
428     termlist_insert(s->termlists[i].termlist, value);
429 }
430
431 static xmlDoc *normalize_record(struct client *cl, Z_External *rec)
432 {
433     struct database_retrievalmap *m;
434     struct database *db = cl->database;
435     xmlNode *res;
436     xmlDoc *rdoc;
437
438     // First normalize to XML
439     if (db->yaz_marc)
440     {
441         char *buf;
442         int len;
443         if (rec->which != Z_External_octet)
444         {
445             yaz_log(YLOG_WARN, "Unexpected external branch, probably BER %s",
446                     cl->database->url);
447             return 0;
448         }
449         buf = (char*) rec->u.octet_aligned->buf;
450         len = rec->u.octet_aligned->len;
451         if (yaz_marc_read_iso2709(db->yaz_marc, buf, len) < 0)
452         {
453             yaz_log(YLOG_WARN, "Failed to decode MARC %s",
454                     cl->database->url);
455             return 0;
456         }
457         if (yaz_marc_write_xml(db->yaz_marc, &res,
458                     "http://www.loc.gov/MARC21/slim", 0, 0) < 0)
459         {
460             yaz_log(YLOG_WARN, "Failed to encode as XML %s",
461                     cl->database->url);
462             return 0;
463         }
464         rdoc = xmlNewDoc((xmlChar *) "1.0");
465         xmlDocSetRootElement(rdoc, res);
466     }
467     else
468     {
469         yaz_log(YLOG_FATAL, "Unknown native_syntax in normalize_record");
470         exit(1);
471     }
472
473     if (global_parameters.dump_records)
474     {
475         fprintf(stderr, "Input Record (normalized):\n----------------\n");
476 #if LIBXML_VERSION >= 20600
477         xmlDocFormatDump(stderr, rdoc, 1);
478 #else
479         xmlDocDump(stderr, rdoc);
480 #endif
481     }
482
483     for (m = db->map; m; m = m->next)
484     {
485         xmlDoc *new;
486         if (!(new = xsltApplyStylesheet(m->stylesheet, rdoc, 0)))
487         {
488             yaz_log(YLOG_WARN, "XSLT transformation failed");
489             return 0;
490         }
491         xmlFreeDoc(rdoc);
492         rdoc = new;
493     }
494     if (global_parameters.dump_records)
495     {
496         fprintf(stderr, "Record:\n----------------\n");
497 #if LIBXML_VERSION >= 20600
498         xmlDocFormatDump(stderr, rdoc, 1);
499 #else
500         xmlDocDump(stderr, rdoc);
501 #endif
502     }
503     return rdoc;
504 }
505
506 // Extract what appears to be years from buf, storing highest and
507 // lowest values.
508 static int extract_years(const char *buf, int *first, int *last)
509 {
510     *first = -1;
511     *last = -1;
512     while (*buf)
513     {
514         const char *e;
515         int len;
516
517         while (*buf && !isdigit(*buf))
518             buf++;
519         len = 0;
520         for (e = buf; *e && isdigit(*e); e++)
521             len++;
522         if (len == 4)
523         {
524             int value = atoi(buf);
525             if (*first < 0 || value < *first)
526                 *first = value;
527             if (*last < 0 || value > *last)
528                 *last = value;
529         }
530         buf = e;
531     }
532     return *first;
533 }
534
535 static struct record *ingest_record(struct client *cl, Z_External *rec)
536 {
537     xmlDoc *xdoc = normalize_record(cl, rec);
538     xmlNode *root, *n;
539     struct record *res;
540     struct record_cluster *cluster;
541     struct session *se = cl->session;
542     xmlChar *mergekey, *mergekey_norm;
543     xmlChar *type = 0;
544     xmlChar *value = 0;
545     struct conf_service *service = global_parameters.server->service;
546
547     if (!xdoc)
548         return 0;
549
550     root = xmlDocGetRootElement(xdoc);
551     if (!(mergekey = xmlGetProp(root, (xmlChar *) "mergekey")))
552     {
553         yaz_log(YLOG_WARN, "No mergekey found in record");
554         xmlFreeDoc(xdoc);
555         return 0;
556     }
557
558     res = nmem_malloc(se->nmem, sizeof(struct record));
559     res->next = 0;
560     res->client = cl;
561     res->metadata = nmem_malloc(se->nmem,
562             sizeof(struct record_metadata*) * service->num_metadata);
563     memset(res->metadata, 0, sizeof(struct record_metadata*) * service->num_metadata);
564
565     mergekey_norm = (xmlChar *) nmem_strdup(se->nmem, (char*) mergekey);
566     xmlFree(mergekey);
567     normalize_mergekey((char *) mergekey_norm, 0);
568
569     cluster = reclist_insert(se->reclist, res, (char *) mergekey_norm, 
570                              &se->total_merged);
571     if (global_parameters.dump_records)
572         yaz_log(YLOG_LOG, "Cluster id %d from %s (#%d)", cluster->recid,
573                 cl->database->url, cl->records);
574     if (!cluster)
575     {
576         /* no room for record */
577         xmlFreeDoc(xdoc);
578         return 0;
579     }
580     relevance_newrec(se->relevance, cluster);
581
582     for (n = root->children; n; n = n->next)
583     {
584         if (type)
585             xmlFree(type);
586         if (value)
587             xmlFree(value);
588         type = value = 0;
589
590         if (n->type != XML_ELEMENT_NODE)
591             continue;
592         if (!strcmp((const char *) n->name, "metadata"))
593         {
594             struct conf_metadata *md = 0;
595             struct conf_sortkey *sk = 0;
596             struct record_metadata **wheretoput, *newm;
597             int imeta;
598             int first, last;
599
600             type = xmlGetProp(n, (xmlChar *) "type");
601             value = xmlNodeListGetString(xdoc, n->children, 0);
602
603             if (!type || !value)
604                 continue;
605
606             // First, find out what field we're looking at
607             for (imeta = 0; imeta < service->num_metadata; imeta++)
608                 if (!strcmp((const char *) type, service->metadata[imeta].name))
609                 {
610                     md = &service->metadata[imeta];
611                     if (md->sortkey_offset >= 0)
612                         sk = &service->sortkeys[md->sortkey_offset];
613                     break;
614                 }
615             if (!md)
616             {
617                 yaz_log(YLOG_WARN, "Ignoring unknown metadata element: %s", type);
618                 continue;
619             }
620
621             // Find out where we are putting it
622             if (md->merge == Metadata_merge_no)
623                 wheretoput = &res->metadata[imeta];
624             else
625                 wheretoput = &cluster->metadata[imeta];
626             
627             // Put it there
628             newm = nmem_malloc(se->nmem, sizeof(struct record_metadata));
629             newm->next = 0;
630             if (md->type == Metadata_type_generic)
631             {
632                 char *p, *pe;
633                 for (p = (char *) value; *p && isspace(*p); p++)
634                     ;
635                 for (pe = p + strlen(p) - 1;
636                         pe > p && strchr(" ,/.:([", *pe); pe--)
637                     *pe = '\0';
638                 newm->data.text = nmem_strdup(se->nmem, p);
639
640             }
641             else if (md->type == Metadata_type_year)
642             {
643                 if (extract_years((char *) value, &first, &last) < 0)
644                     continue;
645             }
646             else
647             {
648                 yaz_log(YLOG_WARN, "Unknown type in metadata element %s", type);
649                 continue;
650             }
651             if (md->type == Metadata_type_year && md->merge != Metadata_merge_range)
652             {
653                 yaz_log(YLOG_WARN, "Only range merging supported for years");
654                 continue;
655             }
656             if (md->merge == Metadata_merge_unique)
657             {
658                 struct record_metadata *mnode;
659                 for (mnode = *wheretoput; mnode; mnode = mnode->next)
660                     if (!strcmp((const char *) mnode->data.text, newm->data.text))
661                         break;
662                 if (!mnode)
663                 {
664                     newm->next = *wheretoput;
665                     *wheretoput = newm;
666                 }
667             }
668             else if (md->merge == Metadata_merge_longest)
669             {
670                 if (!*wheretoput ||
671                         strlen(newm->data.text) > strlen((*wheretoput)->data.text))
672                 {
673                     *wheretoput = newm;
674                     if (sk)
675                     {
676                         char *s = nmem_strdup(se->nmem, newm->data.text);
677                         if (!cluster->sortkeys[md->sortkey_offset])
678                             cluster->sortkeys[md->sortkey_offset] = 
679                                 nmem_malloc(se->nmem, sizeof(union data_types));
680                         normalize_mergekey(s,
681                                 (sk->type == Metadata_sortkey_skiparticle));
682                         cluster->sortkeys[md->sortkey_offset]->text = s;
683                     }
684                 }
685             }
686             else if (md->merge == Metadata_merge_all || md->merge == Metadata_merge_no)
687             {
688                 newm->next = *wheretoput;
689                 *wheretoput = newm;
690             }
691             else if (md->merge == Metadata_merge_range)
692             {
693                 assert(md->type == Metadata_type_year);
694                 if (!*wheretoput)
695                 {
696                     *wheretoput = newm;
697                     (*wheretoput)->data.number.min = first;
698                     (*wheretoput)->data.number.max = last;
699                     if (sk)
700                         cluster->sortkeys[md->sortkey_offset] = &newm->data;
701                 }
702                 else
703                 {
704                     if (first < (*wheretoput)->data.number.min)
705                         (*wheretoput)->data.number.min = first;
706                     if (last > (*wheretoput)->data.number.max)
707                         (*wheretoput)->data.number.max = last;
708                 }
709 #ifdef GAGA
710                 if (sk)
711                 {
712                     union data_types *sdata = cluster->sortkeys[md->sortkey_offset];
713                     yaz_log(YLOG_LOG, "SK range: %d-%d", sdata->number.min, sdata->number.max);
714                 }
715 #endif
716             }
717             else
718                 yaz_log(YLOG_WARN, "Don't know how to merge on element name %s", md->name);
719
720             if (md->rank)
721                 relevance_countwords(se->relevance, cluster, 
722                                      (char *) value, md->rank);
723             if (md->termlist)
724             {
725                 if (md->type == Metadata_type_year)
726                 {
727                     char year[64];
728                     sprintf(year, "%d", last);
729                     add_facet(se, (char *) type, year);
730                     if (first != last)
731                     {
732                         sprintf(year, "%d", first);
733                         add_facet(se, (char *) type, year);
734                     }
735                 }
736                 else
737                     add_facet(se, (char *) type, (char *) value);
738             }
739             xmlFree(type);
740             xmlFree(value);
741             type = value = 0;
742         }
743         else
744             yaz_log(YLOG_WARN, "Unexpected element %s in internal record", n->name);
745     }
746     if (type)
747         xmlFree(type);
748     if (value)
749         xmlFree(value);
750
751     xmlFreeDoc(xdoc);
752
753     relevance_donerecord(se->relevance, cluster);
754     se->total_records++;
755
756     return res;
757 }
758
759 // Retrieve first defined value for 'name' for given database.
760 // Will be extended to take into account user associated with session
761 char *session_setting_oneval(struct session *s, struct database *db, int offset)
762 {
763     if (!db->settings[offset])
764         return 0;
765     return db->settings[offset]->value;
766 }
767
768 static void ingest_records(struct client *cl, Z_Records *r)
769 {
770 #if USE_TIMING
771     yaz_timing_t t = yaz_timing_create();
772 #endif
773     struct record *rec;
774     struct session *s = cl->session;
775     Z_NamePlusRecordList *rlist;
776     int i;
777
778     if (r->which != Z_Records_DBOSD)
779         return;
780     rlist = r->u.databaseOrSurDiagnostics;
781     for (i = 0; i < rlist->num_records; i++)
782     {
783         Z_NamePlusRecord *npr = rlist->records[i];
784
785         cl->records++;
786         if (npr->which != Z_NamePlusRecord_databaseRecord)
787         {
788             yaz_log(YLOG_WARN, 
789                     "Unexpected record type, probably diagnostic %s",
790                     cl->database->url);
791             continue;
792         }
793
794         rec = ingest_record(cl, npr->u.databaseRecord);
795         if (!rec)
796             continue;
797     }
798     if (s->watchlist[SESSION_WATCH_RECORDS].fun && rlist->num_records)
799         session_alert_watch(s, SESSION_WATCH_RECORDS);
800
801 #if USE_TIMING
802     yaz_timing_stop(t);
803     yaz_log(YLOG_LOG, "ingest_records %6.5f %3.2f %3.2f", 
804             yaz_timing_get_real(t), yaz_timing_get_user(t),
805             yaz_timing_get_sys(t));
806     yaz_timing_destroy(&t);
807 #endif
808 }
809
810 static void do_presentResponse(IOCHAN i, Z_APDU *a)
811 {
812     struct connection *co = iochan_getdata(i);
813     struct client *cl = co->client;
814     Z_PresentResponse *r = a->u.presentResponse;
815
816     if (r->records) {
817         Z_Records *recs = r->records;
818         if (recs->which == Z_Records_NSD)
819         {
820             yaz_log(YLOG_WARN, "Non-surrogate diagnostic %s",
821                     cl->database->url);
822             cl->diagnostic = *recs->u.nonSurrogateDiagnostic->condition;
823             cl->state = Client_Error;
824         }
825     }
826
827     if (!*r->presentStatus && cl->state != Client_Error)
828     {
829         yaz_log(YLOG_DEBUG, "Good Present response %s",
830                 cl->database->url);
831         ingest_records(cl, r->records);
832         cl->state = Client_Idle;
833     }
834     else if (*r->presentStatus) 
835     {
836         yaz_log(YLOG_WARN, "Bad Present response %s",
837                 cl->database->url);
838         cl->state = Client_Error;
839     }
840 }
841
842 static void handler(IOCHAN i, int event)
843 {
844     struct connection *co = iochan_getdata(i);
845     struct client *cl = co->client;
846     struct session *se = 0;
847
848     if (cl)
849         se = cl->session;
850     else
851     {
852         yaz_log(YLOG_WARN, "Destroying orphan connection");
853         connection_destroy(co);
854         return;
855     }
856
857     if (co->state == Conn_Connecting && event & EVENT_OUTPUT)
858     {
859         int errcode;
860         socklen_t errlen = sizeof(errcode);
861
862         if (getsockopt(cs_fileno(co->link), SOL_SOCKET, SO_ERROR, &errcode,
863             &errlen) < 0 || errcode != 0)
864         {
865             client_fatal(cl);
866             return;
867         }
868         else
869         {
870             yaz_log(YLOG_DEBUG, "Connect OK");
871             co->state = Conn_Open;
872             if (cl)
873                 cl->state = Client_Connected;
874         }
875     }
876
877     else if (event & EVENT_INPUT)
878     {
879         int len = cs_get(co->link, &co->ibuf, &co->ibufsize);
880
881         if (len < 0)
882         {
883             yaz_log(YLOG_WARN|YLOG_ERRNO, "Error reading from %s", 
884                     cl->database->url);
885             connection_destroy(co);
886             return;
887         }
888         else if (len == 0)
889         {
890             yaz_log(YLOG_WARN, "EOF reading from %s", cl->database->url);
891             connection_destroy(co);
892             return;
893         }
894         else if (len > 1) // We discard input if we have no connection
895         {
896             co->state = Conn_Open;
897
898             if (cl && (cl->requestid == se->requestid || cl->state == Client_Initializing))
899             {
900                 Z_APDU *a;
901
902                 odr_reset(global_parameters.odr_in);
903                 odr_setbuf(global_parameters.odr_in, co->ibuf, len, 0);
904                 if (!z_APDU(global_parameters.odr_in, &a, 0, 0))
905                 {
906                     client_fatal(cl);
907                     return;
908                 }
909                 switch (a->which)
910                 {
911                     case Z_APDU_initResponse:
912                         do_initResponse(i, a);
913                         break;
914                     case Z_APDU_searchResponse:
915                         do_searchResponse(i, a);
916                         break;
917                     case Z_APDU_presentResponse:
918                         do_presentResponse(i, a);
919                         break;
920                     case Z_APDU_close:
921                         do_closeResponse(i, a);
922                         break;
923                     default:
924                         yaz_log(YLOG_WARN, 
925                                 "Unexpected Z39.50 response from %s",  
926                                 cl->database->url);
927                         client_fatal(cl);
928                         return;
929                 }
930                 // We aren't expecting staggered output from target
931                 // if (cs_more(t->link))
932                 //    iochan_setevent(i, EVENT_INPUT);
933             }
934             else  // we throw away response and go to idle mode
935             {
936                 yaz_log(YLOG_DEBUG, "Ignoring result of expired operation");
937                 cl->state = Client_Idle;
938             }
939         }
940         /* if len==1 we do nothing but wait for more input */
941     }
942
943     if (cl->state == Client_Connected) {
944         send_init(i);
945     }
946
947     if (cl->state == Client_Idle)
948     {
949         if (cl->requestid != se->requestid && *se->query) {
950             send_search(i);
951         }
952         else if (cl->hits > 0 && cl->records < global_parameters.toget &&
953             cl->records < cl->hits) {
954             send_present(i);
955         }
956     }
957 }
958
959 // Disassociate connection from client
960 static void connection_release(struct connection *co)
961 {
962     struct client *cl = co->client;
963
964     yaz_log(YLOG_DEBUG, "Connection release %s", co->host->hostport);
965     if (!cl)
966         return;
967     cl->connection = 0;
968     co->client = 0;
969 }
970
971 // Close connection and recycle structure
972 static void connection_destroy(struct connection *co)
973 {
974     struct host *h = co->host;
975     cs_close(co->link);
976     iochan_destroy(co->iochan);
977
978     yaz_log(YLOG_DEBUG, "Connection destroy %s", co->host->hostport);
979     if (h->connections == co)
980         h->connections = co->next;
981     else
982     {
983         struct connection *pco;
984         for (pco = h->connections; pco && pco->next != co; pco = pco->next)
985             ;
986         if (pco)
987             pco->next = co->next;
988         else
989             abort();
990     }
991     if (co->client)
992     {
993         if (co->client->state != Client_Idle)
994             co->client->state = Client_Disconnected;
995         co->client->connection = 0;
996     }
997     co->next = connection_freelist;
998     connection_freelist = co;
999 }
1000
1001 // Creates a new connection for client, associated with the host of 
1002 // client's database
1003 static struct connection *connection_create(struct client *cl)
1004 {
1005     struct connection *new;
1006     COMSTACK link; 
1007     int res;
1008     void *addr;
1009
1010
1011     if (!(link = cs_create(tcpip_type, 0, PROTO_Z3950)))
1012         {
1013             yaz_log(YLOG_FATAL|YLOG_ERRNO, "Failed to create comstack");
1014             exit(1);
1015         }
1016     
1017     if (0 == strlen(global_parameters.zproxy_override)){
1018         /* no Z39.50 proxy needed - direct connect */
1019         yaz_log(YLOG_DEBUG, "Connection create %s", cl->database->url);
1020         
1021         if (!(addr = cs_straddr(link, cl->database->host->ipport)))
1022             {
1023                 yaz_log(YLOG_WARN|YLOG_ERRNO, 
1024                         "Lookup of IP address %s failed", 
1025                         cl->database->host->ipport);
1026                 return 0;
1027             }
1028     
1029     } else {
1030         /* Z39.50 proxy connect */
1031         yaz_log(YLOG_DEBUG, "Connection create %s proxy %s", 
1032                 cl->database->url, global_parameters.zproxy_override);
1033
1034         if (!(addr = cs_straddr(link, global_parameters.zproxy_override)))
1035             {
1036                 yaz_log(YLOG_WARN|YLOG_ERRNO, 
1037                         "Lookup of IP address %s failed", 
1038                         global_parameters.zproxy_override);
1039                 return 0;
1040             }
1041     }
1042
1043     res = cs_connect(link, addr);
1044     if (res < 0)
1045     {
1046         yaz_log(YLOG_WARN|YLOG_ERRNO, "cs_connect %s", cl->database->url);
1047         return 0;
1048     }
1049
1050     if ((new = connection_freelist))
1051         connection_freelist = new->next;
1052     else
1053     {
1054         new = xmalloc(sizeof (struct connection));
1055         new->ibuf = 0;
1056         new->ibufsize = 0;
1057     }
1058     new->state = Conn_Connecting;
1059     new->host = cl->database->host;
1060     new->next = new->host->connections;
1061     new->host->connections = new;
1062     new->client = cl;
1063     cl->connection = new;
1064     new->link = link;
1065
1066     new->iochan = iochan_create(cs_fileno(link), 0, handler, 0);
1067     iochan_setdata(new->iochan, new);
1068     new->iochan->next = channel_list;
1069     channel_list = new->iochan;
1070     return new;
1071 }
1072
1073 // Close connection and set state to error
1074 static void client_fatal(struct client *cl)
1075 {
1076     yaz_log(YLOG_WARN, "Fatal error from %s", cl->database->url);
1077     connection_destroy(cl->connection);
1078     cl->state = Client_Error;
1079 }
1080
1081 // Ensure that client has a connection associated
1082 static int client_prep_connection(struct client *cl)
1083 {
1084     struct connection *co;
1085     struct session *se = cl->session;
1086     struct host *host = cl->database->host;
1087
1088     co = cl->connection;
1089
1090     yaz_log(YLOG_DEBUG, "Client prep %s", cl->database->url);
1091
1092     if (!co)
1093     {
1094         // See if someone else has an idle connection
1095         // We should look at timestamps here to select the longest-idle connection
1096         for (co = host->connections; co; co = co->next)
1097             if (co->state == Conn_Open && (!co->client || co->client->session != se))
1098                 break;
1099         if (co)
1100         {
1101             connection_release(co);
1102             cl->connection = co;
1103             co->client = cl;
1104         }
1105         else
1106             co = connection_create(cl);
1107     }
1108     if (co)
1109     {
1110         if (co->state == Conn_Connecting)
1111         {
1112             cl->state = Client_Connecting;
1113             iochan_setflag(co->iochan, EVENT_OUTPUT);
1114         }
1115         else if (co->state == Conn_Open)
1116         {
1117             if (cl->state == Client_Error || cl->state == Client_Disconnected)
1118                 cl->state = Client_Idle;
1119             iochan_setflag(co->iochan, EVENT_OUTPUT);
1120         }
1121         return 1;
1122     }
1123     else
1124         return 0;
1125 }
1126
1127 #ifdef GAGA // Moved to database.c
1128
1129 // This function will most likely vanish when a proper target profile mechanism is
1130 // introduced.
1131 void load_simpletargets(const char *fn)
1132 {
1133     FILE *f = fopen(fn, "r");
1134     char line[256];
1135
1136     if (!f)
1137     {
1138         yaz_log(YLOG_WARN|YLOG_ERRNO, "open %s", fn);
1139         exit(1);
1140     }
1141
1142     while (fgets(line, 255, f))
1143     {
1144         char *url, *db;
1145         char *name;
1146         struct host *host;
1147         struct database *database;
1148
1149         if (strncmp(line, "target ", 7))
1150             continue;
1151         line[strlen(line) - 1] = '\0';
1152
1153         if ((name = strchr(line, ';')))
1154             *(name++) = '\0';
1155
1156         url = line + 7;
1157         if ((db = strchr(url, '/')))
1158             *(db++) = '\0';
1159         else
1160             db = "Default";
1161
1162         yaz_log(YLOG_LOG, "Target: %s, '%s'", url, db);
1163         for (host = hosts; host; host = host->next)
1164             if (!strcmp((const char *) url, host->hostport))
1165                 break;
1166         if (!host)
1167         {
1168             struct addrinfo *addrinfo, hints;
1169             char *port;
1170             char ipport[128];
1171             unsigned char addrbuf[4];
1172             int res;
1173
1174             host = xmalloc(sizeof(struct host));
1175             host->hostport = xstrdup(url);
1176             host->connections = 0;
1177
1178             if ((port = strchr(url, ':')))
1179                 *(port++) = '\0';
1180             else
1181                 port = "210";
1182
1183             hints.ai_flags = 0;
1184             hints.ai_family = PF_INET;
1185             hints.ai_socktype = SOCK_STREAM;
1186             hints.ai_protocol = IPPROTO_TCP;
1187             hints.ai_addrlen = 0;
1188             hints.ai_addr = 0;
1189             hints.ai_canonname = 0;
1190             hints.ai_next = 0;
1191             // This is not robust code. It assumes that getaddrinfo returns AF_INET
1192             // address.
1193             if ((res = getaddrinfo(url, port, &hints, &addrinfo)))
1194             {
1195                 yaz_log(YLOG_WARN, "Failed to resolve %s: %s", url, gai_strerror(res));
1196                 xfree(host->hostport);
1197                 xfree(host);
1198                 continue;
1199             }
1200             assert(addrinfo->ai_family == PF_INET);
1201             memcpy(addrbuf, &((struct sockaddr_in*)addrinfo->ai_addr)->sin_addr.s_addr, 4);
1202             sprintf(ipport, "%u.%u.%u.%u:%s",
1203                     addrbuf[0], addrbuf[1], addrbuf[2], addrbuf[3], port);
1204             host->ipport = xstrdup(ipport);
1205             freeaddrinfo(addrinfo);
1206             host->next = hosts;
1207             hosts = host;
1208         }
1209         database = xmalloc(sizeof(struct database));
1210         database->host = host;
1211         database->url = xmalloc(strlen(url) + strlen(db) + 2);
1212         strcpy(database->url, url);
1213         strcat(database->url, "/");
1214         strcat(database->url, db);
1215         if (name)
1216             database->name = xstrdup(name);
1217         else
1218             database->name = 0;
1219         
1220         database->databases = xmalloc(2 * sizeof(char *));
1221         database->databases[0] = xstrdup(db);
1222         database->databases[1] = 0;
1223         database->errors = 0;
1224         database->qprofile = 0;
1225         database->rprofile = database_retrieval_profile(database);
1226         database->next = databases;
1227         databases = database;
1228
1229     }
1230     fclose(f);
1231 }
1232
1233 #endif
1234
1235 static struct client *client_create(void)
1236 {
1237     struct client *r;
1238     if (client_freelist)
1239     {
1240         r = client_freelist;
1241         client_freelist = client_freelist->next;
1242     }
1243     else
1244         r = xmalloc(sizeof(struct client));
1245     r->database = 0;
1246     r->connection = 0;
1247     r->session = 0;
1248     r->hits = 0;
1249     r->records = 0;
1250     r->setno = 0;
1251     r->requestid = -1;
1252     r->diagnostic = 0;
1253     r->state = Client_Disconnected;
1254     r->next = 0;
1255     return r;
1256 }
1257
1258 void client_destroy(struct client *c)
1259 {
1260     struct session *se = c->session;
1261     if (c == se->clients)
1262         se->clients = c->next;
1263     else
1264     {
1265         struct client *cc;
1266         for (cc = se->clients; cc && cc->next != c; cc = cc->next)
1267             ;
1268         if (cc)
1269             cc->next = c->next;
1270     }
1271     if (c->connection)
1272         connection_release(c->connection);
1273     c->next = client_freelist;
1274     client_freelist = c;
1275 }
1276
1277 void session_set_watch(struct session *s, int what, session_watchfun fun, void *data)
1278 {
1279     s->watchlist[what].fun = fun;
1280     s->watchlist[what].data = data;
1281 }
1282
1283 void session_alert_watch(struct session *s, int what)
1284 {
1285     if (!s->watchlist[what].fun)
1286         return;
1287     (*s->watchlist[what].fun)(s->watchlist[what].data);
1288     s->watchlist[what].fun = 0;
1289     s->watchlist[what].data = 0;
1290 }
1291
1292 //callback for grep_databases
1293 static void select_targets_callback(void *context, struct database *db)
1294 {
1295     struct session *se = (struct session*) context;
1296     struct client *cl = client_create();
1297     cl->database = db;
1298     cl->session = se;
1299     cl->next = se->clients;
1300     se->clients = cl;
1301 }
1302
1303 // This should be extended with parameters to control selection criteria
1304 // Associates a set of clients with a session;
1305 int select_targets(struct session *se, struct database_criterion *crit)
1306 {
1307     while (se->clients)
1308         client_destroy(se->clients);
1309
1310     return grep_databases(se, crit, select_targets_callback);
1311 }
1312
1313 int session_active_clients(struct session *s)
1314 {
1315     struct client *c;
1316     int res = 0;
1317
1318     for (c = s->clients; c; c = c->next)
1319         if (c->connection && (c->state == Client_Connecting ||
1320                     c->state == Client_Initializing ||
1321                     c->state == Client_Searching ||
1322                     c->state == Client_Presenting))
1323             res++;
1324
1325     return res;
1326 }
1327
1328 // parses crit1=val1,crit2=val2|val3,...
1329 static struct database_criterion *parse_filter(NMEM m, const char *buf)
1330 {
1331     struct database_criterion *res = 0;
1332     char **values;
1333     int num;
1334     int i;
1335
1336     if (!buf || !*buf)
1337         return 0;
1338     nmem_strsplit(m, ",", buf,  &values, &num);
1339     for (i = 0; i < num; i++)
1340     {
1341         char **subvalues;
1342         int subnum;
1343         int subi;
1344         struct database_criterion *new = nmem_malloc(m, sizeof(*new));
1345         char *eq = strchr(values[i], '=');
1346         if (!eq)
1347         {
1348             yaz_log(YLOG_WARN, "Missing equal-sign in filter");
1349             return 0;
1350         }
1351         *(eq++) = '\0';
1352         new->name = values[i];
1353         nmem_strsplit(m, "|", eq, &subvalues, &subnum);
1354         new->values = 0;
1355         for (subi = 0; subi < subnum; subi++)
1356         {
1357             struct database_criterion_value *newv = nmem_malloc(m, sizeof(*newv));
1358             newv->value = subvalues[subi];
1359             newv->next = new->values;
1360             new->values = newv;
1361         }
1362         new->next = res;
1363         res = new;
1364     }
1365     return res;
1366 }
1367
1368 char *search(struct session *se, char *query, char *filter)
1369 {
1370     int live_channels = 0;
1371     struct client *cl;
1372     struct database_criterion *criteria;
1373
1374     yaz_log(YLOG_DEBUG, "Search");
1375
1376     nmem_reset(se->nmem);
1377     criteria = parse_filter(se->nmem, filter);
1378     strcpy(se->query, query);
1379     se->requestid++;
1380     // Release any existing clients
1381     select_targets(se, criteria);
1382     for (cl = se->clients; cl; cl = cl->next)
1383     {
1384         if (client_prep_connection(cl))
1385             live_channels++;
1386     }
1387     if (live_channels)
1388     {
1389         int maxrecs = live_channels * global_parameters.toget;
1390         se->num_termlists = 0;
1391         se->reclist = reclist_create(se->nmem, maxrecs);
1392         // This will be initialized in send_search()
1393         se->relevance = 0;
1394         se->total_records = se->total_hits = se->total_merged = 0;
1395         se->expected_maxrecs = maxrecs;
1396     }
1397     else
1398         return "NOTARGETS";
1399
1400     return 0;
1401 }
1402
1403 void destroy_session(struct session *s)
1404 {
1405     yaz_log(YLOG_LOG, "Destroying session");
1406     while (s->clients)
1407         client_destroy(s->clients);
1408     nmem_destroy(s->nmem);
1409     wrbuf_destroy(s->wrbuf);
1410 }
1411
1412 struct session *new_session() 
1413 {
1414     int i;
1415     struct session *session = xmalloc(sizeof(*session));
1416
1417     yaz_log(YLOG_DEBUG, "New pazpar2 session");
1418     
1419     session->total_hits = 0;
1420     session->total_records = 0;
1421     session->num_termlists = 0;
1422     session->reclist = 0;
1423     session->requestid = -1;
1424     session->clients = 0;
1425     session->expected_maxrecs = 0;
1426     session->query[0] = '\0';
1427     session->nmem = nmem_create();
1428     session->wrbuf = wrbuf_alloc();
1429     for (i = 0; i <= SESSION_WATCH_MAX; i++)
1430     {
1431         session->watchlist[i].data = 0;
1432         session->watchlist[i].fun = 0;
1433     }
1434
1435     return session;
1436 }
1437
1438 struct hitsbytarget *hitsbytarget(struct session *se, int *count)
1439 {
1440     static struct hitsbytarget res[1000]; // FIXME MM
1441     struct client *cl;
1442
1443     *count = 0;
1444     for (cl = se->clients; cl; cl = cl->next)
1445     {
1446         res[*count].id = cl->database->url;
1447         res[*count].name = cl->database->name;
1448         res[*count].hits = cl->hits;
1449         res[*count].records = cl->records;
1450         res[*count].diagnostic = cl->diagnostic;
1451         res[*count].state = client_states[cl->state];
1452         res[*count].connected  = cl->connection ? 1 : 0;
1453         (*count)++;
1454     }
1455
1456     return res;
1457 }
1458
1459 struct termlist_score **termlist(struct session *s, const char *name, int *num)
1460 {
1461     int i;
1462
1463     for (i = 0; i < s->num_termlists; i++)
1464         if (!strcmp((const char *) s->termlists[i].name, name))
1465             return termlist_highscore(s->termlists[i].termlist, num);
1466     return 0;
1467 }
1468
1469 #ifdef MISSING_HEADERS
1470 void report_nmem_stats(void)
1471 {
1472     size_t in_use, is_free;
1473
1474     nmem_get_memory_in_use(&in_use);
1475     nmem_get_memory_free(&is_free);
1476
1477     yaz_log(YLOG_LOG, "nmem stat: use=%ld free=%ld", 
1478             (long) in_use, (long) is_free);
1479 }
1480 #endif
1481
1482 struct record_cluster *show_single(struct session *s, int id)
1483 {
1484     struct record_cluster *r;
1485
1486     reclist_rewind(s->reclist);
1487     while ((r = reclist_read_record(s->reclist)))
1488         if (r->recid == id)
1489             return r;
1490     return 0;
1491 }
1492
1493 struct record_cluster **show(struct session *s, struct reclist_sortparms *sp, int start,
1494         int *num, int *total, int *sumhits, NMEM nmem_show)
1495 {
1496     struct record_cluster **recs = nmem_malloc(nmem_show, *num 
1497                                        * sizeof(struct record_cluster *));
1498     struct reclist_sortparms *spp;
1499     int i;
1500 #if USE_TIMING    
1501     yaz_timing_t t = yaz_timing_create();
1502 #endif
1503
1504     for (spp = sp; spp; spp = spp->next)
1505         if (spp->type == Metadata_sortkey_relevance)
1506         {
1507             relevance_prepare_read(s->relevance, s->reclist);
1508             break;
1509         }
1510     reclist_sort(s->reclist, sp);
1511
1512     *total = s->reclist->num_records;
1513     *sumhits = s->total_hits;
1514
1515     for (i = 0; i < start; i++)
1516         if (!reclist_read_record(s->reclist))
1517         {
1518             *num = 0;
1519             recs = 0;
1520             break;
1521         }
1522
1523     for (i = 0; i < *num; i++)
1524     {
1525         struct record_cluster *r = reclist_read_record(s->reclist);
1526         if (!r)
1527         {
1528             *num = i;
1529             break;
1530         }
1531         recs[i] = r;
1532     }
1533 #if USE_TIMING
1534     yaz_timing_stop(t);
1535     yaz_log(YLOG_LOG, "show %6.5f %3.2f %3.2f", 
1536             yaz_timing_get_real(t), yaz_timing_get_user(t),
1537             yaz_timing_get_sys(t));
1538     yaz_timing_destroy(&t);
1539 #endif
1540     return recs;
1541 }
1542
1543 void statistics(struct session *se, struct statistics *stat)
1544 {
1545     struct client *cl;
1546     int count = 0;
1547
1548     memset(stat, 0, sizeof(*stat));
1549     for (cl = se->clients; cl; cl = cl->next)
1550     {
1551         if (!cl->connection)
1552             stat->num_no_connection++;
1553         switch (cl->state)
1554         {
1555             case Client_Connecting: stat->num_connecting++; break;
1556             case Client_Initializing: stat->num_initializing++; break;
1557             case Client_Searching: stat->num_searching++; break;
1558             case Client_Presenting: stat->num_presenting++; break;
1559             case Client_Idle: stat->num_idle++; break;
1560             case Client_Failed: stat->num_failed++; break;
1561             case Client_Error: stat->num_error++; break;
1562             default: break;
1563         }
1564         count++;
1565     }
1566     stat->num_hits = se->total_hits;
1567     stat->num_records = se->total_records;
1568
1569     stat->num_clients = count;
1570 }
1571
1572 static void start_http_listener(void)
1573 {
1574     char hp[128] = "";
1575     struct conf_server *ser = global_parameters.server;
1576
1577     if (*global_parameters.listener_override)
1578         strcpy(hp, global_parameters.listener_override);
1579     else
1580     {
1581         strcpy(hp, ser->host ? ser->host : "");
1582         if (ser->port)
1583         {
1584             if (*hp)
1585                 strcat(hp, ":");
1586             sprintf(hp + strlen(hp), "%d", ser->port);
1587         }
1588     }
1589     http_init(hp);
1590 }
1591
1592 static void start_proxy(void)
1593 {
1594     char hp[128] = "";
1595     struct conf_server *ser = global_parameters.server;
1596
1597     if (*global_parameters.proxy_override)
1598         strcpy(hp, global_parameters.proxy_override);
1599     else if (ser->proxy_host || ser->proxy_port)
1600     {
1601         strcpy(hp, ser->proxy_host ? ser->proxy_host : "");
1602         if (ser->proxy_port)
1603         {
1604             if (*hp)
1605                 strcat(hp, ":");
1606             sprintf(hp + strlen(hp), "%d", ser->proxy_port);
1607         }
1608     }
1609     else
1610         return;
1611
1612     http_set_proxyaddr(hp, ser->myurl ? ser->myurl : "");
1613 }
1614
1615 static void start_zproxy(void)
1616 {
1617     struct conf_server *ser = global_parameters.server;
1618
1619     if (*global_parameters.zproxy_override){
1620         yaz_log(YLOG_LOG, "Z39.50 proxy  %s", 
1621                 global_parameters.zproxy_override);
1622         return;
1623     }
1624
1625     else if (ser->zproxy_host || ser->zproxy_port)
1626     {
1627         char hp[128] = "";
1628
1629         strcpy(hp, ser->zproxy_host ? ser->zproxy_host : "");
1630         if (ser->zproxy_port)
1631         {
1632             if (*hp)
1633                 strcat(hp, ":");
1634             else
1635                 strcat(hp, "@:");
1636
1637             sprintf(hp + strlen(hp), "%d", ser->zproxy_port);
1638         }
1639         strcpy(global_parameters.zproxy_override, hp);
1640         yaz_log(YLOG_LOG, "Z39.50 proxy  %s", 
1641                 global_parameters.zproxy_override);
1642
1643     }
1644     else
1645         return;
1646 }
1647
1648
1649
1650 int main(int argc, char **argv)
1651 {
1652     int ret;
1653     char *arg;
1654
1655     if (signal(SIGPIPE, SIG_IGN) == SIG_ERR)
1656         yaz_log(YLOG_WARN|YLOG_ERRNO, "signal");
1657
1658     yaz_log_init(YLOG_DEFAULT_LEVEL, "pazpar2", 0);
1659
1660     while ((ret = options("t:f:x:h:p:z:s:d", argv, argc, &arg)) != -2)
1661     {
1662         switch (ret) {
1663             case 'f':
1664                 if (!read_config(arg))
1665                     exit(1);
1666                 break;
1667             case 'h':
1668                 strcpy(global_parameters.listener_override, arg);
1669                 break;
1670             case 'p':
1671                 strcpy(global_parameters.proxy_override, arg);
1672                 break;
1673             case 'z':
1674                 strcpy(global_parameters.zproxy_override, arg);
1675                 break;
1676             case 't':
1677                 strcpy(global_parameters.settings_path_override, arg);
1678                 break;
1679             case 's':
1680                 load_simpletargets(arg);
1681                 break;
1682             case 'd':
1683                 global_parameters.dump_records = 1;
1684                 break;
1685             default:
1686                 fprintf(stderr, "Usage: pazpar2\n"
1687                         "    -f configfile\n"
1688                         "    -h [host:]port          (REST protocol listener)\n"
1689                         "    -C cclconfig\n"
1690                         "    -s simpletargetfile\n"
1691                         "    -p hostname[:portno]    (HTTP proxy)\n"
1692                         "    -z hostname[:portno]    (Z39.50 proxy)\n"
1693                         "    -d                      (show internal records)\n");
1694                 exit(1);
1695         }
1696     }
1697
1698     if (!config)
1699     {
1700         yaz_log(YLOG_FATAL, "Load config with -f");
1701         exit(1);
1702     }
1703     global_parameters.server = config->servers;
1704
1705     start_http_listener();
1706     start_proxy();
1707     start_zproxy();
1708
1709     if (*global_parameters.settings_path_override)
1710         settings_read(global_parameters.settings_path_override);
1711     else if (global_parameters.server->settings)
1712         settings_read(global_parameters.server->settings);
1713     else
1714         yaz_log(YLOG_WARN, "No settings-directory specified. Problems may well ensue!");
1715     prepare_databases();
1716     global_parameters.odr_in = odr_createmem(ODR_DECODE);
1717     global_parameters.odr_out = odr_createmem(ODR_ENCODE);
1718
1719     event_loop(&channel_list);
1720
1721     return 0;
1722 }
1723
1724 /*
1725  * Local variables:
1726  * c-basic-offset: 4
1727  * indent-tabs-mode: nil
1728  * End:
1729  * vim: shiftwidth=4 tabstop=8 expandtab
1730  */