(init) Imap storage supports kafka compact topic in cluster mode apac…
sunxiaojian committed Jul 2, 2023
1 parent b1b1f5e commit 0f974ef
Showing 7 changed files with 455 additions and 0 deletions.
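
The new plugin persists each IMap entry as a key/value record on a log-compacted Kafka topic: compaction keeps the latest record per key, a record with a null value acts as a delete tombstone, and recovery replays the topic from the beginning. A minimal usage sketch against the API added below — the class name, broker address, and topic name are placeholders; the config constants come from KafkaStorageConfig:

import org.apache.seatunnel.engine.imap.storage.api.IMapStorage;
import org.apache.seatunnel.engine.imap.storage.kafka.IMapKafkaStorage;
import org.apache.seatunnel.engine.imap.storage.kafka.config.KafkaStorageConfig;

import java.util.HashMap;
import java.util.Map;

public class ImapKafkaStorageExample {
    public static void main(String[] args) throws Exception {
        Map<String, Object> config = new HashMap<>();
        config.put(KafkaStorageConfig.KAFKA_BOOTSTRAP_SERVERS, "localhost:9092"); // placeholder broker
        config.put(KafkaStorageConfig.KAFKA_STORAGE_COMPACT_TOPIC, "imap-storage"); // placeholder topic

        IMapStorage storage = new IMapKafkaStorage();
        storage.initialize(config);
        storage.store("key", "value"); // appends a record
        storage.delete("key"); // appends a tombstone (null value)
        Map<Object, Object> snapshot = storage.loadAll(); // replays the compacted log
        System.out.println(snapshot);
    }
}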
.gitignore
@@ -0,0 +1,38 @@
target/
!.mvn/wrapper/maven-wrapper.jar
!**/src/main/**/target/
!**/src/test/**/target/

### IntelliJ IDEA ###
.idea/modules.xml
.idea/jarRepositories.xml
.idea/compiler.xml
.idea/libraries/
*.iws
*.iml
*.ipr

### Eclipse ###
.apt_generated
.classpath
.factorypath
.project
.settings
.springBeans
.sts4-cache

### NetBeans ###
/nbproject/private/
/nbbuild/
/dist/
/nbdist/
/.nb-gradle/
build/
!**/src/main/**/build/
!**/src/test/**/build/

### VS Code ###
.vscode/

### Mac OS ###
.DS_Store
imap-storage-kafka/pom.xml
@@ -0,0 +1,31 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.seatunnel</groupId>
<artifactId>imap-storage-plugins</artifactId>
<version>${revision}</version>
</parent>

<groupId>org.apache.seatunnel</groupId>
<artifactId>imap-storage-kafka</artifactId>
<name>SeaTunnel : Engine : Storage : IMap Storage Plugins : Kafka</name>

<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>

<dependencies>
<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>serializer-protobuf</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.4.0</version>
</dependency>
</dependencies>
</project>
imap-storage-kafka/src/main/java/org/apache/seatunnel/engine/imap/storage/kafka/IMapKafkaStorage.java
@@ -0,0 +1,280 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/

package org.apache.seatunnel.engine.imap.storage.kafka;

import org.apache.seatunnel.engine.imap.storage.api.IMapStorage;
import org.apache.seatunnel.engine.imap.storage.kafka.config.KafkaConfig;
import org.apache.seatunnel.engine.imap.storage.kafka.config.KafkaStorageConfig;
import org.apache.seatunnel.engine.serializer.api.Serializer;
import org.apache.seatunnel.engine.serializer.protobuf.ProtoStuffSerializer;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.config.TopicConfig;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.utils.Utils;

import lombok.extern.slf4j.Slf4j;

import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

@Slf4j
public class IMapKafkaStorage implements IMapStorage {

private static final long CREATE_TOPIC_TIMEOUT_MS = 30000;
private KafkaConfig kafkaConfig;

private Consumer<byte[], byte[]> consumer;
private Producer<byte[], byte[]> producer;
    private AdminClient adminClient;

private Serializer serializer;

private List<TopicPartition> partitions = new ArrayList<>();

@Override
public void initialize(Map<String, Object> config) {
String bootstrapServers = config.get(KafkaStorageConfig.KAFKA_BOOTSTRAP_SERVERS).toString();
String compactTopic = config.get(KafkaStorageConfig.KAFKA_STORAGE_COMPACT_TOPIC).toString();
Integer topicReplicationFactor =
Integer.parseInt(
config.getOrDefault(
KafkaStorageConfig
.KAFKA_STORAGE_COMPACT_TOPIC_REPLICATION_FACTOR,
3)
.toString());
Integer topicPartition =
Integer.parseInt(
config.getOrDefault(
KafkaStorageConfig.KAFKA_STORAGE_COMPACT_TOPIC_PARTITION, 1)
.toString());

kafkaConfig =
KafkaConfig.builder()
.bootstrapServers(bootstrapServers)
.storageTopic(compactTopic)
.storageTopicPartition(topicPartition)
.storageTopicReplicationFactor(topicReplicationFactor)
.consumerConfigs(new HashMap<>())
.producerConfigs(new HashMap<>())
.build();

// Init serializer, default ProtoStuffSerializer
this.serializer = new ProtoStuffSerializer();
// create admin client
this.adminClient = createKafkaAdminClient();
        // The storage topic must be a compact topic
checkCompactTopicConfig();
this.consumer = createConsumer();
this.producer = createProducer();
}

    /**
     * Create the producer used to write key/value records to the storage topic.
     *
     * @return a Kafka producer configured for durable, ordered writes
     */
private Producer<byte[], byte[]> createProducer() {
Map<String, Object> producerConfigs = kafkaConfig.getProducerConfigs();
// Always require producer acks to all to ensure durable writes
producerConfigs.put(ProducerConfig.ACKS_CONFIG, "all");

// Don't allow more than one in-flight request to prevent reordering on retry (if enabled)
producerConfigs.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1);
return new KafkaProducer<>(producerConfigs);
}

    /**
     * Create the consumer used to replay the storage topic, assigned to all of its partitions.
     *
     * @return a Kafka consumer positioned to read the full log
     */
private Consumer<byte[], byte[]> createConsumer() {
Map<String, Object> consumerConfigs = kafkaConfig.getConsumerConfigs();
// Always force reset to the beginning of the log since this class wants to consume all
// available log data
consumerConfigs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

// Turn off autocommit since we always want to consume the full log
consumerConfigs.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
Consumer<byte[], byte[]> consumer = new KafkaConsumer<>(consumerConfigs);
// List topic partitions
List<PartitionInfo> partitionInfos = null;
long started = System.currentTimeMillis();
while (partitionInfos == null
&& System.currentTimeMillis() - started < CREATE_TOPIC_TIMEOUT_MS) {
partitionInfos = consumer.partitionsFor(kafkaConfig.getStorageTopic());
Utils.sleep(Math.min(System.currentTimeMillis() - started, 1000));
}
if (partitionInfos == null) {
            throw new RuntimeException(
                    "Could not look up partition metadata for the IMap storage topic in the"
                            + " allotted period. This could indicate a connectivity issue, unavailable topic partitions, or if"
                            + " this is your first use of the topic it may have taken too long to create.");
}
for (PartitionInfo partition : partitionInfos) {
this.partitions.add(new TopicPartition(partition.topic(), partition.partition()));
}
consumer.assign(this.partitions);
return consumer;
}

    private AdminClient createKafkaAdminClient() {
        Map<String, Object> adminConfigs = new HashMap<>();
        adminConfigs.put(
                AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaConfig.getBootstrapServers());
        return AdminClient.create(adminConfigs);
    }

    /** The storage topic must be a compact topic (cleanup.policy=compact). */
    private void checkCompactTopicConfig() {
        String topic = kafkaConfig.getStorageTopic();
        try {
            ConfigResource resource = new ConfigResource(ConfigResource.Type.TOPIC, topic);
            Config topicConfig =
                    adminClient
                            .describeConfigs(Collections.singleton(resource))
                            .all()
                            .get()
                            .get(resource);
            String cleanupPolicy = topicConfig.get(TopicConfig.CLEANUP_POLICY_CONFIG).value();
            if (!TopicConfig.CLEANUP_POLICY_COMPACT.equals(cleanupPolicy)) {
                throw new IllegalArgumentException(
                        "Topic "
                                + topic
                                + " must be a compact topic, but cleanup.policy is "
                                + cleanupPolicy);
            }
        } catch (InterruptedException | ExecutionException e) {
            throw new RuntimeException("Failed to read config of topic " + topic, e);
        }
    }

@Override
public boolean store(Object key, Object value) {
try {
byte[] bKey = serializer.serialize(key);
byte[] bValue = serializer.serialize(value);
Future<RecordMetadata> callback =
producer.send(
new ProducerRecord<>(kafkaConfig.getStorageTopic(), bKey, bValue));
return Objects.nonNull(callback.get());
} catch (IOException | InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
}

@Override
public Set<Object> storeAll(Map<Object, Object> all) {
for (Map.Entry<Object, Object> item : all.entrySet()) {
store(item.getKey(), item.getValue());
}
return all.keySet();
}

@Override
public boolean delete(Object key) {
try {
byte[] bKey = serializer.serialize(key);
            // Send a tombstone record; compaction will remove the key
Future<RecordMetadata> callback =
producer.send(new ProducerRecord<>(kafkaConfig.getStorageTopic(), bKey, null));
return Objects.nonNull(callback.get());
} catch (IOException | InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
}

@Override
public Set<Object> deleteAll(Collection<Object> keys) {
for (Object key : keys) {
delete(key);
}
return new HashSet<>(keys);
}

@Override
public Map<Object, Object> loadAll() throws IOException {

Map<Object, Object> result = new HashMap<>();

// Always consume from the beginning of all partitions.
consumer.seekToBeginning(partitions);
        // Read records until the consumer reaches the end offset of every partition.
Set<TopicPartition> assignment = consumer.assignment();
Map<TopicPartition, Long> endOffsets = consumer.endOffsets(assignment);
log.info("Reading to end of log offsets {}", endOffsets);
while (!endOffsets.isEmpty()) {
Iterator<Map.Entry<TopicPartition, Long>> it = endOffsets.entrySet().iterator();
while (it.hasNext()) {
Map.Entry<TopicPartition, Long> entry = it.next();
TopicPartition topicPartition = entry.getKey();
long endOffset = entry.getValue();
long lastConsumedOffset = consumer.position(topicPartition);
if (lastConsumedOffset >= endOffset) {
log.info("Read to end offset {} for {}", endOffset, topicPartition);
it.remove();
} else {
log.info(
"Behind end offset {} for {}; last-read offset is {}",
endOffset,
topicPartition,
lastConsumedOffset);
                    poll(
                            Integer.MAX_VALUE,
                            record -> {
                                try {
                                    // Assumes keys/values round-trip through the serializer
                                    Object key =
                                            serializer.deserialize(record.key(), Object.class);
                                    if (record.value() == null) {
                                        // Tombstone: the key was deleted
                                        result.remove(key);
                                    } else {
                                        result.put(
                                                key,
                                                serializer.deserialize(
                                                        record.value(), Object.class));
                                    }
                                } catch (IOException e) {
                                    throw new RuntimeException(e);
                                }
                            });
break;
}
}
}
return result;
}

private void poll(
long timeoutMs, java.util.function.Consumer<ConsumerRecord<byte[], byte[]>> accepter) {
try {
ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(timeoutMs));
for (ConsumerRecord<byte[], byte[]> record : records) {
accepter.accept(record);
}
} catch (WakeupException e) {
            // Expected when another thread wakes the consumer during shutdown; let the caller handle it
throw e;
} catch (KafkaException e) {
            log.error("Error polling", e);
}
}

    @Override
    public Set<Object> loadAllKeys() {
        try {
            return loadAll().keySet();
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public void destroy(boolean deleteAllFileFlag) {
        if (deleteAllFileFlag) {
            // Remove the compact topic so no stale cluster state survives a full reset
            adminClient.deleteTopics(Collections.singleton(kafkaConfig.getStorageTopic()));
        }
    }
}
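
The check in checkCompactTopicConfig validates an existing topic, so the compacted storage topic has to exist before the engine starts. A minimal sketch of pre-creating it with the Kafka AdminClient, assuming the defaults from initialize (1 partition, replication factor 3); the class name, broker address, and topic name are placeholders:

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.config.TopicConfig;

import java.util.Collections;

public class CreateImapStorageTopic {
    public static void main(String[] args) throws Exception {
        try (AdminClient admin =
                AdminClient.create(
                        Collections.singletonMap(
                                AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"))) {
            NewTopic topic =
                    new NewTopic("imap-storage", 1, (short) 3)
                            .configs(
                                    Collections.singletonMap(
                                            TopicConfig.CLEANUP_POLICY_CONFIG,
                                            TopicConfig.CLEANUP_POLICY_COMPACT));
            // cleanup.policy=compact makes Kafka retain only the latest record per key
            admin.createTopics(Collections.singleton(topic)).all().get();
        }
    }
}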