Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve performance of _load_first_level for remote triplestores #756

Merged
merged 4 commits into from
Mar 9, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
123 changes: 76 additions & 47 deletions osp/core/session/db/triplestore_wrapper_session.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
"""A session connecting to a backend which stores the CUDS in triples."""

import uuid
import rdflib
from abc import abstractmethod
from uuid import UUID

from rdflib import RDF, URIRef

from osp.core.utils.wrapper_development import create_from_triples
from osp.core.utils.general import iri_from_uid, uid_from_iri
from osp.core.utils.general import CUDS_IRI_PREFIX, iri_from_uid, uid_from_iri
from osp.core.session.sparql_backend import SPARQLBackend
from osp.core.session.db.db_wrapper_session import DbWrapperSession
from abc import abstractmethod


class TripleStoreWrapperSession(DbWrapperSession, SPARQLBackend):
Expand Down Expand Up @@ -48,71 +50,70 @@ def _apply_deleted(self, root_obj, buffer):

# OVERRIDE
def _load_from_backend(self, uids, expired=None):
for uid in uids:
iri = iri_from_uid(uid)
yield self._load_by_iri(iri)
yield from self._load_by_iri(*(iri_from_uid(uid) for uid in uids))

# OVERRIDE
def _load_first_level(self):
triple = (iri_from_uid(self.root), None, None)
root_iri = iri_from_uid(self.root)
zero_iri = iri_from_uid(UUID(int=0))
triple = (root_iri, None, None)
triple = next(self._substitute_root_iri([triple]))
iris = {
o for s, p, o in self._triples(triple)
if isinstance(o, rdflib.URIRef)
o
for s, p, o in self._triples(triple)
if isinstance(o, URIRef)
and self._is_cuds_iri_ontology(o)
and uid_from_iri(o) != uuid.UUID(int=0)
and o != zero_iri
}
iris.add(iri_from_uid(self.root))
for iri in iris:
self._load_by_iri(iri)
iris |= {root_iri}
for _ in self._load_by_iri(*iris):
pass # Just exhaust the iterator so that CUDS are actually loaded.

# OVERRIDE
def _load_by_oclass(self, oclass):
uids = {
uid_from_iri(s)
for s, _, _ in self._triples((None, rdflib.RDF.type, oclass.iri))
for s, _, _ in self._triples((None, RDF.type, oclass.iri))
}
uids = {x if x != uuid.UUID(int=0) else self. root for x in uids}
uids = {x if x != UUID(int=0) else self.root for x in uids}
yield from self._load_from_backend(uids)

