Skip to content

Commit

Permalink
Ensures DAG params order regardless of backend
Browse files Browse the repository at this point in the history
Fixes #40154

This change adds an extra attribute (`__position`) to each serialized DAG param object, which lets us
restore the original order of the deserialized params dictionary even if the backend reorders the keys of the stored JSON.

I decided not to limit this just to MySQL since the operation is inexpensive and may turn
out to be helpful.

I verified that the new test fails with the old implementation when run against MySQL. I assume this
test will be executed against MySQL somewhere in the CI build actions?
  • Loading branch information
Usiel committed Jun 10, 2024
1 parent fc4fbb3 commit 0112221
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 3 deletions.
13 changes: 10 additions & 3 deletions airflow/serialization/serialized_objects.py
Original file line number Diff line number Diff line change
Expand Up @@ -831,14 +831,17 @@ def is_serialized(val):
def _serialize_params_dict(cls, params: ParamsDict | dict):
"""Serialize Params dict for a DAG or task."""
serialized_params = {}
for k, v in params.items():
for idx, item in enumerate(params.items()):
k, v = item
# TODO: As of now, we would allow serialization of params which are of type Param only.
try:
class_identity = f"{v.__module__}.{v.__class__.__name__}"
except AttributeError:
class_identity = ""
if class_identity == "airflow.models.param.Param":
serialized_params[k] = cls._serialize_param(v)
serialized_param = cls._serialize_param(v)
serialized_param["__position"] = idx
serialized_params[k] = serialized_param
else:
raise ValueError(
f"Params to a DAG or a Task can be only of type airflow.models.param.Param, "
Expand All @@ -850,7 +853,11 @@ def _serialize_params_dict(cls, params: ParamsDict | dict):
def _deserialize_params_dict(cls, encoded_params: dict) -> ParamsDict:
"""Deserialize a DAG's Params dict."""
op_params = {}
for k, v in encoded_params.items():
sorted_params = sorted(
encoded_params.items(),
key=lambda item: item[1].get("__position", 0) if isinstance(item[1], dict) else 0,
)
for k, v in sorted_params:
if isinstance(v, dict) and "__class" in v:
op_params[k] = cls._deserialize_param(v)
else:
Expand Down
17 changes: 17 additions & 0 deletions tests/models/test_serialized_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,23 @@ def test_get_dag_dependencies_default_to_empty(self, dag_dependencies_fields):
expected_dependencies = {dag_id: [] for dag_id in example_dags}
assert SDM.get_dag_dependencies() == expected_dependencies

def test_order_of_dag_params_is_stable(self):
    """
    Check that DAG param order survives a write/read round trip.

    Regression test for https://github.com/apache/airflow/issues/40154.
    The key order of the example DAG's params is recorded, the DAG is
    written via ``SDM.write_dag`` and fetched again via ``SDM.get_dag``,
    and the key order must be unchanged - even if the backend
    (e.g. MySQL) mutates the serialized DAG JSON.
    """
    dag_id = "example_params_trigger_ui"
    dag = make_example_dags(example_dags_module).get(dag_id)
    # Capture the key order before the DAG goes through the backend.
    expected_order = list(dag.params.keys())

    SDM.write_dag(dag)
    actual_order = list(SDM.get_dag(dag_id).params.keys())

    assert expected_order == actual_order

def test_order_of_deps_is_consistent(self):
"""
Previously the 'dag_dependencies' node in serialized dag was converted to list from set.
Expand Down

0 comments on commit 0112221

Please sign in to comment.