# SDLFM_tests.py (forked from DiegoAE/HMMLFM)
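"""Test of the continuous LFM-HMM on pre-generated (SDLFM) samples.

This is a Python 2 script (print statements, xrange). As read from the code
below, it loads sample trajectories from 'Samples.mat', splits one signal into
fixed-length segments, trains (or reloads) an LFMHMMcontinuous model, and then
plots the Viterbi-decoded per-segment predictions and the per-state priors.
"""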
from hmm.continuous.LFMHMMcontinuous import LFMHMMcontinuous
from matplotlib import pyplot as plt
import numpy as np
import scipy.io as sio
seed = np.random.randint(1, 10001)  # np.random.random_integers is deprecated
# seed = 4748
np.random.seed(seed)
print "USED SEED", seed
### LFM HMM
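# Model configuration: 3 hidden states (one LFM each), a single output channel,
# and 201 sample locations per segment over the time interval [0.1, 10.1].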
number_lfm = 3
outputs = 1
start_t = 0.1
end_t = 10.1
locations_per_segment = 201
n_latent_forces = 1 # TODO: currently not passing this argument to the model.
lfm_hmm = LFMHMMcontinuous(outputs, number_lfm, locations_per_segment, start_t,
                           end_t, verbose=True)
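# 'Samples.mat' is expected to provide the keys 'XTest' (sample locations) and
# 'yTest' (sampled outputs); only the last test case and one picked sample are
# used below.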
mat_file = sio.loadmat('Samples.mat')
x = mat_file['XTest'][0][-1] # all sample locations
n_samples = np.size(x)
picked_sample = 0
f = mat_file['yTest'][0][-1][0][picked_sample]
n_outputs = f.shape[1]
Y = np.zeros((n_samples, n_outputs))
for i in xrange(n_outputs):
    Y[:, i] = f[0][i].flatten()
# testing_idx = 62
# print "X ", x[0][testing_idx]
# print "Y ", obs[testing_idx, 0], obs[testing_idx, 1]
#
# testing_idx = 63
# print "X ", x[0][testing_idx]
# print "Y ", obs[testing_idx, 0], obs[testing_idx, 1]
plt.plot(x.flatten(), Y)
for i in xrange(1, 6):
    plt.axvline(x=10 * i, color='red', linestyle='--')
plt.show()
# Setting observations in the model.
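# The single training sequence is split into 6 segments of 201 samples each;
# consecutive segments share their boundary sample, which is why the running
# index below advances by locations_per_segment - 1 rather than by the full
# segment length.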
channel_id = 0
number_training_sequences = 1
obs = []
for s in xrange(number_training_sequences):
    number_segments = 6  # fixed for now.
    c_obs = np.zeros((number_segments, locations_per_segment))
    signal = Y[:, channel_id]
    idx = 0
    for i in xrange(number_segments):
        c_obs[i, :] = signal[idx:idx + locations_per_segment]
        idx = idx + locations_per_segment - 1
    obs.append(c_obs)
lfm_hmm.set_observations(obs)
print "before training"
print lfm_hmm.pi
print lfm_hmm.A
print lfm_hmm.LFMparams
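# train_flag switches between training the model from scratch (and saving its
# parameters) and loading previously saved parameters. Note that the parameter
# path below is machine-specific.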
train_flag = False
if train_flag:
    lfm_hmm.train()
    lfm_hmm.save_params("/home/diego/tmp/Parameters", "pruebaSDLFM_1")
else:
    lfm_hmm.read_params("/home/diego/tmp/Parameters", "pruebaSDLFM_1")
print "after training"
print lfm_hmm.pi
print lfm_hmm.A
print lfm_hmm.LFMparams
# Second experiment: Regression
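# The most likely hidden state sequence is decoded with Viterbi, and each
# segment is then regressed with the LFM of its decoded state: the posterior
# mean is plotted together with a 2-standard-deviation band (dashed lines).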
number_testing_points = 100
regression_hidden_states = lfm_hmm._viterbi()[0]
last_value = 0
plt.axvline(x=last_value, color='red', linestyle='--')
considered_segments = 6
for i in xrange(considered_segments):
    c_hidden_state = regression_hidden_states[i]
    c_obv = obs[0][i]
    # predicting over a fresh grid of test points within the segment
    t_test = np.linspace(start_t, end_t, number_testing_points)
    mean_pred, cov_pred = lfm_hmm.predict(t_test, c_hidden_state, c_obv)
    sl = lfm_hmm.sample_locations
    plt.scatter(last_value + sl - sl[0], c_obv, facecolors='none',
                label=[None, 'observations'][i == 0])
    plt.plot(last_value + t_test - t_test[0], mean_pred, color='green',
             label=[None, 'predicted mean'][i == 0])
    diag_cov = np.diag(cov_pred)
    plt.plot(last_value + t_test - t_test[0],
             mean_pred.flatten() - 2 * np.sqrt(diag_cov), 'k--')
    plt.plot(last_value + t_test - t_test[0],
             mean_pred.flatten() + 2 * np.sqrt(diag_cov), 'k--')
    last_value = last_value + end_t - start_t
    plt.axvline(x=last_value, color='red', linestyle='--')
print "Inferred hidden states ", regression_hidden_states
plt.title("Fitting of the model given an observation sequence.")
plt.legend(loc='upper left')
plt.show()
# Plotting the priors
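# For each decoded state, the prior over a segment is zero-mean with covariance
# given by that state's LFM (Kyy()); the dashed lines again show a
# 2-standard-deviation band.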
last_value = 0
plt.axvline(x=last_value, color='red', linestyle='--')
for i in xrange(considered_segments):
    c_hidden_state = regression_hidden_states[i]
    # evaluating the per-state prior over the segment time grid
    t_test = np.linspace(start_t, end_t, locations_per_segment)
    mean_prior = np.zeros(len(t_test))
    cov_prior = lfm_hmm.lfms[c_hidden_state].Kyy()
    plt.plot(last_value + t_test - t_test[0], mean_prior, color='green')
    diag_cov = np.diag(cov_prior)
    plt.plot(last_value + t_test - t_test[0],
             mean_prior - 2 * np.sqrt(diag_cov), 'k--')
    plt.plot(last_value + t_test - t_test[0],
             mean_prior + 2 * np.sqrt(diag_cov), 'k--')
    last_value = last_value + end_t - start_t
    plt.axvline(x=last_value, color='red', linestyle='--')
plt.title("Plotting priors.")
plt.show()