Initial revision
[pazpar2-moved-to-github.git] / pazpar2.c
1 /* $Id: pazpar2.c,v 1.1 2006-11-14 20:44:38 quinn Exp $ */
2
3
4 #define PAZPAR2_VERSION "0.1"
5 #define MAX_DATABASES 512
6
7 #include <stdlib.h>
8 #include <stdio.h>
9 #include <string.h>
10 #include <sys/time.h>
11 #include <unistd.h>
12 #include <sys/socket.h>
13 #include <signal.h>
14
15 #include <yaz/comstack.h>
16 #include <yaz/tcpip.h>
17 #include <yaz/proto.h>
18 #include <yaz/readconf.h>
19 #include <yaz/pquery.h>
20 #include <yaz/yaz-util.h>
21
22 #include "pazpar2.h"
23 #include "eventl.h"
24 #include "command.h"
25
/* Program name as invoked; main() stores argv[0] here. */
char *myname;

/* Run identifier, initialised to -1; not referenced by the visible code. */
static long runId = -1;
28
29 struct target
30 {
31     struct session *session;
32     char fullname[256];
33     char hostport[128];
34     char *ibuf;
35     int ibufsize;
36     char databases[MAX_DATABASES][128];
37     COMSTACK link;
38     ODR odr_in, odr_out;
39     struct target *next;
40     void *addr;
41     int hits;
42     int records;
43     int setno;
44     int requestid;                              // ID of current outstanding request
45     int diagnostic;
46     enum target_state
47     {
48         No_connection,
49         Connecting,
50         Connected,
51         Initializing,
52         Searching,
53         Presenting,
54         Error,
55         Idle,
56         Failed
57     } state;
58 };
59
/*
 * Printable names for enum target_state, indexed by the numeric value of
 * the state. The order must stay in step with the enum declaration.
 */
