Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(NODE-6454): use timeoutcontext for state machine execute() cursor options #4291

Merged
merged 2 commits into from
Oct 31, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 14 additions & 16 deletions src/client-side-encryption/state_machine.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import {
serialize
} from '../bson';
import { type ProxyOptions } from '../cmap/connection';
import { CursorTimeoutContext } from '../cursor/abstract_cursor';
import { getSocks, type SocksLib } from '../deps';
import { MongoOperationTimeoutError } from '../error';
import { type MongoClient, type MongoClientOptions } from '../mongo_client';
Expand Down Expand Up @@ -519,16 +520,16 @@ export class StateMachine {
): Promise<Uint8Array | null> {
const { db } = MongoDBCollectionNamespace.fromString(ns);

const collections = await client
.db(db)
.listCollections(filter, {
promoteLongs: false,
promoteValues: false,
...(timeoutContext?.csotEnabled()
? { timeoutMS: timeoutContext?.remainingTimeMS, timeoutMode: 'cursorLifetime' }
: {})
})
.toArray();
const cursor = client.db(db).listCollections(filter, {
promoteLongs: false,
promoteValues: false,
timeoutContext: timeoutContext && new CursorTimeoutContext(timeoutContext, Symbol())
});

// There is always exactly zero or one matching documents, so this should always exhaust the cursor
// in a single batch. We call `toArray()` just to be safe and ensure that the cursor is always
// exhausted and closed.
const collections = await cursor.toArray();

const info = collections.length > 0 ? serialize(collections[0]) : null;
return info;
Expand Down Expand Up @@ -582,12 +583,9 @@ export class StateMachine {
return client
.db(dbName)
.collection<DataKey>(collectionName, { readConcern: { level: 'majority' } })
.find(
deserialize(filter),
timeoutContext?.csotEnabled()
? { timeoutMS: timeoutContext?.remainingTimeMS, timeoutMode: 'cursorLifetime' }
: {}
)
.find(deserialize(filter), {
timeoutContext: timeoutContext && new CursorTimeoutContext(timeoutContext, Symbol())
})
.toArray();
}
}
5 changes: 4 additions & 1 deletion src/operations/list_collections.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import type { Binary, Document } from '../bson';
import { CursorResponse } from '../cmap/wire_protocol/responses';
import { type CursorTimeoutMode } from '../cursor/abstract_cursor';
import { type CursorTimeoutContext, type CursorTimeoutMode } from '../cursor/abstract_cursor';
import type { Db } from '../db';
import type { Server } from '../sdam/server';
import type { ClientSession } from '../sessions';
Expand All @@ -19,6 +19,9 @@ export interface ListCollectionsOptions extends Omit<CommandOperationOptions, 'w
batchSize?: number;
/** @internal */
timeoutMode?: CursorTimeoutMode;

/** @internal */
timeoutContext?: CursorTimeoutContext;
}

/** @internal */
Expand Down
68 changes: 67 additions & 1 deletion test/integration/client-side-encryption/driver.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { type Binary, EJSON, UUID } from 'bson';
import { expect } from 'chai';
import * as crypto from 'crypto';
import * as sinon from 'sinon';
import { setTimeout } from 'timers/promises';

