(Backport to 2.2) [#5472] CDC Idle Throttling
Summary:
In YugabyteDB clusters with bi-directional CDC enabled, we were seeing high CPU
utilization (~70%) on both clusters even with no workload running. The CDC
GetChanges call that identifies new changes in a table polls very aggressively
in order to minimize latency and keep lag low in high-volume situations. Our new
heuristic has two goals:
  1. When the Producer is active, minimize lag and keep up.
  2. When the Producer is mostly idle, don't waste hardware resources.
For #2, we add an idle delay once a configurable number of consecutive GetChanges
requests (async_replication_max_idle_wait, default 3) return no data, as sketched
below. As soon as GetChanges returns new data, we reset the delay.
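
The resulting delay computation is small enough to sketch inline. This is an
illustrative restatement of the cdc_poller.cc change below, not new behavior;
the parameter names stand in for the FLAGS_ values:

#include <algorithm>
#include <cstdint>

// Returns how long the poller should sleep before its next GetChanges call.
int64_t ComputePollDelayMs(int idle_polls, int poll_failures,
                           int64_t polling_delay_ms,  // FLAGS_async_replication_polling_delay_ms
                           int64_t idle_delay_ms,     // FLAGS_async_replication_idle_delay_ms
                           int max_idle_wait) {       // FLAGS_async_replication_max_idle_wait
  int64_t delay = polling_delay_ms;                   // normal throttling
  if (idle_polls >= max_idle_wait) {
    delay = std::max(delay, idle_delay_ms);           // idle backoff after consecutive empty polls
  }
  if (poll_failures > 0) {
    delay = std::max(delay, int64_t{1} << poll_failures);  // exponential failure backoff
  }
  return delay;
}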

Test Plan: Jenkins: rebase: 2.2

Reviewers: bogdan, kannan, alan, rahuldesirazu, hector

Reviewed By: hector

Subscribers: ybase

Differential Revision: https://phabricator.dev.yugabyte.com/D9377
nspiegelberg committed Sep 15, 2020
1 parent a681c93 commit 8816dac
Showing 5 changed files with 142 additions and 4 deletions.
121 changes: 121 additions & 0 deletions ent/src/yb/integration-tests/twodc-test.cc
@@ -28,6 +28,7 @@
#include "yb/cdc/cdc_service.proxy.h"
#include "yb/client/client.h"
#include "yb/client/client-test-util.h"
#include "yb/client/meta_cache.h"
#include "yb/client/schema.h"
#include "yb/client/session.h"
#include "yb/client/table.h"
@@ -73,6 +74,8 @@ DECLARE_bool(TEST_check_broadcast_address);
DECLARE_int32(replication_failure_delay_exponent);
DECLARE_double(TEST_respond_write_failed_probability);
DECLARE_int32(cdc_max_apply_batch_num_records);
DECLARE_int32(async_replication_idle_delay_ms);
DECLARE_int32(async_replication_max_idle_wait);

namespace yb {

@@ -848,6 +851,124 @@ TEST_P(TwoDCTest, PollWithProducerClusterRestart) {
Destroy();
}


TEST_P(TwoDCTest, PollAndObserveIdleDampening) {
uint32_t replication_factor = 3, tablet_count = 1, master_count = 1;
auto tables = ASSERT_RESULT(
SetUpWithParams({tablet_count}, {tablet_count}, replication_factor, master_count));

ASSERT_OK(SetupUniverseReplication(producer_cluster(), consumer_cluster(), consumer_client(),
kUniverseId, {tables[0]}, false));

// After creating the cluster, make sure all tablets are being polled for.
ASSERT_OK(CorrectlyPollingAllTablets(consumer_cluster(), 1));

// Write some rows and query GetChanges to set up the CDCTabletMetrics.
WriteWorkload(0, 5, producer_client(), tables[0]->name());
ASSERT_OK(VerifyWrittenRecords(tables[0]->name(), tables[1]->name()));

/*****************************************************************
* Find the CDC Tablet Metrics, which we will use for this test. *
*****************************************************************/
// Find the stream.
master::ListCDCStreamsResponsePB stream_resp;
ASSERT_OK(GetCDCStreamForTable(tables[0]->id(), &stream_resp));
ASSERT_EQ(stream_resp.streams_size(), 1);
ASSERT_EQ(stream_resp.streams(0).table_id(), tables[0]->id());
auto stream_id = stream_resp.streams(0).stream_id();

// Find the tablet id for the stream.
TabletId tablet_id;
{
yb::cdc::ListTabletsRequestPB tablets_req;
yb::cdc::ListTabletsResponsePB tablets_resp;
rpc::RpcController rpc;
tablets_req.set_stream_id(stream_id);

auto producer_cdc_proxy = std::make_unique<cdc::CDCServiceProxy>(
&producer_client()->proxy_cache(),
HostPort::FromBoundEndpoint(producer_cluster()->mini_tablet_server(0)->bound_rpc_addr()));
ASSERT_OK(producer_cdc_proxy->ListTablets(tablets_req, &tablets_resp, &rpc));
ASSERT_FALSE(tablets_resp.has_error());
ASSERT_EQ(tablets_resp.tablets_size(), 1);
tablet_id = tablets_resp.tablets(0).tablet_id();
}

// Find the TServer that is hosting this tablet.
tserver::TabletServer* cdc_ts = nullptr;
std::string ts_uuid;
std::mutex data_mutex;
{
ASSERT_OK(WaitFor([this, &tablet_id, &ts_uuid, &data_mutex] {
producer_client()->LookupTabletById(
tablet_id, CoarseMonoClock::Now() + MonoDelta::FromSeconds(3),
[&ts_uuid, &data_mutex](const Result<client::internal::RemoteTabletPtr>& result) {
if (result.ok()) {
std::lock_guard<std::mutex> l(data_mutex);
ts_uuid = (*result)->LeaderTServer()->permanent_uuid();
}
},
client::UseCache::kFalse);
std::lock_guard<std::mutex> l(data_mutex);
return !ts_uuid.empty();
}, MonoDelta::FromSeconds(10), "Get TS for Tablet"));

for (auto ts : producer_cluster()->mini_tablet_servers()) {
if (ts->server()->permanent_uuid() == ts_uuid) {
cdc_ts = ts->server();
break;
}
}
}
ASSERT_NOTNULL(cdc_ts);

// Find the CDCTabletMetric associated with the above pair.
auto cdc_service = dynamic_cast<cdc::CDCServiceImpl*>(
cdc_ts->rpc_server()->service_pool("yb.cdc.CDCService")->TEST_get_service().get());
std::shared_ptr<cdc::CDCTabletMetrics> metrics =
cdc_service->GetCDCTabletMetrics({"", stream_id, tablet_id});

/***********************************
* Setup Complete. Starting test. *
***********************************/
// Log the first heartbeat count as a baseline.
auto first_heartbeat_count = metrics->rpc_heartbeats_responded->value();
LOG(INFO) << "first_heartbeat_count = " << first_heartbeat_count;

// Write some rows to the producer, which should be consumed quickly by GetChanges.
WriteWorkload(6, 10, producer_client(), tables[0]->name());
ASSERT_OK(VerifyWrittenRecords(tables[0]->name(), tables[1]->name()));

// Sleep for the idle timeout.
SleepFor(MonoDelta::FromMilliseconds(FLAGS_async_replication_idle_delay_ms));
auto active_heartbeat_count = metrics->rpc_heartbeats_responded->value();
LOG(INFO) << "active_heartbeat_count = " << active_heartbeat_count;
// The new heartbeat count should be at least async_replication_max_idle_wait (3).
ASSERT_GE(active_heartbeat_count - first_heartbeat_count, FLAGS_async_replication_max_idle_wait);

// Now wait past the normal poll frequency; polling should settle into the idle cadence.
auto multiplier = 2;
SleepFor(MonoDelta::FromMilliseconds(FLAGS_async_replication_idle_delay_ms * multiplier));
auto idle_heartbeat_count = metrics->rpc_heartbeats_responded->value();
ASSERT_LE(idle_heartbeat_count - active_heartbeat_count, multiplier + 1 /*allow subtle race*/);
LOG(INFO) << "idle_heartbeat_count = " << idle_heartbeat_count;

// Write some more data to the producer and call GetChanges with some real data.
WriteWorkload(11, 15, producer_client(), tables[0]->name());
ASSERT_OK(VerifyWrittenRecords(tables[0]->name(), tables[1]->name()));

// Sleep for the idle timeout and verify that the idle behavior ends now that there is new data.
SleepFor(MonoDelta::FromMilliseconds(FLAGS_async_replication_idle_delay_ms));
active_heartbeat_count = metrics->rpc_heartbeats_responded->value();
LOG(INFO) << "active_heartbeat_count = " << active_heartbeat_count;
// The new heartbeat count should be at least async_replication_max_idle_wait (3).
ASSERT_GE(active_heartbeat_count - idle_heartbeat_count, FLAGS_async_replication_max_idle_wait);

// Cleanup.
ASSERT_OK(DeleteUniverseReplication(kUniverseId));
Destroy();
}
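
// Editorial note (not part of the commit): the arithmetic behind the bounds
// asserted above, assuming the new flag defaults from cdc_poller.cc
// (async_replication_idle_delay_ms = 100, async_replication_max_idle_wait = 3):
//  - Active phase: polls run back-to-back with no idle delay, so one 100 ms
//    sleep fits well over 3 GetChanges round-trips, hence
//    ASSERT_GE(active - first, max_idle_wait).
//  - Idle phase: roughly one poll per 100 ms, so a 2 x 100 ms sleep allows
//    about 2 polls, plus 1 for a poll already in flight, hence
//    ASSERT_LE(idle - active, multiplier + 1).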

TEST_P(TwoDCTest, ApplyOperations) {
uint32_t replication_factor = NonTsanVsTsan(3, 1);
// Use just one tablet here to more easily catch lower-level write issues with this test.
21 changes: 17 additions & 4 deletions ent/src/yb/tserver/cdc_poller.cc
@@ -24,8 +24,14 @@
#include "yb/util/logging.h"
#include "yb/util/threadpool.h"

// Similar heuristic to heartbeat_interval in heartbeater.cc.
DEFINE_int32(async_replication_polling_delay_ms, 0,
-    "How long to delay in ms between applying and repolling.");
+    "How long to delay in ms between applying and polling.");
DEFINE_int32(async_replication_idle_delay_ms, 100,
"How long to delay between polling when we expect no data at the destination.");
DEFINE_int32(async_replication_max_idle_wait, 3,
"Maximum number of consecutive empty GetChanges until the poller "
"backs off to the idle interval, rather than immediately retrying.");
DEFINE_int32(replication_failure_delay_exponent, 16 /* ~ 2^16/1000 ~= 65 sec */,
"Max number of failures (N) to use when calculating exponential backoff (2^N-1).");
DEFINE_bool(cdc_consumer_use_proxy_forwarding, false,
@@ -93,9 +99,14 @@ void CDCPoller::DoPoll() {
std::lock_guard<std::mutex> l(data_mutex_);

// determine if we should delay our upcoming poll
-if (FLAGS_async_replication_polling_delay_ms > 0 || poll_failures_ > 0) {
-  int64_t delay = max(FLAGS_async_replication_polling_delay_ms, // user setting
-                      (1 << poll_failures_) -1); // failure backoff
+int64_t delay = FLAGS_async_replication_polling_delay_ms; // normal throttling.
+if (idle_polls_ >= FLAGS_async_replication_max_idle_wait) {
+  delay = max(delay, (int64_t)FLAGS_async_replication_idle_delay_ms); // idle backoff.
+}
+if (poll_failures_ > 0) {
+  delay = max(delay, (int64_t)1 << poll_failures_); // exponential backoff for failures.
+}
+if (delay > 0) {
SleepFor(MonoDelta::FromMilliseconds(delay));
}

@@ -204,6 +215,8 @@ void CDCPoller::DoHandleApplyChanges(cdc::OutputClientResponse response) {

op_id_ = response.last_applied_op_id;

idle_polls_ = (response.processed_record_count == 0) ? idle_polls_ + 1 : 0;

Poll();
}
#undef RETURN_WHEN_OFFLINE
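Because these knobs are standard gflags, they can be overridden at runtime as
well as on the command line. A minimal sketch (the helper and values here are
hypothetical, not part of the commit):

#include <gflags/gflags.h>

DECLARE_int32(async_replication_idle_delay_ms);
DECLARE_int32(async_replication_max_idle_wait);

// Hypothetical test helper: back off sooner and poll less often while idle.
void TightenIdleBackoffForTest() {
  FLAGS_async_replication_idle_delay_ms = 50;  // assumed value
  FLAGS_async_replication_max_idle_wait = 2;   // assumed value
}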
1 change: 1 addition & 0 deletions ent/src/yb/tserver/cdc_poller.h
@@ -100,6 +100,7 @@ class CDCPoller {
std::atomic<bool> is_polling_{true};
int poll_failures_ GUARDED_BY(data_mutex_){0};
int apply_failures_ GUARDED_BY(data_mutex_){0};
int idle_polls_ GUARDED_BY(data_mutex_){0};
};

} // namespace enterprise
2 changes: 2 additions & 0 deletions ent/src/yb/tserver/twodc_output_client.cc
@@ -288,8 +288,10 @@ void TwoDCOutputClient::HandleResponse() {
response.status = error_status_;
if (response.status.ok()) {
response.last_applied_op_id = op_id_;
response.processed_record_count = processed_record_count_;
}
op_id_ = consensus::MinimumOpId();
processed_record_count_ = 0;
}
apply_changes_clbk_(response);
}
1 change: 1 addition & 0 deletions src/yb/cdc/cdc_output_client_interface.h
@@ -33,6 +33,7 @@ namespace cdc {
struct OutputClientResponse {
Status status;
consensus::OpId last_applied_op_id;
uint32_t processed_record_count;
};

class CDCOutputClient {
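One subtlety: as added, processed_record_count has no initializer, and
TwoDCOutputClient::HandleResponse() above only assigns it on the OK path. A
defensively value-initialized variant (an editorial suggestion, not part of
the commit) would be:

struct OutputClientResponse {
  Status status;
  consensus::OpId last_applied_op_id;
  uint32_t processed_record_count = 0;  // zero-init so error paths never carry garbage
};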
