Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix issue #557 #562

Merged
merged 11 commits into from
Jan 15, 2025
10 changes: 7 additions & 3 deletions dags/daily_materialized_views_update.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,23 @@
from airflow.decorators import dag, task
from airflow.timetables.trigger import CronTriggerTimetable
from pymongo import MongoClient

from dags import DEFAULT_DAG_ARGUMENTS
from ted_sws import config
from ted_sws import config, DAG_DEFAULT_TIMEZONE
from ted_sws.data_manager.services.create_batch_collection_materialised_view import \
create_batch_collection_materialised_view
from ted_sws.data_manager.services.create_notice_collection_materialised_view import \
create_notice_collection_materialised_view, create_notice_kpi_collection

DAG_NAME = "daily_materialized_views_update"
DAILY_MATERIALISED_VIEWS_DAG_NAME = "daily_materialized_views_update"


@dag(default_args=DEFAULT_DAG_ARGUMENTS,
dag_id=DAILY_MATERIALISED_VIEWS_DAG_NAME,
catchup=False,
schedule_interval="0 6 * * *",
timetable=CronTriggerTimetable(
cron=config.SCHEDULE_DAG_MATERIALIZED_VIEW_UPDATE,
timezone=DAG_DEFAULT_TIMEZONE),
tags=['mongodb', 'daily-views-update'])
def daily_materialized_views_update():
@task
Expand Down
12 changes: 8 additions & 4 deletions dags/fetch_notices_by_date.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,20 @@
from airflow.decorators import dag, task
from airflow.operators.dummy import DummyOperator
from airflow.operators.python import BranchPythonOperator, PythonOperator
from airflow.utils.trigger_rule import TriggerRule
from airflow.timetables.trigger import CronTriggerTimetable
from airflow.utils.trigger_rule import TriggerRule

from dags import DEFAULT_DAG_ARGUMENTS
from dags.dags_utils import get_dag_param, push_dag_downstream, pull_dag_upstream
from dags.operators.DagBatchPipelineOperator import NOTICE_IDS_KEY, TriggerNoticeBatchPipelineOperator
from dags.pipelines.notice_fetcher_pipelines import notice_fetcher_by_date_pipeline
from ted_sws import config, DAG_DEFAULT_TIMEZONE
from ted_sws.event_manager.adapters.event_log_decorator import event_log
from ted_sws.event_manager.model.event_message import TechnicalEventMessage, EventMessageMetadata, \
EventMessageProcessType
from ted_sws.event_manager.services.log import log_error

DAG_NAME = "fetch_notices_by_date"
FETCHER_DAG_NAME = "fetch_notices_by_date"
BATCH_SIZE = 2000
WILD_CARD_DAG_KEY = "wild_card"
TRIGGER_COMPLETE_WORKFLOW_DAG_KEY = "trigger_complete_workflow"
Expand All @@ -27,15 +28,18 @@


@dag(default_args=DEFAULT_DAG_ARGUMENTS,
dag_id=FETCHER_DAG_NAME,
catchup=False,
timetable=CronTriggerTimetable('0 1 * * *', timezone='UTC'),
timetable=CronTriggerTimetable(
cron=config.SCHEDULE_DAG_FETCH,
timezone=DAG_DEFAULT_TIMEZONE),
tags=['selector', 'daily-fetch'])
def fetch_notices_by_date():
@task
@event_log(TechnicalEventMessage(
message="fetch_notice_from_ted",
metadata=EventMessageMetadata(
process_type=EventMessageProcessType.DAG, process_name=DAG_NAME
process_type=EventMessageProcessType.DAG, process_name=FETCHER_DAG_NAME
))
)
def fetch_by_date_notice_from_ted():
Expand Down
2 changes: 1 addition & 1 deletion dags/fetch_notices_by_date_range.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from dags import DEFAULT_DAG_ARGUMENTS
from dags.dags_utils import get_dag_param
from dags.fetch_notices_by_date import WILD_CARD_DAG_KEY, TRIGGER_COMPLETE_WORKFLOW_DAG_KEY, \
DAG_NAME as FETCH_NOTICES_BY_DATE_DAG_NAME
FETCHER_DAG_NAME as FETCH_NOTICES_BY_DATE_DAG_NAME
from ted_sws.event_manager.adapters.event_log_decorator import event_log
from ted_sws.event_manager.model.event_message import TechnicalEventMessage, EventMessageMetadata, \
EventMessageProcessType
Expand Down
2 changes: 1 addition & 1 deletion sonar-project.properties
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ sonar.projectVersion=0.1.0

