diff --git a/arq/worker.py b/arq/worker.py index 4c33b677..8fcd5fc8 100644 --- a/arq/worker.py +++ b/arq/worker.py @@ -447,8 +447,10 @@ async def start_jobs(self, job_ids: List[bytes]) -> None: await pipe.watch(in_progress_key) ongoing_exists = await pipe.exists(in_progress_key) score = await pipe.zscore(self.queue_name, job_id) - if ongoing_exists or not score: + if ongoing_exists or not score or score > timestamp_ms(): # job already started elsewhere, or already finished and removed from queue + # if score > ts_now, + # it means probably the job was re-enqueued with a delay in another worker self.job_counter = self.job_counter - 1 self.sem.release() logger.debug('job %s already running elsewhere', job_id) diff --git a/tests/test_worker.py b/tests/test_worker.py index 141257c1..93fbc7f0 100644 --- a/tests/test_worker.py +++ b/tests/test_worker.py @@ -160,6 +160,40 @@ async def test_job_successful(arq_redis: ArqRedis, worker, caplog): assert 'X.XXs → testing:foobar()\n X.XXs ← testing:foobar ● 42' in log +async def test_job_retry_race_condition(arq_redis: ArqRedis, worker): + async def retry_job(ctx): + if ctx['job_try'] == 1: + raise Retry(defer=10) + + job_id = 'testing' + await arq_redis.enqueue_job('retry_job', _job_id=job_id) + + worker_one: Worker = worker(functions=[func(retry_job, name='retry_job')]) + worker_two: Worker = worker(functions=[func(retry_job, name='retry_job')]) + + assert worker_one.jobs_complete == 0 + assert worker_one.jobs_failed == 0 + assert worker_one.jobs_retried == 0 + + assert worker_two.jobs_complete == 0 + assert worker_two.jobs_failed == 0 + assert worker_two.jobs_retried == 0 + + await worker_one.start_jobs([job_id.encode()]) + await asyncio.gather(*worker_one.tasks.values()) + + await worker_two.start_jobs([job_id.encode()]) + await asyncio.gather(*worker_two.tasks.values()) + + assert worker_one.jobs_complete == 0 + assert worker_one.jobs_failed == 0 + assert worker_one.jobs_retried == 1 + + assert worker_two.jobs_complete == 0 + assert worker_two.jobs_failed == 0 + assert worker_two.jobs_retried == 0 + + async def test_job_successful_no_result_logging(arq_redis: ArqRedis, worker, caplog): caplog.set_level(logging.INFO) await arq_redis.enqueue_job('foobar', _job_id='testing')