Skip to content

Commit

Permalink
fix(change-stream): properly support resumablity in stream mode
Browse files Browse the repository at this point in the history
A number of changes were required to support this bug report, first
being that we need to process errors emitted by the stream on the
'error' event. This also introduces a minimal server selection
mechanism that will have to be depended upon until the new SDAM
layer becomes the default

NODE-1617
  • Loading branch information
mbroadst committed Aug 17, 2018
1 parent 918a1e0 commit c43a34b
Showing 1 changed file with 99 additions and 50 deletions.
149 changes: 99 additions & 50 deletions lib/change_stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ class ChangeStream extends EventEmitter {

if (changeDomain instanceof Collection) {
this.type = CHANGE_DOMAIN_TYPES.COLLECTION;
this.serverConfig = changeDomain.s.db.serverConfig;
this.topology = changeDomain.s.db.serverConfig;

this.namespace = {
collection: changeDomain.collectionName,
Expand All @@ -58,12 +58,12 @@ class ChangeStream extends EventEmitter {
this.type = CHANGE_DOMAIN_TYPES.DATABASE;
this.namespace = { collection: '', database: changeDomain.databaseName };
this.cursorNamespace = this.namespace.database;
this.serverConfig = changeDomain.serverConfig;
this.topology = changeDomain.serverConfig;
} else if (changeDomain instanceof MongoClient) {
this.type = CHANGE_DOMAIN_TYPES.CLUSTER;
this.namespace = { collection: '', database: 'admin' };
this.cursorNamespace = this.namespace.database;
this.serverConfig = changeDomain.topology;
this.topology = changeDomain.topology;
} else {
throw new TypeError(
'changeDomain provided to ChangeStream constructor is not an instance of Collection, Db, or MongoClient'
Expand All @@ -76,9 +76,9 @@ class ChangeStream extends EventEmitter {
}

// We need to get the operationTime as early as possible
const isMaster = this.serverConfig.lastIsMaster();
const isMaster = this.topology.lastIsMaster();
if (!isMaster) {
throw new MongoError('ServerConfig does not have an ismaster yet.');
throw new MongoError('Topology does not have an ismaster yet.');
}

this.operationTime = isMaster.operationTime;
Expand All @@ -89,7 +89,9 @@ class ChangeStream extends EventEmitter {
// Listen for any `change` listeners being added to ChangeStream
this.on('newListener', eventName => {
if (eventName === 'change' && this.cursor && this.cursor.listenerCount('change') === 0) {
this.cursor.on('data', change => processNewChange(this, null, change));
this.cursor.on('data', change =>
processNewChange({ changeStream: this, change, eventEmitter: true })
);
}
});

Expand Down Expand Up @@ -125,14 +127,11 @@ class ChangeStream extends EventEmitter {
if (callback) return callback(new Error('Change Stream is not open.'), null);
return self.promiseLibrary.reject(new Error('Change Stream is not open.'));
}

return this.cursor
.next()
.then(function(change) {
return processNewChange(self, null, change, callback);
})
.catch(function(err) {
return processNewChange(self, err, null, callback);
});
.then(change => processNewChange({ changeStream: self, change, callback }))
.catch(error => processNewChange({ changeStream: self, error, callback }));
}

/**
Expand Down Expand Up @@ -230,14 +229,16 @@ var createChangeStreamCursor = function(self) {
var changeStreamCursor = buildChangeStreamAggregationCommand(self);

/**
* Fired for each new matching change in the specified namespace. Attaching a `change` event listener to a Change Stream will switch the stream into flowing mode. Data will then be passed as soon as it is available.
* Fired for each new matching change in the specified namespace. Attaching a `change`
* event listener to a Change Stream will switch the stream into flowing mode. Data will
* then be passed as soon as it is available.
*
* @event ChangeStream#change
* @type {object}
*/
if (self.listenerCount('change') > 0) {
changeStreamCursor.on('data', function(change) {
processNewChange(self, null, change);
processNewChange({ changeStream: self, change, eventEmitter: true });
});
}

Expand Down Expand Up @@ -268,7 +269,7 @@ var createChangeStreamCursor = function(self) {
* @type {Error}
*/
changeStreamCursor.on('error', function(error) {
self.emit('error', error);
processNewChange({ changeStream: self, error, eventEmitter: true });
});

if (self.pipeDestinations) {
Expand All @@ -286,14 +287,14 @@ function getResumeToken(self) {
}

function getStartAtOperationTime(self) {
const isMaster = self.serverConfig.lastIsMaster() || {};
const isMaster = self.topology.lastIsMaster() || {};
return (
isMaster.maxWireVersion && isMaster.maxWireVersion >= 7 && self.options.startAtOperationTime
);
}

var buildChangeStreamAggregationCommand = function(self) {
const serverConfig = self.serverConfig;
const topology = self.topology;
const namespace = self.namespace;
const pipeline = self.pipeline;
const options = self.options;
Expand Down Expand Up @@ -339,62 +340,110 @@ var buildChangeStreamAggregationCommand = function(self) {
};

// Create and return the cursor
return serverConfig.cursor(cursorNamespace, command, cursorOptions);
return topology.cursor(cursorNamespace, command, cursorOptions);
};

// This method performs a basic server selection loop, satisfying the requirements of
// ChangeStream resumability until the new SDAM layer can be used.
const SELECTION_TIMEOUT = 30000;
function waitForTopologyConnected(topology, options, callback) {
setTimeout(() => {
if (options && options.start == null) options.start = process.hrtime();
const start = options.start || process.hrtime();
const timeout = options.timeout || SELECTION_TIMEOUT;
const readPreference = options.readPreference;

if (topology.isConnected({ readPreference })) return callback(null, null);
const hrElapsed = process.hrtime(start);
const elapsed = (hrElapsed[0] * 1e9 + hrElapsed[1]) / 1e6;
if (elapsed > timeout) return callback(new MongoError('Timed out waiting for connection'));
waitForTopologyConnected(topology, options, callback);
}, 3000); // this is an arbitrary wait time to allow SDAM to transition
}

// Handle new change events. This method brings together the routes from the callback, event emitter, and promise ways of using ChangeStream.
var processNewChange = function(self, err, change, callback) {
// Handle errors
if (err) {
// Handle resumable MongoNetworkErrors
if (isResumableError(err) && !self.attemptingResume) {
self.attemptingResume = true;

if (!(getResumeToken(self) || getStartAtOperationTime(self))) {
const startAtOperationTime = self.cursor.cursorState.operationTime;
self.options = Object.assign({ startAtOperationTime }, self.options);
function processNewChange(args) {
const changeStream = args.changeStream;
const error = args.error;
const change = args.change;
const callback = args.callback;
const eventEmitter = args.eventEmitter || false;
const topology = changeStream.topology;
const options = changeStream.cursor.options;

if (error) {
if (isResumableError(error) && !changeStream.attemptingResume) {
changeStream.attemptingResume = true;

if (!(getResumeToken(changeStream) || getStartAtOperationTime(changeStream))) {
const startAtOperationTime = changeStream.cursor.cursorState.operationTime;
changeStream.options = Object.assign({ startAtOperationTime }, changeStream.options);
}

if (callback) {
return self.cursor.close(function(closeErr) {
if (closeErr) {
return callback(err, null);
}
// stop listening to all events from old cursor
['data', 'close', 'end', 'error'].forEach(event =>
changeStream.cursor.removeAllListeners(event)
);

// close internal cursor, ignore errors
changeStream.cursor.close();

// attempt recreating the cursor
if (eventEmitter) {
waitForTopologyConnected(topology, { readPreference: options.readPreference }, err => {
if (err) return changeStream.emit('error', err);
changeStream.cursor = createChangeStreamCursor(changeStream);
});

return;
}

self.cursor = createChangeStreamCursor(self);
if (callback) {
waitForTopologyConnected(topology, { readPreference: options.readPreference }, err => {
if (err) return callback(err, null);

return self.next(callback);
changeStream.cursor = createChangeStreamCursor(changeStream);
changeStream.next(callback);
});

return;
}

return self.cursor
.close()
.then(() => (self.cursor = createChangeStreamCursor(self)))
.then(() => self.next());
return new Promise((resolve, reject) => {
waitForTopologyConnected(topology, { readPreference: options.readPreference }, err => {
if (err) return reject(err);
resolve();
});
})
.then(() => (changeStream.cursor = createChangeStreamCursor(changeStream)))
.then(() => changeStream.next());
}

if (typeof callback === 'function') return callback(err, null);
if (self.listenerCount('error')) return self.emit('error', err);
return self.promiseLibrary.reject(err);
if (eventEmitter) return changeStream.emit('error', error);
if (typeof callback === 'function') return callback(error, null);
return changeStream.promiseLibrary.reject(error);
}
self.attemptingResume = false;

changeStream.attemptingResume = false;

// Cache the resume token if it is present. If it is not present return an error.
if (!change || !change._id) {
var noResumeTokenError = new Error(
'A change stream document has been received that lacks a resume token (_id).'
);

if (eventEmitter) return changeStream.emit('error', noResumeTokenError);
if (typeof callback === 'function') return callback(noResumeTokenError, null);
if (self.listenerCount('error')) return self.emit('error', noResumeTokenError);
return self.promiseLibrary.reject(noResumeTokenError);
return changeStream.promiseLibrary.reject(noResumeTokenError);
}
self.resumeToken = change._id;

changeStream.resumeToken = change._id;

// Return the change
if (typeof callback === 'function') return callback(err, change);
if (self.listenerCount('change')) return self.emit('change', change);
return self.promiseLibrary.resolve(change);
};
if (eventEmitter) return changeStream.emit('change', change);
if (typeof callback === 'function') return callback(error, change);
return changeStream.promiseLibrary.resolve(change);
}

/**
* The callback format for results
Expand Down

0 comments on commit c43a34b

Please sign in to comment.