forked from bmvdgeijn/WASP
-
Notifications
You must be signed in to change notification settings - Fork 0
/
update_het_probs.py
131 lines (101 loc) · 4.17 KB
/
update_het_probs.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
import gzip
import argparse
import math
import sys
from argparse import ArgumentParser
import tables
import util
def parse_options():
    """Parse the command line of the het-probability update script.

    Two positional arguments are expected (the input CHT file and the
    output CHT file) plus two required options, ``--ref_as_counts`` and
    ``--alt_as_counts``, each giving the path of an HDF5 count file.

    Returns:
        The ``argparse`` namespace with attributes ``infile``, ``outfile``,
        ``ref_as_counts`` and ``alt_as_counts``.
    """
    # The continuation lines are deliberately flush-left so the description
    # text stays exactly as it was.
    description = """This script adjusts
heterozygote probabilities in CHT files to account for
possible genotyping errors. Total counts of reference
and alternative alleles are used to adjust the
probability. The read counts that are provided can be
from the same experiment, combined across many different
experiments, or (perhaps ideally) from DNA sequencing
of the same individual."""

    cli = ArgumentParser(description=description)

    # Positional arguments: input and output CHT files.
    cli.add_argument(
        "infile",
        action='store',
        default=None,
        help=("Input file for Combined Haplotype "
              "Test (CHT) that needs het probabilities "
              "adjusted"),
    )
    cli.add_argument(
        "outfile",
        action='store',
        default=None,
        help=("Output CHT file with heterozygote "
              "probabilities adjusted"),
    )

    # Required options: HDF5 files holding the allele-specific read counts.
    cli.add_argument(
        "--ref_as_counts",
        action='store',
        required=True,
        metavar="REF_AS_COUNT_H5_FILE",
        help=("Path to HDF5 file containing counts "
              "of reads that match reference allele"),
    )
    cli.add_argument(
        "--alt_as_counts",
        action='store',
        required=True,
        metavar="ALT_AS_COUNT_H5_FILE",
        help=("Path to HDF5 file containing counts "
              "of reads that match alternate allele"),
    )

    return cli.parse_args()
def main():
    """Rewrite a CHT file with updated heterozygote probabilities.

    The first line of the input file is copied to the output unchanged.
    Every following line is split on whitespace; when its 10th field is
    "NA" the line is copied verbatim, otherwise its 11th field is replaced
    by the ";"-joined values returned by process_one_snp() and the fields
    are written back tab-separated.

    The input is read through gzip when util.is_gzipped() says so; the
    output is gzip-compressed when its name ends with ".gz".

    Raises:
        SystemExit: with status -1 when the input file is empty.
    """
    # Imported locally so this function does not rely on the module's
    # import block being changed.
    from contextlib import ExitStack

    # Error rate handed to the likelihood model in get_posterior_hetp().
    error = 0.01
    args = parse_options()

    # PyTables 3 renamed openFile() to open_file(); prefer the new name and
    # fall back to the old one so older installations keep working.
    open_h5 = getattr(tables, "open_file", None) or tables.openFile

    # ExitStack closes every handle on all exit paths, including the
    # sys.exit() below and any exception raised while processing.
    with ExitStack() as stack:
        if util.is_gzipped(args.infile):
            infile = stack.enter_context(gzip.open(args.infile, "rt"))
        else:
            infile = stack.enter_context(open(args.infile, "r"))

        if args.outfile.endswith(".gz"):
            # Text mode is required: str lines are written below, and a
            # plain "w" opens a gzip file in binary mode.
            outfile = stack.enter_context(gzip.open(args.outfile, "wt"))
        else:
            outfile = stack.enter_context(open(args.outfile, "w"))

        ref_count_h5 = open_h5(args.ref_as_counts)
        stack.callback(ref_count_h5.close)
        alt_count_h5 = open_h5(args.alt_as_counts)
        stack.callback(alt_count_h5.close)

        # The header line is passed through untouched.
        header = infile.readline()
        if not header:
            sys.stderr.write("The input file was empty.\n")
            sys.exit(-1)
        outfile.write(header)

        for snp_line in infile:
            snpinfo = snp_line.strip().split()
            if snpinfo[9] == "NA":
                # No target SNP positions on this line: nothing to update.
                outfile.write(snp_line)
                continue

            new_hetps = process_one_snp(snpinfo, ref_count_h5,
                                        alt_count_h5, error)
            outfile.write("\t".join(snpinfo[:10] +
                                    [";".join(new_hetps)] +
                                    snpinfo[11:]) + "\n")
