Skip to content

Commit

Permalink
fix test case
Browse files Browse the repository at this point in the history
  • Loading branch information
darionyaphet committed Nov 29, 2021
1 parent af37db1 commit 88f47bb
Show file tree
Hide file tree
Showing 16 changed files with 342 additions and 247 deletions.
13 changes: 8 additions & 5 deletions src/meta/processors/parts/CreateSpaceAsProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ void CreateSpaceAsProcessor::process(const cpp2::CreateSpaceAsReq &req) {
}

std::vector<kvstore::KV> data;

auto newSpaceData =
makeNewSpaceData(nebula::value(oldSpaceId), nebula::value(newSpaceId), newSpaceName);
if (nebula::ok(newSpaceData)) {
Expand Down Expand Up @@ -123,9 +122,10 @@ ErrorOr<nebula::cpp2::ErrorCode, std::vector<kvstore::KV>> CreateSpaceAsProcesso
return nebula::error(partPrefix);
}
auto iter = nebula::value(partPrefix).get();
for (; iter->valid(); iter->next()) {
while (iter->valid()) {
auto partId = MetaKeyUtils::parsePartKeyPartId(iter->key());
data.emplace_back(MetaKeyUtils::partKey(newSpaceId, partId), iter->val());
iter->next();
}
return data;
}
Expand All @@ -145,7 +145,7 @@ ErrorOr<nebula::cpp2::ErrorCode, std::vector<kvstore::KV>> CreateSpaceAsProcesso

std::vector<kvstore::KV> data;
auto iter = nebula::value(tagPrefix).get();
for (; iter->valid(); iter->next()) {
while (iter->valid()) {
auto val = iter->val();

auto tagId = MetaKeyUtils::parseTagId(iter->key());
Expand All @@ -157,6 +157,7 @@ ErrorOr<nebula::cpp2::ErrorCode, std::vector<kvstore::KV>> CreateSpaceAsProcesso
auto tagVer = MetaKeyUtils::parseTagVersion(iter->key());
auto key = MetaKeyUtils::schemaTagKey(newSpaceId, tagId, tagVer);
data.emplace_back(std::move(key), val.str());
iter->next();
}
return data;
}
Expand All @@ -176,7 +177,7 @@ ErrorOr<nebula::cpp2::ErrorCode, std::vector<kvstore::KV>> CreateSpaceAsProcesso

std::vector<kvstore::KV> data;
auto iter = nebula::value(edgePrefix).get();
for (; iter->valid(); iter->next()) {
while (iter->valid()) {
auto val = iter->val();

auto edgeType = MetaKeyUtils::parseEdgeType(iter->key());
Expand All @@ -188,6 +189,7 @@ ErrorOr<nebula::cpp2::ErrorCode, std::vector<kvstore::KV>> CreateSpaceAsProcesso
auto ver = MetaKeyUtils::parseEdgeVersion(iter->key());
auto key = MetaKeyUtils::schemaEdgeKey(newSpaceId, edgeType, ver);
data.emplace_back(std::move(key), val.str());
iter->next();
}
return data;
}
Expand All @@ -207,7 +209,7 @@ ErrorOr<nebula::cpp2::ErrorCode, std::vector<kvstore::KV>> CreateSpaceAsProcesso

std::vector<kvstore::KV> data;
auto iter = nebula::value(indexPrefix).get();
for (; iter->valid(); iter->next()) {
while (iter->valid()) {
auto val = iter->val();

auto indexId = MetaKeyUtils::parseIndexesKeyIndexID(iter->key());
Expand All @@ -219,6 +221,7 @@ ErrorOr<nebula::cpp2::ErrorCode, std::vector<kvstore::KV>> CreateSpaceAsProcesso
std::string(reinterpret_cast<const char *>(&indexId), sizeof(indexId)));

data.emplace_back(MetaKeyUtils::indexKey(newSpaceId, indexId), MetaKeyUtils::indexVal(idxItem));
iter->next();
}
return data;
}
Expand Down
238 changes: 111 additions & 127 deletions src/meta/processors/parts/CreateSpaceProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ void CreateSpaceProcessor::process(const cpp2::CreateSpaceReq& req) {
// storage
properties.set_partition_num(partitionNum);
}

