-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathgmm.py
241 lines (205 loc) · 7.83 KB
/
gmm.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
import numpy
import scipy
import utility as util
import dcf
def logpdf_GAU_ND_Opt(X, mu, C):
    """Log-density of each column of X under a multivariate Gaussian N(mu, C).

    Parameters
    ----------
    X : (D, N) array, one sample per column.
    mu : (D, 1) mean vector.
    C : (D, D) covariance matrix (must be invertible).

    Returns
    -------
    (N,) array with the log-density of every sample.
    """
    P = numpy.linalg.inv(C)
    # log normalization constant: -D/2 * log(2*pi) - 1/2 * log|C|
    const = -0.5 * X.shape[0] * numpy.log(2*numpy.pi)
    const += -0.5 * numpy.linalg.slogdet(C)[1]
    # centered samples; the quadratic form (x-mu)^T P (x-mu) is evaluated for
    # all columns at once instead of looping sample by sample
    XC = X - mu
    return const - 0.5 * (XC * numpy.dot(P, XC)).sum(0)
def GMM_ll_perSample(X, gmm):
    """Log marginal density of each column of X under the GMM.

    `gmm` is a list of (weight, mean, covariance) tuples; the per-sample
    log-density is logsumexp over the component joint log-densities
    log(w_g) + log N(x | mu_g, C_g).
    """
    joint = numpy.vstack([
        logpdf_GAU_ND_Opt(X, mean, cov) + numpy.log(weight)
        for (weight, mean, cov) in gmm
    ])
    # marginalize over components in the log domain
    return scipy.special.logsumexp(joint, axis=0)
def mrow(v):
    """Reshape a 1-D vector into a (1, N) row matrix."""
    return numpy.reshape(v, (1, v.shape[0]))
def mcol(v):
    """Reshape a 1-D vector into an (N, 1) column matrix."""
    return numpy.reshape(v, (v.shape[0], 1))
def GMM_tied(GMM, Z_vec, N):
    """Replace every component covariance with the tied covariance.

    The tied matrix is the Z-weighted average (sum_g Z_g * C_g) / N of the
    component covariances. The list is updated in place and also returned.
    """
    dim = GMM[0][2].shape[0]
    tied_cov = numpy.zeros((dim, dim))
    for (_, _, cov), z in zip(GMM, Z_vec):
        tied_cov += z * cov
    tied_cov = tied_cov / N
    for idx, (weight, mean, _) in enumerate(GMM):
        GMM[idx] = (weight, mean, tied_cov)
    return GMM
