blob: 65d32a3ecb7d22329f5465cb2ef127f70e54be43 [file] [log] [blame]
Leo Repp58b9f112021-11-22 11:57:47 +01001var fs = require('fs');
2var util = require('util');
3var stream = require('stream');
4var Readable = stream.Readable;
5var Writable = stream.Writable;
6var PassThrough = stream.PassThrough;
7var Pend = require('pend');
8var EventEmitter = require('events').EventEmitter;
9
10exports.createFromBuffer = createFromBuffer;
11exports.createFromFd = createFromFd;
12exports.BufferSlicer = BufferSlicer;
13exports.FdSlicer = FdSlicer;
14
/**
 * Event-emitting wrapper around an already-open file descriptor.
 *
 * Each instance owns a Pend queue capped at one concurrent job; the read and
 * write helpers schedule their fs calls through it.
 *
 * @param {*} fd - Descriptor stored as-is and later handed to the fs calls.
 * @param {Object} [options]
 * @param {*} [options.autoClose] - Coerced to a boolean and stored on the
 *   instance as `autoClose`.
 */
function FdSlicer(fd, options) {
  EventEmitter.call(this);

  var settings = options || {};
  var queue = new Pend();
  queue.max = 1;

  this.fd = fd;
  this.pend = queue;
  this.refCount = 0;
  this.autoClose = Boolean(settings.autoClose);
}
util.inherits(FdSlicer, EventEmitter);
26
/**
 * Queues an fs.read on this slicer's descriptor.
 *
 * The arguments are forwarded to fs.read unchanged. When the read finishes,
 * the queue slot is released first and then `callback` is invoked with
 * (err, bytesRead, buffer) exactly as fs.read reported them.
 */
FdSlicer.prototype.read = function(buffer, offset, length, position, callback) {
  var slicer = this;

  function job(release) {
    fs.read(slicer.fd, buffer, offset, length, position, onRead);

    function onRead(err, bytesRead, filled) {
      // Release the queue slot before handing control to the caller.
      release();
      callback(err, bytesRead, filled);
    }
  }

  slicer.pend.go(job);
};
36
/**
 * Queues an fs.write on this slicer's descriptor.
 *
 * The arguments are forwarded to fs.write unchanged. When the write finishes,
 * the queue slot is released first and then `callback` is invoked with
 * (err, written, buffer) exactly as fs.write reported them.
 */
FdSlicer.prototype.write = function(buffer, offset, length, position, callback) {
  var slicer = this;

  function job(release) {
    fs.write(slicer.fd, buffer, offset, length, position, onWritten);

    function onWritten(err, written, source) {
      // Release the queue slot before handing control to the caller.
      release();
      callback(err, written, source);
    }
  }

  slicer.pend.go(job);
};
46
/**
 * Builds a ReadStream bound to this slicer.
 *
 * @param {Object} [options] - Passed through to the ReadStream constructor.
 * @returns {ReadStream} The newly constructed stream.
 */
FdSlicer.prototype.createReadStream = function(options) {
  var stream = new ReadStream(this, options);
  return stream;
};
50
/**
 * Builds a WriteStream bound to this slicer.
 *
 * @param {Object} [options] - Passed through to the WriteStream constructor.
 * @returns {WriteStream} The newly constructed stream.
 */
FdSlicer.prototype.createWriteStream = function(options) {
  var stream = new WriteStream(this, options);
  return stream;
};
54
/**
 * Registers one more user of the descriptor by bumping `refCount` by one.
 */
FdSlicer.prototype.ref = function() {
  this.refCount = this.refCount + 1;
};
58
/**
 * Drops one reference to the descriptor.
 *
 * While references remain nothing else happens. When the count reaches zero
 * and `autoClose` is set, the descriptor is closed with fs.close; the slicer
 * then emits 'error' (with the close error) or 'close'.
 *
 * @throws {Error} "invalid unref" when the count would drop below zero.
 */
