-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathsvd.py
142 lines (116 loc) · 4.44 KB
/
svd.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
import numpy as np
import matplotlib.pyplot as plt
import pickle
def centerilze(X):
    """Subtract the mean of each row of X from the entries of that row.

    If X is an n * m matrix whose columns are samples, this centralizes
    the sample set.

    Args:
        X: 2-D ``np.matrix`` or ``np.ndarray``.

    Returns:
        A pair ``(centred, meanCol)``. ``centred`` has the same shape as X;
        ``meanCol`` is an n * 1 ``np.matrix`` holding the row means that
        were subtracted.
    """
    # keepdims=True keeps the means as a column for plain ndarrays too.
    # Without it, np.mean on a 2-D ndarray yields a 1-D vector that
    # np.matrix turns into a ROW, and the subtraction below fails.
    meanCol = np.matrix(np.mean(np.asarray(X), axis=1, keepdims=True))
    # Broadcasting the n * 1 column across the m samples gives the same
    # values as materialising an explicit np.repeat copy.
    return np.subtract(X, meanCol), meanCol
def svd(X):
    """Build X X^T / (m - 1) for an n * m input and decompose it.

    The product equals the sample covariance only when the rows of X
    already have zero mean; no centring is done here.

    Returns:
        The three outputs of ``np.linalg.svd`` applied to that n * n
        matrix, in the order numpy returns them: left singular vectors,
        singular values, and the third factor as numpy provides it.
    """
    _, sampleCount = X.shape
    covariance = np.dot(X, X.T) / (sampleCount - 1)
    left, singular, right = np.linalg.svd(covariance)
    return left, singular, right
def singluarAnalysis(s, thres=0.05):
    """Count how many leading singular values are needed to pass a threshold.

    Walks ``s`` in the given order and stops at the first prefix whose sum
    is strictly greater than ``(1 - thres)`` of the total.

    Args:
        s: non-empty sequence of singular values, largest first.
        thres: fraction of the total that may be discarded.

    Returns:
        ``(kept, ratio)`` where ``kept`` is the number of values retained
        and ``ratio`` is ``len(s) / kept``.

    Raises:
        ValueError: if ``s`` is empty.
    """
    d = len(s)
    if d == 0:
        # Previously this fell through to the return statement with the
        # loop index unbound and crashed with an UnboundLocalError.
        raise ValueError("s must contain at least one singular value")
    sumThres = sum(s) * (1 - thres)
    cumu = 0
    # If no prefix ever exceeds the threshold (e.g. thres == 0), every
    # value is kept.
    kept = d
    for count, value in enumerate(s, start=1):
        cumu += value
        if cumu > sumThres:
            kept = count
            break
    return kept, d / kept
def getRatio(pid=0, numIter=300, maxPara=1000, thres=0.05):
    """Fit PCA on the first half of a sample set and test it on the second.

    The input location is still hard coded: sample ``i`` (1-based) is read
    as float32 from ``phase1/iter000<i zero-padded to 3 digits>_pid0<pid>.bin``
    and truncated to its first ``maxPara`` values.

    Args:
        pid: id used to build the file-name suffix.
        numIter: number of sample files to read; the first ``numIter // 2``
            are the training set and the rest the testing set.
        maxPara: maximum number of values kept from each file.
        thres: fraction of the singular-value sum that may be discarded.

    Returns:
        ``(ratio, testLoss, U)``: the compression ratio, the normalized
        square error on the testing set, and the left singular vectors
        obtained from the training set.

    Raises:
        ValueError: if ``numIter`` is too small to leave at least two
            training samples.
    """
    half = numIter // 2
    if half < 2:
        # svd() divides by (number of training samples - 1).
        raise ValueError("numIter must be at least 4")

    # Read the samples. Earlier runs used the prefixes 'phase3/iter025'
    # and 'phase3/iter049' in place of 'phase1/iter000'.
    pidName = '_pid0' + str(pid) + '.bin'  # hard coded file name
    samples = []
    for i in range(1, numIter + 1):
        fileName = 'phase1/iter000' + str(i).zfill(3) + pidName
        samples.append(np.fromfile(fileName, dtype=np.float32)[:maxPara])
    # One column per sample.
    X = np.transpose(np.matrix(samples))

    # Split by numIter instead of the former fixed 150/300 bounds; the
    # two are identical for the default numIter=300.
    XTrain = X[:, :half]
    XTest = X[:, half:numIter]

    # Perform PCA on the training set.
    Xc, meanCol = centerilze(XTrain)
    U, s, V = svd(Xc)

    # Analyse the compression ratio.
    d, ratio = singluarAnalysis(s, thres=thres)
    print("compression ratio:", ratio)

    # Compress then recover the training set, then analyse the loss.
    XTrainC = compress(XTrain, meanCol, U, d)
    XTrainD = decompress(XTrainC, meanCol, U, d)
    print("training set recovery loss (normalized square error) is: ",
          offset(XTrain, XTrainD))

    # Compress then recover the testing set with the TRAINING mean and
    # basis, then analyse the loss (computed once and reused).
    XTestC = compress(XTest, meanCol, U, d)
    XTestD = decompress(XTestC, meanCol, U, d)
    testLoss = offset(XTest, XTestD)
    print("testing set recovery loss (normalized square error) is: ",
          testLoss)
    return ratio, testLoss, U
