Merge pull request #26 from PGScatalog/dev
`v0.2.0` release
smlmbrt authored Oct 11, 2022
2 parents 060982e + 4a21393 commit afd3a53
Showing 46 changed files with 2,279 additions and 422 deletions.
4 changes: 2 additions & 2 deletions Dockerfile
@@ -11,8 +11,8 @@ FROM python:3.10

WORKDIR /opt/

COPY --from=builder /app/dist/pgscatalog_utils-0.1.2-py3-none-any.whl .
COPY --from=builder /app/dist/pgscatalog_utils-0.2.0-py3-none-any.whl .

RUN pip install pgscatalog_utils-0.1.2-py3-none-any.whl
RUN pip install pgscatalog_utils-0.2.0-py3-none-any.whl

RUN apt-get update && apt-get install -y sqlite3
9 changes: 5 additions & 4 deletions README.md
@@ -2,9 +2,8 @@

[![CI](https://github.com/PGScatalog/pgscatalog_utils/actions/workflows/main.yml/badge.svg)](https://github.com/PGScatalog/pgscatalog_utils/actions/workflows/main.yml)

This repository is a collection of useful tools for working with data from the
PGS Catalog. This is mostly used internally by the PGS Catalog calculator, but
other users might find some of these tools helpful.
This repository is a collection of useful tools for downloading and working with scoring files from the
PGS Catalog. This is mostly used internally by the PGS Catalog Calculator ([`PGScatalog/pgsc_calc`](https://github.com/PGScatalog/pgsc_calc)); however, other users may find some of these tools helpful.

## Overview

@@ -13,6 +12,7 @@ other users might find some of these tools helpful.
in 'long' format
* `match_variants`: Match target variants (bim or pvar files) against the output
of `combine_scorefile` to produce scoring files for plink 2
* `validate_scorefiles`: Check/validate that the scoring files and harmonized scoring files match the PGS Catalog scoring file formats.

## Installation

@@ -26,6 +26,7 @@ $ pip install pgscatalog-utils
$ download_scorefiles -i PGS000922 PGS001229 -o . -b GRCh37
$ combine_scorefiles -s PGS*.txt.gz -o combined.txt
$ match_variants -s combined.txt -t <example.pvar> --min_overlap 0.75 --outdir .
$ validate_scorefiles -t formatted --dir <scoringfiles_directory> --log_dir <logs_directory>
```

More details are available using the `--help` parameter.
@@ -66,4 +67,4 @@ doi:[10.1038/s41588-021-00783-5](https://doi.org/10.1038/s41588-021-00783-5).

This work has received funding from EMBL-EBI core funds, the Baker Institute, the University of Cambridge,
Health Data Research UK (HDRUK), and the European Union's Horizon 2020 research and innovation programme
under grant agreement No 101016775 INTERVENE.
under grant agreement No 101016775 INTERVENE.
2 changes: 1 addition & 1 deletion pgscatalog_utils/__init__.py
@@ -1 +1 @@
__version__ = '0.1.2'
__version__ = '0.2.0'
Empty file.
92 changes: 92 additions & 0 deletions pgscatalog_utils/aggregate/aggregate_scores.py
@@ -0,0 +1,92 @@
import argparse
import textwrap

import pandas as pd

from pgscatalog_utils.config import set_logging_level
import glob
import logging

logger = logging.getLogger(__name__)


def aggregate_scores():
    args = _parse_args()
    set_logging_level(args.verbose)
    df = aggregate(list(set(args.scores)))
    logger.debug("Compressing and writing combined scores")
    df.to_csv('aggregated_scores.txt.gz', sep='\t', compression='gzip')


def aggregate(scorefiles: list[str]):
    combined = pd.DataFrame()
    aggcols = set()

    for i, path in enumerate(scorefiles):
        logger.debug(f"Reading {path}")
        # pandas can automatically detect zst compression, neat!
        df = (pd.read_table(path)
              .assign(sampleset=path.split('_')[0])
              .set_index(['sampleset', '#IID']))

        df.index.names = ['sampleset', 'IID']

        # Subset to aggregatable columns
        df = df[_select_agg_cols(df.columns)]
        aggcols.update(set(df.columns))

        # Combine DFs
        if i == 0:
            logger.debug('Initialising combined DF')
            combined = df.copy()
        else:
            logger.debug('Adding to combined DF')
            combined = combined.add(df, fill_value=0)

    assert all([x in combined.columns for x in aggcols]), "Not all aggregatable columns are present in the final DF"

    return combined.pipe(_calculate_average)


def _calculate_average(combined: pd.DataFrame):
    logger.debug("Averaging data")
    avgs = combined.loc[:, combined.columns.str.endswith('_SUM')].divide(combined['DENOM'], axis=0)
    avgs.columns = avgs.columns.str.replace('_SUM', '_AVG')
    return pd.concat([combined, avgs], axis=1)


def _select_agg_cols(cols):
    keep_cols = ['DENOM']
    return [x for x in cols if (x.endswith('_SUM') and (x != 'NAMED_ALLELE_DOSAGE_SUM')) or (x in keep_cols)]


def _description_text() -> str:
    return textwrap.dedent('''
    Aggregate plink .sscore files into a combined TSV table.
    This aggregation sums scores that were calculated from plink
    .scorefiles. Scorefiles may be split to calculate scores over different
    chromosomes or effect types. The PGS Catalog calculator automatically splits
    scorefiles where appropriate, and uses this script to combine them.
    Input .sscore files can be optionally compressed with zstd or gzip.
    The aggregated output scores are compressed with gzip.
    ''')


def _parse_args(args=None) -> argparse.Namespace:
    parser = argparse.ArgumentParser(description=_description_text(),
                                     formatter_class=argparse.RawDescriptionHelpFormatter)
    parser.add_argument('-s', '--scores', dest='scores', required=True, nargs='+',
                        help='<Required> List of scorefile paths. Use a wildcard (*) to select multiple files.')
    parser.add_argument('-o', '--outdir', dest='outdir', required=True,
                        default='scores/', help='<Required> Output directory for the aggregated scores')
    parser.add_argument('-v', '--verbose', dest='verbose', action='store_true',
                        help='<Optional> Extra logging information')
    return parser.parse_args(args)


if __name__ == "__main__":
    aggregate_scores()
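For context, a minimal sketch (not part of this commit) of how the new `aggregate()` function behaves: two hypothetical chromosome-split plink `.sscore` files for the same sampleset are summed per sample, and `_SUM` columns are divided by the summed `DENOM` to produce `_AVG` columns. The file names, the sampleset name (`cineca`) and the score values below are invented for illustration.

```python
import pandas as pd
from pgscatalog_utils.aggregate.aggregate_scores import aggregate

# two hypothetical per-chromosome .sscore files for the same samples;
# aggregate() derives the sampleset name from the text before the first '_'
chrom1 = pd.DataFrame({'#IID': ['sample_1', 'sample_2'],
                       'DENOM': [100, 100],
                       'PGS000001_SUM': [0.5, 0.2]})
chrom2 = pd.DataFrame({'#IID': ['sample_1', 'sample_2'],
                       'DENOM': [150, 150],
                       'PGS000001_SUM': [0.1, 0.4]})
chrom1.to_csv('cineca_chrom1.sscore', sep='\t', index=False)
chrom2.to_csv('cineca_chrom2.sscore', sep='\t', index=False)

combined = aggregate(['cineca_chrom1.sscore', 'cineca_chrom2.sscore'])
# combined is indexed by (sampleset, IID) and holds the summed PGS000001_SUM,
# the summed DENOM, and PGS000001_AVG = PGS000001_SUM / DENOM
print(combined)
```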

2 changes: 2 additions & 0 deletions pgscatalog_utils/log_config.py → pgscatalog_utils/config.py
@@ -1,5 +1,7 @@
import logging

POLARS_MAX_THREADS = 1 # dummy value, is reset by args.n_threads (default: 1)


def set_logging_level(verbose: bool):
    log_fmt = "%(name)s: %(asctime)s %(levelname)-8s %(message)s"
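The new `POLARS_MAX_THREADS` constant is a placeholder that the command-line entry points overwrite from `args.n_threads`. Below is a hedged sketch of how a caller might propagate it; the `_set_threads()` helper is hypothetical, and the assumption (not shown in this diff) is that the value is exported to the `POLARS_MAX_THREADS` environment variable before polars is imported, since polars only reads that variable at import time.

```python
import os

from pgscatalog_utils import config


def _set_threads(n_threads: int) -> None:
    # record the requested thread count and expose it to polars;
    # polars honours POLARS_MAX_THREADS only when set before import
    config.POLARS_MAX_THREADS = n_threads
    os.environ['POLARS_MAX_THREADS'] = str(n_threads)


_set_threads(4)       # e.g. taken from args.n_threads
import polars as pl   # imported only after the thread count is configured
```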
36 changes: 28 additions & 8 deletions pgscatalog_utils/download/download_scorefile.py
@@ -3,14 +3,16 @@
import os
import shutil
import textwrap
import time
from contextlib import closing
from functools import reduce
from urllib import request as request
from urllib.error import HTTPError, URLError

from pgscatalog_utils.download.publication import query_publication
from pgscatalog_utils.download.score import get_url
from pgscatalog_utils.download.trait import query_trait
from pgscatalog_utils.log_config import set_logging_level
from pgscatalog_utils.config import set_logging_level

logger = logging.getLogger(__name__)

@@ -31,21 +33,25 @@ def download_scorefile() -> None:

    pgs_lst: list[list[str]] = []

    pgsc_calc_info = None
    if args.pgsc_calc:
        pgsc_calc_info = args.pgsc_calc

    if args.efo:
        logger.debug("--trait set, querying traits")
        pgs_lst = pgs_lst + [query_trait(x) for x in args.efo]
        pgs_lst = pgs_lst + [query_trait(x, pgsc_calc_info) for x in args.efo]

    if args.pgp:
        logger.debug("--pgp set, querying publications")
        pgs_lst = pgs_lst + [query_publication(x) for x in args.pgp]
        pgs_lst = pgs_lst + [query_publication(x, pgsc_calc_info) for x in args.pgp]

    if args.pgs:
        logger.debug("--id set, querying scores")
        pgs_lst.append(args.pgs)  # pgs_lst: a list containing up to three flat lists

    pgs_id: list[str] = list(set(reduce(lambda x, y: x + y, pgs_lst)))

    urls: dict[str, str] = get_url(pgs_id, args.build)
    urls: dict[str, str] = get_url(pgs_id, args.build, pgsc_calc_info)

    for pgsid, url in urls.items():
        logger.debug(f"Downloading {pgsid} from {url}")
@@ -62,14 +68,26 @@ def _mkdir(outdir: str) -> None:
        os.makedirs(outdir)


def _download_ftp(url: str, path: str) -> None:
def _download_ftp(url: str, path: str, retry:int = 0) -> None:
    if os.path.exists(path):
        logger.warning(f"File already exists at {path}, skipping download")
        return
    else:
        with closing(request.urlopen(url)) as r:
            with open(path, 'wb') as f:
                shutil.copyfileobj(r, f)
        try:
            with closing(request.urlopen(url)) as r:
                with open(path, 'wb') as f:
                    shutil.copyfileobj(r, f)
        except (HTTPError, URLError) as error:
            max_retries = 5
            print(f'Download failed: {error.reason}')
            # Retry to download the file if the server is busy
            if '421' in error.reason and retry < max_retries:
                print(f'> Retry to download the file ... attempt {retry+1} out of {max_retries}.')
                retry += 1
                time.sleep(10)
                _download_ftp(url, path, retry)
            else:
                raise RuntimeError("Failed to download '{}'.\nError message: '{}'".format(url, error.reason))


def _check_args(args):
@@ -121,6 +139,8 @@ def _parse_args(args=None) -> argparse.Namespace:
    parser.add_argument('-o', '--outdir', dest='outdir', required=True,
                        default='scores/',
                        help='<Required> Output directory to store downloaded files')
    parser.add_argument('-c', '--pgsc_calc', dest='pgsc_calc',
                        help='<Optional> Provide information about downloading scoring files via pgsc_calc')
    parser.add_argument('-v', '--verbose', dest='verbose', action='store_true',
                        help='<Optional> Extra logging information')
    return parser.parse_args(args)
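As an illustration only, the retry behaviour added to the internal `_download_ftp()` helper: when the server reports a "421 too many connections" error, the download is re-attempted up to 5 times with a 10 second pause, otherwise a `RuntimeError` is raised. The PGS ID and URL below are examples; in real runs the scoring file URL is resolved by `get_url()`.

```python
from pgscatalog_utils.download.download_scorefile import _download_ftp

# illustrative scoring file URL; normally obtained from get_url()
url = 'https://ftp.ebi.ac.uk/pub/databases/spot/pgs/scores/PGS000922/ScoringFiles/PGS000922.txt.gz'

# starts at retry=0; the download is skipped if the file already exists locally
_download_ftp(url, 'PGS000922.txt.gz')
```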
12 changes: 6 additions & 6 deletions pgscatalog_utils/download/publication.py
@@ -1,20 +1,20 @@
import logging
from functools import reduce

import requests
from pgscatalog_utils.download.score import query_api

logger = logging.getLogger(__name__)


def query_publication(pgp: str) -> list[str]:
    api: str = f'https://www.pgscatalog.org/rest/publication/{pgp}'
def query_publication(pgp: str, user_agent:str = None) -> list[str]:
    logger.debug("Querying PGS Catalog with publication PGP ID")
    r: requests.models.Response = requests.get(api)
    api: str = f'/publication/{pgp}'
    results_json = query_api(api, user_agent)

    if r.json() == {}:
    if results_json == {} or results_json is None:
        logger.critical(f"Bad response from PGS Catalog for PGP ID: {pgp}")
        raise Exception

    pgs: dict[str, list[str]] = r.json().get('associated_pgs_ids')
    pgs: dict[str, list[str]] = results_json.get('associated_pgs_ids')
    logger.debug(f"Valid response from PGS Catalog for PGP ID: {pgp}")
    return list(reduce(lambda x, y: set(x).union(set(y)), pgs.values()))
43 changes: 37 additions & 6 deletions pgscatalog_utils/download/score.py
@@ -3,17 +3,19 @@

import jq
import requests
import time
from pgscatalog_utils import __version__ as pgscatalog_utils_version

logger = logging.getLogger(__name__)


def get_url(pgs: list[str], build: str) -> dict[str, str]:
def get_url(pgs: list[str], build: str, user_agent:str = None) -> dict[str, str]:
    pgs_result: list[str] = []
    url_result: list[str] = []

    for chunk in _chunker(pgs):
        try:
            response = _parse_json_query(query_score(chunk), build)
            response = _parse_json_query(query_score(chunk, user_agent), build)
            pgs_result = pgs_result + list(response.keys())
            url_result = url_result + list(response.values())
        except TypeError:
@@ -28,11 +30,40 @@ def get_url(pgs: list[str], build: str) -> dict[str, str]:
    return dict(zip(pgs_result, url_result))


def query_score(pgs_id: list[str]) -> dict:
def query_api(api: str, user_agent:str = None, retry:int = 0) -> dict:
    max_retries = 5
    wait = 60
    results_json = None
    rest_url_root = 'https://www.pgscatalog.org/rest'
    # Set pgscatalog_utils user agent if none provided
    if not user_agent:
        user_agent = 'pgscatalog_utils/'+pgscatalog_utils_version
    try:
        headers = {'User-Agent': user_agent}
        r: requests.models.Response = requests.get(rest_url_root+api, headers=headers)
        r.raise_for_status()
        results_json = r.json()
    except requests.exceptions.HTTPError as e:
        print(f'HTTP Error: {e}')
        if r.status_code in [421, 429] and retry < max_retries:
            retry += 1
            print(f'> Retry to query the PGS Catalog REST API in {wait}s ... attempt {retry} out of {max_retries}.')
            time.sleep(wait)
            results_json = query_api(api, user_agent, retry)
    except requests.exceptions.ConnectionError as e:
        print(f'Error Connecting: {e}')
    except requests.exceptions.Timeout as e:
        print(f'Timeout Error: {e}')
    except requests.exceptions.RequestException as e:
        print(f'Request Error: {e}')
    return results_json


def query_score(pgs_id: list[str], user_agent:str = None) -> dict:
    pgs: str = ','.join(pgs_id)
    api: str = f'https://www.pgscatalog.org/rest/score/search?pgs_ids={pgs}'
    r: requests.models.Response = requests.get(api)
    return r.json()
    api: str = f'/score/search?pgs_ids={pgs}'
    results_json = query_api(api, user_agent)
    return results_json


def _chunker(pgs: list[str]):
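A short usage sketch of the refactored REST helpers: `query_api()` now centralises the HTTP handling (custom `User-Agent` header, `raise_for_status()`, and up to five retries with a 60 second wait on 421/429 responses), while `query_score()` and `get_url()` call it with relative API paths. The PGS IDs are the ones used in the README example; the custom user-agent string is hypothetical.

```python
from pgscatalog_utils.download.score import get_url, query_score

# raw REST response for a batch of scores (default pgscatalog_utils/<version> user agent)
raw = query_score(['PGS000922', 'PGS001229'])

# map each PGS ID to its GRCh37 scoring file URL, passing a custom user agent through
urls = get_url(['PGS000922', 'PGS001229'], 'GRCh37', user_agent='pgsc_calc/2.0.0-alpha')
```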
12 changes: 6 additions & 6 deletions pgscatalog_utils/download/trait.py
@@ -1,24 +1,24 @@
import logging
from functools import reduce

import requests
from pgscatalog_utils.download.score import query_api

logger = logging.getLogger(__name__)


def query_trait(trait: str) -> list[str]:
    api: str = f'https://www.pgscatalog.org/rest/trait/{trait}?include_children=1'
def query_trait(trait: str, user_agent:str = None) -> list[str]:
    logger.debug(f"Querying PGS Catalog with trait {trait}")
    r: requests.models.Response = requests.get(api)
    api: str = f'/trait/{trait}?include_children=1'
    results_json = query_api(api, user_agent)

    if r.json() == {}:
    if results_json == {} or results_json is None:
        logger.critical(f"Bad response from PGS Catalog for EFO term: {trait}")
        raise Exception

    keys: list[str] = ['associated_pgs_ids', 'child_associated_pgs_ids']
    pgs: list[str] = []
    for key in keys:
        pgs.append(r.json().get(key))
        pgs.append(results_json.get(key))

    logger.debug(f"Valid response from PGS Catalog for EFO term: {trait}")
    return list(reduce(lambda x, y: set(x).union(set(y)), pgs))
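A toy illustration (with made-up PGS IDs) of the final `reduce` step in `query_trait()`: directly associated and child-trait-associated score IDs are unioned and de-duplicated before being returned.

```python
from functools import reduce

pgs = [['PGS000001', 'PGS000002'],   # associated_pgs_ids
       ['PGS000002', 'PGS000010']]   # child_associated_pgs_ids

merged = list(reduce(lambda x, y: set(x).union(set(y)), pgs))
# e.g. ['PGS000001', 'PGS000002', 'PGS000010'] (set order is not guaranteed)
```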