forked from SensibilityTestbed/indoor-localization
-
Notifications
You must be signed in to change notification settings - Fork 0
/
butterworth.r2py
132 lines (90 loc) · 3.07 KB
/
butterworth.r2py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
"""
<Program Name>
butterworth.r2py
<Purpose>
This script implements a low/high pass filter that filters raw data
using a variable cut-off frequency.
A preset maximum sample interval is used to guard against device lag.
"""
# Trigonometry functions temporarily added to this script.
# They will be deleted once trig.r2py is fixed.
# Hand-written value of pi used by the trigonometry helpers and by the
# filter coefficient computation in this file.
pi = 3.14159265358979323846
# Number of Taylor-series terms summed by butter_cos().
TERMS = 11
def butter_factorial(x):
    """
    <Purpose>
      Compute x! as the product 1 * 2 * ... * x.
    <Arguments>
      x: integer whose factorial is wanted.
    <Returns>
      The factorial of x; 1 when x is zero or negative (empty product).
    """
    product = 1
    for factor in range(1, x + 1):
        product = product * factor
    return product
def butter_cos(x):
    """
    <Purpose>
      Approximate cos(x) with the first TERMS terms of its Taylor series
      around zero.
    <Arguments>
      x: angle in radians.
    <Returns>
      The approximation, clamped to the interval [-1, 1].
    """
    # The truncated series is only accurate close to zero; far from it the
    # error grows to several units and the clamp below would hide it as +/-1.
    # Cosine is 2*pi periodic, so fold such arguments back into [-pi, pi].
    # Arguments already inside that interval are left untouched.
    if abs(x) > pi:
        x = x % (2.0 * pi)
        if x > pi:
            x -= 2.0 * pi
    taylorsum = 0.0
    for n in range(TERMS):
        taylorsum += (-1)**n * (x**(2.0*n) / butter_factorial(2*n))
    # Truncation and rounding can push the sum slightly outside [-1, 1].
    return max(min(taylorsum, 1), -1)
def butter_sin(x):
    """
    <Purpose>
      Approximate sin(x) from butter_cos() using sin^2 + cos^2 = 1.
    <Arguments>
      x: angle in radians.
    <Returns>
      The approximation of sin(x), carrying the correct sign.
    """
    magnitude = (1 - butter_cos(x)**2)**0.5
    # The square root only yields |sin(x)|. Sine is negative in the second
    # half of every period, i.e. when x modulo 2*pi lies beyond pi.
    if x % (2.0 * pi) > pi:
        return -magnitude
    return magnitude
def butter_tan(x):
    """
    <Purpose>
      Approximate tan(x) as the quotient of butter_sin(x) and butter_cos(x).
    <Arguments>
      x: angle in radians.
    <Returns>
      butter_sin(x) divided by butter_cos(x).
    """
    sine = butter_sin(x)
    cosine = butter_cos(x)
    return sine / cosine
