Added basic HTTP server logic
[pazpar2-moved-to-github.git] / pazpar2.c
1 /* $Id: pazpar2.c,v 1.3 2006-11-21 18:46:43 quinn Exp $ */
2
3 #include <stdlib.h>
4 #include <stdio.h>
5 #include <string.h>
6 #include <sys/time.h>
7 #include <unistd.h>
8 #include <sys/socket.h>
9 #include <signal.h>
10 #include <ctype.h>
11 #include <assert.h>
12
13 #include <yaz/comstack.h>
14 #include <yaz/tcpip.h>
15 #include <yaz/proto.h>
16 #include <yaz/readconf.h>
17 #include <yaz/pquery.h>
18 #include <yaz/yaz-util.h>
19
20 #include "pazpar2.h"
21 #include "eventl.h"
22 #include "command.h"
23 #include "http.h"
24
/* Version string; used as the implementationVersion default below. */
#define PAZPAR2_VERSION "0.1"

/* Number of rows in the per-target database name table. */
#define MAX_DATABASES   512

/* Default for global_parameters.chunk (records requested per present). */
#define MAX_CHUNK       10
28
29 struct target
30 {
31     struct session *session;
32     char fullname[256];
33     char hostport[128];
34     char *ibuf;
35     int ibufsize;
36     char databases[MAX_DATABASES][128];
37     COMSTACK link;
38     ODR odr_in, odr_out;
39     struct target *next;
40     void *addr;
41     int hits;
42     int records;
43     int setno;
44     int requestid;                              // ID of current outstanding request
45     int diagnostic;
46     enum target_state
47     {
48         No_connection,
49         Connecting,
50         Connected,
51         Initializing,
52         Searching,
53         Presenting,
54         Error,
55         Idle,
56         Failed
57     } state;
58 };
59
/*
 * Printable names of the target states, indexed by the numeric value of
 * enum target_state (see hitsbytarget()). Keep both lists in the same order.
 */
