Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-15045: (KIP-924 pt. 12) Wiring in new assignment configs and logic #16074

Merged
merged 5 commits into from
May 29, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -820,6 +820,9 @@ public class StreamsConfig extends AbstractConfig {
+ "optimization algorithm favors minimizing cross rack traffic or minimize the movement of tasks in existing assignment. If set a larger value <code>" + RackAwareTaskAssignor.class.getName() + "</code> will "
+ "optimize to maintain the existing assignment. The default value is null which means it will use default non_overlap cost values in different assignors.";

@SuppressWarnings("WeakerAccess")
public static final String TASK_ASSIGNOR_CLASS_CONFIG = "task.assignor.class";
private static final String TASK_ASSIGNOR_CLASS_DOC = "A task assignor class or class name implementing the <@link TaskAssignor> interface. Defaults to the <@link HighAvailabilityTaskAssignor> class.";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's make sure to include what a value of null means here (which I believe is the "real" default at this moment)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

null means we use the old task assignment strategy, which itself defaults to the HA assigner.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah I think the javadocs are correct.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If I'm reading the code correctly, the javadocs are not correct. If you specify HighAvailabilityTaskAssignor.class it will use the new code path vs. if you use null it'll use the old code path.


/**
* {@code topology.optimization}
Expand Down Expand Up @@ -980,6 +983,11 @@ public class StreamsConfig extends AbstractConfig {
null,
Importance.MEDIUM,
RACK_AWARE_ASSIGNMENT_TRAFFIC_COST_DOC)
.define(TASK_ASSIGNOR_CLASS_CONFIG,
Type.STRING,
null,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I know it doesn't exist yet, but once we have the new HA assignor then imo we should use that as the default here. Then the logic will be as such:

  1. If the new config is set but not the old one, use the new config assignor
  2. If the old config is set but not the new one, use the old config assignor
  3. If both configs are set, use the new config assignor
  4. If neither config is set, use the new config assignor (ie default to new HA assignor)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

note: as discussed, the above is the eventual plan, but won't be implemented until we have the new HAAssignor to switch over to. For now, if neither config is set, we'll default to the old config assignor (ie the current/original HA assignor)

Importance.MEDIUM,
TASK_ASSIGNOR_CLASS_DOC)
.define(REPLICATION_FACTOR_CONFIG,
Type.INT,
-1,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,7 @@ public String toString() {
private RebalanceProtocol rebalanceProtocol;
private AssignmentListener assignmentListener;

private Supplier<Optional<org.apache.kafka.streams.processor.assignment.TaskAssignor>> userTaskAssignorSupplier;
private Supplier<TaskAssignor> taskAssignorSupplier;
private byte uniqueField;
private Map<String, String> clientTags;
Expand Down Expand Up @@ -248,6 +249,7 @@ public void configure(final Map<String, ?> configs) {
internalTopicManager = assignorConfiguration.internalTopicManager();
copartitionedTopicsEnforcer = assignorConfiguration.copartitionedTopicsEnforcer();
rebalanceProtocol = assignorConfiguration.rebalanceProtocol();
userTaskAssignorSupplier = assignorConfiguration::userTaskAssignor;
taskAssignorSupplier = assignorConfiguration::taskAssignor;
assignmentListener = assignorConfiguration.assignmentListener();
uniqueField = 0;
Expand Down Expand Up @@ -760,23 +762,39 @@ private boolean assignTasksToClients(final Cluster fullMetadata,
log.debug("Assigning tasks and {} standby replicas to client nodes {}",
numStandbyReplicas(), clientStates);

final TaskAssignor taskAssignor = createTaskAssignor(lagComputationSuccessful);

final RackAwareTaskAssignor rackAwareTaskAssignor = new RackAwareTaskAssignor(
fullMetadata,
partitionsForTask,
changelogTopics.changelogPartionsForTask(),
tasksForTopicGroup,
racksForProcessConsumer,
internalTopicManager,
assignmentConfigs,
time
);
final boolean probingRebalanceNeeded = taskAssignor.assign(clientStates,
allTasks,
statefulTasks,
rackAwareTaskAssignor,
assignmentConfigs);
final Optional<org.apache.kafka.streams.processor.assignment.TaskAssignor> userTaskAssignor =
userTaskAssignorSupplier.get();
boolean probingRebalanceNeeded;
if (userTaskAssignor.isPresent()) {
final ApplicationState applicationState = buildApplicationState(
taskManager.topologyMetadata(),
clientMetadataMap,
topicGroups,
fullMetadata
);
final TaskAssignment taskAssignment = userTaskAssignor.get().assign(applicationState);
processStreamsPartitionAssignment(clientMetadataMap, taskAssignment);
probingRebalanceNeeded = taskAssignment.assignment().stream().anyMatch(assignment -> {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This isn't correct -- a "probing rebalance" is a very specific type of followup rebalance related to the HA assignor where Streams will schedule it for 10 min out. Whereas the followupRebalanceDeadline could be an immediate follow rebalance or a probing rebalance or any custom deadline.

This is why I think we need to do the other end first and work backwards, ie use the TaskAssignment to build up the individual consumer assignments. The followup rebalance logic is a lot more "nuanced" (to put it nicely) and can't really be simplified in this way. We should put this PR on hold until that one is merged

return assignment.followupRebalanceDeadline().isPresent();
});
} else {
final TaskAssignor taskAssignor = createTaskAssignor(lagComputationSuccessful);
final RackAwareTaskAssignor rackAwareTaskAssignor = new RackAwareTaskAssignor(
fullMetadata,
partitionsForTask,
changelogTopics.changelogPartionsForTask(),
tasksForTopicGroup,
racksForProcessConsumer,
internalTopicManager,
assignmentConfigs,
time
);
probingRebalanceNeeded = taskAssignor.assign(clientStates,
allTasks,
statefulTasks,
rackAwareTaskAssignor,
assignmentConfigs);
}

// Break this up into multiple logs to make sure the summary info gets through, which helps avoid
// info loss for example due to long line truncation with large apps
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package org.apache.kafka.streams.processor.internals.assignment;

import java.util.Optional;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.RebalanceProtocol;
import org.apache.kafka.common.KafkaException;
Expand Down Expand Up @@ -253,6 +254,24 @@ public TaskAssignor taskAssignor() {
}
}

public Optional<org.apache.kafka.streams.processor.assignment.TaskAssignor> userTaskAssignor() {
final String userTaskAssignorClassname = streamsConfig.getString(StreamsConfig.TASK_ASSIGNOR_CLASS_CONFIG);
if (userTaskAssignorClassname == null) {
return Optional.empty();
}
try {
final org.apache.kafka.streams.processor.assignment.TaskAssignor assignor = Utils.newInstance(userTaskAssignorClassname,
org.apache.kafka.streams.processor.assignment.TaskAssignor.class);
log.info("Instantiated {} as the task assignor.", userTaskAssignorClassname);
return Optional.of(assignor);
} catch (final ClassNotFoundException e) {
throw new IllegalArgumentException(
"Expected an instantiable class name for " + StreamsConfig.TASK_ASSIGNOR_CLASS_CONFIG + " but got " + userTaskAssignorClassname,
e
);
}
}

public AssignmentListener assignmentListener() {
final Object o = internalConfigs.get(InternalConfig.ASSIGNMENT_LISTENER);
if (o == null) {
Expand Down