| Leo Repp | 58b9f11 | 2021-11-22 11:57:47 +0100 | [diff] [blame^] | 1 | 'use strict' |
| 2 | |
| 3 | var PassThrough = require('readable-stream').PassThrough |
| 4 | var inherits = require('inherits') |
| 5 | var p = require('process-nextick-args') |
| 6 | |
// Wraps a readable `stream` in a PassThrough that can hand out clones
// before any data starts flowing.
//
// stream - the source readable; its `_readableState.objectMode` is copied
//          onto the wrapper.
// opts   - optional PassThrough options. The object is copied, never
//          modified; `objectMode` is always overridden with the source's.
//
// Works with or without `new`. The source is not piped here: the
// 'newListener' / 'resume' hooks installed below schedule clonePiped,
// which does the piping once `_clonesCount` reaches zero.
function Cloneable (stream, opts) {
  if (!(this instanceof Cloneable)) {
    return new Cloneable(stream, opts)
  }

  var objectMode = stream._readableState.objectMode
  this._original = stream
  // the wrapper itself counts as one pending consumer
  this._clonesCount = 1

  // Copy the options rather than writing `objectMode` onto the caller's
  // object. A plain for-in keeps inherited enumerable options visible,
  // as they were when the original object was passed straight through,
  // and is a no-op when `opts` is undefined or null.
  var streamOpts = {}
  for (var key in opts) {
    streamOpts[key] = opts[key]
  }
  streamOpts.objectMode = objectMode

  PassThrough.call(this, streamOpts)

  forwardDestroy(stream, this)

  this.on('newListener', onData)
  this.once('resume', onResume)

  // true while the 'newListener' hook above is still installed;
  // clone() uses it to decide whether to re-install the hook
  this._hasListener = true
}

