diff --git a/src/cmap/command_monitoring_events.ts b/src/cmap/command_monitoring_events.ts
index 002ecf2c8c..a7e1653bec 100644
--- a/src/cmap/command_monitoring_events.ts
+++ b/src/cmap/command_monitoring_events.ts
@@ -7,7 +7,12 @@ import {
   LEGACY_HELLO_COMMAND_CAMEL_CASE
 } from '../constants';
 import { calculateDurationInMs, deepCopy } from '../utils';
-import { OpMsgRequest, type OpQueryRequest, type WriteProtocolMessageType } from './commands';
+import {
+  DocumentSequence,
+  OpMsgRequest,
+  type OpQueryRequest,
+  type WriteProtocolMessageType
+} from './commands';
 import type { Connection } from './connection';
 
 /**
@@ -249,7 +254,16 @@ const OP_QUERY_KEYS = [
 /** Extract the actual command from the query, possibly up-converting if it's a legacy format */
 function extractCommand(command: WriteProtocolMessageType): Document {
   if (command instanceof OpMsgRequest) {
-    return deepCopy(command.command);
+    const cmd = deepCopy(command.command);
+    // For OP_MSG with payload type 1 we need to pull the documents
+    // array out of the document sequence for monitoring.
+    if (cmd.ops instanceof DocumentSequence) {
+      cmd.ops = cmd.ops.documents;
+    }
+    if (cmd.nsInfo instanceof DocumentSequence) {
+      cmd.nsInfo = cmd.nsInfo.documents;
+    }
+    return cmd;
   }
 
   if (command.query?.$query) {
diff --git a/src/cmap/commands.ts b/src/cmap/commands.ts
index 73ef1f2e1f..e04897eedf 100644
--- a/src/cmap/commands.ts
+++ b/src/cmap/commands.ts
@@ -429,10 +429,66 @@ export interface OpMsgOptions {
 /** @internal */
 export class DocumentSequence {
+  field: string;
   documents: Document[];
+  serializedDocumentsLength: number;
+  private chunks: Uint8Array[];
+  private header?: Buffer;
 
-  constructor(documents: Document[]) {
-    this.documents = documents;
+  /**
+   * Create a new document sequence for the provided field.
+   * @param field - The command field the sequence will replace.
+   */
+  constructor(field: string, documents?: Document[]) {
+    this.field = field;
+    this.documents = [];
+    this.chunks = [];
+    this.serializedDocumentsLength = 0;
+    this.init();
+    if (documents) {
+      for (const doc of documents) {
+        this.push(doc, BSON.serialize(doc));
+      }
+    }
+  }
+
+  /**
+   * Initialize the buffer chunks.
+   */
+  private init() {
+    // A document sequence starts with payload type 1 in the first byte.
+    const buffer = Buffer.allocUnsafe(1 + 4 + this.field.length + 1);
+    buffer[0] = 1;
+    // The third part is the field name at offset 5, with a trailing null byte.
+    encodeUTF8Into(buffer, `${this.field}\0`, 5);
+    this.chunks.push(buffer);
+    this.header = buffer;
+  }
+
+  /**
+   * Push a document and its pre-serialized raw BSON onto the document
+   * sequence and update the running serialized length.
+   * @param document - The document to add.
+   * @param buffer - The serialized document in raw BSON.
+   * @returns The new total document sequence length.
+   */
+  push(document: Document, buffer: Uint8Array): number {
+    this.serializedDocumentsLength += buffer.length;
+    // Push the document.
+    this.documents.push(document);
+    // Push the document raw bson.
+    this.chunks.push(buffer);
+    // Write the new length.
+    this.header?.writeInt32LE(4 + this.field.length + 1 + this.serializedDocumentsLength, 1);
+    return this.serializedDocumentsLength + (this.header?.length ?? 0);
+  }
+
+  /**
+   * Get the fully serialized bytes for the document sequence section.
+   * @returns The section bytes.
+   */
+  toBin(): Uint8Array {
+    return Buffer.concat(this.chunks);
   }
 }
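For reference, `toBin()` emits an OP_MSG payload type 1 section: one kind byte set to 1, an int32 size covering everything after the kind byte, the field name as a null-terminated cstring, then the raw BSON documents back to back. A minimal decoding sketch (standalone and illustrative only; `parseDocumentSequenceSection` is not part of this PR):

```ts
import { deserialize, type Document } from 'bson';

// Decode one OP_MSG payload type 1 section, assuming `section` starts
// at the kind byte exactly as toBin() produces it.
function parseDocumentSequenceSection(section: Buffer): { field: string; documents: Document[] } {
  if (section[0] !== 1) throw new Error('not a document sequence section');
  // The int32 at offset 1 covers everything after the kind byte.
  const size = section.readInt32LE(1);
  const fieldEnd = section.indexOf(0, 5); // cstring terminator after the size
  const field = section.toString('utf8', 5, fieldEnd);
  const documents: Document[] = [];
  let offset = fieldEnd + 1;
  while (offset < 1 + size) {
    const docSize = section.readInt32LE(offset); // each BSON document is length-prefixed
    documents.push(deserialize(section.subarray(offset, offset + docSize)));
    offset += docSize;
  }
  return { field, documents };
}
```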
@@ -543,21 +599,7 @@ export class OpMsgRequest {
     const chunks = [];
     for (const [key, value] of Object.entries(document)) {
       if (value instanceof DocumentSequence) {
-        // Document sequences starts with type 1 at the first byte.
-        const buffer = Buffer.allocUnsafe(1 + 4 + key.length);
-        buffer[0] = 1;
-        // Third part is the field name at offset 5.
-        encodeUTF8Into(buffer, key, 5);
-        chunks.push(buffer);
-        // Fourth part are the documents' bytes.
-        let docsLength = 0;
-        for (const doc of value.documents) {
-          const docBson = this.serializeBson(doc);
-          docsLength += docBson.length;
-          chunks.push(docBson);
-        }
-        // Second part of the sequence is the length at offset 1;
-        buffer.writeInt32LE(key.length + docsLength, 1);
+        chunks.push(value.toBin());
         // Why are we removing the field from the command? This is because it needs to be
         // removed in the OP_MSG request first section, and DocumentSequence is not a
         // BSON type and is specific to the MongoDB wire protocol so there's nothing
diff --git a/src/cmap/wire_protocol/responses.ts b/src/cmap/wire_protocol/responses.ts
index e69cf84cfc..6c166afd61 100644
--- a/src/cmap/wire_protocol/responses.ts
+++ b/src/cmap/wire_protocol/responses.ts
@@ -329,3 +329,29 @@ export class ExplainedCursorResponse extends CursorResponse {
     return this.toObject(options);
   }
 }
+
+/**
+ * Client bulk writes have some extra metadata at the top level that needs to be
+ * included in the result returned to the user.
+ */
+export class ClientBulkWriteCursorResponse extends CursorResponse {
+  get insertedCount() {
+    return this.get('nInserted', BSONType.int, true);
+  }
+
+  get upsertedCount() {
+    return this.get('nUpserted', BSONType.int, true);
+  }
+
+  get matchedCount() {
+    return this.get('nMatched', BSONType.int, true);
+  }
+
+  get modifiedCount() {
+    return this.get('nModified', BSONType.int, true);
+  }
+
+  get deletedCount() {
+    return this.get('nDeleted', BSONType.int, true);
+  }
+}
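The getters above read the summary counts directly off the top-level `bulkWrite` reply rather than out of the cursor batches. A hypothetical acknowledged reply, annotated with the driver-facing name each field maps to:

```ts
// Hypothetical server reply for an acknowledged bulkWrite (illustrative values).
const reply = {
  ok: 1,
  cursor: { id: 0, firstBatch: [], ns: 'admin.$cmd.bulkWrite' },
  nInserted: 2, // -> response.insertedCount
  nUpserted: 0, // -> response.upsertedCount
  nMatched: 1, // -> response.matchedCount
  nModified: 1, // -> response.modifiedCount
  nDeleted: 3 // -> response.deletedCount
};
```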
diff --git a/src/cursor/client_bulk_write_cursor.ts b/src/cursor/client_bulk_write_cursor.ts
new file mode 100644
index 0000000000..cd853a4647
--- /dev/null
+++ b/src/cursor/client_bulk_write_cursor.ts
@@ -0,0 +1,73 @@
+import type { Document } from '../bson';
+import { type ClientBulkWriteCursorResponse } from '../cmap/wire_protocol/responses';
+import { MongoClientBulkWriteCursorError } from '../error';
+import type { MongoClient } from '../mongo_client';
+import { ClientBulkWriteOperation } from '../operations/client_bulk_write/client_bulk_write';
+import { type ClientBulkWriteOptions } from '../operations/client_bulk_write/common';
+import { executeOperation } from '../operations/execute_operation';
+import type { ClientSession } from '../sessions';
+import { mergeOptions, MongoDBNamespace } from '../utils';
+import {
+  AbstractCursor,
+  type AbstractCursorOptions,
+  type InitialCursorResponse
+} from './abstract_cursor';
+
+/** @public */
+export interface ClientBulkWriteCursorOptions
+  extends Omit<AbstractCursorOptions, 'maxAwaitTimeMS' | 'tailable' | 'awaitData'>,
+    ClientBulkWriteOptions {}
+
+/**
+ * This is the cursor that handles client bulk write operations. Note this is never
+ * exposed directly to the user and is always immediately exhausted.
+ * @internal
+ */
+export class ClientBulkWriteCursor extends AbstractCursor {
+  public readonly command: Document;
+  /** @internal */
+  private cursorResponse?: ClientBulkWriteCursorResponse;
+  /** @internal */
+  private clientBulkWriteOptions: ClientBulkWriteOptions;
+
+  /** @internal */
+  constructor(client: MongoClient, command: Document, options: ClientBulkWriteOptions = {}) {
+    super(client, new MongoDBNamespace('admin', '$cmd'), options);
+
+    this.command = command;
+    this.clientBulkWriteOptions = options;
+  }
+
+  /**
+   * We need a way to get the top level cursor response fields for
+   * generating the bulk write result, so we expose this here.
+   */
+  get response(): ClientBulkWriteCursorResponse {
+    if (this.cursorResponse) return this.cursorResponse;
+    throw new MongoClientBulkWriteCursorError(
+      'No client bulk write cursor response returned from the server.'
+    );
+  }
+
+  clone(): ClientBulkWriteCursor {
+    const clonedOptions = mergeOptions({}, this.clientBulkWriteOptions);
+    delete clonedOptions.session;
+    return new ClientBulkWriteCursor(this.client, this.command, {
+      ...clonedOptions
+    });
+  }
+
+  /** @internal */
+  async _initialize(session: ClientSession): Promise<InitialCursorResponse> {
+    const clientBulkWriteOperation = new ClientBulkWriteOperation(this.command, {
+      ...this.clientBulkWriteOptions,
+      ...this.cursorOptions,
+      session
+    });
+
+    const response = await executeOperation(this.client, clientBulkWriteOperation);
+    this.cursorResponse = response;
+
+    return { server: clientBulkWriteOperation.server, session, response };
+  }
+}
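Putting the cursor together: the executor (added later in this diff) builds one `bulkWrite` command per batch, exhausts a cursor for it, and only then reads the summary counts. A rough sketch of that internal usage, written as if inside the driver's `src/cursor/` directory (internal API, not public):

```ts
import type { Document } from '../bson';
import type { MongoClient } from '../mongo_client';
import { ClientBulkWriteCursor } from './client_bulk_write_cursor';

// Illustrative: run one prebuilt bulkWrite command and surface the
// summary counts, mirroring what the executor does later in this PR.
async function runOneBatch(client: MongoClient, command: Document) {
  const cursor = new ClientBulkWriteCursor(client, command, { ordered: true });
  const individualResults = await cursor.toArray(); // exhausts the cursor
  // Only safe to read after exhaustion; the getter throws
  // MongoClientBulkWriteCursorError if no response has been received.
  const { insertedCount, deletedCount } = cursor.response;
  return { individualResults, insertedCount, deletedCount };
}
```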
diff --git a/src/error.ts b/src/error.ts
index 668e9cdbf5..4aed6b9314 100644
--- a/src/error.ts
+++ b/src/error.ts
@@ -616,6 +616,60 @@ export class MongoGCPError extends MongoOIDCError {
   }
 }
 
+/**
+ * An error indicating a failure occurred when processing client bulk write results.
+ *
+ * @public
+ * @category Error
+ */
+export class MongoClientBulkWriteCursorError extends MongoRuntimeError {
+  /**
+   * **Do not use this constructor!**
+   *
+   * Meant for internal use only.
+   *
+   * @remarks
+   * This class is only meant to be constructed within the driver. This constructor is
+   * not subject to semantic versioning compatibility guarantees and may change at any time.
+   *
+   * @public
+   **/
+  constructor(message: string) {
+    super(message);
+  }
+
+  override get name(): string {
+    return 'MongoClientBulkWriteCursorError';
+  }
+}
+
+/**
+ * An error indicating a failure occurred on the client when executing a client bulk write.
+ *
+ * @public
+ * @category Error
+ */
+export class MongoClientBulkWriteExecutionError extends MongoRuntimeError {
+  /**
+   * **Do not use this constructor!**
+   *
+   * Meant for internal use only.
+   *
+   * @remarks
+   * This class is only meant to be constructed within the driver. This constructor is
+   * not subject to semantic versioning compatibility guarantees and may change at any time.
+   *
+   * @public
+   **/
+  constructor(message: string) {
+    super(message);
+  }
+
+  override get name(): string {
+    return 'MongoClientBulkWriteExecutionError';
+  }
+}
+
 /**
  * An error generated when a ChangeStream operation fails to execute.
  *
diff --git a/src/index.ts b/src/index.ts
index dda026323a..97f964ce54 100644
--- a/src/index.ts
+++ b/src/index.ts
@@ -45,6 +45,8 @@ export {
   MongoAzureError,
   MongoBatchReExecutionError,
   MongoChangeStreamError,
+  MongoClientBulkWriteCursorError,
+  MongoClientBulkWriteExecutionError,
   MongoCompatibilityError,
   MongoCursorExhaustedError,
   MongoCursorInUseError,
@@ -473,6 +475,21 @@ export type {
   AggregateOptions,
   DB_AGGREGATE_COLLECTION
 } from './operations/aggregate';
+export type {
+  AnyClientBulkWriteModel,
+  ClientBulkWriteOptions,
+  ClientBulkWriteResult,
+  ClientDeleteManyModel,
+  ClientDeleteOneModel,
+  ClientDeleteResult,
+  ClientInsertOneModel,
+  ClientInsertOneResult,
+  ClientReplaceOneModel,
+  ClientUpdateManyModel,
+  ClientUpdateOneModel,
+  ClientUpdateResult,
+  ClientWriteModel
+} from './operations/client_bulk_write/common';
 export type {
   CollationOptions,
   CommandOperation,
diff --git a/src/mongo_client.ts b/src/mongo_client.ts
index e8aae51639..092e9418b3 100644
--- a/src/mongo_client.ts
+++ b/src/mongo_client.ts
@@ -30,6 +30,12 @@ import {
   SeverityLevel
 } from './mongo_logger';
 import { TypedEventEmitter } from './mongo_types';
+import {
+  type AnyClientBulkWriteModel,
+  type ClientBulkWriteOptions,
+  type ClientBulkWriteResult
+} from './operations/client_bulk_write/common';
+import { ClientBulkWriteExecutor } from './operations/client_bulk_write/executor';
 import { executeOperation } from './operations/execute_operation';
 import { RunAdminCommandOperation } from './operations/run_command';
 import type { ReadConcern, ReadConcernLevel, ReadConcernLike } from './read_concern';
@@ -477,6 +483,19 @@ export class MongoClient extends TypedEventEmitter<MongoClientEvents> implements
     return this.s.bsonOptions;
   }
 
+  /**
+   * Executes a client bulk write operation, available on server 8.0+.
+   * @param models - The client bulk write models.
+   * @param options - The client bulk write options.
+   * @returns A ClientBulkWriteResult for acknowledged writes and ok: 1 for unacknowledged writes.
+   */
+  async bulkWrite(
+    models: AnyClientBulkWriteModel[],
+    options?: ClientBulkWriteOptions
+  ): Promise<ClientBulkWriteResult | { ok: 1 }> {
+    return await new ClientBulkWriteExecutor(this, models, options).execute();
+  }
+
   /**
    * Connect to MongoDB using a url
    *
diff --git a/src/operations/client_bulk_write/client_bulk_write.ts b/src/operations/client_bulk_write/client_bulk_write.ts
new file mode 100644
index 0000000000..cb020bde40
--- /dev/null
+++ b/src/operations/client_bulk_write/client_bulk_write.ts
@@ -0,0 +1,45 @@
+import { type Document } from 'bson';
+
+import { ClientBulkWriteCursorResponse } from '../../cmap/wire_protocol/responses';
+import type { Server } from '../../sdam/server';
+import type { ClientSession } from '../../sessions';
+import { MongoDBNamespace } from '../../utils';
+import { CommandOperation } from '../command';
+import { Aspect, defineAspects } from '../operation';
+import { type ClientBulkWriteOptions } from './common';
+
+/**
+ * Executes a single client bulk write operation within a potential batch.
+ * @internal
+ */
+export class ClientBulkWriteOperation extends CommandOperation<ClientBulkWriteCursorResponse> {
+  command: Document;
+  override options: ClientBulkWriteOptions;
+
+  override get commandName() {
+    return 'bulkWrite' as const;
+  }
+
+  constructor(command: Document, options: ClientBulkWriteOptions) {
+    super(undefined, options);
+    this.command = command;
+    this.options = options;
+    this.ns = new MongoDBNamespace('admin', '$cmd');
+  }
+
+  /**
+   * Execute the command. Superclass will handle write concern, etc.
+   * @param server - The server.
+   * @param session - The session.
+   * @returns The response.
+   */
+  override async execute(
+    server: Server,
+    session: ClientSession | undefined
+  ): Promise<ClientBulkWriteCursorResponse> {
+    return await super.executeCommand(server, session, this.command, ClientBulkWriteCursorResponse);
+  }
+}
+
+// Skipping the collation as it goes on the individual ops.
+defineAspects(ClientBulkWriteOperation, [Aspect.WRITE_OPERATION, Aspect.SKIP_COLLATION]);
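With the exports and the `MongoClient.bulkWrite` method in place, the user-facing call looks roughly like this (a usage sketch; the connection string and namespaces are placeholders):

```ts
import { MongoClient, type AnyClientBulkWriteModel } from 'mongodb';

const client = new MongoClient('mongodb://localhost:27017');
const models: AnyClientBulkWriteModel[] = [
  { name: 'insertOne', namespace: 'db.coll', document: { x: 1 } },
  { name: 'updateOne', namespace: 'db.coll', filter: { x: 1 }, update: { $set: { x: 2 } } },
  { name: 'deleteMany', namespace: 'db.other', filter: { y: { $gt: 0 } } }
];
const result = await client.bulkWrite(models, { ordered: true, verboseResults: true });
// Narrow the union: acknowledged writes return a ClientBulkWriteResult.
if ('insertedCount' in result) {
  console.log(result.insertedCount, result.updateResults);
}
await client.close();
```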
diff --git a/src/operations/client_bulk_write/command_builder.ts b/src/operations/client_bulk_write/command_builder.ts
index 4d4d323de6..6b809a08c5 100644
--- a/src/operations/client_bulk_write/command_builder.ts
+++ b/src/operations/client_bulk_write/command_builder.ts
@@ -1,6 +1,8 @@
-import { type Document } from '../../bson';
+import { BSON, type Document } from '../../bson';
 import { DocumentSequence } from '../../cmap/commands';
+import { type PkFactory } from '../../mongo_client';
 import type { Filter, OptionalId, UpdateFilter, WithoutId } from '../../mongo_types';
+import { DEFAULT_PK_FACTORY } from '../../utils';
 import { type CollationOptions } from '../command';
 import { type Hint } from '../operation';
 import type {
@@ -23,20 +25,32 @@ export interface ClientBulkWriteCommand {
   nsInfo: DocumentSequence;
   bypassDocumentValidation?: boolean;
   let?: Document;
+  comment?: any;
 }
 
+/**
+ * The byte overhead for the extra fields added after command generation.
+ */
+const MESSAGE_OVERHEAD_BYTES = 1000;
+
 /** @internal */
 export class ClientBulkWriteCommandBuilder {
   models: AnyClientBulkWriteModel[];
   options: ClientBulkWriteOptions;
+  pkFactory: PkFactory;
 
   /**
    * Create the command builder.
    * @param models - The client write models.
    */
-  constructor(models: AnyClientBulkWriteModel[], options: ClientBulkWriteOptions) {
+  constructor(
+    models: AnyClientBulkWriteModel[],
+    options: ClientBulkWriteOptions,
+    pkFactory?: PkFactory
+  ) {
     this.models = models;
     this.options = options;
+    this.pkFactory = pkFactory ?? DEFAULT_PK_FACTORY;
   }
 
   /**
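Each command the builder emits matches the `ClientBulkWriteCommand` interface above, with `ops` and `nsInfo` carried as OP_MSG document sequences instead of BSON arrays. An illustrative skeleton of one built command (values are examples; `errorsOnly` is the inverse of the `verboseResults` option):

```ts
// Sketch, as if inside src/operations/client_bulk_write/.
import { DocumentSequence } from '../../cmap/commands';

const command = {
  bulkWrite: 1 as const,
  errorsOnly: true, // inverse of the verboseResults option
  ordered: true,
  ops: new DocumentSequence('ops', [{ insert: 0, document: { _id: 1, x: 11 } }]),
  nsInfo: new DocumentSequence('nsInfo', [{ ns: 'db.coll' }])
};
```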
@@ -53,32 +67,148 @@ export class ClientBulkWriteCommandBuilder {
   /**
    * Build the bulk write commands from the models.
    */
-  buildCommands(): ClientBulkWriteCommand[] {
+  buildCommands(maxMessageSizeBytes: number, maxWriteBatchSize: number): ClientBulkWriteCommand[] {
     // Iterate the models to build the ops and nsInfo fields.
-    const operations = [];
+    // We do this in a loop that builds one command at a time, each up
+    // to the max bson size or max message size.
+    const commands: ClientBulkWriteCommand[] = [];
+    let currentCommandLength = 0;
     let currentNamespaceIndex = 0;
+    let currentCommand: ClientBulkWriteCommand = this.baseCommand();
     const namespaces = new Map<string, number>();
+
     for (const model of this.models) {
       const ns = model.namespace;
       const index = namespaces.get(ns);
+
+      /**
+       * Convenience function for resetting everything when a new batch
+       * is started.
+       */
+      const reset = () => {
+        commands.push(currentCommand);
+        namespaces.clear();
+        currentNamespaceIndex = 0;
+        currentCommand = this.baseCommand();
+        namespaces.set(ns, currentNamespaceIndex);
+      };
+
       if (index != null) {
-        operations.push(buildOperation(model, index));
+        // Build and serialize the operation to get its byte length.
+        const operation = buildOperation(model, index, this.pkFactory);
+        const operationBuffer = BSON.serialize(operation);
+
+        // Check if the operation buffer can fit in the current command. If it can,
+        // then add the operation to the document sequence and increment the
+        // current length, as long as the ops don't exceed the maxWriteBatchSize.
+        if (
+          currentCommandLength + operationBuffer.length < maxMessageSizeBytes &&
+          currentCommand.ops.documents.length < maxWriteBatchSize
+        ) {
+          // Pushing to the ops document sequence returns the bytes length added.
+          currentCommandLength =
+            MESSAGE_OVERHEAD_BYTES + this.addOperation(currentCommand, operation, operationBuffer);
+        } else {
+          // We need to batch. Push the current command to the commands
+          // array and create a new current command. We also need to clear the namespaces
+          // map for the new command.
+          reset();
+
+          const nsInfo = { ns: ns };
+          const nsInfoBuffer = BSON.serialize(nsInfo);
+          currentCommandLength =
+            MESSAGE_OVERHEAD_BYTES +
+            this.addOperationAndNsInfo(
+              currentCommand,
+              operation,
+              operationBuffer,
+              nsInfo,
+              nsInfoBuffer
+            );
+        }
       } else {
         namespaces.set(ns, currentNamespaceIndex);
-        operations.push(buildOperation(model, currentNamespaceIndex));
+        const nsInfo = { ns: ns };
+        const nsInfoBuffer = BSON.serialize(nsInfo);
+        const operation = buildOperation(model, currentNamespaceIndex, this.pkFactory);
+        const operationBuffer = BSON.serialize(operation);
+
+        // Check if the operation and nsInfo buffers can fit in the command. If they
+        // can, then add the operation and nsInfo to their respective document
+        // sequences and increment the current length, as long as the ops don't exceed
+        // the maxWriteBatchSize.
+        if (
+          currentCommandLength + nsInfoBuffer.length + operationBuffer.length <
+            maxMessageSizeBytes &&
+          currentCommand.ops.documents.length < maxWriteBatchSize
+        ) {
+          currentCommandLength =
+            MESSAGE_OVERHEAD_BYTES +
+            this.addOperationAndNsInfo(
+              currentCommand,
+              operation,
+              operationBuffer,
+              nsInfo,
+              nsInfoBuffer
+            );
+        } else {
+          // We need to batch. Push the current command to the commands
+          // array and create a new current command. Also clear the namespaces map.
+          reset();
+
+          currentCommandLength =
+            MESSAGE_OVERHEAD_BYTES +
+            this.addOperationAndNsInfo(
+              currentCommand,
+              operation,
+              operationBuffer,
+              nsInfo,
+              nsInfoBuffer
+            );
+        }
+        // We've added a new namespace, increment the namespace index.
         currentNamespaceIndex++;
       }
     }
 
-    const nsInfo = Array.from(namespaces.keys(), ns => ({ ns }));
+    // After we've finished iterating all the models, push the last current command,
+    // but only if it contains operations.
+    if (currentCommand.ops.documents.length > 0) {
+      commands.push(currentCommand);
+    }
+
+    return commands;
+  }
+
+  private addOperation(
+    command: ClientBulkWriteCommand,
+    operation: Document,
+    operationBuffer: Uint8Array
+  ): number {
+    // Pushing to the ops document sequence returns the bytes length added.
+    return command.ops.push(operation, operationBuffer);
+  }
+
+  private addOperationAndNsInfo(
+    command: ClientBulkWriteCommand,
+    operation: Document,
+    operationBuffer: Uint8Array,
+    nsInfo: Document,
+    nsInfoBuffer: Uint8Array
+  ): number {
+    // Pushing to the nsInfo document sequence returns the bytes length added.
+    const nsInfoLength = command.nsInfo.push(nsInfo, nsInfoBuffer);
+    const opsLength = this.addOperation(command, operation, operationBuffer);
+    return nsInfoLength + opsLength;
+  }
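The net effect of `buildCommands` is greedy batching: an operation is appended to the current command until the running length would reach `maxMessageSizeBytes` or the op count would reach `maxWriteBatchSize`, at which point the command is flushed and a fresh one is started. A small sketch against the builder's internal API (the limits are artificially small to force a split):

```ts
import { ClientBulkWriteCommandBuilder } from './command_builder';
import type { AnyClientBulkWriteModel } from './common';

const models: AnyClientBulkWriteModel[] = Array.from({ length: 5 }, (_, i) => ({
  name: 'insertOne',
  namespace: 'db.coll',
  document: { i }
}));
const builder = new ClientBulkWriteCommandBuilder(models, {});
// With maxWriteBatchSize = 3, five inserts split into two commands: 3 ops + 2 ops.
const commands = builder.buildCommands(48_000_000, 3);
console.log(commands.map(c => c.ops.documents.length)); // [3, 2]
```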
 
-    // The base command.
+  private baseCommand(): ClientBulkWriteCommand {
     const command: ClientBulkWriteCommand = {
       bulkWrite: 1,
       errorsOnly: this.errorsOnly,
       ordered: this.options.ordered ?? true,
-      ops: new DocumentSequence(operations),
-      nsInfo: new DocumentSequence(nsInfo)
+      ops: new DocumentSequence('ops'),
+      nsInfo: new DocumentSequence('nsInfo')
     };
 
     // Add bypassDocumentValidation if it was present in the options.
     if (this.options.bypassDocumentValidation != null) {
@@ -88,7 +218,14 @@ export class ClientBulkWriteCommandBuilder {
     if (this.options.let) {
       command.let = this.options.let;
     }
-    return [command];
+
+    // We check for undefined specifically here to allow falsy values.
+    // eslint-disable-next-line no-restricted-syntax
+    if (this.options.comment !== undefined) {
+      command.comment = this.options.comment;
+    }
+
+    return command;
   }
 }
 
@@ -106,12 +243,14 @@ interface ClientInsertOperation {
  */
 export const buildInsertOneOperation = (
   model: ClientInsertOneModel,
-  index: number
+  index: number,
+  pkFactory: PkFactory
 ): ClientInsertOperation => {
   const document: ClientInsertOperation = {
     insert: index,
     document: model.document
   };
+  document.document._id = model.document._id ?? pkFactory.createPk();
   return document;
 };
 
@@ -175,6 +314,7 @@ export interface ClientUpdateOperation {
   hint?: Hint;
   upsert?: boolean;
   arrayFilters?: Document[];
+  collation?: CollationOptions;
 }
 
 /**
@@ -226,6 +366,9 @@ function createUpdateOperation(
   if (model.arrayFilters) {
     document.arrayFilters = model.arrayFilters;
   }
+  if (model.collation) {
+    document.collation = model.collation;
+  }
   return document;
 }
 
@@ -237,6 +380,7 @@ export interface ClientReplaceOneOperation {
   updateMods: WithoutId<Document>;
   hint?: Hint;
   upsert?: boolean;
+  collation?: CollationOptions;
 }
 
 /**
@@ -261,14 +405,21 @@ export const buildReplaceOneOperation = (
   if (model.upsert) {
     document.upsert = model.upsert;
   }
+  if (model.collation) {
+    document.collation = model.collation;
+  }
   return document;
 };
 
 /** @internal */
-export function buildOperation(model: AnyClientBulkWriteModel, index: number): Document {
+export function buildOperation(
+  model: AnyClientBulkWriteModel,
+  index: number,
+  pkFactory: PkFactory
+): Document {
   switch (model.name) {
     case 'insertOne':
-      return buildInsertOneOperation(model, index);
+      return buildInsertOneOperation(model, index, pkFactory);
     case 'deleteOne':
       return buildDeleteOneOperation(model, index);
     case 'deleteMany':
diff --git a/src/operations/client_bulk_write/common.ts b/src/operations/client_bulk_write/common.ts
index e76fb5108f..c41d971f02 100644
--- a/src/operations/client_bulk_write/common.ts
+++ b/src/operations/client_bulk_write/common.ts
@@ -144,3 +144,82 @@ export type AnyClientBulkWriteModel =
   | ClientUpdateManyModel
   | ClientDeleteOneModel
   | ClientDeleteManyModel;
+
+/** @public */
+export interface ClientBulkWriteResult {
+  /**
+   * The total number of documents inserted across all insert operations.
+   */
+  insertedCount: number;
+  /**
+   * The total number of documents upserted across all update operations.
+   */
+  upsertedCount: number;
+  /**
+   * The total number of documents matched across all update operations.
+   */
+  matchedCount: number;
+  /**
+   * The total number of documents modified across all update operations.
+   */
+  modifiedCount: number;
+  /**
+   * The total number of documents deleted across all delete operations.
+   */
+  deletedCount: number;
+  /**
+   * The results of each individual insert operation that was successfully performed.
+   */
+  insertResults?: Map<number, ClientInsertOneResult>;
+  /**
+   * The results of each individual update operation that was successfully performed.
+   */
+  updateResults?: Map<number, ClientUpdateResult>;
+  /**
+   * The results of each individual delete operation that was successfully performed.
+   */
+  deleteResults?: Map<number, ClientDeleteResult>;
+}
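Note that when `verboseResults` is requested, the three maps are keyed by the index of the model in the caller's original array, even when the write was executed in a later batch. A hypothetical read of the result (continuing the `client` and `models` from the earlier usage sketch):

```ts
const result = await client.bulkWrite(models, { verboseResults: true });
if ('updateResults' in result) {
  // Keys are indexes into the caller's original models array.
  const update = result.updateResults?.get(1);
  if (update?.didUpsert) {
    console.log('model 1 upserted _id:', update.upsertedId);
  }
  const deleted = result.deleteResults?.get(2)?.deletedCount ?? 0;
  console.log('model 2 deleted', deleted, 'documents');
}
```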
+
+/** @public */
+export interface ClientInsertOneResult {
+  /**
+   * The _id of the inserted document.
+   */
+  insertedId: any;
+}
+
+/** @public */
+export interface ClientUpdateResult {
+  /**
+   * The number of documents that matched the filter.
+   */
+  matchedCount: number;
+
+  /**
+   * The number of documents that were modified.
+   */
+  modifiedCount: number;
+
+  /**
+   * The _id field of the upserted document if an upsert occurred.
+   *
+   * It MUST be possible to discern between a BSON Null upserted ID value and this field being
+   * unset. If necessary, drivers MAY add a didUpsert boolean field to differentiate between
+   * these two cases.
+   */
+  upsertedId?: any;
+
+  /**
+   * Determines if the upsert included an _id, which covers the case of the _id being null.
+   */
+  didUpsert: boolean;
+}
+
+/** @public */
+export interface ClientDeleteResult {
+  /**
+   * The number of documents that were deleted.
+   */
+  deletedCount: number;
+}
diff --git a/src/operations/client_bulk_write/executor.ts b/src/operations/client_bulk_write/executor.ts
new file mode 100644
index 0000000000..1c02a42add
--- /dev/null
+++ b/src/operations/client_bulk_write/executor.ts
@@ -0,0 +1,121 @@
+import { type Document } from 'bson';
+
+import { ClientBulkWriteCursor } from '../../cursor/client_bulk_write_cursor';
+import { MongoClientBulkWriteExecutionError } from '../../error';
+import { type MongoClient } from '../../mongo_client';
+import { WriteConcern } from '../../write_concern';
+import { executeOperation } from '../execute_operation';
+import { ClientBulkWriteOperation } from './client_bulk_write';
+import { type ClientBulkWriteCommand, ClientBulkWriteCommandBuilder } from './command_builder';
+import {
+  type AnyClientBulkWriteModel,
+  type ClientBulkWriteOptions,
+  type ClientBulkWriteResult
+} from './common';
+import { ClientBulkWriteResultsMerger } from './results_merger';
+
+/**
+ * Responsible for executing a client bulk write.
+ * @internal
+ */
+export class ClientBulkWriteExecutor {
+  client: MongoClient;
+  options: ClientBulkWriteOptions;
+  operations: AnyClientBulkWriteModel[];
+
+  /**
+   * Instantiate the executor.
+   * @param client - The mongo client.
+   * @param operations - The user supplied bulk write models.
+   * @param options - The bulk write options.
+   */
+  constructor(
+    client: MongoClient,
+    operations: AnyClientBulkWriteModel[],
+    options?: ClientBulkWriteOptions
+  ) {
+    this.client = client;
+    this.operations = operations;
+    this.options = { ...options };
+
+    // If no write concern was provided, we inherit one from the client.
+    if (!this.options.writeConcern) {
+      this.options.writeConcern = WriteConcern.fromOptions(this.client.options);
+    }
+  }
+
+  /**
+   * Execute the client bulk write. Will split the models into batched commands,
+   * exhaust a cursor for each, then merge the results into one.
+   * @returns The result.
+   */
+  async execute(): Promise<ClientBulkWriteResult | { ok: 1 }> {
+    const topologyDescription = this.client.topology?.description;
+    const maxMessageSizeBytes = topologyDescription?.maxMessageSizeBytes;
+    const maxWriteBatchSize = topologyDescription?.maxWriteBatchSize;
+    // If we don't know the maxMessageSizeBytes or for some reason it's 0
+    // then we cannot calculate the batches.
+    if (!maxMessageSizeBytes) {
+      throw new MongoClientBulkWriteExecutionError(
+        'No maxMessageSizeBytes value found - client bulk writes cannot execute without this value set from the monitoring connections.'
+      );
+    }
+
+    if (!maxWriteBatchSize) {
+      throw new MongoClientBulkWriteExecutionError(
+        'No maxWriteBatchSize value found - client bulk writes cannot execute without this value set from the monitoring connections.'
+      );
+    }
+
+    // The command builder will take the user provided models and potentially split
+    // the batch into multiple commands due to size.
+    const pkFactory = this.client.s.options.pkFactory;
+    const commandBuilder = new ClientBulkWriteCommandBuilder(
+      this.operations,
+      this.options,
+      pkFactory
+    );
+    const commands = commandBuilder.buildCommands(maxMessageSizeBytes, maxWriteBatchSize);
+    if (this.options.writeConcern?.w === 0) {
+      return await executeUnacknowledged(this.client, this.options, commands);
+    }
+    return await executeAcknowledged(this.client, this.options, commands);
+  }
+}
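One consequence of the write concern branch in `execute`: with `w: 0` the driver sends each command and returns `{ ok: 1 }` without reading any cursor results. A sketch (again assuming the `client` and `models` from the earlier usage example):

```ts
const unacked = await client.bulkWrite(models, {
  writeConcern: { w: 0 },
  ordered: false
});
console.log(unacked); // { ok: 1 }, no counts for unacknowledged writes
```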
+ ); + } + + // The command builder will take the user provided models and potential split the batch + // into multiple commands due to size. + const pkFactory = this.client.s.options.pkFactory; + const commandBuilder = new ClientBulkWriteCommandBuilder( + this.operations, + this.options, + pkFactory + ); + const commands = commandBuilder.buildCommands(maxMessageSizeBytes, maxWriteBatchSize); + if (this.options.writeConcern?.w === 0) { + return await executeUnacknowledged(this.client, this.options, commands); + } + return await executeAcknowledged(this.client, this.options, commands); + } +} + +/** + * Execute an acknowledged bulk write. + */ +async function executeAcknowledged( + client: MongoClient, + options: ClientBulkWriteOptions, + commands: ClientBulkWriteCommand[] +): Promise { + const resultsMerger = new ClientBulkWriteResultsMerger(options); + // For each command will will create and exhaust a cursor for the results. + let currentBatchOffset = 0; + for (const command of commands) { + const cursor = new ClientBulkWriteCursor(client, command, options); + const docs = await cursor.toArray(); + const operations = command.ops.documents; + resultsMerger.merge(currentBatchOffset, operations, cursor.response, docs); + // Set the new batch index so we can back back to the index in the original models. + currentBatchOffset += operations.length; + } + return resultsMerger.result; +} + +/** + * Execute an unacknowledged bulk write. + */ +async function executeUnacknowledged( + client: MongoClient, + options: ClientBulkWriteOptions, + commands: Document[] +): Promise<{ ok: 1 }> { + for (const command of commands) { + const operation = new ClientBulkWriteOperation(command, options); + await executeOperation(client, operation); + } + return { ok: 1 }; +} diff --git a/src/operations/client_bulk_write/results_merger.ts b/src/operations/client_bulk_write/results_merger.ts new file mode 100644 index 0000000000..ca5f3f1604 --- /dev/null +++ b/src/operations/client_bulk_write/results_merger.ts @@ -0,0 +1,101 @@ +import { type Document } from '../../bson'; +import { type ClientBulkWriteCursorResponse } from '../../cmap/wire_protocol/responses'; +import { + type ClientBulkWriteOptions, + type ClientBulkWriteResult, + type ClientDeleteResult, + type ClientInsertOneResult, + type ClientUpdateResult +} from './common'; + +/** + * Merges client bulk write cursor responses together into a single result. + * @internal + */ +export class ClientBulkWriteResultsMerger { + result: ClientBulkWriteResult; + options: ClientBulkWriteOptions; + + /** + * Instantiate the merger. + * @param options - The options. + */ + constructor(options: ClientBulkWriteOptions) { + this.options = options; + this.result = { + insertedCount: 0, + upsertedCount: 0, + matchedCount: 0, + modifiedCount: 0, + deletedCount: 0, + insertResults: undefined, + updateResults: undefined, + deleteResults: undefined + }; + + if (options.verboseResults) { + this.result.insertResults = new Map(); + this.result.updateResults = new Map(); + this.result.deleteResults = new Map(); + } + } + + /** + * Merge the results in the cursor to the existing result. + * @param currentBatchOffset - The offset index to the original models. + * @param response - The cursor response. + * @param documents - The documents in the cursor. + * @returns The current result. 
+
+  /**
+   * Merge the results in the cursor to the existing result.
+   * @param currentBatchOffset - The offset index to the original models.
+   * @param operations - The operations that were sent in the command.
+   * @param response - The cursor response.
+   * @param documents - The documents in the cursor.
+   * @returns The current result.
+   */
+  merge(
+    currentBatchOffset: number,
+    operations: Document[],
+    response: ClientBulkWriteCursorResponse,
+    documents: Document[]
+  ): ClientBulkWriteResult {
+    // Update the counts from the cursor response.
+    this.result.insertedCount += response.insertedCount;
+    this.result.upsertedCount += response.upsertedCount;
+    this.result.matchedCount += response.matchedCount;
+    this.result.modifiedCount += response.modifiedCount;
+    this.result.deletedCount += response.deletedCount;
+
+    if (this.options.verboseResults) {
+      // Iterate all the documents in the cursor and update the result.
+      for (const document of documents) {
+        // Only add to maps if ok: 1
+        if (document.ok === 1) {
+          // Get the corresponding operation from the command.
+          const operation = operations[document.idx];
+          // Handle insert results.
+          if ('insert' in operation) {
+            this.result.insertResults?.set(document.idx + currentBatchOffset, {
+              insertedId: operation.document._id
+            });
+          }
+          // Handle update results.
+          if ('update' in operation) {
+            const result: ClientUpdateResult = {
+              matchedCount: document.n,
+              modifiedCount: document.nModified ?? 0,
+              // Check if the bulk did actually upsert.
+              didUpsert: document.upserted != null
+            };
+            if (document.upserted) {
+              result.upsertedId = document.upserted._id;
+            }
+            this.result.updateResults?.set(document.idx + currentBatchOffset, result);
+          }
+          // Handle delete results.
+          if ('delete' in operation) {
+            this.result.deleteResults?.set(document.idx + currentBatchOffset, {
+              deletedCount: document.n
+            });
+          }
+        }
+      }
+    }
+
+    return this.result;
+  }
+}
diff --git a/src/sdam/server_description.ts b/src/sdam/server_description.ts
index cd32f4968b..320a43bc16 100644
--- a/src/sdam/server_description.ts
+++ b/src/sdam/server_description.ts
@@ -18,6 +18,12 @@ const DATA_BEARING_SERVER_TYPES = new Set<ServerType>([
   ServerType.LoadBalancer
 ]);
 
+/** Default max message size in case we are in load balanced mode. */
+const MAX_MESSAGE_SIZE_BYTES = 48000000;
+
+/** Default max write batch size in case we are in load balanced mode. */
+const MAX_WRITE_BATCH_SIZE = 100000;
+
 /** @public */
 export interface TopologyVersion {
   processId: ObjectId;
@@ -69,6 +75,10 @@ export class ServerDescription {
   setVersion: number | null;
   electionId: ObjectId | null;
   logicalSessionTimeoutMinutes: number | null;
+  /** The max message size in bytes for the server. */
+  maxMessageSizeBytes: number;
+  /** The max number of writes in a bulk write command. */
+  maxWriteBatchSize: number;
 
   // NOTE: does this belong here? It seems we should gossip the cluster time at the CMAP level
   $clusterTime?: ClusterTime;
@@ -111,6 +121,8 @@ export class ServerDescription {
     this.setVersion = hello?.setVersion ?? null;
     this.electionId = hello?.electionId ?? null;
     this.logicalSessionTimeoutMinutes = hello?.logicalSessionTimeoutMinutes ?? null;
+    this.maxMessageSizeBytes = hello?.maxMessageSizeBytes ?? MAX_MESSAGE_SIZE_BYTES;
+    this.maxWriteBatchSize = hello?.maxWriteBatchSize ?? MAX_WRITE_BATCH_SIZE;
     this.primary = hello?.primary ?? null;
     this.me = hello?.me?.toLowerCase() ?? null;
     this.$clusterTime = hello?.$clusterTime ??
null; diff --git a/src/sdam/topology_description.ts b/src/sdam/topology_description.ts index 436321c7f1..3f646975f2 100644 --- a/src/sdam/topology_description.ts +++ b/src/sdam/topology_description.ts @@ -43,6 +43,8 @@ export class TopologyDescription { heartbeatFrequencyMS: number; localThresholdMS: number; commonWireVersion: number; + maxMessageSizeBytes?: number; + maxWriteBatchSize?: number; /** * Create a TopologyDescription @@ -71,6 +73,25 @@ export class TopologyDescription { // determine server compatibility for (const serverDescription of this.servers.values()) { + // Find the lowest maxMessageSizeBytes from all the servers. + if (this.maxMessageSizeBytes == null) { + this.maxMessageSizeBytes = serverDescription.maxMessageSizeBytes; + } else { + this.maxMessageSizeBytes = Math.min( + this.maxMessageSizeBytes, + serverDescription.maxMessageSizeBytes + ); + } + + // Find the lowest maxWriteBatchSize from all the servers. + if (this.maxWriteBatchSize == null) { + this.maxWriteBatchSize = serverDescription.maxWriteBatchSize; + } else { + this.maxWriteBatchSize = Math.min( + this.maxWriteBatchSize, + serverDescription.maxWriteBatchSize + ); + } // Load balancer mode is always compatible. if ( serverDescription.type === ServerType.Unknown || diff --git a/test/integration/crud/crud.prose.test.ts b/test/integration/crud/crud.prose.test.ts index 3ddc126d33..3a64aa083f 100644 --- a/test/integration/crud/crud.prose.test.ts +++ b/test/integration/crud/crud.prose.test.ts @@ -3,6 +3,7 @@ import { once } from 'events'; import { type CommandStartedEvent } from '../../../mongodb'; import { + type AnyClientBulkWriteModel, type Collection, MongoBulkWriteError, type MongoClient, @@ -151,6 +152,134 @@ describe('CRUD Prose Spec Tests', () => { }); }); + describe('3. MongoClient.bulkWrite batch splits a writeModels input with greater than maxWriteBatchSize operations', function () { + // Test that MongoClient.bulkWrite properly handles writeModels inputs containing a number of writes greater than + // maxWriteBatchSize. + // This test must only be run on 8.0+ servers. This test must be skipped on Atlas Serverless. + // Construct a MongoClient (referred to as client) with command monitoring enabled to observe CommandStartedEvents. + // Perform a hello command using client and record the maxWriteBatchSize value contained in the response. Then, + // construct the following write model (referred to as model): + // InsertOne: { + // "namespace": "db.coll", + // "document": { "a": "b" } + // } + // Construct a list of write models (referred to as models) with model repeated maxWriteBatchSize + 1 times. Execute + // bulkWrite on client with models. Assert that the bulk write succeeds and returns a BulkWriteResult with an + // insertedCount value of maxWriteBatchSize + 1. + // Assert that two CommandStartedEvents (referred to as firstEvent and secondEvent) were observed for the bulkWrite + // command. Assert that the length of firstEvent.command.ops is maxWriteBatchSize. Assert that the length of + // secondEvent.command.ops is 1. If the driver exposes operationIds in its CommandStartedEvents, assert that + // firstEvent.operationId is equal to secondEvent.operationId. 
+ let client: MongoClient; + let maxWriteBatchSize; + const models: AnyClientBulkWriteModel[] = []; + const commands: CommandStartedEvent[] = []; + + beforeEach(async function () { + client = this.configuration.newClient({}, { monitorCommands: true }); + await client.connect(); + const hello = await client.db('admin').command({ hello: 1 }); + maxWriteBatchSize = hello.maxWriteBatchSize; + + client.on('commandStarted', filterForCommands('bulkWrite', commands)); + commands.length = 0; + + Array.from({ length: maxWriteBatchSize + 1 }, () => { + models.push({ + namespace: 'db.coll', + name: 'insertOne', + document: { a: 'b' } + }); + }); + }); + + afterEach(async function () { + await client.close(); + }); + + it('splits the commands into 2 operations', { + metadata: { requires: { mongodb: '>=8.0.0' } }, + async test() { + const result = await client.bulkWrite(models); + expect(result.insertedCount).to.equal(maxWriteBatchSize + 1); + expect(commands.length).to.equal(2); + expect(commands[0].command.ops.length).to.equal(maxWriteBatchSize); + expect(commands[1].command.ops.length).to.equal(1); + } + }); + }); + + describe('4. MongoClient.bulkWrite batch splits when an ops payload exceeds maxMessageSizeBytes', function () { + // Test that MongoClient.bulkWrite properly handles a writeModels input which constructs an ops array larger + // than maxMessageSizeBytes. + // This test must only be run on 8.0+ servers. This test must be skipped on Atlas Serverless. + // Construct a MongoClient (referred to as client) with command monitoring enabled to observe CommandStartedEvents. + // Perform a hello command using client and record the following values from the response: maxBsonObjectSize + // and maxMessageSizeBytes. Then, construct the following document (referred to as document): + // { + // "a": "b".repeat(maxBsonObjectSize - 500) + // } + // Construct the following write model (referred to as model): + // InsertOne: { + // "namespace": "db.coll", + // "document": document + // } + // Use the following calculation to determine the number of inserts that should be provided to + // MongoClient.bulkWrite: maxMessageSizeBytes / maxBsonObjectSize + 1 (referred to as numModels). This number + // ensures that the inserts provided to MongoClient.bulkWrite will require multiple bulkWrite commands to be + // sent to the server. + // Construct as list of write models (referred to as models) with model repeated numModels times. Then execute + // bulkWrite on client with models. Assert that the bulk write succeeds and returns a BulkWriteResult with + // an insertedCount value of numModels. + // Assert that two CommandStartedEvents (referred to as firstEvent and secondEvent) were observed. Assert + // that the length of firstEvent.command.ops is numModels - 1. Assert that the length of secondEvent.command.ops + // is 1. If the driver exposes operationIds in its CommandStartedEvents, assert that firstEvent.operationId is + // equal to secondEvent.operationId. 
+ let client: MongoClient; + let maxBsonObjectSize; + let maxMessageSizeBytes; + let numModels; + const models: AnyClientBulkWriteModel[] = []; + const commands: CommandStartedEvent[] = []; + + beforeEach(async function () { + client = this.configuration.newClient({}, { monitorCommands: true }); + await client.connect(); + const hello = await client.db('admin').command({ hello: 1 }); + maxBsonObjectSize = hello.maxBsonObjectSize; + maxMessageSizeBytes = hello.maxMessageSizeBytes; + numModels = Math.floor(maxMessageSizeBytes / maxBsonObjectSize + 1); + + client.on('commandStarted', filterForCommands('bulkWrite', commands)); + commands.length = 0; + + Array.from({ length: numModels }, () => { + models.push({ + name: 'insertOne', + namespace: 'db.coll', + document: { + a: 'b'.repeat(maxBsonObjectSize - 500) + } + }); + }); + }); + + afterEach(async function () { + await client.close(); + }); + + it('splits the commands into 2 operations', { + metadata: { requires: { mongodb: '>=8.0.0' } }, + async test() { + const result = await client.bulkWrite(models, { verboseResults: true }); + expect(result.insertedCount).to.equal(numModels); + expect(commands.length).to.equal(2); + expect(commands[0].command.ops.length).to.equal(numModels - 1); + expect(commands[1].command.ops.length).to.equal(1); + } + }); + }); + describe('14. `explain` helpers allow users to specify `maxTimeMS`', function () { let client: MongoClient; const commands: CommandStartedEvent[] = []; diff --git a/test/integration/crud/crud.spec.test.ts b/test/integration/crud/crud.spec.test.ts index 912fbcb1ef..a8a0d2987f 100644 --- a/test/integration/crud/crud.spec.test.ts +++ b/test/integration/crud/crud.spec.test.ts @@ -5,8 +5,6 @@ import { runUnifiedSuite } from '../../tools/unified-spec-runner/runner'; const clientBulkWriteTests = new RegExp( [ - 'client bulk write delete with collation', - 'client bulk write delete with hint', 'client bulkWrite operations support errorResponse assertions', 'an individual operation fails during an ordered bulkWrite', 'an individual operation fails during an unordered bulkWrite', @@ -15,7 +13,9 @@ const clientBulkWriteTests = new RegExp( 'a bulk write with only errors does not report a partial result', 'an empty list of write models is a client-side error', 'a write concern error occurs during a bulkWrite', - 'client bulkWrite' + 'client bulkWrite replaceOne prohibits atomic modifiers', + 'client bulkWrite updateOne requires atomic modifiers', + 'client bulkWrite updateMany requires atomic modifiers' ].join('|') ); diff --git a/test/integration/transactions/transactions.spec.test.ts b/test/integration/transactions/transactions.spec.test.ts index 22dd4ddd80..ab6f84b1e9 100644 --- a/test/integration/transactions/transactions.spec.test.ts +++ b/test/integration/transactions/transactions.spec.test.ts @@ -9,7 +9,8 @@ const SKIPPED_TESTS = [ 'defaultTransactionOptions override client options', 'transaction options inherited from defaultTransactionOptions', 'transaction options inherited from client', - 'causal consistency disabled' + 'causal consistency disabled', + 'client bulkWrite with writeConcern in a transaction causes a transaction error' ]; describe('Transactions Spec Unified Tests', function () { diff --git a/test/mongodb.ts b/test/mongodb.ts index 8aeb97b592..3503412304 100644 --- a/test/mongodb.ts +++ b/test/mongodb.ts @@ -158,6 +158,7 @@ export * from '../src/operations/aggregate'; export * from '../src/operations/bulk_write'; export * from '../src/operations/client_bulk_write/command_builder'; 
export * from '../src/operations/client_bulk_write/common'; +export * from '../src/operations/client_bulk_write/results_merger'; export * from '../src/operations/collections'; export * from '../src/operations/command'; export * from '../src/operations/count'; diff --git a/test/spec/command-logging-and-monitoring/monitoring/unacknowledged-client-bulkWrite.json b/test/spec/command-logging-and-monitoring/monitoring/unacknowledged-client-bulkWrite.json new file mode 100644 index 0000000000..1099b6a1e9 --- /dev/null +++ b/test/spec/command-logging-and-monitoring/monitoring/unacknowledged-client-bulkWrite.json @@ -0,0 +1,218 @@ +{ + "description": "unacknowledged-client-bulkWrite", + "schemaVersion": "1.7", + "runOnRequirements": [ + { + "minServerVersion": "8.0" + } + ], + "createEntities": [ + { + "client": { + "id": "client", + "useMultipleMongoses": false, + "observeEvents": [ + "commandStartedEvent", + "commandSucceededEvent", + "commandFailedEvent" + ], + "uriOptions": { + "w": 0 + } + } + }, + { + "database": { + "id": "database", + "client": "client", + "databaseName": "command-monitoring-tests" + } + }, + { + "collection": { + "id": "collection", + "database": "database", + "collectionName": "test" + } + } + ], + "initialData": [ + { + "collectionName": "test", + "databaseName": "command-monitoring-tests", + "documents": [ + { + "_id": 1, + "x": 11 + }, + { + "_id": 2, + "x": 22 + }, + { + "_id": 3, + "x": 33 + } + ] + } + ], + "_yamlAnchors": { + "namespace": "command-monitoring-tests.test" + }, + "tests": [ + { + "description": "A successful mixed client bulkWrite", + "operations": [ + { + "object": "client", + "name": "clientBulkWrite", + "arguments": { + "models": [ + { + "insertOne": { + "namespace": "command-monitoring-tests.test", + "document": { + "_id": 4, + "x": 44 + } + } + }, + { + "updateOne": { + "namespace": "command-monitoring-tests.test", + "filter": { + "_id": 3 + }, + "update": { + "$set": { + "x": 333 + } + } + } + } + ] + }, + "expectResult": { + "insertedCount": { + "$$unsetOrMatches": 0 + }, + "upsertedCount": { + "$$unsetOrMatches": 0 + }, + "matchedCount": { + "$$unsetOrMatches": 0 + }, + "modifiedCount": { + "$$unsetOrMatches": 0 + }, + "deletedCount": { + "$$unsetOrMatches": 0 + }, + "insertResults": { + "$$unsetOrMatches": {} + }, + "updateResults": { + "$$unsetOrMatches": {} + }, + "deleteResults": { + "$$unsetOrMatches": {} + } + } + }, + { + "object": "collection", + "name": "find", + "arguments": { + "filter": {} + }, + "expectResult": [ + { + "_id": 1, + "x": 11 + }, + { + "_id": 2, + "x": 22 + }, + { + "_id": 3, + "x": 333 + }, + { + "_id": 4, + "x": 44 + } + ] + } + ], + "expectEvents": [ + { + "client": "client", + "ignoreExtraEvents": true, + "events": [ + { + "commandStartedEvent": { + "commandName": "bulkWrite", + "databaseName": "admin", + "command": { + "bulkWrite": 1, + "errorsOnly": true, + "ordered": true, + "ops": [ + { + "insert": 0, + "document": { + "_id": 4, + "x": 44 + } + }, + { + "update": 0, + "filter": { + "_id": 3 + }, + "updateMods": { + "$set": { + "x": 333 + } + }, + "multi": false + } + ], + "nsInfo": [ + { + "ns": "command-monitoring-tests.test" + } + ] + } + } + }, + { + "commandSucceededEvent": { + "commandName": "bulkWrite", + "reply": { + "ok": 1, + "nInserted": { + "$$exists": false + }, + "nMatched": { + "$$exists": false + }, + "nModified": { + "$$exists": false + }, + "nUpserted": { + "$$exists": false + }, + "nDeleted": { + "$$exists": false + } + } + } + } + ] + } + ] + } + ] +} diff --git 
a/test/spec/command-logging-and-monitoring/monitoring/unacknowledged-client-bulkWrite.yml b/test/spec/command-logging-and-monitoring/monitoring/unacknowledged-client-bulkWrite.yml new file mode 100644 index 0000000000..fcc6b7b3ec --- /dev/null +++ b/test/spec/command-logging-and-monitoring/monitoring/unacknowledged-client-bulkWrite.yml @@ -0,0 +1,109 @@ +description: "unacknowledged-client-bulkWrite" + +schemaVersion: "1.7" + +runOnRequirements: + - minServerVersion: "8.0" + +createEntities: + - client: + id: &client client + useMultipleMongoses: false + observeEvents: + - commandStartedEvent + - commandSucceededEvent + - commandFailedEvent + uriOptions: + w: 0 + - database: + id: &database database + client: *client + databaseName: &databaseName command-monitoring-tests + - collection: + id: &collection collection + database: *database + collectionName: &collectionName test + +initialData: + - collectionName: *collectionName + databaseName: *databaseName + documents: + - { _id: 1, x: 11 } + - { _id: 2, x: 22 } + - { _id: 3, x: 33 } + +_yamlAnchors: + namespace: &namespace "command-monitoring-tests.test" + +tests: + - description: 'A successful mixed client bulkWrite' + operations: + - object: *client + name: clientBulkWrite + arguments: + models: + - insertOne: + namespace: *namespace + document: { _id: 4, x: 44 } + - updateOne: + namespace: *namespace + filter: { _id: 3 } + update: { $set: { x: 333 } } + expectResult: + insertedCount: + $$unsetOrMatches: 0 + upsertedCount: + $$unsetOrMatches: 0 + matchedCount: + $$unsetOrMatches: 0 + modifiedCount: + $$unsetOrMatches: 0 + deletedCount: + $$unsetOrMatches: 0 + insertResults: + $$unsetOrMatches: {} + updateResults: + $$unsetOrMatches: {} + deleteResults: + $$unsetOrMatches: {} + # Force completion of the w:0 write by executing a find on the same connection + - object: *collection + name: find + arguments: + filter: {} + expectResult: + - { _id: 1, x: 11 } + - { _id: 2, x: 22 } + - { _id: 3, x: 333 } + - { _id: 4, x: 44 } + + expectEvents: + - + client: *client + ignoreExtraEvents: true + events: + - commandStartedEvent: + commandName: bulkWrite + databaseName: admin + command: + bulkWrite: 1 + errorsOnly: true + ordered: true + ops: + - insert: 0 + document: { _id: 4, x: 44 } + - update: 0 + filter: { _id: 3 } + updateMods: { $set: { x: 333 } } + multi: false + nsInfo: + - ns: *namespace + - commandSucceededEvent: + commandName: bulkWrite + reply: + ok: 1 + nInserted: { $$exists: false } + nMatched: { $$exists: false } + nModified: { $$exists: false } + nUpserted: { $$exists: false } + nDeleted: { $$exists: false } diff --git a/test/spec/crud/unified/client-bulkWrite-delete-options.json b/test/spec/crud/unified/client-bulkWrite-delete-options.json index 5bdf2b124a..d9987897dc 100644 --- a/test/spec/crud/unified/client-bulkWrite-delete-options.json +++ b/test/spec/crud/unified/client-bulkWrite-delete-options.json @@ -1,9 +1,10 @@ { "description": "client bulkWrite delete options", - "schemaVersion": "1.1", + "schemaVersion": "1.4", "runOnRequirements": [ { - "minServerVersion": "8.0" + "minServerVersion": "8.0", + "serverless": "forbid" } ], "createEntities": [ diff --git a/test/spec/crud/unified/client-bulkWrite-delete-options.yml b/test/spec/crud/unified/client-bulkWrite-delete-options.yml index db8b9f46d7..9297838535 100644 --- a/test/spec/crud/unified/client-bulkWrite-delete-options.yml +++ b/test/spec/crud/unified/client-bulkWrite-delete-options.yml @@ -1,7 +1,8 @@ description: "client bulkWrite delete options" -schemaVersion: "1.1" 
+schemaVersion: "1.4" # To support `serverless: forbid` runOnRequirements: - minServerVersion: "8.0" + serverless: forbid createEntities: - client: diff --git a/test/spec/crud/unified/client-bulkWrite-errorResponse.json b/test/spec/crud/unified/client-bulkWrite-errorResponse.json index edf2339d8a..b828aad3b9 100644 --- a/test/spec/crud/unified/client-bulkWrite-errorResponse.json +++ b/test/spec/crud/unified/client-bulkWrite-errorResponse.json @@ -3,7 +3,8 @@ "schemaVersion": "1.12", "runOnRequirements": [ { - "minServerVersion": "8.0" + "minServerVersion": "8.0", + "serverless": "forbid" } ], "createEntities": [ diff --git a/test/spec/crud/unified/client-bulkWrite-errorResponse.yml b/test/spec/crud/unified/client-bulkWrite-errorResponse.yml index 45e53171ec..d63010afc7 100644 --- a/test/spec/crud/unified/client-bulkWrite-errorResponse.yml +++ b/test/spec/crud/unified/client-bulkWrite-errorResponse.yml @@ -2,6 +2,7 @@ description: "client bulkWrite errorResponse" schemaVersion: "1.12" runOnRequirements: - minServerVersion: "8.0" + serverless: forbid createEntities: - client: diff --git a/test/spec/crud/unified/client-bulkWrite-errors.json b/test/spec/crud/unified/client-bulkWrite-errors.json index 9f17f85331..8cc45bb5f2 100644 --- a/test/spec/crud/unified/client-bulkWrite-errors.json +++ b/test/spec/crud/unified/client-bulkWrite-errors.json @@ -3,7 +3,8 @@ "schemaVersion": "1.21", "runOnRequirements": [ { - "minServerVersion": "8.0" + "minServerVersion": "8.0", + "serverless": "forbid" } ], "createEntities": [ diff --git a/test/spec/crud/unified/client-bulkWrite-errors.yml b/test/spec/crud/unified/client-bulkWrite-errors.yml index 3a420f1429..6c513006ce 100644 --- a/test/spec/crud/unified/client-bulkWrite-errors.yml +++ b/test/spec/crud/unified/client-bulkWrite-errors.yml @@ -2,6 +2,7 @@ description: "client bulkWrite errors" schemaVersion: "1.21" runOnRequirements: - minServerVersion: "8.0" + serverless: forbid createEntities: - client: diff --git a/test/spec/crud/unified/client-bulkWrite-mixed-namespaces.json b/test/spec/crud/unified/client-bulkWrite-mixed-namespaces.json index f90755dc85..55f0618923 100644 --- a/test/spec/crud/unified/client-bulkWrite-mixed-namespaces.json +++ b/test/spec/crud/unified/client-bulkWrite-mixed-namespaces.json @@ -1,9 +1,10 @@ { "description": "client bulkWrite with mixed namespaces", - "schemaVersion": "1.1", + "schemaVersion": "1.4", "runOnRequirements": [ { - "minServerVersion": "8.0" + "minServerVersion": "8.0", + "serverless": "forbid" } ], "createEntities": [ diff --git a/test/spec/crud/unified/client-bulkWrite-mixed-namespaces.yml b/test/spec/crud/unified/client-bulkWrite-mixed-namespaces.yml index 4e4cb01e16..9788bce8c5 100644 --- a/test/spec/crud/unified/client-bulkWrite-mixed-namespaces.yml +++ b/test/spec/crud/unified/client-bulkWrite-mixed-namespaces.yml @@ -1,7 +1,8 @@ description: "client bulkWrite with mixed namespaces" -schemaVersion: "1.1" +schemaVersion: "1.4" # To support `serverless: forbid` runOnRequirements: - minServerVersion: "8.0" + serverless: forbid createEntities: - client: diff --git a/test/spec/crud/unified/client-bulkWrite-options.json b/test/spec/crud/unified/client-bulkWrite-options.json index a1e6af3bf3..708fe4e85b 100644 --- a/test/spec/crud/unified/client-bulkWrite-options.json +++ b/test/spec/crud/unified/client-bulkWrite-options.json @@ -1,9 +1,10 @@ { "description": "client bulkWrite top-level options", - "schemaVersion": "1.1", + "schemaVersion": "1.4", "runOnRequirements": [ { - "minServerVersion": "8.0" + 
"minServerVersion": "8.0", + "serverless": "forbid" } ], "createEntities": [ diff --git a/test/spec/crud/unified/client-bulkWrite-options.yml b/test/spec/crud/unified/client-bulkWrite-options.yml index fdcf788799..e0cbe747b3 100644 --- a/test/spec/crud/unified/client-bulkWrite-options.yml +++ b/test/spec/crud/unified/client-bulkWrite-options.yml @@ -1,7 +1,8 @@ description: "client bulkWrite top-level options" -schemaVersion: "1.1" +schemaVersion: "1.4" # To support `serverless: forbid` runOnRequirements: - minServerVersion: "8.0" + serverless: forbid createEntities: - client: diff --git a/test/spec/crud/unified/client-bulkWrite-ordered.json b/test/spec/crud/unified/client-bulkWrite-ordered.json index a55d6619b5..6fb10d992f 100644 --- a/test/spec/crud/unified/client-bulkWrite-ordered.json +++ b/test/spec/crud/unified/client-bulkWrite-ordered.json @@ -1,9 +1,10 @@ { "description": "client bulkWrite with ordered option", - "schemaVersion": "1.1", + "schemaVersion": "1.4", "runOnRequirements": [ { - "minServerVersion": "8.0" + "minServerVersion": "8.0", + "serverless": "forbid" } ], "createEntities": [ diff --git a/test/spec/crud/unified/client-bulkWrite-ordered.yml b/test/spec/crud/unified/client-bulkWrite-ordered.yml index dc56dcb860..48aa8ad40a 100644 --- a/test/spec/crud/unified/client-bulkWrite-ordered.yml +++ b/test/spec/crud/unified/client-bulkWrite-ordered.yml @@ -1,7 +1,8 @@ description: "client bulkWrite with ordered option" -schemaVersion: "1.1" +schemaVersion: "1.4" # To support `serverless: forbid` runOnRequirements: - minServerVersion: "8.0" + serverless: forbid createEntities: - client: diff --git a/test/spec/crud/unified/client-bulkWrite-results.json b/test/spec/crud/unified/client-bulkWrite-results.json index 97a9e50b21..accf5a9cbf 100644 --- a/test/spec/crud/unified/client-bulkWrite-results.json +++ b/test/spec/crud/unified/client-bulkWrite-results.json @@ -1,9 +1,10 @@ { "description": "client bulkWrite results", - "schemaVersion": "1.1", + "schemaVersion": "1.4", "runOnRequirements": [ { - "minServerVersion": "8.0" + "minServerVersion": "8.0", + "serverless": "forbid" } ], "createEntities": [ diff --git a/test/spec/crud/unified/client-bulkWrite-results.yml b/test/spec/crud/unified/client-bulkWrite-results.yml index eb001bbb42..86cb5346ae 100644 --- a/test/spec/crud/unified/client-bulkWrite-results.yml +++ b/test/spec/crud/unified/client-bulkWrite-results.yml @@ -1,7 +1,8 @@ description: "client bulkWrite results" -schemaVersion: "1.1" +schemaVersion: "1.4" # To support `serverless: forbid` runOnRequirements: - minServerVersion: "8.0" + serverless: forbid createEntities: - client: diff --git a/test/spec/crud/unified/client-bulkWrite-update-options.json b/test/spec/crud/unified/client-bulkWrite-update-options.json index 93a2774e5f..ce6241c681 100644 --- a/test/spec/crud/unified/client-bulkWrite-update-options.json +++ b/test/spec/crud/unified/client-bulkWrite-update-options.json @@ -1,9 +1,10 @@ { "description": "client bulkWrite update options", - "schemaVersion": "1.1", + "schemaVersion": "1.4", "runOnRequirements": [ { - "minServerVersion": "8.0" + "minServerVersion": "8.0", + "serverless": "forbid" } ], "createEntities": [ diff --git a/test/spec/crud/unified/client-bulkWrite-update-options.yml b/test/spec/crud/unified/client-bulkWrite-update-options.yml index fe188a490c..c5cc20d480 100644 --- a/test/spec/crud/unified/client-bulkWrite-update-options.yml +++ b/test/spec/crud/unified/client-bulkWrite-update-options.yml @@ -1,7 +1,8 @@ description: "client bulkWrite update 
options" -schemaVersion: "1.1" +schemaVersion: "1.4" # To support `serverless: forbid` runOnRequirements: - minServerVersion: "8.0" + serverless: forbid createEntities: - client: diff --git a/test/spec/crud/unified/client-bulkWrite-update-pipeline.json b/test/spec/crud/unified/client-bulkWrite-update-pipeline.json index 57b6c9c1ba..9dba5ee6c5 100644 --- a/test/spec/crud/unified/client-bulkWrite-update-pipeline.json +++ b/test/spec/crud/unified/client-bulkWrite-update-pipeline.json @@ -1,9 +1,10 @@ { "description": "client bulkWrite update pipeline", - "schemaVersion": "1.1", + "schemaVersion": "1.4", "runOnRequirements": [ { - "minServerVersion": "8.0" + "minServerVersion": "8.0", + "serverless": "forbid" } ], "createEntities": [ diff --git a/test/spec/crud/unified/client-bulkWrite-update-pipeline.yml b/test/spec/crud/unified/client-bulkWrite-update-pipeline.yml index fe0e29a508..c90e93b47c 100644 --- a/test/spec/crud/unified/client-bulkWrite-update-pipeline.yml +++ b/test/spec/crud/unified/client-bulkWrite-update-pipeline.yml @@ -1,7 +1,8 @@ description: "client bulkWrite update pipeline" -schemaVersion: "1.1" +schemaVersion: "1.4" # To support `serverless: forbid` runOnRequirements: - minServerVersion: "8.0" + serverless: forbid createEntities: - client: diff --git a/test/spec/server-selection/logging/operation-id.json b/test/spec/server-selection/logging/operation-id.json index 276e4b8d6d..3de19abb23 100644 --- a/test/spec/server-selection/logging/operation-id.json +++ b/test/spec/server-selection/logging/operation-id.json @@ -47,6 +47,9 @@ } } ], + "_yamlAnchors": { + "namespace": "logging-tests.server-selection" + }, "tests": [ { "description": "Successful bulkWrite operation: log messages have operationIds", @@ -224,6 +227,190 @@ ] } ] + }, + { + "description": "Successful client bulkWrite operation: log messages have operationIds", + "runOnRequirements": [ + { + "minServerVersion": "8.0" + } + ], + "operations": [ + { + "name": "waitForEvent", + "object": "testRunner", + "arguments": { + "client": "client", + "event": { + "topologyDescriptionChangedEvent": {} + }, + "count": 2 + } + }, + { + "name": "clientBulkWrite", + "object": "client", + "arguments": { + "models": [ + { + "insertOne": { + "namespace": "logging-tests.server-selection", + "document": { + "x": 1 + } + } + } + ] + } + } + ], + "expectLogMessages": [ + { + "client": "client", + "messages": [ + { + "level": "debug", + "component": "serverSelection", + "data": { + "message": "Server selection started", + "operationId": { + "$$type": [ + "int", + "long" + ] + }, + "operation": "bulkWrite" + } + }, + { + "level": "debug", + "component": "serverSelection", + "data": { + "message": "Server selection succeeded", + "operationId": { + "$$type": [ + "int", + "long" + ] + }, + "operation": "bulkWrite" + } + } + ] + } + ] + }, + { + "description": "Failed client bulkWrite operation: log messages have operationIds", + "runOnRequirements": [ + { + "minServerVersion": "8.0" + } + ], + "operations": [ + { + "name": "failPoint", + "object": "testRunner", + "arguments": { + "client": "failPointClient", + "failPoint": { + "configureFailPoint": "failCommand", + "mode": "alwaysOn", + "data": { + "failCommands": [ + "hello", + "ismaster" + ], + "appName": "loggingClient", + "closeConnection": true + } + } + } + }, + { + "name": "waitForEvent", + "object": "testRunner", + "arguments": { + "client": "client", + "event": { + "serverDescriptionChangedEvent": { + "newDescription": { + "type": "Unknown" + } + } + }, + "count": 1 + } + }, + { + 
"name": "clientBulkWrite", + "object": "client", + "arguments": { + "models": [ + { + "insertOne": { + "namespace": "logging-tests.server-selection", + "document": { + "x": 1 + } + } + } + ] + }, + "expectError": { + "isClientError": true + } + } + ], + "expectLogMessages": [ + { + "client": "client", + "messages": [ + { + "level": "debug", + "component": "serverSelection", + "data": { + "message": "Server selection started", + "operationId": { + "$$type": [ + "int", + "long" + ] + }, + "operation": "bulkWrite" + } + }, + { + "level": "info", + "component": "serverSelection", + "data": { + "message": "Waiting for suitable server to become available", + "operationId": { + "$$type": [ + "int", + "long" + ] + }, + "operation": "bulkWrite" + } + }, + { + "level": "debug", + "component": "serverSelection", + "data": { + "message": "Server selection failed", + "operationId": { + "$$type": [ + "int", + "long" + ] + }, + "operation": "bulkWrite" + } + } + ] + } + ] } ] } diff --git a/test/spec/server-selection/logging/operation-id.yml b/test/spec/server-selection/logging/operation-id.yml index 430e81a58b..739030b5b1 100644 --- a/test/spec/server-selection/logging/operation-id.yml +++ b/test/spec/server-selection/logging/operation-id.yml @@ -30,6 +30,9 @@ createEntities: - client: id: &failPointClient failPointClient +_yamlAnchors: + namespace: &namespace "logging-tests.server-selection" + tests: - description: "Successful bulkWrite operation: log messages have operationIds" operations: @@ -122,3 +125,97 @@ tests: operationId: { $$type: [int, long] } operation: insert + - description: "Successful client bulkWrite operation: log messages have operationIds" + runOnRequirements: + - minServerVersion: "8.0" # required for bulkWrite command + operations: + # ensure we've discovered the server so it is immediately available + # and no extra "waiting for suitable server" messages are emitted. + # expected topology events reflect initial server discovery and server connect event. + - name: waitForEvent + object: testRunner + arguments: + client: *client + event: + topologyDescriptionChangedEvent: {} + count: 2 + - name: clientBulkWrite + object: *client + arguments: + models: + - insertOne: + namespace: *namespace + document: { x: 1 } + expectLogMessages: + - client: *client + messages: + - level: debug + component: serverSelection + data: + message: "Server selection started" + operationId: { $$type: [int, long] } + operation: bulkWrite + - level: debug + component: serverSelection + data: + message: "Server selection succeeded" + operationId: { $$type: [int, long] } + operation: bulkWrite + + - description: "Failed client bulkWrite operation: log messages have operationIds" + runOnRequirements: + - minServerVersion: "8.0" # required for bulkWrite command + operations: + # fail all hello/legacy hello commands for the main client. + - name: failPoint + object: testRunner + arguments: + client: *failPointClient + failPoint: + configureFailPoint: failCommand + mode: alwaysOn + data: + failCommands: ["hello", "ismaster"] + appName: *appName + closeConnection: true + # wait until we've marked the server unknown due + # to a failed heartbeat. 
+ - name: waitForEvent + object: testRunner + arguments: + client: *client + event: + serverDescriptionChangedEvent: + newDescription: + type: Unknown + count: 1 + - name: clientBulkWrite + object: *client + arguments: + models: + - insertOne: + namespace: *namespace + document: { x: 1 } + expectError: + isClientError: true # server selection timeout + expectLogMessages: + - client: *client + messages: + - level: debug + component: serverSelection + data: + message: "Server selection started" + operationId: { $$type: [int, long] } + operation: bulkWrite + - level: info + component: serverSelection + data: + message: "Waiting for suitable server to become available" + operationId: { $$type: [int, long] } + operation: bulkWrite + - level: debug + component: serverSelection + data: + message: "Server selection failed" + operationId: { $$type: [int, long] } + operation: bulkWrite diff --git a/test/spec/transactions/unified/client-bulkWrite.json b/test/spec/transactions/unified/client-bulkWrite.json new file mode 100644 index 0000000000..4a8d013f8d --- /dev/null +++ b/test/spec/transactions/unified/client-bulkWrite.json @@ -0,0 +1,593 @@ +{ + "description": "client bulkWrite transactions", + "schemaVersion": "1.4", + "runOnRequirements": [ + { + "minServerVersion": "8.0", + "topologies": [ + "replicaset", + "sharded", + "load-balanced" + ], + "serverless": "forbid" + } + ], + "createEntities": [ + { + "client": { + "id": "client0", + "observeEvents": [ + "commandStartedEvent" + ] + } + }, + { + "database": { + "id": "database0", + "client": "client0", + "databaseName": "transaction-tests" + } + }, + { + "collection": { + "id": "collection0", + "database": "database0", + "collectionName": "coll0" + } + }, + { + "session": { + "id": "session0", + "client": "client0" + } + }, + { + "client": { + "id": "client_with_wmajority", + "uriOptions": { + "w": "majority" + }, + "observeEvents": [ + "commandStartedEvent" + ] + } + }, + { + "session": { + "id": "session_with_wmajority", + "client": "client_with_wmajority" + } + } + ], + "_yamlAnchors": { + "namespace": "transaction-tests.coll0" + }, + "initialData": [ + { + "databaseName": "transaction-tests", + "collectionName": "coll0", + "documents": [ + { + "_id": 1, + "x": 11 + }, + { + "_id": 2, + "x": 22 + }, + { + "_id": 3, + "x": 33 + }, + { + "_id": 5, + "x": 55 + }, + { + "_id": 6, + "x": 66 + }, + { + "_id": 7, + "x": 77 + } + ] + } + ], + "tests": [ + { + "description": "client bulkWrite in a transaction", + "operations": [ + { + "object": "session0", + "name": "startTransaction" + }, + { + "object": "client0", + "name": "clientBulkWrite", + "arguments": { + "session": "session0", + "models": [ + { + "insertOne": { + "namespace": "transaction-tests.coll0", + "document": { + "_id": 8, + "x": 88 + } + } + }, + { + "updateOne": { + "namespace": "transaction-tests.coll0", + "filter": { + "_id": 1 + }, + "update": { + "$inc": { + "x": 1 + } + } + } + }, + { + "updateMany": { + "namespace": "transaction-tests.coll0", + "filter": { + "$and": [ + { + "_id": { + "$gt": 1 + } + }, + { + "_id": { + "$lte": 3 + } + } + ] + }, + "update": { + "$inc": { + "x": 2 + } + } + } + }, + { + "replaceOne": { + "namespace": "transaction-tests.coll0", + "filter": { + "_id": 4 + }, + "replacement": { + "x": 44 + }, + "upsert": true + } + }, + { + "deleteOne": { + "namespace": "transaction-tests.coll0", + "filter": { + "_id": 5 + } + } + }, + { + "deleteMany": { + "namespace": "transaction-tests.coll0", + "filter": { + "$and": [ + { + "_id": { + "$gt": 5 + } + }, + { + 
"_id": { + "$lte": 7 + } + } + ] + } + } + } + ], + "verboseResults": true + }, + "expectResult": { + "insertedCount": 1, + "upsertedCount": 1, + "matchedCount": 3, + "modifiedCount": 3, + "deletedCount": 3, + "insertResults": { + "0": { + "insertedId": 8 + } + }, + "updateResults": { + "1": { + "matchedCount": 1, + "modifiedCount": 1, + "upsertedId": { + "$$exists": false + } + }, + "2": { + "matchedCount": 2, + "modifiedCount": 2, + "upsertedId": { + "$$exists": false + } + }, + "3": { + "matchedCount": 1, + "modifiedCount": 0, + "upsertedId": 4 + } + }, + "deleteResults": { + "4": { + "deletedCount": 1 + }, + "5": { + "deletedCount": 2 + } + } + } + }, + { + "object": "session0", + "name": "commitTransaction" + } + ], + "expectEvents": [ + { + "client": "client0", + "events": [ + { + "commandStartedEvent": { + "commandName": "bulkWrite", + "databaseName": "admin", + "command": { + "lsid": { + "$$sessionLsid": "session0" + }, + "txnNumber": 1, + "startTransaction": true, + "autocommit": false, + "writeConcern": { + "$$exists": false + }, + "bulkWrite": 1, + "errorsOnly": false, + "ordered": true, + "ops": [ + { + "insert": 0, + "document": { + "_id": 8, + "x": 88 + } + }, + { + "update": 0, + "filter": { + "_id": 1 + }, + "updateMods": { + "$inc": { + "x": 1 + } + }, + "multi": false + }, + { + "update": 0, + "filter": { + "$and": [ + { + "_id": { + "$gt": 1 + } + }, + { + "_id": { + "$lte": 3 + } + } + ] + }, + "updateMods": { + "$inc": { + "x": 2 + } + }, + "multi": true + }, + { + "update": 0, + "filter": { + "_id": 4 + }, + "updateMods": { + "x": 44 + }, + "upsert": true, + "multi": false + }, + { + "delete": 0, + "filter": { + "_id": 5 + }, + "multi": false + }, + { + "delete": 0, + "filter": { + "$and": [ + { + "_id": { + "$gt": 5 + } + }, + { + "_id": { + "$lte": 7 + } + } + ] + }, + "multi": true + } + ], + "nsInfo": [ + { + "ns": "transaction-tests.coll0" + } + ] + } + } + }, + { + "commandStartedEvent": { + "commandName": "commitTransaction", + "databaseName": "admin", + "command": { + "commitTransaction": 1, + "lsid": { + "$$sessionLsid": "session0" + }, + "txnNumber": 1, + "startTransaction": { + "$$exists": false + }, + "autocommit": false, + "writeConcern": { + "$$exists": false + } + } + } + } + ] + } + ], + "outcome": [ + { + "collectionName": "coll0", + "databaseName": "transaction-tests", + "documents": [ + { + "_id": 1, + "x": 12 + }, + { + "_id": 2, + "x": 24 + }, + { + "_id": 3, + "x": 35 + }, + { + "_id": 4, + "x": 44 + }, + { + "_id": 8, + "x": 88 + } + ] + } + ] + }, + { + "description": "client writeConcern ignored for client bulkWrite in transaction", + "operations": [ + { + "object": "session_with_wmajority", + "name": "startTransaction", + "arguments": { + "writeConcern": { + "w": 1 + } + } + }, + { + "object": "client_with_wmajority", + "name": "clientBulkWrite", + "arguments": { + "session": "session_with_wmajority", + "models": [ + { + "insertOne": { + "namespace": "transaction-tests.coll0", + "document": { + "_id": 8, + "x": 88 + } + } + } + ] + }, + "expectResult": { + "insertedCount": 1, + "upsertedCount": 0, + "matchedCount": 0, + "modifiedCount": 0, + "deletedCount": 0, + "insertResults": { + "$$unsetOrMatches": {} + }, + "updateResults": { + "$$unsetOrMatches": {} + }, + "deleteResults": { + "$$unsetOrMatches": {} + } + } + }, + { + "object": "session_with_wmajority", + "name": "commitTransaction" + } + ], + "expectEvents": [ + { + "client": "client_with_wmajority", + "events": [ + { + "commandStartedEvent": { + "commandName": "bulkWrite", + 
"databaseName": "admin", + "command": { + "lsid": { + "$$sessionLsid": "session_with_wmajority" + }, + "txnNumber": 1, + "startTransaction": true, + "autocommit": false, + "writeConcern": { + "$$exists": false + }, + "bulkWrite": 1, + "errorsOnly": true, + "ordered": true, + "ops": [ + { + "insert": 0, + "document": { + "_id": 8, + "x": 88 + } + } + ], + "nsInfo": [ + { + "ns": "transaction-tests.coll0" + } + ] + } + } + }, + { + "commandStartedEvent": { + "command": { + "commitTransaction": 1, + "lsid": { + "$$sessionLsid": "session_with_wmajority" + }, + "txnNumber": { + "$numberLong": "1" + }, + "startTransaction": { + "$$exists": false + }, + "autocommit": false, + "writeConcern": { + "w": 1 + } + }, + "commandName": "commitTransaction", + "databaseName": "admin" + } + } + ] + } + ], + "outcome": [ + { + "collectionName": "coll0", + "databaseName": "transaction-tests", + "documents": [ + { + "_id": 1, + "x": 11 + }, + { + "_id": 2, + "x": 22 + }, + { + "_id": 3, + "x": 33 + }, + { + "_id": 5, + "x": 55 + }, + { + "_id": 6, + "x": 66 + }, + { + "_id": 7, + "x": 77 + }, + { + "_id": 8, + "x": 88 + } + ] + } + ] + }, + { + "description": "client bulkWrite with writeConcern in a transaction causes a transaction error", + "operations": [ + { + "object": "session0", + "name": "startTransaction" + }, + { + "object": "client0", + "name": "clientBulkWrite", + "arguments": { + "session": "session0", + "writeConcern": { + "w": 1 + }, + "models": [ + { + "insertOne": { + "namespace": "transaction-tests.coll0", + "document": { + "_id": 8, + "x": 88 + } + } + } + ] + }, + "expectError": { + "isClientError": true, + "errorContains": "Cannot set write concern after starting a transaction" + } + } + ] + } + ] +} diff --git a/test/spec/transactions/unified/client-bulkWrite.yml b/test/spec/transactions/unified/client-bulkWrite.yml new file mode 100644 index 0000000000..d80e618728 --- /dev/null +++ b/test/spec/transactions/unified/client-bulkWrite.yml @@ -0,0 +1,263 @@ +description: "client bulkWrite transactions" +schemaVersion: "1.4" # To support `serverless: forbid` +runOnRequirements: + - minServerVersion: "8.0" + topologies: + - replicaset + - sharded + - load-balanced + serverless: forbid + +createEntities: + - client: + id: &client0 client0 + observeEvents: [ commandStartedEvent ] + - database: + id: &database0 database0 + client: *client0 + databaseName: &database0Name transaction-tests + - collection: + id: &collection0 collection0 + database: *database0 + collectionName: &collection0Name coll0 + - session: + id: &session0 session0 + client: *client0 + - client: + id: &client_with_wmajority client_with_wmajority + uriOptions: + w: majority + observeEvents: + - commandStartedEvent + - session: + id: &session_with_wmajority session_with_wmajority + client: *client_with_wmajority + +_yamlAnchors: + namespace: &namespace "transaction-tests.coll0" + +initialData: + - databaseName: *database0Name + collectionName: *collection0Name + documents: + - { _id: 1, x: 11 } + - { _id: 2, x: 22 } + - { _id: 3, x: 33 } + - { _id: 5, x: 55 } + - { _id: 6, x: 66 } + - { _id: 7, x: 77 } + +tests: + - description: "client bulkWrite in a transaction" + operations: + - object: *session0 + name: startTransaction + - object: *client0 + name: clientBulkWrite + arguments: + session: *session0 + models: + - insertOne: + namespace: *namespace + document: { _id: 8, x: 88 } + - updateOne: + namespace: *namespace + filter: { _id: 1 } + update: { $inc: { x: 1 } } + - updateMany: + namespace: *namespace + filter: + $and: [ { _id: 
{ $gt: 1 } }, { _id: { $lte: 3 } } ] + update: { $inc: { x: 2 } } + - replaceOne: + namespace: *namespace + filter: { _id: 4 } + replacement: { x: 44 } + upsert: true + - deleteOne: + namespace: *namespace + filter: { _id: 5 } + - deleteMany: + namespace: *namespace + filter: + $and: [ { _id: { $gt: 5 } }, { _id: { $lte: 7 } } ] + verboseResults: true + expectResult: + insertedCount: 1 + upsertedCount: 1 + matchedCount: 3 + modifiedCount: 3 + deletedCount: 3 + insertResults: + 0: + insertedId: 8 + updateResults: + 1: + matchedCount: 1 + modifiedCount: 1 + upsertedId: { $$exists: false } + 2: + matchedCount: 2 + modifiedCount: 2 + upsertedId: { $$exists: false } + 3: + matchedCount: 1 + modifiedCount: 0 + upsertedId: 4 + deleteResults: + 4: + deletedCount: 1 + 5: + deletedCount: 2 + - object: *session0 + name: commitTransaction + expectEvents: + - client: *client0 + events: + - commandStartedEvent: + commandName: bulkWrite + databaseName: admin + command: + lsid: { $$sessionLsid: *session0 } + txnNumber: 1 + startTransaction: true + autocommit: false + writeConcern: { $$exists: false } + bulkWrite: 1 + errorsOnly: false + ordered: true + ops: + - insert: 0 + document: { _id: 8, x: 88 } + - update: 0 + filter: { _id: 1 } + updateMods: { $inc: { x: 1 } } + multi: false + - update: 0 + filter: + $and: [ { _id: { $gt: 1 } }, { _id: { $lte: 3 } } ] + updateMods: { $inc: { x: 2 } } + multi: true + - update: 0 + filter: { _id: 4 } + updateMods: { x: 44 } + upsert: true + multi: false + - delete: 0 + filter: { _id: 5 } + multi: false + - delete: 0 + filter: + $and: [ { _id: { $gt: 5 } }, { _id: { $lte: 7 } } ] + multi: true + nsInfo: + - ns: *namespace + - commandStartedEvent: + commandName: commitTransaction + databaseName: admin + command: + commitTransaction: 1 + lsid: { $$sessionLsid: *session0 } + txnNumber: 1 + startTransaction: { $$exists: false } + autocommit: false + writeConcern: { $$exists: false } + outcome: + - collectionName: *collection0Name + databaseName: *database0Name + documents: + - { _id: 1, x: 12 } + - { _id: 2, x: 24 } + - { _id: 3, x: 35 } + - { _id: 4, x: 44 } + - { _id: 8, x: 88 } + - description: 'client writeConcern ignored for client bulkWrite in transaction' + operations: + - object: *session_with_wmajority + name: startTransaction + arguments: + writeConcern: + w: 1 + - object: *client_with_wmajority + name: clientBulkWrite + arguments: + session: *session_with_wmajority + models: + - insertOne: + namespace: *namespace + document: { _id: 8, x: 88 } + expectResult: + insertedCount: 1 + upsertedCount: 0 + matchedCount: 0 + modifiedCount: 0 + deletedCount: 0 + insertResults: + $$unsetOrMatches: {} + updateResults: + $$unsetOrMatches: {} + deleteResults: + $$unsetOrMatches: {} + - object: *session_with_wmajority + name: commitTransaction + expectEvents: + - + client: *client_with_wmajority + events: + - commandStartedEvent: + commandName: bulkWrite + databaseName: admin + command: + lsid: { $$sessionLsid: *session_with_wmajority } + txnNumber: 1 + startTransaction: true + autocommit: false + writeConcern: { $$exists: false } + bulkWrite: 1 + errorsOnly: true + ordered: true + ops: + - insert: 0 + document: { _id: 8, x: 88 } + nsInfo: + - ns: *namespace + - + commandStartedEvent: + command: + commitTransaction: 1 + lsid: { $$sessionLsid: *session_with_wmajority } + txnNumber: { $numberLong: '1' } + startTransaction: { $$exists: false } + autocommit: false + writeConcern: + w: 1 + commandName: commitTransaction + databaseName: admin + outcome: + - collectionName: 
*collection0Name + databaseName: *database0Name + documents: + - { _id: 1, x: 11 } + - { _id: 2, x: 22 } + - { _id: 3, x: 33 } + - { _id: 5, x: 55 } + - { _id: 6, x: 66 } + - { _id: 7, x: 77 } + - { _id: 8, x: 88 } + - description: "client bulkWrite with writeConcern in a transaction causes a transaction error" + operations: + - object: *session0 + name: startTransaction + - object: *client0 + name: clientBulkWrite + arguments: + session: *session0 + writeConcern: + w: 1 + models: + - insertOne: + namespace: *namespace + document: { _id: 8, x: 88 } + expectError: + isClientError: true + errorContains: "Cannot set write concern after starting a transaction" diff --git a/test/spec/transactions/unified/mongos-pin-auto.json b/test/spec/transactions/unified/mongos-pin-auto.json index 93eac8bb77..27db520401 100644 --- a/test/spec/transactions/unified/mongos-pin-auto.json +++ b/test/spec/transactions/unified/mongos-pin-auto.json @@ -2004,6 +2004,104 @@ } ] }, + { + "description": "remain pinned after non-transient Interrupted error on clientBulkWrite bulkWrite", + "operations": [ + { + "object": "session0", + "name": "startTransaction" + }, + { + "object": "collection0", + "name": "insertOne", + "arguments": { + "session": "session0", + "document": { + "_id": 3 + } + }, + "expectResult": { + "$$unsetOrMatches": { + "insertedId": { + "$$unsetOrMatches": 3 + } + } + } + }, + { + "name": "targetedFailPoint", + "object": "testRunner", + "arguments": { + "session": "session0", + "failPoint": { + "configureFailPoint": "failCommand", + "mode": { + "times": 1 + }, + "data": { + "failCommands": [ + "bulkWrite" + ], + "errorCode": 11601 + } + } + } + }, + { + "name": "clientBulkWrite", + "object": "client0", + "arguments": { + "session": "session0", + "models": [ + { + "insertOne": { + "namespace": "database0.collection0", + "document": { + "_id": 8, + "x": 88 + } + } + } + ] + }, + "expectError": { + "errorLabelsOmit": [ + "TransientTransactionError" + ] + } + }, + { + "object": "testRunner", + "name": "assertSessionPinned", + "arguments": { + "session": "session0" + } + }, + { + "object": "session0", + "name": "abortTransaction" + } + ], + "outcome": [ + { + "collectionName": "test", + "databaseName": "transaction-tests", + "documents": [ + { + "_id": 1 + }, + { + "_id": 2 + } + ] + } + ], + "runOnRequirements": [ + { + "minServerVersion": "8.0" + } + ] + }, { "description": "unpin after transient connection error on insertOne insert", "operations": [ @@ -5175,6 +5273,202 @@ ] } ] + }, + { + "description": "unpin after transient connection error on clientBulkWrite bulkWrite", + "operations": [ + { + "object": "session0", + "name": "startTransaction" + }, + { + "object": "collection0", + "name": "insertOne", + "arguments": { + "session": "session0", + "document": { + "_id": 3 + } + }, + "expectResult": { + "$$unsetOrMatches": { + "insertedId": { + "$$unsetOrMatches": 3 + } + } + } + }, + { + "name": "targetedFailPoint", + "object": "testRunner", + "arguments": { + "session": "session0", + "failPoint": { + "configureFailPoint": "failCommand", + "mode": { + "times": 1 + }, + "data": { + "failCommands": [ + "bulkWrite" + ], + "closeConnection": true + } + } + } + }, + { + "name": "clientBulkWrite", + "object": "client0", + "arguments": { + "session": "session0", + "models": [ + { + "insertOne": { + "namespace": "database0.collection0", + "document": { + "_id": 8, + "x": 88 + } + } + } + ] + }, + "expectError": { + "errorLabelsContain": [ + "TransientTransactionError" + ] + } + }, + { + "object": 
"testRunner", + "name": "assertSessionUnpinned", + "arguments": { + "session": "session0" + } + }, + { + "object": "session0", + "name": "abortTransaction" + } + ], + "outcome": [ + { + "collectionName": "test", + "databaseName": "transaction-tests", + "documents": [ + { + "_id": 1 + }, + { + "_id": 2 + } + ] + } + ], + "runOnRequirements": [ + { + "minServerVersion": "8.0" + } + ] + }, + { + "description": "unpin after transient ShutdownInProgress error on clientBulkWrite bulkWrite", + "operations": [ + { + "object": "session0", + "name": "startTransaction" + }, + { + "object": "collection0", + "name": "insertOne", + "arguments": { + "session": "session0", + "document": { + "_id": 3 + } + }, + "expectResult": { + "$$unsetOrMatches": { + "insertedId": { + "$$unsetOrMatches": 3 + } + } + } + }, + { + "name": "targetedFailPoint", + "object": "testRunner", + "arguments": { + "session": "session0", + "failPoint": { + "configureFailPoint": "failCommand", + "mode": { + "times": 1 + }, + "data": { + "failCommands": [ + "bulkWrite" + ], + "errorCode": 91 + } + } + } + }, + { + "name": "clientBulkWrite", + "object": "client0", + "arguments": { + "session": "session0", + "models": [ + { + "insertOne": { + "namespace": "database0.collection0", + "document": { + "_id": 8, + "x": 88 + } + } + } + ] + }, + "expectError": { + "errorLabelsContain": [ + "TransientTransactionError" + ] + } + }, + { + "object": "testRunner", + "name": "assertSessionUnpinned", + "arguments": { + "session": "session0" + } + }, + { + "object": "session0", + "name": "abortTransaction" + } + ], + "outcome": [ + { + "collectionName": "test", + "databaseName": "transaction-tests", + "documents": [ + { + "_id": 1 + }, + { + "_id": 2 + } + ] + } + ], + "runOnRequirements": [ + { + "minServerVersion": "8.0" + } + ] } ] } diff --git a/test/spec/transactions/unified/mongos-pin-auto.yml b/test/spec/transactions/unified/mongos-pin-auto.yml index 7a76347555..a80dd62031 100644 --- a/test/spec/transactions/unified/mongos-pin-auto.yml +++ b/test/spec/transactions/unified/mongos-pin-auto.yml @@ -676,6 +676,36 @@ tests: - *abortTransaction outcome: *outcome + - description: remain pinned after non-transient Interrupted error on clientBulkWrite bulkWrite + operations: + - *startTransaction + - *initialCommand + - name: targetedFailPoint + object: testRunner + arguments: + session: *session0 + failPoint: + configureFailPoint: failCommand + mode: {times: 1} + data: + failCommands: ["bulkWrite"] + errorCode: 11601 + - name: clientBulkWrite + object: *client0 + arguments: + session: *session0 + models: + - insertOne: + namespace: database0.collection0 + document: { _id: 8, x: 88 } + expectError: + errorLabelsOmit: ["TransientTransactionError"] + - *assertSessionPinned + - *abortTransaction + outcome: *outcome + runOnRequirements: + - minServerVersion: "8.0" # `bulkWrite` added to server 8.0" + - description: unpin after transient connection error on insertOne insert operations: - *startTransaction @@ -1614,3 +1644,63 @@ tests: - *abortTransaction outcome: *outcome + - description: unpin after transient connection error on clientBulkWrite bulkWrite + operations: + - *startTransaction + - *initialCommand + - name: targetedFailPoint + object: testRunner + arguments: + session: *session0 + failPoint: + configureFailPoint: failCommand + mode: {times: 1} + data: + failCommands: ["bulkWrite"] + closeConnection: true + - name: clientBulkWrite + object: *client0 + arguments: + session: *session0 + models: + - insertOne: + namespace: database0.collection0 + 
document: { _id: 8, x: 88 } + expectError: + errorLabelsContain: ["TransientTransactionError"] + - *assertSessionUnpinned + - *abortTransaction + outcome: *outcome + runOnRequirements: + - minServerVersion: "8.0" # `bulkWrite` added to server 8.0 + + - description: unpin after transient ShutdownInProgress error on clientBulkWrite bulkWrite + operations: + - *startTransaction + - *initialCommand + - name: targetedFailPoint + object: testRunner + arguments: + session: *session0 + failPoint: + configureFailPoint: failCommand + mode: {times: 1} + data: + failCommands: ["bulkWrite"] + errorCode: 91 + - name: clientBulkWrite + object: *client0 + arguments: + session: *session0 + models: + - insertOne: + namespace: database0.collection0 + document: { _id: 8, x: 88 } + expectError: + errorLabelsContain: ["TransientTransactionError"] + - *assertSessionUnpinned + - *abortTransaction + outcome: *outcome + runOnRequirements: + - minServerVersion: "8.0" # `bulkWrite` added to server 8.0 + diff --git a/test/spec/versioned-api/crud-api-version-1.json b/test/spec/versioned-api/crud-api-version-1.json index a387d0587e..23ef59a6d9 100644 --- a/test/spec/versioned-api/crud-api-version-1.json +++ b/test/spec/versioned-api/crud-api-version-1.json @@ -50,7 +50,8 @@ }, "apiDeprecationErrors": true } - ] + ], + "namespace": "versioned-api-tests.test" }, "initialData": [ { @@ -426,6 +427,86 @@ } ] }, + { + "description": "client bulkWrite appends declared API version", + "runOnRequirements": [ + { + "minServerVersion": "8.0", + "serverless": "forbid" + } + ], + "operations": [ + { + "name": "clientBulkWrite", + "object": "client", + "arguments": { + "models": [ + { + "insertOne": { + "namespace": "versioned-api-tests.test", + "document": { + "_id": 6, + "x": 6 + } + } + } + ], + "verboseResults": true + }, + "expectResult": { + "insertedCount": 1, + "upsertedCount": 0, + "matchedCount": 0, + "modifiedCount": 0, + "deletedCount": 0, + "insertResults": { + "0": { + "insertedId": 6 + } + }, + "updateResults": {}, + "deleteResults": {} + } + } + ], + "expectEvents": [ + { + "client": "client", + "events": [ + { + "commandStartedEvent": { + "commandName": "bulkWrite", + "databaseName": "admin", + "command": { + "bulkWrite": 1, + "errorsOnly": false, + "ordered": true, + "ops": [ + { + "insert": 0, + "document": { + "_id": 6, + "x": 6 + } + } + ], + "nsInfo": [ + { + "ns": "versioned-api-tests.test" + } + ], + "apiVersion": "1", + "apiStrict": { + "$$unsetOrMatches": false + }, + "apiDeprecationErrors": true + } + } + } + ] + } + ] + }, { "description": "countDocuments appends declared API version", "operations": [ diff --git a/test/spec/versioned-api/crud-api-version-1.yml b/test/spec/versioned-api/crud-api-version-1.yml index 50135c1458..01e0323420 100644 --- a/test/spec/versioned-api/crud-api-version-1.yml +++ b/test/spec/versioned-api/crud-api-version-1.yml @@ -34,6 +34,7 @@ _yamlAnchors: apiVersion: "1" apiStrict: { $$unsetOrMatches: false } apiDeprecationErrors: true + namespace: &namespace "versioned-api-tests.test" initialData: - collectionName: *collectionName @@ -155,6 +156,47 @@ tests: multi: { $$unsetOrMatches: false } upsert: true <<: *expectedApiVersion + + - description: "client bulkWrite appends declared API version" + runOnRequirements: + - minServerVersion: "8.0" # `bulkWrite` added to server 8.0 + serverless: forbid + operations: + - name: clientBulkWrite + object: *client + arguments: + models: + - insertOne: + namespace: *namespace + document: { _id: 6, x: 6 } + verboseResults: true + 
expectResult: + insertedCount: 1 + upsertedCount: 0 + matchedCount: 0 + modifiedCount: 0 + deletedCount: 0 + insertResults: + 0: + insertedId: 6 + updateResults: {} + deleteResults: {} + expectEvents: + - client: *client + events: + - commandStartedEvent: + commandName: bulkWrite + databaseName: admin + command: + bulkWrite: 1 + errorsOnly: false + ordered: true + ops: + - insert: 0 + document: { _id: 6, x: 6 } + nsInfo: + - { ns: *namespace } + <<: *expectedApiVersion - description: "countDocuments appends declared API version" operations: diff --git a/test/tools/unified-spec-runner/match.ts b/test/tools/unified-spec-runner/match.ts index 3a4d4e5e3d..f92004c776 100644 --- a/test/tools/unified-spec-runner/match.ts +++ b/test/tools/unified-spec-runner/match.ts @@ -213,7 +213,18 @@ export function resultCheck( } resultCheck(actual[key], value, entities, path, checkExtraKeys); } else { - resultCheck(actual[key], value, entities, path, checkExtraKeys); + // If our actual value is a map, such as in the client bulk write results, we need + // to convert the expected keys from the string numbers to actual numbers since the key + // values in the maps are actual numbers. + const isActualMap = actual instanceof Map; + const mapKey = !Number.isNaN(Number(key)) ? Number(key) : key; + resultCheck( + isActualMap ? actual.get(mapKey) : actual[key], + value, + entities, + path, + checkExtraKeys + ); } } diff --git a/test/tools/unified-spec-runner/operations.ts b/test/tools/unified-spec-runner/operations.ts index 51d458a185..9cc67174f3 100644 --- a/test/tools/unified-spec-runner/operations.ts +++ b/test/tools/unified-spec-runner/operations.ts @@ -202,6 +202,16 @@ operations.set('bulkWrite', async ({ entities, operation }) => { return collection.bulkWrite(requests, opts); }); +operations.set('clientBulkWrite', async ({ entities, operation }) => { + const client = entities.getEntity('client', operation.object); + const { models, ...opts } = operation.arguments!; + const bulkWriteModels = models.map(model => { + const name = Object.keys(model)[0]; + return { name: name, ...model[name] }; + }); + return client.bulkWrite(bulkWriteModels, opts); +}); + // The entity exists for the name but can potentially have the wrong // type (stream/cursor) which will also throw an exception even when // telling getEntity() to ignore checking existence. diff --git a/test/unit/cmap/commands.test.ts b/test/unit/cmap/commands.test.ts index 07e3fccbb6..5725f5b249 100644 --- a/test/unit/cmap/commands.test.ts +++ b/test/unit/cmap/commands.test.ts @@ -15,7 +15,7 @@ describe('commands', function () { context('when there is one document sequence', function () { const command = { test: 1, - field: new DocumentSequence([{ test: 1 }]) + field: new DocumentSequence('field', [{ test: 1 }]) }; const msg = new OpMsgRequest('admin', command, {}); const buffers = msg.toBin(); @@ -41,7 +41,7 @@ describe('commands', function () { it('sets the length of the document sequence', function () { // Bytes starting at index 1 is a 4 byte length. 
- expect(buffers[3].readInt32LE(1)).to.equal(20); + expect(buffers[3].readInt32LE(1)).to.equal(25); }); it('sets the name of the first field to be replaced', function () { @@ -53,8 +53,8 @@ describe('commands', function () { context('when there are multiple document sequences', function () { const command = { test: 1, - fieldOne: new DocumentSequence([{ test: 1 }]), - fieldTwo: new DocumentSequence([{ test: 1 }]) + fieldOne: new DocumentSequence('fieldOne', [{ test: 1 }]), + fieldTwo: new DocumentSequence('fieldTwo', [{ test: 1 }]) }; const msg = new OpMsgRequest('admin', command, {}); const buffers = msg.toBin(); @@ -81,7 +81,7 @@ describe('commands', function () { it('sets the length of the first document sequence', function () { // Bytes starting at index 1 is a 4 byte length. - expect(buffers[3].readInt32LE(1)).to.equal(23); + expect(buffers[3].readInt32LE(1)).to.equal(28); }); it('sets the name of the first field to be replaced', function () { @@ -91,17 +91,17 @@ describe('commands', function () { it('sets the document sequence sections second type to 1', function () { // First byte is a one byte type. - expect(buffers[3][28]).to.equal(1); + expect(buffers[3][29]).to.equal(1); }); it('sets the length of the second document sequence', function () { // Bytes starting at index 1 is a 4 byte length. - expect(buffers[3].readInt32LE(29)).to.equal(23); + expect(buffers[3].readInt32LE(30)).to.equal(28); }); it('sets the name of the second field to be replaced', function () { // Bytes starting at index 33 is the field name. - expect(buffers[3].toString('utf8', 33, 41)).to.equal('fieldTwo'); + expect(buffers[3].toString('utf8', 34, 42)).to.equal('fieldTwo'); }); }); }); diff --git a/test/unit/index.test.ts b/test/unit/index.test.ts index 980747c8c7..883cc4b4ba 100644 --- a/test/unit/index.test.ts +++ b/test/unit/index.test.ts @@ -73,6 +73,8 @@ const EXPECTED_EXPORTS = [ 'MongoChangeStreamError', 'MongoClient', 'MongoClientAuthProviders', + 'MongoClientBulkWriteCursorError', + 'MongoClientBulkWriteExecutionError', 'MongoCompatibilityError', 'MongoCryptAzureKMSRequestError', 'MongoCryptCreateDataKeyError', diff --git a/test/unit/operations/client_bulk_write/command_builder.test.ts b/test/unit/operations/client_bulk_write/command_builder.test.ts index b0e69e2b23..fade57d408 100644 --- a/test/unit/operations/client_bulk_write/command_builder.test.ts +++ b/test/unit/operations/client_bulk_write/command_builder.test.ts @@ -14,23 +14,27 @@ import { type ClientReplaceOneModel, type ClientUpdateManyModel, type ClientUpdateOneModel, - DocumentSequence + DEFAULT_PK_FACTORY, + DocumentSequence, + ObjectId } from '../../../mongodb'; describe('ClientBulkWriteCommandBuilder', function () { describe('#buildCommand', function () { context('when custom options are provided', function () { + const id = new ObjectId(); const model: ClientInsertOneModel = { name: 'insertOne', namespace: 'test.coll', - document: { name: 1 } + document: { _id: id, name: 1 } }; const builder = new ClientBulkWriteCommandBuilder([model], { verboseResults: true, bypassDocumentValidation: true, - ordered: false + ordered: false, + comment: { bulk: 'write' } }); - const commands = builder.buildCommands(); + const commands = builder.buildCommands(48000000, 100000); it('sets the bulkWrite command', function () { expect(commands[0].bulkWrite).to.equal(1); @@ -50,24 +54,32 @@ describe('ClientBulkWriteCommandBuilder', function () { it('sets the ops document sequence', function () { expect(commands[0].ops).to.be.instanceOf(DocumentSequence); - 
expect(commands[0].ops.documents[0]).to.deep.equal({ insert: 0, document: { name: 1 } }); + expect(commands[0].ops.documents[0]).to.deep.equal({ + insert: 0, + document: { _id: id, name: 1 } + }); }); it('sets the nsInfo document sequence', function () { expect(commands[0].nsInfo).to.be.instanceOf(DocumentSequence); expect(commands[0].nsInfo.documents[0]).to.deep.equal({ ns: 'test.coll' }); }); + + it('passes comment options into the commands', function () { + expect(commands[0].comment).to.deep.equal({ bulk: 'write' }); + }); }); context('when no options are provided', function () { context('when a single model is provided', function () { + const id = new ObjectId(); const model: ClientInsertOneModel = { name: 'insertOne', namespace: 'test.coll', - document: { name: 1 } + document: { _id: id, name: 1 } }; const builder = new ClientBulkWriteCommandBuilder([model], {}); - const commands = builder.buildCommands(); + const commands = builder.buildCommands(48000000, 100000); it('sets the bulkWrite command', function () { expect(commands[0].bulkWrite).to.equal(1); @@ -83,7 +95,10 @@ describe('ClientBulkWriteCommandBuilder', function () { it('sets the ops document sequence', function () { expect(commands[0].ops).to.be.instanceOf(DocumentSequence); - expect(commands[0].ops.documents[0]).to.deep.equal({ insert: 0, document: { name: 1 } }); + expect(commands[0].ops.documents[0]).to.deep.equal({ + insert: 0, + document: { _id: id, name: 1 } + }); }); it('sets the nsInfo document sequence', function () { @@ -93,19 +108,75 @@ describe('ClientBulkWriteCommandBuilder', function () { }); context('when multiple models are provided', function () { + context('when exceeding the max batch size', function () { + const idOne = new ObjectId(); + const idTwo = new ObjectId(); + const modelOne: ClientInsertOneModel = { + name: 'insertOne', + namespace: 'test.coll', + document: { _id: idOne, name: 1 } + }; + const modelTwo: ClientInsertOneModel = { + name: 'insertOne', + namespace: 'test.coll', + document: { _id: idTwo, name: 2 } + }; + const builder = new ClientBulkWriteCommandBuilder([modelOne, modelTwo], {}); + const commands = builder.buildCommands(48000000, 1); + + it('splits the operations into multiple commands', function () { + expect(commands.length).to.equal(2); + expect(commands[0].ops.documents).to.deep.equal([ + { insert: 0, document: { _id: idOne, name: 1 } } + ]); + expect(commands[1].ops.documents).to.deep.equal([ + { insert: 0, document: { _id: idTwo, name: 2 } } + ]); + }); + }); + + context('when exceeding the max message size in bytes', function () { + const idOne = new ObjectId(); + const idTwo = new ObjectId(); + const modelOne: ClientInsertOneModel = { + name: 'insertOne', + namespace: 'test.coll', + document: { _id: idOne, name: 1 } + }; + const modelTwo: ClientInsertOneModel = { + name: 'insertOne', + namespace: 'test.coll', + document: { _id: idTwo, name: 2 } + }; + const builder = new ClientBulkWriteCommandBuilder([modelOne, modelTwo], {}); + const commands = builder.buildCommands(1090, 100000); + + it('splits the operations into multiple commands', function () { + expect(commands.length).to.equal(2); + expect(commands[0].ops.documents).to.deep.equal([ + { insert: 0, document: { _id: idOne, name: 1 } } + ]); + expect(commands[1].ops.documents).to.deep.equal([ + { insert: 0, document: { _id: idTwo, name: 2 } } + ]); + }); + }); + context('when the namespace is the same', function () { + const idOne = new ObjectId(); + const idTwo = new ObjectId(); const modelOne: ClientInsertOneModel = { 
name: 'insertOne', namespace: 'test.coll', - document: { name: 1 } + document: { _id: idOne, name: 1 } }; const modelTwo: ClientInsertOneModel = { name: 'insertOne', namespace: 'test.coll', - document: { name: 2 } + document: { _id: idTwo, name: 2 } }; const builder = new ClientBulkWriteCommandBuilder([modelOne, modelTwo], {}); - const commands = builder.buildCommands(); + const commands = builder.buildCommands(48000000, 100000); it('sets the bulkWrite command', function () { expect(commands[0].bulkWrite).to.equal(1); @@ -114,8 +185,8 @@ describe('ClientBulkWriteCommandBuilder', function () { it('sets the ops document sequence', function () { expect(commands[0].ops).to.be.instanceOf(DocumentSequence); expect(commands[0].ops.documents).to.deep.equal([ - { insert: 0, document: { name: 1 } }, - { insert: 0, document: { name: 2 } } + { insert: 0, document: { _id: idOne, name: 1 } }, + { insert: 0, document: { _id: idTwo, name: 2 } } ]); }); @@ -126,18 +197,20 @@ describe('ClientBulkWriteCommandBuilder', function () { }); context('when the namespace differs', function () { + const idOne = new ObjectId(); + const idTwo = new ObjectId(); const modelOne: ClientInsertOneModel = { name: 'insertOne', namespace: 'test.coll', - document: { name: 1 } + document: { _id: idOne, name: 1 } }; const modelTwo: ClientInsertOneModel = { name: 'insertOne', namespace: 'test.coll2', - document: { name: 2 } + document: { _id: idTwo, name: 2 } }; const builder = new ClientBulkWriteCommandBuilder([modelOne, modelTwo], {}); - const commands = builder.buildCommands(); + const commands = builder.buildCommands(48000000, 100000); it('sets the bulkWrite command', function () { expect(commands[0].bulkWrite).to.equal(1); @@ -146,8 +219,8 @@ describe('ClientBulkWriteCommandBuilder', function () { it('sets the ops document sequence', function () { expect(commands[0].ops).to.be.instanceOf(DocumentSequence); expect(commands[0].ops.documents).to.deep.equal([ - { insert: 0, document: { name: 1 } }, - { insert: 1, document: { name: 2 } } + { insert: 0, document: { _id: idOne, name: 1 } }, + { insert: 1, document: { _id: idTwo, name: 2 } } ]); }); @@ -161,23 +234,26 @@ describe('ClientBulkWriteCommandBuilder', function () { }); context('when the namespaces are intermixed', function () { + const idOne = new ObjectId(); + const idTwo = new ObjectId(); + const idThree = new ObjectId(); const modelOne: ClientInsertOneModel = { name: 'insertOne', namespace: 'test.coll', - document: { name: 1 } + document: { _id: idOne, name: 1 } }; const modelTwo: ClientInsertOneModel = { name: 'insertOne', namespace: 'test.coll2', - document: { name: 2 } + document: { _id: idTwo, name: 2 } }; const modelThree: ClientInsertOneModel = { name: 'insertOne', namespace: 'test.coll', - document: { name: 2 } + document: { _id: idThree, name: 2 } }; const builder = new ClientBulkWriteCommandBuilder([modelOne, modelTwo, modelThree], {}); - const commands = builder.buildCommands(); + const commands = builder.buildCommands(48000000, 100000); it('sets the bulkWrite command', function () { expect(commands[0].bulkWrite).to.equal(1); @@ -186,9 +262,9 @@ describe('ClientBulkWriteCommandBuilder', function () { it('sets the ops document sequence', function () { expect(commands[0].ops).to.be.instanceOf(DocumentSequence); expect(commands[0].ops.documents).to.deep.equal([ - { insert: 0, document: { name: 1 } }, - { insert: 1, document: { name: 2 } }, - { insert: 0, document: { name: 2 } } + { insert: 0, document: { _id: idOne, name: 1 } }, + { insert: 1, document: { _id: 
idTwo, name: 2 } }, + { insert: 0, document: { _id: idThree, name: 2 } } ]); }); @@ -205,15 +281,33 @@ describe('ClientBulkWriteCommandBuilder', function () { }); describe('#buildInsertOneOperation', function () { - const model: ClientInsertOneModel = { - name: 'insertOne', - namespace: 'test.coll', - document: { name: 1 } - }; - const operation = buildInsertOneOperation(model, 5); - - it('generates the insert operation', function () { - expect(operation).to.deep.equal({ insert: 5, document: { name: 1 } }); + context('when no _id exists on the document', function () { + const model: ClientInsertOneModel = { + name: 'insertOne', + namespace: 'test.coll', + document: { name: 1 } + }; + const operation = buildInsertOneOperation(model, 5, DEFAULT_PK_FACTORY); + + it('generates the insert operation with an _id', function () { + expect(operation.insert).to.equal(5); + expect(operation.document.name).to.equal(1); + expect(operation.document).to.have.property('_id'); + }); + }); + + context('when an _id exists on the document', function () { + const id = new ObjectId(); + const model: ClientInsertOneModel = { + name: 'insertOne', + namespace: 'test.coll', + document: { _id: id, name: 1 } + }; + const operation = buildInsertOneOperation(model, 5, DEFAULT_PK_FACTORY); + + it('generates the insert operation with an _id', function () { + expect(operation).to.deep.equal({ insert: 5, document: { _id: id, name: 1 } }); + }); }); }); @@ -317,7 +411,8 @@ describe('ClientBulkWriteCommandBuilder', function () { update: { $set: { name: 2 } }, hint: 'test', upsert: true, - arrayFilters: [{ test: 1 }] + arrayFilters: [{ test: 1 }], + collation: { locale: 'de' } }; const operation = buildUpdateOneOperation(model, 5); @@ -329,7 +424,8 @@ describe('ClientBulkWriteCommandBuilder', function () { multi: false, hint: 'test', upsert: true, - arrayFilters: [{ test: 1 }] + arrayFilters: [{ test: 1 }], + collation: { locale: 'de' } }); }); }); @@ -363,7 +459,8 @@ describe('ClientBulkWriteCommandBuilder', function () { update: { $set: { name: 2 } }, hint: 'test', upsert: true, - arrayFilters: [{ test: 1 }] + arrayFilters: [{ test: 1 }], + collation: { locale: 'de' } }; const operation = buildUpdateManyOperation(model, 5); @@ -375,7 +472,8 @@ describe('ClientBulkWriteCommandBuilder', function () { multi: true, hint: 'test', upsert: true, - arrayFilters: [{ test: 1 }] + arrayFilters: [{ test: 1 }], + collation: { locale: 'de' } }); }); }); @@ -408,7 +506,8 @@ describe('ClientBulkWriteCommandBuilder', function () { filter: { name: 1 }, replacement: { name: 2 }, hint: 'test', - upsert: true + upsert: true, + collation: { locale: 'de' } }; const operation = buildReplaceOneOperation(model, 5); @@ -419,7 +518,8 @@ describe('ClientBulkWriteCommandBuilder', function () { updateMods: { name: 2 }, multi: false, hint: 'test', - upsert: true + upsert: true, + collation: { locale: 'de' } }); }); }); diff --git a/test/unit/operations/client_bulk_write/results_merger.test.ts b/test/unit/operations/client_bulk_write/results_merger.test.ts new file mode 100644 index 0000000000..342502eebb --- /dev/null +++ b/test/unit/operations/client_bulk_write/results_merger.test.ts @@ -0,0 +1,312 @@ +import { expect } from 'chai'; + +import { + BSON, + ClientBulkWriteCursorResponse, + type ClientBulkWriteResult, + ClientBulkWriteResultsMerger, + Long +} from '../../../mongodb'; + +describe('ClientBulkWriteResultsMerger', function () { + describe('#constructor', function () { + const resultsMerger = new ClientBulkWriteResultsMerger({}); + + 
it('initializes the result', function () { + expect(resultsMerger.result).to.deep.equal({ + insertedCount: 0, + upsertedCount: 0, + matchedCount: 0, + modifiedCount: 0, + deletedCount: 0, + deleteResults: undefined, + insertResults: undefined, + updateResults: undefined + }); + }); + }); + + describe('#merge', function () { + context('when the bulk write is acknowledged', function () { + context('when merging on the first batch', function () { + context('when requesting verbose results', function () { + // An example verbose response from the server without errors: + // { + // cursor: { + // id: Long('0'), + // firstBatch: [ { ok: 1, idx: 0, n: 1 }, { ok: 1, idx: 1, n: 1 } ], + // ns: 'admin.$cmd.bulkWrite' + // }, + // nErrors: 0, + // nInserted: 2, + // nMatched: 0, + // nModified: 0, + // nUpserted: 0, + // nDeleted: 0, + // ok: 1 + // } + context('when there are no errors', function () { + const operations = [ + { insert: 0, document: { _id: 1 } }, + { update: 0 }, + { update: 0 }, + { delete: 0 } + ]; + const documents = [ + { ok: 1, idx: 0, n: 1 }, // Insert + { ok: 1, idx: 1, n: 1, nModified: 1 }, // Update match + { ok: 1, idx: 2, n: 0, upserted: { _id: 1 } }, // Update no match with upsert + { ok: 1, idx: 3, n: 1 } // Delete + ]; + const serverResponse = { + cursor: { + id: new Long('0'), + firstBatch: documents, + ns: 'admin.$cmd.bulkWrite' + }, + nErrors: 0, + nInserted: 1, + nMatched: 1, + nModified: 1, + nUpserted: 1, + nDeleted: 1, + ok: 1 + }; + const response = new ClientBulkWriteCursorResponse(BSON.serialize(serverResponse), 0); + const merger = new ClientBulkWriteResultsMerger({ verboseResults: true }); + let result: ClientBulkWriteResult; + + before(function () { + result = merger.merge(0, operations, response, documents); + }); + + it('merges the inserted count', function () { + expect(result.insertedCount).to.equal(1); + }); + + it('sets insert results', function () { + expect(result.insertResults.get(0).insertedId).to.equal(1); + }); + + it('merges the upserted count', function () { + expect(result.upsertedCount).to.equal(1); + }); + + it('merges the matched count', function () { + expect(result.matchedCount).to.equal(1); + }); + + it('merges the modified count', function () { + expect(result.modifiedCount).to.equal(1); + }); + + it('sets the update results', function () { + expect(result.updateResults.get(1)).to.deep.equal({ + matchedCount: 1, + modifiedCount: 1, + didUpsert: false + }); + }); + + it('sets the upsert results', function () { + expect(result.updateResults.get(2)).to.deep.equal({ + matchedCount: 0, + modifiedCount: 0, + upsertedId: 1, + didUpsert: true + }); + }); + + it('merges the deleted count', function () { + expect(result.deletedCount).to.equal(1); + }); + + it('sets the delete results', function () { + expect(result.deleteResults.get(3).deletedCount).to.equal(1); + }); + }); + }); + + context('when not requesting verbose results', function () { + // An example response from the server without errors when verbose results are not requested: + // { + // cursor: { + // id: Long('0'), + // firstBatch: [], + // ns: 'admin.$cmd.bulkWrite' + // }, + // nErrors: 0, + // nInserted: 2, + // nMatched: 0, + // nModified: 0, + // nUpserted: 0, + // nDeleted: 0, + // ok: 1 + // } + context('when there are no errors', function () { + const operations = [ + { insert: 0, document: { _id: 1 } }, + { update: 0 }, + { update: 0 }, + { delete: 0 } + ]; + const documents = []; + const serverResponse = { + cursor: { + id: new Long('0'), + firstBatch: documents, + ns: 'admin.$cmd.bulkWrite' + }, + 
nErrors: 0, + nInserted: 1, + nMatched: 1, + nModified: 1, + nUpserted: 1, + nDeleted: 1, + ok: 1 + }; + const response = new ClientBulkWriteCursorResponse(BSON.serialize(serverResponse), 0); + const merger = new ClientBulkWriteResultsMerger({ verboseResults: false }); + let result: ClientBulkWriteResult; + + before(function () { + result = merger.merge(0, operations, response, documents); + }); + + it('merges the inserted count', function () { + expect(result.insertedCount).to.equal(1); + }); + + it('sets no insert results', function () { + expect(result.insertResults).to.equal(undefined); + }); + + it('merges the upserted count', function () { + expect(result.upsertedCount).to.equal(1); + }); + + it('merges the matched count', function () { + expect(result.matchedCount).to.equal(1); + }); + + it('merges the modified count', function () { + expect(result.modifiedCount).to.equal(1); + }); + + it('sets no update results', function () { + expect(result.updateResults).to.equal(undefined); + }); + + it('merges the deleted count', function () { + expect(result.deletedCount).to.equal(1); + }); + + it('sets no delete results', function () { + expect(result.deleteResults).to.equal(undefined); + }); + }); + }); + }); + + context('when merging on a later batch', function () { + context('when requesting verbose results', function () { + // An example verbose response from the server without errors: + // { + // cursor: { + // id: Long('0'), + // firstBatch: [ { ok: 1, idx: 0, n: 1 }, { ok: 1, idx: 1, n: 1 } ], + // ns: 'admin.$cmd.bulkWrite' + // }, + // nErrors: 0, + // nInserted: 2, + // nMatched: 0, + // nModified: 0, + // nUpserted: 0, + // nDeleted: 0, + // ok: 1 + // } + context('when there are no errors', function () { + const operations = [ + { insert: 0, document: { _id: 1 } }, + { update: 0 }, + { update: 0 }, + { delete: 0 } + ]; + const documents = [ + { ok: 1, idx: 0, n: 1 }, // Insert + { ok: 1, idx: 1, n: 1, nModified: 1 }, // Update match + { ok: 1, idx: 2, n: 0, upserted: { _id: 1 } }, // Update no match with upsert + { ok: 1, idx: 3, n: 1 } // Delete + ]; + const serverResponse = { + cursor: { + id: new Long('0'), + firstBatch: documents, + ns: 'admin.$cmd.bulkWrite' + }, + nErrors: 0, + nInserted: 1, + nMatched: 1, + nModified: 1, + nUpserted: 1, + nDeleted: 1, + ok: 1 + }; + const response = new ClientBulkWriteCursorResponse(BSON.serialize(serverResponse), 0); + const merger = new ClientBulkWriteResultsMerger({ verboseResults: true }); + let result: ClientBulkWriteResult; + + before(function () { + result = merger.merge(20, operations, response, documents); + }); + + it('merges the inserted count', function () { + expect(result.insertedCount).to.equal(1); + }); + + it('sets insert results', function () { + expect(result.insertResults.get(20).insertedId).to.equal(1); + }); + + it('merges the upserted count', function () { + expect(result.upsertedCount).to.equal(1); + }); + + it('merges the matched count', function () { + expect(result.matchedCount).to.equal(1); + }); + + it('merges the modified count', function () { + expect(result.modifiedCount).to.equal(1); + }); + + it('sets the update results', function () { + expect(result.updateResults.get(21)).to.deep.equal({ + matchedCount: 1, + modifiedCount: 1, + didUpsert: false + }); + }); + + it('sets the upsert results', function () { + expect(result.updateResults.get(22)).to.deep.equal({ + matchedCount: 0, + modifiedCount: 0, + upsertedId: 1, + didUpsert: true + }); + }); + + it('merges the deleted count', function () { + 
expect(result.deletedCount).to.equal(1); + }); + + it('sets the delete results', function () { + expect(result.deleteResults.get(23).deletedCount).to.equal(1); + }); + }); + }); + }); + }); + }); +});
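
Taken together, the results_merger tests above pin down the merge contract: top-level counts accumulate across batches, and when verbose results are requested the per-operation maps are keyed by the absolute operation index, i.e. the batch offset plus the `idx` the server reports relative to that batch (a batch offset of 20 yields map keys 20 through 23). Below is a minimal sketch of that index mapping; the names (`mergeVerboseBatch`, `CursorDoc`, `Op`) and shapes are assumptions for illustration only, not the driver's actual `ClientBulkWriteResultsMerger` implementation.

```typescript
// Hypothetical, simplified merge step modeled on the expectations in
// results_merger.test.ts above.
interface CursorDoc {
  ok: number;
  idx: number; // operation index relative to the current batch
  n: number;
  nModified?: number;
  upserted?: { _id: unknown };
}

type Op = { insert?: number; update?: number; delete?: number; document?: { _id: unknown } };

function mergeVerboseBatch(
  batchOffset: number, // absolute index of the first operation in this batch
  operations: Op[],
  documents: CursorDoc[],
  insertResults: Map<number, { insertedId: unknown }>,
  updateResults: Map<number, Record<string, unknown>>,
  deleteResults: Map<number, { deletedCount: number }>
): void {
  for (const doc of documents) {
    // Key the per-operation result by its offset into the original models array.
    const key = batchOffset + doc.idx;
    const op = operations[doc.idx];
    if (op.insert !== undefined) {
      // The server does not echo the document back; the _id comes from the
      // operation we sent (the driver assigns one up front when it is missing).
      insertResults.set(key, { insertedId: op.document?._id });
    } else if (op.update !== undefined) {
      updateResults.set(key, {
        matchedCount: doc.n,
        modifiedCount: doc.nModified ?? 0,
        didUpsert: doc.upserted !== undefined,
        ...(doc.upserted !== undefined ? { upsertedId: doc.upserted._id } : {})
      });
    } else if (op.delete !== undefined) {
      deleteResults.set(key, { deletedCount: doc.n });
    }
  }
}
```

Under these assumptions the non-verbose path never populates the maps: with `verboseResults: false` the command is sent with `errorsOnly: true` and the cursor's `firstBatch` comes back empty, so only the top-level `nInserted`/`nMatched`/`nModified`/`nUpserted`/`nDeleted` counters are merged, which matches the tests expecting `insertResults`, `updateResults`, and `deleteResults` to stay undefined.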