Skip to content

Commit

Permalink
fix(NODE-4834): ensure that MessageStream is destroyed when connectio…
Browse files Browse the repository at this point in the history
…ns are destroyed (#3482)
  • Loading branch information
W-A-James authored Dec 16, 2022
1 parent 9f945c4 commit 8338bae
Show file tree
Hide file tree
Showing 13 changed files with 278 additions and 127 deletions.
2 changes: 1 addition & 1 deletion src/cmap/connect.ts
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ function performInitialHandshake(
) {
const callback: Callback<Document> = function (err, ret) {
if (err && conn) {
conn.destroy();
conn.destroy({ force: false });
}
_callback(err, ret);
};
Expand Down
53 changes: 16 additions & 37 deletions src/cmap/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ export interface ConnectionOptions
/** @public */
export interface DestroyOptions {
/** Force the destruction. */
force?: boolean;
force: boolean;
}

/** @public */
Expand All @@ -170,8 +170,8 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
address: string;
socketTimeoutMS: number;
monitorCommands: boolean;
/** Indicates that the connection (including underlying TCP socket) has been closed. */
closed: boolean;
destroyed: boolean;
lastHelloMS?: number;
serverApi?: ServerApi;
helloOk?: boolean;
Expand Down Expand Up @@ -220,7 +220,6 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
this.monitorCommands = options.monitorCommands;
this.serverApi = options.serverApi;
this.closed = false;
this.destroyed = false;
this[kHello] = null;
this[kClusterTime] = null;

Expand Down Expand Up @@ -313,10 +312,7 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
if (this.closed) {
return;
}

this[kStream].destroy(error);

this.closed = true;
this.destroy({ force: false });

for (const op of this[kQueue].values()) {
op.cb(error);
Expand All @@ -330,8 +326,7 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
if (this.closed) {
return;
}

this.closed = true;
this.destroy({ force: false });

const message = `connection ${this.id} to ${this.address} closed`;
for (const op of this[kQueue].values()) {
Expand All @@ -348,9 +343,7 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
}

this[kDelayedTimeoutId] = setTimeout(() => {
this[kStream].destroy();

this.closed = true;
this.destroy({ force: false });

const message = `connection ${this.id} to ${this.address} timed out`;
const beforeHandshake = this.hello == null;
Expand Down Expand Up @@ -459,41 +452,27 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
callback(undefined, message.documents[0]);
}

destroy(options?: DestroyOptions, callback?: Callback): void {
if (typeof options === 'function') {
callback = options;
options = { force: false };
}

destroy(options: DestroyOptions, callback?: Callback): void {
this.removeAllListeners(Connection.PINNED);
this.removeAllListeners(Connection.UNPINNED);

options = Object.assign({ force: false }, options);
if (this[kStream] == null || this.destroyed) {
this.destroyed = true;
if (typeof callback === 'function') {
callback();
}

return;
}
this[kMessageStream].destroy();
this.closed = true;

if (options.force) {
this[kStream].destroy();
this.destroyed = true;
if (typeof callback === 'function') {
callback();
if (callback) {
return process.nextTick(callback);
}

return;
}

this[kStream].end(() => {
this.destroyed = true;
if (typeof callback === 'function') {
callback();
if (!this[kStream].writableEnded) {
this[kStream].end(callback);
} else {
if (callback) {
return process.nextTick(callback);
}
});
}
}

command(
Expand Down
4 changes: 2 additions & 2 deletions src/cmap/connection_pool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -515,7 +515,7 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
ConnectionPool.CONNECTION_CLOSED,
new ConnectionClosedEvent(this, conn, 'poolClosed')
);
conn.destroy(options, cb);
conn.destroy({ force: !!options.force }, cb);
},
err => {
this[kConnections].clear();
Expand Down Expand Up @@ -591,7 +591,7 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
new ConnectionClosedEvent(this, connection, reason)
);
// destroy the connection
process.nextTick(() => connection.destroy());
process.nextTick(() => connection.destroy({ force: false }));
}

private connectionIsStale(connection: Connection) {
Expand Down
5 changes: 4 additions & 1 deletion src/sdam/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,10 @@ export class Server extends TypedEventEmitter<ServerEvents> {

/** Destroy the server connection */
destroy(options?: DestroyOptions, callback?: Callback): void {
if (typeof options === 'function') (callback = options), (options = {});
if (typeof options === 'function') {
callback = options;
options = { force: false };
}
options = Object.assign({}, { force: false }, options);

if (this.s.state === STATE_CLOSED) {
Expand Down
17 changes: 4 additions & 13 deletions src/sdam/topology.ts
Original file line number Diff line number Diff line change
Expand Up @@ -484,26 +484,17 @@ export class Topology extends TypedEventEmitter<TopologyEvents> {
}

/** Close this topology */
close(callback: Callback): void;
close(options: CloseOptions): void;
close(options: CloseOptions, callback: Callback): void;
close(options?: CloseOptions | Callback, callback?: Callback): void {
if (typeof options === 'function') {
callback = options;
options = {};
}

if (typeof options === 'boolean') {
options = { force: options };
}
options = options ?? {};
close(options?: CloseOptions, callback?: Callback): void {
options = options ?? { force: false };

if (this.s.state === STATE_CLOSED || this.s.state === STATE_CLOSING) {
return callback?.();
}

const destroyedServers = Array.from(this.s.servers.values(), server => {
return promisify(destroyServer)(server, this, options as CloseOptions);
return promisify(destroyServer)(server, this, { force: !!options?.force });
});

Promise.all(destroyedServers)
Expand Down Expand Up @@ -765,7 +756,7 @@ function destroyServer(
options?: DestroyOptions,
callback?: Callback
) {
options = options ?? {};
options = options ?? { force: false };
for (const event of LOCAL_SERVER_EVENTS) {
server.removeAllListeners(event);
}
Expand Down
69 changes: 20 additions & 49 deletions test/integration/crud/misc_cursors.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ const { ReadPreference } = require('../../../src/read_preference');
const { ServerType } = require('../../../src/sdam/common');
const { formatSort } = require('../../../src/sort');
const { getSymbolFrom } = require('../../tools/utils');
const { MongoExpiredSessionError } = require('../../../src/error');

describe('Cursor', function () {
before(function () {
Expand Down Expand Up @@ -1905,61 +1906,31 @@ describe('Cursor', function () {
}
});

it('should close dead tailable cursors', {
metadata: {
os: '!win32' // NODE-2943: timeout on windows
},

test: function (done) {
// http://www.mongodb.org/display/DOCS/Tailable+Cursors

const configuration = this.configuration;
client.connect((err, client) => {
expect(err).to.not.exist;
this.defer(() => client.close());

const db = client.db(configuration.db);
const options = { capped: true, size: 10000000 };
db.createCollection(
'test_if_dead_tailable_cursors_close',
options,
function (err, collection) {
expect(err).to.not.exist;
it('closes cursors when client is closed even if it has not been exhausted', async function () {
await client
.db()
.dropCollection('test_cleanup_tailable')
.catch(() => null);

let closeCount = 0;
const docs = Array.from({ length: 100 }).map(() => ({ a: 1 }));
collection.insertMany(docs, { w: 'majority', wtimeoutMS: 5000 }, err => {
expect(err).to.not.exist;

const cursor = collection.find({}, { tailable: true, awaitData: true });
const stream = cursor.stream();
const collection = await client
.db()
.createCollection('test_cleanup_tailable', { capped: true, size: 1000, max: 3 });

stream.resume();

var validator = () => {
closeCount++;
if (closeCount === 2) {
done();
}
};
// insert only 2 docs in capped coll of 3
await collection.insertMany([{ a: 1 }, { a: 1 }]);

// we validate that the stream "ends" either cleanly or with an error
stream.on('end', validator);
stream.on('error', validator);
const cursor = collection.find({}, { tailable: true, awaitData: true, maxAwaitTimeMS: 2000 });

cursor.on('close', validator);
await cursor.next();
await cursor.next();
// will block for maxAwaitTimeMS (except we are closing the client)
const rejectedEarlyBecauseClientClosed = cursor.next().catch(error => error);

const docs = Array.from({ length: 100 }).map(() => ({ a: 1 }));
collection.insertMany(docs, err => {
expect(err).to.not.exist;
await client.close();
expect(cursor).to.have.property('killed', true);

setTimeout(() => client.close());
});
});
}
);
});
}
const error = await rejectedEarlyBecauseClientClosed;
expect(error).to.be.instanceOf(MongoExpiredSessionError);
});

it('shouldAwaitData', {
Expand Down
18 changes: 13 additions & 5 deletions test/integration/node-specific/topology.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,20 @@ describe('Topology', function () {
const states = [];
topology.on('stateChanged', (_, newState) => states.push(newState));
topology.connect(err => {
expect(err).to.not.exist;
topology.close(err => {
try {
expect(err).to.not.exist;
expect(topology.isDestroyed()).to.be.true;
expect(states).to.eql(['connecting', 'connected', 'closing', 'closed']);
done();
} catch (error) {
done(error);
}
topology.close({}, err => {
try {
expect(err).to.not.exist;
expect(topology.isDestroyed()).to.be.true;
expect(states).to.eql(['connecting', 'connected', 'closing', 'closed']);
done();
} catch (error) {
done(error);
}
});
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ describe('Polling Srv Records for Mongos Discovery', () => {

afterEach(function (done) {
if (context.topology) {
context.topology.close(done);
context.topology.close({}, done);
} else {
done();
}
Expand Down
2 changes: 1 addition & 1 deletion test/unit/assorted/server_selection_spec_helper.js
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ function executeServerSelectionTest(testDefinition, testDone) {
});

function done(err) {
topology.close(e => testDone(e || err));
topology.close({}, e => testDone(e || err));
}

topology.connect(err => {
Expand Down
Loading

0 comments on commit 8338bae

Please sign in to comment.