Skip to content

Commit

Permalink
[FIX] Prevent FS write from draining Resources content
Browse files Browse the repository at this point in the history
Previously, when writing to an FS-Adapter, a Resources content stream
would be drained, making it impossible to access a Resources content
again.

With this change, by default a Resources content will be buffered while
draining its content stream.

An optional 'drain' parameter can be supplied to skip buffering to lower
the memory demand of the operation in cases where further access to the
resource content is not required.

Alternatively, an optional 'readOnly' parameter can be supplied to skip
buffering and instead create a new readable stream of the written file.
The file will be written with '444' (read only) permissions. This can
also help reducing memory usage.

Add a pessimistic check for drained content streams.

Refactor FS- and memory adapter unit tests.
  • Loading branch information
RandomByte committed Jan 31, 2019
1 parent c8cc5cd commit 370f121
Show file tree
Hide file tree
Showing 8 changed files with 398 additions and 37 deletions.
22 changes: 17 additions & 5 deletions lib/AbstractReaderWriter.js
Original file line number Diff line number Diff line change
Expand Up @@ -25,22 +25,34 @@ class AbstractReaderWriter extends AbstractReader {
* Writes the content of a resource to a path.
*
* @public
* @param {module:@ui5/fs.Resource} resource The Resource to write
* @param {module:@ui5/fs.Resource} resource Resource to write
* @param {Object} [options]
* @param {boolean} [options.readOnly=false] Whether the resource content shall be written read-only
* Do not use in conjunction with the <code>drain</code> option.
* The written file will be used as the new source of this resources content.
* Therefore the written file should not be altered by any means.
* Activating this option might improve overall memory consumption.
* @param {boolean} [options.drain=false] Whether the resource content shall be emptied during the write process.
* Do not use in conjunction with the <code>readOnly</code> option.
* Activating this option might improve overall memory consumption.
* This should be used in cases where this is the last access to the resource.
* E.g. the final write of a resource after all processing is finished.
* @returns {Promise<undefined>} Promise resolving once data has been written
*/
write(resource) {
return this._write(resource);
write(resource, options = {drain: false, readOnly: false}) {
return this._write(resource, options);
}

/**
* Writes the content of a resource to a path.
*
* @abstract
* @protected
* @param {module:@ui5/fs.Resource} resource The Resource to write
* @param {module:@ui5/fs.Resource} resource Resource to write
* @param {Object} [options] Write options, see above
* @returns {Promise<undefined>} Promise resolving once data has been written
*/
_write(resource) {
_write(resource, options) {
throw new Error("Not implemented");
}
}
Expand Down
93 changes: 71 additions & 22 deletions lib/Resource.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,13 @@ const fnFalse = () => false;
* @memberof module:@ui5/fs
*/
class Resource {
/**
* Function for dynamic creation of content streams
*
* @callback module:@ui5/fs.Resource~createStream
* @returns {stream.Readable} A readable stream of a resources content
*/

/**
* The constructor.
*
Expand All @@ -25,8 +32,9 @@ class Resource {
* (cannot be used in conjunction with parameters buffer, stream or createStream)
* @param {Stream} [parameters.stream] Readable stream of the content of this resource
* (cannot be used in conjunction with parameters buffer, string or createStream)
* @param {Function} [parameters.createStream] Function callback that returns a readable stream of the content
* of this resource (cannot be used in conjunction with parameters buffer, string or stream)
* @param {module:@ui5/fs.Resource~createStream} [parameters.createStream] Function callback that returns a readable stream of
* the content of this resource (cannot be used in conjunction with parameters buffer, string or stream).
* In some cases this is the most memory-efficient way to supply resource content
*/
constructor({path, statInfo, buffer, string, createStream, stream, project}) {
if (!path) {
Expand Down Expand Up @@ -74,16 +82,21 @@ class Resource {
* Gets a buffer with the resource content.
*
* @public
* @returns {Promise<Buffer>} A Promise resolving with a buffer of the resource content.
* @returns {Promise<Buffer>} Promise resolving with a buffer of the resource content.
*/
getBuffer() {
return new Promise((resolve, reject) => {
return Promise.resolve().then(() => {
if (this._contentDrained) {
throw new Error(`Content of Resource ${this._path} has been drained. ` +
"This might be caused by requesting resource content after a content stream has been " +
"requested and no new content (e.g. a new stream) has been set.");
}
if (this._buffer) {
resolve(this._buffer);
return this._buffer;
} else if (this._createStream || this._stream) {
resolve(this._getBufferFromStream());
return this._getBufferFromStream();
} else {
reject(new Error(`Resource ${this._path} has no content`));
throw new Error(`Resource ${this._path} has no content`);
}
});
}
Expand All @@ -92,7 +105,7 @@ class Resource {
* Sets a Buffer as content.
*
* @public
* @param {Buffer} buffer A buffer instance
* @param {Buffer} buffer Buffer instance
*/
setBuffer(buffer) {
this._createStream = null;
Expand All @@ -101,23 +114,30 @@ class Resource {
// }
this._stream = null;
this._buffer = buffer;
this._contentDrained = false;
this._streamDrained = false;
}

/**
* Gets a string with the resource content.
*
* @public
* @returns {Promise<string>} A Promise resolving with a string of the resource content.
* @returns {Promise<string>} Promise resolving with the resource content.
*/
getString() {
if (this._contentDrained) {
return Promise.reject(new Error(`Content of Resource ${this._path} has been drained. ` +
"This might be caused by requesting resource content after a content stream has been " +
"requested and no new content (e.g. a new stream) has been set."));
}
return this.getBuffer().then((buffer) => buffer.toString());
}

/**
* Sets a String as content
*
* @public
* @param {string} string A string
* @param {string} string Resource content
*/
setString(string) {
this.setBuffer(Buffer.from(string, "utf8"));
Expand All @@ -127,33 +147,58 @@ class Resource {
* Gets a readable stream for the resource content.
*
* @public
* @returns {stream.Readable} A readable stream for the resource content.
* @returns {stream.Readable} Readable stream for the resource content.
*/
getStream() {
if (this._contentDrained) {
throw new Error(`Content of Resource ${this._path} has been drained. ` +
"This might be caused by requesting resource content after a content stream has been " +
"requested and no new content (e.g. a new stream) has been set.");
}
let contentStream;
if (this._buffer) {
const bufferStream = new stream.PassThrough();
bufferStream.end(this._buffer);
return bufferStream;
contentStream = bufferStream;
} else if (this._createStream || this._stream) {
return this._getStream();
} else {
contentStream = this._getStream();
}
if (!contentStream) {
throw new Error(`Resource ${this._path} has no content`);
}
// If a stream instance is being returned, it will typically get drained be the consumer.
// In that case, further content access will result in an "Content stream has been drained" error.
// However, depending on the execution environment, a resources content stream might have been
// transformed into a buffer. In that case further content access is possible as a buffer can't be
// drained.
// To prevent unexpected "Content stream has been drained" errors caused by changing environments, we flag
// the resource content as "drained" every time a stream is requested. Even if actually a buffer or
// createStream callback is being used.
this._contentDrained = true;
return contentStream;
}

/**
* Sets a readable stream as content.
*
* @public
* @param {stream.Readable} stream readable stream
* @param {stream.Readable|module:@ui5/fs.Resource~createStream} stream Readable stream of the resource content or
callback for dynamic creation of a readable stream
*/
setStream(stream) {
this._buffer = null;
this._createStream = null;
// if (this._stream) { // TODO this may cause strange issues
// this._stream.destroy();
// }
this._stream = stream;
if (typeof stream === "function") {
this._createStream = stream;
this._stream = null;
} else {
this._stream = stream;
this._createStream = null;
}
this._contentDrained = false;
this._streamDrained = false;
}

/**
Expand Down Expand Up @@ -181,7 +226,7 @@ class Resource {
* Gets the resources stat info.
*
* @public
* @returns {fs.Stats} An object representing an fs.Stats instance
* @returns {fs.Stats} Object representing an fs.Stats instance
*/
getStatInfo() {
return this._statInfo;
Expand All @@ -201,10 +246,10 @@ class Resource {
}

/**
* Returns a clone of the resource.
* Returns a clone of the resource. The clones content is independent from that of the original resource
*
* @public
* @returns {Promise<module:@ui5/fs.Resource>} A promise resolving the resource.
* @returns {Promise<module:@ui5/fs.Resource>} Promise resolving with the clone
*/
clone() {
const options = {
Expand Down Expand Up @@ -235,7 +280,7 @@ class Resource {
/**
* Tracing: Get tree for printing out trace
*
* @returns {Object}
* @returns {Object} Trace tree
*/
getPathTree() {
const tree = {};
Expand All @@ -253,12 +298,16 @@ class Resource {
* Returns the content as stream.
*
* @private
* @returns {Function} The stream
* @returns {stream.Readable} Readable stream
*/
_getStream() {
if (this._streamDrained) {
throw new Error(`Content stream of Resource ${this._path} is flagged as drained.`);
}
if (this._createStream) {
return this._createStream();
}
this._streamDrained = true;
return this._stream;
}

Expand Down
70 changes: 60 additions & 10 deletions lib/adapters/FileSystem.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ const path = require("path");
const fs = require("graceful-fs");
const glob = require("globby");
const makeDir = require("make-dir");
const {PassThrough} = require("stream");
const Resource = require("../Resource");
const AbstractAdapter = require("./AbstractAdapter");

Expand Down Expand Up @@ -153,7 +154,7 @@ class FileSystem extends AbstractAdapter {

if (!stat.isDirectory()) {
// Add content
options.createStream = () => {
options.createStream = function() {
return fs.createReadStream(fsPath);
};
}
Expand All @@ -168,10 +169,26 @@ class FileSystem extends AbstractAdapter {
* Writes the content of a resource to a path.
*
* @private
* @param {module:@ui5/fs.Resource} resource The Resource
* @param {module:@ui5/fs.Resource} resource Resource to write
* @param {Object} [options]
* @param {boolean} [options.readOnly] Whether the resource content shall be written read-only
* Do not use in conjunction with the <code>drain</code> option.
* The written file will be used as the new source of this resources content.
* Therefore the written file should not be altered by any means.
* Activating this option might improve overall memory consumption.
* @param {boolean} [options.drain] Whether the resource content shall be emptied during the write process.
* Do not use in conjunction with the <code>readOnly</code> option.
* Activating this option might improve overall memory consumption.
* This should be used in cases where this is the last access to the resource.
* E.g. the final write of a resource after all processing is finished.
* @returns {Promise<undefined>} Promise resolving once data has been written
*/
_write(resource) {
async _write(resource, {drain, readOnly}) {
if (drain && readOnly) {
throw new Error(`Error while writing resource ${resource.getPath()}: ` +
"Do not use options 'drain' and 'readOnly' at the same time.");
}

const relPath = resource.getPath().substr(this._virBasePath.length);
const fsPath = path.join(this._fsBasePath, relPath);
const dirPath = path.dirname(fsPath);
Expand All @@ -182,15 +199,48 @@ class FileSystem extends AbstractAdapter {
fs
}).then(() => {
return new Promise((resolve, reject) => {
const contentStream = resource.getStream();
contentStream.on("error", function(err) {
reject(err);
});
const write = fs.createWriteStream(fsPath);
write.on("error", function(err) {
let contentStream;

if (drain || readOnly) {
// Stream will be drained
contentStream = resource.getStream();

contentStream.on("error", (err) => {
reject(err);
});
} else {
// Transform stream into buffer before writing
contentStream = new PassThrough();
const buffers = [];
contentStream.on("error", (err) => {
reject(err);
});
contentStream.on("data", (data) => {
buffers.push(data);
});
contentStream.on("end", () => {
const buffer = Buffer.concat(buffers);
resource.setBuffer(buffer);
});
resource.getStream().pipe(contentStream);
}

const writeOptions = {};
if (readOnly) {
writeOptions.mode = 0o444; // read only
}

const write = fs.createWriteStream(fsPath, writeOptions);
write.on("error", (err) => {
reject(err);
});
write.on("close", function(ex) {
write.on("close", (ex) => {
if (readOnly) {
// Create new stream from written file
resource.setStream(function() {
return fs.createReadStream(fsPath);
});
}
resolve();
});
contentStream.pipe(write);
Expand Down
Loading

0 comments on commit 370f121

Please sign in to comment.