Separate export method from response method and adopt progress calculation for sse

Change-Id: I0d2e6d206a1b19475905528c2f8400dd25c418d7
diff --git a/plugin/src/main/java/de/ids_mannheim/korap/plkexport/Exporter.java b/plugin/src/main/java/de/ids_mannheim/korap/plkexport/Exporter.java
index 72e4b0e..ca11226 100644
--- a/plugin/src/main/java/de/ids_mannheim/korap/plkexport/Exporter.java
+++ b/plugin/src/main/java/de/ids_mannheim/korap/plkexport/Exporter.java
@@ -5,6 +5,10 @@
 import java.io.IOException;
 import java.io.Writer;
 
+import javax.ws.rs.sse.Sse;
+import javax.ws.rs.sse.SseEventSink;
+
+
 interface Exporter {
 
     // Implemented by MatchAggregator
@@ -24,10 +28,10 @@
     public void setCorpusQueryString (String s);
     public String getSource ();
     public void setSource (String h, String p);
-
     public int getTotalResults ();
     public boolean hasTimeExceeded ();
     public void setMaxResults (int m);
+    public void setSse (SseEventSink sink, Sse sse);
 
     // Implemented by Exporter
     public ResponseBuilder serve();
diff --git a/plugin/src/main/java/de/ids_mannheim/korap/plkexport/MatchAggregator.java b/plugin/src/main/java/de/ids_mannheim/korap/plkexport/MatchAggregator.java
index 41b7b47..c29a7c2 100644
--- a/plugin/src/main/java/de/ids_mannheim/korap/plkexport/MatchAggregator.java
+++ b/plugin/src/main/java/de/ids_mannheim/korap/plkexport/MatchAggregator.java
@@ -158,15 +158,11 @@
     // Send the progress
     private void sendProgress () {
 
-        if (this.sink == null)
+        if (this.sink == null || this.maxResults == 0)
             return;
         
-        double calc = Math.ceil(
-            (
-                (float) this.fetchedResults / (float) this.maxResults
-                ) * 100
-            );
-        this.sink.send(this.sse.newEvent("progress", String.valueOf(calc)));
+        int calc = (int) Math.ceil(((double) this.fetchedResults / this.maxResults) * 100);
+        this.sink.send(this.sse.newEvent("Progress", String.valueOf(calc)));
     };
 
     /**
diff --git a/plugin/src/main/java/de/ids_mannheim/korap/plkexport/Service.java b/plugin/src/main/java/de/ids_mannheim/korap/plkexport/Service.java
index e65572c..eb644b1 100644
--- a/plugin/src/main/java/de/ids_mannheim/korap/plkexport/Service.java
+++ b/plugin/src/main/java/de/ids_mannheim/korap/plkexport/Service.java
@@ -22,6 +22,7 @@
 import javax.ws.rs.BadRequestException;
 import javax.ws.rs.WebApplicationException;
 import javax.ws.rs.FormParam;
+import javax.ws.rs.QueryParam;
 import javax.ws.rs.POST;
 import javax.ws.rs.GET;
 import javax.ws.rs.Path;
@@ -89,40 +90,17 @@
     @Context
     private HttpServletRequest req;     
 
-    /**
-     * WebService calls Kustvakt Search Webservices and returns
-     * response as json (all of the response) and
-     * as rtf (matches)
-     * 
-     * @param fname
-     *            file name
-     * @param format
-     *            the file format value rtf or json.
-     * @param q
-     *            the query
-     * @param ql
-     *            the query language
-     * @param cutoff
-     *            Export more than the first page
-     * 
-     * 
-     */
-    @POST
-    @Path("export")
-    @Produces(MediaType.APPLICATION_OCTET_STREAM)
-    public Response staticExport (
-        @FormParam("fname") String fname,
-        @FormParam("format") String format,
-        @FormParam("q") String q,
-        @FormParam("cq") String cq,
-        @FormParam("ql") String ql,
-        @FormParam("cutoff") String cutoffStr,
-        @FormParam("hitc") int hitc
-        // @FormParam("islimit") String il
-        ) throws IOException {
-
-        // Exporter exp = 
-        // return progressExport(fname, format, q, cq, ql, cutoffStr, hitc, null, null);
+    // Private method to run the export,
+    // either static or streaming
+    private Exporter export (String fname,
+                             String format,
+                             String q,
+                             String cq,
+                             String ql,
+                             String cutoffStr,
+                             int hitc,
+                             SseEventSink sink,
+                             Sse sse) throws WebApplicationException {
         
         // These parameters are required
         String[][] params = {
@@ -134,12 +112,11 @@
         // Check that all parameters are available
         for (int i = 0; i < params.length; i++) {
             if (params[i][1] == null || params[i][1].trim().isEmpty())
-                throw new BadRequestException(
-                    Response
-                    .status(Status.BAD_REQUEST)
-                    .entity("Parameter " + "\""
-                            + params[i][0] + "\"" + " is missing or empty")
-                    .build());
+                throw new WebApplicationException(
+                    responseForm(Status.BAD_REQUEST,
+                                 "Parameter " + "\""
+                                 + params[i][0] + "\"" +
+                                 " is missing or empty"));
         };
         
         // Retrieve cutoff value
@@ -151,9 +128,6 @@
             cutoff = true;
         };
 
-        ResponseBuilder builder = null;
-        Client client = ClientBuilder.newClient();
-
         // Load configuration values
         String scheme  = prop.getProperty("api.scheme", "https");
         String port    = prop.getProperty("api.port", "8089");
@@ -169,7 +143,10 @@
         // If less than pageSize results are requested - dont't fetch more
         if (maxResults < pageSize)
             pageSize = maxResults;
-               
+
+        ResponseBuilder builder = null;
+        Client client = ClientBuilder.newClient();
+        
         // Create initial search uri
         UriBuilder uri = UriBuilder.fromPath("/api/v1.0/search")
             .host(host)
@@ -236,10 +213,15 @@
             exp.setFileName(fname);
         };
 
-        // if (sse != null && sink != null) {
-        //   exp.setSSE(sse, sink);
-        // };
+        // set progress mechanism, if required
+        if (sse != null && sink != null)
+            exp.setSse(sink, sse);
 
+        // TODO:
+        //   The following could be subsumed in the MatchAggregator
+        //   as a "run()" routine.
+
+        
         // Initialize exporter (with meta data and first matches)
         try {
             exp.init(resp);
@@ -258,12 +240,10 @@
         int fetchCount = exp.getTotalResults();
         if (exp.hasTimeExceeded() || fetchCount > maxResults) {
             fetchCount = maxResults;
-        };
-        // TODO:
-        // else {
-        //   setMaxResults()???
-        // }
+        }
 
+        // fetchCount may be different to maxResults now, so reset after init
+        exp.setMaxResults(fetchCount);
 
         // The first page was already enough - ignore paging
         if (fetchCount <= pageSize) {
@@ -273,10 +253,7 @@
         // If only one page should be exported there is no need
         // for a temporary export file
         if (cutoff) {
-
-            // TODO:
-            //   Add method serveAndBuild()
-            return exp.serve().build();
+            return exp;
         };
 
         // Page through all results
@@ -307,34 +284,96 @@
                     )
                 );
         };
