Skip to content

Commit

Permalink
add tests for daily materialised view dag scheduling
Browse files Browse the repository at this point in the history
  • Loading branch information
duprijil committed Dec 19, 2024
1 parent 76a1c2b commit b954983
Show file tree
Hide file tree
Showing 4 changed files with 95 additions and 1 deletion.
2 changes: 1 addition & 1 deletion dags/daily_materialized_views_update.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from ted_sws.data_manager.services.create_notice_collection_materialised_view import \
create_notice_collection_materialised_view, create_notice_kpi_collection

DAG_NAME = "daily_materialized_views_update"
DAILY_MATERIALISED_VIEWS_DAG_NAME = "daily_materialized_views_update"


@dag(default_args=DEFAULT_DAG_ARGUMENTS,
Expand Down
4 changes: 4 additions & 0 deletions ted_sws/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
SPARQL_PREFIXES_PATH = PROJECT_PATH / "resources" / "prefixes" / "prefixes.json"

DAG_FETCH_DEFAULT_TIMETABLE = "0 1 * * *"
DAG_MATERIALIZED_VIEW_UPDATE_DEFAULT_TIMETABLE = "0 6 * * *"

class MongoDBConfig:

Expand Down Expand Up @@ -270,6 +271,9 @@ class DagSchedulingConfig:
def SCHEDULE_DAG_FETCH(self, config_value: str) -> str:
return config_value

@env_property(config_resolver_class=AirflowAndEnvConfigResolver, default_value=DAG_MATERIALIZED_VIEW_UPDATE_DEFAULT_TIMETABLE)
def SCHEDULE_DAG_MATERIALIZED_VIEW_UPDATE(self, config_value: str) -> str:
return config_value

class TedConfigResolver(MongoDBConfig, RMLMapperConfig, XMLProcessorConfig, ELKConfig, LoggingConfig,
GitHubArtefacts, API, AllegroConfig, TedAPIConfig, SFTPConfig, FusekiConfig,
Expand Down
5 changes: 5 additions & 0 deletions tests/unit/dags/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from airflow.utils import db
from psutil.tests import pytest

from dags.daily_materialized_views_update import DAILY_MATERIALISED_VIEWS_DAG_NAME
from dags.fetch_notices_by_date import FETCHER_DAG_NAME
from tests import TESTS_PATH

Expand Down Expand Up @@ -36,6 +37,10 @@ def dag_bag():
def fetcher_dag_id():
return FETCHER_DAG_NAME

@pytest.fixture
def daily_materialised_views_dag_id():
return DAILY_MATERIALISED_VIEWS_DAG_NAME


@pytest.fixture
def example_cron_table() -> str:
Expand Down
85 changes: 85 additions & 0 deletions tests/unit/dags/test_daily_materialised_views_update_schedule.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
import os

from airflow import DAG
from airflow.models import DagBag, Variable
from airflow.timetables.trigger import CronTriggerTimetable

from ted_sws import DAG_MATERIALIZED_VIEW_UPDATE_DEFAULT_TIMETABLE


def test_daily_materialised_view_change_timetable_from_airflow_variable_after_reparse(dag_bag: DagBag,
dag_materialised_view_update_schedule_variable_name: str,
daily_materialised_views_dag_id: str,
example_dag_cron_table: CronTriggerTimetable,
airflow_timetable_import_error_name: str):
daily_materialised_view_dag: DAG = dag_bag.get_dag(dag_id=daily_materialised_views_dag_id)
assert daily_materialised_view_dag is not None
assert daily_materialised_view_dag.schedule_interval != example_dag_cron_table._expression

Variable.set(key=dag_materialised_view_update_schedule_variable_name, value=example_dag_cron_table._expression)
dag_bag.collect_dags(only_if_updated=False)

daily_materialised_view_dag: DAG = dag_bag.get_dag(dag_id=daily_materialised_views_dag_id)
assert daily_materialised_view_dag is not None
assert daily_materialised_view_dag.schedule_interval == example_dag_cron_table._expression

assert all(airflow_timetable_import_error_name not in error for error in dag_bag.import_errors.values())


def test_daily_materialised_view_change_timetable_from_env_variable_after_reparse(dag_bag: DagBag,
dag_materialised_view_update_schedule_variable_name: str,
daily_materialised_views_dag_id: str,
example_dag_cron_table: CronTriggerTimetable,
airflow_timetable_import_error_name: str):
fetcher_dag: DAG = dag_bag.get_dag(dag_id=daily_materialised_views_dag_id)
assert fetcher_dag is not None
assert fetcher_dag.schedule_interval != example_dag_cron_table._expression

os.environ[dag_materialised_view_update_schedule_variable_name] = example_dag_cron_table._expression
dag_bag.collect_dags(only_if_updated=False)

fetcher_dag: DAG = dag_bag.get_dag(dag_id=daily_materialised_views_dag_id)
assert fetcher_dag is not None
assert fetcher_dag.schedule_interval == example_dag_cron_table._expression

assert all(airflow_timetable_import_error_name not in error for error in dag_bag.import_errors.values())
del os.environ[dag_materialised_view_update_schedule_variable_name]


def test_daily_materialised_view_has_default_timetable_if_no_variable_is_set_after_reparse(dag_bag: DagBag,
dag_materialised_view_update_schedule_variable_name: str,
daily_materialised_views_dag_id: str,
airflow_timetable_import_error_name: str):
env_var_value = os.getenv(dag_materialised_view_update_schedule_variable_name)
is_env_var_set: bool = True if env_var_value is not None else False
if is_env_var_set:
del os.environ[dag_materialised_view_update_schedule_variable_name]
airflow_var_value = Variable.get(key=dag_materialised_view_update_schedule_variable_name, default_var=None)
is_airflow_var_set: bool = True if airflow_var_value is not None else False
if is_airflow_var_set:
Variable.delete(key=dag_materialised_view_update_schedule_variable_name)

dag_bag.collect_dags()
fetcher_dag: DAG = dag_bag.get_dag(dag_id=daily_materialised_views_dag_id)
assert fetcher_dag is not None
assert fetcher_dag.schedule_interval == DAG_MATERIALIZED_VIEW_UPDATE_DEFAULT_TIMETABLE
assert all(airflow_timetable_import_error_name not in error for error in dag_bag.import_errors.values())

if is_env_var_set:
os.environ[dag_materialised_view_update_schedule_variable_name] = env_var_value
if is_airflow_var_set:
Variable.set(key=dag_materialised_view_update_schedule_variable_name, value=airflow_var_value)


def test_daily_materialised_view_gets_incorrect_timetable_after_reparse(dag_bag: DagBag,
dag_materialised_view_update_schedule_variable_name: str,
daily_materialised_views_dag_id: str,
example_wrong_cron_table: str,
airflow_timetable_import_error_name: str):
fetcher_dag: DAG = dag_bag.get_dag(dag_id=daily_materialised_views_dag_id)
assert fetcher_dag is not None

Variable.set(key=dag_materialised_view_update_schedule_variable_name, value=example_wrong_cron_table)
dag_bag.collect_dags(only_if_updated=False)

assert any(airflow_timetable_import_error_name in error for error in dag_bag.import_errors.values())

0 comments on commit b954983

Please sign in to comment.