-
Notifications
You must be signed in to change notification settings - Fork 26
/
simulator.R
517 lines (484 loc) · 20.1 KB
/
simulator.R
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
#' @importFrom foreach %dopar% %do% foreach
#' @importFrom doParallel registerDoParallel stopImplicitCluster
#' @importFrom itertools isplitVector
#' @importFrom data.table rbindlist
#' @importFrom iterators icount
#' @import Formula
#' @export
Simulator <- R6::R6Class(
  "Simulator",
  class = FALSE,
  public = list(
    # List of Agent instances to simulate (a single Agent is wrapped in a list).
    agents = NULL,
    # Number of workers; 1 after reset(), updated by register_parallel_backend().
    workers = NULL,
    agent_count = NULL,
    horizon = NULL,
    simulations = NULL,
    worker_max = NULL,
    internal_history = NULL,
    save_context = NULL,
    save_theta = NULL,
    do_parallel = NULL,
    sims_and_agents_list = NULL,
    t_over_sims = NULL,
    set_seed = NULL,
    progress_file = NULL,
    log_interval = NULL,
    save_interval = NULL,
    include_packages = NULL,
    # Log file passed to parallel::makeCluster (set by reset() when progress_file is on).
    outfile = NULL,
    # RNG state captured in initialize(), restored after run() and on finalize().
    global_seed = NULL,
    chunk_multiplier = NULL,
    policy_time_loop = NULL,
    # Cluster object created by parallel::makeCluster in register_parallel_backend().
    cl = NULL,
    initialize = function(agents,
                          horizon = 100L,
                          simulations = 100L,
                          save_context = FALSE,
                          save_theta = FALSE,
                          do_parallel = TRUE,
                          worker_max = NULL,
                          set_seed = 0,
                          save_interval = 1,
                          progress_file = FALSE,
                          log_interval = 1000,
                          include_packages = NULL,
                          t_over_sims = FALSE,
                          chunk_multiplier = 1,
                          policy_time_loop = FALSE) {
      # save current seed, so it can be restored after run()
      self$global_seed <- contextual::get_global_seed()
      if (!is.list(agents)) agents <- list(agents)
      self$progress_file <- progress_file
      self$log_interval <- as.integer(log_interval)
      self$horizon <- as.integer(horizon)
      self$simulations <- as.integer(simulations)
      self$save_theta <- save_theta
      self$save_context <- save_context
      self$agents <- agents
      self$agent_count <- length(agents)
      self$worker_max <- worker_max
      self$do_parallel <- do_parallel
      self$t_over_sims <- t_over_sims
      self$set_seed <- set_seed
      self$save_interval <- as.integer(save_interval)
      self$include_packages <- include_packages
      self$chunk_multiplier <- as.integer(chunk_multiplier)
      self$policy_time_loop <- policy_time_loop
      self$reset()
    },
    reset = function() {
      # Reseed R's RNG, reset the worker count, optionally truncate the log
      # files, and build a fresh History with meta data for every agent.
      set.seed(self$set_seed)
      self$workers <- 1
      # create or clear log files
      if (self$progress_file) {
        cat("", file = "workers_progress.log", append = FALSE)
        cat("", file = "agents_progress.log", append = FALSE)
        cat("", file = "parallel.log", append = FALSE)
        self$outfile <- "parallel.log"
      }
      # (re)create history data and meta data tables
      self$internal_history <- History$new()
      self$internal_history$set_meta_data("horizon", self$horizon)
      self$internal_history$set_meta_data("agents", self$agent_count)
      self$internal_history$set_meta_data("simulations", self$simulations)
      self$internal_history$set_meta_data("sim_start_time", format(Sys.time(), "%a %b %d %X %Y"))
      # Unique agent names: the n-th agent (n > 1) sharing an original name
      # is renamed to "<name>.<n>".
      seen_names <- character(0)
      for (agent_index in seq_len(self$agent_count)) {
        current_agent_name <- self$agents[[agent_index]]$name
        seen_names <- c(seen_names, current_agent_name)
        occurrences <- sum(seen_names == current_agent_name)
        if (occurrences > 1) {
          self$agents[[agent_index]]$name <- paste0(current_agent_name, ".", occurrences)
        }
        agent_name <- self$agents[[agent_index]]$name
        bandit_name <- self$agents[[agent_index]]$bandit$class_name
        policy_name <- self$agents[[agent_index]]$policy$class_name
        self$internal_history$set_meta_data("bandit", bandit_name, group = "sim", agent_name = agent_name)
        self$internal_history$set_meta_data("policy", policy_name, group = "sim", agent_name = agent_name)
      }
    },
    run = function() {
      # Run every (simulation, agent) pair, serially or on a parallel backend,
      # collect the results in internal_history, compute statistics and
      # return the History instance.

      # Restore the RNG state captured in initialize() when run() exits, also
      # on error. (Assigning to `.Random.seed` inside this method would only
      # create a local variable and leave the global RNG state untouched.)
      on.exit(contextual::set_global_seed(self$global_seed), add = TRUE)
      # set parallel or serial processing
      `%fun%` <- foreach::`%do%`
      # nocov start
      if (self$do_parallel) {
        # Registered before the backend is set up, so the cluster is shut
        # down even when the simulation loop below fails.
        on.exit(self$stop_parallel_backend(), add = TRUE)
        self$register_parallel_backend()
        `%fun%` <- foreach::`%dopar%`
        # If Microsoft R, set MKL threads to 1
        # Due to an unresolved incompatibility between MRAN and RStudio:
        # https://github.com/rstudio/rstudio/issues/5933
        # https://social.technet.microsoft.com/Forums/en-US/2791e896-c284-4330-88f2-2dcd4acea074
        # setting MKL threads to 1 is disabled when running from RStudio.
        is_rstudio <- Sys.getenv("RSTUDIO") == "1"
        if (!is_rstudio && requireNamespace("RevoUtilsMath", quietly = TRUE)) {
          RevoUtilsMath::setMKLthreads(1)
        }
      }
      # nocov end
      # create a list of all sims (sims*agents), to be divided into chunks
      sims_and_agents_list <- vector("list", self$simulations * self$agent_count)
      index <- 1L
      for (sim_index in seq_len(self$simulations)) {
        for (agent_index in seq_len(self$agent_count)) {
          sims_and_agents_list[[index]] <- list(agent_index = agent_index, sim_index = sim_index)
          index <- index + 1L
        }
      }
      # copy variables used in parallel processing to local environment
      horizon <- self$horizon
      save_context <- self$save_context
      save_theta <- self$save_theta
      progress_file <- self$progress_file
      save_interval <- self$save_interval
      log_interval <- self$log_interval
      t_over_sims <- self$t_over_sims
      set_seed <- self$set_seed
      agents <- self$agents
      include_packages <- self$include_packages
      policy_time_loop <- self$policy_time_loop
      # calculate chunk size
      if (length(sims_and_agents_list) <= self$workers) {
        chunk_divider <- length(sims_and_agents_list)
      } else {
        chunk_divider <- self$workers * self$chunk_multiplier
      }
      # split sims vector into chunks
      sa_iterator <- itertools::isplitVector(sims_and_agents_list, chunks = chunk_divider)
      # include packages that are used in parallel processes
      par_packages <- c(c("data.table", "iterators", "itertools"), include_packages)
      # some info messages
      message(paste("Simulation horizon:", horizon))
      message(paste("Number of simulations:", length(sims_and_agents_list)))
      message(paste("Number of batches:", chunk_divider))
      message("Starting main loop.")
      # start running the main simulation loop
      private$start_time <- Sys.time()
      foreach_results <- foreach::foreach(
        sims_agent_list = sa_iterator,
        i = iterators::icount(),
        .inorder = TRUE,
        .export = c("History", "Formula"),
        .noexport = c("sims_and_agents_list", "internal_history", "sa_iterator"),
        .packages = par_packages
      ) %fun% {
        index <- 1L
        sim_agent_counter <- 0
        sim_agent_total <- length(sims_agent_list)
        # Arm-multiplying bandits may run horizon * k steps per simulation,
        # so size the local history for the largest such k in this chunk.
        multiplier <- 1
        for (sim_agent_index in sims_agent_list) {
          sim_agent <- agents[[sim_agent_index$agent_index]]
          if (isTRUE(sim_agent$bandit$arm_multiply)) {
            if (multiplier < sim_agent$bandit$k) {
              multiplier <- sim_agent$bandit$k
            }
          }
        }
        allocate_space <- floor((horizon * sim_agent_total * multiplier) / save_interval) + sim_agent_total
        local_history <- History$new(allocate_space,
                                     save_context,
                                     save_theta)
        for (sim_agent_index in sims_agent_list) {
          # Each simulation works on its own deep copy of the agent.
          sim_agent <- agents[[sim_agent_index$agent_index]]$clone(deep = TRUE)
          sim_agent$sim_index <- sim_agent_index$sim_index
          sim_agent$agent_index <- sim_agent_index$agent_index
          sim_agent_counter <- sim_agent_counter + 1
          if (isTRUE(progress_file)) {
            sim_agent$progress_file <- TRUE
            sim_agent$log_interval <- log_interval
            cat(paste0("[", format(Sys.time(), format = "%H:%M:%OS6"), "] ",
                       " 0 > init - ", sprintf("%-20s", sim_agent$name),
                       " worker ", i,
                       " at sim ", sim_agent_counter,
                       " of ", sim_agent_total, "\n"),
                file = "workers_progress.log", append = TRUE)
          }
          simulation_index <- sim_agent$sim_index
          agent_name <- sim_agent$name
          # The seed depends only on the simulation index and set_seed, so all
          # agents get the same seed within the same simulation.
          local_current_seed <- simulation_index + set_seed * 42
          set.seed(local_current_seed)
          sim_agent$bandit$post_initialization()
          sim_agent$policy$post_initialization()
          if (isTRUE(sim_agent$bandit$arm_multiply)) {
            if (policy_time_loop) {
              horizon_loop <- horizon
            } else {
              horizon_loop <- horizon * sim_agent$bandit$k
            }
            data_length <- horizon * sim_agent$bandit$k
          } else {
            horizon_loop <- horizon
            data_length <- horizon
          }
          # Offset seed used for generating the bandit's data.
          set.seed(local_current_seed + 1e+06)
          sim_agent$bandit$generate_bandit_data(n = data_length)
          if (isTRUE(t_over_sims)) {
            sim_agent$set_t(as.integer((simulation_index - 1L) * horizon_loop))
          }
          step <- list()
          loop_time <- 0L
          while (loop_time < horizon_loop) {
            step <- sim_agent$do_step()
            # With policy_time_loop, progress is measured by the policy's t
            # (step[[5]]) instead of by the number of do_step() calls.
            if (isTRUE(policy_time_loop)) {
              loop_time <- step[[5]]
            } else {
              loop_time <- loop_time + 1L
            }
            # Save only steps that produced a reward, at t == 1 and at every
            # save_interval-th policy time step.
            if (!is.null(step[[3]]) && ((step[[5]] == 1) || (step[[5]] %% save_interval == 0))) {
              local_history$insert(
                index,                                          # index
                step[[5]],                                      # policy_t
                step[[1]][["k"]],                               # k
                step[[1]][["d"]],                               # d
                step[[2]],                                      # action
                step[[3]],                                      # reward
                agent_name,                                     # agentname
                simulation_index,                               # sim
                if (save_context) step[[1]][["X"]] else NA,     # context
                if (save_theta) step[[4]] else NA               # theta
              )
              index <- index + 1L
            }
          }
        }
        # NOTE(review): this runs once per chunk, on the bandit of the last
        # agent clone only; confirm whether it should run after every simulation.
        sim_agent$bandit$final()
        local_history$data[t != 0]
      }
      # bind all results
      foreach_results <- data.table::rbindlist(foreach_results)
      foreach_results[, agent := factor(agent)]
      self$internal_history$set_data_table(foreach_results[sim > 0 & t > 0], auto_stats = FALSE)
      rm(foreach_results)
      private$end_time <- Sys.time()
      gc()
      message("Finished main loop.")
      self$internal_history$set_meta_data("sim_end_time", format(Sys.time(), "%a %b %d %X %Y"))
      formatted_duration <- contextual::formatted_difftime(private$end_time - private$start_time)
      self$internal_history$set_meta_data("sim_total_duration", formatted_duration)
      message(paste0("Completed simulation in ", formatted_duration))
      message("Computing statistics.")
      # update statistics TODO: not always necessary, add option arg to class?
      self$internal_history$update_statistics()
      # The global seed is restored and the parallel backend stopped by the
      # on.exit handlers registered at the top of run().
      self$internal_history
    },
    register_parallel_backend = function() {
      # nocov start
      # Set up a PSOCK cluster with one worker less than the detected cores
      # (capped by worker_max) and register it as the foreach backend.
      message("Setting up parallel backend.")
      nr_cores <- parallel::detectCores()
      # detectCores() may return NA; in that case (or with fewer than three
      # cores) self$workers keeps its current value (1 after reset()).
      if (isTRUE(nr_cores >= 3)) self$workers <- nr_cores - 1
      if (!is.null(self$worker_max) && self$workers > self$worker_max) {
        self$workers <- self$worker_max
      }
      # make sure no leftover processes
      doParallel::stopImplicitCluster()
      cluster_args <- list(self$workers, useXDR = FALSE, type = "PSOCK",
                           methods = FALSE, setup_timeout = 30)
      if (!is.null(self$outfile)) cluster_args$outfile <- self$outfile
      self$cl <- do.call(parallel::makeCluster, cluster_args)
      message(paste0("Cores available: ", nr_cores))
      message(paste0("Workers assigned: ", self$workers))
      doParallel::registerDoParallel(self$cl)
      # nocov end
    },
    stop_parallel_backend = function() {
      # nocov start
      # Stop the cluster created by register_parallel_backend() (best effort)
      # and clear self$cl so that a second call does not stop it again.
      if (self$do_parallel) {
        if (!is.null(self$cl)) {
          try({
            parallel::stopCluster(self$cl)
          })
          self$cl <- NULL
        }
        doParallel::stopImplicitCluster()
      }
      # nocov end
    },
    finalize = function() {
      # set global seed back to value before
      contextual::set_global_seed(self$global_seed)
      #closeAllConnections()
    }
  ),
  private = list(
    start_time = NULL,
    end_time = NULL
  ),
  active = list(
    # Read-only access to the Simulator's History instance.
    history = function(value) {
      if (missing(value)) {
        self$internal_history
      } else {
        warning("## history$data is read only", call. = FALSE)
      }
    }
  )
)
#' Simulator
#'
#' The entry point of any \pkg{contextual} simulation.
#'
#' A Simulator takes, at a minimum, one or more \code{\link{Agent}} instances, a horizon
#' (the length of an individual simulation, \emph{t} = \{1, \ldots, T\}) and the number of simulations
#' (how many times to repeat each simulation over \emph{t} = \{1, \ldots, T\}, with a new seed
#' on each repeat*).
#'
#' It then runs all simulations (in parallel by default), keeping a log of all \code{\link{Policy}} and
#' \code{\link{Bandit}} interactions in a \code{\link{History}} instance.
#'
#' * Note: to be able to fairly evaluate and compare each agent's performance, and to make sure that
#' simulations are replicable, seeds are set deterministically from \code{set_seed} and the simulation
#' index, so that all agents receive the same seeds in the same simulation.
#'
#' ![](1simulator.jpeg "contextual diagram: simulator")
#'
#' @name Simulator
#' @aliases run simulator
#'
#' @section Usage:
#' \preformatted{
#' simulator <- Simulator$new(agents,
#' horizon = 100L,
#' simulations = 100L,
#' save_context = FALSE,
#' save_theta = FALSE,
#' do_parallel = TRUE,
#' worker_max = NULL,
#' set_seed = 0,
#' save_interval = 1,
#' progress_file = FALSE,
#' log_interval = 1000,
#' include_packages = NULL,
#' t_over_sims = FALSE,
#' chunk_multiplier = 1,
#' policy_time_loop = FALSE)
#' }
#'
#' @section Arguments:
#'
#' \describe{
#' \item{\code{agents}}{
#' An \code{Agent} instance or a \code{list} of \code{Agent} instances.
#' }
#' \item{\code{horizon}}{
#' \code{integer}. The number of pulls or time steps to run each agent, where \emph{t} = \{1, \ldots, T\}.
#' }
#' \item{\code{simulations}}{
#' \code{integer}. How many times to repeat each agent's simulation over \emph{t} = \{1, \ldots, T\},
#' with a new seed on each repeat (itself deterministically derived from set\_seed).
#' }
#' \item{\code{save_interval}}{
#' \code{integer}. Write data to the History log only every \code{save_interval} time steps. Default is 1.
#' }
#' \item{\code{save_context}}{
#' \code{logical}. Save the context matrices \code{X} to the History log during a simulation?
#' }
#' \item{\code{save_theta}}{
#' \code{logical}. Save the parameter list \code{theta} to the History log during a simulation?
#' }
#' \item{\code{do_parallel}}{
#' \code{logical}. Run \code{Simulator} processes in parallel?
#' }
#' \item{\code{worker_max}}{
#' \code{integer}. Maximum number of parallel workers to use.
#' If unspecified, the number of workers defaults to the number of detected cores minus one
#' (or a single worker when fewer than three cores are detected).
#' }
#' \item{\code{t_over_sims}}{
#' \code{logical}. Of use to, among others, offline Bandits.
#' If \code{t_over_sims} is set to \code{TRUE}, the time step \emph{t} carries over between repeated
#' simulations: each repeat of an agent's simulation starts at \emph{t} = (sim_index - 1) multiplied by
#' the number of time steps per simulation, so that a Bandit that looks up its data by \emph{t} uses a
#' different, consecutive part of the data set on each repeat.
#' If \code{FALSE}, \emph{t} is not offset, and every repeat starts from the same initial \emph{t}.
#' }
#' \item{\code{set_seed}}{
#' \code{integer}. Sets the seed of R's random number generator for the current \code{Simulator}.
#' }
#' \item{\code{progress_file}}{
#' \code{logical}. If \code{TRUE}, \code{Simulator} writes \code{workers_progress.log},
#' \code{agents_progress.log} and \code{parallel.log} files to the current working directory,
#' allowing you to keep track of respectively \code{workers}, \code{agents},
#' and potential errors when running a \code{Simulator} in parallel mode.
#' }
#' \item{\code{log_interval}}{
#' \code{integer}. Sets the log write interval. Default every 1000 time steps.
#' }
#' \item{\code{include_packages}}{
#' \code{character}. Names of packages that (one of) the policies depend on. If a \code{Policy} requires an
#' R package to be loaded, this option can be used to load that package on each of the workers.
#' Ignored if \code{do_parallel} is \code{FALSE}.
#' }
#' \item{\code{chunk_multiplier}}{
#' \code{integer}. By default, simulations are equally divided over available workers, and every
#' worker saves its simulation results to a local History object, after which all results are aggregated.
#' Depending on workload, network bandwidth, memory size and other variables, it can sometimes be useful to
#' break these workloads into smaller chunks. This can be done by setting chunk_multiplier to some
#' integer value, where the number of chunks will total chunk_multiplier x number_of_workers.
#' }
#' \item{\code{policy_time_loop}}{
#' \code{logical}. In the case of replay style bandits, a Simulator's horizon equals the number of
#' accepted plus the number of rejected data points or samples. If \code{policy_time_loop} is \code{TRUE},
#' the horizon equals the number of accepted data points or samples. That is, when \code{policy_time_loop}
#' is \code{TRUE}, a Simulator will keep running until the number of data points saved to History is
#' equal to the Simulator's horizon.
#' }
#'
#' }
#'
#' @section Methods:
#'
#' \describe{
#'
#' \item{\code{reset()}}{
#' Resets a \code{Simulator} instance to its original initialisation values.
#' }
#'
#' \item{\code{run()}}{
#' Runs a \code{Simulator} instance.
#' }
#' \item{\code{history}}{
#' Active binding, read access to Simulator's History instance.
#' }
#'
#' }
#'
#' @seealso
#'
#' Core contextual classes: \code{\link{Bandit}}, \code{\link{Policy}}, \code{\link{Simulator}},
#' \code{\link{Agent}}, \code{\link{History}}, \code{\link{Plot}}
#'
#' Bandit subclass examples: \code{\link{BasicBernoulliBandit}}, \code{\link{ContextualLogitBandit}}, \code{\link{OfflineReplayEvaluatorBandit}}
#'
#' Policy subclass examples: \code{\link{EpsilonGreedyPolicy}}, \code{\link{ContextualLinTSPolicy}}
#'
#' @examples
#' \dontrun{
#'
#' policy <- EpsilonGreedyPolicy$new(epsilon = 0.1)
#' bandit <- BasicBernoulliBandit$new(weights = c(0.6, 0.1, 0.1))
#' agent <- Agent$new(policy, bandit, name = "E.G.", sparse = 0.5)
#'
#' history <- Simulator$new(agents = agent,
#' horizon = 10,
#' simulations = 10)$run()
#'
#' summary(history)
#'
#' plot(history)
#'
#' dt <- history$get_data_table()
#'
#' df <- history$get_data_frame()
#'
#' print(history$cumulative$E.G.$cum_regret_sd)
#'
#' print(history$cumulative$E.G.$cum_regret)
#'
#' }
#'
NULL