diff --git a/dbms/src/Flash/Mpp/MPPTunnelSetWriter.cpp b/dbms/src/Flash/Mpp/MPPTunnelSetWriter.cpp index da92dfaa5f5..ae122201d91 100644 --- a/dbms/src/Flash/Mpp/MPPTunnelSetWriter.cpp +++ b/dbms/src/Flash/Mpp/MPPTunnelSetWriter.cpp @@ -101,6 +101,7 @@ static void broadcastOrPassThroughWriteImpl( FuncIsLocalTunnel && isLocalTunnel, FuncWriteToTunnel && writeToTunnel) { + assert(tunnel_cnt > 0); assert(ori_packet_bytes > 0); const size_t remote_tunnel_cnt = tunnel_cnt - local_tunnel_cnt; @@ -127,23 +128,41 @@ static void broadcastOrPassThroughWriteImpl( } // TODO avoid copy packet for broadcast. - for (size_t i = 0, local_cnt = 0, remote_cnt = 0; i < tunnel_cnt; ++i) + + if (local_tracked_packet == remote_tracked_packet) { - if (isLocalTunnel(i)) + // `TrackedMppDataPacket` in `TrackedMppDataPacketPtr` is mutable. + // If `local_tracked_packet` and `remote_tracked_packet` share same object, there is also another optional way to copy `remote_tracked_packet` early. + // Use this fast path to reduce maximum memory usage + auto tracked_packet = std::move(local_tracked_packet); + remote_tracked_packet = nullptr; + + for (size_t i = 1; i < tunnel_cnt; ++i) { - local_cnt++; - if (local_cnt == local_tunnel_cnt) - writeToTunnel(std::move(local_tracked_packet), i); - else - writeToTunnel(local_tracked_packet->copy(), i); // NOLINT + writeToTunnel(tracked_packet->copy(), i); } - else + writeToTunnel(std::move(tracked_packet), 0); + } + else + { + for (size_t i = 0, local_cnt = 0, remote_cnt = 0; i < tunnel_cnt; ++i) { - remote_cnt++; - if (remote_cnt == remote_tunnel_cnt) - writeToTunnel(std::move(remote_tracked_packet), i); + if (isLocalTunnel(i)) + { + local_cnt++; + if (local_cnt == local_tunnel_cnt) + writeToTunnel(std::move(local_tracked_packet), i); + else + writeToTunnel(local_tracked_packet->copy(), i); // NOLINT + } else - writeToTunnel(remote_tracked_packet->copy(), i); // NOLINT + { + remote_cnt++; + if (remote_cnt == remote_tunnel_cnt) + writeToTunnel(std::move(remote_tracked_packet), i); + else + writeToTunnel(remote_tracked_packet->copy(), i); // NOLINT + } } }