Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add tests for dags #53

Merged
merged 2 commits into from
Apr 6, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 13 additions & 1 deletion tests/unit/dags/test_selector_daily_fetch_orchestrator.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,18 @@
FETCH_NOTICE_FROM_TED_TASK_ID = "fetch_notice_from_ted"
TRIGGER_DOCUMENT_PROC_PIPELINE_TASK_ID = "trigger_document_proc_pipeline"


def test_selector_daily_fetch_orchestrator(dag_bag):
assert dag_bag.import_errors == {}
dag = dag_bag.get_dag(dag_id="selector_daily_fetch_orchestrator")
assert dag is not None
print(dag.tasks)
assert dag.has_task(FETCH_NOTICE_FROM_TED_TASK_ID)
assert dag.has_task(TRIGGER_DOCUMENT_PROC_PIPELINE_TASK_ID)
fetch_notice_from_ted_task = dag.get_task(FETCH_NOTICE_FROM_TED_TASK_ID)
trigger_document_proc_pipeline_task = dag.get_task(TRIGGER_DOCUMENT_PROC_PIPELINE_TASK_ID)
assert fetch_notice_from_ted_task
assert trigger_document_proc_pipeline_task
assert TRIGGER_DOCUMENT_PROC_PIPELINE_TASK_ID in set(
map(lambda task: task.task_id, fetch_notice_from_ted_task.downstream_list))
assert FETCH_NOTICE_FROM_TED_TASK_ID in set(
map(lambda task: task.task_id, trigger_document_proc_pipeline_task.upstream_list))
19 changes: 19 additions & 0 deletions tests/unit/dags/test_selector_repackage_process_orchestrator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
SELECT_NOTICES_FOR_RE_PACKAGE_AND_RESET_STATUS_TASK_ID = "select_notices_for_re_package_and_reset_status"
TRIGGER_WORKER_FOR_PACKAGE_BRANCH_TASK_ID = "trigger_worker_for_package_branch"


def test_selector_repackage_process_orchestrator(dag_bag):
assert dag_bag.import_errors == {}
dag = dag_bag.get_dag(dag_id="selector_re_package_process_orchestrator")
assert dag is not None
assert dag.has_task(SELECT_NOTICES_FOR_RE_PACKAGE_AND_RESET_STATUS_TASK_ID)
assert dag.has_task(TRIGGER_WORKER_FOR_PACKAGE_BRANCH_TASK_ID)
select_notices_for_re_package_and_reset_status_task = dag.get_task(
SELECT_NOTICES_FOR_RE_PACKAGE_AND_RESET_STATUS_TASK_ID)
trigger_worker_for_package_branch_task = dag.get_task(TRIGGER_WORKER_FOR_PACKAGE_BRANCH_TASK_ID)
assert select_notices_for_re_package_and_reset_status_task
assert trigger_worker_for_package_branch_task
assert TRIGGER_WORKER_FOR_PACKAGE_BRANCH_TASK_ID in set(
map(lambda task: task.task_id, select_notices_for_re_package_and_reset_status_task.downstream_list))
assert SELECT_NOTICES_FOR_RE_PACKAGE_AND_RESET_STATUS_TASK_ID in set(
map(lambda task: task.task_id, trigger_worker_for_package_branch_task.upstream_list))
19 changes: 19 additions & 0 deletions tests/unit/dags/test_selector_republish_process_orchestrator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
SELECT_NOTICES_FOR_RE_PUBLISH_AND_RESET_STATUS_TASK_ID = "select_notices_for_re_publish_and_reset_status"
TRIGGER_WORKER_FOR_PUBLISH_BRANCH_TASK_ID = "trigger_worker_for_publish_branch"


