| 'use strict' |
| |
| var PassThrough = require('readable-stream').PassThrough |
| var inherits = require('inherits') |
| var p = require('process-nextick-args') |
| |
| function Cloneable (stream, opts) { |
| if (!(this instanceof Cloneable)) { |
| return new Cloneable(stream, opts) |
| } |
| |
| var objectMode = stream._readableState.objectMode |
| this._original = stream |
| this._clonesCount = 1 |
| |
| opts = opts || {} |
| opts.objectMode = objectMode |
| |
| PassThrough.call(this, opts) |
| |
| forwardDestroy(stream, this) |
| |
| this.on('newListener', onData) |
| this.once('resume', onResume) |
| |
| this._hasListener = true |
| } |
| |
| inherits(Cloneable, PassThrough) |
| |
| function onData (event, listener) { |
| if (event === 'data' || event === 'readable') { |
| this._hasListener = false |
| this.removeListener('newListener', onData) |
| this.removeListener('resume', onResume) |
| p.nextTick(clonePiped, this) |
| } |
| } |
| |
| function onResume () { |
| this._hasListener = false |
| this.removeListener('newListener', onData) |
| p.nextTick(clonePiped, this) |
| } |
| |
| Cloneable.prototype.clone = function () { |
| if (!this._original) { |
| throw new Error('already started') |
| } |
| |
| this._clonesCount++ |
| |
| // the events added by the clone should not count |
| // for starting the flow |
| this.removeListener('newListener', onData) |
| var clone = new Clone(this) |
| if (this._hasListener) { |
| this.on('newListener', onData) |
| } |
| |
| return clone |
| } |
| |
| Cloneable.prototype._destroy = function (err, cb) { |
| if (!err) { |
| this.push(null) |
| this.end() |
| this.emit('close') |
| } |
| |
| p.nextTick(cb, err) |
| } |
| |
| function forwardDestroy (src, dest) { |
| src.on('error', destroy) |
| src.on('close', onClose) |
| |
| function destroy (err) { |
| src.removeListener('close', onClose) |
| dest.destroy(err) |
| } |
| |
| function onClose () { |
| dest.end() |
| } |
| } |
| |
| function clonePiped (that) { |
| if (--that._clonesCount === 0 && !that._readableState.destroyed) { |
| that._original.pipe(that) |
| that._original = undefined |
| } |
| } |
| |
| function Clone (parent, opts) { |
| if (!(this instanceof Clone)) { |
| return new Clone(parent, opts) |
| } |
| |
| var objectMode = parent._readableState.objectMode |
| |
| opts = opts || {} |
| opts.objectMode = objectMode |
| |
| this.parent = parent |
| |
| PassThrough.call(this, opts) |
| |
| forwardDestroy(parent, this) |
| |
| parent.pipe(this) |
| |
| // the events added by the clone should not count |
| // for starting the flow |
| // so we add the newListener handle after we are done |
| this.on('newListener', onDataClone) |
| this.on('resume', onResumeClone) |
| } |
| |
| function onDataClone (event, listener) { |
| // We start the flow once all clones are piped or destroyed |
| if (event === 'data' || event === 'readable' || event === 'close') { |
| p.nextTick(clonePiped, this.parent) |
| this.removeListener('newListener', onDataClone) |
| } |
| } |
| |
| function onResumeClone () { |
| this.removeListener('newListener', onDataClone) |
| p.nextTick(clonePiped, this.parent) |
| } |
| |
| inherits(Clone, PassThrough) |
| |
| Clone.prototype.clone = function () { |
| return this.parent.clone() |
| } |
| |
| Cloneable.isCloneable = function (stream) { |
| return stream instanceof Cloneable || stream instanceof Clone |
| } |
| |
| Clone.prototype._destroy = function (err, cb) { |
| if (!err) { |
| this.push(null) |
| this.end() |
| this.emit('close') |
| } |
| |
| p.nextTick(cb, err) |
| } |
| |
| module.exports = Cloneable |