Skip to content

Commit

Permalink
yugabyte#2050: Increase WAL retention time when CDC stream is setup o…
Browse files Browse the repository at this point in the history
…n table

Summary: When a CDC stream is created, we modify the WAL retention time for all the table's tablets. This is done with an AlterTable request. The new time is controlled by the gflag cdc_wal_retention_time_secs, and it is the same for any table for which a CDC stream is created.

Test Plan: Verify that all the tests that create a CDC stream succeed

Reviewers: bogdan, rahuldesirazu, neha

Reviewed By: neha

Subscribers: ybase

Differential Revision: https://phabricator.dev.yugabyte.com/D7115
  • Loading branch information
hectorgcr authored and d-uspenskiy committed Sep 5, 2019
1 parent 872e0dd commit c262d45
Show file tree
Hide file tree
Showing 7 changed files with 95 additions and 13 deletions.
3 changes: 2 additions & 1 deletion ent/src/yb/cdc/CMakeLists-include.txt
Original file line number Diff line number Diff line change
Expand Up @@ -41,5 +41,6 @@ target_link_libraries(cdc
tablet
yb_util
ql_util
gutil)
gutil
yb_client)

27 changes: 22 additions & 5 deletions ent/src/yb/cdc/cdc_service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include "yb/common/wire_protocol.h"
#include "yb/consensus/raft_consensus.h"
#include "yb/client/table.h"
#include "yb/client/table_alterer.h"
#include "yb/client/table_handle.h"
#include "yb/client/session.h"
#include "yb/client/yb_table_name.h"
Expand All @@ -48,6 +49,10 @@ TAG_FLAG(cdc_ybclient_reactor_threads, advanced);
DEFINE_test_flag(bool, mock_get_changes_response_for_consumer_testing, false,
"Mock a successful response to consumer before stream id integration is set up.");

DEFINE_int32(cdc_wal_retention_time_secs, 4 * 3600,
"WAL retention time in seconds to be used for tables for which a CDC stream was "
"created.");

