forked from bmvdgeijn/WASP
-
Notifications
You must be signed in to change notification settings - Fork 0
/
chromstat.py
149 lines (109 loc) · 4.29 KB
/
chromstat.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
import sys
import numpy as np
class ChromStats(object):
    """Summary statistics over the values of one or more chromosomes.

    Fields:
      n      -- total number of sites, including NaN sites
      n_nan  -- number of sites whose value is NaN
      sum    -- sum of the non-NaN values
      min    -- smallest non-NaN value, or None if there is none
      max    -- largest non-NaN value, or None if there is none
    """

    def __init__(self):
        self.n = 0
        self.n_nan = 0
        self.sum = 0
        self.min = None
        self.max = None

    def mean(self):
        """Calculates mean of sites that are not nan on this chromosome.

        Returns np.inf when there are no non-nan sites.
        """
        n = self.n - self.n_nan
        if n == 0:
            return np.inf
        return self.sum / float(n)

    def set_from_vals(self, vals):
        """Sets all statistics from the numpy array vals.

        Any previously held statistics are discarded. NaN entries of
        floating point arrays are counted in n_nan and excluded from
        min, max and sum. If no non-NaN values are present (including
        the case of an empty array), min and max are None and sum is 0.
        """
        # Start from a clean slate so that re-using an instance cannot
        # leave values behind from an earlier call.
        self.n = vals.size
        self.n_nan = 0
        self.sum = 0
        self.min = None
        self.max = None

        # Check the dtype kind rather than its string name: the name of
        # a non-native byte order float dtype (e.g. '>f8') does not
        # start with 'float'.
        if np.issubdtype(vals.dtype, np.floating):
            nan_vals = np.isnan(vals)
            self.n_nan = np.sum(nan_vals)
            vals = vals[~nan_vals]

        # np.min / np.max raise ValueError on zero-size input, so only
        # reduce when at least one usable value remains.
        if vals.size > 0:
            self.min = np.min(vals)
            self.max = np.max(vals)
            self.sum = np.sum(vals)

    def add(self, other):
        """Accumulates the statistics of other into this object."""
        self.n += other.n
        self.n_nan += other.n_nan
        self.sum += other.sum

        # A min / max of None means "no values seen yet", so it never
        # overrides a real value held by the other operand.
        if (self.min is None) or (other.min is not None and
                                  self.min > other.min):
            self.min = other.min

        if (self.max is None) or (other.max is not None and
                                  self.max < other.max):
            self.max = other.max

    def __str__(self):
        return "n=%d n_nan=%s min=%s max=%s sum=%s" % \
            (self.n, str(self.n_nan), str(self.min), str(self.max),
             str(self.sum))
def calc_stats(h5f, chrom_list, verbose=False):
    """Calculates stats for each chromosome in provided list as well
    as combined stats.

    Arguments:
      h5f        -- HDF5 file handle; must support "path in h5f" and
                    getNode(path), with nodes that can be sliced with [:]
      chrom_list -- iterable of chromosome objects with a name attribute
      verbose    -- if True, write each chromosome's stats to stderr

    Returns a ChromStats object holding the stats combined over all
    chromosomes. Chromosomes with no node in the file are reported on
    stderr and contribute nothing to the combined stats.
    """
    combined = ChromStats()

    for chrom in chrom_list:
        chrom_stat = ChromStats()
        node_name = "/%s" % chrom.name

        if node_name in h5f:
            node = h5f.getNode(node_name)
            vals = node[:]
            chrom_stat.set_from_vals(vals)

            if verbose:
                sys.stderr.write("%s %s\n" % (str(chrom), str(chrom_stat)))
        else:
            # Terminate the message with a newline so that consecutive
            # messages do not run together on stderr.
            sys.stderr.write("skipping chromosome %s because "
                             "not present in HDF5 file\n" % chrom.name)

        # For a skipped chromosome chrom_stat is still empty, so adding
        # it leaves the combined stats unchanged.
        combined.add(chrom_stat)

    return combined
def set_stats(h5f, chrom_list, verbose=False):
    """Computes stats for every chromosome, saves them as attributes
    of the corresponding chromosome node, and returns the stats
    combined across all chromosomes.

    The provided HDF5 file handle must have been opened in append
    mode. Chromosomes with no node in the file are reported on stderr
    and skipped.
    """
    stat_fields = ("n", "n_nan", "min", "max", "sum")
    total = ChromStats()

    for chrom in chrom_list:
        path = "/%s" % chrom.name

        if path not in h5f:
            sys.stderr.write("skipping chromosome %s because "
                             "not present in HDF5 file\n" % chrom.name)
            continue

        stats = ChromStats()
        node = h5f.getNode(path)
        stats.set_from_vals(node[:])

        # copy each statistic onto the node's attribute set, then
        # flush the node
        for field in stat_fields:
            setattr(node.attrs, field, getattr(stats, field))
        node.flush()

        if verbose:
            sys.stderr.write("%s %s\n" % (str(chrom), str(stats)))

        total.add(stats)

    return total
def get_stats(h5f, chrom_list, verbose=False):
    """Retrieves stats that are stored as attributes for the specified
    set of chromosomes.

    Arguments:
      h5f        -- HDF5 file handle; must support "path in h5f" and
                    getNode(path), with nodes exposing an attrs set
      chrom_list -- iterable of chromosome objects with a name attribute
      verbose    -- if True, write each chromosome's stats to stderr

    Returns a ChromStats object holding the stats combined over all
    chromosomes that are present in the file. Chromosomes with no node
    in the file are reported on stderr and skipped.

    Raises ValueError if a chromosome node is present but has no
    stored 'n' attribute.
    """
    combined = ChromStats()

    for chrom in chrom_list:
        node_name = "/%s" % chrom.name

        if node_name in h5f:
            node = h5f.getNode(node_name)

            if 'n' not in node.attrs:
                # Report the chromosome being processed; no track
                # object is available in this function.
                raise ValueError("Stat attributes are not set for "
                                 "chromosome %s" % chrom.name)

            # Use a fresh object per chromosome rather than sharing
            # one mutable instance across iterations.
            chrom_stat = ChromStats()
            chrom_stat.n = node.attrs.n
            chrom_stat.n_nan = node.attrs.n_nan
            chrom_stat.min = node.attrs.min
            chrom_stat.max = node.attrs.max
            chrom_stat.sum = node.attrs.sum

            if verbose:
                sys.stderr.write("%s %s\n" % (str(chrom), str(chrom_stat)))

            combined.add(chrom_stat)
        else:
            sys.stderr.write("skipping chromosome %s because "
                             "not present in HDF5 file\n" % chrom.name)

    return combined