def test_selector_republish_process_orchestrator(dag_bag):
assert dag_bag.import_errors == {}
dag = dag_bag.get_dag(dag_id="selector_re_publish_process_orchestrator")
assert dag is not None
assert dag.has_task(SELECT_NOTICES_FOR_RE_PUBLISH_AND_RESET_STATUS_TASK_ID)
assert dag.has_task(TRIGGER_WORKER_FOR_PUBLISH_BRANCH_TASK_ID)
select_notices_for_re_publish_and_reset_status_task = dag.get_task(
SELECT_NOTICES_FOR_RE_PUBLISH_AND_RESET_STATUS_TASK_ID)
trigger_worker_for_publish_branch_task = dag.get_task(TRIGGER_WORKER_FOR_PUBLISH_BRANCH_TASK_ID)
assert select_notices_for_re_publish_and_reset_status_task
assert trigger_worker_for_publish_branch_task
assert TRIGGER_WORKER_FOR_PUBLISH_BRANCH_TASK_ID in set(
map(lambda task: task.task_id, select_notices_for_re_publish_and_reset_status_task.downstream_list))
assert SELECT_NOTICES_FOR_RE_PUBLISH_AND_RESET_STATUS_TASK_ID in set(
map(lambda task: task.task_id, trigger_worker_for_publish_branch_task.upstream_list))
19 changes: 19 additions & 0 deletions tests/unit/dags/test_selector_retransform_process_orchestrator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
SELECT_NOTICES_FOR_RE_TRANSFORM_AND_RESET_STATUS_TASK_ID = "select_notices_for_re_transform_and_reset_status"
TRIGGER_WORKER_FOR_TRANSFORM_BRANCH_TASK_ID = "trigger_worker_for_transform_branch"


def test_selector_re_transform_process_orchestrator(dag_bag):
assert dag_bag.import_errors == {}
dag = dag_bag.get_dag(dag_id="selector_re_transform_process_orchestrator")
assert dag is not None
assert dag.has_task(SELECT_NOTICES_FOR_RE_TRANSFORM_AND_RESET_STATUS_TASK_ID)
assert dag.has_task(TRIGGER_WORKER_FOR_TRANSFORM_BRANCH_TASK_ID)
select_notices_for_re_transform_and_reset_status_task = dag.get_task(
SELECT_NOTICES_FOR_RE_TRANSFORM_AND_RESET_STATUS_TASK_ID)
trigger_worker_for_transform_branch_task = dag.get_task(TRIGGER_WORKER_FOR_TRANSFORM_BRANCH_TASK_ID)
assert select_notices_for_re_transform_and_reset_status_task
assert trigger_worker_for_transform_branch_task
assert TRIGGER_WORKER_FOR_TRANSFORM_BRANCH_TASK_ID in set(
map(lambda task: task.task_id, select_notices_for_re_transform_and_reset_status_task.downstream_list))
assert SELECT_NOTICES_FOR_RE_TRANSFORM_AND_RESET_STATUS_TASK_ID in set(
map(lambda task: task.task_id, trigger_worker_for_transform_branch_task.upstream_list))
86 changes: 86 additions & 0 deletions tests/unit/dags/test_work_single_notice_process_orchestrator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
NORMALISE_NOTICE_METADATA_TASK_ID = "normalise_notice_metadata"
CHECK_ELIGIBILITY_FOR_TRANSFORMATION_TASK_ID = "check_eligibility_for_transformation"
PREPROCESS_XML_MANIFESTATION_TASK_ID = "preprocess_xml_manifestation"
TRANSFORM_NOTICE_TASK_ID = "transform_notice"
RESOLVE_ENTITIES_IN_THE_RDF_MANIFESTATION_TASK_ID = "resolve_entities_in_the_rdf_manifestation"
VALIDATE_TRANSFORMED_RDF_MANIFESTATION_TASK_ID = "validate_transformed_rdf_manifestation"
CHECK_ELIGIBILITY_FOR_PACKING_BY_VALIDATION_REPORT_TASK_ID = "check_eligibility_for_packing_by_validation_report"
GENERATE_METS_PACKAGE_TASK_ID = "generate_mets_package"
CHECK_PACKAGE_INTEGRITY_BY_PACKAGE_STRUCTURE_TASK_ID = "check_package_integrity_by_package_structure"
PUBLISH_NOTICE_IN_CELLAR_TASK_ID = "publish_notice_in_cellar"
CHECK_NOTICE_PUBLIC_AVAILABILITY_IN_CELLAR_TASK_ID = "check_notice_public_availability_in_cellar"
NOTICE_SUCCESSFULLY_PROCESSED_TASK_ID = "notice_successfully_processed"
FAIL_ON_STATE_TASK_ID = "fail_on_state"
CHECK_NOTICE_STATE_BEFORE_TRANSFORM_TASK_ID = "check_notice_state_before_transform"
CHECK_NOTICE_STATE_BEFORE_GENERATE_METS_PACKAGE_TASK_ID = "check_notice_state_before_generate_mets_package"
CHECK_NOTICE_STATE_BEFORE_PUBLISH_NOTICE_IN_CELLAR_TASK_ID = "check_notice_state_before_publish_notice_in_cellar"
CHECK_NOTICE_STATE_BEFORE_NOTICE_SUCCESSFULLY_PROCESSED_TASK_ID = "check_notice_state_before_notice_successfully_processed"
START_PROCESSING_NOTICE_TASK_ID = "start_processing_notice"

