diff --git a/lib/cmap/connection.js b/lib/cmap/connection.js index 8f0cd4df71..bec3acae6f 100644 --- a/lib/cmap/connection.js +++ b/lib/cmap/connection.js @@ -216,7 +216,15 @@ function messageHandler(conn) { } const operationDescription = conn[kQueue].get(message.responseTo); + + // SERVER-45775: For exhaust responses we should be able to use the same requestId to + // track response, however the server currently synthetically produces remote requests + // making the `responseTo` change on each response conn[kQueue].delete(message.responseTo); + if (message.moreToCome) { + // requeue the callback for next synthetic request + conn[kQueue].set(message.requestId, operationDescription); + } const callback = operationDescription.cb; if (operationDescription.socketTimeoutOverride) { diff --git a/lib/core/connection/msg.js b/lib/core/connection/msg.js index b662dafb6f..9f15a81114 100644 --- a/lib/core/connection/msg.js +++ b/lib/core/connection/msg.js @@ -72,7 +72,8 @@ class Msg { // flags this.checksumPresent = false; this.moreToCome = options.moreToCome || false; - this.exhaustAllowed = false; + this.exhaustAllowed = + typeof options.exhaustAllowed === 'boolean' ? options.exhaustAllowed : false; } toBin() { diff --git a/test/functional/cmap/connection.test.js b/test/functional/cmap/connection.test.js index 35bb90747b..7bfcf1d1b5 100644 --- a/test/functional/cmap/connection.test.js +++ b/test/functional/cmap/connection.test.js @@ -4,8 +4,13 @@ const Connection = require('../../../lib/cmap/connection').Connection; const connect = require('../../../lib/core/connection/connect'); const expect = require('chai').expect; const BSON = require('bson'); +const setupDatabase = require('../../functional/shared').setupDatabase; describe('Connection', function() { + before(function() { + return setupDatabase(this.configuration); + }); + it('should execute a command against a server', function(done) { const connectOptions = Object.assign( { connectionType: Connection, bson: new BSON() }, @@ -70,4 +75,56 @@ describe('Connection', function() { done(); }); }); + + it('should support calling back multiple times on exhaust commands', { + metadata: { requires: { mongodb: '>=4.2.0' } }, + test: function(done) { + const ns = `${this.configuration.db}.$cmd`; + const connectOptions = Object.assign( + { connectionType: Connection, bson: new BSON() }, + this.configuration.options + ); + + connect(connectOptions, (err, conn) => { + expect(err).to.not.exist; + this.defer(_done => conn.destroy(_done)); + + const documents = Array.from(Array(10000), (_, idx) => ({ + test: Math.floor(Math.random() * idx) + })); + + conn.command(ns, { insert: 'test', documents }, (err, res) => { + expect(err).to.not.exist; + expect(res) + .nested.property('result.n') + .to.equal(documents.length); + + let totalDocumentsRead = 0; + conn.command(ns, { find: 'test', batchSize: 100 }, (err, result) => { + expect(err).to.not.exist; + expect(result).nested.property('result.cursor').to.exist; + const cursor = result.result.cursor; + totalDocumentsRead += cursor.firstBatch.length; + + conn.command( + ns, + { getMore: cursor.id, collection: 'test', batchSize: 100 }, + { exhaustAllowed: true }, + (err, result) => { + expect(err).to.not.exist; + expect(result).nested.property('result.cursor').to.exist; + const cursor = result.result.cursor; + totalDocumentsRead += cursor.nextBatch.length; + + if (cursor.id === 0 || cursor.id.isZero()) { + expect(totalDocumentsRead).to.equal(documents.length); + done(); + } + } + ); + }); + }); + }); + } + }); });