def process_one_snp(snpinfo, ref_count_h5, alt_count_h5, error):
    """Compute updated het probabilities for the target SNPs of one CHT line.

    Args:
        snpinfo: whitespace-split fields of one CHT line. Field 0 is used as
            the HDF5 node name, field 9 holds ";"-separated SNP positions
            and field 10 the matching ";"-separated het probabilities.
        ref_count_h5: open HDF5 file of reference-allele read counts.
        alt_count_h5: open HDF5 file of alternate-allele read counts.
        error: error rate forwarded to get_posterior_hetp().

    Returns:
        A list of strings, one posterior het probability per SNP position,
        in the same order as the positions in field 9.

    Raises:
        IndexError: if field 10 lists fewer probabilities than field 9
            lists positions.
    """
    chrm = snpinfo[0]

    # positions of target SNPs
    snplocs = [int(y.strip()) for y in snpinfo[9].split(';')]
    # heterozygote probabilities of target SNPs
    hetps = [float(y.strip()) for y in snpinfo[10].split(';')]

    # PyTables 3 renamed File.getNode() to File.get_node(); prefer the new
    # name and fall back to the old one for older installations.
    node_path = "/%s" % chrm
    count_nodes = []
    for h5 in (ref_count_h5, alt_count_h5):
        get_node = getattr(h5, "get_node", None) or h5.getNode
        count_nodes.append(get_node(node_path))
    ref_node, alt_node = count_nodes

    update_hetps = []
    for i, pos in enumerate(snplocs):
        # Positions are shifted by one to index the count arrays.
        adr = ref_node[pos - 1]
        ada = alt_node[pos - 1]
        update_hetps.append(str(get_posterior_hetp(hetps[i], adr,
                                                   ada, error)))

    return update_hetps
def get_posterior_hetp(hetp_prior, adr, ada, error):
    """Return the posterior probability that a site is heterozygous.

    Args:
        hetp_prior: prior het probability; values above 0.99 are capped
            at 0.99 before use.
        adr: count of reads matching the reference allele.
        ada: count of reads matching the alternate allele.
        error: per-read probability of observing the wrong allele under a
            homozygous genotype.

    Returns:
        1.0 when the het-vs-homozygous log-likelihood difference exceeds 40
        (regardless of the prior), otherwise
        ``prior * r / (prior * r + (1 - prior))`` where ``r`` is the
        likelihood ratio.
    """
    capped_prior = min(0.99, hetp_prior)

    # Log-likelihoods of the reads under the two homozygous genotypes.
    loglike_hom_a = math.log(error)*adr + math.log(1-error)*ada
    loglike_hom_b = math.log(1-error)*adr + math.log(error)*ada

    # Log of the sum of both homozygous likelihoods (log-sum-exp).
    # NOTE(review): the two likelihoods are summed, not averaged, so with
    # zero reads the posterior comes out below the prior -- confirm that
    # this is intended.
    loglike_hom = (max(loglike_hom_a, loglike_hom_b) +
                   math.log(1+math.exp(-abs(loglike_hom_a-loglike_hom_b))))

    # Under a heterozygous genotype every read has probability 0.5.
    loglike_het = math.log(0.5)*adr + math.log(0.5)*ada

    log_ratio = loglike_het - loglike_hom
    if log_ratio > 40:
        # avoid overflow (very close to 1.0)
        return 1.0

    weighted = capped_prior*math.exp(log_ratio)
    return weighted / (weighted + (1.0 - capped_prior))
def addlogs(loga, logb):
    """Return log(exp(loga) + exp(logb)) without leaving log space.

    The larger argument is factored out so that the exponential is only
    ever taken of a non-positive number and cannot overflow.
    """
    larger = max(loga, logb)
    gap = abs(loga - logb)
    return larger + math.log(1 + math.exp(-gap))
# Run the script only when executed directly, so that importing this module
# (e.g. to reuse get_posterior_hetp) does not parse the command line or
# touch any files.
if __name__ == "__main__":
    main()