blob: 5e660f4944c80090abfe893544751c78c1f4c707 [file] [log] [blame]
Leo Repp58b9f112021-11-22 11:57:47 +01001'use strict'
2
3var test = require('tape')
4var buildQueue = require('../')
5
// A single pushed task must reach the worker with its argument, and the
// value the worker passes to its callback must reach the push callback.
test('worker execution', function (t) {
  t.plan(3)

  function worker (arg, cb) {
    t.equal(arg, 42)
    cb(null, true)
  }

  function onDone (err, result) {
    t.error(err, 'no error')
    t.equal(result, true, 'result matches')
  }

  var queue = buildQueue(worker, 1)
  queue.push(42, onDone)
})
21
// With a concurrency of 1, results must arrive in push order even though
// the first task waits on a 10 ms timer and the second on a 0 ms timer.
test('limit', function (t) {
  t.plan(4)

  var inOrder = [10, 0]

  function worker (arg, cb) {
    // the task value is used both as the timer delay and as the result
    setTimeout(cb, arg, null, arg)
  }

  function result (err, arg) {
    t.error(err, 'no error')
    t.equal(arg, inOrder.shift(), 'the result matches')
  }

  var queue = buildQueue(worker, 1)

  queue.push(10, result)
  queue.push(0, result)
})
40
// Five tasks pushed back-to-back must be handed to the worker in push
// order, and each result must match the task that was started last.
test('multiple executions', function (t) {
  t.plan(15)

  var toExec = [1, 2, 3, 4, 5]
  var count = 0 // how many tasks the worker has started so far

  function worker (arg, cb) {
    t.equal(arg, toExec[count], 'arg matches')
    count++
    setImmediate(cb, null, arg)
  }

  function done (err, result) {
    t.error(err, 'no error')
    t.equal(result, toExec[count - 1], 'the result matches')
  }

  var queue = buildQueue(worker, 1)

  for (var i = 0; i < toExec.length; i++) {
    queue.push(toExec[i], done)
  }
})
63
// Same checks as pushing five tasks at once, but each task is pushed from
// the completion callback of the previous one.
test('multiple executions, one after another', function (t) {
  t.plan(15)

  var toExec = [1, 2, 3, 4, 5]
  var count = 0 // how many tasks the worker has started so far

  function worker (arg, cb) {
    t.equal(arg, toExec[count], 'arg matches')
    count++
    setImmediate(cb, null, arg)
  }

  function done (err, result) {
    t.error(err, 'no error')
    t.equal(result, toExec[count - 1], 'the result matches')
    if (count >= toExec.length) {
      // every task has been started; nothing left to push
      return
    }
    queue.push(toExec[count], done)
  }

  var queue = buildQueue(worker, 1)

  queue.push(toExec[0], done)
})
87
// A context object given as the first argument to buildQueue must be the
// `this` value inside both the worker and the push callback.
test('set this', function (t) {
  t.plan(3)

  var that = {}

  function worker (arg, cb) {
    t.equal(this, that, 'this matches')
    cb(null, true)
  }

  function onDone (err, result) {
    t.error(err, 'no error')
    t.equal(this, that, 'this matches')
  }

  var queue = buildQueue(that, worker, 1)
  queue.push(42, onDone)
})
104
// The drain hook must be invoked, and by then the worker must already have
// processed the pushed task.
test('drain', function (t) {
  t.plan(4)

  var queue = buildQueue(worker, 1)
  var worked = false

  queue.push(42, function (err, result) {
    t.error(err, 'no error')
    t.equal(result, true, 'result matches')
  })

  // Installed after the push: the worker completes through setImmediate, so
  // the hook is in place before the task finishes.
  queue.drain = function () {
    // actual value first, expected second, so a failure report labels them correctly
    t.equal(worked, true, 'drained')
  }

  function worker (arg, cb) {
    t.equal(arg, 42)
    worked = true
    setImmediate(cb, null, true)
  }
})
126
// A task pushed while the queue is paused must not be processed until
// resume() is called; the `paused` flag must track pause()/resume().
test('pause && resume', function (t) {
  t.plan(7)

  var worked = false

  function worker (arg, cb) {
    t.equal(arg, 42)
    worked = true
    cb(null, true)
  }

  function onDone (err, result) {
    t.error(err, 'no error')
    t.equal(result, true, 'result matches')
  }

  var queue = buildQueue(worker, 1)

  t.notOk(queue.paused, 'it should not be paused')

  queue.pause()
  queue.push(42, onDone)

  t.notOk(worked, 'it should be paused')
  t.ok(queue.paused, 'it should be paused')

  queue.resume()
  queue.resume() // second resume is a no-op

  t.notOk(queue.paused, 'it should not be paused')
})
156
// Pausing while a task is in flight: the running task still completes with
// the queue reported as paused, and the second task runs once the first
// callback schedules resume().
test('pause in flight && resume', function (t) {
  t.plan(9)

  var expected = [42, 24]

  function worker (arg, cb) {
    t.equal(arg, expected.shift())
    process.nextTick(function () {
      cb(null, true)
    })
  }

  function onFirstDone (err, result) {
    t.error(err, 'no error')
    t.equal(result, true, 'result matches')
    t.ok(queue.paused, 'it should be paused')
    process.nextTick(function () {
      queue.resume()
    })
  }

  function onSecondDone (err, result) {
    t.error(err, 'no error')
    t.equal(result, true, 'result matches')
    t.notOk(queue.paused, 'it should not be paused')
  }

  var queue = buildQueue(worker, 1)

  t.notOk(queue.paused, 'it should not be paused')

  queue.push(42, onFirstDone)
  queue.push(24, onSecondDone)

  queue.pause()
})
185
// Raising `concurrency` to 2 while paused must let both queued tasks start
// as soon as the queue is resumed.
test('altering concurrency', function (t) {
  t.plan(7)

  var queue = buildQueue(worker, 1)
  var count = 0 // number of tasks that have finished

  queue.pause()

  queue.push(24, workDone)
  queue.push(24, workDone)

  queue.concurrency = 2

  queue.resume()

  t.equal(queue.running(), 2, '2 jobs running')

  function workDone (err, result) {
    t.error(err, 'no error')
    t.equal(result, true, 'result matches')
  }

  function worker (arg, cb) {
    // Neither task may have finished when a worker starts.
    // Actual value first, expected second, so failure output is labelled correctly.
    t.equal(count, 0, 'works in parallel')
    setImmediate(function () {
      count++
      cb(null, true)
    })
  }
})
216
// idle() must be true before anything is pushed, false while tasks are
// queued or running, and true again after the last callback has returned.
test('idle()', function (t) {
  t.plan(12)

  function worker (arg, cb) {
    t.notOk(queue.idle(), 'queue is not idle')
    t.equal(arg, 42)
    setImmediate(cb, null, true)
  }

  function onFirstDone (err, result) {
    t.error(err, 'no error')
    t.equal(result, true, 'result matches')
    t.notOk(queue.idle(), 'queue is not idle')
  }

  function onLastDone (err, result) {
    t.error(err, 'no error')
    t.equal(result, true, 'result matches')
    // it will go idle after executing this function
    setImmediate(function () {
      t.ok(queue.idle(), 'queue is now idle')
    })
  }

  var queue = buildQueue(worker, 1)

  t.ok(queue.idle(), 'queue is idle')

  queue.push(42, onFirstDone)
  queue.push(42, onLastDone)

  t.notOk(queue.idle(), 'queue is not idle')
})
247
// The saturated hook must fire exactly once (the plan leaves room for a
// single call), after one task has started and before any has finished.
test('saturated', function (t) {
  t.plan(9)

  var preworked = 0 // tasks handed to the worker
  var worked = 0 // tasks whose worker has completed

  function worker (arg, cb) {
    t.equal(arg, 42)
    preworked++
    setImmediate(function () {
      worked++
      cb(null, true)
    })
  }

  function done (err, result) {
    t.error(err, 'no error')
    t.equal(result, true, 'result matches')
  }

  var queue = buildQueue(worker, 1)

  queue.saturated = function () {
    t.pass('saturated')
    t.equal(preworked, 1, 'started 1 task')
    t.equal(worked, 0, 'worked zero task')
  }

  queue.push(42, done)
  queue.push(42, done)
})
278
// length() must count only waiting tasks: the first push goes straight to
// the worker, so the count grows only from the second push onwards.
test('length', function (t) {
  t.plan(7)

  function worker (arg, cb) {
    setImmediate(function () {
      cb(null, true)
    })
  }

  function done (err, result) {
    t.error(err, 'no error')
  }

  var queue = buildQueue(worker, 1)

  t.equal(queue.length(), 0, 'nothing waiting')

  queue.push(42, done)
  t.equal(queue.length(), 0, 'nothing waiting')

  queue.push(42, done)
  t.equal(queue.length(), 1, 'one task waiting')

  queue.push(42, done)
  t.equal(queue.length(), 2, 'two tasks waiting')
})
302
// getQueue() must list the values of the waiting tasks in order, leaving
// out the task that is already being processed.
test('getQueue', function (t) {
  t.plan(10)

  function worker (arg, cb) {
    setImmediate(function () {
      cb(null, true)
    })
  }

  function done (err, result) {
    t.error(err, 'no error')
  }

  var queue = buildQueue(worker, 1)

  t.equal(queue.getQueue().length, 0, 'nothing waiting')

  queue.push(42, done)
  t.equal(queue.getQueue().length, 0, 'nothing waiting')

  queue.push(42, done)
  t.equal(queue.getQueue().length, 1, 'one task waiting')
  t.equal(queue.getQueue()[0], 42, 'should be equal')

  queue.push(43, done)
  t.equal(queue.getQueue().length, 2, 'two tasks waiting')
  t.equal(queue.getQueue()[0], 42, 'should be equal')
  t.equal(queue.getQueue()[1], 43, 'should be equal')
})
329
// unshift() must place a task at the front of the waiting list: with 1
// already running and 4 waiting, unshifting 3 and then 2 yields 1, 2, 3, 4.
test('unshift', function (t) {
  t.plan(8)

  var queue = buildQueue(worker, 1)
  var expected = [1, 2, 3, 4]

  queue.push(1, done)
  queue.push(4, done)
  queue.unshift(3, done)
  queue.unshift(2, done)

  function done (err, result) {
    t.error(err, 'no error')
  }

  function worker (arg, cb) {
    // actual value first, expected second, so failure output is labelled correctly
    t.equal(arg, expected.shift(), 'tasks come in order')
    setImmediate(function () {
      cb(null, true)
    })
  }
})
352
// After an unshift() on a paused queue, the empty hook must fire on resume
// before the task's completion callback has run.
test('unshift && empty', function (t) {
  t.plan(2)

  var completed = false

  function worker (arg, cb) {
    setImmediate(function () {
      cb(null, true)
    })
  }

  function done (err, result) {
    completed = true
    t.error(err, 'no error')
  }

  var queue = buildQueue(worker, 1)

  queue.pause()

  queue.empty = function () {
    t.notOk(completed, 'the task has not completed yet')
  }

  queue.unshift(1, done)
  queue.resume()
})
380
// After a push() on a paused queue, the empty hook must fire on resume
// before the task's completion callback has run.
test('push && empty', function (t) {
  t.plan(2)

  var completed = false

  function worker (arg, cb) {
    setImmediate(function () {
      cb(null, true)
    })
  }

  function done (err, result) {
    completed = true
    t.error(err, 'no error')
  }

  var queue = buildQueue(worker, 1)

  queue.pause()

  queue.empty = function () {
    t.notOk(completed, 'the task has not completed yet')
  }

  queue.push(1, done)
  queue.resume()
})
408
// kill() must discard the waiting tasks: only the task already handed to
// the worker completes, the custom drain hook is never called, and
// queue.drain is restored to the value it had before being overridden.
test('kill', function (t) {
  t.plan(5)

  var queue = buildQueue(worker, 1)
  var expected = [1]

  // remember the default hook so the reset can be verified afterwards
  var predrain = queue.drain

  queue.drain = function drain () {
    t.fail('drain should never be called')
  }

  queue.push(1, done)
  queue.push(4, done)
  queue.unshift(3, done)
  queue.unshift(2, done)
  queue.kill()

  function done (err, result) {
    t.error(err, 'no error')
    setImmediate(function () {
      t.equal(queue.length(), 0, 'no queued tasks')
      t.equal(queue.running(), 0, 'no running tasks')
      t.equal(queue.drain, predrain, 'drain is back to default')
    })
  }

  function worker (arg, cb) {
    // actual value first, expected second, so failure output is labelled correctly
    t.equal(arg, expected.shift(), 'tasks come in order')
    setImmediate(function () {
      cb(null, true)
    })
  }
})
443
// killAndDrain() must discard the waiting tasks and invoke the drain hook
// once; only the task already handed to the worker completes, and
// queue.drain is afterwards restored to the value it had before being overridden.
test('killAndDrain', function (t) {
  t.plan(6)

  var queue = buildQueue(worker, 1)
  var expected = [1]

  // remember the default hook so the reset can be verified afterwards
  var predrain = queue.drain

  queue.drain = function drain () {
    t.pass('drain has been called')
  }

  queue.push(1, done)
  queue.push(4, done)
  queue.unshift(3, done)
  queue.unshift(2, done)
  queue.killAndDrain()

  function done (err, result) {
    t.error(err, 'no error')
    setImmediate(function () {
      t.equal(queue.length(), 0, 'no queued tasks')
      t.equal(queue.running(), 0, 'no running tasks')
      t.equal(queue.drain, predrain, 'drain is back to default')
    })
  }

  function worker (arg, cb) {
    // actual value first, expected second, so failure output is labelled correctly
    t.equal(arg, expected.shift(), 'tasks come in order')
    setImmediate(function () {
      cb(null, true)
    })
  }
})
478
// idle() must be false while a task waits in a paused queue and while it
// runs after resume(), and true again once the task has completed.
test('pause && idle', function (t) {
  t.plan(11)

  var queue = buildQueue(worker, 1)
  var worked = false

  t.notOk(queue.paused, 'it should not be paused')
  t.ok(queue.idle(), 'should be idle')

  queue.pause()

  queue.push(42, function (err, result) {
    t.error(err, 'no error')
    t.equal(result, true, 'result matches')
  })

  t.notOk(worked, 'it should be paused')
  t.ok(queue.paused, 'it should be paused')
  t.notOk(queue.idle(), 'should not be idle')

  queue.resume()

  t.notOk(queue.paused, 'it should not be paused')
  t.notOk(queue.idle(), 'it should not be idle')

  function worker (arg, cb) {
    t.equal(arg, 42)
    worked = true
    // The completion callback is scheduled first, so the idle check queued
    // right after it runs once the task has been completed.
    process.nextTick(cb.bind(null, null, true))
    process.nextTick(function () {
      t.ok(queue.idle(), 'it should be idle')
    })
  }
})
513
// push() without a completion callback must still run the worker, and the
// worker must be able to call its callback with no arguments.
test('push without cb', function (t) {
  t.plan(1)

  function worker (arg, cb) {
    t.equal(arg, 42)
    cb()
  }

  var queue = buildQueue(worker, 1)
  queue.push(42)
})
526
// unshift() without a completion callback must still run the worker, and
// the worker must be able to call its callback with no arguments.
test('unshift without cb', function (t) {
  t.plan(1)

  function worker (arg, cb) {
    t.equal(arg, 42)
    cb()
  }

  var queue = buildQueue(worker, 1)
  queue.unshift(42)
})
539
// An error the worker passes to its callback must reach both the handler
// registered through q.error() (along with the task) and the push callback.
test('push with worker throwing error', function (t) {
  t.plan(5)

  function failingWorker (task, cb) {
    cb(new Error('test error'), null)
  }

  function onQueueError (err, task) {
    t.ok(err instanceof Error, 'global error handler should catch the error')
    t.match(err.message, /test error/, 'error message should be "test error"')
    t.equal(task, 42, 'The task executed should be passed')
  }

  function onPushDone (err) {
    t.ok(err instanceof Error, 'push callback should catch the error')
    t.match(err.message, /test error/, 'error message should be "test error"')
  }

  var q = buildQueue(failingWorker, 1)
  q.error(onQueueError)
  q.push(42, onPushDone)
})