Skip to content

Commit

Permalink
[#386] Switch ResettableHeartbeater with Periodic timer
Browse files Browse the repository at this point in the history
Summary:
Currently the raft consensus uses Resettable timer for keeping track of
heartbeats from other peers.
This has the disadvantage that there is a thread for each tablet peer that a
tablet is tracking. We need to move this to Periodic timer so that we can
use the ev thread as needed and not use a dedicated thread. This does not
change the heartbeater thread communicating from the tserver to the leader
master.

Test Plan: consensus_peer-test, Jenkins

Reviewers: sergei, mikhail

Reviewed By: sergei

Subscribers: kannan, ybase, bharat

Differential Revision: https://phabricator.dev.yugabyte.com/D5449
  • Loading branch information
rven1 committed Sep 15, 2018
1 parent c605e4c commit 92d442a
Show file tree
Hide file tree
Showing 11 changed files with 91 additions and 434 deletions.
14 changes: 14 additions & 0 deletions src/yb/consensus/consensus-test-util.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
#include "yb/consensus/raft_consensus.h"
#include "yb/gutil/map-util.h"
#include "yb/gutil/strings/substitute.h"
#include "yb/rpc/messenger.h"
#include "yb/server/clock.h"
#include "yb/util/countdown_latch.h"
#include "yb/util/locks.h"
Expand All @@ -72,6 +73,7 @@ namespace yb {
namespace consensus {

using log::Log;
using rpc::Messenger;
using strings::Substitute;

inline ReplicateMsgPtr CreateDummyReplicate(int term,
Expand Down Expand Up @@ -387,13 +389,19 @@ class NoOpTestPeerProxyFactory : public PeerProxyFactory {
public:
NoOpTestPeerProxyFactory() {
CHECK_OK(ThreadPoolBuilder("test-peer-pool").set_max_threads(3).Build(&pool_));
messenger_ = CHECK_RESULT(rpc::MessengerBuilder("test").Build());
}

PeerProxyPtr NewProxy(const RaftPeerPB& peer_pb) override {
return std::make_unique<NoOpTestPeerProxy>(pool_.get(), peer_pb);
}

std::shared_ptr<rpc::Messenger> messenger() const override {
return messenger_;
}

gscoped_ptr<ThreadPool> pool_;
std::shared_ptr<rpc::Messenger> messenger_;
};

typedef std::unordered_map<std::string, std::shared_ptr<RaftConsensus> > TestPeerMap;
Expand Down Expand Up @@ -592,6 +600,7 @@ class LocalTestPeerProxyFactory : public PeerProxyFactory {
explicit LocalTestPeerProxyFactory(TestPeerMapManager* peers)
: peers_(peers) {
CHECK_OK(ThreadPoolBuilder("test-peer-pool").set_max_threads(3).Build(&pool_));
messenger_ = CHECK_RESULT(rpc::MessengerBuilder("test").Build());
}

PeerProxyPtr NewProxy(const consensus::RaftPeerPB& peer_pb) override {
Expand All @@ -605,8 +614,13 @@ class LocalTestPeerProxyFactory : public PeerProxyFactory {
return proxies_;
}

std::shared_ptr<rpc::Messenger> messenger() const override {
return messenger_;
}

private:
gscoped_ptr<ThreadPool> pool_;
std::shared_ptr<rpc::Messenger> messenger_;
TestPeerMapManager* const peers_;
// NOTE: There is no need to delete this on the dctor because proxies are externally managed
vector<LocalTestPeerProxy*> proxies_;
Expand Down
15 changes: 12 additions & 3 deletions src/yb/consensus/consensus_peers-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
#include "yb/consensus/log_util.h"
#include "yb/consensus/opid_util.h"
#include "yb/fs/fs_manager.h"
#include "yb/rpc/messenger.h"
#include "yb/server/hybrid_clock.h"
#include "yb/util/metrics.h"
#include "yb/util/test_macros.h"
Expand All @@ -62,6 +63,8 @@ namespace consensus {
using log::Log;
using log::LogOptions;
using log::LogAnchorRegistry;
using rpc::Messenger;
using rpc::MessengerBuilder;
using std::shared_ptr;
using std::unique_ptr;

Expand All @@ -79,6 +82,8 @@ class ConsensusPeersTest : public YBTest {

void SetUp() override {
YBTest::SetUp();
MessengerBuilder bld("test");
messenger_ = ASSERT_RESULT(bld.Build());
ASSERT_OK(ThreadPoolBuilder("test-raft-pool").Build(&raft_pool_));
raft_pool_token_ = raft_pool_->NewToken(ThreadPool::ExecutionMode::CONCURRENT);
ASSERT_OK(ThreadPoolBuilder("append").Build(&append_pool_));
Expand Down Expand Up @@ -117,6 +122,7 @@ class ConsensusPeersTest : public YBTest {
ASSERT_OK(log_->WaitUntilAllFlushed());
append_pool_->Shutdown();
raft_pool_->Shutdown();
messenger_->Shutdown();
}

DelayablePeerProxy<NoOpTestPeerProxy>* NewRemotePeer(
Expand All @@ -128,7 +134,7 @@ class ConsensusPeersTest : public YBTest {
raft_pool_.get(), new NoOpTestPeerProxy(raft_pool_.get(), peer_pb));
*peer = CHECK_RESULT(Peer::NewRemotePeer(
peer_pb, kTabletId, kLeaderUuid, message_queue_.get(), raft_pool_token_.get(),
PeerProxyPtr(proxy_ptr), nullptr /* consensus */));
PeerProxyPtr(proxy_ptr), nullptr /* consensus */, messenger_));
return proxy_ptr;
}

Expand Down Expand Up @@ -165,6 +171,7 @@ class ConsensusPeersTest : public YBTest {
LogOptions options_;
unique_ptr<ThreadPoolToken> raft_pool_token_;
scoped_refptr<server::Clock> clock_;
shared_ptr<Messenger> messenger_;
};

// Tests that a remote peer is correctly built and tracked
Expand Down Expand Up @@ -311,7 +318,8 @@ TEST_F(ConsensusPeersTest, TestCloseWhenRemotePeerDoesntMakeProgress) {
auto mock_proxy = new MockedPeerProxy(raft_pool_.get());
auto peer = ASSERT_RESULT(Peer::NewRemotePeer(
FakeRaftPeerPB(kFollowerUuid), kTabletId, kLeaderUuid, message_queue_.get(),
raft_pool_token_.get(), PeerProxyPtr(mock_proxy), nullptr /* consensus */));
raft_pool_token_.get(), PeerProxyPtr(mock_proxy), nullptr /* consensus */,
messenger_));

// Make the peer respond without making any progress -- it always returns
// that it has only replicated op 0.0. When we see the response, we always
Expand Down Expand Up @@ -339,7 +347,8 @@ TEST_F(ConsensusPeersTest, TestDontSendOneRpcPerWriteWhenPeerIsDown) {
auto mock_proxy = new MockedPeerProxy(raft_pool_.get());
auto peer = ASSERT_RESULT(Peer::NewRemotePeer(
FakeRaftPeerPB(kFollowerUuid), kTabletId, kLeaderUuid, message_queue_.get(),
raft_pool_token_.get(), PeerProxyPtr(mock_proxy), nullptr /* consensus */));
raft_pool_token_.get(), PeerProxyPtr(mock_proxy), nullptr /* consensus */,
messenger_));

BOOST_SCOPE_EXIT(&peer) {
// This guarantees that the Peer object doesn't get destroyed if there is a pending request.
Expand Down
47 changes: 34 additions & 13 deletions src/yb/consensus/consensus_peers.cc
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
#include "yb/gutil/stl_util.h"
#include "yb/gutil/strings/substitute.h"
#include "yb/rpc/messenger.h"
#include "yb/rpc/periodic.h"
#include "yb/tserver/tserver.pb.h"

#include "yb/util/backoff_waiter.h"
Expand Down Expand Up @@ -90,6 +91,7 @@ using log::Log;
using log::LogEntryBatch;
using std::shared_ptr;
using rpc::Messenger;
using rpc::PeriodicTimer;
using rpc::RpcController;
using strings::Substitute;

Expand All @@ -99,36 +101,53 @@ Result<PeerPtr> Peer::NewRemotePeer(const RaftPeerPB& peer_pb,
PeerMessageQueue* queue,
ThreadPoolToken* raft_pool_token,
PeerProxyPtr proxy,
Consensus* consensus) {
Consensus* consensus,
std::shared_ptr<rpc::Messenger> messenger) {
auto new_peer = std::make_shared<Peer>(
peer_pb, tablet_id, leader_uuid, std::move(proxy), queue, raft_pool_token, consensus);
peer_pb, tablet_id, leader_uuid, std::move(proxy), queue, raft_pool_token, consensus,
std::move(messenger));
RETURN_NOT_OK(new_peer->Init());
return Result<PeerPtr>(std::move(new_peer));
}

Peer::Peer(
const RaftPeerPB& peer_pb, string tablet_id, string leader_uuid, PeerProxyPtr proxy,
PeerMessageQueue* queue, ThreadPoolToken* raft_pool_token, Consensus* consensus)
PeerMessageQueue* queue, ThreadPoolToken* raft_pool_token, Consensus* consensus,
std::shared_ptr<rpc::Messenger> messenger)
: tablet_id_(std::move(tablet_id)),
leader_uuid_(std::move(leader_uuid)),
peer_pb_(peer_pb),
proxy_(std::move(proxy)),
queue_(queue),
heartbeater_(
peer_pb.permanent_uuid(), MonoDelta::FromMilliseconds(FLAGS_raft_heartbeat_interval_ms),
std::bind(&Peer::SignalRequest, this, RequestTriggerMode::kAlwaysSend)),
raft_pool_token_(raft_pool_token),
consensus_(consensus) {}
consensus_(consensus),
messenger_(std::move(messenger)) {}

void Peer::SetTermForTest(int term) {
response_.set_responder_term(term);
}

Status Peer::Init() {
std::lock_guard<simple_spinlock> lock(peer_lock_);
queue_->TrackPeer(peer_pb_.permanent_uuid());
RETURN_NOT_OK(heartbeater_.Start());
state_ = kPeerStarted;
{
std::lock_guard<simple_spinlock> lock(peer_lock_);
queue_->TrackPeer(peer_pb_.permanent_uuid());
}
// Capture a weak_ptr reference into the functor so it can safely handle
// outliving the peer.
std::weak_ptr<Peer> w = shared_from_this();
heartbeater_ = PeriodicTimer::Create(
messenger_,
[w]() {
if (auto p = w.lock()) {
Status s = p->SignalRequest(RequestTriggerMode::kAlwaysSend);
}
},
MonoDelta::FromMilliseconds(FLAGS_raft_heartbeat_interval_ms));
heartbeater_->Start();
{
std::lock_guard<simple_spinlock> lock(peer_lock_);
state_ = kPeerStarted;
}
return Status::OK();
}

Expand Down Expand Up @@ -273,7 +292,7 @@ void Peer::SendNextRequest(RequestTriggerMode trigger_mode) {

// If we're actually sending ops there's no need to heartbeat for a while, reset the heartbeater.
if (req_has_ops) {
heartbeater_.Reset();
heartbeater_->Snooze();
}

MAYBE_FAULT(FLAGS_fault_crash_on_leader_request_fraction);
Expand Down Expand Up @@ -422,7 +441,9 @@ string Peer::LogPrefix() const {
}

void Peer::Close() {
WARN_NOT_OK(heartbeater_.Stop(), "Could not stop heartbeater");
if (heartbeater_) {
heartbeater_->Stop();
}

// If the peer is already closed return.
{
Expand Down
15 changes: 11 additions & 4 deletions src/yb/consensus/consensus_peers.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,14 +52,18 @@
#include "yb/util/countdown_latch.h"
#include "yb/util/locks.h"
#include "yb/util/net/net_util.h"
#include "yb/util/resettable_heartbeater.h"
#include "yb/util/semaphore.h"
#include "yb/util/status.h"

namespace yb {
class HostPort;
class ThreadPoolToken;

namespace rpc {
class Messenger;
class PeriodicTimer;
}

namespace log {
class Log;
}
Expand Down Expand Up @@ -123,7 +127,8 @@ class Peer : public std::enable_shared_from_this<Peer> {
public:
Peer(const RaftPeerPB& peer, std::string tablet_id, std::string leader_uuid,
PeerProxyPtr proxy, PeerMessageQueue* queue,
ThreadPoolToken* raft_pool_token, Consensus* consensus);
ThreadPoolToken* raft_pool_token, Consensus* consensus,
std::shared_ptr<rpc::Messenger> messenger);

// Initializes a peer and get its status.
CHECKED_STATUS Init();
Expand Down Expand Up @@ -165,7 +170,8 @@ class Peer : public std::enable_shared_from_this<Peer> {
PeerMessageQueue* queue,
ThreadPoolToken* raft_pool_token,
PeerProxyPtr proxy,
Consensus* consensus);
Consensus* consensus,
std::shared_ptr<rpc::Messenger> messenger);

uint64_t failed_attempts() {
std::lock_guard<simple_spinlock> l(peer_lock_);
Expand Down Expand Up @@ -242,7 +248,7 @@ class Peer : public std::enable_shared_from_this<Peer> {

// Heartbeater for remote peer implementations. This will send status only requests to the remote
// peers whenever we go more than 'FLAGS_raft_heartbeat_interval_ms' without sending actual data.
ResettableHeartbeater heartbeater_;
std::shared_ptr<rpc::PeriodicTimer> heartbeater_;

// Thread pool used to construct requests to this peer.
ThreadPoolToken* raft_pool_token_;
Expand All @@ -259,6 +265,7 @@ class Peer : public std::enable_shared_from_this<Peer> {
mutable simple_spinlock peer_lock_;
State state_ = kPeerCreated;
Consensus* consensus_ = nullptr;
std::shared_ptr<rpc::Messenger> messenger_;
};

// A proxy to another peer. Usually a thin wrapper around an rpc proxy but can be replaced for
Expand Down
12 changes: 12 additions & 0 deletions src/yb/consensus/leader_election-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
#include "yb/consensus/leader_election.h"

#include <functional>
#include <memory>
#include <string>
#include <vector>

Expand All @@ -46,8 +47,14 @@
#include "yb/util/test_util.h"

namespace yb {

namespace rpc {
class Messenger;
} // namespace rpc

namespace consensus {

using std::shared_ptr;
using std::string;
using std::unordered_map;
using std::vector;
Expand Down Expand Up @@ -103,11 +110,16 @@ class FromMapPeerProxyFactory : public PeerProxyFactory {
used_peer_proxy_.clear();
}

shared_ptr<rpc::Messenger> messenger() const override {
return null_messenger_;
}

private:
// FYI, the tests may add and remove nodes from this map while we hold a
// reference to it.
const ProxyMap* const proxy_map_;
std::set<string> used_peer_proxy_;
shared_ptr<rpc::Messenger> null_messenger_;
};

class LeaderElectionTest : public YBTest {
Expand Down
2 changes: 1 addition & 1 deletion src/yb/consensus/peer_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ void PeerManager::UpdateRaftConfig(const RaftConfigPB& config) {
VLOG(1) << GetLogPrefix() << "Adding remote peer. Peer: " << peer_pb.ShortDebugString();
auto remote_peer = Peer::NewRemotePeer(
peer_pb, tablet_id_, local_uuid_, queue_, raft_pool_token_,
peer_proxy_factory_->NewProxy(peer_pb), consensus_);
peer_proxy_factory_->NewProxy(peer_pb), consensus_, peer_proxy_factory_->messenger());
if (!remote_peer.ok()) {
LOG(WARNING) << "Failed to create remote peer for " << peer_pb.ShortDebugString() << ": "
<< remote_peer.status();
Expand Down
11 changes: 7 additions & 4 deletions src/yb/consensus/raft_consensus_quorum-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -745,17 +745,20 @@ TEST_F(RaftConsensusQuorumTest, TestLeaderHeartbeats) {
int repl0_init_count = counter_hook_rpl0->num_pre_update_calls();
int repl1_init_count = counter_hook_rpl1->num_pre_update_calls();

// Now wait for about 4 times the hearbeat period the counters
// should have increased 3/4 times.
// Now wait for about 4 times the heartbeat period the counters
// should have increased between 3 to 8 times.
//
// Why the variance? Heartbeat timing is jittered such that the period
// between heartbeats can be anywhere from half the interval to the full interval.
SleepFor(MonoDelta::FromMilliseconds(FLAGS_raft_heartbeat_interval_ms * 4));

int repl0_final_count = counter_hook_rpl0->num_pre_update_calls();
int repl1_final_count = counter_hook_rpl1->num_pre_update_calls();

ASSERT_GE(repl0_final_count - repl0_init_count, 3);
ASSERT_LE(repl0_final_count - repl0_init_count, 4);
ASSERT_LE(repl0_final_count - repl0_init_count, 8);
ASSERT_GE(repl1_final_count - repl1_init_count, 3);
ASSERT_LE(repl1_final_count - repl1_init_count, 4);
ASSERT_LE(repl1_final_count - repl1_init_count, 8);

VerifyLogs(2, 0, 1);
}
Expand Down
2 changes: 0 additions & 2 deletions src/yb/util/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,6 @@ set(UTIL_SRCS
pstack_watcher.cc
random_util.cc
ref_cnt_buffer.cc
resettable_heartbeater.cc
rolling_log.cc
rw_mutex.cc
rwc_lock.cc
Expand Down Expand Up @@ -362,7 +361,6 @@ ADD_YB_TEST(pstack_watcher-test)
ADD_YB_TEST(ref_cnt_buffer-test)
ADD_YB_TEST(random-test)
ADD_YB_TEST(random_util-test)
ADD_YB_TEST(resettable_heartbeater-test)
ADD_YB_TEST(result-test)
ADD_YB_TEST(rle-test)
ADD_YB_TEST(rolling_log-test)
Expand Down
Loading

0 comments on commit 92d442a

Please sign in to comment.