Skip to content

Commit

Permalink
Merge commit '5ffa276f31a1c257ec82dad83ffe25fa5d92e231' into aggregate-reads
Browse files Browse the repository at this point in the history
  • Loading branch information
kedhammar committed Jan 22, 2025
2 parents bef3088 + 5ffa276 commit 42ad826
Show file tree
Hide file tree
Showing 9 changed files with 307 additions and 148 deletions.
10 changes: 9 additions & 1 deletion VERSIONLOG.md
Original file line number Diff line number Diff line change
@@ -1,9 +1,17 @@
# Scilifelab_epps Version Log

## 20250117.1
## 20250122.3

Renovate the reads-aggregation EPP to include ONT / AVITI.

## 20250122.2

Rebuild EPP to fetch last recorded derived sample UDF.

## 20250122.1

Create yearly dir for AVITI run manifests.

## 20250116.1

Ruff 0.9.2 formatting.
Expand Down
32 changes: 9 additions & 23 deletions scilifelab_epps/calc_from_args/udf_arg_methods.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
import logging
from typing import Any

import yaml
from genologics.entities import Artifact, Process

from scilifelab_epps.utils import udf_tools
Expand All @@ -23,7 +22,6 @@ def fetch_from_arg(
"""

history: str | None = None
source: Artifact | Process
source_name: str

Expand All @@ -47,19 +45,11 @@ def fetch_from_arg(
value = process.udf[arg_dict["udf"]]
else:
if arg_dict["recursive"]:
# Fetch UDF recursively, back-tracking the input-output tuple
if arg_dict["source"] == "input":
use_current = False
else:
assert arg_dict["source"] == "output"
use_current = True

value, history = udf_tools.fetch_last(
currentStep=process,
art_tuple=art_tuple,
# Fetch UDF recursively

value = udf_tools.fetch_last(
target_art=source,
target_udfs=arg_dict["udf"],
use_current=use_current,
print_history=True,
)
else:
# Fetch UDF from input or output artifact
Expand All @@ -78,15 +68,11 @@ def fetch_from_arg(
else:
return on_fail

# Log what has been done
log_str = f"Fetched UDF '{arg_dict['udf']}': {value} from {arg_dict['source']} '{source_name}'."

if history:
history_yaml = yaml.load(history, Loader=yaml.FullLoader)
last_step_name = history_yaml[-1]["Step name"]
last_step_id = history_yaml[-1]["Step ID"]
log_str += f"\n\tUDF recusively fetched from step: '{last_step_name}' (ID: '{last_step_id}')"

log_str = (
f"Fetched UDF '{arg_dict['udf']}': {value}"
+ f"{' (recursive)' if arg_dict['recursive'] else ''}"
+ f" from {arg_dict['source']} '{source_name}'."
)
logging.info(log_str)

return value
Expand Down
266 changes: 149 additions & 117 deletions scilifelab_epps/utils/udf_tools.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import json
from typing import Union
import logging
import xml.etree.ElementTree as ET
from typing import Any, Union

from genologics.entities import Artifact, Process
from requests.exceptions import HTTPError
Expand All @@ -9,6 +11,32 @@
"""


def process_has_udfs(process: Process, target_udfs: list[str]) -> list[str]:
    """Check whether any target UDFs are present in the sample fields of the process associated type.
    This function is necessary because a non-required sample UDF left blank will not be detected in the artifact object.
    Returns a list of found UDFs, or an empty list if none were found.
    """

    # The XML of the process-associated type declares which sample-level UDFs the step exposes
    type_tree_root = ET.fromstring(process.type.xml())

    # Names of all sample fields declared by the process type, in document order
    declared_names = [
        sample_field.attrib["name"] for sample_field in type_tree_root.iter("sample-field")
    ]

    # Keep every target UDF matching a declared sample field (document order preserved)
    return [
        target_udf
        for declared_name in declared_names
        for target_udf in target_udfs
        if declared_name == target_udf
    ]


def put(target: Artifact | Process, target_udf: str, val, on_fail=AssertionError):
"""Try to put UDF on artifact or process, optionally without causing fatal error.
Evaluates true on success and error (default) or on_fail param on failure.
Expand Down Expand Up @@ -39,22 +67,6 @@ def is_filled(art: Artifact, target_udf: str) -> bool:
return False


def no_outputs(currentStep: Process) -> bool:
    """Check whether step has outputs or not"""

    # Fetch the input/output tuples of the step, if any
    art_tuples = get_art_tuples(currentStep)

    if not art_tuples:
        # A step without any I/O tuples is treated as having no outputs
        return True

    # True only when every tuple lacks an output artifact
    return all(art_tuple[1] is None for art_tuple in art_tuples)


def get_art_tuples(currentStep: Process) -> list:
"""Return I/O tuples whose elements are either
1) both analytes
Expand Down Expand Up @@ -135,125 +147,145 @@ def list_udfs(art: Artifact) -> list:


