Skip to content

Commit

Permalink
feat(repo): add CAMUNDA_CUSTOM_ROOT_CERT_STRING parameter (#146)
Browse files Browse the repository at this point in the history
* feat(repo): add CAMUNDA_CUSTOM_ROOT_CERT_STRING parameter

accept a custom certificate as a string argument to the configuration

fixes #142

* style(zeebe): lint generated zeebe grpc code

* test(zeebe): extend timeout to deal with SaaS connection delay

* fix(zeebe): fix waitForReady deadline (#151)

This fixes a bug in the grpc channel connection sensing

fixes #150

* feat(repo): add CAMUNDA_CUSTOM_ROOT_CERT_STRING parameter

accept a custom certificate as a string argument to the configuration

fixes #142

* style(zeebe): lint generated zeebe grpc code

* test(zeebe): extend timeout to deal with SaaS connection delay

* test(zeebe): refactor test
  • Loading branch information
jwulf authored May 7, 2024
1 parent a88ea2e commit f828a95
Show file tree
Hide file tree
Showing 12 changed files with 275 additions and 149 deletions.
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -134,4 +134,5 @@ dist
*.env
!docker/.env

docs
docs
.nx/cache
124 changes: 112 additions & 12 deletions src/__tests__/lib/GetCustomCertificateBuffer.unit.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,18 @@ import {
import { OperateApiClient } from '../../operate'
import { ZeebeGrpcClient } from '../../zeebe'

let server
afterEach(() => {
;(server && server.close && server.close()) ||
(server &&
server.tryShutdown &&
server.tryShutdown((err) => {
if (err) {
throw err
}
}))
})

test('Can use a custom root certificate to connect to a REST API', async () => {
const app = express()

Expand All @@ -26,7 +38,7 @@ test('Can use a custom root certificate to connect to a REST API', async () => {
cert: fs.readFileSync(path.join(__dirname, 'localhost.crt')),
}

const server = https.createServer(options, app)
server = https.createServer(options, app)

server.listen(3012, () => {
// console.log('Server listening on port 3012')
Expand Down Expand Up @@ -81,7 +93,7 @@ test('gRPC server with self-signed certificate', (done) => {
) as unknown as { gateway_protocol: { Gateway: any } }

// Create the server
const server = new Server()
server = new Server()

// Add a service to the server
server.addService(zeebeProto.gateway_protocol.Gateway.service, {
Expand Down Expand Up @@ -140,16 +152,104 @@ test('gRPC server with self-signed certificate', (done) => {
done()
})
})
// const zbc2 = new ZeebeGrpcClient({
// config: {
// CAMUNDA_OAUTH_DISABLED: true,
// ZEEBE_ADDRESS: 'localhost:50051',
// },
// })
// zbc2.topology().catch((e) => {
// console.log(e)
// zbc2.close()
// })
}
)
})

test('gRPC server with self-signed certificate provided via string', (done) => {
// Load the protobuf definition
const packageDefinition = loadSync(
path.join(__dirname, '..', '..', 'proto', 'zeebe.proto'),
{
keepCase: true,
longs: String,
enums: String,
defaults: true,
oneofs: true,
}
)

const zeebeProto = loadPackageDefinition(
packageDefinition
// eslint-disable-next-line @typescript-eslint/no-explicit-any
) as unknown as { gateway_protocol: { Gateway: any } }

// Create the server
const server = new Server()

// Add a service to the server
server.addService(zeebeProto.gateway_protocol.Gateway.service, {
Topology: (_, callback) => {
const t = new TopologyResponse()
const b = new BrokerInfo()
b.setHost('localhost')
const partition = new Partition()
partition.setHealth(0)
partition.setPartitionid(0)
partition.setRole(0)
b.setPartitionsList([partition])
t.setBrokersList([b])
callback(null, t)
},
// Implement your service methods here
})

// Read the key and certificate
const key = fs.readFileSync(path.join(__dirname, 'localhost.key'))
const cert = fs.readFileSync(path.join(__dirname, 'localhost.crt'))

// Start the server
server.bindAsync(
'localhost:50051',
ServerCredentials.createSsl(null, [
{
private_key: key,
cert_chain: cert,
},
]),
(err) => {
if (err) {
console.error(err)
done()
return
}
const zbc = new ZeebeGrpcClient({
config: {
CAMUNDA_OAUTH_DISABLED: true,
ZEEBE_ADDRESS: 'localhost:50051',
CAMUNDA_CUSTOM_ROOT_CERT_STRING: `-----BEGIN CERTIFICATE-----
MIIC7TCCAdWgAwIBAgIUXlnuRfR2yE/v5t9A0ZoeVG4BvZowDQYJKoZIhvcNAQEL
BQAwFDESMBAGA1UEAwwJbG9jYWxob3N0MB4XDTI0MDUwMTAzMTIyNloXDTI1MDUw
MTAzMTIyNlowFDESMBAGA1UEAwwJbG9jYWxob3N0MIIBIjANBgkqhkiG9w0BAQEF
AAOCAQ8AMIIBCgKCAQEA57iCfaiUmW74QUb0MtX5MOCOfhKJ8zw0i4Ep+nwST03w
Z/K5Z0W7OPmJDnPsko9DPC3bOeObriTcWgYg4zNaLNSKvDbkbLtDvpUyY8rdJAP3
6H7uTZWMoUQzdIGdSFZHa8HRO1HUZGFZT55bi7czyMPFnzSOtcaz4pCvlhLRk+QA
lsSG+4owhfDrpyPlFtEFNyeaE2fIsJtHxupkwGOmDNeh6iKV46lD8F0SGf2pl5qF
nbbMn6IZlpH7heQqMdPNs1ikGOuDybJISu07S72RSoClgdFzepzXFHoNWwhucdvN
UMJXWBnP/PoeNViI2+nBMrK/1Bwuhci0t5mjTujQNQIDAQABozcwNTAUBgNVHREE
DTALgglsb2NhbGhvc3QwHQYDVR0OBBYEFBuCDjLQbWjX7D+o9dt4nszcP3OIMA0G
CSqGSIb3DQEBCwUAA4IBAQASgexBeY7Nz9i0qREk1RzpQDIT+jM/QAgmnH3G6Ehx
tAYUMYLeDTmGhYhp3GJAU7/R3mbN6t5qg2d9Fa8b+JpBJRdxMY+CyjESoPvUHIE3
lkgNGphT+8QPnh7uO5KOUVnk7Jc9MTwBntDouLHfuzJJnHPlRko3IWnwaivZYVRn
VbnUoSMKKPzFaqqaY8uHPjvs4Gt4OGcYV8hHcjeI3fMHckmsXZclxb3pwF+x698o
Htg5+ydbmWkTspvbMuHx/280Ow0JPSSXFnwWGWpyH7kI0EAfq75W3iGMRR6yL7Je
ffZG7W8KARYx824nRlxbIN2rHo9VQwEBkbmoeg5nSkvi
-----END CERTIFICATE-----`,
CAMUNDA_SECURE_CONNECTION: true,
zeebeGrpcSettings: {
ZEEBE_CLIENT_LOG_LEVEL: 'NONE',
},
},
})
zbc.topology().then(() => {
expect(true).toBe(true)
zbc.close()
// Stop the server after the test
server.tryShutdown((err) => {
if (err) console.error(err)
done()
})
})
}
)
})
41 changes: 21 additions & 20 deletions src/__tests__/zeebe/integration/Client-ThrowError.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,26 +10,8 @@ jest.setTimeout(25000)
let bpmnProcessId: string
let processDefinitionKey: string

let zbc: ZeebeGrpcClient

beforeAll(async () => {
suppressZeebeLogging()
const zb = new ZeebeGrpcClient()
;({ bpmnProcessId, processDefinitionKey } = (
await zb.deployResource({
processFilename: './src/__tests__/testdata/Client-ThrowError.bpmn',
})
).deployments[0].process)
await cancelProcesses(processDefinitionKey)
await zb.close()
})

beforeEach(() => {
zbc = new ZeebeGrpcClient()
})

afterEach(async () => {
await zbc.close()
})

afterAll(async () => {
Expand All @@ -38,6 +20,13 @@ afterAll(async () => {
})

test('Throws a business error that is caught in the process', async () => {
const zbc = new ZeebeGrpcClient()
;({ bpmnProcessId, processDefinitionKey } = (
await zbc.deployResource({
processFilename: './src/__tests__/testdata/Client-ThrowError.bpmn',
})
).deployments[0].process)
await cancelProcesses(processDefinitionKey)
zbc.createWorker({
taskHandler: (job) =>
job.error('BUSINESS_ERROR', 'Well, that did not work'),
Expand All @@ -57,9 +46,18 @@ test('Throws a business error that is caught in the process', async () => {
variables: {},
})
expect(result.variables.bpmnErrorCaught).toBe(true)
await zbc.close()
})

test('Can set variables when throwing a BPMN Error', async () => {
const zbc = new ZeebeGrpcClient()
;({ bpmnProcessId, processDefinitionKey } = (
await zbc.deployResource({
processFilename: './src/__tests__/testdata/Client-ThrowError.bpmn',
})
).deployments[0].process)
await cancelProcesses(processDefinitionKey)
// This worker takes the first job and throws a BPMN error, setting a variable
zbc.createWorker({
taskHandler: (job) =>
job.error({
Expand All @@ -68,8 +66,8 @@ test('Can set variables when throwing a BPMN Error', async () => {
variables: { something: 'someValue' },
}),
taskType: 'throw-bpmn-error-task',
timeout: Duration.seconds.of(30),
})
// This worker is on the business error throw path
zbc.createWorker({
taskType: 'sad-flow',
taskHandler: (job) =>
Expand All @@ -83,5 +81,8 @@ test('Can set variables when throwing a BPMN Error', async () => {
variables: {},
})
expect(result.variables.bpmnErrorCaught).toBe(true)
// expect(result.variables.something).toBe("someValue")
// This is not working, the variable is not being set on 8.5
// this may be due to incremental implementation of the feature
// expect(result.variables.something).toBe('someValue')
await zbc.close()
})
10 changes: 5 additions & 5 deletions src/__tests__/zeebe/integration/Client-onReady.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ test('Does not call the onReady handler if there is no broker', (done) => {
expect(called).toBe(false)
await zbc2.close()
done(null)
}, 4000)
}, 8000)
})

test('Calls the onReady handler if there is a broker and eagerConnection is true', (done) => {
Expand All @@ -32,7 +32,7 @@ test('Calls the onReady handler if there is a broker and eagerConnection is true
expect(called).toBe(1)
await zbc2.close()
done()
}, 6000)
}, 8000)
})

test('Sets connected to true if there is a broker', (done) => {
Expand All @@ -44,21 +44,21 @@ test('Sets connected to true if there is a broker', (done) => {
expect(zbc2.connected).toBe(true)
await zbc2.close()
done()
}, 6000)
}, 8000)
})

test('emits the ready event if there is a broker and eagerConnection: true', (done) => {
let called = 0
const zbc2 = new ZeebeGrpcClient({
config: { zeebeGrpcSettings: { ZEEBE_GRPC_CLIENT_EAGER_CONNECT: true } },
}).on('ready', () => {
called++
called = called + 1
})

setTimeout(async () => {
expect(called).toBe(1)
expect(zbc2.connected).toBe(true)
await zbc2.close()
done()
}, 6000)
}, 8000)
})
1 change: 1 addition & 0 deletions src/generated/zeebe_grpc_pb.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
/* eslint-disable @typescript-eslint/no-var-requires */
// GENERATED CODE -- DO NOT EDIT!

'use strict'
Expand Down
1 change: 1 addition & 0 deletions src/generated/zeebe_pb.d.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
/* eslint-disable @typescript-eslint/ban-types */
// package: gateway_protocol
// file: zeebe.proto

Expand Down
5 changes: 5 additions & 0 deletions src/lib/Configuration.ts
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,11 @@ const getMainEnv = () =>
type: 'string',
optional: true,
},
/** When using self-signed certificates, provide the root certificate as a string */
CAMUNDA_CUSTOM_ROOT_CERT_STRING: {
type: 'string',
optional: true,
},
/** When using custom or self-signed certificates, provide the path to the certificate chain */
CAMUNDA_CUSTOM_CERT_CHAIN_PATH: {
type: 'string',
Expand Down
5 changes: 4 additions & 1 deletion src/lib/GetCustomCertificateBuffer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,9 @@ export async function GetCustomCertificateBuffer(
config: CamundaPlatform8Configuration
): Promise<Buffer | undefined> {
const customRootCertPath = config.CAMUNDA_CUSTOM_ROOT_CERT_PATH
const customRootCert = config.CAMUNDA_CUSTOM_ROOT_CERT_STRING

if (!customRootCertPath) {
if (!customRootCertPath && !customRootCert) {
return undefined
}
const rootCerts: string[] = []
Expand All @@ -21,6 +22,8 @@ export async function GetCustomCertificateBuffer(
if (cert) {
rootCerts.push(cert)
}
} else if (customRootCert) {
rootCerts.push(customRootCert)
}

// (2) use certificates from OS keychain
Expand Down
7 changes: 4 additions & 3 deletions src/zeebe/lib/ConnectionFactory.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,16 +45,17 @@ export class ConnectionFactory {
grpcConfig.host
)
const log = new StatefulLogInterceptor({ characteristics, logConfig })
const grpcClient = new GrpcMiddleware({
const grpcMiddleware = new GrpcMiddleware({
characteristics,
config: grpcConfig,
log,
}).getGrpcClient()
})
const grpcClient = grpcMiddleware.getGrpcClient()
const _close = grpcClient.close.bind(grpcClient)
grpcClient.close = async () => {
log.close()
_close()
return null
return grpcMiddleware.close()
}

return { grpcClient, log }
Expand Down
6 changes: 3 additions & 3 deletions src/zeebe/lib/GrpcClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -430,7 +430,7 @@ export class GrpcClient extends EventEmitter {
return this.listNameMethods
}

public close(timeout = 5000): Promise<null> {
public close(timeout = 5000): Promise<void> {
const STATE_SHUTDOWN = 4
const isClosed = (state) => state === STATE_SHUTDOWN

Expand All @@ -453,7 +453,7 @@ export class GrpcClient extends EventEmitter {
if (closed || alreadyClosed) {
this.channelClosed = true
this.emit(MiddlewareSignals.Log.Info, 'Grpc channel closed')
return resolve(null) // setTimeout(() => resolve(), 2000)
return resolve() // setTimeout(() => resolve(), 2000)
}

this.emit(
Expand All @@ -480,7 +480,7 @@ export class GrpcClient extends EventEmitter {
this.emit(MiddlewareSignals.Log.Info, `Closed: ${alreadyClosed}`)
}
if (alreadyClosed) {
return resolve(null)
return resolve()
}
})

Expand Down
6 changes: 5 additions & 1 deletion src/zeebe/lib/GrpcMiddleware.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,10 @@ export class GrpcMiddleware {
}
public getGrpcClient = () => this.grpcClient

public close = async () => {
clearTimeout(this.blockingTimer)
}

private createInterceptedGrpcClient(config: GrpcClientCtor) {
const grpcClient = new GrpcClient(config)
const logInterceptor = this.log
Expand All @@ -55,7 +59,7 @@ export class GrpcMiddleware {
clearTimeout(this.blockingTimer)
}
_close()
return null
return
}
grpcClient.on(MiddlewareSignals.Log.Debug, logInterceptor.logDebug)
grpcClient.on(MiddlewareSignals.Log.Info, logInterceptor.logInfo)
Expand Down
Loading

0 comments on commit f828a95

Please sign in to comment.