| Leo Repp | 58b9f11 | 2021-11-22 11:57:47 +0100 | [diff] [blame^] | 1 | var stream = require('readable-stream') |
| 2 | var eos = require('end-of-stream') |
| 3 | var inherits = require('inherits') |
| 4 | var shift = require('stream-shift') |
| 5 | |
// Sentinel chunk: `Duplexify.prototype.end` writes this exact Buffer and
// `_write` recognises it by identity (`data === SIGNAL_FLUSH`) to start
// finishing instead of forwarding data.
// `Buffer.from` is only used when it is not the one inherited from
// Uint8Array; otherwise the legacy Buffer constructor is used.
var SIGNAL_FLUSH
if (Buffer.from && Buffer.from !== Uint8Array.from) {
  SIGNAL_FLUSH = Buffer.from([0])
} else {
  SIGNAL_FLUSH = new Buffer([0])
}
| 9 | |
// Runs `fn` immediately when `self` is not corked; otherwise registers it
// as a one-shot listener for the next 'uncork' event.
var onuncork = function(self, fn) {
  if (!self._corked) {
    fn()
    return
  }
  self.once('uncork', fn)
}
| 14 | |
// Destroys `self` with `err`, but only when its `_autoDestroy` flag is set.
var autoDestroy = function (self, err) {
  if (!self._autoDestroy) return
  self.destroy(err)
}
| 18 | |
// Builds the completion callback handed to end-of-stream for an inner stream.
//   self - the Duplexify instance that owns the inner stream
//   end  - when truthy, a clean finish of the inner stream ends `self`
//          (unless `self` was already ended)
// On error the returned callback goes through `autoDestroy`; a
// 'premature close' error is passed on as `null` instead of the error object.
var destroyer = function(self, end) {
  return function(err) {
    if (!err) {
      if (end && !self._ended) self.end()
      return
    }
    var isPrematureClose = err.message === 'premature close'
    autoDestroy(self, isPrematureClose ? null : err)
  }
}
| 25 | |
// Ends the stream `ws` (if any) and then invokes `fn`.
//   - no stream, or a stream whose `_writableState.finished` is already set:
//     `fn` is called right away
//   - stream with a `_writableState`: `fn` is handed to `ws.end`
//   - stream without a `_writableState`: `ws.end()` is called with no
//     arguments and `fn` is invoked synchronously afterwards
var end = function(ws, fn) {
  var state = ws ? ws._writableState : null
  if (!ws || (state && state.finished)) return fn()
  if (state) return ws.end(fn)
  ws.end()
  fn()
}
| 33 | |
// Wraps the stream `rs` in a new object-mode `stream.Readable`
// (highWaterMark 16) via `.wrap()` and returns the result of that call.
var toStreams2 = function(rs) {
  var wrapper = new stream.Readable({objectMode: true, highWaterMark: 16})
  return wrapper.wrap(rs)
}
| 37 | |
// Duplex stream whose writable and readable sides are backed by two inner
// streams that can be attached now or later via setWritable / setReadable.
//   writable - optional inner stream that receives writes
//   readable - optional inner stream that data is read from
//   opts     - forwarded unchanged to stream.Duplex; additionally
//              `autoDestroy`, `destroy` and `end` each switch a behaviour off
//              when set to exactly `false` (they are on otherwise)
// May be called with or without `new`.
var Duplexify = function(writable, readable, opts) {
  if (!(this instanceof Duplexify)) return new Duplexify(writable, readable, opts)
  stream.Duplex.call(this, opts)

  // every flag below is enabled unless the caller passed an explicit `false`
  var flags = opts || {}

  this._writable = null
  this._readable = null
  this._readable2 = null // the stream `_forward` actually shifts data from

  this._autoDestroy = flags.autoDestroy !== false
  this._forwardDestroy = flags.destroy !== false
  this._forwardEnd = flags.end !== false
  this._corked = 1 // start corked
  this._ondrain = null // write callback parked until the inner writable drains
  this._drained = false
  this._forwarding = false
  this._unwrite = null // detaches the current inner writable
  this._unread = null // detaches the current inner readable
  this._ended = false

  this.destroyed = false

  if (writable) this.setWritable(writable)
  if (readable) this.setReadable(readable)
}

