diff --git a/docs/en/connector-v2/Error-Quick-Reference-Manual.md b/docs/en/connector-v2/Error-Quick-Reference-Manual.md
index 29bb1adbfb9..7c137264be7 100644
--- a/docs/en/connector-v2/Error-Quick-Reference-Manual.md
+++ b/docs/en/connector-v2/Error-Quick-Reference-Manual.md
@@ -252,3 +252,19 @@ problems encountered by users.
|---------------------------|------------------------|-------------------------|
| FILTER_FIELD_TRANSFORM-01 | filter field not found | filter field not found. |
+## RocketMq Connector Error Codes
+
+| code | description | solution |
+|-------------|-------------------------------------------------------------------------------------------------|-------------------------------------------------------------------------------------------------------------------------------|
+| ROCKETMQ-01 | Add a split back to the split enumerator failed, it will only happen when a SourceReader failed | When users encounter this error code, it means that add a split back to the split enumerator failed, please check it. |
+| ROCKETMQ-02 | Add the split checkpoint state to reader failed | When users encounter this error code, it means that add the split checkpoint state to reader failed, please check it. |
+| ROCKETMQ-03 | Rocketmq failed to consume data | When users encounter this error code, it means that rocketmq failed to consume data, please check it., please check it. |
+| ROCKETMQ-04 | Error occurred when the rocketmq consumer thread was running | When the user encounters this error code, it means that an error occurred while running the Rocketmq consumer thread |
+| ROCKETMQ-05 | Rocketmq producer failed to send message | When users encounter this error code, it means that Rocketmq producer failed to send message, please check it. |
+| ROCKETMQ-06 | Rocketmq producer failed to start | When users encounter this error code, it means that Rocketmq producer failed to start, please check it. |
+| ROCKETMQ-07 | Rocketmq consumer failed to start | When users encounter this error code, it means that Rocketmq consumer failed to start, please check it. |
+| ROCKETMQ-08 | Unsupported start mode | When users encounter this error code, it means that the configured start mode is not supported, please check it. |
+| ROCKETMQ-09 | Failed to get the offsets of the current consumer group | When users encounter this error code, it means that failed to get the offsets of the current consumer group, please check it. |
+| ROCKETMQ-10 | Failed to search offset through timestamp | When users encounter this error code, it means that failed to search offset through timestamp, please check it. |
+| ROCKETMQ-11 | Failed to get topic min and max topic | When users encounter this error code, it means that failed to get topic min and max topic, please check it. |
+
diff --git a/docs/en/connector-v2/sink/RocketMQ.md b/docs/en/connector-v2/sink/RocketMQ.md
new file mode 100644
index 00000000000..70319202145
--- /dev/null
+++ b/docs/en/connector-v2/sink/RocketMQ.md
@@ -0,0 +1,82 @@
+# RocketMQ
+
+> RocketMQ sink connector
+>
+ ## Description
+
+Write Rows to a Apache RocketMQ topic.
+
+## Key features
+
+- [x] [exactly-once](../../concept/connector-v2-features.md)
+
+By default, we will use 2pc to guarantee the message is sent to RocketMQ exactly once.
+
+## Options
+
+| name | type | required | default value |
+|----------------------|---------|----------|--------------------------|
+| topic | string | yes | - |
+| name.srv.addr | string | yes | - |
+| acl.enabled | Boolean | no | false |
+| access.key | String | no | |
+| secret.key | String | no | |
+| producer.group | String | no | SeaTunnel-producer-Group |
+| semantic | string | no | NON |
+| partition.key.fields | array | no | - |
+| format | String | no | json |
+| field.delimiter | String | no | , |
+| common-options | config | no | - |
+
+### topic [string]
+
+`RocketMQ topic` name.
+
+### name.srv.addr [string]
+
+`RocketMQ` name server cluster address.
+
+### semantic [string]
+
+Semantics that can be chosen EXACTLY_ONCE/AT_LEAST_ONCE/NON, default NON.
+
+### partition.key.fields [array]
+
+Configure which fields are used as the key of the RocketMQ message.
+
+For example, if you want to use value of fields from upstream data as key, you can assign field names to this property.
+
+Upstream data is the following:
+
+| name | age | data |
+|------|-----|---------------|
+| Jack | 16 | data-example1 |
+| Mary | 23 | data-example2 |
+
+If name is set as the key, then the hash value of the name column will determine which partition the message is sent to.
+
+### format
+
+Data format. The default format is json. Optional text format. The default field separator is ",".
+If you customize the delimiter, add the "field_delimiter" option.
+
+### field_delimiter
+
+Customize the field delimiter for data format.
+
+### common options [config]
+
+Sink plugin common parameters, please refer to [Sink Common Options](common-options.md) for details.
+
+## Examples
+
+```hocon
+sink {
+ Rocketmq {
+ name.srv.addr = "localhost:9876"
+ topic = "test-topic-003"
+ partition.key.fields = ["name"]
+ }
+}
+```
+
diff --git a/docs/en/connector-v2/source/RocketMQ.md b/docs/en/connector-v2/source/RocketMQ.md
new file mode 100644
index 00000000000..fd209ce70b2
--- /dev/null
+++ b/docs/en/connector-v2/source/RocketMQ.md
@@ -0,0 +1,142 @@
+# RocketMQ
+
+> RocketMQ source connector
+
+## Description
+
+Source connector for Apache RocketMQ.
+
+## Key features
+
+- [x] [batch](../../concept/connector-v2-features.md)
+- [x] [stream](../../concept/connector-v2-features.md)
+- [x] [exactly-once](../../concept/connector-v2-features.md)
+- [ ] [column projection](../../concept/connector-v2-features.md)
+- [x] [parallelism](../../concept/connector-v2-features.md)
+- [ ] [support user-defined split](../../concept/connector-v2-features.md)
+
+## Options
+
+| name | type | required | default value |
+|-------------------------------------|---------|----------|----------------------------|
+| topics | String | yes | - |
+| name.srv.addr | String | yes | - |
+| acl.enabled | Boolean | no | false |
+| access.key | String | no | |
+| secret.key | String | no | |
+| batch.size | int | no | 100 |
+| consumer.group | String | no | SeaTunnel-Consumer-Group |
+| commit.on.checkpoint | Boolean | no | true |
+| schema | | no | - |
+| format | String | no | json |
+| field.delimiter | String | no | , |
+| start.mode | String | no | CONSUME_FROM_GROUP_OFFSETS |
+| start.mode.offsets | | no | |
+| start.mode.timestamp | Long | no | |
+| partition.discovery.interval.millis | long | no | -1 |
+| common-options | config | no | - |
+
+### topics [string]
+
+`RocketMQ topic` name. If there are multiple `topics`, use `,` to split, for example: `"tpc1,tpc2"`.
+
+### name.srv.addr [string]
+
+`RocketMQ` name server cluster address.
+
+### consumer.group [string]
+
+`RocketMQ consumer group id`, used to distinguish different consumer groups.
+
+### acl.enabled [boolean]
+
+If true, access control is enabled, and access key and secret key need to be configured.
+
+### access.key [string]
+
+When ACL_ENABLED is true, access key cannot be empty.
+
+### secret.key [string]
+
+When ACL_ENABLED is true, secret key cannot be empty.
+
+### batch.size [int]
+
+`RocketMQ` consumer pull batch size
+
+### commit.on.checkpoint [boolean]
+
+If true the consumer's offset will be periodically committed in the background.
+
+## partition.discovery.interval.millis [long]
+
+The interval for dynamically discovering topics and partitions.
+
+### schema
+
+The structure of the data, including field names and field types.
+
+## format
+
+Data format. The default format is json. Optional text format. The default field separator is ", ".
+If you customize the delimiter, add the "field.delimiter" option.
+
+## field.delimiter
+
+Customize the field delimiter for data format.
+
+## start.mode
+
+The initial consumption pattern of consumers,there are several types:
+[CONSUME_FROM_LAST_OFFSET],[CONSUME_FROM_FIRST_OFFSET],[CONSUME_FROM_GROUP_OFFSETS],[CONSUME_FROM_TIMESTAMP]
+,[CONSUME_FROM_SPECIFIC_OFFSETS]
+
+## start.mode.timestamp
+
+The time required for consumption mode to be "CONSUME_FROM_TIMESTAMP".
+
+## start.mode.offsets
+
+The offset required for consumption mode to be "CONSUME_FROM_SPECIFIC_OFFSETS".
+
+for example:
+
+```hocon
+start.mode.offsets = {
+ topic1-0 = 70
+ topic1-1 = 10
+ topic1-2 = 10
+ }
+```
+
+### common-options [config]
+
+Source plugin common parameters, please refer to [Source Common Options](common-options.md) for details.
+
+## Example
+
+### Simple
+
+```hocon
+source {
+ Rocketmq {
+ name.srv.addr = "localhost:9876"
+ topics = "test-topic-002"
+ consumer.group = "consumer-group"
+ parallelism = 2
+ batch.size = 20
+ schema = {
+ fields {
+ age = int
+ name = string
+ }
+ }
+ start.mode = "CONSUME_FROM_SPECIFIC_OFFSETS"
+ start.mode.offsets = {
+ test-topic-002-0 = 20
+ }
+
+ }
+}
+```
+
diff --git a/plugin-mapping.properties b/plugin-mapping.properties
index d72508717b9..87debbe7ec2 100644
--- a/plugin-mapping.properties
+++ b/plugin-mapping.properties
@@ -105,4 +105,5 @@ seatunnel.source.Persistiq = connector-http-persistiq
seatunnel.sink.SelectDBCloud = connector-selectdb-cloud
seatunnel.sink.Hbase = connector-hbase
seatunnel.source.StarRocks = connector-starrocks
-
+seatunnel.source.Rocketmq = connector-rocketmq
+seatunnel.sink.Rocketmq = connector-rocketmq
diff --git a/release-note.md b/release-note.md
index 65ae1abab0f..5554c24d819 100644
--- a/release-note.md
+++ b/release-note.md
@@ -19,6 +19,7 @@
- [Hbase] Add hbase sink connector #4049
- [Github] Add Github source connector #4155
- [CDC] Support export debezium-json format to kafka #4339
+- [RocketMQ] Add RocketMQ source and sink connector #4007
### Formats
- [Canal]Support read canal format message #3950
diff --git a/seatunnel-connectors-v2/connector-rocketmq/pom.xml b/seatunnel-connectors-v2/connector-rocketmq/pom.xml
new file mode 100644
index 00000000000..5076eaaad70
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-rocketmq/pom.xml
@@ -0,0 +1,61 @@
+
+
+
+ 4.0.0
+
+ org.apache.seatunnel
+ seatunnel-connectors-v2
+ ${revision}
+
+ connector-rocketmq
+
+
+ 4.9.4
+
+
+
+
+ org.apache.seatunnel
+ connector-common
+ ${project.version}
+
+
+ org.apache.seatunnel
+ seatunnel-format-json
+ ${project.version}
+
+
+ org.apache.seatunnel
+ seatunnel-format-text
+ ${project.version}
+
+
+ org.apache.rocketmq
+ rocketmq-client
+ ${rocketmq.version}
+
+
+ org.apache.rocketmq
+ rocketmq-tools
+ ${rocketmq.version}
+
+
+
diff --git a/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/common/RocketMqAdminUtil.java b/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/common/RocketMqAdminUtil.java
new file mode 100644
index 00000000000..ee831257a97
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/common/RocketMqAdminUtil.java
@@ -0,0 +1,319 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.rocketmq.common;
+
+import org.apache.seatunnel.connectors.seatunnel.rocketmq.exception.RocketMqConnectorErrorCode;
+import org.apache.seatunnel.connectors.seatunnel.rocketmq.exception.RocketMqConnectorException;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.acl.common.AclClientRPCHook;
+import org.apache.rocketmq.acl.common.SessionCredentials;
+import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
+import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.apache.rocketmq.client.producer.TransactionListener;
+import org.apache.rocketmq.client.producer.TransactionMQProducer;
+import org.apache.rocketmq.common.TopicConfig;
+import org.apache.rocketmq.common.admin.ConsumeStats;
+import org.apache.rocketmq.common.admin.OffsetWrapper;
+import org.apache.rocketmq.common.admin.TopicOffset;
+import org.apache.rocketmq.common.admin.TopicStatsTable;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.common.protocol.ResponseCode;
+import org.apache.rocketmq.common.protocol.body.ClusterInfo;
+import org.apache.rocketmq.common.protocol.route.TopicRouteData;
+import org.apache.rocketmq.remoting.RPCHook;
+import org.apache.rocketmq.remoting.exception.RemotingException;
+import org.apache.rocketmq.remoting.protocol.LanguageCode;
+import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
+import org.apache.rocketmq.tools.command.CommandUtil;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.UUID;
+import java.util.stream.Collectors;
+
+/** Tools for creating RocketMq topic and group. */
+public class RocketMqAdminUtil {
+
+ public static String createUniqInstance(String prefix) {
+ return prefix.concat("-").concat(UUID.randomUUID().toString());
+ }
+
+ public static RPCHook getAclRpcHook(String accessKey, String secretKey) {
+ return new AclClientRPCHook(new SessionCredentials(accessKey, secretKey));
+ }
+
+ /** Init default lite pull consumer */
+ public static DefaultLitePullConsumer initDefaultLitePullConsumer(
+ RocketMqBaseConfiguration config, boolean autoCommit) {
+ DefaultLitePullConsumer consumer = null;
+ if (Objects.isNull(consumer)) {
+ if (StringUtils.isBlank(config.getAccessKey())
+ && StringUtils.isBlank(config.getSecretKey())) {
+ consumer = new DefaultLitePullConsumer(config.getGroupId());
+ } else {
+ consumer =
+ new DefaultLitePullConsumer(
+ config.getGroupId(),
+ getAclRpcHook(config.getAccessKey(), config.getSecretKey()));
+ }
+ }
+ consumer.setNamesrvAddr(config.getNamesrvAddr());
+ String uniqueName = createUniqInstance(config.getNamesrvAddr());
+ consumer.setInstanceName(uniqueName);
+ consumer.setUnitName(uniqueName);
+ consumer.setAutoCommit(autoCommit);
+ if (config.getBatchSize() != null) {
+ consumer.setPullBatchSize(config.getBatchSize());
+ }
+ return consumer;
+ }
+
+ /** Init transaction producer */
+ public static TransactionMQProducer initTransactionMqProducer(
+ RocketMqBaseConfiguration config, TransactionListener listener) {
+ RPCHook rpcHook = null;
+ if (config.isAclEnable()) {
+ rpcHook =
+ new AclClientRPCHook(
+ new SessionCredentials(config.getAccessKey(), config.getSecretKey()));
+ }
+ TransactionMQProducer producer = new TransactionMQProducer(config.getGroupId(), rpcHook);
+ producer.setNamesrvAddr(config.getNamesrvAddr());
+ producer.setInstanceName(createUniqInstance(config.getNamesrvAddr()));
+ producer.setLanguage(LanguageCode.JAVA);
+ producer.setTransactionListener(listener);
+ if (config.getMaxMessageSize() != null) {
+ producer.setMaxMessageSize(config.getMaxMessageSize());
+ }
+ if (config.getSendMsgTimeout() != null) {
+ producer.setSendMsgTimeout(config.getSendMsgTimeout());
+ }
+
+ return producer;
+ }
+
+ public static DefaultMQProducer initDefaultMqProducer(RocketMqBaseConfiguration config) {
+ RPCHook rpcHook = null;
+ if (config.isAclEnable()) {
+ rpcHook =
+ new AclClientRPCHook(
+ new SessionCredentials(config.getAccessKey(), config.getSecretKey()));
+ }
+ DefaultMQProducer producer = new DefaultMQProducer(rpcHook);
+ producer.setNamesrvAddr(config.getNamesrvAddr());
+ producer.setInstanceName(createUniqInstance(config.getNamesrvAddr()));
+ producer.setProducerGroup(config.getGroupId());
+ producer.setLanguage(LanguageCode.JAVA);
+ if (config.getMaxMessageSize() != null && config.getMaxMessageSize() > 0) {
+ producer.setMaxMessageSize(config.getMaxMessageSize());
+ }
+ if (config.getSendMsgTimeout() != null && config.getMaxMessageSize() > 0) {
+ producer.setSendMsgTimeout(config.getSendMsgTimeout());
+ }
+ return producer;
+ }
+
+ private static DefaultMQAdminExt startMQAdminTool(RocketMqBaseConfiguration config)
+ throws MQClientException {
+ DefaultMQAdminExt admin;
+ if (config.isAclEnable()) {
+ admin =
+ new DefaultMQAdminExt(
+ new AclClientRPCHook(
+ new SessionCredentials(
+ config.getAccessKey(), config.getSecretKey())));
+ } else {
+ admin = new DefaultMQAdminExt();
+ }
+ admin.setNamesrvAddr(config.getNamesrvAddr());
+ admin.setAdminExtGroup(config.getGroupId());
+ admin.setInstanceName(createUniqInstance(config.getNamesrvAddr()));
+ admin.start();
+ return admin;
+ }
+
+ /** Create rocketMq topic */
+ public static void createTopic(RocketMqBaseConfiguration config, TopicConfig topicConfig) {
+ DefaultMQAdminExt defaultMQAdminExt = null;
+ try {
+ defaultMQAdminExt = startMQAdminTool(config);
+ ClusterInfo clusterInfo = defaultMQAdminExt.examineBrokerClusterInfo();
+ HashMap> clusterAddrTable = clusterInfo.getClusterAddrTable();
+ Set clusterNameSet = clusterAddrTable.keySet();
+ for (String clusterName : clusterNameSet) {
+ Set masterSet =
+ CommandUtil.fetchMasterAddrByClusterName(defaultMQAdminExt, clusterName);
+ for (String addr : masterSet) {
+ defaultMQAdminExt.createAndUpdateTopicConfig(addr, topicConfig);
+ }
+ }
+ } catch (Exception e) {
+ throw new RocketMqConnectorException(RocketMqConnectorErrorCode.CREATE_TOPIC_ERROR, e);
+ } finally {
+ if (defaultMQAdminExt != null) {
+ defaultMQAdminExt.shutdown();
+ }
+ }
+ }
+
+ /** check topic exist */
+ public static boolean topicExist(RocketMqBaseConfiguration config, String topic) {
+ DefaultMQAdminExt defaultMQAdminExt = null;
+ boolean foundTopicRouteInfo = false;
+ try {
+ defaultMQAdminExt = startMQAdminTool(config);
+ TopicRouteData topicRouteData = defaultMQAdminExt.examineTopicRouteInfo(topic);
+ if (topicRouteData != null) {
+ foundTopicRouteInfo = true;
+ }
+ } catch (Exception e) {
+ if (e instanceof MQClientException) {
+ if (((MQClientException) e).getResponseCode() == ResponseCode.TOPIC_NOT_EXIST) {
+ foundTopicRouteInfo = false;
+ } else {
+ throw new RocketMqConnectorException(
+ RocketMqConnectorErrorCode.TOPIC_NOT_EXIST_ERROR, e);
+ }
+ } else {
+ throw new RocketMqConnectorException(
+ RocketMqConnectorErrorCode.TOPIC_NOT_EXIST_ERROR, e);
+ }
+ } finally {
+ if (defaultMQAdminExt != null) {
+ defaultMQAdminExt.shutdown();
+ }
+ }
+ return foundTopicRouteInfo;
+ }
+
+ /** Get topic offsets */
+ public static List