Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

stream: use ByteLengthQueuingStrategy when not in object mode #48847

Merged
merged 14 commits into from
May 12, 2024
7 changes: 2 additions & 5 deletions lib/internal/webstreams/adapters.js
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ const {

const {
CountQueuingStrategy,
ByteLengthQueuingStrategy,
} = require('internal/webstreams/queuingstrategies');

const {
Expand Down Expand Up @@ -452,11 +453,7 @@ function newReadableStreamFromStreamReadable(streamReadable, options = kEmptyObj
return new CountQueuingStrategy({ highWaterMark });
}

// When not running in objectMode explicitly, we just fall
// back to a minimal strategy that just specifies the highWaterMark
// and no size algorithm. Using a ByteLengthQueuingStrategy here
// is unnecessary.
return { highWaterMark };
return new ByteLengthQueuingStrategy({ highWaterMark });
};

const strategy = evaluateStrategyOrFallback(options?.strategy);
Expand Down
62 changes: 62 additions & 0 deletions test/parallel/test-stream-readable-to-web.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
'use strict';
const common = require('../common');
if (!common.hasCrypto) { common.skip('missing crypto'); }

const { Readable } = require('stream');
const process = require('process');
const { randomBytes } = require('crypto');
const assert = require('assert');

// Based on: https://github.com/nodejs/node/issues/46347#issuecomment-1413886707
// edit: make it cross-platform as /dev/urandom is not available on Windows
{
let currentMemoryUsage = process.memoryUsage().arrayBuffers;

// We initialize a stream, but not start consuming it
const randomNodeStream = new Readable({
read(size) {
randomBytes(size, (err, buffer) => {
if (err) {
// If an error occurs, emit an 'error' event
this.emit('error', err);
return;
}

// Push the random bytes to the stream
this.push(buffer);
});
}
});
// after 2 seconds, it'll get converted to web stream
let randomWebStream;

// We check memory usage every second
// since it's a stream, it shouldn't be higher than the chunk size
const reportMemoryUsage = () => {
const { arrayBuffers } = process.memoryUsage();
currentMemoryUsage = arrayBuffers;

assert(currentMemoryUsage <= 256 * 1024 * 1024);
};
setInterval(reportMemoryUsage, 1000);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if this test could be somewhat flakey? too many timeouts but nonetheless shouldn't be a blocker


// after 1 second we use Readable.toWeb
// memory usage should stay pretty much the same since it's still a stream
setTimeout(() => {
randomWebStream = Readable.toWeb(randomNodeStream);
}, 1000);

// after 2 seconds we start consuming the stream
// memory usage will grow, but the old chunks should be garbage-collected pretty quickly
setTimeout(async () => {
// eslint-disable-next-line no-unused-vars
for await (const _ of randomWebStream) {
// Do nothing, just let the stream flow
}
}, 2000);

setTimeout(() => {
// Test considered passed if we don't crash
process.exit(0);
}, 5000);
}