Skip to content

Commit

Permalink
Refactor to separate initial ONT metadata creation from later regular
Browse files Browse the repository at this point in the history
updates, to introduce "component" data classes and DRY some repeated
code.
  • Loading branch information
kjsanger committed Jun 30, 2023
1 parent 65af1e4 commit 31373df
Show file tree
Hide file tree
Showing 11 changed files with 599 additions and 456 deletions.
76 changes: 65 additions & 11 deletions scripts/locate-data-objects
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import structlog
from partisan.irods import AVU, query_metadata
from sqlalchemy.orm import Session

from npg_irods import illumina
from npg_irods import illumina, ont
from npg_irods.cli import add_logging_arguments, configure_logging, parse_iso_date
from npg_irods.db import DBConfig
from npg_irods.db.mlwh import find_consent_withdrawn_samples
Expand Down Expand Up @@ -110,16 +110,15 @@ cwdr_parser.set_defaults(func=consent_withdrawn)

ilup_parser = subparsers.add_parser(
"illumina-updates",
help="Find data objects which are the components of Illumina runs, whose tracking "
help="Find data objects, which are components of Illumina runs, whose tracking "
"metadata in the ML warehouse have changed since a specified time.",
)
ilup_parser.add_argument(
"--begin-date",
"--begin_date",
help="Limit data object found to those changed after this date. "
"Defaults to 14 days ago. "
"The argument must be an ISO8601 UTC date or date and time e.g. 2022-01-30, "
"2022-01-30T11:11:03Z",
help="Limit data objects found to those changed after this date. Defaults to 14 "
"days ago. The argument must be an ISO8601 UTC date or date and time e.g. "
"2022-01-30, 2022-01-30T11:11:03Z",
type=parse_iso_date,
default=datetime.now(timezone.utc) - timedelta(days=14),
)
Expand All @@ -139,15 +138,14 @@ def illumina_updates(cli_args):
log.info("Finding data objects", item=i, component=c, since=iso_date)

try:
for obj in query_metadata(
avus = [
AVU(Instrument.RUN, c.id_run),
AVU(Instrument.LANE, c.position),
AVU(SeqConcept.TAG_INDEX, c.tag_index),
data_object=True,
collection=False,
zone=cli_args.zone,
):
]
for obj in query_metadata(*avus, collection=False, zone=cli_args.zone):
print(obj)

except Exception as e:
num_errors += 1
log.error(e, item=i, component=c)
Expand All @@ -159,6 +157,62 @@ def illumina_updates(cli_args):

ilup_parser.set_defaults(func=illumina_updates)


# Register the "ont-updates" subcommand on the shared subparsers object.
ontup_parser = subparsers.add_parser(
"ont-updates",
help="Find data objects, which are components of ONT runs, whose tracking metadata "
"in the ML warehouse have changed since a specified time.",
)
# Both spellings of the option (hyphen and underscore) are registered for the
# same argument. The value is converted with parse_iso_date.
ontup_parser.add_argument(
"--begin-date",
"--begin_date",
help="Limit data objects found to those changed after this date. Defaults to 14 "
"days ago. The argument must be an ISO8601 UTC date or date and time e.g. "
"2022-01-30, 2022-01-30T11:11:03Z",
type=parse_iso_date,
# The default is computed once, when this statement is executed, as the
# current UTC time minus 14 days.
default=datetime.now(timezone.utc) - timedelta(days=14),
)


