Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reduce number of calls to Kafka #142

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/main/java/kafdrop/controller/BrokerController.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public BrokerController(KafkaMonitor kafkaMonitor) {
public String brokerDetails(@PathVariable("id") int brokerId, Model model) {
model.addAttribute("broker", kafkaMonitor.getBroker(brokerId)
.orElseThrow(() -> new BrokerNotFoundException("No such broker " + brokerId)));
model.addAttribute("topics", kafkaMonitor.getTopics());
model.addAttribute("topics", kafkaMonitor.getTopics(TopicEnrichMode.TopicConfig));
return "broker-detail";
}

Expand Down
4 changes: 2 additions & 2 deletions src/main/java/kafdrop/controller/ClusterController.java
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ public String clusterInfo(Model model,
model.addAttribute("buildProperties", buildProperties);

final var brokers = kafkaMonitor.getBrokers();
final var topics = kafkaMonitor.getTopics();
final var topics = kafkaMonitor.getTopics(TopicEnrichMode.TopicConfig);
final var clusterSummary = kafkaMonitor.getClusterSummary(topics);

final var missingBrokerIds = clusterSummary.getExpectedBrokerIds().stream()
Expand All @@ -91,7 +91,7 @@ public String clusterInfo(Model model,
public @ResponseBody ClusterInfoVO getCluster() {
final var vo = new ClusterInfoVO();
vo.brokers = kafkaMonitor.getBrokers();
vo.topics = kafkaMonitor.getTopics();
vo.topics = kafkaMonitor.getTopics(TopicEnrichMode.TopicConfig);
vo.summary = kafkaMonitor.getClusterSummary(vo.topics);
return vo;
}
Expand Down
4 changes: 2 additions & 2 deletions src/main/java/kafdrop/controller/ConsumerController.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public ConsumerController(KafkaMonitor kafkaMonitor) {

@RequestMapping("/{groupId:.+}")
public String consumerDetail(@PathVariable("groupId") String groupId, Model model) throws ConsumerNotFoundException {
final var topicVos = kafkaMonitor.getTopics();
final var topicVos = kafkaMonitor.getTopics(TopicEnrichMode.PartitionSize);
final var consumer = kafkaMonitor.getConsumers(topicVos)
.stream()
.filter(c -> c.getGroupId().equals(groupId))
Expand All @@ -53,7 +53,7 @@ public String consumerDetail(@PathVariable("groupId") String groupId, Model mode
})
@RequestMapping(path = "/{groupId:.+}", produces = MediaType.APPLICATION_JSON_VALUE, method = RequestMethod.GET)
public @ResponseBody ConsumerVO getConsumer(@PathVariable("groupId") String groupId) throws ConsumerNotFoundException {
final var topicVos = kafkaMonitor.getTopics();
final var topicVos = kafkaMonitor.getTopics(TopicEnrichMode.PartitionSize);
final var consumer = kafkaMonitor.getConsumers(topicVos)
.stream()
.filter(c -> c.getGroupId().equals(groupId))
Expand Down
7 changes: 4 additions & 3 deletions src/main/java/kafdrop/controller/MessageController.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import javax.validation.constraints.Min;
import javax.validation.constraints.NotNull;

import kafdrop.service.TopicEnrichMode;
import kafdrop.util.*;
import org.springframework.http.MediaType;
import org.springframework.stereotype.Controller;
Expand Down Expand Up @@ -94,7 +95,7 @@ public String viewAllMessages(@PathVariable("name") String topicName,
final int size = (count != null? count : 100);
final MessageFormat defaultFormat = messageFormatProperties.getFormat();
final MessageFormat defaultKeyFormat = keyFormatProperties.getFormat();
final TopicVO topic = kafkaMonitor.getTopic(topicName)
final TopicVO topic = kafkaMonitor.getTopic(topicName, TopicEnrichMode.PartitionSize)
.orElseThrow(() -> new TopicNotFoundException(topicName));

model.addAttribute("topic", topic);
Expand Down Expand Up @@ -151,7 +152,7 @@ public String viewMessageForm(@PathVariable("name") String topicName,
model.addAttribute("messageForm", defaultForm);
}

final TopicVO topic = kafkaMonitor.getTopic(topicName)
final TopicVO topic = kafkaMonitor.getTopic(topicName, TopicEnrichMode.PartitionSize)
.orElseThrow(() -> new TopicNotFoundException(topicName));
model.addAttribute("topic", topic);

Expand Down Expand Up @@ -224,7 +225,7 @@ List<Object> getPartitionOrMessages(
@RequestParam(name = "msgTypeName", required = false) String msgTypeName
) {
if (partition == null || offset == null || count == null) {
final TopicVO topic = kafkaMonitor.getTopic(topicName)
final TopicVO topic = kafkaMonitor.getTopic(topicName, TopicEnrichMode.PartitionSize)
.orElseThrow(() -> new TopicNotFoundException(topicName));

List<Object> partitionList = new ArrayList<>();
Expand Down
10 changes: 6 additions & 4 deletions src/main/java/kafdrop/controller/TopicController.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@

import java.util.*;

import static kafdrop.service.KafkaMonitor.ALL_TOPIC_ENRICH_MODES;

/**
* Handles requests for the topic page.
*/
Expand All @@ -49,7 +51,7 @@ public TopicController(KafkaMonitor kafkaMonitor,

@RequestMapping("/{name:.+}")
public String topicDetails(@PathVariable("name") String topicName, Model model) {
final var topic = kafkaMonitor.getTopic(topicName)
final var topic = kafkaMonitor.getTopic(topicName, ALL_TOPIC_ENRICH_MODES)
.orElseThrow(() -> new TopicNotFoundException(topicName));
model.addAttribute("topic", topic);
model.addAttribute("consumers", kafkaMonitor.getConsumers(Collections.singleton(topic)));
Expand Down Expand Up @@ -92,7 +94,7 @@ public String createTopicPage(Model model) {
})
@RequestMapping(path = "/{name:.+}", produces = MediaType.APPLICATION_JSON_VALUE, method = RequestMethod.GET)
public @ResponseBody TopicVO getTopic(@PathVariable("name") String topicName) {
return kafkaMonitor.getTopic(topicName)
return kafkaMonitor.getTopic(topicName, ALL_TOPIC_ENRICH_MODES)
.orElseThrow(() -> new TopicNotFoundException(topicName));
}

Expand All @@ -102,7 +104,7 @@ public String createTopicPage(Model model) {
})
@RequestMapping(produces = MediaType.APPLICATION_JSON_VALUE, method = RequestMethod.GET)
public @ResponseBody List<TopicVO> getAllTopics() {
return kafkaMonitor.getTopics();
return kafkaMonitor.getTopics(TopicEnrichMode.TopicConfig);
}

@ApiOperation(value = "getConsumers", notes = "Get consumers for a topic")
Expand All @@ -112,7 +114,7 @@ public String createTopicPage(Model model) {
})
@RequestMapping(path = "/{name:.+}/consumers", produces = MediaType.APPLICATION_JSON_VALUE, method = RequestMethod.GET)
public @ResponseBody List<ConsumerVO> getConsumers(@PathVariable("name") String topicName) {
final var topic = kafkaMonitor.getTopic(topicName)
final var topic = kafkaMonitor.getTopic(topicName, TopicEnrichMode.PartitionSize)
.orElseThrow(() -> new TopicNotFoundException(topicName));
return kafkaMonitor.getConsumers(Collections.singleton(topic));
}
Expand Down
42 changes: 17 additions & 25 deletions src/main/java/kafdrop/service/KafkaHighLevelConsumer.java
Original file line number Diff line number Diff line change
Expand Up @@ -49,19 +49,16 @@ private void initializeClient() {
}
}

synchronized Map<Integer, TopicPartitionVO> getPartitionSize(String topic) {
synchronized void fillPartitionSize(TopicVO topicVo) {
initializeClient();

final var partitionInfoSet = kafkaConsumer.partitionsFor(topic);
kafkaConsumer.assign(partitionInfoSet.stream()
.map(partitionInfo -> new TopicPartition(partitionInfo.topic(),
partitionInfo.partition()))
kafkaConsumer.assign(topicVo.getPartitions().stream()
.map(partitionInfo -> new TopicPartition(topicVo.getName(), partitionInfo.getId()))
.collect(Collectors.toList()));

kafkaConsumer.poll(Duration.ofMillis(0));

final Set<TopicPartition> assignedPartitionList = kafkaConsumer.assignment();
final TopicVO topicVO = getTopicInfo(topic);
final Map<Integer, TopicPartitionVO> partitionsVo = topicVO.getPartitionMap();
final Map<Integer, TopicPartitionVO> partitionsVo = topicVo.getPartitionMap();

kafkaConsumer.seekToBeginning(assignedPartitionList);
assignedPartitionList.forEach(topicPartition -> {
Expand All @@ -78,7 +75,6 @@ synchronized Map<Integer, TopicPartitionVO> getPartitionSize(String topic) {
final TopicPartitionVO partitionVo = partitionsVo.get(topicPartition.partition());
partitionVo.setSize(latestOffset);
});
return partitionsVo;
}

/**
Expand Down Expand Up @@ -199,29 +195,25 @@ private static String deserialize(MessageDeserializer deserializer, byte[] bytes
return bytes != null ? deserializer.deserializeMessage(ByteBuffer.wrap(bytes)) : "empty";
}

synchronized Map<String, TopicVO> getTopicInfos(String[] topics) {
synchronized TopicVO getTopicInfo(String topicName) {
initializeClient();
final var topicSet = kafkaConsumer.listTopics().keySet();
if (topics.length == 0) {
topics = Arrays.copyOf(topicSet.toArray(), topicSet.size(), String[].class);
}
final var topicVos = new HashMap<String, TopicVO>(topics.length, 1f);

for (var topic : topics) {
if (topicSet.contains(topic)) {
topicVos.put(topic, getTopicInfo(topic));
}
}
var partitionInfos = kafkaConsumer.partitionsFor(topicName);
return (partitionInfos == null) ? null : createTopicVo(topicName, partitionInfos);
}

synchronized Map<String, TopicVO> getTopicInfos() {
initializeClient();
final var topics = kafkaConsumer.listTopics();
final var topicVos = new HashMap<String, TopicVO>(topics.size(), 1f);
topics.forEach((topicName, partitionInfos) -> topicVos.put(topicName, createTopicVo(topicName, partitionInfos)));
return topicVos;
}

private TopicVO getTopicInfo(String topic) {
final var partitionInfoList = kafkaConsumer.partitionsFor(topic);
final var topicVo = new TopicVO(topic);
private TopicVO createTopicVo(String topicName, List<PartitionInfo> partitionInfos) {
final var topicVo = new TopicVO(topicName);
final var partitions = new TreeMap<Integer, TopicPartitionVO>();

for (var partitionInfo : partitionInfoList) {
for (var partitionInfo : partitionInfos) {
final var topicPartitionVo = new TopicPartitionVO(partitionInfo.partition());
final var inSyncReplicaIds = Arrays.stream(partitionInfo.inSyncReplicas()).map(Node::id).collect(Collectors.toSet());
final var offlineReplicaIds = Arrays.stream(partitionInfo.offlineReplicas()).map(Node::id).collect(Collectors.toSet());
Expand Down
9 changes: 7 additions & 2 deletions src/main/java/kafdrop/service/KafkaMonitor.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,16 @@
import java.util.*;

public interface KafkaMonitor {
TopicEnrichMode[] ALL_TOPIC_ENRICH_MODES = new TopicEnrichMode[]{
TopicEnrichMode.PartitionSize,
TopicEnrichMode.TopicConfig
};

List<BrokerVO> getBrokers();

Optional<BrokerVO> getBroker(int id);

List<TopicVO> getTopics();
List<TopicVO> getTopics(TopicEnrichMode... topicEnrichModes);

/**
* Returns messages for a given topic.
Expand All @@ -40,7 +45,7 @@ List<MessageVO> getMessages(String topic, int count,
List<MessageVO> getMessages(TopicPartition topicPartition, long offset, int count,
Deserializers deserializers);

Optional<TopicVO> getTopic(String topic);
Optional<TopicVO> getTopic(String topic, TopicEnrichMode... topicEnrichModes);

ClusterSummaryVO getClusterSummary(Collection<TopicVO> topics);

Expand Down
67 changes: 28 additions & 39 deletions src/main/java/kafdrop/service/KafkaMonitorImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -98,29 +98,34 @@ public ClusterSummaryVO getClusterSummary(Collection<TopicVO> topics) {
}

@Override
public List<TopicVO> getTopics() {
final var topicVos = getTopicMetadata().values().stream()
.sorted(Comparator.comparing(TopicVO::getName))
.collect(Collectors.toList());
for (var topicVo : topicVos) {
topicVo.setPartitions(getTopicPartitionSizes(topicVo));
}
return topicVos;
public List<TopicVO> getTopics(TopicEnrichMode... topicEnrichModes) {
final var topicVos = highLevelConsumer.getTopicInfos();
enrichTopics(topicVos, topicEnrichModes);
return topicVos.values().stream()
.sorted(Comparator.comparing(TopicVO::getName))
.collect(Collectors.toList());
}

@Override
public Optional<TopicVO> getTopic(String topic) {
final var topicVo = Optional.ofNullable(getTopicMetadata(topic).get(topic));
topicVo.ifPresent(vo -> vo.setPartitions(getTopicPartitionSizes(vo)));
public Optional<TopicVO> getTopic(String topicName, TopicEnrichMode... topicEnrichModes) {
final var topicVo = Optional.ofNullable(highLevelConsumer.getTopicInfo(topicName));
topicVo.ifPresent(topic -> enrichTopics(Collections.singletonMap(topic.getName(), topic), topicEnrichModes));
return topicVo;
}

private Map<String, TopicVO> getTopicMetadata(String... topics) {
final var topicInfos = highLevelConsumer.getTopicInfos(topics);
final var retrievedTopicNames = topicInfos.keySet();
final var topicConfigs = highLevelAdminClient.describeTopicConfigs(retrievedTopicNames);
private void enrichTopics(Map<String, TopicVO> topicVos, TopicEnrichMode[] topicEnrichModes) {
var topicEnrichModeSet = Set.of(topicEnrichModes);
if (topicEnrichModeSet.contains(TopicEnrichMode.PartitionSize)) {
topicVos.values().forEach(highLevelConsumer::fillPartitionSize);
}
if (topicEnrichModeSet.contains(TopicEnrichMode.TopicConfig)) {
fillTopicConfig(topicVos);
}
}

for (var topicVo : topicInfos.values()) {
private void fillTopicConfig(Map<String, TopicVO> topicVos) {
final var topicConfigs = highLevelAdminClient.describeTopicConfigs(topicVos.keySet());
for (var topicVo : topicVos.values()) {
final var config = topicConfigs.get(topicVo.getName());
if (config != null) {
final var configMap = new TreeMap<String, String>();
Expand All @@ -135,40 +140,28 @@ private Map<String, TopicVO> getTopicMetadata(String... topics) {
LOG.warn("Missing config for topic {}", topicVo.getName());
}
}
return topicInfos;
}

@Override
public List<MessageVO> getMessages(String topic, int count,
Deserializers deserializers) {
final var records = highLevelConsumer.getLatestRecords(topic, count, deserializers);
if (records != null) {
final var messageVos = new ArrayList<MessageVO>();
for (var record : records) {
final var messageVo = new MessageVO();
messageVo.setPartition(record.partition());
messageVo.setOffset(record.offset());
messageVo.setKey(record.key());
messageVo.setMessage(record.value());
messageVo.setHeaders(headersToMap(record.headers()));
messageVo.setTimestamp(new Date(record.timestamp()));
messageVos.add(messageVo);
}
return messageVos;
} else {
return Collections.emptyList();
}
return convertRecortsToMessageVos(records);
}

@Override
public List<MessageVO> getMessages(TopicPartition topicPartition, long offset, int count,
Deserializers deserializers) {
final var records = highLevelConsumer.getLatestRecords(topicPartition, offset, count, deserializers);
return convertRecortsToMessageVos(records);
}

private List<MessageVO> convertRecortsToMessageVos(List<ConsumerRecord<String, String>> records) {
if (records != null) {
final var messageVos = new ArrayList<MessageVO>();
final var messageVos = new ArrayList<MessageVO>(records.size());
for (var record : records) {
final var messageVo = new MessageVO();
messageVo.setPartition(topicPartition.partition());
messageVo.setPartition(record.partition());
messageVo.setOffset(record.offset());
messageVo.setKey(record.key());
messageVo.setMessage(record.value());
Expand All @@ -191,10 +184,6 @@ private static Map<String, String> headersToMap(Headers headers) {
return map;
}

private Map<Integer, TopicPartitionVO> getTopicPartitionSizes(TopicVO topic) {
return highLevelConsumer.getPartitionSize(topic.getName());
}

@Override
public List<ConsumerVO> getConsumers(Collection<TopicVO> topicVos) {
final var topics = topicVos.stream().map(TopicVO::getName).collect(Collectors.toSet());
Expand Down
31 changes: 31 additions & 0 deletions src/main/java/kafdrop/service/TopicEnrichMode.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Copyright 2020 Kafdrop contributors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*
*/

package kafdrop.service;

/**
 * Flags that control which optional — and comparatively expensive — details are
 * loaded when topic metadata is retrieved, so callers can avoid Kafka round
 * trips for data they will not display. Pass any combination to
 * {@code KafkaMonitor#getTopics} / {@code KafkaMonitor#getTopic}.
 */
public enum TopicEnrichMode {
/**
 * Enrich topic info with per-partition offsets (first offset, last offset and
 * the derived size). Requires extra consumer calls per topic
 * (assign/seek/position), so request it only when partition sizes are shown.
 */
PartitionSize,

/**
 * Enrich topic info with the topic's broker-side configuration entries.
 * Requires an admin-client describe-configs call for the requested topics.
 */
TopicConfig
}