Skip to content

Commit

Permalink
Read/write in parallel (#91)
Browse files Browse the repository at this point in the history
  • Loading branch information
keller-mark committed Jun 21, 2024
1 parent 0825c03 commit efe90f7
Show file tree
Hide file tree
Showing 24 changed files with 604 additions and 47 deletions.
6 changes: 5 additions & 1 deletion DESCRIPTION
Original file line number Diff line number Diff line change
Expand Up @@ -38,4 +38,8 @@ Suggests:
rmarkdown,
crul,
Rarr,
vcr (>= 0.6.0)
vcr (>= 0.6.0),
pbapply,
parallel,
future,
bench
8 changes: 1 addition & 7 deletions NAMESPACE
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ export(is_key_error)
export(is_scalar)
export(is_slice)
export(obj_list)
export(pizzarr_option_defaults)
export(pizzarr_sample)
export(slice)
export(zarr_create)
Expand All @@ -39,10 +40,3 @@ export(zarr_open_array)
export(zarr_open_group)
export(zarr_save_array)
export(zb_slice)
importFrom(R6,R6Class)
importFrom(memoise,memoise)
importFrom(memoise,timeout)
importFrom(qs,lz4_compress_raw)
importFrom(qs,lz4_decompress_raw)
importFrom(qs,zstd_compress_raw)
importFrom(qs,zstd_decompress_raw)
10 changes: 4 additions & 6 deletions R/numcodecs.R
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ Codec <- R6::R6Class("Codec",
#' ZSTD compressor for Zarr
#' @title ZstdCodec Class
#' @docType class
#' @importFrom qs zstd_compress_raw zstd_decompress_raw
#' @description
#' Class representing a ZSTD compressor

Expand All @@ -62,7 +61,7 @@ ZstdCodec <- R6::R6Class("ZstdCodec",
#' @return Compressed data.
encode = function(buf, zarr_arr) {
# Reference: https://github.com/traversc/qs/blob/84e30f4/R/RcppExports.R#L16
result <- zstd_compress_raw(buf, self$level)
result <- qs::zstd_compress_raw(buf, self$level)
return(result)
},
#' @description
Expand All @@ -71,7 +70,7 @@ ZstdCodec <- R6::R6Class("ZstdCodec",
#' @param zarr_arr The ZarrArray instance.
#' @return Un-compressed data.
decode = function(buf, zarr_arr) {
result <- zstd_decompress_raw(buf)
result <- qs::zstd_decompress_raw(buf)
return(result)
},
#' @description
Expand All @@ -89,7 +88,6 @@ ZstdCodec <- R6::R6Class("ZstdCodec",
#' LZ4 compressor for Zarr
#' @title Lz4Codec Class
#' @docType class
#' @importFrom qs lz4_compress_raw lz4_decompress_raw
#' @description
#' Class representing a LZ4 compressor
#'
Expand All @@ -115,7 +113,7 @@ Lz4Codec <- R6::R6Class("Lz4Codec",
#' @return Compressed data.
encode = function(buf, zarr_arr) {
# Reference: https://github.com/traversc/qs/blob/84e30f4/R/RcppExports.R#L24
body <- lz4_compress_raw(buf, self$acceleration)
body <- qs::lz4_compress_raw(buf, self$acceleration)

# The compressed output includes a 4-byte header storing the original size
# of the decompressed data as a little-endian 32-bit integer.
Expand All @@ -135,7 +133,7 @@ Lz4Codec <- R6::R6Class("Lz4Codec",
decode = function(buf, zarr_arr) {
body <- buf[5:length(buf)]

result <- lz4_decompress_raw(body)
result <- qs::lz4_decompress_raw(body)
return(result)
},
#' @description
Expand Down
4 changes: 4 additions & 0 deletions R/onload.R
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
#' @keywords internal
.onLoad <- function(libname = NULL, pkgname = NULL) {
init_options()
}
61 changes: 61 additions & 0 deletions R/options.R
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
# Adapted from https://github.com/IRkernel/IRkernel/blob/master/R/options.r

#' pizzarr_option_defaults
#' @export
pizzarr_option_defaults <- list(
pizzarr.http_store_cache_time_seconds = 3600,
pizzarr.parallel_read_enabled = FALSE,
pizzarr.parallel_write_enabled = FALSE
)

#' @keywords internal
parse_parallel_option <- function(val) {
if(val == "future") {
return("future")
}
logical_val <- suppressWarnings(as.logical(val))
integer_val <- suppressWarnings(as.integer(val))

if(is.na(integer_val)) {
return(logical_val)
}
if(integer_val <= 1) {
return(as.logical(integer_val))
}
return(integer_val)
}

#' @keywords internal
from_env <- list(
PIZZARR_HTTP_STORE_CACHE_TIME_SECONDS = as.integer,
PIZZARR_PARALLEL_READ_ENABLED = parse_parallel_option,
PIZZARR_PARALLEL_WRITE_ENABLED = parse_parallel_option
)

# converts e.g. jupyter.log_level to JUPYTER_LOG_LEVEL
#' @keywords internal
opt_to_env <- function(nms) {
gsub('.', '_', toupper(nms), fixed = TRUE)
}

# called in .onLoad
#' @keywords internal
init_options <- function() {
for (opt_name in names(pizzarr_option_defaults)) {
# skip option if it is already set, e.g. in the Rprofile
if (is.null(getOption(opt_name))) {
# prepare `options` call from the default
call_arg <- pizzarr_option_defaults[opt_name] # single [] preserve names

# if an env var is set, get value from it.
env_name <- opt_to_env(opt_name)
convert <- from_env[[env_name]]
env_val <- Sys.getenv(env_name, unset = NA)
if (!is.null(convert) && !is.na(env_val)) {
call_arg[[opt_name]] <- convert(env_val)
}

do.call(options, call_arg)
}
}
}
65 changes: 49 additions & 16 deletions R/stores.R
Original file line number Diff line number Diff line change
Expand Up @@ -346,7 +346,6 @@ MemoryStore <- R6::R6Class("MemoryStore",
#' HttpStore for Zarr
#' @title HttpStore Class
#' @docType class
#' @importFrom memoise memoise timeout
#' @description
#' Store class that uses HTTP requests.
#' Read-only. Depends on the `crul` package.
Expand All @@ -363,16 +362,44 @@ HttpStore <- R6::R6Class("HttpStore",
headers = NULL,
client = NULL,
zmetadata = NULL,
mem_get = NULL,
cache_time_seconds = 3600,
make_request_memoized = NULL,
cache_enabled = NULL,
cache_time_seconds = NULL,
make_request = function(item) {
key <- item_to_key(item)

# mem_get caches in memory on a per-session basis.
res <- private$mem_get(private$client,
paste(private$base_path, key, sep="/"))

return(res)
path <- paste(private$base_path, key, sep="/")

parallel_option <- getOption("pizzarr.parallel_read_enabled")
parallel_option <- parse_parallel_option(parallel_option)
is_parallel <- is_truthy_parallel_option(parallel_option)

if(is_parallel) {
# For some reason, the crul::HttpClient fails in parallel settings
# This alternative
# with HttpRequest and AsyncVaried seems to work.
# Reference: https://docs.ropensci.org/crul/articles/async.html
req <- crul::HttpRequest$new(
url = private$domain,
opts = private$options,
headers = private$headers
)
req$get(path = path)
res <- crul::AsyncVaried$new(req)
res$request()
return(unclass(res$responses())[[1]])
} else {
return(private$client$get(path = path))
}
},
memoize_make_request = function() {
if(private$cache_enabled) {
private$make_request_memoized <- memoise::memoise(
function(key) private$make_request(key),
~memoise::timeout(private$cache_time_seconds)
)
} else {
private$make_request_memoized <- private$make_request
}
},
get_zmetadata = function() {
res <- private$make_request(".zmetadata")
Expand Down Expand Up @@ -405,16 +432,20 @@ HttpStore <- R6::R6Class("HttpStore",
private$domain <- paste(segments[1:3], collapse="/")
private$base_path <- paste(segments[4:length(segments)], collapse="/")

if(!requireNamespace("crul", quietly = TRUE)) stop("HttpStore requires the crul package")

if(!requireNamespace("crul", quietly = TRUE)) {
stop("HttpStore requires the crul package")
}

private$client <- crul::HttpClient$new(
url = private$domain,
opts = private$options,
headers = private$headers
)

private$mem_get <- memoise(function(client, path) client$get(path),
~timeout(private$cache_time_seconds))

private$cache_time_seconds <- getOption("pizzarr.http_store_cache_time_seconds")
private$cache_enabled <- private$cache_time_seconds > 0

private$memoize_make_request()

private$zmetadata <- private$get_zmetadata()
},
Expand All @@ -423,7 +454,7 @@ HttpStore <- R6::R6Class("HttpStore",
#' @param item The item key.
#' @return The item data in a vector of type raw.
get_item = function(item) {
res <- private$make_request(item)
res <- private$make_request_memoized(item)
return(res$content)
},
#' @description
Expand All @@ -438,7 +469,7 @@ HttpStore <- R6::R6Class("HttpStore",
} else if(!is.null(self$get_consolidated_metadata())) {
return(FALSE)
} else {
res <- private$make_request(item)
res <- private$make_request_memoized(item)

return(res$status_code == 200)
}
Expand Down Expand Up @@ -475,6 +506,8 @@ HttpStore <- R6::R6Class("HttpStore",
#' @param seconds number of seconds until cache is invalid -- 0 for no cache
set_cache_time_seconds = function(seconds) {
private$cache_time_seconds <- seconds
# We need to re-memoize.
private$memoize_make_request()
}
)
)
8 changes: 8 additions & 0 deletions R/utils.R
Original file line number Diff line number Diff line change
Expand Up @@ -349,6 +349,14 @@ item_to_key <- function(item) {
key
}

#' @keywords internal
is_truthy_parallel_option <- function(val) {
if(val == "future") {
return(TRUE)
}
return(as.logical(as.integer(val)))
}

try_from_zmeta <- function(key, store) {
store$get_consolidated_metadata()$metadata[[key]]
}
Expand Down
Loading

0 comments on commit efe90f7

Please sign in to comment.