-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathcalling_validator.py
146 lines (121 loc) · 3.92 KB
/
calling_validator.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
### Boas Pucker ###
### bpucker@cebitec.uni-bielefeld.de ###
### v0.35 ###
# Usage text; handed to sys.exit() at the bottom of this script when one of the
# required flags (--vcf, --fasta, --out) is missing from the command line.
__usage__ = """
python calling_validator.py\n
--vcf <VCF_FILE>
--fasta <FASTA_FILE>
--out <OUTPUT_DIRECTORY>
feature requests and bug reports: bpucker@cebitec.uni-bielefeld.de
"""
import sys, os
# --- end of imports --- #
def load_seq_lengths( fasta_file ):
    """! @brief collect the length of every sequence in the given FASTA file

    The very first line is always consumed as a header (its first character is
    dropped without checking that it is '>'). A sequence name is the header text
    up to the first space; the length is the summed length of the stripped
    sequence lines that follow the header.

    @param fasta_file path to the FASTA file
    @return dictionary mapping sequence name to sequence length
    """
    lengths = {}
    with open( fasta_file ) as handle:
        # first header: drop leading character first, then strip
        name = handle.readline()[1:].strip().split(" ")[0]
        total = 0
        for row in handle:
            if row[0] == '>':
                # a new record starts: store the finished one
                lengths[ name ] = total
                # later headers: strip first, then drop leading character
                name = row.strip()[1:].split(" ")[0]
                total = 0
            else:
                total += len( row.strip() )
        # store the record that was still open at end of file
        lengths[ name ] = total
    return lengths
def _report_windows( out, chromosome, variant_pos, window_size ):
    """! @brief write the per-window variant counts of one chromosome and flag empty windows

    @param out open, writable result file handle
    @param chromosome name of the chromosome the positions belong to
    @param variant_pos list of variant positions (integers) on this chromosome
    @param window_size size of one window in bp
    @return True if every reported window contains at least one variant, otherwise False
    """
    counts = {}
    for pos in variant_pos:
        # floor division keeps window indices integral on Python 2 and Python 3
        idx = pos // window_size
        counts[ idx ] = counts.get( idx, 0 ) + 1
    if not counts:
        return True

    all_windows_ok = True
    # NOTE(review): as in the original implementation, the window with the highest
    # index is not reported (it holds at least one variant by construction and is
    # presumably only partially covered) - confirm that this is intended.
    for i in range( max( counts ) ):
        number = counts.get( i, 0 )
        if number > 0:
            out.write( str( number ) + "\t" + str( window_size ) + '\n' )
        else:
            out.write( "ERROR: no variants - " + chromosome + " - block idx: " + str( i ) + "\n" )
            all_windows_ok = False
    return all_windows_ok


def validate_vcf( vcf_file, seq_lengths, result_file, window_size ):
    """! @brief check that variants in a VCF file cover all windows of all sequences

    Variants are expected to be grouped by chromosome. For each chromosome the
    variants are counted per window (position // window_size). One line
    "<count>\\t<window_size>" is written per window that contains variants and an
    "ERROR: no variants" line per empty window below the highest occupied window.
    Every sequence of seq_lengths without any variant in the VCF is reported as
    "ERROR: missing chromosome". The last line of the result file is the final status.

    @param vcf_file path to the VCF file (lines starting with '#' are ignored)
    @param seq_lengths dictionary with sequence names as keys (values are not used here)
    @param result_file path of the report file; it is overwritten
    @param window_size window size in bp
    @return True if no error was written to the result file, otherwise False
    """
    valid = True
    vcf_chromosomes = set()
    with open( result_file, "w" ) as out:
        chromosome = None
        variant_pos = []
        with open( vcf_file, "r" ) as f:
            for line in f:
                # skip header lines and blank lines
                if line[0] == '#' or not line.strip():
                    continue
                parts = line.strip().split('\t')
                if parts[0] != chromosome:
                    # chromosome changed: report the finished one before starting the next
                    if chromosome is not None:
                        if not _report_windows( out, chromosome, variant_pos, window_size ):
                            valid = False
                    chromosome = parts[0]
                    vcf_chromosomes.add( chromosome )
                    variant_pos = []
                variant_pos.append( int( parts[1] ) )

        # report the last chromosome; nothing to do if the VCF contained no variants
        if chromosome is not None:
            if not _report_windows( out, chromosome, variant_pos, window_size ):
                valid = False

        # --- check if all sequences are present --- #
        for key in seq_lengths:
            if key not in vcf_chromosomes:
                out.write( "ERROR: missing chromosome - " + key + "\n" )
                valid = False

        out.write( "FINAL STATUS: valid? >> " + str( valid ) + '\n' )
    return valid
def main( arguments ):
    """! @brief read the command line arguments, validate the VCF file and print the outcome

    The report is written into the output directory. Its name is the lower-cased
    VCF file name with ".vcf" replaced by ".results".

    @param arguments list of command line arguments; must contain '--vcf', '--fasta'
           and '--out', each followed by its value
    """
    vcf_file = arguments[ arguments.index( '--vcf' )+1 ]
    fasta_file = arguments[ arguments.index( '--fasta' )+1 ]
    prefix = arguments[ arguments.index( '--out' )+1 ]

    # an empty output directory means the current working directory
    if prefix and not os.path.exists( prefix ):
        os.makedirs( prefix )

    window_size = 500000

    # --- check completeness of file --- #
    seq_lengths = load_seq_lengths( fasta_file )

    base_name = os.path.basename( vcf_file ).lower()
    if ".vcf" in base_name:
        result_name = base_name.replace( ".vcf", ".results" )
    else:
        # without this suffix the report name would equal the input name and the
        # report could overwrite the VCF file if it is located in the output directory
        result_name = base_name + ".results"
    result_file = os.path.join( prefix, result_name )

    status = validate_vcf( vcf_file, seq_lengths, result_file, window_size )
    if not status:
        # parenthesised single-argument print works with Python 2 and Python 3
        print( "ERROR detected in " + vcf_file )
    else:
        print( "OK!" )
if __name__ == '__main__':
    # all three flags are required; otherwise show the usage text and stop
    if not ( '--vcf' in sys.argv and '--out' in sys.argv and '--fasta' in sys.argv ):
        sys.exit( __usage__ )
    main( sys.argv )
    print( "all done!" )