Skip to content

Commit

Permalink
Added queue_name attribute to JobResult (#198)
Browse files Browse the repository at this point in the history
* Added queue_name attribute to job result

* cope with old jobs where queue_name "q" is not available

Co-authored-by: Samuel Colvin <s@muelcolvin.com>
  • Loading branch information
ccharlesgb and samuelcolvin authored Apr 21, 2021
1 parent 0d43250 commit d594c9c
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 4 deletions.
4 changes: 4 additions & 0 deletions arq/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ class JobResult(JobDef):
result: Any
start_time: datetime
finish_time: datetime
queue_name: str
job_id: Optional[str] = None


Expand Down Expand Up @@ -172,6 +173,7 @@ def serialize_result(
start_ms: int,
finished_ms: int,
ref: str,
queue_name: str,
*,
serializer: Optional[Serializer] = None,
) -> Optional[bytes]:
Expand All @@ -185,6 +187,7 @@ def serialize_result(
'r': result,
'st': start_ms,
'ft': finished_ms,
'q': queue_name,
}
if serializer is None:
serializer = pickle.dumps
Expand Down Expand Up @@ -247,6 +250,7 @@ def deserialize_result(r: bytes, *, deserializer: Optional[Deserializer] = None)
result=d['r'],
start_time=ms_to_datetime(d['st']),
finish_time=ms_to_datetime(d['ft']),
queue_name=d.get('q', '<unknown>'),
)
except Exception as e:
raise DeserializationError('unable to deserialize job result') from e
3 changes: 3 additions & 0 deletions arq/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -384,6 +384,7 @@ async def job_failed(exc: Exception) -> None:
finished_ms=timestamp_ms(),
ref=f'{job_id}:{function_name}',
serializer=self.job_serializer,
queue_name=self.queue_name,
)
await asyncio.shield(self.abort_job(job_id, result_data_))

Expand Down Expand Up @@ -431,6 +432,7 @@ async def job_failed(exc: Exception) -> None:
start_ms,
timestamp_ms(),
ref,
self.queue_name,
serializer=self.job_serializer,
)
return await asyncio.shield(self.abort_job(job_id, result_data))
Expand Down Expand Up @@ -506,6 +508,7 @@ async def job_failed(exc: Exception) -> None:
start_ms,
finished_ms,
ref,
self.queue_name,
serializer=self.job_serializer,
)

Expand Down
24 changes: 20 additions & 4 deletions tests/test_jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ async def foobar(ctx, *args, **kwargs):
assert r == 42
assert JobStatus.complete == await j.status()
info = await j.info()
expected_queue_name = queue_name or arq_redis.default_queue_name
assert info == JobResult(
job_try=1,
function='foobar',
Expand All @@ -54,6 +55,7 @@ async def foobar(ctx, *args, **kwargs):
start_time=CloseToNow(),
finish_time=CloseToNow(),
score=None,
queue_name=expected_queue_name,
)
results = await arq_redis.all_job_results()
assert results == [
Expand All @@ -68,6 +70,7 @@ async def foobar(ctx, *args, **kwargs):
start_time=CloseToNow(),
finish_time=CloseToNow(),
score=None,
queue_name=expected_queue_name,
job_id=j.job_id,
)
]
Expand All @@ -92,9 +95,9 @@ class Foobar:
def __getstate__(self):
raise TypeError("this doesn't pickle")

r1 = serialize_result('foobar', (1,), {}, 1, 123, True, Foobar(), 123, 123, 'testing')
r1 = serialize_result('foobar', (1,), {}, 1, 123, True, Foobar(), 123, 123, 'testing', 'test-queue')
assert isinstance(r1, bytes)
r2 = serialize_result('foobar', (Foobar(),), {}, 1, 123, True, Foobar(), 123, 123, 'testing')
r2 = serialize_result('foobar', (Foobar(),), {}, 1, 123, True, Foobar(), 123, 123, 'testing', 'test-queue')
assert r2 is None


Expand All @@ -106,10 +109,23 @@ def __getstate__(self):
def custom_serializer(x):
return b'0123456789'

r1 = serialize_result('foobar', (1,), {}, 1, 123, True, Foobar(), 123, 123, 'testing', serializer=custom_serializer)
r1 = serialize_result(
'foobar', (1,), {}, 1, 123, True, Foobar(), 123, 123, 'testing', 'test-queue', serializer=custom_serializer
)
assert r1 == b'0123456789'
r2 = serialize_result(
'foobar', (Foobar(),), {}, 1, 123, True, Foobar(), 123, 123, 'testing', serializer=custom_serializer
'foobar',
(Foobar(),),
{},
1,
123,
True,
Foobar(),
123,
123,
'testing',
'test-queue',
serializer=custom_serializer,
)
assert r2 == b'0123456789'

Expand Down

0 comments on commit d594c9c

Please sign in to comment.