diff --git a/collector/pg_replication_slot.go b/collector/pg_replication_slot.go index 9f250ef96..a3c5c4ac7 100644 --- a/collector/pg_replication_slot.go +++ b/collector/pg_replication_slot.go @@ -72,6 +72,15 @@ var ( "number of bytes that can be written to WAL such that this slot is not in danger of getting in state lost", []string{"slot_name", "slot_type"}, nil, ) + pgReplicationSlotWalStatus = prometheus.NewDesc( + prometheus.BuildFQName( + namespace, + replicationSlotSubsystem, + "wal_status", + ), + "availability of WAL files claimed by this slot", + []string{"slot_name", "slot_type"}, nil, + ) pgReplicationSlotQuery = `SELECT slot_name, @@ -83,7 +92,8 @@ var ( END AS current_wal_lsn, COALESCE(confirmed_flush_lsn, '0/0') - '0/0' AS confirmed_flush_lsn, active, - safe_wal_size + safe_wal_size, + wal_status FROM pg_replication_slots;` ) @@ -103,7 +113,8 @@ func (PGReplicationSlotCollector) Update(ctx context.Context, instance *instance var flushLSN sql.NullFloat64 var isActive sql.NullBool var safeWalSize sql.NullInt64 - if err := rows.Scan(&slotName, &slotType, &walLSN, &flushLSN, &isActive, &safeWalSize); err != nil { + var walStatus sql.NullString + if err := rows.Scan(&slotName, &slotType, &walLSN, &flushLSN, &isActive, &safeWalSize, &walStatus); err != nil { return err } @@ -149,6 +160,21 @@ func (PGReplicationSlotCollector) Update(ctx context.Context, instance *instance prometheus.GaugeValue, float64(safeWalSize.Int64), slotNameLabel, slotTypeLabel, ) } + + if walStatus.Valid { + // See https://www.postgresql.org/docs/14/view-pg-replication-slots.html + walStatusMap := map[string]int{ + "reserved": 0, // reserved means that the claimed files are within max_wal_size. + "extended": 1, // extended means that max_wal_size is exceeded but the files are still retained, either by the replication slot or by wal_keep_size. + "unreserved": 2, // unreserved means that the slot no longer retains the required WAL files and some of them are to be removed at the next checkpoint. This state can return to reserved or extended. + "lost": 3, // lost means that some required WAL files have been removed and this slot is no longer usable. + } + + ch <- prometheus.MustNewConstMetric( + pgReplicationSlotWalStatus, + prometheus.GaugeValue, float64(walStatusMap[walStatus.String]), slotNameLabel, slotTypeLabel, + ) + } } return rows.Err() } diff --git a/collector/pg_replication_slot_test.go b/collector/pg_replication_slot_test.go index ec0bee135..4d137bf96 100644 --- a/collector/pg_replication_slot_test.go +++ b/collector/pg_replication_slot_test.go @@ -31,9 +31,9 @@ func TestPgReplicationSlotCollectorActive(t *testing.T) { inst := &instance{db: db} - columns := []string{"slot_name", "slot_type", "current_wal_lsn", "confirmed_flush_lsn", "active", "safe_wal_size"} + columns := []string{"slot_name", "slot_type", "current_wal_lsn", "confirmed_flush_lsn", "active", "safe_wal_size", "wal_status"} rows := sqlmock.NewRows(columns). - AddRow("test_slot", "physical", 5, 3, true, 323906992) + AddRow("test_slot", "physical", 5, 3, true, 323906992, "reserved") mock.ExpectQuery(sanitizeQuery(pgReplicationSlotQuery)).WillReturnRows(rows) ch := make(chan prometheus.Metric) @@ -51,6 +51,7 @@ func TestPgReplicationSlotCollectorActive(t *testing.T) { {labels: labelMap{"slot_name": "test_slot", "slot_type": "physical"}, value: 3, metricType: dto.MetricType_GAUGE}, {labels: labelMap{"slot_name": "test_slot", "slot_type": "physical"}, value: 1, metricType: dto.MetricType_GAUGE}, {labels: labelMap{"slot_name": "test_slot", "slot_type": "physical"}, value: 323906992, metricType: dto.MetricType_GAUGE}, + {labels: labelMap{"slot_name": "test_slot", "slot_type": "physical"}, value: 0, metricType: dto.MetricType_GAUGE}, } convey.Convey("Metrics comparison", t, func() { @@ -73,9 +74,9 @@ func TestPgReplicationSlotCollectorInActive(t *testing.T) { inst := &instance{db: db} - columns := []string{"slot_name", "slot_type", "current_wal_lsn", "confirmed_flush_lsn", "active", "safe_wal_size"} + columns := []string{"slot_name", "slot_type", "current_wal_lsn", "confirmed_flush_lsn", "active", "safe_wal_size", "wal_status"} rows := sqlmock.NewRows(columns). - AddRow("test_slot", "physical", 6, 12, false, -4000) + AddRow("test_slot", "physical", 6, 12, false, -4000, "extended") mock.ExpectQuery(sanitizeQuery(pgReplicationSlotQuery)).WillReturnRows(rows) ch := make(chan prometheus.Metric) @@ -92,6 +93,7 @@ func TestPgReplicationSlotCollectorInActive(t *testing.T) { {labels: labelMap{"slot_name": "test_slot", "slot_type": "physical"}, value: 6, metricType: dto.MetricType_GAUGE}, {labels: labelMap{"slot_name": "test_slot", "slot_type": "physical"}, value: 0, metricType: dto.MetricType_GAUGE}, {labels: labelMap{"slot_name": "test_slot", "slot_type": "physical"}, value: -4000, metricType: dto.MetricType_GAUGE}, + {labels: labelMap{"slot_name": "test_slot", "slot_type": "physical"}, value: 1, metricType: dto.MetricType_GAUGE}, } convey.Convey("Metrics comparison", t, func() { @@ -115,9 +117,9 @@ func TestPgReplicationSlotCollectorActiveNil(t *testing.T) { inst := &instance{db: db} - columns := []string{"slot_name", "slot_type", "current_wal_lsn", "confirmed_flush_lsn", "active", "safe_wal_size"} + columns := []string{"slot_name", "slot_type", "current_wal_lsn", "confirmed_flush_lsn", "active", "safe_wal_size", "wal_status"} rows := sqlmock.NewRows(columns). - AddRow("test_slot", "physical", 6, 12, nil, nil) + AddRow("test_slot", "physical", 6, 12, nil, nil, "lost") mock.ExpectQuery(sanitizeQuery(pgReplicationSlotQuery)).WillReturnRows(rows) ch := make(chan prometheus.Metric) @@ -133,6 +135,7 @@ func TestPgReplicationSlotCollectorActiveNil(t *testing.T) { expected := []MetricResult{ {labels: labelMap{"slot_name": "test_slot", "slot_type": "physical"}, value: 6, metricType: dto.MetricType_GAUGE}, {labels: labelMap{"slot_name": "test_slot", "slot_type": "physical"}, value: 0, metricType: dto.MetricType_GAUGE}, + {labels: labelMap{"slot_name": "test_slot", "slot_type": "physical"}, value: 3, metricType: dto.MetricType_GAUGE}, } convey.Convey("Metrics comparison", t, func() { @@ -155,9 +158,9 @@ func TestPgReplicationSlotCollectorTestNilValues(t *testing.T) { inst := &instance{db: db} - columns := []string{"slot_name", "slot_type", "current_wal_lsn", "confirmed_flush_lsn", "active", "safe_wal_size"} + columns := []string{"slot_name", "slot_type", "current_wal_lsn", "confirmed_flush_lsn", "active", "safe_wal_size", "wal_status"} rows := sqlmock.NewRows(columns). - AddRow(nil, nil, nil, nil, true, nil) + AddRow(nil, nil, nil, nil, true, nil, nil) mock.ExpectQuery(sanitizeQuery(pgReplicationSlotQuery)).WillReturnRows(rows) ch := make(chan prometheus.Metric)