From a041722be6bb73c05097f6d328e5e53662b6fc1a Mon Sep 17 00:00:00 2001 From: Calvin Metcalf Date: Thu, 4 May 2017 15:33:14 +0200 Subject: [PATCH] stream: add final method Adds the ability to for write streams to have an _final method which acts similarly to the _flush method that transform streams have but is called before the finish event is emitted and if asynchronous delays the stream from finishing. The `final` option may also be passed in order to set it. PR-URL: https://github.com/nodejs/node/pull/12828 Reviewed-By: James M Snell Reviewed-By: Matteo Collina Reviewed-By: Refael Ackermann --- doc/api/stream.md | 28 ++++- lib/_stream_writable.js | 35 ++++-- ...stream-readable-constructor-set-methods.js | 14 +-- ...tream-transform-constructor-set-methods.js | 24 ++--- .../test-stream-transform-final-sync.js | 100 +++++++++++++++++ test/parallel/test-stream-transform-final.js | 102 ++++++++++++++++++ test/parallel/test-stream-write-final.js | 24 +++++ test/parallel/test-stream2-writable.js | 22 ++++ 8 files changed, 317 insertions(+), 32 deletions(-) create mode 100644 test/parallel/test-stream-transform-final-sync.js create mode 100644 test/parallel/test-stream-transform-final.js create mode 100644 test/parallel/test-stream-write-final.js diff --git a/doc/api/stream.md b/doc/api/stream.md index c2a257cb5a5af4..d5c255177494d6 100644 --- a/doc/api/stream.md +++ b/doc/api/stream.md @@ -1198,7 +1198,8 @@ on the type of stream being created, as detailed in the chart below:

