Skip to content

Commit

Permalink
Correct WAL metrics registrations
Browse files Browse the repository at this point in the history
  • Loading branch information
banks committed Feb 23, 2023
1 parent 0c66bbf commit 1180908
Show file tree
Hide file tree
Showing 3 changed files with 248 additions and 7 deletions.
4 changes: 2 additions & 2 deletions agent/consul/server_log_verification.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,12 +62,12 @@ func makeLogVerifyReportFn(logger hclog.Logger) verifier.ReportFn {
if r.WrittenSum > 0 && r.WrittenSum != r.ExpectedSum {
// The failure occurred before the follower wrote to the log so it
// must be corrupted in flight from the leader!
l2.Info("verification checksum FAILED: in-flight corruption",
l2.Error("verification checksum FAILED: in-flight corruption",
"followerWriteChecksum", fmt.Sprintf("%08x", r.WrittenSum),
"readChecksum", fmt.Sprintf("%08x", r.ReadSum),
)
} else {
l2.Info("verification checksum FAILED: storage corruption",
l2.Error("verification checksum FAILED: storage corruption",
"followerWriteChecksum", fmt.Sprintf("%08x", r.WrittenSum),
"readChecksum", fmt.Sprintf("%08x", r.ReadSum),
)
Expand Down
190 changes: 190 additions & 0 deletions agent/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -432,3 +432,193 @@ func TestHTTPHandlers_AgentMetrics_CACertExpiry_Prometheus(t *testing.T) {
})

}

// TestHTTPHandlers_AgentMetrics_WAL_Prometheus checks which agents expose
// "raft_wal" metric names in their Prometheus output. A client agent and a
// server using the boltdb backend must expose none, while a server using the
// wal backend must expose every expected WAL metric name.
//
// Each sub-test uses its own metrics_prefix so the assertions only match names
// produced by the agent started in that sub-test.
func TestHTTPHandlers_AgentMetrics_WAL_Prometheus(t *testing.T) {
	skipIfShortTesting(t)
	// Not safe for t.Parallel(): this test modifies global state, i.e. the
	// global metrics instance.

	t.Run("client agent emits nothing", func(t *testing.T) {
		agent := StartTestAgent(t, TestAgent{HCL: `
server = false
telemetry = {
prometheus_retention_time = "5s",
disable_hostname = true
metrics_prefix = "agent_4"
}
raft_logstore {
backend = "wal"
}
bootstrap = false
`})
		defer agent.Shutdown()

		rec := httptest.NewRecorder()
		recordPromMetrics(t, agent, rec)

		body := rec.Body.String()
		require.NotContains(t, body, "agent_4_raft_wal")
	})

	t.Run("server with WAL enabled emits WAL metrics", func(t *testing.T) {
		agent := StartTestAgent(t, TestAgent{HCL: `
server = true
bootstrap = true
telemetry = {
prometheus_retention_time = "5s",
disable_hostname = true
metrics_prefix = "agent_5"
}
connect {
enabled = true
}
raft_logstore {
backend = "wal"
}
`})
		defer agent.Shutdown()
		testrpc.WaitForLeader(t, agent.RPC, "dc1")

		rec := httptest.NewRecorder()
		recordPromMetrics(t, agent, rec)

		// Checked in this fixed order; the first missing name fails the sub-test.
		body := rec.Body.String()
		for _, metric := range []string{
			"agent_5_raft_wal_head_truncations",
			"agent_5_raft_wal_last_segment_age_seconds",
			"agent_5_raft_wal_log_appends",
			"agent_5_raft_wal_log_entries_read",
			"agent_5_raft_wal_log_entries_written",
			"agent_5_raft_wal_log_entry_bytes_read",
			"agent_5_raft_wal_log_entry_bytes_written",
			"agent_5_raft_wal_segment_rotations",
			"agent_5_raft_wal_stable_gets",
			"agent_5_raft_wal_stable_sets",
			"agent_5_raft_wal_tail_truncations",
		} {
			require.Contains(t, body, metric)
		}
	})

	t.Run("server without WAL enabled emits no WAL metrics", func(t *testing.T) {
		agent := StartTestAgent(t, TestAgent{HCL: `
server = true
bootstrap = true
telemetry = {
prometheus_retention_time = "5s",
disable_hostname = true
metrics_prefix = "agent_6"
}
connect {
enabled = true
}
raft_logstore {
backend = "boltdb"
}
`})
		defer agent.Shutdown()
		testrpc.WaitForLeader(t, agent.RPC, "dc1")

		rec := httptest.NewRecorder()
		recordPromMetrics(t, agent, rec)

		body := rec.Body.String()
		require.NotContains(t, body, "agent_6_raft_wal")
	})
}

// TestHTTPHandlers_AgentMetrics_LogVerifier_Prometheus checks which agents
// expose "raft_logstore_verifier" metric names in their Prometheus output. A
// client agent and a server with verification disabled must expose none, while
// a server with verification enabled must expose every expected verifier
// metric name.
//
// Each sub-test uses its own metrics_prefix so the assertions only match names
// produced by the agent started in that sub-test.
func TestHTTPHandlers_AgentMetrics_LogVerifier_Prometheus(t *testing.T) {
	skipIfShortTesting(t)
	// Not safe for t.Parallel(): this test modifies global state, i.e. the
	// global metrics instance.

	t.Run("client agent emits nothing", func(t *testing.T) {
		agent := StartTestAgent(t, TestAgent{HCL: `
server = false
telemetry = {
prometheus_retention_time = "5s",
disable_hostname = true
metrics_prefix = "agent_4"
}
raft_logstore {
verification {
enabled = true
interval = "1s"
}
}
bootstrap = false
`})
		defer agent.Shutdown()

		rec := httptest.NewRecorder()
		recordPromMetrics(t, agent, rec)

		body := rec.Body.String()
		require.NotContains(t, body, "agent_4_raft_logstore_verifier")
	})

	t.Run("server with verifier enabled emits all metrics", func(t *testing.T) {
		agent := StartTestAgent(t, TestAgent{HCL: `
server = true
bootstrap = true
telemetry = {
prometheus_retention_time = "5s",
disable_hostname = true
metrics_prefix = "agent_5"
}
connect {
enabled = true
}
raft_logstore {
verification {
enabled = true
interval = "1s"
}
}
`})
		defer agent.Shutdown()
		testrpc.WaitForLeader(t, agent.RPC, "dc1")

		rec := httptest.NewRecorder()
		recordPromMetrics(t, agent, rec)

		// Checked in this fixed order; the first missing name fails the sub-test.
		body := rec.Body.String()
		for _, metric := range []string{
			"agent_5_raft_logstore_verifier_checkpoints_written",
			"agent_5_raft_logstore_verifier_dropped_reports",
			"agent_5_raft_logstore_verifier_ranges_verified",
			"agent_5_raft_logstore_verifier_read_checksum_failures",
			"agent_5_raft_logstore_verifier_write_checksum_failures",
		} {
			require.Contains(t, body, metric)
		}
	})

	t.Run("server with verifier disabled emits no extra metrics", func(t *testing.T) {
		agent := StartTestAgent(t, TestAgent{HCL: `
server = true
bootstrap = true
telemetry = {
prometheus_retention_time = "5s",
disable_hostname = true
metrics_prefix = "agent_6"
}
connect {
enabled = true
}
raft_logstore {
verification {
enabled = false
}
}
`})
		defer agent.Shutdown()
		testrpc.WaitForLeader(t, agent.RPC, "dc1")

		rec := httptest.NewRecorder()
		recordPromMetrics(t, agent, rec)

		body := rec.Body.String()
		require.NotContains(t, body, "agent_6_raft_logstore_verifier")
	})
}
61 changes: 56 additions & 5 deletions agent/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import (

"github.com/armon/go-metrics/prometheus"
"github.com/hashicorp/go-hclog"
wal "github.com/hashicorp/raft-wal"
"github.com/hashicorp/raft-wal/verifier"
"google.golang.org/grpc/grpclog"

autoconf "github.com/hashicorp/consul/agent/auto-config"
Expand Down Expand Up @@ -89,7 +91,7 @@ func NewBaseDeps(configLoader ConfigLoader, logOut io.Writer, providedLogger hcl
}

isServer := result.RuntimeConfig.ServerMode
gauges, counters, summaries := getPrometheusDefs(cfg.Telemetry, isServer)
gauges, counters, summaries := getPrometheusDefs(cfg, isServer)
cfg.Telemetry.PrometheusOpts.GaugeDefinitions = gauges
cfg.Telemetry.PrometheusOpts.CounterDefinitions = counters
cfg.Telemetry.PrometheusOpts.SummaryDefinitions = summaries
Expand Down Expand Up @@ -226,7 +228,7 @@ func newConnPool(config *config.RuntimeConfig, logger hclog.Logger, tls *tlsutil

// getPrometheusDefs reaches into every slice of prometheus defs we've defined in each part of the agent, and appends
// all of our slices into one nice slice of definitions per metric type for the Consul agent to pass to go-metrics.
func getPrometheusDefs(cfg lib.TelemetryConfig, isServer bool) ([]prometheus.GaugeDefinition, []prometheus.CounterDefinition, []prometheus.SummaryDefinition) {
func getPrometheusDefs(cfg *config.RuntimeConfig, isServer bool) ([]prometheus.GaugeDefinition, []prometheus.CounterDefinition, []prometheus.SummaryDefinition) {
// TODO: "raft..." metrics come from the raft lib and we should migrate these to a telemetry
// package within. In the mean time, we're going to define a few here because they're key to monitoring Consul.
raftGauges := []prometheus.GaugeDefinition{
Expand Down Expand Up @@ -272,6 +274,29 @@ func getPrometheusDefs(cfg lib.TelemetryConfig, isServer bool) ([]prometheus.Gau
)
}

if isServer && cfg.RaftLogStoreConfig.Verification.Enabled {
verifierGauges := make([]prometheus.GaugeDefinition, 0)
for _, d := range verifier.MetricDefinitions.Gauges {
verifierGauges = append(verifierGauges, prometheus.GaugeDefinition{
Name: []string{"raft", "logstore", "verifier", d.Name},
Help: d.Desc,
})
}
gauges = append(gauges, verifierGauges)
}

if isServer && cfg.RaftLogStoreConfig.Backend == consul.LogStoreBackendWAL {

walGauges := make([]prometheus.GaugeDefinition, 0)
for _, d := range wal.MetricDefinitions.Gauges {
walGauges = append(walGauges, prometheus.GaugeDefinition{
Name: []string{"raft", "wal", d.Name},
Help: d.Desc,
})
}
gauges = append(gauges, walGauges)
}

// Flatten definitions
// NOTE(kit): Do we actually want to create a set here so we can ensure definition names are unique?
var gaugeDefs []prometheus.GaugeDefinition
Expand All @@ -280,7 +305,7 @@ func getPrometheusDefs(cfg lib.TelemetryConfig, isServer bool) ([]prometheus.Gau
// TODO(kit): Prepending the service to each definition should be handled by go-metrics
var withService []prometheus.GaugeDefinition
for _, gauge := range g {
gauge.Name = append([]string{cfg.MetricsPrefix}, gauge.Name...)
gauge.Name = append([]string{cfg.Telemetry.MetricsPrefix}, gauge.Name...)
withService = append(withService, gauge)
}
gaugeDefs = append(gaugeDefs, withService...)
Expand Down Expand Up @@ -316,14 +341,40 @@ func getPrometheusDefs(cfg lib.TelemetryConfig, isServer bool) ([]prometheus.Gau
raftCounters,
rate.Counters,
}

// For some unknown reason, we add the raft counters above without checking
// whether this is a server, unlike what we do for some of the summaries. We
// should probably fix that, but I don't want to change behavior right now.
// If we are a server, add counters for the WAL and verifier metrics.
if isServer && cfg.RaftLogStoreConfig.Verification.Enabled {
verifierCounters := make([]prometheus.CounterDefinition, 0)
for _, d := range verifier.MetricDefinitions.Counters {
verifierCounters = append(verifierCounters, prometheus.CounterDefinition{
Name: []string{"raft", "logstore", "verifier", d.Name},
Help: d.Desc,
})
}
counters = append(counters, verifierCounters)
}
if isServer && cfg.RaftLogStoreConfig.Backend == consul.LogStoreBackendWAL {
walCounters := make([]prometheus.CounterDefinition, 0)
for _, d := range wal.MetricDefinitions.Counters {
walCounters = append(walCounters, prometheus.CounterDefinition{
Name: []string{"raft", "wal", d.Name},
Help: d.Desc,
})
}
counters = append(counters, walCounters)
}

// Flatten definitions
// NOTE(kit): Do we actually want to create a set here so we can ensure definition names are unique?
var counterDefs []prometheus.CounterDefinition
for _, c := range counters {
// TODO(kit): Prepending the service to each definition should be handled by go-metrics
var withService []prometheus.CounterDefinition
for _, counter := range c {
counter.Name = append([]string{cfg.MetricsPrefix}, counter.Name...)
counter.Name = append([]string{cfg.Telemetry.MetricsPrefix}, counter.Name...)
withService = append(withService, counter)
}
counterDefs = append(counterDefs, withService...)
Expand Down Expand Up @@ -377,7 +428,7 @@ func getPrometheusDefs(cfg lib.TelemetryConfig, isServer bool) ([]prometheus.Gau
// TODO(kit): Prepending the service to each definition should be handled by go-metrics
var withService []prometheus.SummaryDefinition
for _, summary := range s {
summary.Name = append([]string{cfg.MetricsPrefix}, summary.Name...)
summary.Name = append([]string{cfg.Telemetry.MetricsPrefix}, summary.Name...)
withService = append(withService, summary)
}
summaryDefs = append(summaryDefs, withService...)
Expand Down

0 comments on commit 1180908

Please sign in to comment.