diff --git a/dbms/src/Common/TiFlashMetrics.h b/dbms/src/Common/TiFlashMetrics.h index 4c27a0f3025..8accc59b155 100644 --- a/dbms/src/Common/TiFlashMetrics.h +++ b/dbms/src/Common/TiFlashMetrics.h @@ -262,6 +262,9 @@ namespace DB F(type_merged_task, {{"type", "merged_task"}}, ExpBuckets{0.001, 2, 20})) \ M(tiflash_mpp_task_manager, "The gauge of mpp task manager", Gauge, \ F(type_mpp_query_count, {"type", "mpp_query_count"})) \ + M(tiflash_exchange_queueing_data_bytes, "Total bytes of data contained in the queue", Gauge, \ + F(type_send, {{"type", "send_queue"}}), \ + F(type_receive, {{"type", "recv_queue"}})) \ // clang-format on /// Buckets with boundaries [start * base^0, start * base^1, ..., start * base^(size-1)] diff --git a/dbms/src/Flash/EstablishCall.cpp b/dbms/src/Flash/EstablishCall.cpp index c9b1bbe95c6..df7b4e41a33 100644 --- a/dbms/src/Flash/EstablishCall.cpp +++ b/dbms/src/Flash/EstablishCall.cpp @@ -233,6 +233,7 @@ void EstablishCallData::trySendOneMsg() switch (async_tunnel_sender->pop(res, this)) { case GRPCSendQueueRes::OK: + async_tunnel_sender->subDataSizeMetric(res->getPacket().ByteSizeLong()); /// Note: has to switch the memory tracker before `write` /// because after `write`, `async_tunnel_sender` can be destroyed at any time /// so there is a risk that `res` is destructed after `aysnc_tunnel_sender` diff --git a/dbms/src/Flash/Mpp/ExchangeReceiver.cpp b/dbms/src/Flash/Mpp/ExchangeReceiver.cpp index adbbad0e48b..26e2ccda376 100644 --- a/dbms/src/Flash/Mpp/ExchangeReceiver.cpp +++ b/dbms/src/Flash/Mpp/ExchangeReceiver.cpp @@ -155,6 +155,26 @@ enum class AsyncRequestStage FINISHED, }; +namespace ExchangeReceiverMetric +{ +inline void addDataSizeMetric(std::atomic & data_size_in_queue, size_t size) +{ + data_size_in_queue.fetch_add(size); + GET_METRIC(tiflash_exchange_queueing_data_bytes, type_receive).Increment(size); +} + +inline void subDataSizeMetric(std::atomic & data_size_in_queue, size_t size) +{ + data_size_in_queue.fetch_sub(size); + GET_METRIC(tiflash_exchange_queueing_data_bytes, type_receive).Decrement(size); +} + +inline void clearDataSizeMetric(std::atomic & data_size_in_queue) +{ + GET_METRIC(tiflash_exchange_queueing_data_bytes, type_receive).Decrement(data_size_in_queue.load()); +} +} // namespace ExchangeReceiverMetric + using Clock = std::chrono::system_clock; using TimePoint = Clock::time_point; @@ -176,12 +196,14 @@ class AsyncRequestHandler : public UnaryCallback std::vector * msg_channels_, const std::shared_ptr & context, const Request & req, - const String & req_id) + const String & req_id, + std::atomic * data_size_in_queue_) : rpc_context(context) , cq(&(GRPCCompletionQueuePool::global_instance->pickQueue())) , request(&req) , notify_queue(queue) , msg_channels(msg_channels_) + , data_size_in_queue(data_size_in_queue_) , req_info(fmt::format("tunnel{}+{}", req.send_task_id, req.recv_task_id)) , log(Logger::get(req_id, req_info)) { @@ -369,6 +391,9 @@ class AsyncRequestHandler : public UnaryCallback *msg_channels, log)) return false; + + ExchangeReceiverMetric::addDataSizeMetric(*data_size_in_queue, packet->getPacket().ByteSizeLong()); + // can't reuse packet since it is sent to readers. packet = std::make_shared(); } @@ -387,6 +412,7 @@ class AsyncRequestHandler : public UnaryCallback const Request * request; // won't be null MPMCQueue * notify_queue; // won't be null std::vector * msg_channels; // won't be null + std::atomic * data_size_in_queue; // won't be null String req_info; bool meet_error = false; @@ -423,6 +449,7 @@ ExchangeReceiverBase::ExchangeReceiverBase( , state(ExchangeReceiverState::NORMAL) , exc_log(Logger::get(req_id, executor_id)) , collected(false) + , data_size_in_queue(0) , disaggregated_dispatch_reqs(disaggregated_dispatch_reqs_) { try @@ -430,9 +457,7 @@ ExchangeReceiverBase::ExchangeReceiverBase( if (enable_fine_grained_shuffle_flag) { for (size_t i = 0; i < output_stream_count; ++i) - { msg_channels.push_back(std::make_unique>>(max_buffer_size)); - } } else { @@ -465,6 +490,7 @@ ExchangeReceiverBase::~ExchangeReceiverBase() { close(); thread_manager->wait(); + ExchangeReceiverMetric::clearDataSizeMetric(data_size_in_queue); } catch (...) { @@ -545,7 +571,7 @@ void ExchangeReceiverBase::reactor(const std::vector & asyn std::vector> handlers; handlers.reserve(alive_async_connections); for (const auto & req : async_requests) - handlers.emplace_back(std::make_unique(&ready_requests, &msg_channels, rpc_context, req, exc_log->identifier())); + handlers.emplace_back(std::make_unique(&ready_requests, &msg_channels, rpc_context, req, exc_log->identifier(), &data_size_in_queue)); while (alive_async_connections > 0) { @@ -617,6 +643,8 @@ void ExchangeReceiverBase::readLoop(const Request & req) local_err_msg = fmt::format("Push mpp packet failed. {}", getStatusString()); break; } + + ExchangeReceiverMetric::addDataSizeMetric(data_size_in_queue, packet->getPacket().ByteSizeLong()); } // if meet error, such as decode packet fails, it will not retry. if (meet_error) @@ -713,6 +741,8 @@ ExchangeReceiverResult ExchangeReceiverBase::nextResult( assert(recv_msg != nullptr); if (unlikely(recv_msg->error_ptr != nullptr)) return ExchangeReceiverResult::newError(recv_msg->source_index, recv_msg->req_info, recv_msg->error_ptr->msg()); + + ExchangeReceiverMetric::subDataSizeMetric(data_size_in_queue, recv_msg->packet->getPacket().ByteSizeLong()); return toDecodeResult(block_queue, header, recv_msg, decoder_ptr); } } diff --git a/dbms/src/Flash/Mpp/ExchangeReceiver.h b/dbms/src/Flash/Mpp/ExchangeReceiver.h index a05bff8a0cc..b5cc4877903 100644 --- a/dbms/src/Flash/Mpp/ExchangeReceiver.h +++ b/dbms/src/Flash/Mpp/ExchangeReceiver.h @@ -221,6 +221,8 @@ class ExchangeReceiverBase bool collected = false; int thread_count = 0; + std::atomic data_size_in_queue; + // For tiflash_compute node, need to send MPPTask to tiflash_storage node. std::vector disaggregated_dispatch_reqs; }; diff --git a/dbms/src/Flash/Mpp/MPPTunnel.cpp b/dbms/src/Flash/Mpp/MPPTunnel.cpp index 18791132675..431b473f4e8 100644 --- a/dbms/src/Flash/Mpp/MPPTunnel.cpp +++ b/dbms/src/Flash/Mpp/MPPTunnel.cpp @@ -45,7 +45,7 @@ String tunnelSenderModeToString(TunnelSenderMode mode) } // Update metric for tunnel's response bytes -inline void updateMetric(size_t pushed_data_size, TunnelSenderMode mode) +void updateMetric(std::atomic & data_size_in_queue, size_t pushed_data_size, TunnelSenderMode mode) { switch (mode) { @@ -59,6 +59,7 @@ inline void updateMetric(size_t pushed_data_size, TunnelSenderMode mode) default: throw DB::Exception("Illegal TunnelSenderMode"); } + MPPTunnelMetric::addDataSizeMetric(data_size_in_queue, pushed_data_size); } } // namespace @@ -86,6 +87,7 @@ MPPTunnel::MPPTunnel( , mem_tracker(current_memory_tracker ? current_memory_tracker->shared_from_this() : nullptr) , queue_size(std::max(5, input_steams_num_ * 5)) // MPMCQueue can benefit from a slightly larger queue size , log(Logger::get(req_id, tunnel_id)) + , data_size_in_queue(0) { RUNTIME_ASSERT(!(is_local_ && is_async_), log, "is_local: {}, is_async: {}.", is_local_, is_async_); if (is_local_) @@ -105,6 +107,7 @@ MPPTunnel::~MPPTunnel() try { close("", true); + MPPTunnelMetric::clearDataSizeMetric(data_size_in_queue); } catch (...) { @@ -160,7 +163,7 @@ void MPPTunnel::write(TrackedMppDataPacketPtr && data) auto pushed_data_size = data->getPacket().ByteSizeLong(); if (tunnel_sender->push(std::move(data))) { - updateMetric(pushed_data_size, mode); + updateMetric(data_size_in_queue, pushed_data_size, mode); connection_profile_info.bytes += pushed_data_size; connection_profile_info.packets += 1; return; @@ -196,14 +199,14 @@ void MPPTunnel::connect(PacketWriter * writer) case TunnelSenderMode::LOCAL: { RUNTIME_ASSERT(writer == nullptr, log); - local_tunnel_sender = std::make_shared(queue_size, mem_tracker, log, tunnel_id); + local_tunnel_sender = std::make_shared(queue_size, mem_tracker, log, tunnel_id, &data_size_in_queue); tunnel_sender = local_tunnel_sender; break; } case TunnelSenderMode::SYNC_GRPC: { RUNTIME_ASSERT(writer != nullptr, log, "Sync writer shouldn't be null"); - sync_tunnel_sender = std::make_shared(queue_size, mem_tracker, log, tunnel_id); + sync_tunnel_sender = std::make_shared(queue_size, mem_tracker, log, tunnel_id, &data_size_in_queue); sync_tunnel_sender->startSendThread(writer); tunnel_sender = sync_tunnel_sender; break; @@ -231,11 +234,11 @@ void MPPTunnel::connectAsync(IAsyncCallData * call_data) auto kick_func_for_test = call_data->getKickFuncForTest(); if (unlikely(kick_func_for_test.has_value())) { - async_tunnel_sender = std::make_shared(queue_size, mem_tracker, log, tunnel_id, kick_func_for_test.value()); + async_tunnel_sender = std::make_shared(queue_size, mem_tracker, log, tunnel_id, kick_func_for_test.value(), &data_size_in_queue); } else { - async_tunnel_sender = std::make_shared(queue_size, mem_tracker, log, tunnel_id, call_data->grpcCall()); + async_tunnel_sender = std::make_shared(queue_size, mem_tracker, log, tunnel_id, call_data->grpcCall(), &data_size_in_queue); } call_data->attachAsyncTunnelSender(async_tunnel_sender); tunnel_sender = async_tunnel_sender; @@ -345,6 +348,7 @@ void SyncTunnelSender::sendJob(PacketWriter * writer) TrackedMppDataPacketPtr res; while (send_queue.pop(res) == MPMCQueueResult::OK) { + MPPTunnelMetric::subDataSizeMetric(*data_size_in_queue, res->getPacket().ByteSizeLong()); if (!writer->write(res->packet)) { err_msg = "grpc writes failed."; @@ -389,6 +393,8 @@ std::shared_ptr LocalTunnelSender::readForLocal() auto result = send_queue.pop(res); if (result == MPMCQueueResult::OK) { + MPPTunnelMetric::subDataSizeMetric(*data_size_in_queue, res->getPacket().ByteSizeLong()); + // switch tunnel's memory tracker into receiver's res->switchMemTracker(current_memory_tracker); return res; diff --git a/dbms/src/Flash/Mpp/MPPTunnel.h b/dbms/src/Flash/Mpp/MPPTunnel.h index db1326b3c61..953d40be3c2 100644 --- a/dbms/src/Flash/Mpp/MPPTunnel.h +++ b/dbms/src/Flash/Mpp/MPPTunnel.h @@ -17,6 +17,7 @@ #include #include #include +#include #include #include #include @@ -49,6 +50,26 @@ namespace tests class TestMPPTunnel; } // namespace tests +namespace MPPTunnelMetric +{ +inline void addDataSizeMetric(std::atomic & data_size_in_queue, size_t size) +{ + data_size_in_queue.fetch_add(size); + GET_METRIC(tiflash_exchange_queueing_data_bytes, type_send).Increment(size); +} + +inline void subDataSizeMetric(std::atomic & data_size_in_queue, size_t size) +{ + data_size_in_queue.fetch_sub(size); + GET_METRIC(tiflash_exchange_queueing_data_bytes, type_send).Decrement(size); +} + +inline void clearDataSizeMetric(std::atomic & data_size_in_queue) +{ + GET_METRIC(tiflash_exchange_queueing_data_bytes, type_send).Decrement(data_size_in_queue.load()); +} +} // namespace MPPTunnelMetric + class IAsyncCallData; enum class TunnelSenderMode @@ -64,11 +85,12 @@ class TunnelSender : private boost::noncopyable { public: virtual ~TunnelSender() = default; - TunnelSender(size_t queue_size, MemoryTrackerPtr & memory_tracker_, const LoggerPtr & log_, const String & tunnel_id_) + TunnelSender(size_t queue_size, MemoryTrackerPtr & memory_tracker_, const LoggerPtr & log_, const String & tunnel_id_, std::atomic * data_size_in_queue_) : memory_tracker(memory_tracker_) - , send_queue(MPMCQueue(queue_size)) , log(log_) , tunnel_id(tunnel_id_) + , data_size_in_queue(data_size_in_queue_) + , send_queue(MPMCQueue(queue_size)) { } @@ -137,11 +159,14 @@ class TunnelSender : private boost::noncopyable std::shared_future future; std::atomic msg_has_set{false}; }; + MemoryTrackerPtr memory_tracker; - MPMCQueue send_queue; ConsumerState consumer_state; const LoggerPtr log; const String tunnel_id; + + std::atomic * data_size_in_queue; // From MppTunnel + MPMCQueue send_queue; }; /// SyncTunnelSender maintains a new thread itself to consume and send data @@ -163,14 +188,14 @@ class SyncTunnelSender : public TunnelSender class AsyncTunnelSender : public TunnelSender { public: - AsyncTunnelSender(size_t queue_size, MemoryTrackerPtr & memory_tracker, const LoggerPtr & log_, const String & tunnel_id_, grpc_call * call_) - : TunnelSender(0, memory_tracker, log_, tunnel_id_) + AsyncTunnelSender(size_t queue_size, MemoryTrackerPtr & memory_tracker, const LoggerPtr & log_, const String & tunnel_id_, grpc_call * call_, std::atomic * data_size_in_queue) + : TunnelSender(0, memory_tracker, log_, tunnel_id_, data_size_in_queue) , queue(queue_size, call_, log_) {} /// For gtest usage. - AsyncTunnelSender(size_t queue_size, MemoryTrackerPtr & memoryTracker, const LoggerPtr & log_, const String & tunnel_id_, GRPCKickFunc func) - : TunnelSender(0, memoryTracker, log_, tunnel_id_) + AsyncTunnelSender(size_t queue_size, MemoryTrackerPtr & memoryTracker, const LoggerPtr & log_, const String & tunnel_id_, GRPCKickFunc func, std::atomic * data_size_in_queue) + : TunnelSender(0, memoryTracker, log_, tunnel_id_, data_size_in_queue) , queue(queue_size, func) {} @@ -199,6 +224,11 @@ class AsyncTunnelSender : public TunnelSender return queue.pop(data, new_tag); } + void subDataSizeMetric(size_t size) + { + ::DB::MPPTunnelMetric::subDataSizeMetric(*data_size_in_queue, size); + } + private: GRPCSendQueue queue; }; @@ -347,6 +377,7 @@ class MPPTunnel : private boost::noncopyable SyncTunnelSenderPtr sync_tunnel_sender; AsyncTunnelSenderPtr async_tunnel_sender; LocalTunnelSenderPtr local_tunnel_sender; + std::atomic data_size_in_queue; }; using MPPTunnelPtr = std::shared_ptr; diff --git a/metrics/grafana/tiflash_summary.json b/metrics/grafana/tiflash_summary.json index 9d69c8ec6d8..87c5c9ad035 100644 --- a/metrics/grafana/tiflash_summary.json +++ b/metrics/grafana/tiflash_summary.json @@ -51,8 +51,7 @@ "editable": true, "gnetId": null, "graphTooltip": 1, - "id": null, - "iteration": 1667534599787, + "iteration": 1670904884485, "links": [], "panels": [ { @@ -3961,6 +3960,105 @@ "align": false, "alignLevel": null } + }, + { + "aliasColors": {}, + "bars": false, + "dashLength": 10, + "dashes": false, + "datasource": "${DS_TEST-CLUSTER}", + "fieldConfig": { + "defaults": {}, + "overrides": [] + }, + "fill": 0, + "fillGradient": 0, + "gridPos": { + "h": 7, + "w": 12, + "x": 12, + "y": 45 + }, + "hiddenSeries": false, + "id": 166, + "legend": { + "alignAsTable": true, + "avg": false, + "current": true, + "max": true, + "min": false, + "rightSide": true, + "show": true, + "total": false, + "values": true + }, + "lines": true, + "linewidth": 1, + "links": [], + "nullPointMode": "null as zero", + "options": { + "alertThreshold": true + }, + "percentage": false, + "pluginVersion": "7.5.11", + "pointradius": 5, + "points": false, + "renderer": "flot", + "seriesOverrides": [], + "spaceLength": 10, + "stack": false, + "steppedLine": false, + "targets": [ + { + "exemplar": true, + "expr": "tiflash_exchange_queueing_data_bytes{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", instance=~\"$instance\"}", + "format": "time_series", + "interval": "", + "intervalFactor": 1, + "legendFormat": "{{type}}", + "refId": "A" + } + ], + "thresholds": [], + "timeFrom": null, + "timeRegions": [], + "timeShift": null, + "title": "Data size in send and receive queue", + "tooltip": { + "shared": true, + "sort": 0, + "value_type": "individual" + }, + "type": "graph", + "xaxis": { + "buckets": null, + "mode": "time", + "name": null, + "show": true, + "values": [] + }, + "yaxes": [ + { + "format": "bytes", + "label": null, + "logBase": 1, + "max": null, + "min": "0", + "show": true + }, + { + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + } + ], + "yaxis": { + "align": false, + "alignLevel": null + } } ], "repeat": null,