From c29d7280bbad8ffbe28f0d268fc887e858911b66 Mon Sep 17 00:00:00 2001 From: Jarek Potiuk Date: Tue, 5 Mar 2024 22:55:31 +0100 Subject: [PATCH] Fix side-effect of default options in Beam Operators (#37916) Some of the operators in Apache Beam had side effect that they modified detault options passed to it in constructor, and in case of xdist tests it had side effect that they could impact other test results (this happened recently in main). The default options are already set by the Dataflow mixin in execute method in all Beam operators, but in Python and Go operator the defaults are also set in the constructor. Setting the defaults in mixin uses deepcopy to avoid such side effects. This might be intended, so this PR rather than removing default settings in the constructor, also adds deepcopy in them and fixes resulting tests - removing the defaults in tests that do not have the defaults set in the constructor. --- airflow/providers/apache/beam/operators/beam.py | 3 ++- tests/providers/apache/beam/operators/test_beam.py | 5 +++-- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/airflow/providers/apache/beam/operators/beam.py b/airflow/providers/apache/beam/operators/beam.py index bf75f3caa74fb..9dc4c36b3efc8 100644 --- a/airflow/providers/apache/beam/operators/beam.py +++ b/airflow/providers/apache/beam/operators/beam.py @@ -333,6 +333,7 @@ def __init__( self.py_interpreter = py_interpreter self.py_requirements = py_requirements self.py_system_site_packages = py_system_site_packages + self.pipeline_options = copy.deepcopy(self.pipeline_options) self.pipeline_options.setdefault("labels", {}).update( {"airflow-version": "v" + version.replace(".", "-").replace("+", "-")} ) @@ -740,7 +741,7 @@ def __init__( self.go_file = go_file self.launcher_binary = launcher_binary self.worker_binary = worker_binary or launcher_binary - + self.pipeline_options = copy.deepcopy(self.pipeline_options) self.pipeline_options.setdefault("labels", {}).update( {"airflow-version": "v" + version.replace(".", "-").replace("+", "-")} ) diff --git a/tests/providers/apache/beam/operators/test_beam.py b/tests/providers/apache/beam/operators/test_beam.py index 538c2417a0e38..ca4e7558e06f6 100644 --- a/tests/providers/apache/beam/operators/test_beam.py +++ b/tests/providers/apache/beam/operators/test_beam.py @@ -60,6 +60,7 @@ "output": "gs://test/output", "labels": {"foo": "bar", "airflow-version": TEST_VERSION}, } + TEST_IMPERSONATION_ACCOUNT = "test@impersonation.com" BEAM_OPERATOR_PATH = "airflow.providers.apache.beam.operators.beam.{}" @@ -286,7 +287,7 @@ def test_exec_dataflow_runner(self, gcs_hook, dataflow_hook_mock, beam_hook_mock "jobName": job_name, "stagingLocation": "gs://test/staging", "region": "us-central1", - "labels": {"foo": "bar", "airflow-version": TEST_VERSION}, + "labels": {"foo": "bar"}, "output": "gs://test/output", "impersonateServiceAccount": TEST_IMPERSONATION_ACCOUNT, } @@ -926,7 +927,7 @@ def test_exec_dataflow_runner(self, gcs_hook, dataflow_hook_mock, beam_hook_mock "job_name": job_name, "staging_location": "gs://test/staging", "output": "gs://test/output", - "labels": {"foo": "bar", "airflow-version": TEST_VERSION}, + "labels": {"foo": "bar"}, "region": "us-central1", "impersonate_service_account": TEST_IMPERSONATION_ACCOUNT, }