From b943355cac3bea6bc7d2bf65477e73c3c91dd4b6 Mon Sep 17 00:00:00 2001 From: noorall <863485501@qq.com> Date: Sun, 28 Sep 2025 13:59:49 +0800 Subject: [PATCH] [FLINK-38272][runtime] Fix unstable BatchJobRecoveryTest --- ...ExecutionDeploymentReconciliationTest.java | 51 ------------ ...tingExecutionDeploymentTrackerWrapper.java | 82 +++++++++++++++++++ .../scheduler/DefaultSchedulerBuilder.java | 13 ++- .../adaptivebatch/BatchJobRecoveryTest.java | 53 +++++++++++- 4 files changed, 142 insertions(+), 57 deletions(-) create mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingExecutionDeploymentTrackerWrapper.java diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterExecutionDeploymentReconciliationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterExecutionDeploymentReconciliationTest.java index ed40475b96b8d..5f08f8e0c4653 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterExecutionDeploymentReconciliationTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterExecutionDeploymentReconciliationTest.java @@ -22,7 +22,6 @@ import org.apache.flink.core.testutils.AllCallbackWrapper; import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory; import org.apache.flink.runtime.clusterframework.types.AllocationID; -import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.clusterframework.types.ResourceProfile; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; import org.apache.flink.runtime.heartbeat.HeartbeatServices; @@ -55,7 +54,6 @@ import java.time.Duration; import java.util.Collection; import java.util.Collections; -import java.util.Map; import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; @@ -296,53 +294,4 @@ private void registerTaskExecutorAndOfferSlots( .offerSlots(taskManagerLocation.getResourceID(), slotOffers, testingTimeout) .get(); } - - private static class TestingExecutionDeploymentTrackerWrapper - implements ExecutionDeploymentTracker { - private final ExecutionDeploymentTracker originalTracker; - private final CompletableFuture taskDeploymentFuture; - private final CompletableFuture stopFuture; - - private TestingExecutionDeploymentTrackerWrapper() { - this(new DefaultExecutionDeploymentTracker()); - } - - private TestingExecutionDeploymentTrackerWrapper( - ExecutionDeploymentTracker originalTracker) { - this.originalTracker = originalTracker; - this.taskDeploymentFuture = new CompletableFuture<>(); - this.stopFuture = new CompletableFuture<>(); - } - - @Override - public void startTrackingPendingDeploymentOf( - ExecutionAttemptID executionAttemptId, ResourceID host) { - originalTracker.startTrackingPendingDeploymentOf(executionAttemptId, host); - } - - @Override - public void completeDeploymentOf(ExecutionAttemptID executionAttemptId) { - originalTracker.completeDeploymentOf(executionAttemptId); - taskDeploymentFuture.complete(executionAttemptId); - } - - @Override - public void stopTrackingDeploymentOf(ExecutionAttemptID executionAttemptId) { - originalTracker.stopTrackingDeploymentOf(executionAttemptId); - stopFuture.complete(executionAttemptId); - } - - @Override - public Map getExecutionsOn(ResourceID host) { - return originalTracker.getExecutionsOn(host); - } - - public CompletableFuture getTaskDeploymentFuture() { - return taskDeploymentFuture; - } - - public CompletableFuture getStopFuture() { - return stopFuture; - } - } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingExecutionDeploymentTrackerWrapper.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingExecutionDeploymentTrackerWrapper.java new file mode 100644 index 0000000000000..3fc5a1af0d8ca --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingExecutionDeploymentTrackerWrapper.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.jobmaster; + +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; + +import java.util.Collections; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CompletableFuture; + +/** Testing implementation of the {@link ExecutionDeploymentTracker}. */ +public class TestingExecutionDeploymentTrackerWrapper implements ExecutionDeploymentTracker { + private final ExecutionDeploymentTracker originalTracker; + private final CompletableFuture taskDeploymentFuture; + private final CompletableFuture stopFuture; + private final Set deployedExecutions = new HashSet<>(); + + public TestingExecutionDeploymentTrackerWrapper() { + this(new DefaultExecutionDeploymentTracker()); + } + + public TestingExecutionDeploymentTrackerWrapper(ExecutionDeploymentTracker originalTracker) { + this.originalTracker = originalTracker; + this.taskDeploymentFuture = new CompletableFuture<>(); + this.stopFuture = new CompletableFuture<>(); + } + + @Override + public void startTrackingPendingDeploymentOf( + ExecutionAttemptID executionAttemptId, ResourceID host) { + originalTracker.startTrackingPendingDeploymentOf(executionAttemptId, host); + } + + @Override + public void completeDeploymentOf(ExecutionAttemptID executionAttemptId) { + originalTracker.completeDeploymentOf(executionAttemptId); + taskDeploymentFuture.complete(executionAttemptId); + deployedExecutions.add(executionAttemptId); + } + + @Override + public void stopTrackingDeploymentOf(ExecutionAttemptID executionAttemptId) { + originalTracker.stopTrackingDeploymentOf(executionAttemptId); + stopFuture.complete(executionAttemptId); + } + + @Override + public Map getExecutionsOn(ResourceID host) { + return originalTracker.getExecutionsOn(host); + } + + public CompletableFuture getTaskDeploymentFuture() { + return taskDeploymentFuture; + } + + public CompletableFuture getStopFuture() { + return stopFuture; + } + + public Set getDeployedExecutions() { + return Collections.unmodifiableSet(deployedExecutions); + } +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerBuilder.java index 9c8d1a67216c3..7ddb5106f56c4 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerBuilder.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerBuilder.java @@ -43,6 +43,7 @@ import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobmaster.DefaultExecutionDeploymentTracker; +import org.apache.flink.runtime.jobmaster.ExecutionDeploymentTracker; import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup; import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; import org.apache.flink.runtime.scheduler.adaptivebatch.AdaptiveBatchScheduler; @@ -122,6 +123,8 @@ public class DefaultSchedulerBuilder { private InputConsumableDecider.Factory inputConsumableDeciderFactory = AllFinishedInputConsumableDecider.Factory.INSTANCE; private BatchJobRecoveryHandler jobRecoveryHandler = new DummyBatchJobRecoveryHandler(); + private ExecutionDeploymentTracker executionDeploymentTracker = + new DefaultExecutionDeploymentTracker(); public DefaultSchedulerBuilder( JobGraph jobGraph, @@ -301,6 +304,12 @@ public DefaultSchedulerBuilder setJobRecoveryHandler( return this; } + public DefaultSchedulerBuilder setExecutionDeploymentTracker( + ExecutionDeploymentTracker executionDeploymentTracker) { + this.executionDeploymentTracker = executionDeploymentTracker; + return this; + } + public DefaultScheduler build() throws Exception { return new DefaultScheduler( log, @@ -367,7 +376,7 @@ public AdaptiveBatchScheduler buildAdaptiveBatchJobScheduler(boolean enableSpecu jobManagerJobMetricGroup, shuffleMaster, partitionTracker, - new DefaultExecutionDeploymentTracker(), + executionDeploymentTracker, System.currentTimeMillis(), mainThreadExecutor, jobStatusListener, @@ -390,7 +399,7 @@ private ExecutionGraphFactory createExecutionGraphFactory( return new DefaultExecutionGraphFactory( jobMasterConfiguration, userCodeLoader, - new DefaultExecutionDeploymentTracker(), + executionDeploymentTracker, futureExecutor, ioExecutor, rpcTimeout, diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/BatchJobRecoveryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/BatchJobRecoveryTest.java index 2146e78954add..35f6a07a3b2f1 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/BatchJobRecoveryTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/BatchJobRecoveryTest.java @@ -54,6 +54,7 @@ import org.apache.flink.runtime.jobgraph.JobVertex; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobgraph.OperatorID; +import org.apache.flink.runtime.jobmaster.TestingExecutionDeploymentTrackerWrapper; import org.apache.flink.runtime.jobmaster.event.ExecutionVertexFinishedEvent; import org.apache.flink.runtime.jobmaster.event.FileSystemJobEventStore; import org.apache.flink.runtime.jobmaster.event.JobEvent; @@ -159,6 +160,9 @@ public class BatchJobRecoveryTest { private ScheduledExecutor delayedExecutor = new ScheduledExecutorServiceAdapter(EXECUTOR_RESOURCE.getExecutor()); + private TestingExecutionDeploymentTrackerWrapper executionDeploymentTracker = + new TestingExecutionDeploymentTrackerWrapper(); + private static final OperatorID OPERATOR_ID = new OperatorID(1234L, 5678L); private static final int NUM_SPLITS = 10; private static final int SOURCE_PARALLELISM = 5; @@ -216,6 +220,7 @@ void setUp() throws IOException { this.serializedJobGraph = serializeJobGraph(createDefaultJobGraph()); allPartitionWithMetrics.clear(); + executionDeploymentTracker = new TestingExecutionDeploymentTrackerWrapper(); } @AfterEach @@ -238,11 +243,14 @@ void testRecoverFromJMFailover() throws Exception { runInMainThread(scheduler::startScheduling); + waitUntilAllExecutionsDeployed(SOURCE_ID, scheduler); runInMainThread( () -> { // transition all sources to finished. transitionExecutionsState(scheduler, ExecutionState.FINISHED, SOURCE_ID); }); + + waitUntilAllExecutionsDeployed(MIDDLE_ID, scheduler); runInMainThread( () -> { // transition all middle tasks to RUNNING state @@ -338,11 +346,14 @@ void testJobVertexUnFinishedAndOperatorCoordinatorNotSupportBatchSnapshot() thro runInMainThread(scheduler::startScheduling); + waitUntilAllExecutionsDeployed(SOURCE_ID, scheduler); runInMainThread( () -> { // transition all sources to finished. transitionExecutionsState(scheduler, ExecutionState.FINISHED, SOURCE_ID); }); + + waitUntilAllExecutionsDeployed(MIDDLE_ID, scheduler); runInMainThread( () -> { // transition first middle task to finished. @@ -451,6 +462,7 @@ void testJobVertexFinishedAndOperatorCoordinatorNotSupportBatchSnapshotAndPartit runInMainThread(scheduler::startScheduling); + waitUntilAllExecutionsDeployed(SOURCE_ID, scheduler); runInMainThread( () -> { // transition all sources to finished. @@ -495,14 +507,13 @@ void testJobVertexFinishedAndOperatorCoordinatorNotSupportBatchSnapshotAndPartit } } - for (ExecutionVertex taskVertex : - getExecutionVertices(MIDDLE_ID, newScheduler.getExecutionGraph())) { - waitUntilExecutionVertexState(taskVertex, ExecutionState.DEPLOYING, 15000L); - } + waitUntilAllExecutionsDeployed(MIDDLE_ID, newScheduler); + waitUntilAllExecutionsDeployed(MIDDLE_ID, scheduler); runInMainThread( () -> { // transition all middle tasks to running + transitionExecutionsState(scheduler, ExecutionState.INITIALIZING, MIDDLE_ID); transitionExecutionsState(scheduler, ExecutionState.RUNNING, MIDDLE_ID); }); @@ -539,6 +550,7 @@ void testRecoverFromJMFailoverAndPartitionsUnavailable() throws Exception { runInMainThread(scheduler::startScheduling); + waitUntilAllExecutionsDeployed(SOURCE_ID, scheduler); runInMainThread( () -> { // transition all sources to finished. @@ -596,15 +608,20 @@ void testRecoverDecidedParallelismFromTheSameJobGraphInstance() throws Exception runInMainThread(scheduler::startScheduling); + waitUntilAllExecutionsDeployed(SOURCE_ID, scheduler); runInMainThread( () -> { // transition all sources to finished. transitionExecutionsState(scheduler, ExecutionState.FINISHED, SOURCE_ID); }); + + waitUntilAllExecutionsDeployed(MIDDLE_ID, scheduler); runInMainThread( () -> { // transition all middle tasks to finished. transitionExecutionsState(scheduler, ExecutionState.FINISHED, MIDDLE_ID); }); + + waitUntilAllExecutionsDeployed(SINK_ID, scheduler); runInMainThread( () -> { // transition all sinks to finished. @@ -676,6 +693,7 @@ void testPartitionNotFoundTwiceAfterJMFailover() throws Exception { }); // transition all sources to finished. + waitUntilAllExecutionsDeployed(SOURCE_ID, scheduler); runInMainThread( () -> transitionExecutionsState(scheduler, ExecutionState.FINISHED, SOURCE_ID)); @@ -1124,6 +1142,7 @@ private AdaptiveBatchScheduler createScheduler( jobGraph, mainThreadExecutor.getMainThreadExecutor(), EXECUTOR_RESOURCE.getExecutor()) + .setExecutionDeploymentTracker(executionDeploymentTracker) .setRestartBackoffTimeStrategy( new FixedDelayRestartBackoffTimeStrategy .FixedDelayRestartBackoffTimeStrategyFactory(10, 0) @@ -1212,4 +1231,30 @@ public Optional storesLocalResourcesOn() { }; } } + + private void waitUntilAllExecutionsDeployed( + JobVertexID vertexId, AdaptiveBatchScheduler scheduler) throws Exception { + AtomicBoolean isAllExecutionDeployed = new AtomicBoolean(false); + + while (!isAllExecutionDeployed.get()) { + runInMainThread( + () -> { + List attemptIds = + Arrays.stream( + scheduler + .getExecutionJobVertex(vertexId) + .getTaskVertices()) + .map(ExecutionVertex::getCurrentExecutionAttempt) + .map(Execution::getAttemptId) + .collect(Collectors.toList()); + if (!attemptIds.isEmpty() + && executionDeploymentTracker + .getDeployedExecutions() + .containsAll(attemptIds)) { + isAllExecutionDeployed.set(true); + } + }); + Thread.sleep(2); + } + } }