Skip to content

Commit

Permalink
(#25) Add a Django sample app
Browse files Browse the repository at this point in the history
* Create sample celery task usage
* Include compare celery vs aiotaskq sample code in tests
* Add ref to sample code in README, add sample code README
* Include populate_db step in the tests

Signed-off-by: Imran Ariffin <ariffin.imran@gmail.com>
  • Loading branch information
imranariffin committed Oct 27, 2024
1 parent a6b0149 commit 1b90962
Show file tree
Hide file tree
Showing 32 changed files with 779 additions and 10 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,9 @@ target/
profile_default/
ipython_config.py

# aiotaskq test outputs
src/tests/apps/sample_app_django/out-*.json

# pyenv
# For a library or package, you might want to ignore these files since the code is
# intended to run in multiple environments; otherwise, check them in:
Expand Down
32 changes: 32 additions & 0 deletions .vscode/launch.json
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
// For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387
"version": "0.2.0",
"configurations": [

{
"name": "Python: Attach",
"type": "python",
Expand Down Expand Up @@ -54,6 +55,37 @@
"tests.apps.simple_app"
],
"console": "integratedTerminal",
},
{
"name": "Celery Worker (Django Sample App)",
"type": "python",
"request": "launch",
"module": "celery",
"args": [
"-A",
"sample_app_django",
"worker",
"--concurrency",
"4",
],
"console": "integratedTerminal",
"cwd": "${workspaceFolder}/src/tests/apps/sample_app_django/",
"justMyCode": false
},
{
"name": "AioTaskQ Worker (Django Sample App)",
"type": "python",
"request": "launch",
"module": "aiotaskq",
"args": [
"worker",
"sample_app_django",
"--concurrency",
"4",
],
"console": "integratedTerminal",
"cwd": "${workspaceFolder}/src/tests/apps/sample_app_django/",
"justMyCode": false
}
]
}
4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,10 @@ Plus, it is also fully-typed for better productivity and correctness.

Give it a try and let us know if you like it. For questions or feedback feel to file issues on this repository.

## Sampe codes

1. [Simple Django App](/src/tests/apps/sample_app_django/README.md)

