Skip to content

Commit

Permalink
only fetch topic data that are part of the consumer groupId
Browse files Browse the repository at this point in the history
  • Loading branch information
Jork Zijlstra committed Feb 1, 2022
1 parent e084812 commit 2d1cf32
Show file tree
Hide file tree
Showing 4 changed files with 36 additions and 26 deletions.
16 changes: 6 additions & 10 deletions src/main/java/kafdrop/controller/ConsumerController.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
import org.springframework.ui.*;
import org.springframework.web.bind.annotation.*;

import java.util.Optional;

@Controller
@RequestMapping("/consumer")
public final class ConsumerController {
Expand All @@ -37,11 +39,8 @@ public ConsumerController(KafkaMonitor kafkaMonitor) {

@RequestMapping("/{groupId:.+}")
public String consumerDetail(@PathVariable("groupId") String groupId, Model model) throws ConsumerNotFoundException {
final var topicVos = kafkaMonitor.getTopicsWithOffsets();
final var consumer = kafkaMonitor.getConsumers(topicVos)
.stream()
.filter(c -> c.getGroupId().equals(groupId))
.findAny();
final var consumer = kafkaMonitor.getConsumersByGroup(groupId).stream().findAny();

model.addAttribute("consumer", consumer.orElseThrow(() -> new ConsumerNotFoundException(groupId)));
return "consumer-detail";
}
Expand All @@ -53,11 +52,8 @@ public String consumerDetail(@PathVariable("groupId") String groupId, Model mode
})
@GetMapping(path = "/{groupId:.+}", produces = MediaType.APPLICATION_JSON_VALUE)
public @ResponseBody ConsumerVO getConsumer(@PathVariable("groupId") String groupId) throws ConsumerNotFoundException {
final var topicVos = kafkaMonitor.getTopics();
final var consumer = kafkaMonitor.getConsumers(topicVos)
.stream()
.filter(c -> c.getGroupId().equals(groupId))
.findAny();
final var consumer = kafkaMonitor.getConsumersByGroup(groupId).stream().findAny();

return consumer.orElseThrow(() -> new ConsumerNotFoundException(groupId));
}
}
4 changes: 2 additions & 2 deletions src/main/java/kafdrop/controller/TopicController.java
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ public String topicDetails(@PathVariable("name") String topicName, Model model)
final var topic = kafkaMonitor.getTopic(topicName)
.orElseThrow(() -> new TopicNotFoundException(topicName));
model.addAttribute("topic", topic);
model.addAttribute("consumers", kafkaMonitor.getConsumers(Collections.singleton(topic)));
model.addAttribute("consumers", kafkaMonitor.getConsumersByTopics(Collections.singleton(topic)));
model.addAttribute("topicDeleteEnabled", topicDeleteEnabled);
model.addAttribute("keyFormat", defaultKeyFormat);
model.addAttribute("format", defaultFormat);
Expand Down Expand Up @@ -125,7 +125,7 @@ public String createTopicPage(Model model) {
public @ResponseBody List<ConsumerVO> getConsumers(@PathVariable("name") String topicName) {
final var topic = kafkaMonitor.getTopic(topicName)
.orElseThrow(() -> new TopicNotFoundException(topicName));
return kafkaMonitor.getConsumers(Collections.singleton(topic));
return kafkaMonitor.getConsumersByTopics(Collections.singleton(topic));
}

/**
Expand Down
6 changes: 3 additions & 3 deletions src/main/java/kafdrop/service/KafkaMonitor.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,6 @@ public interface KafkaMonitor {

List<TopicVO> getTopics();

List<TopicVO> getTopicsWithOffsets();

/**
* Returns messages for a given topic.
*/
Expand All @@ -46,7 +44,9 @@ List<MessageVO> getMessages(TopicPartition topicPartition, long offset, int coun

ClusterSummaryVO getClusterSummary(Collection<TopicVO> topics);

List<ConsumerVO> getConsumers(Collection<TopicVO> topicVos);
List<ConsumerVO> getConsumersByGroup(String groupId);

List<ConsumerVO> getConsumersByTopics(Collection<TopicVO> topicVos);

/**
* Create topic
Expand Down
36 changes: 25 additions & 11 deletions src/main/java/kafdrop/service/KafkaMonitorImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -106,26 +106,20 @@ public List<TopicVO> getTopics() {
return topicVos;
}

@Override
public List<TopicVO> getTopicsWithOffsets() {
public List<TopicVO> getTopics(String[] topics) {
Map<String, List<PartitionInfo>> topicsMap = highLevelConsumer.getAllTopics();

final var topicVos = getTopicMetadata(topicsMap).values().stream()
.sorted(Comparator.comparing(TopicVO::getName))
.collect(Collectors.toList());

ArrayList<TopicVO> topicVos = new ArrayList<>(getTopicMetadata(topicsMap, topics).values());
setTopicPartitionSizes(topicVos);

return topicVos;
}

@Override
public Optional<TopicVO> getTopic(String topic) {
Map<String, List<PartitionInfo>> topicsMap = highLevelConsumer.getAllTopics();
String[] topics = { topic };

final var topicVo = Optional.ofNullable(getTopicMetadata(topicsMap, topic).get(topic));
topicVo.ifPresent(vo -> setTopicPartitionSizes(Collections.singletonList(vo)));
return topicVo;
return getTopics(topics).stream().findAny();
}

private Map<String, TopicVO> getTopicMetadata(Map<String, List<PartitionInfo>> allTopicsMap, String... topics) {
Expand Down Expand Up @@ -209,7 +203,23 @@ private void setTopicPartitionSizes(List<TopicVO> topics) {
}

@Override
public List<ConsumerVO> getConsumers(Collection<TopicVO> topicVos) {
public List<ConsumerVO> getConsumersByGroup(String groupId) {
List<ConsumerGroupOffsets> consumerGroupOffsets = getConsumerOffsets(groupId);

String[] uniqueTopicNames = consumerGroupOffsets.stream()
.flatMap(consumerGroupOffset -> consumerGroupOffset.offsets.keySet().stream().map(TopicPartition::topic))
.distinct()
.toArray(String[]::new);

List<TopicVO> topicVOs = getTopics(uniqueTopicNames);

LOG.debug("consumerGroupOffsets: {}", consumerGroupOffsets);
LOG.debug("topicVos: {}", topicVOs);
return convert(consumerGroupOffsets, topicVOs);
}

@Override
public List<ConsumerVO> getConsumersByTopics(Collection<TopicVO> topicVos) {
final var topics = topicVos.stream().map(TopicVO::getName).collect(Collectors.toSet());
final var consumerGroupOffsets = getConsumerOffsets(topics);
LOG.debug("consumerGroupOffsets: {}", consumerGroupOffsets);
Expand Down Expand Up @@ -321,6 +331,10 @@ private ConsumerGroupOffsets resolveOffsets(String groupId) {
return new ConsumerGroupOffsets(groupId, highLevelAdminClient.listConsumerGroupOffsetsIfAuthorized(groupId));
}

private List<ConsumerGroupOffsets> getConsumerOffsets(String groupId) {
return Collections.singletonList(resolveOffsets(groupId));
}

private List<ConsumerGroupOffsets> getConsumerOffsets(Set<String> topics) {
final var consumerGroups = highLevelAdminClient.listConsumerGroups();
return consumerGroups.stream()
Expand Down

0 comments on commit 2d1cf32

Please sign in to comment.