feat: import_stream internal method for Series to support Arrow C stream interface #1078

Merged
merged 27 commits into from
May 8, 2024
Changes from 21 commits
98a81ec
refactor: import_stream method for Series
eitsupi May 5, 2024
16ec40f
test: add test
eitsupi May 5, 2024
9f095c1
test: more tests for nanoarrow
eitsupi May 6, 2024
2b976c1
refactor: handle name in rust side
eitsupi May 6, 2024
c6b4ee7
refactor!: use the Arrow C Stream interface inside `as_polars_df(<arr…
eitsupi May 6, 2024
8de1e21
refactor: simplify type check for struct nanoarrow_array_stream
eitsupi May 6, 2024
bbba938
feat: as_polars_df for arrow::RecordBatchReader
eitsupi May 6, 2024
350689c
refactor!: rewrite `as_polars_df` to use Arrow C Stream interface
eitsupi May 6, 2024
e4fdaad
docs(news): add bluets
eitsupi May 6, 2024
f7d917a
Merge remote-tracking branch 'upstream/main' into import_stream
eitsupi May 6, 2024
a70fbf6
test: ensure auto rechunk
eitsupi May 6, 2024
a543e95
test: add comment about the test case
eitsupi May 6, 2024
4cab33b
docs: update examples to remove removed options
eitsupi May 6, 2024
fa5e600
Revert "refactor!: rewrite `as_polars_df` to use Arrow C Stream inter…
eitsupi May 6, 2024
e04762c
feat: add the experimental argument to use the import_stream method i…
eitsupi May 6, 2024
fe06f29
feat: re add `$from_arrow_record_batches`
eitsupi May 7, 2024
2fde92a
feat: add `experimental` option to use C stream interface
eitsupi May 7, 2024
303ea0b
test: re add tests for as_polars_df(<ArrowTabular>)
eitsupi May 7, 2024
89fc55c
test: update snapshot
eitsupi May 7, 2024
e3b9567
chore: auto formatting
eitsupi May 7, 2024
5a97690
docs(news): update NEWS about C stream interface
eitsupi May 7, 2024
2b5e8a6
refactor: simplify
eitsupi May 7, 2024
038afb6
fix: `experimental` argument for as_polars_df(<nanoarrow_array_stream…
eitsupi May 8, 2024
b2277fb
docs(news): more notes about the Arrow C stream interface
eitsupi May 8, 2024
db284af
test: skip judges should be done inside `.cases`
eitsupi May 8, 2024
ccb51e7
refactor: simplify
eitsupi May 8, 2024
b6cc521
test: add more tests for the experimental argument
eitsupi May 8, 2024
2 changes: 2 additions & 0 deletions NAMESPACE
@@ -157,6 +157,7 @@ S3method(as_polars_df,RPolarsLazyFrame)
S3method(as_polars_df,RPolarsLazyGroupBy)
S3method(as_polars_df,RPolarsRollingGroupBy)
S3method(as_polars_df,RPolarsSeries)
S3method(as_polars_df,RecordBatchReader)
S3method(as_polars_df,data.frame)
S3method(as_polars_df,default)
S3method(as_polars_df,nanoarrow_array)
@@ -171,6 +172,7 @@ S3method(as_polars_series,RPolarsChainedThen)
S3method(as_polars_series,RPolarsExpr)
S3method(as_polars_series,RPolarsSeries)
S3method(as_polars_series,RPolarsThen)
S3method(as_polars_series,RecordBatchReader)
S3method(as_polars_series,clock_sys_time)
S3method(as_polars_series,clock_time_point)
S3method(as_polars_series,clock_zoned_time)
3 changes: 3 additions & 0 deletions NEWS.md
@@ -15,6 +15,9 @@
- New S3 methods `nanoarrow::as_nanoarrow_array_stream()` and `nanoarrow::infer_nanoarrow_schema()`
for `RPolarsSeries` (#1076).
- New method `$dt$is_leap_year()` (#1077).
- `as_polars_df()` and `as_polars_series()` support `arrow::RecordBatchReader` (#1078).
- The new `experimental` argument for `as_polars_df(<ArrowTabular>)`, `as_polars_df(<RecordBatchReader>)`,
`as_polars_series(<nanoarrow_array_stream>)` (#1078).

## Polars R Package 0.16.3

111 changes: 76 additions & 35 deletions R/as_polars.R
@@ -7,35 +7,36 @@
#' [$collect()][LazyFrame_collect] or [$fetch()][LazyFrame_fetch], depending on
#' whether the number of rows to fetch is infinite or not.
#' @rdname as_polars_df
#' @inheritParams as_polars_series
#' @param x Object to convert to a polars DataFrame.
#' @param ... Additional arguments passed to methods.
#' @return a [DataFrame][DataFrame_class]
#' @examplesIf requireNamespace("arrow", quietly = TRUE)
#' # Convert the row names of a data frame to a column
#' as_polars_df(mtcars, rownames = "car")
#'
#' # Convert an arrow Table to a polars DataFrame
#' at = arrow::arrow_table(x = 1:5, y = 6:10)
#' as_polars_df(at)
#'
#' # Convert an arrow Table, with renaming all columns
#' # Convert a data frame, with renaming all columns
#' as_polars_df(
#' at,
#' data.frame(x = 1, y = 2),
#' schema = c("a", "b")
#' )
#'
#' # Convert an arrow Table, with renaming and casting all columns
#' # Convert a data frame, with renaming and casting all columns
#' as_polars_df(
#' at,
#' data.frame(x = 1, y = 2),
#' schema = list(b = pl$Int64, a = pl$String)
#' )
#'
#' # Convert an arrow Table, with casting some columns
#' # Convert a data frame, with casting some columns
#' as_polars_df(
#' at,
#' data.frame(x = 1, y = 2),
#' schema_overrides = list(y = pl$String) # cast some columns
#' )
#'
#' # Convert an arrow Table to a polars DataFrame
#' at = arrow::arrow_table(x = 1:5, y = 6:10)
#' as_polars_df(at)
#'
#' # Create a polars DataFrame from a data.frame
#' lf = as_polars_df(mtcars)$lazy()
#'
@@ -212,13 +213,33 @@ as_polars_df.ArrowTabular = function(
...,
rechunk = TRUE,
schema = NULL,
schema_overrides = NULL) {
schema_overrides = NULL,
experimental = FALSE) {
arrow_to_rpldf(
x,
rechunk = rechunk,
schema = schema,
schema_overrides = schema_overrides
)
schema_overrides = schema_overrides,
experimental = experimental
) |>
result() |>
unwrap("in as_polars_df():")
}


#' @rdname as_polars_df
#' @export
as_polars_df.RecordBatchReader = function(x, ..., experimental = FALSE) {
uw = \(res) unwrap(res, "in as_polars_df(<RecordBatchReader>):")

if (isTRUE(experimental)) {
as_polars_series(x, name = "")$to_frame()$unnest("") |>
result() |>
uw()
} else {
.pr$DataFrame$from_arrow_record_batches(x$batches()) |>
uw()
}
}


@@ -248,19 +269,12 @@ as_polars_df.nanoarrow_array = function(x, ...) {
#' @rdname as_polars_df
#' @export
as_polars_df.nanoarrow_array_stream = function(x, ...) {
if (!inherits(nanoarrow::infer_nanoarrow_ptype(x$get_schema()), "data.frame")) {
if (!identical(nanoarrow::nanoarrow_schema_parse(x$get_schema())$type, "struct")) {
Err_plain("Can't convert non-struct array stream to RPolarsDataFrame") |>
unwrap("in as_polars_df(<nanoarrow_array_stream>):")
}

series = as_polars_series.nanoarrow_array_stream(x, name = NULL)

if (length(series)) {
series$to_frame()$unnest("")
} else {
# TODO: support 0-length array stream
pl$DataFrame()
}
as_polars_series.nanoarrow_array_stream(x, name = "")$to_frame()$unnest("")
}
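
The method above accepts only a stream whose schema type is `struct`, then turns the unnamed struct series into a frame via `$to_frame()$unnest("")`. The idea can be sketched in plain Rust as follows. This is a minimal illustration under stated assumptions: `Column` and `unnest_struct` are hypothetical names invented here, not part of polars or nanoarrow.

```rust
/// Hypothetical, simplified column model; this is NOT the polars API.
#[derive(Debug, Clone, PartialEq)]
pub enum Column {
    Int(Vec<i64>),
    /// A struct column: an ordered list of named child columns.
    Struct(Vec<(String, Column)>),
}

/// "Unnest" a struct column into its named child columns, which then play
/// the role of the data frame's columns. Any non-struct column is rejected,
/// mirroring the `type == "struct"` check in the R method.
pub fn unnest_struct(col: Column) -> Result<Vec<(String, Column)>, String> {
    match col {
        Column::Struct(fields) => Ok(fields),
        _ => Err("Can't convert non-struct array stream to a data frame".to_string()),
    }
}
```

Calling `unnest_struct` on a `Column::Struct` with fields `x` and `y` returns those two fields in order, while a bare `Column::Int` yields the error.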


@@ -397,6 +411,20 @@ as_polars_series.Array = function(x, name = NULL, ..., rechunk = TRUE) {
as_polars_series.ChunkedArray = as_polars_series.Array


#' @rdname as_polars_series
#' @export
as_polars_series.RecordBatchReader = function(x, name = NULL, ...) {
stream_out = polars_allocate_array_stream()
x$export_to_c(stream_out)

.pr$Series$import_stream(
name %||% "",
stream_out
) |>
unwrap("in as_polars_series(<RecordBatchReader>):")
}


#' @rdname as_polars_series
#' @export
as_polars_series.nanoarrow_array = function(x, name = NULL, ...) {
@@ -406,26 +434,39 @@ as_polars_series.nanoarrow_array = function(x, name = NULL, ...) {
}


#' @param experimental If `TRUE`, use experimental Arrow C stream interface inside the function.
#' This argument is experimental and may be removed in the future.
#' @rdname as_polars_series
#' @export
as_polars_series.nanoarrow_array_stream = function(x, name = NULL, ...) {
as_polars_series.nanoarrow_array_stream = function(x, name = NULL, ..., experimental = FALSE) {
on.exit(x$release())

list_of_arrays = nanoarrow::collect_array_stream(x, validate = FALSE)
if (isTRUE(experimental)) {
stream_out = polars_allocate_array_stream()
nanoarrow::nanoarrow_pointer_export(x, stream_out)

if (length(list_of_arrays) < 1L) {
# TODO: support 0-length array stream
out = pl$Series(name = name)
} else {
out = as_polars_series.nanoarrow_array(list_of_arrays[[1L]], name = name)
lapply(
list_of_arrays[-1L],
\(array) .pr$Series$append_mut(out, as_polars_series.nanoarrow_array(array))
.pr$Series$import_stream(
name %||% "",
stream_out
) |>
invisible()
}
unwrap("in as_polars_series(<nanoarrow_array_stream>):")
} else {
list_of_arrays = nanoarrow::collect_array_stream(x, validate = FALSE)

out
if (length(list_of_arrays) < 1L) {
# TODO: support 0-length array stream
out = pl$Series(name = name)
} else {
out = as_polars_series.nanoarrow_array(list_of_arrays[[1L]], name = name)
lapply(
list_of_arrays[-1L],
\(array) .pr$Series$append_mut(out, as_polars_series.nanoarrow_array(array))
) |>
invisible()
}

out
}
}
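
When `experimental` is `FALSE`, the method keeps the older behaviour: collect every array from the stream, convert the first one, and append the rest in place. A minimal Rust sketch of that collect-then-append pattern is shown below. `collect_chunks` is a hypothetical helper operating on plain `Vec<i64>` chunks, used here only to illustrate the control flow, including the zero-chunk case that the R code marks as a TODO.

```rust
/// Concatenate chunks the way the non-experimental path does: start from the
/// first chunk and append every remaining chunk to it in place.
/// (Hypothetical helper for illustration; not part of the package.)
pub fn collect_chunks(chunks: Vec<Vec<i64>>) -> Vec<i64> {
    let mut iter = chunks.into_iter();
    // A stream with no chunks yields an empty result, analogous to the
    // empty Series built in the R code.
    let Some(mut out) = iter.next() else {
        return Vec::new();
    };
    for chunk in iter {
        out.extend(chunk);
    }
    out
}
```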


13 changes: 9 additions & 4 deletions R/construction.R
@@ -9,9 +9,11 @@
#' If schema names or types do not match `x`, the columns will be renamed/recast.
#' If `NULL` (default), convert columns as is.
#' @param schema_overrides named list of DataTypes. Cast some columns to the DataType.
#' @param experimental If `TRUE`, use the Arrow C stream interface.
Collaborator

Can you inherit that param from as_polars_series()?

Collaborator Author

Good point.
This is an internal function and doesn't have an Rd file, so roxygen comment inheritance does not make sense.

#' @noRd
#' @return RPolarsDataFrame
arrow_to_rpldf = function(at, schema = NULL, schema_overrides = NULL, rechunk = TRUE) {
arrow_to_rpldf = function(
at, schema = NULL, schema_overrides = NULL, rechunk = TRUE, ..., experimental = FALSE) {
# new column names by schema, #todo get names if schema not NULL
n_cols = at$num_columns

@@ -53,9 +55,12 @@ arrow_to_rpldf = function(at, schema = NULL, schema_overrides = NULL, rechunk =
if (tbl$num_rows == 0L) {
rdf = pl$DataFrame() # TODO: support creating 0-row DataFrame
} else {
rdf = unwrap(
.pr$DataFrame$from_arrow_record_batches(arrow::as_record_batch_reader(tbl)$batches())
)
if (isTRUE(experimental)) {
rdf = as_polars_series(arrow::as_record_batch_reader(tbl))$to_frame()$unnest("")
} else {
rdf = .pr$DataFrame$from_arrow_record_batches(arrow::as_record_batch_reader(tbl)$batches()) |>
unwrap()
}
}
} else {
rdf = pl$DataFrame()
10 changes: 3 additions & 7 deletions R/extendr-wrappers.R
@@ -10,6 +10,8 @@
#' @useDynLib polars, .registration = TRUE
NULL

polars_allocate_array_stream <- function() .Call(wrap__polars_allocate_array_stream)

all_horizontal <- function(dotdotdot) .Call(wrap__all_horizontal, dotdotdot)

any_horizontal <- function(dotdotdot) .Call(wrap__any_horizontal, dotdotdot)
@@ -58,12 +60,6 @@ struct_ <- function(exprs, eager, schema) .Call(wrap__struct_, exprs, eager, sch

dtype_str_repr <- function(dtype) .Call(wrap__dtype_str_repr, dtype)

new_arrow_stream <- function() .Call(wrap__new_arrow_stream)

arrow_stream_to_df <- function(robj_str) .Call(wrap__arrow_stream_to_df, robj_str)

arrow_stream_to_series <- function(robj_str) .Call(wrap__arrow_stream_to_series, robj_str)

mem_address <- function(robj) .Call(wrap__mem_address, robj)

clone_robj <- function(robj) .Call(wrap__clone_robj, robj)
@@ -1376,7 +1372,7 @@ RPolarsSeries$struct_fields <- function() .Call(wrap__RPolarsSeries__struct_fiel

RPolarsSeries$export_stream <- function(stream_ptr, pl_flavor) invisible(.Call(wrap__RPolarsSeries__export_stream, self, stream_ptr, pl_flavor))

RPolarsSeries$from_arrow_array_stream_str <- function(name, robj_str) .Call(wrap__RPolarsSeries__from_arrow_array_stream_str, name, robj_str)
RPolarsSeries$import_stream <- function(name, stream_ptr) .Call(wrap__RPolarsSeries__import_stream, name, stream_ptr)

RPolarsSeries$from_arrow_array_robj <- function(name, array) .Call(wrap__RPolarsSeries__from_arrow_array_robj, name, array)

35 changes: 24 additions & 11 deletions man/as_polars_df.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 7 additions & 1 deletion man/as_polars_series.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

15 changes: 15 additions & 0 deletions src/rust/src/arrow_interop/mod.rs
@@ -1,5 +1,7 @@
pub mod to_rust;

use polars_core::utils::arrow;

use extendr_api::prelude::*;
use std::result::Result;

@@ -61,3 +63,16 @@ impl RPackage for NanoArrowRPackage {
"#)
}
}

#[extendr]
pub fn polars_allocate_array_stream() -> Robj {
let aas = Box::new(arrow::ffi::ArrowArrayStream::empty());
let x = Box::leak(aas); // leak box to make lifetime static
let x = x as *mut arrow::ffi::ArrowArrayStream;
format!("{:?}", x as usize).into()
}

extendr_module! {
mod arrow_interop;
fn polars_allocate_array_stream;
}
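
`polars_allocate_array_stream()` hands R a raw address as a string: it heap-allocates an empty stream struct, leaks the box so the allocation outlives the call, and formats the pointer as a decimal integer. A std-only sketch of that round trip might look like the following. `FakeArrayStream` is a hypothetical stand-in for `arrow::ffi::ArrowArrayStream`, and `reclaim` is an assumption added for illustration; the diff does not show how the leaked allocation is eventually released.

```rust
/// Hypothetical stand-in for `arrow::ffi::ArrowArrayStream`. The real struct
/// holds C callbacks, which are not needed to show the address handoff.
#[derive(Debug, Default, PartialEq)]
pub struct FakeArrayStream {
    pub released: bool,
}

/// Allocate an empty stream on the heap, leak it, and return its address as
/// a decimal string (mirrors `format!("{:?}", x as usize)` in the diff).
pub fn allocate_stream_addr() -> String {
    let leaked: &'static mut FakeArrayStream = Box::leak(Box::new(FakeArrayStream::default()));
    let ptr = leaked as *mut FakeArrayStream;
    format!("{:?}", ptr as usize)
}

/// Parse the address string back into a pointer and take ownership again,
/// so the allocation is freed when the returned box is dropped.
/// (Assumed counterpart for illustration; not shown in the PR.)
///
/// # Safety
/// `addr` must come from `allocate_stream_addr` and be reclaimed at most once.
pub unsafe fn reclaim(addr: &str) -> Option<Box<FakeArrayStream>> {
    let raw = addr.parse::<usize>().ok()? as *mut FakeArrayStream;
    if raw.is_null() {
        return None;
    }
    Some(unsafe { Box::from_raw(raw) })
}
```

Passing the address as a string avoids depending on how the host language represents pointers, at the cost of making the consumer responsible for parsing it and for the allocation's lifetime.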