-
Notifications
You must be signed in to change notification settings - Fork 0
/
utils.py
191 lines (160 loc) · 6.68 KB
/
utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
from operator import itemgetter
import lookups
AF_BUCKETS = [0.0001, 0.0002, 0.0005, 0.001, 0.002, 0.005, 0.01, 0.02, 0.05, 0.1, 0.2, 0.5, 1]
METRICS = [
'BaseQRankSum',
'ClippingRankSum',
'DP',
'FS',
'InbreedingCoeff',
'MQ',
'MQRankSum',
'QD',
'ReadPosRankSum',
'VQSLOD'
]
def add_transcript_coordinate_to_variants(db, variant_list, transcript_id):
"""
Each variant has a 'xpos' and 'pos' positional attributes.
This method takes a list of variants and adds a third position: the "transcript coordinates".
This is defined as the distance from the start of the transcript, in coding bases.
So a variant in the 7th base of the 6th exon of a transcript will have a transcript coordinate of
the sum of the size of the first 5 exons) + 7
This is 0-based, so a variant in the first base of the first exon has a transcript coordinate of 0.
You may want to add transcript coordinates for multiple transcripts, so this is stored in a variant as
variant['transcript_coordinates'][transcript_id]
If a variant in variant_list does not have a `transcript_coordinates` dictionary, we create one
If a variant start position for some reason does not fall in any exons in this transcript, its coordinate is 0.
This is perhaps logically inconsistent,
but it allows you to spot errors quickly if there's a pileup at the first base.
`None` would just break things.
Consider the behavior if a 20 base deletion deletes parts of two exons.
I think the behavior in this method is consistent, but beware that it might break things downstream.
Edits variant_list in place; no return val
"""
# make sure exons is sorted by (start, end)
exons = sorted(lookups.get_exons_in_transcript(db, transcript_id), key=itemgetter('start', 'stop'))
# offset from start of base for exon in ith position (so first item in this list is always 0)
exon_offsets = [0 for i in range(len(exons))]
for i, exon in enumerate(exons):
for j in range(i+1, len(exons)):
exon_offsets[j] += exon['stop'] - exon['start']
for variant in variant_list:
if 'transcript_coordinates' not in variant:
variant['transcript_coordinates'] = {}
variant['transcript_coordinates'][transcript_id] = 0
for i, exon in enumerate(exons):
if exon['start'] <= variant['pos'] <= exon['stop']:
variant['transcript_coordinates'][transcript_id] = exon_offsets[i] + variant['pos'] - exon['start']
protein_letters_1to3 = {
'A': 'Ala', 'C': 'Cys', 'D': 'Asp', 'E': 'Glu',
'F': 'Phe', 'G': 'Gly', 'H': 'His', 'I': 'Ile',
'K': 'Lys', 'L': 'Leu', 'M': 'Met', 'N': 'Asn',
'P': 'Pro', 'Q': 'Gln', 'R': 'Arg', 'S': 'Ser',
'T': 'Thr', 'V': 'Val', 'W': 'Trp', 'Y': 'Tyr',
'X': 'Ter', '*': 'Ter'
}
def get_proper_hgvs(csq):
# Needs major_consequence
if csq['major_consequence'] in ('splice_donor_variant', 'splice_acceptor_variant', 'splice_region_variant'):
return get_transcript_hgvs(csq)
else:
return get_protein_hgvs(csq)
def get_transcript_hgvs(csq):
return csq['HGVSc'].split(':')[-1]
CHROMOSOMES = ['chr%s' % x for x in range(1, 23)]
CHROMOSOMES.extend(['chrX', 'chrY', 'chrM'])
CHROMOSOME_TO_CODE = { item: i+1 for i, item in enumerate(CHROMOSOMES) }
def get_single_location(chrom, pos):
"""
Gets a single location from chromosome and position
chr must be actual chromosme code (chrY) and pos must be integer
Borrowed from xbrowse
"""
return CHROMOSOME_TO_CODE[chrom] * int(1e9) + pos
def get_xpos(chrom, pos):
"""
Borrowed from xbrowse
"""
if not chrom.startswith('chr'):
chrom = 'chr{}'.format(chrom)
return get_single_location(chrom, int(pos))
def merge_dicts(*dict_args):
"""
Given any number of dicts, shallow copy and merge into a new dict,
precedence goes to key value pairs in latter dicts.
"""
result = {}
for dictionary in dict_args:
result.update(dictionary)
return result
def find_item(obj, key):
if key in obj:
return obj[key]
if isinstance(obj, dict):
for k in obj:
if isinstance(obj[k], dict):
item = find_item(obj[k], key)
if item is not None:
return item
elif isinstance(obj[k], list):
for i in obj[k]:
if isinstance(i, str):
continue
item = find_item(i, key)
if item is not None:
return item
elif isinstance(obj, list):
for k in obj:
if isinstance(k, dict):
item = find_item(k, key)
if item is not None:
return item
elif isinstance(k, list):
for i in k:
if isinstance(i, str):
continue
item = find_item(i, key)
if item is not None:
return item
def get_hpo_gene(hpo_ids):
hpo_db=get_db(app.config['DB_NAME_HPO'])
hpo_terms = [(i, hpo_db.hpo.find_one({'id':i})['name'][0]) for i in hpo_ids]
# this has missing HPO ids. see IRDC_batch2_OXF_3001 and #HP:0000593
hpo_gene=dict()
for hpo_id,hpo_term, in hpo_terms:
hpo_gene[hpo_id] = []
for gene_name in [x['Gene-Name'] for x in hpo_db.ALL_SOURCES_ALL_FREQUENCIES_phenotype_to_genes.find({'HPO-ID':hpo_id},{'Gene-Name':1,'_id':0})]:
#gene_hpo[gene_name]=gene_hpo.get(gene_name,[])+[{'hpo_id':hpo_id,'hpo_term':hpo_term}]
hpo_gene[hpo_id]=hpo_gene.get(hpo_id,[])+[gene_name]
for k in hpo_gene: hpo_gene[k]=list(frozenset(list(hpo_gene[k])))
return hpo_gene
def find_item(obj, key):
if key in obj:
return obj[key]
if isinstance(obj, dict):
for k in obj:
if isinstance(obj[k], dict):
item = find_item(obj[k], key)
if item is not None:
return item
elif isinstance(obj[k], list):
for i in obj[k]:
if isinstance(i, str):
continue
item = find_item(i, key)
if item is not None:
return item
elif isinstance(obj, list):
for k in obj:
if isinstance(k, dict):
item = find_item(k, key)
if item is not None:
return item
elif isinstance(k, list):
for i in k:
if isinstance(i, str):
continue
item = find_item(i, key)
if item is not None:
return item