diff --git a/lib/agent.js b/lib/agent.js index 2d58d9be5d2..9866d958d16 100644 --- a/lib/agent.js +++ b/lib/agent.js @@ -96,16 +96,19 @@ class Agent extends Dispatcher { dispatch (opts, handler) { if (!handler || typeof handler !== 'object') { - throw new InvalidArgumentError('handler') + throw new InvalidArgumentError('handler must be an object.') } try { if (!opts || typeof opts !== 'object') { - throw new InvalidArgumentError('opts must be a object.') + throw new InvalidArgumentError('opts must be an object.') } - if (typeof opts.origin !== 'string' || opts.origin === '') { - throw new InvalidArgumentError('opts.origin must be a non-empty string.') + let key + if (opts.origin && (typeof opts.origin === 'string' || opts.origin instanceof URL)) { + key = String(opts.origin) + } else { + throw new InvalidArgumentError('opts.origin must be a non-empty string or URL.') } if (this[kDestroyed]) { @@ -116,7 +119,7 @@ class Agent extends Dispatcher { throw new ClientClosedError() } - const ref = this[kClients].get(opts.origin) + const ref = this[kClients].get(key) let dispatcher = ref ? ref.deref() : null if (!dispatcher) { @@ -126,8 +129,8 @@ class Agent extends Dispatcher { .on('disconnect', this[kOnDisconnect]) .on('connectionError', this[kOnConnectionError]) - this[kClients].set(opts.origin, new WeakRef(dispatcher)) - this[kFinalizer].register(dispatcher, opts.origin) + this[kClients].set(key, new WeakRef(dispatcher)) + this[kFinalizer].register(dispatcher, key) } const { maxRedirections = this[kMaxRedirections] } = opts diff --git a/lib/api/api-request.js b/lib/api/api-request.js index e9a67430ea0..e6619568eb2 100644 --- a/lib/api/api-request.js +++ b/lib/api/api-request.js @@ -5,16 +5,30 @@ const { InvalidArgumentError, RequestAbortedError } = require('../core/errors') +const { Blob } = require('buffer') const util = require('../core/util') const { AsyncResource } = require('async_hooks') const { addSignal, removeSignal } = require('./abort-signal') +const EE = require('events') const kAbort = Symbol('abort') +const kResume = Symbol('resume') +const kDestroy = Symbol('destroy') +const kPush = Symbol('push') +const kBody = Symbol('body') +const kReadableDidRead = Symbol('readableDidRead') -class RequestResponse extends Readable { +class RequestBody extends Readable { constructor (resume, abort) { - super({ autoDestroy: true, read: resume }) + super({ autoDestroy: true, read: resume, writable: false }) this[kAbort] = abort + this[kReadableDidRead] = false + + if (typeof this.readableDidRead !== 'boolean') { + EE.prototype.once.call(this, 'data', function () { + this[kReadableDidRead] = true + }) + } } _destroy (err, callback) { @@ -30,6 +44,92 @@ class RequestResponse extends Readable { } } +class Body { + constructor (resume, abort) { + this[kAbort] = abort + this[kResume] = resume + this[kBody] = new RequestBody(this[kResume], this[kAbort]).on('error', () => {}) + } + + [kPush] (chunk) { + return this[kBody].push(chunk) + } + + [kDestroy] (err) { + this[kBody].destroy(err) + } + + get stream () { + if (this.bodyUsed) { + throw new TypeError('disturbed') + } + return this[kBody] + } + + get bodyUsed () { + return this[kBody].readableDidRead || this[kBody][kReadableDidRead] + } + + get body () { + if (!this[kBody].toWeb) { + throw new TypeError('not supported') + } + return this[kBody].toWeb() + } + + async blob () { + if (!Blob) { + throw new TypeError('not supported') + } + + // TODO: Optimize. + const sources = [] + for await (const chunk of this.stream) { + // TOOD: max size? + sources.push(chunk) + } + return new Blob(sources) + } + + async buffer () { + // TODO: Optimize. + const sources = [] + for await (const chunk of this.stream) { + // TOOD: max size? + sources.push(chunk) + } + return Buffer.concat(sources) + } + + async arrayBuffer () { + // TODO: Optimize. + const blob = await this.blob() + return await blob.arrayBuffer() + } + + [Symbol.asyncIterator] () { + // TODO: Optimize. + return this.stream[Symbol.asyncIterator]() + } + + async text () { + // TODO: Optimize. + // TODO: Validate content-type req & res headers? + let ret = '' + for await (const chunk of this.stream) { + // TOOD: max size? + ret += chunk + } + return ret + } + + async json () { + // TODO: Optimize. + // TODO: Validate content-type req & res headers? + return JSON.parse(await this.text()) + } +} + class RequestHandler extends AsyncResource { constructor (opts, callback) { if (!opts || typeof opts !== 'object') { @@ -92,7 +192,7 @@ class RequestHandler extends AsyncResource { return } - const body = new RequestResponse(resume, abort) + const body = new Body(resume, abort) this.callback = null this.res = body @@ -109,7 +209,7 @@ class RequestHandler extends AsyncResource { onData (chunk) { const { res } = this - return res.push(chunk) + return res[kPush](chunk) } onComplete (trailers) { @@ -119,7 +219,7 @@ class RequestHandler extends AsyncResource { util.parseHeaders(trailers, this.trailers) - res.push(null) + res[kPush](null) } onError (err) { @@ -139,13 +239,13 @@ class RequestHandler extends AsyncResource { this.res = null // Ensure all queued handlers are invoked before destroying res. queueMicrotask(() => { - util.destroy(res, err) + res[kDestroy](err) }) } if (body) { this.body = null - util.destroy(body, err) + util.destroy(this.body, err) } } } diff --git a/lib/client.js b/lib/client.js index 4dbe270da76..512273f5336 100644 --- a/lib/client.js +++ b/lib/client.js @@ -245,12 +245,12 @@ class Client extends Dispatcher { dispatch (opts, handler) { if (!handler || typeof handler !== 'object') { - throw new InvalidArgumentError('handler') + throw new InvalidArgumentError('handler must be an object') } try { if (!opts || typeof opts !== 'object') { - throw new InvalidArgumentError('opts must be a object.') + throw new InvalidArgumentError('opts must be an object.') } if (this[kDestroyed]) { @@ -1093,7 +1093,7 @@ function connect (client) { let { host, hostname, protocol, port } = client[kUrl] // Resolve ipv6 - if (hostname.startsWith('[')) { + if (hostname[0] === '[') { const idx = hostname.indexOf(']') assert(idx !== -1) diff --git a/lib/core/connect.js b/lib/core/connect.js index 7210b7e5519..d000dbada5e 100644 --- a/lib/core/connect.js +++ b/lib/core/connect.js @@ -79,15 +79,17 @@ class Connector { socket .setNoDelay(true) .once(protocol === 'https:' ? 'secureConnect' : 'connect', function () { - if (callback) { - clearTimeout(timeout) + clearTimeout(timeout) + if (callback) { const cb = callback callback = null cb(null, this) } }) .on('error', function (err) { + clearTimeout(timeout) + if (callback) { const cb = callback callback = null diff --git a/lib/core/request.js b/lib/core/request.js index 725ce376851..296f9a37ebc 100644 --- a/lib/core/request.js +++ b/lib/core/request.js @@ -97,35 +97,7 @@ class Request { throw new InvalidArgumentError('headers must be an object or an array') } - if (typeof handler.onConnect !== 'function') { - throw new InvalidArgumentError('invalid onConnect method') - } - - if (typeof handler.onError !== 'function') { - throw new InvalidArgumentError('invalid onError method') - } - - if (typeof handler.onBodySent !== 'function' && handler.onBodySent !== undefined) { - throw new InvalidArgumentError('invalid onBodySent method') - } - - if (this.upgrade || this.method === 'CONNECT') { - if (typeof handler.onUpgrade !== 'function') { - throw new InvalidArgumentError('invalid onUpgrade method') - } - } else { - if (typeof handler.onHeaders !== 'function') { - throw new InvalidArgumentError('invalid onHeaders method') - } - - if (typeof handler.onData !== 'function') { - throw new InvalidArgumentError('invalid onData method') - } - - if (typeof handler.onComplete !== 'function') { - throw new InvalidArgumentError('invalid onComplete method') - } - } + util.validateHandler(handler, method, upgrade) this.servername = util.getServerName(this.host) diff --git a/lib/core/util.js b/lib/core/util.js index 4882148fd6c..0024f1724b0 100644 --- a/lib/core/util.js +++ b/lib/core/util.js @@ -58,7 +58,7 @@ function parseURL (url) { if (!(url instanceof URL)) { const port = url.port != null ? url.port - : { 'http:': 80, 'https:': 443 }[url.protocol] + : (url.protocol === 'https:' ? 443 : 80) const origin = url.origin != null ? url.origin : `${url.protocol}//${url.hostname}:${port}` @@ -75,7 +75,7 @@ function parseURL (url) { function parseOrigin (url) { url = parseURL(url) - if (/\/.+/.test(url.pathname) || url.search || url.hash) { + if (url.pathname !== '/' || url.search || url.hash) { throw new InvalidArgumentError('invalid url') } @@ -91,7 +91,7 @@ function getServerName (host) { let servername = host - if (servername.startsWith('[')) { + if (servername[0] === '[') { const idx = servername.indexOf(']') assert(idx !== -1) @@ -188,6 +188,42 @@ function isBuffer (buffer) { return buffer instanceof Uint8Array || Buffer.isBuffer(buffer) } +function validateHandler (handler, method, upgrade) { + if (!handler || typeof handler !== 'object') { + throw new InvalidArgumentError('handler must be an object') + } + + if (typeof handler.onConnect !== 'function') { + throw new InvalidArgumentError('invalid onConnect method') + } + + if (typeof handler.onError !== 'function') { + throw new InvalidArgumentError('invalid onError method') + } + + if (typeof handler.onBodySent !== 'function' && handler.onBodySent !== undefined) { + throw new InvalidArgumentError('invalid onBodySent method') + } + + if (upgrade || method === 'CONNECT') { + if (typeof handler.onUpgrade !== 'function') { + throw new InvalidArgumentError('invalid onUpgrade method') + } + } else { + if (typeof handler.onHeaders !== 'function') { + throw new InvalidArgumentError('invalid onHeaders method') + } + + if (typeof handler.onData !== 'function') { + throw new InvalidArgumentError('invalid onData method') + } + + if (typeof handler.onComplete !== 'function') { + throw new InvalidArgumentError('invalid onComplete method') + } + } +} + module.exports = { nop, parseOrigin, @@ -203,5 +239,6 @@ module.exports = { destroy, bodyLength, deepClone, - isBuffer + isBuffer, + validateHandler } diff --git a/lib/handler/redirect.js b/lib/handler/redirect.js index 2d21d52ac94..a946357a753 100644 --- a/lib/handler/redirect.js +++ b/lib/handler/redirect.js @@ -12,6 +12,8 @@ class RedirectHandler { throw new InvalidArgumentError('maxRedirections must be a positive number') } + util.validateHandler(handler, opts.method, opts.upgrade) + this.dispatcher = dispatcher this.location = null this.abort = null diff --git a/lib/pool.js b/lib/pool.js index b3804519150..1f5ec4319e0 100644 --- a/lib/pool.js +++ b/lib/pool.js @@ -162,7 +162,7 @@ class Pool extends Dispatcher { dispatch (opts, handler) { if (!handler || typeof handler !== 'object') { - throw new InvalidArgumentError('handler') + throw new InvalidArgumentError('handler must be an object') } try { diff --git a/package.json b/package.json index bfb0415cb5c..21f6f7dfd67 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "undici", - "version": "4.2.1", + "version": "4.2.2", "description": "An HTTP/1.1 client, written from scratch for Node.js", "homepage": "https://undici.nodejs.org", "bugs": { diff --git a/test/abort-controller.js b/test/abort-controller.js index 4658686657a..33cebce958c 100644 --- a/test/abort-controller.js +++ b/test/abort-controller.js @@ -61,10 +61,11 @@ for (const { AbortControllerImpl, controllerName } of controllers) { client.request({ path: '/', method: 'GET' }, (err, response) => { t.error(err) const bufs = [] - response.body.on('data', (buf) => { + const stream = response.body.stream + stream.on('data', (buf) => { bufs.push(buf) }) - response.body.on('end', () => { + stream.on('end', () => { t.equal('hello', Buffer.concat(bufs).toString('utf8')) }) }) @@ -136,10 +137,11 @@ for (const { AbortControllerImpl, controllerName } of controllers) { client.request({ path: '/', method: 'GET', signal: abortController.signal }, (err, response) => { t.error(err) - response.body.on('data', () => { + const stream = response.body.stream + stream.on('data', () => { abortController.abort() }) - response.body.on('error', err => { + stream.on('error', err => { t.type(err, errors.RequestAbortedError) }) }) diff --git a/test/agent.js b/test/agent.js index 1a1041c30a1..78a3f6daff0 100644 --- a/test/agent.js +++ b/test/agent.js @@ -280,10 +280,11 @@ test('with globalAgent', t => { t.equal(statusCode, 200) t.equal(headers['content-type'], 'text/plain') const bufs = [] - body.on('data', (buf) => { + const stream = body.stream + stream.on('data', (buf) => { bufs.push(buf) }) - body.on('end', () => { + stream.on('end', () => { t.equal(wanted, Buffer.concat(bufs).toString('utf8')) }) }) @@ -315,10 +316,12 @@ test('with local agent', t => { t.equal(statusCode, 200) t.equal(headers['content-type'], 'text/plain') const bufs = [] - body.on('data', (buf) => { + + const stream = body.stream + stream.on('data', (buf) => { bufs.push(buf) }) - body.on('end', () => { + stream.on('end', () => { t.equal(wanted, Buffer.concat(bufs).toString('utf8')) }) }) @@ -536,17 +539,36 @@ test('dispatch validations', t => { const dispatcher = new Agent() const noopHandler = { + onConnect () {}, + onHeaders () {}, + onData () {}, + onComplete () { + server.close() + }, onError (err) { throw err } } - t.plan(5) + const server = http.createServer((req, res) => { + res.setHeader('Content-Type', 'text/plain') + res.end('asd') + }) + + t.plan(6) t.throws(() => dispatcher.dispatch('ASD'), InvalidArgumentError, 'throws on missing handler') t.throws(() => dispatcher.dispatch('ASD', noopHandler), InvalidArgumentError, 'throws on invalid opts argument type') t.throws(() => dispatcher.dispatch({}, noopHandler), InvalidArgumentError, 'throws on invalid opts.origin argument') t.throws(() => dispatcher.dispatch({ origin: '' }, noopHandler), InvalidArgumentError, 'throws on invalid opts.origin argument') t.throws(() => dispatcher.dispatch({}, {}), InvalidArgumentError, 'throws on invalid handler.onError') + + server.listen(0, () => { + t.doesNotThrow(() => dispatcher.dispatch({ + origin: new URL(`http://localhost:${server.address().port}`), + path: '/', + method: 'GET' + }, noopHandler)) + }) }) test('drain', t => { diff --git a/test/async_hooks.js b/test/async_hooks.js index 8a77af35d72..92f8e5a677d 100644 --- a/test/async_hooks.js +++ b/test/async_hooks.js @@ -57,7 +57,8 @@ test('async hooks', (t) => { client.request({ path: '/', method: 'GET' }, (err, { statusCode, headers, body }) => { t.error(err) - body.resume() + const stream = body.stream + stream.resume() t.strictSame(getCurrentTransaction(), null) setCurrentTransaction({ hello: 'world2' }) @@ -65,13 +66,13 @@ test('async hooks', (t) => { client.request({ path: '/', method: 'GET' }, (err, { statusCode, headers, body }) => { t.error(err) t.strictSame(getCurrentTransaction(), { hello: 'world2' }) - - body.once('data', () => { + const stream = body.stream + stream.once('data', () => { t.pass() - body.resume() + stream.resume() }) - body.on('end', () => { + stream.on('end', () => { t.pass() }) }) @@ -79,7 +80,8 @@ test('async hooks', (t) => { client.request({ path: '/', method: 'GET' }, (err, { statusCode, headers, body }) => { t.error(err) - body.resume() + const stream = body.stream + stream.resume() t.strictSame(getCurrentTransaction(), null) setCurrentTransaction({ hello: 'world' }) @@ -88,12 +90,13 @@ test('async hooks', (t) => { t.error(err) t.strictSame(getCurrentTransaction(), { hello: 'world' }) - body.once('data', () => { + const stream = body.stream + stream.once('data', () => { t.pass() - body.resume() + stream.resume() }) - body.on('end', () => { + stream.on('end', () => { t.pass() }) }) @@ -101,7 +104,8 @@ test('async hooks', (t) => { client.request({ path: '/', method: 'HEAD' }, (err, { statusCode, headers, body }) => { t.error(err) - body.resume() + const stream = body.stream + stream.resume() t.strictSame(getCurrentTransaction(), null) setCurrentTransaction({ hello: 'world' }) @@ -110,12 +114,13 @@ test('async hooks', (t) => { t.error(err) t.strictSame(getCurrentTransaction(), { hello: 'world' }) - body.once('data', () => { + const stream = body.stream + stream.once('data', () => { t.pass() - body.resume() + stream.resume() }) - body.on('end', () => { + stream.on('end', () => { t.pass() }) }) @@ -159,8 +164,9 @@ test('async hooks client is destroyed', (t) => { client.request({ path: '/', method: 'GET' }, (err, { body }) => { t.error(err) - body.resume() - body.on('error', (err) => { + const stream = body.stream + stream.resume() + stream.on('error', (err) => { t.ok(err) }) t.strictSame(getCurrentTransaction(), null) diff --git a/test/client-abort.js b/test/client-abort.js index 5854bc2a8b6..ae464ea1767 100644 --- a/test/client-abort.js +++ b/test/client-abort.js @@ -24,7 +24,8 @@ test('aborted response errors', (t) => { client.request({ path: '/', method: 'GET' }, (err, { statusCode, headers, body }) => { t.error(err) - body.destroy() + const stream = body.stream + stream.destroy() body .on('error', err => { t.type(err, errors.RequestAbortedError) diff --git a/test/client-dispatch.js b/test/client-dispatch.js index f765a816130..181b2fbd81d 100644 --- a/test/client-dispatch.js +++ b/test/client-dispatch.js @@ -18,7 +18,7 @@ test('dispatch invalid opts', (t) => { }, null) } catch (err) { t.type(err, errors.InvalidArgumentError) - t.equal(err.message, 'handler') + t.equal(err.message, 'handler must be an object') } try { @@ -29,7 +29,7 @@ test('dispatch invalid opts', (t) => { }, 'asd') } catch (err) { t.type(err, errors.InvalidArgumentError) - t.equal(err.message, 'handler') + t.equal(err.message, 'handler must be an object') } client.dispatch({ diff --git a/test/client-errors.js b/test/client-errors.js index 34d8f4e0794..56697213208 100644 --- a/test/client-errors.js +++ b/test/client-errors.js @@ -45,10 +45,11 @@ test('GET errors and reconnect with pipelining 1', (t) => { t.equal(statusCode, 200) t.equal(headers['content-type'], 'text/plain') const bufs = [] - body.on('data', (buf) => { + const stream = body.stream + stream.on('data', (buf) => { bufs.push(buf) }) - body.on('end', () => { + stream.on('end', () => { t.equal('hello', Buffer.concat(bufs).toString('utf8')) }) }) @@ -100,10 +101,11 @@ test('GET errors and reconnect with pipelining 3', (t) => { t.equal(statusCode, 200) t.equal(headers['content-type'], 'text/plain') const bufs = [] - body.on('data', (buf) => { + const stream = body.stream + stream.on('data', (buf) => { bufs.push(buf) }) - body.on('end', () => { + stream.on('end', () => { t.equal('hello', Buffer.concat(bufs).toString('utf8')) }) }) @@ -169,10 +171,11 @@ function errorAndPipelining (type) { t.equal(statusCode, 200) t.equal(headers['content-type'], 'text/plain') const bufs = [] - body.on('data', (buf) => { + const stream = body.stream + stream.on('data', (buf) => { bufs.push(buf) }) - body.on('end', () => { + stream.on('end', () => { t.equal('hello', Buffer.concat(bufs).toString('utf8')) }) }) @@ -238,10 +241,11 @@ function errorAndChunkedEncodingPipelining (type) { t.equal(statusCode, 200) t.equal(headers['content-type'], 'text/plain') const bufs = [] - body.on('data', (buf) => { + const stream = body.stream + stream.on('data', (buf) => { bufs.push(buf) }) - body.on('end', () => { + stream.on('end', () => { t.equal('hello', Buffer.concat(bufs).toString('utf8')) }) }) @@ -704,8 +708,9 @@ test('GET errors body', (t) => { client.request({ path: '/', method: 'GET' }, (err, { statusCode, headers, body }) => { t.error(err) - body.resume() - body.on('error', err => ( + const stream = body.stream + stream.resume() + stream.on('error', err => ( t.ok(err) )) }) @@ -754,7 +759,7 @@ test('validate request body', (t) => { body: '' }, (err, data) => { t.error(err) - data.body.resume() + data.body.stream.resume() }) client.request({ @@ -763,7 +768,7 @@ test('validate request body', (t) => { body: new Uint8Array() }, (err, data) => { t.error(err) - data.body.resume() + data.body.stream.resume() }) client.request({ @@ -772,7 +777,7 @@ test('validate request body', (t) => { body: Buffer.alloc(10) }, (err, data) => { t.error(err) - data.body.resume() + data.body.stream.resume() }) }) }) @@ -903,7 +908,7 @@ test('queued request should not fail on socket destroy', (t) => { method: 'GET' }, (err, data) => { t.error(err) - data.body.resume().on('error', () => { + data.body.stream.resume().on('error', () => { t.pass() }) client[kSocket].destroy() @@ -912,7 +917,7 @@ test('queued request should not fail on socket destroy', (t) => { method: 'GET' }, (err, data) => { t.error(err) - data.body.resume().on('end', () => { + data.body.stream.resume().on('end', () => { t.pass() }) }) @@ -941,7 +946,7 @@ test('queued request should fail on client destroy', (t) => { method: 'GET' }, (err, data) => { t.error(err) - data.body.resume() + data.body.stream.resume() .on('error', () => { t.pass() }) @@ -993,14 +998,14 @@ test('retry idempotent inflight', (t) => { method: 'GET' }, (err, data) => { t.error(err) - data.body.resume() + data.body.stream.resume() }) client.request({ path: '/', method: 'GET' }, (err, data) => { t.error(err) - data.body.resume() + data.body.stream.resume() }) }) }) diff --git a/test/client-pipeline.js b/test/client-pipeline.js index 3633312f984..6ca8412676f 100644 --- a/test/client-pipeline.js +++ b/test/client-pipeline.js @@ -497,7 +497,7 @@ test('pipeline abort duplex', (t) => { method: 'PUT' }, (err, data) => { t.error(err) - data.body.resume() + data.body.stream.resume() client.pipeline({ path: '/', diff --git a/test/client-pipelining.js b/test/client-pipelining.js index 0bb5404012c..ff684770d75 100644 --- a/test/client-pipelining.js +++ b/test/client-pipelining.js @@ -149,16 +149,17 @@ test('pipeline 1 is 1 active request', (t) => { method: 'GET' }, (err, data) => { t.error(err) - finished(data.body, (err) => { + const stream = data.body.stream + finished(stream, (err) => { t.ok(err) client.close((err) => { t.error(err) }) }) - data.body.destroy() + stream.destroy() res2.end() })) - data.body.resume() + data.body.stream.resume() res2.end() }) t.ok(client[kSize] <= client.pipelining) @@ -194,7 +195,7 @@ test('pipelined chunked POST stream', (t) => { path: '/', method: 'GET' }, (err, { body }) => { - body.resume() + body.stream.resume() t.error(err) }) @@ -207,7 +208,7 @@ test('pipelined chunked POST stream', (t) => { } }) }, (err, { body }) => { - body.resume() + body.stream.resume() t.error(err) }) @@ -215,7 +216,7 @@ test('pipelined chunked POST stream', (t) => { path: '/', method: 'GET' }, (err, { body }) => { - body.resume() + body.stream.resume() t.error(err) }) @@ -228,7 +229,7 @@ test('pipelined chunked POST stream', (t) => { } }) }, (err, { body }) => { - body.resume() + body.stream.resume() t.error(err) }) }) @@ -261,7 +262,7 @@ test('pipelined chunked POST iterator', (t) => { path: '/', method: 'GET' }, (err, { body }) => { - body.resume() + body.stream.resume() t.error(err) }) @@ -274,7 +275,7 @@ test('pipelined chunked POST iterator', (t) => { } })() }, (err, { body }) => { - body.resume() + body.stream.resume() t.error(err) }) @@ -282,7 +283,7 @@ test('pipelined chunked POST iterator', (t) => { path: '/', method: 'GET' }, (err, { body }) => { - body.resume() + body.stream.resume() t.error(err) }) @@ -295,7 +296,7 @@ test('pipelined chunked POST iterator', (t) => { } })() }, (err, { body }) => { - body.resume() + body.stream.resume() t.error(err) }) }) @@ -398,7 +399,7 @@ test('pipelining non-idempotent', (t) => { }, (err, data) => { t.error(err) t.equal(ended, true) - data.body.resume() + data.body.stream.resume() }) }) }) @@ -455,7 +456,7 @@ function pipeliningNonIdempotentWithBody (bodyType) { }, (err, data) => { t.error(err) t.equal(ended, true) - data.body.resume() + data.body.stream.resume() }) }) }) diff --git a/test/client-reconnect.js b/test/client-reconnect.js index 6abb8e403f3..29d2808d067 100644 --- a/test/client-reconnect.js +++ b/test/client-reconnect.js @@ -29,6 +29,7 @@ test('multiple reconnect', (t) => { client.request({ path: '/', method: 'GET' }, (err, data) => { t.error(err) data.body + .stream .resume() .on('end', () => { t.pass() diff --git a/test/client-request.js b/test/client-request.js index 17a1cb877a2..011e60da8b9 100644 --- a/test/client-request.js +++ b/test/client-request.js @@ -93,6 +93,7 @@ test('trailers', (t) => { }) body + .stream .on('data', () => t.fail()) .on('end', () => { t.strictSame(trailers, { 'content-md5': 'test' }) diff --git a/test/client-unref.js b/test/client-unref.js index 1c2413853d1..c7e3a5db734 100644 --- a/test/client-unref.js +++ b/test/client-unref.js @@ -23,14 +23,22 @@ if (isMainThread) { }) }) }) + + tap.test('client automatically closes itself if the server is not there', t => { + t.plan(1) + + const url = 'http://localhost:4242' // hopefully empty port + const worker = new Worker(__filename, { workerData: { url } }) + worker.on('exit', code => { + t.equal(code, 0) + }) + }) } else { const { Client } = require('..') const client = new Client(workerData.url) - client.request({ path: '/', method: 'GET' }, (err, res) => { - if (err) { - throw err - } + client.request({ path: '/', method: 'GET' }, () => { + // We do not care about Errors setTimeout(() => { throw new Error() diff --git a/test/client.js b/test/client.js index b7dfc03b1d0..4026430e425 100644 --- a/test/client.js +++ b/test/client.js @@ -778,7 +778,7 @@ test('ignore request header mutations', (t) => { headers }, (err, { body }) => { t.error(err) - body.resume() + body.stream.resume() }) headers.test = 'asd' }) @@ -802,7 +802,7 @@ test('url-like url', (t) => { client.request({ path: '/', method: 'GET' }, (err, data) => { t.error(err) - data.body.resume() + data.body.stream.resume() }) }) }) @@ -828,7 +828,7 @@ test('an absolute url as path', (t) => { client.request({ path, method: 'GET' }, (err, data) => { t.error(err) - data.body.resume() + data.body.stream.resume() }) }) }) @@ -885,14 +885,14 @@ test('only one streaming req at a time', (t) => { method: 'GET' }, (err, data) => { t.error(err) - data.body.resume() + data.body.stream.resume() client.request({ path: '/', method: 'GET' }, (err, data) => { t.error(err) - data.body.resume() + data.body.stream.resume() }) client.request({ @@ -941,14 +941,14 @@ test('only one async iterating req at a time', (t) => { method: 'GET' }, (err, data) => { t.error(err) - data.body.resume() + data.body.stream.resume() client.request({ path: '/', method: 'GET' }, (err, data) => { t.error(err) - data.body.resume() + data.body.stream.resume() }) const body = wrapWithAsyncIterable(new Readable({ read () { @@ -1317,7 +1317,7 @@ test('parser pause with no body timeout', (t) => { client.request({ path: '/', method: 'GET' }, (err, { statusCode, body }) => { t.error(err) t.equal(statusCode, 200) - body.resume() + body.stream.resume() }) }) }) @@ -1340,7 +1340,7 @@ test('TypedArray and DataView body', (t) => { client.request({ path: '/', method: 'POST', body }, (err, { statusCode, body }) => { t.error(err) t.equal(statusCode, 200) - body.resume() + body.stream.resume() }) }) }) @@ -1376,7 +1376,7 @@ test('async iterator empty chunk continues', (t) => { client.request({ path: '/', method: 'POST', body }, (err, { statusCode, body }) => { t.error(err) t.equal(statusCode, 200) - body.resume() + body.stream.resume() }) }) }) diff --git a/test/close-and-destroy.js b/test/close-and-destroy.js index c8e22e0788c..e353e2ab800 100644 --- a/test/close-and-destroy.js +++ b/test/close-and-destroy.js @@ -171,7 +171,7 @@ test('close should still reconnect', (t) => { function makeRequest () { client.request({ path: '/', method: 'GET' }, (err, data) => { t.error(err) - data.body.resume() + data.body.stream.resume() }) return client[kSize] <= client.pipelining } @@ -203,7 +203,7 @@ test('close should call callback once finished', (t) => { function makeRequest () { client.request({ path: '/', method: 'GET' }, (err, data) => { t.error(err) - data.body.resume() + data.body.stream.resume() }) return client[kSize] <= client.pipelining } diff --git a/test/gc.js b/test/gc.js index c1ceecf88ab..429f715c4ea 100644 --- a/test/gc.js +++ b/test/gc.js @@ -48,7 +48,7 @@ test('gc should collect the client if, and only if, there are no active sockets' method: 'GET' }, (err, { body }) => { t.error(err) - body.resume() + body.stream.resume() }) }) }) @@ -92,7 +92,7 @@ test('gc should collect the pool if, and only if, there are no active sockets', method: 'GET' }, (err, { body }) => { t.error(err) - body.resume() + body.stream.resume() }) }) }) diff --git a/test/get-head-body.js b/test/get-head-body.js index 3e86b13bfd3..b46cefa5013 100644 --- a/test/get-head-body.js +++ b/test/get-head-body.js @@ -30,7 +30,7 @@ test('GET and HEAD with body should reset connection', (t) => { method: 'GET' }, (err, data) => { t.error(err) - data.body.resume() + data.body.stream.resume() }) const emptyBody = new Readable({ @@ -43,7 +43,7 @@ test('GET and HEAD with body should reset connection', (t) => { method: 'GET' }, (err, data) => { t.error(err) - data.body.resume() + data.body.stream.resume() }) client.request({ @@ -56,7 +56,7 @@ test('GET and HEAD with body should reset connection', (t) => { method: 'GET' }, (err, data) => { t.error(err) - data.body.resume() + data.body.stream.resume() }) client.request({ @@ -70,7 +70,7 @@ test('GET and HEAD with body should reset connection', (t) => { method: 'GET' }, (err, data) => { t.error(err) - data.body.resume() + data.body.stream.resume() }) client.request({ @@ -79,7 +79,7 @@ test('GET and HEAD with body should reset connection', (t) => { method: 'GET' }, (err, data) => { t.error(err) - data.body.resume() + data.body.stream.resume() }) client.request({ @@ -92,7 +92,7 @@ test('GET and HEAD with body should reset connection', (t) => { method: 'GET' }, (err, data) => { t.error(err) - data.body.resume() + data.body.stream.resume() }) client.request({ @@ -106,7 +106,7 @@ test('GET and HEAD with body should reset connection', (t) => { method: 'GET' }, (err, data) => { t.error(err) - data.body.resume() + data.body.stream.resume() }) }) }) @@ -153,7 +153,7 @@ test('HEAD should reset connection', (t) => { method: 'HEAD' }, (err, data) => { t.error(err) - data.body.resume() + data.body.stream.resume() }) t.equal(client[kBusy], true) @@ -162,7 +162,7 @@ test('HEAD should reset connection', (t) => { method: 'HEAD' }, (err, data) => { t.error(err) - data.body.resume() + data.body.stream.resume() client.once('disconnect', () => { client[kConnect](() => { client.request({ @@ -170,10 +170,11 @@ test('HEAD should reset connection', (t) => { method: 'HEAD' }, (err, data) => { t.error(err) - data.body.resume() - data.body.on('end', () => { - t.pass() - }) + data.body.stream + .resume() + .on('end', () => { + t.pass() + }) }) t.equal(client[kBusy], true) }) diff --git a/test/http-req-destroy.js b/test/http-req-destroy.js index 29ec98e1a9c..e08f3b569ce 100644 --- a/test/http-req-destroy.js +++ b/test/http-req-destroy.js @@ -19,8 +19,9 @@ function doNotKillReqSocket (bodyType) { body: req }, (err, response) => { t.error(err) + const stream = response.body.stream setTimeout(() => { - response.body.on('data', buf => { + stream.on('data', buf => { res.write(buf) setTimeout(() => { res.end() @@ -51,11 +52,12 @@ function doNotKillReqSocket (bodyType) { }, (err, response) => { t.error(err) const bufs = [] - response.body.on('data', (buf) => { + const stream = response.body.stream + stream.on('data', (buf) => { bufs.push(buf) r.push(null) }) - response.body.on('end', () => { + stream.on('end', () => { t.equal('hello', Buffer.concat(bufs).toString('utf8')) }) }) diff --git a/test/https.js b/test/https.js index 17a5d23dd28..a4415eb3448 100644 --- a/test/https.js +++ b/test/https.js @@ -29,10 +29,11 @@ test('https get with tls opts', (t) => { t.equal(statusCode, 200) t.equal(headers['content-type'], 'text/plain') const bufs = [] - body.on('data', (buf) => { + const stream = body.stream + stream.on('data', (buf) => { bufs.push(buf) }) - body.on('end', () => { + stream.on('end', () => { t.equal('hello', Buffer.concat(bufs).toString('utf8')) }) }) diff --git a/test/issue-803.js b/test/issue-803.js index 70f64cce773..2fc7dbaff1c 100644 --- a/test/issue-803.js +++ b/test/issue-803.js @@ -36,10 +36,11 @@ test('https://github.com/nodejs/undici/issues/803', (t) => { t.error(err) let pos = 0 - data.body.on('data', (buf) => { + const stream = data.body.stream + stream.on('data', (buf) => { pos += buf.length }) - data.body.on('end', () => { + stream.on('end', () => { t.equal(pos, SIZE) }) }) diff --git a/test/issue-810.js b/test/issue-810.js index 9a8914272a2..4465177ece5 100644 --- a/test/issue-810.js +++ b/test/issue-810.js @@ -30,7 +30,8 @@ test('https://github.com/mcollina/undici/issues/810', (t) => { method: 'GET' }, (err, data) => { t.error(err) - data.body.resume().on('end', () => { + const stream = data.body.stream + stream.resume().on('end', () => { t.fail() }).on('error', err => ( t.type(err, errors.HTTPParserError) @@ -64,7 +65,8 @@ test('https://github.com/mcollina/undici/issues/810 no pipelining', (t) => { method: 'GET' }, (err, data) => { t.error(err) - data.body.resume().on('end', () => { + const stream = data.body.stream + stream.resume().on('end', () => { t.fail() }).on('error', err => { t.equal(err.code, 'HPE_CB_MESSAGE_BEGIN') @@ -93,7 +95,7 @@ test('https://github.com/mcollina/undici/issues/810 pipelining', (t) => { method: 'GET' }, (err, data) => { t.error(err) - data.body.resume().on('end', () => { + data.body.stream.resume().on('end', () => { t.fail() }).on('error', err => { t.equal(err.code, 'HPE_CB_MESSAGE_BEGIN') @@ -122,7 +124,7 @@ test('https://github.com/mcollina/undici/issues/810 pipelining 2', (t) => { method: 'GET' }, (err, data) => { t.error(err) - data.body.resume().on('end', () => { + data.body.stream.resume().on('end', () => { t.fail() }).on('error', err => { t.equal(err.code, 'HPE_INVALID_CONSTANT') diff --git a/test/mock-agent.js b/test/mock-agent.js index aaf5e2a23b4..c04daba5508 100644 --- a/test/mock-agent.js +++ b/test/mock-agent.js @@ -92,7 +92,7 @@ test('MockAgent - get', t => { }) test('MockAgent - dispatch', t => { - t.plan(2) + t.plan(3) t.test('should call the dispatch method of the MockPool', (t) => { t.plan(1) @@ -116,7 +116,8 @@ test('MockAgent - dispatch', t => { }, { onHeaders: (_statusCode, _headers, resume) => resume(), onData: () => {}, - onComplete: () => {} + onComplete: () => {}, + onError: () => {} })) }) @@ -142,9 +143,93 @@ test('MockAgent - dispatch', t => { }, { onHeaders: (_statusCode, _headers, resume) => resume(), onData: () => {}, - onComplete: () => {} + onComplete: () => {}, + onError: () => {} })) }) + + t.test('should throw if handler is not valid on redirect', (t) => { + t.plan(7) + + const baseUrl = 'http://localhost:9999' + + const mockAgent = new MockAgent() + t.teardown(mockAgent.close.bind(mockAgent)) + + t.throws(() => mockAgent.dispatch({ + origin: baseUrl, + path: '/foo', + method: 'GET' + }, { + onError: 'INVALID' + }), new InvalidArgumentError('invalid onError method')) + + t.throws(() => mockAgent.dispatch({ + origin: baseUrl, + path: '/foo', + method: 'GET' + }, { + onError: (err) => { throw err }, + onConnect: 'INVALID' + }), new InvalidArgumentError('invalid onConnect method')) + + t.throws(() => mockAgent.dispatch({ + origin: baseUrl, + path: '/foo', + method: 'GET' + }, { + onError: (err) => { throw err }, + onConnect: () => {}, + onBodySent: 'INVALID' + }), new InvalidArgumentError('invalid onBodySent method')) + + t.throws(() => mockAgent.dispatch({ + origin: baseUrl, + path: '/foo', + method: 'CONNECT' + }, { + onError: (err) => { throw err }, + onConnect: () => {}, + onBodySent: () => {}, + onUpgrade: 'INVALID' + }), new InvalidArgumentError('invalid onUpgrade method')) + + t.throws(() => mockAgent.dispatch({ + origin: baseUrl, + path: '/foo', + method: 'GET' + }, { + onError: (err) => { throw err }, + onConnect: () => {}, + onBodySent: () => {}, + onHeaders: 'INVALID' + }), new InvalidArgumentError('invalid onHeaders method')) + + t.throws(() => mockAgent.dispatch({ + origin: baseUrl, + path: '/foo', + method: 'GET' + }, { + onError: (err) => { throw err }, + onConnect: () => {}, + onBodySent: () => {}, + onHeaders: () => {}, + onData: 'INVALID' + }), new InvalidArgumentError('invalid onData method')) + + t.throws(() => mockAgent.dispatch({ + origin: baseUrl, + path: '/foo', + method: 'GET' + }, { + onError: (err) => { throw err }, + onConnect: () => {}, + onBodySent: () => {}, + onHeaders: () => {}, + onData: () => {}, + onComplete: 'INVALID' + }), new InvalidArgumentError('invalid onComplete method')) + }) }) test('MockAgent - .close should clean up registered pools', async (t) => { diff --git a/test/pool.js b/test/pool.js index 0c6099549b9..2451d67f467 100644 --- a/test/pool.js +++ b/test/pool.js @@ -49,7 +49,7 @@ test('connect/disconnect event(s)', (t) => { method: 'GET' }, (err, { headers, body }) => { t.error(err) - body.resume() + body.stream.resume() }) } }) @@ -185,7 +185,7 @@ test('basic get with async/await', async (t) => { t.equal(statusCode, 200) t.equal(headers['content-type'], 'text/plain') - body.resume() + body.stream.resume() await promisify(eos)(body) await client.close() diff --git a/test/socket-back-pressure.js b/test/socket-back-pressure.js index 9e774b3de6a..c16171c38e0 100644 --- a/test/socket-back-pressure.js +++ b/test/socket-back-pressure.js @@ -43,7 +43,7 @@ test('socket back-pressure', (t) => { setTimeout(() => { t.ok(data.body._readableState.length < bytesWritten - data.body._readableState.highWaterMark) src.push(null) - data.body.resume() + data.body.stream.resume() }, 1e3) }) .on('end', () => { diff --git a/test/tls-session-reuse.js b/test/tls-session-reuse.js index 161d48a360d..c919069cc12 100644 --- a/test/tls-session-reuse.js +++ b/test/tls-session-reuse.js @@ -70,7 +70,7 @@ test('A client should disable session caching', { client.request(options, (err, data) => { t.error(err) clientSessions[options.name] = client[kSocket].getSession() - data.body.resume().on('end', () => { + data.body.stream.resume().on('end', () => { if (queue.length !== 0) { return request() } @@ -156,7 +156,7 @@ test('A pool should be able to reuse TLS sessions between clients', { path: '/' }, (err, data) => { if (err) return reject(err) - data.body.resume().on('end', resolve) + data.body.stream.resume().on('end', resolve) }) }) }