Skip to content

Commit

Permalink
[FLINK-36863][autoscaler] Use the maximum parallelism in the past sca…
Browse files Browse the repository at this point in the history
…le-down.interval window when scaling down
  • Loading branch information
1996fanrui committed Dec 9, 2024
1 parent d9e8cce commit 345af37
Show file tree
Hide file tree
Showing 4 changed files with 92 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,25 +28,83 @@

import javax.annotation.Nonnull;

import java.time.Duration;
import java.time.Instant;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.Map;

import static org.apache.flink.util.Preconditions.checkState;

/** All delayed scale down requests. */
public class DelayedScaleDown {

@Data
private static class RecommendedParallelism {
@Nonnull private final Instant triggerTime;
private final int parallelism;

@JsonCreator
public RecommendedParallelism(
@Nonnull @JsonProperty("triggerTime") Instant triggerTime,
@JsonProperty("parallelism") int parallelism) {
this.triggerTime = triggerTime;
this.parallelism = parallelism;
}
}

/** The delayed scale down info for vertex. */
@Data
public static class VertexDelayedScaleDownInfo {
private final Instant firstTriggerTime;
private int maxRecommendedParallelism;
// TODO : add the comment to explain how to calculate the max parallelism within the sliding
// window.
private final LinkedList<RecommendedParallelism> recommendedParallelisms;

public VertexDelayedScaleDownInfo(Instant firstTriggerTime) {
this.firstTriggerTime = firstTriggerTime;
this.recommendedParallelisms = new LinkedList<>();
}

@JsonCreator
public VertexDelayedScaleDownInfo(
@JsonProperty("firstTriggerTime") Instant firstTriggerTime,
@JsonProperty("maxRecommendedParallelism") int maxRecommendedParallelism) {
@JsonProperty("recommendedParallelisms")
LinkedList<RecommendedParallelism> recommendedParallelisms) {
this.firstTriggerTime = firstTriggerTime;
this.maxRecommendedParallelism = maxRecommendedParallelism;
this.recommendedParallelisms = recommendedParallelisms;
}

/** Record current recommended parallelism. */
public void recordRecommendedParallelism(
Instant triggerTime, int parallelism, Duration scaleDownInterval) {
var windowStartTime = triggerTime.minus(scaleDownInterval);

// Remove all recommended parallelisms before the window start time.
while (!recommendedParallelisms.isEmpty()
&& recommendedParallelisms
.peekFirst()
.getTriggerTime()
.isBefore(windowStartTime)) {
recommendedParallelisms.pollFirst();
}

// Remove all recommended parallelisms that are lower than the latest parallelism.
while (!recommendedParallelisms.isEmpty()
&& recommendedParallelisms.peekLast().getParallelism() <= parallelism) {
recommendedParallelisms.pollLast();
}

recommendedParallelisms.addLast(new RecommendedParallelism(triggerTime, parallelism));
}

@JsonIgnore
public int getMaxRecommendedParallelism() {
var maxRecommendedParallelism = recommendedParallelisms.peekFirst();
checkState(
maxRecommendedParallelism != null,
"The getMaxRecommendedParallelism should be called after triggering a scale down, it may be a bug.");
return maxRecommendedParallelism.getParallelism();
}
}

Expand All @@ -63,18 +121,18 @@ public DelayedScaleDown() {
/** Trigger a scale down, and return the corresponding {@link VertexDelayedScaleDownInfo}. */
@Nonnull
public VertexDelayedScaleDownInfo triggerScaleDown(
JobVertexID vertex, Instant triggerTime, int parallelism) {
JobVertexID vertex, Instant triggerTime, int parallelism, Duration scaleDownInterval) {
// The vertexDelayedScaleDownInfo is updated once scale down is triggered due to we need
// update the triggerTime each time.
updated = true;

var vertexDelayedScaleDownInfo = delayedVertices.get(vertex);
if (vertexDelayedScaleDownInfo == null) {
// It's the first trigger
vertexDelayedScaleDownInfo = new VertexDelayedScaleDownInfo(triggerTime, parallelism);
vertexDelayedScaleDownInfo = new VertexDelayedScaleDownInfo(triggerTime);
delayedVertices.put(vertex, vertexDelayedScaleDownInfo);
updated = true;
} else if (parallelism > vertexDelayedScaleDownInfo.getMaxRecommendedParallelism()) {
// Not the first trigger, but the maxRecommendedParallelism needs to be updated.
vertexDelayedScaleDownInfo.setMaxRecommendedParallelism(parallelism);
updated = true;
}
vertexDelayedScaleDownInfo.recordRecommendedParallelism(
triggerTime, parallelism, scaleDownInterval);

return vertexDelayedScaleDownInfo;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,8 @@ private ParallelismChange applyScaleDownInterval(
}

var now = clock.instant();
var delayedScaleDownInfo = delayedScaleDown.triggerScaleDown(vertex, now, newParallelism);
var delayedScaleDownInfo =
delayedScaleDown.triggerScaleDown(vertex, now, newParallelism, scaleDownInterval);

// Never scale down within scale down interval
if (now.isBefore(delayedScaleDownInfo.getFirstTriggerTime().plus(scaleDownInterval))) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import org.junit.jupiter.api.Test;

import java.time.Duration;
import java.time.Instant;

import static org.assertj.core.api.Assertions.assertThat;
Expand All @@ -33,35 +34,46 @@ public class DelayedScaleDownTest {
@Test
void testTriggerUpdateAndClean() {
var instant = Instant.now();
var scaleDownInterval = Duration.ofHours(1);
var delayedScaleDown = new DelayedScaleDown();
assertThat(delayedScaleDown.isUpdated()).isFalse();

// First trigger time as the trigger time, and it won't be updated.
assertVertexDelayedScaleDownInfo(
delayedScaleDown.triggerScaleDown(vertex, instant, 5), instant, 5);
delayedScaleDown.triggerScaleDown(vertex, instant, 5, scaleDownInterval),
instant,
5);
assertThat(delayedScaleDown.isUpdated()).isTrue();

// The lower parallelism doesn't update the result
assertVertexDelayedScaleDownInfo(
delayedScaleDown.triggerScaleDown(vertex, instant.plusSeconds(5), 3), instant, 5);
delayedScaleDown.triggerScaleDown(
vertex, instant.plusSeconds(5), 3, scaleDownInterval),
instant,
5);

// The higher parallelism will update the result
assertVertexDelayedScaleDownInfo(
delayedScaleDown.triggerScaleDown(vertex, instant.plusSeconds(10), 8), instant, 8);
delayedScaleDown.triggerScaleDown(
vertex, instant.plusSeconds(10), 8, scaleDownInterval),
instant,
8);

// The scale down could be re-triggered again after clean
delayedScaleDown.clearVertex(vertex);
assertThat(delayedScaleDown.getDelayedVertices()).isEmpty();
assertVertexDelayedScaleDownInfo(
delayedScaleDown.triggerScaleDown(vertex, instant.plusSeconds(15), 4),
delayedScaleDown.triggerScaleDown(
vertex, instant.plusSeconds(15), 4, scaleDownInterval),
instant.plusSeconds(15),
4);

// The scale down could be re-triggered again after cleanAll
delayedScaleDown.clearAll();
assertThat(delayedScaleDown.getDelayedVertices()).isEmpty();
assertVertexDelayedScaleDownInfo(
delayedScaleDown.triggerScaleDown(vertex, instant.plusSeconds(15), 2),
delayedScaleDown.triggerScaleDown(
vertex, instant.plusSeconds(15), 2, scaleDownInterval),
instant.plusSeconds(15),
2);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -207,8 +207,10 @@ protected void testDiscardAllState() throws Exception {
stateStore.storeScalingTracking(ctx, scalingTracking);

var delayedScaleDown = new DelayedScaleDown();
delayedScaleDown.triggerScaleDown(new JobVertexID(), Instant.now(), 10);
delayedScaleDown.triggerScaleDown(new JobVertexID(), Instant.now().plusSeconds(10), 12);
var scaleDownInterval = Duration.ofHours(1);
delayedScaleDown.triggerScaleDown(new JobVertexID(), Instant.now(), 10, scaleDownInterval);
delayedScaleDown.triggerScaleDown(
new JobVertexID(), Instant.now().plusSeconds(10), 12, scaleDownInterval);

stateStore.storeDelayedScaleDown(ctx, delayedScaleDown);

Expand Down

0 comments on commit 345af37

Please sign in to comment.