From e01b55c2708594b04cccd3d802dc6d92abc2b5a9 Mon Sep 17 00:00:00 2001 From: CaptainOfHacks Date: Wed, 14 Dec 2022 00:49:05 +0200 Subject: [PATCH 1/2] rename DAGs --- ..._view_update.py => daily_materialized_views_update.py} | 6 +++--- ...fetch_by_date_workflow.py => fetch_notices_by_date.py} | 6 +++--- ...nge_orchestrator.py => fetch_notices_by_date_range.py} | 8 ++++---- ...te_in_mongodb.py => load_mapping_suite_in_database.py} | 4 ++-- ..._process_workflow.py => notice_processing_pipeline.py} | 6 +++--- ....py => reprocess_unnormalised_notices_from_backlog.py} | 6 +++--- ...or.py => reprocess_unpackaged_notices_from_backlog.py} | 8 ++++---- ...r.py => reprocess_unpublished_notices_from_backlog.py} | 8 ++++---- ...py => reprocess_untransformed_notices_from_backlog.py} | 8 ++++---- ...r.py => reprocess_unvalidated_notices_from_backlog.py} | 8 ++++---- tests/e2e/dags/_test_load_mapping_suite_in_mongodb.py | 2 +- 11 files changed, 35 insertions(+), 35 deletions(-) rename dags/{daily_materialized_view_update.py => daily_materialized_views_update.py} (90%) rename dags/{notice_fetch_by_date_workflow.py => fetch_notices_by_date.py} (97%) rename dags/{notice_fetch_for_date_range_orchestrator.py => fetch_notices_by_date_range.py} (90%) rename dags/{load_mapping_suite_in_mongodb.py => load_mapping_suite_in_database.py} (98%) rename dags/{notice_process_workflow.py => notice_processing_pipeline.py} (98%) rename dags/{selector_raw_notices_process_orchestrator.py => reprocess_unnormalised_notices_from_backlog.py} (91%) rename dags/{selector_repackage_process_orchestrator.py => reprocess_unpackaged_notices_from_backlog.py} (90%) rename dags/{selector_republish_process_orchestrator.py => reprocess_unpublished_notices_from_backlog.py} (90%) rename dags/{selector_retransform_process_orchestrator.py => reprocess_untransformed_notices_from_backlog.py} (90%) rename dags/{selector_revalidate_process_orchestrator.py => reprocess_unvalidated_notices_from_backlog.py} (90%) diff --git a/dags/daily_materialized_view_update.py b/dags/daily_materialized_views_update.py similarity index 90% rename from dags/daily_materialized_view_update.py rename to dags/daily_materialized_views_update.py index 16a45513e..2618f0f0c 100644 --- a/dags/daily_materialized_view_update.py +++ b/dags/daily_materialized_views_update.py @@ -8,14 +8,14 @@ from ted_sws.data_manager.services.create_notice_collection_materialised_view import \ create_notice_collection_materialised_view, create_notice_kpi_collection -DAG_NAME = "daily_materialized_view_update" +DAG_NAME = "daily_materialized_views_update" @dag(default_args=DEFAULT_DAG_ARGUMENTS, catchup=False, schedule_interval="0 6 * * *", tags=['mongodb', 'daily-views-update']) -def daily_materialized_view_update(): +def daily_materialized_views_update(): @task def create_materialised_view(): mongo_client = MongoClient(config.MONGO_DB_AUTH_URL) @@ -34,4 +34,4 @@ def aggregate_batch_logs(): create_materialised_view() >> create_kpi_collection_for_notices() >> aggregate_batch_logs() -dag = daily_materialized_view_update() +dag = daily_materialized_views_update() diff --git a/dags/notice_fetch_by_date_workflow.py b/dags/fetch_notices_by_date.py similarity index 97% rename from dags/notice_fetch_by_date_workflow.py rename to dags/fetch_notices_by_date.py index 5aacad087..a54eec04c 100644 --- a/dags/notice_fetch_by_date_workflow.py +++ b/dags/fetch_notices_by_date.py @@ -14,7 +14,7 @@ from ted_sws.event_manager.model.event_message import TechnicalEventMessage, EventMessageMetadata, \ EventMessageProcessType -DAG_NAME = "notice_fetch_by_date_workflow" +DAG_NAME = "fetch_notices_by_date" BATCH_SIZE = 2000 WILD_CARD_DAG_KEY = "wild_card" TRIGGER_COMPLETE_WORKFLOW_DAG_KEY = "trigger_complete_workflow" @@ -29,7 +29,7 @@ catchup=False, timetable=CronTriggerTimetable('0 1 * * *', timezone='UTC'), tags=['selector', 'daily-fetch']) -def notice_fetch_by_date_workflow(): +def fetch_notices_by_date(): @task @event_log(TechnicalEventMessage( message="fetch_notice_from_ted", @@ -93,4 +93,4 @@ def _branch_selector(): trigger_complete_workflow] >> validate_fetched_notices_step >> finish_step -dag = notice_fetch_by_date_workflow() +dag = fetch_notices_by_date() diff --git a/dags/notice_fetch_for_date_range_orchestrator.py b/dags/fetch_notices_by_date_range.py similarity index 90% rename from dags/notice_fetch_for_date_range_orchestrator.py rename to dags/fetch_notices_by_date_range.py index 0aed761c6..65f3059ba 100644 --- a/dags/notice_fetch_for_date_range_orchestrator.py +++ b/dags/fetch_notices_by_date_range.py @@ -8,12 +8,12 @@ from dags import DEFAULT_DAG_ARGUMENTS from dags.dags_utils import get_dag_param -from dags.notice_fetch_by_date_workflow import WILD_CARD_DAG_KEY, TRIGGER_COMPLETE_WORKFLOW_DAG_KEY +from dags.fetch_notices_by_date import WILD_CARD_DAG_KEY, TRIGGER_COMPLETE_WORKFLOW_DAG_KEY from ted_sws.event_manager.adapters.event_log_decorator import event_log from ted_sws.event_manager.model.event_message import TechnicalEventMessage, EventMessageMetadata, \ EventMessageProcessType -DAG_NAME = "notice_fetch_for_date_range_orchestrator" +DAG_NAME = "fetch_notices_by_date_range" START_DATE_KEY = "start_date" END_DATE_KEY = "end_date" @@ -33,7 +33,7 @@ def generate_wildcards_foreach_day_in_range(start_date: str, end_date: str) -> l @dag(default_args=DEFAULT_DAG_ARGUMENTS, schedule_interval=None, tags=['master']) -def notice_fetch_for_date_range_orchestrator(): +def fetch_notices_by_date_range(): @task @event_log(TechnicalEventMessage( message="trigger_fetch_notices_workers_for_date_range", @@ -59,4 +59,4 @@ def trigger_notice_by_date_for_each_date_in_range(): trigger_notice_by_date_for_each_date_in_range() -dag = notice_fetch_for_date_range_orchestrator() +dag = fetch_notices_by_date_range() diff --git a/dags/load_mapping_suite_in_mongodb.py b/dags/load_mapping_suite_in_database.py similarity index 98% rename from dags/load_mapping_suite_in_mongodb.py rename to dags/load_mapping_suite_in_database.py index 4944e9a15..944aa38c0 100644 --- a/dags/load_mapping_suite_in_mongodb.py +++ b/dags/load_mapping_suite_in_database.py @@ -31,7 +31,7 @@ @dag(default_args=DEFAULT_DAG_ARGUMENTS, schedule_interval=None, tags=['fetch', 'mapping-suite', 'github']) -def load_mapping_suite_in_mongodb(): +def load_mapping_suite_in_database(): @task @event_log(is_loggable=False) def fetch_mapping_suite_package_from_github_into_mongodb(**context_args): @@ -86,4 +86,4 @@ def _branch_selector(): branch_task >> [trigger_document_proc_pipeline, finish_step] -dag = load_mapping_suite_in_mongodb() +dag = load_mapping_suite_in_database() diff --git a/dags/notice_process_workflow.py b/dags/notice_processing_pipeline.py similarity index 98% rename from dags/notice_process_workflow.py rename to dags/notice_processing_pipeline.py index 3791bc5b2..ceb66d0e2 100644 --- a/dags/notice_process_workflow.py +++ b/dags/notice_processing_pipeline.py @@ -24,7 +24,7 @@ SELECTOR_BRANCH_BEFORE_VALIDATION_TASK_ID = "switch_to_validation" SELECTOR_BRANCH_BEFORE_PACKAGE_TASK_ID = "switch_to_package" SELECTOR_BRANCH_BEFORE_PUBLISH_TASK_ID = "switch_to_publish" -DAG_NAME = "notice_process_workflow" +DAG_NAME = "notice_processing_pipeline" BRANCH_SELECTOR_MAP = {NOTICE_NORMALISATION_PIPELINE_TASK_ID: NOTICE_NORMALISATION_PIPELINE_TASK_ID, NOTICE_TRANSFORMATION_PIPELINE_TASK_ID: SELECTOR_BRANCH_BEFORE_TRANSFORMATION_TASK_ID, @@ -49,7 +49,7 @@ def branch_selector(result_branch: str, xcom_forward_keys: List[str] = [NOTICE_I max_active_runs=256, max_active_tasks=256, tags=['worker', 'pipeline']) -def notice_process_workflow(): +def notice_processing_pipeline(): """ """ @@ -149,4 +149,4 @@ def _stop_processing(): notice_package_step >> selector_branch_before_publish >> notice_publish_step -dag = notice_process_workflow() +dag = notice_processing_pipeline() diff --git a/dags/selector_raw_notices_process_orchestrator.py b/dags/reprocess_unnormalised_notices_from_backlog.py similarity index 91% rename from dags/selector_raw_notices_process_orchestrator.py rename to dags/reprocess_unnormalised_notices_from_backlog.py index 064877089..51005e1ee 100644 --- a/dags/selector_raw_notices_process_orchestrator.py +++ b/dags/reprocess_unnormalised_notices_from_backlog.py @@ -8,7 +8,7 @@ from ted_sws.event_manager.model.event_message import TechnicalEventMessage, EventMessageMetadata, \ EventMessageProcessType -DAG_NAME = "selector_raw_notices_process_orchestrator" +DAG_NAME = "reprocess_unnormalised_notices_from_backlog" TRIGGER_NOTICE_PROCESS_WORKFLOW_TASK_ID = "trigger_notice_process_workflow" FORM_NUMBER_DAG_PARAM = "form_number" @@ -20,7 +20,7 @@ @dag(default_args=DEFAULT_DAG_ARGUMENTS, schedule_interval=None, tags=['selector', 'raw-notices']) -def selector_raw_notices_process_orchestrator(): +def reprocess_unnormalised_notices_from_backlog(): @task @event_log(TechnicalEventMessage( message="select_all_raw_notices", @@ -41,4 +41,4 @@ def select_all_raw_notices(): select_all_raw_notices() >> trigger_notice_process_workflow -dag = selector_raw_notices_process_orchestrator() +dag = reprocess_unnormalised_notices_from_backlog() diff --git a/dags/selector_repackage_process_orchestrator.py b/dags/reprocess_unpackaged_notices_from_backlog.py similarity index 90% rename from dags/selector_repackage_process_orchestrator.py rename to dags/reprocess_unpackaged_notices_from_backlog.py index 5a476d547..75d37f4c6 100644 --- a/dags/selector_repackage_process_orchestrator.py +++ b/dags/reprocess_unpackaged_notices_from_backlog.py @@ -2,7 +2,7 @@ from dags import DEFAULT_DAG_ARGUMENTS from dags.dags_utils import push_dag_downstream, get_dag_param -from dags.notice_process_workflow import NOTICE_PACKAGE_PIPELINE_TASK_ID +from dags.notice_processing_pipeline import NOTICE_PACKAGE_PIPELINE_TASK_ID from dags.operators.DagBatchPipelineOperator import NOTICE_IDS_KEY, TriggerNoticeBatchPipelineOperator from dags.pipelines.notice_selectors_pipelines import notice_ids_selector_by_status from ted_sws.core.model.notice import NoticeStatus @@ -10,7 +10,7 @@ from ted_sws.event_manager.model.event_message import TechnicalEventMessage, EventMessageMetadata, \ EventMessageProcessType -DAG_NAME = "selector_re_package_process_orchestrator" +DAG_NAME = "reprocess_unpackaged_notices_from_backlog" RE_PACKAGE_TARGET_NOTICE_STATES = [NoticeStatus.VALIDATED, NoticeStatus.INELIGIBLE_FOR_PACKAGING, NoticeStatus.ELIGIBLE_FOR_PACKAGING, @@ -25,7 +25,7 @@ @dag(default_args=DEFAULT_DAG_ARGUMENTS, schedule_interval=None, tags=['selector', 're-package']) -def selector_re_package_process_orchestrator(): +def reprocess_unpackaged_notices_from_backlog(): @task @event_log(TechnicalEventMessage( message="select_notices_for_re_package", @@ -50,4 +50,4 @@ def select_notices_for_re_package(): select_notices_for_re_package() >> trigger_notice_process_workflow -dag = selector_re_package_process_orchestrator() +dag = reprocess_unpackaged_notices_from_backlog() diff --git a/dags/selector_republish_process_orchestrator.py b/dags/reprocess_unpublished_notices_from_backlog.py similarity index 90% rename from dags/selector_republish_process_orchestrator.py rename to dags/reprocess_unpublished_notices_from_backlog.py index b91b231b0..4f4439df0 100644 --- a/dags/selector_republish_process_orchestrator.py +++ b/dags/reprocess_unpublished_notices_from_backlog.py @@ -2,7 +2,7 @@ from dags import DEFAULT_DAG_ARGUMENTS from dags.dags_utils import push_dag_downstream, get_dag_param -from dags.notice_process_workflow import NOTICE_PUBLISH_PIPELINE_TASK_ID +from dags.notice_processing_pipeline import NOTICE_PUBLISH_PIPELINE_TASK_ID from dags.operators.DagBatchPipelineOperator import NOTICE_IDS_KEY, TriggerNoticeBatchPipelineOperator, \ EXECUTE_ONLY_ONE_STEP_KEY from dags.pipelines.notice_selectors_pipelines import notice_ids_selector_by_status @@ -11,7 +11,7 @@ from ted_sws.event_manager.model.event_message import TechnicalEventMessage, EventMessageMetadata, \ EventMessageProcessType -DAG_NAME = "selector_re_publish_process_orchestrator" +DAG_NAME = "reprocess_unpublished_notices_from_backlog" RE_PUBLISH_TARGET_NOTICE_STATES = [NoticeStatus.ELIGIBLE_FOR_PUBLISHING, NoticeStatus.INELIGIBLE_FOR_PUBLISHING, NoticeStatus.PACKAGED @@ -26,7 +26,7 @@ @dag(default_args=DEFAULT_DAG_ARGUMENTS, schedule_interval=None, tags=['selector', 're-publish']) -def selector_re_publish_process_orchestrator(): +def reprocess_unpublished_notices_from_backlog(): @task @event_log(TechnicalEventMessage( message="select_notices_for_re_publish", @@ -51,4 +51,4 @@ def select_notices_for_re_publish(): select_notices_for_re_publish() >> trigger_notice_process_workflow -etl_dag = selector_re_publish_process_orchestrator() +etl_dag = reprocess_unpublished_notices_from_backlog() diff --git a/dags/selector_retransform_process_orchestrator.py b/dags/reprocess_untransformed_notices_from_backlog.py similarity index 90% rename from dags/selector_retransform_process_orchestrator.py rename to dags/reprocess_untransformed_notices_from_backlog.py index e5ca395b4..b93033e61 100644 --- a/dags/selector_retransform_process_orchestrator.py +++ b/dags/reprocess_untransformed_notices_from_backlog.py @@ -2,7 +2,7 @@ from dags import DEFAULT_DAG_ARGUMENTS from dags.dags_utils import push_dag_downstream, get_dag_param -from dags.notice_process_workflow import NOTICE_TRANSFORMATION_PIPELINE_TASK_ID +from dags.notice_processing_pipeline import NOTICE_TRANSFORMATION_PIPELINE_TASK_ID from dags.operators.DagBatchPipelineOperator import NOTICE_IDS_KEY, TriggerNoticeBatchPipelineOperator, \ EXECUTE_ONLY_ONE_STEP_KEY from dags.pipelines.notice_selectors_pipelines import notice_ids_selector_by_status @@ -11,7 +11,7 @@ from ted_sws.event_manager.model.event_message import TechnicalEventMessage, EventMessageMetadata, \ EventMessageProcessType -DAG_NAME = "selector_re_transform_process_orchestrator" +DAG_NAME = "reprocess_untransformed_notices_from_backlog" RE_TRANSFORM_TARGET_NOTICE_STATES = [NoticeStatus.NORMALISED_METADATA, NoticeStatus.INELIGIBLE_FOR_TRANSFORMATION, NoticeStatus.ELIGIBLE_FOR_TRANSFORMATION, NoticeStatus.PREPROCESSED_FOR_TRANSFORMATION, @@ -27,7 +27,7 @@ @dag(default_args=DEFAULT_DAG_ARGUMENTS, schedule_interval=None, tags=['selector', 're-transform']) -def selector_re_transform_process_orchestrator(): +def reprocess_untransformed_notices_from_backlog(): @task @event_log(TechnicalEventMessage( message="select_notices_for_re_transform", @@ -52,4 +52,4 @@ def select_notices_for_re_transform(): select_notices_for_re_transform() >> trigger_notice_process_workflow -dag = selector_re_transform_process_orchestrator() +dag = reprocess_untransformed_notices_from_backlog() diff --git a/dags/selector_revalidate_process_orchestrator.py b/dags/reprocess_unvalidated_notices_from_backlog.py similarity index 90% rename from dags/selector_revalidate_process_orchestrator.py rename to dags/reprocess_unvalidated_notices_from_backlog.py index 44b3aae71..9f53f3f58 100644 --- a/dags/selector_revalidate_process_orchestrator.py +++ b/dags/reprocess_unvalidated_notices_from_backlog.py @@ -2,7 +2,7 @@ from dags import DEFAULT_DAG_ARGUMENTS from dags.dags_utils import push_dag_downstream, get_dag_param -from dags.notice_process_workflow import NOTICE_TRANSFORMATION_PIPELINE_TASK_ID +from dags.notice_processing_pipeline import NOTICE_TRANSFORMATION_PIPELINE_TASK_ID from dags.operators.DagBatchPipelineOperator import NOTICE_IDS_KEY, TriggerNoticeBatchPipelineOperator, \ EXECUTE_ONLY_ONE_STEP_KEY from dags.pipelines.notice_selectors_pipelines import notice_ids_selector_by_status @@ -11,7 +11,7 @@ from ted_sws.event_manager.model.event_message import TechnicalEventMessage, EventMessageMetadata, \ EventMessageProcessType -DAG_NAME = "selector_re_validate_process_orchestrator" +DAG_NAME = "reprocess_unvalidated_notices_from_backlog" RE_VALIDATE_TARGET_NOTICE_STATES = [NoticeStatus.DISTILLED] TRIGGER_NOTICE_PROCESS_WORKFLOW_TASK_ID = "trigger_notice_process_workflow" @@ -24,7 +24,7 @@ @dag(default_args=DEFAULT_DAG_ARGUMENTS, schedule_interval=None, tags=['selector', 're-validate']) -def selector_re_validate_process_orchestrator(): +def reprocess_unvalidated_notices_from_backlog(): @task @event_log(TechnicalEventMessage( message="select_notices_for_re_validate", @@ -49,4 +49,4 @@ def select_notices_for_re_validate(): select_notices_for_re_validate() >> trigger_notice_process_workflow -dag = selector_re_validate_process_orchestrator() +dag = reprocess_unvalidated_notices_from_backlog() diff --git a/tests/e2e/dags/_test_load_mapping_suite_in_mongodb.py b/tests/e2e/dags/_test_load_mapping_suite_in_mongodb.py index 7e26e65df..bec6902ba 100644 --- a/tests/e2e/dags/_test_load_mapping_suite_in_mongodb.py +++ b/tests/e2e/dags/_test_load_mapping_suite_in_mongodb.py @@ -1,4 +1,4 @@ -from dags.load_mapping_suite_in_mongodb import \ +from dags.load_mapping_suite_in_database import \ FETCH_MAPPING_SUITE_PACKAGE_FROM_GITHUB_INTO_MONGODB, MAPPING_SUITE_PACKAGE_NAME_DAG_PARAM_KEY from ted_sws import config from ted_sws.data_manager.adapters.mapping_suite_repository import MappingSuiteRepositoryMongoDB From 27a6897a6725f5ae47381eea12d4d7c7f704a010 Mon Sep 17 00:00:00 2001 From: CaptainOfHacks Date: Wed, 14 Dec 2022 09:07:51 +0200 Subject: [PATCH 2/2] add fetch_notices_by_query DAG --- dags/fetch_notices_by_query.py | 68 ++++++++++++++++++++++ dags/pipelines/notice_fetcher_pipelines.py | 16 +++++ 2 files changed, 84 insertions(+) create mode 100644 dags/fetch_notices_by_query.py diff --git a/dags/fetch_notices_by_query.py b/dags/fetch_notices_by_query.py new file mode 100644 index 000000000..baad1bccc --- /dev/null +++ b/dags/fetch_notices_by_query.py @@ -0,0 +1,68 @@ +from airflow.decorators import dag, task +from airflow.operators.dummy import DummyOperator +from airflow.operators.python import BranchPythonOperator +from airflow.utils.trigger_rule import TriggerRule +from dags import DEFAULT_DAG_ARGUMENTS +from dags.dags_utils import get_dag_param, push_dag_downstream, pull_dag_upstream +from dags.operators.DagBatchPipelineOperator import NOTICE_IDS_KEY, TriggerNoticeBatchPipelineOperator +from dags.pipelines.notice_fetcher_pipelines import notice_fetcher_by_query_pipeline +from ted_sws.event_manager.adapters.event_log_decorator import event_log +from ted_sws.event_manager.model.event_message import TechnicalEventMessage, EventMessageMetadata, \ + EventMessageProcessType + +DAG_NAME = "fetch_notices_by_query" +BATCH_SIZE = 2000 +QUERY_DAG_KEY = "query" +TRIGGER_COMPLETE_WORKFLOW_DAG_KEY = "trigger_complete_workflow" +TRIGGER_PARTIAL_WORKFLOW_TASK_ID = "trigger_partial_notice_proc_workflow" +TRIGGER_COMPLETE_WORKFLOW_TASK_ID = "trigger_complete_notice_proc_workflow" +CHECK_IF_TRIGGER_COMPLETE_WORKFLOW_TASK_ID = "check_if_trigger_complete_workflow" +FINISH_FETCH_BY_DATE_TASK_ID = "finish_fetch_by_query" + + +@dag(default_args=DEFAULT_DAG_ARGUMENTS, + catchup=False, + tags=['fetch']) +def fetch_notices_by_query(): + @task + @event_log(TechnicalEventMessage( + message="fetch_by_query_notice_from_ted", + metadata=EventMessageMetadata( + process_type=EventMessageProcessType.DAG, process_name=DAG_NAME + )) + ) + def fetch_by_query_notice_from_ted(): + notice_ids = notice_fetcher_by_query_pipeline(query=get_dag_param(key=QUERY_DAG_KEY, raise_error=True)) + if not notice_ids: + raise Exception("No notices has been fetched!") + push_dag_downstream(key=NOTICE_IDS_KEY, value=notice_ids) + + trigger_complete_workflow = TriggerNoticeBatchPipelineOperator(task_id=TRIGGER_COMPLETE_WORKFLOW_TASK_ID, + execute_only_one_step=False + ) + trigger_normalisation_workflow = TriggerNoticeBatchPipelineOperator( + task_id=TRIGGER_PARTIAL_WORKFLOW_TASK_ID, + batch_size=BATCH_SIZE, + execute_only_one_step=True) + + def _branch_selector(): + trigger_complete_workflow = get_dag_param(key=TRIGGER_COMPLETE_WORKFLOW_DAG_KEY, + default_value=True) + push_dag_downstream(key=NOTICE_IDS_KEY, value=pull_dag_upstream(key=NOTICE_IDS_KEY)) + if trigger_complete_workflow: + return [TRIGGER_COMPLETE_WORKFLOW_TASK_ID] + return [TRIGGER_PARTIAL_WORKFLOW_TASK_ID] + + branch_task = BranchPythonOperator( + task_id=CHECK_IF_TRIGGER_COMPLETE_WORKFLOW_TASK_ID, + python_callable=_branch_selector, + ) + + finish_step = DummyOperator(task_id=FINISH_FETCH_BY_DATE_TASK_ID, + trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS) + + fetch_by_query_notice_from_ted() >> branch_task >> [trigger_normalisation_workflow, + trigger_complete_workflow] >> finish_step + + +dag = fetch_notices_by_query() diff --git a/dags/pipelines/notice_fetcher_pipelines.py b/dags/pipelines/notice_fetcher_pipelines.py index 2cac0698f..d0a8722c1 100644 --- a/dags/pipelines/notice_fetcher_pipelines.py +++ b/dags/pipelines/notice_fetcher_pipelines.py @@ -1,6 +1,7 @@ from datetime import datetime, timedelta from typing import List + def notice_fetcher_by_date_pipeline(date_wild_card: str = None) -> List[str]: from pymongo import MongoClient from ted_sws import config @@ -21,3 +22,18 @@ def notice_fetcher_by_date_pipeline(date_wild_card: str = None) -> List[str]: notice_fetched_date=notice_publication_date) return notice_ids + + +def notice_fetcher_by_query_pipeline(query: str = None) -> List[str]: + from pymongo import MongoClient + from ted_sws import config + from ted_sws.data_manager.adapters.notice_repository import NoticeRepository + from ted_sws.notice_fetcher.adapters.ted_api import TedAPIAdapter, TedRequestAPI + from ted_sws.notice_fetcher.services.notice_fetcher import NoticeFetcher + + ted_api_query = {"q": query} + mongodb_client = MongoClient(config.MONGO_DB_AUTH_URL) + notice_ids = NoticeFetcher(notice_repository=NoticeRepository(mongodb_client=mongodb_client), + ted_api_adapter=TedAPIAdapter( + request_api=TedRequestAPI())).fetch_notices_by_query(query=ted_api_query) + return notice_ids