From 341590c84794cff1490542bba1f62446bfc3224d Mon Sep 17 00:00:00 2001
From: jarivs
Date: Tue, 26 Dec 2023 21:56:51 +0800
Subject: [PATCH] [Improve] update kafka avro e2e

---
 .../e2e/connector/kafka/KafkaIT.java          | 135 +++++++++++++++++-
 .../fake_source_to_kafka_avro_format.conf     |   2 +
 ...console.conf => kafka_avro_to_assert.conf} |  16 ++-
 3 files changed, 150 insertions(+), 3 deletions(-)
 rename seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/avro/{kafka_avro_to_console.conf => kafka_avro_to_assert.conf} (87%)

diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java
index 0bf222bf87d9..47d9d4ece365 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java
@@ -17,6 +17,7 @@
 
 package org.apache.seatunnel.e2e.connector.kafka;
 
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
 import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.node.ObjectNode;
 
@@ -36,6 +37,7 @@
 import org.apache.seatunnel.e2e.common.container.TestContainer;
 import org.apache.seatunnel.e2e.common.container.TestContainerId;
 import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
+import org.apache.seatunnel.format.avro.AvroDeserializationSchema;
 import org.apache.seatunnel.format.text.TextSerializationSchema;
 
 import org.apache.kafka.clients.consumer.ConsumerConfig;
@@ -300,11 +302,97 @@ public void testFakeSourceToKafkaAvroFormat(TestContainer container)
         Container.ExecResult execResult =
                 container.executeJob("/avro/fake_source_to_kafka_avro_format.conf");
         Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr());
+        String[] subField = {
+            "c_map",
+            "c_array",
+            "c_string",
+            "c_boolean",
+            "c_tinyint",
+            "c_smallint",
+            "c_int",
+            "c_bigint",
+            "c_float",
+            "c_double",
+            "c_bytes",
+            "c_date",
+            "c_decimal",
+            "c_timestamp"
+        };
+        SeaTunnelDataType<?>[] subFieldTypes = {
+            new MapType<>(BasicType.STRING_TYPE, BasicType.STRING_TYPE),
+            ArrayType.INT_ARRAY_TYPE,
+            BasicType.STRING_TYPE,
+            BasicType.BOOLEAN_TYPE,
+            BasicType.BYTE_TYPE,
+            BasicType.SHORT_TYPE,
+            BasicType.INT_TYPE,
+            BasicType.LONG_TYPE,
+            BasicType.FLOAT_TYPE,
+            BasicType.DOUBLE_TYPE,
+            PrimitiveByteArrayType.INSTANCE,
+            LocalTimeType.LOCAL_DATE_TYPE,
+            new DecimalType(38, 18),
+            LocalTimeType.LOCAL_DATE_TIME_TYPE
+        };
+        SeaTunnelRowType subRow = new SeaTunnelRowType(subField, subFieldTypes);
+        String[] fieldNames = {
+            "c_map",
+            "c_array",
+            "c_string",
+            "c_boolean",
+            "c_tinyint",
+            "c_smallint",
+            "c_int",
+            "c_bigint",
+            "c_float",
+            "c_double",
+            "c_bytes",
+            "c_date",
+            "c_decimal",
+            "c_timestamp",
+            "c_row"
+        };
+        SeaTunnelDataType<?>[] fieldTypes = {
+            new MapType<>(BasicType.STRING_TYPE, BasicType.STRING_TYPE),
+            ArrayType.INT_ARRAY_TYPE,
+            BasicType.STRING_TYPE,
+            BasicType.BOOLEAN_TYPE,
+            BasicType.BYTE_TYPE,
+            BasicType.SHORT_TYPE,
+            BasicType.INT_TYPE,
+            BasicType.LONG_TYPE,
+            BasicType.FLOAT_TYPE,
+            BasicType.DOUBLE_TYPE,
+            PrimitiveByteArrayType.INSTANCE,
+            LocalTimeType.LOCAL_DATE_TYPE,
+            new DecimalType(38, 18),
+            LocalTimeType.LOCAL_DATE_TIME_TYPE,
+            subRow
+        };
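+        // The field names/types above mirror the FakeSource schema declared in
+        // fake_source_to_kafka_avro_format.conf, so the Avro payloads written by the
+        // job can be deserialized back into SeaTunnelRows for the assertions below.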
+        SeaTunnelRowType fakeSourceRowType = new SeaTunnelRowType(fieldNames, fieldTypes);
+        AvroDeserializationSchema avroDeserializationSchema =
+                new AvroDeserializationSchema(fakeSourceRowType);
+        List<SeaTunnelRow> kafkaSTRow =
+                getKafkaSTRow(
+                        "test_avro_topic",
+                        value -> {
+                            try {
+                                return avroDeserializationSchema.deserialize(value.value());
+                            } catch (IOException e) {
+                                throw new RuntimeException(e);
+                            }
+                        });
+        Assertions.assertEquals(100, kafkaSTRow.size());
+        kafkaSTRow.forEach(
+                row -> Assertions.assertEquals("fake_source_string", row.getField(2).toString()));
     }
 
     @TestTemplate
     @DisabledOnContainer(TestContainerId.SPARK_2_4)
-    public void testKafkaAvroToConsole(TestContainer container)
+    public void testKafkaAvroToAssert(TestContainer container)
             throws IOException, InterruptedException {
         DefaultSeaTunnelRowSerializer serializer =
                 DefaultSeaTunnelRowSerializer.create(
@@ -313,7 +401,7 @@ public void testKafkaAvroToConsole(TestContainer container)
                         MessageFormat.AVRO,
                         DEFAULT_FIELD_DELIMITER);
         generateTestData(row -> serializer.serializeRow(row), 0, 100);
-        Container.ExecResult execResult = container.executeJob("/avro/kafka_avro_to_console.conf");
+        Container.ExecResult execResult = container.executeJob("/avro/kafka_avro_to_assert.conf");
         Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr());
     }
 
@@ -373,6 +461,20 @@ private Properties kafkaConsumerConfig() {
         return props;
     }
 
+    // Consumer config that leaves keys/values as raw byte[] so tests can apply
+    // their own deserializers (here: Avro) to the payload.
+    private Properties kafkaByteConsumerConfig() {
+        Properties props = new Properties();
+        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaContainer.getBootstrapServers());
+        props.put(ConsumerConfig.GROUP_ID_CONFIG, "seatunnel-kafka-sink-group");
+        props.put(
+                ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
+                OffsetResetStrategy.EARLIEST.toString().toLowerCase());
+        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
+        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
+        return props;
+    }
+
     private void generateTestData(ProducerRecordConverter converter, int start, int end) {
         for (int i = start; i < end; i++) {
             SeaTunnelRow row =
@@ -480,7 +582,36 @@ private List<String> getKafkaConsumerListData(String topicName) {
         return data;
     }
 
+    // Drains the topic (partition 0) from the beginning, converting each record.
+    private List<SeaTunnelRow> getKafkaSTRow(String topicName, ConsumerRecordConverter converter) {
+        List<SeaTunnelRow> data = new ArrayList<>();
+        try (KafkaConsumer<byte[], byte[]> consumer =
+                new KafkaConsumer<>(kafkaByteConsumerConfig())) {
+            consumer.subscribe(Arrays.asList(topicName));
+            Map<TopicPartition, Long> offsets =
+                    consumer.endOffsets(Arrays.asList(new TopicPartition(topicName, 0)));
+            Long endOffset = offsets.entrySet().iterator().next().getValue();
+            Long lastProcessedOffset = -1L;
+
+            // Keep polling until the record just before the snapshot end offset is consumed.
+            do {
+                ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(100));
+                for (ConsumerRecord<byte[], byte[]> record : records) {
+                    if (lastProcessedOffset < record.offset()) {
+                        data.add(converter.convert(record));
+                    }
+                    lastProcessedOffset = record.offset();
+                }
+            } while (lastProcessedOffset < endOffset - 1);
+        }
+        return data;
+    }
+
     interface ProducerRecordConverter {
         ProducerRecord<byte[], byte[]> convert(SeaTunnelRow row);
     }
+
+    interface ConsumerRecordConverter {
+        SeaTunnelRow convert(ConsumerRecord<byte[], byte[]> value);
+    }
 }
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/avro/fake_source_to_kafka_avro_format.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/avro/fake_source_to_kafka_avro_format.conf
index 99eac77ef1b2..0ea9cadc5765 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/avro/fake_source_to_kafka_avro_format.conf
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/avro/fake_source_to_kafka_avro_format.conf
@@ -29,6 +29,8 @@ env {
 
 source {
   FakeSource {
+    row.num = 100
+    string.template = ["fake_source_string"]
     schema = {
       fields {
         c_map = "map<string, string>"
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/avro/kafka_avro_to_console.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/avro/kafka_avro_to_assert.conf
similarity index 87%
rename from seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/avro/kafka_avro_to_console.conf
rename to seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/avro/kafka_avro_to_assert.conf
index ce5932e744e7..d357eb6583a4 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/avro/kafka_avro_to_console.conf
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/avro/kafka_avro_to_assert.conf
@@ -82,7 +82,21 @@ sink {
             rule_value = 99
           }
         ]
-      }
+      },
+      {
+        field_name = c_string
+        field_type = string
+        field_value = [
+          {
+            rule_type = MIN_LENGTH
+            rule_value = 6
+          },
+          {
+            rule_type = MAX_LENGTH
+            rule_value = 6
+          }
+        ]
+      }
     ]
   }
 }
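-- 
Note on the round-trip under test: the new assertions depend on serializing rows
with DefaultSeaTunnelRowSerializer in AVRO format and reading them back with
AvroDeserializationSchema. A minimal sketch using only calls that appear in this
diff; "row", "fieldNames" and "fieldTypes" stand in for the values built in
testFakeSourceToKafkaAvroFormat, and the create(topic, rowType, format, delimiter)
argument order is assumed from the surrounding KafkaIT code:

    SeaTunnelRowType rowType = new SeaTunnelRowType(fieldNames, fieldTypes);
    DefaultSeaTunnelRowSerializer serializer =
            DefaultSeaTunnelRowSerializer.create(
                    "test_avro_topic", rowType, MessageFormat.AVRO, DEFAULT_FIELD_DELIMITER);
    // Write side: the bytes the Kafka sink publishes for one row.
    ProducerRecord<byte[], byte[]> record = serializer.serializeRow(row);
    // Read side: what the lambda passed to getKafkaSTRow applies to each record;
    // deserialize declares IOException, which the test wraps in a RuntimeException.
    SeaTunnelRow roundTripped = new AvroDeserializationSchema(rowType).deserialize(record.value());
    // roundTripped.getField(2) is c_string and should equal the original value.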