From cd93772c5019bff66b509f2a2fcac740ce8d8a7f Mon Sep 17 00:00:00 2001 From: sunxiaojian Date: Wed, 12 Jul 2023 12:21:41 +0800 Subject: [PATCH] fixed e2e and config DEBEZIUM_RECORD_INCLUDE_SCHEMA --- docs/en/connector-v2/source/kafka.md | 1 + .../seatunnel/kafka/config/Config.java | 8 +- .../seatunnel/kafka/config/MessageFormat.java | 5 +- .../DefaultSeaTunnelRowSerializer.java | 4 +- .../seatunnel/kafka/source/KafkaSource.java | 8 +- .../kafka/source/KafkaSourceFactory.java | 1 + .../connector/kafka/DebeziumToKafkaIT.java | 276 +++++++++--------- .../resources/debezium/register-mysql.json | 10 +- .../kafkasource_debezium_cdc_to_pgsql.conf | 9 +- .../kafkasource_debezium_to_kafka.conf | 6 +- .../DebeziumJsonDeserializationSchema.java | 32 +- .../debezium/DebeziumJsonSerDeSchemaTest.java | 2 +- 12 files changed, 204 insertions(+), 158 deletions(-) diff --git a/docs/en/connector-v2/source/kafka.md b/docs/en/connector-v2/source/kafka.md index 68561c8380e7..2ed6ec6f12e4 100644 --- a/docs/en/connector-v2/source/kafka.md +++ b/docs/en/connector-v2/source/kafka.md @@ -222,6 +222,7 @@ source { - Add Kafka Source Connector ### Next Version + - [Improve] Support setting read starting offset or time at startup config ([3157](https://github.com/apache/incubator-seatunnel/pull/3157)) - [Improve] Support for dynamic discover topic & partition in streaming mode ([3125](https://github.com/apache/incubator-seatunnel/pull/3125)) - [Improve] Change Connector Custom Config Prefix To Map [3719](https://github.com/apache/incubator-seatunnel/pull/3719) diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/Config.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/Config.java index be1a5df32e22..f126e563fbb8 100644 --- a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/Config.java +++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/Config.java @@ -26,8 +26,6 @@ public class Config { public static final String CONNECTOR_IDENTITY = "Kafka"; - public static final String REPLICATION_FACTOR = "replication.factor"; - public static final String DEBEZIUM_FORMAT = "debezium-json"; /** The default field delimiter is “,” */ public static final String DEFAULT_FIELD_DELIMITER = ","; @@ -99,6 +97,12 @@ public class Config { "Data format. The default format is json. Optional text format. The default field separator is \", \". " + "If you customize the delimiter, add the \"field_delimiter\" option."); + public static final Option DEBEZIUM_RECORD_INCLUDE_SCHEMA = + Options.key("debezium_record_include_schema") + .booleanType() + .defaultValue(true) + .withDescription("Does the debezium record carry a schema."); + public static final Option FIELD_DELIMITER = Options.key("field_delimiter") .stringType() diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/MessageFormat.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/MessageFormat.java index 2b0915d30e1d..1ef29f6322a3 100644 --- a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/MessageFormat.java +++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/MessageFormat.java @@ -21,7 +21,6 @@ public enum MessageFormat { JSON, TEXT, CANAL_JSON, - COMPATIBLE_DEBEZIUM_JSON, - DEBEZIUM_FORMAT - + DEBEZIUM_JSON, + COMPATIBLE_DEBEZIUM_JSON } diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/serialize/DefaultSeaTunnelRowSerializer.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/serialize/DefaultSeaTunnelRowSerializer.java index 5a6de56f0742..f8974d0f1a9e 100644 --- a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/serialize/DefaultSeaTunnelRowSerializer.java +++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/serialize/DefaultSeaTunnelRowSerializer.java @@ -220,10 +220,10 @@ private static SerializationSchema createSerializationSchema( .build(); case CANAL_JSON: return new CanalJsonSerializationSchema(rowType); + case DEBEZIUM_JSON: + return new DebeziumJsonSerializationSchema(rowType); case COMPATIBLE_DEBEZIUM_JSON: return new CompatibleDebeziumJsonSerializationSchema(rowType, isKey); - case DEBEZIUM_FORMAT: - return new DebeziumJsonSerializationSchema(rowType); default: throw new SeaTunnelJsonFormatException( CommonErrorCode.UNSUPPORTED_DATA_TYPE, "Unsupported format: " + format); diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java index b7e0deb8ffb8..30878e82a2c4 100644 --- a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java +++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java @@ -63,6 +63,7 @@ import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.BOOTSTRAP_SERVERS; import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.COMMIT_ON_CHECKPOINT; import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.CONSUMER_GROUP; +import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.DEBEZIUM_RECORD_INCLUDE_SCHEMA; import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.DEFAULT_FIELD_DELIMITER; import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.FIELD_DELIMITER; import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.FORMAT; @@ -268,7 +269,12 @@ private void setDeserialization(Config config) { .build(); break; case DEBEZIUM_JSON: - deserializationSchema = new DebeziumJsonDeserializationSchema(typeInfo, true); + boolean includeSchema = DEBEZIUM_RECORD_INCLUDE_SCHEMA.defaultValue(); + if (config.hasPath(DEBEZIUM_RECORD_INCLUDE_SCHEMA.key())) { + includeSchema = config.getBoolean(DEBEZIUM_RECORD_INCLUDE_SCHEMA.key()); + } + deserializationSchema = + new DebeziumJsonDeserializationSchema(typeInfo, true, includeSchema); break; default: throw new SeaTunnelJsonFormatException( diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceFactory.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceFactory.java index daa75385e4d6..21057040ec2e 100644 --- a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceFactory.java +++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceFactory.java @@ -46,6 +46,7 @@ public OptionRule optionRule() { Config.KAFKA_CONFIG, Config.SCHEMA, Config.FORMAT, + Config.DEBEZIUM_RECORD_INCLUDE_SCHEMA, Config.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS) .conditional(Config.START_MODE, StartMode.TIMESTAMP, Config.START_MODE_TIMESTAMP) .conditional( diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/DebeziumToKafkaIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/DebeziumToKafkaIT.java index c5995c3207c8..50a291b08a45 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/DebeziumToKafkaIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/DebeziumToKafkaIT.java @@ -17,6 +17,8 @@ package org.apache.seatunnel.e2e.connector.kafka; +import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.testutils.MySqlContainer; +import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.testutils.MySqlVersion; import org.apache.seatunnel.e2e.common.TestResource; import org.apache.seatunnel.e2e.common.TestSuiteBase; import org.apache.seatunnel.e2e.common.container.ContainerExtendedFactory; @@ -24,17 +26,11 @@ import org.apache.seatunnel.e2e.common.container.TestContainer; import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer; import org.apache.seatunnel.e2e.common.junit.TestContainerExtension; -import org.apache.seatunnel.e2e.common.util.ContainerUtil; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; -import org.apache.kafka.clients.consumer.OffsetResetStrategy; -import org.apache.kafka.clients.producer.KafkaProducer; -import org.apache.kafka.clients.producer.ProducerConfig; -import org.apache.kafka.common.serialization.StringDeserializer; -import org.apache.kafka.common.serialization.StringSerializer; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.Assertions; @@ -47,12 +43,14 @@ import org.testcontainers.containers.KafkaContainer; import org.testcontainers.containers.PostgreSQLContainer; import org.testcontainers.containers.output.Slf4jLogConsumer; +import org.testcontainers.containers.wait.strategy.HttpWaitStrategy; import org.testcontainers.lifecycle.Startables; import org.testcontainers.shaded.org.awaitility.Awaitility; import org.testcontainers.utility.DockerImageName; import org.testcontainers.utility.DockerLoggerFactory; import org.testcontainers.utility.MountableFile; +import com.google.common.collect.Lists; import lombok.extern.slf4j.Slf4j; import java.io.IOException; @@ -60,7 +58,6 @@ import java.net.InetAddress; import java.net.NetworkInterface; import java.net.SocketException; -import java.nio.file.Path; import java.sql.Connection; import java.sql.DriverManager; import java.sql.ResultSet; @@ -79,6 +76,7 @@ import java.util.stream.Stream; import static org.awaitility.Awaitility.given; +import static org.testcontainers.shaded.org.awaitility.Awaitility.await; @DisabledOnContainer( value = {}, @@ -90,7 +88,7 @@ public class DebeziumToKafkaIT extends TestSuiteBase implements TestResource { private static GenericContainer DEBEZIUM_CONTAINER; - private static final String DEBEZIUM_DOCKER_IMAGE = "debezium/connect:2.1"; + private static final String DEBEZIUM_DOCKER_IMAGE = "quay.io/debezium/connect:2.3.0.Final"; private static final String DEBEZIUM_HOST = "debezium_e2e"; @@ -100,25 +98,16 @@ public class DebeziumToKafkaIT extends TestSuiteBase implements TestResource { // kafka private static final String KAFKA_IMAGE_NAME = "confluentinc/cp-kafka:6.2.1"; - private static final int KAFKA_PORT = 9095; - - private static final String KAFKA_HOST = "kafka"; - - private KafkaProducer producer; + private static final String KAFKA_HOST = "kafka_dbz_e2e"; + private KafkaConsumer kafkaConsumer; private KafkaContainer KAFKA_CONTAINER; + private String KAFKA_TOPIC = "test-debezium-sink"; // ---------------------------------------------------------------------------- // mysql - private static final String MYSQL_IMAGE = "quay.io/debezium/example-mysql:2.1"; - private static final String MYSQL_HOST = "mysql"; - - private static final int MYSQL_PORT = 3306; - - private static final int MYSQL_EXPOSE_PORT = 3307; - - private static GenericContainer MYSQL_CONTAINER; + private static MySqlContainer MYSQL_CONTAINER; // ---------------------------------------------------------------------------- // postgres @@ -134,20 +123,6 @@ public class DebeziumToKafkaIT extends TestSuiteBase implements TestResource { @TestContainerExtension private final ContainerExtendedFactory extendedFactory = container -> { - Path jsonPath = - ContainerUtil.getResourcesFile("/debezium/register-mysql.json").toPath(); - container.copyFileToContainer( - MountableFile.forHostPath(jsonPath), - "/tmp/seatunnel/plugins/Jdbc/register-mysql.json"); - Container.ExecResult extraCommand = - container.execInContainer( - "bash", - "-c", - "cd /tmp/seatunnel/plugins/Jdbc && curl -i -X POST -H \"Accept:application/json\" -H \"Content-Type:application/json\" http://" - + getLinuxLocalIp() - + ":8083/connectors/ -d @register-mysql.json"); - Assertions.assertEquals(0, extraCommand.getExitCode()); - Container.ExecResult extraCommands = container.execInContainer( "bash", @@ -155,27 +130,31 @@ public class DebeziumToKafkaIT extends TestSuiteBase implements TestResource { "mkdir -p /tmp/seatunnel/plugins/Jdbc/lib && cd /tmp/seatunnel/plugins/Jdbc/lib && curl -O " + PG_DRIVER_JAR); Assertions.assertEquals(0, extraCommands.getExitCode()); - Thread.sleep(1000); - given().ignoreExceptions() - .await() - .atLeast(100, TimeUnit.MILLISECONDS) - .pollInterval(500, TimeUnit.MILLISECONDS) - .atMost(2, TimeUnit.MINUTES) - .untilAsserted(this::updateMysqlTable); }; private void createDebeziumContainer() { DEBEZIUM_CONTAINER = new GenericContainer<>(DEBEZIUM_DOCKER_IMAGE) + .withCopyFileToContainer( + MountableFile.forClasspathResource("/debezium/register-mysql.json"), + "/tmp/seatunnel/plugins/Jdbc/register-mysql.json") .withNetwork(NETWORK) .withNetworkAliases(DEBEZIUM_HOST) + .withExposedPorts(DEBEZIUM_PORT) .withEnv("GROUP_ID", "1") .withEnv("CONFIG_STORAGE_TOPIC", "my-connect-configs") .withEnv("OFFSET_STORAGE_TOPIC", "my-connect-offsets") - .withEnv("BOOTSTRAP_SERVERS", "kafka:9092") + .withEnv("STATUS_STORAGE_TOPIC", "my-connect-status") + .withEnv("BOOTSTRAP_SERVERS", KAFKA_HOST + ":9092") .withLogConsumer( new Slf4jLogConsumer( - DockerLoggerFactory.getLogger(DEBEZIUM_DOCKER_IMAGE))); + DockerLoggerFactory.getLogger(DEBEZIUM_DOCKER_IMAGE))) + .dependsOn(KAFKA_CONTAINER, MYSQL_CONTAINER); + DEBEZIUM_CONTAINER.setWaitStrategy( + (new HttpWaitStrategy()) + .forPath("/connectors") + .forPort(DEBEZIUM_PORT) + .withStartupTimeout(Duration.ofSeconds(120))); DEBEZIUM_CONTAINER.setPortBindings( com.google.common.collect.Lists.newArrayList( String.format("%s:%s", DEBEZIUM_PORT, DEBEZIUM_PORT))); @@ -189,35 +168,30 @@ private void createKafkaContainer() { .withLogConsumer( new Slf4jLogConsumer( DockerLoggerFactory.getLogger(KAFKA_IMAGE_NAME))); - KAFKA_CONTAINER.setPortBindings( - com.google.common.collect.Lists.newArrayList( - String.format("%s:%s", KAFKA_PORT, KAFKA_PORT))); } - private void createMysqlContainer() throws ClassNotFoundException { + private void createMysqlContainer() { MYSQL_CONTAINER = - new GenericContainer<>(DockerImageName.parse(MYSQL_IMAGE)) + new MySqlContainer(MySqlVersion.V8_0) + .withConfigurationOverride("docker/server-gtids/my.cnf") + .withSetupSQL("docker/setup.sql") .withNetwork(NETWORK) .withNetworkAliases(MYSQL_HOST) - .withExposedPorts(MYSQL_PORT) - .withEnv("MYSQL_ROOT_PASSWORD", "debezium") - .withEnv("MYSQL_USER", "mysqluser") - .withEnv("MYSQL_PASSWORD", "mysqlpw") - .withLogConsumer( - new Slf4jLogConsumer(DockerLoggerFactory.getLogger(MYSQL_IMAGE))); - MYSQL_CONTAINER.setPortBindings( - com.google.common.collect.Lists.newArrayList( - String.format("%s:%s", MYSQL_EXPOSE_PORT, MYSQL_PORT))); + .withDatabaseName("debezium") + .withUsername("st_user") + .withPassword("seatunnel") + .withLogConsumer(new Slf4jLogConsumer(LOG)); } - private void createPostgreSQLContainer() throws ClassNotFoundException { + private void createPostgreSQLContainer() { POSTGRESQL_CONTAINER = new PostgreSQLContainer<>(DockerImageName.parse(PG_IMAGE)) .withNetwork(NETWORK) - .withNetworkAliases("postgresql") - .withExposedPorts(PG_PORT) + .withNetworkAliases("postgresql_e2e") .withLogConsumer( new Slf4jLogConsumer(DockerLoggerFactory.getLogger(PG_IMAGE))); + POSTGRESQL_CONTAINER.setPortBindings( + Lists.newArrayList(String.format("%s:%s", PG_PORT, PG_PORT))); } @BeforeAll @@ -226,51 +200,83 @@ public void startUp() throws Exception { LOG.info("The first stage: Starting Kafka containers..."); createKafkaContainer(); Startables.deepStart(Stream.of(KAFKA_CONTAINER)).join(); - LOG.info("Kafka Containers are started"); LOG.info("The second stage: Starting Mysql containers..."); createMysqlContainer(); Startables.deepStart(Stream.of(MYSQL_CONTAINER)).join(); - LOG.info("Mysql Containers are started"); - - Awaitility.given() - .ignoreExceptions() - .atLeast(100, TimeUnit.MILLISECONDS) - .pollInterval(500, TimeUnit.MILLISECONDS) - .atMost(180, TimeUnit.SECONDS) - .untilAsserted(() -> initKafkaProducer()); LOG.info("The third stage: Starting Debezium Connector containers..."); createDebeziumContainer(); Startables.deepStart(Stream.of(DEBEZIUM_CONTAINER)).join(); - LOG.info("Debezium Connector Containers are started"); LOG.info("The fourth stage: Starting PostgreSQL container..."); createPostgreSQLContainer(); Startables.deepStart(Stream.of(POSTGRESQL_CONTAINER)).join(); Class.forName(POSTGRESQL_CONTAINER.getDriverClassName()); - LOG.info("postgresql Containers are started"); + + Awaitility.given() + .ignoreExceptions() + .atLeast(100, TimeUnit.MILLISECONDS) + .pollInterval(500, TimeUnit.MILLISECONDS) + .atMost(3, TimeUnit.MINUTES) + .untilAsserted(this::initializeSourceTableData); + + given().ignoreExceptions() + .atLeast(100, TimeUnit.MILLISECONDS) + .pollInterval(500, TimeUnit.MILLISECONDS) + .atMost(3, TimeUnit.MINUTES) + .untilAsserted(this::initKafkaConsumer); given().ignoreExceptions() .await() .atLeast(100, TimeUnit.MILLISECONDS) .pollInterval(500, TimeUnit.MILLISECONDS) - .atMost(2, TimeUnit.MINUTES) - .untilAsserted(this::initializeJdbcTable); + .atMost(5, TimeUnit.MINUTES) + .untilAsserted(this::initializeSinkJdbcTable); + + Container.ExecResult extraCommand = + DEBEZIUM_CONTAINER.execInContainer( + "bash", + "-c", + "cd /tmp/seatunnel/plugins/Jdbc && curl -i -X POST -H \"Accept:application/json\" -H \"Content-Type:application/json\" http://" + + getLinuxLocalIp() + + ":8083/connectors/ -d @register-mysql.json"); + Assertions.assertEquals(0, extraCommand.getExitCode()); + // ensure debezium has handled the data + Thread.sleep(30 * 1000); + updateSourceTableData(); + Thread.sleep(30 * 1000); } @AfterAll @Override public void tearDown() throws Exception { - if (producer != null) { - producer.close(); - } MYSQL_CONTAINER.close(); KAFKA_CONTAINER.close(); DEBEZIUM_CONTAINER.close(); POSTGRESQL_CONTAINER.close(); } + @TestTemplate + public void testKafkaSinkDebeziumFormat(TestContainer container) + throws IOException, InterruptedException { + Container.ExecResult execResult = + container.executeJob("/kafkasource_debezium_to_kafka.conf"); + Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr()); + ArrayList result = new ArrayList<>(); + kafkaConsumer.subscribe(Lists.newArrayList(KAFKA_TOPIC)); + await().atMost(60000, TimeUnit.MILLISECONDS) + .untilAsserted( + () -> { + ConsumerRecords consumerRecords = + kafkaConsumer.poll(Duration.ofMillis(1000)); + for (ConsumerRecord record : consumerRecords) { + result.add(record.value()); + } + Assertions.assertEquals(12, result.size()); + }); + } + @TestTemplate public void testDebeziumFormatKafkaCdcToPgsql(TestContainer container) throws IOException, InterruptedException, SQLException { @@ -284,7 +290,7 @@ public void testDebeziumFormatKafkaCdcToPgsql(TestContainer container) POSTGRESQL_CONTAINER.getUsername(), POSTGRESQL_CONTAINER.getPassword())) { try (Statement statement = connection.createStatement()) { - ResultSet resultSet = statement.executeQuery("select * from sink"); + ResultSet resultSet = statement.executeQuery("select * from sink order by id"); while (resultSet.next()) { List row = Arrays.asList( @@ -298,6 +304,7 @@ public void testDebeziumFormatKafkaCdcToPgsql(TestContainer container) } Set> expected = Stream.>of( + Arrays.asList(101, "scooter", "Small 2-wheel scooter", "4.56"), Arrays.asList(102, "car battery", "12V car battery", "8.1"), Arrays.asList( 103, @@ -307,42 +314,63 @@ public void testDebeziumFormatKafkaCdcToPgsql(TestContainer container) Arrays.asList(104, "hammer", "12oz carpenter's hammer", "0.75"), Arrays.asList(105, "hammer", "14oz carpenter's hammer", "0.875"), Arrays.asList(106, "hammer", "16oz carpenter's hammer", "1"), + Arrays.asList(107, "rocks", "box of assorted rocks", "5.3"), Arrays.asList( - 108, "jacket", "water resistent black wind breaker", "0.1"), - Arrays.asList(101, "scooter", "Small 2-wheel scooter", "4.56"), - Arrays.asList(107, "rocks", "box of assorted rocks", "5.3")) + 108, "jacket", "water resistent black wind breaker", "0.1")) .collect(Collectors.toSet()); Assertions.assertIterableEquals(expected, actual); } - private void initKafkaProducer() { - Properties props = new Properties(); - String bootstrapServers = KAFKA_CONTAINER.getBootstrapServers(); - props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); - props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); - props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); - producer = new KafkaProducer<>(props); + public void initializeSourceTableData() throws Exception { + try (Connection connection = + DriverManager.getConnection( + MYSQL_CONTAINER.getJdbcUrl(), + MYSQL_CONTAINER.getUsername(), + MYSQL_CONTAINER.getPassword()); + Statement statement = connection.createStatement()) { + statement.execute("create database if not exists debezium"); + statement.execute( + "CREATE TABLE if not exists debezium.products (\n" + + " id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,\n" + + " name VARCHAR(255) NOT NULL DEFAULT 'SeaTunnel',\n" + + " description VARCHAR(512),\n" + + " weight VARCHAR(512)\n" + + ");"); + statement.execute( + "INSERT INTO debezium.products\n" + + "VALUES (101,\"scooter\",\"Small 2-wheel scooter\",\"3.14\"),\n" + + " (102,\"car battery\",\"12V car battery\",\"8.1\"),\n" + + " (103,\"12-pack drill bits\",\"12-pack of drill bits with sizes ranging from #40 to #3\"," + + "\"0.8\"),\n" + + " (104,\"hammer\",\"12oz carpenter's hammer\",\"0.75\"),\n" + + " (105,\"hammer\",\"14oz carpenter's hammer\",\"0.875\"),\n" + + " (106,\"hammer\",\"16oz carpenter's hammer\",\"1.0\"),\n" + + " (107,\"rocks\",\"box of assorted rocks\",\"5.3\"),\n" + + " (108,\"jacket\",\"water resistent black wind breaker\",\"0.1\"),\n" + + " (109,\"spare tire\",\"24 inch spare tire\",\"22.2\")"); + } } - private Properties kafkaConsumerConfig() { - Properties props = new Properties(); - props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_CONTAINER.getBootstrapServers()); - props.put(ConsumerConfig.GROUP_ID_CONFIG, "seatunnel-debezium-sink-group"); - props.put( - ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, - OffsetResetStrategy.EARLIEST.toString().toLowerCase()); - props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); - props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); - return props; + public void updateSourceTableData() throws Exception { + try (Connection connection = + DriverManager.getConnection( + MYSQL_CONTAINER.getJdbcUrl(), + MYSQL_CONTAINER.getUsername(), + MYSQL_CONTAINER.getPassword()); + Statement statement = connection.createStatement()) { + statement.execute( + "UPDATE debezium.products SET weight = '4.56' WHERE name = 'scooter'"); + statement.execute("DELETE FROM debezium.products WHERE name = \"spare tire\""); + } } - private void initializeJdbcTable() { + private void initializeSinkJdbcTable() { try (Connection connection = - DriverManager.getConnection( - POSTGRESQL_CONTAINER.getJdbcUrl(), - POSTGRESQL_CONTAINER.getUsername(), - POSTGRESQL_CONTAINER.getPassword())) { - Statement statement = connection.createStatement(); + DriverManager.getConnection( + POSTGRESQL_CONTAINER.getJdbcUrl(), + POSTGRESQL_CONTAINER.getUsername(), + POSTGRESQL_CONTAINER.getPassword()); + Statement statement = connection.createStatement()) { String sink = "create table sink(\n" + "id INT NOT NULL PRIMARY KEY,\n" @@ -356,32 +384,20 @@ private void initializeJdbcTable() { } } - private void updateMysqlTable() { - try (Connection connection = - DriverManager.getConnection( - "jdbc:mysql://0.0.0.0:3307/inventory", "mysqluser", "mysqlpw")) { - Statement stmt = connection.createStatement(); - stmt.execute("SET FOREIGN_KEY_CHECKS=0"); - stmt.execute("UPDATE products SET weight = '4.56' WHERE name = 'scooter'"); // update - stmt.execute("DELETE FROM products WHERE name = 'spare tire'"); // delete - } catch (SQLException e) { - throw new RuntimeException("Update Mysql table failed!", e); - } - } - - private ArrayList getKafkaConsumerData(String topicName) { - ArrayList data = new ArrayList<>(); - try (KafkaConsumer consumer = new KafkaConsumer<>(kafkaConsumerConfig())) { - ArrayList topics = new ArrayList<>(); - topics.add(topicName); - consumer.subscribe(topics); - ConsumerRecords consumerRecords = - consumer.poll(Duration.ofSeconds(10000)); - for (ConsumerRecord record : consumerRecords) { - data.add(record.value()); - } - } - return data; + private void initKafkaConsumer() { + Properties prop = new Properties(); + String bootstrapServers = KAFKA_CONTAINER.getBootstrapServers(); + prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); + prop.put( + ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, + "org.apache.kafka.common.serialization.StringDeserializer"); + prop.put( + ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, + "org.apache.kafka.common.serialization.StringDeserializer"); + prop.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + prop.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true); + prop.put(ConsumerConfig.GROUP_ID_CONFIG, "seatunnel-debezium-sink-group"); + kafkaConsumer = new KafkaConsumer<>(prop); } public String getLinuxLocalIp() { diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/debezium/register-mysql.json b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/debezium/register-mysql.json index cafdfb0b4034..d70e8e0c613d 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/debezium/register-mysql.json +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/debezium/register-mysql.json @@ -5,12 +5,12 @@ "tasks.max": "1", "database.hostname": "mysql", "database.port": "3306", - "database.user": "debezium", - "database.password": "dbz", + "database.user": "st_user", + "database.password": "seatunnel", "database.server.id": "184054", "topic.prefix": "dbserver1", - "database.include.list": "inventory", - "schema.history.internal.kafka.bootstrap.servers": "kafka:9092", - "schema.history.internal.kafka.topic": "schema-changes.inventory" + "database.include.list": "debezium", + "schema.history.internal.kafka.bootstrap.servers": "kafka_dbz_e2e:9092", + "schema.history.internal.kafka.topic": "schema-changes.debezium" } } \ No newline at end of file diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafkasource_debezium_cdc_to_pgsql.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafkasource_debezium_cdc_to_pgsql.conf index ef6fb4dbe72c..a0531b2345a0 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafkasource_debezium_cdc_to_pgsql.conf +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafkasource_debezium_cdc_to_pgsql.conf @@ -32,8 +32,8 @@ env { source { Kafka { - bootstrap.servers = "kafka:9092" - topic = "dbserver1.inventory.products" + bootstrap.servers = "kafka_dbz_e2e:9092" + topic = "dbserver1.debezium.products" result_table_name = "kafka_name" start_mode = earliest format = debezium_json @@ -51,10 +51,11 @@ source { sink { Jdbc { driver = org.postgresql.Driver - url = "jdbc:postgresql://postgresql:5432/test?loggerLevel=OFF" + url = "jdbc:postgresql://postgresql_e2e:5432/test?loggerLevel=OFF" user = test password = test - + generate_sink_sql = true + database = public table = sink primary_keys = ["id"] } diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafkasource_debezium_to_kafka.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafkasource_debezium_to_kafka.conf index a1c43bc8d755..4944829c24ab 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafkasource_debezium_to_kafka.conf +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafkasource_debezium_to_kafka.conf @@ -32,8 +32,8 @@ env { source { Kafka { - bootstrap.servers = "kafka:9092" - topic = "dbserver1.inventory.products" + bootstrap.servers = "kafka_dbz_e2e:9092" + topic = "dbserver1.debezium.products" result_table_name = "kafka_name" start_mode = earliest format = debezium_json @@ -50,7 +50,7 @@ source { sink { Kafka { - bootstrap.servers = "kafka:9092" + bootstrap.servers = "kafka_dbz_e2e:9092" topic = "test-debezium-sink" format = debezium_json } diff --git a/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/debezium/DebeziumJsonDeserializationSchema.java b/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/debezium/DebeziumJsonDeserializationSchema.java index a5d5140f0d9e..3996c4ed7d82 100644 --- a/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/debezium/DebeziumJsonDeserializationSchema.java +++ b/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/debezium/DebeziumJsonDeserializationSchema.java @@ -18,7 +18,6 @@ package org.apache.seatunnel.format.json.debezium; import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.JsonNode; -import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.node.ObjectNode; import org.apache.seatunnel.api.serialization.DeserializationSchema; import org.apache.seatunnel.api.source.Collector; @@ -51,11 +50,23 @@ public class DebeziumJsonDeserializationSchema implements DeserializationSchema< private final boolean ignoreParseErrors; + private final boolean debeziumEnabledSchema; + public DebeziumJsonDeserializationSchema(SeaTunnelRowType rowType, boolean ignoreParseErrors) { this.rowType = rowType; this.ignoreParseErrors = ignoreParseErrors; this.jsonDeserializer = new JsonDeserializationSchema(false, ignoreParseErrors, createJsonRowType(rowType)); + this.debeziumEnabledSchema = false; + } + + public DebeziumJsonDeserializationSchema( + SeaTunnelRowType rowType, boolean ignoreParseErrors, boolean debeziumEnabledSchema) { + this.rowType = rowType; + this.ignoreParseErrors = ignoreParseErrors; + this.jsonDeserializer = + new JsonDeserializationSchema(false, ignoreParseErrors, createJsonRowType(rowType)); + this.debeziumEnabledSchema = debeziumEnabledSchema; } @Override @@ -72,15 +83,15 @@ public void deserialize(byte[] message, Collector out) throws IOEx } try { - ObjectNode jsonNode = (ObjectNode) convertBytes(message); - String op = jsonNode.get("op").asText(); + JsonNode payload = getPayload(convertBytes(message)); + String op = payload.get("op").asText(); if (OP_CREATE.equals(op) || OP_READ.equals(op)) { - SeaTunnelRow insert = convertJsonNode(jsonNode.get("after")); + SeaTunnelRow insert = convertJsonNode(payload.get("after")); insert.setRowKind(RowKind.INSERT); out.collect(insert); } else if (OP_UPDATE.equals(op)) { - SeaTunnelRow before = convertJsonNode(jsonNode.get("before")); + SeaTunnelRow before = convertJsonNode(payload.get("before")); if (before == null) { throw new SeaTunnelJsonFormatException( CommonErrorCode.UNSUPPORTED_DATA_TYPE, @@ -89,11 +100,11 @@ public void deserialize(byte[] message, Collector out) throws IOEx before.setRowKind(RowKind.UPDATE_BEFORE); out.collect(before); - SeaTunnelRow after = convertJsonNode(jsonNode.get("after")); + SeaTunnelRow after = convertJsonNode(payload.get("after")); after.setRowKind(RowKind.UPDATE_AFTER); out.collect(after); } else if (OP_DELETE.equals(op)) { - SeaTunnelRow delete = convertJsonNode(jsonNode.get("before")); + SeaTunnelRow delete = convertJsonNode(payload.get("before")); if (delete == null) { throw new SeaTunnelJsonFormatException( CommonErrorCode.UNSUPPORTED_DATA_TYPE, @@ -121,6 +132,13 @@ public void deserialize(byte[] message, Collector out) throws IOEx } } + private JsonNode getPayload(JsonNode jsonNode) { + if (debeziumEnabledSchema) { + return jsonNode.get("payload"); + } + return jsonNode; + } + private JsonNode convertBytes(byte[] message) { try { return jsonDeserializer.deserializeToJsonNode(message); diff --git a/seatunnel-formats/seatunnel-format-json/src/test/java/org/apache/seatunnel/format/json/debezium/DebeziumJsonSerDeSchemaTest.java b/seatunnel-formats/seatunnel-format-json/src/test/java/org/apache/seatunnel/format/json/debezium/DebeziumJsonSerDeSchemaTest.java index e4210597d0b9..20088e525bfb 100644 --- a/seatunnel-formats/seatunnel-format-json/src/test/java/org/apache/seatunnel/format/json/debezium/DebeziumJsonSerDeSchemaTest.java +++ b/seatunnel-formats/seatunnel-format-json/src/test/java/org/apache/seatunnel/format/json/debezium/DebeziumJsonSerDeSchemaTest.java @@ -69,7 +69,7 @@ private void testSerializationDeserialization(String resourceFile, boolean schem throws Exception { List lines = readLines(resourceFile); DebeziumJsonDeserializationSchema deserializationSchema = - new DebeziumJsonDeserializationSchema(PHYSICAL_DATA_TYPE, schemaInclude); + new DebeziumJsonDeserializationSchema(PHYSICAL_DATA_TYPE, true, schemaInclude); SimpleCollector collector = new SimpleCollector();