Skip to content

Commit

Permalink
feat: allow to export polars internal types via the Arrow C Stream in…
Browse files Browse the repository at this point in the history
…terface of DataFrame (#1075)
  • Loading branch information
eitsupi authored May 5, 2024
1 parent 930bd38 commit 53cc0e7
Show file tree
Hide file tree
Showing 13 changed files with 81 additions and 52 deletions.
4 changes: 2 additions & 2 deletions DESCRIPTION
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ URL: https://pola-rs.github.io/r-polars/,
https://github.com/pola-rs/r-polars,
https://rpolars.r-universe.dev/polars
Suggests:
arrow,
arrow (>= 15.0.1),
bench,
bit64,
callr,
Expand All @@ -32,7 +32,7 @@ Suggests:
jsonlite,
knitr,
lubridate,
nanoarrow,
nanoarrow (>= 0.4.0),
nycflights13,
patrick,
pillar,
Expand Down
4 changes: 1 addition & 3 deletions R/extendr-wrappers.R
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,6 @@ arrow_stream_to_df <- function(robj_str) .Call(wrap__arrow_stream_to_df, robj_st

arrow_stream_to_series <- function(robj_str) .Call(wrap__arrow_stream_to_series, robj_str)

export_df_to_arrow_stream <- function(robj_df, robj_str) .Call(wrap__export_df_to_arrow_stream, robj_df, robj_str)

mem_address <- function(robj) .Call(wrap__mem_address, robj)

clone_robj <- function(robj) .Call(wrap__clone_robj, robj)
Expand Down Expand Up @@ -204,7 +202,7 @@ RPolarsDataFrame$unnest <- function(names) .Call(wrap__RPolarsDataFrame__unnest,

RPolarsDataFrame$partition_by <- function(by, maintain_order, include_key) .Call(wrap__RPolarsDataFrame__partition_by, self, by, maintain_order, include_key)

RPolarsDataFrame$export_stream <- function(stream_ptr) invisible(.Call(wrap__RPolarsDataFrame__export_stream, self, stream_ptr))
RPolarsDataFrame$export_stream <- function(stream_ptr, pl_flavor) invisible(.Call(wrap__RPolarsDataFrame__export_stream, self, stream_ptr, pl_flavor))

RPolarsDataFrame$from_arrow_record_batches <- function(rbr) .Call(wrap__RPolarsDataFrame__from_arrow_record_batches, rbr)

Expand Down
15 changes: 11 additions & 4 deletions R/pkg-arrow.R
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#' Create an arrow Table from a Polars object
#'
#' @inheritParams DataFrame_write_ipc
#' @param x [A Polars DataFrame][DataFrame_class]
#' @param ... Ignored
#' @rdname S3_as_arrow_table
Expand All @@ -9,8 +10,9 @@
#' pl_df = as_polars_df(mtcars)
#' as_arrow_table(pl_df)
# exported in zzz.R
as_arrow_table.RPolarsDataFrame = function(x, ...) {
reader = as_record_batch_reader.RPolarsDataFrame(x)
as_arrow_table.RPolarsDataFrame = function(x, ..., future = FALSE) {
reader = result(as_record_batch_reader.RPolarsDataFrame(x, future = future)) |>
unwrap("in as_arrow_table(<RPolarsDataFrame>):")
reader$read_table()
}

Expand All @@ -25,12 +27,17 @@ as_arrow_table.RPolarsDataFrame = function(x, ...) {
#' pl_df = as_polars_df(mtcars)
#' as_record_batch_reader(pl_df)
# exported in zzz.R
as_record_batch_reader.RPolarsDataFrame = function(x, ...) {
as_record_batch_reader.RPolarsDataFrame = function(x, ..., future = FALSE) {
if (!is_scalar_bool(future)) {
Err_plain("`future` argument must be `TRUE` or `FALSE`") |>
unwrap("in as_record_batch_reader(<RPolarsDataFrame>):")
}

# https://github.com/apache/arrow/issues/39793
allocate_arrow_array_stream = utils::getFromNamespace("allocate_arrow_array_stream", "arrow")
external_pointer_addr_character = utils::getFromNamespace("external_pointer_addr_character", "arrow")

stream = allocate_arrow_array_stream()
.pr$DataFrame$export_stream(x, external_pointer_addr_character(stream))
.pr$DataFrame$export_stream(x, external_pointer_addr_character(stream), future)
arrow::RecordBatchReader$import_from_c(stream)
}
26 changes: 20 additions & 6 deletions R/pkg-nanoarrow.R
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#' Create a nanoarrow_array_stream from a Polars object
#'
#' @inheritParams as_arrow_table.RPolarsDataFrame
#' @inheritParams DataFrame_write_ipc
#' @param schema must stay at default value NULL
#' @rdname S3_as_nanoarrow_array_stream
#' @examples
Expand All @@ -10,25 +11,38 @@
#' nanoarrow_array_stream = as_nanoarrow_array_stream(pl_df)
#' as.data.frame(nanoarrow_array_stream)
# exported in zzz.R
as_nanoarrow_array_stream.RPolarsDataFrame = function(x, ..., schema = NULL) {
as_nanoarrow_array_stream.RPolarsDataFrame = function(x, ..., schema = NULL, future = FALSE) {
# Local helper for the argument checks below: takes the value piped into it
# (`Err_plain(...) |> uw()`) and hands it to `unwrap()` together with this
# method's context string. The value must be forwarded explicitly; otherwise
# the context string is passed in the position where `unwrap()` receives the
# value to unwrap, and the piped error is silently dropped.
uw = \(res) unwrap(res, "in as_nanoarrow_array_stream(<RPolarsDataFrame>):")

# Don't support the schema argument yet
stopifnot(is.null(schema))
if (!is.null(schema)) {
Err_plain("The `schema` argument is not supported yet") |>
uw()
}

if (!is_scalar_bool(future)) {
Err_plain("`future` argument must be `TRUE` or `FALSE`") |>
uw()
}

stream = nanoarrow::nanoarrow_allocate_array_stream()
.pr$DataFrame$export_stream(x, nanoarrow::nanoarrow_pointer_addr_chr(stream))
.pr$DataFrame$export_stream(x, nanoarrow::nanoarrow_pointer_addr_chr(stream), future)
stream
}


#' Infer nanoarrow schema from a Polars object
#'
#' @inheritParams as_arrow_table.RPolarsDataFrame
#' @inheritParams as_nanoarrow_array_stream.RPolarsDataFrame
#' @rdname S3_infer_nanoarrow_schema
#' @examples
#' library(nanoarrow)
#' pl_df = as_polars_df(mtcars)
#'
#' infer_nanoarrow_schema(pl_df)
# exported in zzz.R
infer_nanoarrow_schema.RPolarsDataFrame = function(x, ...) {
as_nanoarrow_array_stream.RPolarsDataFrame(x)$get_schema()
infer_nanoarrow_schema.RPolarsDataFrame = function(x, ..., future = FALSE) {
as_nanoarrow_array_stream.RPolarsDataFrame(x, future = future)$get_schema() |>
result() |>
unwrap("in infer_nanoarrow_schema(<RPolarsDataFrame>):")
}
7 changes: 6 additions & 1 deletion man/S3_as_arrow_table.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 6 additions & 1 deletion man/S3_as_nanoarrow_array_stream.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 6 additions & 1 deletion man/S3_as_record_batch_reader.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 6 additions & 1 deletion man/S3_infer_nanoarrow_schema.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 0 additions & 11 deletions src/rust/src/arrow_interop/to_rust.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,14 +154,3 @@ fn consume_arrow_stream_to_series(boxed_stream: Box<ffi::ArrowArrayStream>) -> R
}
Ok(s)
}

pub unsafe fn export_df_as_stream(df: pl::DataFrame, robj_str_ref: &Robj) -> RResult<()> {
let stream_ptr =
crate::utils::robj_str_ptr_to_usize(robj_str_ref)? as *mut ffi::ArrowArrayStream;
let schema = df.schema().to_arrow(true);
let data_type = pl::ArrowDataType::Struct(schema.fields);
let field = pl::ArrowField::new("", data_type, false);
let iter_boxed = Box::new(crate::rdataframe::OwnedDataFrameIterator::new(df));
unsafe { *stream_ptr = ffi::export_iterator(iter_boxed, field) };
Ok(())
}
14 changes: 8 additions & 6 deletions src/rust/src/rdataframe/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,18 +33,20 @@ pub struct OwnedDataFrameIterator {
data_type: arrow::datatypes::ArrowDataType,
idx: usize,
n_chunks: usize,
pl_flavor: bool,
}

impl OwnedDataFrameIterator {
pub fn new(df: polars::frame::DataFrame) -> Self {
let schema = df.schema().to_arrow(false);
pub fn new(df: polars::frame::DataFrame, pl_flavor: bool) -> Self {
let schema = df.schema().to_arrow(pl_flavor);
let data_type = ArrowDataType::Struct(schema.fields);
let vs = df.get_columns().to_vec();
Self {
columns: vs,
data_type,
idx: 0,
n_chunks: df.n_chunks(),
pl_flavor,
}
}
}
Expand All @@ -60,7 +62,7 @@ impl Iterator for OwnedDataFrameIterator {
let batch_cols = self
.columns
.iter()
.map(|s| s.to_arrow(self.idx, false))
.map(|s| s.to_arrow(self.idx, self.pl_flavor))
.collect();
self.idx += 1;

Expand Down Expand Up @@ -346,12 +348,12 @@ impl RPolarsDataFrame {
Ok(List::from_values(vec))
}

pub fn export_stream(&self, stream_ptr: &str) {
let schema = self.0.schema().to_arrow(false);
pub fn export_stream(&self, stream_ptr: &str, pl_flavor: bool) {
let schema = self.0.schema().to_arrow(pl_flavor);
let data_type = ArrowDataType::Struct(schema.fields);
let field = ArrowField::new("", data_type, false);

let iter_boxed = Box::new(OwnedDataFrameIterator::new(self.0.clone()));
let iter_boxed = Box::new(OwnedDataFrameIterator::new(self.0.clone(), pl_flavor));
let mut stream = arrow::ffi::export_iterator(iter_boxed, field);
let stream_out_ptr_addr: usize = stream_ptr.parse().unwrap();
let stream_out_ptr = stream_out_ptr_addr as *mut arrow::ffi::ArrowArrayStream;
Expand Down
12 changes: 0 additions & 12 deletions src/rust/src/rlib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -218,17 +218,6 @@ fn arrow_stream_to_series(robj_str: Robj) -> RResult<Robj> {
Ok(RPolarsSeries(s).into_robj())
}

#[extendr]
unsafe fn export_df_to_arrow_stream(robj_df: Robj, robj_str: Robj) -> RResult<Robj> {
let res: ExternalPtr<RPolarsDataFrame> = robj_df.try_into()?;
let pl_df = RPolarsDataFrame(res.0.clone()).0;
//safety robj_str must be ptr to a arrow2 stream ready to export into
unsafe {
crate::arrow_interop::to_rust::export_df_as_stream(pl_df, &robj_str)?;
}
Ok(robj_str)
}

#[extendr]
pub fn dtype_str_repr(dtype: Robj) -> RResult<String> {
let dtype = robj_to!(RPolarsDataType, dtype)?.0;
Expand Down Expand Up @@ -474,7 +463,6 @@ extendr_module! {
fn new_arrow_stream;
fn arrow_stream_to_df;
fn arrow_stream_to_series;
fn export_df_to_arrow_stream;

//robj meta
fn mem_address;
Expand Down
11 changes: 7 additions & 4 deletions tests/testthat/test-pkg-arrow.R
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,7 @@ test_that("as_record_batch_reader() works for DataFrame", {
expect_s3_class(reader, "RecordBatchReader")

expect_identical(
# two as.data.frame()s because arrow sometimes returns a tibble here
as.data.frame(as.data.frame(reader)),
as.data.frame(reader),
data.frame(a = 1L, b = "two")
)
})
Expand All @@ -20,8 +19,12 @@ test_that("as_arrow_table() works for DataFrame", {
expect_s3_class(table, "Table")

expect_identical(
# two as.data.frame()s because arrow sometimes returns a tibble here
as.data.frame(as.data.frame(table)),
as.data.frame(table),
data.frame(a = 1L, b = "two")
)

expect_identical(
arrow::as_arrow_table(df, future = TRUE)$b$type$ToString(),
"string_view"
)
})
8 changes: 8 additions & 0 deletions tests/testthat/test-pkg-nanoarrow.R
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,14 @@ test_that("as_nanoarrow_array_stream() works for DataFrame", {
as.data.frame(stream),
data.frame(a = 1L, b = "two")
)

# nanoarrow does not support the string view type yet
# https://github.com/apache/arrow-nanoarrow/pull/367
expect_grepl_error(
nanoarrow::as_nanoarrow_array_stream(df, future = TRUE) |>
as.data.frame(),
"Unknown format: 'vu'"
)
})

test_that("infer_nanoarrow_schema() works for DataFrame", {
Expand Down

0 comments on commit 53cc0e7

Please sign in to comment.