Part of the way to blocking functions
[pazpar2-moved-to-github.git] / pazpar2.c
1 /* $Id: pazpar2.c,v 1.13 2006-12-14 14:58:03 quinn Exp $ */;
2
3 #include <stdlib.h>
4 #include <stdio.h>
5 #include <string.h>
6 #include <sys/time.h>
7 #include <unistd.h>
8 #include <sys/socket.h>
9 #include <netdb.h>
10 #include <signal.h>
11 #include <ctype.h>
12 #include <assert.h>
13
14 #include <yaz/comstack.h>
15 #include <yaz/tcpip.h>
16 #include <yaz/proto.h>
17 #include <yaz/readconf.h>
18 #include <yaz/pquery.h>
19 #include <yaz/yaz-util.h>
20
21 #include "pazpar2.h"
22 #include "eventl.h"
23 #include "command.h"
24 #include "http.h"
25 #include "termlists.h"
26 #include "reclists.h"
27 #include "relevance.h"
28
29 #define PAZPAR2_VERSION "0.1"
30 #define MAX_CHUNK 15
31
32 static void client_fatal(struct client *cl);
33 static void connection_destroy(struct connection *co);
34 static int client_prep_connection(struct client *cl);
35 static void ingest_records(struct client *cl, Z_Records *r);
36
37 IOCHAN channel_list = 0;  // Master list of connections we're listening to.
38
39 static struct connection *connection_freelist = 0;
40 static struct client *client_freelist = 0;
41
42 static struct host *hosts = 0;  // The hosts we know about 
43 static struct database *databases = 0; // The databases we know about
44
45 static char *client_states[] = {
46     "Client_Connecting",
47     "Client_Connected",
48     "Client_Idle",
49     "Client_Initializing",
50     "Client_Searching",
51     "Client_Presenting",
52     "Client_Error",
53     "Client_Failed",
54     "Client_Disconnected",
55     "Client_Stopped"
56 };
57
58 struct parameters global_parameters = 
59 {
60     30,
61     "81",
62     "Index Data PazPar2 (MasterKey)",
63     PAZPAR2_VERSION,
64     600, // 10 minutes
65     60,
66     100,
67     MAX_CHUNK,
68     0,
69     0,
70     0,
71     0
72 };
73
74
75 static int send_apdu(struct client *c, Z_APDU *a)
76 {
77     struct connection *co = c->connection;
78     char *buf;
79     int len, r;
80
81     if (!z_APDU(global_parameters.odr_out, &a, 0, 0))
82     {
83         odr_perror(global_parameters.odr_out, "Encoding APDU");
84         abort();
85     }
86     buf = odr_getbuf(global_parameters.odr_out, &len, 0);
87     r = cs_put(co->link, buf, len);
88     if (r < 0)
89     {
90         yaz_log(YLOG_WARN, "cs_put: %s", cs_errmsg(cs_errno(co->link)));
91         return -1;
92     }
93     else if (r == 1)
94     {
95         fprintf(stderr, "cs_put incomplete (ParaZ does not handle that)\n");
96         exit(1);
97     }
98     odr_reset(global_parameters.odr_out); /* release the APDU structure  */
99     co->state = Conn_Waiting;
100     return 0;
101 }
102
103
104 static void send_init(IOCHAN i)
105 {
106     struct connection *co = iochan_getdata(i);
107     struct client *cl = co->client;
108     Z_APDU *a = zget_APDU(global_parameters.odr_out, Z_APDU_initRequest);
109
110     a->u.initRequest->implementationId = global_parameters.implementationId;
111     a->u.initRequest->implementationName = global_parameters.implementationName;
112     a->u.initRequest->implementationVersion =
113         global_parameters.implementationVersion;
114     ODR_MASK_SET(a->u.initRequest->options, Z_Options_search);
115     ODR_MASK_SET(a->u.initRequest->options, Z_Options_present);
116     ODR_MASK_SET(a->u.initRequest->options, Z_Options_namedResultSets);
117
118     ODR_MASK_SET(a->u.initRequest->protocolVersion, Z_ProtocolVersion_1);
119     ODR_MASK_SET(a->u.initRequest->protocolVersion, Z_ProtocolVersion_2);
120     ODR_MASK_SET(a->u.initRequest->protocolVersion, Z_ProtocolVersion_3);
121     if (send_apdu(cl, a) >= 0)
122     {
123         iochan_setflags(i, EVENT_INPUT);
124         cl->state = Client_Initializing;
125     }
126     else
127         cl->state = Client_Error;
128     odr_reset(global_parameters.odr_out);
129 }
130
131 static void send_search(IOCHAN i)
132 {
133     struct connection *co = iochan_getdata(i);
134     struct client *cl = co->client; 
135     struct session *se = cl->session;
136     struct database *db = cl->database;
137     Z_APDU *a = zget_APDU(global_parameters.odr_out, Z_APDU_searchRequest);
138     int ndb, cerror, cpos;
139     char **databaselist;
140     Z_Query *zquery;
141     struct ccl_rpn_node *cn;
142     int ssub = 0, lslb = 100000, mspn = 10;
143
144     yaz_log(YLOG_DEBUG, "Sending search");
145
146     cn = ccl_find_str(global_parameters.ccl_filter, se->query, &cerror, &cpos);
147     if (!cn)
148         return;
149     a->u.searchRequest->query = zquery = odr_malloc(global_parameters.odr_out,
150             sizeof(Z_Query));
151     zquery->which = Z_Query_type_1;
152     zquery->u.type_1 = ccl_rpn_query(global_parameters.odr_out, cn);
153     ccl_rpn_delete(cn);
154
155     for (ndb = 0; *db->databases[ndb]; ndb++)
156         ;
157     databaselist = odr_malloc(global_parameters.odr_out, sizeof(char*) * ndb);
158     for (ndb = 0; *db->databases[ndb]; ndb++)
159         databaselist[ndb] = db->databases[ndb];
160
161     a->u.presentRequest->preferredRecordSyntax =
162             yaz_oidval_to_z3950oid(global_parameters.odr_out,
163             CLASS_RECSYN, VAL_USMARC);
164     a->u.searchRequest->smallSetUpperBound = &ssub;
165     a->u.searchRequest->largeSetLowerBound = &lslb;
166     a->u.searchRequest->mediumSetPresentNumber = &mspn;
167     a->u.searchRequest->resultSetName = "Default";
168     a->u.searchRequest->databaseNames = databaselist;
169     a->u.searchRequest->num_databaseNames = ndb;
170
171     if (send_apdu(cl, a) >= 0)
172     {
173         iochan_setflags(i, EVENT_INPUT);
174         cl->state = Client_Searching;
175         cl->requestid = se->requestid;
176     }
177     else
178         cl->state = Client_Error;
179
180     odr_reset(global_parameters.odr_out);
181 }
182
183 static void send_present(IOCHAN i)
184 {
185     struct connection *co = iochan_getdata(i);
186     struct client *cl = co->client; 
187     Z_APDU *a = zget_APDU(global_parameters.odr_out, Z_APDU_presentRequest);
188     int toget;
189     int start = cl->records + 1;
190
191     toget = global_parameters.chunk;
192     if (toget > cl->hits - cl->records)
193         toget = cl->hits - cl->records;
194
195     yaz_log(YLOG_DEBUG, "Trying to present %d records\n", toget);
196
197     a->u.presentRequest->resultSetStartPoint = &start;
198     a->u.presentRequest->numberOfRecordsRequested = &toget;
199
200     a->u.presentRequest->resultSetId = "Default";
201
202     a->u.presentRequest->preferredRecordSyntax =
203             yaz_oidval_to_z3950oid(global_parameters.odr_out,
204             CLASS_RECSYN, VAL_USMARC);
205
206     if (send_apdu(cl, a) >= 0)
207     {
208         iochan_setflags(i, EVENT_INPUT);
209         cl->state = Client_Presenting;
210     }
211     else
212         cl->state = Client_Error;
213     odr_reset(global_parameters.odr_out);
214 }
215
216 static void do_initResponse(IOCHAN i, Z_APDU *a)
217 {
218     struct connection *co = iochan_getdata(i);
219     struct client *cl = co->client;
220     Z_InitResponse *r = a->u.initResponse;
221
222     yaz_log(YLOG_DEBUG, "Received init response");
223
224     if (*r->result)
225     {
226         cl->state = Client_Idle;
227     }
228     else
229         cl->state = Client_Failed; // FIXME need to do something to the connection
230 }
231
232 static void do_searchResponse(IOCHAN i, Z_APDU *a)
233 {
234     struct connection *co = iochan_getdata(i);
235     struct client *cl = co->client;
236     struct session *se = cl->session;
237     Z_SearchResponse *r = a->u.searchResponse;
238
239     yaz_log(YLOG_DEBUG, "Searchresponse (status=%d)", *r->searchStatus);
240
241     if (*r->searchStatus)
242     {
243         cl->hits = *r->resultCount;
244         se->total_hits += cl->hits;
245         if (r->presentStatus && !*r->presentStatus && r->records)
246         {
247             yaz_log(YLOG_DEBUG, "Records in search response");
248             cl->records += *r->numberOfRecordsReturned;
249             ingest_records(cl, r->records);
250         }
251         cl->state = Client_Idle;
252     }
253     else
254     {          /*"FAILED"*/
255         cl->hits = 0;
256         cl->state = Client_Error;
257         if (r->records) {
258             Z_Records *recs = r->records;
259             if (recs->which == Z_Records_NSD)
260             {
261                 yaz_log(YLOG_WARN, "Non-surrogate diagnostic");
262                 cl->diagnostic = *recs->u.nonSurrogateDiagnostic->condition;
263                 cl->state = Client_Error;
264             }
265         }
266     }
267 }
268
269 const char *find_field(const char *rec, const char *field)
270 {
271     char lbuf[5];
272     char *line;
273
274     lbuf[0] = '\n';
275     strcpy(lbuf + 1, field);
276
277     if ((line = strstr(rec, lbuf)))
278         return ++line;
279     else
280         return 0;
281 }
282
283 const char *find_subfield(const char *field, char subfield)
284 {
285     const char *p = field;
286
287     while (*p && *p != '\n')
288     {
289         while (*p != '\n' && *p != '\t')
290             p++;
291         if (*p == '\t' && *(++p) == subfield) {
292             if (*(++p) == ' ')
293             {
294                 while (isspace(*p))
295                     p++;
296                 return p;
297             }
298         }
299     }
300     return 0;
301 }
302
303 // Extract 245 $a $b 100 $a
304 char *extract_title(struct session *s, const char *rec)
305 {
306     const char *field, *subfield;
307     char *e, *ef;
308     unsigned char *obuf, *p;
309
310     wrbuf_rewind(s->wrbuf);
311
312     if (!(field = find_field(rec, "245")))
313         return 0;
314     if (!(subfield = find_subfield(field, 'a')))
315         return 0;
316     ef = index(subfield, '\n');
317     if ((e = index(subfield, '\t')) && e < ef)
318         ef = e;
319     if (ef)
320     {
321         wrbuf_write(s->wrbuf, subfield, ef - subfield);
322         if ((subfield = find_subfield(field, 'b'))) 
323         {
324             ef = index(subfield, '\n');
325             if ((e = index(subfield, '\t')) && e < ef)
326                 ef = e;
327             if (ef)
328             {
329                 wrbuf_putc(s->wrbuf, ' ');
330                 wrbuf_write(s->wrbuf, subfield, ef - subfield);
331             }
332         }
333     }
334     if ((field = find_field(rec, "100")))
335     {
336         if ((subfield = find_subfield(field, 'a')))
337         {
338             ef = index(subfield, '\n');
339             if ((e = index(subfield, '\t')) && e < ef)
340                 ef = e;
341             if (ef)
342             {
343                 wrbuf_puts(s->wrbuf, ", by ");
344                 wrbuf_write(s->wrbuf, subfield, ef - subfield);
345             }
346         }
347     }
348     wrbuf_putc(s->wrbuf, '\0');
349     obuf = (unsigned char*) nmem_strdup(s->nmem, wrbuf_buf(s->wrbuf));
350     for (p = obuf; *p; p++)
351         if (*p == '&' || *p == '<' || *p > 122 || *p < ' ')
352             *p = ' ';
353     return (char*) obuf;
354 }
355
356 // Extract 245 $a $b 100 $a
357 char *extract_mergekey(struct session *s, const char *rec)
358 {
359     const char *field, *subfield;
360     char *e, *ef;
361     char *out, *p, *pout;
362
363     wrbuf_rewind(s->wrbuf);
364
365     if (!(field = find_field(rec, "245")))
366         return 0;
367     if (!(subfield = find_subfield(field, 'a')))
368         return 0;
369     ef = index(subfield, '\n');
370     if ((e = index(subfield, '\t')) && e < ef)
371         ef = e;
372     if (ef)
373     {
374         wrbuf_write(s->wrbuf, subfield, ef - subfield);
375         if ((subfield = find_subfield(field, 'b'))) 
376         {
377             ef = index(subfield, '\n');
378             if ((e = index(subfield, '\t')) && e < ef)
379                 ef = e;
380             if (ef)
381             {
382                 wrbuf_puts(s->wrbuf, " field "); 
383                 wrbuf_write(s->wrbuf, subfield, ef - subfield);
384             }
385         }
386     }
387     if ((field = find_field(rec, "100")))
388     {
389         if ((subfield = find_subfield(field, 'a')))
390         {
391             ef = index(subfield, '\n');
392             if ((e = index(subfield, '\t')) && e < ef)
393                 ef = e;
394             if (ef)
395             {
396                 wrbuf_puts(s->wrbuf, " field "); 
397                 wrbuf_write(s->wrbuf, subfield, ef - subfield);
398             }
399         }
400     }
401     wrbuf_putc(s->wrbuf, '\0');
402     p = wrbuf_buf(s->wrbuf);
403     out = pout = nmem_malloc(s->nmem, strlen(p) + 1);
404
405     while (*p)
406     {
407         while (isalnum(*p))
408             *(pout++) = tolower(*(p++));
409         while (*p && !isalnum(*p))
410             p++;
411         *(pout++) = ' ';
412     }
413     if (out != pout)
414         *(--pout) = '\0';
415
416     return out;
417 }
418
419 #ifdef RECHEAP
420 static void push_record(struct session *s, struct record *r)
421 {
422     int p;
423     assert(s->recheap_max + 1 < s->recheap_size);
424
425     s->recheap[p = ++s->recheap_max] = r;
426     while (p > 0)
427     {
428         int parent = (p - 1) >> 1;
429         if (strcmp(s->recheap[p]->merge_key, s->recheap[parent]->merge_key) < 0)
430         {
431             struct record *tmp;
432             tmp = s->recheap[parent];
433             s->recheap[parent] = s->recheap[p];
434             s->recheap[p] = tmp;
435             p = parent;
436         }
437         else
438             break;
439     }
440 }
441
442 static struct record *top_record(struct session *s)
443 {
444     return s-> recheap_max >= 0 ?  s->recheap[0] : 0;
445 }
446
447 static struct record *pop_record(struct session *s)
448 {
449     struct record *res;
450     int p = 0;
451     int lastnonleaf = (s->recheap_max - 1) >> 1;
452
453     if (s->recheap_max < 0)
454         return 0;
455
456     res = s->recheap[0];
457
458     s->recheap[p] = s->recheap[s->recheap_max--];
459
460     while (p <= lastnonleaf)
461     {
462         int right = (p + 1) << 1;
463         int left = right - 1;
464         int min = left;
465
466         if (right < s->recheap_max &&
467                 strcmp(s->recheap[right]->merge_key, s->recheap[left]->merge_key) < 0)
468             min = right;
469         if (strcmp(s->recheap[min]->merge_key, s->recheap[p]->merge_key) < 0)
470         {
471             struct record *tmp = s->recheap[min];
472             s->recheap[min] = s->recheap[p];
473             s->recheap[p] = tmp;
474             p = min;
475         }
476         else
477             break;
478     }
479     return res;
480 }
481
482 // Like pop_record but collapses identical (merge_key) records
483 // The heap will contain multiple independent matching records and possibly
484 // one cluster, created the last time the list was scanned
485 static struct record *pop_mrecord(struct session *s)
486 {
487     struct record *this;
488     struct record *next;
489
490     if (!(this = pop_record(s)))
491         return 0;
492
493     // Collapse identical records
494     while ((next = top_record(s)))
495     {
496         struct record *p, *tmpnext;
497         if (strcmp(this->merge_key, next->merge_key))
498             break;
499         // Absorb record (and clustersiblings) into a supercluster
500         for (p = next; p; p = tmpnext) {
501             tmpnext = p->next_cluster;
502             p->next_cluster = this->next_cluster;
503             this->next_cluster = p;
504         }
505
506         pop_record(s);
507     }
508     return this;
509 }
510
511 // Reads records in sort order. Store records in top of heapspace until rewind is called.
512 static struct record *read_recheap(struct session *s)
513 {
514     struct record *r = pop_mrecord(s);
515
516     if (r)
517     {
518         if (s->recheap_scratch < 0)
519             s->recheap_scratch = s->recheap_size;
520         s->recheap[--s->recheap_scratch] = r;
521     }
522
523     return r;
524 }
525
526 // Return records to heap after read
527 static void rewind_recheap(struct session *s)
528 {
529     while (s->recheap_scratch >= 0) {
530         push_record(s, s->recheap[s->recheap_scratch++]);
531         if (s->recheap_scratch >= s->recheap_size)
532             s->recheap_scratch = -1;
533     }
534 }
535
536 #endif
537
538 // FIXME needs to be generalized. Should flexibly generate X lists per search
539 static void extract_subject(struct session *s, const char *rec)
540 {
541     const char *field, *subfield;
542
543     while ((field = find_field(rec, "650")))
544     {
545         rec = field; 
546         if ((subfield = find_subfield(field, 'a')))
547         {
548             char *e, *ef;
549             char buf[1024];
550             int len;
551
552             ef = index(subfield, '\n');
553             if (!ef)
554                 return;
555             if ((e = index(subfield, '\t')) && e < ef)
556                 ef = e;
557             while (ef > subfield && !isalpha(*(ef - 1)) && *(ef - 1) != ')')
558                 ef--;
559             len = ef - subfield;
560             assert(len < 1023);
561             memcpy(buf, subfield, len);
562             buf[len] = '\0';
563             if (*buf)
564                 termlist_insert(s->termlist, buf);
565         }
566     }
567 }
568
569 static void pull_relevance_field(struct session *s, struct record *head, const char *rec,
570         char *field, int mult)
571 {
572     const char *fb;
573     while ((fb = find_field(rec, field)))
574     {
575         char *ffield = strchr(fb, '\t');
576         if (!ffield)
577             return;
578         char *eol = strchr(ffield, '\n');
579         if (!eol)
580             return;
581         relevance_countwords(s->relevance, head, ffield, eol - ffield, mult);
582         rec = field + 1; // Crude way to cause a loop through repeating fields
583     }
584 }
585
586 static void pull_relevance_keys(struct session *s, struct record *head,  struct record *rec)
587 {
588     relevance_newrec(s->relevance, head);
589     pull_relevance_field(s, head, rec->buf, "100", 2);
590     pull_relevance_field(s, head, rec->buf, "245", 4);
591     //pull_relevance_field(s, head, rec->buf, "530", 1);
592     pull_relevance_field(s, head, rec->buf, "630", 1);
593     pull_relevance_field(s, head, rec->buf, "650", 1);
594     pull_relevance_field(s, head, rec->buf, "700", 1);
595     relevance_donerecord(s->relevance, head);
596 }
597
598 static struct record *ingest_record(struct client *cl, char *buf, int len)
599 {
600     struct session *se = cl->session;
601     struct record *res;
602     struct record *head;
603     const char *recbuf;
604
605     wrbuf_rewind(se->wrbuf);
606     yaz_marc_xml(global_parameters.yaz_marc, YAZ_MARC_LINE);
607     if (yaz_marc_decode_wrbuf(global_parameters.yaz_marc, buf, len, se->wrbuf) < 0)
608     {
609         yaz_log(YLOG_WARN, "Failed to decode MARC record");
610         return 0;
611     }
612     wrbuf_putc(se->wrbuf, '\0');
613     recbuf = wrbuf_buf(se->wrbuf);
614
615     res = nmem_malloc(se->nmem, sizeof(struct record));
616     res->buf = nmem_strdup(se->nmem, recbuf);
617
618     extract_subject(se, res->buf);
619
620     res->title = extract_title(se, res->buf);
621     res->merge_key = extract_mergekey(se, res->buf);
622     if (!res->merge_key)
623         return 0;
624     res->client = cl;
625     res->next_cluster = 0;
626     res->target_offset = -1;
627     res->term_frequency_vec = 0;
628
629     head = reclist_insert(se->reclist, res);
630
631     pull_relevance_keys(se, head, res);
632
633     se->total_records++;
634
635     return res;
636 }
637
638 static void ingest_records(struct client *cl, Z_Records *r)
639 {
640     struct record *rec;
641     Z_NamePlusRecordList *rlist;
642     int i;
643
644     if (r->which != Z_Records_DBOSD)
645         return;
646     rlist = r->u.databaseOrSurDiagnostics;
647     for (i = 0; i < rlist->num_records; i++)
648     {
649         Z_NamePlusRecord *npr = rlist->records[i];
650         Z_External *e;
651         char *buf;
652         int len;
653
654         if (npr->which != Z_NamePlusRecord_databaseRecord)
655         {
656             yaz_log(YLOG_WARN, "Unexpected record type, probably diagnostic");
657             continue;
658         }
659         e = npr->u.databaseRecord;
660         if (e->which != Z_External_octet)
661         {
662             yaz_log(YLOG_WARN, "Unexpected external branch, probably BER");
663             continue;
664         }
665         buf = (char*) e->u.octet_aligned->buf;
666         len = e->u.octet_aligned->len;
667
668         rec = ingest_record(cl, buf, len);
669         if (!rec)
670             continue;
671     }
672 }
673
674 static void do_presentResponse(IOCHAN i, Z_APDU *a)
675 {
676     struct connection *co = iochan_getdata(i);
677     struct client *cl = co->client;
678     Z_PresentResponse *r = a->u.presentResponse;
679
680     if (r->records) {
681         Z_Records *recs = r->records;
682         if (recs->which == Z_Records_NSD)
683         {
684             yaz_log(YLOG_WARN, "Non-surrogate diagnostic");
685             cl->diagnostic = *recs->u.nonSurrogateDiagnostic->condition;
686             cl->state = Client_Error;
687         }
688     }
689
690     if (!*r->presentStatus && cl->state != Client_Error)
691     {
692         yaz_log(YLOG_DEBUG, "Good Present response");
693         cl->records += *r->numberOfRecordsReturned;
694         ingest_records(cl, r->records);
695         cl->state = Client_Idle;
696     }
697     else if (*r->presentStatus) 
698     {
699         yaz_log(YLOG_WARN, "Bad Present response");
700         cl->state = Client_Error;
701     }
702 }
703
704 static void handler(IOCHAN i, int event)
705 {
706     struct connection *co = iochan_getdata(i);
707     struct client *cl = co->client;
708     struct session *se = 0;
709
710     if (cl)
711         se = cl->session;
712
713     if (co->state == Conn_Connecting && event & EVENT_OUTPUT)
714     {
715         int errcode;
716         socklen_t errlen = sizeof(errcode);
717
718         if (getsockopt(cs_fileno(co->link), SOL_SOCKET, SO_ERROR, &errcode,
719             &errlen) < 0 || errcode != 0)
720         {
721             client_fatal(cl);
722             return;
723         }
724         else
725         {
726             yaz_log(YLOG_DEBUG, "Connect OK");
727             co->state = Conn_Open;
728             if (cl)
729                 cl->state = Client_Connected;
730         }
731     }
732
733     else if (event & EVENT_INPUT)
734     {
735         int len = cs_get(co->link, &co->ibuf, &co->ibufsize);
736
737         if (len < 0)
738         {
739             yaz_log(YLOG_WARN|YLOG_ERRNO, "Error reading from Z server");
740             connection_destroy(co);
741             return;
742         }
743         else if (len == 0)
744         {
745             yaz_log(YLOG_WARN, "EOF reading from Z server");
746             connection_destroy(co);
747             return;
748         }
749         else if (len > 1) // We discard input if we have no connection
750         {
751             co->state = Conn_Open;
752
753             if (cl && (cl->requestid == se->requestid || cl->state == Client_Initializing))
754             {
755                 Z_APDU *a;
756
757                 odr_reset(global_parameters.odr_in);
758                 odr_setbuf(global_parameters.odr_in, co->ibuf, len, 0);
759                 if (!z_APDU(global_parameters.odr_in, &a, 0, 0))
760                 {
761                     client_fatal(cl);
762                     return;
763                 }
764                 switch (a->which)
765                 {
766                     case Z_APDU_initResponse:
767                         do_initResponse(i, a);
768                         break;
769                     case Z_APDU_searchResponse:
770                         do_searchResponse(i, a);
771                         break;
772                     case Z_APDU_presentResponse:
773                         do_presentResponse(i, a);
774                         break;
775                     default:
776                         yaz_log(YLOG_WARN, "Unexpected result from server");
777                         client_fatal(cl);
778                         return;
779                 }
780                 // We aren't expecting staggered output from target
781                 // if (cs_more(t->link))
782                 //    iochan_setevent(i, EVENT_INPUT);
783             }
784             else  // we throw away response and go to idle mode
785             {
786                 yaz_log(YLOG_DEBUG, "Ignoring result of expired operation");
787                 cl->state = Client_Idle;
788             }
789         }
790         /* if len==1 we do nothing but wait for more input */
791     }
792
793     if (cl->state == Client_Connected) {
794         send_init(i);
795     }
796
797     if (cl->state == Client_Idle)
798     {
799         if (cl->requestid != se->requestid && *se->query) {
800             send_search(i);
801         }
802         else if (cl->hits > 0 && cl->records < global_parameters.toget &&
803             cl->records < cl->hits) {
804             send_present(i);
805         }
806     }
807 }
808
809 // Disassociate connection from client
810 static void connection_release(struct connection *co)
811 {
812     struct client *cl = co->client;
813
814     yaz_log(YLOG_DEBUG, "Connection release %s", co->host->hostport);
815     if (!cl)
816         return;
817     cl->connection = 0;
818     co->client = 0;
819 }
820
821 // Close connection and recycle structure
822 static void connection_destroy(struct connection *co)
823 {
824     struct host *h = co->host;
825     cs_close(co->link);
826     iochan_destroy(co->iochan);
827
828     yaz_log(YLOG_DEBUG, "Connection destroy %s", co->host->hostport);
829     if (h->connections == co)
830         h->connections = co->next;
831     else
832     {
833         struct connection *pco;
834         for (pco = h->connections; pco && pco->next != co; pco = pco->next)
835             ;
836         if (pco)
837             pco->next = co->next;
838         else
839             abort();
840     }
841     if (co->client)
842     {
843         if (co->client->state != Client_Idle)
844             co->client->state = Client_Disconnected;
845         co->client->connection = 0;
846     }
847     co->next = connection_freelist;
848     connection_freelist = co;
849 }
850
851 // Creates a new connection for client, associated with the host of 
852 // client's database
853 static struct connection *connection_create(struct client *cl)
854 {
855     struct connection *new;
856     COMSTACK link; 
857     int res;
858     void *addr;
859
860     yaz_log(YLOG_DEBUG, "Connection create %s", cl->database->url);
861     if (!(link = cs_create(tcpip_type, 0, PROTO_Z3950)))
862     {
863         yaz_log(YLOG_FATAL|YLOG_ERRNO, "Failed to create comstack");
864         exit(1);
865     }
866
867     if (!(addr = cs_straddr(link, cl->database->host->ipport)))
868     {
869         yaz_log(YLOG_WARN|YLOG_ERRNO, "Lookup of IP address failed?");
870         return 0;
871     }
872
873     res = cs_connect(link, addr);
874     if (res < 0)
875     {
876         yaz_log(YLOG_WARN|YLOG_ERRNO, "cs_connect %s", cl->database->url);
877         return 0;
878     }
879
880     if ((new = connection_freelist))
881         connection_freelist = new->next;
882     else
883     {
884         new = xmalloc(sizeof (struct connection));
885         new->ibuf = 0;
886         new->ibufsize = 0;
887     }
888     new->state = Conn_Connecting;
889     new->host = cl->database->host;
890     new->next = new->host->connections;
891     new->host->connections = new;
892     new->client = cl;
893     cl->connection = new;
894     new->link = link;
895
896     new->iochan = iochan_create(cs_fileno(link), handler, 0);
897     iochan_setdata(new->iochan, new);
898     new->iochan->next = channel_list;
899     channel_list = new->iochan;
900     return new;
901 }
902
903 // Close connection and set state to error
904 static void client_fatal(struct client *cl)
905 {
906     yaz_log(YLOG_WARN, "Fatal error from %s", cl->database->url);
907     connection_destroy(cl->connection);
908     cl->state = Client_Error;
909 }
910
911 // Ensure that client has a connection associated
912 static int client_prep_connection(struct client *cl)
913 {
914     struct connection *co;
915     struct session *se = cl->session;
916     struct host *host = cl->database->host;
917
918     co = cl->connection;
919
920     yaz_log(YLOG_DEBUG, "Client prep %s", cl->database->url);
921
922     if (!co)
923     {
924         // See if someone else has an idle connection
925         // We should look at timestamps here to select the longest-idle connection
926         for (co = host->connections; co; co = co->next)
927             if (co->state == Conn_Open && (!co->client || co->client->session != se))
928                 break;
929         if (co)
930         {
931             connection_release(co);
932             cl->connection = co;
933             co->client = cl;
934         }
935         else
936             co = connection_create(cl);
937     }
938     if (co)
939     {
940         if (co->state == Conn_Connecting)
941             cl->state = Client_Connecting;
942         else if (co->state == Conn_Open)
943         {
944             if (cl->state == Client_Error || cl->state == Client_Disconnected)
945                 cl->state = Client_Idle;
946         }
947         iochan_setflag(co->iochan, EVENT_OUTPUT);
948         return 1;
949     }
950     else
951         return 0;
952 }
953
954 void load_simpletargets(const char *fn)
955 {
956     FILE *f = fopen(fn, "r");
957     char line[256];
958
959     if (!f)
960     {
961         yaz_log(YLOG_WARN|YLOG_ERRNO, "open %s", fn);
962         exit(1);
963     }
964
965     while (fgets(line, 255, f))
966     {
967         char *url, *db;
968         struct host *host;
969         struct database *database;
970
971         if (strncmp(line, "target ", 7))
972             continue;
973         url = line + 7;
974         url[strlen(url) - 1] = '\0';
975         yaz_log(LOG_DEBUG, "Target: %s", url);
976         if ((db = strchr(url, '/')))
977             *(db++) = '\0';
978         else
979             db = "Default";
980
981         for (host = hosts; host; host = host->next)
982             if (!strcmp(url, host->hostport))
983                 break;
984         if (!host)
985         {
986             struct addrinfo *addrinfo, hints;
987             char *port;
988             char ipport[128];
989             unsigned char addrbuf[4];
990             int res;
991
992             host = xmalloc(sizeof(struct host));
993             host->hostport = xstrdup(url);
994             host->connections = 0;
995
996             if ((port = strchr(url, ':')))
997                 *(port++) = '\0';
998             else
999                 port = "210";
1000
1001             hints.ai_flags = 0;
1002             hints.ai_family = PF_INET;
1003             hints.ai_socktype = SOCK_STREAM;
1004             hints.ai_protocol = IPPROTO_TCP;
1005             hints.ai_addrlen = 0;
1006             hints.ai_addr = 0;
1007             hints.ai_canonname = 0;
1008             hints.ai_next = 0;
1009             // This is not robust code. It assumes that getaddrinfo returns AF_INET
1010             // address.
1011             if ((res = getaddrinfo(url, port, &hints, &addrinfo)))
1012             {
1013                 yaz_log(YLOG_WARN, "Failed to resolve %s: %s", url, gai_strerror(res));
1014                 continue;
1015             }
1016             assert(addrinfo->ai_family == PF_INET);
1017             memcpy(addrbuf, &((struct sockaddr_in*)addrinfo->ai_addr)->sin_addr.s_addr, 4);
1018             sprintf(ipport, "%hhd.%hhd.%hhd.%hhd:%s",
1019                     addrbuf[0], addrbuf[1], addrbuf[2], addrbuf[3], port);
1020             host->ipport = xstrdup(ipport);
1021             freeaddrinfo(addrinfo);
1022             host->next = hosts;
1023             hosts = host;
1024         }
1025         database = xmalloc(sizeof(struct database));
1026         database->host = host;
1027         database->url = xmalloc(strlen(url) + strlen(db) + 2);
1028         strcpy(database->url, url);
1029         strcat(database->url, "/");
1030         strcat(database->url, db);
1031         strcpy(database->databases[0], db);
1032         *database->databases[1] = '\0';
1033         database->errors = 0;
1034         database->next = databases;
1035         databases = database;
1036
1037     }
1038     fclose(f);
1039 }
1040
1041 static void pull_terms(NMEM nmem, struct ccl_rpn_node *n, char **termlist, int *num)
1042 {
1043     switch (n->kind)
1044     {
1045         case CCL_RPN_AND:
1046         case CCL_RPN_OR:
1047         case CCL_RPN_NOT:
1048         case CCL_RPN_PROX:
1049             pull_terms(nmem, n->u.p[0], termlist, num);
1050             pull_terms(nmem, n->u.p[1], termlist, num);
1051             break;
1052         case CCL_RPN_TERM:
1053             termlist[(*num)++] = nmem_strdup(nmem, n->u.t.term);
1054             break;
1055         default: // NOOP
1056             break;
1057     }
1058 }
1059
1060 // Extract terms from query into null-terminated termlist
1061 static int extract_terms(NMEM nmem, char *query, char **termlist)
1062 {
1063     int error, pos;
1064     struct ccl_rpn_node *n;
1065     int num = 0;
1066
1067     n = ccl_find_str(global_parameters.ccl_filter, query, &error, &pos);
1068     if (!n)
1069         return -1;
1070     pull_terms(nmem, n, termlist, &num);
1071     termlist[num] = 0;
1072     ccl_rpn_delete(n);
1073     return 0;
1074 }
1075
1076 static struct client *client_create(void)
1077 {
1078     struct client *r;
1079     if (client_freelist)
1080     {
1081         r = client_freelist;
1082         client_freelist = client_freelist->next;
1083     }
1084     else
1085         r = xmalloc(sizeof(struct client));
1086     r->database = 0;
1087     r->connection = 0;
1088     r->session = 0;
1089     r->hits = 0;
1090     r->records = 0;
1091     r->setno = 0;
1092     r->requestid = -1;
1093     r->diagnostic = 0;
1094     r->state = Client_Disconnected;
1095     r->next = 0;
1096     return r;
1097 }
1098
1099 void client_destroy(struct client *c)
1100 {
1101     struct session *se = c->session;
1102     if (c == se->clients)
1103         se->clients = c->next;
1104     else
1105     {
1106         struct client *cc;
1107         for (cc = se->clients; cc && cc->next != c; cc = cc->next)
1108             ;
1109         if (cc)
1110             cc->next = c->next;
1111     }
1112     if (c->connection)
1113         connection_release(c->connection);
1114     c->next = client_freelist;
1115     client_freelist = c;
1116 }
1117
1118 void session_set_watch(struct session *s, int what, session_watchfun fun, void *data)
1119 {
1120     s->watchlist[what].fun = fun;
1121     s->watchlist[what].data = data;
1122 }
1123
1124 // This should be extended with parameters to control selection criteria
1125 // Associates a set of clients with a session;
1126 int select_targets(struct session *se)
1127 {
1128     struct database *db;
1129     int c = 0;
1130
1131     while (se->clients)
1132         client_destroy(se->clients);
1133     for (db = databases; db; db = db->next)
1134     {
1135         struct client *cl = client_create();
1136         cl->database = db;
1137         cl->session = se;
1138         cl->next = se->clients;
1139         se->clients = cl;
1140         c++;
1141     }
1142     return c;
1143 }
1144
1145 char *search(struct session *se, char *query)
1146 {
1147     int live_channels = 0;
1148     struct client *cl;
1149
1150     yaz_log(YLOG_DEBUG, "Search");
1151
1152     strcpy(se->query, query);
1153     se->requestid++;
1154     nmem_reset(se->nmem);
1155     for (cl = se->clients; cl; cl = cl->next)
1156     {
1157         cl->hits = -1;
1158         cl->records = 0;
1159         cl->diagnostic = 0;
1160
1161         if (client_prep_connection(cl))
1162             live_channels++;
1163     }
1164     if (live_channels)
1165     {
1166         char *p[512];
1167         int maxrecs = live_channels * global_parameters.toget;
1168         se->termlist = termlist_create(se->nmem, maxrecs, 15);
1169         se->reclist = reclist_create(se->nmem, maxrecs);
1170         extract_terms(se->nmem, query, p);
1171         se->relevance = relevance_create(se->nmem, (const char **) p, maxrecs);
1172         se->total_records = se->total_hits = 0;
1173     }
1174     else
1175         return "NOTARGETS";
1176
1177     return 0;
1178 }
1179
1180 void destroy_session(struct session *s)
1181 {
1182     yaz_log(YLOG_LOG, "Destroying session");
1183     while (s->clients)
1184         client_destroy(s->clients);
1185     nmem_destroy(s->nmem);
1186     wrbuf_free(s->wrbuf, 1);
1187 }
1188
1189 struct session *new_session() 
1190 {
1191     int i;
1192     struct session *session = xmalloc(sizeof(*session));
1193
1194     yaz_log(YLOG_DEBUG, "New pazpar2 session");
1195     
1196     session->total_hits = 0;
1197     session->total_records = 0;
1198     session->termlist = 0;
1199     session->reclist = 0;
1200     session->requestid = -1;
1201     session->clients = 0;
1202     session->query[0] = '\0';
1203     session->nmem = nmem_create();
1204     session->wrbuf = wrbuf_alloc();
1205     for (i = 0; i <= SESSION_WATCH_MAX; i++)
1206     {
1207         session->watchlist[i].data = 0;
1208         session->watchlist[i].fun = 0;
1209     }
1210
1211     select_targets(session);
1212
1213     return session;
1214 }
1215
1216 struct hitsbytarget *hitsbytarget(struct session *se, int *count)
1217 {
1218     static struct hitsbytarget res[1000]; // FIXME MM
1219     struct client *cl;
1220
1221     *count = 0;
1222     for (cl = se->clients; cl; cl = cl->next)
1223     {
1224         strcpy(res[*count].id, cl->database->host->hostport);
1225         res[*count].hits = cl->hits;
1226         res[*count].records = cl->records;
1227         res[*count].diagnostic = cl->diagnostic;
1228         res[*count].state = client_states[cl->state];
1229         res[*count].connected  = cl->connection ? 1 : 0;
1230         (*count)++;
1231     }
1232
1233     return res;
1234 }
1235
1236 struct termlist_score **termlist(struct session *s, int *num)
1237 {
1238     return termlist_highscore(s->termlist, num);
1239 }
1240
1241 struct record **show(struct session *s, int start, int *num, int *total, int *sumhits)
1242 {
1243     struct record **recs = nmem_malloc(s->nmem, *num * sizeof(struct record *));
1244     int i;
1245
1246     relevance_prepare_read(s->relevance, s->reclist);
1247
1248     *total = s->reclist->num_records;
1249     *sumhits = s->total_hits;
1250
1251     for (i = 0; i < start; i++)
1252         if (!reclist_read_record(s->reclist))
1253         {
1254             *num = 0;
1255             return 0;
1256         }
1257
1258     for (i = 0; i < *num; i++)
1259     {
1260         struct record *r = reclist_read_record(s->reclist);
1261         if (!r)
1262         {
1263             *num = i;
1264             break;
1265         }
1266         recs[i] = r;
1267     }
1268     return recs;
1269 }
1270
1271 void statistics(struct session *se, struct statistics *stat)
1272 {
1273     struct client *cl;
1274     int count = 0;
1275
1276     bzero(stat, sizeof(*stat));
1277     for (cl = se->clients; cl; cl = cl->next)
1278     {
1279         if (!cl->connection)
1280             stat->num_no_connection++;
1281         switch (cl->state)
1282         {
1283             case Client_Connecting: stat->num_connecting++; break;
1284             case Client_Initializing: stat->num_initializing++; break;
1285             case Client_Searching: stat->num_searching++; break;
1286             case Client_Presenting: stat->num_presenting++; break;
1287             case Client_Idle: stat->num_idle++; break;
1288             case Client_Failed: stat->num_failed++; break;
1289             case Client_Error: stat->num_error++; break;
1290             default: break;
1291         }
1292         count++;
1293     }
1294     stat->num_hits = se->total_hits;
1295     stat->num_records = se->total_records;
1296
1297     stat->num_clients = count;
1298 }
1299
1300 static CCL_bibset load_cclfile(const char *fn)
1301 {
1302     CCL_bibset res = ccl_qual_mk();
1303     if (ccl_qual_fname(res, fn) < 0)
1304     {
1305         yaz_log(YLOG_FATAL|YLOG_ERRNO, "%s", fn);
1306         exit(1);
1307     }
1308     return res;
1309 }
1310
1311 int main(int argc, char **argv)
1312 {
1313     int ret;
1314     char *arg;
1315     int setport = 0;
1316
1317     if (signal(SIGPIPE, SIG_IGN) < 0)
1318         yaz_log(YLOG_WARN|YLOG_ERRNO, "signal");
1319
1320     yaz_log_init(YLOG_DEFAULT_LEVEL|YLOG_DEBUG, "pazpar2", 0);
1321
1322     while ((ret = options("c:h:p:C:s:", argv, argc, &arg)) != -2)
1323     {
1324         switch (ret) {
1325             case 'c':
1326                 command_init(atoi(arg));
1327                 setport++;
1328                 break;
1329             case 'h':
1330                 http_init(atoi(arg));
1331                 setport++;
1332                 break;
1333             case 'C':
1334                 global_parameters.ccl_filter = load_cclfile(arg);
1335                 break;
1336             case 'p':
1337                 http_set_proxyaddr(arg);
1338                 break;
1339             case 's':
1340                 load_simpletargets(arg);
1341                 break;
1342             default:
1343                 fprintf(stderr, "Usage: pazpar2\n"
1344                         "    -h httpport             (REST)\n"
1345                         "    -c cmdport              (telnet-style)\n"
1346                         "    -C cclconfig\n"
1347                         "    -s simpletargetfile\n"
1348                         "    -p hostname[:portno]    (HTTP proxy)\n");
1349                 exit(1);
1350         }
1351     }
1352
1353     if (!setport)
1354     {
1355         fprintf(stderr, "Set command port with -h or -c\n");
1356         exit(1);
1357     }
1358
1359     global_parameters.ccl_filter = load_cclfile("default.bib");
1360     global_parameters.yaz_marc = yaz_marc_create();
1361     yaz_marc_subfield_str(global_parameters.yaz_marc, "\t");
1362     global_parameters.odr_in = odr_createmem(ODR_DECODE);
1363     global_parameters.odr_out = odr_createmem(ODR_ENCODE);
1364
1365     event_loop(&channel_list);
1366
1367     return 0;
1368 }
1369
1370 /*
1371  * Local variables:
1372  * c-basic-offset: 4
1373  * indent-tabs-mode: nil
1374  * End:
1375  * vim: shiftwidth=4 tabstop=8 expandtab
1376  */