Skip to content

Commit

Permalink
stream: add final method
Browse files Browse the repository at this point in the history
Adds the ability to for write streams to have an _final method which acts
similarly to the _flush method that transform streams have but is called before
the finish event is emitted and if asynchronous delays the stream from
finishing.  The `final` option may also be passed in order to set it.

PR-URL: #12828
Reviewed-By: James M Snell <jasnell@gmail.com>
Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
Reviewed-By: Refael Ackermann <refack@gmail.com>
  • Loading branch information
calvinmetcalf authored and jasnell committed May 28, 2017
1 parent 87cef63 commit 07c7f19
Show file tree
Hide file tree
Showing 8 changed files with 317 additions and 32 deletions.
28 changes: 25 additions & 3 deletions doc/api/stream.md
Original file line number Diff line number Diff line change
Expand Up @@ -1198,7 +1198,8 @@ on the type of stream being created, as detailed in the chart below:
<p>[Writable](#stream_class_stream_writable)</p>
</td>
<td>
<p><code>[_write][stream-_write]</code>, <code>[_writev][stream-_writev]</code></p>
<p><code>[_write][stream-_write]</code>, <code>[_writev][stream-_writev]</code>,
<code>[_final][stream-_final]</code></p>
</td>
</tr>
<tr>
Expand All @@ -1209,7 +1210,8 @@ on the type of stream being created, as detailed in the chart below:
<p>[Duplex](#stream_class_stream_duplex)</p>
</td>
<td>
<p><code>[_read][stream-_read]</code>, <code>[_write][stream-_write]</code>, <code>[_writev][stream-_writev]</code></p>
<p><code>[_read][stream-_read]</code>, <code>[_write][stream-_write]</code>, <code>[_writev][stream-_writev]</code>,
<code>[_final][stream-_final]</code></p>
</td>
</tr>
<tr>
Expand All @@ -1220,7 +1222,8 @@ on the type of stream being created, as detailed in the chart below:
<p>[Transform](#stream_class_stream_transform)</p>
</td>
<td>
<p><code>[_transform][stream-_transform]</code>, <code>[_flush][stream-_flush]</code></p>
<p><code>[_transform][stream-_transform]</code>, <code>[_flush][stream-_flush]</code>,
<code>[_final][stream-_final]</code></p>
</td>
</tr>
</table>
Expand Down Expand Up @@ -1279,6 +1282,8 @@ constructor and implement the `writable._write()` method. The
[`stream._writev()`][stream-_writev] method.
* `destroy` {Function} Implementation for the
[`stream._destroy()`][writable-_destroy] method.
* `final` {Function} Implementation for the
[`stream._final()`][stream-_final] method.

For example:

Expand Down Expand Up @@ -1398,6 +1403,22 @@ added: REPLACEME
* `callback` {Function} A callback function that takes an optional error argument
which is invoked when the writable is destroyed.

#### writable.\_final(callback)
<!-- YAML
added: REPLACEME
-->

* `callback` {Function} Call this function (optionally with an error
argument) when you are done writing any remaining data.

Note: `_final()` **must not** be called directly. It MAY be implemented
by child classes, and if so, will be called by the internal Writable
class methods only.

This optional function will be called before the stream closes, delaying the
`finish` event until `callback` is called. This is useful to close resources
or write buffered data before a stream ends.

#### Errors While Writing

It is recommended that errors occurring during the processing of the
Expand Down Expand Up @@ -2115,6 +2136,7 @@ readable buffer so there is nothing for a user to consume.
[stream-_transform]: #stream_transform_transform_chunk_encoding_callback
[stream-_write]: #stream_writable_write_chunk_encoding_callback_1
[stream-_writev]: #stream_writable_writev_chunks_callback
[stream-_final]: #stream_writable_final_callback
[stream-end]: #stream_writable_end_chunk_encoding_callback
[stream-pause]: #stream_readable_pause
[stream-push]: #stream_readable_push_chunk_encoding
Expand Down
35 changes: 29 additions & 6 deletions lib/_stream_writable.js
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,12 @@ function WritableState(options, stream) {
// cast to ints.
this.highWaterMark = Math.floor(this.highWaterMark);

// if _final has been called
this.finalCalled = false;

// if _final has been called
this.finalCalled = false;

// drain event flag.
this.needDrain = false;
// at the start of calling end()
Expand Down Expand Up @@ -199,6 +205,9 @@ function Writable(options) {

if (typeof options.destroy === 'function')
this._destroy = options.destroy;

if (typeof options.final === 'function')
this._final = options.final;
}

Stream.call(this);
Expand Down Expand Up @@ -520,23 +529,37 @@ function needFinish(state) {
!state.finished &&
!state.writing);
}

function prefinish(stream, state) {
if (!state.prefinished) {
function callFinal(stream, state) {
stream._final((err) => {
state.pendingcb--;
if (err) {
stream.emit('error', err);
}
state.prefinished = true;
stream.emit('prefinish');
finishMaybe(stream, state);
});
}
function prefinish(stream, state) {
if (!state.prefinished && !state.finalCalled) {
if (typeof stream._final === 'function') {
state.pendingcb++;
state.finalCalled = true;
process.nextTick(callFinal, stream, state);
} else {
state.prefinished = true;
stream.emit('prefinish');
}
}
}

function finishMaybe(stream, state) {
var need = needFinish(state);
if (need) {
prefinish(stream, state);
if (state.pendingcb === 0) {
prefinish(stream, state);
state.finished = true;
stream.emit('finish');
} else {
prefinish(stream, state);
}
}
return need;
Expand Down
14 changes: 3 additions & 11 deletions test/parallel/test-stream-readable-constructor-set-methods.js
Original file line number Diff line number Diff line change
@@ -1,19 +1,11 @@
'use strict';
require('../common');
const assert = require('assert');
const common = require('../common');

const Readable = require('stream').Readable;

let _readCalled = false;
function _read(n) {
_readCalled = true;
const _read = common.mustCall(function _read(n) {
this.push(null);
}
});

const r = new Readable({ read: _read });
r.resume();

process.on('exit', function() {
assert.strictEqual(r._read, _read);
assert(_readCalled);
});
24 changes: 12 additions & 12 deletions test/parallel/test-stream-transform-constructor-set-methods.js
Original file line number Diff line number Diff line change
@@ -1,24 +1,25 @@
'use strict';
require('../common');
const common = require('../common');
const assert = require('assert');

const Transform = require('stream').Transform;

let _transformCalled = false;
function _transform(d, e, n) {
_transformCalled = true;
const _transform = common.mustCall(function _transform(d, e, n) {
n();
}
});

let _flushCalled = false;
function _flush(n) {
_flushCalled = true;
const _final = common.mustCall(function _final(n) {
n();
}
});

const _flush = common.mustCall(function _flush(n) {
n();
});

const t = new Transform({
transform: _transform,
flush: _flush
flush: _flush,
final: _final
});

const t2 = new Transform({});
Expand All @@ -34,6 +35,5 @@ assert.throws(() => {
process.on('exit', () => {
assert.strictEqual(t._transform, _transform);
assert.strictEqual(t._flush, _flush);
assert.strictEqual(_transformCalled, true);
assert.strictEqual(_flushCalled, true);
assert.strictEqual(t._final, _final);
});
100 changes: 100 additions & 0 deletions test/parallel/test-stream-transform-final-sync.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
'use strict';
const common = require('../common');
const assert = require('assert');

const stream = require('stream');
let state = 0;

/*
What you do
var stream = new tream.Transform({
transform: function transformCallback(chunk, _, next) {
// part 1
this.push(chunk);
//part 2
next();
},
final: function endCallback(done) {
// part 1
process.nextTick(function () {
// part 2
done();
});
},
flush: function flushCallback(done) {
// part 1
process.nextTick(function () {
// part 2
done();
});
}
});
t.on('data', dataListener);
t.on('end', endListener);
t.on('finish', finishListener);
t.write(1);
t.write(4);
t.end(7, endMethodCallback);
The order things are called
1. transformCallback part 1
2. dataListener
3. transformCallback part 2
4. transformCallback part 1
5. dataListener
6. transformCallback part 2
7. transformCallback part 1
8. dataListener
9. transformCallback part 2
10. finalCallback part 1
11. finalCallback part 2
12. flushCallback part 1
13. finishListener
14. endMethodCallback
15. flushCallback part 2
16. endListener
*/

const t = new stream.Transform({
objectMode: true,
transform: common.mustCall(function(chunk, _, next) {
assert.strictEqual(++state, chunk, 'transformCallback part 1');
this.push(state);
assert.strictEqual(++state, chunk + 2, 'transformCallback part 2');
process.nextTick(next);
}, 3),
final: common.mustCall(function(done) {
state++;
assert.strictEqual(state, 10, 'finalCallback part 1');
state++;
assert.strictEqual(state, 11, 'finalCallback part 2');
done();
}, 1),
flush: common.mustCall(function(done) {
state++;
assert.strictEqual(state, 12, 'flushCallback part 1');
process.nextTick(function() {
state++;
assert.strictEqual(state, 15, 'flushCallback part 2');
done();
});
}, 1)
});
t.on('finish', common.mustCall(function() {
state++;
assert.strictEqual(state, 13, 'finishListener');
}, 1));
t.on('end', common.mustCall(function() {
state++;
assert.strictEqual(state, 16, 'end event');
}, 1));
t.on('data', common.mustCall(function(d) {
assert.strictEqual(++state, d + 1, 'dataListener');
}, 3));
t.write(1);
t.write(4);
t.end(7, common.mustCall(function() {
state++;
assert.strictEqual(state, 14, 'endMethodCallback');
}, 1));
Loading

0 comments on commit 07c7f19

Please sign in to comment.