Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-15045: (KIP-924 pt. 13) AssignmentError calculation added #16114

Merged
merged 3 commits into from
May 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ public interface TaskAssignor extends Configurable {
* ACTIVE_TASK_ASSIGNED_MULTIPLE_TIMES: multiple KafkaStreams clients assigned with the same active task
* ACTIVE_AND_STANDBY_TASK_ASSIGNED_TO_SAME_KAFKASTREAMS: active task and standby task assigned to the same KafkaStreams client
* INVALID_STANDBY_TASK: stateless task assigned as a standby task
* MISSING_PROCESS_ID: ProcessId present in the input ApplicationState was not present in the output TaskAssignment
* UNKNOWN_PROCESS_ID: unrecognized ProcessId not matching any of the participating consumers
* UNKNOWN_TASK_ID: unrecognized TaskId not matching any of the tasks to be assigned
*/
Expand All @@ -44,6 +45,7 @@ enum AssignmentError {
ACTIVE_TASK_ASSIGNED_MULTIPLE_TIMES,
ACTIVE_AND_STANDBY_TASK_ASSIGNED_TO_SAME_KAFKASTREAMS,
INVALID_STANDBY_TASK,
MISSING_PROCESS_ID,
UNKNOWN_PROCESS_ID,
UNKNOWN_TASK_ID
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@
import org.apache.kafka.streams.errors.TaskAssignmentException;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.assignment.ApplicationState;
import org.apache.kafka.streams.processor.assignment.KafkaStreamsAssignment;
import org.apache.kafka.streams.processor.assignment.KafkaStreamsState;
import org.apache.kafka.streams.processor.assignment.TaskAssignor.AssignmentError;
import org.apache.kafka.streams.processor.assignment.TaskInfo;
import org.apache.kafka.streams.processor.assignment.ProcessId;
import org.apache.kafka.streams.processor.assignment.TaskAssignor.TaskAssignment;
Expand Down Expand Up @@ -1522,6 +1525,72 @@ private void maybeScheduleFollowupRebalance(final long encodedNextScheduledRebal
}
}

private AssignmentError validateTaskAssignment(final ApplicationState applicationState,
final TaskAssignment taskAssignment) {
final Collection<KafkaStreamsAssignment> assignments = taskAssignment.assignment();
final Set<TaskId> activeTasksInOutput = new HashSet<>();
final Set<TaskId> standbyTasksInOutput = new HashSet<>();
for (final KafkaStreamsAssignment assignment : assignments) {
final Set<TaskId> tasksForAssignment = new HashSet<>();
for (final KafkaStreamsAssignment.AssignedTask task : assignment.assignment()) {
if (activeTasksInOutput.contains(task.id()) && task.type() == KafkaStreamsAssignment.AssignedTask.Type.ACTIVE) {
log.error("Assignment is invalid: an active task was assigned multiple times: {}", task.id());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be nice to include the ProcessId of both clients. We'll need to change activeTasksInOutput into a Map<TaskId, ProcessId> so we can look up the other ProcessId, then do something like this:

Suggested change
log.error("Assignment is invalid: an active task was assigned multiple times: {}", task.id());
log.error("Assignment is invalid: active task {} was assigned to multiple KafkaStreams clients: {} and {}", task.id(), assignments.processId().id(), activeTasksInOutput.get(task.id().id()));

return AssignmentError.ACTIVE_TASK_ASSIGNED_MULTIPLE_TIMES;
}

if (tasksForAssignment.contains(task.id())) {
log.error("Assignment is invalid: both an active and standby assignment of a task were assigned to the same client: {}", task.id());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: we should include the client id as well

Suggested change
log.error("Assignment is invalid: both an active and standby assignment of a task were assigned to the same client: {}", task.id());
log.error("Assignment is invalid: both an active and standby copy of task {} were assigned to KafkaStreams client {}", task.id(), assignment.processId().id());

return AssignmentError.ACTIVE_AND_STANDBY_TASK_ASSIGNED_TO_SAME_KAFKASTREAMS;
}

tasksForAssignment.add(task.id());
if (task.type() == KafkaStreamsAssignment.AssignedTask.Type.ACTIVE) {
activeTasksInOutput.add(task.id());
} else {
standbyTasksInOutput.add(task.id());
}
}
}

for (final TaskInfo task : applicationState.allTasks()) {
if (!task.isStateful() && standbyTasksInOutput.contains(task.id())) {
log.error("Assignment is invalid: a standby task was found for a stateless task: {}", task.id());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same here, would be nice to log which client it was assigned to (which means turning standbyTasksInOutput into a Map<TaskId, ProcessId> as well). Then:

Suggested change
log.error("Assignment is invalid: a standby task was found for a stateless task: {}", task.id());
log.error("Assignment is invalid: standby task for stateless task {} was assigned to KafkaStreams client {}", task.id(), standbyTasksInOutput.get(task.id().id());

return AssignmentError.INVALID_STANDBY_TASK;
}
}

final Map<ProcessId, KafkaStreamsState> clientStates = applicationState.kafkaStreamsStates(false);
final Set<ProcessId> clientsInOutput = assignments.stream().map(KafkaStreamsAssignment::processId)
.collect(Collectors.toSet());
for (final Map.Entry<ProcessId, KafkaStreamsState> entry : clientStates.entrySet()) {
final ProcessId processIdInInput = entry.getKey();
if (!clientsInOutput.contains(processIdInInput)) {
log.error("Assignment is invalid: one of the clients has no assignment: {}", processIdInInput.id());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit (clarify "KafkaStreams client" since "client" as a term is so massively overloaded):

Suggested change
log.error("Assignment is invalid: one of the clients has no assignment: {}", processIdInInput.id());
log.error("Assignment is invalid: KafkaStreams client {} has no assignment", processIdInInput.id());

return AssignmentError.MISSING_PROCESS_ID;
}
}

for (final ProcessId processIdInOutput : clientsInOutput) {
if (!clientStates.containsKey(processIdInOutput)) {
log.error("Assignment is invalid: one of the clients in the assignment is unknown: {}", processIdInOutput.id());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
log.error("Assignment is invalid: one of the clients in the assignment is unknown: {}", processIdInOutput.id());
log.error("Assignment is invalid: the KafkaStreams client {} is unknown", processIdInOutput.id());

return AssignmentError.UNKNOWN_PROCESS_ID;
}
}

final Set<TaskId> taskIdsInInput = applicationState.allTasks().stream().map(TaskInfo::id)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, changing the ApplicationState#allTasks method to return a Map<TaskId, TaskInfo> map would let us simplify this as well. Let's make that change, I'll update the KIP.

Then we can remove this line altogether and move the if (!applicationState.allTasks().containsKey(task.id())) --> return AssignmentError.UNKNOWN_TASK_ID; inside the single loop I outlined in a comment above

.collect(Collectors.toSet());
for (final KafkaStreamsAssignment assignment : assignments) {
for (final KafkaStreamsAssignment.AssignedTask task : assignment.assignment()) {
if (!taskIdsInInput.contains(task.id())) {
log.error("Assignment is invalid: one of the tasks in the assignment is unknown: {}", task.id());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
log.error("Assignment is invalid: one of the tasks in the assignment is unknown: {}", task.id());
log.error("Assignment is invalid: task {} assigned to KafkaStreams client {} was unknown", task.id(), assignment.processId().id());

return AssignmentError.UNKNOWN_TASK_ID;
}
}
}

return AssignmentError.NONE;
}

/**
* Verify that this client's host info was included in the map returned in the assignment, and trigger a
* rebalance if not. This may be necessary when using static membership, as a rejoining client will be handed
Expand Down