totalngram: use ChronicleMap for larger frequency maps
diff --git a/pom.xml b/pom.xml
index f7ecbdd..6466062 100644
--- a/pom.xml
+++ b/pom.xml
@@ -6,7 +6,7 @@
 
     <groupId>groupId</groupId>
     <artifactId>nGrammFoldCount</artifactId>
-    <version>1.1</version>
+    <version>1.2</version>
 
     <properties>
         <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
@@ -152,9 +152,17 @@
         </dependency>
         <!-- https://mvnrepository.com/artifact/org.anarres/parallelgzip -->
         <dependency>
+            <scope>compile</scope>
             <groupId>org.anarres</groupId>
             <artifactId>parallelgzip</artifactId>
             <version>1.0.5</version>
         </dependency>
+        <!-- https://mvnrepository.com/artifact/net.openhft/chronicle-map -->
+        <dependency>
+            <scope>compile</scope>
+            <groupId>net.openhft</groupId>
+            <artifactId>chronicle-map</artifactId>
+            <version>3.17.8</version>
+        </dependency>
     </dependencies>
 </project>
\ No newline at end of file
diff --git a/src/main/java/org/ids_mannheim/FoldedEntry.java b/src/main/java/org/ids_mannheim/FoldedEntry.java
index 028561a..9253bc9 100644
--- a/src/main/java/org/ids_mannheim/FoldedEntry.java
+++ b/src/main/java/org/ids_mannheim/FoldedEntry.java
@@ -1,10 +1,12 @@
 package org.ids_mannheim;
 
-import java.util.concurrent.ConcurrentHashMap;
+
+import java.io.Serializable;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicIntegerArray;
 import java.util.stream.IntStream;
 
-public class FoldedEntry implements Comparable<FoldedEntry> {
+public class FoldedEntry implements Comparable<FoldedEntry>, Serializable {
     static int FOLDS = 10;
     final AtomicIntegerArray count;
 
@@ -25,7 +27,7 @@
         }
     }
 
