Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add procedure deduplication #348

Merged
merged 3 commits into from
Nov 9, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions dags/pipelines/notice_batch_processor_pipelines.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
from typing import List
from pymongo import MongoClient

from ted_sws.master_data_registry.services.entity_deduplication import deduplicate_procedure_entities

CET_URIS = ["http://www.w3.org/ns/org#Organization"]
PROCEDURE_CET_URI = "http://data.europa.eu/a4g/ontology#Procedure"


def notices_batch_distillation_pipeline(notice_ids: List[str], mongodb_client: MongoClient) -> List[str]:
Expand All @@ -23,6 +26,7 @@ def notices_batch_distillation_pipeline(notice_ids: List[str], mongodb_client: M
notices.append(notice)
for cet_uri in CET_URIS:
deduplicate_entities_by_cet_uri(notices=notices, cet_uri=cet_uri)
deduplicate_procedure_entities(notices=notices, procedure_cet_uri=PROCEDURE_CET_URI, mongodb_client=mongodb_client)
for notice in notices:
notice_repository.update(notice=notice)
return notice_ids
51 changes: 49 additions & 2 deletions ted_sws/master_data_registry/services/entity_deduplication.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,21 @@
from io import StringIO
from typing import List, Set, Tuple, Dict
import rdflib
from pymongo import MongoClient
from rdflib import RDF, URIRef, OWL
from collections import defaultdict

from ted_sws.alignment_oracle.services.generate_alignment_links import generate_alignment_links, TURTLE_SOURCE_DATA_TYPE
from ted_sws.alignment_oracle.services.limes_config_resolver import get_limes_config_generator_by_cet_uri
from ted_sws.core.model.notice import Notice
from ted_sws.data_manager.adapters.notice_repository import NoticeRepository
from ted_sws.data_manager.adapters.sparql_endpoint import SPARQLStringEndpoint
from ted_sws.data_manager.adapters.triple_store import FusekiAdapter, TripleStoreABC, FusekiException, \
FUSEKI_REPOSITORY_ALREADY_EXIST_ERROR_MSG
from ted_sws.event_manager.services.log import log_error
from ted_sws.master_data_registry.services.rdf_fragment_processor import get_rdf_fragments_by_cet_uri_from_notices, \
merge_rdf_fragments_into_graph, write_rdf_fragments_in_triple_store, RDF_FRAGMENT_FROM_NOTICE_PROPERTY
merge_rdf_fragments_into_graph, write_rdf_fragments_in_triple_store, RDF_FRAGMENT_FROM_NOTICE_PROPERTY, \
get_subjects_by_cet_uri

MDR_TEMPORARY_FUSEKI_DATASET_NAME = "tmp_mdr_dataset"
MDR_FUSEKI_DATASET_NAME = "mdr_dataset"
Expand Down Expand Up @@ -81,7 +86,7 @@ def copy_rdf_graph(graph: rdflib.Graph) -> rdflib.Graph:
if found_transition:
alignment_graph = copy_rdf_graph(graph=reduced_graph)
canonical_cets = [canonical_cet for canonical_cet in canonical_cets
if next(alignment_graph.triples(triple=(None,OWL.sameAs , canonical_cet)),None)]
if next(alignment_graph.triples(triple=(None, OWL.sameAs, canonical_cet)), None)]
return reduced_graph


Expand Down Expand Up @@ -218,3 +223,45 @@ def deduplicate_entities_by_cet_uri(notices: List[Notice], cet_uri: str,
alignment_graph=cet_alignment_links)
inject_similarity_links_in_notices(notices=notices, cet_rdf_fragments_dict=new_canonical_cet_fragments_dict,
alignment_graph=cet_alignment_links, inject_reflexive_links=True)


def _get_single_procedure_uri(rdf_content: str, procedure_cet_uri: str, notice_label: str) -> rdflib.URIRef:
    """
    Return the URI of the only procedure entity found in the given RDF content.
    :param rdf_content: RDF content of a notice, passed as-is to SPARQLStringEndpoint
    :param procedure_cet_uri: URI of the procedure concept used to select subjects
    :param notice_label: human-readable description of the notice, used only in the error message
    :return: the single procedure URI as an rdflib.URIRef
    :raises AssertionError: if the content does not yield exactly one procedure subject
    """
    sparql_endpoint = SPARQLStringEndpoint(rdf_content=rdf_content)
    result_uris = get_subjects_by_cet_uri(sparql_endpoint=sparql_endpoint, cet_uri=procedure_cet_uri)
    # Raised explicitly instead of using an `assert` statement, so the check is not
    # stripped when Python runs with -O; the exception type stays AssertionError.
    if len(result_uris) != 1:
        raise AssertionError(
            f"Expected exactly one subject of type {procedure_cet_uri} in {notice_label}, "
            f"found {len(result_uris)}.")
    return rdflib.URIRef(result_uris[0])


