
Commit 86e2d6f

[Feature][Kafka] Support native format read/write kafka record (apache#8724)
1 parent b5e5d00 commit 86e2d6f

File tree

13 files changed: +751 −78 lines


docs/en/connector-v2/sink/Kafka.md

+46 −15
@@ -30,21 +30,21 @@ They can be downloaded via install-plugin.sh or from the Maven central repositor

## Sink Options

| Name                  | Type   | Required | Default | Description |
|-----------------------|--------|----------|---------|-------------|
| topic                 | String | Yes      | -       | When the table is used as sink, the topic name is the topic to write data to. |
| bootstrap.servers     | String | Yes      | -       | Comma-separated list of Kafka brokers. |
| kafka.config          | Map    | No       | -       | In addition to the above parameters that must be specified by the `Kafka producer` client, the user can also specify multiple non-mandatory parameters for the `producer` client, covering [all the producer parameters specified in the official Kafka document](https://kafka.apache.org/documentation.html#producerconfigs). |
| semantics             | String | No       | NON     | Semantics that can be chosen: EXACTLY_ONCE/AT_LEAST_ONCE/NON, default NON. |
| partition_key_fields  | Array  | No       | -       | Configure which fields are used as the key of the Kafka message. |
| partition             | Int    | No       | -       | We can specify the partition; all messages will be sent to this partition. |
| assign_partitions     | Array  | No       | -       | We can decide which partition to send to based on the content of the message. The function of this parameter is to distribute information. |
| transaction_prefix    | String | No       | -       | If semantics is specified as EXACTLY_ONCE, the producer will write all messages in a Kafka transaction. Kafka distinguishes different transactions by different transactionIds; this parameter is the prefix of the Kafka transactionId, so make sure different jobs use different prefixes. |
| format                | String | No       | json    | Data format. The default format is json. Optional formats: text, canal_json, debezium_json, ogg_json, avro and native. If you use json or text format, the default field delimiter is ","; if you customize the delimiter, add the "field_delimiter" option. If you use canal format, please refer to [canal-json](../formats/canal-json.md) for details. If you use debezium format, please refer to [debezium-json](../formats/debezium-json.md) for details. |
| field_delimiter       | String | No       | ,       | Customize the field delimiter for data format. |
| common-options        |        | No       | -       | Sink plugin common parameters, please refer to [Sink Common Options](../sink-common-options.md) for details. |
| protobuf_message_name | String | No       | -       | Effective when the format is set to protobuf, specifies the Message name. |
| protobuf_schema       | String | No       | -       | Effective when the format is set to protobuf, specifies the Schema definition. |

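To make the options above concrete, here is a minimal sketch of a sink block that combines them for exactly-once delivery. The topic name, broker address and transaction prefix are placeholder values, not taken from this commit:

```hocon
sink {
  kafka {
    # placeholder topic and broker address
    topic = "example_topic"
    bootstrap.servers = "localhost:9092"
    # exactly-once writes wrap all messages in a Kafka transaction;
    # each job should use a distinct transaction_prefix
    semantics = "EXACTLY_ONCE"
    transaction_prefix = "example_job_"
    format = "json"
  }
}
```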
## Parameter Interpretation
@@ -269,3 +269,34 @@ sink {

### format

If you need to write Kafka's native information, you can refer to the following configuration.

Config Example:

```hocon
sink {
  kafka {
    topic = "test_topic_native_sink"
    bootstrap.servers = "kafkaCluster:9092"
    format = "NATIVE"
  }
}
```

The input parameter requirements are as follows:

```json
{
  "headers": {
    "header1": "header1",
    "header2": "header2"
  },
  "key": "dGVzdF9ieXRlc19kYXRh",
  "partition": 3,
  "timestamp": 1672531200000,
  "timestampType": "CREATE_TIME",
  "value": "dGVzdF9ieXRlc19kYXRh"
}
```

Note: `key`/`value` are of type `byte[]`.
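The sample record above can be assembled programmatically. A minimal sketch, assuming the `byte[]` key/value are carried as Base64 strings in the JSON payload (the sample values decode to `test_bytes_data`); the function name is ours, not part of the connector:

```python
import base64
import json

def build_native_record(key: bytes, value: bytes, partition: int,
                        timestamp_ms: int, headers: dict) -> str:
    """Build a JSON payload matching the native-format shape shown above.

    key/value are raw bytes, Base64-encoded for transport in JSON
    (an assumption based on the sample values in the doc).
    """
    record = {
        "headers": headers,
        "key": base64.b64encode(key).decode("ascii"),
        "partition": partition,
        "timestamp": timestamp_ms,
        "timestampType": "CREATE_TIME",
        "value": base64.b64encode(value).decode("ascii"),
    }
    return json.dumps(record)

payload = build_native_record(
    key=b"test_bytes_data",
    value=b"test_bytes_data",
    partition=3,
    timestamp_ms=1672531200000,
    headers={"header1": "header1", "header2": "header2"},
)
```

Running this reproduces the sample record field for field, including the Base64 strings.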
