diff --git a/.gitignore b/.gitignore index ba4a9775..e60be868 100644 --- a/.gitignore +++ b/.gitignore @@ -134,4 +134,5 @@ dist *.env !docker/.env -docs \ No newline at end of file +docs +.nx/cache \ No newline at end of file diff --git a/src/__tests__/lib/GetCustomCertificateBuffer.unit.spec.ts b/src/__tests__/lib/GetCustomCertificateBuffer.unit.spec.ts index 98cd3b07..07507d21 100644 --- a/src/__tests__/lib/GetCustomCertificateBuffer.unit.spec.ts +++ b/src/__tests__/lib/GetCustomCertificateBuffer.unit.spec.ts @@ -14,6 +14,18 @@ import { import { OperateApiClient } from '../../operate' import { ZeebeGrpcClient } from '../../zeebe' +let server +afterEach(() => { + ;(server && server.close && server.close()) || + (server && + server.tryShutdown && + server.tryShutdown((err) => { + if (err) { + throw err + } + })) +}) + test('Can use a custom root certificate to connect to a REST API', async () => { const app = express() @@ -26,7 +38,7 @@ test('Can use a custom root certificate to connect to a REST API', async () => { cert: fs.readFileSync(path.join(__dirname, 'localhost.crt')), } - const server = https.createServer(options, app) + server = https.createServer(options, app) server.listen(3012, () => { // console.log('Server listening on port 3012') @@ -81,7 +93,7 @@ test('gRPC server with self-signed certificate', (done) => { ) as unknown as { gateway_protocol: { Gateway: any } } // Create the server - const server = new Server() + server = new Server() // Add a service to the server server.addService(zeebeProto.gateway_protocol.Gateway.service, { @@ -140,16 +152,104 @@ test('gRPC server with self-signed certificate', (done) => { done() }) }) - // const zbc2 = new ZeebeGrpcClient({ - // config: { - // CAMUNDA_OAUTH_DISABLED: true, - // ZEEBE_ADDRESS: 'localhost:50051', - // }, - // }) - // zbc2.topology().catch((e) => { - // console.log(e) - // zbc2.close() - // }) + } + ) +}) + +test('gRPC server with self-signed certificate provided via string', (done) => { + // Load the protobuf definition + const packageDefinition = loadSync( + path.join(__dirname, '..', '..', 'proto', 'zeebe.proto'), + { + keepCase: true, + longs: String, + enums: String, + defaults: true, + oneofs: true, + } + ) + + const zeebeProto = loadPackageDefinition( + packageDefinition + // eslint-disable-next-line @typescript-eslint/no-explicit-any + ) as unknown as { gateway_protocol: { Gateway: any } } + + // Create the server + const server = new Server() + + // Add a service to the server + server.addService(zeebeProto.gateway_protocol.Gateway.service, { + Topology: (_, callback) => { + const t = new TopologyResponse() + const b = new BrokerInfo() + b.setHost('localhost') + const partition = new Partition() + partition.setHealth(0) + partition.setPartitionid(0) + partition.setRole(0) + b.setPartitionsList([partition]) + t.setBrokersList([b]) + callback(null, t) + }, + // Implement your service methods here + }) + + // Read the key and certificate + const key = fs.readFileSync(path.join(__dirname, 'localhost.key')) + const cert = fs.readFileSync(path.join(__dirname, 'localhost.crt')) + + // Start the server + server.bindAsync( + 'localhost:50051', + ServerCredentials.createSsl(null, [ + { + private_key: key, + cert_chain: cert, + }, + ]), + (err) => { + if (err) { + console.error(err) + done() + return + } + const zbc = new ZeebeGrpcClient({ + config: { + CAMUNDA_OAUTH_DISABLED: true, + ZEEBE_ADDRESS: 'localhost:50051', + CAMUNDA_CUSTOM_ROOT_CERT_STRING: `-----BEGIN CERTIFICATE----- +MIIC7TCCAdWgAwIBAgIUXlnuRfR2yE/v5t9A0ZoeVG4BvZowDQYJKoZIhvcNAQEL +BQAwFDESMBAGA1UEAwwJbG9jYWxob3N0MB4XDTI0MDUwMTAzMTIyNloXDTI1MDUw +MTAzMTIyNlowFDESMBAGA1UEAwwJbG9jYWxob3N0MIIBIjANBgkqhkiG9w0BAQEF +AAOCAQ8AMIIBCgKCAQEA57iCfaiUmW74QUb0MtX5MOCOfhKJ8zw0i4Ep+nwST03w +Z/K5Z0W7OPmJDnPsko9DPC3bOeObriTcWgYg4zNaLNSKvDbkbLtDvpUyY8rdJAP3 +6H7uTZWMoUQzdIGdSFZHa8HRO1HUZGFZT55bi7czyMPFnzSOtcaz4pCvlhLRk+QA +lsSG+4owhfDrpyPlFtEFNyeaE2fIsJtHxupkwGOmDNeh6iKV46lD8F0SGf2pl5qF +nbbMn6IZlpH7heQqMdPNs1ikGOuDybJISu07S72RSoClgdFzepzXFHoNWwhucdvN +UMJXWBnP/PoeNViI2+nBMrK/1Bwuhci0t5mjTujQNQIDAQABozcwNTAUBgNVHREE +DTALgglsb2NhbGhvc3QwHQYDVR0OBBYEFBuCDjLQbWjX7D+o9dt4nszcP3OIMA0G +CSqGSIb3DQEBCwUAA4IBAQASgexBeY7Nz9i0qREk1RzpQDIT+jM/QAgmnH3G6Ehx +tAYUMYLeDTmGhYhp3GJAU7/R3mbN6t5qg2d9Fa8b+JpBJRdxMY+CyjESoPvUHIE3 +lkgNGphT+8QPnh7uO5KOUVnk7Jc9MTwBntDouLHfuzJJnHPlRko3IWnwaivZYVRn +VbnUoSMKKPzFaqqaY8uHPjvs4Gt4OGcYV8hHcjeI3fMHckmsXZclxb3pwF+x698o +Htg5+ydbmWkTspvbMuHx/280Ow0JPSSXFnwWGWpyH7kI0EAfq75W3iGMRR6yL7Je +ffZG7W8KARYx824nRlxbIN2rHo9VQwEBkbmoeg5nSkvi +-----END CERTIFICATE-----`, + CAMUNDA_SECURE_CONNECTION: true, + zeebeGrpcSettings: { + ZEEBE_CLIENT_LOG_LEVEL: 'NONE', + }, + }, + }) + zbc.topology().then(() => { + expect(true).toBe(true) + zbc.close() + // Stop the server after the test + server.tryShutdown((err) => { + if (err) console.error(err) + done() + }) + }) } ) }) diff --git a/src/__tests__/zeebe/integration/Client-ThrowError.spec.ts b/src/__tests__/zeebe/integration/Client-ThrowError.spec.ts index 2f2ea0e7..fb89a4fb 100644 --- a/src/__tests__/zeebe/integration/Client-ThrowError.spec.ts +++ b/src/__tests__/zeebe/integration/Client-ThrowError.spec.ts @@ -10,26 +10,8 @@ jest.setTimeout(25000) let bpmnProcessId: string let processDefinitionKey: string -let zbc: ZeebeGrpcClient - beforeAll(async () => { suppressZeebeLogging() - const zb = new ZeebeGrpcClient() - ;({ bpmnProcessId, processDefinitionKey } = ( - await zb.deployResource({ - processFilename: './src/__tests__/testdata/Client-ThrowError.bpmn', - }) - ).deployments[0].process) - await cancelProcesses(processDefinitionKey) - await zb.close() -}) - -beforeEach(() => { - zbc = new ZeebeGrpcClient() -}) - -afterEach(async () => { - await zbc.close() }) afterAll(async () => { @@ -38,6 +20,13 @@ afterAll(async () => { }) test('Throws a business error that is caught in the process', async () => { + const zbc = new ZeebeGrpcClient() + ;({ bpmnProcessId, processDefinitionKey } = ( + await zbc.deployResource({ + processFilename: './src/__tests__/testdata/Client-ThrowError.bpmn', + }) + ).deployments[0].process) + await cancelProcesses(processDefinitionKey) zbc.createWorker({ taskHandler: (job) => job.error('BUSINESS_ERROR', 'Well, that did not work'), @@ -57,9 +46,18 @@ test('Throws a business error that is caught in the process', async () => { variables: {}, }) expect(result.variables.bpmnErrorCaught).toBe(true) + await zbc.close() }) test('Can set variables when throwing a BPMN Error', async () => { + const zbc = new ZeebeGrpcClient() + ;({ bpmnProcessId, processDefinitionKey } = ( + await zbc.deployResource({ + processFilename: './src/__tests__/testdata/Client-ThrowError.bpmn', + }) + ).deployments[0].process) + await cancelProcesses(processDefinitionKey) + // This worker takes the first job and throws a BPMN error, setting a variable zbc.createWorker({ taskHandler: (job) => job.error({ @@ -68,8 +66,8 @@ test('Can set variables when throwing a BPMN Error', async () => { variables: { something: 'someValue' }, }), taskType: 'throw-bpmn-error-task', - timeout: Duration.seconds.of(30), }) + // This worker is on the business error throw path zbc.createWorker({ taskType: 'sad-flow', taskHandler: (job) => @@ -83,5 +81,8 @@ test('Can set variables when throwing a BPMN Error', async () => { variables: {}, }) expect(result.variables.bpmnErrorCaught).toBe(true) - // expect(result.variables.something).toBe("someValue") + // This is not working, the variable is not being set on 8.5 + // this may be due to incremental implementation of the feature + // expect(result.variables.something).toBe('someValue') + await zbc.close() }) diff --git a/src/__tests__/zeebe/integration/Client-onReady.spec.ts b/src/__tests__/zeebe/integration/Client-onReady.spec.ts index 77626644..3fe682d9 100644 --- a/src/__tests__/zeebe/integration/Client-onReady.spec.ts +++ b/src/__tests__/zeebe/integration/Client-onReady.spec.ts @@ -17,7 +17,7 @@ test('Does not call the onReady handler if there is no broker', (done) => { expect(called).toBe(false) await zbc2.close() done(null) - }, 4000) + }, 8000) }) test('Calls the onReady handler if there is a broker and eagerConnection is true', (done) => { @@ -32,7 +32,7 @@ test('Calls the onReady handler if there is a broker and eagerConnection is true expect(called).toBe(1) await zbc2.close() done() - }, 6000) + }, 8000) }) test('Sets connected to true if there is a broker', (done) => { @@ -44,7 +44,7 @@ test('Sets connected to true if there is a broker', (done) => { expect(zbc2.connected).toBe(true) await zbc2.close() done() - }, 6000) + }, 8000) }) test('emits the ready event if there is a broker and eagerConnection: true', (done) => { @@ -52,7 +52,7 @@ test('emits the ready event if there is a broker and eagerConnection: true', (do const zbc2 = new ZeebeGrpcClient({ config: { zeebeGrpcSettings: { ZEEBE_GRPC_CLIENT_EAGER_CONNECT: true } }, }).on('ready', () => { - called++ + called = called + 1 }) setTimeout(async () => { @@ -60,5 +60,5 @@ test('emits the ready event if there is a broker and eagerConnection: true', (do expect(zbc2.connected).toBe(true) await zbc2.close() done() - }, 6000) + }, 8000) }) diff --git a/src/generated/zeebe_grpc_pb.ts b/src/generated/zeebe_grpc_pb.ts index 2661965b..5eaac678 100644 --- a/src/generated/zeebe_grpc_pb.ts +++ b/src/generated/zeebe_grpc_pb.ts @@ -1,3 +1,4 @@ +/* eslint-disable @typescript-eslint/no-var-requires */ // GENERATED CODE -- DO NOT EDIT! 'use strict' diff --git a/src/generated/zeebe_pb.d.ts b/src/generated/zeebe_pb.d.ts index 51f35bfd..801268f0 100644 --- a/src/generated/zeebe_pb.d.ts +++ b/src/generated/zeebe_pb.d.ts @@ -1,3 +1,4 @@ +/* eslint-disable @typescript-eslint/ban-types */ // package: gateway_protocol // file: zeebe.proto diff --git a/src/lib/Configuration.ts b/src/lib/Configuration.ts index cc43e2f2..0b8bcce6 100644 --- a/src/lib/Configuration.ts +++ b/src/lib/Configuration.ts @@ -111,6 +111,11 @@ const getMainEnv = () => type: 'string', optional: true, }, + /** When using self-signed certificates, provide the root certificate as a string */ + CAMUNDA_CUSTOM_ROOT_CERT_STRING: { + type: 'string', + optional: true, + }, /** When using custom or self-signed certificates, provide the path to the certificate chain */ CAMUNDA_CUSTOM_CERT_CHAIN_PATH: { type: 'string', diff --git a/src/lib/GetCustomCertificateBuffer.ts b/src/lib/GetCustomCertificateBuffer.ts index 75db14a2..8c6ca584 100644 --- a/src/lib/GetCustomCertificateBuffer.ts +++ b/src/lib/GetCustomCertificateBuffer.ts @@ -9,8 +9,9 @@ export async function GetCustomCertificateBuffer( config: CamundaPlatform8Configuration ): Promise { const customRootCertPath = config.CAMUNDA_CUSTOM_ROOT_CERT_PATH + const customRootCert = config.CAMUNDA_CUSTOM_ROOT_CERT_STRING - if (!customRootCertPath) { + if (!customRootCertPath && !customRootCert) { return undefined } const rootCerts: string[] = [] @@ -21,6 +22,8 @@ export async function GetCustomCertificateBuffer( if (cert) { rootCerts.push(cert) } + } else if (customRootCert) { + rootCerts.push(customRootCert) } // (2) use certificates from OS keychain diff --git a/src/zeebe/lib/ConnectionFactory.ts b/src/zeebe/lib/ConnectionFactory.ts index a0f6b841..a04a8c2c 100644 --- a/src/zeebe/lib/ConnectionFactory.ts +++ b/src/zeebe/lib/ConnectionFactory.ts @@ -45,16 +45,17 @@ export class ConnectionFactory { grpcConfig.host ) const log = new StatefulLogInterceptor({ characteristics, logConfig }) - const grpcClient = new GrpcMiddleware({ + const grpcMiddleware = new GrpcMiddleware({ characteristics, config: grpcConfig, log, - }).getGrpcClient() + }) + const grpcClient = grpcMiddleware.getGrpcClient() const _close = grpcClient.close.bind(grpcClient) grpcClient.close = async () => { log.close() _close() - return null + return grpcMiddleware.close() } return { grpcClient, log } diff --git a/src/zeebe/lib/GrpcClient.ts b/src/zeebe/lib/GrpcClient.ts index b6529b7e..45925008 100644 --- a/src/zeebe/lib/GrpcClient.ts +++ b/src/zeebe/lib/GrpcClient.ts @@ -430,7 +430,7 @@ export class GrpcClient extends EventEmitter { return this.listNameMethods } - public close(timeout = 5000): Promise { + public close(timeout = 5000): Promise { const STATE_SHUTDOWN = 4 const isClosed = (state) => state === STATE_SHUTDOWN @@ -453,7 +453,7 @@ export class GrpcClient extends EventEmitter { if (closed || alreadyClosed) { this.channelClosed = true this.emit(MiddlewareSignals.Log.Info, 'Grpc channel closed') - return resolve(null) // setTimeout(() => resolve(), 2000) + return resolve() // setTimeout(() => resolve(), 2000) } this.emit( @@ -480,7 +480,7 @@ export class GrpcClient extends EventEmitter { this.emit(MiddlewareSignals.Log.Info, `Closed: ${alreadyClosed}`) } if (alreadyClosed) { - return resolve(null) + return resolve() } }) diff --git a/src/zeebe/lib/GrpcMiddleware.ts b/src/zeebe/lib/GrpcMiddleware.ts index 83120522..22b15dec 100644 --- a/src/zeebe/lib/GrpcMiddleware.ts +++ b/src/zeebe/lib/GrpcMiddleware.ts @@ -46,6 +46,10 @@ export class GrpcMiddleware { } public getGrpcClient = () => this.grpcClient + public close = async () => { + clearTimeout(this.blockingTimer) + } + private createInterceptedGrpcClient(config: GrpcClientCtor) { const grpcClient = new GrpcClient(config) const logInterceptor = this.log @@ -55,7 +59,7 @@ export class GrpcMiddleware { clearTimeout(this.blockingTimer) } _close() - return null + return } grpcClient.on(MiddlewareSignals.Log.Debug, logInterceptor.logDebug) grpcClient.on(MiddlewareSignals.Log.Info, logInterceptor.logInfo) diff --git a/src/zeebe/zb/ZeebeGrpcClient.ts b/src/zeebe/zb/ZeebeGrpcClient.ts index 1bc39bf9..257551d8 100644 --- a/src/zeebe/zb/ZeebeGrpcClient.ts +++ b/src/zeebe/zb/ZeebeGrpcClient.ts @@ -11,6 +11,7 @@ import { CamundaEnvironmentConfigurator, CamundaPlatform8Configuration, DeepPartial, + GetCustomCertificateBuffer, LosslessDto, RequireConfiguration, constructOAuthProvider, @@ -65,18 +66,18 @@ const idColors = [ export class ZeebeGrpcClient extends TypedEmitter< typeof ConnectionStatusEvent > { - public connectionTolerance: MaybeTimeDuration + public connectionTolerance!: MaybeTimeDuration public connected?: boolean = undefined public readied = false public gatewayAddress: string public loglevel: Loglevel public onReady?: () => void public onConnectionError?: (err: Error) => void - private logger: StatefulLogInterceptor + private logger!: StatefulLogInterceptor private closePromise?: Promise private closing = false // A gRPC channel for the ZBClient to execute commands on - private grpc: ZB.ZBGrpc + private grpc: Promise private options: ZBClientOptions private workerCount = 0 private workers: ZBWorker[] = // eslint-disable-line @typescript-eslint/no-explicit-any @@ -136,19 +137,6 @@ export class ZeebeGrpcClient extends TypedEmitter< debug('Gateway address: ', this.gatewayAddress) - this.useTLS = config.CAMUNDA_SECURE_CONNECTION - const certChainPath = config.CAMUNDA_CUSTOM_CERT_CHAIN_PATH - const rootCertsPath = config.CAMUNDA_CUSTOM_ROOT_CERT_PATH - const privateKeyPath = config.CAMUNDA_CUSTOM_PRIVATE_KEY_PATH - const customSSL = { - certChain: certChainPath ? readFileSync(certChainPath) : undefined, - privateKey: privateKeyPath ? readFileSync(privateKeyPath) : undefined, - rootCerts: rootCertsPath ? readFileSync(rootCertsPath) : undefined, - } - - this.customSSL = customSSL - this.options.customSSL = customSSL - this.connectionTolerance = Duration.milliseconds.of( config.zeebeGrpcSettings.ZEEBE_GRPC_CLIENT_CONNECTION_TOLERANCE_MS ) @@ -158,45 +146,61 @@ export class ZeebeGrpcClient extends TypedEmitter< this.oAuthProvider = options?.oAuthProvider ?? constructOAuthProvider(config) - const { grpcClient, log } = this.constructGrpcClient({ - grpcConfig: { - namespace: this.options.logNamespace || 'ZBClient', - }, - logConfig: { - _tag: 'ZBCLIENT', - loglevel: this.loglevel, - longPoll: Duration.milliseconds.from(this.options.longPoll), - namespace: this.options.logNamespace || 'ZBClient', - pollInterval: Duration.milliseconds.from(this.options.pollInterval), - stdout: this.stdout, - }, - }) - - grpcClient.on(ConnectionStatusEvent.connectionError, (err: Error) => { - if (this.connected !== false) { - this.onConnectionError?.(err) - this.emit(ConnectionStatusEvent.connectionError) - } - this.connected = false - this.readied = false - }) - grpcClient.on(ConnectionStatusEvent.ready, () => { - if (!this.readied) { - this.onReady?.() - this.emit(ConnectionStatusEvent.ready) - } - this.connected = true - this.readied = true - }) - this.grpc = grpcClient - this.logger = log - this.maxRetries = config.zeebeGrpcSettings.ZEEBE_GRPC_CLIENT_MAX_RETRIES this.maxRetryTimeout = Duration.seconds.of( config.zeebeGrpcSettings.ZEEBE_GRPC_CLIENT_MAX_RETRY_TIMEOUT_SECONDS ) + this.useTLS = config.CAMUNDA_SECURE_CONNECTION + const certChainPath = config.CAMUNDA_CUSTOM_CERT_CHAIN_PATH + const privateKeyPath = config.CAMUNDA_CUSTOM_PRIVATE_KEY_PATH + + this.grpc = GetCustomCertificateBuffer(config).then((rootCerts) => { + const customSSL = { + certChain: certChainPath ? readFileSync(certChainPath) : undefined, + privateKey: privateKeyPath ? readFileSync(privateKeyPath) : undefined, + rootCerts, + } + + this.customSSL = customSSL + this.options.customSSL = customSSL + + const { grpcClient, log } = this.constructGrpcClient({ + grpcConfig: { + namespace: this.options.logNamespace || 'ZBClient', + }, + logConfig: { + _tag: 'ZBCLIENT', + loglevel: this.loglevel, + longPoll: Duration.milliseconds.from(this.options.longPoll), + namespace: this.options.logNamespace || 'ZBClient', + pollInterval: Duration.milliseconds.from(this.options.pollInterval), + stdout: this.stdout, + }, + }) + + grpcClient.on(ConnectionStatusEvent.connectionError, (err: Error) => { + debug('grpcClient emitted error event to ZeebeGrpcClient, err: ', err) + this.readied = false + if (this.connected !== false) { + this.connected = false + this.onConnectionError?.(err) + this.emit(ConnectionStatusEvent.connectionError) + } + }) + grpcClient.on(ConnectionStatusEvent.ready, () => { + debug('grpcClient emitted ready event to ZeebeGrpcClient') + this.connected = true + if (!this.readied) { + this.onReady?.() + this.emit(ConnectionStatusEvent.ready) + } + this.readied = true + }) + this.logger = log + return grpcClient + }) // Send command to broker to eagerly fail / prove connection. // This is useful for, for example: the Node-Red client, which wants to // display the connection status. @@ -207,6 +211,8 @@ export class ZeebeGrpcClient extends TypedEmitter< .then((res) => { this.logger.logDirect(chalk.blueBright('Zeebe cluster topology:')) this.logger.logDirect(res.brokers) + // debug('Emitting ready event') + // this.emit(ConnectionStatusEvent.ready) }) .catch((e) => { // Swallow exception to avoid throwing if retries are off @@ -268,7 +274,7 @@ export class ZeebeGrpcClient extends TypedEmitter< // eslint-disable-next-line no-async-promise-executor return new Promise(async (resolve, reject) => { try { - const stream = await this.grpc.activateJobsStream(req) + const stream = await (await this.grpc).activateJobsStream(req) stream.on('data', (res: Grpc.ActivateJobsResponse) => { const jobs = res.jobs.map((job) => parseVariablesAndCustomHeadersToJSON( @@ -306,8 +312,8 @@ export class ZeebeGrpcClient extends TypedEmitter< variables: JSON.stringify(req.variables ?? {}), tenantId: req.tenantId ?? this.tenantId, } - return this.executeOperation('broadcastSignal', () => - this.grpc.broadcastSignalSync(request) + return this.executeOperation('broadcastSignal', async () => + (await this.grpc).broadcastSignalSync(request) ) } @@ -328,8 +334,8 @@ export class ZeebeGrpcClient extends TypedEmitter< processInstanceKey: string | number ): Promise { Utils.validateNumber(processInstanceKey, 'processInstanceKey') - return this.executeOperation('cancelProcessInstance', () => - this.grpc.cancelProcessInstanceSync({ + return this.executeOperation('cancelProcessInstance', async () => + (await this.grpc).cancelProcessInstanceSync({ processInstanceKey, }) ) @@ -460,16 +466,17 @@ export class ZeebeGrpcClient extends TypedEmitter< * ``` */ public async close(timeout?: number): Promise { + debug('Closing Zeebe Client') this.closePromise = this.closePromise || new Promise((resolve) => { // Prevent the creation of more workers this.closing = true Promise.all(this.workers.map((w) => w.close(timeout))) - .then(() => this.grpc.close(timeout)) - .then(() => { + .then(async () => (await this.grpc).close(timeout)) + .then(async () => { this.emit(ConnectionStatusEvent.close) - this.grpc.removeAllListeners() + ;(await this.grpc).removeAllListeners() this.removeAllListeners() resolve(null) }) @@ -505,8 +512,8 @@ export class ZeebeGrpcClient extends TypedEmitter< ): Promise { const withStringifiedVariables = stringifyVariables(completeJobRequest) this.logger.logDebug(withStringifiedVariables) - return this.executeOperation('completeJob', () => - this.grpc.completeJobSync(withStringifiedVariables).catch((e) => { + return this.executeOperation('completeJob', async () => + (await this.grpc).completeJobSync(withStringifiedVariables).catch((e) => { if (e.code === GrpcError.NOT_FOUND) { e.details += '. The process may have been cancelled, the job cancelled by an interrupting event, or the job already completed.' + @@ -559,8 +566,8 @@ export class ZeebeGrpcClient extends TypedEmitter< tenantId: config.tenantId ?? this.tenantId, }) - return this.executeOperation('createProcessInstance', () => - this.grpc.createProcessInstanceSync(createProcessInstanceRequest) + return this.executeOperation('createProcessInstance', async () => + (await this.grpc).createProcessInstanceSync(createProcessInstanceRequest) ) } @@ -604,8 +611,8 @@ export class ZeebeGrpcClient extends TypedEmitter< tenantId: request.tenantId ?? this.tenantId, }) - return this.executeOperation('createProcessInstanceWithResult', () => - this.grpc.createProcessInstanceWithResultSync({ + return this.executeOperation('createProcessInstanceWithResult', async () => + (await this.grpc).createProcessInstanceWithResultSync({ fetchVariables: request.fetchVariables, request: createProcessInstanceRequest, requestTimeout: request.requestTimeout, @@ -628,8 +635,8 @@ export class ZeebeGrpcClient extends TypedEmitter< }: { resourceKey: string }): Promise> { - return this.executeOperation('deleteResourceSync', () => - this.grpc.deleteResourceSync({ resourceKey }) + return this.executeOperation('deleteResourceSync', async () => + (await this.grpc).deleteResourceSync({ resourceKey }) ) } @@ -718,8 +725,8 @@ export class ZeebeGrpcClient extends TypedEmitter< if (isProcessFilename(resource)) { const filename = resource.processFilename const process = readFileSync(filename) - return this.executeOperation('deployResource', () => - this.grpc.deployResourceSync({ + return this.executeOperation('deployResource', async () => + (await this.grpc).deployResourceSync({ resources: [ { name: filename, @@ -730,8 +737,8 @@ export class ZeebeGrpcClient extends TypedEmitter< }) ) } else if (isProcess(resource)) { - return this.executeOperation('deployResource', () => - this.grpc.deployResourceSync({ + return this.executeOperation('deployResource', async () => + (await this.grpc).deployResourceSync({ resources: [ { name: resource.name, @@ -744,8 +751,8 @@ export class ZeebeGrpcClient extends TypedEmitter< } else if (isDecisionFilename(resource)) { const filename = resource.decisionFilename const decision = readFileSync(filename) - return this.executeOperation('deployResource', () => - this.grpc.deployResourceSync({ + return this.executeOperation('deployResource', async () => + (await this.grpc).deployResourceSync({ resources: [ { name: filename, @@ -756,8 +763,8 @@ export class ZeebeGrpcClient extends TypedEmitter< }) ) } else if (isDecision(resource)) { - return this.executeOperation('deployResource', () => - this.grpc.deployResourceSync({ + return this.executeOperation('deployResource', async () => + (await this.grpc).deployResourceSync({ resources: [ { name: resource.name, @@ -770,8 +777,8 @@ export class ZeebeGrpcClient extends TypedEmitter< } else if (isFormFilename(resource)) { const filename = resource.formFilename const form = readFileSync(filename) - return this.executeOperation('deployResource', () => - this.grpc.deployResourceSync({ + return this.executeOperation('deployResource', async () => + (await this.grpc).deployResourceSync({ resources: [ { name: filename, @@ -783,8 +790,8 @@ export class ZeebeGrpcClient extends TypedEmitter< ) } /* if (isForm(resource)) */ else { // default fall-through - return this.executeOperation('deployResource', () => - this.grpc.deployResourceSync({ + return this.executeOperation('deployResource', async () => + (await this.grpc).deployResourceSync({ resources: [ { name: resource.name, @@ -815,8 +822,8 @@ export class ZeebeGrpcClient extends TypedEmitter< const variables = losslessStringify( evaluateDecisionRequest.variables ) as unknown as ZB.JSONDoc - return this.executeOperation('evaluateDecision', () => - this.grpc.evaluateDecisionSync({ + return this.executeOperation('evaluateDecision', async () => + (await this.grpc).evaluateDecisionSync({ ...evaluateDecisionRequest, variables, tenantId: evaluateDecisionRequest.tenantId ?? this.tenantId, @@ -841,8 +848,8 @@ export class ZeebeGrpcClient extends TypedEmitter< * ``` */ public failJob(failJobRequest: Grpc.FailJobRequest): Promise { - return this.executeOperation('failJob', () => - this.grpc.failJobSync(failJobRequest) + return this.executeOperation('failJob', async () => + (await this.grpc).failJobSync(failJobRequest) ) } @@ -885,7 +892,7 @@ export class ZeebeGrpcClient extends TypedEmitter< public modifyProcessInstance( modifyProcessInstanceRequest: Grpc.ModifyProcessInstanceRequest ): Promise { - return this.executeOperation('modifyProcessInstance', () => { + return this.executeOperation('modifyProcessInstance', async () => { // We accept JSONDoc for the variableInstructions, but the actual gRPC call needs stringified JSON, so transform it with a mutation const req = Utils.deepClone(modifyProcessInstanceRequest) req?.activateInstructions?.forEach((a) => @@ -893,7 +900,7 @@ export class ZeebeGrpcClient extends TypedEmitter< (v) => (v.variables = losslessStringify(v.variables)) ) ) - return this.grpc.modifyProcessInstanceSync({ + return (await this.grpc).modifyProcessInstanceSync({ ...req, }) }) @@ -906,8 +913,10 @@ export class ZeebeGrpcClient extends TypedEmitter< public migrateProcessInstance( migrateProcessInstanceRequest: Grpc.MigrateProcessInstanceRequest ): Promise { - return this.executeOperation('migrateProcessInstance', () => - this.grpc.migrateProcessInstanceSync(migrateProcessInstanceRequest) + return this.executeOperation('migrateProcessInstance', async () => + (await this.grpc).migrateProcessInstanceSync( + migrateProcessInstanceRequest + ) ) } @@ -934,8 +943,8 @@ export class ZeebeGrpcClient extends TypedEmitter< >( publishMessageRequest: Grpc.PublishMessageRequest ): Promise { - return this.executeOperation('publishMessage', () => - this.grpc.publishMessageSync( + return this.executeOperation('publishMessage', async () => + (await this.grpc).publishMessageSync( stringifyVariables({ ...publishMessageRequest, variables: publishMessageRequest.variables, @@ -997,8 +1006,8 @@ export class ZeebeGrpcClient extends TypedEmitter< ...publishStartMessageRequest, tenantId: publishStartMessageRequest.tenantId ?? this.tenantId, } - return this.executeOperation('publishStartMessage', () => - this.grpc.publishMessageSync( + return this.executeOperation('publishStartMessage', async () => + (await this.grpc).publishMessageSync( stringifyVariables({ ...publishMessageRequest, variables: publishMessageRequest.variables || {}, @@ -1041,8 +1050,8 @@ export class ZeebeGrpcClient extends TypedEmitter< public resolveIncident( resolveIncidentRequest: Grpc.ResolveIncidentRequest ): Promise { - return this.executeOperation('resolveIncident', () => - this.grpc.resolveIncidentSync(resolveIncidentRequest) + return this.executeOperation('resolveIncident', async () => + (await this.grpc).resolveIncidentSync(resolveIncidentRequest) ) } @@ -1092,8 +1101,8 @@ export class ZeebeGrpcClient extends TypedEmitter< ? losslessStringify(request.variables) : request.variables - return this.executeOperation('setVariables', () => - this.grpc.setVariablesSync({ ...request, variables }) + return this.executeOperation('setVariables', async () => + (await this.grpc).setVariablesSync({ ...request, variables }) ) } @@ -1146,8 +1155,8 @@ export class ZeebeGrpcClient extends TypedEmitter< ...throwErrorRequest, variables: throwErrorRequest.variables ?? {}, }) - return this.executeOperation('throwError', () => - this.grpc.throwErrorSync(req) + return this.executeOperation('throwError', async () => + (await this.grpc).throwErrorSync(req) ) } @@ -1160,8 +1169,8 @@ export class ZeebeGrpcClient extends TypedEmitter< * zbc.topology().then(res => console.res(JSON.stringify(res, null, 2))) * ``` */ - public topology(): Promise { - return this.executeOperation('topology', this.grpc.topologySync) + public async topology(): Promise { + return this.executeOperation('topology', (await this.grpc).topologySync) } /** @@ -1201,8 +1210,8 @@ export class ZeebeGrpcClient extends TypedEmitter< public updateJobRetries( updateJobRetriesRequest: Grpc.UpdateJobRetriesRequest ): Promise { - return this.executeOperation('updateJobRetries', () => - this.grpc.updateJobRetriesSync(updateJobRetriesRequest) + return this.executeOperation('updateJobRetries', async () => + (await this.grpc).updateJobRetriesSync(updateJobRetriesRequest) ) } @@ -1306,8 +1315,8 @@ export class ZeebeGrpcClient extends TypedEmitter< let connectionErrorCount = 0 let authFailures = 0 return promiseRetry( - (retry, n) => { - if (this.closing || this.grpc.channelClosed) { + async (retry, n) => { + if (this.closing || (await this.grpc).channelClosed) { /** * Should we reject instead? The idea here is that calling ZBClient.close() will allow the application to cleanly shut down. * If we reject here, any pending calls will throw errors. This is probably not what the user is expecting to see. @@ -1319,7 +1328,7 @@ export class ZeebeGrpcClient extends TypedEmitter< `[${operationName}]: Attempt ${n} (max: ${this.maxRetries}).` ) } - return operation().catch((err) => { + return operation().catch(async (err) => { // This could be DNS resolution, or the gRPC gateway is not reachable yet, or Backpressure const isNetworkError = (err.message.indexOf('14') === 0 || @@ -1351,7 +1360,7 @@ export class ZeebeGrpcClient extends TypedEmitter< } // The gRPC channel will be closed if close has been called - if (this.grpc.channelClosed) { + if ((await this.grpc).channelClosed) { return Promise.resolve(null as unknown as T) } throw err