Basic configuration functionality in place (not yet used)
[pazpar2-moved-to-github.git] / src / pazpar2.c
1 /* $Id: pazpar2.c,v 1.6 2006-12-27 21:11:10 quinn Exp $ */;
2
3 #include <stdlib.h>
4 #include <stdio.h>
5 #include <string.h>
6 #include <sys/time.h>
7 #include <unistd.h>
8 #include <sys/socket.h>
9 #include <netdb.h>
10 #include <signal.h>
11 #include <ctype.h>
12 #include <assert.h>
13
14 #include <yaz/comstack.h>
15 #include <yaz/tcpip.h>
16 #include <yaz/proto.h>
17 #include <yaz/readconf.h>
18 #include <yaz/pquery.h>
19 #include <yaz/yaz-util.h>
20 #include <yaz/nmem.h>
21
22 #include "pazpar2.h"
23 #include "eventl.h"
24 #include "command.h"
25 #include "http.h"
26 #include "termlists.h"
27 #include "reclists.h"
28 #include "relevance.h"
29 #include "config.h"
30
31 #define PAZPAR2_VERSION "0.1"
32 #define MAX_CHUNK 15
33
34 static void client_fatal(struct client *cl);
35 static void connection_destroy(struct connection *co);
36 static int client_prep_connection(struct client *cl);
37 static void ingest_records(struct client *cl, Z_Records *r);
38 void session_alert_watch(struct session *s, int what);
39
40 IOCHAN channel_list = 0;  // Master list of connections we're handling events to
41
42 static struct connection *connection_freelist = 0;
43 static struct client *client_freelist = 0;
44
45 static struct host *hosts = 0;  // The hosts we know about 
46 static struct database *databases = 0; // The databases we know about
47
48 static char *client_states[] = {
49     "Client_Connecting",
50     "Client_Connected",
51     "Client_Idle",
52     "Client_Initializing",
53     "Client_Searching",
54     "Client_Presenting",
55     "Client_Error",
56     "Client_Failed",
57     "Client_Disconnected",
58     "Client_Stopped"
59 };
60
61 struct parameters global_parameters = 
62 {
63     30,
64     "81",
65     "Index Data PazPar2 (MasterKey)",
66     PAZPAR2_VERSION,
67     600, // 10 minutes
68     60,
69     100,
70     MAX_CHUNK,
71     0,
72     0,
73     0,
74     0,
75     0
76 };
77
78
79 static int send_apdu(struct client *c, Z_APDU *a)
80 {
81     struct connection *co = c->connection;
82     char *buf;
83     int len, r;
84
85     if (!z_APDU(global_parameters.odr_out, &a, 0, 0))
86     {
87         odr_perror(global_parameters.odr_out, "Encoding APDU");
88         abort();
89     }
90     buf = odr_getbuf(global_parameters.odr_out, &len, 0);
91     r = cs_put(co->link, buf, len);
92     if (r < 0)
93     {
94         yaz_log(YLOG_WARN, "cs_put: %s", cs_errmsg(cs_errno(co->link)));
95         return -1;
96     }
97     else if (r == 1)
98     {
99         fprintf(stderr, "cs_put incomplete (ParaZ does not handle that)\n");
100         exit(1);
101     }
102     odr_reset(global_parameters.odr_out); /* release the APDU structure  */
103     co->state = Conn_Waiting;
104     return 0;
105 }
106
107
108 static void send_init(IOCHAN i)
109 {
110     struct connection *co = iochan_getdata(i);
111     struct client *cl = co->client;
112     Z_APDU *a = zget_APDU(global_parameters.odr_out, Z_APDU_initRequest);
113
114     a->u.initRequest->implementationId = global_parameters.implementationId;
115     a->u.initRequest->implementationName = global_parameters.implementationName;
116     a->u.initRequest->implementationVersion =
117         global_parameters.implementationVersion;
118     ODR_MASK_SET(a->u.initRequest->options, Z_Options_search);
119     ODR_MASK_SET(a->u.initRequest->options, Z_Options_present);
120     ODR_MASK_SET(a->u.initRequest->options, Z_Options_namedResultSets);
121
122     ODR_MASK_SET(a->u.initRequest->protocolVersion, Z_ProtocolVersion_1);
123     ODR_MASK_SET(a->u.initRequest->protocolVersion, Z_ProtocolVersion_2);
124     ODR_MASK_SET(a->u.initRequest->protocolVersion, Z_ProtocolVersion_3);
125     if (send_apdu(cl, a) >= 0)
126     {
127         iochan_setflags(i, EVENT_INPUT);
128         cl->state = Client_Initializing;
129     }
130     else
131         cl->state = Client_Error;
132     odr_reset(global_parameters.odr_out);
133 }
134
135 static void send_search(IOCHAN i)
136 {
137     struct connection *co = iochan_getdata(i);
138     struct client *cl = co->client; 
139     struct session *se = cl->session;
140     struct database *db = cl->database;
141     Z_APDU *a = zget_APDU(global_parameters.odr_out, Z_APDU_searchRequest);
142     int ndb, cerror, cpos;
143     char **databaselist;
144     Z_Query *zquery;
145     struct ccl_rpn_node *cn;
146     int ssub = 0, lslb = 100000, mspn = 10;
147
148     yaz_log(YLOG_DEBUG, "Sending search");
149
150     cn = ccl_find_str(global_parameters.ccl_filter, se->query, &cerror, &cpos);
151     if (!cn)
152         return;
153     a->u.searchRequest->query = zquery = odr_malloc(global_parameters.odr_out,
154             sizeof(Z_Query));
155     zquery->which = Z_Query_type_1;
156     zquery->u.type_1 = ccl_rpn_query(global_parameters.odr_out, cn);
157     ccl_rpn_delete(cn);
158
159     for (ndb = 0; db->databases[ndb]; ndb++)
160         ;
161     databaselist = odr_malloc(global_parameters.odr_out, sizeof(char*) * ndb);
162     for (ndb = 0; db->databases[ndb]; ndb++)
163         databaselist[ndb] = db->databases[ndb];
164
165     a->u.presentRequest->preferredRecordSyntax =
166             yaz_oidval_to_z3950oid(global_parameters.odr_out,
167             CLASS_RECSYN, VAL_USMARC);
168     a->u.searchRequest->smallSetUpperBound = &ssub;
169     a->u.searchRequest->largeSetLowerBound = &lslb;
170     a->u.searchRequest->mediumSetPresentNumber = &mspn;
171     a->u.searchRequest->resultSetName = "Default";
172     a->u.searchRequest->databaseNames = databaselist;
173     a->u.searchRequest->num_databaseNames = ndb;
174
175     if (send_apdu(cl, a) >= 0)
176     {
177         iochan_setflags(i, EVENT_INPUT);
178         cl->state = Client_Searching;
179         cl->requestid = se->requestid;
180     }
181     else
182         cl->state = Client_Error;
183
184     odr_reset(global_parameters.odr_out);
185 }
186
187 static void send_present(IOCHAN i)
188 {
189     struct connection *co = iochan_getdata(i);
190     struct client *cl = co->client; 
191     Z_APDU *a = zget_APDU(global_parameters.odr_out, Z_APDU_presentRequest);
192     int toget;
193     int start = cl->records + 1;
194
195     toget = global_parameters.chunk;
196     if (toget > cl->hits - cl->records)
197         toget = cl->hits - cl->records;
198
199     yaz_log(YLOG_DEBUG, "Trying to present %d records\n", toget);
200
201     a->u.presentRequest->resultSetStartPoint = &start;
202     a->u.presentRequest->numberOfRecordsRequested = &toget;
203
204     a->u.presentRequest->resultSetId = "Default";
205
206     a->u.presentRequest->preferredRecordSyntax =
207             yaz_oidval_to_z3950oid(global_parameters.odr_out,
208             CLASS_RECSYN, VAL_USMARC);
209
210     if (send_apdu(cl, a) >= 0)
211     {
212         iochan_setflags(i, EVENT_INPUT);
213         cl->state = Client_Presenting;
214     }
215     else
216         cl->state = Client_Error;
217     odr_reset(global_parameters.odr_out);
218 }
219
220 static void do_initResponse(IOCHAN i, Z_APDU *a)
221 {
222     struct connection *co = iochan_getdata(i);
223     struct client *cl = co->client;
224     Z_InitResponse *r = a->u.initResponse;
225
226     yaz_log(YLOG_DEBUG, "Received init response");
227
228     if (*r->result)
229     {
230         cl->state = Client_Idle;
231     }
232     else
233         cl->state = Client_Failed; // FIXME need to do something to the connection
234 }
235
236 static void do_searchResponse(IOCHAN i, Z_APDU *a)
237 {
238     struct connection *co = iochan_getdata(i);
239     struct client *cl = co->client;
240     struct session *se = cl->session;
241     Z_SearchResponse *r = a->u.searchResponse;
242
243     yaz_log(YLOG_DEBUG, "Searchresponse (status=%d)", *r->searchStatus);
244
245     if (*r->searchStatus)
246     {
247         cl->hits = *r->resultCount;
248         se->total_hits += cl->hits;
249         if (r->presentStatus && !*r->presentStatus && r->records)
250         {
251             yaz_log(YLOG_DEBUG, "Records in search response");
252             cl->records += *r->numberOfRecordsReturned;
253             ingest_records(cl, r->records);
254         }
255         cl->state = Client_Idle;
256     }
257     else
258     {          /*"FAILED"*/
259         cl->hits = 0;
260         cl->state = Client_Error;
261         if (r->records) {
262             Z_Records *recs = r->records;
263             if (recs->which == Z_Records_NSD)
264             {
265                 yaz_log(YLOG_WARN, "Non-surrogate diagnostic");
266                 cl->diagnostic = *recs->u.nonSurrogateDiagnostic->condition;
267                 cl->state = Client_Error;
268             }
269         }
270     }
271 }
272
273 const char *find_field(const char *rec, const char *field)
274 {
275     char lbuf[5];
276     char *line;
277
278     lbuf[0] = '\n';
279     strcpy(lbuf + 1, field);
280
281     if ((line = strstr(rec, lbuf)))
282         return ++line;
283     else
284         return 0;
285 }
286
287 const char *find_subfield(const char *field, char subfield)
288 {
289     const char *p = field;
290
291     while (*p && *p != '\n')
292     {
293         while (*p != '\n' && *p != '\t')
294             p++;
295         if (*p == '\t' && *(++p) == subfield) {
296             if (*(++p) == ' ')
297             {
298                 while (isspace(*p))
299                     p++;
300                 return p;
301             }
302         }
303     }
304     return 0;
305 }
306
307 // Extract 245 $a $b 100 $a
308 char *extract_title(struct session *s, const char *rec)
309 {
310     const char *field, *subfield;
311     char *e, *ef;
312     unsigned char *obuf, *p;
313
314     wrbuf_rewind(s->wrbuf);
315
316     if (!(field = find_field(rec, "245")))
317         return 0;
318     if (!(subfield = find_subfield(field, 'a')))
319         return 0;
320     ef = index(subfield, '\n');
321     if ((e = index(subfield, '\t')) && e < ef)
322         ef = e;
323     if (ef)
324     {
325         wrbuf_write(s->wrbuf, subfield, ef - subfield);
326         if ((subfield = find_subfield(field, 'b'))) 
327         {
328             ef = index(subfield, '\n');
329             if ((e = index(subfield, '\t')) && e < ef)
330                 ef = e;
331             if (ef)
332             {
333                 wrbuf_putc(s->wrbuf, ' ');
334                 wrbuf_write(s->wrbuf, subfield, ef - subfield);
335             }
336         }
337     }
338     if ((field = find_field(rec, "100")))
339     {
340         if ((subfield = find_subfield(field, 'a')))
341         {
342             ef = index(subfield, '\n');
343             if ((e = index(subfield, '\t')) && e < ef)
344                 ef = e;
345             if (ef)
346             {
347                 wrbuf_puts(s->wrbuf, ", by ");
348                 wrbuf_write(s->wrbuf, subfield, ef - subfield);
349             }
350         }
351     }
352     wrbuf_putc(s->wrbuf, '\0');
353     obuf = (unsigned char*) nmem_strdup(s->nmem, wrbuf_buf(s->wrbuf));
354     for (p = obuf; *p; p++)
355         if (*p == '&' || *p == '<' || *p > 122 || *p < ' ')
356             *p = ' ';
357     return (char*) obuf;
358 }
359
360 // Extract 245 $a $b 100 $a
361 char *extract_mergekey(struct session *s, const char *rec)
362 {
363     const char *field, *subfield;
364     char *e, *ef;
365     char *out, *p, *pout;
366
367     wrbuf_rewind(s->wrbuf);
368
369     if (!(field = find_field(rec, "245")))
370         return 0;
371     if (!(subfield = find_subfield(field, 'a')))
372         return 0;
373     ef = index(subfield, '\n');
374     if ((e = index(subfield, '\t')) && e < ef)
375         ef = e;
376     if (ef)
377     {
378         wrbuf_write(s->wrbuf, subfield, ef - subfield);
379         if ((subfield = find_subfield(field, 'b'))) 
380         {
381             ef = index(subfield, '\n');
382             if ((e = index(subfield, '\t')) && e < ef)
383                 ef = e;
384             if (ef)
385             {
386                 wrbuf_puts(s->wrbuf, " field "); 
387                 wrbuf_write(s->wrbuf, subfield, ef - subfield);
388             }
389         }
390     }
391     if ((field = find_field(rec, "100")))
392     {
393         if ((subfield = find_subfield(field, 'a')))
394         {
395             ef = index(subfield, '\n');
396             if ((e = index(subfield, '\t')) && e < ef)
397                 ef = e;
398             if (ef)
399             {
400                 wrbuf_puts(s->wrbuf, " field "); 
401                 wrbuf_write(s->wrbuf, subfield, ef - subfield);
402             }
403         }
404     }
405     wrbuf_putc(s->wrbuf, '\0');
406     p = wrbuf_buf(s->wrbuf);
407     out = pout = nmem_malloc(s->nmem, strlen(p) + 1);
408
409     while (*p)
410     {
411         while (isalnum(*p))
412             *(pout++) = tolower(*(p++));
413         while (*p && !isalnum(*p))
414             p++;
415         *(pout++) = ' ';
416     }
417     if (out != pout)
418         *(--pout) = '\0';
419
420     return out;
421 }
422
423 #ifdef RECHEAP
424 static void push_record(struct session *s, struct record *r)
425 {
426     int p;
427     assert(s->recheap_max + 1 < s->recheap_size);
428
429     s->recheap[p = ++s->recheap_max] = r;
430     while (p > 0)
431     {
432         int parent = (p - 1) >> 1;
433         if (strcmp(s->recheap[p]->merge_key, s->recheap[parent]->merge_key) < 0)
434         {
435             struct record *tmp;
436             tmp = s->recheap[parent];
437             s->recheap[parent] = s->recheap[p];
438             s->recheap[p] = tmp;
439             p = parent;
440         }
441         else
442             break;
443     }
444 }
445
446 static struct record *top_record(struct session *s)
447 {
448     return s-> recheap_max >= 0 ?  s->recheap[0] : 0;
449 }
450
451 static struct record *pop_record(struct session *s)
452 {
453     struct record *res;
454     int p = 0;
455     int lastnonleaf = (s->recheap_max - 1) >> 1;
456
457     if (s->recheap_max < 0)
458         return 0;
459
460     res = s->recheap[0];
461
462     s->recheap[p] = s->recheap[s->recheap_max--];
463
464     while (p <= lastnonleaf)
465     {
466         int right = (p + 1) << 1;
467         int left = right - 1;
468         int min = left;
469
470         if (right < s->recheap_max &&
471                 strcmp(s->recheap[right]->merge_key, s->recheap[left]->merge_key) < 0)
472             min = right;
473         if (strcmp(s->recheap[min]->merge_key, s->recheap[p]->merge_key) < 0)
474         {
475             struct record *tmp = s->recheap[min];
476             s->recheap[min] = s->recheap[p];
477             s->recheap[p] = tmp;
478             p = min;
479         }
480         else
481             break;
482     }
483     return res;
484 }
485
486 // Like pop_record but collapses identical (merge_key) records
487 // The heap will contain multiple independent matching records and possibly
488 // one cluster, created the last time the list was scanned
489 static struct record *pop_mrecord(struct session *s)
490 {
491     struct record *this;
492     struct record *next;
493
494     if (!(this = pop_record(s)))
495         return 0;
496
497     // Collapse identical records
498     while ((next = top_record(s)))
499     {
500         struct record *p, *tmpnext;
501         if (strcmp(this->merge_key, next->merge_key))
502             break;
503         // Absorb record (and clustersiblings) into a supercluster
504         for (p = next; p; p = tmpnext) {
505             tmpnext = p->next_cluster;
506             p->next_cluster = this->next_cluster;
507             this->next_cluster = p;
508         }
509
510         pop_record(s);
511     }
512     return this;
513 }
514
515 // Reads records in sort order. Store records in top of heapspace until rewind is called.
516 static struct record *read_recheap(struct session *s)
517 {
518     struct record *r = pop_mrecord(s);
519
520     if (r)
521     {
522         if (s->recheap_scratch < 0)
523             s->recheap_scratch = s->recheap_size;
524         s->recheap[--s->recheap_scratch] = r;
525     }
526
527     return r;
528 }
529
530 // Return records to heap after read
531 static void rewind_recheap(struct session *s)
532 {
533     while (s->recheap_scratch >= 0) {
534         push_record(s, s->recheap[s->recheap_scratch++]);
535         if (s->recheap_scratch >= s->recheap_size)
536             s->recheap_scratch = -1;
537     }
538 }
539
540 #endif
541
542 // FIXME needs to be generalized. Should flexibly generate X lists per search
543 static void extract_subject(struct session *s, const char *rec)
544 {
545     const char *field, *subfield;
546
547     while ((field = find_field(rec, "650")))
548     {
549         rec = field; 
550         if ((subfield = find_subfield(field, 'a')))
551         {
552             char *e, *ef;
553             char buf[1024];
554             int len;
555
556             ef = index(subfield, '\n');
557             if (!ef)
558                 return;
559             if ((e = index(subfield, '\t')) && e < ef)
560                 ef = e;
561             while (ef > subfield && !isalpha(*(ef - 1)) && *(ef - 1) != ')')
562                 ef--;
563             len = ef - subfield;
564             assert(len < 1023);
565             memcpy(buf, subfield, len);
566             buf[len] = '\0';
567             if (*buf)
568                 termlist_insert(s->termlist, buf);
569         }
570     }
571 }
572
573 static void pull_relevance_field(struct session *s, struct record *head, const char *rec,
574         char *field, int mult)
575 {
576     const char *fb;
577     while ((fb = find_field(rec, field)))
578     {
579         char *ffield = strchr(fb, '\t');
580         if (!ffield)
581             return;
582         char *eol = strchr(ffield, '\n');
583         if (!eol)
584             return;
585         relevance_countwords(s->relevance, head, ffield, eol - ffield, mult);
586         rec = field + 1; // Crude way to cause a loop through repeating fields
587     }
588 }
589
590 static void pull_relevance_keys(struct session *s, struct record *head,  struct record *rec)
591 {
592     relevance_newrec(s->relevance, head);
593     pull_relevance_field(s, head, rec->buf, "100", 2);
594     pull_relevance_field(s, head, rec->buf, "245", 4);
595     //pull_relevance_field(s, head, rec->buf, "530", 1);
596     pull_relevance_field(s, head, rec->buf, "630", 1);
597     pull_relevance_field(s, head, rec->buf, "650", 1);
598     pull_relevance_field(s, head, rec->buf, "700", 1);
599     relevance_donerecord(s->relevance, head);
600 }
601
602 static struct record *ingest_record(struct client *cl, char *buf, int len)
603 {
604     struct session *se = cl->session;
605     struct record *res;
606     struct record *head;
607     const char *recbuf;
608
609     wrbuf_rewind(se->wrbuf);
610     yaz_marc_xml(global_parameters.yaz_marc, YAZ_MARC_LINE);
611     if (yaz_marc_decode_wrbuf(global_parameters.yaz_marc, buf, len, se->wrbuf) < 0)
612     {
613         yaz_log(YLOG_WARN, "Failed to decode MARC record");
614         return 0;
615     }
616     wrbuf_putc(se->wrbuf, '\0');
617     recbuf = wrbuf_buf(se->wrbuf);
618
619     res = nmem_malloc(se->nmem, sizeof(struct record));
620     res->buf = nmem_strdup(se->nmem, recbuf);
621
622     extract_subject(se, res->buf);
623
624     res->title = extract_title(se, res->buf);
625     res->merge_key = extract_mergekey(se, res->buf);
626     if (!res->merge_key)
627         return 0;
628     res->client = cl;
629     res->next_cluster = 0;
630     res->target_offset = -1;
631     res->term_frequency_vec = 0;
632
633     head = reclist_insert(se->reclist, res);
634
635     pull_relevance_keys(se, head, res);
636
637     se->total_records++;
638
639     return res;
640 }
641
642 static void ingest_records(struct client *cl, Z_Records *r)
643 {
644     struct record *rec;
645     struct session *s = cl->session;
646     Z_NamePlusRecordList *rlist;
647     int i;
648
649     if (r->which != Z_Records_DBOSD)
650         return;
651     rlist = r->u.databaseOrSurDiagnostics;
652     for (i = 0; i < rlist->num_records; i++)
653     {
654         Z_NamePlusRecord *npr = rlist->records[i];
655         Z_External *e;
656         char *buf;
657         int len;
658
659         if (npr->which != Z_NamePlusRecord_databaseRecord)
660         {
661             yaz_log(YLOG_WARN, "Unexpected record type, probably diagnostic");
662             continue;
663         }
664         e = npr->u.databaseRecord;
665         if (e->which != Z_External_octet)
666         {
667             yaz_log(YLOG_WARN, "Unexpected external branch, probably BER");
668             continue;
669         }
670         buf = (char*) e->u.octet_aligned->buf;
671         len = e->u.octet_aligned->len;
672
673         rec = ingest_record(cl, buf, len);
674         if (!rec)
675             continue;
676     }
677     if (s->watchlist[SESSION_WATCH_RECORDS].fun && rlist->num_records)
678         session_alert_watch(s, SESSION_WATCH_RECORDS);
679 }
680
681 xsltStylesheetPtr load_stylesheet(const char *fname)
682 {
683     xsltStylesheetPtr ret;
684     if (!(ret = xsltParseStylesheetFile((const xmlChar *) fname)))
685     {
686         yaz_log(YLOG_FATAL|YLOG_ERRNO, "Failed to load stylesheet %s", fname);
687         exit(1);
688     }
689     return ret;
690 }
691
692 static void do_presentResponse(IOCHAN i, Z_APDU *a)
693 {
694     struct connection *co = iochan_getdata(i);
695     struct client *cl = co->client;
696     Z_PresentResponse *r = a->u.presentResponse;
697
698     if (r->records) {
699         Z_Records *recs = r->records;
700         if (recs->which == Z_Records_NSD)
701         {
702             yaz_log(YLOG_WARN, "Non-surrogate diagnostic");
703             cl->diagnostic = *recs->u.nonSurrogateDiagnostic->condition;
704             cl->state = Client_Error;
705         }
706     }
707
708     if (!*r->presentStatus && cl->state != Client_Error)
709     {
710         yaz_log(YLOG_DEBUG, "Good Present response");
711         cl->records += *r->numberOfRecordsReturned;
712         ingest_records(cl, r->records);
713         cl->state = Client_Idle;
714     }
715     else if (*r->presentStatus) 
716     {
717         yaz_log(YLOG_WARN, "Bad Present response");
718         cl->state = Client_Error;
719     }
720 }
721
722 static void handler(IOCHAN i, int event)
723 {
724     struct connection *co = iochan_getdata(i);
725     struct client *cl = co->client;
726     struct session *se = 0;
727
728     if (cl)
729         se = cl->session;
730     else
731     {
732         yaz_log(YLOG_WARN, "Destroying orphan connection");
733         connection_destroy(co);
734         return;
735     }
736
737     if (co->state == Conn_Connecting && event & EVENT_OUTPUT)
738     {
739         int errcode;
740         socklen_t errlen = sizeof(errcode);
741
742         if (getsockopt(cs_fileno(co->link), SOL_SOCKET, SO_ERROR, &errcode,
743             &errlen) < 0 || errcode != 0)
744         {
745             client_fatal(cl);
746             return;
747         }
748         else
749         {
750             yaz_log(YLOG_DEBUG, "Connect OK");
751             co->state = Conn_Open;
752             if (cl)
753                 cl->state = Client_Connected;
754         }
755     }
756
757     else if (event & EVENT_INPUT)
758     {
759         int len = cs_get(co->link, &co->ibuf, &co->ibufsize);
760
761         if (len < 0)
762         {
763             yaz_log(YLOG_WARN|YLOG_ERRNO, "Error reading from Z server");
764             connection_destroy(co);
765             return;
766         }
767         else if (len == 0)
768         {
769             yaz_log(YLOG_WARN, "EOF reading from Z server");
770             connection_destroy(co);
771             return;
772         }
773         else if (len > 1) // We discard input if we have no connection
774         {
775             co->state = Conn_Open;
776
777             if (cl && (cl->requestid == se->requestid || cl->state == Client_Initializing))
778             {
779                 Z_APDU *a;
780
781                 odr_reset(global_parameters.odr_in);
782                 odr_setbuf(global_parameters.odr_in, co->ibuf, len, 0);
783                 if (!z_APDU(global_parameters.odr_in, &a, 0, 0))
784                 {
785                     client_fatal(cl);
786                     return;
787                 }
788                 switch (a->which)
789                 {
790                     case Z_APDU_initResponse:
791                         do_initResponse(i, a);
792                         break;
793                     case Z_APDU_searchResponse:
794                         do_searchResponse(i, a);
795                         break;
796                     case Z_APDU_presentResponse:
797                         do_presentResponse(i, a);
798                         break;
799                     default:
800                         yaz_log(YLOG_WARN, "Unexpected result from server");
801                         client_fatal(cl);
802                         return;
803                 }
804                 // We aren't expecting staggered output from target
805                 // if (cs_more(t->link))
806                 //    iochan_setevent(i, EVENT_INPUT);
807             }
808             else  // we throw away response and go to idle mode
809             {
810                 yaz_log(YLOG_DEBUG, "Ignoring result of expired operation");
811                 cl->state = Client_Idle;
812             }
813         }
814         /* if len==1 we do nothing but wait for more input */
815     }
816
817     if (cl->state == Client_Connected) {
818         send_init(i);
819     }
820
821     if (cl->state == Client_Idle)
822     {
823         if (cl->requestid != se->requestid && *se->query) {
824             send_search(i);
825         }
826         else if (cl->hits > 0 && cl->records < global_parameters.toget &&
827             cl->records < cl->hits) {
828             send_present(i);
829         }
830     }
831 }
832
833 // Disassociate connection from client
834 static void connection_release(struct connection *co)
835 {
836     struct client *cl = co->client;
837
838     yaz_log(YLOG_DEBUG, "Connection release %s", co->host->hostport);
839     if (!cl)
840         return;
841     cl->connection = 0;
842     co->client = 0;
843 }
844
845 // Close connection and recycle structure
846 static void connection_destroy(struct connection *co)
847 {
848     struct host *h = co->host;
849     cs_close(co->link);
850     iochan_destroy(co->iochan);
851
852     yaz_log(YLOG_DEBUG, "Connection destroy %s", co->host->hostport);
853     if (h->connections == co)
854         h->connections = co->next;
855     else
856     {
857         struct connection *pco;
858         for (pco = h->connections; pco && pco->next != co; pco = pco->next)
859             ;
860         if (pco)
861             pco->next = co->next;
862         else
863             abort();
864     }
865     if (co->client)
866     {
867         if (co->client->state != Client_Idle)
868             co->client->state = Client_Disconnected;
869         co->client->connection = 0;
870     }
871     co->next = connection_freelist;
872     connection_freelist = co;
873 }
874
875 // Creates a new connection for client, associated with the host of 
876 // client's database
877 static struct connection *connection_create(struct client *cl)
878 {
879     struct connection *new;
880     COMSTACK link; 
881     int res;
882     void *addr;
883
884     yaz_log(YLOG_DEBUG, "Connection create %s", cl->database->url);
885     if (!(link = cs_create(tcpip_type, 0, PROTO_Z3950)))
886     {
887         yaz_log(YLOG_FATAL|YLOG_ERRNO, "Failed to create comstack");
888         exit(1);
889     }
890
891     if (!(addr = cs_straddr(link, cl->database->host->ipport)))
892     {
893         yaz_log(YLOG_WARN|YLOG_ERRNO, "Lookup of IP address failed?");
894         return 0;
895     }
896
897     res = cs_connect(link, addr);
898     if (res < 0)
899     {
900         yaz_log(YLOG_WARN|YLOG_ERRNO, "cs_connect %s", cl->database->url);
901         return 0;
902     }
903
904     if ((new = connection_freelist))
905         connection_freelist = new->next;
906     else
907     {
908         new = xmalloc(sizeof (struct connection));
909         new->ibuf = 0;
910         new->ibufsize = 0;
911     }
912     new->state = Conn_Connecting;
913     new->host = cl->database->host;
914     new->next = new->host->connections;
915     new->host->connections = new;
916     new->client = cl;
917     cl->connection = new;
918     new->link = link;
919
920     new->iochan = iochan_create(cs_fileno(link), handler, 0);
921     iochan_setdata(new->iochan, new);
922     new->iochan->next = channel_list;
923     channel_list = new->iochan;
924     return new;
925 }
926
927 // Close connection and set state to error
928 static void client_fatal(struct client *cl)
929 {
930     yaz_log(YLOG_WARN, "Fatal error from %s", cl->database->url);
931     connection_destroy(cl->connection);
932     cl->state = Client_Error;
933 }
934
935 // Ensure that client has a connection associated
936 static int client_prep_connection(struct client *cl)
937 {
938     struct connection *co;
939     struct session *se = cl->session;
940     struct host *host = cl->database->host;
941
942     co = cl->connection;
943
944     yaz_log(YLOG_DEBUG, "Client prep %s", cl->database->url);
945
946     if (!co)
947     {
948         // See if someone else has an idle connection
949         // We should look at timestamps here to select the longest-idle connection
950         for (co = host->connections; co; co = co->next)
951             if (co->state == Conn_Open && (!co->client || co->client->session != se))
952                 break;
953         if (co)
954         {
955             connection_release(co);
956             cl->connection = co;
957             co->client = cl;
958         }
959         else
960             co = connection_create(cl);
961     }
962     if (co)
963     {
964         if (co->state == Conn_Connecting)
965             cl->state = Client_Connecting;
966         else if (co->state == Conn_Open)
967         {
968             if (cl->state == Client_Error || cl->state == Client_Disconnected)
969                 cl->state = Client_Idle;
970         }
971         iochan_setflag(co->iochan, EVENT_OUTPUT);
972         return 1;
973     }
974     else
975         return 0;
976 }
977
978 void load_simpletargets(const char *fn)
979 {
980     FILE *f = fopen(fn, "r");
981     char line[256];
982
983     if (!f)
984     {
985         yaz_log(YLOG_WARN|YLOG_ERRNO, "open %s", fn);
986         exit(1);
987     }
988
989     while (fgets(line, 255, f))
990     {
991         char *url, *db;
992         struct host *host;
993         struct database *database;
994
995         if (strncmp(line, "target ", 7))
996             continue;
997         url = line + 7;
998         url[strlen(url) - 1] = '\0';
999         yaz_log(YLOG_DEBUG, "Target: %s", url);
1000         if ((db = strchr(url, '/')))
1001             *(db++) = '\0';
1002         else
1003             db = "Default";
1004
1005         for (host = hosts; host; host = host->next)
1006             if (!strcmp(url, host->hostport))
1007                 break;
1008         if (!host)
1009         {
1010             struct addrinfo *addrinfo, hints;
1011             char *port;
1012             char ipport[128];
1013             unsigned char addrbuf[4];
1014             int res;
1015
1016             host = xmalloc(sizeof(struct host));
1017             host->hostport = xstrdup(url);
1018             host->connections = 0;
1019
1020             if ((port = strchr(url, ':')))
1021                 *(port++) = '\0';
1022             else
1023                 port = "210";
1024
1025             hints.ai_flags = 0;
1026             hints.ai_family = PF_INET;
1027             hints.ai_socktype = SOCK_STREAM;
1028             hints.ai_protocol = IPPROTO_TCP;
1029             hints.ai_addrlen = 0;
1030             hints.ai_addr = 0;
1031             hints.ai_canonname = 0;
1032             hints.ai_next = 0;
1033             // This is not robust code. It assumes that getaddrinfo returns AF_INET
1034             // address.
1035             if ((res = getaddrinfo(url, port, &hints, &addrinfo)))
1036             {
1037                 yaz_log(YLOG_WARN, "Failed to resolve %s: %s", url, gai_strerror(res));
1038                 xfree(host->hostport);
1039                 xfree(host);
1040                 continue;
1041             }
1042             assert(addrinfo->ai_family == PF_INET);
1043             memcpy(addrbuf, &((struct sockaddr_in*)addrinfo->ai_addr)->sin_addr.s_addr, 4);
1044             sprintf(ipport, "%hhd.%hhd.%hhd.%hhd:%s",
1045                     addrbuf[0], addrbuf[1], addrbuf[2], addrbuf[3], port);
1046             host->ipport = xstrdup(ipport);
1047             freeaddrinfo(addrinfo);
1048             host->next = hosts;
1049             hosts = host;
1050         }
1051         database = xmalloc(sizeof(struct database));
1052         database->host = host;
1053         database->url = xmalloc(strlen(url) + strlen(db) + 2);
1054         strcpy(database->url, url);
1055         strcat(database->url, "/");
1056         strcat(database->url, db);
1057         
1058         database->databases = xmalloc(2 * sizeof(char *));
1059         database->databases[0] = xstrdup(db);
1060         database->databases[1] = 0;
1061         database->errors = 0;
1062         database->next = databases;
1063         databases = database;
1064
1065     }
1066     fclose(f);
1067 }
1068
1069 static void pull_terms(NMEM nmem, struct ccl_rpn_node *n, char **termlist, int *num)
1070 {
1071     switch (n->kind)
1072     {
1073         case CCL_RPN_AND:
1074         case CCL_RPN_OR:
1075         case CCL_RPN_NOT:
1076         case CCL_RPN_PROX:
1077             pull_terms(nmem, n->u.p[0], termlist, num);
1078             pull_terms(nmem, n->u.p[1], termlist, num);
1079             break;
1080         case CCL_RPN_TERM:
1081             termlist[(*num)++] = nmem_strdup(nmem, n->u.t.term);
1082             break;
1083         default: // NOOP
1084             break;
1085     }
1086 }
1087
1088 // Extract terms from query into null-terminated termlist
1089 static int extract_terms(NMEM nmem, char *query, char **termlist)
1090 {
1091     int error, pos;
1092     struct ccl_rpn_node *n;
1093     int num = 0;
1094
1095     n = ccl_find_str(global_parameters.ccl_filter, query, &error, &pos);
1096     if (!n)
1097         return -1;
1098     pull_terms(nmem, n, termlist, &num);
1099     termlist[num] = 0;
1100     ccl_rpn_delete(n);
1101     return 0;
1102 }
1103
1104 static struct client *client_create(void)
1105 {
1106     struct client *r;
1107     if (client_freelist)
1108     {
1109         r = client_freelist;
1110         client_freelist = client_freelist->next;
1111     }
1112     else
1113         r = xmalloc(sizeof(struct client));
1114     r->database = 0;
1115     r->connection = 0;
1116     r->session = 0;
1117     r->hits = 0;
1118     r->records = 0;
1119     r->setno = 0;
1120     r->requestid = -1;
1121     r->diagnostic = 0;
1122     r->state = Client_Disconnected;
1123     r->next = 0;
1124     return r;
1125 }
1126
1127 void client_destroy(struct client *c)
1128 {
1129     struct session *se = c->session;
1130     if (c == se->clients)
1131         se->clients = c->next;
1132     else
1133     {
1134         struct client *cc;
1135         for (cc = se->clients; cc && cc->next != c; cc = cc->next)
1136             ;
1137         if (cc)
1138             cc->next = c->next;
1139     }
1140     if (c->connection)
1141         connection_release(c->connection);
1142     c->next = client_freelist;
1143     client_freelist = c;
1144 }
1145
1146 void session_set_watch(struct session *s, int what, session_watchfun fun, void *data)
1147 {
1148     s->watchlist[what].fun = fun;
1149     s->watchlist[what].data = data;
1150 }
1151
1152 void session_alert_watch(struct session *s, int what)
1153 {
1154     if (!s->watchlist[what].fun)
1155         return;
1156     (*s->watchlist[what].fun)(s->watchlist[what].data);
1157     s->watchlist[what].fun = 0;
1158     s->watchlist[what].data = 0;
1159 }
1160
1161 // This should be extended with parameters to control selection criteria
1162 // Associates a set of clients with a session;
1163 int select_targets(struct session *se)
1164 {
1165     struct database *db;
1166     int c = 0;
1167
1168     while (se->clients)
1169         client_destroy(se->clients);
1170     for (db = databases; db; db = db->next)
1171     {
1172         struct client *cl = client_create();
1173         cl->database = db;
1174         cl->session = se;
1175         cl->next = se->clients;
1176         se->clients = cl;
1177         c++;
1178     }
1179     return c;
1180 }
1181
1182 char *search(struct session *se, char *query)
1183 {
1184     int live_channels = 0;
1185     struct client *cl;
1186
1187     yaz_log(YLOG_DEBUG, "Search");
1188
1189     strcpy(se->query, query);
1190     se->requestid++;
1191     nmem_reset(se->nmem);
1192     for (cl = se->clients; cl; cl = cl->next)
1193     {
1194         cl->hits = -1;
1195         cl->records = 0;
1196         cl->diagnostic = 0;
1197
1198         if (client_prep_connection(cl))
1199             live_channels++;
1200     }
1201     if (live_channels)
1202     {
1203         char *p[512];
1204         int maxrecs = live_channels * global_parameters.toget;
1205         se->termlist = termlist_create(se->nmem, maxrecs, 15);
1206         se->reclist = reclist_create(se->nmem, maxrecs);
1207         extract_terms(se->nmem, query, p);
1208         se->relevance = relevance_create(se->nmem, (const char **) p, maxrecs);
1209         se->total_records = se->total_hits = 0;
1210     }
1211     else
1212         return "NOTARGETS";
1213
1214     return 0;
1215 }
1216
1217 void destroy_session(struct session *s)
1218 {
1219     yaz_log(YLOG_LOG, "Destroying session");
1220     while (s->clients)
1221         client_destroy(s->clients);
1222     nmem_destroy(s->nmem);
1223     wrbuf_free(s->wrbuf, 1);
1224 }
1225
1226 struct session *new_session() 
1227 {
1228     int i;
1229     struct session *session = xmalloc(sizeof(*session));
1230
1231     yaz_log(YLOG_DEBUG, "New pazpar2 session");
1232     
1233     session->total_hits = 0;
1234     session->total_records = 0;
1235     session->termlist = 0;
1236     session->reclist = 0;
1237     session->requestid = -1;
1238     session->clients = 0;
1239     session->query[0] = '\0';
1240     session->nmem = nmem_create();
1241     session->wrbuf = wrbuf_alloc();
1242     for (i = 0; i <= SESSION_WATCH_MAX; i++)
1243     {
1244         session->watchlist[i].data = 0;
1245         session->watchlist[i].fun = 0;
1246     }
1247
1248     select_targets(session);
1249
1250     return session;
1251 }
1252
1253 struct hitsbytarget *hitsbytarget(struct session *se, int *count)
1254 {
1255     static struct hitsbytarget res[1000]; // FIXME MM
1256     struct client *cl;
1257
1258     *count = 0;
1259     for (cl = se->clients; cl; cl = cl->next)
1260     {
1261         strcpy(res[*count].id, cl->database->host->hostport);
1262         res[*count].hits = cl->hits;
1263         res[*count].records = cl->records;
1264         res[*count].diagnostic = cl->diagnostic;
1265         res[*count].state = client_states[cl->state];
1266         res[*count].connected  = cl->connection ? 1 : 0;
1267         (*count)++;
1268     }
1269
1270     return res;
1271 }
1272
1273 struct termlist_score **termlist(struct session *s, int *num)
1274 {
1275     return termlist_highscore(s->termlist, num);
1276 }
1277
1278 #ifdef REPORT_NMEM
1279 // conditional compilation by SH: This lead to a warning with currently installed
1280 // YAZ header files on us1
1281 void report_nmem_stats(void)
1282 {
1283     size_t in_use, is_free;
1284
1285     nmem_get_memory_in_use(&in_use);
1286     nmem_get_memory_free(&is_free);
1287
1288     yaz_log(YLOG_LOG, "nmem stat: use=%ld free=%ld", 
1289             (long) in_use, (long) is_free);
1290 }
1291 #endif
1292
1293 struct record **show(struct session *s, int start, int *num, int *total,
1294                      int *sumhits, NMEM nmem_show)
1295 {
1296     struct record **recs = nmem_malloc(nmem_show, *num 
1297                                        * sizeof(struct record *));
1298     int i;
1299
1300     relevance_prepare_read(s->relevance, s->reclist);
1301
1302     *total = s->reclist->num_records;
1303     *sumhits = s->total_hits;
1304
1305     for (i = 0; i < start; i++)
1306         if (!reclist_read_record(s->reclist))
1307         {
1308             *num = 0;
1309             return 0;
1310         }
1311
1312     for (i = 0; i < *num; i++)
1313     {
1314         struct record *r = reclist_read_record(s->reclist);
1315         if (!r)
1316         {
1317             *num = i;
1318             break;
1319         }
1320         recs[i] = r;
1321     }
1322     return recs;
1323 }
1324
1325 void statistics(struct session *se, struct statistics *stat)
1326 {
1327     struct client *cl;
1328     int count = 0;
1329
1330     bzero(stat, sizeof(*stat));
1331     for (cl = se->clients; cl; cl = cl->next)
1332     {
1333         if (!cl->connection)
1334             stat->num_no_connection++;
1335         switch (cl->state)
1336         {
1337             case Client_Connecting: stat->num_connecting++; break;
1338             case Client_Initializing: stat->num_initializing++; break;
1339             case Client_Searching: stat->num_searching++; break;
1340             case Client_Presenting: stat->num_presenting++; break;
1341             case Client_Idle: stat->num_idle++; break;
1342             case Client_Failed: stat->num_failed++; break;
1343             case Client_Error: stat->num_error++; break;
1344             default: break;
1345         }
1346         count++;
1347     }
1348     stat->num_hits = se->total_hits;
1349     stat->num_records = se->total_records;
1350
1351     stat->num_clients = count;
1352 }
1353
1354 static CCL_bibset load_cclfile(const char *fn)
1355 {
1356     CCL_bibset res = ccl_qual_mk();
1357     if (ccl_qual_fname(res, fn) < 0)
1358     {
1359         yaz_log(YLOG_FATAL|YLOG_ERRNO, "%s", fn);
1360         exit(1);
1361     }
1362     return res;
1363 }
1364
1365 int main(int argc, char **argv)
1366 {
1367     int ret;
1368     char *arg;
1369     int setport = 0;
1370
1371     if (signal(SIGPIPE, SIG_IGN) < 0)
1372         yaz_log(YLOG_WARN|YLOG_ERRNO, "signal");
1373
1374     yaz_log_init(YLOG_DEFAULT_LEVEL, "pazpar2", 0);
1375
1376     while ((ret = options("f:x:c:h:p:C:s:", argv, argc, &arg)) != -2)
1377     {
1378         switch (ret) {
1379             case 'f':
1380                 if (!read_config(arg))
1381                     exit(1);
1382                 break;
1383             case 'c':
1384                 command_init(atoi(arg));
1385                 setport++;
1386                 break;
1387             case 'h':
1388                 http_init(arg);
1389                 setport++;
1390                 break;
1391             case 'C':
1392                 global_parameters.ccl_filter = load_cclfile(arg);
1393                 break;
1394             case 'p':
1395                 http_set_proxyaddr(arg);
1396                 break;
1397             case 's':
1398                 load_simpletargets(arg);
1399                 break;
1400             case 'x':
1401                 global_parameters.xsl = load_stylesheet(arg);
1402                 break;
1403             default:
1404                 fprintf(stderr, "Usage: pazpar2\n"
1405                         "    -f configfile\n"
1406                         "    -h [host:]port          (REST protocol listener)\n"
1407                         "    -c cmdport              (telnet-style)\n"
1408                         "    -C cclconfig\n"
1409                         "    -s simpletargetfile\n"
1410                         "    -p hostname[:portno]    (HTTP proxy)\n");
1411                 exit(1);
1412         }
1413     }
1414
1415     if (!setport)
1416     {
1417         fprintf(stderr, "Set command port with -h or -c\n");
1418         exit(1);
1419     }
1420
1421     if (!global_parameters.xsl)
1422         global_parameters.xsl = load_stylesheet("../etc/default.xsl");
1423     global_parameters.ccl_filter = load_cclfile("../etc/default.bib");
1424     global_parameters.yaz_marc = yaz_marc_create();
1425     yaz_marc_subfield_str(global_parameters.yaz_marc, "\t");
1426     global_parameters.odr_in = odr_createmem(ODR_DECODE);
1427     global_parameters.odr_out = odr_createmem(ODR_ENCODE);
1428
1429     event_loop(&channel_list);
1430
1431     return 0;
1432 }
1433
1434 /*
1435  * Local variables:
1436  * c-basic-offset: 4
1437  * indent-tabs-mode: nil
1438  * End:
1439  * vim: shiftwidth=4 tabstop=8 expandtab
1440  */