Skip to content

Commit

Permalink
Merge pull request #79 from meaningfy-ws/feature/TED-295
Browse files Browse the repository at this point in the history
Feature/ted 295
  • Loading branch information
costezki authored Apr 22, 2022
2 parents 248738b + 2f29a6e commit ed33402
Show file tree
Hide file tree
Showing 8 changed files with 138 additions and 21 deletions.
43 changes: 36 additions & 7 deletions dags/worker_single_notice_process_orchestrator.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,17 @@
from dags.dags_utils import pull_dag_upstream, push_dag_downstream
from ted_sws import config
from ted_sws.core.model.notice import NoticeStatus
from ted_sws.data_manager.adapters.mapping_suite_repository import MappingSuiteRepositoryMongoDB
from ted_sws.data_manager.adapters.notice_repository import NoticeRepository
from ted_sws.metadata_normaliser.services.metadata_normalizer import normalise_notice_by_id

from airflow.decorators import dag
from airflow.operators.python import get_current_context, BranchPythonOperator, PythonOperator

from dags import DEFAULT_DAG_ARGUMENTS
from ted_sws.notice_eligibility.services.notice_eligibility import notice_eligibility_checker_by_id
from ted_sws.notice_transformer.adapters.rml_mapper import RMLMapper
from ted_sws.notice_transformer.services.notice_transformer import transform_notice_by_id

NOTICE_ID = "notice_id"
MAPPING_SUITE_ID = "mapping_suite_id"
Expand All @@ -37,22 +41,39 @@ def _normalise_notice_metadata():

def _check_eligibility_for_transformation():
notice_id = pull_dag_upstream(NOTICE_ID)
notice_id = notice_id + "_checked"
mongodb_client = MongoClient(config.MONGO_DB_AUTH_URL)
notice_repository = NoticeRepository(mongodb_client=mongodb_client)
mapping_suite_repository = MappingSuiteRepositoryMongoDB(mongodb_client=mongodb_client)
notice_id, mapping_suite_id = notice_eligibility_checker_by_id(notice_id=notice_id,
notice_repository=notice_repository,
mapping_suite_repository=mapping_suite_repository)
push_dag_downstream(NOTICE_ID, notice_id)
push_dag_downstream(MAPPING_SUITE_ID, "mapping_suite_id")
push_dag_downstream(MAPPING_SUITE_ID, mapping_suite_id)

def _preprocess_xml_manifestation():
notice_id = pull_dag_upstream(NOTICE_ID)
mapping_suite_id = pull_dag_upstream(MAPPING_SUITE_ID)
notice_id = notice_id + "_preprocessed"
mongodb_client = MongoClient(config.MONGO_DB_AUTH_URL)
notice_repository = NoticeRepository(mongodb_client=mongodb_client)
notice = notice_repository.get(reference=notice_id)
notice.update_status_to(new_status=NoticeStatus.PREPROCESSED_FOR_TRANSFORMATION)
notice_repository.update(notice=notice)
push_dag_downstream(NOTICE_ID, notice_id)
pull_dag_upstream(MAPPING_SUITE_ID, mapping_suite_id)
push_dag_downstream(MAPPING_SUITE_ID, mapping_suite_id)

def _transform_notice():
notice_id = pull_dag_upstream(NOTICE_ID)
mapping_suite_id = pull_dag_upstream(MAPPING_SUITE_ID)
notice_id = notice_id + "_transformed"
mongodb_client = MongoClient(config.MONGO_DB_AUTH_URL)
notice_repository = NoticeRepository(mongodb_client=mongodb_client)
mapping_suite_repository = MappingSuiteRepositoryMongoDB(mongodb_client=mongodb_client)
rml_mapper = RMLMapper(rml_mapper_path=config.RML_MAPPER_PATH)
transform_notice_by_id(notice_id=notice_id, mapping_suite_id=mapping_suite_id,
notice_repository=notice_repository, mapping_suite_repository=mapping_suite_repository,
rml_mapper=rml_mapper
)
push_dag_downstream(NOTICE_ID, notice_id)
push_dag_downstream(MAPPING_SUITE_ID, mapping_suite_id)

