Skip to content

Commit

Permalink
#2529 [CDC] Clean up cdc_state rows for deleted streams
Browse files Browse the repository at this point in the history
Summary:
Clean up cdc_state rows for deleted streams.

Also, modify catalog manager methods `DeleteCDCStreamsForTables` and `DeleteCDCStream` so that they don't remove the stream entries from the sys catalog and the streams map. Instead, these methods just mark the streams as DELETING, and the catalog manager background thread is responsible for cleaning up these entries. The cleaning up method `DeleteCDCStreams` is responsible for deleting the stream entries from the `cdc_state` table.

Test Plan: Added more code to TestDeleteCDCStream to verify that the stream's entries in the cdc_state table are deleted once the stream is deleted.

Reviewers: rahuldesirazu, nicolas, bogdan, neha

Reviewed By: neha

Subscribers: bogdan, ybase

Differential Revision: https://phabricator.dev.yugabyte.com/D7376
  • Loading branch information
hectorgcr committed Feb 12, 2020
1 parent 5dc1e21 commit d1bf6ed
Show file tree
Hide file tree
Showing 9 changed files with 338 additions and 40 deletions.
6 changes: 4 additions & 2 deletions ent/src/yb/cdc/cdc_service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,8 @@ void CDCServiceImpl::DeleteCDCStream(const DeleteCDCStreamRequestPB* req,
return;
}

LOG(INFO) << "Received DeleteCDCStream request " << req->ShortDebugString();

