Skip to content

Commit

Permalink
stream: use ByteLengthQueuingStrategy when not in objectMode
Browse files Browse the repository at this point in the history
Fixes: #46347
PR-URL: #48847
Reviewed-By: James M Snell <jasnell@gmail.com>
Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
  • Loading branch information
CGQAQ authored and marco-ippolito committed Jun 17, 2024
1 parent be309bd commit af29120
Show file tree
Hide file tree
Showing 2 changed files with 64 additions and 5 deletions.
7 changes: 2 additions & 5 deletions lib/internal/webstreams/adapters.js
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ const {

const {
CountQueuingStrategy,
ByteLengthQueuingStrategy,
} = require('internal/webstreams/queuingstrategies');

const {
Expand Down Expand Up @@ -417,11 +418,7 @@ function newReadableStreamFromStreamReadable(streamReadable, options = kEmptyObj
return new CountQueuingStrategy({ highWaterMark });
}

// When not running in objectMode explicitly, we just fall
// back to a minimal strategy that just specifies the highWaterMark
// and no size algorithm. Using a ByteLengthQueuingStrategy here
// is unnecessary.
return { highWaterMark };
return new ByteLengthQueuingStrategy({ highWaterMark });
};

const strategy = evaluateStrategyOrFallback(options?.strategy);
Expand Down
62 changes: 62 additions & 0 deletions test/parallel/test-stream-readable-to-web.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
'use strict';
const common = require('../common');
if (!common.hasCrypto) { common.skip('missing crypto'); }

const { Readable } = require('stream');
const process = require('process');
const { randomBytes } = require('crypto');
const assert = require('assert');

// Based on: https://github.com/nodejs/node/issues/46347#issuecomment-1413886707
// edit: make it cross-platform as /dev/urandom is not available on Windows
{
let currentMemoryUsage = process.memoryUsage().arrayBuffers;

// We initialize a stream, but not start consuming it
const randomNodeStream = new Readable({
read(size) {
randomBytes(size, (err, buffer) => {
if (err) {
// If an error occurs, emit an 'error' event
this.emit('error', err);
return;
}

// Push the random bytes to the stream
this.push(buffer);
});
}
});
// after 2 seconds, it'll get converted to web stream
let randomWebStream;

// We check memory usage every second
// since it's a stream, it shouldn't be higher than the chunk size
const reportMemoryUsage = () => {
const { arrayBuffers } = process.memoryUsage();
currentMemoryUsage = arrayBuffers;

assert(currentMemoryUsage <= 256 * 1024 * 1024);
};
setInterval(reportMemoryUsage, 1000);

// after 1 second we use Readable.toWeb
// memory usage should stay pretty much the same since it's still a stream
setTimeout(() => {
randomWebStream = Readable.toWeb(randomNodeStream);
}, 1000);

// after 2 seconds we start consuming the stream
// memory usage will grow, but the old chunks should be garbage-collected pretty quickly
setTimeout(async () => {
// eslint-disable-next-line no-unused-vars
for await (const _ of randomWebStream) {
// Do nothing, just let the stream flow
}
}, 2000);

setTimeout(() => {
// Test considered passed if we don't crash
process.exit(0);
}, 5000);
}

0 comments on commit af29120

Please sign in to comment.