import os
import os.path
import argparse
import requests
import shutil
import tarfile
import time
from pgs_exports.PGSExportGenerator import PGSExportGenerator
from pgs_exports.PGSFtpGenerator import PGSFtpGenerator
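
# Publications treated as "large studies": their metadata exports are built and
# published separately, via call_generate_large_studies_metadata_exports() and
# build_large_study_metadata_ftp() (see main() below).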
large_publication_ids_list = ['PGP000244','PGP000263','PGP000332','PGP000393']


def rest_api_call(url,endpoint,parameters=None):
    """
    Generic method to perform REST API calls to the PGS Catalog
    > Parameters:
        - url: URL to the REST API
        - endpoint: REST API endpoint
        - parameters: extra parameters to the REST API endpoint, if needed
    > Return type: dictionary
    """
    if not url.endswith('/'):
        url += '/'
    rest_full_url = url+endpoint
    if parameters:
        rest_full_url += '?'+parameters
    print("\t\t> URL: "+rest_full_url)
    try:
        response = requests.get(rest_full_url)
        response_json = response.json()
        # Response with pagination
        if 'next' in response_json:
            count_items = response_json['count']
            results = response_json['results']
            # Loop over the pages
            while response_json['next'] and count_items > len(results):
                time.sleep(1)
                response = requests.get(response_json['next'])
                response_json = response.json()
                results = results + response_json['results']
            if count_items != len(results):
                print(f'The number of items differs from the expected count: {len(results)} found instead of {count_items}')
        # Response without pagination
        else:
            results = response_json
    except requests.exceptions.RequestException as e:
        raise SystemExit(e)
    return results
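
# Minimal usage sketch: fetch every score, letting rest_api_call() follow the
# paginated 'next' links. The URL below is illustrative (taken from the --url
# help text); in production the REST API root is supplied on the command line.
#   scores = rest_api_call('http://127.0.0.1:8000/rest/', 'score/all')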


def get_all_pgs_data(url_root):
    """
    Fetch all the PGS data via the REST API
    > Parameter:
        - url_root: Root of the REST API URL
    > Return type: dictionary
    """
    data = {}
    for data_type in ['score', 'trait', 'publication', 'performance', 'cohort']:
        print(f'\t- Fetch all {data_type}s')
        tmp_data = rest_api_call(url_root, f'{data_type}/all')
        # Wait a bit to avoid reaching the maximum number of allowed queries per minute (the limit might be increased)
        time.sleep(5)
        if tmp_data:
            print(f'\t\t> {data_type}s: {len(tmp_data)} entries')
            data[data_type] = tmp_data
        else:
            print(f'\t/!\\ Error: cannot retrieve "{data_type}" data')
    return data
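
# The returned dictionary is keyed by data type, e.g. data['score'] is the list
# of score records; main() below reads each record's 'id' field from it.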


def get_latest_release(url_root) -> dict:
    """
    Fetch the date of the latest PGS Catalog release
    > Parameter:
        - url_root: Root of the REST API URL
    > Return type: dictionary
    """
    release = {}
    release_data = rest_api_call(url_root, 'release/current')
    if release_data:
        release = release_data
        print(f'\t\t> Release: {release["date"]}')
    else:
        print('\t/!\\ Error: cannot retrieve the current release')
    return release


def get_previous_release(url_root):
    """
    Fetch the date of the previous PGS Catalog release
    > Parameter:
        - url_root: Root of the REST API URL
    > Return type: dictionary
    """
    release = {}
    release_data = rest_api_call(url_root, 'release/all')
    if release_data:
        if 'results' in release_data:
            release = release_data['results'][1]
        else:
            release = release_data[1]
        print(f'\t\t> Previous release: {release["date"]}')
    else:
        print('\t/!\\ Error: cannot retrieve the previous release')
    return release


def get_ancestry_categories(url_root):
    """
    Fetch the list of ancestry categories
    > Parameter:
        - url_root: Root of the REST API URL
    > Return type: dictionary
    """
    data = {}
    ancestry_data = rest_api_call(url_root, 'ancestry_categories')
    if ancestry_data:
        for anc in ancestry_data:
            data[anc] = ancestry_data[anc]['display_category']
    else:
        print('\t/!\\ Error: cannot retrieve the list of ancestry categories')
    return data


def create_pgs_directory(path, force_recreate=None):
    """
    Creates a directory for a given PGS
    > Parameters:
        - path: path of the directory
        - force_recreate: if the directory already exists, remove it before creating it again
    """
    # Remove the directory before creating it again
    # (ignore_errors is left unset so that failures raise OSError and are reported)
    if force_recreate and os.path.isdir(path):
        try:
            shutil.rmtree(path)
        except OSError:
            print(f'Deletion of the existing directory prior to its regeneration failed ({path}).')
            exit(1)
    # Create the directory if it doesn't exist
    if not os.path.isdir(path):
        try:
            os.mkdir(path, 0o755)
        except OSError:
            print(f'Creation of the directory {path} failed.')
            exit(1)
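
# Example (sketch, path hypothetical): recreate the export directory from
# scratch, as main() does for both working directories:
#   create_pgs_directory('/path/to/metadata/export', True)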


def tardir(path, tar_name):
    """
    Generates a gzipped tarball of the new PGS FTP metadata files
    > Parameters:
        - path: path to the directory containing the files to compress
        - tar_name: file name of the tar file
    """
    with tarfile.open(tar_name, "w:gz") as tar_handle:
        for root, dirs, files in os.walk(path):
            for file in files:
                tar_handle.add(os.path.join(root, file))
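
# Example (sketch, paths hypothetical): archive the new FTP directory as done
# in main() below:
#   tardir('/path/to/metadata/new_ftp_content', 'pgs_ftp_2024-01-01.tar.gz')
# Note: tarfile.add() stores each member under the path it was given, so the
# archive keeps the directory structure rooted at 'path'.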


def check_new_data_entry_in_metadata(dirpath_new,data,release_data):
    """
    Check that the metadata directory for the new Scores and Performance Metrics exists
    > Parameters:
        - dirpath_new: path to the directory where the metadata files have been copied
        - data: dictionary containing the metadata
        - release_data: data related to the current release
    """
    scores_dir = dirpath_new+'/scores/'
    # Score(s)
    missing_score_dir = set()
    for score_id in release_data['released_score_ids']:
        if not os.path.isdir(scores_dir+score_id):
            missing_score_dir.add(score_id)
    # Performance Metric(s)
    missing_perf_dir = set()
    new_performances = release_data['released_performance_ids']
    for perf in [ x for x in data['performance'] if x['id'] in new_performances ]:
        score_id = perf['associated_pgs_id']
        if not os.path.isdir(scores_dir+score_id):
            missing_perf_dir.add(score_id)
    if len(missing_score_dir) != 0 or len(missing_perf_dir) != 0:
        if len(missing_score_dir) != 0:
            print('/!\\ Missing PGS directories for the new entry(ies):\n - '+'\n - '.join(list(missing_score_dir)))
        if len(missing_perf_dir) != 0:
            print('/!\\ Missing PGS directories for the new associated Performance Metric entry(ies):\n - '+'\n - '.join(list(missing_perf_dir)))
        exit(1)
    else:
        print("OK - No missing PGS directory for the new entry(ies)")


#===============#
#  Main method  #
#===============#

def main():
    debug = 0
    tmp_export_dir_name = 'export'
    tmp_ftp_dir_name = 'new_ftp_content'

    # Script parameters
    argparser = argparse.ArgumentParser()
    argparser.add_argument("--url", help='The URL root of the REST API, e.g. "http://127.0.0.1:8000/rest/"', required=True)
    argparser.add_argument("--dir", help=f'The path of the root dir of the metadata "<dir>/{tmp_ftp_dir_name}"', required=True)
    argparser.add_argument("--remote_ftp", help='Flag to indicate whether the FTP is remote (FTP protocol) or local (file system) - Default: False (file system)', action='store_true')
    args = argparser.parse_args()

    rest_url_root = args.url
    content_dir = args.dir
    use_remote_ftp = args.remote_ftp

    if not os.path.isdir(content_dir):
        print(f"Directory {content_dir} can't be found!")
        exit(1)

    # Setup new FTP directory
    new_ftp_dir = content_dir+'/'+tmp_ftp_dir_name
    create_pgs_directory(new_ftp_dir, True)

    # Setup temporary export directory
    export_dir = content_dir+'/'+tmp_export_dir_name+'/'
    create_pgs_directory(export_dir, True)

    # Fetch all the metadata (via REST API)
    print('\t- Fetch metadata')
    data = get_all_pgs_data(rest_url_root)

    # Fetch releases data (current and previous)
    print('\t- Fetch release dates')
    current_release = get_latest_release(rest_url_root)
    current_release_date = current_release['date']
    previous_release_date = get_previous_release(rest_url_root)['date']

    # Fetch the list of ancestry categories
    print('\t- Fetch ancestry categories')
    ancestry_categories = get_ancestry_categories(rest_url_root)

    # Setup paths to some of the extra export files
    scores_list_file = new_ftp_dir+'/pgs_scores_list.txt'
    archive_file_name = '{}/../pgs_ftp_{}.tar.gz'.format(export_dir,current_release_date)

    #-----------------------#
    # Generate Export files #
    #-----------------------#

    # Get the list of published PGS IDs
    score_ids_list = [ x['id'] for x in data['score'] ]

    exports_generator = PGSExportGenerator(export_dir,data,scores_list_file,score_ids_list,large_publication_ids_list,current_release_date,ancestry_categories,debug)

    # Generate the file listing all the released Scores
    exports_generator.generate_scores_list_file()

    # Generate all PGS metadata export files
    exports_generator.call_generate_all_metadata_exports()

    # Generate the PGS metadata export files for the large studies
    exports_generator.call_generate_large_studies_metadata_exports()

    # Generate the PGS metadata export files for each released study
    exports_generator.call_generate_studies_metadata_exports()

    #------------------------#
    # Generate FTP structure #
    #------------------------#

    ftp_generator = PGSFtpGenerator(export_dir,new_ftp_dir,score_ids_list,large_publication_ids_list,previous_release_date,use_remote_ftp,debug)

    # Build FTP structure for the metadata files
    ftp_generator.build_metadata_ftp()

    # Check that the new entries have a PGS directory
    check_new_data_entry_in_metadata(new_ftp_dir,data,current_release)

    # Build FTP structure for the bulk metadata files
    ftp_generator.build_bulk_metadata_ftp()

    # Build FTP structure for the large study metadata files
    ftp_generator.build_large_study_metadata_ftp()

    # Generate the compressed archive to be copied to the EBI Private FTP
    tardir(new_ftp_dir, archive_file_name)

    # Generate the release file (containing the release date)
    release_filename = f'{new_ftp_dir}/release_date.txt'
    try:
        with open(release_filename, 'w') as release_file:
            release_file.write(current_release_date)
    except OSError:
        print(f"Can't create the release file '{release_filename}'.")
        exit(1)


if __name__ == '__main__':
    main()
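
# Example invocation (sketch; the metadata directory path is hypothetical):
#   python pgs_metadata_exports.py --url http://127.0.0.1:8000/rest/ --dir /path/to/metadata
# Add --remote_ftp when the FTP being mirrored is remote (FTP protocol) rather
# than a local file system copy.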