diff --git a/pgxn/neon/neon.c b/pgxn/neon/neon.c index c3ed96710a6df..7ce2f9abf67c8 100644 --- a/pgxn/neon/neon.c +++ b/pgxn/neon/neon.c @@ -45,10 +45,13 @@ #include "storage/ipc.h" #endif +#include + PG_MODULE_MAGIC; void _PG_init(void); static int logical_replication_max_snap_files = 300; +static int logical_replication_max_logicalsnapdir_size = 128000; static int running_xacts_overflow_policy; @@ -82,6 +85,15 @@ static const struct config_enum_entry running_xacts_overflow_policies[] = { {NULL, 0, false} }; +/* + * A primitive description of a logical snapshot file including the LSN of the + * file and its size. + */ +typedef struct SnapDesc { + XLogRecPtr lsn; + off_t sz; +} SnapDesc; + static void InitLogicalReplicationMonitor(void) { @@ -97,6 +109,16 @@ InitLogicalReplicationMonitor(void) 0, NULL, NULL, NULL); + DefineCustomIntVariable( + "neon.logical_replication_max_logicalsnapdir_size", + "Maximum allowed size of the pg_logical/snapshots directory (KB). When exceeded, slots are dropped until the limit is met. -1 disables the limit.", + NULL, + &logical_replication_max_snap_files, + 300, -1, INT_MAX, + PGC_SIGHUP, + GUC_UNIT_KB, + NULL, NULL, NULL); + memset(&bgw, 0, sizeof(bgw)); bgw.bgw_flags = BGWORKER_SHMEM_ACCESS; bgw.bgw_start_time = BgWorkerStart_RecoveryFinished; @@ -111,15 +133,18 @@ InitLogicalReplicationMonitor(void) RegisterBackgroundWorker(&bgw); } +/* + * Sorts an array of snapshot descriptors by their LSN. + */ static int -LsnDescComparator(const void *a, const void *b) +SnapDescComparator(const void *a, const void *b) { - XLogRecPtr lsn1 = *((const XLogRecPtr *) a); - XLogRecPtr lsn2 = *((const XLogRecPtr *) b); + const SnapDesc *desc1 = a; + const SnapDesc *desc2 = b; - if (lsn1 < lsn2) + if (desc1->lsn < desc2->lsn) return 1; - else if (lsn1 == lsn2) + else if (desc1->lsn == desc2->lsn) return 0; else return -1; @@ -131,28 +156,37 @@ LsnDescComparator(const void *a, const void *b) * slots having lower restart_lsn should be dropped. */ static XLogRecPtr -get_num_snap_files_lsn_threshold(void) +get_snapshots_cutoff_lsn(void) { +#define SNAPDIR "pg_logical/snapshots" + DIR *dirdesc; + int dirdesc_fd; struct dirent *de; - char *snap_path = "pg_logical/snapshots/"; - int lsns_allocated = 1024; - int lsns_num = 0; - XLogRecPtr *lsns; - XLogRecPtr cutoff; + size_t snapshot_index = 0; + SnapDesc *snapshot_descriptors; + size_t descriptors_allocated = 1024; + XLogRecPtr cutoff = 0; + off_t logicalsnapdir_size; - if (logical_replication_max_snap_files < 0) + if (logical_replication_max_snap_files < 0 || logical_replication_max_logicalsnapdir_size < 0) return 0; - lsns = palloc(sizeof(XLogRecPtr) * lsns_allocated); + snapshot_descriptors = palloc(sizeof(*snapshot_descriptors) * descriptors_allocated); + + dirdesc = AllocateDir(SNAPDIR); + dirdesc_fd = dirfd(dirdesc); + if (dirdesc_fd == -1) + ereport(ERROR, errmsg("failed to get a file descriptor for " SNAPDIR ": %m")); /* find all .snap files and get their lsns */ - dirdesc = AllocateDir(snap_path); - while ((de = ReadDir(dirdesc, snap_path)) != NULL) + while ((de = ReadDir(dirdesc, SNAPDIR)) != NULL) { - XLogRecPtr lsn; uint32 hi; uint32 lo; + struct stat st; + XLogRecPtr lsn; + SnapDesc *desc; if (strcmp(de->d_name, ".") == 0 || strcmp(de->d_name, "..") == 0) @@ -167,28 +201,71 @@ get_num_snap_files_lsn_threshold(void) lsn = ((uint64) hi) << 32 | lo; elog(DEBUG5, "found snap file %X/%X", LSN_FORMAT_ARGS(lsn)); - if (lsns_allocated == lsns_num) + + if (fstatat(dirdesc_fd, de->d_name, &st, 0) == -1) + ereport(ERROR, errmsg("failed to get the size of " SNAPDIR "/%s: %m", de->d_name)); + + if (descriptors_allocated == snapshot_index) { - lsns_allocated *= 2; - lsns = repalloc(lsns, sizeof(XLogRecPtr) * lsns_allocated); + descriptors_allocated *= 2; + snapshot_descriptors = repalloc(snapshot_descriptors, sizeof(*snapshot_descriptors) * descriptors_allocated); } - lsns[lsns_num++] = lsn; + + desc = &snapshot_descriptors[snapshot_index++]; + desc->lsn = lsn; + desc->sz = st.st_size; } - /* sort by lsn desc */ - qsort(lsns, lsns_num, sizeof(XLogRecPtr), LsnDescComparator); - /* and take cutoff at logical_replication_max_snap_files */ - if (logical_replication_max_snap_files > lsns_num) - cutoff = 0; - /* have less files than cutoff */ - else + + qsort(snapshot_descriptors, snapshot_index, sizeof(*snapshot_descriptors), SnapDescComparator); + + /* Are there more snapshot files than specified? */ + if (logical_replication_max_snap_files <= snapshot_index) { - cutoff = lsns[logical_replication_max_snap_files - 1]; - elog(LOG, "ls_monitor: dropping logical slots with restart_lsn lower %X/%X, found %d .snap files, limit is %d", - LSN_FORMAT_ARGS(cutoff), lsns_num, logical_replication_max_snap_files); + cutoff = snapshot_descriptors[logical_replication_max_snap_files - 1].lsn; + elog(LOG, + "ls_monitor: dropping logical slots with restart_lsn lower %X/%X, found %d snapshot files, limit is %d", + LSN_FORMAT_ARGS(cutoff), snapshot_index, logical_replication_max_snap_files); } - pfree(lsns); + + /* Is the size of the logical snapshots directory larger than specified? + * + * It's possible we could hit both thresholds, so remove any extra files + * first, and then truncate based on size of the remaining files. + */ + if (logical_replication_max_logicalsnapdir_size > logicalsnapdir_size) + { + /* Unfortunately, iterating the directory does not guarantee any order + * so we can't cache an index in the preceding loop. + */ + + off_t sz = 0; + const XLogRecPtr original = cutoff; + + for (size_t i = 0; i < logical_replication_max_snap_files; ++i) + { + sz += snapshot_descriptors[i].sz; + if (sz > logical_replication_max_logicalsnapdir_size) + { + /* TODO: What should we do if i == 0? This would mean that the + * very first snapshot file (based on LSN) is too large, which + * will probably never be the case. + */ + cutoff = snapshot_descriptors[i - 1].lsn; + break; + } + } + + if (cutoff != original) + elog(LOG, "ls_monitor: dropping logical slots with restart_lsn lower than %X/%X, " SNAPDIR " is larger than %d KB", + LSN_FORMAT_ARGS(cutoff), logical_replication_max_logicalsnapdir_size / 1000); + } + + pfree(snapshot_descriptors); FreeDir(dirdesc); + return cutoff; + +#undef SNAPDIR } #define LS_MONITOR_CHECK_INTERVAL 10000 /* ms */ @@ -223,7 +300,7 @@ LogicalSlotsMonitorMain(Datum main_arg) * If there are too many .snap files, just drop all logical slots to * prevent aux files bloat. */ - cutoff_lsn = get_num_snap_files_lsn_threshold(); + cutoff_lsn = get_snapshots_cutoff_lsn(); if (cutoff_lsn > 0) { for (int i = 0; i < max_replication_slots; i++)