From 34a6b8e9f62d9e04da554062b46949c7075176ed Mon Sep 17 00:00:00 2001
From: Jast
Date: Wed, 31 Jul 2024 14:43:11 +0800
Subject: [PATCH] [hotfix][connector-v2-hbase]fix and optimize hbase source
 problem (#7148)

* [hotfix][improve][doc]optimize connector hbase source

* [doc]add dependent document

* [doc]update dependent document

* [improve]improve static use

* [hotfix]add test case

* [hotfix]add test case

---------

Co-authored-by: Jia Fan
---
 docs/en/connector-v2/source/Hbase.md          | 109 ++++++-------
 docs/zh/connector-v2/source/Hbase.md          |  96 ++++++++++++
 docs/zh/connector-v2/source/common-options.md |  81 ++++++++++
 .../seatunnel/hbase/config/HbaseConfig.java   |  27 +++-
 .../hbase/config/HbaseParameters.java         |  24 +++-
 .../seatunnel/hbase/sink/HbaseSink.java       |   2 +-
 .../seatunnel/hbase/source/HbaseSource.java   |   6 +-
 .../hbase/source/HbaseSourceFactory.java      |   1 -
 .../hbase/source/HbaseSourceReader.java       |  30 ++--
 .../e2e/connector/hbase/HbaseIT.java          |  37 +++--
 ...ase-source-to-assert-with-batch-query.conf | 132 ++++++++++++++++++
 11 files changed, 455 insertions(+), 90 deletions(-)
 create mode 100644 docs/zh/connector-v2/source/Hbase.md
 create mode 100644 docs/zh/connector-v2/source/common-options.md
 create mode 100644 seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/resources/hbase-source-to-assert-with-batch-query.conf

diff --git a/docs/en/connector-v2/source/Hbase.md b/docs/en/connector-v2/source/Hbase.md
index 677b827fb29..753d68eb6e8 100644
--- a/docs/en/connector-v2/source/Hbase.md
+++ b/docs/en/connector-v2/source/Hbase.md
@@ -1,12 +1,12 @@
 # Hbase
 
-> Hbase source connector
+> Hbase Source Connector
 
 ## Description
 
-Read data from Apache Hbase.
+Reads data from Apache Hbase.
 
-## Key features
+## Key Features
 
 - [x] [batch](../../concept/connector-v2-features.md)
 - [ ] [stream](../../concept/connector-v2-features.md)
@@ -17,75 +17,80 @@ Read data from Apache Hbase.
 
 ## Options
 
-| name               | type   | required | default value |
-|--------------------|--------|----------|---------------|
-| zookeeper_quorum   | string | yes      | -             |
-| table              | string | yes      | -             |
-| query_columns      | list   | yes      | -             |
-| schema             | config | yes      | -             |
-| hbase_extra_config | string | no       | -             |
-| common-options     |        | no       | -             |
+| Name               | Type    | Required | Default |
+|--------------------|---------|----------|---------|
+| zookeeper_quorum   | string  | Yes      | -       |
+| table              | string  | Yes      | -       |
+| schema             | config  | Yes      | -       |
+| hbase_extra_config | config  | No       | -       |
+| caching            | int     | No       | -1      |
+| batch              | int     | No       | -1      |
+| cache_blocks       | boolean | No       | false   |
+| common-options     |         | No       | -       |
 
 ### zookeeper_quorum [string]
 
-The zookeeper cluster host of hbase, example: "hadoop001:2181,hadoop002:2181,hadoop003:2181"
+The zookeeper quorum for Hbase cluster hosts, e.g., "hadoop001:2181,hadoop002:2181,hadoop003:2181".
 
 ### table [string]
 
-The table name you want to write, example: "seatunnel"
-
-### query_columns [list]
-
-The column name which you want to query in the table. If you want to query the rowkey column, please set "rowkey" in query_columns.
-Other column format should be: columnFamily:columnName, example: ["rowkey", "columnFamily1:column1", "columnFamily1:column1", "columnFamily2:column1"]
+The name of the table to read from, e.g., "seatunnel".
 
 ### schema [config]
 
-Hbase uses byte arrays for storage. Therefore, you need to configure data types for each column in a table. For more information, see: [guide](../../concept/schema-feature.md#how-to-declare-type-supported).
+Hbase stores data in byte arrays. Therefore, you need to configure the data types for each column in the table. For more information, see: [guide](../../concept/schema-feature.md#how-to-declare-type-supported).
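+
+Because HBase itself only stores raw bytes, the declared column type tells the connector how to interpret each cell. A purely illustrative sketch (this helper is not part of the connector API), assuming the values were encoded with HBase's `Bytes` utility:
+
+```java
+import org.apache.hadoop.hbase.util.Bytes;
+
+// A cell declared as bigint in the schema maps to an 8-byte encoded long:
+byte[] raw = Bytes.toBytes(42L);   // what HBase physically stores
+long decoded = Bytes.toLong(raw);  // what the declared type recovers: 42
+```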
 
 ### hbase_extra_config [config]
 
-The extra configuration of hbase
+Additional configurations for Hbase.
+
+### caching [int]
+
+The caching parameter sets the number of rows fetched per server trip during scans. This reduces round-trips between client and server, improving scan efficiency. Default: -1.
+
+### batch [int]
+
+The batch parameter sets the maximum number of columns returned per scan call. This is useful for rows with many columns to avoid fetching excessive data at once, thus saving memory and improving performance. Default: -1.
+
+### cache_blocks [boolean]
+
+The cache_blocks parameter determines whether to cache data blocks during scans. By default, HBase caches data blocks during scans. Setting this to false reduces memory usage during scans. Default in SeaTunnel: false.
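+
+For reference, the three options above correspond one-to-one to settings on the HBase client `Scan` that the connector builds for each split (see `HbaseSourceReader` in this patch). A minimal sketch using the values from the example below:
+
+```java
+import org.apache.hadoop.hbase.client.Scan;
+
+Scan scan = new Scan();
+scan.setCaching(1000);      // rows fetched per server round trip
+scan.setBatch(100);         // max columns returned per call
+scan.setCacheBlocks(false); // do not fill the block cache while scanning
+```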
 
-### common options
+### common-options
 
-Source plugin common parameters, please refer to [Source Common Options](common-options.md) for details
+Common parameters for Source plugins, refer to [Common Source Options](common-options.md).
 
-## Examples
+## Example
 
 ```bash
 source {
   Hbase {
-    zookeeper_quorum = "hadoop001:2181,hadoop002:2181,hadoop003:2181"
-    table = "seatunnel_test"
-    query_columns=["rowkey", "columnFamily1:column1", "columnFamily1:column1", "columnFamily2:column1"]
-    schema = {
-        columns = [
-            {
-               name = rowkey
-               type = string
-            },
-            {
-               name = "columnFamily1:column1"
-               type = boolean
-            },
-            {
-               name = "columnFamily1:column1"
-               type = double
-            },
-            {
-               name = "columnFamily2:column1"
-               type = bigint
-            }
-        ]
-    }
+      zookeeper_quorum = "hadoop001:2181,hadoop002:2181,hadoop003:2181"
+      table = "seatunnel_test"
+      caching = 1000
+      batch = 100
+      cache_blocks = false
+      schema = {
+            columns = [
+                  {
+                     name = "rowkey"
+                     type = string
+                  },
+                  {
+                     name = "columnFamily1:column1"
+                     type = boolean
+                  },
+                  {
+                     name = "columnFamily1:column2"
+                     type = double
+                  },
+                  {
+                     name = "columnFamily2:column1"
+                     type = bigint
+                  }
+            ]
+      }
   }
 }
 ```
 
-## Changelog
-
-### next version
-
-- Add Hbase Source Connector
-
diff --git a/docs/zh/connector-v2/source/Hbase.md b/docs/zh/connector-v2/source/Hbase.md
new file mode 100644
index 00000000000..5f15a30b99a
--- /dev/null
+++ b/docs/zh/connector-v2/source/Hbase.md
@@ -0,0 +1,96 @@
+# Hbase
+
+> Hbase 源连接器
+
+## 描述
+
+从 Apache Hbase 读取数据。
+
+## 主要功能
+
+- [x] [批处理](../../concept/connector-v2-features.md)
+- [ ] [流处理](../../concept/connector-v2-features.md)
+- [ ] [精确一次](../../concept/connector-v2-features.md)
+- [x] [Schema](../../concept/connector-v2-features.md)
+- [x] [并行度](../../concept/connector-v2-features.md)
+- [ ] [支持用户定义的拆分](../../concept/connector-v2-features.md)
+
+## 选项
+
+| 名称               | 类型    | 必填 | 默认值 |
+|--------------------|---------|------|-------|
+| zookeeper_quorum   | string  | 是   | -     |
+| table              | string  | 是   | -     |
+| schema             | config  | 是   | -     |
+| hbase_extra_config | config  | 否   | -     |
+| caching            | int     | 否   | -1    |
+| batch              | int     | 否   | -1    |
+| cache_blocks       | boolean | 否   | false |
+| common-options     |         | 否   | -     |
+
+### zookeeper_quorum [string]
+
+hbase 的 zookeeper 集群主机,例如:“hadoop001:2181,hadoop002:2181,hadoop003:2181”
+
+### table [string]
+
+要读取的表名,例如:“seatunnel”
+
+### schema [config]
+
+Hbase 使用字节数组进行存储。因此,您需要为表中的每一列配置数据类型。有关更多信息,请参阅:[guide](../../concept/schema-feature.md#how-to-declare-type-supported)。
+
+### hbase_extra_config [config]
+
+hbase 的额外配置
+
+### caching [int]
+
+caching 参数用于设置在扫描过程中一次从服务器端获取的行数。这可以减少客户端与服务器之间的往返次数,从而提高扫描效率。默认值:-1。
+
+### batch [int]
+
+batch 参数用于设置在扫描过程中每次返回的最大列数。这对于处理有很多列的行特别有用,可以避免一次性返回过多数据,从而节省内存并提高性能。默认值:-1。
+
+### cache_blocks [boolean]
+
+cache_blocks 参数用于设置在扫描过程中是否缓存数据块。默认情况下,HBase 会在扫描时将数据块缓存到块缓存中。如果设置为 false,则在扫描过程中不会缓存数据块,从而减少内存的使用。在 SeaTunnel 中默认值为:false。
+
+### 常用选项
+
+Source 插件常用参数,具体请参考 [Source 常用选项](common-options.md)
+
+## 示例
+
+```bash
+source {
+  Hbase {
+      zookeeper_quorum = "hadoop001:2181,hadoop002:2181,hadoop003:2181"
+      table = "seatunnel_test"
+      caching = 1000
+      batch = 100
+      cache_blocks = false
+      schema = {
+            columns = [
+                  {
+                     name = "rowkey"
+                     type = string
+                  },
+                  {
+                     name = "columnFamily1:column1"
+                     type = boolean
+                  },
+                  {
+                     name = "columnFamily1:column2"
+                     type = double
+                  },
+                  {
+                     name = "columnFamily2:column1"
+                     type = bigint
+                  }
+            ]
+      }
+  }
+}
+```
+
diff --git a/docs/zh/connector-v2/source/common-options.md b/docs/zh/connector-v2/source/common-options.md
new file mode 100644
index 00000000000..902dca2c195
--- /dev/null
+++ b/docs/zh/connector-v2/source/common-options.md
@@ -0,0 +1,81 @@
+# Source Common Options
+
+> Source connector 的常用参数
+
+| 名称              | 类型   | 必填 | 默认值 | 描述 |
+|-------------------|--------|------|-------|------|
+| result_table_name | String | 否   | -     | 当未指定 `result_table_name` 时,此插件处理的数据将不会被注册为可由其他插件直接访问的数据集 `(dataStream/dataset)`,或称为临时表 `(table)`;当指定了 `result_table_name` 时,此插件处理的数据将被注册为可由其他插件直接访问的数据集 `(dataStream/dataset)`,或称为临时表 `(table)`。此处注册的数据集 `(dataStream/dataset)` 可通过指定 `source_table_name` 直接被其他插件访问。 |
+| parallelism       | Int    | 否   | -     | 当未指定 `parallelism` 时,默认使用环境中的 `parallelism`;当指定了 `parallelism` 时,将覆盖环境中的 `parallelism` 设置。 |
+
+## 重要提示
+
+在作业配置中使用 `result_table_name` 时,必须设置 `source_table_name` 参数。
+
+## 任务示例
+
+### 简单示例
+
+> 注册一个流或批处理数据源,并在注册时返回表名 `fake_table`
+
+```bash
+source {
+    FakeSourceStream {
+        result_table_name = "fake_table"
+    }
+}
+```
+
+### 复杂示例
+
+> 这是将 Fake 数据源转换并写入到两个不同的目标中
+
+```bash
+env {
+  job.mode = "BATCH"
+}
+
+source {
+  FakeSource {
+    result_table_name = "fake"
+    row.num = 100
+    schema = {
+      fields {
+        id = "int"
+        name = "string"
+        age = "int"
+        c_timestamp = "timestamp"
+        c_date = "date"
+        c_map = "map<string, string>"
+        c_array = "array<int>"
+        c_decimal = "decimal(30, 8)"
+        c_row = {
+          c_row = {
+            c_int = int
+          }
+        }
+      }
+    }
+  }
+}
+
+transform {
+  Sql {
+    source_table_name = "fake"
+    result_table_name = "fake1"
+    # 查询表名必须与字段 'source_table_name' 相同
+    query = "select id, regexp_replace(name, '.+', 'b') as name, age+1 as age, pi() as pi, c_timestamp, c_date, c_map, c_array, c_decimal, c_row from fake"
+  }
+  # SQL 转换支持基本函数和条件操作
+  # 但不支持复杂的 SQL 操作,包括:多源表/行 JOIN 和聚合操作等
+}
+
+sink {
+  Console {
+    source_table_name = "fake1"
+  }
+  Console {
+    source_table_name = "fake"
+  }
+}
+```
+
diff --git a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/config/HbaseConfig.java b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/config/HbaseConfig.java
index 88c068bee11..44a5640ffed 100644
--- a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/config/HbaseConfig.java
+++ b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/config/HbaseConfig.java
@@ -42,12 +42,6 @@ public class HbaseConfig {
                     .noDefaultValue()
                     .withDescription("Hbase rowkey column");
 
-    public static final Option<List<String>> QUERY_COLUMNS =
-            Options.key("query_columns")
-                    .listType()
-                    .noDefaultValue()
-                    .withDescription("query Hbase columns");
-
     public static final Option<String> ROWKEY_DELIMITER =
             Options.key("rowkey_delimiter")
                     .stringType()
@@ -104,6 +98,27 @@ public class HbaseConfig {
                     .withDescription(
                             "The expiration time configuration for writing hbase data. The default value is -1, indicating no expiration time.");
 
+    public static final Option<Boolean> HBASE_CACHE_BLOCKS_CONFIG =
+            Options.key("cache_blocks")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "Whether to cache data blocks during scans. Set this to false when scanning a large amount of data to reduce memory consumption. The default value is false.");
+
+    public static final Option<Integer> HBASE_CACHING_CONFIG =
+            Options.key("caching")
+                    .intType()
+                    .defaultValue(-1)
+                    .withDescription(
+                            "Sets the number of rows fetched from the server per scan round trip. A larger value reduces the number of round trips between the client and the server and can improve performance. The default value is -1.");
+
+    public static final Option<Integer> HBASE_BATCH_CONFIG =
+            Options.key("batch")
+                    .intType()
+                    .defaultValue(-1)
+                    .withDescription(
+                            "Sets the batch size to control the maximum number of cells returned each time, thereby controlling the amount of data returned by a single RPC call. The default value is -1.");
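+
+    // Note: the HBase client treats non-positive caching/batch values as "not set",
+    // so the -1 defaults above simply leave the client's own scan defaults in force.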
 
     public enum NullMode {
         SKIP,
         EMPTY;
diff --git a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/config/HbaseParameters.java b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/config/HbaseParameters.java
index 490e2481070..c25f04b3753 100644
--- a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/config/HbaseParameters.java
+++ b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/config/HbaseParameters.java
@@ -30,10 +30,12 @@
 
 import static org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.ENCODING;
 import static org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.FAMILY_NAME;
+import static org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.HBASE_BATCH_CONFIG;
+import static org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.HBASE_CACHE_BLOCKS_CONFIG;
+import static org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.HBASE_CACHING_CONFIG;
 import static org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.HBASE_EXTRA_CONFIG;
 import static org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.HBASE_TTL_CONFIG;
 import static org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.NULL_MODE;
-import static org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.QUERY_COLUMNS;
 import static org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.ROWKEY_COLUMNS;
 import static org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.ROWKEY_DELIMITER;
 import static org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.TABLE;
@@ -60,8 +62,14 @@ public class HbaseParameters implements Serializable {
 
     private Map<String, String> hbaseExtraConfig;
 
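+    // Without @Builder.Default, Lombok's generated builder would ignore these field
+    // initializers and leave the primitives at 0/false instead of the intended defaults.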
+    @Builder.Default private int caching = HBASE_CACHING_CONFIG.defaultValue();
+
+    @Builder.Default private int batch = HBASE_BATCH_CONFIG.defaultValue();
+
     @Builder.Default private Long ttl = HBASE_TTL_CONFIG.defaultValue();
 
+    @Builder.Default private boolean cacheBlocks = HBASE_CACHE_BLOCKS_CONFIG.defaultValue();
+
     @Builder.Default private String rowkeyDelimiter = ROWKEY_DELIMITER.defaultValue();
 
     @Builder.Default private HbaseConfig.NullMode nullMode = NULL_MODE.defaultValue();
@@ -72,7 +80,7 @@ public class HbaseParameters implements Serializable {
 
     @Builder.Default private HbaseConfig.EnCoding enCoding = ENCODING.defaultValue();
 
-    public static HbaseParameters buildWithConfig(Config pluginConfig) {
+    public static HbaseParameters buildWithSinkConfig(Config pluginConfig) {
         HbaseParametersBuilder builder = HbaseParameters.builder();
 
         // required parameters
@@ -113,18 +121,26 @@ public static HbaseParameters buildWithConfig(Config pluginConfig) {
         return builder.build();
     }
 
-    public static HbaseParameters buildWithSinkConfig(Config pluginConfig) {
+    public static HbaseParameters buildWithSourceConfig(Config pluginConfig) {
         HbaseParametersBuilder builder = HbaseParameters.builder();
 
         // required parameters
         builder.zookeeperQuorum(pluginConfig.getString(ZOOKEEPER_QUORUM.key()));
         builder.table(pluginConfig.getString(TABLE.key()));
-        builder.columns(pluginConfig.getStringList(QUERY_COLUMNS.key()));
 
         if (pluginConfig.hasPath(HBASE_EXTRA_CONFIG.key())) {
             Config extraConfig = pluginConfig.getConfig(HBASE_EXTRA_CONFIG.key());
             builder.hbaseExtraConfig(TypesafeConfigUtils.configToMap(extraConfig));
         }
+        if (pluginConfig.hasPath(HBASE_CACHING_CONFIG.key())) {
+            builder.caching(pluginConfig.getInt(HBASE_CACHING_CONFIG.key()));
+        }
+        if (pluginConfig.hasPath(HBASE_BATCH_CONFIG.key())) {
+            builder.batch(pluginConfig.getInt(HBASE_BATCH_CONFIG.key()));
+        }
+        if (pluginConfig.hasPath(HBASE_CACHE_BLOCKS_CONFIG.key())) {
+            builder.cacheBlocks(pluginConfig.getBoolean(HBASE_CACHE_BLOCKS_CONFIG.key()));
+        }
         return builder.build();
     }
 }
diff --git a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSink.java b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSink.java
index 848e1e82053..4f7b929223f 100644
--- a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSink.java
+++ b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSink.java
@@ -79,7 +79,7 @@ public void prepare(Config pluginConfig) throws PrepareFailException {
                             "PluginName: %s, PluginType: %s, Message: %s",
                             getPluginName(), PluginType.SINK, result.getMsg()));
         }
-        this.hbaseParameters = HbaseParameters.buildWithConfig(pluginConfig);
+        this.hbaseParameters = HbaseParameters.buildWithSinkConfig(pluginConfig);
         if (hbaseParameters.getFamilyNames().size() == 0) {
             throw new HbaseConnectorException(
                     SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
diff --git a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/source/HbaseSource.java b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/source/HbaseSource.java
index 869e33f6235..3aca3161516 100644
--- a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/source/HbaseSource.java
+++ b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/source/HbaseSource.java
@@ -44,7 +44,6 @@
 
 import java.util.List;
 
-import static org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.QUERY_COLUMNS;
 import static org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.TABLE;
 import static org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.ZOOKEEPER_QUORUM;
 
@@ -68,8 +67,7 @@ public String getPluginName() {
     HbaseSource(Config pluginConfig) {
         this.pluginConfig = pluginConfig;
         CheckResult result =
-                CheckConfigUtil.checkAllExists(
-                        pluginConfig, ZOOKEEPER_QUORUM.key(), TABLE.key(), QUERY_COLUMNS.key());
+                CheckConfigUtil.checkAllExists(pluginConfig, ZOOKEEPER_QUORUM.key(), TABLE.key());
         if (!result.isSuccess()) {
             throw new HbaseConnectorException(
                     SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
@@ -77,7 +75,7 @@ public String getPluginName() {
                             "PluginName: %s, PluginType: %s, Message: %s",
                             getPluginName(), PluginType.SOURCE, result.getMsg()));
         }
-        this.hbaseParameters = HbaseParameters.buildWithSinkConfig(pluginConfig);
+        this.hbaseParameters = HbaseParameters.buildWithSourceConfig(pluginConfig);
         this.catalogTable = CatalogTableUtil.buildWithConfig(pluginConfig);
         this.seaTunnelRowType = catalogTable.getSeaTunnelRowType();
     }
diff --git a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/source/HbaseSourceFactory.java b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/source/HbaseSourceFactory.java
index 4eec3e00482..2de385dbd18 100644
--- a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/source/HbaseSourceFactory.java
+++ b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/source/HbaseSourceFactory.java
@@ -45,7 +45,6 @@ public OptionRule optionRule() {
         return OptionRule.builder()
                 .required(HbaseConfig.ZOOKEEPER_QUORUM)
                 .required(HbaseConfig.TABLE)
-                .required(HbaseConfig.QUERY_COLUMNS)
                 .build();
     }
 
diff --git a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/source/HbaseSourceReader.java b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/source/HbaseSourceReader.java
index 556374844e9..526ac826db1 100644
--- a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/source/HbaseSourceReader.java
+++ b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/source/HbaseSourceReader.java
@@ -31,6 +31,7 @@
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.util.Bytes;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Maps;
@@ -39,13 +40,13 @@
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Deque;
-import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
-import java.util.Set;
 import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.stream.Collectors;
 
 @Slf4j
 public class HbaseSourceReader implements SourceReader<SeaTunnelRow, HbaseSourceSplit> {
@@ -54,7 +55,6 @@ public class HbaseSourceReader implements SourceReader<SeaTunnelRow, HbaseSourceSplit> {
 
     private final Map<String, byte[][]> namesMap;
-    private final Set<String> columnFamilies = new LinkedHashSet<>();
     private final SourceReader.Context context;
     private final SeaTunnelRowType seaTunnelRowType;
     private volatile boolean noMoreSplit = false;
@@ -74,16 +74,17 @@ public HbaseSourceReader(
         this.seaTunnelRowType = seaTunnelRowType;
 
         this.namesMap = Maps.newConcurrentMap();
-        this.columnNames = hbaseParameters.getColumns();
+        this.columnNames =
+                Arrays.asList(seaTunnelRowType.getFieldNames()).stream()
+                        .filter(name -> !ROW_KEY.equals(name))
+                        .collect(Collectors.toList());
         // Check if input column names are in format: [ columnFamily:column ].
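+        // e.g. "info:age" passes the check below, while "age" or "info:a:b" fails
+        // with "Invalid column names, it should be [ColumnFamily:Column] format".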
         this.columnNames.stream()
-                .peek(
+                .forEach(
                         column ->
                                 Preconditions.checkArgument(
-                                        (column.contains(":") && column.split(":").length == 2)
-                                                || this.ROW_KEY.equalsIgnoreCase(column),
-                                        "Invalid column names, it should be [ColumnFamily:Column] format"))
-                .forEach(column -> this.columnFamilies.add(column.split(":")[0]));
+                                        column.contains(":") && column.split(":").length == 2,
+                                        "Invalid column names, it should be [ColumnFamily:Column] format"));
 
         connection = HbaseConnectionUtil.getHbaseConnection(hbaseParameters);
     }
@@ -122,6 +123,15 @@ public void pollNext(Collector<SeaTunnelRow> output) throws Exception {
                     Scan scan = new Scan();
                     scan.withStartRow(split.getStartRow(), true);
                     scan.withStopRow(split.getEndRow(), true);
+                    scan.setCacheBlocks(hbaseParameters.isCacheBlocks());
+                    scan.setCaching(hbaseParameters.getCaching());
+                    scan.setBatch(hbaseParameters.getBatch());
+                    for (String columnName : this.columnNames) {
+                        String[] columnNameSplit = columnName.split(":");
+                        scan.addColumn(
+                                Bytes.toBytes(columnNameSplit[0]),
+                                Bytes.toBytes(columnNameSplit[1]));
+                    }
                     this.currentScanner =
                             this.connection
                                     .getTable(TableName.valueOf(hbaseParameters.getTable()))
@@ -152,7 +162,7 @@ private byte[][] convertRawRow(Result result) {
             byte[] bytes;
             try {
                 // handle rowkey column
-                if (this.ROW_KEY.equals(columnName)) {
+                if (ROW_KEY.equals(columnName)) {
                     bytes = result.getRow();
                 } else {
                     byte[][] arr = this.namesMap.get(columnName);
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hbase/HbaseIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hbase/HbaseIT.java
index 13a7a8805a6..85ceef92353 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hbase/HbaseIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hbase/HbaseIT.java
@@ -93,18 +93,7 @@ public void tearDown() throws Exception {
 
     @TestTemplate
     public void testHbaseSink(TestContainer container) throws IOException, InterruptedException {
-        deleteData(table);
-        Container.ExecResult sinkExecResult = container.executeJob("/fake-to-hbase.conf");
-        Assertions.assertEquals(0, sinkExecResult.getExitCode());
-        Table hbaseTable = hbaseConnection.getTable(table);
-        Scan scan = new Scan();
-        ResultScanner scanner = hbaseTable.getScanner(scan);
-        ArrayList<Result> results = new ArrayList<>();
-        for (Result result : scanner) {
-            results.add(result);
-        }
-        Assertions.assertEquals(results.size(), 5);
-        scanner.close();
+        fakeToHbase(container);
         Container.ExecResult sourceExecResult = container.executeJob("/hbase-to-assert.conf");
         Assertions.assertEquals(0, sourceExecResult.getExitCode());
     }
@@ -177,6 +166,30 @@ public void testHbaseSinkAssignCfSink(TestContainer container)
         Assertions.assertEquals(cf2Count, 5);
     }
 
+    @TestTemplate
+    public void testHbaseSourceWithBatchQuery(TestContainer container)
+            throws IOException, InterruptedException {
+        fakeToHbase(container);
+        Container.ExecResult sourceExecResult =
+                container.executeJob("/hbase-source-to-assert-with-batch-query.conf");
+        Assertions.assertEquals(0, sourceExecResult.getExitCode());
+    }
+
+    private void fakeToHbase(TestContainer container) throws IOException, InterruptedException {
+        deleteData(table);
+        Container.ExecResult sinkExecResult = container.executeJob("/fake-to-hbase.conf");
+        Assertions.assertEquals(0, sinkExecResult.getExitCode());
+        Table hbaseTable = hbaseConnection.getTable(table);
+        Scan scan = new Scan();
+        ResultScanner scanner = hbaseTable.getScanner(scan);
+        ArrayList<Result> results = new ArrayList<>();
+        for (Result result : scanner) {
+            results.add(result);
+        }
+        Assertions.assertEquals(results.size(), 5);
+        scanner.close();
+    }
+
     private void deleteData(TableName table) throws IOException {
         Table hbaseTable = hbaseConnection.getTable(table);
         Scan scan = new Scan();
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/resources/hbase-source-to-assert-with-batch-query.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/resources/hbase-source-to-assert-with-batch-query.conf
new file mode 100644
index 00000000000..c89cf28e25d
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/resources/hbase-source-to-assert-with-batch-query.conf
@@ -0,0 +1,132 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  parallelism = 1
+  job.mode = "BATCH"
+}
+
+source {
+  Hbase {
+    zookeeper_quorum = "hbase_e2e:2181"
+    table = "seatunnel_test"
+    caching = 1000
+    batch = 100
+    cache_blocks = false
+    schema = {
+      columns = [
+        {
+          name = rowkey
+          type = string
+        },
+        {
+          name = "info:age"
+          type = int
+        },
+        {
+          name = "info:c_double"
+          type = double
+        },
+        {
+          name = "info:c_boolean"
+          type = boolean
+        },
+        {
+          name = "info:c_bigint"
+          type = bigint
+        },
+        {
+          name = "info:c_smallint"
+          type = smallint
+        },
+        {
+          name = "info:c_tinyint"
+          type = tinyint
+        },
+        {
+          name = "info:c_float"
+          type = float
+        }
+      ]
+    }
+  }
+}
+
+sink {
+  Assert {
+    rules {
+      row_rules = [
+        {
+          rule_type = MAX_ROW
+          rule_value = 5
+        },
+        {
+          rule_type = MIN_ROW
+          rule_value = 5
+        }
+      ],
+      field_rules = [
+        {
+          field_name = rowkey
+          field_type = string
+          field_value = [
+            {
+              rule_type = NOT_NULL
+            }
+          ]
+        },
+        {
+          field_name = "info:c_boolean"
+          field_type = boolean
+          field_value = [
+            {
+              rule_type = NOT_NULL
+            }
+          ]
+        },
+        {
+          field_name = "info:c_double"
+          field_type = double
+          field_value = [
+            {
+              rule_type = NOT_NULL
+            }
+          ]
+        },
+        {
+          field_name = "info:c_bigint"
+          field_type = bigint
+          field_value = [
+            {
+              rule_type = NOT_NULL
+            }
+          ]
+        },
+        {
+          field_name = "info:age"
+          field_type = int
+          field_value = [
+            {
+              rule_type = NOT_NULL
+            }
+          ]
+        }
+      ]
+    }
+  }
+}
\ No newline at end of file