diff --git a/src/yb/integration-tests/cdcsdk_ysql-test.cc b/src/yb/integration-tests/cdcsdk_ysql-test.cc index 7ca9b43e0d82..dbd965bc39b5 100644 --- a/src/yb/integration-tests/cdcsdk_ysql-test.cc +++ b/src/yb/integration-tests/cdcsdk_ysql-test.cc @@ -8624,5 +8624,361 @@ TEST_F(CDCSDKYsqlTest, TestTablesWithEnumArrayColumnShouldNotGetAddedToStream) { ASSERT_EQ(stream.stream().table_id_size(), 0); } +void CDCSDKYsqlTest::TestDisableOfDynamicTableAdditionOnCDCStream( + bool use_consistent_snapshot_stream) { + ANNOTATE_UNPROTECTED_WRITE(FLAGS_yb_enable_cdc_consistent_snapshot_streams) = + use_consistent_snapshot_stream; + ANNOTATE_UNPROTECTED_WRITE(FLAGS_catalog_manager_bg_task_wait_ms) = 100; + ANNOTATE_UNPROTECTED_WRITE(FLAGS_cdcsdk_enable_dynamic_tables_disable_option) = true; + // Setup cluster. + ASSERT_OK(SetUpWithParams(3, 3, false)); + + const vector table_list_suffix = {"_0", "_1", "_2", "_3", "_4"}; + const int kNumTables = 5; + vector table(kNumTables); + int idx = 0; + vector> tablets(kNumTables); + + // Create and populate data in the first two tables. + for (idx = 0; idx < 2; idx++) { + table[idx] = ASSERT_RESULT(CreateTable( + &test_cluster_, kNamespaceName, kTableName, 1, true, false, 0, true, + table_list_suffix[idx])); + ASSERT_OK(test_client()->GetTablets( + table[idx], 0, &tablets[idx], /* partition_list_version = */ nullptr)); + ASSERT_OK(WriteEnumsRows( + 0 /* start */, 100 /* end */, &test_cluster_, table_list_suffix[idx], kNamespaceName, + kTableName)); + } + + auto stream_id1 = use_consistent_snapshot_stream ? ASSERT_RESULT(CreateConsistentSnapshotStream()) + : ASSERT_RESULT(CreateDBStream(EXPLICIT)); + auto stream_id2 = use_consistent_snapshot_stream ? ASSERT_RESULT(CreateConsistentSnapshotStream()) + : ASSERT_RESULT(CreateDBStream(EXPLICIT)); + + std::unordered_set expected_table_ids = {table[0].table_id(), table[1].table_id()}; + VerifyTablesInStreamMetadata( + stream_id1, expected_table_ids, "Waiting for stream metadata after stream creation."); + VerifyTablesInStreamMetadata( + stream_id2, expected_table_ids, "Waiting for stream metadata after stream creation."); + + // Since dynamic table addition is not yet disabled, create a new table and verify that it gets + // added to stream metadata of both the streams. + table[idx] = ASSERT_RESULT(CreateTable( + &test_cluster_, kNamespaceName, kTableName, 1, true, false, 0, true, table_list_suffix[idx])); + ASSERT_OK(test_client()->GetTablets( + table[idx], 0, &tablets[idx], /* partition_list_version = */ nullptr)); + idx += 1; + + expected_table_ids.insert(table[idx - 1].table_id()); + VerifyTablesInStreamMetadata( + stream_id1, expected_table_ids, "Waiting for GetDBStreamInfo after creating a new table."); + VerifyTablesInStreamMetadata( + stream_id2, expected_table_ids, "Waiting for GetDBStreamInfo after creating a new table."); + + // Disable dynamic table addition on stream1 via the yb-admin command. + ASSERT_OK(DisableDynamicTableAdditionOnCDCSDKStream(stream_id1)); + + // Create a new table and verify that it only gets added to stream2's metadata. + table[idx] = ASSERT_RESULT(CreateTable( + &test_cluster_, kNamespaceName, kTableName, 1, true, false, 0, true, table_list_suffix[idx])); + ASSERT_OK(test_client()->GetTablets( + table[idx], 0, &tablets[idx], /* partition_list_version = */ nullptr)); + idx += 1; + + // wait for the bg thread responsible for dynamic table addition to complete its processing. + SleepFor(MonoDelta::FromSeconds(2 * kTimeMultiplier)); + + // Stream1's metadata should not contain table_4 as dynamic table addition is disabled. Therefore, + // the expected set of tables remains same as before. + VerifyTablesInStreamMetadata( + stream_id1, expected_table_ids, + "Waiting for GetDBStreamInfo after disabling dynamic table addition on stream1."); + + // Stream2's metadata should contain table_4 as dynamic table addition is not disabled. + auto expected_table_ids_for_stream2 = expected_table_ids; + expected_table_ids_for_stream2.insert(table[idx - 1].table_id()); + VerifyTablesInStreamMetadata( + stream_id2, expected_table_ids_for_stream2, + "Waiting for GetDBStreamInfo after disabling dynamic table addition on stream1."); + + // Verify tablets of table_4 have only been added to cdc_state table for stream2. + std::unordered_set expected_tablets_for_stream1; + std::unordered_set expected_tablets_for_stream2; + for (int i = 0; i < idx; i++) { + if (i < 3) { + expected_tablets_for_stream1.insert(tablets[i].Get(0).tablet_id()); + } + expected_tablets_for_stream2.insert(tablets[i].Get(0).tablet_id()); + } + + CheckTabletsInCDCStateTable(expected_tablets_for_stream1, test_client(), stream_id1); + CheckTabletsInCDCStateTable(expected_tablets_for_stream2, test_client(), stream_id2); + + // Even on a master restart, table_4 should not get added to the stream1. + auto leader_master = ASSERT_RESULT(test_cluster_.mini_cluster_->GetLeaderMiniMaster()); + ASSERT_OK(leader_master->Restart()); + LOG(INFO) << "Master Restarted"; + SleepFor(MonoDelta::FromSeconds(5 * kTimeMultiplier)); + + // Any newly created table after master restart should not get added to stream1. + table[idx] = ASSERT_RESULT(CreateTable( + &test_cluster_, kNamespaceName, kTableName, 1, true, false, 0, true, table_list_suffix[idx])); + ASSERT_OK(test_client()->GetTablets( + table[idx], 0, &tablets[idx], /* partition_list_version = */ nullptr)); + idx += 1; + + // wait for the bg thread responsible for dynamic table addition to complete its processing. + SleepFor(MonoDelta::FromSeconds(2 * kTimeMultiplier)); + + // Stream1's metadata should not contain table_5 as dynamic table addition is disabled. + VerifyTablesInStreamMetadata( + stream_id1, expected_table_ids, + "Waiting for GetDBStreamInfo after creating new table on master restart."); + + // Stream2's metadata should contain table_5 as dynamic table addition is not disabled. + expected_table_ids_for_stream2.insert(table[idx - 1].table_id()); + VerifyTablesInStreamMetadata( + stream_id2, expected_table_ids_for_stream2, + "Waiting for GetDBStreamInfo after creating new table on master restart."); + + // verify tablets of table_4 & table_5 have not been added to cdc_state table for stream1. + CheckTabletsInCDCStateTable(expected_tablets_for_stream1, test_client(), stream_id1); + + // Tablets of table_5 should be added to cdc state table for stream2. + expected_tablets_for_stream2.insert(tablets[idx - 1].Get(0).tablet_id()); + CheckTabletsInCDCStateTable(expected_tablets_for_stream2, test_client(), stream_id2); +} + +TEST_F(CDCSDKYsqlTest, TestDisableOfDynamicTableAdditionOnNonConsistentSnapshotStream) { + TestDisableOfDynamicTableAdditionOnCDCStream( + /* use_consistent_snapshot_stream */ false); +} + +TEST_F(CDCSDKYsqlTest, TestDisableOfDynamicTableAdditionOnConsistentSnapshotStream) { + TestDisableOfDynamicTableAdditionOnCDCStream( + /* use_consistent_snapshot_stream */ true); +} + +void CDCSDKYsqlTest::TestUserTableRemovalFromCDCStream(bool use_consistent_snapshot_stream) { + ANNOTATE_UNPROTECTED_WRITE(FLAGS_yb_enable_cdc_consistent_snapshot_streams) = + use_consistent_snapshot_stream; + ANNOTATE_UNPROTECTED_WRITE(FLAGS_update_min_cdc_indices_interval_secs) = 1; + ANNOTATE_UNPROTECTED_WRITE(FLAGS_cdcsdk_enable_dynamic_tables_disable_option) = true; + // Setup cluster. + ASSERT_OK(SetUpWithParams(1, 1, false)); + + const vector table_list_suffix = {"_0", "_1", "_2"}; + const int kNumTables = 3; + vector table(kNumTables); + int idx = 0; + vector> tablets(kNumTables); + + // Create and populate data in the all 3 tables. + for (idx = 0; idx < kNumTables; idx++) { + table[idx] = ASSERT_RESULT(CreateTable( + &test_cluster_, kNamespaceName, kTableName, 1, true, false, 0, true, + table_list_suffix[idx])); + ASSERT_OK(test_client()->GetTablets( + table[idx], 0, &tablets[idx], /* partition_list_version = */ nullptr)); + ASSERT_OK(WriteEnumsRows( + 0 /* start */, 100 /* end */, &test_cluster_, table_list_suffix[idx], kNamespaceName, + kTableName)); + } + + auto stream_id = use_consistent_snapshot_stream + ? ASSERT_RESULT(CreateConsistentSnapshotStream()) + : ASSERT_RESULT(CreateDBStream(CDCCheckpointType::EXPLICIT)); + + // Before we remove a table, get the initial stream metadata as well as cdc state table entries. + std::unordered_set expected_tables; + for (const auto& table_entry : table) { + expected_tables.insert(table_entry.table_id()); + } + + VerifyTablesInStreamMetadata( + stream_id, expected_tables, "Waiting for GetDBStreamInfo after stream creation"); + + std::unordered_set expected_tablets; + for (const auto& tablets_entries : tablets) { + for (const auto& tablet : tablets_entries) { + expected_tablets.insert(tablet.tablet_id()); + } + } + + CheckTabletsInCDCStateTable(expected_tablets, test_client(), stream_id); + + // Disable dynamic table addition on stream via the yb-admin command. + ASSERT_OK(DisableDynamicTableAdditionOnCDCSDKStream(stream_id)); + + // Remove table_1 from stream using yb-admin command. This command will remove table from stream + // metadata as well as update its corresponding state table tablet entries with checkpoint as max. + ASSERT_OK(RemoveUserTableFromCDCSDKStream(stream_id, table[0].table_id())); + SleepFor(MonoDelta::FromSeconds(5 * kTimeMultiplier)); + + // Stream metadata should no longer contain the removed table i.e. table_1. + expected_tables.erase(table[0].table_id()); + std::unordered_set expected_tables_after_table_removal = expected_tables; + VerifyTablesInStreamMetadata( + stream_id, expected_tables_after_table_removal, + "Waiting for GetDBStreamInfo after table removal from CDC stream."); + + // Since checkpoint will be set to max for table_1's tablet entries, wait for + // UpdatePeersAndMetrics to delete those entries. + SleepFor(MonoDelta::FromSeconds(5 * kTimeMultiplier)); + + // Verify tablets of table_1 are removed from cdc_state table. + expected_tablets.clear(); + for (int i = 1; i < idx; i++) { + for (const auto& tablet : tablets[i]) { + expected_tablets.insert(tablet.tablet_id()); + } + } + + CheckTabletsInCDCStateTable(expected_tablets, test_client(), stream_id); + + ASSERT_OK(WaitForFlushTables( + {table[0].table_id()}, /* add_indexes = */ false, /* timeout_secs = */ 30, + /* is_compaction = */ false)); + + // Split table_1's tablet. + WaitUntilSplitIsSuccesful(tablets[0].Get(0).tablet_id(), table[0], 2); + google::protobuf::RepeatedPtrField table1_tablets_after_split; + ASSERT_OK(test_client()->GetTablets( + table[0], 0, &table1_tablets_after_split, /* partition_list_version =*/nullptr)); + ASSERT_EQ(table1_tablets_after_split.size(), 2); + + // Wait for sometime so that tablet split codepath has completed adding new cdc state entries. + SleepFor(MonoDelta::FromSeconds(3 * kTimeMultiplier)); + + // Children tablets of table_1 shouldnt get added to cdc state table since the table no longer + // exists in stream metadata. + CheckTabletsInCDCStateTable(expected_tablets, test_client(), stream_id); + + auto leader_master = ASSERT_RESULT(test_cluster_.mini_cluster_->GetLeaderMiniMaster()); + ASSERT_OK(leader_master->Restart()); + LOG(INFO) << "Master Restarted"; + + // Even after a restart, we shouldn't see table_1 in stream metadata as well as cdc state table + // entries shouldnt contain any of the table_1 tablets. + VerifyTablesInStreamMetadata( + stream_id, expected_tables_after_table_removal, + "Waiting for GetBStreamInfo after master restart."); + + CheckTabletsInCDCStateTable(expected_tablets, test_client(), stream_id); +} + +TEST_F(CDCSDKYsqlTest, TestUserTableRemovalFromNonConsistentSnapshotCDCStream) { + TestUserTableRemovalFromCDCStream(/* use_consistent_snapshot_stream */ false); +} + +TEST_F(CDCSDKYsqlTest, TestUserTableRemovalFromConsistentSnapshotCDCStream) { + TestUserTableRemovalFromCDCStream(/* use_consistent_snapshot_stream */ true); +} + +void CDCSDKYsqlTest::TestValidationAndSyncOfCDCStateEntriesAfterUserTableRemoval( + bool use_consistent_snapshot_stream) { + ANNOTATE_UNPROTECTED_WRITE(FLAGS_yb_enable_cdc_consistent_snapshot_streams) = + use_consistent_snapshot_stream; + ANNOTATE_UNPROTECTED_WRITE(FLAGS_update_min_cdc_indices_interval_secs) = 1; + ANNOTATE_UNPROTECTED_WRITE(FLAGS_TEST_cdcsdk_skip_updating_cdc_state_entries_on_table_removal) = + true; + ANNOTATE_UNPROTECTED_WRITE(FLAGS_cdcsdk_enable_dynamic_tables_disable_option) = true; + // Setup cluster. + ASSERT_OK(SetUpWithParams(3, 3, false)); + + const vector table_list_suffix = {"_0", "_1", "_2"}; + const int kNumTables = 3; + vector table(kNumTables); + int idx = 0; + vector> tablets(kNumTables); + + // Create and populate data in the all 3 tables. + for (idx = 0; idx < kNumTables; idx++) { + table[idx] = ASSERT_RESULT(CreateTable( + &test_cluster_, kNamespaceName, kTableName, 3, true, false, 0, true, + table_list_suffix[idx])); + ASSERT_OK(test_client()->GetTablets( + table[idx], 0, &tablets[idx], /* partition_list_version = */ nullptr)); + ASSERT_OK(WriteEnumsRows( + 0 /* start */, 100 /* end */, &test_cluster_, table_list_suffix[idx], kNamespaceName, + kTableName)); + } + + auto stream_id = use_consistent_snapshot_stream + ? ASSERT_RESULT(CreateConsistentSnapshotStream()) + : ASSERT_RESULT(CreateDBStream(CDCCheckpointType::EXPLICIT)); + + // Before we remove a table, get the initial stream metadata as well as cdc state table entries. + std::unordered_set expected_tables; + for (const auto& table_entry : table) { + expected_tables.insert(table_entry.table_id()); + } + + VerifyTablesInStreamMetadata( + stream_id, expected_tables, "Waiting for GetDBStreamInfo after stream creation"); + + std::unordered_set expected_tablets; + for (const auto& tablets_entries : tablets) { + for (const auto& tablet : tablets_entries) { + expected_tablets.insert(tablet.tablet_id()); + } + } + + CheckTabletsInCDCStateTable(expected_tablets, test_client(), stream_id); + + // Disable dynamic table addition on stream via the yb-admin command. + ASSERT_OK(DisableDynamicTableAdditionOnCDCSDKStream(stream_id)); + + // Remove table_1 from stream using yb-admin command. This command will remove table from stream + // metadata but skip updating cdc state entries because the test flag + // skip_updating_cdc_state_entries_on_table_removal is set. + ASSERT_OK(RemoveUserTableFromCDCSDKStream(stream_id, table[0].table_id())); + SleepFor(MonoDelta::FromSeconds(5 * kTimeMultiplier)); + + // Stream metadata should no longer contain the removed table i.e. table_1. + expected_tables.erase(table[0].table_id()); + std::unordered_set expected_tables_after_table_removal = expected_tables; + VerifyTablesInStreamMetadata( + stream_id, expected_tables_after_table_removal, + "Waiting for GetDBStreamInfo after table removal from CDC stream."); + + // Verify that cdc state table still contains entries for the table that was removed. + CheckTabletsInCDCStateTable(expected_tablets, test_client(), stream_id); + + // Now, validate the cdc state entries using the yb-admin command + // 'validate_cdc_state_table_entries_on_change_data_stream'. It will find state table entries for + // table_1 and update their checkpoints to max. + ASSERT_OK(ValidateAndSyncCDCStateEntriesForCDCSDKStream(stream_id)); + + // Since checkpoint will be set to max for table_1's tablet entries, wait for + // UpdatePeersAndMetrics to delete those entries. + SleepFor(MonoDelta::FromSeconds(5 * kTimeMultiplier)); + + // Verify tablets of table_1 are removed from cdc_state table. + expected_tablets.clear(); + for (int i = 1; i < idx; i++) { + for (const auto& tablet : tablets[i]) { + expected_tablets.insert(tablet.tablet_id()); + } + } + + CheckTabletsInCDCStateTable(expected_tablets, test_client(), stream_id); +} + +TEST_F( + CDCSDKYsqlTest, + TestValidationAndSyncOfCDCStateEntriesAfterUserTableRemovalOnNonConsistentSnapshotStream) { + TestValidationAndSyncOfCDCStateEntriesAfterUserTableRemoval( + /* use_consistent_snapshot_stream */ false); +} + +TEST_F( + CDCSDKYsqlTest, + TestValidationAndSyncOfCDCStateEntriesAfterUserTableRemovalOnConsistentSnapshotStream) { + TestValidationAndSyncOfCDCStateEntriesAfterUserTableRemoval( + /* use_consistent_snapshot_stream */ true); +} + } // namespace cdc } // namespace yb diff --git a/src/yb/integration-tests/cdcsdk_ysql_test_base.cc b/src/yb/integration-tests/cdcsdk_ysql_test_base.cc index 124730b79e52..43010f34647e 100644 --- a/src/yb/integration-tests/cdcsdk_ysql_test_base.cc +++ b/src/yb/integration-tests/cdcsdk_ysql_test_base.cc @@ -4503,5 +4503,53 @@ Result CDCSDKYsqlTest::GetUniverseId(PostgresMiniCluster* cluster) { return oss.str(); } + Status CDCSDKYsqlTest::ExecuteYBAdminCommand( + const std::string& command_name, const std::vector& command_args) { + string tool_path = GetToolPath("../bin", "yb-admin"); + vector argv; + argv.push_back(tool_path); + argv.push_back("--master_addresses"); + argv.push_back(AsString(test_cluster_.mini_cluster_->GetMasterAddresses())); + argv.push_back(command_name); + for (const auto& command_arg : command_args) { + argv.push_back(command_arg); + } + + RETURN_NOT_OK(Subprocess::Call(argv)); + + return Status::OK(); + } + + Status CDCSDKYsqlTest::DisableDynamicTableAdditionOnCDCSDKStream( + const xrepl::StreamId& stream_id) { + std::string yb_admin_command = "disable_dynamic_table_addition_on_change_data_stream"; + vector command_args; + command_args.push_back(stream_id.ToString()); + RETURN_NOT_OK(ExecuteYBAdminCommand(yb_admin_command, command_args)); + return Status::OK(); + } + + Status CDCSDKYsqlTest::RemoveUserTableFromCDCSDKStream( + const xrepl::StreamId& stream_id, const TableId& table_id) { + std::string yb_admin_command = "remove_user_table_from_change_data_stream"; + vector command_args; + command_args.push_back(stream_id.ToString()); + command_args.push_back(table_id); + RETURN_NOT_OK(ExecuteYBAdminCommand(yb_admin_command, command_args)); + + return Status::OK(); + } + + Status CDCSDKYsqlTest::ValidateAndSyncCDCStateEntriesForCDCSDKStream( + const xrepl::StreamId& stream_id) { + std::string yb_admin_command = + "validate_and_sync_cdc_state_table_entries_on_change_data_stream"; + vector command_args; + command_args.push_back(stream_id.ToString()); + RETURN_NOT_OK(ExecuteYBAdminCommand(yb_admin_command, command_args)); + + return Status::OK(); + } + } // namespace cdc } // namespace yb diff --git a/src/yb/integration-tests/cdcsdk_ysql_test_base.h b/src/yb/integration-tests/cdcsdk_ysql_test_base.h index 3b8d2eaa965c..4e4fa39760c0 100644 --- a/src/yb/integration-tests/cdcsdk_ysql_test_base.h +++ b/src/yb/integration-tests/cdcsdk_ysql_test_base.h @@ -117,6 +117,8 @@ DECLARE_uint64(TEST_cdcsdk_publication_list_refresh_interval_micros); DECLARE_bool(cdcsdk_enable_dynamic_table_support); DECLARE_bool(enable_cdcsdk_setting_get_changes_response_byte_limit); DECLARE_uint64(cdcsdk_vwal_getchanges_resp_max_size_bytes); +DECLARE_bool(cdcsdk_enable_dynamic_tables_disable_option); +DECLARE_bool(TEST_cdcsdk_skip_updating_cdc_state_entries_on_table_removal); namespace yb { @@ -781,6 +783,22 @@ class CDCSDKYsqlTest : public CDCSDKTestBase { std::string GetPubRefreshTimesString(vector pub_refresh_times); void TestNonUserTableShouldNotGetAddedToCDCStream (bool create_consistent_snapshot_stream); + + Status ExecuteYBAdminCommand( + const std::string& command_name, const std::vector& command_args); + + Status DisableDynamicTableAdditionOnCDCSDKStream(const xrepl::StreamId& stream_id); + + void TestDisableOfDynamicTableAdditionOnCDCStream(bool use_consistent_snapshot_stream); + + Status RemoveUserTableFromCDCSDKStream(const xrepl::StreamId& stream_id, const TableId& table_id); + + void TestUserTableRemovalFromCDCStream(bool use_consistent_snapshot_stream); + + Status ValidateAndSyncCDCStateEntriesForCDCSDKStream(const xrepl::StreamId& stream_id); + + void TestValidationAndSyncOfCDCStateEntriesAfterUserTableRemoval( + bool use_consistent_snapshot_stream); }; } // namespace cdc diff --git a/src/yb/master/catalog_entity_info.cc b/src/yb/master/catalog_entity_info.cc index c7acaed05bac..5ce566841e3c 100644 --- a/src/yb/master/catalog_entity_info.cc +++ b/src/yb/master/catalog_entity_info.cc @@ -61,6 +61,7 @@ using std::string; using strings::Substitute; DECLARE_int32(tserver_unresponsive_timeout_ms); +DECLARE_bool(cdcsdk_enable_dynamic_tables_disable_option); DEFINE_RUNTIME_AUTO_bool( use_parent_table_id_field, kLocalPersisted, false, true, @@ -1264,6 +1265,16 @@ CDCStreamInfo::GetReplicaIdentityMap() const { return l->pb.replica_identity_map(); } +bool CDCStreamInfo::IsDynamicTableAdditionDisabled() const { + if (!FLAGS_cdcsdk_enable_dynamic_tables_disable_option) { + return false; + } + + auto l = LockForRead(); + return l->pb.has_cdcsdk_disable_dynamic_table_addition() && + l->pb.cdcsdk_disable_dynamic_table_addition(); +} + std::string CDCStreamInfo::ToString() const { auto l = LockForRead(); if (l->pb.has_namespace_id()) { diff --git a/src/yb/master/catalog_entity_info.h b/src/yb/master/catalog_entity_info.h index 22d2dfef3370..ca5da4f3cf92 100644 --- a/src/yb/master/catalog_entity_info.h +++ b/src/yb/master/catalog_entity_info.h @@ -1154,6 +1154,8 @@ class CDCStreamInfo : public RefCountedThreadSafe, const google::protobuf::Map<::std::string, ::yb::PgReplicaIdentity> GetReplicaIdentityMap() const; + bool IsDynamicTableAdditionDisabled() const; + std::string ToString() const override; bool IsXClusterStream() const; diff --git a/src/yb/master/catalog_entity_info.proto b/src/yb/master/catalog_entity_info.proto index f2f278826466..9d34e1334179 100644 --- a/src/yb/master/catalog_entity_info.proto +++ b/src/yb/master/catalog_entity_info.proto @@ -535,6 +535,11 @@ message SysCDCStreamEntryPB { map replica_identity_map = 9; optional string cdcsdk_ysql_replication_slot_plugin_name = 10; + + // Dynamic tables are the tables which are created after the creation of the stream. + // This field controls if dynamic tables should automatically be added to the CDC stream or not. + // If set to true, dynamic table wont get added to the CDC stream. + optional bool cdcsdk_disable_dynamic_table_addition = 11; } diff --git a/src/yb/master/catalog_manager.h b/src/yb/master/catalog_manager.h index b4c52a24f22a..c264f244885c 100644 --- a/src/yb/master/catalog_manager.h +++ b/src/yb/master/catalog_manager.h @@ -45,6 +45,7 @@ #include #include "yb/cdc/cdc_service.pb.h" +#include "yb/cdc/cdc_state_table.h" #include "yb/cdc/xcluster_types.h" #include "yb/common/constants.h" #include "yb/common/entity_ids.h" @@ -1352,6 +1353,18 @@ class CatalogManager : public tserver::TabletPeerLookupIf, YsqlBackfillReplicationSlotNameToCDCSDKStreamResponsePB* resp, rpc::RpcContext* rpc); + Status DisableDynamicTableAdditionOnCDCSDKStream( + const DisableDynamicTableAdditionOnCDCSDKStreamRequestPB* req, + DisableDynamicTableAdditionOnCDCSDKStreamResponsePB* resp, rpc::RpcContext* rpc); + + Status RemoveUserTableFromCDCSDKStream( + const RemoveUserTableFromCDCSDKStreamRequestPB* req, + RemoveUserTableFromCDCSDKStreamResponsePB* resp, rpc::RpcContext* rpc); + + Status ValidateAndSyncCDCStateEntriesForCDCSDKStream( + const ValidateAndSyncCDCStateEntriesForCDCSDKStreamRequestPB* req, + ValidateAndSyncCDCStateEntriesForCDCSDKStreamResponsePB* resp, rpc::RpcContext* rpc); + // Query if Bootstrapping is required for a CDC stream (e.g. Are we missing logs). Status IsBootstrapRequired( const IsBootstrapRequiredRequestPB* req, @@ -1482,7 +1495,7 @@ class CatalogManager : public tserver::TabletPeerLookupIf, // Find all CDCSDK streams which do not have metadata for the newly added tables. Status FindCDCSDKStreamsForAddedTables(TableStreamIdsMap* table_to_unprocessed_streams_map); - bool CanTableBeAddedToCDCSDKStream( + bool IsTableEligibleForCDCSDKStream( const TableInfoPtr& table_info, const Schema& schema) const REQUIRES_SHARED(mutex_); // This method compares all tables in the namespace to all the tables added to a CDCSDK stream, @@ -3102,6 +3115,14 @@ class CatalogManager : public tserver::TabletPeerLookupIf, const TabletInfo& tablet, const ScheduleMinRestoreTime& schedule_to_min_restore_time) EXCLUDES(mutex_); + Result> UpdateCheckpointForTabletEntriesInCDCState( + const xrepl::StreamId& stream_id, + const std::unordered_set& tables_in_stream_metadata, + const TableId& table_to_be_removed = ""); + + Status RemoveTableFromCDCStreamMetadataAndMaps( + const CDCStreamInfoPtr stream, const TableId table_id); + // Should be bumped up when tablet locations are changed. std::atomic tablet_locations_version_{0}; diff --git a/src/yb/master/master_replication.proto b/src/yb/master/master_replication.proto index d0832391f181..d090d61b8a24 100644 --- a/src/yb/master/master_replication.proto +++ b/src/yb/master/master_replication.proto @@ -42,6 +42,11 @@ message CDCStreamInfoPB { map replica_identity_map = 10; optional string cdcsdk_ysql_replication_slot_plugin_name = 11; + + // Dynamic tables are the tables which are created after the creation of the stream. + // This field controls if dynamic tables should automatically be added to the CDC stream or not. + // If set to true, dynamic table wont get added to the CDC stream. + optional bool cdcsdk_disable_dynamic_table_addition = 12; } message ValidateReplicationInfoRequestPB { @@ -755,6 +760,32 @@ message GetUniverseReplicationInfoResponsePB { repeated DbScopedInfoPB db_scoped_infos = 5; } +message DisableDynamicTableAdditionOnCDCSDKStreamRequestPB { + optional string stream_id = 1; +} + +message DisableDynamicTableAdditionOnCDCSDKStreamResponsePB { + optional MasterErrorPB error = 1; +} + +message RemoveUserTableFromCDCSDKStreamRequestPB { + optional string stream_id = 1; + optional string table_id = 2; +} + +message RemoveUserTableFromCDCSDKStreamResponsePB { + optional MasterErrorPB error = 1; +} + +message ValidateAndSyncCDCStateEntriesForCDCSDKStreamRequestPB { + optional string stream_id = 1; +} + +message ValidateAndSyncCDCStateEntriesForCDCSDKStreamResponsePB { + optional MasterErrorPB error = 1; + repeated string updated_tablet_entries = 2; +} + service MasterReplication { option (yb.rpc.custom_service_name) = "yb.master.MasterService"; @@ -871,4 +902,15 @@ service MasterReplication { returns (AddNamespaceToXClusterReplicationResponsePB); rpc IsAlterXClusterReplicationDone(IsAlterXClusterReplicationDoneRequestPB) returns (IsAlterXClusterReplicationDoneResponsePB); + + // Introduced for bug (#22876, #22773) + rpc DisableDynamicTableAdditionOnCDCSDKStream (DisableDynamicTableAdditionOnCDCSDKStreamRequestPB) + returns (DisableDynamicTableAdditionOnCDCSDKStreamResponsePB); + // Introduced for bug (#22876, #22773) + rpc RemoveUserTableFromCDCSDKStream (RemoveUserTableFromCDCSDKStreamRequestPB) + returns (RemoveUserTableFromCDCSDKStreamResponsePB); + // Introduced for bug (#22876, #22773) + rpc ValidateAndSyncCDCStateEntriesForCDCSDKStream( + ValidateAndSyncCDCStateEntriesForCDCSDKStreamRequestPB) + returns (ValidateAndSyncCDCStateEntriesForCDCSDKStreamResponsePB); } diff --git a/src/yb/master/master_replication_service.cc b/src/yb/master/master_replication_service.cc index 54fed364fa6e..b9ce83d56a61 100644 --- a/src/yb/master/master_replication_service.cc +++ b/src/yb/master/master_replication_service.cc @@ -57,6 +57,9 @@ class MasterReplicationServiceImpl : public MasterServiceBase, public MasterRepl (ChangeXClusterRole) (BootstrapProducer) (YsqlBackfillReplicationSlotNameToCDCSDKStream) + (DisableDynamicTableAdditionOnCDCSDKStream) + (RemoveUserTableFromCDCSDKStream) + (ValidateAndSyncCDCStateEntriesForCDCSDKStream) ) MASTER_SERVICE_IMPL_ON_LEADER_WITH_LOCK( diff --git a/src/yb/master/xrepl_catalog_manager.cc b/src/yb/master/xrepl_catalog_manager.cc index 82a638182be5..5febd70735f7 100644 --- a/src/yb/master/xrepl_catalog_manager.cc +++ b/src/yb/master/xrepl_catalog_manager.cc @@ -138,6 +138,22 @@ DEFINE_test_flag(bool, fail_universe_replication_merge, false, "Causes MergeUniv DEFINE_test_flag(bool, xcluster_fail_setup_stream_update, false, "Fail UpdateCDCStream RPC call"); +DEFINE_RUNTIME_AUTO_bool(cdcsdk_enable_dynamic_tables_disable_option, + kLocalPersisted, + false, + true, + "This flag needs to be true in order to disable addition of dynamic tables " + "to CDC stream. This flag is required to be to true for execution of " + "yb-admin commands - " + "\'disable_dynamic_table_addition_on_change_data_stream\', " + "\'remove_user_table_from_change_data_stream\'"); +TAG_FLAG(cdcsdk_enable_dynamic_tables_disable_option, advanced); +TAG_FLAG(cdcsdk_enable_dynamic_tables_disable_option, hidden); + +DEFINE_test_flag(bool, cdcsdk_skip_updating_cdc_state_entries_on_table_removal, false, + "Skip updating checkpoint to max for cdc state table entries while removing a user table from " + "CDCSDK stream."); + DECLARE_bool(xcluster_wait_on_ddl_alter); DECLARE_int32(master_rpc_timeout_ms); DECLARE_bool(ysql_yb_enable_replication_commands); @@ -1718,6 +1734,11 @@ Status CatalogManager::FindCDCSDKStreamsForAddedTables( continue; } + // skip streams on which dynamic table addition is disabled. + if(stream_info->IsDynamicTableAdditionDisabled()) { + continue; + } + auto const unprocessed_tables = FindOrNull(namespace_to_unprocessed_table_map, stream_info->namespace_id()); if (!unprocessed_tables) { @@ -1741,7 +1762,7 @@ Status CatalogManager::FindCDCSDKStreamsForAddedTables( continue; } - if (!CanTableBeAddedToCDCSDKStream(table, schema)) { + if (!IsTableEligibleForCDCSDKStream(table, schema)) { RemoveTableFromCDCSDKUnprocessedMap(unprocessed_table_id, stream_info->namespace_id()); continue; } @@ -1878,7 +1899,7 @@ std::vector CatalogManager::FindAllTablesForCDCSDK(const Namespace } } - if (!CanTableBeAddedToCDCSDKStream(table_info.get(), schema)) { + if (!IsTableEligibleForCDCSDKStream(table_info.get(), schema)) { continue; } @@ -1888,7 +1909,7 @@ std::vector CatalogManager::FindAllTablesForCDCSDK(const Namespace return tables; } -bool CatalogManager::CanTableBeAddedToCDCSDKStream( +bool CatalogManager::IsTableEligibleForCDCSDKStream( const TableInfoPtr& table_info, const Schema& schema) const { bool has_pk = true; bool has_invalid_pg_typeoid = false; @@ -2479,6 +2500,12 @@ Status CatalogManager::GetCDCStream( stream_info->set_stream_creation_time(stream_lock->pb.stream_creation_time()); } + if (FLAGS_cdcsdk_enable_dynamic_tables_disable_option && + stream_lock->pb.has_cdcsdk_disable_dynamic_table_addition()) { + stream_info->set_cdcsdk_disable_dynamic_table_addition( + stream_lock->pb.cdcsdk_disable_dynamic_table_addition()); + } + auto replica_identity_map = stream_lock->pb.replica_identity_map(); stream_info->mutable_replica_identity_map()->swap(replica_identity_map); @@ -2612,6 +2639,11 @@ Status CatalogManager::ListCDCStreams( stream->set_stream_creation_time(ltm->pb.stream_creation_time()); } + if (FLAGS_cdcsdk_enable_dynamic_tables_disable_option && + ltm->pb.has_cdcsdk_disable_dynamic_table_addition()) { + stream->set_cdcsdk_disable_dynamic_table_addition( + ltm->pb.cdcsdk_disable_dynamic_table_addition()); + } } return Status::OK(); } @@ -6102,6 +6134,230 @@ Status CatalogManager::YsqlBackfillReplicationSlotNameToCDCSDKStream( return Status::OK(); } +Status CatalogManager::DisableDynamicTableAdditionOnCDCSDKStream( + const DisableDynamicTableAdditionOnCDCSDKStreamRequestPB* req, + DisableDynamicTableAdditionOnCDCSDKStreamResponsePB* resp, rpc::RpcContext* rpc) { + LOG(INFO) << "Servicing DisableDynamicTableAdditionOnCDCSDKStream request from " + << RequestorString(rpc) << ": " << req->ShortDebugString(); + + if (!req->has_stream_id()) { + RETURN_INVALID_REQUEST_STATUS("CDC Stream ID must be provided"); + } + + if (!FLAGS_cdcsdk_enable_dynamic_tables_disable_option) { + RETURN_INVALID_REQUEST_STATUS( + "Disabling addition of dynamic tables to CDC stream is disallowed in the middle of an " + "upgrade. Finalize the upgrade and try again"); + } + + auto stream_id = VERIFY_RESULT(xrepl::StreamId::FromString(req->stream_id())); + + CDCStreamInfoPtr stream; + { + SharedLock lock(mutex_); + stream = FindPtrOrNull(cdc_stream_map_, stream_id); + } + + if (stream == nullptr || stream->LockForRead()->is_deleting()) { + return STATUS( + NotFound, "Could not find CDC stream", MasterError(MasterErrorPB::OBJECT_NOT_FOUND)); + } + + if (!stream->IsCDCSDKStream()) { + RETURN_INVALID_REQUEST_STATUS("Not a CDC stream"); + } + + // We only want to allow disabling dynamic table addition on older streams that are not associated + // with a replication slot. + if (!stream->GetCdcsdkYsqlReplicationSlotName().empty()) { + RETURN_INVALID_REQUEST_STATUS( + "Cannot disable dynamic table addition on CDC streams associated with a replication slot"); + } + + if (stream->IsDynamicTableAdditionDisabled()) { + return STATUS(AlreadyPresent, "Dynamic table addition already disabled on the CDC stream"); + } + + // Disable dynamic table addition by setting the stream metadata field to true. + { + auto stream_lock = stream->LockForWrite(); + auto& pb = stream_lock.mutable_data()->pb; + + pb.set_cdcsdk_disable_dynamic_table_addition(true); + + RETURN_ACTION_NOT_OK( + sys_catalog_->Upsert(leader_ready_term(), stream), "Updating CDC stream in system catalog"); + + stream_lock.Commit(); + } + + LOG_WITH_FUNC(INFO) << "Successfully disabled dynamic table addition on CDC stream: " + << stream_id; + + return Status::OK(); +} + +Status CatalogManager::RemoveUserTableFromCDCSDKStream( + const RemoveUserTableFromCDCSDKStreamRequestPB* req, + RemoveUserTableFromCDCSDKStreamResponsePB* resp, rpc::RpcContext* rpc) { + LOG(INFO) << "Servicing RemoveUserTableFromCDCSDKStream request from " << RequestorString(rpc) + << ": " << req->ShortDebugString(); + + if (!req->has_stream_id() || !req->has_table_id()) { + RETURN_INVALID_REQUEST_STATUS("Both CDC Stream ID and table ID must be provided"); + } + + if (!FLAGS_cdcsdk_enable_dynamic_tables_disable_option) { + RETURN_INVALID_REQUEST_STATUS( + "Removal of user table from CDC stream is disallowed in the middle of an " + "upgrade. Finalize the upgrade and try again"); + } + + auto stream_id = VERIFY_RESULT(xrepl::StreamId::FromString(req->stream_id())); + auto table_id = req->table_id(); + + CDCStreamInfoPtr stream; + { + SharedLock lock(mutex_); + stream = FindPtrOrNull(cdc_stream_map_, stream_id); + } + + if (stream == nullptr || stream->LockForRead()->is_deleting()) { + return STATUS( + NotFound, "Could not find CDC stream", MasterError(MasterErrorPB::OBJECT_NOT_FOUND)); + } + + if (!stream->IsCDCSDKStream()) { + RETURN_INVALID_REQUEST_STATUS("Not a CDC stream"); + } + + if (!stream->GetCdcsdkYsqlReplicationSlotName().empty()) { + RETURN_INVALID_REQUEST_STATUS( + "Cannot remove table from CDC streams that are associated with a replication slot"); + } + + if (!stream->IsDynamicTableAdditionDisabled()) { + RETURN_INVALID_REQUEST_STATUS( + "Cannot remove table unless dynamic table addition is disabled for the stream. Please use " + "the yb-admin command \"disable_dynamic_table_addition_in_change_data_stream\" to disable " + "dynamic table addition on the stream."); + } + + auto stream_ns_id = stream->LockForRead()->namespace_id(); + + scoped_refptr table; + { + SharedLock lock(mutex_); + table = tables_->FindTableOrNull(table_id); + } + + if (table == nullptr || table->LockForRead()->is_deleting()) { + return STATUS(NotFound, "Could not find table", MasterError(MasterErrorPB::OBJECT_NOT_FOUND)); + } + + Schema schema; + Status status = table->GetSchema(&schema); + if (!status.ok()) { + return STATUS(InternalError, Format("Error while getting schema for table: $0", table->name())); + } + + { + SharedLock lock(mutex_); + if (!IsTableEligibleForCDCSDKStream(table, schema)) { + RETURN_INVALID_REQUEST_STATUS( + "Only allowed to remove user tables from CDC streams via this command."); + } + } + + auto table_ns_id = table->LockForRead()->namespace_id(); + if (table_ns_id != stream_ns_id) { + RETURN_INVALID_REQUEST_STATUS("Stream and Table are not under the same namespace"); + } + + if (!FLAGS_TEST_cdcsdk_skip_updating_cdc_state_entries_on_table_removal) { + std::unordered_set tables_in_stream_metadata; + { + auto stream_lock = stream->LockForRead(); + for (const auto& table_id : stream_lock->table_id()) { + tables_in_stream_metadata.insert(table_id); + } + } + + // Explicitly remove the table from the set since we want to remove the tablet entries of this + // table from the cdc state table. + tables_in_stream_metadata.erase(table_id); + RETURN_NOT_OK_PREPEND( + UpdateCheckpointForTabletEntriesInCDCState(stream_id, tables_in_stream_metadata), + "Error updating tablet entries from cdc state table"); + } + + // Now remove the table from the CDC stream metadata & cdcsdk_tables_to_stream_map_ and persist + // the updated metadata. + RETURN_NOT_OK_PREPEND( + RemoveTableFromCDCStreamMetadataAndMaps(stream, table_id), + "Error removing table from stream metadata and maps"); + + LOG_WITH_FUNC(INFO) + << "Successfully removed table " << table_id << " from CDC stream: " << stream_id + << " and updated the checkpoint to max for corresponding cdc state table entries."; + + return Status::OK(); +} + +Status CatalogManager::ValidateAndSyncCDCStateEntriesForCDCSDKStream( + const ValidateAndSyncCDCStateEntriesForCDCSDKStreamRequestPB* req, + ValidateAndSyncCDCStateEntriesForCDCSDKStreamResponsePB* resp, rpc::RpcContext* rpc) { + LOG(INFO) << "Servicing ValidateAndSyncCDCStateEntriesForCDCSDKStream request from " + << RequestorString(rpc) << ": " << req->ShortDebugString(); + + if (!req->has_stream_id()) { + RETURN_INVALID_REQUEST_STATUS("CDC Stream ID must be provided"); + } + + auto stream_id = VERIFY_RESULT(xrepl::StreamId::FromString(req->stream_id())); + CDCStreamInfoPtr stream; + { + SharedLock lock(mutex_); + stream = FindPtrOrNull(cdc_stream_map_, stream_id); + } + + if (stream == nullptr || stream->LockForRead()->is_deleting()) { + return STATUS( + NotFound, "Could not find CDC stream", MasterError(MasterErrorPB::OBJECT_NOT_FOUND)); + } + + if (!stream->IsCDCSDKStream()) { + RETURN_INVALID_REQUEST_STATUS("Not a CDC stream"); + } + + if (!stream->GetCdcsdkYsqlReplicationSlotName().empty()) { + RETURN_INVALID_REQUEST_STATUS( + "Cannot validate and sync cdc state table entries for CDC streams that are associated with " + "a replication slot"); + } + + std::unordered_set tables_in_stream_metadata; + { + auto stream_lock = stream->LockForRead(); + tables_in_stream_metadata.reserve(stream_lock->table_id().size()); + for (const auto& table_id : stream_lock->table_id()) { + tables_in_stream_metadata.insert(table_id); + } + } + + auto updated_state_table_entries = VERIFY_RESULT( + UpdateCheckpointForTabletEntriesInCDCState(stream_id, tables_in_stream_metadata)); + + for (const auto& entry : updated_state_table_entries) { + resp->add_updated_tablet_entries(entry.key.tablet_id); + } + + LOG_WITH_FUNC(INFO) + << "Successfully validated and synced cdc state table entries for CDC stream: " << stream_id; + + return Status::OK(); +} + std::vector CatalogManager::GetAllXClusterUniverseReplicationInfos() { SharedLock lock(mutex_); @@ -7453,5 +7709,123 @@ void CatalogManager::CDCSDKPopulateDeleteRetainerInfoForTabletDrop( delete_retainer.active_cdcsdk = IsTablePartOfCDCSDK(tablet_info.table()->id()); } +Result> +CatalogManager::UpdateCheckpointForTabletEntriesInCDCState( + const xrepl::StreamId& stream_id, const std::unordered_set& tables_in_stream_metadata, + const TableId& table_to_be_removed) { + std::unordered_set tablet_entries_to_be_removed; + + // If the table_id to be removed is provided, we will only find out cdc state table entries + // corresponding to this table and update their checkpoints. Otherwise, we'll consider all state + // table entries for checkpoint update. + if (!table_to_be_removed.empty()) { + scoped_refptr table; + { + SharedLock lock(mutex_); + table = tables_->FindTableOrNull(table_to_be_removed); + } + + // First we'll update the checkpoint to OpId max for all the cdc state entries correponding to + // the table. Therefore, get all the tablets for the table to be removed. + TabletInfos tablets; + tablets = VERIFY_RESULT(table->GetTablets(IncludeInactive::kTrue)); + + for (const auto& tablet : tablets) { + tablet_entries_to_be_removed.insert(tablet->tablet_id()); + } + } + + Status iteration_status; + auto all_entry_keys = + VERIFY_RESULT(cdc_state_table_->GetTableRange({} /* just key columns */, &iteration_status)); + std::vector entries_to_update; + // Get all the tablet, stream pairs from cdc_state for the given stream. + std::vector cdc_state_tablet_entries; + for (const auto& entry_result : all_entry_keys) { + RETURN_NOT_OK(entry_result); + const auto& entry = *entry_result; + + if (entry.key.stream_id == stream_id) { + // If table_id is provided, filter out state entries belonging to tablets of the table. + if (table_to_be_removed.empty() || + (!table_to_be_removed.empty() && + tablet_entries_to_be_removed.contains(entry.key.tablet_id))) { + cdc_state_tablet_entries.push_back(entry.key.tablet_id); + } + } + } + RETURN_NOT_OK(iteration_status); + + // Get the tablet info for state table entries of the stream. + auto tablet_infos = GetTabletInfos(cdc_state_tablet_entries); + + // For each state table entry present in cdc_state_tablet_entries, verify that the tablet's table + // is present in the CDC stream metadata. If not, update checkpoint of such tablet entries to + // OpId::Max. For colocated tables, even if one of the colocated table is present in the CDC + // stream metadata, skip updating the checkpoint for that tablet, stream pair. + for (const auto& tablet_info : tablet_infos) { + bool table_found = false; + for (const auto& table_id : tablet_info->GetTableIds()) { + if (tables_in_stream_metadata.contains(table_id)) { + table_found = true; + } + } + + if (!table_found) { + cdc::CDCStateTableEntry update_entry(tablet_info->tablet_id(), stream_id); + update_entry.checkpoint = OpId::Max(); + entries_to_update.emplace_back(std::move(update_entry)); + LOG_WITH_FUNC(INFO) + << "Setting checkpoint to OpId::Max() for cdc state table entry (tablet,stream) - " + << update_entry.ToString(); + } + } + + if (!entries_to_update.empty()) { + LOG_WITH_FUNC(INFO) + << "Updating checkpoint to max for " << entries_to_update.size() + << " cdc state entries as part of validating cdc state table entries for CDC stream: " + << stream_id; + RETURN_NOT_OK_PREPEND( + cdc_state_table_->UpdateEntries(entries_to_update), + "Error setting checkpoint to OpId::Max() in cdc_state table"); + } + + return entries_to_update; +} + +Status CatalogManager::RemoveTableFromCDCStreamMetadataAndMaps( + const CDCStreamInfoPtr stream, const TableId table_id) { + // Remove the table from the CDC stream metadata & cdcsdk_tables_to_stream_map_ and persist + // the updated metadata. + { + auto ltm = stream->LockForWrite(); + bool need_to_update_stream = false; + + auto table_id_iter = std::find(ltm->table_id().begin(), ltm->table_id().end(), table_id); + if (table_id_iter != ltm->table_id().end()) { + need_to_update_stream = true; + ltm.mutable_data()->pb.mutable_table_id()->erase(table_id_iter); + } + + if (need_to_update_stream) { + RETURN_ACTION_NOT_OK( + sys_catalog_->Upsert(leader_ready_term(), stream), + "Updating CDC streams in system catalog"); + } + + ltm.Commit(); + + if (need_to_update_stream) { + { + LockGuard lock(mutex_); + cdcsdk_tables_to_stream_map_[table_id].erase(stream->StreamId()); + } + } + } + + return Status::OK(); +} + } // namespace master } // namespace yb diff --git a/src/yb/tools/yb-admin_cli.cc b/src/yb/tools/yb-admin_cli.cc index b47da2e9d029..d54106c19b5b 100644 --- a/src/yb/tools/yb-admin_cli.cc +++ b/src/yb/tools/yb-admin_cli.cc @@ -1945,6 +1945,50 @@ Status ysql_backfill_change_data_stream_with_replication_slot_action( return Status::OK(); } +const auto disable_dynamic_table_addition_on_change_data_stream_args = ""; +Status disable_dynamic_table_addition_on_change_data_stream_action( + const ClusterAdminCli::CLIArguments& args, ClusterAdminClient* client) { + if (args.size() != 1) { + return ClusterAdminCli::kInvalidArguments; + } + + const string stream_id = args[0]; + string msg = Format("Failed to disable dynamic table addition on CDC stream $0", stream_id); + + RETURN_NOT_OK_PREPEND(client->DisableDynamicTableAdditionOnCDCSDKStream(stream_id), msg); + return Status::OK(); +} + +const auto remove_user_table_from_change_data_stream_args = " "; +Status remove_user_table_from_change_data_stream_action( + const ClusterAdminCli::CLIArguments& args, ClusterAdminClient* client) { + if (args.size() != 2) { + return ClusterAdminCli::kInvalidArguments; + } + + const string stream_id = args[0]; + const string table_id = args[1]; + string msg = Format("Failed to remove table $0 from CDC stream $1", table_id, stream_id); + + RETURN_NOT_OK_PREPEND(client->RemoveUserTableFromCDCSDKStream(stream_id, table_id), msg); + return Status::OK(); +} + +const auto validate_and_sync_cdc_state_table_entries_on_change_data_stream_args = ""; +Status validate_and_sync_cdc_state_table_entries_on_change_data_stream_action( + const ClusterAdminCli::CLIArguments& args, ClusterAdminClient* client) { + if (args.size() != 1) { + return ClusterAdminCli::kInvalidArguments; + } + + const string stream_id = args[0]; + string msg = + Format("Failed to validate and sync cdc state table entries for CDC stream $0", stream_id); + + RETURN_NOT_OK_PREPEND(client->ValidateAndSyncCDCStateEntriesForCDCSDKStream(stream_id), msg); + return Status::OK(); +} + const auto setup_universe_replication_args = " " " [] " @@ -2731,6 +2775,9 @@ void ClusterAdminCli::RegisterCommandHandlers() { REGISTER_COMMAND(list_change_data_streams); REGISTER_COMMAND(get_change_data_stream_info); REGISTER_COMMAND(ysql_backfill_change_data_stream_with_replication_slot); + REGISTER_COMMAND(disable_dynamic_table_addition_on_change_data_stream); + REGISTER_COMMAND(remove_user_table_from_change_data_stream); + REGISTER_COMMAND(validate_and_sync_cdc_state_table_entries_on_change_data_stream); // xCluster Source commands REGISTER_COMMAND(bootstrap_cdc_producer); REGISTER_COMMAND(list_cdc_streams); diff --git a/src/yb/tools/yb-admin_client.cc b/src/yb/tools/yb-admin_client.cc index 7d263f632b57..caacae7ae9c9 100644 --- a/src/yb/tools/yb-admin_client.cc +++ b/src/yb/tools/yb-admin_client.cc @@ -3792,6 +3792,84 @@ Status ClusterAdminClient::YsqlBackfillReplicationSlotNameToCDCSDKStream( return Status::OK(); } +Status ClusterAdminClient::DisableDynamicTableAdditionOnCDCSDKStream(const std::string& stream_id) { + master::DisableDynamicTableAdditionOnCDCSDKStreamRequestPB req; + master::DisableDynamicTableAdditionOnCDCSDKStreamResponsePB resp; + + req.set_stream_id(stream_id); + + RpcController rpc; + rpc.set_timeout(timeout_); + RETURN_NOT_OK( + master_replication_proxy_->DisableDynamicTableAdditionOnCDCSDKStream(req, &resp, &rpc)); + + if (resp.has_error()) { + cout << "Error disabling dynamic table addition from CDC stream: " + << resp.error().status().message() << endl; + return StatusFromPB(resp.error().status()); + } + + cout << "Successfully disabled dynamic table addition on CDC stream: " << stream_id << "\n"; + + return Status::OK(); +} + +Status ClusterAdminClient::RemoveUserTableFromCDCSDKStream( + const std::string& stream_id, const std::string& table_id) { + master::RemoveUserTableFromCDCSDKStreamRequestPB req; + master::RemoveUserTableFromCDCSDKStreamResponsePB resp; + + req.set_stream_id(stream_id); + req.set_table_id(table_id); + + RpcController rpc; + // Set a higher timeout since this RPC verifes that each cdc state table entry for the stream + // belongs to one of the tables in the stream metadata. + rpc.set_timeout(MonoDelta::FromSeconds(std::max(timeout_.ToSeconds(), 120.0))); + RETURN_NOT_OK(master_replication_proxy_->RemoveUserTableFromCDCSDKStream(req, &resp, &rpc)); + + if (resp.has_error()) { + cout << "Error removing user table from CDC stream: " << resp.error().status().message() + << endl; + return StatusFromPB(resp.error().status()); + } + + cout << "Successfully removed user table: " << table_id << " from CDC stream: " << stream_id + << "\n"; + + return Status::OK(); +} + +Status ClusterAdminClient::ValidateAndSyncCDCStateEntriesForCDCSDKStream( + const std::string& stream_id) { + master::ValidateAndSyncCDCStateEntriesForCDCSDKStreamRequestPB req; + master::ValidateAndSyncCDCStateEntriesForCDCSDKStreamResponsePB resp; + + req.set_stream_id(stream_id); + + RpcController rpc; + rpc.set_timeout(timeout_); + RETURN_NOT_OK( + master_replication_proxy_->ValidateAndSyncCDCStateEntriesForCDCSDKStream(req, &resp, &rpc)); + + if (resp.has_error()) { + cout << "Error validating CDC state table entries on CDC stream: " + << resp.error().status().message() << endl; + return StatusFromPB(resp.error().status()); + } + + cout << "Successfully validated and synced CDC state table entries on CDC stream: " << stream_id + << "\n"; + if (resp.updated_tablet_entries().size() > 0) { + cout << "Updated checkpoint for the stream's cdc state table entries for following tablet_ids: " + << AsString(resp.updated_tablet_entries()) << "\n"; + } else { + cout << "No additional entries found in cdc state table that requires update. \n"; + } + + return Status::OK(); +} + Status ClusterAdminClient::WaitForSetupUniverseReplicationToFinish( const string& replication_group_id) { master::IsSetupUniverseReplicationDoneRequestPB req; diff --git a/src/yb/tools/yb-admin_client.h b/src/yb/tools/yb-admin_client.h index 28a902f90df1..a8088b91a29d 100644 --- a/src/yb/tools/yb-admin_client.h +++ b/src/yb/tools/yb-admin_client.h @@ -409,6 +409,12 @@ class ClusterAdminClient { Status YsqlBackfillReplicationSlotNameToCDCSDKStream( const std::string& stream_id, const std::string& replication_slot_name); + Status DisableDynamicTableAdditionOnCDCSDKStream(const std::string& stream_id); + + Status RemoveUserTableFromCDCSDKStream(const std::string& stream_id, const std::string& table_id); + + Status ValidateAndSyncCDCStateEntriesForCDCSDKStream(const std::string& stream_id); + Status SetupNamespaceReplicationWithBootstrap(const std::string& replication_id, const std::vector& producer_addresses, const TypedNamespaceName& ns,