Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add new classes for lineage manipulation #2437

Merged
merged 16 commits into from
Jan 10, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions src/sourmash/lca/lca_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,10 @@ def build_tree(assignments, initial=None):
for assignment in assignments:
node = tree

# when we switch LineagePair over, will need ot add this.
#if isinstance(assignment, (BaseLineageInfo, RankLineageInfo, LINSLineageInfo)):
# assignment = assignment.filled_lineage

for lineage_tup in assignment:
if lineage_tup.name:
child = node.get(lineage_tup, {})
Expand Down
309 changes: 305 additions & 4 deletions src/sourmash/tax/tax_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@
import csv
from collections import namedtuple, defaultdict
from collections import abc
from itertools import zip_longest
from typing import NamedTuple
from dataclasses import dataclass, field, replace
import gzip

from sourmash import sqlite_utils, sourmash_args
Expand Down Expand Up @@ -36,7 +39,305 @@

# import lca utils as needed for now
from sourmash.lca import lca_utils
from sourmash.lca.lca_utils import (LineagePair, taxlist, display_lineage, pop_to_rank)
from sourmash.lca.lca_utils import (taxlist, display_lineage, pop_to_rank)

class LineagePair(NamedTuple):
rank: str
name: str = None
taxid: int = None

@dataclass(frozen=True, order=True)
class BaseLineageInfo:
"""
This BaseLineageInfo class defines a set of methods that can be used to handle
summarization and manipulation of taxonomic lineages with hierarchical taxonomic ranks.

Inputs:
required:
ranks: tuple or list of hierarchical ranks
optional:
lineage: tuple or list of LineagePair
lineage_str: `;`- or `,`-separated string of names
lineage_dict: dictionary of {rank: name}

If no lineage information is provided, result will be a BaseLineageInfo
with provided ranks and no lineage names.

Input lineage information is only used for initialization of the final `lineage`
and will not be used or compared in any other class methods.
"""
# need to set compare=False for any mutable type to keep this class hashable
ranks: tuple() # require ranks
lineage: tuple = () # tuple of LineagePairs
lineage_str: str = field(default=None, compare=False) # ';'- or ','-separated str of lineage names
bluegenes marked this conversation as resolved.
Show resolved Hide resolved
lineage_dict: dict = field(default=None, compare=False) # dict of rank: name

def __post_init__(self):
"Initialize according to passed values"
# ranks must be tuple for hashability
if isinstance(self.ranks, list):
object.__setattr__(self, "ranks", tuple(self.ranks))
if self.lineage:
self._init_from_lineage_tuples()
elif self.lineage_str is not None:
self._init_from_lineage_str()
elif self.lineage_dict is not None:
self._init_from_lineage_dict()
else:
self._init_empty()

def __eq__(self, other):
if other == (): # just handy: if comparing to a null tuple, don't try to find it's lineage before returning False
return False
return all([self.ranks == other.ranks and self.lineage==other.lineage])

@property
def taxlist(self):
return self.ranks

@property
def ascending_taxlist(self):
return self.ranks[::-1]

@property
def lowest_rank(self):
if not self.filled_ranks:
return None
return self.filled_ranks[-1]

def rank_index(self, rank):
return self.ranks.index(rank)

@property
def filled_lineage(self):
"""Return lineage down to lowest non-empty rank. Preserves missing ranks above."""
# Would we prefer this to be the default returned by lineage??
if not self.filled_ranks:
return ()
lowest_filled_rank_idx = self.rank_index(self.filled_ranks[-1])
return self.lineage[:lowest_filled_rank_idx+1]

@property
def lowest_lineage_name(self):
"Return the name of the lowest filled lineage"
if not self.filled_ranks:
return None
return self.filled_lineage[-1].name

@property
def lowest_lineage_taxid(self):
"Return the taxid of the lowest filled lineage"
if not self.filled_ranks:
return None
return self.filled_lineage[-1].taxid

def _init_empty(self):
'initialize empty genome lineage'
new_lineage = []
for rank in self.ranks:
new_lineage.append(LineagePair(rank=rank))
# set lineage and filled_ranks (because frozen, need to do it this way)
object.__setattr__(self, "lineage", new_lineage)
object.__setattr__(self, "filled_ranks", ())

