# task.py
# Input/target sequence generators for the Rikhye task:
# RikhyeTask, RikhyeTaskMultiCues and RikhyeTaskBatch.
import numpy as np
# Wei-Long version
class RikhyeTask():
    """Generate input/target sequences for the Rikhye task, one cycle per call.

    A cycle presents every (context, cue) pair of the selected context(s)
    exactly once, in random order.  Each presentation ("trial") lasts
    ``tsteps`` time steps: the one-hot cue is shown for the first
    ``cuesteps`` steps, the remaining steps are a delay period with zero
    input, and the target is held constant over the whole trial.

    Parameters:
        Ntrain: int, number of training cycles per context
        Nextra: int, extra cycles appended after the last block, during
            which the first context is shown again (block training only)
        Ncontexts: int, number of contexts
        inpsPerConext: int, number of cues in each context
        blockTrain: bool, if True the contexts are trained block by block,
            otherwise every cycle interleaves the cues of all contexts
        tsteps: int, length of one trial in time steps
        cuesteps: int, number of time steps the cue is shown; if equal to
            tsteps there is no delay period

    Raises:
        ValueError: if cuesteps is not within [0, tsteps]
    """

    def __init__(self, Ntrain, Nextra, Ncontexts, inpsPerConext, blockTrain,
                 tsteps=200, cuesteps=100):
        if not 0 <= cuesteps <= tsteps:
            raise ValueError("cuesteps must satisfy 0 <= cuesteps <= tsteps")
        self.Ncontexts = Ncontexts
        self.blockTrain = blockTrain
        self.inpsPerConext = inpsPerConext
        self.tsteps = tsteps
        self.cuesteps = cuesteps
        self.Ncues = self.Ncontexts * self.inpsPerConext
        # Extra cycles only exist in block training (they show whether the
        # learning of block 1 is remembered); keep the attribute defined in
        # both modes so that Ntrain - Nextra is always meaningful.
        self.Nextra = Nextra if self.blockTrain else 0
        self.Ntrain = Ntrain * self.Ncontexts + self.Nextra
        # Initialize counter
        self.traini = 0

    def __call__(self, *args, **kwargs):
        """Return the input stimulus and target output for one cycle.

        Advances the internal cycle counter ``traini`` by one.

        Returns:
            input: (n_time, n_input) with n_time = tsteps * number of trials
                in the cycle and n_input = Ncues
            target: (n_time, 2)
        """
        if self.blockTrain:
            cueList = self.get_cue_list(self._current_context())
        else:
            cueList = self.get_cue_list()
        cues_order = self.get_cues_order(cueList)
        num_trial = cues_order.shape[0]
        inputs = np.zeros((self.tsteps * num_trial, self.Ncues))
        targets = np.zeros((self.tsteps * num_trial, 2))
        for triali, (taski, cuei) in enumerate(cues_order):
            t_start = triali * self.tsteps
            cue, target = self.get_cue_target(taski, cuei)
            # cue only during the cue period; the delay period stays zero
            inputs[t_start:t_start + self.cuesteps] = cue
            targets[t_start:t_start + self.tsteps] = target
        self.traini += 1
        return inputs, targets

    def _current_context(self):
        """Return the context index trained at the current cycle (block mode)."""
        n_block_cycles = self.Ntrain - self.Nextra
        # last block is just the first context again; testing this first also
        # avoids dividing by a zero block length when there are no block cycles
        if self.traini >= n_block_cycles:
            return 0
        n_switch = n_block_cycles // self.Ncontexts
        return self.traini // n_switch

    def get_cues_order(self, cues):
        """Return the rows of ``cues`` in a random order (global NumPy RNG)."""
        return np.random.permutation(cues)

    def get_cue_target(self, taski, cuei):
        """Return the one-hot cue vector and the 2-unit target for one trial.

        The cue unit is ``taski * inpsPerConext + cuei``.  Even ``cuei`` maps
        to target (1, 0), odd ``cuei`` to target (0, 1).
        """
        cue = np.zeros(self.Ncues)
        cue[taski * self.inpsPerConext + cuei] = 1.
        if (cuei % 2) == 0:
            target = np.array((1., 0.))
        else:
            target = np.array((0., 1.))
        return cue, target

    def get_cue_list(self, taski=None):
        """Return an (n, 2) array whose rows are (taski, cuei) pairs.

        With ``taski`` given, only the cues of that context are listed;
        otherwise every possible (taski, cuei) combination is listed.
        """
        if taski is not None:
            # (taski,cuei) combinations for one given taski
            contexts = np.repeat(taski, self.inpsPerConext)
            cues = np.arange(self.inpsPerConext)
        else:
            # every possible (taski,cuei) combination
            contexts = np.repeat(np.arange(self.Ncontexts), self.inpsPerConext)
            cues = np.tile(np.arange(self.inpsPerConext), self.Ncontexts)
        return np.column_stack((contexts, cues))
