Respect stdout backpressure in sparse mode
This avoids JavaScript heap out-of-memory errors such as:
```
$ pv ndy.conllu | docker run --rm -i korap/conllu-cmc:latest -s >
out.txt
1.41GiB 0:01:44 [5.33MiB/s]
[==========================================================================================================================================================================================>
] 90% ETA 0:00:11
<--- Last few GCs --->
[1:0x7fac64edc000] 104717 ms: Scavenge 3973.3 (4008.7) -> 3920.7
(4012.7) MB, pooled: 0.0 MB, 5.60 / 0.00 ms (average mu = 0.907, current
mu = 0.773) allocation failure;
[1:0x7fac64edc000] 106306 ms: Mark-Compact (reduce) 4220.5 (4287.8) ->
4182.9 (4187.3) MB, pooled: 0.0 MB, 492.75 / 0.00 ms (+ 217.2 ms in 291
steps since start of marking, biggest step 6.1 ms, walltime since start
of marking 1512 ms) (average mu = 0.84
FATAL ERROR: Reached heap limit Allocation failed - JavaScript heap out
of memory
----- Native stack trace -----
1.42GiB 0:01:51 [13.1MiB/s]
[============================================================================================================================================================================================>
] 91%
```
Change-Id: Idb1cfe0d9eed4a25c630134c0445c3b000f55645
diff --git a/src/index.js b/src/index.js
index ce45a04..3d34cd2 100755
--- a/src/index.js
+++ b/src/index.js
@@ -70,28 +70,36 @@
const EmojiRegex = require('emoji-regex');
const emojiRegex = EmojiRegex();
+const { once } = require('events');
const readline = require('readline');
global.header = '';
+global.fileheader = '';
global.standalone = false
const rl = readline.createInterface({
input: process.stdin,
- output: process.stdout,
terminal: false,
});
-function parseConllu(line) {
+async function writeOutput(text) {
+ if (!process.stdout.write(text)) {
+ await once(process.stdout, 'drain');
+ }
+}
+
+
+async function parseConllu(line) {
if (line.match('#\\s*foundry')) {
if (line.match('=\\s*base')) {
if (options.sparse) {
global.standalone = true
}
- process.stdout.write("# foundry = cmc\n");
+ await writeOutput("# foundry = cmc\n");
} else {
- process.stdout.write(`${line}\n`);
+ await writeOutput(`${line}\n`);
}
return
}
@@ -104,21 +112,21 @@
global.fileheader += `${line}\n`;
return;
} else if (line.match('^#\\s*eo[ft]')) {
- process.stdout.write(`${line}\n`);
+ await writeOutput(`${line}\n`);
return;
} else if (line.match('^#')) {
global.header += `${line}\n`;
return;
} else if (line.trim().match('^$')) {
if (global.header == "") {
- process.stdout.write("\n");
+ await writeOutput("\n");
}
global.header = '';
return
}
} else {
if (!line.match('^\\d+')) {
- process.stdout.write(`${line}\n`);
+ await writeOutput(`${line}\n`);
return;
}
}
@@ -128,7 +136,7 @@
const word = columns[1];
// Guard clause: if word is undefined, just output the line as-is
if (!word) {
- process.stdout.write(`${line}\n`);
+ await writeOutput(`${line}\n`);
return;
}
@@ -167,16 +175,24 @@
}
}
if (global.standalone) {
- process.stdout.write(fileheader);
- process.stdout.write(header);
+ await writeOutput(fileheader);
+ await writeOutput(header);
new_tag = null;
header = fileheader = '';
}
- process.stdout.write(columns.join('\t') + '\n');
+ await writeOutput(columns.join('\t') + '\n');
} else if (!global.standalone) {
- process.stdout.write(`${line}\n`);
+ await writeOutput(`${line}\n`);
}
}
-rl.on('line', parseConllu);
-rl.on('close', () => process.exit(0)); // important to exit, otherwise the process will hang
+async function main() {
+ for await (const line of rl) {
+ await parseConllu(line);
+ }
+}
+
+main().catch((error) => {
+ console.error(error);
+ process.exit(1);
+});