static char *state_strings[] = {
    "No_connection", "Connecting", "Connected",
    "Initializing",  "Searching",  "Presenting",
    "Error",         "Idle",       "Failed"
};
71
72
73 IOCHAN channel_list = 0;
74
75 static struct parameters {
76     int timeout;                /* operations timeout, in seconds */
77     char implementationId[128];
78     char implementationName[128];
79     char implementationVersion[128];
80     struct timeval base_time;
81     int toget;
82     int chunk;
83 } global_parameters = 
84 {
85     30,
86     "81",
87     "Index Data PazPar2 (MasterKey)",
88     PAZPAR2_VERSION,
89     {0,0},
90     100,
91     MAX_CHUNK
92 };
93
94
95 static int send_apdu(struct target *t, Z_APDU *a)
96 {
97     char *buf;
98     int len, r;
99
100     if (!z_APDU(t->odr_out, &a, 0, 0))
101     {
102         odr_perror(t->odr_out, "Encoding APDU");
103         abort();
104     }
105     buf = odr_getbuf(t->odr_out, &len, 0);
106     r = cs_put(t->link, buf, len);
107     if (r < 0)
108     {
109         yaz_log(YLOG_WARN, "cs_put: %s", cs_errmsg(cs_errno(t->link)));
110         return -1;
111     }
112     else if (r == 1)
113     {
114         fprintf(stderr, "cs_put incomplete (ParaZ does not handle that)\n");
115     }
116     odr_reset(t->odr_out); /* release the APDU structure  */
117     return 0;
118 }
119
120
121 static void send_init(IOCHAN i)
122 {
123     struct target *t = iochan_getdata(i);
124     Z_APDU *a = zget_APDU(t->odr_out, Z_APDU_initRequest);
125
126     a->u.initRequest->implementationId = global_parameters.implementationId;
127     a->u.initRequest->implementationName = global_parameters.implementationName;
128     a->u.initRequest->implementationVersion =
129         global_parameters.implementationVersion;
130     ODR_MASK_SET(a->u.initRequest->options, Z_Options_search);
131     ODR_MASK_SET(a->u.initRequest->options, Z_Options_present);
132     ODR_MASK_SET(a->u.initRequest->options, Z_Options_namedResultSets);
133
134     ODR_MASK_SET(a->u.initRequest->protocolVersion, Z_ProtocolVersion_1);
135     ODR_MASK_SET(a->u.initRequest->protocolVersion, Z_ProtocolVersion_2);
136     ODR_MASK_SET(a->u.initRequest->protocolVersion, Z_ProtocolVersion_3);
137     if (send_apdu(t, a) >= 0)
138     {
139         iochan_setflags(i, EVENT_INPUT);
140         t->state = Initializing;
141     }
142     else
143     {
144         iochan_destroy(i);
145         t->state = Failed;
146         cs_close(t->link);
147     }
148 }
149
150 static void send_search(IOCHAN i)
151 {
152     struct target *t = iochan_getdata(i);
153     struct session *s = t->session;
154     Z_APDU *a = zget_APDU(t->odr_out, Z_APDU_searchRequest);
155     int ndb;
156     char **databaselist;
157     Z_Query *zquery;
158
159     yaz_log(YLOG_DEBUG, "Sending search");
160     a->u.searchRequest->query = zquery = odr_malloc(t->odr_out, sizeof(Z_Query));
161     zquery->which = Z_Query_type_1;
162     zquery->u.type_1 = p_query_rpn(t->odr_out, PROTO_Z3950, s->query);
163
164     for (ndb = 0; *t->databases[ndb]; ndb++)
165         ;
166     databaselist = odr_malloc(t->odr_out, sizeof(char*) * ndb);
167     for (ndb = 0; *t->databases[ndb]; ndb++)
168         databaselist[ndb] = t->databases[ndb];
169
170     a->u.searchRequest->resultSetName = "Default";
171     a->u.searchRequest->databaseNames = databaselist;
172     a->u.searchRequest->num_databaseNames = ndb;
173
174     if (send_apdu(t, a) >= 0)
175     {
176         iochan_setflags(i, EVENT_INPUT);
177         t->state = Searching;
178         t->requestid = s->requestid;
179     }
180     else
181     {
182         iochan_destroy(i);
183         t->state = Failed;
184         cs_close(t->link);
185     }
186     odr_reset(t->odr_out);
187 }
188
189 static void send_present(IOCHAN i)
190 {
191     struct target *t = iochan_getdata(i);
192     Z_APDU *a = zget_APDU(t->odr_out, Z_APDU_presentRequest);
193     int toget;
194     int start = t->records + 1;
195
196     toget = global_parameters.chunk;
197     if (toget > t->hits - t->records)
198         toget = t->hits - t->records;
199
200     yaz_log(YLOG_DEBUG, "Trying to present %d records\n", toget);
201
202     a->u.presentRequest->resultSetStartPoint = &start;
203     a->u.presentRequest->numberOfRecordsRequested = &toget;
204
205     a->u.presentRequest->resultSetId = "Default";
206
207     if (send_apdu(t, a) >= 0)
208     {
209         iochan_setflags(i, EVENT_INPUT);
210         t->state = Presenting;
211     }
212     else
213     {
214         iochan_destroy(i);
215         t->state = Failed;
216         cs_close(t->link);
217     }
218     odr_reset(t->odr_out);
219 }
220
221 static void do_initResponse(IOCHAN i, Z_APDU *a)
222 {
223     struct target *t = iochan_getdata(i);
224     Z_InitResponse *r = a->u.initResponse;
225
226     yaz_log(YLOG_DEBUG, "Received init response");
227
228     if (*r->result)
229     {
230         t->state = Idle;
231     }
232     else
233     {
234         t->state = Failed;
235         iochan_destroy(i);
236         cs_close(t->link);
237     }
238 }
239
240 static void do_searchResponse(IOCHAN i, Z_APDU *a)
241 {
242     struct target *t = iochan_getdata(i);
243     Z_SearchResponse *r = a->u.searchResponse;
244
245     yaz_log(YLOG_DEBUG, "Searchresponse (status=%d)", *r->searchStatus);
246
247     if (*r->searchStatus)
248     {
249         t->hits = *r->resultCount;
250         t->state = Idle;
251     }
252     else
253     {          /*"FAILED"*/
254         t->hits = 0;
255         t->state = Failed;
256         if (r->records) {
257             Z_Records *recs = r->records;
258             if (recs->which == Z_Records_NSD)
259             {
260                 yaz_log(YLOG_WARN, "Non-surrogate diagnostic");
261                 t->diagnostic = *recs->u.nonSurrogateDiagnostic->condition;
262                 t->state = Error;
263             }
264         }
265     }
266 }
267
/*
 * Find the first line of a line-format record that starts with a tag.
 *
 * rec   - NUL-terminated text, one field per '\n'-separated line
 * field - tag to look for; its first three characters are compared and must
 *         be followed by a space in the record
 *
 * Returns a pointer to the start of the matching line inside rec, or 0 when
 * no line matches. A final line without a trailing newline is handled; the
 * scan never moves past the terminating NUL.
 */
