Skip to content

Commit

Permalink
Merge pull request #276 from hashicorp/jm-member-list-size-metric
Browse files Browse the repository at this point in the history
Emit metrics for local memberlist size and remote memberlist size
  • Loading branch information
jmurret authored Sep 14, 2022
2 parents e892776 + e1b2314 commit 9c88db2
Show file tree
Hide file tree
Showing 5 changed files with 69 additions and 6 deletions.
7 changes: 1 addition & 6 deletions memberlist_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,13 +259,8 @@ func TestCreate_checkBroadcastQueueMetrics(t *testing.T) {

time.Sleep(3 * time.Second)

intv := getIntervalMetrics(t, sink)
sampleName := "consul.usage.test.memberlist.queue.broadcasts"
actualSample := intv.Samples[sampleName]

if actualSample.Count == 0 {
t.Fatalf("%s sample not taken", sampleName)
}
verifySampleExists(t, sampleName, sink)
}

func TestCreate_keyringOnly(t *testing.T) {
Expand Down
21 changes: 21 additions & 0 deletions net.go
Original file line number Diff line number Diff line change
Expand Up @@ -1007,6 +1007,22 @@ func (m *Memberlist) sendLocalState(conn net.Conn, join bool, streamLabel string
}
m.nodeLock.RUnlock()

nodeStateCounts := make(map[string]int)
nodeStateCounts[StateAlive.metricsString()] = 0
nodeStateCounts[StateLeft.metricsString()] = 0
nodeStateCounts[StateDead.metricsString()] = 0
nodeStateCounts[StateSuspect.metricsString()] = 0

for _, n := range localNodes {
nodeStateCounts[n.State.metricsString()]++
}

for nodeState, cnt := range nodeStateCounts {
metrics.SetGaugeWithLabels([]string{"memberlist", "node", "instances"},
float32(cnt),
append(m.metricLabels, metrics.Label{Name: "node_state", Value: nodeState}))
}

// Get the delegate state
var userData []byte
if m.config.Delegate != nil {
Expand Down Expand Up @@ -1042,6 +1058,9 @@ func (m *Memberlist) sendLocalState(conn net.Conn, join bool, streamLabel string
}
}

moreBytes := binary.BigEndian.Uint32(bufConn.Bytes()[1:5])
metrics.SetGaugeWithLabels([]string{"memberlist", "size", "local"}, float32(moreBytes), m.metricLabels)

// Get the send buffer
return m.rawSendMsgStream(conn, bufConn.Bytes(), streamLabel)
}
Expand Down Expand Up @@ -1088,6 +1107,8 @@ func (m *Memberlist) decryptRemoteState(bufConn io.Reader, streamLabel string) (
// Ensure we aren't asked to download too much. This is to guard against
// an attack vector where a huge amount of state is sent
moreBytes := binary.BigEndian.Uint32(cipherText.Bytes()[1:5])
metrics.AddSampleWithLabels([]string{"memberlist", "size", "remote"}, float32(moreBytes), m.metricLabels)

if moreBytes > maxPushStateBytes {
return nil, fmt.Errorf("Remote node state is larger than limit (%d)", moreBytes)

Expand Down
2 changes: 2 additions & 0 deletions net_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -690,6 +690,7 @@ func TestEncryptDecryptState(t *testing.T) {
SecretKey: []byte{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15},
ProtocolVersion: ProtocolVersionMax,
}
sink := registerInMemorySink(t)

m, err := Create(config)
if err != nil {
Expand All @@ -710,6 +711,7 @@ func TestEncryptDecryptState(t *testing.T) {
if err != nil {
t.Fatalf("err: %v", err)
}
verifySampleExists(t, "consul.usage.test.memberlist.size.remote", sink)

if !reflect.DeepEqual(state, plain) {
t.Fatalf("Decrypt failed: %v", plain)
Expand Down
15 changes: 15 additions & 0 deletions state.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,21 @@ import (

type NodeStateType int

// metricsString returns the label value that identifies this node state in
// emitted metrics. Values without a dedicated name are rendered as
// "unhandled-value-<n>", where <n> is the numeric value of the state.
func (t NodeStateType) metricsString() string {
	var name string
	switch t {
	case StateAlive:
		name = "alive"
	case StateSuspect:
		name = "suspect"
	case StateDead:
		name = "dead"
	case StateLeft:
		name = "left"
	default:
		// Keep unknown states distinguishable instead of collapsing them
		// into a single label.
		name = fmt.Sprintf("unhandled-value-%d", t)
	}
	return name
}

const (
StateAlive NodeStateType = iota
StateSuspect
Expand Down
30 changes: 30 additions & 0 deletions state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2239,6 +2239,8 @@ func TestMemberlist_PushPull(t *testing.T) {
ip1 := []byte(addr1)
ip2 := []byte(addr2)

sink := registerInMemorySink(t)

ch := make(chan NodeEvent, 3)

m1 := HostMemberlist(addr1.String(), t, func(c *Config) {
Expand Down Expand Up @@ -2270,6 +2272,13 @@ func TestMemberlist_PushPull(t *testing.T) {
if len(ch) < 2 {
failf("expected 2 messages from pushPull")
}

instancesMetricName := "consul.usage.test.memberlist.node.instances"
verifyGaugeExists(t, "consul.usage.test.memberlist.size.local", sink)
verifyGaugeExists(t, fmt.Sprintf("%s;node_state=%s", instancesMetricName, StateAlive.metricsString()), sink)
verifyGaugeExists(t, fmt.Sprintf("%s;node_state=%s", instancesMetricName, StateDead.metricsString()), sink)
verifyGaugeExists(t, fmt.Sprintf("%s;node_state=%s", instancesMetricName, StateLeft.metricsString()), sink)
verifyGaugeExists(t, fmt.Sprintf("%s;node_state=%s", instancesMetricName, StateSuspect.metricsString()), sink)
})
}

Expand Down Expand Up @@ -2412,3 +2421,24 @@ func getIntervalMetrics(t *testing.T, sink *metrics.InmemSink) *metrics.Interval
intv := intervals[0]
return intv
}

// verifyGaugeExists fails the test immediately when the interval obtained
// from getIntervalMetrics for sink has no gauge recorded under name.
func verifyGaugeExists(t *testing.T, name string, sink *metrics.InmemSink) {
	t.Helper()
	interval := getIntervalMetrics(t, sink)
	// Hold the interval's read lock while its gauge map is inspected.
	interval.RLock()
	defer interval.RUnlock()
	if _, ok := interval.Gauges[name]; !ok {
		t.Fatalf("%s gauge not emitted", name)
	}
}

// verifySampleExists fails the test immediately when the interval obtained
// from getIntervalMetrics for sink has no sample recorded under name.
func verifySampleExists(t *testing.T, name string, sink *metrics.InmemSink) {
	t.Helper()
	interval := getIntervalMetrics(t, sink)
	// Hold the interval's read lock while its sample map is inspected.
	interval.RLock()
	defer interval.RUnlock()

	if _, ok := interval.Samples[name]; !ok {
		t.Fatalf("%s sample not emitted", name)
	}
}

0 comments on commit 9c88db2

Please sign in to comment.