Skip to content

Commit 3826843

Browse files
committed
[interpreter] Support Http1 based tests
1 parent c69673b commit 3826843

File tree

4 files changed

+159
-62
lines changed

4 files changed

+159
-62
lines changed

services/node-services/src/app.ts

+42-9
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,12 @@
77
// directory of this repository or package, or at
88
// https://github.com/restatedev/e2e/blob/main/LICENSE
99

10+
/* eslint-disable @typescript-eslint/no-explicit-any */
11+
1012
import * as restate from "@restatedev/restate-sdk";
13+
import { endpoint as fetchEndpoint } from "@restatedev/restate-sdk/fetch";
14+
import { endpoint as lambdaEndpoint } from "@restatedev/restate-sdk/lambda";
15+
import process from "node:process";
1116

1217
import "./awakeable_holder";
1318
import "./counter";
@@ -26,19 +31,47 @@ import "./interpreter/entry_point";
2631
import "./workflow";
2732

2833
import { REGISTRY } from "./services";
34+
import { serve as http1Server } from "./h1server";
35+
36+
class EndpointWrapper<K extends "fetch" | "lambda" | "node", T> {
37+
static fromEnvironment() {
38+
if (process.env.E2E_USE_FETCH) {
39+
return new EndpointWrapper("fetch", fetchEndpoint());
40+
} else if (process.env.AWS_LAMBDA_FUNCTION_NAME) {
41+
return new EndpointWrapper("lambda", lambdaEndpoint());
42+
} else {
43+
return new EndpointWrapper("node", restate.endpoint());
44+
}
45+
}
2946

30-
if (!process.env.SERVICES) {
31-
throw new Error("Cannot find SERVICES env");
47+
constructor(readonly kind: K, readonly endpoint: T) {}
3248
}
33-
const fqdns = new Set(process.env.SERVICES.split(","));
34-
const endpoint = restate.endpoint();
35-
REGISTRY.register(fqdns, endpoint);
49+
50+
const wrapper = EndpointWrapper.fromEnvironment();
51+
REGISTRY.registerFromEnvironment(wrapper.endpoint);
3652

3753
if (process.env.E2E_REQUEST_SIGNING) {
38-
endpoint.withIdentityV1(...process.env.E2E_REQUEST_SIGNING.split(","));
54+
const signing = process.env.E2E_REQUEST_SIGNING;
55+
wrapper.endpoint.withIdentityV1(...signing.split(","));
3956
}
40-
if (!process.env.AWS_LAMBDA_FUNCTION_NAME) {
41-
endpoint.listen();
57+
58+
switch (wrapper.kind) {
59+
case "node": {
60+
wrapper.endpoint.listen();
61+
break;
62+
}
63+
case "fetch": {
64+
http1Server(wrapper.endpoint.handler());
65+
break;
66+
}
67+
case "lambda": {
68+
// do nothing, handler is exported
69+
break;
70+
}
71+
default:
72+
throw new Error("Unknown endpoint type");
4273
}
4374

44-
export const handler = endpoint.lambdaHandler();
75+
// export for lambda. it will be only set for a lambda deployment
76+
export const handler =
77+
wrapper.kind == "lambda" ? wrapper.endpoint.handler() : undefined;

services/node-services/src/interpreter/test_containers.ts

+74-46
Original file line numberDiff line numberDiff line change
@@ -10,11 +10,30 @@
1010
import {
1111
GenericContainer,
1212
Network,
13-
PullPolicy,
1413
StartedNetwork,
1514
StartedTestContainer,
1615
} from "testcontainers";
1716

17+
export interface EnvironmentSpec {
18+
restate: {
19+
image: string;
20+
env: Record<string, string>;
21+
pull?: boolean;
22+
};
23+
24+
interpreters: {
25+
image: string;
26+
env: Record<string, string>;
27+
pull?: boolean;
28+
};
29+
30+
service: {
31+
image: string;
32+
env: Record<string, string>;
33+
pull?: boolean;
34+
};
35+
}
36+
1837
export interface TestEnvironment {
1938
ingressUrl: string;
2039
adminUrl: string;
@@ -31,82 +50,91 @@ export interface Containers {
3150
servicesContainer: StartedTestContainer;
3251
}
3352

34-
export async function setupContainers(): Promise<TestEnvironment> {
53+
export async function setupContainers(
54+
env: EnvironmentSpec
55+
): Promise<TestEnvironment> {
56+
console.log(env);
57+
3558
const network = await new Network().start();
3659

37-
const restate = new GenericContainer("ghcr.io/restatedev/restate:main")
60+
const restate = new GenericContainer(env.restate.image)
3861
.withExposedPorts(8080, 9070)
3962
.withNetwork(network)
4063
.withNetworkAliases("restate")
41-
.withPullPolicy(PullPolicy.alwaysPull())
64+
.withPullPolicy({
65+
shouldPull() {
66+
return env.restate.pull ?? true;
67+
},
68+
})
4269
.withEnvironment({
4370
RESTATE_LOG_FILTER: "restate=warn",
4471
RESTATE_LOG_FORMAT: "json",
72+
...(env.restate?.env ?? {}),
4573
})
4674
.withUlimits({
4775
nproc: { soft: 65535, hard: 65535 },
4876
nofile: { soft: 65535, hard: 65535 },
4977
})
5078
.start();
5179

52-
const zero = new GenericContainer("ghcr.io/restatedev/e2e-node-services:main")
53-
.withNetwork(network)
54-
.withNetworkAliases("interpreter_zero")
55-
.withPullPolicy(PullPolicy.alwaysPull())
56-
.withEnvironment({
57-
PORT: "9000",
58-
RESTATE_LOGGING: "ERROR",
59-
NODE_ENV: "production",
60-
SERVICES: "ObjectInterpreterL0",
61-
})
62-
.start();
63-
64-
const one = new GenericContainer("ghcr.io/restatedev/e2e-node-services:main")
65-
.withNetwork(network)
66-
.withNetworkAliases("interpreter_one")
67-
.withPullPolicy(PullPolicy.alwaysPull())
68-
69-
.withExposedPorts(9001)
70-
.withEnvironment({
71-
PORT: "9001",
80+
const names = ["interpreter_zero", "interpreter_one", "interpreter_two"];
81+
const interpreters = [];
82+
for (let i = 0; i < 3; i++) {
83+
const port = 9000 + i;
84+
const auxEnv = {
85+
PORT: `${port}`,
7286
RESTATE_LOGGING: "ERROR",
7387
NODE_ENV: "production",
74-
SERVICES: "ObjectInterpreterL1",
75-
})
76-
.start();
88+
NODE_OPTIONS: "--max-old-space-size=4096",
89+
SERVICES: `ObjectInterpreterL${i}`,
90+
...(env.interpreters?.env ?? {}),
91+
};
92+
const interpreter = new GenericContainer(env.interpreters.image)
93+
.withNetwork(network)
94+
.withNetworkAliases(names[i])
95+
.withExposedPorts(port)
96+
.withPullPolicy({
97+
shouldPull() {
98+
return env.interpreters.pull ?? true;
99+
},
100+
})
101+
.withEnvironment(auxEnv)
102+
.withUlimits({
103+
nproc: { soft: 65535, hard: 65535 },
104+
nofile: { soft: 65535, hard: 65535 },
105+
})
106+
.start();
77107

78-
const two = new GenericContainer("ghcr.io/restatedev/e2e-node-services:main")
79-
.withNetwork(network)
80-
.withNetworkAliases("interpreter_two")
81-
.withPullPolicy(PullPolicy.alwaysPull())
82-
.withExposedPorts(9002)
83-
.withEnvironment({
84-
PORT: "9002",
85-
RESTATE_LOGGING: "ERROR",
86-
NODE_ENV: "production",
87-
SERVICES: "ObjectInterpreterL2",
88-
})
89-
.start();
108+
interpreters.push(interpreter);
109+
}
90110

91-
const services = new GenericContainer(
92-
"ghcr.io/restatedev/e2e-node-services:main"
93-
)
111+
const services = new GenericContainer(env.service.image)
94112
.withNetwork(network)
95113
.withNetworkAliases("services")
96-
.withPullPolicy(PullPolicy.alwaysPull())
114+
.withPullPolicy({
115+
shouldPull() {
116+
return env.service.pull ?? true;
117+
},
118+
})
97119
.withExposedPorts(9003)
98120
.withEnvironment({
99121
PORT: "9003",
100122
RESTATE_LOGGING: "ERROR",
101123
NODE_ENV: "production",
124+
NODE_OPTIONS: "--max-old-space-size=4096",
102125
SERVICES: "ServiceInterpreterHelper",
126+
...(env.service?.env ?? {}),
127+
})
128+
.withUlimits({
129+
nproc: { soft: 65535, hard: 65535 },
130+
nofile: { soft: 65535, hard: 65535 },
103131
})
104132
.start();
105133

106134
const restateContainer = await restate;
107-
const interpreterZeroContainer = await zero;
108-
const interpreterOneContainer = await one;
109-
const interpreterTwoContainer = await two;
135+
const interpreterZeroContainer = await interpreters[0];
136+
const interpreterOneContainer = await interpreters[1];
137+
const interpreterTwoContainer = await interpreters[2];
110138
const servicesContainer = await services;
111139

112140
const ingressUrl = `http://${restateContainer.getHost()}:${restateContainer.getMappedPort(

services/node-services/src/interpreter/test_driver.ts

+36-4
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,12 @@ import {
1313
interpreterObjectForLayer,
1414
} from "./interpreter";
1515
import { Random } from "./random";
16-
import { setupContainers, tearDown, TestEnvironment } from "./test_containers";
16+
import {
17+
EnvironmentSpec,
18+
setupContainers,
19+
tearDown,
20+
TestEnvironment,
21+
} from "./test_containers";
1722
import { ProgramGenerator } from "./test_generator";
1823

1924
import * as restate from "@restatedev/restate-sdk-clients";
@@ -35,6 +40,7 @@ export interface TestConfiguration {
3540
readonly register?: TestConfigurationDeployments; // auto register the following endpoints
3641
readonly bootstrap?: boolean;
3742
readonly crashInterval?: number;
43+
readonly bootstrapEnv?: EnvironmentSpec;
3844
}
3945

4046
export enum TestStatus {
@@ -136,7 +142,11 @@ export class Test {
136142
console.log(`Ingress is ready.`);
137143
}
138144

139-
async registerEndpoints(adminUrl?: string, deployments?: string[]) {
145+
async registerEndpoints(
146+
useHttp11?: boolean,
147+
adminUrl?: string,
148+
deployments?: string[]
149+
) {
140150
if (!adminUrl) {
141151
throw new Error("Missing adminUrl");
142152
}
@@ -156,11 +166,13 @@ export class Test {
156166
await sleep(2000);
157167
}
158168
console.log("Admin is ready.");
169+
159170
for (const uri of deployments) {
160171
const res = await fetch(`${adminUrl}/deployments`, {
161172
method: "POST",
162173
body: JSON.stringify({
163174
uri,
175+
use_http_11: useHttp11 ?? false ? true : false,
164176
}),
165177
headers: {
166178
"Content-Type": "application/json",
@@ -191,8 +203,25 @@ export class Test {
191203
console.log(this.conf);
192204
this.status = TestStatus.RUNNING;
193205

206+
const useHttp11 = process.env.E2E_USE_FETCH?.toLocaleLowerCase() == "true";
194207
if (this.conf.bootstrap) {
195-
this.containers = await setupContainers();
208+
const env: EnvironmentSpec = this.conf.bootstrapEnv ?? {
209+
restate: {
210+
image: "ghcr.io/restatedev/restate:main",
211+
env: {},
212+
},
213+
214+
interpreters: {
215+
image: "ghcr.io/restatedev/e2e-node-services:main",
216+
env: {},
217+
},
218+
219+
service: {
220+
image: "ghcr.io/restatedev/e2e-node-services:main",
221+
env: {},
222+
},
223+
};
224+
this.containers = await setupContainers(env);
196225
console.log(this.containers);
197226
}
198227
const ingressUrl = this.containers?.ingressUrl ?? this.conf.ingress;
@@ -203,7 +232,10 @@ export class Test {
203232
let ingress = restate.connect({ url: ingressUrl });
204233

205234
if (deployments) {
206-
await this.registerEndpoints(adminUrl, deployments);
235+
console.log(useHttp11);
236+
console.log(adminUrl);
237+
console.log(deployments);
238+
await this.registerEndpoints(useHttp11, adminUrl, deployments);
207239
}
208240
await this.ingressReady(ingressUrl);
209241

services/node-services/src/services.ts

+7-3
Original file line numberDiff line numberDiff line change
@@ -8,15 +8,14 @@
88
// https://github.com/restatedev/e2e/blob/main/LICENSE
99

1010
import {
11-
RestateEndpoint,
1211
ServiceDefinition,
1312
VirtualObjectDefinition,
1413
WorkflowDefinition,
1514
} from "@restatedev/restate-sdk";
1615

1716
export type IComponent = {
1817
fqdn: string;
19-
binder: (endpoint: RestateEndpoint) => void;
18+
binder: (endpoint: { bind: (what: unknown) => void }) => void;
2019
};
2120

2221
export class ComponentRegistry {
@@ -47,7 +46,12 @@ export class ComponentRegistry {
4746
});
4847
}
4948

50-
register(fqdns: Set<string>, e: RestateEndpoint) {
49+
/* eslint-disable-next-line @typescript-eslint/no-explicit-any */
50+
registerFromEnvironment(e: { bind: (what: any) => any }) {
51+
if (!process.env.SERVICES) {
52+
throw new Error("Cannot find SERVICES env");
53+
}
54+
const fqdns = new Set(process.env.SERVICES.split(","));
5155
fqdns.forEach((fqdn) => {
5256
const c = this.components.get(fqdn);
5357
if (!c) {

0 commit comments

Comments
 (0)