Skip to content

Commit

Permalink
feat: service content in handlers (#343)
Browse files Browse the repository at this point in the history
  • Loading branch information
thantos authored Apr 28, 2023
1 parent 2df6fad commit 82860fd
Show file tree
Hide file tree
Showing 34 changed files with 402 additions and 167 deletions.
7 changes: 6 additions & 1 deletion apps/tests/aws-runtime/test/test-service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -764,8 +764,9 @@ export const specCommand = command(
);

export const extractHeaderCommand = api
.use(({ request, next }) =>
.use(({ request, context, next }) =>
next({
...context,
MyHeader: request.headers.get("MyHeader"),
})
)
Expand Down Expand Up @@ -803,6 +804,10 @@ export const modifyResponseMiddlewareHttp = api
return new HttpResponse("My Response");
});

export const contextTest = command("contextText", (_, { service }) => {
return service;
});

const simpleEvent = event<{ value: string }>("simpleEvent");

export const simpleEventHandler = subscription(
Expand Down
16 changes: 15 additions & 1 deletion apps/tests/aws-runtime/test/tester.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import {
commandRpcPath,
EventualError,
HeartbeatTimeout,
ServiceContext,
} from "@eventual/core";
import { jest } from "@jest/globals";
import { ChaosEffects, ChaosTargets } from "./chaos-extension/chaos-engine.js";
Expand Down Expand Up @@ -242,7 +243,7 @@ test("middleware context is properly piped to command", async () => {
})
).json();

expect(rpcResponse).toEqual({
expect(rpcResponse).toMatchObject({
MyHeader: "value",
});
});
Expand Down Expand Up @@ -286,3 +287,16 @@ test("middleware can edit response", async () => {

expect(rpcResponse.headers.get("ModifiedHeader")).toEqual("Injected Header");
});

