added command line switch '-z' which lets one specify a Z39.50 proxy through which...
[pazpar2-moved-to-github.git] / src / pazpar2.c
1 /* $Id: pazpar2.c,v 1.53 2007-03-26 14:00:21 marc Exp $ */
2
3 #include <stdlib.h>
4 #include <stdio.h>
5 #include <string.h>
6 #include <sys/time.h>
7 #include <unistd.h>
8 #include <sys/socket.h>
9 #include <netdb.h>
10 #include <signal.h>
11 #include <ctype.h>
12 #include <assert.h>
13
14 #include <yaz/marcdisp.h>
15 #include <yaz/comstack.h>
16 #include <yaz/tcpip.h>
17 #include <yaz/proto.h>
18 #include <yaz/readconf.h>
19 #include <yaz/pquery.h>
20 #include <yaz/yaz-util.h>
21 #include <yaz/nmem.h>
22
23 #if HAVE_CONFIG_H
24 #include "cconfig.h"
25 #endif
26
27 #define USE_TIMING 0
28 #if USE_TIMING
29 #include <yaz/timing.h>
30 #endif
31
32 #include <netinet/in.h>
33
34 #include "pazpar2.h"
35 #include "eventl.h"
36 #include "http.h"
37 #include "termlists.h"
38 #include "reclists.h"
39 #include "relevance.h"
40 #include "config.h"
41 #include "database.h"
42
43 #define MAX_CHUNK 15
44
45 static void client_fatal(struct client *cl);
46 static void connection_destroy(struct connection *co);
47 static int client_prep_connection(struct client *cl);
48 static void ingest_records(struct client *cl, Z_Records *r);
49 //static struct conf_retrievalprofile *database_retrieval_profile(struct database *db);
50 void session_alert_watch(struct session *s, int what);
51
52 IOCHAN channel_list = 0;  // Master list of connections we're handling events to
53
54 static struct connection *connection_freelist = 0;
55 static struct client *client_freelist = 0;
56
57 static char *client_states[] = {
58     "Client_Connecting",
59     "Client_Connected",
60     "Client_Idle",
61     "Client_Initializing",
62     "Client_Searching",
63     "Client_Presenting",
64     "Client_Error",
65     "Client_Failed",
66     "Client_Disconnected",
67     "Client_Stopped"
68 };
69
70 // Note: Some things in this structure will eventually move to configuration
71 struct parameters global_parameters = 
72 {
73     "",
74     "",
75     "",
76     0,
77     0,
78     30,
79     "81",
80     "Index Data PazPar2 (MasterKey)",
81     VERSION,
82     600, // 10 minutes
83     60,
84     100,
85     MAX_CHUNK,
86     0,
87     0,
88     0,
89     0
90 };
91
92 static int send_apdu(struct client *c, Z_APDU *a)
93 {
94     struct connection *co = c->connection;
95     char *buf;
96     int len, r;
97
98     if (!z_APDU(global_parameters.odr_out, &a, 0, 0))
99     {
100         odr_perror(global_parameters.odr_out, "Encoding APDU");
101         abort();
102     }
103     buf = odr_getbuf(global_parameters.odr_out, &len, 0);
104     r = cs_put(co->link, buf, len);
105     if (r < 0)
106     {
107         yaz_log(YLOG_WARN, "cs_put: %s", cs_errmsg(cs_errno(co->link)));
108         return -1;
109     }
110     else if (r == 1)
111     {
112         fprintf(stderr, "cs_put incomplete (ParaZ does not handle that)\n");
113         exit(1);
114     }
115     odr_reset(global_parameters.odr_out); /* release the APDU structure  */
116     co->state = Conn_Waiting;
117     return 0;
118 }
119
120
121 static void send_init(IOCHAN i)
122 {
123
124     struct connection *co = iochan_getdata(i);
125     struct client *cl = co->client;
126     Z_APDU *a = zget_APDU(global_parameters.odr_out, Z_APDU_initRequest);
127
128     a->u.initRequest->implementationId = global_parameters.implementationId;
129     a->u.initRequest->implementationName = global_parameters.implementationName;
130     a->u.initRequest->implementationVersion =
131         global_parameters.implementationVersion;
132     ODR_MASK_SET(a->u.initRequest->options, Z_Options_search);
133     ODR_MASK_SET(a->u.initRequest->options, Z_Options_present);
134     ODR_MASK_SET(a->u.initRequest->options, Z_Options_namedResultSets);
135
136     ODR_MASK_SET(a->u.initRequest->protocolVersion, Z_ProtocolVersion_1);
137     ODR_MASK_SET(a->u.initRequest->protocolVersion, Z_ProtocolVersion_2);
138     ODR_MASK_SET(a->u.initRequest->protocolVersion, Z_ProtocolVersion_3);
139
140
141     /* add virtual host if tunneling through Z39.50 proxy */
142     
143     if (0 < strlen(global_parameters.zproxy_override) 
144         && 0 < strlen(cl->database->url))
145         yaz_oi_set_string_oidval(&a->u.initRequest->otherInfo, 
146                                  global_parameters.odr_out, VAL_PROXY,
147                                  1, cl->database->url);
148     
149
150
151     if (send_apdu(cl, a) >= 0)
152     {
153         iochan_setflags(i, EVENT_INPUT);
154         cl->state = Client_Initializing;
155     }
156     else
157         cl->state = Client_Error;
158     odr_reset(global_parameters.odr_out);
159 }
160
161 static void send_search(IOCHAN i)
162 {
163     struct connection *co = iochan_getdata(i);
164     struct client *cl = co->client; 
165     struct session *se = cl->session;
166     struct database *db = cl->database;
167     Z_APDU *a = zget_APDU(global_parameters.odr_out, Z_APDU_searchRequest);
168     int ndb, cerror, cpos;
169     char **databaselist;
170     Z_Query *zquery;
171     struct ccl_rpn_node *cn;
172     int ssub = 0, lslb = 100000, mspn = 10;
173
174     yaz_log(YLOG_DEBUG, "Sending search");
175
176     cn = ccl_find_str(global_parameters.ccl_filter, se->query, &cerror, &cpos);
177     if (!cn)
178         return;
179     a->u.searchRequest->query = zquery = odr_malloc(global_parameters.odr_out,
180             sizeof(Z_Query));
181     zquery->which = Z_Query_type_1;
182     zquery->u.type_1 = ccl_rpn_query(global_parameters.odr_out, cn);
183     ccl_rpn_delete(cn);
184
185     for (ndb = 0; db->databases[ndb]; ndb++)
186         ;
187     databaselist = odr_malloc(global_parameters.odr_out, sizeof(char*) * ndb);
188     for (ndb = 0; db->databases[ndb]; ndb++)
189         databaselist[ndb] = db->databases[ndb];
190
191     a->u.searchRequest->preferredRecordSyntax =
192             yaz_oidval_to_z3950oid(global_parameters.odr_out,
193             CLASS_RECSYN, VAL_USMARC);
194     a->u.searchRequest->smallSetUpperBound = &ssub;
195     a->u.searchRequest->largeSetLowerBound = &lslb;
196     a->u.searchRequest->mediumSetPresentNumber = &mspn;
197     a->u.searchRequest->resultSetName = "Default";
198     a->u.searchRequest->databaseNames = databaselist;
199     a->u.searchRequest->num_databaseNames = ndb;
200
201     if (send_apdu(cl, a) >= 0)
202     {
203         iochan_setflags(i, EVENT_INPUT);
204         cl->state = Client_Searching;
205         cl->requestid = se->requestid;
206     }
207     else
208         cl->state = Client_Error;
209
210     odr_reset(global_parameters.odr_out);
211 }
212
213 static void send_present(IOCHAN i)
214 {
215     struct connection *co = iochan_getdata(i);
216     struct client *cl = co->client; 
217     Z_APDU *a = zget_APDU(global_parameters.odr_out, Z_APDU_presentRequest);
218     int toget;
219     int start = cl->records + 1;
220
221     toget = global_parameters.chunk;
222     if (toget > global_parameters.toget - cl->records)
223         toget = global_parameters.toget - cl->records;
224     if (toget > cl->hits - cl->records)
225         toget = cl->hits - cl->records;
226
227     yaz_log(YLOG_DEBUG, "Trying to present %d records\n", toget);
228
229     a->u.presentRequest->resultSetStartPoint = &start;
230     a->u.presentRequest->numberOfRecordsRequested = &toget;
231
232     a->u.presentRequest->resultSetId = "Default";
233
234     a->u.presentRequest->preferredRecordSyntax =
235             yaz_oidval_to_z3950oid(global_parameters.odr_out,
236             CLASS_RECSYN, VAL_USMARC);
237
238     if (send_apdu(cl, a) >= 0)
239     {
240         iochan_setflags(i, EVENT_INPUT);
241         cl->state = Client_Presenting;
242     }
243     else
244         cl->state = Client_Error;
245     odr_reset(global_parameters.odr_out);
246 }
247
248 static void do_initResponse(IOCHAN i, Z_APDU *a)
249 {
250     struct connection *co = iochan_getdata(i);
251     struct client *cl = co->client;
252     Z_InitResponse *r = a->u.initResponse;
253
254     yaz_log(YLOG_DEBUG, "Received init response");
255
256     if (*r->result)
257     {
258         cl->state = Client_Idle;
259     }
260     else
261         cl->state = Client_Failed; // FIXME need to do something to the connection
262 }
263
264 static void do_searchResponse(IOCHAN i, Z_APDU *a)
265 {
266     struct connection *co = iochan_getdata(i);
267     struct client *cl = co->client;
268     struct session *se = cl->session;
269     Z_SearchResponse *r = a->u.searchResponse;
270
271     yaz_log(YLOG_DEBUG, "Searchresponse (status=%d)", *r->searchStatus);
272
273     if (*r->searchStatus)
274     {
275         cl->hits = *r->resultCount;
276         se->total_hits += cl->hits;
277         if (r->presentStatus && !*r->presentStatus && r->records)
278         {
279             yaz_log(YLOG_DEBUG, "Records in search response");
280             ingest_records(cl, r->records);
281         }
282         cl->state = Client_Idle;
283     }
284     else
285     {          /*"FAILED"*/
286         cl->hits = 0;
287         cl->state = Client_Error;
288         if (r->records) {
289             Z_Records *recs = r->records;
290             if (recs->which == Z_Records_NSD)
291             {
292                 yaz_log(YLOG_WARN, "Non-surrogate diagnostic");
293                 cl->diagnostic = *recs->u.nonSurrogateDiagnostic->condition;
294                 cl->state = Client_Error;
295             }
296         }
297     }
298 }
299
300 char *normalize_mergekey(char *buf, int skiparticle)
301 {
302     char *p = buf, *pout = buf;
303
304     if (skiparticle)
305     {
306         char firstword[64];
307         char articles[] = "the den der die des an a "; // must end in space
308
309         while (*p && !isalnum(*p))
310             p++;
311         pout = firstword;
312         while (*p && *p != ' ' && pout - firstword < 62)
313             *(pout++) = tolower(*(p++));
314         *(pout++) = ' ';
315         *(pout++) = '\0';
316         if (!strstr(articles, firstword))
317             p = buf;
318         pout = buf;
319     }
320
321     while (*p)
322     {
323         while (*p && !isalnum(*p))
324             p++;
325         while (isalnum(*p))
326             *(pout++) = tolower(*(p++));
327         if (*p)
328             *(pout++) = ' ';
329         while (*p && !isalnum(*p))
330             p++;
331     }
332     if (buf != pout)
333         do {
334             *(pout--) = '\0';
335         }
336         while (pout > buf && *pout == ' ');
337
338     return buf;
339 }
340
341
342 #ifdef GAGA
343 // FIXME needs to be generalized. Should flexibly generate X lists per search
344 static void extract_subject(struct session *s, const char *rec)
345 {
346     const char *field, *subfield;
347
348     while ((field = find_field(rec, "650")))
349     {
350         rec = field; 
351         if ((subfield = find_subfield(field, 'a')))
352         {
353             char *e, *ef;
354             char buf[1024];
355             int len;
356
357             ef = index(subfield, '\n');
358             if (!ef)
359                 return;
360             if ((e = index(subfield, '\t')) && e < ef)
361                 ef = e;
362             while (ef > subfield && !isalpha(*(ef - 1)) && *(ef - 1) != ')')
363                 ef--;
364             len = ef - subfield;
365             assert(len < 1023);
366             memcpy(buf, subfield, len);
367             buf[len] = '\0';
368 #ifdef FIXME
369             if (*buf)
370                 termlist_insert(s->termlist, buf);
371 #endif
372         }
373     }
374 }
375 #endif
376
377 static void add_facet(struct session *s, const char *type, const char *value)
378 {
379     int i;
380
381     if (!*value)
382         return;
383     for (i = 0; i < s->num_termlists; i++)
384         if (!strcmp(s->termlists[i].name, type))
385             break;
386     if (i == s->num_termlists)
387     {
388         if (i == SESSION_MAX_TERMLISTS)
389         {
390             yaz_log(YLOG_FATAL, "Too many termlists");
391             exit(1);
392         }
393         s->termlists[i].name = nmem_strdup(s->nmem, type);
394         s->termlists[i].termlist = termlist_create(s->nmem, s->expected_maxrecs, 15);
395         s->num_termlists = i + 1;
396     }
397     termlist_insert(s->termlists[i].termlist, value);
398 }
399
400 static xmlDoc *normalize_record(struct client *cl, Z_External *rec)
401 {
402     struct conf_retrievalprofile *rprofile = cl->database->rprofile;
403     struct conf_retrievalmap *m;
404     xmlNode *res;
405     xmlDoc *rdoc;
406
407     // First normalize to XML
408     if (rprofile->native_syntax == Nativesyn_iso2709)
409     {
410         char *buf;
411         int len;
412         if (rec->which != Z_External_octet)
413         {
414             yaz_log(YLOG_WARN, "Unexpected external branch, probably BER");
415             return 0;
416         }
417         buf = (char*) rec->u.octet_aligned->buf;
418         len = rec->u.octet_aligned->len;
419         if (yaz_marc_read_iso2709(rprofile->yaz_marc, buf, len) < 0)
420         {
421             yaz_log(YLOG_WARN, "Failed to decode MARC");
422             return 0;
423         }
424         if (yaz_marc_write_xml(rprofile->yaz_marc, &res,
425                     "http://www.loc.gov/MARC21/slim", 0, 0) < 0)
426         {
427             yaz_log(YLOG_WARN, "Failed to encode as XML");
428             return 0;
429         }
430         rdoc = xmlNewDoc("1.0");
431         xmlDocSetRootElement(rdoc, res);
432     }
433     else
434     {
435         yaz_log(YLOG_FATAL, "Unknown native_syntax in normalize_record");
436         exit(1);
437     }
438
439     if (global_parameters.dump_records)
440     {
441         fprintf(stderr, "Input Record (normalized):\n----------------\n");
442 #if LIBXML_VERSION >= 20600
443         xmlDocFormatDump(stderr, rdoc, 1);
444 #else
445         xmlDocDump(stderr, rdoc);
446 #endif
447     }
448
449     for (m = rprofile->maplist; m; m = m->next)
450     {
451         xmlDoc *new;
452         if (m->type != Map_xslt)
453         {
454             yaz_log(YLOG_WARN, "Unknown map type");
455             return 0;
456         }
457         if (!(new = xsltApplyStylesheet(m->stylesheet, rdoc, 0)))
458         {
459             yaz_log(YLOG_WARN, "XSLT transformation failed");
460             return 0;
461         }
462         xmlFreeDoc(rdoc);
463         rdoc = new;
464     }
465     if (global_parameters.dump_records)
466     {
467         fprintf(stderr, "Record:\n----------------\n");
468 #if LIBXML_VERSION >= 20600
469         xmlDocFormatDump(stderr, rdoc, 1);
470 #else
471         xmlDocDump(stderr, rdoc);
472 #endif
473     }
474     return rdoc;
475 }
476
477 // Extract what appears to be years from buf, storing highest and
478 // lowest values.
479 static int extract_years(const char *buf, int *first, int *last)
480 {
481     *first = -1;
482     *last = -1;
483     while (*buf)
484     {
485         const char *e;
486         int len;
487
488         while (*buf && !isdigit(*buf))
489             buf++;
490         len = 0;
491         for (e = buf; *e && isdigit(*e); e++)
492             len++;
493         if (len == 4)
494         {
495             int value = atoi(buf);
496             if (*first < 0 || value < *first)
497                 *first = value;
498             if (*last < 0 || value > *last)
499                 *last = value;
500         }
501         buf = e;
502     }
503     return *first;
504 }
505
506 static struct record *ingest_record(struct client *cl, Z_External *rec)
507 {
508     xmlDoc *xdoc = normalize_record(cl, rec);
509     xmlNode *root, *n;
510     struct record *res;
511     struct record_cluster *cluster;
512     struct session *se = cl->session;
513     xmlChar *mergekey, *mergekey_norm;
514     xmlChar *type = 0;
515     xmlChar *value = 0;
516     struct conf_service *service = global_parameters.server->service;
517
518     if (!xdoc)
519         return 0;
520
521     root = xmlDocGetRootElement(xdoc);
522     if (!(mergekey = xmlGetProp(root, "mergekey")))
523     {
524         yaz_log(YLOG_WARN, "No mergekey found in record");
525         xmlFreeDoc(xdoc);
526         return 0;
527     }
528
529     res = nmem_malloc(se->nmem, sizeof(struct record));
530     res->next = 0;
531     res->client = cl;
532     res->metadata = nmem_malloc(se->nmem,
533             sizeof(struct record_metadata*) * service->num_metadata);
534     memset(res->metadata, 0, sizeof(struct record_metadata*) * service->num_metadata);
535
536     mergekey_norm = nmem_strdup(se->nmem, (char*) mergekey);
537     xmlFree(mergekey);
538     normalize_mergekey(mergekey_norm, 0);
539
540     cluster = reclist_insert(se->reclist, res, mergekey_norm, &se->total_merged);
541     if (global_parameters.dump_records)
542         yaz_log(YLOG_LOG, "Cluster id %d from %s (#%d)", cluster->recid,
543                 cl->database->url, cl->records);
544     if (!cluster)
545     {
546         /* no room for record */
547         xmlFreeDoc(xdoc);
548         return 0;
549     }
550     relevance_newrec(se->relevance, cluster);
551
552     for (n = root->children; n; n = n->next)
553     {
554         if (type)
555             xmlFree(type);
556         if (value)
557             xmlFree(value);
558         type = value = 0;
559
560         if (n->type != XML_ELEMENT_NODE)
561             continue;
562         if (!strcmp(n->name, "metadata"))
563         {
564             struct conf_metadata *md = 0;
565             struct conf_sortkey *sk = 0;
566             struct record_metadata **wheretoput, *newm;
567             int imeta;
568             int first, last;
569
570             type = xmlGetProp(n, "type");
571             value = xmlNodeListGetString(xdoc, n->children, 0);
572
573             if (!type || !value)
574                 continue;
575
576             // First, find out what field we're looking at
577             for (imeta = 0; imeta < service->num_metadata; imeta++)
578                 if (!strcmp(type, service->metadata[imeta].name))
579                 {
580                     md = &service->metadata[imeta];
581                     if (md->sortkey_offset >= 0)
582                         sk = &service->sortkeys[md->sortkey_offset];
583                     break;
584                 }
585             if (!md)
586             {
587                 yaz_log(YLOG_WARN, "Ignoring unknown metadata element: %s", type);
588                 continue;
589             }
590
591             // Find out where we are putting it
592             if (md->merge == Metadata_merge_no)
593                 wheretoput = &res->metadata[imeta];
594             else
595                 wheretoput = &cluster->metadata[imeta];
596             
597             // Put it there
598             newm = nmem_malloc(se->nmem, sizeof(struct record_metadata));
599             newm->next = 0;
600             if (md->type == Metadata_type_generic)
601             {
602                 char *p, *pe;
603                 for (p = value; *p && isspace(*p); p++)
604                     ;
605                 for (pe = p + strlen(p) - 1;
606                         pe > p && strchr(" ,/.:([", *pe); pe--)
607                     *pe = '\0';
608                 newm->data.text = nmem_strdup(se->nmem, p);
609
610             }
611             else if (md->type == Metadata_type_year)
612             {
613                 if (extract_years(value, &first, &last) < 0)
614                     continue;
615             }
616             else
617             {
618                 yaz_log(YLOG_WARN, "Unknown type in metadata element %s", type);
619                 continue;
620             }
621             if (md->type == Metadata_type_year && md->merge != Metadata_merge_range)
622             {
623                 yaz_log(YLOG_WARN, "Only range merging supported for years");
624                 continue;
625             }
626             if (md->merge == Metadata_merge_unique)
627             {
628                 struct record_metadata *mnode;
629                 for (mnode = *wheretoput; mnode; mnode = mnode->next)
630                     if (!strcmp(mnode->data.text, newm->data.text))
631                         break;
632                 if (!mnode)
633                 {
634                     newm->next = *wheretoput;
635                     *wheretoput = newm;
636                 }
637             }
638             else if (md->merge == Metadata_merge_longest)
639             {
640                 if (!*wheretoput ||
641                         strlen(newm->data.text) > strlen((*wheretoput)->data.text))
642                 {
643                     *wheretoput = newm;
644                     if (sk)
645                     {
646                         char *s = nmem_strdup(se->nmem, newm->data.text);
647                         if (!cluster->sortkeys[md->sortkey_offset])
648                             cluster->sortkeys[md->sortkey_offset] = 
649                                 nmem_malloc(se->nmem, sizeof(union data_types));
650                         normalize_mergekey(s,
651                                 (sk->type == Metadata_sortkey_skiparticle));
652                         cluster->sortkeys[md->sortkey_offset]->text = s;
653                     }
654                 }
655             }
656             else if (md->merge == Metadata_merge_all || md->merge == Metadata_merge_no)
657             {
658                 newm->next = *wheretoput;
659                 *wheretoput = newm;
660             }
661             else if (md->merge == Metadata_merge_range)
662             {
663                 assert(md->type == Metadata_type_year);
664                 if (!*wheretoput)
665                 {
666                     *wheretoput = newm;
667                     (*wheretoput)->data.number.min = first;
668                     (*wheretoput)->data.number.max = last;
669                     if (sk)
670                         cluster->sortkeys[md->sortkey_offset] = &newm->data;
671                 }
672                 else
673                 {
674                     if (first < (*wheretoput)->data.number.min)
675                         (*wheretoput)->data.number.min = first;
676                     if (last > (*wheretoput)->data.number.max)
677                         (*wheretoput)->data.number.max = last;
678                 }
679 #ifdef GAGA
680                 if (sk)
681                 {
682                     union data_types *sdata = cluster->sortkeys[md->sortkey_offset];
683                     yaz_log(YLOG_LOG, "SK range: %d-%d", sdata->number.min, sdata->number.max);
684                 }
685 #endif
686             }
687             else
688                 yaz_log(YLOG_WARN, "Don't know how to merge on element name %s", md->name);
689
690             if (md->rank)
691                 relevance_countwords(se->relevance, cluster, value, md->rank);
692             if (md->termlist)
693             {
694                 if (md->type == Metadata_type_year)
695                 {
696                     char year[64];
697                     sprintf(year, "%d", last);
698                     add_facet(se, type, year);
699                     if (first != last)
700                     {
701                         sprintf(year, "%d", first);
702                         add_facet(se, type, year);
703                     }
704                 }
705                 else
706                     add_facet(se, type, value);
707             }
708             xmlFree(type);
709             xmlFree(value);
710             type = value = 0;
711         }
712         else
713             yaz_log(YLOG_WARN, "Unexpected element %s in internal record", n->name);
714     }
715     if (type)
716         xmlFree(type);
717     if (value)
718         xmlFree(value);
719
720     xmlFreeDoc(xdoc);
721
722     relevance_donerecord(se->relevance, cluster);
723     se->total_records++;
724
725     return res;
726 }
727
728 static void ingest_records(struct client *cl, Z_Records *r)
729 {
730 #if USE_TIMING
731     yaz_timing_t t = yaz_timing_create();
732 #endif
733     struct record *rec;
734     struct session *s = cl->session;
735     Z_NamePlusRecordList *rlist;
736     int i;
737
738     if (r->which != Z_Records_DBOSD)
739         return;
740     rlist = r->u.databaseOrSurDiagnostics;
741     for (i = 0; i < rlist->num_records; i++)
742     {
743         Z_NamePlusRecord *npr = rlist->records[i];
744
745         cl->records++;
746         if (npr->which != Z_NamePlusRecord_databaseRecord)
747         {
748             yaz_log(YLOG_WARN, "Unexpected record type, probably diagnostic");
749             continue;
750         }
751
752         rec = ingest_record(cl, npr->u.databaseRecord);
753         if (!rec)
754             continue;
755     }
756     if (s->watchlist[SESSION_WATCH_RECORDS].fun && rlist->num_records)
757         session_alert_watch(s, SESSION_WATCH_RECORDS);
758
759 #if USE_TIMING
760     yaz_timing_stop(t);
761     yaz_log(YLOG_LOG, "ingest_records %6.5f %3.2f %3.2f", 
762             yaz_timing_get_real(t), yaz_timing_get_user(t),
763             yaz_timing_get_sys(t));
764     yaz_timing_destroy(&t);
765 #endif
766 }
767
768 static void do_presentResponse(IOCHAN i, Z_APDU *a)
769 {
770     struct connection *co = iochan_getdata(i);
771     struct client *cl = co->client;
772     Z_PresentResponse *r = a->u.presentResponse;
773
774     if (r->records) {
775         Z_Records *recs = r->records;
776         if (recs->which == Z_Records_NSD)
777         {
778             yaz_log(YLOG_WARN, "Non-surrogate diagnostic");
779             cl->diagnostic = *recs->u.nonSurrogateDiagnostic->condition;
780             cl->state = Client_Error;
781         }
782     }
783
784     if (!*r->presentStatus && cl->state != Client_Error)
785     {
786         yaz_log(YLOG_DEBUG, "Good Present response");
787         ingest_records(cl, r->records);
788         cl->state = Client_Idle;
789     }
790     else if (*r->presentStatus) 
791     {
792         yaz_log(YLOG_WARN, "Bad Present response");
793         cl->state = Client_Error;
794     }
795 }
796
797 static void handler(IOCHAN i, int event)
798 {
799     struct connection *co = iochan_getdata(i);
800     struct client *cl = co->client;
801     struct session *se = 0;
802
803     if (cl)
804         se = cl->session;
805     else
806     {
807         yaz_log(YLOG_WARN, "Destroying orphan connection");
808         connection_destroy(co);
809         return;
810     }
811
812     if (co->state == Conn_Connecting && event & EVENT_OUTPUT)
813     {
814         int errcode;
815         socklen_t errlen = sizeof(errcode);
816
817         if (getsockopt(cs_fileno(co->link), SOL_SOCKET, SO_ERROR, &errcode,
818             &errlen) < 0 || errcode != 0)
819         {
820             client_fatal(cl);
821             return;
822         }
823         else
824         {
825             yaz_log(YLOG_DEBUG, "Connect OK");
826             co->state = Conn_Open;
827             if (cl)
828                 cl->state = Client_Connected;
829         }
830     }
831
832     else if (event & EVENT_INPUT)
833     {
834         int len = cs_get(co->link, &co->ibuf, &co->ibufsize);
835
836         if (len < 0)
837         {
838             yaz_log(YLOG_WARN|YLOG_ERRNO, "Error reading from Z server");
839             connection_destroy(co);
840             return;
841         }
842         else if (len == 0)
843         {
844             yaz_log(YLOG_WARN, "EOF reading from Z server");
845             connection_destroy(co);
846             return;
847         }
848         else if (len > 1) // We discard input if we have no connection
849         {
850             co->state = Conn_Open;
851
852             if (cl && (cl->requestid == se->requestid || cl->state == Client_Initializing))
853             {
854                 Z_APDU *a;
855
856                 odr_reset(global_parameters.odr_in);
857                 odr_setbuf(global_parameters.odr_in, co->ibuf, len, 0);
858                 if (!z_APDU(global_parameters.odr_in, &a, 0, 0))
859                 {
860                     client_fatal(cl);
861                     return;
862                 }
863                 switch (a->which)
864                 {
865                     case Z_APDU_initResponse:
866                         do_initResponse(i, a);
867                         break;
868                     case Z_APDU_searchResponse:
869                         do_searchResponse(i, a);
870                         break;
871                     case Z_APDU_presentResponse:
872                         do_presentResponse(i, a);
873                         break;
874                     default:
875                         yaz_log(YLOG_WARN, "Unexpected result from server");
876                         client_fatal(cl);
877                         return;
878                 }
879                 // We aren't expecting staggered output from target
880                 // if (cs_more(t->link))
881                 //    iochan_setevent(i, EVENT_INPUT);
882             }
883             else  // we throw away response and go to idle mode
884             {
885                 yaz_log(YLOG_DEBUG, "Ignoring result of expired operation");
886                 cl->state = Client_Idle;
887             }
888         }
889         /* if len==1 we do nothing but wait for more input */
890     }
891
892     if (cl->state == Client_Connected) {
893         send_init(i);
894     }
895
896     if (cl->state == Client_Idle)
897     {
898         if (cl->requestid != se->requestid && *se->query) {
899             send_search(i);
900         }
901         else if (cl->hits > 0 && cl->records < global_parameters.toget &&
902             cl->records < cl->hits) {
903             send_present(i);
904         }
905     }
906 }
907
908 // Disassociate connection from client
909 static void connection_release(struct connection *co)
910 {
911     struct client *cl = co->client;
912
913     yaz_log(YLOG_DEBUG, "Connection release %s", co->host->hostport);
914     if (!cl)
915         return;
916     cl->connection = 0;
917     co->client = 0;
918 }
919
920 // Close connection and recycle structure
921 static void connection_destroy(struct connection *co)
922 {
923     struct host *h = co->host;
924     cs_close(co->link);
925     iochan_destroy(co->iochan);
926
927     yaz_log(YLOG_DEBUG, "Connection destroy %s", co->host->hostport);
928     if (h->connections == co)
929         h->connections = co->next;
930     else
931     {
932         struct connection *pco;
933         for (pco = h->connections; pco && pco->next != co; pco = pco->next)
934             ;
935         if (pco)
936             pco->next = co->next;
937         else
938             abort();
939     }
940     if (co->client)
941     {
942         if (co->client->state != Client_Idle)
943             co->client->state = Client_Disconnected;
944         co->client->connection = 0;
945     }
946     co->next = connection_freelist;
947     connection_freelist = co;
948 }
949
950 // Creates a new connection for client, associated with the host of 
951 // client's database
952 static struct connection *connection_create(struct client *cl)
953 {
954     struct connection *new;
955     COMSTACK link; 
956     int res;
957     void *addr;
958
959
960     if (!(link = cs_create(tcpip_type, 0, PROTO_Z3950)))
961         {
962             yaz_log(YLOG_FATAL|YLOG_ERRNO, "Failed to create comstack");
963             exit(1);
964         }
965     
966     if (0 == strlen(global_parameters.zproxy_override)){
967         /* no Z39.50 proxy needed - direct connect */
968         yaz_log(YLOG_DEBUG, "Connection create %s", cl->database->url);
969         
970         if (!(addr = cs_straddr(link, cl->database->host->ipport)))
971             {
972                 yaz_log(YLOG_WARN|YLOG_ERRNO, 
973                         "Lookup of IP address %s failed", 
974                         cl->database->host->ipport);
975                 return 0;
976             }
977     
978     } else {
979         /* Z39.50 proxy connect */
980         yaz_log(YLOG_DEBUG, "Connection create %s proxy %s", 
981                 cl->database->url, global_parameters.zproxy_override);
982
983         yaz_log(YLOG_LOG, "Connection cs_create_host %s proxy %s", 
984                 cl->database->url, global_parameters.zproxy_override);
985         
986         if (!(addr = cs_straddr(link, global_parameters.zproxy_override)))
987             {
988                 yaz_log(YLOG_WARN|YLOG_ERRNO, 
989                         "Lookup of IP address %s failed", 
990                         global_parameters.zproxy_override);
991                 return 0;
992             }
993     }
994
995     res = cs_connect(link, addr);
996     if (res < 0)
997     {
998         yaz_log(YLOG_WARN|YLOG_ERRNO, "cs_connect %s", cl->database->url);
999         return 0;
1000     }
1001
1002     if ((new = connection_freelist))
1003         connection_freelist = new->next;
1004     else
1005     {
1006         new = xmalloc(sizeof (struct connection));
1007         new->ibuf = 0;
1008         new->ibufsize = 0;
1009     }
1010     new->state = Conn_Connecting;
1011     new->host = cl->database->host;
1012     new->next = new->host->connections;
1013     new->host->connections = new;
1014     new->client = cl;
1015     cl->connection = new;
1016     new->link = link;
1017
1018     new->iochan = iochan_create(cs_fileno(link), handler, 0);
1019     iochan_setdata(new->iochan, new);
1020     new->iochan->next = channel_list;
1021     channel_list = new->iochan;
1022     return new;
1023 }
1024
1025 // Close connection and set state to error
1026 static void client_fatal(struct client *cl)
1027 {
1028     yaz_log(YLOG_WARN, "Fatal error from %s", cl->database->url);
1029     connection_destroy(cl->connection);
1030     cl->state = Client_Error;
1031 }
1032
1033 // Ensure that client has a connection associated
1034 static int client_prep_connection(struct client *cl)
1035 {
1036     struct connection *co;
1037     struct session *se = cl->session;
1038     struct host *host = cl->database->host;
1039
1040     co = cl->connection;
1041
1042     yaz_log(YLOG_DEBUG, "Client prep %s", cl->database->url);
1043
1044     if (!co)
1045     {
1046         // See if someone else has an idle connection
1047         // We should look at timestamps here to select the longest-idle connection
1048         for (co = host->connections; co; co = co->next)
1049             if (co->state == Conn_Open && (!co->client || co->client->session != se))
1050                 break;
1051         if (co)
1052         {
1053             connection_release(co);
1054             cl->connection = co;
1055             co->client = cl;
1056         }
1057         else
1058             co = connection_create(cl);
1059     }
1060     if (co)
1061     {
1062         if (co->state == Conn_Connecting)
1063         {
1064             cl->state = Client_Connecting;
1065             iochan_setflag(co->iochan, EVENT_OUTPUT);
1066         }
1067         else if (co->state == Conn_Open)
1068         {
1069             if (cl->state == Client_Error || cl->state == Client_Disconnected)
1070                 cl->state = Client_Idle;
1071             iochan_setflag(co->iochan, EVENT_OUTPUT);
1072         }
1073         return 1;
1074     }
1075     else
1076         return 0;
1077 }
1078
1079 #ifdef GAGA // Moved to database.c
1080
1081 // This function will most likely vanish when a proper target profile mechanism is
1082 // introduced.
1083 void load_simpletargets(const char *fn)
1084 {
1085     FILE *f = fopen(fn, "r");
1086     char line[256];
1087
1088     if (!f)
1089     {
1090         yaz_log(YLOG_WARN|YLOG_ERRNO, "open %s", fn);
1091         exit(1);
1092     }
1093
1094     while (fgets(line, 255, f))
1095     {
1096         char *url, *db;
1097         char *name;
1098         struct host *host;
1099         struct database *database;
1100
1101         if (strncmp(line, "target ", 7))
1102             continue;
1103         line[strlen(line) - 1] = '\0';
1104
1105         if ((name = strchr(line, ';')))
1106             *(name++) = '\0';
1107
1108         url = line + 7;
1109         if ((db = strchr(url, '/')))
1110             *(db++) = '\0';
1111         else
1112             db = "Default";
1113
1114         yaz_log(YLOG_LOG, "Target: %s, '%s'", url, db);
1115         for (host = hosts; host; host = host->next)
1116             if (!strcmp(url, host->hostport))
1117                 break;
1118         if (!host)
1119         {
1120             struct addrinfo *addrinfo, hints;
1121             char *port;
1122             char ipport[128];
1123             unsigned char addrbuf[4];
1124             int res;
1125
1126             host = xmalloc(sizeof(struct host));
1127             host->hostport = xstrdup(url);
1128             host->connections = 0;
1129
1130             if ((port = strchr(url, ':')))
1131                 *(port++) = '\0';
1132             else
1133                 port = "210";
1134
1135             hints.ai_flags = 0;
1136             hints.ai_family = PF_INET;
1137             hints.ai_socktype = SOCK_STREAM;
1138             hints.ai_protocol = IPPROTO_TCP;
1139             hints.ai_addrlen = 0;
1140             hints.ai_addr = 0;
1141             hints.ai_canonname = 0;
1142             hints.ai_next = 0;
1143             // This is not robust code. It assumes that getaddrinfo returns AF_INET
1144             // address.
1145             if ((res = getaddrinfo(url, port, &hints, &addrinfo)))
1146             {
1147                 yaz_log(YLOG_WARN, "Failed to resolve %s: %s", url, gai_strerror(res));
1148                 xfree(host->hostport);
1149                 xfree(host);
1150                 continue;
1151             }
1152             assert(addrinfo->ai_family == PF_INET);
1153             memcpy(addrbuf, &((struct sockaddr_in*)addrinfo->ai_addr)->sin_addr.s_addr, 4);
1154             sprintf(ipport, "%u.%u.%u.%u:%s",
1155                     addrbuf[0], addrbuf[1], addrbuf[2], addrbuf[3], port);
1156             host->ipport = xstrdup(ipport);
1157             freeaddrinfo(addrinfo);
1158             host->next = hosts;
1159             hosts = host;
1160         }
1161         database = xmalloc(sizeof(struct database));
1162         database->host = host;
1163         database->url = xmalloc(strlen(url) + strlen(db) + 2);
1164         strcpy(database->url, url);
1165         strcat(database->url, "/");
1166         strcat(database->url, db);
1167         if (name)
1168             database->name = xstrdup(name);
1169         else
1170             database->name = 0;
1171         
1172         database->databases = xmalloc(2 * sizeof(char *));
1173         database->databases[0] = xstrdup(db);
1174         database->databases[1] = 0;
1175         database->errors = 0;
1176         database->qprofile = 0;
1177         database->rprofile = database_retrieval_profile(database);
1178         database->next = databases;
1179         databases = database;
1180
1181     }
1182     fclose(f);
1183 }
1184
1185 #endif
1186
1187 static void pull_terms(NMEM nmem, struct ccl_rpn_node *n, char **termlist, int *num)
1188 {
1189     switch (n->kind)
1190     {
1191         case CCL_RPN_AND:
1192         case CCL_RPN_OR:
1193         case CCL_RPN_NOT:
1194         case CCL_RPN_PROX:
1195             pull_terms(nmem, n->u.p[0], termlist, num);
1196             pull_terms(nmem, n->u.p[1], termlist, num);
1197             break;
1198         case CCL_RPN_TERM:
1199             termlist[(*num)++] = nmem_strdup(nmem, n->u.t.term);
1200             break;
1201         default: // NOOP
1202             break;
1203     }
1204 }
1205
1206 // Extract terms from query into null-terminated termlist
1207 static int extract_terms(NMEM nmem, char *query, char **termlist)
1208 {
1209     int error, pos;
1210     struct ccl_rpn_node *n;
1211     int num = 0;
1212
1213     n = ccl_find_str(global_parameters.ccl_filter, query, &error, &pos);
1214     if (!n)
1215         return -1;
1216     pull_terms(nmem, n, termlist, &num);
1217     termlist[num] = 0;
1218     ccl_rpn_delete(n);
1219     return 0;
1220 }
1221
1222 static struct client *client_create(void)
1223 {
1224     struct client *r;
1225     if (client_freelist)
1226     {
1227         r = client_freelist;
1228         client_freelist = client_freelist->next;
1229     }
1230     else
1231         r = xmalloc(sizeof(struct client));
1232     r->database = 0;
1233     r->connection = 0;
1234     r->session = 0;
1235     r->hits = 0;
1236     r->records = 0;
1237     r->setno = 0;
1238     r->requestid = -1;
1239     r->diagnostic = 0;
1240     r->state = Client_Disconnected;
1241     r->next = 0;
1242     return r;
1243 }
1244
1245 void client_destroy(struct client *c)
1246 {
1247     struct session *se = c->session;
1248     if (c == se->clients)
1249         se->clients = c->next;
1250     else
1251     {
1252         struct client *cc;
1253         for (cc = se->clients; cc && cc->next != c; cc = cc->next)
1254             ;
1255         if (cc)
1256             cc->next = c->next;
1257     }
1258     if (c->connection)
1259         connection_release(c->connection);
1260     c->next = client_freelist;
1261     client_freelist = c;
1262 }
1263
1264 void session_set_watch(struct session *s, int what, session_watchfun fun, void *data)
1265 {
1266     s->watchlist[what].fun = fun;
1267     s->watchlist[what].data = data;
1268 }
1269
1270 void session_alert_watch(struct session *s, int what)
1271 {
1272     if (!s->watchlist[what].fun)
1273         return;
1274     (*s->watchlist[what].fun)(s->watchlist[what].data);
1275     s->watchlist[what].fun = 0;
1276     s->watchlist[what].data = 0;
1277 }
1278
1279 //callback for grep_databases
1280 static void select_targets_callback(void *context, struct database *db)
1281 {
1282     struct session *se = (struct session*) context;
1283     struct client *cl = client_create();
1284     cl->database = db;
1285     cl->session = se;
1286     cl->next = se->clients;
1287     se->clients = cl;
1288 }
1289
1290 // This should be extended with parameters to control selection criteria
1291 // Associates a set of clients with a session;
1292 int select_targets(struct session *se, struct database_criterion *crit)
1293 {
1294     while (se->clients)
1295         client_destroy(se->clients);
1296
1297     return grep_databases(se, crit, select_targets_callback);
1298 }
1299
1300 int session_active_clients(struct session *s)
1301 {
1302     struct client *c;
1303     int res = 0;
1304
1305     for (c = s->clients; c; c = c->next)
1306         if (c->connection && (c->state == Client_Connecting ||
1307                     c->state == Client_Initializing ||
1308                     c->state == Client_Searching ||
1309                     c->state == Client_Presenting))
1310             res++;
1311
1312     return res;
1313 }
1314
1315 // parses crit1=val1,crit2=val2|val3,...
1316 static struct database_criterion *parse_filter(NMEM m, const char *buf)
1317 {
1318     struct database_criterion *res = 0;
1319     char **values;
1320     int num;
1321     int i;
1322
1323     if (!buf || !*buf)
1324         return 0;
1325     nmem_strsplit(m, ",", buf,  &values, &num);
1326     for (i = 0; i < num; i++)
1327     {
1328         char **subvalues;
1329         int subnum;
1330         int subi;
1331         struct database_criterion *new = nmem_malloc(m, sizeof(*new));
1332         char *eq = strchr(values[i], '=');
1333         if (!eq)
1334         {
1335             yaz_log(YLOG_WARN, "Missing equal-sign in filter");
1336             return 0;
1337         }
1338         *(eq++) = '\0';
1339         new->name = values[i];
1340         nmem_strsplit(m, "|", eq, &subvalues, &subnum);
1341         new->values = 0;
1342         for (subi = 0; subi < subnum; subi++)
1343         {
1344             struct database_criterion_value *newv = nmem_malloc(m, sizeof(*newv));
1345             newv->value = subvalues[subi];
1346             newv->next = new->values;
1347             new->values = newv;
1348         }
1349         new->next = res;
1350         res = new;
1351     }
1352     return res;
1353 }
1354
1355 char *search(struct session *se, char *query, char *filter)
1356 {
1357     int live_channels = 0;
1358     struct client *cl;
1359     struct database_criterion *criteria;
1360
1361     yaz_log(YLOG_DEBUG, "Search");
1362
1363     nmem_reset(se->nmem);
1364     criteria = parse_filter(se->nmem, filter);
1365     strcpy(se->query, query);
1366     se->requestid++;
1367     // Release any existing clients
1368     select_targets(se, criteria);
1369     for (cl = se->clients; cl; cl = cl->next)
1370     {
1371         if (client_prep_connection(cl))
1372             live_channels++;
1373     }
1374     if (live_channels)
1375     {
1376         char *p[512];
1377         int maxrecs = live_channels * global_parameters.toget;
1378         se->num_termlists = 0;
1379         se->reclist = reclist_create(se->nmem, maxrecs);
1380         extract_terms(se->nmem, query, p);
1381         se->relevance = relevance_create(se->nmem, (const char **) p, maxrecs);
1382         se->total_records = se->total_hits = se->total_merged = 0;
1383         se->expected_maxrecs = maxrecs;
1384     }
1385     else
1386         return "NOTARGETS";
1387
1388     return 0;
1389 }
1390
1391 void destroy_session(struct session *s)
1392 {
1393     yaz_log(YLOG_LOG, "Destroying session");
1394     while (s->clients)
1395         client_destroy(s->clients);
1396     nmem_destroy(s->nmem);
1397     wrbuf_destroy(s->wrbuf);
1398 }
1399
1400 struct session *new_session() 
1401 {
1402     int i;
1403     struct session *session = xmalloc(sizeof(*session));
1404
1405     yaz_log(YLOG_DEBUG, "New pazpar2 session");
1406     
1407     session->total_hits = 0;
1408     session->total_records = 0;
1409     session->num_termlists = 0;
1410     session->reclist = 0;
1411     session->requestid = -1;
1412     session->clients = 0;
1413     session->expected_maxrecs = 0;
1414     session->query[0] = '\0';
1415     session->nmem = nmem_create();
1416     session->wrbuf = wrbuf_alloc();
1417     for (i = 0; i <= SESSION_WATCH_MAX; i++)
1418     {
1419         session->watchlist[i].data = 0;
1420         session->watchlist[i].fun = 0;
1421     }
1422
1423     return session;
1424 }
1425
1426 struct hitsbytarget *hitsbytarget(struct session *se, int *count)
1427 {
1428     static struct hitsbytarget res[1000]; // FIXME MM
1429     struct client *cl;
1430
1431     *count = 0;
1432     for (cl = se->clients; cl; cl = cl->next)
1433     {
1434         res[*count].id = cl->database->url;
1435         res[*count].name = cl->database->name;
1436         res[*count].hits = cl->hits;
1437         res[*count].records = cl->records;
1438         res[*count].diagnostic = cl->diagnostic;
1439         res[*count].state = client_states[cl->state];
1440         res[*count].connected  = cl->connection ? 1 : 0;
1441         (*count)++;
1442     }
1443
1444     return res;
1445 }
1446
1447 struct termlist_score **termlist(struct session *s, const char *name, int *num)
1448 {
1449     int i;
1450
1451     for (i = 0; i < s->num_termlists; i++)
1452         if (!strcmp(s->termlists[i].name, name))
1453             return termlist_highscore(s->termlists[i].termlist, num);
1454     return 0;
1455 }
1456
1457 #ifdef MISSING_HEADERS
1458 void report_nmem_stats(void)
1459 {
1460     size_t in_use, is_free;
1461
1462     nmem_get_memory_in_use(&in_use);
1463     nmem_get_memory_free(&is_free);
1464
1465     yaz_log(YLOG_LOG, "nmem stat: use=%ld free=%ld", 
1466             (long) in_use, (long) is_free);
1467 }
1468 #endif
1469
1470 struct record_cluster *show_single(struct session *s, int id)
1471 {
1472     struct record_cluster *r;
1473
1474     reclist_rewind(s->reclist);
1475     while ((r = reclist_read_record(s->reclist)))
1476         if (r->recid == id)
1477             return r;
1478     return 0;
1479 }
1480
1481 struct record_cluster **show(struct session *s, struct reclist_sortparms *sp, int start,
1482         int *num, int *total, int *sumhits, NMEM nmem_show)
1483 {
1484     struct record_cluster **recs = nmem_malloc(nmem_show, *num 
1485                                        * sizeof(struct record_cluster *));
1486     struct reclist_sortparms *spp;
1487     int i;
1488 #if USE_TIMING    
1489     yaz_timing_t t = yaz_timing_create();
1490 #endif
1491
1492     for (spp = sp; spp; spp = spp->next)
1493         if (spp->type == Metadata_sortkey_relevance)
1494         {
1495             relevance_prepare_read(s->relevance, s->reclist);
1496             break;
1497         }
1498     reclist_sort(s->reclist, sp);
1499
1500     *total = s->reclist->num_records;
1501     *sumhits = s->total_hits;
1502
1503     for (i = 0; i < start; i++)
1504         if (!reclist_read_record(s->reclist))
1505         {
1506             *num = 0;
1507             recs = 0;
1508             break;
1509         }
1510
1511     for (i = 0; i < *num; i++)
1512     {
1513         struct record_cluster *r = reclist_read_record(s->reclist);
1514         if (!r)
1515         {
1516             *num = i;
1517             break;
1518         }
1519         recs[i] = r;
1520     }
1521 #if USE_TIMING
1522     yaz_timing_stop(t);
1523     yaz_log(YLOG_LOG, "show %6.5f %3.2f %3.2f", 
1524             yaz_timing_get_real(t), yaz_timing_get_user(t),
1525             yaz_timing_get_sys(t));
1526     yaz_timing_destroy(&t);
1527 #endif
1528     return recs;
1529 }
1530
1531 void statistics(struct session *se, struct statistics *stat)
1532 {
1533     struct client *cl;
1534     int count = 0;
1535
1536     memset(stat, 0, sizeof(*stat));
1537     for (cl = se->clients; cl; cl = cl->next)
1538     {
1539         if (!cl->connection)
1540             stat->num_no_connection++;
1541         switch (cl->state)
1542         {
1543             case Client_Connecting: stat->num_connecting++; break;
1544             case Client_Initializing: stat->num_initializing++; break;
1545             case Client_Searching: stat->num_searching++; break;
1546             case Client_Presenting: stat->num_presenting++; break;
1547             case Client_Idle: stat->num_idle++; break;
1548             case Client_Failed: stat->num_failed++; break;
1549             case Client_Error: stat->num_error++; break;
1550             default: break;
1551         }
1552         count++;
1553     }
1554     stat->num_hits = se->total_hits;
1555     stat->num_records = se->total_records;
1556
1557     stat->num_clients = count;
1558 }
1559
1560 static CCL_bibset load_cclfile(const char *fn)
1561 {
1562     CCL_bibset res = ccl_qual_mk();
1563     if (ccl_qual_fname(res, fn) < 0)
1564     {
1565         yaz_log(YLOG_FATAL|YLOG_ERRNO, "%s", fn);
1566         exit(1);
1567     }
1568     return res;
1569 }
1570
1571 static void start_http_listener(void)
1572 {
1573     char hp[128] = "";
1574     struct conf_server *ser = global_parameters.server;
1575
1576     if (*global_parameters.listener_override)
1577         strcpy(hp, global_parameters.listener_override);
1578     else
1579     {
1580         strcpy(hp, ser->host ? ser->host : "");
1581         if (ser->port)
1582         {
1583             if (*hp)
1584                 strcat(hp, ":");
1585             sprintf(hp + strlen(hp), "%d", ser->port);
1586         }
1587     }
1588     http_init(hp);
1589 }
1590
1591 static void start_proxy(void)
1592 {
1593     char hp[128] = "";
1594     struct conf_server *ser = global_parameters.server;
1595
1596     if (*global_parameters.proxy_override)
1597         strcpy(hp, global_parameters.proxy_override);
1598     else if (ser->proxy_host || ser->proxy_port)
1599     {
1600         strcpy(hp, ser->proxy_host ? ser->proxy_host : "");
1601         if (ser->proxy_port)
1602         {
1603             if (*hp)
1604                 strcat(hp, ":");
1605             sprintf(hp + strlen(hp), "%d", ser->proxy_port);
1606         }
1607     }
1608     else
1609         return;
1610
1611     http_set_proxyaddr(hp, ser->myurl ? ser->myurl : "");
1612 }
1613
1614 int main(int argc, char **argv)
1615 {
1616     int ret;
1617     char *arg;
1618
1619     if (signal(SIGPIPE, SIG_IGN) == SIG_ERR)
1620         yaz_log(YLOG_WARN|YLOG_ERRNO, "signal");
1621
1622     yaz_log_init(YLOG_DEFAULT_LEVEL, "pazpar2", 0);
1623
1624     while ((ret = options("f:x:h:p:z:C:s:d", argv, argc, &arg)) != -2)
1625     {
1626         switch (ret) {
1627             case 'f':
1628                 if (!read_config(arg))
1629                     exit(1);
1630                 break;
1631             case 'h':
1632                 strcpy(global_parameters.listener_override, arg);
1633                 break;
1634             case 'C':
1635                 global_parameters.ccl_filter = load_cclfile(arg);
1636                 break;
1637             case 'p':
1638                 strcpy(global_parameters.proxy_override, arg);
1639                 break;
1640             case 'z':
1641                 strcpy(global_parameters.zproxy_override, arg);
1642                 break;
1643             case 's':
1644                 load_simpletargets(arg);
1645                 break;
1646             case 'd':
1647                 global_parameters.dump_records = 1;
1648                 break;
1649             default:
1650                 fprintf(stderr, "Usage: pazpar2\n"
1651                         "    -f configfile\n"
1652                         "    -h [host:]port          (REST protocol listener)\n"
1653                         "    -C cclconfig\n"
1654                         "    -s simpletargetfile\n"
1655                         "    -p hostname[:portno]    (HTTP proxy)\n"
1656                         "    -z hostname[:portno]    (Z39.50 proxy)\n"
1657                         "    -d                      (show internal records)\n");
1658                 exit(1);
1659         }
1660     }
1661
1662     if (!config)
1663     {
1664         yaz_log(YLOG_FATAL, "Load config with -f");
1665         exit(1);
1666     }
1667     global_parameters.server = config->servers;
1668
1669     start_http_listener();
1670     start_proxy();
1671     if (!global_parameters.ccl_filter)
1672         global_parameters.ccl_filter = load_cclfile("../etc/default.bib");
1673     global_parameters.yaz_marc = yaz_marc_create();
1674     yaz_marc_subfield_str(global_parameters.yaz_marc, "\t");
1675     global_parameters.odr_in = odr_createmem(ODR_DECODE);
1676     global_parameters.odr_out = odr_createmem(ODR_ENCODE);
1677
1678     event_loop(&channel_list);
1679
1680     return 0;
1681 }
1682
1683 /*
1684  * Local variables:
1685  * c-basic-offset: 4
1686  * indent-tabs-mode: nil
1687  * End:
1688  * vim: shiftwidth=4 tabstop=8 expandtab
1689  */