Skip to content

Commit

Permalink
Fix unit test failure caused by a quickly-closed connection (#6694)
Browse files Browse the repository at this point in the history
close #6686
  • Loading branch information
xzhangxian1008 authored Feb 2, 2023
1 parent 7cf64b0 commit 7298855
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 30 deletions.
54 changes: 28 additions & 26 deletions dbms/src/Flash/Mpp/ExchangeReceiver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -314,9 +314,10 @@ ExchangeReceiverBase<RPCContext>::ExchangeReceiverBase(
, enable_fine_grained_shuffle_flag(enableFineGrainedShuffle(fine_grained_shuffle_stream_count_))
, output_stream_count(enable_fine_grained_shuffle_flag ? std::min(max_streams_, fine_grained_shuffle_stream_count_) : max_streams_)
, max_buffer_size(std::max<size_t>(batch_packet_count, std::max(source_num, max_streams_) * 2))
, connection_uncreated_num(source_num)
, thread_manager(newThreadManager())
, live_connections(0)
, live_local_connections(0)
, live_connections(source_num)
, state(ExchangeReceiverState::NORMAL)
, exc_log(Logger::get(req_id, executor_id))
, collected(false)
Expand All @@ -336,6 +337,7 @@ ExchangeReceiverBase<RPCContext>::ExchangeReceiverBase(
{
try
{
handleConnectionAfterException();
cancel();
thread_manager->wait();
}
Expand Down Expand Up @@ -366,6 +368,16 @@ ExchangeReceiverBase<RPCContext>::~ExchangeReceiverBase()
}
}

template <typename RPCContext>
void ExchangeReceiverBase<RPCContext>::handleConnectionAfterException()
{
    // Connections that were never established cannot report completion
    // through connectionDone(), so deduct them from the live count here.
    // `mu` guards live_connections / connection_uncreated_num.
    std::lock_guard<std::mutex> guard(mu);
    live_connections -= connection_uncreated_num;

    // Wake every thread blocked on `cv` so it re-evaluates its wait
    // predicate against the corrected connection count.
    cv.notify_all();
}

template <typename RPCContext>
void ExchangeReceiverBase<RPCContext>::waitAllConnectionDone()
{
Expand Down Expand Up @@ -424,24 +436,9 @@ template <typename RPCContext>
// Registers one local (in-process) connection.
// NOTE(review): this span is scraped from a diff without +/- markers; the
// `++live_connections;` line below appears to be the REMOVED side of the
// hunk (this commit initializes live_connections to source_num upfront) —
// confirm against the actual post-commit source.
void ExchangeReceiverBase<RPCContext>::addLocalConnectionNum()
{
// All connection counters are protected by `mu`.
std::lock_guard lock(mu);
++live_connections;
++live_local_connections;
}

// Increments the live-connection counter for one synchronous connection.
// NOTE(review): per the header diff below (which drops this declaration),
// this function appears to be DELETED by this commit — live_connections is
// now initialized to source_num in the constructor instead. Confirm against
// the post-commit source before relying on it.
template <typename RPCContext>
void ExchangeReceiverBase<RPCContext>::addSyncConnectionNum()
{
// `mu` guards live_connections.
std::lock_guard lock(mu);
++live_connections;
}

// Adds `conn_num` asynchronous connections to the live-connection counter.
// NOTE(review): per the header diff below (which drops this declaration),
// this function appears to be DELETED by this commit — live_connections is
// now initialized to source_num in the constructor instead. Confirm against
// the post-commit source before relying on it.
template <typename RPCContext>
void ExchangeReceiverBase<RPCContext>::addAsyncConnectionNum(Int32 conn_num)
{
// `mu` guards live_connections.
std::lock_guard lock(mu);
live_connections += conn_num;
}

template <typename RPCContext>
void ExchangeReceiverBase<RPCContext>::setUpConnection()
{
Expand Down Expand Up @@ -475,6 +472,7 @@ void ExchangeReceiverBase<RPCContext>::setUpConnection()
req.source_index,
local_request_handler,
enable_fine_grained_shuffle_flag);
--connection_uncreated_num;
}
else
{
Expand All @@ -486,12 +484,14 @@ void ExchangeReceiverBase<RPCContext>::setUpConnection()
});

++thread_count;
--connection_uncreated_num;
}
}

