Skip to content

Commit

Permalink
implement scheduling for daily materialised view update + make defaul…
Browse files Browse the repository at this point in the history
…t dag timezone global
  • Loading branch information
duprijil committed Dec 19, 2024
1 parent b954983 commit ed05bcb
Show file tree
Hide file tree
Showing 3 changed files with 9 additions and 6 deletions.
8 changes: 6 additions & 2 deletions dags/daily_materialized_views_update.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
from airflow.decorators import dag, task
from airflow.timetables.trigger import CronTriggerTimetable
from pymongo import MongoClient

from dags import DEFAULT_DAG_ARGUMENTS
from ted_sws import config
from ted_sws import config, DAG_DEFAULT_TIMEZONE
from ted_sws.data_manager.services.create_batch_collection_materialised_view import \
create_batch_collection_materialised_view
from ted_sws.data_manager.services.create_notice_collection_materialised_view import \
Expand All @@ -12,8 +13,11 @@


@dag(default_args=DEFAULT_DAG_ARGUMENTS,
dag_id=DAILY_MATERIALISED_VIEWS_DAG_NAME,
catchup=False,
schedule_interval="0 6 * * *",
timetable=CronTriggerTimetable(
cron=config.SCHEDULE_DAG_MATERIALIZED_VIEW_UPDATE,
timezone=DAG_DEFAULT_TIMEZONE),
tags=['mongodb', 'daily-views-update'])
def daily_materialized_views_update():
@task
Expand Down
6 changes: 2 additions & 4 deletions dags/fetch_notices_by_date.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from dags.dags_utils import get_dag_param, push_dag_downstream, pull_dag_upstream
from dags.operators.DagBatchPipelineOperator import NOTICE_IDS_KEY, TriggerNoticeBatchPipelineOperator
from dags.pipelines.notice_fetcher_pipelines import notice_fetcher_by_date_pipeline
from ted_sws import config
from ted_sws import config, DAG_DEFAULT_TIMEZONE
from ted_sws.event_manager.adapters.event_log_decorator import event_log
from ted_sws.event_manager.model.event_message import TechnicalEventMessage, EventMessageMetadata, \
EventMessageProcessType
Expand All @@ -26,15 +26,13 @@
FINISH_FETCH_BY_DATE_TASK_ID = "finish_fetch_by_date"
VALIDATE_FETCHED_NOTICES_TASK_ID = "validate_fetched_notices"

DAG_FETCH_DEFAULT_TIMEZONE = "UTC"


@dag(default_args=DEFAULT_DAG_ARGUMENTS,
dag_id=FETCHER_DAG_NAME,
catchup=False,
timetable=CronTriggerTimetable(
cron=config.SCHEDULE_DAG_FETCH,
timezone=DAG_FETCH_DEFAULT_TIMEZONE),
timezone=DAG_DEFAULT_TIMEZONE),
tags=['selector', 'daily-fetch'])
def fetch_notices_by_date():
@task
Expand Down
1 change: 1 addition & 0 deletions ted_sws/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@

DAG_FETCH_DEFAULT_TIMETABLE = "0 1 * * *"
DAG_MATERIALIZED_VIEW_UPDATE_DEFAULT_TIMETABLE = "0 6 * * *"
DAG_DEFAULT_TIMEZONE = "UTC"

class MongoDBConfig:

Expand Down

0 comments on commit ed05bcb

Please sign in to comment.