Skip to content

Commit

Permalink
feat(cmd-api-server): add ConnectRPC auto-registration for plugins
Browse files Browse the repository at this point in the history
1. This is enabling plugins to expose their operations via ConnectRPC
services which is very similar to gRPC but it comes with a few extra
bells and whistles that can come in very handy.
2. There is an upcoming pull request that makes it so that the keychain
memory plugin implements and registers its services via this newly added
hook of the API server. The importance of this is that test coverage for
the code in this commit resides on another branch, meaning that even though
there are no new test cases on this branch, the feature has been extensively
tested and there is test-automation in place to continue verifying it
as well.
3. The main difference between the hook methods are that for CRPC the
API server expects an array of service definition+implementation pairs
instead of just a single one. This was a design decision forced by the
issues with implementing separate services in a single class: The compiler
was hard to appease in a way that kept the code clean. gRPC did not suffer
from this and therefore the registration methods defined for that only
return a single gRPC service defintion+implementation pair which can combine
any number of .proto services.

Signed-off-by: Peter Somogyvari <peter.somogyvari@accenture.com>
  • Loading branch information
petermetz committed Apr 9, 2024
1 parent afce155 commit c569460
Show file tree
Hide file tree
Showing 4 changed files with 717 additions and 10 deletions.
9 changes: 9 additions & 0 deletions packages/cactus-cmd-api-server/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,10 @@
"webpack:dev:web": "webpack --env=dev --target=web --config ../../webpack.config.js"
},
"dependencies": {
"@connectrpc/connect": "1.4.0",
"@connectrpc/connect-express": "1.4.0",
"@connectrpc/connect-fastify": "1.4.0",
"@connectrpc/connect-node": "1.4.0",
"@grpc/grpc-js": "1.10.3",
"@grpc/proto-loader": "0.7.8",
"@hyperledger/cactus-common": "2.0.0-alpha.2",
Expand All @@ -78,6 +82,7 @@
"express-jwt": "8.4.1",
"express-openapi-validator": "5.0.4",
"express-rate-limit": "6.7.0",
"fastify": "4.26.2",
"fs-extra": "10.1.0",
"google-protobuf": "3.18.0-rc.2",
"jose": "4.15.5",
Expand All @@ -95,6 +100,10 @@
"uuid": "9.0.1"
},
"devDependencies": {
"@bufbuild/buf": "1.30.0",
"@bufbuild/protobuf": "1.8.0",
"@bufbuild/protoc-gen-es": "1.8.0",
"@connectrpc/protoc-gen-connect-es": "1.4.0",
"@hyperledger/cactus-plugin-keychain-vault": "2.0.0-alpha.2",
"@hyperledger/cactus-test-tooling": "2.0.0-alpha.2",
"@openapitools/openapi-generator-cli": "2.7.0",
Expand Down
168 changes: 164 additions & 4 deletions packages/cactus-cmd-api-server/src/main/typescript/api-server.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import type { AddressInfo } from "net";
import { Http2Server, Http2ServerRequest } from "node:http2";
import { Http2ServerResponse } from "node:http2";
import type { Server as SecureServer } from "https";
import type { Request, Response, RequestHandler } from "express";
import type { ServerOptions as SocketIoServerOptions } from "socket.io";
Expand All @@ -16,6 +18,15 @@ import fs from "fs-extra";
import expressHttpProxy from "express-http-proxy";
import { Server as GrpcServer } from "@grpc/grpc-js";
import { ServerCredentials as GrpcServerCredentials } from "@grpc/grpc-js";
import { expressConnectMiddleware } from "@connectrpc/connect-express";
import { ConnectRouter } from "@connectrpc/connect";
import {
fastify,
FastifyBaseLogger,
FastifyInstance,
FastifyTypeProviderDefault,
} from "fastify";
import { fastifyConnectPlugin } from "@connectrpc/connect-fastify";
import express from "express";
import { OpenAPIV3 } from "express-openapi-validator/dist/framework/types";
import compression from "compression";
Expand All @@ -24,6 +35,7 @@ import cors, { CorsOptionsDelegate, CorsRequest } from "cors";

import { Server as SocketIoServer } from "socket.io";
import { authorize as authorizeSocket } from "@thream/socketio-jwt";
import { ServiceType } from "@bufbuild/protobuf";

import {
ICactusPlugin,
Expand All @@ -35,6 +47,8 @@ import {
Constants,
PluginImportAction,
isIPluginGrpcService,
isIPluginCrpcService,
ICrpcSvcRegistration,
} from "@hyperledger/cactus-core-api";

import {
Expand Down Expand Up @@ -74,6 +88,13 @@ export interface IApiServerConstructorOptions {
readonly httpServerApi?: Server | SecureServer;
readonly wsServerApi?: SocketIoServer;
readonly grpcServer?: GrpcServer;
readonly crpcServer?: FastifyInstance<
Http2Server,
Http2ServerRequest,
Http2ServerResponse,
FastifyBaseLogger,
FastifyTypeProviderDefault
>;
readonly wsOptions?: SocketIoServerOptions;
readonly httpServerCockpit?: Server | SecureServer;
readonly config: ICactusApiServerOptions;
Expand All @@ -100,6 +121,13 @@ export class ApiServer {
private readonly httpServerCockpit?: Server | SecureServer;
private readonly wsApi: SocketIoServer;
private readonly grpcServer: GrpcServer;
private readonly crpcServer: FastifyInstance<
Http2Server,
Http2ServerRequest,
Http2ServerResponse,
FastifyBaseLogger,
FastifyTypeProviderDefault
>;
private readonly expressApi: express.Express;
private readonly expressCockpit: express.Express;
private readonly pluginsPath: string;
Expand Down Expand Up @@ -158,6 +186,10 @@ export class ApiServer {
}
}

this.crpcServer =
this.options.crpcServer ||
fastify({ http2: true, forceCloseConnections: true });

this.grpcServer = this.options.grpcServer || new GrpcServer({});
this.wsApi = new SocketIoServer();
this.expressApi = express();
Expand Down Expand Up @@ -212,9 +244,10 @@ export class ApiServer {
}

async start(): Promise<{
addressInfoCockpit?: AddressInfo;
addressInfoApi: AddressInfo;
addressInfoGrpc: AddressInfo;
readonly addressInfoCockpit?: AddressInfo;
readonly addressInfoApi: AddressInfo;
readonly addressInfoGrpc: AddressInfo;
readonly addressInfoCrpc: AddressInfo;
}> {
this.checkNodeVersion();
const tlsMaxVersion = this.options.config.tlsDefaultMaxVersion;
Expand All @@ -229,6 +262,8 @@ export class ApiServer {
}
const addressInfoApi = await this.startApiServer();
const addressInfoGrpc = await this.startGrpcServer();
const { addressInfoCrpc, crpcUrl } = await this.startCrpcServer();
this.log.debug("Cacti CRPC reachable %s", crpcUrl);

{
const { port, address } = addressInfoGrpc;
Expand All @@ -252,7 +287,12 @@ export class ApiServer {
this.log.info(`Cactus Cockpit reachable ${httpUrl}`);
}

return { addressInfoCockpit, addressInfoApi, addressInfoGrpc };
return {
addressInfoCockpit,
addressInfoApi,
addressInfoGrpc,
addressInfoCrpc,
};
} catch (ex1: unknown) {
const context = "Failed to start ApiServer";
this.log.error(context, ex1);
Expand Down Expand Up @@ -465,6 +505,14 @@ export class ApiServer {
this.log.info(`Close HTTP server of the cockpit OK`);
}

if (this.crpcServer) {
const fastifyPlugins = this.crpcServer.printPlugins();
this.log.info("Fastify plugin list: %o", fastifyPlugins);
this.log.info(`Closing Cacti CRPC HTTP server ...`);
await this.crpcServer.close();
this.log.info(`Closed Cacti CRPC HTTP server OK`);
}

if (this.grpcServer) {
await new Promise<void>((resolve, reject) => {
this.log.info(`Draining Cacti gRPC server ...`);
Expand Down Expand Up @@ -655,6 +703,78 @@ export class ApiServer {
app[httpVerbPrometheus](httpPathPrometheus, prometheusExporterHandler);
}

async createCrpcExpressMiddlewareHandler(): Promise<{
readonly svcCount: number;
readonly crpcMiddlewareHandler: express.RequestHandler;
}> {
const { crpcRoutesHandler, svcCount } =
await this.createCrpcRoutesHandler();

const crpcMiddlewareHandler = expressConnectMiddleware({
routes: crpcRoutesHandler,
});

return { svcCount, crpcMiddlewareHandler };
}

async createCrpcRoutesHandler(): Promise<{
readonly svcCount: number;
readonly crpcRoutesHandler: (router: ConnectRouter) => void;
}> {
const fnTag = `${this.className}#registerCrpcServices()}`;
const { log } = this;

const crpcSvcRegistrations = await this.createCrpcServicesOfPlugins();
const crpcSvcRegCount = crpcSvcRegistrations.length;

log.debug("%s Obtained %o Crpc registrations.", fnTag, crpcSvcRegCount);

const crpcRoutesHandler = (router: ConnectRouter) => {
log.debug("%s expressConnectMiddleware() routes handler", fnTag);

crpcSvcRegistrations.forEach((it) => {
log.debug("%s Registering %s", fnTag, it.serviceName);
router.service(it.definition, it.implementation, it.options);
});
};

return { svcCount: crpcSvcRegCount, crpcRoutesHandler };
}

async startCrpcServer(): Promise<{
readonly addressInfoCrpc: AddressInfo;
readonly crpcUrl: string;
}> {
const fn = `${this.className}#startCrpcServer()`;
const { log, options } = this;

const { config } = options;
const { crpcHost, crpcPort } = config;

const { crpcRoutesHandler, svcCount } =
await this.createCrpcRoutesHandler();

log.debug("%s Registering %o CRPC routes handler(s).", fn, svcCount);

const registration = await this.crpcServer.register(fastifyConnectPlugin, {
routes: crpcRoutesHandler,
shutdownTimeoutMs: 5000,
grpc: true,
grpcWeb: true,
});
log.debug("%s Fastify registration OK=%o", fn, registration);

const crpcUrl = await this.crpcServer.listen({
host: crpcHost,
port: crpcPort,
});
log.debug("%s Fastify listen() crpcUrl: %s", fn, crpcUrl);

const [addressInfoCrpc, ...addresses] = this.crpcServer.addresses();
log.debug("%s server is listening at", fn, addressInfoCrpc, addresses);
return { crpcUrl, addressInfoCrpc };
}

async startGrpcServer(): Promise<AddressInfo> {
const fnTag = `${this.className}#startGrpcServer()`;
const { log } = this;
Expand Down Expand Up @@ -732,6 +852,39 @@ export class ApiServer {
});
}

async createCrpcServicesOfPlugins(): Promise<
ICrpcSvcRegistration<ServiceType>[]
> {
const fnTag = `${this.className}#startCrpcServer()`;
const { log } = this;
const { logLevel } = this.options.config;
const pluginRegistry = await this.getOrInitPluginRegistry();

log.debug("Installing crpc services of IPluginCrpcService instances...");

const out: ICrpcSvcRegistration<ServiceType>[] = [];

const plugins = pluginRegistry.getPlugins();

const tasksDone = plugins.map(async (x: ICactusPlugin) => {
if (!isIPluginCrpcService(x)) {
this.log.debug("%s skipping %s instance", fnTag, x.getPackageName());
return;
}
const opts = { logLevel };
log.info("%s Creating crpc service of: %s", fnTag, x.getPackageName());

const svcRegistrations = await x.createCrpcSvcRegistrations(opts);
log.debug("%s Got %o Crpc svc defs:", fnTag, svcRegistrations.length);

svcRegistrations.forEach((it) => out.push(it));
});

await Promise.all(tasksDone);

return out;
}

async startApiServer(): Promise<AddressInfo> {
const { options, expressApi: app, wsApi } = this;
const { config } = options;
Expand All @@ -744,6 +897,13 @@ export class ApiServer {

const pluginRegistry = await this.getOrInitPluginRegistry();

const { svcCount, crpcMiddlewareHandler } =
await this.createCrpcExpressMiddlewareHandler();

this.log.info("Registered %o Crpc services OK", svcCount);

app.use(crpcMiddlewareHandler);

app.use(compression() as RequestHandler);

const apiCorsDomainCsv = this.options.config.apiCorsDomainCsv;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ convict.addFormat(FORMAT_PLUGIN_ARRAY);
convict.addFormat(ipaddress);

export interface ICactusApiServerOptions {
crpcPort: number;
crpcHost: string;
pluginManagerOptionsJson: string;
authorizationProtocol: AuthorizationProtocol;
authorizationConfigJson: IAuthorizationConfig;
Expand Down Expand Up @@ -90,6 +92,20 @@ export class ConfigService {

private static getConfigSchema(): Schema<ICactusApiServerOptions> {
return {
crpcHost: {
doc: "The host to bind the CRPC fastify instance to. Secure default is: 127.0.0.1. Use 0.0.0.0 to bind for any host.",
format: "ipaddress",
env: "CRPC_HOST",
arg: "crpc-host",
default: "127.0.0.1",
},
crpcPort: {
doc: "The HTTP port to bind the CRPC fastify server endpoints to.",
format: "port",
env: "CRPC_PORT",
arg: "crpc-port",
default: 6000,
},
pluginManagerOptionsJson: {
doc: "Can be used to override npm registry and authentication details for example. See https://www.npmjs.com/package/live-plugin-manager#pluginmanagerconstructoroptions-partialpluginmanageroptions for further details.",
format: "*",
Expand Down Expand Up @@ -554,6 +570,8 @@ export class ConfigService {
};

return {
crpcHost: (schema.crpcHost as SchemaObj).default,
crpcPort: (schema.crpcPort as SchemaObj).default,
pluginManagerOptionsJson: "{}",
authorizationProtocol: AuthorizationProtocol.JSON_WEB_TOKEN,
authorizationConfigJson,
Expand Down
Loading

1 comment on commit c569460

@github-actions
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Performance Alert ⚠️

Possible performance regression was detected for benchmark.
Benchmark result of this commit is worse than the previous benchmark result exceeding threshold 0.05.

Benchmark suite Current: c569460 Previous: 5762dad Ratio
cmd-api-server_HTTP_GET_getOpenApiSpecV1 579 ops/sec (±1.69%) 550 ops/sec (±1.49%) 0.95
cmd-api-server_gRPC_GetOpenApiSpecV1 359 ops/sec (±1.44%) 355 ops/sec (±1.51%) 0.99

This comment was automatically generated by workflow using github-action-benchmark.

CC: @petermetz

Please sign in to comment.