const char *find_field(const char *rec, const char *field)
{
    const char *line = rec;

    while (*line)
    {
        const char *eol;

        if (!strncmp(line, field, 3) && line[3] == ' ')
            return line;
        eol = strchr(line, '\n');
        if (!eol)
            break;              /* last line, no newline: nothing follows */
        line = eol + 1;
    }
    return 0;
}
281
/*
 * Find a subfield within one field line.
 *
 * field    - start of a field line; subfields are introduced by a tab, a
 *            one-character code and a space
 * subfield - code to look for
 *
 * Returns a pointer to the first character of the subfield's value (just
 * after the space), or 0 when the line has no such subfield. The search is
 * confined to the current line and stops at a newline or at the terminating
 * NUL, whichever comes first.
 */
const char *find_subfield(const char *field, char subfield)
{
    const char *p = field;

    while (*p && *p != '\n')
    {
        /* Advance to the next subfield delimiter on this line. */
        while (*p && *p != '\n' && *p != '\t')
            p++;
        if (*p != '\t')
            break;              /* end of line or of text */
        p++;                    /* now at the subfield code */
        if (*p && *p == subfield)
        {
            p++;
            if (*p == ' ')
                return p + 1;
        }
    }
    return 0;
}
297
298 // Extract 245 $a $b 100 $a
299 char *extract_mergekey(struct session *s, const char *rec)
300 {
301     const char *field, *subfield;
302     char *e, *ef;
303     char *out, *p, *pout;
304
305     wrbuf_rewind(s->wrbuf);
306
307     if (!(field = find_field(rec, "245")))
308         return 0;
309     if (!(subfield = find_subfield(field, 'a')))
310         return 0;
311     ef = index(subfield, '\n');
312     if ((e = index(subfield, '\t')) && e < ef)
313         ef = e;
314     if (ef)
315     {
316         wrbuf_write(s->wrbuf, subfield, ef - subfield);
317         if ((subfield = find_subfield(field, 'b'))) 
318         {
319             ef = index(subfield, '\n');
320             if ((e = index(subfield, '\t')) && e < ef)
321                 ef = e;
322             if (ef)
323             {
324                 wrbuf_puts(s->wrbuf, " field "); 
325                 wrbuf_write(s->wrbuf, subfield, ef - subfield);
326             }
327         }
328     }
329     if ((field = find_field(rec, "100")))
330     {
331         if ((subfield = find_subfield(field, 'a')))
332         {
333             ef = index(subfield, '\n');
334             if ((e = index(subfield, '\t')) && e < ef)
335                 ef = e;
336             if (ef)
337             {
338                 wrbuf_puts(s->wrbuf, " field "); 
339                 wrbuf_write(s->wrbuf, subfield, ef - subfield);
340             }
341         }
342     }
343     wrbuf_putc(s->wrbuf, '\0');
344     p = wrbuf_buf(s->wrbuf);
345     out = pout = nmem_malloc(s->nmem, strlen(p) + 1);
346
347     while (*p)
348     {
349         while (isalnum(*p))
350             *(pout++) = tolower(*(p++));
351         while (*p && !isalnum(*p))
352             p++;
353         *(pout++) = ' ';
354     }
355     if (out != pout)
356         *(--pout) = '\0';
357
358     return out;
359 }
360
361 static void push_record(struct session *s, struct record *r)
362 {
363     int p;
364     assert(s->recheap_max + 1 < s->recheap_size);
365
366     s->recheap[p = ++s->recheap_max] = r;
367     while (p > 0)
368     {
369         int parent = (p - 1) >> 1;
370         if (strcmp(s->recheap[p]->merge_key, s->recheap[parent]->merge_key) < 0)
371         {
372             struct record *tmp;
373             tmp = s->recheap[parent];
374             s->recheap[parent] = s->recheap[p];
375             s->recheap[p] = tmp;
376             p = parent;
377         }
378         else
379             break;
380     }
381 }
382
383 static struct record *top_record(struct session *s)
384 {
385     return s-> recheap_max >= 0 ?  s->recheap[0] : 0;
386 }
387
388 static struct record *pop_record(struct session *s)
389 {
390     struct record *res = s->recheap[0];
391     int p = 0;
392     int lastnonleaf = (s->recheap_max - 1) >> 1;
393
394     if (s->recheap_max < 0)
395         return 0;
396
397     s->recheap[p] = s->recheap[s->recheap_max--];
398
399     while (p <= lastnonleaf)
400     {
401         int right = (p + 1) << 1;
402         int left = right - 1;
403         int min = left;
404
405         if (right < s->recheap_max &&
406                 strcmp(s->recheap[right]->merge_key, s->recheap[left]->merge_key) < 0)
407             min = right;
408         if (strcmp(s->recheap[min]->merge_key, s->recheap[p]->merge_key) < 0)
409         {
410             struct record *tmp = s->recheap[min];
411             s->recheap[min] = s->recheap[p];
412             s->recheap[p] = tmp;
413             p = min;
414         }
415         else
416             break;
417     }
418     return res;
419 }
420
421 // Like pop_record but collapses identical (merge_key) records
422 // The heap will contain multiple independent matching records and possibly
423 // one cluster, created the last time the list was scanned
424 static struct record *pop_mrecord(struct session *s)
425 {
426     struct record *this;
427     struct record *next;
428
429     if (!(this = pop_record(s)))
430         return 0;
431
432     // Collapse identical records
433     while ((next = top_record(s)))
434     {
435         struct record *p, *tmpnext;
436         if (strcmp(this->merge_key, next->merge_key))
437             break;
438         // Absorb record (and clustersiblings) into a supercluster
439         for (p = next; p; p = tmpnext) {
440             tmpnext = p->next_cluster;
441             p->next_cluster = this->next_cluster;
442             this->next_cluster = p;
443         }
444
445         pop_record(s);
446     }
447     return this;
448 }
449
450 // Reads records in sort order. Store records in top of heapspace until rewind is called.
451 static struct record *read_recheap(struct session *s)
452 {
453     struct record *r = pop_mrecord(s);
454
455     if (r)
456     {
457         if (s->recheap_scratch < 0)
458             s->recheap_scratch = s->recheap_size;
459         s->recheap[--s->recheap_scratch] = r;
460     }
461
462     return r;
463 }
464
465 // Return records to heap after read
466 static void rewind_recheap(struct session *s)
467 {
468     while (s->recheap_scratch >= 0) {
469         push_record(s, s->recheap[s->recheap_scratch++]);
470         if (s->recheap_scratch >= s->recheap_size)
471             s->recheap_scratch = -1;
472     }
473 }
474
475 struct record *ingest_record(struct target *t, char *buf, int len)
476 {
477     struct session *s = t->session;
478     struct record *res;
479     const char *recbuf;
480
481     wrbuf_rewind(s->wrbuf);
482     yaz_marc_xml(s->yaz_marc, YAZ_MARC_LINE);
483     if (yaz_marc_decode_wrbuf(s->yaz_marc, buf, len, s->wrbuf) < 0)
484     {
485         yaz_log(YLOG_WARN, "Failed to decode MARC record");
486         return 0;
487     }
488     wrbuf_putc(s->wrbuf, '\0');
489     recbuf = wrbuf_buf(s->wrbuf);
490
491     res = nmem_malloc(s->nmem, sizeof(struct record));
492
493     res->merge_key = extract_mergekey(s, recbuf);
494     if (!res->merge_key)
495         return 0;
496     res->buf = nmem_strdupn(s->nmem, recbuf, wrbuf_len(s->wrbuf));
497     res->target = t;
498     res->next_cluster = 0;
499     res->target_offset = -1;
500
501     yaz_log(YLOG_DEBUG, "Key: %s", res->merge_key);
502
503     push_record(s, res);
504
505     return res;
506 }
507
508 void ingest_records(struct target *t, Z_Records *r)
509 {
510     //struct session *s = t->session;
511     struct record *rec;
512     Z_NamePlusRecordList *rlist;
513     int i;
514
515     if (r->which != Z_Records_DBOSD)
516         return;
517     rlist = r->u.databaseOrSurDiagnostics;
518     for (i = 0; i < rlist->num_records; i++)
519     {
520         Z_NamePlusRecord *npr = rlist->records[i];
521         Z_External *e;
522         char *buf;
523         int len;
524
525         if (npr->which != Z_NamePlusRecord_databaseRecord)
526         {
527             yaz_log(YLOG_WARN, "Unexpected record type, probably diagnostic");
528             continue;
529         }
530         e = npr->u.databaseRecord;
531         if (e->which != Z_External_octet)
532         {
533             yaz_log(YLOG_WARN, "Unexpected external branch, probably BER");
534             continue;
535         }
536         buf = (char*) e->u.octet_aligned->buf;
537         len = e->u.octet_aligned->len;
538
539         rec = ingest_record(t, buf, len);
540         if (!rec)
541             continue;
542         yaz_log(YLOG_DEBUG, "Ingested a fooking record");
543     }
544 }
545
546 static void do_presentResponse(IOCHAN i, Z_APDU *a)
547 {
548     struct target *t = iochan_getdata(i);
549     Z_PresentResponse *r = a->u.presentResponse;
550
551     if (r->records) {
552         Z_Records *recs = r->records;
553         if (recs->which == Z_Records_NSD)
554         {
555             yaz_log(YLOG_WARN, "Non-surrogate diagnostic");
556             t->diagnostic = *recs->u.nonSurrogateDiagnostic->condition;
557             t->state = Error;
558         }
559         else
560         {
561             yaz_log(YLOG_DEBUG, "Got Records!");
562         }
563     }
564
565     if (!*r->presentStatus && t->state != Error)
566     {
567         yaz_log(YLOG_DEBUG, "Good Present response");
568         t->records += *r->numberOfRecordsReturned;
569         ingest_records(t, r->records);
570         t->state = Idle;
571     }
572     else if (*r->presentStatus) 
573     {
574         yaz_log(YLOG_WARN, "Bad Present response");
575         t->state = Error;
576     }
577 }
578
579 static void handler(IOCHAN i, int event)
580 {
581     struct target *t = iochan_getdata(i);
582     struct session *s = t->session;
583     //static int waiting = 0;
584
585     if (t->state == No_connection) /* Start connection */
586     {
587         int res = cs_connect(t->link, t->addr);
588
589         t->state = Connecting;
590         if (!res) /* we are go */
591             iochan_setevent(i, EVENT_OUTPUT);
592         else if (res == 1)
593             iochan_setflags(i, EVENT_OUTPUT);
594         else
595         {
596             yaz_log(YLOG_WARN|YLOG_ERRNO, "ERROR %s connect\n", t->hostport);
597             cs_close(t->link);
598             t->state = Failed;
599             iochan_destroy(i);
600         }
601     }
602
603     else if (t->state == Connecting && event & EVENT_OUTPUT)
604     {
605         int errcode;
606         socklen_t errlen = sizeof(errcode);
607
608         if (getsockopt(cs_fileno(t->link), SOL_SOCKET, SO_ERROR, &errcode,
609             &errlen) < 0 || errcode != 0)
610         {
611             cs_close(t->link);
612             iochan_destroy(i);
613             t->state = Failed;
614             return;
615         }
616         else
617         {
618             yaz_log(YLOG_DEBUG, "Connect OK");
619             t->state = Connected;
620         }
621     }
622
623     else if (event & EVENT_INPUT)
624     {
625         int len = cs_get(t->link, &t->ibuf, &t->ibufsize);
626
627         if (len < 0)
628         {
629             cs_close(t->link);
630             iochan_destroy(i);
631             t->state = Failed;
632             return;
633         }
634         if (len == 0)
635         {
636             cs_close(t->link);
637             iochan_destroy(i);
638             t->state = Failed;
639             return;
640         }
641         else if (len > 1)
642         {
643             if (t->requestid == s->requestid || t->state == Initializing) 
644             {
645                 Z_APDU *a;
646
647                 odr_reset(t->odr_in);
648                 odr_setbuf(t->odr_in, t->ibuf, len, 0);
649                 if (!z_APDU(t->odr_in, &a, 0, 0))
650                 {
651                     cs_close(t->link);
652                     iochan_destroy(i);
653                     t->state = Failed;
654                     return;
655                 }
656                 yaz_log(YLOG_DEBUG, "Successfully decoded %d oct PDU", len);
657                 switch (a->which)
658                 {
659                     case Z_APDU_initResponse:
660                         do_initResponse(i, a);
661                         break;
662                     case Z_APDU_searchResponse:
663                         do_searchResponse(i, a);
664                         break;
665                     case Z_APDU_presentResponse:
666                         do_presentResponse(i, a);
667                         break;
668                     default:
669                         yaz_log(YLOG_WARN, "Unexpected result from server");
670                         cs_close(t->link);
671                         iochan_destroy(i);
672                         t->state = Failed;
673                         return;
674                 }
675                 // if (cs_more(t->link))
676                 //    iochan_setevent(i, EVENT_INPUT);
677             }
678             else  // we throw away response and go to idle mode
679                 t->state = Idle;
680         }
681         /* if len==1 we do nothing but wait for more input */
682     }
683
684     else if (t->state == Connected) {
685         send_init(i);
686     }
687
688     if (t->state == Idle)
689     {
690         if (t->requestid != s->requestid) {
691             send_search(i);
692         }
693         else if (t->hits > 0 && t->records < global_parameters.toget &&
694             t->records < t->hits) {
695             send_present(i);
696         }
697     }
698 }
699
700 int load_targets(struct session *s, const char *fn)
701 {
702     FILE *f = fopen(fn, "r");
703     char line[256];
704     struct target **target_p;
705
706     if (!f)
707     {
708         yaz_log(YLOG_WARN|YLOG_ERRNO, "open %s", fn);
709         return -1;
710     }
711
712     target_p = &s->targets;
713     while (fgets(line, 255, f))
714     {
715         char *url, *p;
716         struct target *target;
717         IOCHAN new;
718
719         if (strncmp(line, "target ", 7))
720             continue;
721         url = line + 7;
722         url[strlen(url) - 1] = '\0';
723         yaz_log(LOG_DEBUG, "Target: %s", url);
724
725         *target_p = target = xmalloc(sizeof(**target_p));
726         target->next = 0;
727         target_p = &target->next;
728         target->state = No_connection;
729         target->ibuf = 0;
730         target->ibufsize = 0;
731         target->odr_in = odr_createmem(ODR_DECODE);
732         target->odr_out = odr_createmem(ODR_ENCODE);
733         target->hits = -1;
734         target->setno = 0;
735         target->session = s;
736         target->requestid = -1;
737         target->records = 0;
738         target->diagnostic = 0;
739         strcpy(target->fullname, url);
740         if ((p = strchr(url, '/')))
741         {                   
742             *p = '\0';
743             strcpy(target->hostport, url);
744             *p = '/';
745             p++;
746             strcpy(target->databases[0], p);
747             target->databases[1][0] = '\0';
748         }
749         else
750         {
751             strcpy(target->hostport, url);
752             strcpy(target->databases[0], "Default");
753             target->databases[1][0] = '\0';
754         }
755
756         if (!(target->link = cs_create(tcpip_type, 0, PROTO_Z3950)))
757         {
758             yaz_log(YLOG_FATAL|YLOG_ERRNO, "Failed to create comstack");
759             exit(1);
760         }
761         if (!(target->addr = cs_straddr(target->link, target->hostport)))
762         {
763             printf("ERROR %s bad-address", target->hostport);
764             target->state = Failed;
765             continue;
766         }
767         new = iochan_create(cs_fileno(target->link), handler, 0);
768         iochan_setdata(new, target);
769         iochan_setevent(new, EVENT_EXCEPT);
770         new->next = channel_list;
771         channel_list = new;
772     }
773     fclose(f);
774
775     return 0;
776 }
777
778 void search(struct session *s, char *query)
779 {
780     IOCHAN c;
781     int live_channels = 0;
782
783     yaz_log(YLOG_DEBUG, "Search");
784
785     // Determine what iochans belong to this session
786     // It might have been better to have a list of them
787
788     strcpy(s->query, query);
789     s->requestid++;
790     nmem_reset(s->nmem);
791     for (c = channel_list; c; c = c->next)
792     {
793         struct target *t;
794
795         if (iochan_getfun(c) != handler) // Not a Z target
796             continue;
797         t = iochan_getdata(c);
798         if (t->session == s)
799         {
800             t->hits = -1;
801             t->records = 0;
802             t->diagnostic = 0;
803
804             if (t->state == Error)
805                 t->state = Idle;
806
807             if (t->state == Idle) 
808                 iochan_setflag(c, EVENT_OUTPUT);
809
810             live_channels++;
811         }
812     }
813     if (live_channels)
814     {
815         int maxrecs = live_channels * global_parameters.toget;
816         if (!s->recheap_size)
817         {
818             s->recheap = xmalloc(maxrecs * sizeof(struct record *));
819             s->recheap_size = maxrecs;
820         }
821         else if (s->recheap_size < maxrecs)
822         {
823             s->recheap = xrealloc(s->recheap, maxrecs * sizeof(struct record*));
824             s->recheap_size = maxrecs;
825         }
826     }
827     s->recheap_max = -1;
828     s->recheap_scratch = -1;
829 }
830
831 struct session *new_session() 
832 {
833     struct session *session = xmalloc(sizeof(*session));
834
835     yaz_log(YLOG_DEBUG, "New pazpar2 session");
836     
837     session->requestid = -1;
838     session->targets = 0;
839     session->pqf_parser = yaz_pqf_create();
840     session->query[0] = '\0';
841     session->nmem = nmem_create();
842     session->yaz_marc = yaz_marc_create();
843     yaz_marc_subfield_str(session->yaz_marc, "\t");
844     session->wrbuf = wrbuf_alloc();
845     session->recheap = 0;
846     session->recheap_size = 0;
847
848     return session;
849 }
850
/*
 * Destroy a session.
 * FIXME: not implemented yet - nothing held by the session is released.
 */
