Separate export method from response method and adopt progress calculation for sse
Change-Id: I0d2e6d206a1b19475905528c2f8400dd25c418d7
diff --git a/plugin/src/main/java/de/ids_mannheim/korap/plkexport/Exporter.java b/plugin/src/main/java/de/ids_mannheim/korap/plkexport/Exporter.java
index 72e4b0e..ca11226 100644
--- a/plugin/src/main/java/de/ids_mannheim/korap/plkexport/Exporter.java
+++ b/plugin/src/main/java/de/ids_mannheim/korap/plkexport/Exporter.java
@@ -5,6 +5,10 @@
import java.io.IOException;
import java.io.Writer;
+import javax.ws.rs.sse.Sse;
+import javax.ws.rs.sse.SseEventSink;
+
+
interface Exporter {
// Implemented by MatchAggregator
@@ -24,10 +28,10 @@
public void setCorpusQueryString (String s);
public String getSource ();
public void setSource (String h, String p);
-
public int getTotalResults ();
public boolean hasTimeExceeded ();
public void setMaxResults (int m);
+ public void setSse (SseEventSink sink, Sse sse);
// Implemented by Exporter
public ResponseBuilder serve();
diff --git a/plugin/src/main/java/de/ids_mannheim/korap/plkexport/MatchAggregator.java b/plugin/src/main/java/de/ids_mannheim/korap/plkexport/MatchAggregator.java
index 41b7b47..c29a7c2 100644
--- a/plugin/src/main/java/de/ids_mannheim/korap/plkexport/MatchAggregator.java
+++ b/plugin/src/main/java/de/ids_mannheim/korap/plkexport/MatchAggregator.java
@@ -158,15 +158,11 @@
// Send the progress
private void sendProgress () {
- if (this.sink == null)
+ if (this.sink == null || this.maxResults == 0)
return;
- double calc = Math.ceil(
- (
- (float) this.fetchedResults / (float) this.maxResults
- ) * 100
- );
- this.sink.send(this.sse.newEvent("progress", String.valueOf(calc)));
+ int calc = (int) Math.ceil(((double) this.fetchedResults / this.maxResults) * 100);
+ this.sink.send(this.sse.newEvent("Progress", String.valueOf(calc)));
};
/**
diff --git a/plugin/src/main/java/de/ids_mannheim/korap/plkexport/Service.java b/plugin/src/main/java/de/ids_mannheim/korap/plkexport/Service.java
index e65572c..eb644b1 100644
--- a/plugin/src/main/java/de/ids_mannheim/korap/plkexport/Service.java
+++ b/plugin/src/main/java/de/ids_mannheim/korap/plkexport/Service.java
@@ -22,6 +22,7 @@
import javax.ws.rs.BadRequestException;
import javax.ws.rs.WebApplicationException;
import javax.ws.rs.FormParam;
+import javax.ws.rs.QueryParam;
import javax.ws.rs.POST;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
@@ -89,40 +90,17 @@
@Context
private HttpServletRequest req;
- /**
- * WebService calls Kustvakt Search Webservices and returns
- * response as json (all of the response) and
- * as rtf (matches)
- *
- * @param fname
- * file name
- * @param format
- * the file format value rtf or json.
- * @param q
- * the query
- * @param ql
- * the query language
- * @param cutoff
- * Export more than the first page
- *
- *
- */
- @POST
- @Path("export")
- @Produces(MediaType.APPLICATION_OCTET_STREAM)
- public Response staticExport (
- @FormParam("fname") String fname,
- @FormParam("format") String format,
- @FormParam("q") String q,
- @FormParam("cq") String cq,
- @FormParam("ql") String ql,
- @FormParam("cutoff") String cutoffStr,
- @FormParam("hitc") int hitc
- // @FormParam("islimit") String il
- ) throws IOException {
-
- // Exporter exp =
- // return progressExport(fname, format, q, cq, ql, cutoffStr, hitc, null, null);
+ // Private method to run the export,
+ // either static or streaming
+ private Exporter export (String fname,
+ String format,
+ String q,
+ String cq,
+ String ql,
+ String cutoffStr,
+ int hitc,
+ SseEventSink sink,
+ Sse sse) throws WebApplicationException {
// These parameters are required
String[][] params = {
@@ -134,12 +112,11 @@
// Check that all parameters are available
for (int i = 0; i < params.length; i++) {
if (params[i][1] == null || params[i][1].trim().isEmpty())
- throw new BadRequestException(
- Response
- .status(Status.BAD_REQUEST)
- .entity("Parameter " + "\""
- + params[i][0] + "\"" + " is missing or empty")
- .build());
+ throw new WebApplicationException(
+ responseForm(Status.BAD_REQUEST,
+ "Parameter " + "\""
+ + params[i][0] + "\"" +
+ " is missing or empty"));
};
// Retrieve cutoff value
@@ -151,9 +128,6 @@
cutoff = true;
};
- ResponseBuilder builder = null;
- Client client = ClientBuilder.newClient();
-
// Load configuration values
String scheme = prop.getProperty("api.scheme", "https");
String port = prop.getProperty("api.port", "8089");
@@ -169,7 +143,10 @@
// If less than pageSize results are requested - dont't fetch more
if (maxResults < pageSize)
pageSize = maxResults;
-
+
+ ResponseBuilder builder = null;
+ Client client = ClientBuilder.newClient();
+
// Create initial search uri
UriBuilder uri = UriBuilder.fromPath("/api/v1.0/search")
.host(host)
@@ -236,10 +213,15 @@
exp.setFileName(fname);
};
- // if (sse != null && sink != null) {
- // exp.setSSE(sse, sink);
- // };
+ // set progress mechanism, if required
+ if (sse != null && sink != null)
+ exp.setSse(sink, sse);
+ // TODO:
+ // The following could be subsumed in the MatchAggregator
+ // as a "run()" routine.
+
+
// Initialize exporter (with meta data and first matches)
try {
exp.init(resp);
@@ -258,12 +240,10 @@
int fetchCount = exp.getTotalResults();
if (exp.hasTimeExceeded() || fetchCount > maxResults) {
fetchCount = maxResults;
- };
- // TODO:
- // else {
- // setMaxResults()???
- // }
+ }
+ // fetchCount may be different to maxResults now, so reset after init
+ exp.setMaxResults(fetchCount);
// The first page was already enough - ignore paging
if (fetchCount <= pageSize) {
@@ -273,10 +253,7 @@
// If only one page should be exported there is no need
// for a temporary export file
if (cutoff) {
-
- // TODO:
- // Add method serveAndBuild()
- return exp.serve().build();
+ return exp;
};
// Page through all results
@@ -307,34 +284,96 @@
)
);
};
+
+ return exp;
+ };
+
+
+ /**
+ * WebService calls Kustvakt Search Webservices and returns
+ * response as json (all of the response) and
+ * as rtf (matches)
+ *
+ * @param fname
+ * file name
+ * @param format
+ * the file format value rtf or json.
+ * @param q
+ * the query
+ * @param ql
+ * the query language
+ * @param cutoff
+ * Export more than the first page
+ *
+ *
+ */
+ @POST
+ @Path("export")
+ @Produces(MediaType.APPLICATION_OCTET_STREAM)
+ public Response staticExport (
+ @FormParam("fname") String fname,
+ @FormParam("format") String format,
+ @FormParam("q") String q,
+ @FormParam("cq") String cq,
+ @FormParam("ql") String ql,
+ @FormParam("cutoff") String cutoffStr,
+ @FormParam("hitc") int hitc
+ // @FormParam("islimit") String il
+ ) throws IOException {
+
+ Exporter exp = export(fname, format, q, cq, ql, cutoffStr, hitc, null, null);
return exp.serve().build();
};
+
-
+ /**
+ * Progress based counterpart to staticExport,
+ * that requires a GET due to the JavaScript API.
+ */
@GET
@Path("export")
@Produces("text/event-stream")
- public void progressExport(@Context SseEventSink sseEventSink,
- @Context Sse sse) throws InterruptedException {
-
- // Exporter exp =
- // progressExport(fname, format, q, cq, ql, cutoffStr, hitc, null, null);
+ public void progressExport(
+ @Context SseEventSink sink,
+ @Context Sse sse,
+ @QueryParam("fname") String fname,
+ @QueryParam("format") String format,
+ @QueryParam("q") String q,
+ @QueryParam("cq") String cq,
+ @QueryParam("ql") String ql,
+ @QueryParam("cutoff") String cutoffStr,
+ @QueryParam("hitc") int hitc
+ ) throws InterruptedException {
// https://www.baeldung.com/java-ee-jax-rs-sse
// https://www.howopensource.com/2016/01/java-sse-chat-example/
// https://csetutorials.com/jersey-sse-tutorial.html
- sseEventSink.send(sse.newEvent("Init", "Start"));
- int x = 0;
- while (x < 100) {
- x++;
- sseEventSink.send(sse.newEvent("Progress", ""+x));
+ // Send initial event
+ sink.send(sse.newEvent("Process", "Init"));
+
+ try {
+ Exporter exp = export(
+ fname,
+ format,
+ q,
+ cq,
+ ql,
+ cutoffStr,
+ hitc,
+ sink,
+ sse
+ );
+ sink.send(sse.newEvent("Relocate", "..."));
+ }
+ catch (WebApplicationException wae) {
+ sink.send(sse.newEvent("Error",wae.getMessage()));
};
- sseEventSink.send(sse.newEvent("Relocate", "location"));
-
- sseEventSink.close();
+ sink.send(sse.newEvent("Process", "Done"));
+
+ sink.close();
};
diff --git a/plugin/src/test/java/de/ids_mannheim/korap/plkexport/ServiceTest.java b/plugin/src/test/java/de/ids_mannheim/korap/plkexport/ServiceTest.java
index be59896..49311e0 100644
--- a/plugin/src/test/java/de/ids_mannheim/korap/plkexport/ServiceTest.java
+++ b/plugin/src/test/java/de/ids_mannheim/korap/plkexport/ServiceTest.java
@@ -785,19 +785,11 @@
};
@Test
- public void testExportWsProgress () throws InterruptedException {
- // Based on https://stackoverflow.com/questions/35499655/
- // how-to-test-server-sent-events-with-spring
- // &
- // https://github.com/jersey/jersey/blob/master/examples/
- // server-sent-events-jersey/src/test/java/org/glassfish/
- // jersey/examples/sse/jersey/ServerSentEventsTest.java
-
+ public void testExportWsProgressError () throws InterruptedException {
final LinkedList<String> events = new LinkedList<>();
- final int eventCount = 102;
+ final int eventCount = 3;
-
- // Expect 102 messages:
+ // Expect messages:
final CountDownLatch latch = new CountDownLatch(eventCount);
// Create SSE client
@@ -820,16 +812,92 @@
eventSource.open();
latch.await(1000, TimeUnit.SECONDS);
+ Thread.sleep(2000);
- Thread.sleep(1000);
-
- assertEquals(events.getFirst(), "Init:Start");
- assertEquals(events.getLast(), "Relocate:location");
- assertEquals(events.size(), eventCount);
-
+ // Check error
+ assertEquals(events.getFirst(), "Process:Init");
+ assertEquals(events.get(1), "Error:HTTP 400 Bad Request");
+ assertEquals(events.getLast(), "Process:Done");
+ assertEquals(events.size(), 3);
eventSource.close();
};
+
+ @Test
+ public void testExportWsProgress () throws InterruptedException {
+ mockClient.reset().when(
+ request()
+ .withMethod("GET")
+ .withPath("/api/v1.0/search")
+ .withQueryStringParameter("q", "Plagegeist")
+ .withQueryStringParameter("count", "5")
+ .withQueryStringParameter("offset", "5")
+ )
+ .respond(
+ response()
+ .withHeader("Content-Type: application/json; charset=utf-8")
+ .withBody(getFixture("response_plagegeist_2.json"))
+ .withStatusCode(200)
+ );
+
+ mockClient.when(
+ request()
+ .withMethod("GET")
+ .withPath("/api/v1.0/search")
+ .withQueryStringParameter("q", "Plagegeist")
+ )
+ .respond(
+ response()
+ .withHeader("Content-Type: application/json; charset=utf-8")
+ .withBody(getFixture("response_plagegeist_1.json"))
+ .withStatusCode(200)
+ );
+
+ // Based on https://stackoverflow.com/questions/35499655/
+ // how-to-test-server-sent-events-with-spring
+ // &
+ // https://github.com/jersey/jersey/blob/master/examples/
+ // server-sent-events-jersey/src/test/java/org/glassfish/
+ // jersey/examples/sse/jersey/ServerSentEventsTest.java
+
+ final LinkedList<String> events = new LinkedList<>();
+
+ // Create SSE client
+ Client client = ClientBuilder
+ .newBuilder()
+ .register(SseFeature.class)
+ .build();
+
+ EventSource eventSource = EventSource
+ .target(target("/export")
+ .queryParam("q", "Plagegeist")
+ .queryParam("ql","poliqarp")
+ .queryParam("format","rtf"))
+ .reconnectingEvery(300, TimeUnit.SECONDS)
+ .build();
+
+ EventListener listener = inboundEvent -> {
+ events.add(inboundEvent.getName() + ":" + inboundEvent.readData(String.class));
+ };
+
+ eventSource.register(listener);
+ eventSource.open();
+
+ Thread.sleep(3000);
+
+ // Check error
+ assertEquals(events.getFirst(), "Process:Init");
+ assertEquals(events.get(1), "Progress:0");
+ assertEquals(events.get(2), "Progress:56");
+ assertEquals(events.get(3), "Relocate:...");
+ assertEquals(events.getLast(), "Process:Done");
+ assertEquals(events.size(), 5);
+ eventSource.close();
+ };
+
+
+
+
@Test
public void testClientIP () {
assertEquals(getClientIP("10.0.4.6"), "10.0.4.6");