Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add an example for dynamic task mapping with non-TaskFlow operator #29762

Merged

Conversation

hussein-awala
Copy link
Member

closes: #29754


Add a full example for dynamic task mapping with non-TaskFlow operator and include it in dynamic task mapping documentation.

@hussein-awala
Copy link
Member Author

It seems like the serialized task from Mapped(_PythonDecoratedOperator) has a field _operator_name which we use in this test, but the serialized task from Mapped(PythonOperator) does not. I will check if it's a bug in the serialization code or a problem in the test.

@hussein-awala hussein-awala marked this pull request as draft February 25, 2023 19:14
@@ -177,9 +177,8 @@ Mapping with non-TaskFlow operators
It is possible to use ``partial`` and ``expand`` with classic style operators as well. Some arguments are not mappable and must be passed to ``partial()``, such as ``task_id``, ``queue``, ``pool``, and most other arguments to ``BaseOperator``.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe explicitly mention something about the example sum_it being a "reduce" task?

@eladkal eladkal added this to the Airflow 2.5.2 milestone Mar 4, 2023
@eladkal eladkal added the type:doc-only Changelog: Doc Only label Mar 4, 2023
@hussein-awala
Copy link
Member Author

@josh-fell @potiuk The test test_deserialization_across_process failes because of this assertion, where in serialized_partial_kwargs, python_callable is a string:

'def add_one(x: int):\n    return x + 1\n'

but in the original_partial_kwargs it's a function.

Is it supposed to keep the function as it is in the serialized mapped task or it's bug in test?

@potiuk
Copy link
Member

potiuk commented Mar 5, 2023

Is it supposed to keep the function as it is in the serialized mapped task or it's bug in test?

This is a very good question @uranusjr ?

@hussein-awala hussein-awala marked this pull request as ready for review May 26, 2023 09:48
@hussein-awala
Copy link
Member Author

I excluded callable from comparison because currently we serialize callable as str and there is no way to restore the original callable:

elif callable(var):
return str(get_python_source(var))

If there is a need to improve this serialization, then this should be done in a separate PR, IMO there is no problem to merge this one with this condition.

@uranusjr
Copy link
Member

I wonder if it’d be worthwhile to make the example not use PythonOperator—it’s not that valuable to know about using this particular operator since you really should use taskflow instead.

@hussein-awala
Copy link
Member Author

hussein-awala commented May 30, 2023

I wonder if it’d be worthwhile to make the example not use PythonOperator—it’s not that valuable to know about using this particular operator since you really should use taskflow instead.

@uranusjr I'd say that's a good idea, I can use this one:

"""Example DAG demonstrating the usage of dynamic task mapping with non-TaskFlow operators."""
from __future__ import annotations

from datetime import datetime

from airflow import DAG
from airflow.models import BaseOperator


class AddOneOperator(BaseOperator):
    """A custom operator that adds one to the input."""

    def __init__(self, input, **kwargs):
        super().__init__(**kwargs)
        self.input = input

    def execute(self, context):
        return self.input + 1


class SumItOperator(BaseOperator):
    """A custom operator that sums the input."""

    template_fields = ("values",)

    def __init__(self, values, **kwargs):
        super().__init__(**kwargs)
        self.values = values

    def execute(self, context):
        total = sum(self.values)
        print(f"Total was {total}")
        return total


with DAG(
    dag_id="example_dynamic_task_mapping_with_no_taskflow_operators",
    start_date=datetime(2022, 3, 4),
    catchup=False,
):
    # map the task to a list of values
    add_one_task = AddOneOperator.partial(task_id="add_one").expand(input=[1, 2, 3])

    # aggregate (reduce) the mapped tasks results
    sum_it_task = SumItOperator(task_id="sum_it", values=add_one_task.output)

WDYT?

@uranusjr
Copy link
Member

Sounds good!

@hussein-awala hussein-awala force-pushed the docs/dtm_with_no_taskflow_op_example branch from eec1562 to 1ac13a0 Compare May 30, 2023 15:27
@hussein-awala hussein-awala force-pushed the docs/dtm_with_no_taskflow_op_example branch from 039e75d to 0c459bd Compare May 30, 2023 18:46
@hussein-awala hussein-awala merged commit 4d4c2b9 into apache:main May 31, 2023
eladkal pushed a commit that referenced this pull request Jun 8, 2023
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Add classic operator example for dynamic task mapping "reduce" task
8 participants