def ont_updates(cli_args):
    """Print iRODS paths for ONT run components changed in the ML warehouse.

    Opens a session on the database described by the "mlwh_ro" section of the
    configuration file named by cli_args.database_config, then asks
    ont.find_components_changed for every component changed since
    cli_args.begin_date. For each component, iRODS metadata in cli_args.zone
    is queried (with data objects excluded from the search) for paths
    annotated with the component's experiment name and instrument slot, plus
    its tag identifier when it has one. Every path found is printed to STDOUT.

    A failure while handling one component is logged and counted, and
    processing continues with the next component.

    Args:
        cli_args: Parsed command line arguments. The attributes read are
            database_config, begin_date and zone.

    Raises:
        SystemExit: With exit status 1 if any component raised an error.
    """
    dbconfig = DBConfig.from_file(cli_args.database_config.name, "mlwh_ro")
    engine = sqlalchemy.create_engine(dbconfig.url)
    with Session(engine) as session:
        num_processed = num_errors = 0
        iso_date = cli_args.begin_date.strftime("%Y-%m-%dT%H:%M:%SZ")

        for i, c in enumerate(
            ont.find_components_changed(
                session, include_tags=False, since=cli_args.begin_date
            )
        ):
            num_processed += 1
            log.info("Finding collections", item=i, component=c, since=iso_date)

            try:
                avus = [
                    AVU(ont.Instrument.EXPERIMENT_NAME, c.experiment_name),
                    AVU(ont.Instrument.INSTRUMENT_SLOT, c.instrument_slot),
                ]
                # Only narrow the search by tag when the component carries one.
                if c.tag_identifier is not None:
                    avus.append(AVU(ont.Instrument.TAG_IDENTIFIER, c.tag_identifier))

                for coll in query_metadata(
                    *avus, data_object=False, zone=cli_args.zone
                ):
                    print(coll)

            except Exception as e:
                # Per-component boundary: record the failure and carry on so
                # that one bad component does not hide results for the rest.
                num_errors += 1
                log.error(e, item=i, component=c)

        log.info(f"Processed {num_processed} with {num_errors} errors")
        if num_errors:
            # Raise SystemExit directly rather than calling exit(): the exit()
            # builtin is injected by the site module for interactive use and
            # is not guaranteed to exist (e.g. under "python -S").
            raise SystemExit(1)


# Attach the handler so that arguments parsed for "ont-updates" carry
# func=ont_updates.
ontup_parser.set_defaults(func=ont_updates)

