Skip to content

Commit

Permalink
feat: support operations passed to a Cursor or subclass
Browse files Browse the repository at this point in the history
This is piecemeal change towards the future we're looking for in
the cursor: no longer accepting `ns` and `cmd`, but rather just an
operation. This allows us to disambiguate options management for
the cursor, as well as reusing `executeOperation` for features
such as retryability and sessions management.
  • Loading branch information
mbroadst authored and daprahamian committed Aug 13, 2019
1 parent 3f9c0f5 commit b78bb89
Show file tree
Hide file tree
Showing 3 changed files with 69 additions and 12 deletions.
34 changes: 33 additions & 1 deletion lib/core/cursor.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ const f = require('util').format;
const collationNotSupported = require('./utils').collationNotSupported;
const ReadPreference = require('./topologies/read_preference');
const isUnifiedTopology = require('./utils').isUnifiedTopology;
const executeOperation = require('../operations/execute_operation');

const BSON = retrieveBSON();
const Long = BSON.Long;
Expand Down Expand Up @@ -48,6 +49,13 @@ const Long = BSON.Long;
var Cursor = function(topology, ns, cmd, options) {
options = options || {};

if (typeof ns !== 'string') {
this.operation = ns;
ns = this.operation.ns.toString();
options = this.operation.options;
cmd = {};
}

// Cursor pool
this.pool = null;
// Cursor server
Expand Down Expand Up @@ -689,12 +697,36 @@ Cursor.prototype._initializeCursor = function(callback) {
done(null, result);
};

if (cursor.operation) {
executeOperation(cursor.topology, cursor.operation, (err, result, server) => {
if (err) {
done(err);
return;
}

cursor.server = server;
cursor.cursorState.init = true;

// NOTE: this is a special internal method for cloning a cursor, consider removing
if (cursor.cursorState.cursorId != null) {
return done();
}

queryCallback(err, result);
});

return;
}

// Very explicitly choose what is passed to selectServer
const serverSelectOptions = {};
if (cursor.cursorState.session) {
serverSelectOptions.session = cursor.cursorState.session;
}
if (cursor.options.readPreference) {

if (cursor.operation) {
serverSelectOptions.readPreference = cursor.operation.readPreference;
} else if (cursor.options.readPreference) {
serverSelectOptions.readPreference = cursor.options.readPreference;
}

Expand Down
44 changes: 34 additions & 10 deletions lib/cursor.js
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,13 @@ function Cursor(topology, ns, cmd, options) {
const bson = topology.s.bson;
const topologyOptions = topology.s.options;

if (typeof ns !== 'string') {
this.operation = ns;
ns = this.operation.ns.toString();
options = this.operation.options;
cmd = {};
}

// Tailable cursor options
const numberOfRetries = options.numberOfRetries || 5;
const tailableRetryInterval = options.tailableRetryInterval || 500;
Expand Down Expand Up @@ -168,12 +175,14 @@ function Cursor(topology, ns, cmd, options) {
this.sortValue = this.s.cmd.sort;

// Get the batchSize
const batchSize =
cmd.cursor && cmd.cursor.batchSize
? cmd.cursor && cmd.cursor.batchSize
: options.cursor && options.cursor.batchSize
? options.cursor.batchSize
: 1000;
let batchSize = 1000;
if (cmd.cursor && cmd.cursor.batchSize) {
batchSize = cmd.cursor && cmd.cursor.batchSize;
} else if (options.cursor && options.cursor.batchSize) {
batchSize = options.cursor.batchSize;
} else if (typeof options.batchSize === 'number') {
batchSize = options.batchSize;
}

// Set the batchSize
this.setCursorBatchSize(batchSize);
Expand Down Expand Up @@ -224,10 +233,19 @@ for (let name in CoreCursor.prototype) {
}

Cursor.prototype._initializeCursor = function(callback) {
// implicitly create a session if one has not been provided
if (!this.s.explicitlyIgnoreSession && !this.s.session && this.s.topology.hasSessionSupport()) {
this.s.session = this.s.topology.startSession({ owner: this });
this.cursorState.session = this.s.session;
if (this.operation && this.operation.session != null) {
this.s.session = this.operation.session;
this.cursorState.session = this.operation.session;
} else {
// implicitly create a session if one has not been provided
if (!this.s.explicitlyIgnoreSession && !this.s.session && this.s.topology.hasSessionSupport()) {
this.s.session = this.s.topology.startSession({ owner: this });
this.cursorState.session = this.s.session;

if (this.operation) {
this.operation.session = this.s.session;
}
}
}

CoreCursor.prototype._initializeCursor.apply(this, [callback]);
Expand Down Expand Up @@ -1000,6 +1018,12 @@ Cursor.prototype.transformStream = function(options) {
* @return {Promise} returns Promise if no callback passed
*/
Cursor.prototype.explain = function(callback) {
if (this.operation) {
this.operation.explain = true;
executeOperation(this.s.topology, this.operation, callback);
return;
}

this.s.cmd.explain = true;

// Do we have a readConcern
Expand Down
3 changes: 2 additions & 1 deletion lib/operations/common_functions.js
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,8 @@ function nextObject(cursor, callback) {
MongoError.create({ message: 'Cursor is closed', driver: true })
);
}
if (cursor.s.state === Cursor.INIT && cursor.s.cmd.sort) {

if (cursor.s.state === Cursor.INIT && cursor.s.cmd && cursor.s.cmd.sort) {
try {
cursor.s.cmd.sort = formattedOrderClause(cursor.s.cmd.sort);
} catch (err) {
Expand Down

0 comments on commit b78bb89

Please sign in to comment.