Skip to content

Commit

Permalink
feat(util-waiter): add createWaiter() (#1759)
Browse files Browse the repository at this point in the history
* feat(util-waiter): add create waiter

Refactor the original waiter util:
1. expose a createWaiter() only to clients in order to reduce the
duplicated code-gen
2. fix infinite loop in job poller
3. fix potential number overflow

* fix: address feedbacks

* feat(util-waiter): merge waiter options with client

* fix(util-waiter): use timestamp to determine maxWaittime rather than delay sum

* fix(util-waiter): remove maxWaitTime promise

The racing maxWaitTime will set a timeout that would eventually
hang the user's process. So remove it. Instead, we break the poller
promise if the totol wait time get close to the maxWaitTime config.
  • Loading branch information
AllanZhengYP authored Dec 14, 2020
1 parent 6f73e9b commit 3d6eb2d
Show file tree
Hide file tree
Showing 13 changed files with 253 additions and 111 deletions.
1 change: 1 addition & 0 deletions packages/util-waiter/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
"description": "Shared utilities for client waiters for the AWS SDK",
"dependencies": {
"@aws-sdk/abort-controller": "1.0.0-rc.8",
"@aws-sdk/types": "1.0.0-rc.8",
"tslib": "^1.8.0"
},
"devDependencies": {
Expand Down
85 changes: 85 additions & 0 deletions packages/util-waiter/src/createWaiter.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
import { AbortController } from "@aws-sdk/abort-controller";

import { ResolvedWaiterOptions, WaiterState } from "./waiter";

const mockValidate = jest.fn();
jest.mock("./utils/validate", () => ({
validateWaiterOptions: mockValidate,
}));

jest.useFakeTimers();

import { createWaiter } from "./createWaiter";

describe("createWaiter", () => {
beforeEach(() => {
jest.clearAllTimers();
jest.clearAllMocks();
});

const minimalWaiterConfig = {
minDelay: 2,
maxDelay: 120,
maxWaitTime: 9999,
client: "client",
} as ResolvedWaiterOptions<any>;
const input = "input";

const abortedState = {
state: WaiterState.ABORTED,
};
const failureState = {
state: WaiterState.FAILURE,
};
const retryState = {
state: WaiterState.RETRY,
};
const successState = {
state: WaiterState.SUCCESS,
};

it("should abort when abortController is signalled", async () => {
const abortController = new AbortController();
const mockAcceptorChecks = jest.fn().mockResolvedValue(retryState);
const statusPromise = createWaiter(
{
...minimalWaiterConfig,
maxWaitTime: 20,
abortController,
},
input,
mockAcceptorChecks
);
jest.advanceTimersByTime(10 * 1000);
abortController.abort(); // Abort before maxWaitTime(20s);
expect(await statusPromise).toEqual(abortedState);
});

it("should success when acceptor checker returns seccess", async () => {
const mockAcceptorChecks = jest.fn().mockResolvedValue(successState);
const statusPromise = createWaiter(
{
...minimalWaiterConfig,
maxWaitTime: 20,
},
input,
mockAcceptorChecks
);
jest.advanceTimersByTime(minimalWaiterConfig.minDelay * 1000);
expect(await statusPromise).toEqual(successState);
});

it("should fail when acceptor checker returns failure", async () => {
const mockAcceptorChecks = jest.fn().mockResolvedValue(failureState);
const statusPromise = createWaiter(
{
...minimalWaiterConfig,
maxWaitTime: 20,
},
input,
mockAcceptorChecks
);
jest.advanceTimersByTime(minimalWaiterConfig.minDelay * 1000);
expect(await statusPromise).toEqual(failureState);
});
});
43 changes: 43 additions & 0 deletions packages/util-waiter/src/createWaiter.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
import { AbortSignal } from "@aws-sdk/types";

import { runPolling } from "./poller";
import { sleep, validateWaiterOptions } from "./utils";
import { SmithyClient, WaiterOptions, WaiterResult, waiterServiceDefaults, WaiterState } from "./waiter";

const waiterTimeout = async (seconds: number): Promise<WaiterResult> => {
await sleep(seconds);
return { state: WaiterState.TIMEOUT };
};

const abortTimeout = async (abortSignal: AbortSignal): Promise<WaiterResult> => {
return new Promise((resolve) => {
abortSignal.onabort = () => resolve({ state: WaiterState.ABORTED });
});
};

/**
* Create a waiter promise that only resolves when:
* 1. Abort controller is signaled
* 2. Max wait time is reached
* 3. `acceptorChecks` succeeds, or fails
* Otherwise, it invokes `acceptorChecks` with exponential-backoff delay.
*
* @internal
*/
export const createWaiter = async <Client extends SmithyClient, Input>(
options: WaiterOptions<Client>,
input: Input,
acceptorChecks: (client: Client, input: Input) => Promise<WaiterResult>
): Promise<WaiterResult> => {
const params = {
...waiterServiceDefaults,
...options,
};
validateWaiterOptions(params);

const exitConditions = [runPolling<Client, Input>(params, input, acceptorChecks)];
if (options.abortController) {
exitConditions.push(abortTimeout(options.abortController.signal));
}
return Promise.race(exitConditions);
};
6 changes: 1 addition & 5 deletions packages/util-waiter/src/index.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,6 @@ import * as exported from "./index";

describe("Waiter util module exports", () => {
it("should export the proper functions", () => {
expect(exported.sleep).toBeDefined();
expect(exported.waiterTimeout).toBeDefined();
expect(exported.abortTimeout).toBeDefined();
expect(exported.validateWaiterOptions).toBeDefined();
expect(exported.runPolling).toBeDefined();
expect(exported.createWaiter).toBeDefined();
});
});
4 changes: 1 addition & 3 deletions packages/util-waiter/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,2 @@
export * from "./utils/validate";
export * from "./utils/sleep";
export * from "./poller";
export * from "./createWaiter";
export * from "./waiter";
82 changes: 58 additions & 24 deletions packages/util-waiter/src/poller.spec.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import { AbortController } from "@aws-sdk/abort-controller";

import { runPolling } from "./poller";
import { sleep } from "./utils/sleep";
import { WaiterState } from "./waiter";
import { ResolvedWaiterOptions, WaiterState } from "./waiter";

jest.mock("./utils/sleep");

Expand All @@ -9,9 +11,12 @@ describe(runPolling.name, () => {
minDelay: 2,
maxDelay: 30,
maxWaitTime: 99999,
};
const client = "mockClient";
client: "mockClient",
} as ResolvedWaiterOptions<any>;
const input = "mockInput";
const abortedState = {
state: WaiterState.ABORTED,
};
const failureState = {
state: WaiterState.FAILURE,
};
Expand All @@ -21,6 +26,9 @@ describe(runPolling.name, () => {
const retryState = {
state: WaiterState.RETRY,
};
const timeoutState = {
state: WaiterState.TIMEOUT,
};

let mockAcceptorChecks;

Expand All @@ -36,30 +44,20 @@ describe(runPolling.name, () => {

it("should returns state in case of failure", async () => {
mockAcceptorChecks = jest.fn().mockResolvedValueOnce(failureState);
await expect(runPolling<string, string>(config, client, input, mockAcceptorChecks)).resolves.toStrictEqual(
failureState
);
await expect(runPolling(config, input, mockAcceptorChecks)).resolves.toStrictEqual(failureState);

expect(mockAcceptorChecks).toHaveBeenCalled();
expect(mockAcceptorChecks).toHaveBeenCalledTimes(1);
expect(mockAcceptorChecks).toHaveBeenCalledWith(client, input);
expect(mockAcceptorChecks).toHaveBeenCalledWith(config.client, input);

expect(sleep).toHaveBeenCalled();
expect(sleep).toHaveBeenCalledTimes(1);
expect(sleep).toHaveBeenCalledWith(config.minDelay);
});

it("returns state in case of success", async () => {
const config = {
minDelay: 2,
maxDelay: 30,
maxWaitTime: 99999,
};

mockAcceptorChecks = jest.fn().mockResolvedValueOnce(successState);
await expect(runPolling<string, string>(config, client, input, mockAcceptorChecks)).resolves.toStrictEqual(
successState
);
await expect(runPolling(config, input, mockAcceptorChecks)).resolves.toStrictEqual(successState);
expect(sleep).toHaveBeenCalled();
expect(sleep).toHaveBeenCalledTimes(1);
expect(sleep).toHaveBeenCalledWith(config.minDelay);
Expand All @@ -76,18 +74,54 @@ describe(runPolling.name, () => {
.mockResolvedValueOnce(retryState)
.mockResolvedValueOnce(successState);

await expect(runPolling<string, string>(config, client, input, mockAcceptorChecks)).resolves.toStrictEqual(
successState
);
await expect(runPolling(config, input, mockAcceptorChecks)).resolves.toStrictEqual(successState);

expect(sleep).toHaveBeenCalled();
expect(sleep).toHaveBeenCalledTimes(7);
expect(sleep).toHaveBeenNthCalledWith(1, 2); // min delay
expect(sleep).toHaveBeenNthCalledWith(2, 3); // +random() * 2
expect(sleep).toHaveBeenNthCalledWith(3, 5); // +random() * 4
expect(sleep).toHaveBeenNthCalledWith(4, 9); // +random() * 8
expect(sleep).toHaveBeenNthCalledWith(5, 17); // +random() * 16
expect(sleep).toHaveBeenNthCalledWith(1, 2); // min delay. random(2, 2)
expect(sleep).toHaveBeenNthCalledWith(2, 3); // random(2, 4)
expect(sleep).toHaveBeenNthCalledWith(3, 5); // +random(2, 8)
expect(sleep).toHaveBeenNthCalledWith(4, 9); // +random(2, 16)
expect(sleep).toHaveBeenNthCalledWith(5, 30); // max delay
expect(sleep).toHaveBeenNthCalledWith(6, 30); // max delay
expect(sleep).toHaveBeenNthCalledWith(7, 30); // max delay
});

it("resolves after the last attempt before reaching maxWaitTime ", async () => {
let now = Date.now();
const delay = 2;
const nowMock = jest
.spyOn(Date, "now")
.mockReturnValueOnce(now) // 1st invoke for getting the time stamp to wait until
.mockImplementation(() => {
const rtn = now;
now += delay * 1000;
return rtn;
});
const localConfig = {
...config,
minDelay: delay,
maxDelay: delay,
maxWaitTime: 5,
};

mockAcceptorChecks = jest.fn().mockResolvedValue(retryState);
await expect(runPolling(localConfig, input, mockAcceptorChecks)).resolves.toStrictEqual(timeoutState);
expect(sleep).toHaveBeenCalled();
expect(sleep).toHaveBeenCalledTimes(2);
nowMock.mockReset();
});

it("resolves when abortController is signalled", async () => {
const abortController = new AbortController();
const localConfig = {
...config,
abortController,
};

mockAcceptorChecks = jest.fn().mockResolvedValue(retryState);
abortController.abort();
await expect(runPolling(localConfig, input, mockAcceptorChecks)).resolves.toStrictEqual(abortedState);
expect(sleep).not.toHaveBeenCalled();
});
});
38 changes: 26 additions & 12 deletions packages/util-waiter/src/poller.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
import { sleep } from "./utils/sleep";
import { WaiterOptions, WaiterResult, WaiterState } from "./waiter";
import { ResolvedWaiterOptions, SmithyClient, WaiterResult, WaiterState } from "./waiter";

/**
* Reference: https://github.com/awslabs/smithy/pull/656
* The theoretical limit to the attempt is max delay cannot be > Number.MAX_VALUE, but it's unlikely because of
* `maxWaitTime`
* Reference: https://awslabs.github.io/smithy/1.0/spec/waiters.html#waiter-retries
*/
const exponentialBackoffWithJitter = (floor: number, ciel: number, attempt: number) =>
Math.floor(Math.min(ciel, randomInRange(floor, floor * 2 ** (attempt - 1))));
const exponentialBackoffWithJitter = (minDelay: number, maxDelay: number, attemptCeiling: number, attempt: number) => {
if (attempt > attemptCeiling) return maxDelay;
const delay = minDelay * 2 ** (attempt - 1);
return randomInRange(minDelay, delay);
};

const randomInRange = (min: number, max: number) => min + Math.random() * (max - min);

/**
Expand All @@ -17,20 +19,32 @@ const randomInRange = (min: number, max: number) => min + Math.random() * (max -
* @param input client input
* @param stateChecker function that checks the acceptor states on each poll.
*/
export const runPolling = async <T, S>(
{ minDelay, maxDelay }: WaiterOptions,
client: T,
export const runPolling = async <T extends SmithyClient, S>(
{ minDelay, maxDelay, maxWaitTime, abortController, client }: ResolvedWaiterOptions<T>,
input: S,
acceptorChecks: (client: T, input: S) => Promise<WaiterResult>
): Promise<WaiterResult> => {
let currentAttempt = 1;

const waitUntil = Date.now() + maxWaitTime * 1000;
// The max attempt number that the derived delay time tend to increase.
// Pre-compute this number to avoid Number type overflow.
const attemptCeiling = Math.log(maxDelay / minDelay) / Math.log(2) + 1;
while (true) {
await sleep(exponentialBackoffWithJitter(minDelay, maxDelay, currentAttempt));
if (abortController?.signal?.aborted) {
return { state: WaiterState.ABORTED };
}
const delay = exponentialBackoffWithJitter(minDelay, maxDelay, attemptCeiling, currentAttempt);
// Resolve the promise explicitly at timeout or aborted. Otherwise this while loop will keep making API call until
// `acceptorCheck` returns non-retry status, even with the Promise.race() outside.
if (Date.now() + delay * 1000 > waitUntil) {
return { state: WaiterState.TIMEOUT };
}
await sleep(delay);
const { state } = await acceptorChecks(client, input);
if (state === WaiterState.SUCCESS || state === WaiterState.FAILURE) {
if (state !== WaiterState.RETRY) {
return { state };
}

currentAttempt += 1;
}
};
2 changes: 2 additions & 0 deletions packages/util-waiter/src/utils/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
export * from "./sleep";
export * from "./validate";
Loading

0 comments on commit 3d6eb2d

Please sign in to comment.