-
Notifications
You must be signed in to change notification settings - Fork 14.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
KAFKA-15045: (KIP-924 pt. 12) Wiring in new assignment configs and logic #16074
Changes from 1 commit
58a3f70
9101cc2
ea41fa9
c3d8e72
906379c
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -820,6 +820,9 @@ public class StreamsConfig extends AbstractConfig { | |
+ "optimization algorithm favors minimizing cross rack traffic or minimize the movement of tasks in existing assignment. If set a larger value <code>" + RackAwareTaskAssignor.class.getName() + "</code> will " | ||
+ "optimize to maintain the existing assignment. The default value is null which means it will use default non_overlap cost values in different assignors."; | ||
|
||
@SuppressWarnings("WeakerAccess") | ||
public static final String TASK_ASSIGNOR_CLASS_CONFIG = "task.assignor.class"; | ||
private static final String TASK_ASSIGNOR_CLASS_DOC = "A task assignor class or class name implementing the <@link TaskAssignor> interface. Defaults to the <@link HighAvailabilityTaskAssignor> class."; | ||
|
||
/** | ||
* {@code topology.optimization} | ||
|
@@ -980,6 +983,11 @@ public class StreamsConfig extends AbstractConfig { | |
null, | ||
Importance.MEDIUM, | ||
RACK_AWARE_ASSIGNMENT_TRAFFIC_COST_DOC) | ||
.define(TASK_ASSIGNOR_CLASS_CONFIG, | ||
Type.STRING, | ||
null, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I know it doesn't exist yet, but once we have the new HA assignor then imo we should use that as the default here. Then the logic will be as such:
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. note: as discussed, the above is the eventual plan, but won't be implemented until we have the new HAAssignor to switch over to. For now, if neither config is set, we'll default to the old config assignor (ie the current/original HA assignor) |
||
Importance.MEDIUM, | ||
TASK_ASSIGNOR_CLASS_DOC) | ||
.define(REPLICATION_FACTOR_CONFIG, | ||
Type.INT, | ||
-1, | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -214,6 +214,7 @@ public String toString() { | |
private RebalanceProtocol rebalanceProtocol; | ||
private AssignmentListener assignmentListener; | ||
|
||
private Supplier<org.apache.kafka.streams.processor.assignment.TaskAssignor> userTaskAssignorSupplier; | ||
private Supplier<TaskAssignor> taskAssignorSupplier; | ||
private byte uniqueField; | ||
private Map<String, String> clientTags; | ||
|
@@ -248,6 +249,7 @@ public void configure(final Map<String, ?> configs) { | |
internalTopicManager = assignorConfiguration.internalTopicManager(); | ||
copartitionedTopicsEnforcer = assignorConfiguration.copartitionedTopicsEnforcer(); | ||
rebalanceProtocol = assignorConfiguration.rebalanceProtocol(); | ||
userTaskAssignorSupplier = assignorConfiguration::userTaskAssignor; | ||
taskAssignorSupplier = assignorConfiguration::taskAssignor; | ||
assignmentListener = assignorConfiguration.assignmentListener(); | ||
uniqueField = 0; | ||
|
@@ -760,23 +762,39 @@ private boolean assignTasksToClients(final Cluster fullMetadata, | |
log.debug("Assigning tasks and {} standby replicas to client nodes {}", | ||
numStandbyReplicas(), clientStates); | ||
|
||
final TaskAssignor taskAssignor = createTaskAssignor(lagComputationSuccessful); | ||
|
||
final RackAwareTaskAssignor rackAwareTaskAssignor = new RackAwareTaskAssignor( | ||
fullMetadata, | ||
partitionsForTask, | ||
changelogTopics.changelogPartionsForTask(), | ||
tasksForTopicGroup, | ||
racksForProcessConsumer, | ||
internalTopicManager, | ||
assignmentConfigs, | ||
time | ||
); | ||
final boolean probingRebalanceNeeded = taskAssignor.assign(clientStates, | ||
allTasks, | ||
statefulTasks, | ||
rackAwareTaskAssignor, | ||
assignmentConfigs); | ||
final org.apache.kafka.streams.processor.assignment.TaskAssignor userTaskAssignor = | ||
createUserTaskAssignor(lagComputationSuccessful); | ||
boolean probingRebalanceNeeded = false; | ||
if (userTaskAssignor == null) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: consider making the if and else separate methods that return |
||
final TaskAssignor taskAssignor = createTaskAssignor(lagComputationSuccessful); | ||
final RackAwareTaskAssignor rackAwareTaskAssignor = new RackAwareTaskAssignor( | ||
fullMetadata, | ||
partitionsForTask, | ||
changelogTopics.changelogPartionsForTask(), | ||
tasksForTopicGroup, | ||
racksForProcessConsumer, | ||
internalTopicManager, | ||
assignmentConfigs, | ||
time | ||
); | ||
probingRebalanceNeeded = taskAssignor.assign(clientStates, | ||
allTasks, | ||
statefulTasks, | ||
rackAwareTaskAssignor, | ||
assignmentConfigs); | ||
} else { | ||
final ApplicationState applicationState = buildApplicationState( | ||
taskManager.topologyMetadata(), | ||
clientMetadataMap, | ||
topicGroups, | ||
fullMetadata | ||
); | ||
final TaskAssignment taskAssignment = userTaskAssignor.assign(applicationState); | ||
probingRebalanceNeeded = taskAssignment.assignment().stream().anyMatch(assignment -> { | ||
return assignment.followupRebalanceDeadline().isPresent(); | ||
}); | ||
processStreamsPartitionAssignment(clientMetadataMap, taskAssignment); | ||
} | ||
|
||
// Break this up into multiple logs to make sure the summary info gets through, which helps avoid | ||
// info loss for example due to long line truncation with large apps | ||
|
@@ -793,6 +811,14 @@ private boolean assignTasksToClients(final Cluster fullMetadata, | |
return probingRebalanceNeeded; | ||
} | ||
|
||
private org.apache.kafka.streams.processor.assignment.TaskAssignor createUserTaskAssignor(final boolean lagComputationSuccessful) { | ||
if (!lagComputationSuccessful) { | ||
log.info("Failed to fetch end offsets for changelogs, will return previous assignment to clients and " | ||
+ "trigger another rebalance to retry."); | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. seems weird to make this part of this function? is this relevant only in the case when we're creating a user assignor? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeah I don't think this makes sense in a world where users have a clear (read: non-internal) way to change assignment strategies and implementations. So i removed it. |
||
return userTaskAssignorSupplier.get(); | ||
} | ||
|
||
private TaskAssignor createTaskAssignor(final boolean lagComputationSuccessful) { | ||
final TaskAssignor taskAssignor = taskAssignorSupplier.get(); | ||
if (taskAssignor instanceof StickyTaskAssignor) { | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -253,6 +253,21 @@ public TaskAssignor taskAssignor() { | |
} | ||
} | ||
|
||
public org.apache.kafka.streams.processor.assignment.TaskAssignor userTaskAssignor() { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. consider returning |
||
final String userTaskAssignorClassname = streamsConfig.getString(StreamsConfig.TASK_ASSIGNOR_CLASS_CONFIG); | ||
if (userTaskAssignorClassname == null) { | ||
return null; | ||
} | ||
try { | ||
return Utils.newInstance(userTaskAssignorClassname, org.apache.kafka.streams.processor.assignment.TaskAssignor.class); | ||
} catch (final ClassNotFoundException e) { | ||
throw new IllegalArgumentException( | ||
"Expected an instantiable class name for " + StreamsConfig.TASK_ASSIGNOR_CLASS_CONFIG, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. let's add the name of the class that was passed in to aid in debugging |
||
e | ||
); | ||
} | ||
} | ||
|
||
public AssignmentListener assignmentListener() { | ||
final Object o = internalConfigs.get(InternalConfig.ASSIGNMENT_LISTENER); | ||
if (o == null) { | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's make sure to include what a value of
null
means here (which I believe is the "real" default at this moment)There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
null
means we use the old task assignment strategy, which itself defaults to the HA assigner.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah I think the javadocs are correct.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If I'm reading the code correctly, the javadocs are not correct. If you specify
HighAvailabilityTaskAssignor.class
it will use the new code path vs. if you use null it'll use the old code path.