Skip to content

Commit

Permalink
Add an example for dynamic task mapping with non-TaskFlow operator (#…
Browse files Browse the repository at this point in the history
…29762)

(cherry picked from commit 4d4c2b9)
  • Loading branch information
hussein-awala authored and Elad Kalif committed Jun 8, 2023
1 parent 3563023 commit e39ca99
Show file tree
Hide file tree
Showing 4 changed files with 69 additions and 4 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""Example DAG demonstrating the usage of dynamic task mapping with non-TaskFlow operators."""
from __future__ import annotations

from datetime import datetime

from airflow import DAG
from airflow.models.baseoperator import BaseOperator


class AddOneOperator(BaseOperator):
"""A custom operator that adds one to the input."""

def __init__(self, value, **kwargs):
super().__init__(**kwargs)
self.value = value

def execute(self, context):
return self.value + 1


class SumItOperator(BaseOperator):
"""A custom operator that sums the input."""

template_fields = ("values",)

def __init__(self, values, **kwargs):
super().__init__(**kwargs)
self.values = values

def execute(self, context):
total = sum(self.values)
print(f"Total was {total}")
return total


with DAG(
dag_id="example_dynamic_task_mapping_with_no_taskflow_operators",
start_date=datetime(2022, 3, 4),
catchup=False,
):
# map the task to a list of values
add_one_task = AddOneOperator.partial(task_id="add_one").expand(value=[1, 2, 3])

# aggregate (reduce) the mapped tasks results
sum_it_task = SumItOperator(task_id="sum_it", values=add_one_task.output)
Original file line number Diff line number Diff line change
Expand Up @@ -177,9 +177,8 @@ Mapping with non-TaskFlow operators
It is possible to use ``partial`` and ``expand`` with classic style operators as well. Some arguments are not mappable and must be passed to ``partial()``, such as ``task_id``, ``queue``, ``pool``, and most other arguments to ``BaseOperator``.


.. code-block:: python
BashOperator.partial(task_id="bash", do_xcom_push=False).expand(bash_command=["echo 1", "echo 2"])
.. exampleinclude:: /../../airflow/example_dags/example_dynamic_task_mapping_with_no_taskflow_operators.py
:language: python

.. note:: Only keyword arguments are allowed to be passed to ``partial()``.

Expand Down
4 changes: 3 additions & 1 deletion tests/serialization/test_dag_serialization.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import airflow
from airflow.datasets import Dataset
from airflow.decorators import teardown
from airflow.decorators.base import DecoratedOperator
from airflow.exceptions import AirflowException, SerializationError
from airflow.hooks.base import BaseHook
from airflow.kubernetes.pod_generator import PodGenerator
Expand Down Expand Up @@ -615,7 +616,8 @@ def validate_deserialized_task(
# data; checking its entirety basically duplicates this validation
# function, so we just do some satiny checks.
serialized_task.operator_class["_task_type"] == type(task).__name__
serialized_task.operator_class["_operator_name"] == task._operator_name
if isinstance(serialized_task.operator_class, DecoratedOperator):
serialized_task.operator_class["_operator_name"] == task._operator_name

# Serialization cleans up default values in partial_kwargs, this
# adds them back to both sides.
Expand Down
2 changes: 2 additions & 0 deletions tests/www/views/test_views_acl.py
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,8 @@ def test_dag_autocomplete_success(client_all_dags):
)
expected = [
{"name": "airflow", "type": "owner"},
{"name": "example_dynamic_task_mapping_with_no_taskflow_operators", "type": "dag"},
{"name": "example_setup_teardown_taskflow", "type": "dag"},
{"name": "test_mapped_taskflow", "type": "dag"},
{"name": "tutorial_taskflow_api", "type": "dag"},
{"name": "tutorial_taskflow_api_virtualenv", "type": "dag"},
Expand Down

0 comments on commit e39ca99

Please sign in to comment.