inherits(Duplexify, stream.Duplex)
| 64 | |
// Creates a Duplexify instance in object mode (objectMode: true,
// highWaterMark: 16).
//   writable, readable - inner streams, passed straight to the constructor
//   opts               - optional extra options; objectMode and
//                        highWaterMark are always overridden
// The caller's `opts` object is copied rather than modified, so an options
// object shared between several streams is not silently switched to object
// mode.
Duplexify.obj = function(writable, readable, opts) {
  var objOpts = {}
  if (opts) {
    // plain for-in on purpose: enumerable inherited options are kept too
    for (var key in opts) objOpts[key] = opts[key]
  }
  objOpts.objectMode = true
  objOpts.highWaterMark = 16
  return new Duplexify(writable, readable, objOpts)
}
| 71 | |
// Increments the cork counter; emits 'cork' when the counter goes from
// zero to one.
Duplexify.prototype.cork = function() {
  var depth = ++this._corked
  if (depth === 1) this.emit('cork')
}
| 75 | |
// Decrements the cork counter when it is above zero; emits 'uncork' when
// the counter reaches zero. Does nothing when already uncorked.
Duplexify.prototype.uncork = function() {
  if (!this._corked) return
  var depth = --this._corked
  if (depth === 0) this.emit('uncork')
}
| 79 | |
// Attaches `writable` as the inner stream that receives writes.
//   - any previously attached writable is detached first
//   - if this stream is already destroyed, `writable` is destroyed (when it
//     has a destroy method) and nothing else happens
//   - `null` / `false` means "no writable side": this stream is ended
// Attaching a writable always uncorks once.
Duplexify.prototype.setWritable = function(writable) {
  if (this._unwrite) this._unwrite()

  if (this.destroyed) {
    if (writable && writable.destroy) writable.destroy()
    return
  }

  if (writable === null || writable === false) {
    this.end()
    return
  }

  var duplex = this
  var onfinished = destroyer(this, this._forwardEnd)
  var stopWatching = eos(writable, {writable: true, readable: false}, onfinished)

  // hands control back to the write that was parked in `_ondrain`, if any
  var flushParked = function() {
    var parked = duplex._ondrain
    duplex._ondrain = null
    if (parked) parked()
  }

  var detach = function() {
    duplex._writable.removeListener('drain', flushParked)
    stopWatching()
  }

  // force a drain on stream reset to avoid livelocks
  if (this._unwrite) process.nextTick(flushParked)

  this._writable = writable
  this._writable.on('drain', flushParked)
  this._unwrite = detach

  this.uncork() // always uncork setWritable
}
| 115 | |
// Attaches `readable` as the inner stream that data is read from.
//   - any previously attached readable is detached first
//   - if this stream is already destroyed, `readable` is destroyed (when it
//     has a destroy method) and nothing else happens
//   - `null` / `false` means "no readable side": the readable half is ended
//     (push(null)) and resumed
// A stream without a `_readableState` is wrapped with `toStreams2` before
// data is pulled from it.
Duplexify.prototype.setReadable = function(readable) {
  if (this._unread) this._unread()

  if (this.destroyed) {
    if (readable && readable.destroy) readable.destroy()
    return
  }

  if (readable === null || readable === false) {
    this.push(null)
    this.resume()
    return
  }

  var duplex = this
  var stopWatching = eos(readable, {writable: false, readable: true}, destroyer(this))

  var pump = function() {
    duplex._forward()
  }

  var finish = function() {
    duplex.push(null)
  }

  var detach = function() {
    duplex._readable2.removeListener('readable', pump)
    duplex._readable2.removeListener('end', finish)
    stopWatching()
  }

  this._drained = true
  this._readable = readable
  if (readable._readableState) this._readable2 = readable
  else this._readable2 = toStreams2(readable)
  this._readable2.on('readable', pump)
  this._readable2.on('end', finish)
  this._unread = detach

  this._forward()
}
| 156 | |
// Read hook: marks this stream as ready to accept more data (`_drained`)
// and then tries to forward whatever the inner readable has available.
Duplexify.prototype._read = function() {
  this._drained = true
  this._forward()
}
| 161 | |
// Moves chunks from the inner readable (`_readable2`) into this stream until
// either `push` reports back-pressure or `shift` returns null.
// Does nothing when a forward is already in progress, when no readable is
// attached, or when this stream has not asked for data (`_drained` false).
Duplexify.prototype._forward = function() {
  if (this._forwarding || !this._readable2 || !this._drained) return
  this._forwarding = true

  while (this._drained) {
    var chunk = shift(this._readable2)
    if (chunk === null) break
    // once destroyed, chunks are still taken from the source but not pushed
    if (!this.destroyed) this._drained = this.push(chunk)
  }

  this._forwarding = false
}
| 175 | |
// Marks this stream as destroyed and schedules `_destroy(err)` on the next
// tick. Calls made after the first one are ignored.
Duplexify.prototype.destroy = function(err) {
  if (this.destroyed) return
  this.destroyed = true

  var duplex = this
  var teardown = function() {
    duplex._destroy(err)
  }
  process.nextTick(teardown)
}
| 185 | |
// Performs the actual teardown.
//   err - optional error; it is delivered to the write callback parked in
//         `_ondrain` when there is one, otherwise emitted as 'error'
// When destroy forwarding is enabled, the inner readable and then the inner
// writable are destroyed (if they have a destroy method). 'close' is always
// emitted last.
Duplexify.prototype._destroy = function(err) {
  if (err) {
    var parked = this._ondrain
    this._ondrain = null
    if (!parked) this.emit('error', err)
    else parked(err)
  }

  if (this._forwardDestroy) {
    var sides = ['_readable', '_writable']
    for (var i = 0; i < sides.length; i++) {
      var inner = this[sides[i]]
      if (inner && inner.destroy) inner.destroy()
    }
  }

  this.emit('close')
}
| 201 | |
// Write hook: forwards one chunk to the inner writable.
//   data - chunk to forward, or SIGNAL_FLUSH to start finishing
//   enc  - encoding as supplied by the caller; only carried along when the
//          write has to be replayed after an uncork
//   cb   - called once the chunk is handled; when the inner writable reports
//          back-pressure it is parked in `_ondrain` instead
// Chunks are acknowledged immediately when the stream is destroyed or no
// inner writable is attached.
Duplexify.prototype._write = function(data, enc, cb) {
  if (this.destroyed) return cb()

  if (this._corked) {
    // replay this exact write once the stream is uncorked
    var replay = this._write.bind(this, data, enc, cb)
    return onuncork(this, replay)
  }

  if (data === SIGNAL_FLUSH) return this._finish(cb)
  if (!this._writable) return cb()

  var accepted = this._writable.write(data) !== false
  if (accepted) cb()
  else this._ondrain = cb
}
| 211 | |
// Finishes the writable side. Emits 'preend', waits until the stream is
// uncorked, ends the inner writable (only when end forwarding is enabled),
// then emits 'prefinish' and finally calls `cb` once the stream is uncorked.
Duplexify.prototype._finish = function(cb) {
  var duplex = this

  var afterInnerEnd = function() {
    // haxx to not emit prefinish twice
    var state = duplex._writableState
    if (state.prefinished === false) state.prefinished = true
    duplex.emit('prefinish')
    onuncork(duplex, cb)
  }

  var endInner = function() {
    end(duplex._forwardEnd && duplex._writable, afterInnerEnd)
  }

  this.emit('preend')
  onuncork(this, endInner)
}
| 224 | |
// Ends the writable side.
//   data - optional final chunk; anything other than null/undefined is
//          written, so falsy values such as 0, '' or false are not dropped
//          (they are legitimate chunks in object mode)
//   enc  - optional encoding of `data`; forwarded to `write` so that e.g.
//          end('6869', 'hex') is decoded as hex rather than written as text
//   cb   - optional callback, handed to stream.Writable.prototype.end
// `data` and `enc` may each be omitted, with the callback taking their place.
// After the final chunk, SIGNAL_FLUSH is written (unless the stream is
// already ending) so that `_write` can run the finish sequence in order with
// the preceding writes.
Duplexify.prototype.end = function(data, enc, cb) {
  if (typeof data === 'function') return this.end(null, null, data)
  if (typeof enc === 'function') return this.end(data, null, enc)
  this._ended = true
  if (data !== null && data !== undefined) this.write(data, enc)
  if (!this._writableState.ending) this.write(SIGNAL_FLUSH)
  return stream.Writable.prototype.end.call(this, cb)
}
| 233 | |
| 234 | module.exports = Duplexify |