Skip to content

Commit

Permalink
fix(change_stream): emit close event after cursor is closed during error
Browse files Browse the repository at this point in the history
Fixes NODE-2075
  • Loading branch information
kvwalker authored and daprahamian committed Aug 13, 2019
1 parent 583f29f commit c2d80b2
Show file tree
Hide file tree
Showing 2 changed files with 64 additions and 4 deletions.
25 changes: 21 additions & 4 deletions lib/change_stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -179,10 +179,27 @@ class ChangeStream extends EventEmitter {
}

// Tidy up the existing cursor
var cursor = this.cursor;
['data', 'close', 'end', 'error'].forEach(event => this.cursor.removeAllListeners(event));
delete this.cursor;
return cursor.close(callback);
const cursor = this.cursor;

if (callback) {
return cursor.close(err => {
['data', 'close', 'end', 'error'].forEach(event => cursor.removeAllListeners(event));
delete this.cursor;

return callback(err);
});
}

const PromiseCtor = this.promiseLibrary || Promise;
return new PromiseCtor((resolve, reject) => {
cursor.close(err => {
['data', 'close', 'end', 'error'].forEach(event => cursor.removeAllListeners(event));
delete this.cursor;

if (err) return reject(err);
resolve();
});
});
}

/**
Expand Down
43 changes: 43 additions & 0 deletions test/functional/change_stream_tests.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ var co = require('co');
var mock = require('mongodb-mock-server');
const chai = require('chai');
const expect = chai.expect;
const sinon = require('sinon');

chai.use(require('chai-subset'));

Expand Down Expand Up @@ -1876,6 +1877,48 @@ describe('Change Streams', function() {
.then(() => teardown(), teardown);
});

it('should emit close event after error event', {
metadata: { requires: { topology: 'replicaset', mongodb: '>=3.5.10' } },
test: function(done) {
const configuration = this.configuration;
const client = configuration.newClient();
const closeSpy = sinon.spy();

client.connect(function(err, client) {
expect(err).to.not.exist;

const db = client.db('integration_tests');
const coll = db.collection('event_test');

// This will cause an error because the _id will be projected out, which causes the following error:
// "A change stream document has been received that lacks a resume token (_id)."
const changeStream = coll.watch([{ $project: { _id: false } }]);

changeStream.on('change', changeDoc => {
expect(changeDoc).to.be.null;
});

changeStream.on('error', err => {
expect(err).to.exist;
changeStream.close(() => {
expect(closeSpy.calledOnce).to.be.true;
client.close(done);
});
});

changeStream.on('close', closeSpy);

// Trigger the first database event
setTimeout(() => {
coll.insertOne({ a: 1 }, (err, result) => {
expect(err).to.not.exist;
expect(result.insertedCount).to.equal(1);
});
});
});
}
});

describe('should properly handle a changeStream event being processed mid-close', function() {
let client, coll;

Expand Down

0 comments on commit c2d80b2

Please sign in to comment.