diff --git a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/config/constants/ExecutorConfig.java b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/config/constants/ExecutorConfig.java
index 7efba8246..137cd8074 100644
--- a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/config/constants/ExecutorConfig.java
+++ b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/config/constants/ExecutorConfig.java
@@ -559,6 +559,13 @@ public final class ExecutorConfig {
public static final String MIN_EXECUTION_PROGRESS_CHECK_INTERVAL_MS_DOC = "The minimum execution progress check interval that users "
+ "can dynamically set the execution progress check interval to.";
+ /**
+ * prefer.broker.roundrobin.in.execution
+ */
+ public static final String PREFER_BROKER_ROUND_ROBIN_CONFIG = "prefer.broker.roundrobin.in.execution";
+ public static final boolean DEFAULT_PREFER_BROKER_ROUND_ROBIN = true;
+ public static final String PREFER_BROKER_ROUND_ROBIN_DOC = "Whether to prefer assigning inter-broker replica movement tasks to brokers in a round-robin fashion during rebalance execution.";
+
/**
* slow.task.alerting.backoff.ms
*/
@@ -990,6 +997,11 @@ public static ConfigDef define(ConfigDef configDef) {
ConfigDef.Type.BOOLEAN,
DEFAULT_AUTO_STOP_EXTERNAL_AGENT,
ConfigDef.Importance.MEDIUM,
- AUTO_STOP_EXTERNAL_AGENT_DOC);
+ AUTO_STOP_EXTERNAL_AGENT_DOC)
+ .define(PREFER_BROKER_ROUND_ROBIN_CONFIG,
+ ConfigDef.Type.BOOLEAN,
+ DEFAULT_PREFER_BROKER_ROUND_ROBIN,
+ ConfigDef.Importance.MEDIUM,
+ PREFER_BROKER_ROUND_ROBIN_DOC);
}
}
diff --git a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/ExecutionProposal.java b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/ExecutionProposal.java
index eeb13ba59..966a3f414 100644
--- a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/ExecutionProposal.java
+++ b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/ExecutionProposal.java
@@ -44,6 +44,8 @@ public class ExecutionProposal {
private final Set<ReplicaPlacementInfo> _replicasToRemove;
// Replicas to move between disks are the replicas which are to be hosted by a different disk of the same broker.
private final Map<Integer, ReplicaPlacementInfo> _replicasToMoveBetweenDisksByBroker;
+ private final Set<Integer> _oldReplicasSet;
+ private final Set<Integer> _newReplicasSet;
/**
* Construct an execution proposal.
@@ -69,10 +71,10 @@ public ExecutionProposal(TopicPartition tp,
validate();
// Populate replicas to add, to remove and to move across disk.
- Set<Integer> newBrokerList = _newReplicas.stream().mapToInt(ReplicaPlacementInfo::brokerId).boxed().collect(Collectors.toSet());
- Set<Integer> oldBrokerList = _oldReplicas.stream().mapToInt(ReplicaPlacementInfo::brokerId).boxed().collect(Collectors.toSet());
- _replicasToAdd = _newReplicas.stream().filter(r -> !oldBrokerList.contains(r.brokerId())).collect(Collectors.toSet());
- _replicasToRemove = _oldReplicas.stream().filter(r -> !newBrokerList.contains(r.brokerId())).collect(Collectors.toSet());
+ _newReplicasSet = _newReplicas.stream().mapToInt(ReplicaPlacementInfo::brokerId).boxed().collect(Collectors.toSet());
+ _oldReplicasSet = _oldReplicas.stream().mapToInt(ReplicaPlacementInfo::brokerId).boxed().collect(Collectors.toSet());
+ _replicasToAdd = _newReplicas.stream().filter(r -> !_oldReplicasSet.contains(r.brokerId())).collect(Collectors.toSet());
+ _replicasToRemove = _oldReplicas.stream().filter(r -> !_newReplicasSet.contains(r.brokerId())).collect(Collectors.toSet());
_replicasToMoveBetweenDisksByBroker = new HashMap<>();
newReplicas.stream().filter(r -> !_replicasToAdd.contains(r) && !_oldReplicas.contains(r))
.forEach(r -> _replicasToMoveBetweenDisksByBroker.put(r.brokerId(), r));
@@ -177,6 +179,20 @@ public List oldReplicas() {
return Collections.unmodifiableList(_oldReplicas);
}
+ /**
+ * @return The broker ID set of the partition before executing the proposal.
+ */
+ public Set<Integer> oldReplicasBrokerIdSet() {
+ return Collections.unmodifiableSet(_oldReplicasSet);
+ }
+
+ /**
+ * @return The broker ID set of the partition after executing the proposal.
+ */
+ public Set<Integer> newReplicasBrokerIdSet() {
+ return Collections.unmodifiableSet(_newReplicasSet);
+ }
+
/**
* @return The new replica list of the partition after executing the proposal.
*/
diff --git a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/ExecutionTaskManager.java b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/ExecutionTaskManager.java
index dda14e9a4..00aa085c3 100644
--- a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/ExecutionTaskManager.java
+++ b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/ExecutionTaskManager.java
@@ -156,6 +156,10 @@ public synchronized void addExecutionProposals(Collection pro
}
}
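+ /**
+ * @return Broker id to the number of pending inter-broker replica movement tasks for that broker, as maintained by the task planner, sorted by task count in descending order.
+ */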
+ Map<Integer, Integer> getSortedBrokerIdToInterBrokerMoveTaskCountMap() {
+ return _executionTaskPlanner.getSortedBrokerIdToInterBrokerMoveTaskCountMap();
+ }
+
/**
* Set the execution mode of the tasks to keep track of the ongoing execution mode via sensors.
*
diff --git a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/ExecutionTaskPlanner.java b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/ExecutionTaskPlanner.java
index 2f3f671c0..5a400faa3 100644
--- a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/ExecutionTaskPlanner.java
+++ b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/ExecutionTaskPlanner.java
@@ -21,6 +21,7 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.LinkedHashMap;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
@@ -39,11 +40,14 @@
import org.slf4j.LoggerFactory;
import static com.linkedin.kafka.cruisecontrol.config.constants.ExecutorConfig.DEFAULT_REPLICA_MOVEMENT_STRATEGIES_CONFIG;
-import static com.linkedin.kafka.cruisecontrol.config.constants.ExecutorConfig.LOGDIR_RESPONSE_TIMEOUT_MS_CONFIG;
-import static com.linkedin.kafka.cruisecontrol.config.constants.ExecutorConfig.TASK_EXECUTION_ALERTING_THRESHOLD_MS_CONFIG;
import static com.linkedin.kafka.cruisecontrol.config.constants.ExecutorConfig.INTER_BROKER_REPLICA_MOVEMENT_RATE_ALERTING_THRESHOLD_CONFIG;
import static com.linkedin.kafka.cruisecontrol.config.constants.ExecutorConfig.INTRA_BROKER_REPLICA_MOVEMENT_RATE_ALERTING_THRESHOLD_CONFIG;
-import static com.linkedin.kafka.cruisecontrol.executor.ExecutionTask.TaskType.*;
+import static com.linkedin.kafka.cruisecontrol.config.constants.ExecutorConfig.LOGDIR_RESPONSE_TIMEOUT_MS_CONFIG;
+import static com.linkedin.kafka.cruisecontrol.config.constants.ExecutorConfig.PREFER_BROKER_ROUND_ROBIN_CONFIG;
+import static com.linkedin.kafka.cruisecontrol.config.constants.ExecutorConfig.TASK_EXECUTION_ALERTING_THRESHOLD_MS_CONFIG;
+import static com.linkedin.kafka.cruisecontrol.executor.ExecutionTask.TaskType.INTER_BROKER_REPLICA_ACTION;
+import static com.linkedin.kafka.cruisecontrol.executor.ExecutionTask.TaskType.INTRA_BROKER_REPLICA_ACTION;
+import static com.linkedin.kafka.cruisecontrol.executor.ExecutionTask.TaskType.LEADER_ACTION;
import static org.apache.kafka.clients.admin.DescribeReplicaLogDirsResult.ReplicaLogDirInfo;
/**
@@ -80,6 +84,7 @@ public class ExecutionTaskPlanner {
private final long _taskExecutionAlertingThresholdMs;
private final double _interBrokerReplicaMovementRateAlertingThreshold;
private final double _intraBrokerReplicaMovementRateAlertingThreshold;
+ private final boolean _preferRoundRobin;
private static final int PRIORITIZE_BROKER_1 = -1;
private static final int PRIORITIZE_BROKER_2 = 1;
private static final int PRIORITIZE_NONE = 0;
@@ -120,6 +125,7 @@ public ExecutionTaskPlanner(AdminClient adminClient, KafkaCruiseControlConfig co
_defaultReplicaMovementTaskStrategy = _defaultReplicaMovementTaskStrategy.chainBaseReplicaMovementStrategyIfAbsent();
}
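+ // When round-robin preference is disabled, tasks are not skipped just because their source or destination broker already has a task scheduled in the current round.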
+ _preferRoundRobin = config.getBoolean(PREFER_BROKER_ROUND_ROBIN_CONFIG);
}
/**
@@ -379,11 +385,13 @@ public List getInterBrokerReplicaMovementTasks(Map
- SortedSet<ExecutionTask> proposalsForBroker = _interPartMoveTasksByBrokerId.get(brokerId);
+ // Make a TreeSet copy of this broker's proposals to avoid ConcurrentModificationException while preserving the proposal ordering.
+ SortedSet<ExecutionTask> proposalsForBroker = new TreeSet<>(_interPartMoveTasksByBrokerId.get(brokerId));
LOG.trace("Execution task for broker {} are {}", brokerId, proposalsForBroker);
for (ExecutionTask task : proposalsForBroker) {
// Break if max cap reached
@@ -398,8 +406,8 @@ public List getInterBrokerReplicaMovementTasks(Map
Set<Integer> destinationBrokers = task.proposal().replicasToAdd().stream().mapToInt(ReplicaPlacementInfo::brokerId)
.boxed().collect(Collectors.toSet());
- if (brokerInvolved.contains(sourceBroker)
- || KafkaCruiseControlUtils.containsAny(brokerInvolved, destinationBrokers)) {
+ if (_preferRoundRobin && (brokerInvolved.contains(sourceBroker)
+ || KafkaCruiseControlUtils.containsAny(brokerInvolved, destinationBrokers))) {
continue;
}
TopicPartition tp = task.proposal().topicPartition();
@@ -429,8 +437,10 @@ public List getInterBrokerReplicaMovementTasks(Map brokerComparator(StrategyOptions strategyOptions, Re
: broker1 - broker2;
};
}
+
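+ /**
+ * @return Broker id to the number of pending inter-broker replica movement tasks for that broker, sorted by task count in descending order.
+ */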
+ Map<Integer, Integer> getSortedBrokerIdToInterBrokerMoveTaskCountMap() {
+ if (_interPartMoveTasksByBrokerId == null || _interPartMoveTasksByBrokerId.isEmpty()) {
+ return Collections.emptyMap();
+ }
+ Map<Integer, Integer> resultMap = _interPartMoveTasksByBrokerId.entrySet()
+ .stream()
+ .collect(Collectors.toMap(
+ Map.Entry::getKey,
+ entry -> entry.getValue().size()
+ ))
+ .entrySet()
+ .stream()
+ .sorted((e1, e2) -> e2.getValue().compareTo(e1.getValue()))
+ .collect(Collectors.toMap(
+ Map.Entry::getKey,
+ Map.Entry::getValue,
+ (e1, e2) -> e1,
+ // maintain the order of the sorted map.
+ LinkedHashMap::new
+ ));
+ return resultMap;
+ }
+
+ /*
+ * Package private for testing.
+ */
+ Map<Integer, SortedSet<ExecutionTask>> getInterPartMoveTasksByBrokerId() {
+ return _interPartMoveTasksByBrokerId;
+ }
}
diff --git a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/Executor.java b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/Executor.java
index f8100cde8..06c233ab5 100644
--- a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/Executor.java
+++ b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/Executor.java
@@ -71,6 +71,7 @@
import static com.linkedin.kafka.cruisecontrol.executor.ExecutorAdminUtils.*;
import static com.linkedin.kafka.cruisecontrol.monitor.MonitorUtils.UNIT_INTERVAL_TO_PERCENTAGE;
import static com.linkedin.kafka.cruisecontrol.monitor.sampling.MetricSampler.SamplingMode.*;
+import static com.linkedin.kafka.cruisecontrol.servlet.UserTaskManager.TaskState.COMPLETED;
import static org.apache.kafka.clients.admin.DescribeReplicaLogDirsResult.ReplicaLogDirInfo;
import static com.linkedin.cruisecontrol.common.utils.Utils.validateNotNull;
@@ -830,6 +831,10 @@ public synchronized void executeProposals(Collection proposal
requestedIntraBrokerPartitionMovementConcurrency, requestedClusterLeadershipMovementConcurrency,
requestedBrokerLeadershipMovementConcurrency, requestedExecutionProgressCheckIntervalMs, replicaMovementStrategy,
isTriggeredByUserRequest, loadMonitor);
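+ // When brokers are being removed, log how much of the proposed movement does not involve the removed brokers at all.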
+ if (removedBrokers != null && !removedBrokers.isEmpty()) {
+ int[] numTasks = getNumTasksUnrelatedToBrokerRemoval(removedBrokers, proposals);
+ LOG.info("User task {}: {} of {} partition move proposals are unrelated to removed brokers.", uuid, numTasks[0], numTasks[1]);
+ }
startExecution(loadMonitor, null, removedBrokers, replicationThrottle, isTriggeredByUserRequest);
} catch (Exception e) {
if (e instanceof OngoingExecutionException) {
@@ -842,6 +847,32 @@ public synchronized void executeProposals(Collection proposal
}
}
+ /**
+ * Count the inter-broker partition move proposals that do not involve any of the removed brokers.
+ * @param removedBrokers Ids of the brokers being removed.
+ * @param proposals Proposals to execute.
+ * @return An array of two integers: the first is the number of inter-broker partition move proposals that involve
+ * none of the removed brokers, the second is the total number of inter-broker partition move proposals.
+ */
+ private int[] getNumTasksUnrelatedToBrokerRemoval(Set<Integer> removedBrokers, Collection<ExecutionProposal> proposals) {
+ int[] numTasks = new int[2];
+ int unrelatedCount = 0;
+ int totalCount = 0;
+ for (ExecutionProposal proposal: proposals) {
+ Set<Integer> oldBrokers = proposal.oldReplicasBrokerIdSet();
+ Set<Integer> newBrokers = proposal.newReplicasBrokerIdSet();
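+ // Only count proposals that move replicas between brokers; leadership-only and intra-broker disk moves are ignored.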
+ if (!oldBrokers.equals(newBrokers)) {
+ totalCount++;
+ if (oldBrokers.stream().noneMatch(removedBrokers::contains) && newBrokers.stream().noneMatch(removedBrokers::contains)) {
+ unrelatedCount++;
+ }
+ }
+ }
+ numTasks[0] = unrelatedCount;
+ numTasks[1] = totalCount;
+ return numTasks;
+ }
+
private void sanityCheckExecuteProposals(LoadMonitor loadMonitor, String uuid) throws OngoingExecutionException {
if (_hasOngoingExecution) {
throw new OngoingExecutionException("Cannot execute new proposals while there is an ongoing execution.");
@@ -1362,8 +1393,8 @@ private class ProposalExecutionRunnable implements Runnable {
public void run() {
LOG.info("User task {}: Starting executing balancing proposals.", _uuid);
final long start = System.currentTimeMillis();
+ UserTaskManager.UserTaskInfo userTaskInfo = initExecution();
try {
- UserTaskManager.UserTaskInfo userTaskInfo = initExecution();
execute(userTaskInfo);
} catch (Exception e) {
LOG.error("User task {}: ProposalExecutionRunnable got exception during run", _uuid, e);
@@ -1381,7 +1412,12 @@ public void run() {
if (_executionException != null) {
LOG.info("User task {}: Execution failed: {}. Exception: {}", _uuid, executionStatusString, _executionException.getMessage());
} else {
- LOG.info("User task {}: Execution succeeded: {}. ", _uuid, executionStatusString);
+ String status = "succeeded";
+ if (userTaskInfo != null && userTaskInfo.state() != COMPLETED) {
+ // The task may end up in the COMPLETED_WITH_ERROR state, e.g. if the user requested to stop the execution.
+ status = userTaskInfo.state().toString();
+ }
+ LOG.info("User task {}: Execution {}: {}. ", _uuid, status, executionStatusString);
}
// Clear completed execution.
clearCompletedExecution();
@@ -1611,6 +1647,12 @@ private void interBrokerMoveReplicas() throws InterruptedException, ExecutionExc
long totalDataToMoveInMB = _executionTaskManager.remainingInterBrokerDataToMoveInMB();
long startTime = System.currentTimeMillis();
LOG.info("User task {}: Starting {} inter-broker partition movements.", _uuid, numTotalPartitionMovements);
+ Map<Integer, Integer> map = _executionTaskManager.getSortedBrokerIdToInterBrokerMoveTaskCountMap();
+ LOG.info("User task {}: Broker Id to Execution Task Count Map: {}", _uuid, map);
+ if (!map.isEmpty()) {
+ LOG.info("User task {}: Degree of task count skew towards the largest single broker: {}", _uuid,
+ map.entrySet().iterator().next().getValue() / (float) numTotalPartitionMovements);
+ }
int partitionsToMove = numTotalPartitionMovements;
// Exhaust all the pending partition movements.
diff --git a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/servlet/handler/async/runnable/RemoveBrokersRunnable.java b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/servlet/handler/async/runnable/RemoveBrokersRunnable.java
index 16e05b96e..ef41c42b7 100644
--- a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/servlet/handler/async/runnable/RemoveBrokersRunnable.java
+++ b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/servlet/handler/async/runnable/RemoveBrokersRunnable.java
@@ -96,7 +96,12 @@ public RemoveBrokersRunnable(KafkaCruiseControl kafkaCruiseControl,
@Override
protected OptimizationResult getResult() throws Exception {
- return new OptimizationResult(computeResult(), _kafkaCruiseControl.config());
+ try {
+ return new OptimizationResult(computeResult(), _kafkaCruiseControl.config());
+ } catch (Exception e) {
+ LOG.error("User task {}: failed to remove brokers due to {}", _uuid, e);
+ throw e;
+ }
}
@Override
diff --git a/cruise-control/src/test/java/com/linkedin/kafka/cruisecontrol/executor/ExecutionTaskPlannerTest.java b/cruise-control/src/test/java/com/linkedin/kafka/cruisecontrol/executor/ExecutionTaskPlannerTest.java
index 3a3a4c24d..b03253cc6 100644
--- a/cruise-control/src/test/java/com/linkedin/kafka/cruisecontrol/executor/ExecutionTaskPlannerTest.java
+++ b/cruise-control/src/test/java/com/linkedin/kafka/cruisecontrol/executor/ExecutionTaskPlannerTest.java
@@ -24,6 +24,7 @@
import java.util.HashSet;
import java.util.Properties;
import java.util.Set;
+import java.util.SortedSet;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.DescribeReplicaLogDirsResult;
import org.apache.kafka.common.Cluster;
@@ -353,6 +354,13 @@ public void testGetInterBrokerPartitionMovementWithMinIsrTasks() {
readyBrokers.put(4, 5);
readyBrokers.put(5, 6);
prioritizeOneAboveMinIsrMovementPlanner.addExecutionProposals(proposals, strategyOptions, null);
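+ // The sorted broker-to-task-count map should agree with the per-broker task sets tracked by the planner.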
+ Map<Integer, Integer> countMap = prioritizeOneAboveMinIsrMovementPlanner.getSortedBrokerIdToInterBrokerMoveTaskCountMap();
+ Map<Integer, SortedSet<ExecutionTask>> taskMap = prioritizeOneAboveMinIsrMovementPlanner.getInterPartMoveTasksByBrokerId();
+ for (Map.Entry<Integer, Integer> entry : countMap.entrySet()) {
+ int brokerId = entry.getKey();
+ int count = entry.getValue();
+ assertEquals(taskMap.get(brokerId).size(), count);
+ }
List<ExecutionTask> partitionMovementTasks
= prioritizeOneAboveMinIsrMovementPlanner.getInterBrokerReplicaMovementTasks(readyBrokers, Collections.emptySet(), _defaultPartitionsMaxCap);
assertEquals("First task", _rf4PartitionMovement2, partitionMovementTasks.get(0).proposal());
@@ -361,6 +369,51 @@ public void testGetInterBrokerPartitionMovementWithMinIsrTasks() {
assertEquals("Fourth task", _rf4PartitionMovement0, partitionMovementTasks.get(3).proposal());
}
+ @Test
+ public void testGetInterBrokerPartitionMovementWithMinIsrNoRoundRobinTasks() {
+ List<ExecutionProposal> proposals = new ArrayList<>();
+ proposals.add(_rf4PartitionMovement0);
+ proposals.add(_rf4PartitionMovement1);
+ proposals.add(_rf4PartitionMovement2);
+ proposals.add(_rf4PartitionMovement3);
+ // Test PrioritizeOneAboveMinIsrWithOfflineReplicasStrategy execution strategies.
+ // Create prioritizeOneAboveMinIsrMovementPlanner, chain after prioritizeMinIsr strategy
+ Properties prioritizeOneAboveMinIsrMovementProps = KafkaCruiseControlUnitTestUtils.getKafkaCruiseControlProperties();
+ prioritizeOneAboveMinIsrMovementProps.setProperty(ExecutorConfig.DEFAULT_REPLICA_MOVEMENT_STRATEGIES_CONFIG,
+ String.format("%s,%s", PrioritizeMinIsrWithOfflineReplicasStrategy.class.getName(),
+ PrioritizeOneAboveMinIsrWithOfflineReplicasStrategy.class.getName()));
+ prioritizeOneAboveMinIsrMovementProps.setProperty(ExecutorConfig.PREFER_BROKER_ROUND_ROBIN_CONFIG, "false");
+ ExecutionTaskPlanner prioritizeOneAboveMinIsrMovementPlanner
+ = new ExecutionTaskPlanner(null, new KafkaCruiseControlConfig(prioritizeOneAboveMinIsrMovementProps));
+
+ Set<PartitionInfo> partitions = new HashSet<>();
+ partitions.add(generatePartitionInfo(_rf4PartitionMovement0, false));
+ partitions.add(generatePartitionInfoWithUrpHavingOfflineReplica(_rf4PartitionMovement1, 2));
+ partitions.add(generatePartitionInfoWithUrpHavingOfflineReplica(_rf4PartitionMovement2, 3));
+ partitions.add(generatePartitionInfoWithUrpHavingOfflineReplica(_rf4PartitionMovement3, 1));
+
+ Cluster expectedCluster = new Cluster(null, _rf4ExpectedNodes, partitions, Collections.emptySet(), Collections.emptySet());
+ // Setting topic min ISR to 2
+ Map<String, MinIsrWithTime> minIsrWithTimeByTopic
+ = Collections.singletonMap(TOPIC3, new MinIsrWithTime((short) 2, 0));
+ StrategyOptions strategyOptions = new StrategyOptions.Builder(expectedCluster).minIsrWithTimeByTopic(minIsrWithTimeByTopic).build();
+
+ Map<Integer, Integer> readyBrokers = new HashMap<>();
+ readyBrokers.put(0, 5);
+ readyBrokers.put(1, 6);
+ readyBrokers.put(2, 6);
+ readyBrokers.put(3, 6);
+ readyBrokers.put(4, 5);
+ readyBrokers.put(5, 6);
+ prioritizeOneAboveMinIsrMovementPlanner.addExecutionProposals(proposals, strategyOptions, null);
+ List<ExecutionTask> partitionMovementTasks
+ = prioritizeOneAboveMinIsrMovementPlanner.getInterBrokerReplicaMovementTasks(readyBrokers, Collections.emptySet(), _defaultPartitionsMaxCap);
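+ // With prefer.broker.roundrobin.in.execution=false, the planner no longer skips tasks whose brokers are already involved in the current round.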
+ assertEquals("First task", _rf4PartitionMovement2, partitionMovementTasks.get(0).proposal());
+ assertEquals("Second task", _rf4PartitionMovement1, partitionMovementTasks.get(1).proposal());
+ assertEquals("Third task", _rf4PartitionMovement3, partitionMovementTasks.get(2).proposal());
+ assertEquals("Fourth task", _rf4PartitionMovement0, partitionMovementTasks.get(3).proposal());
+ }
+
@Test
public void testDynamicConfigReplicaMovementStrategy() {
List<ExecutionProposal> proposals = new ArrayList<>();
diff --git a/cruise-control/src/test/java/com/linkedin/kafka/cruisecontrol/executor/ExecutorTest.java b/cruise-control/src/test/java/com/linkedin/kafka/cruisecontrol/executor/ExecutorTest.java
index 5f5f3e3a4..85385d252 100644
--- a/cruise-control/src/test/java/com/linkedin/kafka/cruisecontrol/executor/ExecutorTest.java
+++ b/cruise-control/src/test/java/com/linkedin/kafka/cruisecontrol/executor/ExecutorTest.java
@@ -746,6 +746,7 @@ private static UserTaskManager.UserTaskInfo getMockUserTaskInfo() {
UserTaskManager.UserTaskInfo mockUserTaskInfo = EasyMock.mock(UserTaskManager.UserTaskInfo.class);
// Run it any times to enable consecutive executions in tests.
EasyMock.expect(mockUserTaskInfo.requestUrl()).andReturn("mock-request").anyTimes();
+ EasyMock.expect(mockUserTaskInfo.state()).andReturn(UserTaskManager.TaskState.COMPLETED).anyTimes();
return mockUserTaskInfo;
}