Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added functionality to allow user to add a message to a topic partition manually #703

Merged
merged 6 commits into from
Dec 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
104 changes: 100 additions & 4 deletions src/main/java/kafdrop/controller/MessageController.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,14 +39,28 @@
import kafdrop.service.MessageInspector;
import kafdrop.service.TopicNotFoundException;
import kafdrop.util.AvroMessageDeserializer;
import kafdrop.util.AvroMessageSerializer;
import kafdrop.util.DefaultMessageDeserializer;
import kafdrop.util.DefaultMessageSerializer;
import kafdrop.util.Deserializers;
import kafdrop.util.KeyFormat;
import kafdrop.util.MessageDeserializer;
import kafdrop.util.MessageFormat;
import kafdrop.util.MessageSerializer;
import kafdrop.util.MsgPackMessageDeserializer;
import kafdrop.util.MsgPackMessageSerializer;
import kafdrop.util.ProtobufMessageDeserializer;
import kafdrop.util.ProtobufMessageSerializer;
import kafdrop.util.ProtobufSchemaRegistryMessageDeserializer;

import java.io.File;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;


import kafdrop.util.Serializers;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.springframework.http.MediaType;
import org.springframework.stereotype.Controller;
import org.springframework.ui.Model;
Expand All @@ -57,11 +71,10 @@
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.ResponseBody;

import java.io.File;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Date;
import java.util.List;

import org.springframework.web.bind.annotation.PostMapping;
import kafdrop.model.CreateMessageVO;

@Tag(name = "message-controller", description = "Message Controller")
@Controller
Expand Down Expand Up @@ -195,6 +208,60 @@ public String viewMessageForm(@PathVariable("name") String topicName,
return "message-inspector";
}

/**
 * Handles the "add message" form POST: publishes the submitted message to the
 * requested topic/partition and re-renders the message-inspector view positioned
 * at the newly written offset so the user can see the record they just produced.
 *
 * @param topicName topic to publish to (path variable, may contain dots)
 * @param body      form-backing bean carrying partition, key and value
 * @param model     view model populated for the message-inspector template
 * @return the message-inspector view name (errors are surfaced via "errorMessage")
 */
@PostMapping("/topic/{name:.+}/addmessage")
public String addMessage(
  @PathVariable("name")
  String topicName,
  @ModelAttribute("addMessageForm") CreateMessageVO body,
  Model model) {
  try {
    final MessageFormat defaultFormat = messageFormatProperties.getFormat();
    final MessageFormat defaultKeyFormat = messageFormatProperties.getKeyFormat();

    final var serializers = new Serializers(
      getSerializer(topicName, defaultKeyFormat, "", ""),
      getSerializer(topicName, defaultFormat, "", ""));
    final RecordMetadata recordMetadata = kafkaMonitor.publishMessage(body, serializers);

    final var deserializers = new Deserializers(
      getDeserializer(topicName, defaultKeyFormat, "", ""),
      getDeserializer(topicName, defaultFormat, "", "")
    );

    // Number of records shown after publishing; used both for the form default
    // and for the fetch below so the two stay in sync.
    final int messagesToShow = 100;

    final PartitionOffsetInfo defaultForm = new PartitionOffsetInfo();

    defaultForm.setCount((long) messagesToShow);
    defaultForm.setOffset(recordMetadata.offset());
    defaultForm.setPartition(body.getTopicPartition());
    defaultForm.setFormat(defaultFormat);
    // BUG FIX: was setKeyFormat(defaultFormat), which ignored the configured key format.
    defaultForm.setKeyFormat(defaultKeyFormat);

    model.addAttribute("messageForm", defaultForm);

    final TopicVO topic = kafkaMonitor.getTopic(topicName)
      .orElseThrow(() -> new TopicNotFoundException(topicName));

    model.addAttribute("topic", topic);

    model.addAttribute("defaultFormat", defaultFormat);
    model.addAttribute("messageFormats", MessageFormat.values());
    model.addAttribute("defaultKeyFormat", defaultKeyFormat);
    model.addAttribute("keyFormats", KeyFormat.values());
    model.addAttribute("descFiles", protobufProperties.getDescFilesList());
    model.addAttribute("messages",
      messageInspector.getMessages(topicName,
        body.getTopicPartition(),
        recordMetadata.offset(),
        messagesToShow,
        deserializers));
    model.addAttribute("isAnyProtoOpts", List.of(true, false));

  } catch (Exception ex) {
    // Surface any publish/fetch failure to the template instead of a 500 page.
    model.addAttribute("errorMessage", ex.getMessage());
  }
  return "message-inspector";
}

