diff --git a/DESCRIPTION b/DESCRIPTION index 498e484..3646faa 100644 --- a/DESCRIPTION +++ b/DESCRIPTION @@ -38,4 +38,8 @@ Suggests: rmarkdown, crul, Rarr, - vcr (>= 0.6.0) + vcr (>= 0.6.0), + pbapply, + parallel, + future, + bench diff --git a/NAMESPACE b/NAMESPACE index b2ba27d..2e33938 100644 --- a/NAMESPACE +++ b/NAMESPACE @@ -27,6 +27,7 @@ export(is_key_error) export(is_scalar) export(is_slice) export(obj_list) +export(pizzarr_option_defaults) export(pizzarr_sample) export(slice) export(zarr_create) @@ -39,10 +40,3 @@ export(zarr_open_array) export(zarr_open_group) export(zarr_save_array) export(zb_slice) -importFrom(R6,R6Class) -importFrom(memoise,memoise) -importFrom(memoise,timeout) -importFrom(qs,lz4_compress_raw) -importFrom(qs,lz4_decompress_raw) -importFrom(qs,zstd_compress_raw) -importFrom(qs,zstd_decompress_raw) diff --git a/R/numcodecs.R b/R/numcodecs.R index d39e6b6..99e1845 100644 --- a/R/numcodecs.R +++ b/R/numcodecs.R @@ -36,7 +36,6 @@ Codec <- R6::R6Class("Codec", #' ZSTD compressor for Zarr #' @title ZstdCodec Class #' @docType class -#' @importFrom qs zstd_compress_raw zstd_decompress_raw #' @description #' Class representing a ZSTD compressor @@ -62,7 +61,7 @@ ZstdCodec <- R6::R6Class("ZstdCodec", #' @return Compressed data. encode = function(buf, zarr_arr) { # Reference: https://github.com/traversc/qs/blob/84e30f4/R/RcppExports.R#L16 - result <- zstd_compress_raw(buf, self$level) + result <- qs::zstd_compress_raw(buf, self$level) return(result) }, #' @description @@ -71,7 +70,7 @@ ZstdCodec <- R6::R6Class("ZstdCodec", #' @param zarr_arr The ZarrArray instance. #' @return Un-compressed data. 
decode = function(buf, zarr_arr) { - result <- zstd_decompress_raw(buf) + result <- qs::zstd_decompress_raw(buf) return(result) }, #' @description @@ -89,7 +88,6 @@ ZstdCodec <- R6::R6Class("ZstdCodec", #' LZ4 compressor for Zarr #' @title Lz4Codec Class #' @docType class -#' @importFrom qs lz4_compress_raw lz4_decompress_raw #' @description #' Class representing a LZ4 compressor #' @@ -115,7 +113,7 @@ Lz4Codec <- R6::R6Class("Lz4Codec", #' @return Compressed data. encode = function(buf, zarr_arr) { # Reference: https://github.com/traversc/qs/blob/84e30f4/R/RcppExports.R#L24 - body <- lz4_compress_raw(buf, self$acceleration) + body <- qs::lz4_compress_raw(buf, self$acceleration) # The compressed output includes a 4-byte header storing the original size # of the decompressed data as a little-endian 32-bit integer. @@ -135,7 +133,7 @@ Lz4Codec <- R6::R6Class("Lz4Codec", decode = function(buf, zarr_arr) { body <- buf[5:length(buf)] - result <- lz4_decompress_raw(body) + result <- qs::lz4_decompress_raw(body) return(result) }, #' @description diff --git a/R/onload.R b/R/onload.R new file mode 100644 index 0000000..efb5777 --- /dev/null +++ b/R/onload.R @@ -0,0 +1,4 @@ +#' @keywords internal +.onLoad <- function(libname = NULL, pkgname = NULL) { + init_options() +} diff --git a/R/options.R b/R/options.R new file mode 100644 index 0000000..a3043af --- /dev/null +++ b/R/options.R @@ -0,0 +1,61 @@ +# Adapted from https://github.com/IRkernel/IRkernel/blob/master/R/options.r + +#' pizzarr_option_defaults +#' @export +pizzarr_option_defaults <- list( + pizzarr.http_store_cache_time_seconds = 3600, + pizzarr.parallel_read_enabled = FALSE, + pizzarr.parallel_write_enabled = FALSE +) + +#' @keywords internal +parse_parallel_option <- function(val) { + if(val == "future") { + return("future") + } + logical_val <- suppressWarnings(as.logical(val)) + integer_val <- suppressWarnings(as.integer(val)) + + if(is.na(integer_val)) { + return(logical_val) + } + if(integer_val <= 1) { + 
return(as.logical(integer_val)) + } + return(integer_val) +} + +#' @keywords internal +from_env <- list( + PIZZARR_HTTP_STORE_CACHE_TIME_SECONDS = as.integer, + PIZZARR_PARALLEL_READ_ENABLED = parse_parallel_option, + PIZZARR_PARALLEL_WRITE_ENABLED = parse_parallel_option +) + +# converts e.g. jupyter.log_level to JUPYTER_LOG_LEVEL +#' @keywords internal +opt_to_env <- function(nms) { + gsub('.', '_', toupper(nms), fixed = TRUE) +} + +# called in .onLoad +#' @keywords internal +init_options <- function() { + for (opt_name in names(pizzarr_option_defaults)) { + # skip option if it is already set, e.g. in the Rprofile + if (is.null(getOption(opt_name))) { + # prepare `options` call from the default + call_arg <- pizzarr_option_defaults[opt_name] # single [] preserve names + + # if an env var is set, get value from it. + env_name <- opt_to_env(opt_name) + convert <- from_env[[env_name]] + env_val <- Sys.getenv(env_name, unset = NA) + if (!is.null(convert) && !is.na(env_val)) { + call_arg[[opt_name]] <- convert(env_val) + } + + do.call(options, call_arg) + } + } +} \ No newline at end of file diff --git a/R/stores.R b/R/stores.R index bebdfb0..96af264 100644 --- a/R/stores.R +++ b/R/stores.R @@ -346,7 +346,6 @@ MemoryStore <- R6::R6Class("MemoryStore", #' HttpStore for Zarr #' @title HttpStore Class #' @docType class -#' @importFrom memoise memoise timeout #' @description #' Store class that uses HTTP requests. #' Read-only. Depends on the `crul` package. @@ -363,16 +362,44 @@ HttpStore <- R6::R6Class("HttpStore", headers = NULL, client = NULL, zmetadata = NULL, - mem_get = NULL, - cache_time_seconds = 3600, + make_request_memoized = NULL, + cache_enabled = NULL, + cache_time_seconds = NULL, make_request = function(item) { key <- item_to_key(item) - - # mem_get caches in memory on a per-session basis. 
- res <- private$mem_get(private$client, - paste(private$base_path, key, sep="/")) - - return(res) + path <- paste(private$base_path, key, sep="/") + + parallel_option <- getOption("pizzarr.parallel_read_enabled") + parallel_option <- parse_parallel_option(parallel_option) + is_parallel <- is_truthy_parallel_option(parallel_option) + + if(is_parallel) { + # For some reason, the crul::HttpClient fails in parallel settings + # This alternative + # with HttpRequest and AsyncVaried seems to work. + # Reference: https://docs.ropensci.org/crul/articles/async.html + req <- crul::HttpRequest$new( + url = private$domain, + opts = private$options, + headers = private$headers + ) + req$get(path = path) + res <- crul::AsyncVaried$new(req) + res$request() + return(unclass(res$responses())[[1]]) + } else { + return(private$client$get(path = path)) + } + }, + memoize_make_request = function() { + if(private$cache_enabled) { + private$make_request_memoized <- memoise::memoise( + function(key) private$make_request(key), + ~memoise::timeout(private$cache_time_seconds) + ) + } else { + private$make_request_memoized <- private$make_request + } }, get_zmetadata = function() { res <- private$make_request(".zmetadata") @@ -405,16 +432,20 @@ HttpStore <- R6::R6Class("HttpStore", private$domain <- paste(segments[1:3], collapse="/") private$base_path <- paste(segments[4:length(segments)], collapse="/") - if(!requireNamespace("crul", quietly = TRUE)) stop("HttpStore requires the crul package") - + if(!requireNamespace("crul", quietly = TRUE)) { + stop("HttpStore requires the crul package") + } + private$client <- crul::HttpClient$new( url = private$domain, opts = private$options, headers = private$headers ) - - private$mem_get <- memoise(function(client, path) client$get(path), - ~timeout(private$cache_time_seconds)) + + private$cache_time_seconds <- getOption("pizzarr.http_store_cache_time_seconds") + private$cache_enabled <- private$cache_time_seconds > 0 + + private$memoize_make_request() 
private$zmetadata <- private$get_zmetadata() }, @@ -423,7 +454,7 @@ HttpStore <- R6::R6Class("HttpStore", #' @param item The item key. #' @return The item data in a vector of type raw. get_item = function(item) { - res <- private$make_request(item) + res <- private$make_request_memoized(item) return(res$content) }, #' @description @@ -438,7 +469,7 @@ HttpStore <- R6::R6Class("HttpStore", } else if(!is.null(self$get_consolidated_metadata())) { return(FALSE) } else { - res <- private$make_request(item) + res <- private$make_request_memoized(item) return(res$status_code == 200) } @@ -475,6 +506,8 @@ HttpStore <- R6::R6Class("HttpStore", #' @param seconds number of seconds until cache is invalid -- 0 for no cache set_cache_time_seconds = function(seconds) { private$cache_time_seconds <- seconds + # We need to re-memoize. + private$memoize_make_request() } ) ) diff --git a/R/utils.R b/R/utils.R index f5fd7d6..75c55a8 100644 --- a/R/utils.R +++ b/R/utils.R @@ -349,6 +349,14 @@ item_to_key <- function(item) { key } +#' @keywords internal +is_truthy_parallel_option <- function(val) { + if(val == "future") { + return(TRUE) + } + return(as.logical(as.integer(val))) +} + try_from_zmeta <- function(key, store) { store$get_consolidated_metadata()$metadata[[key]] } diff --git a/R/zarr-array.R b/R/zarr-array.R index 673bd7f..6879488 100644 --- a/R/zarr-array.R +++ b/R/zarr-array.R @@ -3,7 +3,6 @@ #' The Zarr Array class. #' @title ZarrArray Class #' @docType class -#' @importFrom R6 R6Class #' @description #' Instantiate an array from an initialized store. #' @param selection Selections are lists containing either scalars, strings, or Slice objects. 
Two character @@ -324,9 +323,27 @@ ZarrArray <- R6::R6Class("ZarrArray", return(out) } - # TODO: use queue to handle async iterator - for(proj in indexer$iter()) { - private$chunk_getitem(proj$chunk_coords, proj$chunk_sel, out, proj$out_sel, drop_axes = indexer$drop_axes) + parallel_option <- getOption("pizzarr.parallel_read_enabled") + cl <- parse_parallel_option(parallel_option) + is_parallel <- is_truthy_parallel_option(cl) + + apply_func <- lapply + if(is_parallel) { + if(!requireNamespace("pbapply", quietly = TRUE)) { + stop("Parallel reading requires the 'pbapply' package.") + } + apply_func <- pbapply::pblapply + } + + parts <- indexer$iter() + part1_results <- apply_func(parts, function(proj, cl = NA) { + private$chunk_getitem_part1(proj$chunk_coords, proj$chunk_sel, out, proj$out_sel, drop_axes = indexer$drop_axes) + }, cl = cl) + + for(i in seq_along(parts)) { + proj <- parts[[i]] + part1_result <- part1_results[[i]] + private$chunk_getitem_part2(part1_result, proj$chunk_coords, proj$chunk_sel, out, proj$out_sel, drop_axes = indexer$drop_axes) } # Return scalar instead of zero-dimensional array. 
@@ -441,11 +458,26 @@ ZarrArray <- R6::R6Class("ZarrArray", stop("Unknown data type for setting :(") } - # TODO: use queue to handle async iterator - for (proj in indexer$iter()) { + parallel_option <- getOption("pizzarr.parallel_write_enabled") + cl <- parse_parallel_option(parallel_option) + is_parallel <- is_truthy_parallel_option(cl) + + apply_func <- lapply + if(is_parallel) { + if(!requireNamespace("pbapply", quietly=TRUE)) { + stop("Parallel writing requires the 'pbapply' package.") + } + apply_func <- pbapply::pblapply + } + + parts <- indexer$iter() + apply_func(parts, function(proj, cl = NA) { chunk_value <- private$get_chunk_value(proj, indexer, value, selection_shape) private$chunk_setitem(proj$chunk_coords, proj$chunk_sel, chunk_value) - } + NULL + }, cl = cl) + + return() } }, #' @description @@ -480,7 +512,71 @@ ZarrArray <- R6::R6Class("ZarrArray", # TODO }, #' @description - #' TODO + #' For parallel usage + chunk_getitem_part1 = function(chunk_coords, chunk_selection, out, out_selection, drop_axes = NA, fields = NA) { + if(length(chunk_coords) != length(private$chunks)) { + stop(sprintf("Inconsistent shapes: chunk_coords length: %d, chunks length: %d", length(chunk_coords), length(private$chunks))) + } + c_key <- private$chunk_key(chunk_coords) + + result <- tryCatch({ + c_data <- self$get_chunk_store()$get_item(c_key) + decoded_chunk <- private$decode_chunk(c_data) + chunk_nested_arr <- NestedArray$new(decoded_chunk, shape=private$chunks, dtype=private$dtype, order = private$order) + return(list( + status = "success", + value = chunk_nested_arr + )) + }, error = function(cond) { + return(list(status = "error", value = cond)) + }) + return(result) + }, + #' @description + #' For parallel usage + chunk_getitem_part2 = function(part1_result, chunk_coords, chunk_selection, out, out_selection, drop_axes = NA, fields = NA) { + c_key <- private$chunk_key(chunk_coords) + + if(part1_result$status == "success") { + chunk_nested_arr <- part1_result$value + + 
if("NestedArray" %in% class(out)) { + if(is_contiguous_selection(out_selection) && is_total_slice(chunk_selection, private$chunks) && is.null(private$filters)) { + out$set(out_selection, chunk_nested_arr) + return(TRUE) + } + + # Decode chunk + chunk <- chunk_nested_arr + tmp <- chunk$get(chunk_selection) + + if(!is_na(drop_axes)) { + stop("Drop axes is not supported yet") + } + out$set(out_selection, tmp) + } else { + # RawArray + # Copies chunk by index directly into output. Doesn't matter if selection is contiguous + # since store/output are different shapes/strides. + #out$set(out_selection, private$chunk_buffer_to_raw_array(decoded_chunk), chunk_selection) + stop("TODO: support out for chunk_getitem") + } + } else { + # There was an error - this corresponds to the Catch statement in the non-parallel version. + cond <- part1_result$value + if(is_key_error(cond)) { + # fill with scalar if cKey doesn't exist in store + if(!is_na(private$fill_value)) { + out$set(out_selection, as_scalar(private$fill_value)) + } + } else { + print(cond$message) + stop("Different type of error - rethrow") + } + } + }, + #' @description + #' For non-parallel usage chunk_getitem = function(chunk_coords, chunk_selection, out, out_selection, drop_axes = NA, fields = NA) { # TODO # Reference: https://github.com/gzuidhof/zarr.js/blob/15e3a3f00eb19f0133018fb65f002311ea53bb7c/src/core/index.ts#L380 diff --git a/man/ZarrArray.Rd b/man/ZarrArray.Rd index df4d60f..9b69386 100644 --- a/man/ZarrArray.Rd +++ b/man/ZarrArray.Rd @@ -221,7 +221,13 @@ TODO TODO -TODO +For parallel usage + + +For parallel usage + + +For non-parallel usage TODO diff --git a/man/as_scalar.Rd b/man/as_scalar.Rd index 1708c1d..3f4a75d 100644 --- a/man/as_scalar.Rd +++ b/man/as_scalar.Rd @@ -2,7 +2,8 @@ % Please edit documentation in R/atomic.R \name{as_scalar} \alias{as_scalar} -\title{Convert a value to a scalar to opt-out of R default vector casting behavior.} +\title{Convert a value to a scalar to opt-out of R default 
vector casting behavior. +This uses the \code{jsonlite::unbox} function to "tag" the value as a scalar.} \usage{ as_scalar(obj) } @@ -14,4 +15,5 @@ The value wrapped as a scalar. } \description{ Convert a value to a scalar to opt-out of R default vector casting behavior. +This uses the \code{jsonlite::unbox} function to "tag" the value as a scalar. } diff --git a/man/ensure_integer_vec.Rd b/man/ensure_integer_vec.Rd new file mode 100644 index 0000000..752e1c8 --- /dev/null +++ b/man/ensure_integer_vec.Rd @@ -0,0 +1,14 @@ +% Generated by roxygen2: do not edit by hand +% Please edit documentation in R/atomic.R +\name{ensure_integer_vec} +\alias{ensure_integer_vec} +\title{Ensure that scalars and lists of integers are converted to +an R vector of integers.} +\usage{ +ensure_integer_vec(selection) +} +\description{ +Ensure that scalars and lists of integers are converted to +an R vector of integers. +} +\keyword{internal} diff --git a/man/ensure_list.Rd b/man/ensure_list.Rd new file mode 100644 index 0000000..ef1875d --- /dev/null +++ b/man/ensure_list.Rd @@ -0,0 +1,14 @@ +% Generated by roxygen2: do not edit by hand +% Please edit documentation in R/atomic.R +\name{ensure_list} +\alias{ensure_list} +\title{Ensure that scalars, single slices, and R integer vectors are converted +to a list containing either R integer vectors or slice instances as values.} +\usage{ +ensure_list(selection) +} +\description{ +Ensure that scalars, single slices, and R integer vectors are converted +to a list containing either R integer vectors or slice instances as values. 
+} +\keyword{internal} diff --git a/man/is_integer.Rd b/man/is_integer.Rd new file mode 100644 index 0000000..94d726c --- /dev/null +++ b/man/is_integer.Rd @@ -0,0 +1,12 @@ +% Generated by roxygen2: do not edit by hand +% Please edit documentation in R/atomic.R +\name{is_integer} +\alias{is_integer} +\title{Check if a value is an integer R vector or scalar.} +\usage{ +is_integer(s) +} +\description{ +Check if a value is an integer R vector or scalar. +} +\keyword{internal} diff --git a/man/is_integer_list.Rd b/man/is_integer_list.Rd new file mode 100644 index 0000000..54be216 --- /dev/null +++ b/man/is_integer_list.Rd @@ -0,0 +1,12 @@ +% Generated by roxygen2: do not edit by hand +% Please edit documentation in R/atomic.R +\name{is_integer_list} +\alias{is_integer_list} +\title{Check that a value is a list of one or more integers.} +\usage{ +is_integer_list(s) +} +\description{ +Check that a value is a list of one or more integers. +} +\keyword{internal} diff --git a/man/is_integer_scalar.Rd b/man/is_integer_scalar.Rd new file mode 100644 index 0000000..e3d5ef7 --- /dev/null +++ b/man/is_integer_scalar.Rd @@ -0,0 +1,12 @@ +% Generated by roxygen2: do not edit by hand +% Please edit documentation in R/atomic.R +\name{is_integer_scalar} +\alias{is_integer_scalar} +\title{Check if a value is both a scalar (tagged by as_scalar) and an integer.} +\usage{ +is_integer_scalar(s) +} +\description{ +Check if a value is both a scalar (tagged by as_scalar) and an integer. 
+} +\keyword{internal} diff --git a/man/is_integer_vec.Rd b/man/is_integer_vec.Rd new file mode 100644 index 0000000..65ae76c --- /dev/null +++ b/man/is_integer_vec.Rd @@ -0,0 +1,14 @@ +% Generated by roxygen2: do not edit by hand +% Please edit documentation in R/atomic.R +\name{is_integer_vec} +\alias{is_integer_vec} +\title{Check that a value is a vector of one or more integers and has not been +explicitly tagged as a scalar.} +\usage{ +is_integer_vec(s) +} +\description{ +Check that a value is a vector of one or more integers and has not been +explicitly tagged as a scalar. +} +\keyword{internal} diff --git a/man/is_scalar.Rd b/man/is_scalar.Rd index 49d8dab..9e1ae67 100644 --- a/man/is_scalar.Rd +++ b/man/is_scalar.Rd @@ -2,7 +2,7 @@ % Please edit documentation in R/atomic.R \name{is_scalar} \alias{is_scalar} -\title{Check if a value is a scalar.} +\title{Check if a value is a scalar (i.e., a one-element vector that was converted with as_scalar).} \usage{ is_scalar(s) } @@ -13,5 +13,5 @@ is_scalar(s) TRUE if the value is a scalar, FALSE otherwise. } \description{ -Check if a value is a scalar. +Check if a value is a scalar (i.e., a one-element vector that was converted with as_scalar). } diff --git a/man/pizzarr_option_defaults.Rd b/man/pizzarr_option_defaults.Rd new file mode 100644 index 0000000..2bca041 --- /dev/null +++ b/man/pizzarr_option_defaults.Rd @@ -0,0 +1,16 @@ +% Generated by roxygen2: do not edit by hand +% Please edit documentation in R/options.R +\docType{data} +\name{pizzarr_option_defaults} +\alias{pizzarr_option_defaults} +\title{pizzarr_option_defaults} +\format{ +An object of class \code{list} of length 3. 
+} +\usage{ +pizzarr_option_defaults +} +\description{ +pizzarr_option_defaults +} +\keyword{datasets} diff --git a/man/zb_slice.Rd b/man/zb_slice.Rd index f32517c..f4c0203 100644 --- a/man/zb_slice.Rd +++ b/man/zb_slice.Rd @@ -2,7 +2,7 @@ % Please edit documentation in R/slicing.R \name{zb_slice} \alias{zb_slice} -\title{Convenience function for the internal Sliceclass constructor +\title{Convenience function for the internal Slice class constructor with zero-based indexing and exclusive stop index.} \usage{ zb_slice(start, stop = NA, step = NA) @@ -15,6 +15,6 @@ zb_slice(start, stop = NA, step = NA) \item{step}{The step size.} } \description{ -Convenience function for the internal Sliceclass constructor +Convenience function for the internal Slice class constructor with zero-based indexing and exclusive stop index. } diff --git a/pkgdown/_pkgdown.yml b/pkgdown/_pkgdown.yml index 79db12e..1bd8129 100644 --- a/pkgdown/_pkgdown.yml +++ b/pkgdown/_pkgdown.yml @@ -77,6 +77,7 @@ reference: - is_scalar - is_key_error - pizzarr_sample + - pizzarr_option_defaults - Dtype @@ -88,6 +89,7 @@ articles: - ome-ngff - remote-ome-ngff - remote-anndata + - parallel - title: Articles navbar: Troubleshooting contents: diff --git a/tests/testthat/setup-utils.R b/tests/testthat/setup-utils.R index 26eda5a..c7e3dbd 100644 --- a/tests/testthat/setup-utils.R +++ b/tests/testthat/setup-utils.R @@ -1,8 +1,9 @@ setup({ - + pbapply::pboptions(type = "none") + do.call(options, pizzarr_option_defaults) }) teardown({ - + do.call(options, pizzarr_option_defaults) }) zarr_volcano <- function() { diff --git a/tests/testthat/test-compat.R b/tests/testthat/test-compat.R index d2b0211..1401912 100644 --- a/tests/testthat/test-compat.R +++ b/tests/testthat/test-compat.R @@ -79,7 +79,7 @@ test_that("Can open Zarr group and read a 1D 2-byte integer array with Blosc com expect_equal(selection$data, array(data=c(1, 2, 3, 4), dim=c(4))) } - if(require("Rarr", quietly=TRUE)) { + if(requireNamespace("Rarr", 
quietly=TRUE)) { f() } else { expect_error(f()) diff --git a/tests/testthat/test-parallel.R b/tests/testthat/test-parallel.R new file mode 100644 index 0000000..2d82c1d --- /dev/null +++ b/tests/testthat/test-parallel.R @@ -0,0 +1,119 @@ +library(pizzarr) + +SlowGettingDirectoryStore <- R6::R6Class("SlowGettingDirectoryStore", + inherit = DirectoryStore, + public = list( + get_item = function(key) { + # Simulate a slow read such as an HTTP request. + Sys.sleep(1.0/10.0) + return(super$get_item(key)) + } + ) +) + +SlowSettingDirectoryStore <- R6::R6Class("SlowSettingDirectoryStore", + inherit = DirectoryStore, + public = list( + set_item = function(key, value) { + # Simulate a slow write such as an HTTP request. + Sys.sleep(1.0/10.0) + return(super$set_item(key, value)) + } + ) +) + +get_dog_arr <- function(slow_setting = FALSE) { + # The path to the root of the OME-NGFF Zarr store. + root <- pizzarr_sample("dog.ome.zarr") + + # Open the OME-NGFF as a DirectoryStore. + if(slow_setting) { + store <- SlowSettingDirectoryStore$new(root) + } else { + store <- SlowGettingDirectoryStore$new(root) + } + + zarr_arr <- zarr_open(store = store, path = "/0") + return(zarr_arr) +} + +run_parallel_get <- function(num_workers) { + options(pizzarr.parallel_read_enabled = num_workers) + + zarr_arr <- get_dog_arr() + arr <- zarr_arr$get_item("...")$data + + options(pizzarr.parallel_read_enabled = FALSE) + + return(sum(arr)) +} + + +run_parallel_set <- function(num_workers) { + options(pizzarr.parallel_write_enabled = num_workers) + + zarr_arr <- get_dog_arr(slow_setting = TRUE) + arr <- zarr_arr$get_item("...")$data + + # Set the contents of the array to be twice the original value. 
+ zarr_arr$set_item("...", arr * 2.0) + + doubled_arr <- zarr_arr$get_item("...")$data + + options(pizzarr.parallel_write_enabled = FALSE) + + return(sum(doubled_arr)) +} + +test_that("can run get_item() in parallel", { + bench_df <- bench::mark( + run_parallel_get(1), + run_parallel_get(2), + run_parallel_get(4), + iterations = 10, + memory = FALSE, + filter_gc = FALSE + ) + + expect_equal(unlist(bench_df$result), rep(134538481, 3)) + expect_equal(bench_df$total_time[[1]] > bench_df$total_time[[2]], TRUE) + expect_equal(bench_df$total_time[[2]] > bench_df$total_time[[3]], TRUE) +}) + +test_that("can run set_item() in parallel", { + bench_df <- bench::mark( + run_parallel_set(1), + run_parallel_set(2), + run_parallel_set(4), + iterations = 10, + memory = FALSE, + filter_gc = FALSE + ) + + expect_equal(unlist(bench_df$result), rep(134538481*2.0, 3)) + expect_equal(bench_df$total_time[[1]] > bench_df$total_time[[2]], TRUE) + expect_equal(bench_df$total_time[[2]] > bench_df$total_time[[3]], TRUE) +}) + +test_that("parse_parallel_option works as expected", { + expect_equal(parse_parallel_option("future"), "future") + expect_equal(parse_parallel_option("0"), FALSE) + expect_equal(parse_parallel_option(0), FALSE) + expect_equal(parse_parallel_option("FALSE"), FALSE) + expect_equal(parse_parallel_option(FALSE), FALSE) + expect_equal(parse_parallel_option("1"), TRUE) + expect_equal(parse_parallel_option(1), TRUE) + expect_equal(parse_parallel_option("TRUE"), TRUE) + expect_equal(parse_parallel_option(TRUE), TRUE) + expect_equal(parse_parallel_option("2"), 2) + expect_equal(parse_parallel_option(2), 2) +}) + +test_that("is_truthy_parallel_option works as expected", { + expect_equal(is_truthy_parallel_option("future"), TRUE) + expect_equal(is_truthy_parallel_option(FALSE), FALSE) + expect_equal(is_truthy_parallel_option(0), FALSE) + expect_equal(is_truthy_parallel_option(TRUE), TRUE) + expect_equal(is_truthy_parallel_option(1), TRUE) + 
expect_equal(is_truthy_parallel_option(2), TRUE) +}) diff --git a/vignettes/parallel.Rmd b/vignettes/parallel.Rmd new file mode 100644 index 0000000..d82a644 --- /dev/null +++ b/vignettes/parallel.Rmd @@ -0,0 +1,135 @@ +--- +title: "Read and write in parallel" +output: rmarkdown::html_vignette +vignette: > + %\VignetteIndexEntry{Read and write in parallel} + %\VignetteEngine{knitr::rmarkdown} + %\VignetteEncoding{UTF-8} +--- + +```{r, include = FALSE} + knitr::opts_chunk$set( + collapse = TRUE, + comment = "#>", + out.width = "100%" +) +``` + +By default, reads and writes are performed sequentially (i.e., not in parallel). +Users can opt-in to parallel read/write functionality via `options`. + +```{r} +library(pizzarr) + +if(!requireNamespace("pbapply", quietly = TRUE)) { + install.packages("pbapply") +} +``` + +## Simulate slow operations + +```{r} +SlowDirectoryStore <- R6::R6Class("SlowDirectoryStore", + inherit = DirectoryStore, + public = list( + get_item = function(key) { + Sys.sleep(0.5) # Simulate a slow read. + return(super$get_item(key)) + }, + set_item = function(key, value) { + Sys.sleep(0.5) # Simulate a slow write. + return(super$set_item(key, value)) + } + ) +) +``` + +## Read in parallel + +Provide an integer >= 2 to the option to use forking-based parallelism. +This value will be passed to the `cl` parameter of `pbapply::pblapply`. 
+ +```{r} +options(pizzarr.parallel_read_enabled = 4) + +root <- pizzarr_sample("dog.ome.zarr") +store <- SlowDirectoryStore$new(root) +zarr_arr <- zarr_open(store = store, path = "/0") +arr <- zarr_arr$get_item("...")$data +sum(arr) +``` + +## Write in parallel + +```{r} +options(pizzarr.parallel_write_enabled = 4) + +root <- pizzarr_sample("dog.ome.zarr") +store <- SlowDirectoryStore$new(root) +zarr_arr <- zarr_open(store = store, path = "/0") +arr <- zarr_arr$get_item("...")$data +zarr_arr$set_item("...", arr * 2.0) +doubled_arr <- zarr_arr$get_item("...")$data +sum(doubled_arr) +``` + +## Parallel operations with future backend + +To use the `future` backend for `pbapply`, set the value of the option to the string `"future"`. + +Cluster-based: + +```{r} +options(pizzarr.parallel_read_enabled = "future") + +cl <- parallel::makeCluster(2) +future::plan(future::cluster, workers = cl) + +root <- pizzarr_sample("dog.ome.zarr") +store <- SlowDirectoryStore$new(root) +zarr_arr <- zarr_open(store = store, path = "/0") +arr <- zarr_arr$get_item("...")$data +sum(arr) + +parallel::stopCluster(cl) +``` + +Multisession-based: + +```{r} +options(pizzarr.parallel_read_enabled = "future") + +future::plan(future::multisession, workers = 4) + +root <- pizzarr_sample("dog.ome.zarr") +store <- SlowDirectoryStore$new(root) +zarr_arr <- zarr_open(store = store, path = "/0") +arr <- zarr_arr$get_item("...")$data +sum(arr) +``` + +## Sequential operations + +To return to sequential mode, run: + +```{r} +options( + pizzarr.parallel_read_enabled = FALSE, + pizzarr.parallel_write_enabled = FALSE +) +``` + +## Disable progress bar + +Parallel operations are implemented with `pbapply`. +To disable the progress bar, run: + +```{r} +pbapply::pboptions(type = "none") +``` + +To re-enable, run: + +```{r} +pbapply::pboptions(type = "timer") +``` \ No newline at end of file