Skip to content

Commit

Permalink
Support multi-statement transactions
Browse files Browse the repository at this point in the history
Only supports multi-statement transactions for non-DDL queries.
  • Loading branch information
JelteF committed Nov 26, 2024
1 parent d53247f commit fcf7a01
Show file tree
Hide file tree
Showing 26 changed files with 904 additions and 133 deletions.
6 changes: 6 additions & 0 deletions include/pgduckdb/pg/declarations.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,4 +60,10 @@ struct TupleDescData;
typedef struct TupleDescData *TupleDesc;

struct TupleTableSlot;

struct TableAmRoutine;

typedef uint32_t CommandId;

typedef uint32_t SubTransactionId;
}
44 changes: 44 additions & 0 deletions include/pgduckdb/pg/transactions.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
#pragma once

#include "pgduckdb/pg/declarations.hpp"

extern "C" {
extern bool IsSubTransaction(void);

/*
* These enum definitions are vendored in so we can implement a postgres
* XactCallback in C++. It's not expected that these will ever change.
*/
typedef enum {
XACT_EVENT_COMMIT,
XACT_EVENT_PARALLEL_COMMIT,
XACT_EVENT_ABORT,
XACT_EVENT_PARALLEL_ABORT,
XACT_EVENT_PREPARE,
XACT_EVENT_PRE_COMMIT,
XACT_EVENT_PARALLEL_PRE_COMMIT,
XACT_EVENT_PRE_PREPARE,
} XactEvent;

typedef void (*XactCallback)(XactEvent event, void *arg);

typedef enum {
SUBXACT_EVENT_START_SUB,
SUBXACT_EVENT_COMMIT_SUB,
SUBXACT_EVENT_ABORT_SUB,
SUBXACT_EVENT_PRE_COMMIT_SUB,
} SubXactEvent;

typedef void (*SubXactCallback)(SubXactEvent event, SubTransactionId mySubid, SubTransactionId parentSubid, void *arg);
}

namespace pgduckdb {
bool PostgresDidWalWrites();
CommandId GetCurrentCommandId(bool used = false);
bool IsInTransactionBlock(bool top_level);
void PreventInTransactionBlock(bool is_top_level, const char *statement_type);
void RegisterXactCallback(XactCallback callback, void *arg);
void UnregisterXactCallback(XactCallback callback, void *arg);
void RegisterSubXactCallback(SubXactCallback callback, void *arg);
void UnregisterSubXactCallback(SubXactCallback callback, void *arg);
} // namespace pgduckdb
2 changes: 1 addition & 1 deletion include/pgduckdb/pgduckdb_ddl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,5 @@

#include "pgduckdb/pg/declarations.hpp"

void DuckdbHandleDDL(Node *ParseTree);
void DuckdbTruncateTable(Oid relation_oid);
void DuckdbInitUtilityHook();
19 changes: 13 additions & 6 deletions include/pgduckdb/pgduckdb_duckdb.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,21 +4,26 @@