def GMM_EM(X, gmm, full_cov=True, threshold=1e-6, psi=0.01,tied=False):
    """Re-estimate a GMM on data X with the EM algorithm.

    Parameters
    ----------
    X : (D, N) data matrix, one sample per column.
    gmm : list of (weight, mean, covariance) component tuples (initial model).
    full_cov : if False, covariances are diagonalized at every M-step.
    threshold : stop when the gain in average log-likelihood between two
        iterations drops to this value or below.
    psi : lower bound imposed on the covariance eigenvalues (keeps components
        away from degenerate solutions).
    tied : if True, all components share one covariance matrix each iteration.

    Returns
    -------
    Updated list of (weight, mean, covariance) tuples.
    """
    # average log-likelihood of the current and previous iteration
    llNew = None
    llOld = None
    lastll = 0
    G = len(gmm)
    N = X.shape[1]
    i = 0
    #if tied:
    #    return _GMM_EM_tied(X, gmm)
    # iterate until the average log-likelihood stops improving by > threshold
    while llOld is None or llNew - llOld > threshold:
        llOld = llNew
        SJ = numpy.zeros((G, N))
        # E-step: joint log-density log(w_g) + log N(x | mu_g, C_g) per component
        for g in range(G):
            SJ[g, :] = logpdf_GAU_ND_Opt(X, gmm[g][1], gmm[g][2]) + numpy.log(gmm[g][0])
        # marginal log-density of each sample (logsumexp over components)
        SM = scipy.special.logsumexp(SJ, axis=0)
        # average log-likelihood of the whole dataset (i.i.d. samples assumption)
        llNew = SM.sum() / N
        # component posterior probabilities (responsibilities), back in linear domain
        P = numpy.exp(SJ - SM)
        # M-step: build the updated GMM parameters
        gmmNew = []
        sigma_sum = numpy.zeros((X.shape[0], X.shape[0]))
        for g in range(G):
            gamma = P[g, :]
            # zero-order statistic
            Z = gamma.sum()
            # first- and second-order statistics, evaluated with broadcasting
            F = (mrow(gamma) * X).sum(1)
            S = numpy.dot(X ,(mrow(gamma) * X).T)
            # weight
            w = Z / N
            # mean
            mu = mcol(F / Z)
            # covariance matrix
            Sigma = S/Z - numpy.dot(mu, mu.T)
            #print('Sigma: ',Sigma)
            if tied:
                # accumulate Z-weighted covariances; the shared matrix is built
                # after the component loop
                sigma_sum += Z*Sigma
                gmmNew.append((w, mu))
            else:
                if not full_cov:
                    # diagonal case: zero the off-diagonal entries
                    Sigma = Sigma * numpy.eye(Sigma.shape[0])
                # constrain the eigenvalues of Sigma to be >= psi
                U, s, _ = numpy.linalg.svd(Sigma)
                s[s < psi] = psi
                Sigma = numpy.dot(U, mcol(s)*U.T)
                gmmNew.append((w, mu, Sigma))
        if tied:
            gmm = gmmNew
            # shared covariance: average of the Z-weighted component covariances
            Sigma = sigma_sum / N
            if not full_cov:
                Sigma = Sigma * numpy.eye(Sigma.shape[0])
            U, s, _ = numpy.linalg.svd(Sigma)
            s[s < psi] = psi
            Sigma = numpy.dot(U, mcol(s)*U.T)
            tied_gmm = []
            for g in gmm:
                (w, mu) = g
                tied_gmm.append((w, mu, Sigma))
            gmm = tied_gmm
        else:
            gmm = gmmNew
        # sanity check: EM should never decrease the log-likelihood
        if(llNew < lastll and lastll != 0):
            #print("ERROR decreasing ll")
            pass
        lastll = llNew
        #print("new ll:", llNew)
        i+=1
        #print("diff:", llNew - llOld)
    return gmm
def empirical_cov(D):
    """Maximum-likelihood covariance of the column samples of D ((D, N))."""
    centered = D - D.mean(axis=1, keepdims=True)
    return numpy.dot(centered, centered.T) / float(centered.shape[1])
def split(GMM, alpha):
    """Split one (weight, mean, covariance) component into two.

    The means are displaced by +/- alpha * sqrt(s0) along the leading
    singular direction of the covariance; each child gets half the weight
    and keeps the parent covariance.
    """
    weight, mean, cov = GMM
    U, s, Vh = numpy.linalg.svd(cov)
    displacement = alpha * (s[0] ** 0.5) * U[:, 0:1]
    half_weight = weight / 2
    return ((half_weight, mean + displacement, cov),
            (half_weight, mean - displacement, cov))
def LBG(gmms, alpha):
    """One LBG step: double the number of components.

    Each component's covariance eigenvalues are first constrained to be
    >= 0.01, then the component is split into two along its principal
    direction (displacement scaled by alpha).
    """
    doubled = []
    for weight, mean, cov in gmms:
        constrained = (weight, mean, constrEigenvaluesCovMat(cov, 0.01))
        doubled.extend(split(constrained, alpha))
    return doubled
# Constraining the eigenvalues of the covariance matrices
def constrEigenvaluesCovMat(C, psi):
    """Rebuild covariance C with its singular values clipped from below at psi."""
    U, s, _ = numpy.linalg.svd(C)
    clipped = numpy.maximum(s, psi)
    # reconstruct U * diag(clipped) * U^T via column broadcasting
    return numpy.dot(U, clipped.reshape(-1, 1) * U.T)
def diag_cov(DTR):
    """Diagonal (naive-Bayes style) empirical covariance of DTR."""
    full = empirical_cov(DTR)
    return numpy.diag(numpy.diag(full))
