From 7890e48d17c4cf42cb9419d6003c9ee003aedae1 Mon Sep 17 00:00:00 2001 From: Matt Broadstone Date: Fri, 8 Nov 2019 10:33:33 -0500 Subject: [PATCH] feat: introduce a modern `Connection` replacement for CMAP This introduces a new `Connection` type that is strictly callback based, for use with the forthcoming CMAP implementation. It is still an EventEmitter, but strictly for notification of command events, and coordinating events like reporting that the cluster time has been received. --- lib/core/cmap/connection.js | 210 +++++++++++++++++++++++ test/functional/cmap/connection_tests.js | 27 +++ 2 files changed, 237 insertions(+) create mode 100644 lib/core/cmap/connection.js create mode 100644 test/functional/cmap/connection_tests.js diff --git a/lib/core/cmap/connection.js b/lib/core/cmap/connection.js new file mode 100644 index 0000000000..3259d4191d --- /dev/null +++ b/lib/core/cmap/connection.js @@ -0,0 +1,210 @@ +'use strict'; + +const EventEmitter = require('events'); +const MessageStream = require('./message_stream'); +const MongoError = require('../error').MongoError; +const MongoWriteConcernError = require('../error').MongoWriteConcernError; +const wp = require('../wireprotocol'); +const apm = require('../connection/apm'); +const updateSessionFromResponse = require('../sessions').updateSessionFromResponse; + +const kStream = Symbol('stream'); +const kQueue = Symbol('queue'); +const kMessageStream = Symbol('messageStream'); + +class Connection extends EventEmitter { + constructor(stream, options) { + super(options); + + this.bson = options.bson; + this.description = { maxWireVersion: 5 }; + this.socketTimeout = typeof options.socketTimeout === 'number' ? options.socketTimeout : 360000; + this.monitorCommands = + typeof options.monitorCommands === 'boolean' ? options.monitorCommands : false; + + // setup parser stream and message handling + this[kQueue] = new Map(); + this[kMessageStream] = new MessageStream(options); + this[kMessageStream].on('message', messageHandler(this)); + this[kStream] = stream; + stream.on('error', () => { + /* ignore errors, listen to `close` instead */ + }); + + stream.on('close', () => { + this[kQueue].forEach(op => op.callback(new MongoError('Connection closed'))); + this[kQueue].clear(); + + this.emit('close'); + }); + + // hook the message stream up to the passed in stream + stream.pipe(this[kMessageStream]); + this[kMessageStream].pipe(stream); + } + + // the `connect` method stores the result of the handshake ismaster on the connection + set ismaster(response) { + this.description = response; + } + + destroy(options, callback) { + if (typeof options === 'function') { + callback = options; + options = {}; + } + + options = Object.assign({ force: false }, options); + if (this[kStream] == null || this.destroyed) { + this.destroyed = true; + return; + } + + if (options.force) { + this[kStream].destroy(); + this.destroyed = true; + if (typeof callback === 'function') { + callback(null, null); + } + + return; + } + + this[kStream].end(err => { + this.destroyed = true; + if (typeof callback === 'function') { + callback(err, null); + } + }); + } + + command(ns, cmd, options, callback) { + // NOTE: The wire protocol methods will eventually be migrated to this class, but for now + // we need to pretend we _are_ a server. + const server = { + description: this.description, + s: { + bson: this.bson, + pool: { write: write.bind(this) } + } + }; + + wp.command(server, ns, cmd, options, callback); + } +} + +function messageHandler(conn) { + return function(message) { + // always emit the message, in case we are streaming + conn.emit('message', message); + if (!conn[kQueue].has(message.responseTo)) { + return; + } + + const operationDescription = conn[kQueue].get(message.responseTo); + conn[kQueue].delete(message.responseTo); + + const callback = operationDescription.cb; + if (operationDescription.socketTimeoutOverride) { + this[kStream].setSocketTimeout(this.socketTimeout); + } + + try { + // Pass in the entire description because it has BSON parsing options + message.parse(operationDescription); + } catch (err) { + callback(new MongoError(err)); + return; + } + + if (message.documents[0]) { + const document = message.documents[0]; + const session = operationDescription.session; + if (session) { + updateSessionFromResponse(session, document); + } + + if (document.$clusterTime) { + this.emit('clusterTimeReceived', document.$clusterTime); + } + + if (document.writeConcernError) { + callback(new MongoWriteConcernError(document.writeConcernError, document)); + return; + } + + if (document.ok === 0 || document.$err || document.errmsg || document.code) { + callback(new MongoError(document)); + return; + } + } + + callback(null, operationDescription.fullResult ? message : message.documents[0]); + }; +} + +// Not meant to be called directly, the wire protocol methods call this assuming it is a `Pool` instance +function write(command, options, callback) { + if (typeof options === 'function') { + callback = options; + } + + options = options || {}; + const operationDescription = { + requestId: command.requestId, + cb: callback, + fullResult: typeof options.fullResult === 'boolean' ? options.fullResult : false, + session: options.session, + + // For BSON parsing + promoteLongs: typeof options.promoteLongs === 'boolean' ? options.promoteLongs : true, + promoteValues: typeof options.promoteValues === 'boolean' ? options.promoteValues : true, + promoteBuffers: typeof options.promoteBuffers === 'boolean' ? options.promoteBuffers : false, + raw: typeof options.raw === 'boolean' ? options.raw : false, + + // NOTE: This property is set on the connection as part of `connect`, but should + // eventually live in the `StreamDescription` attached to this connection. + agreedCompressor: this.agreedCompressor + }; + + if (typeof options.socketTimeout === 'number') { + operationDescription.socketTimeoutOverride = true; + this[kStream].setSocketTimeout(options.socketTimeout); + } + + // if command monitoring is enabled we need to modify the callback here + if (this.monitorCommands) { + this.emit('commandStarted', new apm.CommandStartedEvent(this, command)); + + operationDescription.started = process.hrtime(); + operationDescription.cb = (err, reply) => { + if (err) { + this.emit( + 'commandFailed', + new apm.CommandFailedEvent(this, command, err, operationDescription.started) + ); + } else { + if (reply && reply.result && (reply.result.ok === 0 || reply.result.$err)) { + this.emit( + 'commandFailed', + new apm.CommandFailedEvent(this, command, reply.result, operationDescription.started) + ); + } else { + this.emit( + 'commandSucceeded', + new apm.CommandSucceededEvent(this, command, reply, operationDescription.started) + ); + } + } + + if (typeof cb === 'function') { + callback(err, reply); + } + }; + } + + this[kQueue].set(operationDescription.requestId, operationDescription); + this[kMessageStream].writeCommand(command, operationDescription); +} + +module.exports = Connection; diff --git a/test/functional/cmap/connection_tests.js b/test/functional/cmap/connection_tests.js new file mode 100644 index 0000000000..280340151f --- /dev/null +++ b/test/functional/cmap/connection_tests.js @@ -0,0 +1,27 @@ +'use strict'; + +const Connection = require('../../../lib/core/cmap/connection'); +const connect = require('../../../lib/core/connection/connect'); +const expect = require('chai').expect; +const BSON = require('bson'); + +describe('Connection', function() { + it('should execute a command against a server', function(done) { + const connectOptions = Object.assign( + { connectionType: Connection, bson: new BSON() }, + this.configuration.options + ); + + connect(connectOptions, (err, conn) => { + expect(err).to.not.exist; + + conn.command('admin.$cmd', { ismaster: 1 }, (err, ismaster) => { + expect(err).to.not.exist; + expect(ismaster).to.exist; + expect(ismaster.ok).to.equal(1); + + conn.destroy(done); + }); + }); + }); +});