Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Untangle airflow/decorators/__init__.py[i] names #21056

Merged
merged 1 commit into from
Jan 24, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 16 additions & 5 deletions airflow/decorators/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,32 +15,43 @@
# specific language governing permissions and limitations
# under the License.

from typing import Any

from airflow.decorators.base import TaskDecorator
from airflow.decorators.python import python_task
from airflow.decorators.python_virtualenv import virtualenv_task
from airflow.decorators.task_group import task_group
from airflow.models.dag import dag
from airflow.providers_manager import ProvidersManager

__all__ = ["dag", "task", "task_group", "python_task", "virtualenv_task"]
# Please keep this in sync with the .pyi's __all__.
__all__ = [
"TaskDecorator",
"TaskDecoratorCollection",
"dag",
"task",
"task_group",
"python_task",
"virtualenv_task",
]


class _TaskDecoratorFactory:
class TaskDecoratorCollection:
"""Implementation to provide the ``@task`` syntax."""

python = staticmethod(python_task)
python: Any = staticmethod(python_task)
virtualenv = staticmethod(virtualenv_task)

__call__ = python # Alias '@task' to '@task.python'.

def __getattr__(self, name: str) -> TaskDecorator:
"""Dynamically get provider-registered task decorators, e.g. ``@task.docker``."""
if name.startswith("__"):
raise AttributeError(f'{type(self).__name__} has no attribute {name!r}')
raise AttributeError(f"{type(self).__name__} has no attribute {name!r}")
decorators = ProvidersManager().taskflow_decorators
if name not in decorators:
raise AttributeError(f"task decorator {name!r} not found")
return decorators[name]


task = _TaskDecoratorFactory()
task = TaskDecoratorCollection()
48 changes: 23 additions & 25 deletions airflow/decorators/__init__.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,26 @@

from typing import Any, Callable, Dict, Iterable, List, Mapping, Optional, TypeVar, Union, overload

__all__ = ["task"]
from airflow.decorators.base import TaskDecorator
from airflow.decorators.python import python_task
from airflow.decorators.python_virtualenv import virtualenv_task
from airflow.decorators.task_group import task_group
from airflow.models.dag import dag

F = TypeVar("F", bound=Callable)
# Please keep this in sync with __init__.py's __all__.
__all__ = [
"TaskDecorator",
"TaskDecoratorCollection",
"dag",
"task",
"task_group",
"python_task",
"virtualenv_task",
]

TaskDecorator = Callable[[F], F]
Function = TypeVar("Function", bound=Callable)

