Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make the broker round robin selection in task execution configurable #2223

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -559,6 +559,13 @@ public final class ExecutorConfig {
public static final String MIN_EXECUTION_PROGRESS_CHECK_INTERVAL_MS_DOC = "The minimum execution progress check interval that users "
+ "can dynamically set the execution progress check interval to.";

  /**
   * <code>prefer.broker.roundrobin.in.execution</code>
   * <p>When {@code true} (the default), the task planner prefers a round-robin pass over brokers,
   * skipping brokers that were already involved in the current round; when {@code false}, a broker
   * may contribute multiple tasks within the same round.
   */
  public static final String PREFER_BROKER_ROUND_ROBIN_CONFIG = "prefer.broker.roundrobin.in.execution";
  // Defaults to true to preserve the historical round-robin execution behavior.
  public static final boolean DEFAULT_PREFER_BROKER_ROUND_ROBIN = true;
  public static final String PREFER_BROKER_ROUND_ROBIN_DOC = "whether to prefer round-robin of brokers in rebalance execution.";

/**
* <code>slow.task.alerting.backoff.ms</code>
*/
Expand Down Expand Up @@ -990,6 +997,11 @@ public static ConfigDef define(ConfigDef configDef) {
ConfigDef.Type.BOOLEAN,
DEFAULT_AUTO_STOP_EXTERNAL_AGENT,
ConfigDef.Importance.MEDIUM,
AUTO_STOP_EXTERNAL_AGENT_DOC);
AUTO_STOP_EXTERNAL_AGENT_DOC)
.define(PREFER_BROKER_ROUND_ROBIN_CONFIG,
ConfigDef.Type.BOOLEAN,
DEFAULT_PREFER_BROKER_ROUND_ROBIN,
ConfigDef.Importance.MEDIUM,
PREFER_BROKER_ROUND_ROBIN_DOC);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ public class ExecutionProposal {
private final Set<ReplicaPlacementInfo> _replicasToRemove;
// Replicas to move between disks are the replicas which are to be hosted by a different disk of the same broker.
private final Map<Integer, ReplicaPlacementInfo> _replicasToMoveBetweenDisksByBroker;
private final Set<Integer> _oldReplicasSet;
private final Set<Integer> _newReplicasSet;

/**
* Construct an execution proposals.
Expand All @@ -69,10 +71,10 @@ public ExecutionProposal(TopicPartition tp,
validate();

// Populate replicas to add, to remove and to move across disk.
Set<Integer> newBrokerList = _newReplicas.stream().mapToInt(ReplicaPlacementInfo::brokerId).boxed().collect(Collectors.toSet());
Set<Integer> oldBrokerList = _oldReplicas.stream().mapToInt(ReplicaPlacementInfo::brokerId).boxed().collect(Collectors.toSet());
_replicasToAdd = _newReplicas.stream().filter(r -> !oldBrokerList.contains(r.brokerId())).collect(Collectors.toSet());
_replicasToRemove = _oldReplicas.stream().filter(r -> !newBrokerList.contains(r.brokerId())).collect(Collectors.toSet());
_newReplicasSet = _newReplicas.stream().mapToInt(ReplicaPlacementInfo::brokerId).boxed().collect(Collectors.toSet());
_oldReplicasSet = _oldReplicas.stream().mapToInt(ReplicaPlacementInfo::brokerId).boxed().collect(Collectors.toSet());
_replicasToAdd = _newReplicas.stream().filter(r -> !_oldReplicasSet.contains(r.brokerId())).collect(Collectors.toSet());
_replicasToRemove = _oldReplicas.stream().filter(r -> !_newReplicasSet.contains(r.brokerId())).collect(Collectors.toSet());
_replicasToMoveBetweenDisksByBroker = new HashMap<>();
newReplicas.stream().filter(r -> !_replicasToAdd.contains(r) && !_oldReplicas.contains(r))
.forEach(r -> _replicasToMoveBetweenDisksByBroker.put(r.brokerId(), r));
Expand Down Expand Up @@ -177,6 +179,20 @@ public List<ReplicaPlacementInfo> oldReplicas() {
return Collections.unmodifiableList(_oldReplicas);
}

  /**
   * @return An unmodifiable view of the IDs of brokers hosting replicas of the partition before executing the proposal.
   */
  public Set<Integer> oldReplicasBrokerIdSet() {
    return Collections.unmodifiableSet(_oldReplicasSet);
  }

  /**
   * @return An unmodifiable view of the IDs of brokers hosting replicas of the partition after executing the proposal.
   */
  public Set<Integer> newReplicasBrokerIdSet() {
    return Collections.unmodifiableSet(_newReplicasSet);
  }

/**
   * @return The new replica list of the partition after executing the proposal.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,10 @@ public synchronized void addExecutionProposals(Collection<ExecutionProposal> pro
}
}

  /**
   * @return Broker ID to pending inter-broker replica movement task count, sorted by task count in
   *         descending order. Package-private; delegates to the underlying task planner.
   */
  Map<Integer, Integer> getSortedBrokerIdToInterBrokerMoveTaskCountMap() {
    return _executionTaskPlanner.getSortedBrokerIdToInterBrokerMoveTaskCountMap();
  }

/**
* Set the execution mode of the tasks to keep track of the ongoing execution mode via sensors.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.LinkedHashMap;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
Expand All @@ -39,11 +40,14 @@
import org.slf4j.LoggerFactory;

import static com.linkedin.kafka.cruisecontrol.config.constants.ExecutorConfig.DEFAULT_REPLICA_MOVEMENT_STRATEGIES_CONFIG;
import static com.linkedin.kafka.cruisecontrol.config.constants.ExecutorConfig.LOGDIR_RESPONSE_TIMEOUT_MS_CONFIG;
import static com.linkedin.kafka.cruisecontrol.config.constants.ExecutorConfig.TASK_EXECUTION_ALERTING_THRESHOLD_MS_CONFIG;
import static com.linkedin.kafka.cruisecontrol.config.constants.ExecutorConfig.INTER_BROKER_REPLICA_MOVEMENT_RATE_ALERTING_THRESHOLD_CONFIG;
import static com.linkedin.kafka.cruisecontrol.config.constants.ExecutorConfig.INTRA_BROKER_REPLICA_MOVEMENT_RATE_ALERTING_THRESHOLD_CONFIG;
import static com.linkedin.kafka.cruisecontrol.executor.ExecutionTask.TaskType.*;
import static com.linkedin.kafka.cruisecontrol.config.constants.ExecutorConfig.LOGDIR_RESPONSE_TIMEOUT_MS_CONFIG;
import static com.linkedin.kafka.cruisecontrol.config.constants.ExecutorConfig.PREFER_BROKER_ROUND_ROBIN_CONFIG;
import static com.linkedin.kafka.cruisecontrol.config.constants.ExecutorConfig.TASK_EXECUTION_ALERTING_THRESHOLD_MS_CONFIG;
import static com.linkedin.kafka.cruisecontrol.executor.ExecutionTask.TaskType.INTER_BROKER_REPLICA_ACTION;
import static com.linkedin.kafka.cruisecontrol.executor.ExecutionTask.TaskType.INTRA_BROKER_REPLICA_ACTION;
import static com.linkedin.kafka.cruisecontrol.executor.ExecutionTask.TaskType.LEADER_ACTION;
import static org.apache.kafka.clients.admin.DescribeReplicaLogDirsResult.ReplicaLogDirInfo;

/**
Expand Down Expand Up @@ -80,6 +84,7 @@ public class ExecutionTaskPlanner {
private final long _taskExecutionAlertingThresholdMs;
private final double _interBrokerReplicaMovementRateAlertingThreshold;
private final double _intraBrokerReplicaMovementRateAlertingThreshold;
private final boolean _preferRoundRobin;
private static final int PRIORITIZE_BROKER_1 = -1;
private static final int PRIORITIZE_BROKER_2 = 1;
private static final int PRIORITIZE_NONE = 0;
Expand Down Expand Up @@ -120,6 +125,7 @@ public ExecutionTaskPlanner(AdminClient adminClient, KafkaCruiseControlConfig co

_defaultReplicaMovementTaskStrategy = _defaultReplicaMovementTaskStrategy.chainBaseReplicaMovementStrategyIfAbsent();
}
_preferRoundRobin = config.getBoolean(PREFER_BROKER_ROUND_ROBIN_CONFIG);
}

/**
Expand Down Expand Up @@ -379,11 +385,11 @@ public List<ExecutionTask> getInterBrokerReplicaMovementTasks(Map<Integer, Integ
break;
}
// If this broker has already involved in this round, skip it.
if (brokerInvolved.contains(brokerId)) {
if (_preferRoundRobin && brokerInvolved.contains(brokerId)) {
continue;
}
// Check the available balancing proposals of this broker to see if we can find one ready to execute.
SortedSet<ExecutionTask> proposalsForBroker = _interPartMoveTasksByBrokerId.get(brokerId);
SortedSet<ExecutionTask> proposalsForBroker = new TreeSet<>(_interPartMoveTasksByBrokerId.get(brokerId));
allenxwang marked this conversation as resolved.
Show resolved Hide resolved
LOG.trace("Execution task for broker {} are {}", brokerId, proposalsForBroker);
for (ExecutionTask task : proposalsForBroker) {
// Break if max cap reached
Expand All @@ -398,8 +404,8 @@ public List<ExecutionTask> getInterBrokerReplicaMovementTasks(Map<Integer, Integ
int sourceBroker = task.proposal().oldLeader().brokerId();
Set<Integer> destinationBrokers = task.proposal().replicasToAdd().stream().mapToInt(ReplicaPlacementInfo::brokerId)
.boxed().collect(Collectors.toSet());
if (brokerInvolved.contains(sourceBroker)
|| KafkaCruiseControlUtils.containsAny(brokerInvolved, destinationBrokers)) {
if (_preferRoundRobin && (brokerInvolved.contains(sourceBroker)
allenxwang marked this conversation as resolved.
Show resolved Hide resolved
|| KafkaCruiseControlUtils.containsAny(brokerInvolved, destinationBrokers))) {
continue;
}
TopicPartition tp = task.proposal().topicPartition();
Expand Down Expand Up @@ -429,8 +435,10 @@ public List<ExecutionTask> getInterBrokerReplicaMovementTasks(Map<Integer, Integ
newTaskAdded = true;
numInProgressPartitions++;
LOG.debug("Found ready task {} for broker {}. Broker concurrency state: {}", task, brokerId, readyBrokers);
// We can stop the check for proposals for this broker because we have found a proposal.
break;
if (_preferRoundRobin) {
// We can stop the check for proposals for this broker because we have found a proposal.
break;
}
}
}
}
Expand Down Expand Up @@ -537,4 +545,34 @@ private Comparator<Integer> brokerComparator(StrategyOptions strategyOptions, Re
: broker1 - broker2;
};
}

Map<Integer, Integer> getSortedBrokerIdToInterBrokerMoveTaskCountMap() {
if (_interPartMoveTasksByBrokerId == null || _interPartMoveTasksByBrokerId.isEmpty()) {
return Collections.emptyMap();
}
Map<Integer, Integer> resultMap = _interPartMoveTasksByBrokerId.entrySet()
.stream()
.collect(Collectors.toMap(
Map.Entry::getKey,
entry -> entry.getValue().size()
))
.entrySet()
.stream()
.sorted((e1, e2) -> e2.getValue().compareTo(e1.getValue()))
.collect(Collectors.toMap(
Map.Entry::getKey,
Map.Entry::getValue,
(e1, e2) -> e1,
// maintain the order of the sorted map.
LinkedHashMap::new
));
return resultMap;
}

  /*
   * Package private for testing.
   * Note: returns the internal mutable map directly — callers must not modify it.
   */
  Map<Integer, SortedSet<ExecutionTask>> getInterPartMoveTasksByBrokerId() {
    return _interPartMoveTasksByBrokerId;
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@
import static com.linkedin.kafka.cruisecontrol.executor.ExecutorAdminUtils.*;
import static com.linkedin.kafka.cruisecontrol.monitor.MonitorUtils.UNIT_INTERVAL_TO_PERCENTAGE;
import static com.linkedin.kafka.cruisecontrol.monitor.sampling.MetricSampler.SamplingMode.*;
import static com.linkedin.kafka.cruisecontrol.servlet.UserTaskManager.TaskState.COMPLETED;
import static org.apache.kafka.clients.admin.DescribeReplicaLogDirsResult.ReplicaLogDirInfo;
import static com.linkedin.cruisecontrol.common.utils.Utils.validateNotNull;

Expand Down Expand Up @@ -830,6 +831,22 @@ public synchronized void executeProposals(Collection<ExecutionProposal> proposal
requestedIntraBrokerPartitionMovementConcurrency, requestedClusterLeadershipMovementConcurrency,
requestedBrokerLeadershipMovementConcurrency, requestedExecutionProgressCheckIntervalMs, replicaMovementStrategy,
isTriggeredByUserRequest, loadMonitor);
if (removedBrokers != null && !removedBrokers.isEmpty()) {
int count = 0;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: maybe extract this snippet as a separate function.

int totalCount = 0;
for (ExecutionProposal proposal: proposals) {
Set<Integer> oldBrokers = proposal.oldReplicasBrokerIdSet();
Set<Integer> newBrokers = proposal.newReplicasBrokerIdSet();
if (!oldBrokers.equals(newBrokers)) {
// Only count the proposals that involve partition movement.
totalCount++;
if (oldBrokers.stream().anyMatch(removedBrokers::contains) || newBrokers.stream().anyMatch(removedBrokers::contains)) {
count++;
}
}
}
LOG.info("User task {}: {} of partition move proposals are related to removed brokers.", uuid, ((float) count) / totalCount);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You may want to actually log count and total count here, to observe whether this only happens for large scale rebalance or not.

}
startExecution(loadMonitor, null, removedBrokers, replicationThrottle, isTriggeredByUserRequest);
} catch (Exception e) {
if (e instanceof OngoingExecutionException) {
Expand Down Expand Up @@ -1362,8 +1379,8 @@ private class ProposalExecutionRunnable implements Runnable {
public void run() {
LOG.info("User task {}: Starting executing balancing proposals.", _uuid);
final long start = System.currentTimeMillis();
UserTaskManager.UserTaskInfo userTaskInfo = initExecution();
try {
UserTaskManager.UserTaskInfo userTaskInfo = initExecution();
execute(userTaskInfo);
} catch (Exception e) {
LOG.error("User task {}: ProposalExecutionRunnable got exception during run", _uuid, e);
Expand All @@ -1381,7 +1398,11 @@ public void run() {
if (_executionException != null) {
LOG.info("User task {}: Execution failed: {}. Exception: {}", _uuid, executionStatusString, _executionException.getMessage());
} else {
LOG.info("User task {}: Execution succeeded: {}. ", _uuid, executionStatusString);
String status = "succeeded";
if (userTaskInfo != null && userTaskInfo.state() != COMPLETED) {
status = userTaskInfo.state().toString();
allenxwang marked this conversation as resolved.
Show resolved Hide resolved
}
LOG.info("User task {}: Execution {}: {}. ", _uuid, status, executionStatusString);
}
// Clear completed execution.
clearCompletedExecution();
Expand Down Expand Up @@ -1611,6 +1632,12 @@ private void interBrokerMoveReplicas() throws InterruptedException, ExecutionExc
long totalDataToMoveInMB = _executionTaskManager.remainingInterBrokerDataToMoveInMB();
long startTime = System.currentTimeMillis();
LOG.info("User task {}: Starting {} inter-broker partition movements.", _uuid, numTotalPartitionMovements);
Map<Integer, Integer> map = _executionTaskManager.getSortedBrokerIdToInterBrokerMoveTaskCountMap();
LOG.info("User task {}: Broker Id to Execution Task Count Map: {}", _uuid, map);
if (!map.isEmpty()) {
LOG.info("User task {}: Degree of task count skew towards the largest single broker", _uuid,
map.entrySet().iterator().next().getValue() / (float) numTotalPartitionMovements);
}

int partitionsToMove = numTotalPartitionMovements;
// Exhaust all the pending partition movements.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,12 @@ public RemoveBrokersRunnable(KafkaCruiseControl kafkaCruiseControl,

@Override
protected OptimizationResult getResult() throws Exception {
return new OptimizationResult(computeResult(), _kafkaCruiseControl.config());
try {
return new OptimizationResult(computeResult(), _kafkaCruiseControl.config());
} catch (Exception e) {
LOG.error("User task {}: failed to remove brokers due to {}", _uuid, e);
throw e;
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.HashSet;
import java.util.Properties;
import java.util.Set;
import java.util.SortedSet;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.DescribeReplicaLogDirsResult;
import org.apache.kafka.common.Cluster;
Expand Down Expand Up @@ -353,6 +354,13 @@ public void testGetInterBrokerPartitionMovementWithMinIsrTasks() {
readyBrokers.put(4, 5);
readyBrokers.put(5, 6);
prioritizeOneAboveMinIsrMovementPlanner.addExecutionProposals(proposals, strategyOptions, null);
Map<Integer, Integer> countMap = prioritizeOneAboveMinIsrMovementPlanner.getSortedBrokerIdToInterBrokerMoveTaskCountMap();
Map<Integer, SortedSet<ExecutionTask>> taskMap = prioritizeOneAboveMinIsrMovementPlanner.getInterPartMoveTasksByBrokerId();
for (Map.Entry<Integer, Integer> entry : countMap.entrySet()) {
int brokerId = entry.getKey();
int count = entry.getValue();
assertEquals(taskMap.get(brokerId).size(), count);
}
List<ExecutionTask> partitionMovementTasks
= prioritizeOneAboveMinIsrMovementPlanner.getInterBrokerReplicaMovementTasks(readyBrokers, Collections.emptySet(), _defaultPartitionsMaxCap);
assertEquals("First task", _rf4PartitionMovement2, partitionMovementTasks.get(0).proposal());
Expand All @@ -361,6 +369,51 @@ public void testGetInterBrokerPartitionMovementWithMinIsrTasks() {
assertEquals("Fourth task", _rf4PartitionMovement0, partitionMovementTasks.get(3).proposal());
}

  // Verifies that with prefer.broker.roundrobin.in.execution disabled, the planner still produces
  // the min-ISR-prioritized task order: (At/Under)MinISR partitions first, fully-ISR partition last.
  @Test
  public void testGetInterBrokerPartitionMovementWithMinIsrNoRoundRobinTasks() {
    List<ExecutionProposal> proposals = new ArrayList<>();
    proposals.add(_rf4PartitionMovement0);
    proposals.add(_rf4PartitionMovement1);
    proposals.add(_rf4PartitionMovement2);
    proposals.add(_rf4PartitionMovement3);
    // Test PrioritizeOneAboveMinIsrWithOfflineReplicasStrategy execution strategies.
    // Create prioritizeOneAboveMinIsrMovementPlanner, chain after prioritizeMinIsr strategy
    Properties prioritizeOneAboveMinIsrMovementProps = KafkaCruiseControlUnitTestUtils.getKafkaCruiseControlProperties();
    prioritizeOneAboveMinIsrMovementProps.setProperty(ExecutorConfig.DEFAULT_REPLICA_MOVEMENT_STRATEGIES_CONFIG,
        String.format("%s,%s", PrioritizeMinIsrWithOfflineReplicasStrategy.class.getName(),
            PrioritizeOneAboveMinIsrWithOfflineReplicasStrategy.class.getName()));
    // Disable broker round-robin — the behavior under test in this case.
    prioritizeOneAboveMinIsrMovementProps.setProperty(ExecutorConfig.PREFER_BROKER_ROUND_ROBIN_CONFIG, "false");
    ExecutionTaskPlanner prioritizeOneAboveMinIsrMovementPlanner
        = new ExecutionTaskPlanner(null, new KafkaCruiseControlConfig(prioritizeOneAboveMinIsrMovementProps));

    // Partition 0 is healthy; partitions 1-3 are URPs with 2, 3 and 1 offline replicas respectively.
    Set<PartitionInfo> partitions = new HashSet<>();
    partitions.add(generatePartitionInfo(_rf4PartitionMovement0, false));
    partitions.add(generatePartitionInfoWithUrpHavingOfflineReplica(_rf4PartitionMovement1, 2));
    partitions.add(generatePartitionInfoWithUrpHavingOfflineReplica(_rf4PartitionMovement2, 3));
    partitions.add(generatePartitionInfoWithUrpHavingOfflineReplica(_rf4PartitionMovement3, 1));

    Cluster expectedCluster = new Cluster(null, _rf4ExpectedNodes, partitions, Collections.emptySet(), Collections.emptySet());
    // Setting topic min ISR to 2
    Map<String, MinIsrWithTime> minIsrWithTimeByTopic
        = Collections.singletonMap(TOPIC3, new MinIsrWithTime((short) 2, 0));
    StrategyOptions strategyOptions = new StrategyOptions.Builder(expectedCluster).minIsrWithTimeByTopic(minIsrWithTimeByTopic).build();

    // Per-broker movement concurrency caps.
    Map<Integer, Integer> readyBrokers = new HashMap<>();
    readyBrokers.put(0, 5);
    readyBrokers.put(1, 6);
    readyBrokers.put(2, 6);
    readyBrokers.put(3, 6);
    readyBrokers.put(4, 5);
    readyBrokers.put(5, 6);
    prioritizeOneAboveMinIsrMovementPlanner.addExecutionProposals(proposals, strategyOptions, null);
    List<ExecutionTask> partitionMovementTasks
        = prioritizeOneAboveMinIsrMovementPlanner.getInterBrokerReplicaMovementTasks(readyBrokers, Collections.emptySet(), _defaultPartitionsMaxCap);
    // Expected order: most offline replicas first (movement2 has 3, movement1 has 2, movement3 has 1),
    // healthy partition (movement0) last.
    assertEquals("First task", _rf4PartitionMovement2, partitionMovementTasks.get(0).proposal());
    assertEquals("Second task", _rf4PartitionMovement1, partitionMovementTasks.get(1).proposal());
    assertEquals("Third task", _rf4PartitionMovement3, partitionMovementTasks.get(2).proposal());
    assertEquals("Fourth task", _rf4PartitionMovement0, partitionMovementTasks.get(3).proposal());
  }

@Test
public void testDynamicConfigReplicaMovementStrategy() {
List<ExecutionProposal> proposals = new ArrayList<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -746,6 +746,7 @@ private static UserTaskManager.UserTaskInfo getMockUserTaskInfo() {
UserTaskManager.UserTaskInfo mockUserTaskInfo = EasyMock.mock(UserTaskManager.UserTaskInfo.class);
// Run it any times to enable consecutive executions in tests.
EasyMock.expect(mockUserTaskInfo.requestUrl()).andReturn("mock-request").anyTimes();
expect(mockUserTaskInfo.state()).andReturn(UserTaskManager.TaskState.COMPLETED).anyTimes();
return mockUserTaskInfo;
}

Expand Down