Skip to content

Commit

Permalink
tune exchange receiver to accelerate receiving data (#2401)
Browse files Browse the repository at this point in the history
  • Loading branch information
fzhedu authored Sep 10, 2021
1 parent d0825f0 commit bc9b63e
Show file tree
Hide file tree
Showing 3 changed files with 230 additions and 86 deletions.
3 changes: 1 addition & 2 deletions dbms/src/Flash/Coprocessor/InterpreterDAG.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,12 +59,11 @@ void InterpreterDAG::initMPPExchangeReceiver(const DAGQueryBlock & dag_query_blo
}
if (dag_query_block.source->tp() == tipb::ExecType::TypeExchangeReceiver)
{
/// use max_streams * 5 as the default receiver buffer size, maybe make it more configurable
mpp_exchange_receiver_maps[dag_query_block.source_name] = std::make_shared<ExchangeReceiver>(
context,
dag_query_block.source->exchange_receiver(),
dag.getDAGContext().getMPPTaskMeta(),
max_streams * 5);
max_streams);
}
}

Expand Down
150 changes: 127 additions & 23 deletions dbms/src/Flash/Mpp/ExchangeReceiver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,23 @@ void ExchangeReceiver::setUpConnection()
}
}

/// Map a receiver state to its textual name, e.g. for log and error messages.
/// Any value without a dedicated name is reported as "UNKNOWN".
String getReceiverStateStr(const State & s)
{
    if (s == NORMAL)
        return "NORMAL";
    if (s == ERROR)
        return "ERROR";
    if (s == CANCELED)
        return "CANCELED";
    if (s == CLOSED)
        return "CLOSED";
    return "UNKNOWN";
}

void ExchangeReceiver::ReadLoop(const String & meta_raw, size_t source_index)
{
bool meet_error = false;
Expand All @@ -53,8 +70,11 @@ void ExchangeReceiver::ReadLoop(const String & meta_raw, size_t source_index)
try
{
auto sender_task = new mpp::TaskMeta();
if (!sender_task->ParseFromString(meta_raw))
{
throw Exception("parse task meta error!");
}
send_task_id = sender_task->task_id();
sender_task->ParseFromString(meta_raw);
auto req = std::make_shared<mpp::EstablishMPPConnectionRequest>();
req->set_allocated_receiver_meta(new mpp::TaskMeta(task_meta));
req->set_allocated_sender_meta(sender_task);
Expand All @@ -66,27 +86,54 @@ void ExchangeReceiver::ReadLoop(const String & meta_raw, size_t source_index)
grpc::ClientContext client_context;
auto reader = cluster->rpc_client->sendStreamRequest(req->sender_meta().address(), &client_context, call);
reader->WaitForInitialMetadata();
mpp::MPPDataPacket packet;
std::shared_ptr<ReceivedPacket> packet;
String req_info = "tunnel" + std::to_string(send_task_id) + "+" + std::to_string(recv_task_id);
bool has_data = false;
for (;;)
{
LOG_TRACE(log, "begin next ");
bool success = reader->Read(&packet);
{
std::unique_lock<std::mutex> lock(mu);
cv.wait(lock, [&] { return res_buffer.hasEmpty() || state != NORMAL; });
if (state == NORMAL)
{
res_buffer.popEmpty(packet);
cv.notify_all();
}
else
{
meet_error = true;
local_err_msg = "receiver's state is " + getReceiverStateStr(state) + ", exit from ReadLoop";
LOG_WARNING(log, local_err_msg);
break;
}
}
packet->req_info = req_info;
packet->source_index = source_index;
bool success = reader->Read(packet->packet.get());
if (!success)
break;
else
has_data = true;
if (packet.has_error())
if (packet->packet->has_error())
{
throw Exception("Exchange receiver meet error : " + packet.error().msg());
throw Exception("Exchange receiver meet error : " + packet->packet->error().msg());
}
if (!decodePacket(packet, source_index, req_info))
{
meet_error = true;
local_err_msg = "Decode packet meet error";
LOG_WARNING(log, "Decode packet meet error, exit from ReadLoop");
break;
std::unique_lock<std::mutex> lock(mu);
cv.wait(lock, [&] { return res_buffer.canPush() || state != NORMAL; });
if (state == NORMAL)
{
res_buffer.pushObject(packet);
cv.notify_all();
}
else
{
meet_error = true;
local_err_msg = "receiver's state is " + getReceiverStateStr(state) + ", exit from ReadLoop";
LOG_WARNING(log, local_err_msg);
break;
}
}
}
// If an error is met, such as a failure to decode a packet, it will not retry.
Expand Down Expand Up @@ -133,22 +180,79 @@ void ExchangeReceiver::ReadLoop(const String & meta_raw, size_t source_index)
meet_error = true;
local_err_msg = "fatal error";
}
std::lock_guard<std::mutex> lock(mu);
live_connections--;

// avoid concurrent conflict
Int32 live_conn_copy = live_connections;
Int32 copy_live_conn = -1;
{
std::unique_lock<std::mutex> lock(mu);
live_connections--;
if (meet_error && state == NORMAL)
state = ERROR;
if (meet_error && err_msg.empty())
err_msg = local_err_msg;
copy_live_conn = live_connections;
cv.notify_all();
}
LOG_DEBUG(log, fmt::format("{} -> {} end! current alive connections: {}", send_task_id, recv_task_id, copy_live_conn));

if (meet_error && state == NORMAL)
state = ERROR;
if (meet_error && err_msg.empty())
err_msg = local_err_msg;
cv.notify_all();
if (copy_live_conn == 0)
LOG_DEBUG(log, fmt::format("All threads end in ExchangeReceiver"));
else if (copy_live_conn < 0)
throw Exception("live_connections should not be less than 0!");
}

