Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

preload airflow imports before dag parsing to save time #30495

Merged
merged 18 commits into from
Apr 14, 2023
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions airflow/dag_processing/processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
from airflow.stats import Stats
from airflow.utils import timezone
from airflow.utils.email import get_email_address_list, send_email
from airflow.utils.file import iter_airflow_imports
from airflow.utils.log.logging_mixin import LoggingMixin, StreamLogWriter, set_context
from airflow.utils.mixins import MultiprocessingStartMethodMixin
from airflow.utils.session import NEW_SESSION, provide_session
Expand Down Expand Up @@ -187,6 +188,18 @@ def _handle_dag_file_processing():

def start(self) -> None:
"""Launch the process and start processing the DAG."""
import importlib
vandonr-amz marked this conversation as resolved.
Show resolved Hide resolved

for module in iter_airflow_imports(self.file_path):
try:
importlib.import_module(module)
except Exception as e:
# only log as warning because an error here is not preventing anything from working,
# and if it's serious, it's going to be surfaced to the user when the dag is actually parsed.
self.log.warning(
"Error when trying to pre-import module '%s' found in %s: %s", module, self.file_path, e
)

context = self._get_multiprocessing_context()

_parent_channel, _child_channel = context.Pipe(duplex=False)
Expand Down
21 changes: 21 additions & 0 deletions airflow/utils/file.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
# under the License.
from __future__ import annotations

import ast
import io
import logging
import os
Expand Down Expand Up @@ -371,3 +372,23 @@ def might_contain_dag_via_default_heuristic(file_path: str, zip_file: zipfile.Zi
content = dag_file.read()
content = content.lower()
return all(s in content for s in (b"dag", b"airflow"))


def _find_imported_modules(module: ast.Module) -> Generator[str, None, None]:
for st in module.body:
if isinstance(st, ast.Import):
for n in st.names:
yield n.name
elif isinstance(st, ast.ImportFrom) and st.module is not None:
yield st.module


def iter_airflow_imports(file_path: str) -> Generator[str, None, None]:
"""Find Airflow modules imported in the given file."""
try:
parsed = ast.parse(Path(file_path).read_bytes())
except (OSError, SyntaxError, UnicodeDecodeError):
return
for m in _find_imported_modules(parsed):
if m.startswith("airflow."):
yield m
3 changes: 2 additions & 1 deletion tests/dag_processing/test_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,8 @@ def test_max_runs_when_no_files(self):
parent_pipe.close()

@pytest.mark.backend("mysql", "postgres")
def test_start_new_processes_with_same_filepath(self):
@mock.patch("airflow.dag_processing.processor.iter_airflow_imports")
def test_start_new_processes_with_same_filepath(self, _):
"""
Test that when a processor already exist with a filepath, a new processor won't be created
with that filepath. The filepath will just be removed from the list.
Expand Down
26 changes: 26 additions & 0 deletions tests/dags/test_imports.nopy
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
# using .nopy extension so that the file won't get reformatted
# because we want to assert that we can parse crappy formatting.
vandonr-amz marked this conversation as resolved.
Show resolved Hide resolved

from __future__ import annotations

# multiline import
import \
datetime, \
enum,time
"""
import airflow.in_comment
"""
# from import
from airflow.utils import file
# multiline airflow import
import airflow.decorators, airflow.models\
, airflow.sensors

if prod:
import airflow.if_branch
else:
import airflow.else_branch

def f():
# local import
import airflow.local_import
26 changes: 26 additions & 0 deletions tests/utils/test_file.py
Original file line number Diff line number Diff line change
Expand Up @@ -188,3 +188,29 @@ def test_might_contain_dag(self):

# With safe_mode is False, the user defined callable won't be invoked
assert file_utils.might_contain_dag(file_path=file_path_with_dag, safe_mode=False)

def test_get_modules(self):
file_path = os.path.join(TEST_DAGS_FOLDER, "test_imports.nopy")

modules = list(file_utils.iter_airflow_imports(file_path))

assert len(modules) == 4
assert "airflow.utils" in modules
assert "airflow.decorators" in modules
assert "airflow.models" in modules
assert "airflow.sensors" in modules
# this one is a local import, we don't want it.
assert "airflow.local_import" not in modules
# this one is in a comment, we don't want it
assert "airflow.in_comment" not in modules
# we don't want imports under conditions
assert "airflow.if_branch" not in modules
assert "airflow.else_branch" not in modules

def test_get_modules_from_invalid_file(self):
file_path = os.path.join(TEST_DAGS_FOLDER, "README.md") # just getting a non-python file

# should not error
modules = list(file_utils.iter_airflow_imports(file_path))

assert len(modules) == 0