Skip to content

Commit

Permalink
Apply review feedback & add pg and ddb subnamespaces
Browse files Browse the repository at this point in the history
  • Loading branch information
JelteF committed Nov 27, 2024
1 parent fd185bf commit 504a315
Show file tree
Hide file tree
Showing 12 changed files with 73 additions and 53 deletions.
6 changes: 3 additions & 3 deletions include/pgduckdb/pg/transactions.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,13 @@ typedef enum {
typedef void (*SubXactCallback)(SubXactEvent event, SubTransactionId mySubid, SubTransactionId parentSubid, void *arg);
}

namespace pgduckdb {
bool PostgresDidWalWrites();
namespace pgduckdb::pg {
bool DidWalWrites();
CommandId GetCurrentCommandId(bool used = false);
bool IsInTransactionBlock(bool top_level);
void PreventInTransactionBlock(bool is_top_level, const char *statement_type);
void RegisterXactCallback(XactCallback callback, void *arg);
void UnregisterXactCallback(XactCallback callback, void *arg);
void RegisterSubXactCallback(SubXactCallback callback, void *arg);
void UnregisterSubXactCallback(SubXactCallback callback, void *arg);
} // namespace pgduckdb
} // namespace pgduckdb::pg
6 changes: 4 additions & 2 deletions include/pgduckdb/pgduckdb_duckdb.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@

