Skip to content

Commit

Permalink
Merge pull request #14813 from Automattic/vkarpov15/mongodb-68
Browse files Browse the repository at this point in the history
feat: upgrade mongodb -> 6.8.0, handle throwing error on closed cursor in Mongoose
  • Loading branch information
vkarpov15 authored Aug 21, 2024
2 parents 8180a73 + a725a75 commit 13872ee
Show file tree
Hide file tree
Showing 8 changed files with 93 additions and 33 deletions.
52 changes: 31 additions & 21 deletions lib/cursor/changeStream.js
Original file line number Diff line number Diff line change
Expand Up @@ -33,15 +33,18 @@ class ChangeStream extends EventEmitter {
);
}

// This wrapper is necessary because of buffering.
changeStreamThunk((err, driverChangeStream) => {
if (err != null) {
this.emit('error', err);
return;
}
this.$driverChangeStreamPromise = new Promise((resolve, reject) => {
// This wrapper is necessary because of buffering.
changeStreamThunk((err, driverChangeStream) => {
if (err != null) {
this.emit('error', err);
return reject(err);
}

this.driverChangeStream = driverChangeStream;
this.emit('ready');
this.driverChangeStream = driverChangeStream;
this.emit('ready');
resolve();
});
});
}

Expand All @@ -53,20 +56,23 @@ class ChangeStream extends EventEmitter {
this.bindedEvents = true;

if (this.driverChangeStream == null) {
this.once('ready', () => {
this.driverChangeStream.on('close', () => {
this.closed = true;
});
this.$driverChangeStreamPromise.then(
() => {
this.driverChangeStream.on('close', () => {
this.closed = true;
});

driverChangeStreamEvents.forEach(ev => {
this.driverChangeStream.on(ev, data => {
if (data != null && data.fullDocument != null && this.options && this.options.hydrate) {
data.fullDocument = this.options.model.hydrate(data.fullDocument);
}
this.emit(ev, data);
driverChangeStreamEvents.forEach(ev => {
this.driverChangeStream.on(ev, data => {
if (data != null && data.fullDocument != null && this.options && this.options.hydrate) {
data.fullDocument = this.options.model.hydrate(data.fullDocument);
}
this.emit(ev, data);
});
});
});
});
},
() => {} // No need to register events if opening change stream failed
);

return;
}
Expand Down Expand Up @@ -142,8 +148,12 @@ class ChangeStream extends EventEmitter {
this.closed = true;
if (this.driverChangeStream) {
return this.driverChangeStream.close();
} else {
return this.$driverChangeStreamPromise.then(
() => this.driverChangeStream.close(),
() => {} // No need to close if opening the change stream failed
);
}
return Promise.resolve();
}
}

Expand Down
5 changes: 5 additions & 0 deletions lib/cursor/queryCursor.js
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ function QueryCursor(query) {
this.cursor = null;
this.skipped = false;
this.query = query;
this._closed = false;
const model = query.model;
this._mongooseOptions = {};
this._transforms = [];
Expand Down Expand Up @@ -229,6 +230,7 @@ QueryCursor.prototype.close = async function close() {
}
try {
await this.cursor.close();
this._closed = true;
this.emit('close');
} catch (error) {
this.listeners('error').length > 0 && this.emit('error', error);
Expand Down Expand Up @@ -266,6 +268,9 @@ QueryCursor.prototype.next = async function next() {
if (typeof arguments[0] === 'function') {
throw new MongooseError('QueryCursor.prototype.next() no longer accepts a callback');
}
if (this._closed) {
throw new MongooseError('Cannot call `next()` on a closed cursor');
}
return new Promise((resolve, reject) => {
_next(this, function(error, doc) {
if (error) {
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
"dependencies": {
"bson": "^6.7.0",
"kareem": "2.6.3",
"mongodb": "6.7.0",
"mongodb": "6.8.0",
"mpath": "0.9.0",
"mquery": "5.0.0",
"ms": "2.1.3",
Expand Down
2 changes: 1 addition & 1 deletion scripts/tsc-diagnostics-check.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
const fs = require('fs');

const stdin = fs.readFileSync(0).toString('utf8');
const maxInstantiations = isNaN(process.argv[2]) ? 127500 : parseInt(process.argv[2], 10);
const maxInstantiations = isNaN(process.argv[2]) ? 135000 : parseInt(process.argv[2], 10);

console.log(stdin);

Expand Down
2 changes: 2 additions & 0 deletions test/connection.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -1032,6 +1032,8 @@ describe('connections:', function() {
await nextChange;
assert.equal(changes.length, 1);
assert.equal(changes[0].operationType, 'insert');

await changeStream.close();
await conn.close();
});

Expand Down
48 changes: 41 additions & 7 deletions test/model.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ const sinon = require('sinon');
const start = require('./common');

const assert = require('assert');
const { once } = require('events');
const random = require('./util').random;
const util = require('./util');

Expand Down Expand Up @@ -3508,6 +3509,9 @@ describe('Model', function() {
}
changeStream.removeListener('change', listener);
listener = null;
// Change stream may still emit "MongoAPIError: ChangeStream is closed" because change stream
// may still poll after close.
changeStream.on('error', () => {});
changeStream.close();
changeStream = null;
});
Expand Down Expand Up @@ -3560,14 +3564,21 @@ describe('Model', function() {
it('fullDocument (gh-11936)', async function() {
const MyModel = db.model('Test', new Schema({ name: String }));

const doc = await MyModel.create({ name: 'Ned Stark' });
const changeStream = await MyModel.watch([], {
fullDocument: 'updateLookup',
hydrate: true
});
await changeStream.$driverChangeStreamPromise;

const doc = await MyModel.create({ name: 'Ned Stark' });

const p = changeStream.next();
const p = new Promise((resolve) => {
changeStream.once('change', change => {
resolve(change);
});
});
// Need to wait for resume token to be set after the event listener,
// otherwise change stream might not pick up the update.
await once(changeStream.driverChangeStream, 'resumeTokenChanged');
await MyModel.updateOne({ _id: doc._id }, { name: 'Tony Stark' });

const changeData = await p;
Expand All @@ -3576,22 +3587,31 @@ describe('Model', function() {
doc._id.toHexString());
assert.ok(changeData.fullDocument.$__);
assert.equal(changeData.fullDocument.get('name'), 'Tony Stark');

await changeStream.close();
});

it('fullDocument with immediate watcher and hydrate (gh-14049)', async function() {
const MyModel = db.model('Test', new Schema({ name: String }));

const doc = await MyModel.create({ name: 'Ned Stark' });

let changeStream = null;
const p = new Promise((resolve) => {
MyModel.watch([], {
changeStream = MyModel.watch([], {
fullDocument: 'updateLookup',
hydrate: true
}).on('change', change => {
});

changeStream.on('change', change => {
resolve(change);
});
});

// Need to wait for cursor to be initialized and for resume token to
// be set, otherwise change stream might not pick up the update.
await changeStream.$driverChangeStreamPromise;
await once(changeStream.driverChangeStream, 'resumeTokenChanged');
await MyModel.updateOne({ _id: doc._id }, { name: 'Tony Stark' });

const changeData = await p;
Expand All @@ -3600,6 +3620,8 @@ describe('Model', function() {
doc._id.toHexString());
assert.ok(changeData.fullDocument.$__);
assert.equal(changeData.fullDocument.get('name'), 'Tony Stark');

await changeStream.close();
});

it('respects discriminators (gh-11007)', async function() {
Expand Down Expand Up @@ -3639,6 +3661,9 @@ describe('Model', function() {
assert.equal(changeData.operationType, 'insert');
assert.equal(changeData.fullDocument.name, 'Ned Stark');

// Change stream may still emit "MongoAPIError: ChangeStream is closed" because change stream
// may still poll after close.
changeStream.on('error', () => {});
await changeStream.close();
await db.close();
});
Expand All @@ -3654,11 +3679,16 @@ describe('Model', function() {
setTimeout(resolve, 500, false);
});

changeStream.close();
await db;
// Change stream may still emit "MongoAPIError: ChangeStream is closed" because change stream
// may still poll after close.
changeStream.on('error', () => {});

const close = changeStream.close();
await db.asPromise();
const readyCalled = await ready;
assert.strictEqual(readyCalled, false);

await close;
await db.close();
});

Expand All @@ -3675,6 +3705,10 @@ describe('Model', function() {

await MyModel.create({ name: 'Hodor' });

// Change stream may still emit "MongoAPIError: ChangeStream is closed" because change stream
// may still poll after close.
changeStream.on('error', () => {});

changeStream.close();
const closedData = await closed;
assert.strictEqual(closedData, true);
Expand Down
12 changes: 10 additions & 2 deletions test/model.watch.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -37,18 +37,22 @@ describe('model: watch: ', function() {
const changeData = await changed;
assert.equal(changeData.operationType, 'insert');
assert.equal(changeData.fullDocument.name, 'Ned Stark');
await changeStream.close();
});

it('watch() close() prevents buffered watch op from running (gh-7022)', async function() {
const MyModel = db.model('Test', new Schema({}));
const changeStream = MyModel.watch();
const ready = new global.Promise(resolve => {
const ready = new Promise(resolve => {
changeStream.once('data', () => {
resolve(true);
});
setTimeout(resolve, 500, false);
});

// Change stream may still emit "MongoAPIError: ChangeStream is closed" because change stream
// may still poll after close.
changeStream.on('error', () => {});
const close = changeStream.close();
await db.asPromise();
const readyCalled = await ready;
Expand All @@ -64,12 +68,16 @@ describe('model: watch: ', function() {
await MyModel.init();

const changeStream = MyModel.watch();
const closed = new global.Promise(resolve => {
const closed = new Promise(resolve => {
changeStream.once('close', () => resolve(true));
});

await MyModel.create({ name: 'Hodor' });

// Change stream may still emit "MongoAPIError: ChangeStream is closed" because change stream
// may still poll after close.
changeStream.on('error', () => {});

await changeStream.close();

const closedData = await closed;
Expand Down
3 changes: 2 additions & 1 deletion test/query.cursor.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -415,7 +415,8 @@ describe('QueryCursor', function() {
await cursor.next();
assert.ok(false);
} catch (error) {
assert.equal(error.name, 'MongoCursorExhaustedError');
assert.equal(error.name, 'MongooseError');
assert.ok(error.message.includes('closed cursor'), error.message);
}
});
});
Expand Down

0 comments on commit 13872ee

Please sign in to comment.