Handled invalid pipes and added tests.

Change-Id: Iba20cbbfb60fa10a769bee697295943b0e4a1772
diff --git a/core/Changes b/core/Changes
index 291e020..11265f7 100644
--- a/core/Changes
+++ b/core/Changes
@@ -6,6 +6,8 @@
      resolved #47) 
 12/12/2019
    - Added support for multiple cq parameters (margaretha, resolved #46)
+13/12/2019
+   - Handled pipe errors and added tests (margaretha)   
 
 # version 0.62.2
 13/11/2019
@@ -14,7 +16,7 @@
 14/11/2019
    - Added a check for OAuth2 client, VC, and group name length (margaretha)
 22/11/2019
-   - Updated the statistic API with KoralQuery (margaretha)  
+   - Updated the statistic API with KoralQuery (margaretha)
 
 # version 0.62.1
 08/07/2019
@@ -25,7 +27,7 @@
 11/07/2019
    - Added cq parameter to VC statistics API and deprecate corpusQuery (margaretha)
 15/07/2019
-   - Added backward compatibility support for corpusQuery parameter (margaretha)    
+   - Added backward compatibility support for corpusQuery parameter (margaretha)
 28/08/2019
    - Resolved #49. Added page param check in the search api (margaretha)
 23/09/2019   
@@ -69,7 +71,7 @@
    - Updated code structure (margaretha)
    - Removed spring security libraries and ManagerInterface (margaretha)
 21/01/2019
-   - Removed codes related to user registration & password management (margaretha)  
+   - Removed codes related to user registration & password management (margaretha)
 22/01/2019
    - Added create, edit, retrieve user default setting controllers (margaretha)
 24/01/2019
@@ -117,12 +119,12 @@
    - Moved license regex configuration to full (margaretha)
    - Added defaultRewriteConstraints (margaretha)
 29/10/2018
-    - Added javax.servlet-api (margaretha)
-    - Updated koral version (margaretha)
-    
+   - Added javax.servlet-api (margaretha)
+   - Updated koral version (margaretha)
+   
 version 0.61.1
 28/08/2018
-    - Added API URL versioning (margaretha)
+   - Added API URL versioning (margaretha)
 
 30/08/2018
    - Added backwards compability for URL versioning (margaretha)
@@ -130,65 +132,65 @@
 
 version 0.61.0
 13/08/2018
-	- Updated Krill and Koral versions enabling VC caching and referencing (margaretha)
-	- Set kustvakt configuration as Krill properties (margaretha)
-	
+   - Updated Krill and Koral versions enabling VC caching and referencing (margaretha)
+   - Set kustvakt configuration as Krill properties (margaretha)
+   
 version 0.60.5
 09/07/2018
-	- Fixed status codes (margaretha)
-	- Added KustvaktException for wrapping another exception (margaretha)
-	- Removed deprecated codes regarding KustvaktResource (margaretha)
-	
+   - Fixed status codes (margaretha)
+   - Added KustvaktException for wrapping another exception (margaretha)
+   - Removed deprecated codes regarding KustvaktResource (margaretha)
+   
 version 0.60.4
 25/06/2018
-	- added the redirect URI property in KustvaktException (margaretha)
-	- added openid related status codes (margaretha)
+   - added the redirect URI property in KustvaktException (margaretha)
+   - added openid related status codes (margaretha)
 
 version 0.60.3
 30/05/2018
-	- added parameter checker for collection (margaretha)
-	- updated krill version (margaretha)
-	
+   - added parameter checker for collection (margaretha)
+   - updated krill version (margaretha)
+   
 version 0.60.2
 25/04/2018
-	- rearranged and cleaned up codes (margaretha)
-	- generalized some KustvaktException methods (margaretha)
-	- added status codes (margaretha)
-	- updated FormRequestWrapper constructor (margaretha)
-	- fixed get request null parameter in FormRequestWrapper (margaretha)
-	
+   - rearranged and cleaned up codes (margaretha)
+   - generalized some KustvaktException methods (margaretha)
+   - added status codes (margaretha)
+   - updated FormRequestWrapper constructor (margaretha)
+   - fixed get request null parameter in FormRequestWrapper (margaretha)
+   
 version 0.60.1
 14/03/2018
-	- removed AdminHandlerIface (margaretha)
-	- removed isSystemAdmin in User class (margaretha)
+   - removed AdminHandlerIface (margaretha)
+   - removed isSystemAdmin in User class (margaretha)
 
 version 0.60
 13/03/2018
-	- removed old policy and deprecated code (margaretha)
-	- moved authentication related code to /full (margaretha)
-	- elaborated default layer names in the configuration file (margaretha)
-	- added GROUP_DELETED status code (margaretha)
+   - removed old policy and deprecated code (margaretha)
+   - moved authentication related code to /full (margaretha)
+   - elaborated default layer names in the configuration file (margaretha)
+   - added GROUP_DELETED status code (margaretha)
 
 version 0.59.10 
 20/02/2018
-	- updated hibernate and reflection versions (margaretha)
-	- added Changes file (margaretha)
-	- merged BeanConfigBaseTest to BeanConfigTest in /full (margaretha)
-	- added status code for already deleted entry (margaretha)
-	- updated library versions and java environment (margaretha)
-	- added status codes (margaretha)
-	- moved validation.properties (margaretha)
-	- fixed unrecognized media-type application/json (margaretha)
-	
-version 0.59.9 	
+   - updated hibernate and reflection versions (margaretha)
+   - added Changes file (margaretha)
+   - merged BeanConfigBaseTest to BeanConfigTest in /full (margaretha)
+   - added status code for already deleted entry (margaretha)
+   - updated library versions and java environment (margaretha)
+   - added status codes (margaretha)
+   - moved validation.properties (margaretha)
+   - fixed unrecognized media-type application/json (margaretha)
+   
+version 0.59.9
 08/11/2017
-	- fixed missing exception in JsonUtils (margaretha)
-	- fixed and restructured KustvaktResponseHandler (margaretha)
-	- updated status code in ParameterChecker (margaretha)
-	
+   - fixed missing exception in JsonUtils (margaretha)
+   - fixed and restructured KustvaktResponseHandler (margaretha)
+   - updated status code in ParameterChecker (margaretha)
+   
 version 0.59.8 
 24/10/2017
-	- restructured Kustvakt and created core project (margaretha)
-	- marked loader classes as deprecated (margaretha)
-	- updated Spring version (margaretha)
-	- moved unnecessary dependencies (margaretha)
\ No newline at end of file
+   - restructured Kustvakt and created core project (margaretha)
+   - marked loader classes as deprecated (margaretha)
+   - updated Spring version (margaretha)
+   - moved unnecessary dependencies (margaretha)
\ No newline at end of file
diff --git a/core/src/main/java/de/ids_mannheim/korap/config/KustvaktConfiguration.java b/core/src/main/java/de/ids_mannheim/korap/config/KustvaktConfiguration.java
index 273250a..26075d1 100644
--- a/core/src/main/java/de/ids_mannheim/korap/config/KustvaktConfiguration.java
+++ b/core/src/main/java/de/ids_mannheim/korap/config/KustvaktConfiguration.java
@@ -106,11 +106,13 @@
     // another variable might be needed to define which metadata fields are restricted 
     private boolean isMetadataRestricted = false;
     
+    // EM: May be needed if we support pipe registration
+    @Deprecated
     public static Map<String, String> pipes = new HashMap<>();
     
     public KustvaktConfiguration (Properties properties) throws Exception {
         load(properties);
-        readPipesFile("pipes");
+//        readPipesFile("pipes");
         KrillProperties.setProp(properties);
     }
 
@@ -201,6 +203,7 @@
         // "accountCreation");
     }
     
