blob: 61f388650b3614b0eba6072a8ebaf5257f025168 [file] [log] [blame]
Leo Repp58b9f112021-11-22 11:57:47 +01001var Readable = require('readable-stream/readable');
2var util = require('util');
3
4function isReadable(stream) {
5 if (typeof stream.pipe !== 'function') {
6 return false;
7 }
8
9 if (!stream.readable) {
10 return false;
11 }
12
13 if (typeof stream._read !== 'function') {
14 return false;
15 }
16
17 if (!stream._readableState) {
18 return false;
19 }
20
21 return true;
22}
23
24function addStream (streams, stream) {
25 if (!isReadable(stream)) {
26 throw new Error('All input streams must be readable');
27 }
28
29 var self = this;
30
31 stream._buffer = [];
32
33 stream.on('readable', function () {
34 var chunk = stream.read();
35 while (chunk) {
36 if (this === streams[0]) {
37 self.push(chunk);
38 } else {
39 this._buffer.push(chunk);
40 }
41 chunk = stream.read();
42 }
43 });
44
45 stream.on('end', function () {
46 for (var stream = streams[0];
47 stream && stream._readableState.ended;
48 stream = streams[0]) {
49 while (stream._buffer.length) {
50 self.push(stream._buffer.shift());
51 }
52
53 streams.shift();
54 }
55
56 if (!streams.length) {
57 self.push(null);
58 }
59 });
60
61 stream.on('error', this.emit.bind(this, 'error'));
62
63 streams.push(stream);
64}
65
66function OrderedStreams (streams, options) {
67 if (!(this instanceof(OrderedStreams))) {
68 return new OrderedStreams(streams, options);
69 }
70
71 streams = streams || [];
72 options = options || {};
73
74 options.objectMode = true;
75
76 Readable.call(this, options);
77
78 if (!Array.isArray(streams)) {
79 streams = [streams];
80 }
81 if (!streams.length) {
82 return this.push(null); // no streams, close
83 }
84
85 var addStreamBinded = addStream.bind(this, []);
86
87 streams.forEach(function (item) {
88 if (Array.isArray(item)) {
89 item.forEach(addStreamBinded);
90 } else {
91 addStreamBinded(item);
92 }
93 });
94}
95util.inherits(OrderedStreams, Readable);
96
97OrderedStreams.prototype._read = function () {};
98
99module.exports = OrderedStreams;