Skip to content

Commit

Permalink
fix: transaction in a workflow transform error (#450)
Browse files Browse the repository at this point in the history
  • Loading branch information
thantos authored Sep 19, 2023
1 parent e989788 commit e000bfc
Show file tree
Hide file tree
Showing 5 changed files with 88 additions and 69 deletions.
24 changes: 23 additions & 1 deletion apps/tests/aws-runtime/test/test-service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -941,10 +941,32 @@ const gitErDone = transaction("gitErDone", async ({ id }: { id: string }) => {
return val?.n ?? 0 + 1;
});

/**
* Test writing to a new item in a transaction while asserting that it is really new.
*/
const transactInitialize = transaction(
"transactInitialize",
async ({ id, n }: { id: string; n: number }) => {
// mimic a situation where an item is retrieved but doesn't exist
const x = await check.get({ id: "something random" });
if (!x) {
await check.put({ id, n });
}
}
);

const transactDelete = transaction(
"transactDelete",
async ({ id }: { id: string }) => {
await check.delete([id]);
}
);

const noise = task(
"noiseTask",
async ({ x }: { x: number }, { execution: { id } }) => {
let n = 100;
await transactInitialize({ id, n: 101 });
let transact: Promise<number> | undefined;
while (n-- > 0) {
try {
Expand Down Expand Up @@ -988,7 +1010,7 @@ export const transactionWorkflow = workflow(
gitErDone({ id }),
check.put({ id, n: two ?? 0 + 1 }),
]);
await check.delete([id]);
await transactDelete({ id });
return [one, two, three.status === "fulfilled" ? three.value : "AHHH"];
}
);
Expand Down
2 changes: 1 addition & 1 deletion packages/@eventual/aws-runtime/src/stores/entity-store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -373,7 +373,7 @@ export class AWSEntityStore extends EntityStore {
"#version": EntityEntityRecord.VERSION_FIELD,
},
ExpressionAttributeValues:
item.version !== undefined
item.version !== undefined && item.version !== 0
? {
":expectedVersion": {
N: item.version.toString(),
Expand Down
2 changes: 1 addition & 1 deletion packages/@eventual/cli/src/commands/local.ts
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,7 @@ export const local = (yargs: Argv) =>
: message,
})
.then((res) => {
if (res) {
if (res && res.message) {
ws.send(res.message);
}
});
Expand Down
101 changes: 55 additions & 46 deletions packages/@eventual/core-runtime/src/local/stores/entity-store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@ import {
Entity,
EntityConsistencyOptions,
EntityIndex,
EntityPutOptions,
EntityQueryOptions,
EntityQueryResult,
EntityScanOptions,
EntityPutOptions,
EntityStreamItem,
EntityWithMetadata,
KeyValue,
Expand Down Expand Up @@ -36,7 +36,6 @@ import {
isNormalizedEntityQueryKeyConditionPart,
normalizeCompositeKey,
} from "../../stores/entity-store.js";
import { deserializeCompositeKey, serializeCompositeKey } from "../../utils.js";
import { LocalEnvConnector } from "../local-container.js";
import { paginateItems } from "./pagination.js";

Expand Down Expand Up @@ -70,19 +69,35 @@ export class LocalEntityStore extends EntityStore {
return this.getPartitionMap(entity, key.partition).get(skOrDefault(key));
}

protected _getWithMetadataSync(
entity: Entity,
key: NormalizedEntityCompositeKeyComplete
): EntityWithMetadata | undefined {
return this.getPartitionMap(entity, key.partition).get(skOrDefault(key));
}

protected override async _put(
entity: Entity,
value: Attributes,
key: NormalizedEntityCompositeKeyComplete,
options?: EntityPutOptions
): Promise<{ version: number }> {
return this._putSync(entity, value, key, options);
}

protected _putSync(
entity: Entity,
value: Attributes,
key: NormalizedEntityCompositeKeyComplete,
options?: EntityPutOptions
): { version: number } {
const { version = 0, value: oldValue } =
(await this._getWithMetadata(entity, key)) ?? {};
this._getWithMetadataSync(entity, key) ?? {};
if (
options?.expectedVersion !== undefined &&
options.expectedVersion !== version
) {
throw new Error(
throw new UnexpectedVersion(
`Expected entity to be of version ${options.expectedVersion} but found ${version}`
);
}
Expand Down Expand Up @@ -116,7 +131,15 @@ export class LocalEntityStore extends EntityStore {
key: NormalizedEntityCompositeKeyComplete,
options?: EntityConsistencyOptions | undefined
): Promise<void> {
const item = await this._getWithMetadata(entity, key);
return this._deleteSync(entity, key, options);
}

protected _deleteSync(
entity: Entity,
key: NormalizedEntityCompositeKeyComplete,
options?: EntityConsistencyOptions | undefined
): void {
const item = this._getWithMetadataSync(entity, key);
if (item) {
if (options?.expectedVersion !== undefined) {
if (options.expectedVersion !== item.version) {
Expand Down Expand Up @@ -251,37 +274,28 @@ export class LocalEntityStore extends EntityStore {
};
}

protected override async _transactWrite(
protected override _transactWrite(
items: NormalizedEntityTransactItem[]
): Promise<void> {
const keysAndVersions = Object.fromEntries(
items.map((item) => {
return [
serializeCompositeKey(item.entity.name, item.key),
item.operation === "condition"
? item.version
: item.options?.expectedVersion,
] as const;
})
);
/**
* Evaluate the expected versions against the current state and return the results.
*
* This is similar to calling TransactWriteItem in dynamo with only ConditionChecks and then
* handling the errors.
*/
const consistencyResults = await Promise.all(
Object.entries(keysAndVersions).map(async ([sKey, expectedVersion]) => {
if (expectedVersion === undefined) {
return true;
}
const [entityName, key] = deserializeCompositeKey(sKey);
const { version } = (await this.getWithMetadata(entityName, key)) ?? {
version: 0,
};
return version === expectedVersion;
})
);
const consistencyResults = items.map((item) => {
const expectedVersion =
item.operation === "condition"
? item.version
: item.options?.expectedVersion;
if (expectedVersion === undefined) {
return true;
}
const { version } = this._getWithMetadataSync(item.entity, item.key) ?? {
version: 0,
};
return version === expectedVersion;
});
if (consistencyResults.some((r) => !r)) {
throw new TransactionCancelled(
consistencyResults.map((r) =>
Expand All @@ -294,24 +308,19 @@ export class LocalEntityStore extends EntityStore {
* Here we assume that the write operations are synchronous and that
* the state of the condition checks will not be invalided.
*/
await Promise.all(
items.map(async (item) => {
if (item.operation === "put") {
return await this._put(
item.entity,
item.value,
item.key,
item.options
);
} else if (item.operation === "delete") {
return await this._delete(item.entity, item.key, item.options);
} else if (item.operation === "condition") {
// no op
return;
}
return assertNever(item);
})
);
items.forEach((item) => {
if (item.operation === "put") {
return this._putSync(item.entity, item.value, item.key, item.options);
} else if (item.operation === "delete") {
return this._deleteSync(item.entity, item.key, item.options);
} else if (item.operation === "condition") {
// no op
return;
}
return assertNever(item);
});

return Promise.resolve();
}

private getLocalEntity(entityOrIndex: Entity | EntityIndex) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,26 +23,14 @@ export function createTransactionWorkflowQueueExecutor(
new TransactionCallExecutor(transactionClient),
queueClient,
(_, result, { executionTime, seq }) => {
if (result.succeeded) {
return createEvent<TransactionRequestSucceeded>(
{
type: WorkflowEventType.TransactionRequestSucceeded,
result: result.output,
seq,
},
executionTime
);
} else {
return createEvent<TransactionRequestFailed>(
{
type: WorkflowEventType.TransactionRequestFailed,
error: "Transaction Failed",
message: "",
seq,
},
executionTime
);
}
return createEvent<TransactionRequestSucceeded>(
{
type: WorkflowEventType.TransactionRequestSucceeded,
result,
seq,
},
executionTime
);
},
(_, err, { executionTime, seq }) => {
return createEvent<TransactionRequestFailed>(
Expand Down

0 comments on commit e000bfc

Please sign in to comment.