Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

TiFlash supports stale read #6459

Merged
merged 8 commits into from
Dec 16, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions dbms/src/Common/TiFlashMetrics.h
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ namespace DB
M(tiflash_schema_apply_duration_seconds, "Bucketed histogram of ddl apply duration", Histogram, \
F(type_ddl_apply_duration, {{"req", "ddl_apply_duration"}}, ExpBuckets{0.001, 2, 20})) \
M(tiflash_raft_read_index_count, "Total number of raft read index", Counter) \
M(tiflash_stale_read_count, "Total number of stale read", Counter) \
M(tiflash_raft_read_index_duration_seconds, "Bucketed histogram of raft read index duration", Histogram, \
F(type_raft_read_index_duration, {{"type", "tmt_raft_read_index_duration"}}, ExpBuckets{0.001, 2, 20})) \
M(tiflash_raft_wait_index_duration_seconds, "Bucketed histogram of raft wait index duration", Histogram, \
Expand Down
5 changes: 5 additions & 0 deletions dbms/src/Debug/DAGProperties.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,11 @@ struct DAGProperties
bool use_broadcast_join = false;
Int32 mpp_partition_num = 1;
Timestamp start_ts = DEFAULT_MAX_READ_TSO;
UInt64 query_ts = 0;
UInt64 server_id = 1;
UInt64 local_query_id = 1;
Int64 task_id = 1;

Int32 mpp_timeout = 10;
};
} // namespace DB
7 changes: 5 additions & 2 deletions dbms/src/Debug/MockComputeServerManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -115,11 +115,14 @@ void MockComputeServerManager::addServer(size_t partition_id, std::unique_ptr<Fl
server_map[partition_id] = std::move(server);
}

