From 8d243dfbd415b5b34515695b72379d1489297a23 Mon Sep 17 00:00:00 2001 From: Antoine Pourchet Date: Tue, 28 May 2024 19:01:18 -0600 Subject: [PATCH] KAFKA-15045: (KIP-924 pt. 12) Wiring in new assignment configs and logic (#16074) This PR creates the new public config of KIP-924 in StreamsConfig and uses it to instantiate user-created TaskAssignors. If such a TaskAssignor is found and successfully created we then use that assignor to perform the task assignment, otherwise we revert back to the pre KIP-924 world with the internal task assignors. Reviewers: Anna Sophie Blee-Goldman , Almog Gavra --- .../apache/kafka/streams/StreamsConfig.java | 10 ++ .../internals/StreamsPartitionAssignor.java | 112 ++++++++++-------- .../assignment/AssignorConfiguration.java | 19 +++ .../internals/assignment/ClientState.java | 12 ++ .../kafka/streams/StreamsConfigTest.java | 7 ++ 5 files changed, 113 insertions(+), 47 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java index 70e4cba9625ac..25e86928b00c6 100644 --- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java +++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java @@ -45,6 +45,7 @@ import org.apache.kafka.streams.internals.UpgradeFromValues; import org.apache.kafka.streams.processor.FailOnInvalidTimestamp; import org.apache.kafka.streams.processor.TimestampExtractor; +import org.apache.kafka.streams.processor.assignment.TaskAssignor; import org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier; import org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor; import org.apache.kafka.streams.processor.internals.assignment.RackAwareTaskAssignor; @@ -820,6 +821,10 @@ public class StreamsConfig extends AbstractConfig { + "optimization algorithm favors minimizing cross rack traffic or minimize the movement of tasks in existing assignment. If set a larger value " + RackAwareTaskAssignor.class.getName() + " will " + "optimize to maintain the existing assignment. The default value is null which means it will use default non_overlap cost values in different assignors."; + @SuppressWarnings("WeakerAccess") + public static final String TASK_ASSIGNOR_CLASS_CONFIG = "task.assignor.class"; + private static final String TASK_ASSIGNOR_CLASS_DOC = "A task assignor class or class name implementing the " + + TaskAssignor.class.getName() + " interface. Defaults to the HighAvailabilityTaskAssignor class."; /** * {@code topology.optimization} @@ -980,6 +985,11 @@ public class StreamsConfig extends AbstractConfig { null, Importance.MEDIUM, RACK_AWARE_ASSIGNMENT_TRAFFIC_COST_DOC) + .define(TASK_ASSIGNOR_CLASS_CONFIG, + Type.STRING, + null, + Importance.MEDIUM, + TASK_ASSIGNOR_CLASS_DOC) .define(REPLICATION_FACTOR_CONFIG, Type.INT, -1, diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java index d7c15f9fd0d69..f2b8143f107ea 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.streams.processor.internals; +import java.time.Instant; import java.util.Optional; import org.apache.kafka.clients.admin.Admin; import org.apache.kafka.clients.admin.ListOffsetsResult; @@ -214,6 +215,7 @@ public String toString() { private RebalanceProtocol rebalanceProtocol; private AssignmentListener assignmentListener; + private Supplier> userTaskAssignorSupplier; private Supplier taskAssignorSupplier; private byte uniqueField; private Map clientTags; @@ -248,6 +250,7 @@ public void configure(final Map configs) { internalTopicManager = assignorConfiguration.internalTopicManager(); copartitionedTopicsEnforcer = assignorConfiguration.copartitionedTopicsEnforcer(); rebalanceProtocol = assignorConfiguration.rebalanceProtocol(); + userTaskAssignorSupplier = assignorConfiguration::userTaskAssignor; taskAssignorSupplier = assignorConfiguration::taskAssignor; assignmentListener = assignorConfiguration.assignmentListener(); uniqueField = 0; @@ -400,9 +403,6 @@ public GroupAssignment assign(final Cluster metadata, final GroupSubscription gr } try { - final boolean versionProbing = - checkMetadataVersions(minReceivedMetadataVersion, minSupportedMetadataVersion, futureMetadataVersion); - log.debug("Constructed client metadata {} from the member subscriptions.", clientMetadataMap); // ---------------- Step One ---------------- // @@ -440,7 +440,9 @@ public GroupAssignment assign(final Cluster metadata, final GroupSubscription gr final Set statefulTasks = new HashSet<>(); - final boolean probingRebalanceNeeded = assignTasksToClients(fullMetadata, allSourceTopics, topicGroups, + final boolean versionProbing = + checkMetadataVersions(minReceivedMetadataVersion, minSupportedMetadataVersion, futureMetadataVersion); + assignTasksToClients(fullMetadata, allSourceTopics, topicGroups, clientMetadataMap, partitionsForTask, racksForProcessConsumer, statefulTasks); // ---------------- Step Three ---------------- // @@ -465,8 +467,7 @@ public GroupAssignment assign(final Cluster metadata, final GroupSubscription gr allOwnedPartitions, minReceivedMetadataVersion, minSupportedMetadataVersion, - versionProbing, - probingRebalanceNeeded + versionProbing ); return new GroupAssignment(assignment); @@ -570,6 +571,9 @@ private static void processStreamsPartitionAssignment(final Map allSourceTopics, /** * Assigns a set of tasks to each client (Streams instance) using the configured task assignor, and also * populate the stateful tasks that have been assigned to the clients - * @return true if a probing rebalance should be triggered */ - private boolean assignTasksToClients(final Cluster fullMetadata, - final Set allSourceTopics, - final Map topicGroups, - final Map clientMetadataMap, - final Map> partitionsForTask, - final Map>> racksForProcessConsumer, - final Set statefulTasks) { + private void assignTasksToClients(final Cluster fullMetadata, + final Set allSourceTopics, + final Map topicGroups, + final Map clientMetadataMap, + final Map> partitionsForTask, + final Map>> racksForProcessConsumer, + final Set statefulTasks) { if (!statefulTasks.isEmpty()) { throw new TaskAssignmentException("The stateful tasks should not be populated before assigning tasks to clients"); } @@ -760,23 +763,45 @@ private boolean assignTasksToClients(final Cluster fullMetadata, log.debug("Assigning tasks and {} standby replicas to client nodes {}", numStandbyReplicas(), clientStates); - final TaskAssignor taskAssignor = createTaskAssignor(lagComputationSuccessful); - - final RackAwareTaskAssignor rackAwareTaskAssignor = new RackAwareTaskAssignor( - fullMetadata, - partitionsForTask, - changelogTopics.changelogPartionsForTask(), - tasksForTopicGroup, - racksForProcessConsumer, - internalTopicManager, - assignmentConfigs, - time - ); - final boolean probingRebalanceNeeded = taskAssignor.assign(clientStates, - allTasks, - statefulTasks, - rackAwareTaskAssignor, - assignmentConfigs); + final Optional userTaskAssignor = + userTaskAssignorSupplier.get(); + if (userTaskAssignor.isPresent()) { + final ApplicationState applicationState = buildApplicationState( + taskManager.topologyMetadata(), + clientMetadataMap, + topicGroups, + fullMetadata + ); + final TaskAssignment taskAssignment = userTaskAssignor.get().assign(applicationState); + processStreamsPartitionAssignment(clientMetadataMap, taskAssignment); + } else { + final TaskAssignor taskAssignor = createTaskAssignor(lagComputationSuccessful); + final RackAwareTaskAssignor rackAwareTaskAssignor = new RackAwareTaskAssignor( + fullMetadata, + partitionsForTask, + changelogTopics.changelogPartionsForTask(), + tasksForTopicGroup, + racksForProcessConsumer, + internalTopicManager, + assignmentConfigs, + time + ); + final boolean probingRebalanceNeeded = taskAssignor.assign(clientStates, + allTasks, + statefulTasks, + rackAwareTaskAssignor, + assignmentConfigs); + if (probingRebalanceNeeded) { + // Arbitrarily choose the leader's client to be responsible for triggering the probing rebalance, + // note once we pick the first consumer within the process to trigger probing rebalance, other consumer + // would not set to trigger any more. + final ClientMetadata rebalanceClientMetadata = clientMetadataMap.get(taskManager.processId()); + if (rebalanceClientMetadata != null) { + final Instant rebalanceDeadline = Instant.ofEpochMilli(time.milliseconds() + probingRebalanceIntervalMs()); + rebalanceClientMetadata.state.setFollowupRebalanceDeadline(rebalanceDeadline); + } + } + } // Break this up into multiple logs to make sure the summary info gets through, which helps avoid // info loss for example due to long line truncation with large apps @@ -789,8 +814,6 @@ private boolean assignTasksToClients(final Cluster fullMetadata, .sorted(comparingByKey()) .map(entry -> entry.getKey() + "=" + entry.getValue().currentAssignment()) .collect(Collectors.joining(Utils.NL))); - - return probingRebalanceNeeded; } private TaskAssignor createTaskAssignor(final boolean lagComputationSuccessful) { @@ -948,9 +971,8 @@ private Map computeNewAssignment(final Set statefulT final Set allOwnedPartitions, final int minUserMetadataVersion, final int minSupportedMetadataVersion, - final boolean versionProbing, - final boolean shouldTriggerProbingRebalance) { - boolean rebalanceRequired = shouldTriggerProbingRebalance || versionProbing; + final boolean versionProbing) { + boolean rebalanceRequired = versionProbing; final Map assignment = new HashMap<>(); // within the client, distribute tasks to its owned consumers @@ -992,10 +1014,7 @@ private Map computeNewAssignment(final Set statefulT activeTaskAssignment.get(threadEntry.getKey()).addAll(threadEntry.getValue()); } - // Arbitrarily choose the leader's client to be responsible for triggering the probing rebalance, - // note once we pick the first consumer within the process to trigger probing rebalance, other consumer - // would not set to trigger any more. - final boolean encodeNextProbingRebalanceTime = shouldTriggerProbingRebalance && clientId.equals(taskManager.processId()); + final boolean isNextProbingRebalanceEncoded = clientMetadata.state.followupRebalanceDeadline().isPresent(); final boolean tasksRevoked = addClientAssignments( statefulTasks, @@ -1008,11 +1027,10 @@ private Map computeNewAssignment(final Set statefulT activeTaskAssignment, standbyTaskAssignment, minUserMetadataVersion, - minSupportedMetadataVersion, - encodeNextProbingRebalanceTime + minSupportedMetadataVersion ); - if (tasksRevoked || encodeNextProbingRebalanceTime) { + if (tasksRevoked || isNextProbingRebalanceEncoded) { rebalanceRequired = true; log.debug("Requested client {} to schedule a followup rebalance", clientId); } @@ -1056,12 +1074,12 @@ private boolean addClientAssignments(final Set statefulTasks, final Map> activeTaskAssignments, final Map> standbyTaskAssignments, final int minUserMetadataVersion, - final int minSupportedMetadataVersion, - final boolean probingRebalanceNeeded) { + final int minSupportedMetadataVersion) { boolean followupRebalanceRequiredForRevokedTasks = false; // We only want to encode a scheduled probing rebalance for a single member in this client - boolean shouldEncodeProbingRebalance = probingRebalanceNeeded; + final Optional followupRebalanceDeadline = clientMetadata.state.followupRebalanceDeadline(); + boolean shouldEncodeProbingRebalance = followupRebalanceDeadline.isPresent(); // Loop through the consumers and build their assignment for (final String consumer : clientMetadata.consumers) { @@ -1108,7 +1126,7 @@ private boolean addClientAssignments(final Set statefulTasks, // Don't bother to schedule a probing rebalance if an immediate one is already scheduled shouldEncodeProbingRebalance = false; } else if (shouldEncodeProbingRebalance) { - final long nextRebalanceTimeMs = time.milliseconds() + probingRebalanceIntervalMs(); + final long nextRebalanceTimeMs = followupRebalanceDeadline.get().toEpochMilli(); log.info("Requesting followup rebalance be scheduled by {} for {} to probe for caught-up replica tasks.", consumer, Utils.toLogDateTimeFormat(nextRebalanceTimeMs)); info.setNextRebalanceTime(nextRebalanceTimeMs); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignorConfiguration.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignorConfiguration.java index 052ad117af467..176c475ffd9ee 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignorConfiguration.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignorConfiguration.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.streams.processor.internals.assignment; +import java.util.Optional; import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.RebalanceProtocol; import org.apache.kafka.common.KafkaException; @@ -253,6 +254,24 @@ public TaskAssignor taskAssignor() { } } + public Optional userTaskAssignor() { + final String userTaskAssignorClassname = streamsConfig.getString(StreamsConfig.TASK_ASSIGNOR_CLASS_CONFIG); + if (userTaskAssignorClassname == null) { + return Optional.empty(); + } + try { + final org.apache.kafka.streams.processor.assignment.TaskAssignor assignor = Utils.newInstance(userTaskAssignorClassname, + org.apache.kafka.streams.processor.assignment.TaskAssignor.class); + log.info("Instantiated {} as the task assignor.", userTaskAssignorClassname); + return Optional.of(assignor); + } catch (final ClassNotFoundException e) { + throw new IllegalArgumentException( + "Expected an instantiable class name for " + StreamsConfig.TASK_ASSIGNOR_CLASS_CONFIG + " but got " + userTaskAssignorClassname, + e + ); + } + } + public AssignmentListener assignmentListener() { final Object o = internalConfigs.get(InternalConfig.ASSIGNMENT_LISTENER); if (o == null) { diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientState.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientState.java index 1d9d8c47a4e08..d24e9c19167c5 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientState.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientState.java @@ -16,6 +16,8 @@ */ package org.apache.kafka.streams.processor.internals.assignment; +import java.time.Instant; +import java.util.Optional; import java.util.SortedMap; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.streams.processor.TaskId; @@ -62,6 +64,8 @@ public class ClientState { private final ClientStateTask previousStandbyTasks = new ClientStateTask(null, null); private final ClientStateTask revokingActiveTasks = new ClientStateTask(null, new TreeMap<>()); private final UUID processId; + + private Optional followupRebalanceDeadline = Optional.empty(); private int capacity; public ClientState() { @@ -143,6 +147,14 @@ boolean reachedCapacity() { return assignedTaskCount() >= capacity; } + public Optional followupRebalanceDeadline() { + return followupRebalanceDeadline; + } + + public void setFollowupRebalanceDeadline(final Instant followupRebalanceDeadline) { + this.followupRebalanceDeadline = Optional.of(followupRebalanceDeadline); + } + public Set activeTasks() { return unmodifiableSet(assignedActiveTasks.taskIds()); } diff --git a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java index a264de083b47f..8c89132ae2f9c 100644 --- a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java @@ -67,6 +67,7 @@ import static org.apache.kafka.streams.StreamsConfig.RACK_AWARE_ASSIGNMENT_NON_OVERLAP_COST_CONFIG; import static org.apache.kafka.streams.StreamsConfig.RACK_AWARE_ASSIGNMENT_TRAFFIC_COST_CONFIG; import static org.apache.kafka.streams.StreamsConfig.STATE_DIR_CONFIG; +import static org.apache.kafka.streams.StreamsConfig.TASK_ASSIGNOR_CLASS_CONFIG; import static org.apache.kafka.streams.StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG; import static org.apache.kafka.streams.StreamsConfig.adminClientPrefix; import static org.apache.kafka.streams.StreamsConfig.consumerPrefix; @@ -1457,6 +1458,12 @@ public void shouldReturnRackAwareAssignmentNonOverlapCost() { assertEquals(Integer.valueOf(10), new StreamsConfig(props).getInt(RACK_AWARE_ASSIGNMENT_NON_OVERLAP_COST_CONFIG)); } + @Test + public void shouldReturnTaskAssignorClass() { + props.put(StreamsConfig.TASK_ASSIGNOR_CLASS_CONFIG, "StickyTaskAssignor"); + assertEquals("StickyTaskAssignor", new StreamsConfig(props).getString(TASK_ASSIGNOR_CLASS_CONFIG)); + } + @Test public void shouldReturnDefaultClientSupplier() { final KafkaClientSupplier supplier = streamsConfig.getKafkaClientSupplier();