-
Notifications
You must be signed in to change notification settings - Fork 5
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
6 changed files
with
212 additions
and
187 deletions.
There are no files selected for viewing
87 changes: 87 additions & 0 deletions
87
tests/unit/dags/_test_daily_materialised_views_update_schedule.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,87 @@ | ||
# import os | ||
# | ||
# from airflow import DAG | ||
# from airflow.models import DagBag, Variable | ||
# from airflow.timetables.trigger import CronTriggerTimetable | ||
# | ||
# from ted_sws import DAG_MATERIALIZED_VIEW_UPDATE_DEFAULT_TIMETABLE | ||
# | ||
# | ||
# def test_daily_materialised_view_change_timetable_from_airflow_variable_after_reparse(dag_bag: DagBag, | ||
# dag_materialised_view_update_schedule_variable_name: str, | ||
# daily_materialised_views_dag_id: str, | ||
# example_dag_cron_table: CronTriggerTimetable, | ||
# airflow_timetable_import_error_message: str): | ||
# daily_materialised_view_dag: DAG = dag_bag.get_dag(dag_id=daily_materialised_views_dag_id) | ||
# | ||
# assert daily_materialised_view_dag is not None | ||
# assert daily_materialised_view_dag.schedule_interval != example_dag_cron_table._expression | ||
# | ||
# Variable.set(key=dag_materialised_view_update_schedule_variable_name, value=example_dag_cron_table._expression) | ||
# dag_bag.collect_dags(only_if_updated=False) | ||
# daily_materialised_view_dag: DAG = dag_bag.get_dag(dag_id=daily_materialised_views_dag_id) | ||
# | ||
# assert daily_materialised_view_dag is not None | ||
# assert daily_materialised_view_dag.schedule_interval == example_dag_cron_table._expression | ||
# assert all(airflow_timetable_import_error_message not in error for error in dag_bag.import_errors.values()) | ||
# | ||
# | ||
# def test_daily_materialised_view_change_timetable_from_env_variable_after_reparse(dag_bag: DagBag, | ||
# dag_materialised_view_update_schedule_variable_name: str, | ||
# daily_materialised_views_dag_id: str, | ||
# example_dag_cron_table: CronTriggerTimetable, | ||
# airflow_timetable_import_error_message: str): | ||
# fetcher_dag: DAG = dag_bag.get_dag(dag_id=daily_materialised_views_dag_id) | ||
# | ||
# assert fetcher_dag is not None | ||
# assert fetcher_dag.schedule_interval != example_dag_cron_table._expression | ||
# | ||
# os.environ[dag_materialised_view_update_schedule_variable_name] = example_dag_cron_table._expression | ||
# dag_bag.collect_dags(only_if_updated=False) | ||
# fetcher_dag: DAG = dag_bag.get_dag(dag_id=daily_materialised_views_dag_id) | ||
# | ||
# assert fetcher_dag is not None | ||
# assert fetcher_dag.schedule_interval == example_dag_cron_table._expression | ||
# assert all(airflow_timetable_import_error_message not in error for error in dag_bag.import_errors.values()) | ||
# | ||
# | ||
# def test_daily_materialised_view_has_default_timetable_if_no_variable_is_set_after_reparse(dag_bag: DagBag, | ||
# dag_materialised_view_update_schedule_variable_name: str, | ||
# daily_materialised_views_dag_id: str, | ||
# airflow_timetable_import_error_message: str): | ||
# env_var_value = os.getenv(dag_materialised_view_update_schedule_variable_name) | ||
# is_env_var_set: bool = True if env_var_value is not None else False | ||
# if is_env_var_set: | ||
# del os.environ[dag_materialised_view_update_schedule_variable_name] | ||
# airflow_var_value = Variable.get(key=dag_materialised_view_update_schedule_variable_name, default_var=None) | ||
# is_airflow_var_set: bool = True if airflow_var_value is not None else False | ||
# if is_airflow_var_set: | ||
# Variable.delete(key=dag_materialised_view_update_schedule_variable_name) | ||
# | ||
# dag_bag.collect_dags(only_if_updated=False) | ||
# fetcher_dag: DAG = dag_bag.get_dag(dag_id=daily_materialised_views_dag_id) | ||
# | ||
# assert fetcher_dag is not None | ||
# assert fetcher_dag.schedule_interval == DAG_MATERIALIZED_VIEW_UPDATE_DEFAULT_TIMETABLE | ||
# assert all(airflow_timetable_import_error_message not in error for error in dag_bag.import_errors.values()) | ||
# | ||
# if is_env_var_set: | ||
# os.environ[dag_materialised_view_update_schedule_variable_name] = env_var_value | ||
# if is_airflow_var_set: | ||
# Variable.set(key=dag_materialised_view_update_schedule_variable_name, value=airflow_var_value) | ||
# | ||
# | ||
# def test_daily_materialised_view_gets_incorrect_timetable_after_reparse(dag_bag: DagBag, | ||
# dag_materialised_view_update_schedule_variable_name: str, | ||
# daily_materialised_views_dag_id: str, | ||
# example_wrong_cron_table: str, | ||
# airflow_timetable_import_error_message: str): | ||
# fetcher_dag: DAG = dag_bag.get_dag(dag_id=daily_materialised_views_dag_id) | ||
# | ||
# assert fetcher_dag is not None | ||
# | ||
# Variable.set(key=dag_materialised_view_update_schedule_variable_name, value=example_wrong_cron_table) | ||
# | ||
# dag_bag.collect_dags(only_if_updated=False) | ||
# | ||
# assert any(airflow_timetable_import_error_message in error for error in dag_bag.import_errors.values()) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,89 @@ | ||
# import os | ||
# | ||
# from airflow import DAG | ||
# from airflow.models import DagBag, Variable | ||
# from airflow.timetables.trigger import CronTriggerTimetable | ||
# | ||
# from ted_sws import DAG_FETCH_DEFAULT_TIMETABLE | ||
# | ||
# | ||
# def test_fetcher_change_timetable_from_airflow_variable_after_reparse(dag_bag: DagBag, | ||
# dag_fetch_schedule_variable_name: str, | ||
# fetcher_dag_name: str, | ||
# example_dag_cron_table: CronTriggerTimetable, | ||
# airflow_timetable_import_error_message: str): | ||
# dag_bag.collect_dags(only_if_updated=False) | ||
# fetcher_dag: DAG = dag_bag.get_dag(dag_id=fetcher_dag_name) | ||
# | ||
# assert fetcher_dag is not None | ||
# assert fetcher_dag.schedule_interval != example_dag_cron_table._expression | ||
# | ||
# Variable.set(key=dag_fetch_schedule_variable_name, value=example_dag_cron_table._expression) | ||
# dag_bag.collect_dags(only_if_updated=False) | ||
# fetcher_dag: DAG = dag_bag.get_dag(dag_id=fetcher_dag_name) | ||
# | ||
# assert fetcher_dag is not None | ||
# assert fetcher_dag.schedule_interval == example_dag_cron_table._expression | ||
# assert all(airflow_timetable_import_error_message not in error for error in dag_bag.import_errors.values()) | ||
# | ||
# | ||
# def test_fetcher_change_timetable_from_env_variable_after_reparse(dag_bag: DagBag, | ||
# dag_fetch_schedule_variable_name: str, | ||
# fetcher_dag_name: str, | ||
# example_dag_cron_table: CronTriggerTimetable, | ||
# airflow_timetable_import_error_message: str): | ||
# fetcher_dag: DAG = dag_bag.get_dag(dag_id=fetcher_dag_name) | ||
# | ||
# assert fetcher_dag is not None | ||
# assert fetcher_dag.schedule_interval != example_dag_cron_table._expression | ||
# | ||
# os.environ[dag_fetch_schedule_variable_name] = example_dag_cron_table._expression | ||
# dag_bag.collect_dags(only_if_updated=False) | ||
# fetcher_dag: DAG = dag_bag.get_dag(dag_id=fetcher_dag_name) | ||
# | ||
# assert fetcher_dag is not None | ||
# assert fetcher_dag.schedule_interval == example_dag_cron_table._expression | ||
# assert all(airflow_timetable_import_error_message not in error for error in dag_bag.import_errors.values()) | ||
# | ||
# del os.environ[dag_fetch_schedule_variable_name] | ||
# | ||
# | ||
# def test_fetcher_has_default_timetable_if_no_variable_is_set_after_reparse(dag_bag: DagBag, | ||
# dag_fetch_schedule_variable_name: str, | ||
# fetcher_dag_name: str, | ||
# airflow_timetable_import_error_message: str): | ||
# env_var_value = os.getenv(dag_fetch_schedule_variable_name) | ||
# is_env_var_set: bool = True if env_var_value is not None else False | ||
# if is_env_var_set: | ||
# del os.environ[dag_fetch_schedule_variable_name] | ||
# airflow_var_value = Variable.get(key=dag_fetch_schedule_variable_name, default_var=None) | ||
# is_airflow_var_set: bool = True if airflow_var_value is not None else False | ||
# if is_airflow_var_set: | ||
# Variable.delete(key=dag_fetch_schedule_variable_name) | ||
# | ||
# dag_bag.collect_dags(only_if_updated=False) | ||
# fetcher_dag: DAG = dag_bag.get_dag(dag_id=fetcher_dag_name) | ||
# | ||
# assert fetcher_dag is not None | ||
# assert fetcher_dag.schedule_interval == DAG_FETCH_DEFAULT_TIMETABLE | ||
# assert all(airflow_timetable_import_error_message not in error for error in dag_bag.import_errors.values()) | ||
# | ||
# if is_env_var_set: | ||
# os.environ[dag_fetch_schedule_variable_name] = env_var_value | ||
# if is_airflow_var_set: | ||
# Variable.set(key=dag_fetch_schedule_variable_name, value=airflow_var_value) | ||
# | ||
# | ||
# def test_fetcher_gets_incorrect_timetable_after_reparse(dag_bag: DagBag, | ||
# dag_fetch_schedule_variable_name: str, | ||
# fetcher_dag_name: str, | ||
# example_wrong_cron_table: str, | ||
# airflow_timetable_import_error_message: str): | ||
# fetcher_dag: DAG = dag_bag.get_dag(dag_id=fetcher_dag_name) | ||
# | ||
# assert fetcher_dag is not None | ||
# | ||
# Variable.set(key=dag_fetch_schedule_variable_name, value=example_wrong_cron_table) | ||
# dag_bag.collect_dags(only_if_updated=False) | ||
# | ||
# assert any(airflow_timetable_import_error_message in error for error in dag_bag.import_errors.values()) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,27 @@ | ||
import pytest | ||
from airflow.timetables.trigger import CronTriggerTimetable | ||
|
||
from ted_sws import config | ||
|
||
|
||
def test_valid_cron_expression(example_cron_table: str, example_dag_cron_table: CronTriggerTimetable): | ||
"""Test that a valid cron expression is correctly parsed into a CronTriggerTimetable""" | ||
assert isinstance(example_dag_cron_table, CronTriggerTimetable) | ||
assert example_dag_cron_table._expression == example_cron_table | ||
assert example_dag_cron_table._timezone.name == "UTC" | ||
assert example_dag_cron_table.description != "" | ||
|
||
|
||
def test_invalid_cron_expression(example_wrong_cron_table: str): | ||
"""Test that an invalid cron expression raises an error""" | ||
with pytest.raises(Exception): | ||
CronTriggerTimetable(cron=example_wrong_cron_table, timezone="UTC") | ||
|
||
|
||
def test_schedule_variable_names(dag_fetch_schedule_variable_name: str, | ||
dag_materialised_view_update_schedule_variable_name: str): | ||
"""Test that schedule variable names are properly set""" | ||
|
||
assert f'{config.SCHEDULE_DAG_FETCH=}'.split('=')[0] == f"config.{dag_fetch_schedule_variable_name}" | ||
assert f'{config.SCHEDULE_DAG_MATERIALIZED_VIEW_UPDATE=}'.split('=')[ | ||
0] == f"config.{dag_materialised_view_update_schedule_variable_name}" |
87 changes: 0 additions & 87 deletions
87
tests/unit/dags/test_daily_materialised_views_update_schedule.py
This file was deleted.
Oops, something went wrong.
Oops, something went wrong.