KAFKA-18613: Improve test coverage for missing topics #19189

Open · wants to merge 8 commits into base: trunk
@@ -2246,25 +2246,27 @@ private StreamsTopology maybeUpdateTopology(final String groupId,
                                                 final Topology topology,
                                                 final StreamsGroup group,
                                                 final List<CoordinatorRecord> records) {
-        StreamsTopology updatedTopology;
         if (topology != null) {
-            StreamsGroupTopologyValue recordValue = convertToStreamsGroupTopologyRecord(topology);
-
-            updatedTopology = StreamsTopology.fromHeartbeatRequest(topology);
-
+            StreamsTopology streamsTopologyFromRequest = StreamsTopology.fromHeartbeatRequest(topology);
             if (group.topology().isEmpty()) {
                 log.info("[GroupId {}][MemberId {}] Member initialized the topology with epoch {}", groupId, memberId, topology.epoch());
-
+                StreamsGroupTopologyValue recordValue = convertToStreamsGroupTopologyRecord(topology);
                 records.add(newStreamsGroupTopologyRecord(groupId, recordValue));
-            } else if (!updatedTopology.equals(group.topology().get())) {
+                return streamsTopologyFromRequest;
+            } else if (group.topology().get().topologyEpoch() > topology.epoch()) {
+                log.info("[GroupId {}][MemberId {}] Member joined with stale topology epoch {}", groupId, memberId, topology.epoch());
+                return group.topology().get();
+            } else if (!group.topology().get().equals(streamsTopologyFromRequest)) {
                 throw new InvalidRequestException("Topology updates are not supported yet.");
+            } else {
+                log.debug("[GroupId {}][MemberId {}] Member joined with currently initialized topology {}", groupId, memberId, topology.epoch());
+                return group.topology().get();
             }
         } else if (group.topology().isPresent()) {
-            updatedTopology = group.topology().get();
+            return group.topology().get();
         } else {
             throw new IllegalStateException("The topology is null and the group topology is also null.");
         }
-        return updatedTopology;
     }
 
     private List<StreamsGroupHeartbeatResponseData.TaskIds> createStreamsGroupHeartbeatResponseTaskIds(final Map<String, Set<Integer>> taskIds) {
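The patched method now resolves a heartbeat to one of five outcomes: initialize the topology on first contact, keep the group topology when the member's epoch is stale, reject a changed topology at the same or newer epoch, return the existing topology when it matches, or fall back to the group topology when the heartbeat carries none. Since this PR is about test coverage, here is a minimal, self-contained model of that branch ordering that one could write assertions against; Topo and decide are illustrative stand-ins, not Kafka's actual StreamsTopology API.

import java.util.Optional;

// Simplified stand-in types; the real method works on StreamsTopology,
// StreamsGroup, and coordinator records.
final class TopologyDecisionSketch {

    record Topo(int epoch, String structure) { }

    // Mirrors the branch order of the patched maybeUpdateTopology.
    static Topo decide(Topo requested, Optional<Topo> groupTopology) {
        if (requested != null) {
            if (groupTopology.isEmpty()) {
                // First member initializes the topology (the real coordinator
                // also writes a topology record here).
                return requested;
            } else if (groupTopology.get().epoch() > requested.epoch()) {
                // Stale epoch: keep the group's topology.
                return groupTopology.get();
            } else if (!groupTopology.get().equals(requested)) {
                // Same or newer epoch but a different topology: unsupported.
                throw new IllegalArgumentException("Topology updates are not supported yet.");
            } else {
                // Identical topology: nothing to do.
                return groupTopology.get();
            }
        } else if (groupTopology.isPresent()) {
            // Heartbeat carried no topology: reuse the group's.
            return groupTopology.get();
        } else {
            throw new IllegalStateException("The topology is null and the group topology is also null.");
        }
    }

    public static void main(String[] args) {
        Topo v1 = new Topo(1, "source -> sink");
        Topo v2 = new Topo(2, "source -> processor -> sink");

        System.out.println(decide(v1, Optional.empty()));  // v1: first member initializes
        System.out.println(decide(v1, Optional.of(v2)));   // v2: epoch 1 is stale, group wins
        try {
            decide(v2, Optional.of(v1));                   // changed topology at a newer epoch
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}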
@@ -156,7 +156,6 @@ private static Map<String, Integer> decidePartitionCounts(final LogContext logCo
         enforceCopartitioning(
             topology,
             copartitionGroupsBySubtopology,
-            log,
             decidedPartitionCountsForInternalTopics,
             copartitionedTopicsEnforcer
         );
@@ -168,7 +167,6 @@ private static Map<String, Integer> decidePartitionCounts(final LogContext logCo
 
     private static void enforceCopartitioning(final StreamsTopology topology,
                                               final Map<String, Collection<Set<String>>> copartitionGroupsBySubtopology,
-                                              final Logger log,
                                               final Map<String, Integer> decidedPartitionCountsForInternalTopics,
                                               final CopartitionedTopicsEnforcer copartitionedTopicsEnforcer) {
         final Set<String> fixedRepartitionTopics =
@@ -180,17 +178,13 @@ private static void enforceCopartitioning(final StreamsTopology topology,
                 x.repartitionSourceTopics().stream().filter(y -> y.partitions() == 0)
             ).map(StreamsGroupTopologyValue.TopicInfo::name).collect(Collectors.toSet());
 
-        if (fixedRepartitionTopics.isEmpty() && flexibleRepartitionTopics.isEmpty()) {
-            log.info("Skipping the repartition topic validation since there are no repartition topics.");
Review comment from the PR author: Skipping here is actually not correct - we need to enforce copartitioning also for source topics. (See the sketch after this hunk.)

-        } else {
-            // ensure the co-partitioning topics within the group have the same number of partitions,
-            // and enforce the number of partitions for those repartition topics to be the same if they
-            // are co-partitioned as well.
-            for (Collection<Set<String>> copartitionGroups : copartitionGroupsBySubtopology.values()) {
-                for (Set<String> copartitionGroup : copartitionGroups) {
-                    decidedPartitionCountsForInternalTopics.putAll(
-                        copartitionedTopicsEnforcer.enforce(copartitionGroup, fixedRepartitionTopics, flexibleRepartitionTopics));
-                }
+        // ensure the co-partitioning topics within the group have the same number of partitions,
+        // and enforce the number of partitions for those repartition topics to be the same if they
+        // are co-partitioned as well.
+        for (Collection<Set<String>> copartitionGroups : copartitionGroupsBySubtopology.values()) {
+            for (Set<String> copartitionGroup : copartitionGroups) {
+                decidedPartitionCountsForInternalTopics.putAll(
+                    copartitionedTopicsEnforcer.enforce(copartitionGroup, fixedRepartitionTopics, flexibleRepartitionTopics));
             }
         }
     }
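The review comment above names the behavioral bug this hunk fixes: copartitioning must hold for plain source topics too, so returning early when there are no repartition topics silently skips that validation. Below is a minimal sketch of the invariant that was being bypassed, assuming a simple topic-to-partition-count map; checkCopartitioned is an illustrative stand-in for the check CopartitionedTopicsEnforcer.enforce performs, not its real signature.

import java.util.Map;
import java.util.Set;

// Stand-in for the copartitioning invariant: every topic in a copartition
// group must have the same partition count, even when no repartition topics
// (fixed or flexible) are involved at all.
final class CopartitionSketch {

    static void checkCopartitioned(Set<String> copartitionGroup, Map<String, Integer> partitionCounts) {
        long distinct = copartitionGroup.stream()
            .map(partitionCounts::get)
            .distinct()
            .count();
        if (distinct > 1) {
            throw new IllegalStateException(
                "Copartitioned topics have different partition counts: " + copartitionGroup);
        }
    }

    public static void main(String[] args) {
        // Two source topics joined together; no repartition topics exist, so the
        // removed guard would have skipped this check entirely.
        Map<String, Integer> counts = Map.of("orders", 4, "customers", 6);
        checkCopartitioned(Set.of("orders", "customers"), counts); // throws: 4 != 6
    }
}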