TRANSFORM_BRANCH_TASK_IDS = [
CHECK_ELIGIBILITY_FOR_TRANSFORMATION_TASK_ID, CHECK_NOTICE_STATE_BEFORE_TRANSFORM_TASK_ID,
PREPROCESS_XML_MANIFESTATION_TASK_ID, TRANSFORM_NOTICE_TASK_ID,
RESOLVE_ENTITIES_IN_THE_RDF_MANIFESTATION_TASK_ID,
VALIDATE_TRANSFORMED_RDF_MANIFESTATION_TASK_ID,
CHECK_ELIGIBILITY_FOR_PACKING_BY_VALIDATION_REPORT_TASK_ID,
CHECK_NOTICE_STATE_BEFORE_GENERATE_METS_PACKAGE_TASK_ID
]

PACKAGE_BRANCH_TASK_IDS = [
GENERATE_METS_PACKAGE_TASK_ID, CHECK_PACKAGE_INTEGRITY_BY_PACKAGE_STRUCTURE_TASK_ID,
CHECK_NOTICE_STATE_BEFORE_PUBLISH_NOTICE_IN_CELLAR_TASK_ID
]

PUBLISH_BRANCH_TASK_IDS = [
PUBLISH_NOTICE_IN_CELLAR_TASK_ID, CHECK_NOTICE_PUBLIC_AVAILABILITY_IN_CELLAR_TASK_ID,
CHECK_NOTICE_STATE_BEFORE_NOTICE_SUCCESSFULLY_PROCESSED_TASK_ID,
NOTICE_SUCCESSFULLY_PROCESSED_TASK_ID
]

FULL_BRANCH_TASK_IDS = [START_PROCESSING_NOTICE_TASK_ID,
NORMALISE_NOTICE_METADATA_TASK_ID] + TRANSFORM_BRANCH_TASK_IDS + PACKAGE_BRANCH_TASK_IDS + PUBLISH_BRANCH_TASK_IDS


def _test_dag_branch(dag, task_ids: list):
for task_id in task_ids:
assert dag.has_task(task_id)
for index in range(0, len(task_ids) - 1):
task_a = dag.get_task(task_ids[index])
task_b = dag.get_task(task_ids[index + 1])
assert task_b.task_id in set(map(lambda task: task.task_id, task_a.downstream_list))
assert task_a.task_id in set(
map(lambda task: task.task_id, task_b.upstream_list))


def test_transform_branch_worker_single_notice_process_orchestrator(dag_bag):
assert dag_bag.import_errors == {}
dag = dag_bag.get_dag(dag_id="worker_single_notice_process_orchestrator")
assert dag is not None
_test_dag_branch(dag, TRANSFORM_BRANCH_TASK_IDS)


def test_package_branch_worker_single_notice_process_orchestrator(dag_bag):
assert dag_bag.import_errors == {}
dag = dag_bag.get_dag(dag_id="worker_single_notice_process_orchestrator")
assert dag is not None
_test_dag_branch(dag, PACKAGE_BRANCH_TASK_IDS)


def test_publish_branch_worker_single_notice_process_orchestrator(dag_bag):
assert dag_bag.import_errors == {}
dag = dag_bag.get_dag(dag_id="worker_single_notice_process_orchestrator")
assert dag is not None
_test_dag_branch(dag, PUBLISH_BRANCH_TASK_IDS)


def test_full_branch_worker_single_notice_process_orchestrator(dag_bag):
assert dag_bag.import_errors == {}
dag = dag_bag.get_dag(dag_id="worker_single_notice_process_orchestrator")
assert dag is not None
_test_dag_branch(dag, FULL_BRANCH_TASK_IDS)


def test_worker_single_notice_process_orchestrator(dag_bag):
assert dag_bag.import_errors == {}
dag = dag_bag.get_dag(dag_id="worker_single_notice_process_orchestrator")
assert dag is not None