Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix SlidingTimeWindowReservoir trim operation after overflow #1063

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ public class SlidingTimeWindowReservoir implements Reservoir {
private static final int COLLISION_BUFFER = 256;
// only trim on updating once every N
private static final int TRIM_THRESHOLD = 256;
// offsets the front of the time window for the purposes of clearing the buffer in trim
private static final long CLEAR_BUFFER = TimeUnit.HOURS.toNanos(1) * COLLISION_BUFFER;

private final Clock clock;
private final ConcurrentSkipListMap<Long, Long> measurements;
Expand Down Expand Up @@ -78,6 +80,14 @@ private long getTick() {
}

private void trim() {
measurements.headMap(getTick() - window).clear();
final long now = getTick();
final long windowStart = now - window;
final long windowEnd = now + CLEAR_BUFFER;
if (windowStart < windowEnd) {
measurements.headMap(windowStart).clear();
measurements.tailMap(windowEnd).clear();
} else {
measurements.subMap(windowEnd, windowStart).clear();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,21 @@

import org.junit.Test;

import java.util.Arrays;
import java.util.Random;
import java.util.concurrent.TimeUnit;

import static java.util.concurrent.TimeUnit.NANOSECONDS;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

public class SlidingTimeWindowReservoirTest {
private final Clock clock = mock(Clock.class);
private final SlidingTimeWindowReservoir reservoir = new SlidingTimeWindowReservoir(10, TimeUnit.NANOSECONDS, clock);

@Test
public void storesMeasurementsWithDuplicateTicks() throws Exception {
final Clock clock = mock(Clock.class);
final SlidingTimeWindowReservoir reservoir = new SlidingTimeWindowReservoir(10, NANOSECONDS, clock);

when(clock.getTick()).thenReturn(20L);

reservoir.update(1);
Expand All @@ -25,6 +28,9 @@ public void storesMeasurementsWithDuplicateTicks() throws Exception {

@Test
public void boundsMeasurementsToATimeWindow() throws Exception {
final Clock clock = mock(Clock.class);
final SlidingTimeWindowReservoir reservoir = new SlidingTimeWindowReservoir(10, NANOSECONDS, clock);

when(clock.getTick()).thenReturn(0L);
reservoir.update(1);

Expand All @@ -43,4 +49,69 @@ public void boundsMeasurementsToATimeWindow() throws Exception {
assertThat(reservoir.getSnapshot().getValues())
.containsOnly(4, 5);
}
}

@Test
public void testGetTickOverflow () {
final Random random = new Random(0);
final int window = 128;

// Note: 'threshold' defines the number of updates submitted to the reservoir after overflowing
for (int threshold : Arrays.asList(0, 1, 2, 127, 128, 129, 255, 256, 257)) {

// Note: 'updatePerTick' defines the number of updates submitted to the reservoir between each tick
for (int updatesPerTick : Arrays.asList(1, 2, 127, 128, 129, 255, 256, 257)) {
//logger.info("Executing test: threshold={}, updatesPerTick={}", threshold, updatesPerTick);

// Set the clock to overflow in (2*window+1)ns
final ManualClock clock = new ManualClock();
clock.addNanos(Long.MAX_VALUE/256 - 2*window - clock.getTick());
assertThat(clock.getTick() * 256).isGreaterThan(0);

// Create the reservoir
final SlidingTimeWindowReservoir reservoir = new SlidingTimeWindowReservoir(window, NANOSECONDS, clock);

int updatesAfterThreshold = 0;
while (true) {
// Update the reservoir
for (int i = 0; i < updatesPerTick; i++)
reservoir.update(0);

// Randomly check the reservoir size
if (random.nextDouble() < 0.1) {
assertThat(reservoir.size())
.as("Bad reservoir size with: threshold=%d, updatesPerTick=%d", threshold, updatesPerTick)
.isLessThanOrEqualTo(window * 256);
}

// Update the clock
clock.addNanos(1);

// If the clock has overflowed start counting updates
if ((clock.getTick() * 256) < 0) {
if (updatesAfterThreshold++ >= threshold)
break;
}
}

// Check the final reservoir size
assertThat(reservoir.size())
.as("Bad final reservoir size with: threshold=%d, updatesPerTick=%d", threshold, updatesPerTick)
.isLessThanOrEqualTo(window * 256);

// Advance the clock far enough to clear the reservoir. Note that here the window only loosely defines
// the reservoir window; when updatesPerTick is greater than 128 the sliding window will always be well
// ahead of the current clock time, and advances in getTick while in trim (called randomly above from
// size and every 256 updates). Until the clock "catches up" advancing the clock will have no effect on
// the reservoir, and reservoir.size() will merely move the window forward 1/256th of a ns - as such, an
// arbitrary increment of 1s here was used instead to advance the clock well beyond any updates recorded
// above.
clock.addSeconds(1);

// The reservoir should now be empty
assertThat(reservoir.size())
.as("Bad reservoir size after delay with: threshold=%d, updatesPerTick=%d", threshold, updatesPerTick)
.isEqualTo(0);
}
}
}
}