Skip to content

Commit

Permalink
test(NODE-5197): move started event
Browse files Browse the repository at this point in the history
  • Loading branch information
durran committed Nov 6, 2023
1 parent bd8f0fa commit d53c18d
Showing 1 changed file with 16 additions and 20 deletions.
36 changes: 16 additions & 20 deletions src/sdam/monitor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,6 @@ export class Monitor extends TypedEventEmitter<MonitorEvents> {
minHeartbeatFrequencyMS: options.minHeartbeatFrequencyMS ?? 500,
serverMonitoringMode: options.serverMonitoringMode
});
console.log(getFAASEnv());
this.isRunningInFaasEnv = getFAASEnv() != null;

const cancellationToken = this[kCancellationToken];
Expand Down Expand Up @@ -241,15 +240,8 @@ function useStreamingProtocol(monitor: Monitor, topologyVersion: TopologyVersion

function checkServer(monitor: Monitor, callback: Callback<Document | null>) {
let start = now();
const topologyVersion = monitor[kServer].description.topologyVersion;
console.log('checkServer', topologyVersion);
const isAwaitable = useStreamingProtocol(monitor, topologyVersion);
monitor.emit(
Server.SERVER_HEARTBEAT_STARTED,
new ServerHeartbeatStartedEvent(monitor.address, isAwaitable)
);

function failureHandler(err: Error) {

function failureHandler(err: Error, isAwaitable: boolean) {
monitor[kConnection]?.destroy({ force: true });
monitor[kConnection] = undefined;

Expand Down Expand Up @@ -281,6 +273,13 @@ function checkServer(monitor: Monitor, callback: Callback<Document | null>) {
const connectTimeoutMS = monitor.options.connectTimeoutMS;
const maxAwaitTimeMS = monitor.options.heartbeatFrequencyMS;

const topologyVersion = monitor[kServer].description.topologyVersion;
const isAwaitable = useStreamingProtocol(monitor, topologyVersion);
monitor.emit(
Server.SERVER_HEARTBEAT_STARTED,
new ServerHeartbeatStartedEvent(monitor.address, isAwaitable)
);

const cmd = {
[serverApi?.version || helloOk ? 'hello' : LEGACY_HELLO_COMMAND]: 1,
...(isAwaitable && topologyVersion
Expand All @@ -307,7 +306,7 @@ function checkServer(monitor: Monitor, callback: Callback<Document | null>) {

connection.command(ns('admin.$cmd'), cmd, options, (err, hello) => {
if (err) {
return failureHandler(err);
return failureHandler(err, isAwaitable);
}

if (!('isWritablePrimary' in hello)) {
Expand All @@ -319,16 +318,14 @@ function checkServer(monitor: Monitor, callback: Callback<Document | null>) {
const duration =
isAwaitable && rttPinger ? rttPinger.roundTripTime : calculateDurationInMs(start);

console.log('command', topologyVersion, hello.topologyVersion, hello);
const awaited = useStreamingProtocol(monitor, hello.topologyVersion);
// I think it's this.
//const awaited = useStreamingProtocol(monitor, hello.topologyVersion);
monitor.emit(
Server.SERVER_HEARTBEAT_SUCCEEDED,
new ServerHeartbeatSucceededEvent(monitor.address, duration, hello, awaited)
new ServerHeartbeatSucceededEvent(monitor.address, duration, hello, isAwaitable)
);

// if we are using the streaming protocol then we immediately issue another `started`
// event, otherwise the "check" is complete and return to the main monitor loop
if (awaited) {
if (isAwaitable) {
monitor.emit(
Server.SERVER_HEARTBEAT_STARTED,
new ServerHeartbeatStartedEvent(monitor.address, true)
Expand All @@ -350,7 +347,7 @@ function checkServer(monitor: Monitor, callback: Callback<Document | null>) {
if (err) {
monitor[kConnection] = undefined;

failureHandler(err);
failureHandler(err, false);
return;
}

Expand All @@ -371,7 +368,7 @@ function checkServer(monitor: Monitor, callback: Callback<Document | null>) {
monitor.address,
calculateDurationInMs(start),
conn.hello,
false
useStreamingProtocol(monitor, conn.hello?.topologyVersion)
)
);

Expand Down Expand Up @@ -404,7 +401,6 @@ function monitorServer(monitor: Monitor) {
}

// if the check indicates streaming is supported, immediately reschedule monitoring
console.log('checkServerCallback', hello?.topologyVersion);
if (useStreamingProtocol(monitor, hello?.topologyVersion)) {
setTimeout(() => {
if (!isInCloseState(monitor)) {
Expand Down

0 comments on commit d53c18d

Please sign in to comment.