totalngram: use ChronicleMap for larger-than-memory maps
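
Counting n-grams over large corpora can outgrow the Java heap. When more
than two folds are requested, store the counts in a ChronicleMap persisted
to a temporary directory (new -t/--tmp-dir option, default /var/tmp)
instead of a ConcurrentHashMap. FoldedEntry becomes Serializable so
ChronicleMap can store it, and Worker now accepts any ConcurrentMap.
Column sums are skipped for persisted maps, where the required full scan
is too slow.

Example invocation (other arguments elided):

    totalngram --tmp-dir /var/tmp ...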
diff --git a/pom.xml b/pom.xml
index f7ecbdd..6466062 100644
--- a/pom.xml
+++ b/pom.xml
@@ -6,7 +6,7 @@
<groupId>groupId</groupId>
<artifactId>nGrammFoldCount</artifactId>
- <version>1.1</version>
+ <version>1.2</version>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
@@ -152,9 +152,17 @@
</dependency>
<!-- https://mvnrepository.com/artifact/org.anarres/parallelgzip -->
<dependency>
<groupId>org.anarres</groupId>
<artifactId>parallelgzip</artifactId>
<version>1.0.5</version>
+            <scope>compile</scope>
</dependency>
+ <!-- https://mvnrepository.com/artifact/net.openhft/chronicle-map -->
+        <dependency>
+            <groupId>net.openhft</groupId>
+            <artifactId>chronicle-map</artifactId>
+            <version>3.17.8</version>
+            <scope>compile</scope>
+        </dependency>
</dependencies>
</project>
\ No newline at end of file
diff --git a/src/main/java/org/ids_mannheim/FoldedEntry.java b/src/main/java/org/ids_mannheim/FoldedEntry.java
index 028561a..9253bc9 100644
--- a/src/main/java/org/ids_mannheim/FoldedEntry.java
+++ b/src/main/java/org/ids_mannheim/FoldedEntry.java
@@ -1,10 +1,12 @@
package org.ids_mannheim;
-import java.util.concurrent.ConcurrentHashMap;
+
+import java.io.Serializable;
+import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicIntegerArray;
import java.util.stream.IntStream;
-public class FoldedEntry implements Comparable<FoldedEntry> {
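+// Serializable so ChronicleMap can (de)serialize FoldedEntry values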
+public class FoldedEntry implements Comparable<FoldedEntry>, Serializable {
static int FOLDS = 10;
final AtomicIntegerArray count;
@@ -25,7 +27,7 @@
}
}
- public static void incr(ConcurrentHashMap<String, FoldedEntry> map, String ngram, int fold) {
+ public static void incr(ConcurrentMap<String, FoldedEntry> map, String ngram, int fold) {
map.compute(ngram, (key, value) -> {
if (value == null) {
value = new FoldedEntry();
diff --git a/src/main/java/org/ids_mannheim/TotalNGram.java b/src/main/java/org/ids_mannheim/TotalNGram.java
index fb0ea3e..5c405eb 100644
--- a/src/main/java/org/ids_mannheim/TotalNGram.java
+++ b/src/main/java/org/ids_mannheim/TotalNGram.java
@@ -18,8 +18,9 @@
import java.util.logging.SimpleFormatter;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
-import java.util.zip.GZIPOutputStream;
+import net.openhft.chronicle.map.ChronicleMap;
@CommandLine.Command(mixinStandardHelpOptions = true,
name = "totalngram", description = "add ngram counts from KorAP-XML or CoNLL-U files")
@@ -49,6 +50,11 @@
String output_fillename = "-";
@SuppressWarnings("CanBeFinal")
+ @CommandLine.Option(names = {"-t",
+ "--tmp-dir"}, description = "Temporary directory where ChronicleMap files will be stored (default: ${DEFAULT-VALUE})")
+ String tmpdir = "/var/tmp";
+
+ @SuppressWarnings("CanBeFinal")
@CommandLine.Option(names = {"--force"}, description = "Force overwrite (default: ${DEFAULT-VALUE})")
boolean force_overwrite = false;
@@ -115,8 +121,26 @@
}
FoldedEntry.setFolds(FOLDS);
- ConcurrentHashMap<String, FoldedEntry> map = new ConcurrentHashMap<>();
+
long totalFilesSizes = inputFiles.parallelStream().mapToLong(fname -> new File(fname).length()).sum();
+ logger.info("Sum of file sizes: "+totalFilesSizes);
+ ConcurrentMap<String, FoldedEntry> map;
+ if (FOLDS > 2) {
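+            // Heuristic (assumption): roughly one distinct n-gram per 100 bytes of input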
+ long estimated_entries = totalFilesSizes / 100;
+ File persisted_map = Utils.createFile(
+ Utils.newExtension(tmpdir + "/" + new File(output_fillename).getName(),
+ "cm"), true);
+ logger.info("Creating ChronicleMap with "+estimated_entries+ " entries");
+ map = ChronicleMap
+ .of(String.class, FoldedEntry.class)
+ .name("ngrams-map")
+ .averageKey("Amsterdam".repeat(FOLDS))
+ .averageValue(new FoldedEntry())
+ .entries(estimated_entries)
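+                    // false: do not assume an existing file was written with this exact builder config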
+ .recoverPersistedTo(persisted_map, false);
+ } else {
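+            // two or fewer folds: a plain in-memory map suffices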
+            map = new ConcurrentHashMap<>();
+ }
etaPrinter = new Progressbar(totalFilesSizes);
BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(inputFiles.size());
ExecutorService es = Executors.newCachedThreadPool();
@@ -140,16 +164,20 @@
etaPrinter.finish();
logger.info("Sorting and writing frequency table.");
System.err.println("Sorting and writing frequency table.");
- map.entrySet().parallelStream()
+ map.entrySet().stream()
.sorted(new FreqListEntryComparator<>())
- .forEachOrdered(entry -> output_stream.println(entry.getKey() + entry.getValue().toString()));
+ .forEach(entry -> output_stream.println(entry.getKey() + entry.getValue().toString()));
logger.info("Calculating column sums.");
- System.err.println("Calculating column sums.");
- IntStream.rangeClosed(1, FOLDS)
- .parallel()
- .forEachOrdered(i -> output_stream.print("\t" + Long.toUnsignedString(map.values()
- .parallelStream().mapToLong(e -> Integer.toUnsignedLong(e.count.get(i))).sum())));
- output_stream.println("\t" + Long.toUnsignedString(map.values().parallelStream().mapToLong(e -> Integer.toUnsignedLong(e.count.get(0))).sum()));
+ if (map instanceof ConcurrentHashMap) {
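+            // column sums require a full scan over all values, affordable only in memory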
+ logger.info("Calculating column sums.");
+ System.err.println("Calculating column sums.");
+ IntStream.rangeClosed(1, FOLDS)
+ .forEach(i -> output_stream.print("\t" + Long.toUnsignedString(map.values()
+ .stream().mapToLong(e -> Integer.toUnsignedLong(e.count.get(i))).sum())));
+ output_stream.println("\t" + Long.toUnsignedString(map.values().stream().mapToLong(e -> Integer.toUnsignedLong(e.count.get(0))).sum()));
+ } else {
+ logger.info("Skip column sums calculation which is too slow with persisted hash maps.");
+ }
output_stream.close();
return null;
}
diff --git a/src/main/java/org/ids_mannheim/Worker.java b/src/main/java/org/ids_mannheim/Worker.java
index e1d052b..6b123f2 100644
--- a/src/main/java/org/ids_mannheim/Worker.java
+++ b/src/main/java/org/ids_mannheim/Worker.java
@@ -1,5 +1,7 @@
package org.ids_mannheim;
+import net.openhft.chronicle.map.ChronicleMap;
+
import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
@@ -7,6 +9,7 @@
import java.util.ArrayList;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
import java.util.logging.Logger;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@@ -18,7 +21,7 @@
private static final int MAX_RETRIES = 10;
private final ArrayList<String> fnames;
private final BlockingQueue<Integer> queue;
- private final ConcurrentHashMap<String, FoldedEntry> map;
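+    // may be an in-memory ConcurrentHashMap or a persisted ChronicleMap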
+ private final ConcurrentMap<String, FoldedEntry> map;
private final int folds;
private final Progressbar etaPrinter;
private final int ngram_size;
@@ -26,7 +29,7 @@
private final WorkerNodePool pool;
public Worker(BlockingQueue<Integer> queue, ArrayList<String> fnames, int ngram_size, int folds,
- ConcurrentHashMap<String, FoldedEntry> map,
+ ConcurrentMap<String, FoldedEntry> map,
WorkerNodePool pool,
Progressbar etaPrinter, Logger logger) {
this.queue = queue;
@@ -60,7 +63,7 @@
BufferedReader in = new BufferedReader(new InputStreamReader(p.getInputStream()));
String line;
int fold = -1;
- int texts=0;
+ int texts = 0;
while ((line = in.readLine()) != null) {
if (line.startsWith("#")) {
Matcher matcher = new_text_pattern.matcher(line);
@@ -75,12 +78,11 @@
if (strings.length < 4) {
continue;
}
- //noinspection ConstantConditions
slidingWindowQueue.add(strings[1]);
}
pool.markFree(poolIndex);
if (texts > 0) {
- logger.info(pool.getHost(poolIndex)+" finished " + fname + " with "+texts+ " texts");
+ logger.info(pool.getHost(poolIndex) + " finished " + fname + " with " + texts + " texts");
etaPrinter.update(file_size);
retries = MAX_RETRIES;
index = queue.take();
@@ -89,7 +91,7 @@
logger.warning("Retrying " + fname);
sleep(1000);
} else {
- logger.severe ("Giving up " + fname);
+ logger.severe("Giving up " + fname);
index = queue.take();
}
}