Skip to content

Commit

Permalink
quic: implement QuicSocket Promise API, part 2
Browse files Browse the repository at this point in the history
PR-URL: #34283
Reviewed-By: Anna Henningsen <anna@addaleax.net>
  • Loading branch information
jasnell authored and cjihrig committed Jul 22, 2020
1 parent 766320b commit b471188
Show file tree
Hide file tree
Showing 4 changed files with 82 additions and 101 deletions.
7 changes: 4 additions & 3 deletions doc/api/quic.md
Original file line number Diff line number Diff line change
Expand Up @@ -1584,16 +1584,17 @@ with this `QuicSocket`.

Read-only.

#### quicsocket.close(\[callback\])
#### quicsocket.close()
<!-- YAML
added: REPLACEME
-->

* `callback` {Function}
* Returns: {Promise}

Gracefully closes the `QuicSocket`. Existing `QuicSession` instances will be
permitted to close naturally. New `QuicClientSession` and `QuicServerSession`
instances will not be allowed.
instances will not be allowed. The returns `Promise` will be resolved once
the `QuicSocket` is destroyed.

#### quicsocket.connect(\[options\])
<!-- YAML
Expand Down
151 changes: 64 additions & 87 deletions lib/internal/quic/core.js
Original file line number Diff line number Diff line change
Expand Up @@ -252,8 +252,7 @@ const kRejections = Symbol.for('nodejs.rejection');
const kSocketUnbound = 0;
const kSocketPending = 1;
const kSocketBound = 2;
const kSocketClosing = 3;
const kSocketDestroyed = 4;
const kSocketDestroyed = 3;

let diagnosticPacketLossWarned = false;
let warnedVerifyHostnameIdentity = false;
Expand Down Expand Up @@ -939,6 +938,9 @@ class QuicSocket extends EventEmitter {
alpn: undefined,
bindPromise: undefined,
client: undefined,
closePromise: undefined,
closePromiseResolve: undefined,
closePromiseReject: undefined,
defaultEncoding: undefined,
endpoints: new Set(),
highWaterMark: undefined,
Expand Down Expand Up @@ -1089,8 +1091,10 @@ class QuicSocket extends EventEmitter {
}

[kRemoveSession](session) {
this[kInternalState].sessions.delete(session);
this[kMaybeDestroy]();
const state = this[kInternalState];
state.sessions.delete(session);
if (this.closing && state.sessions.size === 0)
this.destroy();
}

[kMaybeBind](options) {
Expand Down Expand Up @@ -1191,37 +1195,6 @@ class QuicSocket extends EventEmitter {
});
}

[kEndpointClose](endpoint, error) {
const state = this[kInternalState];
state.endpoints.delete(endpoint);
process.nextTick(() => {
try {
this.emit('endpointClose', endpoint, error);
} catch (error) {
this.destroy(error);
}
});

// If there aren't any more endpoints, the QuicSession
// is no longer usable and needs to be destroyed.
if (state.endpoints.size === 0) {
if (!this.destroyed)
return this.destroy(error);
this[kDestroy](error);
}
}

// kMaybeDestroy is called one or more times after the close() method
// is called. The QuicSocket will be destroyed if there are no remaining
// open sessions.
[kMaybeDestroy]() {
if (this.closing && this[kInternalState].sessions.size === 0) {
this.destroy();
return true;
}
return false;
}

