Skip to content

Commit

Permalink
fix(job-scheduler): add next delayed job only when prevMillis matches…
Browse files Browse the repository at this point in the history
… with producerId (#3001)
  • Loading branch information
roggervalf authored Jan 12, 2025
1 parent 442faa3 commit 4ea35dd
Show file tree
Hide file tree
Showing 3 changed files with 99 additions and 29 deletions.
3 changes: 2 additions & 1 deletion src/classes/scripts.ts
Original file line number Diff line number Diff line change
Expand Up @@ -369,6 +369,7 @@ export class Scripts {
queueKeys.delayed,
queueKeys.events,
queueKeys.repeat,
producerId ? this.queue.toKey(producerId) : '',
];

const args = [
Expand All @@ -377,7 +378,7 @@ export class Scripts {
pack(delayedJobOpts),
Date.now(),
queueKeys[''],
producerId ? this.queue.toKey(producerId) : '',
producerId,
];

return this.execCommand(client, 'updateJobScheduler', keys.concat(args));
Expand Down
Original file line number Diff line number Diff line change
@@ -1,23 +1,24 @@
--[[
Updates a job scheduler and adds next delayed job
Input:
KEYS[1] 'marker',
KEYS[2] 'meta'
KEYS[3] 'id'
KEYS[4] 'delayed'
KEYS[5] events stream key
KEYS[6] 'repeat' key
ARGV[1] next milliseconds
ARGV[2] jobs scheduler id
ARGV[3] msgpacked delayed opts
ARGV[4] timestamp
ARGV[5] prefix key
ARGV[6] producer key
Output:
next delayed job id - OK
Input:
KEYS[1] 'marker',
KEYS[2] 'meta'
KEYS[3] 'id'
KEYS[4] 'delayed'
KEYS[5] events stream key
KEYS[6] 'repeat' key
KEYS[7] producer key
ARGV[1] next milliseconds
ARGV[2] jobs scheduler id
ARGV[3] msgpacked delayed opts
ARGV[4] timestamp
ARGV[5] prefix key
ARGV[6] producer id
Output:
next delayed job id - OK
]]
local rcall = redis.call
local repeatKey = KEYS[6]
Expand All @@ -26,6 +27,7 @@ local timestamp = ARGV[4]
local nextMillis = ARGV[1]
local jobSchedulerId = ARGV[2]
local prefixKey = ARGV[5]
local producerId = ARGV[6]

-- Includes
--- @include "includes/addDelayedJob"
Expand All @@ -38,24 +40,28 @@ local nextDelayedJobKey = schedulerKey .. ":" .. nextMillis
-- Validate that scheduler exists.
local prevMillis = rcall("ZSCORE", repeatKey, jobSchedulerId)
if prevMillis ~= false then
local currentDelayedJobId = "repeat:" .. jobSchedulerId .. ":" .. prevMillis

if producerId == currentDelayedJobId then
local schedulerAttributes = rcall("HMGET", schedulerKey, "name", "data")

rcall("ZADD", repeatKey, nextMillis, jobSchedulerId)

local eventsKey = KEYS[5]
local metaKey = KEYS[2]
local maxEvents = getOrSetMaxEvents(metaKey)

rcall("INCR", KEYS[3])

local delayedOpts = cmsgpack.unpack(ARGV[3])

addDelayedJob(nextDelayedJobKey, nextDelayedJobId, delayedKey, eventsKey, schedulerAttributes[1],
schedulerAttributes[2] or "{}", delayedOpts, timestamp, jobSchedulerId, maxEvents, KEYS[1], nil, nil)
if ARGV[6] ~= "" then
rcall("HSET", ARGV[6], "nrjid", nextDelayedJobId)

if KEYS[7] ~= "" then
rcall("HSET", KEYS[7], "nrjid", nextDelayedJobId)
end

return nextDelayedJobId .. "" -- convert to string

return nextDelayedJobId .. "" -- convert to string
end
end
67 changes: 65 additions & 2 deletions tests/test_job_scheduler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,7 @@ import {
getNextMillis,
Worker,
} from '../src/classes';
import { JobsOptions } from '../src/types';
import { removeAllQueueData } from '../src/utils';
import { delay, removeAllQueueData } from '../src/utils';

const moment = require('moment');

Expand Down Expand Up @@ -2047,6 +2046,70 @@ describe('Job Scheduler', function () {
await worker.close();
});

describe('when overriding a scheduler', function () {
it('should not continue adding new delayed jobs from previous delayed record', async function () {
this.clock.restore();

const repeatOpts = { pattern: '*/2 * * * * *' };

let count = 0;
const worker = new Worker(
queueName,
async () => {
if (count === 0) {
await delay(2000);
await queue.pause(); // keep job in waiting list
}
},
{ connection, prefix },
);

const completing = new Promise<void>(async resolve => {
worker.on('completed', async () => {
count++;
if (count === 1) {
const waitingCount = await queue.getWaitingCount();
expect(waitingCount).to.eql(1);

await queue.upsertJobScheduler(
'test',
{ pattern: '*/15 * * * * *' },
{
data: { foo: 'baz' },
},
);

const waitingCount2 = await queue.getWaitingCount();
expect(waitingCount2).to.eql(1);
const delayedCount = await queue.getDelayedCount();
expect(delayedCount).to.eql(1);

await queue.resume();
} else {
resolve();
}
});
});

await queue.upsertJobScheduler('test', repeatOpts, {
data: { foo: 'bar' },
});

await completing;

const schedulerCount = await queue.getJobSchedulersCount();
expect(schedulerCount).to.eql(1);

const delayedCount = await queue.getDelayedCount();
expect(delayedCount).to.eql(1);

const totalJobs = await queue.getJobCountByTypes();
expect(totalJobs).to.eql(3); // 2 completed, 1 delayed

await worker.close();
});
});

it('should allow adding a repeatable job after removing it', async function () {
const repeat = {
pattern: '*/5 * * * *',
Expand Down

0 comments on commit 4ea35dd

Please sign in to comment.