Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/ted 295 #79

Merged
merged 10 commits into from
Apr 22, 2022
43 changes: 36 additions & 7 deletions dags/worker_single_notice_process_orchestrator.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,17 @@
from dags.dags_utils import pull_dag_upstream, push_dag_downstream
from ted_sws import config
from ted_sws.core.model.notice import NoticeStatus
from ted_sws.data_manager.adapters.mapping_suite_repository import MappingSuiteRepositoryMongoDB
from ted_sws.data_manager.adapters.notice_repository import NoticeRepository
from ted_sws.metadata_normaliser.services.metadata_normalizer import normalise_notice_by_id

from airflow.decorators import dag
from airflow.operators.python import get_current_context, BranchPythonOperator, PythonOperator

from dags import DEFAULT_DAG_ARGUMENTS
from ted_sws.notice_eligibility.services.notice_eligibility import notice_eligibility_checker_by_id
from ted_sws.notice_transformer.adapters.rml_mapper import RMLMapper
from ted_sws.notice_transformer.services.notice_transformer import transform_notice_by_id

NOTICE_ID = "notice_id"
MAPPING_SUITE_ID = "mapping_suite_id"
Expand All @@ -37,22 +41,39 @@ def _normalise_notice_metadata():

def _check_eligibility_for_transformation():
    """
    Run the eligibility check for the notice received from the upstream task.

    The notice id is read with ``pull_dag_upstream``, the check is delegated to
    ``notice_eligibility_checker_by_id`` using MongoDB-backed notice and mapping suite
    repositories, and the notice id together with the selected mapping suite id are
    published with ``push_dag_downstream``.
    """
    notice_id = pull_dag_upstream(NOTICE_ID)
    mongodb_client = MongoClient(config.MONGO_DB_AUTH_URL)
    notice_repository = NoticeRepository(mongodb_client=mongodb_client)
    mapping_suite_repository = MappingSuiteRepositoryMongoDB(mongodb_client=mongodb_client)
    # The id is passed through unchanged: it is the lookup key in the notice repository.
    # NOTE(review): the result is unpacked as a (notice_id, mapping_suite_id) pair;
    # confirm the checker never returns None for a non-eligible notice.
    notice_id, mapping_suite_id = notice_eligibility_checker_by_id(
        notice_id=notice_id,
        notice_repository=notice_repository,
        mapping_suite_repository=mapping_suite_repository,
    )
    push_dag_downstream(NOTICE_ID, notice_id)
    push_dag_downstream(MAPPING_SUITE_ID, mapping_suite_id)

def _preprocess_xml_manifestation():
    """
    Mark the notice as preprocessed for transformation.

    Reads the notice id and mapping suite id with ``pull_dag_upstream``, loads the notice
    from the MongoDB-backed repository, moves it to
    ``NoticeStatus.PREPROCESSED_FOR_TRANSFORMATION``, stores it back and republishes both
    ids with ``push_dag_downstream``.

    :raises ValueError: if no notice is stored under the received id
    """
    notice_id = pull_dag_upstream(NOTICE_ID)
    mapping_suite_id = pull_dag_upstream(MAPPING_SUITE_ID)
    mongodb_client = MongoClient(config.MONGO_DB_AUTH_URL)
    notice_repository = NoticeRepository(mongodb_client=mongodb_client)
    notice = notice_repository.get(reference=notice_id)
    if notice is None:
        # The repository returns None for unknown ids; fail with an explicit message.
        raise ValueError(f'Notice, with {notice_id} id, was not found')
    notice.update_status_to(new_status=NoticeStatus.PREPROCESSED_FOR_TRANSFORMATION)
    notice_repository.update(notice=notice)
    push_dag_downstream(NOTICE_ID, notice_id)
    push_dag_downstream(MAPPING_SUITE_ID, mapping_suite_id)