args = parser.parse_args()
configure_logging(
config_file=args.log_config,
Expand Down
7 changes: 3 additions & 4 deletions scripts/update-ont-metadata
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ from sqlalchemy.orm import Session

from npg_irods.cli import add_logging_arguments, configure_logging, parse_iso_date
from npg_irods.db import DBConfig
from npg_irods.ont import MetadataUpdate
from npg_irods.ont import update_metadata
from npg_irods.version import version

description = """
Expand Down Expand Up @@ -94,9 +94,8 @@ def main():

engine = sqlalchemy.create_engine(dbconfig.url)
with Session(engine) as session:
mu = MetadataUpdate(zone=args.zone)
num_processed, num_updated, num_errors = mu.update_secondary_metadata(
session, since=args.begin_date
num_processed, num_updated, num_errors = update_metadata(
session, since=args.begin_date, zone=args.zone
)

if num_errors:
Expand Down
90 changes: 90 additions & 0 deletions src/npg_irods/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,15 @@
import re
from enum import Enum, unique
from os import PathLike
from pathlib import PurePath
from typing import Tuple

from partisan.irods import AC, AVU, Collection, DataObject, Permission
from structlog import get_logger

from npg_irods.metadata.lims import has_mixed_ownership, is_managed_access


log = get_logger(__package__)


Expand Down Expand Up @@ -194,3 +199,88 @@ def infer_data_source(path: PathLike | str) -> Tuple[Platform, AnalysisType]:
return Platform.ULTIMA_GENOMICS, AnalysisType.NUCLEIC_ACID_SEQUENCING

raise ValueError(f"Failed to infer a data source for iRODS path '{path}'")


def do_metadata_update(item: Collection | DataObject, avus: list[AVU]) -> bool:
    """Supersede the metadata on an iRODS path with the given AVUs.

    The replacement is delegated to the item's supersede_metadata method with
    history enabled, so that a record of the change is kept. The operation is
    logged before it starts and again afterwards, together with the counts of
    AVUs removed and added.

    Args:
        item: iRODS path to update.
        avus: Metadata to apply.

    Returns:
        True if at least one AVU was removed or added, False if the desired
        metadata were already present.
    """
    log.info("Updating metadata", path=item, meta=avus)

    num_removed, num_added = item.supersede_metadata(*avus, history=True)
    outcome = {"num_added": num_added, "num_removed": num_removed}
    log.info("Updated metadata", path=item, meta=avus, **outcome)

    return any(count > 0 for count in (num_removed, num_added))


def do_permissions_update(
    item: Collection | DataObject, acl: list[AC], recurse=False
) -> bool:
    """Supersede the permissions on an iRODS path with the given ACL.

    Permissions currently on the path that are not managed access (as judged
    by is_managed_access) are retained and re-applied alongside the supplied
    ACL. If the ACL has mixed ownership (as judged by has_mixed_ownership), a
    warning is logged and every managed entry in the ACL is set to
    Permission.NULL, revoking that access. Note that this modifies the AC
    objects passed in by the caller.

    Args:
        item: iRODS path to update.
        acl: ACL to apply.
        recurse: If True, recursively apply the ACL.

    Returns:
        True if any changes were made, False if the ACL(s) of the target
        path(s) were already in the desired state. This applies recursively,
        so to return False for a recursive operation, neither the target path,
        nor any contained path will have been updated.

    Raises:
        ValueError: If a recursive update is requested on a data object.
    """
    if item.rods_type == DataObject:
        if recurse:
            raise ValueError(
                f"Cannot recursively update permissions on a data object: {item}"
            )

    if has_mixed_ownership(acl):
        log.warn("Mixed-study data", path=item, acl=acl)
        # Revoke every managed entry rather than guess which study should win.
        for managed in filter(is_managed_access, acl):
            managed.perm = Permission.NULL

    # Unmanaged permissions already on the path must survive the update.
    keep = []
    for existing in item.permissions():
        if not is_managed_access(existing):
            keep.append(existing)

    log.info("Updating permissions", path=item, keep=keep, acl=acl)

    # The recurse keyword is passed only when requested.
    if recurse:
        result = item.supersede_permissions(*keep, *acl, recurse=recurse)
    else:
        result = item.supersede_permissions(*keep, *acl)
    num_removed, num_added = result

    log.info(
        "Updated permissions",
        path=item,
        keep=keep,
        acl=acl,
        num_added=num_added,
        num_removed=num_removed,
    )
    return any(count > 0 for count in (num_removed, num_added))


def infer_zone(path: Collection | DataObject) -> str:
    """Infer the iRODS zone from an iRODS path.

    The zone is the first path component below the root, e.g. the zone of
    "/testZone/home/irods" is "testZone".

    Args:
        path: An absolute iRODS path.

    Returns:
        The zone.

    Raises:
        ValueError: If the path is not absolute, or is absolute but has no
            component below the root.
    """
    # iRODS paths are always POSIX-style, whatever platform the client runs
    # on, so parse them with PurePosixPath rather than the platform-dependent
    # PurePath.
    from pathlib import PurePosixPath

    irods_path = PurePosixPath(path)

    # A relative path such as "a/b" has two parts, but neither is a zone;
    # reject it rather than return a bogus value.
    if not irods_path.is_absolute():
        raise ValueError(f"Invalid iRODS path {path}; not an absolute path")

    parts = irods_path.parts
    if len(parts) < 2:
        raise ValueError(f"Invalid iRODS path {path}; no zone component")
    return parts[1]
61 changes: 12 additions & 49 deletions src/npg_irods/illumina.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,21 +21,19 @@
from dataclasses import dataclass
from datetime import datetime
from enum import Enum, unique
from pathlib import PurePath
from typing import Iterator, Optional, Type

from partisan.irods import AVU, Collection, DataObject, Permission
from partisan.irods import AVU, Collection, DataObject
from sqlalchemy import asc
from sqlalchemy.orm import Session
from structlog import get_logger

from npg_irods.common import do_metadata_update, do_permissions_update, infer_zone
from npg_irods.db.mlwh import IseqFlowcell, IseqProductMetrics, Sample, Study
from npg_irods.metadata.common import SeqConcept, SeqSubset
from npg_irods.metadata.illumina import Instrument
from npg_irods.metadata.lims import (
ensure_consent_withdrawn,
has_mixed_ownership,
is_managed_access,
make_sample_acl,
make_sample_metadata,
make_study_metadata,
Expand Down Expand Up @@ -143,7 +141,7 @@ def __repr__(self):


def ensure_secondary_metadata_updated(
item: Collection | DataObject, mlwh_session, include_controls=False, zone=None
item: Collection | DataObject, mlwh_session, include_controls=False
) -> bool:
"""Update iRODS secondary metadata and permissions on Illumina run collections
and data objects.
Expand Down Expand Up @@ -187,24 +185,18 @@ def ensure_secondary_metadata_updated(
mlwh_session: An open SQL session.
include_controls: If True, include any control samples in the metadata and
permissions.
zone: The iRODS zone for any permissions created.
Returns:
True if updated.
"""
if zone is None:
parts = PurePath(item).parts
if len(parts) < 2:
raise ValueError(f"Invalid iRODS path {item}; no zone component")
zone = parts[1]

zone = infer_zone(item)
updated = False
secondary_metadata, acl = [], []

components = [
Component.from_avu(avu)
for avu in item.metadata(attr=SeqConcept.COMPONENT.value)
]
] # Illumina specific
for c in components:
for fc in find_flowcells_by_component(
mlwh_session, c, include_controls=include_controls
Expand All @@ -213,41 +205,12 @@ def ensure_secondary_metadata_updated(
secondary_metadata.extend(make_study_metadata(fc.study))
acl.extend(make_sample_acl(fc.sample, fc.study, zone=zone))

log.info("Updating metadata", path=item, meta=secondary_metadata)
num_removed, num_added = item.supersede_metadata(*secondary_metadata, history=True)
log.info(
"Updated metadata",
path=item,
meta=secondary_metadata,
num_added=num_added,
num_removed=num_removed,
)
if num_removed or num_added:
updated = True
updated = True if do_metadata_update(item, secondary_metadata) else updated

if any(c.contains_nonconsented_human() for c in components):
if any(c.contains_nonconsented_human() for c in components): # Illumina specific
updated = True if ensure_consent_withdrawn(item) else updated
else:
if has_mixed_ownership(acl):
log.warn("Mixed-study data", path=item, acl=acl)
for ac in acl:
if is_managed_access(ac):
ac.perm = Permission.NULL

keep = [ac for ac in item.permissions() if not is_managed_access(ac)]

log.info("Updating permissions", path=item, keep=keep, acl=acl)
num_removed, num_added = item.supersede_permissions(*keep, *acl)
log.info(
"Updated permissions",
path=item,
keep=keep,
acl=acl,
num_added=num_added,
num_removed=num_removed,
)
if num_removed or num_added:
updated = True
updated = True if do_permissions_update(item, acl) else updated

return updated

Expand Down Expand Up @@ -320,10 +283,10 @@ def find_components_changed(sess: Session, since: datetime) -> Iterator[Componen
.join(IseqFlowcell.study)
.join(IseqFlowcell.iseq_product_metrics)
.filter(
(Sample.recorded_at > since)
| (Study.recorded_at > since)
| (IseqFlowcell.recorded_at > since)
| (IseqProductMetrics.last_changed > since)
(Sample.recorded_at >= since)
| (Study.recorded_at >= since)
| (IseqFlowcell.recorded_at >= since)
| (IseqProductMetrics.last_changed >= since)
)
.order_by(asc(IseqFlowcell.id_iseq_flowcell_tmp))
):
Expand Down
3 changes: 2 additions & 1 deletion src/npg_irods/metadata/ont.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# -*- coding: utf-8 -*-
#
# Copyright © 2021, 2022 Genome Research Ltd. All rights reserved.
# Copyright © 2021, 2022, 2023 Genome Research Ltd. All rights reserved.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
Expand Down Expand Up @@ -28,6 +28,7 @@ class Instrument(AsValueEnum, metaclass=with_namespace("ont")):

EXPERIMENT_NAME = "experiment_name"
INSTRUMENT_SLOT = "instrument_slot"
TAG_IDENTIFIER = "tag_identifier"

def __repr__(self):
return f"{Instrument.namespace}:{self.value}"
Loading

0 comments on commit 31373df

Please sign in to comment.