diff --git a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/config/constants/ExecutorConfig.java b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/config/constants/ExecutorConfig.java index 7efba8246..137cd8074 100644 --- a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/config/constants/ExecutorConfig.java +++ b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/config/constants/ExecutorConfig.java @@ -559,6 +559,13 @@ public final class ExecutorConfig { public static final String MIN_EXECUTION_PROGRESS_CHECK_INTERVAL_MS_DOC = "The minimum execution progress check interval that users " + "can dynamically set the execution progress check interval to."; + /** + * prefer.broker.roundrobin.in.execution + */ + public static final String PREFER_BROKER_ROUND_ROBIN_CONFIG = "prefer.broker.roundrobin.in.execution"; + public static final boolean DEFAULT_PREFER_BROKER_ROUND_ROBIN = true; + public static final String PREFER_BROKER_ROUND_ROBIN_DOC = "whether to prefer round-robin of brokers in rebalance execution."; + /** * slow.task.alerting.backoff.ms */ @@ -990,6 +997,11 @@ public static ConfigDef define(ConfigDef configDef) { ConfigDef.Type.BOOLEAN, DEFAULT_AUTO_STOP_EXTERNAL_AGENT, ConfigDef.Importance.MEDIUM, - AUTO_STOP_EXTERNAL_AGENT_DOC); + AUTO_STOP_EXTERNAL_AGENT_DOC) + .define(PREFER_BROKER_ROUND_ROBIN_CONFIG, + ConfigDef.Type.BOOLEAN, + DEFAULT_PREFER_BROKER_ROUND_ROBIN, + ConfigDef.Importance.MEDIUM, + PREFER_BROKER_ROUND_ROBIN_DOC); } } diff --git a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/ExecutionProposal.java b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/ExecutionProposal.java index eeb13ba59..966a3f414 100644 --- a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/ExecutionProposal.java +++ b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/ExecutionProposal.java @@ -44,6 +44,8 @@ public class ExecutionProposal { private final Set _replicasToRemove; // Replicas to move between disks are the replicas which are to be hosted by a different disk of the same broker. private final Map _replicasToMoveBetweenDisksByBroker; + private final Set _oldReplicasSet; + private final Set _newReplicasSet; /** * Construct an execution proposals. @@ -69,10 +71,10 @@ public ExecutionProposal(TopicPartition tp, validate(); // Populate replicas to add, to remove and to move across disk. - Set newBrokerList = _newReplicas.stream().mapToInt(ReplicaPlacementInfo::brokerId).boxed().collect(Collectors.toSet()); - Set oldBrokerList = _oldReplicas.stream().mapToInt(ReplicaPlacementInfo::brokerId).boxed().collect(Collectors.toSet()); - _replicasToAdd = _newReplicas.stream().filter(r -> !oldBrokerList.contains(r.brokerId())).collect(Collectors.toSet()); - _replicasToRemove = _oldReplicas.stream().filter(r -> !newBrokerList.contains(r.brokerId())).collect(Collectors.toSet()); + _newReplicasSet = _newReplicas.stream().mapToInt(ReplicaPlacementInfo::brokerId).boxed().collect(Collectors.toSet()); + _oldReplicasSet = _oldReplicas.stream().mapToInt(ReplicaPlacementInfo::brokerId).boxed().collect(Collectors.toSet()); + _replicasToAdd = _newReplicas.stream().filter(r -> !_oldReplicasSet.contains(r.brokerId())).collect(Collectors.toSet()); + _replicasToRemove = _oldReplicas.stream().filter(r -> !_newReplicasSet.contains(r.brokerId())).collect(Collectors.toSet()); _replicasToMoveBetweenDisksByBroker = new HashMap<>(); newReplicas.stream().filter(r -> !_replicasToAdd.contains(r) && !_oldReplicas.contains(r)) .forEach(r -> _replicasToMoveBetweenDisksByBroker.put(r.brokerId(), r)); @@ -177,6 +179,20 @@ public List oldReplicas() { return Collections.unmodifiableList(_oldReplicas); } + /** + * @return The broker ID set of the partitions before executing the proposal. + */ + public Set oldReplicasBrokerIdSet() { + return Collections.unmodifiableSet(_oldReplicasSet); + } + + /** + * @return The broker ID set of the partitions after executing the proposal. + */ + public Set newReplicasBrokerIdSet() { + return Collections.unmodifiableSet(_newReplicasSet); + } + /** * @return The new replica list fo the partition after executing the proposal. */ diff --git a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/ExecutionTaskManager.java b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/ExecutionTaskManager.java index dda14e9a4..00aa085c3 100644 --- a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/ExecutionTaskManager.java +++ b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/ExecutionTaskManager.java @@ -156,6 +156,10 @@ public synchronized void addExecutionProposals(Collection pro } } + Map getSortedBrokerIdToInterBrokerMoveTaskCountMap() { + return _executionTaskPlanner.getSortedBrokerIdToInterBrokerMoveTaskCountMap(); + } + /** * Set the execution mode of the tasks to keep track of the ongoing execution mode via sensors. * diff --git a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/ExecutionTaskPlanner.java b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/ExecutionTaskPlanner.java index 2f3f671c0..5a400faa3 100644 --- a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/ExecutionTaskPlanner.java +++ b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/ExecutionTaskPlanner.java @@ -21,6 +21,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.LinkedHashMap; import java.util.Set; import java.util.SortedSet; import java.util.TreeSet; @@ -39,11 +40,14 @@ import org.slf4j.LoggerFactory; import static com.linkedin.kafka.cruisecontrol.config.constants.ExecutorConfig.DEFAULT_REPLICA_MOVEMENT_STRATEGIES_CONFIG; -import static com.linkedin.kafka.cruisecontrol.config.constants.ExecutorConfig.LOGDIR_RESPONSE_TIMEOUT_MS_CONFIG; -import static com.linkedin.kafka.cruisecontrol.config.constants.ExecutorConfig.TASK_EXECUTION_ALERTING_THRESHOLD_MS_CONFIG; import static com.linkedin.kafka.cruisecontrol.config.constants.ExecutorConfig.INTER_BROKER_REPLICA_MOVEMENT_RATE_ALERTING_THRESHOLD_CONFIG; import static com.linkedin.kafka.cruisecontrol.config.constants.ExecutorConfig.INTRA_BROKER_REPLICA_MOVEMENT_RATE_ALERTING_THRESHOLD_CONFIG; -import static com.linkedin.kafka.cruisecontrol.executor.ExecutionTask.TaskType.*; +import static com.linkedin.kafka.cruisecontrol.config.constants.ExecutorConfig.LOGDIR_RESPONSE_TIMEOUT_MS_CONFIG; +import static com.linkedin.kafka.cruisecontrol.config.constants.ExecutorConfig.PREFER_BROKER_ROUND_ROBIN_CONFIG; +import static com.linkedin.kafka.cruisecontrol.config.constants.ExecutorConfig.TASK_EXECUTION_ALERTING_THRESHOLD_MS_CONFIG; +import static com.linkedin.kafka.cruisecontrol.executor.ExecutionTask.TaskType.INTER_BROKER_REPLICA_ACTION; +import static com.linkedin.kafka.cruisecontrol.executor.ExecutionTask.TaskType.INTRA_BROKER_REPLICA_ACTION; +import static com.linkedin.kafka.cruisecontrol.executor.ExecutionTask.TaskType.LEADER_ACTION; import static org.apache.kafka.clients.admin.DescribeReplicaLogDirsResult.ReplicaLogDirInfo; /** @@ -80,6 +84,7 @@ public class ExecutionTaskPlanner { private final long _taskExecutionAlertingThresholdMs; private final double _interBrokerReplicaMovementRateAlertingThreshold; private final double _intraBrokerReplicaMovementRateAlertingThreshold; + private final boolean _preferRoundRobin; private static final int PRIORITIZE_BROKER_1 = -1; private static final int PRIORITIZE_BROKER_2 = 1; private static final int PRIORITIZE_NONE = 0; @@ -120,6 +125,7 @@ public ExecutionTaskPlanner(AdminClient adminClient, KafkaCruiseControlConfig co _defaultReplicaMovementTaskStrategy = _defaultReplicaMovementTaskStrategy.chainBaseReplicaMovementStrategyIfAbsent(); } + _preferRoundRobin = config.getBoolean(PREFER_BROKER_ROUND_ROBIN_CONFIG); } /** @@ -379,11 +385,13 @@ public List getInterBrokerReplicaMovementTasks(Map proposalsForBroker = _interPartMoveTasksByBrokerId.get(brokerId); + // Make a TreeSet copy of the proposals for this broker to avoid ConcurrentModificationException and + // keep the same order of proposals. + SortedSet proposalsForBroker = new TreeSet<>(_interPartMoveTasksByBrokerId.get(brokerId)); LOG.trace("Execution task for broker {} are {}", brokerId, proposalsForBroker); for (ExecutionTask task : proposalsForBroker) { // Break if max cap reached @@ -398,8 +406,8 @@ public List getInterBrokerReplicaMovementTasks(Map destinationBrokers = task.proposal().replicasToAdd().stream().mapToInt(ReplicaPlacementInfo::brokerId) .boxed().collect(Collectors.toSet()); - if (brokerInvolved.contains(sourceBroker) - || KafkaCruiseControlUtils.containsAny(brokerInvolved, destinationBrokers)) { + if (_preferRoundRobin && (brokerInvolved.contains(sourceBroker) + || KafkaCruiseControlUtils.containsAny(brokerInvolved, destinationBrokers))) { continue; } TopicPartition tp = task.proposal().topicPartition(); @@ -429,8 +437,10 @@ public List getInterBrokerReplicaMovementTasks(Map brokerComparator(StrategyOptions strategyOptions, Re : broker1 - broker2; }; } + + Map getSortedBrokerIdToInterBrokerMoveTaskCountMap() { + if (_interPartMoveTasksByBrokerId == null || _interPartMoveTasksByBrokerId.isEmpty()) { + return Collections.emptyMap(); + } + Map resultMap = _interPartMoveTasksByBrokerId.entrySet() + .stream() + .collect(Collectors.toMap( + Map.Entry::getKey, + entry -> entry.getValue().size() + )) + .entrySet() + .stream() + .sorted((e1, e2) -> e2.getValue().compareTo(e1.getValue())) + .collect(Collectors.toMap( + Map.Entry::getKey, + Map.Entry::getValue, + (e1, e2) -> e1, + // maintain the order of the sorted map. + LinkedHashMap::new + )); + return resultMap; + } + + /* + * Package private for testing. + */ + Map> getInterPartMoveTasksByBrokerId() { + return _interPartMoveTasksByBrokerId; + } } diff --git a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/Executor.java b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/Executor.java index f8100cde8..06c233ab5 100644 --- a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/Executor.java +++ b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/Executor.java @@ -71,6 +71,7 @@ import static com.linkedin.kafka.cruisecontrol.executor.ExecutorAdminUtils.*; import static com.linkedin.kafka.cruisecontrol.monitor.MonitorUtils.UNIT_INTERVAL_TO_PERCENTAGE; import static com.linkedin.kafka.cruisecontrol.monitor.sampling.MetricSampler.SamplingMode.*; +import static com.linkedin.kafka.cruisecontrol.servlet.UserTaskManager.TaskState.COMPLETED; import static org.apache.kafka.clients.admin.DescribeReplicaLogDirsResult.ReplicaLogDirInfo; import static com.linkedin.cruisecontrol.common.utils.Utils.validateNotNull; @@ -830,6 +831,10 @@ public synchronized void executeProposals(Collection proposal requestedIntraBrokerPartitionMovementConcurrency, requestedClusterLeadershipMovementConcurrency, requestedBrokerLeadershipMovementConcurrency, requestedExecutionProgressCheckIntervalMs, replicaMovementStrategy, isTriggeredByUserRequest, loadMonitor); + if (removedBrokers != null && !removedBrokers.isEmpty()) { + int[] numTasks = getNumTasksUnrelatedToBrokerRemoval(removedBrokers, proposals); + LOG.info("User task {}: {} of {} partition move proposals are unrelated to removed brokers.", uuid, numTasks[0], numTasks[1]); + } startExecution(loadMonitor, null, removedBrokers, replicationThrottle, isTriggeredByUserRequest); } catch (Exception e) { if (e instanceof OngoingExecutionException) { @@ -842,6 +847,32 @@ public synchronized void executeProposals(Collection proposal } } + /** + * Get the number of tasks unrelated to broker removal. + * @param removedBrokers removed brokers + * @param proposals proposals to execute + * @return an array of two integers, the first one is the number of tasks unrelated to broker removal, + * the second one is the total number of tasks. + */ + private int[] getNumTasksUnrelatedToBrokerRemoval(Set removedBrokers, Collection proposals) { + int[] numTasks = new int[2]; + int unrelatedCount = 0; + int totalCount = 0; + for (ExecutionProposal proposal: proposals) { + Set oldBrokers = proposal.oldReplicasBrokerIdSet(); + Set newBrokers = proposal.newReplicasBrokerIdSet(); + if (!oldBrokers.equals(newBrokers)) { + totalCount++; + if (oldBrokers.stream().noneMatch(removedBrokers::contains) && newBrokers.stream().noneMatch(removedBrokers::contains)) { + unrelatedCount++; + } + } + } + numTasks[0] = unrelatedCount; + numTasks[1] = totalCount; + return numTasks; + } + private void sanityCheckExecuteProposals(LoadMonitor loadMonitor, String uuid) throws OngoingExecutionException { if (_hasOngoingExecution) { throw new OngoingExecutionException("Cannot execute new proposals while there is an ongoing execution."); @@ -1362,8 +1393,8 @@ private class ProposalExecutionRunnable implements Runnable { public void run() { LOG.info("User task {}: Starting executing balancing proposals.", _uuid); final long start = System.currentTimeMillis(); + UserTaskManager.UserTaskInfo userTaskInfo = initExecution(); try { - UserTaskManager.UserTaskInfo userTaskInfo = initExecution(); execute(userTaskInfo); } catch (Exception e) { LOG.error("User task {}: ProposalExecutionRunnable got exception during run", _uuid, e); @@ -1381,7 +1412,12 @@ public void run() { if (_executionException != null) { LOG.info("User task {}: Execution failed: {}. Exception: {}", _uuid, executionStatusString, _executionException.getMessage()); } else { - LOG.info("User task {}: Execution succeeded: {}. ", _uuid, executionStatusString); + String status = "succeeded"; + if (userTaskInfo != null && userTaskInfo.state() != COMPLETED) { + // The task may be in state of COMPLETED_WITH_ERROR if the user requested to stop the execution. + status = userTaskInfo.state().toString(); + } + LOG.info("User task {}: Execution {}: {}. ", _uuid, status, executionStatusString); } // Clear completed execution. clearCompletedExecution(); @@ -1611,6 +1647,12 @@ private void interBrokerMoveReplicas() throws InterruptedException, ExecutionExc long totalDataToMoveInMB = _executionTaskManager.remainingInterBrokerDataToMoveInMB(); long startTime = System.currentTimeMillis(); LOG.info("User task {}: Starting {} inter-broker partition movements.", _uuid, numTotalPartitionMovements); + Map map = _executionTaskManager.getSortedBrokerIdToInterBrokerMoveTaskCountMap(); + LOG.info("User task {}: Broker Id to Execution Task Count Map: {}", _uuid, map); + if (!map.isEmpty()) { + LOG.info("User task {}: Degree of task count skew towards the largest single broker", _uuid, + map.entrySet().iterator().next().getValue() / (float) numTotalPartitionMovements); + } int partitionsToMove = numTotalPartitionMovements; // Exhaust all the pending partition movements. diff --git a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/servlet/handler/async/runnable/RemoveBrokersRunnable.java b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/servlet/handler/async/runnable/RemoveBrokersRunnable.java index 16e05b96e..ef41c42b7 100644 --- a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/servlet/handler/async/runnable/RemoveBrokersRunnable.java +++ b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/servlet/handler/async/runnable/RemoveBrokersRunnable.java @@ -96,7 +96,12 @@ public RemoveBrokersRunnable(KafkaCruiseControl kafkaCruiseControl, @Override protected OptimizationResult getResult() throws Exception { - return new OptimizationResult(computeResult(), _kafkaCruiseControl.config()); + try { + return new OptimizationResult(computeResult(), _kafkaCruiseControl.config()); + } catch (Exception e) { + LOG.error("User task {}: failed to remove brokers due to {}", _uuid, e); + throw e; + } } @Override diff --git a/cruise-control/src/test/java/com/linkedin/kafka/cruisecontrol/executor/ExecutionTaskPlannerTest.java b/cruise-control/src/test/java/com/linkedin/kafka/cruisecontrol/executor/ExecutionTaskPlannerTest.java index 3a3a4c24d..b03253cc6 100644 --- a/cruise-control/src/test/java/com/linkedin/kafka/cruisecontrol/executor/ExecutionTaskPlannerTest.java +++ b/cruise-control/src/test/java/com/linkedin/kafka/cruisecontrol/executor/ExecutionTaskPlannerTest.java @@ -24,6 +24,7 @@ import java.util.HashSet; import java.util.Properties; import java.util.Set; +import java.util.SortedSet; import org.apache.kafka.clients.admin.AdminClient; import org.apache.kafka.clients.admin.DescribeReplicaLogDirsResult; import org.apache.kafka.common.Cluster; @@ -353,6 +354,13 @@ public void testGetInterBrokerPartitionMovementWithMinIsrTasks() { readyBrokers.put(4, 5); readyBrokers.put(5, 6); prioritizeOneAboveMinIsrMovementPlanner.addExecutionProposals(proposals, strategyOptions, null); + Map countMap = prioritizeOneAboveMinIsrMovementPlanner.getSortedBrokerIdToInterBrokerMoveTaskCountMap(); + Map> taskMap = prioritizeOneAboveMinIsrMovementPlanner.getInterPartMoveTasksByBrokerId(); + for (Map.Entry entry : countMap.entrySet()) { + int brokerId = entry.getKey(); + int count = entry.getValue(); + assertEquals(taskMap.get(brokerId).size(), count); + } List partitionMovementTasks = prioritizeOneAboveMinIsrMovementPlanner.getInterBrokerReplicaMovementTasks(readyBrokers, Collections.emptySet(), _defaultPartitionsMaxCap); assertEquals("First task", _rf4PartitionMovement2, partitionMovementTasks.get(0).proposal()); @@ -361,6 +369,51 @@ public void testGetInterBrokerPartitionMovementWithMinIsrTasks() { assertEquals("Fourth task", _rf4PartitionMovement0, partitionMovementTasks.get(3).proposal()); } + @Test + public void testGetInterBrokerPartitionMovementWithMinIsrNoRoundRobinTasks() { + List proposals = new ArrayList<>(); + proposals.add(_rf4PartitionMovement0); + proposals.add(_rf4PartitionMovement1); + proposals.add(_rf4PartitionMovement2); + proposals.add(_rf4PartitionMovement3); + // Test PrioritizeOneAboveMinIsrWithOfflineReplicasStrategy execution strategies. + // Create prioritizeOneAboveMinIsrMovementPlanner, chain after prioritizeMinIsr strategy + Properties prioritizeOneAboveMinIsrMovementProps = KafkaCruiseControlUnitTestUtils.getKafkaCruiseControlProperties(); + prioritizeOneAboveMinIsrMovementProps.setProperty(ExecutorConfig.DEFAULT_REPLICA_MOVEMENT_STRATEGIES_CONFIG, + String.format("%s,%s", PrioritizeMinIsrWithOfflineReplicasStrategy.class.getName(), + PrioritizeOneAboveMinIsrWithOfflineReplicasStrategy.class.getName())); + prioritizeOneAboveMinIsrMovementProps.setProperty(ExecutorConfig.PREFER_BROKER_ROUND_ROBIN_CONFIG, "false"); + ExecutionTaskPlanner prioritizeOneAboveMinIsrMovementPlanner + = new ExecutionTaskPlanner(null, new KafkaCruiseControlConfig(prioritizeOneAboveMinIsrMovementProps)); + + Set partitions = new HashSet<>(); + partitions.add(generatePartitionInfo(_rf4PartitionMovement0, false)); + partitions.add(generatePartitionInfoWithUrpHavingOfflineReplica(_rf4PartitionMovement1, 2)); + partitions.add(generatePartitionInfoWithUrpHavingOfflineReplica(_rf4PartitionMovement2, 3)); + partitions.add(generatePartitionInfoWithUrpHavingOfflineReplica(_rf4PartitionMovement3, 1)); + + Cluster expectedCluster = new Cluster(null, _rf4ExpectedNodes, partitions, Collections.emptySet(), Collections.emptySet()); + // Setting topic min ISR to 2 + Map minIsrWithTimeByTopic + = Collections.singletonMap(TOPIC3, new MinIsrWithTime((short) 2, 0)); + StrategyOptions strategyOptions = new StrategyOptions.Builder(expectedCluster).minIsrWithTimeByTopic(minIsrWithTimeByTopic).build(); + + Map readyBrokers = new HashMap<>(); + readyBrokers.put(0, 5); + readyBrokers.put(1, 6); + readyBrokers.put(2, 6); + readyBrokers.put(3, 6); + readyBrokers.put(4, 5); + readyBrokers.put(5, 6); + prioritizeOneAboveMinIsrMovementPlanner.addExecutionProposals(proposals, strategyOptions, null); + List partitionMovementTasks + = prioritizeOneAboveMinIsrMovementPlanner.getInterBrokerReplicaMovementTasks(readyBrokers, Collections.emptySet(), _defaultPartitionsMaxCap); + assertEquals("First task", _rf4PartitionMovement2, partitionMovementTasks.get(0).proposal()); + assertEquals("Second task", _rf4PartitionMovement1, partitionMovementTasks.get(1).proposal()); + assertEquals("Third task", _rf4PartitionMovement3, partitionMovementTasks.get(2).proposal()); + assertEquals("Fourth task", _rf4PartitionMovement0, partitionMovementTasks.get(3).proposal()); + } + @Test public void testDynamicConfigReplicaMovementStrategy() { List proposals = new ArrayList<>(); diff --git a/cruise-control/src/test/java/com/linkedin/kafka/cruisecontrol/executor/ExecutorTest.java b/cruise-control/src/test/java/com/linkedin/kafka/cruisecontrol/executor/ExecutorTest.java index 5f5f3e3a4..85385d252 100644 --- a/cruise-control/src/test/java/com/linkedin/kafka/cruisecontrol/executor/ExecutorTest.java +++ b/cruise-control/src/test/java/com/linkedin/kafka/cruisecontrol/executor/ExecutorTest.java @@ -746,6 +746,7 @@ private static UserTaskManager.UserTaskInfo getMockUserTaskInfo() { UserTaskManager.UserTaskInfo mockUserTaskInfo = EasyMock.mock(UserTaskManager.UserTaskInfo.class); // Run it any times to enable consecutive executions in tests. EasyMock.expect(mockUserTaskInfo.requestUrl()).andReturn("mock-request").anyTimes(); + expect(mockUserTaskInfo.state()).andReturn(UserTaskManager.TaskState.COMPLETED).anyTimes(); return mockUserTaskInfo; }