--- /dev/null
+/*
+ * Copyright (c) 1995-2015, Index Data
+ * All rights reserved.
+ * See the file LICENSE for details.
+ */
+package org.yaz4j;
+
+import org.yaz4j.exception.ZoomException;
+import static org.yaz4j.jni.yaz4jlib.*;
+
+/**
+ *
+ * @author jakub
+ */
+public class AsyncConnection extends Connection {
+ private ResultSet lastResultSet;
+ ErrorHandler eh;
+ //make sure error is only handled once
+ boolean errorHandled = false;
+ ErrorHandler reh;
+ SearchHandler sh;
+ RecordHandler rh;
+
+ public interface SearchHandler {
+ public void handle(ResultSet rs);
+ }
+
+ public interface RecordHandler {
+ public void handle(Record r);
+ }
+
+ public interface ErrorHandler {
+ public void handle(ZoomException e);
+ }
+
+ public AsyncConnection(String host, int port) {
+ super(host, port);
+ ZOOM_connection_option_set(zoomConnection, "async", "1");
+ closed = false;
+ }
+
+ @Override
+ public ResultSet search(Query query) throws ZoomException {
+ errorHandled = false;
+ lastResultSet = super.search(query);
+ return null;
+ }
+
+ public AsyncConnection onSearch(SearchHandler sh) {
+ this.sh = sh;
+ return this;
+ }
+
+ public AsyncConnection onRecord(RecordHandler rh) {
+ this.rh = rh;
+ return this;
+ }
+
+ public AsyncConnection onError(ErrorHandler eh) {
+ this.eh = eh;
+ return this;
+ }
+
+ public AsyncConnection onRecordError(ErrorHandler reh) {
+ this.reh = reh;
+ return this;
+ }
+
+ //actuall handler, pkg-private
+
+ void handleSearch() {
+ handleError();
+ //handle search
+ if (sh != null) sh.handle(lastResultSet);
+ }
+
+ void handleRecord() {
+ //TODO clone the record to detach it from the result set
+ try {
+ if (rh != null) rh.handle(lastResultSet.getRecord(lastResultSet.asyncRecordOffset));
+ } catch (ZoomException ex) {
+ if (reh != null) reh.handle(ex);
+ } finally {
+ lastResultSet.asyncRecordOffset++;
+ }
+ }
+
+ void handleError() {
+ //handle error
+ if (!errorHandled) {
+ ZoomException err = ExceptionUtil.getError(zoomConnection, host, port);
+ if (err != null) {
+ if (eh != null) {
+ eh.handle(err);
+ errorHandled = true;
+ }
+ }
+ }
+ }
+
+}
--- /dev/null
+/*
+ * Copyright (c) 1995-2015, Index Data
+ * All rights reserved.
+ * See the file LICENSE for details.
+ */
+package org.yaz4j;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.yaz4j.jni.SWIGTYPE_p_p_ZOOM_connection_p;
+import static org.yaz4j.jni.yaz4jlib.*;
+import static java.lang.System.out;
+
+/**
+ *
+ * @author jakub
+ */
+public class AsyncConnections {
+ private List<AsyncConnection> conns = new ArrayList<AsyncConnection>();
+
+ public void add(AsyncConnection conn) {
+ conns.add(conn);
+ }
+
+ public List<AsyncConnection> getConnections() {
+ return conns;
+ }
+
+ public void start() {
+ SWIGTYPE_p_p_ZOOM_connection_p c_conns = new_zoomConnectionArray(conns.size());
+ try {
+ for (int i=0; i<conns.size(); i++) {
+ Connection conn = conns.get(i);
+ zoomConnectionArray_setitem(c_conns, i, conn.zoomConnection);
+ }
+ int ret = 0;
+ while ((ret = ZOOM_event(conns.size(), c_conns)) != 0) {
+ int idx = ret - 1;
+ int last = ZOOM_connection_last_event(zoomConnectionArray_getitem(c_conns, idx));
+ AsyncConnection conn = conns.get(idx);
+ String event = ZOOM_get_event_str(last);
+ out.println("Received event " + event + " on connection #"+idx);
+ switch (last) {
+ case ZOOM_EVENT_RECV_SEARCH: conn.handleSearch(); break;
+ case ZOOM_EVENT_RECV_RECORD: conn.handleRecord(); break;
+ //TODO this will make handle error twice
+ case ZOOM_EVENT_END: conn.handleError(); break;
+ //TODO should we simply handle error for any event?
+ }
+ }
+ } finally {
+ delete_zoomConnectionArray(c_conns);
+ }
+ }
+
+}
--- /dev/null
+/*
+ * Copyright (c) 1995-2015, Index Data
+ * All rights reserved.
+ * See the file LICENSE for details.
+ */
+package org.yaz4j;
+
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import static org.junit.Assert.*;
+import org.yaz4j.exception.ZoomException;
+
+import static java.lang.System.out;
+
+/**
+ *
+ * @author jakub
+ */
+public class AsyncConnectionsTest {
+
+ class Box<T> {
+ T item;
+
+ public Box() {
+ }
+
+ public Box(T item) {
+ this.item = item;
+ }
+
+ T getItem() {
+ return item;
+ }
+
+ void setItem(T item) {
+ this.item = item;
+ }
+ }
+
+ public AsyncConnectionsTest() {
+ }
+
+ @BeforeClass
+ public static void setUpClass() {
+ }
+
+ @AfterClass
+ public static void tearDownClass() {
+ }
+
+ @Before
+ public void setUp() {
+ }
+
+ @After
+ public void tearDown() {
+ }
+
+ /**
+ * Test async ZOOM operation.
+ */
+ @Test
+ public void testBadDatabaseTarget() {
+ out.println("Trying bad async connection...");
+ AsyncConnection conn = new AsyncConnection("z3950.indexdata.dk:210/doesnotexist", 0);
+ conn.option("count", "100");
+ AsyncConnections conns = new AsyncConnections();
+ conns.add(conn);
+ final Box<Boolean> hadError = new Box<Boolean>(false);
+ try {
+ conn.setSyntax("sutrs");
+ conn.connect();
+ conn.search(new PrefixQuery("@attr 1=4 utah"));
+ conn
+ .onError(new AsyncConnection.ErrorHandler() {
+
+ public void handle(ZoomException e) {
+ out.println("There was an error "+e.getMessage());
+ hadError.setItem(true);
+ }
+ });
+
+ } catch (ZoomException ex) {
+ fail(ex.getMessage());
+ }
+ conns.start();
+ assertTrue(hadError.item);
+ }
+
+ /**
+ * Test async ZOOM operation.
+ */
+ @Test
+ public void testBadTarget() {
+ out.println("Trying bad target async connection...");
+ AsyncConnection conn = new AsyncConnection("z3950.indexdata.dk:70000/doesnotexist", 0);
+ conn.option("count", "100");
+ AsyncConnections conns = new AsyncConnections();
+ conns.add(conn);
+ final Box<Boolean> hadError = new Box<Boolean>(false);
+ try {
+ conn.setSyntax("sutrs");
+ conn.connect();
+ conn.search(new PrefixQuery("@attr 1=4 utah"));
+ conn
+ .onError(new AsyncConnection.ErrorHandler() {
+
+ public void handle(ZoomException e) {
+ out.println("There was an error "+e.getMessage());
+ hadError.setItem(true);
+ }
+ });
+
+ } catch (ZoomException ex) {
+ fail(ex.getMessage());
+ }
+ conns.start();
+ assertTrue(hadError.item);
+ }
+
+ /**
+ * Test async ZOOM operation.
+ */
+ @Test
+ public void testSingleTarget() {
+ out.println("Trying async connection...");
+ AsyncConnection conn = new AsyncConnection("z3950.indexdata.dk:210/gils", 0);
+ conn.option("count", "100");
+ AsyncConnections conns = new AsyncConnections();
+ conns.add(conn);
+ int expectedHitCount = 9;
+ final Box<Long> actualHitCount = new Box<Long>();
+ final Box<Integer> actualRecordCounter = new Box<Integer>(0);
+ try {
+ conn.setSyntax("sutrs");
+ conn.connect();
+ conn.search(new PrefixQuery("@attr 1=4 utah"));
+ conn
+ .onSearch(new AsyncConnection.SearchHandler() {
+ public void handle(ResultSet rs) {
+ out.println("Received search, hit count "+rs.getHitCount());
+ actualHitCount.setItem(rs.getHitCount());
+ }
+ })
+ .onRecord(new AsyncConnection.RecordHandler() {
+ public void handle(Record r) {
+ out.println("Received a record of type "+r.getSyntax());
+ actualRecordCounter.setItem(actualRecordCounter.getItem()+1);
+ }
+ });
+
+ } catch (ZoomException ex) {
+ fail(ex.getMessage());
+ }
+ conns.start();
+ assertEquals(expectedHitCount, actualHitCount.item);
+ assertEquals(expectedHitCount, actualRecordCounter.item);
+
+ }
+
+
+ /**
+ * Test async ZOOM operation.
+ */
+ @Test
+ public void testMulitTarget() {
+ out.println("Trying async with multile connections...");
+ AsyncConnections conns = new AsyncConnections();
+ AsyncConnection conn = new AsyncConnection("z3950.indexdata.dk:210/gils", 0);
+ conn.option("count", "100");
+ conns.add(conn);
+ AsyncConnection conn2 = new AsyncConnection("z3950.indexdata.dk:210/marc", 0);
+ conn2.option("count", "100");
+ conns.add(conn2);
+ int expectedHitCount = 19; //for both
+ final Box<Long> actualHitCount = new Box<Long>(0L);
+ final Box<Integer> actualRecordCounter = new Box<Integer>(0);
+ try {
+ //we need to simplify the API for multiple
+ conn.setSyntax("sutrs");
+ conn.connect();
+ conn.search(new PrefixQuery("@attr 1=4 utah"));
+ conn
+ .onSearch(new AsyncConnection.SearchHandler() {
+ public void handle(ResultSet rs) {
+ out.println("Received search, hit count "+rs.getHitCount());
+ actualHitCount.setItem(actualHitCount.getItem() + rs.getHitCount());
+ }
+ })
+ .onRecord(new AsyncConnection.RecordHandler() {
+ public void handle(Record r) {
+ out.println("Received a record of type "+r.getSyntax());
+ actualRecordCounter.setItem(actualRecordCounter.getItem()+1);
+ }
+ });
+ conn2.setSyntax("marc21");
+ conn2.connect();
+ conn2.search(new PrefixQuery("@attr 1=4 computer"));
+ conn2
+ .onSearch(new AsyncConnection.SearchHandler() {
+ public void handle(ResultSet rs) {
+ out.println("Received search, hit count "+rs.getHitCount());
+ actualHitCount.setItem(actualHitCount.getItem() + rs.getHitCount());
+ }
+ })
+ .onRecord(new AsyncConnection.RecordHandler() {
+ public void handle(Record r) {
+ out.println("Received a record of type "+r.getSyntax());
+ actualRecordCounter.setItem(actualRecordCounter.getItem()+1);
+ }
+ })
+ .onError(new AsyncConnection.ErrorHandler() {
+
+ public void handle(ZoomException e) {
+ out.println("Caught error: "+e.getMessage());
+ }
+ });
+
+ } catch (ZoomException ex) {
+ fail(ex.getMessage());
+ }
+ conns.start();
+ assertEquals(expectedHitCount, actualHitCount.item);
+ assertEquals(expectedHitCount, actualRecordCounter.item);
+
+ }
+}