diff --git a/ent/src/yb/integration-tests/twodc-test.cc b/ent/src/yb/integration-tests/twodc-test.cc index e52ebf7a96a3..3f8cdd6f084f 100644 --- a/ent/src/yb/integration-tests/twodc-test.cc +++ b/ent/src/yb/integration-tests/twodc-test.cc @@ -221,6 +221,28 @@ class TwoDCTest : public YBTest { }, MonoDelta::FromSeconds(kRpcTimeout), "Verify universe replication"); } + Status ToggleUniverseReplication( + MiniCluster* consumer_cluster, YBClient* consumer_client, + const std::string& universe_id, bool is_enabled) { + master::SetUniverseReplicationEnabledRequestPB req; + master::SetUniverseReplicationEnabledResponsePB resp; + + req.set_producer_id(universe_id); + req.set_is_enabled(is_enabled); + + auto master_proxy = std::make_shared( + &consumer_client->proxy_cache(), + consumer_cluster->leader_mini_master()->bound_rpc_addr()); + + rpc::RpcController rpc; + rpc.set_timeout(MonoDelta::FromSeconds(kRpcTimeout)); + RETURN_NOT_OK(master_proxy->SetUniverseReplicationEnabled(req, &resp, &rpc)); + if (resp.has_error()) { + return StatusFromPB(resp.error().status()); + } + return Status::OK(); + } + Status VerifyUniverseReplicationDeleted(MiniCluster* consumer_cluster, YBClient* consumer_client, const std::string& universe_id, int timeout) { return LoggedWaitFor([=]() -> Result { @@ -706,6 +728,36 @@ TEST_F(TwoDCTest, BiDirectionalWrites) { Destroy(); } +TEST_F(TwoDCTest, ToggleReplicationEnabled) { + uint32_t replication_factor = NonTsanVsTsan(3, 1); + auto tables = ASSERT_RESULT(SetUpWithParams({2}, {2}, replication_factor)); + + std::vector> producer_tables; + // tables contains both producer and consumer universe tables (alternately). + // Pick out just the producer table from the list. + producer_tables.reserve(1); + producer_tables.push_back(tables[0]); + ASSERT_OK(SetupUniverseReplication( + producer_cluster(), consumer_cluster(), consumer_client(), kUniverseId, producer_tables)); + + // Verify that universe is now ACTIVE + master::GetUniverseReplicationResponsePB resp; + ASSERT_OK(VerifyUniverseReplication(consumer_cluster(), consumer_client(), kUniverseId, &resp)); + + // After we know the universe is ACTIVE, make sure all tablets are getting polled. + ASSERT_OK(CorrectlyPollingAllTablets(consumer_cluster(), 2)); + + // Disable the replication and ensure no tablets are being polled + ASSERT_OK(ToggleUniverseReplication(consumer_cluster(), consumer_client(), kUniverseId, false)); + ASSERT_OK(CorrectlyPollingAllTablets(consumer_cluster(), 0)); + + // Enable replication and ensure that all the tablets start being polled again + ASSERT_OK(ToggleUniverseReplication(consumer_cluster(), consumer_client(), kUniverseId, true)); + ASSERT_OK(CorrectlyPollingAllTablets(consumer_cluster(), 2)); + + Destroy(); +} + TEST_F(TwoDCTest, TestDeleteUniverse) { FLAGS_cdc_rpc_timeout_ms = 5000; FLAGS_mock_get_changes_response_for_consumer_testing = true; diff --git a/ent/src/yb/master/catalog_manager.h b/ent/src/yb/master/catalog_manager.h index 57a617e1365e..14555de3e831 100644 --- a/ent/src/yb/master/catalog_manager.h +++ b/ent/src/yb/master/catalog_manager.h @@ -123,6 +123,11 @@ class CatalogManager : public yb::master::CatalogManager { DeleteUniverseReplicationResponsePB* resp, rpc::RpcContext* rpc); + // Enable/Disable an Existing Universe Replication. + CHECKED_STATUS SetUniverseReplicationEnabled(const SetUniverseReplicationEnabledRequestPB* req, + SetUniverseReplicationEnabledResponsePB* resp, + rpc::RpcContext* rpc); + // Get Universe Replication. CHECKED_STATUS GetUniverseReplication(const GetUniverseReplicationRequestPB* req, GetUniverseReplicationResponsePB* resp, diff --git a/ent/src/yb/master/catalog_manager_ent.cc b/ent/src/yb/master/catalog_manager_ent.cc index acc8746826fa..052d0323f1df 100644 --- a/ent/src/yb/master/catalog_manager_ent.cc +++ b/ent/src/yb/master/catalog_manager_ent.cc @@ -2094,6 +2094,75 @@ void CatalogManager::DeleteUniverseReplicationUnlocked( } } +Status CatalogManager::SetUniverseReplicationEnabled( + const SetUniverseReplicationEnabledRequestPB* req, + SetUniverseReplicationEnabledResponsePB* resp, + rpc::RpcContext* rpc) { + LOG(INFO) << "Servicing SetUniverseReplicationEnabled request from " << RequestorString(rpc) + << ": " << req->ShortDebugString(); + + // Sanity Checking Cluster State and Input. + RETURN_NOT_OK(CheckOnline()); + + if (!req->has_producer_id()) { + Status s = STATUS(InvalidArgument, "Producer universe ID must be provided", req->DebugString()); + return SetupError(resp->mutable_error(), MasterErrorPB::INVALID_REQUEST, s); + } + if (!req->has_is_enabled()) { + Status s = STATUS(InvalidArgument, "Must explicitly set whether to enable", req->DebugString()); + return SetupError(resp->mutable_error(), MasterErrorPB::INVALID_REQUEST, s); + } + + scoped_refptr universe; + { + std::shared_lock l(lock_); + + universe = FindPtrOrNull(universe_replication_map_, req->producer_id()); + if (universe == nullptr) { + Status s = STATUS(NotFound, "Could not find CDC producer universe", req->DebugString()); + return SetupError(resp->mutable_error(), MasterErrorPB::OBJECT_NOT_FOUND, s); + } + } + + // Update the Master's Universe Config with the new state. + { + auto l = universe->LockForWrite(); + if (l->data().pb.state() != SysUniverseReplicationEntryPB::DISABLED && + l->data().pb.state() != SysUniverseReplicationEntryPB::ACTIVE) { + Status s = STATUS(InvalidArgument, + Format("Universe Replication in invalid state: $0. Retry or Delete.", + SysUniverseReplicationEntryPB::State_Name(l->data().pb.state())), + req->DebugString()); + return SetupError(resp->mutable_error(), MasterErrorPB::INVALID_REQUEST, s); + } + if (req->is_enabled()) { + l->mutable_data()->pb.set_state(SysUniverseReplicationEntryPB::ACTIVE); + } else { // DISABLE. + l->mutable_data()->pb.set_state(SysUniverseReplicationEntryPB::DISABLED); + } + RETURN_NOT_OK(sys_catalog_->UpdateItem(universe.get(), leader_ready_term_)); + l->Commit(); + } + + // Modify the Consumer Registry, which will fan out this info to all TServers on heartbeat. + { + auto l = cluster_config_->LockForWrite(); + auto producer_map = l->mutable_data()->pb.mutable_consumer_registry()->mutable_producer_map(); + auto it = producer_map->find(req->producer_id()); + if (it == producer_map->end()) { + LOG(WARNING) << "Valid Producer Universe not in Consumer Registry: " << req->producer_id(); + Status s = STATUS(NotFound, "Could not find CDC producer universe", req->DebugString()); + return SetupError(resp->mutable_error(), MasterErrorPB::OBJECT_NOT_FOUND, s); + } + (*it).second.set_disable_stream(!req->is_enabled()); + l->mutable_data()->pb.set_version(l->mutable_data()->pb.version() + 1); + RETURN_NOT_OK(sys_catalog_->UpdateItem(cluster_config_.get(), leader_ready_term_)); + l->Commit(); + } + + return Status::OK(); +} + Status CatalogManager::GetUniverseReplication(const GetUniverseReplicationRequestPB* req, GetUniverseReplicationResponsePB* resp, rpc::RpcContext* rpc) { diff --git a/ent/src/yb/tools/yb-admin_cli_ent.cc b/ent/src/yb/tools/yb-admin_cli_ent.cc index 7aca84413858..1aad374e333c 100644 --- a/ent/src/yb/tools/yb-admin_cli_ent.cc +++ b/ent/src/yb/tools/yb-admin_cli_ent.cc @@ -232,6 +232,21 @@ void ClusterAdminCli::RegisterCommandHandlers(ClusterAdminClientClass* client) { producer_id)); return Status::OK(); }); + + Register( + "set_universe_replication_enabled", " <0|1>", + [client](const CLIArguments& args) -> Status { + if (args.size() < 4) { + return ClusterAdminCli::kInvalidArguments; + } + const string producer_id = args[2]; + const bool is_enabled = atoi(args[3].c_str()) != 0; + RETURN_NOT_OK_PREPEND(client->SetUniverseReplicationEnabled(producer_id, is_enabled), + Substitute("Unable to $0 replication for universe $1", + is_enabled ? "enable" : "disable", + producer_id)); + return Status::OK(); + }); } } // namespace enterprise diff --git a/ent/src/yb/tools/yb-admin_client.h b/ent/src/yb/tools/yb-admin_client.h index 57854f2508bf..8e64ddab56b0 100644 --- a/ent/src/yb/tools/yb-admin_client.h +++ b/ent/src/yb/tools/yb-admin_client.h @@ -64,6 +64,9 @@ class ClusterAdminClient : public yb::tools::ClusterAdminClient { CHECKED_STATUS DeleteUniverseReplication(const std::string& producer_id); + CHECKED_STATUS SetUniverseReplicationEnabled(const std::string& producer_id, + bool is_enabled); + private: CHECKED_STATUS SendEncryptionRequest(const std::string& key_path, bool enable_encryption); diff --git a/ent/src/yb/tools/yb-admin_client_ent.cc b/ent/src/yb/tools/yb-admin_client_ent.cc index bfc2580d2007..30f2c80a2cca 100644 --- a/ent/src/yb/tools/yb-admin_client_ent.cc +++ b/ent/src/yb/tools/yb-admin_client_ent.cc @@ -615,6 +615,29 @@ Status ClusterAdminClient::DeleteUniverseReplication(const std::string& producer return Status::OK(); } +CHECKED_STATUS ClusterAdminClient::SetUniverseReplicationEnabled(const std::string& producer_id, + bool is_enabled) { + master::SetUniverseReplicationEnabledRequestPB req; + master::SetUniverseReplicationEnabledResponsePB resp; + req.set_producer_id(producer_id); + req.set_is_enabled(is_enabled); + const string toggle = (is_enabled ? "enabl" : "disabl"); + + RpcController rpc; + rpc.set_timeout(timeout_); + master_proxy_->SetUniverseReplicationEnabled(req, &resp, &rpc); + + if (resp.has_error()) { + cout << "Error " << toggle << "ing " + << "universe replication: " << resp.error().status().message() << endl; + return StatusFromPB(resp.error().status()); + } + + cout << "Replication " << toggle << "ed successfully" << endl; + return Status::OK(); +} + + } // namespace enterprise } // namespace tools } // namespace yb diff --git a/ent/src/yb/tserver/cdc_consumer.cc b/ent/src/yb/tserver/cdc_consumer.cc index b8d7d041f12e..a826361542cf 100644 --- a/ent/src/yb/tserver/cdc_consumer.cc +++ b/ent/src/yb/tserver/cdc_consumer.cc @@ -150,6 +150,9 @@ void CDCConsumer::UpdateInMemoryState(const cdc::ConsumerRegistryPB* consumer_re for (const auto& producer_map : DCHECK_NOTNULL(consumer_registry)->producer_map()) { const auto& producer_entry_pb = producer_map.second; proxy_manager_->UpdateProxies(producer_entry_pb); + if (producer_entry_pb.disable_stream()) { + continue; + } for (const auto& stream_entry : producer_entry_pb.stream_map()) { const auto& stream_entry_pb = stream_entry.second; for (const auto& tablet_entry : stream_entry_pb.consumer_producer_tablet_map()) { diff --git a/src/yb/cdc/cdc_consumer.proto b/src/yb/cdc/cdc_consumer.proto index fc42ed33df5e..f9353ef33856 100644 --- a/src/yb/cdc/cdc_consumer.proto +++ b/src/yb/cdc/cdc_consumer.proto @@ -54,6 +54,7 @@ message ProducerEntryPB { map stream_map = 1; repeated HostPortPB master_addrs = 2; repeated HostPortPB tserver_addrs = 3; + bool disable_stream = 4; // [default = false] implicit in proto3 } message ConsumerRegistryPB { diff --git a/src/yb/master/master.proto b/src/yb/master/master.proto index efb0c52ce51f..dca2098b24e0 100644 --- a/src/yb/master/master.proto +++ b/src/yb/master/master.proto @@ -406,6 +406,8 @@ message SysUniverseReplicationEntryPB { // creating CDC streams, starting subscribers. If any of these fail, we set the universe // replication state to FAILED. FAILED = 3; + // Disabled. + DISABLED = 6; // Deleted. DELETED = 4; // Error while cleaning up state of deleted entry. This indicates that universe replication has @@ -1556,6 +1558,16 @@ message DeleteUniverseReplicationResponsePB { optional MasterErrorPB error = 1; } + +message SetUniverseReplicationEnabledRequestPB { + optional string producer_id = 1; + optional bool is_enabled = 2; +} + +message SetUniverseReplicationEnabledResponsePB { + optional MasterErrorPB error = 1; +} + message GetUniverseReplicationRequestPB { optional string producer_id = 1; } @@ -1669,6 +1681,8 @@ service MasterService { returns (SetupUniverseReplicationResponsePB); rpc DeleteUniverseReplication(DeleteUniverseReplicationRequestPB) returns (DeleteUniverseReplicationResponsePB); + rpc SetUniverseReplicationEnabled(SetUniverseReplicationEnabledRequestPB) + returns (SetUniverseReplicationEnabledResponsePB); rpc GetUniverseReplication(GetUniverseReplicationRequestPB) returns (GetUniverseReplicationResponsePB); } diff --git a/src/yb/master/master_service.cc b/src/yb/master/master_service.cc index b19eda0b1c3b..93e2244cda4d 100644 --- a/src/yb/master/master_service.cc +++ b/src/yb/master/master_service.cc @@ -718,6 +718,13 @@ void MasterServiceImpl::DeleteUniverseReplication(const DeleteUniverseReplicatio HandleIn(req, resp, &rpc, &enterprise::CatalogManager::DeleteUniverseReplication); } +void MasterServiceImpl::SetUniverseReplicationEnabled( + const SetUniverseReplicationEnabledRequestPB* req, + SetUniverseReplicationEnabledResponsePB* resp, + rpc::RpcContext rpc) { + HandleIn(req, resp, &rpc, &enterprise::CatalogManager::SetUniverseReplicationEnabled); +} + void MasterServiceImpl::GetUniverseReplication(const GetUniverseReplicationRequestPB* req, GetUniverseReplicationResponsePB* resp, rpc::RpcContext rpc) { diff --git a/src/yb/master/master_service.h b/src/yb/master/master_service.h index 852ff7146e96..cfd491c6db12 100644 --- a/src/yb/master/master_service.h +++ b/src/yb/master/master_service.h @@ -252,6 +252,10 @@ class MasterServiceImpl : public MasterServiceIf, DeleteUniverseReplicationResponsePB* resp, rpc::RpcContext rpc) override; + void SetUniverseReplicationEnabled(const SetUniverseReplicationEnabledRequestPB* req, + SetUniverseReplicationEnabledResponsePB* resp, + rpc::RpcContext rpc) override; + void GetUniverseReplication(const GetUniverseReplicationRequestPB* req, GetUniverseReplicationResponsePB* resp, rpc::RpcContext rpc) override;