Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix ut caused by quick closed connection #6694

Merged
merged 10 commits into from
Feb 2, 2023
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 27 additions & 26 deletions dbms/src/Flash/Mpp/ExchangeReceiver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -315,8 +315,9 @@ ExchangeReceiverBase<RPCContext>::ExchangeReceiverBase(
, output_stream_count(enable_fine_grained_shuffle_flag ? std::min(max_streams_, fine_grained_shuffle_stream_count_) : max_streams_)
, max_buffer_size(std::max<size_t>(batch_packet_count, std::max(source_num, max_streams_) * 2))
, thread_manager(newThreadManager())
, live_connections(0)
, live_local_connections(0)
, live_connections(source_num)
, connection_uncreated_num(source_num)
, state(ExchangeReceiverState::NORMAL)
, exc_log(Logger::get(req_id, executor_id))
, collected(false)
Expand All @@ -336,6 +337,7 @@ ExchangeReceiverBase<RPCContext>::ExchangeReceiverBase(
{
try
{
handleConnectionAfterException();
cancel();
thread_manager->wait();
}
Expand Down Expand Up @@ -366,6 +368,16 @@ ExchangeReceiverBase<RPCContext>::~ExchangeReceiverBase()
}
}

template <typename RPCContext>
void ExchangeReceiverBase<RPCContext>::handleConnectionAfterException()
{
std::lock_guard lock(mu);
live_connections -= connection_uncreated_num;

// some cv may have been blocked, wake them up and recheck the condition.
cv.notify_all();
}

template <typename RPCContext>
void ExchangeReceiverBase<RPCContext>::waitAllConnectionDone()
{
Expand Down Expand Up @@ -424,24 +436,9 @@ template <typename RPCContext>
void ExchangeReceiverBase<RPCContext>::addLocalConnectionNum()
{
std::lock_guard lock(mu);
++live_connections;
++live_local_connections;
}

template <typename RPCContext>
void ExchangeReceiverBase<RPCContext>::addSyncConnectionNum()
{
std::lock_guard lock(mu);
++live_connections;
}

template <typename RPCContext>
void ExchangeReceiverBase<RPCContext>::addAsyncConnectionNum(Int32 conn_num)
{
std::lock_guard lock(mu);
live_connections += conn_num;
}

template <typename RPCContext>
void ExchangeReceiverBase<RPCContext>::setUpConnection()
{
Expand Down Expand Up @@ -487,11 +484,13 @@ void ExchangeReceiverBase<RPCContext>::setUpConnection()

++thread_count;
}
xzhangxian1008 marked this conversation as resolved.
Show resolved Hide resolved
--connection_uncreated_num;
}

// TODO: reduce this thread in the future.
if (!async_requests.empty())
{
auto async_conn_num = async_requests.size();
thread_manager->schedule(true, "RecvReactor", [this, async_requests = std::move(async_requests)] {
if (enable_fine_grained_shuffle_flag)
reactor<true>(async_requests);
Expand All @@ -500,6 +499,7 @@ void ExchangeReceiverBase<RPCContext>::setUpConnection()
});

++thread_count;
connection_uncreated_num -= async_conn_num;
}
}

Expand All @@ -517,7 +517,6 @@ void ExchangeReceiverBase<RPCContext>::reactor(const std::vector<Request> & asyn
CPUAffinityManager::getInstance().bindSelfQueryThread();

size_t alive_async_connections = async_requests.size();
addAsyncConnectionNum(alive_async_connections);
MPMCQueue<AsyncHandler *> ready_requests(alive_async_connections * 2);

std::vector<std::unique_ptr<AsyncHandler>> handlers;
Expand Down Expand Up @@ -550,8 +549,6 @@ template <typename RPCContext>
template <bool enable_fine_grained_shuffle>
void ExchangeReceiverBase<RPCContext>::readLoop(const Request & req)
{
addSyncConnectionNum();

GET_METRIC(tiflash_thread_count, type_threads_of_receiver_read_loop).Increment();
SCOPE_EXIT({
GET_METRIC(tiflash_thread_count, type_threads_of_receiver_read_loop).Decrement();
Expand Down Expand Up @@ -818,7 +815,7 @@ void ExchangeReceiverBase<RPCContext>::connectionDone(
const String & local_err_msg,
const LoggerPtr & log)
{
Int32 copy_live_conn = -1;
Int32 copy_live_connections;
{
std::lock_guard lock(mu);

Expand All @@ -829,26 +826,30 @@ void ExchangeReceiverBase<RPCContext>::connectionDone(
if (err_msg.empty())
err_msg = local_err_msg;
}
copy_live_conn = --live_connections;

copy_live_connections = --live_connections;
}

LOG_DEBUG(
log,
"connection end. meet error: {}, err msg: {}, current alive connections: {}",
meet_error,
local_err_msg,
copy_live_conn);
copy_live_connections);

if (copy_live_conn == 0)
if (copy_live_connections == 0)
{
LOG_DEBUG(log, "All threads end in ExchangeReceiver");
cv.notify_all();
}
else if (copy_live_conn < 0)
throw Exception("live_connections should not be less than 0!");
else if (copy_live_connections < 0)
throw Exception("alive_connection_num should not be less than 0!");

if (meet_error || copy_live_conn == 0)
if (meet_error || copy_live_connections == 0)
{
LOG_INFO(exc_log, "receiver finished");
xzhangxian1008 marked this conversation as resolved.
Show resolved Hide resolved
finishAllMsgChannels();
}
}

template <typename RPCContext>
Expand Down
9 changes: 5 additions & 4 deletions dbms/src/Flash/Mpp/ExchangeReceiver.h
Original file line number Diff line number Diff line change
Expand Up @@ -158,13 +158,13 @@ class ExchangeReceiverBase
const std::shared_ptr<ReceivedMessage> & recv_msg,
std::unique_ptr<CHBlockChunkDecodeAndSquash> & decoder_ptr);

Int32 getAliveConnectionNumNolock();
xzhangxian1008 marked this conversation as resolved.
Show resolved Hide resolved

private:
void prepareMsgChannels();
void addLocalConnectionNum();
void addSyncConnectionNum();
void addAsyncConnectionNum(Int32 conn_num);

void connectionLocalDone();
void handleConnectionAfterException();

bool isReceiverForTiFlashStorage()
{
Expand All @@ -189,8 +189,9 @@ class ExchangeReceiverBase
std::mutex mu;
std::condition_variable cv;
/// should lock `mu` when visit these members
Int32 live_connections;
Int32 live_local_connections;
Int32 live_connections;
Int32 connection_uncreated_num;
ExchangeReceiverState state;
String err_msg;

Expand Down