From 8eb39d899a923dd72a00e24eddf2adbb47dac901 Mon Sep 17 00:00:00 2001
From: CaptainOfHacks
Date: Tue, 29 Nov 2022 13:54:00 +0200
Subject: [PATCH 1/2] Update entity_deduplication.py

---
 .../services/entity_deduplication.py | 45 +++++++++++++------
 1 file changed, 32 insertions(+), 13 deletions(-)

diff --git a/ted_sws/master_data_registry/services/entity_deduplication.py b/ted_sws/master_data_registry/services/entity_deduplication.py
index 112b8d917..74e05a59f 100644
--- a/ted_sws/master_data_registry/services/entity_deduplication.py
+++ b/ted_sws/master_data_registry/services/entity_deduplication.py
@@ -1,7 +1,7 @@
 import pathlib
 import tempfile
 from io import StringIO
-from typing import List, Set, Tuple, Dict
+from typing import List, Tuple, Dict
 import rdflib
 from pymongo import MongoClient
 from rdflib import RDF, URIRef, OWL
@@ -12,9 +12,9 @@
 from ted_sws.core.model.notice import Notice
 from ted_sws.data_manager.adapters.notice_repository import NoticeRepository
 from ted_sws.data_manager.adapters.sparql_endpoint import SPARQLStringEndpoint
-from ted_sws.data_manager.adapters.triple_store import FusekiAdapter, TripleStoreABC, FusekiException, \
+from ted_sws.data_manager.adapters.triple_store import FusekiAdapter, TripleStoreABC, \
     FUSEKI_REPOSITORY_ALREADY_EXIST_ERROR_MSG
-from ted_sws.event_manager.services.log import log_error
+from ted_sws.event_manager.services.log import log_error, log_notice_error
 from ted_sws.master_data_registry.services.rdf_fragment_processor import get_rdf_fragments_by_cet_uri_from_notices, \
     merge_rdf_fragments_into_graph, write_rdf_fragments_in_triple_store, RDF_FRAGMENT_FROM_NOTICE_PROPERTY, \
     get_subjects_by_cet_uri
@@ -22,6 +22,7 @@
 MDR_TEMPORARY_FUSEKI_DATASET_NAME = "tmp_mdr_dataset"
 MDR_FUSEKI_DATASET_NAME = "mdr_dataset"
 MDR_CANONICAL_CET_PROPERTY = rdflib.term.URIRef("http://www.meaningfy.ws/mdr#isCanonicalEntity")
+DEDUPLICATE_PROCEDURE_ENTITIES_DOMAIN_ACTION = "deduplicate_procedure_entities"
 
 
 def generate_mdr_alignment_links(merged_rdf_fragments: rdflib.Graph, cet_uri: str,
@@ -248,9 +249,18 @@ def deduplicate_procedure_entities(notices: List[Notice], procedure_cet_uri: str
         rdf_content = parent_notice.rdf_manifestation.object_data
         sparql_endpoint = SPARQLStringEndpoint(rdf_content=rdf_content)
         result_uris = get_subjects_by_cet_uri(sparql_endpoint=sparql_endpoint, cet_uri=procedure_cet_uri)
-        assert len(result_uris) == 1
-        parent_procedure_uri = rdflib.URIRef(result_uris[0])
-        parent_uries[parent_notice_id] = parent_procedure_uri
+        result_uris_len = len(result_uris)
+        if result_uris_len != 1:
+            notice_normalised_metadata = parent_notice.normalised_metadata
+            log_notice_error(
+                message=f"Parent notice with notice_id=[{parent_notice.ted_id}] have {result_uris_len} Procedure CETs!",
+                notice_id=parent_notice.ted_id, domain_action=DEDUPLICATE_PROCEDURE_ENTITIES_DOMAIN_ACTION,
+                notice_form_number=notice_normalised_metadata.form_number if notice_normalised_metadata else None,
+                notice_status=parent_notice.status,
+                notice_eforms_subtype=notice_normalised_metadata.eforms_subtype if notice_normalised_metadata else None)
+        else:
+            parent_procedure_uri = rdflib.URIRef(result_uris[0])
+            parent_uries[parent_notice_id] = parent_procedure_uri
 
     for parent_uri_key in parent_uries.keys():
         parent_uri = parent_uries[parent_uri_key]
@@ -258,10 +268,19 @@
             rdf_content = child_notice.rdf_manifestation.object_data
             sparql_endpoint = SPARQLStringEndpoint(rdf_content=rdf_content)
             result_uris = get_subjects_by_cet_uri(sparql_endpoint=sparql_endpoint, cet_uri=procedure_cet_uri)
-            assert len(result_uris) == 1
-            child_procedure_uri = rdflib.URIRef(result_uris[0])
-            inject_links = rdflib.Graph()
-            inject_links.add((child_procedure_uri, OWL.sameAs, parent_uri))
-            child_notice.distilled_rdf_manifestation.object_data = '\n'.join(
-                [child_notice.distilled_rdf_manifestation.object_data,
-                 str(inject_links.serialize(format="nt"))])
+            result_uris_len = len(result_uris)
+            if result_uris_len != 1:
+                notice_normalised_metadata = child_notice.normalised_metadata
+                log_notice_error(
+                    message=f"Child notice with notice_id=[{child_notice.ted_id}] have {result_uris_len} Procedure CETs!",
+                    notice_id=child_notice.ted_id, domain_action=DEDUPLICATE_PROCEDURE_ENTITIES_DOMAIN_ACTION,
+                    notice_form_number=notice_normalised_metadata.form_number if notice_normalised_metadata else None,
+                    notice_status=child_notice.status,
+                    notice_eforms_subtype=notice_normalised_metadata.eforms_subtype if notice_normalised_metadata else None)
+            else:
+                child_procedure_uri = rdflib.URIRef(result_uris[0])
+                inject_links = rdflib.Graph()
+                inject_links.add((child_procedure_uri, OWL.sameAs, parent_uri))
+                child_notice.distilled_rdf_manifestation.object_data = '\n'.join(
+                    [child_notice.distilled_rdf_manifestation.object_data,
+                     str(inject_links.serialize(format="nt"))])

From 071ad440a0a316aaffafa8eda253b879f9721076 Mon Sep 17 00:00:00 2001
From: CaptainOfHacks
Date: Tue, 29 Nov 2022 14:15:53 +0200
Subject: [PATCH 2/2] fix aggregation query to allow disk usage

---
 .../services/create_batch_collection_materialised_view.py  | 2 +-
 .../services/create_notice_collection_materialised_view.py | 4 ++--
 2 files changed, 3 insertions(+), 3 deletions(-)

diff --git a/ted_sws/data_manager/services/create_batch_collection_materialised_view.py b/ted_sws/data_manager/services/create_batch_collection_materialised_view.py
index c0ef7c529..04003c672 100644
--- a/ted_sws/data_manager/services/create_batch_collection_materialised_view.py
+++ b/ted_sws/data_manager/services/create_batch_collection_materialised_view.py
@@ -30,4 +30,4 @@ def create_batch_collection_materialised_view(mongo_client: MongoClient):
         {
             "$out": NOTICE_PROCESS_BATCH_COLLECTION_NAME
         }
-    ])
+    ], allowDiskUse=True)
diff --git a/ted_sws/data_manager/services/create_notice_collection_materialised_view.py b/ted_sws/data_manager/services/create_notice_collection_materialised_view.py
index fa790ac9d..0a008c379 100644
--- a/ted_sws/data_manager/services/create_notice_collection_materialised_view.py
+++ b/ted_sws/data_manager/services/create_notice_collection_materialised_view.py
@@ -40,7 +40,7 @@ def create_notice_collection_materialised_view(mongo_client: MongoClient):
         {
             "$out": NOTICES_MATERIALISED_VIEW_NAME
         }
-    ])
+    ], allowDiskUse=True)
     materialised_view = database[NOTICES_MATERIALISED_VIEW_NAME]
     materialised_view.create_index([("created_at", DESCENDING)])
     materialised_view.create_index([("publication_date", DESCENDING)])
@@ -80,4 +80,4 @@ def create_notice_kpi_collection(mongo_client: MongoClient):
         {
             "$out": NOTICE_KPI_COLLECTION_NAME
         }
-    ])
+    ], allowDiskUse=True)
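
A note on the first patch (PATCH 1/2): it replaces the hard assert len(result_uris) == 1 with a logged error plus skip, so a notice with an unexpected number of Procedure CETs no longer aborts the whole deduplication run with an AssertionError. Below is a minimal sketch of that guard pattern; the helper name is illustrative and plain print-based reporting stands in for the project's log_notice_error.

from typing import List, Optional

def pick_single_procedure_uri(result_uris: List[str], notice_id: str) -> Optional[str]:
    # Illustrative only, not part of ted_sws: return the single Procedure subject URI,
    # or report the anomaly and tell the caller to skip this notice.
    if len(result_uris) != 1:
        print(f"Notice {notice_id} has {len(result_uris)} Procedure CETs, expected exactly 1; skipping.")
        return None
    return result_uris[0]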
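A note on the second patch (PATCH 2/2): its only functional change is passing allowDiskUse=True to the three aggregate() calls, which lets memory-heavy stages such as $group or $sort spill to temporary files on disk instead of failing once a stage exceeds MongoDB's in-memory limit. Below is a minimal pymongo sketch of the same pattern, with placeholder connection, collection and pipeline names rather than the project's real ones.

from pymongo import MongoClient

client = MongoClient("mongodb://localhost:27017")
collection = client["example_db"]["example_collection"]

# Group a large collection and materialise the result; allowDiskUse=True permits
# the server to use temporary disk files when a stage exceeds its RAM budget.
collection.aggregate(
    [
        {"$group": {"_id": "$status", "count": {"$sum": 1}}},
        {"$out": "example_materialised_view"},
    ],
    allowDiskUse=True,
)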