Improve aggregation (#23)
* export key functions for sorting chromosomes / effect types

* use new key functions for sorting

* reduce memory usage during aggregation

* fix doctest output

* make aggregation steps clearer
nebfield authored Jun 12, 2024
1 parent 016afe0 commit 1664852
Showing 16 changed files with 216 additions and 86 deletions.
41 changes: 28 additions & 13 deletions pgscatalog.calc/poetry.lock

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion pgscatalog.calc/pyproject.toml
@@ -10,7 +10,7 @@ packages = [
 
 [tool.poetry.dependencies]
 python = "^3.11"
-"pgscatalog.core" = "^0.1.0"
+"pgscatalog.core" = {path = "../pgscatalog.core", develop = true}
 numpy = "^1.26.4"
 pandas = "^2.2.0"
 pyarrow = "^15.0.0"
4 changes: 1 addition & 3 deletions pgscatalog.calc/src/pgscatalog/calc/cli/__init__.py
@@ -1,3 +1 @@
-from .aggregate_cli import run_aggregate
-
-__all__ = ["run_aggregate"]
+__all__ = []
50 changes: 43 additions & 7 deletions pgscatalog.calc/src/pgscatalog/calc/cli/aggregate_cli.py
@@ -2,10 +2,11 @@
 import logging
 import pathlib
 import textwrap
-import operator
-import functools
+from collections import deque
+from typing import Optional
 
-from ..lib.polygenicscore import PolygenicScore
+from ..lib import PolygenicScore
+from pgscatalog.core import chrom_keyfunc
 
 logger = logging.getLogger(__name__)
@@ -21,15 +22,50 @@ def run_aggregate():
 
     if args.verbose:
         logger.setLevel(logging.INFO)
         logging.getLogger("pgscatalog.core").setLevel(logging.INFO)
+        logging.getLogger("pgscatalog.calc").setLevel(logging.INFO)
 
     if not (outdir := pathlib.Path(args.outdir)).exists():
         raise FileNotFoundError(f"--outdir {outdir.name} doesn't exist")
 
-    score_paths = [pathlib.Path(x) for x in args.scores]
-    pgs = [PolygenicScore(path=x) for x in score_paths]
-    # call __add__ a lot
-    aggregated = functools.reduce(operator.add, pgs)
+    score_paths = sorted([pathlib.Path(x) for x in args.scores], key=chrom_keyfunc())
+    # dfs are only read into memory after accessing them explicitly e.g. pgs[0].df
+    pgs = deque(PolygenicScore(path=x) for x in score_paths)
+
+    observed_columns = set()
+    aggregated: Optional[PolygenicScore] = None
+
+    # first, use PolygenicScore's __add__ method, which implements df.add(fill_value=0)
+    while pgs:
+        # popleft ensures that dfs are removed from memory after each aggregation
+        score: PolygenicScore = pgs.popleft()
+        if aggregated is None:
+            logger.info(f"Initialising aggregation with {score}")
+            aggregated: PolygenicScore = score
+        else:
+            logger.info(f"Adding {score}")
+            aggregated += score
+        observed_columns.update(set(score.df.columns))
+
+    # check to make sure that every column we saw in the dataframes is in the output
+    if (dfcols := set(aggregated.df.columns)) != observed_columns:
+        raise ValueError(
+            f"Missing columns in aggregated file! "
+            f"Observed: {observed_columns}. "
+            f"In aggregated: {dfcols}"
+        )
+    else:
+        logger.info("Aggregated columns match observed columns")
+
+    # next, melt the plink2 scoring files from wide (many columns) format to long format
+    aggregated.melt()
+
+    # recalculate PGS average using aggregated SUM and DENOM
+    aggregated.average()
+
+    logger.info("Aggregation finished! Writing to a file")
     aggregated.write(outdir=args.outdir, split=args.split)
+    logger.info("all done. bye :)")
 
 
 def _description_text() -> str:
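
The memory improvement in `run_aggregate` comes from consuming the deque as it is summed: each `PolygenicScore` reads its dataframe lazily, gets folded into the running total, and is then dropped, so only the current chunk plus the accumulator stay resident. The old `functools.reduce(operator.add, pgs)` approach kept the whole list of scores (and any dataframes they had cached) alive until the reduction finished. A minimal sketch of the pattern, with integers standing in for the lazily read dataframes:

```python
from collections import deque

# stand-ins for lazily loaded per-chromosome scores (integers, not dataframes)
chunks = deque([10, 20, 30, 40])

aggregated = None
while chunks:
    chunk = chunks.popleft()  # drop the reference so the chunk can be garbage collected
    if aggregated is None:
        aggregated = chunk  # initialise with the first chunk
    else:
        aggregated += chunk  # fold the next chunk into the running total

print(aggregated)  # 100
```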
74 changes: 39 additions & 35 deletions pgscatalog.calc/src/pgscatalog/calc/lib/polygenicscore.py
@@ -274,12 +274,17 @@ class PolygenicScore:
     >>> aggregated_score = pgs1 + pgs2
     >>> aggregated_score # doctest: +ELLIPSIS
-    PolygenicScore(sampleset='test', path=None)
+    PolygenicScore(sampleset='test', path='(in-memory)')
 
     Once a score has been fully aggregated it can be helpful to recalculate an average:
 
-    >>> reprlib.repr(aggregated_score.average().to_dict()) # doctest: +ELLIPSIS
-    "{'DENOM': {('test', 'HG00096'): 3128, ('test', 'HG00097'): 3128, ('test', 'HG00099'): 3128, ('test', 'HG00100'): 3128, ...}, 'PGS001229_22_AVG': {('test', 'HG00096'): 0.0003484782608695652, ('test', 'HG00097'): 0.00043120268542199493, ('test', 'HG00099'): 0.0004074616368286445, ('test', 'HG00100'): 0.0005523938618925831, ...}}"
+    >>> aggregated_score.average()
+    >>> aggregated_score.df # doctest: +ELLIPSIS,+NORMALIZE_WHITESPACE
+                                    PGS       SUM  DENOM       AVG
+    sampleset IID
+    test      HG00096  PGS001229_22  1.090040   3128  0.000348
+              HG00097  PGS001229_22  1.348802   3128  0.000431
+    ...
 
     Scores can be written to a TSV file:
@@ -321,14 +326,19 @@ def __init__(self, *, path=None, df=None, sampleset=None):
         if self.sampleset is None:
             raise TypeError("Missing sampleset")
 
-        self._chunksize = 50000
         self._df = df
+        self._melted = False
 
     def __repr__(self):
-        return f"{type(self).__name__}(sampleset={repr(self.sampleset)}, path={repr(self.path)})"
+        if self.path is None:
+            path = repr("(in-memory)")
+        else:
+            path = repr(self.path)
+        return f"{type(self).__name__}(sampleset={repr(self.sampleset)}, path={path})"
 
     def __add__(self, other):
         if isinstance(other, PolygenicScore):
+            logger.info(f"Doing element-wise addition: {self} + {other}")
             sumdf = self.df.add(other.df, fill_value=0)
             return PolygenicScore(sampleset=self.sampleset, df=sumdf)
         else:
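
`__add__` leans on `pandas.DataFrame.add` with `fill_value=0`, which aligns the two frames on their index and treats a cell that is missing on one side as 0 instead of propagating NaN. A small illustration of that behaviour (toy sums and sample IDs, output approximate):

```python
import pandas as pd

a = pd.DataFrame({"PGS001229_22_SUM": [1.0, 2.0]}, index=["HG00096", "HG00097"])
b = pd.DataFrame({"PGS001229_22_SUM": [0.5, 0.25]}, index=["HG00097", "HG00099"])

# fill_value=0: a sample present on only one side keeps its sum instead of NaN
print(a.add(b, fill_value=0))
#          PGS001229_22_SUM
# HG00096              1.00
# HG00097              2.50
# HG00099              0.25
```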
@@ -361,32 +371,38 @@ def read(self):
         return df
 
     def average(self):
-        """Recalculate average."""
+        """Update the dataframe with a recalculated average."""
+        logger.info("Recalculating average")
+        if not self._melted:
+            self.melt()
+
         df = self.df
-        avgs = df.filter(regex="SUM$")
-        avgs = avgs.divide(df.DENOM, axis=0)
-        avgs.insert(0, "DENOM", df.DENOM)
-        avgs.columns = avgs.columns.str.replace("_SUM", "_AVG")
-        return avgs
+        df["AVG"] = df.SUM / df.DENOM
+        self._df = df
 
     def melt(self):
-        """Melt dataframe from wide format to long format"""
-        sum_df = _melt(self.df, value_name="SUM")
-        avg_df = _melt(self.average(), value_name="AVG")
-        df = pd.concat([sum_df, avg_df.AVG], axis=1)
+        """Update the dataframe with a melted version (wide format to long format)"""
+        logger.info("Melting dataframe from wide to long format")
+        df = self.df.melt(
+            id_vars=["DENOM"],
+            value_name="SUM",
+            var_name="PGS",
+            ignore_index=False,
+        )
+        # e.g. PGS000822_SUM -> PGS000822
+        df["PGS"] = df["PGS"].str.replace("_SUM", "")
         # melted chunks need a consistent column order
-        return df[["PGS", "SUM", "DENOM", "AVG"]]
+        self._df = df[["PGS", "SUM", "DENOM"]]
+        self._melted = True
 
-    def write(self, outdir, split=False, melt=True):
+    def write(self, outdir, split=False):
         """Write PGS to a compressed TSV"""
         outdir = pathlib.Path(outdir)
 
-        if melt:
-            logger.info("Melting before write to TSV")
-            df = self.melt()
-        else:
-            logger.info("Writing wide format to TSV")
-            df = self.df
+        if not self._melted:
+            self.melt()
+
+        df = self.df
 
         if split:
             logger.info("Writing results split by sampleset")
@@ -408,15 +424,3 @@ def _select_agg_cols(cols):
         for x in cols
         if (x.endswith("_SUM") and (x != "NAMED_ALLELE_DOSAGE_SUM")) or (x in keep_cols)
     ]
-
-
-def _melt(df, value_name):
-    df = df.melt(
-        id_vars=["DENOM"],
-        value_name=value_name,
-        var_name="PGS",
-        ignore_index=False,
-    )
-    # e.g. PGS000822_SUM -> PGS000822
-    df["PGS"] = df["PGS"].str.replace(f"_{value_name}", "")
-    return df
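
Taken together, the reworked `melt()` and `average()` reduce to one pandas reshape plus one vectorised division. A rough sketch on a toy wide-format frame (made-up numbers; the real frames are indexed by (sampleset, IID), and output is approximate):

```python
import pandas as pd

wide = pd.DataFrame(
    {"DENOM": [3128, 3128], "PGS001229_22_SUM": [1.09, 1.35]},
    index=["HG00096", "HG00097"],
)

# wide -> long: one row per (sample, score), DENOM carried along as an id_var
long = wide.melt(id_vars=["DENOM"], value_name="SUM", var_name="PGS", ignore_index=False)
long["PGS"] = long["PGS"].str.replace("_SUM", "")  # PGS001229_22_SUM -> PGS001229_22
long = long[["PGS", "SUM", "DENOM"]]

# average() then becomes a single division over the long frame
long["AVG"] = long.SUM / long.DENOM
print(long)
#                   PGS   SUM  DENOM       AVG
# HG00096  PGS001229_22  1.09   3128  0.000348
# HG00097  PGS001229_22  1.35   3128  0.000432
```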
17 changes: 16 additions & 1 deletion pgscatalog.core/poetry.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions pgscatalog.core/pyproject.toml
@@ -21,6 +21,7 @@ tenacity = "^8.2.3"
 pyliftover = "^0.4"
 xopen = {version = "^1.8.0", extras = ["zstd"]}
 tqdm = "^4.66.1"
+natsort = "^8.4.0"
 
 [tool.poetry.group.dev.dependencies]
 pytest = "^7.4.4"
4 changes: 4 additions & 0 deletions pgscatalog.core/src/pgscatalog/core/__init__.py
@@ -34,6 +34,8 @@
     RelabelArgs,
     relabel,
     relabel_write,
+    effect_type_keyfunc,
+    chrom_keyfunc,
 )
 
 log_fmt = "%(name)s: %(asctime)s %(levelname)-8s %(message)s"
@@ -74,6 +76,8 @@
     "RelabelArgs",
     "relabel",
     "relabel_write",
+    "effect_type_keyfunc",
+    "chrom_keyfunc",
 ]
 
 __version__ = "0.1.2"
3 changes: 3 additions & 0 deletions pgscatalog.core/src/pgscatalog/core/lib/__init__.py
@@ -5,6 +5,7 @@
 from .genomebuild import GenomeBuild
 from .targetvariants import TargetVariants, TargetVariant, TargetType
 from ._relabel import RelabelArgs, relabel, relabel_write
+from ._sortpaths import effect_type_keyfunc, chrom_keyfunc
 from .pgsexceptions import (
     BasePGSException,
     MatchError,
@@ -59,4 +60,6 @@
     "RelabelArgs",
     "relabel",
     "relabel_write",
+    "effect_type_keyfunc",
+    "chrom_keyfunc",
 ]
29 changes: 29 additions & 0 deletions pgscatalog.core/src/pgscatalog/core/lib/_sortpaths.py
@@ -0,0 +1,29 @@
+"""This module assumes you're working with paths that follow the format:
+
+{sampleset}_{chrom}_{effect_type}_{n}
+"""
+from natsort import natsort_keygen, ns
+
+
+def effect_type_keyfunc():
+    """Return a key that sorts by effect type and n. Chromosome order doesn't matter.
+
+    This is useful for things like itertools.groupby which expect sorted input
+
+    >>> import pathlib
+    >>> paths = [pathlib.Path("ukb_2_dominant_0.txt.gz"), pathlib.Path("ukb_X_additive_0.txt.gz"), pathlib.Path("ukb_X_additive_1.txt.gz"), pathlib.Path("ukb_1_recessive_0.txt.gz")]
+    >>> sorted(paths, key=effect_type_keyfunc())
+    [PosixPath('ukb_X_additive_0.txt.gz'), PosixPath('ukb_X_additive_1.txt.gz'), PosixPath('ukb_2_dominant_0.txt.gz'), PosixPath('ukb_1_recessive_0.txt.gz')]
+    """
+    return natsort_keygen(key=lambda x: x.stem.split("_")[2:], alg=ns.REAL)
+
+
+def chrom_keyfunc():
+    """Return a key that sorts by chromosome, including non-integer chromosomes
+
+    >>> import pathlib
+    >>> paths = [pathlib.Path("ukb_2_additive_0.txt.gz"), pathlib.Path("ukb_X_additive_0.txt.gz"), pathlib.Path("ukb_1_additive_0.txt.gz")]
+    >>> sorted(paths, key=chrom_keyfunc())
+    [PosixPath('ukb_1_additive_0.txt.gz'), PosixPath('ukb_2_additive_0.txt.gz'), PosixPath('ukb_X_additive_0.txt.gz')]
+    """
+    return natsort_keygen(key=lambda x: x.stem.split("_")[1], alg=ns.REAL)
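
Both key functions go through natsort because plain lexicographic sorting puts chromosome 10 before chromosome 2; natural sorting compares the numeric parts as numbers while still handling X and Y. A quick illustration using natsort's `natsorted` convenience wrapper:

```python
from natsort import natsorted

chroms = ["10", "2", "X", "1"]
print(sorted(chroms))     # ['1', '10', '2', 'X']  -- lexicographic
print(natsorted(chroms))  # ['1', '2', '10', 'X']  -- natural order
```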