diff --git a/lib/cmap/connection_pool.js b/lib/cmap/connection_pool.js index 9de52c4008..e384efc0b2 100644 --- a/lib/cmap/connection_pool.js +++ b/lib/cmap/connection_pool.js @@ -190,6 +190,10 @@ class ConnectionPool extends EventEmitter { return this[kConnections].length; } + get waitQueueSize() { + return this[kWaitQueue].length; + } + /** * Check a connection out of this pool. The connection will continue to be tracked, but no reference to it * will be held by the pool. This means that if a connection is checked out it MUST be checked back in or @@ -287,7 +291,7 @@ class ConnectionPool extends EventEmitter { this[kCancellationToken].emit('cancel'); // drain the wait queue - while (this[kWaitQueue].length) { + while (this.waitQueueSize) { const waitQueueMember = this[kWaitQueue].pop(); clearTimeout(waitQueueMember.timer); if (!waitQueueMember[kCancelled]) { @@ -441,13 +445,17 @@ function processWaitQueue(pool) { return; } - while (pool[kWaitQueue].length && pool.availableConnectionCount) { + while (pool.waitQueueSize) { const waitQueueMember = pool[kWaitQueue].peekFront(); if (waitQueueMember[kCancelled]) { pool[kWaitQueue].shift(); continue; } + if (!pool.availableConnectionCount) { + break; + } + const connection = pool[kConnections].shift(); const isStale = connectionIsStale(pool, connection); const isIdle = connectionIsIdle(pool, connection); @@ -464,7 +472,7 @@ function processWaitQueue(pool) { } const maxPoolSize = pool.options.maxPoolSize; - if (pool[kWaitQueue].length && (maxPoolSize <= 0 || pool.totalConnectionCount < maxPoolSize)) { + if (pool.waitQueueSize && (maxPoolSize <= 0 || pool.totalConnectionCount < maxPoolSize)) { createConnection(pool, (err, connection) => { const waitQueueMember = pool[kWaitQueue].shift(); if (waitQueueMember == null) { diff --git a/test/unit/cmap/connection_pool.test.js b/test/unit/cmap/connection_pool.test.js index 9c6ba3f6b5..df62258438 100644 --- a/test/unit/cmap/connection_pool.test.js +++ b/test/unit/cmap/connection_pool.test.js @@ -3,10 +3,11 @@ const util = require('util'); const loadSpecTests = require('../../spec').loadSpecTests; const ConnectionPool = require('../../../lib/cmap/connection_pool').ConnectionPool; +const WaitQueueTimeoutError = require('../../../lib/cmap/errors').WaitQueueTimeoutError; const EventEmitter = require('events').EventEmitter; const mock = require('mongodb-mock-server'); const cmapEvents = require('../../../lib/cmap/events'); - +const sinon = require('sinon'); const chai = require('chai'); chai.use(require('../../functional/spec-runner/matcher').default); const expect = chai.expect; @@ -113,6 +114,42 @@ describe('Connection Pool', function() { ); }); + it('should clear timed out wait queue members if no connections are available', function(done) { + server.setMessageHandler(request => { + const doc = request.document; + if (doc.ismaster) { + request.reply(mock.DEFAULT_ISMASTER_36); + } + }); + + const pool = new ConnectionPool( + Object.assign({ bson: new BSON(), maxPoolSize: 1, waitQueueTimeoutMS: 200 }, server.address()) + ); + + pool.checkOut((err, conn) => { + expect(err).to.not.exist; + expect(conn).to.exist; + + pool.checkOut(err => { + expect(err).to.exist.and.be.instanceOf(WaitQueueTimeoutError); + + // We can only process the wait queue with `checkIn` and `checkOut`, so we + // force the pool here to think there are no available connections, even though + // we are checking the connection back in. This simulates a slow leak where + // incoming requests outpace the ability of the queue to fully process cancelled + // wait queue members + sinon.stub(pool, 'availableConnectionCount').get(() => 0); + pool.checkIn(conn); + + expect(pool) + .property('waitQueueSize') + .to.equal(0); + + done(); + }); + }); + }); + describe('withConnection', function() { it('should manage a connection for a successful operation', function(done) { server.setMessageHandler(request => {