def deduplicate_procedure_entities(notices: List[Notice], procedure_cet_uri: str, mongodb_client: MongoClient):
    """
    Link the procedure entity of each notice in the batch to the procedure entity of its parent notice.

    For every notice that references a parent notice (first value of original_metadata.RN), an
    owl:sameAs triple "child procedure -> parent procedure" is appended, in N-Triples form, to the
    notice's distilled RDF manifestation. Notices are modified in place; nothing is persisted here.
    Notices without a parent reference, or whose parent has no RDF manifestation content in the
    repository, are left untouched.
    :param notices: batch of notices to process
    :param procedure_cet_uri: URI of the procedure concept used to find the procedure entity of a notice
    :param mongodb_client: MongoDB client used to load the parent notices
    :return: None
    :raises AssertionError: if a parent or child notice does not hold exactly one procedure entity
    """
    # Group the notices of the batch by the id of the parent notice they reference.
    notice_families = defaultdict(list)
    for notice in notices:
        if notice.original_metadata and notice.original_metadata.RN:
            parent_reference = notice.original_metadata.RN[0]
            # The first four characters are moved behind a dash, giving ids shaped like "445564-2020"
            # (presumably RN is "<year><number>" -- confirm against the metadata normaliser).
            parent_notice_id = f"{parent_reference[4:]}-{parent_reference[:4]}"
            notice_families[parent_notice_id].append(notice)

    # Resolve the procedure URI of every parent before touching any child notice,
    # so a validation failure on a parent leaves the whole batch unmodified.
    parent_procedure_uris = {}
    notice_repository = NoticeRepository(mongodb_client=mongodb_client)
    for parent_notice_id in notice_families:
        parent_notice = notice_repository.get(reference=parent_notice_id)
        if parent_notice and parent_notice.rdf_manifestation and parent_notice.rdf_manifestation.object_data:
            parent_procedure_uris[parent_notice_id] = _get_single_procedure_uri(
                rdf_content=parent_notice.rdf_manifestation.object_data,
                procedure_cet_uri=procedure_cet_uri,
                notice_label=f"parent notice {parent_notice_id}")

    for parent_notice_id, parent_procedure_uri in parent_procedure_uris.items():
        for child_notice in notice_families[parent_notice_id]:
            rdf_manifestation = child_notice.rdf_manifestation
            distilled_rdf_manifestation = child_notice.distilled_rdf_manifestation
            # Same guard as for the parent: a child without RDF content has no procedure
            # to link, and one without distilled content has nothing to append to.
            if not (rdf_manifestation and rdf_manifestation.object_data):
                continue
            if distilled_rdf_manifestation is None or distilled_rdf_manifestation.object_data is None:
                continue
            child_procedure_uri = _get_single_procedure_uri(
                rdf_content=rdf_manifestation.object_data,
                procedure_cet_uri=procedure_cet_uri,
                notice_label=f"a child notice of {parent_notice_id}")
            inject_links = rdflib.Graph()
            inject_links.add((child_procedure_uri, OWL.sameAs, parent_procedure_uri))
            serialized_links = inject_links.serialize(format="nt")
            # rdflib versions before 6 return bytes from serialize(); str() on bytes would
            # produce a "b'...'" literal and corrupt the RDF content, so decode instead.
            if isinstance(serialized_links, bytes):
                serialized_links = serialized_links.decode("utf-8")
            distilled_rdf_manifestation.object_data = '\n'.join(
                [distilled_rdf_manifestation.object_data, str(serialized_links)])
5 changes: 5 additions & 0 deletions tests/e2e/master_data_registry/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,3 +23,8 @@ def rdf_content(rdf_file_path) -> str:
@pytest.fixture
def organisation_cet_uri() -> str:
    """Provide the URI string used by the tests as the organisation concept."""
    cet_uri = "http://www.w3.org/ns/org#Organization"
    return cet_uri


@pytest.fixture
def procedure_cet_uri() -> str:
    """Provide the URI string used by the tests as the procedure concept."""
    cet_uri = "http://data.europa.eu/a4g/ontology#Procedure"
    return cet_uri
73 changes: 72 additions & 1 deletion tests/e2e/master_data_registry/test_entity_deduplication.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,26 @@
from typing import Tuple

import rdflib
from rdflib import OWL

from ted_sws import config
from ted_sws.core.model.notice import Notice, NoticeStatus
from ted_sws.data_manager.adapters.mapping_suite_repository import MappingSuiteRepositoryMongoDB
from ted_sws.data_manager.adapters.notice_repository import NoticeRepository
from ted_sws.data_manager.adapters.sparql_endpoint import SPARQLStringEndpoint
from ted_sws.data_manager.adapters.triple_store import FusekiAdapter
from ted_sws.master_data_registry.services.entity_deduplication import deduplicate_entities_by_cet_uri
from ted_sws.data_sampler.services.notice_xml_indexer import index_notice
from ted_sws.mapping_suite_processor.services.conceptual_mapping_processor import \
mapping_suite_processor_from_github_expand_and_load_package_in_mongo_db
from ted_sws.master_data_registry.services.entity_deduplication import deduplicate_entities_by_cet_uri, \
deduplicate_procedure_entities
from ted_sws.master_data_registry.services.rdf_fragment_processor import get_subjects_by_cet_uri
from ted_sws.notice_fetcher.adapters.ted_api import TedAPIAdapter, TedRequestAPI
from ted_sws.notice_fetcher.services.notice_fetcher import NoticeFetcher
from ted_sws.notice_metadata_processor.services.metadata_normalizer import normalise_notice
from ted_sws.notice_metadata_processor.services.notice_eligibility import notice_eligibility_checker
from ted_sws.notice_transformer.adapters.rml_mapper import RMLMapper
from ted_sws.notice_transformer.services.notice_transformer import transform_notice

