Skip to content

Commit

Permalink
Fix race condition on task retry (#487)
Browse files Browse the repository at this point in the history
  • Loading branch information
RB387 authored Nov 29, 2024
1 parent 3914e48 commit 2f752e2
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 1 deletion.
4 changes: 3 additions & 1 deletion arq/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -447,8 +447,10 @@ async def start_jobs(self, job_ids: List[bytes]) -> None:
await pipe.watch(in_progress_key)
ongoing_exists = await pipe.exists(in_progress_key)
score = await pipe.zscore(self.queue_name, job_id)
if ongoing_exists or not score:
if ongoing_exists or not score or score > timestamp_ms():
# job already started elsewhere, or already finished and removed from queue
# if score > ts_now,
# it means probably the job was re-enqueued with a delay in another worker
self.job_counter = self.job_counter - 1
self.sem.release()
logger.debug('job %s already running elsewhere', job_id)
Expand Down
34 changes: 34 additions & 0 deletions tests/test_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,40 @@ async def test_job_successful(arq_redis: ArqRedis, worker, caplog):
assert 'X.XXs → testing:foobar()\n X.XXs ← testing:foobar ● 42' in log


async def test_job_retry_race_condition(arq_redis: ArqRedis, worker):
    """Ensure a job deferred for retry by one worker is not picked up early by another.

    Worker one runs the job, which raises ``Retry(defer=10)`` on its first
    attempt, re-enqueueing it with a future score.  Worker two then polls the
    same queue and must skip the job because its score lies in the future.
    """

    async def retry_job(ctx):
        # Fail only on the very first attempt, deferring the retry by 10s.
        if ctx['job_try'] == 1:
            raise Retry(defer=10)

    job_id = 'testing'
    await arq_redis.enqueue_job('retry_job', _job_id=job_id)

    first_worker: Worker = worker(functions=[func(retry_job, name='retry_job')])
    second_worker: Worker = worker(functions=[func(retry_job, name='retry_job')])

    # Both workers start with clean counters.
    for w in (first_worker, second_worker):
        assert w.jobs_complete == 0
        assert w.jobs_failed == 0
        assert w.jobs_retried == 0

    await first_worker.start_jobs([job_id.encode()])
    await asyncio.gather(*first_worker.tasks.values())

    await second_worker.start_jobs([job_id.encode()])
    await asyncio.gather(*second_worker.tasks.values())

    # Only the first worker should have processed (and retried) the job.
    assert first_worker.jobs_complete == 0
    assert first_worker.jobs_failed == 0
    assert first_worker.jobs_retried == 1

    # The second worker must see the future score and leave the job alone.
    assert second_worker.jobs_complete == 0
    assert second_worker.jobs_failed == 0
    assert second_worker.jobs_retried == 0


async def test_job_successful_no_result_logging(arq_redis: ArqRedis, worker, caplog):
caplog.set_level(logging.INFO)
await arq_redis.enqueue_job('foobar', _job_id='testing')
Expand Down

0 comments on commit 2f752e2

Please sign in to comment.