blob: a68765b6b1ab760b693ffad22556a54e6b79dc04 [file] [log] [blame]
Leo Repp58b9f112021-11-22 11:57:47 +01001'use strict'
2
3var PassThrough = require('readable-stream').PassThrough
4var inherits = require('inherits')
5var p = require('process-nextick-args')
6
7function Cloneable (stream, opts) {
8 if (!(this instanceof Cloneable)) {
9 return new Cloneable(stream, opts)
10 }
11
12 var objectMode = stream._readableState.objectMode
13 this._original = stream
14 this._clonesCount = 1
15
16 opts = opts || {}
17 opts.objectMode = objectMode
18
19 PassThrough.call(this, opts)
20
21 forwardDestroy(stream, this)
22
23 this.on('newListener', onData)
24 this.once('resume', onResume)
25
26 this._hasListener = true
27}
28
29inherits(Cloneable, PassThrough)
30
31function onData (event, listener) {
32 if (event === 'data' || event === 'readable') {
33 this._hasListener = false
34 this.removeListener('newListener', onData)
35 this.removeListener('resume', onResume)
36 p.nextTick(clonePiped, this)
37 }
38}
39
40function onResume () {
41 this._hasListener = false
42 this.removeListener('newListener', onData)
43 p.nextTick(clonePiped, this)
44}
45
46Cloneable.prototype.clone = function () {
47 if (!this._original) {
48 throw new Error('already started')
49 }
50
51 this._clonesCount++
52
53 // the events added by the clone should not count
54 // for starting the flow
55 this.removeListener('newListener', onData)
56 var clone = new Clone(this)
57 if (this._hasListener) {
58 this.on('newListener', onData)
59 }
60
61 return clone
62}
63
64Cloneable.prototype._destroy = function (err, cb) {
65 if (!err) {
66 this.push(null)
67 this.end()
68 this.emit('close')
69 }
70
71 p.nextTick(cb, err)
72}
73
74function forwardDestroy (src, dest) {
75 src.on('error', destroy)
76 src.on('close', onClose)
77
78 function destroy (err) {
79 src.removeListener('close', onClose)
80 dest.destroy(err)
81 }
82
83 function onClose () {
84 dest.end()
85 }
86}
87
88function clonePiped (that) {
89 if (--that._clonesCount === 0 && !that._readableState.destroyed) {
90 that._original.pipe(that)
91 that._original = undefined
92 }
93}
94
95function Clone (parent, opts) {
96 if (!(this instanceof Clone)) {
97 return new Clone(parent, opts)
98 }
99
100 var objectMode = parent._readableState.objectMode
101
102 opts = opts || {}
103 opts.objectMode = objectMode
104
105 this.parent = parent
106
107 PassThrough.call(this, opts)
108
109 forwardDestroy(parent, this)
110
111 parent.pipe(this)
112
113 // the events added by the clone should not count
114 // for starting the flow
115 // so we add the newListener handle after we are done
116 this.on('newListener', onDataClone)
117 this.on('resume', onResumeClone)
118}
119
120function onDataClone (event, listener) {
121 // We start the flow once all clones are piped or destroyed
122 if (event === 'data' || event === 'readable' || event === 'close') {
123 p.nextTick(clonePiped, this.parent)
124 this.removeListener('newListener', onDataClone)
125 }
126}
127
128function onResumeClone () {
129 this.removeListener('newListener', onDataClone)
130 p.nextTick(clonePiped, this.parent)
131}
132
133inherits(Clone, PassThrough)
134
135Clone.prototype.clone = function () {
136 return this.parent.clone()
137}
138
139Cloneable.isCloneable = function (stream) {
140 return stream instanceof Cloneable || stream instanceof Clone
141}
142
143Clone.prototype._destroy = function (err, cb) {
144 if (!err) {
145 this.push(null)
146 this.end()
147 this.emit('close')
148 }
149
150 p.nextTick(cb, err)
151}
152
153module.exports = Cloneable