Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(job-scheduler): avoid duplicates when upserting in a quick sequence #2991

Merged
merged 2 commits into from
Dec 31, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 18 additions & 14 deletions src/classes/job-scheduler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -81,36 +81,38 @@ export class JobScheduler extends QueueBase {
return;
}

const prevMillis = opts.prevMillis || 0;
now = prevMillis < now ? now : prevMillis;

// Check if we have a start date for the repeatable job
const { startDate, immediately, ...filteredRepeatOpts } = repeatOpts;
let startMillis = now;
if (startDate) {
const startMillis = new Date(startDate).getTime();
now = startMillis > now ? startMillis : now;
startMillis = new Date(startDate).getTime();
startMillis = startMillis > now ? startMillis : now;
}

const prevMillis = opts.prevMillis || 0;
now = prevMillis < now ? now : prevMillis;

let nextMillis: number;
let newOffset = offset;
let newOffset = offset || 0;

if (every) {
const nextSlot = Math.floor(now / every) * every + every;
const prevSlot = Math.floor(startMillis / every) * every;
const nextSlot = prevSlot + every;
if (prevMillis || offset) {
nextMillis = nextSlot + (offset || 0);
nextMillis = nextSlot;
} else {
nextMillis = now;
newOffset = every - (nextSlot - now);
nextMillis = prevSlot;
newOffset = startMillis - prevSlot;

// newOffset should always be positive, but as an extra safety check
// newOffset should always be positive, but we do an extra safety check
newOffset = newOffset < 0 ? 0 : newOffset;
}
} else if (pattern) {
nextMillis = await this.repeatStrategy(now, repeatOpts, jobName);

if (nextMillis < now) {
nextMillis = now;
}
} else if (pattern) {
nextMillis = await this.repeatStrategy(now, repeatOpts, jobName);
}

const multi = (await this.client).multi();
Expand Down Expand Up @@ -163,6 +165,7 @@ export class JobScheduler extends QueueBase {
(<unknown>multi) as RedisClient,
jobName,
nextMillis,
newOffset,
jobSchedulerId,
{
...opts,
Expand Down Expand Up @@ -203,6 +206,7 @@ export class JobScheduler extends QueueBase {
client: RedisClient,
name: N,
nextMillis: number,
offset: number,
jobSchedulerId: string,
opts: JobsOptions,
data: T,
Expand All @@ -219,7 +223,7 @@ export class JobScheduler extends QueueBase {
});

const now = Date.now();
const delay = nextMillis - now;
const delay = nextMillis + offset - now;

const mergedOpts = {
...opts,
Expand Down
2 changes: 1 addition & 1 deletion src/commands/removeJobScheduler-3.lua
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@

--[[
Removes a repeatable job
Removes a job scheduler and its next scheduled job.
Input:
KEYS[1] job schedulers key
KEYS[2] delayed jobs key
Expand Down
56 changes: 52 additions & 4 deletions tests/test_job_scheduler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,50 @@ describe('Job Scheduler', function () {
});
});

describe('when job schedulers are upserted in quick succession', function () {
it('should create only one job scheduler and one delayed job', async function () {
const date = new Date('2017-02-07 9:24:00');
this.clock.setSystemTime(date);
const worker = new Worker(
queueName,
async () => {
await this.clock.tickAsync(1);
},
{
connection,
prefix,
concurrency: 1,
},
);
await worker.waitUntilReady();

const jobSchedulerId = 'test';
await queue.upsertJobScheduler(jobSchedulerId, {
every: ONE_MINUTE * 5,
});
await this.clock.tickAsync(1);
await queue.upsertJobScheduler(jobSchedulerId, {
every: ONE_MINUTE * 5,
});

await queue.upsertJobScheduler(jobSchedulerId, {
every: ONE_MINUTE * 5,
});

await queue.upsertJobScheduler(jobSchedulerId, {
every: ONE_MINUTE * 5,
});

const repeatableJobs = await queue.getJobSchedulers();
expect(repeatableJobs.length).to.be.eql(1);
await this.clock.tickAsync(ONE_MINUTE);
const delayed = await queue.getDelayed();
expect(delayed).to.have.length(1);

await worker.close();
});
});

describe('when clocks are slightly out of sync', function () {
it('should create only one delayed job', async function () {
const date = new Date('2017-02-07 9:24:00');
Expand Down Expand Up @@ -1684,13 +1728,17 @@ describe('Job Scheduler', function () {

await processingAfterFailing;

const failedCountAfterProcessing = await queue.getFailedCount();
expect(failedCountAfterProcessing).to.be.equal(0);

await worker.close();

const waitingCount = await queue.getWaitingCount();
const delayedCount2 = await queue.getDelayedCount();
expect(delayedCount2).to.be.equal(1);

const waitingCount = await queue.getWaitingCount();
expect(waitingCount).to.be.equal(0);
// Due to asynchronicities, the next job could be already in waiting state
// We just check that both are 1, as it should only exist 1 job in either waiting or delayed state
expect(waitingCount + delayedCount2).to.be.equal(1);
});

it('should not create a new delayed job if the failed job is retried with Job.retry()', async function () {
Expand Down Expand Up @@ -2250,7 +2298,7 @@ describe('Job Scheduler', function () {
delayStub.restore();
});

it('should repeat every 2 seconds with a startDate in the future', async function () {
it('should repeat every day with a startDate in the future', async function () {
this.timeout(10000);

// Set the initial system time
Expand Down
Loading