void session_destroy(struct session *s)
{
    (void) s;
}
855
856 struct hitsbytarget *hitsbytarget(struct session *s, int *count)
857 {
858     static struct hitsbytarget res[1000]; // FIXME MM
859     IOCHAN c;
860
861     *count = 0;
862     for (c = channel_list; c; c = c->next)
863         if (iochan_getfun(c) == handler)
864         {
865             struct target *t = iochan_getdata(c);
866             if (t->session == s)
867             {
868                 strcpy(res[*count].id, t->hostport);
869                 res[*count].hits = t->hits;
870                 res[*count].records = t->records;
871                 res[*count].diagnostic = t->diagnostic;
872                 res[*count].state = state_strings[(int) t->state];
873                 (*count)++;
874             }
875         }
876
877     return res;
878 }
879
880 struct record **show(struct session *s, int start, int *num)
881 {
882     struct record **recs = nmem_malloc(s->nmem, *num * sizeof(struct record *));
883     int i;
884
885     // FIXME -- skip initial records
886
887     for (i = 0; i < *num; i++)
888     {
889         recs[i] = read_recheap(s);
890         if (!recs[i])
891         {
892             *num = i;
893             break;
894         }
895     }
896     rewind_recheap(s);
897     return recs;
898 }
899
900 void statistics(struct session *s, struct statistics *stat)
901 {
902     IOCHAN c;
903     int i;
904
905     bzero(stat, sizeof(*stat));
906     for (i = 0, c = channel_list; c; i++, c = c->next)
907     {
908         struct target *t;
909         if (iochan_getfun(c) != handler)
910             continue;
911         t = iochan_getdata(c);
912         switch (t->state)
913         {
914             case No_connection: stat->num_no_connection++; break;
915             case Connecting: stat->num_connecting++; break;
916             case Initializing: stat->num_initializing++; break;
917             case Searching: stat->num_searching++; break;
918             case Presenting: stat->num_presenting++; break;
919             case Idle: stat->num_idle++; break;
920             case Failed: stat->num_failed++; break;
921             case Error: stat->num_error++; break;
922             default: break;
923         }
924     }
925
926     stat->num_connections = i;
927 }
928
929 int main(int argc, char **argv)
930 {
931     int ret;
932     char *arg;
933
934     if (signal(SIGPIPE, SIG_IGN) < 0)
935         yaz_log(YLOG_WARN|YLOG_ERRNO, "signal");
936
937     yaz_log_init(YLOG_DEFAULT_LEVEL|YLOG_DEBUG, "pazpar2", 0);
938
939     while ((ret = options("c:h:", argv, argc, &arg)) != -2)
940     {
941         switch (ret) {
942             case 0:
943                 break;
944             case 'c':
945                 command_init(atoi(arg));
946                 break;
947             case 'h':
948                 http_init(atoi(arg));
949                 break;
950             default:
951                 fprintf(stderr, "Usage: pazpar2 -d comport");
952                 exit(1);
953         }
954             
955     }
956
957     event_loop(&channel_list);
958
959     return 0;
960 }
961
962 /*
963  * Local variables:
964  * c-basic-offset: 4
965  * indent-tabs-mode: nil
966  * End:
967  * vim: shiftwidth=4 tabstop=8 expandtab
968  */