Skip to content

Commit

Permalink
Merge pull request #75 from MatrixAI/feature-unforced-end
Browse files Browse the repository at this point in the history
fix ending `QUICConnection` with `force: false`
  • Loading branch information
tegefaulkes authored Nov 1, 2023
2 parents c06bf7f + e64ed4a commit 5abf687
Show file tree
Hide file tree
Showing 6 changed files with 452 additions and 23 deletions.
48 changes: 48 additions & 0 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

67 changes: 52 additions & 15 deletions src/QUICConnection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import { buildQuicheConfig, minIdleTimeout } from './config';
import QUICConnectionId from './QUICConnectionId';
import QUICStream from './QUICStream';
import { quiche, ConnectionErrorCode } from './native';
import { Shutdown } from './native/types';
import * as utils from './utils';
import * as events from './events';
import * as errors from './errors';
Expand Down Expand Up @@ -616,7 +617,27 @@ class QUICConnection {
} = {}) {
this.logger.info(`Stop ${this.constructor.name}`);
this.stopKeepAliveIntervalTimer();
// Closing the connection first to avoid accepting new streams

// Yield to allow any background processing to settle before proceeding.
// This will allow any streams to process buffers before continuing
await utils.yieldMicro();

// Destroy all streams
const streamsDestroyP: Array<Promise<void>> = [];
for (const quicStream of this.streamMap.values()) {
// The reason is only used if `force` is `true`
// If `force` is not true, this will gracefully wait for
// both readable and writable to gracefully close
streamsDestroyP.push(
quicStream.destroy({
reason: this.errorLast,
force: force || this.conn.isDraining() || this.conn.isClosed(),
}),
);
}
await Promise.all(streamsDestroyP);

// Close after processing all streams
if (!this.conn.isDraining() && !this.conn.isClosed()) {
// If `this.conn.close` is already called, the connection will be draining,
// in that case we just skip doing this local close.
Expand All @@ -632,20 +653,7 @@ class QUICConnection {
});
this.dispatchEvent(new events.EventQUICConnectionError({ detail: e }));
}
// Destroy all streams
const streamsDestroyP: Array<Promise<void>> = [];
for (const quicStream of this.streamMap.values()) {
// The reason is only used if `force` is `true`
// If `force` is not true, this will gracefully wait for
// both readable and writable to gracefully close
streamsDestroyP.push(
quicStream.destroy({
reason: this.errorLast,
force: force || this.conn.isDraining() || this.conn.isClosed(),
}),
);
}
await Promise.all(streamsDestroyP);

// Waiting for `closedP` to resolve
// Only the `this.connTimeoutTimer` will resolve this promise
await this.closedP;
Expand Down Expand Up @@ -940,6 +948,13 @@ class QUICConnection {
for (const streamId of this.conn.readable() as Iterable<StreamId>) {
let quicStream = this.streamMap.get(streamId);
if (quicStream == null) {
if (this[running] === false || this[status] === 'stopping') {
// We should reject new connections when stopping
this.conn.streamShutdown(streamId, Shutdown.Write, 1);
this.conn.streamShutdown(streamId, Shutdown.Read, 1);
continue;
}

quicStream = QUICStream.createQUICStream({
initiated: 'peer',
streamId,
Expand Down Expand Up @@ -969,6 +984,13 @@ class QUICConnection {
for (const streamId of this.conn.writable() as Iterable<StreamId>) {
let quicStream = this.streamMap.get(streamId);
if (quicStream == null) {
if (this[running] === false || this[status] === 'stopping') {
// We should reject new connections when stopping
this.conn.streamShutdown(streamId, Shutdown.Write, 1);
this.conn.streamShutdown(streamId, Shutdown.Read, 1);
continue;
}

quicStream = QUICStream.createQUICStream({
initiated: 'peer',
streamId,
Expand Down Expand Up @@ -1132,6 +1154,21 @@ class QUICConnection {
return quicStream;
}

/**
* Destroys all active streams without closing the connection.
*
* If there are no active streams then it will do nothing.
* If the connection is stopped with `force: false` then this can be used
* to force close any streams `stop` is waiting for to end.
*
* Destruction will occur in the background.
*/
public destroyStreams(reason?: any) {
for (const quicStream of this.streamMap.values()) {
quicStream.cancel(reason);
}
}

/**
* Starts the keep alive interval timer.
*
Expand Down
8 changes: 8 additions & 0 deletions src/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,13 @@ import * as errors from './errors';
const textEncoder = new TextEncoder();
const textDecoder = new TextDecoder('utf-8');

/**
* Used to yield to the event loop to allow other micro tasks to process
*/
async function yieldMicro(): Promise<void> {
return await new Promise<void>((r) => queueMicrotask(r));
}

/**
* Convert callback-style to promise-style
* If this is applied to overloaded function
Expand Down Expand Up @@ -550,6 +557,7 @@ function isStreamReset(e: Error): number | false {
export {
textEncoder,
textDecoder,
yieldMicro,
promisify,
promise,
bufferWrap,
Expand Down
2 changes: 2 additions & 0 deletions tests/QUICClient.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1775,6 +1775,7 @@ describe(QUICClient.name, () => {

// Handling client error event
const clientErrorProm = promise<never>();
void clientErrorProm.p.catch(() => {}); // Ignore unhandled rejection
client.addEventListener(
events.EventQUICClientError.name,
(evt: events.EventQUICClientError) => clientErrorProm.rejectP(evt.detail),
Expand All @@ -1783,6 +1784,7 @@ describe(QUICClient.name, () => {

// Handling client destroy event
const clientDestroyedProm = promise<void>();
void clientDestroyedProm.p.catch(() => {}); // Ignore unhandled rejection
client.addEventListener(
events.EventQUICClientDestroyed.name,
() => clientDestroyedProm.resolveP(),
Expand Down
Loading

0 comments on commit 5abf687

Please sign in to comment.