Skip to content

Commit

Permalink
ENG-3038: Add RPC read buffers to MemTracker
Browse files Browse the repository at this point in the history
Summary:
Added to MemTracker:
1) RPC read buffers.
2) Inbound call request data.
3) Redis parsed protobufs.

Test Plan: Jenkins

Reviewers: amitanand, hector, robert, mikhail

Reviewed By: mikhail

Subscribers: bogdan, bharat, ybase

Differential Revision: https://phabricator.dev.yugabyte.com/D4341
  • Loading branch information
spolitov committed Mar 20, 2018
1 parent 451b7f8 commit 0bf401e
Show file tree
Hide file tree
Showing 69 changed files with 381 additions and 120 deletions.
4 changes: 3 additions & 1 deletion src/yb/client/async_initializer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ namespace client {
AsyncClientInitialiser::AsyncClientInitialiser(
const std::string& client_name, const uint32_t num_reactors, const uint32_t timeout_seconds,
const std::string& tserver_uuid, const yb::server::ServerBaseOptions* opts,
scoped_refptr<MetricEntity> metric_entity)
scoped_refptr<MetricEntity> metric_entity,
const std::shared_ptr<MemTracker>& parent_mem_tracker)
: client_future_(client_promise_.get_future()) {
client_builder_.set_client_name(client_name);
client_builder_.default_rpc_timeout(MonoDelta::FromSeconds(timeout_seconds));
Expand All @@ -32,6 +33,7 @@ AsyncClientInitialiser::AsyncClientInitialiser(
if (num_reactors > 0) {
client_builder_.set_num_reactors(num_reactors);
}
client_builder_.set_parent_mem_tracker(parent_mem_tracker);

// Build cloud_info_pb.
CloudInfoPB cloud_info_pb;
Expand Down
3 changes: 2 additions & 1 deletion src/yb/client/async_initializer.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ class AsyncClientInitialiser {
AsyncClientInitialiser(const std::string& client_name, const uint32_t num_reactors,
const uint32_t timeout_seconds, const std::string& tserver_uuid,
const server::ServerBaseOptions* opts,
scoped_refptr<MetricEntity> metric_entity);
scoped_refptr<MetricEntity> metric_entity,
const std::shared_ptr<MemTracker>& parent_mem_tracker);

~AsyncClientInitialiser();

Expand Down
16 changes: 8 additions & 8 deletions src/yb/client/client-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -389,6 +389,10 @@ class ClientTest: public YBMiniClusterTestBase<MiniCluster> {

void DoApplyWithoutFlushTest(int sleep_micros);

Result<std::shared_ptr<rpc::Messenger>> CreateMessenger(const std::string& name) {
return rpc::MessengerBuilder(name).use_default_mem_tracker().Build();
}

enum WhichServerToKill {
DEAD_MASTER,
DEAD_TSERVER
Expand Down Expand Up @@ -1467,9 +1471,7 @@ TEST_F(ClientTest, TestReplicatedTabletWritesWithLeaderElection) {

// Since we waited before, hopefully all replicas will be up to date
// and we can just promote another replica.
rpc::MessengerBuilder bld("client");
auto client_messenger = bld.Build();
ASSERT_OK(client_messenger);
auto client_messenger = ASSERT_RESULT(CreateMessenger("client"));
gscoped_ptr<consensus::ConsensusServiceProxy> new_leader_proxy;

int new_leader_idx = -1;
Expand All @@ -1488,7 +1490,7 @@ TEST_F(ClientTest, TestReplicatedTabletWritesWithLeaderElection) {
MiniTabletServer* new_leader = cluster_->mini_tablet_server(new_leader_idx);
ASSERT_TRUE(new_leader != nullptr);
new_leader_proxy.reset(
new consensus::ConsensusServiceProxy(*client_messenger,
new consensus::ConsensusServiceProxy(client_messenger,
new_leader->bound_rpc_addr()));

consensus::RunLeaderElectionRequestPB req;
Expand Down Expand Up @@ -1909,16 +1911,14 @@ TEST_F(ClientTest, TestReadFromFollower) {
}
ASSERT_EQ(cluster_->num_tablet_servers() - 1, followers.size());

rpc::MessengerBuilder bld("client");
auto client_messenger = bld.Build();
ASSERT_OK(client_messenger);
auto client_messenger = ASSERT_RESULT(CreateMessenger("client"));
for (const master::TSInfoPB& ts_info : followers) {
// Try to read from followers.
auto endpoint = ParseEndpoint(ts_info.rpc_addresses(0).host(),
ts_info.rpc_addresses(0).port());
ASSERT_TRUE(endpoint.ok());
auto tserver_proxy = std::make_unique<tserver::TabletServerServiceProxy>(
*client_messenger, *endpoint);
client_messenger, *endpoint);

std::unique_ptr<QLRowBlock> rowBlock;
ASSERT_OK(WaitFor([&]() -> bool {
Expand Down
8 changes: 8 additions & 0 deletions src/yb/client/client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@
#include "yb/yql/redis/redisserver/redis_constants.h"
#include "yb/yql/redis/redisserver/redis_parser.h"
#include "yb/rpc/messenger.h"
#include "yb/rpc/yb_rpc.h"
#include "yb/util/flag_tags.h"
#include "yb/util/init.h"
#include "yb/util/logging.h"
Expand Down Expand Up @@ -296,6 +297,11 @@ YBClientBuilder& YBClientBuilder::set_tserver_uuid(const TabletServerId& uuid) {
return *this;
}

YBClientBuilder& YBClientBuilder::set_parent_mem_tracker(const MemTrackerPtr& mem_tracker) {
data_->parent_mem_tracker_ = mem_tracker;
return *this;
}

YBClientBuilder& YBClientBuilder::set_skip_master_leader_resolution(bool value) {
data_->skip_master_leader_resolution_ = value;
return *this;
Expand All @@ -310,6 +316,8 @@ Status YBClientBuilder::Build(shared_ptr<YBClient>* client) {
MessengerBuilder builder(data_->client_name_);
builder.set_num_reactors(data_->num_reactors_);
builder.set_metric_entity(data_->metric_entity_);
builder.connection_context_factory()->SetParentMemTracker(
MemTracker::FindOrCreateTracker("RPC Outbound", data_->parent_mem_tracker_));
RETURN_NOT_OK(builder.Build().MoveTo(&c->data_->messenger_));

c->data_->master_server_endpoint_ = data_->master_server_endpoint_;
Expand Down
2 changes: 2 additions & 0 deletions src/yb/client/client.h
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,8 @@ class YBClientBuilder {
// proxy clients.
YBClientBuilder& set_tserver_uuid(const TabletServerId& uuid);

YBClientBuilder& set_parent_mem_tracker(const std::shared_ptr<MemTracker>& mem_tracker);

// Creates the client.
//
// The return value may indicate an error in the create operation, or a
Expand Down
2 changes: 2 additions & 0 deletions src/yb/client/client_builder-internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,8 @@ class YBClientBuilder::Data {
// aid in detecting local tservers.
TabletServerId uuid_;

std::shared_ptr<MemTracker> parent_mem_tracker_;

bool skip_master_leader_resolution_ = false;
private:
DISALLOW_COPY_AND_ASSIGN(Data);
Expand Down
8 changes: 8 additions & 0 deletions src/yb/client/yb_op.cc
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,10 @@ YBRedisWriteOp::YBRedisWriteOp(const shared_ptr<YBTable>& table)

YBRedisWriteOp::~YBRedisWriteOp() {}

size_t YBRedisWriteOp::space_used_by_request() const {
return redis_write_request_->ByteSizeLong();
}

std::string YBRedisWriteOp::ToString() const {
return "REDIS_WRITE " + redis_write_request_->key_value().key();
}
Expand All @@ -114,6 +118,10 @@ YBRedisReadOp::YBRedisReadOp(const shared_ptr<YBTable>& table)

YBRedisReadOp::~YBRedisReadOp() {}

size_t YBRedisReadOp::space_used_by_request() const {
return redis_read_request_->SpaceUsedLong();
}

std::string YBRedisReadOp::ToString() const {
return "REDIS_READ " + redis_read_request_->key_value().key();
}
Expand Down
3 changes: 3 additions & 0 deletions src/yb/client/yb_op.h
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ class YBRedisOp : public YBOperation {
virtual ~YBRedisOp();

bool has_response() { return redis_response_ ? true : false; }
virtual size_t space_used_by_request() const = 0;

const RedisResponsePB& response() const;

Expand All @@ -138,6 +139,7 @@ class YBRedisWriteOp : public YBRedisOp {
// when the request is sent to tserver. It is restored after response is received from tserver
// (see WriteRpc's constructor).
const RedisWriteRequestPB& request() const { return *redis_write_request_; }
size_t space_used_by_request() const override;

RedisWriteRequestPB* mutable_request() { return redis_write_request_.get(); }

Expand Down Expand Up @@ -174,6 +176,7 @@ class YBRedisReadOp : public YBRedisOp {
// when the request is sent to tserver. It is restored after response is received from tserver
// (see ReadRpc's constructor).
const RedisReadRequestPB& request() const { return *redis_read_request_; }
size_t space_used_by_request() const override;

RedisReadRequestPB* mutable_request() { return redis_read_request_.get(); }

Expand Down
2 changes: 1 addition & 1 deletion src/yb/consensus/raft_consensus_quorum-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ class RaftConsensusQuorumTest : public YBTest {
// Build the fsmanagers and logs
for (int i = 0; i < config_.peers_size(); i++) {
shared_ptr<MemTracker> parent_mem_tracker =
MemTracker::CreateTracker(-1, Substitute("peer-$0", i));
MemTracker::CreateTracker(Substitute("peer-$0", i));
parent_mem_trackers_.push_back(parent_mem_tracker);
string test_path = GetTestPath(Substitute("peer-$0-root", i));
FsManagerOpts opts;
Expand Down
4 changes: 2 additions & 2 deletions src/yb/fs/block_manager-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ void BlockManagerTest<FileBlockManager>::RunLogContainerPreallocationTest() {

template <>
void BlockManagerTest<FileBlockManager>::RunMemTrackerTest() {
shared_ptr<MemTracker> tracker = MemTracker::CreateTracker(-1, "test tracker");
shared_ptr<MemTracker> tracker = MemTracker::CreateTracker("test tracker");
ASSERT_NO_FATALS(this->ReopenBlockManager(scoped_refptr<MetricEntity>(),
tracker,
{ GetTestDataDirectory() },
Expand Down Expand Up @@ -372,7 +372,7 @@ TYPED_TEST(BlockManagerTest, PersistenceTest) {
// on-disk metadata should still be clean.
gscoped_ptr<BlockManager> new_bm(this->CreateBlockManager(
scoped_refptr<MetricEntity>(),
MemTracker::CreateTracker(-1, "other tracker"),
MemTracker::CreateTracker("other tracker"),
{ GetTestDataDirectory() }));
ASSERT_OK(new_bm->Open());

Expand Down
3 changes: 1 addition & 2 deletions src/yb/fs/file_block_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -526,8 +526,7 @@ FileBlockManager::FileBlockManager(Env* env, const BlockManagerOptions& opts)
read_only_(opts.read_only),
root_paths_(opts.root_paths),
rand_(GetRandomSeed32()),
mem_tracker_(MemTracker::CreateTracker(-1,
"file_block_manager",
mem_tracker_(MemTracker::CreateTracker("file_block_manager",
opts.parent_mem_tracker)) {
DCHECK_GT(root_paths_.size(), 0);
if (opts.metric_entity) {
Expand Down
4 changes: 2 additions & 2 deletions src/yb/integration-tests/create-table-stress-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -118,8 +118,8 @@ class CreateTableStressTest : public YBMiniClusterTestBase<MiniCluster> {
.add_master_server_addr(cluster_->mini_master()->bound_rpc_addr_str())
.Build(&client_));

auto messenger = MessengerBuilder("stress-test-msgr").set_num_reactors(1).Build();
ASSERT_OK(messenger.MoveTo(&messenger_));
messenger_ = ASSERT_RESULT(
MessengerBuilder("stress-test-msgr").set_num_reactors(1).use_default_mem_tracker().Build());
master_proxy_.reset(new MasterServiceProxy(messenger_,
cluster_->mini_master()->bound_rpc_addr()));
ASSERT_OK(CreateTabletServerMap(master_proxy_.get(), messenger_, &ts_map_));
Expand Down
8 changes: 6 additions & 2 deletions src/yb/integration-tests/external_mini_cluster.cc
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
#include "yb/server/server_base.pb.h"
#include "yb/server/server_base.proxy.h"
#include "yb/tserver/tserver_service.proxy.h"
#include "yb/rpc/connection_context.h"
#include "yb/rpc/messenger.h"
#include "yb/master/sys_catalog.h"
#include "yb/util/async_util.h"
Expand Down Expand Up @@ -209,8 +210,11 @@ Status ExternalMiniCluster::Start() {
RETURN_NOT_OK(HandleOptions());
FLAGS_replication_factor = opts_.num_masters;

RETURN_NOT_OK_PREPEND(rpc::MessengerBuilder("minicluster-messenger")
.set_num_reactors(1).Build().MoveTo(&messenger_),
rpc::MessengerBuilder builder("minicluster-messenger");
builder.set_num_reactors(1);
builder.connection_context_factory()->SetParentMemTracker(
MemTracker::FindOrCreateTracker("minicluster"));
RETURN_NOT_OK_PREPEND(builder.Build().MoveTo(&messenger_),
"Failed to start Messenger for minicluster");

Status s = Env::Default()->CreateDir(data_root_);
Expand Down
2 changes: 1 addition & 1 deletion src/yb/integration-tests/master_sysnamespace-itest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ class MasterSysNamespaceTest : public YBTest {
cluster_.reset(new MiniCluster(env_.get(), opts));
ASSERT_OK(cluster_->Start());
rpc::MessengerBuilder bld("Client");
ASSERT_OK(bld.Build().MoveTo(&client_messenger_));
client_messenger_ = ASSERT_RESULT(bld.use_default_mem_tracker().Build());
proxy_.reset(new MasterServiceProxy(client_messenger_,
cluster_->leader_mini_master()->bound_rpc_addr()));
}
Expand Down
2 changes: 1 addition & 1 deletion src/yb/integration-tests/placement_info-itest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ class PlacementInfoTest : public YBTest {
YBClientBuilder builder;
ASSERT_OK(cluster_->CreateClient(&builder, &client_));
rpc::MessengerBuilder bld("Client");
ASSERT_OK(bld.Build().MoveTo(&client_messenger_));
client_messenger_ = ASSERT_RESULT(bld.use_default_mem_tracker().Build());
proxy_.reset(new master::MasterServiceProxy(client_messenger_,
cluster_->leader_mini_master()->bound_rpc_addr()));

Expand Down
9 changes: 4 additions & 5 deletions src/yb/integration-tests/raft_consensus-itest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -479,13 +479,12 @@ TEST_F(RaftConsensusITest, TestGetPermanentUuid) {
const string expected_uuid = leader->instance_id.permanent_uuid();

rpc::MessengerBuilder builder("test builder");
builder.set_num_reactors(1);
auto messenger = builder.Build();
ASSERT_OK(messenger);
builder.set_num_reactors(1).use_default_mem_tracker();
auto messenger = ASSERT_RESULT(builder.Build());

// Set a decent timeout for allowing the masters to find eachother.
const uint64_t kTimeoutMs = 30000;
ASSERT_OK(consensus::SetPermanentUuidForRemotePeer(*messenger, kTimeoutMs, &peer));
ASSERT_OK(consensus::SetPermanentUuidForRemotePeer(messenger, kTimeoutMs, &peer));
ASSERT_EQ(expected_uuid, peer.permanent_uuid());
}

Expand Down Expand Up @@ -2311,7 +2310,7 @@ TEST_F(RaftConsensusITest, TestEarlyCommitDespiteMemoryPressure) {
master_flags.push_back("--catalog_manager_wait_for_new_tablets_to_elect_leader=false");

// Very low memory limit to ease testing.
ts_flags.push_back("--memory_limit_hard_bytes=4194304");
ts_flags.push_back(Format("--memory_limit_hard_bytes=$0", 16_MB));

// Don't let transaction memory tracking get in the way.
ts_flags.push_back("--tablet_operation_memory_limit_mb=-1");
Expand Down
2 changes: 1 addition & 1 deletion src/yb/integration-tests/ts_tablet_manager-itest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ void TsTabletManagerITest::SetUp() {
YBTest::SetUp();

MessengerBuilder bld("client");
ASSERT_OK(bld.Build().MoveTo(&client_messenger_));
client_messenger_ = ASSERT_RESULT(bld.use_default_mem_tracker().Build());

MiniClusterOptions opts;
opts.num_tablet_servers = kNumReplicas;
Expand Down
3 changes: 1 addition & 2 deletions src/yb/master/master-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -139,8 +139,7 @@ class MasterTest : public YBTest {
ASSERT_OK(mini_master_->master()->WaitUntilCatalogManagerIsLeaderAndReadyForTests());

// Create a client proxy to it.
MessengerBuilder bld("Client");
ASSERT_OK(bld.Build().MoveTo(&client_messenger_));
client_messenger_ = ASSERT_RESULT(MessengerBuilder("Client").use_default_mem_tracker().Build());
proxy_.reset(new MasterServiceProxy(client_messenger_, mini_master_->bound_rpc_addr()));

// Create the default test namespace.
Expand Down
1 change: 1 addition & 0 deletions src/yb/rpc/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ set(YRPC_SRCS
acceptor.cc
binary_call_parser.cc
connection.cc
connection_context.cc
growable_buffer.cc
inbound_call.cc
io_thread_pool.cc
Expand Down
3 changes: 2 additions & 1 deletion src/yb/rpc/connection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,8 @@ Connection::Connection(Reactor* reactor,
remote_(remote),
direction_(direction),
last_activity_time_(CoarseMonoClock::Now()),
read_buffer_(FLAGS_rpc_initial_buffer_size, context->BufferLimit()),
read_buffer_(
FLAGS_rpc_initial_buffer_size, context->BufferLimit(), context->GetMemTracker()),
context_(std::move(context)) {
const auto metric_entity = reactor->messenger()->metric_entity();
handler_latency_outbound_transfer_ = metric_entity ?
Expand Down
28 changes: 28 additions & 0 deletions src/yb/rpc/connection_context.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
// Copyright (c) YugaByte, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
// in compliance with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations
// under the License.
//

#include "yb/rpc/connection_context.h"

#include "yb/util/mem_tracker.h"

namespace yb {
namespace rpc {

void ConnectionContextFactory::SetParentMemTracker(
const std::shared_ptr<MemTracker>& parent_mem_tracker) {
read_buffer_tracker_ = MemTracker::FindOrCreateTracker("Read Buffer", parent_mem_tracker);
call_tracker_ = MemTracker::FindOrCreateTracker("Call", parent_mem_tracker);
}

} // namespace rpc
} // namespace yb
Loading

0 comments on commit 0bf401e

Please sign in to comment.