/**
* Human friendly view of searching messages.
*
Expand Down Expand Up @@ -339,6 +406,11 @@ List<Object> getPartitionOrMessages(
}
}

/**
 * Convenience overload of the five-argument getDeserializer that disables
 * protobuf "Any" handling (isAnyProto = false). Used by callers that do not
 * expose the Any option, e.g. the add-message flow.
 */
private MessageDeserializer getDeserializer(String topicName, MessageFormat format, String descFile,
String msgTypeName) {
return getDeserializer(topicName, format, descFile, msgTypeName, false);
}

private MessageDeserializer getDeserializer(String topicName, MessageFormat format, String descFile,
String msgTypeName, boolean isAnyProto) {
final MessageDeserializer deserializer;
Expand Down Expand Up @@ -370,6 +442,30 @@ private MessageDeserializer getDeserializer(String topicName, MessageFormat form
return deserializer;
}

/**
 * Builds the serializer matching the requested message format.
 *
 * @param topicName   topic the serializer will write to (needed for Avro subject naming)
 * @param descFile    protobuf descriptor file name as submitted by the user; sanitized here
 * @param msgTypeName protobuf message type inside the descriptor file
 * @return a serializer for the given format; defaults to the plain serializer
 *         for any unrecognized (or null) format, mirroring the deserializer logic
 */
private MessageSerializer getSerializer(String topicName, MessageFormat format, String descFile, String msgTypeName) {
  if (format == MessageFormat.AVRO) {
    final var registryUrl = schemaRegistryProperties.getConnect();
    final var registryAuth = schemaRegistryProperties.getAuth();
    return new AvroMessageSerializer(topicName, registryUrl, registryAuth);
  }

  if (format == MessageFormat.PROTOBUF) {
    // Sanitize the user-supplied file name: strip the extension and any
    // dot/slash characters so the path cannot escape the descriptor directory.
    final var safeName = descFile.replace(".desc", "")
      .replaceAll("\\.", "")
      .replaceAll("/", "");
    final var descriptorPath = protobufProperties.getDirectory() + File.separator + safeName + ".desc";
    return new ProtobufMessageSerializer(descriptorPath, msgTypeName);
  }

  if (format == MessageFormat.MSGPACK) {
    return new MsgPackMessageSerializer();
  }

  return new DefaultMessageSerializer();
}

/**
* Encapsulates offset data for a single partition.
*/
Expand Down
17 changes: 17 additions & 0 deletions src/main/java/kafdrop/model/CreateMessageVO.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package kafdrop.model;

import lombok.Data;

/**
 * Form-backing bean for the "add message" view: carries the user-entered
 * key/value pair plus the target topic and partition for the publish request.
 *
 * <p>{@code @Data} generates getters, setters, equals/hashCode, toString and a
 * required-args constructor; the previously separate {@code @RequiredArgsConstructor}
 * was redundant (it is bundled in {@code @Data}, and this class has no final or
 * {@code @NonNull} fields that would make it meaningful).
 */
@Data
public final class CreateMessageVO {

  // Zero-based partition index the message should be written to.
  private int topicPartition;

  // Message key as entered in the form; serialized per the configured key format.
  private String key;

  // Message payload as entered in the form; serialized per the configured format.
  private String value;

  // Destination topic name.
  private String topic;
}
7 changes: 3 additions & 4 deletions src/main/java/kafdrop/service/KafkaHighLevelConsumer.java
Original file line number Diff line number Diff line change
Expand Up @@ -212,10 +212,9 @@ synchronized SearchResults searchRecords(String topic,
List<TopicPartition> partitions = determinePartitionsForTopic(topic);
if (partition != -1) {
var partitionOpt = partitions.stream().filter(p -> p.partition() == partition).findAny();
if (partitionOpt.isEmpty()) {
throw new IllegalArgumentException("Partition does not exist in topic");
}
partitions = List.of(partitionOpt.get());
partitions = List.of(partitionOpt.orElseThrow(
() -> new IllegalArgumentException("Partition " + partition + " does not exist in topic")
));
}
kafkaConsumer.assign(partitions);
seekToTimestamp(partitions, startTimestamp);
Expand Down
65 changes: 65 additions & 0 deletions src/main/java/kafdrop/service/KafkaHighLevelProducer.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package kafdrop.service;

import java.util.Properties;
import java.util.concurrent.Future;


import jakarta.annotation.PostConstruct;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;

import kafdrop.config.KafkaConfiguration;
import kafdrop.model.CreateMessageVO;
import kafdrop.util.Serializers;

@Service
public final class KafkaHighLevelProducer {

  private static final Logger LOG = LoggerFactory.getLogger(KafkaHighLevelProducer.class);

  private final KafkaConfiguration kafkaConfiguration;
  private KafkaProducer<byte[], byte[]> kafkaProducer;

  public KafkaHighLevelProducer(KafkaConfiguration kafkaConfiguration) {
    this.kafkaConfiguration = kafkaConfiguration;
  }

  /**
   * Lazily creates the shared byte-array producer. Runs once at startup via
   * {@code @PostConstruct}; the null guard also makes the defensive call from
   * {@link #publishMessage} a no-op once the client exists.
   */
  @PostConstruct
  private void initializeClient() {
    if (kafkaProducer == null) {
      final var properties = new Properties();
      properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
      properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
      properties.put(ProducerConfig.ACKS_CONFIG, "all");
      properties.put(ProducerConfig.RETRIES_CONFIG, 0);
      properties.put(ProducerConfig.LINGER_MS_CONFIG, 1);
      properties.put(ProducerConfig.CLIENT_ID_CONFIG, "kafdrop-producer");
      kafkaConfiguration.applyCommon(properties);

      kafkaProducer = new KafkaProducer<>(properties);
    }
  }

  /**
   * Serializes and synchronously publishes the given message, blocking until the
   * broker acknowledges it (acks=all).
   *
   * @param message     topic, partition, key and value to publish
   * @param serializers key/value serializers matching the topic's configured formats
   * @return metadata (partition/offset) of the written record
   * @throws KafkaProducerException if the send fails or the thread is interrupted
   */
  public RecordMetadata publishMessage(CreateMessageVO message, Serializers serializers) {
    initializeClient(); // defensive: normally already done by @PostConstruct

    final ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(
        message.getTopic(),
        message.getTopicPartition(),
        serializers.getKeySerializer().serializeMessage(message.getKey()),
        serializers.getValueSerializer().serializeMessage(message.getValue()));

    final Future<RecordMetadata> result = kafkaProducer.send(record);
    try {
      final RecordMetadata recordMetadata = result.get();
      LOG.info("Record published successfully [{}]", recordMetadata);
      return recordMetadata;
    } catch (InterruptedException e) {
      // Restore the interrupt status so callers/executors can observe it.
      Thread.currentThread().interrupt();
      LOG.error("Interrupted while publishing message", e);
      throw new KafkaProducerException(e);
    } catch (Exception e) {
      LOG.error("Failed to publish message", e);
      throw new KafkaProducerException(e);
    }
  }
}
6 changes: 6 additions & 0 deletions src/main/java/kafdrop/service/KafkaMonitor.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,17 @@
import kafdrop.model.BrokerVO;
import kafdrop.model.ClusterSummaryVO;
import kafdrop.model.ConsumerVO;
import kafdrop.model.CreateMessageVO;
import kafdrop.model.CreateTopicVO;
import kafdrop.model.MessageVO;
import kafdrop.model.SearchResultsVO;
import kafdrop.model.TopicVO;
import kafdrop.util.Deserializers;
import kafdrop.util.Serializers;
import org.apache.kafka.common.TopicPartition;

import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.Collection;
import java.util.Date;
import java.util.List;
Expand Down Expand Up @@ -79,5 +83,7 @@ SearchResultsVO searchMessages(String topic,
*/
void deleteTopic(String topic);

RecordMetadata publishMessage(CreateMessageVO message, Serializers serializers);

List<AclVO> getAcls();
}
23 changes: 18 additions & 5 deletions src/main/java/kafdrop/service/KafkaMonitorImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,36 +21,40 @@
import kafdrop.model.AclVO;
import kafdrop.model.BrokerVO;
import kafdrop.model.ClusterSummaryVO;
import kafdrop.model.ConsumerPartitionVO;
import kafdrop.model.ConsumerTopicVO;
import kafdrop.model.ConsumerVO;
import kafdrop.model.ConsumerTopicVO;
import kafdrop.model.ConsumerPartitionVO;
import kafdrop.model.CreateMessageVO;
import kafdrop.model.CreateTopicVO;
import kafdrop.model.MessageVO;
import kafdrop.model.SearchResultsVO;
import kafdrop.model.TopicPartitionVO;
import kafdrop.model.TopicVO;
import kafdrop.model.TopicPartitionVO;
import kafdrop.util.Serializers;
import kafdrop.util.Deserializers;
import org.apache.kafka.clients.admin.ConfigEntry.ConfigSource;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.header.Headers;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;


import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.Set;
import java.util.TreeMap;
import java.util.Map.Entry;
import java.util.function.Function;
import java.util.stream.Collectors;

Expand All @@ -64,9 +68,13 @@ public final class KafkaMonitorImpl implements KafkaMonitor {

private final KafkaHighLevelAdminClient highLevelAdminClient;

public KafkaMonitorImpl(KafkaHighLevelConsumer highLevelConsumer, KafkaHighLevelAdminClient highLevelAdminClient) {
private final KafkaHighLevelProducer highLevelProducer;

public KafkaMonitorImpl(KafkaHighLevelConsumer highLevelConsumer, KafkaHighLevelAdminClient highLevelAdminClient,
KafkaHighLevelProducer highLevelProducer) {
this.highLevelConsumer = highLevelConsumer;
this.highLevelAdminClient = highLevelAdminClient;
this.highLevelProducer = highLevelProducer;
}

@Override
Expand Down Expand Up @@ -419,4 +427,9 @@ private List<ConsumerGroupOffsets> getConsumerOffsets(Set<String> topics) {
.filter(not(ConsumerGroupOffsets::isEmpty))
.collect(Collectors.toList());
}

// Delegates publishing to the high-level producer, which serializes the
// key/value with the supplied serializers and sends synchronously.
@Override
public RecordMetadata publishMessage(CreateMessageVO message, Serializers serializers) {
return highLevelProducer.publishMessage(message, serializers);
}
}
16 changes: 16 additions & 0 deletions src/main/java/kafdrop/service/KafkaProducerException.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package kafdrop.service;

import org.springframework.http.HttpStatus;
import org.springframework.web.bind.annotation.ResponseStatus;

/**
 * Thrown when publishing a message to Kafka fails; mapped to HTTP 500 when it
 * escapes a controller. Always prefer the cause-preserving constructors so the
 * underlying Kafka error is not lost.
 */
@ResponseStatus(HttpStatus.INTERNAL_SERVER_ERROR)
public class KafkaProducerException extends RuntimeException {

  public KafkaProducerException(Throwable cause) {
    super(cause);
  }

  public KafkaProducerException(String message) {
    super(message);
  }

  /** Conventional message-plus-cause constructor (see Throwable(String, Throwable)). */
  public KafkaProducerException(String message, Throwable cause) {
    super(message, cause);
  }
}
36 changes: 36 additions & 0 deletions src/main/java/kafdrop/util/AvroMessageSerializer.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package kafdrop.util;

import java.nio.charset.StandardCharsets;
import java.util.HashMap;

import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;
import io.confluent.kafka.serializers.KafkaAvroSerializer;

/**
 * Serializes message values for a topic via Confluent's Avro serializer backed
 * by a schema registry. The raw UTF-8 bytes of the input string are handed to
 * the serializer (i.e. serialized under Avro's "bytes" handling), mirroring how
 * AvroMessageDeserializer reads them back.
 */
public class AvroMessageSerializer implements MessageSerializer {

  private final String topicName;
  private final KafkaAvroSerializer serializer;

  public AvroMessageSerializer(String topicName, String schemaRegistryUrl, String schemaRegistryAuth) {
    this.topicName = topicName;
    this.serializer = getSerializer(schemaRegistryUrl, schemaRegistryAuth);
  }

  @Override
  public byte[] serializeMessage(String value) {
    // BUG FIX: String.getBytes() with no charset uses the platform default,
    // making output JVM-dependent; always encode as UTF-8.
    final var bytes = value.getBytes(StandardCharsets.UTF_8);
    return serializer.serialize(topicName, bytes);
  }

  /**
   * Configures a KafkaAvroSerializer for value serialization (isKey = false),
   * enabling USER_INFO basic auth only when credentials are supplied.
   */
  private KafkaAvroSerializer getSerializer(String schemaRegistryUrl, String schemaRegistryAuth) {
    final var config = new HashMap<String, Object>();
    config.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);
    if (schemaRegistryAuth != null) {
      config.put(AbstractKafkaAvroSerDeConfig.BASIC_AUTH_CREDENTIALS_SOURCE, "USER_INFO");
      config.put(AbstractKafkaAvroSerDeConfig.USER_INFO_CONFIG, schemaRegistryAuth);
    }
    final var kafkaAvroSerializer = new KafkaAvroSerializer();
    kafkaAvroSerializer.configure(config, false);
    return kafkaAvroSerializer;
  }

}
Loading
Loading