Skip to content

Commit

Permalink
[#4516] 2dc: Initial support for colocated databases
Browse files Browse the repository at this point in the history
Summary:
Adding initial support for 2dc with colocated databases.
This is an initial implementation of this feature, so there is still a lot of room for improvement.

This implementation allows a user to set up replication for a colocated database's colocated
tablet. It also validates the schemas of all the tables on the producer side and ensures
that they match the tables on the consumer side (this includes validating that the tables have
the same postgres table oid - see #5982). Once validation is complete, a single stream is set up
between the colocated tablets, which copies all the table data from the producer.

Currently, the implementation is primarily based around using each colocated database's parent
colocated table id, since that is the name of the directory where all of its data is stored. I
believe this is still the cleanest way of implementing this on the backend, but work will need to be
done in order to make the user-facing side more friendly. Note that this will require some additional
mapping on the _consumer_ side that maps the _producer_ parent colocated table id to the dbname.

Usage: Currently using all the same regular 2dc yb-admin commands without any changes. In order to
reference a colocated database, use its parent colocated table id instead of a regular table id (this
can be found via `yb-admin ... list_tables include_table_id`, or by knowing the database id and
appending `.colocated.parent.uuid`).

Example:
```
yb-admin -master_addresses 127.0.0.2:7100 setup_universe_replication \
  2498e14e-b964-481d-9894-794f4cf06be3 127.0.0.1:7100 \
  00004000000030008000000000000000.colocated.parent.uuid
```
This also works similarly for `alter_universe_replication` and `bootstrap_cdc_producer`.

`list_cdc_streams` output: (notice the table_id field)
```
~/code/yugabyte-db >>> build/latest/bin/yb-admin -master_addresses 127.0.0.1:7100 list_cdc_streams
CDC Streams:
streams {
  stream_id: "6d2715509021495ba269b3395508ce19"
  table_id: "00004000000030008000000000000000.colocated.parent.uuid"
  options {
      key: "record_format"
      value: "WAL"
  }
  options {
      key: "record_type"
      value: "CHANGE"
  }
}
```

Test Plan:
```
ybd --cxx-test twodc_ysql-test
```

Reviewers: bogdan, nicolas, rahuldesirazu

Reviewed By: rahuldesirazu

Subscribers: ybase

Differential Revision: https://phabricator.dev.yugabyte.com/D9841
  • Loading branch information
hulien22 committed Dec 17, 2020
1 parent 71ed762 commit 07f2c71
Show file tree
Hide file tree
Showing 20 changed files with 1,717 additions and 339 deletions.
2 changes: 2 additions & 0 deletions ent/src/yb/integration-tests/CMakeLists-include.txt
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ string(REPLACE ${CMAKE_SOURCE_DIR} ${CMAKE_SOURCE_DIR}/ent

set(INTEGRATION_TESTS_SRCS_EXTENSIONS
${YB_ENT_CURRENT_SOURCE_DIR}/external_mini_cluster_ent.cc
${YB_ENT_CURRENT_SOURCE_DIR}/twodc_test_base.cc
PARENT_SCOPE)

# Additional tests support.
Expand All @@ -41,5 +42,6 @@ set(INTEGRATION_TESTS_EXTENSIONS_TESTS
encryption-test
cdc_service-int-test
cdc_service-txn-test
twodc_ysql-test
twodc-test
PARENT_SCOPE)
294 changes: 28 additions & 266 deletions ent/src/yb/integration-tests/twodc-test.cc

Large diffs are not rendered by default.

235 changes: 235 additions & 0 deletions ent/src/yb/integration-tests/twodc_test_base.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,235 @@
// Copyright (c) YugaByte, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
// in compliance with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations
// under the License.
//

#include "yb/integration-tests/twodc_test_base.h"

#include <string>

#include "yb/cdc/cdc_service.h"
#include "yb/client/client.h"

#include "yb/integration-tests/cdc_test_util.h"
#include "yb/integration-tests/mini_cluster.h"
#include "yb/master/catalog_manager.h"
#include "yb/master/master.h"
#include "yb/master/mini_master.h"
#include "yb/tserver/cdc_consumer.h"
#include "yb/tserver/tablet_server.h"
#include "yb/util/test_util.h"
#include "yb/yql/pgwrapper/libpq_utils.h"
#include "yb/yql/pgwrapper/pg_wrapper.h"

namespace yb {

using client::YBClient;
using tserver::enterprise::CDCConsumer;

namespace enterprise {

// Tears down both test clusters.
//
// For each cluster that exists, the Postgres supervisor (if one was created) is stopped first,
// then the mini cluster is shut down and released. Finally the clients of BOTH clusters are
// released; previously the producer client was reset twice and the consumer client was never
// released here.
void TwoDCTestBase::Destroy() {
  LOG(INFO) << "Destroying CDC Clusters";
  if (consumer_cluster()) {
    if (consumer_cluster_.pg_supervisor_) {
      consumer_cluster_.pg_supervisor_->Stop();
    }
    consumer_cluster_.mini_cluster_->Shutdown();
    consumer_cluster_.mini_cluster_.reset();
  }

  if (producer_cluster()) {
    if (producer_cluster_.pg_supervisor_) {
      producer_cluster_.pg_supervisor_->Stop();
    }
    producer_cluster_.mini_cluster_->Shutdown();
    producer_cluster_.mini_cluster_.reset();
  }

  producer_cluster_.client_.reset();
  consumer_cluster_.client_.reset();
}

// Sends a SetupUniverseReplication RPC to the consumer cluster's leader master.
//
// The request names `universe_id` as the producer id, lists the id of every table in `tables`
// as a producer table id, and carries the producer master addresses. When `leader_only` is set,
// only the producer leader master's bound RPC address is sent instead of the full address list.
//
// Returns the RPC status on transport failure, IllegalState if the response carries an error,
// and OK otherwise.
Status TwoDCTestBase::SetupUniverseReplication(
    MiniCluster* producer_cluster, MiniCluster* consumer_cluster, YBClient* consumer_client,
    const std::string& universe_id, const std::vector<std::shared_ptr<client::YBTable>>& tables,
    bool leader_only) {
  master::SetupUniverseReplicationRequestPB setup_req;
  master::SetupUniverseReplicationResponsePB setup_resp;
  setup_req.set_producer_id(universe_id);

  // Start from the full producer master list and narrow it to the leader when requested.
  std::string producer_master_addrs = producer_cluster->GetMasterAddresses();
  if (leader_only) {
    producer_master_addrs = producer_cluster->leader_mini_master()->bound_rpc_addr_str();
  }
  auto producer_host_ports = VERIFY_RESULT(HostPort::ParseStrings(producer_master_addrs, 0));
  HostPortsToPBs(producer_host_ports, setup_req.mutable_producer_master_addresses());

  setup_req.mutable_producer_table_ids()->Reserve(tables.size());
  for (const auto& producer_table : tables) {
    setup_req.add_producer_table_ids(producer_table->id());
  }

  auto consumer_master_proxy = std::make_shared<master::MasterServiceProxy>(
      &consumer_client->proxy_cache(),
      consumer_cluster->leader_mini_master()->bound_rpc_addr());

  rpc::RpcController controller;
  controller.set_timeout(MonoDelta::FromSeconds(kRpcTimeout));
  RETURN_NOT_OK(
      consumer_master_proxy->SetupUniverseReplication(setup_req, &setup_resp, &controller));
  if (!setup_resp.has_error()) {
    return Status::OK();
  }
  return STATUS(IllegalState, "Failed setting up universe replication");
}

// Polls the consumer cluster's leader master with GetUniverseReplication until the replication
// entry for `universe_id` is reported in the ACTIVE state, or until kRpcTimeout seconds elapse.
//
// `resp` is cleared before every attempt and holds the last response when the call returns.
Status TwoDCTestBase::VerifyUniverseReplication(
    MiniCluster* consumer_cluster, YBClient* consumer_client,
    const std::string& universe_id, master::GetUniverseReplicationResponsePB* resp) {
  return LoggedWaitFor([=]() -> Result<bool> {
    master::GetUniverseReplicationRequestPB get_req;
    get_req.set_producer_id(universe_id);
    resp->Clear();

    // A fresh proxy is built on every attempt, using the current leader master address.
    auto consumer_master_proxy = std::make_shared<master::MasterServiceProxy>(
        &consumer_client->proxy_cache(),
        consumer_cluster->leader_mini_master()->bound_rpc_addr());
    rpc::RpcController controller;
    controller.set_timeout(MonoDelta::FromSeconds(kRpcTimeout));

    Status rpc_status = consumer_master_proxy->GetUniverseReplication(get_req, resp, &controller);
    if (!rpc_status.ok() || resp->has_error()) {
      return false;
    }
    return resp->entry().state() == master::SysUniverseReplicationEntryPB::ACTIVE;
  }, MonoDelta::FromSeconds(kRpcTimeout), "Verify universe replication");
}

// Sends a SetUniverseReplicationEnabled RPC for `universe_id` to the consumer cluster's leader
// master, requesting the state given by `is_enabled`.
//
// Returns the RPC status on transport failure, the status embedded in the response if the
// response carries an error, and OK otherwise.
Status TwoDCTestBase::ToggleUniverseReplication(
    MiniCluster* consumer_cluster, YBClient* consumer_client,
    const std::string& universe_id, bool is_enabled) {
  master::SetUniverseReplicationEnabledRequestPB toggle_req;
  toggle_req.set_producer_id(universe_id);
  toggle_req.set_is_enabled(is_enabled);

  auto consumer_master_proxy = std::make_shared<master::MasterServiceProxy>(
      &consumer_client->proxy_cache(),
      consumer_cluster->leader_mini_master()->bound_rpc_addr());

  rpc::RpcController controller;
  controller.set_timeout(MonoDelta::FromSeconds(kRpcTimeout));

  master::SetUniverseReplicationEnabledResponsePB toggle_resp;
  RETURN_NOT_OK(consumer_master_proxy->SetUniverseReplicationEnabled(
      toggle_req, &toggle_resp, &controller));
  if (!toggle_resp.has_error()) {
    return Status::OK();
  }
  return StatusFromPB(toggle_resp.error().status());
}

// Polls the consumer cluster's leader master with GetUniverseReplication until the response for
// `universe_id` carries an OBJECT_NOT_FOUND error, or until `timeout` milliseconds elapse.
//
// Only the response contents decide the outcome; the status of the RPC itself is not inspected.
Status TwoDCTestBase::VerifyUniverseReplicationDeleted(MiniCluster* consumer_cluster,
    YBClient* consumer_client, const std::string& universe_id, int timeout) {
  return LoggedWaitFor([=]() -> Result<bool> {
    master::GetUniverseReplicationRequestPB get_req;
    master::GetUniverseReplicationResponsePB get_resp;
    get_req.set_producer_id(universe_id);

    auto consumer_master_proxy = std::make_shared<master::MasterServiceProxy>(
        &consumer_client->proxy_cache(),
        consumer_cluster->leader_mini_master()->bound_rpc_addr());
    rpc::RpcController controller;
    controller.set_timeout(MonoDelta::FromSeconds(kRpcTimeout));

    Status rpc_status =
        consumer_master_proxy->GetUniverseReplication(get_req, &get_resp, &controller);
    if (!get_resp.has_error()) {
      return false;
    }
    return get_resp.error().code() == master::MasterErrorPB::OBJECT_NOT_FOUND;
  }, MonoDelta::FromMilliseconds(timeout), "Verify universe replication deleted");
}

// Repeatedly asks the producer leader master's catalog manager for the CDC streams of
// `table_id` until the listing succeeds without error and contains exactly one stream, or until
// kRpcTimeout seconds elapse.
//
// `resp` is cleared before every attempt and holds the last listing when the call returns.
Status TwoDCTestBase::GetCDCStreamForTable(
    const std::string& table_id, master::ListCDCStreamsResponsePB* resp) {
  return LoggedWaitFor([=]() -> Result<bool> {
    master::ListCDCStreamsRequestPB list_req;
    list_req.set_table_id(table_id);
    resp->Clear();

    // Goes straight to the in-process catalog manager rather than through an RPC proxy.
    auto* catalog_manager = producer_cluster()->leader_mini_master()->master()->catalog_manager();
    Status list_status = catalog_manager->ListCDCStreams(&list_req, resp);
    if (!list_status.ok() || resp->has_error()) {
      return false;
    }
    return resp->streams_size() == 1;
  }, MonoDelta::FromSeconds(kRpcTimeout), "Get CDC stream for table");
}

// Returns the sum of GetNumSuccessfulWriteRpcs() over the CDC consumers of all tablet servers
// in `cluster`. Servers that are not enterprise tablet servers, or that currently have no CDC
// consumer, contribute nothing.
uint32_t TwoDCTestBase::GetSuccessfulWriteOps(MiniCluster* cluster) {
  uint32_t total_write_ops = 0;
  for (const auto& mini_tserver : cluster->mini_tablet_servers()) {
    auto* ent_tserver = dynamic_cast<tserver::enterprise::TabletServer*>(mini_tserver->server());
    if (!ent_tserver) {
      continue;
    }
    CDCConsumer* consumer = ent_tserver->GetCDCConsumer();
    if (!consumer) {
      continue;
    }
    total_write_ops += consumer->GetNumSuccessfulWriteRpcs();
  }
  return total_write_ops;
}

// Convenience overload: deletes the replication for `universe_id` using this fixture's own
// consumer client and consumer cluster.
Status TwoDCTestBase::DeleteUniverseReplication(const std::string& universe_id) {
  YBClient* const client = consumer_client();
  MiniCluster* const cluster = consumer_cluster();
  return DeleteUniverseReplication(universe_id, client, cluster);
}

// Sends a DeleteUniverseReplication RPC for `universe_id` to the leader master of `cluster`,
// using `client`'s proxy cache.
//
// Returns the RPC status on transport failure and the status embedded in the response if the
// master reported an error. Success is logged and OK returned only when neither occurred, so a
// rejected delete is no longer reported as a success.
Status TwoDCTestBase::DeleteUniverseReplication(
    const std::string& universe_id, YBClient* client, MiniCluster* cluster) {
  master::DeleteUniverseReplicationRequestPB req;
  master::DeleteUniverseReplicationResponsePB resp;

  req.set_producer_id(universe_id);

  auto master_proxy = std::make_shared<master::MasterServiceProxy>(
      &client->proxy_cache(),
      cluster->leader_mini_master()->bound_rpc_addr());

  rpc::RpcController rpc;
  rpc.set_timeout(MonoDelta::FromSeconds(kRpcTimeout));
  RETURN_NOT_OK(master_proxy->DeleteUniverseReplication(req, &resp, &rpc));
  // The RPC can succeed at the transport level while the master still rejects the request.
  if (resp.has_error()) {
    return StatusFromPB(resp.error().status());
  }
  LOG(INFO) << "Delete universe succeeded";
  return Status::OK();
}

// Returns the total number of entries reported by TEST_producer_tablets_running() across the
// CDC consumers of all tablet servers in `cluster`. Servers that are not enterprise tablet
// servers, or that currently have no CDC consumer, contribute zero.
uint32_t TwoDCTestBase::NumProducerTabletsPolled(MiniCluster* cluster) {
  uint32_t total_polled = 0;
  for (const auto& mini_tserver : cluster->mini_tablet_servers()) {
    auto* ent_tserver = dynamic_cast<tserver::enterprise::TabletServer*>(
        mini_tserver->server());
    if (!ent_tserver) {
      continue;
    }
    CDCConsumer* consumer = ent_tserver->GetCDCConsumer();
    if (!consumer) {
      continue;
    }
    auto running_tablets = consumer->TEST_producer_tablets_running();
    const uint32_t polled_on_server = running_tablets.size();
    total_polled += polled_on_server;
  }
  return total_polled;
}

// Waits (up to kRpcTimeout seconds) until the number of producer tablets polled in `cluster`
// equals `num_producer_tablets` for a run of consecutive checks.
//
// The streak counter is local to this call. It used to be a function-local `static`, so a call
// that timed out part-way through a streak left a stale count behind for the next call.
// Capturing it by reference assumes LoggedWaitFor runs the condition synchronously and does not
// keep it after returning.
Status TwoDCTestBase::CorrectlyPollingAllTablets(
    MiniCluster* cluster, uint32_t num_producer_tablets) {
  int num_consecutive_matches = 0;
  return LoggedWaitFor(
      [this, cluster, num_producer_tablets, &num_consecutive_matches]() -> Result<bool> {
        constexpr int kNumIterationsWithCorrectResult = 5;
        auto cur_tablets = NumProducerTabletsPolled(cluster);
        if (cur_tablets == num_producer_tablets) {
          // Post-increment keeps the original threshold: success is reported on the check that
          // follows kNumIterationsWithCorrectResult earlier consecutive matches.
          if (num_consecutive_matches++ == kNumIterationsWithCorrectResult) {
            num_consecutive_matches = 0;
            return true;
          }
        } else {
          // Any mismatch breaks the streak.
          num_consecutive_matches = 0;
        }
        LOG(INFO) << "Tablets being polled: " << cur_tablets;
        return false;
      }, MonoDelta::FromSeconds(kRpcTimeout), "Num producer tablets being polled");
}

} // namespace enterprise
} // namespace yb
139 changes: 139 additions & 0 deletions ent/src/yb/integration-tests/twodc_test_base.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
// Copyright (c) YugaByte, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
// in compliance with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations
// under the License.
//

#ifndef ENT_SRC_YB_INTEGRATION_TESTS_TWODC_TEST_BASE_H
#define ENT_SRC_YB_INTEGRATION_TESTS_TWODC_TEST_BASE_H

#include <string>

#include "yb/client/client.h"

#include "yb/integration-tests/cdc_test_util.h"
#include "yb/integration-tests/mini_cluster.h"
#include "yb/master/master.h"

#include "yb/util/test_util.h"
#include "yb/yql/pgwrapper/libpq_utils.h"
#include "yb/yql/pgwrapper/pg_wrapper.h"

DECLARE_int32(cdc_read_rpc_timeout_ms);
DECLARE_int32(cdc_write_rpc_timeout_ms);
DECLARE_bool(TEST_check_broadcast_address);
DECLARE_bool(flush_rocksdb_on_shutdown);
DECLARE_bool(cdc_enable_replicate_intents);

namespace yb {

using client::YBClient;

namespace enterprise {

// Timeout, in seconds, applied to the master RPCs and most wait loops issued by the fixture.
// NOTE(review): NonTsanVsTsan presumably selects 30 for regular builds and 120 for TSAN builds -
// confirm against its definition.
constexpr int kRpcTimeout = NonTsanVsTsan(30, 120);
// Default producer universe id and namespace name shared by tests built on this fixture.
static const std::string kUniverseId = "test_universe";
static const std::string kNamespaceName = "test_namespace";

// Common fixture for two-datacenter (2DC) replication tests. It owns a producer and a consumer
// cluster and provides helpers for setting up, verifying, toggling and deleting universe
// replication between them.
class TwoDCTestBase : public YBTest {
 public:
  // Everything the fixture tracks for one side (producer or consumer) of the replication.
  class Cluster {
   public:
    std::unique_ptr<MiniCluster> mini_cluster_;
    std::unique_ptr<YBClient> client_;
    std::unique_ptr<yb::pgwrapper::PgSupervisor> pg_supervisor_;
    HostPort pg_host_port_;
    boost::optional<client::TransactionManager> txn_mgr_;

    // Opens a Postgres connection to pg_host_port_ without naming a database.
    Result<pgwrapper::PGConn> Connect() {
      return pgwrapper::PGConn::Connect(pg_host_port_);
    }

    // Opens a Postgres connection to pg_host_port_ for database `dbname`.
    Result<pgwrapper::PGConn> ConnectToDB(const std::string& dbname) {
      return pgwrapper::PGConn::Connect(pg_host_port_, dbname);
    }
  };

  // Runs the base-class setup and then pins the flags these tests depend on.
  void SetUp() override {
    YBTest::SetUp();
    // Allow for one-off network instability by ensuring a single CDC RPC timeout << test timeout.
    const int cdc_rpc_timeout_ms = (kRpcTimeout / 4) * 1000;
    FLAGS_cdc_read_rpc_timeout_ms = cdc_rpc_timeout_ms;
    FLAGS_cdc_write_rpc_timeout_ms = cdc_rpc_timeout_ms;
    // Not a useful test for us. It's testing Public+Private IP NW errors and we're only public
    FLAGS_TEST_check_broadcast_address = false;
    FLAGS_cdc_enable_replicate_intents = true;
    FLAGS_flush_rocksdb_on_shutdown = false;
  }

  // Stops and releases both clusters.
  void Destroy();

  // Asks the consumer's leader master to set up replication of `tables` from the producer.
  // With `leader_only`, only the producer leader master's address is sent.
  CHECKED_STATUS SetupUniverseReplication(
      MiniCluster* producer_cluster, MiniCluster* consumer_cluster, YBClient* consumer_client,
      const std::string& universe_id, const std::vector<std::shared_ptr<client::YBTable>>& tables,
      bool leader_only = true);

  // Waits until the replication for `universe_id` is reported ACTIVE; `resp` receives the last
  // response.
  CHECKED_STATUS VerifyUniverseReplication(
      MiniCluster* consumer_cluster, YBClient* consumer_client,
      const std::string& universe_id, master::GetUniverseReplicationResponsePB* resp);

  // Enables or disables the replication for `universe_id` according to `is_enabled`.
  CHECKED_STATUS ToggleUniverseReplication(
      MiniCluster* consumer_cluster, YBClient* consumer_client,
      const std::string& universe_id, bool is_enabled);

  // Waits up to `timeout` milliseconds for the replication for `universe_id` to be reported as
  // not found.
  CHECKED_STATUS VerifyUniverseReplicationDeleted(MiniCluster* consumer_cluster,
      YBClient* consumer_client, const std::string& universe_id, int timeout);

  // Waits until the producer lists exactly one CDC stream for `table_id`; `resp` receives the
  // last listing.
  CHECKED_STATUS GetCDCStreamForTable(
      const std::string& table_id, master::ListCDCStreamsResponsePB* resp);

  // Sums the successful write RPC counts of the CDC consumers in `cluster`.
  uint32_t GetSuccessfulWriteOps(MiniCluster* cluster);

  // Deletes the replication for `universe_id` via this fixture's consumer client and cluster.
  CHECKED_STATUS DeleteUniverseReplication(const std::string& universe_id);

  // Deletes the replication for `universe_id` via the given client and cluster.
  CHECKED_STATUS DeleteUniverseReplication(
      const std::string& universe_id, YBClient* client, MiniCluster* cluster);

  // Counts the producer tablets reported as running by the CDC consumers in `cluster`.
  uint32_t NumProducerTabletsPolled(MiniCluster* cluster);

  // Waits until `num_producer_tablets` tablets are being polled for several checks in a row.
  CHECKED_STATUS CorrectlyPollingAllTablets(MiniCluster* cluster, uint32_t num_producer_tablets);

  // Non-owning accessors; each returns nullptr while the corresponding object is not set.
  YBClient* producer_client() { return producer_cluster_.client_.get(); }

  YBClient* consumer_client() { return consumer_cluster_.client_.get(); }

  MiniCluster* producer_cluster() { return producer_cluster_.mini_cluster_.get(); }

  MiniCluster* consumer_cluster() { return consumer_cluster_.mini_cluster_.get(); }

  client::TransactionManager* producer_txn_mgr() { return producer_cluster_.txn_mgr_.get_ptr(); }

  client::TransactionManager* consumer_txn_mgr() { return consumer_cluster_.txn_mgr_.get_ptr(); }

 protected:
  Cluster producer_cluster_;
  Cluster consumer_cluster_;
};

} // namespace enterprise
} // namespace yb

#endif // ENT_SRC_YB_INTEGRATION_TESTS_TWODC_TEST_BASE_H
Loading

0 comments on commit 07f2c71

Please sign in to comment.