Skip to content

Commit

Permalink
feat: bucket (#340)
Browse files Browse the repository at this point in the history
  • Loading branch information
thantos authored May 4, 2023
1 parent ce0c0bc commit a311d73
Show file tree
Hide file tree
Showing 58 changed files with 7,849 additions and 11,367 deletions.
5 changes: 5 additions & 0 deletions apps/tests/aws-runtime-cdk/src/app.mts
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,11 @@ const testService = new eventual.Service<typeof testServiceRuntime>(
logLevel: LogLevel.DEBUG,
},
},
buckets: {
myBucket: {
bucketName: `super-random-test-bucket-${stack.account}`,
},
},
cors: {
allowOrigins: ["http://some-url.com"],
},
Expand Down
2 changes: 1 addition & 1 deletion apps/tests/aws-runtime/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
},
"devDependencies": {
"@anatine/zod-openapi": "^1.12.0",
"@aws-sdk/types": "^3.254.0",
"@aws-sdk/types": "^3.310.0",
"@jest/globals": "^29",
"@types/aws-lambda": "^8.10.108",
"@types/jest": "^29",
Expand Down
112 changes: 106 additions & 6 deletions apps/tests/aws-runtime/test/test-service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,21 +5,23 @@ import {
PutItemCommand,
TransactionConflictException,
} from "@aws-sdk/client-dynamodb";
import { SQSClient, SendMessageCommand } from "@aws-sdk/client-sqs";
import { SendMessageCommand, SQSClient } from "@aws-sdk/client-sqs";
import {
ApiSpecification,
Entity,
EventualError,
HeartbeatTimeout,
HttpResponse,
api,
ApiSpecification,
asyncResult,
bucket,
command,
condition,
duration,
Entity,
entity,
event,
EventualError,
expectSignal,
HeartbeatTimeout,
HttpResponse,
Schedule,
sendSignal,
sendTaskHeartbeat,
signal,
Expand All @@ -30,6 +32,7 @@ import {
workflow,
} from "@eventual/core";
import type openapi from "openapi3-ts";
import { Readable } from "stream";
import z from "zod";
import { AsyncWriterTestEvent } from "./async-writer-handler.js";

Expand Down Expand Up @@ -672,6 +675,102 @@ export const transactionWorkflow = workflow(
}
);

export const myBucket = bucket("myBucket");
export const bucketSignal = signal<{ data: string }>("bucketSignal");

export const myBucketHandler = myBucket.on(
"put",
"myBucketHandler",
{ filters: [{ prefix: "key/" }] },
async (item) => {
const executionId = item.key.slice(4);
const obj = await myBucket.get(item.key);

if (obj?.body) {
await bucketSignal.sendSignal(executionId, {
data: await obj.getBodyString(),
});
}
}
);

export const bucketTask = task(
"bucketTask",
async (request: { key: string; prefix: string; data: string }) => {
await myBucket.put(request.key, request.data);

const result = await myBucket.get(request.key);

const keys = await myBucket.list({ prefix: request.key });

if (!result) {
throw new Error("");
}

return {
data: await result.getBodyString(),
keys: keys.objects.map((s) => s.key),
};
}
);

/**
* 1. use {@link bucketTask} to create an object, then return the data and listed keys
* 2. pickup the write from a stream, emitting a signal to the workflow with the data
* 3. delete the object
* 4. return
*/
export const bucketWorkflow = workflow(
"bucketWorkflow",
async (_, { execution: { id } }) => {
const data = "hello!";
const key = `key/${id}`;

try {
const [result, signalResult] = await Promise.all([
bucketTask({ key, data, prefix: "key/" }),
bucketSignal.expectSignal({ timeout: duration(5, "minutes") }),
]);

const data2 = "hello again!";

const [, signalResult2] = await Promise.all([
myBucket.put(key, data2),
bucketSignal.expectSignal({ timeout: duration(5, "minutes") }),
]);

const data3 = "hello again again!";

const [, signalResult3] = await Promise.all([
myBucket.put(key, Buffer.from(data3)),
bucketSignal.expectSignal({ timeout: duration(5, "minutes") }),
]);

const data4 = "hello again again again!";

const [, signalResult4] = await Promise.all([
myBucket.put(key, Readable.from(Buffer.from(data4))),
bucketSignal.expectSignal({ timeout: duration(5, "minutes") }),
]);

await myBucket.copyTo(key + "2", key);

const copiedData = await myBucket.get(key + "2");

return {
result,
signalResult,
signalResult2,
signalResult3,
signalResult4,
copied: await copiedData?.getBodyString(),
};
} finally {
await Promise.all([myBucket.delete(key), myBucket.delete(key + "2")]);
}
}
);

export const hello3 = api.post("/hello3", () => {
return new HttpResponse("hello?");
});
Expand Down Expand Up @@ -722,6 +821,7 @@ export const typed2 = command(
detailed: z.boolean().optional(),
}),
output: User,
handlerTimeout: Schedule.duration(10, "minutes"),
},
async (request) => {
return {
Expand Down
10 changes: 10 additions & 0 deletions apps/tests/aws-runtime/test/tester.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import type * as TestService from "./test-service.js";
import {
allCommands,
asyncWorkflow,
bucketWorkflow,
createAndDestroyWorkflow,
entityWorkflow,
eventDrivenWorkflow,
Expand Down Expand Up @@ -125,6 +126,15 @@ eventualRuntimeTestHarness(
expect(two).not.toBeUndefined();
expect(three).not.toBeUndefined();
});

testCompletion("buckets", bucketWorkflow, {
result: { data: "hello!", keys: [expect.stringContaining("key/")] },
signalResult: { data: "hello!" },
signalResult2: { data: "hello again!" },
signalResult3: { data: "hello again again!" },
signalResult4: { data: "hello again again again!" },
copied: "hello again again again!"
});
},
{
name: "s3 persist failures",
Expand Down
Loading

0 comments on commit a311d73

Please sign in to comment.