Skip to content

Commit

Permalink
Allow onFinished to turn off default endOnError behavior.
Browse files Browse the repository at this point in the history
  • Loading branch information
vqvu committed Sep 5, 2016
1 parent 4b3e98b commit 53ce50b
Show file tree
Hide file tree
Showing 2 changed files with 109 additions and 10 deletions.
52 changes: 43 additions & 9 deletions lib/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -50,11 +50,19 @@ var Decoder = require('string_decoder').StringDecoder;
* when the Readable ends. If the Readable ended from an error, the error
* should be passed as the first argument to the callback. `onFinished` should
* bind to whatever listener is necessary to detect the Readable's completion.
* It may also optionally return a cleanup function that will be called to
* unbind any listeners that were added. If the callback is called
* multiple times, only the first invocation counts. If the callback is
* called *after* the Readable has already ended (e.g., the `pipe`
* method already called `end`), it will be ignored.
* If the callback is called multiple times, only the first invocation counts.
* If the callback is called *after* the Readable has already ended (e.g., the
* `pipe` method already called `end`), it will be ignored.
*
* The `onFinished` function may optionally return one of the following:
*
* 1. A cleanup function that will be called when the stream ends. It should
* unbind any listeners that were added.
* 2. An object with the following optional properties:
* - `onDestroy` - the cleanup function.
* - `continueOnError` - Whether or not to continue the stream when an
* error is passed to the callback. Set this to `true` if the Readable
* may continue to emit values after errors. Default: `false`.
*
* See [this issue](https://github.com/caolan/highland/issues/490) for a
* discussion on why Highland cannot reliably detect stream completion for
Expand Down Expand Up @@ -128,6 +136,20 @@ var Decoder = require('string_decoder').StringDecoder;
* };
* }).pipe(writable);
*
* // wrapping a Readable that may emit values after errors.
* _(req, function (req, callback) {
* req.on('end', callback)
* .on('error', callback);
*
* return {
* onDestroy: function () {
* req.removeListener('end', callback);
* req.removeListener('error', callback);
* },
* continueOnError: true
* };
* }).pipe(writable);
*
* // creating a stream from events
* _('click', btn).each(handleEvent);
*
Expand Down Expand Up @@ -431,9 +453,20 @@ function defaultReadableOnFinish(readable, callback) {
}

function pipeReadable(xs, onFinish, stream) {
var cleanup = onFinish(xs, streamEndCb);
var response = onFinish(xs, streamEndCb);
var unbound = false;

var cleanup = null;
var endOnError = true;

if (_.isFunction(response)) {
cleanup = response;
}
else if (response != null) {
cleanup = response.cleanup;
endOnError = !response.continueOnError;
}

xs.pipe(stream);

// TODO: Replace with onDestroy in v3.
Expand All @@ -444,13 +477,14 @@ function pipeReadable(xs, onFinish, stream) {
return;
}

unbind();

if (error) {
stream.write(new StreamError(error));
}

stream.end();
if (error == null || endOnError) {
unbind();
stream.end();
}
}

function unbind() {
Expand Down
67 changes: 66 additions & 1 deletion test/test.js
Original file line number Diff line number Diff line change
Expand Up @@ -711,7 +711,7 @@ exports.constructor = {
test.strictEqual(cleanup.callCount, 1);
test.done();
},
'from Readable - custom onFinish handler emits error': function (test) {
'from Readable - custom onFinish handler - emits error': function (test) {
test.expect(2);
var clock = sinon.useFakeTimers();
var rs = new Stream.Readable();
Expand Down Expand Up @@ -760,6 +760,71 @@ exports.constructor = {
test.strictEqual(cleanup.callCount, 1);
test.done();
},
'from Readable - custom onFinish handler - default to end on error': function (test) {
test.expect(2);
var clock = sinon.useFakeTimers();
var rs = new Stream.Readable();
var firstTime = true;

rs._read = function (size) {
// Infinite stream!
};

var error1 = new Error('error1');
var error2 = new Error('error2');
var s = _(rs, function (_rs, callback) {
setTimeout(function () {
callback(error1);
}, 1000);
setTimeout(function () {
callback(error2);
callback();
}, 2000);
});

clock.tick(2000);
clock.restore();

s.pull(errorEquals(test, 'error1'));
s.pull(valueEquals(test, _.nil));

test.done();
},
'from Readable - custom onFinish handler - emits multiple errors': function (test) {
test.expect(3);
var clock = sinon.useFakeTimers();
var rs = new Stream.Readable();
var firstTime = true;

rs._read = function (size) {
// Infinite stream!
};

var error1 = new Error('error1');
var error2 = new Error('error2');
var s = _(rs, function (_rs, callback) {
setTimeout(function () {
callback(error1);
}, 1000);
setTimeout(function () {
callback(error2);
callback();
}, 2000);

return {
continueOnError: true
};
});

clock.tick(2000);
clock.restore();

s.pull(errorEquals(test, 'error1'));
s.pull(errorEquals(test, 'error2'));
s.pull(valueEquals(test, _.nil));

test.done();
},
'throws error for unsupported object': function (test) {
test.expect(1);
test.throws(function () {
Expand Down

0 comments on commit 53ce50b

Please sign in to comment.