def _resolve_entities_in_the_rdf_manifestation():
notice_id = pull_dag_upstream(NOTICE_ID)
Expand Down Expand Up @@ -90,9 +111,17 @@ def _fail_on_state():

def _check_notice_state_before_transform():
notice_id = pull_dag_upstream(NOTICE_ID)
mapping_suite_id = pull_dag_upstream(MAPPING_SUITE_ID)
mongodb_client = MongoClient(config.MONGO_DB_AUTH_URL)
notice_repository = NoticeRepository(mongodb_client=mongodb_client)
notice = notice_repository.get(reference=notice_id)
push_dag_downstream(NOTICE_ID, notice_id)
status = NoticeStatus.ELIGIBLE_FOR_TRANSFORMATION
if status == NoticeStatus.ELIGIBLE_FOR_TRANSFORMATION:
push_dag_downstream(MAPPING_SUITE_ID, mapping_suite_id)
print(notice.status)
print(NoticeStatus.ELIGIBLE_FOR_TRANSFORMATION)
print(notice.status == NoticeStatus.ELIGIBLE_FOR_TRANSFORMATION)
if notice.status == NoticeStatus.ELIGIBLE_FOR_TRANSFORMATION:
print("Go to preprocess_xml_manifestation")
return "preprocess_xml_manifestation"
else:
return "fail_on_state"
Expand Down
6 changes: 2 additions & 4 deletions ted_sws/core/model/notice.py
Original file line number Diff line number Diff line change
Expand Up @@ -150,9 +150,8 @@ class Notice(WorkExpression):
_status: NoticeStatus = NoticeStatus.RAW # PrivateAttr(default=NoticeStatus.RAW)
ted_id: str = Field(..., allow_mutation=False)
original_metadata: Optional[TEDMetadata] = None
_normalised_metadata: Optional[NormalisedMetadata] = None

xml_manifestation: XMLManifestation = Field(..., allow_mutation=False)
_normalised_metadata: Optional[NormalisedMetadata] = None
_preprocessed_xml_manifestation: Optional[XMLManifestation] = None
_distilled_rdf_manifestation: Optional[RDFManifestation] = None
_rdf_manifestation: Optional[RDFManifestation] = None
Expand All @@ -178,8 +177,7 @@ def rdf_manifestation(self) -> RDFManifestation:
def mets_manifestation(self) -> METSManifestation:
return self._mets_manifestation

@property
def rdf_validation(self) -> RDFValidationManifestation:
def get_rdf_validation(self) -> RDFValidationManifestation:
if not self.rdf_manifestation:
return None

Expand Down
36 changes: 32 additions & 4 deletions ted_sws/data_manager/adapters/notice_repository.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
import logging
from typing import Iterator
from typing import Iterator, Union
from pymongo import MongoClient

from ted_sws import config
from ted_sws.core.model.manifestation import XMLManifestation, RDFManifestation, METSManifestation
from ted_sws.core.model.metadata import NormalisedMetadata
from ted_sws.data_manager.adapters.repository_abc import NoticeRepositoryABC
from ted_sws.core.model.notice import Notice, NoticeStatus

