-
Notifications
You must be signed in to change notification settings - Fork 14.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Make Dag Serialization a hard requirement #11335
Changes from all commits
6a6c9b5
622e9a7
7fcd122
690e80b
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
@@ -51,7 +51,6 @@ | |||||||||||||||||
from airflow.models.variable import Variable | ||||||||||||||||||
from airflow.models.xcom import XCOM_RETURN_KEY, XCom | ||||||||||||||||||
from airflow.sentry import Sentry | ||||||||||||||||||
from airflow.settings import STORE_SERIALIZED_DAGS | ||||||||||||||||||
from airflow.stats import Stats | ||||||||||||||||||
from airflow.ti_deps.dep_context import DepContext | ||||||||||||||||||
from airflow.ti_deps.dependencies_deps import REQUEUEABLE_DEPS, RUNNING_DEPS | ||||||||||||||||||
|
@@ -1160,9 +1159,8 @@ def signal_handler(signum, frame): # pylint: disable=unused-argument | |||||||||||||||||
start_time = time.time() | ||||||||||||||||||
|
||||||||||||||||||
self.render_templates(context=context) | ||||||||||||||||||
if STORE_SERIALIZED_DAGS: | ||||||||||||||||||
RenderedTaskInstanceFields.write(RenderedTaskInstanceFields(ti=self, render_templates=False)) | ||||||||||||||||||
RenderedTaskInstanceFields.delete_old_records(self.task_id, self.dag_id) | ||||||||||||||||||
RenderedTaskInstanceFields.write(RenderedTaskInstanceFields(ti=self, render_templates=False)) | ||||||||||||||||||
RenderedTaskInstanceFields.delete_old_records(self.task_id, self.dag_id) | ||||||||||||||||||
|
||||||||||||||||||
# Export context to make it available for operators to use. | ||||||||||||||||||
airflow_context_vars = context_to_airflow_vars(context, in_env_var_format=True) | ||||||||||||||||||
|
@@ -1576,28 +1574,22 @@ def get( | |||||||||||||||||
} | ||||||||||||||||||
|
||||||||||||||||||
def get_rendered_template_fields(self): | ||||||||||||||||||
""" | ||||||||||||||||||
Fetch rendered template fields from DB if Serialization is enabled. | ||||||||||||||||||
Else just render the templates | ||||||||||||||||||
""" | ||||||||||||||||||
"""Fetch rendered template fields from DB""" | ||||||||||||||||||
from airflow.models.renderedtifields import RenderedTaskInstanceFields | ||||||||||||||||||
if STORE_SERIALIZED_DAGS: | ||||||||||||||||||
rendered_task_instance_fields = RenderedTaskInstanceFields.get_templated_fields(self) | ||||||||||||||||||
if rendered_task_instance_fields: | ||||||||||||||||||
for field_name, rendered_value in rendered_task_instance_fields.items(): | ||||||||||||||||||
setattr(self.task, field_name, rendered_value) | ||||||||||||||||||
else: | ||||||||||||||||||
try: | ||||||||||||||||||
self.render_templates() | ||||||||||||||||||
except (TemplateAssertionError, UndefinedError) as e: | ||||||||||||||||||
raise AirflowException( | ||||||||||||||||||
"Webserver does not have access to User-defined Macros or Filters " | ||||||||||||||||||
"when Dag Serialization is enabled. Hence for the task that have not yet " | ||||||||||||||||||
"started running, please use 'airflow tasks render' for debugging the " | ||||||||||||||||||
"rendering of template_fields." | ||||||||||||||||||
) from e | ||||||||||||||||||
rendered_task_instance_fields = RenderedTaskInstanceFields.get_templated_fields(self) | ||||||||||||||||||
if rendered_task_instance_fields: | ||||||||||||||||||
for field_name, rendered_value in rendered_task_instance_fields.items(): | ||||||||||||||||||
setattr(self.task, field_name, rendered_value) | ||||||||||||||||||
else: | ||||||||||||||||||
self.render_templates() | ||||||||||||||||||
try: | ||||||||||||||||||
self.render_templates() | ||||||||||||||||||
except (TemplateAssertionError, UndefinedError) as e: | ||||||||||||||||||
raise AirflowException( | ||||||||||||||||||
"Webserver does not have access to User-defined Macros or Filters " | ||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Hi @kaxil , if using Airflow 2.0 or using Airflow 1.10.* with We also hit a similar problem if user clears How do you recommend addressing these issues going forward? Should we allow some webserver functions to get access to these There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This error is already handled in the Webserver: Lines 883 to 890 in 10b8ecc
i.e we just ask users to run it via a CLI instead. The The There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Hi, @kaxil . Thanks for pointing out that the "Rendered Template" error is handled with an error message. Then that issue is a small annoyance for the user because they cannot see the rendered arguments in the website. However, the I don't mind working on a fix. Just wondering if you have any suggestions how to do it. |
||||||||||||||||||
"when Dag Serialization is enabled. Hence for the task that have not yet " | ||||||||||||||||||
"started running, please use 'airflow tasks render' for debugging the " | ||||||||||||||||||
"rendering of template_fields." | ||||||||||||||||||
) from e | ||||||||||||||||||
|
||||||||||||||||||
def overwrite_params_with_dag_run_conf(self, params, dag_run): | ||||||||||||||||||
"""Overwrite Task Params with DagRun.conf""" | ||||||||||||||||||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If we made AirflowNotFound inherit from connextion.NotFound this code could be a simpler here.
WDYT?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I will create a separate issue for it, to address it in a separate PR (not strictly related to this PR), so anyone else can also pick that task too.