def fetch_last(
currentStep: Process,
art_tuple: tuple,
target_art: Artifact,
target_udfs: str | list,
use_current=True,
print_history=False,
include_current=False,
log_traceback=False,
return_traceback=False,
on_fail=AssertionError,
):
) -> Any | tuple[Any, dict]:
"""Recursively look for target UDF.
Target UDF can be supplied as a string, or as a prioritized list of strings.
Arguments:
target_art Artifact to traceback.
If "print_history" == True, will return both the target metric and the lookup history as a string.
target_udfs The UDF(s) to look for. Can be supplied as a string, or as a prioritized
list of strings.
include_current If True, will pull target UDFs if found in the target artifact.
log_traceback If True, will log the full traceback.
return_traceback If True, will additionally return the traceback as a dict.
on_fail If this is a subclass of Exception, will raise this exception on failure.
If not, will return this value on failure instead of the UDF value.
"""

# Convert to list, to enable iteration
if isinstance(target_udfs, str):
target_udfs = [target_udfs]

history = []

while True:
history.append({"Step name": currentStep.type.name, "Step ID": currentStep.id})

# Try to grab input and output articles, if possible
try:
input_art = art_tuple[0]["uri"]
except:
input_art = None
try:
output_art = art_tuple[1]["uri"]
except:
output_art = None
# Instantiate traceback
traceback = []
steps_visited = []