def _init_from_lineage_tuples(self):
'initialize from tuple/list of LineagePairs, allowing empty ranks and reordering if necessary'
new_lineage = []
# check this is a list or tuple of lineage tuples:
for rank in self.ranks:
new_lineage.append(LineagePair(rank=rank))
for lin_tup in self.lineage:
# now add input tuples in correct spots. This corrects for order and allows empty values.
if not isinstance(lin_tup, (LineagePair, lca_utils.LineagePair)):
raise ValueError(f"{lin_tup} is not LineagePair.")
# find index for this rank
if lin_tup.rank: # skip this tuple if rank is None or "" (empty lineage tuple. is this needed?)
try:
rank_idx = self.rank_index(lin_tup.rank)
except ValueError as e:
raise ValueError(f"Rank '{lin_tup.rank}' not present in {', '.join(self.ranks)}") from e
# make sure we're adding tax_utils.LineagePairs, not lca_utils.LineagePairs for consistency
if isinstance(lin_tup, lca_utils.LineagePair):
new_lineage[rank_idx] = LineagePair(rank=lin_tup.rank, name=lin_tup.name)
else:
new_lineage[rank_idx] = lin_tup
# build list of filled ranks
filled_ranks = [a.rank for a in new_lineage if a.name]
# set lineage and filled_ranks
object.__setattr__(self, "lineage", tuple(new_lineage))
object.__setattr__(self, "filled_ranks", filled_ranks)

def _init_from_lineage_dict(self):
'initialize from lineage dict, e.g. from gather csv, allowing empty ranks and reordering if necessary'
if not isinstance(self.lineage_dict, (dict)):
raise ValueError(f"{self.lineage_dict} is not dictionary")
# first, initialize_empty
new_lineage = []
# build empty lineage
for rank in self.ranks:
new_lineage.append(LineagePair(rank=rank))
# now add input information in correct spots. This corrects for order and allows empty values.
for rank, info in self.lineage_dict.items():
try:
rank_idx = self.rank_index(rank)
except ValueError as e:
raise ValueError(f"Rank '{rank}' not present in {', '.join(self.ranks)}") from e

name, taxid = None, None
if isinstance(info, dict):
if 'name' in info.keys():
name = info['name']
if 'taxid' in info.keys():
taxid = info['taxid']
elif isinstance(info, str):
name = info
new_lineage[rank_idx] = LineagePair(rank=rank, name=name, taxid=taxid)
# build list of filled ranks
filled_ranks = [a.rank for a in new_lineage if a.name]
# set lineage and filled_ranks
object.__setattr__(self, "lineage", tuple(new_lineage))
object.__setattr__(self, "filled_ranks", filled_ranks)

def _init_from_lineage_str(self):
"""
Turn a ; or ,-separated set of lineages into a list of LineagePair objs.
"""
new_lineage = self.lineage_str.split(';')
if len(new_lineage) == 1:
new_lineage = self.lineage_str.split(',')
new_lineage = [ LineagePair(rank=rank, name=n) for (rank, n) in zip_longest(self.ranks, new_lineage) ]
# build list of filled ranks
filled_ranks = [a.rank for a in new_lineage if a.name]
object.__setattr__(self, "lineage", tuple(new_lineage))
object.__setattr__(self, "filled_ranks", filled_ranks)

def zip_lineage(self, truncate_empty=False):
"""
Return lineage names as a list
"""
if truncate_empty:
zipped = [a.name for a in self.filled_lineage]
else:
zipped = [a.name for a in self.lineage]
# replace None with empty string ("")
if None in zipped:
zipped = ['' if x is None else x for x in zipped]

return zipped

def zip_taxid(self, truncate_empty=False):
"""
Return taxids as a list
"""
if truncate_empty:
zipped = [a.taxid for a in self.filled_lineage]
else:
zipped = [a.taxid for a in self.lineage]
# replace None with empty string (""); cast taxids to str
zipped = ['' if x is None else str(x) for x in zipped]

return zipped

def display_lineage(self, truncate_empty=True, null_as_unclassified=False):
"Return lineage names as ';'-separated list"
lin = ";".join(self.zip_lineage(truncate_empty=truncate_empty))
if null_as_unclassified:
if lin == "":
return "unclassified"
else:
return lin

def display_taxid(self, truncate_empty=True):
"Return lineage taxids as ';'-separated list"
return ";".join(self.zip_taxid(truncate_empty=truncate_empty))

def check_rank_availability(self, rank):
if rank in self.ranks: # rank is available
return True
raise ValueError(f"Desired Rank '{rank}' not available for this lineage.")

