Revert "totalngrams: use chronicle map for larger maps"
This reverts commit 8f086bf0e18359f6dcec6a125936a79b7113dc4b.
diff --git a/pom.xml b/pom.xml
index 6466062..f7ecbdd 100644
--- a/pom.xml
+++ b/pom.xml
@@ -6,7 +6,7 @@
<groupId>groupId</groupId>
<artifactId>nGrammFoldCount</artifactId>
- <version>1.2</version>
+ <version>1.1</version>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
@@ -152,17 +152,9 @@
</dependency>
<!-- https://mvnrepository.com/artifact/org.anarres/parallelgzip -->
<dependency>
- <scope>compile</scope>
<groupId>org.anarres</groupId>
<artifactId>parallelgzip</artifactId>
<version>1.0.5</version>
</dependency>
- <!-- https://mvnrepository.com/artifact/net.openhft/chronicle-map -->
- <dependency>
- <scope>compile</scope>
- <groupId>net.openhft</groupId>
- <artifactId>chronicle-map</artifactId>
- <version>3.17.8</version>
- </dependency>
</dependencies>
</project>
\ No newline at end of file
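
Note: besides the version bump back to 1.1, the revert drops the chronicle-map artifact and a redundant `<scope>compile</scope>` element (compile is Maven's default scope, so removing it changes nothing). For context on what that artifact provided, here is a minimal, hypothetical sketch of the persisted off-heap map the library offers. The value type is simplified to String, since persisting the real FoldedEntry would additionally require value-marshalling setup; the file path merely mirrors the removed "/var/tmp" --tmp-dir default.

    import java.io.File;
    import net.openhft.chronicle.map.ChronicleMap;

    public class ChronicleMapSketch {
        public static void main(String[] args) throws Exception {
            // Hypothetical path; mirrors the removed "/var/tmp" --tmp-dir default.
            File persisted = new File("/var/tmp/ngrams.cm");
            // ChronicleMap stores entries off-heap, backed by a file, so the
            // map can outgrow the JVM heap and survive restarts.
            try (ChronicleMap<String, String> map = ChronicleMap
                    .of(String.class, String.class)
                    .name("ngrams-map")
                    .averageKey("Amsterdam Amsterdam")   // sizing hint: typical key
                    .averageValue("0 0 0 0 0 0 0 0 0 0") // sizing hint: typical value
                    .entries(1_000_000)                  // expected entry count
                    .createPersistedTo(persisted)) {
                map.put("der die das", "1 0 0 0 0 0 0 0 0 1");
            }
        }
    }

The removed TotalNGram code below used the same builder calls, but with recoverPersistedTo(...), which reopens an existing file and repairs it if a previous run crashed.
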
diff --git a/src/main/java/org/ids_mannheim/FoldedEntry.java b/src/main/java/org/ids_mannheim/FoldedEntry.java
index 9253bc9..028561a 100644
--- a/src/main/java/org/ids_mannheim/FoldedEntry.java
+++ b/src/main/java/org/ids_mannheim/FoldedEntry.java
@@ -1,12 +1,10 @@
package org.ids_mannheim;
-
-import java.io.Serializable;
-import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicIntegerArray;
import java.util.stream.IntStream;
-public class FoldedEntry implements Comparable<FoldedEntry>, Serializable {
+public class FoldedEntry implements Comparable<FoldedEntry> {
static int FOLDS = 10;
final AtomicIntegerArray count;
@@ -27,7 +25,7 @@
}
}
- public static void incr(ConcurrentMap<String, FoldedEntry> map, String ngram, int fold) {
+ public static void incr(ConcurrentHashMap<String, FoldedEntry> map, String ngram, int fold) {
map.compute(ngram, (key, value) -> {
if (value == null) {
value = new FoldedEntry();
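
With ChronicleMap gone, incr() takes a plain ConcurrentHashMap again and FoldedEntry no longer needs Serializable (that was only required so ChronicleMap could marshal values). The compute() call is the important part: it runs atomically per key, so concurrent workers cannot lose increments. A self-contained sketch of the pattern follows; the increment body is elided in the hunk above, so the slot layout (0 = total, 1..FOLDS = per-fold counts) is inferred from the column sums in TotalNGram and should be read as an assumption.

    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.atomic.AtomicIntegerArray;

    class FoldedEntrySketch {
        static final int FOLDS = 10;
        // Assumed layout: slot 0 holds the total, slots 1..FOLDS the per-fold counts.
        final AtomicIntegerArray count = new AtomicIntegerArray(FOLDS + 1);

        static void incr(ConcurrentHashMap<String, FoldedEntrySketch> map,
                         String ngram, int fold) {
            // compute() is atomic per key: two workers hitting the same
            // n-gram are serialized, so no increment is lost.
            map.compute(ngram, (key, value) -> {
                if (value == null) {
                    value = new FoldedEntrySketch();
                }
                value.count.incrementAndGet(fold); // per-fold count
                value.count.incrementAndGet(0);    // running total
                return value;
            });
        }

        public static void main(String[] args) {
            ConcurrentHashMap<String, FoldedEntrySketch> map = new ConcurrentHashMap<>();
            incr(map, "der die das", 3);
            incr(map, "der die das", 3);
            System.out.println(map.get("der die das").count.get(3)); // prints 2
        }
    }
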
diff --git a/src/main/java/org/ids_mannheim/TotalNGram.java b/src/main/java/org/ids_mannheim/TotalNGram.java
index 5c405eb..fb0ea3e 100644
--- a/src/main/java/org/ids_mannheim/TotalNGram.java
+++ b/src/main/java/org/ids_mannheim/TotalNGram.java
@@ -18,9 +18,8 @@
import java.util.logging.SimpleFormatter;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
+import java.util.zip.GZIPOutputStream;
-import net.openhft.chronicle.map.*;
-import shaded.org.ops4j.io.FileUtils;
@CommandLine.Command(mixinStandardHelpOptions = true,
name = "totalngram", description = "add ngram counts from KorAP-XML or CoNLL-U files")
@@ -50,11 +49,6 @@
String output_fillename = "-";
@SuppressWarnings("CanBeFinal")
- @CommandLine.Option(names = {"-t",
- "--tmp-dir"}, description = "Temporary directory where ChronicleMap files will be stored (default: ${DEFAULT-VALUE})")
- String tmpdir = "/var/tmp";
-
- @SuppressWarnings("CanBeFinal")
@CommandLine.Option(names = {"--force"}, description = "Force overwrite (default: ${DEFAULT-VALUE})")
boolean force_overwrite = false;
@@ -121,26 +115,8 @@
}
FoldedEntry.setFolds(FOLDS);
-
+ ConcurrentHashMap<String, FoldedEntry> map = new ConcurrentHashMap<>();
long totalFilesSizes = inputFiles.parallelStream().mapToLong(fname -> new File(fname).length()).sum();
- logger.info("Sum of file sizes: "+totalFilesSizes);
- ConcurrentMap<String, FoldedEntry> map;
- if (FOLDS > 2) {
- long estimated_entries = totalFilesSizes / 100;
- File persisted_map = Utils.createFile(
- Utils.newExtension(tmpdir + "/" + new File(output_fillename).getName(),
- "cm"), true);
- logger.info("Creating ChronicleMap with "+estimated_entries+ " entries");
- map = ChronicleMap
- .of(String.class, FoldedEntry.class)
- .name("ngrams-map")
- .averageKey("Amsterdam".repeat(FOLDS))
- .averageValue(new FoldedEntry())
- .entries(estimated_entries)
- .recoverPersistedTo(persisted_map, false);
- } else {
- map = new ConcurrentHashMap();
- }
etaPrinter = new Progressbar(totalFilesSizes);
BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(inputFiles.size());
ExecutorService es = Executors.newCachedThreadPool();
@@ -164,20 +140,16 @@
etaPrinter.finish();
logger.info("Sorting and writing frequency table.");
System.err.println("Sorting and writing frequency table.");
- map.entrySet().stream()
+ map.entrySet().parallelStream()
.sorted(new FreqListEntryComparator<>())
- .forEach(entry -> output_stream.println(entry.getKey() + entry.getValue().toString()));
+ .forEachOrdered(entry -> output_stream.println(entry.getKey() + entry.getValue().toString()));
logger.info("Calculating column sums.");
- if (map instanceof ConcurrentHashMap) {
- logger.info("Calculating column sums.");
- System.err.println("Calculating column sums.");
- IntStream.rangeClosed(1, FOLDS)
- .forEach(i -> output_stream.print("\t" + Long.toUnsignedString(map.values()
- .stream().mapToLong(e -> Integer.toUnsignedLong(e.count.get(i))).sum())));
- output_stream.println("\t" + Long.toUnsignedString(map.values().stream().mapToLong(e -> Integer.toUnsignedLong(e.count.get(0))).sum()));
- } else {
- logger.info("Skip column sums calculation which is too slow with persisted hash maps.");
- }
+ System.err.println("Calculating column sums.");
+ IntStream.rangeClosed(1, FOLDS)
+ .parallel()
+ .forEachOrdered(i -> output_stream.print("\t" + Long.toUnsignedString(map.values()
+ .parallelStream().mapToLong(e -> Integer.toUnsignedLong(e.count.get(i))).sum())));
+ output_stream.println("\t" + Long.toUnsignedString(map.values().parallelStream().mapToLong(e -> Integer.toUnsignedLong(e.count.get(0))).sum()));
output_stream.close();
return null;
}
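
Besides swapping the map back to an in-memory ConcurrentHashMap, this hunk restores the parallel output path. The combination worth noting is parallelStream().sorted(...).forEachOrdered(...): sorted() establishes an encounter order and forEachOrdered() honors it, so the frequency table comes out correctly ordered even though the work runs on multiple threads, whereas plain forEach() on a parallel stream could interleave lines arbitrarily. A minimal sketch of the difference:

    import java.util.Comparator;
    import java.util.List;

    public class OrderedParallelOutput {
        public static void main(String[] args) {
            List<String> ngrams = List.of("die", "und", "der", "das");
            // forEachOrdered() respects the order sorted() established,
            // even on a parallel stream; forEach() would not.
            ngrams.parallelStream()
                    .sorted(Comparator.naturalOrder())
                    .forEachOrdered(System.out::println); // das, der, die, und
        }
    }

The restored column sums follow the same pattern: the outer IntStream is parallel but forEachOrdered() keeps the columns in order 1..FOLDS, while the inner parallelStream() does the summing. ConcurrentHashMap makes this repeated parallel iteration over values() cheap, which is exactly the calculation the reverted code had to skip as "too slow with persisted hash maps".
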
diff --git a/src/main/java/org/ids_mannheim/Worker.java b/src/main/java/org/ids_mannheim/Worker.java
index 6b123f2..e1d052b 100644
--- a/src/main/java/org/ids_mannheim/Worker.java
+++ b/src/main/java/org/ids_mannheim/Worker.java
@@ -1,7 +1,5 @@
package org.ids_mannheim;
-import net.openhft.chronicle.map.ChronicleMap;
-
import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
@@ -9,7 +7,6 @@
import java.util.ArrayList;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
import java.util.logging.Logger;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@@ -21,7 +18,7 @@
private static final int MAX_RETRIES = 10;
private final ArrayList<String> fnames;
private final BlockingQueue<Integer> queue;
- private final ConcurrentMap<String, FoldedEntry> map;
+ private final ConcurrentHashMap<String, FoldedEntry> map;
private final int folds;
private final Progressbar etaPrinter;
private final int ngram_size;
@@ -29,7 +26,7 @@
private final WorkerNodePool pool;
public Worker(BlockingQueue<Integer> queue, ArrayList<String> fnames, int ngram_size, int folds,
- ConcurrentMap<String, FoldedEntry> map,
+ ConcurrentHashMap<String, FoldedEntry> map,
WorkerNodePool pool,
Progressbar etaPrinter, Logger logger) {
this.queue = queue;
@@ -63,7 +60,7 @@
BufferedReader in = new BufferedReader(new InputStreamReader(p.getInputStream()));
String line;
int fold = -1;
- int texts = 0;
+ int texts=0;
while ((line = in.readLine()) != null) {
if (line.startsWith("#")) {
Matcher matcher = new_text_pattern.matcher(line);
@@ -78,11 +75,12 @@
if (strings.length < 4) {
continue;
}
+ //noinspection ConstantConditions
slidingWindowQueue.add(strings[1]);
}
pool.markFree(poolIndex);
if (texts > 0) {
- logger.info(pool.getHost(poolIndex) + " finished " + fname + " with " + texts + " texts");
+ logger.info(pool.getHost(poolIndex)+" finished " + fname + " with "+texts+ " texts");
etaPrinter.update(file_size);
retries = MAX_RETRIES;
index = queue.take();
@@ -91,7 +89,7 @@
logger.warning("Retrying " + fname);
sleep(1000);
} else {
- logger.severe("Giving up " + fname);
+ logger.severe ("Giving up " + fname);
index = queue.take();
}
}
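
Worker's field and constructor parameter are narrowed from the ConcurrentMap interface back to the concrete ConcurrentHashMap. Call sites that already pass a ConcurrentHashMap compile unchanged, because ConcurrentHashMap implements ConcurrentMap; what the narrowing rules out is precisely the ChronicleMap case, which is why this signature change travels together with the ChronicleMap removal. A small sketch of the compatibility direction (method names here are illustrative only):

    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentMap;

    public class NarrowingSketch {
        // Before the revert, Worker accepted any ConcurrentMap implementation:
        static void acceptsInterface(ConcurrentMap<String, Integer> map) { }

        // After the revert, it requires the concrete class:
        static void acceptsConcrete(ConcurrentHashMap<String, Integer> map) { }

        public static void main(String[] args) {
            ConcurrentHashMap<String, Integer> chm = new ConcurrentHashMap<>();
            acceptsInterface(chm); // ok: CHM implements ConcurrentMap
            acceptsConcrete(chm);  // ok: exact type
            // A ChronicleMap instance would satisfy acceptsInterface() but
            // not acceptsConcrete(), so it can no longer be passed to Worker.
        }
    }
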