// Called by the C++ internals to notify when server busy status is toggled.
[kServerBusy]() {
const busy = this.serverBusy;
Expand Down Expand Up @@ -1419,6 +1392,26 @@ class QuicSocket extends EventEmitter {
return session;
}

[kEndpointClose](endpoint, error) {
const state = this[kInternalState];
state.endpoints.delete(endpoint);
process.nextTick(() => {
try {
this.emit('endpointClose', endpoint, error);
} catch (error) {
this.destroy(error);
}
});

// If there aren't any more endpoints, the QuicSession
// is no longer usable and needs to be destroyed.
if (state.endpoints.size === 0) {
if (!this.destroyed)
return this.destroy(error);
this[kDestroy](error);
}
}

// Initiate a Graceful Close of the QuicSocket.
// Existing QuicClientSession and QuicServerSession instances will be
// permitted to close naturally and gracefully on their own.
Expand All @@ -1427,80 +1420,57 @@ class QuicSocket extends EventEmitter {
// QuicClientSession or QuicServerSession instances, the QuicSocket
// will be immediately closed.
//
// If specified, the callback will be registered for once('close').
// Returns a Promise that will be resolved once the QuicSocket is
// destroyed.
//
// No additional QuicServerSession instances will be accepted from
// remote peers, and calls to connect() to create QuicClientSession
// instances will fail. The QuicSocket will be otherwise usable in
// every other way.
//
// Subsequent calls to close(callback) will register the close callback
// if one is defined but will otherwise do nothing.
//
// Once initiated, a graceful close cannot be canceled. The graceful
// close can be interupted, however, by abruptly destroying the
// QuicSocket using the destroy() method.
//
// If close() is called before the QuicSocket has been bound (before
// either connect() or listen() have been called, or the QuicSocket
// is still in the pending state, the callback is registered for the
// once('close') event (if specified) and the QuicSocket is destroyed
// is still in the pending state, the QuicSocket is destroyed
// immediately.
close(callback) {
const state = this[kInternalState];
if (this.destroyed)
throw new ERR_INVALID_STATE('QuicSocket is already destroyed');

// If a callback function is specified, it is registered as a
// handler for the once('close') event. If the close occurs
// immediately, the close event will be emitted as soon as the
// process.nextTick queue is processed. Otherwise, the close
// event will occur at some unspecified time in the near future.
if (callback) {
if (typeof callback !== 'function')
throw new ERR_INVALID_CALLBACK();
this.once('close', callback);
}

// If we are already closing, do nothing else and wait
// for the close event to be invoked.
if (state.state === kSocketClosing)
return;
close() {
return this[kInternalState].closePromise || this[kClose]();
}

// If the QuicSocket is otherwise not bound to the local
// port, destroy the QuicSocket immediately.
if (state.state !== kSocketBound) {
this.destroy();
[kClose]() {
if (this.destroyed) {
return Promise.reject(
new ERR_INVALID_STATE('QuicSocket is already destroyed'));
}

// Mark the QuicSocket as closing to prevent re-entry
state.state = kSocketClosing;

// Otherwise, gracefully close each QuicSession, with
// [kMaybeDestroy]() being called after each closes.
const maybeDestroy = this[kMaybeDestroy].bind(this);
const state = this[kInternalState];
const promise = deferredClosePromise(state);

// Tell the underlying QuicSocket C++ object to stop
// listening for new QuicServerSession connections.
// New initial connection packets for currently unknown
// DCID's will be ignored.
if (this[kHandle])
this[kInternalState].sharedState.serverListening = false;
state.sharedState.serverListening = false;

// If there are no sessions, calling maybeDestroy
// will immediately and synchronously destroy the
// QuicSocket.
if (maybeDestroy())
return;
// If the QuicSocket is otherwise not bound to the local
// port, or there are not active sessions, destroy the
// QuicSocket immediately and we're done.
if (state.state !== kSocketBound || state.sessions.size === 0) {
this.destroy();
return promise;
}

// If we got this far, there a QuicClientSession and
// QuicServerSession instances still, we need to trigger
// a graceful close for each of them. As each closes,
// they will call the kMaybeDestroy function. When there
// are no remaining session instances, the QuicSocket
// will be closed and destroyed.
// Otherwise, loop through each of the known sessions
// and close them.
// TODO(@jasnell): These will be promises soon, but we
// do not want to await them.
for (const session of state.sessions)
session.close(maybeDestroy);
session.close();

return promise;
}

// Initiate an abrupt close and destruction of the QuicSocket.
Expand Down Expand Up @@ -1546,7 +1516,14 @@ class QuicSocket extends EventEmitter {
}

[kDestroy](error) {
if (error) process.nextTick(emit.bind(this, 'error', error));
const state = this[kInternalState];
if (error) {
if (typeof state.closePromiseReject === 'function')
state.closePromiseReject(error);
process.nextTick(emit.bind(this, 'error', error));
} else if (typeof state.closePromiseResolve === 'function') {
state.closePromiseResolve();
}
process.nextTick(emit.bind(this, 'close'));
}

Expand Down Expand Up @@ -1587,7 +1564,7 @@ class QuicSocket extends EventEmitter {

// True if graceful close has been initiated by calling close()
get closing() {
return this[kInternalState].state === kSocketClosing;
return this[kInternalState].closePromise !== undefined;
}

// True if the QuicSocket has been destroyed and is no longer usable
Expand Down
12 changes: 7 additions & 5 deletions test/parallel/test-quic-quicsession-server-destroy-early.js
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,17 @@ const server = createQuicSocket({ server: options });

(async function() {
server.on('session', common.mustCall((session) => {
session.on('close', common.mustCall(() => {
client.close();
server.close();
assert.throws(() => server.close(), {
session.on('stream', common.mustNotCall());
session.on('close', common.mustCall(async () => {
await Promise.all([
client.close(),
server.close()
]);
assert.rejects(server.close(), {
code: 'ERR_INVALID_STATE',
name: 'Error'
});
}));
session.on('stream', common.mustNotCall());
session.destroy();
}));

Expand Down
13 changes: 7 additions & 6 deletions test/parallel/test-quic-quicsocket-close.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,12 @@ if (!common.hasQuic)
const assert = require('assert');
const { createQuicSocket } = require('net');

{
const socket = createQuicSocket();
socket.close(common.mustCall());
socket.on('close', common.mustCall());
assert.throws(() => socket.close(), {
const socket = createQuicSocket();
socket.on('close', common.mustCall());

(async function() {
await socket.close();
assert.rejects(() => socket.close(), {
code: 'ERR_INVALID_STATE'
});
}
})().then(common.mustCall());

0 comments on commit b471188

Please sign in to comment.