Skip to content

Commit

Permalink
*: fix crash issue (#6957)
Browse files Browse the repository at this point in the history
ref #6880, close #6951
  • Loading branch information
solotzg authored Mar 6, 2023
1 parent 0ff2b96 commit 03510c9
Showing 1 changed file with 31 additions and 12 deletions.
43 changes: 31 additions & 12 deletions dbms/src/Flash/Mpp/MPPTunnelSetWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ static void broadcastOrPassThroughWriteImpl(
FuncIsLocalTunnel && isLocalTunnel,
FuncWriteToTunnel && writeToTunnel)
{
assert(tunnel_cnt > 0);
assert(ori_packet_bytes > 0);

const size_t remote_tunnel_cnt = tunnel_cnt - local_tunnel_cnt;
Expand All @@ -127,23 +128,41 @@ static void broadcastOrPassThroughWriteImpl(
}

// TODO avoid copy packet for broadcast.
for (size_t i = 0, local_cnt = 0, remote_cnt = 0; i < tunnel_cnt; ++i)

if (local_tracked_packet == remote_tracked_packet)
{
if (isLocalTunnel(i))
// `TrackedMppDataPacket` in `TrackedMppDataPacketPtr` is mutable.
// If `local_tracked_packet` and `remote_tracked_packet` share same object, there is also another optional way to copy `remote_tracked_packet` early.
// Use this fast path to reduce maximum memory usage
auto tracked_packet = std::move(local_tracked_packet);
remote_tracked_packet = nullptr;

for (size_t i = 1; i < tunnel_cnt; ++i)
{
local_cnt++;
if (local_cnt == local_tunnel_cnt)
writeToTunnel(std::move(local_tracked_packet), i);
else
writeToTunnel(local_tracked_packet->copy(), i); // NOLINT
writeToTunnel(tracked_packet->copy(), i);
}
else
writeToTunnel(std::move(tracked_packet), 0);
}
else
{
for (size_t i = 0, local_cnt = 0, remote_cnt = 0; i < tunnel_cnt; ++i)
{
remote_cnt++;
if (remote_cnt == remote_tunnel_cnt)
writeToTunnel(std::move(remote_tracked_packet), i);
if (isLocalTunnel(i))
{
local_cnt++;
if (local_cnt == local_tunnel_cnt)
writeToTunnel(std::move(local_tracked_packet), i);
else
writeToTunnel(local_tracked_packet->copy(), i); // NOLINT
}
else
writeToTunnel(remote_tracked_packet->copy(), i); // NOLINT
{
remote_cnt++;
if (remote_cnt == remote_tunnel_cnt)
writeToTunnel(std::move(remote_tracked_packet), i);
else
writeToTunnel(remote_tracked_packet->copy(), i); // NOLINT
}
}
}

Expand Down

0 comments on commit 03510c9

Please sign in to comment.