Skip to content

Commit

Permalink
refactor, fix bugs
Browse files Browse the repository at this point in the history
  • Loading branch information
szmarczak committed Sep 27, 2019
1 parent 3eb8000 commit 32d160a
Show file tree
Hide file tree
Showing 11 changed files with 572 additions and 264 deletions.
8 changes: 4 additions & 4 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,8 @@
"node": ">=8.16.0"
},
"scripts": {
"test": "echo XO $(xo --version) && echo AVA $(ava --version) && xo && nyc ava",
"coveralls": "nyc report --reporter=text-lcov | coveralls",
"report": "npm test && nyc report --reporter=html"
"test": "echo XO $(xo --version) && echo AVA $(ava --version) && xo && nyc --reporter=html --reporter=text ava",
"coveralls": "nyc report --reporter=text-lcov | coveralls"
},
"files": [
"source"
Expand All @@ -36,12 +35,13 @@
},
"devDependencies": {
"@sindresorhus/is": "^1.0.0",
"ava": "^2.2.0",
"ava": "^2.4.0",
"benchmark": "^2.1.4",
"coveralls": "^3.0.5",
"create-cert": "^1.0.6",
"get-stream": "^5.1.0",
"got": "^9.6.0",
"lolex": "^4.2.0",
"many-keys-map": "^1.0.2",
"nyc": "^14.1.1",
"p-event": "^4.1.0",
Expand Down
203 changes: 107 additions & 96 deletions source/agent.js
Original file line number Diff line number Diff line change
Expand Up @@ -61,10 +61,18 @@ const removeSession = (where, name, session) => {
return false;
};

const addSession = (where, name, session) => {
if (Reflect.has(where, name)) {
where[name].push(session);
} else {
where[name] = [session];
}
};

const getSessions = (where, name, normalizedAuthority) => {
if (Reflect.has(where, name)) {
return where[name].filter(session => {
return session.originSet.includes(normalizedAuthority);
return !session.closed && !session.destroyed && session.originSet.includes(normalizedAuthority);
});
}

Expand Down Expand Up @@ -98,7 +106,7 @@ class Agent extends EventEmitter {

this.timeout = timeout;
this.maxSessions = maxSessions;
this.maxFreeSessions = maxFreeSessions;
this.maxFreeSessions = maxFreeSessions; // TODO: decreasing `maxFreeSessions` should close some sessions

this.settings = {
enablePush: false
Expand All @@ -107,12 +115,12 @@ class Agent extends EventEmitter {
this.tlsSessionCache = new QuickLRU({maxSize: maxCachedTlsSessions});
}

normalizeAuthority(authority) {
static normalizeAuthority(authority, servername) {
if (typeof authority === 'string') {
authority = new URL(authority);
}

const host = authority.hostname || authority.host || 'localhost';
const host = servername || authority.hostname || authority.host || 'localhost';
const port = authority.port || 443;

if (port === 443) {
Expand All @@ -122,12 +130,12 @@ class Agent extends EventEmitter {
return `https://${host}:${port}`;
}

normalizeOptions(options) {
static normalizeOptions(options) {
let normalized = '';

if (options) {
for (const key of nameKeys) {
if (Reflect.has(options, key)) {
if (options[key]) {
normalized += `:${options[key]}`;
}
}
Expand All @@ -151,40 +159,48 @@ class Agent extends EventEmitter {
}
}

async getSession(authority, options) {
getSession(authority, options, listeners) {
return new Promise((resolve, reject) => {
const detached = {resolve, reject};
const normalizedOptions = this.normalizeOptions(options);
const normalizedAuthority = this.normalizeAuthority(authority);
if (Array.isArray(listeners)) {
listeners = [...listeners];

// Resolve ASAP, because we're just moving the listeners.
resolve();
} else {
listeners = [{resolve, reject}];
}

const normalizedOptions = Agent.normalizeOptions(options);
const normalizedAuthority = Agent.normalizeAuthority(authority, options && options.servername);

if (Reflect.has(this.freeSessions, normalizedOptions)) {
const freeSessions = getSessions(this.freeSessions, normalizedOptions, normalizedAuthority);

if (freeSessions.length !== 0) {
resolve(freeSessions.reduce((previousValue, nextValue) => {
if (nextValue[kCurrentStreamsCount] > previousValue[kCurrentStreamsCount]) {
return nextValue;
}
for (const listener of listeners) {
listener.resolve(freeSessions.reduce((previousSession, nextSession) => {
if (nextSession[kCurrentStreamsCount] > previousSession[kCurrentStreamsCount]) {
return nextSession;
}

return previousValue;
}));
return previousSession;
}));
}

return;
}
}

if (Reflect.has(this.queue, normalizedOptions)) {
if (Reflect.has(this.queue[normalizedOptions], normalizedAuthority)) {
this.queue[normalizedOptions][normalizedAuthority].listeners.push(detached);
this.queue[normalizedOptions][normalizedAuthority].listeners.push(...listeners);

return;
}
} else {
this.queue[normalizedOptions] = {};
}

const listeners = [detached];

const removeFromQueue = () => {
// Our entry can be replaced. We cannot remove the new one.
if (Reflect.has(this.queue, normalizedOptions) && this.queue[normalizedOptions][normalizedAuthority] === entry) {
Expand Down Expand Up @@ -212,18 +228,34 @@ class Agent extends EventEmitter {
});
session[kCurrentStreamsCount] = 0;

session.socket.once('session', session => {
this.tlsSessionCache.set(name, {
session,
servername
const freeSession = () => {
const freeSessionsLength = getSessions(this.freeSessions, normalizedOptions, normalizedAuthority).length;

if (freeSessionsLength < this.maxFreeSessions) {
addSession(this.freeSessions, normalizedOptions, session);

return true;
}

return false;
};

const isFree = () => session[kCurrentStreamsCount] < session.remoteSettings.maxConcurrentStreams;

session.socket.once('session', tlsSession => {
setImmediate(() => {
this.tlsSessionCache.set(name, {
session: tlsSession,
servername
});
});
});

// See https://github.com/nodejs/node/issues/28985
session.socket.once('secureConnect', () => {
servername = session.socket.servername;

if (servername === false && typeof tlsSessionCache !== 'undefined') {
if (servername === false && typeof tlsSessionCache !== 'undefined' && typeof tlsSessionCache.servername !== 'undefined') {
session.socket.servername = tlsSessionCache.servername;
}
});
Expand All @@ -239,7 +271,7 @@ class Agent extends EventEmitter {
});

session.setTimeout(this.timeout, () => {
// `.close()` gracefully closes the session. Current streams wouldn't be terminated that way.
// Terminates all streams owend by this session. `session.close()` would gracefully close it instead.
session.destroy();
});

Expand All @@ -253,37 +285,44 @@ class Agent extends EventEmitter {
removeFromQueue();
removeSession(this.freeSessions, normalizedOptions, session);

// TODO: this needs tests (session `close` event emitted before its streams were closed)
// See https://travis-ci.org/szmarczak/http2-wrapper/jobs/587629103#L282
removeSession(this.busySessions, normalizedOptions, session);

// This is needed. A session can be destroyed,
// so `sessionsCount < maxSessions` and there may be callback awaiting already.
this._processQueue(normalizedOptions, normalizedAuthority);
});

const checkQueue = () => {
for (const authority in this.queue[normalizedOptions]) {
if (session.originSet.includes(authority)) {
const {listeners} = this.queue[normalizedOptions][authority];
const movedListeners = listeners.splice(0, session.remoteSettings.maxConcurrentStreams - session[kCurrentStreamsCount]);
if (!Reflect.has(this.queue, normalizedOptions)) {
return;
}

while (movedListeners.length !== 0 && session[kCurrentStreamsCount] < session.remoteSettings.maxConcurrentStreams) {
movedListeners.shift().resolve(session);
for (const origin of session.originSet) {
if (Reflect.has(this.queue[normalizedOptions], origin)) {
const {listeners} = this.queue[normalizedOptions][origin];
while (listeners.length !== 0 && isFree()) {
// We assume `resolve(...)` calls `request(...)` *directly*,
// otherwise the session will get overloaded.
listeners.shift().resolve(session);
}

if (this.queue[normalizedOptions][authority].length === 0) {
delete this.queue[normalizedOptions][authority];
if (this.queue[normalizedOptions][origin].length === 0) {
delete this.queue[normalizedOptions][origin];

if (Object.keys(this.queue[normalizedOptions]).length === 0) {
delete this.queue[normalizedOptions];
break;
}
}
}
}

// It isn't possible for the queue to exceed the stream limit of two free sessions.
// The queue will start immediately if there's at least one free session.
// The queue will be cleared. If not, it will wait for another free session.
};

// The Origin Set cannot shrink. No need to check if it suddenly became "uncovered".
// The Origin Set cannot shrink. No need to check if it suddenly became covered by another one.
session.once('origin', () => {
if (session[kCurrentStreamsCount] >= session.remoteSettings.maxConcurrentStreams) {
if (!isFree()) {
return;
}

Expand All @@ -292,43 +331,24 @@ class Agent extends EventEmitter {
checkQueue();
});

session.once('localSettings', () => {
removeFromQueue();

const movedListeners = listeners.splice(session.remoteSettings.maxConcurrentStreams);

if (movedListeners.length !== 0) {
const freeSessions = getSessions(this.freeSessions, normalizedOptions, normalizedAuthority);

while (freeSessions.length !== 0 && movedListeners.length !== 0) {
movedListeners.shift().resolve(freeSessions[0]);

if (freeSessions[0][kCurrentStreamsCount] >= freeSessions[0].remoteSettings.maxConcurrentStreams) {
freeSessions.shift();
}
}

if (movedListeners.length !== 0) {
this.getSession(authority, options);

// Replace listeners with the new ones
const {listeners} = this.queue[normalizedOptions][normalizedAuthority];
listeners.length = 0;
listeners.push(...movedListeners);
}
}

if (Reflect.has(this.freeSessions, normalizedOptions)) {
this.freeSessions[normalizedOptions].push(session);
session.once('remoteSettings', () => {
if (freeSession()) {
checkQueue();
} else if (this.maxFreeSessions === 0) {
checkQueue();
setImmediate(() => session.close());
} else {
this.freeSessions[normalizedOptions] = [session];
session.close();
}

for (const listener of listeners) {
listener.resolve(session);
if (listeners.length !== 0) {
// Requests for a new session with predefined listeners
this.getSession(normalizedAuthority, options, listeners);
listeners.length = 0;
}

receivedSettings = true;
removeFromQueue();
});

session[kRequest] = session.request;
Expand All @@ -339,35 +359,22 @@ class Agent extends EventEmitter {

session.ref();

if (++session[kCurrentStreamsCount] >= session.remoteSettings.maxConcurrentStreams) {
removeSession(this.freeSessions, normalizedOptions, session);
++session[kCurrentStreamsCount];

if (Reflect.has(this.busySessions, normalizedOptions)) {
this.busySessions[normalizedOptions].push(session);
} else {
this.busySessions[normalizedOptions] = [session];
}
if (!isFree() && removeSession(this.freeSessions, normalizedOptions, session)) {
addSession(this.busySessions, normalizedOptions, session);
}

stream.once('close', () => {
if (--session[kCurrentStreamsCount] < session.remoteSettings.maxConcurrentStreams) {
--session[kCurrentStreamsCount];

if (isFree()) {
if (session[kCurrentStreamsCount] === 0) {
session.unref();
}

if (removeSession(this.busySessions, normalizedOptions, session) && !session.destroyed && !session.closed) {
const freeSessionsLength = getSessions(this.freeSessions, normalizedOptions, normalizedAuthority).length;

if (freeSessionsLength < this.maxFreeSessions) {
if (Reflect.has(this.freeSessions, normalizedOptions)) {
this.freeSessions[normalizedOptions].push(session);
} else {
this.freeSessions[normalizedOptions] = [session];
}

// The session cannot be uncovered at this point. To be uncovered,
// the only possible way is to make another session cover this one.

if (freeSession()) {
closeCoveredSessions(this.freeSessions, normalizedOptions, session);
closeCoveredSessions(this.busySessions, normalizedOptions, session);
checkQueue();
Expand All @@ -387,7 +394,7 @@ class Agent extends EventEmitter {
listener.reject(error);
}

delete this.queue[normalizedOptions][normalizedAuthority];
removeFromQueue();
}
};

Expand All @@ -399,11 +406,15 @@ class Agent extends EventEmitter {
});
}

async request(authority, options, headers) {
const session = await this.getSession(authority, options);
const stream = session.request(headers);

return stream;
request(authority, options, headers) {
return new Promise((resolve, reject) => {
this.getSession(authority, options, [{
reject,
resolve: session => {
resolve(session.request(headers));
}
}]);
});
}

createConnection(authority, options) {
Expand Down
Loading

0 comments on commit 32d160a

Please sign in to comment.