def _transform_notice():
    """
    Transform the notice received from the upstream task.

    Reads the notice id and mapping suite id with ``pull_dag_upstream``, delegates the
    transformation (and the persistence of its result) to ``transform_notice_by_id`` with
    MongoDB-backed repositories and an ``RMLMapper`` built from ``config.RML_MAPPER_PATH``,
    then republishes both ids with ``push_dag_downstream``.
    """
    notice_id = pull_dag_upstream(NOTICE_ID)
    mapping_suite_id = pull_dag_upstream(MAPPING_SUITE_ID)
    mongodb_client = MongoClient(config.MONGO_DB_AUTH_URL)
    notice_repository = NoticeRepository(mongodb_client=mongodb_client)
    mapping_suite_repository = MappingSuiteRepositoryMongoDB(mongodb_client=mongodb_client)
    rml_mapper = RMLMapper(rml_mapper_path=config.RML_MAPPER_PATH)
    # The id is passed through unchanged: it is the lookup key in the notice repository.
    transform_notice_by_id(
        notice_id=notice_id,
        mapping_suite_id=mapping_suite_id,
        notice_repository=notice_repository,
        mapping_suite_repository=mapping_suite_repository,
        rml_mapper=rml_mapper,
    )
    push_dag_downstream(NOTICE_ID, notice_id)
    push_dag_downstream(MAPPING_SUITE_ID, mapping_suite_id)

def _resolve_entities_in_the_rdf_manifestation():
notice_id = pull_dag_upstream(NOTICE_ID)
Expand Down Expand Up @@ -90,9 +111,17 @@ def _fail_on_state():

def _check_notice_state_before_transform():
    """
    Choose the next task based on the stored status of the notice.

    Reads the notice id and mapping suite id with ``pull_dag_upstream``, republishes both
    with ``push_dag_downstream`` and loads the notice from the MongoDB-backed repository.

    :return: ``"preprocess_xml_manifestation"`` when the stored notice status is
        ``NoticeStatus.ELIGIBLE_FOR_TRANSFORMATION``, otherwise ``"fail_on_state"``
        (also when the notice cannot be found)
    """
    notice_id = pull_dag_upstream(NOTICE_ID)
    mapping_suite_id = pull_dag_upstream(MAPPING_SUITE_ID)
    mongodb_client = MongoClient(config.MONGO_DB_AUTH_URL)
    notice_repository = NoticeRepository(mongodb_client=mongodb_client)
    notice = notice_repository.get(reference=notice_id)
    push_dag_downstream(NOTICE_ID, notice_id)
    push_dag_downstream(MAPPING_SUITE_ID, mapping_suite_id)
    if notice is None:
        # The repository returns None for unknown ids; an unknown notice cannot be eligible.
        return "fail_on_state"
    # Diagnostic output of the values taking part in the branch decision.
    print(notice.status)
    print(NoticeStatus.ELIGIBLE_FOR_TRANSFORMATION)
    print(notice.status == NoticeStatus.ELIGIBLE_FOR_TRANSFORMATION)
    # Decide on the status stored with the notice, not on a hard-coded value.
    if notice.status == NoticeStatus.ELIGIBLE_FOR_TRANSFORMATION:
        print("Go to preprocess_xml_manifestation")
        return "preprocess_xml_manifestation"
    return "fail_on_state"
Expand Down
6 changes: 2 additions & 4 deletions ted_sws/core/model/notice.py
Original file line number Diff line number Diff line change
Expand Up @@ -150,9 +150,8 @@ class Notice(WorkExpression):
_status: NoticeStatus = NoticeStatus.RAW # PrivateAttr(default=NoticeStatus.RAW)
ted_id: str = Field(..., allow_mutation=False)
original_metadata: Optional[TEDMetadata] = None
_normalised_metadata: Optional[NormalisedMetadata] = None

xml_manifestation: XMLManifestation = Field(..., allow_mutation=False)
_normalised_metadata: Optional[NormalisedMetadata] = None
_preprocessed_xml_manifestation: Optional[XMLManifestation] = None
_distilled_rdf_manifestation: Optional[RDFManifestation] = None
_rdf_manifestation: Optional[RDFManifestation] = None
Expand All @@ -178,8 +177,7 @@ def rdf_manifestation(self) -> RDFManifestation:
def mets_manifestation(self) -> METSManifestation:
return self._mets_manifestation