def rank_is_filled(self, rank, other=None):
self.check_rank_availability(rank)
if other is not None:
if rank in self.filled_ranks and rank in other.filled_ranks:
return True
elif rank in self.filled_ranks:
return True
return False

def is_lineage_match(self, other, rank):
"""
check to see if two lineages are a match down to given rank.
"""
if not other.ranks == self.ranks: # check same ranks
raise ValueError("Cannot compare lineages from taxonomies with different ranks.")
# always return false if rank is not filled in either of the two lineages
if self.rank_is_filled(rank, other=other):
rank_idx = self.rank_index(rank)
a_lin = self.lineage[:rank_idx+1]
b_lin = other.lineage[:rank_idx+1]
if a_lin == b_lin:
return 1
return 0

def pop_to_rank(self, rank):
"Return new LineageInfo with ranks only filled to desired rank"
# are we already above rank?
if not self.rank_is_filled(rank):
return replace(self)
# if not, make filled_lineage at this rank + use to generate new LineageInfo
new_lineage = self.lineage_at_rank(rank)
new = replace(self, lineage = new_lineage)
# replace doesn't run the __post_init__ properly. reinitialize.
new._init_from_lineage_tuples()
return new

def lineage_at_rank(self, rank):
# non-destructive pop_to_rank. Returns tuple of LineagePairs
"Returns tuple of LineagePairs at given rank."
# are we already above rank?
if not self.rank_is_filled(rank):
return self.filled_lineage
# if not, return lineage tuples down to desired rank
rank_idx = self.rank_index(rank)
return self.filled_lineage[:rank_idx+1]


@dataclass(frozen=True, order=True)
class RankLineageInfo(BaseLineageInfo):
"""
This RankLineageInfo class usees the BaseLineageInfo methods for a standard set
of taxonomic ranks.

Inputs:
optional:
ranks: tuple or list of hierarchical ranks
default: ('superkingdom', 'phylum', 'class', 'order', 'family', 'genus', 'species', 'strain')
lineage: tuple or list of LineagePair
lineage_str: `;`- or `,`-separated string of names
lineage_dict: dictionary of {rank: name}

If no inputs are provided, result will be RankLineageInfo with
default ranks and no lineage names.

Input lineage information is only used for initialization of the final `lineage`
and will not be used or compared in any other class methods.
"""
ranks: tuple = ('superkingdom', 'phylum', 'class', 'order', 'family', 'genus', 'species', 'strain')

def __post_init__(self):
"Initialize according to passed values"
# ranks must be tuple for hashability
if isinstance(self.ranks, list):
object.__setattr__(self, "ranks", tuple(self.ranks))
if self.lineage:
self._init_from_lineage_tuples()
elif self.lineage_str is not None:
self._init_from_lineage_str()
elif self.lineage_dict is not None:
self._init_from_lineage_dict()
elif self.ranks:
self._init_empty()


