Skip to content

Commit

Permalink
feat(NODE-6337): implement client bulk write batching
Browse files Browse the repository at this point in the history
  • Loading branch information
durran committed Sep 24, 2024
1 parent c7fc4e2 commit fb4fc98
Show file tree
Hide file tree
Showing 9 changed files with 438 additions and 192 deletions.
77 changes: 60 additions & 17 deletions src/cmap/commands.ts
Original file line number Diff line number Diff line change
Expand Up @@ -429,10 +429,67 @@ export interface OpMsgOptions {

/** @internal */
export class DocumentSequence {
field: string;
documents: Document[];
serializedDocumentsLength: number;
private chunks: Uint8Array[];
private header?: Buffer;

constructor(documents: Document[]) {
this.documents = documents;
/**
* Create a new document sequence for the provided field.
* @param field - The field it will replace.
*/
constructor(field: string, documents?: Document[]) {
this.field = field;
this.documents = [];
this.chunks = [];
this.serializedDocumentsLength = 0;
this.init();
if (documents) {
for (const doc of documents) {
this.push(doc, BSON.serialize(doc));
}
}
}

/**
* Initialize the buffer chunks.
*/
private init() {
// Document sequences starts with type 1 at the first byte.
const buffer = Buffer.allocUnsafe(1 + 4 + this.field.length + 1);
buffer[0] = 1;
// Third part is the field name at offset 5 with trailing null byte.
encodeUTF8Into(buffer, `${this.field}\0`, 5);
this.chunks.push(buffer);
this.header = buffer;
}

/**
* Push a document to the document sequence. Will serialize the document
* as well and return the current serialized length of all documents.
* @param document - The document to add.
* @param buffer - The serialized document in raw BSON.
* @returns The serialized documents length.
*/
push(document: Document, buffer: Uint8Array): number {
this.serializedDocumentsLength += buffer.length;
// Push the document.
this.documents.push(document);
// Push the document raw bson.
this.chunks.push(buffer);
// Write the new length.
this.header?.writeInt32LE(4 + this.field.length + 1 + this.serializedDocumentsLength, 1);
return this.serializedDocumentsLength;
}

/**
* Get the fully serialized bytes for the document sequence section.
* @returns The section bytes.
*/
toBin(): Uint8Array {
// TODO: What to do if no documents?
return Buffer.concat(this.chunks);
}
}

Expand Down Expand Up @@ -543,21 +600,7 @@ export class OpMsgRequest {
const chunks = [];
for (const [key, value] of Object.entries(document)) {
if (value instanceof DocumentSequence) {
// Document sequences starts with type 1 at the first byte.
const buffer = Buffer.allocUnsafe(1 + 4 + key.length + 1);
buffer[0] = 1;
// Third part is the field name at offset 5 with trailing null byte.
encodeUTF8Into(buffer, `${key}\0`, 5);
chunks.push(buffer);
// Fourth part are the documents' bytes.
let docsLength = 0;
for (const doc of value.documents) {
const docBson = this.serializeBson(doc);
docsLength += docBson.length;
chunks.push(docBson);
}
// Second part of the sequence is the length at offset 1;
buffer.writeInt32LE(4 + key.length + 1 + docsLength, 1);
chunks.push(value.toBin());
// Why are we removing the field from the command? This is because it needs to be
// removed in the OP_MSG request first section, and DocumentSequence is not a
// BSON type and is specific to the MongoDB wire protocol so there's nothing
Expand Down
99 changes: 87 additions & 12 deletions src/operations/client_bulk_write/command_builder.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { type Document } from '../../bson';
import { BSON, type Document } from '../../bson';
import { DocumentSequence } from '../../cmap/commands';
import { type PkFactory } from '../../mongo_client';
import type { Filter, OptionalId, UpdateFilter, WithoutId } from '../../mongo_types';
Expand Down Expand Up @@ -28,6 +28,11 @@ export interface ClientBulkWriteCommand {
comment?: any;
}

/**
* The bytes overhead for the extra fields added post command generation.
*/
const MESSAGE_OVERHEAD_BYTES = 1000;

/** @internal */
export class ClientBulkWriteCommandBuilder {
models: AnyClientBulkWriteModel[];
Expand Down Expand Up @@ -62,32 +67,101 @@ export class ClientBulkWriteCommandBuilder {
/**
* Build the bulk write commands from the models.
*/
buildCommands(): ClientBulkWriteCommand[] {
buildCommands(maxMessageSizeBytes?: number): ClientBulkWriteCommand[] {
// If we don't know the maxMessageSizeBytes or for some reason it's 0
// then we cannot calculate the batch.
if (!maxMessageSizeBytes) {
throw new Error('');
}

// Iterate the models to build the ops and nsInfo fields.
const operations = [];
// We need to do this in a loop which creates one command each up
// to the max bson size or max message size.
const commands: ClientBulkWriteCommand[] = [];
let currentCommandLength = MESSAGE_OVERHEAD_BYTES;
let currentNamespaceIndex = 0;
let currentCommand: ClientBulkWriteCommand = this.baseCommand();
const namespaces = new Map<string, number>();
for (const model of this.models) {
for (const [modelIndex, model] of this.models.entries()) {
const ns = model.namespace;
const index = namespaces.get(ns);
if (index != null) {
operations.push(buildOperation(model, index, this.pkFactory));
// Pushing to the ops document sequence returns the bytes length added.
const operation = buildOperation(model, index, this.pkFactory);
const operationBuffer = BSON.serialize(operation);

// Check if the operation buffer can fit in the current command. If it can,
// then add the operation to the document sequence and increment the
// current length.
if (currentCommandLength + operationBuffer.length < maxMessageSizeBytes) {
// Pushing to the ops document sequence returns the bytes length added.
const opsLength = currentCommand.ops.push(operation, operationBuffer);
currentCommandLength += opsLength;

// If this is the last model in the array, push the current command.
if (modelIndex === this.models.length - 1) {
commands.push(currentCommand);
}
} else {
// We need to batch. Push the current command to the commands
// array and create a new current command if there are more models
// that need to be iterated.
commands.push(currentCommand);
if (modelIndex < this.models.length - 1) {
currentCommand = this.baseCommand();
}
}
} else {
namespaces.set(ns, currentNamespaceIndex);
operations.push(buildOperation(model, currentNamespaceIndex, this.pkFactory));
currentNamespaceIndex++;
const nsInfo = { ns: ns };
const nsInfoBuffer = BSON.serialize(nsInfo);
const operation = buildOperation(model, currentNamespaceIndex, this.pkFactory);
const operationBuffer = BSON.serialize(operation);

// Check if the operation and nsInfo buffers can fit in the command. If they
// can, then add the operation and nsInfo to their respective document
// sequences and increment the current length.
if (
currentCommandLength + nsInfoBuffer.length + operationBuffer.length <
maxMessageSizeBytes
) {
// Pushing to the nsInfo document sequence returns the bytes length added.
const nsInfoLength = currentCommand.nsInfo.push(nsInfo, nsInfoBuffer);
currentCommandLength += nsInfoLength;

// Pushing to the ops document sequence returns the bytes length added.
const opsLength = currentCommand.ops.push(operation, operationBuffer);
currentCommandLength += opsLength;

// We've added a new namespace, increment the namespace index.
currentNamespaceIndex++;

// If this is the last model in the array, push the current command.
if (modelIndex === this.models.length - 1) {
commands.push(currentCommand);
}
} else {
// We need to batch. Push the current command to the commands
// array and create a new current command if there are more models
// that need to be iterated.
commands.push(currentCommand);
if (modelIndex < this.models.length - 1) {
currentCommand = this.baseCommand();
}
}
}
}

const nsInfo = Array.from(namespaces.keys(), ns => ({ ns }));
return commands;
}

// The base command.
private baseCommand(): ClientBulkWriteCommand {
const command: ClientBulkWriteCommand = {
bulkWrite: 1,
errorsOnly: this.errorsOnly,
ordered: this.options.ordered ?? true,
ops: new DocumentSequence(operations),
nsInfo: new DocumentSequence(nsInfo)
ops: new DocumentSequence('ops'),
nsInfo: new DocumentSequence('nsInfo')
};
// Add bypassDocumentValidation if it was present in the options.
if (this.options.bypassDocumentValidation != null) {
Expand All @@ -103,7 +177,8 @@ export class ClientBulkWriteCommandBuilder {
if (this.options.comment !== undefined) {
command.comment = this.options.comment;
}
return [command];

return command;
}
}

Expand Down
10 changes: 8 additions & 2 deletions src/operations/client_bulk_write/executor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,9 @@ export class ClientBulkWriteExecutor {
this.options,
pkFactory
);
const commands = commandBuilder.buildCommands();
const commands = commandBuilder.buildCommands(
this.client.topology?.description.maxMessageSizeBytes
);
if (this.options.writeConcern?.w === 0) {
return await executeUnacknowledged(this.client, this.options, commands);
}
Expand All @@ -75,10 +77,14 @@ async function executeAcknowledged(
): Promise<ClientBulkWriteResult> {
const resultsMerger = new ClientBulkWriteResultsMerger(options);
// For each command will will create and exhaust a cursor for the results.
let currentBatchOffset = 0;
for (const command of commands) {
const cursor = new ClientBulkWriteCursor(client, command, options);
const docs = await cursor.toArray();
resultsMerger.merge(command.ops.documents, cursor.response, docs);
const operations = command.ops.documents;
resultsMerger.merge(currentBatchOffset, operations, cursor.response, docs);
// Set the new batch index so we can back back to the index in the original models.
currentBatchOffset += operations.length;
}
return resultsMerger.result;
}
Expand Down
12 changes: 9 additions & 3 deletions src/operations/client_bulk_write/results_merger.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,13 @@ export class ClientBulkWriteResultsMerger {

/**
* Merge the results in the cursor to the existing result.
* @param currentBatchOffset - The offset index to the original models.
* @param response - The cursor response.
* @param documents - The documents in the cursor.
* @returns The current result.
*/
merge(
currentBatchOffset: number,
operations: Document[],
response: ClientBulkWriteCursorResponse,
documents: Document[]
Expand All @@ -67,7 +69,9 @@ export class ClientBulkWriteResultsMerger {
const operation = operations[document.idx];
// Handle insert results.
if ('insert' in operation) {
this.result.insertResults?.set(document.idx, { insertedId: operation.document._id });
this.result.insertResults?.set(document.idx + currentBatchOffset, {
insertedId: operation.document._id
});
}
// Handle update results.
if ('update' in operation) {
Expand All @@ -81,11 +85,13 @@ export class ClientBulkWriteResultsMerger {
if (document.upserted) {
result.upsertedId = document.upserted._id;
}
this.result.updateResults?.set(document.idx, result);
this.result.updateResults?.set(document.idx + currentBatchOffset, result);
}
// Handle delete results.
if ('delete' in operation) {
this.result.deleteResults?.set(document.idx, { deletedCount: document.n });
this.result.deleteResults?.set(document.idx + currentBatchOffset, {
deletedCount: document.n
});
}
}
}
Expand Down
3 changes: 3 additions & 0 deletions src/sdam/server_description.ts
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,8 @@ export class ServerDescription {
setVersion: number | null;
electionId: ObjectId | null;
logicalSessionTimeoutMinutes: number | null;
/** The max message size in bytes for the server. */
maxMessageSizeBytes: number;

// NOTE: does this belong here? It seems we should gossip the cluster time at the CMAP level
$clusterTime?: ClusterTime;
Expand Down Expand Up @@ -111,6 +113,7 @@ export class ServerDescription {
this.setVersion = hello?.setVersion ?? null;
this.electionId = hello?.electionId ?? null;
this.logicalSessionTimeoutMinutes = hello?.logicalSessionTimeoutMinutes ?? null;
this.maxMessageSizeBytes = hello?.maxMessageSizeBytes ?? 0;
this.primary = hello?.primary ?? null;
this.me = hello?.me?.toLowerCase() ?? null;
this.$clusterTime = hello?.$clusterTime ?? null;
Expand Down
11 changes: 11 additions & 0 deletions src/sdam/topology_description.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ export class TopologyDescription {
heartbeatFrequencyMS: number;
localThresholdMS: number;
commonWireVersion: number;
maxMessageSizeBytes?: number;

/**
* Create a TopologyDescription
Expand Down Expand Up @@ -71,6 +72,16 @@ export class TopologyDescription {

// determine server compatibility
for (const serverDescription of this.servers.values()) {
// Find the lowest maxMessageSizeBytes from all the servers.
if (this.maxMessageSizeBytes == null) {
this.maxMessageSizeBytes = serverDescription.maxMessageSizeBytes;
} else {
this.maxMessageSizeBytes = Math.min(
this.maxMessageSizeBytes,
serverDescription.maxMessageSizeBytes
);
}

// Load balancer mode is always compatible.
if (
serverDescription.type === ServerType.Unknown ||
Expand Down
6 changes: 3 additions & 3 deletions test/unit/cmap/commands.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ describe('commands', function () {
context('when there is one document sequence', function () {
const command = {
test: 1,
field: new DocumentSequence([{ test: 1 }])
field: new DocumentSequence('field', [{ test: 1 }])
};
const msg = new OpMsgRequest('admin', command, {});
const buffers = msg.toBin();
Expand Down Expand Up @@ -53,8 +53,8 @@ describe('commands', function () {
context('when there are multiple document sequences', function () {
const command = {
test: 1,
fieldOne: new DocumentSequence([{ test: 1 }]),
fieldTwo: new DocumentSequence([{ test: 1 }])
fieldOne: new DocumentSequence('fieldOne', [{ test: 1 }]),
fieldTwo: new DocumentSequence('fieldTwo', [{ test: 1 }])
};
const msg = new OpMsgRequest('admin', command, {});
const buffers = msg.toBin();
Expand Down
Loading

0 comments on commit fb4fc98

Please sign in to comment.