Skip to content

Commit 3adbab1

Browse files
zhiqiang-hhhhstephen
authored and
stephen
committed
[opt](query cancel) optimization for query cancel apache#28778
1 parent 60f85fd commit 3adbab1

File tree

6 files changed

+36
-25
lines changed

6 files changed

+36
-25
lines changed

be/src/agent/heartbeat_server.cpp

+5
Original file line numberDiff line numberDiff line change
@@ -220,6 +220,11 @@ Status HeartbeatServer::_heartbeat(const TMasterInfo& master_info) {
220220

221221
if (master_info.__isset.frontend_infos) {
222222
ExecEnv::GetInstance()->update_frontends(master_info.frontend_infos);
223+
} else {
224+
LOG_EVERY_N(WARNING, 2) << fmt::format(
225+
"Heartbeat from {}:{} does not have frontend_infos, this may because we are "
226+
"upgrading cluster",
227+
master_info.network_address.hostname, master_info.network_address.port);
223228
}
224229

225230
if (need_report) {

be/src/runtime/exec_env.cpp

+15-22
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,13 @@
1818
#include "runtime/exec_env.h"
1919

2020
#include <gen_cpp/HeartbeatService_types.h>
21+
#include <glog/logging.h>
2122

2223
#include <mutex>
2324
#include <utility>
2425

2526
#include "common/config.h"
27+
#include "common/logging.h"
2628
#include "olap/olap_define.h"
2729
#include "olap/storage_engine.h"
2830
#include "olap/tablet_manager.h"
@@ -119,34 +121,25 @@ std::map<TNetworkAddress, FrontendInfo> ExecEnv::get_running_frontends() {
119121
const auto now = GetCurrentTimeMicros() / 1000;
120122

121123
for (const auto& pair : _frontends) {
122-
if (pair.second.info.process_uuid != 0) {
123-
if (now - pair.second.last_reveiving_time_ms < expired_duration) {
124+
auto& brpc_addr = pair.first;
125+
auto& fe_info = pair.second;
126+
127+
if (fe_info.info.process_uuid == 0) {
128+
// FE is in an unknown state, regart it as alive. conservative
129+
res[brpc_addr] = fe_info;
130+
} else {
131+
if (now - fe_info.last_reveiving_time_ms < expired_duration) {
124132
// If fe info has just been update in last expired_duration, regard it as running.
125-
res[pair.first] = pair.second;
133+
res[brpc_addr] = fe_info;
126134
} else {
127135
// Fe info has not been udpate for more than expired_duration, regard it as an abnormal.
128136
// Abnormal means this fe can not connect to master, and it is not dropped from cluster.
129137
// or fe do not have master yet.
130-
LOG(INFO) << "Frontend " << PrintFrontendInfo(pair.second.info)
131-
<< " has not update its hb "
132-
<< "for more than " << config::fe_expire_duration_seconds
133-
<< " secs, regard it as abnormal.";
138+
LOG_EVERY_N(WARNING, 50) << fmt::format(
139+
"Frontend {}:{} has not update its hb for more than {} secs, regard it as "
140+
"abnormal",
141+
brpc_addr.hostname, brpc_addr.port, config::fe_expire_duration_seconds);
134142
}
135-
136-
continue;
137-
}
138-
139-
if (pair.second.last_reveiving_time_ms - pair.second.first_receiving_time_ms >
140-
expired_duration) {
141-
// A zero process-uuid that sustains more than 60 seconds(default).
142-
// We will regard this fe as a abnormal frontend.
143-
LOG(INFO) << "Frontend " << PrintFrontendInfo(pair.second.info)
144-
<< " has not update its hb "
145-
<< "for more than " << config::fe_expire_duration_seconds
146-
<< " secs, regard it as abnormal.";
147-
continue;
148-
} else {
149-
res[pair.first] = pair.second;
150143
}
151144
}
152145

be/src/runtime/exec_env.h

+1
Original file line numberDiff line numberDiff line change
@@ -349,6 +349,7 @@ class ExecEnv {
349349
std::shared_ptr<WalManager> _wal_manager;
350350

351351
std::mutex _frontends_lock;
352+
// ip:brpc_port -> frontend_indo
352353
std::map<TNetworkAddress, FrontendInfo> _frontends;
353354
GroupCommitMgr* _group_commit_mgr = nullptr;
354355

be/src/runtime/fragment_mgr.cpp

+12-1
Original file line numberDiff line numberDiff line change
@@ -1167,6 +1167,8 @@ void FragmentMgr::cancel_worker() {
11671167
} else {
11681168
for (const auto& q : _query_ctx_map) {
11691169
if (q.second->get_fe_process_uuid() == 0) {
1170+
// zero means this query is from a older version fe or
1171+
// this fe is starting
11701172
continue;
11711173
}
11721174

@@ -1175,7 +1177,16 @@ void FragmentMgr::cancel_worker() {
11751177
if (q.second->get_fe_process_uuid() == itr->second.info.process_uuid ||
11761178
itr->second.info.process_uuid == 0) {
11771179
continue;
1180+
} else {
1181+
LOG_WARNING("Coordinator of query {} restarted, going to cancel it.",
1182+
print_id(q.second->query_id()));
11781183
}
1184+
} else {
1185+
LOG_WARNING(
1186+
"Could not find target coordinator {}:{} of query {}, going to "
1187+
"cancel it.",
1188+
q.second->coord_addr.hostname, q.second->coord_addr.port,
1189+
print_id(q.second->query_id()));
11791190
}
11801191

11811192
// Coorninator of this query has already dead.
@@ -1195,7 +1206,7 @@ void FragmentMgr::cancel_worker() {
11951206

11961207
if (!queries_to_cancel.empty()) {
11971208
LOG(INFO) << "There are " << queries_to_cancel.size()
1198-
<< " queries need to be cancelled, coordinator dead.";
1209+
<< " queries need to be cancelled, coordinator dead or restarted.";
11991210
}
12001211

12011212
for (const auto& qid : queries_to_cancel) {

be/src/runtime/query_context.h

+3-1
Original file line numberDiff line numberDiff line change
@@ -184,7 +184,9 @@ class QueryContext {
184184
return _query_options.be_exec_version;
185185
}
186186

187-
[[nodiscard]] int64_t get_fe_process_uuid() const { return _query_options.fe_process_uuid; }
187+
[[nodiscard]] int64_t get_fe_process_uuid() const {
188+
return _query_options.__isset.fe_process_uuid ? _query_options.fe_process_uuid : 0;
189+
}
188190

189191
RuntimeFilterMgr* runtime_filter_mgr() { return _runtime_filter_mgr.get(); }
190192

be/src/util/debug_util.cpp

-1
Original file line numberDiff line numberDiff line change
@@ -125,7 +125,6 @@ std::string PrintFrontendInfos(const std::vector<TFrontendInfo>& fe_infos) {
125125
std::string PrintFrontendInfo(const TFrontendInfo& fe_info) {
126126
std::stringstream ss;
127127
fe_info.printTo(ss);
128-
129128
return ss.str();
130129
}
131130

0 commit comments

Comments
 (0)