Skip to content

Commit 87d155e

Browse files
committed May 2, 2024
[Enhancement] add information_schema.table_options(#32572)
1 parent e9d2442 commit 87d155e

File tree

11 files changed

+407
-2
lines changed

11 files changed

+407
-2
lines changed
 

‎be/src/exec/schema_scanner.cpp

+3
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
#include "exec/schema_scanner/schema_rowsets_scanner.h"
4242
#include "exec/schema_scanner/schema_schema_privileges_scanner.h"
4343
#include "exec/schema_scanner/schema_schemata_scanner.h"
44+
#include "exec/schema_scanner/schema_table_options_scanner.h"
4445
#include "exec/schema_scanner/schema_table_privileges_scanner.h"
4546
#include "exec/schema_scanner/schema_tables_scanner.h"
4647
#include "exec/schema_scanner/schema_user_privileges_scanner.h"
@@ -170,6 +171,8 @@ std::unique_ptr<SchemaScanner> SchemaScanner::create(TSchemaTableType::type type
170171
return SchemaUserScanner::create_unique();
171172
case TSchemaTableType::SCH_WORKLOAD_SCHEDULE_POLICY:
172173
return SchemaWorkloadSchedulePolicyScanner::create_unique();
174+
case TSchemaTableType::SCH_TABLE_OPTIONS:
175+
return SchemaTableOptionsScanner::create_unique();
173176
default:
174177
return SchemaDummyScanner::create_unique();
175178
break;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,132 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
#include "exec/schema_scanner/schema_table_options_scanner.h"
19+
20+
#include "runtime/client_cache.h"
21+
#include "runtime/exec_env.h"
22+
#include "runtime/runtime_state.h"
23+
#include "util/thrift_rpc_helper.h"
24+
#include "vec/common/string_ref.h"
25+
#include "vec/core/block.h"
26+
#include "vec/data_types/data_type_factory.hpp"
27+
28+
namespace doris {
29+
std::vector<SchemaScanner::ColumnDesc> SchemaTableOptionsScanner::_s_tbls_columns = {
30+
{"TABLE_NAME", TYPE_VARCHAR, sizeof(StringRef), true},
31+
{"TABLE_CATALOG", TYPE_VARCHAR, sizeof(StringRef), true},
32+
{"TABLE_SCHEMA", TYPE_VARCHAR, sizeof(StringRef), true},
33+
{"DISTRIBUTE_KEY", TYPE_STRING, sizeof(StringRef), true},
34+
{"DISTRIBUTE_TYPE", TYPE_STRING, sizeof(StringRef), true},
35+
{"BUCKETS_NUM", TYPE_INT, sizeof(int32_t), true},
36+
{"PARTITION_NUM", TYPE_INT, sizeof(int32_t), true},
37+
{"PROPERTIES", TYPE_STRING, sizeof(StringRef), true},
38+
};
39+
40+
// Hands the static table_options column layout and the matching schema table
// type to the SchemaScanner base class.
SchemaTableOptionsScanner::SchemaTableOptionsScanner()
        : SchemaScanner(_s_tbls_columns, TSchemaTableType::SCH_TABLE_OPTIONS) {
}
42+
43+
// Captures the per-query settings this scanner needs from the runtime state:
// the maximum number of rows handed out per block and the timeout used for the
// FE RPC. Always returns OK.
Status SchemaTableOptionsScanner::start(RuntimeState* state) {
    // NOTE(review): execution_timeout() is presumably in seconds and is scaled
    // to milliseconds for the RPC helper -- confirm.
    const auto query_timeout = state->execution_timeout();
    _rpc_timeout = query_timeout * 1000;
    _block_rows_limit = state->batch_size();
    return Status::OK();
}
48+
49+
// Fetches all rows of information_schema.table_options from the master FE in
// a single RPC and materializes them into _tableoptions_block.
//
// Returns:
//   - the RPC error if the call to the FE fails,
//   - the FE-reported status if it is not OK,
//   - InternalError if any returned row does not carry exactly one cell per
//     entry of _s_tbls_columns,
//   - the error of insert_block_column() if a cell cannot be converted,
//   - OK otherwise.
//
// _tableoptions_block is only assigned once the FE answer has been validated,
// so a rejected answer leaves it null instead of looking like an empty result.
Status SchemaTableOptionsScanner::get_block_from_fe() {
    TNetworkAddress master_addr = ExecEnv::GetInstance()->master_info()->network_address;

    // Ask the FE for exactly the columns this scanner knows about, in order.
    TSchemaTableRequestParams schema_table_request_params;
    schema_table_request_params.__isset.columns_name = true;
    for (const auto& column : _s_tbls_columns) {
        schema_table_request_params.columns_name.emplace_back(column.name);
    }
    schema_table_request_params.__set_current_user_ident(*_param->common_param->current_user_ident);

    TFetchSchemaTableDataRequest request;
    request.__set_schema_table_name(TSchemaTableName::TABLE_OPTIONS);
    request.__set_schema_table_params(schema_table_request_params);

    TFetchSchemaTableDataResult result;

    RETURN_IF_ERROR(ThriftRpcHelper::rpc<FrontendServiceClient>(
            master_addr.hostname, master_addr.port,
            [&request, &result](FrontendServiceConnection& client) {
                client->fetchSchemaTableData(result, request);
            },
            _rpc_timeout));

    Status status(Status::create(result.status));
    if (!status.ok()) {
        LOG(WARNING) << "fetch table options from FE failed, errmsg=" << status;
        return status;
    }

    // Bind by reference: the batch can be large and is only read from here on.
    const std::vector<TRow>& result_data = result.data_batch;

    // Validate every row up front. Rows are indexed by column position below,
    // so a single short row would otherwise be read out of bounds.
    for (const TRow& row : result_data) {
        if (row.column_value.size() != _s_tbls_columns.size()) {
            return Status::InternalError<false>("table options schema is not match for FE and BE");
        }
    }

    _tableoptions_block = vectorized::Block::create_unique();
    for (const auto& column : _s_tbls_columns) {
        TypeDescriptor descriptor(column.type);
        auto data_type = vectorized::DataTypeFactory::instance().create_data_type(descriptor, true);
        _tableoptions_block->insert(vectorized::ColumnWithTypeAndName(
                data_type->create_column(), data_type, column.name));
    }
    _tableoptions_block->reserve(_block_rows_limit);

    const int column_count = static_cast<int>(_s_tbls_columns.size());
    for (const TRow& row : result_data) {
        for (int col = 0; col < column_count; ++col) {
            RETURN_IF_ERROR(insert_block_column(row.column_value[col], col,
                                                _tableoptions_block.get(),
                                                _s_tbls_columns[col].type));
        }
    }
    return Status::OK();
}
103+
104+
// Appends the next batch of table_options rows to `block`.
//
// The complete result set is fetched from the FE on the first call and cached
// in _tableoptions_block; each call then copies at most _block_rows_limit rows
// starting at _row_idx. `*eos` is set to true once every cached row has been
// handed out.
//
// Returns InternalError if the scanner is not initialized or if either pointer
// argument is null, and propagates any error from get_block_from_fe().
Status SchemaTableOptionsScanner::get_next_block(vectorized::Block* block, bool* eos) {
    if (!_is_init) {
        return Status::InternalError("Used before initialized.");
    }
    if (block == nullptr || eos == nullptr) {
        return Status::InternalError("input pointer is nullptr.");
    }

    // Lazily load the whole result set the first time a block is requested.
    if (!_tableoptions_block) {
        RETURN_IF_ERROR(get_block_from_fe());
        _total_rows = _tableoptions_block->rows();
    }

    const int remaining_rows = _total_rows - _row_idx;
    if (remaining_rows == 0) {
        *eos = true;
        return Status::OK();
    }

    const int batch_rows = std::min(_block_rows_limit, remaining_rows);
    auto mutable_block = vectorized::MutableBlock::build_mutable_block(block);
    mutable_block.add_rows(_tableoptions_block.get(), _row_idx, batch_rows);
    _row_idx += batch_rows;

    *eos = (_row_idx == _total_rows);
    return Status::OK();
}
131+
132+
} // namespace doris
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
#pragma once
19+
20+
#include <vector>
21+
22+
#include "common/status.h"
23+
#include "exec/schema_scanner.h"
24+
25+
namespace doris {
26+
class RuntimeState;
27+
namespace vectorized {
28+
class Block;
29+
} // namespace vectorized
30+
31+
class SchemaTableOptionsScanner : public SchemaScanner {
32+
ENABLE_FACTORY_CREATOR(SchemaTableOptionsScanner);
33+
34+
public:
35+
SchemaTableOptionsScanner();
36+
~SchemaTableOptionsScanner() override = default;
37+
38+
Status start(RuntimeState* state) override;
39+
Status get_next_block(vectorized::Block* block, bool* eos) override;
40+
41+
static std::vector<SchemaScanner::ColumnDesc> _s_tbls_columns;
42+
43+
private:
44+
Status get_block_from_fe();
45+
46+
int _block_rows_limit = 4096;
47+
int _row_idx = 0;
48+
int _total_rows = 0;
49+
std::unique_ptr<vectorized::Block> _tableoptions_block = nullptr;
50+
int _rpc_timeout = 3000;
51+
};
52+
}; // namespace doris

‎fe/fe-core/src/main/java/org/apache/doris/analysis/SchemaTableType.java

+3-1
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,9 @@ public enum SchemaTableType {
7777
SCH_PROCS_PRIV("procs_priv", "procs_priv", TSchemaTableType.SCH_PROCS_PRIV),
7878

7979
SCH_WORKLOAD_SCHEDULE_POLICY("WORKLOAD_SCHEDULE_POLICY", "WORKLOAD_SCHEDULE_POLICY",
80-
TSchemaTableType.SCH_WORKLOAD_SCHEDULE_POLICY);
80+
TSchemaTableType.SCH_WORKLOAD_SCHEDULE_POLICY),
81+
SCH_TABLE_OPTIONS("TABLE_OPTIONS", "TABLE_OPTIONS",
82+
TSchemaTableType.SCH_TABLE_OPTIONS);
8183

8284
private static final String dbName = "INFORMATION_SCHEMA";
8385
private static SelectList fullSelectLists;

‎fe/fe-core/src/main/java/org/apache/doris/catalog/SchemaTable.java

+11
Original file line numberDiff line numberDiff line change
@@ -510,6 +510,17 @@ public class SchemaTable extends Table {
510510
.column("VERSION", ScalarType.createType(PrimitiveType.INT))
511511
.column("WORKLOAD_GROUP", ScalarType.createStringType())
512512
.build()))
513+
.put("table_options",
514+
new SchemaTable(SystemIdGenerator.getNextId(), "table_options", TableType.SCHEMA,
515+
builder().column("TABLE_NAME", ScalarType.createVarchar(NAME_CHAR_LEN))
516+
.column("TABLE_CATALOG", ScalarType.createVarchar(NAME_CHAR_LEN))
517+
.column("TABLE_SCHEMA", ScalarType.createVarchar(NAME_CHAR_LEN))
518+
.column("DISTRIBUTE_KEY", ScalarType.createStringType())
519+
.column("DISTRIBUTE_TYPE", ScalarType.createStringType())
520+
.column("BUCKETS_NUM", ScalarType.createType(PrimitiveType.INT))
521+
.column("PARTITION_NUM", ScalarType.createType(PrimitiveType.INT))
522+
.column("PROPERTIES", ScalarType.createStringType())
523+
.build()))
513524
.build();
514525

515526
protected SchemaTable(long id, String name, TableType type, List<Column> baseSchema) {

‎fe/fe-core/src/main/java/org/apache/doris/catalog/TableProperty.java

+11
Original file line numberDiff line numberDiff line change
@@ -663,4 +663,15 @@ public String getStorageVaultName() {
663663
public void setStorageVaultName(String storageVaultName) {
664664
properties.put(PropertyAnalyzer.PROPERTIES_STORAGE_VAULT_NAME, storageVaultName);
665665
}
666+
667+
public String getPropertiesString() {
668+
StringBuilder str = new StringBuilder("");
669+
for (Map.Entry<String, String> entry : properties.entrySet()) {
670+
if (str.length() != 0) {
671+
str.append(", ");
672+
}
673+
str.append(entry.getKey() + " = " + entry.getValue());
674+
}
675+
return str.toString();
676+
}
666677
}

‎fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java

+80
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,16 @@
2020
import org.apache.doris.analysis.UserIdentity;
2121
import org.apache.doris.catalog.Column;
2222
import org.apache.doris.catalog.DatabaseIf;
23+
import org.apache.doris.catalog.DistributionInfo;
24+
import org.apache.doris.catalog.DistributionInfo.DistributionInfoType;
2325
import org.apache.doris.catalog.Env;
26+
import org.apache.doris.catalog.HashDistributionInfo;
2427
import org.apache.doris.catalog.MTMV;
28+
import org.apache.doris.catalog.OlapTable;
2529
import org.apache.doris.catalog.SchemaTable;
2630
import org.apache.doris.catalog.Table;
31+
import org.apache.doris.catalog.TableIf;
32+
import org.apache.doris.catalog.TableProperty;
2733
import org.apache.doris.common.AnalysisException;
2834
import org.apache.doris.common.ClientPool;
2935
import org.apache.doris.common.Pair;
@@ -100,6 +106,8 @@ public class MetadataGenerator {
100106

101107
private static final ImmutableMap<String, Integer> WORKLOAD_SCHED_POLICY_COLUMN_TO_INDEX;
102108

109+
private static final ImmutableMap<String, Integer> TABLE_OPTIONS_COLUMN_TO_INDEX;
110+
103111
static {
104112
ImmutableMap.Builder<String, Integer> activeQueriesbuilder = new ImmutableMap.Builder();
105113
List<Column> activeQueriesColList = SchemaTable.TABLE_MAP.get("active_queries").getFullSchema();
@@ -127,6 +135,12 @@ public class MetadataGenerator {
127135
}
128136
WORKLOAD_SCHED_POLICY_COLUMN_TO_INDEX = policyBuilder.build();
129137

138+
ImmutableMap.Builder<String, Integer> optionBuilder = new ImmutableMap.Builder();
139+
List<Column> optionColList = SchemaTable.TABLE_MAP.get("table_options").getFullSchema();
140+
for (int i = 0; i < optionColList.size(); i++) {
141+
optionBuilder.put(optionColList.get(i).getName().toLowerCase(), i);
142+
}
143+
TABLE_OPTIONS_COLUMN_TO_INDEX = optionBuilder.build();
130144
}
131145

132146
public static TFetchSchemaTableDataResult getMetadataTable(TFetchSchemaTableDataRequest request) throws TException {
@@ -203,6 +217,10 @@ public static TFetchSchemaTableDataResult getSchemaTableData(TFetchSchemaTableDa
203217
result = workloadSchedPolicyMetadataResult(schemaTableParams);
204218
columnIndex = WORKLOAD_SCHED_POLICY_COLUMN_TO_INDEX;
205219
break;
220+
case TABLE_OPTIONS:
221+
result = tableOptionsMetadataResult(schemaTableParams);
222+
columnIndex = TABLE_OPTIONS_COLUMN_TO_INDEX;
223+
break;
206224
default:
207225
return errorResult("invalid schema table name.");
208226
}
@@ -873,4 +891,66 @@ private static TFetchSchemaTableDataResult routineInfoMetadataResult(TSchemaTabl
873891
result.setStatus(new TStatus(TStatusCode.OK));
874892
return result;
875893
}
894+
895+
private static TFetchSchemaTableDataResult tableOptionsMetadataResult(TSchemaTableRequestParams params) {
896+
if (!params.isSetCurrentUserIdent()) {
897+
return errorResult("current user ident is not set.");
898+
}
899+
900+
TFetchSchemaTableDataResult result = new TFetchSchemaTableDataResult();
901+
List<TRow> dataBatch = Lists.newArrayList();
902+
List<Long> catalogIds = Env.getCurrentEnv().getCatalogMgr().getCatalogIds();
903+
for (Long catalogId : catalogIds) {
904+
CatalogIf catalog = Env.getCurrentEnv().getCatalogMgr().getCatalog(catalogId);
905+
List<Long> dbIds = catalog.getDbIds();
906+
for (Long dbId : dbIds) {
907+
DatabaseIf database = catalog.getDbNullable(dbId);
908+
List<TableIf> tables = database.getTables();
909+
for (TableIf table : tables) {
910+
if (table instanceof OlapTable) {
911+
OlapTable olapTable = (OlapTable) table;
912+
TRow trow = new TRow();
913+
trow.addToColumnValue(new TCell().setStringVal(table.getName())); // TABLE_NAME
914+
trow.addToColumnValue(new TCell().setStringVal(catalog.getName())); // TABLE_CATALOG
915+
trow.addToColumnValue(new TCell().setStringVal(database.getFullName())); // TABLE_SCHEMA
916+
DistributionInfo distributionInfo = olapTable.getDefaultDistributionInfo();
917+
if (distributionInfo.getType() == DistributionInfoType.HASH) {
918+
HashDistributionInfo hashDistributionInfo = (HashDistributionInfo) distributionInfo;
919+
List<Column> distributionColumns = hashDistributionInfo.getDistributionColumns();
920+
StringBuilder distributeKey = new StringBuilder();
921+
for (Column c : distributionColumns) {
922+
if (distributeKey.length() != 0) {
923+
distributeKey.append(",");
924+
}
925+
distributeKey.append(c.getName());
926+
}
927+
if (distributeKey.length() == 0) {
928+
trow.addToColumnValue(new TCell().setStringVal(""));
929+
} else {
930+
trow.addToColumnValue(
931+
new TCell().setStringVal(distributeKey.toString()));
932+
}
933+
trow.addToColumnValue(new TCell().setStringVal("HASH")); // DISTRIBUTE_TYPE
934+
} else {
935+
trow.addToColumnValue(new TCell().setStringVal("RANDOM")); // DISTRIBUTE_KEY
936+
trow.addToColumnValue(new TCell().setStringVal("RANDOM")); // DISTRIBUTE_TYPE
937+
}
938+
trow.addToColumnValue(new TCell().setIntVal(distributionInfo.getBucketNum())); // BUCKETS_NUM
939+
trow.addToColumnValue(new TCell().setIntVal(olapTable.getPartitionNum())); // PARTITION_NUM
940+
TableProperty property = olapTable.getTableProperty();
941+
if (property == null) {
942+
trow.addToColumnValue(new TCell().setStringVal("")); // PROPERTIES
943+
} else {
944+
trow.addToColumnValue(
945+
new TCell().setStringVal(property.getPropertiesString())); // PROPERTIES
946+
}
947+
dataBatch.add(trow);
948+
}
949+
}
950+
}
951+
}
952+
result.setDataBatch(dataBatch);
953+
result.setStatus(new TStatus(TStatusCode.OK));
954+
return result;
955+
}
876956
}

‎gensrc/thrift/Descriptors.thrift

+2-1
Original file line numberDiff line numberDiff line change
@@ -131,7 +131,8 @@ enum TSchemaTableType {
131131
SCH_WORKLOAD_GROUPS,
132132
SCH_USER,
133133
SCH_PROCS_PRIV,
134-
SCH_WORKLOAD_SCHEDULE_POLICY;
134+
SCH_WORKLOAD_SCHEDULE_POLICY,
135+
SCH_TABLE_OPTIONS;
135136
}
136137

137138
enum THdfsCompression {

‎gensrc/thrift/FrontendService.thrift

+1
Original file line numberDiff line numberDiff line change
@@ -948,6 +948,7 @@ enum TSchemaTableName {
948948
WORKLOAD_GROUPS = 3, // db information_schema's table
949949
ROUTINES_INFO = 4, // db information_schema's table
950950
WORKLOAD_SCHEDULE_POLICY = 5,
951+
TABLE_OPTIONS = 6,
951952
}
952953

953954
struct TMetadataTableRequestParams {

0 commit comments

Comments
 (0)
Please sign in to comment.