From 92777adbb367d549c654d6ae9856d0f19d671a81 Mon Sep 17 00:00:00 2001 From: Usiel Riedl Date: Thu, 13 Jun 2024 11:11:16 +0800 Subject: [PATCH] Ensures DAG params order regardless of backend (#40156) * Ensures DAG params order regardless of backend Fixes https://github.com/apache/airflow/issues/40154 This change adds an extra attribute to the serialized DAG param objects which helps us decide the order of the deserialized params dictionary later even if the backend messes with us. I decided not to limit this just to MySQL since the operation is inexpensive and may turn out to be helpful. I made sure the new test fails with the old implementation + MySQL. I assume this test will be executed with MySQL somewhere in the build actions? * Removes GitHub reference Co-authored-by: Jed Cunningham <66968678+jedcunningham@users.noreply.github.com> * Serialize DAG params as array of tuples to ensure ordering Alternative to previous approach: We serialize the DAG params dict as a list of tuples which _should_ keep their ordering regardless of backend. Backwards compatibility is ensured because if `encoded_params` is a `dict` (not the expected `list`) then `dict(encoded_params)` still works. * Make backwards compatibility more explicit Based on suggestions by @uranusjr with an additional fix to make mypy happy. --------- Co-authored-by: Jed Cunningham <66968678+jedcunningham@users.noreply.github.com> (cherry picked from commit 2149b4dbee8fb524bb235280aaef158afaec8d4a) --- airflow/serialization/schema.json | 14 ++++--- airflow/serialization/serialized_objects.py | 18 +++++--- tests/models/test_serialized_dag.py | 16 +++++++ tests/serialization/test_dag_serialization.py | 42 ++++++++++++++----- 4 files changed, 69 insertions(+), 21 deletions(-) diff --git a/airflow/serialization/schema.json b/airflow/serialization/schema.json index a2a67327636417..3df4b533d87d5e 100644 --- a/airflow/serialization/schema.json +++ b/airflow/serialization/schema.json @@ -136,7 +136,7 @@ "dag": { "type": "object", "properties": { - "params": { "$ref": "#/definitions/params_dict" }, + "params": { "$ref": "#/definitions/params" }, "_dag_id": { "type": "string" }, "tasks": { "$ref": "#/definitions/tasks" }, "timezone": { "$ref": "#/definitions/timezone" }, @@ -206,9 +206,13 @@ "type": "array", "additionalProperties": { "$ref": "#/definitions/operator" } }, - "params_dict": { - "type": "object", - "additionalProperties": {"$ref": "#/definitions/param" } + "params": { + "type": "array", + "prefixItems": [ + { "type": "string" }, + { "$ref": "#/definitions/param" } + ], + "unevaluatedItems": false }, "param": { "$comment": "A param for a dag / operator", @@ -258,7 +262,7 @@ "retry_delay": { "$ref": "#/definitions/timedelta" }, "retry_exponential_backoff": { "type": "boolean" }, "max_retry_delay": { "$ref": "#/definitions/timedelta" }, - "params": { "$ref": "#/definitions/params_dict" }, + "params": { "$ref": "#/definitions/params" }, "priority_weight": { "type": "number" }, "weight_rule": { "type": "string" }, "executor_config": { "$ref": "#/definitions/dict" }, diff --git a/airflow/serialization/serialized_objects.py b/airflow/serialization/serialized_objects.py index 16a5c9e481d1d6..fdc05234886125 100644 --- a/airflow/serialization/serialized_objects.py +++ b/airflow/serialization/serialized_objects.py @@ -777,9 +777,9 @@ def is_serialized(val): return class_(**kwargs) @classmethod - def _serialize_params_dict(cls, params: ParamsDict | dict): - """Serialize Params dict for a DAG or task.""" - serialized_params = {} + def _serialize_params_dict(cls, params: ParamsDict | dict) -> list[tuple[str, dict]]: + """Serialize Params dict for a DAG or task as a list of tuples to ensure ordering.""" + serialized_params = [] for k, v in params.items(): # TODO: As of now, we would allow serialization of params which are of type Param only. try: @@ -787,7 +787,7 @@ def _serialize_params_dict(cls, params: ParamsDict | dict): except AttributeError: class_identity = "" if class_identity == "airflow.models.param.Param": - serialized_params[k] = cls._serialize_param(v) + serialized_params.append((k, cls._serialize_param(v))) else: raise ValueError( f"Params to a DAG or a Task can be only of type airflow.models.param.Param, " @@ -796,10 +796,16 @@ def _serialize_params_dict(cls, params: ParamsDict | dict): return serialized_params @classmethod - def _deserialize_params_dict(cls, encoded_params: dict) -> ParamsDict: + def _deserialize_params_dict(cls, encoded_params: list[tuple[str, dict]]) -> ParamsDict: """Deserialize a DAG's Params dict.""" + if isinstance(encoded_params, collections.abc.Mapping): + # in 2.9.2 or earlier params were serialized as JSON objects + encoded_param_pairs: Iterable[tuple[str, dict]] = encoded_params.items() + else: + encoded_param_pairs = encoded_params + op_params = {} - for k, v in encoded_params.items(): + for k, v in encoded_param_pairs: if isinstance(v, dict) and "__class" in v: op_params[k] = cls._deserialize_param(v) else: diff --git a/tests/models/test_serialized_dag.py b/tests/models/test_serialized_dag.py index 627943c6fd4bdf..cb62976e55de0b 100644 --- a/tests/models/test_serialized_dag.py +++ b/tests/models/test_serialized_dag.py @@ -206,6 +206,22 @@ def test_get_dag_dependencies_default_to_empty(self, dag_dependencies_fields): expected_dependencies = {dag_id: [] for dag_id in example_dags} assert SDM.get_dag_dependencies() == expected_dependencies + def test_order_of_dag_params_is_stable(self): + """ + This asserts that we have logic in place which guarantees the order + of the params is maintained - even if the backend (e.g. MySQL) mutates + the serialized DAG JSON. + """ + example_dags = make_example_dags(example_dags_module) + example_params_trigger_ui = example_dags.get("example_params_trigger_ui") + before = list(example_params_trigger_ui.params.keys()) + + SDM.write_dag(example_params_trigger_ui) + retrieved_dag = SDM.get_dag("example_params_trigger_ui") + after = list(retrieved_dag.params.keys()) + + assert before == after + def test_order_of_deps_is_consistent(self): """ Previously the 'dag_dependencies' node in serialized dag was converted to list from set. diff --git a/tests/serialization/test_dag_serialization.py b/tests/serialization/test_dag_serialization.py index 5170722363a282..30ebbb957bb0c0 100644 --- a/tests/serialization/test_dag_serialization.py +++ b/tests/serialization/test_dag_serialization.py @@ -232,7 +232,7 @@ def detect_task_dependencies(task: Operator) -> DagDependency | None: # type: i }, "edge_info": {}, "dag_dependencies": [], - "params": {}, + "params": [], }, } @@ -2034,6 +2034,25 @@ def test_params_upgrade(self): assert isinstance(dag.params.get_param("none"), Param) assert dag.params["str"] == "str" + def test_params_serialization_from_dict_upgrade(self): + """In <=2.9.2 params were serialized as a JSON object instead of a list of key-value pairs. + This test asserts that the params are still deserialized properly.""" + serialized = { + "__version": 1, + "dag": { + "_dag_id": "simple_dag", + "fileloc": "/path/to/file.py", + "tasks": [], + "timezone": "UTC", + "params": {"my_param": {"__class": "airflow.models.param.Param", "default": "str"}}, + }, + } + dag = SerializedDAG.from_dict(serialized) + + param = dag.params.get_param("my_param") + assert isinstance(param, Param) + assert param.value == "str" + def test_params_serialize_default_2_2_0(self): """In 2.0.0, param ``default`` was assumed to be json-serializable objects and were not run though the standard serializer function. In 2.2.2 we serialize param ``default``. We keep this @@ -2045,7 +2064,7 @@ def test_params_serialize_default_2_2_0(self): "fileloc": "/path/to/file.py", "tasks": [], "timezone": "UTC", - "params": {"str": {"__class": "airflow.models.param.Param", "default": "str"}}, + "params": [["str", {"__class": "airflow.models.param.Param", "default": "str"}]], }, } SerializedDAG.validate_schema(serialized) @@ -2062,14 +2081,17 @@ def test_params_serialize_default(self): "fileloc": "/path/to/file.py", "tasks": [], "timezone": "UTC", - "params": { - "my_param": { - "default": "a string value", - "description": "hello", - "schema": {"__var": {"type": "string"}, "__type": "dict"}, - "__class": "airflow.models.param.Param", - } - }, + "params": [ + [ + "my_param", + { + "default": "a string value", + "description": "hello", + "schema": {"__var": {"type": "string"}, "__type": "dict"}, + "__class": "airflow.models.param.Param", + }, + ] + ], }, } SerializedDAG.validate_schema(serialized)