# simulator.py
import numpy as np
# import pandas as pd
# from scipy.stats import bernoulli
# #!pip install sklearn
# from sklearn.datasets import make_sparse_spd_matrix
# import matplotlib.pyplot as plt
# import scipy
from transformation_functions import update_Sigma, update_G, update_Theta, update_E, update_C
from generator_functions import sample_B, sample_K, sample_H, sample_Z_from_W
# Main Simulator Class
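# The Simulator stores every matrix of the generative model: topic-word
# distribution parameters B, a topic adjacency matrix G with precision matrix K
# and covariance Sigma, per-document parameters H and their normalized
# transformation Theta, the word counts W with latent topic assignments Z,
# and the derived count matrices E (topic draws per document) and
# C (topic draws per word, across all documents).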
class Simulator:
    # Note: indexes start from 0, so all maximum indexes are reduced by 1
    def __init__(self, D, V, M, k, gamma, seed):
        # Initialize all model matrices with zeros
        self.W = np.zeros((D, V))        # D x V matrix; W[d, n] counts occurrences of word n in document d
        self.B = np.zeros((k, V))        # k x V matrix; B[z] is the parameter vector of the word distribution for topic z
        self.C = np.zeros((k, V))        # k x V matrix; C[z] is the count vector of sampled topics over each word, across all documents
        self.E = np.zeros((D, k))        # D x k matrix; E[d] is the count vector of topic draws over all words of document d
        self.H = np.zeros((D, k))        # H[d] is eta_d
        self.Theta = np.zeros((D, k))    # Normalized transformation of H
        self.G = np.zeros((k, k))        # Adjacency matrix (see also the "networkx" package for graph objects)
        self.K = np.zeros((k, k))        # Precision matrix of G
        self.Sigma = np.zeros((k, k))    # Inverse of K
        self.Z = -np.ones((D, V, M))     # Topic assignments for each word occurrence in each document
        self.D = D
        self.V = V
        self.M = M
        self.k = k
        self.gamma = gamma
        self.seed = seed                 # Random seed
    # Generations
    def generate_WZ(self):
        if self.M == 0:
            raise ValueError('Error: M value is 0')
        elif self.Theta.sum() == 0:
            raise ValueError('Error: Theta matrix is all zeros')
        elif self.B.sum() == 0:
            raise ValueError('Error: B matrix is all zeros')
        np.random.seed(self.seed)
        # Ref https://numpy.org/doc/stable/reference/random/generated/numpy.random.multinomial.html
        # Multinomial drawing for Z and then W
        for d in range(self.D):
            # Number of word draws for this document (at most 70% of the M * V slots)
            N_d = np.random.randint(1, int(self.M * self.V * 0.7))  # Hard-coded 70% thinning factor
            for n in range(N_d):
                # Multinomial draw from Theta[d] (it has to be normalized):
                # a canonical (one-hot) vector over the k topics
                mult = np.random.multinomial(1, self.Theta[d], size=1)  # Vector of 0's with a single 1
                z = np.argmax(mult)  # Index of the 1 (topic index)
                # Multinomial draw from B[z]: a canonical (one-hot) vector over the V words
                mult = np.random.multinomial(1, self.B[z], size=1)  # Vector of 0's with a single 1
                w = np.argmax(mult)  # Index of the 1 (word index)
                # Check whether there are still unassigned occurrence slots for this word
                empty_cell_indexes = np.nonzero(self.Z[d, w] == -1)[0]
                if empty_cell_indexes.size != 0:  # At least one slot is not assigned
                    first_empty_index = empty_cell_indexes[0]
                    self.Z[d, w, first_empty_index] = z  # Assigning this word occurrence to topic z
                    self.W[d, w] += 1  # Increasing word counter
        print('Success: W and Z generated')
# Transformations
def update_Theta(self):
self.Theta = update_Theta(self.Theta, self.H)
def update_E(self):
self.E = update_E(self.E, self.Z)
def update_C(self):
self.C = update_C(self.C, self.Z)
def update_Sigma(self):
self.Sigma = update_Sigma(self.K)
def update_G(self):
self.G = update_G(self.K)
# Priors
def sample_B(self):
self.B = sample_B(self.k, self.V, self.seed)
def sample_GK(self): # Here we can update Sigma automatically
self.K = sample_K(self.k, self.gamma, self.seed)
self.update_Sigma()
self.update_G()
def sample_H(self): # Here we can update Theta automatically
self.H = sample_H(self.Sigma, self.D, self.k, self.seed)
self.update_Theta()
    def generate_all_data(self):
        # Run the full generative pipeline in order, populating every data matrix
        self.sample_B()      # Will get B
        self.sample_GK()     # Will get G, K, Sigma
        self.sample_H()      # Will get H, Theta from Sigma
        self.generate_WZ()   # Will get W, Z from Theta, B
        self.update_E()      # Will get E from Z
        self.update_C()      # Will get C from Z
    def generate_non_informative(self, W):
        # Non-informative initialization from an observed word-count matrix W
        self.W = W
        V = self.V
        k = self.k
        D = self.D
        self.B = (1 / V) * np.ones((k, V))   # Uniform topic-word distributions
        self.K = np.identity(k)
        self.Sigma = np.identity(k)
        self.update_G()
        self.H = (1 / k) * np.ones((D, k))   # Non-informative H
        self.update_Theta()                  # Non-informative Theta
        self.Z = sample_Z_from_W(self.W, self.k, self.seed)  # Will get Z from the observed W
        self.update_E()                      # Will get E from Z
        self.update_C()                      # Will get C from Z
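

# ---------------------------------------------------------------------------
# Usage sketch (not part of the original module): a minimal example of driving
# the simulator end to end. The parameter values below are illustrative only;
# adjust D, V, M, k, gamma and seed to the experiment at hand.
if __name__ == "__main__":
    sim = Simulator(D=20, V=50, M=5, k=4, gamma=0.5, seed=42)
    sim.generate_all_data()          # Populates B, G, K, Sigma, H, Theta, W, Z, E, C
    print(sim.W.shape, sim.Z.shape)  # (D, V) word counts, (D, V, M) topic assignments

    # Non-informative initialization from an already generated word-count matrix
    sim_ni = Simulator(D=20, V=50, M=5, k=4, gamma=0.5, seed=42)
    sim_ni.generate_non_informative(sim.W)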