diff --git a/lib/cursor.js b/lib/cursor.js index 07035a526..b3d87f75e 100644 --- a/lib/cursor.js +++ b/lib/cursor.js @@ -171,6 +171,27 @@ Cursor.prototype.cursorSkip = function() { return this.cursorState.skip; }; +Cursor.prototype._endSession = function(options, callback) { + if (typeof options === 'function') { + callback = options; + options = {}; + } + options = options || {}; + + const session = this.cursorState.session; + + if (session && (options.force || session.owner === this)) { + this.cursorState.session = undefined; + session.endSession(callback); + return true; + } + + if (callback) { + callback(); + } + return false; +}; + // // Handle callback (including any exceptions thrown) var handleCallback = function(callback, err, result) { @@ -475,15 +496,11 @@ Cursor.prototype.rewind = function() { */ var isConnectionDead = function(self, callback) { if (self.pool && self.pool.isDestroyed()) { - self.cursorState.notified = true; self.cursorState.killed = true; - self.cursorState.documents = []; - self.cursorState.cursorIndex = 0; - callback( - new MongoNetworkError( - f('connection to host %s:%s was destroyed', self.pool.host, self.pool.port) - ) + const err = new MongoNetworkError( + f('connection to host %s:%s was destroyed', self.pool.host, self.pool.port) ); + _setCursorNotifiedImpl(self, () => callback(err)); return true; } @@ -496,11 +513,8 @@ var isConnectionDead = function(self, callback) { var isCursorDeadButNotkilled = function(self, callback) { // Cursor is dead but not marked killed, return null if (self.cursorState.dead && !self.cursorState.killed) { - self.cursorState.notified = true; self.cursorState.killed = true; - self.cursorState.documents = []; - self.cursorState.cursorIndex = 0; - handleCallback(callback, null, null); + setCursorNotified(self, callback); return true; } @@ -524,10 +538,7 @@ var isCursorDeadAndKilled = function(self, callback) { */ var isCursorKilled = function(self, callback) { if (self.cursorState.killed) { - self.cursorState.notified = true; - self.cursorState.documents = []; - self.cursorState.cursorIndex = 0; - handleCallback(callback, null, null); + setCursorNotified(self, callback); return true; } @@ -539,20 +550,24 @@ var isCursorKilled = function(self, callback) { */ var setCursorDeadAndNotified = function(self, callback) { self.cursorState.dead = true; - self.cursorState.notified = true; - self.cursorState.documents = []; - self.cursorState.cursorIndex = 0; - handleCallback(callback, null, null); + setCursorNotified(self, callback); }; /** * Mark cursor as being notified */ var setCursorNotified = function(self, callback) { + _setCursorNotifiedImpl(self, () => handleCallback(callback, null, null)); +}; + +var _setCursorNotifiedImpl = function(self, callback) { self.cursorState.notified = true; self.cursorState.documents = []; self.cursorState.cursorIndex = 0; - handleCallback(callback, null, null); + if (self._endSession) { + return self._endSession(undefined, () => callback()); + } + return callback(); }; var nextFunction = function(self, callback) { @@ -654,6 +669,10 @@ var nextFunction = function(self, callback) { self._find(function(err) { if (err) return handleCallback(callback, err, null); + if (self.cursorState.cursorId && self.cursorState.cursorId.isZero() && self._endSession) { + self._endSession(); + } + if ( self.cursorState.documents.length === 0 && self.cursorState.cursorId && @@ -696,6 +715,10 @@ var nextFunction = function(self, callback) { self._getmore(function(err, doc, connection) { if (err) return handleCallback(callback, err); + if (self.cursorState.cursorId && self.cursorState.cursorId.isZero() && self._endSession) { + self._endSession(); + } + // Save the returned connection to ensure all getMore's fire over the same connection self.connection = connection; diff --git a/lib/sessions.js b/lib/sessions.js index 110add8e7..8d8920708 100644 --- a/lib/sessions.js +++ b/lib/sessions.js @@ -39,6 +39,9 @@ class ClientSession extends EventEmitter { } this.operationTime = null; + + this.explicit = !!options.explicit; + this.owner = options.owner; } /**