From 683c368d81957ef5b9f53d966a41c58973a18b21 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Fri, 26 Jan 2024 14:47:21 +0100 Subject: [PATCH] fix: readable body (#2642) --- lib/api/readable.js | 52 +++++++++++++++++++++--------------- test/node-test/large-body.js | 45 +++++++++++++++++++++++++++++++ 2 files changed, 76 insertions(+), 21 deletions(-) create mode 100644 test/node-test/large-body.js diff --git a/lib/api/readable.js b/lib/api/readable.js index 8ea6d512f60..cd4811591a7 100644 --- a/lib/api/readable.js +++ b/lib/api/readable.js @@ -94,7 +94,7 @@ module.exports = class BodyReadable extends Readable { } push (chunk) { - if (this[kConsume] && chunk !== null && this.readableLength === 0) { + if (this[kConsume] && chunk !== null) { consumePush(this[kConsume], chunk) return this[kReading] ? super.push(chunk) : true } @@ -215,26 +215,28 @@ async function consume (stream, type) { reject(rState.errored ?? new TypeError('unusable')) } } else { - stream[kConsume] = { - type, - stream, - resolve, - reject, - length: 0, - body: [] - } + queueMicrotask(() => { + stream[kConsume] = { + type, + stream, + resolve, + reject, + length: 0, + body: [] + } - stream - .on('error', function (err) { - consumeFinish(this[kConsume], err) - }) - .on('close', function () { - if (this[kConsume].body !== null) { - consumeFinish(this[kConsume], new RequestAbortedError()) - } - }) + stream + .on('error', function (err) { + consumeFinish(this[kConsume], err) + }) + .on('close', function () { + if (this[kConsume].body !== null) { + consumeFinish(this[kConsume], new RequestAbortedError()) + } + }) - queueMicrotask(() => consumeStart(stream[kConsume])) + consumeStart(stream[kConsume]) + }) } }) } @@ -246,8 +248,16 @@ function consumeStart (consume) { const { _readableState: state } = consume.stream - for (const chunk of state.buffer) { - consumePush(consume, chunk) + if (state.bufferIndex) { + const start = state.bufferIndex + const end = state.buffer.length + for (let n = start; n < end; n++) { + consumePush(consume, state.buffer[n]) + } + } else { + for (const chunk of state.buffer) { + consumePush(consume, chunk) + } } if (state.endEmitted) { diff --git a/test/node-test/large-body.js b/test/node-test/large-body.js new file mode 100644 index 00000000000..4fbe5b7a9b7 --- /dev/null +++ b/test/node-test/large-body.js @@ -0,0 +1,45 @@ +'use strict' + +const { test } = require('node:test') +const { createServer } = require('http') +const { request } = require('../../') +const { strictEqual } = require('node:assert') + +test('socket should not be reused unless body is consumed', async (t) => { + const LARGE_BODY = 'x'.repeat(10000000) + + const server = createServer((req, res) => { + if (req.url === '/foo') { + res.end(LARGE_BODY) + return + } + if (req.url === '/bar') { + res.end('bar') + return + } + throw new Error('Unexpected request url: ' + req.url) + }) + + await new Promise((resolve) => { server.listen(0, resolve) }) + t.after(() => { server.close() }) + + // Works fine + // const fooRes = await request('http://localhost:3000/foo') + // const fooBody = await fooRes.body.text() + + // const barRes = await request('http://localhost:3000/bar') + // await barRes.body.text() + + const port = server.address().port + + // Fails with: + const fooRes = await request(`http://localhost:${port}/foo`) + const barRes = await request(`http://localhost:${port}/bar`) + + const fooBody = await fooRes.body.text() + await barRes.body.text() + + strictEqual(fooRes.headers['content-length'], String(LARGE_BODY.length)) + strictEqual(fooBody.length, LARGE_BODY.length) + strictEqual(fooBody, LARGE_BODY) +})