Skip to content

Commit

Permalink
fix(job): make sure json.dumps return JSON compliant JSON [python] (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
saulane authored Aug 2, 2024
1 parent 00c3037 commit 4441711
Show file tree
Hide file tree
Showing 4 changed files with 43 additions and 5 deletions.
2 changes: 1 addition & 1 deletion python/bullmq/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ async def saveStacktrace(self, pipe, err:str):
self.stacktrace = self.stacktrace[-(stackTraceLimit-1):stackTraceLimit]

keys, args = self.scripts.saveStacktraceArgs(
self.id, json.dumps(self.stacktrace, separators=(',', ':')), err)
self.id, json.dumps(self.stacktrace, separators=(',', ':'), allow_nan=False), err)

await self.scripts.commands["saveStacktrace"](keys=keys, args=args, client=pipe)

Expand Down
8 changes: 4 additions & 4 deletions python/bullmq/scripts.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ def addJobArgs(self, job: Job, waiting_children_key: str|None):
# ARGV[1] msgpacked arguments array
# [9] repeat job key

jsonData = json.dumps(job.data, separators=(',', ':'))
jsonData = json.dumps(job.data, separators=(',', ':'), allow_nan=False)
packedOpts = msgpack.packb(job.opts)

parent = job.parent
Expand Down Expand Up @@ -378,7 +378,7 @@ async def changePriority(self, job_id: str, priority:int = 0, lifo:bool = False)

async def updateData(self, job_id: str, data):
keys = [self.toKey(job_id)]
data_json = json.dumps(data, separators=(',', ':'))
data_json = json.dumps(data, separators=(',', ':'), allow_nan=False)
args = [data_json]

result = await self.commands["updateData"](keys=keys, args=args)
Expand Down Expand Up @@ -486,7 +486,7 @@ def moveToFailed(self, job: Job, failedReason: str, removeOnFailed, token: str,

async def updateProgress(self, job_id: str, progress):
keys = [self.toKey(job_id), self.keys['events'], self.keys['meta']]
progress_json = json.dumps(progress, separators=(',', ':'))
progress_json = json.dumps(progress, separators=(',', ':'), allow_nan=False)
args = [job_id, progress_json]
result = await self.commands["updateProgress"](keys=keys, args=args)

Expand All @@ -497,7 +497,7 @@ async def updateProgress(self, job_id: str, progress):

def moveToFinishedArgs(self, job: Job, val: Any, propVal: str, shouldRemove, target, token: str,
opts: dict, fetchNext=True) -> list[Any] | None:
transformed_value = json.dumps(val, separators=(',', ':'))
transformed_value = json.dumps(val, separators=(',', ':'), allow_nan=False)
timestamp = round(time.time() * 1000)
metricsKey = self.toKey('metrics:' + target)

Expand Down
8 changes: 8 additions & 0 deletions python/tests/job_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,14 @@ async def test_update_job_data(self):

await queue.close()

async def test_job_data_json_compliant(self):
queue = Queue(queueName)
job = await queue.add("test", {"foo": "bar"}, {})
with self.assertRaises(ValueError):
await job.updateData({"baz": float('nan')})

await queue.close()

async def test_update_job_data_when_is_removed(self):
queue = Queue(queueName)
job = await queue.add("test", {"foo": "bar"}, {})
Expand Down
30 changes: 30 additions & 0 deletions python/tests/worker_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,36 @@ async def process(job: Job, token: str):
await worker.close(force=True)
await queue.close()

async def test_process_job_fail_with_nan_as_return_value(self):
queue = Queue(queueName)
data = {"foo": "bar"}
job = await queue.add("test-job", data, {"removeOnComplete": False})

failedReason = "Out of range float values are not JSON compliant"

async def process(job: Job, token: str):
print("Processing job", job)
return float('nan')

worker = Worker(queueName, process)

processing = Future()
worker.on("failed", lambda job, result: processing.set_result(None))
await processing
failedJob = await Job.fromId(queue, job.id)


self.assertEqual(failedJob.id, job.id)
self.assertEqual(failedJob.attemptsMade, 1)
self.assertEqual(failedJob.data, data)
self.assertEqual(failedJob.failedReason, f'"{failedReason}"')
self.assertEqual(len(failedJob.stacktrace), 1)
self.assertEqual(failedJob.returnvalue, None)
self.assertNotEqual(failedJob.finishedOn, None)

await worker.close(force=True)
await queue.close()

async def test_process_jobs_fail(self):
queue = Queue(queueName)
data = {"foo": "bar"}
Expand Down

0 comments on commit 4441711

Please sign in to comment.