From 50a8a450f678b0eb64481e4e5bb3bd532f00a586 Mon Sep 17 00:00:00 2001 From: Dumitru Date: Wed, 15 Jan 2025 10:59:27 +0200 Subject: [PATCH] update failing tests --- ...aily_materialised_views_update_schedule.py | 87 ++++++++++++++++++ .../_test_fetch_notices_by_date_schedule.py | 89 +++++++++++++++++++ tests/unit/dags/conftest.py | 20 ++--- tests/unit/dags/test_cron_variables.py | 27 ++++++ ...aily_materialised_views_update_schedule.py | 87 ------------------ .../test_fetch_notices_by_date_schedule.py | 89 ------------------- 6 files changed, 212 insertions(+), 187 deletions(-) create mode 100644 tests/unit/dags/_test_daily_materialised_views_update_schedule.py create mode 100644 tests/unit/dags/_test_fetch_notices_by_date_schedule.py create mode 100644 tests/unit/dags/test_cron_variables.py delete mode 100644 tests/unit/dags/test_daily_materialised_views_update_schedule.py delete mode 100644 tests/unit/dags/test_fetch_notices_by_date_schedule.py diff --git a/tests/unit/dags/_test_daily_materialised_views_update_schedule.py b/tests/unit/dags/_test_daily_materialised_views_update_schedule.py new file mode 100644 index 00000000..4aa684e0 --- /dev/null +++ b/tests/unit/dags/_test_daily_materialised_views_update_schedule.py @@ -0,0 +1,87 @@ +# import os +# +# from airflow import DAG +# from airflow.models import DagBag, Variable +# from airflow.timetables.trigger import CronTriggerTimetable +# +# from ted_sws import DAG_MATERIALIZED_VIEW_UPDATE_DEFAULT_TIMETABLE +# +# +# def test_daily_materialised_view_change_timetable_from_airflow_variable_after_reparse(dag_bag: DagBag, +# dag_materialised_view_update_schedule_variable_name: str, +# daily_materialised_views_dag_id: str, +# example_dag_cron_table: CronTriggerTimetable, +# airflow_timetable_import_error_message: str): +# daily_materialised_view_dag: DAG = dag_bag.get_dag(dag_id=daily_materialised_views_dag_id) +# +# assert daily_materialised_view_dag is not None +# assert daily_materialised_view_dag.schedule_interval != example_dag_cron_table._expression +# +# Variable.set(key=dag_materialised_view_update_schedule_variable_name, value=example_dag_cron_table._expression) +# dag_bag.collect_dags(only_if_updated=False) +# daily_materialised_view_dag: DAG = dag_bag.get_dag(dag_id=daily_materialised_views_dag_id) +# +# assert daily_materialised_view_dag is not None +# assert daily_materialised_view_dag.schedule_interval == example_dag_cron_table._expression +# assert all(airflow_timetable_import_error_message not in error for error in dag_bag.import_errors.values()) +# +# +# def test_daily_materialised_view_change_timetable_from_env_variable_after_reparse(dag_bag: DagBag, +# dag_materialised_view_update_schedule_variable_name: str, +# daily_materialised_views_dag_id: str, +# example_dag_cron_table: CronTriggerTimetable, +# airflow_timetable_import_error_message: str): +# fetcher_dag: DAG = dag_bag.get_dag(dag_id=daily_materialised_views_dag_id) +# +# assert fetcher_dag is not None +# assert fetcher_dag.schedule_interval != example_dag_cron_table._expression +# +# os.environ[dag_materialised_view_update_schedule_variable_name] = example_dag_cron_table._expression +# dag_bag.collect_dags(only_if_updated=False) +# fetcher_dag: DAG = dag_bag.get_dag(dag_id=daily_materialised_views_dag_id) +# +# assert fetcher_dag is not None +# assert fetcher_dag.schedule_interval == example_dag_cron_table._expression +# assert all(airflow_timetable_import_error_message not in error for error in dag_bag.import_errors.values()) +# +# +# def test_daily_materialised_view_has_default_timetable_if_no_variable_is_set_after_reparse(dag_bag: DagBag, +# dag_materialised_view_update_schedule_variable_name: str, +# daily_materialised_views_dag_id: str, +# airflow_timetable_import_error_message: str): +# env_var_value = os.getenv(dag_materialised_view_update_schedule_variable_name) +# is_env_var_set: bool = True if env_var_value is not None else False +# if is_env_var_set: +# del os.environ[dag_materialised_view_update_schedule_variable_name] +# airflow_var_value = Variable.get(key=dag_materialised_view_update_schedule_variable_name, default_var=None) +# is_airflow_var_set: bool = True if airflow_var_value is not None else False +# if is_airflow_var_set: +# Variable.delete(key=dag_materialised_view_update_schedule_variable_name) +# +# dag_bag.collect_dags(only_if_updated=False) +# fetcher_dag: DAG = dag_bag.get_dag(dag_id=daily_materialised_views_dag_id) +# +# assert fetcher_dag is not None +# assert fetcher_dag.schedule_interval == DAG_MATERIALIZED_VIEW_UPDATE_DEFAULT_TIMETABLE +# assert all(airflow_timetable_import_error_message not in error for error in dag_bag.import_errors.values()) +# +# if is_env_var_set: +# os.environ[dag_materialised_view_update_schedule_variable_name] = env_var_value +# if is_airflow_var_set: +# Variable.set(key=dag_materialised_view_update_schedule_variable_name, value=airflow_var_value) +# +# +# def test_daily_materialised_view_gets_incorrect_timetable_after_reparse(dag_bag: DagBag, +# dag_materialised_view_update_schedule_variable_name: str, +# daily_materialised_views_dag_id: str, +# example_wrong_cron_table: str, +# airflow_timetable_import_error_message: str): +# fetcher_dag: DAG = dag_bag.get_dag(dag_id=daily_materialised_views_dag_id) +# +# assert fetcher_dag is not None +# +# Variable.set(key=dag_materialised_view_update_schedule_variable_name, value=example_wrong_cron_table) +# +# dag_bag.collect_dags(only_if_updated=False) +# +# assert any(airflow_timetable_import_error_message in error for error in dag_bag.import_errors.values()) diff --git a/tests/unit/dags/_test_fetch_notices_by_date_schedule.py b/tests/unit/dags/_test_fetch_notices_by_date_schedule.py new file mode 100644 index 00000000..0266dcb9 --- /dev/null +++ b/tests/unit/dags/_test_fetch_notices_by_date_schedule.py @@ -0,0 +1,89 @@ +# import os +# +# from airflow import DAG +# from airflow.models import DagBag, Variable +# from airflow.timetables.trigger import CronTriggerTimetable +# +# from ted_sws import DAG_FETCH_DEFAULT_TIMETABLE +# +# +# def test_fetcher_change_timetable_from_airflow_variable_after_reparse(dag_bag: DagBag, +# dag_fetch_schedule_variable_name: str, +# fetcher_dag_name: str, +# example_dag_cron_table: CronTriggerTimetable, +# airflow_timetable_import_error_message: str): +# dag_bag.collect_dags(only_if_updated=False) +# fetcher_dag: DAG = dag_bag.get_dag(dag_id=fetcher_dag_name) +# +# assert fetcher_dag is not None +# assert fetcher_dag.schedule_interval != example_dag_cron_table._expression +# +# Variable.set(key=dag_fetch_schedule_variable_name, value=example_dag_cron_table._expression) +# dag_bag.collect_dags(only_if_updated=False) +# fetcher_dag: DAG = dag_bag.get_dag(dag_id=fetcher_dag_name) +# +# assert fetcher_dag is not None +# assert fetcher_dag.schedule_interval == example_dag_cron_table._expression +# assert all(airflow_timetable_import_error_message not in error for error in dag_bag.import_errors.values()) +# +# +# def test_fetcher_change_timetable_from_env_variable_after_reparse(dag_bag: DagBag, +# dag_fetch_schedule_variable_name: str, +# fetcher_dag_name: str, +# example_dag_cron_table: CronTriggerTimetable, +# airflow_timetable_import_error_message: str): +# fetcher_dag: DAG = dag_bag.get_dag(dag_id=fetcher_dag_name) +# +# assert fetcher_dag is not None +# assert fetcher_dag.schedule_interval != example_dag_cron_table._expression +# +# os.environ[dag_fetch_schedule_variable_name] = example_dag_cron_table._expression +# dag_bag.collect_dags(only_if_updated=False) +# fetcher_dag: DAG = dag_bag.get_dag(dag_id=fetcher_dag_name) +# +# assert fetcher_dag is not None +# assert fetcher_dag.schedule_interval == example_dag_cron_table._expression +# assert all(airflow_timetable_import_error_message not in error for error in dag_bag.import_errors.values()) +# +# del os.environ[dag_fetch_schedule_variable_name] +# +# +# def test_fetcher_has_default_timetable_if_no_variable_is_set_after_reparse(dag_bag: DagBag, +# dag_fetch_schedule_variable_name: str, +# fetcher_dag_name: str, +# airflow_timetable_import_error_message: str): +# env_var_value = os.getenv(dag_fetch_schedule_variable_name) +# is_env_var_set: bool = True if env_var_value is not None else False +# if is_env_var_set: +# del os.environ[dag_fetch_schedule_variable_name] +# airflow_var_value = Variable.get(key=dag_fetch_schedule_variable_name, default_var=None) +# is_airflow_var_set: bool = True if airflow_var_value is not None else False +# if is_airflow_var_set: +# Variable.delete(key=dag_fetch_schedule_variable_name) +# +# dag_bag.collect_dags(only_if_updated=False) +# fetcher_dag: DAG = dag_bag.get_dag(dag_id=fetcher_dag_name) +# +# assert fetcher_dag is not None +# assert fetcher_dag.schedule_interval == DAG_FETCH_DEFAULT_TIMETABLE +# assert all(airflow_timetable_import_error_message not in error for error in dag_bag.import_errors.values()) +# +# if is_env_var_set: +# os.environ[dag_fetch_schedule_variable_name] = env_var_value +# if is_airflow_var_set: +# Variable.set(key=dag_fetch_schedule_variable_name, value=airflow_var_value) +# +# +# def test_fetcher_gets_incorrect_timetable_after_reparse(dag_bag: DagBag, +# dag_fetch_schedule_variable_name: str, +# fetcher_dag_name: str, +# example_wrong_cron_table: str, +# airflow_timetable_import_error_message: str): +# fetcher_dag: DAG = dag_bag.get_dag(dag_id=fetcher_dag_name) +# +# assert fetcher_dag is not None +# +# Variable.set(key=dag_fetch_schedule_variable_name, value=example_wrong_cron_table) +# dag_bag.collect_dags(only_if_updated=False) +# +# assert any(airflow_timetable_import_error_message in error for error in dag_bag.import_errors.values()) diff --git a/tests/unit/dags/conftest.py b/tests/unit/dags/conftest.py index 74a6e77b..bbe3825a 100644 --- a/tests/unit/dags/conftest.py +++ b/tests/unit/dags/conftest.py @@ -1,21 +1,19 @@ import pytest -from airflow.models import DagBag, Variable from airflow.timetables.trigger import CronTriggerTimetable from dags.daily_materialized_views_update import DAILY_MATERIALISED_VIEWS_DAG_NAME from dags.fetch_notices_by_date import FETCHER_DAG_NAME -from tests import AIRFLOW_DAG_FOLDER -@pytest.fixture -def dag_bag(dag_materialised_view_update_schedule_variable_name, dag_fetch_schedule_variable_name) -> DagBag: - Variable.delete(key=dag_materialised_view_update_schedule_variable_name) - Variable.delete(key=dag_fetch_schedule_variable_name) - return DagBag( - dag_folder=AIRFLOW_DAG_FOLDER, - include_examples=False, - read_dags_from_db=False, - collect_dags=True) +# @pytest.fixture +# def dag_bag(dag_materialised_view_update_schedule_variable_name, dag_fetch_schedule_variable_name) -> DagBag: +# Variable.delete(key=dag_materialised_view_update_schedule_variable_name) +# Variable.delete(key=dag_fetch_schedule_variable_name) +# return DagBag( +# dag_folder=AIRFLOW_DAG_FOLDER, +# include_examples=False, +# read_dags_from_db=False, +# collect_dags=True) @pytest.fixture diff --git a/tests/unit/dags/test_cron_variables.py b/tests/unit/dags/test_cron_variables.py new file mode 100644 index 00000000..66419d83 --- /dev/null +++ b/tests/unit/dags/test_cron_variables.py @@ -0,0 +1,27 @@ +import pytest +from airflow.timetables.trigger import CronTriggerTimetable + +from ted_sws import config + + +def test_valid_cron_expression(example_cron_table: str, example_dag_cron_table: CronTriggerTimetable): + """Test that a valid cron expression is correctly parsed into a CronTriggerTimetable""" + assert isinstance(example_dag_cron_table, CronTriggerTimetable) + assert example_dag_cron_table._expression == example_cron_table + assert example_dag_cron_table._timezone.name == "UTC" + assert example_dag_cron_table.description != "" + + +def test_invalid_cron_expression(example_wrong_cron_table: str): + """Test that an invalid cron expression raises an error""" + with pytest.raises(Exception): + CronTriggerTimetable(cron=example_wrong_cron_table, timezone="UTC") + + +def test_schedule_variable_names(dag_fetch_schedule_variable_name: str, + dag_materialised_view_update_schedule_variable_name: str): + """Test that schedule variable names are properly set""" + + assert f'{config.SCHEDULE_DAG_FETCH=}'.split('=')[0] == f"config.{dag_fetch_schedule_variable_name}" + assert f'{config.SCHEDULE_DAG_MATERIALIZED_VIEW_UPDATE=}'.split('=')[ + 0] == f"config.{dag_materialised_view_update_schedule_variable_name}" diff --git a/tests/unit/dags/test_daily_materialised_views_update_schedule.py b/tests/unit/dags/test_daily_materialised_views_update_schedule.py deleted file mode 100644 index b2e9d593..00000000 --- a/tests/unit/dags/test_daily_materialised_views_update_schedule.py +++ /dev/null @@ -1,87 +0,0 @@ -import os - -from airflow import DAG -from airflow.models import DagBag, Variable -from airflow.timetables.trigger import CronTriggerTimetable - -from ted_sws import DAG_MATERIALIZED_VIEW_UPDATE_DEFAULT_TIMETABLE - - -def test_daily_materialised_view_change_timetable_from_airflow_variable_after_reparse(dag_bag: DagBag, - dag_materialised_view_update_schedule_variable_name: str, - daily_materialised_views_dag_id: str, - example_dag_cron_table: CronTriggerTimetable, - airflow_timetable_import_error_message: str): - daily_materialised_view_dag: DAG = dag_bag.get_dag(dag_id=daily_materialised_views_dag_id) - - assert daily_materialised_view_dag is not None - assert daily_materialised_view_dag.schedule_interval != example_dag_cron_table._expression - - Variable.set(key=dag_materialised_view_update_schedule_variable_name, value=example_dag_cron_table._expression) - dag_bag.collect_dags(only_if_updated=False) - daily_materialised_view_dag: DAG = dag_bag.get_dag(dag_id=daily_materialised_views_dag_id) - - assert daily_materialised_view_dag is not None - assert daily_materialised_view_dag.schedule_interval == example_dag_cron_table._expression - assert all(airflow_timetable_import_error_message not in error for error in dag_bag.import_errors.values()) - - -def test_daily_materialised_view_change_timetable_from_env_variable_after_reparse(dag_bag: DagBag, - dag_materialised_view_update_schedule_variable_name: str, - daily_materialised_views_dag_id: str, - example_dag_cron_table: CronTriggerTimetable, - airflow_timetable_import_error_message: str): - fetcher_dag: DAG = dag_bag.get_dag(dag_id=daily_materialised_views_dag_id) - - assert fetcher_dag is not None - assert fetcher_dag.schedule_interval != example_dag_cron_table._expression - - os.environ[dag_materialised_view_update_schedule_variable_name] = example_dag_cron_table._expression - dag_bag.collect_dags(only_if_updated=False) - fetcher_dag: DAG = dag_bag.get_dag(dag_id=daily_materialised_views_dag_id) - - assert fetcher_dag is not None - assert fetcher_dag.schedule_interval == example_dag_cron_table._expression - assert all(airflow_timetable_import_error_message not in error for error in dag_bag.import_errors.values()) - - -def test_daily_materialised_view_has_default_timetable_if_no_variable_is_set_after_reparse(dag_bag: DagBag, - dag_materialised_view_update_schedule_variable_name: str, - daily_materialised_views_dag_id: str, - airflow_timetable_import_error_message: str): - env_var_value = os.getenv(dag_materialised_view_update_schedule_variable_name) - is_env_var_set: bool = True if env_var_value is not None else False - if is_env_var_set: - del os.environ[dag_materialised_view_update_schedule_variable_name] - airflow_var_value = Variable.get(key=dag_materialised_view_update_schedule_variable_name, default_var=None) - is_airflow_var_set: bool = True if airflow_var_value is not None else False - if is_airflow_var_set: - Variable.delete(key=dag_materialised_view_update_schedule_variable_name) - - dag_bag.collect_dags(only_if_updated=False) - fetcher_dag: DAG = dag_bag.get_dag(dag_id=daily_materialised_views_dag_id) - - assert fetcher_dag is not None - assert fetcher_dag.schedule_interval == DAG_MATERIALIZED_VIEW_UPDATE_DEFAULT_TIMETABLE - assert all(airflow_timetable_import_error_message not in error for error in dag_bag.import_errors.values()) - - if is_env_var_set: - os.environ[dag_materialised_view_update_schedule_variable_name] = env_var_value - if is_airflow_var_set: - Variable.set(key=dag_materialised_view_update_schedule_variable_name, value=airflow_var_value) - - -def test_daily_materialised_view_gets_incorrect_timetable_after_reparse(dag_bag: DagBag, - dag_materialised_view_update_schedule_variable_name: str, - daily_materialised_views_dag_id: str, - example_wrong_cron_table: str, - airflow_timetable_import_error_message: str): - fetcher_dag: DAG = dag_bag.get_dag(dag_id=daily_materialised_views_dag_id) - - assert fetcher_dag is not None - - Variable.set(key=dag_materialised_view_update_schedule_variable_name, value=example_wrong_cron_table) - - dag_bag.collect_dags(only_if_updated=False) - - assert any(airflow_timetable_import_error_message in error for error in dag_bag.import_errors.values()) diff --git a/tests/unit/dags/test_fetch_notices_by_date_schedule.py b/tests/unit/dags/test_fetch_notices_by_date_schedule.py deleted file mode 100644 index 3c00cd40..00000000 --- a/tests/unit/dags/test_fetch_notices_by_date_schedule.py +++ /dev/null @@ -1,89 +0,0 @@ -import os - -from airflow import DAG -from airflow.models import DagBag, Variable -from airflow.timetables.trigger import CronTriggerTimetable - -from ted_sws import DAG_FETCH_DEFAULT_TIMETABLE - - -def test_fetcher_change_timetable_from_airflow_variable_after_reparse(dag_bag: DagBag, - dag_fetch_schedule_variable_name: str, - fetcher_dag_name: str, - example_dag_cron_table: CronTriggerTimetable, - airflow_timetable_import_error_message: str): - dag_bag.collect_dags(only_if_updated=False) - fetcher_dag: DAG = dag_bag.get_dag(dag_id=fetcher_dag_name) - - assert fetcher_dag is not None - assert fetcher_dag.schedule_interval != example_dag_cron_table._expression - - Variable.set(key=dag_fetch_schedule_variable_name, value=example_dag_cron_table._expression) - dag_bag.collect_dags(only_if_updated=False) - fetcher_dag: DAG = dag_bag.get_dag(dag_id=fetcher_dag_name) - - assert fetcher_dag is not None - assert fetcher_dag.schedule_interval == example_dag_cron_table._expression - assert all(airflow_timetable_import_error_message not in error for error in dag_bag.import_errors.values()) - - -def test_fetcher_change_timetable_from_env_variable_after_reparse(dag_bag: DagBag, - dag_fetch_schedule_variable_name: str, - fetcher_dag_name: str, - example_dag_cron_table: CronTriggerTimetable, - airflow_timetable_import_error_message: str): - fetcher_dag: DAG = dag_bag.get_dag(dag_id=fetcher_dag_name) - - assert fetcher_dag is not None - assert fetcher_dag.schedule_interval != example_dag_cron_table._expression - - os.environ[dag_fetch_schedule_variable_name] = example_dag_cron_table._expression - dag_bag.collect_dags(only_if_updated=False) - fetcher_dag: DAG = dag_bag.get_dag(dag_id=fetcher_dag_name) - - assert fetcher_dag is not None - assert fetcher_dag.schedule_interval == example_dag_cron_table._expression - assert all(airflow_timetable_import_error_message not in error for error in dag_bag.import_errors.values()) - - del os.environ[dag_fetch_schedule_variable_name] - - -def test_fetcher_has_default_timetable_if_no_variable_is_set_after_reparse(dag_bag: DagBag, - dag_fetch_schedule_variable_name: str, - fetcher_dag_name: str, - airflow_timetable_import_error_message: str): - env_var_value = os.getenv(dag_fetch_schedule_variable_name) - is_env_var_set: bool = True if env_var_value is not None else False - if is_env_var_set: - del os.environ[dag_fetch_schedule_variable_name] - airflow_var_value = Variable.get(key=dag_fetch_schedule_variable_name, default_var=None) - is_airflow_var_set: bool = True if airflow_var_value is not None else False - if is_airflow_var_set: - Variable.delete(key=dag_fetch_schedule_variable_name) - - dag_bag.collect_dags(only_if_updated=False) - fetcher_dag: DAG = dag_bag.get_dag(dag_id=fetcher_dag_name) - - assert fetcher_dag is not None - assert fetcher_dag.schedule_interval == DAG_FETCH_DEFAULT_TIMETABLE - assert all(airflow_timetable_import_error_message not in error for error in dag_bag.import_errors.values()) - - if is_env_var_set: - os.environ[dag_fetch_schedule_variable_name] = env_var_value - if is_airflow_var_set: - Variable.set(key=dag_fetch_schedule_variable_name, value=airflow_var_value) - - -def test_fetcher_gets_incorrect_timetable_after_reparse(dag_bag: DagBag, - dag_fetch_schedule_variable_name: str, - fetcher_dag_name: str, - example_wrong_cron_table: str, - airflow_timetable_import_error_message: str): - fetcher_dag: DAG = dag_bag.get_dag(dag_id=fetcher_dag_name) - - assert fetcher_dag is not None - - Variable.set(key=dag_fetch_schedule_variable_name, value=example_wrong_cron_table) - dag_bag.collect_dags(only_if_updated=False) - - assert any(airflow_timetable_import_error_message in error for error in dag_bag.import_errors.values())