Skip to content

Commit

Permalink
feat(idempotency): implement IdempotencyHandler (#1416)
Browse files Browse the repository at this point in the history
  • Loading branch information
Alexander Schueren authored Apr 25, 2023
1 parent 5d680a0 commit f2ebf08
Show file tree
Hide file tree
Showing 5 changed files with 265 additions and 45 deletions.
79 changes: 61 additions & 18 deletions packages/idempotency/src/IdempotencyHandler.ts
Original file line number Diff line number Diff line change
@@ -1,41 +1,84 @@
import type { AnyFunctionWithRecord, IdempotencyOptions } from './types';
import { IdempotencyRecordStatus } from './types';
import type {
AnyFunctionWithRecord,
IdempotencyOptions,
} from './types';
import {
IdempotencyAlreadyInProgressError,
IdempotencyInconsistentStateError,
IdempotencyItemAlreadyExistsError,
IdempotencyAlreadyInProgressError,
IdempotencyPersistenceLayerError
IdempotencyPersistenceLayerError,
} from './Exceptions';
import { IdempotencyRecord } from './persistence/IdempotencyRecord';
import { IdempotencyRecord } from './persistence';

export class IdempotencyHandler<U> {
public constructor(
private functionToMakeIdempotent: AnyFunctionWithRecord<U>,
private functionPayloadToBeHashed: Record<string, unknown>,
private functionPayloadToBeHashed: Record<string, unknown>,
private idempotencyOptions: IdempotencyOptions,
private fullFunctionPayload: Record<string, unknown>
) {}
private fullFunctionPayload: Record<string, unknown>,
) {
}

public determineResultFromIdempotencyRecord(idempotencyRecord: IdempotencyRecord): Promise<U> | U {
public determineResultFromIdempotencyRecord(
idempotencyRecord: IdempotencyRecord
): Promise<U> | U {
if (idempotencyRecord.getStatus() === IdempotencyRecordStatus.EXPIRED) {
throw new IdempotencyInconsistentStateError('Item has expired during processing and may not longer be valid.');
} else if (idempotencyRecord.getStatus() === IdempotencyRecordStatus.INPROGRESS){
throw new IdempotencyAlreadyInProgressError(`There is already an execution in progress with idempotency key: ${idempotencyRecord.idempotencyKey}`);
throw new IdempotencyInconsistentStateError(
'Item has expired during processing and may not longer be valid.'
);
} else if (
idempotencyRecord.getStatus() === IdempotencyRecordStatus.INPROGRESS
) {
if (
idempotencyRecord.inProgressExpiryTimestamp &&
idempotencyRecord.inProgressExpiryTimestamp <
new Date().getUTCMilliseconds()
) {
throw new IdempotencyInconsistentStateError(
'Item is in progress but the in progress expiry timestamp has expired.'
);
} else {
throw new IdempotencyAlreadyInProgressError(
`There is already an execution in progress with idempotency key: ${idempotencyRecord.idempotencyKey}`
);
}
} else {
// Currently recalling the method as this fulfills FR1. FR3 will address using the previously stored value https://github.com/awslabs/aws-lambda-powertools-typescript/issues/447
return this.functionToMakeIdempotent(this.fullFunctionPayload);
return this.functionToMakeIdempotent(this.fullFunctionPayload);
}
}

/**
* Main entry point for the handler
* IdempotencyInconsistentStateError can happen under rare but expected cases
* when persistent state changes in the small time between put & get requests.
* In most cases we can retry successfully on this exception.
*/
public async handle(): Promise<U> {

const MAX_RETRIES = 2;
for (let i = 1; i <= MAX_RETRIES; i++) {
try {
return await this.processIdempotency();
} catch (e) {
if (!(e instanceof IdempotencyAlreadyInProgressError) || i === MAX_RETRIES) {
throw e;
}
}
}
/* istanbul ignore next */
throw new Error('This should never happen');
}

public async processIdempotency(): Promise<U> {
try {
await this.idempotencyOptions.persistenceStore.saveInProgress(this.functionPayloadToBeHashed);
await this.idempotencyOptions.persistenceStore.saveInProgress(
this.functionPayloadToBeHashed,
);
} catch (e) {
if (e instanceof IdempotencyItemAlreadyExistsError) {
const idempotencyRecord: IdempotencyRecord = await this.idempotencyOptions.persistenceStore.getRecord(this.functionPayloadToBeHashed);
const idempotencyRecord: IdempotencyRecord =
await this.idempotencyOptions.persistenceStore.getRecord(
this.functionPayloadToBeHashed
);

return this.determineResultFromIdempotencyRecord(idempotencyRecord);
} else {
Expand All @@ -45,4 +88,4 @@ export class IdempotencyHandler<U> {

return this.functionToMakeIdempotent(this.fullFunctionPayload);
}
}
}
9 changes: 4 additions & 5 deletions packages/idempotency/src/idempotentDecorator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,11 @@ const idempotent = function (options: IdempotencyOptions) {
descriptor.value = function(record: GenericTempRecord){
const idempotencyHandler = new IdempotencyHandler<GenericTempRecord>(childFunction, record[options.dataKeywordArgument], options, record);

return idempotencyHandler.processIdempotency();
return idempotencyHandler.handle();
};

return descriptor;
};
};
};

export { idempotent };

2 changes: 1 addition & 1 deletion packages/idempotency/src/makeFunctionIdempotent.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ const makeFunctionIdempotent = function <U>(
const wrappedFn: AnyIdempotentFunction<U> = function (record: GenericTempRecord): Promise<U> {
const idempotencyHandler: IdempotencyHandler<U> = new IdempotencyHandler<U>(fn, record[options.dataKeywordArgument], options, record);

return idempotencyHandler.processIdempotency();
return idempotencyHandler.handle();
};

return wrappedFn;
Expand Down
178 changes: 178 additions & 0 deletions packages/idempotency/tests/unit/IdempotencyHandler.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,178 @@
/**
* Test Idempotency Handler
*
* @group unit/idempotency/IdempotencyHandler
*/

import {
IdempotencyAlreadyInProgressError, IdempotencyInconsistentStateError,
IdempotencyItemAlreadyExistsError,
IdempotencyPersistenceLayerError
} from '../../src/Exceptions';
import { IdempotencyOptions, IdempotencyRecordStatus } from '../../src/types';
import { BasePersistenceLayer, IdempotencyRecord } from '../../src/persistence';
import { IdempotencyHandler } from '../../src/IdempotencyHandler';

class PersistenceLayerTestClass extends BasePersistenceLayer {
protected _deleteRecord = jest.fn();
protected _getRecord = jest.fn();
protected _putRecord = jest.fn();
protected _updateRecord = jest.fn();
}

const mockFunctionToMakeIdempotent = jest.fn();
const mockFunctionPayloadToBeHashed = {};
const mockIdempotencyOptions: IdempotencyOptions = {
persistenceStore: new PersistenceLayerTestClass(),
dataKeywordArgument: 'testingKey'
};
const mockFullFunctionPayload = {};

const idempotentHandler = new IdempotencyHandler(
mockFunctionToMakeIdempotent,
mockFunctionPayloadToBeHashed,
mockIdempotencyOptions,
mockFullFunctionPayload,
);

describe('Class IdempotencyHandler', () => {
beforeEach(() => jest.resetAllMocks());

describe('Method: determineResultFromIdempotencyRecord', () => {
test('when record is in progress and within expiry window, it rejects with IdempotencyAlreadyInProgressError', async () => {

const stubRecord = new IdempotencyRecord({
idempotencyKey: 'idempotencyKey',
expiryTimestamp: Date.now() + 1000, // should be in the future
inProgressExpiryTimestamp: 0, // less than current time in milliseconds
responseData: { responseData: 'responseData' },
payloadHash: 'payloadHash',
status: IdempotencyRecordStatus.INPROGRESS
});

expect(stubRecord.isExpired()).toBe(false);
expect(stubRecord.getStatus()).toBe(IdempotencyRecordStatus.INPROGRESS);

try {
await idempotentHandler.determineResultFromIdempotencyRecord(stubRecord);
} catch (e) {
expect(e).toBeInstanceOf(IdempotencyAlreadyInProgressError);
}
});

test('when record is in progress and outside expiry window, it rejects with IdempotencyInconsistentStateError', async () => {

const stubRecord = new IdempotencyRecord({
idempotencyKey: 'idempotencyKey',
expiryTimestamp: Date.now() + 1000, // should be in the future
inProgressExpiryTimestamp: new Date().getUTCMilliseconds() - 1000, // should be in the past
responseData: { responseData: 'responseData' },
payloadHash: 'payloadHash',
status: IdempotencyRecordStatus.INPROGRESS
});

expect(stubRecord.isExpired()).toBe(false);
expect(stubRecord.getStatus()).toBe(IdempotencyRecordStatus.INPROGRESS);

try {
await idempotentHandler.determineResultFromIdempotencyRecord(stubRecord);
} catch (e) {
expect(e).toBeInstanceOf(IdempotencyInconsistentStateError);
}
});

test('when record is expired, it rejects with IdempotencyInconsistentStateError', async () => {

const stubRecord = new IdempotencyRecord({
idempotencyKey: 'idempotencyKey',
expiryTimestamp: new Date().getUTCMilliseconds() - 1000, // should be in the past
inProgressExpiryTimestamp: 0, // less than current time in milliseconds
responseData: { responseData: 'responseData' },
payloadHash: 'payloadHash',
status: IdempotencyRecordStatus.EXPIRED
});

expect(stubRecord.isExpired()).toBe(true);
expect(stubRecord.getStatus()).toBe(IdempotencyRecordStatus.EXPIRED);

try {
await idempotentHandler.determineResultFromIdempotencyRecord(stubRecord);
} catch (e) {
expect(e).toBeInstanceOf(IdempotencyInconsistentStateError);
}
});
});

describe('Method: handle', () => {

afterAll(() => jest.restoreAllMocks()); // restore processIdempotency for other tests

test('when IdempotencyAlreadyInProgressError is thrown, it retries two times', async () => {
const mockProcessIdempotency = jest.spyOn(IdempotencyHandler.prototype, 'processIdempotency').mockRejectedValue(new IdempotencyAlreadyInProgressError('There is already an execution in progress'));
await expect(
idempotentHandler.handle()
).rejects.toThrow(IdempotencyAlreadyInProgressError);
expect(mockProcessIdempotency).toHaveBeenCalledTimes(2);
});

test('when non IdempotencyAlreadyInProgressError is thrown, it rejects', async () => {

const mockProcessIdempotency = jest.spyOn(IdempotencyHandler.prototype, 'processIdempotency').mockRejectedValue(new Error('Some other error'));

await expect(
idempotentHandler.handle()
).rejects.toThrow(Error);
expect(mockProcessIdempotency).toHaveBeenCalledTimes(1);
});

});

describe('Method: processIdempotency', () => {

test('when persistenceStore saves successfuly, it resolves', async () => {
const mockSaveInProgress = jest.spyOn(mockIdempotencyOptions.persistenceStore, 'saveInProgress').mockResolvedValue();

mockFunctionToMakeIdempotent.mockImplementation(() => Promise.resolve('result'));

await expect(
idempotentHandler.processIdempotency()
).resolves.toBe('result');
expect(mockSaveInProgress).toHaveBeenCalledTimes(1);
});

test('when persistences store throws any error, it wraps the error to IdempotencyPersistencesLayerError', async () => {
const mockSaveInProgress = jest.spyOn(mockIdempotencyOptions.persistenceStore, 'saveInProgress').mockRejectedValue(new Error('Some error'));
const mockDetermineResultFromIdempotencyRecord = jest.spyOn(IdempotencyHandler.prototype, 'determineResultFromIdempotencyRecord').mockResolvedValue('result');

await expect(
idempotentHandler.processIdempotency()
).rejects.toThrow(IdempotencyPersistenceLayerError);
expect(mockSaveInProgress).toHaveBeenCalledTimes(1);
expect(mockDetermineResultFromIdempotencyRecord).toHaveBeenCalledTimes(0);
});

test('when idempotency item already exists, it returns the existing record', async () => {
const mockSaveInProgress = jest.spyOn(mockIdempotencyOptions.persistenceStore, 'saveInProgress').mockRejectedValue(new IdempotencyItemAlreadyExistsError('There is already an execution in progress'));

const stubRecord = new IdempotencyRecord({
idempotencyKey: 'idempotencyKey',
expiryTimestamp: 0,
inProgressExpiryTimestamp: 0,
responseData: { responseData: 'responseData' },
payloadHash: 'payloadHash',
status: IdempotencyRecordStatus.INPROGRESS
});
const mockGetRecord = jest.spyOn(mockIdempotencyOptions.persistenceStore, 'getRecord').mockImplementation(() => Promise.resolve(stubRecord));
const mockDetermineResultFromIdempotencyRecord = jest.spyOn(IdempotencyHandler.prototype, 'determineResultFromIdempotencyRecord').mockResolvedValue('result');

await expect(
idempotentHandler.processIdempotency()
).resolves.toBe('result');
expect(mockSaveInProgress).toHaveBeenCalledTimes(1);
expect(mockGetRecord).toHaveBeenCalledTimes(1);
expect(mockDetermineResultFromIdempotencyRecord).toHaveBeenCalledTimes(1);
});
});

});

Loading

0 comments on commit f2ebf08

Please sign in to comment.