[hotfix][connector-v2-hbase]fix and optimize hbase source problem (apache#7148)

* [hotfix][improve][doc]optimize connector hbase source

* [doc]add dependent document

* [doc]update dependent document

* [improve]improve static use

* [hotfix]add test case

* [hotfix]add test case

---------

Co-authored-by: Jia Fan <fanjiaeminem@qq.com>
zhangshenghang and Hisoka-X authored Jul 31, 2024
1 parent 9df557c commit 34a6b8e
Showing 11 changed files with 455 additions and 90 deletions.
109 changes: 57 additions & 52 deletions docs/en/connector-v2/source/Hbase.md
@@ -1,12 +1,12 @@
# Hbase

-> Hbase source connector
+> Hbase Source Connector
## Description

-Read data from Apache Hbase.
+Reads data from Apache Hbase.

-## Key features
+## Key Features

- [x] [batch](../../concept/connector-v2-features.md)
- [ ] [stream](../../concept/connector-v2-features.md)
@@ -17,75 +17,80 @@ Read data from Apache Hbase.

## Options

-| name | type | required | default value |
-|--------------------|--------|----------|---------------|
-| zookeeper_quorum | string | yes | - |
-| table | string | yes | - |
-| query_columns | list | yes | - |
-| schema | config | yes | - |
-| hbase_extra_config | string | no | - |
-| common-options | | no | - |
+| Name | Type | Required | Default |
+|--------------------|---------|----------|---------|
+| zookeeper_quorum | string | Yes | - |
+| table | string | Yes | - |
+| schema | config | Yes | - |
+| hbase_extra_config | string | No | - |
+| caching | int | No | -1 |
+| batch | int | No | -1 |
+| cache_blocks | boolean | No | false |
+| common-options | | No | - |

### zookeeper_quorum [string]

-The zookeeper cluster host of hbase, example: "hadoop001:2181,hadoop002:2181,hadoop003:2181"
+The zookeeper quorum for Hbase cluster hosts, e.g., "hadoop001:2181,hadoop002:2181,hadoop003:2181".

### table [string]

-The table name you want to write, example: "seatunnel"
-
-### query_columns [list]
-
-The column name which you want to query in the table. If you want to query the rowkey column, please set "rowkey" in query_columns.
-Other column format should be: columnFamily:columnName, example: ["rowkey", "columnFamily1:column1", "columnFamily1:column1", "columnFamily2:column1"]
+The name of the table to write to, e.g., "seatunnel".

### schema [config]

-Hbase uses byte arrays for storage. Therefore, you need to configure data types for each column in a table. For more information, see: [guide](../../concept/schema-feature.md#how-to-declare-type-supported).
+Hbase stores data in byte arrays. Therefore, you need to configure the data types for each column in the table. For more information, see: [guide](../../concept/schema-feature.md#how-to-declare-type-supported).

### hbase_extra_config [config]

-The extra configuration of hbase
+Additional configurations for Hbase.

+### caching
+
+The caching parameter sets the number of rows fetched per server trip during scans. This reduces round-trips between client and server, improving scan efficiency. Default: -1.
+
+### batch
+
+The batch parameter sets the maximum number of columns returned per scan. This is useful for rows with many columns to avoid fetching excessive data at once, thus saving memory and improving performance.

-### common options
-
-Source plugin common parameters, please refer to [Source Common Options](common-options.md) for details
-
-## Examples
+### cache_blocks
+
+The cache_blocks parameter determines whether to cache data blocks during scans. By default, HBase caches data blocks during scans. Setting this to false reduces memory usage during scans. Default in SeaTunnel: false.
+
+### common-options
+
+Common parameters for Source plugins, refer to [Common Source Options](common-options.md).
+
+## Example

```bash
source {
  Hbase {
-    zookeeper_quorum = "hadoop001:2181,hadoop002:2181,hadoop003:2181"
-    table = "seatunnel_test"
-    query_columns=["rowkey", "columnFamily1:column1", "columnFamily1:column1", "columnFamily2:column1"]
-    schema = {
-      columns = [
-        {
-          name = rowkey
-          type = string
-        },
-        {
-          name = "columnFamily1:column1"
-          type = boolean
-        },
-        {
-          name = "columnFamily1:column1"
-          type = double
-        },
-        {
-          name = "columnFamily2:column1"
-          type = bigint
-        }
-      ]
-    }
+    zookeeper_quorum = "hadoop001:2181,hadoop002:2181,hadoop003:2181"
+    table = "seatunnel_test"
+    caching = 1000
+    batch = 100
+    cache_blocks = false
+    schema = {
+      columns = [
+        {
+          name = "rowkey"
+          type = string
+        },
+        {
+          name = "columnFamily1:column1"
+          type = boolean
+        },
+        {
+          name = "columnFamily1:column2"
+          type = double
+        },
+        {
+          name = "columnFamily2:column1"
+          type = bigint
+        }
+      ]
+    }
  }
}
```
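The effect of `caching` and `batch` on scan traffic can be sketched with simple arithmetic. This is a conceptual model only, not the connector's actual code: `caching` bounds the rows fetched per client/server round trip, and `batch` bounds the cells returned per partial result when a row is wide.

```java
// Conceptual model of HBase scan traffic (illustrative only, not SeaTunnel code):
// - caching bounds rows fetched per client/server round trip
// - batch bounds cells returned per Result, splitting wide rows
public class ScanTrafficModel {

    // Number of client/server round trips needed to scan totalRows rows.
    static long roundTrips(long totalRows, int caching) {
        return (totalRows + caching - 1) / caching; // ceiling division
    }

    // Number of partial Results produced for one row with cellsPerRow cells.
    static long resultsPerRow(long cellsPerRow, int batch) {
        return (cellsPerRow + batch - 1) / batch; // ceiling division
    }

    public static void main(String[] args) {
        // With caching = 1000 (as in the example config), scanning 100,000 rows
        // takes 100 round trips instead of 100,000 with caching = 1.
        System.out.println(roundTrips(100_000, 1000)); // 100
        System.out.println(roundTrips(100_000, 1));    // 100000

        // With batch = 100, a row holding 250 cells comes back as 3 partial Results.
        System.out.println(resultsPerRow(250, 100));   // 3
    }
}
```

This is why large scans usually set a sizable `caching` value while keeping `cache_blocks = false`, so throughput improves without filling the region server's block cache.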

## Changelog

### next version

- Add Hbase Source Connector

96 changes: 96 additions & 0 deletions docs/zh/connector-v2/source/Hbase.md
@@ -0,0 +1,96 @@
# Hbase

> Hbase Source Connector

## Description

Reads data from Apache Hbase.

## Key Features

- [x] [batch](../../concept/connector-v2-features.md)
- [ ] [stream](../../concept/connector-v2-features.md)
- [ ] [exactly-once](../../concept/connector-v2-features.md)
- [x] [schema](../../concept/connector-v2-features.md)
- [x] [parallelism](../../concept/connector-v2-features.md)
- [ ] [support user-defined split](../../concept/connector-v2-features.md)

## Options

| Name               | Type    | Required | Default |
|--------------------|---------|----------|---------|
| zookeeper_quorum   | string  | Yes      | -       |
| table              | string  | Yes      | -       |
| schema             | config  | Yes      | -       |
| hbase_extra_config | string  | No       | -       |
| caching            | int     | No       | -1      |
| batch              | int     | No       | -1      |
| cache_blocks       | boolean | No       | false   |
| common-options     |         | No       | -       |

### zookeeper_quorum [string]

The zookeeper quorum for Hbase cluster hosts, e.g., "hadoop001:2181,hadoop002:2181,hadoop003:2181".

### table [string]

The name of the table to write to, e.g., "seatunnel".

### schema [config]

Hbase stores data in byte arrays. Therefore, you need to configure the data types for each column in the table. For more information, see: [guide](../../concept/schema-feature.md#how-to-declare-type-supported).

### hbase_extra_config [config]

Additional configurations for Hbase.

### caching

The caching parameter sets the number of rows fetched from the server per trip during scans. This reduces round trips between client and server, improving scan efficiency. Default: -1.

### batch

The batch parameter sets the maximum number of columns returned per scan. This is useful for rows with many columns, avoiding fetching excessive data at once, thus saving memory and improving performance.

### cache_blocks

The cache_blocks parameter determines whether to cache data blocks during scans. By default, HBase caches data blocks into the block cache during scans. Setting this to false disables caching of data blocks during scans, reducing memory usage. Default in SeaTunnel: false.

### common-options

Common parameters for Source plugins, refer to [Common Source Options](common-options.md).

## Example

```bash
source {
Hbase {
zookeeper_quorum = "hadoop001:2181,hadoop002:2181,hadoop003:2181"
table = "seatunnel_test"
caching = 1000
batch = 100
cache_blocks = false
schema = {
columns = [
{
name = "rowkey"
type = string
},
{
name = "columnFamily1:column1"
type = boolean
},
{
name = "columnFamily1:column2"
type = double
},
{
name = "columnFamily2:column1"
type = bigint
}
]
}
}
}
```

81 changes: 81 additions & 0 deletions docs/zh/connector-v2/source/common-options.md
@@ -0,0 +1,81 @@
# Source Common Options

> Common parameters for Source connectors

| Name              | Type   | Required | Default | Description |
|-------------------|--------|----------|---------|-------------|
| result_table_name | String | No       | -       | When `result_table_name` is not specified, the data processed by this plugin is not registered as a dataset `(dataStream/dataset)`, or temporary table `(table)`, that other plugins can access directly.<br/>When `result_table_name` is specified, the data processed by this plugin is registered as a dataset `(dataStream/dataset)`, or temporary table `(table)`, that other plugins can access directly. The registered dataset `(dataStream/dataset)` can then be accessed by other plugins by specifying `source_table_name`. |
| parallelism       | Int    | No       | -       | When `parallelism` is not specified, the `parallelism` of the environment is used by default.<br/>When `parallelism` is specified, it overrides the environment's `parallelism` setting. |

# Important Note

When `result_table_name` is used in a job configuration, the `source_table_name` parameter must also be set.

## Task Examples

### Simple Example

> Register a stream or batch data source, returning the table name `fake_table` at registration

```bash
source {
FakeSourceStream {
result_table_name = "fake_table"
}
}
```

### Complex Example

> The Fake source is transformed and written to two different sinks

```bash
env {
job.mode = "BATCH"
}

source {
FakeSource {
result_table_name = "fake"
row.num = 100
schema = {
fields {
id = "int"
name = "string"
age = "int"
c_timestamp = "timestamp"
c_date = "date"
c_map = "map<string, string>"
c_array = "array<int>"
c_decimal = "decimal(30, 8)"
c_row = {
c_row = {
c_int = int
}
}
}
}
}
}

transform {
Sql {
source_table_name = "fake"
result_table_name = "fake1"
# The query table name must match the 'source_table_name' field
query = "select id, regexp_replace(name, '.+', 'b') as name, age+1 as age, pi() as pi, c_timestamp, c_date, c_map, c_array, c_decimal, c_row from fake"
}
# The SQL transform supports basic functions and conditional operations,
# but not complex SQL operations, including: multi-source-table/row JOINs, aggregations, etc.
}

sink {
Console {
source_table_name = "fake1"
}
Console {
source_table_name = "fake"
}
}
```
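The `result_table_name` / `source_table_name` contract above can be modeled as a simple named-table registry. This is an illustrative sketch only; `TableRegistry` is a hypothetical class, not SeaTunnel's implementation:

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative model of how result_table_name registers a dataset and
// source_table_name looks it up. Hypothetical class, not SeaTunnel code.
public class TableRegistry {
    private final Map<String, List<String>> tables = new HashMap<>();

    // A source/transform registers its output under result_table_name.
    void register(String resultTableName, List<String> rows) {
        tables.put(resultTableName, rows);
    }

    // A transform/sink resolves its input via source_table_name.
    List<String> resolve(String sourceTableName) {
        List<String> rows = tables.get(sourceTableName);
        if (rows == null) {
            throw new IllegalArgumentException("Unknown table: " + sourceTableName);
        }
        return rows;
    }

    public static void main(String[] args) {
        TableRegistry registry = new TableRegistry();
        // source { FakeSource { result_table_name = "fake" } }
        registry.register("fake", List.of("row1", "row2"));
        // transform reads "fake" and registers its output as "fake1"
        registry.register("fake1", registry.resolve("fake"));
        // sink { Console { source_table_name = "fake1" } }
        System.out.println(registry.resolve("fake1"));
    }
}
```

In this model, the complex example above registers "fake", the SQL transform resolves "fake" and registers "fake1", and each Console sink resolves one of the two names.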

@@ -42,12 +42,6 @@ public class HbaseConfig {
.noDefaultValue()
.withDescription("Hbase rowkey column");

-    public static final Option<List<String>> QUERY_COLUMNS =
-            Options.key("query_columns")
-                    .listType()
-                    .noDefaultValue()
-                    .withDescription("query Hbase columns");

public static final Option<String> ROWKEY_DELIMITER =
Options.key("rowkey_delimiter")
.stringType()
@@ -104,6 +98,27 @@ public class HbaseConfig {
.withDescription(
"The expiration time configuration for writing hbase data. The default value is -1, indicating no expiration time.");

+    public static final Option<Boolean> HBASE_CACHE_BLOCKS_CONFIG =
+            Options.key("cache_blocks")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "When it is false, data blocks are not cached. When it is true, data blocks are cached. This value should be set to false when scanning a large amount of data to reduce memory consumption. The default value is false.");
+
+    public static final Option<Integer> HBASE_CACHING_CONFIG =
+            Options.key("caching")
+                    .intType()
+                    .defaultValue(-1)
+                    .withDescription(
+                            "Setting the number of rows fetched from the server per trip can reduce the number of round trips between the client and the server, thereby improving performance. The default value is -1.");
+
+    public static final Option<Integer> HBASE_BATCH_CONFIG =
+            Options.key("batch")
+                    .intType()
+                    .defaultValue(-1)
+                    .withDescription(
+                            "Sets the batch size to control the maximum number of cells returned per trip, thereby controlling the amount of data returned by a single RPC call. The default value is -1.");
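The -1 defaults above act as "unset" sentinels: a value that is not positive is typically left unapplied so that HBase's own scan defaults take effect. A minimal sketch of that pattern (illustrative only; `ScanSettings` and `applyIfSet` are hypothetical names, not SeaTunnel's actual code):

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative sketch of the -1 "unset" sentinel pattern suggested by the
// caching/batch defaults above. Hypothetical helper, not SeaTunnel code.
public class ScanSettings {
    private final Map<String, Object> applied = new LinkedHashMap<>();

    // Apply an int setting only when it was explicitly configured (> 0).
    void applyIfSet(String name, int value) {
        if (value > 0) {
            applied.put(name, value);
        }
    }

    Map<String, Object> applied() {
        return applied;
    }

    public static void main(String[] args) {
        ScanSettings settings = new ScanSettings();
        settings.applyIfSet("caching", 1000); // configured: applied
        settings.applyIfSet("batch", -1);     // default -1: left to HBase
        System.out.println(settings.applied()); // {caching=1000}
    }
}
```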

public enum NullMode {
SKIP,
EMPTY;
