Skip to content

Commit

Permalink
Refactor source code, fix bugs
Browse files Browse the repository at this point in the history
  • Loading branch information
szmarczak authored Sep 26, 2019
1 parent 3eb8000 commit e4db251
Show file tree
Hide file tree
Showing 8 changed files with 211 additions and 120 deletions.
5 changes: 2 additions & 3 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,8 @@
"node": ">=8.16.0"
},
"scripts": {
"test": "echo XO $(xo --version) && echo AVA $(ava --version) && xo && nyc ava",
"coveralls": "nyc report --reporter=text-lcov | coveralls",
"report": "npm test && nyc report --reporter=html"
"test": "echo XO $(xo --version) && echo AVA $(ava --version) && xo && nyc --reporter=html --reporter=text ava",
"coveralls": "nyc report --reporter=text-lcov | coveralls"
},
"files": [
"source"
Expand Down
143 changes: 69 additions & 74 deletions source/agent.js
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ const removeSession = (where, name, session) => {
const getSessions = (where, name, normalizedAuthority) => {
if (Reflect.has(where, name)) {
return where[name].filter(session => {
return session.originSet.includes(normalizedAuthority);
return !session.closed && session.originSet.includes(normalizedAuthority);
});
}

Expand Down Expand Up @@ -107,12 +107,12 @@ class Agent extends EventEmitter {
this.tlsSessionCache = new QuickLRU({maxSize: maxCachedTlsSessions});
}

normalizeAuthority(authority) {
static normalizeAuthority(authority, servername) {
if (typeof authority === 'string') {
authority = new URL(authority);
}

const host = authority.hostname || authority.host || 'localhost';
const host = servername || authority.hostname || authority.host || 'localhost';
const port = authority.port || 443;

if (port === 443) {
Expand All @@ -122,12 +122,12 @@ class Agent extends EventEmitter {
return `https://${host}:${port}`;
}

normalizeOptions(options) {
static normalizeOptions(options) {
let normalized = '';

if (options) {
for (const key of nameKeys) {
if (Reflect.has(options, key)) {
if (options[key]) {
normalized += `:${options[key]}`;
}
}
Expand All @@ -151,40 +151,46 @@ class Agent extends EventEmitter {
}
}

async getSession(authority, options) {
getSession(authority, options, listeners) {
return new Promise((resolve, reject) => {
const detached = {resolve, reject};
const normalizedOptions = this.normalizeOptions(options);
const normalizedAuthority = this.normalizeAuthority(authority);
if (Array.isArray(listeners)) {
// Resolve ASAP, because we're just moving the listeners.
resolve();
} else {
listeners = [{resolve, reject}];
}

const normalizedOptions = Agent.normalizeOptions(options);
const normalizedAuthority = Agent.normalizeAuthority(authority, options && options.servername);

if (Reflect.has(this.freeSessions, normalizedOptions)) {
const freeSessions = getSessions(this.freeSessions, normalizedOptions, normalizedAuthority);

if (freeSessions.length !== 0) {
resolve(freeSessions.reduce((previousValue, nextValue) => {
if (nextValue[kCurrentStreamsCount] > previousValue[kCurrentStreamsCount]) {
return nextValue;
}
for (const listener of listeners) {
listener.resolve(freeSessions.reduce((previousSession, nextSession) => {
if (nextSession[kCurrentStreamsCount] > previousSession[kCurrentStreamsCount]) {
return nextSession;
}

return previousValue;
}));
return previousSession;
}));
}

return;
}
}

if (Reflect.has(this.queue, normalizedOptions)) {
if (Reflect.has(this.queue[normalizedOptions], normalizedAuthority)) {
this.queue[normalizedOptions][normalizedAuthority].listeners.push(detached);
this.queue[normalizedOptions][normalizedAuthority].listeners.push(...listeners);

return;
}
} else {
this.queue[normalizedOptions] = {};
}

const listeners = [detached];

const removeFromQueue = () => {
// Our entry can be replaced. We cannot remove the new one.
if (Reflect.has(this.queue, normalizedOptions) && this.queue[normalizedOptions][normalizedAuthority] === entry) {
Expand Down Expand Up @@ -212,18 +218,20 @@ class Agent extends EventEmitter {
});
session[kCurrentStreamsCount] = 0;

session.socket.once('session', session => {
this.tlsSessionCache.set(name, {
session,
servername
session.socket.once('session', tlsSession => {
setImmediate(() => {
this.tlsSessionCache.set(name, {
session: tlsSession,
servername
});
});
});

// See https://github.com/nodejs/node/issues/28985
session.socket.once('secureConnect', () => {
servername = session.socket.servername;

if (servername === false && typeof tlsSessionCache !== 'undefined') {
if (servername === false && typeof tlsSessionCache !== 'undefined' && typeof tlsSessionCache.servername !== 'undefined') {
session.socket.servername = tlsSessionCache.servername;
}
});
Expand All @@ -239,7 +247,7 @@ class Agent extends EventEmitter {
});

session.setTimeout(this.timeout, () => {
// `.close()` gracefully closes the session. Current streams wouldn't be terminated that way.
// Terminates all streams owend by this session. `session.close()` would gracefully close it instead.
session.destroy();
});

Expand All @@ -253,35 +261,42 @@ class Agent extends EventEmitter {
removeFromQueue();
removeSession(this.freeSessions, normalizedOptions, session);

// TODO: this needs tests (session `close` event emitted before its streams were closed)
// See https://travis-ci.org/szmarczak/http2-wrapper/jobs/587629103#L282
removeSession(this.busySessions, normalizedOptions, session);

// This is needed. A session can be destroyed,
// so `sessionsCount < maxSessions` and there may be callback awaiting already.
this._processQueue(normalizedOptions, normalizedAuthority);
});

const checkQueue = () => {
for (const authority in this.queue[normalizedOptions]) {
if (session.originSet.includes(authority)) {
const {listeners} = this.queue[normalizedOptions][authority];
const movedListeners = listeners.splice(0, session.remoteSettings.maxConcurrentStreams - session[kCurrentStreamsCount]);
if (!Reflect.has(this.queue, normalizedOptions)) {
return;
}

while (movedListeners.length !== 0 && session[kCurrentStreamsCount] < session.remoteSettings.maxConcurrentStreams) {
movedListeners.shift().resolve(session);
for (const origin of session.originSet) {
if (Reflect.has(this.queue[normalizedOptions], origin)) {
const {listeners} = this.queue[normalizedOptions][origin];
while (listeners.length !== 0 && session[kCurrentStreamsCount] < session.remoteSettings.maxConcurrentStreams) {
// We assume `resolve(...)` calls `request(...)` *directly*,
// otherwise the session will get overloaded.
listeners.shift().resolve(session);
}

if (this.queue[normalizedOptions][authority].length === 0) {
delete this.queue[normalizedOptions][authority];
if (this.queue[normalizedOptions][origin].length === 0) {
delete this.queue[normalizedOptions][origin];

if (Object.keys(this.queue[normalizedOptions]).length === 0) {
delete this.queue[normalizedOptions];
break;
}
}
}
}

// It isn't possible for the queue to exceed the stream limit of two free sessions.
// The queue will start immediately if there's at least one free session.
// The queue will be cleared. If not, it will wait for another free session.
};

// The Origin Set cannot shrink. No need to check if it suddenly became "uncovered".
// The Origin Set cannot shrink. No need to check if it suddenly became covered by another one.
session.once('origin', () => {
if (session[kCurrentStreamsCount] >= session.remoteSettings.maxConcurrentStreams) {
return;
Expand All @@ -292,43 +307,22 @@ class Agent extends EventEmitter {
checkQueue();
});

session.once('localSettings', () => {
removeFromQueue();

const movedListeners = listeners.splice(session.remoteSettings.maxConcurrentStreams);

if (movedListeners.length !== 0) {
const freeSessions = getSessions(this.freeSessions, normalizedOptions, normalizedAuthority);

while (freeSessions.length !== 0 && movedListeners.length !== 0) {
movedListeners.shift().resolve(freeSessions[0]);

if (freeSessions[0][kCurrentStreamsCount] >= freeSessions[0].remoteSettings.maxConcurrentStreams) {
freeSessions.shift();
}
}

if (movedListeners.length !== 0) {
this.getSession(authority, options);

// Replace listeners with the new ones
const {listeners} = this.queue[normalizedOptions][normalizedAuthority];
listeners.length = 0;
listeners.push(...movedListeners);
}
}

session.once('remoteSettings', () => {
if (Reflect.has(this.freeSessions, normalizedOptions)) {
this.freeSessions[normalizedOptions].push(session);
} else {
this.freeSessions[normalizedOptions] = [session];
}

for (const listener of listeners) {
listener.resolve(session);
checkQueue();

if (listeners.length !== 0) {
// Requests for a new session with predefined listeners
this.getSession(normalizedAuthority, options, listeners);
}

receivedSettings = true;
removeFromQueue();
});

session[kRequest] = session.request;
Expand Down Expand Up @@ -365,9 +359,6 @@ class Agent extends EventEmitter {
this.freeSessions[normalizedOptions] = [session];
}

// The session cannot be uncovered at this point. To be uncovered,
// the only possible way is to make another session cover this one.

closeCoveredSessions(this.freeSessions, normalizedOptions, session);
closeCoveredSessions(this.busySessions, normalizedOptions, session);
checkQueue();
Expand All @@ -387,7 +378,7 @@ class Agent extends EventEmitter {
listener.reject(error);
}

delete this.queue[normalizedOptions][normalizedAuthority];
removeFromQueue();
}
};

Expand All @@ -399,11 +390,15 @@ class Agent extends EventEmitter {
});
}

async request(authority, options, headers) {
const session = await this.getSession(authority, options);
const stream = session.request(headers);

return stream;
request(authority, options, headers) {
return new Promise((resolve, reject) => {
this.getSession(authority, options, [{
reject,
resolve: session => {
resolve(session.request(headers));
}
}]);
});
}

createConnection(authority, options) {
Expand Down
6 changes: 5 additions & 1 deletion source/client-request.js
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,11 @@ class ClientRequest extends Writable {
options.path = options.socketPath;

this[kOptions] = options;
this[kAuthority] = options.authority || new URL(`https://${options.hostname || options.host}:${options.port}`);
this[kAuthority] = Agent.normalizeAuthority(options, options.servername);

if (!Reflect.has(this[kHeaders], ':authority')) {
this[kHeaders][':authority'] = this[kAuthority].slice(8);
}

if (this.agent && options.preconnect !== false) {
this.agent.getSession(this[kAuthority], options).catch(() => {});
Expand Down
7 changes: 6 additions & 1 deletion source/utils/calculate-server-name.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
'use strict';
/* istanbul ignore file: https://github.com/nodejs/node/blob/d4c91f28148af8a6c1a95392e5c88cb93d4b61c6/lib/_http_agent.js */
const net = require('net');
/* istanbul ignore file: https://github.com/nodejs/node/blob/v12.10.0/lib/_http_agent.js */

module.exports = (options, headers) => {
let servername = options.host;
Expand All @@ -18,5 +19,9 @@ module.exports = (options, headers) => {
}
}

if (net.isIP(servername)) {
return '';
}

return servername;
};
Loading

0 comments on commit e4db251

Please sign in to comment.