Skip to content

Commit

Permalink
Add rocketmq source and sink
Browse files Browse the repository at this point in the history
  • Loading branch information
sunxiaojian committed Apr 13, 2023
1 parent 884f733 commit 3a6fc02
Show file tree
Hide file tree
Showing 48 changed files with 4,471 additions and 1 deletion.
16 changes: 16 additions & 0 deletions docs/en/connector-v2/Error-Quick-Reference-Manual.md
Original file line number Diff line number Diff line change
Expand Up @@ -252,3 +252,19 @@ problems encountered by users.
|---------------------------|------------------------|-------------------------|
| FILTER_FIELD_TRANSFORM-01 | filter field not found | filter field not found. |

## RocketMq Connector Error Codes

| code | description | solution |
|-------------|-------------------------------------------------------------------------------------------------|-------------------------------------------------------------------------------------------------------------------------------|
| ROCKETMQ-01 | Add a split back to the split enumerator failed, it will only happen when a SourceReader failed | When users encounter this error code, it means that add a split back to the split enumerator failed, please check it. |
| ROCKETMQ-02 | Add the split checkpoint state to reader failed | When users encounter this error code, it means that add the split checkpoint state to reader failed, please check it. |
| ROCKETMQ-03 | Rocketmq failed to consume data | When users encounter this error code, it means that rocketmq failed to consume data, please check it., please check it. |
| ROCKETMQ-04 | Error occurred when the rocketmq consumer thread was running | When the user encounters this error code, it means that an error occurred while running the Rocketmq consumer thread |
| ROCKETMQ-05 | Rocketmq producer failed to send message | When users encounter this error code, it means that Rocketmq producer failed to send message, please check it. |
| ROCKETMQ-06 | Rocketmq producer failed to start | When users encounter this error code, it means that Rocketmq producer failed to start, please check it. |
| ROCKETMQ-07 | Rocketmq consumer failed to start | When users encounter this error code, it means that Rocketmq consumer failed to start, please check it. |
| ROCKETMQ-08 | Unsupported start mode | When users encounter this error code, it means that the configured start mode is not supported, please check it. |
| ROCKETMQ-09 | Failed to get the offsets of the current consumer group | When users encounter this error code, it means that failed to get the offsets of the current consumer group, please check it. |
| ROCKETMQ-10 | Failed to search offset through timestamp | When users encounter this error code, it means that failed to search offset through timestamp, please check it. |
| ROCKETMQ-11 | Failed to get topic min and max topic | When users encounter this error code, it means that failed to get topic min and max topic, please check it. |

82 changes: 82 additions & 0 deletions docs/en/connector-v2/sink/RocketMQ.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
# RocketMQ

> RocketMQ sink connector
>
## Description

Write Rows to a Apache RocketMQ topic.

## Key features

- [x] [exactly-once](../../concept/connector-v2-features.md)

By default, we will use 2pc to guarantee the message is sent to RocketMQ exactly once.

## Options

| name | type | required | default value |
|----------------------|---------|----------|--------------------------|
| topic | string | yes | - |
| name.srv.addr | string | yes | - |
| acl.enabled | Boolean | no | false |
| access.key | String | no | |
| secret.key | String | no | |
| producer.group | String | no | SeaTunnel-producer-Group |
| semantic | string | no | NON |
| partition.key.fields | array | no | - |
| format | String | no | json |
| field.delimiter | String | no | , |
| common-options | config | no | - |

### topic [string]

`RocketMQ topic` name.

### name.srv.addr [string]

`RocketMQ` name server cluster address.

### semantic [string]

Semantics that can be chosen EXACTLY_ONCE/AT_LEAST_ONCE/NON, default NON.

### partition.key.fields [array]

Configure which fields are used as the key of the RocketMQ message.

For example, if you want to use value of fields from upstream data as key, you can assign field names to this property.

Upstream data is the following:

| name | age | data |
|------|-----|---------------|
| Jack | 16 | data-example1 |
| Mary | 23 | data-example2 |

If name is set as the key, then the hash value of the name column will determine which partition the message is sent to.

### format

Data format. The default format is json. Optional text format. The default field separator is ",".
If you customize the delimiter, add the "field_delimiter" option.

### field_delimiter

Customize the field delimiter for data format.

### common options [config]

Sink plugin common parameters, please refer to [Sink Common Options](common-options.md) for details.

## Examples

```hocon
sink {
Rocketmq {
name.srv.addr = "localhost:9876"
topic = "test-topic-003"
partition.key.fields = ["name"]
}
}
```

142 changes: 142 additions & 0 deletions docs/en/connector-v2/source/RocketMQ.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
# RocketMQ

> RocketMQ source connector
## Description

Source connector for Apache RocketMQ.

## Key features

