Skip to content

Commit

Permalink
(#418) ENG-2810: Rate limiter for remote bootstrap
Browse files Browse the repository at this point in the history
Summary:
Currently we don't have a mechanism to prioritize network traffic. So if a tablet gets moved during a period of high read/writes ops, our performance will degrade during remote bootstraps.

With this change we provide a throttling mechanism that can be controlled with the flag remote_boostrap_rate_limit_bytes_per_sec. This is a per-node global rate for receiving and sending. So no matter how many concurrent remote bootstraps are happening, the max bandwidth utilization for sending or receiving will not exceed the  specified limit. For example, if remote_boostrap_rate_limit_bytes_per_sec is 10MB/s, then the maximum bandwidth for sending data to other nodes will not exceed 10MB/s. At the same time, the maximum bandwidth for receiving data in the same node will not exceed 10MB/s. So the maximum bandwidth per node allocated for remote bootstrap traffic will be 2 * remote_boostrap_rate_limit_bytes_per_sec.

To disable throttling,  set remote_boostrap_rate_limit_bytes_per_sec = 0.

Test Plan: new unit tests

Reviewers: bharat, venkatesh, mikhail, sergei

Reviewed By: bharat, venkatesh, sergei

Subscribers: kannan, ybase, bogdan

Differential Revision: https://phabricator.dev.yugabyte.com/D4608
  • Loading branch information
hectorgcr committed Aug 24, 2018
1 parent e4a55b9 commit 154810c
Show file tree
Hide file tree
Showing 11 changed files with 629 additions and 18 deletions.
2 changes: 1 addition & 1 deletion src/yb/master/async_rpc_tasks.h
Original file line number Diff line number Diff line change
Expand Up @@ -396,7 +396,7 @@ class CommonInfoForRaftTask : public RetryingTSRpcTask {
virtual std::string change_config_ts_uuid() const { return change_config_ts_uuid_; }

protected:
// Used by SendRequest. Return's false if RPC should not be sent.
// Used by SendOrReceiveData. Return's false if RPC should not be sent.
virtual bool PrepareRequest(int attempt) = 0;

TabletServerId permanent_uuid() const;
Expand Down
67 changes: 63 additions & 4 deletions src/yb/tserver/remote_bootstrap_client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,8 @@
#include "yb/util/env_util.h"
#include "yb/util/fault_injection.h"
#include "yb/util/flag_tags.h"
#include "yb/util/logging.h"
#include "yb/util/net/net_util.h"
#include "yb/util/net/rate_limiter.h"
#include "yb/util/size_literals.h"

using namespace yb::size_literals;
Expand Down Expand Up @@ -104,6 +104,12 @@ DEFINE_test_flag(int32, simulate_long_remote_bootstrap_sec, 0,
"We use this for testing a scenario where a remote bootstrap takes longer than "
"follower_unavailable_considered_failed_sec seconds");

DEFINE_int64(remote_boostrap_rate_limit_bytes_per_sec, 50_MB,
"Maximum transmission rate during a remote bootstrap. This is across all the remote "
"bootstrap sessions for which this process is acting as a sender or receiver. So "
"the total limit will be 2 * remote_boostrap_rate_limit_bytes_per_sec because a "
"tserver or master can act both as a sender and receiver at the same time.");

// RETURN_NOT_OK_PREPEND() with a remote-error unwinding step.
#define RETURN_NOT_OK_UNWIND_PREPEND(status, controller, msg) \
RETURN_NOT_OK_PREPEND(UnwindRemoteError(status, controller), msg)
Expand All @@ -130,6 +136,7 @@ using tablet::TabletStatusListener;
using tablet::TabletSuperBlockPB;

constexpr int kBytesReservedForMessageHeaders = 16384;
std::atomic<int32_t> RemoteBootstrapClient::n_started_(0);

RemoteBootstrapClient::RemoteBootstrapClient(std::string tablet_id,
FsManager* fs_manager,
Expand All @@ -156,6 +163,12 @@ RemoteBootstrapClient::~RemoteBootstrapClient() {
<< " in RemoteBootstrapClient destructor.";
WARN_NOT_OK(EndRemoteSession(), "Unable to close remote bootstrap session " + session_id_);
}
if (started_) {
auto old_count = n_started_.fetch_sub(1, std::memory_order_acq_rel);
if (old_count < 1) {
LOG(DFATAL) << "Invalid number of remote bootstrap sessions: " << old_count;
}
}
}

Status RemoteBootstrapClient::SetTabletToReplace(const scoped_refptr<TabletMetadata>& meta,
Expand Down Expand Up @@ -348,6 +361,12 @@ Status RemoteBootstrapClient::Start(const string& bootstrap_peer_uuid,
}

started_ = true;
auto old_count = n_started_.fetch_add(1, std::memory_order_acq_rel);
if (old_count < 0) {
LOG(DFATAL) << "Invalid number of remote bootstrap sessions: " << old_count;
n_started_ = 0;
}

if (meta) {
*meta = meta_;
}
Expand Down Expand Up @@ -607,7 +626,11 @@ Status RemoteBootstrapClient::DownloadRocksDBFiles() {
DataIdPB data_id;
data_id.set_type(DataIdPB::ROCKSDB_FILE);
for (auto const& file_pb : new_sb->rocksdb_files()) {
auto start = MonoTime::Now();
RETURN_NOT_OK(DownloadFile(file_pb, rocksdb_dir, &data_id));
auto elapsed = MonoTime::Now().GetDeltaSince(start);
LOG(INFO) << "Downloaded file " << file_pb.name() << " of size " << file_pb.size_bytes()
<< " in " << elapsed.ToSeconds() << " seconds";
}

// To avoid adding new file type to remote bootstrap we move intents as subdir of regular DB.
Expand All @@ -634,9 +657,15 @@ Status RemoteBootstrapClient::DownloadWAL(uint64_t wal_segment_seqno) {
gscoped_ptr<WritableFile> writer;
RETURN_NOT_OK_PREPEND(fs_manager_->env()->NewWritableFile(opts, dest_path, &writer),
"Unable to open file for writing");

auto start = MonoTime::Now();
RETURN_NOT_OK_PREPEND(DownloadFile(data_id, writer.get()),
Substitute("Unable to download WAL segment with seq. number $0",
wal_segment_seqno));
auto elapsed = MonoTime::Now().GetDeltaSince(start);
LOG_WITH_PREFIX(INFO) << "Downloaded WAL segment with seq. number " << wal_segment_seqno
<< " of size " << writer->Size() << " in " << elapsed.ToSeconds()
<< " seconds";
return Status::OK();
}

Expand Down Expand Up @@ -693,6 +722,23 @@ Status RemoteBootstrapClient::DownloadFile(const DataIdPB& data_id,
int32_t max_length = std::min(FLAGS_remote_bootstrap_max_chunk_size,
FLAGS_rpc_max_message_size - kBytesReservedForMessageHeaders);

std::unique_ptr<RateLimiter> rate_limiter;

if (FLAGS_remote_boostrap_rate_limit_bytes_per_sec > 0) {
static auto rate_updater = []() {
if (n_started_.load(std::memory_order_acquire) < 1) {
YB_LOG_EVERY_N(ERROR, 100) << "Invalid number of remote bootstrap sessions: " << n_started_;
return static_cast<uint64_t>(FLAGS_remote_boostrap_rate_limit_bytes_per_sec);
}
return static_cast<uint64_t>(FLAGS_remote_boostrap_rate_limit_bytes_per_sec / n_started_);
};

rate_limiter = std::make_unique<RateLimiter>(rate_updater);
} else {
// Inactive RateLimiter.
rate_limiter = std::make_unique<RateLimiter>();
}

rpc::RpcController controller;
controller.set_timeout(MonoDelta::FromMilliseconds(session_idle_timeout_millis_));
FetchDataRequestPB req;
Expand All @@ -703,25 +749,38 @@ Status RemoteBootstrapClient::DownloadFile(const DataIdPB& data_id,
req.set_session_id(session_id_);
req.mutable_data_id()->CopyFrom(data_id);
req.set_offset(offset);
if (rate_limiter->active()) {
auto max_size = rate_limiter->GetMaxSizeForNextTransmission();
if (max_size > std::numeric_limits<decltype(max_length)>::max()) {
max_size = std::numeric_limits<decltype(max_length)>::max();
}
max_length = std::min(max_length, decltype(max_length)(max_size));
}
req.set_max_length(max_length);

FetchDataResponsePB resp;
RETURN_NOT_OK_UNWIND_PREPEND(proxy_->FetchData(req, &resp, &controller),
controller,
"Unable to fetch data from remote");
auto status = rate_limiter->SendOrReceiveData([this, &req, &resp, &controller]() {
return proxy_->FetchData(req, &resp, &controller);
}, [&resp]() { return resp.ByteSize(); });
RETURN_NOT_OK_UNWIND_PREPEND(status, controller, "Unable to fetch data from remote");

// Sanity-check for corruption.
RETURN_NOT_OK_PREPEND(VerifyData(offset, resp.chunk()),
Substitute("Error validating data item $0", data_id.ShortDebugString()));

// Write the data.
RETURN_NOT_OK(appendable->Append(resp.chunk().data()));
VLOG(3) << "resp size: " << resp.ByteSize()
<< ", chunk size: " << resp.chunk().data().size();

if (offset + resp.chunk().data().size() == resp.chunk().total_data_length()) {
done = true;
}
offset += resp.chunk().data().size();
}

VLOG(2) << "Transmission rate: " << rate_limiter->GetRate();

return Status::OK();
}

Expand Down
4 changes: 4 additions & 0 deletions src/yb/tserver/remote_bootstrap_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
#ifndef YB_TSERVER_REMOTE_BOOTSTRAP_CLIENT_H
#define YB_TSERVER_REMOTE_BOOTSTRAP_CLIENT_H

#include <atomic>
#include <string>
#include <memory>
#include <vector>
Expand Down Expand Up @@ -196,6 +197,9 @@ class RemoteBootstrapClient {

// State flags that enforce the progress of remote bootstrap.
bool started_; // Session started.
// Total number of remote bootstrap sessions. Used to calculate the transmission rate across all
// the sessions.
static std::atomic<int32_t> n_started_;
bool downloaded_wal_; // WAL segments downloaded.
bool downloaded_blocks_; // Data blocks downloaded.
bool downloaded_rocksdb_files_;
Expand Down
31 changes: 21 additions & 10 deletions src/yb/tserver/remote_bootstrap_service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ void RemoteBootstrapServiceImpl::BeginRemoteBootstrapSession(

scoped_refptr<RemoteBootstrapSessionClass> session;
{
boost::lock_guard<simple_spinlock> l(sessions_lock_);
std::lock_guard<simple_spinlock> l(sessions_lock_);
if (!FindCopy(sessions_, session_id, &session)) {
LOG(INFO) << "Beginning new remote bootstrap session on tablet " << tablet_id
<< " from peer " << requestor_uuid << " at " << context.requestor_string()
Expand All @@ -168,6 +168,11 @@ void RemoteBootstrapServiceImpl::BeginRemoteBootstrapSession(
Substitute("Error initializing remote bootstrap session for tablet $0",
tablet_id));
InsertOrDie(&sessions_, session_id, session);
auto new_nsessions = nsessions_.fetch_add(1, std::memory_order_acq_rel) + 1;
if (new_nsessions != sessions_.size()) {
LOG(DFATAL) << "nsessions_ " << new_nsessions
<< " != number of sessions " << sessions_.size();
}
} else {
LOG(INFO) << "Re-initializing existing remote bootstrap session on tablet " << tablet_id
<< " from peer " << requestor_uuid << " at " << context.requestor_string()
Expand Down Expand Up @@ -200,7 +205,7 @@ void RemoteBootstrapServiceImpl::CheckSessionActive(

// Look up and validate remote bootstrap session.
scoped_refptr<RemoteBootstrapSessionClass> session;
boost::lock_guard<simple_spinlock> l(sessions_lock_);
std::lock_guard<simple_spinlock> l(sessions_lock_);
RemoteBootstrapErrorPB::Code app_error;
Status status = FindSessionUnlocked(session_id, &app_error, &session);
if (status.ok()) {
Expand Down Expand Up @@ -228,6 +233,7 @@ Status RemoteBootstrapServiceImpl::GetDataFilePiece(
string* data,
int64_t* total_data_length,
RemoteBootstrapErrorPB::Code* error_code) {

switch (data_id.type()) {
case DataIdPB::BLOCK: {
// Fetching a data block chunk.
Expand Down Expand Up @@ -268,21 +274,25 @@ void RemoteBootstrapServiceImpl::FetchData(const FetchDataRequestPB* req,
FetchDataResponsePB* resp,
rpc::RpcContext context) {
const string& session_id = req->session_id();

// Look up and validate remote bootstrap session.
scoped_refptr<RemoteBootstrapSessionClass> session;
{
boost::lock_guard<simple_spinlock> l(sessions_lock_);
std::lock_guard<simple_spinlock> l(sessions_lock_);
RemoteBootstrapErrorPB::Code app_error;
RPC_RETURN_NOT_OK(FindSessionUnlocked(session_id, &app_error, &session),
app_error, "No such session");
ResetSessionExpirationUnlocked(session_id);
}

session->EnsureRateLimiterIsInitialized();

MAYBE_FAULT(FLAGS_fault_crash_on_handle_rb_fetch_data);

uint64_t offset = req->offset();
int64_t client_maxlen = req->max_length();

VLOG(3) << " rate limiter max len: " << session->rate_limiter().GetMaxSizeForNextTransmission();
int64_t client_maxlen = std::min(static_cast<uint64_t>(req->max_length()),
session->rate_limiter().GetMaxSizeForNextTransmission());
const DataIdPB& data_id = req->data_id();
RemoteBootstrapErrorPB::Code error_code = RemoteBootstrapErrorPB::UNKNOWN_ERROR;
RPC_RETURN_NOT_OK(ValidateFetchRequestDataId(data_id, &error_code, session),
Expand All @@ -296,12 +306,12 @@ void RemoteBootstrapServiceImpl::FetchData(const FetchDataRequestPB* req,
error_code, "Unable to get piece of data file");

data_chunk->set_total_data_length(total_data_length);
session->rate_limiter().UpdateDataSizeAndMaybeSleep(data->size());
data_chunk->set_offset(offset);

// Calculate checksum.
uint32_t crc32 = Crc32c(data->data(), data->length());
data_chunk->set_crc32(crc32);

context.RespondSuccess();
}

Expand All @@ -310,13 +320,13 @@ void RemoteBootstrapServiceImpl::EndRemoteBootstrapSession(
EndRemoteBootstrapSessionResponsePB* resp,
rpc::RpcContext context) {
{
boost::lock_guard<simple_spinlock> l(sessions_lock_);
std::lock_guard<simple_spinlock> l(sessions_lock_);
RemoteBootstrapErrorPB::Code app_error;
LOG(INFO) << "Request end of remote bootstrap session " << req->session_id()
<< " received from " << context.requestor_string();
RPC_RETURN_NOT_OK(DoEndRemoteBootstrapSessionUnlocked(req->session_id(), req->is_success(),
&app_error),
app_error, "No such session");
LOG(INFO) << "Request end of remote bootstrap session " << req->session_id()
<< " received from " << context.requestor_string();
}
context.RespondSuccess();
}
Expand Down Expand Up @@ -453,14 +463,15 @@ Status RemoteBootstrapServiceImpl::DoEndRemoteBootstrapSessionUnlocked(
LOG(INFO) << "Ending remote bootstrap session " << session_id << " on tablet "
<< session->tablet_id() << " with peer " << session->requestor_uuid();
CHECK_EQ(1, sessions_.erase(session_id));
nsessions_.fetch_sub(1, std::memory_order_acq_rel);
CHECK_EQ(1, session_expirations_.erase(session_id));

return Status::OK();
}

void RemoteBootstrapServiceImpl::EndExpiredSessions() {
do {
boost::lock_guard<simple_spinlock> l(sessions_lock_);
std::lock_guard<simple_spinlock> l(sessions_lock_);
MonoTime now = MonoTime::Now();

vector<string> expired_session_ids;
Expand Down
1 change: 1 addition & 0 deletions src/yb/tserver/remote_bootstrap_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ class RemoteBootstrapServiceImpl : public RemoteBootstrapServiceIf {
// Protects sessions_ and session_expirations_ maps.
mutable simple_spinlock sessions_lock_;
SessionMap sessions_;
std::atomic<int32> nsessions_ = {0};
MonoTimeMap session_expirations_;

// Session expiration thread.
Expand Down
42 changes: 40 additions & 2 deletions src/yb/tserver/remote_bootstrap_session.cc
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,12 @@
#include "yb/server/metadata.h"
#include "yb/tablet/tablet.h"
#include "yb/tablet/tablet_peer.h"
#include "yb/util/size_literals.h"
#include "yb/util/stopwatch.h"
#include "yb/util/trace.h"

DECLARE_int32(rpc_max_message_size);
DECLARE_int64(remote_boostrap_rate_limit_bytes_per_sec);

namespace yb {
namespace tserver {
Expand All @@ -68,14 +70,15 @@ using tablet::TabletSuperBlockPB;

RemoteBootstrapSession::RemoteBootstrapSession(
const std::shared_ptr<TabletPeer>& tablet_peer, std::string session_id,
std::string requestor_uuid, FsManager* fs_manager)
std::string requestor_uuid, FsManager* fs_manager, const std::atomic<int>* nsessions)
: tablet_peer_(tablet_peer),
session_id_(std::move(session_id)),
requestor_uuid_(std::move(requestor_uuid)),
fs_manager_(fs_manager),
blocks_deleter_(&blocks_),
logs_deleter_(&logs_),
succeeded_(false) {}
succeeded_(false),
nsessions_(nsessions) {}

RemoteBootstrapSession::~RemoteBootstrapSession() {
// No lock taken in the destructor, should only be 1 thread with access now.
Expand Down Expand Up @@ -279,6 +282,8 @@ Status RemoteBootstrapSession::Init() {
RETURN_NOT_OK(tablet_peer_->log_anchor_registry()->UpdateRegistration(
last_logged_opid.index, anchor_owner_token, &log_anchor_));

start_time_ = MonoTime::Now();

return Status::OK();
}

Expand Down Expand Up @@ -569,5 +574,38 @@ bool RemoteBootstrapSession::Succeeded() {
return succeeded_;
}

void RemoteBootstrapSession::EnsureRateLimiterIsInitialized() {
if (!rate_limiter_.IsInitialized()) {
InitRateLimiter();
}
}


void RemoteBootstrapSession::InitRateLimiter() {
if (FLAGS_remote_boostrap_rate_limit_bytes_per_sec > 0 && nsessions_) {
// Calling SetTargetRateUpdater will activate the rate limiter.
rate_limiter_.SetTargetRateUpdater([this]() -> uint64_t {
DCHECK_GT(FLAGS_remote_boostrap_rate_limit_bytes_per_sec, 0);
if (FLAGS_remote_boostrap_rate_limit_bytes_per_sec <= 0) {
YB_LOG_EVERY_N(ERROR, 1000)
<< "Invalid value for remote_boostrap_rate_limit_bytes_per_sec: "
<< FLAGS_remote_boostrap_rate_limit_bytes_per_sec;
// Since the rate limiter is initialized, it's expected that the value of
// FLAGS_remote_boostrap_rate_limit_bytes_per_sec is greater than 0. Since this is not the
// case, we'll log an error, and set the rate to 50 MB/s.
return 50_MB;
}
auto nsessions = nsessions_->load(std::memory_order_acquire);
if (nsessions > 0) {
return FLAGS_remote_boostrap_rate_limit_bytes_per_sec / nsessions;
} else {
LOG(DFATAL) << "Invalid number of sessions: " << nsessions;
return FLAGS_remote_boostrap_rate_limit_bytes_per_sec;
}
});
}
rate_limiter_.Init();
}

} // namespace tserver
} // namespace yb
Loading

0 comments on commit 154810c

Please sign in to comment.