+    @Deprecated
     public void readPipesFile (String filename) throws IOException {
         File file = new File(filename);
         if (file.exists()) {
diff --git a/core/src/main/java/de/ids_mannheim/korap/exceptions/StatusCodes.java b/core/src/main/java/de/ids_mannheim/korap/exceptions/StatusCodes.java
index a2ebe5f..52de571 100644
--- a/core/src/main/java/de/ids_mannheim/korap/exceptions/StatusCodes.java
+++ b/core/src/main/java/de/ids_mannheim/korap/exceptions/StatusCodes.java
@@ -5,7 +5,7 @@
 import de.ids_mannheim.korap.config.ConfigLoader;
 
 /**
- * @author hanl
+ * @author hanl, margaretha
  * @date 07/09/2014
  */
 public class StatusCodes {
@@ -43,20 +43,18 @@
 
     /**
      * 300 status codes for query language and serialization
+     * see Koral (de.ids_mannheim.korap.query.serialize.util.StatusCodes)
      */
 
-    public static final int NO_QUERY = 301;
-//    public static final int INVALID_TYPE = 302;
-//    public static final int SERIALIZATION_FAILED = 300;
-    
     /**
-     *  400 status codes for authorization and rewrite functions
+     *  400 status codes for rewrite functions
      */
 
-    // fixme: use unsupported resource and include type in return message
-    public static final int POLICY_ERROR_DEFAULT = 400;
+    public static final int REWRITE_ERROR_DEFAULT = 400;
     public static final int NON_PUBLIC_FIELD_IGNORED = 401;
-    public static final int UNSUPPORTED_RESOURCE = 402;
+    public static final int PIPE_FAILED = 402;
+    
+//    public static final int UNSUPPORTED_RESOURCE = 402;
     //    public static final int REWRITE_FAILED = 403;
     //public static final int UNSUPPORTED_FOUNDRY = 403;
     //public static final int UNSUPPORTED_CORPUS = 404;
@@ -67,10 +65,10 @@
     //public static final int FOUNDRY_REWRITE = 408;
     //public static final int FOUNDRY_INJECTION = 409;
     //    public static final int MISSING_RESOURCE = 405;
-    public static final int NO_POLICY_TARGET = 406;
-    public static final int NO_POLICY_CONDITION = 407;
-    public static final int NO_POLICY_PERMISSION = 408;
-    public static final int NO_POLICIES = 409;
+//    public static final int NO_POLICY_TARGET = 406;
+//    public static final int NO_POLICY_CONDITION = 407;
+//    public static final int NO_POLICY_PERMISSION = 408;
+//    public static final int NO_POLICIES = 409;
 
 
 
diff --git a/core/src/main/java/de/ids_mannheim/korap/service/SearchService.java b/core/src/main/java/de/ids_mannheim/korap/service/SearchService.java
index 0d1db18..a40cbda 100644
--- a/core/src/main/java/de/ids_mannheim/korap/service/SearchService.java
+++ b/core/src/main/java/de/ids_mannheim/korap/service/SearchService.java
@@ -12,12 +12,15 @@
 import javax.ws.rs.core.MultivaluedMap;
 import javax.ws.rs.core.UriBuilder;
 
+import org.apache.http.HttpStatus;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 
 import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
 import com.sun.jersey.api.client.Client;
 import com.sun.jersey.api.client.ClientResponse;
 import com.sun.jersey.api.client.WebResource;
@@ -30,6 +33,7 @@
 import de.ids_mannheim.korap.exceptions.StatusCodes;
 import de.ids_mannheim.korap.query.serialize.MetaQueryBuilder;
 import de.ids_mannheim.korap.query.serialize.QuerySerializer;
+import de.ids_mannheim.korap.response.Notifications;
 import de.ids_mannheim.korap.rewrite.RewriteHandler;
 import de.ids_mannheim.korap.user.User;
 import de.ids_mannheim.korap.user.User.CorpusAccess;
@@ -187,22 +191,71 @@
 
     }
 
-    private String runPipes (String query, String[] pipeArray) {
+    /**
+     * Pipes are service URLs for modifying KoralQuery. A POST request
+     * with Content-Type application/json will be sent for each pipe.
+     * Kustvakt expects a KoralQuery in JSON format as the pipe response. 
+     * 
+     * @param query the original koral query
+     * @param pipeArray the pipe service URLs, called in the given order;
+     *            each pipe receives the query as left by the previous one
+     * @return a modified koral query
+     * @throws KustvaktException if recording a pipe failure as a warning in the query fails
+     */
+    private String runPipes (String query, String[] pipeArray) throws KustvaktException {
         if (pipeArray !=null){
             for (int i=0; i<pipeArray.length; i++){
-                String url = KustvaktConfiguration.pipes.get(pipeArray[i]);
-                // update query by sending it to a pipe URL
-                // NOTE: request formulation may vary depending on the service
-                Client client = Client.create();
-                WebResource resource = client.resource(url);
-                ClientResponse response =
-                        resource.type(MediaType.APPLICATION_JSON)
-                                .post(ClientResponse.class, query);
-                query = response.getEntity(String.class);
+                String pipeURL = pipeArray[i];
+                try {
+                    Client client = Client.create();
+                    WebResource resource = client.resource(pipeURL);
+                    ClientResponse response =
+                            resource.type(MediaType.APPLICATION_JSON)
+                                    .accept(MediaType.APPLICATION_JSON)
+                                    .post(ClientResponse.class, query);
+                    if (response.getStatus() == HttpStatus.SC_OK) {
+                        String entity = response.getEntity(String.class);
+                        if (entity != null && !entity.isEmpty()) {
+                            query = entity;
+                        }
+                    }
+                    else {
+                        query = handlePipeError(query, pipeURL,
+                                response.getStatus() + " "
+                                        + response.getStatusInfo().toString());
+                    }
+                }
+                catch (Exception e) {
+                    query = handlePipeError(query, pipeURL,
+                            e.getMessage());
+                }
             }
         }
         return query;
     }
+    
+    private String handlePipeError (String query, String url,
+            String message) throws KustvaktException {
+        jlog.error("Failed running the pipe at " + url + ". Message: "+ message);
+       
+        Notifications n = new Notifications();
+        n.addWarning(StatusCodes.PIPE_FAILED,
+                "Pipe failed", url, message);
+        JsonNode warning = n.toJsonNode();
+        
+        ObjectNode node = (ObjectNode) JsonUtils.readTree(query);
+        if (node.has("warnings")){
+            warning = warning.at("/warnings/0");
+            ArrayNode arrayNode = (ArrayNode) node.get("warnings");
+            arrayNode.add(warning);
+            node.set("warnings", arrayNode);
+        }
+        else{
+            node.setAll((ObjectNode) warning);
+        }
+        
+        return node.toString(); 
+    }
 
     private void handleNonPublicFields (List<String> fieldList,
             boolean accessRewriteDisabled, QuerySerializer serializer) {
diff --git a/core/src/main/java/de/ids_mannheim/korap/test/TestController.java b/core/src/main/java/de/ids_mannheim/korap/test/TestController.java
index 43144bb..03b8f96 100644
--- a/core/src/main/java/de/ids_mannheim/korap/test/TestController.java
+++ b/core/src/main/java/de/ids_mannheim/korap/test/TestController.java
@@ -1,16 +1,28 @@
 package de.ids_mannheim.korap.test;
 
+import java.io.IOException;
 import java.io.InputStream;
 
-import javax.ws.rs.Produces;
 import javax.ws.rs.Consumes;
 import javax.ws.rs.POST;
 import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
+import javax.ws.rs.QueryParam;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
 
+import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Controller;
 
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+
+import de.ids_mannheim.korap.exceptions.KustvaktException;
+import de.ids_mannheim.korap.utils.JsonUtils;
+import de.ids_mannheim.korap.web.KustvaktResponseHandler;
+
 /**
  * Controllers used only for testing
  * 
@@ -22,12 +34,60 @@
 @Produces(MediaType.APPLICATION_JSON + ";charset=utf-8")
 public class TestController {
 
+    @Autowired
+    private KustvaktResponseHandler kustvaktResponseHandler;
+
+    public static ObjectMapper mapper = new ObjectMapper();
+
     @POST
     @Path("glemm")
     @Consumes(MediaType.APPLICATION_JSON)
-    public Response dummyGlemm (String jsonld) {
-        InputStream is = getClass().getClassLoader()
+    public Response dummyGlemm (String jsonld,
+            @QueryParam("param") String param) throws IOException {
+        InputStream is;
+        is = getClass().getClassLoader()
                 .getResourceAsStream("test-pipes.jsonld");
-        return Response.ok(is).build();
+
+        ObjectNode newJson = (ObjectNode) mapper.readTree(is);
+
+        try {
+            JsonNode node = JsonUtils.readTree(jsonld);
+            if (node.has("warnings")) {
+                node = node.get("warnings");
+                newJson.set("warnings", node);
+            }
+            if (param != null && !param.isEmpty()) {
+                ArrayNode keys = (ArrayNode) newJson.at("/query/wrap/key");
+                keys.add("die");
+                newJson.set("key", keys);
+            }
+        }
+        catch (KustvaktException e) {
+            throw kustvaktResponseHandler.throwit(e);
+        }
+        return Response.ok(newJson.toString()).build();
+    }
+
+    @POST
+    @Path("invalid-json-pipe")
+    @Consumes(MediaType.APPLICATION_JSON)
+    public Response dummyPipe1 (String jsonld) {
+        String incorrectJson = "{blah:}";
+        return Response.ok(incorrectJson).build();
+    }
+
+    @POST
+    @Path("plain-response-pipe")
+    @Consumes(MediaType.APPLICATION_JSON)
+    public Response dummyPipe2 (String jsonld) {
+        String incorrectJson = "brumbrum";
+        return Response.ok(incorrectJson).build();
+    }
+
+    @POST
+    @Path("urlencoded-pipe")
+    @Consumes(MediaType.APPLICATION_FORM_URLENCODED)
+    public Response dummyPipe3 (String jsonld) {
+        return Response.ok().build();
     }
 }
diff --git a/core/src/main/resources/test-pipes.jsonld b/core/src/main/resources/test-pipes.jsonld
index eca8cec..7fde643 100644
--- a/core/src/main/resources/test-pipes.jsonld
+++ b/core/src/main/resources/test-pipes.jsonld
@@ -10,7 +10,6 @@
             "match": "match:eq",
             "key": [
                 "der",
-                "die",
                 "das"
             ],
             "layer": "orth",