# compute_cov, alpha, stopping_criterion, G, psi, cov_type='full',tied=False
def GMM(DTR, DTE, LTR, params={}):
    """Train per-class GMMs via LBG + EM and score the evaluation data.

    Parameters
    ----------
    DTR : (D, N_train) training data, one sample per column.
    DTE : (D, N_test) evaluation data to score.
    LTR : (N_train,) training labels; only classes 0 and 1 are processed.
    params : configuration dict. Missing keys are filled with defaults via
        setdefault; 'G' has no default and must be supplied (number of LBG
        steps — component counts evaluated are 1, 2, 4, ..., 2**(G-1)).
        NOTE(review): the default `params={}` is a shared mutable default and
        setdefault also mutates the caller's dict — confirm callers pass a
        fresh dict per call. `priors` is read but never used below.

    Returns
    -------
    (G, N_test) array: one row of log-likelihood ratios (class 1 vs class 0)
    for each number-of-components step.
    """
    priors = params.setdefault('priors', [0.5, 0.5])
    alpha = params.setdefault('alpha', 0.1)
    stopping_criterion = params.setdefault('stopping_criterion', 10**-6)
    G = params.get('G')
    psi = params.setdefault('psi', 0.01)
    full = params.setdefault('full_cov', True)
    tied = params.setdefault('tied', False)
    # choose full or diagonal empirical covariance for the starting component
    if full:
        compute_cov = empirical_cov
    else:
        compute_cov = diag_cov
    lls = []
    for i in range(2):
        ll_for_class = []
        class_data = DTR[:, LTR == i]
        # single-component start: class mean and eigenvalue-constrained covariance
        Sigma = constrEigenvaluesCovMat(compute_cov(class_data), psi)
        start_gmm = [(1, mcol(numpy.mean(class_data, 1 )), Sigma)]
        #print("\t\tprocessing class:", i)
        lbg_gmm = start_gmm
        for j in range(G):
            #print("\t\twith component(s):", len(lbg_gmm))
            new_gmm = GMM_EM(class_data, lbg_gmm, full, stopping_criterion, psi,tied)
            start_gmm = new_gmm
            # score the evaluation samples with the model trained so far
            ll = GMM_ll_perSample(DTE, new_gmm)
            ll_for_class.append(ll)
            # double the components, then re-constrain their covariances
            # NOTE(review): this inner loop reuses the name `i` (the class
            # index); harmless here since `i` is only read at the top of the
            # outer loop, but worth renaming
            lbg_gmm = LBG(start_gmm, alpha)
            for i,gmm in enumerate(lbg_gmm):
                Cconstr = constrEigenvaluesCovMat(gmm[2], psi)
                lbg_gmm[i] = (gmm[0], gmm[1], Cconstr)
        lls.append(ll_for_class)
    llrs = []
    for i in range(G):
        # log-likelihood ratio, class 1 over class 0, per component-count step
        llr = lls[1][i] - lls[0][i]
        llrs.append(llr)
    llrs = numpy.vstack(llrs)
    print("llrs shape", llrs.shape)
    return llrs
def build_filename(minDCF: bool, preprocessing: bool, components: int, cov_type: str) -> str:
    """Compose the results filename for one GMM configuration.

    minDCF selects pickled minDCF results (.bin) vs stored llrs (.npy);
    preprocessing selects the z-normalized vs raw data variant.
    """
    prefix = "minDCFs" if minDCF else "llrs"
    data_tag = "z_norm" if preprocessing else "raw"
    extension = ".bin" if minDCF else ".npy"
    return f"{prefix}_{data_tag}-gmm-{components}-{cov_type}{extension}"
def build_path(minDCF: bool, preprocessing: bool, components: int, cov_type: str, training: bool= True) -> str:
    """Build the path of a stored GMM result file under ./results/gmm.

    Experimental (non-training) results live in the /experimental subfolder.
    Fix: the computed filename was never used — the original returned a
    literal placeholder instead of appending build_filename's result.
    """
    ROOT = f"./results/gmm"
    if not training:
        ROOT += "/experimental"
    filename = build_filename(minDCF, preprocessing, components, cov_type)
    return f"{ROOT}/{filename}"
def load_results(minDCF:bool, preprocessing:bool, components:int, cov_type:str, application:tuple= (0.5, 1, 1), training:bool= True):
    """Load stored GMM results from disk.

    When minDCF is True, a pickled dict is loaded and the entry for the
    given application tuple is returned; otherwise the saved llrs array
    is loaded with numpy.
    """
    path = build_path(minDCF, preprocessing, components, cov_type, training)
    if minDCF:
        return util.pickle_load(path)[application]
    return numpy.load(path)