A work queue, on top of a redis database, with implementations in Python, Rust, Go, Node.js (TypeScript) and Dotnet (C#).
This is the Python implementation. For an overview of how the work queue works, its limitations, and the general concepts and implementations in other languages, please read the redis-work-queue readme.
```python
from redis import Redis

from redis_work_queue import KeyPrefix, WorkQueue

db = Redis(host='your-redis-server')

work_queue = WorkQueue(KeyPrefix("example_work_queue"))
```
```python
from redis_work_queue import Item

# Create an item from bytes
bytes_item = Item(b"[1,2,3]")

# Create an item from a string
string_item = Item('[1,2,3]')

# Create an item by json serializing an object
json_item = Item.from_json_data([1, 2, 3])

# Retrieve an item's data, as bytes:
assert bytes_item.data() == b"[1,2,3]"
assert bytes_item.data() == string_item.data()
assert bytes_item.data() == json_item.data()

# Parse an item's data as JSON:
assert bytes_item.data_json() == [1, 2, 3]
```
```python
work_queue.add_item(db, item)
```
If you know that items have unique IDs, which aren't the same as any already in the queue, you can instead use:
```python
work_queue.add_unique_item(db, item)
```
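A sketch of how you might use this, assuming `add_unique_item` returns a boolean reporting whether the item was actually enqueued (check the return type in your version of the library — the bool here is an assumption, and `enqueue_once` is a hypothetical helper, not part of the API):

```python
# Sketch, assuming add_unique_item returns True when the item was newly
# added and False when an item with the same ID was already queued.
def enqueue_once(work_queue, db, item):
    if work_queue.add_unique_item(db, item):
        return "added"
    return "duplicate"
```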
Please read the documentation on leasing and completing items.
```python
from redis_work_queue import Item

while True:
    # Wait for a job with no timeout and a lease time of 5 seconds.
    # Use work_queue.lease(db, 5, timeout=10) to time out and return `None` after 10 seconds.
    job: Item | None = work_queue.lease(db, 5)
    assert job is not None
    do_some_work(job)
    work_queue.complete(db, job)
```
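When a timeout is passed, `lease` can return `None`, so a long-running worker should handle that case rather than asserting. A minimal sketch (the `handle` and `should_stop` callables are illustrative assumptions, not part of the library):

```python
# Sketch: a worker loop that leases with a timeout so it can re-check a
# stop condition between jobs instead of blocking forever.
def run_worker(work_queue, db, handle, should_stop):
    while not should_stop():
        # With timeout=10, lease returns None if no job arrives in 10 seconds.
        job = work_queue.lease(db, 5, timeout=10)
        if job is None:
            continue  # timed out: loop around and re-check the stop condition
        handle(job)
        work_queue.complete(db, job)
```

Polling with a timeout like this lets the worker shut down cleanly, at the cost of a short delay between the stop request and the loop noticing it.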
Please read the documentation on handling errors.
```python
from redis_work_queue import Item

while True:
    # Wait for a job with no timeout and a lease time of 5 seconds.
    job: Item = work_queue.lease(db, 5)
    try:
        do_some_work(job)
    except Exception as err:
        if should_retry(err):
            # Drop a job that should be retried - it will be returned to the work queue after
            # the (5 second) lease expires.
            pass
        else:
            # Errors that shouldn't cause a retry should mark the job as complete so it isn't
            # tried again.
            log_error(err)
            work_queue.complete(db, job)
        continue
    # Mark successful jobs as complete
    work_queue.complete(db, job)
```
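`should_retry` and `log_error` above are application-defined helpers, not part of the library. A hypothetical `should_retry` might simply classify exceptions by type:

```python
# Hypothetical helper: retry transient failures (e.g. network errors),
# and treat everything else as permanent so the job is completed
# (dropped) instead of being retried forever.
def should_retry(err: Exception) -> bool:
    return isinstance(err, (ConnectionError, TimeoutError))
```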