Switch to jersey native SSE to allow for CORS injection
Change-Id: Ic452cc51042404f0424b3c43cb52a252478caf56
diff --git a/plugin/pom.xml b/plugin/pom.xml
index 3ca25a0..d5a86f3 100644
--- a/plugin/pom.xml
+++ b/plugin/pom.xml
@@ -4,7 +4,7 @@
<groupId>de.ids_mannheim.korap</groupId>
<artifactId>KalamarExportPlugin</artifactId>
<packaging>jar</packaging>
- <version>0.2.0</version>
+ <version>0.2.1</version>
<name>KalamarExportPlugin</name>
<url>http://maven.apache.org</url>
@@ -55,10 +55,17 @@
<version>${jersey.version}</version>
</dependency>
<dependency>
+ <groupId>org.glassfish.jersey.containers</groupId>
+ <artifactId>jersey-container-servlet</artifactId>
+ <version>${jersey.version}</version>
+ </dependency>
+ <!--
+ <dependency>
<groupId>org.glassfish.jersey.containers</groupId>
<artifactId>jersey-container-servlet-core</artifactId>
<version>${jersey.version}</version>
- </dependency>
+ </dependency>
+ -->
<!-- Jersey test framework -->
<dependency>
diff --git a/plugin/src/main/java/de/ids_mannheim/korap/plkexport/ExWSConf.java b/plugin/src/main/java/de/ids_mannheim/korap/plkexport/ExWSConf.java
index b781394..b4523d2 100644
--- a/plugin/src/main/java/de/ids_mannheim/korap/plkexport/ExWSConf.java
+++ b/plugin/src/main/java/de/ids_mannheim/korap/plkexport/ExWSConf.java
@@ -17,7 +17,7 @@
// Version of Export Plugin
public static final int VERSION_MAJOR = 0;
public static final int VERSION_MINOR = 2;
- public static final int VERSION_PATCHLEVEL= 0;
+ public static final int VERSION_PATCHLEVEL = 1;
private static Properties prop;
diff --git a/plugin/src/main/java/de/ids_mannheim/korap/plkexport/Exporter.java b/plugin/src/main/java/de/ids_mannheim/korap/plkexport/Exporter.java
index ca11226..c82b35b 100644
--- a/plugin/src/main/java/de/ids_mannheim/korap/plkexport/Exporter.java
+++ b/plugin/src/main/java/de/ids_mannheim/korap/plkexport/Exporter.java
@@ -5,8 +5,7 @@
import java.io.IOException;
import java.io.Writer;
-import javax.ws.rs.sse.Sse;
-import javax.ws.rs.sse.SseEventSink;
+import org.glassfish.jersey.media.sse.EventOutput;
interface Exporter {
@@ -31,7 +30,7 @@
public int getTotalResults ();
public boolean hasTimeExceeded ();
public void setMaxResults (int m);
- public void setSse (SseEventSink sink, Sse sse);
+ public void setSse (EventOutput sse);
// Implemented by Exporter
public ResponseBuilder serve();
diff --git a/plugin/src/main/java/de/ids_mannheim/korap/plkexport/MatchAggregator.java b/plugin/src/main/java/de/ids_mannheim/korap/plkexport/MatchAggregator.java
index c29a7c2..d3de004 100644
--- a/plugin/src/main/java/de/ids_mannheim/korap/plkexport/MatchAggregator.java
+++ b/plugin/src/main/java/de/ids_mannheim/korap/plkexport/MatchAggregator.java
@@ -23,8 +23,8 @@
import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.ResponseBuilder;
-import javax.ws.rs.sse.Sse;
-import javax.ws.rs.sse.SseEventSink;
+import org.glassfish.jersey.media.sse.EventOutput;
+import org.glassfish.jersey.media.sse.OutboundEvent;
import static de.ids_mannheim.korap.plkexport.Util.*;
@@ -50,9 +50,8 @@
private int maxResults = -1;
private int fetchedResults = 0;
- private SseEventSink sink;
- private Sse sse;
-
+ private EventOutput evOut;
+
public String getMimeType() {
return "text/plain";
};
@@ -150,19 +149,30 @@
public void writeFooter (Writer w) throws IOException { };
public void addMatch (JsonNode n, Writer w) throws IOException { };
- public void setSse (SseEventSink sink, Sse sse) {
- this.sink = sink;
- this.sse = sse;
+ public void setSse (EventOutput eventOutput) {
+ this.evOut = eventOutput;
};
-
+
// Send the progress
private void sendProgress () {
- if (this.sink == null || this.maxResults == 0)
+ if (this.evOut == null || this.maxResults == 0)
return;
-
+
+ if (this.evOut.isClosed())
+ return;
+
int calc = (int) Math.ceil(((double) this.fetchedResults / this.maxResults) * 100);
- this.sink.send(this.sse.newEvent("Progress", String.valueOf(calc)));
+
+ final OutboundEvent.Builder eventBuilder = new OutboundEvent.Builder();
+ eventBuilder.name("Progress");
+ eventBuilder.data(String.valueOf(calc));
+
+ try {
+ this.evOut.write(eventBuilder.build());
+ } catch (IOException e) {
+ return;
+ };
};
/**
diff --git a/plugin/src/main/java/de/ids_mannheim/korap/plkexport/Service.java b/plugin/src/main/java/de/ids_mannheim/korap/plkexport/Service.java
index eb644b1..9be9d9e 100644
--- a/plugin/src/main/java/de/ids_mannheim/korap/plkexport/Service.java
+++ b/plugin/src/main/java/de/ids_mannheim/korap/plkexport/Service.java
@@ -27,6 +27,7 @@
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
+import javax.ws.rs.Consumes;
import javax.ws.rs.client.Client;
import javax.ws.rs.client.ClientBuilder;
import javax.ws.rs.client.WebTarget;
@@ -37,9 +38,11 @@
import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.ResponseBuilder;
import javax.ws.rs.core.Response.Status;
-import javax.ws.rs.sse.Sse;
-import javax.ws.rs.sse.SseEventSink;
-import javax.ws.rs.sse.OutboundSseEvent;
+
+import org.glassfish.jersey.media.sse.EventOutput;
+import org.glassfish.jersey.media.sse.OutboundEvent;
+import org.glassfish.jersey.media.sse.SseFeature;
+
import javax.servlet.http.Cookie;
import javax.servlet.http.HttpServletRequest;
@@ -53,6 +56,7 @@
* TODO:
* - Delete the temp file of the export at the end
* - Do not expect all meta data per match.
+ * - Abort processing when eventsource is closed.
* - Add progress mechanism.
* - Upgrade default pageSize to 50.
* - Add loading marker.
@@ -99,8 +103,8 @@
String ql,
String cutoffStr,
int hitc,
- SseEventSink sink,
- Sse sse) throws WebApplicationException {
+ EventOutput eventOutput
+ ) throws WebApplicationException {
// These parameters are required
String[][] params = {
@@ -127,7 +131,7 @@
) {
cutoff = true;
};
-
+
// Load configuration values
String scheme = prop.getProperty("api.scheme", "https");
String port = prop.getProperty("api.port", "8089");
@@ -214,8 +218,8 @@
};
// set progress mechanism, if required
- if (sse != null && sink != null)
- exp.setSse(sink, sse);
+ if (eventOutput != null)
+ exp.setSse(eventOutput);
// TODO:
// The following could be subsumed in the MatchAggregator
@@ -321,11 +325,11 @@
// @FormParam("islimit") String il
) throws IOException {
- Exporter exp = export(fname, format, q, cq, ql, cutoffStr, hitc, null, null);
+ Exporter exp = export(fname, format, q, cq, ql, cutoffStr, hitc, null);
return exp.serve().build();
};
-
+
/**
* Progress based counterpart to staticExport,
@@ -333,10 +337,9 @@
*/
@GET
@Path("export")
- @Produces("text/event-stream")
- public void progressExport(
- @Context SseEventSink sink,
- @Context Sse sse,
+ @Produces(SseFeature.SERVER_SENT_EVENTS)
+ @Consumes(SseFeature.SERVER_SENT_EVENTS)
+ public Response progressExport(
@QueryParam("fname") String fname,
@QueryParam("format") String format,
@QueryParam("q") String q,
@@ -349,31 +352,67 @@
// https://www.baeldung.com/java-ee-jax-rs-sse
// https://www.howopensource.com/2016/01/java-sse-chat-example/
// https://csetutorials.com/jersey-sse-tutorial.html
+ // https://eclipse-ee4j.github.io/jersey.github.io/documentation/latest/sse.html
+
+ final EventOutput eventOutput = new EventOutput();
// Send initial event
- sink.send(sse.newEvent("Process", "Init"));
+ if (eventOutput.isClosed())
+ return Response.ok("EventSource closed").build();
- try {
- Exporter exp = export(
- fname,
- format,
- q,
- cq,
- ql,
- cutoffStr,
- hitc,
- sink,
- sse
- );
- sink.send(sse.newEvent("Relocate", "..."));
- }
- catch (WebApplicationException wae) {
- sink.send(sse.newEvent("Error",wae.getMessage()));
- };
+ new Thread(new Runnable() {
+ @Override
+ public void run() {
+ final OutboundEvent.Builder eventBuilder = new OutboundEvent.Builder();
+ try {
+ eventBuilder.name("Process");
+ eventBuilder.data("init");
+ eventOutput.write(eventBuilder.build());
+ Exporter exp = export(
+ fname,
+ format,
+ q,
+ cq,
+ ql,
+ cutoffStr,
+ hitc,
+ eventOutput
+ );
+ if (eventOutput.isClosed())
+ return;
+ eventBuilder.name("Relocate");
+ eventBuilder.data("...");
+ eventOutput.write(eventBuilder.build());
+ } catch (Exception e) {
+ try {
+ if (eventOutput.isClosed())
+ return;
+ eventBuilder.name("Error");
+ eventBuilder.data(e.getMessage());
+ eventOutput.write(eventBuilder.build());
+ } catch (IOException ioe) {
+ throw new RuntimeException("Error when writing event output.", ioe);
+ };
+ } finally {
+ try {
+ if (eventOutput.isClosed())
+ return;
- sink.send(sse.newEvent("Process", "Done"));
-
- sink.close();
+ eventBuilder.name("Process");
+ eventBuilder.data("done");
+ eventOutput.write(eventBuilder.build());
+ eventOutput.close();
+ } catch (IOException ioClose) {
+ throw new RuntimeException("Error when closing the event output.", ioClose);
+ }
+ };
+ return;
+ }
+ }).start();
+
+ return Response.ok(eventOutput, SseFeature.SERVER_SENT_EVENTS_TYPE)
+ .header("Access-Control-Allow-Origin", "*")
+ .build();
};
diff --git a/plugin/src/test/java/de/ids_mannheim/korap/plkexport/ServiceTest.java b/plugin/src/test/java/de/ids_mannheim/korap/plkexport/ServiceTest.java
index 49311e0..6c51f5e 100644
--- a/plugin/src/test/java/de/ids_mannheim/korap/plkexport/ServiceTest.java
+++ b/plugin/src/test/java/de/ids_mannheim/korap/plkexport/ServiceTest.java
@@ -37,7 +37,7 @@
import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.Status;
-// Sse testing
+// SSE testing
import javax.ws.rs.client.Client;
import javax.ws.rs.client.ClientBuilder;
import org.glassfish.jersey.media.sse.EventListener;
@@ -45,7 +45,6 @@
import org.glassfish.jersey.media.sse.InboundEvent;
import org.glassfish.jersey.media.sse.SseFeature;
import java.util.concurrent.TimeUnit;
-import javax.ws.rs.sse.SseEventSource;
import javax.ws.rs.client.WebTarget;
import java.util.concurrent.CountDownLatch;
@@ -815,9 +814,9 @@
Thread.sleep(2000);
// Check error
- assertEquals(events.getFirst(), "Process:Init");
+ assertEquals(events.getFirst(), "Process:init");
assertEquals(events.get(1), "Error:HTTP 400 Bad Request");
- assertEquals(events.getLast(), "Process:Done");
+ assertEquals(events.getLast(), "Process:done");
assertEquals(events.size(), 3);
eventSource.close();
};
@@ -886,18 +885,16 @@
Thread.sleep(3000);
// Check error
- assertEquals(events.getFirst(), "Process:Init");
+ assertEquals(events.getFirst(), "Process:init");
assertEquals(events.get(1), "Progress:0");
assertEquals(events.get(2), "Progress:56");
assertEquals(events.get(3), "Relocate:...");
- assertEquals(events.getLast(), "Process:Done");
+ assertEquals(events.getLast(), "Process:done");
assertEquals(events.size(), 5);
eventSource.close();
};
-
-
@Test
public void testClientIP () {
assertEquals(getClientIP("10.0.4.6"), "10.0.4.6");