Expose metrics using go-metrics (#683) #688
Conversation
- add a `MetricRegistry` configuration parameter that defaults to `metrics.DefaultRegistry`
- provide the following metrics:
  - `incoming-byte-rate` meter (global and per registered broker)
  - `request-rate` meter (global and per registered broker)
  - `request-size` histogram (global and per registered broker)
  - `outgoing-byte-rate` meter (global and per registered broker)
  - `response-rate` meter (global and per registered broker)
  - `response-size` histogram (global and per registered broker)
  - `batch-size` histogram (global and per topic)
  - `record-send-rate` meter (global and per topic)
  - `records-per-request` histogram (global and per topic)
  - `compression-rate` histogram (global and per topic)
- add a metrics flag to `kafka-console-producer` to output metrics
- validate metrics in `functional_producer_test`

This is a work in progress.
Example of metrics output when using `kafka-console-producer` with the metrics flag:
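As a rough, hedged sketch of how the exposed metrics can be inspected (this is not the PR's own `kafka-console-producer` code; the broker address and topic below are assumptions), a user could point Sarama at a dedicated registry and dump it with go-metrics' `WriteOnce`:

```go
package main

import (
	"log"
	"os"

	"github.com/Shopify/sarama"
	metrics "github.com/rcrowley/go-metrics"
)

func main() {
	// Dedicated registry instead of the default metrics.DefaultRegistry.
	registry := metrics.NewRegistry()

	config := sarama.NewConfig()
	config.MetricRegistry = registry        // the configuration parameter added by this PR
	config.Producer.Return.Successes = true // required by the SyncProducer

	// Assumed local broker and topic, for illustration only.
	producer, err := sarama.NewSyncProducer([]string{"localhost:9092"}, config)
	if err != nil {
		log.Fatalln(err)
	}
	defer producer.Close()

	if _, _, err := producer.SendMessage(&sarama.ProducerMessage{
		Topic: "test-topic",
		Value: sarama.StringEncoder("hello metrics"),
	}); err != nil {
		log.Fatalln(err)
	}

	// Print every registered metric (request-rate, request-size, ...) once.
	metrics.WriteOnce(registry, os.Stderr)
}
```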
```diff
@@ -84,6 +99,23 @@ func (b *Broker) Open(conf *Config) error {

 	b.conf = conf

+	// Create or reuse the metrics shared between brokers
+	b.incomingByteRate = metrics.GetOrRegisterMeter("incoming-byte-rate", conf.MetricRegistry)
```
Are the `metrics.GetOrRegister*` methods concurrency-safe? Many brokers can call these at once.
Yes, they are, via a `sync.Mutex`. I was thinking about caching the resolved metrics to avoid hitting that mutex, but after running the existing benchmarks against both the master branch and this branch I did not notice any slowdown or contention.
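For illustration only (not code from this PR), the caching idea mentioned above would look roughly like resolving the meter once and storing it on a struct; the type and function names here are made up:

```go
package example

import metrics "github.com/rcrowley/go-metrics"

// brokerMetrics is a hypothetical holder for metrics a broker updates often.
type brokerMetrics struct {
	incomingByteRate metrics.Meter
}

func newBrokerMetrics(registry metrics.Registry) *brokerMetrics {
	// GetOrRegisterMeter is safe for concurrent use (it locks internally),
	// but resolving the meter once up front means the hot path only touches
	// the already-cached Meter and never takes the registry's mutex again.
	return &brokerMetrics{
		incomingByteRate: metrics.GetOrRegisterMeter("incoming-byte-rate", registry),
	}
}

func (m *brokerMetrics) recordIncoming(bytes int64) {
	m.incomingByteRate.Mark(bytes)
}
```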
- provide the underlying `MessageSet` when encoding the fake compressed "message"
- use `len(msg.Set.Messages)` for counting records
- expose the configured metric registry in `ProduceRequest`
- expose the current offset in `packetEncoder` for the batch-size metric
- expose a real-encoder flag in `packetEncoder` for recording metrics only once
- record metrics in `produce_request.go`
I moved most of the metric gathering code to `produce_request.go`. If this looks better, I'll rebase against master and provide unit tests, documentation, and some examples.
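To make the `packetEncoder` changes above a bit more concrete, here is a heavily hedged sketch of the shape being described; Sarama's real interface is larger, and any name not mentioned in the commit message (such as `recordBatchSize`) is invented for illustration:

```go
package example

import metrics "github.com/rcrowley/go-metrics"

// Hypothetical subset of the packetEncoder additions described above.
type packetEncoder interface {
	// offset reports how many bytes have been written so far, which lets
	// produce-request encoding compute a batch-size sample as the
	// difference between two offsets.
	offset() int

	// metricRegistry exposes the configured registry so encoding code can
	// record histograms and meters against it.
	metricRegistry() metrics.Registry
}

// recordBatchSize shows the metric-recording pattern: only the "real"
// encoding pass (not the length-calculating prep pass) should update
// metrics, so nothing is counted twice. The real-encoder flag is modeled
// here as a parameter for brevity.
func recordBatchSize(pe packetEncoder, startOffset int, realEncoder bool) {
	if !realEncoder || pe.metricRegistry() == nil {
		return
	}
	batchSize := int64(pe.offset() - startOffset)
	sample := metrics.NewExpDecaySample(1028, 0.015)
	metrics.GetOrRegisterHistogram("batch-size", pe.metricRegistry(), sample).Update(batchSize)
}
```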
```diff
@@ -464,13 +498,15 @@ func (b *Broker) responseReceiver() {
 	}

 	buf := make([]byte, decodedHeader.length-4)
-	_, err = io.ReadFull(b.conn, buf)
+	remainingPayloadSize, err := io.ReadFull(b.conn, buf)
```
You don't need to capture this value (or `headerSize` above), since `ReadFull` will return an error unless it completely fills the buffer, and we'll never even get to the metric call in that case. You should be able to use `len(header) + len(buf)` instead.
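A small sketch of the pattern the reviewer is suggesting (the function and variable names are illustrative, not Sarama's actual `responseReceiver` code):

```go
package example

import (
	"io"
	"net"

	metrics "github.com/rcrowley/go-metrics"
)

// readResponse illustrates the point above: io.ReadFull either fills the
// buffer completely or returns an error, so the byte count for the metric
// can simply be len(header) + len(buf) once both reads have succeeded.
func readResponse(conn net.Conn, header []byte, bodyLen int, incomingByteRate metrics.Meter) ([]byte, error) {
	if _, err := io.ReadFull(conn, header); err != nil {
		return nil, err
	}

	buf := make([]byte, bodyLen)
	if _, err := io.ReadFull(conn, buf); err != nil {
		return nil, err
	}

	// No need to capture ReadFull's returned count: on success it is
	// always exactly len(buf).
	incomingByteRate.Mark(int64(len(header) + len(buf)))
	return buf, nil
}
```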
This is definitely better, thanks for all your work! It's getting large enough that when you rebase, I would ask you to please split it in two: one PR for the config, docs, and broker metrics, and then a follow-up PR adding the producer metrics. It will be easier to review in chunks like that.