-
Notifications
You must be signed in to change notification settings - Fork 210
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add Buffer Latency Metric #4237
Changes from 1 commit
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -12,7 +12,11 @@ | |
import org.opensearch.dataprepper.model.CheckpointState; | ||
import org.opensearch.dataprepper.model.configuration.PluginSetting; | ||
import org.opensearch.dataprepper.model.record.Record; | ||
import org.opensearch.dataprepper.model.event.Event; | ||
import org.opensearch.dataprepper.model.event.DefaultEventHandle; | ||
|
||
import java.time.Instant; | ||
import java.time.Duration; | ||
import java.util.Collection; | ||
import java.util.Map; | ||
import java.util.concurrent.TimeUnit; | ||
|
@@ -32,6 +36,7 @@ public abstract class AbstractBuffer<T extends Record<?>> implements Buffer<T> { | |
private final Counter writeTimeoutCounter; | ||
private final Counter recordsWriteFailed; | ||
private final Timer writeTimer; | ||
private final Timer latencyTimer; | ||
private final Timer readTimer; | ||
private final Timer checkpointTimer; | ||
|
||
|
@@ -54,6 +59,7 @@ private AbstractBuffer(final PluginMetrics pluginMetrics, final String pipelineN | |
this.writeTimeoutCounter = pluginMetrics.counter(MetricNames.WRITE_TIMEOUTS); | ||
this.writeTimer = pluginMetrics.timer(MetricNames.WRITE_TIME_ELAPSED); | ||
this.readTimer = pluginMetrics.timer(MetricNames.READ_TIME_ELAPSED); | ||
this.latencyTimer = pluginMetrics.timer(MetricNames.LATENCY); | ||
this.checkpointTimer = pluginMetrics.timer(MetricNames.CHECKPOINT_TIME_ELAPSED); | ||
} | ||
|
||
|
@@ -142,6 +148,16 @@ public Map.Entry<Collection<T>, CheckpointState> read(int timeoutInMillis) { | |
return readResult; | ||
} | ||
|
||
protected void updateLatency(Collection<T> records) { | ||
for (T rec : records) { | ||
if (rec instanceof Record) { | ||
Record<Event> record = (Record<Event>)rec; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Not all buffers hold records at the moment. We should change these lines to support non-events. Also, I think you don't need to cast to
|
||
Instant receivedTime = ((DefaultEventHandle)(((Event)record.getData()).getEventHandle())).getInternalOriginationTime(); | ||
latencyTimer.record(Duration.between(receivedTime, Instant.now())); | ||
} | ||
} | ||
} | ||
|
||
@Override | ||
public void checkpoint(final CheckpointState checkpointState) { | ||
checkpointTimer.record(() -> doCheckpoint(checkpointState)); | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -70,7 +70,7 @@ public KafkaBuffer(final PluginSetting pluginSetting, final KafkaBufferConfig ka | |
final AcknowledgementSetManager acknowledgementSetManager, | ||
final ByteDecoder byteDecoder, final AwsCredentialsSupplier awsCredentialsSupplier, | ||
final CircuitBreaker circuitBreaker) { | ||
super(kafkaBufferConfig.getCustomMetricPrefix().orElse(pluginSetting.getName()), pluginSetting.getPipelineName()); | ||
super(kafkaBufferConfig.getCustomMetricPrefix().orElse(pluginSetting.getName()+"buffer"), pluginSetting.getPipelineName()); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What will this name look like?
Is there a better name we can use here? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeah. Right now the name looks like "kafka". I am changing it to "kafkabuffer" There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm wondering if we can have buffer in the base name to start with. So for example:
Perhaps by changing
to
Thoughts? |
||
final SerializationFactory serializationFactory = new BufferSerializationFactory(new CommonSerializationFactory()); | ||
final KafkaCustomProducerFactory kafkaCustomProducerFactory = new KafkaCustomProducerFactory(serializationFactory, awsCredentialsSupplier, new TopicServiceFactory()); | ||
this.byteDecoder = byteDecoder; | ||
|
@@ -141,7 +141,9 @@ public void doWriteAll(Collection<Record<Event>> records, int timeoutInMillis) t | |
public Map.Entry<Collection<Record<Event>>, CheckpointState> doRead(int timeoutInMillis) { | ||
try { | ||
setMdc(); | ||
return innerBuffer.read(timeoutInMillis); | ||
Map.Entry<Collection<Record<Event>>, CheckpointState> result = innerBuffer.read(timeoutInMillis); | ||
updateLatency(result.getKey()); | ||
return result; | ||
} finally { | ||
resetMdc(); | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we use a different name here? The term latency is ambiguous as there are numerous places where latency exists.
It appears that this is the time before events are read from the buffer. Some name ideas:
bufferReadLatency
readLatency
timeEventIdle
atRestTime
(to put it in opposition to in-flight)