
Feature/TED-25 #51

Merged (62 commits, Apr 6, 2022)
Commits (62)
3f7fa50
add dags
CaptainOfHacks Mar 9, 2022
7b0a41c
Update notice_fetcher_master.py
CaptainOfHacks Mar 10, 2022
8d9259f
Update pipeline_scheduler.py
CaptainOfHacks Mar 10, 2022
eda9e0f
Merge remote-tracking branch 'origin/feature/TED-90' into feature/TED-25
CaptainOfHacks Mar 10, 2022
a1ea395
WIP
CaptainOfHacks Mar 10, 2022
9ef0db6
WIP
CaptainOfHacks Mar 11, 2022
e0427eb
Merge branch 'main' into feature/TED-25
CaptainOfHacks Mar 11, 2022
8772533
Update .gitignore
CaptainOfHacks Mar 23, 2022
e6a1398
Merge branch 'main' into feature/TED-25
CaptainOfHacks Mar 23, 2022
aaea2cc
Merge branch 'main' into feature/TED-25
CaptainOfHacks Mar 28, 2022
156df3e
Merge branch 'main' into feature/TED-25
CaptainOfHacks Mar 28, 2022
7632fc0
Update notice_transformer.py
CaptainOfHacks Mar 28, 2022
b82f25c
Merge branch 'main' into feature/TED-25
CaptainOfHacks Mar 29, 2022
fe0bd32
wip
CaptainOfHacks Mar 29, 2022
6040db0
WIP
CaptainOfHacks Mar 30, 2022
bb7e5cb
Update single_notice_processing_pipeline_with_decorators.py
CaptainOfHacks Mar 30, 2022
486f7cb
Update single_notice_processing_pipeline_with_decorators.py
CaptainOfHacks Mar 31, 2022
72f19b6
Update docker-compose.yaml
CaptainOfHacks Mar 31, 2022
d723cd4
Merge branch 'main' into feature/TED-25
CaptainOfHacks Mar 31, 2022
e7a172b
Create selector_daily_fetch_orchestrator.py
CaptainOfHacks Apr 4, 2022
54c78fe
Create selector_repackage_process_orchestrator.py
CaptainOfHacks Apr 4, 2022
2ffc1ad
Create selector_republish_process_orchestrator.py
CaptainOfHacks Apr 4, 2022
574ce68
Create selector_retransform_process_orchestrator.py
CaptainOfHacks Apr 4, 2022
48f9f62
Create worker_single_notice_process_orchestrator.py
CaptainOfHacks Apr 4, 2022
7900e8c
Delete single_notice_processing_pipeline_with_decorators.py
CaptainOfHacks Apr 4, 2022
530ed12
Update selector_daily_fetch_orchestrator.py
CaptainOfHacks Apr 4, 2022
80eaeb2
Update selector_retransform_process_orchestrator.py
CaptainOfHacks Apr 4, 2022
186cd95
Merge branch 'main' into feature/TED-25
CaptainOfHacks Apr 4, 2022
9718898
Update selector_retransform_process_orchestrator.py
CaptainOfHacks Apr 4, 2022
0effcc9
Update worker_single_notice_process_orchestrator.py
CaptainOfHacks Apr 4, 2022
63647cd
fixing magic with airflow
Dragos0000 Apr 4, 2022
6a19ba7
Delete document_proc_pipeline.py
CaptainOfHacks Apr 4, 2022
be9068a
Delete notice_fetcher_master.py
CaptainOfHacks Apr 4, 2022
97bfab0
Delete pipeline_scheduler.py
CaptainOfHacks Apr 4, 2022
49f7f03
Update selector_repackage_process_orchestrator.py
CaptainOfHacks Apr 4, 2022
03e841a
Update selector_republish_process_orchestrator.py
CaptainOfHacks Apr 4, 2022
a2a45dd
Update selector_retransform_process_orchestrator.py
CaptainOfHacks Apr 4, 2022
3b7d006
Delete single_notice_processing_pipeline.py
CaptainOfHacks Apr 4, 2022
c589011
Update worker_single_notice_process_orchestrator.py
CaptainOfHacks Apr 4, 2022
13528a1
Update __init__.py
CaptainOfHacks Apr 4, 2022
06b1276
Update selector_daily_fetch_orchestrator.py
CaptainOfHacks Apr 4, 2022
4ca6750
Update selector_repackage_process_orchestrator.py
CaptainOfHacks Apr 4, 2022
d005bce
Update selector_republish_process_orchestrator.py
CaptainOfHacks Apr 4, 2022
e42b18a
Update selector_retransform_process_orchestrator.py
CaptainOfHacks Apr 4, 2022
dc4b5ef
Update worker_single_notice_process_orchestrator.py
CaptainOfHacks Apr 4, 2022
f5a0831
WIP
CaptainOfHacks Apr 5, 2022
217c069
Update selector_daily_fetch_orchestrator.py
CaptainOfHacks Apr 5, 2022
52e07fa
Update worker_single_notice_process_orchestrator.py
CaptainOfHacks Apr 5, 2022
f371eca
Create __init__.py
CaptainOfHacks Apr 5, 2022
2229f71
Create conftest.py
CaptainOfHacks Apr 5, 2022
766868c
Create test_selector_daily_fetch_orchestrator.py
CaptainOfHacks Apr 5, 2022
73c026d
Update conftest.py
CaptainOfHacks Apr 5, 2022
4f06ba7
WIP
CaptainOfHacks Apr 6, 2022
7c81b02
Update metadata.json
CaptainOfHacks Apr 6, 2022
e355e3f
Merge branch 'main' into feature/TED-25
CaptainOfHacks Apr 6, 2022
63fdbfc
fix airflow home
CaptainOfHacks Apr 6, 2022
a9fa86a
fix eligibility tests
CaptainOfHacks Apr 6, 2022
5775c5f
upgrade airflow version to 2.2.5 [crossfingers]
CaptainOfHacks Apr 6, 2022
18d7c8e
Create requirements.txt
CaptainOfHacks Apr 6, 2022
51062b2
WIP
CaptainOfHacks Apr 6, 2022
8b66124
Update worker_single_notice_process_orchestrator.py
CaptainOfHacks Apr 6, 2022
eedb0e7
resolve comments
CaptainOfHacks Apr 6, 2022
16 changes: 16 additions & 0 deletions dags/__init__.py
@@ -0,0 +1,16 @@
from datetime import datetime, timedelta

DEFAULT_DAG_ARGUMENTS = {
"owner": "airflow",
"depends_on_past": False,
"start_date": datetime.now(),
"email": ["info@meaningfy.ws"],
"email_on_failure": False,
"email_on_retry": False,
"retries": 0,
"retry_delay": timedelta(minutes=3600),
"schedule_interval": "@once",
"max_active_runs": 128,
"concurrency": 128,
"execution_timeout": timedelta(hours=24),
}
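
Not part of the diff: a minimal sketch, assuming an Airflow 2.2 environment like the one this PR targets, of how DEFAULT_DAG_ARGUMENTS is consumed. The orchestrator DAGs below pass it to the @dag decorator as default_args, so the task-level keys (retries, retry_delay, execution_timeout, email settings) apply to every task; the example DAG name here is illustrative only.

from airflow.decorators import dag, task

from dags import DEFAULT_DAG_ARGUMENTS


@dag(default_args=DEFAULT_DAG_ARGUMENTS, tags=['example'])
def default_args_usage_example():
    @task
    def ping():
        # retries, retry_delay, execution_timeout and the email settings
        # defined in DEFAULT_DAG_ARGUMENTS apply to this task via default_args
        print("ping")

    ping()


dag = default_args_usage_example()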
36 changes: 36 additions & 0 deletions dags/dags_utils.py
@@ -0,0 +1,36 @@
from airflow.operators.python import get_current_context

TASK_INSTANCE = "ti"


def select_first_non_none(data):
"""

:param data:
:return:
"""
return next((item for item in data if item is not None), None)


def pull_dag_upstream(key, task_ids=None):
"""

:param key:
:param task_ids:
:return:
"""
context = get_current_context()
return select_first_non_none(
context[TASK_INSTANCE].xcom_pull(key=str(key),
task_ids=task_ids if task_ids else context['task'].upstream_task_ids))


def push_dag_downstream(key, value):
"""

:param key:
:param value:
:return:
"""
context = get_current_context()
return context[TASK_INSTANCE].xcom_push(key=str(key), value=value)
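
The two XCom helpers above only work inside a running task, because get_current_context() is only available there. A minimal usage sketch, not part of this PR, with an illustrative DAG name and key:

from airflow.decorators import dag, task

from dags import DEFAULT_DAG_ARGUMENTS
from dags.dags_utils import pull_dag_upstream, push_dag_downstream

EXAMPLE_KEY = "example_key"  # illustrative XCom key


@dag(default_args=DEFAULT_DAG_ARGUMENTS, tags=['example'])
def dags_utils_usage_example():
    @task
    def produce():
        # store a value under EXAMPLE_KEY for downstream tasks
        push_dag_downstream(key=EXAMPLE_KEY, value="some value")

    @task
    def consume():
        # pull the first non-None value pushed by an upstream task
        print(pull_dag_upstream(key=EXAMPLE_KEY))

    produce() >> consume()


dag = dags_utils_usage_example()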
49 changes: 49 additions & 0 deletions dags/selector_daily_fetch_orchestrator.py
@@ -0,0 +1,49 @@
from dags import DEFAULT_DAG_ARGUMENTS
from airflow.decorators import dag, task
from airflow.operators.python import get_current_context
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from pymongo import MongoClient

from ted_sws import config
from ted_sws.data_manager.adapters.notice_repository import NoticeRepository
from ted_sws.notice_fetcher.adapters.ted_api import TedAPIAdapter, TedRequestAPI
from ted_sws.notice_fetcher.services.notice_fetcher import NoticeFetcher
from ted_sws.core.model.notice import NoticeStatus

@dag(default_args=DEFAULT_DAG_ARGUMENTS, tags=['selector', 'daily-fetch'])
def selector_daily_fetch_orchestrator():
@task
def fetch_notice_from_ted():
context = get_current_context()
dag_conf = context["dag_run"].conf
key = 'fetch_time_filter'
if key in dag_conf.keys():
print(dag_conf[key])
mongodb_client = MongoClient(config.MONGO_DB_AUTH_URL)
NoticeFetcher(notice_repository=NoticeRepository(mongodb_client=mongodb_client),
ted_api_adapter=TedAPIAdapter(request_api=TedRequestAPI())).fetch_notices_by_date_wild_card(
wildcard_date=dag_conf[key]) #"20220203*"
else:
print(f"The key={key} is not present in context")



@task
def trigger_document_proc_pipeline():
context = get_current_context()
mongodb_client = MongoClient(config.MONGO_DB_AUTH_URL)
notice_repository = NoticeRepository(mongodb_client=mongodb_client)
notices = notice_repository.get_notice_by_status(notice_status=NoticeStatus.RAW)
for notice in notices:
TriggerDagRunOperator(
task_id=f'trigger_worker_dag_{notice.ted_id}',
trigger_dag_id="worker_single_notice_process_orchestrator",
conf={"notice_id": notice.ted_id,
"notice_status": str(notice.status)
}
).execute(context=context)

fetch_notice_from_ted() >> trigger_document_proc_pipeline()


dag = selector_daily_fetch_orchestrator()
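
The commit list above also adds conftest.py and test_selector_daily_fetch_orchestrator.py, which are not shown in this diff view. A common shape for such a test is a DagBag smoke test; the sketch below is an assumption about that shape, not the PR's actual test code, and assumes the test is run from the repository root so that the dags folder is found:

from airflow.models import DagBag


def test_selector_daily_fetch_orchestrator_loads():
    # parse the dags/ folder and check that the DAG imports without errors
    dag_bag = DagBag(dag_folder="dags", include_examples=False)
    assert not dag_bag.import_errors
    dag = dag_bag.get_dag("selector_daily_fetch_orchestrator")
    assert dag is not None
    # the DAG defined above has exactly two tasks: fetch, then trigger
    assert len(dag.tasks) == 2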
44 changes: 44 additions & 0 deletions dags/selector_repackage_process_orchestrator.py
@@ -0,0 +1,44 @@
from dags import DEFAULT_DAG_ARGUMENTS
from airflow.decorators import dag, task
from airflow.operators.python import get_current_context
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from pymongo import MongoClient
from ted_sws.core.model.notice import NoticeStatus

from ted_sws import config
from ted_sws.data_manager.adapters.notice_repository import NoticeRepository

RE_PACKAGE_TARGET_NOTICE_STATES = [NoticeStatus.ELIGIBLE_FOR_PACKAGING, NoticeStatus.INELIGIBLE_FOR_PUBLISHING]


@dag(default_args=DEFAULT_DAG_ARGUMENTS, tags=['selector', 're-package'])
def selector_re_package_process_orchestrator():
@task
def select_notices_for_re_package_and_reset_status():
mongodb_client = MongoClient(config.MONGO_DB_AUTH_URL)
notice_repository = NoticeRepository(mongodb_client=mongodb_client)
for target_notice_state in RE_PACKAGE_TARGET_NOTICE_STATES:
notices = notice_repository.get_notice_by_status(notice_status=target_notice_state)
for notice in notices:
notice.update_status_to(new_status=NoticeStatus.ELIGIBLE_FOR_PACKAGING)
notice_repository.update(notice=notice)

@task
def trigger_worker_for_package_branch():
context = get_current_context()
mongodb_client = MongoClient(config.MONGO_DB_AUTH_URL)
notice_repository = NoticeRepository(mongodb_client=mongodb_client)
notices = notice_repository.get_notice_by_status(notice_status=NoticeStatus.ELIGIBLE_FOR_PACKAGING)
for notice in notices:
TriggerDagRunOperator(
task_id=f'trigger_worker_dag_{notice.ted_id}',
trigger_dag_id="worker_single_notice_process_orchestrator",
conf={"notice_id": notice.ted_id,
"notice_status": str(notice.status)
}
).execute(context=context)

select_notices_for_re_package_and_reset_status() >> trigger_worker_for_package_branch()


etl_dag = selector_re_package_process_orchestrator()
44 changes: 44 additions & 0 deletions dags/selector_republish_process_orchestrator.py
@@ -0,0 +1,44 @@
from dags import DEFAULT_DAG_ARGUMENTS
from airflow.decorators import dag, task
from airflow.operators.python import get_current_context
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from pymongo import MongoClient
from ted_sws.core.model.notice import NoticeStatus

from ted_sws import config
from ted_sws.data_manager.adapters.notice_repository import NoticeRepository

RE_PUBLISH_TARGET_NOTICE_STATES = [NoticeStatus.PUBLICLY_UNAVAILABLE, NoticeStatus.ELIGIBLE_FOR_PUBLISHING]


@dag(default_args=DEFAULT_DAG_ARGUMENTS, tags=['selector', 're-publish'])
def selector_re_publish_process_orchestrator():
@task
def select_notices_for_re_publish_and_reset_status():
mongodb_client = MongoClient(config.MONGO_DB_AUTH_URL)
notice_repository = NoticeRepository(mongodb_client=mongodb_client)
for target_notice_state in RE_PUBLISH_TARGET_NOTICE_STATES:
notices = notice_repository.get_notice_by_status(notice_status=target_notice_state)
for notice in notices:
notice.update_status_to(new_status=NoticeStatus.ELIGIBLE_FOR_PUBLISHING)
notice_repository.update(notice=notice)

@task
def trigger_worker_for_publish_branch():
context = get_current_context()
mongodb_client = MongoClient(config.MONGO_DB_AUTH_URL)
notice_repository = NoticeRepository(mongodb_client=mongodb_client)
notices = notice_repository.get_notice_by_status(notice_status=NoticeStatus.ELIGIBLE_FOR_PUBLISHING)
for notice in notices:
TriggerDagRunOperator(
task_id=f'trigger_worker_dag_{notice.ted_id}',
trigger_dag_id="worker_single_notice_process_orchestrator",
conf={"notice_id": notice.ted_id,
"notice_status": str(notice.status)
}
).execute(context=context)

select_notices_for_re_publish_and_reset_status() >> trigger_worker_for_publish_branch()


etl_dag = selector_re_publish_process_orchestrator()
50 changes: 50 additions & 0 deletions dags/selector_retransform_process_orchestrator.py
@@ -0,0 +1,50 @@
from dags import DEFAULT_DAG_ARGUMENTS
from airflow.decorators import dag, task
from airflow.operators.python import get_current_context
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from pymongo import MongoClient
from ted_sws.core.model.notice import NoticeStatus

from ted_sws import config
from ted_sws.data_manager.adapters.notice_repository import NoticeRepository

RE_TRANSFORM_TARGET_NOTICE_STATES = [NoticeStatus.ELIGIBLE_FOR_TRANSFORMATION, NoticeStatus.NORMALISED_METADATA,
NoticeStatus.ELIGIBLE_FOR_TRANSFORMATION,
NoticeStatus.PREPROCESSED_FOR_TRANSFORMATION,
NoticeStatus.INELIGIBLE_FOR_TRANSFORMATION, NoticeStatus.TRANSFORMED,
NoticeStatus.DISTILLED,
NoticeStatus.VALIDATED, NoticeStatus.INELIGIBLE_FOR_PACKAGING
]


@dag(default_args=DEFAULT_DAG_ARGUMENTS, tags=['selector', 're-transform'])
def selector_re_transform_process_orchestrator():
@task
def select_notices_for_re_transform_and_reset_status():
mongodb_client = MongoClient(config.MONGO_DB_AUTH_URL)
notice_repository = NoticeRepository(mongodb_client=mongodb_client)
for target_notice_state in RE_TRANSFORM_TARGET_NOTICE_STATES:
notices = notice_repository.get_notice_by_status(notice_status=target_notice_state)
for notice in notices:
notice.update_status_to(new_status=NoticeStatus.ELIGIBLE_FOR_TRANSFORMATION)
notice_repository.update(notice=notice)

@task
def trigger_worker_for_transform_branch():
context = get_current_context()
mongodb_client = MongoClient(config.MONGO_DB_AUTH_URL)
notice_repository = NoticeRepository(mongodb_client=mongodb_client)
notices = notice_repository.get_notice_by_status(notice_status=NoticeStatus.ELIGIBLE_FOR_TRANSFORMATION)
for notice in notices:
TriggerDagRunOperator(
task_id=f'trigger_worker_dag_{notice.ted_id}',
trigger_dag_id="worker_single_notice_process_orchestrator",
conf={"notice_id": notice.ted_id,
"notice_status": str(notice.status)
}
).execute(context=context)

select_notices_for_re_transform_and_reset_status() >> trigger_worker_for_transform_branch()


etl_dag = selector_re_transform_process_orchestrator()
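
All three selector DAGs above, like the daily fetch orchestrator, hand work to worker_single_notice_process_orchestrator by passing notice_id and notice_status through the TriggerDagRunOperator conf. The worker DAG's diff is not shown in this view; the sketch below only illustrates how such a conf payload can be read on the worker side, it is not the PR's worker implementation.

from airflow.operators.python import get_current_context

NOTICE_ID_KEY = "notice_id"
NOTICE_STATUS_KEY = "notice_status"


def read_selector_conf():
    # inside a running worker task, read the conf passed by the triggering selector DAG run
    context = get_current_context()
    dag_conf = context["dag_run"].conf or {}
    return dag_conf.get(NOTICE_ID_KEY), dag_conf.get(NOTICE_STATUS_KEY)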