-    public static void incr(ConcurrentHashMap<String, FoldedEntry> map, String ngram, int fold) {
+    public static void incr(ConcurrentMap<String, FoldedEntry> map, String ngram, int fold) {
         map.compute(ngram, (key, value) -> {
             if (value == null) {
                 value = new FoldedEntry();
diff --git a/src/main/java/org/ids_mannheim/TotalNGram.java b/src/main/java/org/ids_mannheim/TotalNGram.java
index fb0ea3e..5c405eb 100644
--- a/src/main/java/org/ids_mannheim/TotalNGram.java
+++ b/src/main/java/org/ids_mannheim/TotalNGram.java
@@ -18,8 +18,9 @@
 import java.util.logging.SimpleFormatter;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
-import java.util.zip.GZIPOutputStream;
 
+import net.openhft.chronicle.map.*;
+import shaded.org.ops4j.io.FileUtils;
 
 @CommandLine.Command(mixinStandardHelpOptions = true,
         name = "totalngram", description = "add ngram counts from KorAP-XML or CoNLL-U files")
@@ -49,6 +50,11 @@
     String output_fillename = "-";
 
     @SuppressWarnings("CanBeFinal")
+    @CommandLine.Option(names = {"-t",
+            "--tmp-dir"}, description = "Temporary directory where ChronicleMap files will be stored (default: ${DEFAULT-VALUE})")
+    String tmpdir = "/var/tmp";
+
+    @SuppressWarnings("CanBeFinal")
     @CommandLine.Option(names = {"--force"}, description = "Force overwrite (default: ${DEFAULT-VALUE})")
     boolean force_overwrite = false;
 
@@ -115,8 +121,26 @@
         }
 
         FoldedEntry.setFolds(FOLDS);
-        ConcurrentHashMap<String, FoldedEntry> map = new ConcurrentHashMap<>();
+
         long totalFilesSizes = inputFiles.parallelStream().mapToLong(fname -> new File(fname).length()).sum();
+        logger.info("Sum of file sizes: "+totalFilesSizes);
+        ConcurrentMap<String, FoldedEntry> map;
+        if (FOLDS > 2) {
+            long estimated_entries = totalFilesSizes / 100;
+            File persisted_map = Utils.createFile(
+                    Utils.newExtension(tmpdir + "/" + new File(output_fillename).getName(),
+                            "cm"), true);
+            logger.info("Creating ChronicleMap with "+estimated_entries+ " entries");
+            map = ChronicleMap
+                    .of(String.class, FoldedEntry.class)
+                    .name("ngrams-map")
+                    .averageKey("Amsterdam".repeat(FOLDS))
+                    .averageValue(new FoldedEntry())
+                    .entries(estimated_entries)
+                    .recoverPersistedTo(persisted_map, false);
+        } else {
+            map = new ConcurrentHashMap();
+        }
         etaPrinter = new Progressbar(totalFilesSizes);
         BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(inputFiles.size());
         ExecutorService es = Executors.newCachedThreadPool();
@@ -140,16 +164,20 @@
         etaPrinter.finish();
         logger.info("Sorting and writing frequency table.");
         System.err.println("Sorting and writing frequency table.");
-        map.entrySet().parallelStream()
+        map.entrySet().stream()
                 .sorted(new FreqListEntryComparator<>())
-                .forEachOrdered(entry -> output_stream.println(entry.getKey() + entry.getValue().toString()));
+                .forEach(entry -> output_stream.println(entry.getKey() + entry.getValue().toString()));
         logger.info("Calculating column sums.");
-        System.err.println("Calculating column sums.");
-        IntStream.rangeClosed(1, FOLDS)
-                .parallel()
-                .forEachOrdered(i -> output_stream.print("\t" + Long.toUnsignedString(map.values()
-                        .parallelStream().mapToLong(e -> Integer.toUnsignedLong(e.count.get(i))).sum())));
-        output_stream.println("\t" + Long.toUnsignedString(map.values().parallelStream().mapToLong(e -> Integer.toUnsignedLong(e.count.get(0))).sum()));
+        if (map instanceof ConcurrentHashMap) {
+            logger.info("Calculating column sums.");
+            System.err.println("Calculating column sums.");
+            IntStream.rangeClosed(1, FOLDS)
+                    .forEach(i -> output_stream.print("\t" + Long.toUnsignedString(map.values()
+                            .stream().mapToLong(e -> Integer.toUnsignedLong(e.count.get(i))).sum())));
+            output_stream.println("\t" + Long.toUnsignedString(map.values().stream().mapToLong(e -> Integer.toUnsignedLong(e.count.get(0))).sum()));
+        } else {
+            logger.info("Skip column sums calculation which is too slow with persisted hash maps.");
+        }
         output_stream.close();
         return null;
     }
diff --git a/src/main/java/org/ids_mannheim/Worker.java b/src/main/java/org/ids_mannheim/Worker.java
index e1d052b..6b123f2 100644
--- a/src/main/java/org/ids_mannheim/Worker.java
+++ b/src/main/java/org/ids_mannheim/Worker.java
@@ -1,5 +1,7 @@
 package org.ids_mannheim;
 
+import net.openhft.chronicle.map.ChronicleMap;
+
 import java.io.BufferedReader;
 import java.io.File;
 import java.io.IOException;
@@ -7,6 +9,7 @@
 import java.util.ArrayList;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 import java.util.logging.Logger;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
@@ -18,7 +21,7 @@
     private static final int MAX_RETRIES = 10;
     private final ArrayList<String> fnames;
     private final BlockingQueue<Integer> queue;
-    private final ConcurrentHashMap<String, FoldedEntry> map;
+    private final ConcurrentMap<String, FoldedEntry> map;
     private final int folds;
     private final Progressbar etaPrinter;
     private final int ngram_size;
@@ -26,7 +29,7 @@
     private final WorkerNodePool pool;
 
     public Worker(BlockingQueue<Integer> queue, ArrayList<String> fnames, int ngram_size, int folds,
-                  ConcurrentHashMap<String, FoldedEntry> map,
+                  ConcurrentMap<String, FoldedEntry> map,
                   WorkerNodePool pool,
                   Progressbar etaPrinter, Logger logger) {
         this.queue = queue;
@@ -60,7 +63,7 @@
                 BufferedReader in = new BufferedReader(new InputStreamReader(p.getInputStream()));
                 String line;
                 int fold = -1;
-                int texts=0;
+                int texts = 0;
                 while ((line = in.readLine()) != null) {
                     if (line.startsWith("#")) {
                         Matcher matcher = new_text_pattern.matcher(line);
@@ -75,12 +78,11 @@
                     if (strings.length < 4) {
                         continue;
                     }
-                    //noinspection ConstantConditions
                     slidingWindowQueue.add(strings[1]);
                 }
                 pool.markFree(poolIndex);
                 if (texts > 0) {
-                    logger.info(pool.getHost(poolIndex)+" finished " + fname + " with "+texts+ " texts");
+                    logger.info(pool.getHost(poolIndex) + " finished " + fname + " with " + texts + " texts");
                     etaPrinter.update(file_size);
                     retries = MAX_RETRIES;
                     index = queue.take();
@@ -89,7 +91,7 @@
                         logger.warning("Retrying " + fname);
                         sleep(1000);
                     } else {
-                        logger.severe ("Giving up " + fname);
+                        logger.severe("Giving up " + fname);
                         index = queue.take();
                     }
                 }