Skip to content

Commit

Permalink
ENG-4725: #932: Write RPC should take into account retryable_rpc_sing…
Browse files Browse the repository at this point in the history
…le_call_timeout_ms

Summary:
Write RPC ignores the value of the retryable_rpc_single_call_timeout_ms flag for a single RPC call.
Instead, it uses the overall operation timeout. That causes issues when the initial request was sent to a node that lost leadership during a network partition.

Fixed by using retryable_rpc_single_call_timeout_ms for single RPC timeout calculation.

Also added the combine_batcher_errors flag, so that the batcher returns the actual error instead of an IOError.
This simplifies writing tests.

Test Plan: ybd --cxx-test ql-stress-test --gtest_filter QLStressTest.OldLeaderCatchUpAfterNetworkPartition

Reviewers: timur, amitanand, mikhail

Reviewed By: mikhail

Subscribers: bogdan, ybase, bharat

Differential Revision: https://phabricator.dev.yugabyte.com/D6228
  • Loading branch information
spolitov committed Mar 8, 2019
1 parent 81c3b91 commit af4cec4
Show file tree
Hide file tree
Showing 29 changed files with 258 additions and 51 deletions.
10 changes: 7 additions & 3 deletions src/yb/client/async_initializer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -56,13 +56,17 @@ AsyncClientInitialiser::AsyncClientInitialiser(
if (messenger) {
client_builder_.use_messenger(messenger);
}

init_client_thread_ = std::thread(std::bind(&AsyncClientInitialiser::InitClient, this));
}

AsyncClientInitialiser::~AsyncClientInitialiser() {
Shutdown();
init_client_thread_.join();
if (init_client_thread_.joinable()) {
init_client_thread_.join();
}
}

void AsyncClientInitialiser::Start() {
init_client_thread_ = std::thread(std::bind(&AsyncClientInitialiser::InitClient, this));
}

