Respect stdout backpressure in sparse mode
Avoiding heap errors, like:
```
$ pv ndy.conllu | docker run --rm -i korap/conllu-cmc:latest -s >
out.txt
1.41GiB 0:01:44 [5.33MiB/s]
[==========================================================================================================================================================================================>
] 90% ETA 0:00:11
<--- Last few GCs --->
[1:0x7fac64edc000] 104717 ms: Scavenge 3973.3 (4008.7) -> 3920.7
(4012.7) MB, pooled: 0.0 MB, 5.60 / 0.00 ms (average mu = 0.907, current
mu = 0.773) allocation failure;
[1:0x7fac64edc000] 106306 ms: Mark-Compact (reduce) 4220.5 (4287.8) ->
4182.9 (4187.3) MB, pooled: 0.0 MB, 492.75 / 0.00 ms (+ 217.2 ms in 291
steps since start of marking, biggest step 6.1 ms, walltime since start
of marking 1512 ms) (average mu = 0.84
FATAL ERROR: Reached heap limit Allocation failed - JavaScript heap out
of memory
----- Native stack trace -----
1.42GiB 0:01:51 [13.1MiB/s]
[============================================================================================================================================================================================>
] 91%
```
Change-Id: Idb1cfe0d9eed4a25c630134c0445c3b000f55645
diff --git a/CHANGELOG.md b/CHANGELOG.md
index e25a8dd..11202ae 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -16,6 +16,10 @@
- Purely numeric tokens such as `#10` are no longer tagged as `HST`; hashtags must contain at least one letter.
- Documentation examples were revised and anonymized for public release.
+### Fixed
+
+- Sparse mode now respects stdout backpressure, avoiding Node.js heap exhaustion on very large corpora with many matches.
+
## 1.0.0
- Initial release.
diff --git a/Readme.md b/Readme.md
index 1fc5c46..a1227ee 100644
--- a/Readme.md
+++ b/Readme.md
@@ -104,7 +104,7 @@
The tagger is implemented in Node.js because the runtime provides efficient regular-expression execution, which is central to this regex-based annotation workflow.
-On CMC corpora with many matches, throughput is above 13 MB/s. This includes dense CMC material such as the NottDeuYTSch corpus.
+On CMC corpora with many matches, throughput is above 10 MB/s. This includes dense CMC material such as the NottDeuYTSch corpus.
## Applications
diff --git a/src/index.js b/src/index.js
index ce45a04..3d34cd2 100755
--- a/src/index.js
+++ b/src/index.js
@@ -70,28 +70,36 @@
const EmojiRegex = require('emoji-regex');
const emojiRegex = EmojiRegex();
+const { once } = require('events');
const readline = require('readline');
global.header = '';
+global.fileheader = '';
global.standalone = false
const rl = readline.createInterface({
input: process.stdin,
- output: process.stdout,
terminal: false,
});
-function parseConllu(line) {
+async function writeOutput(text) {
+ if (!process.stdout.write(text)) {
+ await once(process.stdout, 'drain');
+ }
+}
+
+
+async function parseConllu(line) {
if (line.match('#\\s*foundry')) {
if (line.match('=\\s*base')) {
if (options.sparse) {
global.standalone = true
}
- process.stdout.write("# foundry = cmc\n");
+ await writeOutput("# foundry = cmc\n");
} else {
- process.stdout.write(`${line}\n`);
+ await writeOutput(`${line}\n`);
}
return
}
@@ -104,21 +112,21 @@
global.fileheader += `${line}\n`;
return;
} else if (line.match('^#\\s*eo[ft]')) {
- process.stdout.write(`${line}\n`);
+ await writeOutput(`${line}\n`);
return;
} else if (line.match('^#')) {
global.header += `${line}\n`;
return;
} else if (line.trim().match('^$')) {
if (global.header == "") {
- process.stdout.write("\n");
+ await writeOutput("\n");
}
global.header = '';
return
}
} else {
if (!line.match('^\\d+')) {
- process.stdout.write(`${line}\n`);
+ await writeOutput(`${line}\n`);
return;
}
}
@@ -128,7 +136,7 @@
const word = columns[1];
// Guard clause: if word is undefined, just output the line as-is
if (!word) {
- process.stdout.write(`${line}\n`);
+ await writeOutput(`${line}\n`);
return;
}
@@ -167,16 +175,24 @@
}
}
if (global.standalone) {
- process.stdout.write(fileheader);
- process.stdout.write(header);
+ await writeOutput(fileheader);
+ await writeOutput(header);
new_tag = null;
header = fileheader = '';
}
- process.stdout.write(columns.join('\t') + '\n');
+ await writeOutput(columns.join('\t') + '\n');
} else if (!global.standalone) {
- process.stdout.write(`${line}\n`);
+ await writeOutput(`${line}\n`);
}
}
-rl.on('line', parseConllu);
-rl.on('close', () => process.exit(0)); // important to exit, otherwise the process will hang
+async function main() {
+ for await (const line of rl) {
+ await parseConllu(line);
+ }
+}
+
+main().catch((error) => {
+ console.error(error);
+ process.exit(1);
+});