Skip to content

Commit

Permalink
Revert changes to consensus_peers.cc
Browse files: browse the repository at this point in the history
Summary:
Revert "Improve performance when sending peer's requests (#350)"
This reverts commit 5a062dd54a568c29903f2262e413d5f66c0f7cb8.

Revert "ENG-3422: #350 Remove semaphore from Peer class"
This reverts commit ebd5508b07dd541d48c425977cf4f56621b44357.

Test Plan: Verify that it builds

Reviewers: sergei, bogdan, mikhail

Reviewed By: mikhail

Subscribers: bharat, ybase

Differential Revision: https://phabricator.dev.yugabyte.com/D5159
  • Loading branch information
hectorgcr committed Jul 16, 2018
1 parent e39ec3f commit e2c86a6
Show file tree
Hide file tree
Showing 4 changed files with 22 additions and 70 deletions.
38 changes: 11 additions & 27 deletions src/yb/consensus/consensus_peers-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,6 @@ namespace consensus {
using log::Log;
using log::LogOptions;
using log::LogAnchorRegistry;
using std::shared_ptr;
using std::unique_ptr;

const char* kTableId = "test-peers-table";
Expand Down Expand Up @@ -119,7 +118,7 @@ class ConsensusPeersTest : public YBTest {

DelayablePeerProxy<NoOpTestPeerProxy>* NewRemotePeer(
const string& peer_name,
std::shared_ptr<Peer>* peer) {
std::unique_ptr<Peer>* peer) {
RaftPeerPB peer_pb;
peer_pb.set_permanent_uuid(peer_name);
auto proxy_ptr = new DelayablePeerProxy<NoOpTestPeerProxy>(
Expand Down Expand Up @@ -165,6 +164,7 @@ class ConsensusPeersTest : public YBTest {
scoped_refptr<server::Clock> clock_;
};


// Tests that a remote peer is correctly built and tracked
// by the message queue.
// After the operations are considered done the proxy (which
Expand All @@ -174,12 +174,7 @@ TEST_F(ConsensusPeersTest, TestRemotePeer) {
// We use a majority size of 2 since we make one fake remote peer
// in addition to our real local log.

std::shared_ptr<Peer> remote_peer;
BOOST_SCOPE_EXIT(&remote_peer) {
// This guarantees that the Peer object doesn't get destroyed if there is a pending request.
remote_peer->Close();
} BOOST_SCOPE_EXIT_END

std::unique_ptr<Peer> remote_peer;
DelayablePeerProxy<NoOpTestPeerProxy>* proxy = NewRemotePeer(kFollowerUuid, &remote_peer);

// Append a bunch of messages to the queue
Expand All @@ -200,21 +195,20 @@ TEST_F(ConsensusPeersTest, TestRemotePeer) {

TEST_F(ConsensusPeersTest, TestLocalAppendAndRemotePeerDelay) {
// Create a set of remote peers
std::shared_ptr<Peer> remote_peer1;
std::unique_ptr<Peer> remote_peer1;
NewRemotePeer("peer-1", &remote_peer1);

std::shared_ptr<Peer> remote_peer2;
std::unique_ptr<Peer> remote_peer2;
DelayablePeerProxy<NoOpTestPeerProxy>* remote_peer2_proxy =
NewRemotePeer("peer-2", &remote_peer2);

// Delay the response from the second remote peer.
const auto kAppendDelayTime = 1s;
log_->TEST_SetSleepDuration(kAppendDelayTime);
remote_peer2_proxy->DelayResponse();
BOOST_SCOPE_EXIT(&remote_peer1, &remote_peer2) {
// This guarantees that the Peer objects don't get destroyed if there is a pending request.
remote_peer1->Close();
remote_peer2->Close();
BOOST_SCOPE_EXIT(&log_, &remote_peer2_proxy) {
log_->TEST_SetSleepDuration(0s);
remote_peer2_proxy->Respond(TestPeerProxy::kUpdate);
} BOOST_SCOPE_EXIT_END

// Append one message to the queue.
Expand All @@ -235,22 +229,15 @@ TEST_F(ConsensusPeersTest, TestLocalAppendAndRemotePeerDelay) {
}

TEST_F(ConsensusPeersTest, TestRemotePeers) {
// Create a set of remote peers.
std::shared_ptr<Peer> remote_peer1;

// Create a set of remote peers
std::unique_ptr<Peer> remote_peer1;
DelayablePeerProxy<NoOpTestPeerProxy>* remote_peer1_proxy =
NewRemotePeer("peer-1", &remote_peer1);

std::shared_ptr<Peer> remote_peer2;
std::unique_ptr<Peer> remote_peer2;
DelayablePeerProxy<NoOpTestPeerProxy>* remote_peer2_proxy =
NewRemotePeer("peer-2", &remote_peer2);

BOOST_SCOPE_EXIT(&remote_peer1, &remote_peer2) {
// This guarantees that the Peer objects don't get destroyed if there is a pending request.
remote_peer1->Close();
remote_peer2->Close();
} BOOST_SCOPE_EXIT_END

// Delay the response from the second remote peer.
remote_peer2_proxy->DelayResponse();

Expand Down Expand Up @@ -387,9 +374,6 @@ TEST_F(ConsensusPeersTest, TestDontSendOneRpcPerWriteWhenPeerIsDown) {
// OK to have called UpdateConsensus() a few times due to regularly
// scheduled heartbeats.
ASSERT_LT(mock_proxy->update_count() - initial_update_count, 5);

// This guarantees that the Peer object doesn't get destroyed if there is a pending request.
peer->Close();
}

} // namespace consensus
Expand Down
41 changes: 7 additions & 34 deletions src/yb/consensus/consensus_peers.cc
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,6 @@ Status Peer::SignalRequest(RequestTriggerMode trigger_mode) {
std::lock_guard<simple_spinlock> l(peer_lock_);

if (PREDICT_FALSE(state_ == kPeerClosed)) {
shared_this_ = nullptr;
return STATUS(IllegalState, "Peer was closed.");
}

Expand Down Expand Up @@ -234,8 +233,7 @@ void Peer::SendNextRequest(RequestTriggerMode trigger_mode) {
auto status = consensus_->ChangeConfig(req, &DoNothingStatusCB, &error_code);
if (PREDICT_FALSE(!status.ok())) {
LOG(WARNING) << "Unable to change role for peer " << uuid << ": " << status.ToString(false);

// Since we released the semaphore, we need to call SignalRequest again to send a message.
// Since we released the semaphore, we need to call SignalRequest again to send a message
status = SignalRequest(RequestTriggerMode::kAlwaysSend);
if (PREDICT_FALSE(!status.ok())) {
LOG(WARNING) << "Unexpected error when trying to send request: "
Expand Down Expand Up @@ -336,11 +334,7 @@ void Peer::DoProcessResponse() {
bool more_pending = false;
queue_->ResponseFromPeer(peer_pb_.permanent_uuid(), response_, &more_pending);

auto state = state_.load(std::memory_order_acquire);
if (state == kPeerClosed) {
std::lock_guard<simple_spinlock> lock(peer_lock_);
shared_this_ = nullptr;
} else if (more_pending) {
if (more_pending && state_.load(std::memory_order_acquire) != kPeerClosed) {
lock.release();
SendNextRequest(RequestTriggerMode::kAlwaysSend);
}
Expand Down Expand Up @@ -378,7 +372,6 @@ void Peer::ProcessResponseError(const Status& status) {
<< " for tablet " << tablet_id_
<< " Status: " << status.ToString() << ". Retrying in the next heartbeat period."
<< " Already tried " << failed_attempts_ << " times. State: " << state_;

}

string Peer::LogPrefixUnlocked() const {
Expand All @@ -387,50 +380,30 @@ string Peer::LogPrefixUnlocked() const {
peer_pb_.last_known_addr().host(), peer_pb_.last_known_addr().port());
}

void Peer::Close(CreateReferenceToItself create_reference_to_itself) {
void Peer::Close() {
WARN_NOT_OK(heartbeater_.Stop(), "Could not stop heartbeater");

// If the peer is already closed return.
if (state_.load(std::memory_order_acquire) == kPeerClosed) {
return;
}
{
std::lock_guard<simple_spinlock> lock(peer_lock_);
if (state_ == kPeerClosed) return;
DCHECK(state_ == kPeerRunning || state_ == kPeerStarted) << "Unexpected state: " << state_;
state_ = kPeerClosed;

if (create_reference_to_itself) {
// We want this object to stay alive until we have processed the last response.
// If we can take ownership of the semaphore, we know it's safe to destroy this
// object, so we can destroy the reference we are creating here.
shared_this_ = shared_from_this();
}
}
LOG_WITH_PREFIX_UNLOCKED(INFO) << "Closing peer: " << peer_pb_.permanent_uuid();

// Acquire the semaphore to wait for any concurrent request to finish. They will see the state_
// == kPeerClosed and not start any new requests, but we can't currently cancel the already-sent
// ones. (see KUDU-699)
std::unique_lock<Semaphore> lock(sem_, std::try_to_lock);
if (lock.owns_lock()) {
// If we are here there is no pending request. So we can destroy the reference to this object.
shared_this_ = nullptr;
}

std::lock_guard<Semaphore> l(sem_);
queue_->UntrackPeer(peer_pb_.permanent_uuid());
// We don't own the ops (the queue does).
request_.mutable_ops()->ExtractSubrange(0, request_.ops_size(), /* elements */ nullptr);
replicate_msg_refs_.clear();
}

Peer::~Peer() {
if (state_ != kPeerClosed) {
LOG_WITH_PREFIX_UNLOCKED(DFATAL)
<< "Peer::Closed() was not called before letting the object be destroyed";
Close(CreateReferenceToItself::kFalse);
}

// We don't own the ops (the queue does).
request_.mutable_ops()->ExtractSubrange(0, request_.ops_size(), /* elements */ nullptr);
Close();
}

RpcPeerProxy::RpcPeerProxy(HostPort hostport,
Expand Down
11 changes: 3 additions & 8 deletions src/yb/consensus/consensus_peers.h
Original file line number Diff line number Diff line change
Expand Up @@ -114,9 +114,9 @@ namespace consensus {
// SignalRequest() return
//
class Peer;
typedef std::shared_ptr<Peer> PeerPtr;
typedef std::unique_ptr<Peer> PeerPtr;

class Peer : public std::enable_shared_from_this<Peer> {
class Peer {
public:
// Initializes a peer and get its status.
CHECKED_STATUS Init();
Expand All @@ -131,8 +131,6 @@ class Peer : public std::enable_shared_from_this<Peer> {
// behavior.
PeerProxy* GetPeerProxyForTests();

YB_STRONGLY_TYPED_BOOL(CreateReferenceToItself);

// Stop sending requests and periodic heartbeats.
//
// This does not block waiting on any current outstanding requests to finish.
Expand All @@ -142,7 +140,7 @@ class Peer : public std::enable_shared_from_this<Peer> {
// This method must be called before the Peer's associated ThreadPoolToken
// is destructed. Once this method returns, it is safe to destruct
// the ThreadPoolToken.
void Close(CreateReferenceToItself create_reference_to_itself = CreateReferenceToItself::kTrue);
void Close();

void SetTermForTest(int term);

Expand Down Expand Up @@ -248,9 +246,6 @@ class Peer : public std::enable_shared_from_this<Peer> {
mutable simple_spinlock peer_lock_;
std::atomic<State> state_;
Consensus* consensus_ = nullptr;

// Pointer to this object to avoid its destruction. Used to wait for pending requests.
std::shared_ptr<Peer> shared_this_ = nullptr;
};

// A proxy to another peer. Usually a thin wrapper around an rpc proxy but can be replaced for
Expand Down
2 changes: 1 addition & 1 deletion src/yb/consensus/peer_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ class PeerManager {
private:
std::string GetLogPrefix() const;

typedef std::unordered_map<std::string, std::shared_ptr<Peer>> PeersMap;
typedef std::unordered_map<std::string, std::unique_ptr<Peer>> PeersMap;
const std::string tablet_id_;
const std::string local_uuid_;
PeerProxyFactory* peer_proxy_factory_;
Expand Down

0 comments on commit e2c86a6

Please sign in to comment.