Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Dag File Processing Slowness when using Dag Params #32434

Closed
1 of 2 tasks
dlstadther-pl opened this issue Jul 7, 2023 · 10 comments
Closed
1 of 2 tasks

Dag File Processing Slowness when using Dag Params #32434

dlstadther-pl opened this issue Jul 7, 2023 · 10 comments
Assignees
Labels
affected_version:2.6 Issues Reported for 2.6 area:serialization kind:bug This is a clearly a bug

Comments

@dlstadther-pl
Copy link

dlstadther-pl commented Jul 7, 2023

Apache Airflow version

2.6.2

What happened

After migrating from Airflow 2.2.3 to 2.6.2, we saw a large (~5-10x) increase in DAG File Processing time for our dags. While we have some anti-patterns with dag generation (dynamic dag generation and usage of 5 Airflow Variables), we have isolated the increase in processing duration to the existence of Dag Params (see "How to Reproduce", below).

We're experiencing this issue in our most complex dag file. This dag file creates 1 "main" dag which runs a TriggerDagRunOperator on each "client-specific" dags for which it generates dynamically. Each client-specific dag is assigned 5 Dag Params (which describe certain characteristics of the client) and about 400 tasks.

Dag files which used to take 0.58s now take 2.88s; 3s now take 30s; 95s now take 985s.

What you think should happen instead

I believe DAG Processing is inefficient at serializing dag params during the serialization of tasks.

(However, I have been unable to pinpoint a commit which caused a significant change to the serialization of DagBag.sync_to_db() code).

How to reproduce

I have reproduced the situation we experience locally with a representative (but dumb) dag example which can show that dag file processing runtimes increase as the quantity of Dag Params increase.

I realize this dag may be a bit complex and so I've also included the visual representation of how the dags relate to each other and many of the Dag File Processing times for 2.2.3 and 2.6.2 when using various quantity of Dag Params in the client-specific dag definitions.

Code

import datetime
import random
import time
from typing import Any, Dict, List

from airflow.models import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.utils.state import State
from airflow.utils.task_group import TaskGroup


# in practice, this CLIENTS_METADATA comes from an Airflow Variable which is set by a different DAG which runs multiple times per day
# e.g. CLIENTS_METADATA: List[Dict[str, Any]] = Variable.get('CLIENTS_METADATA', deserialize_json=True, default_var=list())
# for the sake of an example, we'll create fake client metadata and allow the client qty to be configurable, but deterministic
client_qty = 150
CLIENTS_METADATA: List[Dict[str, Any]] = list()
for i in range(client_qty):
    # if even, set certain values
    if i // 2 == 0:
        client_group = "4"
        shared_resource_group = "us-01"
        client_size = "normal"
    else:
        client_group = "3"
        shared_resource_group = "us-02"
        client_size = "large"
    CLIENTS_METADATA.append(
        {
            "clientId": f"client{i}",
            "clientGroup": client_group,
            "sharedResourceGroup": shared_resource_group,
            "isDataIsolated": False,
            "resourceTags": {"size": client_size},
        }
    )

DE_STACK_NAME = "de-prod-us-01"


ARGS = {
    'owner': 'airflow',
    'start_date': datetime.datetime(2000, 1, 1, 0, 0, 0),
}


def task(**kwargs):
    # would normally receive dag params and use them in the job,
    # but this is a dumb example dag that doesn't actually do anything
    rand_int = random.randint(1, 10)
    print(f"Sleeping: {rand_int}s")
    time.sleep(rand_int)
    return rand_int


