Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: errored stream is disturbed #1134

Merged
merged 19 commits into from
Dec 10, 2021
32 changes: 26 additions & 6 deletions lib/core/util.js
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ function isDestroyed (stream) {
return !stream || !!(stream.destroyed || stream[kDestroyed])
}

function isAborted (stream) {
function isReadableAborted (stream) {
const state = stream && stream._readableState
return isDestroyed(stream) && state && !state.endEmitted
}
Expand Down Expand Up @@ -244,13 +244,32 @@ function validateHandler (handler, method, upgrade) {
// A body is disturbed if it has been read from and it cannot
// be re-used without losing state or data.
function isDisturbed (body) {
if (!body) {
return false
}

if (stream.isDisturbed) {
return stream.isDisturbed(body)
}

const state = body && body._readableState
return !!(body && (
(stream.isDisturbed && stream.isDisturbed(body)) ||
return !!(
body[kBodyUsed] ||
body.readableDidRead || (state && state.dataEmitted) ||
isAborted(body)
))
isReadableAborted(body)
szmarczak marked this conversation as resolved.
Show resolved Hide resolved
)
}

function isErrored (body) {
if (!body) {
return false
}

if (stream.isErrored) {
return stream.isErrored(body)
}

return /state: 'errored'/.test(nodeUtil.inspect(body))
}

function getSocketInfo (socket) {
Expand Down Expand Up @@ -310,8 +329,9 @@ module.exports = {
kEnumerableProperty,
nop,
isDisturbed,
isErrored,
toUSVString: nodeUtil.toUSVString || ((val) => `${val}`),
isAborted,
isReadableAborted,
isBlobLike,
parseOrigin,
parseURL,
Expand Down
9 changes: 3 additions & 6 deletions lib/fetch/body.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,13 @@
const util = require('../core/util')
const { ReadableStreamFrom, toUSVString, isBlobLike } = require('./util')
const { FormData } = require('./formdata')
const { kState, kError } = require('./symbols')
const { kState } = require('./symbols')
const { Blob } = require('buffer')
const { kBodyUsed } = require('../core/symbols')
const assert = require('assert')
const nodeUtil = require('util')
const { NotSupportedError } = require('../core/errors')
const { isErrored } = require('../core/util')

let ReadableStream

Expand Down Expand Up @@ -187,7 +188,7 @@ function extractBody (object, keepalive = false) {
// Whenever one or more bytes are available and stream is not errored,
// enqueue a Uint8Array wrapping an ArrayBuffer containing the available
// bytes into stream.
if (!/state: 'errored'/.test(nodeUtil.inspect(stream))) {
if (!isErrored(stream)) {
controller.enqueue(new Uint8Array(value))
}
}
Expand Down Expand Up @@ -268,10 +269,6 @@ const methods = {
if (this[kState].body) {
const stream = this[kState].body.stream

if (stream[kError]) {
throw stream[kError]
}

if (util.isDisturbed(stream)) {
throw new TypeError('disturbed')
}
Expand Down
8 changes: 4 additions & 4 deletions lib/fetch/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ const {
determineRequestsReferrer,
coarsenedSharedCurrentTime
} = require('./util')
const { kState, kHeaders, kGuard, kRealm, kError } = require('./symbols')
const { kState, kHeaders, kGuard, kRealm } = require('./symbols')
const { AbortError } = require('../core/errors')
const assert = require('assert')
const { safelyExtractBody, cancelBody } = require('./body')
Expand All @@ -45,6 +45,7 @@ const {
const { kHeadersList } = require('../core/symbols')
const EE = require('events')
const { PassThrough, pipeline, compose } = require('stream')
const { isErrored } = require('../core/util')

let ReadableStream

Expand Down Expand Up @@ -1531,7 +1532,6 @@ function httpNetworkFetch (
await pullAlgorithm(controller)
},
async cancel (reason) {
stream[kError] = reason
await cancelAlgorithm(reason)
}
},
Expand Down Expand Up @@ -1742,8 +1742,8 @@ function httpNetworkFetch (
controller.enqueue(new Uint8Array(bytes))

// 8. If stream is errored, then terminate the ongoing fetch.
if (stream[kError]) {
this.context.terminate({ reason: stream[kError] })
if (isErrored(stream)) {
this.context.terminate()
ronag marked this conversation as resolved.
Show resolved Hide resolved
return
}

Expand Down
3 changes: 1 addition & 2 deletions lib/fetch/symbols.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,5 @@ module.exports = {
kSignal: Symbol('signal'),
kState: Symbol('state'),
kGuard: Symbol('guard'),
kRealm: Symbol('realm'),
kError: Symbol('error')
kRealm: Symbol('realm')
}
2 changes: 1 addition & 1 deletion test/node-fetch/main.js
Original file line number Diff line number Diff line change
Expand Up @@ -882,7 +882,7 @@ describe('node-fetch', () => {
return expect(res.text())
.to.eventually.be.rejected
.and.be.an.instanceof(Error)
.and.have.property('name', 'AbortError')
.and.have.property('name', 'TypeError')
szmarczak marked this conversation as resolved.
Show resolved Hide resolved
})
})
})
Expand Down