prevent duplicate jobs when result exists #138

Merged · 1 commit · Aug 2, 2019
4 changes: 4 additions & 0 deletions HISTORY.rst
@@ -3,6 +3,10 @@
 History
 -------
 
+v0.16.1 (unreleased)
+....................
+* prevent duplicate ``job_id`` when job result exists, fix #137
+
 v0.16 (2019-07-30)
 ..................
 * improved error when a job is aborted (eg. function not found)
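
For context on the changelog entry above: arq stores two Redis keys per job id, one set when the job is enqueued and one holding its result after the job finishes, and the fix below checks both before enqueuing. A minimal sketch of the key layout, using the prefixes from arq.constants (the example id is illustrative, not from this PR):

from arq.constants import job_key_prefix, result_key_prefix

job_id = 'some-unique-id'                # illustrative id
job_key = job_key_prefix + job_id        # set when the job is enqueued
result_key = result_key_prefix + job_id  # set when the job finishes, kept while its result is retained
# After this PR, enqueue_job declines to enqueue and returns None if either key already exists.
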
3 changes: 2 additions & 1 deletion arq/connections.py
@@ -83,8 +83,9 @@ async def enqueue_job(
 pipe.unwatch()
 pipe.watch(job_key)
 job_exists = pipe.exists(job_key)
+job_result_exists = pipe.exists(result_key_prefix + job_id)
 await pipe.execute()
-if await job_exists:
+if await job_exists or await job_result_exists:
     return
 
 enqueue_time_ms = timestamp_ms()
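
The caller-visible effect of the change above: enqueue_job returns a Job instance only when neither the job key nor the result key for the requested id exists, and returns None otherwise; the checks run inside the watched pipeline shown in the diff, so a competing enqueue of the same job_id can be detected before the job is written. A minimal sketch of that contract using the public API (the foobar coroutine and the default RedisSettings are illustrative, not part of this PR):

import asyncio

from arq.connections import RedisSettings, create_pool


async def foobar(ctx):
    # illustrative job function; it would be registered with a Worker to actually run
    return 42


async def main():
    redis = await create_pool(RedisSettings())

    job = await redis.enqueue_job('foobar', _job_id='unique-id')
    assert job is not None  # first enqueue for this id

    # While the job is still queued, or its result is still stored,
    # a repeat enqueue with the same id returns None instead of duplicating work.
    assert await redis.enqueue_job('foobar', _job_id='unique-id') is None


asyncio.run(main())
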
14 changes: 14 additions & 0 deletions tests/test_worker.py
@@ -9,6 +9,7 @@
 
 from arq.connections import ArqRedis
 from arq.constants import default_queue_name, health_check_key_suffix, job_key_prefix
+from arq.jobs import Job, JobStatus
 from arq.worker import FailedJobs, JobExecutionFailed, Retry, Worker, async_check_health, check_health, func, run_worker
 
 
@@ -403,3 +404,16 @@ async def test_many_jobs_expire(arq_redis: ArqRedis, worker, caplog):
     log = '\n'.join(r.message for r in caplog.records)
     assert 'job testing-0 expired' in log
     assert log.count(' expired') == 100
+
+
+async def test_repeat_job_result(arq_redis: ArqRedis, worker):
+    j1 = await arq_redis.enqueue_job('foobar', _job_id='job_id')
+    assert isinstance(j1, Job)
+    assert await j1.status() == JobStatus.queued
+
+    assert await arq_redis.enqueue_job('foobar', _job_id='job_id') is None
+
+    await worker(functions=[foobar]).run_check()
+    assert await j1.status() == JobStatus.complete
+
+    assert await arq_redis.enqueue_job('foobar', _job_id='job_id') is None
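
A brief note on the None return demonstrated above: a caller that wants to attach to the existing job rather than create a duplicate can construct a Job handle from the id and the pool. A sketch of that pattern (the enqueue_or_attach helper and the hard-coded 'foobar' job name are hypothetical, not part of this PR):

from arq.jobs import Job


async def enqueue_or_attach(redis, job_id: str):
    # Hypothetical helper: enqueue 'foobar', or fall back to a handle on the
    # already-existing job (or its stored result) when enqueue_job returns None
    # because the id is in use.
    job = await redis.enqueue_job('foobar', _job_id=job_id)
    if job is None:
        job = Job(job_id, redis)
    return job

The returned handle can then be inspected with job.status(), exactly as in the test above.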