Skip to content

Commit

Permalink
Merge pull request #29 from PGScatalog/dev
Browse files Browse the repository at this point in the history
v0.3.0
  • Loading branch information
nebfield authored Nov 21, 2022
2 parents afd3a53 + 01a92ac commit e220f14
Show file tree
Hide file tree
Showing 28 changed files with 1,512 additions and 375 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ jobs:
with:
python-version: '3.10'
- name: Python Poetry Action
uses: abatilo/actions-poetry@v2.1.3
uses: abatilo/actions-poetry@v2.1.6
- name: Install
run: poetry install
- name: Test
Expand Down
32 changes: 22 additions & 10 deletions Dockerfile
Original file line number Diff line number Diff line change
@@ -1,18 +1,30 @@

FROM python:3.10 as builder

# docker build --build-arg "ENV=PROD" ...

ARG ENV

RUN apt-get update && apt-get install -y sqlite3

WORKDIR /app
COPY . /app/

RUN pip install poetry && poetry config virtualenvs.in-project true && \
poetry install --no-ansi --no-dev

RUN poetry build
RUN pip install poetry

RUN python -m venv /venv

COPY install.sh poetry.lock pyproject.toml /app

RUN chmod +x install.sh && ./install.sh

COPY . .

RUN poetry build && /venv/bin/pip install dist/*.whl

FROM python:3.10
FROM builder as final

WORKDIR /opt/
COPY --from=builder /venv /venv

COPY --from=builder /app/dist/pgscatalog_utils-0.2.0-py3-none-any.whl .
ENV PATH="/venv/bin:${PATH}"

RUN pip install pgscatalog_utils-0.2.0-py3-none-any.whl

RUN apt-get update && apt-get install -y sqlite3
2 changes: 2 additions & 0 deletions conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
from pgscatalog_utils.match.preprocess import complement_valid_alleles
from pgscatalog_utils.scorefile.combine_scorefiles import combine_scorefiles

pl.toggle_string_cache(True)


@pytest.fixture(scope="session")
def pgs_accessions():
Expand Down
7 changes: 7 additions & 0 deletions install.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
#!/usr/bin/env sh

if [ ${ENV} = "DEV" ]; then
poetry export --dev --without-hashes -f requirements.txt | /venv/bin/pip install -r /dev/stdin
else
poetry export --without-hashes -f requirements.txt | /venv/bin/pip install -r /dev/stdin
fi
2 changes: 1 addition & 1 deletion pgscatalog_utils/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = '0.2.0'
__version__ = '0.3.0'
51 changes: 50 additions & 1 deletion pgscatalog_utils/config.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,41 @@
import atexit
import logging
import os
import tempfile

POLARS_MAX_THREADS = 1 # dummy value, is reset by args.n_threads (default: 1)
import polars as pl

from pgscatalog_utils.match import tempdir

N_THREADS: int = 1 # dummy value, is reset by args.n_threads (default: 1)
OUTDIR: str = "." # dummy value, reset by args.outdir
TEMPDIR: tempfile.TemporaryDirectory

logger = logging.getLogger(__name__)


def setup_tmpdir(outdir, combine=False):
if combine:
work_dir = "work_combine"
dirs = [work_dir]
else:
work_dir = "work_match"
dirs = [work_dir, "matches"]

for d in dirs:
if os.path.exists(d):
logger.critical(f"{d} already exists, bailing out")
logger.critical("Please choose a different --outdir or clean up")
raise SystemExit(1)

global TEMPDIR
os.mkdir(os.path.join(outdir, work_dir))
TEMPDIR = tempfile.TemporaryDirectory(dir=os.path.join(outdir, work_dir))


def setup_cleaning():
logger.debug(F"Temporary directory set up: {TEMPDIR.name}")
atexit.register(tempdir.cleanup)


def set_logging_level(verbose: bool):
Expand All @@ -15,3 +50,17 @@ def set_logging_level(verbose: bool):
logging.basicConfig(level=logging.WARNING,
format=log_fmt,
datefmt='%Y-%m-%d %H:%M:%S')


def setup_polars_threads(n: int):
global N_THREADS
N_THREADS = n
os.environ['POLARS_MAX_THREADS'] = str(N_THREADS)
logger.debug(f"Using {N_THREADS} threads to read CSVs")
logger.debug(f"polars threadpool size: {pl.threadpool_size()}")

if pl.threadpool_size() != N_THREADS:
logger.warning(f"polars threadpool doesn't match -n argument ({pl.threadpool_size()} vs {n})")
logger.info("To silence this warning, set POLARS_MAX_THREADS to match -n before running combine_matches, e.g.:")
logger.info("$ export POLARS_MAX_THREADS=x")
logger.info("$ combine_matches ... -n x")
11 changes: 9 additions & 2 deletions pgscatalog_utils/download/download_scorefile.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,12 @@ def download_scorefile() -> None:
pgsc_calc_info = args.pgsc_calc

if args.efo:
logger.debug("--trait set, querying traits")
pgs_lst = pgs_lst + [query_trait(x, pgsc_calc_info) for x in args.efo]
if args.efo_include_children:
logger.debug("--trait set, querying traits (including PGS for child terms)")
else:
logger.debug("--trait set, querying traits")
pgs_lst = pgs_lst + [query_trait(x, pgsc_calc_info, args.efo_include_children) for x in args.efo]


if args.pgp:
logger.debug("--pgp set, querying publications")
Expand Down Expand Up @@ -133,6 +137,9 @@ def _parse_args(args=None) -> argparse.Namespace:
parser.add_argument('-i', '--pgs', nargs='+', dest='pgs', help='PGS Catalog ID(s) (e.g. PGS000001)')
parser.add_argument('-t', '--efo', dest='efo', nargs='+',
help='Traits described by an EFO term(s) (e.g. EFO_0004611)')
parser.add_argument('-e', '--efo_direct', dest='efo_include_children', action='store_false',
help='<Optional> Return only PGS tagged with exact EFO term '
'(e.g. no PGS for child/descendant terms in the ontology)')
parser.add_argument('-p', '--pgp', dest='pgp', help='PGP publication ID(s) (e.g. PGP000007)', nargs='+')
parser.add_argument('-b', '--build', dest='build', choices=['GRCh37', 'GRCh38'],
help='Download Harmonized Scores with Positions in Genome build: GRCh37 or GRCh38')
Expand Down
2 changes: 1 addition & 1 deletion pgscatalog_utils/download/score.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ def get_url(pgs: list[str], build: str, user_agent:str = None) -> dict[str, str]
response = _parse_json_query(query_score(chunk,user_agent), build)
pgs_result = pgs_result + list(response.keys())
url_result = url_result + list(response.values())
except TypeError:
except (AttributeError, TypeError):
logger.error(f"Bad response from PGS Catalog API. Is {pgs} a valid ID?")
sys.exit(1)

Expand Down
9 changes: 6 additions & 3 deletions pgscatalog_utils/download/trait.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,19 @@
logger = logging.getLogger(__name__)


def query_trait(trait: str, user_agent:str = None) -> list[str]:
def query_trait(trait: str, user_agent:str = None, include_children:bool = True) -> list[str]:
logger.debug(f"Querying PGS Catalog with trait {trait}")
api: str = f'/trait/{trait}?include_children=1'
api: str = f'/trait/{trait}?include_children=0'
results_json = query_api(api, user_agent)

if results_json == {} or results_json == None:
logger.critical(f"Bad response from PGS Catalog for EFO term: {trait}")
raise Exception

keys: list[str] = ['associated_pgs_ids', 'child_associated_pgs_ids']
keys: list[str] = ['associated_pgs_ids']
if include_children:
keys.append('child_associated_pgs_ids')

pgs: list[str] = []
for key in keys:
pgs.append(results_json.get(key))
Expand Down
73 changes: 73 additions & 0 deletions pgscatalog_utils/match/combine_matches.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
import argparse
import logging

import polars as pl

from pgscatalog_utils import config
from pgscatalog_utils.match.label import make_params_dict, label_matches
from pgscatalog_utils.match.match_variants import log_and_write, add_match_args
from pgscatalog_utils.match.read import read_scorefile

logger = logging.getLogger(__name__)


def combine_matches():
args = _parse_args()
config.set_logging_level(args.verbose)
config.setup_polars_threads(args.n_threads)
config.setup_tmpdir(args.outdir, combine=True)
config.OUTDIR = args.outdir

with pl.StringCache():
scorefile = read_scorefile(path=args.scorefile, chrom=None) # chrom=None to read all variants
logger.debug("Reading matches")
matches = pl.concat([pl.scan_ipc(x, memory_map=False, rechunk=False) for x in args.matches], rechunk=False)

logger.debug("Labelling match candidates")
params: dict[str, bool] = make_params_dict(args)
matches = matches.pipe(label_matches, params)

# make sure there's no duplicate variant_ids across matches in multiple pvars
# processing batched chromosomes with overlapping variants might cause problems
# e.g. chr1 1-100000, chr1 100001-500000
_check_duplicate_vars(matches)

dataset = args.dataset.replace('_', '-') # _ used as delimiter in pgsc_calc
log_and_write(matches=matches, scorefile=scorefile, dataset=dataset, args=args)


def _check_duplicate_vars(matches: pl.LazyFrame):
max_occurrence: list[int] = (matches.filter(pl.col('match_status') == 'matched')
.groupby(['accession', 'ID'])
.agg(pl.count())
.select('count')
.max()
.collect()
.get_column('count')
.to_list())
assert max_occurrence == [1], "Duplicate IDs in final matches"


def _parse_args(args=None):
parser = argparse.ArgumentParser()
parser.add_argument('-d', '--dataset', dest='dataset', required=True,
help='<Required> Label for target genomic dataset')
parser.add_argument('-s', '--scorefile', dest='scorefile', required=True,
help='<Required> Path to scorefile')
parser.add_argument('-m', '--matches', dest='matches', required=True, nargs='+',
help='<Required> List of match files')
parser.add_argument('--min_overlap', dest='min_overlap', required=True,
type=float, help='<Required> Minimum proportion of variants to match before error')
parser = add_match_args(parser) # params for labelling matches
parser.add_argument('--outdir', dest='outdir', required=True,
help='<Required> Output directory')
parser.add_argument('--split', dest='split', default=False, action='store_true',
help='<Optional> Split scorefile per chromosome?')
parser.add_argument('-n', dest='n_threads', default=1, help='<Optional> n threads for matching', type=int)
parser.add_argument('-v', '--verbose', dest='verbose', action='store_true',
help='<Optional> Extra logging information')
return parser.parse_args(args)


if __name__ == "__main__":
combine_matches()
6 changes: 3 additions & 3 deletions pgscatalog_utils/match/filter.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,21 +26,21 @@ def filter_scores(scorefile: pl.LazyFrame, matches: pl.LazyFrame, min_overlap: f
scores.append(df.with_column(pl.col('accession').cast(pl.Categorical)))

score_summary: pl.LazyFrame = pl.concat(scores).lazy()
filtered_scores: pl.DataFrame = (filtered_matches.join(score_summary, on='accession', how='left')
filtered_scores: pl.LazyFrame = (filtered_matches.join(score_summary, on='accession', how='left')
.filter(pl.col('score_pass') == True))

return filtered_scores, score_summary


def _calculate_match_rate(df: pl.DataFrame) -> pl.DataFrame:
def _calculate_match_rate(df: pl.LazyFrame) -> pl.LazyFrame:
logger.debug("Calculating overlap between target genome and scoring file")
return (df.groupby('accession')
.agg([pl.count(), (pl.col('match_type') == None).sum().alias('no_match')])
.with_column((pl.col('no_match') / pl.col('count')).alias('fail_rate')))


def _filter_matches(df: pl.LazyFrame) -> pl.LazyFrame:
logger.debug("Filtering variants with exclude flag")
logger.debug("Filtering to best_match variants (with exclude flag = False)")
return df.filter((pl.col('best_match') == True) & (pl.col('exclude') == False))


Expand Down
21 changes: 13 additions & 8 deletions pgscatalog_utils/match/label.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,14 @@
logger = logging.getLogger(__name__)


def make_params_dict(args) -> dict[str, bool]:
""" Make a dictionary with parameters that control labelling match candidates """
return {'keep_first_match': args.keep_first_match,
'remove_ambiguous': args.remove_ambiguous,
'skip_flip': args.skip_flip,
'remove_multiallelic': args.remove_multiallelic}


def label_matches(df: pl.LazyFrame, params: dict[str, bool]) -> pl.LazyFrame:
""" Label match candidates with additional metadata. Column definitions:
Expand Down Expand Up @@ -92,17 +100,14 @@ def _label_duplicate_best_match(df: pl.LazyFrame) -> pl.LazyFrame:
.otherwise(pl.lit(False))
.alias('duplicate_best_match'))
.drop('count')
.rename({'row_nr': 'score_row_nr'})
.with_row_count() # add temporary row count to get first variant
.with_row_count(name='temp_row_nr') # add temporary row count to get first variant
.with_column(pl.when((pl.col("best_match") == True) &
(pl.col("duplicate_best_match") == True) &
(pl.col("row_nr") > pl.min("row_nr")).over(
["accession", "score_row_nr"]))
(pl.col("temp_row_nr") > pl.min("temp_row_nr")).over(
["accession", "row_nr"]))
.then(False) # reset best match flag for duplicates
.otherwise(pl.col("best_match")) # just keep value from existing column
.alias('best_match_duplicate_row_nr'))
.drop(['row_nr', 'best_match'])
.rename({'score_row_nr': 'row_nr', 'best_match_duplicate_row_nr': 'best_match'}))
.alias('best_match')))

return labelled

Expand Down Expand Up @@ -209,4 +214,4 @@ def _label_flips(df: pl.LazyFrame, skip_flip: bool) -> pl.LazyFrame:
.alias('exclude'))
else:
logger.debug("Not excluding flipped matches")
return df
return df
Loading

0 comments on commit e220f14

Please sign in to comment.