From b0e42954ae9fdb83113f394f677f83f9ae134cfb Mon Sep 17 00:00:00 2001 From: bobhan1 Date: Tue, 12 Nov 2024 17:21:02 +0800 Subject: [PATCH 1/5] add auto-increment debug point and logs --- be/src/vec/sink/autoinc_buffer.cpp | 26 +++++++++++++++++++++++++- 1 file changed, 25 insertions(+), 1 deletion(-) diff --git a/be/src/vec/sink/autoinc_buffer.cpp b/be/src/vec/sink/autoinc_buffer.cpp index c30d1245e2caa3..87057ccea9e89a 100644 --- a/be/src/vec/sink/autoinc_buffer.cpp +++ b/be/src/vec/sink/autoinc_buffer.cpp @@ -26,6 +26,7 @@ #include "common/status.h" #include "runtime/client_cache.h" #include "runtime/exec_env.h" +#include "util/debug_points.h" #include "util/runtime_profile.h" #include "util/thrift_rpc_helper.h" @@ -45,9 +46,23 @@ void AutoIncIDBuffer::set_batch_size_at_least(size_t batch_size) { } Result AutoIncIDBuffer::_fetch_ids_from_fe(size_t length) { + LOG_INFO( + "[AutoIncIDBuffer::_fetch_ids_from_fe] begin to fetch auto-increment values from fe, " + "db_id={}, table_id={}, column_id={}, length={}", + _db_id, _table_id, _column_id, length); constexpr uint32_t FETCH_AUTOINC_MAX_RETRY_TIMES = 3; _rpc_status = Status::OK(); - TNetworkAddress master_addr = ExecEnv::GetInstance()->cluster_info()->master_fe_addr; + + DBUG_EXECUTE_IF("AutoIncIDBuffer::_fetch_ids_from_fe.failed", { + _rpc_status = Status::InternalError("injected error"); + LOG_WARNING( + "AutoIncIDBuffer::_fetch_ids_from_fe.failed, " + "db_id={}, table_id={}, column_id={}, length={}", + _db_id, _table_id, _column_id, length); + return _rpc_status; + }); + + TNetworkAddress master_addr = ExecEnv::GetInstance()->master_info()->network_address; for (uint32_t retry_times = 0; retry_times < FETCH_AUTOINC_MAX_RETRY_TIMES; retry_times++) { TAutoIncrementRangeRequest request; TAutoIncrementRangeResult result; @@ -154,10 +169,19 @@ Status AutoIncIDBuffer::_launch_async_fetch_task(size_t length) { RETURN_IF_ERROR(_rpc_token->submit_func([=, this]() { auto&& res = _fetch_ids_from_fe(length); if (!res.has_value()) [[unlikely]] { + auto&& err = res.error(); + LOG_WARNING( + "[AutoIncIDBuffer::_launch_async_fetch_task] failed to fetch auto-increment " + "values from fe, status={}", + err); _is_fetching = false; return; } int64_t start = res.value(); + LOG_INFO( + "[AutoIncIDBuffer::_launch_async_fetch_task] successfully fetch auto-increment " + "values from fe, start={}, length={}", + start, length); { std::lock_guard lock {_latch}; _buffers.emplace_back(start, length); From 146d83a9d21ce33c51072b1c6746fde76eb39de4 Mon Sep 17 00:00:00 2001 From: bobhan1 Date: Tue, 12 Nov 2024 18:24:42 +0800 Subject: [PATCH 2/5] add reproduce case --- .../test_auto_inc_fetch_fail.out | 7 +++ .../test_auto_inc_fetch_fail.groovy | 62 +++++++++++++++++++ 2 files changed, 69 insertions(+) create mode 100644 regression-test/data/fault_injection_p0/test_auto_inc_fetch_fail.out create mode 100644 regression-test/suites/fault_injection_p0/test_auto_inc_fetch_fail.groovy diff --git a/regression-test/data/fault_injection_p0/test_auto_inc_fetch_fail.out b/regression-test/data/fault_injection_p0/test_auto_inc_fetch_fail.out new file mode 100644 index 00000000000000..a7d1ce351d29aa --- /dev/null +++ b/regression-test/data/fault_injection_p0/test_auto_inc_fetch_fail.out @@ -0,0 +1,7 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !sql -- +0 + +-- !sql -- +4 + diff --git a/regression-test/suites/fault_injection_p0/test_auto_inc_fetch_fail.groovy b/regression-test/suites/fault_injection_p0/test_auto_inc_fetch_fail.groovy new file mode 100644 index 00000000000000..9c03bfb3c25e56 --- /dev/null +++ b/regression-test/suites/fault_injection_p0/test_auto_inc_fetch_fail.groovy @@ -0,0 +1,62 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import org.junit.Assert +import java.util.concurrent.TimeUnit +import java.util.concurrent.atomic.AtomicBoolean + +suite("test_auto_inc_fetch_fail", "nonConcurrent") { + + try { + GetDebugPoint().clearDebugPointsForAllFEs() + GetDebugPoint().clearDebugPointsForAllBEs() + def table1 = "test_auto_inc_fetch_fail" + sql "DROP TABLE IF EXISTS ${table1} FORCE;" + sql """ CREATE TABLE IF NOT EXISTS ${table1} ( + `k` int, + `c1` int, + `c2` int, + `c3` int, + `id` BIGINT NOT NULL AUTO_INCREMENT(10000), + ) UNIQUE KEY(k) + DISTRIBUTED BY HASH(k) BUCKETS 1 + PROPERTIES ("replication_num" = "1"); """ + + GetDebugPoint().enableDebugPointForAllBEs("AutoIncIDBuffer::_fetch_ids_from_fe.failed") + + try { + sql """insert into ${table1}(k,c1,c2,c3) values(1,1,1,1),(2,2,2,2),(3,3,3,3),(4,4,4,4); """ + } catch (Exception e) { + logger.info("error : ${e}") + } + qt_sql "select count(*) from ${table1};" + + GetDebugPoint().clearDebugPointsForAllBEs() + + Thread.sleep(1000) + + sql """insert into ${table1}(k,c1,c2,c3) values(1,1,1,1),(2,2,2,2),(3,3,3,3),(4,4,4,4); """ + qt_sql "select count(*) from ${table1} where id < 10000;" + + } catch(Exception e) { + logger.info(e.getMessage()) + throw e + } finally { + GetDebugPoint().clearDebugPointsForAllFEs() + GetDebugPoint().clearDebugPointsForAllBEs() + } +} From 49478da684324403fe35e5bc0f3470eef41582e9 Mon Sep 17 00:00:00 2001 From: bobhan1 Date: Tue, 12 Nov 2024 18:38:08 +0800 Subject: [PATCH 3/5] fix --- be/src/vec/sink/autoinc_buffer.cpp | 35 ++++++++----------- .../test_auto_inc_fetch_fail.out | 2 +- 2 files changed, 16 insertions(+), 21 deletions(-) diff --git a/be/src/vec/sink/autoinc_buffer.cpp b/be/src/vec/sink/autoinc_buffer.cpp index 87057ccea9e89a..90895792092cd8 100644 --- a/be/src/vec/sink/autoinc_buffer.cpp +++ b/be/src/vec/sink/autoinc_buffer.cpp @@ -52,18 +52,12 @@ Result AutoIncIDBuffer::_fetch_ids_from_fe(size_t length) { _db_id, _table_id, _column_id, length); constexpr uint32_t FETCH_AUTOINC_MAX_RETRY_TIMES = 3; _rpc_status = Status::OK(); - - DBUG_EXECUTE_IF("AutoIncIDBuffer::_fetch_ids_from_fe.failed", { - _rpc_status = Status::InternalError("injected error"); - LOG_WARNING( - "AutoIncIDBuffer::_fetch_ids_from_fe.failed, " - "db_id={}, table_id={}, column_id={}, length={}", - _db_id, _table_id, _column_id, length); - return _rpc_status; - }); - TNetworkAddress master_addr = ExecEnv::GetInstance()->master_info()->network_address; for (uint32_t retry_times = 0; retry_times < FETCH_AUTOINC_MAX_RETRY_TIMES; retry_times++) { + DBUG_EXECUTE_IF("AutoIncIDBuffer::_fetch_ids_from_fe.failed", { + _rpc_status = Status::InternalError("injected error"); + break; + }); TAutoIncrementRangeRequest request; TAutoIncrementRangeResult result; request.__set_db_id(_db_id); @@ -83,8 +77,9 @@ Result AutoIncIDBuffer::_fetch_ids_from_fe(size_t length) { if (_rpc_status.is()) { LOG_WARNING( - "Failed to fetch auto-incremnt range, requested to non-master FE@{}:{}, change " - "to request to FE@{}:{}. retry_time={}, db_id={}, table_id={}, column_id={}", + "Failed to fetch auto-increment range, requested to non-master FE@{}:{}, " + "change to request to FE@{}:{}. retry_time={}, db_id={}, table_id={}, " + "column_id={}", master_addr.hostname, master_addr.port, result.master_address.hostname, result.master_address.port, retry_times, _db_id, _table_id, _column_id); master_addr = result.master_address; @@ -94,7 +89,7 @@ Result AutoIncIDBuffer::_fetch_ids_from_fe(size_t length) { if (!_rpc_status.ok()) { LOG_WARNING( - "Failed to fetch auto-incremnt range, encounter rpc failure. " + "Failed to fetch auto-increment range, encounter rpc failure. " "errmsg={}, retry_time={}, db_id={}, table_id={}, column_id={}", _rpc_status.to_string(), retry_times, _db_id, _table_id, _column_id); std::this_thread::sleep_for(std::chrono::milliseconds(10)); @@ -102,7 +97,7 @@ Result AutoIncIDBuffer::_fetch_ids_from_fe(size_t length) { } if (result.length != length) [[unlikely]] { auto msg = fmt::format( - "Failed to fetch auto-incremnt range, request length={}, but get " + "Failed to fetch auto-increment range, request length={}, but get " "result.length={}, retry_time={}, db_id={}, table_id={}, column_id={}", length, result.length, retry_times, _db_id, _table_id, _column_id); LOG(WARNING) << msg; @@ -112,14 +107,14 @@ Result AutoIncIDBuffer::_fetch_ids_from_fe(size_t length) { } LOG_INFO( - "get auto-incremnt range from FE@{}:{}, start={}, length={}, elapsed={}ms, " + "get auto-increment range from FE@{}:{}, start={}, length={}, elapsed={}ms, " "retry_time={}, db_id={}, table_id={}, column_id={}", master_addr.hostname, master_addr.port, result.start, result.length, get_auto_inc_range_rpc_ns / 1000000, retry_times, _db_id, _table_id, _column_id); return result.start; } CHECK(!_rpc_status.ok()); - return _rpc_status; + return ResultError(_rpc_status); } void AutoIncIDBuffer::_get_autoinc_ranges_from_buffers( @@ -172,16 +167,16 @@ Status AutoIncIDBuffer::_launch_async_fetch_task(size_t length) { auto&& err = res.error(); LOG_WARNING( "[AutoIncIDBuffer::_launch_async_fetch_task] failed to fetch auto-increment " - "values from fe, status={}", - err); + "values from fe, db_id={}, table_id={}, column_id={}, status={}", + _db_id, _table_id, _column_id, err); _is_fetching = false; return; } int64_t start = res.value(); LOG_INFO( "[AutoIncIDBuffer::_launch_async_fetch_task] successfully fetch auto-increment " - "values from fe, start={}, length={}", - start, length); + "values from fe, db_id={}, table_id={}, column_id={}, start={}, length={}", + _db_id, _table_id, _column_id, start, length); { std::lock_guard lock {_latch}; _buffers.emplace_back(start, length); diff --git a/regression-test/data/fault_injection_p0/test_auto_inc_fetch_fail.out b/regression-test/data/fault_injection_p0/test_auto_inc_fetch_fail.out index a7d1ce351d29aa..512f31fa655eb3 100644 --- a/regression-test/data/fault_injection_p0/test_auto_inc_fetch_fail.out +++ b/regression-test/data/fault_injection_p0/test_auto_inc_fetch_fail.out @@ -3,5 +3,5 @@ 0 -- !sql -- -4 +0 From f1b9ff0ed6a51a4dc2b706cffce3bcfd59d765e1 Mon Sep 17 00:00:00 2001 From: bobhan1 Date: Tue, 12 Nov 2024 18:46:12 +0800 Subject: [PATCH 4/5] fix --- be/src/vec/sink/autoinc_buffer.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/be/src/vec/sink/autoinc_buffer.cpp b/be/src/vec/sink/autoinc_buffer.cpp index 90895792092cd8..00b58fc6bddd24 100644 --- a/be/src/vec/sink/autoinc_buffer.cpp +++ b/be/src/vec/sink/autoinc_buffer.cpp @@ -52,7 +52,7 @@ Result AutoIncIDBuffer::_fetch_ids_from_fe(size_t length) { _db_id, _table_id, _column_id, length); constexpr uint32_t FETCH_AUTOINC_MAX_RETRY_TIMES = 3; _rpc_status = Status::OK(); - TNetworkAddress master_addr = ExecEnv::GetInstance()->master_info()->network_address; + TNetworkAddress master_addr = ExecEnv::GetInstance()->cluster_info()->master_fe_addr; for (uint32_t retry_times = 0; retry_times < FETCH_AUTOINC_MAX_RETRY_TIMES; retry_times++) { DBUG_EXECUTE_IF("AutoIncIDBuffer::_fetch_ids_from_fe.failed", { _rpc_status = Status::InternalError("injected error"); @@ -187,4 +187,4 @@ Status AutoIncIDBuffer::_launch_async_fetch_task(size_t length) { return Status::OK(); } -} // namespace doris::vectorized +} // namespace doris::vectorized \ No newline at end of file From ca8de90d6f76cfa62881d94729f1eb90bf4f14d1 Mon Sep 17 00:00:00 2001 From: bobhan1 Date: Tue, 12 Nov 2024 18:48:53 +0800 Subject: [PATCH 5/5] update --- .../data/fault_injection_p0/test_auto_inc_fetch_fail.out | 3 +++ .../suites/fault_injection_p0/test_auto_inc_fetch_fail.groovy | 1 + 2 files changed, 4 insertions(+) diff --git a/regression-test/data/fault_injection_p0/test_auto_inc_fetch_fail.out b/regression-test/data/fault_injection_p0/test_auto_inc_fetch_fail.out index 512f31fa655eb3..453e378f9c43f6 100644 --- a/regression-test/data/fault_injection_p0/test_auto_inc_fetch_fail.out +++ b/regression-test/data/fault_injection_p0/test_auto_inc_fetch_fail.out @@ -2,6 +2,9 @@ -- !sql -- 0 +-- !sql -- +4 + -- !sql -- 0 diff --git a/regression-test/suites/fault_injection_p0/test_auto_inc_fetch_fail.groovy b/regression-test/suites/fault_injection_p0/test_auto_inc_fetch_fail.groovy index 9c03bfb3c25e56..e9bb6ae9a3c6ca 100644 --- a/regression-test/suites/fault_injection_p0/test_auto_inc_fetch_fail.groovy +++ b/regression-test/suites/fault_injection_p0/test_auto_inc_fetch_fail.groovy @@ -50,6 +50,7 @@ suite("test_auto_inc_fetch_fail", "nonConcurrent") { Thread.sleep(1000) sql """insert into ${table1}(k,c1,c2,c3) values(1,1,1,1),(2,2,2,2),(3,3,3,3),(4,4,4,4); """ + qt_sql "select count(*) from ${table1};" qt_sql "select count(*) from ${table1} where id < 10000;" } catch(Exception e) {