## Example Usage
Install aiotaskq
```bash
Expand Down
9 changes: 9 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,12 @@ services:
command: redis-cli -h redis
depends_on:
- redis

db:
image: "postgres"
ports:
- 5432:5432
environment:
- POSTGRES_USER=postgres
- POSTGRES_PASSWORD=postgres
- POSTGRES_DB=sample_app_django
5 changes: 3 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ dependencies = [
"typer >= 0.4.0, < 0.5.0",
]
name = "aiotaskq"
version = "0.0.13"
version = "0.0.14"
readme = "README.md"
description = "A simple asynchronous task queue"
authors = [
Expand All @@ -31,12 +31,13 @@ license = { file = "LICENSE" }
dev = [
"black >= 22.2.0, < 23.0.0",
"coverage >= 6.4.0, < 6.5.0",
"django-stubs >= 5.1.0, < 5.2.0",
"mypy >= 0.931, < 1.0",
"mypy-extensions >= 0.4.0, < 0.5.0",
"pytest-asyncio >= 0.19.0, < 0.20.0",
"pylint >= 2.14.0, < 2.15.0",
"pytest >= 7.1.0, < 7.2.0",
"typing_extensions >= 4.1.1, < 4.2.0",
"typing_extensions >= 4.11.0, < 4.12.0",
]

[project.urls]
Expand Down
7 changes: 3 additions & 4 deletions src/aiotaskq/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,9 @@ async def main():

import tomlkit

from .app import App
from .task import task

with Path("pyproject.toml").open("r", encoding="utf-8") as f:
toml_dict = tomlkit.loads(f.read())
__version__ = toml_dict["project"]["version"]

__all__ = ["__version__", "task"]
__version__ = "0.0.14"
__all__ = ["__version__", "task", "App"]
48 changes: 48 additions & 0 deletions src/aiotaskq/app.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
"""
Define the aiotaskq application instance.
This app instance provides access to all tasks defined within the application.
"""

from importlib import import_module
from typing import TYPE_CHECKING, Any


if TYPE_CHECKING:
from .task import Task


class App:
"""Define the aiotaskq application instance."""

_task_registry: dict[str, "Task"] = {}

def __getattribute__(self, name: str, /) -> Any:
"""Get access to all task instances defined within the application."""

task_registry = object.__getattribute__(self, "_task_registry")
if name in task_registry:
return task_registry[name]
return object.__getattribute__(self, name)

def autodiscover_tasks(self, tasks_module_name: str = "tasks"):
"""
Search for all tasks defined within the application and imported them.
The tasks are expected to be defined in files named as "tasks.py".
"""

import django # pylint: disable=import-outside-toplevel
from django.apps import apps # pylint: disable=import-outside-toplevel

django.setup()

module_names: list[str] = [config.name for config in apps.get_app_configs()]
for module_name in module_names:
_ = import_module(module_name)
try:
_ = import_module(f"{module_name}.{tasks_module_name}")
except ModuleNotFoundError:
pass
else:
pass
2 changes: 1 addition & 1 deletion src/aiotaskq/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ def serialization_type() -> SerializationType:
@staticmethod
def log_level() -> int:
"""Return the log level as provided via env var LOG_LEVEL."""
level: int = int(environ.get("AIOTASKQ_LOG_LEVEL", logging.DEBUG))
level: int = getattr(logging, environ.get("AIOTASKQ_LOG_LEVEL", "DEBUG"))
return level

@staticmethod
Expand Down
34 changes: 34 additions & 0 deletions src/aiotaskq/utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
"""Define util functions for use within the whole library."""

import os
import sys
from contextlib import contextmanager
from importlib import import_module


def import_from_cwd(import_path):
"""Import module as if the caller is located in current working directory (cwd)."""
with _cwd_in_path():
return import_module(import_path)


# Private region


@contextmanager
def _cwd_in_path():
"""Context adding the current working directory to sys.path."""
cwd = os.getcwd()
if cwd in sys.path:
yield
else:
sys.path.insert(0, cwd)
try:
yield cwd
finally:
try:
sys.path.remove(cwd)
except ValueError: # pragma: no cover
pass

# Private region ends
6 changes: 3 additions & 3 deletions src/aiotaskq/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
from abc import ABC, abstractmethod
import asyncio
from functools import cached_property
import importlib
import inspect
import logging
import multiprocessing
Expand All @@ -22,6 +21,7 @@
from .pubsub import PubSub
from .serde import Serialization
from .task import AsyncResult, Task
from .utils import import_from_cwd

logger = logging.getLogger(__name__)

Expand All @@ -40,7 +40,7 @@ class BaseWorker(ABC):
concurrency_manager: IConcurrencyManager

def __init__(self, app_import_path: str):
self.app = importlib.import_module(app_import_path)
self.app = import_from_cwd(app_import_path)

def run_forever(self) -> None:
"""
Expand Down Expand Up @@ -308,7 +308,7 @@ async def _execute_task_and_publish(
def validate_input(app_import_path: str) -> t.Optional[str]:
"""Validate all worker cli inputs and return an error string if any."""
try:
importlib.import_module(app_import_path)
import_from_cwd(app_import_path)
except ModuleNotFoundError:
return (
f"Error at argument `--app_import_path {app_import_path}`:"
Expand Down
Empty file.
6 changes: 6 additions & 0 deletions src/tests/apps/sample_app_django/api/apps.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
from django.apps import AppConfig


class ApiConfig(AppConfig):
default_auto_field = "django.db.models.BigAutoField"
name = "api"
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
import logging
import random
import string

from django.core.management import BaseCommand

from ...models import User, Order
from ...utils import chunker

logger = logging.getLogger(__name__)


class Command(BaseCommand):
def add_arguments(self, parser):
parser.add_argument("--n-users", type=int, default=1000)
parser.add_argument("--n-orders-per-user", type=int, default=1000)
parser.add_argument("--from-scratch", action="store_true")

def handle(self, *args, **options):
if options["from_scratch"]:
_delete_all()
_populate_users(n=options["n_users"])
_populate_orders(n_per_user=options["n_orders_per_user"])


def _delete_all():
r = Order.objects.all().delete()
logger.debug("Deleted %s: %s", Order.__name__, r)
r = User.objects.all().delete()
logger.debug("Deleted %s: %s", User.__name__, r)


def _populate_users(n: int) -> None:
users: list[User] = []
username_length: int = User._meta.get_field("username").max_length
random_username_generator = RandomStringGenerator(length=username_length)
for _ in range(n):
username_random: str = random_username_generator.generate()
user = User(username=username_random)
users.append(user)

for user_chunk in chunker(users, size=10_000):
User.objects.bulk_create(user_chunk)
print(f"Bulk-created {len(user_chunk)} random User(s)")


def _populate_orders(n_per_user: int) -> None:
user_ids: list[int] = list(User.objects.all().values_list("id", flat=True))
order_name_length: int = Order._meta.get_field("name").max_length
random_order_name_id_generator = RandomStringGenerator(length=order_name_length)

for user_ids_chunk in chunker(user_ids, size=1000):
orders: list[Order] = []

for user_id in user_ids_chunk:
for _ in range(n_per_user):
order = Order(
user_id=user_id,
name=random_order_name_id_generator.generate(),
price=random.uniform(0.0, 5_000.00),
)
orders.append(order)

Order.objects.bulk_create(orders)
print(f"Created {len(orders)} random Order(s) for {len(user_ids_chunk)} users")


class MaxRandomStringTryReached(Exception):
pass


class RandomStringGenerator:
max_tries = 10

def __init__(self, length: int):
self.length = length
self.existing: set[str] = set()

def generate(self) -> str:
tries = 0
while True:
s = ''.join(
random.choice(string.ascii_uppercase + string.digits) for _ in range(self.length)
)
if s not in self.existing:
self.existing.add(s)
return s
tries += 1
if tries > self.max_tries:
raise MaxRandomStringTryReached
Loading

0 comments on commit 1b90962

Please sign in to comment.