diff --git a/contrib/kvproto b/contrib/kvproto index 26e28e6a281..9ccc6beaf0a 160000 --- a/contrib/kvproto +++ b/contrib/kvproto @@ -1 +1 @@ -Subproject commit 26e28e6a281abb927f91ef992eb8f93b39698ffa +Subproject commit 9ccc6beaf0aa9b0a4adad43b497348898ba653cf diff --git a/dbms/src/Common/TiFlashMetrics.h b/dbms/src/Common/TiFlashMetrics.h index 8accc59b155..e6c86911422 100644 --- a/dbms/src/Common/TiFlashMetrics.h +++ b/dbms/src/Common/TiFlashMetrics.h @@ -105,6 +105,7 @@ namespace DB M(tiflash_schema_apply_duration_seconds, "Bucketed histogram of ddl apply duration", Histogram, \ F(type_ddl_apply_duration, {{"req", "ddl_apply_duration"}}, ExpBuckets{0.001, 2, 20})) \ M(tiflash_raft_read_index_count, "Total number of raft read index", Counter) \ + M(tiflash_stale_read_count, "Total number of stale read", Counter) \ M(tiflash_raft_read_index_duration_seconds, "Bucketed histogram of raft read index duration", Histogram, \ F(type_raft_read_index_duration, {{"type", "tmt_raft_read_index_duration"}}, ExpBuckets{0.001, 2, 20})) \ M(tiflash_raft_wait_index_duration_seconds, "Bucketed histogram of raft wait index duration", Histogram, \ diff --git a/dbms/src/Debug/DAGProperties.h b/dbms/src/Debug/DAGProperties.h index bcb4170c9ac..c4bfcc9b863 100644 --- a/dbms/src/Debug/DAGProperties.h +++ b/dbms/src/Debug/DAGProperties.h @@ -29,6 +29,11 @@ struct DAGProperties bool use_broadcast_join = false; Int32 mpp_partition_num = 1; Timestamp start_ts = DEFAULT_MAX_READ_TSO; + UInt64 query_ts = 0; + UInt64 server_id = 1; + UInt64 local_query_id = 1; + Int64 task_id = 1; + Int32 mpp_timeout = 10; }; } // namespace DB \ No newline at end of file diff --git a/dbms/src/Debug/MockComputeServerManager.cpp b/dbms/src/Debug/MockComputeServerManager.cpp index 41bf549fc61..839cd794b73 100644 --- a/dbms/src/Debug/MockComputeServerManager.cpp +++ b/dbms/src/Debug/MockComputeServerManager.cpp @@ -115,11 +115,14 @@ void MockComputeServerManager::addServer(size_t partition_id, std::unique_ptrset_start_ts(start_ts); + meta->set_query_ts(query_id.query_ts); + meta->set_local_query_id(query_id.local_query_id); + meta->set_server_id(query_id.server_id); + meta->set_start_ts(query_id.start_ts); mpp::CancelTaskResponse response; for (const auto & server : server_map) server.second->flashService()->cancelMPPTaskForTest(&req, &response); diff --git a/dbms/src/Debug/MockComputeServerManager.h b/dbms/src/Debug/MockComputeServerManager.h index 7e05224572b..6642388659f 100644 --- a/dbms/src/Debug/MockComputeServerManager.h +++ b/dbms/src/Debug/MockComputeServerManager.h @@ -22,7 +22,6 @@ namespace DB::tests { - /** Hold Mock Compute Server to manage the lifetime of them. * Maintains Mock Compute Server info. 
*/ @@ -49,7 +48,7 @@ class MockComputeServerManager : public ext::Singleton void resetMockMPPServerInfo(size_t partition_num); - void cancelQuery(size_t start_ts); + void cancelQuery(const MPPQueryId & query_id); static String queryInfo(); diff --git a/dbms/src/Debug/MockExecutor/AstToPB.h b/dbms/src/Debug/MockExecutor/AstToPB.h index 518d04f89b9..c1560c90355 100644 --- a/dbms/src/Debug/MockExecutor/AstToPB.h +++ b/dbms/src/Debug/MockExecutor/AstToPB.h @@ -57,6 +57,9 @@ using MPPCtxPtr = std::shared_ptr; struct MPPInfo { Timestamp start_ts; + UInt64 query_ts; + UInt64 server_id; + UInt64 local_query_id; Int64 partition_id; Int64 task_id; const std::vector sender_target_task_ids; @@ -64,11 +67,17 @@ struct MPPInfo MPPInfo( Timestamp start_ts_, + UInt64 query_ts_, + UInt64 server_id_, + UInt64 local_query_id_, Int64 partition_id_, Int64 task_id_, const std::vector & sender_target_task_ids_, const std::unordered_map> & receiver_source_task_ids_map_) : start_ts(start_ts_) + , query_ts(query_ts_) + , server_id(server_id_) + , local_query_id(local_query_id_) , partition_id(partition_id_) , task_id(task_id_) , sender_target_task_ids(sender_target_task_ids_) diff --git a/dbms/src/Debug/MockExecutor/ExchangeReceiverBinder.cpp b/dbms/src/Debug/MockExecutor/ExchangeReceiverBinder.cpp index e7f0491b74f..706624856c0 100644 --- a/dbms/src/Debug/MockExecutor/ExchangeReceiverBinder.cpp +++ b/dbms/src/Debug/MockExecutor/ExchangeReceiverBinder.cpp @@ -42,6 +42,9 @@ bool ExchangeReceiverBinder::toTiPBExecutor(tipb::Executor * tipb_executor, int3 { mpp::TaskMeta meta; meta.set_start_ts(mpp_info.start_ts); + meta.set_query_ts(mpp_info.query_ts); + meta.set_server_id(mpp_info.server_id); + meta.set_local_query_id(mpp_info.local_query_id); meta.set_task_id(it->second[i]); meta.set_partition_id(i); auto addr = context.isMPPTest() ? tests::MockComputeServerManager::instance().getServerConfigMap()[i].addr : Debug::LOCAL_HOST; diff --git a/dbms/src/Debug/MockExecutor/ExchangeSenderBinder.cpp b/dbms/src/Debug/MockExecutor/ExchangeSenderBinder.cpp index 065d983cb60..aaba39868e1 100644 --- a/dbms/src/Debug/MockExecutor/ExchangeSenderBinder.cpp +++ b/dbms/src/Debug/MockExecutor/ExchangeSenderBinder.cpp @@ -41,6 +41,9 @@ bool ExchangeSenderBinder::toTiPBExecutor(tipb::Executor * tipb_executor, int32_ { mpp::TaskMeta meta; meta.set_start_ts(mpp_info.start_ts); + meta.set_query_ts(mpp_info.query_ts); + meta.set_server_id(mpp_info.server_id); + meta.set_local_query_id(mpp_info.local_query_id); meta.set_task_id(task_id); meta.set_partition_id(i); auto addr = context.isMPPTest() ? 
tests::MockComputeServerManager::instance().getServerConfigMap()[i++].addr : Debug::LOCAL_HOST; diff --git a/dbms/src/Debug/dbgFuncMisc.cpp b/dbms/src/Debug/dbgFuncMisc.cpp index 50b6ebffb52..cbdf4629cb5 100644 --- a/dbms/src/Debug/dbgFuncMisc.cpp +++ b/dbms/src/Debug/dbgFuncMisc.cpp @@ -29,8 +29,8 @@ inline size_t getReadTSOForLog(const String & line) { std::regex rx(R"((0|[1-9][0-9]*))"); std::smatch m; - // Rely on that MPP task prefix "MPP" - auto pos = line.find("query:"); + // Rely on that MPP task prefix "MPP,task_id:42578433>" + auto pos = line.find(", start_ts:"); if (pos != std::string::npos && regex_search(line.cbegin() + pos, line.cend(), m, rx)) { return std::stoul(m[1]); diff --git a/dbms/src/Debug/dbgQueryCompiler.h b/dbms/src/Debug/dbgQueryCompiler.h index 2ab86df6dad..748b14d41e8 100644 --- a/dbms/src/Debug/dbgQueryCompiler.h +++ b/dbms/src/Debug/dbgQueryCompiler.h @@ -132,6 +132,9 @@ struct QueryFragment { MPPInfo mpp_info( properties.start_ts, + properties.query_ts, + properties.server_id, + properties.local_query_id, partition_id, task_ids[partition_id], sender_target_task_ids, @@ -141,7 +144,7 @@ struct QueryFragment } else { - MPPInfo mpp_info(properties.start_ts, /*partition_id*/ -1, /*task_id*/ -1, /*sender_target_task_ids*/ {}, /*receiver_source_task_ids_map*/ {}); + MPPInfo mpp_info(properties.start_ts, properties.query_ts, properties.server_id, properties.local_query_id, /*partition_id*/ -1, /*task_id*/ -1, /*sender_target_task_ids*/ {}, /*receiver_source_task_ids_map*/ {}); ret.push_back(toQueryTask(properties, mpp_info, context)); } return ret; diff --git a/dbms/src/Debug/dbgQueryExecutor.cpp b/dbms/src/Debug/dbgQueryExecutor.cpp index 38656747664..c5c0dbaf12c 100644 --- a/dbms/src/Debug/dbgQueryExecutor.cpp +++ b/dbms/src/Debug/dbgQueryExecutor.cpp @@ -42,6 +42,9 @@ BlockInputStreamPtr constructExchangeReceiverStream(Context & context, tipb::Exc mpp::TaskMeta root_tm; root_tm.set_start_ts(properties.start_ts); + root_tm.set_query_ts(properties.query_ts); + root_tm.set_local_query_id(properties.local_query_id); + root_tm.set_server_id(properties.server_id); root_tm.set_address(root_addr); root_tm.set_task_id(-1); root_tm.set_partition_id(-1); @@ -71,6 +74,9 @@ BlockInputStreamPtr prepareRootExchangeReceiver(Context & context, const DAGProp { mpp::TaskMeta tm; tm.set_start_ts(properties.start_ts); + tm.set_query_ts(properties.query_ts); + tm.set_local_query_id(properties.local_query_id); + tm.set_server_id(properties.server_id); tm.set_address(Debug::LOCAL_HOST); tm.set_task_id(root_task_id); tm.set_partition_id(-1); @@ -84,6 +90,9 @@ void prepareExchangeReceiverMetaWithMultipleContext(tipb::ExchangeReceiver & tip { mpp::TaskMeta tm; tm.set_start_ts(properties.start_ts); + tm.set_query_ts(properties.query_ts); + tm.set_local_query_id(properties.local_query_id); + tm.set_server_id(properties.server_id); tm.set_address(addr); tm.set_task_id(task_id); tm.set_partition_id(-1); @@ -109,6 +118,9 @@ void prepareDispatchTaskRequest(QueryTask & task, std::shared_ptrmutable_meta(); tm->set_start_ts(properties.start_ts); + tm->set_query_ts(properties.query_ts); + tm->set_local_query_id(properties.local_query_id); + tm->set_server_id(properties.server_id); tm->set_partition_id(task.partition_id); tm->set_address(addr); tm->set_task_id(task.task_id); @@ -128,6 +140,9 @@ void prepareDispatchTaskRequestWithMultipleContext(QueryTask & task, std::shared } auto * tm = req->mutable_meta(); tm->set_start_ts(properties.start_ts); + tm->set_query_ts(properties.query_ts); + 
tm->set_local_query_id(properties.local_query_id); + tm->set_server_id(properties.server_id); tm->set_partition_id(task.partition_id); tm->set_address(addr); tm->set_task_id(task.task_id); diff --git a/dbms/src/Flash/Coprocessor/DAGContext.h b/dbms/src/Flash/Coprocessor/DAGContext.h index 6f0b46c45b1..ac17e3e2990 100644 --- a/dbms/src/Flash/Coprocessor/DAGContext.h +++ b/dbms/src/Flash/Coprocessor/DAGContext.h @@ -158,7 +158,7 @@ class DAGContext , flags(dag_request->flags()) , sql_mode(dag_request->sql_mode()) , mpp_task_meta(meta_) - , mpp_task_id(mpp_task_meta.start_ts(), mpp_task_meta.task_id()) + , mpp_task_id(mpp_task_meta) , max_recorded_error_count(getMaxErrorCount(*dag_request)) , warnings(max_recorded_error_count) , warning_count(0) diff --git a/dbms/src/Flash/FlashService.cpp b/dbms/src/Flash/FlashService.cpp index abd921a1119..57aa23a7e3c 100644 --- a/dbms/src/Flash/FlashService.cpp +++ b/dbms/src/Flash/FlashService.cpp @@ -365,7 +365,7 @@ grpc::Status FlashService::CancelMPPTask( auto & tmt_context = context->getTMTContext(); auto task_manager = tmt_context.getMPPTaskManager(); - task_manager->abortMPPQuery(request->meta().start_ts(), "Receive cancel request from TiDB", AbortType::ONCANCELLATION); + task_manager->abortMPPQuery(MPPQueryId(request->meta()), "Receive cancel request from TiDB", AbortType::ONCANCELLATION); return grpc::Status::OK; } @@ -407,7 +407,7 @@ ::grpc::Status FlashService::cancelMPPTaskForTest(const ::mpp::CancelTaskRequest } auto & tmt_context = context->getTMTContext(); auto task_manager = tmt_context.getMPPTaskManager(); - task_manager->abortMPPQuery(request->meta().start_ts(), "Receive cancel request from GTest", AbortType::ONCANCELLATION); + task_manager->abortMPPQuery(MPPQueryId(request->meta()), "Receive cancel request from GTest", AbortType::ONCANCELLATION); return grpc::Status::OK; } diff --git a/dbms/src/Flash/Mpp/MPPTask.cpp b/dbms/src/Flash/Mpp/MPPTask.cpp index 45a180ff421..2b73629df7d 100644 --- a/dbms/src/Flash/Mpp/MPPTask.cpp +++ b/dbms/src/Flash/Mpp/MPPTask.cpp @@ -54,7 +54,7 @@ extern const char force_no_local_region_for_mpp_task[]; MPPTask::MPPTask(const mpp::TaskMeta & meta_, const ContextPtr & context_) : meta(meta_) - , id(meta.start_ts(), meta.task_id()) + , id(meta) , context(context_) , manager(context_->getTMTContext().getMPPTaskManager().get()) , schedule_entry(manager, id) @@ -137,7 +137,7 @@ void MPPTask::registerTunnels(const mpp::DispatchTaskRequest & task_request) LOG_DEBUG(log, "begin to register the tunnel {}, is_local: {}, is_async: {}", tunnel->id(), is_local, is_async); if (status != INITIALIZING) throw Exception(fmt::format("The tunnel {} can not be registered, because the task is not in initializing state", tunnel->id())); - tunnel_set_local->registerTunnel(MPPTaskId{task_meta.start_ts(), task_meta.task_id()}, tunnel); + tunnel_set_local->registerTunnel(MPPTaskId(task_meta), tunnel); if (!dag_context->isRootMPPTask()) { FAIL_POINT_TRIGGER_EXCEPTION(FailPoints::exception_during_mpp_register_tunnel_for_non_root_mpp_task); @@ -202,7 +202,7 @@ std::pair MPPTask::getTunnel(const ::mpp::EstablishMPPConn return {nullptr, err_msg}; } - MPPTaskId receiver_id{request->receiver_meta().start_ts(), request->receiver_meta().task_id()}; + MPPTaskId receiver_id(request->receiver_meta()); RUNTIME_ASSERT(tunnel_set != nullptr, log, "mpp task without tunnel set"); auto tunnel_ptr = tunnel_set->getTunnelByReceiverTaskId(receiver_id); if (tunnel_ptr == nullptr) @@ -450,7 +450,7 @@ void MPPTask::runImpl() void MPPTask::handleError(const 
String & error_msg)
 {
     auto updated_msg = fmt::format("From {}: {}", id.toString(), error_msg);
-    manager->abortMPPQuery(id.start_ts, updated_msg, AbortType::ONERROR);
+    manager->abortMPPQuery(id.query_id, updated_msg, AbortType::ONERROR);
     if (!registered)
         // if the task is not registered, need to cancel it explicitly
         abort(error_msg, AbortType::ONERROR);
diff --git a/dbms/src/Flash/Mpp/MPPTaskId.cpp b/dbms/src/Flash/Mpp/MPPTaskId.cpp
index bb025e6c24d..9f4a9ce3dfc 100644
--- a/dbms/src/Flash/Mpp/MPPTaskId.cpp
+++ b/dbms/src/Flash/Mpp/MPPTaskId.cpp
@@ -13,19 +13,91 @@
 // limitations under the License.
 #include
+#include
 #include

 namespace DB
 {
+bool isOldVersion(const MPPQueryId & mpp_query_id)
+{
+    return mpp_query_id.query_ts == 0 && mpp_query_id.local_query_id == 0 && mpp_query_id.server_id == 0;
+}
+
+
+bool MPPQueryId::operator<(const MPPQueryId & mpp_query_id) const
+{
+    // compare with MPP query generated by TiDB that version less than v6.6
+    bool left_old_version = isOldVersion(*this);
+    bool right_old_version = isOldVersion(mpp_query_id);
+    if (unlikely(left_old_version && right_old_version))
+    {
+        return start_ts < mpp_query_id.start_ts;
+    }
+    if (unlikely(left_old_version))
+    {
+        return true;
+    }
+    if (unlikely(right_old_version))
+    {
+        return false;
+    }
+    // compare with MPP query generated by TiDB that version after v6.6
+    if (query_ts != mpp_query_id.query_ts)
+    {
+        return query_ts < mpp_query_id.query_ts;
+    }
+    if (server_id == mpp_query_id.server_id)
+    {
+        return local_query_id < mpp_query_id.local_query_id;
+    }
+    // now we can't compare reasonably, just choose one randomly by hash.
+    auto lhash = MPPQueryIdHash()(*this);
+    auto rhash = MPPQueryIdHash()(mpp_query_id);
+    if (lhash != rhash)
+    {
+        return lhash < rhash;
+    }
+    // hash values are same, just compare the rest fields.
+    if (local_query_id != mpp_query_id.local_query_id)
+    {
+        return local_query_id < mpp_query_id.local_query_id;
+    }
+    return server_id < mpp_query_id.server_id;
+}
+bool MPPQueryId::operator==(const MPPQueryId & rid) const
+{
+    return query_ts == rid.query_ts && local_query_id == rid.local_query_id && server_id == rid.server_id && start_ts == rid.start_ts;
+}
+bool MPPQueryId::operator!=(const MPPQueryId & rid) const
+{
+    return !(*this == rid);
+}
+bool MPPQueryId::operator<=(const MPPQueryId & rid) const
+{
+    return *this < rid || *this == rid;
+}
+
+size_t MPPQueryIdHash::operator()(MPPQueryId const & mpp_query_id) const noexcept
+{
+    if (unlikely(isOldVersion(mpp_query_id)))
+    {
+        return std::hash<UInt64>()(mpp_query_id.start_ts);
+    }
+    return std::hash<UInt64>()(mpp_query_id.query_ts) ^ std::hash<UInt64>()(mpp_query_id.local_query_id) ^ std::hash<UInt64>()(mpp_query_id.server_id);
+}
+
 String MPPTaskId::toString() const
 {
-    return isUnknown() ? "MPP<query:N/A,task:N/A>" : fmt::format("MPP<query:{},task:{}>", start_ts, task_id);
"MPP" : fmt::format("MPP", query_id.toString(), task_id); } const MPPTaskId MPPTaskId::unknown_mpp_task_id = MPPTaskId{}; +constexpr UInt64 MAX_UINT64 = std::numeric_limits::max(); +const MPPQueryId MPPTaskId::Max_Query_Id = MPPQueryId(MAX_UINT64, MAX_UINT64, MAX_UINT64, MAX_UINT64); + bool operator==(const MPPTaskId & lid, const MPPTaskId & rid) { - return lid.start_ts == rid.start_ts && lid.task_id == rid.task_id; + return lid.query_id == rid.query_id && lid.task_id == rid.task_id; } } // namespace DB diff --git a/dbms/src/Flash/Mpp/MPPTaskId.h b/dbms/src/Flash/Mpp/MPPTaskId.h index f1bee057206..eb81425c50c 100644 --- a/dbms/src/Flash/Mpp/MPPTaskId.h +++ b/dbms/src/Flash/Mpp/MPPTaskId.h @@ -15,28 +15,70 @@ #pragma once #include +#include +#include namespace DB { +// global unique MPP query id. +struct MPPQueryId +{ + UInt64 query_ts; + UInt64 local_query_id; + UInt64 server_id; + UInt64 start_ts; + MPPQueryId(UInt64 query_ts, UInt64 local_query_id, UInt64 server_id, UInt64 start_ts) + : query_ts(query_ts) + , local_query_id(local_query_id) + , server_id(server_id) + , start_ts(start_ts) + {} + explicit MPPQueryId(const mpp::TaskMeta & task_meta) + : query_ts(task_meta.query_ts()) + , local_query_id(task_meta.local_query_id()) + , server_id(task_meta.server_id()) + , start_ts(task_meta.start_ts()) + {} + bool operator<(const MPPQueryId & mpp_query_id) const; + bool operator==(const MPPQueryId & rid) const; + bool operator!=(const MPPQueryId & rid) const; + bool operator<=(const MPPQueryId & rid) const; + String toString() const + { + return fmt::format("", query_ts, local_query_id, server_id, start_ts); + } +}; + +struct MPPQueryIdHash +{ + size_t operator()(MPPQueryId const & mpp_query_id) const noexcept; +}; + // Identify a mpp task. struct MPPTaskId { MPPTaskId() - : start_ts(0) - , task_id(unknown_task_id){}; + : task_id(unknown_task_id) + , query_id({0, 0, 0, 0}){}; - MPPTaskId(UInt64 start_ts_, Int64 task_id_) - : start_ts(start_ts_) - , task_id(task_id_){}; + MPPTaskId(UInt64 start_ts, Int64 task_id_, UInt64 server_id, UInt64 query_ts, UInt64 local_query_id) + : task_id(task_id_) + , query_id(query_ts, local_query_id, server_id, start_ts) + {} + + explicit MPPTaskId(const mpp::TaskMeta & task_meta) + : task_id(task_meta.task_id()) + , query_id(task_meta) + {} - UInt64 start_ts; Int64 task_id; + MPPQueryId query_id; bool isUnknown() const { return task_id == unknown_task_id; } String toString() const; - static const MPPTaskId unknown_mpp_task_id; + static const MPPQueryId Max_Query_Id; private: static constexpr Int64 unknown_task_id = -1; @@ -53,7 +95,7 @@ class hash public: size_t operator()(const DB::MPPTaskId & id) const { - return hash()(id.start_ts) ^ hash()(id.task_id); + return DB::MPPQueryIdHash()(id.query_id) ^ hash()(id.task_id); } }; } // namespace std \ No newline at end of file diff --git a/dbms/src/Flash/Mpp/MPPTaskManager.cpp b/dbms/src/Flash/Mpp/MPPTaskManager.cpp index bc54c5db8fc..41b6e35b727 100644 --- a/dbms/src/Flash/Mpp/MPPTaskManager.cpp +++ b/dbms/src/Flash/Mpp/MPPTaskManager.cpp @@ -35,7 +35,7 @@ MPPTaskManager::MPPTaskManager(MPPTaskSchedulerPtr scheduler_) , log(Logger::get()) {} -MPPQueryTaskSetPtr MPPTaskManager::addMPPQueryTaskSet(UInt64 query_id) +MPPQueryTaskSetPtr MPPTaskManager::addMPPQueryTaskSet(const MPPQueryId & query_id) { auto ptr = std::make_shared(); mpp_query_map.insert({query_id, ptr}); @@ -43,7 +43,7 @@ MPPQueryTaskSetPtr MPPTaskManager::addMPPQueryTaskSet(UInt64 query_id) return ptr; } -void MPPTaskManager::removeMPPQueryTaskSet(UInt64 
query_id, bool on_abort) +void MPPTaskManager::removeMPPQueryTaskSet(const MPPQueryId & query_id, bool on_abort) { scheduler->deleteQuery(query_id, *this, on_abort); mpp_query_map.erase(query_id); @@ -53,16 +53,16 @@ void MPPTaskManager::removeMPPQueryTaskSet(UInt64 query_id, bool on_abort) std::pair MPPTaskManager::findAsyncTunnel(const ::mpp::EstablishMPPConnectionRequest * request, EstablishCallData * call_data, grpc::CompletionQueue * cq) { const auto & meta = request->sender_meta(); - MPPTaskId id{meta.start_ts(), meta.task_id()}; + MPPTaskId id{meta}; Int64 sender_task_id = meta.task_id(); Int64 receiver_task_id = request->receiver_meta().task_id(); std::unique_lock lock(mu); - auto query_it = mpp_query_map.find(id.start_ts); + auto query_it = mpp_query_map.find(id.query_id); if (query_it != mpp_query_map.end() && !query_it->second->isInNormalState()) { /// if the query is aborted, return the error message - LOG_WARNING(log, fmt::format("Query {} is aborted, all its tasks are invalid.", id.start_ts)); + LOG_WARNING(log, fmt::format("Query {} is aborted, all its tasks are invalid.", id.query_id.toString())); /// meet error return {nullptr, query_it->second->error_message}; } @@ -73,7 +73,7 @@ std::pair MPPTaskManager::findAsyncTunnel(const ::mpp::Est if (!call_data->isWaitingTunnelState()) { /// if call_data is in new_request state, put it to waiting tunnel state - auto query_set = query_it == mpp_query_map.end() ? addMPPQueryTaskSet(id.start_ts) : query_it->second; + auto query_set = query_it == mpp_query_map.end() ? addMPPQueryTaskSet(id.query_id) : query_it->second; auto & alarm = query_set->alarms[sender_task_id][receiver_task_id]; call_data->setToWaitingTunnelState(); alarm.Set(cq, Clock::now() + std::chrono::seconds(10), call_data); @@ -96,11 +96,11 @@ std::pair MPPTaskManager::findAsyncTunnel(const ::mpp::Est { /// if the query task set has no mpp task, it has to be removed if there is no alarms left, /// otherwise the query task set itself may be left in MPPTaskManager forever - removeMPPQueryTaskSet(id.start_ts, false); + removeMPPQueryTaskSet(id.query_id, false); cv.notify_all(); } } - return {nullptr, fmt::format("Can't find task [{},{}] within 10 s.", id.start_ts, id.task_id)}; + return {nullptr, fmt::format("Can't find task [{}] within 10 s.", id.toString())}; } } /// don't need to delete the alarm here because registerMPPTask will delete all the related alarm @@ -112,13 +112,13 @@ std::pair MPPTaskManager::findAsyncTunnel(const ::mpp::Est std::pair MPPTaskManager::findTunnelWithTimeout(const ::mpp::EstablishMPPConnectionRequest * request, std::chrono::seconds timeout) { const auto & meta = request->sender_meta(); - MPPTaskId id{meta.start_ts(), meta.task_id()}; + MPPTaskId id{meta}; std::unordered_map::iterator it; bool cancelled = false; String error_message; std::unique_lock lock(mu); auto ret = cv.wait_for(lock, timeout, [&] { - auto query_it = mpp_query_map.find(id.start_ts); + auto query_it = mpp_query_map.find(id.query_id); // TODO: how about the query has been cancelled in advance? if (query_it == mpp_query_map.end()) { @@ -127,7 +127,7 @@ std::pair MPPTaskManager::findTunnelWithTimeout(const ::mp else if (!query_it->second->isInNormalState()) { /// if the query is aborted, return true to stop waiting timeout. 
- LOG_WARNING(log, fmt::format("Query {} is aborted, all its tasks are invalid.", id.start_ts)); + LOG_WARNING(log, fmt::format("Query {} is aborted, all its tasks are invalid.", id.query_id.toString())); cancelled = true; error_message = query_it->second->error_message; return true; @@ -147,9 +147,9 @@ std::pair MPPTaskManager::findTunnelWithTimeout(const ::mp return it->second->getTunnel(request); } -void MPPTaskManager::abortMPPQuery(UInt64 query_id, const String & reason, AbortType abort_type) +void MPPTaskManager::abortMPPQuery(const MPPQueryId & query_id, const String & reason, AbortType abort_type) { - LOG_WARNING(log, fmt::format("Begin to abort query: {}, abort type: {}, reason: {}", query_id, magic_enum::enum_name(abort_type), reason)); + LOG_WARNING(log, fmt::format("Begin to abort query: {}, abort type: {}, reason: {}", query_id.toString(), magic_enum::enum_name(abort_type), reason)); MPPQueryTaskSetPtr task_set; { /// abort task may take a long time, so first @@ -159,12 +159,12 @@ void MPPTaskManager::abortMPPQuery(UInt64 query_id, const String & reason, Abort auto it = mpp_query_map.find(query_id); if (it == mpp_query_map.end()) { - LOG_WARNING(log, fmt::format("{} does not found in task manager, skip abort", query_id)); + LOG_WARNING(log, fmt::format("{} does not found in task manager, skip abort", query_id.toString())); return; } else if (!it->second->isInNormalState()) { - LOG_WARNING(log, fmt::format("{} already in abort process, skip abort", query_id)); + LOG_WARNING(log, fmt::format("{} already in abort process, skip abort", query_id.toString())); return; } it->second->state = MPPQueryTaskSet::Aborting; @@ -178,7 +178,7 @@ void MPPTaskManager::abortMPPQuery(UInt64 query_id, const String & reason, Abort it->second->alarms.clear(); if (it->second->task_map.empty()) { - LOG_INFO(log, fmt::format("There is no mpp task for {}, finish abort", query_id)); + LOG_INFO(log, fmt::format("There is no mpp task for {}, finish abort", query_id.toString())); removeMPPQueryTaskSet(query_id, true); cv.notify_all(); return; @@ -189,7 +189,7 @@ void MPPTaskManager::abortMPPQuery(UInt64 query_id, const String & reason, Abort } FmtBuffer fmt_buf; - fmt_buf.fmtAppend("Remaining task in query {} are: ", query_id); + fmt_buf.fmtAppend("Remaining task in query {} are: ", query_id.toString()); for (auto & it : task_set->task_map) fmt_buf.fmtAppend("{} ", it.first.toString()); LOG_WARNING(log, fmt_buf.toString()); @@ -200,11 +200,11 @@ void MPPTaskManager::abortMPPQuery(UInt64 query_id, const String & reason, Abort { std::lock_guard lock(mu); auto it = mpp_query_map.find(query_id); - RUNTIME_ASSERT(it != mpp_query_map.end(), log, "MPPTaskQuerySet {} should remaining in MPPTaskManager", query_id); + RUNTIME_ASSERT(it != mpp_query_map.end(), log, "MPPTaskQuerySet {} should remaining in MPPTaskManager", query_id.toString()); it->second->state = MPPQueryTaskSet::Aborted; cv.notify_all(); } - LOG_WARNING(log, "Finish abort query: " + std::to_string(query_id)); + LOG_WARNING(log, "Finish abort query: " + query_id.toString()); } std::pair MPPTaskManager::registerTask(MPPTaskPtr task) @@ -214,7 +214,7 @@ std::pair MPPTaskManager::registerTask(MPPTaskPtr task) FAIL_POINT_PAUSE(FailPoints::pause_before_register_non_root_mpp_task); } std::unique_lock lock(mu); - const auto & it = mpp_query_map.find(task->id.start_ts); + const auto & it = mpp_query_map.find(task->id.query_id); if (it != mpp_query_map.end() && !it->second->isInNormalState()) { return {false, fmt::format("query is being aborted, error message = 
{}", it->second->error_message)}; @@ -226,7 +226,7 @@ std::pair MPPTaskManager::registerTask(MPPTaskPtr task) MPPQueryTaskSetPtr query_set; if (it == mpp_query_map.end()) /// the first one { - query_set = addMPPQueryTaskSet(task->id.start_ts); + query_set = addMPPQueryTaskSet(task->id.query_id); } else { @@ -251,7 +251,7 @@ std::pair MPPTaskManager::unregisterTask(const MPPTaskId & id) std::unique_lock lock(mu); auto it = mpp_query_map.end(); cv.wait(lock, [&] { - it = mpp_query_map.find(id.start_ts); + it = mpp_query_map.find(id.query_id); return it == mpp_query_map.end() || it->second->allowUnregisterTask(); }); if (it != mpp_query_map.end()) @@ -261,7 +261,7 @@ std::pair MPPTaskManager::unregisterTask(const MPPTaskId & id) { it->second->task_map.erase(task_it); if (it->second->task_map.empty() && it->second->alarms.empty()) - removeMPPQueryTaskSet(id.start_ts, false); + removeMPPQueryTaskSet(id.query_id, false); cv.notify_all(); return {true, ""}; } @@ -282,13 +282,13 @@ String MPPTaskManager::toString() return res + ")"; } -MPPQueryTaskSetPtr MPPTaskManager::getQueryTaskSetWithoutLock(UInt64 query_id) +MPPQueryTaskSetPtr MPPTaskManager::getQueryTaskSetWithoutLock(const MPPQueryId & query_id) { auto it = mpp_query_map.find(query_id); return it == mpp_query_map.end() ? nullptr : it->second; } -MPPQueryTaskSetPtr MPPTaskManager::getQueryTaskSet(UInt64 query_id) +MPPQueryTaskSetPtr MPPTaskManager::getQueryTaskSet(const MPPQueryId & query_id) { std::lock_guard lock(mu); return getQueryTaskSetWithoutLock(query_id); diff --git a/dbms/src/Flash/Mpp/MPPTaskManager.h b/dbms/src/Flash/Mpp/MPPTaskManager.h index 018a8631880..b986d8ae866 100644 --- a/dbms/src/Flash/Mpp/MPPTaskManager.h +++ b/dbms/src/Flash/Mpp/MPPTaskManager.h @@ -55,9 +55,9 @@ struct MPPQueryTaskSet using MPPQueryTaskSetPtr = std::shared_ptr; /// a map from the mpp query id to mpp query task set, we use -/// the start ts of a query as the query id as TiDB will guarantee -/// the uniqueness of the start ts -using MPPQueryMap = std::unordered_map; +/// the query_ts + local_query_id + serverID as the query id, because TiDB can't guarantee +/// the uniqueness of the start ts when stale read or set snapshot +using MPPQueryMap = std::unordered_map; // MPPTaskManger holds all running mpp tasks. It's a single instance holden in Context. 
class MPPTaskManager : private boost::noncopyable @@ -77,9 +77,9 @@ class MPPTaskManager : private boost::noncopyable ~MPPTaskManager() = default; - MPPQueryTaskSetPtr getQueryTaskSetWithoutLock(UInt64 query_id); + MPPQueryTaskSetPtr getQueryTaskSetWithoutLock(const MPPQueryId & query_id); - MPPQueryTaskSetPtr getQueryTaskSet(UInt64 query_id); + MPPQueryTaskSetPtr getQueryTaskSet(const MPPQueryId & query_id); std::pair registerTask(MPPTaskPtr task); @@ -93,13 +93,13 @@ class MPPTaskManager : private boost::noncopyable std::pair findAsyncTunnel(const ::mpp::EstablishMPPConnectionRequest * request, EstablishCallData * call_data, grpc::CompletionQueue * cq); - void abortMPPQuery(UInt64 query_id, const String & reason, AbortType abort_type); + void abortMPPQuery(const MPPQueryId & query_id, const String & reason, AbortType abort_type); String toString(); private: - MPPQueryTaskSetPtr addMPPQueryTaskSet(UInt64 query_id); - void removeMPPQueryTaskSet(UInt64 query_id, bool on_abort); + MPPQueryTaskSetPtr addMPPQueryTaskSet(const MPPQueryId & query_id); + void removeMPPQueryTaskSet(const MPPQueryId & query_id, bool on_abort); }; } // namespace DB diff --git a/dbms/src/Flash/Mpp/MPPTaskStatistics.cpp b/dbms/src/Flash/Mpp/MPPTaskStatistics.cpp index 523103611ca..a18403ba5d5 100644 --- a/dbms/src/Flash/Mpp/MPPTaskStatistics.cpp +++ b/dbms/src/Flash/Mpp/MPPTaskStatistics.cpp @@ -96,14 +96,15 @@ void MPPTaskStatistics::logTracingJson() { LOG_INFO( logger, - R"({{"query_tso":{},"task_id":{},"is_root":{},"sender_executor_id":"{}","executors":{},"host":"{}")" + R"({{"query_tso":{},"task_id":{},"query_id":{},"is_root":{},"sender_executor_id":"{}","executors":{},"host":"{}")" R"(,"task_init_timestamp":{},"task_start_timestamp":{},"task_end_timestamp":{})" R"(,"compile_start_timestamp":{},"compile_end_timestamp":{})" R"(,"read_wait_index_start_timestamp":{},"read_wait_index_end_timestamp":{})" R"(,"local_input_bytes":{},"remote_input_bytes":{},"output_bytes":{})" R"(,"status":"{}","error_message":"{}","working_time":{},"memory_peak":{}}})", - id.start_ts, + id.query_id.start_ts, id.task_id, + id.query_id.toString(), is_root, sender_executor_id, executor_statistics_collector.resToJson(), diff --git a/dbms/src/Flash/Mpp/MinTSOScheduler.cpp b/dbms/src/Flash/Mpp/MinTSOScheduler.cpp index b7deae93311..c22b3f7d063 100644 --- a/dbms/src/Flash/Mpp/MinTSOScheduler.cpp +++ b/dbms/src/Flash/Mpp/MinTSOScheduler.cpp @@ -25,11 +25,10 @@ namespace FailPoints extern const char random_min_tso_scheduler_failpoint[]; } // namespace FailPoints -constexpr UInt64 MAX_UINT64 = std::numeric_limits::max(); constexpr UInt64 OS_THREAD_SOFT_LIMIT = 100000; MinTSOScheduler::MinTSOScheduler(UInt64 soft_limit, UInt64 hard_limit, UInt64 active_set_soft_limit_) - : min_tso(MAX_UINT64) + : min_query_id(MPPTaskId::Max_Query_Id) , thread_soft_limit(soft_limit) , thread_hard_limit(hard_limit) , estimated_thread_usage(0) @@ -58,7 +57,7 @@ MinTSOScheduler::MinTSOScheduler(UInt64 soft_limit, UInt64 hard_limit, UInt64 ac { LOG_INFO(log, "thread_hard_limit is {}, thread_soft_limit is {}, and active_set_soft_limit is {} in MinTSOScheduler.", thread_hard_limit, thread_soft_limit, active_set_soft_limit); } - GET_METRIC(tiflash_task_scheduler, type_min_tso).Set(min_tso); + GET_METRIC(tiflash_task_scheduler, type_min_tso).Set(min_query_id.query_ts); GET_METRIC(tiflash_task_scheduler, type_thread_soft_limit).Set(thread_soft_limit); GET_METRIC(tiflash_task_scheduler, type_thread_hard_limit).Set(thread_hard_limit); GET_METRIC(tiflash_task_scheduler, 
type_estimated_thread_usage).Set(estimated_thread_usage); @@ -78,34 +77,34 @@ bool MinTSOScheduler::tryToSchedule(MPPTaskScheduleEntry & schedule_entry, MPPTa return true; } const auto & id = schedule_entry.getMPPTaskId(); - auto query_task_set = task_manager.getQueryTaskSetWithoutLock(id.start_ts); + auto query_task_set = task_manager.getQueryTaskSetWithoutLock(id.query_id); if (nullptr == query_task_set || !query_task_set->isInNormalState()) { LOG_WARNING(log, "{} is scheduled with miss or abort.", id.toString()); return true; } bool has_error = false; - return scheduleImp(id.start_ts, query_task_set, schedule_entry, false, has_error); + return scheduleImp(id.query_id, query_task_set, schedule_entry, false, has_error); } -/// after finishing the query, there would be no threads released soon, so the updated min-tso query with waiting tasks should be scheduled. +/// after finishing the query, there would be no threads released soon, so the updated min-query-id query with waiting tasks should be scheduled. /// the cancelled query maybe hang, so trigger scheduling as needed when deleting cancelled query. -void MinTSOScheduler::deleteQuery(const UInt64 tso, MPPTaskManager & task_manager, const bool is_cancelled) +void MinTSOScheduler::deleteQuery(const MPPQueryId & query_id, MPPTaskManager & task_manager, const bool is_cancelled) { if (isDisabled()) { return; } - LOG_DEBUG(log, "{} query {} (is min = {}) is deleted from active set {} left {} or waiting set {} left {}.", is_cancelled ? "Cancelled" : "Finished", tso, tso == min_tso, active_set.find(tso) != active_set.end(), active_set.size(), waiting_set.find(tso) != waiting_set.end(), waiting_set.size()); - active_set.erase(tso); - waiting_set.erase(tso); + LOG_DEBUG(log, "{} query {} (is min = {}) is deleted from active set {} left {} or waiting set {} left {}.", is_cancelled ? "Cancelled" : "Finished", query_id.toString(), query_id == min_query_id, active_set.find(query_id) != active_set.end(), active_set.size(), waiting_set.find(query_id) != waiting_set.end(), waiting_set.size()); + active_set.erase(query_id); + waiting_set.erase(query_id); GET_METRIC(tiflash_task_scheduler, type_waiting_queries_count).Set(waiting_set.size()); GET_METRIC(tiflash_task_scheduler, type_active_queries_count).Set(active_set.size()); if (is_cancelled) /// cancelled queries may have waiting tasks, and finished queries haven't. { - auto query_task_set = task_manager.getQueryTaskSetWithoutLock(tso); + auto query_task_set = task_manager.getQueryTaskSetWithoutLock(query_id); if (query_task_set) /// release all waiting tasks { while (!query_task_set->waiting_tasks.empty()) @@ -119,8 +118,8 @@ void MinTSOScheduler::deleteQuery(const UInt64 tso, MPPTaskManager & task_manage } } - /// NOTE: if updated min_tso query has waiting tasks, they should be scheduled, especially when the soft-limited threads are amost used and active tasks are in resources deadlock which cannot release threads soon. - if (updateMinTSO(tso, true, is_cancelled ? "when cancelling it" : "as finishing it")) + /// NOTE: if updated min_query_id query has waiting tasks, they should be scheduled, especially when the soft-limited threads are amost used and active tasks are in resources deadlock which cannot release threads soon. + if (updateMinQueryId(query_id, true, is_cancelled ? 
"when cancelling it" : "as finishing it")) { scheduleWaitingQueries(task_manager); } @@ -153,8 +152,8 @@ void MinTSOScheduler::scheduleWaitingQueries(MPPTaskManager & task_manager) auto query_task_set = task_manager.getQueryTaskSetWithoutLock(current_query_id); if (nullptr == query_task_set) /// silently solve this rare case { - LOG_ERROR(log, "the waiting query {} is not in the task manager.", current_query_id); - updateMinTSO(current_query_id, true, "as it is not in the task manager."); + LOG_ERROR(log, "the waiting query {} is not in the task manager.", current_query_id.toString()); + updateMinQueryId(current_query_id, true, "as it is not in the task manager."); active_set.erase(current_query_id); waiting_set.erase(current_query_id); GET_METRIC(tiflash_task_scheduler, type_waiting_queries_count).Set(waiting_set.size()); @@ -162,7 +161,7 @@ void MinTSOScheduler::scheduleWaitingQueries(MPPTaskManager & task_manager) continue; } - LOG_DEBUG(log, "query {} (is min = {}) with {} tasks is to be scheduled from waiting set (size = {}).", current_query_id, current_query_id == min_tso, query_task_set->waiting_tasks.size(), waiting_set.size()); + LOG_DEBUG(log, "query {} (is min = {}) with {} tasks is to be scheduled from waiting set (size = {}).", current_query_id.toString(), current_query_id == min_query_id, query_task_set->waiting_tasks.size(), waiting_set.size()); /// schedule tasks one by one while (!query_task_set->waiting_tasks.empty()) { @@ -180,22 +179,22 @@ void MinTSOScheduler::scheduleWaitingQueries(MPPTaskManager & task_manager) query_task_set->waiting_tasks.pop(); GET_METRIC(tiflash_task_scheduler, type_waiting_tasks_count).Decrement(); } - LOG_DEBUG(log, "query {} (is min = {}) is scheduled from waiting set (size = {}).", current_query_id, current_query_id == min_tso, waiting_set.size()); + LOG_DEBUG(log, "query {} (is min = {}) is scheduled from waiting set (size = {}).", current_query_id.toString(), current_query_id == min_query_id, waiting_set.size()); waiting_set.erase(current_query_id); /// all waiting tasks of this query are fully active GET_METRIC(tiflash_task_scheduler, type_waiting_queries_count).Set(waiting_set.size()); } } -/// [directly schedule, from waiting set] * [is min_tso query, not] * [can schedule, can't] totally 8 cases. -bool MinTSOScheduler::scheduleImp(const UInt64 tso, const MPPQueryTaskSetPtr & query_task_set, MPPTaskScheduleEntry & schedule_entry, const bool isWaiting, bool & has_error) +/// [directly schedule, from waiting set] * [is min_query_id query, not] * [can schedule, can't] totally 8 cases. +bool MinTSOScheduler::scheduleImp(const MPPQueryId & query_id, const MPPQueryTaskSetPtr & query_task_set, MPPTaskScheduleEntry & schedule_entry, const bool isWaiting, bool & has_error) { auto needed_threads = schedule_entry.getNeededThreads(); - auto check_for_new_min_tso = tso <= min_tso && estimated_thread_usage + needed_threads <= thread_hard_limit; - auto check_for_not_min_tso = (active_set.size() < active_set_soft_limit || tso <= *active_set.rbegin()) && (estimated_thread_usage + needed_threads <= thread_soft_limit); + auto check_for_new_min_tso = query_id <= min_query_id && estimated_thread_usage + needed_threads <= thread_hard_limit; + auto check_for_not_min_tso = (active_set.size() < active_set_soft_limit || query_id <= *active_set.rbegin()) && (estimated_thread_usage + needed_threads <= thread_soft_limit); if (check_for_new_min_tso || check_for_not_min_tso) { - updateMinTSO(tso, false, isWaiting ? 
"from the waiting set" : "when directly schedule it"); - active_set.insert(tso); + updateMinQueryId(query_id, false, isWaiting ? "from the waiting set" : "when directly schedule it"); + active_set.insert(query_id); if (schedule_entry.schedule(ScheduleState::SCHEDULED)) { estimated_thread_usage += needed_threads; @@ -203,24 +202,24 @@ bool MinTSOScheduler::scheduleImp(const UInt64 tso, const MPPQueryTaskSetPtr & q } GET_METRIC(tiflash_task_scheduler, type_active_queries_count).Set(active_set.size()); GET_METRIC(tiflash_task_scheduler, type_estimated_thread_usage).Set(estimated_thread_usage); - LOG_INFO(log, "{} is scheduled (active set size = {}) due to available threads {}, after applied for {} threads, used {} of the thread {} limit {}.", schedule_entry.getMPPTaskId().toString(), active_set.size(), isWaiting ? "from the waiting set" : "directly", needed_threads, estimated_thread_usage, min_tso == tso ? "hard" : "soft", min_tso == tso ? thread_hard_limit : thread_soft_limit); + LOG_INFO(log, "{} is scheduled (active set size = {}) due to available threads {}, after applied for {} threads, used {} of the thread {} limit {}.", schedule_entry.getMPPTaskId().toString(), active_set.size(), isWaiting ? "from the waiting set" : "directly", needed_threads, estimated_thread_usage, min_query_id == query_id ? "hard" : "soft", min_query_id == query_id ? thread_hard_limit : thread_soft_limit); return true; } else { - bool is_tso_min = tso <= min_tso; - fiu_do_on(FailPoints::random_min_tso_scheduler_failpoint, is_tso_min = true;); - if (is_tso_min) /// the min_tso query should fully run, otherwise throw errors here. + bool is_query_id_min = query_id <= min_query_id; + fiu_do_on(FailPoints::random_min_tso_scheduler_failpoint, is_query_id_min = true;); + if (is_query_id_min) /// the min_query_id query should fully run, otherwise throw errors here. { has_error = true; - auto msg = fmt::format("threads are unavailable for the query {} ({} min_tso {}) {}, need {}, but used {} of the thread hard limit {}, {} active and {} waiting queries.", tso, tso == min_tso ? "is" : "is newer than", min_tso, isWaiting ? "from the waiting set" : "when directly schedule it", needed_threads, estimated_thread_usage, thread_hard_limit, active_set.size(), waiting_set.size()); + auto msg = fmt::format("threads are unavailable for the query {} ({} min_query_id {}) {}, need {}, but used {} of the thread hard limit {}, {} active and {} waiting queries.", query_id.toString(), query_id == min_query_id ? "is" : "is newer than", min_query_id.toString(), isWaiting ? "from the waiting set" : "when directly schedule it", needed_threads, estimated_thread_usage, thread_hard_limit, active_set.size(), waiting_set.size()); LOG_ERROR(log, "{}", msg); GET_METRIC(tiflash_task_scheduler, type_hard_limit_exceeded_count).Increment(); if (isWaiting) { - /// set this task be failed to schedule, and the task will throw exception, then TiDB will finally notify this tiflash node canceling all tasks of this tso and update metrics. + /// set this task be failed to schedule, and the task will throw exception, then TiDB will finally notify this tiflash node canceling all tasks of this query_id and update metrics. schedule_entry.schedule(ScheduleState::EXCEEDED); - waiting_set.erase(tso); /// avoid the left waiting tasks of this query reaching here many times. + waiting_set.erase(query_id); /// avoid the left waiting tasks of this query reaching here many times. 
} else { @@ -230,38 +229,38 @@ bool MinTSOScheduler::scheduleImp(const UInt64 tso, const MPPQueryTaskSetPtr & q } if (!isWaiting) { - waiting_set.insert(tso); + waiting_set.insert(query_id); query_task_set->waiting_tasks.push(schedule_entry.getMPPTaskId()); GET_METRIC(tiflash_task_scheduler, type_waiting_queries_count).Set(waiting_set.size()); GET_METRIC(tiflash_task_scheduler, type_waiting_tasks_count).Increment(); } - LOG_INFO(log, "threads are unavailable for the query {} or active set is full (size = {}), need {}, but used {} of the thread soft limit {},{} waiting set size = {}", tso, active_set.size(), needed_threads, estimated_thread_usage, thread_soft_limit, isWaiting ? "" : " put into", waiting_set.size()); + LOG_INFO(log, "threads are unavailable for the query {} or active set is full (size = {}), need {}, but used {} of the thread soft limit {},{} waiting set size = {}", query_id.toString(), active_set.size(), needed_threads, estimated_thread_usage, thread_soft_limit, isWaiting ? "" : " put into", waiting_set.size()); return false; } } -/// if return true, then need to schedule the waiting tasks of the min_tso. -bool MinTSOScheduler::updateMinTSO(const UInt64 tso, const bool retired, const String & msg) +/// if return true, then need to schedule the waiting tasks of the min_query_id. +bool MinTSOScheduler::updateMinQueryId(const MPPQueryId & query_id, const bool retired, const String & msg) { - auto old_min_tso = min_tso; + auto old_min_query_id = min_query_id; bool force_scheduling = false; if (retired) { - if (tso == min_tso) /// elect a new min_tso from all queries. + if (query_id == min_query_id) /// elect a new min_query_id from all queries. { - min_tso = active_set.empty() ? MAX_UINT64 : *active_set.begin(); - min_tso = waiting_set.empty() ? min_tso : std::min(*waiting_set.begin(), min_tso); - force_scheduling = waiting_set.find(min_tso) != waiting_set.end(); /// if this min_tso has waiting tasks, these tasks should force being scheduled. + min_query_id = active_set.empty() ? MPPTaskId::Max_Query_Id : *active_set.begin(); + min_query_id = waiting_set.empty() ? min_query_id : std::min(*waiting_set.begin(), min_query_id); + force_scheduling = waiting_set.find(min_query_id) != waiting_set.end(); /// if this min_query_id has waiting tasks, these tasks should force being scheduled. } } else { - min_tso = std::min(tso, min_tso); + min_query_id = std::min(query_id, min_query_id); } - if (min_tso != old_min_tso) /// if min_tso == MAX_UINT64 and the query tso is not to be cancelled, the used_threads, active_set.size() and waiting_set.size() must be 0. + if (min_query_id != old_min_query_id) /// if min_query_id == MPPTaskId::Max_Query_Id and the query_id is not to be cancelled, the used_threads, active_set.size() and waiting_set.size() must be 0. { - GET_METRIC(tiflash_task_scheduler, type_min_tso).Set(min_tso); - LOG_INFO(log, "min_tso query is updated from {} to {} {}, used threads = {}, {} active and {} waiting queries.", old_min_tso, min_tso, msg, estimated_thread_usage, active_set.size(), waiting_set.size()); + GET_METRIC(tiflash_task_scheduler, type_min_tso).Set(min_query_id.query_ts == 0 ? 
min_query_id.start_ts : min_query_id.query_ts); + LOG_INFO(log, "min_query_id query is updated from {} to {} {}, used threads = {}, {} active and {} waiting queries.", old_min_query_id.toString(), min_query_id.toString(), msg, estimated_thread_usage, active_set.size(), waiting_set.size()); } return force_scheduling; } diff --git a/dbms/src/Flash/Mpp/MinTSOScheduler.h b/dbms/src/Flash/Mpp/MinTSOScheduler.h index 25abb770e44..dfe2921d1fb 100644 --- a/dbms/src/Flash/Mpp/MinTSOScheduler.h +++ b/dbms/src/Flash/Mpp/MinTSOScheduler.h @@ -28,8 +28,8 @@ using MPPTaskManagerPtr = std::shared_ptr; struct MPPQueryTaskSet; using MPPQueryTaskSetPtr = std::shared_ptr; -/// scheduling tasks in the set according to the tso order under the soft limit of threads, but allow the min_tso query to preempt threads under the hard limit of threads. -/// The min_tso query avoids the deadlock resulted from threads competition among nodes. +/// scheduling tasks in the set according to the tso order under the soft limit of threads, but allow the min_query_id query to preempt threads under the hard limit of threads. +/// The min_query_id query avoids the deadlock resulted from threads competition among nodes. /// schedule tasks under the lock protection of the task manager. /// NOTE: if the updated min-tso query has waiting tasks, necessarily scheduling them, otherwise the query would hang. class MinTSOScheduler : private boost::noncopyable @@ -37,28 +37,28 @@ class MinTSOScheduler : private boost::noncopyable public: MinTSOScheduler(UInt64 soft_limit, UInt64 hard_limit, UInt64 active_set_soft_limit_); ~MinTSOScheduler() = default; - /// try to schedule this task if it is the min_tso query or there are enough threads, otherwise put it into the waiting set. + /// try to schedule this task if it is the min_query_id query or there are enough threads, otherwise put it into the waiting set. /// NOTE: call tryToSchedule under the lock protection of MPPTaskManager bool tryToSchedule(MPPTaskScheduleEntry & schedule_entry, MPPTaskManager & task_manager); - /// delete this to-be cancelled/finished query from scheduler and update min_tso if needed, so that there aren't cancelled/finished queries in the scheduler. + /// delete this to-be cancelled/finished query from scheduler and update min_query_id if needed, so that there aren't cancelled/finished queries in the scheduler. 
/// NOTE: call deleteQuery under the lock protection of MPPTaskManager - void deleteQuery(const UInt64 tso, MPPTaskManager & task_manager, const bool is_cancelled); + void deleteQuery(const MPPQueryId & query_id, MPPTaskManager & task_manager, const bool is_cancelled); /// all scheduled tasks should finally call this function to release threads and schedule new tasks void releaseThreadsThenSchedule(const int needed_threads, MPPTaskManager & task_manager); private: - bool scheduleImp(const UInt64 tso, const MPPQueryTaskSetPtr & query_task_set, MPPTaskScheduleEntry & schedule_entry, const bool isWaiting, bool & has_error); - bool updateMinTSO(const UInt64 tso, const bool retired, const String & msg); + bool scheduleImp(const MPPQueryId & query_id, const MPPQueryTaskSetPtr & query_task_set, MPPTaskScheduleEntry & schedule_entry, const bool isWaiting, bool & has_error); + bool updateMinQueryId(const MPPQueryId & query_id, const bool retired, const String & msg); void scheduleWaitingQueries(MPPTaskManager & task_manager); bool isDisabled() { return thread_hard_limit == 0 && thread_soft_limit == 0; } - std::set waiting_set; - std::set active_set; - UInt64 min_tso; + std::set waiting_set; + std::set active_set; + MPPQueryId min_query_id; UInt64 thread_soft_limit; UInt64 thread_hard_limit; UInt64 estimated_thread_usage; diff --git a/dbms/src/Flash/Mpp/getMPPTaskTracingLog.cpp b/dbms/src/Flash/Mpp/getMPPTaskTracingLog.cpp index df04d25cac8..dce4e69db1f 100644 --- a/dbms/src/Flash/Mpp/getMPPTaskTracingLog.cpp +++ b/dbms/src/Flash/Mpp/getMPPTaskTracingLog.cpp @@ -13,11 +13,12 @@ // limitations under the License. #include +#include namespace DB { LoggerPtr getMPPTaskTracingLog(const MPPTaskId & mpp_task_id) { - return Logger::get(tracing_log_source, mpp_task_id.toString()); + return Logger::get(DB::tracing_log_source, mpp_task_id.toString()); } } // namespace DB diff --git a/dbms/src/Flash/Mpp/getMPPTaskTracingLog.h b/dbms/src/Flash/Mpp/getMPPTaskTracingLog.h index 1491d26dbf3..4da514f5795 100644 --- a/dbms/src/Flash/Mpp/getMPPTaskTracingLog.h +++ b/dbms/src/Flash/Mpp/getMPPTaskTracingLog.h @@ -19,9 +19,6 @@ namespace DB { -/// Tracing logs are filtered by SourceFilterChannel. -inline constexpr auto tracing_log_source = "mpp_task_tracing"; - /// All tracing logs must logged by the logger that got by `getMPPTaskTracingLog`. 
LoggerPtr getMPPTaskTracingLog(const MPPTaskId & mpp_task_id); } // namespace DB \ No newline at end of file diff --git a/dbms/src/Flash/tests/bench_exchange.cpp b/dbms/src/Flash/tests/bench_exchange.cpp index 3454242f2a7..6691a3b74a4 100644 --- a/dbms/src/Flash/tests/bench_exchange.cpp +++ b/dbms/src/Flash/tests/bench_exchange.cpp @@ -28,7 +28,6 @@ namespace DB { namespace tests { - std::random_device rd; MockBlockInputStream::MockBlockInputStream(const std::vector & blocks_, StopFlag & stop_flag_) @@ -167,6 +166,9 @@ ReceiverHelper::ReceiverHelper(int concurrency_, int source_num_, uint32_t fine_ { mpp::TaskMeta task; task.set_start_ts(0); + task.set_query_ts(i); + task.set_server_id(i); + task.set_local_query_id(i); task.set_task_id(i); task.set_partition_id(i); task.set_address(""); diff --git a/dbms/src/Flash/tests/bench_window.cpp b/dbms/src/Flash/tests/bench_window.cpp index 9f68ba9beb9..eb14e12f314 100644 --- a/dbms/src/Flash/tests/bench_window.cpp +++ b/dbms/src/Flash/tests/bench_window.cpp @@ -47,7 +47,7 @@ class WindowFunctionBench : public ExchangeBench buildDefaultRowsFrame(), fine_grained_shuffle_stream_count); tipb::DAGRequest req; - MPPInfo mpp_info(0, -1, -1, {}, std::unordered_map>{}); + MPPInfo mpp_info(0, 0, 0, 0, -1, -1, {}, std::unordered_map>{}); builder.getRoot()->toTiPBExecutor(req.mutable_root_executor(), /*collator_id=*/0, mpp_info, TiFlashTestEnv::getContext()); assert(req.root_executor().tp() == tipb::TypeWindow); window = req.root_executor().window(); diff --git a/dbms/src/Flash/tests/gtest_compute_server.cpp b/dbms/src/Flash/tests/gtest_compute_server.cpp index 43da894ec64..264db3ea876 100644 --- a/dbms/src/Flash/tests/gtest_compute_server.cpp +++ b/dbms/src/Flash/tests/gtest_compute_server.cpp @@ -18,7 +18,6 @@ namespace DB { namespace tests { - LoggerPtr MPPTaskTestUtils::log_ptr = nullptr; size_t MPPTaskTestUtils::server_num = 0; MPPTestMeta MPPTaskTestUtils::test_meta = {}; @@ -296,13 +295,13 @@ try { startServers(4); { - auto [start_ts, res] = prepareMPPStreams(context + auto [query_id, res] = prepareMPPStreams(context .scan("test_db", "test_table_1") .aggregation({Max(col("s1"))}, {col("s2"), col("s3")}) .project({"max(s1)"})); - EXPECT_TRUE(assertQueryActive(start_ts)); - MockComputeServerManager::instance().cancelQuery(start_ts); - EXPECT_TRUE(assertQueryCancelled(start_ts)); + EXPECT_TRUE(assertQueryActive(query_id)); + MockComputeServerManager::instance().cancelQuery(query_id); + EXPECT_TRUE(assertQueryCancelled(query_id)); } } CATCH @@ -312,12 +311,12 @@ try { startServers(4); { - auto [start_ts, res] = prepareMPPStreams(context + auto [query_id, res] = prepareMPPStreams(context .scan("test_db", "l_table") .join(context.scan("test_db", "r_table"), tipb::JoinType::TypeLeftOuterJoin, {col("join_c")})); - EXPECT_TRUE(assertQueryActive(start_ts)); - MockComputeServerManager::instance().cancelQuery(start_ts); - EXPECT_TRUE(assertQueryCancelled(start_ts)); + EXPECT_TRUE(assertQueryActive(query_id)); + MockComputeServerManager::instance().cancelQuery(query_id); + EXPECT_TRUE(assertQueryCancelled(query_id)); } } CATCH @@ -327,14 +326,14 @@ try { startServers(4); { - auto [start_ts, _] = prepareMPPStreams(context + auto [query_id, _] = prepareMPPStreams(context .scan("test_db", "l_table") .join(context.scan("test_db", "r_table"), tipb::JoinType::TypeLeftOuterJoin, {col("join_c")}) .aggregation({Max(col("l_table.s"))}, {col("l_table.s")}) .project({col("max(l_table.s)"), col("l_table.s")})); - EXPECT_TRUE(assertQueryActive(start_ts)); - 
MockComputeServerManager::instance().cancelQuery(start_ts); - EXPECT_TRUE(assertQueryCancelled(start_ts)); + EXPECT_TRUE(assertQueryActive(query_id)); + MockComputeServerManager::instance().cancelQuery(query_id); + EXPECT_TRUE(assertQueryCancelled(query_id)); } } CATCH @@ -344,27 +343,27 @@ try { startServers(4); { - auto [start_ts1, res1] = prepareMPPStreams(context + auto [query_id1, res1] = prepareMPPStreams(context .scan("test_db", "l_table") .join(context.scan("test_db", "r_table"), tipb::JoinType::TypeLeftOuterJoin, {col("join_c")})); - auto [start_ts2, res2] = prepareMPPStreams(context + auto [query_id2, res2] = prepareMPPStreams(context .scan("test_db", "l_table") .join(context.scan("test_db", "r_table"), tipb::JoinType::TypeLeftOuterJoin, {col("join_c")}) .aggregation({Max(col("l_table.s"))}, {col("l_table.s")}) .project({col("max(l_table.s)"), col("l_table.s")})); - EXPECT_TRUE(assertQueryActive(start_ts1)); - MockComputeServerManager::instance().cancelQuery(start_ts1); - EXPECT_TRUE(assertQueryCancelled(start_ts1)); + EXPECT_TRUE(assertQueryActive(query_id1)); + MockComputeServerManager::instance().cancelQuery(query_id1); + EXPECT_TRUE(assertQueryCancelled(query_id1)); - EXPECT_TRUE(assertQueryActive(start_ts2)); - MockComputeServerManager::instance().cancelQuery(start_ts2); - EXPECT_TRUE(assertQueryCancelled(start_ts2)); + EXPECT_TRUE(assertQueryActive(query_id2)); + MockComputeServerManager::instance().cancelQuery(query_id2); + EXPECT_TRUE(assertQueryCancelled(query_id2)); } // start 10 queries { - std::vector>> queries; + std::vector>> queries; for (size_t i = 0; i < 10; ++i) { queries.push_back(prepareMPPStreams(context @@ -373,10 +372,10 @@ try } for (size_t i = 0; i < 10; ++i) { - auto start_ts = std::get<0>(queries[i]); - EXPECT_TRUE(assertQueryActive(start_ts)); - MockComputeServerManager::instance().cancelQuery(start_ts); - EXPECT_TRUE(assertQueryCancelled(start_ts)); + auto query_id = std::get<0>(queries[i]); + EXPECT_TRUE(assertQueryActive(query_id)); + MockComputeServerManager::instance().cancelQuery(query_id); + EXPECT_TRUE(assertQueryCancelled(query_id)); } } } diff --git a/dbms/src/Storages/StorageDisaggregated.cpp b/dbms/src/Storages/StorageDisaggregated.cpp index 8a447637b31..652032b8dc5 100644 --- a/dbms/src/Storages/StorageDisaggregated.cpp +++ b/dbms/src/Storages/StorageDisaggregated.cpp @@ -97,8 +97,11 @@ StorageDisaggregated::RequestAndRegionIDs StorageDisaggregated::buildDispatchMPP std::vector region_ids; auto dispatch_req = std::make_shared<::mpp::DispatchTaskRequest>(); ::mpp::TaskMeta * dispatch_req_meta = dispatch_req->mutable_meta(); - dispatch_req_meta->set_start_ts(sender_target_task_start_ts); - dispatch_req_meta->set_task_id(sender_target_task_task_id); + dispatch_req_meta->set_start_ts(sender_target_mpp_task_id.query_id.start_ts); + dispatch_req_meta->set_query_ts(sender_target_mpp_task_id.query_id.query_ts); + dispatch_req_meta->set_local_query_id(sender_target_mpp_task_id.query_id.local_query_id); + dispatch_req_meta->set_server_id(sender_target_mpp_task_id.query_id.server_id); + dispatch_req_meta->set_task_id(sender_target_mpp_task_id.task_id); dispatch_req_meta->set_address(batch_cop_task.store_addr); const auto & settings = context.getSettings(); dispatch_req->set_timeout(60); @@ -164,10 +167,9 @@ StorageDisaggregated::RequestAndRegionIDs StorageDisaggregated::buildDispatchMPP tipb::Executor * executor = sender_dag_req.mutable_root_executor(); executor->set_tp(tipb::ExecType::TypeExchangeSender); // Exec summary of ExchangeSender will be 
merged into TableScan. - executor->set_executor_id(fmt::format("{}_{}_{}", + executor->set_executor_id(fmt::format("{}_{}", ExecIDPrefixForTiFlashStorageSender, - sender_target_task_start_ts, - sender_target_task_task_id)); + sender_target_mpp_task_id.toString())); tipb::ExchangeSender * sender = executor->mutable_exchange_sender(); sender->set_tp(tipb::ExchangeType::PassThrough); diff --git a/dbms/src/Storages/StorageDisaggregated.h b/dbms/src/Storages/StorageDisaggregated.h index b878ff21695..59971b05012 100644 --- a/dbms/src/Storages/StorageDisaggregated.h +++ b/dbms/src/Storages/StorageDisaggregated.h @@ -29,7 +29,6 @@ namespace DB { - // Naive implementation of StorageDisaggregated, all region data will be transferred by GRPC, // rewrite this when local cache is supported. // Naive StorageDisaggregated will convert TableScan to ExchangeReceiver(executed in tiflash_compute node), @@ -45,8 +44,7 @@ class StorageDisaggregated : public IStorage , context(context_) , table_scan(table_scan_) , log(Logger::get(context_.getDAGContext()->log ? context_.getDAGContext()->log->identifier() : "")) - , sender_target_task_start_ts(context_.getDAGContext()->getMPPTaskMeta().start_ts()) - , sender_target_task_task_id(context_.getDAGContext()->getMPPTaskMeta().task_id()) + , sender_target_mpp_task_id(context_.getDAGContext()->getMPPTaskMeta()) , push_down_filter(push_down_filter_) {} @@ -87,8 +85,7 @@ class StorageDisaggregated : public IStorage Context & context; const TiDBTableScan & table_scan; LoggerPtr log; - uint64_t sender_target_task_start_ts; - int64_t sender_target_task_task_id; + MPPTaskId sender_target_mpp_task_id; const PushDownFilter & push_down_filter; std::shared_ptr exchange_receiver; diff --git a/dbms/src/Storages/Transaction/LearnerRead.cpp b/dbms/src/Storages/Transaction/LearnerRead.cpp index a6f4bf18cea..bc9cde099fb 100644 --- a/dbms/src/Storages/Transaction/LearnerRead.cpp +++ b/dbms/src/Storages/Transaction/LearnerRead.cpp @@ -208,29 +208,40 @@ LearnerReadSnapshot doLearnerRead( std::vector batch_read_index_req; batch_read_index_req.reserve(ori_batch_region_size); + size_t stale_read_count = 0; { // If using `std::numeric_limits::max()`, set `start-ts` 0 to get the latest index but let read-index-worker do not record as history. auto read_index_tso = mvcc_query_info->read_tso == std::numeric_limits::max() ? 
0 : mvcc_query_info->read_tso; - + RegionTable & region_table = tmt.getRegionTable(); for (size_t region_idx = region_begin_idx; region_idx < region_end_idx; ++region_idx) { const auto & region_to_query = regions_info[region_idx]; const RegionID region_id = region_to_query.region_id; - if (auto ori_read_index = mvcc_query_info.getReadIndexRes(region_id); ori_read_index) + UInt64 physical_tso = read_index_tso >> TsoPhysicalShiftBits; + bool can_stale_read = physical_tso < region_table.getSelfSafeTS(region_id); + if (!can_stale_read) { - auto resp = kvrpcpb::ReadIndexResponse(); - resp.set_read_index(ori_read_index); - batch_read_index_result.emplace(region_id, std::move(resp)); + if (auto ori_read_index = mvcc_query_info.getReadIndexRes(region_id); ori_read_index) + { + auto resp = kvrpcpb::ReadIndexResponse(); + resp.set_read_index(ori_read_index); + batch_read_index_result.emplace(region_id, std::move(resp)); + } + else + { + auto & region = regions_snapshot.find(region_id)->second; + batch_read_index_req.emplace_back(GenRegionReadIndexReq(*region, read_index_tso)); + } } else { - auto & region = regions_snapshot.find(region_id)->second; - batch_read_index_req.emplace_back(GenRegionReadIndexReq(*region, read_index_tso)); + batch_read_index_result.emplace(region_id, kvrpcpb::ReadIndexResponse()); + ++stale_read_count; } } } - + GET_METRIC(tiflash_stale_read_count).Increment(stale_read_count); GET_METRIC(tiflash_raft_read_index_count).Increment(batch_read_index_req.size()); const auto & make_default_batch_read_index_result = [&](bool with_region_error) { @@ -307,7 +318,11 @@ LearnerReadSnapshot doLearnerRead( else { // cache read-index to avoid useless overhead about retry. - mvcc_query_info.addReadIndexRes(region_id, resp.read_index()); + // resp.read_index() is 0 when stale read, skip it to avoid overwriting read_index res in last retry. 
+ if (resp.read_index() != 0) + { + mvcc_query_info.addReadIndexRes(region_id, resp.read_index()); + } } } diff --git a/dbms/src/Storages/Transaction/RegionTable.cpp b/dbms/src/Storages/Transaction/RegionTable.cpp index a82fef8b1ad..956ca227051 100644 --- a/dbms/src/Storages/Transaction/RegionTable.cpp +++ b/dbms/src/Storages/Transaction/RegionTable.cpp @@ -505,6 +505,17 @@ bool RegionTable::isSafeTSLag(UInt64 region_id, UInt64 * leader_safe_ts, UInt64 return (*leader_safe_ts > *self_safe_ts) && (*leader_safe_ts - *self_safe_ts > SafeTsDiffThreshold); } +UInt64 RegionTable::getSelfSafeTS(UInt64 region_id) +{ + std::shared_lock lock(rw_lock); + auto it = safe_ts_map.find(region_id); + if (it == safe_ts_map.end()) + { + return 0; + } + return it->second->self_safe_ts.load(std::memory_order_relaxed); +} + void RegionTable::updateSafeTS(UInt64 region_id, UInt64 leader_safe_ts, UInt64 self_safe_ts) { { diff --git a/dbms/src/Storages/Transaction/RegionTable.h b/dbms/src/Storages/Transaction/RegionTable.h index 9b4fe1a4286..36686b44d90 100644 --- a/dbms/src/Storages/Transaction/RegionTable.h +++ b/dbms/src/Storages/Transaction/RegionTable.h @@ -59,8 +59,15 @@ using RegionScanFilterPtr = std::shared_ptr; using SafeTS = UInt64; enum : SafeTS { - InvalidSafeTS = std::numeric_limits::max() + InvalidSafeTS = std::numeric_limits::max(), }; + +using TsoShiftBits = UInt64; +enum : TsoShiftBits +{ + TsoPhysicalShiftBits = 18, +}; + class RegionTable : private boost::noncopyable { public: @@ -189,6 +196,7 @@ class RegionTable : private boost::noncopyable static const UInt64 SafeTsDiffThreshold = 2 * 60 * 1000; bool isSafeTSLag(UInt64 region_id, UInt64 * leader_safe_ts, UInt64 * self_safe_ts); + UInt64 getSelfSafeTS(UInt64 region_id); private: friend class MockTiDB; diff --git a/dbms/src/TestUtils/MPPTaskTestUtils.cpp b/dbms/src/TestUtils/MPPTaskTestUtils.cpp index d86ce0befd8..f9a9123057c 100644 --- a/dbms/src/TestUtils/MPPTaskTestUtils.cpp +++ b/dbms/src/TestUtils/MPPTaskTestUtils.cpp @@ -23,6 +23,7 @@ DAGProperties getDAGPropertiesForTest(int server_num) properties.is_mpp_query = true; properties.mpp_partition_num = server_num; properties.start_ts = MockTimeStampGenerator::instance().nextTs(); + properties.local_query_id = properties.start_ts; return properties; } @@ -68,7 +69,7 @@ size_t MPPTaskTestUtils::serverNum() return server_num; } -std::tuple> MPPTaskTestUtils::prepareMPPStreams(DAGRequestBuilder builder) +std::tuple> MPPTaskTestUtils::prepareMPPStreams(DAGRequestBuilder builder) { auto properties = DB::tests::getDAGPropertiesForTest(serverNum()); auto tasks = builder.buildMPPTasks(context, properties); @@ -76,7 +77,7 @@ std::tuple> MPPTaskTestUtils::prepareMP TiFlashTestEnv::getGlobalContext(i).setCancelTest(); MockComputeServerManager::instance().setMockStorage(context.mockStorage()); auto res = executeMPPQueryWithMultipleContext(properties, tasks, MockComputeServerManager::instance().getServerConfigMap()); - return {properties.start_ts, res}; + return {MPPQueryId(properties.query_ts, properties.local_query_id, properties.server_id, properties.start_ts), res}; } ColumnsWithTypeAndName MPPTaskTestUtils::exeucteMPPTasks(QueryTasks & tasks, const DAGProperties & properties, std::unordered_map & server_config_map) @@ -133,14 +134,14 @@ String MPPTaskTestUtils::queryInfo(size_t server_id) return buf.toString(); } -::testing::AssertionResult MPPTaskTestUtils::assertQueryCancelled(size_t start_ts) +::testing::AssertionResult MPPTaskTestUtils::assertQueryCancelled(const MPPQueryId & query_id) { auto 
seconds = std::chrono::seconds(1); auto retry_times = 0; for (int i = test_meta.context_idx; i < TiFlashTestEnv::globalContextSize(); ++i) { // wait until the task is empty for - while (TiFlashTestEnv::getGlobalContext(i).getTMTContext().getMPPTaskManager()->getQueryTaskSet(start_ts) != nullptr) + while (TiFlashTestEnv::getGlobalContext(i).getTMTContext().getMPPTaskManager()->getQueryTaskSet(query_id) != nullptr) { std::this_thread::sleep_for(seconds); retry_times++; @@ -154,13 +155,13 @@ ::testing::AssertionResult MPPTaskTestUtils::assertQueryCancelled(size_t start_t return ::testing::AssertionSuccess(); } -::testing::AssertionResult MPPTaskTestUtils::assertQueryActive(size_t start_ts) +::testing::AssertionResult MPPTaskTestUtils::assertQueryActive(const MPPQueryId & query_id) { for (int i = test_meta.context_idx; i < TiFlashTestEnv::globalContextSize(); ++i) { - if (TiFlashTestEnv::getGlobalContext(i).getTMTContext().getMPPTaskManager()->getQueryTaskSet(start_ts) == nullptr) + if (TiFlashTestEnv::getGlobalContext(i).getTMTContext().getMPPTaskManager()->getQueryTaskSet(query_id) == nullptr) { - return ::testing::AssertionFailure() << "Query " << start_ts << "not active" << std::endl; + return ::testing::AssertionFailure() << "Query " << query_id.toString() << "not active" << std::endl; } } return ::testing::AssertionSuccess(); diff --git a/dbms/src/TestUtils/MPPTaskTestUtils.h b/dbms/src/TestUtils/MPPTaskTestUtils.h index ab147864492..cb0e84a2a14 100644 --- a/dbms/src/TestUtils/MPPTaskTestUtils.h +++ b/dbms/src/TestUtils/MPPTaskTestUtils.h @@ -23,7 +23,6 @@ namespace DB::tests { - DAGProperties getDAGPropertiesForTest(int server_num); class MockTimeStampGenerator : public ext::Singleton { @@ -81,14 +80,14 @@ class MPPTaskTestUtils : public ExecutorTest static size_t serverNum(); // run mpp tasks which are ready to cancel, the return value is the start_ts of query. 
- std::tuple> prepareMPPStreams(DAGRequestBuilder builder); + std::tuple> prepareMPPStreams(DAGRequestBuilder builder); ColumnsWithTypeAndName exeucteMPPTasks(QueryTasks & tasks, const DAGProperties & properties, std::unordered_map & server_config_map); ColumnsWithTypeAndName executeCoprocessorTask(std::shared_ptr & dag_request); - static ::testing::AssertionResult assertQueryCancelled(size_t start_ts); - static ::testing::AssertionResult assertQueryActive(size_t start_ts); + static ::testing::AssertionResult assertQueryCancelled(const MPPQueryId & query_id); + static ::testing::AssertionResult assertQueryActive(const MPPQueryId & query_id); static String queryInfo(size_t server_id); protected: diff --git a/dbms/src/TestUtils/mockExecutor.cpp b/dbms/src/TestUtils/mockExecutor.cpp index 2ed1de79057..e6c9a82a231 100644 --- a/dbms/src/TestUtils/mockExecutor.cpp +++ b/dbms/src/TestUtils/mockExecutor.cpp @@ -90,7 +90,7 @@ void DAGRequestBuilder::initDAGRequest(tipb::DAGRequest & dag_request) std::shared_ptr DAGRequestBuilder::build(MockDAGRequestContext & mock_context, DAGRequestType type) { // build tree struct base executor - MPPInfo mpp_info(properties.start_ts, -1, -1, {}, mock_context.receiver_source_task_ids_map); + MPPInfo mpp_info(properties.start_ts, properties.query_ts, properties.server_id, properties.local_query_id, -1, -1, {}, mock_context.receiver_source_task_ids_map); std::shared_ptr dag_request_ptr = std::make_shared(); tipb::DAGRequest & dag_request = *dag_request_ptr; initDAGRequest(dag_request); diff --git a/libs/libcommon/include/common/logger_useful.h b/libs/libcommon/include/common/logger_useful.h index 3858258d2ce..7e9498ec853 100644 --- a/libs/libcommon/include/common/logger_useful.h +++ b/libs/libcommon/include/common/logger_useful.h @@ -26,6 +26,12 @@ #define QUERY_PREVIEW_LENGTH 160 #endif +namespace DB +{ +/// Tracing logs are filtered by SourceFilterChannel. 
+inline constexpr auto tracing_log_source = "mpp_task_tracing"; +} // namespace DB + namespace LogFmtDetails { // https://stackoverflow.com/questions/8487986/file-macro-shows-full-path/54335644#54335644 diff --git a/libs/libdaemon/src/BaseDaemon.cpp b/libs/libdaemon/src/BaseDaemon.cpp index 61620e48735..0e572b13573 100644 --- a/libs/libdaemon/src/BaseDaemon.cpp +++ b/libs/libdaemon/src/BaseDaemon.cpp @@ -48,7 +48,6 @@ #include #include #include -#include #include #include #include diff --git a/metrics/grafana/tiflash_summary.json b/metrics/grafana/tiflash_summary.json index 87c5c9ad035..4f173c6a7b3 100644 --- a/metrics/grafana/tiflash_summary.json +++ b/metrics/grafana/tiflash_summary.json @@ -8766,10 +8766,10 @@ "h": 7, "w": 12, "x": 0, - "y": 107 + "y": 9 }, "hiddenSeries": false, - "id": 35, + "id": 166, "legend": { "alignAsTable": false, "avg": false, @@ -8799,8 +8799,10 @@ "steppedLine": false, "targets": [ { - "expr": "sum(rate(tiflash_raft_read_index_count{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", instance=~\"$instance\"}[1m])) by (instance)", + "exemplar": true, + "expr": "sum(rate(tiflash_stale_read_count{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", instance=~\"$instance\"}[1m])) by (instance)", "format": "time_series", + "interval": "", "intervalFactor": 1, "legendFormat": "{{instance}}", "refId": "A" @@ -8810,7 +8812,7 @@ "timeFrom": null, "timeRegions": [], "timeShift": null, - "title": "Raft Read Index OPS", + "title": "Stale Read OPS", "tooltip": { "shared": true, "sort": 0, @@ -8826,6 +8828,7 @@ }, "yaxes": [ { + "$$hashKey": "object:435", "decimals": null, "format": "ops", "label": null, @@ -8835,6 +8838,7 @@ "show": true }, { + "$$hashKey": "object:436", "format": "none", "label": null, "logBase": 1, @@ -8858,16 +8862,16 @@ "defaults": {}, "overrides": [] }, - "fill": 1, + "fill": 0, "fillGradient": 0, "gridPos": { "h": 7, "w": 12, "x": 12, - "y": 107 + "y": 9 }, "hiddenSeries": false, - "id": 36, + "id": 35, "legend": { "alignAsTable": false, "avg": false, @@ -8897,39 +8901,18 @@ "steppedLine": false, "targets": [ { - "expr": "histogram_quantile(1.00, sum(rate(tiflash_raft_read_index_duration_seconds_bucket{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", instance=~\"$instance\"}[1m])) by (le))", + "expr": "sum(rate(tiflash_raft_read_index_count{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", instance=~\"$instance\"}[1m])) by (instance)", "format": "time_series", "intervalFactor": 1, - "legendFormat": "max", + "legendFormat": "{{instance}}", "refId": "A" - }, - { - "expr": "histogram_quantile(0.99, sum(rate(tiflash_raft_read_index_duration_seconds_bucket{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", instance=~\"$instance\"}[1m])) by (le))", - "format": "time_series", - "intervalFactor": 1, - "legendFormat": "99", - "refId": "B" - }, - { - "expr": "histogram_quantile(0.95, sum(rate(tiflash_raft_read_index_duration_seconds_bucket{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", instance=~\"$instance\"}[1m])) by (le))", - "format": "time_series", - "intervalFactor": 1, - "legendFormat": "95", - "refId": "C" - }, - { - "expr": "histogram_quantile(0.80, sum(rate(tiflash_raft_read_index_duration_seconds_bucket{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", instance=~\"$instance\"}[1m])) by (le))", - "format": "time_series", - "intervalFactor": 1, - "legendFormat": "80", - "refId": "D" } ], "thresholds": [], "timeFrom": null, "timeRegions": [], "timeShift": null, - 
"title": "Raft Batch Read Index Duration", + "title": "Raft Read Index OPS", "tooltip": { "shared": true, "sort": 0, @@ -8945,7 +8928,8 @@ }, "yaxes": [ { - "format": "s", + "decimals": null, + "format": "ops", "label": null, "logBase": 1, "max": null, @@ -8953,7 +8937,7 @@ "show": true }, { - "format": "short", + "format": "none", "label": null, "logBase": 1, "max": null, @@ -8982,7 +8966,7 @@ "h": 7, "w": 12, "x": 0, - "y": 114 + "y": 16 }, "hiddenSeries": false, "id": 37, @@ -9116,7 +9100,126 @@ "h": 7, "w": 12, "x": 12, - "y": 114 + "y": 16 + }, + "hiddenSeries": false, + "id": 36, + "legend": { + "alignAsTable": false, + "avg": false, + "current": false, + "max": false, + "min": false, + "rightSide": false, + "show": true, + "total": false, + "values": false + }, + "lines": true, + "linewidth": 1, + "links": [], + "nullPointMode": "null as zero", + "options": { + "alertThreshold": true + }, + "percentage": false, + "pluginVersion": "7.5.11", + "pointradius": 5, + "points": false, + "renderer": "flot", + "seriesOverrides": [], + "spaceLength": 10, + "stack": false, + "steppedLine": false, + "targets": [ + { + "expr": "histogram_quantile(1.00, sum(rate(tiflash_raft_read_index_duration_seconds_bucket{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", instance=~\"$instance\"}[1m])) by (le))", + "format": "time_series", + "intervalFactor": 1, + "legendFormat": "max", + "refId": "A" + }, + { + "expr": "histogram_quantile(0.99, sum(rate(tiflash_raft_read_index_duration_seconds_bucket{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", instance=~\"$instance\"}[1m])) by (le))", + "format": "time_series", + "intervalFactor": 1, + "legendFormat": "99", + "refId": "B" + }, + { + "expr": "histogram_quantile(0.95, sum(rate(tiflash_raft_read_index_duration_seconds_bucket{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", instance=~\"$instance\"}[1m])) by (le))", + "format": "time_series", + "intervalFactor": 1, + "legendFormat": "95", + "refId": "C" + }, + { + "expr": "histogram_quantile(0.80, sum(rate(tiflash_raft_read_index_duration_seconds_bucket{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", instance=~\"$instance\"}[1m])) by (le))", + "format": "time_series", + "intervalFactor": 1, + "legendFormat": "80", + "refId": "D" + } + ], + "thresholds": [], + "timeFrom": null, + "timeRegions": [], + "timeShift": null, + "title": "Raft Batch Read Index Duration", + "tooltip": { + "shared": true, + "sort": 0, + "value_type": "individual" + }, + "type": "graph", + "xaxis": { + "buckets": null, + "mode": "time", + "name": null, + "show": true, + "values": [] + }, + "yaxes": [ + { + "format": "s", + "label": null, + "logBase": 1, + "max": null, + "min": "0", + "show": true + }, + { + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + } + ], + "yaxis": { + "align": false, + "alignLevel": null + } + }, + { + "aliasColors": {}, + "bars": false, + "dashLength": 10, + "dashes": false, + "datasource": "${DS_TEST-CLUSTER}", + "description": "The number of currently applying snapshots.", + "fieldConfig": { + "defaults": {}, + "overrides": [] + }, + "fill": 1, + "fillGradient": 0, + "gridPos": { + "h": 7, + "w": 24, + "x": 0, + "y": 23 }, "hiddenSeries": false, "id": 75, @@ -9220,7 +9323,7 @@ "h": 7, "w": 24, "x": 0, - "y": 121 + "y": 30 }, "hiddenSeries": false, "id": 82, @@ -9373,7 +9476,7 @@ "h": 7, "w": 12, "x": 0, - "y": 128 + "y": 37 }, "heatmap": {}, "hideZeroBuckets": true, @@ -9443,7 +9546,7 @@ "h": 7, "w": 12, "x": 
12, - "y": 128 + "y": 37 }, "heatmap": {}, "hideZeroBuckets": true, @@ -9513,7 +9616,7 @@ "h": 7, "w": 12, "x": 0, - "y": 135 + "y": 44 }, "heatmap": {}, "hideZeroBuckets": true, @@ -9583,7 +9686,7 @@ "h": 7, "w": 12, "x": 12, - "y": 135 + "y": 44 }, "heatmap": {}, "hideZeroBuckets": true, @@ -9647,7 +9750,7 @@ "h": 7, "w": 24, "x": 0, - "y": 142 + "y": 51 }, "height": "", "hiddenSeries": false, @@ -9761,7 +9864,7 @@ "h": 7, "w": 12, "x": 0, - "y": 149 + "y": 58 }, "heatmap": {}, "hideZeroBuckets": true, @@ -9830,7 +9933,7 @@ "h": 7, "w": 12, "x": 12, - "y": 149 + "y": 58 }, "heatmap": {}, "hideZeroBuckets": true, @@ -9900,7 +10003,7 @@ "h": 7, "w": 12, "x": 0, - "y": 156 + "y": 65 }, "heatmap": {}, "hideZeroBuckets": true, @@ -9970,7 +10073,7 @@ "h": 7, "w": 12, "x": 12, - "y": 156 + "y": 65 }, "heatmap": {}, "hideZeroBuckets": true, @@ -10040,7 +10143,7 @@ "h": 7, "w": 12, "x": 0, - "y": 163 + "y": 72 }, "heatmap": {}, "hideZeroBuckets": true, @@ -10106,7 +10209,7 @@ "h": 7, "w": 12, "x": 12, - "y": 163 + "y": 72 }, "hiddenSeries": false, "id": 91,
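For the LearnerRead.cpp hunk above, a minimal self-contained sketch of the stale-read gate, assuming (as the new TsoPhysicalShiftBits = 18 constant implies) that a TSO carries a physical millisecond timestamp in its high bits and an 18-bit logical counter in its low bits, and that the region's self safe ts is compared on the same physical scale; the function name below is illustrative, not the patch's API.

#include <cstdint>

// Mirrors the new constant in RegionTable.h: shifting a TSO right by 18 bits
// recovers its physical (millisecond) component.
static constexpr uint64_t TsoPhysicalShiftBits = 18;

// self_safe_ts is what RegionTable::getSelfSafeTS(region_id) returns; it is 0 when
// the region has no safe-ts record, so the comparison fails and the region stays
// on the normal read-index path.
bool canStaleRead(uint64_t read_index_tso, uint64_t self_safe_ts)
{
    const uint64_t physical_tso = read_index_tso >> TsoPhysicalShiftBits;
    return physical_tso < self_safe_ts;
}

When the gate passes, the patch places an empty kvrpcpb::ReadIndexResponse for the region, bumps stale_read_count, and skips GenRegionReadIndexReq entirely; the per-batch count then feeds the new tiflash_stale_read_count metric graphed in the dashboard change above.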
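Still in LearnerRead.cpp, the retry path now refuses to cache a read index of 0, because stale-read regions come back with a default-constructed response. Below is a sketch of that guard, folded into a stand-in for the per-query read-index cache; the class is an assumption for illustration, since the real cache type is not part of this diff.

#include <cstdint>
#include <optional>
#include <unordered_map>

// Stand-in for the read-index cache used to avoid redundant work on retry.
// In the patch the zero check lives at the call site in doLearnerRead(); it is
// moved into the helper here only to keep the sketch short.
class ReadIndexCacheSketch
{
public:
    void addReadIndexRes(uint64_t region_id, uint64_t read_index)
    {
        // A read_index of 0 is the placeholder produced by the stale-read path;
        // caching it would overwrite a real index remembered from an earlier retry.
        if (read_index == 0)
            return;
        cache[region_id] = read_index;
    }

    std::optional<uint64_t> getReadIndexRes(uint64_t region_id) const
    {
        const auto it = cache.find(region_id);
        if (it == cache.end())
            return std::nullopt;
        return it->second;
    }

private:
    std::unordered_map<uint64_t, uint64_t> cache;
};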
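The gate relies on the new RegionTable::getSelfSafeTS shown in the RegionTable.cpp hunk: a shared lock on the safe-ts map plus a relaxed atomic load, returning 0 for unknown regions. A compilable sketch of that access pattern follows; SafeTsEntry and the updater are simplified stand-ins, since the real entry also tracks leader_safe_ts and is maintained by updateSafeTS.

#include <atomic>
#include <cstdint>
#include <memory>
#include <mutex>
#include <shared_mutex>
#include <unordered_map>

struct SafeTsEntry
{
    std::atomic<uint64_t> self_safe_ts{0};
};

class SafeTsMapSketch
{
public:
    // Readers only need the shared lock; the per-region value itself is atomic,
    // and it is only advisory for the stale-read gate, so memory_order_relaxed
    // matches what the patch uses.
    uint64_t getSelfSafeTS(uint64_t region_id) const
    {
        std::shared_lock lock(rw_lock);
        const auto it = safe_ts_map.find(region_id);
        if (it == safe_ts_map.end())
            return 0; // unknown region: stale read stays disabled
        return it->second->self_safe_ts.load(std::memory_order_relaxed);
    }

    void updateSelfSafeTS(uint64_t region_id, uint64_t self_safe_ts)
    {
        std::unique_lock lock(rw_lock);
        auto & entry = safe_ts_map[region_id];
        if (!entry)
            entry = std::make_unique<SafeTsEntry>();
        entry->self_safe_ts.store(self_safe_ts, std::memory_order_relaxed);
    }

private:
    mutable std::shared_mutex rw_lock;
    std::unordered_map<uint64_t, std::unique_ptr<SafeTsEntry>> safe_ts_map;
};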
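On the test-utility side, queries are now tracked and cancelled by an MPPQueryId built from four fields rather than by start_ts alone. The sketch below shows those fields as the helpers above use them; the struct name and the toString format are illustrative only, since MPPQueryId itself is defined in MPP sources outside this diff.

#include <cstdint>
#include <string>

struct MPPQueryIdSketch
{
    uint64_t query_ts = 0;
    uint64_t local_query_id = 0;
    uint64_t server_id = 0;
    uint64_t start_ts = 0;

    // Hypothetical rendering; the real MPPQueryId/MPPTaskId::toString is not
    // shown in this diff.
    std::string toString() const
    {
        return "query_ts:" + std::to_string(query_ts)
            + ",local_query_id:" + std::to_string(local_query_id)
            + ",server_id:" + std::to_string(server_id)
            + ",start_ts:" + std::to_string(start_ts);
    }
};

// getDAGPropertiesForTest() sets local_query_id = start_ts, and prepareMPPStreams()
// builds the id from (query_ts, local_query_id, server_id, start_ts) in that order,
// matching the MPPQueryId constructor call in MPPTaskTestUtils.cpp.
MPPQueryIdSketch makeTestQueryId(uint64_t query_ts, uint64_t server_id, uint64_t start_ts)
{
    return MPPQueryIdSketch{query_ts, /*local_query_id=*/start_ts, server_id, start_ts};
}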