From 52e96b5edfc62a66aa999f1e411d2623f386b85b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rapha=C3=ABl=20Vandon?= Date: Thu, 16 Mar 2023 15:34:11 -0700 Subject: [PATCH 01/15] preload airflow imports before dag parsing to save time --- airflow/dag_processing/processor.py | 7 +++++++ airflow/utils/file.py | 11 +++++++++++ 2 files changed, 18 insertions(+) diff --git a/airflow/dag_processing/processor.py b/airflow/dag_processing/processor.py index 2ed17b65704ed..308df7c0b2936 100644 --- a/airflow/dag_processing/processor.py +++ b/airflow/dag_processing/processor.py @@ -50,6 +50,7 @@ from airflow.stats import Stats from airflow.utils import timezone from airflow.utils.email import get_email_address_list, send_email +from airflow.utils.file import get_airflow_modules_in from airflow.utils.log.logging_mixin import LoggingMixin, StreamLogWriter, set_context from airflow.utils.mixins import MultiprocessingStartMethodMixin from airflow.utils.session import NEW_SESSION, provide_session @@ -187,6 +188,12 @@ def _handle_dag_file_processing(): def start(self) -> None: """Launch the process and start processing the DAG.""" + import importlib + + modules = get_airflow_modules_in(self.file_path) + for module in modules: + importlib.import_module(module) + context = self._get_multiprocessing_context() _parent_channel, _child_channel = context.Pipe(duplex=False) diff --git a/airflow/utils/file.py b/airflow/utils/file.py index b3b1e8f3d187f..c02ebe46e4275 100644 --- a/airflow/utils/file.py +++ b/airflow/utils/file.py @@ -371,3 +371,14 @@ def might_contain_dag_via_default_heuristic(file_path: str, zip_file: zipfile.Zi content = dag_file.read() content = content.lower() return all(s in content for s in (b"dag", b"airflow")) + + +def get_airflow_modules_in(file_path: str) -> Generator[str]: + """Returns a list of the airflow modules that are imported in the given file""" + with open(file_path, "rb") as dag_file: + content = dag_file.read() + lines = content.splitlines() + for line in lines: + if line.startswith(b"from airflow.") or line.startswith(b"import airflow."): + module_name = line.split(b" ")[1] + yield module_name.decode() From d5bbba656c8c3312bec50667ea5f5e216cf8a0e7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rapha=C3=ABl=20Vandon?= Date: Mon, 10 Apr 2023 11:06:00 -0700 Subject: [PATCH 02/15] handle exceptions, use ast parsing, add test --- airflow/dag_processing/processor.py | 9 ++++++++- airflow/utils/file.py | 18 ++++++++++++------ tests/dags/test_imports.nopy | 19 +++++++++++++++++++ tests/utils/test_file.py | 13 +++++++++++++ 4 files changed, 52 insertions(+), 7 deletions(-) create mode 100644 tests/dags/test_imports.nopy diff --git a/airflow/dag_processing/processor.py b/airflow/dag_processing/processor.py index 308df7c0b2936..a8f1ff3d697b8 100644 --- a/airflow/dag_processing/processor.py +++ b/airflow/dag_processing/processor.py @@ -192,7 +192,14 @@ def start(self) -> None: modules = get_airflow_modules_in(self.file_path) for module in modules: - importlib.import_module(module) + try: + importlib.import_module(module) + except Exception as e: + # only log as warning because an error here is not preventing anything from working, + # and if it's serious, it's going to be surfaced to the user when the dag is actually parsed. + self.log.warning( + f"error when try to pre-import module '{module}' found in {self.file_path}: {e}" + ) context = self._get_multiprocessing_context() diff --git a/airflow/utils/file.py b/airflow/utils/file.py index c02ebe46e4275..739e02084a8bb 100644 --- a/airflow/utils/file.py +++ b/airflow/utils/file.py @@ -17,6 +17,7 @@ # under the License. from __future__ import annotations +import ast import io import logging import os @@ -373,12 +374,17 @@ def might_contain_dag_via_default_heuristic(file_path: str, zip_file: zipfile.Zi return all(s in content for s in (b"dag", b"airflow")) -def get_airflow_modules_in(file_path: str) -> Generator[str]: +def get_airflow_modules_in(file_path: str) -> [str]: """Returns a list of the airflow modules that are imported in the given file""" with open(file_path, "rb") as dag_file: content = dag_file.read() - lines = content.splitlines() - for line in lines: - if line.startswith(b"from airflow.") or line.startswith(b"import airflow."): - module_name = line.split(b" ")[1] - yield module_name.decode() + parsed = ast.parse(content) + + modules = [] + for st in parsed.body: + if isinstance(st, ast.Import): + modules.extend([n.name for n in st.names]) + elif isinstance(st, ast.ImportFrom): + modules.append(st.module) + + return [m for m in modules if m.startswith("airflow.")] diff --git a/tests/dags/test_imports.nopy b/tests/dags/test_imports.nopy new file mode 100644 index 0000000000000..c83fdb1fe750b --- /dev/null +++ b/tests/dags/test_imports.nopy @@ -0,0 +1,19 @@ +# using .nopy extension so that the file won't get reformatted +# because we want to assert that we can parse crappy formatting. + +from __future__ import annotations + +# multiline import +import \ + datetime, \ +enum,time + +# from import +from airflow.utils import file +# multiline airflow import +import airflow.decorators, airflow.models\ +, airflow.sensors + +def f(): + # local import + import airflow.operators diff --git a/tests/utils/test_file.py b/tests/utils/test_file.py index e5e51f7b207d9..35877ef6c7b1d 100644 --- a/tests/utils/test_file.py +++ b/tests/utils/test_file.py @@ -188,3 +188,16 @@ def test_might_contain_dag(self): # With safe_mode is False, the user defined callable won't be invoked assert file_utils.might_contain_dag(file_path=file_path_with_dag, safe_mode=False) + + def test_get_modules(self): + file_path = os.path.join(TEST_DAGS_FOLDER, "test_imports.nopy") + + modules = file_utils.get_airflow_modules_in(file_path) + + assert len(modules) == 4 + assert "airflow.utils" in modules + assert "airflow.decorators" in modules + assert "airflow.models" in modules + assert "airflow.sensors" in modules + # this one is a local import, we don't want it. + assert "airflow.operators" not in modules From e05a6545420bff5c60e38131e5eb3f4f2bcd292f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rapha=C3=ABl=20Vandon?= Date: Mon, 10 Apr 2023 11:21:11 -0700 Subject: [PATCH 03/15] add import in multiline comment to test --- tests/dags/test_imports.nopy | 6 ++++-- tests/utils/test_file.py | 4 +++- 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/tests/dags/test_imports.nopy b/tests/dags/test_imports.nopy index c83fdb1fe750b..ffa7bd7bd4d3a 100644 --- a/tests/dags/test_imports.nopy +++ b/tests/dags/test_imports.nopy @@ -7,7 +7,9 @@ from __future__ import annotations import \ datetime, \ enum,time - +""" +import airflow.in_comment +""" # from import from airflow.utils import file # multiline airflow import @@ -16,4 +18,4 @@ import airflow.decorators, airflow.models\ def f(): # local import - import airflow.operators + import airflow.local_import diff --git a/tests/utils/test_file.py b/tests/utils/test_file.py index 35877ef6c7b1d..a3b12435808e0 100644 --- a/tests/utils/test_file.py +++ b/tests/utils/test_file.py @@ -200,4 +200,6 @@ def test_get_modules(self): assert "airflow.models" in modules assert "airflow.sensors" in modules # this one is a local import, we don't want it. - assert "airflow.operators" not in modules + assert "airflow.local_import" not in modules + # this one is in a comment, we don't want it + assert "airflow.in_comment" not in modules From ebde5e150f10b81510fc27b5e0a9adde30f3840e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rapha=C3=ABl=20Vandon?= Date: Mon, 10 Apr 2023 11:41:13 -0700 Subject: [PATCH 04/15] add conditional import to test, add error handling --- airflow/utils/file.py | 5 ++++- tests/dags/test_imports.nopy | 5 +++++ tests/utils/test_file.py | 11 +++++++++++ 3 files changed, 20 insertions(+), 1 deletion(-) diff --git a/airflow/utils/file.py b/airflow/utils/file.py index 739e02084a8bb..088169cee3a97 100644 --- a/airflow/utils/file.py +++ b/airflow/utils/file.py @@ -378,7 +378,10 @@ def get_airflow_modules_in(file_path: str) -> [str]: """Returns a list of the airflow modules that are imported in the given file""" with open(file_path, "rb") as dag_file: content = dag_file.read() - parsed = ast.parse(content) + try: + parsed = ast.parse(content) + except SyntaxError: + return [] modules = [] for st in parsed.body: diff --git a/tests/dags/test_imports.nopy b/tests/dags/test_imports.nopy index ffa7bd7bd4d3a..31cbe177ff541 100644 --- a/tests/dags/test_imports.nopy +++ b/tests/dags/test_imports.nopy @@ -16,6 +16,11 @@ from airflow.utils import file import airflow.decorators, airflow.models\ , airflow.sensors +if prod: + import airflow.if_branch +else: + import airflow.else_branch + def f(): # local import import airflow.local_import diff --git a/tests/utils/test_file.py b/tests/utils/test_file.py index a3b12435808e0..e9145b70718c5 100644 --- a/tests/utils/test_file.py +++ b/tests/utils/test_file.py @@ -203,3 +203,14 @@ def test_get_modules(self): assert "airflow.local_import" not in modules # this one is in a comment, we don't want it assert "airflow.in_comment" not in modules + # we don't want imports under conditions + assert "airflow.if_branch" not in modules + assert "airflow.else_branch" not in modules + + def test_get_modules_from_invalid_file(self): + file_path = os.path.join(TEST_DAGS_FOLDER, "README.md") # just getting a non-python file + + # should not error + modules = file_utils.get_airflow_modules_in(file_path) + + assert len(modules) == 0 From 2064f469cf43cf4a002553784f2f59371ce4eed3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rapha=C3=ABl=20Vandon?= Date: Mon, 10 Apr 2023 14:17:39 -0700 Subject: [PATCH 05/15] fix test & static check --- airflow/utils/file.py | 4 ++-- tests/dag_processing/test_manager.py | 3 ++- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/airflow/utils/file.py b/airflow/utils/file.py index 088169cee3a97..17b68cad7b683 100644 --- a/airflow/utils/file.py +++ b/airflow/utils/file.py @@ -374,7 +374,7 @@ def might_contain_dag_via_default_heuristic(file_path: str, zip_file: zipfile.Zi return all(s in content for s in (b"dag", b"airflow")) -def get_airflow_modules_in(file_path: str) -> [str]: +def get_airflow_modules_in(file_path: str) -> list[str]: """Returns a list of the airflow modules that are imported in the given file""" with open(file_path, "rb") as dag_file: content = dag_file.read() @@ -387,7 +387,7 @@ def get_airflow_modules_in(file_path: str) -> [str]: for st in parsed.body: if isinstance(st, ast.Import): modules.extend([n.name for n in st.names]) - elif isinstance(st, ast.ImportFrom): + elif isinstance(st, ast.ImportFrom) and st.module: modules.append(st.module) return [m for m in modules if m.startswith("airflow.")] diff --git a/tests/dag_processing/test_manager.py b/tests/dag_processing/test_manager.py index a7d9328bbacfa..42825a9092270 100644 --- a/tests/dag_processing/test_manager.py +++ b/tests/dag_processing/test_manager.py @@ -203,7 +203,8 @@ def test_max_runs_when_no_files(self): parent_pipe.close() @pytest.mark.backend("mysql", "postgres") - def test_start_new_processes_with_same_filepath(self): + @mock.patch("airflow.dag_processing.processor.get_airflow_modules_in") + def test_start_new_processes_with_same_filepath(self, _): """ Test that when a processor already exist with a filepath, a new processor won't be created with that filepath. The filepath will just be removed from the list. From 7b572db4ee7317d285228e67adcc650086d194b1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rapha=C3=ABl=20Vandon?= <114772123+vandonr-amz@users.noreply.github.com> Date: Tue, 11 Apr 2023 09:53:24 -0700 Subject: [PATCH 06/15] rewrite a bit the method in utils/file Co-authored-by: Tzu-ping Chung --- airflow/utils/file.py | 31 ++++++++++++++++--------------- 1 file changed, 16 insertions(+), 15 deletions(-) diff --git a/airflow/utils/file.py b/airflow/utils/file.py index 17b68cad7b683..81089e06d4c26 100644 --- a/airflow/utils/file.py +++ b/airflow/utils/file.py @@ -374,20 +374,21 @@ def might_contain_dag_via_default_heuristic(file_path: str, zip_file: zipfile.Zi return all(s in content for s in (b"dag", b"airflow")) -def get_airflow_modules_in(file_path: str) -> list[str]: - """Returns a list of the airflow modules that are imported in the given file""" - with open(file_path, "rb") as dag_file: - content = dag_file.read() - try: - parsed = ast.parse(content) - except SyntaxError: - return [] - - modules = [] - for st in parsed.body: +def _find_imported_modules(module: ast.Module) -> Generator[str, None, None]: + for st in module.body: if isinstance(st, ast.Import): - modules.extend([n.name for n in st.names]) - elif isinstance(st, ast.ImportFrom) and st.module: - modules.append(st.module) + for n in st.names: + yield n.name + elif isinstance(st, ast.ImportFrom) and st.module is not None: + yield st.module - return [m for m in modules if m.startswith("airflow.")] + +def iter_airflow_imports(file_path: str) -> Generator[str, None, None]: + """Find Airflow modules imported in the given file.""" + try: + parsed = ast.parse(Path(file_path).read_bytes()) + except (OSError, SyntaxError, UnicodeDecodeError): + return + for m in _find_imported_modules(parsed): + if m.startswith("airflow."): + yield m From a2df518284e9d190507c2ac791b252e70252ed6d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rapha=C3=ABl=20Vandon?= Date: Tue, 11 Apr 2023 09:57:14 -0700 Subject: [PATCH 07/15] un-interpolate logged string --- airflow/dag_processing/processor.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airflow/dag_processing/processor.py b/airflow/dag_processing/processor.py index a8f1ff3d697b8..ca8a72e6fb896 100644 --- a/airflow/dag_processing/processor.py +++ b/airflow/dag_processing/processor.py @@ -198,7 +198,7 @@ def start(self) -> None: # only log as warning because an error here is not preventing anything from working, # and if it's serious, it's going to be surfaced to the user when the dag is actually parsed. self.log.warning( - f"error when try to pre-import module '{module}' found in {self.file_path}: {e}" + "error when try to pre-import module '%s' found in %s: %s", module, self.file_path, e ) context = self._get_multiprocessing_context() From 4303a192f5d700ae3500524fc476cf8613d778a8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rapha=C3=ABl=20Vandon?= Date: Tue, 11 Apr 2023 10:30:48 -0700 Subject: [PATCH 08/15] fix following method rename --- airflow/dag_processing/processor.py | 5 ++--- tests/dag_processing/test_manager.py | 2 +- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/airflow/dag_processing/processor.py b/airflow/dag_processing/processor.py index ca8a72e6fb896..f20913c81cb18 100644 --- a/airflow/dag_processing/processor.py +++ b/airflow/dag_processing/processor.py @@ -50,7 +50,7 @@ from airflow.stats import Stats from airflow.utils import timezone from airflow.utils.email import get_email_address_list, send_email -from airflow.utils.file import get_airflow_modules_in +from airflow.utils.file import iter_airflow_imports from airflow.utils.log.logging_mixin import LoggingMixin, StreamLogWriter, set_context from airflow.utils.mixins import MultiprocessingStartMethodMixin from airflow.utils.session import NEW_SESSION, provide_session @@ -190,8 +190,7 @@ def start(self) -> None: """Launch the process and start processing the DAG.""" import importlib - modules = get_airflow_modules_in(self.file_path) - for module in modules: + for module in iter_airflow_imports(self.file_path): try: importlib.import_module(module) except Exception as e: diff --git a/tests/dag_processing/test_manager.py b/tests/dag_processing/test_manager.py index 42825a9092270..622e73c1fae37 100644 --- a/tests/dag_processing/test_manager.py +++ b/tests/dag_processing/test_manager.py @@ -203,7 +203,7 @@ def test_max_runs_when_no_files(self): parent_pipe.close() @pytest.mark.backend("mysql", "postgres") - @mock.patch("airflow.dag_processing.processor.get_airflow_modules_in") + @mock.patch("airflow.dag_processing.processor.iter_airflow_imports") def test_start_new_processes_with_same_filepath(self, _): """ Test that when a processor already exist with a filepath, a new processor won't be created From 1b28ab685aefbe85b647a615a2eb1553213dc4b0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rapha=C3=ABl=20Vandon?= Date: Tue, 11 Apr 2023 14:27:49 -0700 Subject: [PATCH 09/15] more fixing --- tests/utils/test_file.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/utils/test_file.py b/tests/utils/test_file.py index e9145b70718c5..3f7acc844bf75 100644 --- a/tests/utils/test_file.py +++ b/tests/utils/test_file.py @@ -192,7 +192,7 @@ def test_might_contain_dag(self): def test_get_modules(self): file_path = os.path.join(TEST_DAGS_FOLDER, "test_imports.nopy") - modules = file_utils.get_airflow_modules_in(file_path) + modules = list(file_utils.iter_airflow_imports(file_path)) assert len(modules) == 4 assert "airflow.utils" in modules @@ -211,6 +211,6 @@ def test_get_modules_from_invalid_file(self): file_path = os.path.join(TEST_DAGS_FOLDER, "README.md") # just getting a non-python file # should not error - modules = file_utils.get_airflow_modules_in(file_path) + modules = list(file_utils.iter_airflow_imports(file_path)) assert len(modules) == 0 From 9bde048b1e710b1219637bca2e97636b81089b66 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rapha=C3=ABl=20Vandon?= <114772123+vandonr-amz@users.noreply.github.com> Date: Tue, 11 Apr 2023 15:07:58 -0700 Subject: [PATCH 10/15] Fix wording in warn on import error Co-authored-by: Ephraim Anierobi --- airflow/dag_processing/processor.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airflow/dag_processing/processor.py b/airflow/dag_processing/processor.py index f20913c81cb18..e22d42258a4f2 100644 --- a/airflow/dag_processing/processor.py +++ b/airflow/dag_processing/processor.py @@ -197,7 +197,7 @@ def start(self) -> None: # only log as warning because an error here is not preventing anything from working, # and if it's serious, it's going to be surfaced to the user when the dag is actually parsed. self.log.warning( - "error when try to pre-import module '%s' found in %s: %s", module, self.file_path, e + "Error when trying to pre-import module '%s' found in %s: %s", module, self.file_path, e ) context = self._get_multiprocessing_context() From 5d7b8e2fc910349b6b69f02c71908f1320d834b6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rapha=C3=ABl=20Vandon?= Date: Thu, 13 Apr 2023 14:24:52 -0700 Subject: [PATCH 11/15] use fmt on/off instead of renaming the extension --- .pre-commit-config.yaml | 2 +- tests/dags/test_imports.nopy | 26 -------------------- tests/dags/test_imports.py | 46 ++++++++++++++++++++++++++++++++++++ tests/utils/test_file.py | 2 +- 4 files changed, 48 insertions(+), 28 deletions(-) delete mode 100644 tests/dags/test_imports.nopy create mode 100644 tests/dags/test_imports.py diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 0e5498a38e262..9a3ce362aab47 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -181,7 +181,7 @@ repos: entry: ruff --fix --no-update-check --force-exclude additional_dependencies: ['ruff==0.0.226'] files: \.pyi?$ - exclude: ^.*/.*_vendor/ + exclude: ^.*/.*_vendor/|tests/dags/test_imports.py - repo: https://github.com/asottile/blacken-docs rev: 1.13.0 hooks: diff --git a/tests/dags/test_imports.nopy b/tests/dags/test_imports.nopy deleted file mode 100644 index 31cbe177ff541..0000000000000 --- a/tests/dags/test_imports.nopy +++ /dev/null @@ -1,26 +0,0 @@ -# using .nopy extension so that the file won't get reformatted -# because we want to assert that we can parse crappy formatting. - -from __future__ import annotations - -# multiline import -import \ - datetime, \ -enum,time -""" -import airflow.in_comment -""" -# from import -from airflow.utils import file -# multiline airflow import -import airflow.decorators, airflow.models\ -, airflow.sensors - -if prod: - import airflow.if_branch -else: - import airflow.else_branch - -def f(): - # local import - import airflow.local_import diff --git a/tests/dags/test_imports.py b/tests/dags/test_imports.py new file mode 100644 index 0000000000000..54ca1a80aacb5 --- /dev/null +++ b/tests/dags/test_imports.py @@ -0,0 +1,46 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# fmt: off +# this file contains sample code than only needs to pass the lexer +# it is "badly" formatted on purpose to test edge cases. + +from __future__ import annotations + +# multiline import +import \ + datetime, \ +enum,time +""" +import airflow.in_comment +""" +# from import +from airflow.utils import file +# multiline airflow import +import airflow.decorators, airflow.models\ +, airflow.sensors + +if prod: + import airflow.if_branch +else: + import airflow.else_branch + +def f(): + # local import + import airflow.local_import + +# fmt: on diff --git a/tests/utils/test_file.py b/tests/utils/test_file.py index 3f7acc844bf75..448ddf31c7a9c 100644 --- a/tests/utils/test_file.py +++ b/tests/utils/test_file.py @@ -190,7 +190,7 @@ def test_might_contain_dag(self): assert file_utils.might_contain_dag(file_path=file_path_with_dag, safe_mode=False) def test_get_modules(self): - file_path = os.path.join(TEST_DAGS_FOLDER, "test_imports.nopy") + file_path = os.path.join(TEST_DAGS_FOLDER, "test_imports.py") modules = list(file_utils.iter_airflow_imports(file_path)) From 5735bfb6c41225827be1db03441dd7608f42674f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rapha=C3=ABl=20Vandon?= Date: Thu, 13 Apr 2023 14:43:33 -0700 Subject: [PATCH 12/15] move importlib import to the top --- airflow/dag_processing/processor.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/airflow/dag_processing/processor.py b/airflow/dag_processing/processor.py index 272137eafab06..0ff83c3884067 100644 --- a/airflow/dag_processing/processor.py +++ b/airflow/dag_processing/processor.py @@ -16,6 +16,7 @@ # under the License. from __future__ import annotations +import importlib import logging import multiprocessing import os @@ -188,8 +189,6 @@ def _handle_dag_file_processing(): def start(self) -> None: """Launch the process and start processing the DAG.""" - import importlib - for module in iter_airflow_imports(self.file_path): try: importlib.import_module(module) From f0c89cd217eb3b96fa054f504a80c62f9d06194c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rapha=C3=ABl=20Vandon?= Date: Thu, 13 Apr 2023 16:40:12 -0700 Subject: [PATCH 13/15] exclude test file from mypy core as well --- .pre-commit-config.yaml | 4 ++-- tests/dags/test_imports.py | 1 + 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 7b934e70d442b..36e80151eece6 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -181,7 +181,7 @@ repos: entry: ruff --fix --no-update-check --force-exclude additional_dependencies: ['ruff==0.0.226'] files: \.pyi?$ - exclude: ^.*/.*_vendor/|tests/dags/test_imports.py + exclude: ^.*/.*_vendor/|^tests/dags/test_imports.py - repo: https://github.com/asottile/blacken-docs rev: 1.13.0 hooks: @@ -905,7 +905,7 @@ repos: language: python entry: ./scripts/ci/pre_commit/pre_commit_mypy.py --namespace-packages files: \.py$ - exclude: ^.*/.*_vendor/|^airflow/migrations|^airflow/providers|^dev|^docs|^provider_packages|^tests/providers|^tests/system/providers + exclude: ^.*/.*_vendor/|^airflow/migrations|^airflow/providers|^dev|^docs|^provider_packages|^tests/providers|^tests/system/providers|^tests/dags/test_imports.py require_serial: true additional_dependencies: ['rich>=12.4.4', 'inputimeout', 'pyyaml'] - id: mypy-providers diff --git a/tests/dags/test_imports.py b/tests/dags/test_imports.py index 54ca1a80aacb5..43be6fc08e4aa 100644 --- a/tests/dags/test_imports.py +++ b/tests/dags/test_imports.py @@ -16,6 +16,7 @@ # under the License. # fmt: off + # this file contains sample code than only needs to pass the lexer # it is "badly" formatted on purpose to test edge cases. From 3d309deaec50fec1cccdc6c4c7a06896f4523d64 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rapha=C3=ABl=20Vandon?= Date: Thu, 13 Apr 2023 16:42:06 -0700 Subject: [PATCH 14/15] test fix --- tests/jobs/test_scheduler_job.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py index 4870a2aa132e9..5c78524d67867 100644 --- a/tests/jobs/test_scheduler_job.py +++ b/tests/jobs/test_scheduler_job.py @@ -3014,6 +3014,7 @@ def test_list_py_file_paths(self): "test_ignore_this.py", "test_invalid_param.py", "test_nested_dag.py", + "test_imports.py", "__init__.py", } for root, _, files in os.walk(TEST_DAG_FOLDER): From 6a53f309fe49d34504192b38a9cf79b2ee07e958 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rapha=C3=ABl=20Vandon?= Date: Fri, 14 Apr 2023 08:46:07 -0700 Subject: [PATCH 15/15] add config flag to control the bahvior --- airflow/config_templates/config.yml | 11 ++++++++- airflow/config_templates/default_airflow.cfg | 6 +++++ airflow/dag_processing/processor.py | 25 +++++++++++++------- 3 files changed, 32 insertions(+), 10 deletions(-) diff --git a/airflow/config_templates/config.yml b/airflow/config_templates/config.yml index 29e02df9b3b6b..9d8e0ffdbb6dc 100644 --- a/airflow/config_templates/config.yml +++ b/airflow/config_templates/config.yml @@ -2344,6 +2344,16 @@ scheduler: version_added: 2.0.0 type: boolean default: "True" + parsing_pre_import_modules: + description: | + The scheduler reads dag files to extract the airflow modules that are going to be used, + and imports them ahead of time to avoid having to re-do it for each parsing process. + This flag can be set to False to disable this behavior in case an airflow module needs to be freshly + imported each time (at the cost of increased DAG parsing time). + version_added: 2.6.0 + type: boolean + example: ~ + default: "True" parsing_processes: description: | The scheduler can run multiple processes in parallel to parse dags. @@ -2363,7 +2373,6 @@ scheduler: same host. This is useful when running with Scheduler in HA mode where each scheduler can parse different DAG files. * ``alphabetical``: Sort by filename - version_added: 2.1.0 type: string example: ~ diff --git a/airflow/config_templates/default_airflow.cfg b/airflow/config_templates/default_airflow.cfg index 257536cc038fd..44b1401f47df4 100644 --- a/airflow/config_templates/default_airflow.cfg +++ b/airflow/config_templates/default_airflow.cfg @@ -1190,6 +1190,12 @@ max_dagruns_per_loop_to_schedule = 20 # dags in some circumstances schedule_after_task_execution = True +# The scheduler reads dag files to extract the airflow modules that are going to be used, +# and imports them ahead of time to avoid having to re-do it for each parsing process. +# This flag can be set to False to disable this behavior in case an airflow module needs to be freshly +# imported each time (at the cost of increased DAG parsing time). +parsing_pre_import_modules = True + # The scheduler can run multiple processes in parallel to parse dags. # This defines how many processes will run. parsing_processes = 2 diff --git a/airflow/dag_processing/processor.py b/airflow/dag_processing/processor.py index 0ff83c3884067..442431afb5026 100644 --- a/airflow/dag_processing/processor.py +++ b/airflow/dag_processing/processor.py @@ -189,15 +189,22 @@ def _handle_dag_file_processing(): def start(self) -> None: """Launch the process and start processing the DAG.""" - for module in iter_airflow_imports(self.file_path): - try: - importlib.import_module(module) - except Exception as e: - # only log as warning because an error here is not preventing anything from working, - # and if it's serious, it's going to be surfaced to the user when the dag is actually parsed. - self.log.warning( - "Error when trying to pre-import module '%s' found in %s: %s", module, self.file_path, e - ) + if conf.getboolean("scheduler", "parsing_pre_import_modules", fallback=True): + # Read the file to pre-import airflow modules used. + # This prevents them from being re-imported from zero in each "processing" process + # and saves CPU time and memory. + for module in iter_airflow_imports(self.file_path): + try: + importlib.import_module(module) + except Exception as e: + # only log as warning because an error here is not preventing anything from working, and + # if it's serious, it's going to be surfaced to the user when the dag is actually parsed. + self.log.warning( + "Error when trying to pre-import module '%s' found in %s: %s", + module, + self.file_path, + e, + ) context = self._get_multiprocessing_context()