From ec305b8b805cb40dd58419b17f42a31851f510dd Mon Sep 17 00:00:00 2001 From: Xin Liao Date: Thu, 18 Jul 2024 09:53:52 +0800 Subject: [PATCH] [Fix](load) Fix the channel leak when close wait has been cancelled (#38031) When the close_wait is called, the NodeChannel has already been marked as cancelled, but close_wait will set _is_closed to true. When it actually sends a cancel request to the downstream LoadChannel, it finds that _is_closed has already been set to true, so it will not send an RPC request, causing a LoadChannel leak. --- be/src/vec/sink/vtablet_sink.cpp | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/be/src/vec/sink/vtablet_sink.cpp b/be/src/vec/sink/vtablet_sink.cpp index c684d251a3c35c..4bf4cb0ff09348 100644 --- a/be/src/vec/sink/vtablet_sink.cpp +++ b/be/src/vec/sink/vtablet_sink.cpp @@ -865,12 +865,6 @@ bool VNodeChannel::is_send_data_rpc_done() const { Status VNodeChannel::close_wait(RuntimeState* state) { SCOPED_CONSUME_MEM_TRACKER(_node_channel_tracker.get()); - // set _is_closed to true finally - Defer set_closed {[&]() { - std::lock_guard l(_closed_lock); - _is_closed = true; - }}; - auto st = none_of({_cancelled, !_eos_is_produced}); if (!st.ok()) { if (_cancelled) { @@ -891,8 +885,8 @@ Status VNodeChannel::close_wait(RuntimeState* state) { } _close_time_ms = UnixMillis() - _close_time_ms; - if (_cancelled || state->is_cancelled()) { - cancel(state->cancel_reason()); + if (state->is_cancelled()) { + _cancel_with_msg(state->cancel_reason()); } if (_add_batches_finished) { @@ -904,6 +898,10 @@ Status VNodeChannel::close_wait(RuntimeState* state) { _index_channel->set_error_tablet_in_state(state); _index_channel->set_tablets_received_rows(_tablets_received_rows, _node_id); _index_channel->set_tablets_filtered_rows(_tablets_filtered_rows, _node_id); + std::lock_guard l(_closed_lock); + // only when normal close, we set _is_closed to true. + // otherwise, we will set it to true in cancel(). + _is_closed = true; return Status::OK(); }