Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: With duckdb(environment_scan = TRUE), data frame objects are available as views in duckdb SQL queries #164

Merged
merged 5 commits into from
Dec 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -111,5 +111,7 @@ src/*.dll
/src/symbols.rds
/docs/
/vscode-*/
.idea
cmake-build-debug
compile_commands.json
.cache
19 changes: 17 additions & 2 deletions R/Driver.R
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,28 @@ driver_registry <- new.env(parent = emptyenv())
#' @description
#' `duckdb()` creates or reuses a database instance.
#'
#' @param environment_scan Set to `TRUE` to treat
#' data frames from the calling environment as tables.
#' If a database table with the same name exists, it takes precedence.
#' The default of this setting may change in a future version.
#'
#' @return `duckdb()` returns an object of class [duckdb_driver-class].
#'
#' @import methods DBI
#' @export
duckdb <- function(dbdir = DBDIR_MEMORY, read_only = FALSE, bigint = "numeric", config = list()) {
duckdb <- function(
dbdir = DBDIR_MEMORY,
read_only = FALSE,
bigint = "numeric",
config = list(),
...,
environment_scan = FALSE
) {
check_flag(read_only)
check_bigint(bigint)
if (...length() > 0) {
stop("... must be empty")
}

dbdir <- path_normalize(dbdir)
if (dbdir != DBDIR_MEMORY) {
Expand Down Expand Up @@ -57,7 +72,7 @@ duckdb <- function(dbdir = DBDIR_MEMORY, read_only = FALSE, bigint = "numeric",
drv <- new(
"duckdb_driver",
config = config,
database_ref = rethrow_rapi_startup(dbdir, read_only, config),
database_ref = rethrow_rapi_startup(dbdir, read_only, config, environment_scan),
dbdir = dbdir,
read_only = read_only,
bigint = bigint
Expand Down
8 changes: 4 additions & 4 deletions R/cpp11.R
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ rapi_disconnect <- function(conn) {
invisible(.Call(`_duckdb_rapi_disconnect`, conn))
}

rapi_startup <- function(dbdir, readonly, configsexp) {
.Call(`_duckdb_rapi_startup`, dbdir, readonly, configsexp)
rapi_startup <- function(dbdir, readonly, configsexp, environment_scan) {
.Call(`_duckdb_rapi_startup`, dbdir, readonly, configsexp, environment_scan)
}

rapi_lock <- function(dual) {
Expand Down Expand Up @@ -201,8 +201,8 @@ rapi_prepare_substrait_json <- function(conn, json) {
.Call(`_duckdb_rapi_prepare_substrait_json`, conn, json)
}

rapi_prepare <- function(conn, query) {
.Call(`_duckdb_rapi_prepare`, conn, query)
rapi_prepare <- function(conn, query, env) {
.Call(`_duckdb_rapi_prepare`, conn, query, env)
}

rapi_bind <- function(stmt, params, arrow, integer64) {
Expand Down
4 changes: 2 additions & 2 deletions R/dbConnect__duckdb_driver.R
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@
#' @param dbdir Location for database files. Should be a path to an existing
#' directory in the file system. With the default (or `""`), all
#' data is kept in RAM.
#' @param ... Ignored
#' @param debug Print additional debug information such as queries
#' @param ... Reserved for future extensions, must be empty.
#' @param debug Print additional debug information, such as queries.
#' @param read_only Set to `TRUE` for read-only operation.
#' For file-based databases, this is only applied when the database file is opened for the first time.
#' Subsequent connections (via the same `drv` object or a `drv` object pointing to the same path)
Expand Down
21 changes: 20 additions & 1 deletion R/dbSendQuery__duckdb_connection_character.R
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,11 @@ dbSendQuery__duckdb_connection_character <- function(conn, statement, params = N
if (conn@debug) {
message("Q ", statement)
}

env <- find_caller()

statement <- enc2utf8(statement)
stmt_lst <- rethrow_rapi_prepare(conn@conn_ref, statement)
stmt_lst <- rethrow_rapi_prepare(conn@conn_ref, statement, env)

res <- duckdb_result(
connection = conn,
Expand All @@ -24,3 +27,19 @@ dbSendQuery__duckdb_connection_character <- function(conn, statement, params = N
#' @rdname duckdb_connection-class
#' @export
setMethod("dbSendQuery", c("duckdb_connection", "character"), dbSendQuery__duckdb_connection_character)

find_caller <- function() {
i <- 3L
env <- parent.frame(i)

while (!identical(env, emptyenv())) {
env_name <- environmentName(parent.env(env))
if (!(env_name %in% c("duckdb", "DBI"))) {
return(env)
}
i <- i + 1L
env <- parent.frame(i)
}

env
}
8 changes: 4 additions & 4 deletions R/rethrow-gen.R
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@ rethrow_rapi_disconnect <- function(conn, call = parent.frame(2)) {
)
}

rethrow_rapi_startup <- function(dbdir, readonly, configsexp, call = parent.frame(2)) {
rethrow_rapi_startup <- function(dbdir, readonly, configsexp, environment_scan, call = parent.frame(2)) {
rlang::try_fetch(
rapi_startup(dbdir, readonly, configsexp),
rapi_startup(dbdir, readonly, configsexp, environment_scan),
error = function(e) {
rethrow_error_from_rapi(e, call)
}
Expand Down Expand Up @@ -451,9 +451,9 @@ rethrow_rapi_prepare_substrait_json <- function(conn, json, call = parent.frame(
)
}

rethrow_rapi_prepare <- function(conn, query, call = parent.frame(2)) {
rethrow_rapi_prepare <- function(conn, query, env, call = parent.frame(2)) {
rlang::try_fetch(
rapi_prepare(conn, query),
rapi_prepare(conn, query, env),
error = function(e) {
rethrow_error_from_rapi(e, call)
}
Expand Down
15 changes: 11 additions & 4 deletions man/duckdb.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

16 changes: 8 additions & 8 deletions src/cpp11.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,10 @@ extern "C" SEXP _duckdb_rapi_disconnect(SEXP conn) {
END_CPP11
}
// database.cpp
duckdb::db_eptr_t rapi_startup(std::string dbdir, bool readonly, cpp11::list configsexp);
extern "C" SEXP _duckdb_rapi_startup(SEXP dbdir, SEXP readonly, SEXP configsexp) {
duckdb::db_eptr_t rapi_startup(std::string dbdir, bool readonly, cpp11::list configsexp, bool environment_scan);
extern "C" SEXP _duckdb_rapi_startup(SEXP dbdir, SEXP readonly, SEXP configsexp, SEXP environment_scan) {
BEGIN_CPP11
return cpp11::as_sexp(rapi_startup(cpp11::as_cpp<cpp11::decay_t<std::string>>(dbdir), cpp11::as_cpp<cpp11::decay_t<bool>>(readonly), cpp11::as_cpp<cpp11::decay_t<cpp11::list>>(configsexp)));
return cpp11::as_sexp(rapi_startup(cpp11::as_cpp<cpp11::decay_t<std::string>>(dbdir), cpp11::as_cpp<cpp11::decay_t<bool>>(readonly), cpp11::as_cpp<cpp11::decay_t<cpp11::list>>(configsexp), cpp11::as_cpp<cpp11::decay_t<bool>>(environment_scan)));
END_CPP11
}
// database.cpp
Expand Down Expand Up @@ -365,10 +365,10 @@ extern "C" SEXP _duckdb_rapi_prepare_substrait_json(SEXP conn, SEXP json) {
END_CPP11
}
// statement.cpp
cpp11::list rapi_prepare(duckdb::conn_eptr_t conn, std::string query);
extern "C" SEXP _duckdb_rapi_prepare(SEXP conn, SEXP query) {
cpp11::list rapi_prepare(duckdb::conn_eptr_t conn, std::string query, cpp11::environment env);
extern "C" SEXP _duckdb_rapi_prepare(SEXP conn, SEXP query, SEXP env) {
BEGIN_CPP11
return cpp11::as_sexp(rapi_prepare(cpp11::as_cpp<cpp11::decay_t<duckdb::conn_eptr_t>>(conn), cpp11::as_cpp<cpp11::decay_t<std::string>>(query)));
return cpp11::as_sexp(rapi_prepare(cpp11::as_cpp<cpp11::decay_t<duckdb::conn_eptr_t>>(conn), cpp11::as_cpp<cpp11::decay_t<std::string>>(query), cpp11::as_cpp<cpp11::decay_t<cpp11::environment>>(env)));
END_CPP11
}
// statement.cpp
Expand Down Expand Up @@ -452,7 +452,7 @@ static const R_CallMethodDef CallEntries[] = {
{"_duckdb_rapi_list_arrow", (DL_FUNC) &_duckdb_rapi_list_arrow, 1},
{"_duckdb_rapi_load_rfuns", (DL_FUNC) &_duckdb_rapi_load_rfuns, 1},
{"_duckdb_rapi_lock", (DL_FUNC) &_duckdb_rapi_lock, 1},
{"_duckdb_rapi_prepare", (DL_FUNC) &_duckdb_rapi_prepare, 2},
{"_duckdb_rapi_prepare", (DL_FUNC) &_duckdb_rapi_prepare, 3},
{"_duckdb_rapi_prepare_substrait", (DL_FUNC) &_duckdb_rapi_prepare_substrait, 2},
{"_duckdb_rapi_prepare_substrait_json", (DL_FUNC) &_duckdb_rapi_prepare_substrait_json, 2},
{"_duckdb_rapi_ptr_to_str", (DL_FUNC) &_duckdb_rapi_ptr_to_str, 1},
Expand Down Expand Up @@ -487,7 +487,7 @@ static const R_CallMethodDef CallEntries[] = {
{"_duckdb_rapi_rel_union_all", (DL_FUNC) &_duckdb_rapi_rel_union_all, 2},
{"_duckdb_rapi_release", (DL_FUNC) &_duckdb_rapi_release, 1},
{"_duckdb_rapi_shutdown", (DL_FUNC) &_duckdb_rapi_shutdown, 1},
{"_duckdb_rapi_startup", (DL_FUNC) &_duckdb_rapi_startup, 3},
{"_duckdb_rapi_startup", (DL_FUNC) &_duckdb_rapi_startup, 4},
{"_duckdb_rapi_unlock", (DL_FUNC) &_duckdb_rapi_unlock, 1},
{"_duckdb_rapi_unregister_arrow", (DL_FUNC) &_duckdb_rapi_unregister_arrow, 2},
{"_duckdb_rapi_unregister_df", (DL_FUNC) &_duckdb_rapi_unregister_df, 2},
Expand Down
14 changes: 10 additions & 4 deletions src/database.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ static bool CastRstringToVarchar(Vector &source, Vector &result, idx_t count, Ca
return true;
}

[[cpp11::register]] duckdb::db_eptr_t rapi_startup(std::string dbdir, bool readonly, cpp11::list configsexp) {
[[cpp11::register]] duckdb::db_eptr_t rapi_startup(std::string dbdir, bool readonly, cpp11::list configsexp, bool environment_scan) {
const char *dbdirchar;

if (dbdir.length() == 0 || dbdir.compare(IN_MEMORY_PATH) == 0) {
Expand Down Expand Up @@ -47,9 +47,15 @@ static bool CastRstringToVarchar(Vector &source, Vector &result, idx_t count, Ca
try {
wrapper = new DBWrapper();

auto data = make_uniq<ArrowScanReplacementData>();
data->wrapper = wrapper;
config.replacement_scans.emplace_back(ArrowScanReplacement, std::move(data));
auto data1 = make_uniq<ReplacementDataDBWrapper>();
data1->wrapper = wrapper;
config.replacement_scans.emplace_back(ArrowScanReplacement, std::move(data1));

if (environment_scan) {
auto data2 = make_uniq<ReplacementDataDBWrapper>();
data2->wrapper = wrapper;
config.replacement_scans.emplace_back(EnvironmentScanReplacement, std::move(data2));
}
wrapper->db = make_uniq<DuckDB>(dbdirchar, &config);

auto &instance = *wrapper->db->instance;
Expand Down
6 changes: 5 additions & 1 deletion src/include/rapi.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ struct DBWrapper {
duckdb::unique_ptr<DuckDB> db;
arrow_scans_t arrow_scans;
mutex lock;
cpp11::sexp env;
cpp11::sexp registered_dfs;
};

template <class T>
Expand Down Expand Up @@ -122,7 +124,9 @@ typedef cpp11::external_pointer<RQueryResult> rqry_eptr_t;
// internal
unique_ptr<TableRef> ArrowScanReplacement(ClientContext &context, ReplacementScanInput &input, optional_ptr<ReplacementScanData> data);

struct ArrowScanReplacementData : public ReplacementScanData {
unique_ptr<TableRef> EnvironmentScanReplacement(ClientContext &context, ReplacementScanInput &input, optional_ptr<ReplacementScanData> data);

struct ReplacementDataDBWrapper : public ReplacementScanData {
DBWrapper *wrapper;
};

Expand Down
42 changes: 41 additions & 1 deletion src/register.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,46 @@ using namespace duckdb;
}
}



unique_ptr<TableRef> duckdb::EnvironmentScanReplacement(ClientContext &context, ReplacementScanInput &input, optional_ptr<ReplacementScanData> data_p) {
auto &data = (ReplacementDataDBWrapper &)*data_p;
auto db_wrapper = data.wrapper;

auto table_name_symbol = cpp11::safe[Rf_install](input.table_name.c_str());
SEXP df;
SEXP rho = db_wrapper->env;
while(rho != R_EmptyEnv) {
df = cpp11::safe[Rf_findVarInFrame3](rho, table_name_symbol, TRUE);
if (df != R_UnboundValue) {
break;
}
rho = ENCLOS(rho);
}
if (!df) {
return nullptr;
}
if (TYPEOF(df) == PROMSXP) {
df = cpp11::safe[Rf_eval](df, rho);
}
if (!Rf_inherits(df, "data.frame")) {
return nullptr;
}

// Avoid garbage collection of data frame
SEXP node = Rf_cons(df, CDR(db_wrapper->registered_dfs));
SETCDR(db_wrapper->registered_dfs, node);

// TODO: do utf conversion
auto table_function = make_uniq<TableFunctionRef>();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Or perhaps subclass TableFunctionRef ?

vector<duckdb::unique_ptr<ParsedExpression>> children;
children.push_back(
make_uniq<ConstantExpression>(Value::POINTER((uintptr_t)df)));
hannes marked this conversation as resolved.
Show resolved Hide resolved
table_function->function = make_uniq<FunctionExpression>("r_dataframe_scan", std::move(children));
return std::move(table_function);
}


class RArrowTabularStreamFactory {
public:
RArrowTabularStreamFactory(SEXP export_fun_p, SEXP arrow_scannable_p, ClientProperties config)
Expand Down Expand Up @@ -202,7 +242,7 @@ class RArrowTabularStreamFactory {

unique_ptr<TableRef> duckdb::ArrowScanReplacement(ClientContext &context, ReplacementScanInput &input, optional_ptr<ReplacementScanData> data_p) {
auto table_name = input.table_name;
ArrowScanReplacementData& data = static_cast<ArrowScanReplacementData&>(*data_p);
ReplacementDataDBWrapper& data = static_cast<ReplacementDataDBWrapper&>(*data_p);
auto db_wrapper = data.wrapper;
lock_guard<mutex> arrow_scans_lock(db_wrapper->lock);
const auto &arrow_scans = db_wrapper->arrow_scans;
Expand Down
18 changes: 14 additions & 4 deletions src/statement.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
#include "typesr.hpp"

#include <R_ext/Utils.h>
#include "httplib.hpp"

using namespace duckdb;
using namespace cpp11::literals;
Expand Down Expand Up @@ -61,9 +62,9 @@ using namespace cpp11::literals;
return StringsToSexp({json});
}

static cpp11::list construct_retlist(duckdb::unique_ptr<PreparedStatement> stmt, const string &query, idx_t n_param) {
static cpp11::list construct_retlist(duckdb::unique_ptr<PreparedStatement> stmt, const string &query, idx_t n_param, SEXP registered_dfs = R_NilValue) {
cpp11::writable::list retlist;
retlist.reserve(7);
retlist.reserve(8);
retlist.push_back({"str"_nm = query});

auto stmtholder = new RStatement();
Expand All @@ -85,6 +86,7 @@ static cpp11::list construct_retlist(duckdb::unique_ptr<PreparedStatement> stmt,
retlist.push_back({"n_param"_nm = n_param});
retlist.push_back(
{"return_type"_nm = StatementReturnTypeToString(stmtholder->stmt->GetStatementProperties().return_type)});
retlist.push_back({"registered_dfs"_nm = registered_dfs});

return retlist;
}
Expand Down Expand Up @@ -126,11 +128,19 @@ static cpp11::list construct_retlist(duckdb::unique_ptr<PreparedStatement> stmt,
return construct_retlist(std::move(stmt), "", 0);
}

[[cpp11::register]] cpp11::list rapi_prepare(duckdb::conn_eptr_t conn, std::string query) {
[[cpp11::register]] cpp11::list rapi_prepare(duckdb::conn_eptr_t conn, std::string query, cpp11::environment env) {
if (!conn || !conn.get() || !conn->conn) {
cpp11::stop("rapi_prepare: Invalid connection");
}

D_ASSERT(conn->db->env == R_NilValue);
conn->db->env = (SEXP)env;
conn->db->registered_dfs = Rf_cons(R_NilValue, R_NilValue);
duckdb_httplib::detail::scope_exit reset_db_env([&]() {
conn->db->env = R_NilValue;
conn->db->registered_dfs = R_NilValue;
});

vector<unique_ptr<SQLStatement>> statements;
try {
statements = conn->conn->ExtractStatements(query.c_str());
Expand Down Expand Up @@ -158,7 +168,7 @@ static cpp11::list construct_retlist(duckdb::unique_ptr<PreparedStatement> stmt,
stmt->error.Message().c_str());
}
auto n_param = stmt->named_param_map.size();
return construct_retlist(std::move(stmt), query, n_param);
return construct_retlist(std::move(stmt), query, n_param, conn->db->registered_dfs);
}

[[cpp11::register]] cpp11::list rapi_bind(duckdb::stmt_eptr_t stmt, cpp11::list params, bool arrow, bool integer64) {
Expand Down
Loading
Loading