Add an example for dynamic task mapping with non-TaskFlow operator #29762
It seems like the serialized task from |
@@ -177,9 +177,8 @@ Mapping with non-TaskFlow operators
It is possible to use ``partial`` and ``expand`` with classic style operators as well. Some arguments are not mappable and must be passed to ``partial()``, such as ``task_id``, ``queue``, ``pool``, and most other arguments to ``BaseOperator``.
Maybe explicitly mention something about the example sum_it being a "reduce" task?
airflow/example_dags/example_dynamic_task_mapping_with_no_taskflow_operators.py
@josh-fell @potiuk The test
but in the original_partial_kwargs it's a function. Is the serialized mapped task supposed to keep the function as it is, or is this a bug in the test?
This is a very good question. @uranusjr?
I excluded callables from the comparison because we currently serialize a callable as a str and there is no way to restore the original callable: airflow/airflow/serialization/serialized_objects.py Lines 446 to 447 in 778c3f6
If this serialization needs to be improved, that should be done in a separate PR. IMO there is no problem merging this one with this condition.
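To make the point about callables concrete, here is a minimal plain-Python sketch. It is not Airflow's serialization code: `serialize_value` and `split_comparable` are hypothetical helpers, and reducing a callable to its name is an assumption chosen only to show why the serialized value can no longer equal the original function and why the comparison skips those keys.

```python
from __future__ import annotations


def add_one(x: int) -> int:
    """Hypothetical callable standing in for an operator argument."""
    return x + 1


def serialize_value(value: object) -> object:
    """Toy serializer (assumption): a callable is reduced to a string."""
    if callable(value):
        return value.__name__
    return value


def split_comparable(original: dict, serialized: dict) -> tuple[dict, dict]:
    """Drop keys whose original value is callable from both dicts."""
    skip = {key for key, value in original.items() if callable(value)}
    kept_original = {k: v for k, v in original.items() if k not in skip}
    kept_serialized = {k: v for k, v in serialized.items() if k not in skip}
    return kept_original, kept_serialized


original_kwargs = {"task_id": "add_one", "python_callable": add_one}
serialized_kwargs = {k: serialize_value(v) for k, v in original_kwargs.items()}

# The string cannot be turned back into the function, so a direct comparison fails.
direct_match = serialized_kwargs == original_kwargs

# Comparing only the non-callable entries still checks everything else.
kept_original, kept_serialized = split_comparable(original_kwargs, serialized_kwargs)
filtered_match = kept_original == kept_serialized
```

With this toy serializer, `direct_match` is false while `filtered_match` is true, which mirrors the condition described in the comment above.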
I wonder if it’d be worthwhile to make the example not use PythonOperator. It’s not that valuable to know about using this particular operator, since you really should use TaskFlow instead.
@uranusjr I'd say that's a good idea, I can use this one:

"""Example DAG demonstrating the usage of dynamic task mapping with non-TaskFlow operators."""
from __future__ import annotations

from datetime import datetime

from airflow import DAG
from airflow.models import BaseOperator


class AddOneOperator(BaseOperator):
    """A custom operator that adds one to the input."""

    def __init__(self, input, **kwargs):
        super().__init__(**kwargs)
        self.input = input

    def execute(self, context):
        return self.input + 1


class SumItOperator(BaseOperator):
    """A custom operator that sums the input."""

    template_fields = ("values",)

    def __init__(self, values, **kwargs):
        super().__init__(**kwargs)
        self.values = values

    def execute(self, context):
        total = sum(self.values)
        print(f"Total was {total}")
        return total


with DAG(
    dag_id="example_dynamic_task_mapping_with_no_taskflow_operators",
    start_date=datetime(2022, 3, 4),
    catchup=False,
):
    # map the task to a list of values
    add_one_task = AddOneOperator.partial(task_id="add_one").expand(input=[1, 2, 3])

    # aggregate (reduce) the mapped tasks results
    sum_it_task = SumItOperator(task_id="sum_it", values=add_one_task.output)

WDYT?
Sounds good!
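For readers following the map and reduce shape of the proposed example, the same data flow can be sketched in plain Python without Airflow. This is only an illustration of what the two tasks compute: the functions `add_one` and `sum_it` below are stand-ins named after the task ids, not the operators themselves, and nothing here is scheduled or run as separate task instances.

```python
def add_one(value: int) -> int:
    """Stand-in for one mapped task instance of add_one."""
    return value + 1


def sum_it(values: list[int]) -> int:
    """Stand-in for the reduce task, which receives all mapped results."""
    return sum(values)


# "Map": one call per element of the expanded input list.
mapped_results = [add_one(value) for value in [1, 2, 3]]

# "Reduce": a single call that aggregates every mapped result.
total = sum_it(mapped_results)

print(mapped_results)  # [2, 3, 4]
print(f"Total was {total}")  # Total was 9
```

In the DAG, the list comprehension corresponds to `expand(input=[1, 2, 3])` creating three mapped task instances, and the single `sum_it` call corresponds to the downstream task consuming `add_one_task.output`.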
closes: #29754
Add a full example for dynamic task mapping with non-TaskFlow operator and include it in dynamic task mapping documentation.