Skip to content

Commit

Permalink
#2964: Improved RPC heartbeats sending
Browse files Browse the repository at this point in the history
Summary:
In case of network errors RPC heartbeats could accumulate in `TcpStream::sending_` queue
until `TcpStream` is closed or healed. Added a restriction to not queue another heartbeat if
previous one is still in queue.

Test Plan: Jenkins

Reviewers: bogdan, raju, sergei

Reviewed By: sergei

Subscribers: ybase

Differential Revision: https://phabricator.dev.yugabyte.com/D7635
  • Loading branch information
ttyusupov committed Jan 17, 2020
1 parent b0dd1e7 commit bc12637
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 7 deletions.
3 changes: 2 additions & 1 deletion src/yb/rpc/tcp_stream.cc
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,6 @@ Status TcpStream::DoWrite() {
iovec iov[kMaxIov];
auto fill_result = FillIov(iov);

context_->UpdateLastWrite();
if (!fill_result.only_heartbeats) {
context_->UpdateLastActivity();
}
Expand All @@ -216,6 +215,8 @@ Status TcpStream::DoWrite() {
}
}

context_->UpdateLastWrite();

send_position_ += written;
while (!sending_.empty()) {
auto& front = sending_.front();
Expand Down
17 changes: 11 additions & 6 deletions src/yb/rpc/yb_rpc.cc
Original file line number Diff line number Diff line change
Expand Up @@ -208,12 +208,17 @@ void YBInboundConnectionContext::HandleTimeout(ev::timer& watcher, int revents)
const auto deadline =
std::max(last_heartbeat_sending_time_, last_write_time_) + HeartbeatPeriod();
if (now >= deadline) {
VLOG(4) << connection->ToString() << ": " << "Sending heartbeat, now: " << AsString(now)
<< ", deadline: " << AsString(deadline)
<< ", last_write_time_: " << AsString(last_write_time_)
<< ", last_heartbeat_sending_time_: " << AsString(last_heartbeat_sending_time_);
connection->QueueOutboundData(HeartbeatOutboundData::Instance());
last_heartbeat_sending_time_ = now;
if (last_write_time_ >= last_heartbeat_sending_time_) {
// last_write_time_ < last_heartbeat_sending_time_ means that last heartbeat we've queued
// for sending is still in queue due to RPC/networking issues, so no need to queue
// another one.
VLOG(4) << connection->ToString() << ": " << "Sending heartbeat, now: " << AsString(now)
<< ", deadline: " << AsString(deadline)
<< ", last_write_time_: " << AsString(last_write_time_)
<< ", last_heartbeat_sending_time_: " << AsString(last_heartbeat_sending_time_);
connection->QueueOutboundData(HeartbeatOutboundData::Instance());
last_heartbeat_sending_time_ = now;
}
timer_.Start(HeartbeatPeriod());
} else {
timer_.Start(deadline - now);
Expand Down

0 comments on commit bc12637

Please sign in to comment.