Implemented session timeout; ping command
[pazpar2-moved-to-github.git] / pazpar2.c
1 /* $Id: pazpar2.c,v 1.12 2006-12-12 02:36:24 quinn Exp $ */;
2
3 #include <stdlib.h>
4 #include <stdio.h>
5 #include <string.h>
6 #include <sys/time.h>
7 #include <unistd.h>
8 #include <sys/socket.h>
9 #include <netdb.h>
10 #include <signal.h>
11 #include <ctype.h>
12 #include <assert.h>
13
14 #include <yaz/comstack.h>
15 #include <yaz/tcpip.h>
16 #include <yaz/proto.h>
17 #include <yaz/readconf.h>
18 #include <yaz/pquery.h>
19 #include <yaz/yaz-util.h>
20
21 #include "pazpar2.h"
22 #include "eventl.h"
23 #include "command.h"
24 #include "http.h"
25 #include "termlists.h"
26 #include "reclists.h"
27 #include "relevance.h"
28
29 #define PAZPAR2_VERSION "0.1"
30 #define MAX_CHUNK 15
31
32 static void client_fatal(struct client *cl);
33 static void connection_destroy(struct connection *co);
34 static int client_prep_connection(struct client *cl);
35 static void ingest_records(struct client *cl, Z_Records *r);
36
37 IOCHAN channel_list = 0;  // Master list of connections we're listening to.
38
39 static struct connection *connection_freelist = 0;
40 static struct client *client_freelist = 0;
41
42 static struct host *hosts = 0;  // The hosts we know about 
43 static struct database *databases = 0; // The databases we know about
44
45 static char *client_states[] = {
46     "Client_Connecting",
47     "Client_Connected",
48     "Client_Idle",
49     "Client_Initializing",
50     "Client_Searching",
51     "Client_Presenting",
52     "Client_Error",
53     "Client_Failed",
54     "Client_Disconnected",
55     "Client_Stopped"
56 };
57
58 struct parameters global_parameters = 
59 {
60     30,
61     "81",
62     "Index Data PazPar2 (MasterKey)",
63     PAZPAR2_VERSION,
64     600, // 10 minutes
65     60,
66     100,
67     MAX_CHUNK,
68     0,
69     0,
70     0,
71     0
72 };
73
74
75 static int send_apdu(struct client *c, Z_APDU *a)
76 {
77     struct connection *co = c->connection;
78     char *buf;
79     int len, r;
80
81     if (!z_APDU(global_parameters.odr_out, &a, 0, 0))
82     {
83         odr_perror(global_parameters.odr_out, "Encoding APDU");
84         abort();
85     }
86     buf = odr_getbuf(global_parameters.odr_out, &len, 0);
87     r = cs_put(co->link, buf, len);
88     if (r < 0)
89     {
90         yaz_log(YLOG_WARN, "cs_put: %s", cs_errmsg(cs_errno(co->link)));
91         return -1;
92     }
93     else if (r == 1)
94     {
95         fprintf(stderr, "cs_put incomplete (ParaZ does not handle that)\n");
96         exit(1);
97     }
98     odr_reset(global_parameters.odr_out); /* release the APDU structure  */
99     co->state = Conn_Waiting;
100     return 0;
101 }
102
103
104 static void send_init(IOCHAN i)
105 {
106     struct connection *co = iochan_getdata(i);
107     struct client *cl = co->client;
108     Z_APDU *a = zget_APDU(global_parameters.odr_out, Z_APDU_initRequest);
109
110     a->u.initRequest->implementationId = global_parameters.implementationId;
111     a->u.initRequest->implementationName = global_parameters.implementationName;
112     a->u.initRequest->implementationVersion =
113         global_parameters.implementationVersion;
114     ODR_MASK_SET(a->u.initRequest->options, Z_Options_search);
115     ODR_MASK_SET(a->u.initRequest->options, Z_Options_present);
116     ODR_MASK_SET(a->u.initRequest->options, Z_Options_namedResultSets);
117
118     ODR_MASK_SET(a->u.initRequest->protocolVersion, Z_ProtocolVersion_1);
119     ODR_MASK_SET(a->u.initRequest->protocolVersion, Z_ProtocolVersion_2);
120     ODR_MASK_SET(a->u.initRequest->protocolVersion, Z_ProtocolVersion_3);
121     if (send_apdu(cl, a) >= 0)
122     {
123         iochan_setflags(i, EVENT_INPUT);
124         cl->state = Client_Initializing;
125     }
126     else
127         cl->state = Client_Error;
128     odr_reset(global_parameters.odr_out);
129 }
130
131 static void send_search(IOCHAN i)
132 {
133     struct connection *co = iochan_getdata(i);
134     struct client *cl = co->client; 
135     struct session *se = cl->session;
136     struct database *db = cl->database;
137     Z_APDU *a = zget_APDU(global_parameters.odr_out, Z_APDU_searchRequest);
138     int ndb, cerror, cpos;
139     char **databaselist;
140     Z_Query *zquery;
141     struct ccl_rpn_node *cn;
142     int ssub = 0, lslb = 100000, mspn = 10;
143
144     yaz_log(YLOG_DEBUG, "Sending search");
145
146     cn = ccl_find_str(global_parameters.ccl_filter, se->query, &cerror, &cpos);
147     if (!cn)
148         return;
149     a->u.searchRequest->query = zquery = odr_malloc(global_parameters.odr_out,
150             sizeof(Z_Query));
151     zquery->which = Z_Query_type_1;
152     zquery->u.type_1 = ccl_rpn_query(global_parameters.odr_out, cn);
153     ccl_rpn_delete(cn);
154
155     for (ndb = 0; *db->databases[ndb]; ndb++)
156         ;
157     databaselist = odr_malloc(global_parameters.odr_out, sizeof(char*) * ndb);
158     for (ndb = 0; *db->databases[ndb]; ndb++)
159         databaselist[ndb] = db->databases[ndb];
160
161     a->u.presentRequest->preferredRecordSyntax =
162             yaz_oidval_to_z3950oid(global_parameters.odr_out,
163             CLASS_RECSYN, VAL_USMARC);
164     a->u.searchRequest->smallSetUpperBound = &ssub;
165     a->u.searchRequest->largeSetLowerBound = &lslb;
166     a->u.searchRequest->mediumSetPresentNumber = &mspn;
167     a->u.searchRequest->resultSetName = "Default";
168     a->u.searchRequest->databaseNames = databaselist;
169     a->u.searchRequest->num_databaseNames = ndb;
170
171     if (send_apdu(cl, a) >= 0)
172     {
173         iochan_setflags(i, EVENT_INPUT);
174         cl->state = Client_Searching;
175         cl->requestid = se->requestid;
176     }
177     else
178         cl->state = Client_Error;
179
180     odr_reset(global_parameters.odr_out);
181 }
182
183 static void send_present(IOCHAN i)
184 {
185     struct connection *co = iochan_getdata(i);
186     struct client *cl = co->client; 
187     Z_APDU *a = zget_APDU(global_parameters.odr_out, Z_APDU_presentRequest);
188     int toget;
189     int start = cl->records + 1;
190
191     toget = global_parameters.chunk;
192     if (toget > cl->hits - cl->records)
193         toget = cl->hits - cl->records;
194
195     yaz_log(YLOG_DEBUG, "Trying to present %d records\n", toget);
196
197     a->u.presentRequest->resultSetStartPoint = &start;
198     a->u.presentRequest->numberOfRecordsRequested = &toget;
199
200     a->u.presentRequest->resultSetId = "Default";
201
202     a->u.presentRequest->preferredRecordSyntax =
203             yaz_oidval_to_z3950oid(global_parameters.odr_out,
204             CLASS_RECSYN, VAL_USMARC);
205
206     if (send_apdu(cl, a) >= 0)
207     {
208         iochan_setflags(i, EVENT_INPUT);
209         cl->state = Client_Presenting;
210     }
211     else
212         cl->state = Client_Error;
213     odr_reset(global_parameters.odr_out);
214 }
215
216 static void do_initResponse(IOCHAN i, Z_APDU *a)
217 {
218     struct connection *co = iochan_getdata(i);
219     struct client *cl = co->client;
220     Z_InitResponse *r = a->u.initResponse;
221
222     yaz_log(YLOG_DEBUG, "Received init response");
223
224     if (*r->result)
225     {
226         cl->state = Client_Idle;
227     }
228     else
229         cl->state = Client_Failed; // FIXME need to do something to the connection
230 }
231
232 static void do_searchResponse(IOCHAN i, Z_APDU *a)
233 {
234     struct connection *co = iochan_getdata(i);
235     struct client *cl = co->client;
236     struct session *se = cl->session;
237     Z_SearchResponse *r = a->u.searchResponse;
238
239     yaz_log(YLOG_DEBUG, "Searchresponse (status=%d)", *r->searchStatus);
240
241     if (*r->searchStatus)
242     {
243         cl->hits = *r->resultCount;
244         se->total_hits += cl->hits;
245         if (r->presentStatus && !*r->presentStatus && r->records)
246         {
247             yaz_log(YLOG_DEBUG, "Records in search response");
248             cl->records += *r->numberOfRecordsReturned;
249             ingest_records(cl, r->records);
250         }
251         cl->state = Client_Idle;
252     }
253     else
254     {          /*"FAILED"*/
255         cl->hits = 0;
256         cl->state = Client_Error;
257         if (r->records) {
258             Z_Records *recs = r->records;
259             if (recs->which == Z_Records_NSD)
260             {
261                 yaz_log(YLOG_WARN, "Non-surrogate diagnostic");
262                 cl->diagnostic = *recs->u.nonSurrogateDiagnostic->condition;
263                 cl->state = Client_Error;
264             }
265         }
266     }
267 }
268
269 const char *find_field(const char *rec, const char *field)
270 {
271     char lbuf[5];
272     char *line;
273
274     lbuf[0] = '\n';
275     strcpy(lbuf + 1, field);
276
277     if ((line = strstr(rec, lbuf)))
278         return ++line;
279     else
280         return 0;
281 }
282
283 const char *find_subfield(const char *field, char subfield)
284 {
285     const char *p = field;
286
287     while (*p && *p != '\n')
288     {
289         while (*p != '\n' && *p != '\t')
290             p++;
291         if (*p == '\t' && *(++p) == subfield) {
292             if (*(++p) == ' ')
293             {
294                 while (isspace(*p))
295                     p++;
296                 return p;
297             }
298         }
299     }
300     return 0;
301 }
302
303 // Extract 245 $a $b 100 $a
304 char *extract_title(struct session *s, const char *rec)
305 {
306     const char *field, *subfield;
307     char *e, *ef;
308     unsigned char *obuf, *p;
309
310     wrbuf_rewind(s->wrbuf);
311
312     if (!(field = find_field(rec, "245")))
313         return 0;
314     if (!(subfield = find_subfield(field, 'a')))
315         return 0;
316     ef = index(subfield, '\n');
317     if ((e = index(subfield, '\t')) && e < ef)
318         ef = e;
319     if (ef)
320     {
321         wrbuf_write(s->wrbuf, subfield, ef - subfield);
322         if ((subfield = find_subfield(field, 'b'))) 
323         {
324             ef = index(subfield, '\n');
325             if ((e = index(subfield, '\t')) && e < ef)
326                 ef = e;
327             if (ef)
328             {
329                 wrbuf_putc(s->wrbuf, ' ');
330                 wrbuf_write(s->wrbuf, subfield, ef - subfield);
331             }
332         }
333     }
334     if ((field = find_field(rec, "100")))
335     {
336         if ((subfield = find_subfield(field, 'a')))
337         {
338             ef = index(subfield, '\n');
339             if ((e = index(subfield, '\t')) && e < ef)
340                 ef = e;
341             if (ef)
342             {
343                 wrbuf_puts(s->wrbuf, ", by ");
344                 wrbuf_write(s->wrbuf, subfield, ef - subfield);
345             }
346         }
347     }
348     wrbuf_putc(s->wrbuf, '\0');
349     obuf = (unsigned char*) nmem_strdup(s->nmem, wrbuf_buf(s->wrbuf));
350     for (p = obuf; *p; p++)
351         if (*p == '&' || *p == '<' || *p > 122 || *p < ' ')
352             *p = ' ';
353     return (char*) obuf;
354 }
355
356 // Extract 245 $a $b 100 $a
357 char *extract_mergekey(struct session *s, const char *rec)
358 {
359     const char *field, *subfield;
360     char *e, *ef;
361     char *out, *p, *pout;
362
363     wrbuf_rewind(s->wrbuf);
364
365     if (!(field = find_field(rec, "245")))
366         return 0;
367     if (!(subfield = find_subfield(field, 'a')))
368         return 0;
369     ef = index(subfield, '\n');
370     if ((e = index(subfield, '\t')) && e < ef)
371         ef = e;
372     if (ef)
373     {
374         wrbuf_write(s->wrbuf, subfield, ef - subfield);
375         if ((subfield = find_subfield(field, 'b'))) 
376         {
377             ef = index(subfield, '\n');
378             if ((e = index(subfield, '\t')) && e < ef)
379                 ef = e;
380             if (ef)
381             {
382                 wrbuf_puts(s->wrbuf, " field "); 
383                 wrbuf_write(s->wrbuf, subfield, ef - subfield);
384             }
385         }
386     }
387     if ((field = find_field(rec, "100")))
388     {
389         if ((subfield = find_subfield(field, 'a')))
390         {
391             ef = index(subfield, '\n');
392             if ((e = index(subfield, '\t')) && e < ef)
393                 ef = e;
394             if (ef)
395             {
396                 wrbuf_puts(s->wrbuf, " field "); 
397                 wrbuf_write(s->wrbuf, subfield, ef - subfield);
398             }
399         }
400     }
401     wrbuf_putc(s->wrbuf, '\0');
402     p = wrbuf_buf(s->wrbuf);
403     out = pout = nmem_malloc(s->nmem, strlen(p) + 1);
404
405     while (*p)
406     {
407         while (isalnum(*p))
408             *(pout++) = tolower(*(p++));
409         while (*p && !isalnum(*p))
410             p++;
411         *(pout++) = ' ';
412     }
413     if (out != pout)
414         *(--pout) = '\0';
415
416     return out;
417 }
418
419 #ifdef RECHEAP
420 static void push_record(struct session *s, struct record *r)
421 {
422     int p;
423     assert(s->recheap_max + 1 < s->recheap_size);
424
425     s->recheap[p = ++s->recheap_max] = r;
426     while (p > 0)
427     {
428         int parent = (p - 1) >> 1;
429         if (strcmp(s->recheap[p]->merge_key, s->recheap[parent]->merge_key) < 0)
430         {
431             struct record *tmp;
432             tmp = s->recheap[parent];
433             s->recheap[parent] = s->recheap[p];
434             s->recheap[p] = tmp;
435             p = parent;
436         }
437         else
438             break;
439     }
440 }
441
442 static struct record *top_record(struct session *s)
443 {
444     return s-> recheap_max >= 0 ?  s->recheap[0] : 0;
445 }
446
447 static struct record *pop_record(struct session *s)
448 {
449     struct record *res;
450     int p = 0;
451     int lastnonleaf = (s->recheap_max - 1) >> 1;
452
453     if (s->recheap_max < 0)
454         return 0;
455
456     res = s->recheap[0];
457
458     s->recheap[p] = s->recheap[s->recheap_max--];
459
460     while (p <= lastnonleaf)
461     {
462         int right = (p + 1) << 1;
463         int left = right - 1;
464         int min = left;
465
466         if (right < s->recheap_max &&
467                 strcmp(s->recheap[right]->merge_key, s->recheap[left]->merge_key) < 0)
468             min = right;
469         if (strcmp(s->recheap[min]->merge_key, s->recheap[p]->merge_key) < 0)
470         {
471             struct record *tmp = s->recheap[min];
472             s->recheap[min] = s->recheap[p];
473             s->recheap[p] = tmp;
474             p = min;
475         }
476         else
477             break;
478     }
479     return res;
480 }
481
482 // Like pop_record but collapses identical (merge_key) records
483 // The heap will contain multiple independent matching records and possibly
484 // one cluster, created the last time the list was scanned
485 static struct record *pop_mrecord(struct session *s)
486 {
487     struct record *this;
488     struct record *next;
489
490     if (!(this = pop_record(s)))
491         return 0;
492
493     // Collapse identical records
494     while ((next = top_record(s)))
495     {
496         struct record *p, *tmpnext;
497         if (strcmp(this->merge_key, next->merge_key))
498             break;
499         // Absorb record (and clustersiblings) into a supercluster
500         for (p = next; p; p = tmpnext) {
501             tmpnext = p->next_cluster;
502             p->next_cluster = this->next_cluster;
503             this->next_cluster = p;
504         }
505
506         pop_record(s);
507     }
508     return this;
509 }
510
511 // Reads records in sort order. Store records in top of heapspace until rewind is called.
512 static struct record *read_recheap(struct session *s)
513 {
514     struct record *r = pop_mrecord(s);
515
516     if (r)
517     {
518         if (s->recheap_scratch < 0)
519             s->recheap_scratch = s->recheap_size;
520         s->recheap[--s->recheap_scratch] = r;
521     }
522
523     return r;
524 }
525
526 // Return records to heap after read
527 static void rewind_recheap(struct session *s)
528 {
529     while (s->recheap_scratch >= 0) {
530         push_record(s, s->recheap[s->recheap_scratch++]);
531         if (s->recheap_scratch >= s->recheap_size)
532             s->recheap_scratch = -1;
533     }
534 }
535
536 #endif
537
538 // FIXME needs to be generalized. Should flexibly generate X lists per search
539 static void extract_subject(struct session *s, const char *rec)
540 {
541     const char *field, *subfield;
542
543     while ((field = find_field(rec, "650")))
544     {
545         rec = field; 
546         if ((subfield = find_subfield(field, 'a')))
547         {
548             char *e, *ef;
549             char buf[1024];
550             int len;
551
552             ef = index(subfield, '\n');
553             if (!ef)
554                 return;
555             if ((e = index(subfield, '\t')) && e < ef)
556                 ef = e;
557             while (ef > subfield && !isalpha(*(ef - 1)) && *(ef - 1) != ')')
558                 ef--;
559             len = ef - subfield;
560             assert(len < 1023);
561             memcpy(buf, subfield, len);
562             buf[len] = '\0';
563             if (*buf)
564                 termlist_insert(s->termlist, buf);
565         }
566     }
567 }
568
569 static void pull_relevance_field(struct session *s, struct record *head, const char *rec,
570         char *field, int mult)
571 {
572     const char *fb;
573     while ((fb = find_field(rec, field)))
574     {
575         char *ffield = strchr(fb, '\t');
576         if (!ffield)
577             return;
578         char *eol = strchr(ffield, '\n');
579         if (!eol)
580             return;
581         relevance_countwords(s->relevance, head, ffield, eol - ffield, mult);
582         rec = field + 1; // Crude way to cause a loop through repeating fields
583     }
584 }
585
586 static void pull_relevance_keys(struct session *s, struct record *head,  struct record *rec)
587 {
588     relevance_newrec(s->relevance, head);
589     pull_relevance_field(s, head, rec->buf, "100", 2);
590     pull_relevance_field(s, head, rec->buf, "245", 4);
591     //pull_relevance_field(s, head, rec->buf, "530", 1);
592     pull_relevance_field(s, head, rec->buf, "630", 1);
593     pull_relevance_field(s, head, rec->buf, "650", 1);
594     pull_relevance_field(s, head, rec->buf, "700", 1);
595     relevance_donerecord(s->relevance, head);
596 }
597
598 static struct record *ingest_record(struct client *cl, char *buf, int len)
599 {
600     struct session *se = cl->session;
601     struct record *res;
602     struct record *head;
603     const char *recbuf;
604
605     wrbuf_rewind(se->wrbuf);
606     yaz_marc_xml(global_parameters.yaz_marc, YAZ_MARC_LINE);
607     if (yaz_marc_decode_wrbuf(global_parameters.yaz_marc, buf, len, se->wrbuf) < 0)
608     {
609         yaz_log(YLOG_WARN, "Failed to decode MARC record");
610         return 0;
611     }
612     wrbuf_putc(se->wrbuf, '\0');
613     recbuf = wrbuf_buf(se->wrbuf);
614
615     res = nmem_malloc(se->nmem, sizeof(struct record));
616     res->buf = nmem_strdup(se->nmem, recbuf);
617
618     extract_subject(se, res->buf);
619
620     res->title = extract_title(se, res->buf);
621     res->merge_key = extract_mergekey(se, res->buf);
622     if (!res->merge_key)
623         return 0;
624     res->client = cl;
625     res->next_cluster = 0;
626     res->target_offset = -1;
627     res->term_frequency_vec = 0;
628
629     head = reclist_insert(se->reclist, res);
630
631     pull_relevance_keys(se, head, res);
632
633     se->total_records++;
634
635     return res;
636 }
637
638 static void ingest_records(struct client *cl, Z_Records *r)
639 {
640     struct record *rec;
641     Z_NamePlusRecordList *rlist;
642     int i;
643
644     if (r->which != Z_Records_DBOSD)
645         return;
646     rlist = r->u.databaseOrSurDiagnostics;
647     for (i = 0; i < rlist->num_records; i++)
648     {
649         Z_NamePlusRecord *npr = rlist->records[i];
650         Z_External *e;
651         char *buf;
652         int len;
653
654         if (npr->which != Z_NamePlusRecord_databaseRecord)
655         {
656             yaz_log(YLOG_WARN, "Unexpected record type, probably diagnostic");
657             continue;
658         }
659         e = npr->u.databaseRecord;
660         if (e->which != Z_External_octet)
661         {
662             yaz_log(YLOG_WARN, "Unexpected external branch, probably BER");
663             continue;
664         }
665         buf = (char*) e->u.octet_aligned->buf;
666         len = e->u.octet_aligned->len;
667
668         rec = ingest_record(cl, buf, len);
669         if (!rec)
670             continue;
671     }
672 }
673
674 static void do_presentResponse(IOCHAN i, Z_APDU *a)
675 {
676     struct connection *co = iochan_getdata(i);
677     struct client *cl = co->client;
678     Z_PresentResponse *r = a->u.presentResponse;
679
680     if (r->records) {
681         Z_Records *recs = r->records;
682         if (recs->which == Z_Records_NSD)
683         {
684             yaz_log(YLOG_WARN, "Non-surrogate diagnostic");
685             cl->diagnostic = *recs->u.nonSurrogateDiagnostic->condition;
686             cl->state = Client_Error;
687         }
688     }
689
690     if (!*r->presentStatus && cl->state != Client_Error)
691     {
692         yaz_log(YLOG_DEBUG, "Good Present response");
693         cl->records += *r->numberOfRecordsReturned;
694         ingest_records(cl, r->records);
695         cl->state = Client_Idle;
696     }
697     else if (*r->presentStatus) 
698     {
699         yaz_log(YLOG_WARN, "Bad Present response");
700         cl->state = Client_Error;
701     }
702 }
703
704 static void handler(IOCHAN i, int event)
705 {
706     struct connection *co = iochan_getdata(i);
707     struct client *cl = co->client;
708     struct session *se = 0;
709
710     if (cl)
711         se = cl->session;
712
713     if (co->state == Conn_Connecting && event & EVENT_OUTPUT)
714     {
715         int errcode;
716         socklen_t errlen = sizeof(errcode);
717
718         if (getsockopt(cs_fileno(co->link), SOL_SOCKET, SO_ERROR, &errcode,
719             &errlen) < 0 || errcode != 0)
720         {
721             client_fatal(cl);
722             return;
723         }
724         else
725         {
726             yaz_log(YLOG_DEBUG, "Connect OK");
727             co->state = Conn_Open;
728             if (cl)
729                 cl->state = Client_Connected;
730         }
731     }
732
733     else if (event & EVENT_INPUT)
734     {
735         int len = cs_get(co->link, &co->ibuf, &co->ibufsize);
736
737         if (len < 0)
738         {
739             client_fatal(cl);
740             return;
741         }
742         else if (len == 0)
743         {
744             client_fatal(cl);
745             return;
746         }
747         else if (len > 1) // We discard input if we have no connection
748         {
749             co->state = Conn_Open;
750
751             if (cl && (cl->requestid == se->requestid || cl->state == Client_Initializing))
752             {
753                 Z_APDU *a;
754
755                 odr_reset(global_parameters.odr_in);
756                 odr_setbuf(global_parameters.odr_in, co->ibuf, len, 0);
757                 if (!z_APDU(global_parameters.odr_in, &a, 0, 0))
758                 {
759                     client_fatal(cl);
760                     return;
761                 }
762                 switch (a->which)
763                 {
764                     case Z_APDU_initResponse:
765                         do_initResponse(i, a);
766                         break;
767                     case Z_APDU_searchResponse:
768                         do_searchResponse(i, a);
769                         break;
770                     case Z_APDU_presentResponse:
771                         do_presentResponse(i, a);
772                         break;
773                     default:
774                         yaz_log(YLOG_WARN, "Unexpected result from server");
775                         client_fatal(cl);
776                         return;
777                 }
778                 // We aren't expecting staggered output from target
779                 // if (cs_more(t->link))
780                 //    iochan_setevent(i, EVENT_INPUT);
781             }
782             else  // we throw away response and go to idle mode
783             {
784                 yaz_log(YLOG_DEBUG, "Ignoring result of expired operation");
785                 cl->state = Client_Idle;
786             }
787         }
788         /* if len==1 we do nothing but wait for more input */
789     }
790
791     if (cl->state == Client_Connected) {
792         send_init(i);
793     }
794
795     if (cl->state == Client_Idle)
796     {
797         if (cl->requestid != se->requestid && *se->query) {
798             send_search(i);
799         }
800         else if (cl->hits > 0 && cl->records < global_parameters.toget &&
801             cl->records < cl->hits) {
802             send_present(i);
803         }
804     }
805 }
806
807 // Disassociate connection from client
808 static void connection_release(struct connection *co)
809 {
810     struct client *cl = co->client;
811
812     yaz_log(YLOG_DEBUG, "Connection release %s", co->host->hostport);
813     if (!cl)
814         return;
815     cl->connection = 0;
816     co->client = 0;
817 }
818
819 // Close connection and recycle structure
820 static void connection_destroy(struct connection *co)
821 {
822     struct host *h = co->host;
823     cs_close(co->link);
824     iochan_destroy(co->iochan);
825
826     yaz_log(YLOG_DEBUG, "Connection destroy %s", co->host->hostport);
827     if (h->connections == co)
828         h->connections = co->next;
829     else
830     {
831         struct connection *pco;
832         for (pco = h->connections; pco && pco->next != co; pco = pco->next)
833             ;
834         if (pco)
835             pco->next = co->next;
836         else
837             abort();
838     }
839     if (co->client)
840     {
841         if (co->client->state != Client_Idle)
842             co->client->state = Client_Disconnected;
843         co->client->connection = 0;
844     }
845     co->next = connection_freelist;
846     connection_freelist = co;
847 }
848
849 // Creates a new connection for client, associated with the host of 
850 // client's database
851 static struct connection *connection_create(struct client *cl)
852 {
853     struct connection *new;
854     COMSTACK link; 
855     int res;
856     void *addr;
857
858     yaz_log(YLOG_DEBUG, "Connection create %s", cl->database->url);
859     if (!(link = cs_create(tcpip_type, 0, PROTO_Z3950)))
860     {
861         yaz_log(YLOG_FATAL|YLOG_ERRNO, "Failed to create comstack");
862         exit(1);
863     }
864
865     if (!(addr = cs_straddr(link, cl->database->host->ipport)))
866     {
867         yaz_log(YLOG_WARN|YLOG_ERRNO, "Lookup of IP address failed?");
868         return 0;
869     }
870
871     res = cs_connect(link, addr);
872     if (res < 0)
873     {
874         yaz_log(YLOG_WARN|YLOG_ERRNO, "cs_connect %s", cl->database->url);
875         return 0;
876     }
877
878     if ((new = connection_freelist))
879         connection_freelist = new->next;
880     else
881     {
882         new = xmalloc(sizeof (struct connection));
883         new->ibuf = 0;
884         new->ibufsize = 0;
885     }
886     new->state = Conn_Connecting;
887     new->host = cl->database->host;
888     new->next = new->host->connections;
889     new->host->connections = new;
890     new->client = cl;
891     cl->connection = new;
892     new->link = link;
893
894     new->iochan = iochan_create(cs_fileno(link), handler, 0);
895     iochan_setdata(new->iochan, new);
896     new->iochan->next = channel_list;
897     channel_list = new->iochan;
898     return new;
899 }
900
901 // Close connection and set state to error
902 static void client_fatal(struct client *cl)
903 {
904     yaz_log(YLOG_WARN, "Fatal error from %s", cl->database->url);
905     connection_destroy(cl->connection);
906     cl->state = Client_Error;
907 }
908
909 // Ensure that client has a connection associated
910 static int client_prep_connection(struct client *cl)
911 {
912     struct connection *co;
913     struct session *se = cl->session;
914     struct host *host = cl->database->host;
915
916     co = cl->connection;
917
918     yaz_log(YLOG_DEBUG, "Client prep %s", cl->database->url);
919
920     if (!co)
921     {
922         // See if someone else has an idle connection
923         // We should look at timestamps here to select the longest-idle connection
924         for (co = host->connections; co; co = co->next)
925             if (co->state == Conn_Open && (!co->client || co->client->session != se))
926                 break;
927         if (co)
928         {
929             connection_release(co);
930             cl->connection = co;
931             co->client = cl;
932         }
933         else
934             co = connection_create(cl);
935     }
936     if (co)
937     {
938         if (co->state == Conn_Connecting)
939             cl->state = Client_Connecting;
940         else if (co->state == Conn_Open)
941         {
942             if (cl->state == Client_Error || cl->state == Client_Disconnected)
943                 cl->state = Client_Idle;
944         }
945         iochan_setflag(co->iochan, EVENT_OUTPUT);
946         return 1;
947     }
948     else
949         return 0;
950 }
951
952 void load_simpletargets(const char *fn)
953 {
954     FILE *f = fopen(fn, "r");
955     char line[256];
956
957     if (!f)
958     {
959         yaz_log(YLOG_WARN|YLOG_ERRNO, "open %s", fn);
960         exit(1);
961     }
962
963     while (fgets(line, 255, f))
964     {
965         char *url, *db;
966         struct host *host;
967         struct database *database;
968
969         if (strncmp(line, "target ", 7))
970             continue;
971         url = line + 7;
972         url[strlen(url) - 1] = '\0';
973         yaz_log(LOG_DEBUG, "Target: %s", url);
974         if ((db = strchr(url, '/')))
975             *(db++) = '\0';
976         else
977             db = "Default";
978
979         for (host = hosts; host; host = host->next)
980             if (!strcmp(url, host->hostport))
981                 break;
982         if (!host)
983         {
984             struct addrinfo *addrinfo, hints;
985             char *port;
986             char ipport[128];
987             unsigned char addrbuf[4];
988             int res;
989
990             host = xmalloc(sizeof(struct host));
991             host->hostport = xstrdup(url);
992             host->connections = 0;
993
994             if ((port = strchr(url, ':')))
995                 *(port++) = '\0';
996             else
997                 port = "210";
998
999             hints.ai_flags = 0;
1000             hints.ai_family = PF_INET;
1001             hints.ai_socktype = SOCK_STREAM;
1002             hints.ai_protocol = IPPROTO_TCP;
1003             hints.ai_addrlen = 0;
1004             hints.ai_addr = 0;
1005             hints.ai_canonname = 0;
1006             hints.ai_next = 0;
1007             // This is not robust code. It assumes that getaddrinfo returns AF_INET
1008             // address.
1009             if ((res = getaddrinfo(url, port, &hints, &addrinfo)))
1010             {
1011                 yaz_log(YLOG_WARN, "Failed to resolve %s: %s", url, gai_strerror(res));
1012                 continue;
1013             }
1014             assert(addrinfo->ai_family == PF_INET);
1015             memcpy(addrbuf, &((struct sockaddr_in*)addrinfo->ai_addr)->sin_addr.s_addr, 4);
1016             sprintf(ipport, "%hhd.%hhd.%hhd.%hhd:%s",
1017                     addrbuf[0], addrbuf[1], addrbuf[2], addrbuf[3], port);
1018             host->ipport = xstrdup(ipport);
1019             freeaddrinfo(addrinfo);
1020             host->next = hosts;
1021             hosts = host;
1022         }
1023         database = xmalloc(sizeof(struct database));
1024         database->host = host;
1025         database->url = xmalloc(strlen(url) + strlen(db) + 2);
1026         strcpy(database->url, url);
1027         strcat(database->url, "/");
1028         strcat(database->url, db);
1029         strcpy(database->databases[0], db);
1030         *database->databases[1] = '\0';
1031         database->errors = 0;
1032         database->next = databases;
1033         databases = database;
1034
1035     }
1036     fclose(f);
1037 }
1038
1039 static void pull_terms(NMEM nmem, struct ccl_rpn_node *n, char **termlist, int *num)
1040 {
1041     switch (n->kind)
1042     {
1043         case CCL_RPN_AND:
1044         case CCL_RPN_OR:
1045         case CCL_RPN_NOT:
1046         case CCL_RPN_PROX:
1047             pull_terms(nmem, n->u.p[0], termlist, num);
1048             pull_terms(nmem, n->u.p[1], termlist, num);
1049             break;
1050         case CCL_RPN_TERM:
1051             termlist[(*num)++] = nmem_strdup(nmem, n->u.t.term);
1052             break;
1053         default: // NOOP
1054             break;
1055     }
1056 }
1057
1058 // Extract terms from query into null-terminated termlist
1059 static int extract_terms(NMEM nmem, char *query, char **termlist)
1060 {
1061     int error, pos;
1062     struct ccl_rpn_node *n;
1063     int num = 0;
1064
1065     n = ccl_find_str(global_parameters.ccl_filter, query, &error, &pos);
1066     if (!n)
1067         return -1;
1068     pull_terms(nmem, n, termlist, &num);
1069     termlist[num] = 0;
1070     ccl_rpn_delete(n);
1071     return 0;
1072 }
1073
1074 static struct client *client_create(void)
1075 {
1076     struct client *r;
1077     if (client_freelist)
1078     {
1079         r = client_freelist;
1080         client_freelist = client_freelist->next;
1081     }
1082     else
1083         r = xmalloc(sizeof(struct client));
1084     r->database = 0;
1085     r->connection = 0;
1086     r->session = 0;
1087     r->hits = 0;
1088     r->records = 0;
1089     r->setno = 0;
1090     r->requestid = -1;
1091     r->diagnostic = 0;
1092     r->state = Client_Disconnected;
1093     r->next = 0;
1094     return r;
1095 }
1096
1097 void client_destroy(struct client *c)
1098 {
1099     struct session *se = c->session;
1100     if (c == se->clients)
1101         se->clients = c->next;
1102     else
1103     {
1104         struct client *cc;
1105         for (cc = se->clients; cc && cc->next != c; cc = cc->next)
1106             ;
1107         if (cc)
1108             cc->next = c->next;
1109     }
1110     if (c->connection)
1111         connection_release(c->connection);
1112     c->next = client_freelist;
1113     client_freelist = c;
1114 }
1115
1116 // This should be extended with parameters to control selection criteria
1117 // Associates a set of clients with a session;
1118 int select_targets(struct session *se)
1119 {
1120     struct database *db;
1121     int c = 0;
1122
1123     while (se->clients)
1124         client_destroy(se->clients);
1125     for (db = databases; db; db = db->next)
1126     {
1127         struct client *cl = client_create();
1128         cl->database = db;
1129         cl->session = se;
1130         cl->next = se->clients;
1131         se->clients = cl;
1132         c++;
1133     }
1134     return c;
1135 }
1136
1137 char *search(struct session *se, char *query)
1138 {
1139     int live_channels = 0;
1140     struct client *cl;
1141
1142     yaz_log(YLOG_DEBUG, "Search");
1143
1144     strcpy(se->query, query);
1145     se->requestid++;
1146     nmem_reset(se->nmem);
1147     for (cl = se->clients; cl; cl = cl->next)
1148     {
1149         cl->hits = -1;
1150         cl->records = 0;
1151         cl->diagnostic = 0;
1152
1153         if (client_prep_connection(cl))
1154             live_channels++;
1155     }
1156     if (live_channels)
1157     {
1158         char *p[512];
1159         int maxrecs = live_channels * global_parameters.toget;
1160         se->termlist = termlist_create(se->nmem, maxrecs, 15);
1161         se->reclist = reclist_create(se->nmem, maxrecs);
1162         extract_terms(se->nmem, query, p);
1163         se->relevance = relevance_create(se->nmem, (const char **) p, maxrecs);
1164         se->total_records = se->total_hits = 0;
1165     }
1166     else
1167         return "NOTARGETS";
1168
1169     return 0;
1170 }
1171
1172 void destroy_session(struct session *s)
1173 {
1174     yaz_log(YLOG_LOG, "Destroying session");
1175     while (s->clients)
1176         client_destroy(s->clients);
1177     nmem_destroy(s->nmem);
1178     wrbuf_free(s->wrbuf, 1);
1179 }
1180
1181 struct session *new_session() 
1182 {
1183     struct session *session = xmalloc(sizeof(*session));
1184
1185     yaz_log(YLOG_DEBUG, "New pazpar2 session");
1186     
1187     session->total_hits = 0;
1188     session->total_records = 0;
1189     session->termlist = 0;
1190     session->reclist = 0;
1191     session->requestid = -1;
1192     session->clients = 0;
1193     session->query[0] = '\0';
1194     session->nmem = nmem_create();
1195     session->wrbuf = wrbuf_alloc();
1196
1197     select_targets(session);
1198
1199     return session;
1200 }
1201
1202 struct hitsbytarget *hitsbytarget(struct session *se, int *count)
1203 {
1204     static struct hitsbytarget res[1000]; // FIXME MM
1205     struct client *cl;
1206
1207     *count = 0;
1208     for (cl = se->clients; cl; cl = cl->next)
1209     {
1210         strcpy(res[*count].id, cl->database->host->hostport);
1211         res[*count].hits = cl->hits;
1212         res[*count].records = cl->records;
1213         res[*count].diagnostic = cl->diagnostic;
1214         res[*count].state = client_states[cl->state];
1215         res[*count].connected  = cl->connection ? 1 : 0;
1216         (*count)++;
1217     }
1218
1219     return res;
1220 }
1221
1222 struct termlist_score **termlist(struct session *s, int *num)
1223 {
1224     return termlist_highscore(s->termlist, num);
1225 }
1226
1227 struct record **show(struct session *s, int start, int *num, int *total, int *sumhits)
1228 {
1229     struct record **recs = nmem_malloc(s->nmem, *num * sizeof(struct record *));
1230     int i;
1231
1232     relevance_prepare_read(s->relevance, s->reclist);
1233
1234     *total = s->reclist->num_records;
1235     *sumhits = s->total_hits;
1236
1237     for (i = 0; i < start; i++)
1238         if (!reclist_read_record(s->reclist))
1239         {
1240             *num = 0;
1241             return 0;
1242         }
1243
1244     for (i = 0; i < *num; i++)
1245     {
1246         struct record *r = reclist_read_record(s->reclist);
1247         if (!r)
1248         {
1249             *num = i;
1250             break;
1251         }
1252         recs[i] = r;
1253     }
1254     return recs;
1255 }
1256
1257 void statistics(struct session *se, struct statistics *stat)
1258 {
1259     struct client *cl;
1260     int count = 0;
1261
1262     bzero(stat, sizeof(*stat));
1263     for (cl = se->clients; cl; cl = cl->next)
1264     {
1265         if (!cl->connection)
1266             stat->num_no_connection++;
1267         switch (cl->state)
1268         {
1269             case Client_Connecting: stat->num_connecting++; break;
1270             case Client_Initializing: stat->num_initializing++; break;
1271             case Client_Searching: stat->num_searching++; break;
1272             case Client_Presenting: stat->num_presenting++; break;
1273             case Client_Idle: stat->num_idle++; break;
1274             case Client_Failed: stat->num_failed++; break;
1275             case Client_Error: stat->num_error++; break;
1276             default: break;
1277         }
1278         count++;
1279     }
1280     stat->num_hits = se->total_hits;
1281     stat->num_records = se->total_records;
1282
1283     stat->num_clients = count;
1284 }
1285
1286 static CCL_bibset load_cclfile(const char *fn)
1287 {
1288     CCL_bibset res = ccl_qual_mk();
1289     if (ccl_qual_fname(res, fn) < 0)
1290     {
1291         yaz_log(YLOG_FATAL|YLOG_ERRNO, "%s", fn);
1292         exit(1);
1293     }
1294     return res;
1295 }
1296
1297 int main(int argc, char **argv)
1298 {
1299     int ret;
1300     char *arg;
1301     int setport = 0;
1302
1303     if (signal(SIGPIPE, SIG_IGN) < 0)
1304         yaz_log(YLOG_WARN|YLOG_ERRNO, "signal");
1305
1306     yaz_log_init(YLOG_DEFAULT_LEVEL|YLOG_DEBUG, "pazpar2", 0);
1307
1308     while ((ret = options("c:h:p:C:s:", argv, argc, &arg)) != -2)
1309     {
1310         switch (ret) {
1311             case 'c':
1312                 command_init(atoi(arg));
1313                 setport++;
1314                 break;
1315             case 'h':
1316                 http_init(atoi(arg));
1317                 setport++;
1318                 break;
1319             case 'C':
1320                 global_parameters.ccl_filter = load_cclfile(arg);
1321                 break;
1322             case 'p':
1323                 http_set_proxyaddr(arg);
1324                 break;
1325             case 's':
1326                 load_simpletargets(arg);
1327                 break;
1328             default:
1329                 fprintf(stderr, "Usage: pazpar2\n"
1330                         "    -h httpport             (REST)\n"
1331                         "    -c cmdport              (telnet-style)\n"
1332                         "    -C cclconfig\n"
1333                         "    -s simpletargetfile\n"
1334                         "    -p hostname[:portno]    (HTTP proxy)\n");
1335                 exit(1);
1336         }
1337     }
1338
1339     if (!setport)
1340     {
1341         fprintf(stderr, "Set command port with -h or -c\n");
1342         exit(1);
1343     }
1344
1345     global_parameters.ccl_filter = load_cclfile("default.bib");
1346     global_parameters.yaz_marc = yaz_marc_create();
1347     yaz_marc_subfield_str(global_parameters.yaz_marc, "\t");
1348     global_parameters.odr_in = odr_createmem(ODR_DECODE);
1349     global_parameters.odr_out = odr_createmem(ODR_ENCODE);
1350
1351     event_loop(&channel_list);
1352
1353     return 0;
1354 }
1355
1356 /*
1357  * Local variables:
1358  * c-basic-offset: 4
1359  * indent-tabs-mode: nil
1360  * End:
1361  * vim: shiftwidth=4 tabstop=8 expandtab
1362  */