def build_abstract_dag(client: dict) -> DAG:
    """One copy of the graph per client"""
    client_id = client.get('clientId')
    client_group = client.get('clientGroup')
    resource_tags = client.get('resourceTags')
    resource_group = client.get('sharedResourceGroup')

    dag_id = 'client-specific-dag-{}'.format(client_id)

    dag = DAG(
        dag_id=dag_id,
        default_args=ARGS,
        schedule_interval=None,
        is_paused_upon_creation=False,
        catchup=False,
        # CHANGE THE QTY OF PARAMS HERE TO SEE THE IMPACT TO DAG PROCESSING RUNTIME
        params={
            'client_id': client_id,
            'client_group': client_group,
            'resource_tags': resource_tags,
            'resource_group': resource_group,
            'airflow_stack': DE_STACK_NAME,
        },
    )

    # mimic multiple sets of jobs
    qty_task_groups = 100
    for i in range(qty_task_groups):
        with TaskGroup(f"task_group_{i}", dag=dag):
            # mimic our generic job structure
            #   data generation job (t1)
            #   distribution jobs (t2 and t3)
            #   visualization or validation job (t4)
            t1 = PythonOperator(dag=dag, task_id="task_1", python_callable=task, retries=0)
            t2 = PythonOperator(dag=dag, task_id="task_2", python_callable=task, retries=0)
            t3 = PythonOperator(dag=dag, task_id="task_3", python_callable=task, retries=0)
            t4 = PythonOperator(dag=dag, task_id="task_4", python_callable=task, retries=0)

            [t3, t2] << t1
            t4 << t1

    return dag


# after all clients complete, go back and requeue anything which didn't succeed
# addresses network-related transient errors which aren't covered by default retry config
def retry_all_main_graph_tasks(retry_meta: list, **context):
    print(f"Retrying: {retry_meta}")


dag_meta = DAG(
    dag_id='a-trigger-dag',
    default_args=ARGS,
    dagrun_timeout=datetime.timedelta(hours=8),
    is_paused_upon_creation=True,
    catchup=False,
    schedule_interval='00 05 * * *',  # daily, at 0500 UTC
)

trigger_dag_tasks = list()
# List containing meta around the retry tasks and triggered dag per client
# Each list record is a dict {'trigger_task_id': <str>, 'dag': <Dag>}
retry_main_graph_tasks_meta = list()

# Create 1 Abstract DAG per client
for client_data in CLIENTS_METADATA:
    abstract_dag = build_abstract_dag(
        client=client_data,
    )
    trigger_dag_id = abstract_dag.dag_id
    globals()[trigger_dag_id] = abstract_dag

    trigger_operator = TriggerDagRunOperator(
        dag=dag_meta,
        task_id='trigger_{}'.format(trigger_dag_id),
        retries=1,
        trigger_dag_id=trigger_dag_id,
        wait_for_completion=True,
        # "success" or "failed" states means the client DAG was triggered successfully.
        # trigger_l3_main_{CLIENT_ID} task failures indicate the DAG was never triggered.
        allowed_states=[State.SUCCESS, State.FAILED],
        # https://airflow.apache.org/docs/apache-airflow/stable/_modules/airflow/operators/trigger_dagrun.html
        # we cannot set failed_states to None or empty list, as it will set failed_states = ['failed']
        # in the BaseOperator initialization. The code snippet:
        # `self.failed_states = failed_states or [State.FAILED]`
        failed_states=['not-a-state'],
        execution_date='{{ data_interval_start }}',
        execution_timeout=datetime.timedelta(hours=2),
        pool='main_graph_trigger',
    )
    trigger_dag_tasks.append(trigger_operator)
    retry_main_graph_tasks_meta.append(
        dict(
            trigger_task_id=trigger_operator.task_id,
            dag=abstract_dag,
        )
    )

retry_main_graph_tasks = PythonOperator(
    dag=dag_meta,
    task_id='retry_all_main_graph_tasks',
    retries=1,
    trigger_rule='all_done',
    op_kwargs={
        'retry_meta': retry_main_graph_tasks_meta,
    },
    pool='batch',
    python_callable=retry_all_main_graph_tasks,
)
retry_main_graph_tasks << trigger_dag_tasks

Creates:
airflow-sample-dag

Runtimes

  • client_qty = 1
Dag Param Qty Runtime (2.2.3) Runtime (2.6.2)
0 0.3s 1s
1 2s
2 3s
3 4s
4 5s
5 0.4s 6s
  • client_qty = 10
Dag Param Qty Runtime (2.2.3) Runtime (2.6.2)
0 0.3s 4s
1 20s
2 25s
3 30s
4 35s
5 3s 40s
  • client_qty = 150
Dag Param Qty Runtime (2.2.3) Runtime (2.6.2)
0 25s 50s
5 50s 900s (15m)

Operating System

Debian 11 (bullseye)

Versions of Apache Airflow Providers

No response

Deployment

Official Apache Airflow Helm Chart

