WIP: Maditr #231

Open
wants to merge 6 commits into base: master
4 changes: 2 additions & 2 deletions DESCRIPTION
@@ -51,8 +51,8 @@ Suggests:
speedglm,
broom,
learnr,
ggplot2,
tidyfast (>= 0.1.8)
ggplot2,
maditr (>= 0.6.3)
LinkingTo:
Rcpp
RoxygenNote: 7.0.2
23 changes: 23 additions & 0 deletions NAMESPACE
@@ -17,6 +17,8 @@ S3method(compute,disk.frame)
S3method(delayed,disk.frame)
S3method(distinct,disk.frame)
S3method(do,disk.frame)
S3method(dt_filter,disk.frame)
S3method(dt_select,disk.frame)
S3method(filter,disk.frame)
S3method(full_join,disk.frame)
S3method(get_chunk,disk.frame)
Expand All @@ -37,6 +39,7 @@ S3method(lazy,disk.frame)
S3method(left_join,disk.frame)
S3method(length,chunk_agg.disk.frame)
S3method(length,collected_agg.disk.frame)
S3method(let,disk.frame)
S3method(map,default)
S3method(map,disk.frame)
S3method(map2,default)
@@ -74,6 +77,8 @@ S3method(summarise,grouped_disk.frame)
S3method(summarize,disk.frame)
S3method(summarize,grouped_disk.frame)
S3method(tail,disk.frame)
S3method(take,disk.frame)
S3method(take_if,disk.frame)
S3method(tbl_vars,disk.frame)
S3method(transmute,disk.frame)
export(IQR.chunk_agg.disk.frame)
@@ -85,6 +90,8 @@ export(as.disk.frame)
export(ceremony_text)
export(chunk_arrange)
export(chunk_distinct)
export(chunk_dt_arrange.disk.frame)
export(chunk_dt_summarize.disk.frame)
export(chunk_group_by)
export(chunk_lapply)
export(chunk_summarise)
@@ -108,6 +115,10 @@ export(df_ram_size)
export(dfglm)
export(disk.frame)
export(distribute)
export(dt_full_join.disk.frame)
export(dt_inner_join.disk.frame)
export(dt_left_join.disk.frame)
export(dt_mutate.disk.frame)
export(evalparseglue)
export(filter_all.disk.frame)
export(filter_at.disk.frame)
@@ -184,6 +195,7 @@ importFrom(data.table,foverlaps)
importFrom(data.table,fread)
importFrom(data.table,rbindlist)
importFrom(data.table,setDT)
importFrom(data.table,setkey)
importFrom(data.table,setkeyv)
importFrom(data.table,timetaken)
importFrom(dplyr,add_count)
@@ -253,6 +265,17 @@ importFrom(globals,findGlobals)
importFrom(glue,glue)
importFrom(jsonlite,fromJSON)
importFrom(jsonlite,toJSON)
importFrom(maditr,dt_arrange)
importFrom(maditr,dt_filter)
importFrom(maditr,dt_full_join)
importFrom(maditr,dt_inner_join)
importFrom(maditr,dt_left_join)
importFrom(maditr,dt_mutate)
importFrom(maditr,dt_select)
importFrom(maditr,dt_summarize)
importFrom(maditr,let)
importFrom(maditr,take)
importFrom(maditr,take_if)
importFrom(pryr,do_call)
importFrom(pryr,object_size)
importFrom(purrr,as_mapper)
22 changes: 0 additions & 22 deletions R/dplyr_verbs.r
@@ -128,22 +128,6 @@ chunk_summarize <- create_chunk_mapper(dplyr::summarize)
chunk_summarise <- create_chunk_mapper(dplyr::summarise)


#' @export
#' @importFrom dplyr summarize
#' @rdname dplyr_verbs
summarize.disk.frame <- function(...) {
# comment summarize.grouped_disk.frame
stop("`summarize.disk.frame` has been removed. Please use `chunk_summarize` instead. This is in preparation for a more powerful `group_by` framework")
}



#' @export
#' @importFrom dplyr summarize
#' @rdname dplyr_verbs
summarise.disk.frame <- summarize.disk.frame


#' @export
#' @rdname dplyr_verbs
#' @importFrom dplyr do
@@ -289,12 +273,6 @@ groups.disk.frame <- function(x){
#' @param .data a disk.frame
#' @param ... same as the dplyr::group_by
#' @export
#' @rdname group_by
group_by.disk.frame <- function(...) {
stop("`arrange.disk.frame` has been removed. Please use `chunk_arrange` instead. This is preparation for a more powerful `group_by` framework")
}


#' @rdname group_by
#' @export
chunk_group_by <- create_chunk_mapper(dplyr::group_by)
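With the plain `summarize.disk.frame` and `group_by.disk.frame` methods removed from this file (they move to `R/two-stage-verbs.R` below), the `chunk_*` verbs remain the explicit per-chunk path. A hedged usage sketch, not part of this diff, with illustrative data and column names: because each chunk is aggregated independently, a second in-memory aggregation after `collect()` is still required to get a global answer.

```r
library(disk.frame)
library(dplyr)
setup_disk.frame()

cars.df <- as.disk.frame(mtcars)          # illustrative data

# stage 1: aggregate within each chunk (one partial row per group per chunk)
partial <- cars.df %>%
  chunk_group_by(cyl) %>%
  chunk_summarize(sum_mpg = sum(mpg), n = n()) %>%
  collect()

# stage 2: combine the per-chunk partials in memory to get the global mean
partial %>%
  group_by(cyl) %>%
  summarize(mean_mpg = sum(sum_mpg) / sum(n))

delete(cars.df)                           # clean up
```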
45 changes: 45 additions & 0 deletions R/maditr_verbs.r
@@ -0,0 +1,45 @@
#' Verbs from maditr
#' @rdname maditr_verbs
#' @importFrom maditr let
#' @export
let.disk.frame <- create_chunk_mapper(maditr::let, as.data.frame = FALSE)

#' @export
#' @importFrom maditr dt_mutate
dt_mutate.disk.frame <- create_chunk_mapper(maditr::dt_mutate, as.data.frame = FALSE)

#' @export
#' @importFrom maditr dt_summarize
chunk_dt_summarize.disk.frame <- create_chunk_mapper(maditr::dt_summarize, as.data.frame = FALSE)

#' @export
#' @importFrom maditr dt_filter
dt_filter.disk.frame <- create_chunk_mapper(maditr::dt_filter, as.data.frame = FALSE)

#' @export
#' @importFrom maditr dt_select
dt_select.disk.frame <- create_chunk_mapper(maditr::dt_select, as.data.frame = FALSE)

#' @export
#' @importFrom maditr dt_arrange
chunk_dt_arrange.disk.frame <- create_chunk_mapper(maditr::dt_arrange, as.data.frame = FALSE)

#' @export
#' @importFrom maditr take
take.disk.frame <- create_chunk_mapper(maditr::take, as.data.frame = FALSE)

#' @export
#' @importFrom maditr take_if
take_if.disk.frame <- create_chunk_mapper(maditr::take_if, as.data.frame = FALSE)

#' @export
#' @importFrom maditr dt_inner_join
dt_inner_join.disk.frame <- create_chunk_mapper(maditr::dt_inner_join, as.data.frame = FALSE)

#' @export
#' @importFrom maditr dt_left_join
dt_left_join.disk.frame <- create_chunk_mapper(maditr::dt_left_join, as.data.frame = FALSE)

#' @export
#' @importFrom maditr dt_full_join
dt_full_join.disk.frame <- create_chunk_mapper(maditr::dt_full_join, as.data.frame = FALSE)
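A hypothetical usage sketch of the maditr verbs wired up in this file (not part of the diff; data and column names are made up, and it assumes the S3 methods dispatch as registered in NAMESPACE). Because `create_chunk_mapper` applies each verb chunk by chunk, row-wise verbs such as `let` and `dt_filter` behave as expected, while aggregating verbs such as `take` or `chunk_dt_summarize` return per-chunk results that still need a second pass after `collect()`.

```r
library(disk.frame)
library(maditr)
setup_disk.frame()

cars.df <- as.disk.frame(mtcars)     # illustrative data

res <- cars.df %>%
  let(kpl = mpg * 0.425) %>%         # add a column within each chunk
  dt_filter(cyl > 4) %>%             # row-wise filter, safe to apply per chunk
  collect()                          # bring the filtered rows back into RAM

delete(cars.df)                      # clean up
```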
2 changes: 0 additions & 2 deletions R/map.r
@@ -164,7 +164,6 @@ map_dfr.disk.frame <- function(.x, .f, ..., .id = NULL, use.names = fill, fill =
#'
#' # clean up cars.df
#' delete(cars.df)
#' @rdname map
imap <- function(.x, .f, ...) {
UseMethod("imap")
}
@@ -269,7 +268,6 @@ delayed <- function(.x, .f, ...) {
}

#' @export
#' @rdname map
delayed.disk.frame <- function(.x, .f, ...) {
map.disk.frame(.x, .f, ..., lazy = TRUE)
}
126 changes: 89 additions & 37 deletions R/two-stage-verbs.R
@@ -163,13 +163,74 @@ IQR.collected_agg.disk.frame <- function(listx, ...) {

#' A function to parse the summarize function
#' @importFrom dplyr filter select pull
#' @imporFrom purr map_dfr
#' @importFrom purrr map_dfr
#' @export
summarise.grouped_disk.frame <- function(.data, ...) {
ca_code = generate_summ_code(...)

chunk_summ_code = ca_code$chunk_summ_code
agg_summ_code = ca_code$agg_summ_code

# get the by variables
group_by_cols = purrr::map_chr(attr(.data, "group_by_cols"), ~{deparse(.x)})

list(group_by_cols = group_by_cols, chunk_summ_code = chunk_summ_code, agg_summ_code = agg_summ_code)

# generate full code
code_to_run = glue::glue("chunk_group_by({group_by_cols}) %>% chunk_summarize({chunk_summ_code}) %>% collect %>% group_by({group_by_cols}) %>% summarize({agg_summ_code})")

class(.data) <- c("summarized_disk.frame", "disk.frame")
attr(.data, "summarize_code") = code_to_run
.data
}

#' @export
summarize.grouped_disk.frame = summarise.grouped_disk.frame

#' Group by within each disk.frame
#' @description
#' The disk.frame group-by operation performs grouping WITHIN each chunk. This is
#' often done for performance reasons. If the user wishes to group across the
#' whole disk.frame, they may choose to use the `hard_group_by` function, which is
#' expensive as it reorganizes the chunks by the shard key.
#' @seealso hard_group_by
#' @param .data a disk.frame
#' @param ... same as the dplyr::group_by
#' @export
#' @rdname group_by
# learning from https://docs.dask.org/en/latest/dataframe-groupby.html
group_by.disk.frame <- function(.data, ..., add = FALSE, .drop = group_by_drop_default(.data)) {
class(.data) <- c("grouped_disk.frame", "disk.frame")
attr(.data, "group_by_cols") = substitute(list(...))[-1]
.data
}

#' @export
#' @importFrom dplyr summarize
#' @rdname dplyr_verbs
summarize.disk.frame <- function(.data, ...) {
# comment summarize.grouped_disk.frame
warning("`summarize.disk.frame`'s behaviour has changed. Please use `chunk_summarize` if you wish to `dplyr::summarize` to each chunk")

ca_code = generate_summ_code(...)

chunk_summ_code = ca_code$chunk_summ_code
agg_summ_code = ca_code$agg_summ_code

# generate full code
code_to_run = glue::glue("chunk_summarize({chunk_summ_code}) %>% collect %>% summarize({agg_summ_code})")

class(.data) <- c("summarized_disk.frame", "disk.frame")
attr(.data, "summarize_code") = code_to_run
.data
}

#' Helper function to generate summarisation code
#' @importFrom data.table setDT setkey
generate_summ_code <- function(...) {
code = substitute(list(...))[-1]
expr_id = 0
temp_varn = 0
#browser()

list_of_chunk_agg_fns <- as.character(methods(class = "chunk_agg.disk.frame"))
list_of_collected_agg_fns <- as.character(methods(class = "collected_agg.disk.frame"))
@@ -183,12 +244,28 @@ summarise.grouped_disk.frame <- function(.data, ...) {

# search in the space to find functions name `fn`.chunk_agg.disk.frame
# only allow one such functions for now TODO improve it
#stopifnot(sum(paste0(unique(grp_funcs), ".chunk_agg.disk.frame") %in% list_of_chunk_agg_fns) == 1)
#stopifnot(sum(paste0(unique(grp_funcs), ".collected_agg.disk.frame") %in% list_of_collected_agg_fns) == 1)
stopifnot(sum(sapply(unique(grp_funcs), function(x) exists(paste0(x, ".chunk_agg.disk.frame")))) == 1)
stopifnot(sum(sapply(unique(grp_funcs), function(x) exists(paste0(x, ".collected_agg.disk.frame")))) == 1)
num_of_chunk_functions = sum(sapply(unique(grp_funcs), function(x) exists(paste0(x, ".chunk_agg.disk.frame"))))
num_of_collected_functions= sum(sapply(unique(grp_funcs), function(x) exists(paste0(x, ".collected_agg.disk.frame"))))

# the number chunk and aggregation functions must match
stopifnot(num_of_chunk_functions == num_of_collected_functions)

# keep only grp_functions
grp_funcs= grp_funcs[sapply(grp_funcs, function(x) exists(paste0(x, ".chunk_agg.disk.frame")))]

if(num_of_chunk_functions == 0) {
stop(sprintf("There must be at least one summarization function in %s", deparse(.x)))
} else if (num_of_chunk_functions > 1) {
stop(sprintf("Two or more summarisation functions are detected in \n\n```\n%s\n```\n\nThese are currently not supported by {disk.frame} at the moment \n * Nestling (like mean(sum(x) + y)) or \n * combinations (like sum(x) + mean(x))\n\nIf you want this implemented, please leave a comment or upvote at: https://github.com/xiaodaigh/disk.frame/issues/228 \n\n", deparse(.x)))
}

# check that the aggregation call sits only two levels from the root of the parse tree (parent 0),
# otherwise it would be a statement of the form 1 + mean(x), which isn't supported
data.table::setDT(gpd)
data.table::setkey(gpd, parent)
if (gpd[id == gpd[id == gpd[(text == grp_funcs) & (token == "SYMBOL_FUNCTION_CALL"), parent], parent], parent] != 0) {
stop(sprintf("Combining summarization with other operations \n\n```\n%s\n```\n\nThese are currently not supported by {disk.frame} at the moment \n * combinations (like sum(x) + 1)\n* combinations (like list(sum(x)))\n\nIf you want this implemented, please leave a comment or upvote at: https://github.com/xiaodaigh/disk.frame/issues/228 \n\n", deparse(.x)))
}

temp_varn <<- temp_varn + 1
tmpcode = deparse(evalparseglue("substitute({deparse(.x)}, list({grp_funcs} = quote({grp_funcs}.chunk_agg.disk.frame)))")) %>% paste0(collapse = " ")
@@ -201,7 +278,7 @@ summarise.grouped_disk.frame <- function(.data, ...) {
chunk_code$name = ifelse(is.null(names(code[expr_id])), "", names(code[expr_id]))

# create the aggregation code
chunk_code$agg_expr = glue::glue("{grp_funcs}.collected_agg.disk.frame({paste0(chunk_code$assign_to, collapse=', ')})")
chunk_code$agg_expr = as.character(glue::glue("{grp_funcs}.collected_agg.disk.frame({paste0(chunk_code$assign_to, collapse=', ')})"))

#print(sapply(chunk_code, typeof))
chunk_code
@@ -213,42 +290,17 @@ summarise.grouped_disk.frame <- function(.data, ...) {
select(expr_id, name, agg_expr, orig_code) %>%
unique %>%
transmute(agg_code = paste0(ifelse(name == "", paste0("`", orig_code, "` = "), paste0(name, "=")), agg_expr))

agg_summ_code = paste0(agg_code_df$agg_code, collapse = ",")

# get the by variables
group_by_cols = purrr::map_chr(attr(.data, "group_by_cols"), ~{deparse(.x)})

list(group_by_cols = group_by_cols, chunk_summ_code = chunk_summ_code, agg_summ_code = agg_summ_code)

# generate full code
code_to_run = glue::glue("chunk_group_by({group_by_cols}) %>% chunk_summarize({chunk_summ_code}) %>% collect %>% group_by({group_by_cols}) %>% summarize({agg_summ_code})")
agg_summ_code = paste0(agg_code_df$agg_code, collapse = ",")

class(.data) <- c("summarized_disk.frame", "disk.frame")
attr(.data, "summarize_code") = code_to_run
.data
list(chunk_summ_code = chunk_summ_code, agg_summ_code = agg_summ_code)
}

#' @export
summarize.grouped_disk.frame = summarise.grouped_disk.frame

#' Group by within each disk.frame
#' @description
#' The disk.frame group by operation perform group WITHIN each chunk. This is
#' often used for performance reasons. If the user wishes to perform group-by,
#' they may choose to use the `hard_group_by` function which is expensive as it
#' reorganizes the chunks by the shard key.
#' @seealso hard_group_by
#' @param .data a disk.frame
#' @param ... same as the dplyr::group_by
#' @export
#' @rdname group_by
# learning from https://docs.dask.org/en/latest/dataframe-groupby.html
group_by.disk.frame <- function(.data, ..., add = FALSE, .drop = group_by_drop_default(.data)) {
class(.data) <- c("grouped_disk.frame", "disk.frame")
attr(.data, "group_by_cols") = substitute(list(...))[-1]
.data
}
#' @importFrom dplyr summarize
#' @rdname dplyr_verbs
summarise.disk.frame <- summarize.disk.frame



Expand Down
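A hedged illustration of the deferred flow introduced in this file (illustrative data and output; it assumes `mean()` is one of the supported two-stage aggregations). `group_by.disk.frame` only records the grouping columns as an attribute, and `summarize`/`summarise` parse each expression, split it into a chunk-level `*.chunk_agg.disk.frame` call plus a collected-level `*.collected_agg.disk.frame` call, and store the generated pipeline string in the `summarize_code` attribute rather than running it immediately.

```r
library(disk.frame)
library(dplyr)

df <- as.disk.frame(mtcars)          # illustrative data

res <- df %>%
  group_by(cyl) %>%                  # tags the disk.frame with group_by_cols
  summarize(avg_mpg = mean(mpg))     # generates, but does not run, the two-stage code

class(res)
#> e.g. "summarized_disk.frame" "disk.frame"

attr(res, "summarize_code")
#> roughly: chunk_group_by(cyl) %>% chunk_summarize(...) %>% collect %>%
#>   group_by(cyl) %>% summarize(...)
```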
5 changes: 3 additions & 2 deletions book/01-intro.Rmd
@@ -8,9 +8,10 @@ vignette: >
%\VignetteEncoding{UTF-8}
---

# The story of how `disk.frame` came to be
# The story of how `{disk.frame}` came to be
I was working at one of Australia's biggest banks and their shiny new SAS server was experiencing huge instability issues. As a result, we had to run SAS on our laptops to perform huge amounts of data manipulation. A simple SQL query could take around 20 minutes.

I had enough.

That's why I created `disk.frame` - a larger-than-RAM data manipulation framework for R. The same query now only takes 10 seconds.
That's why I created `disk.frame` - a larger-than-RAM data manipulation framework for R. The same query now only takes 10 seconds.
