Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(NODE-5019): add runCursorCommand API #3655

Merged
merged 20 commits into from
May 24, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions etc/sync-wip-spec.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
#! /usr/bin/env bash
dariakp marked this conversation as resolved.
Show resolved Hide resolved

set -o xtrace
set -o errexit

pushd "$HOME/code/drivers/specifications/source"
make
popd

SOURCE="$HOME/code/drivers/specifications/source/run-command/tests/unified"

for file in $SOURCE/*; do
cp "$file" "test/spec/run-command/$(basename "$file")"
done

# cp "$HOME/code/drivers/specifications/source/unified-test-format/tests/invalid/entity-createRunCursorCommand.yml" test/spec/unified-test-format/invalid/entity-createRunCursorCommand.yml
# cp "$HOME/code/drivers/specifications/source/unified-test-format/tests/invalid/entity-createRunCursorCommand.json" test/spec/unified-test-format/invalid/entity-createRunCursorCommand.json

cp "$HOME/code/drivers/specifications/source/unified-test-format/tests/valid-pass/entity-commandCursor.yml" test/spec/unified-test-format/valid-pass/entity-commandCursor.yml
cp "$HOME/code/drivers/specifications/source/unified-test-format/tests/valid-pass/entity-commandCursor.json" test/spec/unified-test-format/valid-pass/entity-commandCursor.json


if [ -z "$MONGODB_URI" ]; then echo "must set uri" && exit 1; fi
export MONGODB_URI=$MONGODB_URI
npm run check:test -- -g '(RunCommand spec)|(Unified test format runner runCursorCommand)'
2 changes: 1 addition & 1 deletion src/cursor/abstract_cursor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -608,7 +608,7 @@ export abstract class AbstractCursor<
abstract clone(): AbstractCursor<TSchema>;

/** @internal */
abstract _initialize(
protected abstract _initialize(
session: ClientSession | undefined,
callback: Callback<ExecutionResult>
): void;
Expand Down
138 changes: 138 additions & 0 deletions src/cursor/run_command_cursor.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
import { BSONSerializeOptions, Document, Long, resolveBSONOptions } from '../bson';
import type { Db } from '../db';
import { MongoAPIError, MongoUnexpectedServerResponseError } from '../error';
import { executeOperation, ExecutionResult } from '../operations/execute_operation';
import { GetMoreOperation } from '../operations/get_more';
import { RunCommandOperation } from '../operations/run_command';
import type { ReadConcernLike } from '../read_concern';
import type { ReadPreferenceLike } from '../read_preference';
import type { ClientSession } from '../sessions';
import { Callback, ns } from '../utils';
import { AbstractCursor } from './abstract_cursor';

/** @public */
export type RunCommandCursorOptions = {
dariakp marked this conversation as resolved.
Show resolved Hide resolved
readPreference?: ReadPreferenceLike;
session?: ClientSession;
} & BSONSerializeOptions;

/** @internal */
type RunCursorCommandResponse = {
cursor: { id: bigint | Long | number; ns: string; firstBatch: Document[] };
ok: 1;
};

/** @public */
export class RunCommandCursor extends AbstractCursor {
public readonly command: Readonly<Record<string, any>>;
public readonly getMoreOptions: {
comment?: any;
maxAwaitTimeMS?: number;
batchSize?: number;
} = {};

/**
* Controls the `getMore.comment` field
* @param comment - any BSON value
*/
public setComment(comment: any): this {
this.getMoreOptions.comment = comment;
return this;
}

/**
* Controls the `getMore.maxTimeMS` field. Only valid when cursor is tailable await
* @param maxTimeMS - the number of milliseconds to wait for new data
*/
public setMaxTimeMS(maxTimeMS: number): this {
this.getMoreOptions.maxAwaitTimeMS = maxTimeMS;
return this;
}

/**
* Controls the `getMore.batchSize` field
* @param maxTimeMS - the number documents to return in the `nextBatch`
*/
public setBatchSize(batchSize: number): this {
this.getMoreOptions.batchSize = batchSize;
return this;
}

public clone(): RunCommandCursor {
return new RunCommandCursor(this.db, this.command, {
readPreference: this.readPreference,
...resolveBSONOptions(this.cursorOptions)
});
}

public override withReadConcern(_: ReadConcernLike): never {
throw new MongoAPIError(
'RunCommandCursor does not support readConcern it must be attached to the command being run'
);
}

public override addCursorFlag(_: string, __: boolean): never {
throw new MongoAPIError(
'RunCommandCursor does not support cursor flags, they must be attached to the command being run'
);
}

public override maxTimeMS(_: number): never {
dariakp marked this conversation as resolved.
Show resolved Hide resolved
throw new MongoAPIError(
'RunCommandCursor does not support maxTimeMS, it must be attached to the command being run'
);
}

public override batchSize(_: number): never {
throw new MongoAPIError(
'RunCommandCursor does not support batchSize, it must be attached to the command being run'
);
}

/** @internal */
private db: Db;

/** @internal */
constructor(db: Db, command: Document, options: RunCommandCursorOptions = {}) {
super(db.s.client, ns(db.namespace), options);
this.db = db;
this.command = Object.freeze({ ...command });
}

/** @internal */
protected _initialize(session: ClientSession, callback: Callback<ExecutionResult>) {
const operation = new RunCommandOperation<RunCursorCommandResponse>(this.db, this.command, {
...this.cursorOptions,
session: session,
readPreference: this.cursorOptions.readPreference
});
executeOperation(this.client, operation).then(
response => {
if (!response.cursor) {
callback(
new MongoUnexpectedServerResponseError('Expected server to respond with cursor')
);
return;
}
callback(undefined, {
server: operation.server,
session,
response
});
},
err => callback(err)
);
}

/** @internal */
override _getMore(_batchSize: number, callback: Callback<Document>) {
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
const getMoreOperation = new GetMoreOperation(this.namespace, this.id!, this.server!, {
...this.cursorOptions,
session: this.session,
...this.getMoreOptions
});

executeOperation(this.client, getMoreOperation, callback);
}
}
14 changes: 14 additions & 0 deletions src/db.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import { Collection, CollectionOptions } from './collection';
import * as CONSTANTS from './constants';
import { AggregationCursor } from './cursor/aggregation_cursor';
import { ListCollectionsCursor } from './cursor/list_collections_cursor';
import { RunCommandCursor, type RunCommandCursorOptions } from './cursor/run_command_cursor';
import { MongoAPIError, MongoInvalidArgumentError } from './error';
import type { MongoClient, PkFactory } from './mongo_client';
import type { TODO_NODE_3286 } from './mongo_types';
Expand Down Expand Up @@ -523,6 +524,19 @@ export class Db {

return new ChangeStream<TSchema, TChange>(this, pipeline, resolveOptions(this, options));
}

/**
* A low level cursor API providing basic driver functionality.
dariakp marked this conversation as resolved.
Show resolved Hide resolved
* - ClientSession management
* - ReadPreference for server selection
* - Running getMores automatically when a local batch is exhausted
*
* @param command - The command that will start a cursor on the server.
* @param options - Configurations for running the command, bson options will apply to getMores
*/
runCursorCommand(command: Document, options?: RunCommandCursorOptions): RunCommandCursor {
return new RunCommandCursor(this, command, options);
}
}

