Skip to content

Commit

Permalink
address comments
Browse files Browse the repository at this point in the history
Signed-off-by: hehechen <awd123456sss@gmail.com>
  • Loading branch information
hehechen committed Dec 14, 2022
1 parent ce91962 commit acd3eb6
Show file tree
Hide file tree
Showing 16 changed files with 127 additions and 130 deletions.
4 changes: 2 additions & 2 deletions dbms/src/Common/TiFlashMetrics.h
Original file line number Diff line number Diff line change
Expand Up @@ -104,8 +104,8 @@ namespace DB
F(type_exchange_partition, {"type", "exchange_partition"})) \
M(tiflash_schema_apply_duration_seconds, "Bucketed histogram of ddl apply duration", Histogram, \
F(type_ddl_apply_duration, {{"req", "ddl_apply_duration"}}, ExpBuckets{0.001, 2, 20})) \
M(tiflash_raft_read_index_count, "Total number of raft read index", Counter) \
M(tiflash_stale_read_count, "Total number of stale read", Counter) \
M(tiflash_raft_read_index_count, "Total number of raft read index", Counter) \
M(tiflash_stale_read_count, "Total number of stale read", Counter) \
M(tiflash_raft_read_index_duration_seconds, "Bucketed histogram of raft read index duration", Histogram, \
F(type_raft_read_index_duration, {{"type", "tmt_raft_read_index_duration"}}, ExpBuckets{0.001, 2, 20})) \
M(tiflash_raft_wait_index_duration_seconds, "Bucketed histogram of raft wait index duration", Histogram, \
Expand Down
10 changes: 5 additions & 5 deletions dbms/src/Debug/MockComputeServerManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -117,14 +117,14 @@ void MockComputeServerManager::addServer(size_t partition_id, std::unique_ptr<Fl
server_map[partition_id] = std::move(server);
}

void MockComputeServerManager::cancelQuery(String query_id)
void MockComputeServerManager::cancelQuery(const MPPQueryId & query_id)
{
mpp::CancelTaskRequest req;
auto * meta = req.mutable_meta();
auto [query_ts, local_query_id_, server_id] = MPPTaskId::decodeQueryID(query_id);
meta->set_query_ts(query_ts);
meta->set_local_query_id(local_query_id_);
meta->set_server_id(server_id);
meta->set_query_ts(query_id.query_ts);
meta->set_local_query_id(query_id.local_query_id);
meta->set_server_id(query_id.server_id);
meta->set_start_ts(query_id.start_ts);
mpp::CancelTaskResponse response;
for (const auto & server : server_map)
server.second->flashService()->cancelMPPTaskForTest(&req, &response);
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Debug/MockComputeServerManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ class MockComputeServerManager : public ext::Singleton<MockComputeServerManager>

void resetMockMPPServerInfo(size_t partition_num);

void cancelQuery(String query_id);
void cancelQuery(const MPPQueryId & query_id);

static String queryInfo();

Expand Down
4 changes: 2 additions & 2 deletions dbms/src/Flash/FlashService.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -341,7 +341,7 @@ grpc::Status FlashService::CancelMPPTask(

auto & tmt_context = context->getTMTContext();
auto task_manager = tmt_context.getMPPTaskManager();
task_manager->abortMPPQuery(MPPTaskId::generateQueryID(request->meta().query_ts(), request->meta().local_query_id(), request->meta().server_id(), request->meta().start_ts()), "Receive cancel request from TiDB", AbortType::ONCANCELLATION);
task_manager->abortMPPQuery(MPPQueryId(request->meta().query_ts(), request->meta().local_query_id(), request->meta().server_id(), request->meta().start_ts()), "Receive cancel request from TiDB", AbortType::ONCANCELLATION);
return grpc::Status::OK;
}

Expand Down Expand Up @@ -383,7 +383,7 @@ ::grpc::Status FlashService::cancelMPPTaskForTest(const ::mpp::CancelTaskRequest
}
auto & tmt_context = context->getTMTContext();
auto task_manager = tmt_context.getMPPTaskManager();
task_manager->abortMPPQuery(MPPTaskId::generateQueryID(request->meta().query_ts(), request->meta().local_query_id(), request->meta().server_id(), request->meta().start_ts()), "Receive cancel request from GTest", AbortType::ONCANCELLATION);
task_manager->abortMPPQuery(MPPQueryId(request->meta().query_ts(), request->meta().local_query_id(), request->meta().server_id(), request->meta().start_ts()), "Receive cancel request from GTest", AbortType::ONCANCELLATION);
return grpc::Status::OK;
}

Expand Down
47 changes: 29 additions & 18 deletions dbms/src/Flash/Mpp/MPPTaskId.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,39 +13,50 @@
// limitations under the License.

#include <Flash/Mpp/MPPTaskId.h>
#include <IO/WriteHelpers.h>
#include <fmt/core.h>

namespace DB
{
String MPPTaskId::toString() const
bool MPPQueryId::operator<(const MPPQueryId & mpp_query_id) const
{
return isUnknown() ? "MPP<query_id:N/A,start_ts:N/A,query_ts:N/A,task_id:N/A,server_id:N/A>" : fmt::format("MPP<query:{},start_ts:{},query_ts:{},task_id:{},server_id:{},local_query_id:{}>", query_id, start_ts, query_ts, task_id, server_id, local_query_id);
if (query_ts == 0 && local_query_id == 0 && server_id == 0)
{
return start_ts < mpp_query_id.start_ts;
}
return (query_ts < mpp_query_id.query_ts) || (local_query_id < mpp_query_id.local_query_id) || (server_id < mpp_query_id.server_id);
}
bool MPPQueryId::operator==(const MPPQueryId & rid) const
{
return query_ts == rid.query_ts && local_query_id == rid.local_query_id && server_id == rid.server_id && start_ts == rid.start_ts;
}
bool MPPQueryId::operator!=(const MPPQueryId & rid) const
{
return !(*this == rid);
}
bool MPPQueryId::operator<=(const MPPQueryId & rid) const
{
return *this < rid || *this == rid;
}

const MPPTaskId MPPTaskId::unknown_mpp_task_id = MPPTaskId{};

constexpr UInt64 MAX_UINT64 = std::numeric_limits<UInt64>::max();
const String MPPTaskId::Max_Query_Id = generateQueryID(MAX_UINT64, MAX_UINT64, MAX_UINT64, MAX_UINT64);

String MPPTaskId::generateQueryID(UInt64 query_ts_, UInt64 local_query_id_, UInt64 server_id_, UInt64 start_ts_)
size_t MPPQueryIdHash::operator()(MPPQueryId const & mpp_query_id) const noexcept
{
if (local_query_id_ == 0)
if (mpp_query_id.query_ts == 0 && mpp_query_id.local_query_id == 0 && mpp_query_id.server_id == 0)
{
return intToHex(0) + intToHex(0) + intToHex(start_ts_);
return std::hash<UInt64>()(mpp_query_id.start_ts);
}
return intToHex(query_ts_) + intToHex(local_query_id_) + intToHex(server_id_);
return std::hash<UInt64>()(mpp_query_id.query_ts) ^ std::hash<UInt64>()(mpp_query_id.local_query_id) ^ std::hash<UInt64>()(mpp_query_id.server_id);
}

std::tuple<UInt64, UInt64, UInt64> MPPTaskId::decodeQueryID(String query_id_str)
String MPPTaskId::toString() const
{
UInt64 decode_query_ts = std::stoull(query_id_str.substr(0, sizeof(Int64) * 2), 0, 16);
UInt64 decode_local_query_id = std::stoull(query_id_str.substr(sizeof(Int64) * 2, sizeof(Int64) * 2), 0, 16);
UInt64 decode_server_id = std::stoull(query_id_str.substr(sizeof(Int64) * 2 * 2, sizeof(UInt64) * 2), 0, 16);
LOG_DEBUG(Logger::get(__FUNCTION__), "query_ts_={}, local_query_id_={}, server_id_={}", decode_query_ts, decode_local_query_id, decode_server_id);
return {decode_query_ts, decode_local_query_id, decode_server_id};
return isUnknown() ? "MPP<query_id:N/A,start_ts:N/A,task_id:N/A>" : fmt::format("MPP<query:{},start_ts:{},task_id:{}>", query_id.toDebugString(), start_ts, task_id);
}

const MPPTaskId MPPTaskId::unknown_mpp_task_id = MPPTaskId{};

constexpr UInt64 MAX_UINT64 = std::numeric_limits<UInt64>::max();
const MPPQueryId MPPTaskId::Max_Query_Id = MPPQueryId(MAX_UINT64, MAX_UINT64, MAX_UINT64, MAX_UINT64);

bool operator==(const MPPTaskId & lid, const MPPTaskId & rid)
{
return lid.query_id == rid.query_id && lid.task_id == rid.task_id;
Expand Down
63 changes: 37 additions & 26 deletions dbms/src/Flash/Mpp/MPPTaskId.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,53 +15,64 @@
#pragma once

#include <common/types.h>
#include <fmt/core.h>

namespace DB
{
// global unique MPP query id.
struct MPPQueryId
{
UInt64 query_ts;
UInt64 local_query_id;
UInt64 server_id;
UInt64 start_ts;
MPPQueryId(UInt64 query_ts, UInt64 local_query_id, UInt64 server_id, UInt64 start_ts)
: query_ts(query_ts)
, local_query_id(local_query_id)
, server_id(server_id)
, start_ts(start_ts)
{}
bool operator<(const MPPQueryId & mpp_query_id) const;
bool operator==(const MPPQueryId & rid) const;
bool operator!=(const MPPQueryId & rid) const;
bool operator<=(const MPPQueryId & rid) const;
String toDebugString() const
{
return fmt::format("query_ts:{}, local_query_id:{}, server_id:{}, start_ts:{}", query_ts, local_query_id, server_id, start_ts);
}
};

struct MPPQueryIdHash
{
size_t operator()(MPPQueryId const & mpp_query_id) const noexcept;
};

// Identify a mpp task.
struct MPPTaskId
{
MPPTaskId()
: start_ts(0)
, server_id(0)
, query_ts(0)
, task_id(unknown_task_id){};
, task_id(unknown_task_id)
, query_id({0, 0, 0, 0}){};

MPPTaskId(UInt64 start_ts_, Int64 task_id_, UInt64 server_id_, UInt64 query_ts_, UInt64 local_query_id_)
MPPTaskId(UInt64 start_ts_, Int64 task_id_, UInt64 server_id, UInt64 query_ts, UInt64 local_query_id)
: start_ts(start_ts_)
, server_id(server_id_)
, query_ts(query_ts_)
, task_id(task_id_)
, local_query_id(local_query_id_)
{
query_id = generateQueryID(query_ts, local_query_id, server_id, start_ts_);
}
, query_id(query_ts, local_query_id, server_id, start_ts_)
{}

UInt64 start_ts;
UInt64 server_id;
UInt64 query_ts;
Int64 task_id;
UInt64 local_query_id;
String query_id;
MPPQueryId query_id;

bool isUnknown() const { return task_id == unknown_task_id; }

String toString() const;
static const MPPTaskId unknown_mpp_task_id;
static const String Max_Query_Id;
static String generateQueryID(UInt64 query_ts_, UInt64 local_query_id_, UInt64 server_id_, UInt64 start_ts_);
static std::tuple<UInt64, UInt64, UInt64> decodeQueryID(String query_id_str);
static const MPPQueryId Max_Query_Id;

private:
static constexpr Int64 unknown_task_id = -1;

static String intToHex(UInt64 i)
{
std::stringstream stream;
stream << std::setfill('0') << std::setw(sizeof(UInt64) * 2)
<< std::hex << i;
return stream.str();
}
};

bool operator==(const MPPTaskId & lid, const MPPTaskId & rid);
Expand All @@ -75,7 +86,7 @@ class hash<DB::MPPTaskId>
public:
size_t operator()(const DB::MPPTaskId & id) const
{
return hash<String>()(id.query_id);
return DB::MPPQueryIdHash()(id.query_id) ^ hash<Int64>()(id.task_id);
}
};
} // namespace std
30 changes: 15 additions & 15 deletions dbms/src/Flash/Mpp/MPPTaskManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,15 +35,15 @@ MPPTaskManager::MPPTaskManager(MPPTaskSchedulerPtr scheduler_)
, log(Logger::get())
{}

MPPQueryTaskSetPtr MPPTaskManager::addMPPQueryTaskSet(String query_id)
MPPQueryTaskSetPtr MPPTaskManager::addMPPQueryTaskSet(const MPPQueryId & query_id)
{
auto ptr = std::make_shared<MPPQueryTaskSet>();
mpp_query_map.insert({query_id, ptr});
GET_METRIC(tiflash_mpp_task_manager, type_mpp_query_count).Set(mpp_query_map.size());
return ptr;
}

void MPPTaskManager::removeMPPQueryTaskSet(String query_id, bool on_abort)
void MPPTaskManager::removeMPPQueryTaskSet(const MPPQueryId & query_id, bool on_abort)
{
scheduler->deleteQuery(query_id, *this, on_abort);
mpp_query_map.erase(query_id);
Expand All @@ -62,7 +62,7 @@ std::pair<MPPTunnelPtr, String> MPPTaskManager::findAsyncTunnel(const ::mpp::Est
if (query_it != mpp_query_map.end() && !query_it->second->isInNormalState())
{
/// if the query is aborted, return the error message
LOG_WARNING(log, fmt::format("Query {} is aborted, all its tasks are invalid.", id.query_id));
LOG_WARNING(log, fmt::format("Query {} is aborted, all its tasks are invalid.", id.query_id.toDebugString()));
/// meet error
return {nullptr, query_it->second->error_message};
}
Expand Down Expand Up @@ -100,7 +100,7 @@ std::pair<MPPTunnelPtr, String> MPPTaskManager::findAsyncTunnel(const ::mpp::Est
cv.notify_all();
}
}
return {nullptr, fmt::format("Can't find task [{},{},{}] within 10 s.", id.query_id, id.start_ts, id.task_id)};
return {nullptr, fmt::format("Can't find task [{},{},{}] within 10 s.", id.query_id.toDebugString(), id.start_ts, id.task_id)};
}
}
/// don't need to delete the alarm here because registerMPPTask will delete all the related alarm
Expand All @@ -127,7 +127,7 @@ std::pair<MPPTunnelPtr, String> MPPTaskManager::findTunnelWithTimeout(const ::mp
else if (!query_it->second->isInNormalState())
{
/// if the query is aborted, return true to stop waiting timeout.
LOG_WARNING(log, fmt::format("Query {} is aborted, all its tasks are invalid.", id.query_id));
LOG_WARNING(log, fmt::format("Query {} is aborted, all its tasks are invalid.", id.query_id.toDebugString()));
cancelled = true;
error_message = query_it->second->error_message;
return true;
Expand All @@ -147,9 +147,9 @@ std::pair<MPPTunnelPtr, String> MPPTaskManager::findTunnelWithTimeout(const ::mp
return it->second->getTunnel(request);
}

void MPPTaskManager::abortMPPQuery(String query_id, const String & reason, AbortType abort_type)
void MPPTaskManager::abortMPPQuery(const MPPQueryId & query_id, const String & reason, AbortType abort_type)
{
LOG_WARNING(log, fmt::format("Begin to abort query: {}, abort type: {}, reason: {}", query_id, magic_enum::enum_name(abort_type), reason));
LOG_WARNING(log, fmt::format("Begin to abort query: {}, abort type: {}, reason: {}", query_id.toDebugString(), magic_enum::enum_name(abort_type), reason));
MPPQueryTaskSetPtr task_set;
{
/// abort task may take a long time, so first
Expand All @@ -159,12 +159,12 @@ void MPPTaskManager::abortMPPQuery(String query_id, const String & reason, Abort
auto it = mpp_query_map.find(query_id);
if (it == mpp_query_map.end())
{
LOG_WARNING(log, fmt::format("{} does not found in task manager, skip abort", query_id));
LOG_WARNING(log, fmt::format("{} does not found in task manager, skip abort", query_id.toDebugString()));
return;
}
else if (!it->second->isInNormalState())
{
LOG_WARNING(log, fmt::format("{} already in abort process, skip abort", query_id));
LOG_WARNING(log, fmt::format("{} already in abort process, skip abort", query_id.toDebugString()));
return;
}
it->second->state = MPPQueryTaskSet::Aborting;
Expand All @@ -178,7 +178,7 @@ void MPPTaskManager::abortMPPQuery(String query_id, const String & reason, Abort
it->second->alarms.clear();
if (it->second->task_map.empty())
{
LOG_INFO(log, fmt::format("There is no mpp task for {}, finish abort", query_id));
LOG_INFO(log, fmt::format("There is no mpp task for {}, finish abort", query_id.toDebugString()));
removeMPPQueryTaskSet(query_id, true);
cv.notify_all();
return;
Expand All @@ -189,7 +189,7 @@ void MPPTaskManager::abortMPPQuery(String query_id, const String & reason, Abort
}

FmtBuffer fmt_buf;
fmt_buf.fmtAppend("Remaining task in query {} are: ", query_id);
fmt_buf.fmtAppend("Remaining task in query {} are: ", query_id.toDebugString());
for (auto & it : task_set->task_map)
fmt_buf.fmtAppend("{} ", it.first.toString());
LOG_WARNING(log, fmt_buf.toString());
Expand All @@ -200,11 +200,11 @@ void MPPTaskManager::abortMPPQuery(String query_id, const String & reason, Abort
{
std::lock_guard lock(mu);
auto it = mpp_query_map.find(query_id);
RUNTIME_ASSERT(it != mpp_query_map.end(), log, "MPPTaskQuerySet {} should remaining in MPPTaskManager", query_id);
RUNTIME_ASSERT(it != mpp_query_map.end(), log, "MPPTaskQuerySet {} should remaining in MPPTaskManager", query_id.toDebugString());
it->second->state = MPPQueryTaskSet::Aborted;
cv.notify_all();
}
LOG_WARNING(log, "Finish abort query: " + query_id);
LOG_WARNING(log, "Finish abort query: " + query_id.toDebugString());
}

std::pair<bool, String> MPPTaskManager::registerTask(MPPTaskPtr task)
Expand Down Expand Up @@ -282,13 +282,13 @@ String MPPTaskManager::toString()
return res + ")";
}

MPPQueryTaskSetPtr MPPTaskManager::getQueryTaskSetWithoutLock(String query_id)
MPPQueryTaskSetPtr MPPTaskManager::getQueryTaskSetWithoutLock(const MPPQueryId & query_id)
{
auto it = mpp_query_map.find(query_id);
return it == mpp_query_map.end() ? nullptr : it->second;
}

MPPQueryTaskSetPtr MPPTaskManager::getQueryTaskSet(String query_id)
MPPQueryTaskSetPtr MPPTaskManager::getQueryTaskSet(const MPPQueryId & query_id)
{
std::lock_guard lock(mu);
return getQueryTaskSetWithoutLock(query_id);
Expand Down
12 changes: 6 additions & 6 deletions dbms/src/Flash/Mpp/MPPTaskManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ using MPPQueryTaskSetPtr = std::shared_ptr<MPPQueryTaskSet>;
/// a map from the mpp query id to mpp query task set, we use
/// the query_ts + local_query_id + serverID as the query id, because TiDB can't guarantee
/// the uniqueness of the start ts when stale read or set snapshot
using MPPQueryMap = std::unordered_map<String, MPPQueryTaskSetPtr>;
using MPPQueryMap = std::unordered_map<MPPQueryId, MPPQueryTaskSetPtr, MPPQueryIdHash>;

// MPPTaskManger holds all running mpp tasks. It's a single instance holden in Context.
class MPPTaskManager : private boost::noncopyable
Expand All @@ -77,9 +77,9 @@ class MPPTaskManager : private boost::noncopyable

~MPPTaskManager() = default;

MPPQueryTaskSetPtr getQueryTaskSetWithoutLock(String query_id);
MPPQueryTaskSetPtr getQueryTaskSetWithoutLock(const MPPQueryId & query_id);

MPPQueryTaskSetPtr getQueryTaskSet(String query_id);
MPPQueryTaskSetPtr getQueryTaskSet(const MPPQueryId & query_id);

std::pair<bool, String> registerTask(MPPTaskPtr task);

Expand All @@ -93,13 +93,13 @@ class MPPTaskManager : private boost::noncopyable

std::pair<MPPTunnelPtr, String> findAsyncTunnel(const ::mpp::EstablishMPPConnectionRequest * request, EstablishCallData * call_data, grpc::CompletionQueue * cq);

void abortMPPQuery(String query_id, const String & reason, AbortType abort_type);
void abortMPPQuery(const MPPQueryId & query_id, const String & reason, AbortType abort_type);

String toString();

private:
MPPQueryTaskSetPtr addMPPQueryTaskSet(String query_id);
void removeMPPQueryTaskSet(String query_id, bool on_abort);
MPPQueryTaskSetPtr addMPPQueryTaskSet(const MPPQueryId & query_id);
void removeMPPQueryTaskSet(const MPPQueryId & query_id, bool on_abort);
};

} // namespace DB
Loading

0 comments on commit acd3eb6

Please sign in to comment.