From 10681f8735ee0572ddedef58707f86d7efe961c4 Mon Sep 17 00:00:00 2001 From: Usiel Riedl Date: Mon, 10 Jun 2024 16:08:56 +0800 Subject: [PATCH 1/4] Ensures DAG params order regardless of backend Fixes https://github.com/apache/airflow/issues/40154 This change adds an extra attribute to the serialized DAG param objects which helps us decide the order of the deserialized params dictionary later even if the backend messes with us. I decided not to limit this just to MySQL since the operation is inexpensive and may turn out to be helpful. I made sure the new test fails with the old implementation + MySQL. I assume this test will be executed with MySQL somewhere in the build actions? --- airflow/serialization/serialized_objects.py | 13 ++++++++++--- tests/models/test_serialized_dag.py | 17 +++++++++++++++++ 2 files changed, 27 insertions(+), 3 deletions(-) diff --git a/airflow/serialization/serialized_objects.py b/airflow/serialization/serialized_objects.py index 6e7f50a87c73d0..3f130b1aad4939 100644 --- a/airflow/serialization/serialized_objects.py +++ b/airflow/serialization/serialized_objects.py @@ -831,14 +831,17 @@ def is_serialized(val): def _serialize_params_dict(cls, params: ParamsDict | dict): """Serialize Params dict for a DAG or task.""" serialized_params = {} - for k, v in params.items(): + for idx, item in enumerate(params.items()): + k, v = item # TODO: As of now, we would allow serialization of params which are of type Param only. try: class_identity = f"{v.__module__}.{v.__class__.__name__}" except AttributeError: class_identity = "" if class_identity == "airflow.models.param.Param": - serialized_params[k] = cls._serialize_param(v) + serialized_param = cls._serialize_param(v) + serialized_param["__position"] = idx + serialized_params[k] = serialized_param else: raise ValueError( f"Params to a DAG or a Task can be only of type airflow.models.param.Param, " @@ -850,7 +853,11 @@ def _serialize_params_dict(cls, params: ParamsDict | dict): def _deserialize_params_dict(cls, encoded_params: dict) -> ParamsDict: """Deserialize a DAG's Params dict.""" op_params = {} - for k, v in encoded_params.items(): + sorted_params = sorted( + encoded_params.items(), + key=lambda item: item[1].get("__position", 0) if isinstance(item[1], dict) else 0, + ) + for k, v in sorted_params: if isinstance(v, dict) and "__class" in v: op_params[k] = cls._deserialize_param(v) else: diff --git a/tests/models/test_serialized_dag.py b/tests/models/test_serialized_dag.py index c46d4e18a07342..236cb884e6d8b4 100644 --- a/tests/models/test_serialized_dag.py +++ b/tests/models/test_serialized_dag.py @@ -206,6 +206,23 @@ def test_get_dag_dependencies_default_to_empty(self, dag_dependencies_fields): expected_dependencies = {dag_id: [] for dag_id in example_dags} assert SDM.get_dag_dependencies() == expected_dependencies + def test_order_of_dag_params_is_stable(self): + """ + https://github.com/apache/airflow/issues/40154 + This asserts that we have logic in place which guarantees the order + of the params is maintained - even if the backend (e.g. MySQL) mutates + the serialized DAG JSON. + """ + example_dags = make_example_dags(example_dags_module) + example_params_trigger_ui = example_dags.get("example_params_trigger_ui") + before = list(example_params_trigger_ui.params.keys()) + + SDM.write_dag(example_params_trigger_ui) + retrieved_dag = SDM.get_dag("example_params_trigger_ui") + after = list(retrieved_dag.params.keys()) + + assert before == after + def test_order_of_deps_is_consistent(self): """ Previously the 'dag_dependencies' node in serialized dag was converted to list from set. From 7f3cba15b59c2f7f4e7fd9183383869ddfba9624 Mon Sep 17 00:00:00 2001 From: Usiel Riedl Date: Mon, 10 Jun 2024 19:15:23 -0700 Subject: [PATCH 2/4] Removes GitHub reference Co-authored-by: Jed Cunningham <66968678+jedcunningham@users.noreply.github.com> --- tests/models/test_serialized_dag.py | 1 - 1 file changed, 1 deletion(-) diff --git a/tests/models/test_serialized_dag.py b/tests/models/test_serialized_dag.py index 236cb884e6d8b4..848d26119506ee 100644 --- a/tests/models/test_serialized_dag.py +++ b/tests/models/test_serialized_dag.py @@ -208,7 +208,6 @@ def test_get_dag_dependencies_default_to_empty(self, dag_dependencies_fields): def test_order_of_dag_params_is_stable(self): """ - https://github.com/apache/airflow/issues/40154 This asserts that we have logic in place which guarantees the order of the params is maintained - even if the backend (e.g. MySQL) mutates the serialized DAG JSON. From 14031dd2102dc4e9d66867de9e1cd8fb9e6caeb8 Mon Sep 17 00:00:00 2001 From: Usiel Riedl Date: Tue, 11 Jun 2024 12:31:56 +0800 Subject: [PATCH 3/4] Serialize DAG params as array of tuples to ensure ordering Alternative to previous approach: We serialize the DAG params dict as a list of tuples which _should_ keep their ordering regardless of backend. Backwards compatibility is ensured because if `encoded_params` is a `dict` (not the expected `list`) then `dict(encoded_params)` still works. --- airflow/serialization/schema.json | 14 ++++--- airflow/serialization/serialized_objects.py | 21 ++++------ tests/serialization/test_dag_serialization.py | 42 ++++++++++++++----- 3 files changed, 48 insertions(+), 29 deletions(-) diff --git a/airflow/serialization/schema.json b/airflow/serialization/schema.json index 85631e09650784..76ae3e36ba111f 100644 --- a/airflow/serialization/schema.json +++ b/airflow/serialization/schema.json @@ -136,7 +136,7 @@ "dag": { "type": "object", "properties": { - "params": { "$ref": "#/definitions/params_dict" }, + "params": { "$ref": "#/definitions/params" }, "_dag_id": { "type": "string" }, "tasks": { "$ref": "#/definitions/tasks" }, "timezone": { "$ref": "#/definitions/timezone" }, @@ -206,9 +206,13 @@ "type": "array", "additionalProperties": { "$ref": "#/definitions/operator" } }, - "params_dict": { - "type": "object", - "additionalProperties": {"$ref": "#/definitions/param" } + "params": { + "type": "array", + "prefixItems": [ + { "type": "string" }, + { "$ref": "#/definitions/param" } + ], + "unevaluatedItems": false }, "param": { "$comment": "A param for a dag / operator", @@ -258,7 +262,7 @@ "retry_delay": { "$ref": "#/definitions/timedelta" }, "retry_exponential_backoff": { "type": "boolean" }, "max_retry_delay": { "$ref": "#/definitions/timedelta" }, - "params": { "$ref": "#/definitions/params_dict" }, + "params": { "$ref": "#/definitions/params" }, "priority_weight": { "type": "number" }, "weight_rule": { "type": "string" }, "executor": { "type": "string" }, diff --git a/airflow/serialization/serialized_objects.py b/airflow/serialization/serialized_objects.py index 3f130b1aad4939..2238ac3e27530b 100644 --- a/airflow/serialization/serialized_objects.py +++ b/airflow/serialization/serialized_objects.py @@ -828,20 +828,17 @@ def is_serialized(val): return class_(**kwargs) @classmethod - def _serialize_params_dict(cls, params: ParamsDict | dict): - """Serialize Params dict for a DAG or task.""" - serialized_params = {} - for idx, item in enumerate(params.items()): - k, v = item + def _serialize_params_dict(cls, params: ParamsDict | dict) -> list[tuple[str, dict]]: + """Serialize Params dict for a DAG or task as a list of tuples to ensure ordering.""" + serialized_params = [] + for k, v in params.items(): # TODO: As of now, we would allow serialization of params which are of type Param only. try: class_identity = f"{v.__module__}.{v.__class__.__name__}" except AttributeError: class_identity = "" if class_identity == "airflow.models.param.Param": - serialized_param = cls._serialize_param(v) - serialized_param["__position"] = idx - serialized_params[k] = serialized_param + serialized_params.append((k, cls._serialize_param(v))) else: raise ValueError( f"Params to a DAG or a Task can be only of type airflow.models.param.Param, " @@ -850,14 +847,10 @@ def _serialize_params_dict(cls, params: ParamsDict | dict): return serialized_params @classmethod - def _deserialize_params_dict(cls, encoded_params: dict) -> ParamsDict: + def _deserialize_params_dict(cls, encoded_params: list) -> ParamsDict: """Deserialize a DAG's Params dict.""" op_params = {} - sorted_params = sorted( - encoded_params.items(), - key=lambda item: item[1].get("__position", 0) if isinstance(item[1], dict) else 0, - ) - for k, v in sorted_params: + for k, v in dict(encoded_params).items(): if isinstance(v, dict) and "__class" in v: op_params[k] = cls._deserialize_param(v) else: diff --git a/tests/serialization/test_dag_serialization.py b/tests/serialization/test_dag_serialization.py index 093b7fba7615e7..a1e5997cd570db 100644 --- a/tests/serialization/test_dag_serialization.py +++ b/tests/serialization/test_dag_serialization.py @@ -245,7 +245,7 @@ def detect_task_dependencies(task: Operator) -> DagDependency | None: # type: i }, "edge_info": {}, "dag_dependencies": [], - "params": {}, + "params": [], }, } @@ -2082,6 +2082,25 @@ def test_params_upgrade(self): assert isinstance(dag.params.get_param("none"), Param) assert dag.params["str"] == "str" + def test_params_serialization_from_dict_upgrade(self): + """In <=2.9.2 params were serialized as a JSON object instead of a list of key-value pairs. + This test asserts that the params are still deserialized properly.""" + serialized = { + "__version": 1, + "dag": { + "_dag_id": "simple_dag", + "fileloc": "/path/to/file.py", + "tasks": [], + "timezone": "UTC", + "params": {"my_param": {"__class": "airflow.models.param.Param", "default": "str"}}, + }, + } + dag = SerializedDAG.from_dict(serialized) + + param = dag.params.get_param("my_param") + assert isinstance(param, Param) + assert param.value == "str" + def test_params_serialize_default_2_2_0(self): """In 2.0.0, param ``default`` was assumed to be json-serializable objects and were not run though the standard serializer function. In 2.2.2 we serialize param ``default``. We keep this @@ -2093,7 +2112,7 @@ def test_params_serialize_default_2_2_0(self): "fileloc": "/path/to/file.py", "tasks": [], "timezone": "UTC", - "params": {"str": {"__class": "airflow.models.param.Param", "default": "str"}}, + "params": [["str", {"__class": "airflow.models.param.Param", "default": "str"}]], }, } SerializedDAG.validate_schema(serialized) @@ -2110,14 +2129,17 @@ def test_params_serialize_default(self): "fileloc": "/path/to/file.py", "tasks": [], "timezone": "UTC", - "params": { - "my_param": { - "default": "a string value", - "description": "hello", - "schema": {"__var": {"type": "string"}, "__type": "dict"}, - "__class": "airflow.models.param.Param", - } - }, + "params": [ + [ + "my_param", + { + "default": "a string value", + "description": "hello", + "schema": {"__var": {"type": "string"}, "__type": "dict"}, + "__class": "airflow.models.param.Param", + }, + ] + ], }, } SerializedDAG.validate_schema(serialized) From a1c7e31b880517a16479a3aee68ec6d05e308402 Mon Sep 17 00:00:00 2001 From: Usiel Riedl Date: Tue, 11 Jun 2024 13:37:48 +0800 Subject: [PATCH 4/4] Make backwards compatibility more explicit Based on suggestions by @uranusjr with an additional fix to make mypy happy. --- airflow/serialization/serialized_objects.py | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/airflow/serialization/serialized_objects.py b/airflow/serialization/serialized_objects.py index 2238ac3e27530b..d218b037320592 100644 --- a/airflow/serialization/serialized_objects.py +++ b/airflow/serialization/serialized_objects.py @@ -847,10 +847,16 @@ def _serialize_params_dict(cls, params: ParamsDict | dict) -> list[tuple[str, di return serialized_params @classmethod - def _deserialize_params_dict(cls, encoded_params: list) -> ParamsDict: + def _deserialize_params_dict(cls, encoded_params: list[tuple[str, dict]]) -> ParamsDict: """Deserialize a DAG's Params dict.""" + if isinstance(encoded_params, collections.abc.Mapping): + # in 2.9.2 or earlier params were serialized as JSON objects + encoded_param_pairs: Iterable[tuple[str, dict]] = encoded_params.items() + else: + encoded_param_pairs = encoded_params + op_params = {} - for k, v in dict(encoded_params).items(): + for k, v in encoded_param_pairs: if isinstance(v, dict) and "__class" in v: op_params[k] = cls._deserialize_param(v) else: