diff --git a/docs/en/connector-v2/sink/TDengine.md b/docs/en/connector-v2/sink/TDengine.md
new file mode 100644
index 00000000000..597a7a99b50
--- /dev/null
+++ b/docs/en/connector-v2/sink/TDengine.md
@@ -0,0 +1,69 @@
+# TDengine
+
+> TDengine sink connector
+
+## Description
+
+Used to write data to TDengine. You need to create stable before running seatunnel task
+
+## Key features
+
+- [x] [exactly-once](../../concept/connector-v2-features.md)
+- [ ] [cdc](../../concept/connector-v2-features.md)
+
+## Options
+
+| name | type | required | default value |
+|----------------------------|---------|----------|---------------|
+| url | string | yes | - |
+| username | string | yes | - |
+| password | string | yes | - |
+| database | string | yes | |
+| stable | string | yes | - |
+| timezone | string | no | UTC |
+
+### url [string]
+
+the url of the TDengine when you select the TDengine
+
+e.g.
+```
+jdbc:TAOS-RS://localhost:6041/
+```
+
+### username [string]
+
+the username of the TDengine when you select
+
+### password [string]
+
+the password of the TDengine when you select
+
+### database [string]
+
+the database of the TDengine when you select
+
+### stable [string]
+
+the stable of the TDengine when you select
+
+### timezone [string]
+
+the timeznoe of the TDengine sever, it's important to the ts field
+
+## Example
+
+### sink
+
+```hocon
+sink {
+ TDengine {
+ url : "jdbc:TAOS-RS://localhost:6041/"
+ username : "root"
+ password : "taosdata"
+ database : "power2"
+ stable : "meters2"
+ timezone: UTC
+ }
+}
+```
\ No newline at end of file
diff --git a/docs/en/connector-v2/source/TDengine.md b/docs/en/connector-v2/source/TDengine.md
new file mode 100644
index 00000000000..fea0a080d99
--- /dev/null
+++ b/docs/en/connector-v2/source/TDengine.md
@@ -0,0 +1,82 @@
+# TDengine
+
+> TDengine source connector
+
+## Description
+
+Read external data source data through TDengine.
+
+## Key features
+
+- [x] [batch](../../concept/connector-v2-features.md)
+- [ ] [stream](../../concept/connector-v2-features.md)
+- [x] [exactly-once](../../concept/connector-v2-features.md)
+- [ ] [column projection](../../concept/connector-v2-features.md)
+
+supports query SQL and can achieve projection effect.
+
+- [x] [parallelism](../../concept/connector-v2-features.md)
+- [ ] [support user-defined split](../../concept/connector-v2-features.md)
+
+## Options
+
+| name | type | required | default value |
+|----------------------------|---------|----------|---------------|
+| url | string | yes | - |
+| username | string | yes | - |
+| password | string | yes | - |
+| database | string | yes | |
+| stable | string | yes | - |
+| lower_bound | long | yes | - |
+| upper_bound | long | yes | - |
+
+### url [string]
+
+the url of the TDengine when you select the TDengine
+
+e.g.
+```
+jdbc:TAOS-RS://localhost:6041/
+```
+
+### username [string]
+
+the username of the TDengine when you select
+
+### password [string]
+
+the password of the TDengine when you select
+
+### database [string]
+
+the database of the TDengine when you select
+
+### stable [string]
+
+the stable of the TDengine when you select
+
+### lower_bound [long]
+
+the lower_bound of the migration period
+
+### upper_bound [long]
+
+the upper_bound of the migration period
+
+## Example
+
+### source
+```hocon
+source {
+ TDengine {
+ url : "jdbc:TAOS-RS://localhost:6041/"
+ username : "root"
+ password : "taosdata"
+ database : "power"
+ stable : "meters"
+ lower_bound : "2018-10-03 14:38:05.000"
+ upper_bound : "2018-10-03 14:38:16.800"
+ result_table_name = "tdengine_result"
+ }
+}
+```
\ No newline at end of file
diff --git a/seatunnel-connectors-v2/connector-tdengine/pom.xml b/seatunnel-connectors-v2/connector-tdengine/pom.xml
new file mode 100644
index 00000000000..31db6a553ec
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-tdengine/pom.xml
@@ -0,0 +1,45 @@
+
+
+
+
+ org.apache.seatunnel
+ seatunnel-connectors-v2
+ ${revision}
+
+ 4.0.0
+
+ connector-tdengine
+
+
+
+ org.apache.seatunnel
+ connector-common
+ ${project.version}
+
+
+ com.taosdata.jdbc
+ taos-jdbcdriver
+ 3.0.3
+
+
+
+
\ No newline at end of file
diff --git a/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/config/TDengineSourceConfig.java b/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/config/TDengineSourceConfig.java
new file mode 100644
index 00000000000..4bd7e1c6087
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/config/TDengineSourceConfig.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.tdengine.config;
+
+import static org.apache.seatunnel.connectors.seatunnel.tdengine.config.TDengineSourceConfig.ConfigNames.DATABASE;
+import static org.apache.seatunnel.connectors.seatunnel.tdengine.config.TDengineSourceConfig.ConfigNames.LOWER_BOUND;
+import static org.apache.seatunnel.connectors.seatunnel.tdengine.config.TDengineSourceConfig.ConfigNames.PASSWORD;
+import static org.apache.seatunnel.connectors.seatunnel.tdengine.config.TDengineSourceConfig.ConfigNames.STABLE;
+import static org.apache.seatunnel.connectors.seatunnel.tdengine.config.TDengineSourceConfig.ConfigNames.TIMEZONE;
+import static org.apache.seatunnel.connectors.seatunnel.tdengine.config.TDengineSourceConfig.ConfigNames.UPPER_BOUND;
+import static org.apache.seatunnel.connectors.seatunnel.tdengine.config.TDengineSourceConfig.ConfigNames.URL;
+import static org.apache.seatunnel.connectors.seatunnel.tdengine.config.TDengineSourceConfig.ConfigNames.USERNAME;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import lombok.Data;
+
+import java.io.Serializable;
+import java.util.List;
+
+@Data
+public class TDengineSourceConfig implements Serializable {
+
+ /**
+ * jdbc:TAOS-RS://localhost:6041/
+ */
+ private String url;
+ private String username;
+ private String password;
+ private String database;
+ private String stable;
+ //param of timezone in 'jdbc:TAOS-RS' just effect on taosadapter side, other than the JDBC client side
+ //so this param represent the server-side timezone setting up
+ private String timezone;
+ private String lowerBound;
+ private String upperBound;
+ private List fields;
+ private List tags;
+
+ public static TDengineSourceConfig buildSourceConfig(Config pluginConfig) {
+ TDengineSourceConfig tdengineSourceConfig = new TDengineSourceConfig();
+ tdengineSourceConfig.setUrl(pluginConfig.hasPath(URL) ? pluginConfig.getString(URL) : null);
+ tdengineSourceConfig.setDatabase(pluginConfig.hasPath(DATABASE) ? pluginConfig.getString(DATABASE) : null);
+ tdengineSourceConfig.setStable(pluginConfig.hasPath(STABLE) ? pluginConfig.getString(STABLE) : null);
+ tdengineSourceConfig.setUsername(pluginConfig.hasPath(USERNAME) ? pluginConfig.getString(USERNAME) : null);
+ tdengineSourceConfig.setPassword(pluginConfig.hasPath(PASSWORD) ? pluginConfig.getString(PASSWORD) : null);
+ tdengineSourceConfig.setUpperBound(pluginConfig.hasPath(UPPER_BOUND) ? pluginConfig.getString(UPPER_BOUND) : null);
+ tdengineSourceConfig.setLowerBound(pluginConfig.hasPath(LOWER_BOUND) ? pluginConfig.getString(LOWER_BOUND) : null);
+ tdengineSourceConfig.setTimezone(pluginConfig.hasPath(TIMEZONE) ? pluginConfig.getString(TIMEZONE) : "UTC");
+ return tdengineSourceConfig;
+ }
+
+ public static class ConfigNames {
+
+ public static String URL = "url";
+ public static String USERNAME = "username";
+ public static String PASSWORD = "password";
+ public static String DATABASE = "database";
+ public static String STABLE = "stable";
+ public static String TIMEZONE = "timezone";
+ public static String LOWER_BOUND = "lower_bound";
+ public static String UPPER_BOUND = "upper_bound";
+ }
+}
diff --git a/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/exception/TDengineConnectorException.java b/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/exception/TDengineConnectorException.java
new file mode 100644
index 00000000000..77e1edac8fa
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/exception/TDengineConnectorException.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.tdengine.exception;
+
+import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
+import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
+
+public class TDengineConnectorException extends SeaTunnelRuntimeException {
+ public TDengineConnectorException(SeaTunnelErrorCode seaTunnelErrorCode, String errorMessage) {
+ super(seaTunnelErrorCode, errorMessage);
+ }
+
+ public TDengineConnectorException(SeaTunnelErrorCode seaTunnelErrorCode, String errorMessage, Throwable cause) {
+ super(seaTunnelErrorCode, errorMessage, cause);
+ }
+
+ public TDengineConnectorException(SeaTunnelErrorCode seaTunnelErrorCode, Throwable cause) {
+ super(seaTunnelErrorCode, cause);
+ }
+}
diff --git a/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/sink/TDengineSink.java b/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/sink/TDengineSink.java
new file mode 100644
index 00000000000..a06cd8999d3
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/sink/TDengineSink.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.tdengine.sink;
+
+import org.apache.seatunnel.api.sink.SeaTunnelSink;
+import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSimpleSink;
+import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import com.google.auto.service.AutoService;
+
+import java.io.IOException;
+
+@AutoService(SeaTunnelSink.class)
+public class TDengineSink extends AbstractSimpleSink {
+ private SeaTunnelRowType seaTunnelRowType;
+ private Config pluginConfig;
+
+ @Override
+ public void setTypeInfo(SeaTunnelRowType seaTunnelRowType) {
+ this.seaTunnelRowType = seaTunnelRowType;
+ }
+
+ @Override
+ public SeaTunnelDataType getConsumedType() {
+ return this.seaTunnelRowType;
+ }
+
+ @Override
+ public AbstractSinkWriter createWriter(SinkWriter.Context context) throws IOException {
+ return new TDengineSinkWriter(pluginConfig, seaTunnelRowType);
+ }
+
+ @Override
+ public void prepare(Config pluginConfig) {
+ this.pluginConfig = pluginConfig;
+ }
+
+ @Override
+ public String getPluginName() {
+ return "TDengine";
+ }
+}
diff --git a/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/sink/TDengineSinkWriter.java b/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/sink/TDengineSinkWriter.java
new file mode 100644
index 00000000000..e6c0b5785d0
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/sink/TDengineSinkWriter.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.tdengine.sink;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.exception.CommonErrorCode;
+import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
+import org.apache.seatunnel.connectors.seatunnel.tdengine.config.TDengineSourceConfig;
+import org.apache.seatunnel.connectors.seatunnel.tdengine.exception.TDengineConnectorException;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import com.google.common.base.Throwables;
+import com.google.common.collect.Lists;
+import lombok.SneakyThrows;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.ArrayUtils;
+import org.apache.commons.lang3.StringUtils;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.time.format.DateTimeFormatter;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Objects;
+
+@Slf4j
+public class TDengineSinkWriter extends AbstractSinkWriter {
+
+ private static final DateTimeFormatter FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");
+ private final Connection conn;
+ private final TDengineSourceConfig config;
+ private int tagsNum;
+
+ @SneakyThrows
+ public TDengineSinkWriter(Config pluginConfig, SeaTunnelRowType seaTunnelRowType) {
+ config = TDengineSourceConfig.buildSourceConfig(pluginConfig);
+ String jdbcUrl = StringUtils.join(config.getUrl(), config.getDatabase(), "?user=", config.getUsername(), "&password=", config.getPassword());
+ conn = DriverManager.getConnection(jdbcUrl);
+ try (Statement statement = conn.createStatement()) {
+ final ResultSet metaResultSet = statement.executeQuery("desc " + config.getDatabase() + "." + config.getStable());
+ while (metaResultSet.next()) {
+ if (StringUtils.equals("TAG", metaResultSet.getString("note"))) {
+ tagsNum++;
+ }
+ }
+ }
+ }
+
+ @SneakyThrows
+ @Override
+ @SuppressWarnings("checkstyle:RegexpSingleline")
+ public void write(SeaTunnelRow element) {
+ final ArrayList