-
Notifications
You must be signed in to change notification settings - Fork 1.8k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Expose metrics using go-metrics (#683) #688
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -10,6 +10,8 @@ import ( | |
"sync" | ||
"sync/atomic" | ||
"time" | ||
|
||
"github.com/rcrowley/go-metrics" | ||
) | ||
|
||
// Broker represents a single Kafka broker connection. All operations on this object are entirely concurrency-safe. | ||
|
@@ -26,6 +28,19 @@ type Broker struct { | |
|
||
responses chan responsePromise | ||
done chan bool | ||
|
||
incomingByteRate metrics.Meter | ||
requestRate metrics.Meter | ||
requestSize metrics.Histogram | ||
outgoingByteRate metrics.Meter | ||
responseRate metrics.Meter | ||
responseSize metrics.Histogram | ||
brokerIncomingByteRate metrics.Meter | ||
brokerRequestRate metrics.Meter | ||
brokerRequestSize metrics.Histogram | ||
brokerOutgoingByteRate metrics.Meter | ||
brokerResponseRate metrics.Meter | ||
brokerResponseSize metrics.Histogram | ||
} | ||
|
||
type responsePromise struct { | ||
|
@@ -84,6 +99,23 @@ func (b *Broker) Open(conf *Config) error { | |
|
||
b.conf = conf | ||
|
||
// Create or reuse the metrics shared between brokers | ||
b.incomingByteRate = metrics.GetOrRegisterMeter("incoming-byte-rate", conf.MetricRegistry) | ||
b.requestRate = metrics.GetOrRegisterMeter("request-rate", conf.MetricRegistry) | ||
b.requestSize = getOrRegisterHistogram("request-size", conf.MetricRegistry) | ||
b.outgoingByteRate = metrics.GetOrRegisterMeter("outgoing-byte-rate", conf.MetricRegistry) | ||
b.responseRate = metrics.GetOrRegisterMeter("response-rate", conf.MetricRegistry) | ||
b.responseSize = getOrRegisterHistogram("response-size", conf.MetricRegistry) | ||
// Do not gather metrics for seeded broker (-1 id) | ||
if b.id >= 0 { | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Why don't you want metrics for the seeded broker? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. They would appear with There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Does Java provide metrics for seed brokers? It just seems dangerous to omit them since it will mean your statistics will be incomplete. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. They do not provide the network-related metrics for seed brokers either:
I do not know why they chose not to, but I found this comment on a JIRA ticket: There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Sounds like Java uses the ID because IPv6 addresses contain characters that aren't permitted in JMX metric names. I'm OK with following them on this one. I don't know what limitations this metrics package has, but if it supports all the necessary names then I'm also OK actually using the address:port combo instead of the ID. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. If you stick with IDs, an explanatory comment here would be nice. |
||
b.brokerIncomingByteRate = getOrRegisterBrokerMeter("incoming-byte-rate", b.id, conf.MetricRegistry) | ||
b.brokerRequestRate = getOrRegisterBrokerMeter("request-rate", b.id, conf.MetricRegistry) | ||
b.brokerRequestSize = getOrRegisterBrokerHistogram("request-size", b.id, conf.MetricRegistry) | ||
b.brokerOutgoingByteRate = getOrRegisterBrokerMeter("outgoing-byte-rate", b.id, conf.MetricRegistry) | ||
b.brokerResponseRate = getOrRegisterBrokerMeter("response-rate", b.id, conf.MetricRegistry) | ||
b.brokerResponseSize = getOrRegisterBrokerHistogram("response-size", b.id, conf.MetricRegistry) | ||
} | ||
|
||
if conf.Net.SASL.Enable { | ||
b.connErr = b.sendAndReceiveSASLPlainAuth() | ||
if b.connErr != nil { | ||
|
@@ -338,6 +370,8 @@ func (b *Broker) send(rb protocolBody, promiseResponse bool) (*responsePromise, | |
return nil, err | ||
} | ||
|
||
b.updateOutgoingCommunicationMetrics(len(buf)) | ||
|
||
err = b.conn.SetWriteDeadline(time.Now().Add(b.conf.Net.WriteTimeout)) | ||
if err != nil { | ||
return nil, err | ||
|
@@ -441,7 +475,7 @@ func (b *Broker) responseReceiver() { | |
continue | ||
} | ||
|
||
_, err = io.ReadFull(b.conn, header) | ||
headerSize, err := io.ReadFull(b.conn, header) | ||
if err != nil { | ||
dead = err | ||
response.errors <- err | ||
|
@@ -464,13 +498,15 @@ func (b *Broker) responseReceiver() { | |
} | ||
|
||
buf := make([]byte, decodedHeader.length-4) | ||
_, err = io.ReadFull(b.conn, buf) | ||
remainingPayloadSize, err := io.ReadFull(b.conn, buf) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You don't need to capture this value (or |
||
if err != nil { | ||
dead = err | ||
response.errors <- err | ||
continue | ||
} | ||
|
||
b.updateIncomingCommunicationMetrics(headerSize + remainingPayloadSize) | ||
|
||
response.packets <- buf | ||
} | ||
close(b.done) | ||
|
@@ -500,6 +536,8 @@ func (b *Broker) sendAndReceiveSASLPlainAuth() error { | |
binary.BigEndian.PutUint32(authBytes, uint32(length)) | ||
copy(authBytes[4:], []byte("\x00"+b.conf.Net.SASL.User+"\x00"+b.conf.Net.SASL.Password)) | ||
|
||
b.updateOutgoingCommunicationMetrics(len(authBytes)) | ||
|
||
err := b.conn.SetWriteDeadline(time.Now().Add(b.conf.Net.WriteTimeout)) | ||
if err != nil { | ||
Logger.Printf("Failed to set write deadline when doing SASL auth with broker %s: %s\n", b.addr, err.Error()) | ||
|
@@ -521,6 +559,40 @@ func (b *Broker) sendAndReceiveSASLPlainAuth() error { | |
return err | ||
} | ||
|
||
b.updateIncomingCommunicationMetrics(n) | ||
|
||
Logger.Printf("SASL authentication successful with broker %s:%v - %v\n", b.addr, n, header) | ||
return nil | ||
} | ||
|
||
// updateIncomingCommunicationMetrics records one incoming response of the
// given size in bytes.
//
// It marks the shared response-rate meter once, marks the shared
// incoming-byte-rate meter with the size, and updates the shared
// response-size histogram with the size. After each shared metric, the
// matching per-broker metric is updated only if it is non-nil; Open registers
// the per-broker metrics only when b.id >= 0, so they may be unset.
func (b *Broker) updateIncomingCommunicationMetrics(bytes int) { | ||
b.responseRate.Mark(1) | ||
if b.brokerResponseRate != nil { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can the metric creation methods return nil? Or is there some weird case where we could try and measure metrics before opening the connection? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Oh of course for seed-brokers we don't even generate some metrics. Oh well. |
||
b.brokerResponseRate.Mark(1) | ||
} | ||
responseSize := int64(bytes) | ||
b.incomingByteRate.Mark(responseSize) | ||
if b.brokerIncomingByteRate != nil { | ||
b.brokerIncomingByteRate.Mark(responseSize) | ||
} | ||
b.responseSize.Update(responseSize) | ||
if b.brokerResponseSize != nil { | ||
b.brokerResponseSize.Update(responseSize) | ||
} | ||
} | ||
|
||
func (b *Broker) updateOutgoingCommunicationMetrics(bytes int) { | ||
b.requestRate.Mark(1) | ||
if b.brokerRequestRate != nil { | ||
b.brokerRequestRate.Mark(1) | ||
} | ||
requestSize := int64(bytes) | ||
b.outgoingByteRate.Mark(requestSize) | ||
if b.brokerOutgoingByteRate != nil { | ||
b.brokerOutgoingByteRate.Mark(requestSize) | ||
} | ||
b.requestSize.Update(requestSize) | ||
if b.brokerRequestSize != nil { | ||
b.brokerRequestSize.Update(requestSize) | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,37 @@ | ||
package sarama | ||
|
||
import ( | ||
"fmt" | ||
|
||
"github.com/rcrowley/go-metrics" | ||
) | ||
|
||
func getOrRegisterHistogram(name string, r metrics.Registry) metrics.Histogram { | ||
return r.GetOrRegister(name, func() metrics.Histogram { | ||
return metrics.NewHistogram(metrics.NewExpDecaySample(1028, 0.015)) | ||
}).(metrics.Histogram) | ||
} | ||
|
||
// getMetricNameForBroker returns the per-broker variant of a metric name, in
// the form "<name>-for-broker-<brokerId>".
//
// name is passed to Sprintf as an argument rather than spliced into the
// format string, so a '%' in name is copied through verbatim instead of
// being interpreted as a formatting verb.
func getMetricNameForBroker(name string, brokerId int32) string {
	return fmt.Sprintf("%s-for-broker-%d", name, brokerId)
}
|
||
func getOrRegisterBrokerMeter(name string, brokerId int32, r metrics.Registry) metrics.Meter { | ||
return metrics.GetOrRegisterMeter(getMetricNameForBroker(name, brokerId), r) | ||
} | ||
|
||
func getOrRegisterBrokerHistogram(name string, brokerId int32, r metrics.Registry) metrics.Histogram { | ||
return getOrRegisterHistogram(getMetricNameForBroker(name, brokerId), r) | ||
} | ||
|
||
// getMetricNameForTopic returns the per-topic variant of a metric name, in
// the form "<name>-for-topic-<topic>".
//
// Both name and topic are passed to Sprintf as arguments rather than spliced
// into the format string, so a '%' in either is copied through verbatim
// instead of being interpreted as a formatting verb.
func getMetricNameForTopic(name string, topic string) string {
	return fmt.Sprintf("%s-for-topic-%s", name, topic)
}
|
||
func getOrRegisterTopicMeter(name string, topic string, r metrics.Registry) metrics.Meter { | ||
return metrics.GetOrRegisterMeter(getMetricNameForTopic(name, topic), r) | ||
} | ||
|
||
func getOrRegisterTopicHistogram(name string, topic string, r metrics.Registry) metrics.Histogram { | ||
return getOrRegisterHistogram(getMetricNameForTopic(name, topic), r) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Are the `metrics.GetOrRegister*` methods concurrency-safe? I ask because many brokers can call these at once.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, they are, through a `sync.Mutex`. I was thinking about caching the resolved metrics to avoid hitting that mutex, but after running the existing benchmarks on the master branch and on this branch I did not notice any slowness or contention.