From 11420e67a7f907031aa5de41034a9bdb4ff13d34 Mon Sep 17 00:00:00 2001 From: Antoine Pourchet Date: Tue, 28 May 2024 15:04:25 -0600 Subject: [PATCH 1/3] KAFKA-15045: (KIP-924 pt. 13) AssignmentError calculation added This PR adds the post-processing of the TaskAssignment to figure out if the new assignment is valid, and return an AssignmentError otherwise. --- .../processor/assignment/TaskAssignor.java | 2 + .../internals/StreamsPartitionAssignor.java | 89 +++++++++++++++++++ 2 files changed, 91 insertions(+) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/assignment/TaskAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/assignment/TaskAssignor.java index acfbe4beb84e9..ef664e708ad7c 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/assignment/TaskAssignor.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/assignment/TaskAssignor.java @@ -36,6 +36,7 @@ public interface TaskAssignor extends Configurable { * ACTIVE_TASK_ASSIGNED_MULTIPLE_TIMES: multiple KafkaStreams clients assigned with the same active task * ACTIVE_AND_STANDBY_TASK_ASSIGNED_TO_SAME_KAFKASTREAMS: active task and standby task assigned to the same KafkaStreams client * INVALID_STANDBY_TASK: stateless task assigned as a standby task + * MISSING_PROCESS_ID: ProcessId present in the input ApplicationState was not present in the output TaskAssignment * UNKNOWN_PROCESS_ID: unrecognized ProcessId not matching any of the participating consumers * UNKNOWN_TASK_ID: unrecognized TaskId not matching any of the tasks to be assigned */ @@ -44,6 +45,7 @@ enum AssignmentError { ACTIVE_TASK_ASSIGNED_MULTIPLE_TIMES, ACTIVE_AND_STANDBY_TASK_ASSIGNED_TO_SAME_KAFKASTREAMS, INVALID_STANDBY_TASK, + MISSING_PROCESS_ID, UNKNOWN_PROCESS_ID, UNKNOWN_TASK_ID } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java index d7c15f9fd0d69..35351978fadca 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java @@ -38,6 +38,9 @@ import org.apache.kafka.streams.errors.TaskAssignmentException; import org.apache.kafka.streams.processor.TaskId; import org.apache.kafka.streams.processor.assignment.ApplicationState; +import org.apache.kafka.streams.processor.assignment.KafkaStreamsAssignment; +import org.apache.kafka.streams.processor.assignment.KafkaStreamsState; +import org.apache.kafka.streams.processor.assignment.TaskAssignor.AssignmentError; import org.apache.kafka.streams.processor.assignment.TaskInfo; import org.apache.kafka.streams.processor.assignment.ProcessId; import org.apache.kafka.streams.processor.assignment.TaskAssignor.TaskAssignment; @@ -1522,6 +1525,92 @@ private void maybeScheduleFollowupRebalance(final long encodedNextScheduledRebal } } + private AssignmentError validateTaskAssignment(final ApplicationState applicationState, + final TaskAssignment taskAssignment) { + final Collection assignments = taskAssignment.assignment(); + + // Check for AssignmentError.ACTIVE_TASK_ASSIGNED_MULTIPLE_TIMES + final Set activeTasks = new HashSet<>(); + for (final KafkaStreamsAssignment assignment : assignments) { + for (final KafkaStreamsAssignment.AssignedTask task : assignment.assignment()) { + if (task.type() != KafkaStreamsAssignment.AssignedTask.Type.ACTIVE) { + continue; + } + + if (activeTasks.contains(task.id())) { + return AssignmentError.ACTIVE_TASK_ASSIGNED_MULTIPLE_TIMES; + } + activeTasks.add(task.id()); + } + } + + // Check for ACTIVE_AND_STANDBY_TASK_ASSIGNED_TO_SAME_KAFKASTREAMS + for (final KafkaStreamsAssignment assignment : assignments) { + final Set activeTasksForAssignment = new HashSet<>(); + for (final KafkaStreamsAssignment.AssignedTask task : assignment.assignment()) { + if (task.type() == KafkaStreamsAssignment.AssignedTask.Type.ACTIVE) { + activeTasksForAssignment.add(task.id()); + } + } + + for (final KafkaStreamsAssignment.AssignedTask task : assignment.assignment()) { + if (task.type() == KafkaStreamsAssignment.AssignedTask.Type.STANDBY) { + if (activeTasksForAssignment.contains(task.id())) { + return AssignmentError.ACTIVE_AND_STANDBY_TASK_ASSIGNED_TO_SAME_KAFKASTREAMS; + } + } + } + } + + // Check for INVALID_STANDBY_TASK + final Set standbyTasksInOutput = new HashSet<>(); + for (final KafkaStreamsAssignment assignment : assignments) { + for (final KafkaStreamsAssignment.AssignedTask task : assignment.assignment()) { + if (task.type() == KafkaStreamsAssignment.AssignedTask.Type.STANDBY) { + standbyTasksInOutput.add(task.id()); + } + } + } + for (final TaskInfo task : applicationState.allTasks()) { + if (!task.isStateful() && standbyTasksInOutput.contains(task.id())) { + return AssignmentError.INVALID_STANDBY_TASK; + } + } + + // Check for MISSING_PROCESS_ID + final Map clientStates = applicationState.kafkaStreamsStates(false); + final Set clientsInOutput = assignments.stream().map(KafkaStreamsAssignment::processId) + .collect(Collectors.toSet()); + for (final Map.Entry entry : clientStates.entrySet()) { + final ProcessId processIdInInput = entry.getKey(); + if (!clientsInOutput.contains(processIdInInput)) { + return AssignmentError.MISSING_PROCESS_ID; + } + } + + // Check for UNKNOWN_PROCESS_ID + final Set clientsInInput = clientStates.entrySet().stream().map(Map.Entry::getKey) + .collect(Collectors.toSet()); + for (final ProcessId processIdInOutput : clientsInOutput) { + if (!clientsInInput.contains(processIdInOutput)) { + return AssignmentError.UNKNOWN_PROCESS_ID; + } + } + + // Check for UNKNOWN_TASK_ID + final Set taskIdsInInput = applicationState.allTasks().stream().map(TaskInfo::id) + .collect(Collectors.toSet()); + for (final KafkaStreamsAssignment assignment : assignments) { + for (final KafkaStreamsAssignment.AssignedTask task : assignment.assignment()) { + if (!taskIdsInInput.contains(task.id())) { + return AssignmentError.UNKNOWN_TASK_ID; + } + } + } + + return AssignmentError.NONE; + } + /** * Verify that this client's host info was included in the map returned in the assignment, and trigger a * rebalance if not. This may be necessary when using static membership, as a rejoining client will be handed From 853c9d3ec63b1aac0c5a21d53e571cb3f5d66361 Mon Sep 17 00:00:00 2001 From: Antoine Pourchet Date: Tue, 28 May 2024 18:41:53 -0600 Subject: [PATCH 2/3] addressed some comments --- .../internals/StreamsPartitionAssignor.java | 28 ++++++++----------- 1 file changed, 12 insertions(+), 16 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java index 35351978fadca..6be80e08c62a1 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java @@ -1529,22 +1529,19 @@ private AssignmentError validateTaskAssignment(final ApplicationState applicatio final TaskAssignment taskAssignment) { final Collection assignments = taskAssignment.assignment(); - // Check for AssignmentError.ACTIVE_TASK_ASSIGNED_MULTIPLE_TIMES final Set activeTasks = new HashSet<>(); for (final KafkaStreamsAssignment assignment : assignments) { for (final KafkaStreamsAssignment.AssignedTask task : assignment.assignment()) { - if (task.type() != KafkaStreamsAssignment.AssignedTask.Type.ACTIVE) { - continue; - } - - if (activeTasks.contains(task.id())) { - return AssignmentError.ACTIVE_TASK_ASSIGNED_MULTIPLE_TIMES; + if (task.type() == KafkaStreamsAssignment.AssignedTask.Type.ACTIVE) { + if (activeTasks.contains(task.id())) { + log.error("Assignment is invalid: an active task was assigned multiple times: {}", task.id()); + return AssignmentError.ACTIVE_TASK_ASSIGNED_MULTIPLE_TIMES; + } + activeTasks.add(task.id()); } - activeTasks.add(task.id()); } } - // Check for ACTIVE_AND_STANDBY_TASK_ASSIGNED_TO_SAME_KAFKASTREAMS for (final KafkaStreamsAssignment assignment : assignments) { final Set activeTasksForAssignment = new HashSet<>(); for (final KafkaStreamsAssignment.AssignedTask task : assignment.assignment()) { @@ -1556,13 +1553,13 @@ private AssignmentError validateTaskAssignment(final ApplicationState applicatio for (final KafkaStreamsAssignment.AssignedTask task : assignment.assignment()) { if (task.type() == KafkaStreamsAssignment.AssignedTask.Type.STANDBY) { if (activeTasksForAssignment.contains(task.id())) { + log.error("Assignment is invalid: both an active and standby assignment of a task were assigned to the same client: {}", task.id()); return AssignmentError.ACTIVE_AND_STANDBY_TASK_ASSIGNED_TO_SAME_KAFKASTREAMS; } } } } - // Check for INVALID_STANDBY_TASK final Set standbyTasksInOutput = new HashSet<>(); for (final KafkaStreamsAssignment assignment : assignments) { for (final KafkaStreamsAssignment.AssignedTask task : assignment.assignment()) { @@ -1573,36 +1570,35 @@ private AssignmentError validateTaskAssignment(final ApplicationState applicatio } for (final TaskInfo task : applicationState.allTasks()) { if (!task.isStateful() && standbyTasksInOutput.contains(task.id())) { + log.error("Assignment is invalid: a standby task was found for a stateless task: {}", task.id()); return AssignmentError.INVALID_STANDBY_TASK; } } - // Check for MISSING_PROCESS_ID final Map clientStates = applicationState.kafkaStreamsStates(false); final Set clientsInOutput = assignments.stream().map(KafkaStreamsAssignment::processId) .collect(Collectors.toSet()); for (final Map.Entry entry : clientStates.entrySet()) { final ProcessId processIdInInput = entry.getKey(); if (!clientsInOutput.contains(processIdInInput)) { + log.error("Assignment is invalid: one of the clients has no assignment: {}", processIdInInput.id()); return AssignmentError.MISSING_PROCESS_ID; } } - // Check for UNKNOWN_PROCESS_ID - final Set clientsInInput = clientStates.entrySet().stream().map(Map.Entry::getKey) - .collect(Collectors.toSet()); for (final ProcessId processIdInOutput : clientsInOutput) { - if (!clientsInInput.contains(processIdInOutput)) { + if (!clientStates.containsKey(processIdInOutput)) { + log.error("Assignment is invalid: one of the clients in the assignment is unknown: {}", processIdInOutput.id()); return AssignmentError.UNKNOWN_PROCESS_ID; } } - // Check for UNKNOWN_TASK_ID final Set taskIdsInInput = applicationState.allTasks().stream().map(TaskInfo::id) .collect(Collectors.toSet()); for (final KafkaStreamsAssignment assignment : assignments) { for (final KafkaStreamsAssignment.AssignedTask task : assignment.assignment()) { if (!taskIdsInInput.contains(task.id())) { + log.error("Assignment is invalid: one of the tasks in the assignment is unknown: {}", task.id()); return AssignmentError.UNKNOWN_TASK_ID; } } From 240cedd851dcf6010b6573aada91c387538e69df Mon Sep 17 00:00:00 2001 From: Antoine Pourchet Date: Tue, 28 May 2024 18:51:35 -0600 Subject: [PATCH 3/3] fewer loops --- .../internals/StreamsPartitionAssignor.java | 44 ++++++------------- 1 file changed, 14 insertions(+), 30 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java index 6be80e08c62a1..1b1f7b678db7f 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java @@ -1528,46 +1528,30 @@ private void maybeScheduleFollowupRebalance(final long encodedNextScheduledRebal private AssignmentError validateTaskAssignment(final ApplicationState applicationState, final TaskAssignment taskAssignment) { final Collection assignments = taskAssignment.assignment(); - - final Set activeTasks = new HashSet<>(); - for (final KafkaStreamsAssignment assignment : assignments) { - for (final KafkaStreamsAssignment.AssignedTask task : assignment.assignment()) { - if (task.type() == KafkaStreamsAssignment.AssignedTask.Type.ACTIVE) { - if (activeTasks.contains(task.id())) { - log.error("Assignment is invalid: an active task was assigned multiple times: {}", task.id()); - return AssignmentError.ACTIVE_TASK_ASSIGNED_MULTIPLE_TIMES; - } - activeTasks.add(task.id()); - } - } - } - + final Set activeTasksInOutput = new HashSet<>(); + final Set standbyTasksInOutput = new HashSet<>(); for (final KafkaStreamsAssignment assignment : assignments) { - final Set activeTasksForAssignment = new HashSet<>(); + final Set tasksForAssignment = new HashSet<>(); for (final KafkaStreamsAssignment.AssignedTask task : assignment.assignment()) { - if (task.type() == KafkaStreamsAssignment.AssignedTask.Type.ACTIVE) { - activeTasksForAssignment.add(task.id()); + if (activeTasksInOutput.contains(task.id()) && task.type() == KafkaStreamsAssignment.AssignedTask.Type.ACTIVE) { + log.error("Assignment is invalid: an active task was assigned multiple times: {}", task.id()); + return AssignmentError.ACTIVE_TASK_ASSIGNED_MULTIPLE_TIMES; } - } - for (final KafkaStreamsAssignment.AssignedTask task : assignment.assignment()) { - if (task.type() == KafkaStreamsAssignment.AssignedTask.Type.STANDBY) { - if (activeTasksForAssignment.contains(task.id())) { - log.error("Assignment is invalid: both an active and standby assignment of a task were assigned to the same client: {}", task.id()); - return AssignmentError.ACTIVE_AND_STANDBY_TASK_ASSIGNED_TO_SAME_KAFKASTREAMS; - } + if (tasksForAssignment.contains(task.id())) { + log.error("Assignment is invalid: both an active and standby assignment of a task were assigned to the same client: {}", task.id()); + return AssignmentError.ACTIVE_AND_STANDBY_TASK_ASSIGNED_TO_SAME_KAFKASTREAMS; } - } - } - final Set standbyTasksInOutput = new HashSet<>(); - for (final KafkaStreamsAssignment assignment : assignments) { - for (final KafkaStreamsAssignment.AssignedTask task : assignment.assignment()) { - if (task.type() == KafkaStreamsAssignment.AssignedTask.Type.STANDBY) { + tasksForAssignment.add(task.id()); + if (task.type() == KafkaStreamsAssignment.AssignedTask.Type.ACTIVE) { + activeTasksInOutput.add(task.id()); + } else { standbyTasksInOutput.add(task.id()); } } } + for (final TaskInfo task : applicationState.allTasks()) { if (!task.isStateful() && standbyTasksInOutput.contains(task.id())) { log.error("Assignment is invalid: a standby task was found for a stateless task: {}", task.id());