-
Notifications
You must be signed in to change notification settings - Fork 13.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
KAFKA-17248 - KIP 1076 implementation #17021
base: trunk
Are you sure you want to change the base?
Conversation
4e8ffa7
to
d390f0a
Compare
61270fb
to
b566d34
Compare
d6f9919
to
b96b967
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the PR @bbejeck. I have taken a pass and some comments.
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
Show resolved
Hide resolved
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
Outdated
Show resolved
Hide resolved
clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
Outdated
Show resolved
Hide resolved
clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
Outdated
Show resolved
Hide resolved
.../java/org/apache/kafka/streams/internals/metrics/StreamsThreadDelegatingMetricsReporter.java
Outdated
Show resolved
Hide resolved
.../java/org/apache/kafka/streams/internals/metrics/StreamsThreadDelegatingMetricsReporter.java
Outdated
Show resolved
Hide resolved
.../java/org/apache/kafka/streams/internals/metrics/StreamsThreadDelegatingMetricsReporter.java
Outdated
Show resolved
Hide resolved
.../java/org/apache/kafka/streams/internals/metrics/StreamsThreadDelegatingMetricsReporter.java
Outdated
Show resolved
Hide resolved
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
Outdated
Show resolved
Hide resolved
...src/test/java/org/apache/kafka/streams/integration/KafkaStreamsTelemetryIntegrationTest.java
Show resolved
Hide resolved
Thanks for the comments @apoorvmittal10 - I've addressed your comments. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not sure if I understand the end-to-end wiring already, so need to make another pass.
Btw: should we also verify state store metrics (or are they automatically verified as part of task metrics)?
clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
Outdated
Show resolved
Hide resolved
clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
Outdated
Show resolved
Hide resolved
clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
Outdated
Show resolved
Hide resolved
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
Outdated
Show resolved
Hide resolved
clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
Outdated
Show resolved
Hide resolved
.../java/org/apache/kafka/streams/internals/metrics/StreamsThreadDelegatingMetricsReporter.java
Outdated
Show resolved
Hide resolved
...src/test/java/org/apache/kafka/streams/integration/KafkaStreamsTelemetryIntegrationTest.java
Outdated
Show resolved
Hide resolved
...src/test/java/org/apache/kafka/streams/integration/KafkaStreamsTelemetryIntegrationTest.java
Outdated
Show resolved
Hide resolved
They are covered as part of task metrics, but I expanded the test to explicitly verify state store metrics are correct as well |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the PR update. Made another pass.
clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
Outdated
Show resolved
Hide resolved
clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
Outdated
Show resolved
Hide resolved
clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
Outdated
Show resolved
Hide resolved
clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
Outdated
Show resolved
Hide resolved
clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
Outdated
Show resolved
Hide resolved
.../java/org/apache/kafka/streams/internals/metrics/StreamsClientMetricsDelegatingReporter.java
Outdated
Show resolved
Hide resolved
.../java/org/apache/kafka/streams/internals/metrics/StreamsThreadMetricsDelegatingReporter.java
Outdated
Show resolved
Hide resolved
...src/test/java/org/apache/kafka/streams/integration/KafkaStreamsTelemetryIntegrationTest.java
Outdated
Show resolved
Hide resolved
...src/test/java/org/apache/kafka/streams/integration/KafkaStreamsTelemetryIntegrationTest.java
Outdated
Show resolved
Hide resolved
...src/test/java/org/apache/kafka/streams/integration/KafkaStreamsTelemetryIntegrationTest.java
Show resolved
Hide resolved
020d7c4
to
965fd23
Compare
@mjsax thanks for the second review, I've addressed your comments |
.../java/org/apache/kafka/streams/internals/metrics/StreamsClientMetricsDelegatingReporter.java
Outdated
Show resolved
Hide resolved
.../java/org/apache/kafka/streams/internals/metrics/StreamsClientMetricsDelegatingReporter.java
Outdated
Show resolved
Hide resolved
965fd23
to
8d64d7d
Compare
b3d420d
to
235ccb6
Compare
@mjsax, @apoorvmittal10, @AndrewJSchofield comments addressed |
...src/test/java/org/apache/kafka/streams/integration/KafkaStreamsTelemetryIntegrationTest.java
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just one remaining comment.
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
Outdated
Show resolved
Hide resolved
@AndrewJSchofield, thanks for the review; I've addressed your comment. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
lgtm
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
Outdated
Show resolved
Hide resolved
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
Outdated
Show resolved
Hide resolved
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
Outdated
Show resolved
Hide resolved
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
Outdated
Show resolved
Hide resolved
clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
Outdated
Show resolved
Hide resolved
...src/test/java/org/apache/kafka/streams/integration/KafkaStreamsTelemetryIntegrationTest.java
Outdated
Show resolved
Hide resolved
...src/test/java/org/apache/kafka/streams/integration/KafkaStreamsTelemetryIntegrationTest.java
Outdated
Show resolved
Hide resolved
...src/test/java/org/apache/kafka/streams/integration/KafkaStreamsTelemetryIntegrationTest.java
Show resolved
Hide resolved
...src/test/java/org/apache/kafka/streams/integration/KafkaStreamsTelemetryIntegrationTest.java
Outdated
Show resolved
Hide resolved
@@ -1812,7 +1812,6 @@ public Map<String, Object> getAdminConfigs(final String clientId) { | |||
|
|||
// add client id with stream client id prefix | |||
props.put(CommonClientConfigs.CLIENT_ID_CONFIG, clientId); | |||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
blank line should be back in now
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Still see it :)
d3e36be
to
cd8e201
Compare
.filter(metricName -> metricName.tags().containsKey("thread-id")).map(mn -> { | ||
final String name = mn.name().replace('-', '.'); | ||
final String group = mn.group().replace("-metrics", "").replace('-', '.'); | ||
return "org.apache.kafka." + group + "." + name; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Need to format the Kafka Streams metrics names into the KIP-714 naming format convention.
@bbejeck Can you rebase this PR to resolve merge conflicts? |
Implementation of KIP-1076 to allow for adding client application metrics to the KIP-714 framework
Integration and unit tests forthcoming
Committer Checklist (excluded from commit message)