Skip to content

Commit

Permalink
deps: update undici to 5.22.0
Browse files Browse the repository at this point in the history
PR-URL: nodejs#47679
Reviewed-By: Mohammed Keyvanzadeh <mohammadkeyvanzade94@gmail.com>
Reviewed-By: Debadree Chatterjee <debadree333@gmail.com>
Reviewed-By: Matthew Aitken <maitken033380023@gmail.com>
  • Loading branch information
nodejs-github-bot authored and MoLow committed Jul 6, 2023
1 parent 323881f commit 7283486
Show file tree
Hide file tree
Showing 19 changed files with 235 additions and 182 deletions.
67 changes: 22 additions & 45 deletions deps/undici/src/lib/api/api-request.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@
const Readable = require('./readable')
const {
InvalidArgumentError,
RequestAbortedError,
ResponseStatusCodeError
RequestAbortedError
} = require('../core/errors')
const util = require('../core/util')
const { getResolveErrorBodyCallback } = require('./util')
const { AsyncResource } = require('async_hooks')
const { addSignal, removeSignal } = require('./abort-signal')

Expand All @@ -16,13 +16,17 @@ class RequestHandler extends AsyncResource {
throw new InvalidArgumentError('invalid opts')
}

const { signal, method, opaque, body, onInfo, responseHeaders, throwOnError } = opts
const { signal, method, opaque, body, onInfo, responseHeaders, throwOnError, highWaterMark } = opts

