Skip to content

Commit

Permalink
[#2838] [#2051] [cdc] Alter Replication Command for CDC
Browse files Browse the repository at this point in the history
Summary:
Adds the ability to add & remove tables from an existing replication UUID in addition to
modifying the master addresses..

Test Plan:
1. ybd debug --cxx-test twodc-test
2. Manual Testing

Reviewers: hector, rahuldesirazu, neha

Reviewed By: neha

Subscribers: mikhail, ybase, bogdan

Differential Revision: https://phabricator.dev.yugabyte.com/D7796
  • Loading branch information
nspiegelberg committed Feb 26, 2020
1 parent 62c0598 commit 69f5ace
Show file tree
Hide file tree
Showing 19 changed files with 743 additions and 105 deletions.
177 changes: 176 additions & 1 deletion ent/src/yb/integration-tests/twodc-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -457,14 +457,16 @@ class TwoDCTest : public YBTest, public testing::WithParamInterface<int> {
return LoggedWaitFor([=]() -> Result<bool> {
static int i = 0;
constexpr int kNumIterationsWithCorrectResult = 5;
if (NumProducerTabletsPolled(cluster) == num_producer_tablets) {
auto cur_tablets = NumProducerTabletsPolled(cluster);
if (cur_tablets == num_producer_tablets) {
if (i++ == kNumIterationsWithCorrectResult) {
i = 0;
return true;
}
} else {
i = 0;
}
LOG(INFO) << "Tablets being polled: " << cur_tablets;
return false;
}, MonoDelta::FromSeconds(kRpcTimeout), "Num producer tablets being polled");
}
Expand Down Expand Up @@ -972,6 +974,179 @@ TEST_P(TwoDCTest, BiDirectionalWrites) {
Destroy();
}

TEST_P(TwoDCTest, AlterUniverseReplicationMasters) {
// Tablets = Servers + 1 to stay simple but ensure round robin gives a tablet to everyone.
uint32_t t_count = 2, master_count = 3;
auto tables = ASSERT_RESULT(SetUpWithParams(
{t_count, t_count}, {t_count, t_count}, 1, master_count));

// tables contains both producer and consumer universe tables (alternately).
// Pick out just the producer table from the list.
std::vector<std::shared_ptr<client::YBTable>> producer_tables{tables[0], tables[2]},
initial_tables{tables[0]};

// SetupUniverseReplication only utilizes 1 master.
ASSERT_OK(SetupUniverseReplication(
producer_cluster(), consumer_cluster(), consumer_client(), kUniverseId, initial_tables));

master::GetUniverseReplicationResponsePB v_resp;
ASSERT_OK(VerifyUniverseReplication(consumer_cluster(), consumer_client(), kUniverseId, &v_resp));
ASSERT_EQ(v_resp.entry().producer_master_addresses_size(), 1);
ASSERT_EQ(HostPortFromPB(v_resp.entry().producer_master_addresses(0)),
producer_cluster()->leader_mini_master()->bound_rpc_addr());

// After creating the cluster, make sure all producer tablets are being polled for.
ASSERT_OK(CorrectlyPollingAllTablets(consumer_cluster(), t_count));

LOG(INFO) << "Alter Replication to include all Masters";
// Alter Replication to include the other masters.
{
master::AlterUniverseReplicationRequestPB alter_req;
master::AlterUniverseReplicationResponsePB alter_resp;
alter_req.set_producer_id(kUniverseId);

// GetMasterAddresses returns 3 masters.
string master_addr = producer_cluster()->GetMasterAddresses();
auto hp_vec = ASSERT_RESULT(HostPort::ParseStrings(master_addr, 0));
HostPortsToPBs(hp_vec, alter_req.mutable_producer_master_addresses());

rpc::RpcController rpc;
rpc.set_timeout(MonoDelta::FromSeconds(kRpcTimeout));

auto master_proxy = std::make_shared<master::MasterServiceProxy>(
&consumer_client()->proxy_cache(),
consumer_cluster()->leader_mini_master()->bound_rpc_addr());
ASSERT_OK(master_proxy->AlterUniverseReplication(alter_req, &alter_resp, &rpc));
ASSERT_FALSE(alter_resp.has_error());

// Verify that the consumer now has all masters.
ASSERT_OK(LoggedWaitFor([&]() -> Result<bool> {
master::GetUniverseReplicationResponsePB tmp_resp;
return VerifyUniverseReplication(consumer_cluster(), consumer_client(),
kUniverseId, &tmp_resp).ok() &&
tmp_resp.entry().producer_master_addresses_size() == master_count;
}, MonoDelta::FromSeconds(kRpcTimeout), "Verify master count increased."));
ASSERT_OK(CorrectlyPollingAllTablets(consumer_cluster(), t_count));
}

// Stop the old master.
LOG(INFO) << "Failover to new Master";
MiniMaster* old_master = producer_cluster()->leader_mini_master();
producer_cluster()->leader_mini_master()->Shutdown();
MiniMaster* new_master = producer_cluster()->leader_mini_master();
ASSERT_NE(nullptr, new_master);
ASSERT_NE(old_master, new_master);

LOG(INFO) << "Add Table after Master Failover";
// Add a new table to replication and ensure that it can read using the new master config.
{
master::AlterUniverseReplicationRequestPB alter_req;
master::AlterUniverseReplicationResponsePB alter_resp;
alter_req.set_producer_id(kUniverseId);
alter_req.add_producer_table_ids_to_add(producer_tables[1]->id());
rpc::RpcController rpc;
rpc.set_timeout(MonoDelta::FromSeconds(kRpcTimeout));

auto master_proxy = std::make_shared<master::MasterServiceProxy>(
&consumer_client()->proxy_cache(),
consumer_cluster()->leader_mini_master()->bound_rpc_addr());
ASSERT_OK(master_proxy->AlterUniverseReplication(alter_req, &alter_resp, &rpc));
ASSERT_FALSE(alter_resp.has_error());

// Verify that the consumer now has both tables in the universe.
ASSERT_OK(LoggedWaitFor([&]() -> Result<bool> {
master::GetUniverseReplicationResponsePB tmp_resp;
return VerifyUniverseReplication(consumer_cluster(), consumer_client(),
kUniverseId, &tmp_resp).ok() &&
tmp_resp.entry().tables_size() == 2;
}, MonoDelta::FromSeconds(kRpcTimeout), "Verify table created with alter."));
ASSERT_OK(CorrectlyPollingAllTablets(consumer_cluster(), t_count * 2));
}

ASSERT_OK(DeleteUniverseReplication(kUniverseId));
Destroy();
}

TEST_P(TwoDCTest, AlterUniverseReplicationTables) {
// Setup the consumer and producer cluster.
auto tables = ASSERT_RESULT(SetUpWithParams({3, 3}, {3, 3}, 1));
std::vector<std::shared_ptr<client::YBTable>> producer_tables{tables[0], tables[2]};
std::vector<std::shared_ptr<client::YBTable>> consumer_tables{tables[1], tables[3]};

// Setup universe replication on the first table.
auto initial_table = { producer_tables[0] };
ASSERT_OK(SetupUniverseReplication(
producer_cluster(), consumer_cluster(), consumer_client(), kUniverseId, initial_table));

// Verify that universe was setup on consumer.
master::GetUniverseReplicationResponsePB v_resp;
ASSERT_OK(VerifyUniverseReplication(consumer_cluster(), consumer_client(), kUniverseId, &v_resp));
ASSERT_EQ(v_resp.entry().producer_id(), kUniverseId);
ASSERT_EQ(v_resp.entry().tables_size(), 1);
ASSERT_EQ(v_resp.entry().tables(0), producer_tables[0]->id());

ASSERT_OK(CorrectlyPollingAllTablets(consumer_cluster(), 3));

// 'add_table'. Add the next table with the alter command.
{
master::AlterUniverseReplicationRequestPB alter_req;
master::AlterUniverseReplicationResponsePB alter_resp;
alter_req.set_producer_id(kUniverseId);
alter_req.add_producer_table_ids_to_add(producer_tables[1]->id());
rpc::RpcController rpc;
rpc.set_timeout(MonoDelta::FromSeconds(kRpcTimeout));

auto master_proxy = std::make_shared<master::MasterServiceProxy>(
&consumer_client()->proxy_cache(),
consumer_cluster()->leader_mini_master()->bound_rpc_addr());
ASSERT_OK(master_proxy->AlterUniverseReplication(alter_req, &alter_resp, &rpc));
ASSERT_FALSE(alter_resp.has_error());

// Verify that the consumer now has both tables in the universe.
ASSERT_OK(LoggedWaitFor([&]() -> Result<bool> {
master::GetUniverseReplicationResponsePB tmp_resp;
return VerifyUniverseReplication(consumer_cluster(), consumer_client(),
kUniverseId, &tmp_resp).ok() &&
tmp_resp.entry().tables_size() == 2;
}, MonoDelta::FromSeconds(kRpcTimeout), "Verify table created with alter."));
ASSERT_OK(CorrectlyPollingAllTablets(consumer_cluster(), 6));
}

// Write some rows to the new table on the Producer. Ensure that the Consumer gets it.
WriteWorkload(6, 10, producer_client(), producer_tables[1]->name());
ASSERT_OK(VerifyWrittenRecords(producer_tables[1]->name(), consumer_tables[1]->name()));

// 'remove_table'. Remove the original table, leaving only the new one.
{
master::AlterUniverseReplicationRequestPB alter_req;
master::AlterUniverseReplicationResponsePB alter_resp;
alter_req.set_producer_id(kUniverseId);
alter_req.add_producer_table_ids_to_remove(producer_tables[0]->id());
rpc::RpcController rpc;
rpc.set_timeout(MonoDelta::FromSeconds(kRpcTimeout));

auto master_proxy = std::make_shared<master::MasterServiceProxy>(
&consumer_client()->proxy_cache(),
consumer_cluster()->leader_mini_master()->bound_rpc_addr());
ASSERT_OK(master_proxy->AlterUniverseReplication(alter_req, &alter_resp, &rpc));
ASSERT_FALSE(alter_resp.has_error());

// Verify that the consumer now has only the new table created by the previous alter.
ASSERT_OK(LoggedWaitFor([&]() -> Result<bool> {
return VerifyUniverseReplication(consumer_cluster(), consumer_client(),
kUniverseId, &v_resp).ok() &&
v_resp.entry().tables_size() == 1;
}, MonoDelta::FromSeconds(kRpcTimeout), "Verify table removed with alter."));
ASSERT_EQ(v_resp.entry().tables(0), producer_tables[1]->id());
ASSERT_OK(CorrectlyPollingAllTablets(consumer_cluster(), 3));
}

LOG(INFO) << "All alter tests passed. Tearing down...";

ASSERT_OK(DeleteUniverseReplication(kUniverseId));
Destroy();
}

TEST_P(TwoDCTest, ToggleReplicationEnabled) {
uint32_t replication_factor = NonTsanVsTsan(3, 1);
auto tables = ASSERT_RESULT(SetUpWithParams({2}, {2}, replication_factor));
Expand Down
7 changes: 7 additions & 0 deletions ent/src/yb/master/catalog_entity_info.cc
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,19 @@ Result<std::shared_ptr<CDCRpcTasks>> UniverseReplicationInfo::GetOrCreateCDCRpcT

std::lock_guard<decltype(lock_)> l(lock_);
if (cdc_rpc_tasks_ != nullptr) {
// Master Addresses changed, update YBClient with new retry logic.
if (master_addrs_ != master_addrs) {
if (cdc_rpc_tasks_->UpdateMasters(master_addrs).ok()) {
master_addrs_ = master_addrs;
}
}
return cdc_rpc_tasks_;
}

auto result = CDCRpcTasks::CreateWithMasterAddrs(producer_id_, master_addrs);
if (result.ok()) {
cdc_rpc_tasks_ = *result;
master_addrs_ = master_addrs;
}
return result;
}
Expand Down
1 change: 1 addition & 0 deletions ent/src/yb/master/catalog_entity_info.h
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ class UniverseReplicationInfo : public RefCountedThreadSafe<UniverseReplicationI
const std::string producer_id_;

std::shared_ptr<CDCRpcTasks> cdc_rpc_tasks_;
std::string master_addrs_;

// Protects cdc_rpc_tasks_.
mutable rw_spinlock lock_;
Expand Down
6 changes: 6 additions & 0 deletions ent/src/yb/master/catalog_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,11 @@ class CatalogManager : public yb::master::CatalogManager {
DeleteUniverseReplicationResponsePB* resp,
rpc::RpcContext* rpc);

// Alter Universe Replication.
CHECKED_STATUS AlterUniverseReplication(const AlterUniverseReplicationRequestPB* req,
AlterUniverseReplicationResponsePB* resp,
rpc::RpcContext* rpc);

// Enable/Disable an Existing Universe Replication.
CHECKED_STATUS SetUniverseReplicationEnabled(const SetUniverseReplicationEnabledRequestPB* req,
SetUniverseReplicationEnabledResponsePB* resp,
Expand Down Expand Up @@ -242,6 +247,7 @@ class CatalogManager : public yb::master::CatalogManager {
void AddCDCStreamToUniverseAndInitConsumer(const std::string& universe_id, const TableId& table,
const Result<CDCStreamId>& stream_id);

void MergeUniverseReplication(scoped_refptr<UniverseReplicationInfo> info);
void DeleteUniverseReplicationUnlocked(scoped_refptr<UniverseReplicationInfo> info);
void MarkUniverseReplicationFailed(scoped_refptr<UniverseReplicationInfo> universe);

Expand Down
Loading

0 comments on commit 69f5ace

Please sign in to comment.