feat(NODE-6337): implement client bulk write batching #4242

18 changes: 16 additions & 2 deletions src/cmap/command_monitoring_events.ts
@@ -7,7 +7,12 @@ import {
LEGACY_HELLO_COMMAND_CAMEL_CASE
} from '../constants';
import { calculateDurationInMs, deepCopy } from '../utils';
import { OpMsgRequest, type OpQueryRequest, type WriteProtocolMessageType } from './commands';
import {
DocumentSequence,
OpMsgRequest,
type OpQueryRequest,
type WriteProtocolMessageType
} from './commands';
import type { Connection } from './connection';

/**
@@ -249,7 +254,16 @@ const OP_QUERY_KEYS = [
/** Extract the actual command from the query, possibly up-converting if it's a legacy format */
function extractCommand(command: WriteProtocolMessageType): Document {
if (command instanceof OpMsgRequest) {
return deepCopy(command.command);
const cmd = deepCopy(command.command);
// For OP_MSG with payload type 1 we need to pull the documents
// array out of the document sequence for monitoring.
if (cmd.ops instanceof DocumentSequence) {
cmd.ops = cmd.ops.documents;
}
if (cmd.nsInfo instanceof DocumentSequence) {
cmd.nsInfo = cmd.nsInfo.documents;
}
return cmd;
}

if (command.query?.$query) {
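For illustration, a minimal sketch (not part of this diff) of how the unwrapping above surfaces to a command monitoring listener. The connection string and documents are placeholders; `monitorCommands` and the `commandStarted` event are existing driver APIs.

```ts
import { MongoClient } from 'mongodb';

// Placeholder URI; monitorCommands enables command monitoring events.
const client = new MongoClient('mongodb://localhost:27017', { monitorCommands: true });

client.on('commandStarted', event => {
  if (event.commandName === 'bulkWrite') {
    // With the change above, ops and nsInfo appear as plain document arrays in the
    // monitored command rather than DocumentSequence instances.
    console.log(event.command.ops);    // e.g. [{ insert: 0, document: { x: 1 } }]
    console.log(event.command.nsInfo); // e.g. [{ ns: 'test.coll' }]
  }
});
```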
76 changes: 59 additions & 17 deletions src/cmap/commands.ts
@@ -429,10 +429,66 @@ export interface OpMsgOptions {

/** @internal */
export class DocumentSequence {
field: string;
documents: Document[];
serializedDocumentsLength: number;
private chunks: Uint8Array[];
private header?: Buffer;

constructor(documents: Document[]) {
this.documents = documents;
/**
* Create a new document sequence for the provided field.
* @param field - The field it will replace.
*/
constructor(field: string, documents?: Document[]) {
this.field = field;
this.documents = [];
this.chunks = [];
this.serializedDocumentsLength = 0;
this.init();
if (documents) {
for (const doc of documents) {
this.push(doc, BSON.serialize(doc));
}
}
}

/**
* Initialize the buffer chunks.
*/
private init() {
// A document sequence section starts with payload type 1 in the first byte.
const buffer = Buffer.allocUnsafe(1 + 4 + this.field.length + 1);
buffer[0] = 1;
// The field name is written at offset 5 as a null-terminated string; the 4-byte length at offset 1 is filled in by push().
encodeUTF8Into(buffer, `${this.field}\0`, 5);
this.chunks.push(buffer);
this.header = buffer;
}

/**
 * Push a document and its serialized BSON bytes onto the document sequence,
 * updating the length in the sequence header.
 * @param document - The document to add.
 * @param buffer - The serialized document in raw BSON.
 * @returns The new total length of the document sequence section in bytes.
 */
push(document: Document, buffer: Uint8Array): number {
this.serializedDocumentsLength += buffer.length;
// Push the document.
this.documents.push(document);
// Push the document raw bson.
this.chunks.push(buffer);
// Write the new length.
this.header?.writeInt32LE(4 + this.field.length + 1 + this.serializedDocumentsLength, 1);
return this.serializedDocumentsLength + (this.header?.length ?? 0);
}

/**
* Get the fully serialized bytes for the document sequence section.
* @returns The section bytes.
*/
toBin(): Uint8Array {
return Buffer.concat(this.chunks);
}
}

@@ -543,21 +599,7 @@ export class OpMsgRequest {
const chunks = [];
for (const [key, value] of Object.entries(document)) {
if (value instanceof DocumentSequence) {
// Document sequences starts with type 1 at the first byte.
const buffer = Buffer.allocUnsafe(1 + 4 + key.length);
buffer[0] = 1;
// Third part is the field name at offset 5.
encodeUTF8Into(buffer, key, 5);
chunks.push(buffer);
// Fourth part are the documents' bytes.
let docsLength = 0;
for (const doc of value.documents) {
const docBson = this.serializeBson(doc);
docsLength += docBson.length;
chunks.push(docBson);
}
// Second part of the sequence is the length at offset 1;
buffer.writeInt32LE(key.length + docsLength, 1);
chunks.push(value.toBin());
// Why are we removing the field from the command? This is because it needs to be
// removed in the OP_MSG request first section, and DocumentSequence is not a
// BSON type and is specific to the MongoDB wire protocol so there's nothing
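For context, a rough usage sketch (not part of this diff) of the incremental DocumentSequence API added above, as the batching code might use it. The 16 MiB threshold is illustrative only, not the exact limit the driver applies.

```ts
import { serialize, type Document } from 'bson';

const MAX_SECTION_BYTES = 16 * 1024 * 1024; // illustrative threshold only

// Fill a document sequence until the next document would push it past the limit,
// returning the documents that did not fit so the caller can start another batch.
function fillSequence(sequence: DocumentSequence, documents: Document[]): Document[] {
  for (let i = 0; i < documents.length; i++) {
    const raw = serialize(documents[i]);
    if (sequence.serializedDocumentsLength + raw.length > MAX_SECTION_BYTES) {
      return documents.slice(i);
    }
    sequence.push(documents[i], raw);
  }
  return [];
}

// Usage: build the ops section and keep any overflow for the next batch;
// OpMsgRequest then appends sequence.toBin() as the payload type 1 section.
const ops = new DocumentSequence('ops');
const overflow = fillSequence(ops, [{ insert: 0, document: { x: 1 } }]);
```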
26 changes: 26 additions & 0 deletions src/cmap/wire_protocol/responses.ts
@@ -329,3 +329,29 @@ export class ExplainedCursorResponse extends CursorResponse {
return this.toObject(options);
}
}

/**
* Client bulk writes have some extra metadata at the top level that needs to be
* included in the result returned to the user.
*/
export class ClientBulkWriteCursorResponse extends CursorResponse {
get insertedCount() {
return this.get('nInserted', BSONType.int, true);
}

get upsertedCount() {
return this.get('nUpserted', BSONType.int, true);
}

get matchedCount() {
return this.get('nMatched', BSONType.int, true);
}

get modifiedCount() {
return this.get('nModified', BSONType.int, true);
}

get deletedCount() {
return this.get('nDeleted', BSONType.int, true);
}
}
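For reference, a hand-written sketch (not captured from a server) of the top-level bulkWrite reply these getters read; the field values are made up.

```ts
const exampleReply = {
  ok: 1,
  cursor: {
    id: 0,
    ns: 'admin.$cmd.bulkWrite',
    firstBatch: [/* per-operation result documents */]
  },
  nInserted: 2,
  nUpserted: 0,
  nMatched: 1,
  nModified: 1,
  nDeleted: 0
};
// insertedCount reads nInserted, upsertedCount reads nUpserted, and so on, while
// the per-operation documents in the cursor batches are iterated separately.
```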
73 changes: 73 additions & 0 deletions src/cursor/client_bulk_write_cursor.ts
@@ -0,0 +1,73 @@
import type { Document } from '../bson';
import { type ClientBulkWriteCursorResponse } from '../cmap/wire_protocol/responses';
import { MongoClientBulkWriteCursorError } from '../error';
import type { MongoClient } from '../mongo_client';
import { ClientBulkWriteOperation } from '../operations/client_bulk_write/client_bulk_write';
import { type ClientBulkWriteOptions } from '../operations/client_bulk_write/common';
import { executeOperation } from '../operations/execute_operation';
import type { ClientSession } from '../sessions';
import { mergeOptions, MongoDBNamespace } from '../utils';
import {
AbstractCursor,
type AbstractCursorOptions,
type InitialCursorResponse
} from './abstract_cursor';

/** @public */
export interface ClientBulkWriteCursorOptions
extends Omit<AbstractCursorOptions, 'maxAwaitTimeMS' | 'tailable' | 'awaitData'>,
ClientBulkWriteOptions {}

/**
* This is the cursor that handles client bulk write operations. Note this is never
* exposed directly to the user and is always immediately exhausted.
* @internal
*/
export class ClientBulkWriteCursor extends AbstractCursor {
public readonly command: Document;
/** @internal */
private cursorResponse?: ClientBulkWriteCursorResponse;
/** @internal */
private clientBulkWriteOptions: ClientBulkWriteOptions;

/** @internal */
constructor(client: MongoClient, command: Document, options: ClientBulkWriteOptions = {}) {
super(client, new MongoDBNamespace('admin', '$cmd'), options);

this.command = command;
this.clientBulkWriteOptions = options;
}

/**
* We need a way to get the top level cursor response fields for
* generating the bulk write result, so we expose this here.
*/
get response(): ClientBulkWriteCursorResponse {
if (this.cursorResponse) return this.cursorResponse;
throw new MongoClientBulkWriteCursorError(
'No client bulk write cursor response returned from the server.'
);
}

clone(): ClientBulkWriteCursor {
const clonedOptions = mergeOptions({}, this.clientBulkWriteOptions);
delete clonedOptions.session;
return new ClientBulkWriteCursor(this.client, this.command, {
...clonedOptions
});
}

/** @internal */
async _initialize(session: ClientSession): Promise<InitialCursorResponse> {
const clientBulkWriteOperation = new ClientBulkWriteOperation(this.command, {
...this.clientBulkWriteOptions,
...this.cursorOptions,
session
});

const response = await executeOperation(this.client, clientBulkWriteOperation);
this.cursorResponse = response;

return { server: clientBulkWriteOperation.server, session, response };
}
}
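A sketch of how internal result-building code might consume this cursor: exhaust it with `toArray()` (an existing `AbstractCursor` method), then read the summary counts from the `response` getter. This helper is hypothetical and not part of the diff.

```ts
async function buildSummary(cursor: ClientBulkWriteCursor) {
  // Exhausting the cursor collects the per-operation result documents.
  const individualResults = await cursor.toArray();
  // The response getter throws MongoClientBulkWriteCursorError if no response was recorded.
  const response = cursor.response;
  return {
    insertedCount: response.insertedCount,
    upsertedCount: response.upsertedCount,
    matchedCount: response.matchedCount,
    modifiedCount: response.modifiedCount,
    deletedCount: response.deletedCount,
    individualResults
  };
}
```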
54 changes: 54 additions & 0 deletions src/error.ts
@@ -616,6 +616,60 @@ export class MongoGCPError extends MongoOIDCError {
}
}

/**
* An error indicating a failure while processing client bulk write results.
*
* @public
* @category Error
*/
export class MongoClientBulkWriteCursorError extends MongoRuntimeError {
/**
* **Do not use this constructor!**
*
* Meant for internal use only.
*
* @remarks
* This class is only meant to be constructed within the driver. This constructor is
* not subject to semantic versioning compatibility guarantees and may change at any time.
*
* @public
**/
constructor(message: string) {
super(message);
}

override get name(): string {
return 'MongoClientBulkWriteCursorError';
}
}

/**
* An error indicating a client-side failure while executing a client bulk write.
*
* @public
* @category Error
*/
export class MongoClientBulkWriteExecutionError extends MongoRuntimeError {
/**
* **Do not use this constructor!**
*
* Meant for internal use only.
*
* @remarks
* This class is only meant to be constructed within the driver. This constructor is
* not subject to semantic versioning compatibility guarantees and may change at any time.
*
* @public
**/
constructor(message: string) {
super(message);
}

override get name(): string {
return 'MongoClientBulkWriteExecutionError';
}
}

/**
* An error generated when a ChangeStream operation fails to execute.
*
17 changes: 17 additions & 0 deletions src/index.ts
@@ -45,6 +45,8 @@ export {
MongoAzureError,
MongoBatchReExecutionError,
MongoChangeStreamError,
MongoClientBulkWriteCursorError,
MongoClientBulkWriteExecutionError,
MongoCompatibilityError,
MongoCursorExhaustedError,
MongoCursorInUseError,
@@ -473,6 +475,21 @@ export type {
AggregateOptions,
DB_AGGREGATE_COLLECTION
} from './operations/aggregate';
export type {
AnyClientBulkWriteModel,
ClientBulkWriteOptions,
ClientBulkWriteResult,
ClientDeleteManyModel,
ClientDeleteOneModel,
ClientDeleteResult,
ClientInsertOneModel,
ClientInsertOneResult,
ClientReplaceOneModel,
ClientUpdateManyModel,
ClientUpdateOneModel,
ClientUpdateResult,
ClientWriteModel
} from './operations/client_bulk_write/common';
export type {
CollationOptions,
CommandOperation,
19 changes: 19 additions & 0 deletions src/mongo_client.ts
@@ -30,6 +30,12 @@ import {
SeverityLevel
} from './mongo_logger';
import { TypedEventEmitter } from './mongo_types';
import {
type AnyClientBulkWriteModel,
type ClientBulkWriteOptions,
type ClientBulkWriteResult
} from './operations/client_bulk_write/common';
import { ClientBulkWriteExecutor } from './operations/client_bulk_write/executor';
import { executeOperation } from './operations/execute_operation';
import { RunAdminCommandOperation } from './operations/run_command';
import type { ReadConcern, ReadConcernLevel, ReadConcernLike } from './read_concern';
@@ -477,6 +483,19 @@ export class MongoClient extends TypedEventEmitter<MongoClientEvents> implements
return this.s.bsonOptions;
}

/**
* Executes a client bulk write operation, available on server 8.0+.
* @param models - The client bulk write models.
* @param options - The client bulk write options.
* @returns A ClientBulkWriteResult for acknowledged writes, or { ok: 1 } for unacknowledged writes.
*/
async bulkWrite(
models: AnyClientBulkWriteModel[],
options?: ClientBulkWriteOptions
): Promise<ClientBulkWriteResult | { ok: 1 }> {
return await new ClientBulkWriteExecutor(this, models, options).execute();
}

/**
* Connect to MongoDB using a url
*
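A usage sketch for the new `MongoClient.bulkWrite` method above. The exact model shapes are defined in `operations/client_bulk_write/common` (not shown in this diff); the shapes below follow the drivers' client bulk write specification and are illustrative only.

```ts
import { MongoClient } from 'mongodb';

const client = new MongoClient('mongodb://localhost:27017'); // placeholder URI

async function run() {
  const result = await client.bulkWrite([
    // Illustrative model shapes; each model names its target namespace.
    { namespace: 'test.coll', name: 'insertOne', document: { x: 1 } },
    { namespace: 'test.other', name: 'deleteOne', filter: { y: 2 } }
  ]);
  // For acknowledged writes this is a ClientBulkWriteResult with the summary counts;
  // for unacknowledged writes it is { ok: 1 }.
  console.log(result);
}
```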
45 changes: 45 additions & 0 deletions src/operations/client_bulk_write/client_bulk_write.ts
@@ -0,0 +1,45 @@
import { type Document } from 'bson';

import { ClientBulkWriteCursorResponse } from '../../cmap/wire_protocol/responses';
import type { Server } from '../../sdam/server';
import type { ClientSession } from '../../sessions';
import { MongoDBNamespace } from '../../utils';
import { CommandOperation } from '../command';
import { Aspect, defineAspects } from '../operation';
import { type ClientBulkWriteOptions } from './common';

/**
* Executes a single client bulk write operation within a potential batch.
* @internal
*/
export class ClientBulkWriteOperation extends CommandOperation<ClientBulkWriteCursorResponse> {
command: Document;
override options: ClientBulkWriteOptions;

override get commandName() {
return 'bulkWrite' as const;
}

constructor(command: Document, options: ClientBulkWriteOptions) {
super(undefined, options);
this.command = command;
this.options = options;
this.ns = new MongoDBNamespace('admin', '$cmd');
}

/**
* Execute the command. Superclass will handle write concern, etc.
* @param server - The server.
* @param session - The session.
* @returns The response.
*/
override async execute(
server: Server,
session: ClientSession | undefined
): Promise<ClientBulkWriteCursorResponse> {
return await super.executeCommand(server, session, this.command, ClientBulkWriteCursorResponse);
}
}

// Skipping the collation as it goes on the individual ops.
defineAspects(ClientBulkWriteOperation, [Aspect.WRITE_OPERATION, Aspect.SKIP_COLLATION]);
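For orientation, an illustrative (hand-assembled) command document of the kind the batching code would hand to ClientBulkWriteOperation; the exact fields and their construction live in the batch builder, not in this file, and the values here are placeholders.

```ts
// ops and nsInfo are sent as OP_MSG document sequences built with DocumentSequence;
// the insert index in each op refers to an entry in nsInfo.
const command = {
  bulkWrite: 1,
  errorsOnly: false,
  ordered: true,
  ops: new DocumentSequence('ops', [{ insert: 0, document: { x: 1 } }]),
  nsInfo: new DocumentSequence('nsInfo', [{ ns: 'test.coll' }])
};
// new ClientBulkWriteOperation(command, options) then executes it against admin.$cmd
// and yields a ClientBulkWriteCursorResponse.
```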