diff --git a/src/main/java/kafdrop/controller/ConsumerController.java b/src/main/java/kafdrop/controller/ConsumerController.java index b1c76067..3b11ddaf 100644 --- a/src/main/java/kafdrop/controller/ConsumerController.java +++ b/src/main/java/kafdrop/controller/ConsumerController.java @@ -37,11 +37,8 @@ public ConsumerController(KafkaMonitor kafkaMonitor) { @RequestMapping("/{groupId:.+}") public String consumerDetail(@PathVariable("groupId") String groupId, Model model) throws ConsumerNotFoundException { - final var topicVos = kafkaMonitor.getTopicsWithOffsets(); - final var consumer = kafkaMonitor.getConsumers(topicVos) - .stream() - .filter(c -> c.getGroupId().equals(groupId)) - .findAny(); + final var consumer = kafkaMonitor.getConsumersByGroup(groupId).stream().findAny(); + model.addAttribute("consumer", consumer.orElseThrow(() -> new ConsumerNotFoundException(groupId))); return "consumer-detail"; } @@ -53,11 +50,8 @@ public String consumerDetail(@PathVariable("groupId") String groupId, Model mode }) @GetMapping(path = "/{groupId:.+}", produces = MediaType.APPLICATION_JSON_VALUE) public @ResponseBody ConsumerVO getConsumer(@PathVariable("groupId") String groupId) throws ConsumerNotFoundException { - final var topicVos = kafkaMonitor.getTopics(); - final var consumer = kafkaMonitor.getConsumers(topicVos) - .stream() - .filter(c -> c.getGroupId().equals(groupId)) - .findAny(); + final var consumer = kafkaMonitor.getConsumersByGroup(groupId).stream().findAny(); + return consumer.orElseThrow(() -> new ConsumerNotFoundException(groupId)); } } \ No newline at end of file diff --git a/src/main/java/kafdrop/controller/TopicController.java b/src/main/java/kafdrop/controller/TopicController.java index ba9ed7c4..09a946c0 100644 --- a/src/main/java/kafdrop/controller/TopicController.java +++ b/src/main/java/kafdrop/controller/TopicController.java @@ -60,7 +60,7 @@ public String topicDetails(@PathVariable("name") String topicName, Model model) final var topic = kafkaMonitor.getTopic(topicName) .orElseThrow(() -> new TopicNotFoundException(topicName)); model.addAttribute("topic", topic); - model.addAttribute("consumers", kafkaMonitor.getConsumers(Collections.singleton(topic))); + model.addAttribute("consumers", kafkaMonitor.getConsumersByTopics(Collections.singleton(topic))); model.addAttribute("topicDeleteEnabled", topicDeleteEnabled); model.addAttribute("keyFormat", defaultKeyFormat); model.addAttribute("format", defaultFormat); @@ -125,7 +125,7 @@ public String createTopicPage(Model model) { public @ResponseBody List getConsumers(@PathVariable("name") String topicName) { final var topic = kafkaMonitor.getTopic(topicName) .orElseThrow(() -> new TopicNotFoundException(topicName)); - return kafkaMonitor.getConsumers(Collections.singleton(topic)); + return kafkaMonitor.getConsumersByTopics(Collections.singleton(topic)); } /** diff --git a/src/main/java/kafdrop/service/KafkaMonitor.java b/src/main/java/kafdrop/service/KafkaMonitor.java index 860f10c5..137ac4f3 100644 --- a/src/main/java/kafdrop/service/KafkaMonitor.java +++ b/src/main/java/kafdrop/service/KafkaMonitor.java @@ -31,8 +31,6 @@ public interface KafkaMonitor { List getTopics(); - List getTopicsWithOffsets(); - /** * Returns messages for a given topic. */ @@ -46,7 +44,9 @@ List getMessages(TopicPartition topicPartition, long offset, int coun ClusterSummaryVO getClusterSummary(Collection topics); - List getConsumers(Collection topicVos); + List getConsumersByGroup(String groupId); + + List getConsumersByTopics(Collection topicVos); /** * Create topic diff --git a/src/main/java/kafdrop/service/KafkaMonitorImpl.java b/src/main/java/kafdrop/service/KafkaMonitorImpl.java index e9c73489..ab87593a 100644 --- a/src/main/java/kafdrop/service/KafkaMonitorImpl.java +++ b/src/main/java/kafdrop/service/KafkaMonitorImpl.java @@ -106,14 +106,10 @@ public List getTopics() { return topicVos; } - @Override - public List getTopicsWithOffsets() { + public List getTopics(String[] topics) { Map> topicsMap = highLevelConsumer.getAllTopics(); - final var topicVos = getTopicMetadata(topicsMap).values().stream() - .sorted(Comparator.comparing(TopicVO::getName)) - .collect(Collectors.toList()); - + ArrayList topicVos = new ArrayList<>(getTopicMetadata(topicsMap, topics).values()); setTopicPartitionSizes(topicVos); return topicVos; @@ -121,11 +117,9 @@ public List getTopicsWithOffsets() { @Override public Optional getTopic(String topic) { - Map> topicsMap = highLevelConsumer.getAllTopics(); + String[] topics = { topic }; - final var topicVo = Optional.ofNullable(getTopicMetadata(topicsMap, topic).get(topic)); - topicVo.ifPresent(vo -> setTopicPartitionSizes(Collections.singletonList(vo))); - return topicVo; + return getTopics(topics).stream().findAny(); } private Map getTopicMetadata(Map> allTopicsMap, String... topics) { @@ -209,7 +203,24 @@ private void setTopicPartitionSizes(List topics) { } @Override - public List getConsumers(Collection topicVos) { + public List getConsumersByGroup(String groupId) { + List consumerGroupOffsets = getConsumerOffsets(groupId); + + String[] uniqueTopicNames = consumerGroupOffsets.stream() + .flatMap(consumerGroupOffset -> consumerGroupOffset.offsets.keySet() + .stream().map(TopicPartition::topic)) + .distinct() + .toArray(String[]::new); + + List topicVOs = getTopics(uniqueTopicNames); + + LOG.debug("consumerGroupOffsets: {}", consumerGroupOffsets); + LOG.debug("topicVos: {}", topicVOs); + return convert(consumerGroupOffsets, topicVOs); + } + + @Override + public List getConsumersByTopics(Collection topicVos) { final var topics = topicVos.stream().map(TopicVO::getName).collect(Collectors.toSet()); final var consumerGroupOffsets = getConsumerOffsets(topics); LOG.debug("consumerGroupOffsets: {}", consumerGroupOffsets); @@ -321,6 +332,10 @@ private ConsumerGroupOffsets resolveOffsets(String groupId) { return new ConsumerGroupOffsets(groupId, highLevelAdminClient.listConsumerGroupOffsetsIfAuthorized(groupId)); } + private List getConsumerOffsets(String groupId) { + return Collections.singletonList(resolveOffsets(groupId)); + } + private List getConsumerOffsets(Set topics) { final var consumerGroups = highLevelAdminClient.listConsumerGroups(); return consumerGroups.stream()