Skip to content

Commit 2046a26

Browse files
Merge branch 'master' into master-fix1
2 parents 261f979 + 03c625f commit 2046a26

File tree

13 files changed

+682
-72
lines changed

13 files changed

+682
-72
lines changed

.asf.yaml

+1-1
Original file line numberDiff line numberDiff line change
@@ -152,13 +152,13 @@ github:
152152
- LemonLiTree
153153
- Yukang-Lian
154154
- TangSiyang2001
155-
- Lchangliang
156155
- freemandealer
157156
- shuke987
158157
- wm1581066
159158
- KassieZ
160159
- yujun777
161160
- doris-robot
161+
- LiBinfeng-01
162162

163163
notifications:
164164
pullrequests_status: commits@doris.apache.org

be/src/cloud/cloud_tablet.cpp

+35-10
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,36 @@ Status CloudTablet::capture_rs_readers(const Version& spec_version,
108108
return capture_rs_readers_unlocked(version_path, rs_splits);
109109
}
110110

111+
Status CloudTablet::merge_rowsets_schema() {
112+
// Find the rowset with the max version
113+
auto max_version_rowset =
114+
std::max_element(
115+
_rs_version_map.begin(), _rs_version_map.end(),
116+
[](const auto& a, const auto& b) {
117+
return !a.second->tablet_schema()
118+
? true
119+
: (!b.second->tablet_schema()
120+
? false
121+
: a.second->tablet_schema()->schema_version() <
122+
b.second->tablet_schema()
123+
->schema_version());
124+
})
125+
->second;
126+
TabletSchemaSPtr max_version_schema = max_version_rowset->tablet_schema();
127+
// If the schema has variant columns, perform a merge to create a wide tablet schema
128+
if (max_version_schema->num_variant_columns() > 0) {
129+
std::vector<TabletSchemaSPtr> schemas;
130+
std::transform(_rs_version_map.begin(), _rs_version_map.end(), std::back_inserter(schemas),
131+
[](const auto& rs_meta) { return rs_meta.second->tablet_schema(); });
132+
// Merge the collected schemas to obtain the least common schema
133+
RETURN_IF_ERROR(vectorized::schema_util::get_least_common_schema(schemas, nullptr,
134+
max_version_schema));
135+
VLOG_DEBUG << "dump schema: " << max_version_schema->dump_full_schema();
136+
_merged_tablet_schema = max_version_schema;
137+
}
138+
return Status::OK();
139+
}
140+
111141
// There are only two tablet_states RUNNING and NOT_READY in cloud mode
112142
// This function will erase the tablet from `CloudTabletMgr` when it can't find this tablet in MS.
113143
Status CloudTablet::sync_rowsets(int64_t query_version, bool warmup_delta_data) {
@@ -133,6 +163,10 @@ Status CloudTablet::sync_rowsets(int64_t query_version, bool warmup_delta_data)
133163
if (st.is<ErrorCode::NOT_FOUND>()) {
134164
clear_cache();
135165
}
166+
167+
// Merge all rowset schemas within a CloudTablet
168+
RETURN_IF_ERROR(merge_rowsets_schema());
169+
136170
return st;
137171
}
138172

@@ -188,16 +222,7 @@ Status CloudTablet::sync_if_not_running() {
188222
}
189223

190224
TabletSchemaSPtr CloudTablet::merged_tablet_schema() const {
191-
std::shared_lock rdlock(_meta_lock);
192-
TabletSchemaSPtr target_schema;
193-
std::vector<TabletSchemaSPtr> schemas;
194-
for (const auto& [_, rowset] : _rs_version_map) {
195-
schemas.push_back(rowset->tablet_schema());
196-
}
197-
// get the max version schema and merge all schema
198-
static_cast<void>(
199-
vectorized::schema_util::get_least_common_schema(schemas, nullptr, target_schema));
200-
return target_schema;
225+
return _merged_tablet_schema;
201226
}
202227

