Feature/ted 1040 #404

Merged
merged 2 commits on Dec 14, 2022

@@ -8,14 +8,14 @@
from ted_sws.data_manager.services.create_notice_collection_materialised_view import \
create_notice_collection_materialised_view, create_notice_kpi_collection

DAG_NAME = "daily_materialized_view_update"
DAG_NAME = "daily_materialized_views_update"


@dag(default_args=DEFAULT_DAG_ARGUMENTS,
catchup=False,
schedule_interval="0 6 * * *",
tags=['mongodb', 'daily-views-update'])
def daily_materialized_view_update():
def daily_materialized_views_update():
@task
def create_materialised_view():
mongo_client = MongoClient(config.MONGO_DB_AUTH_URL)
@@ -34,4 +34,4 @@ def aggregate_batch_logs():
create_materialised_view() >> create_kpi_collection_for_notices() >> aggregate_batch_logs()


dag = daily_materialized_view_update()
dag = daily_materialized_views_update()
@@ -14,7 +14,7 @@
from ted_sws.event_manager.model.event_message import TechnicalEventMessage, EventMessageMetadata, \
EventMessageProcessType

DAG_NAME = "notice_fetch_by_date_workflow"
DAG_NAME = "fetch_notices_by_date"
BATCH_SIZE = 2000
WILD_CARD_DAG_KEY = "wild_card"
TRIGGER_COMPLETE_WORKFLOW_DAG_KEY = "trigger_complete_workflow"
@@ -29,7 +29,7 @@
catchup=False,
timetable=CronTriggerTimetable('0 1 * * *', timezone='UTC'),
tags=['selector', 'daily-fetch'])
def notice_fetch_by_date_workflow():
def fetch_notices_by_date():
@task
@event_log(TechnicalEventMessage(
message="fetch_notice_from_ted",
@@ -93,4 +93,4 @@ def _branch_selector():
trigger_complete_workflow] >> validate_fetched_notices_step >> finish_step


dag = notice_fetch_by_date_workflow()
dag = fetch_notices_by_date()
@@ -8,12 +8,12 @@

from dags import DEFAULT_DAG_ARGUMENTS
from dags.dags_utils import get_dag_param
from dags.notice_fetch_by_date_workflow import WILD_CARD_DAG_KEY, TRIGGER_COMPLETE_WORKFLOW_DAG_KEY
from dags.fetch_notices_by_date import WILD_CARD_DAG_KEY, TRIGGER_COMPLETE_WORKFLOW_DAG_KEY
from ted_sws.event_manager.adapters.event_log_decorator import event_log
from ted_sws.event_manager.model.event_message import TechnicalEventMessage, EventMessageMetadata, \
EventMessageProcessType

DAG_NAME = "notice_fetch_for_date_range_orchestrator"
DAG_NAME = "fetch_notices_by_date_range"

START_DATE_KEY = "start_date"
END_DATE_KEY = "end_date"
@@ -33,7 +33,7 @@ def generate_wildcards_foreach_day_in_range(start_date: str, end_date: str) -> l


@dag(default_args=DEFAULT_DAG_ARGUMENTS, schedule_interval=None, tags=['master'])
def notice_fetch_for_date_range_orchestrator():
def fetch_notices_by_date_range():
@task
@event_log(TechnicalEventMessage(
message="trigger_fetch_notices_workers_for_date_range",
@@ -59,4 +59,4 @@ def trigger_notice_by_date_for_each_date_in_range():
trigger_notice_by_date_for_each_date_in_range()


dag = notice_fetch_for_date_range_orchestrator()
dag = fetch_notices_by_date_range()
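The renamed orchestrator is still parameterised through the DAG run configuration (the start_date and end_date keys defined above). As a minimal sketch of how it might be triggered from outside Airflow, assuming the Airflow 2.x stable REST API with basic auth enabled and a YYYYMMDD date format, neither of which is confirmed by this diff:

import requests

# Hypothetical trigger of fetch_notices_by_date_range via Airflow's stable REST API.
# The webserver URL, credentials and the YYYYMMDD date format are assumptions.
AIRFLOW_API = "http://localhost:8080/api/v1"

response = requests.post(
    f"{AIRFLOW_API}/dags/fetch_notices_by_date_range/dagRuns",
    auth=("airflow", "airflow"),  # assumed basic-auth credentials
    json={"conf": {"start_date": "20221201", "end_date": "20221214"}},
)
response.raise_for_status()
print(response.json()["dag_run_id"])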
68 changes: 68 additions & 0 deletions dags/fetch_notices_by_query.py
@@ -0,0 +1,68 @@
from airflow.decorators import dag, task
from airflow.operators.dummy import DummyOperator
from airflow.operators.python import BranchPythonOperator
from airflow.utils.trigger_rule import TriggerRule
from dags import DEFAULT_DAG_ARGUMENTS
from dags.dags_utils import get_dag_param, push_dag_downstream, pull_dag_upstream
from dags.operators.DagBatchPipelineOperator import NOTICE_IDS_KEY, TriggerNoticeBatchPipelineOperator
from dags.pipelines.notice_fetcher_pipelines import notice_fetcher_by_query_pipeline
from ted_sws.event_manager.adapters.event_log_decorator import event_log
from ted_sws.event_manager.model.event_message import TechnicalEventMessage, EventMessageMetadata, \
EventMessageProcessType

DAG_NAME = "fetch_notices_by_query"
BATCH_SIZE = 2000
QUERY_DAG_KEY = "query"
TRIGGER_COMPLETE_WORKFLOW_DAG_KEY = "trigger_complete_workflow"
TRIGGER_PARTIAL_WORKFLOW_TASK_ID = "trigger_partial_notice_proc_workflow"
TRIGGER_COMPLETE_WORKFLOW_TASK_ID = "trigger_complete_notice_proc_workflow"
CHECK_IF_TRIGGER_COMPLETE_WORKFLOW_TASK_ID = "check_if_trigger_complete_workflow"
FINISH_FETCH_BY_DATE_TASK_ID = "finish_fetch_by_query"


@dag(default_args=DEFAULT_DAG_ARGUMENTS,
catchup=False,
tags=['fetch'])
def fetch_notices_by_query():
@task
@event_log(TechnicalEventMessage(
message="fetch_by_query_notice_from_ted",
metadata=EventMessageMetadata(
process_type=EventMessageProcessType.DAG, process_name=DAG_NAME
))
)
def fetch_by_query_notice_from_ted():
notice_ids = notice_fetcher_by_query_pipeline(query=get_dag_param(key=QUERY_DAG_KEY, raise_error=True))
if not notice_ids:
raise Exception("No notices have been fetched!")
push_dag_downstream(key=NOTICE_IDS_KEY, value=notice_ids)

trigger_complete_workflow = TriggerNoticeBatchPipelineOperator(task_id=TRIGGER_COMPLETE_WORKFLOW_TASK_ID,
execute_only_one_step=False
)
trigger_normalisation_workflow = TriggerNoticeBatchPipelineOperator(
task_id=TRIGGER_PARTIAL_WORKFLOW_TASK_ID,
batch_size=BATCH_SIZE,
execute_only_one_step=True)

def _branch_selector():
trigger_complete_workflow = get_dag_param(key=TRIGGER_COMPLETE_WORKFLOW_DAG_KEY,
default_value=True)
push_dag_downstream(key=NOTICE_IDS_KEY, value=pull_dag_upstream(key=NOTICE_IDS_KEY))
if trigger_complete_workflow:
return [TRIGGER_COMPLETE_WORKFLOW_TASK_ID]
return [TRIGGER_PARTIAL_WORKFLOW_TASK_ID]

branch_task = BranchPythonOperator(
task_id=CHECK_IF_TRIGGER_COMPLETE_WORKFLOW_TASK_ID,
python_callable=_branch_selector,
)

finish_step = DummyOperator(task_id=FINISH_FETCH_BY_DATE_TASK_ID,
trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS)

fetch_by_query_notice_from_ted() >> branch_task >> [trigger_normalisation_workflow,
trigger_complete_workflow] >> finish_step


dag = fetch_notices_by_query()
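The new fetch_notices_by_query DAG reads its query and the trigger_complete_workflow flag from the DAG run configuration, so a manual trigger needs a conf payload. A hedged sketch along the same lines as the earlier trigger example, using a hypothetical TED expert-search query string:

import requests

# Hypothetical manual trigger of fetch_notices_by_query; the query value is an
# illustrative expert-search expression, not something taken from this PR.
AIRFLOW_API = "http://localhost:8080/api/v1"

conf = {
    "query": "PD=[20221213]",            # assumed query syntax, for illustration only
    "trigger_complete_workflow": False,  # route batches to the partial (one-step) branch
}
response = requests.post(
    f"{AIRFLOW_API}/dags/fetch_notices_by_query/dagRuns",
    auth=("airflow", "airflow"),  # assumed basic-auth credentials
    json={"conf": conf},
)
response.raise_for_status()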
@@ -31,7 +31,7 @@
@dag(default_args=DEFAULT_DAG_ARGUMENTS,
schedule_interval=None,
tags=['fetch', 'mapping-suite', 'github'])
def load_mapping_suite_in_mongodb():
def load_mapping_suite_in_database():
@task
@event_log(is_loggable=False)
def fetch_mapping_suite_package_from_github_into_mongodb(**context_args):
@@ -86,4 +86,4 @@ def _branch_selector():
branch_task >> [trigger_document_proc_pipeline, finish_step]


dag = load_mapping_suite_in_mongodb()
dag = load_mapping_suite_in_database()
@@ -24,7 +24,7 @@
SELECTOR_BRANCH_BEFORE_VALIDATION_TASK_ID = "switch_to_validation"
SELECTOR_BRANCH_BEFORE_PACKAGE_TASK_ID = "switch_to_package"
SELECTOR_BRANCH_BEFORE_PUBLISH_TASK_ID = "switch_to_publish"
DAG_NAME = "notice_process_workflow"
DAG_NAME = "notice_processing_pipeline"

BRANCH_SELECTOR_MAP = {NOTICE_NORMALISATION_PIPELINE_TASK_ID: NOTICE_NORMALISATION_PIPELINE_TASK_ID,
NOTICE_TRANSFORMATION_PIPELINE_TASK_ID: SELECTOR_BRANCH_BEFORE_TRANSFORMATION_TASK_ID,
@@ -49,7 +49,7 @@ def branch_selector(result_branch: str, xcom_forward_keys: List[str] = [NOTICE_I
max_active_runs=256,
max_active_tasks=256,
tags=['worker', 'pipeline'])
def notice_process_workflow():
def notice_processing_pipeline():
"""

"""
@@ -149,4 +149,4 @@ def _stop_processing():
notice_package_step >> selector_branch_before_publish >> notice_publish_step


dag = notice_process_workflow()
dag = notice_processing_pipeline()
16 changes: 16 additions & 0 deletions dags/pipelines/notice_fetcher_pipelines.py
@@ -1,6 +1,7 @@
from datetime import datetime, timedelta
from typing import List


def notice_fetcher_by_date_pipeline(date_wild_card: str = None) -> List[str]:
from pymongo import MongoClient
from ted_sws import config
@@ -21,3 +22,18 @@ def notice_fetcher_by_date_pipeline(date_wild_card: str = None) -> List[str]:
notice_fetched_date=notice_publication_date)

return notice_ids


def notice_fetcher_by_query_pipeline(query: str = None) -> List[str]:
from pymongo import MongoClient
from ted_sws import config
from ted_sws.data_manager.adapters.notice_repository import NoticeRepository
from ted_sws.notice_fetcher.adapters.ted_api import TedAPIAdapter, TedRequestAPI
from ted_sws.notice_fetcher.services.notice_fetcher import NoticeFetcher

ted_api_query = {"q": query}
mongodb_client = MongoClient(config.MONGO_DB_AUTH_URL)
notice_ids = NoticeFetcher(notice_repository=NoticeRepository(mongodb_client=mongodb_client),
ted_api_adapter=TedAPIAdapter(
request_api=TedRequestAPI())).fetch_notices_by_query(query=ted_api_query)
return notice_ids
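The new notice_fetcher_by_query_pipeline can also be exercised directly, which is convenient for debugging outside Airflow. A minimal sketch, assuming MONGO_DB_AUTH_URL points at a reachable MongoDB instance, TED API access is available, and using a hypothetical query string:

# Minimal local run of the new pipeline; requires a reachable MongoDB and TED API access.
from dags.pipelines.notice_fetcher_pipelines import notice_fetcher_by_query_pipeline

notice_ids = notice_fetcher_by_query_pipeline(query="PD=[20221213]")  # hypothetical query
print(f"Fetched {len(notice_ids)} notice ids")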
@@ -8,7 +8,7 @@
from ted_sws.event_manager.model.event_message import TechnicalEventMessage, EventMessageMetadata, \
EventMessageProcessType

DAG_NAME = "selector_raw_notices_process_orchestrator"
DAG_NAME = "reprocess_unnormalised_notices_from_backlog"

TRIGGER_NOTICE_PROCESS_WORKFLOW_TASK_ID = "trigger_notice_process_workflow"
FORM_NUMBER_DAG_PARAM = "form_number"
@@ -20,7 +20,7 @@
@dag(default_args=DEFAULT_DAG_ARGUMENTS,
schedule_interval=None,
tags=['selector', 'raw-notices'])
def selector_raw_notices_process_orchestrator():
def reprocess_unnormalised_notices_from_backlog():
@task
@event_log(TechnicalEventMessage(
message="select_all_raw_notices",
@@ -41,4 +41,4 @@ def select_all_raw_notices():
select_all_raw_notices() >> trigger_notice_process_workflow


dag = selector_raw_notices_process_orchestrator()
dag = reprocess_unnormalised_notices_from_backlog()
@@ -2,15 +2,15 @@

from dags import DEFAULT_DAG_ARGUMENTS
from dags.dags_utils import push_dag_downstream, get_dag_param
from dags.notice_process_workflow import NOTICE_PACKAGE_PIPELINE_TASK_ID
from dags.notice_processing_pipeline import NOTICE_PACKAGE_PIPELINE_TASK_ID
from dags.operators.DagBatchPipelineOperator import NOTICE_IDS_KEY, TriggerNoticeBatchPipelineOperator
from dags.pipelines.notice_selectors_pipelines import notice_ids_selector_by_status
from ted_sws.core.model.notice import NoticeStatus
from ted_sws.event_manager.adapters.event_log_decorator import event_log
from ted_sws.event_manager.model.event_message import TechnicalEventMessage, EventMessageMetadata, \
EventMessageProcessType

DAG_NAME = "selector_re_package_process_orchestrator"
DAG_NAME = "reprocess_unpackaged_notices_from_backlog"

RE_PACKAGE_TARGET_NOTICE_STATES = [NoticeStatus.VALIDATED, NoticeStatus.INELIGIBLE_FOR_PACKAGING,
NoticeStatus.ELIGIBLE_FOR_PACKAGING,
@@ -25,7 +25,7 @@
@dag(default_args=DEFAULT_DAG_ARGUMENTS,
schedule_interval=None,
tags=['selector', 're-package'])
def selector_re_package_process_orchestrator():
def reprocess_unpackaged_notices_from_backlog():
@task
@event_log(TechnicalEventMessage(
message="select_notices_for_re_package",
@@ -50,4 +50,4 @@ def select_notices_for_re_package():
select_notices_for_re_package() >> trigger_notice_process_workflow


dag = selector_re_package_process_orchestrator()
dag = reprocess_unpackaged_notices_from_backlog()
@@ -2,7 +2,7 @@

from dags import DEFAULT_DAG_ARGUMENTS
from dags.dags_utils import push_dag_downstream, get_dag_param
from dags.notice_process_workflow import NOTICE_PUBLISH_PIPELINE_TASK_ID
from dags.notice_processing_pipeline import NOTICE_PUBLISH_PIPELINE_TASK_ID
from dags.operators.DagBatchPipelineOperator import NOTICE_IDS_KEY, TriggerNoticeBatchPipelineOperator, \
EXECUTE_ONLY_ONE_STEP_KEY
from dags.pipelines.notice_selectors_pipelines import notice_ids_selector_by_status
@@ -11,7 +11,7 @@
from ted_sws.event_manager.model.event_message import TechnicalEventMessage, EventMessageMetadata, \
EventMessageProcessType

DAG_NAME = "selector_re_publish_process_orchestrator"
DAG_NAME = "reprocess_unpublished_notices_from_backlog"

RE_PUBLISH_TARGET_NOTICE_STATES = [NoticeStatus.ELIGIBLE_FOR_PUBLISHING, NoticeStatus.INELIGIBLE_FOR_PUBLISHING,
NoticeStatus.PACKAGED
@@ -26,7 +26,7 @@
@dag(default_args=DEFAULT_DAG_ARGUMENTS,
schedule_interval=None,
tags=['selector', 're-publish'])
def selector_re_publish_process_orchestrator():
def reprocess_unpublished_notices_from_backlog():
@task
@event_log(TechnicalEventMessage(
message="select_notices_for_re_publish",
@@ -51,4 +51,4 @@ def select_notices_for_re_publish():
select_notices_for_re_publish() >> trigger_notice_process_workflow


etl_dag = selector_re_publish_process_orchestrator()
etl_dag = reprocess_unpublished_notices_from_backlog()
@@ -2,7 +2,7 @@

from dags import DEFAULT_DAG_ARGUMENTS
from dags.dags_utils import push_dag_downstream, get_dag_param
from dags.notice_process_workflow import NOTICE_TRANSFORMATION_PIPELINE_TASK_ID
from dags.notice_processing_pipeline import NOTICE_TRANSFORMATION_PIPELINE_TASK_ID
from dags.operators.DagBatchPipelineOperator import NOTICE_IDS_KEY, TriggerNoticeBatchPipelineOperator, \
EXECUTE_ONLY_ONE_STEP_KEY
from dags.pipelines.notice_selectors_pipelines import notice_ids_selector_by_status
@@ -11,7 +11,7 @@
from ted_sws.event_manager.model.event_message import TechnicalEventMessage, EventMessageMetadata, \
EventMessageProcessType

DAG_NAME = "selector_re_transform_process_orchestrator"
DAG_NAME = "reprocess_untransformed_notices_from_backlog"

RE_TRANSFORM_TARGET_NOTICE_STATES = [NoticeStatus.NORMALISED_METADATA, NoticeStatus.INELIGIBLE_FOR_TRANSFORMATION,
NoticeStatus.ELIGIBLE_FOR_TRANSFORMATION, NoticeStatus.PREPROCESSED_FOR_TRANSFORMATION,
@@ -27,7 +27,7 @@
@dag(default_args=DEFAULT_DAG_ARGUMENTS,
schedule_interval=None,
tags=['selector', 're-transform'])
def selector_re_transform_process_orchestrator():
def reprocess_untransformed_notices_from_backlog():
@task
@event_log(TechnicalEventMessage(
message="select_notices_for_re_transform",
@@ -52,4 +52,4 @@ def select_notices_for_re_transform():
select_notices_for_re_transform() >> trigger_notice_process_workflow


dag = selector_re_transform_process_orchestrator()
dag = reprocess_untransformed_notices_from_backlog()
@@ -2,7 +2,7 @@

from dags import DEFAULT_DAG_ARGUMENTS
from dags.dags_utils import push_dag_downstream, get_dag_param
from dags.notice_process_workflow import NOTICE_TRANSFORMATION_PIPELINE_TASK_ID
from dags.notice_processing_pipeline import NOTICE_TRANSFORMATION_PIPELINE_TASK_ID
from dags.operators.DagBatchPipelineOperator import NOTICE_IDS_KEY, TriggerNoticeBatchPipelineOperator, \
EXECUTE_ONLY_ONE_STEP_KEY
from dags.pipelines.notice_selectors_pipelines import notice_ids_selector_by_status
@@ -11,7 +11,7 @@
from ted_sws.event_manager.model.event_message import TechnicalEventMessage, EventMessageMetadata, \
EventMessageProcessType

DAG_NAME = "selector_re_validate_process_orchestrator"
DAG_NAME = "reprocess_unvalidated_notices_from_backlog"

RE_VALIDATE_TARGET_NOTICE_STATES = [NoticeStatus.DISTILLED]
TRIGGER_NOTICE_PROCESS_WORKFLOW_TASK_ID = "trigger_notice_process_workflow"
@@ -24,7 +24,7 @@
@dag(default_args=DEFAULT_DAG_ARGUMENTS,
schedule_interval=None,
tags=['selector', 're-validate'])
def selector_re_validate_process_orchestrator():
def reprocess_unvalidated_notices_from_backlog():
@task
@event_log(TechnicalEventMessage(
message="select_notices_for_re_validate",
@@ -49,4 +49,4 @@ def select_notices_for_re_validate():
select_notices_for_re_validate() >> trigger_notice_process_workflow


dag = selector_re_validate_process_orchestrator()
dag = reprocess_unvalidated_notices_from_backlog()
2 changes: 1 addition & 1 deletion tests/e2e/dags/_test_load_mapping_suite_in_mongodb.py
@@ -1,4 +1,4 @@
from dags.load_mapping_suite_in_mongodb import \
from dags.load_mapping_suite_in_database import \
FETCH_MAPPING_SUITE_PACKAGE_FROM_GITHUB_INTO_MONGODB, MAPPING_SUITE_PACKAGE_NAME_DAG_PARAM_KEY
from ted_sws import config
from ted_sws.data_manager.adapters.mapping_suite_repository import MappingSuiteRepositoryMongoDB