# y(k) = b0x(k) + b1x(k-1) + b2x(k-2) - a1y(k-1) - a2y(k-2)
class ButterworthFilter:
    """
    <Purpose>
      Second-order low/high pass filter that is fed one sample at a time and
      evaluates the difference equation
        y(k) = b0*x(k) + b1*x(k-1) + b2*x(k-2) - a1*y(k-1) - a2*y(k-2)
      The two previous inputs, outputs and timestamps are kept on the
      instance. The history is discarded whenever two consecutive samples are
      further apart in time than `threshold`.
    """

    def __init__(self, type, cutoff_frequency, sampling_frequency, current_time, threshold):
        """
        <Arguments>
          type: "low" for a low pass filter, "high" for a high pass filter.
          cutoff_frequency: cut-off frequency, in the same unit as
            sampling_frequency.
          sampling_frequency: sampling frequency used for the initial
            coefficients.
          current_time: timestamp the first sample's time gap is measured
            against.
          threshold: largest allowed difference between the timestamps of two
            consecutive samples before the history is reset.
        <Exceptions>
          ValueError if type is neither "low" nor "high".
        """
        self.mag_list = []      # the (up to) two most recent input samples
        self.output_list = []   # the (up to) two most recent output samples
        self.cutoff_frequency = cutoff_frequency
        self.type = type
        self.a1 = 0.0
        self.a2 = 0.0
        self.b0 = 0.0
        self.b1 = 0.0
        self.b2 = 0.0
        self._filter_coefficient(sampling_frequency)
        self.last_time = current_time
        self.threshold = threshold
        self.timelist = []      # timestamps matching the entries of mag_list

    def _filter_coefficient(self, sampling_frequency):
        """
        <Purpose>
          Compute a1, a2, b0, b1 and b2 from self.cutoff_frequency and the
          given sampling frequency and store them on the instance.
        <Arguments>
          sampling_frequency: sampling frequency, in the same unit as
            self.cutoff_frequency.
        <Exceptions>
          ValueError if self.type is neither "low" nor "high".
        """
        # Reject unknown types up front; otherwise b0/b1/b2 would silently
        # keep whatever values they had before.
        if self.type not in ("low", "high"):
            raise ValueError("filter type must be 'low' or 'high', got %r" % (self.type,))

        # Force float division so that integer frequencies are not truncated
        # to 0 under Python 2 division semantics.
        omegaC = 1.0 * self.cutoff_frequency / sampling_frequency
        c = 1.0 / butter_tan(pi * omegaC)
        q = 2 ** 0.5
        a0 = 1.0 + q * c + c * c

        self.a1 = -2.0 * (c * c - 1.0) / a0
        self.a2 = (1.0 - q * c + c * c) / a0
        if self.type == "low":
            self.b0 = 1.0 / a0
            self.b1 = 2.0 / a0
        else:
            self.b0 = c * c / a0
            self.b1 = -2.0 * c * c / a0
        self.b2 = self.b0

    def filter(self, data, current_time, dynamic = False):
        """
        <Purpose>
          Feed one sample through the filter.
        <Arguments>
          data: the new input sample.
          current_time: timestamp of the new sample.
          dynamic: when True, the coefficients are recomputed for this sample
            from the time spanned by the last two sampling intervals.
            Defaults to False.
        <Returns>
          The filtered sample. The first two samples after construction or
          after a history reset are returned unchanged.
        """
        # Avoid device lag: differing sampling intervals reduce the filter's
        # accuracy, so start over after a gap longer than the threshold.
        if current_time - self.last_time > self.threshold:
            self.mag_list = []
            self.output_list = []
            self.timelist = []

        if len(self.mag_list) < 2:
            # Not enough history yet: pass the sample through and record it.
            self.mag_list.append(data)
            self.output_list.append(data)
            self.timelist.append(current_time)
            filtered_magNoG = data
        else:
            # will change to queue later
            if dynamic:
                # timelist[0] belongs to sample k-2, so two sampling
                # intervals lie between it and the current sample.
                elapsed = current_time - self.timelist[0]
                # Keep the previous coefficients when no time has passed
                # (or the clock went backwards) instead of dividing by zero.
                if elapsed > 0:
                    self._filter_coefficient(2.0 / elapsed)

            filtered_magNoG = (self.b0 * data
                               + self.b1 * self.mag_list[1]
                               + self.b2 * self.mag_list[0]
                               - self.a1 * self.output_list[1]
                               - self.a2 * self.output_list[0])

            # Shift the history. Timestamps are shifted regardless of
            # `dynamic` so they always match the stored samples.
            self.timelist[0] = self.timelist[1]
            self.timelist[1] = current_time
            self.mag_list[0] = self.mag_list[1]
            self.mag_list[1] = data
            self.output_list[0] = self.output_list[1]
            self.output_list[1] = filtered_magNoG

        self.last_time = current_time
        return filtered_magNoG
# -*- mode: python;-*-