Deployment details

  • kubernetes 1.24
  • official helm chart 1.9.0
  • standalone dag processor
  • celery executor
  • postgres database (with pgbouncer)

Anything else

I've also recreated the same issue (with the sample code provided above) using the Airflow Docker Compose setup.

Our issue differs from #30593 and #30884 , as we are already on 2.6.x and use the default value (5s) for job_heartbeat_sec.

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

@dlstadther-pl dlstadther-pl added area:core kind:bug This is a clearly a bug needs-triage label for new issues that we didn't triage yet labels Jul 7, 2023
@boring-cyborg
Copy link

boring-cyborg bot commented Jul 7, 2023

Thanks for opening your first issue here! Be sure to follow the issue template! If you are willing to raise PR to address this issue please do so, no need to wait for approval.

@hussein-awala
Copy link
Member

@dlstadther-pl I'll take a look and try to reproduce the problem.

@hussein-awala hussein-awala added area:serialization and removed area:core needs-triage label for new issues that we didn't triage yet labels Jul 7, 2023
@hussein-awala hussein-awala self-assigned this Jul 7, 2023
@eladkal eladkal added the affected_version:2.6 Issues Reported for 2.6 label Jul 8, 2023
@tirkarthi
Copy link
Contributor

See also a previous report in the past regarding params and dag processing slowdown. Can you please share the jsonschema version?

Ref : #28445

@hussein-awala
Copy link
Member

@tirkarthi I have reproduced the issue using jsonschema=4.17.3. In an attempt to resolve this, I have replaced jsonschema with fastjsonschema. Based on initial results, the execution time has been reduced to one second instead of the previous five seconds per dag.

@tirkarthi
Copy link
Contributor

@hussein-awala As per the jsonschema project it seems they did a major rewrite that fixes the performance issue. The fix pr was merged and 4.18.0 was released 2 days back. Can you please try to see if 4.18.0 fixes the issue?

python-jsonschema/jsonschema#941
Fix PR : python-jsonschema/jsonschema#1049
Comment from Maintainer : python-jsonschema/jsonschema#941 (comment)
https://github.com/python-jsonschema/jsonschema/releases/tag/v4.18.0

@hussein-awala
Copy link
Member

I just tested it, the parsing time has been reduced to approximately 2-3 seconds. While this is an improvement over jsonschema=4.17.3, fastjsonschema remains the faster option. wdyt?

@tirkarthi
Copy link
Contributor

Since jsonschema is already used by Airflow and this version bump is also promised to be backwards compatible by upstream I tend to just bump jsonschema to see if CI passes since it seems better fix in terms of compatibility and also improves the situation without any code changes and less risky. I don't see fastjsonschema as promised to be compatible with jsonschema in their docs. Maybe fastjsonschema could be a different issue since it will involve code changes and testing the API with existing code.

https://pypistats.org/packages/jsonschema
https://pypistats.org/packages/fastjsonschema

@dlstadther-pl
Copy link
Author

Thank you @hussein-awala and @tirkarthi for the quick engagement on this issue!


I have forced my local Airflow 2.6.2 image to include jsonschema==4.18.0 and am seeing improved dag processing runtimes (not as quick as 2.2.3, but better than plain 2.6.2). In the largest case of the "reproducible example" (150 clients with 5 dag params), the runtime decreased from 900s to 350s.

Would the rest of the delta (between 2.2.3's 50s and the now 350s) be explained by additional validation checks against dag params? Or could there be another performance issue at play?

@hussein-awala
Copy link
Member

@dlstadther-pl I'm trying to switch from using jsonschema to fastjsonschema. But I should be careful as much as possible during the testing phase to prevent any potential breaking changes, because there may be differences in validators between the two libraries. So this could take some time. (I hope having this ready in 2.7.0)

@dlstadther-pl
Copy link
Author

After forcing the upgrade of jsonschema==4.18.0 in our various pre-prod and prod environments, I can confirm that dag processing runtimes are back to normal durations.

Glad it was a simple as incrementing a dependency version.

Thanks again @hussein-awala ! I'm good to close this issue as resolved now that your PR is merged.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
affected_version:2.6 Issues Reported for 2.6 area:serialization kind:bug This is a clearly a bug
Projects
None yet
Development

No branches or pull requests

4 participants