FdSlicer.prototype.unref = function() {
  var slicer = this;
  slicer.refCount -= 1;

  if (slicer.refCount > 0) return;
  if (slicer.refCount < 0) throw new Error("invalid unref");
  if (!slicer.autoClose) return;

  fs.close(slicer.fd, function(err) {
    if (err) {
      slicer.emit('error', err);
      return;
    }
    slicer.emit('close');
  });
};
78
/**
 * Readable stream over a slicer-like `context`.
 *
 * Construction takes a reference on the context via `context.ref()`.
 *
 * @param {Object} context - Object exposing `ref()`; stored as `this.context`.
 * @param {Object} [options] - Also forwarded to the Readable constructor.
 * @param {number} [options.start] - Initial position; falsy values become 0.
 * @param {number} [options.end] - Stored unchanged as `endOffset`.
 */
function ReadStream(context, options) {
  var settings = options || {};
  Readable.call(this, settings);

  context.ref();
  this.context = context;

  var begin = settings.start || 0;
  this.start = begin;
  this.endOffset = settings.end;
  this.pos = begin;
  this.destroyed = false;
}
util.inherits(ReadStream, Readable);
92
/**
 * Readable `_read` hook: fetches the next chunk from the context's descriptor.
 *
 * The chunk size is the smaller of the requested size and the stream's
 * highWaterMark, further limited by `endOffset` when one is set. Once nothing
 * is left to read (end offset reached or fs.read reports 0 bytes) the stream
 * pushes `null`, marks itself destroyed and releases its context reference.
 * The actual fs.read is scheduled through the context's pend queue.
 *
 * @param {number} n - Number of bytes requested by the stream machinery.
 */
ReadStream.prototype._read = function(n) {
  var self = this;
  if (self.destroyed) return;

  var toRead = Math.min(self._readableState.highWaterMark, n);
  if (self.endOffset != null) {
    toRead = Math.min(toRead, self.endOffset - self.pos);
  }
  if (toRead <= 0) {
    // Configured end offset reached: signal EOF and drop our reference.
    self.destroyed = true;
    self.push(null);
    self.context.unref();
    return;
  }
  self.context.pend.go(function(cb) {
    // The stream may have been destroyed while this job waited in the queue.
    if (self.destroyed) return cb();
    // Buffer.allocUnsafe replaces the deprecated `new Buffer(size)`. The
    // uninitialized memory is never exposed: only the first `bytesRead`
    // bytes, which fs.read filled, are pushed downstream.
    var buffer = Buffer.allocUnsafe(toRead);
    fs.read(self.context.fd, buffer, 0, toRead, self.pos, function(err, bytesRead) {
      if (err) {
        self.destroy(err);
      } else if (bytesRead === 0) {
        // End of file.
        self.destroyed = true;
        self.push(null);
        self.context.unref();
      } else {
        self.pos += bytesRead;
        self.push(buffer.slice(0, bytesRead));
      }
      cb();
    });
  });
};
125
/**
 * Tears the stream down once; later calls are no-ops.
 *
 * Marks the stream destroyed, emits 'error' with the given error (or a
 * generic "stream destroyed" error when none is supplied), then releases the
 * reference held on the context.
 *
 * @param {Error} [err] - Error to report through the 'error' event.
 */
ReadStream.prototype.destroy = function(err) {
  if (!this.destroyed) {
    var reason = err || new Error("stream destroyed");
    this.destroyed = true;
    this.emit('error', reason);
    this.context.unref();
  }
};
133
/**
 * Writable stream over a slicer-like `context`.
 *
 * Construction takes a reference on the context via `context.ref()` and
 * registers `destroy` as a 'finish' listener.
 *
 * @param {Object} context - Object exposing `ref()`; stored as `this.context`.
 * @param {Object} [options] - Also forwarded to the Writable constructor.
 * @param {number} [options.start] - Initial position; falsy values become 0.
 * @param {number} [options.end] - Upper bound stored as `endOffset`; coerced
 *   to a number, or Infinity when null/undefined.
 */
