Skip to content

Commit

Permalink
Merge branch 'master' of https://github.com/pingcap/tiflash into fix_cpus
Browse files Browse the repository at this point in the history
  • Loading branch information
xzhangxian1008 committed Dec 14, 2022
2 parents 4e8bfb4 + fa67119 commit 5f7408d
Show file tree
Hide file tree
Showing 7 changed files with 190 additions and 19 deletions.
3 changes: 3 additions & 0 deletions dbms/src/Common/TiFlashMetrics.h
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,9 @@ namespace DB
F(type_merged_task, {{"type", "merged_task"}}, ExpBuckets{0.001, 2, 20})) \
M(tiflash_mpp_task_manager, "The gauge of mpp task manager", Gauge, \
F(type_mpp_query_count, {"type", "mpp_query_count"})) \
M(tiflash_exchange_queueing_data_bytes, "Total bytes of data contained in the queue", Gauge, \
F(type_send, {{"type", "send_queue"}}), \
F(type_receive, {{"type", "recv_queue"}})) \
// clang-format on

/// Buckets with boundaries [start * base^0, start * base^1, ..., start * base^(size-1)]
Expand Down
1 change: 1 addition & 0 deletions dbms/src/Flash/EstablishCall.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,7 @@ void EstablishCallData::trySendOneMsg()
switch (async_tunnel_sender->pop(res, this))
{
case GRPCSendQueueRes::OK:
async_tunnel_sender->subDataSizeMetric(res->getPacket().ByteSizeLong());
/// Note: has to switch the memory tracker before `write`
/// because after `write`, `async_tunnel_sender` can be destroyed at any time
/// so there is a risk that `res` is destructed after `async_tunnel_sender`
Expand Down
38 changes: 34 additions & 4 deletions dbms/src/Flash/Mpp/ExchangeReceiver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,26 @@ enum class AsyncRequestStage
FINISHED,
};

namespace ExchangeReceiverMetric
{
/// Account for `size` bytes entering the receive queue: bump both the
/// per-receiver atomic counter and the global receive-side gauge.
inline void addDataSizeMetric(std::atomic<Int64> & data_size_in_queue, size_t size)
{
    data_size_in_queue += size;
    GET_METRIC(tiflash_exchange_queueing_data_bytes, type_receive).Increment(size);
}

/// Account for `size` bytes leaving the receive queue (mirror of add).
inline void subDataSizeMetric(std::atomic<Int64> & data_size_in_queue, size_t size)
{
    data_size_in_queue -= size;
    GET_METRIC(tiflash_exchange_queueing_data_bytes, type_receive).Decrement(size);
}

/// Remove everything still accounted in `data_size_in_queue` from the global
/// gauge (teardown path). NOTE(review): the atomic counter itself is not
/// reset here, so this should be called at most once per receiver — confirm.
inline void clearDataSizeMetric(std::atomic<Int64> & data_size_in_queue)
{
    const auto remaining = data_size_in_queue.load();
    GET_METRIC(tiflash_exchange_queueing_data_bytes, type_receive).Decrement(remaining);
}
} // namespace ExchangeReceiverMetric

using Clock = std::chrono::system_clock;
using TimePoint = Clock::time_point;

Expand All @@ -176,12 +196,14 @@ class AsyncRequestHandler : public UnaryCallback<bool>
std::vector<MsgChannelPtr> * msg_channels_,
const std::shared_ptr<RPCContext> & context,
const Request & req,
const String & req_id)
const String & req_id,
std::atomic<Int64> * data_size_in_queue_)
: rpc_context(context)
, cq(&(GRPCCompletionQueuePool::global_instance->pickQueue()))
, request(&req)
, notify_queue(queue)
, msg_channels(msg_channels_)
, data_size_in_queue(data_size_in_queue_)
, req_info(fmt::format("tunnel{}+{}", req.send_task_id, req.recv_task_id))
, log(Logger::get(req_id, req_info))
{
Expand Down Expand Up @@ -369,6 +391,9 @@ class AsyncRequestHandler : public UnaryCallback<bool>
*msg_channels,
log))
return false;

ExchangeReceiverMetric::addDataSizeMetric(*data_size_in_queue, packet->getPacket().ByteSizeLong());

// can't reuse packet since it is sent to readers.
packet = std::make_shared<TrackedMppDataPacket>();
}
Expand All @@ -387,6 +412,7 @@ class AsyncRequestHandler : public UnaryCallback<bool>
const Request * request; // won't be null
MPMCQueue<Self *> * notify_queue; // won't be null
std::vector<MsgChannelPtr> * msg_channels; // won't be null
std::atomic<Int64> * data_size_in_queue; // won't be null

