1 /* $Id: pazpar2.c,v 1.1 2006-11-14 20:44:38 quinn Exp $ */
4 #define PAZPAR2_VERSION "0.1"
5 #define MAX_DATABASES 512
12 #include <sys/socket.h>
15 #include <yaz/comstack.h>
16 #include <yaz/tcpip.h>
17 #include <yaz/proto.h>
18 #include <yaz/readconf.h>
19 #include <yaz/pquery.h>
20 #include <yaz/yaz-util.h>
27 static long int runId=-1;
31 struct session *session;
36 char databases[MAX_DATABASES][128];
44 int requestid; // ID of current outstanding request
60 static char *state_strings[] = {
73 IOCHAN channel_list = 0;
75 static struct parameters {
76 int timeout; /* operations timeout, in seconds */
77 char implementationId[128];
78 char implementationName[128];
79 char implementationVersion[128];
80 struct timeval base_time;
87 "Index Data PazPar2 (MasterKey)",
95 static int send_apdu(struct target *t, Z_APDU *a)
100 if (!z_APDU(t->odr_out, &a, 0, 0))
102 odr_perror(t->odr_out, "Encoding APDU");
105 buf = odr_getbuf(t->odr_out, &len, 0);
106 r = cs_put(t->link, buf, len);
109 yaz_log(YLOG_WARN, "cs_put: %s", cs_errmsg(cs_errno(t->link)));
114 fprintf(stderr, "cs_put incomplete (ParaZ does not handle that)\n");
116 odr_reset(t->odr_out); /* release the APDU structure */
121 static void send_init(IOCHAN i)
123 struct target *t = iochan_getdata(i);
124 Z_APDU *a = zget_APDU(t->odr_out, Z_APDU_initRequest);
126 a->u.initRequest->implementationId = global_parameters.implementationId;
127 a->u.initRequest->implementationName = global_parameters.implementationName;
128 a->u.initRequest->implementationVersion =
129 global_parameters.implementationVersion;
130 ODR_MASK_SET(a->u.initRequest->options, Z_Options_search);
131 ODR_MASK_SET(a->u.initRequest->options, Z_Options_present);
132 ODR_MASK_SET(a->u.initRequest->options, Z_Options_namedResultSets);
134 ODR_MASK_SET(a->u.initRequest->protocolVersion, Z_ProtocolVersion_1);
135 ODR_MASK_SET(a->u.initRequest->protocolVersion, Z_ProtocolVersion_2);
136 ODR_MASK_SET(a->u.initRequest->protocolVersion, Z_ProtocolVersion_3);
137 if (send_apdu(t, a) >= 0)
139 iochan_setflags(i, EVENT_INPUT);
140 t->state = Initializing;
150 static void send_search(IOCHAN i)
152 struct target *t = iochan_getdata(i);
153 struct session *s = t->session;
154 Z_APDU *a = zget_APDU(t->odr_out, Z_APDU_searchRequest);
159 yaz_log(YLOG_DEBUG, "Sending search");
160 a->u.searchRequest->query = zquery = odr_malloc(t->odr_out, sizeof(Z_Query));
161 zquery->which = Z_Query_type_1;
162 zquery->u.type_1 = p_query_rpn(t->odr_out, PROTO_Z3950, s->query);
164 for (ndb = 0; *t->databases[ndb]; ndb++)
166 databaselist = odr_malloc(t->odr_out, sizeof(char*) * ndb);
167 for (ndb = 0; *t->databases[ndb]; ndb++)
168 databaselist[ndb] = t->databases[ndb];
170 a->u.searchRequest->resultSetName = "Default";
171 a->u.searchRequest->databaseNames = databaselist;
172 a->u.searchRequest->num_databaseNames = ndb;
174 if (send_apdu(t, a) >= 0)
176 iochan_setflags(i, EVENT_INPUT);
177 t->state = Searching;
178 t->requestid = s->requestid;
186 odr_reset(t->odr_out);
189 static void send_present(IOCHAN i)
191 struct target *t = iochan_getdata(i);
192 Z_APDU *a = zget_APDU(t->odr_out, Z_APDU_presentRequest);
195 toget = global_parameters.chunk;
196 if (toget > t->hits - t->records)
197 toget = t->hits - t->records;
199 yaz_log(YLOG_DEBUG, "Trying to present %d records\n", toget);
201 a->u.presentRequest->numberOfRecordsRequested = &toget;
203 a->u.presentRequest->resultSetId = "Default";
205 if (send_apdu(t, a) >= 0)
207 iochan_setflags(i, EVENT_INPUT);
208 t->state = Presenting;
216 odr_reset(t->odr_out);
219 static void do_initResponse(IOCHAN i, Z_APDU *a)
221 struct target *t = iochan_getdata(i);
222 Z_InitResponse *r = a->u.initResponse;
224 yaz_log(YLOG_DEBUG, "Received init response");
239 static char *search_geterror(Z_SearchRequest *r)
244 static void do_searchResponse(IOCHAN i, Z_APDU *a)
246 struct target *t = iochan_getdata(i);
247 Z_SearchResponse *r = a->u.searchResponse;
249 yaz_log(YLOG_DEBUG, "Searchresponse (status=%d)", *r->searchStatus);
251 if (*r->searchStatus)
253 t->hits = *r->resultCount;
261 Z_Records *recs = r->records;
262 if (recs->which == Z_Records_NSD)
264 yaz_log(YLOG_WARN, "Non-surrogate diagnostic");
265 t->diagnostic = *recs->u.nonSurrogateDiagnostic->condition;
272 // FIXME Catch present errors!!!!!!!
273 static void do_presentResponse(IOCHAN i, Z_APDU *a)
275 struct target *t = iochan_getdata(i);
276 Z_PresentResponse *r = a->u.presentResponse;
279 Z_Records *recs = r->records;
280 if (recs->which == Z_Records_NSD)
282 yaz_log(YLOG_WARN, "Non-surrogate diagnostic");
283 t->diagnostic = *recs->u.nonSurrogateDiagnostic->condition;
288 yaz_log(YLOG_DEBUG, "Got Records!");
292 if (!*r->presentStatus && t->state != Error)
294 yaz_log(YLOG_DEBUG, "Good Present response");
295 t->records += *r->numberOfRecordsReturned;
298 else if (*r->presentStatus)
300 yaz_log(YLOG_WARN, "Bad Present response");
305 static void handler(IOCHAN i, int event)
307 struct target *t = iochan_getdata(i);
308 struct session *s = t->session;
309 //static int waiting = 0;
311 if (t->state == No_connection) /* Start connection */
313 int res = cs_connect(t->link, t->addr);
315 t->state = Connecting;
316 if (!res) /* we are go */
317 iochan_setevent(i, EVENT_OUTPUT);
319 iochan_setflags(i, EVENT_OUTPUT);
322 yaz_log(YLOG_WARN|YLOG_ERRNO, "ERROR %s connect\n", t->hostport);
329 else if (t->state == Connecting && event & EVENT_OUTPUT)
332 socklen_t errlen = sizeof(errcode);
334 if (getsockopt(cs_fileno(t->link), SOL_SOCKET, SO_ERROR, &errcode,
335 &errlen) < 0 || errcode != 0)
344 yaz_log(YLOG_DEBUG, "Connect OK");
345 t->state = Connected;
349 else if (event & EVENT_INPUT)
351 int len = cs_get(t->link, &t->ibuf, &t->ibufsize);
369 if (t->requestid == s->requestid || t->state == Initializing)
373 odr_reset(t->odr_in);
374 odr_setbuf(t->odr_in, t->ibuf, len, 0);
375 if (!z_APDU(t->odr_in, &a, 0, 0))
384 case Z_APDU_initResponse:
385 do_initResponse(i, a);
387 case Z_APDU_searchResponse:
388 do_searchResponse(i, a);
390 case Z_APDU_presentResponse:
391 do_presentResponse(i, a);
394 yaz_log(YLOG_WARN, "Unexpected result from server");
400 // if (cs_more(t->link))
401 // iochan_setevent(i, EVENT_INPUT);
403 else // we throw away response and go to idle mode
406 /* if len==1 we do nothing but wait for more input */
409 else if (t->state == Connected) {
413 if (t->state == Idle)
415 if (t->requestid != s->requestid) {
418 else if (t->hits > 0 && t->records < global_parameters.toget &&
419 t->records < t->hits) {
425 int load_targets(struct session *s, const char *fn)
427 FILE *f = fopen(fn, "r");
429 struct target **target_p;
433 yaz_log(YLOG_WARN|YLOG_ERRNO, "open %s", fn);
437 target_p = &s->targets;
438 while (fgets(line, 255, f))
441 struct target *target;
444 if (strncmp(line, "target ", 7))
447 url[strlen(url) - 1] = '\0';
448 yaz_log(LOG_DEBUG, "Target: %s", url);
450 *target_p = target = xmalloc(sizeof(**target_p));
452 target_p = &target->next;
453 target->state = No_connection;
455 target->ibufsize = 0;
456 target->odr_in = odr_createmem(ODR_DECODE);
457 target->odr_out = odr_createmem(ODR_ENCODE);
461 target->requestid = -1;
463 target->diagnostic = 0;
464 strcpy(target->fullname, url);
465 if ((p = strchr(url, '/')))
468 strcpy(target->hostport, url);
471 strcpy(target->databases[0], p);
472 target->databases[1][0] = '\0';
476 strcpy(target->hostport, url);
477 strcpy(target->databases[0], "Default");
478 target->databases[1][0] = '\0';
481 if (!(target->link = cs_create(tcpip_type, 0, PROTO_Z3950)))
483 yaz_log(YLOG_FATAL|YLOG_ERRNO, "Failed to create comstack");
486 if (!(target->addr = cs_straddr(target->link, target->hostport)))
488 printf("ERROR %s bad-address", target->hostport);
489 target->state = Failed;
492 new = iochan_create(cs_fileno(target->link), handler, 0);
493 iochan_setdata(new, target);
494 iochan_setevent(new, EVENT_EXCEPT);
495 new->next = channel_list;
503 void search(struct session *s, char *query)
507 yaz_log(YLOG_DEBUG, "Search");
509 // Determine what iochans belong to this session
510 // It might have been better to have a list of them
512 strcpy(s->query, query);
515 for (c = channel_list; c; c = c->next)
519 if (iochan_getfun(c) != handler) // Not a Z target
521 t = iochan_getdata(c);
528 if (t->state == Error)
531 if (t->state == Idle)
532 iochan_setflag(c, EVENT_OUTPUT);
537 struct session *new_session()
539 struct session *session = xmalloc(sizeof(*session));
541 yaz_log(YLOG_DEBUG, "New pazpar2 session");
543 session->requestid = -1;
544 session->targets = 0;
545 session->pqf_parser = yaz_pqf_create();
546 session->query[0] = '\0';
547 session->nmem = nmem_create();
552 struct hitsbytarget *hitsbytarget(struct session *s, int *count)
554 static struct hitsbytarget res[1000]; // FIXME MM
558 for (c = channel_list; c; c = c->next)
559 if (iochan_getfun(c) == handler)
561 struct target *t = iochan_getdata(c);
564 strcpy(res[*count].id, t->hostport);
565 res[*count].hits = t->hits;
566 res[*count].records = t->records;
567 res[*count].diagnostic = t->diagnostic;
568 res[*count].state = state_strings[(int) t->state];
577 void statistics(struct session *s, struct statistics *stat)
582 bzero(stat, sizeof(*stat));
583 for (i = 0, c = channel_list; c; i++, c = c->next)
586 if (iochan_getfun(c) != handler)
588 t = iochan_getdata(c);
591 case No_connection: stat->num_no_connection++; break;
592 case Connecting: stat->num_connecting++; break;
593 case Initializing: stat->num_initializing++; break;
594 case Searching: stat->num_searching++; break;
595 case Presenting: stat->num_presenting++; break;
596 case Idle: stat->num_idle++; break;
597 case Failed: stat->num_failed++; break;
598 case Error: stat->num_error++; break;
603 stat->num_connections = i;
606 int main(int argc, char **argv)
611 if (signal(SIGPIPE, SIG_IGN) < 0)
612 yaz_log(YLOG_WARN|YLOG_ERRNO, "signal");
615 yaz_log_init(YLOG_DEFAULT_LEVEL|YLOG_DEBUG, "pazpar2", 0);
617 while ((ret = options("c:", argv, argc, &arg)) != -2)
623 command_init(atoi(arg));
626 fprintf(stderr, "Usage: pazpar2 -d comport");
632 event_loop(&channel_list);
640 * indent-tabs-mode: nil
642 * vim: shiftwidth=4 tabstop=8 expandtab