From 7f6a0ed548822f88a744bb2e0c014f6cc1bc4ca7 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Sun, 11 Aug 2019 15:29:30 +0200 Subject: [PATCH] fs: allow overriding fs for streams Allow overriding open, write, and close when using createReadStream() and createWriteStream(). PR-URL: https://github.com/nodejs/node/pull/29083 Refs: https://github.com/nodejs/node/issues/29050 Reviewed-By: Matteo Collina Reviewed-By: Anna Henningsen Reviewed-By: Rich Trott --- doc/api/fs.md | 26 ++++- lib/internal/fs/streams.js | 128 ++++++++++++++++------- test/parallel/test-fs-read-stream.js | 13 ++- test/parallel/test-fs-write-stream-fs.js | 38 +++++++ 4 files changed, 164 insertions(+), 41 deletions(-) create mode 100644 test/parallel/test-fs-write-stream-fs.js diff --git a/doc/api/fs.md b/doc/api/fs.md index a6372f3b115337..11f03c8a2caac7 100644 --- a/doc/api/fs.md +++ b/doc/api/fs.md @@ -1674,6 +1674,10 @@ changes: - version: v2.3.0 pr-url: https://github.com/nodejs/node/pull/1845 description: The passed `options` object can be a string now. + - version: REPLACEME + pr-url: https://github.com/nodejs/node/pull/REPLACEME + description: The `fs` options allow overriding the used `fs` + implementation. --> * `path` {string|Buffer|URL} @@ -1688,7 +1692,8 @@ changes: * `start` {integer} * `end` {integer} **Default:** `Infinity` * `highWaterMark` {integer} **Default:** `64 * 1024` -* Returns: {fs.ReadStream} + * `fs` {Object|null} **Default:** `null` +* Returns: {fs.ReadStream} See [Readable Stream][]. Unlike the 16 kb default `highWaterMark` for a readable stream, the stream returned by this method has a default `highWaterMark` of 64 kb. @@ -1715,6 +1720,10 @@ By default, the stream will not emit a `'close'` event after it has been destroyed. This is the opposite of the default for other `Readable` streams. Set the `emitClose` option to `true` to change this behavior. +By providing the `fs` option it is possible to override the corresponding `fs` +implementations for `open`, `read` and `close`. When providing the `fs` option, +you must override `open`, `close` and `read`. + ```js const fs = require('fs'); // Create a stream from some character device. @@ -1768,6 +1777,10 @@ changes: - version: v2.3.0 pr-url: https://github.com/nodejs/node/pull/1845 description: The passed `options` object can be a string now. + - version: REPLACEME + pr-url: https://github.com/nodejs/node/pull/REPLACEME + description: The `fs` options allow overriding the used `fs` + implementation. --> * `path` {string|Buffer|URL} @@ -1780,7 +1793,8 @@ changes: * `autoClose` {boolean} **Default:** `true` * `emitClose` {boolean} **Default:** `false` * `start` {integer} -* Returns: {fs.WriteStream} + * `fs` {Object|null} **Default:** `null` +* Returns: {fs.WriteStream} See [Writable Stream][]. `options` may also include a `start` option to allow writing data at some position past the beginning of the file, allowed values are in the @@ -1799,6 +1813,12 @@ By default, the stream will not emit a `'close'` event after it has been destroyed. This is the opposite of the default for other `Writable` streams. Set the `emitClose` option to `true` to change this behavior. +By providing the `fs` option it is possible to override the corresponding `fs` +implementations for `open`, `write`, `writev` and `close`. Overriding `write()` +without `writev()` can reduce performance as some optimizations (`_writev()`) +will be disabled. When providing the `fs` option, you must override `open`, +`close` and at least one of `write` and `writev`. + Like [`ReadStream`][], if `fd` is specified, [`WriteStream`][] will ignore the `path` argument and will use the specified file descriptor. This means that no `'open'` event will be emitted. `fd` should be blocking; non-blocking `fd`s @@ -5520,6 +5540,7 @@ the file contents. [`Number.MAX_SAFE_INTEGER`]: https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Number/MAX_SAFE_INTEGER [`ReadDirectoryChangesW`]: https://docs.microsoft.com/en-us/windows/desktop/api/winbase/nf-winbase-readdirectorychangesw [`ReadStream`]: #fs_class_fs_readstream +[Readable Stream]: #stream_class_stream_readable [`URL`]: url.html#url_the_whatwg_url_api [`UV_THREADPOOL_SIZE`]: cli.html#cli_uv_threadpool_size_size [`WriteStream`]: #fs_class_fs_writestream @@ -5577,3 +5598,4 @@ the file contents. [chcp]: https://ss64.com/nt/chcp.html [inode]: https://en.wikipedia.org/wiki/Inode [support of file system `flags`]: #fs_file_system_flags +[Writable Stream]: #stream_class_stream_writable diff --git a/lib/internal/fs/streams.js b/lib/internal/fs/streams.js index b17ebe5cb4317e..456b57d2af8f9a 100644 --- a/lib/internal/fs/streams.js +++ b/lib/internal/fs/streams.js @@ -11,6 +11,7 @@ const { } = primordials; const { + ERR_INVALID_ARG_TYPE, ERR_OUT_OF_RANGE, ERR_STREAM_DESTROYED } = require('internal/errors').codes; @@ -28,6 +29,7 @@ const kIoDone = Symbol('kIoDone'); const kIsPerformingIO = Symbol('kIsPerformingIO'); const kMinPoolSpace = 128; +const kFs = Symbol('kFs'); let pool; // It can happen that we expect to read a large chunk of data, and reserve @@ -76,6 +78,23 @@ function ReadStream(path, options) { options.emitClose = false; } + this[kFs] = options.fs || fs; + + if (typeof this[kFs].open !== 'function') { + throw new ERR_INVALID_ARG_TYPE('options.fs.open', 'function', + this[kFs].open); + } + + if (typeof this[kFs].read !== 'function') { + throw new ERR_INVALID_ARG_TYPE('options.fs.read', 'function', + this[kFs].read); + } + + if (typeof this[kFs].close !== 'function') { + throw new ERR_INVALID_ARG_TYPE('options.fs.close', 'function', + this[kFs].close); + } + Readable.call(this, options); // Path will be ignored when fd is specified, so it can be falsy @@ -136,7 +155,7 @@ function _openReadFs(stream) { return; } - fs.open(stream.path, stream.flags, stream.mode, (er, fd) => { + stream[kFs].open(stream.path, stream.flags, stream.mode, (er, fd) => { if (er) { if (stream.autoClose) { stream.destroy(); @@ -186,42 +205,43 @@ ReadStream.prototype._read = function(n) { // the actual read. this[kIsPerformingIO] = true; - fs.read(this.fd, pool, pool.used, toRead, this.pos, (er, bytesRead) => { - this[kIsPerformingIO] = false; - // Tell ._destroy() that it's safe to close the fd now. - if (this.destroyed) return this.emit(kIoDone, er); - - if (er) { - if (this.autoClose) { - this.destroy(); - } - this.emit('error', er); - } else { - let b = null; - // Now that we know how much data we have actually read, re-wind the - // 'used' field if we can, and otherwise allow the remainder of our - // reservation to be used as a new pool later. - if (start + toRead === thisPool.used && thisPool === pool) { - const newUsed = thisPool.used + bytesRead - toRead; - thisPool.used = roundUpToMultipleOf8(newUsed); + this[kFs].read( + this.fd, pool, pool.used, toRead, this.pos, (er, bytesRead) => { + this[kIsPerformingIO] = false; + // Tell ._destroy() that it's safe to close the fd now. + if (this.destroyed) return this.emit(kIoDone, er); + + if (er) { + if (this.autoClose) { + this.destroy(); + } + this.emit('error', er); } else { - // Round down to the next lowest multiple of 8 to ensure the new pool - // fragment start and end positions are aligned to an 8 byte boundary. - const alignedEnd = (start + toRead) & ~7; - const alignedStart = roundUpToMultipleOf8(start + bytesRead); - if (alignedEnd - alignedStart >= kMinPoolSpace) { - poolFragments.push(thisPool.slice(alignedStart, alignedEnd)); + let b = null; + // Now that we know how much data we have actually read, re-wind the + // 'used' field if we can, and otherwise allow the remainder of our + // reservation to be used as a new pool later. + if (start + toRead === thisPool.used && thisPool === pool) { + const newUsed = thisPool.used + bytesRead - toRead; + thisPool.used = roundUpToMultipleOf8(newUsed); + } else { + // Round down to the next lowest multiple of 8 to ensure the new pool + // fragment start and end positions are aligned to an 8 byte boundary. + const alignedEnd = (start + toRead) & ~7; + const alignedStart = roundUpToMultipleOf8(start + bytesRead); + if (alignedEnd - alignedStart >= kMinPoolSpace) { + poolFragments.push(thisPool.slice(alignedStart, alignedEnd)); + } } - } - if (bytesRead > 0) { - this.bytesRead += bytesRead; - b = thisPool.slice(start, start + bytesRead); - } + if (bytesRead > 0) { + this.bytesRead += bytesRead; + b = thisPool.slice(start, start + bytesRead); + } - this.push(b); - } - }); + this.push(b); + } + }); // Move the pool positions, and internal position for reading. if (this.pos !== undefined) @@ -245,7 +265,7 @@ ReadStream.prototype._destroy = function(err, cb) { }; function closeFsStream(stream, cb, err) { - fs.close(stream.fd, (er) => { + stream[kFs].close(stream.fd, (er) => { er = er || err; cb(er); stream.closed = true; @@ -279,6 +299,40 @@ function WriteStream(path, options) { options.emitClose = false; } + this[kFs] = options.fs || fs; + if (typeof this[kFs].open !== 'function') { + throw new ERR_INVALID_ARG_TYPE('options.fs.open', 'function', + this[kFs].open); + } + + if (!this[kFs].write && !this[kFs].writev) { + throw new ERR_INVALID_ARG_TYPE('options.fs.write', 'function', + this[kFs].write); + } + + if (this[kFs].write && typeof this[kFs].write !== 'function') { + throw new ERR_INVALID_ARG_TYPE('options.fs.write', 'function', + this[kFs].write); + } + + if (this[kFs].writev && typeof this[kFs].writev !== 'function') { + throw new ERR_INVALID_ARG_TYPE('options.fs.writev', 'function', + this[kFs].writev); + } + + if (typeof this[kFs].close !== 'function') { + throw new ERR_INVALID_ARG_TYPE('options.fs.close', 'function', + this[kFs].close); + } + + // It's enough to override either, in which case only one will be used. + if (!this[kFs].write) { + this._write = null; + } + if (!this[kFs].writev) { + this._writev = null; + } + Writable.call(this, options); // Path will be ignored when fd is specified, so it can be falsy @@ -335,7 +389,7 @@ function _openWriteFs(stream) { return; } - fs.open(stream.path, stream.flags, stream.mode, (er, fd) => { + stream[kFs].open(stream.path, stream.flags, stream.mode, (er, fd) => { if (er) { if (stream.autoClose) { stream.destroy(); @@ -361,7 +415,7 @@ WriteStream.prototype._write = function(data, encoding, cb) { if (this.destroyed) return cb(new ERR_STREAM_DESTROYED('write')); this[kIsPerformingIO] = true; - fs.write(this.fd, data, 0, data.length, this.pos, (er, bytes) => { + this[kFs].write(this.fd, data, 0, data.length, this.pos, (er, bytes) => { this[kIsPerformingIO] = false; // Tell ._destroy() that it's safe to close the fd now. if (this.destroyed) { @@ -405,7 +459,7 @@ WriteStream.prototype._writev = function(data, cb) { } this[kIsPerformingIO] = true; - fs.writev(this.fd, chunks, this.pos, (er, bytes) => { + this[kFs].writev(this.fd, chunks, this.pos, (er, bytes) => { this[kIsPerformingIO] = false; // Tell ._destroy() that it's safe to close the fd now. if (this.destroyed) { diff --git a/test/parallel/test-fs-read-stream.js b/test/parallel/test-fs-read-stream.js index e33c6dec4ee264..05c86f724680e6 100644 --- a/test/parallel/test-fs-read-stream.js +++ b/test/parallel/test-fs-read-stream.js @@ -31,11 +31,11 @@ const fixtures = require('../common/fixtures'); const fn = fixtures.path('elipses.txt'); const rangeFile = fixtures.path('x.txt'); -{ +function test1(options) { let paused = false; let bytesRead = 0; - const file = fs.createReadStream(fn); + const file = fs.createReadStream(fn, options); const fileSize = fs.statSync(fn).size; assert.strictEqual(file.bytesRead, 0); @@ -88,6 +88,15 @@ const rangeFile = fixtures.path('x.txt'); }); } +test1({}); +test1({ + fs: { + open: common.mustCall(fs.open), + read: common.mustCallAtLeast(fs.read, 1), + close: common.mustCall(fs.close), + } +}); + { const file = fs.createReadStream(fn, { encoding: 'utf8' }); file.length = 0; diff --git a/test/parallel/test-fs-write-stream-fs.js b/test/parallel/test-fs-write-stream-fs.js new file mode 100644 index 00000000000000..2e15f57c764838 --- /dev/null +++ b/test/parallel/test-fs-write-stream-fs.js @@ -0,0 +1,38 @@ +'use strict'; +const common = require('../common'); +const path = require('path'); +const fs = require('fs'); + +const tmpdir = require('../common/tmpdir'); +tmpdir.refresh(); + +{ + const file = path.join(tmpdir.path, 'write-end-test0.txt'); + const stream = fs.createWriteStream(file, { + fs: { + open: common.mustCall(fs.open), + write: common.mustCallAtLeast(fs.write, 1), + close: common.mustCall(fs.close), + } + }); + stream.end('asd'); + stream.on('close', common.mustCall()); +} + + +{ + const file = path.join(tmpdir.path, 'write-end-test1.txt'); + const stream = fs.createWriteStream(file, { + fs: { + open: common.mustCall(fs.open), + write: fs.write, + writev: common.mustCallAtLeast(fs.writev, 1), + close: common.mustCall(fs.close), + } + }); + stream.write('asd'); + stream.write('asd'); + stream.write('asd'); + stream.end(); + stream.on('close', common.mustCall()); +}