Introduce SSE example (not yet functional)
Change-Id: I5d3ddc6058e28e3e9125dfbe2201c1c1667d174e
diff --git a/plugin/pom.xml b/plugin/pom.xml
index fb0de66..3ca25a0 100644
--- a/plugin/pom.xml
+++ b/plugin/pom.xml
@@ -9,6 +9,7 @@
<url>http://maven.apache.org</url>
<properties>
+ <java.version>1.8</java.version>
<jetty.version>9.4.31.v20200723</jetty.version>
<jersey.version>2.27</jersey.version>
</properties>
@@ -96,6 +97,13 @@
<scope>test</scope>
</dependency>
+ <!-- https://mvnrepository.com/artifact/org.glassfish.jersey.media/jersey-media-sse -->
+ <dependency>
+ <groupId>org.glassfish.jersey.media</groupId>
+ <artifactId>jersey-media-sse</artifactId>
+ <version>${jersey.version}</version>
+ </dependency>
+
</dependencies>
<build>
@@ -139,6 +147,18 @@
</execution>
</executions>
</plugin>
+
+ <plugin>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <version>3.8.1</version>
+ <inherited>true</inherited>
+ <configuration>
+ <showWarnings>true</showWarnings>
+ <source>${java.version}</source>
+ <target>${java.version}</target>
+ </configuration>
+ </plugin>
+
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-resources-plugin</artifactId>
diff --git a/plugin/src/main/java/de/ids_mannheim/korap/plkexport/MatchAggregator.java b/plugin/src/main/java/de/ids_mannheim/korap/plkexport/MatchAggregator.java
index c7b447f..41b7b47 100644
--- a/plugin/src/main/java/de/ids_mannheim/korap/plkexport/MatchAggregator.java
+++ b/plugin/src/main/java/de/ids_mannheim/korap/plkexport/MatchAggregator.java
@@ -23,6 +23,9 @@
import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.ResponseBuilder;
+import javax.ws.rs.sse.Sse;
+import javax.ws.rs.sse.SseEventSink;
+
import static de.ids_mannheim.korap.plkexport.Util.*;
/**
@@ -47,6 +50,9 @@
private int maxResults = -1;
private int fetchedResults = 0;
+ private SseEventSink sink;
+ private Sse sse;
+
public String getMimeType() {
return "text/plain";
};
@@ -143,7 +149,25 @@
public void writeHeader (Writer w) throws IOException { };
public void writeFooter (Writer w) throws IOException { };
public void addMatch (JsonNode n, Writer w) throws IOException { };
-
+
+ public void setSse (SseEventSink sink, Sse sse) {
+ this.sink = sink;
+ this.sse = sse;
+ };
+
+ // Send the progress
+ private void sendProgress () {
+
+ if (this.sink == null)
+ return;
+
+ double calc = Math.ceil(
+ (
+ (float) this.fetchedResults / (float) this.maxResults
+ ) * 100
+ );
+ this.sink.send(this.sse.newEvent("progress", String.valueOf(calc)));
+ };
/**
* Create new match aggregator and parse initial Json
@@ -255,7 +279,7 @@
// Catch error
catch (IOException io) {
};
-
+
// TODO:
// Return exporter error
return Response.status(500).entity("error");
@@ -264,6 +288,9 @@
// Iterate through all matches
private boolean iterateThroughMatches (JsonNode mNodes) throws IOException {
+
+ this.sendProgress();
+
if (mNodes == null)
return false;
diff --git a/plugin/src/main/java/de/ids_mannheim/korap/plkexport/Service.java b/plugin/src/main/java/de/ids_mannheim/korap/plkexport/Service.java
index 3f250f4..e65572c 100644
--- a/plugin/src/main/java/de/ids_mannheim/korap/plkexport/Service.java
+++ b/plugin/src/main/java/de/ids_mannheim/korap/plkexport/Service.java
@@ -8,6 +8,7 @@
import java.io.InputStream;
import java.lang.Thread;
import java.net.URLEncoder;
+import java.net.ConnectException;
import java.util.HashMap;
import java.util.ArrayList;
import java.util.Collection;
@@ -35,8 +36,10 @@
import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.ResponseBuilder;
import javax.ws.rs.core.Response.Status;
+import javax.ws.rs.sse.Sse;
+import javax.ws.rs.sse.SseEventSink;
+import javax.ws.rs.sse.OutboundSseEvent;
import javax.servlet.http.Cookie;
-import java.net.ConnectException;
import javax.servlet.http.HttpServletRequest;
import static de.ids_mannheim.korap.plkexport.Util.*;
@@ -107,7 +110,7 @@
@POST
@Path("export")
@Produces(MediaType.APPLICATION_OCTET_STREAM)
- public Response export (
+ public Response staticExport (
@FormParam("fname") String fname,
@FormParam("format") String format,
@FormParam("q") String q,
@@ -118,6 +121,9 @@
// @FormParam("islimit") String il
) throws IOException {
+ // Exporter exp =
+ // return progressExport(fname, format, q, cq, ql, cutoffStr, hitc, null, null);
+
// These parameters are required
String[][] params = {
{ "format", format },
@@ -230,6 +236,10 @@
exp.setFileName(fname);
};
+ // if (sse != null && sink != null) {
+ // exp.setSSE(sse, sink);
+ // };
+
// Initialize exporter (with meta data and first matches)
try {
exp.init(resp);
@@ -244,12 +254,16 @@
);
};
-
// Calculate how many results to fetch
int fetchCount = exp.getTotalResults();
if (exp.hasTimeExceeded() || fetchCount > maxResults) {
fetchCount = maxResults;
};
+ // TODO:
+ // else {
+ // setMaxResults()???
+ // }
+
// The first page was already enough - ignore paging
if (fetchCount <= pageSize) {
@@ -259,47 +273,72 @@
// If only one page should be exported there is no need
// for a temporary export file
if (cutoff) {
- builder = exp.serve();
- }
+
+ // TODO:
+ // Add method serveAndBuild()
+ return exp.serve().build();
+ };
// Page through all results
- else {
- // It's not important anymore to get totalResults
- uri.queryParam("cutoff", "true");
+ // It's not important anymore to get totalResults
+ uri.queryParam("cutoff", "true");
- // Set offset for paging as a template
- uri.queryParam("offset", "{offset}");
+ // Set offset for paging as a template
+ uri.queryParam("offset", "{offset}");
- try {
+ try {
- // Iterate over all results
- for (int i = pageSize; i <= fetchCount; i+=pageSize) {
- resource = client.target(uri.build(i));
- reqBuilder = resource.request(MediaType.APPLICATION_JSON);
- resp = authBuilder(reqBuilder, xff, auth).get(String.class);
+ // Iterate over all results
+ for (int i = pageSize; i <= fetchCount; i+=pageSize) {
+ resource = client.target(uri.build(i));
+ reqBuilder = resource.request(MediaType.APPLICATION_JSON);
+ resp = authBuilder(reqBuilder, xff, auth).get(String.class);
- // Stop when no more matches are allowed
- if (!exp.appendMatches(resp))
- break;
- }
- } catch (Exception e) {
- throw new WebApplicationException(
- responseForm(
- Status.INTERNAL_SERVER_ERROR,
- e.getMessage()
- )
- );
- };
-
- builder = exp.serve();
- };
-
- return builder.build();
+ // Stop when no more matches are allowed
+ if (!exp.appendMatches(resp))
+ break;
+ }
+ } catch (Exception e) {
+ throw new WebApplicationException(
+ responseForm(
+ Status.INTERNAL_SERVER_ERROR,
+ e.getMessage()
+ )
+ );
+ };
+
+ return exp.serve().build();
};
@GET
+ @Path("export")
+ @Produces("text/event-stream")
+ public void progressExport(@Context SseEventSink sseEventSink,
+ @Context Sse sse) throws InterruptedException {
+
+ // Exporter exp =
+ // progressExport(fname, format, q, cq, ql, cutoffStr, hitc, null, null);
+
+ // https://www.baeldung.com/java-ee-jax-rs-sse
+ // https://www.howopensource.com/2016/01/java-sse-chat-example/
+ // https://csetutorials.com/jersey-sse-tutorial.html
+ sseEventSink.send(sse.newEvent("Init", "Start"));
+
+ int x = 0;
+ while (x < 100) {
+ x++;
+ sseEventSink.send(sse.newEvent("Progress", ""+x));
+ };
+
+ sseEventSink.send(sse.newEvent("Relocate", "location"));
+
+ sseEventSink.close();
+ };
+
+
+ @GET
@Path("export")
@Produces(MediaType.TEXT_HTML)
public Response exportHTML () {
diff --git a/plugin/src/test/java/de/ids_mannheim/korap/plkexport/ServiceTest.java b/plugin/src/test/java/de/ids_mannheim/korap/plkexport/ServiceTest.java
index 226f610..be59896 100644
--- a/plugin/src/test/java/de/ids_mannheim/korap/plkexport/ServiceTest.java
+++ b/plugin/src/test/java/de/ids_mannheim/korap/plkexport/ServiceTest.java
@@ -24,6 +24,7 @@
import java.net.URLDecoder;
import java.util.List;
+import java.util.LinkedList;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Properties;
@@ -36,6 +37,18 @@
import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.Status;
+// Sse testing
+import javax.ws.rs.client.Client;
+import javax.ws.rs.client.ClientBuilder;
+import org.glassfish.jersey.media.sse.EventListener;
+import org.glassfish.jersey.media.sse.EventSource;
+import org.glassfish.jersey.media.sse.InboundEvent;
+import org.glassfish.jersey.media.sse.SseFeature;
+import java.util.concurrent.TimeUnit;
+import javax.ws.rs.sse.SseEventSource;
+import javax.ws.rs.client.WebTarget;
+import java.util.concurrent.CountDownLatch;
+
import org.glassfish.jersey.server.ResourceConfig;
import org.glassfish.jersey.test.JerseyTest;
@@ -68,6 +81,7 @@
.getLogger("org.mockserver"))
.setLevel(ch.qos.logback.classic.Level.OFF);
+ // Unfortunately this means the tests can't run in parallel
mockServer = ClientAndServer.startClientAndServer(34765);
mockClient = new MockServerClient("localhost", mockServer.getPort());
@@ -770,6 +784,51 @@
properties.setProperty("api.port", portTemp);
};
+ @Test
+ public void testExportWsProgress () throws InterruptedException {
+ // Based on https://stackoverflow.com/questions/35499655/
+ // how-to-test-server-sent-events-with-spring
+ // &
+ // https://github.com/jersey/jersey/blob/master/examples/
+ // server-sent-events-jersey/src/test/java/org/glassfish/
+ // jersey/examples/sse/jersey/ServerSentEventsTest.java
+
+ final LinkedList<String> events = new LinkedList<>();
+ final int eventCount = 102;
+
+
+ // Expect 102 messages:
+ final CountDownLatch latch = new CountDownLatch(eventCount);
+
+ // Create SSE client
+ Client client = ClientBuilder
+ .newBuilder()
+ .register(SseFeature.class)
+ .build();
+
+ EventSource eventSource = EventSource
+ .target(target("/export"))
+ .reconnectingEvery(300, TimeUnit.SECONDS)
+ .build();
+
+ EventListener listener = inboundEvent -> {
+ events.add(inboundEvent.getName() + ":" + inboundEvent.readData(String.class));
+ latch.countDown();
+ };
+
+ eventSource.register(listener);
+ eventSource.open();
+
+ latch.await(1000, TimeUnit.SECONDS);
+
+ Thread.sleep(1000);
+
+ assertEquals(events.getFirst(), "Init:Start");
+ assertEquals(events.getLast(), "Relocate:location");
+ assertEquals(events.size(), eventCount);
+
+ eventSource.close();
+ };
@Test
public void testClientIP () {