diff --git a/src/main/java/kafdrop/controller/BrokerController.java b/src/main/java/kafdrop/controller/BrokerController.java index 4eb35e7f..d27007b8 100644 --- a/src/main/java/kafdrop/controller/BrokerController.java +++ b/src/main/java/kafdrop/controller/BrokerController.java @@ -40,7 +40,7 @@ public BrokerController(KafkaMonitor kafkaMonitor) { public String brokerDetails(@PathVariable("id") int brokerId, Model model) { model.addAttribute("broker", kafkaMonitor.getBroker(brokerId) .orElseThrow(() -> new BrokerNotFoundException("No such broker " + brokerId))); - model.addAttribute("topics", kafkaMonitor.getTopics()); + model.addAttribute("topics", kafkaMonitor.getTopics(TopicEnrichMode.TopicConfig)); return "broker-detail"; } diff --git a/src/main/java/kafdrop/controller/ClusterController.java b/src/main/java/kafdrop/controller/ClusterController.java index 134a08ac..5672142b 100644 --- a/src/main/java/kafdrop/controller/ClusterController.java +++ b/src/main/java/kafdrop/controller/ClusterController.java @@ -64,7 +64,7 @@ public String clusterInfo(Model model, model.addAttribute("buildProperties", buildProperties); final var brokers = kafkaMonitor.getBrokers(); - final var topics = kafkaMonitor.getTopics(); + final var topics = kafkaMonitor.getTopics(TopicEnrichMode.TopicConfig); final var clusterSummary = kafkaMonitor.getClusterSummary(topics); final var missingBrokerIds = clusterSummary.getExpectedBrokerIds().stream() @@ -91,7 +91,7 @@ public String clusterInfo(Model model, public @ResponseBody ClusterInfoVO getCluster() { final var vo = new ClusterInfoVO(); vo.brokers = kafkaMonitor.getBrokers(); - vo.topics = kafkaMonitor.getTopics(); + vo.topics = kafkaMonitor.getTopics(TopicEnrichMode.TopicConfig); vo.summary = kafkaMonitor.getClusterSummary(vo.topics); return vo; } diff --git a/src/main/java/kafdrop/controller/ConsumerController.java b/src/main/java/kafdrop/controller/ConsumerController.java index 96895123..fe623ddf 100644 --- a/src/main/java/kafdrop/controller/ConsumerController.java +++ b/src/main/java/kafdrop/controller/ConsumerController.java @@ -37,7 +37,7 @@ public ConsumerController(KafkaMonitor kafkaMonitor) { @RequestMapping("/{groupId:.+}") public String consumerDetail(@PathVariable("groupId") String groupId, Model model) throws ConsumerNotFoundException { - final var topicVos = kafkaMonitor.getTopics(); + final var topicVos = kafkaMonitor.getTopics(TopicEnrichMode.PartitionSize); final var consumer = kafkaMonitor.getConsumers(topicVos) .stream() .filter(c -> c.getGroupId().equals(groupId)) @@ -53,7 +53,7 @@ public String consumerDetail(@PathVariable("groupId") String groupId, Model mode }) @RequestMapping(path = "/{groupId:.+}", produces = MediaType.APPLICATION_JSON_VALUE, method = RequestMethod.GET) public @ResponseBody ConsumerVO getConsumer(@PathVariable("groupId") String groupId) throws ConsumerNotFoundException { - final var topicVos = kafkaMonitor.getTopics(); + final var topicVos = kafkaMonitor.getTopics(TopicEnrichMode.PartitionSize); final var consumer = kafkaMonitor.getConsumers(topicVos) .stream() .filter(c -> c.getGroupId().equals(groupId)) diff --git a/src/main/java/kafdrop/controller/MessageController.java b/src/main/java/kafdrop/controller/MessageController.java index 82e0617c..5b980aa2 100644 --- a/src/main/java/kafdrop/controller/MessageController.java +++ b/src/main/java/kafdrop/controller/MessageController.java @@ -28,6 +28,7 @@ import javax.validation.constraints.Min; import javax.validation.constraints.NotNull; +import kafdrop.service.TopicEnrichMode; import kafdrop.util.*; import org.springframework.http.MediaType; import org.springframework.stereotype.Controller; @@ -94,7 +95,7 @@ public String viewAllMessages(@PathVariable("name") String topicName, final int size = (count != null? count : 100); final MessageFormat defaultFormat = messageFormatProperties.getFormat(); final MessageFormat defaultKeyFormat = keyFormatProperties.getFormat(); - final TopicVO topic = kafkaMonitor.getTopic(topicName) + final TopicVO topic = kafkaMonitor.getTopic(topicName, TopicEnrichMode.PartitionSize) .orElseThrow(() -> new TopicNotFoundException(topicName)); model.addAttribute("topic", topic); @@ -151,7 +152,7 @@ public String viewMessageForm(@PathVariable("name") String topicName, model.addAttribute("messageForm", defaultForm); } - final TopicVO topic = kafkaMonitor.getTopic(topicName) + final TopicVO topic = kafkaMonitor.getTopic(topicName, TopicEnrichMode.PartitionSize) .orElseThrow(() -> new TopicNotFoundException(topicName)); model.addAttribute("topic", topic); @@ -224,7 +225,7 @@ List getPartitionOrMessages( @RequestParam(name = "msgTypeName", required = false) String msgTypeName ) { if (partition == null || offset == null || count == null) { - final TopicVO topic = kafkaMonitor.getTopic(topicName) + final TopicVO topic = kafkaMonitor.getTopic(topicName, TopicEnrichMode.PartitionSize) .orElseThrow(() -> new TopicNotFoundException(topicName)); List partitionList = new ArrayList<>(); diff --git a/src/main/java/kafdrop/controller/TopicController.java b/src/main/java/kafdrop/controller/TopicController.java index b9719af8..ff014c9e 100644 --- a/src/main/java/kafdrop/controller/TopicController.java +++ b/src/main/java/kafdrop/controller/TopicController.java @@ -31,6 +31,8 @@ import java.util.*; +import static kafdrop.service.KafkaMonitor.ALL_TOPIC_ENRICH_MODES; + /** * Handles requests for the topic page. */ @@ -49,7 +51,7 @@ public TopicController(KafkaMonitor kafkaMonitor, @RequestMapping("/{name:.+}") public String topicDetails(@PathVariable("name") String topicName, Model model) { - final var topic = kafkaMonitor.getTopic(topicName) + final var topic = kafkaMonitor.getTopic(topicName, ALL_TOPIC_ENRICH_MODES) .orElseThrow(() -> new TopicNotFoundException(topicName)); model.addAttribute("topic", topic); model.addAttribute("consumers", kafkaMonitor.getConsumers(Collections.singleton(topic))); @@ -92,7 +94,7 @@ public String createTopicPage(Model model) { }) @RequestMapping(path = "/{name:.+}", produces = MediaType.APPLICATION_JSON_VALUE, method = RequestMethod.GET) public @ResponseBody TopicVO getTopic(@PathVariable("name") String topicName) { - return kafkaMonitor.getTopic(topicName) + return kafkaMonitor.getTopic(topicName, ALL_TOPIC_ENRICH_MODES) .orElseThrow(() -> new TopicNotFoundException(topicName)); } @@ -102,7 +104,7 @@ public String createTopicPage(Model model) { }) @RequestMapping(produces = MediaType.APPLICATION_JSON_VALUE, method = RequestMethod.GET) public @ResponseBody List getAllTopics() { - return kafkaMonitor.getTopics(); + return kafkaMonitor.getTopics(TopicEnrichMode.TopicConfig); } @ApiOperation(value = "getConsumers", notes = "Get consumers for a topic") @@ -112,7 +114,7 @@ public String createTopicPage(Model model) { }) @RequestMapping(path = "/{name:.+}/consumers", produces = MediaType.APPLICATION_JSON_VALUE, method = RequestMethod.GET) public @ResponseBody List getConsumers(@PathVariable("name") String topicName) { - final var topic = kafkaMonitor.getTopic(topicName) + final var topic = kafkaMonitor.getTopic(topicName, TopicEnrichMode.PartitionSize) .orElseThrow(() -> new TopicNotFoundException(topicName)); return kafkaMonitor.getConsumers(Collections.singleton(topic)); } diff --git a/src/main/java/kafdrop/service/KafkaHighLevelConsumer.java b/src/main/java/kafdrop/service/KafkaHighLevelConsumer.java index c714fa17..7d00b697 100644 --- a/src/main/java/kafdrop/service/KafkaHighLevelConsumer.java +++ b/src/main/java/kafdrop/service/KafkaHighLevelConsumer.java @@ -49,19 +49,16 @@ private void initializeClient() { } } - synchronized Map getPartitionSize(String topic) { + synchronized void fillPartitionSize(TopicVO topicVo) { initializeClient(); - final var partitionInfoSet = kafkaConsumer.partitionsFor(topic); - kafkaConsumer.assign(partitionInfoSet.stream() - .map(partitionInfo -> new TopicPartition(partitionInfo.topic(), - partitionInfo.partition())) + kafkaConsumer.assign(topicVo.getPartitions().stream() + .map(partitionInfo -> new TopicPartition(topicVo.getName(), partitionInfo.getId())) .collect(Collectors.toList())); - kafkaConsumer.poll(Duration.ofMillis(0)); + final Set assignedPartitionList = kafkaConsumer.assignment(); - final TopicVO topicVO = getTopicInfo(topic); - final Map partitionsVo = topicVO.getPartitionMap(); + final Map partitionsVo = topicVo.getPartitionMap(); kafkaConsumer.seekToBeginning(assignedPartitionList); assignedPartitionList.forEach(topicPartition -> { @@ -78,7 +75,6 @@ synchronized Map getPartitionSize(String topic) { final TopicPartitionVO partitionVo = partitionsVo.get(topicPartition.partition()); partitionVo.setSize(latestOffset); }); - return partitionsVo; } /** @@ -199,29 +195,25 @@ private static String deserialize(MessageDeserializer deserializer, byte[] bytes return bytes != null ? deserializer.deserializeMessage(ByteBuffer.wrap(bytes)) : "empty"; } - synchronized Map getTopicInfos(String[] topics) { + synchronized TopicVO getTopicInfo(String topicName) { initializeClient(); - final var topicSet = kafkaConsumer.listTopics().keySet(); - if (topics.length == 0) { - topics = Arrays.copyOf(topicSet.toArray(), topicSet.size(), String[].class); - } - final var topicVos = new HashMap(topics.length, 1f); - - for (var topic : topics) { - if (topicSet.contains(topic)) { - topicVos.put(topic, getTopicInfo(topic)); - } - } + var partitionInfos = kafkaConsumer.partitionsFor(topicName); + return (partitionInfos == null) ? null : createTopicVo(topicName, partitionInfos); + } + synchronized Map getTopicInfos() { + initializeClient(); + final var topics = kafkaConsumer.listTopics(); + final var topicVos = new HashMap(topics.size(), 1f); + topics.forEach((topicName, partitionInfos) -> topicVos.put(topicName, createTopicVo(topicName, partitionInfos))); return topicVos; } - private TopicVO getTopicInfo(String topic) { - final var partitionInfoList = kafkaConsumer.partitionsFor(topic); - final var topicVo = new TopicVO(topic); + private TopicVO createTopicVo(String topicName, List partitionInfos) { + final var topicVo = new TopicVO(topicName); final var partitions = new TreeMap(); - for (var partitionInfo : partitionInfoList) { + for (var partitionInfo : partitionInfos) { final var topicPartitionVo = new TopicPartitionVO(partitionInfo.partition()); final var inSyncReplicaIds = Arrays.stream(partitionInfo.inSyncReplicas()).map(Node::id).collect(Collectors.toSet()); final var offlineReplicaIds = Arrays.stream(partitionInfo.offlineReplicas()).map(Node::id).collect(Collectors.toSet()); diff --git a/src/main/java/kafdrop/service/KafkaMonitor.java b/src/main/java/kafdrop/service/KafkaMonitor.java index d4bcc000..ab01031a 100644 --- a/src/main/java/kafdrop/service/KafkaMonitor.java +++ b/src/main/java/kafdrop/service/KafkaMonitor.java @@ -25,11 +25,16 @@ import java.util.*; public interface KafkaMonitor { + TopicEnrichMode[] ALL_TOPIC_ENRICH_MODES = new TopicEnrichMode[]{ + TopicEnrichMode.PartitionSize, + TopicEnrichMode.TopicConfig + }; + List getBrokers(); Optional getBroker(int id); - List getTopics(); + List getTopics(TopicEnrichMode... topicEnrichModes); /** * Returns messages for a given topic. @@ -40,7 +45,7 @@ List getMessages(String topic, int count, List getMessages(TopicPartition topicPartition, long offset, int count, Deserializers deserializers); - Optional getTopic(String topic); + Optional getTopic(String topic, TopicEnrichMode... topicEnrichModes); ClusterSummaryVO getClusterSummary(Collection topics); diff --git a/src/main/java/kafdrop/service/KafkaMonitorImpl.java b/src/main/java/kafdrop/service/KafkaMonitorImpl.java index ea403efe..52c8c35a 100644 --- a/src/main/java/kafdrop/service/KafkaMonitorImpl.java +++ b/src/main/java/kafdrop/service/KafkaMonitorImpl.java @@ -98,29 +98,34 @@ public ClusterSummaryVO getClusterSummary(Collection topics) { } @Override - public List getTopics() { - final var topicVos = getTopicMetadata().values().stream() - .sorted(Comparator.comparing(TopicVO::getName)) - .collect(Collectors.toList()); - for (var topicVo : topicVos) { - topicVo.setPartitions(getTopicPartitionSizes(topicVo)); - } - return topicVos; + public List getTopics(TopicEnrichMode... topicEnrichModes) { + final var topicVos = highLevelConsumer.getTopicInfos(); + enrichTopics(topicVos, topicEnrichModes); + return topicVos.values().stream() + .sorted(Comparator.comparing(TopicVO::getName)) + .collect(Collectors.toList()); } @Override - public Optional getTopic(String topic) { - final var topicVo = Optional.ofNullable(getTopicMetadata(topic).get(topic)); - topicVo.ifPresent(vo -> vo.setPartitions(getTopicPartitionSizes(vo))); + public Optional getTopic(String topicName, TopicEnrichMode... topicEnrichModes) { + final var topicVo = Optional.ofNullable(highLevelConsumer.getTopicInfo(topicName)); + topicVo.ifPresent(topic -> enrichTopics(Collections.singletonMap(topic.getName(), topic), topicEnrichModes)); return topicVo; } - private Map getTopicMetadata(String... topics) { - final var topicInfos = highLevelConsumer.getTopicInfos(topics); - final var retrievedTopicNames = topicInfos.keySet(); - final var topicConfigs = highLevelAdminClient.describeTopicConfigs(retrievedTopicNames); + private void enrichTopics(Map topicVos, TopicEnrichMode[] topicEnrichModes) { + var topicEnrichModeSet = Set.of(topicEnrichModes); + if (topicEnrichModeSet.contains(TopicEnrichMode.PartitionSize)) { + topicVos.values().forEach(highLevelConsumer::fillPartitionSize); + } + if (topicEnrichModeSet.contains(TopicEnrichMode.TopicConfig)) { + fillTopicConfig(topicVos); + } + } - for (var topicVo : topicInfos.values()) { + private void fillTopicConfig(Map topicVos) { + final var topicConfigs = highLevelAdminClient.describeTopicConfigs(topicVos.keySet()); + for (var topicVo : topicVos.values()) { final var config = topicConfigs.get(topicVo.getName()); if (config != null) { final var configMap = new TreeMap(); @@ -135,40 +140,28 @@ private Map getTopicMetadata(String... topics) { LOG.warn("Missing config for topic {}", topicVo.getName()); } } - return topicInfos; } @Override public List getMessages(String topic, int count, Deserializers deserializers) { final var records = highLevelConsumer.getLatestRecords(topic, count, deserializers); - if (records != null) { - final var messageVos = new ArrayList(); - for (var record : records) { - final var messageVo = new MessageVO(); - messageVo.setPartition(record.partition()); - messageVo.setOffset(record.offset()); - messageVo.setKey(record.key()); - messageVo.setMessage(record.value()); - messageVo.setHeaders(headersToMap(record.headers())); - messageVo.setTimestamp(new Date(record.timestamp())); - messageVos.add(messageVo); - } - return messageVos; - } else { - return Collections.emptyList(); - } + return convertRecortsToMessageVos(records); } @Override public List getMessages(TopicPartition topicPartition, long offset, int count, Deserializers deserializers) { final var records = highLevelConsumer.getLatestRecords(topicPartition, offset, count, deserializers); + return convertRecortsToMessageVos(records); + } + + private List convertRecortsToMessageVos(List> records) { if (records != null) { - final var messageVos = new ArrayList(); + final var messageVos = new ArrayList(records.size()); for (var record : records) { final var messageVo = new MessageVO(); - messageVo.setPartition(topicPartition.partition()); + messageVo.setPartition(record.partition()); messageVo.setOffset(record.offset()); messageVo.setKey(record.key()); messageVo.setMessage(record.value()); @@ -191,10 +184,6 @@ private static Map headersToMap(Headers headers) { return map; } - private Map getTopicPartitionSizes(TopicVO topic) { - return highLevelConsumer.getPartitionSize(topic.getName()); - } - @Override public List getConsumers(Collection topicVos) { final var topics = topicVos.stream().map(TopicVO::getName).collect(Collectors.toSet()); diff --git a/src/main/java/kafdrop/service/TopicEnrichMode.java b/src/main/java/kafdrop/service/TopicEnrichMode.java new file mode 100644 index 00000000..3ccae430 --- /dev/null +++ b/src/main/java/kafdrop/service/TopicEnrichMode.java @@ -0,0 +1,31 @@ +/* + * Copyright 2020 Kafdrop contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * + */ + +package kafdrop.service; + +public enum TopicEnrichMode { + /** + * Specifies to enrich topic info with partition size (first offset, last offset and size). + */ + PartitionSize, + + /** + * Specifies to enrich topic info with topic configuration. + */ + TopicConfig +}