Skip to content

Commit

Permalink
Revert "Fix ut caused by quick closed connection (pingcap#6694)"
Browse files Browse the repository at this point in the history
This reverts commit 7298855.
  • Loading branch information
solotzg committed Feb 2, 2023
1 parent 632fdaa commit db7551c
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 31 deletions.
54 changes: 26 additions & 28 deletions dbms/src/Flash/Mpp/ExchangeReceiver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -314,10 +314,9 @@ ExchangeReceiverBase<RPCContext>::ExchangeReceiverBase(
, enable_fine_grained_shuffle_flag(enableFineGrainedShuffle(fine_grained_shuffle_stream_count_))
, output_stream_count(enable_fine_grained_shuffle_flag ? std::min(max_streams_, fine_grained_shuffle_stream_count_) : max_streams_)
, max_buffer_size(std::max<size_t>(batch_packet_count, std::max(source_num, max_streams_) * 2))
, connection_uncreated_num(source_num)
, thread_manager(newThreadManager())
, live_connections(0)
, live_local_connections(0)
, live_connections(source_num)
, state(ExchangeReceiverState::NORMAL)
, exc_log(Logger::get(req_id, executor_id))
, collected(false)
Expand All @@ -337,7 +336,6 @@ ExchangeReceiverBase<RPCContext>::ExchangeReceiverBase(
{
try
{
handleConnectionAfterException();
cancel();
thread_manager->wait();
}
Expand Down Expand Up @@ -368,16 +366,6 @@ ExchangeReceiverBase<RPCContext>::~ExchangeReceiverBase()
}
}

template <typename RPCContext>
void ExchangeReceiverBase<RPCContext>::handleConnectionAfterException()
{
std::lock_guard lock(mu);
live_connections -= connection_uncreated_num;

// some cv may have been blocked, wake them up and recheck the condition.
cv.notify_all();
}

template <typename RPCContext>
void ExchangeReceiverBase<RPCContext>::waitAllConnectionDone()
{
Expand Down Expand Up @@ -436,9 +424,24 @@ template <typename RPCContext>
void ExchangeReceiverBase<RPCContext>::addLocalConnectionNum()
{
std::lock_guard lock(mu);
++live_connections;
++live_local_connections;
}

template <typename RPCContext>
void ExchangeReceiverBase<RPCContext>::addSyncConnectionNum()
{
std::lock_guard lock(mu);
++live_connections;
}

template <typename RPCContext>
void ExchangeReceiverBase<RPCContext>::addAsyncConnectionNum(Int32 conn_num)
{
std::lock_guard lock(mu);
live_connections += conn_num;
}

template <typename RPCContext>
void ExchangeReceiverBase<RPCContext>::setUpConnection()
{
Expand Down Expand Up @@ -472,7 +475,6 @@ void ExchangeReceiverBase<RPCContext>::setUpConnection()
req.source_index,
local_request_handler,
enable_fine_grained_shuffle_flag);
--connection_uncreated_num;
}
else
{
Expand All @@ -484,14 +486,12 @@ void ExchangeReceiverBase<RPCContext>::setUpConnection()
});

++thread_count;
--connection_uncreated_num;
}
}

// TODO: reduce this thread in the future.
if (!async_requests.empty())
{
auto async_conn_num = async_requests.size();
thread_manager->schedule(true, "RecvReactor", [this, async_requests = std::move(async_requests)] {
if (enable_fine_grained_shuffle_flag)
reactor<true>(async_requests);
Expand All @@ -500,7 +500,6 @@ void ExchangeReceiverBase<RPCContext>::setUpConnection()
});

++thread_count;
connection_uncreated_num -= async_conn_num;
}
}

Expand All @@ -518,6 +517,7 @@ void ExchangeReceiverBase<RPCContext>::reactor(const std::vector<Request> & asyn
CPUAffinityManager::getInstance().bindSelfQueryThread();

size_t alive_async_connections = async_requests.size();
addAsyncConnectionNum(alive_async_connections);
MPMCQueue<AsyncHandler *> ready_requests(alive_async_connections * 2);

std::vector<std::unique_ptr<AsyncHandler>> handlers;
Expand Down Expand Up @@ -550,6 +550,8 @@ template <typename RPCContext>
template <bool enable_fine_grained_shuffle>
void ExchangeReceiverBase<RPCContext>::readLoop(const Request & req)
{
addSyncConnectionNum();

GET_METRIC(tiflash_thread_count, type_threads_of_receiver_read_loop).Increment();
SCOPE_EXIT({
GET_METRIC(tiflash_thread_count, type_threads_of_receiver_read_loop).Decrement();
Expand Down Expand Up @@ -816,7 +818,7 @@ void ExchangeReceiverBase<RPCContext>::connectionDone(
const String & local_err_msg,
const LoggerPtr & log)
{
Int32 copy_live_connections;
Int32 copy_live_conn = -1;
{
std::lock_guard lock(mu);

Expand All @@ -827,30 +829,26 @@ void ExchangeReceiverBase<RPCContext>::connectionDone(
if (err_msg.empty())
err_msg = local_err_msg;
}

copy_live_connections = --live_connections;
copy_live_conn = --live_connections;
}

LOG_DEBUG(
log,
"connection end. meet error: {}, err msg: {}, current alive connections: {}",
meet_error,
local_err_msg,
copy_live_connections);
copy_live_conn);

if (copy_live_connections == 0)
if (copy_live_conn == 0)
{
LOG_DEBUG(log, "All threads end in ExchangeReceiver");
cv.notify_all();
}
else if (copy_live_connections < 0)
throw Exception("alive_connection_num should not be less than 0!");
else if (copy_live_conn < 0)
throw Exception("live_connections should not be less than 0!");

if (meet_error || copy_live_connections == 0)
{
LOG_INFO(exc_log, "receiver channels finished");
if (meet_error || copy_live_conn == 0)
finishAllMsgChannels();
}
}

template <typename RPCContext>
Expand Down
7 changes: 4 additions & 3 deletions dbms/src/Flash/Mpp/ExchangeReceiver.h
Original file line number Diff line number Diff line change
Expand Up @@ -161,8 +161,10 @@ class ExchangeReceiverBase
private:
void prepareMsgChannels();
void addLocalConnectionNum();
void addSyncConnectionNum();
void addAsyncConnectionNum(Int32 conn_num);

void connectionLocalDone();
void handleConnectionAfterException();

bool isReceiverForTiFlashStorage()
{
Expand All @@ -178,7 +180,6 @@ class ExchangeReceiverBase
const bool enable_fine_grained_shuffle_flag;
const size_t output_stream_count;
const size_t max_buffer_size;
Int32 connection_uncreated_num;

std::shared_ptr<ThreadManager> thread_manager;
DAGSchema schema;
Expand All @@ -188,8 +189,8 @@ class ExchangeReceiverBase
std::mutex mu;
std::condition_variable cv;
/// should lock `mu` when visit these members
Int32 live_local_connections;
Int32 live_connections;
Int32 live_local_connections;
ExchangeReceiverState state;
String err_msg;

Expand Down

0 comments on commit db7551c

Please sign in to comment.