KAFKA-15045: (KIP-924 pt. 12) Wiring in new assignment configs and logic #16074

Merged
merged 5 commits into from
May 29, 2024
Changes from 1 commit
@@ -820,6 +820,9 @@ public class StreamsConfig extends AbstractConfig {
+ "optimization algorithm favors minimizing cross rack traffic or minimize the movement of tasks in existing assignment. If set a larger value <code>" + RackAwareTaskAssignor.class.getName() + "</code> will "
+ "optimize to maintain the existing assignment. The default value is null which means it will use default non_overlap cost values in different assignors.";

@SuppressWarnings("WeakerAccess")
public static final String TASK_ASSIGNOR_CLASS_CONFIG = "task.assignor.class";
private static final String TASK_ASSIGNOR_CLASS_DOC = "A task assignor class or class name implementing the <@link TaskAssignor> interface. Defaults to the <@link HighAvailabilityTaskAssignor> class.";
Contributor
Let's make sure to include what a value of null means here (which I believe is the "real" default at this moment)

Contributor Author
null means we use the old task assignment strategy, which itself defaults to the HA assignor.

Member
Yeah I think the javadocs are correct.

Contributor
If I'm reading the code correctly, the javadocs are not correct. If you specify HighAvailabilityTaskAssignor.class it will use the new code path vs. if you use null it'll use the old code path.
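
To make the distinction in this thread concrete, here is a minimal sketch of the two configurations being compared. It assumes only the config key `task.assignor.class` shown in the diff; the `codePath` helper and the class name `com.example.MyTaskAssignor` are hypothetical stand-ins for illustration, not code from the PR.

```java
import java.util.Properties;

public class AssignorConfigExample {
    // Key copied from TASK_ASSIGNOR_CLASS_CONFIG in the diff above.
    static final String TASK_ASSIGNOR_CLASS_CONFIG = "task.assignor.class";

    // Hypothetical helper: mirrors the null check discussed in the thread.
    // An unset (null) value selects the old code path; any class name,
    // even one naming an HA assignor, selects the new code path.
    static String codePath(final Properties props) {
        return props.getProperty(TASK_ASSIGNOR_CLASS_CONFIG) == null ? "old" : "new";
    }

    public static void main(final String[] args) {
        final Properties unset = new Properties();

        final Properties explicit = new Properties();
        // Hypothetical class name, used only to show the shape of the setting.
        explicit.setProperty(TASK_ASSIGNOR_CLASS_CONFIG, "com.example.MyTaskAssignor");

        System.out.println("unset    -> " + codePath(unset));
        System.out.println("explicit -> " + codePath(explicit));
    }
}
```

Under this reading, the doc string would need to say that leaving the config unset is not equivalent to naming the default assignor explicitly.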


/**
* {@code topology.optimization}
@@ -980,6 +983,11 @@ public class StreamsConfig extends AbstractConfig {
null,
Importance.MEDIUM,
RACK_AWARE_ASSIGNMENT_TRAFFIC_COST_DOC)
.define(TASK_ASSIGNOR_CLASS_CONFIG,
Type.STRING,
null,
Member
I know it doesn't exist yet, but once we have the new HA assignor then imo we should use that as the default here. Then the logic will be as such:

  1. If the new config is set but not the old one, use the new config assignor
  2. If the old config is set but not the new one, use the old config assignor
  3. If both configs are set, use the new config assignor
  4. If neither config is set, use the new config assignor (ie default to new HA assignor)

Member
note: as discussed, the above is the eventual plan, but won't be implemented until we have the new HAAssignor to switch over to. For now, if neither config is set, we'll default to the old config assignor (ie the current/original HA assignor)
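
The four precedence cases and the interim behavior described in this thread can be sketched as a single resolution function. This is an illustration under stated assumptions, not the PR's implementation: the `Source` enum, the `resolve` method, and the `newHaAssignorAvailable` flag are all hypothetical names introduced here.

```java
public class AssignorPrecedence {
    // Which configuration ends up supplying the assignor.
    enum Source { NEW_CONFIG, OLD_CONFIG }

    /**
     * @param newConfig value of the new assignor config, or null if unset
     * @param oldConfig value of the old assignor config, or null if unset
     * @param newHaAssignorAvailable hypothetical flag: false models the interim
     *        state before a new HA assignor exists, true models the eventual plan
     */
    static Source resolve(final String newConfig,
                          final String oldConfig,
                          final boolean newHaAssignorAvailable) {
        if (newConfig != null) {
            // Cases 1 and 3: the new config wins whenever it is set.
            return Source.NEW_CONFIG;
        }
        if (oldConfig != null) {
            // Case 2: only the old config is set.
            return Source.OLD_CONFIG;
        }
        // Case 4: neither is set. Eventually this defaults to the new HA
        // assignor; for now it falls back to the old config's default.
        return newHaAssignorAvailable ? Source.NEW_CONFIG : Source.OLD_CONFIG;
    }

    public static void main(final String[] args) {
        System.out.println(resolve("a.New", null, false));
        System.out.println(resolve(null, "a.Old", false));
        System.out.println(resolve("a.New", "a.Old", false));
        System.out.println(resolve(null, null, false));
    }
}
```

Only case 4 differs between the interim and eventual behavior, which is why the default can be switched later without changing the other three cases.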

Importance.MEDIUM,
TASK_ASSIGNOR_CLASS_DOC)
.define(REPLICATION_FACTOR_CONFIG,
Type.INT,
-1,
@@ -214,6 +214,7 @@ public String toString() {
private RebalanceProtocol rebalanceProtocol;
private AssignmentListener assignmentListener;

private Supplier<org.apache.kafka.streams.processor.assignment.TaskAssignor> userTaskAssignorSupplier;
private Supplier<TaskAssignor> taskAssignorSupplier;
private byte uniqueField;
private Map<String, String> clientTags;
@@ -248,6 +249,7 @@ public void configure(final Map<String, ?> configs) {
internalTopicManager = assignorConfiguration.internalTopicManager();
copartitionedTopicsEnforcer = assignorConfiguration.copartitionedTopicsEnforcer();
rebalanceProtocol = assignorConfiguration.rebalanceProtocol();
userTaskAssignorSupplier = assignorConfiguration::userTaskAssignor;
taskAssignorSupplier = assignorConfiguration::taskAssignor;
assignmentListener = assignorConfiguration.assignmentListener();
uniqueField = 0;
@@ -760,23 +762,39 @@ private boolean assignTasksToClients(final Cluster fullMetadata,
log.debug("Assigning tasks and {} standby replicas to client nodes {}",
numStandbyReplicas(), clientStates);

final TaskAssignor taskAssignor = createTaskAssignor(lagComputationSuccessful);

final RackAwareTaskAssignor rackAwareTaskAssignor = new RackAwareTaskAssignor(
fullMetadata,
partitionsForTask,
changelogTopics.changelogPartionsForTask(),
tasksForTopicGroup,
racksForProcessConsumer,
internalTopicManager,
assignmentConfigs,
time
);
final boolean probingRebalanceNeeded = taskAssignor.assign(clientStates,
allTasks,
statefulTasks,
rackAwareTaskAssignor,
assignmentConfigs);
final org.apache.kafka.streams.processor.assignment.TaskAssignor userTaskAssignor =
createUserTaskAssignor(lagComputationSuccessful);
boolean probingRebalanceNeeded = false;
if (userTaskAssignor == null) {
Contributor
nit: consider making the if and else separate methods that return boolean (whether probing rebalance is necessary) and just assign directly so it can be final

final TaskAssignor taskAssignor = createTaskAssignor(lagComputationSuccessful);
final RackAwareTaskAssignor rackAwareTaskAssignor = new RackAwareTaskAssignor(
fullMetadata,
partitionsForTask,
changelogTopics.changelogPartionsForTask(),
tasksForTopicGroup,
racksForProcessConsumer,
internalTopicManager,
assignmentConfigs,
time
);
probingRebalanceNeeded = taskAssignor.assign(clientStates,
allTasks,
statefulTasks,
rackAwareTaskAssignor,
assignmentConfigs);
} else {
final ApplicationState applicationState = buildApplicationState(
taskManager.topologyMetadata(),
clientMetadataMap,
topicGroups,
fullMetadata
);
final TaskAssignment taskAssignment = userTaskAssignor.assign(applicationState);
probingRebalanceNeeded = taskAssignment.assignment().stream().anyMatch(assignment -> {
return assignment.followupRebalanceDeadline().isPresent();
});
processStreamsPartitionAssignment(clientMetadataMap, taskAssignment);
}
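
The reviewer's nit about splitting the two branches into boolean-returning methods might look roughly like the sketch below. It is self-contained and uses hypothetical stand-in types (`LegacyAssignor`, `UserAssignor`, `ClientAssignment`) instead of the real Kafka Streams classes; only the `followupRebalanceDeadline()` check mirrors the diff.

```java
import java.time.Instant;
import java.util.List;
import java.util.Optional;

public class ProbingRebalanceSketch {
    // Hypothetical stand-in for one client's entry in a task assignment.
    static final class ClientAssignment {
        private final Optional<Instant> deadline;

        ClientAssignment(final Optional<Instant> deadline) {
            this.deadline = deadline;
        }

        Optional<Instant> followupRebalanceDeadline() {
            return deadline;
        }
    }

    // Hypothetical stand-ins for the old and new assignor interfaces.
    interface LegacyAssignor { boolean assign(); }
    interface UserAssignor { List<ClientAssignment> assign(); }

    // Old code path: the legacy assignor reports directly whether a
    // probing rebalance is needed.
    static boolean assignWithLegacyAssignor(final LegacyAssignor assignor) {
        return assignor.assign();
    }

    // New code path: a follow-up rebalance is needed if any client's
    // assignment carries a follow-up deadline.
    static boolean assignWithUserAssignor(final UserAssignor assignor) {
        return assignor.assign().stream()
            .anyMatch(assignment -> assignment.followupRebalanceDeadline().isPresent());
    }

    static boolean assignTasks(final UserAssignor user, final LegacyAssignor legacy) {
        // Each branch is a single call, so the result can be declared final.
        final boolean probingRebalanceNeeded = user == null
            ? assignWithLegacyAssignor(legacy)
            : assignWithUserAssignor(user);
        return probingRebalanceNeeded;
    }
}
```

Extracting the branches keeps `probingRebalanceNeeded` single-assignment and lets each code path be tested in isolation.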

// Break this up into multiple logs to make sure the summary info gets through, which helps avoid
// info loss for example due to long line truncation with large apps
@@ -793,6 +811,14 @@ private boolean assignTasksToClients(final Cluster fullMetadata,
return probingRebalanceNeeded;
}

private org.apache.kafka.streams.processor.assignment.TaskAssignor createUserTaskAssignor(final boolean lagComputationSuccessful) {
if (!lagComputationSuccessful) {
log.info("Failed to fetch end offsets for changelogs, will return previous assignment to clients and "
+ "trigger another rebalance to retry.");
}
Contributor
seems weird to make this part of this function? is this relevant only in the case when we're creating a user assignor?

Contributor Author
Yeah, I don't think this makes sense in a world where users have a clear (read: non-internal) way to change assignment strategies and implementations, so I removed it.

return userTaskAssignorSupplier.get();
}

private TaskAssignor createTaskAssignor(final boolean lagComputationSuccessful) {
final TaskAssignor taskAssignor = taskAssignorSupplier.get();
if (taskAssignor instanceof StickyTaskAssignor) {
@@ -253,6 +253,21 @@ public TaskAssignor taskAssignor() {
}
}

public org.apache.kafka.streams.processor.assignment.TaskAssignor userTaskAssignor() {
Contributor
consider returning Optional<...> to indicate that null is a semantically valid result

final String userTaskAssignorClassname = streamsConfig.getString(StreamsConfig.TASK_ASSIGNOR_CLASS_CONFIG);
if (userTaskAssignorClassname == null) {
return null;
}
try {
return Utils.newInstance(userTaskAssignorClassname, org.apache.kafka.streams.processor.assignment.TaskAssignor.class);
} catch (final ClassNotFoundException e) {
throw new IllegalArgumentException(
"Expected an instantiable class name for " + StreamsConfig.TASK_ASSIGNOR_CLASS_CONFIG,
Contributor
let's add the name of the class that was passed in to aid in debugging

e
);
}
}
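
Both review suggestions on this method (returning `Optional` and naming the offending class in the error) could be combined along the lines of the sketch below. It is a standalone illustration that uses plain Java reflection in place of Kafka's `Utils.newInstance`; the `UserAssignorLoader` class and its `load` method are hypothetical names, and the config key is written as a literal so the example compiles on its own.

```java
import java.util.Optional;

public class UserAssignorLoader {
    /**
     * Instantiates the configured class, or returns an empty Optional when
     * the config is unset, so callers cannot mistake "unset" for a failure.
     */
    static <T> Optional<T> load(final String className, final Class<T> base) {
        if (className == null) {
            return Optional.empty();
        }
        try {
            final T instance = Class.forName(className)
                .asSubclass(base)
                .getDeclaredConstructor()
                .newInstance();
            return Optional.of(instance);
        } catch (final ReflectiveOperationException | ClassCastException e) {
            // Include the configured value so a typo is visible in the error.
            throw new IllegalArgumentException(
                "Expected an instantiable class name for task.assignor.class but got "
                    + className,
                e
            );
        }
    }

    public static void main(final String[] args) {
        System.out.println(load(null, Object.class).isPresent());
        System.out.println(load("java.util.ArrayList", java.util.List.class).isPresent());
    }
}
```

With this shape, the caller in `assignTasksToClients` would branch on `isPresent()` instead of comparing against `null`.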

public AssignmentListener assignmentListener() {
final Object o = internalConfigs.get(InternalConfig.ASSIGNMENT_LISTENER);
if (o == null) {