Skip to content

Commit

Permalink
feat: add job_id to JobDef, closing #376
Browse files Browse the repository at this point in the history
  • Loading branch information
JonasKs committed Dec 15, 2022
1 parent bfa8e68 commit 405e4fd
Show file tree
Hide file tree
Showing 5 changed files with 34 additions and 9 deletions.
1 change: 1 addition & 0 deletions arq/connections.py
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,7 @@ async def _get_job_def(self, job_id: bytes, score: int) -> JobDef:
assert v is not None, f'job "{key}" not found'
jd = deserialize_job(v, deserializer=self.job_deserializer)
jd.score = score
jd.job_id = job_id.decode()
return jd

async def queued_jobs(self, *, queue_name: Optional[str] = None) -> List[JobDef]:
Expand Down
6 changes: 5 additions & 1 deletion arq/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ class JobDef:
job_try: int
enqueue_time: datetime
score: Optional[int]
job_id: Optional[str]

def __post_init__(self) -> None:
if isinstance(self.score, float):
Expand All @@ -60,7 +61,6 @@ class JobResult(JobDef):
start_time: datetime
finish_time: datetime
queue_name: str
job_id: Optional[str] = None


class Job:
Expand Down Expand Up @@ -238,6 +238,7 @@ def serialize_result(
finished_ms: int,
ref: str,
queue_name: str,
job_id: str,
*,
serializer: Optional[Serializer] = None,
) -> Optional[bytes]:
Expand All @@ -252,6 +253,7 @@ def serialize_result(
'st': start_ms,
'ft': finished_ms,
'q': queue_name,
'id': job_id,
}
if serializer is None:
serializer = pickle.dumps
Expand Down Expand Up @@ -281,6 +283,7 @@ def deserialize_job(r: bytes, *, deserializer: Optional[Deserializer] = None) ->
job_try=d['t'],
enqueue_time=ms_to_datetime(d['et']),
score=None,
job_id=None,
)
except Exception as e:
raise DeserializationError('unable to deserialize job') from e
Expand Down Expand Up @@ -315,6 +318,7 @@ def deserialize_result(r: bytes, *, deserializer: Optional[Deserializer] = None)
start_time=ms_to_datetime(d['st']),
finish_time=ms_to_datetime(d['ft']),
queue_name=d.get('q', '<unknown>'),
job_id=d.get('id', '<unknown>'),
)
except Exception as e:
raise DeserializationError('unable to deserialize job result') from e
3 changes: 3 additions & 0 deletions arq/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -484,6 +484,7 @@ async def job_failed(exc: BaseException) -> None:
ref=f'{job_id}:{function_name}',
serializer=self.job_serializer,
queue_name=self.queue_name,
job_id=job_id,
)
await asyncio.shield(self.finish_failed_job(job_id, result_data_))

Expand Down Expand Up @@ -539,6 +540,7 @@ async def job_failed(exc: BaseException) -> None:
timestamp_ms(),
ref,
self.queue_name,
job_id=job_id,
serializer=self.job_serializer,
)
return await asyncio.shield(self.finish_failed_job(job_id, result_data))
Expand Down Expand Up @@ -632,6 +634,7 @@ async def job_failed(exc: BaseException) -> None:
finished_ms,
ref,
self.queue_name,
job_id=job_id,
serializer=self.job_serializer,
)

Expand Down
24 changes: 19 additions & 5 deletions tests/test_jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import pickle

import pytest
from dirty_equals import IsNow
from dirty_equals import IsNow, IsStr

