diff --git a/CHANGELOG.md b/CHANGELOG.md index 318e3648e..f9ab3cb24 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,9 @@ +## 0.13.2 / 2023-07-21 + +* [BUGFIX] Fix type issues on pg_postmaster metrics #828 +* [BUGFIX] Fix pg_replication collector instantiation #854 +* [BUGFIX] Fix pg_process_idle metrics #855 + ## 0.13.1 / 2023-06-27 * [BUGFIX] Make collectors not fail on null values #823 diff --git a/README.md b/README.md index e3774c85c..88c6c098c 100644 --- a/README.md +++ b/README.md @@ -30,6 +30,26 @@ To use the multi-target functionality, send an http request to the endpoint `/pr To avoid putting sensitive information like username and password in the URL, preconfigured auth modules are supported via the [auth_modules](#auth_modules) section of the config file. auth_modules for DSNs can be used with the `/probe` endpoint by specifying the `?auth_module=foo` http parameter. +Example Prometheus config: +```yaml +scrape_configs: + - job_name: 'postgres' + static_configs: + - targets: + - server1:5432 + - server2:5432 + metrics_path: /probe + params: + auth_module: [foo] + relabel_configs: + - source_labels: [__address__] + target_label: __param_target + - source_labels: [__param_target] + target_label: instance + - target_label: __address__ + replacement: 127.0.0.1:9116 # The postgres exporter's real hostname:port. +``` + ## Configuration File The configuration file controls the behavior of the exporter. It can be set using the `--config.file` command line flag and defaults to `postgres_exporter.yml`. diff --git a/collector/collector_test.go b/collector/collector_test.go index 00c21ed23..18101f00e 100644 --- a/collector/collector_test.go +++ b/collector/collector_test.go @@ -49,6 +49,7 @@ func readMetric(m prometheus.Metric) MetricResult { func sanitizeQuery(q string) string { q = strings.Join(strings.Fields(q), " ") q = strings.Replace(q, "(", "\\(", -1) + q = strings.Replace(q, "?", "\\?", -1) q = strings.Replace(q, ")", "\\)", -1) q = strings.Replace(q, "[", "\\[", -1) q = strings.Replace(q, "]", "\\]", -1) diff --git a/collector/pg_database_wraparound.go b/collector/pg_database_wraparound.go new file mode 100644 index 000000000..d46270637 --- /dev/null +++ b/collector/pg_database_wraparound.go @@ -0,0 +1,115 @@ +// Copyright 2023 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
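+
+// The database_wraparound collector (registered disabled by default) reports how far
+// each connectable database is from transaction ID and multixact ID wraparound, using
+// age(datfrozenxid) and mxid_age(datminmxid) from pg_catalog.pg_database. Rows that
+// contain NULLs are skipped rather than exported as zero values.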
+ +package collector + +import ( + "context" + "database/sql" + + "github.com/go-kit/log" + "github.com/go-kit/log/level" + "github.com/prometheus/client_golang/prometheus" +) + +const databaseWraparoundSubsystem = "database_wraparound" + +func init() { + registerCollector(databaseWraparoundSubsystem, defaultDisabled, NewPGDatabaseWraparoundCollector) +} + +type PGDatabaseWraparoundCollector struct { + log log.Logger +} + +func NewPGDatabaseWraparoundCollector(config collectorConfig) (Collector, error) { + return &PGDatabaseWraparoundCollector{log: config.logger}, nil +} + +var ( + databaseWraparoundAgeDatfrozenxid = prometheus.NewDesc( + prometheus.BuildFQName(namespace, databaseWraparoundSubsystem, "age_datfrozenxid_seconds"), + "Age of the oldest transaction ID that has not been frozen.", + []string{"datname"}, + prometheus.Labels{}, + ) + databaseWraparoundAgeDatminmxid = prometheus.NewDesc( + prometheus.BuildFQName(namespace, databaseWraparoundSubsystem, "age_datminmxid_seconds"), + "Age of the oldest multi-transaction ID that has been replaced with a transaction ID.", + []string{"datname"}, + prometheus.Labels{}, + ) + + databaseWraparoundQuery = ` + SELECT + datname, + age(d.datfrozenxid) as age_datfrozenxid, + mxid_age(d.datminmxid) as age_datminmxid + FROM + pg_catalog.pg_database d + WHERE + d.datallowconn + ` +) + +func (c *PGDatabaseWraparoundCollector) Update(ctx context.Context, instance *instance, ch chan<- prometheus.Metric) error { + db := instance.getDB() + rows, err := db.QueryContext(ctx, + databaseWraparoundQuery) + + if err != nil { + return err + } + defer rows.Close() + + for rows.Next() { + var datname sql.NullString + var ageDatfrozenxid, ageDatminmxid sql.NullFloat64 + + if err := rows.Scan(&datname, &ageDatfrozenxid, &ageDatminmxid); err != nil { + return err + } + + if !datname.Valid { + level.Debug(c.log).Log("msg", "Skipping database with NULL name") + continue + } + if !ageDatfrozenxid.Valid { + level.Debug(c.log).Log("msg", "Skipping stat emission with NULL age_datfrozenxid") + continue + } + if !ageDatminmxid.Valid { + level.Debug(c.log).Log("msg", "Skipping stat emission with NULL age_datminmxid") + continue + } + + ageDatfrozenxidMetric := ageDatfrozenxid.Float64 + + ch <- prometheus.MustNewConstMetric( + databaseWraparoundAgeDatfrozenxid, + prometheus.GaugeValue, + ageDatfrozenxidMetric, datname.String, + ) + + ageDatminmxidMetric := ageDatminmxid.Float64 + ch <- prometheus.MustNewConstMetric( + databaseWraparoundAgeDatminmxid, + prometheus.GaugeValue, + ageDatminmxidMetric, datname.String, + ) + } + if err := rows.Err(); err != nil { + return err + } + return nil +} diff --git a/collector/pg_database_wraparound_test.go b/collector/pg_database_wraparound_test.go new file mode 100644 index 000000000..d0a74c362 --- /dev/null +++ b/collector/pg_database_wraparound_test.go @@ -0,0 +1,64 @@ +// Copyright 2023 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+package collector + +import ( + "context" + "testing" + + "github.com/DATA-DOG/go-sqlmock" + "github.com/prometheus/client_golang/prometheus" + dto "github.com/prometheus/client_model/go" + "github.com/smartystreets/goconvey/convey" +) + +func TestPGDatabaseWraparoundCollector(t *testing.T) { + db, mock, err := sqlmock.New() + if err != nil { + t.Fatalf("Error opening a stub db connection: %s", err) + } + defer db.Close() + inst := &instance{db: db} + columns := []string{ + "datname", + "age_datfrozenxid", + "age_datminmxid", + } + rows := sqlmock.NewRows(columns). + AddRow("newreddit", 87126426, 0) + + mock.ExpectQuery(sanitizeQuery(databaseWraparoundQuery)).WillReturnRows(rows) + + ch := make(chan prometheus.Metric) + go func() { + defer close(ch) + c := PGDatabaseWraparoundCollector{} + + if err := c.Update(context.Background(), inst, ch); err != nil { + t.Errorf("Error calling PGDatabaseWraparoundCollector.Update: %s", err) + } + }() + expected := []MetricResult{ + {labels: labelMap{"datname": "newreddit"}, value: 87126426, metricType: dto.MetricType_GAUGE}, + {labels: labelMap{"datname": "newreddit"}, value: 0, metricType: dto.MetricType_GAUGE}, + } + convey.Convey("Metrics comparison", t, func() { + for _, expect := range expected { + m := readMetric(<-ch) + convey.So(expect, convey.ShouldResemble, m) + } + }) + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("there were unfulfilled exceptions: %s", err) + } +} diff --git a/collector/pg_stat_activity_autovacuum.go b/collector/pg_stat_activity_autovacuum.go new file mode 100644 index 000000000..5e2d2d2ca --- /dev/null +++ b/collector/pg_stat_activity_autovacuum.go @@ -0,0 +1,84 @@ +// Copyright 2023 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
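+
+// The stat_activity_autovacuum collector (registered disabled by default) reports the
+// xact_start timestamp of each running autovacuum worker found in pg_stat_activity,
+// labelled by the relation name parsed out of the autovacuum query text.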
+
+package collector
+
+import (
+	"context"
+
+	"github.com/go-kit/log"
+	"github.com/prometheus/client_golang/prometheus"
+)
+
+const statActivityAutovacuumSubsystem = "stat_activity_autovacuum"
+
+func init() {
+	registerCollector(statActivityAutovacuumSubsystem, defaultDisabled, NewPGStatActivityAutovacuumCollector)
+}
+
+type PGStatActivityAutovacuumCollector struct {
+	log log.Logger
+}
+
+func NewPGStatActivityAutovacuumCollector(config collectorConfig) (Collector, error) {
+	return &PGStatActivityAutovacuumCollector{log: config.logger}, nil
+}
+
+var (
+	statActivityAutovacuumAgeInSeconds = prometheus.NewDesc(
+		prometheus.BuildFQName(namespace, statActivityAutovacuumSubsystem, "timestamp_seconds"),
+		"Start timestamp of the vacuum process in seconds",
+		[]string{"relname"},
+		prometheus.Labels{},
+	)
+
+	statActivityAutovacuumQuery = `
+	SELECT
+		SPLIT_PART(query, '.', 2) AS relname,
+		EXTRACT(EPOCH FROM xact_start) AS timestamp_seconds
+	FROM
+		pg_catalog.pg_stat_activity
+	WHERE
+		query LIKE 'autovacuum:%'
+	`
+)
+
+func (PGStatActivityAutovacuumCollector) Update(ctx context.Context, instance *instance, ch chan<- prometheus.Metric) error {
+	db := instance.getDB()
+	rows, err := db.QueryContext(ctx,
+		statActivityAutovacuumQuery)
+
+	if err != nil {
+		return err
+	}
+	defer rows.Close()
+
+	for rows.Next() {
+		var relname string
+		var ageInSeconds float64
+
+		if err := rows.Scan(&relname, &ageInSeconds); err != nil {
+			return err
+		}
+
+		ch <- prometheus.MustNewConstMetric(
+			statActivityAutovacuumAgeInSeconds,
+			prometheus.GaugeValue,
+			ageInSeconds, relname,
+		)
+	}
+	if err := rows.Err(); err != nil {
+		return err
+	}
+	return nil
+}
diff --git a/collector/pg_stat_activity_autovacuum_test.go b/collector/pg_stat_activity_autovacuum_test.go
new file mode 100644
index 000000000..a6fcdbcad
--- /dev/null
+++ b/collector/pg_stat_activity_autovacuum_test.go
@@ -0,0 +1,62 @@
+// Copyright 2023 The Prometheus Authors
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package collector
+
+import (
+	"context"
+	"testing"
+
+	"github.com/DATA-DOG/go-sqlmock"
+	"github.com/prometheus/client_golang/prometheus"
+	dto "github.com/prometheus/client_model/go"
+	"github.com/smartystreets/goconvey/convey"
+)
+
+func TestPGStatActivityAutovacuumCollector(t *testing.T) {
+	db, mock, err := sqlmock.New()
+	if err != nil {
+		t.Fatalf("Error opening a stub db connection: %s", err)
+	}
+	defer db.Close()
+	inst := &instance{db: db}
+	columns := []string{
+		"relname",
+		"timestamp_seconds",
+	}
+	rows := sqlmock.NewRows(columns).
+ AddRow("test", 3600) + + mock.ExpectQuery(sanitizeQuery(statActivityAutovacuumQuery)).WillReturnRows(rows) + + ch := make(chan prometheus.Metric) + go func() { + defer close(ch) + c := PGStatActivityAutovacuumCollector{} + + if err := c.Update(context.Background(), inst, ch); err != nil { + t.Errorf("Error calling PGStatActivityAutovacuumCollector.Update: %s", err) + } + }() + expected := []MetricResult{ + {labels: labelMap{"relname": "test"}, value: 3600, metricType: dto.MetricType_GAUGE}, + } + convey.Convey("Metrics comparison", t, func() { + for _, expect := range expected { + m := readMetric(<-ch) + convey.So(expect, convey.ShouldResemble, m) + } + }) + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("there were unfulfilled exceptions: %s", err) + } +} diff --git a/collector/pg_stat_database.go b/collector/pg_stat_database.go index 8a882f891..382ff7825 100644 --- a/collector/pg_stat_database.go +++ b/collector/pg_stat_database.go @@ -17,6 +17,8 @@ import ( "context" "database/sql" + "github.com/go-kit/log" + "github.com/go-kit/log/level" "github.com/prometheus/client_golang/prometheus" ) @@ -26,10 +28,12 @@ func init() { registerCollector(statDatabaseSubsystem, defaultEnabled, NewPGStatDatabaseCollector) } -type PGStatDatabaseCollector struct{} +type PGStatDatabaseCollector struct { + log log.Logger +} func NewPGStatDatabaseCollector(config collectorConfig) (Collector, error) { - return &PGStatDatabaseCollector{}, nil + return &PGStatDatabaseCollector{log: config.logger}, nil } var ( @@ -228,7 +232,7 @@ var ( ` ) -func (PGStatDatabaseCollector) Update(ctx context.Context, instance *instance, ch chan<- prometheus.Metric) error { +func (c *PGStatDatabaseCollector) Update(ctx context.Context, instance *instance, ch chan<- prometheus.Metric) error { db := instance.getDB() rows, err := db.QueryContext(ctx, statDatabaseQuery, @@ -267,217 +271,203 @@ func (PGStatDatabaseCollector) Update(ctx context.Context, instance *instance, c if err != nil { return err } - datidLabel := "unknown" - if datid.Valid { - datidLabel = datid.String + + if !datid.Valid { + level.Debug(c.log).Log("msg", "Skipping collecting metric because it has no datid") + continue } - datnameLabel := "unknown" - if datname.Valid { - datnameLabel = datname.String + if !datname.Valid { + level.Debug(c.log).Log("msg", "Skipping collecting metric because it has no datname") + continue } - - numBackendsMetric := 0.0 - if numBackends.Valid { - numBackendsMetric = numBackends.Float64 + if !numBackends.Valid { + level.Debug(c.log).Log("msg", "Skipping collecting metric because it has no numbackends") + continue + } + if !xactCommit.Valid { + level.Debug(c.log).Log("msg", "Skipping collecting metric because it has no xact_commit") + continue + } + if !xactRollback.Valid { + level.Debug(c.log).Log("msg", "Skipping collecting metric because it has no xact_rollback") + continue + } + if !blksRead.Valid { + level.Debug(c.log).Log("msg", "Skipping collecting metric because it has no blks_read") + continue + } + if !blksHit.Valid { + level.Debug(c.log).Log("msg", "Skipping collecting metric because it has no blks_hit") + continue + } + if !tupReturned.Valid { + level.Debug(c.log).Log("msg", "Skipping collecting metric because it has no tup_returned") + continue + } + if !tupFetched.Valid { + level.Debug(c.log).Log("msg", "Skipping collecting metric because it has no tup_fetched") + continue + } + if !tupInserted.Valid { + level.Debug(c.log).Log("msg", "Skipping collecting metric because it has no tup_inserted") + continue + 
} + if !tupUpdated.Valid { + level.Debug(c.log).Log("msg", "Skipping collecting metric because it has no tup_updated") + continue + } + if !tupDeleted.Valid { + level.Debug(c.log).Log("msg", "Skipping collecting metric because it has no tup_deleted") + continue + } + if !conflicts.Valid { + level.Debug(c.log).Log("msg", "Skipping collecting metric because it has no conflicts") + continue + } + if !tempFiles.Valid { + level.Debug(c.log).Log("msg", "Skipping collecting metric because it has no temp_files") + continue + } + if !tempBytes.Valid { + level.Debug(c.log).Log("msg", "Skipping collecting metric because it has no temp_bytes") + continue + } + if !deadlocks.Valid { + level.Debug(c.log).Log("msg", "Skipping collecting metric because it has no deadlocks") + continue + } + if !blkReadTime.Valid { + level.Debug(c.log).Log("msg", "Skipping collecting metric because it has no blk_read_time") + continue + } + if !blkWriteTime.Valid { + level.Debug(c.log).Log("msg", "Skipping collecting metric because it has no blk_write_time") + continue + } + if !statsReset.Valid { + level.Debug(c.log).Log("msg", "Skipping collecting metric because it has no stats_reset") + continue } + + labels := []string{datid.String, datname.String} + ch <- prometheus.MustNewConstMetric( statDatabaseNumbackends, prometheus.GaugeValue, - numBackendsMetric, - datidLabel, - datnameLabel, + numBackends.Float64, + labels..., ) - xactCommitMetric := 0.0 - if xactCommit.Valid { - xactCommitMetric = xactCommit.Float64 - } ch <- prometheus.MustNewConstMetric( statDatabaseXactCommit, prometheus.CounterValue, - xactCommitMetric, - datidLabel, - datnameLabel, + xactCommit.Float64, + labels..., ) - xactRollbackMetric := 0.0 - if xactRollback.Valid { - xactRollbackMetric = xactRollback.Float64 - } ch <- prometheus.MustNewConstMetric( statDatabaseXactRollback, prometheus.CounterValue, - xactRollbackMetric, - datidLabel, - datnameLabel, + xactRollback.Float64, + labels..., ) - blksReadMetric := 0.0 - if blksRead.Valid { - blksReadMetric = blksRead.Float64 - } ch <- prometheus.MustNewConstMetric( statDatabaseBlksRead, prometheus.CounterValue, - blksReadMetric, - datidLabel, - datnameLabel, + blksRead.Float64, + labels..., ) - blksHitMetric := 0.0 - if blksHit.Valid { - blksHitMetric = blksHit.Float64 - } ch <- prometheus.MustNewConstMetric( statDatabaseBlksHit, prometheus.CounterValue, - blksHitMetric, - datidLabel, - datnameLabel, + blksHit.Float64, + labels..., ) - tupReturnedMetric := 0.0 - if tupReturned.Valid { - tupReturnedMetric = tupReturned.Float64 - } ch <- prometheus.MustNewConstMetric( statDatabaseTupReturned, prometheus.CounterValue, - tupReturnedMetric, - datidLabel, - datnameLabel, + tupReturned.Float64, + labels..., ) - tupFetchedMetric := 0.0 - if tupFetched.Valid { - tupFetchedMetric = tupFetched.Float64 - } ch <- prometheus.MustNewConstMetric( statDatabaseTupFetched, prometheus.CounterValue, - tupFetchedMetric, - datidLabel, - datnameLabel, + tupFetched.Float64, + labels..., ) - tupInsertedMetric := 0.0 - if tupInserted.Valid { - tupInsertedMetric = tupInserted.Float64 - } ch <- prometheus.MustNewConstMetric( statDatabaseTupInserted, prometheus.CounterValue, - tupInsertedMetric, - datidLabel, - datnameLabel, + tupInserted.Float64, + labels..., ) - tupUpdatedMetric := 0.0 - if tupUpdated.Valid { - tupUpdatedMetric = tupUpdated.Float64 - } ch <- prometheus.MustNewConstMetric( statDatabaseTupUpdated, prometheus.CounterValue, - tupUpdatedMetric, - datidLabel, - datnameLabel, + tupUpdated.Float64, + labels..., ) - 
tupDeletedMetric := 0.0 - if tupDeleted.Valid { - tupDeletedMetric = tupDeleted.Float64 - } ch <- prometheus.MustNewConstMetric( statDatabaseTupDeleted, prometheus.CounterValue, - tupDeletedMetric, - datidLabel, - datnameLabel, + tupDeleted.Float64, + labels..., ) - conflictsMetric := 0.0 - if conflicts.Valid { - conflictsMetric = conflicts.Float64 - } ch <- prometheus.MustNewConstMetric( statDatabaseConflicts, prometheus.CounterValue, - conflictsMetric, - datidLabel, - datnameLabel, + conflicts.Float64, + labels..., ) - tempFilesMetric := 0.0 - if tempFiles.Valid { - tempFilesMetric = tempFiles.Float64 - } ch <- prometheus.MustNewConstMetric( statDatabaseTempFiles, prometheus.CounterValue, - tempFilesMetric, - datidLabel, - datnameLabel, + tempFiles.Float64, + labels..., ) - tempBytesMetric := 0.0 - if tempBytes.Valid { - tempBytesMetric = tempBytes.Float64 - } ch <- prometheus.MustNewConstMetric( statDatabaseTempBytes, prometheus.CounterValue, - tempBytesMetric, - datidLabel, - datnameLabel, + tempBytes.Float64, + labels..., ) - deadlocksMetric := 0.0 - if deadlocks.Valid { - deadlocksMetric = deadlocks.Float64 - } ch <- prometheus.MustNewConstMetric( statDatabaseDeadlocks, prometheus.CounterValue, - deadlocksMetric, - datidLabel, - datnameLabel, + deadlocks.Float64, + labels..., ) - blkReadTimeMetric := 0.0 - if blkReadTime.Valid { - blkReadTimeMetric = blkReadTime.Float64 - } ch <- prometheus.MustNewConstMetric( statDatabaseBlkReadTime, prometheus.CounterValue, - blkReadTimeMetric, - datidLabel, - datnameLabel, + blkReadTime.Float64, + labels..., ) - blkWriteTimeMetric := 0.0 - if blkWriteTime.Valid { - blkWriteTimeMetric = blkWriteTime.Float64 - } ch <- prometheus.MustNewConstMetric( statDatabaseBlkWriteTime, prometheus.CounterValue, - blkWriteTimeMetric, - datidLabel, - datnameLabel, + blkWriteTime.Float64, + labels..., ) - statsResetMetric := 0.0 - if statsReset.Valid { - statsResetMetric = float64(statsReset.Time.Unix()) - } ch <- prometheus.MustNewConstMetric( statDatabaseStatsReset, prometheus.CounterValue, - statsResetMetric, - datidLabel, - datnameLabel, + float64(statsReset.Time.Unix()), + labels..., ) } return nil diff --git a/collector/pg_stat_database_test.go b/collector/pg_stat_database_test.go index 1fe92eedc..70c73eb5b 100644 --- a/collector/pg_stat_database_test.go +++ b/collector/pg_stat_database_test.go @@ -18,6 +18,7 @@ import ( "time" "github.com/DATA-DOG/go-sqlmock" + "github.com/go-kit/log" "github.com/prometheus/client_golang/prometheus" dto "github.com/prometheus/client_model/go" "github.com/smartystreets/goconvey/convey" @@ -86,7 +87,9 @@ func TestPGStatDatabaseCollector(t *testing.T) { ch := make(chan prometheus.Metric) go func() { defer close(ch) - c := PGStatDatabaseCollector{} + c := PGStatDatabaseCollector{ + log: log.With(log.NewNopLogger(), "collector", "pg_stat_database"), + } if err := c.Update(context.Background(), inst, ch); err != nil { t.Errorf("Error calling PGStatDatabaseCollector.Update: %s", err) @@ -131,6 +134,10 @@ func TestPGStatDatabaseCollectorNullValues(t *testing.T) { } defer db.Close() + srT, err := time.Parse("2006-01-02 15:04:05.00000-07", "2023-05-25 17:10:42.81132-07") + if err != nil { + t.Fatalf("Error parsing time: %s", err) + } inst := &instance{db: db} columns := []string{ @@ -158,31 +165,52 @@ func TestPGStatDatabaseCollectorNullValues(t *testing.T) { rows := sqlmock.NewRows(columns). 
AddRow( nil, - nil, - nil, - nil, - nil, - nil, - nil, - nil, - nil, - nil, - nil, - nil, - nil, - nil, - nil, - nil, - nil, - nil, - nil, - ) + "postgres", + 354, + 4945, + 289097744, + 1242257, + int64(3275602074), + 89320867, + 450139, + 2034563757, + 0, + int64(2725688749), + 23, + 52, + 74, + 925, + 16, + 823, + srT). + AddRow( + "pid", + "postgres", + 354, + 4945, + 289097744, + 1242257, + int64(3275602074), + 89320867, + 450139, + 2034563757, + 0, + int64(2725688749), + 23, + 52, + 74, + 925, + 16, + 823, + srT) mock.ExpectQuery(sanitizeQuery(statDatabaseQuery)).WillReturnRows(rows) ch := make(chan prometheus.Metric) go func() { defer close(ch) - c := PGStatDatabaseCollector{} + c := PGStatDatabaseCollector{ + log: log.With(log.NewNopLogger(), "collector", "pg_stat_database"), + } if err := c.Update(context.Background(), inst, ch); err != nil { t.Errorf("Error calling PGStatDatabaseCollector.Update: %s", err) @@ -190,23 +218,23 @@ func TestPGStatDatabaseCollectorNullValues(t *testing.T) { }() expected := []MetricResult{ - {labels: labelMap{"datid": "unknown", "datname": "unknown"}, metricType: dto.MetricType_GAUGE, value: 0}, - {labels: labelMap{"datid": "unknown", "datname": "unknown"}, metricType: dto.MetricType_COUNTER, value: 0}, - {labels: labelMap{"datid": "unknown", "datname": "unknown"}, metricType: dto.MetricType_COUNTER, value: 0}, - {labels: labelMap{"datid": "unknown", "datname": "unknown"}, metricType: dto.MetricType_COUNTER, value: 0}, - {labels: labelMap{"datid": "unknown", "datname": "unknown"}, metricType: dto.MetricType_COUNTER, value: 0}, - {labels: labelMap{"datid": "unknown", "datname": "unknown"}, metricType: dto.MetricType_COUNTER, value: 0}, - {labels: labelMap{"datid": "unknown", "datname": "unknown"}, metricType: dto.MetricType_COUNTER, value: 0}, - {labels: labelMap{"datid": "unknown", "datname": "unknown"}, metricType: dto.MetricType_COUNTER, value: 0}, - {labels: labelMap{"datid": "unknown", "datname": "unknown"}, metricType: dto.MetricType_COUNTER, value: 0}, - {labels: labelMap{"datid": "unknown", "datname": "unknown"}, metricType: dto.MetricType_COUNTER, value: 0}, - {labels: labelMap{"datid": "unknown", "datname": "unknown"}, metricType: dto.MetricType_COUNTER, value: 0}, - {labels: labelMap{"datid": "unknown", "datname": "unknown"}, metricType: dto.MetricType_COUNTER, value: 0}, - {labels: labelMap{"datid": "unknown", "datname": "unknown"}, metricType: dto.MetricType_COUNTER, value: 0}, - {labels: labelMap{"datid": "unknown", "datname": "unknown"}, metricType: dto.MetricType_COUNTER, value: 0}, - {labels: labelMap{"datid": "unknown", "datname": "unknown"}, metricType: dto.MetricType_COUNTER, value: 0}, - {labels: labelMap{"datid": "unknown", "datname": "unknown"}, metricType: dto.MetricType_COUNTER, value: 0}, - {labels: labelMap{"datid": "unknown", "datname": "unknown"}, metricType: dto.MetricType_COUNTER, value: 0}, + {labels: labelMap{"datid": "pid", "datname": "postgres"}, metricType: dto.MetricType_GAUGE, value: 354}, + {labels: labelMap{"datid": "pid", "datname": "postgres"}, metricType: dto.MetricType_COUNTER, value: 4945}, + {labels: labelMap{"datid": "pid", "datname": "postgres"}, metricType: dto.MetricType_COUNTER, value: 289097744}, + {labels: labelMap{"datid": "pid", "datname": "postgres"}, metricType: dto.MetricType_COUNTER, value: 1242257}, + {labels: labelMap{"datid": "pid", "datname": "postgres"}, metricType: dto.MetricType_COUNTER, value: 3275602074}, + {labels: labelMap{"datid": "pid", "datname": "postgres"}, metricType: 
dto.MetricType_COUNTER, value: 89320867}, + {labels: labelMap{"datid": "pid", "datname": "postgres"}, metricType: dto.MetricType_COUNTER, value: 450139}, + {labels: labelMap{"datid": "pid", "datname": "postgres"}, metricType: dto.MetricType_COUNTER, value: 2034563757}, + {labels: labelMap{"datid": "pid", "datname": "postgres"}, metricType: dto.MetricType_COUNTER, value: 0}, + {labels: labelMap{"datid": "pid", "datname": "postgres"}, metricType: dto.MetricType_COUNTER, value: 2725688749}, + {labels: labelMap{"datid": "pid", "datname": "postgres"}, metricType: dto.MetricType_COUNTER, value: 23}, + {labels: labelMap{"datid": "pid", "datname": "postgres"}, metricType: dto.MetricType_COUNTER, value: 52}, + {labels: labelMap{"datid": "pid", "datname": "postgres"}, metricType: dto.MetricType_COUNTER, value: 74}, + {labels: labelMap{"datid": "pid", "datname": "postgres"}, metricType: dto.MetricType_COUNTER, value: 925}, + {labels: labelMap{"datid": "pid", "datname": "postgres"}, metricType: dto.MetricType_COUNTER, value: 16}, + {labels: labelMap{"datid": "pid", "datname": "postgres"}, metricType: dto.MetricType_COUNTER, value: 823}, + {labels: labelMap{"datid": "pid", "datname": "postgres"}, metricType: dto.MetricType_COUNTER, value: 1685059842}, } convey.Convey("Metrics comparison", t, func() { @@ -296,14 +324,35 @@ func TestPGStatDatabaseCollectorRowLeakTest(t *testing.T) { nil, nil, nil, - ) - + ). + AddRow( + "pid", + "postgres", + 355, + 4946, + 289097745, + 1242258, + int64(3275602075), + 89320868, + 450140, + 2034563758, + 1, + int64(2725688750), + 24, + 53, + 75, + 926, + 17, + 824, + srT) mock.ExpectQuery(sanitizeQuery(statDatabaseQuery)).WillReturnRows(rows) ch := make(chan prometheus.Metric) go func() { defer close(ch) - c := PGStatDatabaseCollector{} + c := PGStatDatabaseCollector{ + log: log.With(log.NewNopLogger(), "collector", "pg_stat_database"), + } if err := c.Update(context.Background(), inst, ch); err != nil { t.Errorf("Error calling PGStatDatabaseCollector.Update: %s", err) @@ -328,23 +377,23 @@ func TestPGStatDatabaseCollectorRowLeakTest(t *testing.T) { {labels: labelMap{"datid": "pid", "datname": "postgres"}, metricType: dto.MetricType_COUNTER, value: 16}, {labels: labelMap{"datid": "pid", "datname": "postgres"}, metricType: dto.MetricType_COUNTER, value: 823}, {labels: labelMap{"datid": "pid", "datname": "postgres"}, metricType: dto.MetricType_COUNTER, value: 1685059842}, - {labels: labelMap{"datid": "unknown", "datname": "unknown"}, metricType: dto.MetricType_GAUGE, value: 0}, - {labels: labelMap{"datid": "unknown", "datname": "unknown"}, metricType: dto.MetricType_COUNTER, value: 0}, - {labels: labelMap{"datid": "unknown", "datname": "unknown"}, metricType: dto.MetricType_COUNTER, value: 0}, - {labels: labelMap{"datid": "unknown", "datname": "unknown"}, metricType: dto.MetricType_COUNTER, value: 0}, - {labels: labelMap{"datid": "unknown", "datname": "unknown"}, metricType: dto.MetricType_COUNTER, value: 0}, - {labels: labelMap{"datid": "unknown", "datname": "unknown"}, metricType: dto.MetricType_COUNTER, value: 0}, - {labels: labelMap{"datid": "unknown", "datname": "unknown"}, metricType: dto.MetricType_COUNTER, value: 0}, - {labels: labelMap{"datid": "unknown", "datname": "unknown"}, metricType: dto.MetricType_COUNTER, value: 0}, - {labels: labelMap{"datid": "unknown", "datname": "unknown"}, metricType: dto.MetricType_COUNTER, value: 0}, - {labels: labelMap{"datid": "unknown", "datname": "unknown"}, metricType: dto.MetricType_COUNTER, value: 0}, - {labels: 
labelMap{"datid": "unknown", "datname": "unknown"}, metricType: dto.MetricType_COUNTER, value: 0}, - {labels: labelMap{"datid": "unknown", "datname": "unknown"}, metricType: dto.MetricType_COUNTER, value: 0}, - {labels: labelMap{"datid": "unknown", "datname": "unknown"}, metricType: dto.MetricType_COUNTER, value: 0}, - {labels: labelMap{"datid": "unknown", "datname": "unknown"}, metricType: dto.MetricType_COUNTER, value: 0}, - {labels: labelMap{"datid": "unknown", "datname": "unknown"}, metricType: dto.MetricType_COUNTER, value: 0}, - {labels: labelMap{"datid": "unknown", "datname": "unknown"}, metricType: dto.MetricType_COUNTER, value: 0}, - {labels: labelMap{"datid": "unknown", "datname": "unknown"}, metricType: dto.MetricType_COUNTER, value: 0}, + {labels: labelMap{"datid": "pid", "datname": "postgres"}, metricType: dto.MetricType_GAUGE, value: 355}, + {labels: labelMap{"datid": "pid", "datname": "postgres"}, metricType: dto.MetricType_COUNTER, value: 4946}, + {labels: labelMap{"datid": "pid", "datname": "postgres"}, metricType: dto.MetricType_COUNTER, value: 289097745}, + {labels: labelMap{"datid": "pid", "datname": "postgres"}, metricType: dto.MetricType_COUNTER, value: 1242258}, + {labels: labelMap{"datid": "pid", "datname": "postgres"}, metricType: dto.MetricType_COUNTER, value: 3275602075}, + {labels: labelMap{"datid": "pid", "datname": "postgres"}, metricType: dto.MetricType_COUNTER, value: 89320868}, + {labels: labelMap{"datid": "pid", "datname": "postgres"}, metricType: dto.MetricType_COUNTER, value: 450140}, + {labels: labelMap{"datid": "pid", "datname": "postgres"}, metricType: dto.MetricType_COUNTER, value: 2034563758}, + {labels: labelMap{"datid": "pid", "datname": "postgres"}, metricType: dto.MetricType_COUNTER, value: 1}, + {labels: labelMap{"datid": "pid", "datname": "postgres"}, metricType: dto.MetricType_COUNTER, value: 2725688750}, + {labels: labelMap{"datid": "pid", "datname": "postgres"}, metricType: dto.MetricType_COUNTER, value: 24}, + {labels: labelMap{"datid": "pid", "datname": "postgres"}, metricType: dto.MetricType_COUNTER, value: 53}, + {labels: labelMap{"datid": "pid", "datname": "postgres"}, metricType: dto.MetricType_COUNTER, value: 75}, + {labels: labelMap{"datid": "pid", "datname": "postgres"}, metricType: dto.MetricType_COUNTER, value: 926}, + {labels: labelMap{"datid": "pid", "datname": "postgres"}, metricType: dto.MetricType_COUNTER, value: 17}, + {labels: labelMap{"datid": "pid", "datname": "postgres"}, metricType: dto.MetricType_COUNTER, value: 824}, + {labels: labelMap{"datid": "pid", "datname": "postgres"}, metricType: dto.MetricType_COUNTER, value: 1685059842}, } convey.Convey("Metrics comparison", t, func() { diff --git a/collector/pg_stat_statements.go b/collector/pg_stat_statements.go index bbfee1a2b..c03e78b92 100644 --- a/collector/pg_stat_statements.go +++ b/collector/pg_stat_statements.go @@ -17,6 +17,7 @@ import ( "context" "database/sql" + "github.com/blang/semver/v4" "github.com/go-kit/log" "github.com/prometheus/client_golang/prometheus" ) @@ -90,12 +91,37 @@ var ( ) ORDER BY seconds_total DESC LIMIT 100;` + + pgStatStatementsNewQuery = `SELECT + pg_get_userbyid(userid) as user, + pg_database.datname, + pg_stat_statements.queryid, + pg_stat_statements.calls as calls_total, + pg_stat_statements.total_exec_time / 1000.0 as seconds_total, + pg_stat_statements.rows as rows_total, + pg_stat_statements.blk_read_time / 1000.0 as block_read_seconds_total, + pg_stat_statements.blk_write_time / 1000.0 as block_write_seconds_total + FROM 
pg_stat_statements + JOIN pg_database + ON pg_database.oid = pg_stat_statements.dbid + WHERE + total_exec_time > ( + SELECT percentile_cont(0.1) + WITHIN GROUP (ORDER BY total_exec_time) + FROM pg_stat_statements + ) + ORDER BY seconds_total DESC + LIMIT 100;` ) func (PGStatStatementsCollector) Update(ctx context.Context, instance *instance, ch chan<- prometheus.Metric) error { + query := pgStatStatementsQuery + if instance.version.GE(semver.MustParse("13.0.0")) { + query = pgStatStatementsNewQuery + } + db := instance.getDB() - rows, err := db.QueryContext(ctx, - pgStatStatementsQuery) + rows, err := db.QueryContext(ctx, query) if err != nil { return err diff --git a/collector/pg_stat_statements_test.go b/collector/pg_stat_statements_test.go index c4f89a60f..08aba34c2 100644 --- a/collector/pg_stat_statements_test.go +++ b/collector/pg_stat_statements_test.go @@ -17,6 +17,7 @@ import ( "testing" "github.com/DATA-DOG/go-sqlmock" + "github.com/blang/semver/v4" "github.com/prometheus/client_golang/prometheus" dto "github.com/prometheus/client_model/go" "github.com/smartystreets/goconvey/convey" @@ -29,7 +30,7 @@ func TestPGStateStatementsCollector(t *testing.T) { } defer db.Close() - inst := &instance{db: db} + inst := &instance{db: db, version: semver.MustParse("12.0.0")} columns := []string{"user", "datname", "queryid", "calls_total", "seconds_total", "rows_total", "block_read_seconds_total", "block_write_seconds_total"} rows := sqlmock.NewRows(columns). @@ -72,12 +73,12 @@ func TestPGStateStatementsCollectorNull(t *testing.T) { } defer db.Close() - inst := &instance{db: db} + inst := &instance{db: db, version: semver.MustParse("13.3.7")} columns := []string{"user", "datname", "queryid", "calls_total", "seconds_total", "rows_total", "block_read_seconds_total", "block_write_seconds_total"} rows := sqlmock.NewRows(columns). AddRow(nil, nil, nil, nil, nil, nil, nil, nil) - mock.ExpectQuery(sanitizeQuery(pgStatStatementsQuery)).WillReturnRows(rows) + mock.ExpectQuery(sanitizeQuery(pgStatStatementsNewQuery)).WillReturnRows(rows) ch := make(chan prometheus.Metric) go func() { @@ -107,3 +108,46 @@ func TestPGStateStatementsCollectorNull(t *testing.T) { t.Errorf("there were unfulfilled exceptions: %s", err) } } + +func TestPGStateStatementsCollectorNewPG(t *testing.T) { + db, mock, err := sqlmock.New() + if err != nil { + t.Fatalf("Error opening a stub db connection: %s", err) + } + defer db.Close() + + inst := &instance{db: db, version: semver.MustParse("13.3.7")} + + columns := []string{"user", "datname", "queryid", "calls_total", "seconds_total", "rows_total", "block_read_seconds_total", "block_write_seconds_total"} + rows := sqlmock.NewRows(columns). 
+ AddRow("postgres", "postgres", 1500, 5, 0.4, 100, 0.1, 0.2) + mock.ExpectQuery(sanitizeQuery(pgStatStatementsNewQuery)).WillReturnRows(rows) + + ch := make(chan prometheus.Metric) + go func() { + defer close(ch) + c := PGStatStatementsCollector{} + + if err := c.Update(context.Background(), inst, ch); err != nil { + t.Errorf("Error calling PGStatStatementsCollector.Update: %s", err) + } + }() + + expected := []MetricResult{ + {labels: labelMap{"user": "postgres", "datname": "postgres", "queryid": "1500"}, metricType: dto.MetricType_COUNTER, value: 5}, + {labels: labelMap{"user": "postgres", "datname": "postgres", "queryid": "1500"}, metricType: dto.MetricType_COUNTER, value: 0.4}, + {labels: labelMap{"user": "postgres", "datname": "postgres", "queryid": "1500"}, metricType: dto.MetricType_COUNTER, value: 100}, + {labels: labelMap{"user": "postgres", "datname": "postgres", "queryid": "1500"}, metricType: dto.MetricType_COUNTER, value: 0.1}, + {labels: labelMap{"user": "postgres", "datname": "postgres", "queryid": "1500"}, metricType: dto.MetricType_COUNTER, value: 0.2}, + } + + convey.Convey("Metrics comparison", t, func() { + for _, expect := range expected { + m := readMetric(<-ch) + convey.So(expect, convey.ShouldResemble, m) + } + }) + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("there were unfulfilled exceptions: %s", err) + } +} diff --git a/collector/pg_stat_walreceiver.go b/collector/pg_stat_walreceiver.go new file mode 100644 index 000000000..3134c025b --- /dev/null +++ b/collector/pg_stat_walreceiver.go @@ -0,0 +1,269 @@ +// Copyright 2023 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+package collector
+
+import (
+	"context"
+	"database/sql"
+	"fmt"
+
+	"github.com/go-kit/log"
+	"github.com/go-kit/log/level"
+	"github.com/prometheus/client_golang/prometheus"
+)
+
+func init() {
+	registerCollector(statWalReceiverSubsystem, defaultDisabled, NewPGStatWalReceiverCollector)
+}
+
+type PGStatWalReceiverCollector struct {
+	log log.Logger
+}
+
+const statWalReceiverSubsystem = "stat_wal_receiver"
+
+func NewPGStatWalReceiverCollector(config collectorConfig) (Collector, error) {
+	return &PGStatWalReceiverCollector{log: config.logger}, nil
+}
+
+var (
+	labelCats = []string{"upstream_host", "slot_name", "status"}
+	statWalReceiverReceiveStartLsn = prometheus.NewDesc(
+		prometheus.BuildFQName(namespace, statWalReceiverSubsystem, "receive_start_lsn"),
+		"First write-ahead log location used when WAL receiver is started represented as a decimal",
+		labelCats,
+		prometheus.Labels{},
+	)
+	statWalReceiverReceiveStartTli = prometheus.NewDesc(
+		prometheus.BuildFQName(namespace, statWalReceiverSubsystem, "receive_start_tli"),
+		"First timeline number used when WAL receiver is started",
+		labelCats,
+		prometheus.Labels{},
+	)
+	statWalReceiverFlushedLSN = prometheus.NewDesc(
+		prometheus.BuildFQName(namespace, statWalReceiverSubsystem, "flushed_lsn"),
+		"Last write-ahead log location already received and flushed to disk, the initial value of this field being the first log location used when WAL receiver is started represented as a decimal",
+		labelCats,
+		prometheus.Labels{},
+	)
+	statWalReceiverReceivedTli = prometheus.NewDesc(
+		prometheus.BuildFQName(namespace, statWalReceiverSubsystem, "received_tli"),
+		"Timeline number of last write-ahead log location received and flushed to disk",
+		labelCats,
+		prometheus.Labels{},
+	)
+	statWalReceiverLastMsgSendTime = prometheus.NewDesc(
+		prometheus.BuildFQName(namespace, statWalReceiverSubsystem, "last_msg_send_time"),
+		"Send time of last message received from origin WAL sender",
+		labelCats,
+		prometheus.Labels{},
+	)
+	statWalReceiverLastMsgReceiptTime = prometheus.NewDesc(
+		prometheus.BuildFQName(namespace, statWalReceiverSubsystem, "last_msg_receipt_time"),
+		"Receipt time of last message received from origin WAL sender",
+		labelCats,
+		prometheus.Labels{},
+	)
+	statWalReceiverLatestEndLsn = prometheus.NewDesc(
+		prometheus.BuildFQName(namespace, statWalReceiverSubsystem, "latest_end_lsn"),
+		"Last write-ahead log location reported to origin WAL sender as integer",
+		labelCats,
+		prometheus.Labels{},
+	)
+	statWalReceiverLatestEndTime = prometheus.NewDesc(
+		prometheus.BuildFQName(namespace, statWalReceiverSubsystem, "latest_end_time"),
+		"Time of last write-ahead log location reported to origin WAL sender",
+		labelCats,
+		prometheus.Labels{},
+	)
+	statWalReceiverUpstreamNode = prometheus.NewDesc(
+		prometheus.BuildFQName(namespace, statWalReceiverSubsystem, "upstream_node"),
+		"Node ID of the upstream node",
+		labelCats,
+		prometheus.Labels{},
+	)
+
+	pgStatWalColumnQuery = `
+	SELECT
+		column_name
+	FROM information_schema.columns
+	WHERE
+		table_name = 'pg_stat_wal_receiver' and
+		column_name = 'flushed_lsn'
+	`
+
+	pgStatWalReceiverQueryTemplate = `
+	SELECT
+		trim(both '''' from substring(conninfo from 'host=([^ ]*)')) as upstream_host,
+		slot_name,
+		status,
+		(receive_start_lsn- '0/0') % (2^52)::bigint as receive_start_lsn,
+		%s
+receive_start_tli,
+		received_tli,
+		extract(epoch from last_msg_send_time) as last_msg_send_time,
+		extract(epoch from last_msg_receipt_time) as last_msg_receipt_time,
+		(latest_end_lsn - '0/0') % 
(2^52)::bigint as latest_end_lsn, + extract(epoch from latest_end_time) as latest_end_time, + substring(slot_name from 'repmgr_slot_([0-9]*)') as upstream_node + FROM pg_catalog.pg_stat_wal_receiver + ` +) + +func (c *PGStatWalReceiverCollector) Update(ctx context.Context, instance *instance, ch chan<- prometheus.Metric) error { + db := instance.getDB() + hasFlushedLSNRows, err := db.QueryContext(ctx, pgStatWalColumnQuery) + if err != nil { + return err + } + + defer hasFlushedLSNRows.Close() + hasFlushedLSN := hasFlushedLSNRows.Next() + var query string + if hasFlushedLSN { + query = fmt.Sprintf(pgStatWalReceiverQueryTemplate, "(flushed_lsn - '0/0') % (2^52)::bigint as flushed_lsn,\n") + } else { + query = fmt.Sprintf(pgStatWalReceiverQueryTemplate, "") + } + rows, err := db.QueryContext(ctx, query) + if err != nil { + return err + } + defer rows.Close() + for rows.Next() { + var upstreamHost, slotName, status sql.NullString + var receiveStartLsn, receiveStartTli, flushedLsn, receivedTli, latestEndLsn, upstreamNode sql.NullInt64 + var lastMsgSendTime, lastMsgReceiptTime, latestEndTime sql.NullFloat64 + + if hasFlushedLSN { + if err := rows.Scan(&upstreamHost, &slotName, &status, &receiveStartLsn, &receiveStartTli, &flushedLsn, &receivedTli, &lastMsgSendTime, &lastMsgReceiptTime, &latestEndLsn, &latestEndTime, &upstreamNode); err != nil { + return err + } + } else { + if err := rows.Scan(&upstreamHost, &slotName, &status, &receiveStartLsn, &receiveStartTli, &receivedTli, &lastMsgSendTime, &lastMsgReceiptTime, &latestEndLsn, &latestEndTime, &upstreamNode); err != nil { + return err + } + } + if !upstreamHost.Valid { + level.Debug(c.log).Log("msg", "Skipping wal receiver stats because upstream host is null") + continue + } + + if !slotName.Valid { + level.Debug(c.log).Log("msg", "Skipping wal receiver stats because slotname host is null") + continue + } + + if !status.Valid { + level.Debug(c.log).Log("msg", "Skipping wal receiver stats because status is null") + continue + } + labels := []string{upstreamHost.String, slotName.String, status.String} + + if !receiveStartLsn.Valid { + level.Debug(c.log).Log("msg", "Skipping wal receiver stats because receive_start_lsn is null") + continue + } + if !receiveStartTli.Valid { + level.Debug(c.log).Log("msg", "Skipping wal receiver stats because receive_start_tli is null") + continue + } + if hasFlushedLSN && !flushedLsn.Valid { + level.Debug(c.log).Log("msg", "Skipping wal receiver stats because flushed_lsn is null") + continue + } + if !receivedTli.Valid { + level.Debug(c.log).Log("msg", "Skipping wal receiver stats because received_tli is null") + continue + } + if !lastMsgSendTime.Valid { + level.Debug(c.log).Log("msg", "Skipping wal receiver stats because last_msg_send_time is null") + continue + } + if !lastMsgReceiptTime.Valid { + level.Debug(c.log).Log("msg", "Skipping wal receiver stats because last_msg_receipt_time is null") + continue + } + if !latestEndLsn.Valid { + level.Debug(c.log).Log("msg", "Skipping wal receiver stats because latest_end_lsn is null") + continue + } + if !latestEndTime.Valid { + level.Debug(c.log).Log("msg", "Skipping wal receiver stats because latest_end_time is null") + continue + } + if !upstreamNode.Valid { + level.Debug(c.log).Log("msg", "Skipping wal receiver stats because upstream_node is null") + continue + } + ch <- prometheus.MustNewConstMetric( + statWalReceiverReceiveStartLsn, + prometheus.CounterValue, + float64(receiveStartLsn.Int64), + labels...) 
+ + ch <- prometheus.MustNewConstMetric( + statWalReceiverReceiveStartTli, + prometheus.GaugeValue, + float64(receiveStartTli.Int64), + labels...) + + if hasFlushedLSN { + ch <- prometheus.MustNewConstMetric( + statWalReceiverFlushedLSN, + prometheus.CounterValue, + float64(flushedLsn.Int64), + labels...) + } + + ch <- prometheus.MustNewConstMetric( + statWalReceiverReceivedTli, + prometheus.GaugeValue, + float64(receivedTli.Int64), + labels...) + + ch <- prometheus.MustNewConstMetric( + statWalReceiverLastMsgSendTime, + prometheus.CounterValue, + float64(lastMsgSendTime.Float64), + labels...) + + ch <- prometheus.MustNewConstMetric( + statWalReceiverLastMsgReceiptTime, + prometheus.CounterValue, + float64(lastMsgReceiptTime.Float64), + labels...) + + ch <- prometheus.MustNewConstMetric( + statWalReceiverLatestEndLsn, + prometheus.CounterValue, + float64(latestEndLsn.Int64), + labels...) + + ch <- prometheus.MustNewConstMetric( + statWalReceiverLatestEndTime, + prometheus.CounterValue, + latestEndTime.Float64, + labels...) + + ch <- prometheus.MustNewConstMetric( + statWalReceiverUpstreamNode, + prometheus.GaugeValue, + float64(upstreamNode.Int64), + labels...) + } + if err := rows.Err(); err != nil { + return err + } + return nil +} diff --git a/collector/pg_stat_walreceiver_test.go b/collector/pg_stat_walreceiver_test.go new file mode 100644 index 000000000..3e2418b25 --- /dev/null +++ b/collector/pg_stat_walreceiver_test.go @@ -0,0 +1,186 @@ +// Copyright 2023 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package collector + +import ( + "context" + "fmt" + "testing" + + "github.com/DATA-DOG/go-sqlmock" + "github.com/prometheus/client_golang/prometheus" + dto "github.com/prometheus/client_model/go" + "github.com/smartystreets/goconvey/convey" +) + +var queryWithFlushedLSN = fmt.Sprintf(pgStatWalReceiverQueryTemplate, "(flushed_lsn - '0/0') % (2^52)::bigint as flushed_lsn,\n") +var queryWithNoFlushedLSN = fmt.Sprintf(pgStatWalReceiverQueryTemplate, "") + +func TestPGStatWalReceiverCollectorWithFlushedLSN(t *testing.T) { + db, mock, err := sqlmock.New() + if err != nil { + t.Fatalf("Error opening a stub db connection: %s", err) + } + defer db.Close() + + inst := &instance{db: db} + infoSchemaColumns := []string{ + "column_name", + } + + infoSchemaRows := sqlmock.NewRows(infoSchemaColumns). + AddRow( + "flushed_lsn", + ) + + mock.ExpectQuery(sanitizeQuery(pgStatWalColumnQuery)).WillReturnRows(infoSchemaRows) + + columns := []string{ + "upstream_host", + "slot_name", + "status", + "receive_start_lsn", + "receive_start_tli", + "flushed_lsn", + "received_tli", + "last_msg_send_time", + "last_msg_receipt_time", + "latest_end_lsn", + "latest_end_time", + "upstream_node", + } + rows := sqlmock.NewRows(columns). 
+ AddRow( + "foo", + "bar", + "stopping", + 1200668684563608, + 1687321285, + 1200668684563609, + 1687321280, + 1687321275, + 1687321276, + 1200668684563610, + 1687321277, + 5, + ) + + mock.ExpectQuery(sanitizeQuery(queryWithFlushedLSN)).WillReturnRows(rows) + + ch := make(chan prometheus.Metric) + go func() { + defer close(ch) + c := PGStatWalReceiverCollector{} + + if err := c.Update(context.Background(), inst, ch); err != nil { + t.Errorf("Error calling PgStatWalReceiverCollector.Update: %s", err) + } + }() + expected := []MetricResult{ + {labels: labelMap{"upstream_host": "foo", "slot_name": "bar", "status": "stopping"}, value: 1200668684563608, metricType: dto.MetricType_COUNTER}, + {labels: labelMap{"upstream_host": "foo", "slot_name": "bar", "status": "stopping"}, value: 1687321285, metricType: dto.MetricType_GAUGE}, + {labels: labelMap{"upstream_host": "foo", "slot_name": "bar", "status": "stopping"}, value: 1200668684563609, metricType: dto.MetricType_COUNTER}, + {labels: labelMap{"upstream_host": "foo", "slot_name": "bar", "status": "stopping"}, value: 1687321280, metricType: dto.MetricType_GAUGE}, + {labels: labelMap{"upstream_host": "foo", "slot_name": "bar", "status": "stopping"}, value: 1687321275, metricType: dto.MetricType_COUNTER}, + {labels: labelMap{"upstream_host": "foo", "slot_name": "bar", "status": "stopping"}, value: 1687321276, metricType: dto.MetricType_COUNTER}, + {labels: labelMap{"upstream_host": "foo", "slot_name": "bar", "status": "stopping"}, value: 1200668684563610, metricType: dto.MetricType_COUNTER}, + {labels: labelMap{"upstream_host": "foo", "slot_name": "bar", "status": "stopping"}, value: 1687321277, metricType: dto.MetricType_COUNTER}, + {labels: labelMap{"upstream_host": "foo", "slot_name": "bar", "status": "stopping"}, value: 5, metricType: dto.MetricType_GAUGE}, + } + convey.Convey("Metrics comparison", t, func() { + for _, expect := range expected { + m := readMetric(<-ch) + convey.So(expect, convey.ShouldResemble, m) + } + }) + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("there were unfulfilled exceptions: %s", err) + } + +} + +func TestPGStatWalReceiverCollectorWithNoFlushedLSN(t *testing.T) { + db, mock, err := sqlmock.New() + if err != nil { + t.Fatalf("Error opening a stub db connection: %s", err) + } + defer db.Close() + + inst := &instance{db: db} + infoSchemaColumns := []string{ + "column_name", + } + + infoSchemaRows := sqlmock.NewRows(infoSchemaColumns) + + mock.ExpectQuery(sanitizeQuery(pgStatWalColumnQuery)).WillReturnRows(infoSchemaRows) + + columns := []string{ + "upstream_host", + "slot_name", + "status", + "receive_start_lsn", + "receive_start_tli", + "received_tli", + "last_msg_send_time", + "last_msg_receipt_time", + "latest_end_lsn", + "latest_end_time", + "upstream_node", + } + rows := sqlmock.NewRows(columns). 
+ AddRow( + "foo", + "bar", + "starting", + 1200668684563608, + 1687321285, + 1687321280, + 1687321275, + 1687321276, + 1200668684563610, + 1687321277, + 5, + ) + mock.ExpectQuery(sanitizeQuery(queryWithNoFlushedLSN)).WillReturnRows(rows) + + ch := make(chan prometheus.Metric) + go func() { + defer close(ch) + c := PGStatWalReceiverCollector{} + + if err := c.Update(context.Background(), inst, ch); err != nil { + t.Errorf("Error calling PgStatWalReceiverCollector.Update: %s", err) + } + }() + expected := []MetricResult{ + {labels: labelMap{"upstream_host": "foo", "slot_name": "bar", "status": "starting"}, value: 1200668684563608, metricType: dto.MetricType_COUNTER}, + {labels: labelMap{"upstream_host": "foo", "slot_name": "bar", "status": "starting"}, value: 1687321285, metricType: dto.MetricType_GAUGE}, + {labels: labelMap{"upstream_host": "foo", "slot_name": "bar", "status": "starting"}, value: 1687321280, metricType: dto.MetricType_GAUGE}, + {labels: labelMap{"upstream_host": "foo", "slot_name": "bar", "status": "starting"}, value: 1687321275, metricType: dto.MetricType_COUNTER}, + {labels: labelMap{"upstream_host": "foo", "slot_name": "bar", "status": "starting"}, value: 1687321276, metricType: dto.MetricType_COUNTER}, + {labels: labelMap{"upstream_host": "foo", "slot_name": "bar", "status": "starting"}, value: 1200668684563610, metricType: dto.MetricType_COUNTER}, + {labels: labelMap{"upstream_host": "foo", "slot_name": "bar", "status": "starting"}, value: 1687321277, metricType: dto.MetricType_COUNTER}, + {labels: labelMap{"upstream_host": "foo", "slot_name": "bar", "status": "starting"}, value: 5, metricType: dto.MetricType_GAUGE}, + } + convey.Convey("Metrics comparison", t, func() { + for _, expect := range expected { + m := readMetric(<-ch) + convey.So(expect, convey.ShouldResemble, m) + } + }) + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("there were unfulfilled exceptions: %s", err) + } + +} diff --git a/collector/pg_xlog_location.go b/collector/pg_xlog_location.go new file mode 100644 index 000000000..92ac44acb --- /dev/null +++ b/collector/pg_xlog_location.go @@ -0,0 +1,80 @@ +// Copyright 2023 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
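+
+// The xlog_location collector (registered disabled by default) exposes the current WAL
+// position as a single gauge, truncated to the low 52 bits: the replay location when
+// the server is in recovery, otherwise the current write location. It uses the
+// pg_current_xlog_location() and pg_last_xlog_replay_location() function names, which
+// PostgreSQL 10 renamed to pg_current_wal_lsn() and pg_last_wal_replay_lsn().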
+ +package collector + +import ( + "context" + + "github.com/go-kit/log" + "github.com/prometheus/client_golang/prometheus" +) + +const xlogLocationSubsystem = "xlog_location" + +func init() { + registerCollector(xlogLocationSubsystem, defaultDisabled, NewPGXlogLocationCollector) +} + +type PGXlogLocationCollector struct { + log log.Logger +} + +func NewPGXlogLocationCollector(config collectorConfig) (Collector, error) { + return &PGXlogLocationCollector{log: config.logger}, nil +} + +var ( + xlogLocationBytes = prometheus.NewDesc( + prometheus.BuildFQName(namespace, xlogLocationSubsystem, "bytes"), + "Postgres LSN (log sequence number) being generated on primary or replayed on replica (truncated to low 52 bits)", + []string{}, + prometheus.Labels{}, + ) + + xlogLocationQuery = ` + SELECT CASE + WHEN pg_is_in_recovery() THEN (pg_last_xlog_replay_location() - '0/0') % (2^52)::bigint + ELSE (pg_current_xlog_location() - '0/0') % (2^52)::bigint + END AS bytes + ` +) + +func (PGXlogLocationCollector) Update(ctx context.Context, instance *instance, ch chan<- prometheus.Metric) error { + db := instance.getDB() + rows, err := db.QueryContext(ctx, + xlogLocationQuery) + + if err != nil { + return err + } + defer rows.Close() + + for rows.Next() { + var bytes float64 + + if err := rows.Scan(&bytes); err != nil { + return err + } + + ch <- prometheus.MustNewConstMetric( + xlogLocationBytes, + prometheus.GaugeValue, + bytes, + ) + } + if err := rows.Err(); err != nil { + return err + } + return nil +} diff --git a/collector/pg_xlog_location_test.go b/collector/pg_xlog_location_test.go new file mode 100644 index 000000000..561a7df94 --- /dev/null +++ b/collector/pg_xlog_location_test.go @@ -0,0 +1,61 @@ +// Copyright 2023 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package collector + +import ( + "context" + "testing" + + "github.com/DATA-DOG/go-sqlmock" + "github.com/prometheus/client_golang/prometheus" + dto "github.com/prometheus/client_model/go" + "github.com/smartystreets/goconvey/convey" +) + +func TestPGXlogLocationCollector(t *testing.T) { + db, mock, err := sqlmock.New() + if err != nil { + t.Fatalf("Error opening a stub db connection: %s", err) + } + defer db.Close() + inst := &instance{db: db} + columns := []string{ + "bytes", + } + rows := sqlmock.NewRows(columns). 
+		AddRow(53401)
+
+	mock.ExpectQuery(sanitizeQuery(xlogLocationQuery)).WillReturnRows(rows)
+
+	ch := make(chan prometheus.Metric)
+	go func() {
+		defer close(ch)
+		c := PGXlogLocationCollector{}
+
+		if err := c.Update(context.Background(), inst, ch); err != nil {
+			t.Errorf("Error calling PGXlogLocationCollector.Update: %s", err)
+		}
+	}()
+	expected := []MetricResult{
+		{labels: labelMap{}, value: 53401, metricType: dto.MetricType_GAUGE},
+	}
+	convey.Convey("Metrics comparison", t, func() {
+		for _, expect := range expected {
+			m := readMetric(<-ch)
+			convey.So(expect, convey.ShouldResemble, m)
+		}
+	})
+	if err := mock.ExpectationsWereMet(); err != nil {
+		t.Errorf("there were unfulfilled expectations: %s", err)
+	}
+}