[Improve] update kafka avro e2e
liunaijie committed Dec 27, 2023
1 parent b051e33 commit 341590c
Showing 3 changed files with 143 additions and 3 deletions.
@@ -17,6 +17,7 @@

package org.apache.seatunnel.e2e.connector.kafka;

import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.node.ObjectNode;

@@ -36,6 +37,7 @@
import org.apache.seatunnel.e2e.common.container.TestContainer;
import org.apache.seatunnel.e2e.common.container.TestContainerId;
import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
import org.apache.seatunnel.format.avro.AvroDeserializationSchema;
import org.apache.seatunnel.format.text.TextSerializationSchema;

import org.apache.kafka.clients.consumer.ConsumerConfig;
@@ -300,11 +302,94 @@ public void testFakeSourceToKafkaAvroFormat(TestContainer container)
Container.ExecResult execResult =
container.executeJob("/avro/fake_source_to_kafka_avro_format.conf");
Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr());
String[] subField = {
"c_map",
"c_array",
"c_string",
"c_boolean",
"c_tinyint",
"c_smallint",
"c_int",
"c_bigint",
"c_float",
"c_double",
"c_bytes",
"c_date",
"c_decimal",
"c_timestamp"
};
SeaTunnelDataType<?>[] subFieldTypes = {
new MapType<>(BasicType.STRING_TYPE, BasicType.STRING_TYPE),
ArrayType.INT_ARRAY_TYPE,
BasicType.STRING_TYPE,
BasicType.BOOLEAN_TYPE,
BasicType.BYTE_TYPE,
BasicType.SHORT_TYPE,
BasicType.INT_TYPE,
BasicType.LONG_TYPE,
BasicType.FLOAT_TYPE,
BasicType.DOUBLE_TYPE,
PrimitiveByteArrayType.INSTANCE, // c_bytes is a byte[] field, not a single byte
LocalTimeType.LOCAL_DATE_TYPE,
new DecimalType(38, 18),
LocalTimeType.LOCAL_DATE_TIME_TYPE
};
SeaTunnelRowType subRow = new SeaTunnelRowType(subField, subFieldTypes);
String[] fieldNames = {
"c_map",
"c_array",
"c_string",
"c_boolean",
"c_tinyint",
"c_smallint",
"c_int",
"c_bigint",
"c_float",
"c_double",
"c_bytes",
"c_date",
"c_decimal",
"c_timestamp",
"c_row"
};
SeaTunnelDataType<?>[] fieldTypes = {
new MapType<>(BasicType.STRING_TYPE, BasicType.STRING_TYPE),
ArrayType.INT_ARRAY_TYPE,
BasicType.STRING_TYPE,
BasicType.BOOLEAN_TYPE,
BasicType.BYTE_TYPE,
BasicType.SHORT_TYPE,
BasicType.INT_TYPE,
BasicType.LONG_TYPE,
BasicType.FLOAT_TYPE,
BasicType.DOUBLE_TYPE,
PrimitiveByteArrayType.INSTANCE, // c_bytes is a byte[] field, not a single byte
LocalTimeType.LOCAL_DATE_TYPE,
new DecimalType(38, 18),
LocalTimeType.LOCAL_DATE_TIME_TYPE,
subRow
};
SeaTunnelRowType fakeSourceRowType = new SeaTunnelRowType(fieldNames, fieldTypes);
AvroDeserializationSchema avroDeserializationSchema =
new AvroDeserializationSchema(fakeSourceRowType);
List<SeaTunnelRow> kafkaSTRow =
getKafkaSTRow(
"test_avro_topic",
value -> {
try {
return avroDeserializationSchema.deserialize(value.value());
} catch (IOException e) {
throw new RuntimeException(e);
}
});
Assertions.assertEquals(100, kafkaSTRow.size());
kafkaSTRow.forEach(
row -> Assertions.assertEquals("fake_source_string", row.getField(2).toString()));
}

@TestTemplate
@DisabledOnContainer(TestContainerId.SPARK_2_4)
public void testKafkaAvroToConsole(TestContainer container)
public void testKafkaAvroToAssert(TestContainer container)
throws IOException, InterruptedException {
DefaultSeaTunnelRowSerializer serializer =
DefaultSeaTunnelRowSerializer.create(
@@ -313,7 +398,7 @@
MessageFormat.AVRO,
DEFAULT_FIELD_DELIMITER);
generateTestData(row -> serializer.serializeRow(row), 0, 100);
Container.ExecResult execResult = container.executeJob("/avro/kafka_avro_to_console.conf");
Container.ExecResult execResult = container.executeJob("/avro/kafka_avro_to_assert.conf");
Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr());
}

@@ -373,6 +458,18 @@ private Properties kafkaConsumerConfig() {
return props;
}

// Consumer config that keeps key and value as raw byte[] so Avro payloads can be decoded manually.
private Properties kafkaByteConsumerConfig() {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaContainer.getBootstrapServers());
props.put(ConsumerConfig.GROUP_ID_CONFIG, "seatunnel-kafka-sink-group");
props.put(
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
OffsetResetStrategy.EARLIEST.toString().toLowerCase());
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
return props;
}

private void generateTestData(ProducerRecordConverter converter, int start, int end) {
for (int i = start; i < end; i++) {
SeaTunnelRow row =
@@ -480,7 +577,34 @@ private List<String> getKafkaConsumerListData(String topicName) {
return data;
}

// Drains every record currently in the topic as raw bytes and converts each one into a SeaTunnelRow.
private List<SeaTunnelRow> getKafkaSTRow(String topicName, ConsumerRecordConverter converter) {
List<SeaTunnelRow> data = new ArrayList<>();
try (KafkaConsumer<byte[], byte[]> consumer =
new KafkaConsumer<>(kafkaByteConsumerConfig())) {
consumer.subscribe(Arrays.asList(topicName));
// Only partition 0 is inspected, which assumes the single-partition topics used by these tests.
Map<TopicPartition, Long> offsets =
consumer.endOffsets(Arrays.asList(new TopicPartition(topicName, 0)));
Long endOffset = offsets.entrySet().iterator().next().getValue();
Long lastProcessedOffset = -1L;

// Poll until every record written before this call has been read back.
do {
ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<byte[], byte[]> record : records) {
if (lastProcessedOffset < record.offset()) {
data.add(converter.convert(record));
}
lastProcessedOffset = record.offset();
}
} while (lastProcessedOffset < endOffset - 1);
}
return data;
}

interface ProducerRecordConverter {
ProducerRecord<byte[], byte[]> convert(SeaTunnelRow row);
}

interface ConsumerRecordConverter {
SeaTunnelRow convert(ConsumerRecord<byte[], byte[]> value);
}
}
@@ -29,6 +29,8 @@ env {

source {
FakeSource {
row.num = 100
string.template = ["fake_source_string"]
schema = {
fields {
c_map = "map<string, string>"
@@ -82,7 +82,21 @@ sink {
rule_value = 99
}
]
}
},
{
field_name = c_string
field_type = string
field_value = [
{
rule_type = MIN_LENGTH
rule_value = 6
},
{
rule_type = MAX_LENGTH
rule_value = 6
}
]
}
]
}
}
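
The two tests above exercise the Avro path in opposite directions: FakeSource -> Kafka (then read back with AvroDeserializationSchema) and Kafka -> Assert (after writing with the Avro serializer). The sketch below is illustrative only and not part of the commit: the method name avroRoundTripSketch, the two-field row type, and the sample values are made up, and the argument order of DefaultSeaTunnelRowSerializer.create(...) is an assumption, since only the MessageFormat.AVRO and DEFAULT_FIELD_DELIMITER arguments are visible in the hunk above. It assumes the same test class, so its imports and the DEFAULT_FIELD_DELIMITER constant are in scope.

// Illustrative only -- a compact Avro round trip in the style of the tests above.
private void avroRoundTripSketch() throws IOException {
    SeaTunnelRowType rowType =
            new SeaTunnelRowType(
                    new String[] {"c_string", "c_int"},
                    new SeaTunnelDataType<?>[] {BasicType.STRING_TYPE, BasicType.INT_TYPE});
    // Assumed argument order: topic, row type, format, field delimiter.
    DefaultSeaTunnelRowSerializer serializer =
            DefaultSeaTunnelRowSerializer.create(
                    "test_avro_topic", rowType, MessageFormat.AVRO, DEFAULT_FIELD_DELIMITER);
    AvroDeserializationSchema deserializer = new AvroDeserializationSchema(rowType);

    SeaTunnelRow original = new SeaTunnelRow(new Object[] {"fake_source_string", 1});
    byte[] payload = serializer.serializeRow(original).value(); // Avro-encoded value bytes
    SeaTunnelRow decoded = deserializer.deserialize(payload); // back to a SeaTunnelRow
    Assertions.assertEquals(original.getField(0), decoded.getField(0));
}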