diff --git a/scripts/locate-data-objects b/scripts/locate-data-objects index c4dd72c9..0c211d43 100755 --- a/scripts/locate-data-objects +++ b/scripts/locate-data-objects @@ -26,7 +26,7 @@ import structlog from partisan.irods import AVU, query_metadata from sqlalchemy.orm import Session -from npg_irods import illumina +from npg_irods import illumina, ont from npg_irods.cli import add_logging_arguments, configure_logging, parse_iso_date from npg_irods.db import DBConfig from npg_irods.db.mlwh import find_consent_withdrawn_samples @@ -110,16 +110,15 @@ cwdr_parser.set_defaults(func=consent_withdrawn) ilup_parser = subparsers.add_parser( "illumina-updates", - help="Find data objects which are the components of Illumina runs, whose tracking " + help="Find data objects, which are components of Illumina runs, whose tracking " "metadata in the ML warehouse have changed since a specified time.", ) ilup_parser.add_argument( "--begin-date", "--begin_date", - help="Limit data object found to those changed after this date. " - "Defaults to 14 days ago. " - "The argument must be an ISO8601 UTC date or date and time e.g. 2022-01-30, " - "2022-01-30T11:11:03Z", + help="Limit data objects found to those changed after this date. Defaults to 14 " + "days ago. The argument must be an ISO8601 UTC date or date and time e.g. " + "2022-01-30, 2022-01-30T11:11:03Z", type=parse_iso_date, default=datetime.now(timezone.utc) - timedelta(days=14), ) @@ -139,15 +138,14 @@ def illumina_updates(cli_args): log.info("Finding data objects", item=i, component=c, since=iso_date) try: - for obj in query_metadata( + avus = [ AVU(Instrument.RUN, c.id_run), AVU(Instrument.LANE, c.position), AVU(SeqConcept.TAG_INDEX, c.tag_index), - data_object=True, - collection=False, - zone=cli_args.zone, - ): + ] + for obj in query_metadata(*avus, collection=False, zone=cli_args.zone): print(obj) + except Exception as e: num_errors += 1 log.error(e, item=i, component=c) @@ -159,6 +157,62 @@ def illumina_updates(cli_args): ilup_parser.set_defaults(func=illumina_updates) + +ontup_parser = subparsers.add_parser( + "ont-updates", + help="Find data objects, which are components of ONT runs, whose tracking metadata " + "in the ML warehouse have changed since a specified time.", +) +ontup_parser.add_argument( + "--begin-date", + "--begin_date", + help="Limit data objects found to those changed after this date. Defaults to 14 " + "days ago. The argument must be an ISO8601 UTC date or date and time e.g. " + "2022-01-30, 2022-01-30T11:11:03Z", + type=parse_iso_date, + default=datetime.now(timezone.utc) - timedelta(days=14), +) + + +def ont_updates(cli_args): + dbconfig = DBConfig.from_file(cli_args.database_config.name, "mlwh_ro") + engine = sqlalchemy.create_engine(dbconfig.url) + with Session(engine) as session: + num_processed = num_errors = 0 + iso_date = cli_args.begin_date.strftime("%Y-%m-%dT%H:%M:%SZ") + + for i, c in enumerate( + ont.find_components_changed( + session, include_tags=False, since=cli_args.begin_date + ) + ): + num_processed += 1 + log.info("Finding collections", item=i, component=c, since=iso_date) + + try: + avus = [ + AVU(ont.Instrument.EXPERIMENT_NAME, c.experiment_name), + AVU(ont.Instrument.INSTRUMENT_SLOT, c.instrument_slot), + ] + if c.tag_identifier is not None: + avus.append(AVU(ont.Instrument.TAG_IDENTIFIER, c.tag_identifier)) + + for coll in query_metadata( + *avus, data_object=False, zone=cli_args.zone + ): + print(coll) + + except Exception as e: + num_errors += 1 + log.error(e, item=i, component=c) + + log.info(f"Processed {num_processed} with {num_errors} errors") + if num_errors: + exit(1) + + +ontup_parser.set_defaults(func=ont_updates) + args = parser.parse_args() configure_logging( config_file=args.log_config, diff --git a/scripts/update-ont-metadata b/scripts/update-ont-metadata index 90788b21..9bb80308 100755 --- a/scripts/update-ont-metadata +++ b/scripts/update-ont-metadata @@ -27,7 +27,7 @@ from sqlalchemy.orm import Session from npg_irods.cli import add_logging_arguments, configure_logging, parse_iso_date from npg_irods.db import DBConfig -from npg_irods.ont import MetadataUpdate +from npg_irods.ont import update_metadata from npg_irods.version import version description = """ @@ -94,9 +94,8 @@ def main(): engine = sqlalchemy.create_engine(dbconfig.url) with Session(engine) as session: - mu = MetadataUpdate(zone=args.zone) - num_processed, num_updated, num_errors = mu.update_secondary_metadata( - session, since=args.begin_date + num_processed, num_updated, num_errors = update_metadata( + session, since=args.begin_date, zone=args.zone ) if num_errors: diff --git a/src/npg_irods/common.py b/src/npg_irods/common.py index 89ace74f..64ef8026 100644 --- a/src/npg_irods/common.py +++ b/src/npg_irods/common.py @@ -22,10 +22,15 @@ import re from enum import Enum, unique from os import PathLike +from pathlib import PurePath from typing import Tuple +from partisan.irods import AC, AVU, Collection, DataObject, Permission from structlog import get_logger +from npg_irods.metadata.lims import has_mixed_ownership, is_managed_access + + log = get_logger(__package__) @@ -194,3 +199,88 @@ def infer_data_source(path: PathLike | str) -> Tuple[Platform, AnalysisType]: return Platform.ULTIMA_GENOMICS, AnalysisType.NUCLEIC_ACID_SEQUENCING raise ValueError(f"Failed to infer a data source for iRODS path '{path}'") + + +def do_metadata_update(item: Collection | DataObject, avus: list[AVU]) -> bool: + """Update metadata on an iRODS path, removing existing metadata and replacing with + the given AVUs and adding history of changes. + + Args: + item: iRODS path to update. + avus: Metadata to apply. + + Returns: + True if any changes were made, False if the desired metadata were already + present. + """ + log.info("Updating metadata", path=item, meta=avus) + num_removed, num_added = item.supersede_metadata(*avus, history=True) + log.info( + "Updated metadata", + path=item, + meta=avus, + num_added=num_added, + num_removed=num_removed, + ) + return num_removed > 0 or num_added > 0 + + +def do_permissions_update( + item: Collection | DataObject, acl: list[AC], recurse=False +) -> bool: + """Update permissions on an iRODS path, removing existing permissions and replacing + with the given ACL. If the ACL contains multiple, conflicting, managed permissions + then it will issue a warning and revoke access. + + Args: + item: iRODS path to update. + acl: ACL to apply. + recurse: If True, recursively apply the ACL. + + Returns: + True if changes any changes were made, False if the ACL(s) of the target path(s) + were already in the desired state. This applies recursively, so to return False for + a recursive operation, neither the target path, nor any contained path will have + been updated. + """ + if item.rods_type == DataObject and recurse: + raise ValueError( + f"Cannot recursively update permissions on a data object: {item}" + ) + + if has_mixed_ownership(acl): + log.warn("Mixed-study data", path=item, acl=acl) + for ac in acl: + if is_managed_access(ac): + ac.perm = Permission.NULL + + keep = [ac for ac in item.permissions() if not is_managed_access(ac)] + + log.info("Updating permissions", path=item, keep=keep, acl=acl) + + kwargs = {"recurse": recurse} if recurse else {} + num_removed, num_added = item.supersede_permissions(*keep, *acl, **kwargs) + log.info( + "Updated permissions", + path=item, + keep=keep, + acl=acl, + num_added=num_added, + num_removed=num_removed, + ) + return num_removed > 0 or num_added > 0 + + +def infer_zone(path: Collection | DataObject) -> str: + """Infer the iRODS zone from an iRODS path. + + Args: + path: An absolute iRODS path. + + Returns: + The zone. + """ + parts = PurePath(path).parts + if len(parts) < 2: + raise ValueError(f"Invalid iRODS path {path}; no zone component") + return parts[1] diff --git a/src/npg_irods/illumina.py b/src/npg_irods/illumina.py index be45aea2..6031bfe3 100644 --- a/src/npg_irods/illumina.py +++ b/src/npg_irods/illumina.py @@ -21,21 +21,19 @@ from dataclasses import dataclass from datetime import datetime from enum import Enum, unique -from pathlib import PurePath from typing import Iterator, Optional, Type -from partisan.irods import AVU, Collection, DataObject, Permission +from partisan.irods import AVU, Collection, DataObject from sqlalchemy import asc from sqlalchemy.orm import Session from structlog import get_logger +from npg_irods.common import do_metadata_update, do_permissions_update, infer_zone from npg_irods.db.mlwh import IseqFlowcell, IseqProductMetrics, Sample, Study from npg_irods.metadata.common import SeqConcept, SeqSubset from npg_irods.metadata.illumina import Instrument from npg_irods.metadata.lims import ( ensure_consent_withdrawn, - has_mixed_ownership, - is_managed_access, make_sample_acl, make_sample_metadata, make_study_metadata, @@ -143,7 +141,7 @@ def __repr__(self): def ensure_secondary_metadata_updated( - item: Collection | DataObject, mlwh_session, include_controls=False, zone=None + item: Collection | DataObject, mlwh_session, include_controls=False ) -> bool: """Update iRODS secondary metadata and permissions on Illumina run collections and data objects. @@ -187,24 +185,18 @@ def ensure_secondary_metadata_updated( mlwh_session: An open SQL session. include_controls: If True, include any control samples in the metadata and permissions. - zone: The iRODS zone for any permissions created. Returns: True if updated. """ - if zone is None: - parts = PurePath(item).parts - if len(parts) < 2: - raise ValueError(f"Invalid iRODS path {item}; no zone component") - zone = parts[1] - + zone = infer_zone(item) updated = False secondary_metadata, acl = [], [] components = [ Component.from_avu(avu) for avu in item.metadata(attr=SeqConcept.COMPONENT.value) - ] + ] # Illumina specific for c in components: for fc in find_flowcells_by_component( mlwh_session, c, include_controls=include_controls @@ -213,41 +205,12 @@ def ensure_secondary_metadata_updated( secondary_metadata.extend(make_study_metadata(fc.study)) acl.extend(make_sample_acl(fc.sample, fc.study, zone=zone)) - log.info("Updating metadata", path=item, meta=secondary_metadata) - num_removed, num_added = item.supersede_metadata(*secondary_metadata, history=True) - log.info( - "Updated metadata", - path=item, - meta=secondary_metadata, - num_added=num_added, - num_removed=num_removed, - ) - if num_removed or num_added: - updated = True + updated = True if do_metadata_update(item, secondary_metadata) else updated - if any(c.contains_nonconsented_human() for c in components): + if any(c.contains_nonconsented_human() for c in components): # Illumina specific updated = True if ensure_consent_withdrawn(item) else updated else: - if has_mixed_ownership(acl): - log.warn("Mixed-study data", path=item, acl=acl) - for ac in acl: - if is_managed_access(ac): - ac.perm = Permission.NULL - - keep = [ac for ac in item.permissions() if not is_managed_access(ac)] - - log.info("Updating permissions", path=item, keep=keep, acl=acl) - num_removed, num_added = item.supersede_permissions(*keep, *acl) - log.info( - "Updated permissions", - path=item, - keep=keep, - acl=acl, - num_added=num_added, - num_removed=num_removed, - ) - if num_removed or num_added: - updated = True + updated = True if do_permissions_update(item, acl) else updated return updated @@ -320,10 +283,10 @@ def find_components_changed(sess: Session, since: datetime) -> Iterator[Componen .join(IseqFlowcell.study) .join(IseqFlowcell.iseq_product_metrics) .filter( - (Sample.recorded_at > since) - | (Study.recorded_at > since) - | (IseqFlowcell.recorded_at > since) - | (IseqProductMetrics.last_changed > since) + (Sample.recorded_at >= since) + | (Study.recorded_at >= since) + | (IseqFlowcell.recorded_at >= since) + | (IseqProductMetrics.last_changed >= since) ) .order_by(asc(IseqFlowcell.id_iseq_flowcell_tmp)) ): diff --git a/src/npg_irods/metadata/ont.py b/src/npg_irods/metadata/ont.py index e2f9f374..0c53445e 100644 --- a/src/npg_irods/metadata/ont.py +++ b/src/npg_irods/metadata/ont.py @@ -1,6 +1,6 @@ # -*- coding: utf-8 -*- # -# Copyright © 2021, 2022 Genome Research Ltd. All rights reserved. +# Copyright © 2021, 2022, 2023 Genome Research Ltd. All rights reserved. # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by @@ -28,6 +28,7 @@ class Instrument(AsValueEnum, metaclass=with_namespace("ont")): EXPERIMENT_NAME = "experiment_name" INSTRUMENT_SLOT = "instrument_slot" + TAG_IDENTIFIER = "tag_identifier" def __repr__(self): return f"{Instrument.namespace}:{self.value}" diff --git a/src/npg_irods/ont.py b/src/npg_irods/ont.py index 8b8f56f3..0603c175 100644 --- a/src/npg_irods/ont.py +++ b/src/npg_irods/ont.py @@ -23,7 +23,7 @@ from dataclasses import dataclass from datetime import datetime from os import PathLike -from typing import Optional, Type +from typing import Iterator, Optional, Type from partisan.exception import RodsError from partisan.irods import AVU, Collection, DataObject, query_metadata @@ -31,8 +31,9 @@ from sqlalchemy.orm import Session from structlog import get_logger -from npg_irods.db.mlwh import OseqFlowcell -from npg_irods.metadata.common import SeqConcept, SeqSubset +from npg_irods.common import do_metadata_update, do_permissions_update, infer_zone +from npg_irods.db.mlwh import OseqFlowcell, Sample, Study +from npg_irods.metadata.common import SeqConcept from npg_irods.metadata.lims import ( is_managed_access, make_public_read_acl, @@ -56,174 +57,108 @@ @dataclass(order=True) class Component: - experiment_name: str - position: int - tag_index: Optional[int] - subset: Optional[SeqSubset] + """A set of reads from an ONT sequencing run.""" + experiment_name: str + """The experiment name recorded in MinKNOW.""" + instrument_slot: int + """The 1-based instrument slot index where the flowcell was run.""" -class MetadataUpdate: - """Performs updated on metadata of data objects and collections for ONT data in - iRODS.""" + tag_identifier: Optional[str] + """The tag identifier, if the reads are from a multiplexed pool.""" def __init__( - self, experiment_name: str = None, instrument_slot: int = None, zone: str = None + self, experiment_name: str, instrument_slot: int, tag_identifier: str = None ): - """Create a new metadata updater for the specified ONT run. - - Args: - experiment_name: The ONT experiment name. Optional; provide this to limit - the updates to only that experiment. - instrument_slot: The ONT instrument slot number. Optional; provide this to - limit the updates to only that slot. - zone: The iRODS zone where the data are located. Optional; provide this to - update on an iRODS zone other than local (i.e. on a federated zone) - """ - self.experiment_name = experiment_name self.instrument_slot = instrument_slot - self.zone = zone - - def update_secondary_metadata( - self, mlwh_session: Session, since: datetime = None - ) -> (int, int, int): - """Update iRODS secondary metadata on ONT run collections whose corresponding - ML warehouse records have been updated more recently than the specified time. - - Collections to update are identified by having ont:experiment_name and - ont:instrument_slot metadata already attached to them. This is done for example, - by the process which moves sequence data from the instrument into iRODS. + self.tag_identifier = tag_identifier - Args: - mlwh_session: An open SQL session. - since: A datetime. - Returns: - A tuple of the number of paths found, the number of paths whose metadata - was updated and the number of errors encountered. - """ - if since is None: - since = datetime.fromtimestamp(0) # Everything since the Epoch - - num_found, num_updated, num_errors = 0, 0, 0 - - try: - expt_slots = find_recent_expt_slot(mlwh_session, since=since) - except Exception as e: - num_errors += 1 - log.error(e) - return num_found, num_updated, num_errors - - for expt_name, slot in expt_slots: - expt_avu = AVU(Instrument.EXPERIMENT_NAME, expt_name) - slot_avu = AVU(Instrument.INSTRUMENT_SLOT, slot) - - try: - if self.experiment_name is not None: - if self.experiment_name != expt_name: - log.info( - "Skipping on experiment name", - expt_name=expt_name, - slot=slot, - ) - continue - if self.instrument_slot is not None: - if self.instrument_slot != slot: - log.info( - "Skipping on slot", - expt_name=expt_name, - slot=slot, - ) - continue - - log.info("Searching", expt_name=expt_name, slot=slot, zone=self.zone) - colls = query_metadata( - expt_avu, - slot_avu, - collection=True, - data_object=False, - zone=self.zone, - ) - - num_colls = len(colls) - num_found += num_colls - if num_colls: - log.info( - "Found collections", - expt_name=expt_name, - slot=slot, - num_coll=num_colls, - ) - else: - log.warn("Found no collections", expt_name=expt_name, slot=slot) - - for coll in colls: - try: - if annotate_results_collection( - coll, expt_name, slot, mlwh_session - ): - log.info( - "Updated", expt_name=expt_name, slot=slot, path=coll - ) - num_updated += 1 - else: - num_errors += 1 - except RodsError as re1: - log.error(re1.message, code=re1.code) - num_errors += 1 +def update_metadata( + mlwh_session: Session, + experiment_name=None, + instrument_slot=None, + since: datetime = None, + zone=None, +) -> (int, int, int): + """Update iRODS metadata on ONT run collections whose corresponding ML warehouse + records have been updated at, or more recently than, the specified time. + + Collections to update are identified by having ont:experiment_name and + ont:instrument_slot metadata already attached to them. This is done for example, + by the process which moves sequence data from the instrument into iRODS. - except RodsError as re2: - log.error(re2.message, code=re2.code) - num_errors += 1 - except Exception as e: - log.error(e) - num_errors += 1 + Args: + mlwh_session: An open SQL session. + experiment_name: Limit updates to this experiment. Optional. + instrument_slot: Limit updates to this instrument slot. Optional, requires + an experiment_name to be supplied. + since: A datetime. Limit updates to experiments changed at this time or later. + zone: The iRODS zone to search for metadata to update. - return num_found, num_updated, num_errors + Returns: + A tuple of the number of paths found, the number of paths whose metadata + was updated and the number of errors encountered. + """ + if since is None: + since = datetime.fromtimestamp(0) # Everything since the Epoch - def __str__(self): - return ( - f"" + if experiment_name is None and instrument_slot is not None: + raise ValueError( + f"An instrument_slot {instrument_slot} was supplied " + "without an experiment_name" ) + num_found, num_updated, num_errors = 0, 0, 0 -def tag_index_from_id(tag_identifier: str) -> int: - """Return the barcode tag index given a barcode tag identifier. - - Returns: int - """ - match = TAG_IDENTIFIER_REGEX.search(tag_identifier) - if match: - return int(match.group(TAG_IDENTIFIER_GROUP)) + for i, c in enumerate( + find_components_changed(mlwh_session, include_tags=False, since=since) + ): + if experiment_name is not None and c.experiment_name != experiment_name: + continue + if instrument_slot is not None and c.instrument_slot != instrument_slot: + continue - raise ValueError( - f"Invalid ONT tag identifier '{tag_identifier}'. " - f"Expected a value matching {TAG_IDENTIFIER_REGEX}" - ) + avus = [ + AVU(Instrument.EXPERIMENT_NAME, c.experiment_name), + AVU(Instrument.INSTRUMENT_SLOT, c.instrument_slot), + ] + try: + log.info("Searching", item=i, comp=c, zone=zone) + colls = query_metadata(*avus, data_object=False, zone=zone) -def barcode_name_from_id(tag_identifier: str) -> str: - """Return the barcode name given a barcode tag identifier. The name is used most - often for directory naming in ONT experiment results. + num_colls = len(colls) + num_found += num_colls + if num_colls: + log.info("Found collections", item=i, comp=c, num_coll=num_colls) + else: + log.warn("Found no collections", item=i, comp=c) + + for coll in colls: + try: + if annotate_results_collection(coll, c, mlwh_session): + log.info("Updated", item=i, path=coll, comp=c) + num_updated += 1 + else: + num_errors += 1 + except RodsError as re1: + log.error(re1.message, item=i, code=re1.code) + num_errors += 1 - Returns: str - """ - match = TAG_IDENTIFIER_REGEX.search(tag_identifier) - if match: - return f"barcode{match.group(TAG_IDENTIFIER_GROUP) :0>2}" + except RodsError as re2: + log.error(re2.message, item=i, code=re2.code) + num_errors += 1 + except Exception as e: + log.error(e) + num_errors += 1 - raise ValueError( - f"Invalid ONT tag identifier '{tag_identifier}'. " - f"Expected a value matching {TAG_IDENTIFIER_REGEX}" - ) + return num_found, num_updated, num_errors def annotate_results_collection( - path: PathLike | str, - experiment_name: str, - instrument_slot: int, - mlwh_session: Session, + path: PathLike | str, component: Component, mlwh_session: Session ) -> bool: """Add or update metadata on an existing iRODS collection containing ONT data. @@ -236,96 +171,53 @@ def annotate_results_collection( Args: path: A collection path to annotate. - experiment_name: The ONT experiment name. - instrument_slot: The ONT instrument slot number. + component: A Component describing the portion of an instrument mlwh_session: An open SQL session. Returns: True on success. """ - log.debug( - "Searching the ML warehouse", expt_name=experiment_name, slot=instrument_slot - ) + c = component + log.debug("Searching the ML warehouse", comp=c) - fc_info = find_flowcell_by_expt_slot(mlwh_session, experiment_name, instrument_slot) - if not fc_info: - log.warn( - "Failed to find flowcell information in the ML warehouse", - expt_name=experiment_name, - slot=instrument_slot, - ) + flowcells = find_flowcells_by_component(mlwh_session, c) + if not flowcells: + log.warn("Failed to find flowcell information in the ML warehouse", comp=c) return False coll = Collection(path) if not coll.exists(): - log.warn( - "Collection does not exist", - path=coll, - expt_name=experiment_name, - slot=instrument_slot, - ) + log.warn("Collection does not exist", path=coll, comp=c) return False avus = [ - AVU(Instrument.EXPERIMENT_NAME, experiment_name), - AVU(Instrument.INSTRUMENT_SLOT, instrument_slot), + AVU(Instrument.EXPERIMENT_NAME, c.experiment_name), + AVU(Instrument.INSTRUMENT_SLOT, c.instrument_slot), ] coll.supersede_metadata(*avus) # These AVUs should be present already # A single fc record (for non-multiplexed data) - if len(fc_info) == 1: - log.info( - "Found non-multiplexed", expt_name=experiment_name, slot=instrument_slot - ) - fc = fc_info[0] + if len(flowcells) == 1: + log.info("Found non-multiplexed", comp=c) try: - coll.supersede_metadata(*make_study_metadata(fc.study), history=True) - coll.supersede_metadata(*make_sample_metadata(fc.sample), history=True) - - # Keep the access controls that we don't manage - keep = [ac for ac in coll.permissions() if not is_managed_access(ac)] - coll.supersede_permissions( - *keep, *make_sample_acl(fc.sample, fc.study), recurse=True - ) + # Secondary metadata. Updating this here is an optimisation to reduce + # turn-around-time. If we don't update, we just have to wait for a + # cron job to call `ensure_secondary_metadata_updated`. + _do_metadata_and_permissions_update(coll, flowcells) except RodsError as e: log.error(e.message, code=e.code) return False return True - log.info("Found multiplexed", expt_name=experiment_name, slot=instrument_slot) + log.info("Found multiplexed", comp=c) num_errors = 0 # Since the run report files are outside the 'root' collection for # multiplexed runs, their permissions must be set explicitly - reports = [ - obj - for obj in coll.contents() - if "report" in str(obj.path) and obj.rods_type == DataObject - ] - for report in reports: - try: - if report.exists(): - log.info( - "Updating run report permissions", - path=report.path, - expt_name=experiment_name, - slot=instrument_slot, - ) - keep = [ac for ac in report.permissions() if not is_managed_access(ac)] - report.supersede_permissions(*keep, *make_public_read_acl()) - else: - log.warn( - "Run report missing", - path=report.path, - expt_name=experiment_name, - slot=instrument_slot, - ) - except RodsError as e: - log.error(e.message, code=e.code) - num_errors += 1 + num_errors += _set_minknow_reports_public(coll) - sub_colls = [c for c in coll.contents() if c.rods_type == Collection] + sub_colls = [item for item in coll.contents() if item.rods_type == Collection] # This expects the barcode directory naming style created by current ONT's # Guppy de-multiplexer which creates several subdirectories e.g. "fast5_pass", @@ -339,45 +231,40 @@ def annotate_results_collection( continue # Multiple fc records (one per plex of multiplexed data) - for fc in fc_info: + for fc in flowcells: try: - bc_path = sc.path / barcode_name_from_id(fc.tag_identifier) - bc_coll = Collection(bc_path) - if not bc_coll.exists(): - log.warn( - "Collection missing", - path=bc_path, - expt_name=experiment_name, - slot=instrument_slot, - tag_id=fc.tag_identifier, - ) + bpath = sc.path / barcode_name_from_id(fc.tag_identifier) + bcoll = Collection(bpath) + bcomp = Component( + c.experiment_name, c.instrument_slot, fc.tag_identifier + ) + if not bcoll.exists(): + log.warn("Collection missing", path=bpath, comp=bcomp) continue log.info( "Annotating", - path=bc_coll, - expt_name=experiment_name, - slot=instrument_slot, - tag_id=fc.tag_identifier, + path=bcoll, + comp=bcomp, sample=fc.sample, study=fc.study, ) - bc_coll.supersede_metadata( + # Primary metadata. We are adding the full tag identifier to enable ML + # warehouse lookup given just an iRODS path as a starting point. The tag + # identifier to tag index transformation loses information (the tag + # prefix), so the existing tag index AVU doesn't allow this. + bcoll.supersede_metadata( + AVU(Instrument.TAG_IDENTIFIER, fc.tag_identifier), AVU(SeqConcept.TAG_INDEX, tag_index_from_id(fc.tag_identifier)), history=True, ) - bc_coll.supersede_metadata(*make_study_metadata(fc.study), history=True) - bc_coll.supersede_metadata( - *make_sample_metadata(fc.sample), history=True - ) - # The ACL could be different for each plex - # Keep the access controls that we don't manage - keep = [ac for ac in bc_coll.permissions() if not is_managed_access(ac)] - bc_coll.supersede_permissions( - *keep, *make_sample_acl(fc.sample, fc.study), recurse=True - ) + # Secondary metadata. Updating this here is an optimisation to reduce + # turn-around-time. If we don't update, we just have to wait for a + # cron job to call `ensure_secondary_metadata_updated`. + _do_metadata_and_permissions_update(bcoll, [fc]) + except RodsError as e: log.error(e.message, code=e.code) num_errors += 1 @@ -385,11 +272,34 @@ def annotate_results_collection( return num_errors == 0 +def ensure_secondary_metadata_updated( + item: Collection | DataObject, mlwh_session +) -> bool: + """Update secondary metadata on an iRODS path, using information from the ML + warehouse. + + Args: + item: iRODS path to update. + mlwh_session: An open SQL session. + + Returns: + True if any changes were made. + """ + expt = item.avu(Instrument.EXPERIMENT_NAME.value) + slot = item.avu(Instrument.INSTRUMENT_SLOT.value) + tag_id = item.avu(Instrument.TAG_IDENTIFIER.value) + + component = Component(expt.value, slot.value, tag_id.value) + flowcells = find_flowcells_by_component(mlwh_session, component) + + return _do_metadata_and_permissions_update(item, flowcells) + + def find_recent_expt(sess: Session, since: datetime) -> list[str]: """Find recent ONT experiments in the ML warehouse database. Find ONT experiments in the ML warehouse database that have been updated - since a specified date and time. If any element of the experiment (any of + at, or since a specified date and time. If any element of the experiment (any of the positions in a multi-flowcell experiment, any of the multiplexed elements within a position) have been updated in the query window, the experiment name will be returned. @@ -404,68 +314,170 @@ def find_recent_expt(sess: Session, since: datetime) -> list[str]: rows = ( sess.query(distinct(OseqFlowcell.experiment_name)) - .filter(OseqFlowcell.last_updated >= since) + .filter(OseqFlowcell.recorded_at >= since) .all() ) return [val for val, in rows] -def find_recent_expt_slot(sess: Session, since: datetime) -> list[tuple]: - """Find recent ONT experiments and instrument slot positions in the ML - warehouse database. - - Find ONT experiments and associated instrument slot positions in the ML - warehouse database that have been updated since a specified date and time. +def find_components_changed( + sess: Session, since: datetime, include_tags=True +) -> Iterator[Component]: + """Return the components of runs whose ML warehouse metadata has been updated + at or since the given date and time. Args: - sess: An open session to the ML warehouse. + sess: An open SQL session. since: A datetime. + include_tags: Resolve the components to the granularity of individual tags, + rather than as whole runs. Optional, defaults to True. Returns: - List of matching (experiment name, slot position) tuples + An iterator over the matching components. """ - rows = ( - sess.query(OseqFlowcell.experiment_name, OseqFlowcell.instrument_slot) - .filter(OseqFlowcell.last_updated >= since) + columns = [OseqFlowcell.experiment_name, OseqFlowcell.instrument_slot] + + if include_tags: + columns.append(OseqFlowcell.tag_identifier) + + for cols in ( + sess.query(*columns) + .distinct() + .join(OseqFlowcell.sample) + .join(OseqFlowcell.study) + .filter( + (Sample.recorded_at >= since) + | (Study.recorded_at >= since) + | (OseqFlowcell.recorded_at >= since) + ) .group_by(OseqFlowcell.experiment_name, OseqFlowcell.instrument_slot) .order_by(asc(OseqFlowcell.experiment_name), asc(OseqFlowcell.instrument_slot)) - .all() - ) - return [row.tuple() for row in rows] + ): + yield Component(*cols) -def find_flowcell_by_expt_slot( - sess: Session, experiment_name: str, instrument_slot: int +def find_flowcells_by_component( + sess: Session, component: Component ) -> list[Type[OseqFlowcell]]: - return ( + """Return the flowcells for this component. + + Args: + sess: An open SQL session. + component: An ONT run component. + + Returns: + The OseqFlowcells for the component. + """ + query = ( sess.query(OseqFlowcell) - .filter( - OseqFlowcell.experiment_name == experiment_name, - OseqFlowcell.instrument_slot == instrument_slot, - ) - .order_by( - asc(OseqFlowcell.experiment_name), - asc(OseqFlowcell.instrument_slot), - asc(OseqFlowcell.tag_identifier), - asc(OseqFlowcell.tag2_identifier), - ) - .all() + .distinct() + .filter(OseqFlowcell.experiment_name == component.experiment_name) ) + if component.instrument_slot is not None: + query = query.filter(OseqFlowcell.instrument_slot == component.instrument_slot) -def find_flowcells_by_component(sess: Session, component: Component): - return ( - sess.query(OseqFlowcell) - .filter( - OseqFlowcell.experiment_name == component.experiment_name, - OseqFlowcell.instrument_slot == component.position, - ) - .order_by( - asc(OseqFlowcell.experiment_name), - asc(OseqFlowcell.instrument_slot), - asc(OseqFlowcell.tag_identifier), - asc(OseqFlowcell.tag2_identifier), - ) - .all() + if component.tag_identifier is not None: + query = query.filter(OseqFlowcell.tag_identifier.like()) + + return query.order_by( + asc(OseqFlowcell.experiment_name), + asc(OseqFlowcell.instrument_slot), + asc(OseqFlowcell.tag_identifier), + asc(OseqFlowcell.tag2_identifier), + ).all() + + +def tag_index_from_id(tag_identifier: str) -> int: + """Return the barcode tag index given a barcode tag identifier. + + Returns: int + """ + match = TAG_IDENTIFIER_REGEX.search(tag_identifier) + if match: + return int(match.group(TAG_IDENTIFIER_GROUP)) + + raise ValueError( + f"Invalid ONT tag identifier '{tag_identifier}'. " + f"Expected a value matching {TAG_IDENTIFIER_REGEX}" ) + + +def barcode_name_from_id(tag_identifier: str) -> str: + """Return the barcode name given a barcode tag identifier. The name is used most + often for directory naming in ONT experiment results. + + Returns: str + """ + match = TAG_IDENTIFIER_REGEX.search(tag_identifier) + if match: + return f"barcode{match.group(TAG_IDENTIFIER_GROUP) :0>2}" + + raise ValueError( + f"Invalid ONT tag identifier '{tag_identifier}'. " + f"Expected a value matching {TAG_IDENTIFIER_REGEX}" + ) + + +def _do_metadata_and_permissions_update( + item: Collection | DataObject, flowcells +) -> bool: + """Update metadata and permissions using sample/study information obtained from + flowcell records in the ML warehouse. + + Args: + item: iRODS path to update. + flowcells: ML warehouse flowcell records. + + Returns: + True if changes were made. + """ + zone = infer_zone(item) + + metadata = [] + for fc in flowcells: + metadata.extend(make_sample_metadata(fc.sample)) + metadata.extend(make_study_metadata(fc.study)) + meta_updated = do_metadata_update(item, metadata) + + acl = [] + for fc in flowcells: + acl.extend(make_sample_acl(fc.sample, fc.study, zone=zone)) + perm_updated = do_permissions_update( + item, acl, recurse=(item.rods_type == Collection) + ) + + return meta_updated or perm_updated + + +def _set_minknow_reports_public(coll: Collection) -> int: + """Set the permissions of any MinKNOW reports directly in the collection to + public:read. + + Args: + coll: A collection containing report data objects. + + Returns: + The number of errors encountered. + """ + num_errors = 0 + + reports = [ + obj + for obj in coll.contents() + if obj.rods_type == DataObject and "report" in obj.name + ] + + for report in reports: + try: + if report.exists(): + log.info("Updating run report permissions", path=report.path) + keep = [ac for ac in report.permissions() if not is_managed_access(ac)] + report.supersede_permissions(*keep, *make_public_read_acl()) + else: + log.warn("Run report missing", path=report.path) + except RodsError as e: + log.error(e.message, code=e.code) + num_errors += 1 + return num_errors diff --git a/src/npg_irods/utilities.py b/src/npg_irods/utilities.py index 4b16a896..04d146b9 100644 --- a/src/npg_irods/utilities.py +++ b/src/npg_irods/utilities.py @@ -38,9 +38,9 @@ ) from structlog import get_logger +from npg_irods import illumina, ont from npg_irods.common import AnalysisType, Platform, infer_data_source from npg_irods.exception import ChecksumError -from npg_irods.illumina import ensure_secondary_metadata_updated from npg_irods.metadata.common import ( DataFile, ensure_common_metadata, @@ -588,7 +588,7 @@ def fn(i: int, path: str) -> (bool, bool): def update_secondary_metadata( - reader, writer, mlwh_session, print_update=True, print_fail=False, zone=None + reader, writer, mlwh_session, print_update=True, print_fail=False ) -> (int, int, int): num_processed, num_updated, num_errors = 0, 0, 0 @@ -596,16 +596,32 @@ def update_secondary_metadata( num_processed += 1 try: p = path.strip() + rods_item = make_rods_item(p) + updated = False + match infer_data_source(p): case Platform.ILLUMINA, AnalysisType.NUCLEIC_ACID_SEQUENCING: - item = make_rods_item(p) - log.info("Illumina", item=i, path=p, zone=zone) - if ensure_secondary_metadata_updated(item, mlwh_session, zone=zone): - num_updated += 1 - if print_update: - _print(p, writer) - case _, _: - log.warn("Unsupported", path=p) + log.info("Illumina", item=i, path=p) + updated = illumina.ensure_secondary_metadata_updated( + rods_item, mlwh_session + ) + case Platform.OXFORD_NANOPORE_TECHNOLOGIES, AnalysisType.NUCLEIC_ACID_SEQUENCING: + log.info("ONT", item=i, path=p) + updated = ont.ensure_secondary_metadata_updated( + rods_item, mlwh_session + ) + case platform, analysis: + log.warn( + "Unsupported platform/analysis", + path=p, + platform=platform, + analysis=analysis, + ) + + if updated: + num_updated += 1 + if print_update: + _print(p, writer) except RodsError as re: num_errors += 1 diff --git a/tests/conftest.py b/tests/conftest.py index cb352cd9..4196dc47 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -254,7 +254,7 @@ def initialize_mlwh_ont_synthetic(session: Session): # All the even experiments have the early datetime # All the odd experiments have the late datetime - when_expt = EARLY if expt % 2 == 0 else LATE + when = EARLY if expt % 2 == 0 else LATE flowcells.append( OseqFlowcell( @@ -266,8 +266,8 @@ def initialize_mlwh_ont_synthetic(session: Session): id_lims=f"Example LIMS ID {sample_idx}", id_flowcell_lims=id_flowcell, requested_data_type=req_data_type, - last_updated=when_expt, - recorded_at=BEGIN, + last_updated=when, + recorded_at=when, ) ) sample_idx += 1 @@ -325,7 +325,7 @@ def initialize_mlwh_ont_synthetic(session: Session): tag_sequence=barcode, tag_identifier=tag_id, last_updated=when, - recorded_at=BEGIN, + recorded_at=when, ) ) msample_idx += 1 diff --git a/tests/test_illumina.py b/tests/test_illumina.py index 1ba31d7a..ac60c3d4 100644 --- a/tests/test_illumina.py +++ b/tests/test_illumina.py @@ -80,7 +80,7 @@ def test_updates_present_metadata( assert avu in obj.metadata() assert not ensure_secondary_metadata_updated( - obj, mlwh_session=illumina_synthetic_mlwh, zone=zone + obj, mlwh_session=illumina_synthetic_mlwh ) for avu in expected_avus: @@ -275,7 +275,7 @@ def test_updates_absent_study_permissions( assert obj.permissions() == [AC("irods", perm=Permission.OWN, zone=zone)] assert ensure_secondary_metadata_updated( - obj, mlwh_session=illumina_synthetic_mlwh, zone=zone + obj, mlwh_session=illumina_synthetic_mlwh ) assert obj.permissions() == [ AC("irods", perm=Permission.OWN, zone=zone), @@ -310,7 +310,7 @@ def test_updates_present_study_permissions( assert obj.permissions() == expected_acl assert not ensure_secondary_metadata_updated( - obj, mlwh_session=illumina_synthetic_mlwh, zone=zone + obj, mlwh_session=illumina_synthetic_mlwh ) assert obj.permissions() == expected_acl @@ -331,7 +331,7 @@ def test_updates_changed_study_permissions( assert obj.permissions() == old_permissions assert ensure_secondary_metadata_updated( - obj, mlwh_session=illumina_synthetic_mlwh, zone=zone + obj, mlwh_session=illumina_synthetic_mlwh ) new_permissions = [ AC("irods", perm=Permission.OWN, zone=zone), @@ -356,7 +356,7 @@ def test_updates_human_permissions( assert obj.permissions() == expected_acl assert ensure_secondary_metadata_updated( - obj, mlwh_session=illumina_synthetic_mlwh, zone=zone + obj, mlwh_session=illumina_synthetic_mlwh ) assert obj.permissions() == [AC("irods", perm=Permission.OWN, zone=zone)] @@ -377,7 +377,7 @@ def test_updates_xahuman_permissions( assert obj.permissions() == expected_acl assert ensure_secondary_metadata_updated( - obj, mlwh_session=illumina_synthetic_mlwh, zone=zone + obj, mlwh_session=illumina_synthetic_mlwh ) assert obj.permissions() == [AC("irods", perm=Permission.OWN, zone=zone)] diff --git a/tests/test_ml_warehouse_queries.py b/tests/test_ml_warehouse_queries.py index 742a19c9..e345e481 100644 --- a/tests/test_ml_warehouse_queries.py +++ b/tests/test_ml_warehouse_queries.py @@ -24,7 +24,7 @@ from conftest import BEGIN, EARLY, LATE, LATEST from npg_irods.metadata import illumina from npg_irods.metadata.lims import TrackedSample, TrackedStudy -from npg_irods.ont import find_recent_expt, find_recent_expt_slot +from npg_irods.ont import Component, find_components_changed, find_recent_expt @m.describe("Finding updated ONT experiments by datetime") @@ -62,50 +62,71 @@ def test_find_recent_expt(self, ont_synthetic_mlwh): @m.describe("Finding updated experiments and positions by datetime") @m.context("When a query date is provided") @m.it("Finds the correct experiment, slot tuples") - def test_find_recent_expt_pos(self, ont_synthetic_mlwh): + def test_find_recent_component(self, ont_synthetic_mlwh): before_late = LATE - timedelta(days=1) odd_expts = [ - ("multiplexed_experiment_001", 1), - ("multiplexed_experiment_001", 2), - ("multiplexed_experiment_001", 3), - ("multiplexed_experiment_001", 4), - ("multiplexed_experiment_001", 5), - ("multiplexed_experiment_003", 1), - ("multiplexed_experiment_003", 2), - ("multiplexed_experiment_003", 3), - ("multiplexed_experiment_003", 4), - ("multiplexed_experiment_003", 5), - ("simple_experiment_001", 1), - ("simple_experiment_001", 2), - ("simple_experiment_001", 3), - ("simple_experiment_001", 4), - ("simple_experiment_001", 5), - ("simple_experiment_003", 1), - ("simple_experiment_003", 2), - ("simple_experiment_003", 3), - ("simple_experiment_003", 4), - ("simple_experiment_003", 5), - ("simple_experiment_005", 1), - ("simple_experiment_005", 2), - ("simple_experiment_005", 3), - ("simple_experiment_005", 4), - ("simple_experiment_005", 5), + Component(*args) + for args in [ + ("multiplexed_experiment_001", 1), + ("multiplexed_experiment_001", 2), + ("multiplexed_experiment_001", 3), + ("multiplexed_experiment_001", 4), + ("multiplexed_experiment_001", 5), + ("multiplexed_experiment_003", 1), + ("multiplexed_experiment_003", 2), + ("multiplexed_experiment_003", 3), + ("multiplexed_experiment_003", 4), + ("multiplexed_experiment_003", 5), + ("simple_experiment_001", 1), + ("simple_experiment_001", 2), + ("simple_experiment_001", 3), + ("simple_experiment_001", 4), + ("simple_experiment_001", 5), + ("simple_experiment_003", 1), + ("simple_experiment_003", 2), + ("simple_experiment_003", 3), + ("simple_experiment_003", 4), + ("simple_experiment_003", 5), + ("simple_experiment_005", 1), + ("simple_experiment_005", 2), + ("simple_experiment_005", 3), + ("simple_experiment_005", 4), + ("simple_experiment_005", 5), + ] ] - assert find_recent_expt_slot(ont_synthetic_mlwh, before_late) == odd_expts + assert [ + c + for c in find_components_changed( + ont_synthetic_mlwh, before_late, include_tags=False + ) + ] == odd_expts before_latest = LATEST - timedelta(days=1) odd_positions = [ - ("multiplexed_experiment_001", 1), - ("multiplexed_experiment_001", 3), - ("multiplexed_experiment_001", 5), - ("multiplexed_experiment_003", 1), - ("multiplexed_experiment_003", 3), - ("multiplexed_experiment_003", 5), + Component(*args) + for args in [ + ("multiplexed_experiment_001", 1), + ("multiplexed_experiment_001", 3), + ("multiplexed_experiment_001", 5), + ("multiplexed_experiment_003", 1), + ("multiplexed_experiment_003", 3), + ("multiplexed_experiment_003", 5), + ] ] - assert find_recent_expt_slot(ont_synthetic_mlwh, before_latest) == odd_positions + assert [ + c + for c in find_components_changed( + ont_synthetic_mlwh, before_latest, include_tags=False + ) + ] == odd_positions after_latest = LATEST + timedelta(days=1) - assert find_recent_expt_slot(ont_synthetic_mlwh, after_latest) == [] + assert [ + c + for c in find_components_changed( + ont_synthetic_mlwh, after_latest, include_tags=False + ) + ] == [] @m.describe("Finding Illumina recently changed information in Illumina tables") diff --git a/tests/test_ont.py b/tests/test_ont.py index d8d8282c..02b83432 100644 --- a/tests/test_ont.py +++ b/tests/test_ont.py @@ -28,7 +28,11 @@ from npg_irods.metadata.lims import TrackedSample, TrackedStudy from npg_irods.metadata.common import SeqConcept -from npg_irods.ont import MetadataUpdate, annotate_results_collection +from npg_irods.ont import ( + Component, + annotate_results_collection, + update_metadata, +) class TestONTMetadataCreation(object): @@ -41,12 +45,8 @@ def test_add_new_sample_metadata(self, ont_synthetic_irods, ont_synthetic_mlwh): slot = 1 path = ont_synthetic_irods / expt / "20190904_1514_GA10000_flowcell011_69126024" - annotate_results_collection( - path, - experiment_name=expt, - instrument_slot=slot, - mlwh_session=ont_synthetic_mlwh, - ) + c = Component(experiment_name=expt, instrument_slot=slot) + annotate_results_collection(path, c, mlwh_session=ont_synthetic_mlwh) coll = Collection(path) for avu in [ @@ -64,9 +64,9 @@ def test_add_new_sample_metadata(self, ont_synthetic_irods, ont_synthetic_mlwh): AC("irods", Permission.OWN, zone="testZone"), AC("ss_2000", Permission.READ, zone="testZone"), ] - assert coll.acl() == expected_acl + assert coll.acl() == expected_acl, f"ACL of {coll} is { expected_acl}" for item in coll.contents(): - assert item.acl() == expected_acl + assert item.acl() == expected_acl, f"ACL of {item} is {expected_acl}" @tests_have_admin @m.context("When the experiment is multiplexed") @@ -76,13 +76,8 @@ def test_add_new_plex_metadata(self, ont_synthetic_irods, ont_synthetic_mlwh): slot = 1 path = ont_synthetic_irods / expt / "20190904_1514_GA10000_flowcell101_cf751ba1" - - annotate_results_collection( - path, - experiment_name=expt, - instrument_slot=slot, - mlwh_session=ont_synthetic_mlwh, - ) + c = Component(experiment_name=expt, instrument_slot=slot) + annotate_results_collection(path, c, mlwh_session=ont_synthetic_mlwh) for subcoll in ["fast5_fail", "fast5_pass", "fastq_fail", "fastq_pass"]: for tag_index in range(1, 12): @@ -102,13 +97,8 @@ def test_add_new_plex_sample_metadata( slot = 1 path = ont_synthetic_irods / expt / "20190904_1514_GA10000_flowcell101_cf751ba1" - - annotate_results_collection( - path, - experiment_name=expt, - instrument_slot=slot, - mlwh_session=ont_synthetic_mlwh, - ) + c = Component(experiment_name=expt, instrument_slot=slot) + annotate_results_collection(path, c, mlwh_session=ont_synthetic_mlwh) for subcoll in ["fast5_fail", "fast5_pass", "fastq_fail", "fastq_pass"]: for tag_index in range(1, 12): @@ -142,13 +132,8 @@ def test_public_read_reports(self, ont_synthetic_irods, ont_synthetic_mlwh): slot = 1 path = ont_synthetic_irods / expt / "20190904_1514_GA10000_flowcell101_cf751ba1" - - annotate_results_collection( - path, - experiment_name=expt, - instrument_slot=slot, - mlwh_session=ont_synthetic_mlwh, - ) + c = Component(experiment_name=expt, instrument_slot=slot) + annotate_results_collection(path, c, mlwh_session=ont_synthetic_mlwh) expected_acl = [ AC("irods", Permission.OWN, zone="testZone"), AC("public", Permission.READ, zone="testZone"), @@ -172,8 +157,7 @@ def test_find_all(self, ont_synthetic_irods, ont_synthetic_mlwh): num_multiplexed_expts = 3 num_slots = 5 - update = MetadataUpdate() - num_found, num_updated, num_errors = update.update_secondary_metadata( + num_found, num_updated, num_errors = update_metadata( mlwh_session=ont_synthetic_mlwh ) num_expected = (num_simple_expts * num_slots) + ( @@ -188,8 +172,7 @@ def test_find_all(self, ont_synthetic_irods, ont_synthetic_mlwh): @m.context("When a time window is specified") @m.it("Finds only collections updated in that time window") def test_find_recent_updates(self, ont_synthetic_irods, ont_synthetic_mlwh): - update = MetadataUpdate() - num_found, num_updated, num_errors = update.update_secondary_metadata( + num_found, num_updated, num_errors = update_metadata( mlwh_session=ont_synthetic_mlwh, since=LATEST ) @@ -218,9 +201,8 @@ def test_find_recent_updates(self, ont_synthetic_irods, ont_synthetic_mlwh): @m.context("When an experiment name is specified") @m.it("Finds only collections with that experiment name") def test_find_updates_for_experiment(self, ont_synthetic_irods, ont_synthetic_mlwh): - update = MetadataUpdate(experiment_name="simple_experiment_001") - num_found, num_updated, num_errors = update.update_secondary_metadata( - mlwh_session=ont_synthetic_mlwh + num_found, num_updated, num_errors = update_metadata( + experiment_name="simple_experiment_001", mlwh_session=ont_synthetic_mlwh ) expected_colls = [ @@ -247,11 +229,10 @@ def test_find_updates_for_experiment(self, ont_synthetic_irods, ont_synthetic_ml def test_find_updates_for_experiment_slot( self, ont_synthetic_irods, ont_synthetic_mlwh ): - update = MetadataUpdate( - experiment_name="simple_experiment_001", instrument_slot=1 - ) - num_found, num_updated, num_errors = update.update_secondary_metadata( - mlwh_session=ont_synthetic_mlwh + num_found, num_updated, num_errors = update_metadata( + experiment_name="simple_experiment_001", + instrument_slot=1, + mlwh_session=ont_synthetic_mlwh, ) expected_colls = [ @@ -277,11 +258,12 @@ def test_updates_absent_metadata(self, ont_synthetic_irods, ont_synthetic_mlwh): / "simple_experiment_001/20190904_1514_G100000_flowcell011_69126024" ) assert AVU(TrackedSample.NAME, "sample 1") not in coll.metadata() - update = MetadataUpdate( - experiment_name="simple_experiment_001", instrument_slot=1 - ) - update.update_secondary_metadata(mlwh_session=ont_synthetic_mlwh) + update_metadata( + experiment_name="simple_experiment_001", + instrument_slot=1, + mlwh_session=ont_synthetic_mlwh, + ) assert AVU(TrackedSample.NAME, "sample 1") in coll.metadata() @@ -293,11 +275,12 @@ def test_updates_present_metadata(self, ont_synthetic_irods, ont_synthetic_mlwh) / "simple_experiment_001/20190904_1514_G100000_flowcell011_69126024" ) coll.add_metadata(AVU(TrackedSample.NAME, "sample 1")) - update = MetadataUpdate( - experiment_name="simple_experiment_001", instrument_slot=1 - ) - update.update_secondary_metadata(mlwh_session=ont_synthetic_mlwh) + update_metadata( + experiment_name="simple_experiment_001", + instrument_slot=1, + mlwh_session=ont_synthetic_mlwh, + ) assert AVU(TrackedSample.NAME, "sample 1") in coll.metadata() @@ -309,11 +292,13 @@ def test_updates_changed_metadata(self, ont_synthetic_irods, ont_synthetic_mlwh) / "simple_experiment_001/20190904_1514_G100000_flowcell011_69126024" ) coll.add_metadata(AVU(TrackedSample.NAME, "sample 0")) - update = MetadataUpdate( - experiment_name="simple_experiment_001", instrument_slot=1 + + update_metadata( + experiment_name="simple_experiment_001", + instrument_slot=1, + mlwh_session=ont_synthetic_mlwh, ) - update.update_secondary_metadata(mlwh_session=ont_synthetic_mlwh) assert AVU(TrackedSample.NAME, "sample 1") in coll.metadata() assert AVU(TrackedSample.NAME, "sample 0") not in coll.metadata() assert history_in_meta( @@ -329,10 +314,12 @@ def test_updates_multiple_metadata(self, ont_synthetic_irods, ont_synthetic_mlwh ) coll.add_metadata(AVU(TrackedStudy.NAME, "Study A")) coll.add_metadata(AVU(TrackedStudy.NAME, "Study B")) - update = MetadataUpdate( - experiment_name="simple_experiment_001", instrument_slot=1 + + update_metadata( + experiment_name="simple_experiment_001", + instrument_slot=1, + mlwh_session=ont_synthetic_mlwh, ) - update.update_secondary_metadata(mlwh_session=ont_synthetic_mlwh) assert AVU(TrackedStudy.NAME, "Study Y") in coll.metadata() assert AVU(TrackedStudy.NAME, "Study A") not in coll.metadata() assert AVU(TrackedStudy.NAME, "Study B") not in coll.metadata()