+
+        return exp;
+    };
+
+    
+    /**
+     * WebService calls Kustvakt Search Webservices and returns
+     * response as json (all of the response) and
+     * as rtf (matches)
+     * 
+     * @param fname
+     *            file name
+     * @param format
+     *            the file format value rtf or json.
+     * @param q
+     *            the query
+     * @param ql
+     *            the query language
+     * @param cutoff
+     *            Export more than the first page
+     * 
+     * 
+     */
+    @POST
+    @Path("export")
+    @Produces(MediaType.APPLICATION_OCTET_STREAM)
+    public Response staticExport (
+        @FormParam("fname") String fname,
+        @FormParam("format") String format,
+        @FormParam("q") String q,
+        @FormParam("cq") String cq,
+        @FormParam("ql") String ql,
+        @FormParam("cutoff") String cutoffStr,
+        @FormParam("hitc") int hitc
+        // @FormParam("islimit") String il
+        ) throws IOException {
+
+        Exporter exp = export(fname, format, q, cq, ql, cutoffStr, hitc, null, null);
         
         return exp.serve().build();
     };
+    
 
-
+    /**
+     * Progress based counterpart to staticExport,
+     * that requires a GET due to the JavaScript API.
+     */
     @GET
 	@Path("export")
 	@Produces("text/event-stream")
