Make Dag Serialization a hard requirement #11335

Merged · 4 commits · Oct 26, 2020
8 changes: 8 additions & 0 deletions UPDATING.md
@@ -244,6 +244,14 @@ The Old and New provider configuration keys that have changed are as follows

For more information, visit https://flask-appbuilder.readthedocs.io/en/latest/security.html#authentication-oauth

### DAG Serialization will be strictly required

Prior to Airflow 2.0, DAG Serialization was disabled by default. From Airflow 2.0 onwards, DAG Serialization
is enabled by default and can no longer be turned off, because the Scheduler uses Serialized DAGs to make
scheduling decisions.

The previous setting `[core] store_serialized_dags` will be ignored.

### Changes to the KubernetesExecutor

#### The KubernetesExecutor Will No Longer Read from the airflow.cfg for Base Pod Configurations
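
For downstream code that previously toggled on this setting, a minimal sketch of the post-2.0 pattern (the DAG id below is hypothetical):

    from airflow.models import DagBag

    # Before 2.0 this flag mirrored [core] store_serialized_dags; with
    # serialization mandatory, DAG consumers simply read from the DB.
    dagbag = DagBag(read_dags_from_db=True)
    dag = dagbag.get_dag("example_dag_id")  # hypothetical DAG id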
7 changes: 3 additions & 4 deletions airflow/api/common/experimental/__init__.py
@@ -19,7 +19,6 @@
from datetime import datetime
from typing import Optional

from airflow.configuration import conf
from airflow.exceptions import DagNotFound, DagRunNotFound, TaskNotFound
from airflow.models import DagBag, DagModel, DagRun

@@ -32,10 +31,10 @@ def check_and_get_dag(dag_id: str, task_id: Optional[str] = None) -> DagModel:

dagbag = DagBag(
dag_folder=dag_model.fileloc,
read_dags_from_db=conf.getboolean('core', 'store_serialized_dags')
read_dags_from_db=True
)
dag = dagbag.get_dag(dag_id) # prefetch dag if it is stored serialized
if dag_id not in dagbag.dags:
dag = dagbag.get_dag(dag_id)
if not dag:
error_message = "Dag id {} not found".format(dag_id)
raise DagNotFound(error_message)
if task_id and not dag.has_task(task_id):
3 changes: 1 addition & 2 deletions airflow/api/common/experimental/delete_dag.py
@@ -24,7 +24,6 @@
from airflow.exceptions import DagNotFound
from airflow.models import DagModel, TaskFail
from airflow.models.serialized_dag import SerializedDagModel
from airflow.settings import STORE_SERIALIZED_DAGS
from airflow.utils.session import provide_session

log = logging.getLogger(__name__)
@@ -47,7 +46,7 @@ def delete_dag(dag_id: str, keep_records_in_log: bool = True, session=None) -> i

# Scheduler removes DAGs without files from serialized_dag table every dag_dir_list_interval.
# There may be a lag, so explicitly removes serialized DAG here.
if STORE_SERIALIZED_DAGS and SerializedDagModel.has_dag(dag_id=dag_id, session=session):
if SerializedDagModel.has_dag(dag_id=dag_id, session=session):
SerializedDagModel.remove_dag(dag_id=dag_id, session=session)

count = 0
21 changes: 11 additions & 10 deletions airflow/api/common/experimental/mark_tasks.py
@@ -61,15 +61,16 @@ def _create_dagruns(dag, execution_dates, state, run_type):

@provide_session
def set_state(
tasks: Iterable[BaseOperator],
execution_date: datetime.datetime,
upstream: bool = False,
downstream: bool = False,
future: bool = False,
past: bool = False,
state: str = State.SUCCESS,
commit: bool = False,
session=None): # pylint: disable=too-many-arguments,too-many-locals
tasks: Iterable[BaseOperator],
execution_date: datetime.datetime,
upstream: bool = False,
downstream: bool = False,
future: bool = False,
past: bool = False,
state: str = State.SUCCESS,
commit: bool = False,
session=None
): # pylint: disable=too-many-arguments,too-many-locals
"""
Set the state of a task instance and if needed its relatives. Can set state
for future tasks (calculated from execution_date) and retroactively
@@ -181,7 +182,7 @@ def get_subdag_runs(dag, session, state, task_ids, commit, confirmed_dates):
continue

current_task = current_dag.get_task(task_id)
if isinstance(current_task, SubDagOperator):
if isinstance(current_task, SubDagOperator) or current_task.task_type == "SubDagOperator":
# this works as a kind of integrity check
# it creates missing dag runs for subdag operators,
# maybe this should be moved to dagrun.verify_integrity
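
The extra task_type comparison above is needed because a DAG loaded from the serialized_dag table rebuilds its tasks as generic serialized operators, so an isinstance() check against the concrete class no longer matches; task_type still carries the original operator's class name. A sketch of the equivalent check in isolation (assumed behaviour, not code from this PR):

    from airflow.operators.subdag_operator import SubDagOperator

    def is_subdag_task(task) -> bool:
        # Matches real SubDagOperator instances (file-parsed DAGs) as well as
        # their serialized stand-ins (DB-loaded DAGs), which only keep the
        # original class name in task_type.
        return isinstance(task, SubDagOperator) or task.task_type == "SubDagOperator"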
8 changes: 1 addition & 7 deletions airflow/api/common/experimental/trigger_dag.py
@@ -114,13 +114,7 @@ def trigger_dag(
if dag_model is None:
raise DagNotFound("Dag id {} not found in DagModel".format(dag_id))

def read_store_serialized_dags():
from airflow.configuration import conf
return conf.getboolean('core', 'store_serialized_dags')
dagbag = DagBag(
dag_folder=dag_model.fileloc,
read_dags_from_db=read_store_serialized_dags()
)
dagbag = DagBag(dag_folder=dag_model.fileloc, read_dags_from_db=True)
triggers = _trigger_dag(
dag_id=dag_id,
dag_bag=dagbag,
11 changes: 8 additions & 3 deletions airflow/api_connexion/endpoints/task_instance_endpoint.py
@@ -34,6 +34,7 @@
TaskInstanceReferenceCollection,
set_task_instance_state_form,
)
from airflow.exceptions import SerializedDagNotFound
from airflow.models.dagrun import DagRun as DR
from airflow.models.taskinstance import clear_task_instances, TaskInstance as TI
from airflow.models import SlaMiss
@@ -286,9 +287,13 @@ def post_set_task_instances_state(dag_id, session):
except ValidationError as err:
raise BadRequest(detail=str(err.messages))

dag = current_app.dag_bag.get_dag(dag_id)
if not dag:
error_message = "Dag ID {} not found".format(dag_id)
error_message = "Dag ID {} not found".format(dag_id)
try:
dag = current_app.dag_bag.get_dag(dag_id)
if not dag:
raise NotFound(error_message)
except SerializedDagNotFound:
Review comment (Member): If we made AirflowNotFound inherit from connexion.NotFound, this code could be simpler here. WDYT?

Reply (Member Author): I will create a separate issue for it, to address it in a separate PR (not strictly related to this PR), so anyone else can pick that task up too.

# If DAG is not found in serialized_dag table
raise NotFound(error_message)

task_id = data['task_id']
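
The reviewer's suggestion above might look roughly like the following sketch (hypothetical, not implemented in this PR): if Airflow's not-found exception derived from connexion's ProblemException, endpoints could let it propagate instead of translating it into NotFound by hand.

    from connexion.exceptions import ProblemException

    class AirflowNotFoundException(ProblemException):
        """Hypothetical: rendered directly as an HTTP 404 by the connexion layer."""

        def __init__(self, detail: str):
            super().__init__(status=404, title="Not Found", detail=detail)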
3 changes: 1 addition & 2 deletions airflow/cli/commands/sync_perm_command.py
@@ -16,7 +16,6 @@
# specific language governing permissions and limitations
# under the License.
"""Sync permission command"""
from airflow import settings
from airflow.models import DagBag
from airflow.utils import cli as cli_utils
from airflow.www.app import cached_app
@@ -29,7 +28,7 @@ def sync_perm(args):
print('Updating permission, view-menu for all existing roles')
appbuilder.sm.sync_roles()
print('Updating permission on all DAG views')
dags = DagBag(store_serialized_dags=settings.STORE_SERIALIZED_DAGS).dags.values()
dags = DagBag(read_dags_from_db=True).dags.values()
for dag in dags:
appbuilder.sm.sync_perm_for_dag(
dag.dag_id,
4 changes: 4 additions & 0 deletions airflow/exceptions.py
@@ -126,6 +126,10 @@ class DuplicateTaskIdFound(AirflowException):
"""Raise when a Task with duplicate task_id is defined in the same DAG"""


class SerializedDagNotFound(DagNotFound):
"""Raise when DAG is not found in the serialized_dags table in DB"""


class TaskNotFound(AirflowNotFoundException):
"""Raise when a Task is not available in the system"""

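
Because the new exception subclasses DagNotFound, existing handlers keep working; a small sketch:

    from airflow.exceptions import DagNotFound, SerializedDagNotFound

    try:
        raise SerializedDagNotFound("DAG 'example' not found in serialized_dag table")
    except DagNotFound as err:  # also catches the new, more specific subclass
        print(f"handled: {err}")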
18 changes: 7 additions & 11 deletions airflow/models/dagbag.py
@@ -36,7 +36,7 @@
from airflow import settings
from airflow.configuration import conf
from airflow.dag.base_dag import BaseDagBag
from airflow.exceptions import AirflowClusterPolicyViolation, AirflowDagCycleException
from airflow.exceptions import AirflowClusterPolicyViolation, AirflowDagCycleException, SerializedDagNotFound
from airflow.plugins_manager import integrate_dag_plugins
from airflow.stats import Stats
from airflow.utils import timezone
@@ -75,10 +75,8 @@ class DagBag(BaseDagBag, LoggingMixin):
:param include_smart_sensor: whether to include the smart sensor native
DAGs that create the smart sensor operators for whole cluster
:type include_smart_sensor: bool
:param read_dags_from_db: Read DAGs from DB if store_serialized_dags is ``True``.
If ``False`` DAGs are read from python files. This property is not used when
determining whether or not to write Serialized DAGs, that is done by checking
the config ``store_serialized_dags``.
:param read_dags_from_db: Read DAGs from DB if ``True`` is passed.
If ``False`` DAGs are read from python files.
:type read_dags_from_db: bool
"""

@@ -214,7 +212,7 @@ def _add_dag_from_db(self, dag_id: str, session: Session):
from airflow.models.serialized_dag import SerializedDagModel
row = SerializedDagModel.get(dag_id, session)
if not row:
raise ValueError(f"DAG '{dag_id}' not found in serialized_dag table")
raise SerializedDagNotFound(f"DAG '{dag_id}' not found in serialized_dag table")

dag = row.dag
for subdag in dag.subdags:
@@ -527,8 +525,6 @@ def sync_to_db(self, session: Optional[Session] = None):
from airflow.models.serialized_dag import SerializedDagModel
self.log.debug("Calling the DAG.bulk_sync_to_db method")
DAG.bulk_write_to_db(self.dags.values(), session=session)
# Write Serialized DAGs to DB if DAG Serialization is turned on
# Even though self.read_dags_from_db is False
if settings.STORE_SERIALIZED_DAGS or self.read_dags_from_db:
self.log.debug("Calling the SerializedDagModel.bulk_sync_to_db method")
SerializedDagModel.bulk_sync_to_db(self.dags.values(), session=session)
# Write Serialized DAGs to DB
self.log.debug("Calling the SerializedDagModel.bulk_sync_to_db method")
SerializedDagModel.bulk_sync_to_db(self.dags.values(), session=session)
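
To illustrate the two DagBag modes that remain after this change — file parsing for the scheduler's DAG file processor, DB reads for the webserver and APIs — a minimal sketch (paths and DAG ids are hypothetical, and a configured metadata DB is assumed):

    from airflow.models import DagBag

    # Parse python files on disk (what the scheduler's DAG file processor does);
    # after this PR, sync_to_db() always writes serialized DAGs as well.
    file_bag = DagBag(dag_folder="/opt/airflow/dags", read_dags_from_db=False)
    file_bag.sync_to_db()

    # Read the serialized representation back from the DB (what the webserver
    # and API endpoints do); get_dag() raises SerializedDagNotFound if absent.
    db_bag = DagBag(read_dags_from_db=True)
    dag = db_bag.get_dag("example_dag_id")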
1 change: 0 additions & 1 deletion airflow/models/dagcode.py
@@ -39,7 +39,6 @@ class DagCode(Base):
dag_code table contains code of DAG files synchronized by scheduler.
This feature is controlled by:

* ``[core] store_serialized_dags = True``: enable this feature
* ``[core] store_dag_code = True``: enable this feature

For details on dag serialization see SerializedDagModel
5 changes: 2 additions & 3 deletions airflow/models/serialized_dag.py
@@ -47,16 +47,15 @@ class SerializedDagModel(Base):
serialized_dag table is a snapshot of DAG files synchronized by scheduler.
This feature is controlled by:

* ``[core] store_serialized_dags = True``: enable this feature
* ``[core] min_serialized_dag_update_interval = 30`` (s):
serialized DAGs are updated in DB when a file gets processed by scheduler,
to reduce DB write rate, there is a minimal interval of updating serialized DAGs.
* ``[scheduler] dag_dir_list_interval = 300`` (s):
interval of deleting serialized DAGs in DB when the files are deleted, suggest
to use a smaller interval such as 60

It is used by webserver to load dags when ``store_serialized_dags=True``.
Because reading from database is lightweight compared to importing from files,
It is used by webserver to load dags
because reading from database is lightweight compared to importing from files,
it solves the webserver scalability issue.
"""

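
The two intervals the docstring mentions can be inspected programmatically; a small sketch (the fallback values mirror the documented defaults):

    from airflow.configuration import conf

    # Minimum seconds between DB updates of a DAG's serialized form; this
    # bounds the write rate on the serialized_dag table.
    min_update = conf.getint('core', 'min_serialized_dag_update_interval', fallback=30)

    # Seconds between DAG-directory listings; serialized rows for deleted
    # files are pruned on this cadence, hence the suggestion to keep it small.
    dir_list = conf.getint('scheduler', 'dag_dir_list_interval', fallback=300)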
40 changes: 16 additions & 24 deletions airflow/models/taskinstance.py
@@ -51,7 +51,6 @@
from airflow.models.variable import Variable
from airflow.models.xcom import XCOM_RETURN_KEY, XCom
from airflow.sentry import Sentry
from airflow.settings import STORE_SERIALIZED_DAGS
from airflow.stats import Stats
from airflow.ti_deps.dep_context import DepContext
from airflow.ti_deps.dependencies_deps import REQUEUEABLE_DEPS, RUNNING_DEPS
@@ -1160,9 +1159,8 @@ def signal_handler(signum, frame): # pylint: disable=unused-argument
start_time = time.time()

self.render_templates(context=context)
if STORE_SERIALIZED_DAGS:
RenderedTaskInstanceFields.write(RenderedTaskInstanceFields(ti=self, render_templates=False))
RenderedTaskInstanceFields.delete_old_records(self.task_id, self.dag_id)
RenderedTaskInstanceFields.write(RenderedTaskInstanceFields(ti=self, render_templates=False))
RenderedTaskInstanceFields.delete_old_records(self.task_id, self.dag_id)

# Export context to make it available for operators to use.
airflow_context_vars = context_to_airflow_vars(context, in_env_var_format=True)
@@ -1576,28 +1574,22 @@ def get(
}

def get_rendered_template_fields(self):
"""
Fetch rendered template fields from DB if Serialization is enabled.
Else just render the templates
"""
"""Fetch rendered template fields from DB"""
from airflow.models.renderedtifields import RenderedTaskInstanceFields
if STORE_SERIALIZED_DAGS:
rendered_task_instance_fields = RenderedTaskInstanceFields.get_templated_fields(self)
if rendered_task_instance_fields:
for field_name, rendered_value in rendered_task_instance_fields.items():
setattr(self.task, field_name, rendered_value)
else:
try:
self.render_templates()
except (TemplateAssertionError, UndefinedError) as e:
raise AirflowException(
"Webserver does not have access to User-defined Macros or Filters "
"when Dag Serialization is enabled. Hence for the task that have not yet "
"started running, please use 'airflow tasks render' for debugging the "
"rendering of template_fields."
) from e
rendered_task_instance_fields = RenderedTaskInstanceFields.get_templated_fields(self)
if rendered_task_instance_fields:
for field_name, rendered_value in rendered_task_instance_fields.items():
setattr(self.task, field_name, rendered_value)
else:
self.render_templates()
try:
self.render_templates()
except (TemplateAssertionError, UndefinedError) as e:
raise AirflowException(
"Webserver does not have access to User-defined Macros or Filters "
Review comment (Contributor): Hi @kaxil, if using Airflow 2.0, or Airflow 1.10.* with store_serialized_dags = True, we hit this error when a user clicks on "Rendered Template" for tasks that use user_defined_macros in Jinja template fields.

We also hit a similar problem if a user clears an ExternalTaskMarker that uses user_defined_macros in its Jinja template fields (because dag.clear() calls ti.render_templates() to figure out the actual values of the template fields).

How do you recommend addressing these issues going forward? Should we allow some webserver functions to get access to these user_defined_macros? Or should we serialize the rendered template values so that the webserver can access them?

Reply (Member Author): This error is already handled in the Webserver:

    ti = models.TaskInstance(task=task, execution_date=dttm)
    try:
        ti.get_rendered_template_fields()
    except AirflowException as e:  # pylint: disable=broad-except
        msg = "Error rendering template: " + escape(e)
        if e.__cause__:  # pylint: disable=using-constant-test
            msg += Markup("<br><br>OriginalError: ") + escape(e.__cause__)
        flash(msg, "error")

i.e. we just ask users to run it via the CLI instead. The flash message is there so they know where to look, i.e. to run it using the CLI.

Does the ExternalTaskMarker cause the failure in the Scheduler or the Webserver?

Reply (Contributor): Hi @kaxil, thanks for pointing out that the "Rendered Template" error is handled with an error message. Then that issue is a small annoyance for the user, because they cannot see the rendered arguments in the web UI.

However, the ExternalTaskMarker actually causes an issue in the Webserver when the user hits Clear. I opened an issue here: #13827.

I don't mind working on a fix. Just wondering if you have any suggestions on how to do it.

"when Dag Serialization is enabled. Hence for the task that have not yet "
"started running, please use 'airflow tasks render' for debugging the "
"rendering of template_fields."
) from e

def overwrite_params_with_dag_run_conf(self, params, dag_run):
"""Overwrite Task Params with DagRun.conf"""
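
To make the scenario from the thread concrete, a hypothetical DAG of the kind that hits this error — the macro exists only in the DAG file, so a webserver that loads the DAG from the DB cannot re-render the field and directs users to the airflow tasks render CLI instead:

    from datetime import datetime

    from airflow import DAG
    from airflow.operators.bash import BashOperator

    # Hypothetical example, not from this PR.
    with DAG(
        dag_id="macro_example",
        start_date=datetime(2020, 10, 1),
        user_defined_macros={"project": lambda env: f"analytics-{env}"},
    ) as dag:
        # Rendering this template needs the 'project' macro above, which is
        # not available when the DAG is deserialized from the DB.
        BashOperator(
            task_id="echo_project",
            bash_command="echo {{ project('prod') }}",
        )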
3 changes: 1 addition & 2 deletions airflow/operators/dagrun_operator.py
@@ -20,7 +20,6 @@
from typing import Dict, Optional, Union
from urllib.parse import quote

from airflow import settings
from airflow.api.common.experimental.trigger_dag import trigger_dag
from airflow.exceptions import DagNotFound, DagRunAlreadyExists
from airflow.models import BaseOperator, BaseOperatorLink, DagBag, DagModel, DagRun
@@ -122,7 +121,7 @@ def execute(self, context: Dict):

dag_bag = DagBag(
dag_folder=dag_model.fileloc,
store_serialized_dags=settings.STORE_SERIALIZED_DAGS
read_dags_from_db=True
)

dag = dag_bag.get_dag(self.trigger_dag_id)
6 changes: 1 addition & 5 deletions airflow/settings.py
@@ -344,9 +344,6 @@ def initialize():
WEB_COLORS = {'LIGHTBLUE': '#4d9de0',
'LIGHTORANGE': '#FF9933'}

# If store_serialized_dags is True, scheduler writes serialized DAGs to DB, and webserver
# reads DAGs from DB instead of importing from files.
STORE_SERIALIZED_DAGS = conf.getboolean('core', 'store_serialized_dags', fallback=False)

# Updating serialized DAG can not be faster than a minimum interval to reduce database
# write rate.
@@ -360,8 +357,7 @@

# Whether to persist DAG files code in DB. If set to True, Webserver reads file contents
# from DB instead of trying to access files in a DAG folder.
# Defaults to same as the store_serialized_dags setting.
STORE_DAG_CODE = conf.getboolean("core", "store_dag_code", fallback=STORE_SERIALIZED_DAGS)
STORE_DAG_CODE = conf.getboolean("core", "store_dag_code", fallback=True)

# If donot_modify_handlers=True, we do not modify logging handlers in task_run command
# If the flag is set to False, we remove all handlers from the root logger
12 changes: 5 additions & 7 deletions airflow/utils/dag_processing.py
@@ -38,9 +38,10 @@

import airflow.models
from airflow.configuration import conf
from airflow.models import errors
from airflow.models import DagModel, errors
from airflow.models.serialized_dag import SerializedDagModel
from airflow.models.taskinstance import SimpleTaskInstance
from airflow.settings import STORE_DAG_CODE, STORE_SERIALIZED_DAGS
from airflow.settings import STORE_DAG_CODE
from airflow.stats import Stats
from airflow.utils import timezone
from airflow.utils.callback_requests import CallbackRequest, SlaCallbackRequest, TaskCallbackRequest
@@ -734,11 +735,8 @@ def _refresh_dag_dir(self):
except Exception: # noqa pylint: disable=broad-except
self.log.exception("Error removing old import errors")

if STORE_SERIALIZED_DAGS:
from airflow.models.dag import DagModel
from airflow.models.serialized_dag import SerializedDagModel
SerializedDagModel.remove_deleted_dags(self._file_paths)
DagModel.deactivate_deleted_dags(self._file_paths)
SerializedDagModel.remove_deleted_dags(self._file_paths)
DagModel.deactivate_deleted_dags(self._file_paths)

if self.store_dag_code:
from airflow.models.dagcode import DagCode
4 changes: 2 additions & 2 deletions airflow/www/extensions/init_dagbag.py
@@ -18,7 +18,7 @@
import os

from airflow.models import DagBag
from airflow.settings import DAGS_FOLDER, STORE_SERIALIZED_DAGS
from airflow.settings import DAGS_FOLDER


def init_dagbag(app):
@@ -29,4 +29,4 @@ def init_dagbag(app):
if os.environ.get('SKIP_DAGS_PARSING') == 'True':
app.dag_bag = DagBag(os.devnull, include_examples=False)
else:
app.dag_bag = DagBag(DAGS_FOLDER, read_dags_from_db=STORE_SERIALIZED_DAGS)
app.dag_bag = DagBag(DAGS_FOLDER, read_dags_from_db=True)
12 changes: 7 additions & 5 deletions airflow/www/views.py
@@ -1867,7 +1867,12 @@ def duration(self, session=None): # pylint: disable=too-many-locals
"""Get Dag as duration graph."""
default_dag_run = conf.getint('webserver', 'default_dag_run_display_number')
dag_id = request.args.get('dag_id')
dag = current_app.dag_bag.get_dag(dag_id)

try:
dag = current_app.dag_bag.get_dag(dag_id)
except airflow.exceptions.SerializedDagNotFound:
dag = None

base_date = request.args.get('base_date')
num_runs = request.args.get('num_runs')
num_runs = int(num_runs) if num_runs else default_dag_run
@@ -2159,10 +2164,7 @@ def refresh(self, session=None):
@action_logging
def refresh_all(self):
"""Refresh everything"""
if settings.STORE_SERIALIZED_DAGS:
current_app.dag_bag.collect_dags_from_db()
else:
current_app.dag_bag.collect_dags(only_if_updated=False)
current_app.dag_bag.collect_dags_from_db()

# sync permissions for all dags
for dag_id, dag in current_app.dag_bag.dags.items():