diff --git a/ent/src/yb/integration-tests/CMakeLists-include.txt b/ent/src/yb/integration-tests/CMakeLists-include.txt index ee2ca5cbf3d0..b36eb5196349 100644 --- a/ent/src/yb/integration-tests/CMakeLists-include.txt +++ b/ent/src/yb/integration-tests/CMakeLists-include.txt @@ -24,6 +24,7 @@ string(REPLACE ${CMAKE_SOURCE_DIR} ${CMAKE_SOURCE_DIR}/ent set(INTEGRATION_TESTS_SRCS_EXTENSIONS ${YB_ENT_CURRENT_SOURCE_DIR}/external_mini_cluster_ent.cc + ${YB_ENT_CURRENT_SOURCE_DIR}/twodc_test_base.cc PARENT_SCOPE) # Additional tests support. @@ -41,5 +42,6 @@ set(INTEGRATION_TESTS_EXTENSIONS_TESTS encryption-test cdc_service-int-test cdc_service-txn-test + twodc_ysql-test twodc-test PARENT_SCOPE) diff --git a/ent/src/yb/integration-tests/twodc-test.cc b/ent/src/yb/integration-tests/twodc-test.cc index eea9b90c22ea..1f6814eeec3c 100644 --- a/ent/src/yb/integration-tests/twodc-test.cc +++ b/ent/src/yb/integration-tests/twodc-test.cc @@ -44,6 +44,7 @@ #include "yb/gutil/strings/substitute.h" #include "yb/integration-tests/cdc_test_util.h" #include "yb/integration-tests/mini_cluster.h" +#include "yb/integration-tests/twodc_test_base.h" #include "yb/integration-tests/yb_mini_cluster_test_base.h" #include "yb/master/mini_master.h" #include "yb/master/master.h" @@ -68,18 +69,13 @@ using namespace std::literals; DECLARE_int32(replication_factor); -DECLARE_int32(cdc_read_rpc_timeout_ms); -DECLARE_int32(cdc_write_rpc_timeout_ms); DECLARE_bool(TEST_twodc_write_hybrid_time); DECLARE_int32(cdc_wal_retention_time_secs); -DECLARE_bool(TEST_check_broadcast_address); DECLARE_int32(replication_failure_delay_exponent); DECLARE_double(TEST_respond_write_failed_probability); DECLARE_int32(cdc_max_apply_batch_num_records); DECLARE_int32(async_replication_idle_delay_ms); DECLARE_int32(async_replication_max_idle_wait); -DECLARE_bool(flush_rocksdb_on_shutdown); -DECLARE_bool(cdc_enable_replicate_intents); namespace yb { @@ -101,11 +97,7 @@ using tserver::enterprise::CDCConsumer; namespace enterprise { -constexpr int kRpcTimeout = NonTsanVsTsan(30, 120); -static const std::string kUniverseId = "test_universe"; -static const std::string kNamespaceName = "test_namespace"; - -class TwoDCTest : public YBTest, public testing::WithParamInterface { +class TwoDCTest : public TwoDCTestBase, public testing::WithParamInterface { public: Result>> SetUpWithParams( const std::vector& num_consumer_tablets, @@ -113,36 +105,29 @@ class TwoDCTest : public YBTest, public testing::WithParamInterface { uint32_t replication_factor, uint32_t num_masters = 1) { FLAGS_enable_ysql = false; - // Allow for one-off network instability by ensuring a single CDC RPC timeout << test timeout. - FLAGS_cdc_read_rpc_timeout_ms = (kRpcTimeout / 4) * 1000; - FLAGS_cdc_write_rpc_timeout_ms = (kRpcTimeout / 4) * 1000; - // Not a useful test for us. 
It's testing Public+Private IP NW errors and we're only public - FLAGS_TEST_check_broadcast_address = false; FLAGS_cdc_max_apply_batch_num_records = GetParam(); - FLAGS_cdc_enable_replicate_intents = true; - FLAGS_flush_rocksdb_on_shutdown = false; + TwoDCTestBase::SetUp(); - YBTest::SetUp(); MiniClusterOptions opts; opts.num_tablet_servers = replication_factor; opts.num_masters = num_masters; FLAGS_replication_factor = replication_factor; opts.cluster_id = "producer"; - producer_cluster_ = std::make_unique(Env::Default(), opts); - RETURN_NOT_OK(producer_cluster_->StartSync()); - RETURN_NOT_OK(producer_cluster_->WaitForTabletServerCount(replication_factor)); + producer_cluster_.mini_cluster_ = std::make_unique(Env::Default(), opts); + RETURN_NOT_OK(producer_cluster()->StartSync()); + RETURN_NOT_OK(producer_cluster()->WaitForTabletServerCount(replication_factor)); opts.cluster_id = "consumer"; - consumer_cluster_ = std::make_unique(Env::Default(), opts); - RETURN_NOT_OK(consumer_cluster_->StartSync()); - RETURN_NOT_OK(consumer_cluster_->WaitForTabletServerCount(replication_factor)); + consumer_cluster_.mini_cluster_ = std::make_unique(Env::Default(), opts); + RETURN_NOT_OK(consumer_cluster()->StartSync()); + RETURN_NOT_OK(consumer_cluster()->WaitForTabletServerCount(replication_factor)); - producer_client_ = VERIFY_RESULT(producer_cluster_->CreateClient()); - consumer_client_ = VERIFY_RESULT(consumer_cluster_->CreateClient()); + producer_cluster_.client_ = VERIFY_RESULT(producer_cluster()->CreateClient()); + consumer_cluster_.client_ = VERIFY_RESULT(consumer_cluster()->CreateClient()); RETURN_NOT_OK(clock_->Init()); - producer_txn_mgr_.emplace(producer_client_.get(), clock_, client::LocalTabletFilter()); - consumer_txn_mgr_.emplace(consumer_client_.get(), clock_, client::LocalTabletFilter()); + producer_cluster_.txn_mgr_.emplace(producer_client(), clock_, client::LocalTabletFilter()); + consumer_cluster_.txn_mgr_.emplace(consumer_client(), clock_, client::LocalTabletFilter()); YBSchemaBuilder b; b.AddColumn("c0")->Type(INT32)->NotNull()->HashPrimaryKey(); @@ -167,15 +152,15 @@ class TwoDCTest : public YBTest, public testing::WithParamInterface { std::vector tables; std::vector> yb_tables; for (int i = 0; i < num_consumer_tablets.size(); i++) { - RETURN_NOT_OK(CreateTable(i, num_producer_tablets[i], producer_client_.get(), &tables)); + RETURN_NOT_OK(CreateTable(i, num_producer_tablets[i], producer_client(), &tables)); std::shared_ptr producer_table; - RETURN_NOT_OK(producer_client_->OpenTable(tables[i * 2], &producer_table)); + RETURN_NOT_OK(producer_client()->OpenTable(tables[i * 2], &producer_table)); yb_tables.push_back(producer_table); - RETURN_NOT_OK(CreateTable(i, num_consumer_tablets[i], consumer_client_.get(), + RETURN_NOT_OK(CreateTable(i, num_consumer_tablets[i], consumer_client(), consumer_schema, &tables)); std::shared_ptr consumer_table; - RETURN_NOT_OK(consumer_client_->OpenTable(tables[(i * 2) + 1], &consumer_table)); + RETURN_NOT_OK(consumer_client()->OpenTable(tables[(i * 2) + 1], &consumer_table)); yb_tables.push_back(consumer_table); } @@ -218,126 +203,6 @@ class TwoDCTest : public YBTest, public testing::WithParamInterface { return Status::OK(); } - Status SetupUniverseReplication( - MiniCluster* producer_cluster, MiniCluster* consumer_cluster, YBClient* consumer_client, - const std::string& universe_id, const std::vector>& tables, - bool leader_only = true) { - master::SetupUniverseReplicationRequestPB req; - master::SetupUniverseReplicationResponsePB resp; - - 
req.set_producer_id(universe_id); - string master_addr = producer_cluster->GetMasterAddresses(); - if (leader_only) master_addr = producer_cluster->leader_mini_master()->bound_rpc_addr_str(); - auto hp_vec = VERIFY_RESULT(HostPort::ParseStrings(master_addr, 0)); - HostPortsToPBs(hp_vec, req.mutable_producer_master_addresses()); - - req.mutable_producer_table_ids()->Reserve(tables.size()); - for (const auto& table : tables) { - req.add_producer_table_ids(table->id()); - } - - auto master_proxy = std::make_shared( - &consumer_client->proxy_cache(), - consumer_cluster->leader_mini_master()->bound_rpc_addr()); - - rpc::RpcController rpc; - rpc.set_timeout(MonoDelta::FromSeconds(kRpcTimeout)); - RETURN_NOT_OK(master_proxy->SetupUniverseReplication(req, &resp, &rpc)); - if (resp.has_error()) { - return STATUS(IllegalState, "Failed setting up universe replication"); - } - return Status::OK(); - } - - Status VerifyUniverseReplication( - MiniCluster* consumer_cluster, YBClient* consumer_client, - const std::string& universe_id, master::GetUniverseReplicationResponsePB* resp) { - return LoggedWaitFor([=]() -> Result { - master::GetUniverseReplicationRequestPB req; - req.set_producer_id(universe_id); - resp->Clear(); - - auto master_proxy = std::make_shared( - &consumer_client->proxy_cache(), - consumer_cluster->leader_mini_master()->bound_rpc_addr()); - rpc::RpcController rpc; - rpc.set_timeout(MonoDelta::FromSeconds(kRpcTimeout)); - - Status s = master_proxy->GetUniverseReplication(req, resp, &rpc); - return s.ok() && !resp->has_error() && - resp->entry().state() == master::SysUniverseReplicationEntryPB::ACTIVE; - }, MonoDelta::FromSeconds(kRpcTimeout), "Verify universe replication"); - } - - Status ToggleUniverseReplication( - MiniCluster* consumer_cluster, YBClient* consumer_client, - const std::string& universe_id, bool is_enabled) { - master::SetUniverseReplicationEnabledRequestPB req; - master::SetUniverseReplicationEnabledResponsePB resp; - - req.set_producer_id(universe_id); - req.set_is_enabled(is_enabled); - - auto master_proxy = std::make_shared( - &consumer_client->proxy_cache(), - consumer_cluster->leader_mini_master()->bound_rpc_addr()); - - rpc::RpcController rpc; - rpc.set_timeout(MonoDelta::FromSeconds(kRpcTimeout)); - RETURN_NOT_OK(master_proxy->SetUniverseReplicationEnabled(req, &resp, &rpc)); - if (resp.has_error()) { - return StatusFromPB(resp.error().status()); - } - return Status::OK(); - } - - Status VerifyUniverseReplicationDeleted(MiniCluster* consumer_cluster, YBClient* consumer_client, - const std::string& universe_id, int timeout) { - return LoggedWaitFor([=]() -> Result { - master::GetUniverseReplicationRequestPB req; - master::GetUniverseReplicationResponsePB resp; - req.set_producer_id(universe_id); - - auto master_proxy = std::make_shared( - &consumer_client->proxy_cache(), - consumer_cluster->leader_mini_master()->bound_rpc_addr()); - rpc::RpcController rpc; - rpc.set_timeout(MonoDelta::FromSeconds(kRpcTimeout)); - - Status s = master_proxy->GetUniverseReplication(req, &resp, &rpc); - return resp.has_error() && resp.error().code() == master::MasterErrorPB::OBJECT_NOT_FOUND; - }, MonoDelta::FromMilliseconds(timeout), "Verify universe replication deleted"); - } - - Status GetCDCStreamForTable( - const std::string& table_id, master::ListCDCStreamsResponsePB* resp) { - return LoggedWaitFor([=]() -> Result { - master::ListCDCStreamsRequestPB req; - req.set_table_id(table_id); - resp->Clear(); - - Status s = 
producer_cluster_->leader_mini_master()->master()->catalog_manager()-> - ListCDCStreams(&req, resp); - return s.ok() && !resp->has_error() && resp->streams_size() == 1; - }, MonoDelta::FromSeconds(kRpcTimeout), "Get CDC stream for table"); - } - - void Destroy() { - LOG(INFO) << "Destroying CDC Clusters"; - if (consumer_cluster_) { - consumer_cluster_->Shutdown(); - consumer_cluster_.reset(); - } - - if (producer_cluster_) { - producer_cluster_->Shutdown(); - producer_cluster_.reset(); - } - - producer_client_.reset(); - consumer_client_.reset(); - } - void WriteWorkload(uint32_t start, uint32_t end, YBClient* client, const YBTableName& table, bool delete_op = false) { auto session = client->NewSession(); @@ -355,19 +220,6 @@ class TwoDCTest : public YBTest, public testing::WithParamInterface { } } - uint32_t GetSuccessfulWriteOps(MiniCluster* cluster) { - uint32_t size = 0; - for (const auto& mini_tserver : cluster->mini_tablet_servers()) { - auto* tserver = dynamic_cast( - mini_tserver->server()); - CDCConsumer* cdc_consumer; - if (tserver && (cdc_consumer = tserver->GetCDCConsumer())) { - size += cdc_consumer->GetNumSuccessfulWriteRpcs(); - } - } - return size; - } - client::YBTransactionPtr CreateTransaction(client::TransactionManager* txn_mgr) { auto result = std::make_shared(txn_mgr); ReadHybridTime read_time; @@ -412,12 +264,11 @@ class TwoDCTest : public YBTest, public testing::WithParamInterface { return result; } - Status VerifyWrittenRecords(const YBTableName& producer_table, const YBTableName& consumer_table) { return LoggedWaitFor([=]() -> Result { - auto producer_results = ScanToStrings(producer_table, producer_client_.get()); - auto consumer_results = ScanToStrings(consumer_table, consumer_client_.get()); + auto producer_results = ScanToStrings(producer_table, producer_client()); + auto consumer_results = ScanToStrings(consumer_table, consumer_client()); return producer_results == consumer_results; }, MonoDelta::FromSeconds(kRpcTimeout), "Verify written records"); } @@ -429,96 +280,7 @@ class TwoDCTest : public YBTest, public testing::WithParamInterface { }, MonoDelta::FromSeconds(kRpcTimeout), "Verify number of records"); } - Status DeleteUniverseReplication(const std::string& universe_id) { - return DeleteUniverseReplication(universe_id, consumer_client(), consumer_cluster()); - } - - Status DeleteUniverseReplication( - const std::string& universe_id, YBClient* client, MiniCluster* cluster) { - master::DeleteUniverseReplicationRequestPB req; - master::DeleteUniverseReplicationResponsePB resp; - - req.set_producer_id(universe_id); - - auto master_proxy = std::make_shared( - &client->proxy_cache(), - cluster->leader_mini_master()->bound_rpc_addr()); - - rpc::RpcController rpc; - rpc.set_timeout(MonoDelta::FromSeconds(kRpcTimeout)); - RETURN_NOT_OK(master_proxy->DeleteUniverseReplication(req, &resp, &rpc)); - LOG(INFO) << "Delete universe succeeded"; - return Status::OK(); - } - - YBClient* producer_client() { - return producer_client_.get(); - } - - YBClient* consumer_client() { - return consumer_client_.get(); - } - - MiniCluster* producer_cluster() { - return producer_cluster_.get(); - } - - MiniCluster* consumer_cluster() { - return consumer_cluster_.get(); - } - - client::TransactionManager* producer_txn_mgr() { - return producer_txn_mgr_.get_ptr(); - } - - client::TransactionManager* consumer_txn_mgr() { - return consumer_txn_mgr_.get_ptr(); - } - - uint32_t NumProducerTabletsPolled(MiniCluster* cluster) { - uint32_t size = 0; - for (const auto& mini_tserver : 
cluster->mini_tablet_servers()) { - uint32_t new_size = 0; - auto* tserver = dynamic_cast( - mini_tserver->server()); - CDCConsumer* cdc_consumer; - if (tserver && (cdc_consumer = tserver->GetCDCConsumer())) { - auto tablets_running = cdc_consumer->TEST_producer_tablets_running(); - new_size = tablets_running.size(); - } - size += new_size; - } - return size; - } - - Status CorrectlyPollingAllTablets(MiniCluster* cluster, uint32_t num_producer_tablets) { - return LoggedWaitFor([=]() -> Result { - static int i = 0; - constexpr int kNumIterationsWithCorrectResult = 5; - auto cur_tablets = NumProducerTabletsPolled(cluster); - if (cur_tablets == num_producer_tablets) { - if (i++ == kNumIterationsWithCorrectResult) { - i = 0; - return true; - } - } else { - i = 0; - } - LOG(INFO) << "Tablets being polled: " << cur_tablets; - return false; - }, MonoDelta::FromSeconds(kRpcTimeout), "Num producer tablets being polled"); - } - - std::unique_ptr producer_cluster_; - std::unique_ptr consumer_cluster_; - private: - - std::unique_ptr producer_client_; - std::unique_ptr consumer_client_; - - boost::optional producer_txn_mgr_; - boost::optional consumer_txn_mgr_; server::ClockPtr clock_{new server::HybridClock()}; YBSchema schema_; @@ -773,19 +535,19 @@ TEST_P(TwoDCTest, PollWithConsumerRestart) { // After creating the cluster, make sure all tablets being polled for. ASSERT_OK(CorrectlyPollingAllTablets(consumer_cluster(), 4)); - consumer_cluster_->mini_tablet_server(0)->Shutdown(); + consumer_cluster()->mini_tablet_server(0)->Shutdown(); // After shutting down a single consumer node, the other consumers should pick up the slack. if (replication_factor > 1) { ASSERT_OK(CorrectlyPollingAllTablets(consumer_cluster(), 4)); } - ASSERT_OK(consumer_cluster_->mini_tablet_server(0)->Start()); + ASSERT_OK(consumer_cluster()->mini_tablet_server(0)->Start()); // After restarting the node. ASSERT_OK(CorrectlyPollingAllTablets(consumer_cluster(), 4)); - ASSERT_OK(consumer_cluster_->RestartSync()); + ASSERT_OK(consumer_cluster()->RestartSync()); // After consumer cluster restart. ASSERT_OK(CorrectlyPollingAllTablets(consumer_cluster(), 4)); @@ -820,7 +582,7 @@ TEST_P(TwoDCTest, PollWithProducerNodesRestart) { ASSERT_OK(producer_cluster()->WaitForAllTabletServers()); // Stop a TServer on the Producer after failing its master. - producer_cluster_->mini_tablet_server(0)->Shutdown(); + producer_cluster()->mini_tablet_server(0)->Shutdown(); // This Verifies: // 1. Consumer successfully transitions over to using the new master for Tablet lookup. // 2. Consumer cluster has rebalanced all the CDC Pollers @@ -830,7 +592,7 @@ TEST_P(TwoDCTest, PollWithProducerNodesRestart) { // Restart the Producer TServer and verify that rebalancing happens. ASSERT_OK(old_master->Start()); - ASSERT_OK(producer_cluster_->mini_tablet_server(0)->Start()); + ASSERT_OK(producer_cluster()->mini_tablet_server(0)->Start()); ASSERT_OK(CorrectlyPollingAllTablets(consumer_cluster(), 4)); WriteWorkload(6, 10, producer_client(), tables[0]->name()); ASSERT_OK(VerifyWrittenRecords(tables[0]->name(), tables[1]->name())); @@ -856,7 +618,7 @@ TEST_P(TwoDCTest, PollWithProducerClusterRestart) { ASSERT_OK(CorrectlyPollingAllTablets(consumer_cluster(), 4)); // Restart the ENTIRE Producer cluster. - ASSERT_OK(producer_cluster_->RestartSync()); + ASSERT_OK(producer_cluster()->RestartSync()); // After producer cluster restart. 
ASSERT_OK(CorrectlyPollingAllTablets(consumer_cluster(), 4)); @@ -1503,8 +1265,8 @@ TEST_P(TwoDCTest, TestProducerUniverseExpansion) { // Add new node and wait for tablets to be rebalanced. // After rebalancing, each node will be leader for 1 tablet. - ASSERT_OK(producer_cluster_->AddTabletServer()); - ASSERT_OK(producer_cluster_->WaitForTabletServerCount(2)); + ASSERT_OK(producer_cluster()->AddTabletServer()); + ASSERT_OK(producer_cluster()->WaitForTabletServerCount(2)); ASSERT_OK(WaitFor([&] () { return producer_client()->IsLoadBalanced(2); }, MonoDelta::FromSeconds(kRpcTimeout), "IsLoadBalanced")); @@ -1600,7 +1362,7 @@ TEST_P(TwoDCTest, TestInsertDeleteWorkloadWithRestart) { // Verify that both clusters have the same records. ASSERT_OK(VerifyWrittenRecords(tables[0]->name(), tables[1]->name())); - ASSERT_OK(consumer_cluster_->RestartSync()); + ASSERT_OK(consumer_cluster()->RestartSync()); // Verify that both clusters have the same records. ASSERT_OK(VerifyWrittenRecords(tables[0]->name(), tables[1]->name())); diff --git a/ent/src/yb/integration-tests/twodc_test_base.cc b/ent/src/yb/integration-tests/twodc_test_base.cc new file mode 100644 index 000000000000..b15ff5eb0f82 --- /dev/null +++ b/ent/src/yb/integration-tests/twodc_test_base.cc @@ -0,0 +1,235 @@ +// Copyright (c) YugaByte, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except +// in compliance with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations +// under the License. 
+//
+
+#include "yb/integration-tests/twodc_test_base.h"
+
+#include <string>
+
+#include "yb/cdc/cdc_service.h"
+#include "yb/client/client.h"
+
+#include "yb/integration-tests/cdc_test_util.h"
+#include "yb/integration-tests/mini_cluster.h"
+#include "yb/master/catalog_manager.h"
+#include "yb/master/master.h"
+#include "yb/master/mini_master.h"
+#include "yb/tserver/cdc_consumer.h"
+#include "yb/tserver/tablet_server.h"
+#include "yb/util/test_util.h"
+#include "yb/yql/pgwrapper/libpq_utils.h"
+#include "yb/yql/pgwrapper/pg_wrapper.h"
+
+namespace yb {
+
+using client::YBClient;
+using tserver::enterprise::CDCConsumer;
+
+namespace enterprise {
+
+void TwoDCTestBase::Destroy() {
+  LOG(INFO) << "Destroying CDC Clusters";
+  if (consumer_cluster()) {
+    if (consumer_cluster_.pg_supervisor_) {
+      consumer_cluster_.pg_supervisor_->Stop();
+    }
+    consumer_cluster_.mini_cluster_->Shutdown();
+    consumer_cluster_.mini_cluster_.reset();
+  }
+
+  if (producer_cluster()) {
+    if (producer_cluster_.pg_supervisor_) {
+      producer_cluster_.pg_supervisor_->Stop();
+    }
+    producer_cluster_.mini_cluster_->Shutdown();
+    producer_cluster_.mini_cluster_.reset();
+  }
+
+  producer_cluster_.client_.reset();
+  consumer_cluster_.client_.reset();
+}
+
+Status TwoDCTestBase::SetupUniverseReplication(
+    MiniCluster* producer_cluster, MiniCluster* consumer_cluster, YBClient* consumer_client,
+    const std::string& universe_id, const std::vector<std::shared_ptr<client::YBTable>>& tables,
+    bool leader_only) {
+  master::SetupUniverseReplicationRequestPB req;
+  master::SetupUniverseReplicationResponsePB resp;
+
+  req.set_producer_id(universe_id);
+  string master_addr = producer_cluster->GetMasterAddresses();
+  if (leader_only) master_addr = producer_cluster->leader_mini_master()->bound_rpc_addr_str();
+  auto hp_vec = VERIFY_RESULT(HostPort::ParseStrings(master_addr, 0));
+  HostPortsToPBs(hp_vec, req.mutable_producer_master_addresses());
+
+  req.mutable_producer_table_ids()->Reserve(tables.size());
+  for (const auto& table : tables) {
+    req.add_producer_table_ids(table->id());
+  }
+
+  auto master_proxy = std::make_shared<master::MasterServiceProxy>(
+      &consumer_client->proxy_cache(),
+      consumer_cluster->leader_mini_master()->bound_rpc_addr());
+
+  rpc::RpcController rpc;
+  rpc.set_timeout(MonoDelta::FromSeconds(kRpcTimeout));
+  RETURN_NOT_OK(master_proxy->SetupUniverseReplication(req, &resp, &rpc));
+  if (resp.has_error()) {
+    return STATUS(IllegalState, "Failed setting up universe replication");
+  }
+  return Status::OK();
+}
+
+Status TwoDCTestBase::VerifyUniverseReplication(
+    MiniCluster* consumer_cluster, YBClient* consumer_client,
+    const std::string& universe_id, master::GetUniverseReplicationResponsePB* resp) {
+  return LoggedWaitFor([=]() -> Result<bool> {
+    master::GetUniverseReplicationRequestPB req;
+    req.set_producer_id(universe_id);
+    resp->Clear();
+
+    auto master_proxy = std::make_shared<master::MasterServiceProxy>(
+        &consumer_client->proxy_cache(),
+        consumer_cluster->leader_mini_master()->bound_rpc_addr());
+    rpc::RpcController rpc;
+    rpc.set_timeout(MonoDelta::FromSeconds(kRpcTimeout));
+
+    Status s = master_proxy->GetUniverseReplication(req, resp, &rpc);
+    return s.ok() && !resp->has_error() &&
+           resp->entry().state() == master::SysUniverseReplicationEntryPB::ACTIVE;
+  }, MonoDelta::FromSeconds(kRpcTimeout), "Verify universe replication");
+}
+
+Status TwoDCTestBase::ToggleUniverseReplication(
+    MiniCluster* consumer_cluster, YBClient* consumer_client,
+    const std::string& universe_id, bool is_enabled) {
+  master::SetUniverseReplicationEnabledRequestPB req;
+  master::SetUniverseReplicationEnabledResponsePB resp;
+
+  req.set_producer_id(universe_id);
+  req.set_is_enabled(is_enabled);
+
+  auto master_proxy = std::make_shared<master::MasterServiceProxy>(
+      &consumer_client->proxy_cache(),
+      consumer_cluster->leader_mini_master()->bound_rpc_addr());
+
+  rpc::RpcController rpc;
+  rpc.set_timeout(MonoDelta::FromSeconds(kRpcTimeout));
+  RETURN_NOT_OK(master_proxy->SetUniverseReplicationEnabled(req, &resp, &rpc));
+  if (resp.has_error()) {
+    return StatusFromPB(resp.error().status());
+  }
+  return Status::OK();
+}
+
+Status TwoDCTestBase::VerifyUniverseReplicationDeleted(MiniCluster* consumer_cluster,
+    YBClient* consumer_client, const std::string& universe_id, int timeout) {
+  return LoggedWaitFor([=]() -> Result<bool> {
+    master::GetUniverseReplicationRequestPB req;
+    master::GetUniverseReplicationResponsePB resp;
+    req.set_producer_id(universe_id);
+
+    auto master_proxy = std::make_shared<master::MasterServiceProxy>(
+        &consumer_client->proxy_cache(),
+        consumer_cluster->leader_mini_master()->bound_rpc_addr());
+    rpc::RpcController rpc;
+    rpc.set_timeout(MonoDelta::FromSeconds(kRpcTimeout));
+
+    Status s = master_proxy->GetUniverseReplication(req, &resp, &rpc);
+    return resp.has_error() && resp.error().code() == master::MasterErrorPB::OBJECT_NOT_FOUND;
+  }, MonoDelta::FromMilliseconds(timeout), "Verify universe replication deleted");
+}
+
+Status TwoDCTestBase::GetCDCStreamForTable(
+    const std::string& table_id, master::ListCDCStreamsResponsePB* resp) {
+  return LoggedWaitFor([=]() -> Result<bool> {
+    master::ListCDCStreamsRequestPB req;
+    req.set_table_id(table_id);
+    resp->Clear();
+
+    Status s = producer_cluster()->leader_mini_master()->master()->catalog_manager()->
+        ListCDCStreams(&req, resp);
+    return s.ok() && !resp->has_error() && resp->streams_size() == 1;
+  }, MonoDelta::FromSeconds(kRpcTimeout), "Get CDC stream for table");
+}
+
+uint32_t TwoDCTestBase::GetSuccessfulWriteOps(MiniCluster* cluster) {
+  uint32_t size = 0;
+  for (const auto& mini_tserver : cluster->mini_tablet_servers()) {
+    auto* tserver = dynamic_cast<tserver::enterprise::TabletServer*>(mini_tserver->server());
+    CDCConsumer* cdc_consumer;
+    if (tserver && (cdc_consumer = tserver->GetCDCConsumer())) {
+      size += cdc_consumer->GetNumSuccessfulWriteRpcs();
+    }
+  }
+  return size;
+}
+
+Status TwoDCTestBase::DeleteUniverseReplication(const std::string& universe_id) {
+  return DeleteUniverseReplication(universe_id, consumer_client(), consumer_cluster());
+}
+
+Status TwoDCTestBase::DeleteUniverseReplication(
+    const std::string& universe_id, YBClient* client, MiniCluster* cluster) {
+  master::DeleteUniverseReplicationRequestPB req;
+  master::DeleteUniverseReplicationResponsePB resp;
+
+  req.set_producer_id(universe_id);
+
+  auto master_proxy = std::make_shared<master::MasterServiceProxy>(
+      &client->proxy_cache(),
+      cluster->leader_mini_master()->bound_rpc_addr());
+
+  rpc::RpcController rpc;
+  rpc.set_timeout(MonoDelta::FromSeconds(kRpcTimeout));
+  RETURN_NOT_OK(master_proxy->DeleteUniverseReplication(req, &resp, &rpc));
+  LOG(INFO) << "Delete universe succeeded";
+  return Status::OK();
+}
+
+uint32_t TwoDCTestBase::NumProducerTabletsPolled(MiniCluster* cluster) {
+  uint32_t size = 0;
+  for (const auto& mini_tserver : cluster->mini_tablet_servers()) {
+    uint32_t new_size = 0;
+    auto* tserver = dynamic_cast<tserver::enterprise::TabletServer*>(
+        mini_tserver->server());
+    CDCConsumer* cdc_consumer;
+    if (tserver && (cdc_consumer = tserver->GetCDCConsumer())) {
+      auto tablets_running = cdc_consumer->TEST_producer_tablets_running();
+      new_size = tablets_running.size();
+    }
+    size += new_size;
+  }
+  return size;
+}
+
+Status TwoDCTestBase::CorrectlyPollingAllTablets(
+    MiniCluster* cluster, uint32_t num_producer_tablets) {
+  return LoggedWaitFor([=]() -> Result<bool> {
+    static int i = 0;
+    constexpr int kNumIterationsWithCorrectResult = 5;
+    auto cur_tablets = NumProducerTabletsPolled(cluster);
+    if (cur_tablets == num_producer_tablets) {
+      if (i++ == kNumIterationsWithCorrectResult) {
+        i = 0;
+        return true;
+      }
+    } else {
+      i = 0;
+    }
+    LOG(INFO) << "Tablets being polled: " << cur_tablets;
+    return false;
+  }, MonoDelta::FromSeconds(kRpcTimeout), "Num producer tablets being polled");
+}
+
+} // namespace enterprise
+} // namespace yb
diff --git a/ent/src/yb/integration-tests/twodc_test_base.h b/ent/src/yb/integration-tests/twodc_test_base.h
new file mode 100644
index 000000000000..7c9ded8eafe7
--- /dev/null
+++ b/ent/src/yb/integration-tests/twodc_test_base.h
@@ -0,0 +1,139 @@
+// Copyright (c) YugaByte, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+// in compliance with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software distributed under the License
+// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+// or implied. See the License for the specific language governing permissions and limitations
+// under the License.
+//
+
+#ifndef ENT_SRC_YB_INTEGRATION_TESTS_TWODC_TEST_BASE_H
+#define ENT_SRC_YB_INTEGRATION_TESTS_TWODC_TEST_BASE_H
+
+#include <string>
+
+#include "yb/client/client.h"
+
+#include "yb/integration-tests/cdc_test_util.h"
+#include "yb/integration-tests/mini_cluster.h"
+#include "yb/master/master.h"
+
+#include "yb/util/test_util.h"
+#include "yb/yql/pgwrapper/libpq_utils.h"
+#include "yb/yql/pgwrapper/pg_wrapper.h"
+
+DECLARE_int32(cdc_read_rpc_timeout_ms);
+DECLARE_int32(cdc_write_rpc_timeout_ms);
+DECLARE_bool(TEST_check_broadcast_address);
+DECLARE_bool(flush_rocksdb_on_shutdown);
+DECLARE_bool(cdc_enable_replicate_intents);
+
+namespace yb {
+
+using client::YBClient;
+
+namespace enterprise {
+
+constexpr int kRpcTimeout = NonTsanVsTsan(30, 120);
+static const std::string kUniverseId = "test_universe";
+static const std::string kNamespaceName = "test_namespace";
+
+class TwoDCTestBase : public YBTest {
+ public:
+  class Cluster {
+   public:
+    std::unique_ptr<MiniCluster> mini_cluster_;
+    std::unique_ptr<YBClient> client_;
+    std::unique_ptr<pgwrapper::PgSupervisor> pg_supervisor_;
+    HostPort pg_host_port_;
+    boost::optional<client::TransactionManager> txn_mgr_;
+
+    Result<pgwrapper::PGConn> Connect() {
+      return pgwrapper::PGConn::Connect(pg_host_port_);
+    }
+
+    Result<pgwrapper::PGConn> ConnectToDB(const std::string& dbname) {
+      return pgwrapper::PGConn::Connect(pg_host_port_, dbname);
+    }
+  };
+
+  void SetUp() override {
+    YBTest::SetUp();
+    // Allow for one-off network instability by ensuring a single CDC RPC timeout << test timeout.
+    FLAGS_cdc_read_rpc_timeout_ms = (kRpcTimeout / 4) * 1000;
+    FLAGS_cdc_write_rpc_timeout_ms = (kRpcTimeout / 4) * 1000;
+    // Not a useful test for us.
It's testing Public+Private IP NW errors and we're only public + FLAGS_TEST_check_broadcast_address = false; + FLAGS_cdc_enable_replicate_intents = true; + FLAGS_flush_rocksdb_on_shutdown = false; + } + + void Destroy(); + + CHECKED_STATUS SetupUniverseReplication( + MiniCluster* producer_cluster, MiniCluster* consumer_cluster, YBClient* consumer_client, + const std::string& universe_id, const std::vector>& tables, + bool leader_only = true); + + CHECKED_STATUS VerifyUniverseReplication( + MiniCluster* consumer_cluster, YBClient* consumer_client, + const std::string& universe_id, master::GetUniverseReplicationResponsePB* resp); + + CHECKED_STATUS ToggleUniverseReplication( + MiniCluster* consumer_cluster, YBClient* consumer_client, + const std::string& universe_id, bool is_enabled); + + CHECKED_STATUS VerifyUniverseReplicationDeleted(MiniCluster* consumer_cluster, + YBClient* consumer_client, const std::string& universe_id, int timeout); + + CHECKED_STATUS GetCDCStreamForTable( + const std::string& table_id, master::ListCDCStreamsResponsePB* resp); + + uint32_t GetSuccessfulWriteOps(MiniCluster* cluster); + + CHECKED_STATUS DeleteUniverseReplication(const std::string& universe_id); + + CHECKED_STATUS DeleteUniverseReplication( + const std::string& universe_id, YBClient* client, MiniCluster* cluster); + + uint32_t NumProducerTabletsPolled(MiniCluster* cluster); + + CHECKED_STATUS CorrectlyPollingAllTablets(MiniCluster* cluster, uint32_t num_producer_tablets); + + YBClient* producer_client() { + return producer_cluster_.client_.get(); + } + + YBClient* consumer_client() { + return consumer_cluster_.client_.get(); + } + + MiniCluster* producer_cluster() { + return producer_cluster_.mini_cluster_.get(); + } + + MiniCluster* consumer_cluster() { + return consumer_cluster_.mini_cluster_.get(); + } + + client::TransactionManager* producer_txn_mgr() { + return producer_cluster_.txn_mgr_.get_ptr(); + } + + client::TransactionManager* consumer_txn_mgr() { + return consumer_cluster_.txn_mgr_.get_ptr(); + } + + protected: + Cluster producer_cluster_; + Cluster consumer_cluster_; +}; + +} // namespace enterprise +} // namespace yb + +#endif // ENT_SRC_YB_INTEGRATION_TESTS_TWODC_TEST_BASE_H diff --git a/ent/src/yb/integration-tests/twodc_ysql-test.cc b/ent/src/yb/integration-tests/twodc_ysql-test.cc new file mode 100644 index 000000000000..bd1b1dbf6caf --- /dev/null +++ b/ent/src/yb/integration-tests/twodc_ysql-test.cc @@ -0,0 +1,825 @@ +// Copyright (c) YugaByte, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except +// in compliance with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations +// under the License. 
+// + +#include +#include +#include +#include +#include +#include +#include +#include + +#include "yb/common/common.pb.h" +#include "yb/common/entity_ids.h" +#include "yb/common/wire_protocol.h" +#include "yb/common/schema.h" + +#include "yb/cdc/cdc_service.h" +#include "yb/cdc/cdc_service.pb.h" +#include "yb/cdc/cdc_service.proxy.h" +#include "yb/client/client.h" +#include "yb/client/client-test-util.h" +#include "yb/client/meta_cache.h" +#include "yb/client/schema.h" +#include "yb/client/session.h" +#include "yb/client/table.h" +#include "yb/client/table_alterer.h" +#include "yb/client/table_creator.h" +#include "yb/client/table_handle.h" +#include "yb/client/transaction.h" +#include "yb/client/yb_op.h" + +#include "yb/gutil/gscoped_ptr.h" +#include "yb/gutil/stl_util.h" +#include "yb/gutil/strings/join.h" +#include "yb/gutil/strings/substitute.h" +#include "yb/integration-tests/cdc_test_util.h" +#include "yb/integration-tests/mini_cluster.h" +#include "yb/integration-tests/twodc_test_base.h" +#include "yb/integration-tests/yb_mini_cluster_test_base.h" +#include "yb/master/catalog_entity_info.h" +#include "yb/master/catalog_manager.h" +#include "yb/master/mini_master.h" +#include "yb/master/master.h" +#include "yb/master/master.pb.h" +#include "yb/master/master-test-util.h" + +#include "yb/master/cdc_consumer_registry_service.h" +#include "yb/tablet/tablet.h" +#include "yb/tablet/tablet_peer.h" +#include "yb/tserver/mini_tablet_server.h" +#include "yb/tserver/tablet_server.h" +#include "yb/tserver/ts_tablet_manager.h" + +#include "yb/tserver/cdc_consumer.h" +#include "yb/util/atomic.h" +#include "yb/util/faststring.h" +#include "yb/util/format.h" +#include "yb/util/monotime.h" +#include "yb/util/random.h" +#include "yb/util/random_util.h" +#include "yb/util/result.h" +#include "yb/util/stopwatch.h" +#include "yb/util/test_util.h" +#include "yb/yql/pgwrapper/libpq_utils.h" +#include "yb/yql/pgwrapper/pg_wrapper.h" + +DECLARE_int32(replication_factor); +DECLARE_int32(cdc_max_apply_batch_num_records); +DECLARE_int32(client_read_write_timeout_ms); +DECLARE_int32(pgsql_proxy_webserver_port); +DECLARE_bool(master_auto_run_initdb); +DECLARE_bool(hide_pg_catalog_table_creation_logs); +DECLARE_int32(pggate_rpc_timeout_secs); + +namespace yb { + +using client::YBClient; +using client::YBClientBuilder; +using client::YBColumnSchema; +using client::YBError; +using client::YBSchema; +using client::YBSchemaBuilder; +using client::YBSession; +using client::YBTable; +using client::YBTableAlterer; +using client::YBTableCreator; +using client::YBTableType; +using client::YBTableName; +using master::GetNamespaceInfoResponsePB; +using master::MiniMaster; +using tserver::MiniTabletServer; +using tserver::enterprise::CDCConsumer; + +using pgwrapper::AsString; +using pgwrapper::GetInt32; +using pgwrapper::PGConn; +using pgwrapper::PGResultPtr; +using pgwrapper::PgSupervisor; + +namespace enterprise { + +constexpr static const char* const kKeyColumnName = "key"; + +class TwoDCYsqlTest : public TwoDCTestBase, public testing::WithParamInterface { + public: + Result>> + SetUpWithParams(std::vector num_consumer_tablets, + std::vector num_producer_tablets, + uint32_t replication_factor, + uint32_t num_masters = 1, + bool colocated = false) { + master::SetDefaultInitialSysCatalogSnapshotFlags(); + TwoDCTestBase::SetUp(); + FLAGS_enable_ysql = true; + FLAGS_master_auto_run_initdb = true; + FLAGS_hide_pg_catalog_table_creation_logs = true; + FLAGS_pggate_rpc_timeout_secs = 120; + FLAGS_cdc_max_apply_batch_num_records = 
GetParam(); + + MiniClusterOptions opts; + opts.num_tablet_servers = replication_factor; + opts.num_masters = num_masters; + FLAGS_replication_factor = replication_factor; + opts.cluster_id = "producer"; + producer_cluster_.mini_cluster_ = std::make_unique(env_.get(), opts); + RETURN_NOT_OK(producer_cluster()->StartSync()); + RETURN_NOT_OK(producer_cluster()->WaitForTabletServerCount(replication_factor)); + RETURN_NOT_OK(WaitForInitDb(producer_cluster())); + + opts.cluster_id = "consumer"; + consumer_cluster_.mini_cluster_ = std::make_unique(env_.get(), opts); + RETURN_NOT_OK(consumer_cluster()->StartSync()); + RETURN_NOT_OK(consumer_cluster()->WaitForTabletServerCount(replication_factor)); + RETURN_NOT_OK(WaitForInitDb(consumer_cluster())); + + producer_cluster_.client_ = VERIFY_RESULT(producer_cluster()->CreateClient()); + consumer_cluster_.client_ = VERIFY_RESULT(consumer_cluster()->CreateClient()); + + RETURN_NOT_OK(InitPostgres(&producer_cluster_)); + RETURN_NOT_OK(InitPostgres(&consumer_cluster_)); + + if (num_consumer_tablets.size() != num_producer_tablets.size()) { + return STATUS(IllegalState, + Format("Num consumer tables: $0 num producer tables: $1 must be equal.", + num_consumer_tablets.size(), num_producer_tablets.size())); + } + + RETURN_NOT_OK(CreateDatabase(&producer_cluster_, kNamespaceName, colocated)); + RETURN_NOT_OK(CreateDatabase(&consumer_cluster_, kNamespaceName, colocated)); + + std::vector tables; + std::vector> yb_tables; + for (int i = 0; i < num_consumer_tablets.size(); i++) { + RETURN_NOT_OK(CreateTable(i, num_producer_tablets[i], &producer_cluster_, + &tables, colocated)); + std::shared_ptr producer_table; + RETURN_NOT_OK(producer_client()->OpenTable(tables[i * 2], &producer_table)); + yb_tables.push_back(producer_table); + + RETURN_NOT_OK(CreateTable(i, num_consumer_tablets[i], &consumer_cluster_, + &tables, colocated)); + std::shared_ptr consumer_table; + RETURN_NOT_OK(consumer_client()->OpenTable(tables[(i * 2) + 1], &consumer_table)); + yb_tables.push_back(consumer_table); + } + + return yb_tables; + } + + Status InitPostgres(Cluster* cluster) { + auto pg_ts = RandomElement(cluster->mini_cluster_->mini_tablet_servers()); + auto port = cluster->mini_cluster_->AllocateFreePort(); + yb::pgwrapper::PgProcessConf pg_process_conf = + VERIFY_RESULT(yb::pgwrapper::PgProcessConf::CreateValidateAndRunInitDb( + yb::ToString(Endpoint(pg_ts->bound_rpc_addr().address(), port)), + pg_ts->options()->fs_opts.data_paths.front() + "/pg_data", + pg_ts->server()->GetSharedMemoryFd())); + pg_process_conf.master_addresses = pg_ts->options()->master_addresses_flag; + pg_process_conf.force_disable_log_file = true; + FLAGS_pgsql_proxy_webserver_port = cluster->mini_cluster_->AllocateFreePort(); + + LOG(INFO) << "Starting PostgreSQL server listening on " + << pg_process_conf.listen_addresses << ":" << pg_process_conf.pg_port << ", data: " + << pg_process_conf.data_dir + << ", pgsql webserver port: " << FLAGS_pgsql_proxy_webserver_port; + cluster->pg_supervisor_ = std::make_unique(pg_process_conf); + RETURN_NOT_OK(cluster->pg_supervisor_->Start()); + + cluster->pg_host_port_ = HostPort(pg_process_conf.listen_addresses, pg_process_conf.pg_port); + return Status::OK(); + } + + Status CreateDatabase(Cluster* cluster, + const std::string& namespace_name = kNamespaceName, + bool colocated = false) { + auto conn = EXPECT_RESULT(cluster->Connect()); + EXPECT_OK(conn.ExecuteFormat("CREATE DATABASE $0$1", + namespace_name, colocated ? 
" colocated = true" : "")); + return Status::OK(); + } + + Result GetUniverseId(Cluster* cluster) { + master::GetMasterClusterConfigRequestPB req; + master::GetMasterClusterConfigResponsePB resp; + + auto master_proxy = std::make_shared( + &cluster->client_->proxy_cache(), + cluster->mini_cluster_->leader_mini_master()->bound_rpc_addr()); + + rpc::RpcController rpc; + rpc.set_timeout(MonoDelta::FromSeconds(kRpcTimeout)); + RETURN_NOT_OK(master_proxy->GetMasterClusterConfig(req, &resp, &rpc)); + if (resp.has_error()) { + return STATUS(IllegalState, "Error getting cluster config"); + } + return resp.cluster_config().cluster_uuid(); + } + + Result CreateTable(Cluster* cluster, + const std::string& namespace_name, + const std::string& table_name, + uint32_t num_tablets, + bool colocated = false, + const int table_oid = 0) { + auto conn = EXPECT_RESULT(cluster->ConnectToDB(namespace_name)); + std::string table_oid_string = ""; + if (table_oid > 0) { + // Need to turn on session flag to allow for CREATE WITH table_oid. + EXPECT_OK(conn.Execute("set yb_enable_create_with_table_oid=true")); + table_oid_string = Format("table_oid = $0,", table_oid); + } + EXPECT_OK(conn.ExecuteFormat( + "CREATE TABLE $0($1 int PRIMARY KEY) WITH ($2colocated = $3) SPLIT INTO $4 TABLETS", + table_name, kKeyColumnName, table_oid_string, colocated, num_tablets)); + return GetTable(cluster, namespace_name, table_name); + } + + Status CreateTable(uint32_t idx, uint32_t num_tablets, Cluster* cluster, + std::vector* tables, bool colocated = false) { + // Generate table_oid based on index so that we have the same table_oid for producer/consumer. + const int table_oid = colocated ? (idx + 1) * 111111 : 0; + auto table = VERIFY_RESULT(CreateTable(cluster, kNamespaceName, Format("test_table_$0", idx), + num_tablets, colocated, table_oid)); + tables->push_back(table); + return Status::OK(); + } + + Result GetTable(Cluster* cluster, + const std::string& namespace_name, + const std::string& table_name, + bool verify_table_name = true, + bool exclude_system_tables = true) { + master::ListTablesRequestPB req; + master::ListTablesResponsePB resp; + + req.set_name_filter(table_name); + req.mutable_namespace_()->set_name(namespace_name); + req.mutable_namespace_()->set_database_type(YQL_DATABASE_PGSQL); + if (!exclude_system_tables) { + req.set_exclude_system_tables(true); + req.add_relation_type_filter(master::USER_TABLE_RELATION); + } + + auto master_proxy = std::make_shared( + &cluster->client_->proxy_cache(), + cluster->mini_cluster_->leader_mini_master()->bound_rpc_addr()); + + rpc::RpcController rpc; + rpc.set_timeout(MonoDelta::FromSeconds(kRpcTimeout)); + RETURN_NOT_OK(master_proxy->ListTables(req, &resp, &rpc)); + if (resp.has_error()) { + return STATUS(IllegalState, "Failed listing tables"); + } + + // Now need to find the table and return it. + for (const auto& table : resp.tables()) { + // If !verify_table_name, just return the first table. 
+ if (!verify_table_name || + (table.name() == table_name && table.namespace_().name() == namespace_name)) { + YBTableName yb_table; + yb_table.set_table_id(table.id()); + yb_table.set_namespace_id(table.namespace_().id()); + return yb_table; + } + } + return STATUS(IllegalState, + strings::Substitute("Unable to find table $0 in namespace $1", + table_name, namespace_name)); + } + + void WriteWorkload(uint32_t start, uint32_t end, Cluster* cluster, const YBTableName& table, + bool delete_op = false) { + auto conn = EXPECT_RESULT(cluster->ConnectToDB(table.namespace_name())); + + LOG(INFO) << "Writing " << end-start << (delete_op ? " deletes" : " inserts"); + for (uint32_t i = start; i < end; i++) { + if (delete_op) { + EXPECT_OK(conn.ExecuteFormat("DELETE FROM $0 WHERE $1 = $2", + table.table_name(), kKeyColumnName, i)); + } else { + // TODO(#6582) transactions currently don't work, so don't use ON CONFLICT DO NOTHING now. + EXPECT_OK(conn.ExecuteFormat("INSERT INTO $0($1) VALUES ($2)", // ON CONFLICT DO NOTHING", + table.table_name(), kKeyColumnName, i)); + } + } + } + + void WriteTransactionalWorkload(uint32_t start, uint32_t end, Cluster* cluster, + const YBTableName& table) { + auto conn = EXPECT_RESULT(cluster->ConnectToDB(table.namespace_name())); + EXPECT_OK(conn.Execute("BEGIN")); + for (uint32_t i = start; i < end; i++) { + EXPECT_OK(conn.ExecuteFormat("INSERT INTO $0($1) VALUES ($2) ON CONFLICT DO NOTHING", + table.table_name(), kKeyColumnName, i)); + } + EXPECT_OK(conn.Execute("COMMIT")); + } + + void DeleteWorkload(uint32_t start, uint32_t end, Cluster* cluster, const YBTableName& table) { + WriteWorkload(start, end, cluster, table, true /* delete_op */); + } + + PGResultPtr ScanToStrings(const YBTableName& table_name, Cluster* cluster) { + auto conn = EXPECT_RESULT(cluster->ConnectToDB(table_name.namespace_name())); + auto result = + EXPECT_RESULT(conn.FetchFormat("SELECT * FROM $0 ORDER BY key", table_name.table_name())); + return result; + } + + Status VerifyWrittenRecords(const YBTableName& producer_table, + const YBTableName& consumer_table) { + return LoggedWaitFor([=]() -> Result { + auto producer_results = ScanToStrings(producer_table, &producer_cluster_); + auto consumer_results = ScanToStrings(consumer_table, &consumer_cluster_); + if (PQntuples(producer_results.get()) != PQntuples(consumer_results.get())) { + return false; + } + for (int i = 0; i < PQntuples(producer_results.get()); ++i) { + auto prod_val = EXPECT_RESULT(AsString(producer_results.get(), i, 0)); + auto cons_val = EXPECT_RESULT(AsString(consumer_results.get(), i, 0)); + if (prod_val != cons_val) { + return false; + } + } + return true; + }, MonoDelta::FromSeconds(kRpcTimeout), "Verify written records"); + } + + Status VerifyNumRecords(const YBTableName& table, Cluster* cluster, int expected_size) { + return LoggedWaitFor([=]() -> Result { + auto results = ScanToStrings(table, cluster); + return PQntuples(results.get()) == expected_size; + }, MonoDelta::FromSeconds(kRpcTimeout), "Verify number of records"); + } +}; + +INSTANTIATE_TEST_CASE_P(BatchSize, TwoDCYsqlTest, ::testing::Values(1, 100)); + +TEST_P(TwoDCYsqlTest, YB_DISABLE_TEST_IN_TSAN(SetupUniverseReplication)) { + auto tables = ASSERT_RESULT(SetUpWithParams({8, 4}, {6, 6}, 3, 1, false /* colocated */)); + const string kUniverseId = ASSERT_RESULT(GetUniverseId(&producer_cluster_)); + + std::vector> producer_tables; + // tables contains both producer and consumer universe tables (alternately). + // Pick out just the producer tables from the list. 
+ producer_tables.reserve(tables.size() / 2); + for (int i = 0; i < tables.size(); i += 2) { + producer_tables.push_back(tables[i]); + } + ASSERT_OK(SetupUniverseReplication( + producer_cluster(), consumer_cluster(), consumer_client(), kUniverseId, producer_tables)); + + // Verify that universe was setup on consumer. + master::GetUniverseReplicationResponsePB resp; + ASSERT_OK(VerifyUniverseReplication(consumer_cluster(), consumer_client(), kUniverseId, &resp)); + ASSERT_EQ(resp.entry().producer_id(), kUniverseId); + ASSERT_EQ(resp.entry().tables_size(), producer_tables.size()); + for (int i = 0; i < producer_tables.size(); i++) { + ASSERT_EQ(resp.entry().tables(i), producer_tables[i]->id()); + } + + // Verify that CDC streams were created on producer for all tables. + for (int i = 0; i < producer_tables.size(); i++) { + master::ListCDCStreamsResponsePB stream_resp; + ASSERT_OK(GetCDCStreamForTable(producer_tables[i]->id(), &stream_resp)); + ASSERT_EQ(stream_resp.streams_size(), 1); + ASSERT_EQ(stream_resp.streams(0).table_id(), producer_tables[i]->id()); + } + + ASSERT_OK(DeleteUniverseReplication(kUniverseId)); + + Destroy(); +} + +TEST_P(TwoDCYsqlTest, YB_DISABLE_TEST_IN_TSAN(SimpleReplication)) { + constexpr int kNTabletsPerTable = 1; + std::vector tables_vector = {kNTabletsPerTable, kNTabletsPerTable}; + auto tables = ASSERT_RESULT(SetUpWithParams(tables_vector, tables_vector, 1)); + const string kUniverseId = ASSERT_RESULT(GetUniverseId(&producer_cluster_)); + + // tables contains both producer and consumer universe tables (alternately). + // Pick out just the producer tables from the list. + std::vector> producer_tables; + std::vector> consumer_tables; + producer_tables.reserve(tables.size() / 2); + consumer_tables.reserve(tables.size() / 2); + for (int i = 0; i < tables.size(); ++i) { + if (i % 2 == 0) { + producer_tables.push_back(tables[i]); + } else { + consumer_tables.push_back(tables[i]); + } + } + + // 1. Write some data. + for (const auto& producer_table : producer_tables) { + LOG(INFO) << "Writing records for table " << producer_table->name().ToString(); + WriteWorkload(0, 100, &producer_cluster_, producer_table->name()); + } + + // Verify data is written on the producer. + for (const auto& producer_table : producer_tables) { + auto producer_results = ScanToStrings(producer_table->name(), &producer_cluster_); + ASSERT_EQ(100, PQntuples(producer_results.get())); + int result; + for (int i = 0; i < 100; ++i) { + result = ASSERT_RESULT(GetInt32(producer_results.get(), i, 0)); + ASSERT_EQ(i, result); + } + } + + // 2. Setup replication. + ASSERT_OK(SetupUniverseReplication(producer_cluster(), consumer_cluster(), consumer_client(), + kUniverseId, producer_tables)); + + // 3. Verify everything is setup correctly. 
+ master::GetUniverseReplicationResponsePB get_universe_replication_resp; + ASSERT_OK(VerifyUniverseReplication(consumer_cluster(), consumer_client(), kUniverseId, + &get_universe_replication_resp)); + ASSERT_OK(CorrectlyPollingAllTablets(consumer_cluster(), + tables_vector.size() * kNTabletsPerTable)); + + auto data_replicated_correctly = [&](int num_results) -> Result { + for (const auto& consumer_table : consumer_tables) { + LOG(INFO) << "Checking records for table " << consumer_table->name().ToString(); + auto consumer_results = ScanToStrings(consumer_table->name(), &consumer_cluster_); + + if (num_results != PQntuples(consumer_results.get())) { + return false; + } + int result; + for (int i = 0; i < num_results; ++i) { + result = VERIFY_RESULT(GetInt32(consumer_results.get(), i, 0)); + if (i != result) { + return false; + } + } + } + return true; + }; + ASSERT_OK(WaitFor([&]() -> Result { return data_replicated_correctly(100); }, + MonoDelta::FromSeconds(20), "IsDataReplicatedCorrectly")); + + // 4. Write more data. + for (const auto& producer_table : producer_tables) { + WriteWorkload(100, 105, &producer_cluster_, producer_table->name()); + } + + // 5. Make sure this data is also replicated now. + ASSERT_OK(WaitFor([&]() { return data_replicated_correctly(105); }, + MonoDelta::FromSeconds(20), "IsDataReplicatedCorrectly")); + Destroy(); +} + +TEST_P(TwoDCYsqlTest, YB_DISABLE_TEST_IN_TSAN(SetupUniverseReplicationWithProducerBootstrapId)) { + constexpr int kNTabletsPerTable = 1; + std::vector tables_vector = {kNTabletsPerTable, kNTabletsPerTable}; + auto tables = ASSERT_RESULT(SetUpWithParams(tables_vector, tables_vector, 3)); + const string kUniverseId = ASSERT_RESULT(GetUniverseId(&producer_cluster_)); + auto producer_master_proxy = std::make_shared( + &producer_client()->proxy_cache(), + producer_cluster()->leader_mini_master()->bound_rpc_addr()); + + std::unique_ptr client; + std::unique_ptr producer_cdc_proxy; + client = ASSERT_RESULT(consumer_cluster()->CreateClient()); + producer_cdc_proxy = std::make_unique( + &client->proxy_cache(), + HostPort::FromBoundEndpoint(producer_cluster()->mini_tablet_server(0)->bound_rpc_addr())); + + // tables contains both producer and consumer universe tables (alternately). + // Pick out just the producer tables from the list. + std::vector> producer_tables; + std::vector> consumer_tables; + producer_tables.reserve(tables.size() / 2); + consumer_tables.reserve(tables.size() / 2); + for (int i = 0; i < tables.size(); ++i) { + if (i % 2 == 0) { + producer_tables.push_back(tables[i]); + } else { + consumer_tables.push_back(tables[i]); + } + } + + // 1. Write some data so that we can verify that only new records get replicated. 
+ for (const auto& producer_table : producer_tables) { + LOG(INFO) << "Writing records for table " << producer_table->name().ToString(); + WriteWorkload(0, 100, &producer_cluster_, producer_table->name()); + } + + SleepFor(MonoDelta::FromSeconds(10)); + cdc::BootstrapProducerRequestPB req; + cdc::BootstrapProducerResponsePB resp; + + for (const auto& producer_table : producer_tables) { + req.add_table_ids(producer_table->id()); + } + + rpc::RpcController rpc; + producer_cdc_proxy->BootstrapProducer(req, &resp, &rpc); + ASSERT_FALSE(resp.has_error()); + + ASSERT_EQ(resp.cdc_bootstrap_ids().size(), producer_tables.size()); + + int table_idx = 0; + for (const auto& bootstrap_id : resp.cdc_bootstrap_ids()) { + LOG(INFO) << "Got bootstrap id " << bootstrap_id + << " for table " << producer_tables[table_idx++]->name().table_name(); + } + + std::unordered_map tablet_bootstraps; + + // Verify that for each of the table's tablets, a new row in cdc_state table with the returned + // id was inserted. + client::TableHandle table; + client::YBTableName cdc_state_table( + YQL_DATABASE_CQL, master::kSystemNamespaceName, master::kCdcStateTableName); + ASSERT_OK(table.Open(cdc_state_table, producer_client())); + + // 2 tables with 8 tablets each. + ASSERT_EQ(tables_vector.size() * kNTabletsPerTable, boost::size(client::TableRange(table))); + int nrows = 0; + for (const auto& row : client::TableRange(table)) { + nrows++; + string stream_id = row.column(0).string_value(); + tablet_bootstraps[stream_id]++; + + string checkpoint = row.column(2).string_value(); + auto s = OpId::FromString(checkpoint); + ASSERT_OK(s); + OpId op_id = *s; + ASSERT_GT(op_id.index, 0); + + LOG(INFO) << "Bootstrap id " << stream_id + << " for tablet " << row.column(1).string_value(); + } + + ASSERT_EQ(tablet_bootstraps.size(), producer_tables.size()); + // Check that each bootstrap id has 8 tablets. + for (const auto& e : tablet_bootstraps) { + ASSERT_EQ(e.second, kNTabletsPerTable); + } + + // Map table -> bootstrap_id. We will need when setting up replication. + std::unordered_map table_bootstrap_ids; + for (int i = 0; i < resp.cdc_bootstrap_ids_size(); i++) { + table_bootstrap_ids[req.table_ids(i)] = resp.cdc_bootstrap_ids(i); + } + + // 2. Setup replication. + master::SetupUniverseReplicationRequestPB setup_universe_req; + master::SetupUniverseReplicationResponsePB setup_universe_resp; + setup_universe_req.set_producer_id(kUniverseId); + string master_addr = producer_cluster()->GetMasterAddresses(); + auto hp_vec = ASSERT_RESULT(HostPort::ParseStrings(master_addr, 0)); + HostPortsToPBs(hp_vec, setup_universe_req.mutable_producer_master_addresses()); + + setup_universe_req.mutable_producer_table_ids()->Reserve(producer_tables.size()); + for (const auto& producer_table : producer_tables) { + setup_universe_req.add_producer_table_ids(producer_table->id()); + const auto& iter = table_bootstrap_ids.find(producer_table->id()); + ASSERT_NE(iter, table_bootstrap_ids.end()); + setup_universe_req.add_producer_bootstrap_ids(iter->second); + } + + auto master_proxy = std::make_shared( + &consumer_client()->proxy_cache(), + consumer_cluster()->leader_mini_master()->bound_rpc_addr()); + + rpc.Reset(); + rpc.set_timeout(MonoDelta::FromSeconds(kRpcTimeout)); + ASSERT_OK(master_proxy->SetupUniverseReplication(setup_universe_req, &setup_universe_resp, &rpc)); + ASSERT_FALSE(setup_universe_resp.has_error()); + + // 3. Verify everything is setup correctly. 
+ master::GetUniverseReplicationResponsePB get_universe_replication_resp; + ASSERT_OK(VerifyUniverseReplication(consumer_cluster(), consumer_client(), kUniverseId, + &get_universe_replication_resp)); + ASSERT_OK(CorrectlyPollingAllTablets(consumer_cluster(), + tables_vector.size() * kNTabletsPerTable)); + + // 4. Write more data. + for (const auto& producer_table : producer_tables) { + WriteWorkload(1000, 1005, &producer_cluster_, producer_table->name()); + } + + // 5. Verify that only new writes get replicated to consumer since we bootstrapped the producer + // after we had already written some data, therefore the old data (whatever was there before we + // bootstrapped the producer) should not be replicated. + auto data_replicated_correctly = [&]() -> Result { + for (const auto& consumer_table : consumer_tables) { + LOG(INFO) << "Checking records for table " << consumer_table->name().ToString(); + auto consumer_results = ScanToStrings(consumer_table->name(), &consumer_cluster_); + + if (5 != PQntuples(consumer_results.get())) { + return false; + } + int result; + for (int i = 0; i < 5; ++i) { + result = VERIFY_RESULT(GetInt32(consumer_results.get(), i, 0)); + if ((1000 + i) != result) { + return false; + } + } + } + return true; + }; + + ASSERT_OK(WaitFor([&]() -> Result { return data_replicated_correctly(); }, + MonoDelta::FromSeconds(20), "IsDataReplicatedCorrectly")); + Destroy(); +} + +TEST_P(TwoDCYsqlTest, YB_DISABLE_TEST_IN_TSAN(ColocatedDatabaseReplication)) { + constexpr int kNTabletsPerColocatedTable = 1; + constexpr int kNTabletsPerTable = 3; + std::vector tables_vector = {kNTabletsPerColocatedTable, kNTabletsPerColocatedTable}; + // Create two colocated tables on each cluster. + auto colocated_tables = + ASSERT_RESULT(SetUpWithParams(tables_vector, tables_vector, 3, 1, true /* colocated */)); + const string kUniverseId = ASSERT_RESULT(GetUniverseId(&producer_cluster_)); + + // Also create an additional non-colocated table in each database. + auto non_colocated_table = ASSERT_RESULT(CreateTable(&producer_cluster_, + kNamespaceName, + "test_table_2", + kNTabletsPerTable, + false /* colocated */)); + std::shared_ptr non_colocated_producer_table; + ASSERT_OK(producer_client()->OpenTable(non_colocated_table, &non_colocated_producer_table)); + non_colocated_table = ASSERT_RESULT(CreateTable(&consumer_cluster_, + kNamespaceName, + "test_table_2", + kNTabletsPerTable, + false /* colocated */)); + std::shared_ptr non_colocated_consumer_table; + ASSERT_OK(consumer_client()->OpenTable(non_colocated_table, &non_colocated_consumer_table)); + + // colocated_tables contains both producer and consumer universe tables (alternately). + // Pick out just the producer tables from the list. 
+ std::vector> producer_tables; + std::vector> consumer_tables; + std::vector> colocated_producer_tables; + std::vector> colocated_consumer_tables; + producer_tables.reserve(colocated_tables.size() / 2 + 1); + consumer_tables.reserve(colocated_tables.size() / 2 + 1); + colocated_producer_tables.reserve(colocated_tables.size() / 2); + colocated_consumer_tables.reserve(colocated_tables.size() / 2); + for (int i = 0; i < colocated_tables.size(); ++i) { + if (i % 2 == 0) { + producer_tables.push_back(colocated_tables[i]); + colocated_producer_tables.push_back(colocated_tables[i]); + } else { + consumer_tables.push_back(colocated_tables[i]); + colocated_consumer_tables.push_back(colocated_tables[i]); + } + } + producer_tables.push_back(non_colocated_producer_table); + consumer_tables.push_back(non_colocated_consumer_table); + + // 1. Write some data to all tables. + for (const auto& producer_table : producer_tables) { + LOG(INFO) << "Writing records for table " << producer_table->name().ToString(); + WriteWorkload(0, 100, &producer_cluster_, producer_table->name()); + } + + // 2. Setup replication for only the colocated tables. + // Get the producer namespace id, so we can construct the colocated parent table id. + GetNamespaceInfoResponsePB ns_resp; + ASSERT_OK(producer_client()->GetNamespaceInfo("", kNamespaceName, YQL_DATABASE_PGSQL, &ns_resp)); + + rpc::RpcController rpc; + master::SetupUniverseReplicationRequestPB setup_universe_req; + master::SetupUniverseReplicationResponsePB setup_universe_resp; + setup_universe_req.set_producer_id(kUniverseId); + string master_addr = producer_cluster()->GetMasterAddresses(); + auto hp_vec = ASSERT_RESULT(HostPort::ParseStrings(master_addr, 0)); + HostPortsToPBs(hp_vec, setup_universe_req.mutable_producer_master_addresses()); + // Only need to add the colocated parent table id. + setup_universe_req.mutable_producer_table_ids()->Reserve(1); + setup_universe_req.add_producer_table_ids( + ns_resp.namespace_().id() + master::kColocatedParentTableIdSuffix); + auto master_proxy = std::make_shared( + &consumer_client()->proxy_cache(), + consumer_cluster()->leader_mini_master()->bound_rpc_addr()); + + rpc.Reset(); + rpc.set_timeout(MonoDelta::FromSeconds(kRpcTimeout)); + ASSERT_OK(master_proxy->SetupUniverseReplication(setup_universe_req, &setup_universe_resp, &rpc)); + ASSERT_FALSE(setup_universe_resp.has_error()); + + // 3. Verify everything is setup correctly. + master::GetUniverseReplicationResponsePB get_universe_replication_resp; + ASSERT_OK(VerifyUniverseReplication(consumer_cluster(), consumer_client(), kUniverseId, + &get_universe_replication_resp)); + ASSERT_OK(CorrectlyPollingAllTablets(consumer_cluster(), kNTabletsPerColocatedTable)); + + // 4. Check that colocated tables are being replicated. + auto data_replicated_correctly = [&](int num_results, bool onlyColocated) -> Result { + auto &tables = onlyColocated ? 
colocated_consumer_tables : consumer_tables; + for (const auto& consumer_table : tables) { + LOG(INFO) << "Checking records for table " << consumer_table->name().ToString(); + auto consumer_results = ScanToStrings(consumer_table->name(), &consumer_cluster_); + + if (num_results != PQntuples(consumer_results.get())) { + return false; + } + int result; + for (int i = 0; i < num_results; ++i) { + result = VERIFY_RESULT(GetInt32(consumer_results.get(), i, 0)); + if (i != result) { + return false; + } + } + } + return true; + }; + ASSERT_OK(WaitFor([&]() -> Result { return data_replicated_correctly(100, true); }, + MonoDelta::FromSeconds(20), "IsDataReplicatedCorrectly")); + // Ensure that the non colocated table is not replicated. + auto non_coloc_results = ScanToStrings(non_colocated_consumer_table->name(), &consumer_cluster_); + ASSERT_EQ(0, PQntuples(non_coloc_results.get())); + + // 5. Add the regular table to replication. + // Prepare and send AlterUniverseReplication command. + master::AlterUniverseReplicationRequestPB alter_req; + master::AlterUniverseReplicationResponsePB alter_resp; + alter_req.set_producer_id(kUniverseId); + alter_req.add_producer_table_ids_to_add(non_colocated_producer_table->id()); + + rpc.Reset(); + rpc.set_timeout(MonoDelta::FromSeconds(kRpcTimeout)); + ASSERT_OK(master_proxy->AlterUniverseReplication(alter_req, &alter_resp, &rpc)); + ASSERT_FALSE(alter_resp.has_error()); + // Wait until we have 2 tables (colocated tablet + regular table) logged. + ASSERT_OK(LoggedWaitFor([&]() -> Result { + master::GetUniverseReplicationResponsePB tmp_resp; + return VerifyUniverseReplication(consumer_cluster(), consumer_client(), + kUniverseId, &tmp_resp).ok() && + tmp_resp.entry().tables_size() == 2; + }, MonoDelta::FromSeconds(kRpcTimeout), "Verify table created with alter.")); + + ASSERT_OK(CorrectlyPollingAllTablets( + consumer_cluster(), kNTabletsPerColocatedTable + kNTabletsPerTable)); + // Check that all data is replicated for the new table as well. + ASSERT_OK(WaitFor([&]() -> Result { return data_replicated_correctly(100, false); }, + MonoDelta::FromSeconds(20), "IsDataReplicatedCorrectly")); + + // 6. Add additional data to all tables + for (const auto& producer_table : producer_tables) { + LOG(INFO) << "Writing records for table " << producer_table->name().ToString(); + WriteWorkload(100, 150, &producer_cluster_, producer_table->name()); + } + + // 7. Verify all tables are properly replicated. + ASSERT_OK(WaitFor([&]() -> Result { return data_replicated_correctly(150, false); }, + MonoDelta::FromSeconds(20), "IsDataReplicatedCorrectly")); + Destroy(); +} + +TEST_P(TwoDCYsqlTest, YB_DISABLE_TEST_IN_TSAN(ColocatedDatabaseDifferentTableOids)) { + auto colocated_tables = ASSERT_RESULT(SetUpWithParams({}, {}, 3, 1, true /* colocated */)); + const string kUniverseId = ASSERT_RESULT(GetUniverseId(&producer_cluster_)); + + // Create two tables with different table oids. + auto conn = ASSERT_RESULT(producer_cluster_.ConnectToDB(kNamespaceName)); + auto table_info = ASSERT_RESULT(CreateTable(&producer_cluster_, + kNamespaceName, + "test_table_0", + 1 /* num_tablets */, + true /* colocated */, + 123456 /* table_oid */)); + ASSERT_RESULT(CreateTable(&consumer_cluster_, + kNamespaceName, + "test_table_0", + 1 /* num_tablets */, + true /* colocated */, + 123457 /* table_oid */)); + std::shared_ptr producer_table; + ASSERT_OK(producer_client()->OpenTable(table_info, &producer_table)); + + // Try to setup replication, should fail on schema validation due to different table oids. 
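+  // The setup RPC itself succeeds because schema validation runs asynchronously on the master;
+  // the table-oid mismatch is expected to surface as a failed universe when it is verified below.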
+ ASSERT_OK(SetupUniverseReplication(producer_cluster(), consumer_cluster(), consumer_client(), + kUniverseId, {producer_table})); + master::GetUniverseReplicationResponsePB get_universe_replication_resp; + ASSERT_NOK(VerifyUniverseReplication(consumer_cluster(), consumer_client(), kUniverseId, + &get_universe_replication_resp)); + Destroy(); +} + +// TODO adapt rest of twodc-test.cc tests. + +} // namespace enterprise +} // namespace yb diff --git a/ent/src/yb/master/catalog_manager.h b/ent/src/yb/master/catalog_manager.h index 33a31bc2036d..323af4cd054c 100644 --- a/ent/src/yb/master/catalog_manager.h +++ b/ent/src/yb/master/catalog_manager.h @@ -283,9 +283,27 @@ class CatalogManager : public yb::master::CatalogManager, SnapshotCoordinatorCon return cluster_config_; } + // Helper functions for GetTableSchemaCallback and GetColocatedTabletSchemaCallback: + // Validates a single table's schema with the corresponding table on the consumer side, and + // updates consumer_table_id with the new table id. + CHECKED_STATUS ValidateTableSchema( + const std::shared_ptr& info, + const std::unordered_map& table_bootstrap_ids, + TableId* consumer_table_id); + // Adds a validated table to the sys catalog table map for the given universe, and if all tables + // have been validated, creates a CDC stream for each table. + CHECKED_STATUS AddValidatedTableAndCreateCdcStreams( + scoped_refptr universe, + const std::unordered_map& table_bootstrap_ids, + const TableId& producer_table, + const TableId& consumer_table); + void GetTableSchemaCallback( const std::string& universe_id, const std::shared_ptr& info, const std::unordered_map& producer_bootstrap_ids, const Status& s); + void GetColocatedTabletSchemaCallback( + const std::string& universe_id, const std::shared_ptr>& info, + const std::unordered_map& producer_bootstrap_ids, const Status& s); void GetCDCStreamCallback(const CDCStreamId& bootstrap_id, std::shared_ptr table_id, std::shared_ptr> options, diff --git a/ent/src/yb/master/catalog_manager_ent.cc b/ent/src/yb/master/catalog_manager_ent.cc index ffb2fc80195d..88c1d272ad6d 100644 --- a/ent/src/yb/master/catalog_manager_ent.cc +++ b/ent/src/yb/master/catalog_manager_ent.cc @@ -10,7 +10,9 @@ // or implied. See the License for the specific language governing permissions and limitations // under the License. +#include #include +#include #include #include "yb/master/catalog_manager.h" @@ -27,6 +29,7 @@ #include "yb/client/table_alterer.h" #include "yb/client/yb_op.h" #include "yb/common/common.pb.h" +#include "yb/common/entity_ids.h" #include "yb/common/ql_name.h" #include "yb/common/wire_protocol.h" #include "yb/consensus/consensus.h" @@ -2273,13 +2276,23 @@ Status CatalogManager::SetupUniverseReplication(const SetupUniverseReplicationRe // For each table, run an async RPC task to verify a sufficient Producer:Consumer schema match. for (int i = 0; i < req->producer_table_ids_size(); i++) { - auto table_info = std::make_shared(); // SETUP CONTINUES after this async call. 
- const Status s = cdc_rpc->client()->GetTableSchemaById( - req->producer_table_ids(i), table_info, - Bind(&enterprise::CatalogManager::GetTableSchemaCallback, Unretained(this), - ri->id(), table_info, table_id_to_bootstrap_id)); + Status s; + if (IsColocatedParentTableId(req->producer_table_ids(i))) { + auto tables_info = std::make_shared>(); + s = cdc_rpc->client()->GetColocatedTabletSchemaById( + req->producer_table_ids(i), tables_info, + Bind(&enterprise::CatalogManager::GetColocatedTabletSchemaCallback, Unretained(this), + ri->id(), tables_info, table_id_to_bootstrap_id)); + } else { + auto table_info = std::make_shared(); + s = cdc_rpc->client()->GetTableSchemaById( + req->producer_table_ids(i), table_info, + Bind(&enterprise::CatalogManager::GetTableSchemaCallback, Unretained(this), + ri->id(), table_info, table_id_to_bootstrap_id)); + } + if (!s.ok()) { MarkUniverseReplicationFailed(ri); return SetupError(resp->mutable_error(), MasterErrorPB::INVALID_REQUEST, s); @@ -2307,27 +2320,10 @@ void CatalogManager::MarkUniverseReplicationFailed( l->Commit(); } -void CatalogManager::GetTableSchemaCallback( - const std::string& universe_id, const std::shared_ptr& info, - const std::unordered_map& table_bootstrap_ids, const Status& s) { - scoped_refptr universe; - { - std::shared_lock catalog_lock(lock_); - TRACE("Acquired catalog manager lock"); - - universe = FindPtrOrNull(universe_replication_map_, universe_id); - if (universe == nullptr) { - LOG(ERROR) << "Universe not found: " << universe_id; - return; - } - } - - if (!s.ok()) { - MarkUniverseReplicationFailed(universe); - LOG(ERROR) << "Error getting schema for table " << info->table_id << ": " << s; - return; - } - +Status CatalogManager::ValidateTableSchema( + const std::shared_ptr& info, + const std::unordered_map& table_bootstrap_ids, + TableId* consumer_table_id) { // Get corresponding table schema on local universe. GetTableSchemaRequestPB req; GetTableSchemaResponsePB resp; @@ -2347,9 +2343,7 @@ void CatalogManager::GetTableSchemaCallback( list_req.set_name_filter(info->table_name.table_name()); Status status = ListTables(&list_req, &list_resp); if (!status.ok() || list_resp.has_error()) { - LOG(ERROR) << "Error while listing table: " << status; - MarkUniverseReplicationFailed(universe); - return; + return STATUS(NotFound, Substitute("Error while listing table: $0", status.ToString())); } // TODO: This does not work for situation where tables in different YSQL schemas have the same @@ -2363,54 +2357,76 @@ void CatalogManager::GetTableSchemaCallback( } if (!table->has_table_id()) { - LOG(ERROR) << "Could not find matching table for " << info->table_name.ToString(); - MarkUniverseReplicationFailed(universe); - return; + return STATUS(NotFound, + Substitute("Could not find matching table for $0", info->table_name.ToString())); } // We have a table match. 
Now get the table schema and validate status = GetTableSchema(&req, &resp); if (!status.ok() || resp.has_error()) { - LOG(ERROR) << "Error while getting table schema: " << status; - MarkUniverseReplicationFailed(universe); - return; + return STATUS(NotFound, Substitute("Error while getting table schema: $0", status.ToString())); } auto result = info->schema.EquivalentForDataCopy(resp.schema()); if (!result.ok() || !*result) { - LOG(ERROR) << "Source and target schemas don't match: Source: " << info->table_id - << ", Target: " << resp.identifier().table_id() - << ", Source schema: " << info->schema.ToString() - << ", Target schema: " << resp.schema().DebugString(); - MarkUniverseReplicationFailed(universe); - return; + return STATUS(IllegalState, + Substitute("Source and target schemas don't match: " + "Source: $0, Target: $1, Source schema: $2, Target schema: $3", + info->table_id, resp.identifier().table_id(), + info->schema.ToString(), resp.schema().DebugString())); + } + + // Still need to make map of table id to resp table id (to add to validated map) + // For colocated tables, only add the parent table since we only added the parent table to the + // original pb (we use the number of tables in the pb to determine when validation is done). + if (info->colocated) { + // For now we require that colocated tables have the same table oid. + auto source_oid = CHECK_RESULT(GetPgsqlTableOid(info->table_id)); + auto target_oid = CHECK_RESULT(GetPgsqlTableOid(resp.identifier().table_id())); + if (source_oid != target_oid) { + return STATUS(IllegalState, + Substitute("Source and target table oids don't match for colocated table: " + "Source: $0, Target: $1, Source table oid: $2, Target table oid: $3", + info->table_id, resp.identifier().table_id(), source_oid, target_oid)); + } + string parent_table_id = resp.identifier().namespace_().id() + kColocatedParentTableIdSuffix; + *consumer_table_id = parent_table_id; + } else { + *consumer_table_id = resp.identifier().table_id(); } + return Status::OK(); +} + +Status CatalogManager::AddValidatedTableAndCreateCdcStreams( + scoped_refptr universe, + const std::unordered_map& table_bootstrap_ids, + const TableId& producer_table, + const TableId& consumer_table) { auto l = universe->LockForWrite(); auto master_addresses = l->data().pb.producer_master_addresses(); auto res = universe->GetOrCreateCDCRpcTasks(master_addresses); if (!res.ok()) { - LOG(ERROR) << "Error while setting up client for producer " << universe->id(); l->mutable_data()->pb.set_state(SysUniverseReplicationEntryPB::FAILED); - const Status s = sys_catalog_->UpdateItem(universe.get(), leader_ready_term()); if (!s.ok()) { - return (void)CheckStatus(s, "updating universe replication info in sys-catalog"); + return CheckStatus(s, "updating universe replication info in sys-catalog"); } l->Commit(); - return; + return STATUS(InternalError, + Substitute("Error while setting up client for producer $0", universe->id())); } std::shared_ptr cdc_rpc = *res; vector validated_tables; if (l->data().is_deleted_or_failed()) { // Nothing to do since universe is being deleted. - return; + return STATUS(Aborted, "Universe is being deleted"); } auto map = l->mutable_data()->pb.mutable_validated_tables(); - (*map)[info->table_id] = resp.identifier().table_id(); + (*map)[producer_table] = consumer_table; // Now, all tables are validated. 
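+  // Once every producer table in this universe has a validated consumer mapping, the updated
+  // mapping is persisted to the sys catalog and a CDC stream is created for each producer table.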
if (l->mutable_data()->pb.validated_tables_size() == l->mutable_data()->pb.tables_size()) { @@ -2420,12 +2436,13 @@ void CatalogManager::GetTableSchemaCallback( } // TODO: end of config validation should be where SetupUniverseReplication exits back to user - LOG(INFO) << "UpdateItem in GetTableSchemaCallback"; + LOG(INFO) << "UpdateItem in AddValidatedTable"; // Update sys_catalog. - status = sys_catalog_->UpdateItem(universe.get(), leader_ready_term()); + Status status = sys_catalog_->UpdateItem(universe.get(), leader_ready_term()); if (!status.ok()) { - return (void)CheckStatus(status, "updating universe replication info in sys-catalog"); + LOG(ERROR) << "Error during UpdateItem: " << status; + return CheckStatus(status, "updating universe replication info in sys-catalog"); } l->Commit(); @@ -2457,6 +2474,141 @@ void CatalogManager::GetTableSchemaCallback( } } } + return Status::OK(); +} + +void CatalogManager::GetTableSchemaCallback( + const std::string& universe_id, const std::shared_ptr& info, + const std::unordered_map& table_bootstrap_ids, const Status& s) { + // First get the universe. + scoped_refptr universe; + { + std::shared_lock catalog_lock(lock_); + TRACE("Acquired catalog manager lock"); + + universe = FindPtrOrNull(universe_replication_map_, universe_id); + if (universe == nullptr) { + LOG(ERROR) << "Universe not found: " << universe_id; + return; + } + } + + if (!s.ok()) { + MarkUniverseReplicationFailed(universe); + LOG(ERROR) << "Error getting schema for table " << info->table_id << ": " << s; + return; + } + + // Validate the table schema. + TableId table_id; + Status status = ValidateTableSchema(info, table_bootstrap_ids, &table_id); + if (!status.ok()) { + MarkUniverseReplicationFailed(universe); + LOG(ERROR) << "Found error while validating table schema for table " << info->table_id + << ": " << status; + return; + } + + status = AddValidatedTableAndCreateCdcStreams(universe, + table_bootstrap_ids, + info->table_id, + table_id); + if (!status.ok()) { + LOG(ERROR) << "Found error while adding validated table to system catalog: " << info->table_id + << ": " << status; + return; + } +} + +void CatalogManager::GetColocatedTabletSchemaCallback( + const std::string& universe_id, const std::shared_ptr>& infos, + const std::unordered_map& table_bootstrap_ids, const Status& s) { + // First get the universe. + scoped_refptr universe; + { + std::shared_lock catalog_lock(lock_); + TRACE("Acquired catalog manager lock"); + + universe = FindPtrOrNull(universe_replication_map_, universe_id); + if (universe == nullptr) { + LOG(ERROR) << "Universe not found: " << universe_id; + return; + } + } + + if (!s.ok()) { + MarkUniverseReplicationFailed(universe); + std::ostringstream oss; + for (int i = 0; i < infos->size(); ++i) { + oss << ((i == 0) ? "" : ", ") << (*infos)[i].table_id; + } + LOG(ERROR) << "Error getting schema for tables: [ " << oss.str() << " ]: " << s; + return; + } + + if (infos->empty()) { + LOG(WARNING) << "Received empty list of tables to validate: " << s; + return; + } + + // Validate table schemas. + std::unordered_set producer_parent_table_ids; + std::unordered_set consumer_parent_table_ids; + for (const auto& info : *infos) { + // Verify that we have a colocated table. + if (!info.colocated) { + MarkUniverseReplicationFailed(universe); + LOG(ERROR) << "Received non-colocated table: " << info.table_id; + return; + } + // Validate each table, and get the parent colocated table id for the consumer. 
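+    // The parent table id is simply the namespace id with kColocatedParentTableIdSuffix appended,
+    // so every colocated table in the same database resolves to the same parent table id.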
+    TableId consumer_parent_table_id;
+    Status table_status = ValidateTableSchema(std::make_shared<client::YBTableInfo>(info),
+                                              table_bootstrap_ids,
+                                              &consumer_parent_table_id);
+    if (!table_status.ok()) {
+      MarkUniverseReplicationFailed(universe);
+      LOG(ERROR) << "Found error while validating table schema for table " << info.table_id
+                 << ": " << table_status;
+      return;
+    }
+    // Store the parent table ids.
+    producer_parent_table_ids.insert(
+        info.table_name.namespace_id() + kColocatedParentTableIdSuffix);
+    consumer_parent_table_ids.insert(consumer_parent_table_id);
+  }
+
+  // Verify that we only found one producer and one consumer colocated parent table id.
+  if (producer_parent_table_ids.size() != 1) {
+    MarkUniverseReplicationFailed(universe);
+    std::ostringstream oss;
+    for (auto it = producer_parent_table_ids.begin(); it != producer_parent_table_ids.end(); ++it) {
+      oss << ((it == producer_parent_table_ids.begin()) ? "" : ", ") << *it;
+    }
+    LOG(ERROR) << "Found incorrect number of producer colocated parent table ids. "
+               << "Expected 1, but found: [ " << oss.str() << " ]";
+    return;
+  }
+  if (consumer_parent_table_ids.size() != 1) {
+    MarkUniverseReplicationFailed(universe);
+    std::ostringstream oss;
+    for (auto it = consumer_parent_table_ids.begin(); it != consumer_parent_table_ids.end(); ++it) {
+      oss << ((it == consumer_parent_table_ids.begin()) ? "" : ", ") << *it;
+    }
+    LOG(ERROR) << "Found incorrect number of consumer colocated parent table ids. "
+               << "Expected 1, but found: [ " << oss.str() << " ]";
+    return;
+  }
+
+  Status status = AddValidatedTableAndCreateCdcStreams(universe,
+                                                       table_bootstrap_ids,
+                                                       *producer_parent_table_ids.begin(),
+                                                       *consumer_parent_table_ids.begin());
+  if (!status.ok()) {
+    LOG(ERROR) << "Found error while adding validated table to system catalog: "
+               << *producer_parent_table_ids.begin() << ": " << status;
+    return;
+  }
 }
 
 void CatalogManager::GetCDCStreamCallback(
diff --git a/src/yb/client/client-internal.cc b/src/yb/client/client-internal.cc
index a606757fcc4c..49333d5aaf82 100644
--- a/src/yb/client/client-internal.cc
+++ b/src/yb/client/client-internal.cc
@@ -36,6 +36,7 @@
 #include
 #include
 #include
+#include
 #include
 #include
 #include
@@ -109,6 +110,7 @@ using rpc::RpcController;
 namespace client {
 
 using internal::GetTableSchemaRpc;
+using internal::GetColocatedTabletSchemaRpc;
 using internal::RemoteTablet;
 using internal::RemoteTabletServer;
 using internal::UpdateLocalTsState;
@@ -297,6 +299,7 @@ YB_CLIENT_SPECIALIZE_SIMPLE(CreateTable);
 YB_CLIENT_SPECIALIZE_SIMPLE(DeleteTable);
 YB_CLIENT_SPECIALIZE_SIMPLE(GetMasterClusterConfig);
 YB_CLIENT_SPECIALIZE_SIMPLE(GetTableSchema);
+YB_CLIENT_SPECIALIZE_SIMPLE(GetColocatedTabletSchema);
 YB_CLIENT_SPECIALIZE_SIMPLE(IsAlterTableDone);
 YB_CLIENT_SPECIALIZE_SIMPLE(IsFlushTablesDone);
 YB_CLIENT_SPECIALIZE_SIMPLE(IsCreateTableDone);
@@ -1245,6 +1248,46 @@ class GetTableSchemaRpc
   YBTableInfo* info_;
 };
 
+// Gets all table schemas for a colocated tablet from the leader master. See ClientMasterRpc.
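+// Unlike GetTableSchemaRpc, which fills in a single YBTableInfo, this RPC returns one YBTableInfo
+// per colocated user table hosted on the parent tablet. Rough usage sketch through the public
+// client API (the local variable and callback names are illustrative only):
+//
+//   auto infos = std::make_shared<std::vector<YBTableInfo>>();
+//   RETURN_NOT_OK(client->GetColocatedTabletSchemaById(parent_colocated_table_id, infos,
+//                                                      Bind(&HandleSchemas, infos)));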
+class GetColocatedTabletSchemaRpc : public ClientMasterRpc { + public: + GetColocatedTabletSchemaRpc(YBClient* client, + StatusCallback user_cb, + const YBTableName& parent_colocated_table, + vector* info, + CoarseTimePoint deadline, + rpc::Messenger* messenger, + rpc::ProxyCache* proxy_cache); + GetColocatedTabletSchemaRpc(YBClient* client, + StatusCallback user_cb, + const TableId& parent_colocated_table_id, + vector* info, + CoarseTimePoint deadline, + rpc::Messenger* messenger, + rpc::ProxyCache* proxy_cache); + + std::string ToString() const override; + + virtual ~GetColocatedTabletSchemaRpc(); + + private: + GetColocatedTabletSchemaRpc(YBClient* client, + StatusCallback user_cb, + const master::TableIdentifierPB& parent_colocated_table_identifier, + vector* info, + CoarseTimePoint deadline, + rpc::Messenger* messenger, + rpc::ProxyCache* proxy_cache); + + void CallRemoteMethod() override; + void ProcessResponse(const Status& status) override; + + StatusCallback user_cb_; + master::TableIdentifierPB table_identifier_; + vector* info_; +}; + namespace { master::TableIdentifierPB ToTableIdentifierPB(const YBTableName& table_name) { @@ -1370,6 +1413,32 @@ void ClientMasterRpc::Finished(const Status& status) { ProcessResponse(new_status); } +// Helper function to create YBTableInfo from GetTableSchemaResponsePB. +Status CreateTableInfoFromTableSchemaResp(const GetTableSchemaResponsePB& resp, YBTableInfo* info) { + std::unique_ptr schema = std::make_unique(Schema()); + RETURN_NOT_OK(SchemaFromPB(resp.schema(), schema.get())); + info->schema.Reset(std::move(schema)); + info->schema.set_version(resp.version()); + RETURN_NOT_OK(PartitionSchema::FromPB(resp.partition_schema(), + GetSchema(&info->schema), + &info->partition_schema)); + + info->table_name.GetFromTableIdentifierPB(resp.identifier()); + info->table_id = resp.identifier().table_id(); + RETURN_NOT_OK(YBTable::PBToClientTableType(resp.table_type(), &info->table_type)); + info->index_map.FromPB(resp.indexes()); + if (resp.has_index_info()) { + info->index_info.emplace(resp.index_info()); + } + if (resp.has_replication_info()) { + info->replication_info.emplace(resp.replication_info()); + } + SCHECK_GT(info->table_id.size(), 0, IllegalState, "Running against a too-old master"); + info->colocated = resp.colocated(); + + return Status::OK(); +} + GetTableSchemaRpc::GetTableSchemaRpc(YBClient* client, StatusCallback user_cb, const YBTableName& table_name, @@ -1423,27 +1492,74 @@ string GetTableSchemaRpc::ToString() const { void GetTableSchemaRpc::ProcessResponse(const Status& status) { auto new_status = status; if (new_status.ok()) { - std::unique_ptr schema(new Schema()); - new_status = SchemaFromPB(resp_.schema(), schema.get()); - if (new_status.ok()) { - info_->schema.Reset(std::move(schema)); - info_->schema.set_version(resp_.version()); - new_status = PartitionSchema::FromPB(resp_.partition_schema(), - GetSchema(&info_->schema), - &info_->partition_schema); - - info_->table_name.GetFromTableIdentifierPB(resp_.identifier()); - info_->table_id = resp_.identifier().table_id(); - CHECK_OK(YBTable::PBToClientTableType(resp_.table_type(), &info_->table_type)); - info_->index_map.FromPB(resp_.indexes()); - if (resp_.has_index_info()) { - info_->index_info.emplace(resp_.index_info()); - } - if (resp_.has_replication_info()) { - info_->replication_info.emplace(resp_.replication_info()); + new_status = CreateTableInfoFromTableSchemaResp(resp_, info_); + } + if (!new_status.ok()) { + LOG(WARNING) << ToString() << " failed: " << 
new_status.ToString(); + } + user_cb_.Run(new_status); +} + +GetColocatedTabletSchemaRpc::GetColocatedTabletSchemaRpc(YBClient* client, + StatusCallback user_cb, + const YBTableName& table_name, + vector* info, + CoarseTimePoint deadline, + rpc::Messenger* messenger, + rpc::ProxyCache* proxy_cache) + : GetColocatedTabletSchemaRpc( + client, user_cb, ToTableIdentifierPB(table_name), info, deadline, messenger, + proxy_cache) { +} + +GetColocatedTabletSchemaRpc::GetColocatedTabletSchemaRpc(YBClient* client, + StatusCallback user_cb, + const TableId& table_id, + vector* info, + CoarseTimePoint deadline, + rpc::Messenger* messenger, + rpc::ProxyCache* proxy_cache) + : GetColocatedTabletSchemaRpc( + client, user_cb, ToTableIdentifierPB(table_id), info, deadline, messenger, proxy_cache) {} + +GetColocatedTabletSchemaRpc::GetColocatedTabletSchemaRpc( + YBClient* client, + StatusCallback user_cb, + const master::TableIdentifierPB& table_identifier, + vector* info, + CoarseTimePoint deadline, + rpc::Messenger* messenger, + rpc::ProxyCache* proxy_cache) + : ClientMasterRpc(client, deadline, messenger, proxy_cache), + user_cb_(std::move(user_cb)), + table_identifier_(table_identifier), + info_(DCHECK_NOTNULL(info)) { + req_.mutable_parent_colocated_table()->CopyFrom(table_identifier_); +} + +GetColocatedTabletSchemaRpc::~GetColocatedTabletSchemaRpc() { +} + +void GetColocatedTabletSchemaRpc::CallRemoteMethod() { + master_proxy()->GetColocatedTabletSchemaAsync( + req_, &resp_, mutable_retrier()->mutable_controller(), + std::bind(&GetColocatedTabletSchemaRpc::Finished, this, Status::OK())); +} + +string GetColocatedTabletSchemaRpc::ToString() const { + return Substitute("GetColocatedTabletSchemaRpc(table_identifier: $0, num_attempts: $1)", + table_identifier_.ShortDebugString(), num_attempts()); +} + +void GetColocatedTabletSchemaRpc::ProcessResponse(const Status& status) { + auto new_status = status; + if (new_status.ok()) { + for (const auto& resp : resp_.get_table_schema_response_pbs()) { + info_->emplace_back(); + new_status = CreateTableInfoFromTableSchemaResp(resp, &info_->back()); + if (!new_status.ok()) { + break; } - CHECK_GT(info_->table_id.size(), 0) << "Running against a too-old master"; - info_->colocated = resp_.colocated(); } } if (!new_status.ok()) { @@ -1732,6 +1848,23 @@ Status YBClient::Data::GetTableSchemaById(YBClient* client, return Status::OK(); } +Status YBClient::Data::GetColocatedTabletSchemaById( + YBClient* client, + const TableId& parent_colocated_table_id, + CoarseTimePoint deadline, + std::shared_ptr> info, + StatusCallback callback) { + auto rpc = rpc::StartRpc( + client, + callback, + parent_colocated_table_id, + info.get(), + deadline, + messenger_, + proxy_cache_.get()); + return Status::OK(); +} + Result YBClient::Data::GetIndexPermissions( YBClient* client, const TableId& table_id, diff --git a/src/yb/client/client-internal.h b/src/yb/client/client-internal.h index ea64a3708f0a..d4a8ea32e217 100644 --- a/src/yb/client/client-internal.h +++ b/src/yb/client/client-internal.h @@ -230,6 +230,11 @@ class YBClient::Data { CoarseTimePoint deadline, std::shared_ptr info, StatusCallback callback); + CHECKED_STATUS GetColocatedTabletSchemaById(YBClient* client, + const TableId& parent_colocated_table_id, + CoarseTimePoint deadline, + std::shared_ptr> info, + StatusCallback callback); Result GetIndexPermissions( YBClient* client, diff --git a/src/yb/client/client.cc b/src/yb/client/client.cc index 2afef2231fbf..436b5f7781f9 100644 --- a/src/yb/client/client.cc +++ 
b/src/yb/client/client.cc @@ -94,6 +94,8 @@ using yb::master::GetNamespaceInfoRequestPB; using yb::master::GetNamespaceInfoResponsePB; using yb::master::GetTableSchemaRequestPB; using yb::master::GetTableSchemaResponsePB; +using yb::master::GetColocatedTabletSchemaRequestPB; +using yb::master::GetColocatedTabletSchemaResponsePB; using yb::master::GetTableLocationsRequestPB; using yb::master::GetTableLocationsResponsePB; using yb::master::GetTabletLocationsRequestPB; @@ -631,6 +633,17 @@ Status YBClient::GetTableSchemaById(const TableId& table_id, std::shared_ptrGetTableSchemaById(this, table_id, deadline, info, callback); } +Status YBClient::GetColocatedTabletSchemaById(const TableId& parent_colocated_table_id, + std::shared_ptr> info, + StatusCallback callback) { + auto deadline = CoarseMonoClock::Now() + default_admin_operation_timeout(); + return data_->GetColocatedTabletSchemaById(this, + parent_colocated_table_id, + deadline, + info, + callback); +} + Result YBClient::GetIndexPermissions( const TableId& table_id, const TableId& index_id) { diff --git a/src/yb/client/client.h b/src/yb/client/client.h index cf00ce88fed4..bcf6a44b8f4a 100644 --- a/src/yb/client/client.h +++ b/src/yb/client/client.h @@ -330,6 +330,10 @@ class YBClient { CHECKED_STATUS GetTableSchemaById(const TableId& table_id, std::shared_ptr info, StatusCallback callback); + CHECKED_STATUS GetColocatedTabletSchemaById(const TableId& parent_colocated_table_id, + std::shared_ptr> info, + StatusCallback callback); + Result GetIndexPermissions( const TableId& table_id, const TableId& index_id); @@ -735,6 +739,7 @@ class YBClient { friend class YBTableCreator; friend class internal::Batcher; friend class internal::GetTableSchemaRpc; + friend class internal::GetColocatedTabletSchemaRpc; friend class internal::LookupRpc; friend class internal::MetaCache; friend class internal::RemoteTablet; diff --git a/src/yb/client/schema.h b/src/yb/client/schema.h index d740f7cbe7c4..c47333ccee3c 100644 --- a/src/yb/client/schema.h +++ b/src/yb/client/schema.h @@ -73,6 +73,7 @@ namespace client { namespace internal { class GetTableSchemaRpc; +class GetColocatedTabletSchemaRpc; class LookupRpc; class WriteRpc; diff --git a/src/yb/client/table.h b/src/yb/client/table.h index 164be442ef58..5cfd2271ba5a 100644 --- a/src/yb/client/table.h +++ b/src/yb/client/table.h @@ -147,6 +147,7 @@ class YBTable : public std::enable_shared_from_this { private: friend class YBClient; friend class internal::GetTableSchemaRpc; + friend class internal::GetColocatedTabletSchemaRpc; struct VersionedPartitions { std::vector keys; diff --git a/src/yb/integration-tests/CMakeLists.txt b/src/yb/integration-tests/CMakeLists.txt index 7b5cabf95e81..4ba696335321 100644 --- a/src/yb/integration-tests/CMakeLists.txt +++ b/src/yb/integration-tests/CMakeLists.txt @@ -53,6 +53,7 @@ target_link_libraries(integration-tests tserver tserver_test_util master + pq_utils ql_api ysck yb-admin_lib diff --git a/src/yb/integration-tests/mini_cluster.cc b/src/yb/integration-tests/mini_cluster.cc index fdf608d1d8cd..eead3c392722 100644 --- a/src/yb/integration-tests/mini_cluster.cc +++ b/src/yb/integration-tests/mini_cluster.cc @@ -819,12 +819,12 @@ Status WaitForInitDb(MiniCluster* cluster) { LOG(INFO) << "IsInitDbDone failure: " << status; continue; } - if (resp.done()) { - return Status::OK(); - } if (resp.has_initdb_error()) { return STATUS_FORMAT(RuntimeError, "Init DB failed: $0", resp.initdb_error()); } + if (resp.done()) { + return Status::OK(); + } 
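+    // (done() is intentionally checked only after has_initdb_error(), so a finished-but-failed
+    // initdb surfaces its error instead of being reported as success.)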
     std::this_thread::sleep_for(500ms);
   }
diff --git a/src/yb/master/catalog_manager.cc b/src/yb/master/catalog_manager.cc
index 80d03e7d8fdf..326db76ad424 100644
--- a/src/yb/master/catalog_manager.cc
+++ b/src/yb/master/catalog_manager.cc
@@ -4387,6 +4387,65 @@ Status CatalogManager::GetTableSchema(const GetTableSchemaRequestPB* req,
   return Status::OK();
 }
 
+Status CatalogManager::GetColocatedTabletSchema(const GetColocatedTabletSchemaRequestPB* req,
+                                                GetColocatedTabletSchemaResponsePB* resp) {
+  VLOG(1) << "Servicing GetColocatedTabletSchema request for " << req->ShortDebugString();
+  RETURN_NOT_OK(CheckOnline());
+
+  // Lookup the given parent colocated table and verify that it exists.
+  scoped_refptr<TableInfo> parent_colocated_table;
+  {
+    TRACE("Looking up table");
+    RETURN_NOT_OK(FindTable(req->parent_colocated_table(), &parent_colocated_table));
+    TRACE("Locking table");
+    auto l = parent_colocated_table->LockForRead();
+    RETURN_NOT_OK(CheckIfTableDeletedOrNotRunning(l.get(), resp));
+  }
+
+  if (!parent_colocated_table->colocated() || !IsColocatedParentTable(*parent_colocated_table)) {
+    return SetupError(resp->mutable_error(), MasterErrorPB::INVALID_TABLE_TYPE,
+                      STATUS(InvalidArgument, "Table provided is not a parent colocated table"));
+  }
+
+  // Next, get all the user tables in the database.
+  ListTablesRequestPB list_tables_req;
+  ListTablesResponsePB list_tables_resp;
+
+  list_tables_req.mutable_namespace_()->set_id(parent_colocated_table->namespace_id());
+  list_tables_req.mutable_namespace_()->set_database_type(YQL_DATABASE_PGSQL);
+  list_tables_req.set_exclude_system_tables(true);
+  Status status = ListTables(&list_tables_req, &list_tables_resp);
+  if (!status.ok() || list_tables_resp.has_error()) {
+    LOG(ERROR) << "Error while listing tables: " << status;
+    return SetupError(resp->mutable_error(), MasterErrorPB::OBJECT_NOT_FOUND, status);
+  }
+
+  // Get the table schema for each colocated table.
+  for (const auto& t : list_tables_resp.tables()) {
+    // Need to check if this table is colocated first.
+    scoped_refptr<TableInfo> table;
+    TableIdentifierPB t_pb;
+    t_pb.set_table_id(t.id());
+    TRACE("Looking up table");
+    RETURN_NOT_OK(FindTable(t_pb, &table));
+
+    if (table->colocated()) {
+      // Now we can get the schema for this table.
+      GetTableSchemaRequestPB schema_req;
+      GetTableSchemaResponsePB schema_resp;
+      schema_req.mutable_table()->Swap(&t_pb);
+      status = GetTableSchema(&schema_req, &schema_resp);
+      if (!status.ok() || schema_resp.has_error()) {
+        LOG(ERROR) << "Error while getting table schema: " << status;
+        return SetupError(resp->mutable_error(), MasterErrorPB::OBJECT_NOT_FOUND, status);
+      }
+      resp->add_get_table_schema_response_pbs()->Swap(&schema_resp);
+    }
+  }
+
+  return Status::OK();
+}
+
 Status CatalogManager::ListTables(const ListTablesRequestPB* req,
                                   ListTablesResponsePB* resp) {
   RETURN_NOT_OK(CheckOnline());
diff --git a/src/yb/master/catalog_manager.h b/src/yb/master/catalog_manager.h
index 4770610b3a37..eb0feb208e4e 100644
--- a/src/yb/master/catalog_manager.h
+++ b/src/yb/master/catalog_manager.h
@@ -289,6 +289,10 @@ class CatalogManager : public tserver::TabletPeerLookupIf {
   CHECKED_STATUS GetTableSchema(const GetTableSchemaRequestPB* req,
                                 GetTableSchemaResponsePB* resp);
 
+  // Get the schemas of all colocated user tables in the specified colocated database.
+  CHECKED_STATUS GetColocatedTabletSchema(const GetColocatedTabletSchemaRequestPB* req,
+                                          GetColocatedTabletSchemaResponsePB* resp);
+
   // List all the running tables.
CHECKED_STATUS ListTables(const ListTablesRequestPB* req, ListTablesResponsePB* resp); diff --git a/src/yb/master/master.proto b/src/yb/master/master.proto index d491e8c74998..138dd22e724e 100644 --- a/src/yb/master/master.proto +++ b/src/yb/master/master.proto @@ -1188,6 +1188,18 @@ message GetTableSchemaResponsePB { optional bool colocated = 13; } +message GetColocatedTabletSchemaRequestPB { + required TableIdentifierPB parent_colocated_table = 1; +} + +message GetColocatedTabletSchemaResponsePB { + // The error, if an error occurred with this request. + optional MasterErrorPB error = 1; + + // List of all colocated user tables in this colocated database. + repeated GetTableSchemaResponsePB get_table_schema_response_pbs = 2; +} + // ============================================================================ // Administration/monitoring // ============================================================================ @@ -1954,6 +1966,8 @@ service MasterService { rpc ListTables(ListTablesRequestPB) returns (ListTablesResponsePB); rpc GetTableLocations(GetTableLocationsRequestPB) returns (GetTableLocationsResponsePB); rpc GetTableSchema(GetTableSchemaRequestPB) returns (GetTableSchemaResponsePB); + rpc GetColocatedTabletSchema(GetColocatedTabletSchemaRequestPB) + returns (GetColocatedTabletSchemaResponsePB); rpc CreateNamespace(CreateNamespaceRequestPB) returns (CreateNamespaceResponsePB); rpc IsCreateNamespaceDone(IsCreateNamespaceDoneRequestPB) diff --git a/src/yb/master/master_service.cc b/src/yb/master/master_service.cc index 2c57390b0c64..b80decfff726 100644 --- a/src/yb/master/master_service.cc +++ b/src/yb/master/master_service.cc @@ -397,6 +397,12 @@ void MasterServiceImpl::GetTableSchema(const GetTableSchemaRequestPB* req, HandleIn(req, resp, &rpc, &CatalogManager::GetTableSchema); } +void MasterServiceImpl::GetColocatedTabletSchema(const GetColocatedTabletSchemaRequestPB* req, + GetColocatedTabletSchemaResponsePB* resp, + RpcContext rpc) { + HandleIn(req, resp, &rpc, &CatalogManager::GetColocatedTabletSchema); +} + void MasterServiceImpl::CreateNamespace(const CreateNamespaceRequestPB* req, CreateNamespaceResponsePB* resp, RpcContext rpc) { diff --git a/src/yb/master/master_service.h b/src/yb/master/master_service.h index 4a477dcf9c5b..eecbea2aa684 100644 --- a/src/yb/master/master_service.h +++ b/src/yb/master/master_service.h @@ -93,6 +93,9 @@ class MasterServiceImpl : public MasterServiceIf, void GetTableSchema(const GetTableSchemaRequestPB* req, GetTableSchemaResponsePB* resp, rpc::RpcContext rpc) override; + void GetColocatedTabletSchema(const GetColocatedTabletSchemaRequestPB* req, + GetColocatedTabletSchemaResponsePB* resp, + rpc::RpcContext rpc) override; void ListTabletServers(const ListTabletServersRequestPB* req, ListTabletServersResponsePB* resp, rpc::RpcContext rpc) override;