// TODO(NODE-3484): Refactor into MongoDBNamespace
Expand Down
3 changes: 3 additions & 0 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import { AggregationCursor } from './cursor/aggregation_cursor';
import { FindCursor } from './cursor/find_cursor';
import { ListCollectionsCursor } from './cursor/list_collections_cursor';
import { ListIndexesCursor } from './cursor/list_indexes_cursor';
import type { RunCommandCursor } from './cursor/run_command_cursor';
import { Db } from './db';
import { GridFSBucket } from './gridfs';
import { GridFSBucketReadStream } from './gridfs/download';
Expand Down Expand Up @@ -87,6 +88,7 @@ export {
ListIndexesCursor,
MongoClient,
OrderedBulkOperation,
RunCommandCursor,
UnorderedBulkOperation
};

Expand Down Expand Up @@ -275,6 +277,7 @@ export type {
ChangeStreamAggregateRawResult,
ChangeStreamCursorOptions
} from './cursor/change_stream_cursor';
export type { RunCommandCursorOptions } from './cursor/run_command_cursor';
export type { DbOptions, DbPrivate } from './db';
export type { AutoEncrypter, AutoEncryptionOptions, AutoEncryptionTlsOptions } from './deps';
export type { Encrypter, EncrypterOptions } from './encrypter';
Expand Down
10 changes: 9 additions & 1 deletion test/integration/run-command/run_command.spec.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,13 @@ import { loadSpecTests } from '../../spec';
import { runUnifiedSuite } from '../../tools/unified-spec-runner/runner';

