Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: return body as consumable body #895

Closed
wants to merge 18 commits into from
17 changes: 10 additions & 7 deletions lib/agent.js
Original file line number Diff line number Diff line change
Expand Up @@ -96,16 +96,19 @@ class Agent extends Dispatcher {

dispatch (opts, handler) {
if (!handler || typeof handler !== 'object') {
throw new InvalidArgumentError('handler')
throw new InvalidArgumentError('handler must be an object.')
}

try {
if (!opts || typeof opts !== 'object') {
throw new InvalidArgumentError('opts must be a object.')
throw new InvalidArgumentError('opts must be an object.')
}

if (typeof opts.origin !== 'string' || opts.origin === '') {
throw new InvalidArgumentError('opts.origin must be a non-empty string.')
let key
if (opts.origin && (typeof opts.origin === 'string' || opts.origin instanceof URL)) {
key = String(opts.origin)
} else {
throw new InvalidArgumentError('opts.origin must be a non-empty string or URL.')
}

if (this[kDestroyed]) {
Expand All @@ -116,7 +119,7 @@ class Agent extends Dispatcher {
throw new ClientClosedError()
}

const ref = this[kClients].get(opts.origin)
const ref = this[kClients].get(key)

let dispatcher = ref ? ref.deref() : null
if (!dispatcher) {
Expand All @@ -126,8 +129,8 @@ class Agent extends Dispatcher {
.on('disconnect', this[kOnDisconnect])
.on('connectionError', this[kOnConnectionError])

this[kClients].set(opts.origin, new WeakRef(dispatcher))
this[kFinalizer].register(dispatcher, opts.origin)
this[kClients].set(key, new WeakRef(dispatcher))
this[kFinalizer].register(dispatcher, key)
}

const { maxRedirections = this[kMaxRedirections] } = opts
Expand Down
114 changes: 107 additions & 7 deletions lib/api/api-request.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,30 @@ const {
InvalidArgumentError,
RequestAbortedError
} = require('../core/errors')
const { Blob } = require('buffer')
const util = require('../core/util')
const { AsyncResource } = require('async_hooks')
const { addSignal, removeSignal } = require('./abort-signal')
const EE = require('events')

const kAbort = Symbol('abort')
const kResume = Symbol('resume')
const kDestroy = Symbol('destroy')
const kPush = Symbol('push')
const kBody = Symbol('body')
const kReadableDidRead = Symbol('readableDidRead')

class RequestResponse extends Readable {
class RequestBody extends Readable {
constructor (resume, abort) {
super({ autoDestroy: true, read: resume })
super({ autoDestroy: true, read: resume, writable: false })
this[kAbort] = abort
this[kReadableDidRead] = false

if (typeof this.readableDidRead !== 'boolean') {
EE.prototype.once.call(this, 'data', function () {
this[kReadableDidRead] = true
})
}
}

_destroy (err, callback) {
Expand All @@ -30,6 +44,92 @@ class RequestResponse extends Readable {
}
}

class Body {
constructor (resume, abort) {
this[kAbort] = abort
this[kResume] = resume
this[kBody] = new RequestBody(this[kResume], this[kAbort]).on('error', () => {})
}

[kPush] (chunk) {
return this[kBody].push(chunk)
}

[kDestroy] (err) {
this[kBody].destroy(err)
}

get stream () {
if (this.bodyUsed) {
throw new TypeError('disturbed')
}
return this[kBody]
}
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is almost like BodyMixin from the web standard with the small exception of how to consume as Node stream.... anyone got suggestions on how that could/should look? @szmarczak @Ethan-Arrowood @mcollina @dnlup


get bodyUsed () {
return this[kBody].readableDidRead || this[kBody][kReadableDidRead]
}

get body () {
if (!this[kBody].toWeb) {
throw new TypeError('not supported')
}
return this[kBody].toWeb()
}

async blob () {
if (!Blob) {
throw new TypeError('not supported')
}

// TODO: Optimize.
const sources = []
for await (const chunk of this.stream) {
// TOOD: max size?
sources.push(chunk)
}
return new Blob(sources)
}

async buffer () {
// TODO: Optimize.
const sources = []
for await (const chunk of this.stream) {
// TOOD: max size?
sources.push(chunk)
}
return Buffer.concat(sources)
}

async arrayBuffer () {
// TODO: Optimize.
const blob = await this.blob()
return await blob.arrayBuffer()
}
ronag marked this conversation as resolved.
Show resolved Hide resolved

[Symbol.asyncIterator] () {
// TODO: Optimize.
return this.stream[Symbol.asyncIterator]()
}

async text () {
// TODO: Optimize.
// TODO: Validate content-type req & res headers?
let ret = ''
for await (const chunk of this.stream) {
// TOOD: max size?
ret += chunk
}
return ret
}

async json () {
// TODO: Optimize.
// TODO: Validate content-type req & res headers?
return JSON.parse(await this.text())
}
}

ronag marked this conversation as resolved.
Show resolved Hide resolved
class RequestHandler extends AsyncResource {
constructor (opts, callback) {
if (!opts || typeof opts !== 'object') {
Expand Down Expand Up @@ -92,7 +192,7 @@ class RequestHandler extends AsyncResource {
return
}

const body = new RequestResponse(resume, abort)
const body = new Body(resume, abort)

this.callback = null
this.res = body
Expand All @@ -109,7 +209,7 @@ class RequestHandler extends AsyncResource {

onData (chunk) {
const { res } = this
return res.push(chunk)
return res[kPush](chunk)
}

onComplete (trailers) {
Expand All @@ -119,7 +219,7 @@ class RequestHandler extends AsyncResource {

util.parseHeaders(trailers, this.trailers)

res.push(null)
res[kPush](null)
}

onError (err) {
Expand All @@ -139,13 +239,13 @@ class RequestHandler extends AsyncResource {
this.res = null
// Ensure all queued handlers are invoked before destroying res.
queueMicrotask(() => {
util.destroy(res, err)
res[kDestroy](err)
})
}

if (body) {
this.body = null
util.destroy(body, err)
util.destroy(this.body, err)
}
}
}
Expand Down
6 changes: 3 additions & 3 deletions lib/client.js
Original file line number Diff line number Diff line change
Expand Up @@ -245,12 +245,12 @@ class Client extends Dispatcher {

dispatch (opts, handler) {
if (!handler || typeof handler !== 'object') {
throw new InvalidArgumentError('handler')
throw new InvalidArgumentError('handler must be an object')
}

try {
if (!opts || typeof opts !== 'object') {
throw new InvalidArgumentError('opts must be a object.')
throw new InvalidArgumentError('opts must be an object.')
}

if (this[kDestroyed]) {
Expand Down Expand Up @@ -1093,7 +1093,7 @@ function connect (client) {
let { host, hostname, protocol, port } = client[kUrl]

// Resolve ipv6
if (hostname.startsWith('[')) {
if (hostname[0] === '[') {
const idx = hostname.indexOf(']')

assert(idx !== -1)
Expand Down
6 changes: 4 additions & 2 deletions lib/core/connect.js
Original file line number Diff line number Diff line change
Expand Up @@ -79,15 +79,17 @@ class Connector {
socket
.setNoDelay(true)
.once(protocol === 'https:' ? 'secureConnect' : 'connect', function () {
if (callback) {
clearTimeout(timeout)
clearTimeout(timeout)

if (callback) {
const cb = callback
callback = null
cb(null, this)
}
})
.on('error', function (err) {
clearTimeout(timeout)

if (callback) {
const cb = callback
callback = null
Expand Down
30 changes: 1 addition & 29 deletions lib/core/request.js
Original file line number Diff line number Diff line change
Expand Up @@ -97,35 +97,7 @@ class Request {
throw new InvalidArgumentError('headers must be an object or an array')
}

if (typeof handler.onConnect !== 'function') {
throw new InvalidArgumentError('invalid onConnect method')
}

if (typeof handler.onError !== 'function') {
throw new InvalidArgumentError('invalid onError method')
}

if (typeof handler.onBodySent !== 'function' && handler.onBodySent !== undefined) {
throw new InvalidArgumentError('invalid onBodySent method')
}

if (this.upgrade || this.method === 'CONNECT') {
if (typeof handler.onUpgrade !== 'function') {
throw new InvalidArgumentError('invalid onUpgrade method')
}
} else {
if (typeof handler.onHeaders !== 'function') {
throw new InvalidArgumentError('invalid onHeaders method')
}

if (typeof handler.onData !== 'function') {
throw new InvalidArgumentError('invalid onData method')
}

if (typeof handler.onComplete !== 'function') {
throw new InvalidArgumentError('invalid onComplete method')
}
}
util.validateHandler(handler, method, upgrade)

this.servername = util.getServerName(this.host)

Expand Down
45 changes: 41 additions & 4 deletions lib/core/util.js
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ function parseURL (url) {
if (!(url instanceof URL)) {
const port = url.port != null
? url.port
: { 'http:': 80, 'https:': 443 }[url.protocol]
: (url.protocol === 'https:' ? 443 : 80)
const origin = url.origin != null
? url.origin
: `${url.protocol}//${url.hostname}:${port}`
Expand All @@ -75,7 +75,7 @@ function parseURL (url) {
function parseOrigin (url) {
url = parseURL(url)

if (/\/.+/.test(url.pathname) || url.search || url.hash) {
if (url.pathname !== '/' || url.search || url.hash) {
throw new InvalidArgumentError('invalid url')
}

Expand All @@ -91,7 +91,7 @@ function getServerName (host) {

let servername = host

if (servername.startsWith('[')) {
if (servername[0] === '[') {
const idx = servername.indexOf(']')

assert(idx !== -1)
Expand Down Expand Up @@ -188,6 +188,42 @@ function isBuffer (buffer) {
return buffer instanceof Uint8Array || Buffer.isBuffer(buffer)
}

function validateHandler (handler, method, upgrade) {
if (!handler || typeof handler !== 'object') {
throw new InvalidArgumentError('handler must be an object')
}

if (typeof handler.onConnect !== 'function') {
throw new InvalidArgumentError('invalid onConnect method')
}

if (typeof handler.onError !== 'function') {
throw new InvalidArgumentError('invalid onError method')
}

if (typeof handler.onBodySent !== 'function' && handler.onBodySent !== undefined) {
throw new InvalidArgumentError('invalid onBodySent method')
}

if (upgrade || method === 'CONNECT') {
if (typeof handler.onUpgrade !== 'function') {
throw new InvalidArgumentError('invalid onUpgrade method')
}
} else {
if (typeof handler.onHeaders !== 'function') {
throw new InvalidArgumentError('invalid onHeaders method')
}

if (typeof handler.onData !== 'function') {
throw new InvalidArgumentError('invalid onData method')
}

if (typeof handler.onComplete !== 'function') {
throw new InvalidArgumentError('invalid onComplete method')
}
}
}

module.exports = {
nop,
parseOrigin,
Expand All @@ -203,5 +239,6 @@ module.exports = {
destroy,
bodyLength,
deepClone,
isBuffer
isBuffer,
validateHandler
}
2 changes: 2 additions & 0 deletions lib/handler/redirect.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ class RedirectHandler {
throw new InvalidArgumentError('maxRedirections must be a positive number')
}

util.validateHandler(handler, opts.method, opts.upgrade)

this.dispatcher = dispatcher
this.location = null
this.abort = null
Expand Down
2 changes: 1 addition & 1 deletion lib/pool.js
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ class Pool extends Dispatcher {

dispatch (opts, handler) {
if (!handler || typeof handler !== 'object') {
throw new InvalidArgumentError('handler')
throw new InvalidArgumentError('handler must be an object')
}

try {
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "undici",
"version": "4.2.1",
"version": "4.2.2",
"description": "An HTTP/1.1 client, written from scratch for Node.js",
"homepage": "https://undici.nodejs.org",
"bugs": {
Expand Down
Loading