Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add neon.logical_replication_max_logicalsnapdir_size #9467

Merged
merged 1 commit into from
Nov 12, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
147 changes: 114 additions & 33 deletions pgxn/neon/logical_replication_monitor.c
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
#include <dirent.h>
#include <limits.h>
#include <string.h>
#include <dirent.h>
#include <signal.h>
#include <sys/stat.h>

#include "postgres.h"

Expand All @@ -21,17 +22,35 @@

static int logical_replication_max_snap_files = 300;

/*
* According to Chi (shyzh), the pageserver _should_ be good with 10 MB worth of
* snapshot files. Let's use 8 MB since 8 is a power of 2.
*/
static int logical_replication_max_logicalsnapdir_size = 8000;

/*
* A primitive description of a logical snapshot file including the LSN of the
* file and its size.
*/
typedef struct SnapDesc {
XLogRecPtr lsn;
off_t sz;
} SnapDesc;

PGDLLEXPORT void LogicalSlotsMonitorMain(Datum main_arg);

/*
* Sorts an array of snapshot descriptors by their LSN.
*/
static int
LsnDescComparator(const void *a, const void *b)
SnapDescComparator(const void *a, const void *b)
{
XLogRecPtr lsn1 = *((const XLogRecPtr *) a);
XLogRecPtr lsn2 = *((const XLogRecPtr *) b);
const SnapDesc *desc1 = a;
const SnapDesc *desc2 = b;

if (lsn1 < lsn2)
if (desc1->lsn < desc2->lsn)
return 1;
else if (lsn1 == lsn2)
else if (desc1->lsn == desc2->lsn)
return 0;
else
return -1;
Expand All @@ -43,28 +62,39 @@ LsnDescComparator(const void *a, const void *b)
* slots having lower restart_lsn should be dropped.
*/
static XLogRecPtr
get_num_snap_files_lsn_threshold(void)
get_snapshots_cutoff_lsn(void)
{
/* PG 18 has a constant defined for this, PG_LOGICAL_SNAPSHOTS_DIR */
#define SNAPDIR "pg_logical/snapshots"

DIR *dirdesc;
int dirdesc_fd;
struct dirent *de;
char *snap_path = "pg_logical/snapshots/";
int lsns_allocated = 1024;
int lsns_num = 0;
XLogRecPtr *lsns;
XLogRecPtr cutoff;
size_t snapshot_index = 0;
SnapDesc *snapshot_descriptors;
size_t descriptors_allocated = 1024;
XLogRecPtr cutoff = 0;
off_t logicalsnapdir_size = 0;
const int logical_replication_max_logicalsnapdir_size_bytes = logical_replication_max_logicalsnapdir_size * 1000;

if (logical_replication_max_snap_files < 0)
if (logical_replication_max_snap_files < 0 && logical_replication_max_logicalsnapdir_size < 0)
return 0;

lsns = palloc(sizeof(XLogRecPtr) * lsns_allocated);
snapshot_descriptors = palloc(sizeof(*snapshot_descriptors) * descriptors_allocated);

dirdesc = AllocateDir(SNAPDIR);
dirdesc_fd = dirfd(dirdesc);
if (dirdesc_fd == -1)
ereport(ERROR, errmsg("failed to get a file descriptor for " SNAPDIR ": %m"));

/* find all .snap files and get their lsns */
dirdesc = AllocateDir(snap_path);
knizhnik marked this conversation as resolved.
Show resolved Hide resolved
while ((de = ReadDir(dirdesc, snap_path)) != NULL)
while ((de = ReadDir(dirdesc, SNAPDIR)) != NULL)
{
XLogRecPtr lsn;
uint32 hi;
uint32 lo;
struct stat st;
XLogRecPtr lsn;
SnapDesc *desc;

if (strcmp(de->d_name, ".") == 0 ||
strcmp(de->d_name, "..") == 0)
Expand All @@ -79,28 +109,69 @@ get_num_snap_files_lsn_threshold(void)

lsn = ((uint64) hi) << 32 | lo;
elog(DEBUG5, "found snap file %X/%X", LSN_FORMAT_ARGS(lsn));
if (lsns_allocated == lsns_num)

if (fstatat(dirdesc_fd, de->d_name, &st, 0) == -1)
ereport(ERROR, errmsg("failed to get the size of " SNAPDIR "/%s: %m", de->d_name));

if (descriptors_allocated == snapshot_index)
{
lsns_allocated *= 2;
lsns = repalloc(lsns, sizeof(XLogRecPtr) * lsns_allocated);
descriptors_allocated *= 2;
snapshot_descriptors = repalloc(snapshot_descriptors, sizeof(*snapshot_descriptors) * descriptors_allocated);
}
lsns[lsns_num++] = lsn;

desc = &snapshot_descriptors[snapshot_index++];
desc->lsn = lsn;
desc->sz = st.st_size;
}
/* sort by lsn desc */
qsort(lsns, lsns_num, sizeof(XLogRecPtr), LsnDescComparator);
/* and take cutoff at logical_replication_max_snap_files */
if (logical_replication_max_snap_files > lsns_num)
cutoff = 0;
/* have less files than cutoff */
else

qsort(snapshot_descriptors, snapshot_index, sizeof(*snapshot_descriptors), SnapDescComparator);

/* Are there more snapshot files than specified? */
if (logical_replication_max_snap_files <= snapshot_index)
{
cutoff = lsns[logical_replication_max_snap_files - 1];
elog(LOG, "ls_monitor: dropping logical slots with restart_lsn lower %X/%X, found %d .snap files, limit is %d",
LSN_FORMAT_ARGS(cutoff), lsns_num, logical_replication_max_snap_files);
cutoff = snapshot_descriptors[logical_replication_max_snap_files - 1].lsn;
elog(LOG,
"ls_monitor: dropping logical slots with restart_lsn lower %X/%X, found %zu snapshot files, limit is %d",
LSN_FORMAT_ARGS(cutoff), snapshot_index, logical_replication_max_snap_files);
}
pfree(lsns);

/* Is the size of the logical snapshots directory larger than specified?
*
* It's possible we could hit both thresholds, so remove any extra files
* first, and then truncate based on size of the remaining files.
*/
if (logicalsnapdir_size > logical_replication_max_logicalsnapdir_size_bytes)
{
/* Unfortunately, iterating the directory does not guarantee any order
* so we can't cache an index in the preceding loop.
*/

off_t sz;
const XLogRecPtr original = cutoff;

sz = snapshot_descriptors[0].sz;
for (size_t i = 1; i < logical_replication_max_snap_files; ++i)
{
if (sz > logical_replication_max_logicalsnapdir_size_bytes)
{
cutoff = snapshot_descriptors[i - 1].lsn;
break;
}

sz += snapshot_descriptors[i].sz;
}

tristan957 marked this conversation as resolved.
Show resolved Hide resolved
if (cutoff != original)
elog(LOG, "ls_monitor: dropping logical slots with restart_lsn lower than %X/%X, " SNAPDIR " is larger than %d KB",
LSN_FORMAT_ARGS(cutoff), logical_replication_max_logicalsnapdir_size);
}

pfree(snapshot_descriptors);
FreeDir(dirdesc);

return cutoff;

#undef SNAPDIR
}

void
Expand All @@ -118,6 +189,16 @@ InitLogicalReplicationMonitor(void)
0,
NULL, NULL, NULL);

DefineCustomIntVariable(
"neon.logical_replication_max_logicalsnapdir_size",
"Maximum allowed size of the pg_logical/snapshots directory (KB). When exceeded, slots are dropped until the limit is met. -1 disables the limit.",
NULL,
&logical_replication_max_logicalsnapdir_size,
8000, -1, INT_MAX,
PGC_SIGHUP,
GUC_UNIT_KB,
NULL, NULL, NULL);

memset(&bgw, 0, sizeof(bgw));
bgw.bgw_flags = BGWORKER_SHMEM_ACCESS;
bgw.bgw_start_time = BgWorkerStart_RecoveryFinished;
Expand Down Expand Up @@ -162,7 +243,7 @@ LogicalSlotsMonitorMain(Datum main_arg)
* If there are too many .snap files, just drop all logical slots to
* prevent aux files bloat.
*/
cutoff_lsn = get_num_snap_files_lsn_threshold();
cutoff_lsn = get_snapshots_cutoff_lsn();
if (cutoff_lsn > 0)
{
for (int i = 0; i < max_replication_slots; i++)
Expand Down
Loading