-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathjobs.py
80 lines (64 loc) · 2.81 KB
/
jobs.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
import redis.asyncio
import json
import time
from typing import Optional, TypedDict
class Job(TypedDict):
"""describes a job dictionary"""
name: str
"""the name of the job which corresponds to the import path in the jobs module
e.g., 'runners.charge' corresponds to the excecute function in jobs/charge.py
relative to the jobs root directory
"""
kwargs: dict
"""the keyword arguments to pass to the job; must be json serializable
the jobs will automatically be sent the integrations and graceful death handler
"""
queued_at: float
"""the time when the job was enqueued"""
class Jobs:
"""interface for queueing and retreiving jobs
acts as an asynchronous context manager"""
def __init__(self, conn: redis.asyncio.Redis) -> None:
"""initializes a new interface for queueing and retreiving jobs
Args:
conn (redis.asyncio.Redis): the redis connection to use
"""
self.conn: redis.asyncio.Redis = conn
"""the redis connection containing the jobs queue"""
self.queue_key: bytes = b"jobs:hot"
"""the key for the list in redis"""
async def __aenter__(self) -> "Jobs":
return self
async def __aexit__(self, exc_type, exc, tb) -> None:
pass
async def enqueue(self, name: str, **kwargs) -> None:
"""queues the job with the given name and key word arguments
the job is run as soon as possible, and is not retried regardless of success.
Args:
name (str): the name of the job which corresponds to the import path in the jobs module
e.g., 'runners.charge' corresponds to the excecute function in jobs/charge.py
relative to the jobs root directory
kwargs (dict): the keyword arguments to pass to the job; must be json serializable
the jobs will automatically be sent the integrations and graceful death handler
"""
job = {"name": name, "kwargs": kwargs, "queued_at": time.time()}
job_serd = json.dumps(job)
await self.conn.rpush(self.queue_key, job_serd.encode("utf-8"))
async def retrieve(self, timeout: float) -> Optional[Job]:
"""blocking retrieve of the oldest job in the queue, if there is one
Args:
timeout (float): maximum time in seconds to wait for a job to be enqueued
Returns:
(Job, None): The oldest job, if there is one
"""
response: Optional[tuple] = await self.conn.blpop(
self.queue_key, timeout=timeout
)
if response is None:
return None
job_serd_and_encoded: bytes = response[1]
print(repr(job_serd_and_encoded))
job_serd = job_serd_and_encoded.decode("utf-8")
print(repr(job_serd))
job = json.loads(job_serd)
return job