Skip to content

Commit

Permalink
fix(NODE-4649): use SDAM handling for errors from min pool size popul…
Browse files Browse the repository at this point in the history
…ation (#3424)
  • Loading branch information
dariakp authored Sep 28, 2022
1 parent 0d3c02e commit ef3b55d
Show file tree
Hide file tree
Showing 7 changed files with 125 additions and 79 deletions.
19 changes: 18 additions & 1 deletion src/cmap/connection_pool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import {
import { MongoError, MongoInvalidArgumentError, MongoRuntimeError } from '../error';
import { Logger } from '../logger';
import { CancellationToken, TypedEventEmitter } from '../mongo_types';
import type { Server } from '../sdam/server';
import { Callback, eachAsync, makeCounter } from '../utils';
import { connect } from './connect';
import { Connection, ConnectionEvents, ConnectionOptions } from './connection';
Expand All @@ -38,6 +39,8 @@ import {
import { PoolClearedError, PoolClosedError, WaitQueueTimeoutError } from './errors';
import { ConnectionPoolMetrics } from './metrics';

/** @internal */
const kServer = Symbol('server');
/** @internal */
const kLogger = Symbol('logger');
/** @internal */
Expand Down Expand Up @@ -126,6 +129,8 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
/** @internal */
[kPoolState]: typeof PoolState[keyof typeof PoolState];
/** @internal */
[kServer]: Server;
/** @internal */
[kLogger]: Logger;
/** @internal */
[kConnections]: Denque<Connection>;
Expand Down Expand Up @@ -212,7 +217,7 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
static readonly CONNECTION_CHECKED_IN = CONNECTION_CHECKED_IN;

/** @internal */
constructor(options: ConnectionPoolOptions) {
constructor(server: Server, options: ConnectionPoolOptions) {
super();

this.options = Object.freeze({
Expand All @@ -234,6 +239,7 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
}

this[kPoolState] = PoolState.paused;
this[kServer] = server;
this[kLogger] = new Logger('ConnectionPool');
this[kConnections] = new Denque();
this[kPending] = 0;
Expand Down Expand Up @@ -304,6 +310,10 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
return this[kServiceGenerations];
}

get serverError() {
return this[kServer].description.error;
}

/**
* Get the metrics information for the pool when a wait queue timeout occurs.
*/
Expand Down Expand Up @@ -587,6 +597,10 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
if (err || !connection) {
this[kLogger].debug(`connection attempt failed with error [${JSON.stringify(err)}]`);
this[kPending]--;
this.emit(
ConnectionPool.CONNECTION_CLOSED,
new ConnectionClosedEvent(this, { id: connectOptions.id, serviceId: undefined }, 'error')
);
callback(err ?? new MongoRuntimeError('Connection creation failed without error'));
return;
}
Expand Down Expand Up @@ -651,6 +665,9 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
// connection permits because that potentially delays the availability of
// the connection to a checkout request
this.createConnection((err, connection) => {
if (err) {
this[kServer].handleError(err);
}
if (!err && connection) {
this[kConnections].push(connection);
process.nextTick(() => this.processWaitQueue());
Expand Down
6 changes: 5 additions & 1 deletion src/cmap/connection_pool_events.ts
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,11 @@ export class ConnectionClosedEvent extends ConnectionPoolMonitoringEvent {
serviceId?: ObjectId;

/** @internal */
constructor(pool: ConnectionPool, connection: Connection, reason: string) {
constructor(
pool: ConnectionPool,
connection: Pick<Connection, 'id' | 'serviceId'>,
reason: string
) {
super(pool);
this.connectionId = connection.id;
this.reason = reason || 'unknown';
Expand Down
6 changes: 3 additions & 3 deletions src/cmap/errors.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,9 @@ export class PoolClearedError extends MongoNetworkError {
address: string;

constructor(pool: ConnectionPool) {
// TODO(NODE-3135): pass in original pool-clearing error and use in message
// "failed with: <original error which cleared the pool>"
super(`Connection pool for ${pool.address} was cleared because another operation failed`);
super(
`Connection pool for ${pool.address} was cleared because another operation failed with: "${pool.serverError?.message}"`
);
this.address = pool.address;
}

Expand Down
74 changes: 44 additions & 30 deletions src/sdam/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import {
} from '../constants';
import type { AutoEncrypter } from '../deps';
import {
AnyError,
isNetworkErrorBeforeHandshake,
isNodeShuttingDownError,
isSDAMUnrecoverableError,
Expand Down Expand Up @@ -149,7 +150,7 @@ export class Server extends TypedEventEmitter<ServerEvents> {
logger: new Logger('Server'),
state: STATE_CLOSED,
topology,
pool: new ConnectionPool(poolOptions),
pool: new ConnectionPool(this, poolOptions),
operationCount: 0
};

Expand Down Expand Up @@ -368,6 +369,46 @@ export class Server extends TypedEventEmitter<ServerEvents> {
callback
);
}

/**
* Handle SDAM error
* @internal
*/
handleError(error: AnyError, connection?: Connection) {
if (!(error instanceof MongoError)) {
return;
}
if (error instanceof MongoNetworkError) {
if (!(error instanceof MongoNetworkTimeoutError) || isNetworkErrorBeforeHandshake(error)) {
// In load balanced mode we never mark the server as unknown and always
// clear for the specific service id.

if (!this.loadBalanced) {
error.addErrorLabel(MongoErrorLabel.ResetPool);
markServerUnknown(this, error);
} else if (connection) {
this.s.pool.clear(connection.serviceId);
}
}
} else {
if (isSDAMUnrecoverableError(error)) {
if (shouldHandleStateChangeError(this, error)) {
const shouldClearPool = maxWireVersion(this) <= 7 || isNodeShuttingDownError(error);
if (this.loadBalanced && connection && shouldClearPool) {
this.s.pool.clear(connection.serviceId);
}

if (!this.loadBalanced) {
if (shouldClearPool) {
error.addErrorLabel(MongoErrorLabel.ResetPool);
}
markServerUnknown(this, error);
process.nextTick(() => this.requestCheck());
}
}
}
}
}
}

function calculateRoundTripTime(oldRtt: number, duration: number): number {
Expand Down Expand Up @@ -482,18 +523,6 @@ function makeOperationHandler(
) {
error.addErrorLabel(MongoErrorLabel.RetryableWriteError);
}

if (!(error instanceof MongoNetworkTimeoutError) || isNetworkErrorBeforeHandshake(error)) {
// In load balanced mode we never mark the server as unknown and always
// clear for the specific service id.

if (!server.loadBalanced) {
error.addErrorLabel(MongoErrorLabel.ResetPool);
markServerUnknown(server, error);
} else {
server.s.pool.clear(connection.serviceId);
}
}
} else {
if (
(isRetryableWritesEnabled(server.s.topology) || isTransactionCommand(cmd)) &&
Expand All @@ -502,23 +531,6 @@ function makeOperationHandler(
) {
error.addErrorLabel(MongoErrorLabel.RetryableWriteError);
}

if (isSDAMUnrecoverableError(error)) {
if (shouldHandleStateChangeError(server, error)) {
const shouldClearPool = maxWireVersion(server) <= 7 || isNodeShuttingDownError(error);
if (server.loadBalanced && shouldClearPool) {
server.s.pool.clear(connection.serviceId);
}

if (!server.loadBalanced) {
if (shouldClearPool) {
error.addErrorLabel(MongoErrorLabel.ResetPool);
}
markServerUnknown(server, error);
process.nextTick(() => server.requestCheck());
}
}
}
}

if (
Expand All @@ -529,6 +541,8 @@ function makeOperationHandler(
session.unpin({ force: true });
}

server.handleError(error, connection);

return callback(error);
};
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,35 +11,25 @@ const LB_SKIP_TESTS: SkipDescription[] = [
'pool clear halts background minPoolSize establishments',
'clearing a paused pool emits no events',
'after clear, cannot check out connections until pool ready',
'readying a ready pool emits no events'
'readying a ready pool emits no events',
'error during minPoolSize population clears pool'
].map(description => ({
description,
skipIfCondition: 'loadBalanced',
skipReason: 'cannot run against a load balanced environment'
}));

const POOL_PAUSED_SKIP_TESTS: SkipDescription[] = [
'error during minPoolSize population clears pool'
].map(description => ({
description,
skipIfCondition: 'always',
skipReason: 'TODO(NODE-3135): make connection pool SDAM aware'
}));

describe('Connection Monitoring and Pooling Spec Tests (Integration)', function () {
const tests: CmapTest[] = loadSpecTests('connection-monitoring-and-pooling');

runCmapTestSuite(tests, {
testsToSkip: LB_SKIP_TESTS.concat(
[
{
description: 'waiting on maxConnecting is limited by WaitQueueTimeoutMS',
skipIfCondition: 'always',
skipReason:
'not applicable: waitQueueTimeoutMS limits connection establishment time in our driver'
}
],
POOL_PAUSED_SKIP_TESTS
)
testsToSkip: LB_SKIP_TESTS.concat([
{
description: 'waiting on maxConnecting is limited by WaitQueueTimeoutMS',
skipIfCondition: 'always',
skipReason:
'not applicable: waitQueueTimeoutMS limits connection establishment time in our driver'
}
])
});
});
52 changes: 35 additions & 17 deletions test/tools/cmap_spec_runner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import { EventEmitter } from 'events';
import { clearTimeout, setTimeout } from 'timers';
import { promisify } from 'util';

import { Connection, HostAddress, MongoClient } from '../../src';
import { Connection, HostAddress, MongoClient, Server } from '../../src';
import { ConnectionPool, ConnectionPoolOptions } from '../../src/cmap/connection_pool';
import { CMAP_EVENTS } from '../../src/constants';
import { makeClientMetadata, shuffle } from '../../src/utils';
Expand Down Expand Up @@ -253,11 +253,13 @@ export class ThreadContext {
threads: Map<any, Thread> = new Map();
connections: Map<string, Connection> = new Map();
orphans: Set<Connection> = new Set();
poolEvents = [];
poolEvents: any[] = [];
poolEventsEventEmitter = new EventEmitter();

#poolOptions: Partial<ConnectionPoolOptions>;
#hostAddress: HostAddress;
#server: Server;
#originalServerPool: ConnectionPool;
#supportedOperations: ReturnType<typeof getTestOpDefinitions>;
#injectPoolStats = false;

Expand All @@ -267,12 +269,14 @@ export class ThreadContext {
* @param poolOptions - Allows the test to pass in extra options to the pool not specified by the spec test definition, such as the environment-dependent "loadBalanced"
*/
constructor(
server: Server,
hostAddress: HostAddress,
poolOptions: Partial<ConnectionPoolOptions> = {},
contextOptions: { injectPoolStats: boolean }
) {
this.#poolOptions = poolOptions;
this.#hostAddress = hostAddress;
this.#server = server;
this.#supportedOperations = getTestOpDefinitions(this);
this.#injectPoolStats = contextOptions.injectPoolStats;
}
Expand All @@ -292,11 +296,13 @@ export class ThreadContext {
}

createPool(options) {
this.pool = new ConnectionPool({
this.pool = new ConnectionPool(this.#server, {
...this.#poolOptions,
...options,
hostAddress: this.#hostAddress
});
this.#originalServerPool = this.#server.s.pool;
this.#server.s.pool = this.pool;
ALL_POOL_EVENTS.forEach(eventName => {
this.pool.on(eventName, event => {
if (this.#injectPoolStats) {
Expand All @@ -312,6 +318,7 @@ export class ThreadContext {
}

closePool() {
this.#server.s.pool = this.#originalServerPool;
return new Promise(resolve => {
ALL_POOL_EVENTS.forEach(ev => this.pool.removeAllListeners(ev));
this.pool.close(resolve);
Expand Down Expand Up @@ -438,7 +445,10 @@ export function runCmapTestSuite(
) {
for (const test of tests) {
describe(test.name, function () {
let hostAddress: HostAddress, threadContext: ThreadContext, client: MongoClient;
let hostAddress: HostAddress,
server: Server,
threadContext: ThreadContext,
client: MongoClient;

beforeEach(async function () {
let utilClient: MongoClient;
Expand Down Expand Up @@ -479,25 +489,33 @@ export function runCmapTestSuite(
}

try {
const serverMap = utilClient.topology.s.description.servers;
const hosts = shuffle(serverMap.keys());
const serverDescriptionMap = utilClient.topology?.s.description.servers;
const hosts = shuffle(serverDescriptionMap.keys());
const selectedHostUri = hosts[0];
hostAddress = serverMap.get(selectedHostUri).hostAddress;
hostAddress = serverDescriptionMap.get(selectedHostUri).hostAddress;

client = this.configuration.newClient(
`mongodb://${hostAddress}/${
this.configuration.isLoadBalanced ? '?loadBalanced=true' : '?directConnection=true'
}`
);
await client.connect();
if (test.failPoint) {
await client.db('admin').command(test.failPoint);
}

const serverMap = client.topology?.s.servers;
server = serverMap?.get(selectedHostUri);
if (!server) {
throw new Error('Failed to retrieve server for test');
}

threadContext = new ThreadContext(
server,
hostAddress,
this.configuration.isLoadBalanced ? { loadBalanced: true } : {},
{ injectPoolStats: !!options?.injectPoolStats }
);

if (test.failPoint) {
client = this.configuration.newClient(
`mongodb://${hostAddress}/${
this.configuration.isLoadBalanced ? '?loadBalanced=true' : '?directConnection=true'
}`
);
await client.connect();
await client.db('admin').command(test.failPoint);
}
} finally {
await utilClient.close();
}
Expand Down
Loading

0 comments on commit ef3b55d

Please sign in to comment.