namespace pgduckdb {

extern bool started_duckdb_transaction;
bool DuckdbDidWrites();
bool DuckdbDidWrites(duckdb::ClientContext &context);

class DuckDBManager {
public:
static inline bool
IsInitialized() {
return manager_instance.database != nullptr;
}

static inline DuckDBManager &
Get() {
static DuckDBManager instance;
if (!instance.database) {
instance.Initialize();
if (!manager_instance.database) {
manager_instance.Initialize();
}
return instance;
return manager_instance;
}

static duckdb::unique_ptr<duckdb::Connection> CreateConnection();
static duckdb::Connection *GetConnection();
static duckdb::Connection *GetConnection(bool force_transaction = false);
static duckdb::Connection *GetConnectionUnsafe();

inline const std::string &
Expand All @@ -28,12 +33,14 @@ class DuckDBManager {

void
Reset() {
connection = nullptr;
delete database;
database = nullptr;
}

private:
DuckDBManager();
static DuckDBManager manager_instance;

void Initialize();

Expand Down
5 changes: 1 addition & 4 deletions include/pgduckdb/pgduckdb_table_am.hpp
Original file line number Diff line number Diff line change
@@ -1,7 +1,4 @@
extern "C" {
#include "postgres.h"
#include "access/tableam.h"
}
#include "pgduckdb/pg/declarations.hpp"

namespace pgduckdb {
bool IsDuckdbTableAm(const TableAmRoutine *am);
Expand Down
7 changes: 6 additions & 1 deletion include/pgduckdb/pgduckdb_xact.hpp
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
namespace pgduckdb {
void ClaimCurrentCommandId();
void RegisterDuckdbXactCallback();
}
void AutocommitSingleStatementQueries();
void MarkStatementNotTopLevel();
bool IsInTransactionBlock();
void PreventInTransactionBlock(const char *statement_type);
} // namespace pgduckdb
6 changes: 6 additions & 0 deletions sql/pg_duckdb--0.1.0--0.2.0.sql
Original file line number Diff line number Diff line change
Expand Up @@ -66,3 +66,9 @@ CREATE FUNCTION duckdb.cache_delete(cache_key TEXT)
SET search_path = pg_catalog, pg_temp
LANGUAGE C AS 'MODULE_PATHNAME', 'cache_delete';
REVOKE ALL ON FUNCTION duckdb.cache_delete(cache_key TEXT) FROM PUBLIC;

DROP FUNCTION duckdb.recycle_ddb();
CREATE PROCEDURE duckdb.recycle_ddb()
SET search_path = pg_catalog, pg_temp
LANGUAGE C AS 'MODULE_PATHNAME', 'pgduckdb_recycle_ddb';
REVOKE ALL ON PROCEDURE duckdb.recycle_ddb() FROM PUBLIC;
1 change: 0 additions & 1 deletion src/catalog/pgduckdb_transaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ SchemaItems::GetTable(const duckdb::string &entry_name) {
return nullptr; // Table could not be found
}


Relation rel = PostgresTable::OpenRelation(rel_oid);
if (IsRelView(rel)) {
// Let the replacement scan handle this, the ReplacementScan replaces the view with its view_definition, which
Expand Down
51 changes: 51 additions & 0 deletions src/pg/transactions.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
#include "pgduckdb/pgduckdb_utils.hpp"

extern "C" {
#include "postgres.h"
#include "access/xact.h" // RegisterXactCallback, XactEvent, SubXactEvent, SubTransactionId
#include "access/xlog.h" // XactLastRecEnd
}

namespace pgduckdb {

bool
PostgresDidWalWrites() {
return XactLastRecEnd != InvalidXLogRecPtr;
}

CommandId
GetCurrentCommandId(bool used = false) {
return PostgresFunctionGuard(::GetCurrentCommandId, used);
}

bool
IsInTransactionBlock(bool is_top_level) {
return PostgresFunctionGuard(::IsInTransactionBlock, is_top_level);
}

void
PreventInTransactionBlock(bool is_top_level, const char *statement_type) {
return PostgresFunctionGuard(::PreventInTransactionBlock, is_top_level, statement_type);
}

void
RegisterXactCallback(XactCallback callback, void *arg) {
return PostgresFunctionGuard(::RegisterXactCallback, callback, arg);
}

void
UnregisterXactCallback(XactCallback callback, void *arg) {
return PostgresFunctionGuard(::UnregisterXactCallback, callback, arg);
}

void
RegisterSubXactCallback(SubXactCallback callback, void *arg) {
return PostgresFunctionGuard(::RegisterSubXactCallback, callback, arg);
}

void
UnregisterSubXactCallback(SubXactCallback callback, void *arg) {
return PostgresFunctionGuard(::UnregisterSubXactCallback, callback, arg);
}

} // namespace pgduckdb
2 changes: 2 additions & 0 deletions src/pgduckdb.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ extern "C" {
#include "pgduckdb/pgduckdb.h"
#include "pgduckdb/pgduckdb_node.hpp"
#include "pgduckdb/pgduckdb_background_worker.hpp"
#include "pgduckdb/pgduckdb_xact.hpp"

static void DuckdbInitGUC(void);

Expand Down Expand Up @@ -39,6 +40,7 @@ _PG_init(void) {
DuckdbInitHooks();
DuckdbInitNode();
DuckdbInitBackgroundWorker();
pgduckdb::RegisterDuckdbXactCallback();
}
} // extern "C"

Expand Down
107 changes: 83 additions & 24 deletions src/pgduckdb_ddl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ extern "C" {
#include "pgduckdb/pgduckdb_background_worker.hpp"
#include "pgduckdb/pgduckdb_metadata_cache.hpp"
#include "pgduckdb/pgduckdb_utils.hpp"
#include "pgduckdb/utility/copy.hpp"
#include "pgduckdb/vendor/pg_list.hpp"
#include <inttypes.h>

Expand All @@ -41,16 +42,10 @@ extern "C" {
*/
static bool ctas_skip_data = false;

/*
* Truncates the given table in DuckDB.
*/
void
DuckdbTruncateTable(Oid relation_oid) {
auto name = PostgresFunctionGuard(pgduckdb_relation_name, relation_oid);
pgduckdb::DuckDBQueryOrThrow(std::string("TRUNCATE ") + name);
}
static bool top_level_ddl = true;
static ProcessUtility_hook_type prev_process_utility_hook = NULL;

void
static void
DuckdbHandleDDL(Node *parsetree) {
if (!pgduckdb::IsExtensionRegistered()) {
/* We're not installed, so don't mess with the query */
Expand Down Expand Up @@ -101,6 +96,66 @@ DuckdbHandleDDL(Node *parsetree) {
}
}

static void
DuckdbUtilityHook_Cpp(PlannedStmt *pstmt, const char *query_string, bool read_only_tree, ProcessUtilityContext context,
ParamListInfo params, struct QueryEnvironment *query_env, DestReceiver *dest,
QueryCompletion *qc) {
Node *parsetree = pstmt->utilityStmt;
if (pgduckdb::IsExtensionRegistered() && IsA(parsetree, CopyStmt)) {
auto copy_query = PostgresFunctionGuard(MakeDuckdbCopyQuery, pstmt, query_string, query_env);
if (copy_query) {
auto res = pgduckdb::DuckDBQueryOrThrow(copy_query);
auto chunk = res->Fetch();
auto processed = chunk->GetValue(0, 0).GetValue<uint64_t>();
if (qc) {
SetQueryCompletion(qc, CMDTAG_COPY, processed);
}
return;
}
}

/*
* We need this prev_top_level_ddl variable because top_level_ddl because
* its possible that the first DDL command then triggers a second DDL
* command. The first of which is at the top level, but the second of which
* is not. So this way we make sure that the global top_level_ddl
* variable matches whichever level we're currently executing.
*
* NOTE: We don't care about resetting the global variable in case of an
* error, because we'll set it correctly for the next command anyway.
*/
bool prev_top_level_ddl = top_level_ddl;
top_level_ddl = context == PROCESS_UTILITY_TOPLEVEL;

if (pgduckdb::IsExtensionRegistered()) {
DuckdbHandleDDL(parsetree);
}
prev_process_utility_hook(pstmt, query_string, read_only_tree, context, params, query_env, dest, qc);

top_level_ddl = prev_top_level_ddl;
}

static void
DuckdbUtilityHook(PlannedStmt *pstmt, const char *query_string, bool read_only_tree, ProcessUtilityContext context,
ParamListInfo params, struct QueryEnvironment *query_env, DestReceiver *dest, QueryCompletion *qc) {
InvokeCPPFunc(DuckdbUtilityHook_Cpp, pstmt, query_string, read_only_tree, context, params, query_env, dest, qc);
}

void
DuckdbInitUtilityHook() {
prev_process_utility_hook = ProcessUtility_hook ? ProcessUtility_hook : standard_ProcessUtility;
ProcessUtility_hook = DuckdbUtilityHook;
}

/*
* Truncates the given table in DuckDB.
*/
void
DuckdbTruncateTable(Oid relation_oid) {
auto name = PostgresFunctionGuard(pgduckdb_relation_name, relation_oid);
pgduckdb::DuckDBQueryOrThrow(std::string("TRUNCATE ") + name);
}

/*
* Throws an error when an unsupported ON COMMIT clause is used. DuckDB does
* not support ON COMMIT DROP, and it's difficult to emulate because Postgres
Expand Down Expand Up @@ -259,11 +314,10 @@ DECLARE_PG_FUNCTION(duckdb_create_table_trigger) {
}

/*
* For now, we don't support DuckDB queries in transactions. To support
* write queries in transactions we'll need to link Postgres and DuckdB
* their transaction lifecycles.
* For now, we don't support DuckDB DDL queries in transactions,
* because they write to both the Postgres and the DuckDB database.
*/
PreventInTransactionBlock(true, "DuckDB queries");
PreventInTransactionBlock(top_level_ddl, "DuckDB DDL statements");

if (IsA(parsetree, CreateStmt)) {
auto stmt = castNode(CreateStmt, parsetree);
Expand All @@ -282,7 +336,9 @@ DECLARE_PG_FUNCTION(duckdb_create_table_trigger) {
*/
std::string create_table_string(pgduckdb_get_tabledef(relid));

auto connection = pgduckdb::DuckDBManager::GetConnection();
/* We're going to run multiple queries in DuckDB, so we need to start a
* transaction to ensure ACID guarantees hold. */
auto connection = pgduckdb::DuckDBManager::GetConnection(true);
Query *ctas_query = nullptr;

if (IsA(parsetree, CreateTableAsStmt) && !ctas_skip_data) {
Expand Down Expand Up @@ -421,12 +477,14 @@ DECLARE_PG_FUNCTION(duckdb_drop_trigger) {
for (uint64_t proc = 0; proc < SPI_processed; ++proc) {
if (!connection) {
/*
* For now, we don't support DuckDB queries in transactions. To support
* write queries in transactions we'll need to link Postgres and DuckdB
* their transaction lifecycles.
* For now, we don't support DuckDB DDL queries in
* transactions, because they write to both the Postgres and
* the DuckDB database.
*/
PreventInTransactionBlock(true, "DuckDB queries");
connection = pgduckdb::DuckDBManager::GetConnection();
PreventInTransactionBlock(top_level_ddl, "DuckDB DDL statements");
/* We're going to run multiple queries in DuckDB, so we need to
* start a transaction to ensure ACID guarantees hold. */
connection = pgduckdb::DuckDBManager::GetConnection(true);
}
HeapTuple tuple = SPI_tuptable->vals[proc];

Expand Down Expand Up @@ -468,12 +526,13 @@ DECLARE_PG_FUNCTION(duckdb_drop_trigger) {
}
if (!connection) {
/*
* For now, we don't support DuckDB queries in transactions. To support
* write queries in transactions we'll need to link Postgres and DuckdB
* their transaction lifecycles.
* For now, we don't support DuckDB DDL queries in transactions,
* because they write to both the Postgres and the DuckDB database.
*/
PreventInTransactionBlock(true, "DuckDB queries");
connection = pgduckdb::DuckDBManager::GetConnection();
PreventInTransactionBlock(top_level_ddl, "DuckDB DDL statements");
/* We're going to run multiple queries in DuckDB, so we need to
* start a transaction to ensure ACID guarantees hold. */
connection = pgduckdb::DuckDBManager::GetConnection(true);
}
char *table_name = SPI_getvalue(tuple, SPI_tuptable->tupdesc, 2);
pgduckdb::DuckDBQueryOrThrow(*connection,
Expand Down
Loading

0 comments on commit fcf7a01

Please sign in to comment.