static char *state_strings[] = {
    "No_connection",    /* 0 */
    "Connecting",       /* 1 */
    "Connected",        /* 2 */
    "Initializing",     /* 3 */
    "Searching",        /* 4 */
    "Presenting",       /* 5 */
    "Error",            /* 6 */
    "Idle",             /* 7 */
    "Failed"            /* 8 */
};
71
72
73 IOCHAN channel_list = 0;
74
75 static struct parameters {
76     int timeout;                /* operations timeout, in seconds */
77     char implementationId[128];
78     char implementationName[128];
79     char implementationVersion[128];
80     struct timeval base_time;
81     int toget;
82     int chunk;
83 } global_parameters = 
84 {
85     30,
86     "81",
87     "Index Data PazPar2 (MasterKey)",
88     PAZPAR2_VERSION,
89     {0,0},
90     100,
91     10
92 };
93
94
95 static int send_apdu(struct target *t, Z_APDU *a)
96 {
97     char *buf;
98     int len, r;
99
100     if (!z_APDU(t->odr_out, &a, 0, 0))
101     {
102         odr_perror(t->odr_out, "Encoding APDU");
103         abort();
104     }
105     buf = odr_getbuf(t->odr_out, &len, 0);
106     r = cs_put(t->link, buf, len);
107     if (r < 0)
108     {
109         yaz_log(YLOG_WARN, "cs_put: %s", cs_errmsg(cs_errno(t->link)));
110         return -1;
111     }
112     else if (r == 1)
113     {
114         fprintf(stderr, "cs_put incomplete (ParaZ does not handle that)\n");
115     }
116     odr_reset(t->odr_out); /* release the APDU structure  */
117     return 0;
118 }
119
120
121 static void send_init(IOCHAN i)
122 {
123     struct target *t = iochan_getdata(i);
124     Z_APDU *a = zget_APDU(t->odr_out, Z_APDU_initRequest);
125
126     a->u.initRequest->implementationId = global_parameters.implementationId;
127     a->u.initRequest->implementationName = global_parameters.implementationName;
128     a->u.initRequest->implementationVersion =
129         global_parameters.implementationVersion;
130     ODR_MASK_SET(a->u.initRequest->options, Z_Options_search);
131     ODR_MASK_SET(a->u.initRequest->options, Z_Options_present);
132     ODR_MASK_SET(a->u.initRequest->options, Z_Options_namedResultSets);
133
134     ODR_MASK_SET(a->u.initRequest->protocolVersion, Z_ProtocolVersion_1);
135     ODR_MASK_SET(a->u.initRequest->protocolVersion, Z_ProtocolVersion_2);
136     ODR_MASK_SET(a->u.initRequest->protocolVersion, Z_ProtocolVersion_3);
137     if (send_apdu(t, a) >= 0)
138     {
139         iochan_setflags(i, EVENT_INPUT);
140         t->state = Initializing;
141     }
142     else
143     {
144         iochan_destroy(i);
145         t->state = Failed;
146         cs_close(t->link);
147     }
148 }
149
150 static void send_search(IOCHAN i)
151 {
152     struct target *t = iochan_getdata(i);
153     struct session *s = t->session;
154     Z_APDU *a = zget_APDU(t->odr_out, Z_APDU_searchRequest);
155     int ndb;
156     char **databaselist;
157     Z_Query *zquery;
158
159     yaz_log(YLOG_DEBUG, "Sending search");
160     a->u.searchRequest->query = zquery = odr_malloc(t->odr_out, sizeof(Z_Query));
161     zquery->which = Z_Query_type_1;
162     zquery->u.type_1 = p_query_rpn(t->odr_out, PROTO_Z3950, s->query);
163
164     for (ndb = 0; *t->databases[ndb]; ndb++)
165         ;
166     databaselist = odr_malloc(t->odr_out, sizeof(char*) * ndb);
167     for (ndb = 0; *t->databases[ndb]; ndb++)
168         databaselist[ndb] = t->databases[ndb];
169
170     a->u.searchRequest->resultSetName = "Default";
171     a->u.searchRequest->databaseNames = databaselist;
172     a->u.searchRequest->num_databaseNames = ndb;
173
174     if (send_apdu(t, a) >= 0)
175     {
176         iochan_setflags(i, EVENT_INPUT);
177         t->state = Searching;
178         t->requestid = s->requestid;
179     }
180     else
181     {
182         iochan_destroy(i);
183         t->state = Failed;
184         cs_close(t->link);
185     }
186     odr_reset(t->odr_out);
187 }
188
189 static void send_present(IOCHAN i)
190 {
191     struct target *t = iochan_getdata(i);
192     Z_APDU *a = zget_APDU(t->odr_out, Z_APDU_presentRequest);
193     int toget;
194
195     toget = global_parameters.chunk;
196     if (toget > t->hits - t->records)
197         toget = t->hits - t->records;
198
199     yaz_log(YLOG_DEBUG, "Trying to present %d records\n", toget);
200
201     a->u.presentRequest->numberOfRecordsRequested = &toget;
202
203     a->u.presentRequest->resultSetId = "Default";
204
205     if (send_apdu(t, a) >= 0)
206     {
207         iochan_setflags(i, EVENT_INPUT);
208         t->state = Presenting;
209     }
210     else
211     {
212         iochan_destroy(i);
213         t->state = Failed;
214         cs_close(t->link);
215     }
216     odr_reset(t->odr_out);
217 }
218
219 static void do_initResponse(IOCHAN i, Z_APDU *a)
220 {
221     struct target *t = iochan_getdata(i);
222     Z_InitResponse *r = a->u.initResponse;
223
224     yaz_log(YLOG_DEBUG, "Received init response");
225
226     if (*r->result)
227     {
228         t->state = Idle;
229     }
230     else
231     {
232         t->state = Failed;
233         iochan_destroy(i);
234         cs_close(t->link);
235     }
236 }
237
238 #if 0
239 static char *search_geterror(Z_SearchRequest *r)
240 {
241 #endif
242     
243
244 static void do_searchResponse(IOCHAN i, Z_APDU *a)
245 {
246     struct target *t = iochan_getdata(i);
247     Z_SearchResponse *r = a->u.searchResponse;
248
249     yaz_log(YLOG_DEBUG, "Searchresponse (status=%d)", *r->searchStatus);
250
251     if (*r->searchStatus)
252     {
253         t->hits = *r->resultCount;
254         t->state = Idle;
255     }
256     else
257     {          /*"FAILED"*/
258         t->hits = 0;
259         t->state = Failed;
260         if (r->records) {
261             Z_Records *recs = r->records;
262             if (recs->which == Z_Records_NSD)
263             {
264                 yaz_log(YLOG_WARN, "Non-surrogate diagnostic");
265                 t->diagnostic = *recs->u.nonSurrogateDiagnostic->condition;
266                 t->state = Error;
267             }
268         }
269     }
270 }
271
272 // FIXME Catch present errors!!!!!!!
273 static void do_presentResponse(IOCHAN i, Z_APDU *a)
274 {
275     struct target *t = iochan_getdata(i);
276     Z_PresentResponse *r = a->u.presentResponse;
277
278     if (r->records) {
279         Z_Records *recs = r->records;
280         if (recs->which == Z_Records_NSD)
281         {
282             yaz_log(YLOG_WARN, "Non-surrogate diagnostic");
283             t->diagnostic = *recs->u.nonSurrogateDiagnostic->condition;
284             t->state = Error;
285         }
286         else
287         {
288             yaz_log(YLOG_DEBUG, "Got Records!");
289         }
290     }
291
292     if (!*r->presentStatus && t->state != Error)
293     {
294         yaz_log(YLOG_DEBUG, "Good Present response");
295         t->records += *r->numberOfRecordsReturned;
296         t->state = Idle;
297     }
298     else if (*r->presentStatus) 
299     {
300         yaz_log(YLOG_WARN, "Bad Present response");
301         t->state = Error;
302     }
303 }
304
305 static void handler(IOCHAN i, int event)
306 {
307     struct target *t = iochan_getdata(i);
308     struct session *s = t->session;
309     //static int waiting = 0;
310
311     if (t->state == No_connection) /* Start connection */
312     {
313         int res = cs_connect(t->link, t->addr);
314
315         t->state = Connecting;
316         if (!res) /* we are go */
317             iochan_setevent(i, EVENT_OUTPUT);
318         else if (res == 1)
319             iochan_setflags(i, EVENT_OUTPUT);
320         else
321         {
322             yaz_log(YLOG_WARN|YLOG_ERRNO, "ERROR %s connect\n", t->hostport);
323             cs_close(t->link);
324             t->state = Failed;
325             iochan_destroy(i);
326         }
327     }
328
329     else if (t->state == Connecting && event & EVENT_OUTPUT)
330     {
331         int errcode;
332         socklen_t errlen = sizeof(errcode);
333
334         if (getsockopt(cs_fileno(t->link), SOL_SOCKET, SO_ERROR, &errcode,
335             &errlen) < 0 || errcode != 0)
336         {
337             cs_close(t->link);
338             iochan_destroy(i);
339             t->state = Failed;
340             return;
341         }
342         else
343         {
344             yaz_log(YLOG_DEBUG, "Connect OK");
345             t->state = Connected;
346         }
347     }
348
349     else if (event & EVENT_INPUT)
350     {
351         int len = cs_get(t->link, &t->ibuf, &t->ibufsize);
352
353         if (len < 0)
354         {
355             cs_close(t->link);
356             iochan_destroy(i);
357             t->state = Failed;
358             return;
359         }
360         if (len == 0)
361         {
362             cs_close(t->link);
363             iochan_destroy(i);
364             t->state = Failed;
365             return;
366         }
367         else if (len > 1)
368         {
369             if (t->requestid == s->requestid || t->state == Initializing) 
370             {
371                 Z_APDU *a;
372
373                 odr_reset(t->odr_in);
374                 odr_setbuf(t->odr_in, t->ibuf, len, 0);
375                 if (!z_APDU(t->odr_in, &a, 0, 0))
376                 {
377                     cs_close(t->link);
378                     iochan_destroy(i);
379                     t->state = Failed;
380                     return;
381                 }
382                 switch (a->which)
383                 {
384                     case Z_APDU_initResponse:
385                         do_initResponse(i, a);
386                         break;
387                     case Z_APDU_searchResponse:
388                         do_searchResponse(i, a);
389                         break;
390                     case Z_APDU_presentResponse:
391                         do_presentResponse(i, a);
392                         break;
393                     default:
394                         yaz_log(YLOG_WARN, "Unexpected result from server");
395                         cs_close(t->link);
396                         iochan_destroy(i);
397                         t->state = Failed;
398                         return;
399                 }
400                 // if (cs_more(t->link))
401                 //    iochan_setevent(i, EVENT_INPUT);
402             }
403             else  // we throw away response and go to idle mode
404                 t->state = Idle;
405         }
406         /* if len==1 we do nothing but wait for more input */
407     }
408
409     else if (t->state == Connected) {
410         send_init(i);
411     }
412
413     if (t->state == Idle)
414     {
415         if (t->requestid != s->requestid) {
416             send_search(i);
417         }
418         else if (t->hits > 0 && t->records < global_parameters.toget &&
419             t->records < t->hits) {
420             send_present(i);
421         }
422     }
423 }
424
425 int load_targets(struct session *s, const char *fn)
426 {
427     FILE *f = fopen(fn, "r");
428     char line[256];
429     struct target **target_p;
430
431     if (!f)
432     {
433         yaz_log(YLOG_WARN|YLOG_ERRNO, "open %s", fn);
434         return -1;
435     }
436
437     target_p = &s->targets;
438     while (fgets(line, 255, f))
439     {
440         char *url, *p;
441         struct target *target;
442         IOCHAN new;
443
444         if (strncmp(line, "target ", 7))
445             continue;
446         url = line + 7;
447         url[strlen(url) - 1] = '\0';
448         yaz_log(LOG_DEBUG, "Target: %s", url);
449
450         *target_p = target = xmalloc(sizeof(**target_p));
451         target->next = 0;
452         target_p = &target->next;
453         target->state = No_connection;
454         target->ibuf = 0;
455         target->ibufsize = 0;
456         target->odr_in = odr_createmem(ODR_DECODE);
457         target->odr_out = odr_createmem(ODR_ENCODE);
458         target->hits = -1;
459         target->setno = 0;
460         target->session = s;
461         target->requestid = -1;
462         target->records = 0;
463         target->diagnostic = 0;
464         strcpy(target->fullname, url);
465         if ((p = strchr(url, '/')))
466         {                   
467             *p = '\0';
468             strcpy(target->hostport, url);
469             *p = '/';
470             p++;
471             strcpy(target->databases[0], p);
472             target->databases[1][0] = '\0';
473         }
474         else
475         {
476             strcpy(target->hostport, url);
477             strcpy(target->databases[0], "Default");
478             target->databases[1][0] = '\0';
479         }
480
481         if (!(target->link = cs_create(tcpip_type, 0, PROTO_Z3950)))
482         {
483             yaz_log(YLOG_FATAL|YLOG_ERRNO, "Failed to create comstack");
484             exit(1);
485         }
486         if (!(target->addr = cs_straddr(target->link, target->hostport)))
487         {
488             printf("ERROR %s bad-address", target->hostport);
489             target->state = Failed;
490             continue;
491         }
492         new = iochan_create(cs_fileno(target->link), handler, 0);
493         iochan_setdata(new, target);
494         iochan_setevent(new, EVENT_EXCEPT);
495         new->next = channel_list;
496         channel_list = new;
497     }
498     fclose(f);
499
500     return 0;
501 }
502
503 void search(struct session *s, char *query)
504 {
505     IOCHAN c;
506
507     yaz_log(YLOG_DEBUG, "Search");
508
509     // Determine what iochans belong to this session
510     // It might have been better to have a list of them
511
512     strcpy(s->query, query);
513     s->requestid++;
514     nmem_reset(s->nmem);
515     for (c = channel_list; c; c = c->next)
516     {
517         struct target *t;
518
519         if (iochan_getfun(c) != handler) // Not a Z target
520             continue;
521         t = iochan_getdata(c);
522         if (t->session == s)
523         {
524             t->hits = -1;
525             t->records = 0;
526             t->diagnostic = 0;
527
528             if (t->state == Error)
529                 t->state = Idle;
530
531             if (t->state == Idle) 
532                 iochan_setflag(c, EVENT_OUTPUT);
533         }
534     }
535 }
536
537 struct session *new_session() 
538 {
539     struct session *session = xmalloc(sizeof(*session));
540
541     yaz_log(YLOG_DEBUG, "New pazpar2 session");
542     
543     session->requestid = -1;
544     session->targets = 0;
545     session->pqf_parser = yaz_pqf_create();
546     session->query[0] = '\0';
547     session->nmem = nmem_create();
548
549     return session;
550 }
551
552 struct hitsbytarget *hitsbytarget(struct session *s, int *count)
553 {
554     static struct hitsbytarget res[1000]; // FIXME MM
555     IOCHAN c;
556
557     *count = 0;
558     for (c = channel_list; c; c = c->next)
559         if (iochan_getfun(c) == handler)
560         {
561             struct target *t = iochan_getdata(c);
562             if (t->session == s)
563             {
564                 strcpy(res[*count].id, t->hostport);
565                 res[*count].hits = t->hits;
566                 res[*count].records = t->records;
567                 res[*count].diagnostic = t->diagnostic;
568                 res[*count].state = state_strings[(int) t->state];
569                 (*count)++;
570             }
571         }
572
573     return res;
574 }
575
576
577 void statistics(struct session *s, struct statistics *stat)
578 {
579     IOCHAN c;
580     int i;
581
582     bzero(stat, sizeof(*stat));
583     for (i = 0, c = channel_list; c; i++, c = c->next)
584     {
585         struct target *t;
586         if (iochan_getfun(c) != handler)
587             continue;
588         t = iochan_getdata(c);
589         switch (t->state)
590         {
591             case No_connection: stat->num_no_connection++; break;
592             case Connecting: stat->num_connecting++; break;
593             case Initializing: stat->num_initializing++; break;
594             case Searching: stat->num_searching++; break;
595             case Presenting: stat->num_presenting++; break;
596             case Idle: stat->num_idle++; break;
597             case Failed: stat->num_failed++; break;
598             case Error: stat->num_error++; break;
599             default: break;
600         }
601     }
602
603     stat->num_connections = i;
604 }
605
606 int main(int argc, char **argv)
607 {
608     int ret;
609     char *arg;
610
611     if (signal(SIGPIPE, SIG_IGN) < 0)
612         yaz_log(YLOG_WARN|YLOG_ERRNO, "signal");
613
614     myname = argv[0];
615     yaz_log_init(YLOG_DEFAULT_LEVEL|YLOG_DEBUG, "pazpar2", 0);
616
617     while ((ret = options("c:", argv, argc, &arg)) != -2)
618     {
619         switch (ret) {
620             case 0:
621                 break;
622             case 'c':
623                 command_init(atoi(arg));
624                 break;
625             default:
626                 fprintf(stderr, "Usage: pazpar2 -d comport");
627                 exit(1);
628         }
629             
630     }
631
632     event_loop(&channel_list);
633
634     return 0;
635 }
636
637 /*
638  * Local variables:
639  * c-basic-offset: 4
640  * indent-tabs-mode: nil
641  * End:
642  * vim: shiftwidth=4 tabstop=8 expandtab
643  */