Skip to content

Commit

Permalink
[Fix](load) Fix the channel leak when close wait has been cancelled (#…
Browse files Browse the repository at this point in the history
…38031)

When the close_wait is called, the NodeChannel has already been marked
as cancelled, but close_wait will set _is_closed to true. When it
actually sends a cancel request to the downstream LoadChannel, it finds
that _is_closed has already been set to true, so it will not send an RPC
request, causing a LoadChannel leak.
  • Loading branch information
liaoxin01 authored Jul 18, 2024
1 parent ec762f2 commit a9a633c
Showing 1 changed file with 7 additions and 7 deletions.
14 changes: 7 additions & 7 deletions be/src/vec/sink/writer/vtablet_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -895,11 +895,6 @@ void VNodeChannel::cancel(const std::string& cancel_msg) {
Status VNodeChannel::close_wait(RuntimeState* state) {
DBUG_EXECUTE_IF("VNodeChannel.close_wait_full_gc", { MemoryReclamation::process_full_gc(); });
SCOPED_CONSUME_MEM_TRACKER(_node_channel_tracker.get());
// set _is_closed to true finally
Defer set_closed {[&]() {
std::lock_guard<std::mutex> l(_closed_lock);
_is_closed = true;
}};

auto st = none_of({_cancelled, !_eos_is_produced});
if (!st.ok()) {
Expand All @@ -923,8 +918,8 @@ Status VNodeChannel::close_wait(RuntimeState* state) {
VLOG_CRITICAL << _parent->_sender_id << " close wait finished";
_close_time_ms = UnixMillis() - _close_time_ms;

if (_cancelled || state->is_cancelled()) {
cancel(state->cancel_reason().to_string());
if (state->is_cancelled()) {
_cancel_with_msg(state->cancel_reason().to_string());
}

if (_add_batches_finished) {
Expand All @@ -936,6 +931,11 @@ Status VNodeChannel::close_wait(RuntimeState* state) {
_index_channel->set_error_tablet_in_state(state);
_index_channel->set_tablets_received_rows(_tablets_received_rows, _node_id);
_index_channel->set_tablets_filtered_rows(_tablets_filtered_rows, _node_id);

std::lock_guard<std::mutex> l(_closed_lock);
// only when normal close, we set _is_closed to true.
// otherwise, we will set it to true in cancel().
_is_closed = true;
return Status::OK();
}

Expand Down

0 comments on commit a9a633c

Please sign in to comment.