Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(NODE-5636): generate _ids using pkFactory in bulk write operations #4025

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 21 additions & 30 deletions src/bulk/common.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { promisify } from 'util';

import { type BSONSerializeOptions, type Document, ObjectId, resolveBSONOptions } from '../bson';
import { type BSONSerializeOptions, type Document, resolveBSONOptions } from '../bson';
import type { Collection } from '../collection';
import {
type AnyError,
Expand All @@ -12,6 +12,7 @@ import {
} from '../error';
import type { Filter, OneOrMore, OptionalId, UpdateFilter, WithoutId } from '../mongo_types';
import type { CollationOptions, CommandOperationOptions } from '../operations/command';
import { maybeAddIdToDocuments } from '../operations/common_functions';
import { DeleteOperation, type DeleteStatement, makeDeleteStatement } from '../operations/delete';
import { executeOperation } from '../operations/execute_operation';
import { InsertOperation } from '../operations/insert';
Expand Down Expand Up @@ -917,7 +918,7 @@ export abstract class BulkOperationBase {
* Create a new OrderedBulkOperation or UnorderedBulkOperation instance
* @internal
*/
constructor(collection: Collection, options: BulkWriteOptions, isOrdered: boolean) {
constructor(private collection: Collection, options: BulkWriteOptions, isOrdered: boolean) {
// determine whether bulkOperation is ordered or unordered
this.isOrdered = isOrdered;

Expand Down Expand Up @@ -1032,9 +1033,9 @@ export abstract class BulkOperationBase {
* ```
*/
insert(document: Document): BulkOperationBase {
if (document._id == null && !shouldForceServerObjectId(this)) {
document._id = new ObjectId();
}
maybeAddIdToDocuments(this.collection, document, {
forceServerObjectId: this.shouldForceServerObjectId()
});

return this.addToOperationsList(BatchType.INSERT, document);
}
Expand Down Expand Up @@ -1093,21 +1094,16 @@ export abstract class BulkOperationBase {
throw new MongoInvalidArgumentError('Operation must be an object with an operation key');
}
if ('insertOne' in op) {
const forceServerObjectId = shouldForceServerObjectId(this);
if (op.insertOne && op.insertOne.document == null) {
// NOTE: provided for legacy support, but this is a malformed operation
if (forceServerObjectId !== true && (op.insertOne as Document)._id == null) {
(op.insertOne as Document)._id = new ObjectId();
}

return this.addToOperationsList(BatchType.INSERT, op.insertOne);
}
const forceServerObjectId = this.shouldForceServerObjectId();
const document =
op.insertOne && op.insertOne.document == null
? // TODO(NODE-6003): remove support for omitting the `documents` subdocument in bulk inserts
(op.insertOne as Document)
: op.insertOne.document;

if (forceServerObjectId !== true && op.insertOne.document._id == null) {
op.insertOne.document._id = new ObjectId();
}
maybeAddIdToDocuments(this.collection, document, { forceServerObjectId });

return this.addToOperationsList(BatchType.INSERT, op.insertOne.document);
return this.addToOperationsList(BatchType.INSERT, document);
}

if ('replaceOne' in op || 'updateOne' in op || 'updateMany' in op) {
Expand Down Expand Up @@ -1268,6 +1264,13 @@ export abstract class BulkOperationBase {
batchType: BatchType,
document: Document | UpdateStatement | DeleteStatement
): this;

private shouldForceServerObjectId(): boolean {
return (
this.s.options.forceServerObjectId === true ||
this.s.collection.s.db.options?.forceServerObjectId === true
);
}
}

Object.defineProperty(BulkOperationBase.prototype, 'length', {
Expand All @@ -1277,18 +1280,6 @@ Object.defineProperty(BulkOperationBase.prototype, 'length', {
}
});

function shouldForceServerObjectId(bulkOperation: BulkOperationBase): boolean {
if (typeof bulkOperation.s.options.forceServerObjectId === 'boolean') {
return bulkOperation.s.options.forceServerObjectId;
}

if (typeof bulkOperation.s.collection.s.db.options?.forceServerObjectId === 'boolean') {
return bulkOperation.s.collection.s.db.options?.forceServerObjectId;
}

return false;
}

function isInsertBatch(batch: Batch): boolean {
return batch.batchType === BatchType.INSERT;
}
Expand Down
2 changes: 1 addition & 1 deletion src/mongo_client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ export interface Auth {

/** @public */
export interface PkFactory {
createPk(): any; // TODO: when js-bson is typed, function should return some BSON type
createPk(): any;
}

/** @public */
Expand Down
21 changes: 16 additions & 5 deletions src/operations/common_functions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,26 +43,37 @@ export async function indexInformation(
return info;
}

export function prepareDocs(
export function maybeAddIdToDocuments(
coll: Collection,
docs: Document[],
options: { forceServerObjectId?: boolean }
): Document[] {
): Document[];
export function maybeAddIdToDocuments(
coll: Collection,
docs: Document,
options: { forceServerObjectId?: boolean }
): Document;
export function maybeAddIdToDocuments(
coll: Collection,
nbbeeken marked this conversation as resolved.
Show resolved Hide resolved
docOrDocs: Document[] | Document,
options: { forceServerObjectId?: boolean }
): Document[] | Document {
const forceServerObjectId =
typeof options.forceServerObjectId === 'boolean'
? options.forceServerObjectId
: coll.s.db.options?.forceServerObjectId;

// no need to modify the docs if server sets the ObjectId
if (forceServerObjectId === true) {
return docs;
return docOrDocs;
}

return docs.map(doc => {
const transform = (doc: Document): Document => {
if (doc._id == null) {
doc._id = coll.s.pkFactory.createPk();
}

return doc;
});
};
return Array.isArray(docOrDocs) ? docOrDocs.map(transform) : transform(docOrDocs);
}
8 changes: 5 additions & 3 deletions src/operations/insert.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import type { MongoDBNamespace } from '../utils';
import { WriteConcern } from '../write_concern';
import { BulkWriteOperation } from './bulk_write';
import { CommandOperation, type CommandOperationOptions } from './command';
import { prepareDocs } from './common_functions';
import { maybeAddIdToDocuments } from './common_functions';
import { AbstractOperation, Aspect, defineAspects } from './operation';

/** @internal */
Expand Down Expand Up @@ -69,7 +69,7 @@ export interface InsertOneResult<TSchema = Document> {

export class InsertOneOperation extends InsertOperation {
constructor(collection: Collection, doc: Document, options: InsertOneOptions) {
super(collection.s.namespace, prepareDocs(collection, [doc], options), options);
super(collection.s.namespace, maybeAddIdToDocuments(collection, [doc], options), options);
}

override async execute(
Expand Down Expand Up @@ -131,7 +131,9 @@ export class InsertManyOperation extends AbstractOperation<InsertManyResult> {
const writeConcern = WriteConcern.fromOptions(options);
const bulkWriteOperation = new BulkWriteOperation(
coll,
prepareDocs(coll, this.docs, options).map(document => ({ insertOne: { document } })),
this.docs.map(document => ({
insertOne: { document }
})),
options
);

Expand Down
72 changes: 71 additions & 1 deletion test/integration/crud/bulk.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import * as crypto from 'crypto';

import {
type Collection,
Double,
Long,
MongoBatchReExecutionError,
MongoBulkWriteError,
Expand Down Expand Up @@ -65,16 +66,85 @@ describe('Bulk', function () {
context('when called with a valid operation', function () {
it('should not throw a MongoInvalidArgument error', async function () {
try {
client.db('test').collection('test').initializeUnorderedBulkOp().raw({ insertOne: {} });
client
.db('test')
.collection('test')
.initializeUnorderedBulkOp()
.raw({ insertOne: { document: {} } });
} catch (error) {
expect(error).not.to.exist;
}
});
});

it('supports the legacy specification (no nested document field)', async function () {
await client
.db('test')
.collection('test')
.initializeUnorderedBulkOp()
// @ts-expect-error Not allowed in TS, but allowed for legacy compat
.raw({ insertOne: { name: 'john doe' } })
.execute();
const result = await client.db('test').collection('test').findOne({ name: 'john doe' });
expect(result).to.exist;
});
});
});

describe('Collection', function () {
describe('when a pkFactory is set on the client', function () {
nbbeeken marked this conversation as resolved.
Show resolved Hide resolved
let client: MongoClient;
const pkFactory = {
count: 0,
createPk: function () {
return new Double(this.count++);
}
};
let collection: Collection;

beforeEach(async function () {
client = this.configuration.newClient({}, { pkFactory, promoteValues: false });
collection = client.db('integration').collection('pk_factory_tests');
await collection.deleteMany({});
});

afterEach(() => client.close());

it('insertMany() generates _ids using the pkFactory', async function () {
await collection.insertMany([{ name: 'john doe' }]);
const result = await collection.findOne({ name: 'john doe' });
expect(result).to.have.property('_id').to.have.property('_bsontype').to.equal('Double');
});

it('bulkWrite() generates _ids using the pkFactory', async function () {
await collection.bulkWrite([{ insertOne: { document: { name: 'john doe' } } }]);
const result = await collection.findOne({ name: 'john doe' });
expect(result).to.have.property('_id').to.have.property('_bsontype').to.equal('Double');
});

it('ordered bulk operations generate _ids using pkFactory', async function () {
await collection.initializeOrderedBulkOp().insert({ name: 'john doe' }).execute();
const result = await collection.findOne({ name: 'john doe' });
expect(result).to.have.property('_id').to.have.property('_bsontype').to.equal('Double');
});

it('unordered bulk operations generate _ids using pkFactory', async function () {
await collection.initializeUnorderedBulkOp().insert({ name: 'john doe' }).execute();
const result = await collection.findOne({ name: 'john doe' });
expect(result).to.have.property('_id').to.have.property('_bsontype').to.equal('Double');
});

it('bulkOperation.raw() with the legacy syntax (no nested document field) generates _ids using pkFactory', async function () {
await collection
.initializeOrderedBulkOp()
// @ts-expect-error Not allowed by TS, but still permitted.
.raw({ insertOne: { name: 'john doe' } })
.execute();
const result = await collection.findOne({ name: 'john doe' });
expect(result).to.have.property('_id').to.have.property('_bsontype').to.equal('Double');
});
});

describe('#insertMany()', function () {
context('when passed an invalid docs argument', function () {
it('should throw a MongoInvalidArgument error', async function () {
Expand Down
29 changes: 28 additions & 1 deletion test/integration/crud/insert.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,31 @@ describe('crud - insert', function () {
await client.close();
});

describe('when a pkFactory is set on the client', function () {
let client;
const pkFactory = {
count: 0,
createPk: function () {
return new Double(this.count++);
}
};
let collection;

beforeEach(async function () {
client = this.configuration.newClient({}, { pkFactory, promoteValues: false });
collection = client.db('integration').collection('pk_factory_tests');
await collection.deleteMany({});
});

afterEach(() => client.close());

it('insertOne() generates _ids using the pkFactory', async function () {
await collection.insertOne({ name: 'john doe' });
const result = await collection.findOne({ name: 'john doe' });
expect(result).to.have.property('_id').to.have.property('_bsontype').to.equal('Double');
});
});

it('Should correctly execute Collection.prototype.insertOne', function (done) {
const configuration = this.configuration;
let url = configuration.url();
Expand Down Expand Up @@ -135,6 +160,7 @@ describe('crud - insert', function () {
it('insertMany returns the insertedIds and we can look up the documents', async function () {
const db = client.db();
const collection = db.collection('test_multiple_insert');
await collection.deleteMany({});
const docs = [{ a: 1 }, { a: 2 }];

const r = await collection.insertMany(docs);
Expand Down Expand Up @@ -839,6 +865,7 @@ describe('crud - insert', function () {

const db = client.db();
const collection = db.collection('Should_correctly_insert_object_with_timestamps');
await collection.deleteMany({});

const { insertedId } = await collection.insertOne(doc);
expect(insertedId.equals(doc._id)).to.be.true;
Expand Down Expand Up @@ -1700,7 +1727,7 @@ describe('crud - insert', function () {
try {
db.collection(k.toString());
test.fail(false);
} catch (err) { } // eslint-disable-line
} catch (err) {} // eslint-disable-line

client.close(done);
});
Expand Down
51 changes: 0 additions & 51 deletions test/integration/crud/pk_factory.test.js

This file was deleted.