diff --git a/.vscode/launch.json b/.vscode/launch.json index f09c113..ce6a481 100644 --- a/.vscode/launch.json +++ b/.vscode/launch.json @@ -1,47 +1,58 @@ -{ - // Use IntelliSense to learn about possible attributes. - // Hover to view descriptions of existing attributes. - // For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387 - "version": "0.2.0", - "configurations": [ - { - "name": "Python: Current File", - "type": "python", - "request": "launch", - "program": "${file}", - "console": "integratedTerminal", - "justMyCode": false - }, - { - "name": "FastAPI: Example", - "type": "python", - "request": "launch", - "module": "uvicorn", - "args": [ - "example.app.main:app", - "--reload", - ], - "justMyCode": false - }, - { - "name": "FastAPI: Test", - "type": "python", - "request": "launch", - "module": "uvicorn", - "args": [ - "test:app", - "--reload", - ], - "justMyCode": false - }, - { - "name": "Python: Debug Tests", - "type": "python", - "request": "launch", - "program": "${file}", - "purpose": ["debug-test"], - "console": "integratedTerminal", - "justMyCode": false - } - ] +{ + // Use IntelliSense to learn about possible attributes. + // Hover to view descriptions of existing attributes. + // For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387 + "version": "0.2.0", + "configurations": [ + { + "name": "Python: Current File", + "type": "python", + "request": "launch", + "program": "${file}", + "console": "integratedTerminal", + "justMyCode": false + }, + { + "name": "FastAPI: Example", + "type": "python", + "request": "launch", + "module": "uvicorn", + "args": [ + "example.app.main:app", + "--reload", + ], + "justMyCode": false + }, + { + "name": "FastAPI: Example2", + "type": "python", + "request": "launch", + "module": "uvicorn", + "args": [ + "guard:app", + "--reload", + ], + "justMyCode": false + }, + { + "name": "FastAPI: Test", + "type": "python", + "request": "launch", + "module": "uvicorn", + "args": [ + "test:app", + "--reload", + ], + "justMyCode": false + }, + { + "name": "Python: Debug Tests", + "type": "python", + "request": "launch", + "program": "${file}", + "purpose": ["debug-test"], + "console": "integratedTerminal", + "justMyCode": false + } + ] } \ No newline at end of file diff --git a/pest/__init__.py b/pest/__init__.py index ae9b390..56def72 100644 --- a/pest/__init__.py +++ b/pest/__init__.py @@ -1,48 +1,57 @@ -from .decorators.controller import api, controller, ctrl, router, rtr -from .decorators.handler import delete, get, head, options, patch, post, put, trace -from .decorators.module import dom, domain, mod, module -from .factory import Pest -from .metadata.types.injectable_meta import ( - ClassProvider, - ExistingProvider, - FactoryProvider, - ProviderBase, - Scope, - SingletonProvider, - ValueProvider, -) -from .utils.decorators import meta - -__all__ = [ - 'Pest', - # decorators - module - 'module', - 'mod', - 'domain', - 'dom', - # decorators - handler - 'get', - 'post', - 'put', - 'delete', - 'patch', - 'options', - 'head', - 'trace', - # decorators - controller - 'controller', - 'ctrl', - 'router', - 'rtr', - 'api', - # decorators - utils - 'meta', - # meta - providers - 'ProviderBase', - 'ClassProvider', - 'ValueProvider', - 'SingletonProvider', - 'FactoryProvider', - 'ExistingProvider', - 'Scope', -] +from .decorators.controller import api, controller, ctrl, router, rtr +from .decorators.guard import Guard, GuardCb, GuardExtra, use_guard +from .decorators.handler import delete, get, head, options, patch, post, put, trace +from .decorators.module import dom, domain, mod, module +from .factory import Pest +from .metadata.types.injectable_meta import ( + ClassProvider, + ExistingProvider, + FactoryProvider, + ProviderBase, + Scope, + SingletonProvider, + ValueProvider, +) +from .utils.decorators import meta + +guard = use_guard + +__all__ = [ + 'Pest', + # decorators - module + 'module', + 'mod', + 'domain', + 'dom', + # decorators - handler + 'get', + 'post', + 'put', + 'delete', + 'patch', + 'options', + 'head', + 'trace', + # decorators - controller + 'controller', + 'ctrl', + 'router', + 'rtr', + 'api', + # decorators - utils + 'meta', + # decorators - guard + 'Guard', + 'GuardCb', + 'GuardExtra', + 'use_guard', + 'guard', + # meta - providers + 'ProviderBase', + 'ClassProvider', + 'ValueProvider', + 'SingletonProvider', + 'FactoryProvider', + 'ExistingProvider', + 'Scope', +] diff --git a/pest/decorators/guard.py b/pest/decorators/guard.py new file mode 100644 index 0000000..a15fc15 --- /dev/null +++ b/pest/decorators/guard.py @@ -0,0 +1,146 @@ +from functools import wraps +from inspect import Parameter, getmembers, iscoroutinefunction, isfunction, signature +from typing import Any, Callable, Dict, List, Optional, Protocol, Tuple, Type, get_args + +from fastapi import Request + +from ..core.handler import HandlerFn +from ..exceptions.http.http import ForbiddenException +from ..metadata.meta import get_meta, get_meta_value +from ..metadata.types._meta import PestType + +GuardCb = Callable[[Dict[str, Any]], None] + + +class Guard(Protocol): + """๐Ÿ€ โ‡ base guard protocol""" + + def can_activate( + self, request: Request, *, context: Dict[str, Any], set_result: GuardCb + ) -> bool: + """๐Ÿ€ โ‡ determines if the request can be activated by the current request""" + ... + + +def use_guard(guard: Type[Guard]) -> Callable: + """๐Ÿ€ โ‡ decorator to apply a guard either to a single method or all methods in a class""" + + def decorator(target: Callable) -> Callable: + if isinstance(target, type): # If it's a class, apply to all methods + return _apply_guard_to_class(target, guard) + else: + return _apply_guard_to_method(target, guard) + + return decorator + + +class GuardExtra(Dict[str, Any]): + pass + + +def _extract_params(params: List[Parameter]) -> Tuple[Optional[Parameter], List[Parameter]]: + """ + extracts the request and all parameters annotated with "guard_extra" from a list of parameters + """ + request_param = None + extra_params = [] + + for param in params: + if param.annotation == Request: + request_param = param + elif param.annotation is GuardExtra: + extra_params.append(param) + else: + anns = get_args(param.annotation) + if len(anns) == 0: + continue + + typing, metas = anns[0], anns[1:] + if typing == GuardExtra or GuardExtra in metas: + extra_params.append(param) + + return request_param, extra_params + + +# applies the guard to a single method +def _apply_guard_to_method(func: Callable, guard: Type[Guard]) -> Callable: + sig = signature(func) + params: List[Parameter] = list(sig.parameters.values()) + + # check if there's any parameter annotated with type Request + request_parameter, extras = _extract_params(params) + request_was_in_original_sig = request_parameter is not None + + if request_parameter is None: + # add the parameter to the signature, annotated with type Request + request_param_name = '__request__' + params = [ + *params, + Parameter(request_param_name, Parameter.POSITIONAL_OR_KEYWORD, annotation=Request), + ] + else: + request_param_name = request_parameter.name + + # remove the extras the `params` list + params = [param for param in params if param not in extras] + + @wraps(func) + async def wrapper(*args: Any, **kwargs: Any) -> Any: + meta = get_meta(func) + + # try to extract the request object from kwargs or args + request = kwargs.get(request_param_name) + if request is None: + for arg in args: + if isinstance(arg, Request): + request = arg + break + + if not request: + raise ValueError('Request object not found in args or kwargs') + + extra_result: Dict[str, Any] = {} + + def set_result(result: Dict[str, Any]) -> None: + nonlocal extra_result + extra_result = result + + # apply the guard + guard_instance = guard() + if not guard_instance.can_activate(request, context=meta, set_result=set_result): + raise ForbiddenException('Not authorized') + + # if the request was not in the original signature, remove it from args/kwargs + if not request_was_in_original_sig: + kwargs.pop(request_param_name, None) + args = tuple([arg for arg in args if not isinstance(arg, Request)]) + + # add the extra result to the signature + for param in extras: + if param.annotation is GuardExtra: + kwargs[param.name] = extra_result + else: + kwargs[param.name] = extra_result.get(param.name, None) + + return await func(*args, **kwargs) if iscoroutinefunction(func) else func(*args, **kwargs) + + # update the signature to include the new 'request' parameter + setattr(wrapper, '__signature__', sig.replace(parameters=params)) + return wrapper + + +# applies the guard to all methods in a class +def _apply_guard_to_class(cls: type, guard: Type[Guard]) -> type: + members = getmembers(cls, lambda m: isfunction(m)) + handlers: List[HandlerFn] = [] + + for _, method in members: + meta_type = get_meta_value(method, key='meta_type', type=PestType, default=None) + if meta_type == PestType.HANDLER: + handlers.append(method) + + for handler in handlers: + replacement = _apply_guard_to_method(handler, guard) + setattr(cls, handler.__name__, replacement) + + return cls diff --git a/pest/utils/decorators.py b/pest/utils/decorators.py index 326dbc4..f588a06 100644 --- a/pest/utils/decorators.py +++ b/pest/utils/decorators.py @@ -1,22 +1,25 @@ -from dataclasses import asdict -from typing import Any, Callable, Mapping, Type, TypeVar, Union - -from ..metadata.meta import inject_metadata -from .protocols import DataclassInstance - -Cls = TypeVar('Cls', bound=Type[Any]) - - -def meta(meta: Union[Mapping[str, Any], DataclassInstance]) -> Callable[[Cls], Cls]: - """๐Ÿ€ ยป injects metadata into a class""" - - def wrapper(cls: Cls) -> Cls: - if isinstance(meta, Mapping): - metadata = meta - else: - metadata = asdict(meta) - - inject_metadata(cls, metadata) - return cls - - return wrapper +from dataclasses import asdict +from typing import Any, Callable, Mapping, Type, TypeVar, Union + +from ..metadata.meta import inject_metadata +from .protocols import DataclassInstance + +Cls = TypeVar('Cls', bound=Type[Any]) +Fn = TypeVar('Fn', bound=Callable[..., Any]) + + +def meta( + meta: Union[Mapping[str, Any], DataclassInstance] +) -> Callable[[Union[Cls, Fn]], Union[Cls, Fn]]: + """๐Ÿ€ ยป injects metadata into a class""" + + def decorator(callable: Union[Cls, Fn]) -> Union[Cls, Fn]: + if isinstance(meta, Mapping): + metadata = meta + else: + metadata = asdict(meta) + + inject_metadata(callable, metadata) + return callable + + return decorator diff --git a/tests/cfg/fixtures.py b/tests/cfg/fixtures.py index a83b024..0a4d9ec 100644 --- a/tests/cfg/fixtures.py +++ b/tests/cfg/fixtures.py @@ -1,97 +1,105 @@ -try: - from typing import TypeAlias, cast -except ImportError: - from typing_extensions import TypeAlias, cast - -from typing import Tuple - -import pytest -from fastapi import Request, Response -from fastapi.testclient import TestClient -from starlette.middleware.base import RequestResponseEndpoint - -from pest.core.application import PestApplication -from pest.core.module import Module -from pest.core.module import setup_module as _setup_module -from pest.factory import Pest -from tests.cfg.test_modules.di_scopes_primitives import Scoped, Transient - -from .test_apps.multi_singleton_app import app as multi_singleton_app -from .test_apps.todo_app import app as todo_app -from .test_modules.di_scopes_primitives import DIScopesModule, Singleton -from .test_modules.fastapi_dependencies import FastApiDependenciesModule -from .test_modules.fastapi_params import FastApiParamsModule -from .test_modules.pest_primitives import ( - Mod, - ModuleWithController, - ParentMod, -) - -TestApp: TypeAlias = Tuple[PestApplication, TestClient] - - -@pytest.fixture() -def mod() -> Mod: - return cast(Mod, _setup_module(Mod)) - - -@pytest.fixture() -def parent_mod() -> Module: - return _setup_module(ParentMod) - - -@pytest.fixture() -def module_with_controller() -> Module: - return _setup_module(ModuleWithController) - - -@pytest.fixture() -def app_n_client() -> TestApp: - app = todo_app.bootstrap_app() - client = TestClient(app) - return app, client - - -@pytest.fixture() -def multiple_singletons_app() -> TestApp: - app = multi_singleton_app.bootstrap_app() - client = TestClient(app) - return app, client - - -@pytest.fixture() -def di_app_n_client() -> TestApp: - app = Pest.create(root_module=DIScopesModule) - - @app.middleware('http') - async def middleware( - request: Request, - call_next: RequestResponseEndpoint, - scoped: Scoped, - singleton: Singleton, - transient: Transient, - ) -> Response: - response = await call_next(request) - response.headers['Scoped-Id'] = str(scoped.get_id()) - response.headers['Singleton-Id'] = str(singleton.get_id()) - response.headers['Transient-Id'] = str(transient.get_id()) - return response - - client = TestClient(app) - return app, client - - -@pytest.fixture() -def fastapi_params_app() -> TestApp: - app = Pest.create(root_module=FastApiParamsModule) - client = TestClient(app) - - return app, client - - -@pytest.fixture() -def fastapi_dependencies_app() -> TestApp: - app = Pest.create(root_module=FastApiDependenciesModule) - client = TestClient(app) - - return app, client +try: + from typing import TypeAlias, cast +except ImportError: + from typing_extensions import TypeAlias, cast + +from typing import Tuple + +import pytest +from fastapi import Request, Response +from fastapi.testclient import TestClient +from starlette.middleware.base import RequestResponseEndpoint + +from pest.core.application import PestApplication +from pest.core.module import Module +from pest.core.module import setup_module as _setup_module +from pest.factory import Pest +from tests.cfg.test_modules.di_scopes_primitives import Scoped, Transient + +from .test_apps.guards_app import app as guards_app +from .test_apps.multi_singleton_app import app as multi_singleton_app +from .test_apps.todo_app import app as todo_app +from .test_modules.di_scopes_primitives import DIScopesModule, Singleton +from .test_modules.fastapi_dependencies import FastApiDependenciesModule +from .test_modules.fastapi_params import FastApiParamsModule +from .test_modules.pest_primitives import ( + Mod, + ModuleWithController, + ParentMod, +) + +TestApp: TypeAlias = Tuple[PestApplication, TestClient] + + +@pytest.fixture() +def mod() -> Mod: + return cast(Mod, _setup_module(Mod)) + + +@pytest.fixture() +def parent_mod() -> Module: + return _setup_module(ParentMod) + + +@pytest.fixture() +def module_with_controller() -> Module: + return _setup_module(ModuleWithController) + + +@pytest.fixture() +def app_n_client() -> TestApp: + app = todo_app.bootstrap_app() + client = TestClient(app) + return app, client + + +@pytest.fixture() +def multiple_singletons_app() -> TestApp: + app = multi_singleton_app.bootstrap_app() + client = TestClient(app) + return app, client + + +@pytest.fixture() +def guards_annotated_app() -> TestApp: + app = guards_app.bootstrap_app() + client = TestClient(app) + return app, client + + +@pytest.fixture() +def di_app_n_client() -> TestApp: + app = Pest.create(root_module=DIScopesModule) + + @app.middleware('http') + async def middleware( + request: Request, + call_next: RequestResponseEndpoint, + scoped: Scoped, + singleton: Singleton, + transient: Transient, + ) -> Response: + response = await call_next(request) + response.headers['Scoped-Id'] = str(scoped.get_id()) + response.headers['Singleton-Id'] = str(singleton.get_id()) + response.headers['Transient-Id'] = str(transient.get_id()) + return response + + client = TestClient(app) + return app, client + + +@pytest.fixture() +def fastapi_params_app() -> TestApp: + app = Pest.create(root_module=FastApiParamsModule) + client = TestClient(app) + + return app, client + + +@pytest.fixture() +def fastapi_dependencies_app() -> TestApp: + app = Pest.create(root_module=FastApiDependenciesModule) + client = TestClient(app) + + return app, client diff --git a/tests/cfg/test_apps/guards_app/app.py b/tests/cfg/test_apps/guards_app/app.py new file mode 100644 index 0000000..7818df2 --- /dev/null +++ b/tests/cfg/test_apps/guards_app/app.py @@ -0,0 +1,73 @@ +import json +from base64 import b64decode +from dataclasses import dataclass +from typing import Any, Callable + +from fastapi import Request + +from pest import Guard, GuardCb, GuardExtra, Pest, controller, get, meta, module, use_guard +from pest.core.application import PestApplication +from pest.exceptions.http.http import UnauthorizedException + +try: + from typing import Annotated +except ImportError: + from typing_extensions import Annotated + + +@dataclass +class User: + id: int + name: str + role: str + + +class AuthGuard(Guard): + def can_activate(self, request: Request, *, context: Any, set_result: GuardCb) -> bool: + token = request.headers.get('Authorization') + roles = context.get('roles', []) + if not token: + raise UnauthorizedException('Token is required') + + try: + decoded_token = b64decode(token).decode() + token_data = json.loads(decoded_token) + if token_data.get('role') in roles: + set_result({'user': User(**token_data)}) + return True + else: + return False + except Exception: + return False + + +def roles(*role_list: str) -> Callable: + def wrapper(fn: Callable) -> Callable: + return meta({'roles': list(role_list)})(fn) + + return wrapper + + +@controller('/secure') +@use_guard(AuthGuard) +class Controller: + @get('/') + @roles('admin', 'superuser') + def annotated(self, user: Annotated[User, GuardExtra]) -> dict: + return {'message': "You're allowed to see this because your role is " + user.role} + + @get('/typed') + @roles('admin', 'superuser') + def get_typed(self, guard_result: GuardExtra) -> dict: + user: User = guard_result['user'] + return {'message': "You're allowed to see this because your role is " + user.role} + + +@module(controllers=[Controller]) +class AppModule: + pass + + +def bootstrap_app() -> PestApplication: + app = Pest.create(root_module=AppModule) + return app diff --git a/tests/test_guard.py b/tests/test_guard.py new file mode 100644 index 0000000..6e27402 --- /dev/null +++ b/tests/test_guard.py @@ -0,0 +1,47 @@ +import sys +from base64 import b64encode + +import pytest + + +@pytest.mark.asyncio +@pytest.mark.skipif(sys.version_info < (3, 9), reason='requires python3.9 or higher') +async def test_guards_annotated(guards_annotated_app) -> None: + """๐Ÿ€ guard :: annotated :: should run guards to decide if can activate route (>=3.9)""" + + _, client = guards_annotated_app + + admin_token = b64encode('{"id": 1, "name": "John Doe", "role": "admin"}'.encode()).decode() + user_token = b64encode('{"id": 1, "name": "Jane Doe", "role": "user"}'.encode()).decode() + + ok = client.get('/secure', headers={'Authorization': admin_token}).json() + assert ok == {'message': "You're allowed to see this because your role is admin"} + + forb = client.get('/secure', headers={'Authorization': user_token}) + assert forb.status_code == 403 + assert forb.json() == {'code': 403, 'error': 'Forbidden', 'message': 'Not authorized'} + + unauth = client.get('/secure') + assert unauth.status_code == 401 + assert unauth.json() == {'code': 401, 'error': 'Unauthorized', 'message': 'Token is required'} + + +@pytest.mark.asyncio +async def test_guards_typed(guards_annotated_app) -> None: + """๐Ÿ€ guard :: typed :: should run guards to decide if can activate route (>=3.8)""" + + _, client = guards_annotated_app + + admin_token = b64encode('{"id": 1, "name": "John Doe", "role": "admin"}'.encode()).decode() + user_token = b64encode('{"id": 1, "name": "Jane Doe", "role": "user"}'.encode()).decode() + + ok = client.get('/secure/typed', headers={'Authorization': admin_token}).json() + assert ok == {'message': "You're allowed to see this because your role is admin"} + + forb = client.get('/secure/typed', headers={'Authorization': user_token}) + assert forb.status_code == 403 + assert forb.json() == {'code': 403, 'error': 'Forbidden', 'message': 'Not authorized'} + + unauth = client.get('/secure/typed') + assert unauth.status_code == 401 + assert unauth.json() == {'code': 401, 'error': 'Unauthorized', 'message': 'Token is required'} diff --git a/tests/test_utils.py b/tests/test_utils.py index b0ff7ad..dcab0f8 100644 --- a/tests/test_utils.py +++ b/tests/test_utils.py @@ -1,118 +1,122 @@ -from dataclasses import dataclass - -from pest.core.module import setup_module as _setup_module -from pest.exceptions.http.http import exc_response -from pest.metadata.meta import get_meta, get_meta_value -from pest.utils.colorize import c -from pest.utils.decorators import meta -from pest.utils.module import _get_provider_name, as_tree - -from .cfg.test_modules.pest_primitives import FooModule - - -def test_colorize_text(): - """๐Ÿ€ utils :: colorize :: should return a colored string""" - colored = ( - f'๐Ÿค  >' - f'{c("Howdy", on_color="on_green")}' - f',{c("Cowboy", color="blue", attrs=["bold", "reverse"])}!' - ) - assert colored == '๐Ÿค  >\x1b[42mHowdy\x1b[0m,\x1b[7m\x1b[1m\x1b[34mCowboy\x1b[0m!' - - -def test_colorize_no_color_option(): - """๐Ÿ€ utils :: colorize :: should return the original string if color option is False""" - colored = f'๐Ÿค  > Howdy, {c("Cowboy", color="blue", attrs=["bold", "reverse"], no_color=True)}!' - assert colored == '๐Ÿค  > Howdy, Cowboy!' - - -def test_module_tree_generation(): - """๐Ÿ€ utils :: module :: should generate a tree representation of a module""" - module = _setup_module(FooModule) - - tree = as_tree(module) - - assert tree == ( - '\x1b[4m\x1b[32mFooModule\x1b[0m ๐Ÿ€\n โ”‚\n โ”‚\x1b[35m โ—‹ ProviderBaz\x1b[0m\n ' - 'โ”‚\x1b[34m โ–ก FooController\x1b[0m\n โ”œโ”€ Mod\n โ”‚\x1b[35m โ—‹ ProviderFoo\x1b[0m\n' - ' โ”‚\x1b[35m โ—‹ ProviderBar\x1b[0m\n' - ) - - class Foo: - provide = 'Bar' - - # test with str injection token - tree = _get_provider_name(Foo) - assert tree == 'Bar' - - -def test_meta_docorator_dict_meta(): - """๐Ÿ€ utils :: `meta` decorator :: should inject dict metadata into a class""" - - @dataclass - class QuuxMeta: - foo: str - baz: str - - @meta({'foo': 'bar', 'baz': 'qux'}) - class Quux: - pass - - metadata = get_meta(Quux) - foo_value = get_meta_value(Quux, 'foo', None) - baz_value = get_meta_value(Quux, 'baz', None) - - assert foo_value == 'bar' - assert baz_value == 'qux' - assert metadata == {'foo': 'bar', 'baz': 'qux'} - - meta_as_dataclass = get_meta(Quux, QuuxMeta) - assert isinstance(meta_as_dataclass, QuuxMeta) - assert meta_as_dataclass.foo == 'bar' - assert meta_as_dataclass.baz == 'qux' - - -def test_meta_docorator_dataclass_meta(): - """๐Ÿ€ utils :: `meta` decorator :: should inject dataclass metadata into a class""" - - @dataclass - class QuuxMeta: - foo: str - baz: str - - @meta(QuuxMeta(foo='foo', baz='baz')) - class Quux: - pass - - metadata = get_meta(Quux, QuuxMeta) - assert isinstance(metadata, QuuxMeta) - assert metadata.foo == 'foo' - assert metadata.baz == 'baz' - - foo_value = get_meta_value(Quux, 'foo', None) - baz_value = get_meta_value(Quux, 'baz', None) - - assert foo_value == 'foo' - assert baz_value == 'baz' - metadata = get_meta(Quux) - assert metadata == {'foo': 'foo', 'baz': 'baz'} - - -def test_exception_example_generator(): - example = exc_response(404, 418) - - not_found = example[404] - assert not_found['description'] == 'Not Found' - assert not_found['content']['application/json']['example'] == { - 'code': 404, - 'message': 'Detailed error message', - 'error': 'Not Found', - } - - tea_pot = example[418] - assert tea_pot['description'] == "I'm a teapot" - assert tea_pot['content']['application/json']['example'] == { - 'code': 418, - 'message': 'Detailed error message', - 'error': "I'm a teapot", - } +from dataclasses import dataclass + +from pest.core.module import setup_module as _setup_module +from pest.exceptions.http.http import exc_response +from pest.metadata.meta import get_meta, get_meta_value +from pest.utils.colorize import c +from pest.utils.decorators import meta +from pest.utils.module import _get_provider_name, as_tree + +from .cfg.test_modules.pest_primitives import FooModule + + +def test_colorize_text(): + """๐Ÿ€ utils :: colorize :: should return a colored string""" + colored = ( + f'๐Ÿค  >' + f'{c("Howdy", on_color="on_green")}' + f',{c("Cowboy", color="blue", attrs=["bold", "reverse"])}!' + ) + assert colored == '๐Ÿค  >\x1b[42mHowdy\x1b[0m,\x1b[7m\x1b[1m\x1b[34mCowboy\x1b[0m!' + + +def test_colorize_no_color_option(): + """๐Ÿ€ utils :: colorize :: should return the original string if color option is False""" + colored = f'๐Ÿค  > Howdy, {c("Cowboy", color="blue", attrs=["bold", "reverse"], no_color=True)}!' + assert colored == '๐Ÿค  > Howdy, Cowboy!' + + +def test_module_tree_generation(): + """๐Ÿ€ utils :: module :: should generate a tree representation of a module""" + module = _setup_module(FooModule) + + tree = as_tree(module) + + assert tree == ( + '\x1b[4m\x1b[32mFooModule\x1b[0m ๐Ÿ€\n โ”‚\n โ”‚\x1b[35m โ—‹ ProviderBaz\x1b[0m\n ' + 'โ”‚\x1b[34m โ–ก FooController\x1b[0m\n โ”œโ”€ Mod\n โ”‚\x1b[35m โ—‹ ProviderFoo\x1b[0m\n' + ' โ”‚\x1b[35m โ—‹ ProviderBar\x1b[0m\n' + ) + + class Foo: + provide = 'Bar' + + # test with str injection token + tree = _get_provider_name(Foo) + assert tree == 'Bar' + + +def test_meta_docorator_dict_meta(): + """๐Ÿ€ utils :: `meta` decorator :: should inject dict metadata into a class""" + + @dataclass + class QuuxMeta: + foo: str + baz: str + + @meta({'foo': 'bar', 'baz': 'qux'}) + class Quux: + pass + + metadata = get_meta(Quux) + foo_value = get_meta_value(Quux, 'foo', None) + baz_value = get_meta_value(Quux, 'baz', None) + + assert foo_value == 'bar' + assert baz_value == 'qux' + assert metadata == {'foo': 'bar', 'baz': 'qux'} + + meta_as_dataclass = get_meta(Quux, QuuxMeta) + assert isinstance(meta_as_dataclass, QuuxMeta) + assert meta_as_dataclass.foo == 'bar' + assert meta_as_dataclass.baz == 'qux' + + +def test_meta_docorator_dataclass_meta(): + """๐Ÿ€ utils :: `meta` decorator :: should inject dataclass metadata into a class""" + + @dataclass + class QuuxMeta: + foo: str + baz: str + + @meta(QuuxMeta(foo='foo', baz='baz')) + class Quux: + pass + + metadata = get_meta(Quux, QuuxMeta) + assert isinstance(metadata, QuuxMeta) + assert metadata.foo == 'foo' + assert metadata.baz == 'baz' + + foo_value = get_meta_value(Quux, 'foo', None) + baz_value = get_meta_value(Quux, 'baz', None) + + assert foo_value == 'foo' + assert baz_value == 'baz' + metadata = get_meta(Quux) + assert metadata == {'foo': 'foo', 'baz': 'baz'} + + +def test_exception_example_generator(): + """ + ๐Ÿ€ utils :: `exc_response` :: should generate the right example response for a given error code + """ + + example = exc_response(404, 418) + + not_found = example[404] + assert not_found['description'] == 'Not Found' + assert not_found['content']['application/json']['example'] == { + 'code': 404, + 'message': 'Detailed error message', + 'error': 'Not Found', + } + + tea_pot = example[418] + assert tea_pot['description'] == "I'm a teapot" + assert tea_pot['content']['application/json']['example'] == { + 'code': 418, + 'message': 'Detailed error message', + 'error': "I'm a teapot", + }