RPC_CHECK_AND_RETURN_ERROR(req->stream_id_size() > 0,
STATUS(InvalidArgument, "Stream ID is required to delete CDC stream"),
resp->mutable_error(),
Expand Down Expand Up @@ -779,8 +781,8 @@ Result<OpId> CDCServiceImpl::GetLastCheckpoint(

auto cond = req->mutable_where_expr()->mutable_condition();
cond->set_op(QLOperator::QL_OP_AND);
QLAddStringCondition(cond, Schema::first_column_id() + master::kCdcStreamIdIdx,
QL_OP_EQUAL, producer_tablet.stream_id);
QLAddStringCondition(cond, Schema::first_column_id() + master::kCdcStreamIdIdx, QL_OP_EQUAL,
producer_tablet.stream_id);

table.AddColumns({master::kCdcCheckpoint}, req);
RETURN_NOT_OK(session->ApplyAndFlush(op));
Expand Down
101 changes: 95 additions & 6 deletions ent/src/yb/integration-tests/cdc_service-int-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -153,18 +153,88 @@ void AssertChangeRecords(const google::protobuf::RepeatedPtrField<cdc::KeyValueP
ASSERT_EQ(changes[1].value().string_value(), expected_str);
}

void VerifyCdcState(client::YBClient* client) {
void VerifyCdcStateNotEmpty(client::YBClient* client) {
client::TableHandle table;
client::YBTableName cdc_state_table(
YQL_DATABASE_CQL, master::kSystemNamespaceName, master::kCdcStateTableName);
ASSERT_OK(table.Open(cdc_state_table, client));
ASSERT_EQ(1, boost::size(client::TableRange(table)));
const auto& row = client::TableRange(table).begin();
string checkpoint = row->column(master::kCdcCheckpointIdx).string_value();
size_t split = checkpoint.find(".");
auto index = boost::lexical_cast<int>(checkpoint.substr(split + 1, string::npos));
auto result = OpId::FromString(checkpoint);
ASSERT_OK(result);
OpId op_id = *result;
// Verify that op id index has been advanced and is not 0.
ASSERT_GT(index, 0);
ASSERT_GT(op_id.index, 0);
}

// Reads the cdc_state row selected by `tablet_id` (hash value) and `stream_id` (equality
// condition on the stream id column) and asserts that:
//   - exactly one row is returned, and
//   - its checkpoint string parses to an OpId whose term/index equal `term`/`index`.
// Uses gtest ASSERT_* macros, so callers should wrap it in ASSERT_NO_FATALS if they need to stop
// on failure.
void VerifyCdcStateMatches(client::YBClient* client,
                           const CDCStreamId& stream_id,
                           const TabletId& tablet_id,
                           uint64_t term,
                           uint64_t index) {
  client::YBTableName cdc_state_table(
      YQL_DATABASE_CQL, master::kSystemNamespaceName, master::kCdcStateTableName);
  client::TableHandle table;
  ASSERT_OK(table.Open(cdc_state_table, client));

  // Build the read request: tablet id as hash value, stream id as an equality filter, and the
  // checkpoint as the only requested column.
  const auto read_op = table.NewReadOp();
  auto* const read_req = read_op->mutable_request();
  QLAddStringHashValue(read_req, tablet_id);
  auto* const condition = read_req->mutable_where_expr()->mutable_condition();
  condition->set_op(QLOperator::QL_OP_AND);
  QLAddStringCondition(condition, Schema::first_column_id() + master::kCdcStreamIdIdx,
                       QL_OP_EQUAL, stream_id);
  table.AddColumns({master::kCdcCheckpoint}, read_req);

  auto session = client->NewSession();
  ASSERT_OK(session->ApplyAndFlush(read_op));

  LOG(INFO) << strings::Substitute("Verifying tablet: $0, stream: $1, op_id: $2",
                                   tablet_id, stream_id, OpId(term, index).ToString());

  auto rows = ql::RowsResult(read_op.get()).GetRowBlock();
  ASSERT_EQ(rows->row_count(), 1);

  // Column 0 of the single row is the checkpoint, since it is the only column requested above.
  string checkpoint_str = rows->row(0).column(0).string_value();
  auto parsed = OpId::FromString(checkpoint_str);
  ASSERT_OK(parsed);
  OpId stored_op_id = *parsed;

  ASSERT_EQ(stored_op_id.term, term);
  ASSERT_EQ(stored_op_id.index, index);
}

// Polls the cdc_state table until the read selected by `tablet_id` (hash value) and `stream_id`
// (equality condition on the stream id column) returns no rows, or until `timeout_secs` seconds
// have elapsed, in which case the ASSERT_OK on WaitFor fails.
void VerifyStreamDeletedFromCdcState(client::YBClient* client,
                                     const CDCStreamId& stream_id,
                                     const TabletId& tablet_id,
                                     int timeout_secs) {
  const client::YBTableName cdc_state_table(
      YQL_DATABASE_CQL, master::kSystemNamespaceName, master::kCdcStateTableName);
  client::TableHandle table;
  ASSERT_OK(table.Open(cdc_state_table, client));

  const auto read_op = table.NewReadOp();
  auto* const read_req = read_op->mutable_request();
  QLAddStringHashValue(read_req, tablet_id);

  auto* const condition = read_req->mutable_where_expr()->mutable_condition();
  condition->set_op(QLOperator::QL_OP_AND);
  QLAddStringCondition(condition, Schema::first_column_id() + master::kCdcStreamIdIdx,
                       QL_OP_EQUAL, stream_id);

  table.AddColumns({master::kCdcCheckpoint}, read_req);
  auto session = client->NewSession();

  // One polling attempt: re-issue the read and report whether it came back empty. A failed read
  // is recorded as a non-fatal test failure and polling continues.
  const auto stream_rows_gone = [&]() {
    EXPECT_OK(session->ApplyAndFlush(read_op));
    auto rows = ql::RowsResult(read_op.get()).GetRowBlock();
    return rows->row_count() == 0;
  };

  // The cdc_state rows for the stream are deleted asynchronously, so the rows may still be
  // present after the delete request has returned; hence the retry loop.
  ASSERT_OK(WaitFor(stream_rows_gone, MonoDelta::FromSeconds(timeout_secs),
                    "Stream rows in cdc_state have been deleted."));
}

void CDCServiceTest::GetTablets(std::vector<TabletId>* tablet_ids) {
Expand Down Expand Up @@ -247,6 +317,7 @@ TEST_F(CDCServiceTest, TestCreateCDCStreamWithDefaultRententionTime) {
}

TEST_F(CDCServiceTest, TestDeleteCDCStream) {
FLAGS_cdc_state_checkpoint_update_interval_ms = 0;
CDCStreamId stream_id;
CreateCDCStream(cdc_proxy_, table_.table()->id(), &stream_id);

Expand All @@ -255,13 +326,30 @@ TEST_F(CDCServiceTest, TestDeleteCDCStream) {
ASSERT_OK(client_->GetCDCStream(stream_id, &table_id, &options));
ASSERT_EQ(table_id, table_.table()->id());

std::vector<std::string> tablet_ids;
std::vector<std::string> ranges;
ASSERT_OK(client_->GetTablets(table_.table()->name(), 0 /* max_tablets */, &tablet_ids, &ranges));

bool get_changes_error = false;
// Send GetChanges requests so an entry for each tablet can be added to the cdc_state table.
// Term and index don't matter.
for (const auto& tablet_id : tablet_ids) {
GetChanges(tablet_id, stream_id, 1, 1, &get_changes_error);
ASSERT_FALSE(get_changes_error);
VerifyCdcStateMatches(client_.get(), stream_id, tablet_id, 1, 1);
}

ASSERT_OK(client_->DeleteCDCStream(stream_id));

// Check that the stream no longer exists.
table_id.clear();
options.clear();
Status s = client_->GetCDCStream(stream_id, &table_id, &options);
ASSERT_TRUE(s.IsNotFound());

for (const auto& tablet_id : tablet_ids) {
VerifyStreamDeletedFromCdcState(client_.get(), stream_id, tablet_id, 20);
}
}

TEST_F(CDCServiceTest, TestGetChanges) {
Expand Down Expand Up @@ -800,7 +888,7 @@ TEST_F(CDCServiceTest, TestCheckpointUpdate) {
}

// Verify that cdc_state table has correct checkpoint.
ASSERT_NO_FATALS(VerifyCdcState(client_.get()));
ASSERT_NO_FATALS(VerifyCdcStateNotEmpty(client_.get()));

// Call GetChanges again but without any from checkpoint.
change_req.Clear();
Expand All @@ -819,7 +907,7 @@ TEST_F(CDCServiceTest, TestCheckpointUpdate) {
}

// Verify that cdc_state table's checkpoint is unaffected.
ASSERT_NO_FATALS(VerifyCdcState(client_.get()));
ASSERT_NO_FATALS(VerifyCdcStateNotEmpty(client_.get()));
}

namespace {
Expand Down Expand Up @@ -1258,6 +1346,7 @@ void CDCServiceTestThreeServers::GetFirstTabletIdAndLeaderPeer(TabletId* tablet_
}
}
}
now = MonoTime::Now();
}
}

Expand Down
2 changes: 1 addition & 1 deletion ent/src/yb/master/async_snapshot_tasks.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ namespace master {
// Keeps retrying until we get an "ok" response.
class AsyncTabletSnapshotOp : public enterprise::RetryingTSRpcTask {
public:
AsyncTabletSnapshotOp(Master *master,
AsyncTabletSnapshotOp(Master* master,
ThreadPool* callback_pool,
const scoped_refptr<TabletInfo>& tablet,
const std::string& snapshot_id,
Expand Down
13 changes: 13 additions & 0 deletions ent/src/yb/master/catalog_entity_info.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,19 @@ struct PersistentCDCStreamInfo : public Persistent<SysCDCStreamEntryPB, SysRowEn
return pb.table_id();
}

// Returns true when the persisted stream state is either DELETING or DELETED.
bool started_deleting() const {
  const auto current_state = pb.state();
  return current_state == SysCDCStreamEntryPB::DELETING ||
         current_state == SysCDCStreamEntryPB::DELETED;
}

// Returns true only when the persisted stream state is DELETING.
bool is_deleting() const {
  const auto current_state = pb.state();
  return current_state == SysCDCStreamEntryPB::DELETING;
}

// Returns true only when the persisted stream state is DELETED.
bool is_deleted() const {
  const auto current_state = pb.state();
  return current_state == SysCDCStreamEntryPB::DELETED;
}

// Returns the stream options stored in the persisted protobuf.
// Note: the return type is a value, so each call hands the caller its own copy of the field.
const google::protobuf::RepeatedPtrField<CDCStreamOptionsPB> options() const {
  const auto& stream_options = pb.options();
  return stream_options;
}
Expand Down
13 changes: 11 additions & 2 deletions ent/src/yb/master/catalog_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,12 @@ class CatalogManager : public yb::master::CatalogManager {
GetUniverseReplicationResponsePB* resp,
rpc::RpcContext* rpc);

// Find all the CDC streams that have been marked as DELETING.
CHECKED_STATUS FindCDCStreamsMarkedAsDeleting(std::vector<scoped_refptr<CDCStreamInfo>>* streams);

// Delete specified CDC streams.
CHECKED_STATUS CleanUpDeletedCDCStreams(const std::vector<scoped_refptr<CDCStreamInfo>>& streams);

private:
friend class SnapshotLoader;
friend class ClusterLoadBalancer;
Expand Down Expand Up @@ -196,8 +202,8 @@ class CatalogManager : public yb::master::CatalogManager {
// Return all CDC streams.
void GetAllCDCStreams(std::vector<scoped_refptr<CDCStreamInfo>>* streams);

// Delete specified CDC streams.
CHECKED_STATUS DeleteCDCStreams(const std::vector<scoped_refptr<CDCStreamInfo>>& streams);
// Mark specified CDC streams as DELETING so they can be removed later.
CHECKED_STATUS MarkCDCStreamsAsDeleting(const std::vector<scoped_refptr<CDCStreamInfo>>& streams);

// Find CDC streams for a table.
std::vector<scoped_refptr<CDCStreamInfo>> FindCDCStreamsForTable(const TableId& table_id);
Expand Down Expand Up @@ -258,6 +264,9 @@ class CatalogManager : public yb::master::CatalogManager {
std::unordered_map<TabletServerId, bool> should_send_consumer_registry_
GUARDED_BY(should_send_consumer_registry_mutex_);

// YBClient used to modify the cdc_state table from the master.
std::unique_ptr<client::YBClient> cdc_ybclient_;

DISALLOW_COPY_AND_ASSIGN(CatalogManager);
};

Expand Down
Loading

0 comments on commit d1bf6ed

Please sign in to comment.