class RikhyeTaskMultiCues():
    """Rikhye task generator with an all-cues-on window ending each cue period.

    A cycle presents every (context, cue) pair of the selected context(s)
    exactly once, in random order.  Each trial lasts ``tsteps`` time steps:
    the one-hot cue is shown for the first ``cuesteps`` steps, and during
    the last ``tsteps_noise`` steps of that cue period all input units are
    set to 1.  The remaining steps are a delay period with zero input.  The
    target is held constant over the whole trial.

    Parameters:
        Ntrain: int, number of training cycles per context
        Nextra: int, extra cycles appended after the last block, during
            which the first context is shown again (block training only)
        Ncontexts: int, number of contexts
        inpsPerConext: int, number of cues in each context
        blockTrain: bool, if True the contexts are trained block by block,
            otherwise every cycle interleaves the cues of all contexts
        tsteps_noise: int, length of the all-ones window at the end of the
            cue period; values larger than cuesteps are clamped to the cue
            period of the trial
        tsteps: int, length of one trial in time steps
        cuesteps: int, number of time steps the cue is shown; if equal to
            tsteps there is no delay period

    Raises:
        ValueError: if cuesteps is not within [0, tsteps]
    """

    def __init__(self, Ntrain, Nextra, Ncontexts, inpsPerConext, blockTrain,
                 tsteps_noise, tsteps=200, cuesteps=100):
        if not 0 <= cuesteps <= tsteps:
            raise ValueError("cuesteps must satisfy 0 <= cuesteps <= tsteps")
        self.Ncontexts = Ncontexts
        self.blockTrain = blockTrain
        self.inpsPerConext = inpsPerConext
        self.tsteps = tsteps
        self.cuesteps = cuesteps
        self.Ncues = self.Ncontexts * self.inpsPerConext
        self.tsteps_noise = tsteps_noise
        # Extra cycles only exist in block training (they show whether the
        # learning of block 1 is remembered); keep the attribute defined in
        # both modes so that Ntrain - Nextra is always meaningful.
        self.Nextra = Nextra if self.blockTrain else 0
        self.Ntrain = Ntrain * self.Ncontexts + self.Nextra
        # Initialize counter
        self.traini = 0

    def __call__(self, *args, **kwargs):
        """Return the input stimulus and target output for one cycle.

        Advances the internal cycle counter ``traini`` by one.

        Returns:
            input: (n_time, n_input) with n_time = tsteps * number of trials
                in the cycle and n_input = Ncues
            target: (n_time, 2)
        """
        if self.blockTrain:
            cueList = self.get_cue_list(self._current_context())
        else:
            cueList = self.get_cue_list()
        cues_order = self.get_cues_order(cueList)
        num_trial = cues_order.shape[0]
        inputs = np.zeros((self.tsteps * num_trial, self.Ncues))
        targets = np.zeros((self.tsteps * num_trial, 2))
        for triali, (taski, cuei) in enumerate(cues_order):
            t_start = triali * self.tsteps
            cue_end = t_start + self.cuesteps
            cue, target = self.get_cue_target(taski, cuei)
            inputs[t_start:cue_end] = cue
            # All-ones window at the end of the cue period.  Clamp its start
            # to the trial start: an unclamped start would go negative on the
            # first trial (empty slice) and would overwrite the previous
            # trial's delay period on later trials.
            noise_start = max(t_start, cue_end - self.tsteps_noise)
            inputs[noise_start:cue_end] = 1.
            targets[t_start:t_start + self.tsteps] = target
        self.traini += 1
        return inputs, targets

    def _current_context(self):
        """Return the context index trained at the current cycle (block mode)."""
        n_block_cycles = self.Ntrain - self.Nextra
        # last block is just the first context again; testing this first also
        # avoids dividing by a zero block length when there are no block cycles
        if self.traini >= n_block_cycles:
            return 0
        n_switch = n_block_cycles // self.Ncontexts
        return self.traini // n_switch

    def get_cues_order(self, cues):
        """Return the rows of ``cues`` in a random order (global NumPy RNG)."""
        return np.random.permutation(cues)

    def get_cue_target(self, taski, cuei):
        """Return the one-hot cue vector and the 2-unit target for one trial.

        The cue unit is ``taski * inpsPerConext + cuei``.  Even ``cuei`` maps
        to target (1, 0), odd ``cuei`` to target (0, 1).
        """
        cue = np.zeros(self.Ncues)
        cue[taski * self.inpsPerConext + cuei] = 1.
        if (cuei % 2) == 0:
            target = np.array((1., 0.))
        else:
            target = np.array((0., 1.))
        return cue, target

    def get_cue_list(self, taski=None):
        """Return an (n, 2) array whose rows are (taski, cuei) pairs.

        With ``taski`` given, only the cues of that context are listed;
        otherwise every possible (taski, cuei) combination is listed.
        """
        if taski is not None:
            # (taski,cuei) combinations for one given taski
            contexts = np.repeat(taski, self.inpsPerConext)
            cues = np.arange(self.inpsPerConext)
        else:
            # every possible (taski,cuei) combination
            contexts = np.repeat(np.arange(self.Ncontexts), self.inpsPerConext)
            cues = np.tile(np.arange(self.inpsPerConext), self.Ncontexts)
        return np.column_stack((contexts, cues))
