diff --git a/include/pgduckdb/pg/transactions.hpp b/include/pgduckdb/pg/transactions.hpp index aa108ead..a4ece569 100644 --- a/include/pgduckdb/pg/transactions.hpp +++ b/include/pgduckdb/pg/transactions.hpp @@ -32,8 +32,8 @@ typedef enum { typedef void (*SubXactCallback)(SubXactEvent event, SubTransactionId mySubid, SubTransactionId parentSubid, void *arg); } -namespace pgduckdb { -bool PostgresDidWalWrites(); +namespace pgduckdb::pg { +bool DidWalWrites(); CommandId GetCurrentCommandId(bool used = false); bool IsInTransactionBlock(bool top_level); void PreventInTransactionBlock(bool is_top_level, const char *statement_type); @@ -41,4 +41,4 @@ void RegisterXactCallback(XactCallback callback, void *arg); void UnregisterXactCallback(XactCallback callback, void *arg); void RegisterSubXactCallback(SubXactCallback callback, void *arg); void UnregisterSubXactCallback(SubXactCallback callback, void *arg); -} // namespace pgduckdb +} // namespace pgduckdb::pg diff --git a/include/pgduckdb/pgduckdb_duckdb.hpp b/include/pgduckdb/pgduckdb_duckdb.hpp index 70ce9eb8..05005939 100644 --- a/include/pgduckdb/pgduckdb_duckdb.hpp +++ b/include/pgduckdb/pgduckdb_duckdb.hpp @@ -4,8 +4,10 @@ namespace pgduckdb { -bool DuckdbDidWrites(); -bool DuckdbDidWrites(duckdb::ClientContext &context); +namespace ddb { +bool DidWrites(); +bool DidWrites(duckdb::ClientContext &context); +} // namespace ddb class DuckDBManager { public: diff --git a/include/pgduckdb/pgduckdb_xact.hpp b/include/pgduckdb/pgduckdb_xact.hpp index fd61f58c..10fdad50 100644 --- a/include/pgduckdb/pgduckdb_xact.hpp +++ b/include/pgduckdb/pgduckdb_xact.hpp @@ -1,8 +1,12 @@ namespace pgduckdb { + +namespace pg { +bool IsInTransactionBlock(); +void PreventInTransactionBlock(const char *statement_type); +} // namespace pg + void ClaimCurrentCommandId(); void RegisterDuckdbXactCallback(); void AutocommitSingleStatementQueries(); void MarkStatementNotTopLevel(); -bool IsInTransactionBlock(); -void PreventInTransactionBlock(const char *statement_type); } // namespace pgduckdb diff --git a/src/pg/transactions.cpp b/src/pg/transactions.cpp index 030f8f7d..63a39ae5 100644 --- a/src/pg/transactions.cpp +++ b/src/pg/transactions.cpp @@ -6,10 +6,10 @@ extern "C" { #include "access/xlog.h" // XactLastRecEnd } -namespace pgduckdb { +namespace pgduckdb::pg { bool -PostgresDidWalWrites() { +DidWalWrites() { return XactLastRecEnd != InvalidXLogRecPtr; } @@ -48,4 +48,4 @@ UnregisterSubXactCallback(SubXactCallback callback, void *arg) { return PostgresFunctionGuard(::UnregisterSubXactCallback, callback, arg); } -} // namespace pgduckdb +} // namespace pgduckdb::pg diff --git a/src/pgduckdb_ddl.cpp b/src/pgduckdb_ddl.cpp index 263cb1cd..ccce799e 100644 --- a/src/pgduckdb_ddl.cpp +++ b/src/pgduckdb_ddl.cpp @@ -100,8 +100,12 @@ static void DuckdbUtilityHook_Cpp(PlannedStmt *pstmt, const char *query_string, bool read_only_tree, ProcessUtilityContext context, ParamListInfo params, struct QueryEnvironment *query_env, DestReceiver *dest, QueryCompletion *qc) { + if (!pgduckdb::IsExtensionRegistered()) { + return prev_process_utility_hook(pstmt, query_string, read_only_tree, context, params, query_env, dest, qc); + } + Node *parsetree = pstmt->utilityStmt; - if (pgduckdb::IsExtensionRegistered() && IsA(parsetree, CopyStmt)) { + if (IsA(parsetree, CopyStmt)) { auto copy_query = PostgresFunctionGuard(MakeDuckdbCopyQuery, pstmt, query_string, query_env); if (copy_query) { auto res = pgduckdb::DuckDBQueryOrThrow(copy_query); @@ -127,9 +131,7 @@ DuckdbUtilityHook_Cpp(PlannedStmt *pstmt, const char *query_string, bool read_on bool prev_top_level_ddl = top_level_ddl; top_level_ddl = context == PROCESS_UTILITY_TOPLEVEL; - if (pgduckdb::IsExtensionRegistered()) { - DuckdbHandleDDL(parsetree); - } + DuckdbHandleDDL(parsetree); prev_process_utility_hook(pstmt, query_string, read_only_tree, context, params, query_env, dest, qc); top_level_ddl = prev_top_level_ddl; diff --git a/src/pgduckdb_duckdb.cpp b/src/pgduckdb_duckdb.cpp index bf155904..6eba8b5e 100644 --- a/src/pgduckdb_duckdb.cpp +++ b/src/pgduckdb_duckdb.cpp @@ -1,5 +1,6 @@ #include "pgduckdb/pgduckdb_duckdb.hpp" #include "duckdb.hpp" +#include "duckdb/common/exception.hpp" #include "duckdb/parser/parsed_data/create_table_function_info.hpp" #include "pgduckdb/pgduckdb_guc.h" #include "duckdb/main/extension_util.hpp" @@ -76,23 +77,25 @@ CreateOrGetDirectoryPath(const char *directory_name) { return duckdb_data_directory; } +namespace ddb { bool -DuckdbDidWrites() { +DidWrites() { if (!DuckDBManager::IsInitialized()) { return false; } auto connection = DuckDBManager::GetConnectionUnsafe(); auto &context = *connection->context; - return DuckdbDidWrites(context); + return DidWrites(context); } bool -DuckdbDidWrites(duckdb::ClientContext &context) { +DidWrites(duckdb::ClientContext &context) { if (!context.transaction.HasActiveTransaction()) { return false; } return context.ActiveTransaction().ModifiedDatabase() != nullptr; } +} // namespace ddb DuckDBManager DuckDBManager::manager_instance; @@ -335,13 +338,18 @@ DuckDBManager::GetConnection(bool force_transaction) { if (!context.transaction.HasActiveTransaction()) { if (IsSubTransaction()) { - elog(ERROR, "SAVEPOINT is not supported in DuckDB"); + throw duckdb::NotImplementedException("SAVEPOINT and subtransactions are not supported in DuckDB"); } - if (IsInTransactionBlock() || force_transaction) { + + if (force_transaction || pg::IsInTransactionBlock()) { /* * We only want to open a new DuckDB transaction if we're already * in a Postgres transaction block. Always opening a transaction - * incurs a performance penalty when connecting to + * incurs a significant performance penalty for single statement + * queries on MotherDuck. This is because a second round-trip is + * needed to send the COMMIT to MotherDuck when Postgres its + * transaction finishes. So we only want to do this when actually + * necessary. */ instance.connection->BeginTransaction(); } diff --git a/src/pgduckdb_hooks.cpp b/src/pgduckdb_hooks.cpp index 0e2b7700..23b80b23 100644 --- a/src/pgduckdb_hooks.cpp +++ b/src/pgduckdb_hooks.cpp @@ -150,8 +150,8 @@ IsAllowedStatement(Query *query, bool throw_error = false) { elog(elevel, "DuckDB does not support modififying Postgres tables"); return false; } - if (pgduckdb::IsInTransactionBlock(true)) { - if (pgduckdb::PostgresDidWalWrites()) { + if (pgduckdb::pg::IsInTransactionBlock(true)) { + if (pgduckdb::pg::DidWalWrites()) { elog(elevel, "Writing to DuckDB and Postgres tables in the same transaction block is not supported"); return false; } @@ -203,7 +203,8 @@ DuckdbPlannerHook_Cpp(Query *parse, const char *query_string, int cursor_options } /* If we can't create a plan, we'll fall back to Postgres */ } - if (parse->commandType != CMD_SELECT && pgduckdb::DuckdbDidWrites() && pgduckdb::IsInTransactionBlock(true)) { + if (parse->commandType != CMD_SELECT && pgduckdb::ddb::DidWrites() && + pgduckdb::pg::IsInTransactionBlock(true)) { elog(ERROR, "Writing to DuckDB and Postgres tables in the same transaction block is not supported"); } } @@ -320,7 +321,7 @@ DuckdbExecutorFinishHook_Cpp(QueryDesc *queryDesc) { return; } - if (!pgduckdb::DuckdbDidWrites()) { + if (!pgduckdb::ddb::DidWrites()) { return; } diff --git a/src/pgduckdb_options.cpp b/src/pgduckdb_options.cpp index 5c013159..88904a59 100644 --- a/src/pgduckdb_options.cpp +++ b/src/pgduckdb_options.cpp @@ -302,7 +302,7 @@ DECLARE_PG_FUNCTION(pgduckdb_recycle_ddb) { * transaction might have already started. Recycling the database will * violate our assumptions about DuckDB its transaction lifecycle */ - pgduckdb::PreventInTransactionBlock("duckdb.recycle_ddb()"); + pgduckdb::pg::PreventInTransactionBlock("duckdb.recycle_ddb()"); pgduckdb::DuckDBManager::Get().Reset(); PG_RETURN_BOOL(true); } diff --git a/src/pgduckdb_planner.cpp b/src/pgduckdb_planner.cpp index 75234e3c..c982c49e 100644 --- a/src/pgduckdb_planner.cpp +++ b/src/pgduckdb_planner.cpp @@ -121,8 +121,8 @@ DuckdbPlanNode(Query *parse, const char *query_string, int cursor_options, Param } /* - * If creating a plan for a scrollable cursor, make sure it can run - * backwards on demand. Add a Material node at the top at need. + * If creating a plan for a scrollable cursor add a Material node at the + * top because or CustomScan does not support backwards scanning. */ if (cursor_options & CURSOR_OPT_SCROLL) { duckdb_plan = materialize_finished_plan(duckdb_plan); diff --git a/src/pgduckdb_xact.cpp b/src/pgduckdb_xact.cpp index ca27cbe8..e8d43fc8 100644 --- a/src/pgduckdb_xact.cpp +++ b/src/pgduckdb_xact.cpp @@ -8,6 +8,8 @@ namespace pgduckdb { static int64_t duckdb_command_id = -1; static bool top_level_statement = true; +namespace pg { + /* * Returns if we're currently in a transaction block. To determine if we are in * a function or not, this uses the tracked top_level_statement variable. @@ -26,6 +28,23 @@ PreventInTransactionBlock(const char *statement_type) { PreventInTransactionBlock(top_level_statement, statement_type); } +/* + * Check if Postgres did any writes at the end of a transaction. + * + * We do this by both checking if there were any WAL writes and as an added + * measure if the current command id was incremented more than once after the + * last known DuckDB command. + * + * IMPORTANT: This function should only be called at trasaction commit. At + * other points in the transaction lifecycle its return value is not reliable. + */ +static bool +DidWritesAtTransactionEnd() { + return pg::DidWalWrites() || pg::GetCurrentCommandId() > duckdb_command_id + 1; +} + +} // namespace pg + /* * Claim the current command id as being executed by a DuckDB write query. * @@ -50,13 +69,13 @@ ClaimCurrentCommandId() { * cross-database write. */ bool used = duckdb_command_id == -1; - CommandId new_command_id = GetCurrentCommandId(used); + CommandId new_command_id = pg::GetCurrentCommandId(used); if (new_command_id == duckdb_command_id) { return; } - if (!IsInTransactionBlock()) { + if (!pg::IsInTransactionBlock()) { /* * Allow mixed writes outside of a transaction block, this is needed * for DDL. @@ -97,28 +116,13 @@ MarkStatementNotTopLevel() { */ void AutocommitSingleStatementQueries() { - if (IsInTransactionBlock()) { + if (pg::IsInTransactionBlock()) { /* We're in a transaction block, we can just execute the query */ return; } - PreventInTransactionBlock(top_level_statement, - "BUG: You should never see this error we checked IsInTransactionBlock before."); -} - -/* - * Check if Postgres did any writes at the end of a transaction. - * - * We do this by both checking if there were any WAL writes and as an added - * measure if the current command id was incremented more than once after the - * last known DuckDB command. - * - * IMPORTANT: This function should only be called at trasaction commit. At - * other points in the transaction lifecycle its return value is not reliable. - */ -static bool -PostgresDidWritesAtTransactionEnd() { - return PostgresDidWalWrites() || GetCurrentCommandId() > duckdb_command_id + 1; + pg::PreventInTransactionBlock(top_level_statement, + "BUG: You should never see this error we checked IsInTransactionBlock before."); } static void @@ -144,8 +148,8 @@ DuckdbXactCallback_Cpp(XactEvent event) { switch (event) { case XACT_EVENT_PRE_COMMIT: case XACT_EVENT_PARALLEL_PRE_COMMIT: - if (IsInTransactionBlock(top_level_statement)) { - if (PostgresDidWritesAtTransactionEnd() && DuckdbDidWrites(context)) { + if (pg::IsInTransactionBlock(top_level_statement)) { + if (pg::DidWritesAtTransactionEnd() && ddb::DidWrites(context)) { throw duckdb::NotImplementedException( "Writing to DuckDB and Postgres tables in the same transaction block is not supported"); } @@ -227,8 +231,8 @@ RegisterDuckdbXactCallback() { if (transaction_handler_configured) { return; } - RegisterXactCallback(DuckdbXactCallback, nullptr); - RegisterSubXactCallback(DuckdbSubXactCallback, nullptr); + pg::RegisterXactCallback(DuckdbXactCallback, nullptr); + pg::RegisterSubXactCallback(DuckdbSubXactCallback, nullptr); transaction_handler_configured = true; } } // namespace pgduckdb diff --git a/test/regression/expected/transaction_errors.out b/test/regression/expected/transaction_errors.out index 98ba8923..fa8f7c07 100644 --- a/test/regression/expected/transaction_errors.out +++ b/test/regression/expected/transaction_errors.out @@ -20,5 +20,4 @@ LINE 1: SELECT * FROM do_not_exist; ^ END; WARNING: pgduckdb: IsExtensionRegistered called in an aborted transaction -WARNING: pgduckdb: IsExtensionRegistered called in an aborted transaction CREATE EXTENSION pg_duckdb; diff --git a/test/regression/expected/transactions.out b/test/regression/expected/transactions.out index 6965e88b..4ebebfe7 100644 --- a/test/regression/expected/transactions.out +++ b/test/regression/expected/transactions.out @@ -62,7 +62,7 @@ ROLLBACK;; BEGIN; SAVEPOINT my_savepoint; INSERT INTO t_ddb VALUES (2); -ERROR: SAVEPOINT is not supported in DuckDB +ERROR: (PGDuckDB/DuckdbPlanNode) Not implemented Error: SAVEPOINT and subtransactions are not supported in DuckDB ROLLBACK;; -- Unless the subtransaction is already completed BEGIN;