Skip to content

Commit

Permalink
fix(bulk): properly calculate batch size for bulk writes
Browse files Browse the repository at this point in the history
Bulk Writes were failing to account for the size of the array
index key when calculating the batch size in a bulk write.
We now calculate the size of the key based off of the max batch
size, and incorporate that number in to the byte size calculations.

Fixes NODE-1778
  • Loading branch information
daprahamian authored Jan 7, 2019
1 parent ff82ff4 commit aafe71b
Show file tree
Hide file tree
Showing 4 changed files with 85 additions and 17 deletions.
8 changes: 8 additions & 0 deletions lib/bulk/common.js
Original file line number Diff line number Diff line change
Expand Up @@ -702,6 +702,13 @@ class BulkOperationBase {
const maxWriteBatchSize =
isMaster && isMaster.maxWriteBatchSize ? isMaster.maxWriteBatchSize : 1000;

// Calculates the largest possible size of an Array key, represented as a BSON string
// element. This calculation:
// 1 byte for BSON type
// # of bytes = length of (string representation of (maxWriteBatchSize - 1))
// + 1 bytes for null terminator
const maxKeySize = (maxWriteBatchSize - 1).toString(10).length + 2;

// Final options for retryable writes and write concern
let finalOptions = Object.assign({}, options);
finalOptions = applyRetryableWrites(finalOptions, collection.s.db);
Expand Down Expand Up @@ -745,6 +752,7 @@ class BulkOperationBase {
// Max batch size options
maxBatchSizeBytes: maxBatchSizeBytes,
maxWriteBatchSize: maxWriteBatchSize,
maxKeySize,
// Namespace
namespace: namespace,
// BSON
Expand Down
19 changes: 9 additions & 10 deletions lib/bulk/ordered.js
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,12 @@ function addToOperationsList(bulkOperation, docType, document) {
if (bulkOperation.s.currentBatch == null)
bulkOperation.s.currentBatch = new Batch(docType, bulkOperation.s.currentIndex);

const maxKeySize = bulkOperation.s.maxKeySize;

// Check if we need to create a new batch
if (
bulkOperation.s.currentBatchSize + 1 >= bulkOperation.s.maxWriteBatchSize ||
bulkOperation.s.currentBatchSizeBytes + bulkOperation.s.currentBatchSizeBytes >=
bulkOperation.s.currentBatchSizeBytes + maxKeySize + bsonSize >=
bulkOperation.s.maxBatchSizeBytes ||
bulkOperation.s.currentBatch.batchType !== docType
) {
Expand All @@ -51,10 +53,6 @@ function addToOperationsList(bulkOperation, docType, document) {
// Reset the current size trackers
bulkOperation.s.currentBatchSize = 0;
bulkOperation.s.currentBatchSizeBytes = 0;
} else {
// Update current batch size
bulkOperation.s.currentBatchSize = bulkOperation.s.currentBatchSize + 1;
bulkOperation.s.currentBatchSizeBytes = bulkOperation.s.currentBatchSizeBytes + bsonSize;
}

if (docType === common.INSERT) {
Expand All @@ -67,13 +65,14 @@ function addToOperationsList(bulkOperation, docType, document) {
// We have an array of documents
if (Array.isArray(document)) {
throw toError('operation passed in cannot be an Array');
} else {
bulkOperation.s.currentBatch.originalIndexes.push(bulkOperation.s.currentIndex);
bulkOperation.s.currentBatch.operations.push(document);
bulkOperation.s.currentBatchSizeBytes = bulkOperation.s.currentBatchSizeBytes + bsonSize;
bulkOperation.s.currentIndex = bulkOperation.s.currentIndex + 1;
}

bulkOperation.s.currentBatch.originalIndexes.push(bulkOperation.s.currentIndex);
bulkOperation.s.currentBatch.operations.push(document);
bulkOperation.s.currentBatchSize += 1;
bulkOperation.s.currentBatchSizeBytes += maxKeySize + bsonSize;
bulkOperation.s.currentIndex += 1;

// Return bulkOperation
return bulkOperation;
}
Expand Down
17 changes: 10 additions & 7 deletions lib/bulk/unordered.js
Original file line number Diff line number Diff line change
Expand Up @@ -40,14 +40,17 @@ function addToOperationsList(bulkOperation, docType, document) {
bulkOperation.s.currentBatch = bulkOperation.s.currentRemoveBatch;
}

const maxKeySize = bulkOperation.s.maxKeySize;

// Create a new batch object if we don't have a current one
if (bulkOperation.s.currentBatch == null)
bulkOperation.s.currentBatch = new Batch(docType, bulkOperation.s.currentIndex);

// Check if we need to create a new batch
if (
bulkOperation.s.currentBatch.size + 1 >= bulkOperation.s.maxWriteBatchSize ||
bulkOperation.s.currentBatch.sizeBytes + bsonSize >= bulkOperation.s.maxBatchSizeBytes ||
bulkOperation.s.currentBatch.sizeBytes + maxKeySize + bsonSize >=
bulkOperation.s.maxBatchSizeBytes ||
bulkOperation.s.currentBatch.batchType !== docType
) {
// Save the batch to the execution stack
Expand All @@ -60,12 +63,12 @@ function addToOperationsList(bulkOperation, docType, document) {
// We have an array of documents
if (Array.isArray(document)) {
throw toError('operation passed in cannot be an Array');
} else {
bulkOperation.s.currentBatch.operations.push(document);
bulkOperation.s.currentBatch.originalIndexes.push(bulkOperation.s.currentIndex);
bulkOperation.s.currentIndex = bulkOperation.s.currentIndex + 1;
}

bulkOperation.s.currentBatch.operations.push(document);
bulkOperation.s.currentBatch.originalIndexes.push(bulkOperation.s.currentIndex);
bulkOperation.s.currentIndex = bulkOperation.s.currentIndex + 1;

// Save back the current Batch to the right type
if (docType === common.INSERT) {
bulkOperation.s.currentInsertBatch = bulkOperation.s.currentBatch;
Expand All @@ -80,8 +83,8 @@ function addToOperationsList(bulkOperation, docType, document) {
}

// Update current batch size
bulkOperation.s.currentBatch.size = bulkOperation.s.currentBatch.size + 1;
bulkOperation.s.currentBatch.sizeBytes = bulkOperation.s.currentBatch.sizeBytes + bsonSize;
bulkOperation.s.currentBatch.size += 1;
bulkOperation.s.currentBatch.sizeBytes += maxKeySize + bsonSize;

// Return bulkOperation
return bulkOperation;
Expand Down
58 changes: 58 additions & 0 deletions test/functional/bulk_tests.js
Original file line number Diff line number Diff line change
Expand Up @@ -1513,4 +1513,62 @@ describe('Bulk', function() {
client.close();
});
});

it('should properly account for array key size in bulk unordered inserts', function(done) {
const client = this.configuration.newClient({ w: 1 }, { monitorCommands: true });
const documents = new Array(20000).fill('').map(() => ({
arr: new Array(19).fill('aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa')
}));

let db;

client
.connect()
// NOTE: Hack to get around unrelated strange error in bulkWrites for right now.
.then(() => {
db = client.db(this.configuration.db);
return db.dropCollection('doesnt_matter').catch(() => {});
})
.then(() => {
return db.createCollection('doesnt_matter');
})
.then(() => {
const coll = db.collection('doesnt_matter');

coll.insert(documents, { ordered: false }, err => {
client.close(() => {
done(err);
});
});
});
});

it('should properly account for array key size in bulk ordered inserts', function(done) {
const client = this.configuration.newClient();
const documents = new Array(20000).fill('').map(() => ({
arr: new Array(19).fill('aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa')
}));

let db;

client
.connect()
// NOTE: Hack to get around unrelated strange error in bulkWrites for right now.
.then(() => {
db = client.db(this.configuration.db);
return db.dropCollection('doesnt_matter').catch(() => {});
})
.then(() => {
return db.createCollection('doesnt_matter');
})
.then(() => {
const coll = db.collection('doesnt_matter');

coll.insert(documents, { ordered: false }, err => {
client.close(() => {
done(err);
});
});
});
});
});

0 comments on commit aafe71b

Please sign in to comment.