function WriteStream(context, options) {
  var settings = options || {};
  Writable.call(this, settings);

  context.ref();
  this.context = context;

  var begin = settings.start || 0;
  var limit = Infinity;
  if (settings.end != null) {
    limit = +settings.end;
  }

  this.start = begin;
  this.endOffset = limit;
  this.bytesWritten = 0;
  this.pos = begin;
  this.destroyed = false;

  var onFinish = this.destroy.bind(this);
  this.on('finish', onFinish);
}
util.inherits(WriteStream, Writable);
150
/**
 * Writable `_write` hook: writes one chunk at the current position.
 *
 * A chunk that would move the position past `endOffset` destroys the stream
 * and fails with an error whose `code` is 'ETOOBIG'. Otherwise the fs.write is
 * scheduled through the context's pend queue; on success the counters advance
 * by the number of bytes fs.write reported and 'progress' is emitted.
 *
 * NOTE(review): when the stream is already destroyed this returns without
 * invoking `callback` - confirm callers rely on that.
 *
 * @param {Buffer} buffer - Chunk to write.
 * @param {string} encoding - Unused.
 * @param {Function} callback - Called with an error, or with nothing on success.
 */
WriteStream.prototype._write = function(buffer, encoding, callback) {
  var stream = this;
  if (stream.destroyed) return;

  var size = buffer.length;
  if (stream.pos + size > stream.endOffset) {
    var tooBig = new Error("maximum file length exceeded");
    tooBig.code = 'ETOOBIG';
    stream.destroy();
    callback(tooBig);
    return;
  }

  stream.context.pend.go(function(release) {
    // The stream may have been destroyed while this job waited in the queue.
    if (stream.destroyed) {
      return release();
    }
    fs.write(stream.context.fd, buffer, 0, size, stream.pos, function(err, bytes) {
      if (err) {
        stream.destroy();
        release();
        callback(err);
        return;
      }
      stream.bytesWritten += bytes;
      stream.pos += bytes;
      stream.emit('progress');
      release();
      callback();
    });
  });
};
179
/**
 * Marks the stream destroyed and releases its context reference.
 * Only the first call has any effect.
 */
WriteStream.prototype.destroy = function() {
  if (!this.destroyed) {
    this.destroyed = true;
    this.context.unref();
  }
};
185
/**
 * Event-emitting wrapper around an in-memory buffer.
 *
 * @param {Buffer} buffer - Backing buffer, stored as `this.buffer`.
 * @param {Object} [options]
 * @param {number} [options.maxChunkSize] - Stored as `maxChunkSize`; falsy
 *   values fall back to Number.MAX_SAFE_INTEGER.
 */
function BufferSlicer(buffer, options) {
  EventEmitter.call(this);

  var settings = options || {};
  var chunkLimit = settings.maxChunkSize;
  if (!chunkLimit) {
    chunkLimit = Number.MAX_SAFE_INTEGER;
  }

  this.refCount = 0;
  this.buffer = buffer;
  this.maxChunkSize = chunkLimit;
}
util.inherits(BufferSlicer, EventEmitter);
195
/**
 * Copies bytes out of the backing buffer, mirroring the fs.read signature.
 *
 * The requested range is clamped to the data that exists, so a read that
 * crosses the end of the backing buffer reports only the bytes actually
 * copied, and a read starting at or past the end reports 0 bytes.
 *
 * @param {Buffer} buffer - Destination buffer.
 * @param {number} offset - Offset in `buffer` at which to start writing.
 * @param {number} length - Maximum number of bytes to copy.
 * @param {number} position - Offset in the backing buffer to start reading.
 * @param {Function} callback - Invoked asynchronously (via setImmediate) with
 *   (null, bytesRead, buffer).
 */
BufferSlicer.prototype.read = function(buffer, offset, length, position, callback) {
  var source = this.buffer;
  var start = Math.min(position, source.length);
  var end = Math.min(position + length, source.length);
  // copy() returns the number of bytes it really transferred, which is the
  // only trustworthy "bytes read" figure near the end of the data.
  var bytesRead = source.copy(buffer, offset, start, end);
  setImmediate(function() {
    callback(null, bytesRead, buffer);
  });
};
205
/**
 * Copies `length` bytes of `buffer`, starting at `offset`, into the backing
 * buffer at `position`, mirroring the fs.write signature.
 *
 * The copy happens synchronously; `callback` is invoked on a later tick (via
 * setImmediate) with (null, length, buffer).
 */
