diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/config/MongodbSourceOptions.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/config/MongodbSourceOptions.java index df73772e071..dac939777f7 100644 --- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/config/MongodbSourceOptions.java +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/config/MongodbSourceOptions.java @@ -19,6 +19,7 @@ import org.apache.seatunnel.api.configuration.Option; import org.apache.seatunnel.api.configuration.Options; +import org.apache.seatunnel.api.configuration.SingleChoiceOption; import org.apache.seatunnel.connectors.cdc.base.option.SourceOptions; import org.apache.seatunnel.connectors.cdc.base.option.StartupMode; import org.apache.seatunnel.connectors.cdc.base.option.StopMode; @@ -234,7 +235,7 @@ public class MongodbSourceOptions extends SourceOptions { .withDescription( "Decides if the table options contains Debezium client properties that start with prefix 'debezium'."); - public static final Option STARTUP_MODE = + public static final SingleChoiceOption STARTUP_MODE = Options.key(SourceOptions.STARTUP_MODE_KEY) .singleChoice( StartupMode.class, @@ -245,7 +246,7 @@ public class MongodbSourceOptions extends SourceOptions { "Optional startup mode for CDC source, valid enumerations are " + "\"initial\", \"earliest\", \"latest\", \"timestamp\"\n or \"specific\""); - public static final Option STOP_MODE = + public static final SingleChoiceOption STOP_MODE = Options.key(SourceOptions.STOP_MODE_KEY) .singleChoice(StopMode.class, Collections.singletonList(StopMode.NEVER)) .defaultValue(StopMode.NEVER) diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mongodb-e2e/src/test/java/mongodb/MongodbCDCIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mongodb-e2e/src/test/java/mongodb/MongodbCDCIT.java index dd7f985f176..c01b36ef188 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mongodb-e2e/src/test/java/mongodb/MongodbCDCIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mongodb-e2e/src/test/java/mongodb/MongodbCDCIT.java @@ -34,7 +34,9 @@ import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.TestTemplate; import org.testcontainers.containers.Container; +import org.testcontainers.containers.output.Slf4jLogConsumer; import org.testcontainers.lifecycle.Startables; +import org.testcontainers.utility.DockerLoggerFactory; import com.mongodb.client.MongoClient; import com.mongodb.client.MongoClients; @@ -78,7 +80,7 @@ public class MongodbCDCIT extends TestSuiteBase implements TestResource { // ---------------------------------------------------------------------------- // mysql - private static final String MYSQL_HOST = "mysql_cdc_e2e"; + private static final String MYSQL_HOST = "mysql_e2e"; private static final String MYSQL_USER_NAME = "st_user"; @@ -104,8 +106,10 @@ private static MySqlContainer createMySqlContainer() { mySqlContainer.withDatabaseName(MYSQL_DATABASE); mySqlContainer.withUsername(MYSQL_USER_NAME); mySqlContainer.withPassword(MYSQL_USER_PASSWORD); + mySqlContainer.withLogConsumer( + new Slf4jLogConsumer(DockerLoggerFactory.getLogger("Mysql-Docker-Image"))); // For local test use - // mySqlContainer.setPortBindings(Collections.singletonList("3308:3306")); + mySqlContainer.setPortBindings(Collections.singletonList("3310:3306")); return mySqlContainer; } @@ -134,6 +138,9 @@ public void startUp() { mongodbContainer = new MongoDBContainer(NETWORK); // For local test use mongodbContainer.setPortBindings(Collections.singletonList("27017:27017")); + mongodbContainer.withLogConsumer( + new Slf4jLogConsumer(DockerLoggerFactory.getLogger("Mongodb-Docker-Image"))); + Startables.deepStart(Stream.of(mongodbContainer)).join(); mongodbContainer.executeCommandFileInSeparateDatabase(MONGODB_DATABASE); initConnection(); @@ -213,6 +220,7 @@ private List> querySql() { for (int i = 1; i <= columnCount; i++) { objects.add(resultSet.getObject(i)); } + log.info("Print mysql sink data:" + objects); result.add(objects); } return result; diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mongodb-e2e/src/test/resources/mongodbcdc_to_mysql.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mongodb-e2e/src/test/resources/mongodbcdc_to_mysql.conf index 7e4a492390b..12846c6a0c2 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mongodb-e2e/src/test/resources/mongodbcdc_to_mysql.conf +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mongodb-e2e/src/test/resources/mongodbcdc_to_mysql.conf @@ -14,9 +14,6 @@ # See the License for the specific language governing permissions and # limitations under the License. # -###### -###### This config file is a demonstration of streaming processing in seatunnel config -###### env { # You can set engine configuration here @@ -45,11 +42,10 @@ source { sink { jdbc { - url = "jdbc:mysql://mysql_cdc_e2e:3306?useSSL=false&useUnicode=true&characterEncoding=UTF-8&allowPublicKeyRetrieval=false&useJDBCCompliantTimezoneShift=true&useLegacyDatetimeCode=false&serverTimezone=GMT%2B8" + url = "jdbc:mysql://mysql_e2e:3306/mongodb_cdc" driver = "com.mysql.cj.jdbc.Driver" user = "st_user" password = "seatunnel" - generate_sink_sql = true # You need to configure both database and table database = mongodb_cdc