'use strict';

var Writable = require('flush-write-stream');
4
/**
 * Counts the listeners currently registered on `stream` for one event.
 *
 * @param {Object} stream - Object exposing a `listeners(evt)` method that
 *   returns an array-like of handlers.
 * @param {string} evt - Name of the event to inspect.
 * @returns {number} Number of handlers reported for `evt`.
 */
function listenerCount(stream, evt) {
  return stream.listeners(evt).length;
}
8
/**
 * Reports whether `stream` has at least one `'readable'` or `'data'`
 * listener attached.
 *
 * @param {Object} stream - Stream whose listeners are inspected.
 * @returns {boolean} `true` when either event has one or more listeners.
 */
function hasListeners(stream) {
  return !!(listenerCount(stream, 'readable') || listenerCount(stream, 'data'));
}
12
/**
 * Write handler that discards its input: it ignores the chunk and encoding
 * and immediately signals completion.
 *
 * @param {*} file - Incoming chunk (unused).
 * @param {string} enc - Chunk encoding (unused).
 * @param {Function} callback - Invoked with no arguments.
 */
function sinker(file, enc, callback) {
  callback();
}
16
/**
 * Attaches a discarding writable ("sink") to `stream` whenever the stream has
 * no `'readable'` or `'data'` listeners, and detaches it again when such
 * listeners are present.
 *
 * @param {Object} stream - Readable stream; its `_readableState.objectMode`
 *   is read to configure the sink.
 * @returns {Object} The same `stream` that was passed in.
 */
function sink(stream) {
  // Tracks whether the sink is currently considered piped.
  var sinkAdded = false;

  // Mirror the source's object mode on the sink.
  var sinkOptions = {
    objectMode: stream._readableState.objectMode,
  };

  var sinkStream = new Writable(sinkOptions, sinker);

  // Pipes the stream into the sink unless it is already piped or the stream
  // has 'readable'/'data' listeners. When used as a 'removeListener' handler
  // the event arguments are ignored.
  function addSink() {
    if (sinkAdded) {
      return;
    }

    if (hasListeners(stream)) {
      return;
    }

    sinkAdded = true;
    stream.pipe(sinkStream);
  }

  // Reacts only to 'readable'/'data' listener changes: when the stream has
  // such listeners, the sink is unpiped and the flag is cleared.
  function removeSink(evt) {
    if (evt !== 'readable' && evt !== 'data') {
      return;
    }

    if (hasListeners(stream)) {
      sinkAdded = false;
      stream.unpipe(sinkStream);
    }
  }

  // Registration order matters: on 'removeListener', removeSink runs before
  // addSink.
  stream.on('newListener', removeSink);
  stream.on('removeListener', removeSink);
  stream.on('removeListener', addSink);

  // Sink the stream to start flowing.
  // Do this on nextTick; it will flow at the slowest speed of the piped streams.
  process.nextTick(addSink);

  return stream;
}
60
module.exports = sink;