| 'use strict' |
| |
| var fs = require('fs') |
| var path = require('path') |
| var test = require('tape').test |
| var from = require('from2') |
| var crypto = require('crypto') |
| var sink = require('flush-write-stream') |
| var pump = require('pump') |
| var cloneable = require('./') |
| |
// Wrapping a source in a cloneable must not start reading from it; once the
// wrapper is piped into a sink, the sink is expected to see the single chunk.
test('basic passthrough', function (t) {
  t.plan(2)

  var started = false
  var source = from(function (size, next) {
    if (!started) {
      // First read: hand out the only chunk.
      started = true
      this.push('hello world')
    } else {
      // Every later read signals end-of-stream.
      this.push(null)
    }
    next()
  })

  var instance = cloneable(source)
  t.notOk(started, 'stream not started')

  function verifyChunk (chunk, enc, cb) {
    t.equal(chunk.toString(), 'hello world', 'chunk matches')
    cb()
  }

  instance.pipe(sink(verifyChunk))
})
| |
// Neither wrapping nor cloning may start the source. The original and the
// clone are piped in the same tick, and each sink is expected to get the chunk.
test('clone sync', function (t) {
  t.plan(4)

  var started = false
  var source = from(function (size, next) {
    if (!started) {
      started = true
      this.push('hello world')
    } else {
      this.push(null)
    }
    next()
  })

  // Shared sink handler: both consumers must observe the same payload.
  function verifyChunk (chunk, enc, cb) {
    t.equal(chunk.toString(), 'hello world', 'chunk matches')
    cb()
  }

  var instance = cloneable(source)
  t.notOk(started, 'stream not started')

  var cloned = instance.clone()
  t.notOk(started, 'stream not started')

  instance.pipe(sink(verifyChunk))
  cloned.pipe(sink(verifyChunk))
})
| |
// Same as the synchronous clone test, except the clone is only piped on a
// later setImmediate turn while the original is piped right away.
test('clone async', function (t) {
  t.plan(4)

  var started = false
  var source = from(function (size, next) {
    if (!started) {
      started = true
      this.push('hello world')
    } else {
      this.push(null)
    }
    next()
  })

  function verifyChunk (chunk, enc, cb) {
    t.equal(chunk.toString(), 'hello world', 'chunk matches')
    cb()
  }

  var instance = cloneable(source)
  t.notOk(started, 'stream not started')

  var cloned = instance.clone()
  t.notOk(started, 'stream not started')

  instance.pipe(sink(verifyChunk))

  // The clone gets its consumer after the original already has one.
  setImmediate(function () {
    cloned.pipe(sink(verifyChunk))
  })
})
| |
// Object-mode variant of the passthrough test: a single object goes through
// the cloneable wrapper into an object-mode sink.
test('basic passthrough in obj mode', function (t) {
  t.plan(2)

  var started = false
  var source = from.obj(function (size, next) {
    if (started) {
      // End-of-stream: return right away, next() is not invoked on this path.
      return this.push(null)
    }

    started = true
    this.push({ hello: 'world' })
    next()
  })

  var instance = cloneable(source)
  t.notOk(started, 'stream not started')

  function verifyObject (chunk, enc, cb) {
    t.deepEqual(chunk, { hello: 'world' }, 'chunk matches')
    cb()
  }

  instance.pipe(sink.obj(verifyObject))
})
| |
// Object-mode source with one clone; the clone is piped on a later
// setImmediate turn and both sinks are expected to receive the object.
test('multiple clone in object mode', function (t) {
  t.plan(4)

  var started = false
  var source = from.obj(function (size, next) {
    if (started) {
      // End-of-stream: return right away, next() is not invoked on this path.
      return this.push(null)
    }

    started = true
    this.push({ hello: 'world' })
    next()
  })

  function verifyObject (chunk, enc, cb) {
    t.deepEqual(chunk, { hello: 'world' }, 'chunk matches')
    cb()
  }

  var instance = cloneable(source)
  t.notOk(started, 'stream not started')

  var cloned = instance.clone()
  t.notOk(started, 'stream not started')

  instance.pipe(sink.obj(verifyObject))

  setImmediate(function () {
    cloned.pipe(sink.obj(verifyObject))
  })
})
| |
// Consumes the cloneable through 'data' listeners instead of pipe() and
// compares the accumulated text once 'end' fires.
test('basic passthrough with data event', function (t) {
  t.plan(2)

  var started = false
  var source = from(function (size, next) {
    if (!started) {
      started = true
      this.push('hello world')
    } else {
      this.push(null)
    }
    next()
  })

  var instance = cloneable(source)
  t.notOk(started, 'stream not started')

  var pieces = []

  instance.on('data', function (chunk) {
    pieces.push(chunk.toString())
  })

  instance.on('end', function () {
    t.equal(pieces.join(''), 'hello world', 'chunk matches')
  })
})
| |
// The clone is consumed through 'data'/'end' listeners while the original
// is piped into a sink; both are expected to observe the same text.
test('basic passthrough with data event on clone', function (t) {
  t.plan(3)

  var started = false
  var source = from(function (size, next) {
    if (!started) {
      started = true
      this.push('hello world')
    } else {
      this.push(null)
    }
    next()
  })

  var instance = cloneable(source)
  var cloned = instance.clone()

  t.notOk(started, 'stream not started')

  var pieces = []

  cloned.on('data', function (chunk) {
    pieces.push(chunk.toString())
  })

  cloned.on('end', function () {
    t.equal(pieces.join(''), 'hello world', 'chunk matches in clone')
  })

  function verifyInstanceChunk (chunk, enc, cb) {
    t.equal(chunk.toString(), 'hello world', 'chunk matches in instance')
    cb()
  }

  instance.pipe(sink(verifyInstanceChunk))
})
| |
// Calling clone() from inside the sink callback, i.e. after data has begun
// to flow, is expected to throw.
test('errors if cloned after start', function (t) {
  t.plan(2)

  // Pushes the payload and end-of-stream in a single read.
  var source = from(function (size, next) {
    this.push('hello world')
    this.push(null)
    next()
  })

  var instance = cloneable(source)

  function lateClone () {
    instance.clone()
  }

  function onChunk (chunk, enc, cb) {
    t.equal(chunk.toString(), 'hello world', 'chunk matches')
    t.throws(lateClone, 'throws if cloned after start')
    cb()
  }

  instance.pipe(sink(onChunk))
})
| |
// Consumes the cloneable in paused mode: each 'readable' event drains the
// buffer with read() until it returns null.
test('basic passthrough with readable event', function (t) {
  t.plan(2)

  var started = false
  var source = from(function (size, next) {
    if (!started) {
      started = true
      this.push('hello world')
    } else {
      this.push(null)
    }
    next()
  })

  var instance = cloneable(source)
  t.notOk(started, 'stream not started')

  var received = ''

  instance.on('readable', function () {
    for (var chunk = this.read(); chunk !== null; chunk = this.read()) {
      received += chunk.toString()
    }
  })

  instance.on('end', function () {
    t.equal(received, 'hello world', 'chunk matches')
  })
})
| |
// The clone is drained through 'readable' + read() while the original is
// piped into a sink; both are expected to observe the same text.
test('basic passthrough with readable event on clone', function (t) {
  t.plan(3)

  var started = false
  var source = from(function (size, next) {
    if (!started) {
      started = true
      this.push('hello world')
    } else {
      this.push(null)
    }
    next()
  })

  var instance = cloneable(source)
  var cloned = instance.clone()

  t.notOk(started, 'stream not started')

  var received = ''

  cloned.on('readable', function () {
    for (var chunk = this.read(); chunk !== null; chunk = this.read()) {
      received += chunk.toString()
    }
  })

  cloned.on('end', function () {
    t.equal(received, 'hello world', 'chunk matches in clone')
  })

  function verifyInstanceChunk (chunk, enc, cb) {
    t.equal(chunk.toString(), 'hello world', 'chunk matches in instance')
    cb()
  }

  instance.pipe(sink(verifyInstanceChunk))
})
| |
// An 'error' emitted on the source is expected to reach both the cloneable
// and its clone as the very same Error object.
test('source error destroys all', function (t) {
  t.plan(3)

  var source = from()
  var instance = cloneable(source)
  var clone = instance.clone()

  // The downstream listeners are attached from inside the source's error
  // handler so they can compare against the error the source reported.
  function onSourceError (sourceErr) {
    t.ok(sourceErr, 'source errors')

    instance.on('error', function (instanceErr) {
      t.ok(sourceErr === instanceErr, 'instance receives same error')
    })

    clone.on('error', function (cloneErr) {
      t.ok(sourceErr === cloneErr, 'clone receives same error')
    })
  }

  source.on('error', onSourceError)
  source.emit('error', new Error())
})
| |
// Destroying the source without an error is expected to end both the
// cloneable and its clone once they have been resumed.
test('source destroy destroys all', function (t) {
  t.plan(2)

  var source = from()
  var instance = cloneable(source)
  var clone = instance.clone()

  function instanceEnded () {
    t.pass('instance has ended')
  }

  function cloneEnded () {
    t.pass('clone has ended')
  }

  instance.on('end', instanceEnded)
  clone.on('end', cloneEnded)

  clone.resume()
  instance.resume()

  source.destroy()
})
| |
// Destroying the cloneable with an error is expected to surface that error
// on both the cloneable and its clone, while the source is not closed and
// neither downstream stream emits 'close'.
test('instance error destroys all but the source', function (t) {
  t.plan(2)

  var source = from()
  var instance = cloneable(source)
  var clone = instance.clone()

  source.on('close', function () {
    t.fail('source should not be closed')
  })

  instance.on('error', function (err) {
    t.is(err.message, 'beep', 'instance errors')
  })

  instance.on('close', function () {
    t.fail('close should not be emitted')
  })

  // Labelled separately from the instance assertion so a failure report
  // identifies which of the two streams misbehaved.
  clone.on('error', function (err) {
    t.is(err.message, 'beep', 'clone errors')
  })

  clone.on('close', function () {
    t.fail('close should not be emitted')
  })

  instance.destroy(new Error('beep'))
})
| |
// Destroying the cloneable without an error is expected to end both it and
// its clone, while the source must not emit 'close'.
test('instance destroy destroys all but the source', function (t) {
  t.plan(2)

  var source = from()
  var instance = cloneable(source)
  var clone = instance.clone()

  function sourceClosed () {
    t.fail('source should not be closed')
  }

  function instanceEnded () {
    t.pass('instance has ended')
  }

  function cloneEnded () {
    t.pass('clone has ended')
  }

  source.on('close', sourceClosed)
  instance.on('end', instanceEnded)
  clone.on('end', cloneEnded)

  instance.resume()
  clone.resume()

  instance.destroy()
})
| |
// Destroying one clone is expected to close only that clone: the source,
// the cloneable and the sibling clone must not emit 'close'.
test('clone destroy does not affect other clones, cloneable or source', function (t) {
  t.plan(1)

  var source = from()
  var instance = cloneable(source)
  var clone = instance.clone()
  var other = instance.clone()

  // Registers a 'close' listener that fails the test with the given message.
  function mustStayOpen (stream, message) {
    stream.on('close', function () {
      t.fail(message)
    })
  }

  mustStayOpen(source, 'source should not be closed')
  mustStayOpen(instance, 'instance should not be closed')
  mustStayOpen(other, 'other clone should not be closed')

  clone.on('close', function () {
    t.pass('clone is closed')
  })

  clone.destroy()
})
| |
// With two clones, destroying one of them is expected to leave the
// cloneable and the remaining clone able to deliver the chunk to their sinks.
test('clone remains readable if other is destroyed', function (t) {
  t.plan(3)

  var started = false
  var source = from(function (size, next) {
    if (!started) {
      started = true
      this.push('hello')
    } else {
      this.push(null)
    }
    next()
  })

  var instance = cloneable(source)
  var clone = instance.clone()
  var other = instance.clone()

  function verifyInstanceChunk (chunk, enc, cb) {
    t.deepEqual(chunk.toString(), 'hello', 'instance chunk matches')
    cb()
  }

  function verifyCloneChunk (chunk, enc, cb) {
    t.deepEqual(chunk.toString(), 'hello', 'clone chunk matches')
    cb()
  }

  instance.pipe(sink.obj(verifyInstanceChunk))
  clone.pipe(sink.obj(verifyCloneChunk))

  // Only the destroyed clone is allowed to emit 'close'.
  clone.on('close', function () {
    t.fail('clone should not be closed')
  })

  instance.on('close', function () {
    t.fail('instance should not be closed')
  })

  other.on('close', function () {
    t.pass('other is closed')
  })

  other.destroy()
})
| |
// A clone can itself be cloned. None of the three wrapping steps may start
// the source, and all three streams are expected to deliver the chunk.
test('clone of clone', function (t) {
  t.plan(6)

  var started = false
  var source = from(function (size, next) {
    if (!started) {
      started = true
      this.push('hello world')
    } else {
      this.push(null)
    }
    next()
  })

  function verifyChunk (chunk, enc, cb) {
    t.equal(chunk.toString(), 'hello world', 'chunk matches')
    cb()
  }

  var instance = cloneable(source)
  t.notOk(started, 'stream not started')

  var cloned = instance.clone()
  t.notOk(started, 'stream not started')

  var replica = cloned.clone()
  t.notOk(started, 'stream not started')

  instance.pipe(sink(verifyChunk))
  cloned.pipe(sink(verifyChunk))
  replica.pipe(sink(verifyChunk))
})
| |
// The original gets its 'data' listener immediately and the clone only on
// the next tick. Nothing may have been delivered by then, and after both
// streams end they must have accumulated the same text.
test('from vinyl', function (t) {
  t.plan(3)

  var source = from(['wa', 'dup'])

  var instance = cloneable(source)
  var clone = instance.clone()

  var data = ''
  var data2 = ''
  var ends = 2

  // Runs the final comparison once both streams have emitted 'end'.
  function latch () {
    if (--ends === 0) {
      t.equal(data, data2)
    }
  }

  instance.on('data', function (chunk) {
    data += chunk.toString()
  })

  process.nextTick(function () {
    // tape's signature is equal(actual, expected, msg): the accumulated
    // data is the actual value and the empty string the expectation.
    t.equal(data, '', 'nothing was written yet')
    t.equal(data2, '', 'nothing was written yet')

    clone.on('data', function (chunk) {
      data2 += chunk.toString()
    })
  })

  instance.on('end', latch)
  clone.on('end', latch)
})
| |
// While a clone exists that nobody consumes, the original must not emit
// any 'data', even though it has a listener attached.
test('waits till all are flowing', function (t) {
  t.plan(1)

  var instance = cloneable(from(['wa', 'dup']))

  // Create a clone and deliberately leave it without a consumer.
  instance.clone()

  function unexpectedData (chunk) {
    t.fail('this should never happen')
  }

  function tickReached () {
    t.pass('wait till nextTick')
  }

  instance.on('data', unexpectedData)
  process.nextTick(tickReached)
})
| |
// cloneable.isCloneable() is expected to reject a plain readable and to
// accept a cloneable, a clone, and a clone of a clone.
test('isCloneable', function (t) {
  t.plan(4)

  var plain = from(['hello', ' ', 'world'])
  var plainResult = cloneable.isCloneable(plain)
  t.notOk(plainResult, 'a generic readable is not cloneable')

  var wrapped = cloneable(plain)
  var wrappedResult = cloneable.isCloneable(wrapped)
  t.ok(wrappedResult, 'a cloneable is cloneable')

  var firstClone = wrapped.clone()
  var firstCloneResult = cloneable.isCloneable(firstClone)
  t.ok(firstCloneResult, 'a clone is cloneable')

  var secondClone = firstClone.clone()
  var secondCloneResult = cloneable.isCloneable(secondClone)
  t.ok(secondCloneResult, 'a clone of a clone is cloneable')
})
| |
// Feeds four chunks followed by end-of-stream, one per setImmediate turn,
// and expects both the cloneable and its clone to emit 'finish' and to
// deliver every chunk in order.
test('emits finish', function (t) {
  var pending = ['a', 'b', 'c', 'd', null]
  var expectedMain = ['a', 'b', 'c', 'd']
  var expectedClone = ['a', 'b', 'c', 'd']

  // Two 'finish' assertions plus one assertion per expected chunk.
  t.plan(2 + expectedMain.length + expectedClone.length)

  var source = from(function (size, next) {
    var upcoming = pending.shift()
    setImmediate(next, null, upcoming)
  })

  var instance = cloneable(source)
  var clone = instance.clone()

  clone.on('finish', function () {
    t.pass('clone emits finish')
  })

  instance.on('finish', function () {
    t.pass('main emits finish')
  })

  function verifyMainChunk (chunk, enc, cb) {
    t.equal(chunk.toString(), expectedMain.shift(), 'chunk matches')
    cb()
  }

  instance.pipe(sink(verifyMainChunk))

  clone.on('data', function (chunk) {
    t.equal(chunk.toString(), expectedClone.shift(), 'chunk matches')
  })
})
| |
// Both streams are consumed with resume() instead of pipe(); the clone is
// resumed on a later setImmediate turn and both are expected to emit 'end'.
test('clone async w resume', function (t) {
  t.plan(4)

  var started = false
  var source = from(function (size, next) {
    if (!started) {
      started = true
      this.push('hello world')
    } else {
      this.push(null)
    }
    next()
  })

  function ended () {
    t.pass('end emitted')
  }

  var instance = cloneable(source)
  t.notOk(started, 'stream not started')

  var cloned = instance.clone()
  t.notOk(started, 'stream not started')

  instance.on('end', ended)
  instance.resume()

  setImmediate(function () {
    cloned.on('end', ended)
    cloned.resume()
  })
})
| |
// Streams the 'big' fixture through a cloneable and two clones, each
// attached at a different time, writes every stream to disk and checks that
// the SHA-1 of each written file matches the SHA-1 of the fixture.
//
// Planned assertions: 1 for the reference hash, plus 4 per consumer
// ('end', 'finish', hash present, hash equal).
test('big file', function (t) {
  t.plan(13)

  var bigFile = path.join(__dirname, 'big')
  var stream = cloneable(fs.createReadStream(bigFile))
  var hash = crypto.createHash('sha1')
  hash.setEncoding('hex')

  // Reference digest of the fixture, filled in asynchronously below.
  var toCheck

  fs.createReadStream(bigFile)
    .pipe(hash)
    .once('readable', function () {
      toCheck = hash.read()
      t.ok(toCheck)
    })

  // Pipes stream `s` to its own output file and verifies the file's digest.
  // `num` labels the consumer in assertion messages and in the file name.
  function pipe (s, num) {
    s.on('end', function () {
      t.pass('end for ' + num)
    })

    // One output file per consumer: with a shared file, a digest check could
    // not be attributed to a single stream and one consumer's output could
    // mask another's. The files are left on disk after the test.
    var dest = path.join(__dirname, 'out-' + num)

    s.pipe(fs.createWriteStream(dest))
      .on('finish', function () {
        t.pass('finish for ' + num)

        var destHash = crypto.createHash('sha1')
        destHash.setEncoding('hex')

        fs.createReadStream(dest)
          .pipe(destHash)
          .once('readable', function () {
            var digest = destHash.read()
            t.ok(digest)
            t.equal(digest, toCheck)
          })
      })
  }

  // The original cloneable is piped on a later event loop turn.
  setImmediate(pipe.bind(null, stream, 1))

  // This clone is piped in the same event loop tick.
  pipe(stream.clone(), 0)

  // This clone is created now but piped a long time after.
  setTimeout(pipe.bind(null, stream.clone(), 2), 1000)
})
| |
// When the wrapped source destroys itself with an error on first read,
// pump's callback is expected to receive that same error and the sink must
// never be handed a chunk.
test('pump error', function (t) {
  t.plan(1)

  var expected = new Error('kaboom')

  var failing = cloneable(from(function () {
    this.destroy(expected)
  }))

  var unreachable = sink(function (chunk, enc, cb) {
    t.fail('this should not be called')
  })

  pump([failing, unreachable], function (actual) {
    t.equal(actual, expected)
  })
})