def get_ident(ident, *,
Expand Down Expand Up @@ -760,7 +1061,7 @@ def load(cls, filename, *, delimiter=',', force=False,
# read row into a lineage pair
for rank in lca_utils.taxlist(include_strain=include_strain):
lin = row[rank]
lineage.append(LineagePair(rank, lin))
lineage.append(lca_utils.LineagePair(rank, lin))
ident = row[identifier]

# fold, spindle, and mutilate ident?
Expand All @@ -770,7 +1071,7 @@ def load(cls, filename, *, delimiter=',', force=False,

# clean lineage of null names, replace with 'unassigned'
lineage = [ (a, lca_utils.filter_null(b)) for (a,b) in lineage ]
lineage = [ LineagePair(a, b) for (a, b) in lineage ]
lineage = [ lca_utils.LineagePair(a, b) for (a, b) in lineage ]

# remove end nulls
while lineage and lineage[-1].name == 'unassigned':
Expand Down Expand Up @@ -924,7 +1225,7 @@ def load(cls, location):

def _make_tup(self, row):
"build a tuple of LineagePairs for this sqlite row"
tup = [ LineagePair(n, r) for (n, r) in zip(taxlist(True), row) ]
tup = [ lca_utils.LineagePair(n, r) for (n, r) in zip(taxlist(True), row) ]
return tuple(tup)

def __getitem__(self, ident):
Expand Down
10 changes: 5 additions & 5 deletions tests/test-data/tax/test1.gather.csv
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
intersect_bp,f_orig_query,f_match,f_unique_to_query,f_unique_weighted,average_abund,median_abund,std_abund,name,filename,md5,f_match_orig,unique_intersect_bp,gather_result_rank,remaining_bp,query_name,query_md5,query_filename
442000,0.08815317112086159,0.08438335242458954,0.08815317112086159,0.05815279361459521,1.6153846153846154,1.0,1.1059438185997785,"GCF_001881345.1 Escherichia coli strain=SF-596, ASM188134v1",/group/ctbrowngrp/gtdb/databases/ctb/gtdb-rs202.genomic.k31.sbt.zip,683df1ec13872b4b98d59e98b355b52c,0.042779713511420826,442000,0,4572000,test1,md5,test1.sig
390000,0.07778220981252493,0.10416666666666667,0.07778220981252493,0.050496823586903404,1.5897435897435896,1.0,0.8804995294906566,"GCF_009494285.1 Prevotella copri strain=iAK1218, ASM949428v1",/group/ctbrowngrp/gtdb/databases/ctb/gtdb-rs202.genomic.k31.sbt.zip,1266c86141e3a5603da61f57dd863ed0,0.052236806857755155,390000,1,4182000,test1,md5,test1.sig
138000,0.027522935779816515,0.024722321748477247,0.027522935779816515,0.015637726014008795,1.391304347826087,1.0,0.5702120455914782,"GCF_013368705.1 Bacteroides vulgatus strain=B33, ASM1336870v1",/group/ctbrowngrp/gtdb/databases/ctb/gtdb-rs202.genomic.k31.sbt.zip,7d5f4ba1d01c8c3f7a520d19faded7cb,0.012648945921173235,138000,2,4044000,test1,md5,test1.sig
338000,0.06741124850418827,0.013789581205311542,0.010769844435580374,0.006515719172503665,1.4814814814814814,1.0,0.738886568268889,"GCF_003471795.1 Prevotella copri strain=AM16-54, ASM347179v1",/group/ctbrowngrp/gtdb/databases/ctb/gtdb-rs202.genomic.k31.sbt.zip,0ebd36ff45fc2810808789667f4aad84,0.04337782340862423,54000,3,3990000,test1,md5,test1.sig
intersect_bp,f_orig_query,f_match,f_unique_to_query,f_unique_weighted,average_abund,median_abund,std_abund,name,filename,md5,f_match_orig,unique_intersect_bp,gather_result_rank,remaining_bp,query_name,query_md5,query_filename,query_bp,ksize,scaled,query_n_hashes
442000,0.08815317112086159,0.08438335242458954,0.08815317112086159,0.05815279361459521,1.6153846153846154,1.0,1.1059438185997785,"GCF_001881345.1 Escherichia coli strain=SF-596, ASM188134v1",/group/ctbrowngrp/gtdb/databases/ctb/gtdb-rs202.genomic.k31.sbt.zip,683df1ec13872b4b98d59e98b355b52c,0.042779713511420826,442000,0,4572000,test1,md5,test1.sig,5014000,31,1000,2507
390000,0.07778220981252493,0.10416666666666667,0.07778220981252493,0.050496823586903404,1.5897435897435896,1.0,0.8804995294906566,"GCF_009494285.1 Prevotella copri strain=iAK1218, ASM949428v1",/group/ctbrowngrp/gtdb/databases/ctb/gtdb-rs202.genomic.k31.sbt.zip,1266c86141e3a5603da61f57dd863ed0,0.052236806857755155,390000,1,4182000,test1,md5,test1.sig,50140000,31,1000,2507
138000,0.027522935779816515,0.024722321748477247,0.027522935779816515,0.015637726014008795,1.391304347826087,1.0,0.5702120455914782,"GCF_013368705.1 Bacteroides vulgatus strain=B33, ASM1336870v1",/group/ctbrowngrp/gtdb/databases/ctb/gtdb-rs202.genomic.k31.sbt.zip,7d5f4ba1d01c8c3f7a520d19faded7cb,0.012648945921173235,138000,2,4044000,test1,md5,test1.sig,5014000,31,1000,2507
338000,0.06741124850418827,0.013789581205311542,0.010769844435580374,0.006515719172503665,1.4814814814814814,1.0,0.738886568268889,"GCF_003471795.1 Prevotella copri strain=AM16-54, ASM347179v1",/group/ctbrowngrp/gtdb/databases/ctb/gtdb-rs202.genomic.k31.sbt.zip,0ebd36ff45fc2810808789667f4aad84,0.04337782340862423,54000,3,3990000,test1,md5,test1.sig,5014000,31,1000,2507
Loading