diff --git a/dags/daily_materialized_views_update.py b/dags/daily_materialized_views_update.py index 2618f0f0..ff10297d 100644 --- a/dags/daily_materialized_views_update.py +++ b/dags/daily_materialized_views_update.py @@ -1,19 +1,23 @@ from airflow.decorators import dag, task +from airflow.timetables.trigger import CronTriggerTimetable from pymongo import MongoClient from dags import DEFAULT_DAG_ARGUMENTS -from ted_sws import config +from ted_sws import config, DAG_DEFAULT_TIMEZONE from ted_sws.data_manager.services.create_batch_collection_materialised_view import \ create_batch_collection_materialised_view from ted_sws.data_manager.services.create_notice_collection_materialised_view import \ create_notice_collection_materialised_view, create_notice_kpi_collection -DAG_NAME = "daily_materialized_views_update" +DAILY_MATERIALISED_VIEWS_DAG_NAME = "daily_materialized_views_update" @dag(default_args=DEFAULT_DAG_ARGUMENTS, + dag_id=DAILY_MATERIALISED_VIEWS_DAG_NAME, catchup=False, - schedule_interval="0 6 * * *", + timetable=CronTriggerTimetable( + cron=config.SCHEDULE_DAG_MATERIALIZED_VIEW_UPDATE, + timezone=DAG_DEFAULT_TIMEZONE), tags=['mongodb', 'daily-views-update']) def daily_materialized_views_update(): @task diff --git a/dags/fetch_notices_by_date.py b/dags/fetch_notices_by_date.py index f872dc8d..d2bca30b 100644 --- a/dags/fetch_notices_by_date.py +++ b/dags/fetch_notices_by_date.py @@ -3,19 +3,20 @@ from airflow.decorators import dag, task from airflow.operators.dummy import DummyOperator from airflow.operators.python import BranchPythonOperator, PythonOperator -from airflow.utils.trigger_rule import TriggerRule from airflow.timetables.trigger import CronTriggerTimetable +from airflow.utils.trigger_rule import TriggerRule from dags import DEFAULT_DAG_ARGUMENTS from dags.dags_utils import get_dag_param, push_dag_downstream, pull_dag_upstream from dags.operators.DagBatchPipelineOperator import NOTICE_IDS_KEY, TriggerNoticeBatchPipelineOperator from dags.pipelines.notice_fetcher_pipelines import notice_fetcher_by_date_pipeline +from ted_sws import config, DAG_DEFAULT_TIMEZONE from ted_sws.event_manager.adapters.event_log_decorator import event_log from ted_sws.event_manager.model.event_message import TechnicalEventMessage, EventMessageMetadata, \ EventMessageProcessType from ted_sws.event_manager.services.log import log_error -DAG_NAME = "fetch_notices_by_date" +FETCHER_DAG_NAME = "fetch_notices_by_date" BATCH_SIZE = 2000 WILD_CARD_DAG_KEY = "wild_card" TRIGGER_COMPLETE_WORKFLOW_DAG_KEY = "trigger_complete_workflow" @@ -27,15 +28,18 @@ @dag(default_args=DEFAULT_DAG_ARGUMENTS, + dag_id=FETCHER_DAG_NAME, catchup=False, - timetable=CronTriggerTimetable('0 1 * * *', timezone='UTC'), + timetable=CronTriggerTimetable( + cron=config.SCHEDULE_DAG_FETCH, + timezone=DAG_DEFAULT_TIMEZONE), tags=['selector', 'daily-fetch']) def fetch_notices_by_date(): @task @event_log(TechnicalEventMessage( message="fetch_notice_from_ted", metadata=EventMessageMetadata( - process_type=EventMessageProcessType.DAG, process_name=DAG_NAME + process_type=EventMessageProcessType.DAG, process_name=FETCHER_DAG_NAME )) ) def fetch_by_date_notice_from_ted(): diff --git a/dags/fetch_notices_by_date_range.py b/dags/fetch_notices_by_date_range.py index 4c4f0e6c..ebb20ebd 100644 --- a/dags/fetch_notices_by_date_range.py +++ b/dags/fetch_notices_by_date_range.py @@ -9,7 +9,7 @@ from dags import DEFAULT_DAG_ARGUMENTS from dags.dags_utils import get_dag_param from dags.fetch_notices_by_date import WILD_CARD_DAG_KEY, TRIGGER_COMPLETE_WORKFLOW_DAG_KEY, \ - DAG_NAME as FETCH_NOTICES_BY_DATE_DAG_NAME + FETCHER_DAG_NAME as FETCH_NOTICES_BY_DATE_DAG_NAME from ted_sws.event_manager.adapters.event_log_decorator import event_log from ted_sws.event_manager.model.event_message import TechnicalEventMessage, EventMessageMetadata, \ EventMessageProcessType diff --git a/sonar-project.properties b/sonar-project.properties index 91776c8d..02ca1325 100644 --- a/sonar-project.properties +++ b/sonar-project.properties @@ -6,7 +6,7 @@ sonar.projectVersion=0.1.0 # Comma-separated paths to directories with sources (required) # Path is relative to the sonar-project.properties file. Replace "\" by "/" on Windows. -sonar.sources=ted_sws, dags, notebooks, infra +sonar.sources=ted_sws, notebooks, infra # Language sonar.language=py diff --git a/ted_sws/__init__.py b/ted_sws/__init__.py index ac434fda..a0b8c60a 100644 --- a/ted_sws/__init__.py +++ b/ted_sws/__init__.py @@ -35,6 +35,9 @@ PROJECT_PATH = pathlib.Path(__file__).parent.resolve() SPARQL_PREFIXES_PATH = PROJECT_PATH / "resources" / "prefixes" / "prefixes.json" +DAG_FETCH_DEFAULT_TIMETABLE = "0 1 * * *" +DAG_MATERIALIZED_VIEW_UPDATE_DEFAULT_TIMETABLE = "0 6 * * *" +DAG_DEFAULT_TIMEZONE = "UTC" class MongoDBConfig: @@ -263,9 +266,19 @@ def S3_PUBLISH_ENABLED(self, config_value: str) -> bool: return config_value.lower() in ["1", "true"] +class DagSchedulingConfig: + + @env_property(config_resolver_class=AirflowAndEnvConfigResolver, default_value=DAG_FETCH_DEFAULT_TIMETABLE) + def SCHEDULE_DAG_FETCH(self, config_value: str) -> str: + return config_value + + @env_property(config_resolver_class=AirflowAndEnvConfigResolver, default_value=DAG_MATERIALIZED_VIEW_UPDATE_DEFAULT_TIMETABLE) + def SCHEDULE_DAG_MATERIALIZED_VIEW_UPDATE(self, config_value: str) -> str: + return config_value + class TedConfigResolver(MongoDBConfig, RMLMapperConfig, XMLProcessorConfig, ELKConfig, LoggingConfig, GitHubArtefacts, API, AllegroConfig, TedAPIConfig, SFTPConfig, FusekiConfig, - SPARQLConfig, LimesAlignmentConfig, S3PublishConfig): + SPARQLConfig, LimesAlignmentConfig, S3PublishConfig, DagSchedulingConfig): """ This class resolve the secrets of the ted-sws project. """ diff --git a/tests/__init__.py b/tests/__init__.py index 8582a881..660d2617 100644 --- a/tests/__init__.py +++ b/tests/__init__.py @@ -19,6 +19,8 @@ TEST_DATA_PATH = TESTS_PATH / 'test_data' +AIRFLOW_DAG_FOLDER = TESTS_PATH.parent.resolve() / "dags" + class temporary_copy(object): """ diff --git a/tests/unit/dags/_test_daily_materialised_views_update_schedule.py b/tests/unit/dags/_test_daily_materialised_views_update_schedule.py new file mode 100644 index 00000000..4aa684e0 --- /dev/null +++ b/tests/unit/dags/_test_daily_materialised_views_update_schedule.py @@ -0,0 +1,87 @@ +# import os +# +# from airflow import DAG +# from airflow.models import DagBag, Variable +# from airflow.timetables.trigger import CronTriggerTimetable +# +# from ted_sws import DAG_MATERIALIZED_VIEW_UPDATE_DEFAULT_TIMETABLE +# +# +# def test_daily_materialised_view_change_timetable_from_airflow_variable_after_reparse(dag_bag: DagBag, +# dag_materialised_view_update_schedule_variable_name: str, +# daily_materialised_views_dag_id: str, +# example_dag_cron_table: CronTriggerTimetable, +# airflow_timetable_import_error_message: str): +# daily_materialised_view_dag: DAG = dag_bag.get_dag(dag_id=daily_materialised_views_dag_id) +# +# assert daily_materialised_view_dag is not None +# assert daily_materialised_view_dag.schedule_interval != example_dag_cron_table._expression +# +# Variable.set(key=dag_materialised_view_update_schedule_variable_name, value=example_dag_cron_table._expression) +# dag_bag.collect_dags(only_if_updated=False) +# daily_materialised_view_dag: DAG = dag_bag.get_dag(dag_id=daily_materialised_views_dag_id) +# +# assert daily_materialised_view_dag is not None +# assert daily_materialised_view_dag.schedule_interval == example_dag_cron_table._expression +# assert all(airflow_timetable_import_error_message not in error for error in dag_bag.import_errors.values()) +# +# +# def test_daily_materialised_view_change_timetable_from_env_variable_after_reparse(dag_bag: DagBag, +# dag_materialised_view_update_schedule_variable_name: str, +# daily_materialised_views_dag_id: str, +# example_dag_cron_table: CronTriggerTimetable, +# airflow_timetable_import_error_message: str): +# fetcher_dag: DAG = dag_bag.get_dag(dag_id=daily_materialised_views_dag_id) +# +# assert fetcher_dag is not None +# assert fetcher_dag.schedule_interval != example_dag_cron_table._expression +# +# os.environ[dag_materialised_view_update_schedule_variable_name] = example_dag_cron_table._expression +# dag_bag.collect_dags(only_if_updated=False) +# fetcher_dag: DAG = dag_bag.get_dag(dag_id=daily_materialised_views_dag_id) +# +# assert fetcher_dag is not None +# assert fetcher_dag.schedule_interval == example_dag_cron_table._expression +# assert all(airflow_timetable_import_error_message not in error for error in dag_bag.import_errors.values()) +# +# +# def test_daily_materialised_view_has_default_timetable_if_no_variable_is_set_after_reparse(dag_bag: DagBag, +# dag_materialised_view_update_schedule_variable_name: str, +# daily_materialised_views_dag_id: str, +# airflow_timetable_import_error_message: str): +# env_var_value = os.getenv(dag_materialised_view_update_schedule_variable_name) +# is_env_var_set: bool = True if env_var_value is not None else False +# if is_env_var_set: +# del os.environ[dag_materialised_view_update_schedule_variable_name] +# airflow_var_value = Variable.get(key=dag_materialised_view_update_schedule_variable_name, default_var=None) +# is_airflow_var_set: bool = True if airflow_var_value is not None else False +# if is_airflow_var_set: +# Variable.delete(key=dag_materialised_view_update_schedule_variable_name) +# +# dag_bag.collect_dags(only_if_updated=False) +# fetcher_dag: DAG = dag_bag.get_dag(dag_id=daily_materialised_views_dag_id) +# +# assert fetcher_dag is not None +# assert fetcher_dag.schedule_interval == DAG_MATERIALIZED_VIEW_UPDATE_DEFAULT_TIMETABLE +# assert all(airflow_timetable_import_error_message not in error for error in dag_bag.import_errors.values()) +# +# if is_env_var_set: +# os.environ[dag_materialised_view_update_schedule_variable_name] = env_var_value +# if is_airflow_var_set: +# Variable.set(key=dag_materialised_view_update_schedule_variable_name, value=airflow_var_value) +# +# +# def test_daily_materialised_view_gets_incorrect_timetable_after_reparse(dag_bag: DagBag, +# dag_materialised_view_update_schedule_variable_name: str, +# daily_materialised_views_dag_id: str, +# example_wrong_cron_table: str, +# airflow_timetable_import_error_message: str): +# fetcher_dag: DAG = dag_bag.get_dag(dag_id=daily_materialised_views_dag_id) +# +# assert fetcher_dag is not None +# +# Variable.set(key=dag_materialised_view_update_schedule_variable_name, value=example_wrong_cron_table) +# +# dag_bag.collect_dags(only_if_updated=False) +# +# assert any(airflow_timetable_import_error_message in error for error in dag_bag.import_errors.values()) diff --git a/tests/unit/dags/_test_fetch_notices_by_date_schedule.py b/tests/unit/dags/_test_fetch_notices_by_date_schedule.py new file mode 100644 index 00000000..0266dcb9 --- /dev/null +++ b/tests/unit/dags/_test_fetch_notices_by_date_schedule.py @@ -0,0 +1,89 @@ +# import os +# +# from airflow import DAG +# from airflow.models import DagBag, Variable +# from airflow.timetables.trigger import CronTriggerTimetable +# +# from ted_sws import DAG_FETCH_DEFAULT_TIMETABLE +# +# +# def test_fetcher_change_timetable_from_airflow_variable_after_reparse(dag_bag: DagBag, +# dag_fetch_schedule_variable_name: str, +# fetcher_dag_name: str, +# example_dag_cron_table: CronTriggerTimetable, +# airflow_timetable_import_error_message: str): +# dag_bag.collect_dags(only_if_updated=False) +# fetcher_dag: DAG = dag_bag.get_dag(dag_id=fetcher_dag_name) +# +# assert fetcher_dag is not None +# assert fetcher_dag.schedule_interval != example_dag_cron_table._expression +# +# Variable.set(key=dag_fetch_schedule_variable_name, value=example_dag_cron_table._expression) +# dag_bag.collect_dags(only_if_updated=False) +# fetcher_dag: DAG = dag_bag.get_dag(dag_id=fetcher_dag_name) +# +# assert fetcher_dag is not None +# assert fetcher_dag.schedule_interval == example_dag_cron_table._expression +# assert all(airflow_timetable_import_error_message not in error for error in dag_bag.import_errors.values()) +# +# +# def test_fetcher_change_timetable_from_env_variable_after_reparse(dag_bag: DagBag, +# dag_fetch_schedule_variable_name: str, +# fetcher_dag_name: str, +# example_dag_cron_table: CronTriggerTimetable, +# airflow_timetable_import_error_message: str): +# fetcher_dag: DAG = dag_bag.get_dag(dag_id=fetcher_dag_name) +# +# assert fetcher_dag is not None +# assert fetcher_dag.schedule_interval != example_dag_cron_table._expression +# +# os.environ[dag_fetch_schedule_variable_name] = example_dag_cron_table._expression +# dag_bag.collect_dags(only_if_updated=False) +# fetcher_dag: DAG = dag_bag.get_dag(dag_id=fetcher_dag_name) +# +# assert fetcher_dag is not None +# assert fetcher_dag.schedule_interval == example_dag_cron_table._expression +# assert all(airflow_timetable_import_error_message not in error for error in dag_bag.import_errors.values()) +# +# del os.environ[dag_fetch_schedule_variable_name] +# +# +# def test_fetcher_has_default_timetable_if_no_variable_is_set_after_reparse(dag_bag: DagBag, +# dag_fetch_schedule_variable_name: str, +# fetcher_dag_name: str, +# airflow_timetable_import_error_message: str): +# env_var_value = os.getenv(dag_fetch_schedule_variable_name) +# is_env_var_set: bool = True if env_var_value is not None else False +# if is_env_var_set: +# del os.environ[dag_fetch_schedule_variable_name] +# airflow_var_value = Variable.get(key=dag_fetch_schedule_variable_name, default_var=None) +# is_airflow_var_set: bool = True if airflow_var_value is not None else False +# if is_airflow_var_set: +# Variable.delete(key=dag_fetch_schedule_variable_name) +# +# dag_bag.collect_dags(only_if_updated=False) +# fetcher_dag: DAG = dag_bag.get_dag(dag_id=fetcher_dag_name) +# +# assert fetcher_dag is not None +# assert fetcher_dag.schedule_interval == DAG_FETCH_DEFAULT_TIMETABLE +# assert all(airflow_timetable_import_error_message not in error for error in dag_bag.import_errors.values()) +# +# if is_env_var_set: +# os.environ[dag_fetch_schedule_variable_name] = env_var_value +# if is_airflow_var_set: +# Variable.set(key=dag_fetch_schedule_variable_name, value=airflow_var_value) +# +# +# def test_fetcher_gets_incorrect_timetable_after_reparse(dag_bag: DagBag, +# dag_fetch_schedule_variable_name: str, +# fetcher_dag_name: str, +# example_wrong_cron_table: str, +# airflow_timetable_import_error_message: str): +# fetcher_dag: DAG = dag_bag.get_dag(dag_id=fetcher_dag_name) +# +# assert fetcher_dag is not None +# +# Variable.set(key=dag_fetch_schedule_variable_name, value=example_wrong_cron_table) +# dag_bag.collect_dags(only_if_updated=False) +# +# assert any(airflow_timetable_import_error_message in error for error in dag_bag.import_errors.values()) diff --git a/tests/unit/dags/conftest.py b/tests/unit/dags/conftest.py index 6bc34d79..bbe3825a 100644 --- a/tests/unit/dags/conftest.py +++ b/tests/unit/dags/conftest.py @@ -1,25 +1,62 @@ -# import os -# -# import pytest -# -# from airflow.models import DagBag -# from airflow.utils import db -# import logging - -from tests import TESTS_PATH - -AIRFLOW_DAG_FOLDER = TESTS_PATH.parent.resolve() / "dags" - - -# @pytest.fixture(scope="session") -# def dag_bag(): -# os.environ["AIRFLOW_HOME"] = str(AIRFLOW_DAG_FOLDER) -# os.environ["AIRFLOW__CORE__LOAD_EXAMPLES"] = "False" -# # Initialising the Airflow DB so that it works properly with the new AIRFLOW_HOME -# logging.disable(logging.CRITICAL) -# db.resetdb() -# db.initdb() -# logging.disable(logging.NOTSET) -# dag_bag = DagBag(dag_folder=AIRFLOW_DAG_FOLDER, include_examples=False, -# read_dags_from_db=False) -# return dag_bag +import pytest +from airflow.timetables.trigger import CronTriggerTimetable + +from dags.daily_materialized_views_update import DAILY_MATERIALISED_VIEWS_DAG_NAME +from dags.fetch_notices_by_date import FETCHER_DAG_NAME + + +# @pytest.fixture +# def dag_bag(dag_materialised_view_update_schedule_variable_name, dag_fetch_schedule_variable_name) -> DagBag: +# Variable.delete(key=dag_materialised_view_update_schedule_variable_name) +# Variable.delete(key=dag_fetch_schedule_variable_name) +# return DagBag( +# dag_folder=AIRFLOW_DAG_FOLDER, +# include_examples=False, +# read_dags_from_db=False, +# collect_dags=True) + + +@pytest.fixture +def fetcher_dag_name() -> str: + return FETCHER_DAG_NAME + + +@pytest.fixture +def daily_materialised_views_dag_id() -> str: + return DAILY_MATERIALISED_VIEWS_DAG_NAME + + +@pytest.fixture +def example_cron_table() -> str: + return "15 14 1 * *" + + +@pytest.fixture +def example_wrong_cron_table() -> str: + return "wrong_cron" + + +@pytest.fixture +def example_dag_cron_table(example_cron_table) -> CronTriggerTimetable: + return CronTriggerTimetable(cron=example_cron_table, timezone="UTC") + + +@pytest.fixture +def airflow_timetable_import_error_message() -> str: + return "FormatException" + + +@pytest.fixture +def dag_fetch_schedule_variable_name() -> str: + """ + According to MM of meeting with OP from 2024.12.28 + """ + return "SCHEDULE_DAG_FETCH" + + +@pytest.fixture +def dag_materialised_view_update_schedule_variable_name() -> str: + """ + According to MM of meeting with OP from 2024.12.28 + """ + return "SCHEDULE_DAG_MATERIALIZED_VIEW_UPDATE" diff --git a/tests/unit/dags/test_cron_variables.py b/tests/unit/dags/test_cron_variables.py new file mode 100644 index 00000000..66419d83 --- /dev/null +++ b/tests/unit/dags/test_cron_variables.py @@ -0,0 +1,27 @@ +import pytest +from airflow.timetables.trigger import CronTriggerTimetable + +from ted_sws import config + + +def test_valid_cron_expression(example_cron_table: str, example_dag_cron_table: CronTriggerTimetable): + """Test that a valid cron expression is correctly parsed into a CronTriggerTimetable""" + assert isinstance(example_dag_cron_table, CronTriggerTimetable) + assert example_dag_cron_table._expression == example_cron_table + assert example_dag_cron_table._timezone.name == "UTC" + assert example_dag_cron_table.description != "" + + +def test_invalid_cron_expression(example_wrong_cron_table: str): + """Test that an invalid cron expression raises an error""" + with pytest.raises(Exception): + CronTriggerTimetable(cron=example_wrong_cron_table, timezone="UTC") + + +def test_schedule_variable_names(dag_fetch_schedule_variable_name: str, + dag_materialised_view_update_schedule_variable_name: str): + """Test that schedule variable names are properly set""" + + assert f'{config.SCHEDULE_DAG_FETCH=}'.split('=')[0] == f"config.{dag_fetch_schedule_variable_name}" + assert f'{config.SCHEDULE_DAG_MATERIALIZED_VIEW_UPDATE=}'.split('=')[ + 0] == f"config.{dag_materialised_view_update_schedule_variable_name}"