const std::shared_ptr<client::YBClient>& AsyncClientInitialiser::client() const {
Expand Down
4 changes: 4 additions & 0 deletions src/yb/client/async_initializer.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
namespace yb {
namespace client {

YB_STRONGLY_TYPED_BOOL(AutoStart);

class AsyncClientInitialiser {
public:
AsyncClientInitialiser(
Expand All @@ -35,6 +37,8 @@ class AsyncClientInitialiser {

void Shutdown() { stopping_ = true; }

void Start();

const std::shared_ptr<client::YBClient>& client() const;
const std::shared_future<client::YBClientPtr>& get_client_future() const {
return client_future_;
Expand Down
2 changes: 1 addition & 1 deletion src/yb/client/async_rpc.cc
Original file line number Diff line number Diff line change
Expand Up @@ -407,7 +407,7 @@ void WriteRpc::CallRemoteMethod() {
ADOPT_TRACE(trace.get());

tablet_invoker_.proxy()->WriteAsync(
req_, &resp_, PrepareController(MonoDelta::kMax),
req_, &resp_, PrepareController(),
std::bind(&WriteRpc::Finished, this, Status::OK()));
TRACE_TO(trace, "RpcDispatched Asynchronously");
}
Expand Down
6 changes: 6 additions & 0 deletions src/yb/client/client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,8 @@ DEFINE_bool(client_suppress_created_logs, false,
TAG_FLAG(client_suppress_created_logs, advanced);
TAG_FLAG(client_suppress_created_logs, hidden);

DECLARE_bool(running_test);

namespace yb {
namespace client {

Expand Down Expand Up @@ -397,6 +399,10 @@ Status YBClientBuilder::Build(shared_ptr<YBClient>* client) {
builder.set_metric_entity(data_->metric_entity_);
builder.UseDefaultConnectionContextFactory(data_->parent_mem_tracker_);
c->data_->messenger_ = VERIFY_RESULT(builder.Build());

if (FLAGS_running_test) {
c->data_->messenger_->TEST_SetOutboundIpBase(VERIFY_RESULT(HostToAddress("127.0.0.1")));
}
}
c->data_->proxy_cache_ = std::make_unique<rpc::ProxyCache>(c->data_->messenger_);
c->data_->metric_entity_ = data_->metric_entity_;
Expand Down
70 changes: 70 additions & 0 deletions src/yb/client/ql-stress-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
DECLARE_double(respond_write_failed_probability);
DECLARE_bool(detect_duplicates_for_retryable_requests);
DECLARE_int32(raft_heartbeat_interval_ms);
DECLARE_bool(combine_batcher_errors);

using namespace std::literals;

Expand Down Expand Up @@ -481,5 +482,74 @@ TEST_F_EX(QLStressTest, FlushCompact, QLStressTestSingleTablet) {
ASSERT_GE(num_iter, 5);
}

// The scenario of this test is the following:
// We do writes in background.
// Isolate leader for 10 seconds.
// Restore connectivity.
// Check that old leader was able to catch up after the partition is healed.
//
// All sleeps are scaled by yb::kTimeMultiplier; the final WaitFor uses a fixed 5s timeout.
TEST_F_EX(QLStressTest, OldLeaderCatchUpAfterNetworkPartition, QLStressTestSingleTablet) {
// Enable the combine_batcher_errors flag for this test.
FLAGS_combine_batcher_errors = true;

// Both outlive the inner scope: they are checked again after the writer thread has been joined.
tablet::TabletPeer* leader_peer = nullptr;
std::atomic<int> key(0);
{
std::atomic<bool> stop(false);

// Background writer: writes rows with consecutive keys until asked to stop.
// Only this thread increments `key`; the main thread just reads it.
std::thread writer([this, &stop, &key] {
auto session = NewSession();
std::string value_prefix = "value_";
while (!stop.load(std::memory_order_acquire)) {
ASSERT_OK(WriteRow(session, key, value_prefix + std::to_string(key)));
++key;
}
});

// Stop and join the writer whenever this scope is left, including the early return
// performed by a failed ASSERT_* below.
BOOST_SCOPE_EXIT(&stop, &writer) {
stop.store(true);
writer.join();
} BOOST_SCOPE_EXIT_END;

// Locate the tablet server whose (single) tablet peer does not report NOT_LEADER.
tserver::MiniTabletServer* leader = nullptr;
for (int i = 0; i != cluster_->num_tablet_servers(); ++i) {
auto current = cluster_->mini_tablet_server(i);
auto peers = current->server()->tablet_manager()->GetTabletPeers();
ASSERT_EQ(peers.size(), 1);
if (peers.front()->LeaderStatus() != consensus::LeaderStatus::NOT_LEADER) {
leader = current;
leader_peer = peers.front().get();
break;
}
}

ASSERT_NE(leader, nullptr);

// Let the writer make progress before the partition is introduced.
std::this_thread::sleep_for(5s * yb::kTimeMultiplier);

// Snapshot the leader's log position before isolation: still in term 1, and the log index
// is expected to be ahead of the number of keys written so far.
auto pre_isolate_op_id = leader_peer->GetLatestLogEntryOpId();
LOG(INFO) << "Isolate, last op id: " << pre_isolate_op_id << ", key: " << key;
ASSERT_EQ(pre_isolate_op_id.term, 1);
ASSERT_GT(pre_isolate_op_id.index, key);
leader->SetIsolated(true);
std::this_thread::sleep_for(10s * yb::kTimeMultiplier);

// While isolated, the old leader must stay in term 1 and its log may have grown by at most
// 10 entries.
auto pre_restore_op_id = leader_peer->GetLatestLogEntryOpId();
LOG(INFO) << "Restore, last op id: " << pre_restore_op_id << ", key: " << key;
ASSERT_EQ(pre_restore_op_id.term, 1);
ASSERT_GE(pre_restore_op_id.index, pre_isolate_op_id.index);
ASSERT_LE(pre_restore_op_id.index, pre_isolate_op_id.index + 10);
leader->SetIsolated(false);
// Keep writing for a while after connectivity is restored.
std::this_thread::sleep_for(5s * yb::kTimeMultiplier);
}

// The writer has been joined at this point, so `key` no longer changes.
// Wait until the old leader's log index exceeds the final key count.
ASSERT_OK(WaitFor([leader_peer, &key] {
return leader_peer->GetLatestLogEntryOpId().index > key;
}, 5s, "Old leader has enough operations"));

// The old leader's latest log entry must now belong to a term later than 1.
auto finish_op_id = leader_peer->GetLatestLogEntryOpId();
LOG(INFO) << "Finish, last op id: " << finish_op_id << ", key: " << key;
ASSERT_GT(finish_op_id.term, 1);
ASSERT_GT(finish_op_id.index, key);
}

} // namespace client
} // namespace yb
19 changes: 10 additions & 9 deletions src/yb/consensus/raft_consensus.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2346,8 +2346,9 @@ MicrosTime RaftConsensus::MajorityReplicatedHtLeaseExpiration(
return state_->MajorityReplicatedHtLeaseExpiration(min_allowed, deadline);
}

std::string RaftConsensus::GetRequestVoteLogPrefix() const {
return state_->LogPrefix() + "Leader election vote request";
std::string RaftConsensus::GetRequestVoteLogPrefix(const VoteRequestPB& request) const {
return Format("$0 Leader $1election vote request",
state_->LogPrefix(), request.preelection() ? "pre-" : "");
}

void RaftConsensus::FillVoteResponseVoteGranted(
Expand All @@ -2368,7 +2369,7 @@ Status RaftConsensus::RequestVoteRespondInvalidTerm(const VoteRequestPB* request
FillVoteResponseVoteDenied(ConsensusErrorPB::INVALID_TERM, response);
string msg = Substitute("$0: Denying vote to candidate $1 for earlier term $2. "
"Current term is $3.",
GetRequestVoteLogPrefix(),
GetRequestVoteLogPrefix(*request),
request->candidate_uuid(),
request->candidate_term(),
state_->GetCurrentTermUnlocked());
Expand All @@ -2382,7 +2383,7 @@ Status RaftConsensus::RequestVoteRespondVoteAlreadyGranted(const VoteRequestPB*
FillVoteResponseVoteGranted(*request, response);
LOG(INFO) << Substitute("$0: Already granted yes vote for candidate $1 in term $2. "
"Re-sending same reply.",
GetRequestVoteLogPrefix(),
GetRequestVoteLogPrefix(*request),
request->candidate_uuid(),
request->candidate_term());
return Status::OK();
Expand All @@ -2393,7 +2394,7 @@ Status RaftConsensus::RequestVoteRespondAlreadyVotedForOther(const VoteRequestPB
FillVoteResponseVoteDenied(ConsensusErrorPB::ALREADY_VOTED, response);
string msg = Substitute("$0: Denying vote to candidate $1 in current term $2: "
"Already voted for candidate $3 in this term.",
GetRequestVoteLogPrefix(),
GetRequestVoteLogPrefix(*request),
request->candidate_uuid(),
state_->GetCurrentTermUnlocked(),
state_->GetVotedForCurrentTermUnlocked());
Expand All @@ -2409,7 +2410,7 @@ Status RaftConsensus::RequestVoteRespondLastOpIdTooOld(const OpId& local_last_lo
string msg = Substitute("$0: Denying vote to candidate $1 for term $2 because "
"replica has last-logged OpId of $3, which is greater than that of the "
"candidate, which has last-logged OpId of $4.",
GetRequestVoteLogPrefix(),
GetRequestVoteLogPrefix(*request),
request->candidate_uuid(),
request->candidate_term(),
local_last_logged_opid.ShortDebugString(),
Expand All @@ -2425,7 +2426,7 @@ Status RaftConsensus::RequestVoteRespondLeaderIsAlive(const VoteRequestPB* reque
std::string msg = Format(
"$0: Denying vote to candidate $1 for term $2 because replica is either leader or believes a "
"valid leader to be alive. Time left: $3",
GetRequestVoteLogPrefix(), request->candidate_uuid(), request->candidate_term(),
GetRequestVoteLogPrefix(*request), request->candidate_uuid(), request->candidate_term(),
withhold_votes_until_ - MonoTime::Now());
LOG(INFO) << msg;
StatusToPB(STATUS(InvalidArgument, msg), response->mutable_consensus_error()->mutable_status());
Expand All @@ -2438,7 +2439,7 @@ Status RaftConsensus::RequestVoteRespondIsBusy(const VoteRequestPB* request,
string msg = Substitute("$0: Denying vote to candidate $1 for term $2 because "
"replica is already servicing an update from a current leader "
"or another vote.",
GetRequestVoteLogPrefix(),
GetRequestVoteLogPrefix(*request),
request->candidate_uuid(),
request->candidate_term());
LOG(INFO) << msg;
Expand All @@ -2465,7 +2466,7 @@ Status RaftConsensus::RequestVoteRespondVoteGranted(const VoteRequestPB* request
SnoozeFailureDetector(DO_NOT_LOG, additional_backoff);

LOG(INFO) << Substitute("$0: Granting yes vote for candidate $1 in term $2.",
GetRequestVoteLogPrefix(),
GetRequestVoteLogPrefix(*request),
request->candidate_uuid(),
state_->GetCurrentTermUnlocked());
return Status::OK();
Expand Down
2 changes: 1 addition & 1 deletion src/yb/consensus/raft_consensus.h
Original file line number Diff line number Diff line change
Expand Up @@ -388,7 +388,7 @@ class RaftConsensus : public std::enable_shared_from_this<RaftConsensus>,
HybridTime propagated_safe_time);

// Return header string for RequestVote log messages. The ReplicaState lock must be held.
std::string GetRequestVoteLogPrefix() const;
std::string GetRequestVoteLogPrefix(const VoteRequestPB& request) const;

// Fills the response with the current status, if an update was successful.
void FillConsensusResponseOKUnlocked(ConsensusResponsePB* response);
Expand Down
13 changes: 10 additions & 3 deletions src/yb/integration-tests/kv_table_ts_failover_write_if-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
#include "yb/util/test_util.h"
#include "yb/yql/cql/ql/util/statement_result.h"

DECLARE_bool(combine_batcher_errors);

namespace yb {

using client::YBSessionPtr;
Expand Down Expand Up @@ -88,9 +90,12 @@ class KVTableTsFailoverWriteIfTest : public integration_tests::YBTableTestBase {
LOG(INFO) << "Sending write: " << op_str;
ASSERT_OK(session->Apply(insert));
session->FlushAsync([insert, op_str](const Status& s){
ASSERT_TRUE(s.ok()) << "Failed to flush write " << op_str << ". Error: " << s;
ASSERT_EQ(insert->response().status(), QLResponsePB::YQL_STATUS_OK)
<< "Failed to write " << op_str;
ASSERT_TRUE(s.ok() || s.IsAlreadyPresent())
<< "Failed to flush write " << op_str << ". Error: " << s;
if (s.ok()) {
ASSERT_EQ(insert->response().status(), QLResponsePB::YQL_STATUS_OK)
<< "Failed to write " << op_str;
}
LOG(INFO) << "Written: " << op_str;
});
}
Expand Down Expand Up @@ -207,6 +212,8 @@ class KVTableTsFailoverWriteIfTest : public integration_tests::YBTableTestBase {

// Test for ENG-3471 - shouldn't run write-if when leader hasn't yet committed all pending ops.
TEST_F(KVTableTsFailoverWriteIfTest, KillTabletServerDuringReplication) {
FLAGS_combine_batcher_errors = true;

const int32_t key = 0;
const int32_t initial_value = 10000;
const auto small_delay = 100ms;
Expand Down
4 changes: 3 additions & 1 deletion src/yb/integration-tests/mini_cluster.cc
Original file line number Diff line number Diff line change
Expand Up @@ -476,7 +476,9 @@ Status MiniCluster::DoCreateClient(YBClientBuilder* builder,
CHECK(master);
builder->add_master_server_addr(master->bound_rpc_addr_str());
}
return builder->Build(client);
RETURN_NOT_OK(builder->Build(client));

return Status::OK();
}

HostPort MiniCluster::DoGetLeaderMasterBoundRpcAddr() {
Expand Down
2 changes: 1 addition & 1 deletion src/yb/master/mini_master.cc
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ Status MiniMaster::StartOnPorts(uint16_t rpc_port, uint16_t web_port,

master_.swap(server);

server::TEST_BreakConnectivity(master_->messenger().get(), index_);
server::TEST_SetupConnectivity(master_->messenger().get(), index_);

tunnel_ = std::make_unique<Tunnel>(&master_->messenger()->io_service());
std::vector<Endpoint> local;
Expand Down
12 changes: 6 additions & 6 deletions src/yb/rpc/messenger.cc
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,7 @@ void Messenger::BreakConnectivity(const IpAddress& address, bool incoming, bool
LOG(INFO) << "TEST: Break " << (incoming ? "incoming" : "") << "/" << (outgoing ? "outgoing" : "")
<< " connectivity with: " << address;

std::unique_ptr<CountDownLatch> latch;
boost::optional<CountDownLatch> latch;
{
std::lock_guard<percpu_rwlock> guard(lock_);
if (broken_connectivity_from_.empty() || broken_connectivity_to_.empty()) {
Expand All @@ -287,7 +287,7 @@ void Messenger::BreakConnectivity(const IpAddress& address, bool incoming, bool
inserted_to = broken_connectivity_to_.insert(address).second;
}
if (inserted_from || inserted_to) {
latch.reset(new CountDownLatch(reactors_.size()));
latch.emplace(reactors_.size());
for (auto* reactor : reactors_) {
auto scheduled = reactor->ScheduleReactorTask(MakeFunctorReactorTask(
[&latch, address, incoming, outgoing](Reactor* reactor) {
Expand Down Expand Up @@ -340,15 +340,15 @@ void Messenger::RestoreConnectivity(const IpAddress& address, bool incoming, boo
}
}

bool Messenger::ShouldArtificiallyRejectIncomingCallsFrom(const IpAddress &remote) {
bool Messenger::TEST_ShouldArtificiallyRejectIncomingCallsFrom(const IpAddress &remote) {
if (has_broken_connectivity_.load(std::memory_order_acquire)) {
shared_lock<rw_spinlock> guard(lock_.get_lock());
return broken_connectivity_from_.count(remote) != 0;
}
return false;
}

bool Messenger::ShouldArtificiallyRejectOutgoingCallsTo(const IpAddress &remote) {
bool Messenger::TEST_ShouldArtificiallyRejectOutgoingCallsTo(const IpAddress &remote) {
if (has_broken_connectivity_.load(std::memory_order_acquire)) {
shared_lock<rw_spinlock> guard(lock_.get_lock());
return broken_connectivity_to_.count(remote) != 0;
Expand Down Expand Up @@ -456,7 +456,7 @@ void Messenger::QueueOutboundCall(OutboundCallPtr call) {
const auto& remote = call->conn_id().remote();
Reactor *reactor = RemoteToReactor(remote, call->conn_id().idx());

if (ShouldArtificiallyRejectOutgoingCallsTo(remote.address())) {
if (TEST_ShouldArtificiallyRejectOutgoingCallsTo(remote.address())) {
VLOG(1) << "TEST: Rejected connection to " << remote;
auto scheduled = reactor->ScheduleReactorTask(std::make_shared<NotifyDisconnectedReactorTask>(
call, SOURCE_LOCATION()));
Expand Down Expand Up @@ -502,7 +502,7 @@ void Messenger::Handle(InboundCallPtr call) {

void Messenger::RegisterInboundSocket(
const ConnectionContextFactoryPtr& factory, Socket *new_socket, const Endpoint& remote) {
if (ShouldArtificiallyRejectIncomingCallsFrom(remote.address())) {
if (TEST_ShouldArtificiallyRejectIncomingCallsFrom(remote.address())) {
auto status = new_socket->Close();
VLOG(1) << "TEST: Rejected connection from " << remote
<< ", close status: " << status.ToString();
Expand Down
14 changes: 12 additions & 2 deletions src/yb/rpc/messenger.h
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,13 @@ class Messenger : public ProxyContext {
return *rpc_metrics_;
}

// Use specified IP address as base address for outbound connections from messenger.
// Stores `value` in test_outbound_ip_base_ by plain assignment.
// NOTE(review): no lock is taken here; presumably this is meant to be called during setup,
// before the messenger starts issuing outbound calls - confirm against callers.
void TEST_SetOutboundIpBase(const IpAddress& value) {
test_outbound_ip_base_ = value;
}

bool TEST_ShouldArtificiallyRejectIncomingCallsFrom(const IpAddress &remote);

private:
FRIEND_TEST(TestRpc, TestConnectionKeepalive);
friend class DelayedTask;
Expand All @@ -279,15 +286,15 @@ class Messenger : public ProxyContext {
// 'retain_self_' for more info.
void AllExternalReferencesDropped();

bool ShouldArtificiallyRejectIncomingCallsFrom(const IpAddress &remote);
bool ShouldArtificiallyRejectOutgoingCallsTo(const IpAddress &remote);
void BreakConnectivity(const IpAddress& address, bool incoming, bool outgoing);
void RestoreConnectivity(const IpAddress& address, bool incoming, bool outgoing);

// Take ownership of the socket via Socket::Release
void RegisterInboundSocket(
const ConnectionContextFactoryPtr& factory, Socket *new_socket, const Endpoint& remote);

bool TEST_ShouldArtificiallyRejectOutgoingCallsTo(const IpAddress &remote);

const std::string name_;

ConnectionContextFactoryPtr connection_context_factory_;
Expand Down Expand Up @@ -384,6 +391,9 @@ class Messenger : public ProxyContext {

std::unique_ptr<RpcMetrics> rpc_metrics_;

// Use this IP address as base address for outbound connections from messenger.
IpAddress test_outbound_ip_base_;

#ifndef NDEBUG
// This is so we can log where exactly a Messenger was instantiated to better diagnose a CHECK
// failure in the destructor (ENG-2838). This can be removed when that is fixed.
Expand Down
Loading

0 comments on commit af4cec4

Please sign in to comment.