203228
void CloudTablet::add_rowsets(std::vector<RowsetSharedPtr> to_add, bool version_overlap,

be/src/cloud/cloud_tablet.h

+6
Original file line numberDiff line numberDiff line change
@@ -208,6 +208,9 @@ class CloudTablet final : public BaseTablet {
208208

209209
Status sync_if_not_running();
210210

211+
// Merge all rowset schemas within a CloudTablet
212+
Status merge_rowsets_schema();
213+
211214
CloudStorageEngine& _engine;
212215

213216
// this mutex MUST ONLY be used when sync meta
@@ -246,6 +249,9 @@ class CloudTablet final : public BaseTablet {
246249
std::mutex _base_compaction_lock;
247250
std::mutex _cumulative_compaction_lock;
248251
mutable std::mutex _rowset_update_lock;
252+
253+
// Schema will be merged from all rowsets when sync_rowsets
254+
TabletSchemaSPtr _merged_tablet_schema;
249255
};
250256

251257
using CloudTabletSPtr = std::shared_ptr<CloudTablet>;

be/src/http/action/adjust_log_level.cpp

+9-5
Original file line numberDiff line numberDiff line change
@@ -17,16 +17,17 @@
1717

1818
#include <http/action/adjust_log_level.h>
1919

20+
#include <tuple>
21+
2022
#include "common/logging.h"
21-
#include "common/status.h"
2223
#include "http/http_channel.h"
2324
#include "http/http_request.h"
2425

2526
namespace doris {
2627

2728
// **Note**: If the module_name does not exist in the vlog modules, vlog
2829
// would create corresponding module for it.
29-
int handle_request(HttpRequest* req) {
30+
std::tuple<std::string, int, int> handle_request(HttpRequest* req) {
3031
auto parse_param = [&req](std::string param) {
3132
const auto& value = req->param(param);
3233
if (value.empty()) {
@@ -38,13 +39,16 @@ int handle_request(HttpRequest* req) {
3839
const auto& module = parse_param("module");
3940
const auto& level = parse_param("level");
4041
int new_level = std::stoi(level);
41-
return google::SetVLOGLevel(module.c_str(), new_level);
42+
return std::make_tuple(module, google::SetVLOGLevel(module.c_str(), new_level), new_level);
4243
}
4344

4445
void AdjustLogLevelAction::handle(HttpRequest* req) {
4546
try {
46-
auto old_level = handle_request(req);
47-
auto msg = fmt::format("adjust log level success, origin level is {}", old_level);
47+
auto handle_result = handle_request(req);
48+
auto msg =
49+
fmt::format("adjust vlog of {} from {} to {} succeed", std::get<0>(handle_result),
50+
std::get<1>(handle_result), std::get<2>(handle_result));
51+
LOG(INFO) << msg;
4852
HttpChannel::send_reply(req, msg);
4953
} catch (const std::exception& e) {
5054
HttpChannel::send_reply(req, HttpStatus::INTERNAL_SERVER_ERROR, e.what());

be/src/service/internal_service.cpp

+4-1
Original file line numberDiff line numberDiff line change
@@ -1159,7 +1159,10 @@ void PInternalService::fetch_remote_tablet_schema(google::protobuf::RpcControlle
11591159
LOG(WARNING) << "tablet does not exist, tablet id is " << tablet_id;
11601160
continue;
11611161
}
1162-
tablet_schemas.push_back(res.value()->merged_tablet_schema());
1162+
auto schema = res.value()->merged_tablet_schema();
1163+
if (schema != nullptr) {
1164+
tablet_schemas.push_back(schema);
1165+
}
11631166
}
11641167
if (!tablet_schemas.empty()) {
11651168
// merge all

fe/fe-core/src/main/java/org/apache/doris/datasource/TablePartitionValues.java

-30
Original file line numberDiff line numberDiff line change
@@ -34,11 +34,7 @@
3434
import com.google.common.collect.RangeMap;
3535
import lombok.Data;
3636

37-
import java.io.UnsupportedEncodingException;
38-
import java.net.URLDecoder;
39-
import java.nio.charset.StandardCharsets;
4037
import java.util.ArrayList;
41-
import java.util.Arrays;
4238
import java.util.HashMap;
4339
import java.util.List;
4440
import java.util.Map;
@@ -80,11 +76,6 @@ public TablePartitionValues(List<String> partitionNames, List<List<String>> part
8076
addPartitions(partitionNames, partitionValues, types);
8177
}
8278

83-
public TablePartitionValues(List<String> partitionNames, List<Type> types) {
84-
this();
85-
addPartitions(partitionNames, types);
86-
}
87-
8879
public void addPartitions(List<String> partitionNames, List<List<String>> partitionValues, List<Type> types) {
8980
Preconditions.checkState(partitionNames.size() == partitionValues.size());
9081
List<String> addPartitionNames = new ArrayList<>();
@@ -105,10 +96,6 @@ public void addPartitions(List<String> partitionNames, List<List<String>> partit
10596
addPartitionItems(addPartitionNames, addPartitionItems, types);
10697
}
10798

108-
public void addPartitions(List<String> partitionNames, List<Type> types) {
109-
addPartitions(partitionNames,
110-
partitionNames.stream().map(this::getHivePartitionValues).collect(Collectors.toList()), types);
111-
}
11299

113100
private void addPartitionItems(List<String> partitionNames, List<PartitionItem> partitionItems, List<Type> types) {
114101
Preconditions.checkState(partitionNames.size() == partitionItems.size());
@@ -196,23 +183,6 @@ private ListPartitionItem toListPartitionItem(List<String> partitionValues, List
196183
}
197184
}
198185

199-
private List<String> getHivePartitionValues(String partitionName) {
200-
// Partition name will be in format: nation=cn/city=beijing
201-
// parse it to get values "cn" and "beijing"
202-
return Arrays.stream(partitionName.split("/")).map(part -> {
203-
String[] kv = part.split("=");
204-
Preconditions.checkState(kv.length == 2, partitionName);
205-
String partitionValue;
206-
try {
207-
// hive partition value maybe contains special characters like '=' and '/'
208-
partitionValue = URLDecoder.decode(kv[1], StandardCharsets.UTF_8.name());
209-
} catch (UnsupportedEncodingException e) {
210-
// It should not be here
211-
throw new RuntimeException(e);
212-
}
213-
return partitionValue;
214-
}).collect(Collectors.toList());
215-
}
216186

217187
@Data
218188
public static class TablePartitionKey {

fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java

+6-7
Original file line numberDiff line numberDiff line change
@@ -244,7 +244,7 @@ public Long getValue() {
244244
}
245245

246246
private HivePartitionValues loadPartitionValues(PartitionValueCacheKey key) {
247-
// partition name format: nation=cn/city=beijing
247+
// partition name format: nation=cn/city=beijing; the string returned by `listPartitionNames` is encoded.
248248
List<String> partitionNames = catalog.getClient().listPartitionNames(key.dbName, key.tblName);
249249
if (LOG.isDebugEnabled()) {
250250
LOG.debug("load #{} partitions for {} in catalog {}", partitionNames.size(), key, catalog.getName());
@@ -281,11 +281,10 @@ private HivePartitionValues loadPartitionValues(PartitionValueCacheKey key) {
281281
public ListPartitionItem toListPartitionItem(String partitionName, List<Type> types) {
282282
// Partition name will be in format: nation=cn/city=beijing
283283
// parse it to get values "cn" and "beijing"
284-
String[] parts = partitionName.split("/");
285-
Preconditions.checkState(parts.length == types.size(), partitionName + " vs. " + types);
284+
List<String> partitionValues = HiveUtil.toPartitionValues(partitionName);
285+
Preconditions.checkState(partitionValues.size() == types.size(), partitionName + " vs. " + types);
286286
List<PartitionValue> values = Lists.newArrayListWithExpectedSize(types.size());
287-
for (String part : parts) {
288-
String partitionValue = HiveUtil.getHivePartitionValue(part);
287+
for (String partitionValue : partitionValues) {
289288
values.add(new PartitionValue(partitionValue, HIVE_DEFAULT_PARTITION.equals(partitionValue)));
290289
}
291290
try {
@@ -325,9 +324,9 @@ private Map<PartitionCacheKey, HivePartition> loadPartitions(Iterable<? extends
325324
StringBuilder sb = new StringBuilder();
326325
Preconditions.checkState(key.getValues().size() == partitionColumns.size());
327326
for (int i = 0; i < partitionColumns.size(); i++) {
328-
sb.append(partitionColumns.get(i).getName());
327+
// Partition name and value may contain special character, like / and so on. Need to encode.
328+
sb.append(FileUtils.escapePathName(partitionColumns.get(i).getName()));
329329
sb.append("=");
330-
// Partition value may contain special character, like / and so on. Need to encode.
331330
sb.append(FileUtils.escapePathName(key.getValues().get(i)));
332331
sb.append("/");
333332
}

fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveUtil.java

+17-12
Original file line numberDiff line numberDiff line change
@@ -49,9 +49,6 @@
4949
import org.apache.hadoop.mapred.TextInputFormat;
5050
import org.apache.hadoop.util.ReflectionUtils;
5151

52-
import java.io.UnsupportedEncodingException;
53-
import java.net.URLDecoder;
54-
import java.nio.charset.StandardCharsets;
5552
import java.util.ArrayList;
5653
import java.util.HashMap;
5754
import java.util.HashSet;
@@ -123,16 +120,22 @@ public static boolean isSplittable(RemoteFileSystem remoteFileSystem, String inp
123120
return HMSExternalTable.SUPPORTED_HIVE_FILE_FORMATS.contains(inputFormat);
124121
}
125122

126-
public static String getHivePartitionValue(String part) {
127-
String[] kv = part.split("=");
128-
Preconditions.checkState(kv.length == 2, String.format("Malformed partition name %s", part));
129-
try {
130-
// hive partition value maybe contains special characters like '=' and '/'
131-
return URLDecoder.decode(kv[1], StandardCharsets.UTF_8.name());
132-
} catch (UnsupportedEncodingException e) {
133-
// It should not be here
134-
throw new RuntimeException(e);
123+
// "c1=a/c2=b/c3=c" ---> List(["c1","a"], ["c2","b"], ["c3","c"])
124+
// Similar to the `toPartitionValues` method, except that it adds the partition column name.
125+
public static List<String[]> toPartitionColNameAndValues(String partitionName) {
126+
127+
String[] parts = partitionName.split("/");
128+
List<String[]> result = new ArrayList<>(parts.length);
129+
for (String part : parts) {
130+
String[] kv = part.split("=");
131+
Preconditions.checkState(kv.length == 2, String.format("Malformed partition name %s", part));
132+
133+
result.add(new String[] {
134+
FileUtils.unescapePathName(kv[0]),
135+
FileUtils.unescapePathName(kv[1])
136+
});
135137
}
138+
return result;
136139
}
137140

138141
// "c1=a/c2=b/c3=c" ---> List("a","b","c")
@@ -151,6 +154,8 @@ public static List<String> toPartitionValues(String partitionName) {
151154
if (start > partitionName.length()) {
152155
break;
153156
}
157+
//Ref: common/src/java/org/apache/hadoop/hive/common/FileUtils.java
158+
//makePartName(List<String> partCols, List<String> vals,String defaultStr)
154159
resultBuilder.add(FileUtils.unescapePathName(partitionName.substring(start, end)));
155160
start = end + 1;
156161
}

fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java

+5
Original file line numberDiff line numberDiff line change
@@ -1933,6 +1933,11 @@ private void handleShowHMSTablePartitions(ShowPartitionsStmt showStmt) throws An
19331933
Map<String, Expr> filterMap = showStmt.getFilterMap();
19341934
List<OrderByPair> orderByPairs = showStmt.getOrderByPairs();
19351935

1936+
// The string returned by catalog.getClient().listPartitionNames() is encoded.
1937+
// example: insert into tmp partition(pt="1=3/3") values( xxx );
1938+
// show partitions from tmp: pt=1%3D3%2F3
1939+
// Consider whether `HiveUtil.toPartitionColNameAndValues` should be called here to decode it.
1940+
19361941
if (limit != null && limit.hasLimit() && limit.getOffset() == 0
19371942
&& (orderByPairs == null || !orderByPairs.get(0).isDesc())) {
19381943
// hmsClient returns unordered partition list, hence if offset > 0 cannot pass limit

fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java

+1
Original file line numberDiff line numberDiff line change
@@ -1912,6 +1912,7 @@ public void executeAndSendResult(boolean isOutfileQuery, boolean isSendFields,
19121912
: new ShortCircuitQueryContext(planner, (Queriable) parsedStmt);
19131913
coordBase = new PointQueryExecutor(shortCircuitQueryContext,
19141914
context.getSessionVariable().getMaxMsgSizeOfResultReceiver());
1915+
context.getState().setIsQuery(true);
19151916
} else if (planner instanceof NereidsPlanner && ((NereidsPlanner) planner).getDistributedPlans() != null) {
19161917
coord = new NereidsCoordinator(context, analyzer,
19171918
planner, context.getStatsErrorEstimator(),

fe/fe-core/src/main/java/org/apache/doris/statistics/HMSAnalysisTask.java

+5-4
Original file line numberDiff line numberDiff line change
@@ -104,10 +104,11 @@ private void getPartitionColumnStats() {
104104
for (String names : partitionNames) {
105105
// names is like "date=20230101" for one level partition
106106
// and like "date=20230101/hour=12" for two level partition
107-
String[] parts = names.split("/");
108-
for (String part : parts) {
109-
if (part.startsWith(col.getName())) {
110-
String value = HiveUtil.getHivePartitionValue(part);
107+
List<String[]> parts = HiveUtil.toPartitionColNameAndValues(names);
108+
for (String[] part : parts) {
109+
String colName = part[0];
110+
String value = part[1];
111+
if (colName != null && colName.equals(col.getName())) {
111112
// HIVE_DEFAULT_PARTITION hive partition value when the partition name is not specified.
112113
if (value == null || value.isEmpty() || value.equals(HiveMetaStoreCache.HIVE_DEFAULT_PARTITION)) {
113114
numNulls += 1;

0 commit comments

Comments (0)