Introduce SSE example (not yet functional)

Change-Id: I5d3ddc6058e28e3e9125dfbe2201c1c1667d174e
diff --git a/plugin/pom.xml b/plugin/pom.xml
index fb0de66..3ca25a0 100644
--- a/plugin/pom.xml
+++ b/plugin/pom.xml
@@ -9,6 +9,7 @@
   <url>http://maven.apache.org</url>
   
   <properties>
+	  <java.version>1.8</java.version>
     <jetty.version>9.4.31.v20200723</jetty.version>
     <jersey.version>2.27</jersey.version>
   </properties>
@@ -96,6 +97,13 @@
       <scope>test</scope>
     </dependency>
 
+    <!-- https://mvnrepository.com/artifact/org.glassfish.jersey.media/jersey-media-sse -->
+    <dependency>
+      <groupId>org.glassfish.jersey.media</groupId>
+      <artifactId>jersey-media-sse</artifactId>
+      <version>${jersey.version}</version>
+    </dependency>
+
   </dependencies>
   
   <build>
@@ -139,6 +147,18 @@
           </execution>
         </executions>
       </plugin>
+
+      <plugin>
+				<artifactId>maven-compiler-plugin</artifactId>
+				<version>3.8.1</version>
+				<inherited>true</inherited>
+				<configuration>
+					<showWarnings>true</showWarnings>
+					<source>${java.version}</source>
+					<target>${java.version}</target>
+				</configuration>
+			</plugin>
+
       <plugin>
         <groupId>org.apache.maven.plugins</groupId>
         <artifactId>maven-resources-plugin</artifactId>
diff --git a/plugin/src/main/java/de/ids_mannheim/korap/plkexport/MatchAggregator.java b/plugin/src/main/java/de/ids_mannheim/korap/plkexport/MatchAggregator.java
index c7b447f..41b7b47 100644
--- a/plugin/src/main/java/de/ids_mannheim/korap/plkexport/MatchAggregator.java
+++ b/plugin/src/main/java/de/ids_mannheim/korap/plkexport/MatchAggregator.java
@@ -23,6 +23,9 @@
 import javax.ws.rs.core.Response;
 import javax.ws.rs.core.Response.ResponseBuilder;
 
