Piping from a readable to multiple writables. #2707

Closed
vlopp opened this issue May 16, 2020 · 2 comments

vlopp commented May 16, 2020

I want to pipe from a single readable to multiple writables. Let's assume the writables process the data at different speeds, both slower than the readable source.

Do I need to implement some extra logic to ensure I don't lose any data? Some old SO threads suggest so: https://stackoverflow.com/questions/19553837/node-js-piping-the-same-readable-stream-into-multiple-writable-targets

On the other hand, in the snippet below, the faster Writable simply waits for the slower one, and both consume all of the produced data.

import stream from "stream";

class ReadableStream extends stream.Readable {
  constructor() {
    super({ objectMode: true });
  }

  i = 0;

  _read(size: number) {
    while (true) {
      this.i++;
      // End the stream after 150 objects.
      if (this.i > 150) {
        return this.push(null);
      }

      const resp = this.push({ key: this.i });
      if (!resp) {
        // push() returned false: the internal buffer is full.
        // Deliberately ignore backpressure and push 10 more objects
        // to see whether any of them get dropped.
        for (let i = 0; i < 10; i++) {
          this.i++;
          console.log(`Pushing over limit: ${this.i}`);
          this.push({ key: this.i });
        }
        return;
      }
    }
  }
}

class FastWritable extends stream.Writable {
  constructor() {
    super({ objectMode: true });
  }

  // Consumes one chunk every 20 ms.
  _write(
    chunk: any,
    encoding: string,
    callback: (error?: Error | null) => void
  ) {
    setTimeout(() => {
      console.log("FAST WRITE CONSUMED", chunk.key);
      callback();
    }, 20);
  }
}

class SlowWritable extends stream.Writable {
  constructor() {
    super({ objectMode: true });
  }

  // Consumes one chunk every 1500 ms.
  _write(
    chunk: any,
    encoding: string,
    callback: (error?: Error | null) => void
  ) {
    setTimeout(() => {
      console.log("SLOW WRITE CONSUMED", chunk.key);
      callback();
    }, 1500);
  }
}

const read = new ReadableStream();
// Both pipes are attached in the same synchronous context.
read.pipe(new FastWritable());
read.pipe(new SlowWritable());
// Everything works fine: both streams consume all of the data.

squarewav commented May 21, 2020

Once you call pipe(), the stream switches to flowing mode. So if you want to attach a second pipe() or 'data' event handler, you need to do that in the same calling context, before the stream starts flowing. Otherwise, any 'data' events emitted, or data piped to Writables, in between will be "lost". If you called pipe() and then used process.nextTick() to call pipe() on the same Readable, that context switch alone could, in theory, be enough to lose data.
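
To illustrate, here is a minimal sketch of that losing case (not from your code; the stream names are made up):

import stream from "stream";

// A readable that emits a handful of objects as soon as it starts flowing.
const source = new stream.Readable({
  objectMode: true,
  read() {
    for (let i = 1; i <= 5; i++) this.push({ key: i });
    this.push(null);
  },
});

const makeSink = (name: string) =>
  new stream.Writable({
    objectMode: true,
    write(chunk: any, _encoding, callback) {
      console.log(name, "consumed", chunk.key);
      callback();
    },
  });

// Attached synchronously: sees everything.
source.pipe(makeSink("first"));

process.nextTick(() => {
  // Attached one tick later: the stream is already flowing, so chunks
  // emitted in the meantime never reach this writable (here, typically
  // all of them).
  source.pipe(makeSink("second"));
});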

From the Stream documentation:

If a Readable is switched into flowing mode and there are no consumers available to handle the data, that data will be lost. This can occur, for instance, when the readable.resume() method is called without a listener attached to the 'data' event, or when a 'data' event handler is removed from the stream.

The example from SO you cite violates the API as described above, because it uses a 'data' event handler and then later effectively tries to restart streaming the inputStream. But those chunks are long gone; they were already consumed by the 'data' event handler.

But scanning over your code, it looks correct. While the slow pipe's _write implementation has not yet called callback(), data is buffered. I don't know exactly what happens under the hood, but I think that when the highWaterMark is reached, the Writable side causes pause() to be called on the Readable. Then everything stops until callback() is finally called, at which point resume() is called and the Readable starts flowing again, invoking any 'data' event handlers and pipes.
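
You can watch this happen with your snippet: 'pause' and 'resume' are standard Readable events, so appending these two lines (just instrumentation, for illustration) will log pipe()'s backpressure handling as SlowWritable's buffer fills and drains:

read.on("pause", () => console.log("readable paused"));
read.on("resume", () => console.log("readable resumed"));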

Again, I don't know the details, but my understanding is that pipe() is just built on the 'data' event interface. So think about what you would do if you were using 'data' events yourself: in your 'data' handler you would call write() on the writable; if that returns false, you would pause() the readable and register a 'drain' listener that calls resume(). That's pretty much pipe() in a nutshell, as in the sketch below.
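
Something like this (a rough sketch of the idea, not Node's actual pipe() implementation, which also handles errors, end-of-stream cleanup, and unpipe()):

import stream from "stream";

function naivePipe(src: stream.Readable, dst: stream.Writable) {
  src.on("data", (chunk) => {
    // write() returns false once dst's internal buffer reaches its
    // highWaterMark; stop the flow until dst has drained.
    if (!dst.write(chunk)) {
      src.pause();
      dst.once("drain", () => src.resume());
    }
  });
  src.on("end", () => dst.end());
}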

gireeshpunathil (Member) commented

Closing as answered; please reopen if there is anything outstanding.
