Auto detect xz compression for input and output

Note that the compression is much slower than with gz, because
the xz compressor cannot yet run multi-threaded. However, xz compression
is typically much better.

Change-Id: Ia8bc7f6238f3c6367fd78ab45d37d2485090ef6f
diff --git a/pom.xml b/pom.xml
index 07b65a5..5603cc8 100644
--- a/pom.xml
+++ b/pom.xml
@@ -224,5 +224,10 @@
             <version>4.12</version>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.tukaani</groupId>
+            <artifactId>xz</artifactId>
+            <version>1.9</version>
+        </dependency>
     </dependencies>
 </project>
\ No newline at end of file
diff --git a/src/main/java/org/ids_mannheim/TotalNGrams.java b/src/main/java/org/ids_mannheim/TotalNGrams.java
index 9d868c0..952998a 100644
--- a/src/main/java/org/ids_mannheim/TotalNGrams.java
+++ b/src/main/java/org/ids_mannheim/TotalNGrams.java
@@ -1,6 +1,8 @@
 package org.ids_mannheim;
 
 import org.anarres.parallelgzip.ParallelGZIPOutputStream;
+import org.apache.commons.compress.compressors.xz.XZCompressorOutputStream;
+import org.tukaani.xz.XZOutputStream;
 import picocli.CommandLine;
 
 import java.io.BufferedOutputStream;
@@ -125,6 +127,8 @@
             File f = Utils.createFile(output_fillename, force_overwrite);
             if (output_fillename.endsWith(".gz")) {
                 output_stream = new PrintStream(new ParallelGZIPOutputStream((new BufferedOutputStream(new FileOutputStream(f)))));
+            } else if (output_fillename.endsWith(".xz")) {
+                output_stream = new PrintStream(new XZCompressorOutputStream((new BufferedOutputStream(new FileOutputStream(f)))));
             } else {
                 output_stream = new PrintStream(new BufferedOutputStream(new FileOutputStream(f)));
             }
diff --git a/src/main/java/org/ids_mannheim/Worker.java b/src/main/java/org/ids_mannheim/Worker.java
index b468a65..ca7f27c 100644
--- a/src/main/java/org/ids_mannheim/Worker.java
+++ b/src/main/java/org/ids_mannheim/Worker.java
@@ -1,6 +1,7 @@
 package org.ids_mannheim;
 
 import org.anarres.parallelgzip.ParallelGZIPInputStream;
+import org.tukaani.xz.XZInputStream;
 
 import java.io.*;
 import java.util.ArrayList;
@@ -64,6 +65,8 @@
                 logger.info(String.format("Processing %d/%d %s %s", index, fnames.size(), pool.getHost(poolIndex), current_file.getName()));
                 if (fname.matches(".*\\.conllu\\.gz$")) {
                     in = new BufferedReader(new InputStreamReader(new ParallelGZIPInputStream(new FileInputStream(fname))));
+                } else if (fname.matches(".*\\.conllu\\.xz$")) {
+                    in = new BufferedReader(new InputStreamReader(new XZInputStream(new FileInputStream(fname))));
                 } else if (fname.matches(".*\\.conllu?$")) {
                     in = new BufferedReader(new InputStreamReader(new FileInputStream(fname)));
                 } else if (fname.matches(".*\\.zip$")) {
@@ -75,11 +78,20 @@
                     };
                     Process p = Runtime.getRuntime().exec(cmd);
                     in = new BufferedReader(new InputStreamReader(p.getInputStream()));
