Skip to content

Commit

Permalink
feat(connection): support exhaust behavior at the transport level
Browse files Browse the repository at this point in the history
This work acknowledges an `exhaustAllowed` option to enable exhaust
behavior on OP_MSG messags. It also tracks synthetic `getMore` ids
and uses the same callback provided for the orignal command for all
returned batches.

NODE-2438
  • Loading branch information
mbroadst committed Jan 27, 2020
1 parent 8388443 commit 9ccf268
Show file tree
Hide file tree
Showing 3 changed files with 67 additions and 1 deletion.
8 changes: 8 additions & 0 deletions lib/cmap/connection.js
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,15 @@ function messageHandler(conn) {
}

const operationDescription = conn[kQueue].get(message.responseTo);

// SERVER-45775: For exhaust responses we should be able to use the same requestId to
// track response, however the server currently synthetically produces remote requests
// making the `responseTo` change on each response
conn[kQueue].delete(message.responseTo);
if (message.moreToCome) {
// requeue the callback for next synthetic request
conn[kQueue].set(message.requestId, operationDescription);
}

const callback = operationDescription.cb;
if (operationDescription.socketTimeoutOverride) {
Expand Down
3 changes: 2 additions & 1 deletion lib/core/connection/msg.js
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,8 @@ class Msg {
// flags
this.checksumPresent = false;
this.moreToCome = options.moreToCome || false;
this.exhaustAllowed = false;
this.exhaustAllowed =
typeof options.exhaustAllowed === 'boolean' ? options.exhaustAllowed : false;
}

toBin() {
Expand Down
57 changes: 57 additions & 0 deletions test/functional/cmap/connection.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,13 @@ const Connection = require('../../../lib/cmap/connection').Connection;
const connect = require('../../../lib/core/connection/connect');
const expect = require('chai').expect;
const BSON = require('bson');
const setupDatabase = require('../../functional/shared').setupDatabase;

describe('Connection', function() {
before(function() {
return setupDatabase(this.configuration);
});

it('should execute a command against a server', function(done) {
const connectOptions = Object.assign(
{ connectionType: Connection, bson: new BSON() },
Expand Down Expand Up @@ -70,4 +75,56 @@ describe('Connection', function() {
done();
});
});

it('should support calling back multiple times on exhaust commands', {
metadata: { requires: { mongodb: '>=4.2.0' } },
test: function(done) {
const ns = `${this.configuration.db}.$cmd`;
const connectOptions = Object.assign(
{ connectionType: Connection, bson: new BSON() },
this.configuration.options
);

connect(connectOptions, (err, conn) => {
expect(err).to.not.exist;
this.defer(_done => conn.destroy(_done));

const documents = Array.from(Array(10000), (_, idx) => ({
test: Math.floor(Math.random() * idx)
}));

conn.command(ns, { insert: 'test', documents }, (err, res) => {
expect(err).to.not.exist;
expect(res)
.nested.property('result.n')
.to.equal(documents.length);

let totalDocumentsRead = 0;
conn.command(ns, { find: 'test', batchSize: 100 }, (err, result) => {
expect(err).to.not.exist;
expect(result).nested.property('result.cursor').to.exist;
const cursor = result.result.cursor;
totalDocumentsRead += cursor.firstBatch.length;

conn.command(
ns,
{ getMore: cursor.id, collection: 'test', batchSize: 100 },
{ exhaustAllowed: true },
(err, result) => {
expect(err).to.not.exist;
expect(result).nested.property('result.cursor').to.exist;
const cursor = result.result.cursor;
totalDocumentsRead += cursor.nextBatch.length;

if (cursor.id === 0 || cursor.id.isZero()) {
expect(totalDocumentsRead).to.equal(documents.length);
done();
}
}
);
});
});
});
}
});
});

0 comments on commit 9ccf268

Please sign in to comment.