Switch to jersey native SSE to allow for CORS injection

Change-Id: Ic452cc51042404f0424b3c43cb52a252478caf56
diff --git a/plugin/pom.xml b/plugin/pom.xml
index 3ca25a0..d5a86f3 100644
--- a/plugin/pom.xml
+++ b/plugin/pom.xml
@@ -4,7 +4,7 @@
   <groupId>de.ids_mannheim.korap</groupId>
   <artifactId>KalamarExportPlugin</artifactId>
   <packaging>jar</packaging>
-  <version>0.2.0</version>
+  <version>0.2.1</version>
   <name>KalamarExportPlugin</name>
   <url>http://maven.apache.org</url>
   
@@ -55,10 +55,17 @@
       <version>${jersey.version}</version>
     </dependency>
     <dependency>
+    <groupId>org.glassfish.jersey.containers</groupId>
+    <artifactId>jersey-container-servlet</artifactId>
+    <version>${jersey.version}</version>
+    </dependency>
+    <!--
+    <dependency>
       <groupId>org.glassfish.jersey.containers</groupId>
       <artifactId>jersey-container-servlet-core</artifactId>
       <version>${jersey.version}</version>
-    </dependency>
+      </dependency>
+      -->
 
     <!-- Jersey test framework -->
     <dependency>
diff --git a/plugin/src/main/java/de/ids_mannheim/korap/plkexport/ExWSConf.java b/plugin/src/main/java/de/ids_mannheim/korap/plkexport/ExWSConf.java
index b781394..b4523d2 100644
--- a/plugin/src/main/java/de/ids_mannheim/korap/plkexport/ExWSConf.java
+++ b/plugin/src/main/java/de/ids_mannheim/korap/plkexport/ExWSConf.java
@@ -17,7 +17,7 @@
     // Version of Export Plugin
     public static final int VERSION_MAJOR = 0;
     public static final int VERSION_MINOR = 2;
-    public static final int VERSION_PATCHLEVEL= 0;
+    public static final int VERSION_PATCHLEVEL = 1;
 
     private static Properties prop;
 
diff --git a/plugin/src/main/java/de/ids_mannheim/korap/plkexport/Exporter.java b/plugin/src/main/java/de/ids_mannheim/korap/plkexport/Exporter.java
index ca11226..c82b35b 100644
--- a/plugin/src/main/java/de/ids_mannheim/korap/plkexport/Exporter.java
+++ b/plugin/src/main/java/de/ids_mannheim/korap/plkexport/Exporter.java
@@ -5,8 +5,7 @@
 import java.io.IOException;
 import java.io.Writer;
 