Expand All @@ -23,6 +25,32 @@ def __init__(self, mongodb_client: MongoClient, database_name: str = _database_n
notice_db = mongodb_client[self._database_name]
self.collection = notice_db[self._collection_name]


@staticmethod
def _create_notice_from_repository_result(notice_dict: dict) -> Union[Notice, None]:
"""
:param notice_dict:
:return:
"""

def init_object_from_dict(object_class, key):
if notice_dict[key]:
return object_class(**notice_dict[key])
return None

if notice_dict:
del notice_dict["_id"]
notice = Notice(**notice_dict)
notice._status = NoticeStatus(notice_dict["status"])
notice._normalised_metadata= init_object_from_dict(NormalisedMetadata,"normalised_metadata")
notice._preprocessed_xml_manifestation = init_object_from_dict(XMLManifestation,"preprocessed_xml_manifestation")
notice._distilled_rdf_manifestation = init_object_from_dict(RDFManifestation,"distilled_rdf_manifestation")
notice._rdf_manifestation = init_object_from_dict(RDFManifestation,"rdf_manifestation")
notice._mets_manifestation = init_object_from_dict(METSManifestation,"mets_manifestation")
return notice
return None

def add(self, notice: Notice):
"""
This method allows you to add notice objects to the repository.
Expand Down Expand Up @@ -50,7 +78,7 @@ def get(self, reference) -> Notice:
:return: Notice
"""
result_dict = self.collection.find_one({"ted_id": reference})
return Notice(**result_dict) if result_dict else None
return NoticeRepository._create_notice_from_repository_result(result_dict)

def get_notice_by_status(self, notice_status: NoticeStatus) -> Iterator[Notice]:
"""
Expand All @@ -59,12 +87,12 @@ def get_notice_by_status(self, notice_status: NoticeStatus) -> Iterator[Notice]:
:return:
"""
for result_dict in self.collection.find({"status": notice_status}):
yield Notice(**result_dict)
yield NoticeRepository._create_notice_from_repository_result(result_dict)

def list(self) -> Iterator[Notice]:
"""
This method allows all records to be retrieved from the repository.
:return: list of notices
"""
for result_dict in self.collection.find():
yield Notice(**result_dict)
yield NoticeRepository._create_notice_from_repository_result(result_dict)
5 changes: 3 additions & 2 deletions ted_sws/notice_eligibility/services/notice_eligibility.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,5 +72,6 @@ def notice_eligibility_checker_by_id(notice_id: str, notice_repository: NoticeRe
notice = notice_repository.get(reference=notice_id)
if notice is None:
raise ValueError(f'Notice, with {notice_id} id, was not found')

return notice_eligibility_checker(notice=notice, mapping_suite_repository=mapping_suite_repository)
result = notice_eligibility_checker(notice=notice, mapping_suite_repository=mapping_suite_repository)
notice_repository.update(notice=notice)
return result
27 changes: 26 additions & 1 deletion ted_sws/notice_transformer/services/notice_transformer.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,13 @@
from ted_sws.core.model.notice import Notice, NoticeStatus
from ted_sws.core.model.transform import MappingSuite, FileResource
from ted_sws.data_manager.adapters.mapping_suite_repository import MappingSuiteRepositoryInFileSystem
from ted_sws.data_manager.adapters.repository_abc import NoticeRepositoryABC, MappingSuiteRepositoryABC
from ted_sws.notice_transformer.adapters.rml_mapper import RMLMapperABC, SerializationFormat as RMLSerializationFormat

DATA_SOURCE_PACKAGE = "data"


def transform_notice(notice: Notice, mapping_suite: MappingSuite, rml_mapper: RMLMapperABC):
def transform_notice(notice: Notice, mapping_suite: MappingSuite, rml_mapper: RMLMapperABC) -> Notice:
"""
This function allows the XML content of a Notice to be transformed into RDF,
using the mapping rules in mapping_suite and the rml_mapper mapping adapter.
Expand All @@ -27,6 +28,30 @@ def transform_notice(notice: Notice, mapping_suite: MappingSuite, rml_mapper: RM
return NoticeTransformer(mapping_suite=mapping_suite, rml_mapper=rml_mapper).transform_notice(notice=notice)


def transform_notice_by_id(notice_id: str, mapping_suite_id: str, notice_repository: NoticeRepositoryABC,
mapping_suite_repository: MappingSuiteRepositoryABC, rml_mapper: RMLMapperABC):
"""
This function allows the XML content of a Notice to be transformed into RDF,
using the mapping rules in mapping_suite and the rml_mapper mapping adapter.
:param notice_id:
:param mapping_suite_id:
:param notice_repository:
:param mapping_suite_repository:
:param rml_mapper:
:return:
"""
notice = notice_repository.get(reference=notice_id)
mapping_suite = mapping_suite_repository.get(reference=mapping_suite_id)
if notice is None:
raise ValueError(f'Notice, with {notice_id} id, was not found')

if mapping_suite is None:
raise ValueError(f'Mapping suite, with {mapping_suite_id} id, was not found')

result_notice = transform_notice(notice=notice, mapping_suite=mapping_suite, rml_mapper=rml_mapper)
notice_repository.update(notice=result_notice)


def transform_test_data(mapping_suite: MappingSuite, rml_mapper: RMLMapperABC, output_path: Path,
loggable: bool = False):
"""
Expand Down
4 changes: 2 additions & 2 deletions tests/features/model/test_notice_operations.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ def step_impl(raw_notice):

@then("notice contains no RDF validation")
def step_impl(transformation_eligible_notice):
assert transformation_eligible_notice.rdf_validation is None
assert transformation_eligible_notice.get_rdf_validation() is None


@then("notice contains no METS manifestation")
Expand Down Expand Up @@ -210,7 +210,7 @@ def step_impl(transformation_eligible_notice, rdf_validation):

@then("the notice object contains the RDF validation report")
def step_impl(transformation_eligible_notice):
assert transformation_eligible_notice.rdf_validation is not None
assert transformation_eligible_notice.get_rdf_validation() is not None


@then("the notice status is VALIDATED")
Expand Down
7 changes: 7 additions & 0 deletions tests/unit/notice_transformer/conftest.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import mongomock
import pymongo
import pytest

from ted_sws.data_manager.adapters.mapping_suite_repository import MappingSuiteRepositoryInFileSystem
Expand Down Expand Up @@ -45,3 +47,8 @@ def fake_mapping_suite(fake_repository_path, fake_mapping_suite_id) -> MappingSu
repository_path = fake_repository_path
mapping_suite_repository = MappingSuiteRepositoryInFileSystem(repository_path=repository_path)
return mapping_suite_repository.get(reference=fake_mapping_suite_id)

@pytest.fixture
@mongomock.patch(servers=(('server.example.com', 27017),))
def mongodb_client():
return pymongo.MongoClient('server.example.com')
31 changes: 30 additions & 1 deletion tests/unit/notice_transformer/test_notice_transformer.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,12 @@
import tempfile
from collections import Counter

import pytest

from ted_sws.core.model.notice import NoticeStatus
from ted_sws.data_manager.adapters.mapping_suite_repository import MappingSuiteRepositoryMongoDB
from ted_sws.notice_transformer.services.notice_transformer import NoticeTransformer, transform_notice, \
transform_test_data
transform_test_data, transform_notice_by_id


def test_notice_transformer(fake_rml_mapper, fake_mapping_suite, notice_2018):
Expand All @@ -20,6 +23,32 @@ def test_notice_transformer_function(fake_rml_mapper, fake_mapping_suite, notice
assert result_notice.status == NoticeStatus.TRANSFORMED


def test_notice_transformer_by_id_function(fake_rml_mapper, mongodb_client, fake_mapping_suite, notice_2018,
notice_repository):
notice_2018._status = NoticeStatus.PREPROCESSED_FOR_TRANSFORMATION
notice_repository.add(notice=notice_2018)
notice_id = notice_2018.ted_id
mapping_suite_repository = MappingSuiteRepositoryMongoDB(mongodb_client=mongodb_client)
mapping_suite_repository.add(mapping_suite=fake_mapping_suite)
transform_notice_by_id(notice_id, fake_mapping_suite.identifier, notice_repository, mapping_suite_repository,
fake_rml_mapper)
result_notice = notice_repository.get(reference=notice_id)
assert result_notice.status == NoticeStatus.TRANSFORMED


def test_notice_transformer_by_id_function_with_invalid_ids(fake_rml_mapper, mongodb_client, fake_mapping_suite,
notice_2018,
notice_repository):
notice_2018._status = NoticeStatus.PREPROCESSED_FOR_TRANSFORMATION
notice_id = notice_2018.ted_id
mapping_suite_repository = MappingSuiteRepositoryMongoDB(mongodb_client=mongodb_client)
with pytest.raises(Exception) as e_info:
transform_notice_by_id(notice_id, fake_mapping_suite.identifier, notice_repository, mapping_suite_repository,
fake_rml_mapper)
result_notice = notice_repository.get(reference=notice_id)
assert result_notice is None


def test_transform_test_data(fake_rml_mapper, fake_mapping_suite):
notice_transformer = NoticeTransformer(mapping_suite=fake_mapping_suite, rml_mapper=fake_rml_mapper)
with tempfile.TemporaryDirectory() as d:
Expand Down

0 comments on commit ed33402

Please sign in to comment.