
Streams: Breaking Change in Behavior after node 17.1 #41700

Closed
markddrake opened this issue Jan 25, 2022 · 12 comments
Labels
stream Issues and PRs related to the stream subsystem.



markddrake commented Jan 25, 2022

Version

17.3.1

Platform

Microsoft Windows NT 10.0.22000.0 x64

Subsystem

Streams

What steps will reproduce the bug?

Run the following simple test case:

const { PassThrough } = require('stream');
const { pipeline, finished } = require('stream/promises');
const fs = require('fs');

class StreamSwitcher extends PassThrough {

  constructor() {
    super()
  }

  // Default to not propagating 'end' to the destination stream.
  pipe(os, options) {
    options = options || { end: false }
    return super.pipe(os, options)
  }

  async _transform(data, enc, callback) {
    this.push(data)
    callback()
  }

}

async function main() {

  let is = fs.createReadStream(__filename)
  const os = fs.createWriteStream(__filename + ".copy")
  const switcher = new StreamSwitcher()

  const streams = [is, switcher, os]
  pipeline(streams).then(() => { console.log('Pipeline Complete') }).catch((e) => { console.log('Pipeline Error:', e) })

  await finished(switcher)
  console.log('Finished')

  // Detach the streams from each other before reusing the write stream.
  streams.forEach((s, i) => {
    s.removeAllListeners('unpipe')
    if (i < streams.length - 1) {
      s.unpipe(streams[i + 1])
    }
    s.removeAllListeners()
  })

  // Destroy the source streams
  is.destroy()
  switcher.destroy()

  is = fs.createReadStream(__filename)
  try {
    console.log('Attempting Append Operation')
    await pipeline(is, os)
    console.log('Append Completed')
  } catch (e) {
    console.log(e)
    throw e
  }

}

main().then(() => { console.log('Success') }).catch((e) => { console.log('Failed', e) })

In node 17.1.0 the "finished" promise resolves, the second pipeline executes and completes, and the output file is twice the size of the input file:

C:\temp>dir
 Volume in drive C has no label.
 Volume Serial Number is 6AA6-883E

 Directory of C:\temp

01/25/2022  02:08 PM    <DIR>          .
01/25/2022  02:08 PM             1,385 test.js
               1 File(s)          1,385 bytes
               1 Dir(s)  39,777,529,856 bytes free

C:\temp>node -v
v17.1.0

C:\temp>node test.js
Finished
Attempting Append Operation
Append Completed
Success

C:\temp>dir
 Volume in drive C has no label.
 Volume Serial Number is 6AA6-883E

 Directory of C:\temp

01/25/2022  02:08 PM    <DIR>          .
01/25/2022  02:08 PM             1,385 test.js
01/25/2022  02:08 PM             2,770 test.js.copy
               2 File(s)          4,155 bytes
               1 Dir(s)  39,777,456,128 bytes free

C:\temp>

In node 17.4.0 the finished promise resolves and the append operation is attempted. However, although no error is thrown, the second copy never completes:

C:\temp>dir
 Volume in drive C has no label.
 Volume Serial Number is 6AA6-883E

 Directory of C:\temp

01/25/2022  02:08 PM    <DIR>          .
01/25/2022  02:08 PM             1,385 test.js
01/25/2022  02:08 PM             2,770 test.js.copy
               2 File(s)          4,155 bytes
               1 Dir(s)  39,772,274,688 bytes free

C:\temp>node -v
v17.4.0

C:\temp>node test.js
Finished
Attempting Append Operation

C:\temp>dir
 Volume in drive C has no label.
 Volume Serial Number is 6AA6-883E

 Directory of C:\temp

01/25/2022  02:08 PM    <DIR>          .
01/25/2022  02:08 PM             1,385 test.js
01/25/2022  02:12 PM             1,385 test.js.copy
               2 File(s)          2,770 bytes
               1 Dir(s)  39,772,274,688 bytes free

C:\temp>

This may be related to #34805 (comment)

How often does it reproduce? Is there a required condition?

See the above test case.

What is the expected behavior?

IMHO the 17.1.0 behavior is correct.

What do you see instead?

See the test case.

Additional information

No response

VoltrexKeyva added the stream label (Issues and PRs related to the stream subsystem) Jan 25, 2022
targos (Member) commented Jan 26, 2022

@nodejs/streams

MrJithil (Member) commented Jan 26, 2022

The actual issue is that the "Attempting Append Operation" fails due to an error that does not propagate to the top-level code.

The reason for this error is that the stream is moving to an ending state. PR #36817 may have introduced this behavior.

Is it the right behavior or not? If not, we can think about a fix.

CC: @ronag

The error is not being thrown because we are stopping the error propagation.
We may need to change the line https://github.com/nodejs/node/blob/master/lib/internal/streams/writable.js#L328 to something like the following:

if (err) {
    process.nextTick(cb, err);
    errorOrDestroy(stream, err, true);
    throw err; // To throw the error
}

markddrake (Author) commented:

@MrJithil, am I correct that your proposed solution would still leave the breaking change in place? It would simply surface an error where no error occurred previously.

markddrake (Author) commented:

BTW, before someone proposes that I could simply create a new write stream over the output file, opened in append mode: consider the (real-world) use case where the pipeline includes zip and encryption streams ahead of the final write stream, and I want to stop the finished/end from propagating to the zip, encryption, and write streams.

mcollina (Member) commented Jan 27, 2022

In my opinion this is not a breaking change because you are overriding a core behavior of streams, pipe, setting a default to not end the following stream. In order for your example to work, you are relying on implementation details of pipeline. By doing this, you are interfering with how pipeline works and you are on your own.

Note that you also have some subtle bugs in your code: in the event that one of the sources errors, your destination stream will be destroyed, making the second block not work.

At a very high level you should not be using pipeline at all for your use case. pipeline is a high-level utility that makes sure all streams are destroyed and resources cleaned up after the dust has settled. pipe is a low-level utility that does not offer any of those guarantees. You should just use pipe and drop all the complications.
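
For illustration, a minimal sketch of the pipe-based approach (a sketch only, not a drop-in replacement; error handling is omitted and the file streams are borrowed from the test case):

const fs = require('fs');
const { once } = require('events');

async function main() {
  const os = fs.createWriteStream(__filename + '.copy');

  // First pass: do not forward 'end', so os stays open.
  const first = fs.createReadStream(__filename);
  first.pipe(os, { end: false });
  await once(first, 'end');

  // Second pass: let pipe end the destination normally.
  const second = fs.createReadStream(__filename);
  second.pipe(os);
  await once(os, 'finish');
}

main().then(() => console.log('Done'), (e) => console.log('Failed', e));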

@ronag do you think there is a way to handle this case anyway?

ronag (Member) commented Jan 27, 2022

That example does not look like good coding practice. I think we landed an { end } option in pipeline; not sure in which version, though. But I believe this works on master:

// pipeline here is the promise version from require('stream/promises').
const os = fs.createWriteStream(__filename + ".copy")
// First pass: { end: false } keeps the destination open.
await pipeline(fs.createReadStream(__filename), os, { end: false })
// Second pass: the destination is allowed to end normally.
await pipeline(fs.createReadStream(__filename), os)

markddrake (Author) commented:

@mcollina @ronag

The example provided was designed to be a minimal test case, and as such does not include adequate error handling or other good coding practices. I have always assumed that this makes it easier to understand the problem.

I do believe that this is a breaking change. Obviously, it is up to you whether you choose to fix it or not. However, if you believe the new behavior is correct, then I would have expected you to mention it in the release notes.

@ronag
Looking at the { end } option, what happens if I have something like this?

const crypto = require('crypto')
const { createGzip } = require('zlib')

const cipherStream = crypto.createCipheriv(this.yadamu.CIPHER, this.yadamu.ENCRYPTION_KEY, this.INITIALIZATION_VECTOR)
const compressor = createGzip()
const os = fs.createWriteStream(__filename + ".compressed_and_encrypted")
await pipeline(fs.createReadStream(__filename), cipherStream, compressor, os, { end: false })
await pipeline(fs.createReadStream(__filename), cipherStream, compressor, os, { end: false })

In this example I would need to stop the 'end' on cipherStream, compressor, and os. I am not sure whether this is possible given the example above.

ronag (Member) commented Jan 27, 2022

// compose (require('stream').compose) combines the streams into a single
// Duplex, so { end: false } keeps cipherStream, compressor, and os open.
const dst = compose(cipherStream, compressor, os)
await pipeline(fs.createReadStream(__filename), dst, { end: false })
await pipeline(fs.createReadStream(__filename), dst)
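
For the compression case above, that might look something like the following (an untested sketch; it assumes a Node build where the promise pipeline accepts { end: false }, and uses zlib's createGzip for the intermediate stage):

const fs = require('fs');
const { compose } = require('stream');
const { pipeline } = require('stream/promises');
const { createGzip } = require('zlib');

async function appendTwice() {
  // Compose the intermediate and final streams into a single destination.
  const dst = compose(createGzip(), fs.createWriteStream(__filename + '.copy.gz'));
  // First pass: keep the composed destination open.
  await pipeline(fs.createReadStream(__filename), dst, { end: false });
  // Second pass: allow the destination to end normally.
  await pipeline(fs.createReadStream(__filename), dst);
}

appendTwice().then(() => console.log('Done'), (e) => console.log('Failed', e));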

ronag (Member) commented Jan 27, 2022

I'm not sure what the breaking change here is. Don't quite understand the original example.

markddrake (Author) commented Jan 27, 2022

The original example is a simple test case showing that, in 17.1, I could include in a pipeline operation a subclass of PassThrough that overrides pipe() to prevent 'end' events from propagating. This stopped the 'end' event from reaching the components to the right of the PassThrough when the reader reached the end of the file.

I could then start a second pipeline operation that would feed additional data to the streams that occurred to the right of the PassThrough in the original pipeline.

In 17.2 this stops working.

Your implementation of { end } would certainly be a much cleaner alternative, but as far as I can tell from the example provided, it only works for the last component of the pipeline, so it won't work if I need to prevent the 'end' event on something other than the last stream, e.g. when I am using streams to manage encryption and/or compression.

markddrake (Author) commented Jan 27, 2022

But I hadn't seen compose() prior to this. I think it will solve my issue; I will test and get back to you. Is there a way of telling which version of node has the { end } implementation?

markddrake (Author) commented:

@ronag Using compose and { end: false } appears to resolve my issue. At this point I've only run some basic tests; I will run the full suite of regression tests over the next few days. It has also allowed me to remove some fairly hairy code that I was always worried was a weak point. Thank you very much.

BTW, in another thread you alluded to async versions of _write() etc., but I still don't see any documentation. Does this mean that the promises library contains a new version of every class that currently relies on one or more functions that take a callback, e.g. Writable or Transform? What about PassThrough?

Do you have a simple example of an async Writable?
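
For context, the pattern I use today wraps a promise in the callback-based API, something like this (a sketch; saveToDatabase stands in for a hypothetical promise-returning operation):

const { Writable } = require('stream');

class AsyncWritable extends Writable {
  // Wrap a promise-returning operation in the callback-based _write API.
  _write(chunk, encoding, callback) {
    saveToDatabase(chunk).then(() => callback(), callback);
  }
}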
