diff --git a/package-lock.json b/package-lock.json index 6dff24c6e..fcdc64ee1 100644 --- a/package-lock.json +++ b/package-lock.json @@ -30212,6 +30212,8 @@ "version": "0.102.0", "license": "Apache-2.0", "dependencies": { + "@deephaven/log": "file:../log", + "@deephaven/utils": "file:../utils", "ws": "^8.18.0" }, "devDependencies": { @@ -32508,6 +32510,8 @@ "@deephaven/jsapi-nodejs": { "version": "file:packages/jsapi-nodejs", "requires": { + "@deephaven/log": "file:../log", + "@deephaven/utils": "file:../utils", "@types/node": "^22.7.5", "@types/ws": "^8.5.12", "ws": "^8.18.0" diff --git a/packages/jsapi-nodejs/package.json b/packages/jsapi-nodejs/package.json index bf4149de2..042a633e8 100644 --- a/packages/jsapi-nodejs/package.json +++ b/packages/jsapi-nodejs/package.json @@ -21,6 +21,8 @@ "build:babel": "babel ./src --out-dir ./dist --extensions \".ts,.tsx,.js,.jsx\" --source-maps --root-mode upward" }, "dependencies": { + "@deephaven/log": "file:../log", + "@deephaven/utils": "file:../utils", "ws": "^8.18.0" }, "devDependencies": { diff --git a/packages/jsapi-nodejs/src/NodeHttp2gRPCTransport.ts b/packages/jsapi-nodejs/src/NodeHttp2gRPCTransport.ts new file mode 100644 index 000000000..6c8d8e5a6 --- /dev/null +++ b/packages/jsapi-nodejs/src/NodeHttp2gRPCTransport.ts @@ -0,0 +1,194 @@ +import http2 from 'node:http2'; +import type { dh as DhcType } from '@deephaven/jsapi-types'; +import Log from '@deephaven/log'; +import { assertNotNull } from '@deephaven/utils'; + +const logger = Log.module('@deephaven/jsapi-nodejs.NodeHttp2gRPCTransport'); + +type GrpcTransport = DhcType.grpc.GrpcTransport; +type GrpcTransportFactory = DhcType.grpc.GrpcTransportFactory; +type GrpcTransportOptions = DhcType.grpc.GrpcTransportOptions; + +/** + * A gRPC transport implementation using Node.js's built-in HTTP/2 client. This + * can be passed to the CoreClient constructor to adapt the underlying transport + * to use http2. This addresses a limitation of nodejs `fetch` implementation + * which currently uses http1. + * + * e.g. + * const client = new dhc.CoreClient(dhServerUrl, { + * transportFactory: NodeHttp2gRPCTransport.factory, + * }) + */ +export class NodeHttp2gRPCTransport implements GrpcTransport { + private static sessionMap: Map = new Map(); + + /** + * Factory for creating new NodeHttp2gRPCTransport instances. + */ + static readonly factory: GrpcTransportFactory = { + /** + * Create a new transport instance. + * @param options - options for creating the transport + * @return a transport instance to use for gRPC communication + */ + create: (options: GrpcTransportOptions): GrpcTransport => { + const { origin } = new URL(options.url); + + if (!NodeHttp2gRPCTransport.sessionMap.has(origin)) { + const session = http2.connect(origin); + session.on('error', err => { + logger.error('Session error', err); + }); + NodeHttp2gRPCTransport.sessionMap.set(origin, session); + } + + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + const session = NodeHttp2gRPCTransport.sessionMap.get(origin)!; + + return new NodeHttp2gRPCTransport(options, session); + }, + + /** + * Return true to signal that created transports may have {@link GrpcTransport.sendMessage} + * called on it more than once before {@link GrpcTransport.finishSend} should + * be called. + * @return true to signal that the implementation can stream multiple messages, + * false otherwise indicating that Open/Next gRPC calls should be used + */ + get supportsClientStreaming(): boolean { + return true; + }, + }; + + /** + * Private constructor to limit instantiation to the static factory method. + * @param options Transport options. + * @param session node:http2 session to use for data transport. + */ + private constructor( + options: GrpcTransportOptions, + session: http2.ClientHttp2Session + ) { + this.options = options; + this.session = session; + } + + private readonly options: GrpcTransportOptions; + + private readonly session: http2.ClientHttp2Session; + + private request: http2.ClientHttp2Stream | null = null; + + /** + * Create an http2 client stream that can send requests to the server and pass + * responses to callbacks defined on the transport options. + * @param headers Request headers + * @returns The created http2 client stream + */ + createRequest = ( + headers: Record | null + ): http2.ClientHttp2Stream => { + const url = new URL(this.options.url); + + logger.debug('createRequest', url.pathname); + + const req = this.session.request({ + ...headers, + ':method': 'POST', + ':path': url.pathname, + }); + + req.on('response', (responseHeaders, _flags) => { + const headersRecord: Record = {}; + + // strip any undefined headers or keys that start with `:` + Object.keys(responseHeaders).forEach(name => { + if (responseHeaders[name] != null && !name.startsWith(':')) { + headersRecord[name] = responseHeaders[name]; + } + }); + + this.options.onHeaders(headersRecord, Number(responseHeaders[':status'])); + }); + + // Note that `chunk` is technically a `Buffer`, but the `Buffer` type defined + // in @types/pouchdb-core is outdated and incompatible with latest `Uint8Array` + // types. Since `Buffer` inherits from `Uint8Array`, we can get around this + // by just declaring it as a `Uint8Array`. + req.on('data', (chunk: Uint8Array) => { + this.options.onChunk(chunk); + }); + req.on('end', () => { + this.options.onEnd(); + }); + req.on('error', err => { + this.options.onEnd(err); + }); + + return req; + }; + + /** + * Starts the stream, sending metadata to the server. + * @param metadata - the headers to send the server when opening the connection + */ + start(metadata: { [key: string]: string | Array }): void { + logger.debug('start', metadata.headersMap); + + if (this.request != null) { + throw new Error('start called more than once'); + } + + const headers: Record = {}; + Object.entries(metadata).forEach(([key, value]) => { + headers[key] = typeof value === 'string' ? value : value.join(', '); + }); + + this.request = this.createRequest(headers); + } + + /** + * Sends a message to the server. + * @param msgBytes - bytes to send to the server + */ + sendMessage(msgBytes: Uint8Array): void { + logger.debug('sendMessage', msgBytes); + assertNotNull(this.request, 'request is required'); + + this.request.write(msgBytes); + } + + /** + * "Half close" the stream, signaling to the server that no more messages will + * be sent, but that the client is still open to receiving messages. + */ + finishSend(): void { + logger.debug('finishSend'); + assertNotNull(this.request, 'request is required'); + this.request.end(); + } + + /** + * End the stream, both notifying the server that no more messages will be + * sent nor received, and preventing the client from receiving any more events. + */ + cancel(): void { + logger.debug('cancel'); + assertNotNull(this.request, 'request is required'); + this.request.close(); + } + + /** + * Cleanup. + */ + static dispose(): void { + // eslint-disable-next-line no-restricted-syntax + for (const session of NodeHttp2gRPCTransport.sessionMap.values()) { + session.close(); + } + NodeHttp2gRPCTransport.sessionMap.clear(); + } +} + +export default NodeHttp2gRPCTransport; diff --git a/packages/jsapi-nodejs/src/index.ts b/packages/jsapi-nodejs/src/index.ts index a3ec85531..75a93b3a3 100644 --- a/packages/jsapi-nodejs/src/index.ts +++ b/packages/jsapi-nodejs/src/index.ts @@ -2,4 +2,5 @@ export * from './errorUtils.js'; export * from './fsUtils.js'; export * from './loaderUtils.js'; export * from './polyfillWs.js'; +export * from './NodeHttp2gRPCTransport.js'; export * from './serverUtils.js'; diff --git a/packages/jsapi-nodejs/tsconfig.json b/packages/jsapi-nodejs/tsconfig.json index e064ffa6b..ee5de70bc 100644 --- a/packages/jsapi-nodejs/tsconfig.json +++ b/packages/jsapi-nodejs/tsconfig.json @@ -12,5 +12,6 @@ "outDir": "dist/" }, "include": ["src/**/*.ts", "src/**/*.tsx", "src/**/*.js", "src/**/*.jsx"], - "exclude": ["node_modules", "src/**/*.test.*", "src/**/__mocks__/*"] + "exclude": ["node_modules", "src/**/*.test.*", "src/**/__mocks__/*"], + "references": [{ "path": "../log" }, { "path": "../utils" }] }