diff --git a/dags/worker_single_notice_process_orchestrator.py b/dags/worker_single_notice_process_orchestrator.py index a1e79bb81..9b19496d1 100644 --- a/dags/worker_single_notice_process_orchestrator.py +++ b/dags/worker_single_notice_process_orchestrator.py @@ -4,6 +4,7 @@ from dags.dags_utils import pull_dag_upstream, push_dag_downstream from ted_sws import config from ted_sws.core.model.notice import NoticeStatus +from ted_sws.data_manager.adapters.mapping_suite_repository import MappingSuiteRepositoryMongoDB from ted_sws.data_manager.adapters.notice_repository import NoticeRepository from ted_sws.metadata_normaliser.services.metadata_normalizer import normalise_notice_by_id @@ -11,6 +12,9 @@ from airflow.operators.python import get_current_context, BranchPythonOperator, PythonOperator from dags import DEFAULT_DAG_ARGUMENTS +from ted_sws.notice_eligibility.services.notice_eligibility import notice_eligibility_checker_by_id +from ted_sws.notice_transformer.adapters.rml_mapper import RMLMapper +from ted_sws.notice_transformer.services.notice_transformer import transform_notice_by_id NOTICE_ID = "notice_id" MAPPING_SUITE_ID = "mapping_suite_id" @@ -37,22 +41,39 @@ def _normalise_notice_metadata(): def _check_eligibility_for_transformation(): notice_id = pull_dag_upstream(NOTICE_ID) - notice_id = notice_id + "_checked" + mongodb_client = MongoClient(config.MONGO_DB_AUTH_URL) + notice_repository = NoticeRepository(mongodb_client=mongodb_client) + mapping_suite_repository = MappingSuiteRepositoryMongoDB(mongodb_client=mongodb_client) + notice_id, mapping_suite_id = notice_eligibility_checker_by_id(notice_id=notice_id, + notice_repository=notice_repository, + mapping_suite_repository=mapping_suite_repository) push_dag_downstream(NOTICE_ID, notice_id) - push_dag_downstream(MAPPING_SUITE_ID, "mapping_suite_id") + push_dag_downstream(MAPPING_SUITE_ID, mapping_suite_id) def _preprocess_xml_manifestation(): notice_id = pull_dag_upstream(NOTICE_ID) mapping_suite_id = pull_dag_upstream(MAPPING_SUITE_ID) - notice_id = notice_id + "_preprocessed" + mongodb_client = MongoClient(config.MONGO_DB_AUTH_URL) + notice_repository = NoticeRepository(mongodb_client=mongodb_client) + notice = notice_repository.get(reference=notice_id) + notice.update_status_to(new_status=NoticeStatus.PREPROCESSED_FOR_TRANSFORMATION) + notice_repository.update(notice=notice) push_dag_downstream(NOTICE_ID, notice_id) - pull_dag_upstream(MAPPING_SUITE_ID, mapping_suite_id) + push_dag_downstream(MAPPING_SUITE_ID, mapping_suite_id) def _transform_notice(): notice_id = pull_dag_upstream(NOTICE_ID) mapping_suite_id = pull_dag_upstream(MAPPING_SUITE_ID) - notice_id = notice_id + "_transformed" + mongodb_client = MongoClient(config.MONGO_DB_AUTH_URL) + notice_repository = NoticeRepository(mongodb_client=mongodb_client) + mapping_suite_repository = MappingSuiteRepositoryMongoDB(mongodb_client=mongodb_client) + rml_mapper = RMLMapper(rml_mapper_path=config.RML_MAPPER_PATH) + transform_notice_by_id(notice_id=notice_id, mapping_suite_id=mapping_suite_id, + notice_repository=notice_repository, mapping_suite_repository=mapping_suite_repository, + rml_mapper=rml_mapper + ) push_dag_downstream(NOTICE_ID, notice_id) + push_dag_downstream(MAPPING_SUITE_ID, mapping_suite_id) def _resolve_entities_in_the_rdf_manifestation(): notice_id = pull_dag_upstream(NOTICE_ID) @@ -90,9 +111,17 @@ def _fail_on_state(): def _check_notice_state_before_transform(): notice_id = pull_dag_upstream(NOTICE_ID) + mapping_suite_id = pull_dag_upstream(MAPPING_SUITE_ID) + mongodb_client = MongoClient(config.MONGO_DB_AUTH_URL) + notice_repository = NoticeRepository(mongodb_client=mongodb_client) + notice = notice_repository.get(reference=notice_id) push_dag_downstream(NOTICE_ID, notice_id) - status = NoticeStatus.ELIGIBLE_FOR_TRANSFORMATION - if status == NoticeStatus.ELIGIBLE_FOR_TRANSFORMATION: + push_dag_downstream(MAPPING_SUITE_ID, mapping_suite_id) + print(notice.status) + print(NoticeStatus.ELIGIBLE_FOR_TRANSFORMATION) + print(notice.status == NoticeStatus.ELIGIBLE_FOR_TRANSFORMATION) + if notice.status == NoticeStatus.ELIGIBLE_FOR_TRANSFORMATION: + print("Go to preprocess_xml_manifestation") return "preprocess_xml_manifestation" else: return "fail_on_state" diff --git a/ted_sws/core/model/notice.py b/ted_sws/core/model/notice.py index 059eb6ac8..c82b8fbb7 100644 --- a/ted_sws/core/model/notice.py +++ b/ted_sws/core/model/notice.py @@ -150,9 +150,8 @@ class Notice(WorkExpression): _status: NoticeStatus = NoticeStatus.RAW # PrivateAttr(default=NoticeStatus.RAW) ted_id: str = Field(..., allow_mutation=False) original_metadata: Optional[TEDMetadata] = None - _normalised_metadata: Optional[NormalisedMetadata] = None - xml_manifestation: XMLManifestation = Field(..., allow_mutation=False) + _normalised_metadata: Optional[NormalisedMetadata] = None _preprocessed_xml_manifestation: Optional[XMLManifestation] = None _distilled_rdf_manifestation: Optional[RDFManifestation] = None _rdf_manifestation: Optional[RDFManifestation] = None @@ -178,8 +177,7 @@ def rdf_manifestation(self) -> RDFManifestation: def mets_manifestation(self) -> METSManifestation: return self._mets_manifestation - @property - def rdf_validation(self) -> RDFValidationManifestation: + def get_rdf_validation(self) -> RDFValidationManifestation: if not self.rdf_manifestation: return None diff --git a/ted_sws/data_manager/adapters/notice_repository.py b/ted_sws/data_manager/adapters/notice_repository.py index fdccafb31..a0a31c2a6 100644 --- a/ted_sws/data_manager/adapters/notice_repository.py +++ b/ted_sws/data_manager/adapters/notice_repository.py @@ -1,8 +1,10 @@ import logging -from typing import Iterator +from typing import Iterator, Union from pymongo import MongoClient from ted_sws import config +from ted_sws.core.model.manifestation import XMLManifestation, RDFManifestation, METSManifestation +from ted_sws.core.model.metadata import NormalisedMetadata from ted_sws.data_manager.adapters.repository_abc import NoticeRepositoryABC from ted_sws.core.model.notice import Notice, NoticeStatus @@ -23,6 +25,32 @@ def __init__(self, mongodb_client: MongoClient, database_name: str = _database_n notice_db = mongodb_client[self._database_name] self.collection = notice_db[self._collection_name] + + @staticmethod + def _create_notice_from_repository_result(notice_dict: dict) -> Union[Notice, None]: + """ + + :param notice_dict: + :return: + """ + + def init_object_from_dict(object_class, key): + if notice_dict[key]: + return object_class(**notice_dict[key]) + return None + + if notice_dict: + del notice_dict["_id"] + notice = Notice(**notice_dict) + notice._status = NoticeStatus(notice_dict["status"]) + notice._normalised_metadata= init_object_from_dict(NormalisedMetadata,"normalised_metadata") + notice._preprocessed_xml_manifestation = init_object_from_dict(XMLManifestation,"preprocessed_xml_manifestation") + notice._distilled_rdf_manifestation = init_object_from_dict(RDFManifestation,"distilled_rdf_manifestation") + notice._rdf_manifestation = init_object_from_dict(RDFManifestation,"rdf_manifestation") + notice._mets_manifestation = init_object_from_dict(METSManifestation,"mets_manifestation") + return notice + return None + def add(self, notice: Notice): """ This method allows you to add notice objects to the repository. @@ -50,7 +78,7 @@ def get(self, reference) -> Notice: :return: Notice """ result_dict = self.collection.find_one({"ted_id": reference}) - return Notice(**result_dict) if result_dict else None + return NoticeRepository._create_notice_from_repository_result(result_dict) def get_notice_by_status(self, notice_status: NoticeStatus) -> Iterator[Notice]: """ @@ -59,7 +87,7 @@ def get_notice_by_status(self, notice_status: NoticeStatus) -> Iterator[Notice]: :return: """ for result_dict in self.collection.find({"status": notice_status}): - yield Notice(**result_dict) + yield NoticeRepository._create_notice_from_repository_result(result_dict) def list(self) -> Iterator[Notice]: """ @@ -67,4 +95,4 @@ def list(self) -> Iterator[Notice]: :return: list of notices """ for result_dict in self.collection.find(): - yield Notice(**result_dict) + yield NoticeRepository._create_notice_from_repository_result(result_dict) diff --git a/ted_sws/notice_eligibility/services/notice_eligibility.py b/ted_sws/notice_eligibility/services/notice_eligibility.py index b06fe5571..017bc4147 100644 --- a/ted_sws/notice_eligibility/services/notice_eligibility.py +++ b/ted_sws/notice_eligibility/services/notice_eligibility.py @@ -72,5 +72,6 @@ def notice_eligibility_checker_by_id(notice_id: str, notice_repository: NoticeRe notice = notice_repository.get(reference=notice_id) if notice is None: raise ValueError(f'Notice, with {notice_id} id, was not found') - - return notice_eligibility_checker(notice=notice, mapping_suite_repository=mapping_suite_repository) + result = notice_eligibility_checker(notice=notice, mapping_suite_repository=mapping_suite_repository) + notice_repository.update(notice=notice) + return result diff --git a/ted_sws/notice_transformer/services/notice_transformer.py b/ted_sws/notice_transformer/services/notice_transformer.py index 676147003..f8ac4696d 100644 --- a/ted_sws/notice_transformer/services/notice_transformer.py +++ b/ted_sws/notice_transformer/services/notice_transformer.py @@ -10,12 +10,13 @@ from ted_sws.core.model.notice import Notice, NoticeStatus from ted_sws.core.model.transform import MappingSuite, FileResource from ted_sws.data_manager.adapters.mapping_suite_repository import MappingSuiteRepositoryInFileSystem +from ted_sws.data_manager.adapters.repository_abc import NoticeRepositoryABC, MappingSuiteRepositoryABC from ted_sws.notice_transformer.adapters.rml_mapper import RMLMapperABC, SerializationFormat as RMLSerializationFormat DATA_SOURCE_PACKAGE = "data" -def transform_notice(notice: Notice, mapping_suite: MappingSuite, rml_mapper: RMLMapperABC): +def transform_notice(notice: Notice, mapping_suite: MappingSuite, rml_mapper: RMLMapperABC) -> Notice: """ This function allows the XML content of a Notice to be transformed into RDF, using the mapping rules in mapping_suite and the rml_mapper mapping adapter. @@ -27,6 +28,30 @@ def transform_notice(notice: Notice, mapping_suite: MappingSuite, rml_mapper: RM return NoticeTransformer(mapping_suite=mapping_suite, rml_mapper=rml_mapper).transform_notice(notice=notice) +def transform_notice_by_id(notice_id: str, mapping_suite_id: str, notice_repository: NoticeRepositoryABC, + mapping_suite_repository: MappingSuiteRepositoryABC, rml_mapper: RMLMapperABC): + """ + This function allows the XML content of a Notice to be transformed into RDF, + using the mapping rules in mapping_suite and the rml_mapper mapping adapter. + :param notice_id: + :param mapping_suite_id: + :param notice_repository: + :param mapping_suite_repository: + :param rml_mapper: + :return: + """ + notice = notice_repository.get(reference=notice_id) + mapping_suite = mapping_suite_repository.get(reference=mapping_suite_id) + if notice is None: + raise ValueError(f'Notice, with {notice_id} id, was not found') + + if mapping_suite is None: + raise ValueError(f'Mapping suite, with {mapping_suite_id} id, was not found') + + result_notice = transform_notice(notice=notice, mapping_suite=mapping_suite, rml_mapper=rml_mapper) + notice_repository.update(notice=result_notice) + + def transform_test_data(mapping_suite: MappingSuite, rml_mapper: RMLMapperABC, output_path: Path, loggable: bool = False): """ diff --git a/tests/features/model/test_notice_operations.py b/tests/features/model/test_notice_operations.py index 7be2b9097..db2f47490 100644 --- a/tests/features/model/test_notice_operations.py +++ b/tests/features/model/test_notice_operations.py @@ -144,7 +144,7 @@ def step_impl(raw_notice): @then("notice contains no RDF validation") def step_impl(transformation_eligible_notice): - assert transformation_eligible_notice.rdf_validation is None + assert transformation_eligible_notice.get_rdf_validation() is None @then("notice contains no METS manifestation") @@ -210,7 +210,7 @@ def step_impl(transformation_eligible_notice, rdf_validation): @then("the notice object contains the RDF validation report") def step_impl(transformation_eligible_notice): - assert transformation_eligible_notice.rdf_validation is not None + assert transformation_eligible_notice.get_rdf_validation() is not None @then("the notice status is VALIDATED") diff --git a/tests/unit/notice_transformer/conftest.py b/tests/unit/notice_transformer/conftest.py index 39770b24e..f1aba7cfa 100644 --- a/tests/unit/notice_transformer/conftest.py +++ b/tests/unit/notice_transformer/conftest.py @@ -1,3 +1,5 @@ +import mongomock +import pymongo import pytest from ted_sws.data_manager.adapters.mapping_suite_repository import MappingSuiteRepositoryInFileSystem @@ -45,3 +47,8 @@ def fake_mapping_suite(fake_repository_path, fake_mapping_suite_id) -> MappingSu repository_path = fake_repository_path mapping_suite_repository = MappingSuiteRepositoryInFileSystem(repository_path=repository_path) return mapping_suite_repository.get(reference=fake_mapping_suite_id) + +@pytest.fixture +@mongomock.patch(servers=(('server.example.com', 27017),)) +def mongodb_client(): + return pymongo.MongoClient('server.example.com') \ No newline at end of file diff --git a/tests/unit/notice_transformer/test_notice_transformer.py b/tests/unit/notice_transformer/test_notice_transformer.py index 010e418f0..55c9b605f 100644 --- a/tests/unit/notice_transformer/test_notice_transformer.py +++ b/tests/unit/notice_transformer/test_notice_transformer.py @@ -2,9 +2,12 @@ import tempfile from collections import Counter +import pytest + from ted_sws.core.model.notice import NoticeStatus +from ted_sws.data_manager.adapters.mapping_suite_repository import MappingSuiteRepositoryMongoDB from ted_sws.notice_transformer.services.notice_transformer import NoticeTransformer, transform_notice, \ - transform_test_data + transform_test_data, transform_notice_by_id def test_notice_transformer(fake_rml_mapper, fake_mapping_suite, notice_2018): @@ -20,6 +23,32 @@ def test_notice_transformer_function(fake_rml_mapper, fake_mapping_suite, notice assert result_notice.status == NoticeStatus.TRANSFORMED +def test_notice_transformer_by_id_function(fake_rml_mapper, mongodb_client, fake_mapping_suite, notice_2018, + notice_repository): + notice_2018._status = NoticeStatus.PREPROCESSED_FOR_TRANSFORMATION + notice_repository.add(notice=notice_2018) + notice_id = notice_2018.ted_id + mapping_suite_repository = MappingSuiteRepositoryMongoDB(mongodb_client=mongodb_client) + mapping_suite_repository.add(mapping_suite=fake_mapping_suite) + transform_notice_by_id(notice_id, fake_mapping_suite.identifier, notice_repository, mapping_suite_repository, + fake_rml_mapper) + result_notice = notice_repository.get(reference=notice_id) + assert result_notice.status == NoticeStatus.TRANSFORMED + + +def test_notice_transformer_by_id_function_with_invalid_ids(fake_rml_mapper, mongodb_client, fake_mapping_suite, + notice_2018, + notice_repository): + notice_2018._status = NoticeStatus.PREPROCESSED_FOR_TRANSFORMATION + notice_id = notice_2018.ted_id + mapping_suite_repository = MappingSuiteRepositoryMongoDB(mongodb_client=mongodb_client) + with pytest.raises(Exception) as e_info: + transform_notice_by_id(notice_id, fake_mapping_suite.identifier, notice_repository, mapping_suite_repository, + fake_rml_mapper) + result_notice = notice_repository.get(reference=notice_id) + assert result_notice is None + + def test_transform_test_data(fake_rml_mapper, fake_mapping_suite): notice_transformer = NoticeTransformer(mapping_suite=fake_mapping_suite, rml_mapper=fake_rml_mapper) with tempfile.TemporaryDirectory() as d: