Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import org.apache.flink.core.testutils.AllCallbackWrapper;
import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
Expand Down Expand Up @@ -55,7 +54,6 @@
import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
Expand Down Expand Up @@ -296,53 +294,4 @@ private void registerTaskExecutorAndOfferSlots(
.offerSlots(taskManagerLocation.getResourceID(), slotOffers, testingTimeout)
.get();
}

private static class TestingExecutionDeploymentTrackerWrapper
implements ExecutionDeploymentTracker {
private final ExecutionDeploymentTracker originalTracker;
private final CompletableFuture<ExecutionAttemptID> taskDeploymentFuture;
private final CompletableFuture<ExecutionAttemptID> stopFuture;

private TestingExecutionDeploymentTrackerWrapper() {
this(new DefaultExecutionDeploymentTracker());
}

private TestingExecutionDeploymentTrackerWrapper(
ExecutionDeploymentTracker originalTracker) {
this.originalTracker = originalTracker;
this.taskDeploymentFuture = new CompletableFuture<>();
this.stopFuture = new CompletableFuture<>();
}

@Override
public void startTrackingPendingDeploymentOf(
ExecutionAttemptID executionAttemptId, ResourceID host) {
originalTracker.startTrackingPendingDeploymentOf(executionAttemptId, host);
}

@Override
public void completeDeploymentOf(ExecutionAttemptID executionAttemptId) {
originalTracker.completeDeploymentOf(executionAttemptId);
taskDeploymentFuture.complete(executionAttemptId);
}

@Override
public void stopTrackingDeploymentOf(ExecutionAttemptID executionAttemptId) {
originalTracker.stopTrackingDeploymentOf(executionAttemptId);
stopFuture.complete(executionAttemptId);
}

@Override
public Map<ExecutionAttemptID, ExecutionDeploymentState> getExecutionsOn(ResourceID host) {
return originalTracker.getExecutionsOn(host);
}

public CompletableFuture<ExecutionAttemptID> getTaskDeploymentFuture() {
return taskDeploymentFuture;
}

public CompletableFuture<ExecutionAttemptID> getStopFuture() {
return stopFuture;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.jobmaster;

import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;

import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;

/** Testing implementation of the {@link ExecutionDeploymentTracker}. */
public class TestingExecutionDeploymentTrackerWrapper implements ExecutionDeploymentTracker {
private final ExecutionDeploymentTracker originalTracker;
private final CompletableFuture<ExecutionAttemptID> taskDeploymentFuture;
private final CompletableFuture<ExecutionAttemptID> stopFuture;
private final Set<ExecutionAttemptID> deployedExecutions = new HashSet<>();

public TestingExecutionDeploymentTrackerWrapper() {
this(new DefaultExecutionDeploymentTracker());
}

public TestingExecutionDeploymentTrackerWrapper(ExecutionDeploymentTracker originalTracker) {
this.originalTracker = originalTracker;
this.taskDeploymentFuture = new CompletableFuture<>();
this.stopFuture = new CompletableFuture<>();
}

@Override
public void startTrackingPendingDeploymentOf(
ExecutionAttemptID executionAttemptId, ResourceID host) {
originalTracker.startTrackingPendingDeploymentOf(executionAttemptId, host);
}

@Override
public void completeDeploymentOf(ExecutionAttemptID executionAttemptId) {
originalTracker.completeDeploymentOf(executionAttemptId);
taskDeploymentFuture.complete(executionAttemptId);
deployedExecutions.add(executionAttemptId);
}

@Override
public void stopTrackingDeploymentOf(ExecutionAttemptID executionAttemptId) {
originalTracker.stopTrackingDeploymentOf(executionAttemptId);
stopFuture.complete(executionAttemptId);
}

@Override
public Map<ExecutionAttemptID, ExecutionDeploymentState> getExecutionsOn(ResourceID host) {
return originalTracker.getExecutionsOn(host);
}

public CompletableFuture<ExecutionAttemptID> getTaskDeploymentFuture() {
return taskDeploymentFuture;
}

public CompletableFuture<ExecutionAttemptID> getStopFuture() {
return stopFuture;
}

public Set<ExecutionAttemptID> getDeployedExecutions() {
return Collections.unmodifiableSet(deployedExecutions);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobmaster.DefaultExecutionDeploymentTracker;
import org.apache.flink.runtime.jobmaster.ExecutionDeploymentTracker;
import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.runtime.scheduler.adaptivebatch.AdaptiveBatchScheduler;
Expand Down Expand Up @@ -122,6 +123,8 @@ public class DefaultSchedulerBuilder {
private InputConsumableDecider.Factory inputConsumableDeciderFactory =
AllFinishedInputConsumableDecider.Factory.INSTANCE;
private BatchJobRecoveryHandler jobRecoveryHandler = new DummyBatchJobRecoveryHandler();
private ExecutionDeploymentTracker executionDeploymentTracker =
new DefaultExecutionDeploymentTracker();

public DefaultSchedulerBuilder(
JobGraph jobGraph,
Expand Down Expand Up @@ -301,6 +304,12 @@ public DefaultSchedulerBuilder setJobRecoveryHandler(
return this;
}

public DefaultSchedulerBuilder setExecutionDeploymentTracker(
ExecutionDeploymentTracker executionDeploymentTracker) {
this.executionDeploymentTracker = executionDeploymentTracker;
return this;
}

public DefaultScheduler build() throws Exception {
return new DefaultScheduler(
log,
Expand Down Expand Up @@ -367,7 +376,7 @@ public AdaptiveBatchScheduler buildAdaptiveBatchJobScheduler(boolean enableSpecu
jobManagerJobMetricGroup,
shuffleMaster,
partitionTracker,
new DefaultExecutionDeploymentTracker(),
executionDeploymentTracker,
System.currentTimeMillis(),
mainThreadExecutor,
jobStatusListener,
Expand All @@ -390,7 +399,7 @@ private ExecutionGraphFactory createExecutionGraphFactory(
return new DefaultExecutionGraphFactory(
jobMasterConfiguration,
userCodeLoader,
new DefaultExecutionDeploymentTracker(),
executionDeploymentTracker,
futureExecutor,
ioExecutor,
rpcTimeout,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.jobmaster.TestingExecutionDeploymentTrackerWrapper;
import org.apache.flink.runtime.jobmaster.event.ExecutionVertexFinishedEvent;
import org.apache.flink.runtime.jobmaster.event.FileSystemJobEventStore;
import org.apache.flink.runtime.jobmaster.event.JobEvent;
Expand Down Expand Up @@ -159,6 +160,9 @@ public class BatchJobRecoveryTest {
private ScheduledExecutor delayedExecutor =
new ScheduledExecutorServiceAdapter(EXECUTOR_RESOURCE.getExecutor());

private TestingExecutionDeploymentTrackerWrapper executionDeploymentTracker =
new TestingExecutionDeploymentTrackerWrapper();

private static final OperatorID OPERATOR_ID = new OperatorID(1234L, 5678L);
private static final int NUM_SPLITS = 10;
private static final int SOURCE_PARALLELISM = 5;
Expand Down Expand Up @@ -216,6 +220,7 @@ void setUp() throws IOException {

this.serializedJobGraph = serializeJobGraph(createDefaultJobGraph());
allPartitionWithMetrics.clear();
executionDeploymentTracker = new TestingExecutionDeploymentTrackerWrapper();
}

@AfterEach
Expand All @@ -238,11 +243,14 @@ void testRecoverFromJMFailover() throws Exception {

runInMainThread(scheduler::startScheduling);

waitUntilAllExecutionsDeployed(SOURCE_ID, scheduler);
runInMainThread(
() -> {
// transition all sources to finished.
transitionExecutionsState(scheduler, ExecutionState.FINISHED, SOURCE_ID);
});

waitUntilAllExecutionsDeployed(MIDDLE_ID, scheduler);
runInMainThread(
() -> {
// transition all middle tasks to RUNNING state
Expand Down Expand Up @@ -338,11 +346,14 @@ void testJobVertexUnFinishedAndOperatorCoordinatorNotSupportBatchSnapshot() thro

runInMainThread(scheduler::startScheduling);

waitUntilAllExecutionsDeployed(SOURCE_ID, scheduler);
runInMainThread(
() -> {
// transition all sources to finished.
transitionExecutionsState(scheduler, ExecutionState.FINISHED, SOURCE_ID);
});

waitUntilAllExecutionsDeployed(MIDDLE_ID, scheduler);
runInMainThread(
() -> {
// transition first middle task to finished.
Expand Down Expand Up @@ -451,6 +462,7 @@ void testJobVertexFinishedAndOperatorCoordinatorNotSupportBatchSnapshotAndPartit

runInMainThread(scheduler::startScheduling);

waitUntilAllExecutionsDeployed(SOURCE_ID, scheduler);
runInMainThread(
() -> {
// transition all sources to finished.
Expand Down Expand Up @@ -495,14 +507,13 @@ void testJobVertexFinishedAndOperatorCoordinatorNotSupportBatchSnapshotAndPartit
}
}

for (ExecutionVertex taskVertex :
getExecutionVertices(MIDDLE_ID, newScheduler.getExecutionGraph())) {
waitUntilExecutionVertexState(taskVertex, ExecutionState.DEPLOYING, 15000L);
}
waitUntilAllExecutionsDeployed(MIDDLE_ID, newScheduler);

waitUntilAllExecutionsDeployed(MIDDLE_ID, scheduler);
runInMainThread(
() -> {
// transition all middle tasks to running
transitionExecutionsState(scheduler, ExecutionState.INITIALIZING, MIDDLE_ID);
transitionExecutionsState(scheduler, ExecutionState.RUNNING, MIDDLE_ID);
});

Expand Down Expand Up @@ -539,6 +550,7 @@ void testRecoverFromJMFailoverAndPartitionsUnavailable() throws Exception {

runInMainThread(scheduler::startScheduling);

waitUntilAllExecutionsDeployed(SOURCE_ID, scheduler);
runInMainThread(
() -> {
// transition all sources to finished.
Expand Down Expand Up @@ -596,15 +608,20 @@ void testRecoverDecidedParallelismFromTheSameJobGraphInstance() throws Exception

runInMainThread(scheduler::startScheduling);

waitUntilAllExecutionsDeployed(SOURCE_ID, scheduler);
runInMainThread(
() -> {
// transition all sources to finished.
transitionExecutionsState(scheduler, ExecutionState.FINISHED, SOURCE_ID);
});

waitUntilAllExecutionsDeployed(MIDDLE_ID, scheduler);
runInMainThread(
() -> { // transition all middle tasks to finished.
transitionExecutionsState(scheduler, ExecutionState.FINISHED, MIDDLE_ID);
});

waitUntilAllExecutionsDeployed(SINK_ID, scheduler);
runInMainThread(
() -> {
// transition all sinks to finished.
Expand Down Expand Up @@ -676,6 +693,7 @@ void testPartitionNotFoundTwiceAfterJMFailover() throws Exception {
});

// transition all sources to finished.
waitUntilAllExecutionsDeployed(SOURCE_ID, scheduler);
runInMainThread(
() -> transitionExecutionsState(scheduler, ExecutionState.FINISHED, SOURCE_ID));

Expand Down Expand Up @@ -1124,6 +1142,7 @@ private AdaptiveBatchScheduler createScheduler(
jobGraph,
mainThreadExecutor.getMainThreadExecutor(),
EXECUTOR_RESOURCE.getExecutor())
.setExecutionDeploymentTracker(executionDeploymentTracker)
.setRestartBackoffTimeStrategy(
new FixedDelayRestartBackoffTimeStrategy
.FixedDelayRestartBackoffTimeStrategyFactory(10, 0)
Expand Down Expand Up @@ -1212,4 +1231,30 @@ public Optional<ResourceID> storesLocalResourcesOn() {
};
}
}

private void waitUntilAllExecutionsDeployed(
JobVertexID vertexId, AdaptiveBatchScheduler scheduler) throws Exception {
AtomicBoolean isAllExecutionDeployed = new AtomicBoolean(false);

while (!isAllExecutionDeployed.get()) {
runInMainThread(
() -> {
List<ExecutionAttemptID> attemptIds =
Arrays.stream(
scheduler
.getExecutionJobVertex(vertexId)
.getTaskVertices())
.map(ExecutionVertex::getCurrentExecutionAttempt)
.map(Execution::getAttemptId)
.collect(Collectors.toList());
if (!attemptIds.isEmpty()
&& executionDeploymentTracker
.getDeployedExecutions()
.containsAll(attemptIds)) {
isAllExecutionDeployed.set(true);
}
});
Thread.sleep(2);
}
}
}