From 367d37a7e57db30670013c0e67a2348f5c3e6d76 Mon Sep 17 00:00:00 2001 From: huyuanfeng Date: Mon, 25 Nov 2024 10:12:43 +0800 Subject: [PATCH] [hotfix] Fix incorrect `messageKey` passed in `ScalingLimited` event --- .../flink/autoscaler/JobVertexScaler.java | 2 +- .../flink/autoscaler/JobVertexScalerTest.java | 24 +++++++++++++++++++ 2 files changed, 25 insertions(+), 1 deletion(-) diff --git a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java index 87099bc0f..84520d0fc 100644 --- a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java +++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java @@ -464,7 +464,7 @@ protected static > int scale( AutoScalerEventHandler.Type.Warning, SCALING_LIMITED, message, - SCALING_LIMITED + vertex + (scaleFactor * currentParallelism), + SCALING_LIMITED + vertex + newParallelism, context.getConfiguration().get(SCALING_EVENT_INTERVAL)); return p; } diff --git a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobVertexScalerTest.java b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobVertexScalerTest.java index 704ddd58e..6b6c3b15e 100644 --- a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobVertexScalerTest.java +++ b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobVertexScalerTest.java @@ -1114,6 +1114,30 @@ public void testSendingScalingLimitedEvents() { .isEqualTo( String.format( SCALE_LIMITED_MESSAGE_FORMAT, jobVertexID, 20, 15, 15, 200, 1)); + // small changes for scaleFactor, verify that the event messageKey is the same. + var smallChangesForScaleFactor = evaluated(10, 199, 100); + smallChangesForScaleFactor.put( + ScalingMetric.NUM_SOURCE_PARTITIONS, EvaluatedScalingMetric.of(15)); + assertEquals( + ParallelismChange.required(15), + vertexScaler.computeScaleTargetParallelism( + context, + jobVertexID, + List.of(), + smallChangesForScaleFactor, + history, + restartTime, + delayedScaleDown)); + assertEquals(1, eventCollector.events.size()); + TestingEventCollector.Event> + smallChangesForScaleFactorLimitedEvent = eventCollector.events.poll(); + assertThat(partitionLimitedEvent.getMessage()) + .isEqualTo( + String.format( + SCALE_LIMITED_MESSAGE_FORMAT, jobVertexID, 20, 15, 15, 200, 1)); + assertThat(smallChangesForScaleFactorLimitedEvent).isNotNull(); + assertThat(partitionLimitedEvent.getMessageKey()) + .isEqualTo(smallChangesForScaleFactorLimitedEvent.getMessageKey()); } private Map evaluated(