Skip to content

Commit

Permalink
feat(NODE-5197): add server monitoring mode
Browse files Browse the repository at this point in the history
  • Loading branch information
durran committed Oct 20, 2023
1 parent 32b7176 commit 5b730ca
Show file tree
Hide file tree
Showing 19 changed files with 939 additions and 2,188 deletions.
12 changes: 12 additions & 0 deletions src/connection_string.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import {
} from './mongo_logger';
import { ReadConcern, type ReadConcernLevel } from './read_concern';
import { ReadPreference, type ReadPreferenceMode } from './read_preference';
import { ServerMonitoringModes } from './sdam/monitor';
import type { TagSet } from './sdam/server_description';
import {
DEFAULT_PK_FACTORY,
Expand Down Expand Up @@ -1052,6 +1053,17 @@ export const OPTIONS = {
serializeFunctions: {
type: 'boolean'
},
serverMonitoringMode: {
default: 'auto',
transform({ values: [value] }) {
if (!ServerMonitoringModes.includes(value as string)) {
throw new MongoParseError(
'serverMonitoringMode must be one of `auto`, `poll`, or `stream`'
);
}
return value;
}
},
serverSelectionTimeoutMS: {
default: 30000,
type: 'uint'
Expand Down
4 changes: 3 additions & 1 deletion src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -483,7 +483,9 @@ export type {
MonitorOptions,
MonitorPrivate,
RTTPinger,
RTTPingerOptions
RTTPingerOptions,
ServerMonitoringMode,
ServerMonitoringModes
} from './sdam/monitor';
export type { Server, ServerEvents, ServerOptions, ServerPrivate } from './sdam/server';
export type {
Expand Down
4 changes: 4 additions & 0 deletions src/mongo_client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import { executeOperation } from './operations/execute_operation';
import { RunAdminCommandOperation } from './operations/run_command';
import type { ReadConcern, ReadConcernLevel, ReadConcernLike } from './read_concern';
import { ReadPreference, type ReadPreferenceMode } from './read_preference';
import type { ServerMonitoringMode } from './sdam/monitor';
import type { TagSet } from './sdam/server_description';
import { readPreferenceServerSelector } from './sdam/server_selection';
import type { SrvPoller } from './sdam/srv_polling';
Expand Down Expand Up @@ -247,6 +248,8 @@ export interface MongoClientOptions extends BSONSerializeOptions, SupportedNodeC
proxyUsername?: string;
/** Configures a Socks5 proxy password when the proxy in proxyHost requires username/password authentication. */
proxyPassword?: string;
/** Instructs the driver monitors to use a specific monitoring mode */
serverMonitoringMode?: ServerMonitoringMode;

/** @internal */
srvPoller?: SrvPoller;
Expand Down Expand Up @@ -771,6 +774,7 @@ export interface MongoOptions
proxyPort?: number;
proxyUsername?: string;
proxyPassword?: string;
serverMonitoringMode: ServerMonitoringMode;

/** @internal */
connectionType?: typeof Connection;
Expand Down
45 changes: 40 additions & 5 deletions src/sdam/monitor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import { clearTimeout, setTimeout } from 'timers';
import { type Document, Long } from '../bson';
import { connect } from '../cmap/connect';
import { Connection, type ConnectionOptions } from '../cmap/connection';
import { getFAASEnv } from '../cmap/handshake/client_metadata';
import { LEGACY_HELLO_COMMAND } from '../constants';
import { MongoError, MongoErrorLabel, MongoNetworkTimeoutError } from '../error';
import { CancellationToken, TypedEventEmitter } from '../mongo_types';
Expand Down Expand Up @@ -44,6 +45,11 @@ function isInCloseState(monitor: Monitor) {
return monitor.s.state === STATE_CLOSED || monitor.s.state === STATE_CLOSING;
}

/** @public */
export const ServerMonitoringModes = ['auto', 'poll', 'stream'];
/** @public */
export type ServerMonitoringMode = (typeof ServerMonitoringModes)[number];

/** @internal */
export interface MonitorPrivate {
state: string;
Expand All @@ -55,6 +61,7 @@ export interface MonitorOptions
connectTimeoutMS: number;
heartbeatFrequencyMS: number;
minHeartbeatFrequencyMS: number;
serverMonitoringMode: ServerMonitoringMode;
}

/** @public */
Expand All @@ -73,9 +80,16 @@ export class Monitor extends TypedEventEmitter<MonitorEvents> {
s: MonitorPrivate;
address: string;
options: Readonly<
Pick<MonitorOptions, 'connectTimeoutMS' | 'heartbeatFrequencyMS' | 'minHeartbeatFrequencyMS'>
Pick<
MonitorOptions,
| 'connectTimeoutMS'
| 'heartbeatFrequencyMS'
| 'minHeartbeatFrequencyMS'
| 'serverMonitoringMode'
>
>;
connectOptions: ConnectionOptions;
isRunningInFaasEnv: boolean;
[kServer]: Server;
[kConnection]?: Connection;
[kCancellationToken]: CancellationToken;
Expand Down Expand Up @@ -103,8 +117,11 @@ export class Monitor extends TypedEventEmitter<MonitorEvents> {
this.options = Object.freeze({
connectTimeoutMS: options.connectTimeoutMS ?? 10000,
heartbeatFrequencyMS: options.heartbeatFrequencyMS ?? 10000,
minHeartbeatFrequencyMS: options.minHeartbeatFrequencyMS ?? 500
minHeartbeatFrequencyMS: options.minHeartbeatFrequencyMS ?? 500,
serverMonitoringMode: options.serverMonitoringMode
});
console.log(getFAASEnv());
this.isRunningInFaasEnv = getFAASEnv() != null;

const cancellationToken = this[kCancellationToken];
// TODO: refactor this to pull it directly from the pool, requires new ConnectionPool integration
Expand Down Expand Up @@ -207,10 +224,26 @@ function resetMonitorState(monitor: Monitor) {
monitor[kConnection] = undefined;
}

function useStreamingProtocol(monitor: Monitor, topologyVersion: TopologyVersion | null): boolean {
// If we have no topology version we always poll no matter
// what the user provided.
if (topologyVersion == null) return false;

const serverMonitoringMode = monitor.options.serverMonitoringMode;
if (serverMonitoringMode === 'poll') return false;
if (serverMonitoringMode === 'stream') return true;

// If we are in auto mode, we need to figure out if we're in a FaaS
// environment or not and choose the appropriate mode.
if (monitor.isRunningInFaasEnv) return false;
return true;
}

function checkServer(monitor: Monitor, callback: Callback<Document | null>) {
let start = now();
const topologyVersion = monitor[kServer].description.topologyVersion;
const isAwaitable = topologyVersion != null;
console.log('checkServer', topologyVersion);
const isAwaitable = useStreamingProtocol(monitor, topologyVersion);
monitor.emit(
Server.SERVER_HEARTBEAT_STARTED,
new ServerHeartbeatStartedEvent(monitor.address, isAwaitable)
Expand Down Expand Up @@ -286,7 +319,8 @@ function checkServer(monitor: Monitor, callback: Callback<Document | null>) {
const duration =
isAwaitable && rttPinger ? rttPinger.roundTripTime : calculateDurationInMs(start);

const awaited = isAwaitable && hello.topologyVersion != null;
console.log('command', topologyVersion, hello.topologyVersion, hello);
const awaited = useStreamingProtocol(monitor, hello.topologyVersion);
monitor.emit(
Server.SERVER_HEARTBEAT_SUCCEEDED,
new ServerHeartbeatSucceededEvent(monitor.address, duration, hello, awaited)
Expand Down Expand Up @@ -370,7 +404,8 @@ function monitorServer(monitor: Monitor) {
}

// if the check indicates streaming is supported, immediately reschedule monitoring
if (hello && hello.topologyVersion) {
console.log('checkServerCallback', hello?.topologyVersion);
if (useStreamingProtocol(monitor, hello?.topologyVersion)) {
setTimeout(() => {
if (!isInCloseState(monitor)) {
monitor[kMonitorId]?.wake();
Expand Down
2 changes: 2 additions & 0 deletions src/sdam/topology.ts
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ import {
TopologyDescriptionChangedEvent,
TopologyOpeningEvent
} from './events';
import type { ServerMonitoringMode } from './monitor';
import { Server, type ServerEvents, type ServerOptions } from './server';
import { compareTopologyVersion, ServerDescription } from './server_description';
import { readPreferenceServerSelector, type ServerSelector } from './server_selection';
Expand Down Expand Up @@ -143,6 +144,7 @@ export interface TopologyOptions extends BSONSerializeOptions, ServerOptions {
directConnection: boolean;
loadBalanced: boolean;
metadata: ClientMetadata;
serverMonitoringMode: ServerMonitoringMode;
/** MongoDB server API version */
serverApi?: ServerApi;
[featureFlag: symbol]: any;
Expand Down
2 changes: 1 addition & 1 deletion test/integration/change-streams/change_stream.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1823,7 +1823,7 @@ describe('Change Streams', function () {
});
});

describe('ChangeStream resumability', function () {
describe.only('ChangeStream resumability', function () {
let client: MongoClient;
let collection: Collection;
let changeStream: ChangeStream;
Expand Down
5 changes: 5 additions & 0 deletions test/lambda/mongodb/app.mjs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import * as assert from 'node:assert/strict';

import { MongoClient } from 'mongodb';

// Creates the client that is cached for all requests, subscribes to
Expand Down Expand Up @@ -30,18 +32,21 @@ mongoClient.on('commandFailed', (event) => {

mongoClient.on('serverHeartbeatStarted', (event) => {
console.log('serverHeartbeatStarted', event);
assert.strictEqual(event.awaited, false);
});

mongoClient.on('serverHeartbeatSucceeded', (event) => {
heartbeatCount++;
totalHeartbeatDuration += event.duration;
console.log('serverHeartbeatSucceeded', event);
assert.strictEqual(event.awaited, false);
});

mongoClient.on('serverHeartbeatFailed', (event) => {
heartbeatCount++;
totalHeartbeatDuration += event.duration;
console.log('serverHeartbeatFailed', event);
assert.strictEqual(event.awaited, false);
});

mongoClient.on('connectionCreated', (event) => {
Expand Down
Loading

0 comments on commit 5b730ca

Please sign in to comment.