Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ensures DAG params order regardless of backend #40156

Merged
merged 4 commits into from
Jun 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 9 additions & 5 deletions airflow/serialization/schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@
"dag": {
"type": "object",
"properties": {
"params": { "$ref": "#/definitions/params_dict" },
"params": { "$ref": "#/definitions/params" },
"_dag_id": { "type": "string" },
"tasks": { "$ref": "#/definitions/tasks" },
"timezone": { "$ref": "#/definitions/timezone" },
Expand Down Expand Up @@ -206,9 +206,13 @@
"type": "array",
"additionalProperties": { "$ref": "#/definitions/operator" }
},
"params_dict": {
"type": "object",
"additionalProperties": {"$ref": "#/definitions/param" }
"params": {
"type": "array",
"prefixItems": [
{ "type": "string" },
{ "$ref": "#/definitions/param" }
],
"unevaluatedItems": false
},
"param": {
"$comment": "A param for a dag / operator",
Expand Down Expand Up @@ -258,7 +262,7 @@
"retry_delay": { "$ref": "#/definitions/timedelta" },
"retry_exponential_backoff": { "type": "boolean" },
"max_retry_delay": { "$ref": "#/definitions/timedelta" },
"params": { "$ref": "#/definitions/params_dict" },
"params": { "$ref": "#/definitions/params" },
"priority_weight": { "type": "number" },
"weight_rule": { "type": "string" },
"executor": { "type": "string" },
Expand Down
18 changes: 12 additions & 6 deletions airflow/serialization/serialized_objects.py
Original file line number Diff line number Diff line change
Expand Up @@ -828,17 +828,17 @@ def is_serialized(val):
return class_(**kwargs)

@classmethod
def _serialize_params_dict(cls, params: ParamsDict | dict):
"""Serialize Params dict for a DAG or task."""
serialized_params = {}
def _serialize_params_dict(cls, params: ParamsDict | dict) -> list[tuple[str, dict]]:
"""Serialize Params dict for a DAG or task as a list of tuples to ensure ordering."""
serialized_params = []
for k, v in params.items():
# TODO: As of now, we would allow serialization of params which are of type Param only.
try:
class_identity = f"{v.__module__}.{v.__class__.__name__}"
except AttributeError:
class_identity = ""
if class_identity == "airflow.models.param.Param":
serialized_params[k] = cls._serialize_param(v)
serialized_params.append((k, cls._serialize_param(v)))
else:
raise ValueError(
f"Params to a DAG or a Task can be only of type airflow.models.param.Param, "
Expand All @@ -847,10 +847,16 @@ def _serialize_params_dict(cls, params: ParamsDict | dict):
return serialized_params

@classmethod
def _deserialize_params_dict(cls, encoded_params: dict) -> ParamsDict:
def _deserialize_params_dict(cls, encoded_params: list[tuple[str, dict]]) -> ParamsDict:
"""Deserialize a DAG's Params dict."""
if isinstance(encoded_params, collections.abc.Mapping):
# in 2.9.2 or earlier params were serialized as JSON objects
encoded_param_pairs: Iterable[tuple[str, dict]] = encoded_params.items()
else:
encoded_param_pairs = encoded_params

op_params = {}
for k, v in encoded_params.items():
for k, v in encoded_param_pairs:
if isinstance(v, dict) and "__class" in v:
op_params[k] = cls._deserialize_param(v)
else:
Expand Down
16 changes: 16 additions & 0 deletions tests/models/test_serialized_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,22 @@ def test_get_dag_dependencies_default_to_empty(self, dag_dependencies_fields):
expected_dependencies = {dag_id: [] for dag_id in example_dags}
assert SDM.get_dag_dependencies() == expected_dependencies

def test_order_of_dag_params_is_stable(self):
"""
This asserts that we have logic in place which guarantees the order
of the params is maintained - even if the backend (e.g. MySQL) mutates
the serialized DAG JSON.
"""
example_dags = make_example_dags(example_dags_module)
example_params_trigger_ui = example_dags.get("example_params_trigger_ui")
before = list(example_params_trigger_ui.params.keys())

SDM.write_dag(example_params_trigger_ui)
retrieved_dag = SDM.get_dag("example_params_trigger_ui")
after = list(retrieved_dag.params.keys())

assert before == after

def test_order_of_deps_is_consistent(self):
"""
Previously the 'dag_dependencies' node in serialized dag was converted to list from set.
Expand Down
42 changes: 32 additions & 10 deletions tests/serialization/test_dag_serialization.py
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,7 @@ def detect_task_dependencies(task: Operator) -> DagDependency | None: # type: i
},
"edge_info": {},
"dag_dependencies": [],
"params": {},
"params": [],
},
}

Expand Down Expand Up @@ -2082,6 +2082,25 @@ def test_params_upgrade(self):
assert isinstance(dag.params.get_param("none"), Param)
assert dag.params["str"] == "str"

def test_params_serialization_from_dict_upgrade(self):
"""In <=2.9.2 params were serialized as a JSON object instead of a list of key-value pairs.
This test asserts that the params are still deserialized properly."""
serialized = {
"__version": 1,
"dag": {
"_dag_id": "simple_dag",
"fileloc": "/path/to/file.py",
"tasks": [],
"timezone": "UTC",
"params": {"my_param": {"__class": "airflow.models.param.Param", "default": "str"}},
},
}
dag = SerializedDAG.from_dict(serialized)

param = dag.params.get_param("my_param")
assert isinstance(param, Param)
assert param.value == "str"

def test_params_serialize_default_2_2_0(self):
"""In 2.0.0, param ``default`` was assumed to be json-serializable objects and were not run though
the standard serializer function. In 2.2.2 we serialize param ``default``. We keep this
Expand All @@ -2093,7 +2112,7 @@ def test_params_serialize_default_2_2_0(self):
"fileloc": "/path/to/file.py",
"tasks": [],
"timezone": "UTC",
"params": {"str": {"__class": "airflow.models.param.Param", "default": "str"}},
"params": [["str", {"__class": "airflow.models.param.Param", "default": "str"}]],
},
}
SerializedDAG.validate_schema(serialized)
Expand All @@ -2110,14 +2129,17 @@ def test_params_serialize_default(self):
"fileloc": "/path/to/file.py",
"tasks": [],
"timezone": "UTC",
"params": {
"my_param": {
"default": "a string value",
"description": "hello",
"schema": {"__var": {"type": "string"}, "__type": "dict"},
"__class": "airflow.models.param.Param",
}
},
"params": [
[
"my_param",
{
"default": "a string value",
"description": "hello",
"schema": {"__var": {"type": "string"}, "__type": "dict"},
"__class": "airflow.models.param.Param",
},
]
],
},
}
SerializedDAG.validate_schema(serialized)
Expand Down