From d1bf6ed34195691f03c2d33f3b8fc304f3852ab4 Mon Sep 17 00:00:00 2001 From: Hector Cuellar Date: Tue, 11 Feb 2020 17:37:03 -0800 Subject: [PATCH] #2529 [CDC] Clean up cdc_state rows for deleted streams Summary: Clean up cdc_state rows for deleted streams. Also, modify catalog manager methods `DeleteCDCStreamsForTables` and `DeleteCDCStream` so that they don't remove the stream entries from the sys catalog and the streams map. Instead, these methods just mark the streams as DELETING, and the catalog manager background thread is responsible for cleaning up these entries. The cleanup method `CleanUpDeletedCDCStreams` is responsible for deleting the stream entries from the `cdc_state` table before removing them from the sys catalog and the streams map. Test Plan: Added more code to TestDeleteCDCStream to verify that the stream entries from cdc_state table are deleted once the stream is deleted. Reviewers: rahuldesirazu, nicolas, bogdan, neha Reviewed By: neha Subscribers: bogdan, ybase Differential Revision: https://phabricator.dev.yugabyte.com/D7376 --- ent/src/yb/cdc/cdc_service.cc | 6 +- .../integration-tests/cdc_service-int-test.cc | 101 +++++++- ent/src/yb/master/async_snapshot_tasks.h | 2 +- ent/src/yb/master/catalog_entity_info.h | 13 + ent/src/yb/master/catalog_manager.h | 13 +- ent/src/yb/master/catalog_manager_ent.cc | 223 +++++++++++++++--- src/yb/master/catalog_manager_bg_tasks.cc | 5 + src/yb/master/catalog_manager_bg_tasks.h | 8 +- src/yb/master/master.proto | 7 + 9 files changed, 338 insertions(+), 40 deletions(-) diff --git a/ent/src/yb/cdc/cdc_service.cc b/ent/src/yb/cdc/cdc_service.cc index d78627bcf465..d46e4114130b 100644 --- a/ent/src/yb/cdc/cdc_service.cc +++ b/ent/src/yb/cdc/cdc_service.cc @@ -205,6 +205,8 @@ void CDCServiceImpl::DeleteCDCStream(const DeleteCDCStreamRequestPB* req, return; } + LOG(INFO) << "Received DeleteCDCStream request " << req->ShortDebugString(); + RPC_CHECK_AND_RETURN_ERROR(req->stream_id_size() > 0, STATUS(InvalidArgument, "Stream ID is required to delete CDC stream"), 
resp->mutable_error(), @@ -779,8 +781,8 @@ Result CDCServiceImpl::GetLastCheckpoint( auto cond = req->mutable_where_expr()->mutable_condition(); cond->set_op(QLOperator::QL_OP_AND); - QLAddStringCondition(cond, Schema::first_column_id() + master::kCdcStreamIdIdx, - QL_OP_EQUAL, producer_tablet.stream_id); + QLAddStringCondition(cond, Schema::first_column_id() + master::kCdcStreamIdIdx, QL_OP_EQUAL, + producer_tablet.stream_id); table.AddColumns({master::kCdcCheckpoint}, req); RETURN_NOT_OK(session->ApplyAndFlush(op)); diff --git a/ent/src/yb/integration-tests/cdc_service-int-test.cc b/ent/src/yb/integration-tests/cdc_service-int-test.cc index 217cba02a944..346b91556ea4 100644 --- a/ent/src/yb/integration-tests/cdc_service-int-test.cc +++ b/ent/src/yb/integration-tests/cdc_service-int-test.cc @@ -153,7 +153,7 @@ void AssertChangeRecords(const google::protobuf::RepeatedPtrFieldcolumn(master::kCdcCheckpointIdx).string_value(); - size_t split = checkpoint.find("."); - auto index = boost::lexical_cast(checkpoint.substr(split + 1, string::npos)); + auto result = OpId::FromString(checkpoint); + ASSERT_OK(result); + OpId op_id = *result; // Verify that op id index has been advanced and is not 0. 
- ASSERT_GT(index, 0); + ASSERT_GT(op_id.index, 0); +} + +void VerifyCdcStateMatches(client::YBClient* client, + const CDCStreamId& stream_id, + const TabletId& tablet_id, + uint64_t term, + uint64_t index) { + client::TableHandle table; + client::YBTableName cdc_state_table( + YQL_DATABASE_CQL, master::kSystemNamespaceName, master::kCdcStateTableName); + ASSERT_OK(table.Open(cdc_state_table, client)); + const auto op = table.NewReadOp(); + auto* const req = op->mutable_request(); + QLAddStringHashValue(req, tablet_id); + auto cond = req->mutable_where_expr()->mutable_condition(); + cond->set_op(QLOperator::QL_OP_AND); + QLAddStringCondition(cond, Schema::first_column_id() + master::kCdcStreamIdIdx, QL_OP_EQUAL, + stream_id); + table.AddColumns({master::kCdcCheckpoint}, req); + + auto session = client->NewSession(); + ASSERT_OK(session->ApplyAndFlush(op)); + + LOG(INFO) << strings::Substitute("Verifying tablet: $0, stream: $1, op_id: $2", + tablet_id, stream_id, OpId(term, index).ToString()); + + auto row_block = ql::RowsResult(op.get()).GetRowBlock(); + ASSERT_EQ(row_block->row_count(), 1); + + string checkpoint = row_block->row(0).column(0).string_value(); + auto result = OpId::FromString(checkpoint); + ASSERT_OK(result); + OpId op_id = *result; + + ASSERT_EQ(op_id.term, term); + ASSERT_EQ(op_id.index, index); +} + +void VerifyStreamDeletedFromCdcState(client::YBClient* client, + const CDCStreamId& stream_id, + const TabletId& tablet_id, + int timeout_secs) { + client::TableHandle table; + const client::YBTableName cdc_state_table( + YQL_DATABASE_CQL, master::kSystemNamespaceName, master::kCdcStateTableName); + ASSERT_OK(table.Open(cdc_state_table, client)); + + const auto op = table.NewReadOp(); + auto* const req = op->mutable_request(); + QLAddStringHashValue(req, tablet_id); + + auto cond = req->mutable_where_expr()->mutable_condition(); + cond->set_op(QLOperator::QL_OP_AND); + QLAddStringCondition(cond, Schema::first_column_id() + master::kCdcStreamIdIdx, 
QL_OP_EQUAL, + stream_id); + + table.AddColumns({master::kCdcCheckpoint}, req); + auto session = client->NewSession(); + + // The deletion of cdc_state rows for the specified stream happens in an asynchronous thread, + // so even if the request has returned, it doesn't mean that the rows have been deleted yet. + ASSERT_OK(WaitFor([&](){ + EXPECT_OK(session->ApplyAndFlush(op)); + auto row_block = ql::RowsResult(op.get()).GetRowBlock(); + if (row_block->row_count() == 0) { + return true; + } + return false; + }, MonoDelta::FromSeconds(timeout_secs), "Stream rows in cdc_state have been deleted.")); } void CDCServiceTest::GetTablets(std::vector* tablet_ids) { @@ -247,6 +317,7 @@ TEST_F(CDCServiceTest, TestCreateCDCStreamWithDefaultRententionTime) { } TEST_F(CDCServiceTest, TestDeleteCDCStream) { + FLAGS_cdc_state_checkpoint_update_interval_ms = 0; CDCStreamId stream_id; CreateCDCStream(cdc_proxy_, table_.table()->id(), &stream_id); @@ -255,6 +326,19 @@ TEST_F(CDCServiceTest, TestDeleteCDCStream) { ASSERT_OK(client_->GetCDCStream(stream_id, &table_id, &options)); ASSERT_EQ(table_id, table_.table()->id()); + std::vector tablet_ids; + std::vector ranges; + ASSERT_OK(client_->GetTablets(table_.table()->name(), 0 /* max_tablets */, &tablet_ids, &ranges)); + + bool get_changes_error = false; + // Send GetChanges requests so an entry for each tablet can be added to the cdc_state table. + // Term and index don't matter. + for (const auto& tablet_id : tablet_ids) { + GetChanges(tablet_id, stream_id, 1, 1, &get_changes_error); + ASSERT_FALSE(get_changes_error); + VerifyCdcStateMatches(client_.get(), stream_id, tablet_id, 1, 1); + } + ASSERT_OK(client_->DeleteCDCStream(stream_id)); // Check that the stream no longer exists. 
@@ -262,6 +346,10 @@ TEST_F(CDCServiceTest, TestDeleteCDCStream) { options.clear(); Status s = client_->GetCDCStream(stream_id, &table_id, &options); ASSERT_TRUE(s.IsNotFound()); + + for (const auto& tablet_id : tablet_ids) { + VerifyStreamDeletedFromCdcState(client_.get(), stream_id, tablet_id, 20); + } } TEST_F(CDCServiceTest, TestGetChanges) { @@ -800,7 +888,7 @@ TEST_F(CDCServiceTest, TestCheckpointUpdate) { } // Verify that cdc_state table has correct checkpoint. - ASSERT_NO_FATALS(VerifyCdcState(client_.get())); + ASSERT_NO_FATALS(VerifyCdcStateNotEmpty(client_.get())); // Call GetChanges again but without any from checkpoint. change_req.Clear(); @@ -819,7 +907,7 @@ TEST_F(CDCServiceTest, TestCheckpointUpdate) { } // Verify that cdc_state table's checkpoint is unaffected. - ASSERT_NO_FATALS(VerifyCdcState(client_.get())); + ASSERT_NO_FATALS(VerifyCdcStateNotEmpty(client_.get())); } namespace { @@ -1258,6 +1346,7 @@ void CDCServiceTestThreeServers::GetFirstTabletIdAndLeaderPeer(TabletId* tablet_ } } } + now = MonoTime::Now(); } } diff --git a/ent/src/yb/master/async_snapshot_tasks.h b/ent/src/yb/master/async_snapshot_tasks.h index 666f79aa278e..534191efb439 100644 --- a/ent/src/yb/master/async_snapshot_tasks.h +++ b/ent/src/yb/master/async_snapshot_tasks.h @@ -23,7 +23,7 @@ namespace master { // Keeps retrying until we get an "ok" response. 
class AsyncTabletSnapshotOp : public enterprise::RetryingTSRpcTask { public: - AsyncTabletSnapshotOp(Master *master, + AsyncTabletSnapshotOp(Master* master, ThreadPool* callback_pool, const scoped_refptr& tablet, const std::string& snapshot_id, diff --git a/ent/src/yb/master/catalog_entity_info.h b/ent/src/yb/master/catalog_entity_info.h index 4c266f2e7f21..ca3d58210276 100644 --- a/ent/src/yb/master/catalog_entity_info.h +++ b/ent/src/yb/master/catalog_entity_info.h @@ -29,6 +29,19 @@ struct PersistentCDCStreamInfo : public Persistent options() const { return pb.options(); } diff --git a/ent/src/yb/master/catalog_manager.h b/ent/src/yb/master/catalog_manager.h index 6f6bef6d3756..7ce23c5f820d 100644 --- a/ent/src/yb/master/catalog_manager.h +++ b/ent/src/yb/master/catalog_manager.h @@ -135,6 +135,12 @@ class CatalogManager : public yb::master::CatalogManager { GetUniverseReplicationResponsePB* resp, rpc::RpcContext* rpc); + // Find all the CDC streams that have been marked as DELETING. + CHECKED_STATUS FindCDCStreamsMarkedAsDeleting(std::vector>* streams); + + // Delete the cdc_state rows and the sys catalog entries of the specified CDC streams. + CHECKED_STATUS CleanUpDeletedCDCStreams(const std::vector>& streams); + private: friend class SnapshotLoader; friend class ClusterLoadBalancer; @@ -196,8 +202,8 @@ class CatalogManager : public yb::master::CatalogManager { // Return all CDC streams. void GetAllCDCStreams(std::vector>* streams); - // Delete specified CDC streams. - CHECKED_STATUS DeleteCDCStreams(const std::vector>& streams); + // Mark specified CDC streams as DELETING so they can be removed later. + CHECKED_STATUS MarkCDCStreamsAsDeleting(const std::vector>& streams); // Find CDC streams for a table. std::vector> FindCDCStreamsForTable(const TableId& table_id); @@ -258,6 +264,9 @@ class CatalogManager : public yb::master::CatalogManager { std::unordered_map should_send_consumer_registry_ GUARDED_BY(should_send_consumer_registry_mutex_); + // YBClient used to modify the cdc_state table from the master. 
+ std::unique_ptr cdc_ybclient_; + DISALLOW_COPY_AND_ASSIGN(CatalogManager); }; diff --git a/ent/src/yb/master/catalog_manager_ent.cc b/ent/src/yb/master/catalog_manager_ent.cc index 244871c5d7c5..f7862ee97c79 100644 --- a/ent/src/yb/master/catalog_manager_ent.cc +++ b/ent/src/yb/master/catalog_manager_ent.cc @@ -20,16 +20,21 @@ #include "yb/cdc/cdc_service.h" #include "yb/client/schema.h" +#include "yb/client/session.h" #include "yb/client/table.h" +#include "yb/client/table_handle.h" #include "yb/client/table_alterer.h" +#include "yb/client/yb_op.h" #include "yb/common/common.pb.h" #include "yb/gutil/bind.h" +#include "yb/gutil/strings/join.h" #include "yb/gutil/strings/substitute.h" #include "yb/master/master_defaults.h" #include "yb/master/master_util.h" #include "yb/master/sys_catalog.h" #include "yb/master/sys_catalog-internal.h" #include "yb/master/async_snapshot_tasks.h" +#include "yb/master/async_rpc_tasks.h" #include "yb/master/encryption_manager.h" #include "yb/tserver/backup.proxy.h" #include "yb/util/cast.h" @@ -107,7 +112,10 @@ class CDCStreamLoader : public Visitor { DCHECK(!ContainsKey(catalog_manager_->cdc_stream_map_, stream_id)) << "CDC stream already exists: " << stream_id; - if (!ContainsKey(*catalog_manager_->table_ids_map_, metadata.table_id())) { + scoped_refptr table = + FindPtrOrNull(*catalog_manager_->table_ids_map_, metadata.table_id()); + + if (!table) { LOG(ERROR) << "Invalid table ID " << metadata.table_id() << " for stream " << stream_id; // TODO (#2059): Potentially signals a race condition that table got deleted while stream was // being created. @@ -120,6 +128,12 @@ class CDCStreamLoader : public Visitor { auto l = stream->LockForWrite(); l->mutable_data()->pb.CopyFrom(metadata); + // If the table has been deleted, then mark this stream as DELETING so it can be deleted by the + // catalog manager background thread. 
+ if (table->LockForRead()->data().is_deleting() && !l->data().is_deleting()) { + l->mutable_data()->pb.set_state(SysCDCStreamEntryPB::DELETING); + } + // Add the CDC stream to the CDC stream map. catalog_manager_->cdc_stream_map_[stream->id()] = stream; @@ -1327,6 +1341,19 @@ Status CatalogManager::IsCdcStateTableCreated(IsCreateTableDoneResponsePB* resp) return IsCreateTableDone(&req, resp); } +// Helper function that joins the ids of a vector of CDCStreamInfo pointers into a CSV line. +namespace { + template + std::string JoinStreamsCSVLine(std::vector cdc_streams) { + std::vector cdc_stream_ids; + for (const auto& cdc_stream : cdc_streams) { + cdc_stream_ids.push_back(cdc_stream->id()); + } + return JoinCSVLine(cdc_stream_ids); + } +} // namespace + + Status CatalogManager::DeleteCDCStreamsForTable(const TableId& table_id) { return DeleteCDCStreamsForTables({table_id}); } @@ -1348,7 +1375,9 @@ Status CatalogManager::DeleteCDCStreamsForTables(const vector& table_id return Status::OK(); } - return DeleteCDCStreams(streams); + // Do not delete them here, just mark them as DELETING and the catalog manager background thread + // will handle the deletion. 
+ return MarkCDCStreamsAsDeleting(streams); } std::vector> CatalogManager::FindCDCStreamsForTable( @@ -1359,7 +1388,7 @@ std::vector> CatalogManager::FindCDCStreamsForTable for (const auto& entry : cdc_stream_map_) { auto ltm = entry.second->LockForRead(); - if (ltm->data().table_id() == table_id) { + if (ltm->data().table_id() == table_id && !ltm->data().started_deleting()) { streams.push_back(entry.second); } } @@ -1371,7 +1400,9 @@ void CatalogManager::GetAllCDCStreams(std::vector>* streams->reserve(cdc_stream_map_.size()); std::shared_lock l(lock_); for (const CDCStreamInfoMap::value_type& e : cdc_stream_map_) { - streams->push_back(e.second); + if (!e.second->LockForRead()->data().is_deleting()) { + streams->push_back(e.second); + } } } @@ -1463,24 +1494,22 @@ Status CatalogManager::DeleteCDCStream(const DeleteCDCStreamRequestPB* req, } std::vector> streams; - std::string streams_str; { std::shared_lock l(lock_); for (const auto& stream_id : req->stream_id()) { auto stream = FindPtrOrNull(cdc_stream_map_, stream_id); - if (stream == nullptr) { + + if (stream == nullptr || stream->LockForRead()->data().is_deleting()) { Status s = STATUS(NotFound, "CDC stream does not exist", req->DebugString()); return SetupError(resp->mutable_error(), MasterErrorPB::OBJECT_NOT_FOUND, s); } streams.push_back(stream); - if (!streams_str.empty()) { - streams_str += ", "; - } - streams_str += stream->ToString(); } } - Status s = DeleteCDCStreams(streams); + // Do not delete them here, just mark them as DELETING and the catalog manager background thread + // will handle the deletion. 
+ Status s = MarkCDCStreamsAsDeleting(streams); if (!s.ok()) { if (s.IsIllegalState()) { PANIC_RPC(rpc, s.message().ToString()); @@ -1488,26 +1517,161 @@ Status CatalogManager::DeleteCDCStream(const DeleteCDCStreamRequestPB* req, return CheckIfNoLongerLeaderAndSetupError(s, resp); } - LOG(INFO) << "Successfully deleted CDC streams " << streams_str + LOG(INFO) << "Successfully deleted CDC streams " << JoinStreamsCSVLine(streams) << " per request from " << RequestorString(rpc); return Status::OK(); } -Status CatalogManager::DeleteCDCStreams(const std::vector>& streams) { +Status CatalogManager::MarkCDCStreamsAsDeleting( + const std::vector>& streams) { std::vector> locks; + std::vector streams_to_mark; locks.reserve(streams.size()); for (auto& stream : streams) { - locks.push_back(stream->LockForWrite()); + auto l = stream->LockForWrite(); + l->mutable_data()->pb.set_state(SysCDCStreamEntryPB::DELETING); + locks.push_back(std::move(l)); + streams_to_mark.push_back(stream.get()); } + Status s = sys_catalog_->UpdateItems(streams_to_mark, leader_ready_term_); + if (!s.ok()) { + // The mutation will be aborted when 'l' exits the scope on early return. 
+ s = s.CloneAndPrepend(Substitute("An error occurred while updating sys tables: $0", + s.ToString())); + LOG(WARNING) << s.ToString(); + return s; + } + LOG(INFO) << "Successfully marked streams " << JoinStreamsCSVLine(streams_to_mark) + << " as DELETING in sys catalog"; + for (auto& lock : locks) { + lock->Commit(); + } + return Status::OK(); +} + +Status CatalogManager::FindCDCStreamsMarkedAsDeleting( + std::vector>* streams) { + TRACE("Acquired catalog manager lock"); + std::shared_lock l(lock_); + for (const CDCStreamInfoMap::value_type& entry : cdc_stream_map_) { + auto ltm = entry.second->LockForRead(); + if (ltm->data().is_deleting()) { + LOG(INFO) << "Stream " << entry.second->id() << " was marked as DELETING"; + streams->push_back(entry.second); + } + } + return Status::OK(); +} - std::vector items; - items.reserve(streams.size()); +Status CatalogManager::CleanUpDeletedCDCStreams( + const std::vector>& streams) { + if (!cdc_ybclient_) { + // First. For each deleted stream, delete the cdc state rows. + std::vector addrs; + for (auto const& master_address : *master_->opts().GetMasterAddresses()) { + for (auto const& host_port : master_address) { + addrs.push_back(host_port.ToString()); + } + } + if (addrs.empty()) { + YB_LOG_EVERY_N_SECS(ERROR, 30) << "Unable to get master addresses for yb client"; + return STATUS(InternalError, "Unable to get master address for yb client"); + } + LOG(INFO) << "Using master addresses " << JoinCSVLine(addrs) << " to create cdc yb client"; + auto result = yb::client::YBClientBuilder() + .master_server_addrs(addrs) + .Build(); + + std::unique_ptr client; + if (!result.ok()) { + YB_LOG_EVERY_N_SECS(ERROR, 30) << "Unable to create client: " << result.status(); + return result.status().CloneAndPrepend("Unable to create yb client"); + } else { + cdc_ybclient_ = std::move(*result); + } + } + + // Delete all the entries in cdc_state table that contain all the deleted cdc streams. 
+ client::TableHandle cdc_table; + const client::YBTableName cdc_state_table_name( + YQL_DATABASE_CQL, master::kSystemNamespaceName, master::kCdcStateTableName); + Status s = cdc_table.Open(cdc_state_table_name, cdc_ybclient_.get()); + if (!s.ok()) { + LOG(WARNING) << "Unable to open table " << master::kCdcStateTableName + << " to delete stream ids entries: " << s; + return s.CloneAndPrepend("Unable to open cdc_state table"); + } + + std::shared_ptr session = cdc_ybclient_->NewSession(); + std::vector>> stream_ops; + std::set failed_streams; for (const auto& stream : streams) { - items.push_back(stream.get()); + LOG(INFO) << "Deleting rows for stream " << stream->id(); + vector> tablets; + scoped_refptr table; + { + TRACE("Acquired catalog manager lock"); + std::shared_lock l(lock_); + table = FindPtrOrNull(*table_ids_map_, stream->table_id()); + } + // GetAllTablets locks lock_ in shared mode. + table->GetAllTablets(&tablets); + + for (const auto& tablet : tablets) { + const auto delete_op = cdc_table.NewDeleteOp(); + auto* delete_req = delete_op->mutable_request(); + + QLAddStringHashValue(delete_req, tablet->tablet_id()); + QLAddStringRangeValue(delete_req, stream->id()); + s = session->Apply(delete_op); + stream_ops.push_back(std::make_pair(stream->id(), delete_op)); + LOG(INFO) << "Deleting stream " << stream->id() << " for tablet " << tablet->tablet_id() + << " with request " << delete_req->ShortDebugString(); + if (!s.ok()) { + LOG(WARNING) << "Unable to delete stream with id " + << stream->id() << " from table " << master::kCdcStateTableName + << " for tablet " << tablet->tablet_id() + << ". Status: " << s + << ", Response: " << delete_op->response().ShortDebugString(); + } + } + } + // Flush all the delete operations. 
+ s = session->Flush(); + if (!s.ok()) { + LOG(ERROR) << "Unable to flush operations to delete cdc streams: " << s; + return s.CloneAndPrepend("Error deleting cdc stream rows from cdc_state table"); + } + + for (const auto& e : stream_ops) { + if (!e.second->succeeded()) { + LOG(WARNING) << "Error deleting cdc_state row with tablet id " + << e.second->request().hashed_column_values(0).value().string_value() + << " and stream id " + << e.second->request().range_column_values(0).value().string_value() + << ": " << e.second->response().status(); + failed_streams.insert(e.first); + } + } + + // TODO: Read cdc_state table and verify that there are no rows with the specified cdc stream + // and keep those in the map in the DELETED state to retry later. + + std::vector> locks; + locks.reserve(streams.size() - failed_streams.size()); + std::vector streams_to_delete; + streams_to_delete.reserve(streams.size() - failed_streams.size()); + + // Delete from sys catalog only those streams that were successfully deleted from cdc_state. + for (auto& stream : streams) { + if (failed_streams.find(stream->id()) == failed_streams.end()) { + locks.push_back(stream->LockForWrite()); + streams_to_delete.push_back(stream.get()); + } } - Status s = sys_catalog_->DeleteItems(items, leader_ready_term_); + s = sys_catalog_->DeleteItems(streams_to_delete, leader_ready_term_); if (!s.ok()) { // The mutation will be aborted when 'l' exits the scope on early return. 
s = s.CloneAndPrepend(Substitute("An error occurred while updating sys-catalog: $0", @@ -1515,17 +1679,21 @@ Status CatalogManager::DeleteCDCStreams(const std::vector l(lock_); - for (const auto& stream : streams) { + for (const auto& stream : streams_to_delete) { if (cdc_stream_map_.erase(stream->id()) < 1) { return STATUS(IllegalState, "Could not remove CDC stream from map", stream->id()); } } } + LOG(INFO) << "Successfully deleted streams " << JoinStreamsCSVLine(streams_to_delete) + << " from stream map"; for (auto& lock : locks) { lock->Commit(); @@ -1549,12 +1717,12 @@ Status CatalogManager::GetCDCStream(const GetCDCStreamRequestPB* req, scoped_refptr stream; { std::shared_lock l(lock_); - stream = FindPtrOrNull(cdc_stream_map_, req->stream_id()); - if (stream == nullptr) { - Status s = STATUS(NotFound, "Could not find CDC stream", req->DebugString()); - return SetupError(resp->mutable_error(), MasterErrorPB::OBJECT_NOT_FOUND, s); - } + } + + if (stream == nullptr || stream->LockForRead()->data().is_deleting()) { + Status s = STATUS(NotFound, "Could not find CDC stream", req->DebugString()); + return SetupError(resp->mutable_error(), MasterErrorPB::OBJECT_NOT_FOUND, s); } auto stream_lock = stream->LockForRead(); @@ -1592,8 +1760,8 @@ Status CatalogManager::ListCDCStreams(const ListCDCStreamsRequestPB* req, for (const CDCStreamInfoMap::value_type& entry : cdc_stream_map_) { auto ltm = entry.second->LockForRead(); - if (filter_table && table->id() != ltm->data().table_id()) { - continue; // Skip streams from other tables. + if ((filter_table && table->id() != ltm->data().table_id()) || ltm->data().is_deleting()) { + continue; // Skip deleting/deleted streams and streams from other tables. 
} CDCStreamInfoPB* stream = resp->add_streams(); @@ -1606,7 +1774,8 @@ Status CatalogManager::ListCDCStreams(const ListCDCStreamsRequestPB* req, bool CatalogManager::CDCStreamExistsUnlocked(const CDCStreamId& stream_id) { DCHECK(lock_.is_locked()); - if (FindPtrOrNull(cdc_stream_map_, stream_id) == nullptr) { + scoped_refptr stream = FindPtrOrNull(cdc_stream_map_, stream_id); + if (stream == nullptr || stream->LockForRead()->data().is_deleting()) { return false; } return true; diff --git a/src/yb/master/catalog_manager_bg_tasks.cc b/src/yb/master/catalog_manager_bg_tasks.cc index ae891b3a7b97..706a50968c4c 100644 --- a/src/yb/master/catalog_manager_bg_tasks.cc +++ b/src/yb/master/catalog_manager_bg_tasks.cc @@ -145,6 +145,11 @@ void CatalogManagerBgTasks::Run() { if (!to_delete.empty() || catalog_manager_->AreTablesDeleting()) { catalog_manager_->CleanUpDeletedTables(); } + std::vector> streams; + auto s = catalog_manager_->FindCDCStreamsMarkedAsDeleting(&streams); + if (s.ok() && !streams.empty()) { + s = catalog_manager_->CleanUpDeletedCDCStreams(streams); + } } WARN_NOT_OK(catalog_manager_->encryption_manager_-> GetUniverseKeyRegistry(&catalog_manager_->master_->proxy_cache()), diff --git a/src/yb/master/catalog_manager_bg_tasks.h b/src/yb/master/catalog_manager_bg_tasks.h index 7f82c4c4764e..451d302b2583 100644 --- a/src/yb/master/catalog_manager_bg_tasks.h +++ b/src/yb/master/catalog_manager_bg_tasks.h @@ -45,6 +45,10 @@ namespace master { class CatalogManager; +namespace enterprise { +class CatalogManager; +} + class CatalogManagerBgTasks final { public: explicit CatalogManagerBgTasks(CatalogManager *catalog_manager) @@ -52,7 +56,7 @@ class CatalogManagerBgTasks final { pending_updates_(false), cond_(&lock_), thread_(nullptr), - catalog_manager_(catalog_manager) { + catalog_manager_(down_cast(catalog_manager)) { } ~CatalogManagerBgTasks() {} @@ -73,7 +77,7 @@ class CatalogManagerBgTasks final { mutable Mutex lock_; ConditionVariable cond_; scoped_refptr 
thread_; - CatalogManager *catalog_manager_; + enterprise::CatalogManager *catalog_manager_; }; } // namespace master diff --git a/src/yb/master/master.proto b/src/yb/master/master.proto index 2b439df0347d..233877bbc3eb 100644 --- a/src/yb/master/master.proto +++ b/src/yb/master/master.proto @@ -407,8 +407,15 @@ message SysRedisConfigEntryPB { // The data part of a SysRowEntry in the sys.catalog table for a CDC stream. message SysCDCStreamEntryPB { + enum State { + ACTIVE = 0; + DELETING = 1; + // Currently DELETED is not being used because we delete stream entries from the sys catalog. + DELETED = 2; + } optional string table_id = 1; repeated CDCStreamOptionsPB options = 2; + optional State state = 3 [default = ACTIVE]; } // The data part of a SysRowEntry in the sys.catalog table for a universe replication record.