describe('RunCommand spec', () => {
runUnifiedSuite(loadSpecTests('run-command'));
runUnifiedSuite(loadSpecTests('run-command'), test => {
if (test.description.includes('timeoutMS') || test.description.includes('timeoutMode')) {
dariakp marked this conversation as resolved.
Show resolved Hide resolved
return 'CSOT not implemented in Node.js yet';
}
if (test.description === 'does not attach $readPreference to given command on standalone') {
return 'TODO(NODE-5263): Do not send $readPreference to standalone servers';
}
return false;
});
});
53 changes: 53 additions & 0 deletions test/integration/run-command/run_cursor_command.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
import { expect } from 'chai';

import { CommandStartedEvent, Db, MongoClient } from '../../mongodb';

describe('class RunCommandCursor', () => {
dariakp marked this conversation as resolved.
Show resolved Hide resolved
let client: MongoClient;
let db: Db;
let commandsStarted: CommandStartedEvent[];

beforeEach(async function () {
client = this.configuration.newClient({}, { monitorCommands: true });
db = client.db();
await db.dropDatabase().catch(() => null);
await db
.collection<{ _id: number }>('collection')
.insertMany([{ _id: 0 }, { _id: 1 }, { _id: 2 }]);
commandsStarted = [];
client.on('commandStarted', started => commandsStarted.push(started));
});

afterEach(async function () {
commandsStarted = [];
await client.close();
});

it('should only run init command once', async () => {
dariakp marked this conversation as resolved.
Show resolved Hide resolved
const cursor = db.runCursorCommand({ find: 'collection', filter: {}, batchSize: 1 });
cursor.setBatchSize(1);
const it0 = cursor[Symbol.asyncIterator]();
const it1 = cursor[Symbol.asyncIterator]();

const next0it0 = await it0.next(); // find, 1 doc
const next0it1 = await it1.next(); // getMore, 1 doc

expect(next0it0).to.deep.equal({ value: { _id: 0 }, done: false });
expect(next0it1).to.deep.equal({ value: { _id: 1 }, done: false });
expect(commandsStarted.map(c => c.commandName)).to.have.lengthOf(2);

const next1it0 = await it0.next(); // getMore, 1 doc
const next1it1 = await it1.next(); // getMore, 0 doc & exhausted id

expect(next1it0).to.deep.equal({ value: { _id: 2 }, done: false });
expect(next1it1).to.deep.equal({ value: undefined, done: true });
expect(commandsStarted.map(c => c.commandName)).to.have.lengthOf(4);

const next2it0 = await it0.next();
const next2it1 = await it1.next();

expect(next2it0).to.deep.equal({ value: undefined, done: true });
expect(next2it1).to.deep.equal({ value: undefined, done: true });
expect(commandsStarted.map(c => c.commandName)).to.have.lengthOf(4);
});
});
1 change: 1 addition & 0 deletions test/mongodb.ts
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ export * from '../src/cursor/change_stream_cursor';
export * from '../src/cursor/find_cursor';
export * from '../src/cursor/list_collections_cursor';
export * from '../src/cursor/list_indexes_cursor';
export * from '../src/cursor/run_command_cursor';
export * from '../src/db';
export * from '../src/deps';
export * from '../src/encrypter';
Expand Down
Loading