// blob: 35bb78bb8df0d72c9e963a6eb87c5cb2f14bd308 [file] [log] [blame]
// Leo Repp 58b9f11 2021-11-22 11:57:47 +0100
'use strict'
2
3var fs = require('fs')
4var path = require('path')
5var test = require('tape').test
6var from = require('from2')
7var crypto = require('crypto')
8var sink = require('flush-write-stream')
9var pump = require('pump')
10var cloneable = require('./')
11
// Wrapping a source must not start it; once piped, the single chunk is
// delivered to the destination unchanged.
test('basic passthrough', function (t) {
  t.plan(2)

  var started = false

  // One-shot reader: pushes one chunk on the first call, end-of-stream after.
  function readOnce (size, next) {
    if (!started) {
      started = true
      this.push('hello world')
    } else {
      this.push(null)
    }
    next()
  }

  var wrapped = cloneable(from(readOnce))
  t.notOk(started, 'stream not started')

  function verifyChunk (chunk, enc, cb) {
    t.equal(chunk.toString(), 'hello world', 'chunk matches')
    cb()
  }

  wrapped.pipe(sink(verifyChunk))
})
34
// The original and its clone are piped in the same tick; each destination
// must see the chunk, and neither wrapping nor cloning may start the source.
test('clone sync', function (t) {
  t.plan(4)

  var started = false

  // One-shot reader: pushes one chunk on the first call, end-of-stream after.
  function readOnce (size, next) {
    if (!started) {
      started = true
      this.push('hello world')
    } else {
      this.push(null)
    }
    next()
  }

  var original = cloneable(from(readOnce))
  t.notOk(started, 'stream not started')

  var copy = original.clone()
  t.notOk(started, 'stream not started')

  // Shared write handler; each destination gets its own sink instance.
  function verifyChunk (chunk, enc, cb) {
    t.equal(chunk.toString(), 'hello world', 'chunk matches')
    cb()
  }

  original.pipe(sink(verifyChunk))
  copy.pipe(sink(verifyChunk))
})
65
// Same as the sync clone test, except the clone is only piped on a later
// setImmediate turn, after the original has already been piped.
test('clone async', function (t) {
  t.plan(4)

  var started = false

  // One-shot reader: pushes one chunk on the first call, end-of-stream after.
  function readOnce (size, next) {
    if (!started) {
      started = true
      this.push('hello world')
    } else {
      this.push(null)
    }
    next()
  }

  var original = cloneable(from(readOnce))
  t.notOk(started, 'stream not started')

  var copy = original.clone()
  t.notOk(started, 'stream not started')

  function verifyChunk (chunk, enc, cb) {
    t.equal(chunk.toString(), 'hello world', 'chunk matches')
    cb()
  }

  original.pipe(sink(verifyChunk))

  // Attach the clone's consumer late.
  setImmediate(function () {
    copy.pipe(sink(verifyChunk))
  })
})
98
// Object-mode variant of the passthrough test: one object is pushed and must
// arrive deep-equal at an object-mode sink.
test('basic passthrough in obj mode', function (t) {
  t.plan(2)

  var started = false

  function readOnce (size, next) {
    if (started) {
      // End the stream and return without calling next().
      return this.push(null)
    }
    started = true
    this.push({ hello: 'world' })
    next()
  }

  var wrapped = cloneable(from.obj(readOnce))
  t.notOk(started, 'stream not started')

  wrapped.pipe(sink.obj(function (obj, enc, cb) {
    t.deepEqual(obj, { hello: 'world' }, 'chunk matches')
    cb()
  }))
})
121
// Object-mode source with one clone; the clone's consumer is attached on a
// later setImmediate turn and must still receive the object.
test('multiple clone in object mode', function (t) {
  t.plan(4)

  var started = false

  function readOnce (size, next) {
    if (started) {
      // End the stream and return without calling next().
      return this.push(null)
    }
    started = true
    this.push({ hello: 'world' })
    next()
  }

  var original = cloneable(from.obj(readOnce))
  t.notOk(started, 'stream not started')

  var copy = original.clone()
  t.notOk(started, 'stream not started')

  function verifyObject (obj, enc, cb) {
    t.deepEqual(obj, { hello: 'world' }, 'chunk matches')
    cb()
  }

  original.pipe(sink.obj(verifyObject))

  setImmediate(function () {
    copy.pipe(sink.obj(verifyObject))
  })
})
154
// Consumes the cloneable through 'data' listeners instead of pipe() and
// checks the accumulated text once 'end' fires.
test('basic passthrough with data event', function (t) {
  t.plan(2)

  var started = false

  // One-shot reader: pushes one chunk on the first call, end-of-stream after.
  function readOnce (size, next) {
    if (!started) {
      started = true
      this.push('hello world')
    } else {
      this.push(null)
    }
    next()
  }

  var wrapped = cloneable(from(readOnce))
  t.notOk(started, 'stream not started')

  var received = ''

  wrapped.on('data', function (chunk) {
    received += chunk.toString()
  })

  wrapped.on('end', function () {
    t.equal(received, 'hello world', 'chunk matches')
  })
})
181
// The clone is consumed with 'data'/'end' listeners while the original is
// piped into a sink; both must observe the same chunk.
test('basic passthrough with data event on clone', function (t) {
  t.plan(3)

  var started = false

  // One-shot reader: pushes one chunk on the first call, end-of-stream after.
  function readOnce (size, next) {
    if (!started) {
      started = true
      this.push('hello world')
    } else {
      this.push(null)
    }
    next()
  }

  var original = cloneable(from(readOnce))
  var copy = original.clone()

  t.notOk(started, 'stream not started')

  var received = ''

  copy.on('data', function (chunk) {
    received += chunk.toString()
  })

  copy.on('end', function () {
    t.equal(received, 'hello world', 'chunk matches in clone')
  })

  original.pipe(sink(function (chunk, enc, cb) {
    t.equal(chunk.toString(), 'hello world', 'chunk matches in instance')
    cb()
  }))
})
215
// Calling clone() from inside the sink's write handler, i.e. after data has
// begun to flow, is expected to throw.
test('errors if cloned after start', function (t) {
  t.plan(2)

  var original = cloneable(from(function (size, next) {
    this.push('hello world')
    this.push(null)
    next()
  }))

  function cloneLate () {
    original.clone()
  }

  original.pipe(sink(function (chunk, enc, cb) {
    t.equal(chunk.toString(), 'hello world', 'chunk matches')
    t.throws(cloneLate, 'throws if cloned after start')
    cb()
  }))
})
235
// Consumes the cloneable in paused mode: drains read() on every 'readable'
// event and checks the accumulated text on 'end'.
test('basic passthrough with readable event', function (t) {
  t.plan(2)

  var started = false

  // One-shot reader: pushes one chunk on the first call, end-of-stream after.
  function readOnce (size, next) {
    if (!started) {
      started = true
      this.push('hello world')
    } else {
      this.push(null)
    }
    next()
  }

  var wrapped = cloneable(from(readOnce))
  t.notOk(started, 'stream not started')

  var received = ''

  wrapped.on('readable', function () {
    // Keep reading until the internal buffer reports empty (null).
    for (var piece = this.read(); piece !== null; piece = this.read()) {
      received += piece.toString()
    }
  })

  wrapped.on('end', function () {
    t.equal(received, 'hello world', 'chunk matches')
  })
})
265
// The clone is drained via 'readable'/read() while the original is piped
// into a sink; both must observe the same chunk.
test('basic passthrough with readable event on clone', function (t) {
  t.plan(3)

  var started = false

  // One-shot reader: pushes one chunk on the first call, end-of-stream after.
  function readOnce (size, next) {
    if (!started) {
      started = true
      this.push('hello world')
    } else {
      this.push(null)
    }
    next()
  }

  var original = cloneable(from(readOnce))
  var copy = original.clone()

  t.notOk(started, 'stream not started')

  var received = ''

  copy.on('readable', function () {
    // Keep reading until the internal buffer reports empty (null).
    for (var piece = this.read(); piece !== null; piece = this.read()) {
      received += piece.toString()
    }
  })

  copy.on('end', function () {
    t.equal(received, 'hello world', 'chunk matches in clone')
  })

  original.pipe(sink(function (chunk, enc, cb) {
    t.equal(chunk.toString(), 'hello world', 'chunk matches in instance')
    cb()
  }))
})
302
// An 'error' emitted on the source must reach both the instance and its
// clone as the very same Error object.
test('source error destroys all', function (t) {
  t.plan(3)

  var src = from()
  var original = cloneable(src)
  var copy = original.clone()

  src.on('error', function (sourceErr) {
    t.ok(sourceErr, 'source errors')

    // These listeners are registered only once the source error is seen.
    original.on('error', function (instanceErr) {
      t.ok(sourceErr === instanceErr, 'instance receives same error')
    })

    copy.on('error', function (cloneErr) {
      t.ok(sourceErr === cloneErr, 'clone receives same error')
    })
  })

  src.emit('error', new Error())
})
324
// After the source is destroyed, both the instance and the clone (each put
// into flowing mode) are expected to emit 'end'.
test('source destroy destroys all', function (t) {
  t.plan(2)

  var src = from()
  var original = cloneable(src)
  var copy = original.clone()

  function instanceEnded () {
    t.pass('instance has ended')
  }

  function cloneEnded () {
    t.pass('clone has ended')
  }

  original.on('end', instanceEnded)
  copy.on('end', cloneEnded)

  // Clone is resumed first, then the instance.
  copy.resume()
  original.resume()

  src.destroy()
})
345
// Destroying the instance with an error must surface that error on both the
// instance and its clone. The source must not emit 'close', and neither the
// instance nor the clone may emit 'close' in this scenario.
test('instance error destroys all but the source', function (t) {
  t.plan(2)

  var source = from()
  var instance = cloneable(source)
  var clone = instance.clone()

  source.on('close', function () {
    t.fail('source should not be closed')
  })

  instance.on('error', function (err) {
    t.is(err.message, 'beep', 'instance errors')
  })

  instance.on('close', function () {
    t.fail('close should not be emitted')
  })

  clone.on('error', function (err) {
    // Labelled separately from the instance assertion so the two results can
    // be told apart in the TAP output.
    t.is(err.message, 'beep', 'clone errors')
  })

  clone.on('close', function () {
    t.fail('close should not be emitted')
  })

  instance.destroy(new Error('beep'))
})
375
// Destroying the instance without an error: instance and clone are expected
// to emit 'end', while the source must not emit 'close'.
test('instance destroy destroys all but the source', function (t) {
  t.plan(2)

  var src = from()
  var original = cloneable(src)
  var copy = original.clone()

  function sourceClosed () {
    t.fail('source should not be closed')
  }

  function instanceEnded () {
    t.pass('instance has ended')
  }

  function cloneEnded () {
    t.pass('clone has ended')
  }

  src.on('close', sourceClosed)
  original.on('end', instanceEnded)
  copy.on('end', cloneEnded)

  original.resume()
  copy.resume()

  original.destroy()
})
400
// Destroying one clone must close only that clone; the source, the instance
// and the sibling clone must not emit 'close'.
test('clone destroy does not affect other clones, cloneable or source', function (t) {
  t.plan(1)

  var src = from()
  var original = cloneable(src)
  var victim = original.clone()
  var sibling = original.clone()

  src.on('close', function () {
    t.fail('source should not be closed')
  })

  original.on('close', function () {
    t.fail('instance should not be closed')
  })

  sibling.on('close', function () {
    t.fail('other clone should not be closed')
  })

  victim.on('close', function () {
    t.pass('clone is closed')
  })

  victim.destroy()
})
427
// With two clones, destroying one of them must still let the instance and
// the surviving clone deliver the chunk, and only the destroyed clone closes.
test('clone remains readable if other is destroyed', function (t) {
  t.plan(3)

  var started = false

  // One-shot reader: pushes one chunk on the first call, end-of-stream after.
  function readOnce (size, next) {
    if (!started) {
      started = true
      this.push('hello')
    } else {
      this.push(null)
    }
    next()
  }

  var original = cloneable(from(readOnce))
  var survivor = original.clone()
  var doomed = original.clone()

  original.pipe(sink.obj(function (chunk, enc, cb) {
    t.deepEqual(chunk.toString(), 'hello', 'instance chunk matches')
    cb()
  }))

  survivor.pipe(sink.obj(function (chunk, enc, cb) {
    t.deepEqual(chunk.toString(), 'hello', 'clone chunk matches')
    cb()
  }))

  survivor.on('close', function () {
    t.fail('clone should not be closed')
  })

  original.on('close', function () {
    t.fail('instance should not be closed')
  })

  doomed.on('close', function () {
    t.pass('other is closed')
  })

  doomed.destroy()
})
470
// A clone can itself be cloned; the original, the clone and the clone's
// clone must each receive the chunk, and no cloning step starts the source.
test('clone of clone', function (t) {
  t.plan(6)

  var started = false

  // One-shot reader: pushes one chunk on the first call, end-of-stream after.
  function readOnce (size, next) {
    if (!started) {
      started = true
      this.push('hello world')
    } else {
      this.push(null)
    }
    next()
  }

  var original = cloneable(from(readOnce))
  t.notOk(started, 'stream not started')

  var firstCopy = original.clone()
  t.notOk(started, 'stream not started')

  var secondCopy = firstCopy.clone()
  t.notOk(started, 'stream not started')

  // Shared write handler; each destination gets its own sink instance.
  function verifyChunk (chunk, enc, cb) {
    t.equal(chunk.toString(), 'hello world', 'chunk matches')
    cb()
  }

  original.pipe(sink(verifyChunk))
  firstCopy.pipe(sink(verifyChunk))
  secondCopy.pipe(sink(verifyChunk))
})
509
// The instance gets its 'data' listener immediately, the clone only on the
// next tick. At that point nothing may have been delivered to either, and
// once both have ended they must have accumulated identical data.
test('from vinyl', function (t) {
  t.plan(3)

  var source = from(['wa', 'dup'])

  var instance = cloneable(source)
  var clone = instance.clone()

  var data = ''
  var data2 = ''
  var ends = 2

  // Runs the final comparison once both streams have emitted 'end'.
  function latch () {
    if (--ends === 0) {
      t.equal(data, data2)
    }
  }

  instance.on('data', function (chunk) {
    data += chunk.toString()
  })

  process.nextTick(function () {
    // tape's signature is equal(actual, expected, msg): the observed value
    // goes first so a failure reports actual/expected the right way round.
    t.equal(data, '', 'nothing was written yet')
    t.equal(data2, '', 'nothing was written yet')

    clone.on('data', function (chunk) {
      data2 += chunk.toString()
    })
  })

  instance.on('end', latch)
  clone.on('end', latch)
})
544
// A clone is created but never consumed; the instance's 'data' listener must
// not fire before the nextTick callback passes the test.
test('waits till all are flowing', function (t) {
  t.plan(1)

  var original = cloneable(from(['wa', 'dup']))

  // The clone is deliberately left without any consumer.
  original.clone()

  original.on('data', function () {
    t.fail('this should never happen')
  })

  process.nextTick(function () {
    t.pass('wait till nextTick')
  })
})
563
// cloneable.isCloneable() must reject a plain readable and accept the
// wrapper, a clone of it, and a clone of that clone.
test('isCloneable', function (t) {
  t.plan(4)

  var plain = from(['hello', ' ', 'world'])
  t.notOk(cloneable.isCloneable(plain), 'a generic readable is not cloneable')

  var wrapped = cloneable(plain)
  t.ok(cloneable.isCloneable(wrapped), 'a cloneable is cloneable')

  var firstCopy = wrapped.clone()
  t.ok(cloneable.isCloneable(firstCopy), 'a clone is cloneable')

  var secondCopy = firstCopy.clone()
  t.ok(cloneable.isCloneable(secondCopy), 'a clone of a clone is cloneable')
})
579
// Four chunks are produced asynchronously, followed by end-of-stream. Both
// the instance and the clone must emit 'finish', and each must see the
// chunks in order.
test('emits finish', function (t) {
  var pending = ['a', 'b', 'c', 'd', null]
  var expectedMain = ['a', 'b', 'c', 'd']
  var expectedClone = ['a', 'b', 'c', 'd']

  // Two 'finish' assertions plus one assertion per expected chunk per consumer.
  t.plan(2 + expectedMain.length + expectedClone.length)

  var original = cloneable(from(function (size, next) {
    // Take the next value now, hand it over on a later setImmediate turn.
    var value = pending.shift()
    setImmediate(next, null, value)
  }))

  var copy = original.clone()

  copy.on('finish', t.pass.bind(null, 'clone emits finish'))
  original.on('finish', t.pass.bind(null, 'main emits finish'))

  original.pipe(sink(function (chunk, enc, cb) {
    t.equal(chunk.toString(), expectedMain.shift(), 'chunk matches')
    cb()
  }))

  copy.on('data', function (chunk) {
    t.equal(chunk.toString(), expectedClone.shift(), 'chunk matches')
  })
})
607
// The instance is resumed immediately and the clone only on a later
// setImmediate turn; both are expected to emit 'end'.
test('clone async w resume', function (t) {
  t.plan(4)

  var started = false

  // One-shot reader: pushes one chunk on the first call, end-of-stream after.
  function readOnce (size, next) {
    if (!started) {
      started = true
      this.push('hello world')
    } else {
      this.push(null)
    }
    next()
  }

  var original = cloneable(from(readOnce))
  t.notOk(started, 'stream not started')

  var copy = original.clone()
  t.notOk(started, 'stream not started')

  var endEmitted = t.pass.bind(null, 'end emitted')

  original.on('end', endEmitted)
  original.resume()

  setImmediate(function () {
    copy.on('end', endEmitted)
    copy.resume()
  })
})
636
// Streams the 'big' fixture through a cloneable and two clones, attached at
// three different times, writes each to disk and checks that the SHA-1 of
// every written file matches the SHA-1 of the fixture read directly.
//
// Plan: 1 reference-hash assertion + 3 consumers x (end, finish, hash
// present, hash equal) = 13.
test('big file', function (t) {
  t.plan(13)

  var fixture = path.join(__dirname, 'big')
  var stream = cloneable(fs.createReadStream(fixture))
  var hash = crypto.createHash('sha1')
  hash.setEncoding('hex')

  // Hex digest of the fixture, filled in once the reference hash is readable.
  var toCheck

  fs.createReadStream(fixture)
    .pipe(hash)
    .once('readable', function () {
      toCheck = hash.read()
      t.ok(toCheck)
    })

  // Pipes stream `s` into its own output file and, when the write finishes,
  // hashes that file and compares the digest with the reference.
  // `num` identifies the consumer in assertion messages and in the file name.
  function pipe (s, num) {
    s.on('end', function () {
      t.pass('end for ' + num)
    })

    // One destination per consumer: sharing a single path would let
    // concurrent writers (and the delayed writer re-opening the file)
    // interfere with each other's read-back hash.
    var dest = path.join(__dirname, 'out' + num)

    s.pipe(fs.createWriteStream(dest))
      .on('finish', function () {
        t.pass('finish for ' + num)

        var destHash = crypto.createHash('sha1')
        destHash.setEncoding('hex')

        fs.createReadStream(dest)
          .pipe(destHash)
          .once('readable', function () {
            // Named `digest` so it does not shadow the outer `hash` stream.
            var digest = destHash.read()
            t.ok(digest)
            t.equal(digest, toCheck)
          })
      })
  }

  // Pipe the original cloneable in another event loop tick
  setImmediate(pipe.bind(null, stream, 1))

  // Pipe a clone in the same event loop tick
  pipe(stream.clone(), 0)

  // Pipe another clone a long time after
  setTimeout(pipe.bind(null, stream.clone(), 2), 1000)
})
686
// A source that destroys itself with an error on first read: pump must call
// back with that same error and the sink's write handler must never run.
test('pump error', function (t) {
  t.plan(1)

  var boom = new Error('kaboom')

  var failing = cloneable(from(function () {
    this.destroy(boom)
  }))

  var unreachable = sink(function (chunk, enc, cb) {
    t.fail('this should not be called')
  })

  pump([failing, unreachable], function (_err) {
    t.equal(_err, boom)
  })
})