Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

preload airflow imports before dag parsing to save time #30495

Merged
merged 18 commits into from
Apr 14, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions airflow/dag_processing/processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
from airflow.stats import Stats
from airflow.utils import timezone
from airflow.utils.email import get_email_address_list, send_email
from airflow.utils.file import get_airflow_modules_in
from airflow.utils.log.logging_mixin import LoggingMixin, StreamLogWriter, set_context
from airflow.utils.mixins import MultiprocessingStartMethodMixin
from airflow.utils.session import NEW_SESSION, provide_session
Expand Down Expand Up @@ -187,6 +188,12 @@ def _handle_dag_file_processing():

def start(self) -> None:
"""Launch the process and start processing the DAG."""
import importlib
vandonr-amz marked this conversation as resolved.
Show resolved Hide resolved

modules = get_airflow_modules_in(self.file_path)
for module in modules:
importlib.import_module(module)
vandonr-amz marked this conversation as resolved.
Show resolved Hide resolved

context = self._get_multiprocessing_context()

_parent_channel, _child_channel = context.Pipe(duplex=False)
Expand Down
11 changes: 11 additions & 0 deletions airflow/utils/file.py
Original file line number Diff line number Diff line change
Expand Up @@ -371,3 +371,14 @@ def might_contain_dag_via_default_heuristic(file_path: str, zip_file: zipfile.Zi
content = dag_file.read()
content = content.lower()
return all(s in content for s in (b"dag", b"airflow"))


def get_airflow_modules_in(file_path: str) -> Generator[str]:
"""Returns a list of the airflow modules that are imported in the given file"""
with open(file_path, "rb") as dag_file:
content = dag_file.read()
lines = content.splitlines()
for line in lines:
if line.startswith(b"from airflow.") or line.startswith(b"import airflow."):
module_name = line.split(b" ")[1]
yield module_name.decode()
vandonr-amz marked this conversation as resolved.
Show resolved Hide resolved