inherits(Cloneable, PassThrough)
| 30 | |
// 'newListener' hook for the Cloneable wrapper.
// The first time a 'data' or 'readable' listener is attached, the wrapper
// is marked as started, both start hooks are removed, and clonePiped is
// scheduled for the next tick. Any other event name is ignored.
function onData (event, listener) {
  if (event !== 'data' && event !== 'readable') {
    return
  }

  this._hasListener = false
  this.removeListener('newListener', onData)
  this.removeListener('resume', onResume)
  p.nextTick(clonePiped, this)
}
| 39 | |
// 'resume' hook for the Cloneable wrapper (registered with `once`).
// Marks the wrapper as started, removes the 'newListener' hook and
// schedules clonePiped for the next tick.
function onResume () {
  var wrapper = this

  wrapper._hasListener = false
  wrapper.removeListener('newListener', onData)
  p.nextTick(clonePiped, wrapper)
}
| 45 | |
// Creates a new Clone fed by this wrapper.
// Throws Error('already started') once the source has been piped
// (clonePiped clears `_original` at that point).
Cloneable.prototype.clone = function () {
  if (!this._original) {
    throw new Error('already started')
  }

  this._clonesCount += 1

  // Constructing the Clone pipes this wrapper into it, which attaches
  // listeners to the wrapper. Those must not be mistaken for a consumer,
  // so the 'newListener' hook is taken off for the duration.
  this.removeListener('newListener', onData)
  var copy = new Clone(this)

  // put the hook back only if it has not already fired
  if (this._hasListener) {
    this.on('newListener', onData)
  }

  return copy
}
| 63 | |
// Destroy hook for the wrapper.
// Without an error: signals end-of-data on the readable side, ends the
// writable side and emits 'close'. With an error: none of that happens.
// In both cases `cb` is invoked with `err` on the next tick.
Cloneable.prototype._destroy = function (err, cb) {
  if (err) {
    p.nextTick(cb, err)
    return
  }

  this.push(null)
  this.end()
  this.emit('close')
  p.nextTick(cb, err)
}
| 73 | |
// Links the lifetime of `dest` to `src`:
//   - when `src` emits 'error', `dest` is destroyed with that error and
//     the 'close' forwarding is removed;
//   - when `src` emits 'close', `dest` is ended.
function forwardDestroy (src, dest) {
  var endDest = function () {
    dest.end()
  }

  var destroyDest = function (err) {
    // an errored source must not also end dest through 'close'
    src.removeListener('close', endDest)
    dest.destroy(err)
  }

  src.on('error', destroyDest)
  src.on('close', endDest)
}
| 87 | |
// Records that one more consumer of `that` is ready.
// The counter is always decremented. When it lands exactly on zero and
// `that` has not been destroyed, the original source is piped into
// `that` and the reference to it is dropped.
function clonePiped (that) {
  that._clonesCount -= 1

  if (that._clonesCount !== 0) {
    return
  }
  if (that._readableState.destroyed) {
    return
  }

  that._original.pipe(that)
  that._original = undefined
}
| 94 | |
// A PassThrough that receives everything written to `parent`.
//
// parent - the stream being cloned; its `_readableState.objectMode` is
//          copied, it is piped into the new clone, and its 'error' /
//          'close' events are forwarded via forwardDestroy.
// opts   - optional PassThrough options; `objectMode` is set on this
//          object before it is handed to PassThrough.
//
// Works with or without `new`.
function Clone (parent, opts) {
  if (!(this instanceof Clone)) {
    return new Clone(parent, opts)
  }

  var inheritedMode = parent._readableState.objectMode
  var options = opts || {}
  options.objectMode = inheritedMode

  this.parent = parent

  PassThrough.call(this, options)

  forwardDestroy(parent, this)

  parent.pipe(this)

  // The start hooks go on last so that listeners attached during the
  // setup above are not counted as a consumer of this clone.
  // NOTE(review): 'resume' is attached with `on` here whereas Cloneable
  // uses `once`, and onDataClone does not detach it, so one clone may
  // schedule clonePiped more than once - confirm this is intended.
  this.on('newListener', onDataClone)
  this.on('resume', onResumeClone)
}
| 119 | |
// 'newListener' hook for a Clone.
// A 'data', 'readable' or 'close' listener counts as this clone being
// ready (piped, consumed or watched for destruction): clonePiped is
// scheduled on the parent for the next tick and the hook removes itself.
// Any other event name is ignored.
function onDataClone (event, listener) {
  switch (event) {
    case 'data':
    case 'readable':
    case 'close':
      p.nextTick(clonePiped, this.parent)
      this.removeListener('newListener', onDataClone)
      break
    default:
      break
  }
}
| 127 | |
// 'resume' hook for a Clone.
// Removes the clone's 'newListener' hook and schedules clonePiped on the
// parent for the next tick.
function onResumeClone () {
  var clone = this

  clone.removeListener('newListener', onDataClone)
  p.nextTick(clonePiped, clone.parent)
}
| 132 | |
// Clone extends PassThrough; this must run before the prototype
// methods are attached.
inherits(Clone, PassThrough)

// Cloning a clone delegates to the parent, so the result is another
// clone fed by the same parent.
Clone.prototype.clone = function () {
  var origin = this.parent
  return origin.clone()
}
| 138 | |
// Returns true when `stream` is a Cloneable wrapper or one of its
// clones, false for anything else.
Cloneable.isCloneable = function (stream) {
  if (stream instanceof Cloneable) {
    return true
  }
  return stream instanceof Clone
}
| 142 | |
// Destroy hook for a clone.
// Without an error: signals end-of-data on the readable side, ends the
// writable side and emits 'close'. With an error: none of that happens.
// In both cases `cb` is invoked with `err` on the next tick.
Clone.prototype._destroy = function (err, cb) {
  if (err) {
    p.nextTick(cb, err)
    return
  }

  this.push(null)
  this.end()
  this.emit('close')
  p.nextTick(cb, err)
}
| 152 | |
| 153 | module.exports = Cloneable |