Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: set annotations in an async way when submitting a manifest #1440

Merged
merged 25 commits into from
Jun 21, 2024
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 37 additions & 17 deletions poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ pygsheets = "^2.0.4"
PyYAML = "^6.0.0"
rdflib = "^6.0.0"
setuptools = "^66.0.0"
synapseclient = "^4.1.0"
synapseclient = "^4.3.0"
tenacity = "^8.0.1"
toml = "^0.10.2"
great-expectations = "^0.15.0"
Expand All @@ -74,6 +74,8 @@ Flask = {version = "2.1.3", optional = true}
Flask-Cors = {version = "^3.0.10", optional = true}
uWSGI = {version = "^2.0.21", optional = true}
Jinja2 = {version = ">2.11.3", optional = true}
# NOTE(review): asyncio has been part of the Python standard library since 3.4;
# the PyPI "asyncio" 3.4.3 distribution is an old backport - confirm it is needed.
asyncio = "^3.4.3"
# NOTE(review): pytest-asyncio is a pytest plugin; presumably it belongs in the
# dev/test dependency group rather than here - verify against the enclosing table.
pytest-asyncio = "^0.23.7"

[tool.poetry.extras]
api = ["connexion", "Flask", "Flask-Cors", "Jinja2", "pyopenssl"]
Expand Down
142 changes: 112 additions & 30 deletions schematic/store/synapse.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import secrets
linglp marked this conversation as resolved.
Show resolved Hide resolved
import shutil
import synapseclient
from synapseclient.api import get_entity_id_bundle2
import uuid # used to generate unique names for entities