def compress(X, meanCol, U, d):
    """Project a sample set onto the first d columns of U.

    ``meanCol`` is repeated along axis 1 once per column of X and
    subtracted, then the result is multiplied on the left by the
    transpose of ``U[:, :d]``.
    """
    sampleCount = X.shape[1]
    meanBlock = np.repeat(meanCol, sampleCount, axis=1)
    centred = np.subtract(X, meanBlock)
    basis = U[:, :d]
    return np.dot(basis.T, centred)
def decompress(Xd, meanCol, U, d):
    """Map compressed samples back to the original n-dimensional space.

    Multiplies ``U[:, :d]`` by ``Xd`` and adds ``meanCol`` back to every
    column (``meanCol`` is repeated along axis 1 once per column of Xd).
    """
    basis = U[:, :d]
    meanBlock = np.repeat(meanCol, Xd.shape[1], axis=1)
    reconstructed = np.dot(basis, Xd)
    return np.add(reconstructed, meanBlock)
def offset(X, Xr):
    """Return the normalized square error between X and its recovery Xr.

    The result is the sum of squared entries of ``X - Xr`` divided by the
    sum of squared entries of ``X``.
    """
    residual = np.subtract(X, Xr)
    errorEnergy = np.square(residual).sum()
    signalEnergy = np.square(X).sum()
    return errorEnergy / signalEnergy
def doubleplot(x, y1, y2):
    """Plot y1 and y2 against x in one figure with two y-axes.

    y1 is drawn as a solid blue line against the left axis (labelled
    'compression ratio'); y2 is drawn as a dashed red line with square
    markers against a twin axis (labelled 'testing loss (%)'). The figure
    is shown with ``plt.show()``; nothing is returned.
    """
    figure, leftAxis = plt.subplots()

    # Left axis: first series, coloured to match its line.
    leftAxis.plot(x, y1, 'b-', linewidth=3)
    leftAxis.set_xlabel('variance loss (%)', fontsize=16)
    leftAxis.set_ylabel('compression ratio', color='b', fontsize=16)
    leftAxis.tick_params('y', colors='b')

    # Twin axis sharing the same x: second series.
    rightAxis = leftAxis.twinx()
    rightAxis.plot(x, y2, 'r--', marker='s', linewidth=3)
    rightAxis.set_ylabel('testing loss (%)', color='r', fontsize=16)
    rightAxis.tick_params('y', colors='r')

    figure.tight_layout()
    # The grid is enabled through the pyplot state interface, i.e. on
    # whichever axes is current at this point.
    plt.grid()
    plt.show()
# Driver: for every pid, sweep the allowed variance loss, collect the
# compression ratio and the testing loss (as a percentage), and plot both
# against the sweep. The last U computed for each pid is kept in Ustored.
Ustored = {}
varloss = [20, 15, 10, 5, 1]
for pid in (0, 2, 4, 6, 8):
    print("\ncurrently working on pid=", pid)
    r = []
    loss = []
    for var in varloss:
        print("when var loss is ", var, "%:")
        rr, ll, U = getRatio(pid=pid, thres=(var / 100))
        r.append(rr)
        # Stored directly as a percentage for plotting.
        loss.append(ll * 100)
    Ustored[pid] = U
    doubleplot(varloss, r, loss)
    # NOTE(review): the original indentation was lost; this separator is
    # assumed to be printed once per pid -- confirm.
    print("\n")
# with open("U_phase3.pickle", 'wb') as handle:
#     pickle.dump(Ustored, handle)