BufferSlicer.prototype.write = function(buffer, offset, length, position, callback) {
  var sourceEnd = offset + length;
  buffer.copy(this.buffer, position, offset, sourceEnd);

  function notify() {
    callback(null, length, buffer);
  }
  setImmediate(notify);
};
212
/**
 * Returns a PassThrough stream pre-loaded with a slice of the backing buffer.
 *
 * The slice runs from `options.start` (falsy -> 0) to `options.end` (falsy ->
 * end of the buffer). It is written in pieces of at most `maxChunkSize` bytes
 * and the stream is ended before it is returned. The returned stream's
 * `destroy` is replaced by a function that only sets `destroyed`.
 *
 * @param {Object} [options] - Also forwarded to the PassThrough constructor.
 * @returns {PassThrough} Stream carrying `start`, `endOffset`, `pos` and
 *   `destroyed` properties.
 */
BufferSlicer.prototype.createReadStream = function(options) {
  var settings = options || {};
  var out = new PassThrough(settings);
  out.destroyed = false;
  out.start = settings.start || 0;
  out.endOffset = settings.end;
  // Everything is written before returning, so the position is already final.
  out.pos = out.endOffset || this.buffer.length;

  var payload = this.buffer.slice(out.start, out.pos);
  var step = this.maxChunkSize;
  var cursor = 0;

  // Full-size pieces first; whatever remains goes out as the last piece.
  for (; cursor + step < payload.length; cursor += step) {
    out.write(payload.slice(cursor, cursor + step));
  }
  if (cursor < payload.length) {
    out.write(payload.slice(cursor, payload.length));
  }

  out.end();
  out.destroy = function() {
    out.destroyed = true;
  };
  return out;
};
244
245BufferSlicer.prototype.createWriteStream = function(options) {
246 var bufferSlicer = this;
247 options = options || {};
248 var writeStream = new Writable(options);
249 writeStream.start = options.start || 0;
250 writeStream.endOffset = (options.end == null) ? this.buffer.length : +options.end;
251 writeStream.bytesWritten = 0;
252 writeStream.pos = writeStream.start;
253 writeStream.destroyed = false;
254 writeStream._write = function(buffer, encoding, callback) {
255 if (writeStream.destroyed) return;
256
257 var end = writeStream.pos + buffer.length;
258 if (end > writeStream.endOffset) {
259 var err = new Error("maximum file length exceeded");
260 err.code = 'ETOOBIG';
261 writeStream.destroyed = true;
262 callback(err);
263 return;
264 }
265 buffer.copy(bufferSlicer.buffer, writeStream.pos, 0, buffer.length);
266
267 writeStream.bytesWritten += buffer.length;
268 writeStream.pos = end;
269 writeStream.emit('progress');
270 callback();
271 };
272 writeStream.destroy = function() {
273 writeStream.destroyed = true;
274 };
275 return writeStream;
276};
277
/**
 * Registers one more user of the buffer by bumping `refCount` by one.
 */
BufferSlicer.prototype.ref = function() {
  this.refCount = this.refCount + 1;
};
281
/**
 * Drops one reference by decrementing `refCount`.
 *
 * @throws {Error} "invalid unref" when the count drops below zero.
 */
BufferSlicer.prototype.unref = function() {
  var remaining = this.refCount - 1;
  this.refCount = remaining;

  if (remaining < 0) throw new Error("invalid unref");
};
289
/**
 * Factory wrapper around the BufferSlicer constructor.
 *
 * @param {Buffer} buffer - Passed through to BufferSlicer.
 * @param {Object} [options] - Passed through to BufferSlicer.
 * @returns {BufferSlicer} The new slicer.
 */
function createFromBuffer(buffer, options) {
  var slicer = new BufferSlicer(buffer, options);
  return slicer;
}
293
/**
 * Factory wrapper around the FdSlicer constructor.
 *
 * @param {*} fd - Passed through to FdSlicer.
 * @param {Object} [options] - Passed through to FdSlicer.
 * @returns {FdSlicer} The new slicer.
 */
function createFromFd(fd, options) {
  var slicer = new FdSlicer(fd, options);
  return slicer;
}