from arq import Worker, func
from arq.connections import ArqRedis, RedisSettings, create_pool
Expand Down Expand Up @@ -89,6 +89,7 @@ async def foobar(ctx, *args, **kwargs):
finish_time=IsNow(tz='utc'),
score=None,
queue_name=expected_queue_name,
job_id=IsStr(),
)
results = await arq_redis.all_job_results()
assert results == [
Expand Down Expand Up @@ -139,9 +140,9 @@ class Foobar:
def __getstate__(self):
raise TypeError("this doesn't pickle")

r1 = serialize_result('foobar', (1,), {}, 1, 123, True, Foobar(), 123, 123, 'testing', 'test-queue')
r1 = serialize_result('foobar', (1,), {}, 1, 123, True, Foobar(), 123, 123, 'testing', 'test-queue', 'job_1')
assert isinstance(r1, bytes)
r2 = serialize_result('foobar', (Foobar(),), {}, 1, 123, True, Foobar(), 123, 123, 'testing', 'test-queue')
r2 = serialize_result('foobar', (Foobar(),), {}, 1, 123, True, Foobar(), 123, 123, 'testing', 'test-queue', 'job_1')
assert r2 is None


Expand All @@ -154,7 +155,19 @@ def custom_serializer(x):
return b'0123456789'

r1 = serialize_result(
'foobar', (1,), {}, 1, 123, True, Foobar(), 123, 123, 'testing', 'test-queue', serializer=custom_serializer
'foobar',
(1,),
{},
1,
123,
True,
Foobar(),
123,
123,
'testing',
'test-queue',
'job_1',
serializer=custom_serializer,
)
assert r1 == b'0123456789'
r2 = serialize_result(
Expand All @@ -169,6 +182,7 @@ def custom_serializer(x):
123,
'testing',
'test-queue',
'job_1',
serializer=custom_serializer,
)
assert r2 == b'0123456789'
Expand Down Expand Up @@ -213,7 +227,7 @@ async def test_get_job_result(arq_redis: ArqRedis):

async def test_result_pole_delay_dep(arq_redis: ArqRedis):
j = Job('foobar', arq_redis)
r = serialize_result('foobar', (1,), {}, 1, 123, True, 42, 123, 123, 'testing', 'test-queue')
r = serialize_result('foobar', (1,), {}, 1, 123, True, 42, 123, 123, 'testing', 'test-queue', 'job_1')
await arq_redis.set(result_key_prefix + j.job_id, r)
with pytest.warns(
DeprecationWarning, match='"pole_delay" is deprecated, use the correct spelling "poll_delay" instead'
Expand Down
9 changes: 6 additions & 3 deletions tests/test_main.py
Original file line number Diff line number Diff line change
Expand Up @@ -238,11 +238,11 @@ async def foobar(ctx):


async def test_get_jobs(arq_redis: ArqRedis):
await arq_redis.enqueue_job('foobar', a=1, b=2, c=3)
await arq_redis.enqueue_job('foobar', a=1, b=2, c=3, _job_id='1')
await asyncio.sleep(0.01)
await arq_redis.enqueue_job('second', 4, b=5, c=6)
await arq_redis.enqueue_job('second', 4, b=5, c=6, _job_id='2')
await asyncio.sleep(0.01)
await arq_redis.enqueue_job('third', 7, b=8)
await arq_redis.enqueue_job('third', 7, b=8, _job_id='3')
jobs = await arq_redis.queued_jobs()
assert [dataclasses.asdict(j) for j in jobs] == [
{
Expand All @@ -252,6 +252,7 @@ async def test_get_jobs(arq_redis: ArqRedis):
'job_try': None,
'enqueue_time': IsNow(tz='utc'),
'score': IsInt(),
'job_id': '1',
},
{
'function': 'second',
Expand All @@ -260,6 +261,7 @@ async def test_get_jobs(arq_redis: ArqRedis):
'job_try': None,
'enqueue_time': IsNow(tz='utc'),
'score': IsInt(),
'job_id': '2',
},
{
'function': 'third',
Expand All @@ -268,6 +270,7 @@ async def test_get_jobs(arq_redis: ArqRedis):
'job_try': None,
'enqueue_time': IsNow(tz='utc'),
'score': IsInt(),
'job_id': '3',
},
]
assert jobs[0].score < jobs[1].score < jobs[2].score
Expand Down

0 comments on commit 405e4fd

Please sign in to comment.