blob: 397a331b46c55b2752d4728a3e972650ded22c75 [file] [log] [blame]
Leo Repp58b9f112021-11-22 11:57:47 +01001'use strict'
2
3var reusify = require('reusify')
4
5function fastqueue (context, worker, concurrency) {
6 if (typeof context === 'function') {
7 concurrency = worker
8 worker = context
9 context = null
10 }
11
12 var cache = reusify(Task)
13 var queueHead = null
14 var queueTail = null
15 var _running = 0
16 var errorHandler = null
17
18 var self = {
19 push: push,
20 drain: noop,
21 saturated: noop,
22 pause: pause,
23 paused: false,
24 concurrency: concurrency,
25 running: running,
26 resume: resume,
27 idle: idle,
28 length: length,
29 getQueue: getQueue,
30 unshift: unshift,
31 empty: noop,
32 kill: kill,
33 killAndDrain: killAndDrain,
34 error: error
35 }
36
37 return self
38
39 function running () {
40 return _running
41 }
42
43 function pause () {
44 self.paused = true
45 }
46
47 function length () {
48 var current = queueHead
49 var counter = 0
50
51 while (current) {
52 current = current.next
53 counter++
54 }
55
56 return counter
57 }
58
59 function getQueue () {
60 var current = queueHead
61 var tasks = []
62
63 while (current) {
64 tasks.push(current.value)
65 current = current.next
66 }
67
68 return tasks
69 }
70
71 function resume () {
72 if (!self.paused) return
73 self.paused = false
74 for (var i = 0; i < self.concurrency; i++) {
75 _running++
76 release()
77 }
78 }
79
80 function idle () {
81 return _running === 0 && self.length() === 0
82 }
83
84 function push (value, done) {
85 var current = cache.get()
86
87 current.context = context
88 current.release = release
89 current.value = value
90 current.callback = done || noop
91 current.errorHandler = errorHandler
92
93 if (_running === self.concurrency || self.paused) {
94 if (queueTail) {
95 queueTail.next = current
96 queueTail = current
97 } else {
98 queueHead = current
99 queueTail = current
100 self.saturated()
101 }
102 } else {
103 _running++
104 worker.call(context, current.value, current.worked)
105 }
106 }
107
108 function unshift (value, done) {
109 var current = cache.get()
110
111 current.context = context
112 current.release = release
113 current.value = value
114 current.callback = done || noop
115
116 if (_running === self.concurrency || self.paused) {
117 if (queueHead) {
118 current.next = queueHead
119 queueHead = current
120 } else {
121 queueHead = current
122 queueTail = current
123 self.saturated()
124 }
125 } else {
126 _running++
127 worker.call(context, current.value, current.worked)
128 }
129 }
130
131 function release (holder) {
132 if (holder) {
133 cache.release(holder)
134 }
135 var next = queueHead
136 if (next) {
137 if (!self.paused) {
138 if (queueTail === queueHead) {
139 queueTail = null
140 }
141 queueHead = next.next
142 next.next = null
143 worker.call(context, next.value, next.worked)
144 if (queueTail === null) {
145 self.empty()
146 }
147 } else {
148 _running--
149 }
150 } else if (--_running === 0) {
151 self.drain()
152 }
153 }
154
155 function kill () {
156 queueHead = null
157 queueTail = null
158 self.drain = noop
159 }
160
161 function killAndDrain () {
162 queueHead = null
163 queueTail = null
164 self.drain()
165 self.drain = noop
166 }
167
168 function error (handler) {
169 errorHandler = handler
170 }
171}
172
173function noop () {}
174
175function Task () {
176 this.value = null
177 this.callback = noop
178 this.next = null
179 this.release = noop
180 this.context = null
181 this.errorHandler = null
182
183 var self = this
184
185 this.worked = function worked (err, result) {
186 var callback = self.callback
187 var errorHandler = self.errorHandler
188 var val = self.value
189 self.value = null
190 self.callback = noop
191 if (self.errorHandler) {
192 errorHandler(err, val)
193 }
194 callback.call(self.context, err, result)
195 self.release(self)
196 }
197}
198
199module.exports = fastqueue