diff --git a/dags/daily_materialized_views_update.py b/dags/daily_materialized_views_update.py index 836c090d..ff10297d 100644 --- a/dags/daily_materialized_views_update.py +++ b/dags/daily_materialized_views_update.py @@ -1,8 +1,9 @@ from airflow.decorators import dag, task +from airflow.timetables.trigger import CronTriggerTimetable from pymongo import MongoClient from dags import DEFAULT_DAG_ARGUMENTS -from ted_sws import config +from ted_sws import config, DAG_DEFAULT_TIMEZONE from ted_sws.data_manager.services.create_batch_collection_materialised_view import \ create_batch_collection_materialised_view from ted_sws.data_manager.services.create_notice_collection_materialised_view import \ @@ -12,8 +13,11 @@ @dag(default_args=DEFAULT_DAG_ARGUMENTS, + dag_id=DAILY_MATERIALISED_VIEWS_DAG_NAME, catchup=False, - schedule_interval="0 6 * * *", + timetable=CronTriggerTimetable( + cron=config.SCHEDULE_DAG_MATERIALIZED_VIEW_UPDATE, + timezone=DAG_DEFAULT_TIMEZONE), tags=['mongodb', 'daily-views-update']) def daily_materialized_views_update(): @task diff --git a/dags/fetch_notices_by_date.py b/dags/fetch_notices_by_date.py index 86c8115e..d2bca30b 100644 --- a/dags/fetch_notices_by_date.py +++ b/dags/fetch_notices_by_date.py @@ -10,7 +10,7 @@ from dags.dags_utils import get_dag_param, push_dag_downstream, pull_dag_upstream from dags.operators.DagBatchPipelineOperator import NOTICE_IDS_KEY, TriggerNoticeBatchPipelineOperator from dags.pipelines.notice_fetcher_pipelines import notice_fetcher_by_date_pipeline -from ted_sws import config +from ted_sws import config, DAG_DEFAULT_TIMEZONE from ted_sws.event_manager.adapters.event_log_decorator import event_log from ted_sws.event_manager.model.event_message import TechnicalEventMessage, EventMessageMetadata, \ EventMessageProcessType @@ -26,15 +26,13 @@ FINISH_FETCH_BY_DATE_TASK_ID = "finish_fetch_by_date" VALIDATE_FETCHED_NOTICES_TASK_ID = "validate_fetched_notices" -DAG_FETCH_DEFAULT_TIMEZONE = "UTC" - @dag(default_args=DEFAULT_DAG_ARGUMENTS, dag_id=FETCHER_DAG_NAME, catchup=False, timetable=CronTriggerTimetable( cron=config.SCHEDULE_DAG_FETCH, - timezone=DAG_FETCH_DEFAULT_TIMEZONE), + timezone=DAG_DEFAULT_TIMEZONE), tags=['selector', 'daily-fetch']) def fetch_notices_by_date(): @task diff --git a/ted_sws/__init__.py b/ted_sws/__init__.py index c49c4cca..a0b8c60a 100644 --- a/ted_sws/__init__.py +++ b/ted_sws/__init__.py @@ -37,6 +37,7 @@ DAG_FETCH_DEFAULT_TIMETABLE = "0 1 * * *" DAG_MATERIALIZED_VIEW_UPDATE_DEFAULT_TIMETABLE = "0 6 * * *" +DAG_DEFAULT_TIMEZONE = "UTC" class MongoDBConfig: