diff --git a/lib/cmap/connection.js b/lib/cmap/connection.js index bf715625b2..d4f9a54f31 100644 --- a/lib/cmap/connection.js +++ b/lib/cmap/connection.js @@ -58,38 +58,9 @@ class Connection extends EventEmitter { /* ignore errors, listen to `close` instead */ }); - stream.on('close', () => { - if (this.closed) { - return; - } - - this.closed = true; - this[kQueue].forEach(op => - op.cb(new MongoNetworkError(`connection ${this.id} to ${this.address} closed`)) - ); - this[kQueue].clear(); - - this.emit('close'); - }); - - stream.on('timeout', () => { - if (this.closed) { - return; - } - - stream.destroy(); - this.closed = true; - this[kQueue].forEach(op => - op.cb( - new MongoNetworkTimeoutError(`connection ${this.id} to ${this.address} timed out`, { - beforeHandshake: this[kIsMaster] == null - }) - ) - ); - - this[kQueue].clear(); - this.emit('close'); - }); + this[kMessageStream].on('error', error => this.handleIssue({ destroy: error })); + stream.on('close', () => this.handleIssue({ isClose: true })); + stream.on('timeout', () => this.handleIssue({ isTimeout: true, destroy: true })); // hook the message stream up to the passed in stream stream.pipe(this[kMessageStream]); @@ -132,6 +103,39 @@ class Connection extends EventEmitter { this[kLastUseTime] = now(); } + /** + * @param {{ isTimeout?: boolean; isClose?: boolean; destroy?: boolean | Error }} issue + */ + handleIssue(issue) { + if (this.closed) { + return; + } + + if (issue.destroy) { + this[kStream].destroy(typeof issue.destroy === 'boolean' ? undefined : issue.destroy); + } + + this.closed = true; + + for (const idAndOp of this[kQueue]) { + const op = idAndOp[1]; + if (issue.isTimeout) { + op.cb( + new MongoNetworkTimeoutError(`connection ${this.id} to ${this.address} timed out`, { + beforeHandshake: !!this[kIsMaster] + }) + ); + } else if (issue.isClose) { + op.cb(new MongoNetworkError(`connection ${this.id} to ${this.address} closed`)); + } else { + op.cb(typeof issue.destroy === 'boolean' ? undefined : issue.destroy); + } + } + + this[kQueue].clear(); + this.emit('close'); + } + destroy(options, callback) { if (typeof options === 'function') { callback = options; diff --git a/test/unit/sdam/topology.test.js b/test/unit/sdam/topology.test.js index 598274f997..1b48d00b1f 100644 --- a/test/unit/sdam/topology.test.js +++ b/test/unit/sdam/topology.test.js @@ -291,5 +291,35 @@ describe('Topology (unit)', function() { }); }); }); + + it('should encounter a server selection timeout on garbled server responses', function() { + const net = require('net'); + const server = net.createServer(); + const p = Promise.resolve(); + server.listen(0, 'localhost', 2, () => { + server.on('connection', c => c.on('data', () => c.write('garbage_data'))); + const address = server.address(); + const client = this.configuration.newClient( + `mongodb://${address.address}:${address.port}`, + { serverSelectionTimeoutMS: 1000 } + ); + p.then(() => + client + .connect() + .then(() => { + server.close(); + client.close(); + expect.fail('Should throw a server selection error!'); + }) + .catch(error => { + server.close(); + const closePromise = client.close(); + expect(error).to.exist; + return closePromise; + }) + ); + }); + return p; + }); }); });