Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: save ArchiveAsync to a data.table with ArchiveAsyncFrozen #275

Merged
merged 3 commits into from
Dec 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions DESCRIPTION
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ RoxygenNote: 7.3.2
Collate:
'Archive.R'
'ArchiveAsync.R'
'ArchiveAsyncFrozen.R'
'ArchiveBatch.R'
'CallbackAsync.R'
'CallbackBatch.R'
Expand Down
1 change: 1 addition & 0 deletions NAMESPACE
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ S3method(bb_optimize,"function")
S3method(bb_optimize,Objective)
export(Archive)
export(ArchiveAsync)
export(ArchiveAsyncFrozen)
export(ArchiveBatch)
export(CallbackAsync)
export(CallbackBatch)
Expand Down
187 changes: 187 additions & 0 deletions R/ArchiveAsyncFrozen.R
Original file line number Diff line number Diff line change
@@ -0,0 +1,187 @@

#' @title Rush Data Storage
#'
#' @description
#' Freezes the Redis data base of an [ArchiveAsync] to a `data.table::data.table()`.
#' No further points can be added to the archive but the data can be accessed and analyzed.
#' Useful when the Redis data base is not permanently available.
#'
#' @section S3 Methods:
#' * `as.data.table(archive)`\cr
#' [ArchiveAsync] -> [data.table::data.table()]\cr
#' Returns a tabular view of all performed function calls of the Objective.
#' The `x_domain` column is unnested to separate columns.
#'
#'
#' @export
ArchiveAsyncFrozen = R6Class("ArchiveAsyncFrozen",
inherit = ArchiveAsync,
cloneable = FALSE,
public = list(

#' @description
#' Creates a new instance of this [R6][R6::R6Class] class.
#'
#' @param archive ([ArchiveAsync])\cr
#' The archive to freeze.
initialize = function(archive) {
private$.frozen_data = copy(archive$data)
self$search_space = archive$search_space
self$codomain = archive$codomain
private$.label = "Frozen Data Storage"
private$.man = "bbotk::ArchiveAsyncFrozen"
},

#' @description
#' Push queued points to the archive.
#'
#' @param xss (list of named `list()`)\cr
#' List of named lists of point values.
push_points = function(xss) {
stop("Archive is frozen")
},

#' @description
#' Pop a point from the queue.
pop_point = function() {
stop("Archive is frozen")
},

#' @description
#' Push running point to the archive.
#'
#' @param xs (named `list`)\cr
#' Named list of point values.
#' @param extra (`list()`)\cr
#' Named list of additional information.
push_running_point = function(xs, extra = NULL) {
stop("Archive is frozen")
},

#' @description
#' Push result to the archive.
#'
#' @param key (`character()`)\cr
#' Key of the point.
#' @param ys (`list()`)\cr
#' Named list of results.
#' @param x_domain (`list()`)\cr
#' Named list of transformed point values.
#' @param extra (`list()`)\cr
#' Named list of additional information.
push_result = function(key, ys, x_domain, extra = NULL) {
stop("Archive is frozen")
},

#' @description
#' Push failed point to the archive.
#'
#' @param key (`character()`)\cr
#' Key of the point.
#' @param message (`character()`)\cr
#' Error message.
push_failed_point = function(key, message) {
stop("Archive is frozen")
},

#' @description
#' Fetch points with a specific state.
#'
#' @param fields (`character()`)\cr
#' Fields to fetch.
#' Defaults to `c("xs", "ys", "xs_extra", "worker_extra", "ys_extra")`.
#' @param states (`character()`)\cr
#' States of the tasks to be fetched.
#' Defaults to `c("queued", "running", "finished", "failed")`.
#' @param reset_cache (`logical(1)`)\cr
#' Whether to reset the cache of the finished points.
data_with_state = function(
fields = c("xs", "ys", "xs_extra", "worker_extra", "ys_extra", "condition"),
states = c("queued", "running", "finished", "failed"),
reset_cache = FALSE
) {
stop("Archive is frozen")
},

#' @description
#' Clear all evaluation results from archive.
clear = function() {
stop("Archive is frozen")
}
),

private = list(
.frozen_data = NULL
),

active = list(

#' @field data ([data.table::data.table])\cr
#' Data table with all finished points.
data = function(rhs) {
assert_ro_binding(rhs)
private$.frozen_data
},

#' @field queued_data ([data.table::data.table])\cr
#' Data table with all queued points.
queued_data = function() {
self$data["queued", , on = "state"]
},

#' @field running_data ([data.table::data.table])\cr
#' Data table with all running points.
running_data = function() {
self$data["running", , on = "state"]
},

#' @field finished_data ([data.table::data.table])\cr
#' Data table with all finished points.
finished_data = function() {
self$data["finished", , on = "state"]
},

#' @field failed_data ([data.table::data.table])\cr
#' Data table with all failed points.
failed_data = function() {
self$data["failed", , on = "state"]
},

#' @field n_queued (`integer(1)`)\cr
#' Number of queued points.
n_queued = function() {
nrow(self$queued_data)
},

#' @field n_running (`integer(1)`)\cr
#' Number of running points.
n_running = function() {
nrow(self$running_data)
},

#' @field n_finished (`integer(1)`)\cr
#' Number of finished points.
n_finished = function() {
nrow(self$finished_data)
},

#' @field n_failed (`integer(1)`)\cr
#' Number of failed points.
n_failed = function() {
nrow(self$failed_data)
},

#' @field n_evals (`integer(1)`)\cr
#' Number of evaluations stored in the archive.
n_evals = function() {
nrow(self$finished_data) + nrow(self$failed_data)
}
)
)

#' @export
as.data.table.ArchiveAsync = function(x, keep.rownames = FALSE, unnest = "x_domain", ...) { # nolint
data = x$data
cols = intersect(unnest, names(data))
unnest(data, cols, prefix = "{col}_")
}
24 changes: 24 additions & 0 deletions R/mlr_callbacks.R
Original file line number Diff line number Diff line change
Expand Up @@ -24,3 +24,27 @@ load_callback_backup = function() {
}
)
}

#' @title Freeze Archive Callback
#'
#' @include CallbackAsync.R
#' @name bbotk.async_freeze_archive
#'
#' @description
#' This [CallbackAsync] freezes the [ArchiveAsync] to [ArchiveAsyncFrozen] after the optimization has finished.
#'
#' @examples
#' clbk("bbotk.async_freeze_archive")
NULL

load_callback_freeze_archive = function() {
callback_async("bbotk.async_freeze_archive",
label = "Archive Freeze Callback",
man = "bbotk::bbotk.async_freeze_archive",
on_optimization_end = function(callback, context) {
context$instance$archive = ArchiveAsyncFrozen$new(context$instance$archive)
}
)
}


1 change: 1 addition & 0 deletions R/zzz.R
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
# callbacks
x = utils::getFromNamespace("mlr_callbacks", ns = "mlr3misc")
x$add("bbotk.backup", load_callback_backup)
x$add("bbotk.async_freeze_archive", load_callback_freeze_archive)

lg = lgr::get_logger("bbotk")
assign("lg", lg, envir = parent.env(environment()))
Expand Down
Loading
Loading