Skip to content

Commit adfbe69

Browse files
branch-3.0: [Fix](auto-increment) Fix duplicate auto-increment column value problem #43774 (#43983)
Cherry-picked from #43774 Co-authored-by: bobhan1 <baohan@selectdb.com>
1 parent 122d1ae commit adfbe69

File tree

3 files changed

+99
-7
lines changed

3 files changed

+99
-7
lines changed

be/src/vec/sink/autoinc_buffer.cpp

+26-7
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
#include "common/status.h"
2727
#include "runtime/client_cache.h"
2828
#include "runtime/exec_env.h"
29+
#include "util/debug_points.h"
2930
#include "util/runtime_profile.h"
3031
#include "util/thrift_rpc_helper.h"
3132

@@ -44,10 +45,18 @@ void AutoIncIDBuffer::set_batch_size_at_least(size_t batch_size) {
4445
}
4546

4647
Result<int64_t> AutoIncIDBuffer::_fetch_ids_from_fe(size_t length) {
48+
LOG_INFO(
49+
"[AutoIncIDBuffer::_fetch_ids_from_fe] begin to fetch auto-increment values from fe, "
50+
"db_id={}, table_id={}, column_id={}, length={}",
51+
_db_id, _table_id, _column_id, length);
4752
constexpr uint32_t FETCH_AUTOINC_MAX_RETRY_TIMES = 3;
4853
_rpc_status = Status::OK();
4954
TNetworkAddress master_addr = ExecEnv::GetInstance()->cluster_info()->master_fe_addr;
5055
for (uint32_t retry_times = 0; retry_times < FETCH_AUTOINC_MAX_RETRY_TIMES; retry_times++) {
56+
DBUG_EXECUTE_IF("AutoIncIDBuffer::_fetch_ids_from_fe.failed", {
57+
_rpc_status = Status::InternalError<false>("injected error");
58+
break;
59+
});
5160
TAutoIncrementRangeRequest request;
5261
TAutoIncrementRangeResult result;
5362
request.__set_db_id(_db_id);
@@ -67,8 +76,9 @@ Result<int64_t> AutoIncIDBuffer::_fetch_ids_from_fe(size_t length) {
6776

6877
if (_rpc_status.is<ErrorCode::NOT_MASTER>()) {
6978
LOG_WARNING(
70-
"Failed to fetch auto-incremnt range, requested to non-master FE@{}:{}, change "
71-
"to request to FE@{}:{}. retry_time={}, db_id={}, table_id={}, column_id={}",
79+
"Failed to fetch auto-increment range, requested to non-master FE@{}:{}, "
80+
"change to request to FE@{}:{}. retry_time={}, db_id={}, table_id={}, "
81+
"column_id={}",
7282
master_addr.hostname, master_addr.port, result.master_address.hostname,
7383
result.master_address.port, retry_times, _db_id, _table_id, _column_id);
7484
master_addr = result.master_address;
@@ -78,15 +88,15 @@ Result<int64_t> AutoIncIDBuffer::_fetch_ids_from_fe(size_t length) {
7888

7989
if (!_rpc_status.ok()) {
8090
LOG_WARNING(
81-
"Failed to fetch auto-incremnt range, encounter rpc failure. "
91+
"Failed to fetch auto-increment range, encounter rpc failure. "
8292
"errmsg={}, retry_time={}, db_id={}, table_id={}, column_id={}",
8393
_rpc_status.to_string(), retry_times, _db_id, _table_id, _column_id);
8494
std::this_thread::sleep_for(std::chrono::milliseconds(10));
8595
continue;
8696
}
8797
if (result.length != length) [[unlikely]] {
8898
auto msg = fmt::format(
89-
"Failed to fetch auto-incremnt range, request length={}, but get "
99+
"Failed to fetch auto-increment range, request length={}, but get "
90100
"result.length={}, retry_time={}, db_id={}, table_id={}, column_id={}",
91101
length, result.length, retry_times, _db_id, _table_id, _column_id);
92102
LOG(WARNING) << msg;
@@ -96,14 +106,14 @@ Result<int64_t> AutoIncIDBuffer::_fetch_ids_from_fe(size_t length) {
96106
}
97107

98108
LOG_INFO(
99-
"get auto-incremnt range from FE@{}:{}, start={}, length={}, elapsed={}ms, "
109+
"get auto-increment range from FE@{}:{}, start={}, length={}, elapsed={}ms, "
100110
"retry_time={}, db_id={}, table_id={}, column_id={}",
101111
master_addr.hostname, master_addr.port, result.start, result.length,
102112
get_auto_inc_range_rpc_ns / 1000000, retry_times, _db_id, _table_id, _column_id);
103113
return result.start;
104114
}
105115
CHECK(!_rpc_status.ok());
106-
return _rpc_status;
116+
return ResultError(_rpc_status);
107117
}
108118

109119
void AutoIncIDBuffer::_get_autoinc_ranges_from_buffers(
@@ -153,10 +163,19 @@ Status AutoIncIDBuffer::_launch_async_fetch_task(size_t length) {
153163
RETURN_IF_ERROR(_rpc_token->submit_func([=, this]() {
154164
auto&& res = _fetch_ids_from_fe(length);
155165
if (!res.has_value()) [[unlikely]] {
166+
auto&& err = res.error();
167+
LOG_WARNING(
168+
"[AutoIncIDBuffer::_launch_async_fetch_task] failed to fetch auto-increment "
169+
"values from fe, db_id={}, table_id={}, column_id={}, status={}",
170+
_db_id, _table_id, _column_id, err);
156171
_is_fetching = false;
157172
return;
158173
}
159174
int64_t start = res.value();
175+
LOG_INFO(
176+
"[AutoIncIDBuffer::_launch_async_fetch_task] successfully fetch auto-increment "
177+
"values from fe, db_id={}, table_id={}, column_id={}, start={}, length={}",
178+
_db_id, _table_id, _column_id, start, length);
160179
{
161180
std::lock_guard<std::mutex> lock {_latch};
162181
_buffers.emplace_back(start, length);
@@ -167,4 +186,4 @@ Status AutoIncIDBuffer::_launch_async_fetch_task(size_t length) {
167186
return Status::OK();
168187
}
169188

170-
} // namespace doris::vectorized
189+
} // namespace doris::vectorized
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
-- This file is automatically generated. You should know what you did if you want to edit this
2+
-- !sql --
3+
0
4+
5+
-- !sql --
6+
4
7+
8+
-- !sql --
9+
0
10+
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
import org.junit.Assert
19+
import java.util.concurrent.TimeUnit
20+
import java.util.concurrent.atomic.AtomicBoolean
21+
22+
suite("test_auto_inc_fetch_fail", "nonConcurrent") {
23+
24+
try {
25+
GetDebugPoint().clearDebugPointsForAllFEs()
26+
GetDebugPoint().clearDebugPointsForAllBEs()
27+
def table1 = "test_auto_inc_fetch_fail"
28+
sql "DROP TABLE IF EXISTS ${table1} FORCE;"
29+
sql """ CREATE TABLE IF NOT EXISTS ${table1} (
30+
`k` int,
31+
`c1` int,
32+
`c2` int,
33+
`c3` int,
34+
`id` BIGINT NOT NULL AUTO_INCREMENT(10000),
35+
) UNIQUE KEY(k)
36+
DISTRIBUTED BY HASH(k) BUCKETS 1
37+
PROPERTIES ("replication_num" = "1"); """
38+
39+
GetDebugPoint().enableDebugPointForAllBEs("AutoIncIDBuffer::_fetch_ids_from_fe.failed")
40+
41+
try {
42+
sql """insert into ${table1}(k,c1,c2,c3) values(1,1,1,1),(2,2,2,2),(3,3,3,3),(4,4,4,4); """
43+
} catch (Exception e) {
44+
logger.info("error : ${e}")
45+
}
46+
qt_sql "select count(*) from ${table1};"
47+
48+
GetDebugPoint().clearDebugPointsForAllBEs()
49+
50+
Thread.sleep(1000)
51+
52+
sql """insert into ${table1}(k,c1,c2,c3) values(1,1,1,1),(2,2,2,2),(3,3,3,3),(4,4,4,4); """
53+
qt_sql "select count(*) from ${table1};"
54+
qt_sql "select count(*) from ${table1} where id < 10000;"
55+
56+
} catch(Exception e) {
57+
logger.info(e.getMessage())
58+
throw e
59+
} finally {
60+
GetDebugPoint().clearDebugPointsForAllFEs()
61+
GetDebugPoint().clearDebugPointsForAllBEs()
62+
}
63+
}

0 commit comments

Comments
 (0)