-                } else if (fname.matches(".*\\.(freq|tsv)(\\.gz)?$")) {
-                    in = new BufferedReader(new InputStreamReader(
-                            (fname.matches("\\.gz$") ?
-                                    new ParallelGZIPInputStream(new FileInputStream(fname)) :
-                                    new FileInputStream(fname))));
+                } else if (fname.matches(".*\\.(freq|[ct]sv)(\\.[xg]z)?$")) {
+                    FileInputStream fileInputStream = new FileInputStream(fname);
+                    InputStreamReader inputStreamReader;
+                    if (fname.matches(".*\\.gz$")) {
+                        logger.info("gz input stream detected for "+fname);
+                        inputStreamReader = new InputStreamReader(new ParallelGZIPInputStream(fileInputStream));
+                    } else if (fname.matches(".*\\.xz$")) {
+                        logger.info("xz input stream detected for "+fname);
+                        inputStreamReader = new InputStreamReader(new XZInputStream(fileInputStream));
+                    } else {
+                        logger.info("uncompressed input stream detected for "+fname);
+                        inputStreamReader = new InputStreamReader(fileInputStream);
+                    }
+                    in = new BufferedReader(inputStreamReader);
                     String line;
                     while((line = in.readLine()) != null) {
                         int tabPos = line.lastIndexOf('\t');
diff --git a/src/test/java/org/ids_mannheim/WorkerTest.java b/src/test/java/org/ids_mannheim/WorkerTest.java
index 80474ae..5eab373 100644
--- a/src/test/java/org/ids_mannheim/WorkerTest.java
+++ b/src/test/java/org/ids_mannheim/WorkerTest.java
@@ -16,12 +16,13 @@
 import static org.junit.jupiter.api.Assertions.*;
 
 class WorkerTest {
-    private final ByteArrayOutputStream errContent = new ByteArrayOutputStream();
+    private ByteArrayOutputStream errContent;
     private final PrintStream originalErr = System.err;
     Worker worker;
     ConcurrentHashMap<String, AtomicInteger> map;
     @BeforeEach
     public void setUpStreams() {
+        errContent = new ByteArrayOutputStream();
         System.setErr(new PrintStream(errContent));
     }
 
@@ -31,7 +32,7 @@
     }
 
     @Test
-    void resultAndOutputAreCorrect() throws IOException {
+    void resultAndOutputAreCorrectWithGz() throws IOException {
         Map<String, Integer> gold = Map.of(
                 "und	und	KON	Fluchen	Fluchen	NN", 1,
                 "Bestreben	Bestreben	NN	,	,	$,", 1,
@@ -173,4 +174,49 @@
         gold.forEach((key, value) -> assertNotNull(map.get(key)));
         gold.forEach((key, value) -> assertEquals(value, map.get(key).intValue()));
     }
+
+    @Test
+    void resultAndOutputAreCorrectWithXz() throws IOException {
+        Map<String, Integer> gold = Map.of(
+                "und	und	KON	Fluchen	Fluchen	NN", 1,
+                "Bestreben	Bestreben	NN	,	,	$,", 1,
+                "Bürger	Bürger	NN	sich	sich	PRF", 1,
+                "dieses	dies	PDAT	würdigen	würdig	ADJA", 1,
+                "im	in	APPRART	Kriegshandwerk	Kriegshandwerk	NN", 1,
+                "man	man	PIS	nur	nur	ADV", 1,
+                "von	von	APPR	Longwy	--	NE", 3,
+                "nicht	nicht	PTKNEG	ungeschickt	ungeschickt	ADJD", 1,
+                "Republikaner	Republikaner	NN	und	und	KON", 1,
+                "Patriotismus	Patriotismus	NN	derer	die	PDS", 1
+        );
+
+        File tempFile = File.createTempFile("goe_sample", ".conllu.xz");
+        tempFile.deleteOnExit();
+        try (FileOutputStream out = new FileOutputStream(tempFile)) {
+            IOUtils.copy(Thread.currentThread().getContextClassLoader()
+                    .getResourceAsStream("goe_sample.conllu.xz"), out);
+        }
+        ArrayList<String> fnames = new ArrayList<>();
+        fnames.add(tempFile.getAbsolutePath());
+        map = new ConcurrentHashMap<>();
+        LinkedBlockingQueue<Integer> queue = new LinkedBlockingQueue<>(2);
+        worker = new Worker(
+                queue,
+                fnames,
+                2,
+                8,
+                10,
+                map,
+                true,
+                false,
+                new WorkerNodePool(""),
+                new Progressbar(tempFile.length()),
+                Logger.getLogger(TotalNGrams.class.getSimpleName()));
+
+        queue.add(0);
+        queue.add(-1);
+        worker.run();
+        gold.forEach((key, value) -> assertEquals(value, map.get(key).intValue()));
+    }
+
 }
diff --git a/src/test/resources/goe_sample.conllu.xz b/src/test/resources/goe_sample.conllu.xz
new file mode 100644
index 0000000..724af90
--- /dev/null
+++ b/src/test/resources/goe_sample.conllu.xz
Binary files differ