def _substitute_root_iri(self, triples):
from osp.core.utils.general import CUDS_IRI_PREFIX
for triple in triples:
yield tuple(iri_from_uid(uuid.UUID(int=0))
yield tuple(iri_from_uid(UUID(int=0))
if x is not None and x.startswith(CUDS_IRI_PREFIX)
and uid_from_iri(x) == self.root else x
for x in triple)

def _substitute_zero_iri(self, triples):
from osp.core.utils.general import CUDS_IRI_PREFIX
for triple in triples:
yield tuple(iri_from_uid(self.root)
if x is not None and x.startswith(CUDS_IRI_PREFIX)
and uid_from_iri(x) == uuid.UUID(int=0) else x
and uid_from_iri(x) == UUID(int=0) else x
for x in triple)

def _load_by_iri(self, iri):
"""Load the CUDS object wit the given IRI.
def _load_by_iri(self, *iris: URIRef):
"""Load the CUDS objects with the given IRIs.

Args:
iri (rdflib.IRI): The IRI of the CUDS object to oad.
iris: The IRIs of the CUDS objects to oad.

Returns:
Cuds - The CUDS object with the given IRI.
"""
if iri == iri_from_uid(self.root):
iri = iri_from_uid(uuid.UUID(int=0))
triples, neighbor_triples = self._load_triples_for_iri(iri)

triples = self._substitute_zero_iri(triples)
neighbor_triples = self._substitute_zero_iri(neighbor_triples)

return create_from_triples(
triples=triples,
neighbor_triples=neighbor_triples,
session=self,
fix_neighbors=False
)
root_iri = iri_from_uid(self.root)
zero_iri = iri_from_uid(UUID(int=0))
iris = map(lambda x: x if x != root_iri else zero_iri, iris)

for triples, neighbor_triples in self._load_triples_for_iris(*iris):
triples = self._substitute_zero_iri(triples)
neighbor_triples = self._substitute_zero_iri(neighbor_triples)
yield create_from_triples(
triples=triples,
neighbor_triples=neighbor_triples,
session=self,
fix_neighbors=False
)

@abstractmethod
def _triples(self, pattern):
Expand Down Expand Up @@ -145,17 +146,45 @@ def _remove(self, pattern):
Each can be None.
"""

def _load_triples_for_iri(self, iri):
"""Load the all triples for the CUDS object with the given IRI.
def _load_triples_for_iris(self, *iris: URIRef):
"""Load the all triples for the CUDS objects with the given IRIs.

Args:
iri (Tuple): The IRI of the CUDS object to load the triples for.
iris: The IRIs of the CUDS objects to load the triples for.
"""
triples = set(self._triples((iri, None, None)))
type_triples_of_neighbors = set()
for s, p, o in triples:
if isinstance(o, rdflib.URIRef):
type_triples_of_neighbors |= set(
self._triples((o, rdflib.RDF.type, None))
)
return triples, type_triples_of_neighbors
if not iris:
yield from ()

try: # Triples via SPARQL (fast with remote store, only one request).
query_string_template = f"""
SELECT ?s ?p ?o ?t WHERE {{
?s ?p ?o .
VALUES ?s {{ %s }}
OPTIONAL {{ ?o <{RDF.type}> ?t . }}
}}
"""
query_result = self._sparql(
query_string_template
% ' '.join((f"<{s}>" for s in iris))
)
result = dict()
for row in query_result:
result[row['s']] = result.get(row['s'], (set(), set()))
triples, type_triples_of_neighbors = result[row['s']]
triples |= {(row['s'], row['p'], row['o'])}
if row['t'] is not None:
type_triples_of_neighbors |= {(row['o'],
RDF.type,
row['t'])}
del query_result
yield from result.values()
except NotImplementedError: # Fall back to triple patterns.
for iri in iris:
triples = set(self._triples((iri, None, None)))
type_triples_of_neighbors = {
triple
for _, __, o in triples
if isinstance(o, URIRef)
for triple in self._triples((o, RDF.type, None))
}
yield triples, type_triples_of_neighbors
28 changes: 15 additions & 13 deletions osp/core/utils/general.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,15 @@
These are potentially useful for every user of SimPhoNy.
"""

from typing import Optional, Union, TextIO, List
import io
import itertools
import json
import logging
import requests
import io
import pathlib
import json
import uuid
from typing import Optional, Union, TextIO, List
from uuid import UUID

import requests
from rdflib.parser import Parser as RDFLib_Parser
from rdflib.serializer import Serializer as RDFLib_Serializer
from rdflib.plugin import get as get_plugin
Expand All @@ -31,10 +32,11 @@ def _silent_warn(*args, **kwargs) -> None:
warnings.warn = _silent_warn
from rdflib_jsonld.parser import to_rdf as json_to_rdf
warnings.warn = warn

from osp.core.namespaces import cuba
from osp.core.ontology.cuba import rdflib_cuba
from osp.core.ontology.relationship import OntologyRelationship
from osp.core.ontology.datatypes import convert_from
from osp.core.ontology.relationship import OntologyRelationship


CUDS_IRI_PREFIX = "http://www.osp-core.com/cuds#"
Expand Down Expand Up @@ -220,7 +222,7 @@ def _deserialize_cuds_object(json_doc, session=None, buffer_context=None):
first = g.value(rdflib_cuba._serialization, RDF.first)
if first: # return the element marked as first later
try:
first = uuid.UUID(hex=first)
first = UUID(hex=first)
except ValueError:
first = URIRef(first)
g.remove((rdflib_cuba._serialization, RDF.first, None))
Expand Down Expand Up @@ -267,7 +269,7 @@ def _import_rdf_file(path, format="xml", session=None, buffer_context=None):
first = g.value(rdflib_cuba._serialization, RDF.first)
if first: # return the element marked as first later
try:
first = uuid.UUID(hex=first)
first = UUID(hex=first)
except ValueError:
first = URIRef(first)
g.remove((rdflib_cuba._serialization, RDF.first, None))
Expand All @@ -284,16 +286,16 @@ def _import_rdf_file(path, format="xml", session=None, buffer_context=None):
# Internal utilities (not user-facing).


def iri_from_uid(uid):
def iri_from_uid(uid: Union[UUID, URIRef]) -> URIRef:
"""Transform an uid to an IRI.

Args:
uid (Union[UUID, URIRef]): The UUID to transform.
uid: The UUID to transform.

Returns:
URIRef: The IRI of the CUDS object with the given UUID.
The IRI of the CUDS object with the given UUID.
"""
if type(uid) is uuid.UUID:
if type(uid) is UUID:
return URIRef(CUDS_IRI_PREFIX + str(uid))
else:
return uid
Expand All @@ -310,7 +312,7 @@ def uid_from_iri(iri):
"""
if iri.startswith(CUDS_IRI_PREFIX):
try:
return uuid.UUID(hex=str(iri)[len(CUDS_IRI_PREFIX):])
return UUID(hex=str(iri)[len(CUDS_IRI_PREFIX):])
except ValueError as e:
raise ValueError(f"Unable to transform {iri} to uid.") \
from e
Expand Down