// TODO: reduce this thread in the future.
if (!async_requests.empty())
{
auto async_conn_num = async_requests.size();
thread_manager->schedule(true, "RecvReactor", [this, async_requests = std::move(async_requests)] {
if (enable_fine_grained_shuffle_flag)
reactor<true>(async_requests);
Expand All @@ -500,6 +500,7 @@ void ExchangeReceiverBase<RPCContext>::setUpConnection()
});

++thread_count;
connection_uncreated_num -= async_conn_num;
}
}

Expand All @@ -517,7 +518,6 @@ void ExchangeReceiverBase<RPCContext>::reactor(const std::vector<Request> & asyn
CPUAffinityManager::getInstance().bindSelfQueryThread();

size_t alive_async_connections = async_requests.size();
addAsyncConnectionNum(alive_async_connections);
MPMCQueue<AsyncHandler *> ready_requests(alive_async_connections * 2);

std::vector<std::unique_ptr<AsyncHandler>> handlers;
Expand Down Expand Up @@ -550,8 +550,6 @@ template <typename RPCContext>
template <bool enable_fine_grained_shuffle>
void ExchangeReceiverBase<RPCContext>::readLoop(const Request & req)
{
addSyncConnectionNum();

GET_METRIC(tiflash_thread_count, type_threads_of_receiver_read_loop).Increment();
SCOPE_EXIT({
GET_METRIC(tiflash_thread_count, type_threads_of_receiver_read_loop).Decrement();
Expand Down Expand Up @@ -818,7 +816,7 @@ void ExchangeReceiverBase<RPCContext>::connectionDone(
const String & local_err_msg,
const LoggerPtr & log)
{
Int32 copy_live_conn = -1;
Int32 copy_live_connections;
{
std::lock_guard lock(mu);

Expand All @@ -829,26 +827,30 @@ void ExchangeReceiverBase<RPCContext>::connectionDone(
if (err_msg.empty())
err_msg = local_err_msg;
}
copy_live_conn = --live_connections;

copy_live_connections = --live_connections;
}

LOG_DEBUG(
log,
"connection end. meet error: {}, err msg: {}, current alive connections: {}",
meet_error,
local_err_msg,
copy_live_conn);
copy_live_connections);

if (copy_live_conn == 0)
if (copy_live_connections == 0)
{
LOG_DEBUG(log, "All threads end in ExchangeReceiver");
cv.notify_all();
}
else if (copy_live_conn < 0)
throw Exception("live_connections should not be less than 0!");
else if (copy_live_connections < 0)
throw Exception("alive_connection_num should not be less than 0!");

if (meet_error || copy_live_conn == 0)
if (meet_error || copy_live_connections == 0)
{
LOG_INFO(exc_log, "receiver channels finished");
finishAllMsgChannels();
}
}

template <typename RPCContext>
Expand Down
7 changes: 3 additions & 4 deletions dbms/src/Flash/Mpp/ExchangeReceiver.h
Original file line number Diff line number Diff line change
Expand Up @@ -161,10 +161,8 @@ class ExchangeReceiverBase
private:
void prepareMsgChannels();
void addLocalConnectionNum();
void addSyncConnectionNum();
void addAsyncConnectionNum(Int32 conn_num);

void connectionLocalDone();
void handleConnectionAfterException();

bool isReceiverForTiFlashStorage()
{
Expand All @@ -180,6 +178,7 @@ class ExchangeReceiverBase
const bool enable_fine_grained_shuffle_flag;
const size_t output_stream_count;
const size_t max_buffer_size;
Int32 connection_uncreated_num;

std::shared_ptr<ThreadManager> thread_manager;
DAGSchema schema;
Expand All @@ -189,8 +188,8 @@ class ExchangeReceiverBase
std::mutex mu;
std::condition_variable cv;
/// should lock `mu` when visit these members
Int32 live_connections;
Int32 live_local_connections;
Int32 live_connections;
ExchangeReceiverState state;
String err_msg;

Expand Down

0 comments on commit 7298855

Please sign in to comment.