diff --git a/lib/client-stream.js b/lib/client-stream.js index d3188cfd956..6d9f75d4035 100644 --- a/lib/client-stream.js +++ b/lib/client-stream.js @@ -109,6 +109,12 @@ class StreamHandler extends AsyncResource { }) this.res = res + + const needDrain = res.writableNeedDrain !== undefined + ? res.writableNeedDrain + : res._writableState && res._writableState.needDrain + + return needDrain !== true } onData (chunk) { diff --git a/lib/core/client.js b/lib/core/client.js index d3e316473a7..baecbacfc42 100644 --- a/lib/core/client.js +++ b/lib/core/client.js @@ -374,19 +374,53 @@ class Parser extends HTTPParser { this.headers = null this.shouldKeepAlive = false this.request = null + this.paused = false + + // Parser can't be paused from within a callback. + // Use a buffer in JS land in order to stop further progress while paused. + this.resuming = false + this.queue = [] this._resume = () => { - // TODO: Resume parser. + if (!this.paused || this.resuming) { + return + } + + this.paused = false + + this.resuming = true + while (this.queue.length) { + const [fn, ...args] = this.queue.shift() + + Reflect.apply(fn, this, args) + + if (this.paused) { + this.resuming = false + return + } + } + this.resuming = false + socketResume(socket) } this._pause = () => { - // TODO: Pause parser. + if (this.paused) { + return + } + + this.paused = true + socketPause(socket) } } [HTTPParser.kOnHeaders] (rawHeaders) { + if (this.paused) { + this.queue.push([this[HTTPParser.kOnHeaders], rawHeaders]) + return + } + if (this.headers) { Array.prototype.push.apply(this.headers, rawHeaders) } else { @@ -395,6 +429,11 @@ class Parser extends HTTPParser { } [HTTPParser.kOnExecute] (ret) { + if (this.paused) { + this.queue.push([this[HTTPParser.kOnExecute], ret]) + return + } + const { upgrade, socket } = this if (!Number.isFinite(ret)) { @@ -465,6 +504,12 @@ class Parser extends HTTPParser { [HTTPParser.kOnHeadersComplete] (versionMajor, versionMinor, rawHeaders, method, url, statusCode, statusMessage, upgrade, shouldKeepAlive) { + if (this.paused) { + this.queue.push([this[HTTPParser.kOnHeadersComplete], versionMajor, versionMinor, rawHeaders, method, + url, statusCode, statusMessage, upgrade, shouldKeepAlive]) + return + } + const { client, socket } = this const request = client[kQueue][client[kRunningIdx]] @@ -560,6 +605,11 @@ class Parser extends HTTPParser { } [HTTPParser.kOnBody] (chunk, offset, length) { + if (this.paused) { + this.queue.push([this[HTTPParser.kOnBody], chunk, offset, length]) + return + } + const { socket, statusCode, request } = this if (socket.destroyed) { @@ -578,6 +628,11 @@ class Parser extends HTTPParser { } [HTTPParser.kOnMessageComplete] () { + if (this.paused) { + this.queue.push([this[HTTPParser.kOnMessageComplete]]) + return + } + const { client, socket, statusCode, headers, upgrade, request, trailers } = this if (socket.destroyed) { @@ -585,6 +640,7 @@ class Parser extends HTTPParser { } assert(statusCode >= 100) + assert(this.resuming || (socket._handle && socket._handle.reading)) if (upgrade) { // TODO: When, how and why does this happen? @@ -598,7 +654,6 @@ class Parser extends HTTPParser { this.trailers = null if (statusCode < 200) { - assert(!socket.isPaused()) return } @@ -643,7 +698,6 @@ class Parser extends HTTPParser { // have been queued since then. util.destroy(socket, new InformationalError('reset')) } else { - socketResume(socket) resume(client) } } diff --git a/lib/core/request.js b/lib/core/request.js index d4bed72afb1..63c55ee7929 100644 --- a/lib/core/request.js +++ b/lib/core/request.js @@ -162,6 +162,7 @@ class Request { onBody (chunk, offset, length) { assert(!this.aborted) + assert(!this[kPaused]) if (this[kTimeout] && this[kTimeout].refresh) { this[kTimeout].refresh() diff --git a/test/client-stream.js b/test/client-stream.js index d36d9fc1a10..26934da4146 100644 --- a/test/client-stream.js +++ b/test/client-stream.js @@ -657,3 +657,46 @@ test('stream body destroyed on invalid callback', (t) => { } }) }) + +test('stream needDrain', (t) => { + t.plan(1) + + const server = createServer((req, res) => { + res.end(Buffer.alloc(4096)) + }) + t.tearDown(server.close.bind(server)) + + server.listen(0, () => { + const client = new Client(`http://localhost:${server.address().port}`) + t.tearDown(() => { + client.destroy() + }) + + const dst = new PassThrough() + dst.pause() + + while (dst.write(Buffer.alloc(4096))) { + + } + + const orgWrite = dst.write + dst.write = () => t.fail() + const p = client.stream({ + path: '/', + method: 'GET' + }, () => { + return dst + }) + + setImmediate(() => { + dst.write = (...args) => { + orgWrite.call(dst, ...args) + } + dst.resume() + }) + + p.then(() => { + t.pass() + }) + }) +})