Skip to content

Commit

Permalink
fix: fix tests, local, and socket middleware interface (#447)
Browse files Browse the repository at this point in the history
  • Loading branch information
thantos authored Sep 16, 2023
1 parent 7554525 commit 0bc5c3f
Show file tree
Hide file tree
Showing 9 changed files with 188 additions and 112 deletions.
4 changes: 4 additions & 0 deletions apps/tests/aws-runtime-cdk/src/app.mts
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,10 @@ new CfnOutput(stack, "serviceUrl", {
value: testService.gateway.apiEndpoint,
});

new CfnOutput(stack, "testSocketUrl", {
value: testService.sockets.socket1.gatewayStage.url,
});

new CfnOutput(stack, "chaosParamName", {
value: chaosExtension.ssm.parameterName,
});
3 changes: 3 additions & 0 deletions apps/tests/aws-runtime/test/env.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,5 +10,8 @@ const outputs = fs.existsSync(path.resolve(outputsFile))
export const awsRegion = () => process.env.AWS_REGION ?? "us-east-1";
export const serviceUrl = () =>
outputs?.["eventual-tests"]?.serviceUrl ?? "http://localhost:3111";
export const testSocketUrl = () =>
outputs?.["eventual-tests"]?.testSocketUrl ??
"http://localhost:3111/__ws/socket1";
export const chaosSSMParamName = () =>
outputs?.["eventual-tests"]?.chaosParamName;
123 changes: 31 additions & 92 deletions apps/tests/aws-runtime/test/test-service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ import {
} from "@eventual/core";
import type openapi from "openapi3-ts";
import { Readable } from "stream";
import { WebSocket } from "ws";
import z from "zod";
import { AsyncWriterTestEvent } from "./async-writer-handler.js";

Expand Down Expand Up @@ -1392,7 +1391,7 @@ export const searchBlog = command(
* 7. the socket will resolve the value in the connection, completing the test
*/

interface SocketMessage {
export interface SocketMessage {
id: string;
v: number;
}
Expand All @@ -1413,31 +1412,33 @@ const jsonSocket = socket.use({
},
});

export const socket1 = jsonSocket.use(({ request, context, next }) => {
const { id, n } = (request.query ?? {}) as { n?: string; id?: string };
if (!id || !n) {
throw new Error("Missing ID");
}
return next({ ...context, id, n });
})("socket1", {
$connect: async ({ connectionId }, { id, n }) => {
console.log("sending signal to", id);
await socketConnectSignal.sendSignal(id, {
connectionId,
n: Number(n),
});
console.log("signal sent to", id);
},
$disconnect: async () => undefined,
$default: async ({ connectionId }, { data }) => {
console.log("sending signal to", data.id);
await socketMessageSignal.sendSignal(data.id, {
...data,
connectionId,
});
console.log("signal sent to", data.id);
},
});
export const socket1 = jsonSocket
.use(({ request, context, next }) => {
const { id, n } = (request.query ?? {}) as { n?: string; id?: string };
if (!id || !n) {
throw new Error("Missing ID");
}
return next({ ...context, id, n });
})
.socket("socket1", {
$connect: async ({ connectionId }, { id, n }) => {
console.log("sending signal to", id);
await socketConnectSignal.sendSignal(id, {
connectionId,
n: Number(n),
});
console.log("signal sent to", id);
},
$disconnect: async () => undefined,
$default: async ({ connectionId }, { data }) => {
console.log("sending signal to", data.id);
await socketMessageSignal.sendSignal(data.id, {
...data,
connectionId,
});
console.log("signal sent to", data.id);
},
});

export const socketConnectSignal = signal<{ connectionId: string; n: number }>(
"socketConnectSignal"
Expand All @@ -1447,13 +1448,13 @@ export const socketMessageSignal = signal<{
v: number;
}>("socketMessageSignal");

interface StartSocketEvent {
export interface StartSocketEvent {
type: "start";
n: number;
v: number;
}

interface DataSocketEvent {
export interface DataSocketEvent {
type: "data";
n: number;
v: number;
Expand Down Expand Up @@ -1520,69 +1521,7 @@ export const socketTest = command(
const { executionId } = await socketWorkflow.startExecution({
input: undefined,
});
const encodedId = encodeURIComponent(executionId);

console.log("pre-socket");

const ws1 = new WebSocket(`${socket1.wssEndpoint}?id=${encodedId}&n=0`);
const ws2 = new WebSocket(`${socket1.wssEndpoint}?id=${encodedId}&n=1`);

console.log("setup-socket");

const running1 = setupWS(executionId, ws1);
const running2 = setupWS(executionId, ws2);

console.log("waiting...");

return await Promise.all([running1, running2]);
return executionId;
}
);

function setupWS(executionId: string, ws: WebSocket) {
let n: number | undefined;
let v: number | undefined;
return new Promise<number>((resolve, reject) => {
ws.on("error", (err) => {
console.log("error", err);
reject(err);
});
ws.on("message", (data) => {
try {
console.log(n, "message");
const d = (data as Buffer).toString("utf8");
console.log(d);
const event = JSON.parse(d) as StartSocketEvent | DataSocketEvent;
if (event.type === "start") {
n = event.n;
// after connecting, we will send our "n" and incremented "value" back.
ws.send(
JSON.stringify({
id: executionId,
v: event.v + 1,
} satisfies SocketMessage)
);
} else if (event.type === "data") {
v = event.v;
} else {
console.log("unexpected event", event);
reject(event);
}
} catch (err) {
console.error(err);
reject(err);
}
});
ws.on("close", (code, reason) => {
try {
console.log(code, reason.toString("utf-8"));
console.log(n, "close", v);
if (n === undefined) {
throw new Error("n was not set");
}
resolve(v ?? -1);
} catch (err) {
reject(err);
}
});
});
}
78 changes: 74 additions & 4 deletions apps/tests/aws-runtime/test/tester.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,21 +6,25 @@ import {
ServiceContext,
} from "@eventual/core";
import { jest } from "@jest/globals";
import { WebSocket } from "ws";
import { ChaosEffects, ChaosTargets } from "./chaos-extension/chaos-engine.js";
import { serviceUrl } from "./env.js";
import { serviceUrl, testSocketUrl } from "./env.js";
import { eventualRuntimeTestHarness } from "./runtime-test-harness.js";
import type * as TestService from "./test-service.js";
import {
allCommands,
asyncWorkflow,
bucketWorkflow,
createAndDestroyWorkflow,
DataSocketEvent,
entityWorkflow,
eventDrivenWorkflow,
failedWorkflow,
heartbeatWorkflow,
parentWorkflow,
queueWorkflow,
SocketMessage,
StartSocketEvent,
timedOutWorkflow,
timedWorkflow,
transactionWorkflow,
Expand Down Expand Up @@ -302,6 +306,7 @@ eventualRuntimeTestHarness(
);

const url = serviceUrl();
const socketUrl = testSocketUrl();

test("hello API should route and return OK response", async () => {
const restResponse = await (await fetch(`${url}/hello`)).json();
Expand Down Expand Up @@ -445,15 +450,80 @@ test("test service context", async () => {
});

test("socket test", async () => {
const rpcResponse = await (
const executionId = (await (
await fetch(`${url}/${commandRpcPath({ name: "socketTest" })}`, {
method: "POST",
})
).json();
).json()) as string;

const encodedId = encodeURIComponent(executionId);

console.log("pre-socket");

const ws1 = new WebSocket(`${socketUrl}?id=${encodedId}&n=0`);
const ws2 = new WebSocket(`${socketUrl}?id=${encodedId}&n=1`);

console.log("setup-socket");

const running1 = setupWS(executionId, ws1);
const running2 = setupWS(executionId, ws2);

console.log("waiting...");

const result = await Promise.all([running1, running2]);

expect(rpcResponse).toEqual([3, 4]);
expect(result).toEqual([3, 4]);
});

function setupWS(executionId: string, ws: WebSocket) {
let n: number | undefined;
let v: number | undefined;
return new Promise<number>((resolve, reject) => {
ws.on("error", (err) => {
console.log("error", err);
reject(err);
});
ws.on("message", (data) => {
try {
console.log(n, "message");
const d = (data as Buffer).toString("utf8");
console.log(d);
const event = JSON.parse(d) as StartSocketEvent | DataSocketEvent;
if (event.type === "start") {
n = event.n;
// after connecting, we will send our "n" and incremented "value" back.
ws.send(
JSON.stringify({
id: executionId,
v: event.v + 1,
} satisfies SocketMessage)
);
} else if (event.type === "data") {
v = event.v;
} else {
console.log("unexpected event", event);
reject(event);
}
} catch (err) {
console.error(err);
reject(err);
}
});
ws.on("close", (code, reason) => {
try {
console.log(code, reason.toString("utf-8"));
console.log(n, "close", v);
if (n === undefined) {
throw new Error("n was not set");
}
resolve(v ?? -1);
} catch (err) {
reject(err);
}
});
});
}

if (!process.env.TEST_LOCAL) {
test("index.search", async () => {
const serviceClient = new ServiceClient<typeof TestService>({
Expand Down
2 changes: 1 addition & 1 deletion packages/@eventual/cli/src/commands/local.ts
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ export const local = (yargs: Argv) =>
const hasSockets = serviceSpec.sockets.length > 0;

if (hasSockets) {
const wss = new WebSocketServer({ server });
const wss = new WebSocketServer({ noServer: true });

server.on("upgrade", (request, socket, head) => {
if (request.url?.startsWith("/__ws/")) {
Expand Down
12 changes: 12 additions & 0 deletions packages/@eventual/compiler/src/ast-util.ts
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,18 @@ export function isSocketResourceCall(call: CallExpression): boolean {
return false;
}

export function isSocketMemberCall(call: CallExpression): boolean {
const c = call.callee;
if (c.type === "MemberExpression") {
if (isId(c.property, "socket")) {
// socket.use().socket("handlerName", async () => { })
// socket.use().socket("handlerName", options, async () => { })
return call.arguments.length === 2 || call.arguments.length === 3;
}
}
return false;
}

/**
* A heuristic for identifying a {@link CallExpression} that is a call to an `subscription` handler.
*
Expand Down
2 changes: 2 additions & 0 deletions packages/@eventual/compiler/src/eventual-infer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import {
isEntityStreamMemberCall,
isOnEventCall,
isQueueResourceCall,
isSocketMemberCall,
isSocketResourceCall,
isSubscriptionCall,
isTaskCall,
Expand Down Expand Up @@ -279,6 +280,7 @@ export class InferVisitor extends Visitor {
isBucketHandlerMemberCall,
isQueueResourceCall,
isSocketResourceCall,
isSocketMemberCall,
].some((op) => op(call))
) {
this.didMutate = true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -312,8 +312,8 @@ globalThis.tryGetEventualHook ??= () => {
return void 0;
};
function createSocketBuilder(middlewares) {
const socketFunction = (...args) => {
function createSocketFunction(middlewares) {
return (...args) => {
const { sourceLocation, name, options, handlers } = parseSocketArgs(args);
const socket2 = {
middlewares,
Expand Down Expand Up @@ -350,14 +350,27 @@ function createSocketBuilder(middlewares) {
};
return registerEventualResource("Socket", socket2);
};
const useFunction = (socketMiddleware) => {
}
function createUseFunction(middlewares) {
return (socketMiddleware) => {
const middleware = typeof socketMiddleware === "function" ? { connect: socketMiddleware } : socketMiddleware;
return createSocketBuilder([...middlewares, middleware]);
return createSocketRouter([...middlewares, middleware]);
};
}
function createSocketRouter(middlewares) {
return {
middlewares,
use: createUseFunction(middlewares),
socket: createSocketFunction(middlewares)
};
}
function createSocketBuilder() {
const socketFunction = createSocketFunction([]);
const useFunction = createUseFunction([]);
socketFunction.use = useFunction;
return socketFunction;
}
var socket = createSocketBuilder([]);
var socket = createSocketBuilder();
function parseSocketArgs(args) {
return parseArgs(args, {
sourceLocation: isSourceLocation,
Expand Down
Loading

0 comments on commit 0bc5c3f

Please sign in to comment.