# Comma-separated paths to directories with sources (required)
# Path is relative to the sonar-project.properties file. Replace "\" by "/" on Windows.
sonar.sources=ted_sws, dags, notebooks, infra
sonar.sources=ted_sws, notebooks, infra

# Language
sonar.language=py
Expand Down
15 changes: 14 additions & 1 deletion ted_sws/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@
PROJECT_PATH = pathlib.Path(__file__).parent.resolve()
SPARQL_PREFIXES_PATH = PROJECT_PATH / "resources" / "prefixes" / "prefixes.json"

DAG_FETCH_DEFAULT_TIMETABLE = "0 1 * * *"
DAG_MATERIALIZED_VIEW_UPDATE_DEFAULT_TIMETABLE = "0 6 * * *"
DAG_DEFAULT_TIMEZONE = "UTC"

class MongoDBConfig:

Expand Down Expand Up @@ -263,9 +266,19 @@ def S3_PUBLISH_ENABLED(self, config_value: str) -> bool:
return config_value.lower() in ["1", "true"]


class DagSchedulingConfig:
    """Exposes the cron schedules used by the Airflow DAGs as config properties.

    NOTE(review): `env_property` with `AirflowAndEnvConfigResolver` presumably
    resolves each value from an Airflow Variable or an environment variable of
    the same name, falling back to `default_value` — confirm against the
    `env_property` implementation.
    """

    @env_property(config_resolver_class=AirflowAndEnvConfigResolver, default_value=DAG_FETCH_DEFAULT_TIMETABLE)
    def SCHEDULE_DAG_FETCH(self, config_value: str) -> str:
        """Cron expression for the notice-fetch DAG; defaults to DAG_FETCH_DEFAULT_TIMETABLE."""
        return config_value

    @env_property(config_resolver_class=AirflowAndEnvConfigResolver, default_value=DAG_MATERIALIZED_VIEW_UPDATE_DEFAULT_TIMETABLE)
    def SCHEDULE_DAG_MATERIALIZED_VIEW_UPDATE(self, config_value: str) -> str:
        """Cron expression for the materialised-views-update DAG; defaults to DAG_MATERIALIZED_VIEW_UPDATE_DEFAULT_TIMETABLE."""
        return config_value

class TedConfigResolver(MongoDBConfig, RMLMapperConfig, XMLProcessorConfig, ELKConfig, LoggingConfig,
GitHubArtefacts, API, AllegroConfig, TedAPIConfig, SFTPConfig, FusekiConfig,
SPARQLConfig, LimesAlignmentConfig, S3PublishConfig):
SPARQLConfig, LimesAlignmentConfig, S3PublishConfig, DagSchedulingConfig):
"""
This class resolve the secrets of the ted-sws project.
"""
Expand Down
2 changes: 2 additions & 0 deletions tests/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

TEST_DATA_PATH = TESTS_PATH / 'test_data'

AIRFLOW_DAG_FOLDER = TESTS_PATH.parent.resolve() / "dags"


