Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fleet_executor] Add retry to the message bus's send. Use unique_lock instead of calling lock(). #37087

Merged
merged 2 commits on Nov 10, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions paddle/fluid/distributed/fleet_executor/interceptor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,8 @@ bool Interceptor::EnqueueRemoteInterceptorMessage(
// Called by Carrier, enqueue an InterceptorMessage to remote mailbox
VLOG(3) << "Enqueue message: " << interceptor_message.message_type()
<< " into " << interceptor_id_ << "'s remote mailbox.";
remote_mailbox_mutex_.lock();
std::unique_lock<std::mutex> lock(remote_mailbox_mutex_);
remote_mailbox_.push(interceptor_message);
remote_mailbox_mutex_.unlock();
return true;
}

Expand Down
25 changes: 18 additions & 7 deletions paddle/fluid/distributed/fleet_executor/message_bus.cc
Original file line number Diff line number Diff line change
Expand Up @@ -48,15 +48,25 @@ bool MessageBus::Send(const InterceptorMessage& interceptor_message) {
int64_t src_id = interceptor_message.src_id();
int64_t dst_id = interceptor_message.dst_id();
if (IsSameRank(src_id, dst_id)) {
VLOG(3) << "Send a message from: " << src_id << " to " << dst_id
<< " within a same rank.";
VLOG(3) << "Send a message from rank " << src_id << " to rank " << dst_id
<< ", which are same ranks.";
return SendIntraRank(interceptor_message);
} else {
VLOG(3) << "Send a message from: " << src_id << " to " << dst_id
<< " between different ranks.";
VLOG(3) << "Send a message from rank " << src_id << " to rank " << dst_id
<< ", which are different ranks.";
#if defined(PADDLE_WITH_DISTRIBUTE) && defined(PADDLE_WITH_PSCORE) && \
!defined(PADDLE_WITH_ASCEND_CL)
return SendInterRank(interceptor_message);
int retry_time = 0; // message bus will retry sending for 10 times
while (retry_time < 10) {
++retry_time;
if (SendInterRank(interceptor_message)) {
VLOG(3) << "Message bus sends inter rank successfully with "
<< retry_time << " times retries.";
return true;
}
}
VLOG(3) << "Message bus sends inter rank fail after 10 times retries.";
return false;
#else
PADDLE_THROW(platform::errors::Unavailable(
"Fleet executor does not support sending message between different "
Expand Down Expand Up @@ -134,6 +144,7 @@ bool MessageBus::SendInterRank(const InterceptorMessage& interceptor_message) {
brpc::Channel channel;
brpc::ChannelOptions options;
options.protocol = "baidu_std";
options.connect_timeout_ms = 1000;
options.timeout_ms = 1000;
options.max_retry = 5;
PADDLE_ENFORCE_EQ(
Expand All @@ -149,11 +160,11 @@ bool MessageBus::SendInterRank(const InterceptorMessage& interceptor_message) {
VLOG(3) << "Message bus: brpc sends success.";
return true;
} else {
VLOG(3) << "Message bus: InterceptorMessageService error.";
VLOG(4) << "Message bus: InterceptorMessageService error.";
return false;
}
} else {
VLOG(3) << "Message bus: brpc sends failed with error text: "
VLOG(4) << "Message bus: brpc sends failed with error text: "
<< ctrl.ErrorText();
return false;
}
Expand Down