// eslint-disable-next-line @typescript-eslint/no-restricted-imports
import { ClientEncryption } from '../../../src/client-side-encryption/client_encryption';
Expand All @@ -13,7 +14,9 @@ import {
CSOTTimeoutContext,
type MongoClient,
MongoOperationTimeoutError,
StateMachine
resolveTimeoutOptions,
StateMachine,
TimeoutContext
} from '../../mongodb';
import {
clearFailPoint,
Expand All @@ -23,6 +26,7 @@ import {
measureDuration,
sleep
} from '../../tools/utils';
import { filterForCommands } from '../shared';

const metadata: MongoDBMetadataUI = {
requires: {
Expand Down Expand Up @@ -948,6 +952,68 @@ describe('CSOT', function () {
}
);

context('when the cursor times out and a killCursors is executed', function () {
let client: MongoClient;
let commands: (CommandStartedEvent & { command: { maxTimeMS?: number } })[] = [];

beforeEach(async function () {
client = this.configuration.newClient({}, { monitorCommands: true });
commands = [];
client.on('commandStarted', filterForCommands('killCursors', commands));

await client.connect();
const docs = Array.from({ length: 1200 }, (_, i) => ({ i }));

await client.db('test').collection('test').insertMany(docs);

await configureFailPoint(this.configuration, {
configureFailPoint: 'failCommand',
mode: 'alwaysOn',
data: {
failCommands: ['getMore'],
blockConnection: true,
blockTimeMS: 2000
}
});
});

afterEach(async function () {
await clearFailPoint(this.configuration);
await client.close();
});

it(
'refreshes timeoutMS to the full timeout',
{
requires: {
...metadata.requires,
topology: '!load-balanced'
}
},
async function () {
const timeoutContext = TimeoutContext.create(
resolveTimeoutOptions(client, { timeoutMS: 1900 })
);

await setTimeout(1500);

const { result: error } = await measureDuration(() =>
stateMachine
.fetchKeys(client, 'test.test', BSON.serialize({}), timeoutContext)
.catch(e => e)
);
expect(error).to.be.instanceOf(MongoOperationTimeoutError);

const [
{
command: { maxTimeMS }
}
] = commands;
expect(maxTimeMS).to.be.greaterThan(1800);
}
);
});

context('when csot is not enabled and fetchKeys() is delayed', function () {
let encryptedClient;

Expand Down
49 changes: 30 additions & 19 deletions test/integration/crud/client_bulk_write.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@ import {
clearFailPoint,
configureFailPoint,
makeMultiBatchWrite,
makeMultiResponseBatchModelArray
makeMultiResponseBatchModelArray,
mergeTestMetadata
} from '../../tools/utils';
import { filterForCommands } from '../shared';

Expand Down Expand Up @@ -268,7 +269,7 @@ describe('Client Bulk Write', function () {

beforeEach(async function () {
client = this.configuration.newClient({}, { monitorCommands: true, minPoolSize: 5 });
client.on('commandStarted', filterForCommands(['getMore'], commands));
client.on('commandStarted', filterForCommands(['getMore', 'killCursors'], commands));
await client.connect();

await configureFailPoint(this.configuration, {
Expand All @@ -278,25 +279,35 @@ describe('Client Bulk Write', function () {
});
});

it('the bulk write operation times out', metadata, async function () {
const models = await makeMultiResponseBatchModelArray(this.configuration);
const start = now();
const timeoutError = await client
.bulkWrite(models, {
verboseResults: true,
timeoutMS: 1500
})
.catch(e => e);
it(
'the bulk write operation times out',
mergeTestMetadata(metadata, {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should fix some of the flakiness we see in CI.

requires: {
// this test has timing logic that depends on killCursors being executed, which does
// not happen in load balanced mode
topology: '!load-balanced'
}
}),
async function () {
const models = await makeMultiResponseBatchModelArray(this.configuration);
const start = now();
const timeoutError = await client
.bulkWrite(models, {
verboseResults: true,
timeoutMS: 1500
})
.catch(e => e);

const end = now();
expect(timeoutError).to.be.instanceOf(MongoOperationTimeoutError);
const end = now();
expect(timeoutError).to.be.instanceOf(MongoOperationTimeoutError);

// DRIVERS-3005 - killCursors causes cursor cleanup to extend past timeoutMS.
// The amount of time killCursors takes is wildly variable and can take up to almost
// 600-700ms sometimes.
expect(end - start).to.be.within(1500, 1500 + 800);
expect(commands).to.have.lengthOf(1);
});
// DRIVERS-3005 - killCursors causes cursor cleanup to extend past timeoutMS.
// The amount of time killCursors takes is wildly variable and can take up to almost
// 600-700ms sometimes.
expect(end - start).to.be.within(1500, 1500 + 800);
expect(commands.map(({ commandName }) => commandName)).to.have.lengthOf(2);
}
);
});

describe('if the cursor encounters an error and a killCursors is sent', function () {
Expand Down
16 changes: 16 additions & 0 deletions test/tools/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -689,3 +689,19 @@ export async function measureDuration<T>(f: () => Promise<T>): Promise<{
result
};
}

export function mergeTestMetadata(
metadata: MongoDBMetadataUI,
newMetadata: MongoDBMetadataUI
): MongoDBMetadataUI {
return {
requires: {
...metadata.requires,
...newMetadata.requires
},
sessions: {
...metadata.sessions,
...newMetadata.sessions
}
};
aditi-khare-mongoDB marked this conversation as resolved.
Show resolved Hide resolved
}
37 changes: 22 additions & 15 deletions test/unit/client-side-encryption/state_machine.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ import {
BSON,
Collection,
CSOTTimeoutContext,
CursorTimeoutContext,
type FindOptions,
Int32,
Long,
MongoClient,
Expand Down Expand Up @@ -484,26 +486,29 @@ describe('StateMachine', function () {
});

context('when StateMachine.fetchKeys() is passed a `CSOTimeoutContext`', function () {
it('collection.find runs with its timeoutMS property set to remainingTimeMS', async function () {
const timeoutContext = new CSOTTimeoutContext({
it('collection.find uses the provided timeout context', async function () {
const context = new CSOTTimeoutContext({
timeoutMS: 500,
serverSelectionTimeoutMS: 30000
});
await sleep(300);

await stateMachine
.fetchKeys(client, 'keyVault', BSON.serialize({ a: 1 }), timeoutContext)
.fetchKeys(client, 'keyVault', BSON.serialize({ a: 1 }), context)
.catch(e => squashError(e));
expect(findSpy.getCalls()[0].args[1].timeoutMS).to.not.be.undefined;
expect(findSpy.getCalls()[0].args[1].timeoutMS).to.be.lessThanOrEqual(205);

const { timeoutContext } = findSpy.getCalls()[0].args[1] as FindOptions;
expect(timeoutContext).to.be.instanceOf(CursorTimeoutContext);
expect(timeoutContext.timeoutContext).to.equal(context);
});
});

context('when StateMachine.fetchKeys() is not passed a `CSOTimeoutContext`', function () {
it('collection.find runs with an undefined timeoutMS property', async function () {
it('a timeoutContext is not provided to the find cursor', async function () {
await stateMachine
.fetchKeys(client, 'keyVault', BSON.serialize({ a: 1 }))
.catch(e => squashError(e));
expect(findSpy.getCalls()[0].args[1].timeoutMS).to.be.undefined;
const { timeoutContext } = findSpy.getCalls()[0].args[1] as FindOptions;
expect(timeoutContext).to.be.undefined;
});
});
});
Expand Down Expand Up @@ -564,29 +569,31 @@ describe('StateMachine', function () {
context(
'when StateMachine.fetchCollectionInfo() is passed a `CSOTimeoutContext`',
function () {
it('listCollections runs with its timeoutMS property set to remainingTimeMS', async function () {
const timeoutContext = new CSOTTimeoutContext({
it('listCollections uses the provided timeoutContext', async function () {
const context = new CSOTTimeoutContext({
timeoutMS: 500,
serverSelectionTimeoutMS: 30000
});
await sleep(300);
await stateMachine
.fetchCollectionInfo(client, 'keyVault', BSON.serialize({ a: 1 }), timeoutContext)
.fetchCollectionInfo(client, 'keyVault', BSON.serialize({ a: 1 }), context)
.catch(e => squashError(e));
expect(listCollectionsSpy.getCalls()[0].args[1].timeoutMS).to.not.be.undefined;
expect(listCollectionsSpy.getCalls()[0].args[1].timeoutMS).to.be.lessThanOrEqual(205);
const [_filter, { timeoutContext }] = listCollectionsSpy.getCalls()[0].args;
expect(timeoutContext).to.exist;
expect(timeoutContext.timeoutContext).to.equal(context);
});
}
);

context(
'when StateMachine.fetchCollectionInfo() is not passed a `CSOTimeoutContext`',
function () {
it('listCollections runs with an undefined timeoutMS property', async function () {
it('no timeoutContext is provided to listCollections', async function () {
await stateMachine
.fetchCollectionInfo(client, 'keyVault', BSON.serialize({ a: 1 }))
.catch(e => squashError(e));
expect(listCollectionsSpy.getCalls()[0].args[1].timeoutMS).to.be.undefined;
const [_filter, { timeoutContext }] = listCollectionsSpy.getCalls()[0].args;
expect(timeoutContext).not.to.exist;
});
}
);
Expand Down