LOG_DEBUG(log, fmt::format("{} -> {} end! current alive connections: {}", send_task_id, recv_task_id, live_conn_copy));
/// Hand the next received packet to the caller as an ExchangeReceiverResult.
///
/// Blocks on `cv` until `res_buffer` holds a filled packet, `live_connections`
/// drops to 0, or `state` is no longer NORMAL. Depending on which condition holds:
///  - state != NORMAL: returns a result carrying an error message and no response;
///  - a packet is available: decodes it into a tipb::SelectResponse and returns it
///    (or a "decode error" result if parsing fails);
///  - otherwise: returns the end-of-data result.
/// NOTE(review): the positional fields of ExchangeReceiverResult are presumably
/// (response, source index, request info, meet_error, error message, eof) —
/// confirm against the struct declaration, which is not visible here.
ExchangeReceiverResult ExchangeReceiver::nextResult()
{
std::shared_ptr<ReceivedPacket> packet;
{
/// The lock is held only inside this inner scope; decoding below runs after it is released.
std::unique_lock<std::mutex> lock(mu);
cv.wait(lock, [&] { return res_buffer.hasObjects() || live_connections == 0 || state != NORMAL; });

/// NOTE(review): the next two lines use `live_conn_copy`, which is not declared in this
/// function — they look like leftover lines from the removed side of the diff; confirm.
if (live_conn_copy == 0)
LOG_DEBUG(log, fmt::format("All threads end in ExchangeReceiver"));
/// A non-NORMAL state takes priority over any packet that may still be buffered.
if (state != NORMAL)
{
String msg;
if (state == CANCELED)
msg = "query canceled";
else if (state == CLOSED)
msg = "ExchangeReceiver closed";
else if (!err_msg.empty())
msg = err_msg;
else
msg = "Unknown error";
return {nullptr, 0, "ExchangeReceiver", true, msg, false};
}
else if (res_buffer.hasObjects())
{
/// Take one filled packet and wake the other threads waiting on `cv`.
res_buffer.popObject(packet);
cv.notify_all();
}
else /// live_connections == 0, res_buffer is empty, and state is NORMAL, that is the end.
{
return {nullptr, 0, "ExchangeReceiver", false, "", true};
}
}
assert(packet != nullptr && packet->packet != nullptr);
ExchangeReceiverResult result;
if (packet->packet->has_error())
{
/// Forward the error message carried by the packet itself.
result = {nullptr, packet->source_index, packet->req_info, true, packet->packet->error().msg(), false};
}
else
{
/// Decode the packet payload into a SelectResponse.
auto resp_ptr = std::make_shared<tipb::SelectResponse>();
if (!resp_ptr->ParseFromString(packet->packet->data()))
{
result = {nullptr, packet->source_index, packet->req_info, true, "decode error", false};
}
else
{
result = {resp_ptr, packet->source_index, packet->req_info};
}
}
/// Reset the packet and give it back through pushEmpty(); ReadLoop obtains its packets via popEmpty().
packet->packet->Clear();
std::unique_lock<std::mutex> lock(mu);
/// Unlike the waits above, this wait does not check `state`; it only waits for canPushEmpty().
cv.wait(lock, [&] { return res_buffer.canPushEmpty(); });
res_buffer.pushEmpty(std::move(packet));
cv.notify_all();
return result;
}

} // namespace DB
Loading

0 comments on commit bc9b63e

Please sign in to comment.