+import javax.ws.rs.sse.Sse;
+import javax.ws.rs.sse.SseEventSink;
+
 import static de.ids_mannheim.korap.plkexport.Util.*;
 
 /**
@@ -47,6 +50,9 @@
     private int maxResults = -1;
     private int fetchedResults = 0;
 
+    private SseEventSink sink;
+    private Sse sse;
+
     public String getMimeType() {
         return "text/plain";
     };
@@ -143,7 +149,25 @@
     public void writeHeader (Writer w) throws IOException { };
     public void writeFooter (Writer w) throws IOException { };
     public void addMatch (JsonNode n, Writer w) throws IOException { };
-   
+
+    public void setSse (SseEventSink sink, Sse sse) {
+        this.sink = sink;
+        this.sse = sse;
+    };
+
+    // Send the progress
+    private void sendProgress () {
+
+        if (this.sink == null)
+            return;
+        
+        double calc = Math.ceil(
+            (
+                (float) this.fetchedResults / (float) this.maxResults
+                ) * 100
+            );
+        this.sink.send(this.sse.newEvent("progress", String.valueOf(calc)));
+    };
 
     /**
      * Create new match aggregator and parse initial Json
@@ -255,7 +279,7 @@
         // Catch error
         catch (IOException io) {
         };
-
+        
         // TODO:
         //   Return exporter error
         return Response.status(500).entity("error");
@@ -264,6 +288,9 @@
 
     // Iterate through all matches
     private boolean iterateThroughMatches (JsonNode mNodes) throws IOException {
+
+        this.sendProgress();
+
         if (mNodes == null)
             return false;
         
diff --git a/plugin/src/main/java/de/ids_mannheim/korap/plkexport/Service.java b/plugin/src/main/java/de/ids_mannheim/korap/plkexport/Service.java
index 3f250f4..e65572c 100644
--- a/plugin/src/main/java/de/ids_mannheim/korap/plkexport/Service.java
+++ b/plugin/src/main/java/de/ids_mannheim/korap/plkexport/Service.java
@@ -8,6 +8,7 @@
 import java.io.InputStream;
 import java.lang.Thread;
 import java.net.URLEncoder;
+import java.net.ConnectException;
 import java.util.HashMap;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -35,8 +36,10 @@
 import javax.ws.rs.core.Response;
 import javax.ws.rs.core.Response.ResponseBuilder;
 import javax.ws.rs.core.Response.Status;
+import javax.ws.rs.sse.Sse;
+import javax.ws.rs.sse.SseEventSink;
+import javax.ws.rs.sse.OutboundSseEvent;
 import javax.servlet.http.Cookie;
-import java.net.ConnectException;
 import javax.servlet.http.HttpServletRequest;
 
 import static de.ids_mannheim.korap.plkexport.Util.*;
@@ -107,7 +110,7 @@
     @POST
     @Path("export")
     @Produces(MediaType.APPLICATION_OCTET_STREAM)
-    public Response export (
+    public Response staticExport (
         @FormParam("fname") String fname,
         @FormParam("format") String format,
         @FormParam("q") String q,
@@ -118,6 +121,9 @@
         // @FormParam("islimit") String il
         ) throws IOException {
 
+        // Exporter exp = 
+        // return progressExport(fname, format, q, cq, ql, cutoffStr, hitc, null, null);
+        
         // These parameters are required
         String[][] params = {
             { "format", format },
@@ -230,6 +236,10 @@
             exp.setFileName(fname);
         };
 
+        // if (sse != null && sink != null) {
+        //   exp.setSSE(sse, sink);
+        // };
+
         // Initialize exporter (with meta data and first matches)
         try {
             exp.init(resp);
@@ -244,12 +254,16 @@
                 );
         };
 
-
         // Calculate how many results to fetch
         int fetchCount = exp.getTotalResults();
         if (exp.hasTimeExceeded() || fetchCount > maxResults) {
             fetchCount = maxResults;
         };
+        // TODO:
+        // else {
+        //   setMaxResults()???
+        // }
+
 
         // The first page was already enough - ignore paging
         if (fetchCount <= pageSize) {
@@ -259,47 +273,72 @@
         // If only one page should be exported there is no need
         // for a temporary export file
         if (cutoff) {
-            builder = exp.serve();
-        }
+
+            // TODO:
+            //   Add method serveAndBuild()
+            return exp.serve().build();
+        };
 
         // Page through all results
-        else {
 
-            // It's not important anymore to get totalResults
-            uri.queryParam("cutoff", "true");
+        // It's not important anymore to get totalResults
+        uri.queryParam("cutoff", "true");
 
-            // Set offset for paging as a template
-            uri.queryParam("offset", "{offset}");
+        // Set offset for paging as a template
+        uri.queryParam("offset", "{offset}");
 
-            try {
+        try {
             
-                // Iterate over all results
-                for (int i = pageSize; i <= fetchCount; i+=pageSize) {
-                    resource = client.target(uri.build(i));
-                    reqBuilder = resource.request(MediaType.APPLICATION_JSON);
-                    resp = authBuilder(reqBuilder, xff, auth).get(String.class);
+            // Iterate over all results
+            for (int i = pageSize; i <= fetchCount; i+=pageSize) {
+                resource = client.target(uri.build(i));
+                reqBuilder = resource.request(MediaType.APPLICATION_JSON);
+                resp = authBuilder(reqBuilder, xff, auth).get(String.class);
 
-                    // Stop when no more matches are allowed
-                    if (!exp.appendMatches(resp))
-                        break;
-                }
-            } catch (Exception e) {
-                throw new WebApplicationException(
-                    responseForm(
-                        Status.INTERNAL_SERVER_ERROR,
-                        e.getMessage()
-                        )
-                    );
-            };
-
-            builder = exp.serve();
-        };        
-
-        return builder.build();
+                // Stop when no more matches are allowed
+                if (!exp.appendMatches(resp))
+                    break;
+            }
+        } catch (Exception e) {
+            throw new WebApplicationException(
+                responseForm(
+                    Status.INTERNAL_SERVER_ERROR,
+                    e.getMessage()
+                    )
+                );
+        };
+        
+        return exp.serve().build();
     };
 
 
     @GET
+	@Path("export")
+	@Produces("text/event-stream")
+	public void progressExport(@Context SseEventSink sseEventSink,
+                               @Context Sse sse) throws InterruptedException {
+
+        // Exporter exp = 
+        //   progressExport(fname, format, q, cq, ql, cutoffStr, hitc, null, null);
+
+        // https://www.baeldung.com/java-ee-jax-rs-sse
+        // https://www.howopensource.com/2016/01/java-sse-chat-example/
+        // https://csetutorials.com/jersey-sse-tutorial.html
+        sseEventSink.send(sse.newEvent("Init", "Start"));
+
+        int x = 0;
+        while (x < 100) {
+            x++;
+            sseEventSink.send(sse.newEvent("Progress", ""+x));
+        };
+
+        sseEventSink.send(sse.newEvent("Relocate", "location"));
+
+        sseEventSink.close();
+    };
+
+    
+    @GET
     @Path("export")
     @Produces(MediaType.TEXT_HTML)
     public Response exportHTML () {
diff --git a/plugin/src/test/java/de/ids_mannheim/korap/plkexport/ServiceTest.java b/plugin/src/test/java/de/ids_mannheim/korap/plkexport/ServiceTest.java
index 226f610..be59896 100644
--- a/plugin/src/test/java/de/ids_mannheim/korap/plkexport/ServiceTest.java
+++ b/plugin/src/test/java/de/ids_mannheim/korap/plkexport/ServiceTest.java
@@ -24,6 +24,7 @@
 import java.net.URLDecoder;
 
 import java.util.List;
+import java.util.LinkedList;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Properties;
@@ -36,6 +37,18 @@
 import javax.ws.rs.core.Response;
 import javax.ws.rs.core.Response.Status;
 
+// Sse testing
+import javax.ws.rs.client.Client;
+import javax.ws.rs.client.ClientBuilder;
+import org.glassfish.jersey.media.sse.EventListener;
+import org.glassfish.jersey.media.sse.EventSource;
+import org.glassfish.jersey.media.sse.InboundEvent;
+import org.glassfish.jersey.media.sse.SseFeature;
+import java.util.concurrent.TimeUnit;
+import javax.ws.rs.sse.SseEventSource;
+import javax.ws.rs.client.WebTarget;
+import java.util.concurrent.CountDownLatch;
+
 import org.glassfish.jersey.server.ResourceConfig;
 import org.glassfish.jersey.test.JerseyTest;
 
@@ -68,6 +81,7 @@
          .getLogger("org.mockserver"))
             .setLevel(ch.qos.logback.classic.Level.OFF);
 
+        // Unfortunately this means the tests can't run in parallel
         mockServer = ClientAndServer.startClientAndServer(34765);
         mockClient = new MockServerClient("localhost", mockServer.getPort());
 
@@ -770,6 +784,51 @@
         properties.setProperty("api.port", portTemp);
     };
 
+    @Test
+    public void testExportWsProgress () throws InterruptedException {
+        // Based on https://stackoverflow.com/questions/35499655/
+        //   how-to-test-server-sent-events-with-spring
+        //   &
+        // https://github.com/jersey/jersey/blob/master/examples/
+        //   server-sent-events-jersey/src/test/java/org/glassfish/
+        //   jersey/examples/sse/jersey/ServerSentEventsTest.java
+
+        final LinkedList<String> events = new LinkedList<>();
+        final int eventCount = 102;
+
+
+        // Expect 102 messages:
+        final CountDownLatch latch = new CountDownLatch(eventCount);
+        
+        // Create SSE client
+        Client client = ClientBuilder
+            .newBuilder()
+            .register(SseFeature.class)
+            .build();
+
+        EventSource eventSource = EventSource
+            .target(target("/export"))
+            .reconnectingEvery(300, TimeUnit.SECONDS)
+            .build();
+
+        EventListener listener = inboundEvent -> {
+            events.add(inboundEvent.getName() + ":" + inboundEvent.readData(String.class));
+            latch.countDown();
+        };
+
+        eventSource.register(listener);
+        eventSource.open();
+
+        latch.await(1000, TimeUnit.SECONDS);
+
+        Thread.sleep(1000);
+
+        assertEquals(events.getFirst(), "Init:Start");
+        assertEquals(events.getLast(), "Relocate:location");
+        assertEquals(events.size(), eventCount);
+
+        eventSource.close();
+    };
 
     @Test
     public void testClientIP () {