void MockComputeServerManager::cancelQuery(size_t start_ts)
void MockComputeServerManager::cancelQuery(const MPPQueryId & query_id)
{
mpp::CancelTaskRequest req;
auto * meta = req.mutable_meta();
meta->set_start_ts(start_ts);
meta->set_query_ts(query_id.query_ts);
meta->set_local_query_id(query_id.local_query_id);
meta->set_server_id(query_id.server_id);
meta->set_start_ts(query_id.start_ts);
mpp::CancelTaskResponse response;
for (const auto & server : server_map)
server.second->flashService()->cancelMPPTaskForTest(&req, &response);
Expand Down
3 changes: 1 addition & 2 deletions dbms/src/Debug/MockComputeServerManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@

namespace DB::tests
{

/** Hold Mock Compute Server to manage the lifetime of them.
* Maintains Mock Compute Server info.
*/
Expand All @@ -49,7 +48,7 @@ class MockComputeServerManager : public ext::Singleton<MockComputeServerManager>

void resetMockMPPServerInfo(size_t partition_num);

void cancelQuery(size_t start_ts);
void cancelQuery(const MPPQueryId & query_id);

static String queryInfo();

Expand Down
9 changes: 9 additions & 0 deletions dbms/src/Debug/MockExecutor/AstToPB.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,18 +57,27 @@ using MPPCtxPtr = std::shared_ptr<MPPCtx>;
struct MPPInfo
{
Timestamp start_ts;
UInt64 query_ts;
UInt64 server_id;
UInt64 local_query_id;
Int64 partition_id;
Int64 task_id;
const std::vector<Int64> sender_target_task_ids;
const std::unordered_map<String, std::vector<Int64>> receiver_source_task_ids_map;

MPPInfo(
Timestamp start_ts_,
UInt64 query_ts_,
UInt64 server_id_,
UInt64 local_query_id_,
Int64 partition_id_,
Int64 task_id_,
const std::vector<Int64> & sender_target_task_ids_,
const std::unordered_map<String, std::vector<Int64>> & receiver_source_task_ids_map_)
: start_ts(start_ts_)
, query_ts(query_ts_)
, server_id(server_id_)
, local_query_id(local_query_id_)
, partition_id(partition_id_)
, task_id(task_id_)
, sender_target_task_ids(sender_target_task_ids_)
Expand Down
3 changes: 3 additions & 0 deletions dbms/src/Debug/MockExecutor/ExchangeReceiverBinder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@ bool ExchangeReceiverBinder::toTiPBExecutor(tipb::Executor * tipb_executor, int3
{
mpp::TaskMeta meta;
meta.set_start_ts(mpp_info.start_ts);
meta.set_query_ts(mpp_info.query_ts);
meta.set_server_id(mpp_info.server_id);
meta.set_local_query_id(mpp_info.local_query_id);
meta.set_task_id(it->second[i]);
meta.set_partition_id(i);
auto addr = context.isMPPTest() ? tests::MockComputeServerManager::instance().getServerConfigMap()[i].addr : Debug::LOCAL_HOST;
Expand Down
3 changes: 3 additions & 0 deletions dbms/src/Debug/MockExecutor/ExchangeSenderBinder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ bool ExchangeSenderBinder::toTiPBExecutor(tipb::Executor * tipb_executor, int32_
{
mpp::TaskMeta meta;
meta.set_start_ts(mpp_info.start_ts);
meta.set_query_ts(mpp_info.query_ts);
meta.set_server_id(mpp_info.server_id);
meta.set_local_query_id(mpp_info.local_query_id);
meta.set_task_id(task_id);
meta.set_partition_id(i);
auto addr = context.isMPPTest() ? tests::MockComputeServerManager::instance().getServerConfigMap()[i++].addr : Debug::LOCAL_HOST;
Expand Down
4 changes: 2 additions & 2 deletions dbms/src/Debug/dbgFuncMisc.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ inline size_t getReadTSOForLog(const String & line)
{
std::regex rx(R"((0|[1-9][0-9]*))");
std::smatch m;
// Rely on that MPP task prefix "MPP<query:435802637197639681,task:1>"
auto pos = line.find("query:");
// Rely on that MPP task prefix "MPP<query:<query_ts:1671124209981679458, local_query_id:42578432, server_id:3340035, start_ts:438075169172357120>,task_id:42578433>"
auto pos = line.find(", start_ts:");
hehechen marked this conversation as resolved.
Show resolved Hide resolved
if (pos != std::string::npos && regex_search(line.cbegin() + pos, line.cend(), m, rx))
{
return std::stoul(m[1]);
Expand Down
5 changes: 4 additions & 1 deletion dbms/src/Debug/dbgQueryCompiler.h
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,9 @@ struct QueryFragment
{
MPPInfo mpp_info(
properties.start_ts,
properties.query_ts,
properties.server_id,
properties.local_query_id,
partition_id,
task_ids[partition_id],
sender_target_task_ids,
Expand All @@ -141,7 +144,7 @@ struct QueryFragment
}
else
{
MPPInfo mpp_info(properties.start_ts, /*partition_id*/ -1, /*task_id*/ -1, /*sender_target_task_ids*/ {}, /*receiver_source_task_ids_map*/ {});
MPPInfo mpp_info(properties.start_ts, properties.query_ts, properties.server_id, properties.local_query_id, /*partition_id*/ -1, /*task_id*/ -1, /*sender_target_task_ids*/ {}, /*receiver_source_task_ids_map*/ {});
ret.push_back(toQueryTask(properties, mpp_info, context));
}
return ret;
Expand Down
15 changes: 15 additions & 0 deletions dbms/src/Debug/dbgQueryExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@ BlockInputStreamPtr constructExchangeReceiverStream(Context & context, tipb::Exc

mpp::TaskMeta root_tm;
root_tm.set_start_ts(properties.start_ts);
root_tm.set_query_ts(properties.query_ts);
root_tm.set_local_query_id(properties.local_query_id);
root_tm.set_server_id(properties.server_id);
root_tm.set_address(root_addr);
root_tm.set_task_id(-1);
root_tm.set_partition_id(-1);
Expand Down Expand Up @@ -71,6 +74,9 @@ BlockInputStreamPtr prepareRootExchangeReceiver(Context & context, const DAGProp
{
mpp::TaskMeta tm;
tm.set_start_ts(properties.start_ts);
tm.set_query_ts(properties.query_ts);
tm.set_local_query_id(properties.local_query_id);
tm.set_server_id(properties.server_id);
tm.set_address(Debug::LOCAL_HOST);
tm.set_task_id(root_task_id);
tm.set_partition_id(-1);
Expand All @@ -84,6 +90,9 @@ void prepareExchangeReceiverMetaWithMultipleContext(tipb::ExchangeReceiver & tip
{
mpp::TaskMeta tm;
tm.set_start_ts(properties.start_ts);
tm.set_query_ts(properties.query_ts);
tm.set_local_query_id(properties.local_query_id);
tm.set_server_id(properties.server_id);
tm.set_address(addr);
tm.set_task_id(task_id);
tm.set_partition_id(-1);
Expand All @@ -109,6 +118,9 @@ void prepareDispatchTaskRequest(QueryTask & task, std::shared_ptr<mpp::DispatchT
}
auto * tm = req->mutable_meta();
tm->set_start_ts(properties.start_ts);
tm->set_query_ts(properties.query_ts);
tm->set_local_query_id(properties.local_query_id);
tm->set_server_id(properties.server_id);
tm->set_partition_id(task.partition_id);
tm->set_address(addr);
tm->set_task_id(task.task_id);
Expand All @@ -128,6 +140,9 @@ void prepareDispatchTaskRequestWithMultipleContext(QueryTask & task, std::shared
}
auto * tm = req->mutable_meta();
tm->set_start_ts(properties.start_ts);
tm->set_query_ts(properties.query_ts);
tm->set_local_query_id(properties.local_query_id);
tm->set_server_id(properties.server_id);
tm->set_partition_id(task.partition_id);
tm->set_address(addr);
tm->set_task_id(task.task_id);
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Flash/Coprocessor/DAGContext.h
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ class DAGContext
, flags(dag_request->flags())
, sql_mode(dag_request->sql_mode())
, mpp_task_meta(meta_)
, mpp_task_id(mpp_task_meta.start_ts(), mpp_task_meta.task_id())
, mpp_task_id(mpp_task_meta)
, max_recorded_error_count(getMaxErrorCount(*dag_request))
, warnings(max_recorded_error_count)
, warning_count(0)
Expand Down
4 changes: 2 additions & 2 deletions dbms/src/Flash/FlashService.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -365,7 +365,7 @@ grpc::Status FlashService::CancelMPPTask(

auto & tmt_context = context->getTMTContext();
auto task_manager = tmt_context.getMPPTaskManager();
task_manager->abortMPPQuery(request->meta().start_ts(), "Receive cancel request from TiDB", AbortType::ONCANCELLATION);
task_manager->abortMPPQuery(MPPQueryId(request->meta()), "Receive cancel request from TiDB", AbortType::ONCANCELLATION);
return grpc::Status::OK;
}

Expand Down Expand Up @@ -407,7 +407,7 @@ ::grpc::Status FlashService::cancelMPPTaskForTest(const ::mpp::CancelTaskRequest
}
auto & tmt_context = context->getTMTContext();
auto task_manager = tmt_context.getMPPTaskManager();
task_manager->abortMPPQuery(request->meta().start_ts(), "Receive cancel request from GTest", AbortType::ONCANCELLATION);
task_manager->abortMPPQuery(MPPQueryId(request->meta()), "Receive cancel request from GTest", AbortType::ONCANCELLATION);
return grpc::Status::OK;
}

Expand Down
8 changes: 4 additions & 4 deletions dbms/src/Flash/Mpp/MPPTask.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ extern const char force_no_local_region_for_mpp_task[];

MPPTask::MPPTask(const mpp::TaskMeta & meta_, const ContextPtr & context_)
: meta(meta_)
, id(meta.start_ts(), meta.task_id())
, id(meta)
, context(context_)
, manager(context_->getTMTContext().getMPPTaskManager().get())
, schedule_entry(manager, id)
Expand Down Expand Up @@ -137,7 +137,7 @@ void MPPTask::registerTunnels(const mpp::DispatchTaskRequest & task_request)
LOG_DEBUG(log, "begin to register the tunnel {}, is_local: {}, is_async: {}", tunnel->id(), is_local, is_async);
if (status != INITIALIZING)
throw Exception(fmt::format("The tunnel {} can not be registered, because the task is not in initializing state", tunnel->id()));
tunnel_set_local->registerTunnel(MPPTaskId{task_meta.start_ts(), task_meta.task_id()}, tunnel);
tunnel_set_local->registerTunnel(MPPTaskId(task_meta), tunnel);
if (!dag_context->isRootMPPTask())
{
FAIL_POINT_TRIGGER_EXCEPTION(FailPoints::exception_during_mpp_register_tunnel_for_non_root_mpp_task);
Expand Down Expand Up @@ -202,7 +202,7 @@ std::pair<MPPTunnelPtr, String> MPPTask::getTunnel(const ::mpp::EstablishMPPConn
return {nullptr, err_msg};
}

MPPTaskId receiver_id{request->receiver_meta().start_ts(), request->receiver_meta().task_id()};
MPPTaskId receiver_id(request->receiver_meta());
RUNTIME_ASSERT(tunnel_set != nullptr, log, "mpp task without tunnel set");
auto tunnel_ptr = tunnel_set->getTunnelByReceiverTaskId(receiver_id);
if (tunnel_ptr == nullptr)
Expand Down Expand Up @@ -450,7 +450,7 @@ void MPPTask::runImpl()
void MPPTask::handleError(const String & error_msg)
{
auto updated_msg = fmt::format("From {}: {}", id.toString(), error_msg);
manager->abortMPPQuery(id.start_ts, updated_msg, AbortType::ONERROR);
manager->abortMPPQuery(id.query_id, updated_msg, AbortType::ONERROR);
if (!registered)
// if the task is not registered, need to cancel it explicitly
abort(error_msg, AbortType::ONERROR);
Expand Down
76 changes: 74 additions & 2 deletions dbms/src/Flash/Mpp/MPPTaskId.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,19 +13,91 @@
// limitations under the License.

#include <Flash/Mpp/MPPTaskId.h>
#include <common/likely.h>
#include <fmt/core.h>

namespace DB
{
bool isOldVersion(const MPPQueryId & mpp_query_id)
{
return mpp_query_id.query_ts == 0 && mpp_query_id.local_query_id == 0 && mpp_query_id.server_id == 0;
}


bool MPPQueryId::operator<(const MPPQueryId & mpp_query_id) const
{
// compare with MPP query generated by TiDB that version less than v6.6
bool left_old_version = isOldVersion(*this);
bool right_old_version = isOldVersion(mpp_query_id);
if (unlikely(left_old_version && right_old_version))
{
return start_ts < mpp_query_id.start_ts;
}
if (unlikely(left_old_version))
{
return true;
}
if (unlikely(right_old_version))
{
return false;
}
// compare with MPP query generated by TiDB that version after v6.6
if (query_ts != mpp_query_id.query_ts)
{
return query_ts < mpp_query_id.query_ts;
}
if (server_id == mpp_query_id.server_id)
{
return local_query_id < mpp_query_id.local_query_id;
}
// now we can't compare reasonably, just choose one randomly by hash.
auto lhash = MPPQueryIdHash()(*this);
auto rhash = MPPQueryIdHash()(mpp_query_id);
if (lhash != rhash)
{
return lhash < rhash;
}
// hash values are same, just compare the rest fields.
if (local_query_id != mpp_query_id.local_query_id)
{
return local_query_id < mpp_query_id.local_query_id;
}
return server_id < mpp_query_id.server_id;
}
bool MPPQueryId::operator==(const MPPQueryId & rid) const
{
return query_ts == rid.query_ts && local_query_id == rid.local_query_id && server_id == rid.server_id && start_ts == rid.start_ts;
}
bool MPPQueryId::operator!=(const MPPQueryId & rid) const
{
return !(*this == rid);
}
bool MPPQueryId::operator<=(const MPPQueryId & rid) const
{
return *this < rid || *this == rid;
}

size_t MPPQueryIdHash::operator()(MPPQueryId const & mpp_query_id) const noexcept
{
if (unlikely(isOldVersion(mpp_query_id)))
{
return std::hash<UInt64>()(mpp_query_id.start_ts);
}
return std::hash<UInt64>()(mpp_query_id.query_ts) ^ std::hash<UInt64>()(mpp_query_id.local_query_id) ^ std::hash<UInt64>()(mpp_query_id.server_id);
}

String MPPTaskId::toString() const
{
return isUnknown() ? "MPP<query:N/A,task:N/A>" : fmt::format("MPP<query:{},task:{}>", start_ts, task_id);
return isUnknown() ? "MPP<query_id:N/A,task_id:N/A>" : fmt::format("MPP<query:{},task_id:{}>", query_id.toString(), task_id);
}

const MPPTaskId MPPTaskId::unknown_mpp_task_id = MPPTaskId{};

constexpr UInt64 MAX_UINT64 = std::numeric_limits<UInt64>::max();
const MPPQueryId MPPTaskId::Max_Query_Id = MPPQueryId(MAX_UINT64, MAX_UINT64, MAX_UINT64, MAX_UINT64);

bool operator==(const MPPTaskId & lid, const MPPTaskId & rid)
{
return lid.start_ts == rid.start_ts && lid.task_id == rid.task_id;
return lid.query_id == rid.query_id && lid.task_id == rid.task_id;
}
} // namespace DB
Loading