diff --git a/src/__tests__/testdata/Client-Update-Job-Timeout.bpmn b/src/__tests__/testdata/Client-Update-Job-Timeout.bpmn new file mode 100644 index 00000000..fb675920 --- /dev/null +++ b/src/__tests__/testdata/Client-Update-Job-Timeout.bpmn @@ -0,0 +1,47 @@ + + + + + Flow_1kydzz1 + + + + Flow_0zqeiin + + + + + + + Flow_1kydzz1 + Flow_0zqeiin + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/src/__tests__/zeebe/integration/Client-Update-Job-Timeout.spec.ts b/src/__tests__/zeebe/integration/Client-Update-Job-Timeout.spec.ts new file mode 100644 index 00000000..2584354e --- /dev/null +++ b/src/__tests__/zeebe/integration/Client-Update-Job-Timeout.spec.ts @@ -0,0 +1,54 @@ +import { restoreZeebeLogging, suppressZeebeLogging } from '../../../lib' +import { ZeebeGrpcClient } from '../../../zeebe' +import { cancelProcesses } from '../../../zeebe/lib/cancelProcesses' +import { CreateProcessInstanceResponse } from '../../../zeebe/lib/interfaces-grpc-1.0' + +process.env.ZEEBE_NODE_LOG_LEVEL = process.env.ZEEBE_NODE_LOG_LEVEL || 'NONE' +jest.setTimeout(60000) + +let zbc: ZeebeGrpcClient +let wf: CreateProcessInstanceResponse | undefined + +beforeAll(() => suppressZeebeLogging()) +afterAll(() => restoreZeebeLogging()) + +beforeEach(() => { + zbc = new ZeebeGrpcClient() +}) + +afterEach(async () => { + try { + if (wf?.processInstanceKey) { + await zbc.cancelProcessInstance(wf.processInstanceKey) + } + } catch (e: unknown) { + // console.log('Caught NOT FOUND') // @DEBUG + } finally { + await zbc.close() // Makes sure we don't forget to close connection + } +}) + +test('can update Job Timeout', async () => { + const res = await zbc.deployResource({ + processFilename: './src/__tests__/testdata/Client-Update-Job-Timeout.bpmn', + }) + await cancelProcesses(res.deployments[0].process.processDefinitionKey) + wf = await zbc.createProcessInstance({ + bpmnProcessId: 'update-job-timeout-process', + variables: {}, + }) + + const worker = zbc.createWorker({ + taskType: 'update-job-timeout', + taskHandler: async (job) => { + await zbc.updateJobTimeout({ + jobKey: job.key, + timeout: 3000, + }) + return job.complete().then(async (res) => { + await worker.close() + return res + }) + }, + }) +}) diff --git a/src/zeebe/lib/interfaces-1.0.ts b/src/zeebe/lib/interfaces-1.0.ts index 685db16e..5925d472 100644 --- a/src/zeebe/lib/interfaces-1.0.ts +++ b/src/zeebe/lib/interfaces-1.0.ts @@ -32,6 +32,7 @@ import { ThrowErrorRequest, TopologyResponse, UpdateJobRetriesRequest, + UpdateJobTimeoutRequest, } from './interfaces-grpc-1.0' import { Loglevel, ZBCustomLogger } from './interfaces-published-contract' @@ -403,6 +404,9 @@ export interface ZBGrpc extends GrpcClient { updateJobRetriesSync( updateJobRetriesRequest: UpdateJobRetriesRequest ): Promise + updateJobTimeoutSync( + updateJobTimeoutRequest: UpdateJobTimeoutRequest + ): Promise deleteResourceSync: ( deleteResourceRequest: DeleteResourceRequest ) => Promise> diff --git a/src/zeebe/lib/interfaces-grpc-1.0.ts b/src/zeebe/lib/interfaces-grpc-1.0.ts index 2d146df6..17fb0cb0 100644 --- a/src/zeebe/lib/interfaces-grpc-1.0.ts +++ b/src/zeebe/lib/interfaces-grpc-1.0.ts @@ -465,6 +465,12 @@ export interface UpdateJobRetriesRequest { retries: number } +export interface UpdateJobTimeoutRequest { + readonly jobKey: string + /** the duration of the new timeout in ms, starting from the current moment */ + timeout: number +} + export interface FailJobRequest { readonly jobKey: string retries: number diff --git a/src/zeebe/zb/ZeebeGrpcClient.ts b/src/zeebe/zb/ZeebeGrpcClient.ts index 68ffab87..76d768cb 100644 --- a/src/zeebe/zb/ZeebeGrpcClient.ts +++ b/src/zeebe/zb/ZeebeGrpcClient.ts @@ -1215,6 +1215,25 @@ export class ZeebeGrpcClient extends TypedEmitter< ) } + /** + Updates the deadline of a job using the timeout (in ms) provided. This can be used + for extending or shortening the job deadline. + + Errors: + NOT_FOUND: + - no job exists with the given key + + INVALID_STATE: + - no deadline exists for the given job key + */ + public updateJobTimeout( + updateJobTimeoutRequest: Grpc.UpdateJobTimeoutRequest + ): Promise { + return this.executeOperation('updateJobTimeout', async () => + (await this.grpc).updateJobTimeoutSync(updateJobTimeoutRequest) + ) + } + private constructGrpcClient({ grpcConfig, logConfig,