From 975656a26ef048a98ed9c4d98ed70aa411d02965 Mon Sep 17 00:00:00 2001 From: Manuel Astudillo Date: Tue, 31 Dec 2024 17:34:12 +0100 Subject: [PATCH 1/2] fix(job-scheduler): avoid duplicates when upserting in a quick sequence --- src/classes/job-scheduler.ts | 32 ++++++++------- src/commands/removeJobScheduler-3.lua | 2 +- tests/test_job_scheduler.ts | 59 +++++++++++++++++++++++++-- 3 files changed, 74 insertions(+), 19 deletions(-) diff --git a/src/classes/job-scheduler.ts b/src/classes/job-scheduler.ts index 743af75b72..f8646381eb 100644 --- a/src/classes/job-scheduler.ts +++ b/src/classes/job-scheduler.ts @@ -81,36 +81,38 @@ export class JobScheduler extends QueueBase { return; } + const prevMillis = opts.prevMillis || 0; + now = prevMillis < now ? now : prevMillis; + // Check if we have a start date for the repeatable job const { startDate, immediately, ...filteredRepeatOpts } = repeatOpts; + let startMillis = now; if (startDate) { - const startMillis = new Date(startDate).getTime(); - now = startMillis > now ? startMillis : now; + startMillis = new Date(startDate).getTime(); + startMillis = startMillis > now ? startMillis : now; } - const prevMillis = opts.prevMillis || 0; - now = prevMillis < now ? now : prevMillis; - let nextMillis: number; - let newOffset = offset; + let newOffset = offset || 0; if (every) { - const nextSlot = Math.floor(now / every) * every + every; + const prevSlot = Math.floor(startMillis / every) * every; + const nextSlot = prevSlot + every; if (prevMillis || offset) { - nextMillis = nextSlot + (offset || 0); + nextMillis = nextSlot; } else { - nextMillis = now; - newOffset = every - (nextSlot - now); + nextMillis = prevSlot; + newOffset = startMillis - prevSlot; - // newOffset should always be positive, but as an extra safety check + // newOffset should always be positive, but we do an extra safety check newOffset = newOffset < 0 ? 0 : newOffset; } + } else if (pattern) { + nextMillis = await this.repeatStrategy(now, repeatOpts, jobName); if (nextMillis < now) { nextMillis = now; } - } else if (pattern) { - nextMillis = await this.repeatStrategy(now, repeatOpts, jobName); } const multi = (await this.client).multi(); @@ -163,6 +165,7 @@ export class JobScheduler extends QueueBase { (multi) as RedisClient, jobName, nextMillis, + newOffset, jobSchedulerId, { ...opts, @@ -203,6 +206,7 @@ export class JobScheduler extends QueueBase { client: RedisClient, name: N, nextMillis: number, + offset: number, jobSchedulerId: string, opts: JobsOptions, data: T, @@ -219,7 +223,7 @@ export class JobScheduler extends QueueBase { }); const now = Date.now(); - const delay = nextMillis - now; + const delay = nextMillis + offset - now; const mergedOpts = { ...opts, diff --git a/src/commands/removeJobScheduler-3.lua b/src/commands/removeJobScheduler-3.lua index ebcc98b69a..921e351374 100644 --- a/src/commands/removeJobScheduler-3.lua +++ b/src/commands/removeJobScheduler-3.lua @@ -1,6 +1,6 @@ --[[ - Removes a repeatable job + Removes a job scheduler and its next scheduled job. Input: KEYS[1] job schedulers key KEYS[2] delayed jobs key diff --git a/tests/test_job_scheduler.ts b/tests/test_job_scheduler.ts index c2da779a83..8a14d839d1 100644 --- a/tests/test_job_scheduler.ts +++ b/tests/test_job_scheduler.ts @@ -205,6 +205,53 @@ describe('Job Scheduler', function () { }); }); + describe('when job schedulers are upserted in quick succession', function () { + let worker: Worker; + beforeEach(async function () { + const date = new Date('2017-02-07 9:24:00'); + this.clock.setSystemTime(date); + worker = new Worker( + queueName, + async () => { + await this.clock.tickAsync(1); + }, + { + connection, + prefix, + concurrency: 1, + }, + ); + await worker.waitUntilReady(); + }); + afterEach(async function () { + await worker.close(); + }); + it('should create only one job scheduler and one delayed job', async function () { + const jobSchedulerId = 'test'; + await queue.upsertJobScheduler(jobSchedulerId, { + every: ONE_MINUTE * 5, + }); + await this.clock.tickAsync(1); + await queue.upsertJobScheduler(jobSchedulerId, { + every: ONE_MINUTE * 5, + }); + + await queue.upsertJobScheduler(jobSchedulerId, { + every: ONE_MINUTE * 5, + }); + + await queue.upsertJobScheduler(jobSchedulerId, { + every: ONE_MINUTE * 5, + }); + + const repeatableJobs = await queue.getJobSchedulers(); + expect(repeatableJobs.length).to.be.eql(1); + await this.clock.tickAsync(ONE_MINUTE); + const delayed = await queue.getDelayed(); + expect(delayed).to.have.length(1); + }); + }); + describe('when clocks are slightly out of sync', function () { it('should create only one delayed job', async function () { const date = new Date('2017-02-07 9:24:00'); @@ -1684,13 +1731,17 @@ describe('Job Scheduler', function () { await processingAfterFailing; + const failedCountAfterProcessing = await queue.getFailedCount(); + expect(failedCountAfterProcessing).to.be.equal(0); + await worker.close(); + const waitingCount = await queue.getWaitingCount(); const delayedCount2 = await queue.getDelayedCount(); - expect(delayedCount2).to.be.equal(1); - const waitingCount = await queue.getWaitingCount(); - expect(waitingCount).to.be.equal(0); + // Due to asynchronicities, the next job could be already in waiting state + // We just check that both are 1, as it should only exist 1 job in either waiting or delayed state + expect(waitingCount + delayedCount2).to.be.equal(1); }); it('should not create a new delayed job if the failed job is retried with Job.retry()', async function () { @@ -2250,7 +2301,7 @@ describe('Job Scheduler', function () { delayStub.restore(); }); - it('should repeat every 2 seconds with a startDate in the future', async function () { + it('should repeat every day with a startDate in the future', async function () { this.timeout(10000); // Set the initial system time From 6190b1170bc90fc5656f12963570217951a9efe3 Mon Sep 17 00:00:00 2001 From: roggervalf Date: Tue, 31 Dec 2024 15:02:59 -0500 Subject: [PATCH 2/2] test: refactor test case --- tests/test_job_scheduler.ts | 13 +++++-------- 1 file changed, 5 insertions(+), 8 deletions(-) diff --git a/tests/test_job_scheduler.ts b/tests/test_job_scheduler.ts index 8a14d839d1..160d9c8a7a 100644 --- a/tests/test_job_scheduler.ts +++ b/tests/test_job_scheduler.ts @@ -206,11 +206,10 @@ describe('Job Scheduler', function () { }); describe('when job schedulers are upserted in quick succession', function () { - let worker: Worker; - beforeEach(async function () { + it('should create only one job scheduler and one delayed job', async function () { const date = new Date('2017-02-07 9:24:00'); this.clock.setSystemTime(date); - worker = new Worker( + const worker = new Worker( queueName, async () => { await this.clock.tickAsync(1); @@ -222,11 +221,7 @@ describe('Job Scheduler', function () { }, ); await worker.waitUntilReady(); - }); - afterEach(async function () { - await worker.close(); - }); - it('should create only one job scheduler and one delayed job', async function () { + const jobSchedulerId = 'test'; await queue.upsertJobScheduler(jobSchedulerId, { every: ONE_MINUTE * 5, @@ -249,6 +244,8 @@ describe('Job Scheduler', function () { await this.clock.tickAsync(ONE_MINUTE); const delayed = await queue.getDelayed(); expect(delayed).to.have.length(1); + + await worker.close(); }); });