Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Prevent multiple simultaneous polling loops #208

Merged
merged 2 commits into from
Feb 6, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 71 additions & 11 deletions src/PollingBlockTracker.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -454,7 +454,7 @@
async ({ blockTracker }) => {
jest.spyOn(console, 'error').mockImplementation(EMPTY_FUNCTION);

blockTracker.getLatestBlock();

Check warning on line 457 in src/PollingBlockTracker.test.ts

View workflow job for this annotation

GitHub Actions / Build, Lint, and Test (16.x)

Promises must be awaited, end with a call to .catch, end with a call to .then with a rejection handler or be explicitly marked as ignored with the `void` operator

Check warning on line 457 in src/PollingBlockTracker.test.ts

View workflow job for this annotation

GitHub Actions / Build, Lint, and Test (18.x)

Promises must be awaited, end with a call to .catch, end with a call to .then with a rejection handler or be explicitly marked as ignored with the `void` operator
await new Promise((resolve) => {
blockTracker.on('_waitingForNextIteration', resolve);
});
Expand Down Expand Up @@ -489,7 +489,7 @@
async ({ blockTracker }) => {
jest.spyOn(console, 'error').mockImplementation(EMPTY_FUNCTION);

blockTracker.getLatestBlock();

Check warning on line 492 in src/PollingBlockTracker.test.ts

View workflow job for this annotation

GitHub Actions / Build, Lint, and Test (16.x)

Promises must be awaited, end with a call to .catch, end with a call to .then with a rejection handler or be explicitly marked as ignored with the `void` operator

Check warning on line 492 in src/PollingBlockTracker.test.ts

View workflow job for this annotation

GitHub Actions / Build, Lint, and Test (18.x)

Promises must be awaited, end with a call to .catch, end with a call to .then with a rejection handler or be explicitly marked as ignored with the `void` operator
await new Promise((resolve) => {
blockTracker.on('_waitingForNextIteration', resolve);
});
Expand Down Expand Up @@ -524,7 +524,7 @@
async ({ blockTracker }) => {
jest.spyOn(console, 'error').mockImplementation(EMPTY_FUNCTION);

blockTracker.getLatestBlock();

Check warning on line 527 in src/PollingBlockTracker.test.ts

View workflow job for this annotation

GitHub Actions / Build, Lint, and Test (16.x)

Promises must be awaited, end with a call to .catch, end with a call to .then with a rejection handler or be explicitly marked as ignored with the `void` operator

Check warning on line 527 in src/PollingBlockTracker.test.ts

View workflow job for this annotation

GitHub Actions / Build, Lint, and Test (18.x)

Promises must be awaited, end with a call to .catch, end with a call to .then with a rejection handler or be explicitly marked as ignored with the `void` operator
await new Promise((resolve) => {
blockTracker.on('_waitingForNextIteration', resolve);
});
Expand Down Expand Up @@ -557,7 +557,7 @@
async ({ blockTracker }) => {
jest.spyOn(console, 'error').mockImplementation(EMPTY_FUNCTION);

blockTracker.getLatestBlock();

Check warning on line 560 in src/PollingBlockTracker.test.ts

View workflow job for this annotation

GitHub Actions / Build, Lint, and Test (16.x)

Promises must be awaited, end with a call to .catch, end with a call to .then with a rejection handler or be explicitly marked as ignored with the `void` operator

Check warning on line 560 in src/PollingBlockTracker.test.ts

View workflow job for this annotation

GitHub Actions / Build, Lint, and Test (18.x)

Promises must be awaited, end with a call to .catch, end with a call to .then with a rejection handler or be explicitly marked as ignored with the `void` operator
await new Promise((resolve) => {
blockTracker.on('_waitingForNextIteration', resolve);
});
Expand Down Expand Up @@ -660,7 +660,7 @@
},
},
async ({ blockTracker }) => {
blockTracker.checkForLatestBlock();

Check warning on line 663 in src/PollingBlockTracker.test.ts

View workflow job for this annotation

GitHub Actions / Build, Lint, and Test (16.x)

Promises must be awaited, end with a call to .catch, end with a call to .then with a rejection handler or be explicitly marked as ignored with the `void` operator

Check warning on line 663 in src/PollingBlockTracker.test.ts

View workflow job for this annotation

GitHub Actions / Build, Lint, and Test (18.x)

Promises must be awaited, end with a call to .catch, end with a call to .then with a rejection handler or be explicitly marked as ignored with the `void` operator
await new Promise((resolve) => {
blockTracker.on('latest', resolve);
});
Expand Down Expand Up @@ -2709,6 +2709,66 @@
},
);
});

it('should cancel polling timeout and prevent multiple synchronize loops', async () => {
const setTimeoutRecorder = recordCallsToSetTimeout();

const blockTrackerOptions = {
pollingInterval: 100,
blockResetDuration: 200,
};

await withPollingBlockTracker(
{
provider: {
stubs: [
{
methodName: 'eth_blockNumber',
response: {
result: '0x0',
},
},
{
methodName: 'eth_blockNumber',
response: {
result: '0x1',
},
},
{
methodName: 'eth_blockNumber',
response: {
result: '0x2',
},
},
],
},
blockTracker: blockTrackerOptions,
},
async ({ blockTracker }) => {
const listener = EMPTY_FUNCTION;

for (let i = 0; i < 3; i++) {
blockTracker.on('latest', listener);

expect(blockTracker.isRunning()).toBe(true);

await new Promise((resolve) => {
blockTracker.on('_waitingForNextIteration', resolve);
});

blockTracker[methodToRemoveListener]('latest', listener);

expect(blockTracker.isRunning()).toBe(false);
}

expect(
setTimeoutRecorder.findCallsMatchingDuration(
blockTrackerOptions.pollingInterval,
),
).toHaveLength(0);
},
);
});
});

