Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Read/write in parallel #91

Merged
merged 11 commits into from
Jun 21, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion DESCRIPTION
Original file line number Diff line number Diff line change
Expand Up @@ -38,4 +38,7 @@ Suggests:
rmarkdown,
crul,
Rarr,
vcr (>= 0.6.0)
vcr (>= 0.6.0),
foreach,
doParallel,
bench
9 changes: 2 additions & 7 deletions NAMESPACE
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ export(is_key_error)
export(is_scalar)
export(is_slice)
export(obj_list)
export(pizzarr_option_defaults)
export(pizzarr_sample)
export(slice)
export(zarr_create)
Expand All @@ -39,10 +40,4 @@ export(zarr_open_array)
export(zarr_open_group)
export(zarr_save_array)
export(zb_slice)
importFrom(R6,R6Class)
importFrom(memoise,memoise)
importFrom(memoise,timeout)
importFrom(qs,lz4_compress_raw)
importFrom(qs,lz4_decompress_raw)
importFrom(qs,zstd_compress_raw)
importFrom(qs,zstd_decompress_raw)
importFrom(foreach,"%dopar%")
10 changes: 4 additions & 6 deletions R/numcodecs.R
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ Codec <- R6::R6Class("Codec",
#' ZSTD compressor for Zarr
#' @title ZstdCodec Class
#' @docType class
#' @importFrom qs zstd_compress_raw zstd_decompress_raw
#' @description
#' Class representing a ZSTD compressor

Expand All @@ -62,7 +61,7 @@ ZstdCodec <- R6::R6Class("ZstdCodec",
#' @return Compressed data.
encode = function(buf, zarr_arr) {
# Reference: https://github.com/traversc/qs/blob/84e30f4/R/RcppExports.R#L16
result <- zstd_compress_raw(buf, self$level)
result <- qs::zstd_compress_raw(buf, self$level)
return(result)
},
#' @description
Expand All @@ -71,7 +70,7 @@ ZstdCodec <- R6::R6Class("ZstdCodec",
#' @param zarr_arr The ZarrArray instance.
#' @return Un-compressed data.
decode = function(buf, zarr_arr) {
result <- zstd_decompress_raw(buf)
result <- qs::zstd_decompress_raw(buf)
return(result)
},
#' @description
Expand All @@ -89,7 +88,6 @@ ZstdCodec <- R6::R6Class("ZstdCodec",
#' LZ4 compressor for Zarr
#' @title Lz4Codec Class
#' @docType class
#' @importFrom qs lz4_compress_raw lz4_decompress_raw
#' @description
#' Class representing a LZ4 compressor
#'
Expand All @@ -115,7 +113,7 @@ Lz4Codec <- R6::R6Class("Lz4Codec",
#' @return Compressed data.
encode = function(buf, zarr_arr) {
# Reference: https://github.com/traversc/qs/blob/84e30f4/R/RcppExports.R#L24
body <- lz4_compress_raw(buf, self$acceleration)
body <- qs::lz4_compress_raw(buf, self$acceleration)

# The compressed output includes a 4-byte header storing the original size
# of the decompressed data as a little-endian 32-bit integer.
Expand All @@ -135,7 +133,7 @@ Lz4Codec <- R6::R6Class("Lz4Codec",
decode = function(buf, zarr_arr) {
body <- buf[5:length(buf)]

result <- lz4_decompress_raw(body)
result <- qs::lz4_decompress_raw(body)
return(result)
},
#' @description
Expand Down
4 changes: 4 additions & 0 deletions R/onload.R
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
#' Package load hook.
#'
#' Called automatically by R when the package namespace is loaded; its only
#' job is to populate pizzarr's global options via init_options().
#' @keywords internal
.onLoad <- function(libname = NULL, pkgname = NULL) {
init_options()
}
44 changes: 44 additions & 0 deletions R/options.R
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
# Adapted from https://github.com/IRkernel/IRkernel/blob/master/R/options.r

#' pizzarr_option_defaults
#'
#' Default values for pizzarr's global options. init_options() (run from
#' .onLoad) applies each entry unless the option is already set (e.g. in an
#' Rprofile) or overridden by the matching environment variable.
#' @export
pizzarr_option_defaults <- list(
# Seconds that HttpStore memoizes HTTP responses; 0 disables caching.
pizzarr.http_store_cache_time_seconds = 3600,
# Toggle foreach-%dopar%-based parallel chunk reads.
pizzarr.parallel_read_enabled = FALSE,
# Toggle foreach-%dopar%-based parallel chunk writes.
pizzarr.parallel_write_enabled = FALSE
)

# Maps each option's environment-variable name (see opt_to_env) to the
# converter used by init_options() to parse the env var's string value
# into the option's expected type.
#' @keywords internal
from_env <- list(
PIZZARR_HTTP_STORE_CACHE_TIME_SECONDS = as.integer,
PIZZARR_PARALLEL_READ_ENABLED = as.logical,
PIZZARR_PARALLEL_WRITE_ENABLED = as.logical
)

# Converts option names to their environment-variable form,
# e.g. "jupyter.log_level" becomes "JUPYTER_LOG_LEVEL".
#' @keywords internal
opt_to_env <- function(nms) {
  chartr(".", "_", toupper(nms))
}

# Called in .onLoad: fills in any pizzarr option that is not already set,
# preferring an environment-variable override to the packaged default.
#' @keywords internal
init_options <- function() {
  for (opt_name in names(pizzarr_option_defaults)) {
    # Respect options already configured elsewhere, e.g. in the Rprofile.
    if (!is.null(getOption(opt_name))) {
      next
    }

    # Single-bracket indexing keeps the name, giving a ready-made
    # named argument list for options().
    call_arg <- pizzarr_option_defaults[opt_name]

    # When the corresponding env var is set (and we know how to convert
    # it), it takes precedence over the default.
    env_name <- opt_to_env(opt_name)
    convert <- from_env[[env_name]]
    env_val <- Sys.getenv(env_name, unset = NA)
    if (!is.null(convert) && !is.na(env_val)) {
      call_arg[[opt_name]] <- convert(env_val)
    }

    do.call(options, call_arg)
  }
}
61 changes: 40 additions & 21 deletions R/stores.R
Original file line number Diff line number Diff line change
Expand Up @@ -346,7 +346,6 @@ MemoryStore <- R6::R6Class("MemoryStore",
#' HttpStore for Zarr
#' @title HttpStore Class
#' @docType class
#' @importFrom memoise memoise timeout
#' @description
#' Store class that uses HTTP requests.
#' Read-only. Depends on the `crul` package.
Expand All @@ -363,16 +362,36 @@ HttpStore <- R6::R6Class("HttpStore",
headers = NULL,
client = NULL,
zmetadata = NULL,
mem_get = NULL,
cache_time_seconds = 3600,
make_request_memoized = NULL,
cache_enabled = NULL,
cache_time_seconds = NULL,
make_request = function(item) {
key <- item_to_key(item)

# mem_get caches in memory on a per-session basis.
res <- private$mem_get(private$client,
paste(private$base_path, key, sep="/"))

return(res)

# For some reason, the crul::HttpClient fails in parallel settings
# (when used inside foreach %dopar% loops). This alternative
# with HttpRequest and AsyncVaried seems to work.
# Reference: https://docs.ropensci.org/crul/articles/async.html
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there maybe an argument to switch to a more traditional http client here? Seems like there's a lot of complexity coming together.

Maybe another option is to have one simple pathway for metadata gets and another non-caching parallelized get utility for iterating over chunks?

req <- crul::HttpRequest$new(
url = private$domain,
opts = private$options,
headers = private$headers
)
req$get(path = paste(private$base_path, key, sep="/"))
res <- crul::AsyncVaried$new(req)
res$request()

return(unclass(res$responses())[[1]])
},
memoize_make_request = function() {
  # (Re)builds make_request_memoized from the current cache settings:
  # a plain pass-through when caching is off, otherwise a memoise wrapper
  # whose entries expire after cache_time_seconds.
  if(!private$cache_enabled) {
    private$make_request_memoized <- private$make_request
  } else {
    private$make_request_memoized <- memoise::memoise(
      function(key) private$make_request(key),
      ~memoise::timeout(private$cache_time_seconds)
    )
  }
},
get_zmetadata = function() {
res <- private$make_request(".zmetadata")
Expand Down Expand Up @@ -405,16 +424,14 @@ HttpStore <- R6::R6Class("HttpStore",
private$domain <- paste(segments[1:3], collapse="/")
private$base_path <- paste(segments[4:length(segments)], collapse="/")

if(!requireNamespace("crul", quietly = TRUE)) stop("HttpStore requires the crul package")

private$client <- crul::HttpClient$new(
url = private$domain,
opts = private$options,
headers = private$headers
)

private$mem_get <- memoise(function(client, path) client$get(path),
~timeout(private$cache_time_seconds))
if(!requireNamespace("crul", quietly = TRUE)) {
stop("HttpStore requires the crul package")
}

private$cache_time_seconds <- getOption("pizzarr.http_store_cache_time_seconds")
private$cache_enabled <- private$cache_time_seconds > 0

private$memoize_make_request()

private$zmetadata <- private$get_zmetadata()
},
Expand All @@ -423,7 +440,7 @@ HttpStore <- R6::R6Class("HttpStore",
#' @param item The item key.
#' @return The item data in a vector of type raw.
get_item = function(item) {
res <- private$make_request(item)
res <- private$make_request_memoized(item)
return(res$content)
},
#' @description
Expand All @@ -438,7 +455,7 @@ HttpStore <- R6::R6Class("HttpStore",
} else if(!is.null(self$get_consolidated_metadata())) {
return(FALSE)
} else {
res <- private$make_request(item)
res <- private$make_request_memoized(item)

return(res$status_code == 200)
}
Expand Down Expand Up @@ -475,6 +492,8 @@ HttpStore <- R6::R6Class("HttpStore",
#' @param seconds number of seconds until cache is invalid -- 0 for no cache
set_cache_time_seconds = function(seconds) {
  private$cache_time_seconds <- seconds
  # Keep the enabled flag in sync with the new value (0 disables caching).
  # Previously only initialize() computed cache_enabled, so toggling the
  # cache through this setter had no effect on whether requests were memoized.
  private$cache_enabled <- seconds > 0
  # Re-memoize so the new timeout / enabled state takes effect.
  private$memoize_make_request()
}
)
)
106 changes: 97 additions & 9 deletions R/zarr-array.R
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
#' The Zarr Array class.
#' @title ZarrArray Class
#' @docType class
#' @importFrom R6 R6Class
#' @importFrom foreach %dopar%
#' @description
#' Instantiate an array from an initialized store.
#' @param selection Selections are lists containing either scalars, strings, or Slice objects. Two character
Expand Down Expand Up @@ -324,9 +324,23 @@ ZarrArray <- R6::R6Class("ZarrArray",
return(out)
}

# TODO: use queue to handle async iterator
for(proj in indexer$iter()) {
private$chunk_getitem(proj$chunk_coords, proj$chunk_sel, out, proj$out_sel, drop_axes = indexer$drop_axes)
if(getOption("pizzarr.parallel_read_enabled")) {
if(!requireNamespace("foreach", quietly = TRUE)) {
stop("Parallel reading requires the 'foreach' package.")
}
parts <- indexer$iter()
part1_results <- foreach::foreach(proj=parts) %dopar% {
private$chunk_getitem_part1(proj$chunk_coords, proj$chunk_sel, out, proj$out_sel, drop_axes = indexer$drop_axes)
}
for(i in seq_along(parts)) {
proj <- parts[[i]]
part1_result <- part1_results[[i]]
private$chunk_getitem_part2(part1_result, proj$chunk_coords, proj$chunk_sel, out, proj$out_sel, drop_axes = indexer$drop_axes)
}
} else {
for(proj in indexer$iter()) {
private$chunk_getitem(proj$chunk_coords, proj$chunk_sel, out, proj$out_sel, drop_axes = indexer$drop_axes)
}
}

# Return scalar instead of zero-dimensional array.
Expand Down Expand Up @@ -441,10 +455,20 @@ ZarrArray <- R6::R6Class("ZarrArray",
stop("Unknown data type for setting :(")
}

# TODO: use queue to handle async iterator
for (proj in indexer$iter()) {
chunk_value <- private$get_chunk_value(proj, indexer, value, selection_shape)
private$chunk_setitem(proj$chunk_coords, proj$chunk_sel, chunk_value)
if(getOption("pizzarr.parallel_write_enabled")) {
if(!requireNamespace("foreach", quietly=TRUE)) {
stop("Parallel writing requires the 'foreach' package.")
}
foreach::foreach(proj=indexer$iter(), .combine = c, .inorder = FALSE, .init = NULL) %dopar% {
chunk_value <- private$get_chunk_value(proj, indexer, value, selection_shape)
private$chunk_setitem(proj$chunk_coords, proj$chunk_sel, chunk_value)
NULL # return null since we are not using the combined result
}
} else {
for (proj in indexer$iter()) {
chunk_value <- private$get_chunk_value(proj, indexer, value, selection_shape)
private$chunk_setitem(proj$chunk_coords, proj$chunk_sel, chunk_value)
}
}
}
},
Expand Down Expand Up @@ -480,7 +504,71 @@ ZarrArray <- R6::R6Class("ZarrArray",
# TODO
},
#' @description
#' TODO
#' For parallel usage
chunk_getitem_part1 = function(chunk_coords, chunk_selection, out, out_selection, drop_axes = NA, fields = NA) {
  # First half of a parallel chunk read: fetches and decodes one chunk so
  # the (store I/O + decompression) work can run inside a %dopar% worker.
  # Returns list(status = "success", value = NestedArray) on success, or
  # list(status = "error", value = condition) for chunk_getitem_part2 to
  # handle (e.g. fill-value substitution on key errors).
  if(length(chunk_coords) != length(private$chunks)) {
    # NOTE: the old message contained un-interpolated JavaScript template
    # syntax ported from zarr.js; report the actual lengths instead.
    stop(sprintf(
      "Inconsistent shapes: chunk_coords length: %d, chunks length: %d",
      length(chunk_coords), length(private$chunks)
    ))
  }
  c_key <- private$chunk_key(chunk_coords)

  result <- tryCatch({
    c_data <- self$get_chunk_store()$get_item(c_key)
    decoded_chunk <- private$decode_chunk(c_data)
    chunk_nested_arr <- NestedArray$new(decoded_chunk, shape=private$chunks, dtype=private$dtype, order = private$order)
    list(
      status = "success",
      value = chunk_nested_arr
    )
  }, error = function(cond) {
    list(status = "error", value = cond)
  })
  return(result)
},
#' @description
#' For parallel usage
chunk_getitem_part2 = function(part1_result, chunk_coords, chunk_selection, out, out_selection, drop_axes = NA, fields = NA) {
  # Second half of a parallel chunk read: takes the decoded chunk (or the
  # captured error) from chunk_getitem_part1 and copies the selected region
  # into `out`. This part mutates `out`, so it runs serially on the main
  # process after the %dopar% workers finish.
  if(part1_result$status == "success") {
    chunk_nested_arr <- part1_result$value

    if(inherits(out, "NestedArray")) {
      # Fast path: a total, contiguous selection with no filters can be
      # copied into the output wholesale without sub-selection.
      if(is_contiguous_selection(out_selection) && is_total_slice(chunk_selection, private$chunks) && is.null(private$filters)) {
        out$set(out_selection, chunk_nested_arr)
        return(TRUE)
      }

      # General path: pull just the selected region out of the chunk.
      tmp <- chunk_nested_arr$get(chunk_selection)

      if(!is_na(drop_axes)) {
        stop("Drop axes is not supported yet")
      }
      out$set(out_selection, tmp)
    } else {
      # RawArray
      # Copies chunk by index directly into output. Doesn't matter if selection is contiguous
      # since store/output are different shapes/strides.
      #out$set(out_selection, private$chunk_buffer_to_raw_array(decoded_chunk), chunk_selection)
      stop("TODO: support out for chunk_getitem")
    }
  } else {
    # Error captured in part1 - mirrors the tryCatch handler of the
    # non-parallel chunk_getitem.
    cond <- part1_result$value
    if(is_key_error(cond)) {
      # Missing chunk key in the store: fill the region with the scalar
      # fill value, when one is defined.
      if(!is_na(private$fill_value)) {
        out$set(out_selection, as_scalar(private$fill_value))
      }
    } else {
      # Re-signal the original condition instead of masking it with a
      # generic error (the old code printed the message and raised a new
      # error, losing the condition's class and call).
      stop(cond)
    }
  }
},
#' @description
#' For non-parallel usage
chunk_getitem = function(chunk_coords, chunk_selection, out, out_selection, drop_axes = NA, fields = NA) {
# TODO
# Reference: https://github.com/gzuidhof/zarr.js/blob/15e3a3f00eb19f0133018fb65f002311ea53bb7c/src/core/index.ts#L380
Expand Down
8 changes: 7 additions & 1 deletion man/ZarrArray.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading
Loading