From fb4fc9832d8043cadb35f5df0fdcd5e5fa7ffa36 Mon Sep 17 00:00:00 2001
From: Durran Jordan
Date: Thu, 19 Sep 2024 16:00:55 +0200
Subject: [PATCH] feat(NODE-6337): implement client bulk write batching

---
 src/cmap/commands.ts                          |  77 +++-
 .../client_bulk_write/command_builder.ts      |  99 ++++-
 src/operations/client_bulk_write/executor.ts  |  10 +-
 .../client_bulk_write/results_merger.ts       |  12 +-
 src/sdam/server_description.ts                |   3 +
 src/sdam/topology_description.ts              |  11 +
 test/unit/cmap/commands.test.ts               |   6 +-
 .../client_bulk_write/command_builder.test.ts |  10 +-
 .../client_bulk_write/results_merger.test.ts  | 402 +++++++++++-------
 9 files changed, 438 insertions(+), 192 deletions(-)

diff --git a/src/cmap/commands.ts b/src/cmap/commands.ts
index 9322fc53414..21e443a1592 100644
--- a/src/cmap/commands.ts
+++ b/src/cmap/commands.ts
@@ -429,10 +429,67 @@ export interface OpMsgOptions {
 /** @internal */
 export class DocumentSequence {
+  field: string;
   documents: Document[];
+  serializedDocumentsLength: number;
+  private chunks: Uint8Array[];
+  private header?: Buffer;

-  constructor(documents: Document[]) {
-    this.documents = documents;
+  /**
+   * Create a new document sequence for the provided field.
+   * @param field - The field it will replace.
+   */
+  constructor(field: string, documents?: Document[]) {
+    this.field = field;
+    this.documents = [];
+    this.chunks = [];
+    this.serializedDocumentsLength = 0;
+    this.init();
+    if (documents) {
+      for (const doc of documents) {
+        this.push(doc, BSON.serialize(doc));
+      }
+    }
+  }
+
+  /**
+   * Initialize the buffer chunks.
+   */
+  private init() {
+    // Document sequences start with type 1 at the first byte.
+    const buffer = Buffer.allocUnsafe(1 + 4 + this.field.length + 1);
+    buffer[0] = 1;
+    // The field name goes at offset 5, with a trailing null byte.
+    encodeUTF8Into(buffer, `${this.field}\0`, 5);
+    this.chunks.push(buffer);
+    this.header = buffer;
+  }
+
+  /**
+   * Push a document to the document sequence. Will serialize the document
+   * as well and return the current serialized length of all documents.
+   * @param document - The document to add.
+   * @param buffer - The serialized document in raw BSON.
+   * @returns The serialized documents length.
+   */
+  push(document: Document, buffer: Uint8Array): number {
+    this.serializedDocumentsLength += buffer.length;
+    // Push the document.
+    this.documents.push(document);
+    // Push the document's raw BSON bytes.
+    this.chunks.push(buffer);
+    // Write the new length into the section header at offset 1.
+    this.header?.writeInt32LE(4 + this.field.length + 1 + this.serializedDocumentsLength, 1);
+    return this.serializedDocumentsLength;
+  }
+
+  /**
+   * Get the fully serialized bytes for the document sequence section.
+   * @returns The section bytes.
+   */
+  toBin(): Uint8Array {
+    // TODO: What to do if no documents?
+    return Buffer.concat(this.chunks);
   }
 }
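For readers unfamiliar with the wire format: `DocumentSequence` maintains an OP_MSG kind-1 (document sequence) section, which is a kind byte of `1`, an int32 size that includes itself, the identifier as a null-terminated cstring, and then the concatenated BSON documents. A minimal standalone sketch of that layout, using the `bson` package directly rather than the driver internals (and assuming an ASCII field name, matching the driver's use of `field.length`):

```ts
import { serialize, type Document } from 'bson';

// Sketch of the section layout DocumentSequence maintains:
// kind byte (1) | int32 size | identifier cstring | BSON documents...
function documentSequenceSection(field: string, docs: Document[]): Buffer {
  const payloads = docs.map(doc => Buffer.from(serialize(doc)));
  const docsLength = payloads.reduce((total, bytes) => total + bytes.length, 0);
  const header = Buffer.alloc(1 + 4 + field.length + 1);
  header[0] = 1; // kind 1: document sequence
  // The size covers the int32 itself, the cstring, and every document.
  header.writeInt32LE(4 + field.length + 1 + docsLength, 1);
  header.write(`${field}\0`, 5, 'utf8');
  return Buffer.concat([header, ...payloads]);
}
```

The class above keeps that int32 current incrementally on every `push`, so `toBin()` is a straight `Buffer.concat` with no re-serialization.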
@@ -543,21 +600,7 @@ export class OpMsgRequest {
     const chunks = [];
     for (const [key, value] of Object.entries(document)) {
       if (value instanceof DocumentSequence) {
-        // Document sequences starts with type 1 at the first byte.
-        const buffer = Buffer.allocUnsafe(1 + 4 + key.length + 1);
-        buffer[0] = 1;
-        // Third part is the field name at offset 5 with trailing null byte.
-        encodeUTF8Into(buffer, `${key}\0`, 5);
-        chunks.push(buffer);
-        // Fourth part are the documents' bytes.
-        let docsLength = 0;
-        for (const doc of value.documents) {
-          const docBson = this.serializeBson(doc);
-          docsLength += docBson.length;
-          chunks.push(docBson);
-        }
-        // Second part of the sequence is the length at offset 1;
-        buffer.writeInt32LE(4 + key.length + 1 + docsLength, 1);
+        chunks.push(value.toBin());
         // Why are we removing the field from the command? This is because it needs to be
         // removed in the OP_MSG request first section, and DocumentSequence is not a
         // BSON type and is specific to the MongoDB wire protocol so there's nothing
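From the caller's side the only visible change is that the field name now travels with the sequence. A quick usage sketch, mirroring the unit tests later in this patch (the import path is illustrative; these are `@internal` driver APIs):

```ts
import { DocumentSequence, OpMsgRequest } from './src/cmap/commands'; // driver internals

// `field` is stripped from the section-0 BSON body and emitted as its own
// kind-1 document sequence section, built by DocumentSequence#toBin().
const command = {
  test: 1,
  field: new DocumentSequence('field', [{ test: 1 }])
};
const buffers = new OpMsgRequest('admin', command, {}).toBin();
```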
diff --git a/src/operations/client_bulk_write/command_builder.ts b/src/operations/client_bulk_write/command_builder.ts
index ad7ab953605..46efac777c4 100644
--- a/src/operations/client_bulk_write/command_builder.ts
+++ b/src/operations/client_bulk_write/command_builder.ts
@@ -1,4 +1,4 @@
-import { type Document } from '../../bson';
+import { BSON, type Document } from '../../bson';
 import { DocumentSequence } from '../../cmap/commands';
 import { type PkFactory } from '../../mongo_client';
 import type { Filter, OptionalId, UpdateFilter, WithoutId } from '../../mongo_types';
@@ -28,6 +28,11 @@ export interface ClientBulkWriteCommand {
   comment?: any;
 }

+/**
+ * The bytes overhead for the extra fields added post command generation.
+ */
+const MESSAGE_OVERHEAD_BYTES = 1000;
+
 /** @internal */
 export class ClientBulkWriteCommandBuilder {
   models: AnyClientBulkWriteModel[];
@@ -62,32 +67,101 @@ export class ClientBulkWriteCommandBuilder {
   /**
    * Build the bulk write commands from the models.
    */
-  buildCommands(): ClientBulkWriteCommand[] {
+  buildCommands(maxMessageSizeBytes?: number): ClientBulkWriteCommand[] {
+    // If we don't know the maxMessageSizeBytes, or for some reason it's 0,
+    // then we cannot calculate the batches.
+    if (!maxMessageSizeBytes) {
+      throw new Error('Cannot calculate client bulk write batches without a max message size');
+    }
+
     // Iterate the models to build the ops and nsInfo fields.
-    const operations = [];
+    // We need to do this in a loop that creates one command at a time, up
+    // to the max bson size or max message size.
+    const commands: ClientBulkWriteCommand[] = [];
+    let currentCommandLength = MESSAGE_OVERHEAD_BYTES;
     let currentNamespaceIndex = 0;
+    let currentCommand: ClientBulkWriteCommand = this.baseCommand();
     const namespaces = new Map<string, number>();
-    for (const model of this.models) {
+    for (const [modelIndex, model] of this.models.entries()) {
       const ns = model.namespace;
       const index = namespaces.get(ns);
       if (index != null) {
-        operations.push(buildOperation(model, index, this.pkFactory));
+        // Build the operation and serialize it to get the bytes length.
+        const operation = buildOperation(model, index, this.pkFactory);
+        const operationBuffer = BSON.serialize(operation);
+
+        // Check if the operation buffer can fit in the current command. If it can,
+        // then add the operation to the document sequence and increment the
+        // current length.
+        if (currentCommandLength + operationBuffer.length < maxMessageSizeBytes) {
+          // Pushing to the ops document sequence returns the bytes length added.
+          const opsLength = currentCommand.ops.push(operation, operationBuffer);
+          currentCommandLength += opsLength;
+
+          // If this is the last model in the array, push the current command.
+          if (modelIndex === this.models.length - 1) {
+            commands.push(currentCommand);
+          }
+        } else {
+          // We need to batch. Push the current command to the commands
+          // array and create a new current command if there are more models
+          // that need to be iterated.
+          commands.push(currentCommand);
+          if (modelIndex < this.models.length - 1) {
+            currentCommand = this.baseCommand();
+          }
+        }
       } else {
         namespaces.set(ns, currentNamespaceIndex);
-        operations.push(buildOperation(model, currentNamespaceIndex, this.pkFactory));
-        currentNamespaceIndex++;
+        const nsInfo = { ns: ns };
+        const nsInfoBuffer = BSON.serialize(nsInfo);
+        const operation = buildOperation(model, currentNamespaceIndex, this.pkFactory);
+        const operationBuffer = BSON.serialize(operation);
+
+        // Check if the operation and nsInfo buffers can fit in the command. If they
+        // can, then add the operation and nsInfo to their respective document
+        // sequences and increment the current length.
+        if (
+          currentCommandLength + nsInfoBuffer.length + operationBuffer.length <
+          maxMessageSizeBytes
+        ) {
+          // Pushing to the nsInfo document sequence returns the bytes length added.
+          const nsInfoLength = currentCommand.nsInfo.push(nsInfo, nsInfoBuffer);
+          currentCommandLength += nsInfoLength;
+
+          // Pushing to the ops document sequence returns the bytes length added.
+          const opsLength = currentCommand.ops.push(operation, operationBuffer);
+          currentCommandLength += opsLength;
+
+          // We've added a new namespace, increment the namespace index.
+          currentNamespaceIndex++;
+
+          // If this is the last model in the array, push the current command.
+          if (modelIndex === this.models.length - 1) {
+            commands.push(currentCommand);
+          }
+        } else {
+          // We need to batch. Push the current command to the commands
+          // array and create a new current command if there are more models
+          // that need to be iterated.
+          commands.push(currentCommand);
+          if (modelIndex < this.models.length - 1) {
+            currentCommand = this.baseCommand();
+          }
+        }
       }
     }
-    const nsInfo = Array.from(namespaces.keys(), ns => ({ ns }));
+    return commands;
+  }

-    // The base command.
+  private baseCommand(): ClientBulkWriteCommand {
     const command: ClientBulkWriteCommand = {
       bulkWrite: 1,
       errorsOnly: this.errorsOnly,
       ordered: this.options.ordered ?? true,
-      ops: new DocumentSequence(operations),
-      nsInfo: new DocumentSequence(nsInfo)
+      ops: new DocumentSequence('ops'),
+      nsInfo: new DocumentSequence('nsInfo')
     };
     // Add bypassDocumentValidation if it was present in the options.
     if (this.options.bypassDocumentValidation != null) {
@@ -103,7 +177,8 @@ export class ClientBulkWriteCommandBuilder {
     if (this.options.comment !== undefined) {
       command.comment = this.options.comment;
     }
-    return [command];
+
+    return command;
   }
 }
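The batching above boils down to greedy packing of pre-serialized operations under a message-size budget. A simplified standalone sketch of that idea (not the driver's exact code; among other differences, this version carries the operation that overflowed into the next batch and ignores namespaces):

```ts
import { serialize, type Document } from 'bson';

const MESSAGE_OVERHEAD_BYTES = 1000; // headroom for fields added after command generation

// Split operations into batches whose serialized size stays under
// maxMessageSizeBytes, starting each batch's tally at the overhead.
function batchOperations(operations: Document[], maxMessageSizeBytes: number): Document[][] {
  const batches: Document[][] = [];
  let current: Document[] = [];
  let currentLength = MESSAGE_OVERHEAD_BYTES;
  for (const operation of operations) {
    const size = serialize(operation).length;
    if (current.length > 0 && currentLength + size >= maxMessageSizeBytes) {
      // The operation does not fit; seal this batch and start a new one.
      batches.push(current);
      current = [];
      currentLength = MESSAGE_OVERHEAD_BYTES;
    }
    current.push(operation);
    currentLength += size;
  }
  if (current.length > 0) batches.push(current);
  return batches;
}
```

Serializing each operation once and reusing the buffer for both the size check and the document sequence is what makes the single pass cheap.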
diff --git a/src/operations/client_bulk_write/executor.ts b/src/operations/client_bulk_write/executor.ts
index 74511ede9dd..a312bbf4390 100644
--- a/src/operations/client_bulk_write/executor.ts
+++ b/src/operations/client_bulk_write/executor.ts
@@ -57,7 +57,9 @@ export class ClientBulkWriteExecutor {
       this.options,
       pkFactory
     );
-    const commands = commandBuilder.buildCommands();
+    const commands = commandBuilder.buildCommands(
+      this.client.topology?.description.maxMessageSizeBytes
+    );
     if (this.options.writeConcern?.w === 0) {
       return await executeUnacknowledged(this.client, this.options, commands);
     }
@@ -75,10 +77,14 @@ async function executeAcknowledged(
 ): Promise<ClientBulkWriteResult> {
   const resultsMerger = new ClientBulkWriteResultsMerger(options);
   // For each command we will create and exhaust a cursor for the results.
+  let currentBatchOffset = 0;
   for (const command of commands) {
     const cursor = new ClientBulkWriteCursor(client, command, options);
     const docs = await cursor.toArray();
-    resultsMerger.merge(command.ops.documents, cursor.response, docs);
+    const operations = command.ops.documents;
+    resultsMerger.merge(currentBatchOffset, operations, cursor.response, docs);
+    // Advance the batch offset so we can map back to the index in the original models.
+    currentBatchOffset += operations.length;
   }
   return resultsMerger.result;
 }
diff --git a/src/operations/client_bulk_write/results_merger.ts b/src/operations/client_bulk_write/results_merger.ts
index f5fd0491516..c09c7d25830 100644
--- a/src/operations/client_bulk_write/results_merger.ts
+++ b/src/operations/client_bulk_write/results_merger.ts
@@ -42,11 +42,13 @@ export class ClientBulkWriteResultsMerger {

   /**
    * Merge the results in the cursor to the existing result.
+   * @param currentBatchOffset - The offset index to the original models.
    * @param response - The cursor response.
    * @param documents - The documents in the cursor.
    * @returns The current result.
    */
   merge(
+    currentBatchOffset: number,
     operations: Document[],
     response: ClientBulkWriteCursorResponse,
     documents: Document[]
@@ -67,7 +69,9 @@ export class ClientBulkWriteResultsMerger {
         const operation = operations[document.idx];
         // Handle insert results.
         if ('insert' in operation) {
-          this.result.insertResults?.set(document.idx, { insertedId: operation.document._id });
+          this.result.insertResults?.set(document.idx + currentBatchOffset, {
+            insertedId: operation.document._id
+          });
         }
         // Handle update results.
         if ('update' in operation) {
@@ -81,11 +85,13 @@ export class ClientBulkWriteResultsMerger {
           if (document.upserted) {
             result.upsertedId = document.upserted._id;
           }
-          this.result.updateResults?.set(document.idx, result);
+          this.result.updateResults?.set(document.idx + currentBatchOffset, result);
         }
         // Handle delete results.
         if ('delete' in operation) {
-          this.result.deleteResults?.set(document.idx, { deletedCount: document.n });
+          this.result.deleteResults?.set(document.idx + currentBatchOffset, {
+            deletedCount: document.n
+          });
         }
       }
     }
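The offset arithmetic is the whole trick: each batch's server response reports `idx` relative to that batch's own `ops` sequence, so adding `currentBatchOffset` re-bases it onto the caller's original array of models. A toy illustration (hypothetical helper, not a driver API):

```ts
// Re-base a per-batch result index onto the original models array.
function globalIndex(currentBatchOffset: number, idx: number): number {
  return currentBatchOffset + idx;
}

// With two batches of 20 and 5 models, the second batch's idx 2 is model 22.
console.log(globalIndex(0, 2)); // 2  (first batch)
console.log(globalIndex(20, 2)); // 22 (second batch)
```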
diff --git a/src/sdam/server_description.ts b/src/sdam/server_description.ts
index cd32f4968b6..1991454ed0e 100644
--- a/src/sdam/server_description.ts
+++ b/src/sdam/server_description.ts
@@ -69,6 +69,8 @@ export class ServerDescription {
   setVersion: number | null;
   electionId: ObjectId | null;
   logicalSessionTimeoutMinutes: number | null;
+  /** The max message size in bytes for the server. */
+  maxMessageSizeBytes: number;

   // NOTE: does this belong here? It seems we should gossip the cluster time at the CMAP level
   $clusterTime?: ClusterTime;
@@ -111,6 +113,7 @@ export class ServerDescription {
     this.setVersion = hello?.setVersion ?? null;
     this.electionId = hello?.electionId ?? null;
     this.logicalSessionTimeoutMinutes = hello?.logicalSessionTimeoutMinutes ?? null;
+    this.maxMessageSizeBytes = hello?.maxMessageSizeBytes ?? 0;
     this.primary = hello?.primary ?? null;
     this.me = hello?.me?.toLowerCase() ?? null;
     this.$clusterTime = hello?.$clusterTime ?? null;
diff --git a/src/sdam/topology_description.ts b/src/sdam/topology_description.ts
index 436321c7f1a..e31f0942c47 100644
--- a/src/sdam/topology_description.ts
+++ b/src/sdam/topology_description.ts
@@ -43,6 +43,7 @@ export class TopologyDescription {
   heartbeatFrequencyMS: number;
   localThresholdMS: number;
   commonWireVersion: number;
+  maxMessageSizeBytes?: number;

   /**
    * Create a TopologyDescription
@@ -71,6 +72,16 @@ export class TopologyDescription {

     // determine server compatibility
     for (const serverDescription of this.servers.values()) {
+      // Find the lowest maxMessageSizeBytes from all the servers.
+      if (this.maxMessageSizeBytes == null) {
+        this.maxMessageSizeBytes = serverDescription.maxMessageSizeBytes;
+      } else {
+        this.maxMessageSizeBytes = Math.min(
+          this.maxMessageSizeBytes,
+          serverDescription.maxMessageSizeBytes
+        );
+      }
+
       // Load balancer mode is always compatible.
       if (
         serverDescription.type === ServerType.Unknown ||
diff --git a/test/unit/cmap/commands.test.ts b/test/unit/cmap/commands.test.ts
index f4b3fdf0252..5725f5b2490 100644
--- a/test/unit/cmap/commands.test.ts
+++ b/test/unit/cmap/commands.test.ts
@@ -15,7 +15,7 @@ describe('commands', function () {
     context('when there is one document sequence', function () {
       const command = {
         test: 1,
-        field: new DocumentSequence([{ test: 1 }])
+        field: new DocumentSequence('field', [{ test: 1 }])
       };
       const msg = new OpMsgRequest('admin', command, {});
       const buffers = msg.toBin();
@@ -53,8 +53,8 @@ describe('commands', function () {
     context('when there are multiple document sequences', function () {
       const command = {
         test: 1,
-        fieldOne: new DocumentSequence([{ test: 1 }]),
-        fieldTwo: new DocumentSequence([{ test: 1 }])
+        fieldOne: new DocumentSequence('fieldOne', [{ test: 1 }]),
+        fieldTwo: new DocumentSequence('fieldTwo', [{ test: 1 }])
       };
       const msg = new OpMsgRequest('admin', command, {});
       const buffers = msg.toBin();
diff --git a/test/unit/operations/client_bulk_write/command_builder.test.ts b/test/unit/operations/client_bulk_write/command_builder.test.ts
index 6b34ef9a817..9e5ba536574 100644
--- a/test/unit/operations/client_bulk_write/command_builder.test.ts
+++ b/test/unit/operations/client_bulk_write/command_builder.test.ts
@@ -34,7 +34,7 @@ describe('ClientBulkWriteCommandBuilder', function () {
         ordered: false,
         comment: { bulk: 'write' }
       });
-      const commands = builder.buildCommands();
+      const commands = builder.buildCommands(48000000);

       it('sets the bulkWrite command', function () {
         expect(commands[0].bulkWrite).to.equal(1);
@@ -79,7 +79,7 @@ describe('ClientBulkWriteCommandBuilder', function () {
         document: { _id: id, name: 1 }
       };
       const builder = new ClientBulkWriteCommandBuilder([model], {});
-      const commands = builder.buildCommands();
+      const commands = builder.buildCommands(48000000);

       it('sets the bulkWrite command', function () {
         expect(commands[0].bulkWrite).to.equal(1);
@@ -122,7 +122,7 @@ describe('ClientBulkWriteCommandBuilder', function () {
         document: { _id: idTwo, name: 2 }
       };
       const builder = new ClientBulkWriteCommandBuilder([modelOne, modelTwo], {});
-      const commands = builder.buildCommands();
+      const commands = builder.buildCommands(48000000);

       it('sets the bulkWrite command', function () {
         expect(commands[0].bulkWrite).to.equal(1);
@@ -156,7 +156,7 @@ describe('ClientBulkWriteCommandBuilder', function () {
         document: { _id: idTwo, name: 2 }
      };
      const builder = new ClientBulkWriteCommandBuilder([modelOne, modelTwo], {});
-      const commands = builder.buildCommands();
+      const commands = 
builder.buildCommands(48000000); it('sets the bulkWrite command', function () { expect(commands[0].bulkWrite).to.equal(1); @@ -199,7 +199,7 @@ describe('ClientBulkWriteCommandBuilder', function () { document: { _id: idThree, name: 2 } }; const builder = new ClientBulkWriteCommandBuilder([modelOne, modelTwo, modelThree], {}); - const commands = builder.buildCommands(); + const commands = builder.buildCommands(48000000); it('sets the bulkWrite command', function () { expect(commands[0].bulkWrite).to.equal(1); diff --git a/test/unit/operations/client_bulk_write/results_merger.test.ts b/test/unit/operations/client_bulk_write/results_merger.test.ts index ec43843af65..44a3b34381a 100644 --- a/test/unit/operations/client_bulk_write/results_merger.test.ts +++ b/test/unit/operations/client_bulk_write/results_merger.test.ts @@ -28,180 +28,282 @@ describe('ClientBulkWriteResultsMerger', function () { describe('#merge', function () { context('when the bulk write is acknowledged', function () { - context('when requesting verbose results', function () { - // An example verbose response from the server without errors: - // { - // cursor: { - // id: Long('0'), - // firstBatch: [ { ok: 1, idx: 0, n: 1 }, { ok: 1, idx: 1, n: 1 } ], - // ns: 'admin.$cmd.bulkWrite' - // }, - // nErrors: 0, - // nInserted: 2, - // nMatched: 0, - // nModified: 0, - // nUpserted: 0, - // nDeleted: 0, - // ok: 1 - // } - context('when there are no errors', function () { - const operations = [ - { insert: 0, document: { _id: 1 } }, - { update: 0 }, - { update: 0 }, - { delete: 0 } - ]; - const documents = [ - { ok: 1, idx: 0, n: 1 }, // Insert - { ok: 1, idx: 1, n: 1, nModified: 1 }, // Update match - { ok: 1, idx: 2, n: 0, upserted: { _id: 1 } }, // Update no match with upsert - { ok: 1, idx: 3, n: 1 } // Delete - ]; - const serverResponse = { - cursor: { - id: new Long('0'), - firstBatch: documents, - ns: 'admin.$cmd.bulkWrite' - }, - nErrors: 0, - nInserted: 1, - nMatched: 1, - nModified: 1, - nUpserted: 1, - nDeleted: 1, - ok: 1 - }; - const response = new ClientBulkWriteCursorResponse(BSON.serialize(serverResponse), 0); - const merger = new ClientBulkWriteResultsMerger({ verboseResults: true }); - let result: ClientBulkWriteResult; - - before(function () { - result = merger.merge(operations, response, documents); - }); + context('when merging on the first batch', function () { + context('when requesting verbose results', function () { + // An example verbose response from the server without errors: + // { + // cursor: { + // id: Long('0'), + // firstBatch: [ { ok: 1, idx: 0, n: 1 }, { ok: 1, idx: 1, n: 1 } ], + // ns: 'admin.$cmd.bulkWrite' + // }, + // nErrors: 0, + // nInserted: 2, + // nMatched: 0, + // nModified: 0, + // nUpserted: 0, + // nDeleted: 0, + // ok: 1 + // } + context('when there are no errors', function () { + const operations = [ + { insert: 0, document: { _id: 1 } }, + { update: 0 }, + { update: 0 }, + { delete: 0 } + ]; + const documents = [ + { ok: 1, idx: 0, n: 1 }, // Insert + { ok: 1, idx: 1, n: 1, nModified: 1 }, // Update match + { ok: 1, idx: 2, n: 0, upserted: { _id: 1 } }, // Update no match with upsert + { ok: 1, idx: 3, n: 1 } // Delete + ]; + const serverResponse = { + cursor: { + id: new Long('0'), + firstBatch: documents, + ns: 'admin.$cmd.bulkWrite' + }, + nErrors: 0, + nInserted: 1, + nMatched: 1, + nModified: 1, + nUpserted: 1, + nDeleted: 1, + ok: 1 + }; + const response = new ClientBulkWriteCursorResponse(BSON.serialize(serverResponse), 0); + const merger = new 
ClientBulkWriteResultsMerger({ verboseResults: true }); + let result: ClientBulkWriteResult; - it('merges the inserted count', function () { - expect(result.insertedCount).to.equal(1); - }); + before(function () { + result = merger.merge(0, operations, response, documents); + }); - it('sets insert results', function () { - expect(result.insertResults.get(0).insertedId).to.equal(1); - }); + it('merges the inserted count', function () { + expect(result.insertedCount).to.equal(1); + }); - it('merges the upserted count', function () { - expect(result.upsertedCount).to.equal(1); - }); + it('sets insert results', function () { + expect(result.insertResults.get(0).insertedId).to.equal(1); + }); - it('merges the matched count', function () { - expect(result.matchedCount).to.equal(1); - }); + it('merges the upserted count', function () { + expect(result.upsertedCount).to.equal(1); + }); - it('merges the modified count', function () { - expect(result.modifiedCount).to.equal(1); - }); + it('merges the matched count', function () { + expect(result.matchedCount).to.equal(1); + }); - it('sets the update results', function () { - expect(result.updateResults.get(1)).to.deep.equal({ - matchedCount: 1, - modifiedCount: 1, - didUpsert: false + it('merges the modified count', function () { + expect(result.modifiedCount).to.equal(1); }); - }); - it('sets the upsert results', function () { - expect(result.updateResults.get(2)).to.deep.equal({ - matchedCount: 0, - modifiedCount: 0, - upsertedId: 1, - didUpsert: true + it('sets the update results', function () { + expect(result.updateResults.get(1)).to.deep.equal({ + matchedCount: 1, + modifiedCount: 1, + didUpsert: false + }); + }); + + it('sets the upsert results', function () { + expect(result.updateResults.get(2)).to.deep.equal({ + matchedCount: 0, + modifiedCount: 0, + upsertedId: 1, + didUpsert: true + }); }); - }); - it('merges the deleted count', function () { - expect(result.deletedCount).to.equal(1); + it('merges the deleted count', function () { + expect(result.deletedCount).to.equal(1); + }); + + it('sets the delete results', function () { + expect(result.deleteResults.get(3).deletedCount).to.equal(1); + }); }); + }); + + context('when not requesting verbose results', function () { + // An example verbose response from the server without errors: + // { + // cursor: { + // id: Long('0'), + // firstBatch: [], + // ns: 'admin.$cmd.bulkWrite' + // }, + // nErrors: 0, + // nInserted: 2, + // nMatched: 0, + // nModified: 0, + // nUpserted: 0, + // nDeleted: 0, + // ok: 1 + // } + context('when there are no errors', function () { + const operations = [ + { insert: 0, document: { _id: 1 } }, + { update: 0 }, + { update: 0 }, + { delete: 0 } + ]; + const documents = []; + const serverResponse = { + cursor: { + id: new Long('0'), + firstBatch: documents, + ns: 'admin.$cmd.bulkWrite' + }, + nErrors: 0, + nInserted: 1, + nMatched: 1, + nModified: 1, + nUpserted: 1, + nDeleted: 1, + ok: 1 + }; + const response = new ClientBulkWriteCursorResponse(BSON.serialize(serverResponse), 0); + const merger = new ClientBulkWriteResultsMerger({ verboseResults: false }); + let result: ClientBulkWriteResult; + + before(function () { + result = merger.merge(0, operations, response, documents); + }); - it('sets the delete results', function () { - expect(result.deleteResults.get(3).deletedCount).to.equal(1); + it('merges the inserted count', function () { + expect(result.insertedCount).to.equal(1); + }); + + it('sets no insert results', function () { + 
expect(result).to.not.have.property('insertResults'); + }); + + it('merges the upserted count', function () { + expect(result.upsertedCount).to.equal(1); + }); + + it('merges the matched count', function () { + expect(result.matchedCount).to.equal(1); + }); + + it('merges the modified count', function () { + expect(result.modifiedCount).to.equal(1); + }); + + it('sets no update results', function () { + expect(result).to.not.have.property('updateResults'); + }); + + it('merges the deleted count', function () { + expect(result.deletedCount).to.equal(1); + }); + + it('sets no delete results', function () { + expect(result).to.not.have.property('deleteResults'); + }); }); }); }); - context('when not requesting verbose results', function () { - // An example verbose response from the server without errors: - // { - // cursor: { - // id: Long('0'), - // firstBatch: [], - // ns: 'admin.$cmd.bulkWrite' - // }, - // nErrors: 0, - // nInserted: 2, - // nMatched: 0, - // nModified: 0, - // nUpserted: 0, - // nDeleted: 0, - // ok: 1 - // } - context('when there are no errors', function () { - const operations = [ - { insert: 0, document: { _id: 1 } }, - { update: 0 }, - { update: 0 }, - { delete: 0 } - ]; - const documents = []; - const serverResponse = { - cursor: { - id: new Long('0'), - firstBatch: documents, - ns: 'admin.$cmd.bulkWrite' - }, - nErrors: 0, - nInserted: 1, - nMatched: 1, - nModified: 1, - nUpserted: 1, - nDeleted: 1, - ok: 1 - }; - const response = new ClientBulkWriteCursorResponse(BSON.serialize(serverResponse), 0); - const merger = new ClientBulkWriteResultsMerger({ verboseResults: false }); - let result: ClientBulkWriteResult; - - before(function () { - result = merger.merge(operations, response, documents); - }); + context('when merging on a later batch', function () { + context('when requesting verbose results', function () { + // An example verbose response from the server without errors: + // { + // cursor: { + // id: Long('0'), + // firstBatch: [ { ok: 1, idx: 0, n: 1 }, { ok: 1, idx: 1, n: 1 } ], + // ns: 'admin.$cmd.bulkWrite' + // }, + // nErrors: 0, + // nInserted: 2, + // nMatched: 0, + // nModified: 0, + // nUpserted: 0, + // nDeleted: 0, + // ok: 1 + // } + context('when there are no errors', function () { + const operations = [ + { insert: 0, document: { _id: 1 } }, + { update: 0 }, + { update: 0 }, + { delete: 0 } + ]; + const documents = [ + { ok: 1, idx: 0, n: 1 }, // Insert + { ok: 1, idx: 1, n: 1, nModified: 1 }, // Update match + { ok: 1, idx: 2, n: 0, upserted: { _id: 1 } }, // Update no match with upsert + { ok: 1, idx: 3, n: 1 } // Delete + ]; + const serverResponse = { + cursor: { + id: new Long('0'), + firstBatch: documents, + ns: 'admin.$cmd.bulkWrite' + }, + nErrors: 0, + nInserted: 1, + nMatched: 1, + nModified: 1, + nUpserted: 1, + nDeleted: 1, + ok: 1 + }; + const response = new ClientBulkWriteCursorResponse(BSON.serialize(serverResponse), 0); + const merger = new ClientBulkWriteResultsMerger({ verboseResults: true }); + let result: ClientBulkWriteResult; - it('merges the inserted count', function () { - expect(result.insertedCount).to.equal(1); - }); + before(function () { + result = merger.merge(20, operations, response, documents); + }); - it('sets no insert results', function () { - expect(result.insertResults).to.equal(undefined); - }); + it('merges the inserted count', function () { + expect(result.insertedCount).to.equal(1); + }); - it('merges the upserted count', function () { - expect(result.upsertedCount).to.equal(1); - }); + it('sets 
insert results', function () { + expect(result.insertResults.get(20).insertedId).to.equal(1); + }); - it('merges the matched count', function () { - expect(result.matchedCount).to.equal(1); - }); + it('merges the upserted count', function () { + expect(result.upsertedCount).to.equal(1); + }); - it('merges the modified count', function () { - expect(result.modifiedCount).to.equal(1); - }); + it('merges the matched count', function () { + expect(result.matchedCount).to.equal(1); + }); - it('sets no update results', function () { - expect(result.updateResults).to.equal(undefined); - }); + it('merges the modified count', function () { + expect(result.modifiedCount).to.equal(1); + }); - it('merges the deleted count', function () { - expect(result.deletedCount).to.equal(1); - }); + it('sets the update results', function () { + expect(result.updateResults.get(21)).to.deep.equal({ + matchedCount: 1, + modifiedCount: 1, + didUpsert: false + }); + }); + + it('sets the upsert results', function () { + expect(result.updateResults.get(22)).to.deep.equal({ + matchedCount: 0, + modifiedCount: 0, + upsertedId: 1, + didUpsert: true + }); + }); - it('sets no delete results', function () { - expect(result.deleteResults).to.equal(undefined); + it('merges the deleted count', function () { + expect(result.deletedCount).to.equal(1); + }); + + it('sets the delete results', function () { + expect(result.deleteResults.get(23).deletedCount).to.equal(1); + }); }); }); });
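As a footnote on where `maxMessageSizeBytes` comes from: the `TopologyDescription` change earlier in this patch reduces it to the minimum advertised by any known server, which a standalone sketch makes plain (hypothetical shape, not the driver's SDAM types):

```ts
interface KnownServer {
  maxMessageSizeBytes: number;
}

// Sketch of the reduction in TopologyDescription: the effective message
// budget is the smallest value advertised by any server in the topology.
function minMaxMessageSizeBytes(servers: KnownServer[]): number | undefined {
  let min: number | undefined;
  for (const server of servers) {
    min = min == null ? server.maxMessageSizeBytes : Math.min(min, server.maxMessageSizeBytes);
  }
  return min;
}
```

One reading of the patch worth noting: a `ServerDescription` that has not yet reported a hello defaults `maxMessageSizeBytes` to 0, which drags this minimum to 0 and causes `buildCommands` to reject it as unknown.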