TEST_MDR_REPOSITORY = "tmp_mdr_test_repository"
TEST_QUERY_UNIQUE_NAMES = """SELECT distinct ?name
Expand All @@ -16,6 +34,8 @@
?s <http://www.meaningfy.ws/mdr#isCanonicalEntity> True .
}
"""
CHILD_NOTICE_ID = "003544-2021"
PARENT_NOTICE_ID = "445564-2020"


def test_deduplicate_entities_by_cet_uri(notice_with_rdf_manifestation, organisation_cet_uri):
Expand Down Expand Up @@ -51,3 +71,54 @@ def test_deduplicate_entities_by_cet_uri(notice_with_rdf_manifestation, organisa
assert str(triple[2]) in canonical_cets_set

fuseki_triple_store.delete_repository(repository_name=TEST_MDR_REPOSITORY)


def execute_transform_pipeline(notice_id: str, mongodb_client, procedure_cet_uri) -> Tuple[rdflib.URIRef, Notice]:
    """
    Fetch a notice and run it through indexing, normalisation, eligibility check and transformation.

    The transformed notice gets a copy of its RDF manifestation as distilled RDF manifestation
    and is stored back in the notice repository.
    :param notice_id: id of the notice to fetch
    :param mongodb_client: MongoDB client backing the notice and mapping suite repositories
    :param procedure_cet_uri: URI of the procedure concept used to find the procedure entity
    :return: the single procedure URI found in the RDF manifestation, and the transformed notice
    """
    repository = NoticeRepository(mongodb_client=mongodb_client)
    ted_api = TedAPIAdapter(request_api=TedRequestAPI())
    fetcher = NoticeFetcher(notice_repository=repository, ted_api_adapter=ted_api)
    fetcher.fetch_notice_by_id(notice_id)

    fetched_notice = repository.get(reference=notice_id)
    normalised = normalise_notice(notice=index_notice(notice=fetched_notice))

    suites = MappingSuiteRepositoryMongoDB(mongodb_client=mongodb_client)
    eligibility = notice_eligibility_checker(notice=normalised, mapping_suite_repository=suites)
    assert eligibility is not None
    # Only the mapping suite id of the eligibility result is needed from here on.
    _, suite_id = eligibility

    normalised.update_status_to(new_status=NoticeStatus.PREPROCESSED_FOR_TRANSFORMATION)
    transformed = transform_notice(notice=normalised,
                                   mapping_suite=suites.get(reference=suite_id),
                                   rml_mapper=RMLMapper(rml_mapper_path=config.RML_MAPPER_PATH))
    distilled_copy = transformed.rdf_manifestation.copy()
    transformed.set_distilled_rdf_manifestation(distilled_rdf_manifestation=distilled_copy)
    repository.update(notice=transformed)

    endpoint = SPARQLStringEndpoint(rdf_content=transformed.rdf_manifestation.object_data)
    procedure_uris = get_subjects_by_cet_uri(sparql_endpoint=endpoint, cet_uri=procedure_cet_uri)
    assert len(procedure_uris) == 1

    return rdflib.URIRef(procedure_uris[0]), transformed


def test_deduplicate_procedure_entities(procedure_cet_uri, fake_mongodb_client):
    """
    Check that deduplication adds exactly one owl:sameAs link from the child procedure to the parent procedure.
    """
    mapping_suite_processor_from_github_expand_and_load_package_in_mongo_db(
        mongodb_client=fake_mongodb_client,
        mapping_suite_package_name="package_F03")

    # The child is transformed first, then the parent; both end up in the fake repository.
    pipeline_args = dict(mongodb_client=fake_mongodb_client, procedure_cet_uri=procedure_cet_uri)
    child_procedure_uri, child_notice = execute_transform_pipeline(notice_id=CHILD_NOTICE_ID, **pipeline_args)
    parent_procedure_uri, _ = execute_transform_pipeline(notice_id=PARENT_NOTICE_ID, **pipeline_args)

    deduplicate_procedure_entities(notices=[child_notice],
                                   procedure_cet_uri=procedure_cet_uri,
                                   mongodb_client=fake_mongodb_client)

    distilled_graph = rdflib.Graph()
    distilled_graph.parse(data=child_notice.distilled_rdf_manifestation.object_data, format="ttl")

    same_as_links = list(distilled_graph.triples((child_procedure_uri, OWL.sameAs, parent_procedure_uri)))
    assert len(same_as_links) == 1