Improve aggregation #23

Merged · 5 commits · Jun 12, 2024
41 changes: 28 additions & 13 deletions pgscatalog.calc/poetry.lock

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion pgscatalog.calc/pyproject.toml
@@ -10,7 +10,7 @@ packages = [

[tool.poetry.dependencies]
python = "^3.11"
"pgscatalog.core" = "^0.1.0"
"pgscatalog.core" = {path = "../pgscatalog.core", develop = true}
numpy = "^1.26.4"
pandas = "^2.2.0"
pyarrow = "^15.0.0"
4 changes: 1 addition & 3 deletions pgscatalog.calc/src/pgscatalog/calc/cli/__init__.py
@@ -1,3 +1 @@
from .aggregate_cli import run_aggregate

__all__ = ["run_aggregate"]
__all__ = []
50 changes: 43 additions & 7 deletions pgscatalog.calc/src/pgscatalog/calc/cli/aggregate_cli.py
@@ -2,10 +2,11 @@
import logging
import pathlib
import textwrap
import operator
import functools
from collections import deque
from typing import Optional

from ..lib.polygenicscore import PolygenicScore
from ..lib import PolygenicScore
from pgscatalog.core import chrom_keyfunc

logger = logging.getLogger(__name__)

@@ -21,15 +22,50 @@ def run_aggregate():

if args.verbose:
logger.setLevel(logging.INFO)
logging.getLogger("pgscatalog.core").setLevel(logging.INFO)
logging.getLogger("pgscatalog.calc").setLevel(logging.INFO)

if not (outdir := pathlib.Path(args.outdir)).exists():
raise FileNotFoundError(f"--outdir {outdir.name} doesn't exist")

score_paths = [pathlib.Path(x) for x in args.scores]
pgs = [PolygenicScore(path=x) for x in score_paths]
# call __add__ a lot
aggregated = functools.reduce(operator.add, pgs)
score_paths = sorted([pathlib.Path(x) for x in args.scores], key=chrom_keyfunc())
# dfs are only read into memory after accessing them explicitly e.g. pgs[0].df
pgs = deque(PolygenicScore(path=x) for x in score_paths)

observed_columns = set()
aggregated: Optional[PolygenicScore] = None

# first, use PolygenicScore's __add__ method, which implements df.add(fill_value=0)
while pgs:
# popleft ensures that dfs are removed from memory after each aggregation
score: PolygenicScore = pgs.popleft()
if aggregated is None:
logger.info(f"Initialising aggregation with {score}")
aggregated: PolygenicScore = score
else:
logger.info(f"Adding {score}")
aggregated += score
observed_columns.update(set(score.df.columns))

# check to make sure that every column we saw in the dataframes is in the output
if (dfcols := set(aggregated.df.columns)) != observed_columns:
raise ValueError(
f"Missing columns in aggregated file!. "
f"Observed: {observed_columns}. "
f"In aggregated: {dfcols}"
)
else:
logger.info("Aggregated columns match observed columns")

# next, melt the plink2 scoring files from wide (many columns) format to long format
aggregated.melt()

# recalculate PGS average using aggregated SUM and DENOM
aggregated.average()

logger.info("Aggregation finished! Writing to a file")
aggregated.write(outdir=args.outdir, split=args.split)
logger.info("all done. bye :)")


def _description_text() -> str:
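The deque-based loop above streams scores into the running total one file at a time, so only the current score and the accumulator are ever held in memory. In isolation the pattern looks roughly like this sketch (a hypothetical Score class stands in for PolygenicScore; not part of the diff):

```python
from collections import deque

# Hypothetical stand-in for PolygenicScore: __add__ does element-wise addition.
class Score:
    def __init__(self, values):
        self.values = values

    def __add__(self, other):
        return Score([a + b for a, b in zip(self.values, other.values)])

scores = deque(Score([i, i + 1]) for i in range(3))
aggregated = None
while scores:
    score = scores.popleft()  # popleft drops the reference once it has been added
    aggregated = score if aggregated is None else aggregated + score

print(aggregated.values)  # [3, 6]
```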
74 changes: 39 additions & 35 deletions pgscatalog.calc/src/pgscatalog/calc/lib/polygenicscore.py
@@ -274,12 +274,17 @@ class PolygenicScore:

>>> aggregated_score = pgs1 + pgs2
>>> aggregated_score # doctest: +ELLIPSIS
PolygenicScore(sampleset='test', path=None)
PolygenicScore(sampleset='test', path='(in-memory)')

Once a score has been fully aggregated it can be helpful to recalculate an average:

>>> reprlib.repr(aggregated_score.average().to_dict()) # doctest: +ELLIPSIS
"{'DENOM': {('test', 'HG00096'): 3128, ('test', 'HG00097'): 3128, ('test', 'HG00099'): 3128, ('test', 'HG00100'): 3128, ...}, 'PGS001229_22_AVG': {('test', 'HG00096'): 0.0003484782608695652, ('test', 'HG00097'): 0.00043120268542199493, ('test', 'HG00099'): 0.0004074616368286445, ('test', 'HG00100'): 0.0005523938618925831, ...}}"
>>> aggregated_score.average()
>>> aggregated_score.df # doctest: +ELLIPSIS,+NORMALIZE_WHITESPACE
PGS SUM DENOM AVG
sampleset IID
test HG00096 PGS001229_22 1.090040 3128 0.000348
HG00097 PGS001229_22 1.348802 3128 0.000431
...

Scores can be written to a TSV file:

@@ -321,14 +326,19 @@ def __init__(self, *, path=None, df=None, sampleset=None):
if self.sampleset is None:
raise TypeError("Missing sampleset")

self._chunksize = 50000
self._df = df
self._melted = False

def __repr__(self):
return f"{type(self).__name__}(sampleset={repr(self.sampleset)}, path={repr(self.path)})"
if self.path is None:
path = repr("(in-memory)")
else:
path = repr(self.path)
return f"{type(self).__name__}(sampleset={repr(self.sampleset)}, path={path})"

def __add__(self, other):
if isinstance(other, PolygenicScore):
logger.info(f"Doing element-wise addition: {self} + {other}")
sumdf = self.df.add(other.df, fill_value=0)
return PolygenicScore(sampleset=self.sampleset, df=sumdf)
else:
@@ -361,32 +371,38 @@ def read(self):
return df

def average(self):
"""Recalculate average."""
"""Update the dataframe with a recalculated average."""
logger.info("Recalculating average")
if not self._melted:
self.melt()

df = self.df
avgs = df.filter(regex="SUM$")
avgs = avgs.divide(df.DENOM, axis=0)
avgs.insert(0, "DENOM", df.DENOM)
avgs.columns = avgs.columns.str.replace("_SUM", "_AVG")
return avgs
df["AVG"] = df.SUM / df.DENOM
self._df = df

def melt(self):
"""Melt dataframe from wide format to long format"""
sum_df = _melt(self.df, value_name="SUM")
avg_df = _melt(self.average(), value_name="AVG")
df = pd.concat([sum_df, avg_df.AVG], axis=1)
"""Update the dataframe with a melted version (wide format to long format)"""
logger.info("Melting dataframe from wide to long format")
df = self.df.melt(
id_vars=["DENOM"],
value_name="SUM",
var_name="PGS",
ignore_index=False,
)
# e.g. PGS000822_SUM -> PGS000822
df["PGS"] = df["PGS"].str.replace("_SUM", "")
# melted chunks need a consistent column order
return df[["PGS", "SUM", "DENOM", "AVG"]]
self._df = df[["PGS", "SUM", "DENOM"]]
self._melted = True

def write(self, outdir, split=False, melt=True):
def write(self, outdir, split=False):
"""Write PGS to a compressed TSV"""
outdir = pathlib.Path(outdir)

if melt:
logger.info("Melting before write to TSV")
df = self.melt()
else:
logger.info("Writing wide format to TSV")
df = self.df
if not self._melted:
self.melt()

df = self.df

if split:
logger.info("Writing results split by sampleset")
@@ -408,15 +424,3 @@ def _select_agg_cols(cols):
for x in cols
if (x.endswith("_SUM") and (x != "NAMED_ALLELE_DOSAGE_SUM")) or (x in keep_cols)
]


def _melt(df, value_name):
df = df.melt(
id_vars=["DENOM"],
value_name=value_name,
var_name="PGS",
ignore_index=False,
)
# e.g. PGS000822_SUM -> PGS000822
df["PGS"] = df["PGS"].str.replace(f"_{value_name}", "")
return df
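For reference, the wide-to-long reshape that melt() now performs in place, followed by the SUM/DENOM average that average() recalculates, corresponds to this standard pandas pattern (toy frame with values borrowed from the docstring; a sketch, not the exact dataframe the class builds):

```python
import pandas as pd

# Toy wide-format frame: one <score>_SUM column per PGS plus a shared DENOM column.
wide = pd.DataFrame(
    {"DENOM": [3128, 3128], "PGS001229_22_SUM": [1.090040, 1.348802]},
    index=pd.Index(["HG00096", "HG00097"], name="IID"),
)

# Wide -> long: one row per (sample, PGS) pair, keeping the original index.
long = wide.melt(
    id_vars=["DENOM"], value_name="SUM", var_name="PGS", ignore_index=False
)
long["PGS"] = long["PGS"].str.replace("_SUM", "")  # PGS001229_22_SUM -> PGS001229_22
long["AVG"] = long.SUM / long.DENOM  # recalculated average from aggregated SUM and DENOM
print(long[["PGS", "SUM", "DENOM", "AVG"]])
```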
17 changes: 16 additions & 1 deletion pgscatalog.core/poetry.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions pgscatalog.core/pyproject.toml
@@ -21,6 +21,7 @@ tenacity = "^8.2.3"
pyliftover = "^0.4"
xopen = {version = "^1.8.0", extras = ["zstd"]}
tqdm = "^4.66.1"
natsort = "^8.4.0"

[tool.poetry.group.dev.dependencies]
pytest = "^7.4.4"
4 changes: 4 additions & 0 deletions pgscatalog.core/src/pgscatalog/core/__init__.py
@@ -34,6 +34,8 @@
RelabelArgs,
relabel,
relabel_write,
effect_type_keyfunc,
chrom_keyfunc,
)

log_fmt = "%(name)s: %(asctime)s %(levelname)-8s %(message)s"
@@ -74,6 +76,8 @@
"RelabelArgs",
"relabel",
"relabel_write",
"effect_type_keyfunc",
"chrom_keyfunc",
]

__version__ = "0.1.2"
3 changes: 3 additions & 0 deletions pgscatalog.core/src/pgscatalog/core/lib/__init__.py
@@ -5,6 +5,7 @@
from .genomebuild import GenomeBuild
from .targetvariants import TargetVariants, TargetVariant, TargetType
from ._relabel import RelabelArgs, relabel, relabel_write
from ._sortpaths import effect_type_keyfunc, chrom_keyfunc
from .pgsexceptions import (
BasePGSException,
MatchError,
@@ -59,4 +60,6 @@
"RelabelArgs",
"relabel",
"relabel_write",
"effect_type_keyfunc",
"chrom_keyfunc",
]
29 changes: 29 additions & 0 deletions pgscatalog.core/src/pgscatalog/core/lib/_sortpaths.py
@@ -0,0 +1,29 @@
""" This module assumes you're working with paths that follow the format:

{sampleset}_{chrom}_{effect_type}_{n}
"""
from natsort import natsort_keygen, ns


def effect_type_keyfunc():
"""Return a key that sorts by effect type and n. Chromosome order doesn't matter.

This is useful for things like itertools.groupby, which expects sorted input.

>>> import pathlib
>>> paths = [pathlib.Path("ukb_2_dominant_0.txt.gz"), pathlib.Path("ukb_X_additive_0.txt.gz"), pathlib.Path("ukb_X_additive_1.txt.gz"), pathlib.Path("ukb_1_recessive_0.txt.gz")]
>>> sorted(paths, key=effect_type_keyfunc())
[PosixPath('ukb_X_additive_0.txt.gz'), PosixPath('ukb_X_additive_1.txt.gz'), PosixPath('ukb_2_dominant_0.txt.gz'), PosixPath('ukb_1_recessive_0.txt.gz')]
"""
return natsort_keygen(key=lambda x: x.stem.split("_")[2:], alg=ns.REAL)


def chrom_keyfunc():
"""Return a key that sorts by chromosome, including non-integer chromosomes

>>> import pathlib
>>> paths = [pathlib.Path("ukb_2_additive_0.txt.gz"), pathlib.Path("ukb_X_additive_0.txt.gz"), pathlib.Path("ukb_1_additive_0.txt.gz")]
>>> sorted(paths, key=chrom_keyfunc())
[PosixPath('ukb_1_additive_0.txt.gz'), PosixPath('ukb_2_additive_0.txt.gz'), PosixPath('ukb_X_additive_0.txt.gz')]
"""
return natsort_keygen(key=lambda x: x.stem.split("_")[1], alg=ns.REAL)