namespace yb {
namespace cdc {

Expand Down Expand Up @@ -155,26 +160,38 @@ void CDCServiceImpl::CreateCDCStream(const CreateCDCStreamRequestPB* req,
// Check if YSQL table has a primary key. CQL tables always have a user specified primary key.
RPC_CHECK_AND_RETURN_ERROR(
table->table_type() != client::YBTableType::PGSQL_TABLE_TYPE ||
YsqlTableHasPrimaryKey(table->schema()),
YsqlTableHasPrimaryKey(table->schema()),
STATUS(InvalidArgument, "Cannot setup CDC on table without primary key"),
resp->mutable_error(),
CDCErrorPB::INVALID_REQUEST,
context);
}

std::unique_ptr<client::YBTableAlterer> table_alterer(
async_client_init_->client()->NewTableAlterer(table->name()));

// Increase WAL retention. Fail the request if we fail to set the WAL retention.
auto wal_retention_secs = 0;
if (req->has_retention_sec()) {
wal_retention_secs = req->retention_sec();
} else {
wal_retention_secs = static_cast<uint32>(FLAGS_cdc_wal_retention_time_secs);
}

auto status = table_alterer->SetWalRetentionSecs(wal_retention_secs)->Alter();
RPC_STATUS_RETURN_ERROR(status, resp->mutable_error(), CDCErrorPB::INTERNAL_ERROR, context);

std::unordered_map<std::string, std::string> options;
options.reserve(3);
options.emplace(kRecordType, CDCRecordType_Name(req->record_type()));
options.emplace(kRecordFormat, CDCRecordFormat_Name(req->record_format()));
if (req->has_retention_sec()) {
options.emplace(kRetentionSec, std::to_string(req->retention_sec()));
}
options.emplace(kRetentionSec, std::to_string(wal_retention_secs));

auto result = async_client_init_->client()->CreateCDCStream(req->table_id(), options);
RPC_CHECK_AND_RETURN_ERROR(result.ok(), result.status(), resp->mutable_error(),
CDCErrorPB::INTERNAL_ERROR, context);

resp->set_stream_id(*result);
// TODO: Increase retention for WAL.

// Add stream to cache.
AddStreamMetadataToCache(*result, std::make_shared<StreamMetadata>(req->table_id(),
Expand Down
69 changes: 63 additions & 6 deletions ent/src/yb/integration-tests/cdc_service-int-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
#include "yb/yql/cql/ql/util/errcodes.h"
#include "yb/yql/cql/ql/util/statement_result.h"

DECLARE_int32(cdc_wal_retention_time_secs);

namespace yb {
namespace cdc {

Expand Down Expand Up @@ -73,6 +75,9 @@ class CDCServiceTest : public YBMiniClusterTestBase<MiniCluster> {
void CreateTable(int num_tablets, TableHandle* table);
void GetTablet(std::string* tablet_id);

void VerifyWalRetentionTime(const TableId& table_id,
const uint32_t expected_wal_retention_secs);

std::unique_ptr<CDCServiceProxy> cdc_proxy_;
std::unique_ptr<client::YBClient> client_;
TableHandle table_;
Expand All @@ -93,6 +98,25 @@ void CDCServiceTest::CreateTable(int num_tablets, TableHandle* table) {
ASSERT_OK(table->Create(kTableName, num_tablets, client_.get(), &builder));
}

void CDCServiceTest::VerifyWalRetentionTime(const TableId& table_id,
const uint32_t expected_wal_retention_secs) {
vector<std::shared_ptr<tablet::TabletPeer> > peers;
cluster_->mini_tablet_server(0)->server()->tablet_manager()->GetTabletPeers(&peers);

bool wal_retention_time_checked = false;
// Find the right tablet peer.
for (const auto& peer : peers) {
if (peer->tablet_metadata()->table_id() == table_id) {
LOG(INFO) << "Checking wal retention time for tablet " << peer->tablet()->tablet_id();
ASSERT_EQ(peer->log()->wal_retention_secs(), expected_wal_retention_secs);
ASSERT_EQ(peer->tablet_metadata()->wal_retention_secs(), expected_wal_retention_secs);
wal_retention_time_checked = true;
break;
}
}
ASSERT_TRUE(wal_retention_time_checked);
}

void AssertChangeRecords(const google::protobuf::RepeatedPtrField<cdc::KeyValuePairPB>& changes,
int32_t expected_int, std::string expected_str) {
ASSERT_EQ(changes.size(), 2);
Expand All @@ -112,17 +136,50 @@ void CDCServiceTest::GetTablet(std::string* tablet_id) {

TEST_F(CDCServiceTest, TestCreateCDCStream) {
CDCStreamId stream_id;
CreateCDCStream(cdc_proxy_, table_.table()->id(), &stream_id);
CreateCDCStream(cdc_proxy_, table_.table()->id(), boost::none /* retention time */, &stream_id);

TableId table_id;
std::unordered_map<std::string, std::string> options;
ASSERT_OK(client_->GetCDCStream(stream_id, &table_id, &options));
ASSERT_EQ(table_id, table_.table()->id());
}

TEST_F(CDCServiceTest, TestCreateCDCStreamWithDefaultRententionTime) {
// Set default WAL retention time to 10 hours.
FLAGS_cdc_wal_retention_time_secs = 36000;

CDCStreamId stream_id;
CreateCDCStream(cdc_proxy_, table_.table()->id(), boost::none /* retention time */, &stream_id);

TableId table_id;
std::unordered_map<std::string, std::string> options;
ASSERT_OK(client_->GetCDCStream(stream_id, &table_id, &options));

// Verify that the wal retention time was set at the tablet level.
VerifyWalRetentionTime(table_id, FLAGS_cdc_wal_retention_time_secs);
}

TEST_F(CDCServiceTest, TestCreateCDCStreamWithSpecifiedtRententionTime) {
// Set default WAL retention time to 10 hours.
FLAGS_cdc_wal_retention_time_secs = 36000;

// Set WAL retention time to 1 hour.
constexpr uint32_t wal_retention_secs = 3600;

CDCStreamId stream_id;
CreateCDCStream(cdc_proxy_, table_.table()->id(), wal_retention_secs, &stream_id);

TableId table_id;
std::unordered_map<std::string, std::string> options;
ASSERT_OK(client_->GetCDCStream(stream_id, &table_id, &options));

// Verify that the wal retention time was set at the tablet level.
VerifyWalRetentionTime(table_id, wal_retention_secs);
}

TEST_F(CDCServiceTest, TestDeleteCDCStream) {
CDCStreamId stream_id;
CreateCDCStream(cdc_proxy_, table_.table()->id(), &stream_id);
CreateCDCStream(cdc_proxy_, table_.table()->id(), boost::none /* retention time */, &stream_id);

TableId table_id;
std::unordered_map<std::string, std::string> options;
Expand All @@ -140,7 +197,7 @@ TEST_F(CDCServiceTest, TestDeleteCDCStream) {

TEST_F(CDCServiceTest, TestGetChanges) {
CDCStreamId stream_id;
CreateCDCStream(cdc_proxy_, table_.table()->id(), &stream_id);
CreateCDCStream(cdc_proxy_, table_.table()->id(), boost::none /* retention time */, &stream_id);

std::string tablet_id;
GetTablet(&tablet_id);
Expand Down Expand Up @@ -260,7 +317,7 @@ TEST_F(CDCServiceTest, TestGetChanges) {

TEST_F(CDCServiceTest, TestGetCheckpoint) {
CDCStreamId stream_id;
CreateCDCStream(cdc_proxy_, table_.table()->id(), &stream_id);
CreateCDCStream(cdc_proxy_, table_.table()->id(), boost::none /* retention time */, &stream_id);

std::string tablet_id;
GetTablet(&tablet_id);
Expand All @@ -284,7 +341,7 @@ TEST_F(CDCServiceTest, TestGetCheckpoint) {

TEST_F(CDCServiceTest, TestListTablets) {
CDCStreamId stream_id;
CreateCDCStream(cdc_proxy_, table_.table()->id(), &stream_id);
CreateCDCStream(cdc_proxy_, table_.table()->id(), boost::none /* retention time */, &stream_id);

std::string tablet_id;
GetTablet(&tablet_id);
Expand All @@ -308,7 +365,7 @@ TEST_F(CDCServiceTest, TestListTablets) {

TEST_F(CDCServiceTest, TestOnlyGetLocalChanges) {
CDCStreamId stream_id;
CreateCDCStream(cdc_proxy_, table_.table()->id(), &stream_id);
CreateCDCStream(cdc_proxy_, table_.table()->id(), boost::none /* retention time */, &stream_id);

std::string tablet_id;
GetTablet(&tablet_id);
Expand Down
3 changes: 2 additions & 1 deletion ent/src/yb/integration-tests/cdc_service-txn-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,8 @@ TEST_F(CDCServiceTxnTest, TestGetChanges) {

// Create CDC stream on table.
CDCStreamId stream_id;
CreateCDCStream(cdc_proxy_, table_.table()->id(), &stream_id);
CreateCDCStream(cdc_proxy_, table_.table()->id(), boost::none /* wal retention time */,
&stream_id);

GetChangesRequestPB change_req;
GetChangesResponsePB change_resp;
Expand Down
4 changes: 4 additions & 0 deletions ent/src/yb/util/cdc_test_util.cc
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,14 @@ void AssertIntKey(const google::protobuf::RepeatedPtrField<cdc::KeyValuePairPB>&

void CreateCDCStream(const std::unique_ptr<CDCServiceProxy>& cdc_proxy,
const TableId& table_id,
boost::optional<uint32_t> wal_retention_secs,
CDCStreamId* stream_id) {
CreateCDCStreamRequestPB req;
CreateCDCStreamResponsePB resp;
req.set_table_id(table_id);
if (wal_retention_secs) {
req.set_retention_sec(*wal_retention_secs);
}

rpc::RpcController rpc;
cdc_proxy->CreateCDCStream(req, &resp, &rpc);
Expand Down
1 change: 1 addition & 0 deletions ent/src/yb/util/cdc_test_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ void AssertIntKey(const google::protobuf::RepeatedPtrField<cdc::KeyValuePairPB>&

void CreateCDCStream(const std::unique_ptr<CDCServiceProxy>& cdc_proxy,
const TableId& table_id,
boost::optional<uint32_t> wal_retention_secs,
CDCStreamId* stream_id);

} // namespace cdc
Expand Down
1 change: 1 addition & 0 deletions src/yb/consensus/log.cc
Original file line number Diff line number Diff line change
Expand Up @@ -786,6 +786,7 @@ Status Log::WaitUntilAllFlushed() {
}

void Log::set_wal_retention_secs(uint32_t wal_retention_secs) {
LOG(INFO) << "Setting wal retention time to " << wal_retention_secs << " seconds";
wal_retention_secs_.store(wal_retention_secs, std::memory_order_release);
}

Expand Down

0 comments on commit c262d45

Please sign in to comment.