Skip to content

Commit

Permalink
live testing dev
Browse files Browse the repository at this point in the history
  • Loading branch information
kedhammar committed Dec 20, 2024
1 parent f3381bb commit ff412b0
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 24 deletions.
43 changes: 32 additions & 11 deletions scilifelab_epps/utils/udf_tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,19 +136,36 @@ def list_udfs(art: Artifact) -> list:

def fetch_last(
currentStep: Process,
art_tuple: tuple,
target_udfs: str | list,
art_tuple: tuple = None,
art: Artifact = None,
use_current=True,
print_history=False,
on_fail=AssertionError,
):
"""Recursively look for target UDF.
Target UDF can be supplied as a string, or as a prioritized list of strings.
Arguments:
- "art_tuple": step input-output tuple. Mutually exclusive use with "art".
- "art": step artifact, either input or output. Mutually exclusive use with "art_tuple".
- "target_udfs": can be supplied as a string, or as a
prioritized list of strings.
If "print_history" == True, will return both the target metric and the lookup history as a string.
- "use_current": if true, will return the target metric
if found in the current step.
- "print_history": if true, will return both the target
metric and the lookup history as a string.
"""

assert art_tuple or art, "One of function args 'art_tuple' and 'art' are required."
assert not (
art_tuple and art
), "Function args 'art_tuple' and 'art' are mutually exclusive."

# Convert to list, to enable iteration
if isinstance(target_udfs, str):
target_udfs = [target_udfs]
Expand All @@ -158,15 +175,19 @@ def fetch_last(
while True:
history.append({"Step name": currentStep.type.name, "Step ID": currentStep.id})

# Try to grab input and output articles, if possible
try:
input_art = art_tuple[0]["uri"]
except:
input_art = None
try:
output_art = art_tuple[1]["uri"]
except:
if len(history) == 1 and not art_tuple:
# Handle the case of having an art instead of an art_tuple in the original step
input_art = art
output_art = None
else:
try:
input_art = art_tuple[0]["uri"]
except:
input_art = None
try:
output_art = art_tuple[1]["uri"]
except:
output_art = None

if len(history) == 1 and use_current is not True:
# If we are in the original step and "use_current" is false, skip
Expand Down
43 changes: 30 additions & 13 deletions scripts/fetch_last_known_field.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from datetime import datetime as dt

from genologics.config import BASEURI, PASSWORD, USERNAME
from genologics.entities import Process
from genologics.entities import Artifact, Process
from genologics.lims import Lims

from scilifelab_epps.utils import udf_tools
Expand All @@ -30,41 +30,58 @@ def main(args):
lims = Lims(BASEURI, USERNAME, PASSWORD)
process = Process(lims, id=args.pid)

# Get the target UDF from the step field
target_udf = process.udf.get(args.step_udf, None)
if target_udf is None or target_udf == "None":
logging.error(f"No target UDF supplied from step field '{args.step_udf}'")
assert (
target_udf is not None or target_udf != "None"
), f"No target UDF supplied from step field '{args.step_udf}'"

no_outputs = udf_tools.no_outputs(process)
# Check whether process has output artifacts, not the case for e.g. QC steps
no_outputs: bool = udf_tools.no_outputs(process)

# Load input artifacts
arts_in: list[Artifact] = [
art for art in process.all_inputs() if art.type == "Analyte"
]

# Find target output artifacts, if any
if no_outputs:
logging.info("Step has no output artifacts. Assigning to input artifact.")
else:
art_tuples: list[tuple[dict]] = process.input_output_maps
art_in2out: dict[Process.Artifact : Process.Artifact] = {
i["uri"]: o["uri"]
for i, o in art_tuples
if i["uri"].type == "Analyte" and o["uri"].type == "Analyte"
}

# TODO need to tweak this script and possible the traceback function to handle both
# TODO aggregate QC and regular steps
art_tuples = udf_tools.get_art_tuples(process) # TODO this returns []
for art_tuple in art_tuples:
target_artifact = art_tuple[0]["uri"] if no_outputs else art_tuple[1]["uri"]
for art_in in arts_in:
if no_outputs:
target_artifact = art_in
else:
target_artifact = art_in2out[art_in]
logging.info(
f"Looking for last recorded UDF '{target_udf}' of sample '{target_artifact.name}'..."
f"Looking for last recorded UDF '{target_udf}' of {'input' if no_outputs else 'output'} artifact '{target_artifact.name}'..."
)
udf_value, udf_history = udf_tools.fetch_last(
currentStep=process,
art_tuple=art_tuple,
art=art_in,
target_udfs=target_udf,
use_current=False,
print_history=True,
on_fail=None,
)
if udf_value:
logging.info(f"Found target UDF '{target_udf}' with value '{udf_value}'")
logging.info(f"Traceback:\n{udf_history}")
target_artifact.udf[target_udf] = udf_value
target_artifact.put()
logging.info(
f"Updated UDF '{target_udf}' for '{art_tuple[1]['uri'].name}' to '{udf_value}'"
f"Updated UDF '{target_udf}' for '{art_in.name}' to '{udf_value}'"
)
else:
logging.warning(
f"Could not traceback UDF '{target_udf}' for '{art_tuple[1]['uri'].name}'"
f"Could not traceback UDF '{target_udf}' for '{art_in.name}'"
)
logging.info(f"Traceback:\n{udf_history}")

Expand Down

0 comments on commit ff412b0

Please sign in to comment.