Skip to content

Commit

Permalink
Fix side-effect of default options in Beam Operators (#37916)
Browse files Browse the repository at this point in the history
Some of the operators in Apache Beam had a side effect: they
modified the default options passed to them in the constructor. In the
case of xdist tests, this meant that they could impact other
test results (this happened recently in main). The default options
are already set by the Dataflow mixin in the execute method of all
Beam operators, but in the Python and Go operators the defaults
are also set in the constructor.

Setting the defaults in the mixin uses deepcopy to avoid such side effects.

This might be intended, so rather than removing the default
settings in the constructor, this PR also adds deepcopy to them and fixes
the resulting tests - removing the defaults in tests that do not
have the defaults set in the constructor.
  • Loading branch information
potiuk committed Mar 5, 2024
1 parent 17cc4c9 commit c29d728
Show file tree
Hide file tree
Showing 2 changed files with 5 additions and 3 deletions.
3 changes: 2 additions & 1 deletion airflow/providers/apache/beam/operators/beam.py
Original file line number Diff line number Diff line change
Expand Up @@ -333,6 +333,7 @@ def __init__(
self.py_interpreter = py_interpreter
self.py_requirements = py_requirements
self.py_system_site_packages = py_system_site_packages
self.pipeline_options = copy.deepcopy(self.pipeline_options)
self.pipeline_options.setdefault("labels", {}).update(
{"airflow-version": "v" + version.replace(".", "-").replace("+", "-")}
)
Expand Down Expand Up @@ -740,7 +741,7 @@ def __init__(
self.go_file = go_file
self.launcher_binary = launcher_binary
self.worker_binary = worker_binary or launcher_binary

self.pipeline_options = copy.deepcopy(self.pipeline_options)
self.pipeline_options.setdefault("labels", {}).update(
{"airflow-version": "v" + version.replace(".", "-").replace("+", "-")}
)
Expand Down
5 changes: 3 additions & 2 deletions tests/providers/apache/beam/operators/test_beam.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
"output": "gs://test/output",
"labels": {"foo": "bar", "airflow-version": TEST_VERSION},
}

TEST_IMPERSONATION_ACCOUNT = "test@impersonation.com"
BEAM_OPERATOR_PATH = "airflow.providers.apache.beam.operators.beam.{}"

Expand Down Expand Up @@ -286,7 +287,7 @@ def test_exec_dataflow_runner(self, gcs_hook, dataflow_hook_mock, beam_hook_mock
"jobName": job_name,
"stagingLocation": "gs://test/staging",
"region": "us-central1",
"labels": {"foo": "bar", "airflow-version": TEST_VERSION},
"labels": {"foo": "bar"},
"output": "gs://test/output",
"impersonateServiceAccount": TEST_IMPERSONATION_ACCOUNT,
}
Expand Down Expand Up @@ -926,7 +927,7 @@ def test_exec_dataflow_runner(self, gcs_hook, dataflow_hook_mock, beam_hook_mock
"job_name": job_name,
"staging_location": "gs://test/staging",
"output": "gs://test/output",
"labels": {"foo": "bar", "airflow-version": TEST_VERSION},
"labels": {"foo": "bar"},
"region": "us-central1",
"impersonate_service_account": TEST_IMPERSONATION_ACCOUNT,
}
Expand Down

0 comments on commit c29d728

Please sign in to comment.