From 1be85c2dacad8a21419498928a2efc9e21fda14f Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Fri, 25 Sep 2020 18:50:49 +0200 Subject: [PATCH 1/8] fix: Client.stream writableNeedDrain Fixes: https://github.com/nodejs/undici/issues/441 Refs: https://github.com/nodejs/node/pull/35348 Refs: https://github.com/nodejs/node/issues/35341 --- lib/client-stream.js | 6 ++++++ lib/core/client.js | 45 +++++++++++++++++++++++++++++++++++++++++-- lib/core/request.js | 1 + test/client-stream.js | 44 ++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 94 insertions(+), 2 deletions(-) diff --git a/lib/client-stream.js b/lib/client-stream.js index d3188cfd956..6d9f75d4035 100644 --- a/lib/client-stream.js +++ b/lib/client-stream.js @@ -109,6 +109,12 @@ class StreamHandler extends AsyncResource { }) this.res = res + + const needDrain = res.writableNeedDrain !== undefined + ? res.writableNeedDrain + : res._writableState && res._writableState.needDrain + + return needDrain !== true } onData (chunk) { diff --git a/lib/core/client.js b/lib/core/client.js index d3e316473a7..3e1b23fa92d 100644 --- a/lib/core/client.js +++ b/lib/core/client.js @@ -374,19 +374,37 @@ class Parser extends HTTPParser { this.headers = null this.shouldKeepAlive = false this.request = null + this.paused = false + this.queue = [] this._resume = () => { - // TODO: Resume parser. + this.paused = false + + while (this.queue.length) { + const [fn, ...args] = this.queue.shift() + + fn.apply(this, args) + + if (this.paused) { + return + } + } + socketResume(socket) } this._pause = () => { - // TODO: Pause parser. + this.paused = true socketPause(socket) } } [HTTPParser.kOnHeaders] (rawHeaders) { + if (this.paused) { + this.queue.push([this[HTTPParser.kOnHeaders], rawHeaders]) + return + } + if (this.headers) { Array.prototype.push.apply(this.headers, rawHeaders) } else { @@ -395,6 +413,11 @@ class Parser extends HTTPParser { } [HTTPParser.kOnExecute] (ret) { + if (this.paused) { + this.queue.push([this[HTTPParser.kOnExecute], ret]) + return + } + const { upgrade, socket } = this if (!Number.isFinite(ret)) { @@ -465,6 +488,12 @@ class Parser extends HTTPParser { [HTTPParser.kOnHeadersComplete] (versionMajor, versionMinor, rawHeaders, method, url, statusCode, statusMessage, upgrade, shouldKeepAlive) { + if (this.paused) { + this.queue.push([this[HTTPParser.kOnHeadersComplete], versionMajor, versionMinor, rawHeaders, method, + url, statusCode, statusMessage, upgrade, shouldKeepAlive]) + return + } + const { client, socket } = this const request = client[kQueue][client[kRunningIdx]] @@ -560,6 +589,11 @@ class Parser extends HTTPParser { } [HTTPParser.kOnBody] (chunk, offset, length) { + if (this.paused) { + this.queue.push([this[HTTPParser.kOnBody], chunk, offset, length]) + return + } + const { socket, statusCode, request } = this if (socket.destroyed) { @@ -578,6 +612,11 @@ class Parser extends HTTPParser { } [HTTPParser.kOnMessageComplete] () { + if (this.paused) { + this.queue.push([this[HTTPParser.kOnMessageComplete]]) + return + } + const { client, socket, statusCode, headers, upgrade, request, trailers } = this if (socket.destroyed) { @@ -785,6 +824,7 @@ function connect (client) { } function socketPause (socket) { + // TODO: Pause parser. if (socket._handle && socket._handle.reading) { socket._handle.reading = false const err = socket._handle.readStop() @@ -795,6 +835,7 @@ function socketPause (socket) { } function socketResume (socket) { + // TODO: Resume parser. if (socket._handle && !socket._handle.reading) { socket._handle.reading = true const err = socket._handle.readStart() diff --git a/lib/core/request.js b/lib/core/request.js index d4bed72afb1..63c55ee7929 100644 --- a/lib/core/request.js +++ b/lib/core/request.js @@ -162,6 +162,7 @@ class Request { onBody (chunk, offset, length) { assert(!this.aborted) + assert(!this[kPaused]) if (this[kTimeout] && this[kTimeout].refresh) { this[kTimeout].refresh() diff --git a/test/client-stream.js b/test/client-stream.js index d36d9fc1a10..fa558f82e2f 100644 --- a/test/client-stream.js +++ b/test/client-stream.js @@ -657,3 +657,47 @@ test('stream body destroyed on invalid callback', (t) => { } }) }) + +test('stream needDrain', (t) => { + t.plan(1) + + const server = createServer((req, res) => { + res.end(Buffer.alloc(4096)) + }) + t.tearDown(server.close.bind(server)) + + server.listen(0, async () => { + const client = new Client(`http://localhost:${server.address().port}`) + t.tearDown(() => { + console.error(3) + client.destroy() + }) + + const dst = new PassThrough() + dst.pause() + + while (dst.write(Buffer.alloc(4096))) { + + } + + const orgWrite = dst.write + dst.write = () => t.fail() + const p = client.stream({ + path: '/', + method: 'GET' + }, () => { + return dst + }) + + setTimeout(() => { + dst.write = (...args) => { + orgWrite.call(dst, ...args) + } + dst.resume() + }, 1e3) + + await p + + t.pass() + }) +}) From 464cf74d521cccf35200d94acc6b83f4b80b7d59 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Thu, 12 Nov 2020 13:15:14 +0100 Subject: [PATCH 2/8] fixup --- lib/core/client.js | 22 +++++++++++++++------- 1 file changed, 15 insertions(+), 7 deletions(-) diff --git a/lib/core/client.js b/lib/core/client.js index 3e1b23fa92d..3df6330ff60 100644 --- a/lib/core/client.js +++ b/lib/core/client.js @@ -375,19 +375,29 @@ class Parser extends HTTPParser { this.shouldKeepAlive = false this.request = null this.paused = false + this.resuming = false this.queue = [] this._resume = () => { + if (this.resuming) { + return + } + this.paused = false - while (this.queue.length) { - const [fn, ...args] = this.queue.shift() + this.resuming = true + try { + while (this.queue.length) { + const [fn, ...args] = this.queue.shift() - fn.apply(this, args) + Reflect.apply(fn, this, args) - if (this.paused) { - return + if (this.paused) { + return + } } + } finally { + this.resuming = false } socketResume(socket) @@ -824,7 +834,6 @@ function connect (client) { } function socketPause (socket) { - // TODO: Pause parser. if (socket._handle && socket._handle.reading) { socket._handle.reading = false const err = socket._handle.readStop() @@ -835,7 +844,6 @@ function socketPause (socket) { } function socketResume (socket) { - // TODO: Resume parser. if (socket._handle && !socket._handle.reading) { socket._handle.reading = true const err = socket._handle.readStart() From 39da1e25fda420182504c1a98a6a7aa02bd789bc Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Thu, 12 Nov 2020 13:31:49 +0100 Subject: [PATCH 3/8] fixuP --- lib/core/client.js | 23 +++++++++++------------ 1 file changed, 11 insertions(+), 12 deletions(-) diff --git a/lib/core/client.js b/lib/core/client.js index 3df6330ff60..94ddac4d94e 100644 --- a/lib/core/client.js +++ b/lib/core/client.js @@ -375,36 +375,35 @@ class Parser extends HTTPParser { this.shouldKeepAlive = false this.request = null this.paused = false - this.resuming = false this.queue = [] this._resume = () => { - if (this.resuming) { + if (!this.paused) { return } this.paused = false - this.resuming = true - try { - while (this.queue.length) { - const [fn, ...args] = this.queue.shift() + while (this.queue.length) { + const [fn, ...args] = this.queue.shift() - Reflect.apply(fn, this, args) + Reflect.apply(fn, this, args) - if (this.paused) { - return - } + if (this.paused) { + return } - } finally { - this.resuming = false } socketResume(socket) } this._pause = () => { + if (this.paused) { + return + } + this.paused = true + socketPause(socket) } } From 7a402586fb43d9dffa1319dc912a85ebc49ba015 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Fri, 13 Nov 2020 08:45:16 +0100 Subject: [PATCH 4/8] fixup --- lib/core/client.js | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/lib/core/client.js b/lib/core/client.js index 94ddac4d94e..610d073d1aa 100644 --- a/lib/core/client.js +++ b/lib/core/client.js @@ -375,24 +375,28 @@ class Parser extends HTTPParser { this.shouldKeepAlive = false this.request = null this.paused = false + this.resuming = false this.queue = [] this._resume = () => { - if (!this.paused) { + if (!this.paused || this.resuming) { return } this.paused = false + this.resuming = true while (this.queue.length) { const [fn, ...args] = this.queue.shift() Reflect.apply(fn, this, args) if (this.paused) { + this.resuming = false return } } + this.resuming = false socketResume(socket) } From a9b439fb662c5b5f8c47f78141078d7d99eef0c9 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Fri, 13 Nov 2020 09:02:45 +0100 Subject: [PATCH 5/8] fixup --- lib/core/client.js | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/lib/core/client.js b/lib/core/client.js index 610d073d1aa..d1ae5df7b9c 100644 --- a/lib/core/client.js +++ b/lib/core/client.js @@ -637,6 +637,7 @@ class Parser extends HTTPParser { } assert(statusCode >= 100) + assert(this.resuming || (socket._handle && socket._handle.reading)) if (upgrade) { // TODO: When, how and why does this happen? @@ -650,7 +651,6 @@ class Parser extends HTTPParser { this.trailers = null if (statusCode < 200) { - assert(!socket.isPaused()) return } @@ -695,7 +695,6 @@ class Parser extends HTTPParser { // have been queued since then. util.destroy(socket, new InformationalError('reset')) } else { - socketResume(socket) resume(client) } } From a3d11510c6b75d2e187383fcc9447fd91b739cd9 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Fri, 13 Nov 2020 09:03:15 +0100 Subject: [PATCH 6/8] fixup --- test/client-stream.js | 1 - 1 file changed, 1 deletion(-) diff --git a/test/client-stream.js b/test/client-stream.js index fa558f82e2f..816ec8ceb00 100644 --- a/test/client-stream.js +++ b/test/client-stream.js @@ -669,7 +669,6 @@ test('stream needDrain', (t) => { server.listen(0, async () => { const client = new Client(`http://localhost:${server.address().port}`) t.tearDown(() => { - console.error(3) client.destroy() }) From 2e5b606caf145f0741d4bfa867f523cb3c6e065b Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Fri, 13 Nov 2020 09:26:55 +0100 Subject: [PATCH 7/8] fixup --- test/client-stream.js | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/test/client-stream.js b/test/client-stream.js index 816ec8ceb00..26934da4146 100644 --- a/test/client-stream.js +++ b/test/client-stream.js @@ -666,7 +666,7 @@ test('stream needDrain', (t) => { }) t.tearDown(server.close.bind(server)) - server.listen(0, async () => { + server.listen(0, () => { const client = new Client(`http://localhost:${server.address().port}`) t.tearDown(() => { client.destroy() @@ -688,15 +688,15 @@ test('stream needDrain', (t) => { return dst }) - setTimeout(() => { + setImmediate(() => { dst.write = (...args) => { orgWrite.call(dst, ...args) } dst.resume() - }, 1e3) - - await p + }) - t.pass() + p.then(() => { + t.pass() + }) }) }) From 5ab13dce728e0ed33e929eeae5692be781b0ac93 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Fri, 13 Nov 2020 09:29:00 +0100 Subject: [PATCH 8/8] fixup: add comment --- lib/core/client.js | 3 +++ 1 file changed, 3 insertions(+) diff --git a/lib/core/client.js b/lib/core/client.js index d1ae5df7b9c..baecbacfc42 100644 --- a/lib/core/client.js +++ b/lib/core/client.js @@ -375,6 +375,9 @@ class Parser extends HTTPParser { this.shouldKeepAlive = false this.request = null this.paused = false + + // Parser can't be paused from within a callback. + // Use a buffer in JS land in order to stop further progress while paused. this.resuming = false this.queue = []