-
Notifications
You must be signed in to change notification settings - Fork 1.8k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat: introduce a modern
Connection
replacement for CMAP
This introduces a new `Connection` type that is strictly callback based, for use with the forthcoming CMAP implementation. It is still an EventEmitter, but strictly for notification of command events, and coordinating events like reporting that the cluster time has been received.
- Loading branch information
Showing
2 changed files
with
237 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,210 @@ | ||
'use strict'; | ||
|
||
const EventEmitter = require('events'); | ||
const MessageStream = require('./message_stream'); | ||
const MongoError = require('../error').MongoError; | ||
const MongoWriteConcernError = require('../error').MongoWriteConcernError; | ||
const wp = require('../wireprotocol'); | ||
const apm = require('../connection/apm'); | ||
const updateSessionFromResponse = require('../sessions').updateSessionFromResponse; | ||
|
||
const kStream = Symbol('stream'); | ||
const kQueue = Symbol('queue'); | ||
const kMessageStream = Symbol('messageStream'); | ||
|
||
class Connection extends EventEmitter { | ||
constructor(stream, options) { | ||
super(options); | ||
|
||
this.bson = options.bson; | ||
this.description = { maxWireVersion: 5 }; | ||
this.socketTimeout = typeof options.socketTimeout === 'number' ? options.socketTimeout : 360000; | ||
this.monitorCommands = | ||
typeof options.monitorCommands === 'boolean' ? options.monitorCommands : false; | ||
|
||
// setup parser stream and message handling | ||
this[kQueue] = new Map(); | ||
this[kMessageStream] = new MessageStream(options); | ||
this[kMessageStream].on('message', messageHandler(this)); | ||
this[kStream] = stream; | ||
stream.on('error', () => { | ||
/* ignore errors, listen to `close` instead */ | ||
}); | ||
|
||
stream.on('close', () => { | ||
this[kQueue].forEach(op => op.callback(new MongoError('Connection closed'))); | ||
this[kQueue].clear(); | ||
|
||
this.emit('close'); | ||
}); | ||
|
||
// hook the message stream up to the passed in stream | ||
stream.pipe(this[kMessageStream]); | ||
this[kMessageStream].pipe(stream); | ||
} | ||
|
||
// the `connect` method stores the result of the handshake ismaster on the connection | ||
set ismaster(response) { | ||
this.description = response; | ||
} | ||
|
||
destroy(options, callback) { | ||
if (typeof options === 'function') { | ||
callback = options; | ||
options = {}; | ||
} | ||
|
||
options = Object.assign({ force: false }, options); | ||
if (this[kStream] == null || this.destroyed) { | ||
this.destroyed = true; | ||
return; | ||
} | ||
|
||
if (options.force) { | ||
this[kStream].destroy(); | ||
this.destroyed = true; | ||
if (typeof callback === 'function') { | ||
callback(null, null); | ||
} | ||
|
||
return; | ||
} | ||
|
||
this[kStream].end(err => { | ||
this.destroyed = true; | ||
if (typeof callback === 'function') { | ||
callback(err, null); | ||
} | ||
}); | ||
} | ||
|
||
command(ns, cmd, options, callback) { | ||
// NOTE: The wire protocol methods will eventually be migrated to this class, but for now | ||
// we need to pretend we _are_ a server. | ||
const server = { | ||
description: this.description, | ||
s: { | ||
bson: this.bson, | ||
pool: { write: write.bind(this) } | ||
} | ||
}; | ||
|
||
wp.command(server, ns, cmd, options, callback); | ||
} | ||
} | ||
|
||
function messageHandler(conn) { | ||
return function(message) { | ||
// always emit the message, in case we are streaming | ||
conn.emit('message', message); | ||
if (!conn[kQueue].has(message.responseTo)) { | ||
return; | ||
} | ||
|
||
const operationDescription = conn[kQueue].get(message.responseTo); | ||
conn[kQueue].delete(message.responseTo); | ||
|
||
const callback = operationDescription.cb; | ||
if (operationDescription.socketTimeoutOverride) { | ||
this[kStream].setSocketTimeout(this.socketTimeout); | ||
} | ||
|
||
try { | ||
// Pass in the entire description because it has BSON parsing options | ||
message.parse(operationDescription); | ||
} catch (err) { | ||
callback(new MongoError(err)); | ||
return; | ||
} | ||
|
||
if (message.documents[0]) { | ||
const document = message.documents[0]; | ||
const session = operationDescription.session; | ||
if (session) { | ||
updateSessionFromResponse(session, document); | ||
} | ||
|
||
if (document.$clusterTime) { | ||
this.emit('clusterTimeReceived', document.$clusterTime); | ||
} | ||
|
||
if (document.writeConcernError) { | ||
callback(new MongoWriteConcernError(document.writeConcernError, document)); | ||
return; | ||
} | ||
|
||
if (document.ok === 0 || document.$err || document.errmsg || document.code) { | ||
callback(new MongoError(document)); | ||
return; | ||
} | ||
} | ||
|
||
callback(null, operationDescription.fullResult ? message : message.documents[0]); | ||
}; | ||
} | ||
|
||
// Not meant to be called directly, the wire protocol methods call this assuming it is a `Pool` instance | ||
function write(command, options, callback) { | ||
if (typeof options === 'function') { | ||
callback = options; | ||
} | ||
|
||
options = options || {}; | ||
const operationDescription = { | ||
requestId: command.requestId, | ||
cb: callback, | ||
fullResult: typeof options.fullResult === 'boolean' ? options.fullResult : false, | ||
session: options.session, | ||
|
||
// For BSON parsing | ||
promoteLongs: typeof options.promoteLongs === 'boolean' ? options.promoteLongs : true, | ||
promoteValues: typeof options.promoteValues === 'boolean' ? options.promoteValues : true, | ||
promoteBuffers: typeof options.promoteBuffers === 'boolean' ? options.promoteBuffers : false, | ||
raw: typeof options.raw === 'boolean' ? options.raw : false, | ||
|
||
// NOTE: This property is set on the connection as part of `connect`, but should | ||
// eventually live in the `StreamDescription` attached to this connection. | ||
agreedCompressor: this.agreedCompressor | ||
}; | ||
|
||
if (typeof options.socketTimeout === 'number') { | ||
operationDescription.socketTimeoutOverride = true; | ||
this[kStream].setSocketTimeout(options.socketTimeout); | ||
} | ||
|
||
// if command monitoring is enabled we need to modify the callback here | ||
if (this.monitorCommands) { | ||
this.emit('commandStarted', new apm.CommandStartedEvent(this, command)); | ||
|
||
operationDescription.started = process.hrtime(); | ||
operationDescription.cb = (err, reply) => { | ||
if (err) { | ||
this.emit( | ||
'commandFailed', | ||
new apm.CommandFailedEvent(this, command, err, operationDescription.started) | ||
); | ||
} else { | ||
if (reply && reply.result && (reply.result.ok === 0 || reply.result.$err)) { | ||
this.emit( | ||
'commandFailed', | ||
new apm.CommandFailedEvent(this, command, reply.result, operationDescription.started) | ||
); | ||
} else { | ||
this.emit( | ||
'commandSucceeded', | ||
new apm.CommandSucceededEvent(this, command, reply, operationDescription.started) | ||
); | ||
} | ||
} | ||
|
||
if (typeof cb === 'function') { | ||
callback(err, reply); | ||
} | ||
}; | ||
} | ||
|
||
this[kQueue].set(operationDescription.requestId, operationDescription); | ||
this[kMessageStream].writeCommand(command, operationDescription); | ||
} | ||
|
||
module.exports = Connection; |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,27 @@ | ||
'use strict'; | ||
|
||
const Connection = require('../../../lib/core/cmap/connection'); | ||
const connect = require('../../../lib/core/connection/connect'); | ||
const expect = require('chai').expect; | ||
const BSON = require('bson'); | ||
|
||
describe('Connection', function() { | ||
it('should execute a command against a server', function(done) { | ||
const connectOptions = Object.assign( | ||
{ connectionType: Connection, bson: new BSON() }, | ||
this.configuration.options | ||
); | ||
|
||
connect(connectOptions, (err, conn) => { | ||
expect(err).to.not.exist; | ||
|
||
conn.command('admin.$cmd', { ismaster: 1 }, (err, ismaster) => { | ||
expect(err).to.not.exist; | ||
expect(ismaster).to.exist; | ||
expect(ismaster.ok).to.equal(1); | ||
|
||
conn.destroy(done); | ||
}); | ||
}); | ||
}); | ||
}); |