Skip to content

Commit

Permalink
spike: introduce generic workload interface
Browse files Browse the repository at this point in the history
  • Loading branch information
aarlaud committed Jan 31, 2025
1 parent 987fb70 commit f16e52c
Show file tree
Hide file tree
Showing 5 changed files with 173 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,24 @@ import { HybridClientRequestHandler } from '../hybrid-sdk/clientRequestHelpers';
import { incrementHttpRequestsTotal } from '../common/utils/metrics';
import { filterClientRequest } from './requestFiltering';
import { log as logger } from '../logs/logger';
import {
LocalClientWorkloadRuntimeParams,
Workload,
WorkloadType,
} from '../hybrid-sdk/workloadFactory';

export class BrokerClientRequestWorkload {
export class BrokerClientRequestWorkload extends Workload<WorkloadType.localClient> {
req: Request;
res: Response;
options;
constructor(req, res, options) {
super('broker', WorkloadType['local-client']);
this.req = req;
this.res = res;
this.options = options;
}

async handler(makeRequestOverHttp = false) {
async handler(data: LocalClientWorkloadRuntimeParams) {
const hybridClientRequestHandler = new HybridClientRequestHandler(
this.req,
this.res,
Expand All @@ -38,7 +44,7 @@ export class BrokerClientRequestWorkload {
} else {
hybridClientRequestHandler.makeRequest(
filterResponse,
makeRequestOverHttp,
data.makeRequestOverHttp,
);
incrementHttpRequestsTotal(false, 'inbound-request');
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,13 @@ import {
makeStreamingRequestToDownstream,
} from '../hybrid-sdk/http/request';
import { logError } from '../logs/log';
import {
RemoteServerWorkloadRuntimeParams,
Workload,
WorkloadType,
} from '../hybrid-sdk/workloadFactory';

export class BrokerWorkload {
export class BrokerWorkload extends Workload<WorkloadType.remoteServer> {
options;
connectionIdentifier: string;
websocketConnectionHandler;
Expand All @@ -27,12 +32,15 @@ export class BrokerWorkload {
options,
websocketConnectionHandler,
) {
super('broker', WorkloadType['remote-server']);
this.options = options;
this.connectionIdentifier = connectionIdentifier;
this.websocketConnectionHandler = websocketConnectionHandler;
}

async handler(payload, websocketResponseHandler) {
async handler(data: RemoteServerWorkloadRuntimeParams) {
const { payload, websocketHandler } = data;
const websocketResponseHandler = websocketHandler;
if (this.options.config.universalBrokerEnabled) {
payload.connectionIdentifier = this.connectionIdentifier;
}
Expand Down
25 changes: 22 additions & 3 deletions lib/common/relay/forwardHttpRequest.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,12 @@
import { Request, Response } from 'express';

import { LoadedClientOpts, LoadedServerOpts } from '../types/options';
import { BrokerClientRequestWorkload } from '../../broker-workload/clientRequest';
import {
LocalClientWorkloadRuntimeParams,
Workload,
WorkloadType,
} from '../../hybrid-sdk/workloadFactory';
import { BrokerClientRequestWorkload } from '../../broker-workload/clientRequests';

// 1. Request coming in over HTTP conn (logged)
// 2. Filter for rule match (log and block if no match)
Expand All @@ -13,7 +18,21 @@ export const forwardHttpRequest = (
makeHttpRequest = false,
) => {
return async (req: Request, res: Response) => {
const workload = new BrokerClientRequestWorkload(req, res, options);
await workload.handler(makeHttpRequest);
const workloadName =
options.config.workloadName ?? 'BrokerClientRequestWorkload';
const workloadModulePath =
options.config.workloadModulePath ?? '../broker-workload/clientRequests';

const workload = (await Workload.instantiate(
workloadName,
workloadModulePath,
WorkloadType.localClient,
{ req, res, options },
)) as BrokerClientRequestWorkload;

const data: LocalClientWorkloadRuntimeParams = {
makeRequestOverHttp: makeHttpRequest,
};
await workload.handler(data);
};
};
30 changes: 23 additions & 7 deletions lib/common/relay/forwardWebsocketRequest.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,13 @@
import { RequestPayload } from '../types/http';
import { WebSocketConnection } from '../../client/types/client';
import { LoadedClientOpts, LoadedServerOpts } from '../types/options';
import { BrokerWorkload } from '../../broker-workload';
import {
RemoteServerWorkloadRuntimeParams,
Workload,
WorkloadType,
} from '../../hybrid-sdk/workloadFactory';
import { BrokerWorkload } from '../../broker-workload/websocketRequests';
// import { BrokerWorkload } from '../../broker-workload/websocketRequests';

export const forwardWebSocketRequest = (
options: LoadedClientOpts | LoadedServerOpts,
Expand All @@ -14,11 +20,21 @@ export const forwardWebSocketRequest = (
// 5. Send response over websocket conn

return (connectionIdentifier) => async (payload: RequestPayload, emit) => {
const workload = new BrokerWorkload(
connectionIdentifier,
options,
websocketConnectionHandler,
);
await workload.handler(payload, emit);
const workloadName = options.config.workloadName ?? 'BrokerWorkload';
const workloadModulePath =
options.config.workloadModulePath ??
'../broker-workload/websocketRequests';
const workload = (await Workload.instantiate(
workloadName,
workloadModulePath,
WorkloadType.remoteServer,
{ connectionIdentifier, options, websocketConnectionHandler },
)) as BrokerWorkload;

const data: RemoteServerWorkloadRuntimeParams = {
payload,
websocketHandler: emit,
};
await workload.handler(data);
};
};
109 changes: 109 additions & 0 deletions lib/hybrid-sdk/workloadFactory.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
// import { BrokerClientRequestWorkload } from '../broker-workload/clientRequests';
// import { BrokerWorkload } from '../broker-workload/websocketRequests';

export enum WorkloadType {
remoteServer = 'remoteServer',
localClient = 'localClient',
}

export interface RemoteServerWorkloadParams {
connectionIdentifier: string;
options: any;
websocketConnectionHandler: any;
}

export interface LocalClientWorkloadParams {
req: any;
res: any;
options: any;
}

export interface RemoteServerWorkloadRuntimeParams {
payload: any;
websocketHandler: any;
}
export interface LocalClientWorkloadRuntimeParams {
makeRequestOverHttp?: boolean;
}
// export type WorkloadRuntimeParamType<T extends WorkloadType> =
// T extends WorkloadType.remoteServer
// ? RemoteServerWorkloadRuntimeParams
// : LocalClientWorkloadRuntimeParams;
type WorkloadRuntimeParamType<
T extends WorkloadType.localClient | WorkloadType.remoteServer,
> = T extends WorkloadType.remoteServer
? RemoteServerWorkloadRuntimeParams
: T extends WorkloadType.localClient
? LocalClientWorkloadRuntimeParams
: never;
export type WorkloadRuntimeReturnType = Promise<void> | Promise<any>;

interface WorkloadModule {
default: new (
connectionIdentifier: string,
options: any,
websocketConnectionHandler: any,
) => Workload<WorkloadType.remoteServer>;
}

export abstract class Workload<
T extends WorkloadType.localClient | WorkloadType.remoteServer,
> {
type: WorkloadType;
name: string;

constructor(name: string, type: WorkloadType) {
this.name = name;
this.type = type;
}
abstract handler(
data: WorkloadRuntimeParamType<T>,
): WorkloadRuntimeReturnType;

// abstract handler(makeRequestOverHttp: boolean): void;
// abstract handler(payload: any, websocketHandler: any): void;

private static async instantiateRemoteServerWorkload(
name: string,
path: string,
params: RemoteServerWorkloadParams,
): Promise<Workload<WorkloadType.remoteServer>> {
const { connectionIdentifier, options, websocketConnectionHandler } =
params;
const importedModule = (await import(path)) as WorkloadModule;
const WorkloadClass = importedModule[name];
return new WorkloadClass(
connectionIdentifier,
options,
websocketConnectionHandler,
);
}
private static async instantiateLocalClientWorkload(
name: string,
path: string,
params: LocalClientWorkloadParams,
): Promise<Workload<WorkloadType.localClient>> {
const { req, res, options } = params;
const importedModule = (await import(path)) as WorkloadModule;
const WorkloadClass = importedModule[name];
return new WorkloadClass(req, res, options);
}

static async instantiate(
name: string,
path: string,
type: WorkloadType.localClient | WorkloadType.remoteServer,
params,
): Promise<
Workload<WorkloadType.localClient> | Workload<WorkloadType.remoteServer>
> {
switch (type) {
case WorkloadType.remoteServer:
return await this.instantiateRemoteServerWorkload(name, path, params);
case WorkloadType.localClient:
return await this.instantiateLocalClientWorkload(name, path, params);
default:
throw new Error(`Error loading workload - unknown type ${type}`);
}
}
}

0 comments on commit f16e52c

Please sign in to comment.