Skip to content

Commit

Permalink
feat(NODE-6304): add CSOT support for non-tailable cursors (#4195)
Browse files Browse the repository at this point in the history
  • Loading branch information
W-A-James authored and nbbeeken committed Nov 1, 2024
1 parent da6f906 commit b931e4f
Show file tree
Hide file tree
Showing 45 changed files with 2,000 additions and 228 deletions.
4 changes: 2 additions & 2 deletions src/cmap/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ export interface CommandOptions extends BSONSerializeOptions {
documentsReturnedIn?: string;
noResponse?: boolean;
omitReadPreference?: boolean;
omitMaxTimeMS?: boolean;

// TODO(NODE-2802): Currently the CommandOptions take a property willRetryWrite which is a hint
// from executeOperation that the txnNum should be applied to this command.
Expand Down Expand Up @@ -426,7 +427,7 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
...options
};

if (options.timeoutContext?.csotEnabled()) {
if (!options.omitMaxTimeMS && options.timeoutContext?.csotEnabled()) {
const { maxTimeMS } = options.timeoutContext;
if (maxTimeMS > 0 && Number.isFinite(maxTimeMS)) cmd.maxTimeMS = maxTimeMS;
}
Expand Down Expand Up @@ -626,7 +627,6 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
for await (const document of this.sendCommand(ns, command, options, responseType)) {
if (options.timeoutContext?.csotEnabled()) {
if (MongoDBResponse.is(document)) {
// TODO(NODE-5684): test coverage to be added once cursors are enabling CSOT
if (document.isMaxTimeExpiredError) {
throw new MongoOperationTimeoutError('Server reported a timeout error', {
cause: new MongoServerError(document.toObject())
Expand Down
1 change: 0 additions & 1 deletion src/cmap/wire_protocol/on_data.ts
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,6 @@ export function onData(

const timeoutForSocketRead = timeoutContext?.timeoutForSocketRead;
timeoutForSocketRead?.throwIfExpired();
// eslint-disable-next-line github/no-then
timeoutForSocketRead?.then(undefined, errorHandler);

return iterator;
Expand Down
6 changes: 4 additions & 2 deletions src/collection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import {
type ListSearchIndexesOptions
} from './cursor/list_search_indexes_cursor';
import type { Db } from './db';
import { MongoInvalidArgumentError } from './error';
import { MongoInvalidArgumentError, MongoOperationTimeoutError } from './error';
import type { MongoClient, PkFactory } from './mongo_client';
import type {
Filter,
Expand Down Expand Up @@ -678,7 +678,9 @@ export class Collection<TSchema extends Document = Document> {
new DropIndexOperation(this as TODO_NODE_3286, '*', resolveOptions(this, options))
);
return true;
} catch {
} catch (error) {
if (error instanceof MongoOperationTimeoutError) throw error; // TODO: Check the spec for index management behaviour/file a drivers ticket for this
// Seems like we should throw all errors
return false;
}
}
Expand Down
146 changes: 118 additions & 28 deletions src/cursor/abstract_cursor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import { ReadPreference, type ReadPreferenceLike } from '../read_preference';
import { type AsyncDisposable, configureResourceManagement } from '../resource_management';
import type { Server } from '../sdam/server';
import { ClientSession, maybeClearPinnedConnection } from '../sessions';
import { TimeoutContext } from '../timeout';
import { type MongoDBNamespace, squashError } from '../utils';

/**
Expand Down Expand Up @@ -60,6 +61,17 @@ export interface CursorStreamOptions {
/** @public */
export type CursorFlag = (typeof CURSOR_FLAGS)[number];

/** @public*/
export const CursorTimeoutMode = Object.freeze({
ITERATION: 'iteration',
LIFETIME: 'cursorLifetime'
} as const);

/** @public
* TODO(NODE-5688): Document and release
* */
export type CursorTimeoutMode = (typeof CursorTimeoutMode)[keyof typeof CursorTimeoutMode];

/** @public */
export interface AbstractCursorOptions extends BSONSerializeOptions {
session?: ClientSession;
Expand Down Expand Up @@ -105,6 +117,8 @@ export interface AbstractCursorOptions extends BSONSerializeOptions {
noCursorTimeout?: boolean;
/** @internal TODO(NODE-5688): make this public */
timeoutMS?: number;
/** @internal TODO(NODE-5688): make this public */
timeoutMode?: CursorTimeoutMode;
}

/** @internal */
Expand All @@ -117,6 +131,8 @@ export type InternalAbstractCursorOptions = Omit<AbstractCursorOptions, 'readPre
oplogReplay?: boolean;
exhaust?: boolean;
partial?: boolean;

omitMaxTimeMS?: boolean;
};

/** @public */
Expand Down Expand Up @@ -154,6 +170,8 @@ export abstract class AbstractCursor<
private isKilled: boolean;
/** @internal */
protected readonly cursorOptions: InternalAbstractCursorOptions;
/** @internal */
protected timeoutContext?: TimeoutContext;

/** @event */
static readonly CLOSE = 'close' as const;
Expand Down Expand Up @@ -186,6 +204,30 @@ export abstract class AbstractCursor<
...pluckBSONSerializeOptions(options)
};
this.cursorOptions.timeoutMS = options.timeoutMS;
if (this.cursorOptions.timeoutMS != null) {
if (options.timeoutMode == null) {
if (options.tailable) {
this.cursorOptions.timeoutMode = CursorTimeoutMode.ITERATION;
} else {
this.cursorOptions.timeoutMode = CursorTimeoutMode.LIFETIME;
}
} else {
if (options.tailable && this.cursorOptions.timeoutMode === CursorTimeoutMode.LIFETIME) {
throw new MongoInvalidArgumentError(
"Cannot set tailable cursor's timeoutMode to LIFETIME"
);
}
this.cursorOptions.timeoutMode = options.timeoutMode;
}
} else {
if (options.timeoutMode != null)
throw new MongoInvalidArgumentError('Cannot set timeoutMode without setting timeoutMS');
}
this.cursorOptions.omitMaxTimeMS =
this.cursorOptions.timeoutMS != null &&
((this.cursorOptions.timeoutMode === CursorTimeoutMode.ITERATION &&
!this.cursorOptions.tailable) ||
(this.cursorOptions.tailable && !this.cursorOptions.awaitData));

const readConcern = ReadConcern.fromOptions(options);
if (readConcern) {
Expand Down Expand Up @@ -400,12 +442,21 @@ export abstract class AbstractCursor<
return false;
}

do {
if ((this.documents?.length ?? 0) !== 0) {
return true;
if (this.cursorOptions.timeoutMode === CursorTimeoutMode.ITERATION && this.cursorId != null) {
this.timeoutContext?.refresh();
}
try {
do {
if ((this.documents?.length ?? 0) !== 0) {
return true;
}
await this.fetchBatch();
} while (!this.isDead || (this.documents?.length ?? 0) !== 0);
} finally {
if (this.cursorOptions.timeoutMode === CursorTimeoutMode.ITERATION && this.cursorId != null) {
this.timeoutContext?.clear();
}
await this.fetchBatch();
} while (!this.isDead || (this.documents?.length ?? 0) !== 0);
}

return false;
}
Expand All @@ -415,15 +466,24 @@ export abstract class AbstractCursor<
if (this.cursorId === Long.ZERO) {
throw new MongoCursorExhaustedError();
}
if (this.cursorOptions.timeoutMode === CursorTimeoutMode.ITERATION && this.cursorId != null) {
this.timeoutContext?.refresh();
}

do {
const doc = this.documents?.shift(this.deserializationOptions);
if (doc != null) {
if (this.transform != null) return await this.transformDocument(doc);
return doc;
try {
do {
const doc = this.documents?.shift(this.deserializationOptions);
if (doc != null) {
if (this.transform != null) return await this.transformDocument(doc);
return doc;
}
await this.fetchBatch();
} while (!this.isDead || (this.documents?.length ?? 0) !== 0);
} finally {
if (this.cursorOptions.timeoutMode === CursorTimeoutMode.ITERATION && this.cursorId != null) {
this.timeoutContext?.clear();
}
await this.fetchBatch();
} while (!this.isDead || (this.documents?.length ?? 0) !== 0);
}

return null;
}
Expand All @@ -436,18 +496,27 @@ export abstract class AbstractCursor<
throw new MongoCursorExhaustedError();
}

let doc = this.documents?.shift(this.deserializationOptions);
if (doc != null) {
if (this.transform != null) return await this.transformDocument(doc);
return doc;
if (this.cursorOptions.timeoutMode === CursorTimeoutMode.ITERATION && this.cursorId != null) {
this.timeoutContext?.refresh();
}
try {
let doc = this.documents?.shift(this.deserializationOptions);
if (doc != null) {
if (this.transform != null) return await this.transformDocument(doc);
return doc;
}

await this.fetchBatch();
await this.fetchBatch();

doc = this.documents?.shift(this.deserializationOptions);
if (doc != null) {
if (this.transform != null) return await this.transformDocument(doc);
return doc;
doc = this.documents?.shift(this.deserializationOptions);
if (doc != null) {
if (this.transform != null) return await this.transformDocument(doc);
return doc;
}
} finally {
if (this.cursorOptions.timeoutMode === CursorTimeoutMode.ITERATION && this.cursorId != null) {
this.timeoutContext?.clear();
}
}

return null;
Expand Down Expand Up @@ -476,8 +545,8 @@ export abstract class AbstractCursor<
/**
* Frees any client-side resources used by the cursor.
*/
async close(): Promise<void> {
await this.cleanup();
async close(options?: { timeoutMS?: number }): Promise<void> {
await this.cleanup(options?.timeoutMS);
}

/**
Expand Down Expand Up @@ -658,6 +727,8 @@ export abstract class AbstractCursor<

this.cursorId = null;
this.documents?.clear();
this.timeoutContext?.clear();
this.timeoutContext = undefined;
this.isClosed = false;
this.isKilled = false;
this.initialized = false;
Expand Down Expand Up @@ -707,7 +778,7 @@ export abstract class AbstractCursor<
}
);

return await executeOperation(this.cursorClient, getMoreOperation);
return await executeOperation(this.cursorClient, getMoreOperation, this.timeoutContext);
}

/**
Expand All @@ -718,6 +789,12 @@ export abstract class AbstractCursor<
* a significant refactor.
*/
private async cursorInit(): Promise<void> {
if (this.cursorOptions.timeoutMS != null) {
this.timeoutContext = TimeoutContext.create({
serverSelectionTimeoutMS: this.client.options.serverSelectionTimeoutMS,
timeoutMS: this.cursorOptions.timeoutMS
});
}
try {
const state = await this._initialize(this.cursorSession);
const response = state.response;
Expand All @@ -729,7 +806,7 @@ export abstract class AbstractCursor<
} catch (error) {
// the cursor is now initialized, even if an error occurred
this.initialized = true;
await this.cleanup(error);
await this.cleanup(undefined, error);
throw error;
}

Expand Down Expand Up @@ -763,14 +840,15 @@ export abstract class AbstractCursor<

// otherwise need to call getMore
const batchSize = this.cursorOptions.batchSize || 1000;
this.cursorOptions.omitMaxTimeMS = this.cursorOptions.timeoutMS != null;

try {
const response = await this.getMore(batchSize);
this.cursorId = response.id;
this.documents = response;
} catch (error) {
try {
await this.cleanup(error);
await this.cleanup(undefined, error);
} catch (error) {
// `cleanupCursor` should never throw, squash and throw the original error
squashError(error);
Expand All @@ -791,7 +869,7 @@ export abstract class AbstractCursor<
}

/** @internal */
private async cleanup(error?: Error) {
private async cleanup(timeoutMS?: number, error?: Error) {
this.isClosed = true;
const session = this.cursorSession;
try {
Expand All @@ -806,11 +884,23 @@ export abstract class AbstractCursor<
this.isKilled = true;
const cursorId = this.cursorId;
this.cursorId = Long.ZERO;
let timeoutContext: TimeoutContext | undefined;
if (timeoutMS != null) {
this.timeoutContext?.clear();
timeoutContext = TimeoutContext.create({
serverSelectionTimeoutMS: this.client.options.serverSelectionTimeoutMS,
timeoutMS
});
} else {
this.timeoutContext?.refresh();
timeoutContext = this.timeoutContext;
}
await executeOperation(
this.cursorClient,
new KillCursorsOperation(cursorId, this.cursorNamespace, this.selectedServer, {
session
})
}),
timeoutContext
);
}
} catch (error) {
Expand Down
20 changes: 19 additions & 1 deletion src/cursor/aggregation_cursor.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import type { Document } from '../bson';
import { MongoAPIError } from '../error';
import type { ExplainCommandOptions, ExplainVerbosityLike } from '../explain';
import type { MongoClient } from '../mongo_client';
import { AggregateOperation, type AggregateOptions } from '../operations/aggregate';
Expand All @@ -9,6 +10,7 @@ import { mergeOptions, type MongoDBNamespace } from '../utils';
import {
AbstractCursor,
type AbstractCursorOptions,
CursorTimeoutMode,
type InitialCursorResponse
} from './abstract_cursor';

Expand Down Expand Up @@ -38,6 +40,15 @@ export class AggregationCursor<TSchema = any> extends AbstractCursor<TSchema> {

this.pipeline = pipeline;
this.aggregateOptions = options;

const lastStage: Document | undefined = this.pipeline[this.pipeline.length - 1];

if (
this.cursorOptions.timeoutMS != null &&
this.cursorOptions.timeoutMode === CursorTimeoutMode.ITERATION &&
(lastStage?.$merge != null || lastStage?.$out != null)
)
throw new MongoAPIError('Cannot use $out or $merge stage with ITERATION timeoutMode');
}

clone(): AggregationCursor<TSchema> {
Expand All @@ -60,7 +71,7 @@ export class AggregationCursor<TSchema = any> extends AbstractCursor<TSchema> {
session
});

const response = await executeOperation(this.client, aggregateOperation);
const response = await executeOperation(this.client, aggregateOperation, this.timeoutContext);

return { server: aggregateOperation.server, session, response };
}
Expand Down Expand Up @@ -95,6 +106,13 @@ export class AggregationCursor<TSchema = any> extends AbstractCursor<TSchema> {
addStage<T = Document>(stage: Document): AggregationCursor<T>;
addStage<T = Document>(stage: Document): AggregationCursor<T> {
this.throwIfInitialized();
if (
this.cursorOptions.timeoutMS != null &&
this.cursorOptions.timeoutMode === CursorTimeoutMode.ITERATION &&
(stage.$out != null || stage.$merge != null)
) {
throw new MongoAPIError('Cannot use $out or $merge stage with ITERATION timeoutMode');
}
this.pipeline.push(stage);
return this as unknown as AggregationCursor<T>;
}
Expand Down
Loading

0 comments on commit b931e4f

Please sign in to comment.