String req_info;
bool meet_error = false;
Expand Down Expand Up @@ -423,16 +449,15 @@ ExchangeReceiverBase<RPCContext>::ExchangeReceiverBase(
, state(ExchangeReceiverState::NORMAL)
, exc_log(Logger::get(req_id, executor_id))
, collected(false)
, data_size_in_queue(0)
, disaggregated_dispatch_reqs(disaggregated_dispatch_reqs_)
{
try
{
if (enable_fine_grained_shuffle_flag)
{
for (size_t i = 0; i < output_stream_count; ++i)
{
msg_channels.push_back(std::make_unique<MPMCQueue<std::shared_ptr<ReceivedMessage>>>(max_buffer_size));
}
}
else
{
Expand Down Expand Up @@ -465,6 +490,7 @@ ExchangeReceiverBase<RPCContext>::~ExchangeReceiverBase()
{
close();
thread_manager->wait();
ExchangeReceiverMetric::clearDataSizeMetric(data_size_in_queue);
}
catch (...)
{
Expand Down Expand Up @@ -545,7 +571,7 @@ void ExchangeReceiverBase<RPCContext>::reactor(const std::vector<Request> & asyn
std::vector<std::unique_ptr<AsyncHandler>> handlers;
handlers.reserve(alive_async_connections);
for (const auto & req : async_requests)
handlers.emplace_back(std::make_unique<AsyncHandler>(&ready_requests, &msg_channels, rpc_context, req, exc_log->identifier()));
handlers.emplace_back(std::make_unique<AsyncHandler>(&ready_requests, &msg_channels, rpc_context, req, exc_log->identifier(), &data_size_in_queue));

while (alive_async_connections > 0)
{
Expand Down Expand Up @@ -617,6 +643,8 @@ void ExchangeReceiverBase<RPCContext>::readLoop(const Request & req)
local_err_msg = fmt::format("Push mpp packet failed. {}", getStatusString());
break;
}

ExchangeReceiverMetric::addDataSizeMetric(data_size_in_queue, packet->getPacket().ByteSizeLong());
}
// if meet error, such as decode packet fails, it will not retry.
if (meet_error)
Expand Down Expand Up @@ -713,6 +741,8 @@ ExchangeReceiverResult ExchangeReceiverBase<RPCContext>::nextResult(
assert(recv_msg != nullptr);
if (unlikely(recv_msg->error_ptr != nullptr))
return ExchangeReceiverResult::newError(recv_msg->source_index, recv_msg->req_info, recv_msg->error_ptr->msg());

ExchangeReceiverMetric::subDataSizeMetric(data_size_in_queue, recv_msg->packet->getPacket().ByteSizeLong());
return toDecodeResult(block_queue, header, recv_msg, decoder_ptr);
}
}
Expand Down
2 changes: 2 additions & 0 deletions dbms/src/Flash/Mpp/ExchangeReceiver.h
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,8 @@ class ExchangeReceiverBase
bool collected = false;
int thread_count = 0;

std::atomic<Int64> data_size_in_queue;

// For tiflash_compute node, need to send MPPTask to tiflash_storage node.
std::vector<StorageDisaggregated::RequestAndRegionIDs> disaggregated_dispatch_reqs;
};
Expand Down
18 changes: 12 additions & 6 deletions dbms/src/Flash/Mpp/MPPTunnel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ String tunnelSenderModeToString(TunnelSenderMode mode)
}

// Update metric for tunnel's response bytes
inline void updateMetric(size_t pushed_data_size, TunnelSenderMode mode)
void updateMetric(std::atomic<Int64> & data_size_in_queue, size_t pushed_data_size, TunnelSenderMode mode)
{
switch (mode)
{
Expand All @@ -59,6 +59,7 @@ inline void updateMetric(size_t pushed_data_size, TunnelSenderMode mode)
default:
throw DB::Exception("Illegal TunnelSenderMode");
}
MPPTunnelMetric::addDataSizeMetric(data_size_in_queue, pushed_data_size);
}
} // namespace

Expand Down Expand Up @@ -86,6 +87,7 @@ MPPTunnel::MPPTunnel(
, mem_tracker(current_memory_tracker ? current_memory_tracker->shared_from_this() : nullptr)
, queue_size(std::max(5, input_steams_num_ * 5)) // MPMCQueue can benefit from a slightly larger queue size
, log(Logger::get(req_id, tunnel_id))
, data_size_in_queue(0)
{
RUNTIME_ASSERT(!(is_local_ && is_async_), log, "is_local: {}, is_async: {}.", is_local_, is_async_);
if (is_local_)
Expand All @@ -105,6 +107,7 @@ MPPTunnel::~MPPTunnel()
try
{
close("", true);
MPPTunnelMetric::clearDataSizeMetric(data_size_in_queue);
}
catch (...)
{
Expand Down Expand Up @@ -160,7 +163,7 @@ void MPPTunnel::write(TrackedMppDataPacketPtr && data)
auto pushed_data_size = data->getPacket().ByteSizeLong();
if (tunnel_sender->push(std::move(data)))
{
updateMetric(pushed_data_size, mode);
updateMetric(data_size_in_queue, pushed_data_size, mode);
connection_profile_info.bytes += pushed_data_size;
connection_profile_info.packets += 1;
return;
Expand Down Expand Up @@ -196,14 +199,14 @@ void MPPTunnel::connect(PacketWriter * writer)
case TunnelSenderMode::LOCAL:
{
RUNTIME_ASSERT(writer == nullptr, log);
local_tunnel_sender = std::make_shared<LocalTunnelSender>(queue_size, mem_tracker, log, tunnel_id);
local_tunnel_sender = std::make_shared<LocalTunnelSender>(queue_size, mem_tracker, log, tunnel_id, &data_size_in_queue);
tunnel_sender = local_tunnel_sender;
break;
}
case TunnelSenderMode::SYNC_GRPC:
{
RUNTIME_ASSERT(writer != nullptr, log, "Sync writer shouldn't be null");
sync_tunnel_sender = std::make_shared<SyncTunnelSender>(queue_size, mem_tracker, log, tunnel_id);
sync_tunnel_sender = std::make_shared<SyncTunnelSender>(queue_size, mem_tracker, log, tunnel_id, &data_size_in_queue);
sync_tunnel_sender->startSendThread(writer);
tunnel_sender = sync_tunnel_sender;
break;
Expand Down Expand Up @@ -231,11 +234,11 @@ void MPPTunnel::connectAsync(IAsyncCallData * call_data)
auto kick_func_for_test = call_data->getKickFuncForTest();
if (unlikely(kick_func_for_test.has_value()))
{
async_tunnel_sender = std::make_shared<AsyncTunnelSender>(queue_size, mem_tracker, log, tunnel_id, kick_func_for_test.value());
async_tunnel_sender = std::make_shared<AsyncTunnelSender>(queue_size, mem_tracker, log, tunnel_id, kick_func_for_test.value(), &data_size_in_queue);
}
else
{
async_tunnel_sender = std::make_shared<AsyncTunnelSender>(queue_size, mem_tracker, log, tunnel_id, call_data->grpcCall());
async_tunnel_sender = std::make_shared<AsyncTunnelSender>(queue_size, mem_tracker, log, tunnel_id, call_data->grpcCall(), &data_size_in_queue);
}
call_data->attachAsyncTunnelSender(async_tunnel_sender);
tunnel_sender = async_tunnel_sender;
Expand Down Expand Up @@ -345,6 +348,7 @@ void SyncTunnelSender::sendJob(PacketWriter * writer)
TrackedMppDataPacketPtr res;
while (send_queue.pop(res) == MPMCQueueResult::OK)
{
MPPTunnelMetric::subDataSizeMetric(*data_size_in_queue, res->getPacket().ByteSizeLong());
if (!writer->write(res->packet))
{
err_msg = "grpc writes failed.";
Expand Down Expand Up @@ -389,6 +393,8 @@ std::shared_ptr<DB::TrackedMppDataPacket> LocalTunnelSender::readForLocal()
auto result = send_queue.pop(res);
if (result == MPMCQueueResult::OK)
{
MPPTunnelMetric::subDataSizeMetric(*data_size_in_queue, res->getPacket().ByteSizeLong());

// switch tunnel's memory tracker into receiver's
res->switchMemTracker(current_memory_tracker);
return res;
Expand Down
45 changes: 38 additions & 7 deletions dbms/src/Flash/Mpp/MPPTunnel.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include <Common/Logger.h>
#include <Common/MPMCQueue.h>
#include <Common/ThreadManager.h>
#include <Common/TiFlashMetrics.h>
#include <Flash/FlashService.h>
#include <Flash/Mpp/GRPCSendQueue.h>
#include <Flash/Mpp/PacketWriter.h>
Expand Down Expand Up @@ -49,6 +50,26 @@ namespace tests
class TestMPPTunnel;
} // namespace tests

namespace MPPTunnelMetric
{
/// Account for `size` bytes entering the tunnel's send queue: bump both the
/// per-tunnel atomic counter and the global send-side gauge.
inline void addDataSizeMetric(std::atomic<Int64> & data_size_in_queue, size_t size)
{
    data_size_in_queue += size;
    GET_METRIC(tiflash_exchange_queueing_data_bytes, type_send).Increment(size);
}

/// Account for `size` bytes leaving the send queue (mirror of add).
inline void subDataSizeMetric(std::atomic<Int64> & data_size_in_queue, size_t size)
{
    data_size_in_queue -= size;
    GET_METRIC(tiflash_exchange_queueing_data_bytes, type_send).Decrement(size);
}

/// Remove everything still accounted in `data_size_in_queue` from the global
/// gauge (teardown path). NOTE(review): the atomic counter itself is not
/// reset here, so this should be called at most once per tunnel — confirm.
inline void clearDataSizeMetric(std::atomic<Int64> & data_size_in_queue)
{
    const auto remaining = data_size_in_queue.load();
    GET_METRIC(tiflash_exchange_queueing_data_bytes, type_send).Decrement(remaining);
}
} // namespace MPPTunnelMetric

class IAsyncCallData;

enum class TunnelSenderMode
Expand All @@ -64,11 +85,12 @@ class TunnelSender : private boost::noncopyable
{
public:
virtual ~TunnelSender() = default;
TunnelSender(size_t queue_size, MemoryTrackerPtr & memory_tracker_, const LoggerPtr & log_, const String & tunnel_id_)
TunnelSender(size_t queue_size, MemoryTrackerPtr & memory_tracker_, const LoggerPtr & log_, const String & tunnel_id_, std::atomic<Int64> * data_size_in_queue_)
: memory_tracker(memory_tracker_)
, send_queue(MPMCQueue<TrackedMppDataPacketPtr>(queue_size))
, log(log_)
, tunnel_id(tunnel_id_)
, data_size_in_queue(data_size_in_queue_)
, send_queue(MPMCQueue<TrackedMppDataPacketPtr>(queue_size))
{
}

Expand Down Expand Up @@ -137,11 +159,14 @@ class TunnelSender : private boost::noncopyable
std::shared_future<String> future;
std::atomic<bool> msg_has_set{false};
};

MemoryTrackerPtr memory_tracker;
MPMCQueue<TrackedMppDataPacketPtr> send_queue;
ConsumerState consumer_state;
const LoggerPtr log;
const String tunnel_id;

std::atomic<Int64> * data_size_in_queue; // From MppTunnel
MPMCQueue<TrackedMppDataPacketPtr> send_queue;
};

/// SyncTunnelSender maintains a new thread itself to consume and send data
Expand All @@ -163,14 +188,14 @@ class SyncTunnelSender : public TunnelSender
class AsyncTunnelSender : public TunnelSender
{
public:
AsyncTunnelSender(size_t queue_size, MemoryTrackerPtr & memory_tracker, const LoggerPtr & log_, const String & tunnel_id_, grpc_call * call_)
: TunnelSender(0, memory_tracker, log_, tunnel_id_)
AsyncTunnelSender(size_t queue_size, MemoryTrackerPtr & memory_tracker, const LoggerPtr & log_, const String & tunnel_id_, grpc_call * call_, std::atomic<Int64> * data_size_in_queue)
: TunnelSender(0, memory_tracker, log_, tunnel_id_, data_size_in_queue)
, queue(queue_size, call_, log_)
{}

/// For gtest usage.
AsyncTunnelSender(size_t queue_size, MemoryTrackerPtr & memoryTracker, const LoggerPtr & log_, const String & tunnel_id_, GRPCKickFunc func)
: TunnelSender(0, memoryTracker, log_, tunnel_id_)
AsyncTunnelSender(size_t queue_size, MemoryTrackerPtr & memoryTracker, const LoggerPtr & log_, const String & tunnel_id_, GRPCKickFunc func, std::atomic<Int64> * data_size_in_queue)
: TunnelSender(0, memoryTracker, log_, tunnel_id_, data_size_in_queue)
, queue(queue_size, func)
{}

Expand Down Expand Up @@ -199,6 +224,11 @@ class AsyncTunnelSender : public TunnelSender
return queue.pop(data, new_tag);
}

/// Deduct `size` bytes from this tunnel's queued-data accounting
/// (shared atomic counter plus the send-queue gauge) once a packet
/// has been popped by the async consumer.
void subDataSizeMetric(size_t size)
{
::DB::MPPTunnelMetric::subDataSizeMetric(*data_size_in_queue, size);
}

private:
GRPCSendQueue<TrackedMppDataPacketPtr> queue;
};
Expand Down Expand Up @@ -347,6 +377,7 @@ class MPPTunnel : private boost::noncopyable
SyncTunnelSenderPtr sync_tunnel_sender;
AsyncTunnelSenderPtr async_tunnel_sender;
LocalTunnelSenderPtr local_tunnel_sender;
std::atomic<Int64> data_size_in_queue;
};
using MPPTunnelPtr = std::shared_ptr<MPPTunnel>;

Expand Down
Loading

0 comments on commit 5f7408d

Please sign in to comment.