Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: refactor entities #345

Merged
merged 22 commits into from
May 12, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
125 changes: 94 additions & 31 deletions apps/tests/aws-runtime/test/test-service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -547,7 +547,25 @@ export const createAndDestroyWorkflow = workflow(
}
);

export const counter = entity<{ n: number }>("counter2", z.any());
export const counter = entity("counter4", {
attributes: {
n: z.number(),
namespace: z.union([z.literal("different"), z.literal("default")]),
id: z.string(),
},
partition: ["namespace", "id"],
});

export const counterCollection = entity("counter-collection", {
attributes: {
id: z.string(),
counterNumber: z.number(),
n: z.number(),
},
partition: ["id"],
sort: ["counterNumber"],
});

const entityEvent = event<{ id: string }>("entityEvent");
const entitySignal = signal("entitySignal");
const entitySignal2 = signal<{ n: number }>("entitySignal2");
Expand All @@ -560,19 +578,25 @@ export const counterWatcher = counter.stream(
// TODO: compute the possible operations union from the operations array
if (item.operation === "remove") {
const { n } = item.oldValue!;
await entitySignal2.sendSignal(item.key, { n: n + 1 });
await entitySignal2.sendSignal(item.key.id, { n: n + 1 });
}
}
);

export const counterNamespaceWatcher = counter.stream(
"counterNamespaceWatch",
{ namespacePrefixes: ["different"] },
{ queryKeys: [{ namespace: "different" }] },
async (item) => {
console.log(item);
if (item.operation === "insert") {
const value = await counter.get(item.key);
await counter.set(item.key, { n: (value?.n ?? 0) + 1 });
await entitySignal.sendSignal(item.key);
await counter.set({
namespace: "default",
id: value!.id,
n: (value?.n ?? 0) + 1,
});
console.log("send signal to", value!.id);
await entitySignal.sendSignal(value!.id);
}
}
);
Expand All @@ -581,60 +605,91 @@ export const onEntityEvent = subscription(
"onEntityEvent",
{ events: [entityEvent] },
async ({ id }) => {
const value = await counter.get(id);
await counter.set(id, { n: (value?.n ?? 0) + 1 });
const value = await counter.get({ namespace: "default", id });
await counter.set({ namespace: "default", id, n: (value?.n ?? 0) + 1 });
await entitySignal.sendSignal(id);
}
);

export const entityTask = task(
"entityAct",
async (_, { execution: { id } }) => {
const value = await counter.get(id);
await counter.set(id, { n: (value?.n ?? 0) + 1 });
const value = await counter.get(["default", id]);
await counter.set({ namespace: "default", id, n: (value?.n ?? 0) + 1 });
}
);

export const entityWorkflow = workflow(
"entityWorkflow",
async (_, { execution: { id } }) => {
await counter.set(id, { n: 1 });
counter.set({ key: id, namespace: "different!" }, { n: 0 });
await counter.set({ namespace: "default", id, n: 1 });
await counter.set({ namespace: "different", id, n: 1 });
await entitySignal.expectSignal();
await entityTask();
await Promise.all([entityEvent.emit({ id }), entitySignal.expectSignal()]);
try {
// will fail
await counter.set(id, { n: 0 }, { expectedVersion: 1 });
await counter.set(
{ namespace: "default", id, n: 0 },
{ expectedVersion: 1 }
);
} catch (err) {
console.error("expected the entity set to fail", err);
}
const { entity, version } = (await counter.getWithMetadata(id)) ?? {};
await counter.set(id, { n: entity!.n + 1 }, { expectedVersion: version });
const value = await counter.get(id);
const { value: entityValue, version } =
(await counter.getWithMetadata(["default", id])) ?? {};
await counter.set(
{ namespace: "default", id, n: entityValue!.n + 1 },
{ expectedVersion: version }
);
const value = await counter.get(["default", id]);
await Entity.transactWrite([
{
entity: counter,
operation: {
operation: "set",
key: id,
value: { n: (value?.n ?? 0) + 1 },
},
operation: "set",
value: { namespace: "default", id, n: (value?.n ?? 0) + 1 },
},
]);
// send deletion, to be picked up by the stream
counter.delete(id);
await counter.list({});
await counter.delete(["default", id]);
await counter.query(["default", id]);
// this signal will contain the final value after deletion
return await entitySignal2.expectSignal();
const result1 = await entitySignal2.expectSignal();

/**
* Testing sort keys and query
*/

await Promise.all([
counterCollection.set({ id, counterNumber: 1, n: 1 }),
counterCollection.set({ id, counterNumber: 2, n: 1 }),
counterCollection.set({ id, counterNumber: 3, n: 1 }),
]);

const counter1 = await counterCollection.get({ id, counterNumber: 1 });
await counterCollection.set({
id,
counterNumber: 2,
n: (counter1?.n ?? 0) + 1,
});

const counters = await counterCollection.query({ id });

return [
result1,
counters.entries?.map((c) => [c.value.counterNumber, c.value.n]),
];
}
);

export const check = entity<{ n: number }>("check");
export const check = entity("check4", {
attributes: { n: z.number(), id: z.string() },
partition: ["id"],
});

const gitErDone = transaction("gitErDone", async ({ id }: { id: string }) => {
const val = await check.get(id);
await check.set(id, { n: val?.n ?? 0 + 1 });
const val = await check.get([id]);
await check.set({ id, n: val?.n ?? 0 + 1 });
return val?.n ?? 0 + 1;
});

Expand All @@ -645,17 +700,25 @@ const noise = task(
let transact: Promise<number> | undefined;
while (n-- > 0) {
try {
await check.set(id, { n });
await check.set({ id, n });
} catch (err) {
console.error(err);
if (!(err instanceof TransactionConflictException)) {
throw err;
}
}
console.log(n);
if (n === x) {
transact = gitErDone({ id });
}
}
return await transact;
console.log("waiting...");
try {
return await transact;
} catch (err) {
console.error("Transaction Errored", err);
throw err;
}
}
);

Expand All @@ -665,11 +728,11 @@ export const transactionWorkflow = workflow(
const one = await noise({ x: 40 });
const two = await noise({ x: 60 });
const [, three] = await Promise.allSettled([
check.set(id, { n: two ?? 0 + 1 }),
check.set({ id, n: two ?? 0 + 1 }),
gitErDone({ id }),
check.set(id, { n: two ?? 0 + 1 }),
check.set({ id, n: two ?? 0 + 1 }),
]);
await check.delete(id);
await check.delete([id]);
return [one, two, three.status === "fulfilled" ? three.value : "AHHH"];
}
);
Expand Down
9 changes: 8 additions & 1 deletion apps/tests/aws-runtime/test/tester.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,14 @@ eventualRuntimeTestHarness(

testCompletion("awsSdkCalls", createAndDestroyWorkflow, "done");

testCompletion("ent", entityWorkflow, { n: 7 });
testCompletion("ent", entityWorkflow, [
{ n: 7 },
[
[1, 1],
[2, 2],
[3, 1],
],
]);

testCompletion("transaction", transactionWorkflow, ([one, two, three]) => {
expect(one).not.toBeUndefined();
Expand Down
Loading