Skip to content

Commit

Permalink
#3571: Extract HeartbeatDataProvider from Heartbeater
Browse files Browse the repository at this point in the history
Summary:
We have different logic inside `Heartbeater::Thread::TryHeartbeat` single method,
particularly functionality for gathering tserver metrics for putting them into Heartbeat RPC.
For tablet splitting we also need to put splitting-related information about tablets into Hearbeat
RPC. Overloading `Heartbeater::Thread::TryHeartbeat` with such extra functionality leads to
unnecessary dependencies, tight coupling and unnecessary knowledge of Heartbeater
about other components. Also it makes Heartbeater code larger each time we need to add something new
into Heartbeat.

This revision introduces HeartbeatDataProvider interface and moves out of Heartbeater logic for
gathering tserver metrics into TServerMetricsHeartbeatDataProvider thus reducing Heartbeater
complexity and removing unnecessary dependencies.

In subsequent revision for tablet splitting feature this framework will be used to implement
TabletSplittingDataProvider for sending tablet splitting related data inside Heartbeat RPC.

Fixed lint issues

Test Plan: Jenkins

Reviewers: mikhail, sergei, bogdan

Reviewed By: sergei

Subscribers: ybase

Differential Revision: https://phabricator.dev.yugabyte.com/D7913
  • Loading branch information
ttyusupov committed Feb 20, 2020
1 parent c8cb24b commit f4a489e
Show file tree
Hide file tree
Showing 11 changed files with 319 additions and 104 deletions.
1 change: 1 addition & 0 deletions src/yb/master/master_fwd.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ class ChangeEncryptionInfoRequestPB;
class ChangeEncryptionInfoResponsePB;
class IsEncryptionEnabledRequestPB;
class IsEncryptionEnabledResponsePB;
class TSHeartbeatRequestPB;
class TSHeartbeatResponsePB;

} // namespace master
Expand Down
2 changes: 2 additions & 0 deletions src/yb/tserver/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,7 @@ add_dependencies(tserver_util gen_proto)

