Skip to content

Commit

Permalink
Fixed failing code verification test by adding new test case
Browse files Browse the repository at this point in the history
Signed-off-by: Krishna Kondaka <krishkdk@dev-dsk-krishkdk-2c-bd29c437.us-west-2.amazon.com>
  • Loading branch information
Krishna Kondaka committed Mar 6, 2024
1 parent 5948bcd commit f954bef
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,10 @@ public Map.Entry<Collection<T>, CheckpointState> read(int timeoutInMillis) {
return readResult;
}

Timer getLatencyTimer() {
return latencyTimer;
}

protected void updateLatency(Collection<T> records) {
for (T rec : records) {
if (rec instanceof Record) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
import org.opensearch.dataprepper.model.CheckpointState;
import org.opensearch.dataprepper.model.configuration.PluginSetting;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.JacksonEvent;

import java.util.AbstractMap;
import java.util.ArrayList;
Expand All @@ -36,6 +38,7 @@
public class AbstractBufferTest {
private static final String BUFFER_NAME = "testBuffer";
private static final String PIPELINE_NAME = "pipelineName";
private static final String TEST_MESSAGE = "testMessage";

private PluginSetting testPluginSetting;

Expand Down Expand Up @@ -222,6 +225,16 @@ public void testWriteAllTimeoutMetric() throws TimeoutException {
assertEquals(1.0, timeoutMeasurements.get(0).getValue(), 0);
}

@Test
public void testUpdateLatency() {
final AbstractBuffer<Record<Event>> abstractBuffer = new AbstractBufferEventImpl(testPluginSetting);
final Collection<Record<Event>> testRecords = Arrays.asList(
new Record<>(JacksonEvent.fromMessage(TEST_MESSAGE)));
abstractBuffer.updateLatency(testRecords);
assertEquals(abstractBuffer.getLatencyTimer().count(), 1);

}

@Test
public void testWriteAllRecordsWriteFailedMetric() {
// Given
Expand Down Expand Up @@ -387,4 +400,33 @@ public void doWriteAll(Collection<Record<String>> records, int timeoutInMillis)
throw new NullPointerException();
}
}

public static class AbstractBufferEventImpl extends AbstractBuffer<Record<Event>> {
public AbstractBufferEventImpl(PluginSetting pluginSetting) {
super(pluginSetting);
}

@Override
public void doWrite(Record<Event> record, int timeoutInMillis) throws TimeoutException {
}

@Override
public void doWriteAll(Collection<Record<Event>> records, int timeoutInMillis) throws Exception {
}

@Override
public void doCheckpoint(final CheckpointState checkpointState) {

}

@Override
public Map.Entry<Collection<Record<Event>>, CheckpointState> doRead(int timeoutInMillis) {
return null;
}

@Override
public boolean isEmpty() {
return true;
}
}
}

0 comments on commit f954bef

Please sign in to comment.