-	public void progressExport(@Context SseEventSink sseEventSink,
-                               @Context Sse sse) throws InterruptedException {
-
-        // Exporter exp = 
-        //   progressExport(fname, format, q, cq, ql, cutoffStr, hitc, null, null);
+	public void progressExport(
+        @Context SseEventSink sink,
+        @Context Sse sse,
+        @QueryParam("fname") String fname,
+        @QueryParam("format") String format,
+        @QueryParam("q") String q,
+        @QueryParam("cq") String cq,
+        @QueryParam("ql") String ql,
+        @QueryParam("cutoff") String cutoffStr,
+        @QueryParam("hitc") int hitc
+        ) throws InterruptedException {
 
         // https://www.baeldung.com/java-ee-jax-rs-sse
         // https://www.howopensource.com/2016/01/java-sse-chat-example/
         // https://csetutorials.com/jersey-sse-tutorial.html
-        sseEventSink.send(sse.newEvent("Init", "Start"));
 
-        int x = 0;
-        while (x < 100) {
-            x++;
-            sseEventSink.send(sse.newEvent("Progress", ""+x));
+        // Send initial event
+        sink.send(sse.newEvent("Process", "Init"));
+
+        try {
+            Exporter exp = export(
+                fname,
+                format,
+                q,
+                cq,
+                ql,
+                cutoffStr,
+                hitc,
+                sink,
+                sse
+                );
+            sink.send(sse.newEvent("Relocate", "..."));
+        }
+        catch (WebApplicationException wae) {
+            sink.send(sse.newEvent("Error",wae.getMessage()));
         };
 
-        sseEventSink.send(sse.newEvent("Relocate", "location"));
-
-        sseEventSink.close();
+        sink.send(sse.newEvent("Process", "Done"));
+        
+        sink.close();
     };
 
     
diff --git a/plugin/src/test/java/de/ids_mannheim/korap/plkexport/ServiceTest.java b/plugin/src/test/java/de/ids_mannheim/korap/plkexport/ServiceTest.java
index be59896..49311e0 100644
--- a/plugin/src/test/java/de/ids_mannheim/korap/plkexport/ServiceTest.java
+++ b/plugin/src/test/java/de/ids_mannheim/korap/plkexport/ServiceTest.java
@@ -785,19 +785,11 @@
     };
 
     @Test
-    public void testExportWsProgress () throws InterruptedException {
-        // Based on https://stackoverflow.com/questions/35499655/
-        //   how-to-test-server-sent-events-with-spring
-        //   &
-        // https://github.com/jersey/jersey/blob/master/examples/
-        //   server-sent-events-jersey/src/test/java/org/glassfish/
-        //   jersey/examples/sse/jersey/ServerSentEventsTest.java
-
+    public void testExportWsProgressError () throws InterruptedException {
         final LinkedList<String> events = new LinkedList<>();
-        final int eventCount = 102;
+        final int eventCount = 3;
 
-
-        // Expect 102 messages:
+        // Expect messages:
         final CountDownLatch latch = new CountDownLatch(eventCount);
         
         // Create SSE client
@@ -820,16 +812,92 @@
         eventSource.open();
 
         latch.await(1000, TimeUnit.SECONDS);
+        Thread.sleep(2000);
 
-        Thread.sleep(1000);
-
-        assertEquals(events.getFirst(), "Init:Start");
-        assertEquals(events.getLast(), "Relocate:location");
-        assertEquals(events.size(), eventCount);
-
+        // Check error
+        assertEquals(events.getFirst(), "Process:Init");
+        assertEquals(events.get(1), "Error:HTTP 400 Bad Request");
+        assertEquals(events.getLast(), "Process:Done");
+        assertEquals(events.size(), 3);
         eventSource.close();
     };
 
+
+    @Test
+    public void testExportWsProgress () throws InterruptedException {
+        mockClient.reset().when(
+            request()
+            .withMethod("GET")
+            .withPath("/api/v1.0/search")
+            .withQueryStringParameter("q", "Plagegeist")
+            .withQueryStringParameter("count", "5")
+            .withQueryStringParameter("offset", "5")
+            )
+            .respond(
+                response()
+                .withHeader("Content-Type: application/json; charset=utf-8")
+                .withBody(getFixture("response_plagegeist_2.json"))
+                .withStatusCode(200)
+                );
+
+        mockClient.when(
+            request()
+            .withMethod("GET")
+            .withPath("/api/v1.0/search")
+            .withQueryStringParameter("q", "Plagegeist")
+            )
+            .respond(
+                response()
+                .withHeader("Content-Type: application/json; charset=utf-8")
+                .withBody(getFixture("response_plagegeist_1.json"))
+                .withStatusCode(200)
+                );
+
+        // Based on https://stackoverflow.com/questions/35499655/
+        //   how-to-test-server-sent-events-with-spring
+        //   &
+        // https://github.com/jersey/jersey/blob/master/examples/
+        //   server-sent-events-jersey/src/test/java/org/glassfish/
+        //   jersey/examples/sse/jersey/ServerSentEventsTest.java
+
+        final LinkedList<String> events = new LinkedList<>();
+
+        // Create SSE client
+        Client client = ClientBuilder
+            .newBuilder()
+            .register(SseFeature.class)
+            .build();
+
+        EventSource eventSource = EventSource
+            .target(target("/export")
+                    .queryParam("q", "Plagegeist")
+                    .queryParam("ql","poliqarp")
+                    .queryParam("format","rtf"))
+            .reconnectingEvery(300, TimeUnit.SECONDS)
+            .build();
+
+        EventListener listener = inboundEvent -> {
+            events.add(inboundEvent.getName() + ":" + inboundEvent.readData(String.class));
+        };
+
+        eventSource.register(listener);
+        eventSource.open();
+
+        Thread.sleep(3000);
+
+        // Check error
+        assertEquals(events.getFirst(), "Process:Init");
+        assertEquals(events.get(1), "Progress:0");
+        assertEquals(events.get(2), "Progress:56");
+        assertEquals(events.get(3), "Relocate:...");
+        assertEquals(events.getLast(), "Process:Done");
+        assertEquals(events.size(), 5);
+        eventSource.close();
+    };
+
+    
+
+    
     @Test
     public void testClientIP () {
         assertEquals(getClientIP("10.0.4.6"), "10.0.4.6");