Skip to content

Commit

Permalink
Alpha (#174)
Browse files Browse the repository at this point in the history
* feat(zeebe): add updateJobTimeout method (#172)

* feat(zeebe): add updateJobTimeout method

fixes #171

* feat(zeebe): support StreamActivatedJobs RPC (#160)

* feat(zeebe): support StreamActivatedJobs RPC

fixes #17

* test(zeebe): add inputVariableDto to test

* refactor(zeebe): incorporate DTO decoding

* test(zeebe): fix StreamJobs test

* docs(zeebe): document StreamJobs

* feat(zeebe): enable compression

* test(zeebe): fix StreamJobs test

* test(zeebe): add delay in StreamJobs test

* test(zeebe): close Zeebe cllent in StreamJob test

* fix(zeebe): cleanup Job Streams on close

* feat(zeebe): return close method from StreamJobs

* chore(release): 8.5.5-alpha.1 [skip ci]

## [8.5.5-alpha.1](v8.5.4...v8.5.5-alpha.1) (2024-06-05)

### Features

* **zeebe:** add updateJobTimeout method ([#172](#172)) ([5eff624](5eff624)), closes [#171](#171)
* **zeebe:** support StreamActivatedJobs RPC ([#160](#160)) ([258296a](258296a)), closes [#17](#17)

* chore(release): 8.5.5-alpha.1 [skip ci]

## [8.5.5-alpha.1](v8.5.4...v8.5.5-alpha.1) (2024-06-05)

### Features

* **zeebe:** add updateJobTimeout method ([#172](#172)) ([5eff624](5eff624)), closes [#171](#171)
* **zeebe:** support StreamActivatedJobs RPC ([#160](#160)) ([258296a](258296a)), closes [#17](#17)

---------

Co-authored-by: semantic-release-bot <semantic-release-bot@martynus.net>
  • Loading branch information
jwulf and semantic-release-bot authored Jun 5, 2024
1 parent 570d5bb commit 5151eb0
Show file tree
Hide file tree
Showing 16 changed files with 778 additions and 14 deletions.
8 changes: 8 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,11 @@
## [8.5.5-alpha.1](https://github.com/camunda/camunda-8-js-sdk/compare/v8.5.4...v8.5.5-alpha.1) (2024-06-05)


### Features

* **zeebe:** add updateJobTimeout method ([#172](https://github.com/camunda/camunda-8-js-sdk/issues/172)) ([5eff624](https://github.com/camunda/camunda-8-js-sdk/commit/5eff6243dbce5fd296daeedcf6191ef4c4d4b609)), closes [#171](https://github.com/camunda/camunda-8-js-sdk/issues/171)
* **zeebe:** support StreamActivatedJobs RPC ([#160](https://github.com/camunda/camunda-8-js-sdk/issues/160)) ([258296a](https://github.com/camunda/camunda-8-js-sdk/commit/258296aef6558f976dd299ea977514d58d822141)), closes [#17](https://github.com/camunda/camunda-8-js-sdk/issues/17)

## [8.5.4](https://github.com/camunda/camunda-8-js-sdk/compare/v8.5.3...v8.5.4) (2024-05-24)


Expand Down
18 changes: 18 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,12 @@ The SDK uses the [`debug`](https://github.com/debug-js/debug) library. To enable
| `camunda:worker` | Zeebe Worker |
| `camunda:zeebeclient` | Zeebe Client |

Here is an example of turning on debugging for the OAuth and Operate components:

```bash
DEBUG=camunda:oauth,camunda:operate node app.js
```

## Typing of Zeebe worker variables

The variable payload in a Zeebe worker task handler is available as an object `job.variables`. By default, this is of type `any`.
Expand Down Expand Up @@ -263,3 +269,15 @@ This follows the same strategy as the job variables, as previously described.
From 8.5, you can use Zeebe user tasks. See the documentation on [how to migrate to Zeebe user tasks](https://docs.camunda.io/docs/apis-tools/tasklist-api-rest/migrate-to-zeebe-user-tasks/).

The SDK supports the Zeebe REST API. Be sure to set the `ZEEBE_REST_ADDRESS` either via environment variable or configuration field.

## Job Streaming

The Zeebe gRPC API supports streaming available jobs, rather than polling for them.

The ZeebeGrpcClient method `StreamJobs` allows you to use this API.

Please note that only jobs that become available _after_ the stream is opened are pushed to the client. For jobs that were already activatable _before_ the method is called, you need to use a polling worker.

In this release, this is not handled for you. You must both poll and stream jobs to make sure that you get jobs that were available before your application started as well as jobs that become available after your application starts.

In a subsequent release, the ZeebeWorker will transparently handle this for you.
4 changes: 2 additions & 2 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@camunda8/sdk",
"version": "8.5.4",
"version": "8.5.5-alpha.1",
"description": "",
"main": "dist/index.js",
"scripts": {
Expand Down
6 changes: 5 additions & 1 deletion src/__tests__/config/jest.cleanup.ts
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,13 @@ export const cleanUp = async () => {
for (const key of instancesKeys) {
try {
await zeebe.cancelProcessInstance(key)
console.log(`Cancelled process instance ${key}`)
} catch (e) {
console.log('Error cancelling process instance', key)
console.log('Failed to cancel process instance', key)
console.log((e as Error).message)
console.log(
`Don't worry about it - Operate is eventually consistent.`
)
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/__tests__/config/jest.globalSetup.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,5 @@ import { cleanUp } from './jest.cleanup'

export default async () => {
console.log('Running global setup...')
cleanUp()
await cleanUp()
}
47 changes: 47 additions & 0 deletions src/__tests__/testdata/Client-Update-Job-Timeout.bpmn
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
<?xml version="1.0" encoding="UTF-8"?>
<bpmn:definitions xmlns:bpmn="http://www.omg.org/spec/BPMN/20100524/MODEL" xmlns:bpmndi="http://www.omg.org/spec/BPMN/20100524/DI" xmlns:dc="http://www.omg.org/spec/DD/20100524/DC" xmlns:zeebe="http://camunda.org/schema/zeebe/1.0" xmlns:di="http://www.omg.org/spec/DD/20100524/DI" xmlns:modeler="http://camunda.org/schema/modeler/1.0" id="Definitions_0eh4n28" targetNamespace="http://bpmn.io/schema/bpmn" exporter="Camunda Modeler" exporterVersion="5.23.0" modeler:executionPlatform="Camunda Cloud" modeler:executionPlatformVersion="8.5.0">
<bpmn:process id="update-job-timeout-process" name="Update Job Timeout" isExecutable="true">
<bpmn:startEvent id="StartEvent_1" name="Start Update Job Timeout Test">
<bpmn:outgoing>Flow_1kydzz1</bpmn:outgoing>
</bpmn:startEvent>
<bpmn:sequenceFlow id="Flow_1kydzz1" sourceRef="StartEvent_1" targetRef="Activity_0rpbuc4" />
<bpmn:endEvent id="Event_1hdghel" name="Update Job Timeout Test Completed">
<bpmn:incoming>Flow_0zqeiin</bpmn:incoming>
</bpmn:endEvent>
<bpmn:sequenceFlow id="Flow_0zqeiin" sourceRef="Activity_0rpbuc4" targetRef="Event_1hdghel" />
<bpmn:serviceTask id="Activity_0rpbuc4" name="test job">
<bpmn:extensionElements>
<zeebe:taskDefinition type="update-job-timeout" />
</bpmn:extensionElements>
<bpmn:incoming>Flow_1kydzz1</bpmn:incoming>
<bpmn:outgoing>Flow_0zqeiin</bpmn:outgoing>
</bpmn:serviceTask>
</bpmn:process>
<bpmndi:BPMNDiagram id="BPMNDiagram_1">
<bpmndi:BPMNPlane id="BPMNPlane_1" bpmnElement="update-job-timeout-process">
<bpmndi:BPMNShape id="_BPMNShape_StartEvent_2" bpmnElement="StartEvent_1">
<dc:Bounds x="179" y="99" width="36" height="36" />
<bpmndi:BPMNLabel>
<dc:Bounds x="156" y="142" width="83" height="27" />
</bpmndi:BPMNLabel>
</bpmndi:BPMNShape>
<bpmndi:BPMNShape id="Event_1hdghel_di" bpmnElement="Event_1hdghel">
<dc:Bounds x="432" y="99" width="36" height="36" />
<bpmndi:BPMNLabel>
<dc:Bounds x="419" y="142" width="63" height="40" />
</bpmndi:BPMNLabel>
</bpmndi:BPMNShape>
<bpmndi:BPMNShape id="Activity_17t8oqh_di" bpmnElement="Activity_0rpbuc4">
<dc:Bounds x="270" y="77" width="100" height="80" />
</bpmndi:BPMNShape>
<bpmndi:BPMNEdge id="Flow_1kydzz1_di" bpmnElement="Flow_1kydzz1">
<di:waypoint x="215" y="117" />
<di:waypoint x="270" y="117" />
</bpmndi:BPMNEdge>
<bpmndi:BPMNEdge id="Flow_0zqeiin_di" bpmnElement="Flow_0zqeiin">
<di:waypoint x="370" y="117" />
<di:waypoint x="432" y="117" />
</bpmndi:BPMNEdge>
</bpmndi:BPMNPlane>
</bpmndi:BPMNDiagram>
</bpmn:definitions>
48 changes: 48 additions & 0 deletions src/__tests__/testdata/StreamJobs.bpmn
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
<?xml version="1.0" encoding="UTF-8"?>
<bpmn:definitions xmlns:bpmn="http://www.omg.org/spec/BPMN/20100524/MODEL" xmlns:bpmndi="http://www.omg.org/spec/BPMN/20100524/DI" xmlns:dc="http://www.omg.org/spec/DD/20100524/DC" xmlns:zeebe="http://camunda.org/schema/zeebe/1.0" xmlns:di="http://www.omg.org/spec/DD/20100524/DI" xmlns:modeler="http://camunda.org/schema/modeler/1.0" id="Definitions_1hcuhqo" targetNamespace="http://bpmn.io/schema/bpmn" exporter="Camunda Modeler" exporterVersion="5.22.0" modeler:executionPlatform="Camunda Cloud" modeler:executionPlatformVersion="8.5.0">
<bpmn:process id="stream-jobs" name="Stream Jobs" isExecutable="true">
<bpmn:startEvent id="StartEvent_1" name="Start">
<bpmn:outgoing>Flow_1jco6ri</bpmn:outgoing>
</bpmn:startEvent>
<bpmn:sequenceFlow id="Flow_1jco6ri" sourceRef="StartEvent_1" targetRef="Activity_0xyl2dv" />
<bpmn:endEvent id="Event_1kkcbv7" name="End">
<bpmn:incoming>Flow_1l2fpc9</bpmn:incoming>
</bpmn:endEvent>
<bpmn:sequenceFlow id="Flow_1l2fpc9" sourceRef="Activity_0xyl2dv" targetRef="Event_1kkcbv7" />
<bpmn:serviceTask id="Activity_0xyl2dv" name="stream-job">
<bpmn:extensionElements>
<zeebe:taskDefinition type="stream-job" />
</bpmn:extensionElements>
<bpmn:incoming>Flow_1jco6ri</bpmn:incoming>
<bpmn:outgoing>Flow_1l2fpc9</bpmn:outgoing>
</bpmn:serviceTask>
</bpmn:process>
<bpmndi:BPMNDiagram id="BPMNDiagram_1">
<bpmndi:BPMNPlane id="BPMNPlane_1" bpmnElement="stream-jobs">
<bpmndi:BPMNShape id="_BPMNShape_StartEvent_2" bpmnElement="StartEvent_1">
<dc:Bounds x="179" y="99" width="36" height="36" />
<bpmndi:BPMNLabel>
<dc:Bounds x="185" y="142" width="24" height="14" />
</bpmndi:BPMNLabel>
</bpmndi:BPMNShape>
<bpmndi:BPMNShape id="Activity_0zd24as_di" bpmnElement="Activity_0xyl2dv">
<dc:Bounds x="270" y="77" width="100" height="80" />
<bpmndi:BPMNLabel />
</bpmndi:BPMNShape>
<bpmndi:BPMNShape id="Event_1kkcbv7_di" bpmnElement="Event_1kkcbv7">
<dc:Bounds x="432" y="99" width="36" height="36" />
<bpmndi:BPMNLabel>
<dc:Bounds x="440" y="142" width="20" height="14" />
</bpmndi:BPMNLabel>
</bpmndi:BPMNShape>
<bpmndi:BPMNEdge id="Flow_1jco6ri_di" bpmnElement="Flow_1jco6ri">
<di:waypoint x="215" y="117" />
<di:waypoint x="270" y="117" />
</bpmndi:BPMNEdge>
<bpmndi:BPMNEdge id="Flow_1l2fpc9_di" bpmnElement="Flow_1l2fpc9">
<di:waypoint x="370" y="117" />
<di:waypoint x="432" y="117" />
</bpmndi:BPMNEdge>
</bpmndi:BPMNPlane>
</bpmndi:BPMNDiagram>
</bpmn:definitions>
54 changes: 54 additions & 0 deletions src/__tests__/zeebe/integration/Client-Update-Job-Timeout.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
import { restoreZeebeLogging, suppressZeebeLogging } from '../../../lib'
import { ZeebeGrpcClient } from '../../../zeebe'
import { cancelProcesses } from '../../../zeebe/lib/cancelProcesses'
import { CreateProcessInstanceResponse } from '../../../zeebe/lib/interfaces-grpc-1.0'

process.env.ZEEBE_NODE_LOG_LEVEL = process.env.ZEEBE_NODE_LOG_LEVEL || 'NONE'
jest.setTimeout(60000)

let zbc: ZeebeGrpcClient
let wf: CreateProcessInstanceResponse | undefined

beforeAll(() => suppressZeebeLogging())
afterAll(() => restoreZeebeLogging())

beforeEach(() => {
zbc = new ZeebeGrpcClient()
})

afterEach(async () => {
try {
if (wf?.processInstanceKey) {
await zbc.cancelProcessInstance(wf.processInstanceKey)
}
} catch (e: unknown) {
// console.log('Caught NOT FOUND') // @DEBUG
} finally {
await zbc.close() // Makes sure we don't forget to close connection
}
})

test('can update Job Timeout', async () => {
const res = await zbc.deployResource({
processFilename: './src/__tests__/testdata/Client-Update-Job-Timeout.bpmn',
})
await cancelProcesses(res.deployments[0].process.processDefinitionKey)
wf = await zbc.createProcessInstance({
bpmnProcessId: 'update-job-timeout-process',
variables: {},
})

const worker = zbc.createWorker({
taskType: 'update-job-timeout',
taskHandler: async (job) => {
await zbc.updateJobTimeout({
jobKey: job.key,
timeout: 3000,
})
return job.complete().then(async (res) => {
await worker.close()
return res
})
},
})
})
67 changes: 67 additions & 0 deletions src/__tests__/zeebe/integration/StreamJobs.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
import { restoreZeebeLogging, suppressZeebeLogging } from '../../../lib'
import { ZeebeGrpcClient } from '../../../zeebe'
import { cancelProcesses } from '../../../zeebe/lib/cancelProcesses'

process.env.ZEEBE_NODE_LOG_LEVEL = process.env.ZEEBE_NODE_LOG_LEVEL || 'NONE'
jest.setTimeout(25000)

let bpmnProcessId: string
let processDefinitionKey: string

beforeAll(async () => {
suppressZeebeLogging()
})

afterAll(async () => {
restoreZeebeLogging()
await cancelProcesses(processDefinitionKey)
})

test('Can activate jobs using StreamActivatedJobs RPC', async () => {
const zbc = new ZeebeGrpcClient()
;({ bpmnProcessId, processDefinitionKey } = (
await zbc.deployResource({
processFilename: './src/__tests__/testdata/StreamJobs.bpmn',
})
).deployments[0].process)
await cancelProcesses(processDefinitionKey)

await new Promise((resolve) => {
let counter = 0
zbc.streamJobs({
type: 'stream-job',
worker: 'test-worker',
tenantIds: ['<default>'],
taskHandler: (job) => {
counter++
expect(job.variables.foo).toBe('bar')
const res = job.complete({})
if (counter === 3) {
zbc.close()
resolve(null)
}
return res
},
inputVariableDto: class {
foo!: string
},
fetchVariables: [],
timeout: 30000,
})
// Wait two seconds to ensure the stream is active
new Promise((resolve) => setTimeout(resolve, 2000)).then(() => {
zbc.createProcessInstance({
bpmnProcessId,
variables: { foo: 'bar' },
})
zbc.createProcessInstance({
bpmnProcessId,
variables: { foo: 'bar' },
})
zbc.createProcessInstance({
bpmnProcessId,
variables: { foo: 'bar' },
})
})
})
})
49 changes: 49 additions & 0 deletions src/subscription/Subscription.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
import EventEmitter from 'events'

export class Subscription<T> extends EventEmitter {
private _cancelled = false
dataGenerator: (interval: number) => AsyncGenerator<T, void, unknown>

constructor(restCall: () => Promise<T>) {
super()
// Define a function generator that fetches data at regular intervals
this.dataGenerator = async function* (
interval: number
// eslint-disable-next-line @typescript-eslint/no-explicit-any
): AsyncGenerator<T, void, unknown> {
while (true) {
try {
const data = await restCall()
if (data) {
// or 404
// If data is available, yield it
yield data
// Break the loop if data is available
break
}
} catch (error) {
console.error('Error fetching data:', error)
}
// Wait for the specified interval before trying again
await new Promise((resolve) => setTimeout(resolve, interval))
}
}
this.start()
}

public cancel() {
this._cancelled = true
this.emit('cancelled')
}

public get cancelled() {
return this._cancelled
}

private async start() {
const generator = this.dataGenerator(5000) // Fetch data every 5 seconds
for await (const data of generator) {
this.emit('data', data)
}
}
}
21 changes: 21 additions & 0 deletions src/zeebe/lib/GrpcClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,26 @@ export class GrpcClient extends EventEmitter {
*/
'grpc.http2.max_pings_without_data':
this.config.zeebeGrpcSettings.GRPC_HTTP2_MAX_PINGS_WITHOUT_DATA,
/**
* Default compression algorithm for the channel, applies to sending messages.
*
* Possible values for this option are:
* - `0` - No compression
* - `1` - Compress with DEFLATE algorithm
* - `2` - Compress with GZIP algorithm
* - `3` - Stream compression with GZIP algorithm
*/
'grpc.default_compression_algorithm': 2,
/**
* Default compression level for the channel, applies to receiving messages.
*
* Possible values for this option are:
* - `0` - None
* - `1` - Low level
* - `2` - Medium level
* - `3` - High level
*/
'grpc.default_compression_level': 2,
interceptors: [this.interceptor],
})
this.listNameMethods = []
Expand Down Expand Up @@ -299,6 +319,7 @@ export class GrpcClient extends EventEmitter {
let stream: ClientReadableStream<unknown>
const timeNormalisedRequest =
replaceTimeValuesWithMillisecondNumber(data)
debug('TimeNormalisedRequest', timeNormalisedRequest)
try {
const metadata = await this.getAuthToken()

Expand Down
Loading

0 comments on commit 5151eb0

Please sign in to comment.