In-memory FIFO queue for concurrent task execution. Used to execute tasks concurrently with optional control (via semaphore) over the max number of tasks running at the same time.
Features:
- Queue processing summary logging
- Introspection of failed, retried, and succeeded tasks
- Task retries (optional)
- Task execution timeout (optional)
- Queue processing with semaphore (optional)
- Batch size control (optional)
TOC:
async-task-queue requires Python 3.6 or above.
pip install async-task-queue
import logging
from async_task_queue import AsyncTask, AsyncTaskQueue
# Initialize a logger
logger = logging.getLogger("foo")
# Initialize an AsyncTaskQueue where:
# - At most 5 tasks are running concurrently
# - Number of tasks executing concurrently should be limited by a semaphore
# - Failed tasks should be retried (default behavior)
# - Executing the tasks queued should timeout and be cancelled after 5 minutes
task_queue = AsyncTaskQueue(
logger,
use_semaphore=True,
batch_size=5,
execution_timeout=300
)
# Add async tasks to the queue
task_queue.enqueue(
[
AsyncTask(some_coroutine, *args, **kwargs) for args, kwargs in some_args_kwargs
]
)
# Start processing the queue
await task_queue.execute()
To develop async-task-queue, install dependencies and enable the pre-commit hook:
pip install pre-commit tox
pre-commit install
To run tests:
tox