- [x] [batch](../../concept/connector-v2-features.md)
- [x] [stream](../../concept/connector-v2-features.md)
- [x] [exactly-once](../../concept/connector-v2-features.md)
- [ ] [column projection](../../concept/connector-v2-features.md)
- [x] [parallelism](../../concept/connector-v2-features.md)
- [ ] [support user-defined split](../../concept/connector-v2-features.md)

## Options

| name | type | required | default value |
|-------------------------------------|---------|----------|----------------------------|
| topics | String | yes | - |
| name.srv.addr | String | yes | - |
| acl.enabled | Boolean | no | false |
| access.key | String | no | |
| secret.key | String | no | |
| batch.size | int | no | 100 |
| consumer.group | String | no | SeaTunnel-Consumer-Group |
| commit.on.checkpoint | Boolean | no | true |
| schema | | no | - |
| format | String | no | json |
| field.delimiter | String | no | , |
| start.mode | String | no | CONSUME_FROM_GROUP_OFFSETS |
| start.mode.offsets | | no | |
| start.mode.timestamp | Long | no | |
| partition.discovery.interval.millis | long | no | -1 |
| common-options | config | no | - |

### topics [string]

`RocketMQ topic` name. If there are multiple `topics`, use `,` to split, for example: `"tpc1,tpc2"`.

### name.srv.addr [string]

`RocketMQ` name server cluster address.

### consumer.group [string]

`RocketMQ consumer group id`, used to distinguish different consumer groups.

### acl.enabled [boolean]

If true, access control is enabled, and access key and secret key need to be configured.

### access.key [string]

When ACL_ENABLED is true, access key cannot be empty.

### secret.key [string]

When ACL_ENABLED is true, secret key cannot be empty.

### batch.size [int]

`RocketMQ` consumer pull batch size

### commit.on.checkpoint [boolean]

If true the consumer's offset will be periodically committed in the background.

## partition.discovery.interval.millis [long]

The interval for dynamically discovering topics and partitions.

### schema

The structure of the data, including field names and field types.

## format

Data format. The default format is json. Optional text format. The default field separator is ", ".
If you customize the delimiter, add the "field.delimiter" option.

## field.delimiter

Customize the field delimiter for data format.

## start.mode

The initial consumption pattern of consumers,there are several types:
[CONSUME_FROM_LAST_OFFSET],[CONSUME_FROM_FIRST_OFFSET],[CONSUME_FROM_GROUP_OFFSETS],[CONSUME_FROM_TIMESTAMP]
,[CONSUME_FROM_SPECIFIC_OFFSETS]

## start.mode.timestamp

The time required for consumption mode to be "CONSUME_FROM_TIMESTAMP".

## start.mode.offsets

The offset required for consumption mode to be "CONSUME_FROM_SPECIFIC_OFFSETS".

for example:

```hocon
start.mode.offsets = {
topic1-0 = 70
topic1-1 = 10
topic1-2 = 10
}
```

### common-options [config]

Source plugin common parameters, please refer to [Source Common Options](common-options.md) for details.

## Example

### Simple

```hocon
source {
Rocketmq {
name.srv.addr = "localhost:9876"
topics = "test-topic-002"
consumer.group = "consumer-group"
parallelism = 2
batch.size = 20
schema = {
fields {
age = int
name = string
}
}
start.mode = "CONSUME_FROM_SPECIFIC_OFFSETS"
start.mode.offsets = {
test-topic-002-0 = 20
}
}
}
```

3 changes: 2 additions & 1 deletion plugin-mapping.properties
Original file line number Diff line number Diff line change
Expand Up @@ -105,4 +105,5 @@ seatunnel.source.Persistiq = connector-http-persistiq
seatunnel.sink.SelectDBCloud = connector-selectdb-cloud
seatunnel.sink.Hbase = connector-hbase
seatunnel.source.StarRocks = connector-starrocks

seatunnel.source.Rocketmq = connector-rocketmq
seatunnel.sink.Rocketmq = connector-rocketmq
1 change: 1 addition & 0 deletions release-note.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
- [Hbase] Add hbase sink connector #4049
- [Github] Add Github source connector #4155
- [CDC] Support export debezium-json format to kafka #4339
- [RocketMQ] Add RocketMQ source and sink connector #4007
### Formats
- [Canal]Support read canal format message #3950

Expand Down
61 changes: 61 additions & 0 deletions seatunnel-connectors-v2/connector-rocketmq/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.seatunnel</groupId>
<artifactId>seatunnel-connectors-v2</artifactId>
<version>${revision}</version>
</parent>
<artifactId>connector-rocketmq</artifactId>

<properties>
<rocketmq.version>4.9.4</rocketmq.version>
</properties>
<dependencies>

<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>connector-common</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>seatunnel-format-json</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>seatunnel-format-text</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>${rocketmq.version}</version>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-tools</artifactId>
<version>${rocketmq.version}</version>
</dependency>
</dependencies>
</project>
Loading

0 comments on commit 3a6fc02

Please sign in to comment.