ENG-3200: Table splits should be created based on primary cluster tserver count.

Summary:
Added a new API option to count only the tablet servers whose placement uuid matches the masters'. This counts only the tservers in the primary cluster that are alive and heartbeating, and the count is used to calculate the default number of tablets during table create on the master leader. Also enhanced the C++ client to use the primary cluster count.
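
A minimal usage sketch of the new client call, assuming an already-connected YBClient named client (both the primary_only parameter and FLAGS_yb_num_shards_per_tserver appear in the diff below):

  int tserver_count = 0;
  // Counts only live, heartbeating tservers in the primary/sync cluster.
  RETURN_NOT_OK(client->TabletServerCount(&tserver_count, true /* primary_only */));
  // The default tablet count scales with the primary cluster size.
  int num_tablets = tserver_count * FLAGS_yb_num_shards_per_tserver;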

Test Plan:
Jenkins

Redis table created the correct number of tablets on Mac, using only primary cluster tserver count:
{F9093}

CQL table create also behaves as expected now:
{F9095}

Reviewers: rahuldesirazu, bogdan, hector

Reviewed By: hector

Subscribers: kannan, ybase

Differential Revision: https://phabricator.dev.yugabyte.com/D4680
bbaddepudi committed Apr 27, 2018
1 parent 92cac97 commit e9f3735
Showing 12 changed files with 79 additions and 20 deletions.
3 changes: 1 addition & 2 deletions java/yb-cql/src/test/java/org/yb/cql/TestSystemTables.java
@@ -427,8 +427,7 @@ public void testSystemSchemaPartitionsTable() throws Exception {
     List<Row> partitions = session.execute("SELECT * FROM system.partitions WHERE " +
                                            "keyspace_name = 'test_keyspace' AND " +
                                            "table_name = 'test_table';").all();
-    // Add 1 to account for tserver started in testSystemPeersTable().
-    assertEquals(miniCluster.getNumShardsPerTserver() * (NUM_TABLET_SERVERS + 1),
+    assertEquals(miniCluster.getNumShardsPerTserver() * NUM_TABLET_SERVERS,
                  partitions.size());
 
     HashSet<InetAddress> contactPoints = new HashSet<>();
5 changes: 3 additions & 2 deletions src/yb/client/client.cc
@@ -630,9 +630,10 @@ CHECKED_STATUS YBClient::DeleteUDType(const std::string &namespace_name,
   return Status::OK();
 }
 
-Status YBClient::TabletServerCount(int *tserver_count) {
+Status YBClient::TabletServerCount(int *tserver_count, bool primary_only) {
   ListTabletServersRequestPB req;
   ListTabletServersResponsePB resp;
+  req.set_primary_only(primary_only);
   CALL_SYNC_LEADER_MASTER_RPC(req, resp, ListTabletServers);
   *tserver_count = resp.servers_size();
   return Status::OK();
@@ -1165,7 +1166,7 @@ Status YBTableCreator::Create() {
         << ": --yb_num_total_tablets is specified.";
   } else {
     int tserver_count = 0;
-    RETURN_NOT_OK(data_->client_->TabletServerCount(&tserver_count));
+    RETURN_NOT_OK(data_->client_->TabletServerCount(&tserver_count, true /* primary_only */));
     data_->num_tablets_ = tserver_count * FLAGS_yb_num_shards_per_tserver;
     VLOG(1) << "num_tablets = " << data_->num_tablets_ << ": "
             << "calculated as tserver_count * FLAGS_yb_num_shards_per_tserver ("
3 changes: 2 additions & 1 deletion src/yb/client/client.h
@@ -361,7 +361,8 @@ class YBClient : public std::enable_shared_from_this<YBClient> {
 
   // Find the number of tservers. This function should not be called frequently for reading or
   // writing actual data. Currently, it is called only for SQL DDL statements.
-  CHECKED_STATUS TabletServerCount(int *tserver_count);
+  // If primary_only is set to true, we expect the primary/sync cluster tserver count only.
+  CHECKED_STATUS TabletServerCount(int *tserver_count, bool primary_only = false);
 
   CHECKED_STATUS ListTabletServers(std::vector<std::unique_ptr<YBTabletServer>>* tablet_servers);
 
33 changes: 21 additions & 12 deletions src/yb/master/catalog_manager.cc
@@ -1550,15 +1550,24 @@ Status CatalogManager::CreateTable(const CreateTableRequestPB* orig_req,
     //   }
   }
 
+  // Get cluster level placement info.
+  ReplicationInfoPB replication_info;
+  {
+    auto l = cluster_config_->LockForRead();
+    replication_info = l->data().pb.replication_info();
+  }
+
   // Calculate number of tablets to be used.
-  TSDescriptorVector ts_descs;
-  master_->ts_manager()->GetAllLiveDescriptors(&ts_descs);
-  int num_live_tservers = ts_descs.size();
   int num_tablets = req.num_tablets();
   if (num_tablets <= 0) {
-    // Try to use default: client could have gotten the value before any tserver had heartbeated
+    // Use default as client could have gotten the value before any tserver had heartbeated
     // to (a new) master leader.
-    num_tablets = num_live_tservers * FLAGS_yb_num_shards_per_tserver;
+    TSDescriptorVector ts_descs;
+    master_->ts_manager()->GetAllLiveDescriptorsInCluster(
+        &ts_descs, replication_info.live_replicas().placement_uuid());
+    num_tablets = ts_descs.size() * FLAGS_yb_num_shards_per_tserver;
+    LOG(INFO) << "Setting default tablets to " << num_tablets << " with "
+              << ts_descs.size() << " primary servers";
   }
 
   // Create partitions.
@@ -1589,13 +1598,6 @@ Status CatalogManager::CreateTable(const CreateTableRequestPB* orig_req,
     return SetupError(resp->mutable_error(), MasterErrorPB::INVALID_SCHEMA, s);
   }
 
-  // Default to the cluster placement object.
-  ReplicationInfoPB replication_info;
-  {
-    auto l = cluster_config_->LockForRead();
-    replication_info = l->data().pb.replication_info();
-  }
-
   TSDescriptorVector all_ts_descs;
   master_->ts_manager()->GetAllLiveDescriptors(&all_ts_descs);
   s = CheckValidReplicationInfo(replication_info, all_ts_descs, partitions, resp);
@@ -5297,6 +5299,13 @@ Status CatalogManager::GetReplicationFactor(int* num_replicas) {
   return Status::OK();
 }
 
+string CatalogManager::placement_uuid() const {
+  DCHECK(cluster_config_) << "Missing cluster config for master!";
+  auto l = cluster_config_->LockForRead();
+  const ReplicationInfoPB& replication_info = l->data().pb.replication_info();
+  return replication_info.live_replicas().placement_uuid();
+}
+
 Status CatalogManager::IsLoadBalanced(const IsLoadBalancedRequestPB* req,
                                       IsLoadBalancedResponsePB* resp) {
   TSDescriptorVector ts_descs;
3 changes: 3 additions & 0 deletions src/yb/master/catalog_manager.h
@@ -1025,6 +1025,9 @@ class CatalogManager : public tserver::TabletPeerLookupIf {
   CHECKED_STATUS IsLoadBalanced(const IsLoadBalancedRequestPB* req,
                                 IsLoadBalancedResponsePB* resp);
 
+  // Return the placement uuid of the primary cluster containing this master.
+  string placement_uuid() const;
+
   // Clears out the existing metadata ('table_names_map_', 'table_ids_map_',
   // and 'tablet_map_'), loads tables metadata into memory and if successful
   // loads the tablets metadata.
1 change: 0 additions & 1 deletion src/yb/master/master-test.cc
@@ -432,7 +432,6 @@ Status MasterTest::DoCreateTable(const NamespaceName& namespace_name,
   CreateTableResponsePB resp;
 
   request->set_name(table_name);
-  request->set_num_tablets(3);
   RETURN_NOT_OK(SchemaToPB(schema, request->mutable_schema()));
 
   if (!namespace_name.empty()) {
2 changes: 2 additions & 0 deletions src/yb/master/master.proto
@@ -810,6 +810,8 @@ message GetTableSchemaResponsePB {
 // ============================================================================
 
 message ListTabletServersRequestPB {
+  // If set to true, return only the tservers which are in the primary/sync cluster.
+  optional bool primary_only = 1 [default = false];
 }
 
 message ListTabletServersResponsePB {
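
For callers of the RPC, the new field is opt-in; a minimal sketch using the generated protobuf API (request/response objects as in YBClient::TabletServerCount above):

  ListTabletServersRequestPB req;
  ListTabletServersResponsePB resp;
  // Leaving primary_only unset (default false) preserves the old behavior of
  // returning every registered tserver, including read replicas.
  req.set_primary_only(true);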
9 changes: 8 additions & 1 deletion src/yb/master/master_service.cc
@@ -366,7 +366,14 @@ void MasterServiceImpl::ListTabletServers(const ListTabletServersRequestPB* req,
   }
 
   std::vector<std::shared_ptr<TSDescriptor> > descs;
-  server_->ts_manager()->GetAllDescriptors(&descs);
+  if (!req->primary_only()) {
+    server_->ts_manager()->GetAllDescriptors(&descs);
+  } else {
+    server_->ts_manager()->GetAllLiveDescriptorsInCluster(
+        &descs,
+        server_->catalog_manager()->placement_uuid());
+  }
+
   for (const std::shared_ptr<TSDescriptor>& desc : descs) {
     ListTabletServersResponsePB::Entry* entry = resp->add_servers();
     desc->GetNodeInstancePB(entry->mutable_instance_id());
10 changes: 10 additions & 0 deletions src/yb/master/ts_descriptor.cc
@@ -103,9 +103,19 @@ Status TSDescriptor::RegisterUnlocked(const NodeInstancePB& instance,
 
   proxies_.reset();
 
+  placement_uuid_ = "";
+  if (registration.common().has_placement_uuid()) {
+    placement_uuid_ = registration.common().placement_uuid();
+  }
+
   return Status::OK();
 }
 
+std::string TSDescriptor::placement_uuid() const {
+  std::lock_guard<simple_spinlock> l(lock_);
+  return placement_uuid_;
+}
+
 std::string TSDescriptor::generate_placement_id(const CloudInfoPB& ci) {
   return strings::Substitute(
       "$0:$1:$2", ci.placement_cloud(), ci.placement_region(), ci.placement_zone());
5 changes: 5 additions & 0 deletions src/yb/master/ts_descriptor.h
@@ -113,6 +113,8 @@ class TSDescriptor {
   // Return the pre-computed placement_id, comprised of the cloud_info data.
   std::string placement_id() const;
 
+  std::string placement_uuid() const;
+
   bool IsRunningOn(const HostPortPB& hp) const;
 
   void GetNodeInstancePB(NodeInstancePB* instance_pb) const;
@@ -272,6 +274,9 @@
   gscoped_ptr<TSRegistrationPB> registration_;
   std::string placement_id_;
 
+  // The (read replica) cluster uuid to which this tserver belongs.
+  std::string placement_uuid_;
+
   YB_EDITION_NS_PREFIX ProxyTuple proxies_;
 
   // Set of tablet uuids for which a delete is pending on this tablet server.
18 changes: 17 additions & 1 deletion src/yb/master/ts_manager.cc
@@ -154,7 +154,6 @@ void TSManager::GetDescriptors(std::function<bool(const TSDescSharedPtr&)> condition,
   }
 }
 
-
 void TSManager::GetAllDescriptors(TSDescriptorVector* descs) const {
   GetDescriptors([](const TSDescSharedPtr& ts) -> bool { return !ts->IsRemoved(); }, descs);
 }
@@ -173,6 +172,23 @@ void TSManager::GetAllReportedDescriptors(TSDescriptorVector* descs) const {
                  -> bool { return IsTSLive(ts) && ts->has_tablet_report(); }, descs);
 }
 
+bool TSManager::IsTsInCluster(const TSDescSharedPtr& ts, string cluster_uuid) {
+  return cluster_uuid.empty() || ts->placement_uuid() == cluster_uuid;
+}
+
+void TSManager::GetAllLiveDescriptorsInCluster(TSDescriptorVector* descs,
+                                               string placement_uuid) const {
+  descs->clear();
+  boost::shared_lock<rw_spinlock> l(lock_);
+  descs->reserve(servers_by_id_.size());
+  for (const TSDescriptorMap::value_type& entry : servers_by_id_) {
+    const TSDescSharedPtr& ts = entry.second;
+    if (IsTSLive(ts) && IsTsInCluster(ts, placement_uuid)) {
+      descs->push_back(ts);
+    }
+  }
+}
+
 const TSDescSharedPtr TSManager::GetTSDescriptor(const HostPortPB& host_port) const {
   boost::shared_lock<rw_spinlock> l(lock_);
   for (const TSDescriptorMap::value_type& entry : servers_by_id_) {
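
Note the wildcard in IsTsInCluster: an empty cluster uuid matches every tserver, so clusters that never set a placement uuid keep the old GetAllLiveDescriptors() behavior. A minimal sketch (the uuid value is illustrative):

  TSDescriptorVector descs;
  // Only live tservers whose placement uuid is "live-cluster-uuid".
  master_->ts_manager()->GetAllLiveDescriptorsInCluster(&descs, "live-cluster-uuid");
  // An empty uuid matches every live tserver; descs is cleared on each call.
  master_->ts_manager()->GetAllLiveDescriptorsInCluster(&descs, "");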
7 changes: 7 additions & 0 deletions src/yb/master/ts_manager.h
@@ -98,6 +98,10 @@ class TSManager {
   // heartbeat recently, indicating that they're alive and well.
   void GetAllLiveDescriptors(TSDescriptorVector* descs) const;
 
+  // Return all of the currently registered TS descriptors that have sent a heartbeat
+  // recently and are in the same 'cluster' with given placement uuid.
+  void GetAllLiveDescriptorsInCluster(TSDescriptorVector* descs, string placement_uuid) const;
+
   // Return all of the currently registered TS descriptors that have sent a
   // heartbeat, indicating that they're alive and well, recently and have given
   // full report of their tablets as well.
@@ -111,6 +115,9 @@
 
   static bool IsTSLive(const TSDescSharedPtr& ts);
 
+  // Check if the placement uuid of the tserver is same as given cluster uuid.
+  static bool IsTsInCluster(const TSDescSharedPtr& ts, string cluster_uuid);
+
  private:
 
   void GetDescriptors(std::function<bool(const TSDescSharedPtr&)> condition,