namespace pgduckdb {

bool DuckdbDidWrites();
bool DuckdbDidWrites(duckdb::ClientContext &context);
namespace ddb {
bool DidWrites();
bool DidWrites(duckdb::ClientContext &context);
} // namespace ddb

class DuckDBManager {
public:
Expand Down
8 changes: 6 additions & 2 deletions include/pgduckdb/pgduckdb_xact.hpp
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
namespace pgduckdb {

namespace pg {
bool IsInTransactionBlock();
void PreventInTransactionBlock(const char *statement_type);
} // namespace pg

void ClaimCurrentCommandId();
void RegisterDuckdbXactCallback();
void AutocommitSingleStatementQueries();
void MarkStatementNotTopLevel();
bool IsInTransactionBlock();
void PreventInTransactionBlock(const char *statement_type);
} // namespace pgduckdb
6 changes: 3 additions & 3 deletions src/pg/transactions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,10 @@ extern "C" {
#include "access/xlog.h" // XactLastRecEnd
}

namespace pgduckdb {
namespace pgduckdb::pg {

bool
PostgresDidWalWrites() {
DidWalWrites() {
return XactLastRecEnd != InvalidXLogRecPtr;
}

Expand Down Expand Up @@ -48,4 +48,4 @@ UnregisterSubXactCallback(SubXactCallback callback, void *arg) {
return PostgresFunctionGuard(::UnregisterSubXactCallback, callback, arg);
}

} // namespace pgduckdb
} // namespace pgduckdb::pg
10 changes: 6 additions & 4 deletions src/pgduckdb_ddl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -100,8 +100,12 @@ static void
DuckdbUtilityHook_Cpp(PlannedStmt *pstmt, const char *query_string, bool read_only_tree, ProcessUtilityContext context,
ParamListInfo params, struct QueryEnvironment *query_env, DestReceiver *dest,
QueryCompletion *qc) {
if (!pgduckdb::IsExtensionRegistered()) {
return prev_process_utility_hook(pstmt, query_string, read_only_tree, context, params, query_env, dest, qc);
}

Node *parsetree = pstmt->utilityStmt;
if (pgduckdb::IsExtensionRegistered() && IsA(parsetree, CopyStmt)) {
if (IsA(parsetree, CopyStmt)) {
auto copy_query = PostgresFunctionGuard(MakeDuckdbCopyQuery, pstmt, query_string, query_env);
if (copy_query) {
auto res = pgduckdb::DuckDBQueryOrThrow(copy_query);
Expand All @@ -127,9 +131,7 @@ DuckdbUtilityHook_Cpp(PlannedStmt *pstmt, const char *query_string, bool read_on
bool prev_top_level_ddl = top_level_ddl;
top_level_ddl = context == PROCESS_UTILITY_TOPLEVEL;

if (pgduckdb::IsExtensionRegistered()) {
DuckdbHandleDDL(parsetree);
}
DuckdbHandleDDL(parsetree);
prev_process_utility_hook(pstmt, query_string, read_only_tree, context, params, query_env, dest, qc);

top_level_ddl = prev_top_level_ddl;
Expand Down
20 changes: 14 additions & 6 deletions src/pgduckdb_duckdb.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#include "pgduckdb/pgduckdb_duckdb.hpp"
#include "duckdb.hpp"
#include "duckdb/common/exception.hpp"
#include "duckdb/parser/parsed_data/create_table_function_info.hpp"
#include "pgduckdb/pgduckdb_guc.h"
#include "duckdb/main/extension_util.hpp"
Expand Down Expand Up @@ -76,23 +77,25 @@ CreateOrGetDirectoryPath(const char *directory_name) {
return duckdb_data_directory;
}

namespace ddb {
bool
DuckdbDidWrites() {
DidWrites() {
if (!DuckDBManager::IsInitialized()) {
return false;
}
auto connection = DuckDBManager::GetConnectionUnsafe();
auto &context = *connection->context;
return DuckdbDidWrites(context);
return DidWrites(context);
}

bool
DuckdbDidWrites(duckdb::ClientContext &context) {
DidWrites(duckdb::ClientContext &context) {
if (!context.transaction.HasActiveTransaction()) {
return false;
}
return context.ActiveTransaction().ModifiedDatabase() != nullptr;
}
} // namespace ddb

DuckDBManager DuckDBManager::manager_instance;

Expand Down Expand Up @@ -335,13 +338,18 @@ DuckDBManager::GetConnection(bool force_transaction) {

if (!context.transaction.HasActiveTransaction()) {
if (IsSubTransaction()) {
elog(ERROR, "SAVEPOINT is not supported in DuckDB");
throw duckdb::NotImplementedException("SAVEPOINT and subtransactions are not supported in DuckDB");
}
if (IsInTransactionBlock() || force_transaction) {

if (force_transaction || pg::IsInTransactionBlock()) {
/*
* We only want to open a new DuckDB transaction if we're already
* in a Postgres transaction block. Always opening a transaction
* incurs a performance penalty when connecting to
* incurs a significant performance penalty for single statement
* queries on MotherDuck. This is because a second round-trip is
* needed to send the COMMIT to MotherDuck when Postgres its
* transaction finishes. So we only want to do this when actually
* necessary.
*/
instance.connection->BeginTransaction();
}
Expand Down
9 changes: 5 additions & 4 deletions src/pgduckdb_hooks.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -150,8 +150,8 @@ IsAllowedStatement(Query *query, bool throw_error = false) {
elog(elevel, "DuckDB does not support modififying Postgres tables");
return false;
}
if (pgduckdb::IsInTransactionBlock(true)) {
if (pgduckdb::PostgresDidWalWrites()) {
if (pgduckdb::pg::IsInTransactionBlock(true)) {
if (pgduckdb::pg::DidWalWrites()) {
elog(elevel, "Writing to DuckDB and Postgres tables in the same transaction block is not supported");
return false;
}
Expand Down Expand Up @@ -203,7 +203,8 @@ DuckdbPlannerHook_Cpp(Query *parse, const char *query_string, int cursor_options
}
/* If we can't create a plan, we'll fall back to Postgres */
}
if (parse->commandType != CMD_SELECT && pgduckdb::DuckdbDidWrites() && pgduckdb::IsInTransactionBlock(true)) {
if (parse->commandType != CMD_SELECT && pgduckdb::ddb::DidWrites() &&
pgduckdb::pg::IsInTransactionBlock(true)) {
elog(ERROR, "Writing to DuckDB and Postgres tables in the same transaction block is not supported");
}
}
Expand Down Expand Up @@ -320,7 +321,7 @@ DuckdbExecutorFinishHook_Cpp(QueryDesc *queryDesc) {
return;
}

if (!pgduckdb::DuckdbDidWrites()) {
if (!pgduckdb::ddb::DidWrites()) {
return;
}

Expand Down
2 changes: 1 addition & 1 deletion src/pgduckdb_options.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,7 @@ DECLARE_PG_FUNCTION(pgduckdb_recycle_ddb) {
* transaction might have already started. Recycling the database will
* violate our assumptions about DuckDB its transaction lifecycle
*/
pgduckdb::PreventInTransactionBlock("duckdb.recycle_ddb()");
pgduckdb::pg::PreventInTransactionBlock("duckdb.recycle_ddb()");
pgduckdb::DuckDBManager::Get().Reset();
PG_RETURN_BOOL(true);
}
Expand Down
4 changes: 2 additions & 2 deletions src/pgduckdb_planner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -121,8 +121,8 @@ DuckdbPlanNode(Query *parse, const char *query_string, int cursor_options, Param
}

/*
* If creating a plan for a scrollable cursor, make sure it can run
* backwards on demand. Add a Material node at the top at need.
* If creating a plan for a scrollable cursor add a Material node at the
* top because or CustomScan does not support backwards scanning.
*/
if (cursor_options & CURSOR_OPT_SCROLL) {
duckdb_plan = materialize_finished_plan(duckdb_plan);
Expand Down
52 changes: 28 additions & 24 deletions src/pgduckdb_xact.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ namespace pgduckdb {
static int64_t duckdb_command_id = -1;
static bool top_level_statement = true;

namespace pg {

/*
* Returns if we're currently in a transaction block. To determine if we are in
* a function or not, this uses the tracked top_level_statement variable.
Expand All @@ -26,6 +28,23 @@ PreventInTransactionBlock(const char *statement_type) {
PreventInTransactionBlock(top_level_statement, statement_type);
}

/*
* Check if Postgres did any writes at the end of a transaction.
*
* We do this by both checking if there were any WAL writes and as an added
* measure if the current command id was incremented more than once after the
* last known DuckDB command.
*
* IMPORTANT: This function should only be called at trasaction commit. At
* other points in the transaction lifecycle its return value is not reliable.
*/
static bool
DidWritesAtTransactionEnd() {
return pg::DidWalWrites() || pg::GetCurrentCommandId() > duckdb_command_id + 1;
}

} // namespace pg

/*
* Claim the current command id as being executed by a DuckDB write query.
*
Expand All @@ -50,13 +69,13 @@ ClaimCurrentCommandId() {
* cross-database write.
*/
bool used = duckdb_command_id == -1;
CommandId new_command_id = GetCurrentCommandId(used);
CommandId new_command_id = pg::GetCurrentCommandId(used);

if (new_command_id == duckdb_command_id) {
return;
}

if (!IsInTransactionBlock()) {
if (!pg::IsInTransactionBlock()) {
/*
* Allow mixed writes outside of a transaction block, this is needed
* for DDL.
Expand Down Expand Up @@ -97,28 +116,13 @@ MarkStatementNotTopLevel() {
*/
void
AutocommitSingleStatementQueries() {
if (IsInTransactionBlock()) {
if (pg::IsInTransactionBlock()) {
/* We're in a transaction block, we can just execute the query */
return;
}

PreventInTransactionBlock(top_level_statement,
"BUG: You should never see this error we checked IsInTransactionBlock before.");
}

/*
* Check if Postgres did any writes at the end of a transaction.
*
* We do this by both checking if there were any WAL writes and as an added
* measure if the current command id was incremented more than once after the
* last known DuckDB command.
*
* IMPORTANT: This function should only be called at trasaction commit. At
* other points in the transaction lifecycle its return value is not reliable.
*/
static bool
PostgresDidWritesAtTransactionEnd() {
return PostgresDidWalWrites() || GetCurrentCommandId() > duckdb_command_id + 1;
pg::PreventInTransactionBlock(top_level_statement,
"BUG: You should never see this error we checked IsInTransactionBlock before.");
}

static void
Expand All @@ -144,8 +148,8 @@ DuckdbXactCallback_Cpp(XactEvent event) {
switch (event) {
case XACT_EVENT_PRE_COMMIT:
case XACT_EVENT_PARALLEL_PRE_COMMIT:
if (IsInTransactionBlock(top_level_statement)) {
if (PostgresDidWritesAtTransactionEnd() && DuckdbDidWrites(context)) {
if (pg::IsInTransactionBlock(top_level_statement)) {
if (pg::DidWritesAtTransactionEnd() && ddb::DidWrites(context)) {
throw duckdb::NotImplementedException(
"Writing to DuckDB and Postgres tables in the same transaction block is not supported");
}
Expand Down Expand Up @@ -227,8 +231,8 @@ RegisterDuckdbXactCallback() {
if (transaction_handler_configured) {
return;
}
RegisterXactCallback(DuckdbXactCallback, nullptr);
RegisterSubXactCallback(DuckdbSubXactCallback, nullptr);
pg::RegisterXactCallback(DuckdbXactCallback, nullptr);
pg::RegisterSubXactCallback(DuckdbSubXactCallback, nullptr);
transaction_handler_configured = true;
}
} // namespace pgduckdb
1 change: 0 additions & 1 deletion test/regression/expected/transaction_errors.out
Original file line number Diff line number Diff line change
Expand Up @@ -20,5 +20,4 @@ LINE 1: SELECT * FROM do_not_exist;
^
END;
WARNING: pgduckdb: IsExtensionRegistered called in an aborted transaction
WARNING: pgduckdb: IsExtensionRegistered called in an aborted transaction
CREATE EXTENSION pg_duckdb;
2 changes: 1 addition & 1 deletion test/regression/expected/transactions.out
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ ROLLBACK;;
BEGIN;
SAVEPOINT my_savepoint;
INSERT INTO t_ddb VALUES (2);
ERROR: SAVEPOINT is not supported in DuckDB
ERROR: (PGDuckDB/DuckdbPlanNode) Not implemented Error: SAVEPOINT and subtransactions are not supported in DuckDB
ROLLBACK;;
-- Unless the subtransaction is already completed
BEGIN;
Expand Down

0 comments on commit 504a315

Please sign in to comment.