Skip to content

Commit

Permalink
feat: add middleware for bulkWrite() and createCollection()
Browse files Browse the repository at this point in the history
Fix #7893
Fix #14263
  • Loading branch information
vkarpov15 committed Feb 19, 2024
1 parent 7732ce2 commit aa65645
Show file tree
Hide file tree
Showing 5 changed files with 312 additions and 61 deletions.
4 changes: 4 additions & 0 deletions docs/middleware.md
Original file line number Diff line number Diff line change
Expand Up @@ -67,13 +67,17 @@ Model middleware is supported for the following model functions.
Don't confuse model middleware and document middleware: model middleware hooks into *static* functions on a `Model` class, document middleware hooks into *methods* on a `Model` class.
In model middleware functions, `this` refers to the model.

* [bulkWrite](api/model.html#model_Model-bulkWrite)
* [createCollection](api/model.html#model_Model-createCollection)
* [insertMany](api/model.html#model_Model-insertMany)

Here are the possible strings that can be passed to `pre()`

* aggregate
* bulkWrite
* count
* countDocuments
* createCollection
* deleteOne
* deleteMany
* estimatedDocumentCount
Expand Down
185 changes: 126 additions & 59 deletions lib/model.js
Original file line number Diff line number Diff line change
Expand Up @@ -1415,6 +1415,18 @@ Model.createCollection = async function createCollection(options) {
throw new MongooseError('Model.createCollection() no longer accepts a callback');
}

const shouldSkip = await new Promise((resolve, reject) => {
this.hooks.execPre('createCollection', this, [options], (err) => {
if (err != null) {
if (err instanceof Kareem.skipWrappedFunction) {
return resolve(true);
}
return reject(err);
}
resolve();
});
});

const collectionOptions = this &&
this.schema &&
this.schema.options &&
Expand Down Expand Up @@ -1468,13 +1480,32 @@ Model.createCollection = async function createCollection(options) {
}

try {
await this.db.createCollection(this.$__collection.collectionName, options);
if (!shouldSkip) {
await this.db.createCollection(this.$__collection.collectionName, options);
}
} catch (err) {

if (err != null && (err.name !== 'MongoServerError' || err.code !== 48)) {
throw err;
await new Promise((resolve, reject) => {
const _opts = { error: err };
this.hooks.execPost('createCollection', this, [null], _opts, (err) => {
if (err != null) {
return reject(err);
}
resolve();
});
});
}
}

await new Promise((resolve, reject) => {
this.hooks.execPost('createCollection', this, [this.$__collection], (err) => {
if (err != null) {
return reject(err);
}
resolve();
});
});

return this.$__collection;
};

Expand Down Expand Up @@ -3428,44 +3459,62 @@ Model.bulkWrite = async function bulkWrite(ops, options) {
throw new MongooseError('Model.bulkWrite() no longer accepts a callback');
}
options = options || {};

const shouldSkip = await new Promise((resolve, reject) => {
this.hooks.execPre('bulkWrite', this, [ops, options], (err) => {
if (err != null) {
if (err instanceof Kareem.skipWrappedFunction) {
return resolve(err);
}
return reject(err);
}
resolve();
});
});

if (shouldSkip) {
return shouldSkip.args[0];
}

const ordered = options.ordered == null ? true : options.ordered;

if (ops.length === 0) {
return getDefaultBulkwriteResult();
}

const validations = ops.map(op => castBulkWrite(this, op, options));

return new Promise((resolve, reject) => {
if (ordered) {
let res = null;
if (ordered) {
await new Promise((resolve, reject) => {
each(validations, (fn, cb) => fn(cb), error => {
if (error) {
return reject(error);
}

if (ops.length === 0) {
return resolve(getDefaultBulkwriteResult());
}

try {
this.$__collection.bulkWrite(ops, options, (error, res) => {
if (error) {
return reject(error);
}

resolve(res);
});
} catch (err) {
return reject(err);
}
resolve();
});
});

return;
try {
res = await this.$__collection.bulkWrite(ops, options);
} catch (error) {
await new Promise((resolve, reject) => {
const _opts = { error: error };
this.hooks.execPost('bulkWrite', this, [null], _opts, (err) => {
if (err != null) {
return reject(err);
}
resolve();
});
});
}

} else {
let remaining = validations.length;
let validOps = [];
let validationErrors = [];
const results = [];
if (remaining === 0) {
completeUnorderedValidation.call(this);
} else {
await new Promise((resolve) => {
for (let i = 0; i < validations.length; ++i) {
validations[i]((err) => {
if (err == null) {
Expand All @@ -3475,56 +3524,74 @@ Model.bulkWrite = async function bulkWrite(ops, options) {
results[i] = err;
}
if (--remaining <= 0) {
completeUnorderedValidation.call(this);
resolve();
}
});
}
}
});

validationErrors = validationErrors.
sort((v1, v2) => v1.index - v2.index).
map(v => v.error);

function completeUnorderedValidation() {
const validOpIndexes = validOps;
validOps = validOps.sort().map(index => ops[index]);
const validOpIndexes = validOps;
validOps = validOps.sort().map(index => ops[index]);

if (validOps.length === 0) {
return resolve(getDefaultBulkwriteResult());
}
if (validOps.length === 0) {
return getDefaultBulkwriteResult();
}

this.$__collection.bulkWrite(validOps, options, (error, res) => {
if (error) {
if (validationErrors.length > 0) {
error.mongoose = error.mongoose || {};
error.mongoose.validationErrors = validationErrors;
}
let error;
[res, error] = await this.$__collection.bulkWrite(validOps, options).
then(res => ([res, null])).
catch(err => ([null, err]));

return reject(error);
}
if (error) {
if (validationErrors.length > 0) {
error.mongoose = error.mongoose || {};
error.mongoose.validationErrors = validationErrors;
}

for (let i = 0; i < validOpIndexes.length; ++i) {
results[validOpIndexes[i]] = null;
}
if (validationErrors.length > 0) {
if (options.throwOnValidationError) {
return reject(new MongooseBulkWriteError(
validationErrors,
results,
res,
'bulkWrite'
));
} else {
res.mongoose = res.mongoose || {};
res.mongoose.validationErrors = validationErrors;
res.mongoose.results = results;
await new Promise((resolve, reject) => {
const _opts = { error: error };
this.hooks.execPost('bulkWrite', this, [null], _opts, (err) => {
if (err != null) {
return reject(err);
}
}

resolve(res);
resolve();
});
});
}

for (let i = 0; i < validOpIndexes.length; ++i) {
results[validOpIndexes[i]] = null;
}
if (validationErrors.length > 0) {
if (options.throwOnValidationError) {
throw new MongooseBulkWriteError(
validationErrors,
results,
res,
'bulkWrite'
);
} else {
res.mongoose = res.mongoose || {};
res.mongoose.validationErrors = validationErrors;
res.mongoose.results = results;
}
}
}

await new Promise((resolve, reject) => {
this.hooks.execPost('bulkWrite', this, [res], (err) => {
if (err != null) {
return reject(err);
}
resolve();
});
});

return res;
};

/**
Expand Down
Loading

0 comments on commit aa65645

Please sign in to comment.