Skip to content

Commit

Permalink
feat(NODE-6329): client bulk write happy path (#4206)
Browse files Browse the repository at this point in the history
  • Loading branch information
durran authored Sep 25, 2024
1 parent 643a875 commit 3d3da40
Show file tree
Hide file tree
Showing 49 changed files with 2,875 additions and 81 deletions.
18 changes: 16 additions & 2 deletions src/cmap/command_monitoring_events.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,12 @@ import {
LEGACY_HELLO_COMMAND_CAMEL_CASE
} from '../constants';
import { calculateDurationInMs, deepCopy } from '../utils';
import { OpMsgRequest, type OpQueryRequest, type WriteProtocolMessageType } from './commands';
import {
DocumentSequence,
OpMsgRequest,
type OpQueryRequest,
type WriteProtocolMessageType
} from './commands';
import type { Connection } from './connection';

/**
Expand Down Expand Up @@ -249,7 +254,16 @@ const OP_QUERY_KEYS = [
/** Extract the actual command from the query, possibly up-converting if it's a legacy format */
function extractCommand(command: WriteProtocolMessageType): Document {
if (command instanceof OpMsgRequest) {
return deepCopy(command.command);
const cmd = deepCopy(command.command);
// For OP_MSG with payload type 1 we need to pull the documents
// array out of the document sequence for monitoring.
if (cmd.ops instanceof DocumentSequence) {
cmd.ops = cmd.ops.documents;
}
if (cmd.nsInfo instanceof DocumentSequence) {
cmd.nsInfo = cmd.nsInfo.documents;
}
return cmd;
}

if (command.query?.$query) {
Expand Down
8 changes: 4 additions & 4 deletions src/cmap/commands.ts
Original file line number Diff line number Diff line change
Expand Up @@ -544,10 +544,10 @@ export class OpMsgRequest {
for (const [key, value] of Object.entries(document)) {
if (value instanceof DocumentSequence) {
// Document sequences starts with type 1 at the first byte.
const buffer = Buffer.allocUnsafe(1 + 4 + key.length);
const buffer = Buffer.allocUnsafe(1 + 4 + key.length + 1);
buffer[0] = 1;
// Third part is the field name at offset 5.
encodeUTF8Into(buffer, key, 5);
// Third part is the field name at offset 5 with trailing null byte.
encodeUTF8Into(buffer, `${key}\0`, 5);
chunks.push(buffer);
// Fourth part are the documents' bytes.
let docsLength = 0;
Expand All @@ -557,7 +557,7 @@ export class OpMsgRequest {
chunks.push(docBson);
}
// Second part of the sequence is the length at offset 1;
buffer.writeInt32LE(key.length + docsLength, 1);
buffer.writeInt32LE(4 + key.length + 1 + docsLength, 1);
// Why are we removing the field from the command? This is because it needs to be
// removed in the OP_MSG request first section, and DocumentSequence is not a
// BSON type and is specific to the MongoDB wire protocol so there's nothing
Expand Down
26 changes: 26 additions & 0 deletions src/cmap/wire_protocol/responses.ts
Original file line number Diff line number Diff line change
Expand Up @@ -329,3 +329,29 @@ export class ExplainedCursorResponse extends CursorResponse {
return this.toObject(options);
}
}

/**
* Client bulk writes have some extra metadata at the top level that needs to be
* included in the result returned to the user.
*/
export class ClientBulkWriteCursorResponse extends CursorResponse {
get insertedCount() {
return this.get('nInserted', BSONType.int, true);
}

get upsertedCount() {
return this.get('nUpserted', BSONType.int, true);
}

get matchedCount() {
return this.get('nMatched', BSONType.int, true);
}

get modifiedCount() {
return this.get('nModified', BSONType.int, true);
}

get deletedCount() {
return this.get('nDeleted', BSONType.int, true);
}
}
73 changes: 73 additions & 0 deletions src/cursor/client_bulk_write_cursor.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
import type { Document } from '../bson';
import { type ClientBulkWriteCursorResponse } from '../cmap/wire_protocol/responses';
import { MongoBulkWriteCursorError } from '../error';
import type { MongoClient } from '../mongo_client';
import { ClientBulkWriteOperation } from '../operations/client_bulk_write/client_bulk_write';
import { type ClientBulkWriteOptions } from '../operations/client_bulk_write/common';
import { executeOperation } from '../operations/execute_operation';
import type { ClientSession } from '../sessions';
import { mergeOptions, MongoDBNamespace } from '../utils';
import {
AbstractCursor,
type AbstractCursorOptions,
type InitialCursorResponse
} from './abstract_cursor';

/** @public */
export interface ClientBulkWriteCursorOptions
extends Omit<AbstractCursorOptions, 'maxAwaitTimeMS' | 'tailable' | 'awaitData'>,
ClientBulkWriteOptions {}

/**
* This is the cursor that handles client bulk write operations. Note this is never
* exposed directly to the user and is always immediately exhausted.
* @internal
*/
export class ClientBulkWriteCursor extends AbstractCursor {
public readonly command: Document;
/** @internal */
private cursorResponse?: ClientBulkWriteCursorResponse;
/** @internal */
private clientBulkWriteOptions: ClientBulkWriteOptions;

/** @internal */
constructor(client: MongoClient, command: Document, options: ClientBulkWriteOptions = {}) {
super(client, new MongoDBNamespace('admin', '$cmd'), options);

this.command = command;
this.clientBulkWriteOptions = options;
}

/**
* We need a way to get the top level cursor response fields for
* generating the bulk write result, so we expose this here.
*/
get response(): ClientBulkWriteCursorResponse {
if (this.cursorResponse) return this.cursorResponse;
throw new MongoBulkWriteCursorError(
'No client bulk write cursor response returned from the server.'
);
}

clone(): ClientBulkWriteCursor {
const clonedOptions = mergeOptions({}, this.clientBulkWriteOptions);
delete clonedOptions.session;
return new ClientBulkWriteCursor(this.client, this.command, {
...clonedOptions
});
}

/** @internal */
async _initialize(session: ClientSession): Promise<InitialCursorResponse> {
const clientBulkWriteOperation = new ClientBulkWriteOperation(this.command, {
...this.clientBulkWriteOptions,
...this.cursorOptions,
session
});

const response = await executeOperation(this.client, clientBulkWriteOperation);
this.cursorResponse = response;

return { server: clientBulkWriteOperation.server, session, response };
}
}
27 changes: 27 additions & 0 deletions src/error.ts
Original file line number Diff line number Diff line change
Expand Up @@ -616,6 +616,33 @@ export class MongoGCPError extends MongoOIDCError {
}
}

/**
* An error indicating that an error occurred when processing bulk write results.
*
* @public
* @category Error
*/
export class MongoBulkWriteCursorError extends MongoRuntimeError {
/**
* **Do not use this constructor!**
*
* Meant for internal use only.
*
* @remarks
* This class is only meant to be constructed within the driver. This constructor is
* not subject to semantic versioning compatibility guarantees and may change at any time.
*
* @public
**/
constructor(message: string) {
super(message);
}

override get name(): string {
return 'MongoBulkWriteCursorError';
}
}

/**
* An error generated when a ChangeStream operation fails to execute.
*
Expand Down
16 changes: 16 additions & 0 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ export {
MongoAWSError,
MongoAzureError,
MongoBatchReExecutionError,
MongoBulkWriteCursorError,
MongoChangeStreamError,
MongoCompatibilityError,
MongoCursorExhaustedError,
Expand Down Expand Up @@ -473,6 +474,21 @@ export type {
AggregateOptions,
DB_AGGREGATE_COLLECTION
} from './operations/aggregate';
export type {
AnyClientBulkWriteModel,
ClientBulkWriteOptions,
ClientBulkWriteResult,
ClientDeleteManyModel,
ClientDeleteOneModel,
ClientDeleteResult,
ClientInsertOneModel,
ClientInsertOneResult,
ClientReplaceOneModel,
ClientUpdateManyModel,
ClientUpdateOneModel,
ClientUpdateResult,
ClientWriteModel
} from './operations/client_bulk_write/common';
export type {
CollationOptions,
CommandOperation,
Expand Down
19 changes: 19 additions & 0 deletions src/mongo_client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,12 @@ import {
SeverityLevel
} from './mongo_logger';
import { TypedEventEmitter } from './mongo_types';
import {
type AnyClientBulkWriteModel,
type ClientBulkWriteOptions,
type ClientBulkWriteResult
} from './operations/client_bulk_write/common';
import { ClientBulkWriteExecutor } from './operations/client_bulk_write/executor';
import { executeOperation } from './operations/execute_operation';
import { RunAdminCommandOperation } from './operations/run_command';
import type { ReadConcern, ReadConcernLevel, ReadConcernLike } from './read_concern';
Expand Down Expand Up @@ -477,6 +483,19 @@ export class MongoClient extends TypedEventEmitter<MongoClientEvents> implements
return this.s.bsonOptions;
}

/**
* Executes a client bulk write operation, available on server 8.0+.
* @param models - The client bulk write models.
* @param options - The client bulk write options.
* @returns A ClientBulkWriteResult for acknowledged writes and ok: 1 for unacknowledged writes.
*/
async bulkWrite(
models: AnyClientBulkWriteModel[],
options?: ClientBulkWriteOptions
): Promise<ClientBulkWriteResult | { ok: 1 }> {
return await new ClientBulkWriteExecutor(this, models, options).execute();
}

/**
* Connect to MongoDB using a url
*
Expand Down
45 changes: 45 additions & 0 deletions src/operations/client_bulk_write/client_bulk_write.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
import { type Document } from 'bson';

import { ClientBulkWriteCursorResponse } from '../../cmap/wire_protocol/responses';
import type { Server } from '../../sdam/server';
import type { ClientSession } from '../../sessions';
import { MongoDBNamespace } from '../../utils';
import { CommandOperation } from '../command';
import { Aspect, defineAspects } from '../operation';
import { type ClientBulkWriteOptions } from './common';

/**
* Executes a single client bulk write operation within a potential batch.
* @internal
*/
export class ClientBulkWriteOperation extends CommandOperation<ClientBulkWriteCursorResponse> {
command: Document;
override options: ClientBulkWriteOptions;

override get commandName() {
return 'bulkWrite' as const;
}

constructor(command: Document, options: ClientBulkWriteOptions) {
super(undefined, options);
this.command = command;
this.options = options;
this.ns = new MongoDBNamespace('admin', '$cmd');
}

/**
* Execute the command. Superclass will handle write concern, etc.
* @param server - The server.
* @param session - The session.
* @returns The response.
*/
override async execute(
server: Server,
session: ClientSession | undefined
): Promise<ClientBulkWriteCursorResponse> {
return await super.executeCommand(server, session, this.command, ClientBulkWriteCursorResponse);
}
}

// Skipping the collation as it goes on the individual ops.
defineAspects(ClientBulkWriteOperation, [Aspect.WRITE_OPERATION, Aspect.SKIP_COLLATION]);
Loading

0 comments on commit 3d3da40

Please sign in to comment.