-
Notifications
You must be signed in to change notification settings - Fork 14.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
KAFKA-15045: (KIP-924 pt. 12) Wiring in new assignment configs and logic #16074
Changes from 2 commits
58a3f70
9101cc2
ea41fa9
c3d8e72
906379c
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -820,6 +820,9 @@ public class StreamsConfig extends AbstractConfig { | |
+ "optimization algorithm favors minimizing cross rack traffic or minimize the movement of tasks in existing assignment. If set a larger value <code>" + RackAwareTaskAssignor.class.getName() + "</code> will " | ||
+ "optimize to maintain the existing assignment. The default value is null which means it will use default non_overlap cost values in different assignors."; | ||
|
||
@SuppressWarnings("WeakerAccess") | ||
public static final String TASK_ASSIGNOR_CLASS_CONFIG = "task.assignor.class"; | ||
private static final String TASK_ASSIGNOR_CLASS_DOC = "A task assignor class or class name implementing the <@link TaskAssignor> interface. Defaults to the <@link HighAvailabilityTaskAssignor> class."; | ||
|
||
/** | ||
* {@code topology.optimization} | ||
|
@@ -980,6 +983,11 @@ public class StreamsConfig extends AbstractConfig { | |
null, | ||
Importance.MEDIUM, | ||
RACK_AWARE_ASSIGNMENT_TRAFFIC_COST_DOC) | ||
.define(TASK_ASSIGNOR_CLASS_CONFIG, | ||
Type.STRING, | ||
null, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I know it doesn't exist yet, but once we have the new HA assignor then imo we should use that as the default here. Then the logic will be as such:
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. note: as discussed, the above is the eventual plan, but won't be implemented until we have the new HAAssignor to switch over to. For now, if neither config is set, we'll default to the old config assignor (ie the current/original HA assignor) |
||
Importance.MEDIUM, | ||
TASK_ASSIGNOR_CLASS_DOC) | ||
.define(REPLICATION_FACTOR_CONFIG, | ||
Type.INT, | ||
-1, | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -214,6 +214,7 @@ public String toString() { | |
private RebalanceProtocol rebalanceProtocol; | ||
private AssignmentListener assignmentListener; | ||
|
||
private Supplier<Optional<org.apache.kafka.streams.processor.assignment.TaskAssignor>> userTaskAssignorSupplier; | ||
private Supplier<TaskAssignor> taskAssignorSupplier; | ||
private byte uniqueField; | ||
private Map<String, String> clientTags; | ||
|
@@ -248,6 +249,7 @@ public void configure(final Map<String, ?> configs) { | |
internalTopicManager = assignorConfiguration.internalTopicManager(); | ||
copartitionedTopicsEnforcer = assignorConfiguration.copartitionedTopicsEnforcer(); | ||
rebalanceProtocol = assignorConfiguration.rebalanceProtocol(); | ||
userTaskAssignorSupplier = assignorConfiguration::userTaskAssignor; | ||
taskAssignorSupplier = assignorConfiguration::taskAssignor; | ||
assignmentListener = assignorConfiguration.assignmentListener(); | ||
uniqueField = 0; | ||
|
@@ -760,23 +762,39 @@ private boolean assignTasksToClients(final Cluster fullMetadata, | |
log.debug("Assigning tasks and {} standby replicas to client nodes {}", | ||
numStandbyReplicas(), clientStates); | ||
|
||
final TaskAssignor taskAssignor = createTaskAssignor(lagComputationSuccessful); | ||
|
||
final RackAwareTaskAssignor rackAwareTaskAssignor = new RackAwareTaskAssignor( | ||
fullMetadata, | ||
partitionsForTask, | ||
changelogTopics.changelogPartionsForTask(), | ||
tasksForTopicGroup, | ||
racksForProcessConsumer, | ||
internalTopicManager, | ||
assignmentConfigs, | ||
time | ||
); | ||
final boolean probingRebalanceNeeded = taskAssignor.assign(clientStates, | ||
allTasks, | ||
statefulTasks, | ||
rackAwareTaskAssignor, | ||
assignmentConfigs); | ||
final Optional<org.apache.kafka.streams.processor.assignment.TaskAssignor> userTaskAssignor = | ||
userTaskAssignorSupplier.get(); | ||
boolean probingRebalanceNeeded; | ||
if (userTaskAssignor.isPresent()) { | ||
final ApplicationState applicationState = buildApplicationState( | ||
taskManager.topologyMetadata(), | ||
clientMetadataMap, | ||
topicGroups, | ||
fullMetadata | ||
); | ||
final TaskAssignment taskAssignment = userTaskAssignor.get().assign(applicationState); | ||
processStreamsPartitionAssignment(clientMetadataMap, taskAssignment); | ||
probingRebalanceNeeded = taskAssignment.assignment().stream().anyMatch(assignment -> { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This isn't correct -- a "probing rebalance" is a very specific type of followup rebalance related to the HA assignor where Streams will schedule it for 10 min out. Whereas the This is why I think we need to do the other end first and work backwards, ie use the TaskAssignment to build up the individual consumer assignments. The followup rebalance logic is a lot more "nuanced" (to put it nicely) and can't really be simplified in this way. We should put this PR on hold until that one is merged |
||
return assignment.followupRebalanceDeadline().isPresent(); | ||
}); | ||
} else { | ||
final TaskAssignor taskAssignor = createTaskAssignor(lagComputationSuccessful); | ||
final RackAwareTaskAssignor rackAwareTaskAssignor = new RackAwareTaskAssignor( | ||
fullMetadata, | ||
partitionsForTask, | ||
changelogTopics.changelogPartionsForTask(), | ||
tasksForTopicGroup, | ||
racksForProcessConsumer, | ||
internalTopicManager, | ||
assignmentConfigs, | ||
time | ||
); | ||
probingRebalanceNeeded = taskAssignor.assign(clientStates, | ||
allTasks, | ||
statefulTasks, | ||
rackAwareTaskAssignor, | ||
assignmentConfigs); | ||
} | ||
|
||
// Break this up into multiple logs to make sure the summary info gets through, which helps avoid | ||
// info loss for example due to long line truncation with large apps | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's make sure to include what a value of
null
means here (which I believe is the "real" default at this moment)There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
null
means we use the old task assignment strategy, which itself defaults to the HA assigner.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah I think the javadocs are correct.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If I'm reading the code correctly, the javadocs are not correct. If you specify
HighAvailabilityTaskAssignor.class
it will use the new code path vs. if you use null it'll use the old code path.