class temporary_copy(object):
"""
Expand Down
87 changes: 87 additions & 0 deletions tests/unit/dags/_test_daily_materialised_views_update_schedule.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
# import os
#
# from airflow import DAG
# from airflow.models import DagBag, Variable
# from airflow.timetables.trigger import CronTriggerTimetable
#
# from ted_sws import DAG_MATERIALIZED_VIEW_UPDATE_DEFAULT_TIMETABLE
#
#
# def test_daily_materialised_view_change_timetable_from_airflow_variable_after_reparse(dag_bag: DagBag,
# dag_materialised_view_update_schedule_variable_name: str,
# daily_materialised_views_dag_id: str,
# example_dag_cron_table: CronTriggerTimetable,
# airflow_timetable_import_error_message: str):
# daily_materialised_view_dag: DAG = dag_bag.get_dag(dag_id=daily_materialised_views_dag_id)
#
# assert daily_materialised_view_dag is not None
# assert daily_materialised_view_dag.schedule_interval != example_dag_cron_table._expression
#
# Variable.set(key=dag_materialised_view_update_schedule_variable_name, value=example_dag_cron_table._expression)
# dag_bag.collect_dags(only_if_updated=False)
# daily_materialised_view_dag: DAG = dag_bag.get_dag(dag_id=daily_materialised_views_dag_id)
#
# assert daily_materialised_view_dag is not None
# assert daily_materialised_view_dag.schedule_interval == example_dag_cron_table._expression
# assert all(airflow_timetable_import_error_message not in error for error in dag_bag.import_errors.values())
#
#
# def test_daily_materialised_view_change_timetable_from_env_variable_after_reparse(dag_bag: DagBag,
# dag_materialised_view_update_schedule_variable_name: str,
# daily_materialised_views_dag_id: str,
# example_dag_cron_table: CronTriggerTimetable,
# airflow_timetable_import_error_message: str):
# fetcher_dag: DAG = dag_bag.get_dag(dag_id=daily_materialised_views_dag_id)
#
# assert fetcher_dag is not None
# assert fetcher_dag.schedule_interval != example_dag_cron_table._expression
#
# os.environ[dag_materialised_view_update_schedule_variable_name] = example_dag_cron_table._expression
# dag_bag.collect_dags(only_if_updated=False)
# fetcher_dag: DAG = dag_bag.get_dag(dag_id=daily_materialised_views_dag_id)
#
# assert fetcher_dag is not None
# assert fetcher_dag.schedule_interval == example_dag_cron_table._expression
# assert all(airflow_timetable_import_error_message not in error for error in dag_bag.import_errors.values())
#
#
# def test_daily_materialised_view_has_default_timetable_if_no_variable_is_set_after_reparse(dag_bag: DagBag,
# dag_materialised_view_update_schedule_variable_name: str,
# daily_materialised_views_dag_id: str,
# airflow_timetable_import_error_message: str):
# env_var_value = os.getenv(dag_materialised_view_update_schedule_variable_name)
# is_env_var_set: bool = True if env_var_value is not None else False
# if is_env_var_set:
# del os.environ[dag_materialised_view_update_schedule_variable_name]
# airflow_var_value = Variable.get(key=dag_materialised_view_update_schedule_variable_name, default_var=None)
# is_airflow_var_set: bool = True if airflow_var_value is not None else False
# if is_airflow_var_set:
# Variable.delete(key=dag_materialised_view_update_schedule_variable_name)
#
# dag_bag.collect_dags(only_if_updated=False)
# fetcher_dag: DAG = dag_bag.get_dag(dag_id=daily_materialised_views_dag_id)
#
# assert fetcher_dag is not None
# assert fetcher_dag.schedule_interval == DAG_MATERIALIZED_VIEW_UPDATE_DEFAULT_TIMETABLE
# assert all(airflow_timetable_import_error_message not in error for error in dag_bag.import_errors.values())
#
# if is_env_var_set:
# os.environ[dag_materialised_view_update_schedule_variable_name] = env_var_value
# if is_airflow_var_set:
# Variable.set(key=dag_materialised_view_update_schedule_variable_name, value=airflow_var_value)
#
#
# def test_daily_materialised_view_gets_incorrect_timetable_after_reparse(dag_bag: DagBag,
# dag_materialised_view_update_schedule_variable_name: str,
# daily_materialised_views_dag_id: str,
# example_wrong_cron_table: str,
# airflow_timetable_import_error_message: str):
# fetcher_dag: DAG = dag_bag.get_dag(dag_id=daily_materialised_views_dag_id)
#
# assert fetcher_dag is not None
#
# Variable.set(key=dag_materialised_view_update_schedule_variable_name, value=example_wrong_cron_table)
#
# dag_bag.collect_dags(only_if_updated=False)
#
# assert any(airflow_timetable_import_error_message in error for error in dag_bag.import_errors.values())
89 changes: 89 additions & 0 deletions tests/unit/dags/_test_fetch_notices_by_date_schedule.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
# import os
#
# from airflow import DAG
# from airflow.models import DagBag, Variable
# from airflow.timetables.trigger import CronTriggerTimetable
#
# from ted_sws import DAG_FETCH_DEFAULT_TIMETABLE
#
#
# def test_fetcher_change_timetable_from_airflow_variable_after_reparse(dag_bag: DagBag,
# dag_fetch_schedule_variable_name: str,
# fetcher_dag_name: str,
# example_dag_cron_table: CronTriggerTimetable,
# airflow_timetable_import_error_message: str):
# dag_bag.collect_dags(only_if_updated=False)
# fetcher_dag: DAG = dag_bag.get_dag(dag_id=fetcher_dag_name)
#
# assert fetcher_dag is not None
# assert fetcher_dag.schedule_interval != example_dag_cron_table._expression
#
# Variable.set(key=dag_fetch_schedule_variable_name, value=example_dag_cron_table._expression)
# dag_bag.collect_dags(only_if_updated=False)
# fetcher_dag: DAG = dag_bag.get_dag(dag_id=fetcher_dag_name)
#
# assert fetcher_dag is not None
# assert fetcher_dag.schedule_interval == example_dag_cron_table._expression
# assert all(airflow_timetable_import_error_message not in error for error in dag_bag.import_errors.values())
#
#
# def test_fetcher_change_timetable_from_env_variable_after_reparse(dag_bag: DagBag,
# dag_fetch_schedule_variable_name: str,
# fetcher_dag_name: str,
# example_dag_cron_table: CronTriggerTimetable,
# airflow_timetable_import_error_message: str):
# fetcher_dag: DAG = dag_bag.get_dag(dag_id=fetcher_dag_name)
#
# assert fetcher_dag is not None
# assert fetcher_dag.schedule_interval != example_dag_cron_table._expression
#
# os.environ[dag_fetch_schedule_variable_name] = example_dag_cron_table._expression
# dag_bag.collect_dags(only_if_updated=False)
# fetcher_dag: DAG = dag_bag.get_dag(dag_id=fetcher_dag_name)
#
# assert fetcher_dag is not None
# assert fetcher_dag.schedule_interval == example_dag_cron_table._expression
# assert all(airflow_timetable_import_error_message not in error for error in dag_bag.import_errors.values())
#
# del os.environ[dag_fetch_schedule_variable_name]
#
#
# def test_fetcher_has_default_timetable_if_no_variable_is_set_after_reparse(dag_bag: DagBag,
# dag_fetch_schedule_variable_name: str,
# fetcher_dag_name: str,
# airflow_timetable_import_error_message: str):
# env_var_value = os.getenv(dag_fetch_schedule_variable_name)
# is_env_var_set: bool = True if env_var_value is not None else False
# if is_env_var_set:
# del os.environ[dag_fetch_schedule_variable_name]
# airflow_var_value = Variable.get(key=dag_fetch_schedule_variable_name, default_var=None)
# is_airflow_var_set: bool = True if airflow_var_value is not None else False
# if is_airflow_var_set:
# Variable.delete(key=dag_fetch_schedule_variable_name)
#
# dag_bag.collect_dags(only_if_updated=False)
# fetcher_dag: DAG = dag_bag.get_dag(dag_id=fetcher_dag_name)
#
# assert fetcher_dag is not None
# assert fetcher_dag.schedule_interval == DAG_FETCH_DEFAULT_TIMETABLE
# assert all(airflow_timetable_import_error_message not in error for error in dag_bag.import_errors.values())
#
# if is_env_var_set:
# os.environ[dag_fetch_schedule_variable_name] = env_var_value
# if is_airflow_var_set:
# Variable.set(key=dag_fetch_schedule_variable_name, value=airflow_var_value)
#
#
# def test_fetcher_gets_incorrect_timetable_after_reparse(dag_bag: DagBag,
# dag_fetch_schedule_variable_name: str,
# fetcher_dag_name: str,
# example_wrong_cron_table: str,
# airflow_timetable_import_error_message: str):
# fetcher_dag: DAG = dag_bag.get_dag(dag_id=fetcher_dag_name)
#
# assert fetcher_dag is not None
#
# Variable.set(key=dag_fetch_schedule_variable_name, value=example_wrong_cron_table)
# dag_bag.collect_dags(only_if_updated=False)
#
# assert any(airflow_timetable_import_error_message in error for error in dag_bag.import_errors.values())
87 changes: 62 additions & 25 deletions tests/unit/dags/conftest.py
Original file line number Diff line number Diff line change
@@ -1,25 +1,62 @@
# import os
#
# import pytest
#
# from airflow.models import DagBag
# from airflow.utils import db
# import logging

from tests import TESTS_PATH

AIRFLOW_DAG_FOLDER = TESTS_PATH.parent.resolve() / "dags"


# @pytest.fixture(scope="session")
# def dag_bag():
# os.environ["AIRFLOW_HOME"] = str(AIRFLOW_DAG_FOLDER)
# os.environ["AIRFLOW__CORE__LOAD_EXAMPLES"] = "False"
# # Initialising the Airflow DB so that it works properly with the new AIRFLOW_HOME
# logging.disable(logging.CRITICAL)
# db.resetdb()
# db.initdb()
# logging.disable(logging.NOTSET)
# dag_bag = DagBag(dag_folder=AIRFLOW_DAG_FOLDER, include_examples=False,
# read_dags_from_db=False)
# return dag_bag
import pytest
from airflow.timetables.trigger import CronTriggerTimetable

from dags.daily_materialized_views_update import DAILY_MATERIALISED_VIEWS_DAG_NAME
from dags.fetch_notices_by_date import FETCHER_DAG_NAME


# @pytest.fixture
# def dag_bag(dag_materialised_view_update_schedule_variable_name, dag_fetch_schedule_variable_name) -> DagBag:
# Variable.delete(key=dag_materialised_view_update_schedule_variable_name)
# Variable.delete(key=dag_fetch_schedule_variable_name)
# return DagBag(
# dag_folder=AIRFLOW_DAG_FOLDER,
# include_examples=False,
# read_dags_from_db=False,
# collect_dags=True)


@pytest.fixture
def fetcher_dag_name() -> str:
    """Dag id of the fetch-notices-by-date DAG, as declared in dags/fetch_notices_by_date.py."""
    dag_id: str = FETCHER_DAG_NAME
    return dag_id


@pytest.fixture
def daily_materialised_views_dag_id() -> str:
    """Dag id of the daily materialised-views-update DAG, as declared in dags/daily_materialized_views_update.py."""
    dag_id: str = DAILY_MATERIALISED_VIEWS_DAG_NAME
    return dag_id


@pytest.fixture
def example_cron_table() -> str:
    """A valid example cron expression (14:15 on the 1st of every month)."""
    cron_expression = "15 14 1 * *"
    return cron_expression


@pytest.fixture
def example_wrong_cron_table() -> str:
    """A deliberately invalid cron expression, used to provoke timetable import errors."""
    invalid_cron = "wrong_cron"
    return invalid_cron


@pytest.fixture
def example_dag_cron_table(example_cron_table) -> CronTriggerTimetable:
    """An Airflow CronTriggerTimetable built from the example cron expression, in UTC."""
    timetable = CronTriggerTimetable(cron=example_cron_table, timezone="UTC")
    return timetable


@pytest.fixture
def airflow_timetable_import_error_message() -> str:
    """Substring expected in DagBag import errors when a DAG has an invalid cron timetable."""
    error_marker = "FormatException"
    return error_marker


@pytest.fixture
def dag_fetch_schedule_variable_name() -> str:
    """Name of the variable holding the fetch DAG schedule.

    Name agreed in the MM of the meeting with OP from 2024.12.28.
    """
    variable_name = "SCHEDULE_DAG_FETCH"
    return variable_name


@pytest.fixture
def dag_materialised_view_update_schedule_variable_name() -> str:
    """Name of the variable holding the materialised-views-update DAG schedule.

    Name agreed in the MM of the meeting with OP from 2024.12.28.
    """
    variable_name = "SCHEDULE_DAG_MATERIALIZED_VIEW_UPDATE"
    return variable_name
Loading
Loading