try {
if (typeof callback !== 'function') {
throw new InvalidArgumentError('invalid callback')
}

if (highWaterMark && (typeof highWaterMark !== 'number' || highWaterMark < 0)) {
throw new InvalidArgumentError('invalid highWaterMark')
}

if (signal && typeof signal.on !== 'function' && typeof signal.addEventListener !== 'function') {
throw new InvalidArgumentError('signal must be an EventEmitter or EventTarget')
}
Expand Down Expand Up @@ -53,6 +57,7 @@ class RequestHandler extends AsyncResource {
this.context = null
this.onInfo = onInfo || null
this.throwOnError = throwOnError
this.highWaterMark = highWaterMark

if (util.isStream(body)) {
body.on('error', (err) => {
Expand All @@ -73,40 +78,39 @@ class RequestHandler extends AsyncResource {
}

onHeaders (statusCode, rawHeaders, resume, statusMessage) {
const { callback, opaque, abort, context } = this
const { callback, opaque, abort, context, responseHeaders, highWaterMark } = this

const headers = responseHeaders === 'raw' ? util.parseRawHeaders(rawHeaders) : util.parseHeaders(rawHeaders)

if (statusCode < 200) {
if (this.onInfo) {
const headers = this.responseHeaders === 'raw' ? util.parseRawHeaders(rawHeaders) : util.parseHeaders(rawHeaders)
this.onInfo({ statusCode, headers })
}
return
}

const parsedHeaders = util.parseHeaders(rawHeaders)
const parsedHeaders = responseHeaders === 'raw' ? util.parseHeaders(rawHeaders) : headers
const contentType = parsedHeaders['content-type']
const body = new Readable(resume, abort, contentType)
const body = new Readable({ resume, abort, contentType, highWaterMark })

this.callback = null
this.res = body
const headers = this.responseHeaders === 'raw' ? util.parseRawHeaders(rawHeaders) : util.parseHeaders(rawHeaders)

if (callback !== null) {
if (this.throwOnError && statusCode >= 400) {
this.runInAsyncScope(getResolveErrorBodyCallback, null,
{ callback, body, contentType, statusCode, statusMessage, headers }
)
return
} else {
this.runInAsyncScope(callback, null, null, {
statusCode,
headers,
trailers: this.trailers,
opaque,
body,
context
})
}

this.runInAsyncScope(callback, null, null, {
statusCode,
headers,
trailers: this.trailers,
opaque,
body,
context
})
}
}

Expand Down Expand Up @@ -153,33 +157,6 @@ class RequestHandler extends AsyncResource {
}
}

async function getResolveErrorBodyCallback ({ callback, body, contentType, statusCode, statusMessage, headers }) {
if (statusCode === 204 || !contentType) {
body.dump()
process.nextTick(callback, new ResponseStatusCodeError(`Response status code ${statusCode}${statusMessage ? `: ${statusMessage}` : ''}`, statusCode, headers))
return
}

try {
if (contentType.startsWith('application/json')) {
const payload = await body.json()
process.nextTick(callback, new ResponseStatusCodeError(`Response status code ${statusCode}${statusMessage ? `: ${statusMessage}` : ''}`, statusCode, headers, payload))
return
}

if (contentType.startsWith('text/')) {
const payload = await body.text()
process.nextTick(callback, new ResponseStatusCodeError(`Response status code ${statusCode}${statusMessage ? `: ${statusMessage}` : ''}`, statusCode, headers, payload))
return
}
} catch (err) {
// Process in a fallback if error
}

body.dump()
process.nextTick(callback, new ResponseStatusCodeError(`Response status code ${statusCode}${statusMessage ? `: ${statusMessage}` : ''}`, statusCode, headers))
}

function request (opts, callback) {
if (callback === undefined) {
return new Promise((resolve, reject) => {
Expand Down
103 changes: 46 additions & 57 deletions deps/undici/src/lib/api/api-stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@ const { finished, PassThrough } = require('stream')
const {
InvalidArgumentError,
InvalidReturnValueError,
RequestAbortedError,
ResponseStatusCodeError
RequestAbortedError
} = require('../core/errors')
const util = require('../core/util')
const { getResolveErrorBodyCallback } = require('./util')
const { AsyncResource } = require('async_hooks')
const { addSignal, removeSignal } = require('./abort-signal')

Expand Down Expand Up @@ -79,77 +79,66 @@ class StreamHandler extends AsyncResource {
}

onHeaders (statusCode, rawHeaders, resume, statusMessage) {
const { factory, opaque, context, callback } = this
const { factory, opaque, context, callback, responseHeaders } = this

const headers = responseHeaders === 'raw' ? util.parseRawHeaders(rawHeaders) : util.parseHeaders(rawHeaders)

if (statusCode < 200) {
if (this.onInfo) {
const headers = this.responseHeaders === 'raw' ? util.parseRawHeaders(rawHeaders) : util.parseHeaders(rawHeaders)
this.onInfo({ statusCode, headers })
}
return
}

this.factory = null
const headers = this.responseHeaders === 'raw' ? util.parseRawHeaders(rawHeaders) : util.parseHeaders(rawHeaders)
const res = this.runInAsyncScope(factory, null, {
statusCode,
headers,
opaque,
context
})

if (this.throwOnError && statusCode >= 400) {
const headers = this.responseHeaders === 'raw' ? util.parseRawHeaders(rawHeaders) : util.parseHeaders(rawHeaders)
const chunks = []
const pt = new PassThrough()
pt
.on('data', (chunk) => chunks.push(chunk))
.on('end', () => {
const payload = Buffer.concat(chunks).toString('utf8')
this.runInAsyncScope(
callback,
null,
new ResponseStatusCodeError(
`Response status code ${statusCode}${statusMessage ? `: ${statusMessage}` : ''}`,
statusCode,
headers,
payload
)
)
})
.on('error', (err) => {
this.onError(err)
})
this.res = pt
return
}
let res

if (
!res ||
typeof res.write !== 'function' ||
typeof res.end !== 'function' ||
typeof res.on !== 'function'
) {
throw new InvalidReturnValueError('expected Writable')
}
if (this.throwOnError && statusCode >= 400) {
const parsedHeaders = responseHeaders === 'raw' ? util.parseHeaders(rawHeaders) : headers
const contentType = parsedHeaders['content-type']
res = new PassThrough()

res.on('drain', resume)
// TODO: Avoid finished. It registers an unnecessary amount of listeners.
finished(res, { readable: false }, (err) => {
const { callback, res, opaque, trailers, abort } = this
this.callback = null
this.runInAsyncScope(getResolveErrorBodyCallback, null,
{ callback, body: res, contentType, statusCode, statusMessage, headers }
)
} else {
res = this.runInAsyncScope(factory, null, {
statusCode,
headers,
opaque,
context
})

this.res = null
if (err || !res.readable) {
util.destroy(res, err)
if (
!res ||
typeof res.write !== 'function' ||
typeof res.end !== 'function' ||
typeof res.on !== 'function'
) {
throw new InvalidReturnValueError('expected Writable')
}

this.callback = null
this.runInAsyncScope(callback, null, err || null, { opaque, trailers })
// TODO: Avoid finished. It registers an unnecessary amount of listeners.
finished(res, { readable: false }, (err) => {
const { callback, res, opaque, trailers, abort } = this

if (err) {
abort()
}
})
this.res = null
if (err || !res.readable) {
util.destroy(res, err)
}

this.callback = null
this.runInAsyncScope(callback, null, err || null, { opaque, trailers })

if (err) {
abort()
}
})
}

res.on('drain', resume)

this.res = res

Expand Down
9 changes: 7 additions & 2 deletions deps/undici/src/lib/api/readable.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,16 @@ const kAbort = Symbol('abort')
const kContentType = Symbol('kContentType')

module.exports = class BodyReadable extends Readable {
constructor (resume, abort, contentType = '') {
constructor ({
resume,
abort,
contentType = '',
highWaterMark = 64 * 1024 // Same as nodejs fs streams.
}) {
super({
autoDestroy: true,
read: resume,
highWaterMark: 64 * 1024 // Same as nodejs fs streams.
highWaterMark
})

this._readableState.dataEmitted = false
Expand Down
46 changes: 46 additions & 0 deletions deps/undici/src/lib/api/util.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
const assert = require('assert')
const {
ResponseStatusCodeError
} = require('../core/errors')
const { toUSVString } = require('../core/util')

async function getResolveErrorBodyCallback ({ callback, body, contentType, statusCode, statusMessage, headers }) {
assert(body)

let chunks = []
let limit = 0

for await (const chunk of body) {
chunks.push(chunk)
limit += chunk.length
if (limit > 128 * 1024) {
chunks = null
break
}
}

if (statusCode === 204 || !contentType || !chunks) {
process.nextTick(callback, new ResponseStatusCodeError(`Response status code ${statusCode}${statusMessage ? `: ${statusMessage}` : ''}`, statusCode, headers))
return
}

try {
if (contentType.startsWith('application/json')) {
const payload = JSON.parse(toUSVString(Buffer.concat(chunks)))
process.nextTick(callback, new ResponseStatusCodeError(`Response status code ${statusCode}${statusMessage ? `: ${statusMessage}` : ''}`, statusCode, headers, payload))
return
}

if (contentType.startsWith('text/')) {
const payload = toUSVString(Buffer.concat(chunks))
process.nextTick(callback, new ResponseStatusCodeError(`Response status code ${statusCode}${statusMessage ? `: ${statusMessage}` : ''}`, statusCode, headers, payload))
return
}
} catch (err) {
// Process in a fallback if error
}

process.nextTick(callback, new ResponseStatusCodeError(`Response status code ${statusCode}${statusMessage ? `: ${statusMessage}` : ''}`, statusCode, headers))
}

module.exports = { getResolveErrorBodyCallback }
Loading

0 comments on commit 7283486

Please sign in to comment.