Skip to content

Commit

Permalink
<LazyFrame>$fetch() (#319)
Browse files Browse the repository at this point in the history
Co-authored-by: Etienne Bacher <52219252+etiennebacher@users.noreply.github.com>
  • Loading branch information
sorhawell and etiennebacher authored Aug 30, 2023
1 parent dac097b commit afb7650
Show file tree
Hide file tree
Showing 11 changed files with 291 additions and 40 deletions.
1 change: 1 addition & 0 deletions NEWS.md
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ features. Unrelated breaking changes and new features are put in separate sectio
- Stream query to file with `pl$sink_ipc()` and `pl$sink_parquet()` (#343)
- New method `$explode()` for `DataFrame` and `LazyFrame` (#314).
- New method `$clone()` for `LazyFrame` (#347).
- New method `$fetch()` for `LazyFrame` (#319).
- New methods `$optimization_toggle()` and `$profile()` for `LazyFrame` (#323).
- `$with_column()` is now deprecated (following upstream `polars`). It will be
removed in 0.9.0. It should be replaced with `$with_columns()` (#313).
Expand Down
1 change: 1 addition & 0 deletions R/error__trait.R
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ call_to_string = function(call) paste(capture.output(print(call)), collapse = "\
#' @param err any type which impl as.character
#' @param context calling context
#' @keywords internal
#' @noRd
#' @return err as string
#' @examples
#' #
Expand Down
2 changes: 2 additions & 0 deletions R/extendr-wrappers.R
Original file line number Diff line number Diff line change
Expand Up @@ -1011,6 +1011,8 @@ LazyFrame$rename <- function(existing, new) .Call(wrap__LazyFrame__rename, self,

LazyFrame$schema <- function() .Call(wrap__LazyFrame__schema, self)

LazyFrame$fetch <- function(n_rows) .Call(wrap__LazyFrame__fetch, self, n_rows)

LazyFrame$optimization_toggle <- function(type_coercion, predicate_pushdown, projection_pushdown, simplify_expr, slice_pushdown, comm_subplan_elim, comm_subexpr_elim, streaming) .Call(wrap__LazyFrame__optimization_toggle, self, type_coercion, predicate_pushdown, projection_pushdown, simplify_expr, slice_pushdown, comm_subplan_elim, comm_subexpr_elim, streaming)

LazyFrame$profile <- function() .Call(wrap__LazyFrame__profile, self)
Expand Down
117 changes: 109 additions & 8 deletions R/lazyframe__lazy.R
Original file line number Diff line number Diff line change
Expand Up @@ -269,16 +269,17 @@ LazyFrame_filter = "use_extendr_wrapper"
#' run on minimal required memory.
#' @param predicate_pushdown Boolean. Applies filters as early as possible at
#' scan level.
#' @param projection_pushdown Boolean. Select only the columns that are needed at the scan level.
#' @param simplify_expression Boolean. Various optimizations, such as constant folding
#' and replacing expensive operations with faster alternatives.
#' @param projection_pushdown Boolean. Select only the columns that are needed
#' at the scan level.
#' @param simplify_expression Boolean. Various optimizations, such as constant
#' folding and replacing expensive operations with faster alternatives.
#' @param slice_pushdown Boolean. Only load the required slice from the scan
#' Don't materialize sliced outputs
#' level. Don't materialize sliced outputs (e.g. `join$head(10)`).
#' @param comm_subplan_elim Boolean. Will try to cache branching subplans that occur on self-joins
#' or unions.
#' @param comm_subexpr_elim Boolean. Common subexpressions will be cached and reused.
#' or unions.
#' @param comm_subplan_elim Boolean. Will try to cache branching subplans that
#' occur on self-joins or unions.
#' @param comm_subexpr_elim Boolean. Common subexpressions will be cached and
#' reused.
#' @param no_optimization Boolean. Turn off the following optimizations:
#' predicate_pushdown = FALSE
#' projection_pushdown = FALSE
Expand All @@ -297,6 +298,11 @@ LazyFrame_filter = "use_extendr_wrapper"
#' @return A `DataFrame`
#' @examples pl$LazyFrame(iris)$filter(pl$col("Species") == "setosa")$collect()
#' @seealso
#' - [`$fetch()`][LazyFrame_fetch] - fast limited query check
#' - [`$profile()`][LazyFrame_profile] - returns as `$collect()` but also table with each operation
#' profiled.
#' - [`$collect_in_background()`][LazyFrame_collect_in_background] - non-blocking collect returns
#' a future handle. Can also just be used via `$collect(collect_in_background = TRUE)`.
#' - [`$sink_parquet()`][LazyFrame_sink_parquet()] stream query to a parquet file.
#' - [`$sink_ipc()`][LazyFrame_sink_ipc()] stream query to a arrow file.
LazyFrame_collect = function(
Expand Down Expand Up @@ -1183,13 +1189,108 @@ LazyFrame_dtypes = method_as_property(function() {
unwrap("in $dtypes()")
})


#' Fetch `n` rows of a LazyFrame
#'
#' This is similar to `$collect()` but limit the number of rows to collect. It
#' is mostly useful to check that a query works as expected.
#'
#' @keywords LazyFrame
#' @details
#' `$fetch()` does not guarantee the final number of rows in the DataFrame output.
#' It only guarantees that `n` rows are used at the beginning of the query.
#' Filters, join operations and a lower number of rows available in the scanned
#' file influence the final number of rows.
#'
#' @param n_rows Integer. Maximum number of rows to fetch.
#' @param type_coercion Boolean. Coerce types such that operations succeed and
#' run on minimal required memory.
#' @param predicate_pushdown Boolean. Applies filters as early as possible / at
#' scan level.
#' @param projection_pushdown Boolean. Applies filters as early as possible / at
#' scan level.
#' @param simplify_expression Boolean. Cache subtrees/file scans that are used
#' by multiple subtrees in the query plan.
#' @param slice_pushdown Boolean. Only load the required slice from the scan
#' level. Don't materialize sliced outputs (e.g. `join$head(10)`).
#' @param comm_subplan_elim Boolean. Will try to cache branching subplans that
#' occur on self-joins or unions.
#' @param comm_subexpr_elim Boolean. Common subexpressions will be cached and
#' reused.
#' @param no_optimization Boolean. Turn off the following optimizations:
#' predicate_pushdown = FALSE
#' projection_pushdown = FALSE
#' slice_pushdown = FALSE
#' common_subplan_elimination = FALSE
#' @param streaming Boolean. Run parts of the query in a streaming fashion
#' (this is in an alpha state).
#' @return A DataFrame of maximum n_rows
#' @seealso
#' - [`$collect()`][LazyFrame_collect] - regular collect.
#' - [`$profile()`][LazyFrame_profile] - returns as `$collect()` but also table with each operation
#' profiled.
#' - [`$collect_in_background()`][LazyFrame_collect_in_background] - non-blocking collect returns
#' a future handle. Can also just be used via `$collect(collect_in_background = TRUE)`.
#' @examples
#'
#' # fetch 3 rows
#' pl$LazyFrame(iris)$fetch(3)
#'
#' # this fetch-query returns 4 rows, because we started with 3 and appended one
#' # row in the query (see section 'Details')
#' pl$LazyFrame(iris)$select(pl$col("Species")$append("flora gigantica, alien"))$fetch(3)
LazyFrame_fetch = function(
n_rows = 500,
type_coercion = TRUE,
predicate_pushdown = TRUE,
projection_pushdown = TRUE,
simplify_expression = TRUE,
slice_pushdown = TRUE,
comm_subplan_elim = TRUE,
comm_subexpr_elim = TRUE,
no_optimization = FALSE,
streaming = FALSE) {

if (isTRUE(no_optimization)) {
predicate_pushdown = FALSE
projection_pushdown = FALSE
slice_pushdown = FALSE
comm_subplan_elim = FALSE
comm_subexpr_elim = FALSE
}

if (isTRUE(streaming)) {
comm_subplan_elim = FALSE
}

self |>
.pr$LazyFrame$optimization_toggle(
type_coercion,
predicate_pushdown,
projection_pushdown,
simplify_expression,
slice_pushdown,
comm_subplan_elim,
comm_subexpr_elim,
streaming
) |>
and_then(\(self) .pr$LazyFrame$fetch(self, n_rows)) |>
unwrap("in $fetch()")
}

#' @title Collect and profile a lazy query.
#' @description This will run the query and return a list containing the materialized DataFrame and
#' a DataFrame that contains profiling information of each node that is executed.
#' @details The units of the timings are microseconds.
#'
#' @keywords LazyFrame
#' @return List of two `DataFrame`s: one with the collected result, the other with the timings of each step.
#' @return List of two `DataFrame`s: one with the collected result, the other with the timings of
#' each step.
#' @seealso
#' - [`$collect()`][LazyFrame_collect] - regular collect.
#' - [`$fetch()`][LazyFrame_fetch] - fast limited query check
#' - [`$collect_in_background()`][LazyFrame_collect_in_background] - non-blocking collect returns
#' a future handle. Can also just be used via `$collect(collect_in_background = TRUE)`.
#' @examples
#'
#' ## Simplest use case
Expand Down
20 changes: 13 additions & 7 deletions man/LazyFrame_collect.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

84 changes: 84 additions & 0 deletions man/LazyFrame_fetch.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 10 additions & 1 deletion man/LazyFrame_profile.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

23 changes: 0 additions & 23 deletions man/where_in.Rd

This file was deleted.

16 changes: 16 additions & 0 deletions src/rust/src/concurrent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,3 +75,19 @@ pub fn profile_with_r_func_support(lazy_df: pl::LazyFrame) -> RResult<(DataFrame
.map_err(polars_to_rpolars_err)
.map(|(result_df, profile_df)| (DataFrame(result_df), DataFrame(profile_df)))
}

pub fn fetch_with_r_func_support(lazy_df: pl::LazyFrame, n_rows: usize) -> RResult<DataFrame> {
concurrent_handler(
move |tc| {
let retval = lazy_df.fetch(n_rows);
ThreadCom::kill_global(&CONFIG);
drop(tc);
retval
},
serve_r,
&CONFIG,
)
.map_err(|err| RPolarsErr::new().plain(err.to_string()))?
.map_err(polars_to_rpolars_err)
.map(DataFrame)
}
9 changes: 8 additions & 1 deletion src/rust/src/lazy/dataframe.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
use crate::concurrent::{collect_with_r_func_support, profile_with_r_func_support};
use crate::concurrent::{
collect_with_r_func_support, fetch_with_r_func_support, profile_with_r_func_support,
};
use crate::conversion::strings_to_smartstrings;

use crate::lazy::dsl::*;

use crate::rdataframe::DataFrame as RDF;
Expand Down Expand Up @@ -428,6 +431,10 @@ impl LazyFrame {
))
}

fn fetch(&self, n_rows: Robj) -> RResult<RDF> {
fetch_with_r_func_support(self.0.clone(), robj_to!(usize, n_rows)?)
}

#[allow(clippy::too_many_arguments)]
fn optimization_toggle(
&self,
Expand Down
Loading

0 comments on commit afb7650

Please sign in to comment.