From 12268a6f4b6fd47f7bb797a13b788cbac33bbce4 Mon Sep 17 00:00:00 2001
From: FWLamb <541947595@qq.com>
Date: Tue, 8 Nov 2022 19:14:33 +0800
Subject: [PATCH] [Feature][Connector-V2][Cassandra] Add Cassandra Source And
Sink Connector (#3229)
---
docs/en/Connector-v2-release-state.md | 4 +-
docs/en/connector-v2/sink/Cassandra.md | 98 +++++
docs/en/connector-v2/source/Cassandra.md | 82 ++++
plugin-mapping.properties | 4 +-
.../connector-cassandra/pom.xml | 52 +++
.../cassandra/client/CassandraClient.java | 65 ++++
.../cassandra/config/CassandraConfig.java | 116 ++++++
.../cassandra/sink/CassandraSink.java | 110 ++++++
.../cassandra/sink/CassandraSinkWriter.java | 148 +++++++
.../cassandra/source/CassandraSource.java | 102 +++++
.../source/CassandraSourceReader.java | 76 ++++
.../cassandra/util/TypeConvertUtil.java | 309 +++++++++++++++
seatunnel-connectors-v2/pom.xml | 1 +
.../connector-cassandra-e2e/pom.xml | 45 +++
.../seatunnel/cassandra/CassandraIT.java | 365 ++++++++++++++++++
.../src/test/resources/application.conf | 25 ++
.../resources/cassandra_to_cassandra.conf | 52 +++
.../test/resources/init/cassandra_init.conf | 105 +++++
.../seatunnel-connector-v2-e2e/pom.xml | 1 +
19 files changed, 1758 insertions(+), 2 deletions(-)
create mode 100644 docs/en/connector-v2/sink/Cassandra.md
create mode 100644 docs/en/connector-v2/source/Cassandra.md
create mode 100644 seatunnel-connectors-v2/connector-cassandra/pom.xml
create mode 100644 seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/client/CassandraClient.java
create mode 100644 seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/config/CassandraConfig.java
create mode 100644 seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/sink/CassandraSink.java
create mode 100644 seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/sink/CassandraSinkWriter.java
create mode 100644 seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/source/CassandraSource.java
create mode 100644 seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/source/CassandraSourceReader.java
create mode 100644 seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/util/TypeConvertUtil.java
create mode 100644 seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cassandra-e2e/pom.xml
create mode 100644 seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cassandra-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cassandra/CassandraIT.java
create mode 100644 seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cassandra-e2e/src/test/resources/application.conf
create mode 100644 seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cassandra-e2e/src/test/resources/cassandra_to_cassandra.conf
create mode 100644 seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cassandra-e2e/src/test/resources/init/cassandra_init.conf
diff --git a/docs/en/Connector-v2-release-state.md b/docs/en/Connector-v2-release-state.md
index 4d861d3946a..2691192a336 100644
--- a/docs/en/Connector-v2-release-state.md
+++ b/docs/en/Connector-v2-release-state.md
@@ -58,4 +58,6 @@ SeaTunnel uses a grading system for connectors to help you understand what to ex
| [Kafka](connector-v2/source/kafka.md) | Source | Alpha | 2.3.0-beta |
| [Kafka](connector-v2/sink/Kafka.md) | Sink | Alpha | 2.3.0-beta |
| [S3File](connector-v2/source/S3File.md) | Source | Alpha | 2.3.0-beta |
-| [S3File](connector-v2/sink/S3File.md) | Sink | Alpha | 2.3.0-beta |
\ No newline at end of file
+| [S3File](connector-v2/sink/S3File.md) | Sink | Alpha | 2.3.0-beta |
+| [Cassandra](connector-v2/source/Cassandra.md) | Source | Alpha | 2.3.0-beta |
+| [Cassandra](connector-v2/sink/Cassandra.md) | Sink | Alpha | 2.3.0-beta |
diff --git a/docs/en/connector-v2/sink/Cassandra.md b/docs/en/connector-v2/sink/Cassandra.md
new file mode 100644
index 00000000000..0a4ece086ec
--- /dev/null
+++ b/docs/en/connector-v2/sink/Cassandra.md
@@ -0,0 +1,98 @@
+# Cassandra
+
+> Cassandra sink connector
+
+## Description
+
+Write data to Apache Cassandra.
+
+## Key features
+
+- [ ] [exactly-once](../../concept/connector-v2-features.md)
+- [ ] [schema projection](../../concept/connector-v2-features.md)
+
+## Options
+
+| name | type | required | default value |
+|-------------------|--------|----------|---------------|
+| host | String | Yes | - |
+| keyspace | String | Yes | - |
+| table | String | Yes | - |
+| username | String | No | - |
+| password | String | No | - |
+| datacenter | String | No | datacenter1 |
+| consistency_level | String | No | LOCAL_ONE |
+| fields | String | No | LOCAL_ONE |
+| batch_size | String | No | 5000 |
+| batch_type | String | No | UNLOGGER |
+| async_write | String | No | true |
+
+### host [string]
+
+`Cassandra` cluster address, the format is `host:port` , allowing multiple `hosts` to be specified. Such as
+`"cassandra1:9042,cassandra2:9042"`.
+
+### keyspace [string]
+
+The `Cassandra` keyspace.
+
+### table [String]
+
+The `Cassandra` table name.
+
+### username [string]
+
+`Cassandra` user username.
+
+### password [string]
+
+`Cassandra` user password.
+
+### datacenter [String]
+
+The `Cassandra` datacenter, default is `datacenter1`.
+
+### consistency_level [String]
+
+The `Cassandra` write consistency level, default is `LOCAL_ONE`.
+
+### fields [array]
+
+The data field that needs to be output to `Cassandra` , if not configured, it will be automatically adapted
+according to the sink table `schema`.
+
+### batch_size [number]
+
+The number of rows written through [Cassandra-Java-Driver](https://github.com/datastax/java-driver) each time,
+default is `5000`.
+
+### batch_type [String]
+
+The `Cassandra` batch processing mode, default is `UNLOGGER`.
+
+### async_write [boolean]
+
+Whether `cassandra` writes in asynchronous mode, default is `true`.
+
+## Examples
+
+```hocon
+sink {
+ Cassandra {
+ host = "localhost:9042"
+ username = "cassandra"
+ password = "cassandra"
+ datacenter = "datacenter1"
+ keyspace = "test"
+ }
+}
+```
+
+## Changelog
+
+### next version
+
+- Add Cassandra Sink Connector
+
+
+
diff --git a/docs/en/connector-v2/source/Cassandra.md b/docs/en/connector-v2/source/Cassandra.md
new file mode 100644
index 00000000000..e2a6e4e8c77
--- /dev/null
+++ b/docs/en/connector-v2/source/Cassandra.md
@@ -0,0 +1,82 @@
+# Cassandra
+
+> Cassandra source connector
+
+## Description
+
+Read data from Apache Cassandra.
+
+## Key features
+
+- [x] [batch](../../concept/connector-v2-features.md)
+- [ ] [stream](../../concept/connector-v2-features.md)
+- [ ] [exactly-once](../../concept/connector-v2-features.md)
+- [x] [schema projection](../../concept/connector-v2-features.md)
+- [ ] [parallelism](../../concept/connector-v2-features.md)
+- [ ] [support user-defined split](../../concept/connector-v2-features.md)
+
+## Options
+
+| name | type | required | default value |
+|-------------------------|--------|----------|---------------|
+| host | String | Yes | - |
+| keyspace | String | Yes | - |
+| cql | String | Yes | - |
+| username | String | No | - |
+| password | String | No | - |
+| datacenter | String | No | datacenter1 |
+| consistency_level | String | No | LOCAL_ONE |
+
+### host [string]
+
+`Cassandra` cluster address, the format is `host:port` , allowing multiple `hosts` to be specified. Such as
+`"cassandra1:9042,cassandra2:9042"`.
+
+### keyspace [string]
+
+The `Cassandra` keyspace.
+
+### cql [String]
+
+The query cql used to search data though Cassandra session.
+
+### username [string]
+
+`Cassandra` user username.
+
+### password [string]
+
+`Cassandra` user password.
+
+### datacenter [String]
+
+The `Cassandra` datacenter, default is `datacenter1`.
+
+### consistency_level [String]
+
+The `Cassandra` write consistency level, default is `LOCAL_ONE`.
+
+## Examples
+
+```hocon
+source {
+ Cassandra {
+ host = "localhost:9042"
+ username = "cassandra"
+ password = "cassandra"
+ datacenter = "datacenter1"
+ keyspace = "test"
+ cql = "select * from source_table"
+ result_table_name = "source_table"
+ }
+}
+```
+
+## Changelog
+
+### next version
+
+- Add Cassandra Source Connector
+
+
+
diff --git a/plugin-mapping.properties b/plugin-mapping.properties
index 1ac1e268725..c943de4e35a 100644
--- a/plugin-mapping.properties
+++ b/plugin-mapping.properties
@@ -139,5 +139,7 @@ seatunnel.source.S3File = connector-file-s3
seatunnel.sink.S3File = connector-file-s3
seatunnel.source.Amazondynamodb = connector-amazondynamodb
seatunnel.sink.Amazondynamodb = connector-amazondynamodb
+seatunnel.source.Cassandra = connector-cassandra
+seatunnel.sink.Cassandra = connector-cassandra
seatunnel.sink.StarRocks = connector-starrocks
-seatunnel.sink.InfluxDB = connector-influxdb
+seatunnel.sink.InfluxDB = connector-influxdb
\ No newline at end of file
diff --git a/seatunnel-connectors-v2/connector-cassandra/pom.xml b/seatunnel-connectors-v2/connector-cassandra/pom.xml
new file mode 100644
index 00000000000..4be1ba14c7e
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-cassandra/pom.xml
@@ -0,0 +1,52 @@
+
+
+
+
+ seatunnel-connectors-v2
+ org.apache.seatunnel
+ ${revision}
+
+ 4.0.0
+
+ connector-cassandra
+
+
+ 4.14.0
+
+
+
+
+ com.datastax.oss
+ java-driver-core
+ ${cassandra.driver.version}
+
+
+ org.apache.commons
+ commons-lang3
+
+
+ org.apache.seatunnel
+ connector-common
+ ${project.version}
+
+
+
+
+
\ No newline at end of file
diff --git a/seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/client/CassandraClient.java b/seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/client/CassandraClient.java
new file mode 100644
index 00000000000..a806b48fe62
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/client/CassandraClient.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.cassandra.client;
+
+import com.datastax.oss.driver.api.core.ConsistencyLevel;
+import com.datastax.oss.driver.api.core.CqlSession;
+import com.datastax.oss.driver.api.core.CqlSessionBuilder;
+import com.datastax.oss.driver.api.core.cql.ColumnDefinitions;
+import com.datastax.oss.driver.api.core.cql.SimpleStatement;
+import org.apache.commons.lang3.StringUtils;
+
+import java.net.InetSocketAddress;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.stream.Collectors;
+
+public class CassandraClient {
+ public static CqlSessionBuilder getCqlSessionBuilder(String nodeAddress, String keyspace, String username, String password, String dataCenter) {
+ List cqlSessionBuilderList = Arrays.stream(nodeAddress.split(",")).map(address -> {
+ String[] nodeAndPort = address.split(":", 2);
+ if (StringUtils.isEmpty(username) && StringUtils.isEmpty(password)) {
+ return CqlSession.builder()
+ .addContactPoint(new InetSocketAddress(nodeAndPort[0], Integer.parseInt(nodeAndPort[1])))
+ .withKeyspace(keyspace)
+ .withLocalDatacenter(dataCenter);
+ }
+ return CqlSession.builder()
+ .addContactPoint(new InetSocketAddress(nodeAndPort[0], Integer.parseInt(nodeAndPort[1])))
+ .withAuthCredentials(username, password)
+ .withKeyspace(keyspace)
+ .withLocalDatacenter(dataCenter);
+ }).collect(Collectors.toList());
+ return cqlSessionBuilderList.get(ThreadLocalRandom.current().nextInt(cqlSessionBuilderList.size()));
+ }
+
+ public static SimpleStatement createSimpleStatement(String cql, ConsistencyLevel consistencyLevel) {
+ return SimpleStatement.builder(cql).setConsistencyLevel(consistencyLevel).build();
+ }
+
+ public static ColumnDefinitions getTableSchema(CqlSession session, String table) {
+ try {
+ return session.execute(String.format("select * from %s limit 1", table))
+ .getColumnDefinitions();
+ } catch (Exception e) {
+ throw new RuntimeException("Cannot get table schema from cassandra", e);
+ }
+
+ }
+}
diff --git a/seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/config/CassandraConfig.java b/seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/config/CassandraConfig.java
new file mode 100644
index 00000000000..c2d6c129f06
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/config/CassandraConfig.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.cassandra.config;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import com.datastax.oss.driver.api.core.ConsistencyLevel;
+import com.datastax.oss.driver.api.core.DefaultConsistencyLevel;
+import com.datastax.oss.driver.api.core.cql.DefaultBatchType;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import lombok.NonNull;
+import lombok.ToString;
+
+import java.io.Serializable;
+import java.util.List;
+
+@Data
+@ToString
+@NoArgsConstructor
+public class CassandraConfig implements Serializable {
+
+ public static final String HOST = "host";
+ public static final String USERNAME = "username";
+ public static final String PASSWORD = "password";
+ public static final String DATACENTER = "datacenter";
+ public static final String KEYSPACE = "keyspace";
+ public static final String TABLE = "table";
+ public static final String CQL = "cql";
+ public static final String FIELDS = "fields";
+ public static final String CONSISTENCY_LEVEL = "consistency_level";
+ public static final String BATCH_SIZE = "batch_size";
+ public static final String BATCH_TYPE = "batch_type";
+ public static final String ASYNC_WRITE = "async_write";
+
+ private String host;
+ private String username;
+ private String password;
+ private String datacenter;
+ private String keyspace;
+ private String table;
+ private String cql;
+ private List fields;
+ private ConsistencyLevel consistencyLevel;
+ private Integer batchSize;
+ private DefaultBatchType batchType;
+ private Boolean asyncWrite;
+
+ public CassandraConfig(@NonNull String host, @NonNull String keyspace) {
+ this.host = host;
+ this.keyspace = keyspace;
+ }
+
+ public static CassandraConfig getCassandraConfig(Config config) {
+ CassandraConfig cassandraConfig = new CassandraConfig(
+ config.getString(HOST),
+ config.getString(KEYSPACE)
+ );
+ if (config.hasPath(USERNAME)) {
+ cassandraConfig.setUsername(config.getString(USERNAME));
+ }
+ if (config.hasPath(PASSWORD)) {
+ cassandraConfig.setPassword(config.getString(PASSWORD));
+ }
+ if (config.hasPath(DATACENTER)) {
+ cassandraConfig.setDatacenter(config.getString(DATACENTER));
+ } else {
+ cassandraConfig.setDatacenter("datacenter1");
+ }
+ if (config.hasPath(TABLE)) {
+ cassandraConfig.setTable(config.getString(TABLE));
+ }
+ if (config.hasPath(CQL)) {
+ cassandraConfig.setCql(config.getString(CQL));
+ }
+ if (config.hasPath(FIELDS)) {
+ cassandraConfig.setFields(config.getStringList(FIELDS));
+ }
+ if (config.hasPath(CONSISTENCY_LEVEL)) {
+ cassandraConfig.setConsistencyLevel(DefaultConsistencyLevel.valueOf(config.getString(CONSISTENCY_LEVEL)));
+ } else {
+ cassandraConfig.setConsistencyLevel(DefaultConsistencyLevel.LOCAL_ONE);
+ }
+ if (config.hasPath(BATCH_SIZE)) {
+ cassandraConfig.setBatchSize(config.getInt(BATCH_SIZE));
+ } else {
+ cassandraConfig.setBatchSize(Integer.parseInt("5000"));
+ }
+ if (config.hasPath(BATCH_TYPE)) {
+ cassandraConfig.setBatchType(DefaultBatchType.valueOf(config.getString(BATCH_TYPE)));
+ } else {
+ cassandraConfig.setBatchType(DefaultBatchType.UNLOGGED);
+ }
+ if (config.hasPath(ASYNC_WRITE)) {
+ cassandraConfig.setAsyncWrite(config.getBoolean(ASYNC_WRITE));
+ } else {
+ cassandraConfig.setAsyncWrite(true);
+ }
+ return cassandraConfig;
+ }
+}
diff --git a/seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/sink/CassandraSink.java b/seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/sink/CassandraSink.java
new file mode 100644
index 00000000000..b9c1154b50c
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/sink/CassandraSink.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.cassandra.sink;
+
+import static org.apache.seatunnel.connectors.seatunnel.cassandra.config.CassandraConfig.HOST;
+import static org.apache.seatunnel.connectors.seatunnel.cassandra.config.CassandraConfig.KEYSPACE;
+import static org.apache.seatunnel.connectors.seatunnel.cassandra.config.CassandraConfig.TABLE;
+
+import org.apache.seatunnel.api.common.PrepareFailException;
+import org.apache.seatunnel.api.sink.SeaTunnelSink;
+import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.config.CheckConfigUtil;
+import org.apache.seatunnel.common.config.CheckResult;
+import org.apache.seatunnel.common.constants.PluginType;
+import org.apache.seatunnel.connectors.seatunnel.cassandra.client.CassandraClient;
+import org.apache.seatunnel.connectors.seatunnel.cassandra.config.CassandraConfig;
+import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSimpleSink;
+import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import com.datastax.oss.driver.api.core.CqlSession;
+import com.datastax.oss.driver.api.core.cql.ColumnDefinitions;
+import com.google.auto.service.AutoService;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+@AutoService(SeaTunnelSink.class)
+public class CassandraSink extends AbstractSimpleSink {
+
+ private CassandraConfig cassandraConfig;
+
+ private SeaTunnelRowType seaTunnelRowType;
+
+ private ColumnDefinitions tableSchema;
+
+ @Override
+ public String getPluginName() {
+ return "Cassandra";
+ }
+
+ @Override
+ public void prepare(Config config) throws PrepareFailException {
+ CheckResult checkResult = CheckConfigUtil.checkAllExists(config, HOST, KEYSPACE, TABLE);
+ if (!checkResult.isSuccess()) {
+ throw new PrepareFailException(getPluginName(), PluginType.SINK, checkResult.getMsg());
+ }
+ this.cassandraConfig = CassandraConfig.getCassandraConfig(config);
+ try (CqlSession session = CassandraClient.getCqlSessionBuilder(
+ cassandraConfig.getHost(),
+ cassandraConfig.getKeyspace(),
+ cassandraConfig.getUsername(),
+ cassandraConfig.getPassword(),
+ cassandraConfig.getDatacenter()
+ ).build()) {
+ List fields = cassandraConfig.getFields();
+ this.tableSchema = CassandraClient.getTableSchema(session, cassandraConfig.getTable());
+ if (fields == null || fields.isEmpty()) {
+ List newFields = new ArrayList<>();
+ for (int i = 0; i < tableSchema.size(); i++) {
+ newFields.add(tableSchema.get(i).getName().asInternal());
+ }
+ cassandraConfig.setFields(newFields);
+ } else {
+ for (String field : fields) {
+ if (!tableSchema.contains(field)) {
+ throw new RuntimeException("Field " + field + " does not exist in table " + config.getString(TABLE));
+ }
+ }
+ }
+ } catch (Exception e) {
+ throw new PrepareFailException(getPluginName(), PluginType.SINK, e.getMessage());
+ }
+ }
+
+ @Override
+ public void setTypeInfo(SeaTunnelRowType seaTunnelRowType) {
+ this.seaTunnelRowType = seaTunnelRowType;
+ }
+
+ @Override
+ public SeaTunnelDataType getConsumedType() {
+ return this.seaTunnelRowType;
+ }
+
+ @Override
+ public AbstractSinkWriter createWriter(SinkWriter.Context context) throws IOException {
+ return new CassandraSinkWriter(cassandraConfig, seaTunnelRowType, tableSchema);
+ }
+}
diff --git a/seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/sink/CassandraSinkWriter.java b/seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/sink/CassandraSinkWriter.java
new file mode 100644
index 00000000000..10639f7f054
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/sink/CassandraSinkWriter.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.cassandra.sink;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.utils.ExceptionUtils;
+import org.apache.seatunnel.connectors.seatunnel.cassandra.client.CassandraClient;
+import org.apache.seatunnel.connectors.seatunnel.cassandra.config.CassandraConfig;
+import org.apache.seatunnel.connectors.seatunnel.cassandra.util.TypeConvertUtil;
+import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
+
+import com.datastax.oss.driver.api.core.CqlSession;
+import com.datastax.oss.driver.api.core.cql.AsyncResultSet;
+import com.datastax.oss.driver.api.core.cql.BatchStatement;
+import com.datastax.oss.driver.api.core.cql.BoundStatement;
+import com.datastax.oss.driver.api.core.cql.ColumnDefinitions;
+import com.datastax.oss.driver.api.core.cql.PreparedStatement;
+import com.datastax.oss.driver.api.core.type.DataType;
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.CompletionStage;
+import java.util.concurrent.atomic.AtomicInteger;
+
+@Slf4j
+public class CassandraSinkWriter extends AbstractSinkWriter {
+
+ private final CassandraConfig cassandraConfig;
+ private final SeaTunnelRowType seaTunnelRowType;
+ private final ColumnDefinitions tableSchema;
+ private final CqlSession session;
+ private BatchStatement batchStatement;
+ private List boundStatementList;
+ private List> completionStages;
+ private final PreparedStatement preparedStatement;
+ private final AtomicInteger counter = new AtomicInteger(0);
+
+ public CassandraSinkWriter(CassandraConfig cassandraConfig, SeaTunnelRowType seaTunnelRowType, ColumnDefinitions tableSchema) {
+ this.cassandraConfig = cassandraConfig;
+ this.seaTunnelRowType = seaTunnelRowType;
+ this.tableSchema = tableSchema;
+ this.session = CassandraClient.getCqlSessionBuilder(
+ cassandraConfig.getHost(),
+ cassandraConfig.getKeyspace(),
+ cassandraConfig.getUsername(),
+ cassandraConfig.getPassword(),
+ cassandraConfig.getDatacenter()).build();
+ this.batchStatement = BatchStatement.builder(cassandraConfig.getBatchType()).build();
+ this.boundStatementList = new ArrayList<>();
+ this.completionStages = new ArrayList<>();
+ this.preparedStatement = session.prepare(initPrepareCQL());
+ }
+
+ @Override
+ public void write(SeaTunnelRow row) throws IOException {
+ BoundStatement boundStatement = this.preparedStatement.bind();
+ addIntoBatch(row, boundStatement);
+ if (counter.getAndIncrement() >= cassandraConfig.getBatchSize()) {
+ flush();
+ counter.set(0);
+ }
+ }
+
+ private void flush() {
+ if (cassandraConfig.getAsyncWrite()) {
+ completionStages.forEach(resultStage -> resultStage.whenComplete(
+ (resultSet, error) -> {
+ if (error != null) {
+ log.error(ExceptionUtils.getMessage(error));
+ }
+ }
+ ));
+ completionStages.clear();
+ } else {
+ try {
+ this.session.execute(this.batchStatement.addAll(boundStatementList));
+ } catch (Exception e) {
+ log.error("Batch insert error,Try inserting one by one!");
+ for (BoundStatement statement : boundStatementList) {
+ this.session.execute(statement);
+ }
+ } finally {
+ this.batchStatement.clear();
+ this.boundStatementList.clear();
+ }
+ }
+
+ }
+
+ private void addIntoBatch(SeaTunnelRow row, BoundStatement boundStatement) {
+ try {
+ for (int i = 0; i < cassandraConfig.getFields().size(); i++) {
+ String fieldName = cassandraConfig.getFields().get(i);
+ DataType dataType = tableSchema.get(i).getType();
+ Object fieldValue = row.getField(seaTunnelRowType.indexOf(fieldName));
+ boundStatement = TypeConvertUtil.reconvertAndInject(boundStatement, i, dataType, fieldValue);
+ }
+ if (cassandraConfig.getAsyncWrite()) {
+ completionStages.add(session.executeAsync(boundStatement));
+ } else {
+ boundStatementList.add(boundStatement);
+ }
+ } catch (Exception e) {
+ throw new RuntimeException("Add row data into batch error!", e);
+ }
+ }
+
+ private String initPrepareCQL() {
+ String[] placeholder = new String[cassandraConfig.getFields().size()];
+ Arrays.fill(placeholder, "?");
+ return String.format("INSERT INTO %s (%s) VALUES (%s)",
+ cassandraConfig.getTable(),
+ String.join(",", cassandraConfig.getFields()),
+ String.join(",", placeholder));
+ }
+
+ @Override
+ public void close() throws IOException {
+ flush();
+ try {
+ if (this.session != null) {
+ this.session.close();
+ }
+ } catch (Exception e) {
+ throw new RuntimeException("Failed to close CqlSession!", e);
+ }
+
+ }
+}
diff --git a/seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/source/CassandraSource.java b/seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/source/CassandraSource.java
new file mode 100644
index 00000000000..163433c4744
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/source/CassandraSource.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.cassandra.source;
+
+import static org.apache.seatunnel.connectors.seatunnel.cassandra.config.CassandraConfig.CQL;
+import static org.apache.seatunnel.connectors.seatunnel.cassandra.config.CassandraConfig.HOST;
+import static org.apache.seatunnel.connectors.seatunnel.cassandra.config.CassandraConfig.KEYSPACE;
+
+import org.apache.seatunnel.api.common.PrepareFailException;
+import org.apache.seatunnel.api.source.Boundedness;
+import org.apache.seatunnel.api.source.SeaTunnelSource;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.config.CheckConfigUtil;
+import org.apache.seatunnel.common.config.CheckResult;
+import org.apache.seatunnel.common.constants.PluginType;
+import org.apache.seatunnel.connectors.seatunnel.cassandra.client.CassandraClient;
+import org.apache.seatunnel.connectors.seatunnel.cassandra.config.CassandraConfig;
+import org.apache.seatunnel.connectors.seatunnel.cassandra.util.TypeConvertUtil;
+import org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitReader;
+import org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitSource;
+import org.apache.seatunnel.connectors.seatunnel.common.source.SingleSplitReaderContext;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import com.datastax.oss.driver.api.core.CqlSession;
+import com.datastax.oss.driver.api.core.cql.Row;
+import com.google.auto.service.AutoService;
+
+@AutoService(SeaTunnelSource.class)
+public class CassandraSource extends AbstractSingleSplitSource {
+
+ private SeaTunnelRowType rowTypeInfo;
+ private CassandraConfig cassandraConfig;
+
+ @Override
+ public String getPluginName() {
+ return "Cassandra";
+ }
+
+ @Override
+ public void prepare(Config config) throws PrepareFailException {
+ CheckResult checkResult = CheckConfigUtil.checkAllExists(config, HOST, KEYSPACE, CQL);
+ if (!checkResult.isSuccess()) {
+ throw new PrepareFailException(getPluginName(), PluginType.SOURCE, checkResult.getMsg());
+ }
+ this.cassandraConfig = CassandraConfig.getCassandraConfig(config);
+ try (CqlSession currentSession = CassandraClient.getCqlSessionBuilder(
+ cassandraConfig.getHost(),
+ cassandraConfig.getKeyspace(),
+ cassandraConfig.getUsername(),
+ cassandraConfig.getPassword(),
+ cassandraConfig.getDatacenter()).build()) {
+ Row rs = currentSession.execute(CassandraClient.createSimpleStatement(cassandraConfig.getCql(), cassandraConfig.getConsistencyLevel())).one();
+ if (rs == null) {
+ throw new PrepareFailException(getPluginName(), PluginType.SOURCE, "No data in the table!");
+ }
+ int columnSize = rs.getColumnDefinitions().size();
+ String[] fieldNames = new String[columnSize];
+ SeaTunnelDataType>[] seaTunnelDataTypes = new SeaTunnelDataType[columnSize];
+ for (int i = 0; i < columnSize; i++) {
+ fieldNames[i] = rs.getColumnDefinitions().get(i).getName().asInternal();
+ seaTunnelDataTypes[i] = TypeConvertUtil.convert(rs.getColumnDefinitions().get(i).getType());
+ }
+ this.rowTypeInfo = new SeaTunnelRowType(fieldNames, seaTunnelDataTypes);
+ } catch (Exception e) {
+ throw new PrepareFailException(getPluginName(), PluginType.SOURCE, e.getMessage());
+ }
+ }
+
+ @Override
+ public Boundedness getBoundedness() {
+ return Boundedness.BOUNDED;
+ }
+
+ @Override
+ public SeaTunnelDataType getProducedType() {
+ return this.rowTypeInfo;
+ }
+
+ @Override
+ public AbstractSingleSplitReader createReader(SingleSplitReaderContext readerContext) throws Exception {
+ return new CassandraSourceReader(cassandraConfig, readerContext);
+ }
+
+}
diff --git a/seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/source/CassandraSourceReader.java b/seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/source/CassandraSourceReader.java
new file mode 100644
index 00000000000..e3f95629ad4
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/source/CassandraSourceReader.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.cassandra.source;
+
+import org.apache.seatunnel.api.source.Collector;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.connectors.seatunnel.cassandra.client.CassandraClient;
+import org.apache.seatunnel.connectors.seatunnel.cassandra.config.CassandraConfig;
+import org.apache.seatunnel.connectors.seatunnel.cassandra.util.TypeConvertUtil;
+import org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitReader;
+import org.apache.seatunnel.connectors.seatunnel.common.source.SingleSplitReaderContext;
+
+import com.datastax.oss.driver.api.core.CqlSession;
+import com.datastax.oss.driver.api.core.cql.ResultSet;
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.IOException;
+
+@Slf4j
+public class CassandraSourceReader extends AbstractSingleSplitReader {
+ private final CassandraConfig cassandraConfig;
+ private final SingleSplitReaderContext readerContext;
+ private CqlSession session;
+
+ CassandraSourceReader(CassandraConfig cassandraConfig, SingleSplitReaderContext readerContext) {
+ this.cassandraConfig = cassandraConfig;
+ this.readerContext = readerContext;
+ }
+
+ @Override
+ public void open() throws Exception {
+ session = CassandraClient.getCqlSessionBuilder(
+ cassandraConfig.getHost(),
+ cassandraConfig.getKeyspace(),
+ cassandraConfig.getUsername(),
+ cassandraConfig.getPassword(),
+ cassandraConfig.getDatacenter()
+ ).build();
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (session != null) {
+ session.close();
+ }
+ }
+
+ @Override
+ public void pollNext(Collector output) throws Exception {
+ try {
+ ResultSet resultSet = session.execute(CassandraClient.createSimpleStatement(cassandraConfig.getCql(), cassandraConfig.getConsistencyLevel()));
+ resultSet.forEach(row -> output.collect(TypeConvertUtil.buildSeaTunnelRow(row)));
+ } finally {
+ this.readerContext.signalNoMoreElement();
+ }
+ }
+
+ @Override
+ public void notifyCheckpointComplete(long checkpointId) throws Exception {
+ }
+}
diff --git a/seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/util/TypeConvertUtil.java b/seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/util/TypeConvertUtil.java
new file mode 100644
index 00000000000..1e492c62e0c
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/util/TypeConvertUtil.java
@@ -0,0 +1,309 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.cassandra.util;
+
+import org.apache.seatunnel.api.table.type.ArrayType;
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.LocalTimeType;
+import org.apache.seatunnel.api.table.type.MapType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+
+import com.datastax.oss.driver.api.core.cql.BoundStatement;
+import com.datastax.oss.driver.api.core.cql.ColumnDefinitions;
+import com.datastax.oss.driver.api.core.cql.Row;
+import com.datastax.oss.driver.api.core.type.DataType;
+import com.datastax.oss.driver.internal.core.type.DefaultListType;
+import com.datastax.oss.driver.internal.core.type.DefaultMapType;
+import com.datastax.oss.driver.internal.core.type.DefaultSetType;
+import com.datastax.oss.protocol.internal.ProtocolConstants;
+import org.apache.commons.lang3.ArrayUtils;
+
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.nio.ByteBuffer;
+import java.sql.Timestamp;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.ZoneId;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.UUID;
+import java.util.stream.Collectors;
+
+public class TypeConvertUtil {
+ public static SeaTunnelDataType> convert(DataType type) {
+ switch (type.getProtocolCode()) {
+ case ProtocolConstants.DataType.VARCHAR:
+ case ProtocolConstants.DataType.VARINT:
+ case ProtocolConstants.DataType.ASCII:
+ case ProtocolConstants.DataType.UUID:
+ case ProtocolConstants.DataType.INET:
+ case ProtocolConstants.DataType.TIMEUUID:
+ return BasicType.STRING_TYPE;
+ case ProtocolConstants.DataType.TINYINT:
+ return BasicType.BYTE_TYPE;
+ case ProtocolConstants.DataType.SMALLINT:
+ return BasicType.SHORT_TYPE;
+ case ProtocolConstants.DataType.INT:
+ return BasicType.INT_TYPE;
+ case ProtocolConstants.DataType.BIGINT:
+ case ProtocolConstants.DataType.COUNTER:
+ return BasicType.LONG_TYPE;
+ case ProtocolConstants.DataType.FLOAT:
+ return BasicType.FLOAT_TYPE;
+ case ProtocolConstants.DataType.DOUBLE:
+ case ProtocolConstants.DataType.DECIMAL:
+ return BasicType.DOUBLE_TYPE;
+ case ProtocolConstants.DataType.BOOLEAN:
+ return BasicType.BOOLEAN_TYPE;
+ case ProtocolConstants.DataType.TIME:
+ return LocalTimeType.LOCAL_TIME_TYPE;
+ case ProtocolConstants.DataType.DATE:
+ return LocalTimeType.LOCAL_DATE_TYPE;
+ case ProtocolConstants.DataType.TIMESTAMP:
+ return LocalTimeType.LOCAL_DATE_TIME_TYPE;
+ case ProtocolConstants.DataType.BLOB:
+ return ArrayType.BYTE_ARRAY_TYPE;
+ case ProtocolConstants.DataType.MAP:
+ return new MapType<>(convert(((DefaultMapType) type).getKeyType()), convert(((DefaultMapType) type).getValueType()));
+ case ProtocolConstants.DataType.LIST:
+ return convertToArrayType(convert(((DefaultListType) type).getElementType()));
+ case ProtocolConstants.DataType.SET:
+ return convertToArrayType(convert(((DefaultSetType) type).getElementType()));
+ default:
+ throw new RuntimeException("not supported data type: " + type);
+ }
+ }
+
+ private static ArrayType, ?> convertToArrayType(SeaTunnelDataType> dataType) {
+ if (dataType.equals(BasicType.STRING_TYPE)) {
+ return ArrayType.STRING_ARRAY_TYPE;
+ } else if (dataType.equals(BasicType.BYTE_TYPE)) {
+ return ArrayType.BYTE_ARRAY_TYPE;
+ } else if (dataType.equals(BasicType.SHORT_TYPE)) {
+ return ArrayType.SHORT_ARRAY_TYPE;
+ } else if (dataType.equals(BasicType.INT_TYPE)) {
+ return ArrayType.INT_ARRAY_TYPE;
+ } else if (dataType.equals(BasicType.LONG_TYPE)) {
+ return ArrayType.LONG_ARRAY_TYPE;
+ } else if (dataType.equals(BasicType.FLOAT_TYPE)) {
+ return ArrayType.FLOAT_ARRAY_TYPE;
+ } else if (dataType.equals(BasicType.DOUBLE_TYPE)) {
+ return ArrayType.DOUBLE_ARRAY_TYPE;
+ } else if (dataType.equals(BasicType.BOOLEAN_TYPE)) {
+ return ArrayType.BOOLEAN_ARRAY_TYPE;
+ } else {
+ throw new RuntimeException("not supported data type: " + dataType);
+ }
+ }
+
+ public static SeaTunnelRow buildSeaTunnelRow(Row row) {
+ DataType subType;
+ Class> typeClass;
+ Object[] fields = new Object[row.size()];
+ ColumnDefinitions metaData = row.getColumnDefinitions();
+ for (int i = 0; i < row.size(); i++) {
+ switch (metaData.get(i).getType().getProtocolCode()) {
+ case ProtocolConstants.DataType.ASCII:
+ case ProtocolConstants.DataType.VARCHAR:
+ fields[i] = row.getString(i);
+ break;
+ case ProtocolConstants.DataType.VARINT:
+ fields[i] = Objects.requireNonNull(row.getBigInteger(i)).toString();
+ break;
+ case ProtocolConstants.DataType.TIMEUUID:
+ case ProtocolConstants.DataType.UUID:
+ fields[i] = Objects.requireNonNull(row.getUuid(i)).toString();
+ break;
+ case ProtocolConstants.DataType.INET:
+ fields[i] = Objects.requireNonNull(row.getInetAddress(i)).getHostAddress();
+ break;
+ case ProtocolConstants.DataType.TINYINT:
+ fields[i] = row.getByte(i);
+ break;
+ case ProtocolConstants.DataType.SMALLINT:
+ fields[i] = row.getShort(i);
+ break;
+ case ProtocolConstants.DataType.INT:
+ fields[i] = row.getInt(i);
+ break;
+ case ProtocolConstants.DataType.BIGINT:
+ fields[i] = row.getLong(i);
+ break;
+ case ProtocolConstants.DataType.FLOAT:
+ fields[i] = row.getFloat(i);
+ break;
+ case ProtocolConstants.DataType.DOUBLE:
+ fields[i] = row.getDouble(i);
+ break;
+ case ProtocolConstants.DataType.DECIMAL:
+ fields[i] = Objects.requireNonNull(row.getBigDecimal(i)).doubleValue();
+ break;
+ case ProtocolConstants.DataType.BOOLEAN:
+ fields[i] = row.getBoolean(i);
+ break;
+ case ProtocolConstants.DataType.TIME:
+ fields[i] = row.getLocalTime(i);
+ break;
+ case ProtocolConstants.DataType.DATE:
+ fields[i] = row.getLocalDate(i);
+ break;
+ case ProtocolConstants.DataType.TIMESTAMP:
+ fields[i] = Timestamp.from(Objects.requireNonNull(row.getInstant(i))).toLocalDateTime();
+ break;
+ case ProtocolConstants.DataType.BLOB:
+ fields[i] = ArrayUtils.toObject(Objects.requireNonNull(row.getByteBuffer(i)).array());
+ break;
+ case ProtocolConstants.DataType.MAP:
+ subType = metaData.get(i).getType();
+ fields[i] = row.getMap(i, convert(((DefaultMapType) subType).getKeyType()).getTypeClass(), convert(((DefaultMapType) subType).getValueType()).getTypeClass());
+ break;
+ case ProtocolConstants.DataType.LIST:
+ typeClass = convert(((DefaultListType) metaData.get(i).getType()).getElementType()).getTypeClass();
+ if (String.class.equals(typeClass)) {
+ fields[i] = Objects.requireNonNull(row.getList(i, String.class)).toArray(new String[0]);
+ } else if (Byte.class.equals(typeClass)) {
+ fields[i] = Objects.requireNonNull(row.getList(i, Byte.class)).toArray(new Byte[0]);
+ } else if (Short.class.equals(typeClass)) {
+ fields[i] = Objects.requireNonNull(row.getList(i, Short.class)).toArray(new Short[0]);
+ } else if (Integer.class.equals(typeClass)) {
+ fields[i] = Objects.requireNonNull(row.getList(i, Integer.class)).toArray(new Integer[0]);
+ } else if (Long.class.equals(typeClass)) {
+ fields[i] = Objects.requireNonNull(row.getList(i, Long.class)).toArray(new Long[0]);
+ } else if (Float.class.equals(typeClass)) {
+ fields[i] = Objects.requireNonNull(row.getList(i, Float.class)).toArray(new Float[0]);
+ } else if (Double.class.equals(typeClass)) {
+ fields[i] = Objects.requireNonNull(row.getList(i, Double.class)).toArray(new Double[0]);
+ } else if (Boolean.class.equals(typeClass)) {
+ fields[i] = Objects.requireNonNull(row.getList(i, Boolean.class)).toArray(new Boolean[0]);
+ } else {
+ throw new RuntimeException("List not supported data type: " + typeClass.toString());
+ }
+ break;
+ case ProtocolConstants.DataType.SET:
+ typeClass = convert(((DefaultSetType) metaData.get(i).getType()).getElementType()).getTypeClass();
+ if (String.class.equals(typeClass)) {
+ fields[i] = Objects.requireNonNull(row.getSet(i, String.class)).toArray(new String[0]);
+ } else if (Byte.class.equals(typeClass)) {
+ fields[i] = Objects.requireNonNull(row.getSet(i, Byte.class)).toArray(new Byte[0]);
+ } else if (Short.class.equals(typeClass)) {
+ fields[i] = Objects.requireNonNull(row.getSet(i, Short.class)).toArray(new Short[0]);
+ } else if (Integer.class.equals(typeClass)) {
+ fields[i] = Objects.requireNonNull(row.getSet(i, Integer.class)).toArray(new Integer[0]);
+ } else if (Long.class.equals(typeClass)) {
+ fields[i] = Objects.requireNonNull(row.getSet(i, Long.class)).toArray(new Long[0]);
+ } else if (Float.class.equals(typeClass)) {
+ fields[i] = Objects.requireNonNull(row.getSet(i, Float.class)).toArray(new Float[0]);
+ } else if (Double.class.equals(typeClass)) {
+ fields[i] = Objects.requireNonNull(row.getSet(i, Double.class)).toArray(new Double[0]);
+ } else if (Boolean.class.equals(typeClass)) {
+ fields[i] = Objects.requireNonNull(row.getSet(i, Boolean.class)).toArray(new Boolean[0]);
+ } else {
+ throw new RuntimeException("List not supported data type: " + typeClass.toString());
+ }
+ break;
+ default:
+ fields[i] = row.getObject(i);
+ }
+ }
+ return new SeaTunnelRow(fields);
+ }
+
+ public static BoundStatement reconvertAndInject(BoundStatement statement, int index, DataType type, Object fileValue) {
+ switch (type.getProtocolCode()) {
+ case ProtocolConstants.DataType.VARCHAR:
+ case ProtocolConstants.DataType.ASCII:
+ statement = statement.setString(index, (String) fileValue);
+ return statement;
+ case ProtocolConstants.DataType.VARINT:
+ statement = statement.setBigInteger(index, new BigInteger((String) fileValue));
+ return statement;
+ case ProtocolConstants.DataType.UUID:
+ case ProtocolConstants.DataType.TIMEUUID:
+ statement = statement.setUuid(index, UUID.fromString((String) fileValue));
+ return statement;
+ case ProtocolConstants.DataType.INET:
+ try {
+ statement = statement.setInetAddress(index, InetAddress.getByName((String) fileValue));
+ } catch (UnknownHostException e) {
+ throw new RuntimeException(e);
+ }
+ return statement;
+ case ProtocolConstants.DataType.TINYINT:
+ statement = statement.setByte(index, (Byte) fileValue);
+ return statement;
+ case ProtocolConstants.DataType.SMALLINT:
+ statement = statement.setShort(index, (Short) fileValue);
+ return statement;
+ case ProtocolConstants.DataType.INT:
+ statement = statement.setInt(index, (Integer) fileValue);
+ return statement;
+ case ProtocolConstants.DataType.BIGINT:
+ case ProtocolConstants.DataType.COUNTER:
+ statement = statement.setLong(index, (Long) fileValue);
+ return statement;
+ case ProtocolConstants.DataType.FLOAT:
+ statement = statement.setFloat(index, (Float) fileValue);
+ return statement;
+ case ProtocolConstants.DataType.DOUBLE:
+ statement = statement.setDouble(index, (Double) fileValue);
+ return statement;
+ case ProtocolConstants.DataType.DECIMAL:
+ statement = statement.setBigDecimal(index, BigDecimal.valueOf((Double) fileValue));
+ return statement;
+ case ProtocolConstants.DataType.BOOLEAN:
+ statement = statement.setBoolean(index, (Boolean) fileValue);
+ return statement;
+ case ProtocolConstants.DataType.TIME:
+ statement = statement.setLocalTime(index, (LocalTime) fileValue);
+ return statement;
+ case ProtocolConstants.DataType.DATE:
+ statement = statement.setLocalDate(index, (LocalDate) fileValue);
+ return statement;
+ case ProtocolConstants.DataType.TIMESTAMP:
+ statement = statement.setInstant(index, ((LocalDateTime) fileValue).atZone(ZoneId.systemDefault()).toInstant());
+ return statement;
+ case ProtocolConstants.DataType.BLOB:
+ if (fileValue.getClass().equals(Object[].class)) {
+ fileValue = Arrays.stream((Object[]) fileValue).toArray(Byte[]::new);
+ }
+ statement = statement.setByteBuffer(index, ByteBuffer.wrap(ArrayUtils.toPrimitive((Byte[]) fileValue)));
+ return statement;
+ case ProtocolConstants.DataType.MAP:
+ statement = statement.set(index, (Map, ?>) fileValue, Map.class);
+ return statement;
+ case ProtocolConstants.DataType.LIST:
+ statement = statement.set(index, Arrays.stream((Object[]) fileValue).collect(Collectors.toList()), List.class);
+ return statement;
+ case ProtocolConstants.DataType.SET:
+ statement = statement.set(index, Arrays.stream((Object[]) fileValue).collect(Collectors.toSet()), Set.class);
+ return statement;
+ default:
+ statement = statement.set(index, fileValue, Object.class);
+ return statement;
+ }
+ }
+
+}
diff --git a/seatunnel-connectors-v2/pom.xml b/seatunnel-connectors-v2/pom.xml
index d2fd18c6a8c..267ead16405 100644
--- a/seatunnel-connectors-v2/pom.xml
+++ b/seatunnel-connectors-v2/pom.xml
@@ -57,6 +57,7 @@
connector-iceberg
connector-influxdb
connector-amazondynamodb
+ connector-cassandra
connector-starrocks
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cassandra-e2e/pom.xml b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cassandra-e2e/pom.xml
new file mode 100644
index 00000000000..d9775f3b2eb
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cassandra-e2e/pom.xml
@@ -0,0 +1,45 @@
+
+
+
+
+ seatunnel-connector-v2-e2e
+ org.apache.seatunnel
+ ${revision}
+
+ 4.0.0
+
+ connector-cassandra-e2e
+
+
+
+ org.apache.seatunnel
+ connector-cassandra
+ ${project.version}
+ test
+
+
+ org.testcontainers
+ cassandra
+ ${testcontainer.version}
+ test
+
+
+
+
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cassandra-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cassandra/CassandraIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cassandra-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cassandra/CassandraIT.java
new file mode 100644
index 00000000000..9e54b914d5b
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cassandra-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cassandra/CassandraIT.java
@@ -0,0 +1,365 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.cassandra;
+
+import org.apache.seatunnel.api.table.type.ArrayType;
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.DecimalType;
+import org.apache.seatunnel.api.table.type.LocalTimeType;
+import org.apache.seatunnel.api.table.type.MapType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.e2e.common.TestResource;
+import org.apache.seatunnel.e2e.common.TestSuiteBase;
+import org.apache.seatunnel.e2e.common.container.TestContainer;
+import org.apache.seatunnel.e2e.common.util.ContainerUtil;
+
+import com.datastax.oss.driver.api.core.CqlSession;
+import com.datastax.oss.driver.api.core.config.DriverConfigLoader;
+import com.datastax.oss.driver.api.core.cql.BatchStatement;
+import com.datastax.oss.driver.api.core.cql.BatchType;
+import com.datastax.oss.driver.api.core.cql.BoundStatement;
+import com.datastax.oss.driver.api.core.cql.ResultSet;
+import com.datastax.oss.driver.api.core.cql.Row;
+import com.datastax.oss.driver.api.core.cql.SimpleStatement;
+import com.datastax.oss.driver.api.core.uuid.Uuids;
+import com.google.common.collect.Lists;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import lombok.extern.slf4j.Slf4j;
+import org.awaitility.Awaitility;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.TestTemplate;
+import org.testcontainers.containers.CassandraContainer;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.lifecycle.Startables;
+import org.testcontainers.shaded.org.apache.commons.io.IOUtils;
+import org.testcontainers.utility.DockerLoggerFactory;
+
+import java.io.ByteArrayInputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.UnknownHostException;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import scala.Tuple2;
+
+@Slf4j
+public class CassandraIT extends TestSuiteBase implements TestResource {
+ private static final String CASSANDRA_DOCKER_IMAGE = "cassandra";
+ private static final String HOST = "cassandra";
+ private static final Integer PORT = 9042;
+ private static final String INIT_CASSANDRA_PATH = "/init/cassandra_init.conf";
+ private static final String CASSANDRA_JOB_CONFIG = "/cassandra_to_cassandra.conf";
+ private static final String CASSANDRA_DRIVER_CONFIG = "/application.conf";
+ private static final String DATACENTER = "datacenter1";
+ private static final String KEYSPACE = "test";
+ private static final String SOURCE_TABLE = "source_table";
+ private static final String SINK_TABLE = "sink_table";
+ private static final String INSERT_CQL = "insert_cql";
+ private static final Tuple2> TEST_DATASET = generateTestDataSet();
+ private Config config;
+ private CassandraContainer> container;
+ private CqlSession session;
+
+ @TestTemplate
+ public void testCassandra(TestContainer container) throws Exception {
+ Container.ExecResult execResult = container.executeJob(CASSANDRA_JOB_CONFIG);
+ Assertions.assertEquals(0, execResult.getExitCode());
+ Assertions.assertNotNull(getRow());
+ compareResult();
+ clearSinkTable();
+ Assertions.assertNull(getRow());
+ }
+
+ @BeforeAll
+ @Override
+ public void startUp() throws Exception {
+ this.container = new CassandraContainer<>(CASSANDRA_DOCKER_IMAGE)
+ .withNetwork(NETWORK)
+ .withNetworkAliases(HOST)
+ .withLogConsumer(new Slf4jLogConsumer(DockerLoggerFactory.getLogger(CASSANDRA_DOCKER_IMAGE)));
+ container.setPortBindings(Lists.newArrayList(String.format("%s:%s", PORT, PORT)));
+ Startables.deepStart(Stream.of(this.container)).join();
+ log.info("Cassandra container started");
+ Awaitility.given()
+ .ignoreExceptions()
+ .await()
+ .atMost(180L, TimeUnit.SECONDS)
+ .untilAsserted(this::initConnection);
+ this.initializeCassandraTable();
+ this.batchInsertData();
+ }
+
+ private void initializeCassandraTable() {
+ initCassandraConfig();
+ createKeyspace();
+ try {
+ session.execute(SimpleStatement.builder(config.getString(SOURCE_TABLE)).setKeyspace(KEYSPACE).build());
+ session.execute(SimpleStatement.builder(config.getString(SINK_TABLE)).setKeyspace(KEYSPACE).build());
+ } catch (Exception e) {
+ throw new RuntimeException("Initializing Cassandra table failed!", e);
+ }
+ }
+
+ private void initConnection() {
+ try {
+ File file = new File(CASSANDRA_DRIVER_CONFIG);
+ this.session = CqlSession.builder()
+ .addContactPoint(new InetSocketAddress(container.getHost(), container.getExposedPorts().get(0)))
+ .withLocalDatacenter(DATACENTER)
+ .withConfigLoader(DriverConfigLoader.fromFile(file))
+ .build();
+ } catch (Exception e) {
+ throw new RuntimeException("Init connection failed!", e);
+ }
+
+ }
+
+ private void batchInsertData() {
+ try {
+ BatchStatement batchStatement = BatchStatement.builder(BatchType.UNLOGGED).build();
+ BoundStatement boundStatement = session.prepare(
+ SimpleStatement.builder(config.getString(INSERT_CQL)).setKeyspace(KEYSPACE).build())
+ .bind();
+ for (SeaTunnelRow row : TEST_DATASET._2()) {
+ boundStatement = boundStatement
+ .setLong(0, (Long) row.getField(0))
+ .setString(1, (String) row.getField(1))
+ .setLong(2, (Long) row.getField(2))
+ .setByteBuffer(3, (ByteBuffer) row.getField(3))
+ .setBoolean(4, (Boolean) row.getField(4))
+ .setBigDecimal(5, (BigDecimal) row.getField(5))
+ .setDouble(6, (Double) row.getField(6))
+ .setFloat(7, (Float) row.getField(7))
+ .setInt(8, (Integer) row.getField(8))
+ .setInstant(9, (Instant) row.getField(9))
+ .setUuid(10, (UUID) row.getField(10))
+ .setString(11, (String) row.getField(11))
+ .setBigInteger(12, (BigInteger) row.getField(12))
+ .setUuid(13, (UUID) row.getField(13))
+ .setInetAddress(14, (InetAddress) row.getField(14))
+ .setLocalDate(15, (LocalDate) row.getField(15))
+ .setShort(16, (Short) row.getField(16))
+ .setByte(17, (Byte) row.getField(17))
+ .setList(18, (List) row.getField(18), Float.class)
+ .setList(19, (List) row.getField(19), Integer.class)
+ .setSet(20, (Set) row.getField(20), Double.class)
+ .setSet(21, (Set) row.getField(21), Long.class)
+ .setMap(22, (Map) row.getField(22), String.class, Integer.class);
+ batchStatement = batchStatement.add(boundStatement);
+ }
+ session.execute(batchStatement);
+ batchStatement.clear();
+ } catch (Exception e) {
+ throw new RuntimeException("Batch insert data failed!", e);
+ }
+ }
+
+ private void compareResult() throws IOException {
+ String sourceCql = "select * from " + SOURCE_TABLE;
+ String sinkCql = "select * from " + SINK_TABLE;
+ List columnList = Arrays.stream(generateTestDataSet()._1().getFieldNames()).collect(Collectors.toList());
+ ResultSet sourceResultSet = session.execute(SimpleStatement.builder(sourceCql).setKeyspace(KEYSPACE).build());
+ ResultSet sinkResultSet = session.execute(SimpleStatement.builder(sinkCql).setKeyspace(KEYSPACE).build());
+ Assertions.assertEquals(sourceResultSet.getColumnDefinitions().size(), sinkResultSet.getColumnDefinitions().size());
+ Iterator sourceIterator = sourceResultSet.iterator();
+ Iterator sinkIterator = sinkResultSet.iterator();
+ while (sourceIterator.hasNext()) {
+ if (sinkIterator.hasNext()) {
+ Row sourceNext = sourceIterator.next();
+ Row sinkNext = sinkIterator.next();
+ for (String column : columnList) {
+ Object source = sourceNext.getObject(column);
+ Object sink = sinkNext.getObject(column);
+ if (!Objects.deepEquals(source, sink)) {
+ InputStream sourceAsciiStream = sourceNext.get(column, ByteArrayInputStream.class);
+ InputStream sinkAsciiStream = sinkNext.get(column, ByteArrayInputStream.class);
+ Assertions.assertNotNull(sourceAsciiStream);
+ Assertions.assertNotNull(sinkAsciiStream);
+ String sourceValue = IOUtils.toString(sourceAsciiStream, StandardCharsets.UTF_8);
+ String sinkValue = IOUtils.toString(sinkAsciiStream, StandardCharsets.UTF_8);
+ Assertions.assertEquals(sourceValue, sinkValue);
+ }
+ Assertions.assertTrue(true);
+ }
+ }
+ }
+
+ }
+
+ private void createKeyspace() {
+ try {
+ this.session.execute("CREATE KEYSPACE IF NOT EXISTS " + KEYSPACE +
+ " WITH replication = \n" +
+ "{'class':'SimpleStrategy','replication_factor':'1'};");
+ } catch (Exception e) {
+ throw new RuntimeException("Create keyspace failed!", e);
+ }
+ }
+
+ private void clearSinkTable() {
+ try {
+ session.execute(SimpleStatement.builder(String.format("truncate table %s", SINK_TABLE)).setKeyspace(KEYSPACE).build());
+ } catch (Exception e) {
+ throw new RuntimeException("Test clickhouse server image failed!", e);
+ }
+ }
+
+ private static Tuple2> generateTestDataSet() {
+ SeaTunnelRowType rowType = new SeaTunnelRowType(
+ new String[]{
+ "id",
+ "c_ascii",
+ "c_bigint",
+ "c_blob",
+ "c_boolean",
+ "c_decimal",
+ "c_double",
+ "c_float",
+ "c_int",
+ "c_timestamp",
+ "c_uuid",
+ "c_text",
+ "c_varint",
+ "c_timeuuid",
+ "c_inet",
+ "c_date",
+ "c_smallint",
+ "c_tinyint",
+ "c_list_float",
+ "c_list_int",
+ "c_set_double",
+ "c_set_bigint",
+ "c_map"
+ },
+ new SeaTunnelDataType[]{
+ BasicType.LONG_TYPE,
+ BasicType.STRING_TYPE,
+ BasicType.LONG_TYPE,
+ ArrayType.BYTE_ARRAY_TYPE,
+ BasicType.BOOLEAN_TYPE,
+ new DecimalType(9, 4),
+ BasicType.DOUBLE_TYPE,
+ BasicType.FLOAT_TYPE,
+ BasicType.INT_TYPE,
+ LocalTimeType.LOCAL_DATE_TIME_TYPE,
+ BasicType.STRING_TYPE,
+ BasicType.STRING_TYPE,
+ BasicType.STRING_TYPE,
+ BasicType.STRING_TYPE,
+ BasicType.STRING_TYPE,
+ LocalTimeType.LOCAL_DATE_TYPE,
+ BasicType.SHORT_TYPE,
+ BasicType.BYTE_TYPE,
+ ArrayType.FLOAT_ARRAY_TYPE,
+ ArrayType.INT_ARRAY_TYPE,
+ ArrayType.DOUBLE_ARRAY_TYPE,
+ ArrayType.LONG_ARRAY_TYPE,
+ new MapType<>(BasicType.STRING_TYPE, BasicType.INT_TYPE)
+ });
+ List rows = new ArrayList<>();
+ for (int i = 0; i < 50; ++i) {
+ SeaTunnelRow row;
+ try {
+ row = new SeaTunnelRow(
+ new Object[]{
+ (long) i,
+ String.valueOf(i),
+ (long) i,
+ ByteBuffer.wrap(new byte[]{Byte.parseByte("1")}),
+ Boolean.FALSE,
+ BigDecimal.valueOf(11L, 2),
+ Double.parseDouble("1.1"),
+ Float.parseFloat("2.1"),
+ i,
+ Instant.now(),
+ UUID.randomUUID(),
+ "text",
+ new BigInteger("12345678909876543210"),
+ Uuids.timeBased(),
+ InetAddress.getByName("1.2.3.4"),
+ LocalDate.now(),
+ Short.parseShort("1"),
+ Byte.parseByte("1"),
+ Collections.singletonList((float) i),
+ Collections.singletonList(i),
+ Collections.singleton(Double.valueOf("1.1")),
+ Collections.singleton((long) i),
+ Collections.singletonMap("key_" + i, i)
+ });
+ } catch (UnknownHostException e) {
+ throw new RuntimeException("Generate Test DataSet Failed!", e);
+ }
+ rows.add(row);
+ }
+ return Tuple2.apply(rowType, rows);
+ }
+
+ private Row getRow() {
+ try {
+ String sql = String.format("select * from %s limit 1", SINK_TABLE);
+ ResultSet resultSet = session.execute(SimpleStatement.builder(sql).setKeyspace(KEYSPACE).build());
+ return resultSet.one();
+ } catch (Exception e) {
+ throw new RuntimeException("test cassandra server image failed!", e);
+ }
+ }
+
+ private void initCassandraConfig() {
+ File file = ContainerUtil.getResourcesFile(INIT_CASSANDRA_PATH);
+ Config config = ConfigFactory.parseFile(file);
+ assert config.hasPath(SOURCE_TABLE) && config.hasPath(SINK_TABLE) && config.hasPath(INSERT_CQL);
+ this.config = config;
+ }
+
+ @AfterAll
+ @Override
+ public void tearDown() throws Exception {
+ if (this.session != null) {
+ this.session.close();
+ }
+ if (this.container != null) {
+ this.container.close();
+ }
+ }
+}
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cassandra-e2e/src/test/resources/application.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cassandra-e2e/src/test/resources/application.conf
new file mode 100644
index 00000000000..88be3f73c57
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cassandra-e2e/src/test/resources/application.conf
@@ -0,0 +1,25 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+datastax-java-driver {
+ advanced.protocol.version = V5
+ profiles {
+ slow {
+ basic.request.timeout = 10 seconds
+ }
+ }
+}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cassandra-e2e/src/test/resources/cassandra_to_cassandra.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cassandra-e2e/src/test/resources/cassandra_to_cassandra.conf
new file mode 100644
index 00000000000..4f42eb8625f
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cassandra-e2e/src/test/resources/cassandra_to_cassandra.conf
@@ -0,0 +1,52 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ # You can set spark configuration here
+ execution.parallelism = 1
+ job.mode = "BATCH"
+}
+
+source {
+ # This is a example source plugin **only for test and demonstrate the feature source plugin**
+ Cassandra {
+ host = "cassandra:9042"
+ username = ""
+ password = ""
+ datacenter = "datacenter1"
+ keyspace = "test"
+ cql = "select * from source_table"
+ result_table_name = "source_table"
+ }
+}
+
+transform {
+ # If you would like to get more information about how to configure seatunnel and see full list of transform plugins,
+ # please go to https://seatunnel.apache.org/docs/transform/sql
+}
+
+sink {
+ Cassandra {
+ host = "cassandra:9042"
+ username = ""
+ password = ""
+ datacenter = "datacenter1"
+ keyspace = "test"
+ async_write = "true"
+ table = "sink_table"
+ }
+}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cassandra-e2e/src/test/resources/init/cassandra_init.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cassandra-e2e/src/test/resources/init/cassandra_init.conf
new file mode 100644
index 00000000000..62b49524403
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cassandra-e2e/src/test/resources/init/cassandra_init.conf
@@ -0,0 +1,105 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+source_table = """
+create table if not exists source_table(
+ id bigint,
+ c_ascii ascii,
+ c_bigint bigint,
+ c_blob blob,
+ c_boolean boolean,
+ c_decimal decimal,
+ c_double double,
+ c_float float,
+ c_int int,
+ c_timestamp timestamp,
+ c_uuid uuid,
+ c_text text,
+ c_varint varint,
+ c_timeuuid timeuuid,
+ c_inet inet,
+ c_date date,
+ c_smallint smallint,
+ c_tinyint tinyint,
+ c_list_float list,
+ c_list_int list,
+ c_set_double set,
+ c_set_bigint set,
+ c_map map,
+ PRIMARY KEY (id)
+);
+"""
+
+sink_table = """
+create table if not exists sink_table(
+ id bigint,
+ c_ascii ascii,
+ c_bigint bigint,
+ c_blob blob,
+ c_boolean boolean,
+ c_decimal decimal,
+ c_double double,
+ c_float float,
+ c_int int,
+ c_timestamp timestamp,
+ c_uuid uuid,
+ c_text text,
+ c_varint varint,
+ c_timeuuid timeuuid,
+ c_inet inet,
+ c_date date,
+ c_smallint smallint,
+ c_tinyint tinyint,
+ c_list_float list,
+ c_list_int list,
+ c_set_double set,
+ c_set_bigint set,
+ c_map map,
+ PRIMARY KEY (id)
+);
+"""
+
+insert_cql = """
+insert into source_table
+(
+ id,
+ c_ascii,
+ c_bigint,
+ c_blob,
+ c_boolean,
+ c_decimal,
+ c_double,
+ c_float,
+ c_int,
+ c_timestamp,
+ c_uuid,
+ c_text,
+ c_varint,
+ c_timeuuid,
+ c_inet,
+ c_date,
+ c_smallint,
+ c_tinyint,
+ c_list_float,
+ c_list_int,
+ c_set_double,
+ c_set_bigint,
+ c_map
+)
+values
+(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)
+"""
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml b/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml
index d5cdf69aa49..e5d8ca07603 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml
@@ -32,6 +32,7 @@
connector-influxdb-e2e
connector-amazondynamodb-e2e
connector-file-local-e2e
+ connector-cassandra-e2e
seatunnel-connector-v2-e2e