diff --git a/ent/src/yb/cdc/cdc_service.cc b/ent/src/yb/cdc/cdc_service.cc index d78627bcf465..d46e4114130b 100644 --- a/ent/src/yb/cdc/cdc_service.cc +++ b/ent/src/yb/cdc/cdc_service.cc @@ -205,6 +205,8 @@ void CDCServiceImpl::DeleteCDCStream(const DeleteCDCStreamRequestPB* req, return; } + LOG(INFO) << "Received DeleteCDCStream request " << req->ShortDebugString(); + RPC_CHECK_AND_RETURN_ERROR(req->stream_id_size() > 0, STATUS(InvalidArgument, "Stream ID is required to delete CDC stream"), resp->mutable_error(), @@ -779,8 +781,8 @@ Result CDCServiceImpl::GetLastCheckpoint( auto cond = req->mutable_where_expr()->mutable_condition(); cond->set_op(QLOperator::QL_OP_AND); - QLAddStringCondition(cond, Schema::first_column_id() + master::kCdcStreamIdIdx, - QL_OP_EQUAL, producer_tablet.stream_id); + QLAddStringCondition(cond, Schema::first_column_id() + master::kCdcStreamIdIdx, QL_OP_EQUAL, + producer_tablet.stream_id); table.AddColumns({master::kCdcCheckpoint}, req); RETURN_NOT_OK(session->ApplyAndFlush(op)); diff --git a/ent/src/yb/integration-tests/cdc_service-int-test.cc b/ent/src/yb/integration-tests/cdc_service-int-test.cc index 217cba02a944..346b91556ea4 100644 --- a/ent/src/yb/integration-tests/cdc_service-int-test.cc +++ b/ent/src/yb/integration-tests/cdc_service-int-test.cc @@ -153,7 +153,7 @@ void AssertChangeRecords(const google::protobuf::RepeatedPtrFieldcolumn(master::kCdcCheckpointIdx).string_value(); - size_t split = checkpoint.find("."); - auto index = boost::lexical_cast(checkpoint.substr(split + 1, string::npos)); + auto result = OpId::FromString(checkpoint); + ASSERT_OK(result); + OpId op_id = *result; // Verify that op id index has been advanced and is not 0. - ASSERT_GT(index, 0); + ASSERT_GT(op_id.index, 0); +} + +void VerifyCdcStateMatches(client::YBClient* client, + const CDCStreamId& stream_id, + const TabletId& tablet_id, + uint64_t term, + uint64_t index) { + client::TableHandle table; + client::YBTableName cdc_state_table( + YQL_DATABASE_CQL, master::kSystemNamespaceName, master::kCdcStateTableName); + ASSERT_OK(table.Open(cdc_state_table, client)); + const auto op = table.NewReadOp(); + auto* const req = op->mutable_request(); + QLAddStringHashValue(req, tablet_id); + auto cond = req->mutable_where_expr()->mutable_condition(); + cond->set_op(QLOperator::QL_OP_AND); + QLAddStringCondition(cond, Schema::first_column_id() + master::kCdcStreamIdIdx, QL_OP_EQUAL, + stream_id); + table.AddColumns({master::kCdcCheckpoint}, req); + + auto session = client->NewSession(); + ASSERT_OK(session->ApplyAndFlush(op)); + + LOG(INFO) << strings::Substitute("Verifying tablet: $0, stream: $1, op_id: $2", + tablet_id, stream_id, OpId(term, index).ToString()); + + auto row_block = ql::RowsResult(op.get()).GetRowBlock(); + ASSERT_EQ(row_block->row_count(), 1); + + string checkpoint = row_block->row(0).column(0).string_value(); + auto result = OpId::FromString(checkpoint); + ASSERT_OK(result); + OpId op_id = *result; + + ASSERT_EQ(op_id.term, term); + ASSERT_EQ(op_id.index, index); +} + +void VerifyStreamDeletedFromCdcState(client::YBClient* client, + const CDCStreamId& stream_id, + const TabletId& tablet_id, + int timeout_secs) { + client::TableHandle table; + const client::YBTableName cdc_state_table( + YQL_DATABASE_CQL, master::kSystemNamespaceName, master::kCdcStateTableName); + ASSERT_OK(table.Open(cdc_state_table, client)); + + const auto op = table.NewReadOp(); + auto* const req = op->mutable_request(); + QLAddStringHashValue(req, tablet_id); + + auto cond = req->mutable_where_expr()->mutable_condition(); + cond->set_op(QLOperator::QL_OP_AND); + QLAddStringCondition(cond, Schema::first_column_id() + master::kCdcStreamIdIdx, QL_OP_EQUAL, + stream_id); + + table.AddColumns({master::kCdcCheckpoint}, req); + auto session = client->NewSession(); + + // The deletion of cdc_state rows for the specified stream happen in an asynchronous thread, + // so even if the request has returned, it doesn't mean that the rows have been deleted yet. + ASSERT_OK(WaitFor([&](){ + EXPECT_OK(session->ApplyAndFlush(op)); + auto row_block = ql::RowsResult(op.get()).GetRowBlock(); + if (row_block->row_count() == 0) { + return true; + } + return false; + }, MonoDelta::FromSeconds(timeout_secs), "Stream rows in cdc_state have been deleted.")); } void CDCServiceTest::GetTablets(std::vector* tablet_ids) { @@ -247,6 +317,7 @@ TEST_F(CDCServiceTest, TestCreateCDCStreamWithDefaultRententionTime) { } TEST_F(CDCServiceTest, TestDeleteCDCStream) { + FLAGS_cdc_state_checkpoint_update_interval_ms = 0; CDCStreamId stream_id; CreateCDCStream(cdc_proxy_, table_.table()->id(), &stream_id); @@ -255,6 +326,19 @@ TEST_F(CDCServiceTest, TestDeleteCDCStream) { ASSERT_OK(client_->GetCDCStream(stream_id, &table_id, &options)); ASSERT_EQ(table_id, table_.table()->id()); + std::vector tablet_ids; + std::vector ranges; + ASSERT_OK(client_->GetTablets(table_.table()->name(), 0 /* max_tablets */, &tablet_ids, &ranges)); + + bool get_changes_error = false; + // Send GetChanges requests so an entry for each tablet can be added to the cdc_state table. + // Term and index don't matter. + for (const auto& tablet_id : tablet_ids) { + GetChanges(tablet_id, stream_id, 1, 1, &get_changes_error); + ASSERT_FALSE(get_changes_error); + VerifyCdcStateMatches(client_.get(), stream_id, tablet_id, 1, 1); + } + ASSERT_OK(client_->DeleteCDCStream(stream_id)); // Check that the stream no longer exists. @@ -262,6 +346,10 @@ TEST_F(CDCServiceTest, TestDeleteCDCStream) { options.clear(); Status s = client_->GetCDCStream(stream_id, &table_id, &options); ASSERT_TRUE(s.IsNotFound()); + + for (const auto& tablet_id : tablet_ids) { + VerifyStreamDeletedFromCdcState(client_.get(), stream_id, tablet_id, 20); + } } TEST_F(CDCServiceTest, TestGetChanges) { @@ -800,7 +888,7 @@ TEST_F(CDCServiceTest, TestCheckpointUpdate) { } // Verify that cdc_state table has correct checkpoint. - ASSERT_NO_FATALS(VerifyCdcState(client_.get())); + ASSERT_NO_FATALS(VerifyCdcStateNotEmpty(client_.get())); // Call GetChanges again but without any from checkpoint. change_req.Clear(); @@ -819,7 +907,7 @@ TEST_F(CDCServiceTest, TestCheckpointUpdate) { } // Verify that cdc_state table's checkpoint is unaffected. - ASSERT_NO_FATALS(VerifyCdcState(client_.get())); + ASSERT_NO_FATALS(VerifyCdcStateNotEmpty(client_.get())); } namespace { @@ -1258,6 +1346,7 @@ void CDCServiceTestThreeServers::GetFirstTabletIdAndLeaderPeer(TabletId* tablet_ } } } + now = MonoTime::Now(); } } diff --git a/ent/src/yb/master/async_snapshot_tasks.h b/ent/src/yb/master/async_snapshot_tasks.h index 666f79aa278e..534191efb439 100644 --- a/ent/src/yb/master/async_snapshot_tasks.h +++ b/ent/src/yb/master/async_snapshot_tasks.h @@ -23,7 +23,7 @@ namespace master { // Keeps retrying until we get an "ok" response. class AsyncTabletSnapshotOp : public enterprise::RetryingTSRpcTask { public: - AsyncTabletSnapshotOp(Master *master, + AsyncTabletSnapshotOp(Master* master, ThreadPool* callback_pool, const scoped_refptr& tablet, const std::string& snapshot_id, diff --git a/ent/src/yb/master/catalog_entity_info.h b/ent/src/yb/master/catalog_entity_info.h index 4c266f2e7f21..ca3d58210276 100644 --- a/ent/src/yb/master/catalog_entity_info.h +++ b/ent/src/yb/master/catalog_entity_info.h @@ -29,6 +29,19 @@ struct PersistentCDCStreamInfo : public Persistent options() const { return pb.options(); } diff --git a/ent/src/yb/master/catalog_manager.h b/ent/src/yb/master/catalog_manager.h index 6f6bef6d3756..7ce23c5f820d 100644 --- a/ent/src/yb/master/catalog_manager.h +++ b/ent/src/yb/master/catalog_manager.h @@ -135,6 +135,12 @@ class CatalogManager : public yb::master::CatalogManager { GetUniverseReplicationResponsePB* resp, rpc::RpcContext* rpc); + // Find all the CDC streams that have been marked as DELETED. + CHECKED_STATUS FindCDCStreamsMarkedAsDeleting(std::vector>* streams); + + // Delete specified CDC streams. + CHECKED_STATUS CleanUpDeletedCDCStreams(const std::vector>& streams); + private: friend class SnapshotLoader; friend class ClusterLoadBalancer; @@ -196,8 +202,8 @@ class CatalogManager : public yb::master::CatalogManager { // Return all CDC streams. void GetAllCDCStreams(std::vector>* streams); - // Delete specified CDC streams. - CHECKED_STATUS DeleteCDCStreams(const std::vector>& streams); + // Mark specified CDC streams as DELETING so they can be removed later. + CHECKED_STATUS MarkCDCStreamsAsDeleting(const std::vector>& streams); // Find CDC streams for a table. std::vector> FindCDCStreamsForTable(const TableId& table_id); @@ -258,6 +264,9 @@ class CatalogManager : public yb::master::CatalogManager { std::unordered_map should_send_consumer_registry_ GUARDED_BY(should_send_consumer_registry_mutex_); + // YBClient used to modify the cdc_state table from the master. + std::unique_ptr cdc_ybclient_; + DISALLOW_COPY_AND_ASSIGN(CatalogManager); }; diff --git a/ent/src/yb/master/catalog_manager_ent.cc b/ent/src/yb/master/catalog_manager_ent.cc index 244871c5d7c5..f7862ee97c79 100644 --- a/ent/src/yb/master/catalog_manager_ent.cc +++ b/ent/src/yb/master/catalog_manager_ent.cc @@ -20,16 +20,21 @@ #include "yb/cdc/cdc_service.h" #include "yb/client/schema.h" +#include "yb/client/session.h" #include "yb/client/table.h" +#include "yb/client/table_handle.h" #include "yb/client/table_alterer.h" +#include "yb/client/yb_op.h" #include "yb/common/common.pb.h" #include "yb/gutil/bind.h" +#include "yb/gutil/strings/join.h" #include "yb/gutil/strings/substitute.h" #include "yb/master/master_defaults.h" #include "yb/master/master_util.h" #include "yb/master/sys_catalog.h" #include "yb/master/sys_catalog-internal.h" #include "yb/master/async_snapshot_tasks.h" +#include "yb/master/async_rpc_tasks.h" #include "yb/master/encryption_manager.h" #include "yb/tserver/backup.proxy.h" #include "yb/util/cast.h" @@ -107,7 +112,10 @@ class CDCStreamLoader : public Visitor { DCHECK(!ContainsKey(catalog_manager_->cdc_stream_map_, stream_id)) << "CDC stream already exists: " << stream_id; - if (!ContainsKey(*catalog_manager_->table_ids_map_, metadata.table_id())) { + scoped_refptr table = + FindPtrOrNull(*catalog_manager_->table_ids_map_, metadata.table_id()); + + if (!table) { LOG(ERROR) << "Invalid table ID " << metadata.table_id() << " for stream " << stream_id; // TODO (#2059): Potentially signals a race condition that table got deleted while stream was // being created. @@ -120,6 +128,12 @@ class CDCStreamLoader : public Visitor { auto l = stream->LockForWrite(); l->mutable_data()->pb.CopyFrom(metadata); + // If the table has been deleted, then mark this stream as DELETING so it can be deleted by the + // catalog manager background thread. + if (table->LockForRead()->data().is_deleting() && !l->data().is_deleting()) { + l->mutable_data()->pb.set_state(SysCDCStreamEntryPB::DELETING); + } + // Add the CDC stream to the CDC stream map. catalog_manager_->cdc_stream_map_[stream->id()] = stream; @@ -1327,6 +1341,19 @@ Status CatalogManager::IsCdcStateTableCreated(IsCreateTableDoneResponsePB* resp) return IsCreateTableDone(&req, resp); } +// Helper class to print a vector of CDCStreamInfo pointers. +namespace { + template + std::string JoinStreamsCSVLine(std::vector cdc_streams) { + std::vector cdc_stream_ids; + for (const auto& cdc_stream : cdc_streams) { + cdc_stream_ids.push_back(cdc_stream->id()); + } + return JoinCSVLine(cdc_stream_ids); + } +} // namespace + + Status CatalogManager::DeleteCDCStreamsForTable(const TableId& table_id) { return DeleteCDCStreamsForTables({table_id}); } @@ -1348,7 +1375,9 @@ Status CatalogManager::DeleteCDCStreamsForTables(const vector& table_id return Status::OK(); } - return DeleteCDCStreams(streams); + // Do not delete them here, just mark them as DELETING and the catalog manager background thread + // will handle the deletion. + return MarkCDCStreamsAsDeleting(streams); } std::vector> CatalogManager::FindCDCStreamsForTable( @@ -1359,7 +1388,7 @@ std::vector> CatalogManager::FindCDCStreamsForTable for (const auto& entry : cdc_stream_map_) { auto ltm = entry.second->LockForRead(); - if (ltm->data().table_id() == table_id) { + if (ltm->data().table_id() == table_id && !ltm->data().started_deleting()) { streams.push_back(entry.second); } } @@ -1371,7 +1400,9 @@ void CatalogManager::GetAllCDCStreams(std::vector>* streams->reserve(cdc_stream_map_.size()); std::shared_lock l(lock_); for (const CDCStreamInfoMap::value_type& e : cdc_stream_map_) { - streams->push_back(e.second); + if (!e.second->LockForRead()->data().is_deleting()) { + streams->push_back(e.second); + } } } @@ -1463,24 +1494,22 @@ Status CatalogManager::DeleteCDCStream(const DeleteCDCStreamRequestPB* req, } std::vector> streams; - std::string streams_str; { std::shared_lock l(lock_); for (const auto& stream_id : req->stream_id()) { auto stream = FindPtrOrNull(cdc_stream_map_, stream_id); - if (stream == nullptr) { + + if (stream == nullptr || stream->LockForRead()->data().is_deleting()) { Status s = STATUS(NotFound, "CDC stream does not exist", req->DebugString()); return SetupError(resp->mutable_error(), MasterErrorPB::OBJECT_NOT_FOUND, s); } streams.push_back(stream); - if (!streams_str.empty()) { - streams_str += ", "; - } - streams_str += stream->ToString(); } } - Status s = DeleteCDCStreams(streams); + // Do not delete them here, just mark them as DELETING and the catalog manager background thread + // will handle the deletion. + Status s = MarkCDCStreamsAsDeleting(streams); if (!s.ok()) { if (s.IsIllegalState()) { PANIC_RPC(rpc, s.message().ToString()); @@ -1488,26 +1517,161 @@ Status CatalogManager::DeleteCDCStream(const DeleteCDCStreamRequestPB* req, return CheckIfNoLongerLeaderAndSetupError(s, resp); } - LOG(INFO) << "Successfully deleted CDC streams " << streams_str + LOG(INFO) << "Successfully deleted CDC streams " << JoinStreamsCSVLine(streams) << " per request from " << RequestorString(rpc); return Status::OK(); } -Status CatalogManager::DeleteCDCStreams(const std::vector>& streams) { +Status CatalogManager::MarkCDCStreamsAsDeleting( + const std::vector>& streams) { std::vector> locks; + std::vector streams_to_mark; locks.reserve(streams.size()); for (auto& stream : streams) { - locks.push_back(stream->LockForWrite()); + auto l = stream->LockForWrite(); + l->mutable_data()->pb.set_state(SysCDCStreamEntryPB::DELETING); + locks.push_back(std::move(l)); + streams_to_mark.push_back(stream.get()); } + Status s = sys_catalog_->UpdateItems(streams_to_mark, leader_ready_term_); + if (!s.ok()) { + // The mutation will be aborted when 'l' exits the scope on early return. + s = s.CloneAndPrepend(Substitute("An error occurred while updating sys tables: $0", + s.ToString())); + LOG(WARNING) << s.ToString(); + return s; + } + LOG(INFO) << "Successfully marked streams " << JoinStreamsCSVLine(streams_to_mark) + << " as DELETING in sys catalog"; + for (auto& lock : locks) { + lock->Commit(); + } + return Status::OK(); +} + +Status CatalogManager::FindCDCStreamsMarkedAsDeleting( + std::vector>* streams) { + TRACE("Acquired catalog manager lock"); + std::shared_lock l(lock_); + for (const CDCStreamInfoMap::value_type& entry : cdc_stream_map_) { + auto ltm = entry.second->LockForRead(); + if (ltm->data().is_deleting()) { + LOG(INFO) << "Stream " << entry.second->id() << " was marked as DELETING"; + streams->push_back(entry.second); + } + } + return Status::OK(); +} - std::vector items; - items.reserve(streams.size()); +Status CatalogManager::CleanUpDeletedCDCStreams( + const std::vector>& streams) { + if (!cdc_ybclient_) { + // First. For each deleted stream, delete the cdc state rows. + std::vector addrs; + for (auto const& master_address : *master_->opts().GetMasterAddresses()) { + for (auto const& host_port : master_address) { + addrs.push_back(host_port.ToString()); + } + } + if (addrs.empty()) { + YB_LOG_EVERY_N_SECS(ERROR, 30) << "Unable to get master addresses for yb client"; + return STATUS(InternalError, "Unable to get master address for yb client"); + } + LOG(INFO) << "Using master addresses " << JoinCSVLine(addrs) << " to create cdc yb client"; + auto result = yb::client::YBClientBuilder() + .master_server_addrs(addrs) + .Build(); + + std::unique_ptr client; + if (!result.ok()) { + YB_LOG_EVERY_N_SECS(ERROR, 30) << "Unable to create client: " << result.status(); + return result.status().CloneAndPrepend("Unable to create yb client"); + } else { + cdc_ybclient_ = std::move(*result); + } + } + + // Delete all the entries in cdc_state table that contain all the deleted cdc streams. + client::TableHandle cdc_table; + const client::YBTableName cdc_state_table_name( + YQL_DATABASE_CQL, master::kSystemNamespaceName, master::kCdcStateTableName); + Status s = cdc_table.Open(cdc_state_table_name, cdc_ybclient_.get()); + if (!s.ok()) { + LOG(WARNING) << "Unable to open table " << master::kCdcStateTableName + << " to delete stream ids entries: " << s; + return s.CloneAndPrepend("Unable to open cdc_state table"); + } + + std::shared_ptr session = cdc_ybclient_->NewSession(); + std::vector>> stream_ops; + std::set failed_streams; for (const auto& stream : streams) { - items.push_back(stream.get()); + LOG(INFO) << "Deleting rows for stream " << stream->id(); + vector> tablets; + scoped_refptr table; + { + TRACE("Acquired catalog manager lock"); + std::shared_lock l(lock_); + table = FindPtrOrNull(*table_ids_map_, stream->table_id()); + } + // GetAllTablets locks lock_ in shared mode. + table->GetAllTablets(&tablets); + + for (const auto& tablet : tablets) { + const auto delete_op = cdc_table.NewDeleteOp(); + auto* delete_req = delete_op->mutable_request(); + + QLAddStringHashValue(delete_req, tablet->tablet_id()); + QLAddStringRangeValue(delete_req, stream->id()); + s = session->Apply(delete_op); + stream_ops.push_back(std::make_pair(stream->id(), delete_op)); + LOG(INFO) << "Deleting stream " << stream->id() << " for tablet " << tablet->tablet_id() + << " with request " << delete_req->ShortDebugString(); + if (!s.ok()) { + LOG(WARNING) << "Unable to delete stream with id " + << stream->id() << " from table " << master::kCdcStateTableName + << " for tablet " << tablet->tablet_id() + << ". Status: " << s + << ", Response: " << delete_op->response().ShortDebugString(); + } + } + } + // Flush all the delete operations. + s = session->Flush(); + if (!s.ok()) { + LOG(ERROR) << "Unable to flush operations to delete cdc streams: " << s; + return s.CloneAndPrepend("Error deleting cdc stream rows from cdc_state table"); + } + + for (const auto& e : stream_ops) { + if (!e.second->succeeded()) { + LOG(WARNING) << "Error deleting cdc_state row with tablet id " + << e.second->request().hashed_column_values(0).value().string_value() + << " and stream id " + << e.second->request().range_column_values(0).value().string_value() + << ": " << e.second->response().status(); + failed_streams.insert(e.first); + } + } + + // TODO: Read cdc_state table and verify that there are not rows with the specified cdc stream + // and keep those in the map in the DELETED state to retry later. + + std::vector> locks; + locks.reserve(streams.size() - failed_streams.size()); + std::vector streams_to_delete; + streams_to_delete.reserve(streams.size() - failed_streams.size()); + + // Delete from sys catalog only those streams that were successfully delete from cdc_state. + for (auto& stream : streams) { + if (failed_streams.find(stream->id()) == failed_streams.end()) { + locks.push_back(stream->LockForWrite()); + streams_to_delete.push_back(stream.get()); + } } - Status s = sys_catalog_->DeleteItems(items, leader_ready_term_); + s = sys_catalog_->DeleteItems(streams_to_delete, leader_ready_term_); if (!s.ok()) { // The mutation will be aborted when 'l' exits the scope on early return. s = s.CloneAndPrepend(Substitute("An error occurred while updating sys-catalog: $0", @@ -1515,17 +1679,21 @@ Status CatalogManager::DeleteCDCStreams(const std::vector l(lock_); - for (const auto& stream : streams) { + for (const auto& stream : streams_to_delete) { if (cdc_stream_map_.erase(stream->id()) < 1) { return STATUS(IllegalState, "Could not remove CDC stream from map", stream->id()); } } } + LOG(INFO) << "Successfully deleted streams " << JoinStreamsCSVLine(streams_to_delete) + << " from stream map"; for (auto& lock : locks) { lock->Commit(); @@ -1549,12 +1717,12 @@ Status CatalogManager::GetCDCStream(const GetCDCStreamRequestPB* req, scoped_refptr stream; { std::shared_lock l(lock_); - stream = FindPtrOrNull(cdc_stream_map_, req->stream_id()); - if (stream == nullptr) { - Status s = STATUS(NotFound, "Could not find CDC stream", req->DebugString()); - return SetupError(resp->mutable_error(), MasterErrorPB::OBJECT_NOT_FOUND, s); - } + } + + if (stream == nullptr || stream->LockForRead()->data().is_deleting()) { + Status s = STATUS(NotFound, "Could not find CDC stream", req->DebugString()); + return SetupError(resp->mutable_error(), MasterErrorPB::OBJECT_NOT_FOUND, s); } auto stream_lock = stream->LockForRead(); @@ -1592,8 +1760,8 @@ Status CatalogManager::ListCDCStreams(const ListCDCStreamsRequestPB* req, for (const CDCStreamInfoMap::value_type& entry : cdc_stream_map_) { auto ltm = entry.second->LockForRead(); - if (filter_table && table->id() != ltm->data().table_id()) { - continue; // Skip streams from other tables. + if ((filter_table && table->id() != ltm->data().table_id()) || ltm->data().is_deleting()) { + continue; // Skip deleting/deleted streams and streams from other tables. } CDCStreamInfoPB* stream = resp->add_streams(); @@ -1606,7 +1774,8 @@ Status CatalogManager::ListCDCStreams(const ListCDCStreamsRequestPB* req, bool CatalogManager::CDCStreamExistsUnlocked(const CDCStreamId& stream_id) { DCHECK(lock_.is_locked()); - if (FindPtrOrNull(cdc_stream_map_, stream_id) == nullptr) { + scoped_refptr stream = FindPtrOrNull(cdc_stream_map_, stream_id); + if (stream == nullptr || stream->LockForRead()->data().is_deleting()) { return false; } return true; diff --git a/src/yb/master/catalog_manager_bg_tasks.cc b/src/yb/master/catalog_manager_bg_tasks.cc index ae891b3a7b97..706a50968c4c 100644 --- a/src/yb/master/catalog_manager_bg_tasks.cc +++ b/src/yb/master/catalog_manager_bg_tasks.cc @@ -145,6 +145,11 @@ void CatalogManagerBgTasks::Run() { if (!to_delete.empty() || catalog_manager_->AreTablesDeleting()) { catalog_manager_->CleanUpDeletedTables(); } + std::vector> streams; + auto s = catalog_manager_->FindCDCStreamsMarkedAsDeleting(&streams); + if (s.ok() && !streams.empty()) { + s = catalog_manager_->CleanUpDeletedCDCStreams(streams); + } } WARN_NOT_OK(catalog_manager_->encryption_manager_-> GetUniverseKeyRegistry(&catalog_manager_->master_->proxy_cache()), diff --git a/src/yb/master/catalog_manager_bg_tasks.h b/src/yb/master/catalog_manager_bg_tasks.h index 7f82c4c4764e..451d302b2583 100644 --- a/src/yb/master/catalog_manager_bg_tasks.h +++ b/src/yb/master/catalog_manager_bg_tasks.h @@ -45,6 +45,10 @@ namespace master { class CatalogManager; +namespace enterprise { +class CatalogManager; +} + class CatalogManagerBgTasks final { public: explicit CatalogManagerBgTasks(CatalogManager *catalog_manager) @@ -52,7 +56,7 @@ class CatalogManagerBgTasks final { pending_updates_(false), cond_(&lock_), thread_(nullptr), - catalog_manager_(catalog_manager) { + catalog_manager_(down_cast(catalog_manager)) { } ~CatalogManagerBgTasks() {} @@ -73,7 +77,7 @@ class CatalogManagerBgTasks final { mutable Mutex lock_; ConditionVariable cond_; scoped_refptr thread_; - CatalogManager *catalog_manager_; + enterprise::CatalogManager *catalog_manager_; }; } // namespace master diff --git a/src/yb/master/master.proto b/src/yb/master/master.proto index 2b439df0347d..233877bbc3eb 100644 --- a/src/yb/master/master.proto +++ b/src/yb/master/master.proto @@ -407,8 +407,15 @@ message SysRedisConfigEntryPB { // The data part of a SysRowEntry in the sys.catalog table for a CDC stream. message SysCDCStreamEntryPB { + enum State { + ACTIVE = 0; + DELETING = 1; + // Currently DELETED is not being used because we delete streams entries from sys catalog. + DELETED = 2; + } optional string table_id = 1; repeated CDCStreamOptionsPB options = 2; + optional State state = 3 [default = ACTIVE]; } // The data part of a SysRowEntry in the sys.catalog table for a universe replication record.