Skip to content

Commit

Permalink
add test to check default timetable
Browse files Browse the repository at this point in the history
  • Loading branch information
duprijil committed Dec 19, 2024
1 parent cc5c1bb commit fc318ca
Showing 1 changed file with 26 additions and 4 deletions.
30 changes: 26 additions & 4 deletions tests/unit/dags/test_fetch_notices_by_date.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
from airflow.models import DagBag, Variable
from airflow.timetables.trigger import CronTriggerTimetable

from dags.fetch_notices_by_date import DAG_FETCH_DEFAULT_TIMEZONE
from ted_sws import DAG_FETCH_DEFAULT_TIMETABLE


Expand Down Expand Up @@ -44,6 +43,32 @@ def test_fetcher_change_timetable_from_env_variable_after_reparse(dag_bag: DagBa
assert fetcher_dag.schedule_interval == example_dag_cron_table._expression

assert all(airflow_timetable_import_error_name not in error for error in dag_bag.import_errors.values())
del os.environ[dag_fetch_schedule_variable_name]


def test_fetcher_has_default_timetable_if_no_variable_is_set_after_reparse(dag_bag: DagBag,
dag_fetch_schedule_variable_name: str,
fetcher_dag_id: str,
airflow_timetable_import_error_name: str):
env_var_value = os.getenv(dag_fetch_schedule_variable_name)
is_env_var_set: bool = True if env_var_value is not None else False
if is_env_var_set:
del os.environ[dag_fetch_schedule_variable_name]
airflow_var_value = Variable.get(key=dag_fetch_schedule_variable_name, default_var=None)
is_airflow_var_set: bool = True if airflow_var_value is not None else False
if is_airflow_var_set:
Variable.delete(key=dag_fetch_schedule_variable_name)

dag_bag.collect_dags()
fetcher_dag: DAG = dag_bag.get_dag(dag_id=fetcher_dag_id)
assert fetcher_dag is not None
assert fetcher_dag.schedule_interval == DAG_FETCH_DEFAULT_TIMETABLE
assert all(airflow_timetable_import_error_name not in error for error in dag_bag.import_errors.values())

if is_env_var_set:
os.environ[dag_fetch_schedule_variable_name] = env_var_value
if is_airflow_var_set:
Variable.set(key=dag_fetch_schedule_variable_name, value=airflow_var_value)


def test_fetcher_gets_incorrect_timetable_after_reparse(dag_bag: DagBag,
Expand All @@ -53,9 +78,6 @@ def test_fetcher_gets_incorrect_timetable_after_reparse(dag_bag: DagBag,
airflow_timetable_import_error_name: str):
fetcher_dag: DAG = dag_bag.get_dag(dag_id=fetcher_dag_id)
assert fetcher_dag is not None
default_dag_timetable = CronTriggerTimetable(cron=DAG_FETCH_DEFAULT_TIMETABLE,
timezone=DAG_FETCH_DEFAULT_TIMEZONE)
assert fetcher_dag.schedule_interval == default_dag_timetable._expression

Variable.set(key=dag_fetch_schedule_variable_name, value=example_wrong_cron_table)
dag_bag.collect_dags(only_if_updated=False)
Expand Down

0 comments on commit fc318ca

Please sign in to comment.