if (replicaFactor == 0) {
replicaFactor = FLAGS_default_replica_factor;
if (replicaFactor <= 0) {
Expand All @@ -73,12 +74,14 @@ void CreateSpaceProcessor::process(const cpp2::CreateSpaceReq& req) {
// storage
properties.set_replica_factor(replicaFactor);
}

if (vidSize == 0) {
LOG(ERROR) << "Create Space Failed : vid_size is illegal: " << vidSize;
handleErrorCode(nebula::cpp2::ErrorCode::E_INVALID_PARM);
onFinished();
return;
}

if (vidType != nebula::cpp2::PropertyType::INT64 &&
vidType != nebula::cpp2::PropertyType::FIXED_STRING) {
LOG(ERROR) << "Create Space Failed : vid_type is illegal: "
Expand All @@ -87,6 +90,7 @@ void CreateSpaceProcessor::process(const cpp2::CreateSpaceReq& req) {
onFinished();
return;
}

if (vidType == nebula::cpp2::PropertyType::INT64 && vidSize != 8) {
LOG(ERROR) << "Create Space Failed : vid_size should be 8 if vid type is integer: " << vidSize;
handleErrorCode(nebula::cpp2::ErrorCode::E_INVALID_PARM);
Expand All @@ -105,148 +109,139 @@ void CreateSpaceProcessor::process(const cpp2::CreateSpaceReq& req) {

auto spaceId = nebula::value(idRet);
std::vector<kvstore::KV> data;
data.emplace_back(MetaKeyUtils::indexSpaceKey(spaceName),
std::string(reinterpret_cast<const char*>(&spaceId), sizeof(spaceId)));
data.emplace_back(MetaKeyUtils::spaceKey(spaceId), MetaKeyUtils::spaceVal(properties));

std::vector<::std::string> zones;
nebula::cpp2::ErrorCode code = nebula::cpp2::ErrorCode::SUCCEEDED;
if (!properties.get_zone_names().empty()) {
auto zones = properties.get_zone_names();
for (auto& zone : zones) {
auto zoneKey = MetaKeyUtils::zoneKey(zone);
auto ret = doGet(zoneKey);
if (!nebula::ok(ret)) {
auto retCode = nebula::error(ret);
if (retCode == nebula::cpp2::ErrorCode::E_KEY_NOT_FOUND) {
code = nebula::cpp2::ErrorCode::E_ZONE_NOT_FOUND;
}
LOG(ERROR) << " Get Zone Name: " << zone << " failed.";
break;
}
}

if (code != nebula::cpp2::ErrorCode::SUCCEEDED) {
LOG(ERROR) << "Create space failed";
if (properties.get_zone_names().empty()) {
const auto& zonePrefix = MetaKeyUtils::zonePrefix();
auto zoneIterRet = doPrefix(zonePrefix);
if (!nebula::ok(zoneIterRet)) {
code = nebula::error(zoneIterRet);
LOG(ERROR) << "Get zones failed, error: " << apache::thrift::util::enumNameSafe(code);
handleErrorCode(code);
onFinished();
return;
}

int32_t zoneNum = zones.size();
if (replicaFactor > zoneNum) {
LOG(ERROR) << "Replication number should less than or equal to zone number.";
handleErrorCode(nebula::cpp2::ErrorCode::E_INVALID_PARM);
onFinished();
return;
auto zoneIter = nebula::value(zoneIterRet).get();
while (zoneIter->valid()) {
auto zoneName = MetaKeyUtils::parseZoneName(zoneIter->key());
zones.emplace_back(std::move(zoneName));
zoneIter->next();
}

auto hostLoadingRet = getHostLoading();
if (!nebula::ok(hostLoadingRet)) {
LOG(ERROR) << "Get host loading failed.";
auto retCode = nebula::error(hostLoadingRet);
if (retCode != nebula::cpp2::ErrorCode::E_LEADER_CHANGED) {
retCode = nebula::cpp2::ErrorCode::E_INVALID_PARM;
properties.set_zone_names(zones);
} else {
zones = properties.get_zone_names();
}

data.emplace_back(MetaKeyUtils::indexSpaceKey(spaceName),
std::string(reinterpret_cast<const char*>(&spaceId), sizeof(spaceId)));
data.emplace_back(MetaKeyUtils::spaceKey(spaceId), MetaKeyUtils::spaceVal(properties));
for (auto& zone : zones) {
auto zoneKey = MetaKeyUtils::zoneKey(zone);
auto ret = doGet(zoneKey);
if (!nebula::ok(ret)) {
auto retCode = nebula::error(ret);
if (retCode == nebula::cpp2::ErrorCode::E_KEY_NOT_FOUND) {
code = nebula::cpp2::ErrorCode::E_ZONE_NOT_FOUND;
}
handleErrorCode(retCode);
onFinished();
return;
LOG(ERROR) << " Get Zone Name: " << zone << " failed.";
break;
}
}

hostLoading_ = std::move(nebula::value(hostLoadingRet));
std::unordered_map<std::string, Hosts> zoneHosts;
for (auto& zone : zones) {
auto zoneKey = MetaKeyUtils::zoneKey(zone);
auto zoneValueRet = doGet(std::move(zoneKey));
if (!nebula::ok(zoneValueRet)) {
code = nebula::error(zoneValueRet);
if (code == nebula::cpp2::ErrorCode::E_KEY_NOT_FOUND) {
code = nebula::cpp2::ErrorCode::E_ZONE_NOT_FOUND;
}
LOG(ERROR) << "Get zone " << zone << " failed.";
break;
}
if (code != nebula::cpp2::ErrorCode::SUCCEEDED) {
LOG(ERROR) << "Create space failed";
handleErrorCode(code);
onFinished();
return;
}

auto hosts = MetaKeyUtils::parseZoneHosts(std::move(nebula::value(zoneValueRet)));
for (auto& host : hosts) {
auto hostIter = hostLoading_.find(host);
if (hostIter == hostLoading_.end()) {
hostLoading_[host] = 0;
zoneLoading_[zone] += 0;
} else {
zoneLoading_[zone] += hostIter->second;
}
}
zoneHosts[zone] = std::move(hosts);
}
int32_t zoneNum = zones.size();
if (replicaFactor > zoneNum) {
LOG(ERROR) << "Replication number should less than or equal to zone number.";
LOG(ERROR) << "Replication number: " << replicaFactor << ", Zones size: " << zones.size();
handleErrorCode(nebula::cpp2::ErrorCode::E_INVALID_PARM);
onFinished();
return;
}

if (code != nebula::cpp2::ErrorCode::SUCCEEDED) {
LOG(ERROR) << "Create space failed";
handleErrorCode(code);
onFinished();
return;
auto hostLoadingRet = getHostLoading();
if (!nebula::ok(hostLoadingRet)) {
LOG(ERROR) << "Get host loading failed.";
auto retCode = nebula::error(hostLoadingRet);
if (retCode != nebula::cpp2::ErrorCode::E_LEADER_CHANGED) {
retCode = nebula::cpp2::ErrorCode::E_INVALID_PARM;
}
handleErrorCode(retCode);
onFinished();
return;
}

for (auto partId = 1; partId <= partitionNum; partId++) {
auto pickedZonesRet = pickLightLoadZones(replicaFactor);
if (!pickedZonesRet.ok()) {
LOG(ERROR) << "Pick zone failed.";
code = nebula::cpp2::ErrorCode::E_INVALID_PARM;
break;
}

auto pickedZones = std::move(pickedZonesRet).value();
auto partHostsRet = pickHostsWithZone(pickedZones, zoneHosts);
if (!partHostsRet.ok()) {
LOG(ERROR) << "Pick hosts with zone failed.";
code = nebula::cpp2::ErrorCode::E_INVALID_PARM;
break;
hostLoading_ = std::move(nebula::value(hostLoadingRet));
std::unordered_map<std::string, Hosts> zoneHosts;
for (auto& zone : zones) {
auto zoneKey = MetaKeyUtils::zoneKey(zone);
auto zoneValueRet = doGet(std::move(zoneKey));
if (!nebula::ok(zoneValueRet)) {
code = nebula::error(zoneValueRet);
if (code == nebula::cpp2::ErrorCode::E_KEY_NOT_FOUND) {
code = nebula::cpp2::ErrorCode::E_ZONE_NOT_FOUND;
}
LOG(ERROR) << "Get zone " << zone << " failed.";
break;
}

auto partHosts = std::move(partHostsRet).value();
if (partHosts.empty()) {
LOG(ERROR) << "Pick hosts is empty.";
code = nebula::cpp2::ErrorCode::E_INVALID_PARM;
break;
auto hosts = MetaKeyUtils::parseZoneHosts(std::move(nebula::value(zoneValueRet)));
for (auto& host : hosts) {
auto hostIter = hostLoading_.find(host);
if (hostIter == hostLoading_.end()) {
hostLoading_[host] = 0;
zoneLoading_[zone] += 0;
} else {
zoneLoading_[zone] += hostIter->second;
}
}
zoneHosts[zone] = std::move(hosts);
}

std::stringstream ss;
for (const auto& host : partHosts) {
ss << host << ", ";
}
if (code != nebula::cpp2::ErrorCode::SUCCEEDED) {
LOG(ERROR) << "Create space failed";
handleErrorCode(code);
onFinished();
return;
}

VLOG(3) << "Space " << spaceId << " part " << partId << " hosts " << ss.str();
data.emplace_back(MetaKeyUtils::partKey(spaceId, partId), MetaKeyUtils::partVal(partHosts));
}
} else {
auto hostsRet = ActiveHostsMan::getActiveHosts(kvstore_);
if (!nebula::ok(hostsRet)) {
auto retCode = nebula::error(hostsRet);
LOG(ERROR) << "Create Space Failed when get active host, error "
<< apache::thrift::util::enumNameSafe(retCode);
handleErrorCode(retCode);
onFinished();
return;
for (auto partId = 1; partId <= partitionNum; partId++) {
auto pickedZonesRet = pickLightLoadZones(replicaFactor);
if (!pickedZonesRet.ok()) {
LOG(ERROR) << "Pick zone failed.";
code = nebula::cpp2::ErrorCode::E_INVALID_PARM;
break;
}
auto hosts = std::move(nebula::value(hostsRet));
if (hosts.empty()) {
LOG(ERROR) << "Create Space Failed : No Hosts!";
handleErrorCode(nebula::cpp2::ErrorCode::E_NO_HOSTS);
onFinished();
return;

auto pickedZones = std::move(pickedZonesRet).value();
auto partHostsRet = pickHostsWithZone(pickedZones, zoneHosts);
if (!partHostsRet.ok()) {
LOG(ERROR) << "Pick hosts with zone failed.";
code = nebula::cpp2::ErrorCode::E_INVALID_PARM;
break;
}

if ((int32_t)hosts.size() < replicaFactor) {
LOG(ERROR) << "Not enough hosts existed for replica " << replicaFactor << ", hosts num "
<< hosts.size();
handleErrorCode(nebula::cpp2::ErrorCode::E_INVALID_PARM);
onFinished();
return;
auto partHosts = std::move(partHostsRet).value();
if (partHosts.empty()) {
LOG(ERROR) << "Pick hosts is empty.";
code = nebula::cpp2::ErrorCode::E_INVALID_PARM;
break;
}

for (auto partId = 1; partId <= partitionNum; partId++) {
auto partHosts = pickHosts(partId, hosts, replicaFactor);
data.emplace_back(MetaKeyUtils::partKey(spaceId, partId), MetaKeyUtils::partVal(partHosts));
std::stringstream ss;
for (const auto& host : partHosts) {
ss << host << ", ";
}

VLOG(3) << "Space " << spaceId << " part " << partId << " hosts " << ss.str();
data.emplace_back(MetaKeyUtils::partKey(spaceId, partId), MetaKeyUtils::partVal(partHosts));
}

if (code != nebula::cpp2::ErrorCode::SUCCEEDED) {
Expand All @@ -261,17 +256,6 @@ void CreateSpaceProcessor::process(const cpp2::CreateSpaceReq& req) {
LOG(INFO) << "Create space " << spaceName;
}

Hosts CreateSpaceProcessor::pickHosts(PartitionID partId,
const Hosts& hosts,
int32_t replicaFactor) {
auto startIndex = partId;
Hosts pickedHosts;
for (int32_t i = 0; i < replicaFactor; i++) {
pickedHosts.emplace_back(toThriftHost(hosts[startIndex++ % hosts.size()]));
}
return pickedHosts;
}

ErrorOr<nebula::cpp2::ErrorCode, std::unordered_map<HostAddr, int32_t>>
CreateSpaceProcessor::getHostLoading() {
const auto& prefix = MetaKeyUtils::partPrefix();
Expand Down
2 changes: 0 additions & 2 deletions src/meta/processors/parts/CreateSpaceProcessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,6 @@ class CreateSpaceProcessor : public BaseProcessor<cpp2::ExecResp> {
explicit CreateSpaceProcessor(kvstore::KVStore* kvstore)
: BaseProcessor<cpp2::ExecResp>(kvstore) {}

Hosts pickHosts(PartitionID partId, const Hosts& hosts, int32_t replicaFactor);

// Get the host with the least load in the zone
StatusOr<Hosts> pickHostsWithZone(const std::vector<std::string>& zones,
const std::unordered_map<std::string, Hosts>& zoneHosts);
Expand Down
Loading

0 comments on commit 88f47bb

Please sign in to comment.