diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
index 70e4cba9625ac..25e86928b00c6 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
@@ -45,6 +45,7 @@
import org.apache.kafka.streams.internals.UpgradeFromValues;
import org.apache.kafka.streams.processor.FailOnInvalidTimestamp;
import org.apache.kafka.streams.processor.TimestampExtractor;
+import org.apache.kafka.streams.processor.assignment.TaskAssignor;
import org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier;
import org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor;
import org.apache.kafka.streams.processor.internals.assignment.RackAwareTaskAssignor;
@@ -820,6 +821,10 @@ public class StreamsConfig extends AbstractConfig {
+ "optimization algorithm favors minimizing cross rack traffic or minimize the movement of tasks in existing assignment. If set a larger value " + RackAwareTaskAssignor.class.getName() + "
will "
+ "optimize to maintain the existing assignment. The default value is null which means it will use default non_overlap cost values in different assignors.";
+ @SuppressWarnings("WeakerAccess")
+ public static final String TASK_ASSIGNOR_CLASS_CONFIG = "task.assignor.class";
+ private static final String TASK_ASSIGNOR_CLASS_DOC = "A task assignor class or class name implementing the " +
+ TaskAssignor.class.getName() + "
interface. Defaults to the HighAvailabilityTaskAssignor
class.";
/**
* {@code topology.optimization}
@@ -980,6 +985,11 @@ public class StreamsConfig extends AbstractConfig {
null,
Importance.MEDIUM,
RACK_AWARE_ASSIGNMENT_TRAFFIC_COST_DOC)
+ .define(TASK_ASSIGNOR_CLASS_CONFIG,
+ Type.STRING,
+ null,
+ Importance.MEDIUM,
+ TASK_ASSIGNOR_CLASS_DOC)
.define(REPLICATION_FACTOR_CONFIG,
Type.INT,
-1,
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
index d7c15f9fd0d69..f2b8143f107ea 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.streams.processor.internals;
+import java.time.Instant;
import java.util.Optional;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.ListOffsetsResult;
@@ -214,6 +215,7 @@ public String toString() {
private RebalanceProtocol rebalanceProtocol;
private AssignmentListener assignmentListener;
+ private Supplier> userTaskAssignorSupplier;
private Supplier taskAssignorSupplier;
private byte uniqueField;
private Map clientTags;
@@ -248,6 +250,7 @@ public void configure(final Map configs) {
internalTopicManager = assignorConfiguration.internalTopicManager();
copartitionedTopicsEnforcer = assignorConfiguration.copartitionedTopicsEnforcer();
rebalanceProtocol = assignorConfiguration.rebalanceProtocol();
+ userTaskAssignorSupplier = assignorConfiguration::userTaskAssignor;
taskAssignorSupplier = assignorConfiguration::taskAssignor;
assignmentListener = assignorConfiguration.assignmentListener();
uniqueField = 0;
@@ -400,9 +403,6 @@ public GroupAssignment assign(final Cluster metadata, final GroupSubscription gr
}
try {
- final boolean versionProbing =
- checkMetadataVersions(minReceivedMetadataVersion, minSupportedMetadataVersion, futureMetadataVersion);
-
log.debug("Constructed client metadata {} from the member subscriptions.", clientMetadataMap);
// ---------------- Step One ---------------- //
@@ -440,7 +440,9 @@ public GroupAssignment assign(final Cluster metadata, final GroupSubscription gr
final Set statefulTasks = new HashSet<>();
- final boolean probingRebalanceNeeded = assignTasksToClients(fullMetadata, allSourceTopics, topicGroups,
+ final boolean versionProbing =
+ checkMetadataVersions(minReceivedMetadataVersion, minSupportedMetadataVersion, futureMetadataVersion);
+ assignTasksToClients(fullMetadata, allSourceTopics, topicGroups,
clientMetadataMap, partitionsForTask, racksForProcessConsumer, statefulTasks);
// ---------------- Step Three ---------------- //
@@ -465,8 +467,7 @@ public GroupAssignment assign(final Cluster metadata, final GroupSubscription gr
allOwnedPartitions,
minReceivedMetadataVersion,
minSupportedMetadataVersion,
- versionProbing,
- probingRebalanceNeeded
+ versionProbing
);
return new GroupAssignment(assignment);
@@ -570,6 +571,9 @@ private static void processStreamsPartitionAssignment(final Map allSourceTopics,
/**
* Assigns a set of tasks to each client (Streams instance) using the configured task assignor, and also
* populate the stateful tasks that have been assigned to the clients
- * @return true if a probing rebalance should be triggered
*/
- private boolean assignTasksToClients(final Cluster fullMetadata,
- final Set allSourceTopics,
- final Map topicGroups,
- final Map clientMetadataMap,
- final Map> partitionsForTask,
- final Map>> racksForProcessConsumer,
- final Set statefulTasks) {
+ private void assignTasksToClients(final Cluster fullMetadata,
+ final Set allSourceTopics,
+ final Map topicGroups,
+ final Map clientMetadataMap,
+ final Map> partitionsForTask,
+ final Map>> racksForProcessConsumer,
+ final Set statefulTasks) {
if (!statefulTasks.isEmpty()) {
throw new TaskAssignmentException("The stateful tasks should not be populated before assigning tasks to clients");
}
@@ -760,23 +763,45 @@ private boolean assignTasksToClients(final Cluster fullMetadata,
log.debug("Assigning tasks and {} standby replicas to client nodes {}",
numStandbyReplicas(), clientStates);
- final TaskAssignor taskAssignor = createTaskAssignor(lagComputationSuccessful);
-
- final RackAwareTaskAssignor rackAwareTaskAssignor = new RackAwareTaskAssignor(
- fullMetadata,
- partitionsForTask,
- changelogTopics.changelogPartionsForTask(),
- tasksForTopicGroup,
- racksForProcessConsumer,
- internalTopicManager,
- assignmentConfigs,
- time
- );
- final boolean probingRebalanceNeeded = taskAssignor.assign(clientStates,
- allTasks,
- statefulTasks,
- rackAwareTaskAssignor,
- assignmentConfigs);
+ final Optional userTaskAssignor =
+ userTaskAssignorSupplier.get();
+ if (userTaskAssignor.isPresent()) {
+ final ApplicationState applicationState = buildApplicationState(
+ taskManager.topologyMetadata(),
+ clientMetadataMap,
+ topicGroups,
+ fullMetadata
+ );
+ final TaskAssignment taskAssignment = userTaskAssignor.get().assign(applicationState);
+ processStreamsPartitionAssignment(clientMetadataMap, taskAssignment);
+ } else {
+ final TaskAssignor taskAssignor = createTaskAssignor(lagComputationSuccessful);
+ final RackAwareTaskAssignor rackAwareTaskAssignor = new RackAwareTaskAssignor(
+ fullMetadata,
+ partitionsForTask,
+ changelogTopics.changelogPartionsForTask(),
+ tasksForTopicGroup,
+ racksForProcessConsumer,
+ internalTopicManager,
+ assignmentConfigs,
+ time
+ );
+ final boolean probingRebalanceNeeded = taskAssignor.assign(clientStates,
+ allTasks,
+ statefulTasks,
+ rackAwareTaskAssignor,
+ assignmentConfigs);
+ if (probingRebalanceNeeded) {
+ // Arbitrarily choose the leader's client to be responsible for triggering the probing rebalance,
+ // note once we pick the first consumer within the process to trigger probing rebalance, other consumer
+ // would not set to trigger any more.
+ final ClientMetadata rebalanceClientMetadata = clientMetadataMap.get(taskManager.processId());
+ if (rebalanceClientMetadata != null) {
+ final Instant rebalanceDeadline = Instant.ofEpochMilli(time.milliseconds() + probingRebalanceIntervalMs());
+ rebalanceClientMetadata.state.setFollowupRebalanceDeadline(rebalanceDeadline);
+ }
+ }
+ }
// Break this up into multiple logs to make sure the summary info gets through, which helps avoid
// info loss for example due to long line truncation with large apps
@@ -789,8 +814,6 @@ private boolean assignTasksToClients(final Cluster fullMetadata,
.sorted(comparingByKey())
.map(entry -> entry.getKey() + "=" + entry.getValue().currentAssignment())
.collect(Collectors.joining(Utils.NL)));
-
- return probingRebalanceNeeded;
}
private TaskAssignor createTaskAssignor(final boolean lagComputationSuccessful) {
@@ -948,9 +971,8 @@ private Map computeNewAssignment(final Set statefulT
final Set allOwnedPartitions,
final int minUserMetadataVersion,
final int minSupportedMetadataVersion,
- final boolean versionProbing,
- final boolean shouldTriggerProbingRebalance) {
- boolean rebalanceRequired = shouldTriggerProbingRebalance || versionProbing;
+ final boolean versionProbing) {
+ boolean rebalanceRequired = versionProbing;
final Map assignment = new HashMap<>();
// within the client, distribute tasks to its owned consumers
@@ -992,10 +1014,7 @@ private Map computeNewAssignment(final Set statefulT
activeTaskAssignment.get(threadEntry.getKey()).addAll(threadEntry.getValue());
}
- // Arbitrarily choose the leader's client to be responsible for triggering the probing rebalance,
- // note once we pick the first consumer within the process to trigger probing rebalance, other consumer
- // would not set to trigger any more.
- final boolean encodeNextProbingRebalanceTime = shouldTriggerProbingRebalance && clientId.equals(taskManager.processId());
+ final boolean isNextProbingRebalanceEncoded = clientMetadata.state.followupRebalanceDeadline().isPresent();
final boolean tasksRevoked = addClientAssignments(
statefulTasks,
@@ -1008,11 +1027,10 @@ private Map computeNewAssignment(final Set statefulT
activeTaskAssignment,
standbyTaskAssignment,
minUserMetadataVersion,
- minSupportedMetadataVersion,
- encodeNextProbingRebalanceTime
+ minSupportedMetadataVersion
);
- if (tasksRevoked || encodeNextProbingRebalanceTime) {
+ if (tasksRevoked || isNextProbingRebalanceEncoded) {
rebalanceRequired = true;
log.debug("Requested client {} to schedule a followup rebalance", clientId);
}
@@ -1056,12 +1074,12 @@ private boolean addClientAssignments(final Set statefulTasks,
final Map> activeTaskAssignments,
final Map> standbyTaskAssignments,
final int minUserMetadataVersion,
- final int minSupportedMetadataVersion,
- final boolean probingRebalanceNeeded) {
+ final int minSupportedMetadataVersion) {
boolean followupRebalanceRequiredForRevokedTasks = false;
// We only want to encode a scheduled probing rebalance for a single member in this client
- boolean shouldEncodeProbingRebalance = probingRebalanceNeeded;
+ final Optional followupRebalanceDeadline = clientMetadata.state.followupRebalanceDeadline();
+ boolean shouldEncodeProbingRebalance = followupRebalanceDeadline.isPresent();
// Loop through the consumers and build their assignment
for (final String consumer : clientMetadata.consumers) {
@@ -1108,7 +1126,7 @@ private boolean addClientAssignments(final Set statefulTasks,
// Don't bother to schedule a probing rebalance if an immediate one is already scheduled
shouldEncodeProbingRebalance = false;
} else if (shouldEncodeProbingRebalance) {
- final long nextRebalanceTimeMs = time.milliseconds() + probingRebalanceIntervalMs();
+ final long nextRebalanceTimeMs = followupRebalanceDeadline.get().toEpochMilli();
log.info("Requesting followup rebalance be scheduled by {} for {} to probe for caught-up replica tasks.",
consumer, Utils.toLogDateTimeFormat(nextRebalanceTimeMs));
info.setNextRebalanceTime(nextRebalanceTimeMs);
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignorConfiguration.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignorConfiguration.java
index 052ad117af467..176c475ffd9ee 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignorConfiguration.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignorConfiguration.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.streams.processor.internals.assignment;
+import java.util.Optional;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.RebalanceProtocol;
import org.apache.kafka.common.KafkaException;
@@ -253,6 +254,24 @@ public TaskAssignor taskAssignor() {
}
}
+ public Optional userTaskAssignor() {
+ final String userTaskAssignorClassname = streamsConfig.getString(StreamsConfig.TASK_ASSIGNOR_CLASS_CONFIG);
+ if (userTaskAssignorClassname == null) {
+ return Optional.empty();
+ }
+ try {
+ final org.apache.kafka.streams.processor.assignment.TaskAssignor assignor = Utils.newInstance(userTaskAssignorClassname,
+ org.apache.kafka.streams.processor.assignment.TaskAssignor.class);
+ log.info("Instantiated {} as the task assignor.", userTaskAssignorClassname);
+ return Optional.of(assignor);
+ } catch (final ClassNotFoundException e) {
+ throw new IllegalArgumentException(
+ "Expected an instantiable class name for " + StreamsConfig.TASK_ASSIGNOR_CLASS_CONFIG + " but got " + userTaskAssignorClassname,
+ e
+ );
+ }
+ }
+
public AssignmentListener assignmentListener() {
final Object o = internalConfigs.get(InternalConfig.ASSIGNMENT_LISTENER);
if (o == null) {
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientState.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientState.java
index 1d9d8c47a4e08..d24e9c19167c5 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientState.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientState.java
@@ -16,6 +16,8 @@
*/
package org.apache.kafka.streams.processor.internals.assignment;
+import java.time.Instant;
+import java.util.Optional;
import java.util.SortedMap;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.streams.processor.TaskId;
@@ -62,6 +64,8 @@ public class ClientState {
private final ClientStateTask previousStandbyTasks = new ClientStateTask(null, null);
private final ClientStateTask revokingActiveTasks = new ClientStateTask(null, new TreeMap<>());
private final UUID processId;
+
+ private Optional followupRebalanceDeadline = Optional.empty();
private int capacity;
public ClientState() {
@@ -143,6 +147,14 @@ boolean reachedCapacity() {
return assignedTaskCount() >= capacity;
}
+ public Optional followupRebalanceDeadline() {
+ return followupRebalanceDeadline;
+ }
+
+ public void setFollowupRebalanceDeadline(final Instant followupRebalanceDeadline) {
+ this.followupRebalanceDeadline = Optional.of(followupRebalanceDeadline);
+ }
+
public Set activeTasks() {
return unmodifiableSet(assignedActiveTasks.taskIds());
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
index a264de083b47f..8c89132ae2f9c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
@@ -67,6 +67,7 @@
import static org.apache.kafka.streams.StreamsConfig.RACK_AWARE_ASSIGNMENT_NON_OVERLAP_COST_CONFIG;
import static org.apache.kafka.streams.StreamsConfig.RACK_AWARE_ASSIGNMENT_TRAFFIC_COST_CONFIG;
import static org.apache.kafka.streams.StreamsConfig.STATE_DIR_CONFIG;
+import static org.apache.kafka.streams.StreamsConfig.TASK_ASSIGNOR_CLASS_CONFIG;
import static org.apache.kafka.streams.StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG;
import static org.apache.kafka.streams.StreamsConfig.adminClientPrefix;
import static org.apache.kafka.streams.StreamsConfig.consumerPrefix;
@@ -1457,6 +1458,12 @@ public void shouldReturnRackAwareAssignmentNonOverlapCost() {
assertEquals(Integer.valueOf(10), new StreamsConfig(props).getInt(RACK_AWARE_ASSIGNMENT_NON_OVERLAP_COST_CONFIG));
}
+ @Test
+ public void shouldReturnTaskAssignorClass() {
+ props.put(StreamsConfig.TASK_ASSIGNOR_CLASS_CONFIG, "StickyTaskAssignor");
+ assertEquals("StickyTaskAssignor", new StreamsConfig(props).getString(TASK_ASSIGNOR_CLASS_CONFIG));
+ }
+
@Test
public void shouldReturnDefaultClientSupplier() {
final KafkaClientSupplier supplier = streamsConfig.getKafkaClientSupplier();