@property
def rdf_validation(self) -> RDFValidationManifestation:
def get_rdf_validation(self) -> RDFValidationManifestation:
if not self.rdf_manifestation:
return None

Expand Down
36 changes: 32 additions & 4 deletions ted_sws/data_manager/adapters/notice_repository.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
import logging
from typing import Iterator
from typing import Iterator, Union
from pymongo import MongoClient

from ted_sws import config
from ted_sws.core.model.manifestation import XMLManifestation, RDFManifestation, METSManifestation
from ted_sws.core.model.metadata import NormalisedMetadata
from ted_sws.data_manager.adapters.repository_abc import NoticeRepositoryABC
from ted_sws.core.model.notice import Notice, NoticeStatus

Expand All @@ -23,6 +25,32 @@ def __init__(self, mongodb_client: MongoClient, database_name: str = _database_n
notice_db = mongodb_client[self._database_name]
self.collection = notice_db[self._collection_name]


@staticmethod
def _create_notice_from_repository_result(notice_dict: dict) -> Union[Notice, None]:
    """
    Build a Notice from a dictionary returned by the repository collection.

    The Notice is first created from the dictionary itself; its status, normalised
    metadata and manifestations are then set explicitly on the private attributes
    from the corresponding dictionary entries.

    :param notice_dict: the stored representation of a notice, or None/empty when
        nothing was found; the dictionary is not modified
    :return: the restored Notice, or None when notice_dict is None or empty
    """
    if not notice_dict:
        return None

    # Work on a shallow copy so the caller's dictionary keeps its "_id" entry.
    notice_dict = dict(notice_dict)
    # The repository identifier is not passed on to Notice; tolerate its absence.
    notice_dict.pop("_id", None)

    def init_object_from_dict(object_class, key):
        # A missing key is treated like an empty value: the attribute stays None.
        value = notice_dict.get(key)
        return object_class(**value) if value else None

    notice = Notice(**notice_dict)
    notice._status = NoticeStatus(notice_dict["status"])
    notice._normalised_metadata = init_object_from_dict(NormalisedMetadata, "normalised_metadata")
    notice._preprocessed_xml_manifestation = init_object_from_dict(XMLManifestation,
                                                                   "preprocessed_xml_manifestation")
    notice._distilled_rdf_manifestation = init_object_from_dict(RDFManifestation, "distilled_rdf_manifestation")
    notice._rdf_manifestation = init_object_from_dict(RDFManifestation, "rdf_manifestation")
    notice._mets_manifestation = init_object_from_dict(METSManifestation, "mets_manifestation")
    return notice

def add(self, notice: Notice):
"""
This method allows you to add notice objects to the repository.
Expand Down Expand Up @@ -50,7 +78,7 @@ def get(self, reference) -> Notice:
:return: Notice
"""
result_dict = self.collection.find_one({"ted_id": reference})
return Notice(**result_dict) if result_dict else None
return NoticeRepository._create_notice_from_repository_result(result_dict)

def get_notice_by_status(self, notice_status: NoticeStatus) -> Iterator[Notice]:
"""
Expand All @@ -59,12 +87,12 @@ def get_notice_by_status(self, notice_status: NoticeStatus) -> Iterator[Notice]:
:return:
"""
for result_dict in self.collection.find({"status": notice_status}):
yield Notice(**result_dict)
yield NoticeRepository._create_notice_from_repository_result(result_dict)

def list(self) -> Iterator[Notice]:
    """
    This method allows all records to be retrieved from the repository.
    Every stored record is yielded exactly once, converted with
    ``_create_notice_from_repository_result``.
    :return: iterator over the notices
    """
    for result_dict in self.collection.find():
        yield NoticeRepository._create_notice_from_repository_result(result_dict)
5 changes: 3 additions & 2 deletions ted_sws/notice_eligibility/services/notice_eligibility.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,5 +72,6 @@ def notice_eligibility_checker_by_id(notice_id: str, notice_repository: NoticeRe
notice = notice_repository.get(reference=notice_id)
if notice is None:
raise ValueError(f'Notice, with {notice_id} id, was not found')

return notice_eligibility_checker(notice=notice, mapping_suite_repository=mapping_suite_repository)
result = notice_eligibility_checker(notice=notice, mapping_suite_repository=mapping_suite_repository)
notice_repository.update(notice=notice)
return result
27 changes: 26 additions & 1 deletion ted_sws/notice_transformer/services/notice_transformer.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,13 @@
from ted_sws.core.model.notice import Notice, NoticeStatus
from ted_sws.core.model.transform import MappingSuite, FileResource
from ted_sws.data_manager.adapters.mapping_suite_repository import MappingSuiteRepositoryInFileSystem
from ted_sws.data_manager.adapters.repository_abc import NoticeRepositoryABC, MappingSuiteRepositoryABC
from ted_sws.notice_transformer.adapters.rml_mapper import RMLMapperABC, SerializationFormat as RMLSerializationFormat

DATA_SOURCE_PACKAGE = "data"


def transform_notice(notice: Notice, mapping_suite: MappingSuite, rml_mapper: RMLMapperABC):
def transform_notice(notice: Notice, mapping_suite: MappingSuite, rml_mapper: RMLMapperABC) -> Notice:
"""
This function allows the XML content of a Notice to be transformed into RDF,
using the mapping rules in mapping_suite and the rml_mapper mapping adapter.
Expand All @@ -27,6 +28,30 @@ def transform_notice(notice: Notice, mapping_suite: MappingSuite, rml_mapper: RM
return NoticeTransformer(mapping_suite=mapping_suite, rml_mapper=rml_mapper).transform_notice(notice=notice)


def transform_notice_by_id(notice_id: str, mapping_suite_id: str, notice_repository: NoticeRepositoryABC,
                           mapping_suite_repository: MappingSuiteRepositoryABC, rml_mapper: RMLMapperABC):
    """
    Load a notice and a mapping suite by their ids, transform the notice with
    transform_notice and store the transformed notice back into the notice repository.
    :param notice_id: reference used to get the notice from notice_repository
    :param mapping_suite_id: reference used to get the mapping suite from mapping_suite_repository
    :param notice_repository: repository the notice is read from and updated in
    :param mapping_suite_repository: repository the mapping suite is read from
    :param rml_mapper: mapping adapter handed over to transform_notice
    :return: nothing; raises ValueError when the notice or the mapping suite is not found
    """
    stored_notice = notice_repository.get(reference=notice_id)
    suite = mapping_suite_repository.get(reference=mapping_suite_id)

    # Both lookups happen first; a missing notice is reported before a missing mapping suite.
    lookups = (
        (stored_notice, f'Notice, with {notice_id} id, was not found'),
        (suite, f'Mapping suite, with {mapping_suite_id} id, was not found'),
    )
    for found, error_message in lookups:
        if found is None:
            raise ValueError(error_message)

    notice_repository.update(
        notice=transform_notice(notice=stored_notice, mapping_suite=suite, rml_mapper=rml_mapper)
    )


def transform_test_data(mapping_suite: MappingSuite, rml_mapper: RMLMapperABC, output_path: Path,
loggable: bool = False):
"""
Expand Down
4 changes: 2 additions & 2 deletions tests/features/model/test_notice_operations.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ def step_impl(raw_notice):

@then("notice contains no RDF validation")
def step_impl(transformation_eligible_notice):
    """Check that the notice reports no RDF validation through get_rdf_validation()."""
    assert transformation_eligible_notice.get_rdf_validation() is None


@then("notice contains no METS manifestation")
Expand Down Expand Up @@ -210,7 +210,7 @@ def step_impl(transformation_eligible_notice, rdf_validation):

@then("the notice object contains the RDF validation report")
def step_impl(transformation_eligible_notice):
    """Check that the notice reports an RDF validation through get_rdf_validation()."""
    assert transformation_eligible_notice.get_rdf_validation() is not None


@then("the notice status is VALIDATED")
Expand Down
7 changes: 7 additions & 0 deletions tests/unit/notice_transformer/conftest.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import mongomock
import pymongo
import pytest

from ted_sws.data_manager.adapters.mapping_suite_repository import MappingSuiteRepositoryInFileSystem
Expand Down Expand Up @@ -45,3 +47,8 @@ def fake_mapping_suite(fake_repository_path, fake_mapping_suite_id) -> MappingSu
repository_path = fake_repository_path
mapping_suite_repository = MappingSuiteRepositoryInFileSystem(repository_path=repository_path)
return mapping_suite_repository.get(reference=fake_mapping_suite_id)

@pytest.fixture
@mongomock.patch(servers=(('server.example.com', 27017),))
def mongodb_client():
    """Provide a pymongo.MongoClient for 'server.example.com', created under mongomock.patch."""
    mocked_host = 'server.example.com'
    client = pymongo.MongoClient(mocked_host)
    return client
31 changes: 30 additions & 1 deletion tests/unit/notice_transformer/test_notice_transformer.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,12 @@
import tempfile
from collections import Counter

import pytest

from ted_sws.core.model.notice import NoticeStatus
from ted_sws.data_manager.adapters.mapping_suite_repository import MappingSuiteRepositoryMongoDB
from ted_sws.notice_transformer.services.notice_transformer import NoticeTransformer, transform_notice, \
transform_test_data
transform_test_data, transform_notice_by_id


def test_notice_transformer(fake_rml_mapper, fake_mapping_suite, notice_2018):
Expand All @@ -20,6 +23,32 @@ def test_notice_transformer_function(fake_rml_mapper, fake_mapping_suite, notice
assert result_notice.status == NoticeStatus.TRANSFORMED


def test_notice_transformer_by_id_function(fake_rml_mapper, mongodb_client, fake_mapping_suite, notice_2018,
                                           notice_repository):
    """A stored, preprocessed notice ends up TRANSFORMED after transform_notice_by_id."""
    # Arrange: store a preprocessed notice and the mapping suite.
    notice_2018._status = NoticeStatus.PREPROCESSED_FOR_TRANSFORMATION
    notice_repository.add(notice=notice_2018)
    stored_notice_id = notice_2018.ted_id
    suite_repository = MappingSuiteRepositoryMongoDB(mongodb_client=mongodb_client)
    suite_repository.add(mapping_suite=fake_mapping_suite)

    # Act: transform by ids only.
    transform_notice_by_id(
        stored_notice_id,
        fake_mapping_suite.identifier,
        notice_repository,
        suite_repository,
        fake_rml_mapper,
    )

    # Assert: the stored notice carries the new status.
    reloaded_notice = notice_repository.get(reference=stored_notice_id)
    assert reloaded_notice.status == NoticeStatus.TRANSFORMED


def test_notice_transformer_by_id_function_with_invalid_ids(fake_rml_mapper, mongodb_client, fake_mapping_suite,
                                                            notice_2018,
                                                            notice_repository):
    """transform_notice_by_id fails when neither the notice nor the mapping suite was stored."""
    notice_2018._status = NoticeStatus.PREPROCESSED_FOR_TRANSFORMATION
    notice_id = notice_2018.ted_id
    mapping_suite_repository = MappingSuiteRepositoryMongoDB(mongodb_client=mongodb_client)
    # NOTE(review): transform_notice_by_id raises ValueError for ids that are not found;
    # narrow Exception to ValueError once the repositories are confirmed to return None
    # (rather than raise) for unknown references.
    with pytest.raises(Exception):
        transform_notice_by_id(notice_id, fake_mapping_suite.identifier, notice_repository,
                               mapping_suite_repository, fake_rml_mapper)
    # Kept outside the raises block so it really runs: the failed call must not have
    # created the notice in the repository.
    result_notice = notice_repository.get(reference=notice_id)
    assert result_notice is None


def test_transform_test_data(fake_rml_mapper, fake_mapping_suite):
notice_transformer = NoticeTransformer(mapping_suite=fake_mapping_suite, rml_mapper=fake_rml_mapper)
with tempfile.TemporaryDirectory() as d:
Expand Down