-import javax.ws.rs.sse.Sse;
-import javax.ws.rs.sse.SseEventSink;
+import org.glassfish.jersey.media.sse.EventOutput;
 
 
 interface Exporter {
@@ -31,7 +30,7 @@
     public int getTotalResults ();
     public boolean hasTimeExceeded ();
     public void setMaxResults (int m);
-    public void setSse (SseEventSink sink, Sse sse);
+    public void setSse (EventOutput sse);
 
     // Implemented by Exporter
     public ResponseBuilder serve();
diff --git a/plugin/src/main/java/de/ids_mannheim/korap/plkexport/MatchAggregator.java b/plugin/src/main/java/de/ids_mannheim/korap/plkexport/MatchAggregator.java
index c29a7c2..d3de004 100644
--- a/plugin/src/main/java/de/ids_mannheim/korap/plkexport/MatchAggregator.java
+++ b/plugin/src/main/java/de/ids_mannheim/korap/plkexport/MatchAggregator.java
@@ -23,8 +23,8 @@
 import javax.ws.rs.core.Response;
 import javax.ws.rs.core.Response.ResponseBuilder;
 
-import javax.ws.rs.sse.Sse;
-import javax.ws.rs.sse.SseEventSink;
+import org.glassfish.jersey.media.sse.EventOutput;
+import org.glassfish.jersey.media.sse.OutboundEvent;
 
 import static de.ids_mannheim.korap.plkexport.Util.*;
 
@@ -50,9 +50,8 @@
     private int maxResults = -1;
     private int fetchedResults = 0;
 
-    private SseEventSink sink;
-    private Sse sse;
-
+    private EventOutput evOut;
+    
     public String getMimeType() {
         return "text/plain";
     };
@@ -150,19 +149,30 @@
     public void writeFooter (Writer w) throws IOException { };
     public void addMatch (JsonNode n, Writer w) throws IOException { };
 
-    public void setSse (SseEventSink sink, Sse sse) {
-        this.sink = sink;
-        this.sse = sse;
+    public void setSse (EventOutput eventOutput) {
+        this.evOut = eventOutput;
     };
-
+    
     // Send the progress
     private void sendProgress () {
 
-        if (this.sink == null || this.maxResults == 0)
+        if (this.evOut == null || this.maxResults == 0)
             return;
-        
+
+        if (this.evOut.isClosed())
+            return;
+         
         int calc = (int) Math.ceil(((double) this.fetchedResults / this.maxResults) * 100);
-        this.sink.send(this.sse.newEvent("Progress", String.valueOf(calc)));
+
+        final OutboundEvent.Builder eventBuilder = new OutboundEvent.Builder();
+        eventBuilder.name("Progress");
+        eventBuilder.data(String.valueOf(calc));
+
+        try {
+            this.evOut.write(eventBuilder.build());
+        } catch (IOException e) {
+            return;
+        };
     };
 
     /**
diff --git a/plugin/src/main/java/de/ids_mannheim/korap/plkexport/Service.java b/plugin/src/main/java/de/ids_mannheim/korap/plkexport/Service.java
index eb644b1..9be9d9e 100644
--- a/plugin/src/main/java/de/ids_mannheim/korap/plkexport/Service.java
+++ b/plugin/src/main/java/de/ids_mannheim/korap/plkexport/Service.java
@@ -27,6 +27,7 @@
 import javax.ws.rs.GET;
 import javax.ws.rs.Path;
 import javax.ws.rs.Produces;
+import javax.ws.rs.Consumes;
 import javax.ws.rs.client.Client;
 import javax.ws.rs.client.ClientBuilder;
 import javax.ws.rs.client.WebTarget;
@@ -37,9 +38,11 @@
 import javax.ws.rs.core.Response;
 import javax.ws.rs.core.Response.ResponseBuilder;
 import javax.ws.rs.core.Response.Status;
-import javax.ws.rs.sse.Sse;
-import javax.ws.rs.sse.SseEventSink;
-import javax.ws.rs.sse.OutboundSseEvent;
+
+import org.glassfish.jersey.media.sse.EventOutput;
+import org.glassfish.jersey.media.sse.OutboundEvent;
+import org.glassfish.jersey.media.sse.SseFeature;
+
 import javax.servlet.http.Cookie;
 import javax.servlet.http.HttpServletRequest;
 
@@ -53,6 +56,7 @@
  * TODO:
  * - Delete the temp file of the export at the end
  * - Do not expect all meta data per match.
+ * - Abort processing when eventsource is closed.
  * - Add progress mechanism.
  * - Upgrade default pageSize to 50.
  * - Add loading marker.
@@ -99,8 +103,8 @@
                              String ql,
                              String cutoffStr,
                              int hitc,
-                             SseEventSink sink,
-                             Sse sse) throws WebApplicationException {
+                             EventOutput eventOutput
+        ) throws WebApplicationException {
         
         // These parameters are required
         String[][] params = {
@@ -127,7 +131,7 @@
             ) {
             cutoff = true;
         };
-
+        
         // Load configuration values
         String scheme  = prop.getProperty("api.scheme", "https");
         String port    = prop.getProperty("api.port", "8089");
@@ -214,8 +218,8 @@
         };
 
         // set progress mechanism, if required
-        if (sse != null && sink != null)
-            exp.setSse(sink, sse);
+        if (eventOutput != null)
+            exp.setSse(eventOutput);
 
         // TODO:
         //   The following could be subsumed in the MatchAggregator
@@ -321,11 +325,11 @@
         // @FormParam("islimit") String il
         ) throws IOException {
 
-        Exporter exp = export(fname, format, q, cq, ql, cutoffStr, hitc, null, null);
+        Exporter exp = export(fname, format, q, cq, ql, cutoffStr, hitc, null);
         
         return exp.serve().build();
     };
-    
+
 
     /**
      * Progress based counterpart to staticExport,
@@ -333,10 +337,9 @@
      */
     @GET
 	@Path("export")
-	@Produces("text/event-stream")
-	public void progressExport(
-        @Context SseEventSink sink,
-        @Context Sse sse,
+    @Produces(SseFeature.SERVER_SENT_EVENTS)
+    @Consumes(SseFeature.SERVER_SENT_EVENTS)
+	public Response progressExport(
         @QueryParam("fname") String fname,
         @QueryParam("format") String format,
         @QueryParam("q") String q,
@@ -349,31 +352,67 @@
         // https://www.baeldung.com/java-ee-jax-rs-sse
         // https://www.howopensource.com/2016/01/java-sse-chat-example/
         // https://csetutorials.com/jersey-sse-tutorial.html
+        // https://eclipse-ee4j.github.io/jersey.github.io/documentation/latest/sse.html
+        
+        final EventOutput eventOutput = new EventOutput();
 
         // Send initial event
-        sink.send(sse.newEvent("Process", "Init"));
+        if (eventOutput.isClosed())
+            return Response.ok("EventSource closed").build();
 
-        try {
-            Exporter exp = export(
-                fname,
-                format,
-                q,
-                cq,
-                ql,
-                cutoffStr,
-                hitc,
-                sink,
-                sse
-                );
-            sink.send(sse.newEvent("Relocate", "..."));
-        }
-        catch (WebApplicationException wae) {
-            sink.send(sse.newEvent("Error",wae.getMessage()));
-        };
+        new Thread(new Runnable() {
+                @Override
+                public void run() {
+                    final OutboundEvent.Builder eventBuilder = new OutboundEvent.Builder();
+                    try {
+                        eventBuilder.name("Process");
+                        eventBuilder.data("init");
+                        eventOutput.write(eventBuilder.build());
+                        Exporter exp = export(
+                            fname,
+                            format,
+                            q,
+                            cq,
+                            ql,
+                            cutoffStr,
+                            hitc,
+                            eventOutput
+                            );
+                        if (eventOutput.isClosed())
+                            return;
+                        eventBuilder.name("Relocate");
+                        eventBuilder.data("...");
+                        eventOutput.write(eventBuilder.build());
+                    } catch (Exception e) {
+                        try {
+                            if (eventOutput.isClosed())
+                                return;
+                            eventBuilder.name("Error");
+                            eventBuilder.data(e.getMessage());
+                            eventOutput.write(eventBuilder.build());
+                        } catch (IOException ioe) {
+                            throw new RuntimeException("Error when writing event output.", ioe);
+                        };
+                    } finally {
+                        try {
+                            if (eventOutput.isClosed())
+                                return;
 
-        sink.send(sse.newEvent("Process", "Done"));
-        
-        sink.close();
+                            eventBuilder.name("Process");
+                            eventBuilder.data("done");
+                            eventOutput.write(eventBuilder.build());                        
+                            eventOutput.close();
+                        } catch (IOException ioClose) {
+                            throw new RuntimeException("Error when closing the event output.", ioClose);
+                        }
+                    };
+                    return;
+                }
+            }).start();      
+
+        return Response.ok(eventOutput, SseFeature.SERVER_SENT_EVENTS_TYPE)
+            .header("Access-Control-Allow-Origin", "*")
+            .build();
     };
 
     
diff --git a/plugin/src/test/java/de/ids_mannheim/korap/plkexport/ServiceTest.java b/plugin/src/test/java/de/ids_mannheim/korap/plkexport/ServiceTest.java
index 49311e0..6c51f5e 100644
--- a/plugin/src/test/java/de/ids_mannheim/korap/plkexport/ServiceTest.java
+++ b/plugin/src/test/java/de/ids_mannheim/korap/plkexport/ServiceTest.java
@@ -37,7 +37,7 @@
 import javax.ws.rs.core.Response;
 import javax.ws.rs.core.Response.Status;
 
-// Sse testing
+// SSE testing
 import javax.ws.rs.client.Client;
 import javax.ws.rs.client.ClientBuilder;
 import org.glassfish.jersey.media.sse.EventListener;
@@ -45,7 +45,6 @@
 import org.glassfish.jersey.media.sse.InboundEvent;
 import org.glassfish.jersey.media.sse.SseFeature;
 import java.util.concurrent.TimeUnit;
-import javax.ws.rs.sse.SseEventSource;
 import javax.ws.rs.client.WebTarget;
 import java.util.concurrent.CountDownLatch;
 
@@ -815,9 +814,9 @@
         Thread.sleep(2000);
 
         // Check error
-        assertEquals(events.getFirst(), "Process:Init");
+        assertEquals(events.getFirst(), "Process:init");
         assertEquals(events.get(1), "Error:HTTP 400 Bad Request");
-        assertEquals(events.getLast(), "Process:Done");
+        assertEquals(events.getLast(), "Process:done");
         assertEquals(events.size(), 3);
         eventSource.close();
     };
@@ -886,18 +885,16 @@
         Thread.sleep(3000);
 
         // Check error
-        assertEquals(events.getFirst(), "Process:Init");
+        assertEquals(events.getFirst(), "Process:init");
         assertEquals(events.get(1), "Progress:0");
         assertEquals(events.get(2), "Progress:56");
         assertEquals(events.get(3), "Relocate:...");
-        assertEquals(events.getLast(), "Process:Done");
+        assertEquals(events.getLast(), "Process:done");
         assertEquals(events.size(), 5);
         eventSource.close();
     };
 
     
-
-    
     @Test
     public void testClientIP () {
         assertEquals(getClientIP("10.0.4.6"), "10.0.4.6");