Skip to content

Commit

Permalink
Extract mutate_col() from mutate_cols()
Browse files Browse the repository at this point in the history
  • Loading branch information
lionel- committed Aug 30, 2022
1 parent 157261d commit 54f8218
Showing 1 changed file with 169 additions and 165 deletions.
334 changes: 169 additions & 165 deletions R/mutate.R
Original file line number Diff line number Diff line change
Expand Up @@ -230,199 +230,203 @@ mutate_cols <- function(.data, dots, caller_env, error_call = caller_env()) {
on.exit(context_poke("column", old_current_column), add = TRUE)
on.exit(mask$forget(), add = TRUE)

rows <- mask$get_rows()

new_columns <- set_names(list(), character())

withCallingHandlers({
withCallingHandlers(
for (i in seq_along(dots)) {
context_poke("column", old_current_column)
new_columns <- mutate_col(i, .data, dots, mask, new_columns)
},
error = function(e) {
local_error_context(dots = dots, .index = i, mask = mask)

bullets <- c(
cnd_bullet_header("computing"),
mutate_bullets(e)
)

abort(
bullets,
class = "dplyr:::mutate_error",
parent = skip_internal_condition(e),
bullets = bullets,
call = error_call
)
},
warning = function(w) {
# Check if there is an upstack calling handler that would muffle
# the warning. This avoids doing the expensive work below for a
# silenced warning (#5675).
if (check_muffled_warning(w)) {
maybe_restart("muffleWarning")
}

# get results from all the quosures that are expanded from ..i
# then ingest them after
quosures <- expand_across(dots[[i]])
quosures_results <- vector(mode = "list", length = length(quosures))
local_error_context(dots = dots, .index = i, mask = mask)

for (k in seq_along(quosures)) {
quo <- quosures[[k]]
quo_data <- attr(quo, "dplyr:::data")
if (!is.null(quo_data$column)) {
context_poke("column", quo_data$column)
}
# a list in which each element is the result of
# evaluating the quosure in the "sliced data mask"
# recycling it appropriately to match the group size
#
# TODO: reinject hybrid evaluation at the R level
chunks <- NULL

# result after unchopping the chunks
result <- NULL

if (quo_is_symbol(quo)){
name <- as_string(quo_get_expr(quo))

if (name %in% names(new_columns)) {
# already have result and chunks
result <- new_columns[[name]]
chunks <- mask$resolve(name)
} else if (name %in% names(.data)) {
# column from the original data
result <- .data[[name]]
chunks <- mask$resolve(name)
}
warn(c(
cnd_bullet_header("computing"),
i = cnd_header(w),
i = cnd_bullet_cur_group_label(what = "warning")
))

if (inherits(.data, "rowwise_df") && vec_is_list(result)) {
sizes <- list_sizes(result)
wrong <- which(sizes != 1)
if (length(wrong)) {
# same error as would have been generated by mask$eval_all_mutate()
group <- wrong[1L]
mask$set_current_group(group)

abort(
class = c("dplyr:::mutate_incompatible_size", "dplyr:::internal_error"),
dplyr_error_data = list(result_size = sizes[group], expected_size = 1)
)
}
result_ptype <- attr(result, "ptype", exact = TRUE)
if (length(result) == 0 && is.null(result_ptype)) {
# i.e. `vec_ptype_finalise(unspecified())` (#6369)
result <- logical()
} else {
result <- vec_unchop(result, ptype = result_ptype)
}
}
} else if (!quo_is_symbolic(quo) && !is.null(quo_get_expr(quo))) {
# constant, we still need both `result` and `chunks`
result <- quo_get_expr(quo)

result <- withCallingHandlers(
vec_recycle(result, vec_size(.data)),
error = function(cnd) {
abort(
class = c("dplyr:::mutate_constant_recycle_error", "dplyr:::internal_error"),
constant_size = vec_size(result), data_size = vec_size(.data)
)
}
)
# Cancel `w`
maybe_restart("muffleWarning")
})

chunks <- vec_chop(result, rows)
}
is_zap <- map_lgl(new_columns, inherits, "rlang_zap")
new_columns[is_zap] <- rep(list(NULL), sum(is_zap))

if (is.null(chunks)) {
if (is.null(quo_data$column)) {
chunks <- mask$eval_all_mutate(quo)
} else {
chunks <- withCallingHandlers(
mask$eval_all_mutate(quo),
error = function(cnd) {
msg <- glue("Problem while computing column `{quo_data$name_auto}`.")
abort(msg, call = call("across"), parent = cnd)
}
)
}
}
used <- mask$get_used()
names(used) <- mask$current_vars()
attr(new_columns, "used") <- used

if (is.null(chunks)) {
next
}
new_columns
}

# only unchop if needed
if (is.null(result)) {
if (length(rows) == 1) {
result <- chunks[[1]]
} else {
chunks <- dplyr_vec_cast_common(chunks, quo_data$name_auto)
result <- vec_unchop(chunks, rows)
}
}
mutate_col <- function(i, data, dots, mask, new_columns) {
rows <- mask$get_rows()

quosures_results[[k]] <- list(result = result, chunks = chunks)
# get results from all the quosures that are expanded from ..i
# then ingest them after
quosures <- expand_across(dots[[i]])
quosures_results <- vector(mode = "list", length = length(quosures))

# First pass
for (k in seq_along(quosures)) {
quo <- quosures[[k]]
quo_data <- attr(quo, "dplyr:::data")
if (!is.null(quo_data$column)) {
context_poke("column", quo_data$column)
}
# a list in which each element is the result of
# evaluating the quosure in the "sliced data mask"
# recycling it appropriately to match the group size
#
# TODO: reinject hybrid evaluation at the R level
chunks <- NULL

# result after unchopping the chunks
result <- NULL

if (quo_is_symbol(quo)){
name <- as_string(quo_get_expr(quo))

if (name %in% names(new_columns)) {
# already have result and chunks
result <- new_columns[[name]]
chunks <- mask$resolve(name)
} else if (name %in% names(data)) {
# column from the original data
result <- data[[name]]
chunks <- mask$resolve(name)
}


for (k in seq_along(quosures)) {
quo <- quosures[[k]]
quo_data <- attr(quo, "dplyr:::data")

quo_result <- quosures_results[[k]]
if (is.null(quo_result)) {
if (quo_data$is_named) {
name <- quo_data$name_given
new_columns[[name]] <- zap()
mask$remove(name)
}
next
if (inherits(data, "rowwise_df") && vec_is_list(result)) {
sizes <- list_sizes(result)
wrong <- which(sizes != 1)
if (length(wrong)) {
# same error as would have been generated by mask$eval_all_mutate()
group <- wrong[1L]
mask$set_current_group(group)

abort(
class = c("dplyr:::mutate_incompatible_size", "dplyr:::internal_error"),
dplyr_error_data = list(result_size = sizes[group], expected_size = 1)
)
}
result_ptype <- attr(result, "ptype", exact = TRUE)
if (length(result) == 0 && is.null(result_ptype)) {
# i.e. `vec_ptype_finalise(unspecified())` (#6369)
result <- logical()
} else {
result <- vec_unchop(result, ptype = result_ptype)
}
}
} else if (!quo_is_symbolic(quo) && !is.null(quo_get_expr(quo))) {
# constant, we still need both `result` and `chunks`
result <- quo_get_expr(quo)

result <- withCallingHandlers(
vec_recycle(result, vec_size(data)),
error = function(cnd) {
abort(
class = c("dplyr:::mutate_constant_recycle_error", "dplyr:::internal_error"),
constant_size = vec_size(result), data_size = vec_size(data)
)
}
)

result <- quo_result$result
chunks <- quo_result$chunks

if (!quo_data$is_named && is.data.frame(result)) {
types <- vec_ptype(result)
types_names <- names(types)
chunks_extracted <- .Call(dplyr_extract_chunks, chunks, types)
chunks <- vec_chop(result, rows)
}

for (j in seq_along(types)) {
mask$add_one(types_names[j], chunks_extracted[[j]], result = result[[j]])
if (is.null(chunks)) {
if (is.null(quo_data$column)) {
chunks <- mask$eval_all_mutate(quo)
} else {
chunks <- withCallingHandlers(
mask$eval_all_mutate(quo),
error = function(cnd) {
msg <- glue("Problem while computing column `{quo_data$name_auto}`.")
abort(msg, call = call("across"), parent = cnd)
}
)
}
}

new_columns[types_names] <- result
} else {
# treat as a single output otherwise
name <- quo_data$name_auto
mask$add_one(name = name, chunks = chunks, result = result)

new_columns[[name]] <- result
}
if (is.null(chunks)) {
next
}

# only unchop if needed
if (is.null(result)) {
if (length(rows) == 1) {
result <- chunks[[1]]
} else {
chunks <- dplyr_vec_cast_common(chunks, quo_data$name_auto)
result <- vec_unchop(chunks, rows)
}

}

},
error = function(e) {
local_error_context(dots = dots, .index = i, mask = mask)

bullets <- c(
cnd_bullet_header("computing"),
mutate_bullets(e)
)

abort(
bullets,
class = "dplyr:::mutate_error",
parent = skip_internal_condition(e),
bullets = bullets,
call = error_call
)
},
warning = function(w) {
# Check if there is an upstack calling handler that would muffle
# the warning. This avoids doing the expensive work below for a
# silenced warning (#5675).
if (check_muffled_warning(w)) {
maybe_restart("muffleWarning")
quosures_results[[k]] <- list(result = result, chunks = chunks)
}

# Second pass
for (k in seq_along(quosures)) {
quo <- quosures[[k]]
quo_data <- attr(quo, "dplyr:::data")

quo_result <- quosures_results[[k]]
if (is.null(quo_result)) {
if (quo_data$is_named) {
name <- quo_data$name_given
new_columns[[name]] <- zap()
mask$remove(name)
}
next
}

local_error_context(dots = dots, .index = i, mask = mask)
result <- quo_result$result
chunks <- quo_result$chunks

warn(c(
cnd_bullet_header("computing"),
i = cnd_header(w),
i = cnd_bullet_cur_group_label(what = "warning")
))
if (!quo_data$is_named && is.data.frame(result)) {
types <- vec_ptype(result)
types_names <- names(types)
chunks_extracted <- .Call(dplyr_extract_chunks, chunks, types)

# Cancel `w`
maybe_restart("muffleWarning")
})
for (j in seq_along(types)) {
mask$add_one(types_names[j], chunks_extracted[[j]], result = result[[j]])
}

new_columns[types_names] <- result
} else {
# treat as a single output otherwise
name <- quo_data$name_auto
mask$add_one(name = name, chunks = chunks, result = result)

new_columns[[name]] <- result
}
}

is_zap <- map_lgl(new_columns, inherits, "rlang_zap")
new_columns[is_zap] <- rep(list(NULL), sum(is_zap))
used <- mask$get_used()
names(used) <- mask$current_vars()
attr(new_columns, "used") <- used
new_columns
}

Expand Down

0 comments on commit 54f8218

Please sign in to comment.