[Writable](#stream_class_stream_writable)

-

[_write][stream-_write], [_writev][stream-_writev]

+

[_write][stream-_write], [_writev][stream-_writev], + [_final][stream-_final]

@@ -1209,7 +1210,8 @@ on the type of stream being created, as detailed in the chart below:

[Duplex](#stream_class_stream_duplex)

-

[_read][stream-_read], [_write][stream-_write], [_writev][stream-_writev]

+

[_read][stream-_read], [_write][stream-_write], [_writev][stream-_writev], + [_final][stream-_final]

@@ -1220,7 +1222,8 @@ on the type of stream being created, as detailed in the chart below:

[Transform](#stream_class_stream_transform)

-

[_transform][stream-_transform], [_flush][stream-_flush]

+

[_transform][stream-_transform], [_flush][stream-_flush], + [_final][stream-_final]

@@ -1279,6 +1282,8 @@ constructor and implement the `writable._write()` method. The [`stream._writev()`][stream-_writev] method. * `destroy` {Function} Implementation for the [`stream._destroy()`][writable-_destroy] method. + * `final` {Function} Implementation for the + [`stream._final()`][stream-_final] method. For example: @@ -1398,6 +1403,22 @@ added: REPLACEME * `callback` {Function} A callback function that takes an optional error argument which is invoked when the writable is destroyed. +#### writable.\_final(callback) + + +* `callback` {Function} Call this function (optionally with an error + argument) when you are done writing any remaining data. + +Note: `_final()` **must not** be called directly. It MAY be implemented +by child classes, and if so, will be called by the internal Writable +class methods only. + +This optional function will be called before the stream closes, delaying the +`finish` event until `callback` is called. This is useful to close resources +or write buffered data before a stream ends. + #### Errors While Writing It is recommended that errors occurring during the processing of the @@ -2115,6 +2136,7 @@ readable buffer so there is nothing for a user to consume. [stream-_transform]: #stream_transform_transform_chunk_encoding_callback [stream-_write]: #stream_writable_write_chunk_encoding_callback_1 [stream-_writev]: #stream_writable_writev_chunks_callback +[stream-_final]: #stream_writable_final_callback [stream-end]: #stream_writable_end_chunk_encoding_callback [stream-pause]: #stream_readable_pause [stream-push]: #stream_readable_push_chunk_encoding diff --git a/lib/_stream_writable.js b/lib/_stream_writable.js index 4e2a19f12c822f..8540d180c75ad7 100644 --- a/lib/_stream_writable.js +++ b/lib/_stream_writable.js @@ -58,6 +58,12 @@ function WritableState(options, stream) { // cast to ints. this.highWaterMark = Math.floor(this.highWaterMark); + // if _final has been called + this.finalCalled = false; + + // if _final has been called + this.finalCalled = false; + // drain event flag. this.needDrain = false; // at the start of calling end() @@ -199,6 +205,9 @@ function Writable(options) { if (typeof options.destroy === 'function') this._destroy = options.destroy; + + if (typeof options.final === 'function') + this._final = options.final; } Stream.call(this); @@ -520,23 +529,37 @@ function needFinish(state) { !state.finished && !state.writing); } - -function prefinish(stream, state) { - if (!state.prefinished) { +function callFinal(stream, state) { + stream._final((err) => { + state.pendingcb--; + if (err) { + stream.emit('error', err); + } state.prefinished = true; stream.emit('prefinish'); + finishMaybe(stream, state); + }); +} +function prefinish(stream, state) { + if (!state.prefinished && !state.finalCalled) { + if (typeof stream._final === 'function') { + state.pendingcb++; + state.finalCalled = true; + process.nextTick(callFinal, stream, state); + } else { + state.prefinished = true; + stream.emit('prefinish'); + } } } function finishMaybe(stream, state) { var need = needFinish(state); if (need) { + prefinish(stream, state); if (state.pendingcb === 0) { - prefinish(stream, state); state.finished = true; stream.emit('finish'); - } else { - prefinish(stream, state); } } return need; diff --git a/test/parallel/test-stream-readable-constructor-set-methods.js b/test/parallel/test-stream-readable-constructor-set-methods.js index e5e3114de456db..1b9f0496b991ec 100644 --- a/test/parallel/test-stream-readable-constructor-set-methods.js +++ b/test/parallel/test-stream-readable-constructor-set-methods.js @@ -1,19 +1,11 @@ 'use strict'; -require('../common'); -const assert = require('assert'); +const common = require('../common'); const Readable = require('stream').Readable; -let _readCalled = false; -function _read(n) { - _readCalled = true; +const _read = common.mustCall(function _read(n) { this.push(null); -} +}); const r = new Readable({ read: _read }); r.resume(); - -process.on('exit', function() { - assert.strictEqual(r._read, _read); - assert(_readCalled); -}); diff --git a/test/parallel/test-stream-transform-constructor-set-methods.js b/test/parallel/test-stream-transform-constructor-set-methods.js index 1423f4de10942d..3e1325c0fd1e27 100644 --- a/test/parallel/test-stream-transform-constructor-set-methods.js +++ b/test/parallel/test-stream-transform-constructor-set-methods.js @@ -1,24 +1,25 @@ 'use strict'; -require('../common'); +const common = require('../common'); const assert = require('assert'); const Transform = require('stream').Transform; -let _transformCalled = false; -function _transform(d, e, n) { - _transformCalled = true; +const _transform = common.mustCall(function _transform(d, e, n) { n(); -} +}); -let _flushCalled = false; -function _flush(n) { - _flushCalled = true; +const _final = common.mustCall(function _final(n) { n(); -} +}); + +const _flush = common.mustCall(function _flush(n) { + n(); +}); const t = new Transform({ transform: _transform, - flush: _flush + flush: _flush, + final: _final }); const t2 = new Transform({}); @@ -34,6 +35,5 @@ assert.throws(() => { process.on('exit', () => { assert.strictEqual(t._transform, _transform); assert.strictEqual(t._flush, _flush); - assert.strictEqual(_transformCalled, true); - assert.strictEqual(_flushCalled, true); + assert.strictEqual(t._final, _final); }); diff --git a/test/parallel/test-stream-transform-final-sync.js b/test/parallel/test-stream-transform-final-sync.js new file mode 100644 index 00000000000000..de3f0904885bb9 --- /dev/null +++ b/test/parallel/test-stream-transform-final-sync.js @@ -0,0 +1,100 @@ +'use strict'; +const common = require('../common'); +const assert = require('assert'); + +const stream = require('stream'); +let state = 0; + +/* +What you do +var stream = new tream.Transform({ + transform: function transformCallback(chunk, _, next) { + // part 1 + this.push(chunk); + //part 2 + next(); + }, + final: function endCallback(done) { + // part 1 + process.nextTick(function () { + // part 2 + done(); + }); + }, + flush: function flushCallback(done) { + // part 1 + process.nextTick(function () { + // part 2 + done(); + }); + } +}); +t.on('data', dataListener); +t.on('end', endListener); +t.on('finish', finishListener); +t.write(1); +t.write(4); +t.end(7, endMethodCallback); + +The order things are called + +1. transformCallback part 1 +2. dataListener +3. transformCallback part 2 +4. transformCallback part 1 +5. dataListener +6. transformCallback part 2 +7. transformCallback part 1 +8. dataListener +9. transformCallback part 2 +10. finalCallback part 1 +11. finalCallback part 2 +12. flushCallback part 1 +13. finishListener +14. endMethodCallback +15. flushCallback part 2 +16. endListener +*/ + +const t = new stream.Transform({ + objectMode: true, + transform: common.mustCall(function(chunk, _, next) { + assert.strictEqual(++state, chunk, 'transformCallback part 1'); + this.push(state); + assert.strictEqual(++state, chunk + 2, 'transformCallback part 2'); + process.nextTick(next); + }, 3), + final: common.mustCall(function(done) { + state++; + assert.strictEqual(state, 10, 'finalCallback part 1'); + state++; + assert.strictEqual(state, 11, 'finalCallback part 2'); + done(); + }, 1), + flush: common.mustCall(function(done) { + state++; + assert.strictEqual(state, 12, 'flushCallback part 1'); + process.nextTick(function() { + state++; + assert.strictEqual(state, 15, 'flushCallback part 2'); + done(); + }); + }, 1) +}); +t.on('finish', common.mustCall(function() { + state++; + assert.strictEqual(state, 13, 'finishListener'); +}, 1)); +t.on('end', common.mustCall(function() { + state++; + assert.strictEqual(state, 16, 'end event'); +}, 1)); +t.on('data', common.mustCall(function(d) { + assert.strictEqual(++state, d + 1, 'dataListener'); +}, 3)); +t.write(1); +t.write(4); +t.end(7, common.mustCall(function() { + state++; + assert.strictEqual(state, 14, 'endMethodCallback'); +}, 1)); diff --git a/test/parallel/test-stream-transform-final.js b/test/parallel/test-stream-transform-final.js new file mode 100644 index 00000000000000..56566152e69165 --- /dev/null +++ b/test/parallel/test-stream-transform-final.js @@ -0,0 +1,102 @@ +'use strict'; +const common = require('../common'); +const assert = require('assert'); + +const stream = require('stream'); +let state = 0; + +/* +What you do +var stream = new tream.Transform({ + transform: function transformCallback(chunk, _, next) { + // part 1 + this.push(chunk); + //part 2 + next(); + }, + final: function endCallback(done) { + // part 1 + process.nextTick(function () { + // part 2 + done(); + }); + }, + flush: function flushCallback(done) { + // part 1 + process.nextTick(function () { + // part 2 + done(); + }); + } +}); +t.on('data', dataListener); +t.on('end', endListener); +t.on('finish', finishListener); +t.write(1); +t.write(4); +t.end(7, endMethodCallback); + +The order things are called + +1. transformCallback part 1 +2. dataListener +3. transformCallback part 2 +4. transformCallback part 1 +5. dataListener +6. transformCallback part 2 +7. transformCallback part 1 +8. dataListener +9. transformCallback part 2 +10. finalCallback part 1 +11. finalCallback part 2 +12. flushCallback part 1 +13. finishListener +14. endMethodCallback +15. flushCallback part 2 +16. endListener +*/ + +const t = new stream.Transform({ + objectMode: true, + transform: common.mustCall(function(chunk, _, next) { + assert.strictEqual(++state, chunk, 'transformCallback part 1'); + this.push(state); + assert.strictEqual(++state, chunk + 2, 'transformCallback part 2'); + process.nextTick(next); + }, 3), + final: common.mustCall(function(done) { + state++; + assert.strictEqual(state, 10, 'finalCallback part 1'); + setTimeout(function() { + state++; + assert.strictEqual(state, 11, 'finalCallback part 2'); + done(); + }, 100); + }, 1), + flush: common.mustCall(function(done) { + state++; + assert.strictEqual(state, 12, 'flushCallback part 1'); + process.nextTick(function() { + state++; + assert.strictEqual(state, 15, 'flushCallback part 2'); + done(); + }); + }, 1) +}); +t.on('finish', common.mustCall(function() { + state++; + assert.strictEqual(state, 13, 'finishListener'); +}, 1)); +t.on('end', common.mustCall(function() { + state++; + assert.strictEqual(state, 16, 'end event'); +}, 1)); +t.on('data', common.mustCall(function(d) { + assert.strictEqual(++state, d + 1, 'dataListener'); +}, 3)); +t.write(1); +t.write(4); +t.end(7, common.mustCall(function() { + state++; + assert.strictEqual(state, 14, 'endMethodCallback'); +}, 1)); diff --git a/test/parallel/test-stream-write-final.js b/test/parallel/test-stream-write-final.js new file mode 100644 index 00000000000000..56537bd7fae94d --- /dev/null +++ b/test/parallel/test-stream-write-final.js @@ -0,0 +1,24 @@ +'use strict'; +const common = require('../common'); +const assert = require('assert'); + +const stream = require('stream'); +let shutdown = false; + +const w = new stream.Writable({ + final: common.mustCall(function(cb) { + assert.strictEqual(this, w); + setTimeout(function() { + shutdown = true; + cb(); + }, 100); + }), + write: function(chunk, e, cb) { + process.nextTick(cb); + } +}); +w.on('finish', common.mustCall(function() { + assert(shutdown); +})); +w.write(Buffer.allocUnsafe(1)); +w.end(Buffer.allocUnsafe(0)); diff --git a/test/parallel/test-stream2-writable.js b/test/parallel/test-stream2-writable.js index c046194d6bf63b..8e1ea9a6034cea 100644 --- a/test/parallel/test-stream2-writable.js +++ b/test/parallel/test-stream2-writable.js @@ -408,3 +408,25 @@ test('finish is emitted if last chunk is empty', function(t) { w.write(Buffer.allocUnsafe(1)); w.end(Buffer.alloc(0)); }); + +test('finish is emitted after shutdown', function(t) { + const w = new W(); + let shutdown = false; + + w._final = common.mustCall(function(cb) { + assert.strictEqual(this, w); + setTimeout(function() { + shutdown = true; + cb(); + }, 100); + }); + w._write = function(chunk, e, cb) { + process.nextTick(cb); + }; + w.on('finish', common.mustCall(function() { + assert(shutdown); + t.end(); + })); + w.write(Buffer.allocUnsafe(1)); + w.end(Buffer.allocUnsafe(0)); +});