From: Sebastian Hammer Date: Mon, 21 Jul 2008 10:00:38 +0000 (+0000) Subject: Added ZOOM support, only raw ingest missing at this point X-Git-Tag: v1.1.0~149^2 X-Git-Url: http://git.indexdata.com/?p=pazpar2-moved-to-github.git;a=commitdiff_plain;h=8a78bbf3382e2061c68c866a37904e8e14770cf8 Added ZOOM support, only raw ingest missing at this point --- diff --git a/m4 b/m4 index 9763824..a3d7a33 160000 --- a/m4 +++ b/m4 @@ -1 +1 @@ -Subproject commit 9763824ebc2240e71a2eff5bb5b3775ab56d0508 +Subproject commit a3d7a3375702caae9794ae3a724e51ae9ee46ac3 diff --git a/src/client.c b/src/client.c index 076b5e5..964a3cb 100644 --- a/src/client.c +++ b/src/client.c @@ -117,24 +117,6 @@ static const char *client_states[] = { static struct client *client_freelist = 0; -static int send_apdu(struct client *c, Z_APDU *a) -{ - struct session_database *sdb = client_get_database(c); - const char *apdulog = session_setting_oneval(sdb, PZ_APDULOG); - if (apdulog && *apdulog && *apdulog != '0') - { - ODR p = odr_createmem(ODR_PRINT); - yaz_log(YLOG_LOG, "send APDU %s", client_get_url(c)); - - odr_setprint(p, yaz_log_file()); - z_APDU(p, &a, 0, 0); - odr_setprint(p, stderr); - odr_destroy(p); - } - return connection_send_apdu(client_get_connection(c), a); -} - - const char *client_get_state_str(struct client *cl) { return client_states[cl->state]; @@ -161,61 +143,12 @@ static void client_show_raw_error(struct client *cl, const char *addinfo); // Close connection and set state to error void client_fatal(struct client *cl) { - client_show_raw_error(cl, "client connection failure"); + //client_show_raw_error(cl, "client connection failure"); yaz_log(YLOG_WARN, "Fatal error from %s", client_get_url(cl)); connection_destroy(cl->connection); client_set_state(cl, Client_Error); } - -static int diag_to_wrbuf(Z_DiagRec **pp, int num, WRBUF w) -{ - int code = 0; - int i; - for (i = 0; iwhich != Z_DiagRec_defaultFormat) - { - wrbuf_puts(w, "? Not in default format"); - } - else - { - Z_DefaultDiagFormat *r = p->u.defaultFormat; - - if (!r->diagnosticSetId) - wrbuf_puts(w, "? Missing diagset"); - else - { - oid_class oclass; - char diag_name_buf[OID_STR_MAX]; - const char *diag_name = 0; - diag_name = yaz_oid_to_string_buf - (r->diagnosticSetId, &oclass, diag_name_buf); - wrbuf_puts(w, diag_name); - } - if (!code) - code = *r->condition; - wrbuf_printf(w, " %d %s", *r->condition, - diagbib1_str(*r->condition)); - switch (r->which) - { - case Z_DefaultDiagFormat_v2Addinfo: - wrbuf_printf(w, " -- v2 addinfo '%s'", r->u.v2Addinfo); - break; - case Z_DefaultDiagFormat_v3Addinfo: - wrbuf_printf(w, " -- v3 addinfo '%s'", r->u.v3Addinfo); - break; - } - } - } - return code; -} - - - struct connection *client_get_connection(struct client *cl) { return cl->connection; @@ -241,6 +174,7 @@ void client_set_requestid(struct client *cl, int id) cl->requestid = id; } + int client_show_raw_begin(struct client *cl, int position, const char *syntax, const char *esn, void *data, @@ -285,10 +219,6 @@ int client_show_raw_begin(struct client *cl, int position, { client_show_raw_error(cl, "client disconnected"); } - else - { - client_continue(cl); - } return 0; } @@ -331,223 +261,39 @@ static void client_show_raw_cancel(struct client *cl) } } -static void client_present_syntax(Z_APDU *a, const char *syntax) -{ - // empty string for syntax OMITS preferredRecordSyntax (OPTIONAL) - if (syntax && *syntax) - a->u.presentRequest->preferredRecordSyntax = - yaz_string_to_oid_odr(yaz_oid_std(), - CLASS_RECSYN, syntax, - global_parameters.odr_out); -} - -static void client_present_elements(Z_APDU *a, const char *elements) -{ - if (elements && *elements) // element set is optional - { - Z_ElementSetNames *elementSetNames = - odr_malloc(global_parameters.odr_out, sizeof(*elementSetNames)); - Z_RecordComposition *compo = - odr_malloc(global_parameters.odr_out, sizeof(*compo)); - a->u.presentRequest->recordComposition = compo; - - compo->which = Z_RecordComp_simple; - compo->u.simple = elementSetNames; - - elementSetNames->which = Z_ElementSetNames_generic; - elementSetNames->u.generic = - odr_strdup(global_parameters.odr_out, elements); - } -} - void client_send_raw_present(struct client *cl) { struct session_database *sdb = client_get_database(cl); - Z_APDU *a = zget_APDU(global_parameters.odr_out, Z_APDU_presentRequest); - int toget = 1; - int start = cl->show_raw->position; + struct connection *co = client_get_connection(cl); + ZOOM_resultset set = connection_get_resultset(co); + + int offset = cl->show_raw->position; const char *syntax = 0; const char *elements = 0; assert(cl->show_raw); + assert(set); yaz_log(YLOG_DEBUG, "%s: trying to present %d record(s) from %d", - client_get_url(cl), toget, start); - - a->u.presentRequest->resultSetStartPoint = &start; - a->u.presentRequest->numberOfRecordsRequested = &toget; + client_get_url(cl), 1, offset); if (cl->show_raw->syntax) syntax = cl->show_raw->syntax; else syntax = session_setting_oneval(sdb, PZ_REQUESTSYNTAX); + ZOOM_resultset_option_set(set, "preferredRecordSyntax", syntax); - client_present_syntax(a, syntax); if (cl->show_raw->esn) elements = cl->show_raw->esn; else elements = session_setting_oneval(sdb, PZ_ELEMENTS); - client_present_elements(a, elements); + ZOOM_resultset_option_set(set, "elementSetName", elements); - if (send_apdu(cl, a) >= 0) - { - cl->show_raw->active = 1; - cl->state = Client_Presenting; - } - else - { - client_show_raw_error(cl, "send_apdu failed"); - cl->state = Client_Error; - } - odr_reset(global_parameters.odr_out); -} - -void client_send_present(struct client *cl) -{ - struct session_database *sdb = client_get_database(cl); - Z_APDU *a = zget_APDU(global_parameters.odr_out, Z_APDU_presentRequest); - int toget; - int start = cl->records + 1; - const char *syntax = 0; - const char *elements = 0; - - toget = global_parameters.chunk; - if (toget > global_parameters.toget - cl->records) - toget = global_parameters.toget - cl->records; - if (toget > cl->hits - cl->records) - toget = cl->hits - cl->records; - - yaz_log(YLOG_DEBUG, "Trying to present %d record(s) from %d", - toget, start); - - a->u.presentRequest->resultSetStartPoint = &start; - a->u.presentRequest->numberOfRecordsRequested = &toget; - - syntax = session_setting_oneval(sdb, PZ_REQUESTSYNTAX); - client_present_syntax(a, syntax); - - elements = session_setting_oneval(sdb, PZ_ELEMENTS); - client_present_elements(a, elements); - - if (send_apdu(cl, a) >= 0) - cl->state = Client_Presenting; - else - cl->state = Client_Error; - odr_reset(global_parameters.odr_out); -} - - -void client_send_search(struct client *cl) -{ - struct session *se = client_get_session(cl); - struct session_database *sdb = client_get_database(cl); - Z_APDU *a = zget_APDU(global_parameters.odr_out, Z_APDU_searchRequest); - int ndb; - char **databaselist; - Z_Query *zquery; - int ssub = 0, lslb = 100000, mspn = 10; - const char *piggyback = session_setting_oneval(sdb, PZ_PIGGYBACK); - const char *queryenc = session_setting_oneval(sdb, PZ_QUERYENCODING); - - yaz_log(YLOG_DEBUG, "Sending search to %s", sdb->database->url); - - - // constructing RPN query - a->u.searchRequest->query = zquery = odr_malloc(global_parameters.odr_out, - sizeof(Z_Query)); - zquery->which = Z_Query_type_1; - zquery->u.type_1 = p_query_rpn(global_parameters.odr_out, - client_get_pquery(cl)); - - // converting to target encoding - if (queryenc && *queryenc) - { - yaz_iconv_t iconv = yaz_iconv_open(queryenc, "UTF-8"); - if (iconv){ - yaz_query_charset_convert_rpnquery(zquery->u.type_1, - global_parameters.odr_out, - iconv); - yaz_iconv_close(iconv); - } else - yaz_log(YLOG_WARN, "Query encoding failed %s %s", - client_get_database(cl)->database->url, queryenc); - } - - for (ndb = 0; sdb->database->databases[ndb]; ndb++) - ; - databaselist = odr_malloc(global_parameters.odr_out, sizeof(char*) * ndb); - for (ndb = 0; sdb->database->databases[ndb]; ndb++) - databaselist[ndb] = sdb->database->databases[ndb]; - - if (!piggyback || *piggyback == '1') - { - const char *elements = session_setting_oneval(sdb, PZ_ELEMENTS); - const char *recsyn = session_setting_oneval(sdb, PZ_REQUESTSYNTAX); - if (recsyn && *recsyn) - { - a->u.searchRequest->preferredRecordSyntax = - yaz_string_to_oid_odr(yaz_oid_std(), - CLASS_RECSYN, recsyn, - global_parameters.odr_out); - } - if (elements && *elements) - { - Z_ElementSetNames *esn = - odr_malloc(global_parameters.odr_out, sizeof(*esn)); - esn->which = Z_ElementSetNames_generic; - esn->u.generic = odr_strdup(global_parameters.odr_out, elements); - - a->u.searchRequest->smallSetElementSetNames = esn; - a->u.searchRequest->mediumSetElementSetNames = esn; - } - a->u.searchRequest->smallSetUpperBound = &ssub; - a->u.searchRequest->largeSetLowerBound = &lslb; - a->u.searchRequest->mediumSetPresentNumber = &mspn; - } - a->u.searchRequest->databaseNames = databaselist; - a->u.searchRequest->num_databaseNames = ndb; - - - { //scope for sending and logging queries - WRBUF wbquery = wrbuf_alloc(); - yaz_query_to_wrbuf(wbquery, a->u.searchRequest->query); - - - if (send_apdu(cl, a) >= 0) - { - client_set_state(cl, Client_Searching); - client_set_requestid(cl, se->requestid); - yaz_log(YLOG_LOG, "SearchRequest %s %s %s", - client_get_database(cl)->database->url, - queryenc ? queryenc : "UTF-8", - wrbuf_cstr(wbquery)); - } - else { - client_set_state(cl, Client_Error); - yaz_log(YLOG_WARN, "Failed SearchRequest %s %s %s", - client_get_database(cl)->database->url, - queryenc ? queryenc : "UTF-8", - wrbuf_cstr(wbquery)); - } - - wrbuf_destroy(wbquery); - } - - odr_reset(global_parameters.odr_out); -} - -void client_init_response(struct client *cl, Z_APDU *a) -{ - Z_InitResponse *r = a->u.initResponse; - - yaz_log(YLOG_DEBUG, "Init response %s", cl->database->database->url); - - if (*r->result) - cl->state = Client_Continue; - else - cl->state = Client_Failed; // FIXME need to do something to the connection + ZOOM_resultset_records(set, 0, offset, 1); + cl->show_raw->active = 1; } +#ifdef RETIRED static void ingest_raw_records(struct client *cl, Z_Records *r) { @@ -609,117 +355,90 @@ static void ingest_raw_records(struct client *cl, Z_Records *r) xmlFree(buf_out); } -static void ingest_records(struct client *cl, Z_Records *r) +#endif // RETIRED show raw + +void client_search_response(struct client *cl) { -#if USE_TIMING - yaz_timing_t t = yaz_timing_create(); -#endif - struct record *rec; - struct session *s = client_get_session(cl); - Z_NamePlusRecordList *rlist; - int i; + struct connection *co = cl->connection; + struct session *se = cl->session; + ZOOM_connection link = connection_get_link(co); + ZOOM_resultset resultset = connection_get_resultset(co); + const char *error, *addinfo; - if (r->which != Z_Records_DBOSD) - return; - rlist = r->u.databaseOrSurDiagnostics; - for (i = 0; i < rlist->num_records; i++) + if (ZOOM_connection_error(link, &error, &addinfo)) { - Z_NamePlusRecord *npr = rlist->records[i]; - - cl->records++; - if (npr->which != Z_NamePlusRecord_databaseRecord) - { - yaz_log(YLOG_WARN, - "Unexpected record type, probably diagnostic %s", - cl->database->database->url); - continue; - } - - rec = ingest_record(cl, npr->u.databaseRecord, cl->records); - if (!rec) - continue; + cl->hits = 0; + cl->state = Client_Error; + yaz_log(YLOG_WARN, "Search error %s (%s): %s", + error, addinfo, client_get_url(cl)); + } + else + { + cl->hits = ZOOM_resultset_size(resultset); + se->total_hits += cl->hits; } - if (rlist->num_records) - session_alert_watch(s, SESSION_WATCH_SHOW); - if (rlist->num_records) - session_alert_watch(s, SESSION_WATCH_RECORD); - -#if USE_TIMING - yaz_timing_stop(t); - yaz_log(YLOG_LOG, "ingest_records %6.5f %3.2f %3.2f", - yaz_timing_get_real(t), yaz_timing_get_user(t), - yaz_timing_get_sys(t)); - yaz_timing_destroy(&t); -#endif } - -void client_search_response(struct client *cl, Z_APDU *a) +void client_record_response(struct client *cl) { - struct session *se = cl->session; - Z_SearchResponse *r = a->u.searchResponse; - - yaz_log(YLOG_DEBUG, "Search response %s (status=%d)", - cl->database->database->url, *r->searchStatus); + struct connection *co = cl->connection; + ZOOM_connection link = connection_get_link(co); + ZOOM_resultset resultset = connection_get_resultset(co); + const char *error, *addinfo; - if (*r->searchStatus) + if (ZOOM_connection_error(link, &error, &addinfo)) { - cl->hits = *r->resultCount; - if (cl->hits < 0) - { - yaz_log(YLOG_WARN, "Target %s returns hit count %d", - cl->database->database->url, cl->hits); - } - else - se->total_hits += cl->hits; - if (r->presentStatus && !*r->presentStatus && r->records) - { - yaz_log(YLOG_DEBUG, "Records in search response %s", - cl->database->database->url); - ingest_records(cl, r->records); - } - cl->state = Client_Continue; + cl->state = Client_Error; + yaz_log(YLOG_WARN, "Search error %s (%s): %s", + error, addinfo, client_get_url(cl)); } else - { /*"FAILED"*/ - Z_Records *recs = r->records; - cl->hits = 0; - cl->state = Client_Error; - if (recs && recs->which == Z_Records_NSD) + { + ZOOM_record rec; + int offset = cl->records; + const char *msg, *addinfo; + + if ((rec = ZOOM_resultset_record(resultset, offset))) { - WRBUF w = wrbuf_alloc(); - - Z_DiagRec dr, *dr_p = &dr; - dr.which = Z_DiagRec_defaultFormat; - dr.u.defaultFormat = recs->u.nonSurrogateDiagnostic; - - wrbuf_printf(w, "Search response NSD %s: ", - cl->database->database->url); - - cl->diagnostic = diag_to_wrbuf(&dr_p, 1, w); - - yaz_log(YLOG_WARN, "%s", wrbuf_cstr(w)); + yaz_log(YLOG_LOG, "Record with offset %d", offset); + cl->records++; + if (ZOOM_record_error(rec, &msg, &addinfo, 0)) + yaz_log(YLOG_WARN, "Record error %s (%s): %s (rec #%d)", + error, addinfo, client_get_url(cl), cl->records); + else + { + struct session_database *sdb = client_get_database(cl); + const char *xmlrec; + char type[128] = "xml"; + const char *nativesyntax = + session_setting_oneval(sdb, PZ_NATIVESYNTAX); + char *cset; + + if (*nativesyntax && (cset = strchr(nativesyntax, ';'))) + sprintf(type, "xml; charset=%s", cset + 1); + + if ((xmlrec = ZOOM_record_get(rec, type, NULL))) + { + if (ingest_record(cl, xmlrec, cl->records)) + { + session_alert_watch(cl->session, SESSION_WATCH_SHOW); + session_alert_watch(cl->session, SESSION_WATCH_RECORD); + } + else + yaz_log(YLOG_WARN, "Failed to ingest"); + } + else + yaz_log(YLOG_WARN, "Failed to extract ZOOM record"); - cl->state = Client_Error; - wrbuf_destroy(w); - } - else if (recs && recs->which == Z_Records_multipleNSD) - { - WRBUF w = wrbuf_alloc(); - - wrbuf_printf(w, "Search response multipleNSD %s: ", - cl->database->database->url); - cl->diagnostic = - diag_to_wrbuf(recs->u.multipleNonSurDiagnostics->diagRecs, - recs->u.multipleNonSurDiagnostics->num_diagRecs, - w); - yaz_log(YLOG_WARN, "%s", wrbuf_cstr(w)); - cl->state = Client_Error; - wrbuf_destroy(w); + } } + else + yaz_log(YLOG_WARN, "Expected record, but got NULL"); } } +#ifdef RETIRED + void client_present_response(struct client *cl, Z_APDU *a) { Z_PresentResponse *r = a->u.presentResponse; @@ -794,6 +513,9 @@ void client_close_response(struct client *cl, Z_APDU *a) connection_destroy(co); } +#endif // RETIRED show raw + +#ifdef RETIRED int client_is_our_response(struct client *cl) { struct session *se = client_get_session(cl); @@ -803,92 +525,53 @@ int client_is_our_response(struct client *cl) return 1; return 0; } +#endif -// Set authentication token in init if one is set for the client -// TODO: Extend this to handle other schemes than open (should be simple) -static void init_authentication(struct client *cl, Z_InitRequest *req) +void client_start_search(struct client *cl) { struct session_database *sdb = client_get_database(cl); - const char *auth = session_setting_oneval(sdb, PZ_AUTHENTICATION); - - if (*auth) + struct connection *co = client_get_connection(cl); + ZOOM_connection link = connection_get_link(co); + ZOOM_resultset rs; + char *databaseName = sdb->database->databases[0]; + const char *opt_piggyback = session_setting_oneval(sdb, PZ_PIGGYBACK); + const char *opt_queryenc = session_setting_oneval(sdb, PZ_QUERYENCODING); + const char *opt_elements = session_setting_oneval(sdb, PZ_ELEMENTS); + const char *opt_requestsyn = session_setting_oneval(sdb, PZ_REQUESTSYNTAX); + const char *opt_maxrecs = session_setting_oneval(sdb, PZ_MAXRECS); + + assert(link); + + cl->hits = -1; + cl->records = 0; + cl->diagnostic = 0; + + if (*opt_piggyback) + ZOOM_connection_option_set(link, "piggyback", opt_piggyback); + else + ZOOM_connection_option_set(link, "piggyback", "1"); + if (*opt_queryenc) + ZOOM_connection_option_set(link, "rpnCharset", opt_queryenc); + if (*opt_elements) + ZOOM_connection_option_set(link, "elementSetName", opt_elements); + if (*opt_requestsyn) + ZOOM_connection_option_set(link, "preferredRecordSyntax", opt_requestsyn); + if (*opt_maxrecs) + ZOOM_connection_option_set(link, "count", opt_maxrecs); + else { - struct connection *co = client_get_connection(cl); - struct session *se = client_get_session(cl); - Z_IdAuthentication *idAuth = odr_malloc(global_parameters.odr_out, - sizeof(*idAuth)); - idAuth->which = Z_IdAuthentication_open; - idAuth->u.open = odr_strdup(global_parameters.odr_out, auth); - req->idAuthentication = idAuth; - connection_set_authentication(co, nmem_strdup(se->session_nmem, auth)); + char n[128]; + sprintf(n, "%d", global_parameters.toget); + ZOOM_connection_option_set(link, "count", n); } -} + if (!databaseName || !*databaseName) + databaseName = "Default"; + ZOOM_connection_option_set(link, "databaseName", databaseName); -static void init_zproxy(struct client *cl, Z_InitRequest *req) -{ - struct session_database *sdb = client_get_database(cl); - char *ztarget = sdb->database->url; - //char *ztarget = sdb->url; - const char *zproxy = session_setting_oneval(sdb, PZ_ZPROXY); - - if (*zproxy) - yaz_oi_set_string_oid(&req->otherInfo, - global_parameters.odr_out, - yaz_oid_userinfo_proxy, - 1, ztarget); -} - - -static void client_init_request(struct client *cl) -{ - Z_APDU *a = zget_APDU(global_parameters.odr_out, Z_APDU_initRequest); - - a->u.initRequest->implementationId = global_parameters.implementationId; - a->u.initRequest->implementationName = global_parameters.implementationName; - a->u.initRequest->implementationVersion = - global_parameters.implementationVersion; - ODR_MASK_SET(a->u.initRequest->options, Z_Options_search); - ODR_MASK_SET(a->u.initRequest->options, Z_Options_present); - ODR_MASK_SET(a->u.initRequest->options, Z_Options_namedResultSets); - - ODR_MASK_SET(a->u.initRequest->protocolVersion, Z_ProtocolVersion_1); - ODR_MASK_SET(a->u.initRequest->protocolVersion, Z_ProtocolVersion_2); - ODR_MASK_SET(a->u.initRequest->protocolVersion, Z_ProtocolVersion_3); - - init_authentication(cl, a->u.initRequest); - init_zproxy(cl, a->u.initRequest); - - if (send_apdu(cl, a) >= 0) - client_set_state(cl, Client_Initializing); - else - client_set_state(cl, Client_Error); - odr_reset(global_parameters.odr_out); -} + ZOOM_connection_option_set(link, "presentChunk", "20"); -void client_continue(struct client *cl) -{ - if (cl->state == Client_Connected) { - client_init_request(cl); - } - if (cl->state == Client_Continue || cl->state == Client_Idle) - { - struct session *se = client_get_session(cl); - if (cl->requestid != se->requestid && cl->pquery) { - // we'll have to abort this because result set is to be deleted - client_show_raw_cancel(cl); - client_send_search(cl); - } - else if (cl->show_raw) - { - client_send_raw_present(cl); - } - else if (cl->hits > 0 && cl->records < global_parameters.toget && - cl->records < cl->hits) { - client_send_present(cl); - } - else - client_set_state(cl, Client_Idle); - } + rs = ZOOM_connection_search_pqf(link, cl->pquery); + connection_set_resultset(co, rs); } struct client *client_create(void) @@ -1034,10 +717,7 @@ int client_is_active(struct client *cl) { if (cl->connection && (cl->state == Client_Continue || cl->state == Client_Connecting || - cl->state == Client_Connected || - cl->state == Client_Initializing || - cl->state == Client_Searching || - cl->state == Client_Presenting)) + cl->state == Client_Working)) return 1; return 0; } diff --git a/src/client.h b/src/client.h index a410078..3b0c355 100644 --- a/src/client.h +++ b/src/client.h @@ -32,9 +32,7 @@ enum client_state Client_Connecting, Client_Connected, Client_Idle, - Client_Initializing, - Client_Searching, - Client_Presenting, + Client_Working, Client_Error, Client_Failed, Client_Disconnected, @@ -65,8 +63,8 @@ const char *client_get_pquery(struct client *cl); void client_set_requestid(struct client *cl, int id); void client_init_response(struct client *cl, Z_APDU *a); -void client_search_response(struct client *cl, Z_APDU *a); -void client_present_response(struct client *cl, Z_APDU *a); +void client_search_response(struct client *cl); +void client_record_response(struct client *cl); void client_close_response(struct client *cl, Z_APDU *a); int client_is_our_response(struct client *cl); @@ -79,6 +77,7 @@ void client_destroy(struct client *c); void client_set_connection(struct client *cl, struct connection *con); void client_disconnect(struct client *cl); int client_prep_connection(struct client *cl); +void client_start_search(struct client *cl); void client_set_session(struct client *cl, struct session *se); int client_is_active(struct client *cl); struct client *client_next_in_session(struct client *cl); diff --git a/src/connection.c b/src/connection.c index c330155..11de6b2 100644 --- a/src/connection.c +++ b/src/connection.c @@ -65,7 +65,8 @@ typedef int socklen_t; */ struct connection { IOCHAN iochan; - COMSTACK link; + ZOOM_connection link; + ZOOM_resultset resultset; struct host *host; struct client *client; char *ibuf; @@ -75,14 +76,47 @@ struct connection { enum { Conn_Resolving, Conn_Connecting, - Conn_Open, - Conn_Waiting, + Conn_Open } state; struct connection *next; // next for same host or next in free list }; static struct connection *connection_freelist = 0; +static int connection_is_idle(struct connection *co) +{ + ZOOM_connection link = co->link; + int event = ZOOM_connection_peek_event(link); + + if (co->state != Conn_Open) + return 0; + + link = co->link; + event = ZOOM_connection_peek_event(link); + if (event == ZOOM_EVENT_NONE || + event == ZOOM_EVENT_END) + return 1; + else + return 0; +} + +ZOOM_connection connection_get_link(struct connection *co) +{ + return co->link; +} + +ZOOM_resultset connection_get_resultset(struct connection *co) +{ + return co->resultset; +} + +void connection_set_resultset(struct connection *co, ZOOM_resultset rs) +{ + if (co->resultset) + ZOOM_resultset_destroy(co->resultset); + co->resultset = rs; +} + static void remove_connection_from_host(struct connection *con) { struct connection **conp = &con->host->connections; @@ -104,9 +138,11 @@ void connection_destroy(struct connection *co) { if (co->link) { - cs_close(co->link); + ZOOM_connection_destroy(co->link); iochan_destroy(co->iochan); } + if (co->resultset) + ZOOM_resultset_destroy(co->resultset); yaz_log(YLOG_DEBUG, "Connection destroy %s", co->host->hostport); @@ -144,7 +180,8 @@ struct connection *connection_create(struct client *cl) new->zproxy = 0; client_set_connection(cl, new); new->link = 0; - new->state = Conn_Resolving; + new->resultset = 0; + new->state = Conn_Connecting; if (host->ipport) connection_connect(new); return new; @@ -178,108 +215,55 @@ static void connection_handler(IOCHAN i, int event) } return; } - if (co->state == Conn_Connecting && event & EVENT_OUTPUT) - { - int errcode; - socklen_t errlen = sizeof(errcode); - - if (getsockopt(cs_fileno(co->link), SOL_SOCKET, SO_ERROR, (char*) &errcode, - &errlen) < 0 || errcode != 0) - { - client_fatal(cl); - return; - } - else - { - yaz_log(YLOG_DEBUG, "Connect OK"); - co->state = Conn_Open; - if (cl) - client_set_state(cl, Client_Connected); - iochan_settimeout(i, global_parameters.z3950_session_timeout); - } - } - - else if (event & EVENT_INPUT) + else { - int len = cs_get(co->link, &co->ibuf, &co->ibufsize); - - if (len < 0) - { - yaz_log(YLOG_WARN|YLOG_ERRNO, "Error reading from %s", - client_get_url(cl)); - connection_destroy(co); - return; - } - else if (len == 0) - { - yaz_log(YLOG_WARN, "EOF reading from %s", client_get_url(cl)); - connection_destroy(co); - return; - } - else if (len > 1) // We discard input if we have no connection - { - co->state = Conn_Open; - - if (client_is_our_response(cl)) - { - Z_APDU *a; - struct session_database *sdb = client_get_database(cl); - const char *apdulog = session_setting_oneval(sdb, PZ_APDULOG); - - odr_reset(global_parameters.odr_in); - odr_setbuf(global_parameters.odr_in, co->ibuf, len, 0); - if (!z_APDU(global_parameters.odr_in, &a, 0, 0)) - { - client_fatal(cl); - return; - } + ZOOM_connection link = co->link; - if (apdulog && *apdulog && *apdulog != '0') - { - ODR p = odr_createmem(ODR_PRINT); - yaz_log(YLOG_LOG, "recv APDU %s", client_get_url(cl)); - - odr_setprint(p, yaz_log_file()); - z_APDU(p, &a, 0, 0); - odr_setprint(p, stderr); - odr_destroy(p); - } - switch (a->which) + if (ZOOM_event(1, &link)) + { + do { + int event = ZOOM_connection_last_event(link); + switch (event) { - case Z_APDU_initResponse: - client_init_response(cl, a); + case ZOOM_EVENT_END: + break; + case ZOOM_EVENT_SEND_DATA: + break; + case ZOOM_EVENT_RECV_DATA: break; - case Z_APDU_searchResponse: - client_search_response(cl, a); + case ZOOM_EVENT_UNKNOWN: break; - case Z_APDU_presentResponse: - client_present_response(cl, a); + case ZOOM_EVENT_SEND_APDU: + client_set_state(co->client, Client_Working); break; - case Z_APDU_close: - client_close_response(cl, a); + case ZOOM_EVENT_RECV_APDU: + client_set_state(co->client, Client_Idle); + break; + case ZOOM_EVENT_CONNECT: + yaz_log(YLOG_LOG, "Connected to %s", client_get_url(cl)); + co->state = Conn_Open; + client_set_state(co->client, Client_Connected); + iochan_settimeout(i, global_parameters.z3950_session_timeout); + break; + case ZOOM_EVENT_RECV_SEARCH: + yaz_log(YLOG_LOG, "Search response from %s", client_get_url(cl)); + client_search_response(cl); + break; + case ZOOM_EVENT_RECV_RECORD: + yaz_log(YLOG_LOG, "Record from %s", client_get_url(cl)); + client_record_response(cl); break; default: - yaz_log(YLOG_WARN, - "Unexpected Z39.50 response from %s", - client_get_url(cl)); - client_fatal(cl); - return; + yaz_log(YLOG_LOG, "Unhandled event (%d) from %s", + event, client_get_url(cl)); } - // We aren't expecting staggered output from target - // if (cs_more(t->link)) - // iochan_setevent(i, EVENT_INPUT); - } - else // we throw away response and go to idle mode - { - yaz_log(YLOG_DEBUG, "Ignoring result of expired operation"); - client_set_state(cl, Client_Continue); } - } - /* if len==1 we do nothing but wait for more input */ + while(ZOOM_event_nonblock(1, &link)); + } } - client_continue(cl); } + // Disassociate connection from client void connection_release(struct connection *co) { @@ -326,45 +310,39 @@ void connect_resolver_host(struct host *host) } } -int connection_send_apdu(struct connection *co, Z_APDU *a) +struct host *connection_get_host(struct connection *con) { - char *buf; - int len, r; + return con->host; +} - if (!z_APDU(global_parameters.odr_out, &a, 0, 0)) - { - odr_perror(global_parameters.odr_out, "Encoding APDU"); - abort(); - } - buf = odr_getbuf(global_parameters.odr_out, &len, 0); - r = cs_put(co->link, buf, len); - if (r < 0) - { - yaz_log(YLOG_WARN, "cs_put: %s", cs_errmsg(cs_errno(co->link))); +// Callback for use by event loop +// We do this because ZOOM connections don't always have (the same) sockets +static int socketfun(IOCHAN c) +{ + struct connection *co = iochan_getdata(c); + if (!co->link) return -1; - } - else if (r == 1) - { - fprintf(stderr, "cs_put incomplete (ParaZ does not handle that)\n"); - return -1;; - } - odr_reset(global_parameters.odr_out); /* release the APDU structure */ - co->state = Conn_Waiting; - iochan_setflags(co->iochan, EVENT_INPUT); - return 0; + return ZOOM_connection_get_socket(co->link); } -struct host *connection_get_host(struct connection *con) +// Because ZOOM always knows what events it is interested in; we may not +static int maskfun(IOCHAN c) { - return con->host; + struct connection *co = iochan_getdata(c); + if (!co->link) + return 0; + + // This is cheating a little, and assuming that eventl mask IDs are always + // the same as ZOOM-C's. + return ZOOM_connection_get_mask(co->link); } int connection_connect(struct connection *con) { - COMSTACK link = 0; + ZOOM_connection link = 0; struct host *host = connection_get_host(con); - void *addr; - int res; + ZOOM_options zoptions = ZOOM_options_create(); + char *auth; struct session_database *sdb = client_get_database(con->client); const char *zproxy = session_setting_oneval(sdb, PZ_ZPROXY); @@ -372,58 +350,43 @@ int connection_connect(struct connection *con) assert(host->ipport); assert(con); - if (!(link = cs_create(tcpip_type, 0, PROTO_Z3950))) - { - yaz_log(YLOG_FATAL|YLOG_ERRNO, "Failed to create comstack"); - return -1; - } + ZOOM_options_set(zoptions, "async", "1"); + ZOOM_options_set(zoptions, "implementationName", + global_parameters.implementationName); + ZOOM_options_set(zoptions, "implementationVersion", + global_parameters.implementationVersion); if (zproxy && *zproxy) - con->zproxy = xstrdup(zproxy); - - if (!con->zproxy) { - /* no Z39.50 proxy needed - direct connect */ - yaz_log(YLOG_DEBUG, "Connection create %s", connection_get_url(con)); - - if (!(addr = cs_straddr(link, host->ipport))) - { - yaz_log(YLOG_WARN|YLOG_ERRNO, - "Lookup of IP address %s failed", host->ipport); - return -1; - } - - } else { - /* Z39.50 proxy connect */ - yaz_log(YLOG_DEBUG, "Connection create %s proxy %s", - connection_get_url(con), con->zproxy); - - if (!(addr = cs_straddr(link, con->zproxy))) - { - yaz_log(YLOG_WARN|YLOG_ERRNO, - "Lookup of ZProxy IP address %s failed", - con->zproxy); - return -1; - } + con->zproxy = xstrdup(zproxy); + ZOOM_options_set(zoptions, "proxy", zproxy); } - - res = cs_connect(link, addr); - if (res < 0) + + if ((auth = (char*) session_setting_oneval(sdb, PZ_AUTHENTICATION))) + ZOOM_options_set(zoptions, "user", auth); + + if (!(link = ZOOM_connection_create(zoptions))) { - yaz_log(YLOG_WARN|YLOG_ERRNO, "cs_connect %s", - connection_get_url(con)); + yaz_log(YLOG_FATAL|YLOG_ERRNO, "Failed to create ZOOM Connection"); + ZOOM_options_destroy(zoptions); return -1; } + ZOOM_connection_connect(link, host->ipport, 0); + con->link = link; + con->iochan = iochan_create(0, connection_handler, 0); con->state = Conn_Connecting; - con->iochan = iochan_create(cs_fileno(link), connection_handler, 0); iochan_settimeout(con->iochan, global_parameters.z3950_connect_timeout); iochan_setdata(con->iochan, con); + iochan_setsocketfun(con->iochan, socketfun); + iochan_setmaskfun(con->iochan, maskfun); pazpar2_add_channel(con->iochan); /* this fragment is bad DRY: from client_prep_connection */ client_set_state(con->client, Client_Connecting); - iochan_setflag(con->iochan, EVENT_OUTPUT); + ZOOM_options_destroy(zoptions); + // This creates the connection + ZOOM_connection_process(link); return 0; } @@ -458,7 +421,7 @@ int client_prep_connection(struct client *cl) // See if someone else has an idle connection // We should look at timestamps here to select the longest-idle connection for (co = host->connections; co; co = co->next) - if (co->state == Conn_Open && + if (connection_is_idle(co) && (!co->client || client_get_session(co->client) != se) && !strcmp(co->authentication, session_setting_oneval(client_get_database(cl), @@ -478,27 +441,18 @@ int client_prep_connection(struct client *cl) else co = connection_create(cl); } + if (co) - { - if (co->state == Conn_Connecting) - { - client_set_state(cl, Client_Connecting); - iochan_setflag(co->iochan, EVENT_OUTPUT); - } - else if (co->state == Conn_Open) - { - if (client_get_state(cl) == Client_Error - || client_get_state(cl) == Client_Disconnected - || client_get_state(cl) == Client_Idle) - client_set_state(cl, Client_Continue); - iochan_setflag(co->iochan, EVENT_OUTPUT); - } return 1; - } else return 0; } +// DELETEME + + +int connection_send_apdu(struct connection *co, Z_APDU *a){return 10;} + /* diff --git a/src/connection.h b/src/connection.h index 0dac593..e0392f3 100644 --- a/src/connection.h +++ b/src/connection.h @@ -23,6 +23,7 @@ Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA #ifndef CONNECTION_H #define CONNECTION_H +#include #include #include "eventl.h" @@ -44,6 +45,9 @@ struct connection *connection_get_available(struct connection *con_list, int connection_prep_connection(struct connection *co, struct session *se); const char *connection_get_url(struct connection *co); void connection_release(struct connection *co); +ZOOM_connection connection_get_link(struct connection *co); +ZOOM_resultset connection_get_resultset(struct connection *co); +void connection_set_resultset(struct connection *co, ZOOM_resultset rs); #endif diff --git a/src/eventl.c b/src/eventl.c index 3b689cd..ad1df4b 100644 --- a/src/eventl.c +++ b/src/eventl.c @@ -65,6 +65,8 @@ IOCHAN iochan_create(int fd, IOC_CALLBACK cb, int flags) new_iochan->fd = fd; new_iochan->flags = flags; new_iochan->fun = cb; + new_iochan->socketfun = NULL; + new_iochan->maskfun = NULL; new_iochan->force_event = 0; new_iochan->last_event = new_iochan->max_idle = 0; new_iochan->next = NULL; @@ -90,6 +92,10 @@ int event_loop(IOCHAN *iochans) max = 0; for (p = *iochans; p; p = p->next) { + if (p->maskfun) + p->flags = (*p->maskfun)(p); + if (p->socketfun) + p->fd = (*p->socketfun)(p); if (p->fd < 0) continue; if (p->force_event) @@ -133,12 +139,14 @@ int event_loop(IOCHAN *iochans) force_event == EVENT_INPUT)) { p->last_event = now; + yaz_log(YLOG_DEBUG, "Eventl input event"); (*p->fun)(p, EVENT_INPUT); } if (!p->destroyed && (FD_ISSET(p->fd, &out) || force_event == EVENT_OUTPUT)) { p->last_event = now; + yaz_log(YLOG_DEBUG, "Eventl output event"); (*p->fun)(p, EVENT_OUTPUT); } if (!p->destroyed && (FD_ISSET(p->fd, &except) || diff --git a/src/eventl.h b/src/eventl.h index 9bf2ca5..997b2d9 100644 --- a/src/eventl.h +++ b/src/eventl.h @@ -25,6 +25,8 @@ Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA struct iochan; typedef void (*IOC_CALLBACK)(struct iochan *i, int event); +typedef int (*IOC_SOCKETFUN)(struct iochan *i); +typedef int (*IOC_MASKFUN)(struct iochan *i); typedef struct iochan { @@ -37,6 +39,8 @@ typedef struct iochan #define EVENT_WORK 0x10 int force_event; IOC_CALLBACK fun; + IOC_SOCKETFUN socketfun; + IOC_MASKFUN maskfun; void *data; int destroyed; time_t last_event; @@ -61,6 +65,10 @@ typedef struct iochan #define iochan_getnext(i) ((i)->next) #define iochan_settimeout(i, t) ((i)->max_idle = (t), (i)->last_event = time(0)) #define iochan_activity(i) ((i)->last_event = time(0)) +#define iochan_setsocketfun(i, f) ((i)->socketfun = (f)) +#define iochan_getsocketfun(i) ((i)->socketfun) +#define iochan_setmaskfun(i, f) ((i)->maskfun = (f)) +#define iochan_getmaskfun(i) ((i)->maskfun) IOCHAN iochan_create(int fd, IOC_CALLBACK cb, int flags); int event_loop(IOCHAN *iochans); diff --git a/src/http_command.c b/src/http_command.c index 1272415..2670b26 100644 --- a/src/http_command.c +++ b/src/http_command.c @@ -534,8 +534,8 @@ static void show_raw_record_ok_binary(void *data, const char *buf, size_t sz) void show_raw_reset(void *data, struct http_channel *c, void *data2) { - struct client *client = data; - client_show_raw_remove(client, data2); + //struct client *client = data; + //client_show_raw_remove(client, data2); } static void cmd_record_ready(void *data); @@ -829,9 +829,7 @@ static void cmd_stat(struct http_channel *c) wrbuf_printf(c->wrbuf, "%d\n", stat.num_clients); wrbuf_printf(c->wrbuf, "%d\n", stat.num_no_connection); wrbuf_printf(c->wrbuf, "%d\n", stat.num_connecting); - wrbuf_printf(c->wrbuf, "%d\n", stat.num_initializing); - wrbuf_printf(c->wrbuf, "%d\n", stat.num_searching); - wrbuf_printf(c->wrbuf, "%d\n", stat.num_presenting); + wrbuf_printf(c->wrbuf, "%d\n", stat.num_working); wrbuf_printf(c->wrbuf, "%d\n", stat.num_idle); wrbuf_printf(c->wrbuf, "%d\n", stat.num_failed); wrbuf_printf(c->wrbuf, "%d\n", stat.num_error); diff --git a/src/logic.c b/src/logic.c index 8b15df5..c268a01 100644 --- a/src/logic.c +++ b/src/logic.c @@ -158,109 +158,20 @@ static void add_facet(struct session *s, const char *type, const char *value) termlist_insert(s->termlists[i].termlist, value); } -xmlDoc *record_to_xml(struct session_database *sdb, Z_External *rec) +xmlDoc *record_to_xml(struct session_database *sdb, const char *rec) { struct database *db = sdb->database; xmlDoc *rdoc = 0; - const Odr_oid *oid = rec->direct_reference; - /* convert response record to XML somehow */ - if (rec->which == Z_External_octet && oid - && !oid_oidcmp(oid, yaz_oid_recsyn_xml)) - { - /* xml already */ - rdoc = xmlParseMemory((char*) rec->u.octet_aligned->buf, - rec->u.octet_aligned->len); - if (!rdoc) - { - yaz_log(YLOG_FATAL, "Non-wellformed XML received from %s", - db->url); - return 0; - } - } - else if (rec->which == Z_External_OPAC) - { - if (!sdb->yaz_marc) - { - yaz_log(YLOG_WARN, "MARC decoding not configured"); - return 0; - } - else - { - /* OPAC gets converted to XML too */ - WRBUF wrbuf_opac = wrbuf_alloc(); - /* MARCXML inside the OPAC XML. Charset is in effect because we - use the yaz_marc handle */ - yaz_marc_xml(sdb->yaz_marc, YAZ_MARC_MARCXML); - yaz_opac_decode_wrbuf(sdb->yaz_marc, rec->u.opac, wrbuf_opac); - - rdoc = xmlParseMemory((char*) wrbuf_buf(wrbuf_opac), - wrbuf_len(wrbuf_opac)); - if (!rdoc) - { - yaz_log(YLOG_WARN, "Unable to parse OPAC XML"); - /* Was used to debug bug #1348 */ -#if 0 - FILE *f = fopen("/tmp/opac.xml.txt", "wb"); - if (f) - { - fwrite(wrbuf_buf(wrbuf_opac), 1, wrbuf_len(wrbuf_opac), f); - fclose(f); - } -#endif - } - wrbuf_destroy(wrbuf_opac); - } - } - else if (oid && yaz_oid_is_iso2709(oid)) - { - /* ISO2709 gets converted to MARCXML */ - if (!sdb->yaz_marc) - { - yaz_log(YLOG_WARN, "MARC decoding not configured"); - return 0; - } - else - { - xmlNode *res; - char *buf; - int len; - - if (rec->which != Z_External_octet) - { - yaz_log(YLOG_WARN, "Unexpected external branch, probably BER %s", - db->url); - return 0; - } - buf = (char*) rec->u.octet_aligned->buf; - len = rec->u.octet_aligned->len; - if (yaz_marc_read_iso2709(sdb->yaz_marc, buf, len) < 0) - { - yaz_log(YLOG_WARN, "Failed to decode MARC %s", db->url); - return 0; - } - - if (yaz_marc_write_xml(sdb->yaz_marc, &res, - "http://www.loc.gov/MARC21/slim", 0, 0) < 0) - { - yaz_log(YLOG_WARN, "Failed to encode as XML %s", - db->url); - return 0; - } - rdoc = xmlNewDoc((xmlChar *) "1.0"); - xmlDocSetRootElement(rdoc, res); - } - } - else + rdoc = xmlParseMemory(rec, strlen(rec)); + + if (!rdoc) { - char oid_name_buf[OID_STR_MAX]; - const char *oid_name = yaz_oid_to_string_buf(oid, 0, oid_name_buf); - yaz_log(YLOG_FATAL, - "Unable to handle record of type %s from %s", - oid_name, db->url); + yaz_log(YLOG_FATAL, "Non-wellformed XML received from %s", + db->url); return 0; } - + if (global_parameters.dump_records) { FILE *lf = yaz_log_file(); @@ -275,6 +186,7 @@ xmlDoc *record_to_xml(struct session_database *sdb, Z_External *rec) fprintf(lf, "\n"); } } + return rdoc; } @@ -343,7 +255,7 @@ static void insert_settings_values(struct session_database *sdb, xmlDoc *doc) } xmlDoc *normalize_record(struct session_database *sdb, struct session *se, - Z_External *rec) + const char *rec) { struct database_retrievalmap *m; xmlDoc *rdoc = record_to_xml(sdb, rec); @@ -694,7 +606,8 @@ enum pazpar2_error_code search(struct session *se, else { no_working++; - client_prep_connection(cl); + if (client_prep_connection(cl)) + client_start_search(cl); } } @@ -1000,9 +913,7 @@ void statistics(struct session *se, struct statistics *stat) switch (client_get_state(cl)) { case Client_Connecting: stat->num_connecting++; break; - case Client_Initializing: stat->num_initializing++; break; - case Client_Searching: stat->num_searching++; break; - case Client_Presenting: stat->num_presenting++; break; + case Client_Working: stat->num_working++; break; case Client_Idle: stat->num_idle++; break; case Client_Failed: stat->num_failed++; break; case Client_Error: stat->num_error++; break; @@ -1103,7 +1014,7 @@ static struct record_metadata *record_metadata_init( return rec_md; } -struct record *ingest_record(struct client *cl, Z_External *rec, +struct record *ingest_record(struct client *cl, const char *rec, int record_no) { xmlDoc *xdoc = normalize_record(client_get_database(cl), diff --git a/src/pazpar2.h b/src/pazpar2.h index 72310aa..c4989d0 100644 --- a/src/pazpar2.h +++ b/src/pazpar2.h @@ -144,9 +144,7 @@ struct statistics { int num_clients; int num_no_connection; int num_connecting; - int num_initializing; - int num_searching; - int num_presenting; + int num_working; int num_idle; int num_failed; int num_error; @@ -191,10 +189,10 @@ void pazpar2_event_loop(void); int host_getaddrinfo(struct host *host); xmlDoc *normalize_record(struct session_database *sdb, struct session *se, - Z_External *rec); -xmlDoc *record_to_xml(struct session_database *sdb, Z_External *rec); + const char *rec); +xmlDoc *record_to_xml(struct session_database *sdb, const char *rec); -struct record *ingest_record(struct client *cl, Z_External *rec, +struct record *ingest_record(struct client *cl, const char *rec, int record_no); void session_alert_watch(struct session *s, int what); void pull_terms(NMEM nmem, struct ccl_rpn_node *n, char **termlist, int *num); diff --git a/test/test_http_2.res b/test/test_http_2.res index 48f18ee..27822f6 100644 --- a/test/test_http_2.res +++ b/test/test_http_2.res @@ -4,9 +4,7 @@ 0 0 0 -0 -0 -0 +0 0 0 0