describe('"sync"', () => {
Expand Down Expand Up @@ -2851,19 +2911,19 @@
blockTracker: blockTrackerOptions,
},
async ({ blockTracker }) => {
blockTracker.once('latest', EMPTY_FUNCTION);
const { promise, resolve: listener } = buildDeferred();

await new Promise((resolve) => {
blockTracker.on('_waitingForNextIteration', resolve);
});
blockTracker.once('latest', listener);

const nextIterationTimeout = setTimeoutRecorder.calls.find(
(call) => {
return call.duration === blockTrackerOptions.pollingInterval;
},
);
expect(nextIterationTimeout).toBeDefined();
expect(nextIterationTimeout?.timeout.hasRef()).toBe(false);
await promise;

// Once the listener has fired the block tracker should stop,
// meaning there should be no timeouts.
expect(
setTimeoutRecorder.findCallsMatchingDuration(
blockTrackerOptions.pollingInterval,
),
).toHaveLength(0);
},
);
});
Expand Down
110 changes: 57 additions & 53 deletions src/PollingBlockTracker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ export class PollingBlockTracker

private _blockResetTimeout?: ReturnType<typeof setTimeout>;

private _pollingTimeout?: ReturnType<typeof setTimeout>;

private readonly _provider: SafeEventEmitterProvider;

private readonly _pollingInterval: number;
Expand Down Expand Up @@ -87,7 +89,7 @@ export class PollingBlockTracker

async destroy() {
matthewwalsh0 marked this conversation as resolved.
Show resolved Hide resolved
this._cancelBlockResetTimeout();
await this._maybeEnd();
this._maybeEnd();
super.removeAllListeners();
}

Expand Down Expand Up @@ -154,24 +156,26 @@ export class PollingBlockTracker
this._maybeEnd();
}

private async _maybeStart(): Promise<void> {
private _maybeStart() {
if (this._isRunning) {
return;
}

this._isRunning = true;
// cancel setting latest block to stale
this._cancelBlockResetTimeout();
await this._start();
this._start();
matthewwalsh0 marked this conversation as resolved.
Show resolved Hide resolved
this.emit('_started');
}

private async _maybeEnd(): Promise<void> {
private _maybeEnd() {
if (!this._isRunning) {
return;
}

this._isRunning = false;
this._setupBlockResetTimeout();
await this._end();
this._end();
this.emit('_ended');
}

Expand Down Expand Up @@ -240,40 +244,12 @@ export class PollingBlockTracker
return await this.getLatestBlock();
}

private async _start(): Promise<void> {
this._synchronize();
private _start() {
this._updateAndQueue();
}

private async _end(): Promise<void> {
// No-op
}

private async _synchronize(): Promise<void> {
while (this._isRunning) {
try {
await this._updateLatestBlock();
const promise = timeout(
this._pollingInterval,
!this._keepEventLoopActive,
);
this.emit('_waitingForNextIteration');
await promise;
} catch (err: any) {
const newErr = new Error(
`PollingBlockTracker - encountered an error while attempting to update latest block:\n${
err.stack ?? err
}`,
);
try {
this.emit('error', newErr);
} catch (emitErr) {
console.error(newErr);
}
const promise = timeout(this._retryTimeout, !this._keepEventLoopActive);
this.emit('_waitingForNextIteration');
await promise;
}
}
private _end() {
this._clearPollingTimeout();
}

private async _updateLatestBlock(): Promise<void> {
Expand Down Expand Up @@ -303,25 +279,53 @@ export class PollingBlockTracker
}
return res.result;
}
}

/**
* Waits for the specified amount of time.
*
* @param duration - The amount of time in milliseconds.
* @param unref - Assuming this function is run in a Node context, governs
* whether Node should wait before the `setTimeout` has completed before ending
* the process (true for no, false for yes). Defaults to false.
* @returns A promise that can be used to wait.
*/
async function timeout(duration: number, unref: boolean) {
return new Promise((resolve) => {
const timeoutRef = setTimeout(resolve, duration);
// don't keep process open
if (timeoutRef.unref && unref) {
private async _updateAndQueue() {
matthewwalsh0 marked this conversation as resolved.
Show resolved Hide resolved
let interval = this._pollingInterval;

try {
await this._updateLatestBlock();
} catch (err: any) {
const newErr = new Error(
`PollingBlockTracker - encountered an error while attempting to update latest block:\n${
err.stack ?? err
}`,
);

try {
this.emit('error', newErr);
} catch (emitErr) {
matthewwalsh0 marked this conversation as resolved.
Show resolved Hide resolved
console.error(newErr);
}

interval = this._retryTimeout;
}

if (!this._isRunning) {
matthewwalsh0 marked this conversation as resolved.
Show resolved Hide resolved
return;
}

this._clearPollingTimeout();

const timeoutRef = setTimeout(() => {
this._updateAndQueue();
}, interval);

if (timeoutRef.unref && !this._keepEventLoopActive) {
timeoutRef.unref();
}
});

this._pollingTimeout = timeoutRef;

this.emit('_waitingForNextIteration');
}

_clearPollingTimeout() {
if (this._pollingTimeout) {
clearTimeout(this._pollingTimeout);
this._pollingTimeout = undefined;
}
}
}

/**
Expand Down
4 changes: 4 additions & 0 deletions tests/recordCallsToSetTimeout.ts
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,10 @@ class SetTimeoutRecorder {
});
}

findCallsMatchingDuration(duration: number): SetTimeoutCall[] {
return this.calls.filter((call) => call.duration === duration);
}

/**
* Registers a callback that will be called when `setTimeout` is called and
* the expected number of `setTimeout` calls (as specified via
Expand Down
Loading