-
Notifications
You must be signed in to change notification settings - Fork 210
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add Buffer Latency Metric #4237
Conversation
Signed-off-by: Krishna Kondaka <krishkdk@dev-dsk-krishkdk-2c-bd29c437.us-west-2.amazon.com>
@@ -70,7 +70,7 @@ public KafkaBuffer(final PluginSetting pluginSetting, final KafkaBufferConfig ka | |||
final AcknowledgementSetManager acknowledgementSetManager, | |||
final ByteDecoder byteDecoder, final AwsCredentialsSupplier awsCredentialsSupplier, | |||
final CircuitBreaker circuitBreaker) { | |||
super(kafkaBufferConfig.getCustomMetricPrefix().orElse(pluginSetting.getName()), pluginSetting.getPipelineName()); | |||
super(kafkaBufferConfig.getCustomMetricPrefix().orElse(pluginSetting.getName()+"buffer"), pluginSetting.getPipelineName()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What will this name look like?
kafkabuffer
Is there a better name we can use here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah. Right now the name looks like "kafka". I am changing it to "kafkabuffer"
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm wondering if we can have buffer in the base name to start with.
So for example:
my-pipeline.kafka.buffer.readLatency
Perhaps by changing
public static final String READ_LATENCY = "readLatency";
to
public static final String READ_LATENCY = "buffer.readLatency";
Thoughts?
protected void updateLatency(Collection<T> records) { | ||
for (T rec : records) { | ||
if (rec instanceof Record) { | ||
Record<Event> record = (Record<Event>)rec; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not all buffers hold records at the moment.
We should change these lines to support non-events. Also, I think you don't need to cast to DefaultEventHandle
.
Object data = rec.getData();
if(data instanceof Event) {
Event event = (Event) data;
Instant receivedTime = event.getEventHandle().getInternalOriginationTime();
latencyTimer.record(Duration.between(receivedTime, Instant.now()));
}
@@ -86,4 +86,6 @@ private MetricNames() {} | |||
* Delimiter used to separate path components in metric names. | |||
*/ | |||
public static final String DELIMITER = "."; | |||
|
|||
public static final String LATENCY = "latency"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we use a different name here? The term latency is ambiguous as there are numerous places where latency exists.
It appears that this is the time before events are read from the buffer. Some name ideas:
bufferReadLatency
readLatency
timeEventIdle
atRestTime
(to put it in opposition to in-flight)
Signed-off-by: Krishna Kondaka <krishkdk@dev-dsk-krishkdk-2c-bd29c437.us-west-2.amazon.com>
@@ -70,7 +70,7 @@ public KafkaBuffer(final PluginSetting pluginSetting, final KafkaBufferConfig ka | |||
final AcknowledgementSetManager acknowledgementSetManager, | |||
final ByteDecoder byteDecoder, final AwsCredentialsSupplier awsCredentialsSupplier, | |||
final CircuitBreaker circuitBreaker) { | |||
super(kafkaBufferConfig.getCustomMetricPrefix().orElse(pluginSetting.getName()), pluginSetting.getPipelineName()); | |||
super(kafkaBufferConfig.getCustomMetricPrefix().orElse(pluginSetting.getName()+"buffer"), pluginSetting.getPipelineName()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm wondering if we can have buffer in the base name to start with.
So for example:
my-pipeline.kafka.buffer.readLatency
Perhaps by changing
public static final String READ_LATENCY = "readLatency";
to
public static final String READ_LATENCY = "buffer.readLatency";
Thoughts?
@dlvenable For blocking buffer, the name is already "blockingbuffer", I am making kafka buffer name to be similar. Rightnow, the kafkabuffer name is "kafka". |
Signed-off-by: Krishna Kondaka <krishkdk@dev-dsk-krishkdk-2c-bd29c437.us-west-2.amazon.com>
Signed-off-by: Krishna Kondaka <krishkdk@dev-dsk-krishkdk-2c-bd29c437.us-west-2.amazon.com>
Description
Add Latency Metric to Buffers
Issues Resolved
Resolves #[Issue number to be closed when this PR is merged]
Check List
By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.