class TaskDecoratorFactory:
class TaskDecoratorCollection:
@overload
def python(
self,
Expand Down Expand Up @@ -57,35 +70,21 @@ class TaskDecoratorFactory:
"""
# [START mixin_for_typing]
@overload
def python(self, python_callable: F) -> F: ...
def python(self, python_callable: Function) -> Function: ...
# [END mixin_for_typing]
@overload
def __call__(
self,
*,
multiple_outputs: Optional[bool] = None,
# Should match 'python()' signature.
templates_dict: Optional[Mapping[str, Any]] = None,
show_return_value_in_logs: bool = True,
**kwargs,
) -> TaskDecorator:
"""Create a decorator to convert the decorated callable to a task.

:param multiple_outputs: if set, function return value will be
unrolled to multiple XCom values. List/Tuples will unroll to xcom values
with index as key. Dict will unroll to xcom values with keys as XCom keys.
Defaults to False.
:param templates_dict: a dictionary where the values are templates that
will get templated by the Airflow engine sometime between
``__init__`` and ``execute`` takes place and are made available
in your callable's context after the template has been applied
:param show_return_value_in_logs: a bool value whether to show return_value
logs. Defaults to True, which allows return value log output.
It can be set to False to prevent log output of return value when you return huge data
such as transmission a large amount of XCom to TaskAPI.
"""
"""Aliasing ``python``; signature should match exactly."""
@overload
def __call__(self, python_callable: F) -> F: ...
def __call__(self, python_callable: Function) -> Function:
"""Aliasing ``python``; signature should match exactly."""
@overload
def virtualenv(
self,
Expand Down Expand Up @@ -127,9 +126,8 @@ class TaskDecoratorFactory:
such as transmission a large amount of XCom to TaskAPI.
"""
@overload
def virtualenv(self, python_callable: F) -> F: ...
def virtualenv(self, python_callable: Function) -> Function: ...
# [START decorator_signature]
@overload
def docker(
self,
*,
Expand Down Expand Up @@ -228,4 +226,4 @@ class TaskDecoratorFactory:
"""
# [END decorator_signature]

task = TaskDecoratorFactory()
task: TaskDecoratorCollection
12 changes: 6 additions & 6 deletions airflow/decorators/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -196,13 +196,13 @@ def _hook_apply_defaults(self, *args, **kwargs):
return args, kwargs


T = TypeVar("T", bound=Callable)
Function = TypeVar("Function", bound=Callable)

OperatorSubclass = TypeVar("OperatorSubclass", bound="BaseOperator")


@attr.define(slots=False)
class _TaskDecorator(Generic[T, OperatorSubclass]):
class _TaskDecorator(Generic[Function, OperatorSubclass]):
"""
Helper class for providing dynamic task mapping to decorated functions.

Expand All @@ -211,7 +211,7 @@ class _TaskDecorator(Generic[T, OperatorSubclass]):
:meta private:
"""

function: T = attr.ib(validator=attr.validators.is_callable())
function: Function = attr.ib(validator=attr.validators.is_callable())
operator_class: Type[OperatorSubclass]
multiple_outputs: bool = attr.ib()
kwargs: Dict[str, Any] = attr.ib(factory=dict)
Expand Down Expand Up @@ -296,7 +296,7 @@ def map(

def partial(
self, *, dag: Optional["DAG"] = None, task_group: Optional["TaskGroup"] = None, **kwargs
) -> "_TaskDecorator[T, OperatorSubclass]":
) -> "_TaskDecorator[Function, OperatorSubclass]":
self._validate_arg_names("partial", kwargs, {'task_id'})
partial_kwargs = self.kwargs.copy()
partial_kwargs.update(kwargs)
Expand All @@ -307,11 +307,11 @@ class TaskDecorator(Protocol):
"""Type declaration for ``task_decorator_factory`` return type."""

@overload
def __call__(self, python_callable: T) -> T:
def __call__(self, python_callable: Function) -> Function:
"""For the "bare decorator" ``@task`` case."""

@overload
def __call__(self, *, multiple_outputs: Optional[bool], **kwargs: Any) -> "TaskDecorator":
def __call__(self, *, multiple_outputs: Optional[bool], **kwargs: Any) -> Callable[[Function], Function]:
"""For the decorator factory ``@task()`` case."""


Expand Down
6 changes: 3 additions & 3 deletions airflow/models/dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@
from airflow.utils.types import NOTSET, ArgNotSet, DagRunType, EdgeInfoType

if TYPE_CHECKING:
from airflow.decorators import TaskDecoratorFactory
from airflow.decorators import TaskDecoratorCollection
from airflow.models.slamiss import SlaMiss
from airflow.utils.task_group import TaskGroup

Expand Down Expand Up @@ -2112,10 +2112,10 @@ def get_downstream(task, level=0):
get_downstream(t)

@property
def task(self) -> "TaskDecoratorFactory":
def task(self) -> "TaskDecoratorCollection":
from airflow.decorators import task

return cast("TaskDecoratorFactory", functools.partial(task, dag=self))
return cast("TaskDecoratorCollection", functools.partial(task, dag=self))

def add_task(self, task):
"""
Expand Down
5 changes: 3 additions & 2 deletions docs/apache-airflow/howto/create-custom-decorator.rst
Original file line number Diff line number Diff line change
Expand Up @@ -105,8 +105,9 @@ automatically provided by default. All other arguments should be copied directly
and we recommend adding a comment to explain what arguments are filled automatically by FooDecoratedOperator
and thus not included.

You should also add an overload that takes a single callable immediately after the "real" definition so mypy can recognize the function as
a decorator:
If the new decorator can be used without arguments (e.g. ``@task.python`` instead of ``@task.python()``),
You should also add an overload that takes a single callable immediately after the "real" definition so mypy
can recognize the function as a "bare decorator":

.. exampleinclude:: ../../../airflow/decorators/__init__.pyi
:language: python
Expand Down