From 45fcd36a2253dda6d1551de2e44522434380e5f1 Mon Sep 17 00:00:00 2001 From: jchipmunk Date: Thu, 4 Jun 2020 16:07:14 +0300 Subject: [PATCH] Reduce number of calls to Kafka Kafdrop uses a heavy topic info object in all cases, even if additional data is not needed on UI side. It is proposed to add the ability to manage enrichment of the topic info object using the following modes: * PartitionSize - specifies to enrich topic info with partition size (first offset, last offset and size) * TopicConfig - specifies to enrich topic info with topic configuration These changes are based on the following assumptions: * cluster-overview and broker-detail views do not need data about size of partitions * message-inspector and consumer-detail views do not need data about topic configuration --- .../kafdrop/controller/BrokerController.java | 2 +- .../kafdrop/controller/ClusterController.java | 4 +- .../controller/ConsumerController.java | 4 +- .../kafdrop/controller/MessageController.java | 7 +- .../kafdrop/controller/TopicController.java | 10 +-- .../service/KafkaHighLevelConsumer.java | 42 +++++------- .../java/kafdrop/service/KafkaMonitor.java | 9 ++- .../kafdrop/service/KafkaMonitorImpl.java | 67 ++++++++----------- .../java/kafdrop/service/TopicEnrichMode.java | 31 +++++++++ 9 files changed, 98 insertions(+), 78 deletions(-) create mode 100644 src/main/java/kafdrop/service/TopicEnrichMode.java diff --git a/src/main/java/kafdrop/controller/BrokerController.java b/src/main/java/kafdrop/controller/BrokerController.java index 4eb35e7f..d27007b8 100644 --- a/src/main/java/kafdrop/controller/BrokerController.java +++ b/src/main/java/kafdrop/controller/BrokerController.java @@ -40,7 +40,7 @@ public BrokerController(KafkaMonitor kafkaMonitor) { public String brokerDetails(@PathVariable("id") int brokerId, Model model) { model.addAttribute("broker", kafkaMonitor.getBroker(brokerId) .orElseThrow(() -> new BrokerNotFoundException("No such broker " + brokerId))); - model.addAttribute("topics", kafkaMonitor.getTopics()); + model.addAttribute("topics", kafkaMonitor.getTopics(TopicEnrichMode.TopicConfig)); return "broker-detail"; } diff --git a/src/main/java/kafdrop/controller/ClusterController.java b/src/main/java/kafdrop/controller/ClusterController.java index 134a08ac..5672142b 100644 --- a/src/main/java/kafdrop/controller/ClusterController.java +++ b/src/main/java/kafdrop/controller/ClusterController.java @@ -64,7 +64,7 @@ public String clusterInfo(Model model, model.addAttribute("buildProperties", buildProperties); final var brokers = kafkaMonitor.getBrokers(); - final var topics = kafkaMonitor.getTopics(); + final var topics = kafkaMonitor.getTopics(TopicEnrichMode.TopicConfig); final var clusterSummary = kafkaMonitor.getClusterSummary(topics); final var missingBrokerIds = clusterSummary.getExpectedBrokerIds().stream() @@ -91,7 +91,7 @@ public String clusterInfo(Model model, public @ResponseBody ClusterInfoVO getCluster() { final var vo = new ClusterInfoVO(); vo.brokers = kafkaMonitor.getBrokers(); - vo.topics = kafkaMonitor.getTopics(); + vo.topics = kafkaMonitor.getTopics(TopicEnrichMode.TopicConfig); vo.summary = kafkaMonitor.getClusterSummary(vo.topics); return vo; } diff --git a/src/main/java/kafdrop/controller/ConsumerController.java b/src/main/java/kafdrop/controller/ConsumerController.java index 96895123..fe623ddf 100644 --- a/src/main/java/kafdrop/controller/ConsumerController.java +++ b/src/main/java/kafdrop/controller/ConsumerController.java @@ -37,7 +37,7 @@ public ConsumerController(KafkaMonitor kafkaMonitor) { @RequestMapping("/{groupId:.+}") public String consumerDetail(@PathVariable("groupId") String groupId, Model model) throws ConsumerNotFoundException { - final var topicVos = kafkaMonitor.getTopics(); + final var topicVos = kafkaMonitor.getTopics(TopicEnrichMode.PartitionSize); final var consumer = kafkaMonitor.getConsumers(topicVos) .stream() .filter(c -> c.getGroupId().equals(groupId)) @@ -53,7 +53,7 @@ public String consumerDetail(@PathVariable("groupId") String groupId, Model mode }) @RequestMapping(path = "/{groupId:.+}", produces = MediaType.APPLICATION_JSON_VALUE, method = RequestMethod.GET) public @ResponseBody ConsumerVO getConsumer(@PathVariable("groupId") String groupId) throws ConsumerNotFoundException { - final var topicVos = kafkaMonitor.getTopics(); + final var topicVos = kafkaMonitor.getTopics(TopicEnrichMode.PartitionSize); final var consumer = kafkaMonitor.getConsumers(topicVos) .stream() .filter(c -> c.getGroupId().equals(groupId)) diff --git a/src/main/java/kafdrop/controller/MessageController.java b/src/main/java/kafdrop/controller/MessageController.java index 82e0617c..5b980aa2 100644 --- a/src/main/java/kafdrop/controller/MessageController.java +++ b/src/main/java/kafdrop/controller/MessageController.java @@ -28,6 +28,7 @@ import javax.validation.constraints.Min; import javax.validation.constraints.NotNull; +import kafdrop.service.TopicEnrichMode; import kafdrop.util.*; import org.springframework.http.MediaType; import org.springframework.stereotype.Controller; @@ -94,7 +95,7 @@ public String viewAllMessages(@PathVariable("name") String topicName, final int size = (count != null? count : 100); final MessageFormat defaultFormat = messageFormatProperties.getFormat(); final MessageFormat defaultKeyFormat = keyFormatProperties.getFormat(); - final TopicVO topic = kafkaMonitor.getTopic(topicName) + final TopicVO topic = kafkaMonitor.getTopic(topicName, TopicEnrichMode.PartitionSize) .orElseThrow(() -> new TopicNotFoundException(topicName)); model.addAttribute("topic", topic); @@ -151,7 +152,7 @@ public String viewMessageForm(@PathVariable("name") String topicName, model.addAttribute("messageForm", defaultForm); } - final TopicVO topic = kafkaMonitor.getTopic(topicName) + final TopicVO topic = kafkaMonitor.getTopic(topicName, TopicEnrichMode.PartitionSize) .orElseThrow(() -> new TopicNotFoundException(topicName)); model.addAttribute("topic", topic); @@ -224,7 +225,7 @@ List getPartitionOrMessages( @RequestParam(name = "msgTypeName", required = false) String msgTypeName ) { if (partition == null || offset == null || count == null) { - final TopicVO topic = kafkaMonitor.getTopic(topicName) + final TopicVO topic = kafkaMonitor.getTopic(topicName, TopicEnrichMode.PartitionSize) .orElseThrow(() -> new TopicNotFoundException(topicName)); List partitionList = new ArrayList<>(); diff --git a/src/main/java/kafdrop/controller/TopicController.java b/src/main/java/kafdrop/controller/TopicController.java index b9719af8..ff014c9e 100644 --- a/src/main/java/kafdrop/controller/TopicController.java +++ b/src/main/java/kafdrop/controller/TopicController.java @@ -31,6 +31,8 @@ import java.util.*; +import static kafdrop.service.KafkaMonitor.ALL_TOPIC_ENRICH_MODES; + /** * Handles requests for the topic page. */ @@ -49,7 +51,7 @@ public TopicController(KafkaMonitor kafkaMonitor, @RequestMapping("/{name:.+}") public String topicDetails(@PathVariable("name") String topicName, Model model) { - final var topic = kafkaMonitor.getTopic(topicName) + final var topic = kafkaMonitor.getTopic(topicName, ALL_TOPIC_ENRICH_MODES) .orElseThrow(() -> new TopicNotFoundException(topicName)); model.addAttribute("topic", topic); model.addAttribute("consumers", kafkaMonitor.getConsumers(Collections.singleton(topic))); @@ -92,7 +94,7 @@ public String createTopicPage(Model model) { }) @RequestMapping(path = "/{name:.+}", produces = MediaType.APPLICATION_JSON_VALUE, method = RequestMethod.GET) public @ResponseBody TopicVO getTopic(@PathVariable("name") String topicName) { - return kafkaMonitor.getTopic(topicName) + return kafkaMonitor.getTopic(topicName, ALL_TOPIC_ENRICH_MODES) .orElseThrow(() -> new TopicNotFoundException(topicName)); } @@ -102,7 +104,7 @@ public String createTopicPage(Model model) { }) @RequestMapping(produces = MediaType.APPLICATION_JSON_VALUE, method = RequestMethod.GET) public @ResponseBody List getAllTopics() { - return kafkaMonitor.getTopics(); + return kafkaMonitor.getTopics(TopicEnrichMode.TopicConfig); } @ApiOperation(value = "getConsumers", notes = "Get consumers for a topic") @@ -112,7 +114,7 @@ public String createTopicPage(Model model) { }) @RequestMapping(path = "/{name:.+}/consumers", produces = MediaType.APPLICATION_JSON_VALUE, method = RequestMethod.GET) public @ResponseBody List getConsumers(@PathVariable("name") String topicName) { - final var topic = kafkaMonitor.getTopic(topicName) + final var topic = kafkaMonitor.getTopic(topicName, TopicEnrichMode.PartitionSize) .orElseThrow(() -> new TopicNotFoundException(topicName)); return kafkaMonitor.getConsumers(Collections.singleton(topic)); } diff --git a/src/main/java/kafdrop/service/KafkaHighLevelConsumer.java b/src/main/java/kafdrop/service/KafkaHighLevelConsumer.java index c714fa17..7d00b697 100644 --- a/src/main/java/kafdrop/service/KafkaHighLevelConsumer.java +++ b/src/main/java/kafdrop/service/KafkaHighLevelConsumer.java @@ -49,19 +49,16 @@ private void initializeClient() { } } - synchronized Map getPartitionSize(String topic) { + synchronized void fillPartitionSize(TopicVO topicVo) { initializeClient(); - final var partitionInfoSet = kafkaConsumer.partitionsFor(topic); - kafkaConsumer.assign(partitionInfoSet.stream() - .map(partitionInfo -> new TopicPartition(partitionInfo.topic(), - partitionInfo.partition())) + kafkaConsumer.assign(topicVo.getPartitions().stream() + .map(partitionInfo -> new TopicPartition(topicVo.getName(), partitionInfo.getId())) .collect(Collectors.toList())); - kafkaConsumer.poll(Duration.ofMillis(0)); + final Set assignedPartitionList = kafkaConsumer.assignment(); - final TopicVO topicVO = getTopicInfo(topic); - final Map partitionsVo = topicVO.getPartitionMap(); + final Map partitionsVo = topicVo.getPartitionMap(); kafkaConsumer.seekToBeginning(assignedPartitionList); assignedPartitionList.forEach(topicPartition -> { @@ -78,7 +75,6 @@ synchronized Map getPartitionSize(String topic) { final TopicPartitionVO partitionVo = partitionsVo.get(topicPartition.partition()); partitionVo.setSize(latestOffset); }); - return partitionsVo; } /** @@ -199,29 +195,25 @@ private static String deserialize(MessageDeserializer deserializer, byte[] bytes return bytes != null ? deserializer.deserializeMessage(ByteBuffer.wrap(bytes)) : "empty"; } - synchronized Map getTopicInfos(String[] topics) { + synchronized TopicVO getTopicInfo(String topicName) { initializeClient(); - final var topicSet = kafkaConsumer.listTopics().keySet(); - if (topics.length == 0) { - topics = Arrays.copyOf(topicSet.toArray(), topicSet.size(), String[].class); - } - final var topicVos = new HashMap(topics.length, 1f); - - for (var topic : topics) { - if (topicSet.contains(topic)) { - topicVos.put(topic, getTopicInfo(topic)); - } - } + var partitionInfos = kafkaConsumer.partitionsFor(topicName); + return (partitionInfos == null) ? null : createTopicVo(topicName, partitionInfos); + } + synchronized Map getTopicInfos() { + initializeClient(); + final var topics = kafkaConsumer.listTopics(); + final var topicVos = new HashMap(topics.size(), 1f); + topics.forEach((topicName, partitionInfos) -> topicVos.put(topicName, createTopicVo(topicName, partitionInfos))); return topicVos; } - private TopicVO getTopicInfo(String topic) { - final var partitionInfoList = kafkaConsumer.partitionsFor(topic); - final var topicVo = new TopicVO(topic); + private TopicVO createTopicVo(String topicName, List partitionInfos) { + final var topicVo = new TopicVO(topicName); final var partitions = new TreeMap(); - for (var partitionInfo : partitionInfoList) { + for (var partitionInfo : partitionInfos) { final var topicPartitionVo = new TopicPartitionVO(partitionInfo.partition()); final var inSyncReplicaIds = Arrays.stream(partitionInfo.inSyncReplicas()).map(Node::id).collect(Collectors.toSet()); final var offlineReplicaIds = Arrays.stream(partitionInfo.offlineReplicas()).map(Node::id).collect(Collectors.toSet()); diff --git a/src/main/java/kafdrop/service/KafkaMonitor.java b/src/main/java/kafdrop/service/KafkaMonitor.java index d4bcc000..ab01031a 100644 --- a/src/main/java/kafdrop/service/KafkaMonitor.java +++ b/src/main/java/kafdrop/service/KafkaMonitor.java @@ -25,11 +25,16 @@ import java.util.*; public interface KafkaMonitor { + TopicEnrichMode[] ALL_TOPIC_ENRICH_MODES = new TopicEnrichMode[]{ + TopicEnrichMode.PartitionSize, + TopicEnrichMode.TopicConfig + }; + List getBrokers(); Optional getBroker(int id); - List getTopics(); + List getTopics(TopicEnrichMode... topicEnrichModes); /** * Returns messages for a given topic. @@ -40,7 +45,7 @@ List getMessages(String topic, int count, List getMessages(TopicPartition topicPartition, long offset, int count, Deserializers deserializers); - Optional getTopic(String topic); + Optional getTopic(String topic, TopicEnrichMode... topicEnrichModes); ClusterSummaryVO getClusterSummary(Collection topics); diff --git a/src/main/java/kafdrop/service/KafkaMonitorImpl.java b/src/main/java/kafdrop/service/KafkaMonitorImpl.java index ea403efe..52c8c35a 100644 --- a/src/main/java/kafdrop/service/KafkaMonitorImpl.java +++ b/src/main/java/kafdrop/service/KafkaMonitorImpl.java @@ -98,29 +98,34 @@ public ClusterSummaryVO getClusterSummary(Collection topics) { } @Override - public List getTopics() { - final var topicVos = getTopicMetadata().values().stream() - .sorted(Comparator.comparing(TopicVO::getName)) - .collect(Collectors.toList()); - for (var topicVo : topicVos) { - topicVo.setPartitions(getTopicPartitionSizes(topicVo)); - } - return topicVos; + public List getTopics(TopicEnrichMode... topicEnrichModes) { + final var topicVos = highLevelConsumer.getTopicInfos(); + enrichTopics(topicVos, topicEnrichModes); + return topicVos.values().stream() + .sorted(Comparator.comparing(TopicVO::getName)) + .collect(Collectors.toList()); } @Override - public Optional getTopic(String topic) { - final var topicVo = Optional.ofNullable(getTopicMetadata(topic).get(topic)); - topicVo.ifPresent(vo -> vo.setPartitions(getTopicPartitionSizes(vo))); + public Optional getTopic(String topicName, TopicEnrichMode... topicEnrichModes) { + final var topicVo = Optional.ofNullable(highLevelConsumer.getTopicInfo(topicName)); + topicVo.ifPresent(topic -> enrichTopics(Collections.singletonMap(topic.getName(), topic), topicEnrichModes)); return topicVo; } - private Map getTopicMetadata(String... topics) { - final var topicInfos = highLevelConsumer.getTopicInfos(topics); - final var retrievedTopicNames = topicInfos.keySet(); - final var topicConfigs = highLevelAdminClient.describeTopicConfigs(retrievedTopicNames); + private void enrichTopics(Map topicVos, TopicEnrichMode[] topicEnrichModes) { + var topicEnrichModeSet = Set.of(topicEnrichModes); + if (topicEnrichModeSet.contains(TopicEnrichMode.PartitionSize)) { + topicVos.values().forEach(highLevelConsumer::fillPartitionSize); + } + if (topicEnrichModeSet.contains(TopicEnrichMode.TopicConfig)) { + fillTopicConfig(topicVos); + } + } - for (var topicVo : topicInfos.values()) { + private void fillTopicConfig(Map topicVos) { + final var topicConfigs = highLevelAdminClient.describeTopicConfigs(topicVos.keySet()); + for (var topicVo : topicVos.values()) { final var config = topicConfigs.get(topicVo.getName()); if (config != null) { final var configMap = new TreeMap(); @@ -135,40 +140,28 @@ private Map getTopicMetadata(String... topics) { LOG.warn("Missing config for topic {}", topicVo.getName()); } } - return topicInfos; } @Override public List getMessages(String topic, int count, Deserializers deserializers) { final var records = highLevelConsumer.getLatestRecords(topic, count, deserializers); - if (records != null) { - final var messageVos = new ArrayList(); - for (var record : records) { - final var messageVo = new MessageVO(); - messageVo.setPartition(record.partition()); - messageVo.setOffset(record.offset()); - messageVo.setKey(record.key()); - messageVo.setMessage(record.value()); - messageVo.setHeaders(headersToMap(record.headers())); - messageVo.setTimestamp(new Date(record.timestamp())); - messageVos.add(messageVo); - } - return messageVos; - } else { - return Collections.emptyList(); - } + return convertRecortsToMessageVos(records); } @Override public List getMessages(TopicPartition topicPartition, long offset, int count, Deserializers deserializers) { final var records = highLevelConsumer.getLatestRecords(topicPartition, offset, count, deserializers); + return convertRecortsToMessageVos(records); + } + + private List convertRecortsToMessageVos(List> records) { if (records != null) { - final var messageVos = new ArrayList(); + final var messageVos = new ArrayList(records.size()); for (var record : records) { final var messageVo = new MessageVO(); - messageVo.setPartition(topicPartition.partition()); + messageVo.setPartition(record.partition()); messageVo.setOffset(record.offset()); messageVo.setKey(record.key()); messageVo.setMessage(record.value()); @@ -191,10 +184,6 @@ private static Map headersToMap(Headers headers) { return map; } - private Map getTopicPartitionSizes(TopicVO topic) { - return highLevelConsumer.getPartitionSize(topic.getName()); - } - @Override public List getConsumers(Collection topicVos) { final var topics = topicVos.stream().map(TopicVO::getName).collect(Collectors.toSet()); diff --git a/src/main/java/kafdrop/service/TopicEnrichMode.java b/src/main/java/kafdrop/service/TopicEnrichMode.java new file mode 100644 index 00000000..3ccae430 --- /dev/null +++ b/src/main/java/kafdrop/service/TopicEnrichMode.java @@ -0,0 +1,31 @@ +/* + * Copyright 2020 Kafdrop contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * + */ + +package kafdrop.service; + +public enum TopicEnrichMode { + /** + * Specifies to enrich topic info with partition size (first offset, last offset and size). + */ + PartitionSize, + + /** + * Specifies to enrich topic info with topic configuration. + */ + TopicConfig +}