Throwing a synchronous error in a custom writable stream implementation doesn't immediately stop execution #41846

Closed
ceeser opened this issue Feb 4, 2022 · 2 comments

Comments


ceeser commented Feb 4, 2022

Version

v14.18.2

Platform

macOS Monterey - Darwin Kernel Version 21.3.0 root:xnu-8019.80.24~20/RELEASE_X86_64 x86_64

Subsystem

Stream Writable

What steps will reproduce the bug?

When piping data into a custom writable stream, throwing synchronously from the write() implementation doesn't immediately stop the stream. Buffered chunks continue to be processed until the highWaterMark is hit, at which point the error is handled. I believe this is because onWrite uses process.nextTick to handle the errors; the behavior was introduced in this PR.

The following script triggers the problem. The 'error' handler should fire when the counter is 50, but it actually fires when the counter is 64 (given that highWaterMark is left at its objectMode default of 16):

const stream = require('stream');

const STREAM_ERR_VAL = 50;

const myWritable = function(fn) {
  return new stream.Writable({
    write(chunk, encoding, cb) {
      if (!fn) {
        return cb();
      }

      let res;
      try {
        const args = [chunk];
        // perform some async task here
        res = fn.apply(null, args);

      } catch (ex) {
        // Failed when executing synchronously:
        // const sync = this._writableState.sync;
        // if (sync) {
        //   this._writableState.sync = false;
        // }
        return cb(ex);
        // this._writableState.sync = sync;

      }

      const is_promise = res && typeof res.then === 'function' && typeof res.catch === 'function';

      if (is_promise) {
        res
          .then(() => { cb(); })
          .catch(e => { cb(e); });
      } else {
        cb();
      }

    },
    objectMode: true,
  });
};

function noop() {
  return new stream.PassThrough({ objectMode: true, autoDestroy: true });
}


// start a stream that sends numbers consecutively.
function _numStream() {
  const s = noop();
  for (let i=0; i<100; i++) {
    console.log(`Pipe: ${i}`);
    s.write(i);
  }
  s.end();
  return s;
}

function sink(fn) {
  return myWritable(fn);
}

function runStreamTest () {
  let counter = 0;
  _numStream()
    .pipe(sink(function(val) {
      // do something with val;
      counter++
      // throw in the middle of the stream.
      if (counter === STREAM_ERR_VAL) { throw new Error("TEST ERROR"); }
      // return some promise that needs to be resolved before the stream can continue
      // processing chunks
      return Promise.resolve();
    }))
    .on('error', function() {
      if (counter === STREAM_ERR_VAL) {
        console.log("Handled correctly! Error thrown at " + counter);
      } else {
        console.log("Failed to handle correctly. Error thrown at:  " + counter);
      }
    })
    .on('finish', function() {
      throw new Error("Error not handled correctly");
    });
}

runStreamTest();

How often does it reproduce? Is there a required condition?

It reproduces 100% of the time.

What is the expected behavior?

The script should output:

Handled correctly! Error thrown at 50 

What do you see instead?

The script outputs:

Failed to handle correctly. Error thrown at:  64 

Additional information

I would have expected this code to run without errors, since the callback is invoked in both the then and catch arms of the promise. However, it seems that deferring onWriteError to nextTick allows subsequent buffered chunks to be processed before onWriteError is called when the error is thrown.

If, however, onWriteError is called immediately (not on the next tick), the script executes successfully, stopping execution when counter == 50, as expected.
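For what it's worth, one user-land way to avoid doing work on the already-buffered chunks is to remember the first failure inside write() and short-circuit later calls. This is only a minimal sketch; the guardedWritable helper below is illustrative and not part of the original report:

const { Writable } = require('stream');

// Illustrative helper (not from the original report): remember the first
// error raised by fn and skip chunks that were buffered before the
// nextTick'd error handling runs.
function guardedWritable(fn) {
  let failed = null;
  return new Writable({
    objectMode: true,
    write(chunk, encoding, cb) {
      // A chunk that was buffered before the failure surfaced: drop it.
      if (failed) return cb();
      let res;
      try {
        res = fn(chunk);
      } catch (ex) {
        failed = ex;
        return cb(ex);
      }
      // Treat plain return values and promises uniformly.
      Promise.resolve(res).then(() => cb(), (e) => { failed = e; cb(e); });
    },
  });
}

With that guard, fn is never invoked for the chunks queued after the throw, so in the reproduction above the counter should still read 50 when the 'error' event fires.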

@benjamingr
Member

Thanks for the issue - I don't think it's a bug though?

cc @nodejs/streams

Member

mcollina commented Feb 4, 2022

This works as expected. Node.js streams are a callback API at their core. There is no built-in try-catch and you have to handle your exceptions.

I would recommend using async iterators instead, as they are a significantly friendlier API with almost no gotchas.
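As a rough illustration of that suggestion (not code from this thread), consuming the source with for await observes the throw on the exact chunk, because each chunk is awaited before the next one is pulled:

const { Readable } = require('stream');

async function run() {
  // Same 0..99 sequence as the reproduction script.
  const source = Readable.from(Array.from({ length: 100 }, (_, i) => i));

  let counter = 0;
  try {
    for await (const val of source) {
      counter++;
      if (counter === 50) throw new Error('TEST ERROR');
      await Promise.resolve(); // stand-in for the per-chunk async work
    }
  } catch (err) {
    console.log('Error thrown at ' + counter); // logs 50
  }
}

run();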

mcollina closed this as completed Feb 4, 2022