set(TSERVER_SRCS
heartbeater.cc
heartbeater_factory.cc
metrics_snapshotter.cc
mini_tablet_server.cc
remote_bootstrap_client.cc
Expand All @@ -167,6 +168,7 @@ set(TSERVER_SRCS
tablet_service.cc
ts_tablet_manager.cc
tserver-path-handlers.cc
tserver_metrics_heartbeat_data_provider.cc
${TSERVER_SRCS_EXTENSIONS})

add_library(tserver ${TSERVER_SRCS})
Expand Down
127 changes: 30 additions & 97 deletions src/yb/tserver/heartbeater.cc
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,9 @@
#include "yb/common/wire_protocol.h"
#include "yb/gutil/ref_counted.h"
#include "yb/gutil/strings/substitute.h"
#include "yb/master/master.h"
#include "yb/master/master.proxy.h"
#include "yb/master/master_rpc.h"
#include "yb/server/server_base.proxy.h"
#include "yb/server/webserver.h"
#include "yb/tablet/tablet.h"
#include "yb/tserver/tablet_server.h"
#include "yb/tserver/tablet_server_options.h"
Expand All @@ -57,7 +55,6 @@
#include "yb/util/net/net_util.h"
#include "yb/util/status.h"
#include "yb/util/thread.h"
#include "yb/util/mem_tracker.h"

using namespace std::literals;

Expand Down Expand Up @@ -103,7 +100,11 @@ namespace tserver {
// This is basically the "PIMPL" pattern.
class Heartbeater::Thread {
public:
Thread(const TabletServerOptions& opts, TabletServer* server);
Thread(
const TabletServerOptions& opts, TabletServer* server,
std::vector<std::unique_ptr<HeartbeatDataProvider>>&& data_providers);
Thread(const Thread& other) = delete;
void operator=(const Thread& other) = delete;

Status Start();
Status Stop();
Expand All @@ -126,10 +127,9 @@ class Heartbeater::Thread {
CHECKED_STATUS SetupRegistration(master::TSRegistrationPB* reg);
void SetupCommonField(master::TSToMasterCommonPB* common);
bool IsCurrentThread() const;
uint64_t CalculateUptime();

const std::string& LogPrefix() const {
return log_prefix_;
return server_->LogPrefix();
}

server::MasterAddressesPtr get_master_addresses() {
Expand All @@ -148,10 +148,6 @@ class Heartbeater::Thread {
// every new attempt at connecting.
server::MasterAddressesPtr master_addresses_;

// Index of the master we last succesfully obtained the master
// consensus configuration information from.
int last_locate_master_idx_ = 0;

// The server for which we are heartbeating.
TabletServer* const server_;

Expand Down Expand Up @@ -183,31 +179,21 @@ class Heartbeater::Thread {
bool should_run_ = false;
bool heartbeat_asap_ = false;

// The interval for sending tserver metrics in the heartbeat.
const MonoDelta tserver_metrics_interval_ = 5s;
// stores the granularity for updating file sizes and current read/write
MonoTime prev_tserver_metrics_submission_;

// Stores the total read and writes ops for computing iops
uint64_t prev_reads_ = 0;
uint64_t prev_writes_ = 0;

MonoTime start_time_;

rpc::Rpcs rpcs_;

const std::string log_prefix_;

DISALLOW_COPY_AND_ASSIGN(Thread);
std::vector<std::unique_ptr<HeartbeatDataProvider>> data_providers_;
};

////////////////////////////////////////////////////////////
// Heartbeater
////////////////////////////////////////////////////////////

Heartbeater::Heartbeater(const TabletServerOptions& opts, TabletServer* server)
: thread_(new Thread(opts, server)) {
Heartbeater::Heartbeater(
const TabletServerOptions& opts, TabletServer* server,
std::vector<std::unique_ptr<HeartbeatDataProvider>>&& data_providers)
: thread_(new Thread(opts, server, std::move(data_providers))) {
}

Heartbeater::~Heartbeater() {
WARN_NOT_OK(Stop(), "Unable to stop heartbeater thread");
}
Expand All @@ -224,13 +210,13 @@ void Heartbeater::set_master_addresses(server::MasterAddressesPtr master_address
// Heartbeater::Thread
////////////////////////////////////////////////////////////

Heartbeater::Thread::Thread(const TabletServerOptions& opts, TabletServer* server)
Heartbeater::Thread::Thread(
const TabletServerOptions& opts, TabletServer* server,
std::vector<std::unique_ptr<HeartbeatDataProvider>>&& data_providers)
: master_addresses_(opts.GetMasterAddresses()),
server_(server),
cond_(&mutex_),
prev_tserver_metrics_submission_(MonoTime::Now()),
start_time_(MonoTime::Now()),
log_prefix_(Format("P $0: ", server_->permanent_uuid())) {
data_providers_(std::move(data_providers)) {
CHECK_NOTNULL(master_addresses_.get());
CHECK(!master_addresses_->empty());
VLOG_WITH_PREFIX(1) << "Initializing heartbeater thread with master addresses: "
Expand Down Expand Up @@ -344,12 +330,6 @@ int Heartbeater::Thread::GetMillisUntilNextHeartbeat() const {
return FLAGS_heartbeat_interval_ms;
}

// Calculate Uptime
uint64_t Heartbeater::Thread::CalculateUptime() {
MonoDelta delta = MonoTime::Now().GetDeltaSince(start_time_);
uint64_t uptime_seconds = static_cast<uint64_t>(delta.ToSeconds());
return uptime_seconds;
}

Status Heartbeater::Thread::TryHeartbeat() {
master::TSHeartbeatRequestPB req;
Expand All @@ -376,67 +356,8 @@ Status Heartbeater::Thread::TryHeartbeat() {
req.set_num_live_tablets(server_->tablet_manager()->GetNumLiveTablets());
req.set_leader_count(server_->tablet_manager()->GetLeaderCount());

if (prev_tserver_metrics_submission_ + tserver_metrics_interval_ < MonoTime::Now()) {

// Get the total memory used.
size_t mem_usage = MemTracker::GetRootTracker()->GetUpdatedConsumption(true /* force */);
req.mutable_metrics()->set_total_ram_usage(static_cast<int64_t>(mem_usage));
VLOG_WITH_PREFIX(4) << "Total Memory Usage: " << mem_usage;

// Get the Total SST file sizes and set it in the proto buf
std::vector<shared_ptr<yb::tablet::TabletPeer> > tablet_peers;
uint64_t total_file_sizes = 0;
uint64_t uncompressed_file_sizes = 0;
uint64_t num_files = 0;
server_->tablet_manager()->GetTabletPeers(&tablet_peers);
for (auto it = tablet_peers.begin(); it != tablet_peers.end(); it++) {
shared_ptr<yb::tablet::TabletPeer> tablet_peer = *it;
if (tablet_peer) {
auto tablet_class = tablet_peer->shared_tablet();
total_file_sizes += (tablet_class)
? tablet_class->GetCurrentVersionSstFilesSize() : 0;
uncompressed_file_sizes += (tablet_class)
? tablet_class->GetCurrentVersionSstFilesUncompressedSize() : 0;
num_files += (tablet_class) ? tablet_class->GetCurrentVersionNumSSTFiles() : 0;
}
}
req.mutable_metrics()->set_total_sst_file_size(total_file_sizes);
req.mutable_metrics()->set_uncompressed_sst_file_size(uncompressed_file_sizes);
req.mutable_metrics()->set_num_sst_files(num_files);

// Get the total number of read and write operations.
scoped_refptr<Histogram> reads_hist = server_->GetMetricsHistogram
(TabletServerServiceIf::RpcMetricIndexes::kMetricIndexRead);
uint64_t num_reads = (reads_hist != nullptr) ? reads_hist->TotalCount() : 0;

scoped_refptr<Histogram> writes_hist = server_->GetMetricsHistogram
(TabletServerServiceIf::RpcMetricIndexes::kMetricIndexWrite);
uint64_t num_writes = (writes_hist != nullptr) ? writes_hist->TotalCount() : 0;

// Calculate the read and write ops per second.
MonoDelta diff = MonoTime::Now() - prev_tserver_metrics_submission_;
double_t div = diff.ToSeconds();

double rops_per_sec = (div > 0 && num_reads > 0) ?
(static_cast<double>(num_reads - prev_reads_) / div) : 0;

double wops_per_sec = (div > 0 && num_writes > 0) ?
(static_cast<double>(num_writes - prev_writes_) / div) : 0;

prev_reads_ = num_reads;
prev_writes_ = num_writes;
req.mutable_metrics()->set_read_ops_per_sec(rops_per_sec);
req.mutable_metrics()->set_write_ops_per_sec(wops_per_sec);
uint64_t uptime_seconds = CalculateUptime();

req.mutable_metrics()->set_uptime_seconds(uptime_seconds);

prev_tserver_metrics_submission_ = MonoTime::Now();

VLOG_WITH_PREFIX(4) << "Read Ops per second: " << rops_per_sec;
VLOG_WITH_PREFIX(4) << "Write Ops per second: " << wops_per_sec;
VLOG_WITH_PREFIX(4) << "Total SST File Sizes: "<< total_file_sizes;
VLOG_WITH_PREFIX(4) << "Uptime seconds: "<< uptime_seconds;
for (auto& data_provider : data_providers_) {
data_provider->AddData(&req);
}

RpcController rpc;
Expand Down Expand Up @@ -651,5 +572,17 @@ void Heartbeater::Thread::TriggerASAP() {
cond_.Signal();
}


const std::string& HeartbeatDataProvider::LogPrefix() const {
return server_.LogPrefix();
}

void PeriodicalHeartbeatDataProvider::AddData(master::TSHeartbeatRequestPB* req) {
if (prev_run_time_ + period_ < CoarseMonoClock::Now()) {
DoAddData(req);
prev_run_time_ = CoarseMonoClock::Now();
}
}

} // namespace tserver
} // namespace yb
49 changes: 45 additions & 4 deletions src/yb/tserver/heartbeater.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,22 +38,47 @@

#include "yb/gutil/gscoped_ptr.h"
#include "yb/gutil/macros.h"
#include "yb/master/master_fwd.h"
#include "yb/tserver/tserver_fwd.h"
#include "yb/util/status.h"
#include "yb/util/net/net_util.h"

namespace yb {
namespace tserver {

class TabletServer;
class TabletServerOptions;
// Interface data providers to be used for filling data into heartbeat request.
// Data provider could fill in data into TSHeartbeatRequestPB that will be send by Heartbeater
// after that.
class HeartbeatDataProvider {
public:
explicit HeartbeatDataProvider(TabletServer* server) : server_(*CHECK_NOTNULL(server)) {}
virtual ~HeartbeatDataProvider() {}

// Add data to heartbeat, provider could skip and do nothing if is it too early for example for
// periodical provider.
// Called on every heartbeat from Heartbeater::Thread::TryHeartbeat.
virtual void AddData(master::TSHeartbeatRequestPB* req) = 0;

const std::string& LogPrefix() const;

TabletServer& server() { return server_; }

private:
TabletServer& server_;
};

// Component of the Tablet Server which is responsible for heartbeating to the
// leader master.
//
// TODO: send heartbeats to non-leader masters.
class Heartbeater {
public:
Heartbeater(const TabletServerOptions& options, TabletServer* server);
Heartbeater(
const TabletServerOptions& options, TabletServer* server,
std::vector<std::unique_ptr<HeartbeatDataProvider>>&& data_providers);
Heartbeater(const Heartbeater& other) = delete;
void operator=(const Heartbeater& other) = delete;

CHECKED_STATUS Start();
CHECKED_STATUS Stop();

Expand All @@ -68,9 +93,25 @@ class Heartbeater {
private:
class Thread;
gscoped_ptr<Thread> thread_;
DISALLOW_COPY_AND_ASSIGN(Heartbeater);
};

class PeriodicalHeartbeatDataProvider : public HeartbeatDataProvider {
public:
PeriodicalHeartbeatDataProvider(TabletServer* server, const MonoDelta& period) :
HeartbeatDataProvider(server), period_(period) {}

void AddData(master::TSHeartbeatRequestPB* req) override;

CoarseTimePoint prev_run_time() const { return prev_run_time_; }

private:
virtual void DoAddData(master::TSHeartbeatRequestPB* req) = 0;

MonoDelta period_;
CoarseTimePoint prev_run_time_;
};

} // namespace tserver
} // namespace yb

#endif /* YB_TSERVER_HEARTBEATER_H */
30 changes: 30 additions & 0 deletions src/yb/tserver/heartbeater_factory.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
// Copyright (c) YugaByte, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
// in compliance with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations
// under the License.
//

#include "yb/tserver/heartbeater_factory.h"

#include "yb/tserver/tserver_metrics_heartbeat_data_provider.h"

namespace yb {
namespace tserver {

std::unique_ptr<Heartbeater> CreateHeartbeater(
const TabletServerOptions& options, TabletServer* server) {
std::vector<std::unique_ptr<HeartbeatDataProvider>> data_providers;
data_providers.push_back(
std::make_unique<TServerMetricsHeartbeatDataProvider>(server));
return std::make_unique<Heartbeater>(options, server, std::move(data_providers));
}

} // namespace tserver
} // namespace yb
28 changes: 28 additions & 0 deletions src/yb/tserver/heartbeater_factory.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
// Copyright (c) YugaByte, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
// in compliance with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations
// under the License.
//

#ifndef YB_TSERVER_HEARTBEATER_FACTORY_H
#define YB_TSERVER_HEARTBEATER_FACTORY_H

#include "yb/tserver/heartbeater.h"

namespace yb {
namespace tserver {

std::unique_ptr<Heartbeater> CreateHeartbeater(
const TabletServerOptions& options, TabletServer* server);

} // namespace tserver
} // namespace yb

#endif // YB_TSERVER_HEARTBEATER_FACTORY_H
6 changes: 4 additions & 2 deletions src/yb/tserver/tablet_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@
#include "yb/server/rpc_server.h"
#include "yb/server/webserver.h"
#include "yb/tablet/maintenance_manager.h"
#include "yb/tserver/heartbeater.h"
#include "yb/tserver/heartbeater_factory.h"
#include "yb/tserver/metrics_snapshotter.h"
#include "yb/tserver/tablet_service.h"
#include "yb/tserver/ts_tablet_manager.h"
Expand Down Expand Up @@ -236,7 +236,9 @@ Status TabletServer::Init() {
RETURN_NOT_OK(RpcAndWebServerBase::Init());
RETURN_NOT_OK(path_handlers_->Register(web_server_.get()));

heartbeater_.reset(new Heartbeater(opts_, this));
log_prefix_ = Format("P $0: ", permanent_uuid());

heartbeater_ = CreateHeartbeater(opts_, this);

if (FLAGS_tserver_enable_metrics_snapshotter) {
metrics_snapshotter_.reset(new MetricsSnapshotter(opts_, this));
Expand Down
Loading

0 comments on commit f4a489e

Please sign in to comment.