diff --git a/package.json b/package.json index 6b00ea6..5175de7 100644 --- a/package.json +++ b/package.json @@ -7,9 +7,8 @@ "node": ">=8.16.0" }, "scripts": { - "test": "echo XO $(xo --version) && echo AVA $(ava --version) && xo && nyc ava", - "coveralls": "nyc report --reporter=text-lcov | coveralls", - "report": "npm test && nyc report --reporter=html" + "test": "echo XO $(xo --version) && echo AVA $(ava --version) && xo && nyc --reporter=html --reporter=text ava", + "coveralls": "nyc report --reporter=text-lcov | coveralls" }, "files": [ "source" @@ -36,12 +35,13 @@ }, "devDependencies": { "@sindresorhus/is": "^1.0.0", - "ava": "^2.2.0", + "ava": "^2.4.0", "benchmark": "^2.1.4", "coveralls": "^3.0.5", "create-cert": "^1.0.6", "get-stream": "^5.1.0", "got": "^9.6.0", + "lolex": "^4.2.0", "many-keys-map": "^1.0.2", "nyc": "^14.1.1", "p-event": "^4.1.0", diff --git a/source/agent.js b/source/agent.js index cfa08ae..c6c7ba0 100644 --- a/source/agent.js +++ b/source/agent.js @@ -61,10 +61,18 @@ const removeSession = (where, name, session) => { return false; }; +const addSession = (where, name, session) => { + if (Reflect.has(where, name)) { + where[name].push(session); + } else { + where[name] = [session]; + } +}; + const getSessions = (where, name, normalizedAuthority) => { if (Reflect.has(where, name)) { return where[name].filter(session => { - return session.originSet.includes(normalizedAuthority); + return !session.closed && !session.destroyed && session.originSet.includes(normalizedAuthority); }); } @@ -98,7 +106,7 @@ class Agent extends EventEmitter { this.timeout = timeout; this.maxSessions = maxSessions; - this.maxFreeSessions = maxFreeSessions; + this.maxFreeSessions = maxFreeSessions; // TODO: decreasing `maxFreeSessions` should close some sessions this.settings = { enablePush: false @@ -107,12 +115,12 @@ class Agent extends EventEmitter { this.tlsSessionCache = new QuickLRU({maxSize: maxCachedTlsSessions}); } - normalizeAuthority(authority) { + static normalizeAuthority(authority, servername) { if (typeof authority === 'string') { authority = new URL(authority); } - const host = authority.hostname || authority.host || 'localhost'; + const host = servername || authority.hostname || authority.host || 'localhost'; const port = authority.port || 443; if (port === 443) { @@ -122,12 +130,12 @@ class Agent extends EventEmitter { return `https://${host}:${port}`; } - normalizeOptions(options) { + static normalizeOptions(options) { let normalized = ''; if (options) { for (const key of nameKeys) { - if (Reflect.has(options, key)) { + if (options[key]) { normalized += `:${options[key]}`; } } @@ -151,23 +159,33 @@ class Agent extends EventEmitter { } } - async getSession(authority, options) { + getSession(authority, options, listeners) { return new Promise((resolve, reject) => { - const detached = {resolve, reject}; - const normalizedOptions = this.normalizeOptions(options); - const normalizedAuthority = this.normalizeAuthority(authority); + if (Array.isArray(listeners)) { + listeners = [...listeners]; + + // Resolve ASAP, because we're just moving the listeners. + resolve(); + } else { + listeners = [{resolve, reject}]; + } + + const normalizedOptions = Agent.normalizeOptions(options); + const normalizedAuthority = Agent.normalizeAuthority(authority, options && options.servername); if (Reflect.has(this.freeSessions, normalizedOptions)) { const freeSessions = getSessions(this.freeSessions, normalizedOptions, normalizedAuthority); if (freeSessions.length !== 0) { - resolve(freeSessions.reduce((previousValue, nextValue) => { - if (nextValue[kCurrentStreamsCount] > previousValue[kCurrentStreamsCount]) { - return nextValue; - } + for (const listener of listeners) { + listener.resolve(freeSessions.reduce((previousSession, nextSession) => { + if (nextSession[kCurrentStreamsCount] > previousSession[kCurrentStreamsCount]) { + return nextSession; + } - return previousValue; - })); + return previousSession; + })); + } return; } @@ -175,7 +193,7 @@ class Agent extends EventEmitter { if (Reflect.has(this.queue, normalizedOptions)) { if (Reflect.has(this.queue[normalizedOptions], normalizedAuthority)) { - this.queue[normalizedOptions][normalizedAuthority].listeners.push(detached); + this.queue[normalizedOptions][normalizedAuthority].listeners.push(...listeners); return; } @@ -183,8 +201,6 @@ class Agent extends EventEmitter { this.queue[normalizedOptions] = {}; } - const listeners = [detached]; - const removeFromQueue = () => { // Our entry can be replaced. We cannot remove the new one. if (Reflect.has(this.queue, normalizedOptions) && this.queue[normalizedOptions][normalizedAuthority] === entry) { @@ -212,10 +228,26 @@ class Agent extends EventEmitter { }); session[kCurrentStreamsCount] = 0; - session.socket.once('session', session => { - this.tlsSessionCache.set(name, { - session, - servername + const freeSession = () => { + const freeSessionsLength = getSessions(this.freeSessions, normalizedOptions, normalizedAuthority).length; + + if (freeSessionsLength < this.maxFreeSessions) { + addSession(this.freeSessions, normalizedOptions, session); + + return true; + } + + return false; + }; + + const isFree = () => session[kCurrentStreamsCount] < session.remoteSettings.maxConcurrentStreams; + + session.socket.once('session', tlsSession => { + setImmediate(() => { + this.tlsSessionCache.set(name, { + session: tlsSession, + servername + }); }); }); @@ -223,7 +255,7 @@ class Agent extends EventEmitter { session.socket.once('secureConnect', () => { servername = session.socket.servername; - if (servername === false && typeof tlsSessionCache !== 'undefined') { + if (servername === false && typeof tlsSessionCache !== 'undefined' && typeof tlsSessionCache.servername !== 'undefined') { session.socket.servername = tlsSessionCache.servername; } }); @@ -239,7 +271,7 @@ class Agent extends EventEmitter { }); session.setTimeout(this.timeout, () => { - // `.close()` gracefully closes the session. Current streams wouldn't be terminated that way. + // Terminates all streams owend by this session. `session.close()` would gracefully close it instead. session.destroy(); }); @@ -253,37 +285,44 @@ class Agent extends EventEmitter { removeFromQueue(); removeSession(this.freeSessions, normalizedOptions, session); - // TODO: this needs tests (session `close` event emitted before its streams were closed) - // See https://travis-ci.org/szmarczak/http2-wrapper/jobs/587629103#L282 - removeSession(this.busySessions, normalizedOptions, session); - + // This is needed. A session can be destroyed, + // so `sessionsCount < maxSessions` and there may be callback awaiting already. this._processQueue(normalizedOptions, normalizedAuthority); }); const checkQueue = () => { - for (const authority in this.queue[normalizedOptions]) { - if (session.originSet.includes(authority)) { - const {listeners} = this.queue[normalizedOptions][authority]; - const movedListeners = listeners.splice(0, session.remoteSettings.maxConcurrentStreams - session[kCurrentStreamsCount]); + if (!Reflect.has(this.queue, normalizedOptions)) { + return; + } - while (movedListeners.length !== 0 && session[kCurrentStreamsCount] < session.remoteSettings.maxConcurrentStreams) { - movedListeners.shift().resolve(session); + for (const origin of session.originSet) { + if (Reflect.has(this.queue[normalizedOptions], origin)) { + const {listeners} = this.queue[normalizedOptions][origin]; + while (listeners.length !== 0 && isFree()) { + // We assume `resolve(...)` calls `request(...)` *directly*, + // otherwise the session will get overloaded. + listeners.shift().resolve(session); } - if (this.queue[normalizedOptions][authority].length === 0) { - delete this.queue[normalizedOptions][authority]; + if (this.queue[normalizedOptions][origin].length === 0) { + delete this.queue[normalizedOptions][origin]; if (Object.keys(this.queue[normalizedOptions]).length === 0) { delete this.queue[normalizedOptions]; + break; } } } } + + // It isn't possible for the queue to exceed the stream limit of two free sessions. + // The queue will start immediately if there's at least one free session. + // The queue will be cleared. If not, it will wait for another free session. }; - // The Origin Set cannot shrink. No need to check if it suddenly became "uncovered". + // The Origin Set cannot shrink. No need to check if it suddenly became covered by another one. session.once('origin', () => { - if (session[kCurrentStreamsCount] >= session.remoteSettings.maxConcurrentStreams) { + if (!isFree()) { return; } @@ -292,43 +331,24 @@ class Agent extends EventEmitter { checkQueue(); }); - session.once('localSettings', () => { - removeFromQueue(); - - const movedListeners = listeners.splice(session.remoteSettings.maxConcurrentStreams); - - if (movedListeners.length !== 0) { - const freeSessions = getSessions(this.freeSessions, normalizedOptions, normalizedAuthority); - - while (freeSessions.length !== 0 && movedListeners.length !== 0) { - movedListeners.shift().resolve(freeSessions[0]); - - if (freeSessions[0][kCurrentStreamsCount] >= freeSessions[0].remoteSettings.maxConcurrentStreams) { - freeSessions.shift(); - } - } - - if (movedListeners.length !== 0) { - this.getSession(authority, options); - - // Replace listeners with the new ones - const {listeners} = this.queue[normalizedOptions][normalizedAuthority]; - listeners.length = 0; - listeners.push(...movedListeners); - } - } - - if (Reflect.has(this.freeSessions, normalizedOptions)) { - this.freeSessions[normalizedOptions].push(session); + session.once('remoteSettings', () => { + if (freeSession()) { + checkQueue(); + } else if (this.maxFreeSessions === 0) { + checkQueue(); + setImmediate(() => session.close()); } else { - this.freeSessions[normalizedOptions] = [session]; + session.close(); } - for (const listener of listeners) { - listener.resolve(session); + if (listeners.length !== 0) { + // Requests for a new session with predefined listeners + this.getSession(normalizedAuthority, options, listeners); + listeners.length = 0; } receivedSettings = true; + removeFromQueue(); }); session[kRequest] = session.request; @@ -339,35 +359,22 @@ class Agent extends EventEmitter { session.ref(); - if (++session[kCurrentStreamsCount] >= session.remoteSettings.maxConcurrentStreams) { - removeSession(this.freeSessions, normalizedOptions, session); + ++session[kCurrentStreamsCount]; - if (Reflect.has(this.busySessions, normalizedOptions)) { - this.busySessions[normalizedOptions].push(session); - } else { - this.busySessions[normalizedOptions] = [session]; - } + if (!isFree() && removeSession(this.freeSessions, normalizedOptions, session)) { + addSession(this.busySessions, normalizedOptions, session); } stream.once('close', () => { - if (--session[kCurrentStreamsCount] < session.remoteSettings.maxConcurrentStreams) { + --session[kCurrentStreamsCount]; + + if (isFree()) { if (session[kCurrentStreamsCount] === 0) { session.unref(); } if (removeSession(this.busySessions, normalizedOptions, session) && !session.destroyed && !session.closed) { - const freeSessionsLength = getSessions(this.freeSessions, normalizedOptions, normalizedAuthority).length; - - if (freeSessionsLength < this.maxFreeSessions) { - if (Reflect.has(this.freeSessions, normalizedOptions)) { - this.freeSessions[normalizedOptions].push(session); - } else { - this.freeSessions[normalizedOptions] = [session]; - } - - // The session cannot be uncovered at this point. To be uncovered, - // the only possible way is to make another session cover this one. - + if (freeSession()) { closeCoveredSessions(this.freeSessions, normalizedOptions, session); closeCoveredSessions(this.busySessions, normalizedOptions, session); checkQueue(); @@ -387,7 +394,7 @@ class Agent extends EventEmitter { listener.reject(error); } - delete this.queue[normalizedOptions][normalizedAuthority]; + removeFromQueue(); } }; @@ -399,11 +406,15 @@ class Agent extends EventEmitter { }); } - async request(authority, options, headers) { - const session = await this.getSession(authority, options); - const stream = session.request(headers); - - return stream; + request(authority, options, headers) { + return new Promise((resolve, reject) => { + this.getSession(authority, options, [{ + reject, + resolve: session => { + resolve(session.request(headers)); + } + }]); + }); } createConnection(authority, options) { diff --git a/source/client-request.js b/source/client-request.js index 299a5e5..dcc75ac 100644 --- a/source/client-request.js +++ b/source/client-request.js @@ -100,7 +100,11 @@ class ClientRequest extends Writable { options.path = options.socketPath; this[kOptions] = options; - this[kAuthority] = options.authority || new URL(`https://${options.hostname || options.host}:${options.port}`); + this[kAuthority] = Agent.normalizeAuthority(options, options.servername); + + if (!Reflect.has(this[kHeaders], ':authority')) { + this[kHeaders][':authority'] = this[kAuthority].slice(8); + } if (this.agent && options.preconnect !== false) { this.agent.getSession(this[kAuthority], options).catch(() => {}); @@ -149,6 +153,10 @@ class ClientRequest extends Writable { } _final(callback) { + if (this.destroyed || this.aborted) { + return; + } + this.flushHeaders(); const callEnd = () => this._request.end(callback); @@ -184,7 +192,7 @@ class ClientRequest extends Writable { } flushHeaders() { - if (this[kFlushedHeaders] && !this.destroyed && !this.aborted) { + if (this[kFlushedHeaders] || this.destroyed || this.aborted) { return; } @@ -196,69 +204,70 @@ class ClientRequest extends Writable { const onStream = stream => { this._request = stream; - if (!this.destroyed && !this.aborted) { - // Forwards `timeout`, `continue`, `close` and `error` events to this instance. - if (!isConnectMethod) { - proxyEvents(this._request, this, ['timeout', 'continue', 'close', 'error']); - } + if (this.destroyed || this.aborted) { + this._request.close(NGHTTP2_CANCEL); + return; + } - // This event tells we are ready to listen for the data. - this._request.once('response', (headers, flags, rawHeaders) => { - this.res = new IncomingMessage(this.socket); - this.res.req = this; - this.res.statusCode = headers[HTTP2_HEADER_STATUS]; - this.res.headers = headers; - this.res.rawHeaders = rawHeaders; - - this.res.once('end', () => { - if (this.aborted) { - this.res.aborted = true; - this.res.emit('aborted'); - } else { - this.res.complete = true; - } - }); - - if (isConnectMethod) { - this.res.upgrade = true; - - // The HTTP1 API says the socket is detached here, - // but we can't do that so we pass the original HTTP2 request. - if (this.emit('connect', this.res, this._request, Buffer.alloc(0))) { - this.emit('close'); - } else { - // No listeners attached, destroy the original request. - this._request.destroy(); - } - } else { - // Forwards data - this._request.pipe(this.res); + // Forwards `timeout`, `continue`, `close` and `error` events to this instance. + if (!isConnectMethod) { + proxyEvents(this._request, this, ['timeout', 'continue', 'close', 'error']); + } - if (!this.emit('response', this.res)) { - // No listeners attached, dump the response. - this.res._dump(); - } + // This event tells we are ready to listen for the data. + this._request.once('response', (headers, flags, rawHeaders) => { + this.res = new IncomingMessage(this.socket); + this.res.req = this; + this.res.statusCode = headers[HTTP2_HEADER_STATUS]; + this.res.headers = headers; + this.res.rawHeaders = rawHeaders; + + this.res.once('end', () => { + if (this.aborted) { + this.res.aborted = true; + this.res.emit('aborted'); + } else { + this.res.complete = true; } }); - // Emits `information` event - this._request.once('headers', headers => this.emit('information', {statusCode: headers[HTTP2_HEADER_STATUS]})); + if (isConnectMethod) { + this.res.upgrade = true; - this._request.once('trailers', (trailers, flags, rawTrailers) => { - // Assigns trailers to the response object. - this.res.trailers = trailers; - this.res.rawTrailers = rawTrailers; - }); + // The HTTP1 API says the socket is detached here, + // but we can't do that so we pass the original HTTP2 request. + if (this.emit('connect', this.res, this._request, Buffer.alloc(0))) { + this.emit('close'); + } else { + // No listeners attached, destroy the original request. + this._request.destroy(); + } + } else { + // Forwards data + this._request.pipe(this.res); - this.socket = this._request.session.socket; - this.connection = this._request.session.socket; + if (!this.emit('response', this.res)) { + // No listeners attached, dump the response. + this.res._dump(); + } + } + }); - process.nextTick(() => { - this.emit('socket', this._request.session.socket); - }); - } else { - this._request.close(NGHTTP2_CANCEL); - } + // Emits `information` event + this._request.once('headers', headers => this.emit('information', {statusCode: headers[HTTP2_HEADER_STATUS]})); + + this._request.once('trailers', (trailers, flags, rawTrailers) => { + // Assigns trailers to the response object. + this.res.trailers = trailers; + this.res.rawTrailers = rawTrailers; + }); + + this.socket = this._request.session.socket; + this.connection = this._request.session.socket; + + process.nextTick(() => { + this.emit('socket', this._request.session.socket); + }); }; // Makes a HTTP2 request diff --git a/source/utils/calculate-server-name.js b/source/utils/calculate-server-name.js index 915dff5..5a7a3d0 100644 --- a/source/utils/calculate-server-name.js +++ b/source/utils/calculate-server-name.js @@ -1,5 +1,6 @@ 'use strict'; -/* istanbul ignore file: https://github.com/nodejs/node/blob/d4c91f28148af8a6c1a95392e5c88cb93d4b61c6/lib/_http_agent.js */ +const net = require('net'); +/* istanbul ignore file: https://github.com/nodejs/node/blob/v12.10.0/lib/_http_agent.js */ module.exports = (options, headers) => { let servername = options.host; @@ -18,5 +19,9 @@ module.exports = (options, headers) => { } } + if (net.isIP(servername)) { + return ''; + } + return servername; }; diff --git a/test/agent.js b/test/agent.js index 9c84b56..3bfe1a9 100644 --- a/test/agent.js +++ b/test/agent.js @@ -7,11 +7,10 @@ import is from '@sindresorhus/is'; import {Agent} from '../source'; import isCompatible from '../source/utils/is-compatible'; import {createWrapper, createServer} from './helpers/server'; +import setImmediateAsync from './helpers/set-immediate-async'; const supportsTlsSessions = process.versions.node.split('.')[0] >= 11; -const setImmediateAsync = () => new Promise(resolve => setImmediate(resolve)); - if (isCompatible) { process.env.NODE_TLS_REJECT_UNAUTHORIZED = '0'; @@ -23,6 +22,13 @@ if (isCompatible) { } }); + const singleRequestWrapperWithLolex = createWrapper({ + settings: { + maxConcurrentStreams: 1 + }, + lolex: true + }); + const tripleRequestWrapper = createWrapper({ settings: { maxConcurrentStreams: 3 @@ -34,6 +40,8 @@ if (isCompatible) { test('passing string as `authority`', wrapper, async (t, server) => { const agent = new Agent(); await t.notThrowsAsync(agent.getSession(server.url)); + + agent.destroy(); }); test('passing options as `authority`', wrapper, async (t, server) => { @@ -62,6 +70,8 @@ if (isCompatible) { const error = await t.throwsAsync(agent.getSession({})); t.is(error.port, 443); t.is(error.address, '127.0.0.1'); + + agent.destroy(); }); test('sessions are not busy if still can make requests', wrapper, async (t, server) => { @@ -76,6 +86,8 @@ if (isCompatible) { t.is(Object.values(agent.freeSessions)[0].length, 1); t.is(Object.values(agent.busySessions).length, 0); + + agent.destroy(); }); test('sessions are busy when cannot make requests', singleRequestWrapper, async (t, server) => { @@ -90,6 +102,8 @@ if (isCompatible) { t.is(Object.values(agent.busySessions).length, 0); t.is(Object.values(agent.freeSessions)[0].length, 1); + + agent.destroy(); }); test('gives free sessions if available', wrapper, async (t, server) => { @@ -102,6 +116,8 @@ if (isCompatible) { t.is(Object.values(agent.freeSessions)[0].length, 1); t.is(first, second); + + agent.destroy(); }); test('gives the queued session if exists', wrapper, async (t, server) => { @@ -119,6 +135,8 @@ if (isCompatible) { t.is(typeof Object.values(agent.queue[''])[0], 'function'); t.is(await firstPromise, await secondPromise); + + agent.destroy(); }); test.serial('`timeout` option', wrapper, async (t, server) => { @@ -140,42 +158,64 @@ if (isCompatible) { t.true(difference <= timeout, `Timeout exceeded ${timeout}ms (${difference}ms)`); }); }); + + agent.destroy(); }); test('`maxSessions` option', singleRequestWrapper, async (t, server) => { - const queue = []; server.get('/', (request, response) => { - queue.push(() => response.end('ok')); + process.nextTick(() => { + response.end(); + }); }); const agent = new Agent({ maxSessions: 1 }); - const firstRequest = (await agent.request(server.url, server.options)).end(); - const secondRequestPromise = agent.request(server.url, server.options); + const {session} = (await agent.request(server.url, server.options)).end(); + const requestPromise = agent.request(server.url, server.options); t.is(typeof Object.values(agent.queue[''])[0], 'function'); t.is(Object.values(agent.freeSessions).length, 0); - t.is(Object.values(agent.busySessions)[0].length, 1); + t.is(Object.values(agent.busySessions['']).length, 1); - await new Promise(resolve => { - const interval = setInterval(() => { - if (queue.length !== 0) { - resolve(); + session.destroy(); - clearInterval(interval); - } - }, 100); - }); + const request = await requestPromise; + request.close(); - // TODO: get rid of `serverSession.setTimeout()` as it breaks this test + agent.destroy(); + }); + + test('doesn\'t break on session `close` event', singleRequestWrapper, async (t, server) => { + server.get('/', () => {}); + + const agent = new Agent(); + const request = (await agent.request(server.url)).end(); + const {session} = request; + + const requestPromise = agent.request(server.url); + + const emit = request.emit.bind(request); + request.emit = (event, ...args) => { + if (event === 'error') { + t.pass(); + } else { + emit(event, ...args); + } + }; - queue.shift()(); - await pEvent(firstRequest, 'response'); - firstRequest.resume(); + session.close(); - await secondRequestPromise; + await requestPromise; + if (process.versions.node.split('.')[0] < 12) { + // Session `close` event is emitted before its streams send `close` event + t.pass(); + } else { + // Session `close` event is emitted after its streams send `close` event + t.plan(1); + } agent.destroy(); }); @@ -190,6 +230,8 @@ if (isCompatible) { await pEvent(session, 'close'); t.is(Object.values(agent.freeSessions).length, 0); + + agent.destroy(); }); test('creates new session if there are no free sessions', singleRequestWrapper, async (t, server) => { @@ -222,6 +264,8 @@ if (isCompatible) { t.is(Object.values(agent.freeSessions).length, 0); t.is(Object.values(agent.busySessions).length, 0); + + agent.destroy(); }); test('can destroy busy sessions', singleRequestWrapper, async (t, server) => { @@ -241,6 +285,8 @@ if (isCompatible) { t.is(error.message, message); t.is(Object.values(agent.busySessions).length, 0); + + agent.destroy(); }); test('`closeFreeSessions()` closes sessions with 0 pending streams only', wrapper, async (t, server) => { @@ -254,6 +300,8 @@ if (isCompatible) { await pEvent(session, 'close'); t.is(Object.values(agent.freeSessions).length, 0); + + agent.destroy(); } { @@ -265,14 +313,17 @@ if (isCompatible) { t.is(session.closed, false); t.is(Object.values(agent.freeSessions).length, 1); + + agent.destroy(); } }); test('throws if session is closed before receiving a SETTINGS frame', async t => { const {key, cert} = await createCert(); + const sockets = []; const server = tls.createServer({key, cert, ALPNProtocols: ['h2']}, socket => { - setTimeout(() => socket.end(), 2000); + sockets.push(socket); }); server.listen = promisify(server.listen.bind(server)); @@ -289,6 +340,10 @@ if (isCompatible) { 'Session closed without receiving a SETTINGS frame' ); + for (const socket of sockets) { + socket.destroy(); + } + await server.close(); }); @@ -301,10 +356,14 @@ if (isCompatible) { await pEvent(secondStream, 'close'); t.pass(); + + agent.destroy(); }); test('endless response (specific case)', singleRequestWrapper, async (t, server) => { - const agent = new Agent(); + const agent = new Agent({ + timeout: 1000 + }); const firstRequest = await agent.request(server.url); const secondRequest = await agent.request(server.url); @@ -317,6 +376,8 @@ if (isCompatible) { await pEvent(secondStream, 'close'); t.pass(); + + agent.destroy(); }); test('respects `.getName()`', wrapper, async (t, server) => { @@ -328,6 +389,8 @@ if (isCompatible) { }); t.not(firstSession, secondSession); + + agent.destroy(); }); test('custom servername', wrapper, async (t, server) => { @@ -335,6 +398,8 @@ if (isCompatible) { const session = await agent.getSession(server.url, {servername: 'foobar'}); t.is(session.socket.servername, 'foobar'); + + agent.destroy(); }); test('appends to freeSessions after the stream has ended', singleRequestWrapper, async (t, server) => { @@ -361,16 +426,42 @@ if (isCompatible) { setImmediate(() => { t.is(agent.freeSessions[''].length, 2); }); + + agent.destroy(); + }); + + test('appends to freeSessions after the stream has ended #2', singleRequestWrapper, async (t, server) => { + server.get('/', (request, response) => { + setTimeout(() => { + response.end(); + }, 200); + }); + + const agent = new Agent({maxSessions: 1}); + + const firstRequest = await agent.request(server.url); + const secondRequestPromise = agent.request(server.url); + + firstRequest.close(); + + const secondRequest = await secondRequestPromise; + secondRequest.end(); + await pEvent(secondRequest, 'close'); + + t.pass(); + + agent.destroy(); }); test('prevents overloading sessions', singleRequestWrapper, async (t, server) => { const agent = new Agent(); - agent.getSession(server.url); const requestPromises = Promise.all([agent.request(server.url), agent.request(server.url)]); const requests = await requestPromises; t.not(requests[0].session, requests[1].session); + + agent.destroy(); }); test('prevents overloading sessions #2', singleRequestWrapper, async (t, server) => { @@ -401,7 +492,9 @@ if (isCompatible) { t.false(thirdSession.destroyed); request.close(); - await secondServer.gracefulClose(); + agent.closeFreeSessions(); + + await secondServer.close(); }); test('sessions can be manually overloaded', singleRequestWrapper, async (t, server) => { @@ -411,6 +504,8 @@ if (isCompatible) { const requests = [session.request(), session.request()]; t.is(requests[0].session, requests[1].session); + + agent.destroy(); }); test('emits `session` event when a new session is created', wrapper, async (t, server) => { @@ -423,6 +518,8 @@ if (isCompatible) { }); await agent.getSession(server.url); + + agent.destroy(); }); test('`.settings` property', wrapper, async (t, server) => { @@ -430,40 +527,57 @@ if (isCompatible) { agent.settings.maxHeaderListSize = 100; const session = await agent.getSession(server.url); + await pEvent(session, 'localSettings'); + t.is(session.localSettings.maxHeaderListSize, 100); + + agent.destroy(); }); if (supportsTlsSessions) { test('caches a TLS session when successfully connected', wrapper, async (t, server) => { const agent = new Agent(); + await agent.getSession(server.url); + await setImmediateAsync(); - t.true(is.buffer(agent.tlsSessionCache.get(`${agent.normalizeAuthority(server.url)}:`).session)); + t.true(is.buffer(agent.tlsSessionCache.get(`${Agent.normalizeAuthority(server.url)}:`).session)); + + agent.destroy(); }); test('reuses a TLS session', wrapper, async (t, server) => { const agent = new Agent(); const session = await agent.getSession(server.url); - const tlsSession = agent.tlsSessionCache.get(`${agent.normalizeAuthority(server.url)}:`).session; + await setImmediateAsync(); + + const tlsSession = agent.tlsSessionCache.get(`${Agent.normalizeAuthority(server.url)}:`).session; session.close(); await pEvent(session, 'close'); const secondSession = await agent.getSession(server.url); + await setImmediateAsync(); t.deepEqual(secondSession.socket.getSession(), tlsSession); t.true(is.buffer(tlsSession)); + + agent.destroy(); }); test('purges the TLS session on session error', wrapper, async (t, server) => { const agent = new Agent(); const session = await agent.getSession(server.url); - t.true(is.buffer(agent.tlsSessionCache.get(`${agent.normalizeAuthority(server.url)}:`).session)); + await setImmediateAsync(); + + t.true(is.buffer(agent.tlsSessionCache.get(`${Agent.normalizeAuthority(server.url)}:`).session)); session.destroy(new Error('Ouch.')); await pEvent(session, 'close', {rejectionEvents: []}); - t.true(is.undefined(agent.tlsSessionCache.get(`${agent.normalizeAuthority(server.url)}:`))); + t.true(is.undefined(agent.tlsSessionCache.get(`${Agent.normalizeAuthority(server.url)}:`))); + + agent.destroy(); }); } @@ -473,6 +587,8 @@ if (isCompatible) { const session = await agent.getSession(server.url); t.throws(() => session.request(), 'Invalid usage. Use `await agent.request(authority, options, headers)` instead.'); + + agent.destroy(); }); test('doesn\'t create a new session if there exists an authoritive one', wrapper, async (t, server) => { @@ -486,6 +602,8 @@ if (isCompatible) { await pEvent(session, 'origin'); t.is(await agent.getSession('https://example.com'), session); + + agent.destroy(); }); test('closes covered sessions - `origin` event', wrapper, async (t, server) => { @@ -507,7 +625,9 @@ if (isCompatible) { t.true(firstSession.destroyed); t.false(secondSession.destroyed); - await secondServer.gracefulClose(); + agent.destroy(); + + await secondServer.close(); }); test('closes covered sessions - session no longer busy', singleRequestWrapper, async (t, server) => { @@ -538,7 +658,9 @@ if (isCompatible) { t.true(firstSession.destroyed); t.false(secondSession.destroyed); - await secondServer.gracefulClose(); + agent.destroy(); + + await secondServer.close(); }); test('doesn\'t close covered sessions if the current one is full', singleRequestWrapper, async (t, server) => { @@ -572,7 +694,9 @@ if (isCompatible) { secondRequest.close(); - await secondServer.gracefulClose(); + agent.destroy(); + + await secondServer.close(); }); test('uses sessions which are more loaded to use fewer connections', tripleRequestWrapper, async (t, server) => { @@ -631,5 +755,66 @@ if (isCompatible) { for (let i = 0; i < SESSIONS_COUNT; i++) { sessions[i].closeRequests(); } + + agent.destroy(); + }); + + test('`.freeSessions` may contain destroyed sessions', wrapper, async (t, server) => { + const agent = new Agent(); + const session = await agent.getSession(server.url); + session.destroy(); + + t.true(agent.freeSessions[''][0].destroyed); + + agent.destroy(); + }); + + test('`.freeSessions` may contain closed sessions', wrapper, async (t, server) => { + const agent = new Agent(); + const session = await agent.getSession(server.url); + session.close(); + + t.true(agent.freeSessions[''][0].closed); + + agent.destroy(); + }); + + test('`maxFreeSessions` set to 0 causes to close the session after running through the queue', wrapper, async (t, server) => { + const agent = new Agent(); + const sessionPromise = agent.getSession(server.url); + + agent.maxFreeSessions = 0; + + const session = await sessionPromise; + await setImmediateAsync(); + + t.true(session.destroyed); + + agent.destroy(); + }); + + test.serial('respects `.maxFreeSessions` changes', singleRequestWrapperWithLolex, async (t, server, clock) => { + const agent = new Agent({ + maxFreeSessions: 2 + }); + + const stream = await agent.request(server.url); + const streamSession = stream.session; + + agent.maxFreeSessions = 1; + stream.close(); + + const sessionPromise = agent.getSession(server.url); + const session = await sessionPromise; + + t.is(session, streamSession); + + const serverSession = await pEvent(server, 'session'); + + await pEvent(serverSession, 'remoteSettings'); + + clock.tick(1); + agent.destroy(); + clock.tick(1); }); } diff --git a/test/auto.js b/test/auto.js index 1443cb4..1ecbe6f 100644 --- a/test/auto.js +++ b/test/auto.js @@ -2,7 +2,7 @@ import tls from 'tls'; import https from 'https'; import http from 'http'; import util from 'util'; -import test from 'ava'; +import {serial as test, afterEach} from 'ava'; import pEvent from 'p-event'; import getStream from 'get-stream'; import http2 from '../source'; @@ -11,6 +11,12 @@ import {createServer} from './helpers/server'; process.env.NODE_TLS_REJECT_UNAUTHORIZED = '0'; +afterEach(() => { + http2.globalAgent.destroy(); +}); + +test.serial = test; + if (isCompatible) { const createH1Server = () => { const server = http.createServer((request, response) => { @@ -76,6 +82,8 @@ if (isCompatible) { const data = await getStream(response); t.is(data, 'h2'); t.is(Object.keys(agent.freeSessions).length, 1); + + agent.destroy(); }); test('https', async t => { @@ -110,6 +118,8 @@ if (isCompatible) { const data = await getStream(response); t.is(data, 'http/1.1'); t.is(Object.keys(agent.sockets).length, 1); + + agent.destroy(); }); test('http', async t => { @@ -142,6 +152,8 @@ if (isCompatible) { const data = await getStream(response); t.is(data, 'http/1.1'); t.is(Object.keys(agent.sockets).length, 1); + + agent.destroy(); }); test('accepts string as URL', async t => { @@ -252,6 +264,7 @@ if (isCompatible) { return tls.connect(h2s.address().port, 'localhost', { ...options, + servername: 'localhost', allowHalfOpen: true, ALPNProtocols: ['h2'] }); @@ -264,6 +277,8 @@ if (isCompatible) { const data = await getStream(response); t.is(data, 'h2'); t.true(called); + + request.agent.destroy(); }); } else { test('fallbacks to HTTP1', async t => { diff --git a/test/headers.js b/test/headers.js index 64bb828..1359337 100644 --- a/test/headers.js +++ b/test/headers.js @@ -1,14 +1,16 @@ -import test from 'ava'; +import {serial as test} from 'ava'; import pEvent from 'p-event'; import getStream from 'get-stream'; -import {request as makeRequest} from '../source'; +import http2, {request as makeRequest} from '../source'; import isCompatible from '../source/utils/is-compatible'; import {createWrapper} from './helpers/server'; if (isCompatible) { process.env.NODE_TLS_REJECT_UNAUTHORIZED = '0'; - const wrapper = createWrapper(); + const wrapper = createWrapper({ + beforeServerClose: () => http2.globalAgent.destroy() + }); test('setting headers', wrapper, async (t, server) => { const request = makeRequest(server.options); diff --git a/test/helpers/server.js b/test/helpers/server.js index 62e30f8..9f1b5ad 100644 --- a/test/helpers/server.js +++ b/test/helpers/server.js @@ -3,8 +3,7 @@ const net = require('net'); const http2 = require('http2'); const util = require('util'); const createCert = require('create-cert'); - -const delay = ms => new Promise(resolve => setTimeout(setImmediate, ms, resolve)); +const lolex = require('lolex'); const createPlainServer = async (options, handler) => { if (typeof options === 'function') { @@ -28,33 +27,6 @@ const createPlainServer = async (options, handler) => { server.url = `${server.options.protocol}//${server.options.hostname}:${server.options.port}`; }); - const sessions = []; - let hasConnected = false; - - server.on('session', session => { - hasConnected = true; - sessions.push(session); - - session.once('close', () => { - sessions.splice(sessions.indexOf(session), 1); - }); - - session.setTimeout(1000); - }); - - server.gracefulClose = async () => { - let elapsed = 0; - const tick = 10; - - // eslint-disable-next-line no-unmodified-loop-condition - while ((sessions.length !== 0 || !hasConnected) && elapsed < 1000) { - await delay(tick); // eslint-disable-line no-await-in-loop - elapsed += tick; - } - - return server.close(); - }; - return server; }; @@ -125,6 +97,8 @@ const createWrapper = options => { return async (t, run) => { const create = (options && options.createServer) || createServer; + const clock = options && options.lolex ? lolex.install() : lolex.createClock(); + const server = await create(options); await server.listen(); @@ -132,9 +106,17 @@ const createWrapper = options => { // console.log(`${server.options.port} - ${t.title}`); try { - await run(t, server); + await run(t, server, clock); } finally { - await server.gracefulClose(); + if (options && options.beforeServerClose) { + options.beforeServerClose(); + } + + if (options && options.lolex) { + clock.uninstall(); + } + + await server.close(); } }; }; diff --git a/test/helpers/set-immediate-async.js b/test/helpers/set-immediate-async.js new file mode 100644 index 0000000..15a5e8f --- /dev/null +++ b/test/helpers/set-immediate-async.js @@ -0,0 +1,3 @@ +'use strict'; + +module.exports = () => new Promise(resolve => setImmediate(resolve)); diff --git a/test/request.js b/test/request.js index 11e5655..b505050 100644 --- a/test/request.js +++ b/test/request.js @@ -1,7 +1,6 @@ import EventEmitter from 'events'; import net from 'net'; -import http2 from 'http2'; -import test from 'ava'; +import {serial as test} from 'ava'; import pEvent from 'p-event'; import getStream from 'get-stream'; import tempy from 'tempy'; @@ -9,13 +8,22 @@ import is from '@sindresorhus/is'; import {request as makeRequest, get, constants, connect, Agent, globalAgent} from '../source'; import isCompatible from '../source/utils/is-compatible'; import {createWrapper, createServer, createProxyServer} from './helpers/server'; +import setImmediateAsync from './helpers/set-immediate-async'; + +const delay = ms => new Promise(resolve => setTimeout(resolve, ms)); + +test.serial = test; if (isCompatible) { process.env.NODE_TLS_REJECT_UNAUTHORIZED = '0'; - const wrapper = createWrapper(); + const wrapper = createWrapper({ + beforeServerClose: () => globalAgent.destroy() + }); + const proxyWrapper = createWrapper({ - createServer: createProxyServer + createServer: createProxyServer, + beforeServerClose: () => globalAgent.destroy() }); const okHandler = (request, response) => { @@ -26,6 +34,31 @@ if (isCompatible) { t.throws(() => makeRequest('invalid://'), 'Protocol "invalid:" not supported. Expected "https:"'); }); + test('does not modify options', t => { + const inputs = [ + undefined, + 'https://example.com', + new URL('https://example.com') + ]; + + const noop = () => {}; + + for (const input of inputs) { + const originalOptions = { + preconnect: false + }; + + const options = { + ...originalOptions + }; + + const request = input ? makeRequest(input, options, noop) : makeRequest(options, noop); + request.abort(); + + t.deepEqual(options, originalOptions); + } + }); + test('accepts `URL` as the input parameter', wrapper, async (t, server) => { server.get('/200', okHandler); @@ -125,20 +158,6 @@ if (isCompatible) { t.true((/self signed certificate/).test(error.message) || error.message === 'unable to verify the first certificate'); }); - test('`authority` option', async t => { - const localServer = await createServer(); - await localServer.listen(); - - const request = makeRequest({...localServer.options, authority: localServer.options}); - request.end(); - - const response = await pEvent(request, 'response'); - const data = JSON.parse(await getStream(response)); - await localServer.close(); - - t.is(data.headers[':authority'], `${localServer.options.hostname}:${localServer.options.port}`); - }); - test('`tlsSession` option', wrapper, async (t, server) => { const request = makeRequest(server.url, {tlsSession: 'not a buffer', agent: false}); request.end(); @@ -173,7 +192,9 @@ if (isCompatible) { request.end(); await pEvent(request, 'finish'); - session.unref(); + request.abort(); + session.close(); + t.true(called); }); @@ -249,7 +270,7 @@ if (isCompatible) { test('`headersSent` property is `false` before flushing headers', wrapper, (t, server) => { const request = makeRequest(server.options); t.false(request.headersSent); - request.end(); + request.abort(); }); test('`headersSent` is `true` after flushing headers', wrapper, async (t, server) => { @@ -261,6 +282,8 @@ if (isCompatible) { }); test('`timeout` option', wrapper, async (t, server) => { + server.get('/', () => {}); + const request = makeRequest({ ...server.options, timeout: 1 @@ -274,6 +297,8 @@ if (isCompatible) { }); test('`.setTimeout()` works', wrapper, async (t, server) => { + server.get('/', () => {}); + const request = makeRequest(server.options); request.setTimeout(1); request.end(); @@ -440,6 +465,35 @@ if (isCompatible) { t.notThrows(() => request.end()); }); + test('`.flushHeaders()` has no effect if `.abort()` had been run before', wrapper, (t, server) => { + const request = makeRequest(server.options); + request.abort(); + t.notThrows(() => request.flushHeaders()); + }); + + test('aborting after flushing headers may error because the request could be sent already', wrapper, async (t, server) => { + const request = makeRequest(server.options); + request.flushHeaders(); + request.abort(); + + // It's also called after every test finishes + server.close(); + server.close = () => {}; + + const error = await pEvent(request, 'error'); + t.is(error.code, 'ECONNREFUSED'); + }); + + test('`.abort()` works if it has received a stream recently', wrapper, async (t, server) => { + const request = makeRequest(server.options); + request.flushHeaders(); + request.abort(); + + await delay(100); + + t.true(request.aborted); + }); + test('emits `abort` only once', wrapper, (t, server) => { let aborts = 0; @@ -456,6 +510,8 @@ if (isCompatible) { request.end(); t.notThrows(() => request.setNoDelay()); + + request.abort(); }); test('`.setSocketKeepAlive()` doesn\'t throw', wrapper, (t, server) => { @@ -463,6 +519,8 @@ if (isCompatible) { request.end(); t.notThrows(() => request.setSocketKeepAlive()); + + request.abort(); }); test('`.maxHeadersCount` - getter', wrapper, async (t, server) => { @@ -488,7 +546,7 @@ if (isCompatible) { }); test('throws if making a request on a closed session', wrapper, async (t, server) => { - const session = http2.connect(server.url); + const session = connect(server.url); session.destroy(); const request = makeRequest({ @@ -547,12 +605,36 @@ if (isCompatible) { request.abort(); t.true(called); + + request.agent.destroy(); + }); + + test('sets proper `:authority` header', wrapper, async (t, server) => { + server.on('session', session => { + session.origin('https://example.com'); + }); + + server.get('/', (request, response) => { + response.end(request.headers[':authority']); + }); + + const agent = new Agent(); + await agent.getSession(server.url); + await setImmediateAsync(); + + const request = makeRequest('https://example.com', {agent}).end(); + const response = await pEvent(request, 'response'); + const body = await getStream(response); + + t.is(body, 'example.com'); + + agent.destroy(); }); if (process.platform !== 'win32') { const socketPath = tempy.file({extension: 'socket'}); - test('`socketPath` option', async t => { + test.serial('`socketPath` option', async t => { const localServer = await createServer(); await localServer.listen(socketPath); @@ -566,6 +648,8 @@ if (isCompatible) { const data = JSON.parse(await getStream(response)); t.truthy(data); + request.agent.destroy(); + await localServer.close(); }); } diff --git a/test/response.js b/test/response.js index 24c031d..8470765 100644 --- a/test/response.js +++ b/test/response.js @@ -1,8 +1,8 @@ -import test from 'ava'; +import {serial as test} from 'ava'; import pEvent from 'p-event'; import getStream from 'get-stream'; import is from '@sindresorhus/is'; -import {request as makeRequest} from '../source'; +import {request as makeRequest, globalAgent} from '../source'; import isCompatible from '../source/utils/is-compatible'; import IncomingMessage from '../source/incoming-message'; import {createWrapper} from './helpers/server'; @@ -10,7 +10,9 @@ import {createWrapper} from './helpers/server'; if (isCompatible) { process.env.NODE_TLS_REJECT_UNAUTHORIZED = '0'; - const wrapper = createWrapper(); + const wrapper = createWrapper({ + beforeServerClose: () => globalAgent.destroy() + }); const intervalMs = 250; const intervalHandler = (request, response) => { @@ -128,17 +130,22 @@ if (isCompatible) { await pEvent(response, 'data'); response.pause(); - const interval = setInterval(async () => { - if (response.readableLength === 2) { - response.resume(); + const run = () => new Promise(resolve => { + const interval = setInterval(async () => { + if (response.readableLength === 2) { + response.resume(); - await pEvent(response, 'data'); - t.deepEqual(chunks, ['0', '1', '2']); + await pEvent(response, 'data'); + t.deepEqual(chunks, ['0', '1', '2']); - request.abort(); - clearInterval(interval); - } + request.abort(); + clearInterval(interval); + resolve(); + } + }); }); + + await run(); }); test('reading parts of the response', wrapper, async (t, server) => { @@ -155,15 +162,20 @@ if (isCompatible) { t.is(response.read(1), '0'); - const interval = setInterval(() => { - if (response.readableLength === 2) { - t.is(response.read(1), '1'); - t.is(response.read(1), '2'); + const run = () => new Promise(resolve => { + const interval = setInterval(() => { + if (response.readableLength === 2) { + t.is(response.read(1), '1'); + t.is(response.read(1), '2'); - request.abort(); - clearInterval(interval); - } + request.abort(); + clearInterval(interval); + resolve(); + } + }); }); + + await run(); }); test('headers', wrapper, async (t, server) => {