Nanoarrow (#64)
* drop DBI methods (#52)

* Switch to nanoarrow (#60)

* Switch from arrow to nanoarrow

* Delete unneeded code

* Update DESCRIPTION

Co-authored-by: eitsupi <50911393+eitsupi@users.noreply.github.com>

---------

Co-authored-by: eitsupi <50911393+eitsupi@users.noreply.github.com>
Co-authored-by: Bruno Tremblay <bruno@boostao.ca>

* Remove `overload_bq_table_download` (#61)

Since it will soon not be necessary. (Might make sense to merge this after the next bigrquery release)

* doc

* Move to nanoarrow WIP

* fix post_process conversion after switching to nanoarrow

* change option for nanoarrow to suppress warning

---------

Co-authored-by: Hadley Wickham <hadley@posit.co>
Co-authored-by: eitsupi <50911393+eitsupi@users.noreply.github.com>
Co-authored-by: Hadley Wickham <h.wickham@gmail.com>
4 people authored Sep 21, 2024
1 parent 0ac2f29 commit 0090fa0
Showing 13 changed files with 51 additions and 306 deletions.
8 changes: 3 additions & 5 deletions DESCRIPTION
@@ -10,10 +10,8 @@ Description: Easily talk to Google's 'BigQuery Storage' API from R
(<https://cloud.google.com/bigquery/docs/reference/storage/rpc>).
License: Apache License (>= 2)
Imports:
arrow,
methods,
nanoarrow (>= 0.5.0),
rlang,
DBI,
bigrquery,
assertthat,
Rcpp,
@@ -32,9 +30,9 @@ LinkingTo:
Encoding: UTF-8
URL: https://github.com/meztez/bigrquerystorage
BugReports: https://github.com/meztez/bigrquerystorage/issues
SystemRequirements: grpc/protobuf headers and compilers. For example
libprotobuf-dev libgrpc++-dev protobuf-compiler-grpc (Debian) or
protobuf-devel grpc-devel grpc-plugins (Fedora/RHEL)
RoxygenNote: 7.3.1
RoxygenNote: 7.3.2
Roxygen: list(markdown = TRUE)
Biarch: TRUE
11 changes: 1 addition & 10 deletions NAMESPACE
@@ -3,21 +3,12 @@
export(bqs_auth)
export(bqs_deauth)
export(bqs_table_download)
export(overload_bq_table_download)
exportMethods(dbFetch)
exportMethods(dbReadTable)
import(DBI)
import(bigrquery)
import(methods)
import(nanoarrow)
importFrom(Rcpp,sourceCpp)
importFrom(arrow,RecordBatchStreamReader)
importFrom(arrow,Table)
importFrom(bit64,is.integer64)
importFrom(lifecycle,badge)
importFrom(lifecycle,deprecate_warn)
importFrom(lifecycle,deprecated)
importFrom(rlang,check_installed)
importFrom(rlang,env_unlock)
importFrom(rlang,is_missing)
importFrom(rlang,is_named)
importFrom(tibble,tibble)
2 changes: 2 additions & 0 deletions NEWS.md
@@ -1,10 +1,12 @@
# bigrquerystorage 1.1.0.9000

* Now uses nanoarrow instead of arrow. This lightens the dependency chain quite a bit (@hadleywickham).
* With `as_tibble = TRUE`:
  * Convert BigQuery SQL types BYTES and GEOGRAPHY to blob and wk_wkt.
  * Set the timezone of BigQuery SQL type DATETIME to UTC.
* Fix nested list parsing in post-processing.
* Fix returning more rows than the source table contains when `n_max` > number of rows.
* Drop DBI methods.

# bigrquerystorage 1.1.0

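To illustrate the user-facing effect of these entries (an illustration, not part of the commit): `bqs_table_download()` now always returns a tibble, converted through nanoarrow, with `bigint` controlling how INT64 columns are materialized. A minimal sketch reusing the public table from the README; `my-billing-project` is a placeholder for a billing project id, and authentication via `bqs_auth()` is assumed to be configured:

```r
library(bigrquerystorage)

# Hypothetical call: "my-billing-project" is a placeholder, and
# authentication (bqs_auth()) is assumed to be set up already.
rows <- bqs_table_download(
  "bigquery-public-data.usa_names.usa_1910_current",
  parent = "my-billing-project",
  n_max = 100,
  bigint = "integer64" # INT64 columns come back as bit64::integer64
)
class(rows) # c("tbl_df", "tbl", "data.frame"); no arrow Table involved
```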
1 change: 0 additions & 1 deletion R/bigrquerystorage.R
@@ -1,6 +1,5 @@
#' @aliases NULL
#' @aliases bigrquerystorage-package
#' @import methods DBI
#' @importFrom Rcpp sourceCpp
#' @importFrom bit64 is.integer64
#' @useDynLib bigrquerystorage, .registration = TRUE
94 changes: 14 additions & 80 deletions R/bqs_download.R
@@ -24,14 +24,14 @@
#' More details about table modifiers and table options are available from the
#' API Reference documentation. (See [TableModifiers](https://cloud.google.com/bigquery/docs/reference/storage/rpc/google.cloud.bigquery.storage.v1#tablemodifiers) and
#' [TableReadOptions](https://cloud.google.com/bigquery/docs/reference/storage/rpc/google.cloud.bigquery.storage.v1#tablereadoptions))
#' @return This method returns a [arrow::Table] Table or optionally a tibble.
#' @return This method returns a data.frame or optionally a tibble.
#' If you need a `data.frame`, leave the `as_tibble` parameter as `FALSE` and coerce
#' the results with [as.data.frame()].
#' @export
#' @importFrom arrow RecordBatchStreamReader Table
#' @importFrom lifecycle deprecated deprecate_warn
#' @importFrom tibble tibble
#' @importFrom rlang is_missing
#' @import nanoarrow
bqs_table_download <- function(
x,
parent = getOption("bigquerystorage.project", ""),
Expand All @@ -41,7 +41,7 @@ bqs_table_download <- function(
sample_percentage,
n_max = Inf,
quiet = NA,
as_tibble = FALSE,
as_tibble = lifecycle::deprecated(),
bigint = c("integer", "integer64", "numeric", "character"),
max_results = lifecycle::deprecated()) {
# Parameters validation
@@ -109,30 +109,8 @@ bqs_table_download <- function(
quiet = quiet
)

rdr <- RecordBatchStreamReader$create(unlist(raws))
# There is currently no way to create an Arrow Table from a
# RecordBatchStreamReader when there is a schema but no batches.
if (length(raws[[2]]) == 0L) {
tb <- Table$create(
stats::setNames(
data.frame(matrix(ncol = rdr$schema$num_fields, nrow = 0)),
rdr$schema$names
)
)
} else {
tb <- rdr$read_table()
}

if (isTRUE(as_tibble)) {
fields <- select_fields(bigrquery::bq_table_fields(x), selected_fields)
tb <- parse_postprocess(
tibble::tibble(
as.data.frame(tb)
),
bigint,
fields
)
}
fields <- select_fields(bigrquery::bq_table_fields(x), selected_fields)
tb <- parse_postprocess(tibble::tibble(as.data.frame(nanoarrow::read_nanoarrow(raws))), bigint, fields)

# Batches do not support a n_max so we get just enough results before
# exiting the streaming loop.
@@ -233,52 +211,6 @@ bqs_deauth <- function() {
invisible()
}

#' Overload `bigrquery::bq_table_download`
#' @description
#' `r lifecycle::badge("experimental")`
#' Replace bigrquery bq_table_download method in bigrquery namespace.
#' @param parent Parent project used by the API for billing.
#' @importFrom rlang env_unlock
#' @importFrom lifecycle badge
#' @import bigrquery
#' @return No return value, called for side effects.
#' @export
overload_bq_table_download <- function(parent) {
utils::assignInNamespace("bq_table_download", function(
x, n_max = Inf, page_size = NULL, start_index = 0L, max_connections = 6L,
quiet = NA, bigint = c("integer", "integer64", "numeric", "character"), max_results = deprecated()) {
x <- bigrquery::as_bq_table(x)
if (lifecycle::is_present(max_results)) {
lifecycle::deprecate_warn(
"1.4.0", "bq_table_download(max_results)",
"bq_table_download(n_max)"
)
n_max <- max_results
}
assertthat::assert_that(is.numeric(n_max), length(n_max) == 1)
assertthat::assert_that(is.numeric(start_index), length(start_index) == 1)
bigint <- match.arg(bigint)
table_data <- bigrquerystorage::bqs_table_download(
x = x,
parent = parent,
n_max = n_max + start_index,
as_tibble = TRUE,
quiet = quiet,
bigint = bigint
)
if (start_index > 0L) {
table_data <- table_data[start_index:nrow(table_data), ]
}
return(table_data)
}, ns = "bigrquery")
if ("package:bigrquery" %in% search()) {
env_unlock(environment(bq_table_download))
namespaceExport(environment(bq_table_download), "bq_table_download")
lockEnvironment(environment(bq_table_download), bindings = TRUE)
}
}


# BigQuery storage --------------------------------------------------------
#' @noRd
bqs_initiate <- function() {
@@ -287,8 +219,8 @@
if (Sys.getenv("GRPC_DEFAULT_SSL_ROOTS_FILE_PATH") == "") {
warning("On Windows, GRPC_DEFAULT_SSL_ROOTS_FILE_PATH should be set to the PEM file path to load SSL roots from.")
}
# Issue with parallel arrow as.data.frame on Windows
options("arrow.use_threads" = FALSE)
# Suppress warning from unregistered BigQuery extension
options("nanoarrow.warn_unregistered_extension" = FALSE)
}
}
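# Illustrative aside (not part of the diff): the warning above tells Windows
# users what to set; a hypothetical way to do it before loading the package,
# with a placeholder PEM path:
# Sys.setenv(GRPC_DEFAULT_SSL_ROOTS_FILE_PATH = "C:/certs/roots.pem")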

@@ -297,14 +229,14 @@
#' @noRd
parse_postprocess <- function(df, bigint, fields) {
tests <- list()
if (bigint != "integer64") {
if (bigint != "numeric") {
as_bigint <- switch(bigint,
integer = as.integer,
numeric = as.numeric,
integer64 = bit64::as.integer64,
character = as.character
)
tests[["bigint"]] <- list(
"test" = function(x,y) bit64::is.integer64(x),
"test" = function(x,y) is.numeric(x),
"func" = function(x) as_bigint(x)
)
}
@@ -342,9 +274,12 @@ parse_postprocess <- function(df, bigint, fields) {
#' @importFrom rlang is_named
col_mapply <- function(x, y, tests) {
if (is.list(x)) {
if (inherits(x, "arrow_list")) {
if (inherits(x, c("arrow_list", "vctrs_list_of"))) {
x <- as.list(x)
}
if (inherits(x, "data.frame") && !inherits(x, "tbl_df")) {
x <- tibble::tibble(x)
}
if (rlang::is_named(x)) {
x[] <- mapply(col_mapply, x, y[["fields"]], MoreArgs = list(tests = tests), SIMPLIFY = FALSE)
return(x)
@@ -398,4 +333,3 @@ has_type <- function(fields, bqs_type) {
w <- which(grepl("type$", names(f)))
bqs_type %in% unique(f[w])
}

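For orientation, here is a minimal sketch of the conversion path this file now relies on, under stated assumptions: `raws` stands for a raw vector holding the serialized Arrow IPC stream returned by the C++ streaming layer, and `ipc_to_tibble()` is a hypothetical helper, not a function in the package:

```r
library(nanoarrow)
library(tibble)

# Hypothetical helper (not in the package). Assumes `raws` is a raw vector
# containing a serialized Arrow IPC stream: a schema message followed by
# zero or more record batch messages.
ipc_to_tibble <- function(raws) {
  # read_nanoarrow() yields an array stream. Unlike the old
  # arrow::RecordBatchStreamReader path, a schema-only stream with no
  # record batches converts cleanly to a zero-row data frame, which is
  # why the empty-table special case could be deleted above.
  stream <- nanoarrow::read_nanoarrow(raws)
  tibble::as_tibble(as.data.frame(stream))
}
```

`parse_postprocess()` then coerces INT64 columns according to `bigint`. The guard changing from `bigint != "integer64"` to `bigint != "numeric"` suggests nanoarrow materializes INT64 as doubles by default, so the numeric case is the one needing no conversion.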
58 changes: 0 additions & 58 deletions R/dbi.R

This file was deleted.

16 changes: 2 additions & 14 deletions README.Rmd
@@ -22,9 +22,9 @@ knitr::opts_chunk$set(

![Comparing bq_table_download from bigrquery to bgs_table_download from bigrquerystorage](./docs/bigrquerystorage.gif)

Use [BigQuery Storage API](https://cloud.google.com/bigquery/docs/reference/storage/rpc/google.cloud.bigquery.storage.v1) from R.

The main utility is to replace the `bigrquery::bq_table_download` method.

It supports [BigQueryRead interface](https://cloud.google.com/bigquery/docs/reference/storage/rpc/google.cloud.bigquery.storage.v1#bigqueryread).
Support for [BigQueryWrite interface](https://cloud.google.com/bigquery/docs/reference/storage/rpc/google.cloud.bigquery.storage.v1#bigquerywrite) may be added in a future release.
@@ -42,10 +42,6 @@ the raw stream into an R object.
`bqs_table_download` is the main function of this package. Other functions are helpers to
facilitate authentication and debugging.

The package also includes DBI methods for `dbFetch` and `dbReadTable`. It should be loaded after
`bigrquery`. Alternatively, use `overload_bq_table_download` to replace
`bigrquery::bq_table_download` directly in `bigrquery` namespace.

## Installation

#### CRAN
@@ -169,21 +165,13 @@ rows <- bqs_table_download(
, selected_fields = c("name", "number", "state"),
row_restriction = 'state = "WA"'
# , sample_percentage = 50
# , as_tibble = TRUE
)
sprintf(
"Got %d unique names in states: %s",
length(unique(rows$name)),
paste(unique(rows$state), collapse = " ")
)
# Replace bigrquery::bq_download_table
rows <- bigrquery::bq_table_download("bigquery-public-data.usa_names.usa_1910_current")
# Downloading 6,122,890 rows in 613 pages.
overload_bq_table_download(project_id)
rows <- bigrquery::bq_table_download("bigquery-public-data.usa_names.usa_1910_current")
# Streamed 6122890 rows in 5980 messages.
```

## Authentication
13 changes: 0 additions & 13 deletions README.md
@@ -41,11 +41,6 @@ R package to transform the raw stream into an R object.
`bqs_table_download` is the main function of this package. Other
functions are helpers to facilitate authentication and debugging.

The package also includes DBI methods for `dbFetch` and `dbReadTable`.
It should be loaded after `bigrquery`. Alternatively, use
`overload_bq_table_download` to replace `bigrquery::bq_table_download`
directly in `bigrquery` namespace.

## Installation

#### CRAN
@@ -176,21 +171,13 @@ rows <- bqs_table_download(
, selected_fields = c("name", "number", "state"),
row_restriction = 'state = "WA"'
# , sample_percentage = 50
# , as_tibble = TRUE
)

sprintf(
"Got %d unique names in states: %s",
length(unique(rows$name)),
paste(unique(rows$state), collapse = " ")
)

# Replace bigrquery::bq_download_table
rows <- bigrquery::bq_table_download("bigquery-public-data.usa_names.usa_1910_current")
# Downloading 6,122,890 rows in 613 pages.
overload_bq_table_download(project_id)
rows <- bigrquery::bq_table_download("bigquery-public-data.usa_names.usa_1910_current")
# Streamed 6122890 rows in 5980 messages.
```

## Authentication
(Diffs for the remaining 5 changed files not shown.)
