From 6f399b646cdd99b93961642228f0570a5f18fc5a Mon Sep 17 00:00:00 2001 From: Allen Wang Date: Tue, 19 Nov 2024 15:00:35 -0800 Subject: [PATCH 1/6] Make the broker round robin selection in task execution configurable --- .../config/constants/ExecutorConfig.java | 14 +++++++++++++- .../executor/ExecutionTaskPlanner.java | 17 +++++++++++------ 2 files changed, 24 insertions(+), 7 deletions(-) diff --git a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/config/constants/ExecutorConfig.java b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/config/constants/ExecutorConfig.java index 7efba8246..137cd8074 100644 --- a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/config/constants/ExecutorConfig.java +++ b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/config/constants/ExecutorConfig.java @@ -559,6 +559,13 @@ public final class ExecutorConfig { public static final String MIN_EXECUTION_PROGRESS_CHECK_INTERVAL_MS_DOC = "The minimum execution progress check interval that users " + "can dynamically set the execution progress check interval to."; + /** + * prefer.broker.roundrobin.in.execution + */ + public static final String PREFER_BROKER_ROUND_ROBIN_CONFIG = "prefer.broker.roundrobin.in.execution"; + public static final boolean DEFAULT_PREFER_BROKER_ROUND_ROBIN = true; + public static final String PREFER_BROKER_ROUND_ROBIN_DOC = "whether to prefer round-robin of brokers in rebalance execution."; + /** * slow.task.alerting.backoff.ms */ @@ -990,6 +997,11 @@ public static ConfigDef define(ConfigDef configDef) { ConfigDef.Type.BOOLEAN, DEFAULT_AUTO_STOP_EXTERNAL_AGENT, ConfigDef.Importance.MEDIUM, - AUTO_STOP_EXTERNAL_AGENT_DOC); + AUTO_STOP_EXTERNAL_AGENT_DOC) + .define(PREFER_BROKER_ROUND_ROBIN_CONFIG, + ConfigDef.Type.BOOLEAN, + DEFAULT_PREFER_BROKER_ROUND_ROBIN, + ConfigDef.Importance.MEDIUM, + PREFER_BROKER_ROUND_ROBIN_DOC); } } diff --git a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/ExecutionTaskPlanner.java b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/ExecutionTaskPlanner.java index 2f3f671c0..32db67f0d 100644 --- a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/ExecutionTaskPlanner.java +++ b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/ExecutionTaskPlanner.java @@ -39,11 +39,14 @@ import org.slf4j.LoggerFactory; import static com.linkedin.kafka.cruisecontrol.config.constants.ExecutorConfig.DEFAULT_REPLICA_MOVEMENT_STRATEGIES_CONFIG; -import static com.linkedin.kafka.cruisecontrol.config.constants.ExecutorConfig.LOGDIR_RESPONSE_TIMEOUT_MS_CONFIG; -import static com.linkedin.kafka.cruisecontrol.config.constants.ExecutorConfig.TASK_EXECUTION_ALERTING_THRESHOLD_MS_CONFIG; import static com.linkedin.kafka.cruisecontrol.config.constants.ExecutorConfig.INTER_BROKER_REPLICA_MOVEMENT_RATE_ALERTING_THRESHOLD_CONFIG; import static com.linkedin.kafka.cruisecontrol.config.constants.ExecutorConfig.INTRA_BROKER_REPLICA_MOVEMENT_RATE_ALERTING_THRESHOLD_CONFIG; -import static com.linkedin.kafka.cruisecontrol.executor.ExecutionTask.TaskType.*; +import static com.linkedin.kafka.cruisecontrol.config.constants.ExecutorConfig.LOGDIR_RESPONSE_TIMEOUT_MS_CONFIG; +import static com.linkedin.kafka.cruisecontrol.config.constants.ExecutorConfig.PREFER_BROKER_ROUND_ROBIN_CONFIG; +import static com.linkedin.kafka.cruisecontrol.config.constants.ExecutorConfig.TASK_EXECUTION_ALERTING_THRESHOLD_MS_CONFIG; +import static com.linkedin.kafka.cruisecontrol.executor.ExecutionTask.TaskType.INTER_BROKER_REPLICA_ACTION; +import static com.linkedin.kafka.cruisecontrol.executor.ExecutionTask.TaskType.INTRA_BROKER_REPLICA_ACTION; +import static com.linkedin.kafka.cruisecontrol.executor.ExecutionTask.TaskType.LEADER_ACTION; import static org.apache.kafka.clients.admin.DescribeReplicaLogDirsResult.ReplicaLogDirInfo; /** @@ -80,6 +83,7 @@ public class ExecutionTaskPlanner { private final long _taskExecutionAlertingThresholdMs; private final double _interBrokerReplicaMovementRateAlertingThreshold; private final double _intraBrokerReplicaMovementRateAlertingThreshold; + private final boolean _preferRoundRobin; private static final int PRIORITIZE_BROKER_1 = -1; private static final int PRIORITIZE_BROKER_2 = 1; private static final int PRIORITIZE_NONE = 0; @@ -120,6 +124,7 @@ public ExecutionTaskPlanner(AdminClient adminClient, KafkaCruiseControlConfig co _defaultReplicaMovementTaskStrategy = _defaultReplicaMovementTaskStrategy.chainBaseReplicaMovementStrategyIfAbsent(); } + _preferRoundRobin = config.getBoolean(PREFER_BROKER_ROUND_ROBIN_CONFIG); } /** @@ -379,7 +384,7 @@ public List getInterBrokerReplicaMovementTasks(Map getInterBrokerReplicaMovementTasks(Map destinationBrokers = task.proposal().replicasToAdd().stream().mapToInt(ReplicaPlacementInfo::brokerId) .boxed().collect(Collectors.toSet()); - if (brokerInvolved.contains(sourceBroker) - || KafkaCruiseControlUtils.containsAny(brokerInvolved, destinationBrokers)) { + if (_preferRoundRobin && (brokerInvolved.contains(sourceBroker) + || KafkaCruiseControlUtils.containsAny(brokerInvolved, destinationBrokers))) { continue; } TopicPartition tp = task.proposal().topicPartition(); From 90829f3f633edf4ade8690b89fa757d391b2c208 Mon Sep 17 00:00:00 2001 From: Allen Wang Date: Fri, 22 Nov 2024 18:04:16 -0800 Subject: [PATCH 2/6] WIP --- .../executor/ExecutionTaskPlanner.java | 8 ++-- .../cruisecontrol/executor/Executor.java | 6 ++- .../async/runnable/RemoveBrokersRunnable.java | 7 ++- .../executor/ExecutionTaskPlannerTest.java | 45 +++++++++++++++++++ 4 files changed, 60 insertions(+), 6 deletions(-) diff --git a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/ExecutionTaskPlanner.java b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/ExecutionTaskPlanner.java index 32db67f0d..8b6f7137a 100644 --- a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/ExecutionTaskPlanner.java +++ b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/ExecutionTaskPlanner.java @@ -388,7 +388,7 @@ public List getInterBrokerReplicaMovementTasks(Map proposalsForBroker = _interPartMoveTasksByBrokerId.get(brokerId); + SortedSet proposalsForBroker = new TreeSet<>(_interPartMoveTasksByBrokerId.get(brokerId)); LOG.trace("Execution task for broker {} are {}", brokerId, proposalsForBroker); for (ExecutionTask task : proposalsForBroker) { // Break if max cap reached @@ -434,8 +434,10 @@ public List getInterBrokerReplicaMovementTasks(Map proposals = new ArrayList<>(); + proposals.add(_rf4PartitionMovement0); + proposals.add(_rf4PartitionMovement1); + proposals.add(_rf4PartitionMovement2); + proposals.add(_rf4PartitionMovement3); + // Test PrioritizeOneAboveMinIsrWithOfflineReplicasStrategy execution strategies. + // Create prioritizeOneAboveMinIsrMovementPlanner, chain after prioritizeMinIsr strategy + Properties prioritizeOneAboveMinIsrMovementProps = KafkaCruiseControlUnitTestUtils.getKafkaCruiseControlProperties(); + prioritizeOneAboveMinIsrMovementProps.setProperty(ExecutorConfig.DEFAULT_REPLICA_MOVEMENT_STRATEGIES_CONFIG, + String.format("%s,%s", PrioritizeMinIsrWithOfflineReplicasStrategy.class.getName(), + PrioritizeOneAboveMinIsrWithOfflineReplicasStrategy.class.getName())); + prioritizeOneAboveMinIsrMovementProps.setProperty(ExecutorConfig.PREFER_BROKER_ROUND_ROBIN_CONFIG, "false"); + ExecutionTaskPlanner prioritizeOneAboveMinIsrMovementPlanner + = new ExecutionTaskPlanner(null, new KafkaCruiseControlConfig(prioritizeOneAboveMinIsrMovementProps)); + + Set partitions = new HashSet<>(); + partitions.add(generatePartitionInfo(_rf4PartitionMovement0, false)); + partitions.add(generatePartitionInfoWithUrpHavingOfflineReplica(_rf4PartitionMovement1, 2)); + partitions.add(generatePartitionInfoWithUrpHavingOfflineReplica(_rf4PartitionMovement2, 3)); + partitions.add(generatePartitionInfoWithUrpHavingOfflineReplica(_rf4PartitionMovement3, 1)); + + Cluster expectedCluster = new Cluster(null, _rf4ExpectedNodes, partitions, Collections.emptySet(), Collections.emptySet()); + // Setting topic min ISR to 2 + Map minIsrWithTimeByTopic + = Collections.singletonMap(TOPIC3, new MinIsrWithTime((short) 2, 0)); + StrategyOptions strategyOptions = new StrategyOptions.Builder(expectedCluster).minIsrWithTimeByTopic(minIsrWithTimeByTopic).build(); + + Map readyBrokers = new HashMap<>(); + readyBrokers.put(0, 5); + readyBrokers.put(1, 6); + readyBrokers.put(2, 6); + readyBrokers.put(3, 6); + readyBrokers.put(4, 5); + readyBrokers.put(5, 6); + prioritizeOneAboveMinIsrMovementPlanner.addExecutionProposals(proposals, strategyOptions, null); + List partitionMovementTasks + = prioritizeOneAboveMinIsrMovementPlanner.getInterBrokerReplicaMovementTasks(readyBrokers, Collections.emptySet(), _defaultPartitionsMaxCap); + assertEquals("First task", _rf4PartitionMovement2, partitionMovementTasks.get(0).proposal()); + assertEquals("Second task", _rf4PartitionMovement1, partitionMovementTasks.get(1).proposal()); + assertEquals("Third task", _rf4PartitionMovement3, partitionMovementTasks.get(2).proposal()); + assertEquals("Fourth task", _rf4PartitionMovement0, partitionMovementTasks.get(3).proposal()); + } + @Test public void testDynamicConfigReplicaMovementStrategy() { List proposals = new ArrayList<>(); From eb899fa55571d48ae8e2138dece33b051f1b7745 Mon Sep 17 00:00:00 2001 From: Allen Wang Date: Mon, 25 Nov 2024 13:41:47 -0800 Subject: [PATCH 3/6] Add execution task statistics and fix junit test --- .../executor/ExecutionProposal.java | 24 ++++++++++++++--- .../executor/ExecutionTaskManager.java | 4 +++ .../executor/ExecutionTaskPlanner.java | 24 +++++++++++++++++ .../cruisecontrol/executor/Executor.java | 27 ++++++++++++++++++- .../cruisecontrol/executor/ExecutorTest.java | 1 + 5 files changed, 75 insertions(+), 5 deletions(-) diff --git a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/ExecutionProposal.java b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/ExecutionProposal.java index eeb13ba59..966a3f414 100644 --- a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/ExecutionProposal.java +++ b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/ExecutionProposal.java @@ -44,6 +44,8 @@ public class ExecutionProposal { private final Set _replicasToRemove; // Replicas to move between disks are the replicas which are to be hosted by a different disk of the same broker. private final Map _replicasToMoveBetweenDisksByBroker; + private final Set _oldReplicasSet; + private final Set _newReplicasSet; /** * Construct an execution proposals. @@ -69,10 +71,10 @@ public ExecutionProposal(TopicPartition tp, validate(); // Populate replicas to add, to remove and to move across disk. - Set newBrokerList = _newReplicas.stream().mapToInt(ReplicaPlacementInfo::brokerId).boxed().collect(Collectors.toSet()); - Set oldBrokerList = _oldReplicas.stream().mapToInt(ReplicaPlacementInfo::brokerId).boxed().collect(Collectors.toSet()); - _replicasToAdd = _newReplicas.stream().filter(r -> !oldBrokerList.contains(r.brokerId())).collect(Collectors.toSet()); - _replicasToRemove = _oldReplicas.stream().filter(r -> !newBrokerList.contains(r.brokerId())).collect(Collectors.toSet()); + _newReplicasSet = _newReplicas.stream().mapToInt(ReplicaPlacementInfo::brokerId).boxed().collect(Collectors.toSet()); + _oldReplicasSet = _oldReplicas.stream().mapToInt(ReplicaPlacementInfo::brokerId).boxed().collect(Collectors.toSet()); + _replicasToAdd = _newReplicas.stream().filter(r -> !_oldReplicasSet.contains(r.brokerId())).collect(Collectors.toSet()); + _replicasToRemove = _oldReplicas.stream().filter(r -> !_newReplicasSet.contains(r.brokerId())).collect(Collectors.toSet()); _replicasToMoveBetweenDisksByBroker = new HashMap<>(); newReplicas.stream().filter(r -> !_replicasToAdd.contains(r) && !_oldReplicas.contains(r)) .forEach(r -> _replicasToMoveBetweenDisksByBroker.put(r.brokerId(), r)); @@ -177,6 +179,20 @@ public List oldReplicas() { return Collections.unmodifiableList(_oldReplicas); } + /** + * @return The broker ID set of the partitions before executing the proposal. + */ + public Set oldReplicasBrokerIdSet() { + return Collections.unmodifiableSet(_oldReplicasSet); + } + + /** + * @return The broker ID set of the partitions after executing the proposal. + */ + public Set newReplicasBrokerIdSet() { + return Collections.unmodifiableSet(_newReplicasSet); + } + /** * @return The new replica list fo the partition after executing the proposal. */ diff --git a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/ExecutionTaskManager.java b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/ExecutionTaskManager.java index dda14e9a4..66c8dbc12 100644 --- a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/ExecutionTaskManager.java +++ b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/ExecutionTaskManager.java @@ -156,6 +156,10 @@ public synchronized void addExecutionProposals(Collection pro } } + Map getSotedBrokerIdToInterBrokerMoveTaskCountMap() { + return _executionTaskPlanner.getSortedBrokerIdToInterBrokerMoveTaskCountMap(); + } + /** * Set the execution mode of the tasks to keep track of the ongoing execution mode via sensors. * diff --git a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/ExecutionTaskPlanner.java b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/ExecutionTaskPlanner.java index 8b6f7137a..d7c120255 100644 --- a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/ExecutionTaskPlanner.java +++ b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/ExecutionTaskPlanner.java @@ -21,6 +21,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.LinkedHashMap; import java.util.Set; import java.util.SortedSet; import java.util.TreeSet; @@ -544,4 +545,27 @@ private Comparator brokerComparator(StrategyOptions strategyOptions, Re : broker1 - broker2; }; } + + Map getSortedBrokerIdToInterBrokerMoveTaskCountMap() { + if (_interPartMoveTasksByBrokerId == null || _interPartMoveTasksByBrokerId.isEmpty()) { + return Collections.emptyMap(); + } + Map resultMap = _interPartMoveTasksByBrokerId.entrySet() + .stream() + .collect(Collectors.toMap( + Map.Entry::getKey, + entry -> entry.getValue().size() + )) + .entrySet() + .stream() + .sorted((e1, e2) -> e2.getValue().compareTo(e1.getValue())) + .collect(Collectors.toMap( + Map.Entry::getKey, + Map.Entry::getValue, + (e1, e2) -> e1, + // maintain the order of the sorted map. + LinkedHashMap::new + )); + return resultMap; + } } diff --git a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/Executor.java b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/Executor.java index 56cbc85bd..bdf8c82ff 100644 --- a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/Executor.java +++ b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/Executor.java @@ -831,6 +831,22 @@ public synchronized void executeProposals(Collection proposal requestedIntraBrokerPartitionMovementConcurrency, requestedClusterLeadershipMovementConcurrency, requestedBrokerLeadershipMovementConcurrency, requestedExecutionProgressCheckIntervalMs, replicaMovementStrategy, isTriggeredByUserRequest, loadMonitor); + if (removedBrokers != null && !removedBrokers.isEmpty()) { + int count = 0; + int totalCount = 0; + for (ExecutionProposal proposal: proposals) { + Set oldBrokers = proposal.oldReplicasBrokerIdSet(); + Set newBrokers = proposal.newReplicasBrokerIdSet(); + if (!oldBrokers.equals(newBrokers)) { + // Only count the proposals that involve partition movement. + totalCount++; + if (oldBrokers.stream().anyMatch(removedBrokers::contains) || newBrokers.stream().anyMatch(removedBrokers::contains)) { + count++; + } + } + } + LOG.info("User task {}: {} of partition move proposals are related to removed brokers.", uuid, ((float) count) / totalCount); + } startExecution(loadMonitor, null, removedBrokers, replicationThrottle, isTriggeredByUserRequest); } catch (Exception e) { if (e instanceof OngoingExecutionException) { @@ -1382,7 +1398,10 @@ public void run() { if (_executionException != null) { LOG.info("User task {}: Execution failed: {}. Exception: {}", _uuid, executionStatusString, _executionException.getMessage()); } else { - String status = userTaskInfo.state() == COMPLETED ? "succeeded" : userTaskInfo.state().toString(); + String status = "succeeded"; + if (userTaskInfo != null && userTaskInfo.state() != COMPLETED) { + status = userTaskInfo.state().toString(); + } LOG.info("User task {}: Execution {}: {}. ", _uuid, status, executionStatusString); } // Clear completed execution. @@ -1613,6 +1632,12 @@ private void interBrokerMoveReplicas() throws InterruptedException, ExecutionExc long totalDataToMoveInMB = _executionTaskManager.remainingInterBrokerDataToMoveInMB(); long startTime = System.currentTimeMillis(); LOG.info("User task {}: Starting {} inter-broker partition movements.", _uuid, numTotalPartitionMovements); + Map map = _executionTaskManager.getSotedBrokerIdToInterBrokerMoveTaskCountMap(); + LOG.info("User task {}: Broker Id to Execution Task Count Map: {}", _uuid, map); + if (!map.isEmpty()) { + LOG.info("User task {}: Degree of task count skew towards the largest single broker", _uuid, + map.entrySet().iterator().next().getValue() / (float) numTotalPartitionMovements); + } int partitionsToMove = numTotalPartitionMovements; // Exhaust all the pending partition movements. diff --git a/cruise-control/src/test/java/com/linkedin/kafka/cruisecontrol/executor/ExecutorTest.java b/cruise-control/src/test/java/com/linkedin/kafka/cruisecontrol/executor/ExecutorTest.java index 5f5f3e3a4..85385d252 100644 --- a/cruise-control/src/test/java/com/linkedin/kafka/cruisecontrol/executor/ExecutorTest.java +++ b/cruise-control/src/test/java/com/linkedin/kafka/cruisecontrol/executor/ExecutorTest.java @@ -746,6 +746,7 @@ private static UserTaskManager.UserTaskInfo getMockUserTaskInfo() { UserTaskManager.UserTaskInfo mockUserTaskInfo = EasyMock.mock(UserTaskManager.UserTaskInfo.class); // Run it any times to enable consecutive executions in tests. EasyMock.expect(mockUserTaskInfo.requestUrl()).andReturn("mock-request").anyTimes(); + expect(mockUserTaskInfo.state()).andReturn(UserTaskManager.TaskState.COMPLETED).anyTimes(); return mockUserTaskInfo; } From 7eb541d49e9b61bf037515747164390c0be76539 Mon Sep 17 00:00:00 2001 From: Allen Wang Date: Mon, 25 Nov 2024 16:18:48 -0800 Subject: [PATCH 4/6] Cover new method in junit test --- .../cruisecontrol/executor/ExecutionTaskPlanner.java | 7 +++++++ .../cruisecontrol/executor/ExecutionTaskPlannerTest.java | 8 ++++++++ 2 files changed, 15 insertions(+) diff --git a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/ExecutionTaskPlanner.java b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/ExecutionTaskPlanner.java index d7c120255..b86773394 100644 --- a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/ExecutionTaskPlanner.java +++ b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/ExecutionTaskPlanner.java @@ -568,4 +568,11 @@ Map getSortedBrokerIdToInterBrokerMoveTaskCountMap() { )); return resultMap; } + + /* + * Package private for testing. + */ + Map> getInterPartMoveTasksByBrokerId() { + return _interPartMoveTasksByBrokerId; + } } diff --git a/cruise-control/src/test/java/com/linkedin/kafka/cruisecontrol/executor/ExecutionTaskPlannerTest.java b/cruise-control/src/test/java/com/linkedin/kafka/cruisecontrol/executor/ExecutionTaskPlannerTest.java index 41bca12ef..b03253cc6 100644 --- a/cruise-control/src/test/java/com/linkedin/kafka/cruisecontrol/executor/ExecutionTaskPlannerTest.java +++ b/cruise-control/src/test/java/com/linkedin/kafka/cruisecontrol/executor/ExecutionTaskPlannerTest.java @@ -24,6 +24,7 @@ import java.util.HashSet; import java.util.Properties; import java.util.Set; +import java.util.SortedSet; import org.apache.kafka.clients.admin.AdminClient; import org.apache.kafka.clients.admin.DescribeReplicaLogDirsResult; import org.apache.kafka.common.Cluster; @@ -353,6 +354,13 @@ public void testGetInterBrokerPartitionMovementWithMinIsrTasks() { readyBrokers.put(4, 5); readyBrokers.put(5, 6); prioritizeOneAboveMinIsrMovementPlanner.addExecutionProposals(proposals, strategyOptions, null); + Map countMap = prioritizeOneAboveMinIsrMovementPlanner.getSortedBrokerIdToInterBrokerMoveTaskCountMap(); + Map> taskMap = prioritizeOneAboveMinIsrMovementPlanner.getInterPartMoveTasksByBrokerId(); + for (Map.Entry entry : countMap.entrySet()) { + int brokerId = entry.getKey(); + int count = entry.getValue(); + assertEquals(taskMap.get(brokerId).size(), count); + } List partitionMovementTasks = prioritizeOneAboveMinIsrMovementPlanner.getInterBrokerReplicaMovementTasks(readyBrokers, Collections.emptySet(), _defaultPartitionsMaxCap); assertEquals("First task", _rf4PartitionMovement2, partitionMovementTasks.get(0).proposal()); From e020671ea10b85956c206259f1825b9eb6fc32e8 Mon Sep 17 00:00:00 2001 From: Allen Wang Date: Mon, 25 Nov 2024 17:10:03 -0800 Subject: [PATCH 5/6] Fix typo --- .../kafka/cruisecontrol/executor/ExecutionTaskManager.java | 2 +- .../com/linkedin/kafka/cruisecontrol/executor/Executor.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/ExecutionTaskManager.java b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/ExecutionTaskManager.java index 66c8dbc12..00aa085c3 100644 --- a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/ExecutionTaskManager.java +++ b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/ExecutionTaskManager.java @@ -156,7 +156,7 @@ public synchronized void addExecutionProposals(Collection pro } } - Map getSotedBrokerIdToInterBrokerMoveTaskCountMap() { + Map getSortedBrokerIdToInterBrokerMoveTaskCountMap() { return _executionTaskPlanner.getSortedBrokerIdToInterBrokerMoveTaskCountMap(); } diff --git a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/Executor.java b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/Executor.java index bdf8c82ff..ef4d3aa0f 100644 --- a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/Executor.java +++ b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/Executor.java @@ -1632,7 +1632,7 @@ private void interBrokerMoveReplicas() throws InterruptedException, ExecutionExc long totalDataToMoveInMB = _executionTaskManager.remainingInterBrokerDataToMoveInMB(); long startTime = System.currentTimeMillis(); LOG.info("User task {}: Starting {} inter-broker partition movements.", _uuid, numTotalPartitionMovements); - Map map = _executionTaskManager.getSotedBrokerIdToInterBrokerMoveTaskCountMap(); + Map map = _executionTaskManager.getSortedBrokerIdToInterBrokerMoveTaskCountMap(); LOG.info("User task {}: Broker Id to Execution Task Count Map: {}", _uuid, map); if (!map.isEmpty()) { LOG.info("User task {}: Degree of task count skew towards the largest single broker", _uuid, From b2617c0db70ef648f33eb2751dd67f26842f178b Mon Sep 17 00:00:00 2001 From: Allen Wang Date: Mon, 2 Dec 2024 11:08:31 -0800 Subject: [PATCH 6/6] Incorporate review comments --- .../executor/ExecutionTaskPlanner.java | 2 + .../cruisecontrol/executor/Executor.java | 43 +++++++++++++------ 2 files changed, 31 insertions(+), 14 deletions(-) diff --git a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/ExecutionTaskPlanner.java b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/ExecutionTaskPlanner.java index b86773394..5a400faa3 100644 --- a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/ExecutionTaskPlanner.java +++ b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/ExecutionTaskPlanner.java @@ -389,6 +389,8 @@ public List getInterBrokerReplicaMovementTasks(Map proposalsForBroker = new TreeSet<>(_interPartMoveTasksByBrokerId.get(brokerId)); LOG.trace("Execution task for broker {} are {}", brokerId, proposalsForBroker); for (ExecutionTask task : proposalsForBroker) { diff --git a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/Executor.java b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/Executor.java index ef4d3aa0f..06c233ab5 100644 --- a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/Executor.java +++ b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/Executor.java @@ -832,20 +832,8 @@ public synchronized void executeProposals(Collection proposal requestedBrokerLeadershipMovementConcurrency, requestedExecutionProgressCheckIntervalMs, replicaMovementStrategy, isTriggeredByUserRequest, loadMonitor); if (removedBrokers != null && !removedBrokers.isEmpty()) { - int count = 0; - int totalCount = 0; - for (ExecutionProposal proposal: proposals) { - Set oldBrokers = proposal.oldReplicasBrokerIdSet(); - Set newBrokers = proposal.newReplicasBrokerIdSet(); - if (!oldBrokers.equals(newBrokers)) { - // Only count the proposals that involve partition movement. - totalCount++; - if (oldBrokers.stream().anyMatch(removedBrokers::contains) || newBrokers.stream().anyMatch(removedBrokers::contains)) { - count++; - } - } - } - LOG.info("User task {}: {} of partition move proposals are related to removed brokers.", uuid, ((float) count) / totalCount); + int[] numTasks = getNumTasksUnrelatedToBrokerRemoval(removedBrokers, proposals); + LOG.info("User task {}: {} of {} partition move proposals are unrelated to removed brokers.", uuid, numTasks[0], numTasks[1]); } startExecution(loadMonitor, null, removedBrokers, replicationThrottle, isTriggeredByUserRequest); } catch (Exception e) { @@ -859,6 +847,32 @@ public synchronized void executeProposals(Collection proposal } } + /** + * Get the number of tasks unrelated to broker removal. + * @param removedBrokers removed brokers + * @param proposals proposals to execute + * @return an array of two integers, the first one is the number of tasks unrelated to broker removal, + * the second one is the total number of tasks. + */ + private int[] getNumTasksUnrelatedToBrokerRemoval(Set removedBrokers, Collection proposals) { + int[] numTasks = new int[2]; + int unrelatedCount = 0; + int totalCount = 0; + for (ExecutionProposal proposal: proposals) { + Set oldBrokers = proposal.oldReplicasBrokerIdSet(); + Set newBrokers = proposal.newReplicasBrokerIdSet(); + if (!oldBrokers.equals(newBrokers)) { + totalCount++; + if (oldBrokers.stream().noneMatch(removedBrokers::contains) && newBrokers.stream().noneMatch(removedBrokers::contains)) { + unrelatedCount++; + } + } + } + numTasks[0] = unrelatedCount; + numTasks[1] = totalCount; + return numTasks; + } + private void sanityCheckExecuteProposals(LoadMonitor loadMonitor, String uuid) throws OngoingExecutionException { if (_hasOngoingExecution) { throw new OngoingExecutionException("Cannot execute new proposals while there is an ongoing execution."); @@ -1400,6 +1414,7 @@ public void run() { } else { String status = "succeeded"; if (userTaskInfo != null && userTaskInfo.state() != COMPLETED) { + // The task may be in state of COMPLETED_WITH_ERROR if the user requested to stop the execution. status = userTaskInfo.state().toString(); } LOG.info("User task {}: Execution {}: {}. ", _uuid, status, executionStatusString);