if len(history) == 1 and use_current is not True:
# If we are in the original step and "use_current" is false, skip
pass
else:
# Look trough outputs
if output_art:
history[-1].update(
{
"Derived sample ID": output_art.id,
"Derived sample UDFs": dict(output_art.udf.items()),
# Instantiate recursion variables
current_art = target_art
n = 1
try:
# Start recursive search
while True:
# Dynamically reassign parent process
pp = current_art.parent_process

# Keep track of visited parent processes
if pp is not None:
steps_visited.append(f"'{pp.type.name}' ({pp.id})")
target_udfs_in_parent_process = process_has_udfs(pp, target_udfs)

traceback.append(
{
"Artifact": {
"Name": current_art.name,
"ID": current_art.id,
"UDFs": dict(current_art.udf.items()),
"Parent Step": {
"Name": pp.type.name if pp else None,
"ID": pp.id if pp else None,
},
}
)
}
)

for target_udf in target_udfs:
if target_udf in list_udfs(output_art):
if print_history is True:
return output_art.udf[target_udf], json.dumps(
history, indent=2
# Search for correct UDF
for target_udf in target_udfs:
if target_udf in list_udfs(current_art):
if include_current is not True and n == 1:
logging.info(
"Target UDF was found in specified artifact, but include_current is set to False. Skipping."
)
else:
if log_traceback is True:
logging.info(
f"Traceback:\n{json.dumps(traceback, indent=2)}"
)
logging.info(
f"Found target UDF '{target_udf}'"
+ f" with value '{current_art.udf[target_udf]}'"
+ f" in process {steps_visited[-1]}"
+ f" {'output' if pp else 'input'}"
+ f" artifact '{current_art.name}' ({current_art.id})"
)

if return_traceback:
return current_art.udf[target_udf], traceback
else:
return output_art.udf[target_udf]

# Look through inputs
if input_art:
if input_art.parent_process:
history[-1].update(
{
"Input sample parent step name": input_art.parent_process.type.name,
"Input sample parent step ID": input_art.parent_process.id,
}
)
history[-1].update(
{
"Input sample ID": input_art.id,
"Input sample UDFs": dict(input_art.udf.items()),
}
return current_art.udf[target_udf]

# Address the case that no target UDFs were found on the artifact, even though they were present in the parent process
if pp is not None and target_udfs_in_parent_process != []:
logging.warning(
f"Parent process '{pp.type.name}' ({pp.id})"
+ f" has target UDF(s) {target_udfs_in_parent_process},"
+ f" but it's not filled in for artifact '{current_art.name}' ({current_art.id})."
+ " Please double check that you haven't missed filling it in."
)
for target_udf in target_udfs:
if target_udf in list_udfs(input_art):
if print_history is True:
return input_art.udf[target_udf], json.dumps(
history, indent=2
)
else:
return input_art.udf[target_udf]

# Cycle to previous step, if possible
try:
pp = input_art.parent_process
assert pp is not None

pp_tuples = get_art_tuples(pp)
matching_tuples = []
for pp_tuple in pp_tuples:
try:
pp_input = pp_tuple[0]["uri"]
except:
pp_input = None
try:
pp_output = pp_tuple[1]["uri"]
except:
pp_output = None

if (pp_input and pp_input.id == input_art.id) or (
pp_output and pp_output.id == input_art.id
):
matching_tuples.append(pp_tuple)

assert len(matching_tuples) == 1, (
"Target artifact matches multiple inputs/outputs in previous step."
)
# Stop traceback if no parent process is found
if pp is None:
raise AssertionError(
f"Artifact '{current_art.name}' ({current_art.id}) has no parent process linked and can't be traced back further."
)

pp_art_tuples = get_art_tuples(pp)

# Back-tracking successful, re-assign variables to represent previous step
currentStep = pp
art_tuple = matching_tuples[0]
# If parent process has valid input-output tuples, use for linkage
linked_input_arts = []
if pp_art_tuples != []:
for pp_tuple in pp_art_tuples:
if pp_tuple[1]["uri"].id == current_art.id:
linked_input_arts.append(pp_tuple[0]["uri"])
else:
raise NotImplementedError(
"Parent process has no valid input-output links, traceback can't continue."
)

except AssertionError:
if isinstance(on_fail, type) and issubclass(on_fail, Exception):
if print_history is True:
print(json.dumps(history, indent=2))
raise on_fail(
f"Could not find matching UDF(s) [{', '.join(target_udfs)}] for artifact tuple {art_tuple}"
if len(linked_input_arts) == 1:
# Dynamically reassign current artifact
current_art = linked_input_arts[0]
elif len(linked_input_arts) > 1:
raise AssertionError(
"Parent process has multiple input artifacts linked to the same output artifact, can't traceback."
)
else:
if print_history is True:
print(json.dumps(history, indent=2))
return on_fail, json.dumps(history, indent=2)
else:
return on_fail
raise AssertionError(
"Parent process has no input artifacts linked to the output artifact, can't traceback."
)

n += 1

except AssertionError:
if isinstance(on_fail, type) and issubclass(on_fail, Exception):
raise on_fail(
f"Could not find matching UDF(s) [{', '.join(target_udfs)}] for artifact '{target_art.name}' ({target_art.id})"
)
else:
logging.warning(
f"Failed traceback for artifact '{target_art.name}' ({target_art.id}), falling back to value '{on_fail}'"
)
if return_traceback:
return on_fail, traceback
else:
return on_fail
Loading

0 comments on commit 42ad826

Please sign in to comment.