blob: f91477e1f5b3f37fc1f1512c818b37f8fe3dc854 [file] [log] [blame]
Leo Repp58b9f112021-11-22 11:57:47 +01001'use strict';
2module.exports = (iterable, mapper, opts) => new Promise((resolve, reject) => {
3 opts = Object.assign({
4 concurrency: Infinity
5 }, opts);
6
7 if (typeof mapper !== 'function') {
8 throw new TypeError('Mapper function is required');
9 }
10
11 const concurrency = opts.concurrency;
12
13 if (!(typeof concurrency === 'number' && concurrency >= 1)) {
14 throw new TypeError(`Expected \`concurrency\` to be a number from 1 and up, got \`${concurrency}\` (${typeof concurrency})`);
15 }
16
17 const ret = [];
18 const iterator = iterable[Symbol.iterator]();
19 let isRejected = false;
20 let iterableDone = false;
21 let resolvingCount = 0;
22 let currentIdx = 0;
23
24 const next = () => {
25 if (isRejected) {
26 return;
27 }
28
29 const nextItem = iterator.next();
30 const i = currentIdx;
31 currentIdx++;
32
33 if (nextItem.done) {
34 iterableDone = true;
35
36 if (resolvingCount === 0) {
37 resolve(ret);
38 }
39
40 return;
41 }
42
43 resolvingCount++;
44
45 Promise.resolve(nextItem.value)
46 .then(el => mapper(el, i))
47 .then(
48 val => {
49 ret[i] = val;
50 resolvingCount--;
51 next();
52 },
53 err => {
54 isRejected = true;
55 reject(err);
56 }
57 );
58 };
59
60 for (let i = 0; i < concurrency; i++) {
61 next();
62
63 if (iterableDone) {
64 break;
65 }
66 }
67});