# Zhongxuan version
class RikhyeTaskBatch():
    """Batched generator of Rikhye task input/target sequences."""

    def __init__(self, num_cueingcontext, num_cue, num_rule, rule, blocklen, block_cueingcontext, tsteps, cuesteps, batch_size):
        '''Generate Rikhye task dataset

        Parameters:
            num_cueingcontext: int, number of cueing contexts
            num_cue: int, number of cues in each cueing contexts
            num_rule: int, number of rules (e.g. attend to audition is a rule)
            rule: list of int, rule corresponding to one cue in one cueing context
            blocklen: list of int, trainlen of each block
            block_cueingcontext: list of int, cueing context trained in each block
            tsteps: int, length of a trial, equals to cuesteps + delaysteps
            cuesteps: int, length of showing cues
            batch_size: int
        '''
        self.num_cueingcontext = num_cueingcontext
        self.num_cue = num_cue
        self.num_rule = num_rule
        self.rule = rule
        # exclusive end index of each block, counted in generated samples
        self.blockrange = np.cumsum(blocklen)
        self.block_cueingcontext = block_cueingcontext
        self.tsteps = tsteps
        self.cuesteps = cuesteps
        self.batch_size = batch_size
        # Initialize counter
        self.traini = 0

    def __call__(self, *args, **kwargs):
        """Return the input stimulus and target output for one batch of cycles.

        Every sample in the batch is one cycle: all cues of the cueing
        context of the current block, each shown once in random order.  The
        sample counter ``traini`` advances by one per sample, so a block can
        end in the middle of a batch.

        Parameters:
            No parameter

        Returns:
            input: (n_time, batch_size, n_input) with n_time = tsteps * num_cue
                and n_input = num_cue * num_cueingcontext
            target: (n_time, batch_size, n_output) with n_output = num_rule

        Raises:
            ValueError: when the counter has passed the end of the last block
        """
        n_time = self.tsteps * self.num_cue
        inputs = np.zeros((n_time, self.batch_size, self.num_cue * self.num_cueingcontext))
        targets = np.zeros((n_time, self.batch_size, self.num_rule))
        for i in range(self.batch_size):
            # first block whose end index has not been reached yet; a plain
            # comparison avoids dividing by a zero cumulative length
            open_blocks = np.flatnonzero(self.traini < self.blockrange)
            if open_blocks.size == 0:
                raise ValueError("the end")
            blocki = int(open_blocks[0])
            cueList = self.get_cue_list(self.block_cueingcontext[blocki])
            cues_order = np.random.permutation(cueList)
            for triali, (cueingcontext, cuei) in enumerate(cues_order):
                t_start = triali * self.tsteps
                cue, target = self.get_cue_target(cueingcontext, cuei)
                # cue only during the cue period; the rest of the trial is zero
                inputs[t_start:t_start + self.cuesteps, i, :] = cue
                targets[t_start:t_start + self.tsteps, i, :] = target
            self.traini += 1
        return inputs, targets

    # Helper functions
    def get_cue_list(self, cueingcontext):
        '''
        Return:
            (cueingcontext, cuei) combinations for one training step,
            as an array of shape (num_cue, 2)
        '''
        return np.column_stack((np.repeat(cueingcontext, self.num_cue),
                                np.arange(self.num_cue)))

    def get_cue_target(self, cueingcontext, cuei):
        '''
        Return:
            cue: one-hot vector of length num_cue*num_cueingcontext, active
                at index cueingcontext*num_cue + cuei
            target: one-hot vector of length num_rule, active at the rule
                assigned to that index
        '''
        cue_index = cueingcontext * self.num_cue + cuei
        cue = np.zeros(self.num_cue * self.num_cueingcontext)
        cue[cue_index] = 1.
        target = np.zeros(self.num_rule)
        target[self.rule[cue_index]] = 1
        return cue, target