Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add mutex to protect exchange receiver's async client #5008

Merged
merged 7 commits into from
May 26, 2022
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 9 additions & 2 deletions dbms/src/Flash/Mpp/ExchangeReceiver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -92,10 +92,14 @@ class AsyncRequestHandler : public UnaryCallback<bool>
switch (stage)
{
case AsyncRequestStage::WAIT_MAKE_READER:
{
// Use lock to ensure reader is created already in reactor thread
std::unique_lock lock(mu);
if (!ok)
reader.reset();
notifyReactor();
break;
}
case AsyncRequestStage::WAIT_BATCH_READ:
if (ok)
++read_packet_index;
Expand Down Expand Up @@ -227,6 +231,8 @@ class AsyncRequestHandler : public UnaryCallback<bool>
void start()
{
stage = AsyncRequestStage::WAIT_MAKE_READER;
// Use lock to ensure async reader is unreachable from grpc thread before this function returns
std::unique_lock lock(mu);
rpc_context->makeAsyncReader(*request, reader, thisAsUnaryCallback());
}

Expand Down Expand Up @@ -283,6 +289,7 @@ class AsyncRequestHandler : public UnaryCallback<bool>
size_t read_packet_index = 0;
Status finish_status = RPCContext::getStatusOK();
LoggerPtr log;
std::mutex mu;
};
} // namespace

Expand Down Expand Up @@ -393,10 +400,10 @@ void ExchangeReceiverBase<RPCContext>::reactor(const std::vector<Request> & asyn
MPMCQueue<AsyncHandler *> ready_requests(alive_async_connections * 2);
std::vector<AsyncHandler *> waiting_for_retry_requests;

std::vector<AsyncRequestHandler<RPCContext>> handlers;
std::vector<std::unique_ptr<AsyncHandler>> handlers;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can't get what would happen without this change, could you tell me something?

Copy link
Contributor Author

@yibin87 yibin87 May 26, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In AsyncGrpcExchangePacketReader‘s init function, reader = cluster->rpc_client->sendStreamRequestAsync(xxx), the right part can execute first, and grpc thread sees the reader.
I start from the issue's core down stack, so this suspicous one could be the problem. And I can't tell a whole story about what happend either. Just this seems a suspicous one. And fixed binary doesn't core down.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes that's a great job and I've got what happened about this bug.

The only remaining question is why you change the element type of handlers from a value type to a unique_ptr.

It's ok, but why?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

mutex field is not support for copy constructor...

handlers.reserve(alive_async_connections);
for (const auto & req : async_requests)
handlers.emplace_back(&ready_requests, &msg_channel, rpc_context, req, exc_log->identifier());
handlers.emplace_back(std::make_unique<AsyncHandler>(&ready_requests, &msg_channel, rpc_context, req, exc_log->identifier()));

while (alive_async_connections > 0)
{
Expand Down
1 change: 0 additions & 1 deletion dbms/src/Functions/FunctionsDateTime.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,6 @@ void registerFunctionsDateTime(FunctionFactory & factory)
factory.registerFunction<FunctionToTiDBToSeconds>();
factory.registerFunction<FunctionToTiDBToDays>();
factory.registerFunction<FunctionTiDBFromDays>();

factory.registerFunction<FunctionToTimeZone>();
factory.registerFunction<FunctionToLastDay>();
}
Expand Down
1 change: 0 additions & 1 deletion dbms/src/Functions/FunctionsDateTime.h
Original file line number Diff line number Diff line change
Expand Up @@ -3450,7 +3450,6 @@ using FunctionToTiDBDayOfYear = FunctionMyDateOrMyDateTimeToSomething<DataTypeUI
using FunctionToTiDBWeekOfYear = FunctionMyDateOrMyDateTimeToSomething<DataTypeUInt16, TiDBWeekOfYearTransformerImpl, return_nullable>;
using FunctionToTiDBToSeconds = FunctionMyDateOrMyDateTimeToSomething<DataTypeUInt64, TiDBToSecondsTransformerImpl, return_nullable>;
using FunctionToTiDBToDays = FunctionMyDateOrMyDateTimeToSomething<DataTypeUInt32, TiDBToDaysTransformerImpl, return_nullable>;

using FunctionToRelativeYearNum = FunctionDateOrDateTimeToSomething<DataTypeUInt16, ToRelativeYearNumImpl>;
using FunctionToRelativeQuarterNum = FunctionDateOrDateTimeToSomething<DataTypeUInt32, ToRelativeQuarterNumImpl>;
using FunctionToRelativeMonthNum = FunctionDateOrDateTimeToSomething<DataTypeUInt32, ToRelativeMonthNumImpl>;
Expand Down