Expose metrics using go-metrics (#683) #688

Closed
wants to merge 2 commits into from
76 changes: 74 additions & 2 deletions broker.go
@@ -10,6 +10,8 @@ import (
"sync"
"sync/atomic"
"time"

"github.com/rcrowley/go-metrics"
)

// Broker represents a single Kafka broker connection. All operations on this object are entirely concurrency-safe.
@@ -26,6 +28,19 @@ type Broker struct {

responses chan responsePromise
done chan bool

incomingByteRate metrics.Meter
requestRate metrics.Meter
requestSize metrics.Histogram
outgoingByteRate metrics.Meter
responseRate metrics.Meter
responseSize metrics.Histogram
brokerIncomingByteRate metrics.Meter
brokerRequestRate metrics.Meter
brokerRequestSize metrics.Histogram
brokerOutgoingByteRate metrics.Meter
brokerResponseRate metrics.Meter
brokerResponseSize metrics.Histogram
}

type responsePromise struct {
@@ -84,6 +99,23 @@ func (b *Broker) Open(conf *Config) error {

b.conf = conf

// Create or reuse the metrics shared between brokers
b.incomingByteRate = metrics.GetOrRegisterMeter("incoming-byte-rate", conf.MetricRegistry)
Contributor:
Are the metrics.GetOrRegister* methods concurrency-safe? Because many brokers can call these at once.

Contributor Author:
Yes, they are, via a sync.Mutex.
I was thinking about caching the resolved metrics to avoid hitting that mutex, but after running the existing benchmarks on both the master branch and this branch I did not notice any slowdown or contention.
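For illustration, a minimal self-contained sketch of the scenario under discussion: several goroutines (standing in for brokers opening at the same time) resolving the same meter from one registry. The registry serializes these calls internally, so only one meter instance ends up registered; everything here is illustrative and not part of the diff.

```go
package main

import (
	"fmt"
	"sync"

	"github.com/rcrowley/go-metrics"
)

func main() {
	r := metrics.NewRegistry()
	var wg sync.WaitGroup
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			// Concurrent GetOrRegisterMeter calls are serialized by the registry,
			// so every goroutine gets the same "incoming-byte-rate" meter.
			metrics.GetOrRegisterMeter("incoming-byte-rate", r).Mark(1)
		}()
	}
	wg.Wait()
	// All ten marks landed on a single meter.
	fmt.Println(metrics.GetOrRegisterMeter("incoming-byte-rate", r).Count())
}
```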

b.requestRate = metrics.GetOrRegisterMeter("request-rate", conf.MetricRegistry)
b.requestSize = getOrRegisterHistogram("request-size", conf.MetricRegistry)
b.outgoingByteRate = metrics.GetOrRegisterMeter("outgoing-byte-rate", conf.MetricRegistry)
b.responseRate = metrics.GetOrRegisterMeter("response-rate", conf.MetricRegistry)
b.responseSize = getOrRegisterHistogram("response-size", conf.MetricRegistry)
// Do not gather metrics for seeded broker (-1 id)
if b.id >= 0 {
Contributor:
Why don't you want metrics for the seed brokers?

Contributor Author:
They would appear with a -for-broker--1 suffix without telling you which broker was actually used, and their values are already accumulated in the aggregate metrics for all brokers (the ones without the -for-broker-&lt;Broker.ID()&gt; suffix).
Maybe using the broker's hostname would be more explicit, but I believe the Java client uses the broker id.
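To illustrate the naming scheme with the getMetricNameForBroker helper added in metrics.go below (the broker ids are made up):

```go
getMetricNameForBroker("incoming-byte-rate", 1)  // "incoming-byte-rate-for-broker-1"
getMetricNameForBroker("incoming-byte-rate", -1) // "incoming-byte-rate-for-broker--1" (a seed broker, hence the id >= 0 guard)
```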

Contributor:
Does Java provide metrics for seed brokers? It just seems dangerous to omit them, since it means your statistics will be incomplete.

Contributor:
Sounds like Java uses the ID because IPv6 addresses contain characters that aren't permitted in JMX metric names. I'm OK with following them on this one.

I don't know what limitations this metrics package has, but if it supports all the necessary names then I'm also OK with actually using the address:port combo instead of the ID.

Contributor:
If you stick with IDs, an explanatory comment here would be nice.
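For example, something along these lines would capture the reasoning from this thread (the wording is only a suggestion, not part of the diff):

```go
// Seed brokers have id -1; registering per-broker metrics for them would produce
// the confusing suffix "-for-broker--1", and their traffic is already counted in
// the aggregate (non-suffixed) metrics, so only register per-broker metrics for id >= 0.
```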

b.brokerIncomingByteRate = getOrRegisterBrokerMeter("incoming-byte-rate", b.id, conf.MetricRegistry)
b.brokerRequestRate = getOrRegisterBrokerMeter("request-rate", b.id, conf.MetricRegistry)
b.brokerRequestSize = getOrRegisterBrokerHistogram("request-size", b.id, conf.MetricRegistry)
b.brokerOutgoingByteRate = getOrRegisterBrokerMeter("outgoing-byte-rate", b.id, conf.MetricRegistry)
b.brokerResponseRate = getOrRegisterBrokerMeter("response-rate", b.id, conf.MetricRegistry)
b.brokerResponseSize = getOrRegisterBrokerHistogram("response-size", b.id, conf.MetricRegistry)
}

if conf.Net.SASL.Enable {
b.connErr = b.sendAndReceiveSASLPlainAuth()
if b.connErr != nil {
@@ -338,6 +370,8 @@ func (b *Broker) send(rb protocolBody, promiseResponse bool) (*responsePromise,
return nil, err
}

b.updateOutgoingCommunicationMetrics(len(buf))

err = b.conn.SetWriteDeadline(time.Now().Add(b.conf.Net.WriteTimeout))
if err != nil {
return nil, err
@@ -441,7 +475,7 @@ func (b *Broker) responseReceiver() {
continue
}

_, err = io.ReadFull(b.conn, header)
headerSize, err := io.ReadFull(b.conn, header)
if err != nil {
dead = err
response.errors <- err
@@ -464,13 +498,15 @@
}

buf := make([]byte, decodedHeader.length-4)
_, err = io.ReadFull(b.conn, buf)
remainingPayloadSize, err := io.ReadFull(b.conn, buf)
Contributor:
You don't need to capture this value (or headerSize above) since ReadFull will return an error unless it completely fills the buffer, and we'll never even get to the metric call in that case. You should be able to use len(header) + len(buf) instead.
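A sketch of the suggested simplification, reusing the surrounding variables from the diff (b, header, buf, response, dead); it relies on io.ReadFull either filling the buffer completely or returning an error:

```go
// No need to capture the byte counts: ReadFull fills buf entirely or fails.
if _, err := io.ReadFull(b.conn, buf); err != nil {
	dead = err
	response.errors <- err
	continue
}
b.updateIncomingCommunicationMetrics(len(header) + len(buf))
```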

if err != nil {
dead = err
response.errors <- err
continue
}

b.updateIncomingCommunicationMetrics(headerSize + remainingPayloadSize)

response.packets <- buf
}
close(b.done)
@@ -500,6 +536,8 @@ func (b *Broker) sendAndReceiveSASLPlainAuth() error {
binary.BigEndian.PutUint32(authBytes, uint32(length))
copy(authBytes[4:], []byte("\x00"+b.conf.Net.SASL.User+"\x00"+b.conf.Net.SASL.Password))

b.updateOutgoingCommunicationMetrics(len(authBytes))

err := b.conn.SetWriteDeadline(time.Now().Add(b.conf.Net.WriteTimeout))
if err != nil {
Logger.Printf("Failed to set write deadline when doing SASL auth with broker %s: %s\n", b.addr, err.Error())
@@ -521,6 +559,40 @@ func (b *Broker) sendAndReceiveSASLPlainAuth() error {
return err
}

b.updateIncomingCommunicationMetrics(n)

Logger.Printf("SASL authentication successful with broker %s:%v - %v\n", b.addr, n, header)
return nil
}

func (b *Broker) updateIncomingCommunicationMetrics(bytes int) {
b.responseRate.Mark(1)
if b.brokerResponseRate != nil {
Contributor:
Can the metric creation methods return nil? Or is there some weird case where we could try and measure metrics before opening the connection?

Contributor:
Oh, of course: for seed brokers we don't even generate some of the metrics. Oh well.

b.brokerResponseRate.Mark(1)
}
responseSize := int64(bytes)
b.incomingByteRate.Mark(responseSize)
if b.brokerIncomingByteRate != nil {
b.brokerIncomingByteRate.Mark(responseSize)
}
b.responseSize.Update(responseSize)
if b.brokerResponseSize != nil {
b.brokerResponseSize.Update(responseSize)
}
}

func (b *Broker) updateOutgoingCommunicationMetrics(bytes int) {
b.requestRate.Mark(1)
if b.brokerRequestRate != nil {
b.brokerRequestRate.Mark(1)
}
requestSize := int64(bytes)
b.outgoingByteRate.Mark(requestSize)
if b.brokerOutgoingByteRate != nil {
b.brokerOutgoingByteRate.Mark(requestSize)
}
b.requestSize.Update(requestSize)
if b.brokerRequestSize != nil {
b.brokerRequestSize.Update(requestSize)
}
}
6 changes: 6 additions & 0 deletions config.go
@@ -4,6 +4,8 @@ import (
"crypto/tls"
"regexp"
"time"

"github.com/rcrowley/go-metrics"
)

const defaultClientID = "sarama"
@@ -233,6 +235,9 @@ type Config struct {
// latest features. Setting it to a version greater than you are actually
// running may lead to random breakage.
Version KafkaVersion
// The registry where the metrics are registered.
// Defaults to metrics.DefaultRegistry.
MetricRegistry metrics.Registry
}

// NewConfig returns a new configuration instance with sane defaults.
@@ -268,6 +273,7 @@ func NewConfig() *Config {
c.ClientID = defaultClientID
c.ChannelBufferSize = 256
c.Version = minVersion
c.MetricRegistry = metrics.DefaultRegistry

return c
}
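A minimal usage sketch for the new option: give the config its own registry and export it periodically with go-metrics' log writer (the interval and logger setup are arbitrary choices for the example):

```go
package main

import (
	"log"
	"os"
	"time"

	"github.com/Shopify/sarama"
	"github.com/rcrowley/go-metrics"
)

func main() {
	conf := sarama.NewConfig()
	// Use a dedicated registry instead of the shared metrics.DefaultRegistry.
	conf.MetricRegistry = metrics.NewRegistry()

	// Periodically dump all registered meters and histograms to stderr.
	go metrics.Log(conf.MetricRegistry, 10*time.Second,
		log.New(os.Stderr, "sarama-metrics: ", log.LstdFlags))

	// ... build producers/consumers from conf as usual ...
}
```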
9 changes: 8 additions & 1 deletion config_test.go
@@ -1,12 +1,19 @@
package sarama

import "testing"
import (
"testing"

"github.com/rcrowley/go-metrics"
)

func TestDefaultConfigValidates(t *testing.T) {
config := NewConfig()
if err := config.Validate(); err != nil {
t.Error(err)
}
if config.MetricRegistry != metrics.DefaultRegistry {
t.Error("Expected metrics.DefaultRegistry, got ", config.MetricRegistry)
}
}

func TestInvalidClientIDConfigValidates(t *testing.T) {
134 changes: 129 additions & 5 deletions functional_producer_test.go
@@ -2,9 +2,12 @@ package sarama

import (
"fmt"
"math"
"sync"
"testing"
"time"

"github.com/rcrowley/go-metrics"
)

const TestBatchSize = 1000
@@ -96,6 +99,9 @@ func testProducingMessages(t *testing.T, config *Config) {
setupFunctionalTest(t)
defer teardownFunctionalTest(t)

// Use a dedicated registry to avoid side effects from the shared global registry
config.MetricRegistry = metrics.NewRegistry()

config.Producer.Return.Successes = true
config.Consumer.Return.Errors = true

@@ -104,11 +110,8 @@
t.Fatal(err)
}

master, err := NewConsumerFromClient(client)
if err != nil {
t.Fatal(err)
}
consumer, err := master.ConsumePartition("test.1", 0, OffsetNewest)
// Remember the current offset so we can consume the produced messages from it later
initialOffset, err := client.GetOffset("test.1", 0, OffsetNewest)
if err != nil {
t.Fatal(err)
}
@@ -140,6 +143,18 @@
}
safeClose(t, producer)

// Validate producer metrics before using the consumer
validateMetrics(t, client)

master, err := NewConsumerFromClient(client)
if err != nil {
t.Fatal(err)
}
consumer, err := master.ConsumePartition("test.1", 0, initialOffset)
if err != nil {
t.Fatal(err)
}

for i := 1; i <= TestBatchSize; i++ {
select {
case <-time.After(10 * time.Second):
@@ -159,6 +174,115 @@ func testProducingMessages(t *testing.T, config *Config) {
safeClose(t, client)
}

func validateMetrics(t *testing.T, client Client) {
// Get the broker ids used by test1 topic
brokerIds := make(map[int32]bool)
if partitions, err := client.Partitions("test.1"); err != nil {
t.Error(err)
} else {
for _, partition := range partitions {
if broker, err := client.Leader("test.1", partition); err != nil {
t.Error(err)
} else {
brokerIds[broker.ID()] = true
}
}
}

minCountMeterValidatorBuilder := func(minCount int64) func(string, interface{}) {
return func(name string, metric interface{}) {
if meter, ok := metric.(metrics.Meter); !ok {
t.Errorf("Expected meter metric for '%s', got %T", name, metric)
} else {
count := meter.Count()
if count < minCount {
t.Errorf("Expected meter metric '%s' count >= %d, got %d", name, minCount, count)
}
}
}
}

histogramValidatorBuilder := func(minCount int64, minMin int64, maxMax int64) func(string, interface{}) {
return func(name string, metric interface{}) {
if histogram, ok := metric.(metrics.Histogram); !ok {
t.Errorf("Expected histogram metric for '%s', got %T", name, metric)
} else {
count := histogram.Count()
if count < minCount {
t.Errorf("Expected histogram metric '%s' count >= %d, got %d", name, minCount, count)
}
min := int64(histogram.Min())
if min < minMin {
t.Errorf("Expected histogram metric '%s' min >= %d, got %d", name, minMin, min)
}
max := int64(histogram.Max())
if max > maxMax {
t.Errorf("Expected histogram metric '%s' max <= %d, got %d", name, maxMax, max)
}
}
}
}

type expectedMetric struct {
name string
validator func(string, interface{})
}
expectedMetrics := make([]expectedMetric, 0, 20)
addExpectedBrokerMetric := func(name string, validator func(string, interface{})) {
expectedMetrics = append(expectedMetrics, expectedMetric{name, validator})
for brokerId := range brokerIds {
expectedMetrics = append(expectedMetrics, expectedMetric{fmt.Sprintf("%s-for-broker-%d", name, brokerId), validator})
}
}
addExpectedTopicMetric := func(name string, validator func(string, interface{})) {
expectedMetrics = append(expectedMetrics, expectedMetric{name, validator})
expectedMetrics = append(expectedMetrics, expectedMetric{fmt.Sprintf("%s-for-topic-test.1", name), validator})
}

compressionEnabled := client.Config().Producer.Compression != CompressionNone
noResponse := client.Config().Producer.RequiredAcks == NoResponse

// We read at least 1 byte from the brokers
addExpectedBrokerMetric("incoming-byte-rate", minCountMeterValidatorBuilder(1))
// in at least 2 requests to the brokers (1 metadata request and N produce requests)
addExpectedBrokerMetric("request-rate", minCountMeterValidatorBuilder(2)) // Count 15 / 8 for broker
addExpectedBrokerMetric("request-size", histogramValidatorBuilder(2, 1, math.MaxInt64))
// We send at least 1 byte to the brokers
addExpectedBrokerMetric("outgoing-byte-rate", minCountMeterValidatorBuilder(1))
if noResponse {
// in a single response (metadata)
addExpectedBrokerMetric("response-rate", minCountMeterValidatorBuilder(1))
addExpectedBrokerMetric("response-size", histogramValidatorBuilder(1, 1, math.MaxInt64))
} else {
// in at least 2 responses (metadata + produce)
addExpectedBrokerMetric("response-rate", minCountMeterValidatorBuilder(2))
addExpectedBrokerMetric("response-size", histogramValidatorBuilder(2, 1, math.MaxInt64))
}

// We send at least 1 batch
addExpectedTopicMetric("batch-size", histogramValidatorBuilder(1, 1, math.MaxInt64))
if compressionEnabled {
// We record compression ratio 0.01-2.00 (1-200 with a histogram) for at least one fake message
addExpectedTopicMetric("compression-rate", histogramValidatorBuilder(1, 1, 200))
} else {
// We record compression ratio 1.00 (100 with a histogram) for every record
addExpectedTopicMetric("compression-rate", histogramValidatorBuilder(TestBatchSize, 100, 100))
}
// We send exactly TestBatchSize messages
addExpectedTopicMetric("record-send-rate", minCountMeterValidatorBuilder(TestBatchSize))
// We send at least one record per request
addExpectedTopicMetric("records-per-request", histogramValidatorBuilder(1, 1, math.MaxInt64))

for _, expectedMetric := range expectedMetrics {
foundMetric := client.Config().MetricRegistry.Get(expectedMetric.name)
if foundMetric == nil {
t.Error("No metric named", expectedMetric.name)
} else {
expectedMetric.validator(expectedMetric.name, foundMetric)
}
}
}

// Benchmarks

func BenchmarkProducerSmall(b *testing.B) {
37 changes: 37 additions & 0 deletions metrics.go
@@ -0,0 +1,37 @@
package sarama

import (
"fmt"

"github.com/rcrowley/go-metrics"
)

func getOrRegisterHistogram(name string, r metrics.Registry) metrics.Histogram {
return r.GetOrRegister(name, func() metrics.Histogram {
return metrics.NewHistogram(metrics.NewExpDecaySample(1028, 0.015))
}).(metrics.Histogram)
}

func getMetricNameForBroker(name string, brokerId int32) string {
return fmt.Sprintf(name+"-for-broker-%d", brokerId)
}

func getOrRegisterBrokerMeter(name string, brokerId int32, r metrics.Registry) metrics.Meter {
return metrics.GetOrRegisterMeter(getMetricNameForBroker(name, brokerId), r)
}

func getOrRegisterBrokerHistogram(name string, brokerId int32, r metrics.Registry) metrics.Histogram {
return getOrRegisterHistogram(getMetricNameForBroker(name, brokerId), r)
}

func getMetricNameForTopic(name string, topic string) string {
return fmt.Sprintf(name+"-for-topic-%s", topic)
}

func getOrRegisterTopicMeter(name string, topic string, r metrics.Registry) metrics.Meter {
return metrics.GetOrRegisterMeter(getMetricNameForTopic(name, topic), r)
}

func getOrRegisterTopicHistogram(name string, topic string, r metrics.Registry) metrics.Histogram {
return getOrRegisterHistogram(getMetricNameForTopic(name, topic), r)
}
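For reference, a small sketch of how the registered metrics can be read back from a registry; dumpMetrics is a made-up helper for the example, not part of the PR:

```go
package main

import (
	"fmt"

	"github.com/rcrowley/go-metrics"
)

// dumpMetrics prints a snapshot of every meter and histogram in the registry,
// e.g. "request-rate", "request-size-for-broker-1", and so on.
func dumpMetrics(r metrics.Registry) {
	r.Each(func(name string, i interface{}) {
		switch m := i.(type) {
		case metrics.Meter:
			fmt.Printf("%s: count=%d 1m-rate=%.2f\n", name, m.Count(), m.Rate1())
		case metrics.Histogram:
			fmt.Printf("%s: count=%d mean=%.2f max=%d\n", name, m.Count(), m.Mean(), m.Max())
		}
	})
}

func main() {
	dumpMetrics(metrics.DefaultRegistry)
}
```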