from tenacity import (
Expand All @@ -23,7 +24,7 @@
from time import sleep

# allows specifying explicit variable types
from typing import Dict, List, Tuple, Sequence, Union, Optional
from typing import Dict, List, Tuple, Sequence, Union, Optional, Any

from synapseclient import (
Synapse,
Expand Down Expand Up @@ -68,6 +69,9 @@
from schematic.store.base import BaseStorage
from schematic.exceptions import AccessCredentialsError
from schematic.configuration.configuration import CONFIG
from synapseclient.models.annotations import Annotations
import asyncio
from dataclasses import asdict

logger = logging.getLogger("Synapse storage")

Expand Down Expand Up @@ -700,7 +704,6 @@ def fill_in_entity_id_filename(
new_files = self._get_file_entityIds(
dataset_files=dataset_files, only_new_files=True, manifest=manifest
)

# update manifest so that it contains new dataset files
new_files = pd.DataFrame(new_files)
manifest = (
Expand Down Expand Up @@ -1335,8 +1338,43 @@ def upload_manifest_file(

return manifest_synapse_file_id

async def get_async_annotation(self, synapse_id: str) -> Dict[str, Any]:
    """Asynchronously fetch the entity bundle, annotations included, for an entity.

    Args:
        synapse_id (str): Synapse id of the entity whose annotations are requested.

    Returns:
        Dict[str, Any]: The requested entity bundle matching
            <https://rest-docs.synapse.org/rest/org/sagebionetworks/repo/model/entitybundle/v2/EntityBundle.html>
    """
    # Ask only for the annotations portion of the bundle.
    bundle_request = {"includeAnnotations": True}
    entity_bundle = await get_entity_id_bundle2(
        entity_id=synapse_id,
        request=bundle_request,
        synapse_client=self.syn,
    )
    return entity_bundle

async def store_async_annotation(self, annotation_dict: dict) -> Annotations:
    """Store an entity's annotations asynchronously.

    Args:
        annotation_dict (dict): Dictionary whose "annotations" entry carries the
            "annotations", "etag" and "id" values read below.

    Returns:
        Annotations: The stored annotations.
    """
    payload = annotation_dict["annotations"]

    # Convert the raw annotation payload into the model's representation.
    parsed_annotations = Annotations.from_dict(
        synapse_annotations=payload["annotations"]
    )
    to_store = Annotations(
        annotations=parsed_annotations,
        etag=payload["etag"],
        id=payload["id"],
    )
    stored = await to_store.store_async(self.syn)
    return stored
linglp marked this conversation as resolved.
Show resolved Hide resolved

@missing_entity_handler
def format_row_annotations(
async def format_row_annotations(
self, dmge, row, entityId: str, hideBlanks: bool, annotation_keys: str
):
# prepare metadata for Synapse storage (resolve display name into a name that Synapse annotations support, e.g. no spaces or parentheses)
Expand Down Expand Up @@ -1368,7 +1406,8 @@ def format_row_annotations(

metadataSyn[keySyn] = v
# set annotation(s) for the various objects/items in a dataset on Synapse
annos = self.syn.get_annotations(entityId)
annos = await self.get_async_annotation(entityId)

csv_list_regex = comma_separated_list_regex()
for anno_k, anno_v in metadataSyn.items():
# Remove keys with nan or empty string values from dict of annotations to be uploaded
Expand Down Expand Up @@ -1653,7 +1692,37 @@ def _create_entity_id(self, idx, row, manifest, datasetId):
manifest.loc[idx, "entityId"] = entityId
return manifest, entityId

def add_annotations_to_entities_files(
async def _store_annos(self, requests):
    """Wait on annotation tasks and store each formatted result.

    Each completed task yields one of:

    * an ``Annotations`` instance - a store request that has finished; it is
      only logged;
    * any other truthy value - formatted annotations for one entity, which are
      handed to ``store_async_annotation`` in a new task added to the set of
      outstanding tasks;
    * a falsy value (e.g. ``None``) - nothing to store; it is skipped.

    Args:
        requests: Set of asyncio tasks to wait on.

    Raises:
        RuntimeError: If reading or handling a task result fails. The original
            exception is chained as the cause.
    """
    while requests:
        done_tasks, pending_tasks = await asyncio.wait(
            requests, return_when=asyncio.FIRST_COMPLETED
        )
        # Follow-up store tasks are added to the still-pending set so the
        # loop keeps running until they have completed as well.
        requests = pending_tasks

        for completed_task in done_tasks:
            try:
                annos = completed_task.result()

                if isinstance(annos, Annotations):
                    logger.info(f"Successfully stored annotations for {annos.id}")
                elif annos:
                    # Check for a usable result *before* subscripting it, so a
                    # task that produced nothing is skipped instead of failing.
                    entity_id = annos["EntityId"]
                    logger.info(
                        f"Obtained and processed annotations for {entity_id} entity"
                    )
                    requests.add(
                        asyncio.create_task(
                            self.store_async_annotation(annotation_dict=annos)
                        )
                    )
            except Exception as e:
                raise RuntimeError(f"failed with { repr(e) }.") from e

async def add_annotations_to_entities_files(
self,
dmge,
manifest,
Expand Down Expand Up @@ -1694,6 +1763,7 @@ def add_annotations_to_entities_files(
).drop("entityId_x", axis=1)

# Fill `entityId` for each row if missing and annotate entity as appropriate
requests = set()
for idx, row in manifest.iterrows():
if not row["entityId"] and (
manifest_record_type == "file_and_entities"
Expand All @@ -1713,8 +1783,14 @@ def add_annotations_to_entities_files(

# Adding annotations to connected files.
if entityId:
self._add_annotations(dmge, row, entityId, hideBlanks, annotation_keys)
logger.info(f"Added annotations to entity: {entityId}")
# Format annotations for Synapse
annos_task = asyncio.create_task(
self.format_row_annotations(
dmge, row, entityId, hideBlanks, annotation_keys
)
)
requests.add(annos_task)
await self._store_annos(requests)
return manifest

def upload_manifest_as_table(
Expand Down Expand Up @@ -1767,14 +1843,16 @@ def upload_manifest_as_table(
)

if file_annotations_upload:
manifest = self.add_annotations_to_entities_files(
dmge,
manifest,
manifest_record_type,
datasetId,
hideBlanks,
manifest_synapse_table_id,
annotation_keys,
manifest = asyncio.run(
self.add_annotations_to_entities_files(
dmge,
manifest,
manifest_record_type,
datasetId,
hideBlanks,
manifest_synapse_table_id,
annotation_keys,
)
)
# Load manifest to synapse as a CSV File
manifest_synapse_file_id = self.upload_manifest_file(
Expand Down Expand Up @@ -1840,13 +1918,15 @@ def upload_manifest_as_csv(
manifest_synapse_file_id (str): SynID of manifest csv uploaded to synapse.
"""
if file_annotations_upload:
manifest = self.add_annotations_to_entities_files(
dmge,
manifest,
manifest_record_type,
datasetId,
hideBlanks,
annotation_keys=annotation_keys,
manifest = asyncio.run(
self.add_annotations_to_entities_files(
dmge,
manifest,
manifest_record_type,
datasetId,
hideBlanks,
annotation_keys=annotation_keys,
)
)

# Load manifest to synapse as a CSV File
Expand Down Expand Up @@ -1917,14 +1997,16 @@ def upload_manifest_combo(
)

if file_annotations_upload:
manifest = self.add_annotations_to_entities_files(
dmge,
manifest,
manifest_record_type,
datasetId,
hideBlanks,
manifest_synapse_table_id,
annotation_keys=annotation_keys,
manifest = asyncio.run(
linglp marked this conversation as resolved.
Show resolved Hide resolved
self.add_annotations_to_entities_files(
dmge,
manifest,
manifest_record_type,
datasetId,
hideBlanks,
manifest_synapse_table_id,
annotation_keys=annotation_keys,
)
)

# Load manifest to synapse as a CSV File
Expand Down
Loading
Loading