Skip to content

Commit

Permalink
Prevent race condition when starting bgw
Browse files Browse the repository at this point in the history
  • Loading branch information
Y-- committed Jan 17, 2025
1 parent ff48a68 commit 9afec02
Showing 1 changed file with 43 additions and 4 deletions.
47 changes: 43 additions & 4 deletions src/pgduckdb_background_worker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ extern "C" {
#include "miscadmin.h"
#include "pgstat.h"
#include "executor/spi.h"
#include "common/file_utils.h"
#include "postmaster/bgworker.h"
#include "postmaster/interrupt.h"
#include "storage/ipc.h"
Expand Down Expand Up @@ -57,10 +58,17 @@ static bool is_background_worker = false;
static std::unordered_map<std::string, std::string> last_known_motherduck_catalog_versions;
static uint64 initial_cache_version = 0;

bool CanTakeLockForDatabase(Oid database_oid);

extern "C" {

PGDLLEXPORT void
pgduckdb_background_worker_main(Datum /* main_arg */) {
elog(LOG, "started pg_duckdb background worker");
if (!CanTakeLockForDatabase(0)) {
elog(LOG, "pg_duckdb background worker: could not take lock for database '%u'. Will exit.", 0);
return;
}
// Set up a signal handler for SIGTERM
pqsignal(SIGTERM, die);
BackgroundWorkerUnblockSignals();
Expand Down Expand Up @@ -124,9 +132,10 @@ force_motherduck_sync(PG_FUNCTION_ARGS) {
}
}

constexpr const char* PGDUCKDB_SYNC_WORKER_NAME = "pg_duckdb sync worker";
constexpr const char *PGDUCKDB_SYNC_WORKER_NAME = "pg_duckdb sync worker";

bool HasBgwRunningForMyDatabase() {
bool
HasBgwRunningForMyDatabase() {
const auto num_backends = pgstat_fetch_stat_numbackends();
for (int backend_idx = 1; backend_idx <= num_backends; ++backend_idx) {
LocalPgBackendStatus *local_beentry = pgstat_get_local_beentry_by_index(backend_idx);
Expand All @@ -151,6 +160,35 @@ bool HasBgwRunningForMyDatabase() {
return false;
}

/*
Attempts to take a lock on a file named 'pgduckdb_worker_<database_oid>.lock'
If the lock is taken, the function returns true. If the lock is not taken, the function returns false.
*/
bool
CanTakeLockForDatabase(Oid database_oid) {
char lock_file_name[MAXPGPATH];
snprintf(lock_file_name, MAXPGPATH, "%s/%s.pgduckdb_worker.%d", DataDir, PG_TEMP_FILE_PREFIX, database_oid);

auto fd = open(lock_file_name, O_RDWR | O_CREAT, S_IRUSR | S_IWUSR);
if (fd < 0) {
auto err = strerror(errno);
elog(ERROR, "Could not take lock on file '%s': %s", lock_file_name, err);
}

// Take exclusive lock on the file
auto ret = flock(fd, LOCK_EX | LOCK_NB);
if (ret == EWOULDBLOCK) {
return false;
}

if (ret != 0) {
auto err = strerror(errno);
elog(ERROR, "Could not take lock on file '%s': %s", lock_file_name, err);
}

return true;
}

/*
Will start the background worker if:
- MotherDuck is enabled (TODO: should be database-specific)
Expand All @@ -159,12 +197,12 @@ Will start the background worker if:
void
StartBackgroundWorkerIfNeeded(void) {
if (!pgduckdb::IsMotherDuckEnabledAnywhere()) {
elog(DEBUG1, "pg_duckdb background worker not started because MotherDuck is not enabled");
elog(DEBUG3, "pg_duckdb background worker not started because MotherDuck is not enabled");
return;
}

if (HasBgwRunningForMyDatabase()) {
elog(DEBUG1, "pg_duckdb background worker already running for database %u", MyDatabaseId);
elog(DEBUG3, "pg_duckdb background worker already running for database %u", MyDatabaseId);
return;
}

Expand Down Expand Up @@ -640,6 +678,7 @@ SyncMotherDuckCatalogsWithPg_Cpp(bool drop_with_cascade) {
if (current_motherduck_catalog_version) {
pfree(current_motherduck_catalog_version);
}

current_motherduck_catalog_version = pstrdup(catalog_version.c_str());
MemoryContextSwitchTo(old_context);

Expand Down

0 comments on commit 9afec02

Please sign in to comment.