Skip to content

Commit 46b2839

Browse files
committed
[Enhancement](load) Limit the number of incorrect data drops
1 parent 1f9aa8a commit 46b2839

File tree

6 files changed

+143
-2
lines changed

6 files changed

+143
-2
lines changed

be/src/common/config.cpp

+2
Original file line numberDiff line numberDiff line change
@@ -470,6 +470,8 @@ DEFINE_Int32(single_replica_load_download_num_workers, "64");
470470
DEFINE_Int64(load_data_reserve_hours, "4");
471471
// log error log will be removed after this time
472472
DEFINE_mInt64(load_error_log_reserve_hours, "48");
473+
// error log size limit, default 200MB
474+
DEFINE_mInt64(load_error_log_limit_bytes, "209715200");
473475

474476
DEFINE_Int32(brpc_heavy_work_pool_threads, "-1");
475477
DEFINE_Int32(brpc_light_work_pool_threads, "-1");

be/src/common/config.h

+2
Original file line numberDiff line numberDiff line change
@@ -521,6 +521,8 @@ DECLARE_Int32(single_replica_load_download_num_workers);
521521
DECLARE_Int64(load_data_reserve_hours);
522522
// log error log will be removed after this time
523523
DECLARE_mInt64(load_error_log_reserve_hours);
524+
// error log size limit, default 200MB
525+
DECLARE_mInt64(load_error_log_limit_bytes);
524526

525527
// be brpc interface is classified into two categories: light and heavy
526528
// each category has diffrent thread number

be/src/runtime/runtime_state.cpp

+10-2
Original file line numberDiff line numberDiff line change
@@ -414,8 +414,16 @@ Status RuntimeState::append_error_msg_to_file(std::function<std::string()> line,
414414
}
415415
}
416416

417-
if (out.size() > 0) {
418-
(*_error_log_file) << fmt::to_string(out) << std::endl;
417+
size_t error_row_size = out.size();
418+
if (error_row_size > 0) {
419+
if (error_row_size > config::load_error_log_limit_bytes) {
420+
fmt::memory_buffer limit_byte_out;
421+
limit_byte_out.append(out.data(), out.data() + config::load_error_log_limit_bytes);
422+
(*_error_log_file) << fmt::to_string(limit_byte_out) + "error log is too long"
423+
<< std::endl;
424+
} else {
425+
(*_error_log_file) << fmt::to_string(out) << std::endl;
426+
}
419427
}
420428
return Status::OK();
421429
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
1,abc
2+
1,abc
3+
1,abc
4+
1,abc
5+
1,abc
6+
1,abc
7+
1,abc
8+
1,abc
9+
1,abc
10+
1,abc
11+
1,abc
12+
1,abc
13+
1,abc
14+
1,abc
15+
1,abc
16+
1,abc
17+
1,abc
18+
1,abc
19+
1,abc
20+
1,abc
21+
1,abc
22+
1,abc
23+
1,abc
24+
1,abc
25+
1,abc
26+
1,abc
27+
1,abc
28+
1,abc
29+
1,abc
30+
1,abc
31+
1,abc
32+
1,abc
33+
1,abc
34+
1,abc
35+
1,abc
36+
1,abc
37+
1,abc
38+
1,abc
39+
1,abc
40+
1,abc
41+
1,abc
42+
1,abc
43+
1,abc
44+
1,abc
45+
1,abc
46+
1,abc
47+
1,abc
48+
1,abc
49+
1,abc
50+
1,abc
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
-- This file is automatically generated. You should know what you did if you want to edit this
2+
-- !sql --
3+
0
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
suite("test_stream_load_err_log_limit", "p0") {
19+
sql "show tables"
20+
21+
def tableName = "test_stream_load_err_log_limit_table"
22+
23+
sql """ DROP TABLE IF EXISTS ${tableName} """
24+
sql """
25+
CREATE TABLE IF NOT EXISTS ${tableName} (
26+
`k1` int NOT NULL,
27+
`k2` varchar(20) NOT NULL
28+
) ENGINE=OLAP
29+
DUPLICATE KEY(`k1`)
30+
DISTRIBUTED BY HASH(`k1`) BUCKETS 3
31+
PROPERTIES ("replication_allocation" = "tag.location.default: 1");
32+
"""
33+
34+
def backendId_to_backendIP = [:]
35+
def backendId_to_backendHttpPort = [:]
36+
getBackendIpHttpPort(backendId_to_backendIP, backendId_to_backendHttpPort);
37+
38+
def set_be_param = { paramName, paramValue ->
39+
for (String id in backendId_to_backendIP.keySet()) {
40+
def beIp = backendId_to_backendIP.get(id)
41+
def bePort = backendId_to_backendHttpPort.get(id)
42+
def (code, out, err) = curl("POST", String.format("http://%s:%s/api/update_config?%s=%s", beIp, bePort, paramName, paramValue))
43+
assertTrue(out.contains("OK"))
44+
}
45+
}
46+
47+
try {
48+
set_be_param.call("load_error_log_limit_bytes", "100")
49+
50+
streamLoad {
51+
table "${tableName}"
52+
set 'column_separator', ','
53+
set 'columns', 'k1, k2, k3'
54+
file 'test_stream_load_err_log_limit.csv'
55+
56+
check { result, exception, startTime, endTime ->
57+
if (exception != null) {
58+
throw exception
59+
}
60+
log.info("Stream load result: ${result}".toString())
61+
def json = parseJson(result)
62+
def (code, out, err) = curl("GET", json.ErrorURL)
63+
log.info("error result: " + out)
64+
def checkError = out.contains("error log is too long")
65+
assertTrue(checkError)
66+
log.info("url: " + json.ErrorURL)
67+
}
68+
}
69+
} finally {
70+
set_be_param.call("load_error_log_limit_bytes", "209715200")
71+
}
72+
73+
sql "sync"
74+
qt_sql "select count(*) from ${tableName}"
75+
sql """ DROP TABLE IF EXISTS ${tableName} """
76+
}

0 commit comments

Comments
 (0)