test("test service context", async () => {
const serviceClient = new ServiceClient<typeof TestService>({
serviceUrl: url,
});

await expect(
serviceClient.contextText()
).resolves.toMatchObject<ServiceContext>({
serviceName: "eventual-tests",
serviceUrl: url,
});
});
3 changes: 2 additions & 1 deletion packages/@eventual/aws-cdk/src/service-function.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { serviceFunctionName } from "@eventual/aws-runtime";
import { ENV_NAMES, serviceFunctionName } from "@eventual/aws-runtime";
import type { FunctionRuntimeProps } from "@eventual/core";
import {
BundledFunction,
Expand Down Expand Up @@ -55,6 +55,7 @@ export class ServiceFunction extends Function {
: props.defaults?.timeout ?? Duration.seconds(3)),
environment: {
NODE_OPTIONS: "--enable-source-maps",
[ENV_NAMES.SERVICE_NAME]: props.build.serviceName,
...baseFnProps.environment,
...props.defaults?.environment,
...props.overrides?.environment,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
import { EventualServiceClient, HttpMethod, HttpRequest } from "@eventual/core";
import {
getLazy,
LazyValue,
registerWorkerIntrinsics,
type CommandWorker,
} from "@eventual/core-runtime";
import { ServiceSpec } from "@eventual/core/internal";
import type { ServiceSpec } from "@eventual/core/internal";
import type {
APIGatewayProxyEventV2,
APIGatewayProxyHandlerV2,
Expand All @@ -21,24 +23,28 @@ export function createApiGCommandAdaptor({
commandWorker,
serviceClientBuilder,
serviceSpec,
serviceName: _serviceName,
}: {
commandWorker: CommandWorker;
serviceName: LazyValue<string>;
serviceSpec?: ServiceSpec;
serviceClientBuilder?: (serviceUrl: string) => EventualServiceClient;
}): APIGatewayProxyHandlerV2 {
const serviceName = getLazy(_serviceName);
return async function (
event: APIGatewayProxyEventV2
): Promise<APIGatewayProxyResultV2> {
console.debug("event", event);
//

const serviceUrl = `https://${event.requestContext.domainName}`;
const serviceClient = serviceClientBuilder
? serviceClientBuilder(serviceUrl)
: undefined;
registerWorkerIntrinsics({
serviceClient,
serviceSpec,
serviceUrls: [serviceUrl],
serviceUrl,
serviceName: getLazy(serviceName),
});
const requestBody = event.body
? event.isBase64Encoded
Expand All @@ -56,7 +62,9 @@ export function createApiGCommandAdaptor({
}
);

const response = await commandWorker(request);
const response = await commandWorker(request, {
service: { serviceUrl, serviceName },
});
const headers: Record<string, string> = {};

response.headers.forEach((value, key) => (headers[key] = value));
Expand Down
3 changes: 3 additions & 0 deletions packages/@eventual/aws-runtime/src/handlers/command-worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import {
createTransactionClient,
} from "../create.js";
import { createApiGCommandAdaptor } from "./apig-command-adapter.js";
import { serviceName } from "../env.js";

/**
* Handle inbound command and rest api requests.
Expand All @@ -20,7 +21,9 @@ export default createApiGCommandAdaptor({
commandWorker: createCommandWorker({
// the service client, spec, and service url will be created at runtime, using a computed uri from the apigateway request
entityClient: createEntityClient(),
serviceName,
}),
serviceName,
serviceSpec,
// pulls the service url from the request instead of env variables to reduce the circular dependency between commands and the gateway.
serviceClientBuilder: (serviceUrl) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,20 @@ import {
import { EntityStreamOperation } from "@eventual/core/internal";
import { DynamoDBStreamHandler } from "aws-lambda";
import { createEntityClient, createServiceClient } from "../create.js";
import { entityName, entityStreamName, serviceUrl } from "../env.js";
import {
entityName,
entityStreamName,
serviceName,
serviceUrl,
} from "../env.js";
import { EntityEntityRecord } from "../stores/entity-store.js";

const worker = createEntityStreamWorker({
serviceClient: createServiceClient({}),
entityClient: createEntityClient(),
serviceSpec,
serviceUrls: [serviceUrl],
serviceName,
serviceUrl,
});

export default (async (event) => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import {
createServiceClient,
createTransactionClient,
} from "../create.js";
import { serviceUrl } from "../env.js";
import { serviceName, serviceUrl } from "../env.js";

export const processEvent = createSubscriptionWorker({
// partially uses the runtime clients and partially uses the http client
Expand All @@ -23,7 +23,8 @@ export const processEvent = createSubscriptionWorker({
subscriptionProvider: new GlobalSubscriptionProvider(),
entityClient: createEntityClient(),
serviceSpec,
serviceUrls: [serviceUrl],
serviceName,
serviceUrl: serviceUrl,
});

export default async function (event: EventBridgeEvent<string, any>) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,13 @@ import serviceSpec from "@eventual/injected/spec";
import type { AnyCommand } from "@eventual/core";
import {
createCommandWorker,
createEmitEventsCommand,
createExecuteTransactionCommand,
createGetExecutionCommand,
createListExecutionHistoryCommand,
createListExecutionsCommand,
createListWorkflowHistoryCommand,
createListWorkflowsCommand,
createEmitEventsCommand,
createSendSignalCommand,
createStartExecutionCommand,
createUpdateTaskCommand,
Expand All @@ -26,13 +26,16 @@ import {
createTransactionClient,
createWorkflowClient,
} from "../create.js";
import { serviceName } from "../env.js";
import { createApiGCommandAdaptor } from "./apig-command-adapter.js";

function systemCommandWorker(
..._commands: AnyCommand[]
): APIGatewayProxyHandlerV2<Response> {
return createApiGCommandAdaptor({
commandWorker: createCommandWorker({ serviceSpec }),
commandWorker: createCommandWorker({}),
serviceSpec,
serviceName,
});
}

Expand Down
18 changes: 9 additions & 9 deletions packages/@eventual/aws-runtime/src/handlers/task-worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@ import serviceSpec from "@eventual/injected/spec";
import "@eventual/injected/entry";

import {
createTaskWorker,
GlobalTaskProvider,
TaskFallbackRequest,
TaskWorkerRequest,
createTaskWorker,
} from "@eventual/core-runtime";
import { AWSMetricsClient } from "../clients/metrics-client.js";
import {
Expand All @@ -24,11 +24,11 @@ import {
import { serviceName, serviceUrl } from "../env.js";

const worker = createTaskWorker({
executionQueueClient: createExecutionQueueClient(),
entityClient: createEntityClient(),
eventClient: createEventClient(),
timerClient: createTimerClient(),
executionQueueClient: createExecutionQueueClient(),
logAgent: createLogAgent(),
metricsClient: AWSMetricsClient,
taskProvider: new GlobalTaskProvider(),
// partially uses the runtime clients and partially uses the http client
serviceClient: createServiceClient({
taskClient: createTaskClient(),
Expand All @@ -38,12 +38,12 @@ const worker = createTaskWorker({
executionStore: createExecutionStore(),
transactionClient: createTransactionClient(),
}),
logAgent: createLogAgent(),
taskStore: createTaskStore(),
serviceName: serviceName(),
entityClient: createEntityClient(),
serviceName,
serviceSpec,
serviceUrls: [serviceUrl],
serviceUrl,
taskProvider: new GlobalTaskProvider(),
taskStore: createTaskStore(),
timerClient: createTimerClient(),
});

export default async (request: TaskWorkerRequest) => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,13 @@ import { createTransactionWorker } from "@eventual/core-runtime";
import {
createEntityStore,
createEventClient,
createExecutionQueueClient,
createExecutionQueueClient
} from "../create.js";
import { serviceName } from "../env.js";

export default createTransactionWorker({
entityStore: createEntityStore(),
eventClient: createEventClient(),
executionQueueClient: createExecutionQueueClient(),
serviceName,
});
3 changes: 2 additions & 1 deletion packages/@eventual/cli/src/commands/local.ts
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,8 @@ export const local = (yargs: Argv) =>
// TODO: should the loading be done by the local env?
const localEnv = new LocalEnvironment({
serviceSpec,
serviceUrls: [url],
serviceUrl: url,
serviceName: serviceName,
});

app.use(express.json({ strict: false }));
Expand Down
25 changes: 16 additions & 9 deletions packages/@eventual/core-runtime/src/handlers/command-worker.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import {
CommandContext,
commandRpcPath,
HttpRequest,
HttpResponse,
Expand All @@ -13,10 +14,10 @@ import {
import itty from "itty-router";
import { registerWorkerIntrinsics, WorkerIntrinsicDeps } from "./utils.js";

export interface ApiHandlerDependencies extends WorkerIntrinsicDeps {}
export interface ApiHandlerDependencies extends Partial<WorkerIntrinsicDeps> {}

export interface CommandWorker {
(request: HttpRequest): Promise<HttpResponse>;
(request: HttpRequest, commandContext: CommandContext): Promise<HttpResponse>;
}

/**
Expand All @@ -28,7 +29,7 @@ export interface CommandWorker {
export function createCommandWorker(
deps: ApiHandlerDependencies
): CommandWorker {
registerWorkerIntrinsics(deps);
registerWorkerIntrinsics(deps as WorkerIntrinsicDeps);

const router = initRouter();

Expand All @@ -38,11 +39,11 @@ export function createCommandWorker(
* Each webhook registers routes on the central {@link router} which
* then handles the request.
*/
return function (request) {
return function (request, context) {
console.debug("request", request);
return serviceTypeScope(ServiceType.CommandWorker, async () => {
try {
const response = await router.handle(request);
const response = await router.handle(request, context);
if (response === undefined) {
if (request.method === "OPTIONS") {
return new HttpResponse(undefined, {
Expand Down Expand Up @@ -219,16 +220,22 @@ function initRouter() {
* @returns
*/
function withMiddleware(
handler: (request: HttpRequest, context: any) => Promise<HttpResponse>
handler: (
request: HttpRequest,
context: CommandContext
) => Promise<HttpResponse>
) {
return async (request: HttpRequest): Promise<HttpResponse> => {
return async (
request: HttpRequest,
context: CommandContext
): Promise<HttpResponse> => {
const chain = (command.middlewares ?? []).values();

return next(request, {});
return next(request, context);

async function next(
request: HttpRequest,
context: any
context: CommandContext
): Promise<HttpResponse> {
let consumed = false;
const middleware = chain.next();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@ import { EntityStreamItem } from "@eventual/core";
import {
ServiceType,
entities,
serviceTypeScope
serviceTypeScope,
} from "@eventual/core/internal";
import { getLazy } from "../utils.js";
import { WorkerIntrinsicDeps, registerWorkerIntrinsics } from "./utils.js";

export interface EntityStreamWorker {
Expand All @@ -15,7 +16,7 @@ interface EntityStreamWorkerDependencies extends WorkerIntrinsicDeps {}
export function createEntityStreamWorker(
dependencies: EntityStreamWorkerDependencies
): EntityStreamWorker {
registerWorkerIntrinsics(dependencies)
registerWorkerIntrinsics(dependencies);

return async (item) =>
serviceTypeScope(ServiceType.EntityStreamWorker, async () => {
Expand All @@ -25,6 +26,11 @@ export function createEntityStreamWorker(
if (!streamHandler) {
throw new Error(`Stream handler ${item.streamName} does not exist`);
}
return await streamHandler.handler(item);
return await streamHandler.handler(item, {
service: {
serviceName: getLazy(dependencies.serviceName),
serviceUrl: getLazy(dependencies.serviceUrl),
},
});
});
}
Loading

0 comments on commit 82860fd

Please sign in to comment.