v0.2.0 release #26

Merged 52 commits on Oct 11, 2022
Commits
77098be
First commit of the 'validate' utils
ens-lgil Sep 2, 2022
08409aa
Python 3.10 compatibility changes and minor updates
ens-lgil Sep 6, 2022
e2fc209
Code updates and add tests for validation
ens-lgil Sep 14, 2022
99eb7b1
Catch ftp download errors
ens-lgil Sep 14, 2022
5fe9fad
pin polars to 0.14.9
nebfield Sep 15, 2022
cd7ee78
bump version for next release
nebfield Sep 15, 2022
96896e3
Handle PGS Catalog REST API errors and retries
ens-lgil Sep 16, 2022
50b9e9e
Update README.md
smlmbrt Sep 20, 2022
268e96a
add memory profiler to development dependencies
nebfield Sep 21, 2022
012ff6d
add support for reading zstd compressed targets
nebfield Sep 21, 2022
7975951
remove single match mode, scan_csv not compatible with BytesIO (zstd)
nebfield Sep 21, 2022
b6aa2b0
compress matched scorefiles
nebfield Sep 21, 2022
92bd91e
skeleton aggregation
nebfield Sep 21, 2022
db8f63a
Merge pull request #19 from ens-lgil/feature/api_error
nebfield Sep 21, 2022
98f8de7
Merge pull request #16 from ens-lgil/feature/download_error
nebfield Sep 21, 2022
da0105e
add aggregate_score
nebfield Sep 22, 2022
85715dc
Cleanup the code, ignore some user warnings from pandas_schema and up…
ens-lgil Sep 22, 2022
60e3d9f
Attempt to fix poetry error
ens-lgil Sep 22, 2022
f6d727f
Fix version discrepancies for numpy
ens-lgil Sep 22, 2022
13216fe
Fix path to the 'validate' test data files
ens-lgil Sep 22, 2022
b60b0aa
Update the README file
ens-lgil Sep 23, 2022
1881c17
fix df truthiness
nebfield Sep 27, 2022
3bf62c3
fix bumped version
nebfield Sep 27, 2022
f02c58c
batch process input to reduce memory usage
nebfield Sep 27, 2022
ae8ce14
read uncompressed data with a bufferedreader
nebfield Sep 27, 2022
3bb3e3d
add lazy evaluation
nebfield Sep 28, 2022
a559d76
improve RAM usage
nebfield Sep 30, 2022
5c15a67
fix reading bim files
nebfield Oct 3, 2022
0dc745d
fix tests
nebfield Oct 3, 2022
ba793fb
fix types
nebfield Oct 3, 2022
7bc290c
Merge pull request #22 from ens-lgil/feature/scoring_file_validator
nebfield Oct 4, 2022
470265d
Merge branch 'dev' into compress
nebfield Oct 4, 2022
a171e5b
update poetry lock file
nebfield Oct 4, 2022
593757c
treat lists of files consistently
nebfield Oct 4, 2022
353d8f2
Setup a user agent for the download_scorefiles utils (REST API calls …
ens-lgil Oct 4, 2022
cc41b4f
Improve library call
ens-lgil Oct 4, 2022
4eec95b
don't hold scorefiles in memory when combining them
nebfield Oct 4, 2022
03699e2
check if input and outputs are empty in combine_scorefiles
nebfield Oct 4, 2022
50b1517
Handle case where we might be removing the missing variants (not defa…
smlmbrt Oct 4, 2022
0a5dfbe
add parameter for n_threads, set POLARS_MAX_THREADS with it
nebfield Oct 5, 2022
ca1734e
move dropping multiallelics from preprocessing to labelling
nebfield Oct 5, 2022
8f1f771
move skipping flips from matching to labelling
nebfield Oct 5, 2022
f5b64cf
is_flipped -> match_flipped, fix uppercase match type
nebfield Oct 6, 2022
5b18299
move label_matches from get_all_matches to match_variants
nebfield Oct 7, 2022
df44d9b
fix setting n_threads when reading
nebfield Oct 7, 2022
be96d14
fix tests
nebfield Oct 7, 2022
d5cfcf0
add sort by match type
nebfield Oct 10, 2022
126f153
fix _match_multiple_targets
nebfield Oct 10, 2022
6aee170
oops
nebfield Oct 10, 2022
4443ea3
Merge pull request #20 from PGScatalog/compress
nebfield Oct 10, 2022
d3da286
Merge pull request #25 from ens-lgil/feature/user_agent
nebfield Oct 10, 2022
4a21393
Update pyproject.toml
smlmbrt Oct 11, 2022
4 changes: 2 additions & 2 deletions Dockerfile
@@ -11,8 +11,8 @@ FROM python:3.10

WORKDIR /opt/

-COPY --from=builder /app/dist/pgscatalog_utils-0.1.2-py3-none-any.whl .
+COPY --from=builder /app/dist/pgscatalog_utils-0.2.0-py3-none-any.whl .

-RUN pip install pgscatalog_utils-0.1.2-py3-none-any.whl
+RUN pip install pgscatalog_utils-0.2.0-py3-none-any.whl

RUN apt-get update && apt-get install -y sqlite3
9 changes: 5 additions & 4 deletions README.md
@@ -2,9 +2,8 @@

[![CI](https://github.com/PGScatalog/pgscatalog_utils/actions/workflows/main.yml/badge.svg)](https://github.com/PGScatalog/pgscatalog_utils/actions/workflows/main.yml)

-This repository is a collection of useful tools for working with data from the
-PGS Catalog. This is mostly used internally by the PGS Catalog calculator, but
-other users might find some of these tools helpful.
+This repository is a collection of useful tools for downloading and working with scoring files from the
+PGS Catalog. This is mostly used internally by the PGS Catalog Calculator ([`PGScatalog/pgsc_calc`](https://github.com/PGScatalog/pgsc_calc)); however, other users may find some of these tools helpful.

## Overview

@@ -13,6 +13,7 @@ other users might find some of these tools helpful.
in 'long' format
* `match_variants`: Match target variants (bim or pvar files) against the output
of `combine_scorefile` to produce scoring files for plink 2
* `validate_scorefiles`: Check/validate that the scoring files and harmonized scoring files match the PGS Catalog scoring file formats.

## Installation

@@ -26,6 +26,7 @@ $ pip install pgscatalog-utils
$ download_scorefiles -i PGS000922 PGS001229 -o . -b GRCh37
$ combine_scorefiles -s PGS*.txt.gz -o combined.txt
$ match_variants -s combined.txt -t <example.pvar> --min_overlap 0.75 --outdir .
$ validate_scorefiles -t formatted --dir <scoringfiles_directory> --log_dir <logs_directory>
```

More details are available using the `--help` parameter.
@@ -66,4 +67,4 @@ doi:[10.1038/s41588-021-00783-5](https://doi.org/10.1038/s41588-021-00783-5).

This work has received funding from EMBL-EBI core funds, the Baker Institute, the University of Cambridge,
Health Data Research UK (HDRUK), and the European Union's Horizon 2020 research and innovation programme
under grant agreement No 101016775 INTERVENE.
under grant agreement No 101016775 INTERVENE.
2 changes: 1 addition & 1 deletion pgscatalog_utils/__init__.py
@@ -1 +1 @@
-__version__ = '0.1.2'
+__version__ = '0.2.0'
Empty file.
92 changes: 92 additions & 0 deletions pgscatalog_utils/aggregate/aggregate_scores.py
@@ -0,0 +1,92 @@
import argparse
import textwrap

import pandas as pd

from pgscatalog_utils.config import set_logging_level
import glob
import logging

logger = logging.getLogger(__name__)


def aggregate_scores():
    args = _parse_args()
    set_logging_level(args.verbose)
    df = aggregate(list(set(args.scores)))
    logger.debug("Compressing and writing combined scores")
    df.to_csv('aggregated_scores.txt.gz', sep='\t', compression='gzip')


def aggregate(scorefiles: list[str]):
    combined = pd.DataFrame()
    aggcols = set()

    for i, path in enumerate(scorefiles):
        logger.debug(f"Reading {path}")
        # pandas can automatically detect zst compression, neat!
        df = (pd.read_table(path)
              .assign(sampleset=path.split('_')[0])
              .set_index(['sampleset', '#IID']))

        df.index.names = ['sampleset', 'IID']

        # Subset to aggregatable columns
        df = df[_select_agg_cols(df.columns)]
        aggcols.update(set(df.columns))

        # Combine DFs
        if i == 0:
            logger.debug('Initialising combined DF')
            combined = df.copy()
        else:
            logger.debug('Adding to combined DF')
            combined = combined.add(df, fill_value=0)

    assert all([x in combined.columns for x in aggcols]), "All Aggregatable Columns are present in the final DF"

    return combined.pipe(_calculate_average)


def _calculate_average(combined: pd.DataFrame):
    logger.debug("Averaging data")
    avgs = combined.loc[:, combined.columns.str.endswith('_SUM')].divide(combined['DENOM'], axis=0)
    avgs.columns = avgs.columns.str.replace('_SUM', '_AVG')
    return pd.concat([combined, avgs], axis=1)


def _select_agg_cols(cols):
    keep_cols = ['DENOM']
    return [x for x in cols if (x.endswith('_SUM') and (x != 'NAMED_ALLELE_DOSAGE_SUM')) or (x in keep_cols)]


def _description_text() -> str:
    return textwrap.dedent('''
    Aggregate plink .sscore files into a combined TSV table.

    This aggregation sums scores that were calculated from plink
    .scorefiles. Scorefiles may be split to calculate scores over different
    chromosomes or effect types. The PGS Catalog calculator automatically splits
    scorefiles where appropriate, and uses this script to combine them.

    Input .sscore files can be optionally compressed with zstd or gzip.

    The aggregated output scores are compressed with gzip.
    ''')


def _parse_args(args=None) -> argparse.Namespace:
    parser = argparse.ArgumentParser(description=_description_text(),
                                     formatter_class=argparse.RawDescriptionHelpFormatter)
    parser.add_argument('-s', '--scores', dest='scores', required=True, nargs='+',
                        help='<Required> List of scorefile paths. Use a wildcard (*) to select multiple files.')
    parser.add_argument('-o', '--outdir', dest='outdir', required=True,
                        default='scores/', help='<Required> Output directory to store downloaded files')
    parser.add_argument('-v', '--verbose', dest='verbose', action='store_true',
                        help='<Optional> Extra logging information')
    return parser.parse_args(args)


if __name__ == "__main__":
    aggregate_scores()
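For reference, the aggregation above reduces to an element-wise sum of the `DENOM` and `*_SUM` columns across the split `.sscore` tables, followed by deriving `*_AVG` columns as `_SUM / DENOM`. A minimal pandas sketch of that logic with made-up sample IDs and values (not data from this PR):

```python
import pandas as pd

# Two toy .sscore-style tables, e.g. one score split across two chromosomes (hypothetical data)
chr1 = pd.DataFrame({'IID': ['a', 'b'], 'DENOM': [10, 10], 'PGS000922_SUM': [0.5, 1.0]}).set_index('IID')
chr2 = pd.DataFrame({'IID': ['a', 'b'], 'DENOM': [12, 12], 'PGS000922_SUM': [0.2, 0.4]}).set_index('IID')

# Element-wise sum across the split files (missing cells treated as 0), as in aggregate()
combined = chr1.add(chr2, fill_value=0)

# Derive *_AVG columns by dividing each *_SUM column by DENOM, as in _calculate_average()
avgs = combined.loc[:, combined.columns.str.endswith('_SUM')].divide(combined['DENOM'], axis=0)
avgs.columns = avgs.columns.str.replace('_SUM', '_AVG')
print(pd.concat([combined, avgs], axis=1))
```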

2 changes: 2 additions & 0 deletions pgscatalog_utils/log_config.py → pgscatalog_utils/config.py
@@ -1,5 +1,7 @@
import logging

POLARS_MAX_THREADS = 1 # dummy value, is reset by args.n_threads (default: 1)


def set_logging_level(verbose: bool):
log_fmt = "%(name)s: %(asctime)s %(levelname)-8s %(message)s"
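Background on the `POLARS_MAX_THREADS` placeholder added above: polars sizes its global thread pool once, when the library is first imported, so an `--n_threads` style option has to take effect before that import happens. A rough sketch of the general pattern using the standard `POLARS_MAX_THREADS` environment variable; the `set_polars_threads` helper is illustrative and not part of this PR, which appears to track the value on the `config` module instead:

```python
import os

def set_polars_threads(n_threads: int) -> None:
    # polars creates its thread pool at import time, so the environment
    # variable must be set before `import polars` runs anywhere in the process
    os.environ['POLARS_MAX_THREADS'] = str(n_threads)

set_polars_threads(4)
import polars as pl  # the 4-thread pool is created here
```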
36 changes: 28 additions & 8 deletions pgscatalog_utils/download/download_scorefile.py
@@ -3,14 +3,16 @@
import os
import shutil
import textwrap
import time
from contextlib import closing
from functools import reduce
from urllib import request as request
from urllib.error import HTTPError, URLError

from pgscatalog_utils.download.publication import query_publication
from pgscatalog_utils.download.score import get_url
from pgscatalog_utils.download.trait import query_trait
-from pgscatalog_utils.log_config import set_logging_level
+from pgscatalog_utils.config import set_logging_level

logger = logging.getLogger(__name__)

@@ -31,21 +33,25 @@ def download_scorefile() -> None:

pgs_lst: list[list[str]] = []

pgsc_calc_info = None
if args.pgsc_calc:
pgsc_calc_info = args.pgsc_calc

if args.efo:
logger.debug("--trait set, querying traits")
-        pgs_lst = pgs_lst + [query_trait(x) for x in args.efo]
+        pgs_lst = pgs_lst + [query_trait(x, pgsc_calc_info) for x in args.efo]

if args.pgp:
logger.debug("--pgp set, querying publications")
-        pgs_lst = pgs_lst + [query_publication(x) for x in args.pgp]
+        pgs_lst = pgs_lst + [query_publication(x, pgsc_calc_info) for x in args.pgp]

if args.pgs:
logger.debug("--id set, querying scores")
pgs_lst.append(args.pgs) # pgs_lst: a list containing up to three flat lists

pgs_id: list[str] = list(set(reduce(lambda x, y: x + y, pgs_lst)))

-    urls: dict[str, str] = get_url(pgs_id, args.build)
+    urls: dict[str, str] = get_url(pgs_id, args.build, pgsc_calc_info)

for pgsid, url in urls.items():
logger.debug(f"Downloading {pgsid} from {url}")
@@ -62,14 +68,26 @@ def _mkdir(outdir: str) -> None:
os.makedirs(outdir)


-def _download_ftp(url: str, path: str) -> None:
+def _download_ftp(url: str, path: str, retry:int = 0) -> None:
if os.path.exists(path):
logger.warning(f"File already exists at {path}, skipping download")
return
else:
-        with closing(request.urlopen(url)) as r:
-            with open(path, 'wb') as f:
-                shutil.copyfileobj(r, f)
+        try:
+            with closing(request.urlopen(url)) as r:
+                with open(path, 'wb') as f:
+                    shutil.copyfileobj(r, f)
+        except (HTTPError, URLError) as error:
+            max_retries = 5
+            print(f'Download failed: {error.reason}')
+            # Retry to download the file if the server is busy
+            if '421' in error.reason and retry < max_retries:
+                print(f'> Retry to download the file ... attempt {retry+1} out of {max_retries}.')
+                retry += 1
+                time.sleep(10)
+                _download_ftp(url,path,retry)
+            else:
+                raise RuntimeError("Failed to download '{}'.\nError message: '{}'".format(url, error.reason))


def _check_args(args):
@@ -121,6 +139,8 @@ def _parse_args(args=None) -> argparse.Namespace:
parser.add_argument('-o', '--outdir', dest='outdir', required=True,
default='scores/',
help='<Required> Output directory to store downloaded files')
parser.add_argument('-c', '--pgsc_calc', dest='pgsc_calc',
help='<Optional> Provide information about downloading scoring files via pgsc_calc')
parser.add_argument('-v', '--verbose', dest='verbose', action='store_true',
help='<Optional> Extra logging information')
return parser.parse_args(args)
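Taken together, the changes above make `download_scorefiles` retry busy FTP downloads and let callers identify themselves to the PGS Catalog REST API. A hypothetical invocation of the new `-c/--pgsc_calc` option (the agent string is made up; the other flags are from the existing README example):

```
$ download_scorefiles -i PGS000922 PGS001229 -b GRCh37 -o scores/ -c "pgsc_calc/x.y.z"
```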
12 changes: 6 additions & 6 deletions pgscatalog_utils/download/publication.py
@@ -1,20 +1,20 @@
import logging
from functools import reduce

-import requests
+from pgscatalog_utils.download.score import query_api

logger = logging.getLogger(__name__)


-def query_publication(pgp: str) -> list[str]:
-    api: str = f'https://www.pgscatalog.org/rest/publication/{pgp}'
+def query_publication(pgp: str, user_agent:str = None) -> list[str]:
    logger.debug("Querying PGS Catalog with publication PGP ID")
-    r: requests.models.Response = requests.get(api)
+    api: str = f'/publication/{pgp}'
+    results_json = query_api(api, user_agent)

-    if r.json() == {}:
+    if results_json == {} or results_json == None:
logger.critical(f"Bad response from PGS Catalog for EFO term: {pgp}")
raise Exception

-    pgs: dict[str, list[str]] = r.json().get('associated_pgs_ids')
+    pgs: dict[str, list[str]] = results_json.get('associated_pgs_ids')
logger.debug(f"Valid response from PGS Catalog for PGP ID: {pgp}")
return list(reduce(lambda x, y: set(x).union(set(y)), pgs.values()))
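As a usage sketch, `query_publication` now routes through `query_api` and returns the PGS IDs associated with a publication. This assumes the package is installed and the PGS Catalog REST API is reachable; the PGP ID and printed output are only illustrative:

```python
from pgscatalog_utils.download.publication import query_publication

# Collects 'associated_pgs_ids' from the /publication/<PGP ID> endpoint
pgs_ids = query_publication('PGP000001')
print(pgs_ids)  # e.g. ['PGS000001', 'PGS000002', 'PGS000003']
```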
43 changes: 37 additions & 6 deletions pgscatalog_utils/download/score.py
@@ -3,17 +3,19 @@

import jq
import requests
import time
from pgscatalog_utils import __version__ as pgscatalog_utils_version

logger = logging.getLogger(__name__)


-def get_url(pgs: list[str], build: str) -> dict[str, str]:
+def get_url(pgs: list[str], build: str, user_agent:str = None) -> dict[str, str]:
pgs_result: list[str] = []
url_result: list[str] = []

for chunk in _chunker(pgs):
try:
-            response = _parse_json_query(query_score(chunk), build)
+            response = _parse_json_query(query_score(chunk,user_agent), build)
pgs_result = pgs_result + list(response.keys())
url_result = url_result + list(response.values())
except TypeError:
@@ -28,11 +30,40 @@ def get_url(pgs: list[str], build: str) -> dict[str, str]:
return dict(zip(pgs_result, url_result))


-def query_score(pgs_id: list[str]) -> dict:
+def query_api(api: str, user_agent:str = None, retry:int = 0) -> dict:
max_retries = 5
wait = 60
results_json = None
rest_url_root = 'https://www.pgscatalog.org/rest'
# Set pgscatalog_utils user agent if none provided
if not user_agent:
user_agent = 'pgscatalog_utils/'+pgscatalog_utils_version
try:
headers = {'User-Agent': user_agent}
r: requests.models.Response = requests.get(rest_url_root+api, headers=headers)
r.raise_for_status()
results_json = r.json()
except requests.exceptions.HTTPError as e:
print(f'HTTP Error: {e}')
if r.status_code in [421,429] and retry < 5:
retry +=1
print(f'> Retry to query the PGS Catalog REST API in {wait}s ... attempt {retry} out of {max_retries}.')
time.sleep(wait)
results_json = query_api(api,retry)
except requests.exceptions.ConnectionError as e:
print(f'Error Connecting: {e}')
except requests.exceptions.Timeout as e:
print(f'Timeout Error: {e}')
except requests.exceptions.RequestException as e:
print(f'Request Error: {e}')
return results_json


def query_score(pgs_id: list[str], user_agent:str = None) -> dict:
pgs: str = ','.join(pgs_id)
-    api: str = f'https://www.pgscatalog.org/rest/score/search?pgs_ids={pgs}'
-    r: requests.models.Response = requests.get(api)
-    return r.json()
+    api: str = f'/score/search?pgs_ids={pgs}'
+    results_json = query_api(api, user_agent)
+    return results_json


def _chunker(pgs: list[str]):
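A small sketch of how the new user-agent plumbing in `get_url`/`query_api` might be exercised from Python; the PGS ID, build, and agent string are examples, and `query_api` falls back to `pgscatalog_utils/<version>` when no agent is supplied:

```python
from pgscatalog_utils.download.score import get_url

# Maps each PGS ID to a download URL for the requested genome build
urls = get_url(['PGS000922'], build='GRCh37', user_agent='pgsc_calc/x.y.z')
print(urls)  # e.g. {'PGS000922': '<FTP URL of the GRCh37 scoring file>'}
```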
12 changes: 6 additions & 6 deletions pgscatalog_utils/download/trait.py
@@ -1,24 +1,24 @@
import logging
from functools import reduce

-import requests
+from pgscatalog_utils.download.score import query_api

logger = logging.getLogger(__name__)


-def query_trait(trait: str) -> list[str]:
-    api: str = f'https://www.pgscatalog.org/rest/trait/{trait}?include_children=1'
+def query_trait(trait: str, user_agent:str = None) -> list[str]:
    logger.debug(f"Querying PGS Catalog with trait {trait}")
-    r: requests.models.Response = requests.get(api)
+    api: str = f'/trait/{trait}?include_children=1'
+    results_json = query_api(api, user_agent)

-    if r.json() == {}:
+    if results_json == {} or results_json == None:
logger.critical(f"Bad response from PGS Catalog for EFO term: {trait}")
raise Exception

keys: list[str] = ['associated_pgs_ids', 'child_associated_pgs_ids']
pgs: list[str] = []
for key in keys:
-        pgs.append(r.json().get(key))
+        pgs.append(results_json.get(key))

logger.debug(f"Valid response from PGS Catalog for EFO term: {trait}")
return list(reduce(lambda x, y: set(x).union(set(y)), pgs))