Skip to content

Commit

Permalink
Addressed darionyaphet's comment
Browse files Browse the repository at this point in the history
  • Loading branch information
bright-starry-sky committed Aug 13, 2020
1 parent 1bff2ad commit 617fb24
Show file tree
Hide file tree
Showing 4 changed files with 39 additions and 60 deletions.
7 changes: 7 additions & 0 deletions src/meta/MetaServiceUtils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,13 @@ std::string MetaServiceUtils::partPrefix(GraphSpaceID spaceId) {
return prefix;
}

std::string MetaServiceUtils::partPrefix() {
std::string prefix;
prefix.reserve(kPartsTable.size() + sizeof(GraphSpaceID));
prefix.append(kPartsTable.data(), kPartsTable.size());
return prefix;
}

std::vector<nebula::cpp2::HostAddr> MetaServiceUtils::parsePartVal(folly::StringPiece val) {
std::vector<nebula::cpp2::HostAddr> hosts;
static const size_t unitSize = sizeof(int32_t) * 2;
Expand Down
2 changes: 1 addition & 1 deletion src/meta/MetaServiceUtils.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ class MetaServiceUtils final {

static std::string partVal(const std::vector<nebula::cpp2::HostAddr>& hosts);

static const std::string& partPrefix();
static std::string partPrefix();

static std::string partPrefix(GraphSpaceID spaceId);

Expand Down
86 changes: 30 additions & 56 deletions src/meta/processors/admin/SnapShot.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,14 @@
namespace nebula {
namespace meta {
cpp2::ErrorCode Snapshot::createSnapshot(const std::string& name) {
std::vector<GraphSpaceID> spaces;
kvstore::ResultCode ret = kvstore::ResultCode::SUCCEEDED;
if (!getAllSpaces(spaces, ret)) {
LOG(ERROR) << "Can't access kvstore, ret = d"
<< static_cast<int32_t>(ret);
auto retSpacesHosts = getSpacesHosts();
if (!retSpacesHosts.ok()) {
return cpp2::ErrorCode::E_STORE_FAILURE;
}
for (auto& space : spaces) {
auto hostsBySpace = getHostsBySpace(space);
for (const auto& host : hostsBySpace) {
auto status = client_->createSnapshot(space, name, host).get();
auto spacesHosts = retSpacesHosts.value();
for (const auto& spaceHosts : spacesHosts) {
for (const auto& host : spaceHosts.second) {
auto status = client_->createSnapshot(spaceHosts.first, name, host).get();
if (!status.ok()) {
return cpp2::ErrorCode::E_RPC_FAILURE;
}
Expand All @@ -35,19 +32,15 @@ cpp2::ErrorCode Snapshot::createSnapshot(const std::string& name) {

cpp2::ErrorCode Snapshot::dropSnapshot(const std::string& name,
const std::vector<HostAddr>& hosts) {
std::vector<GraphSpaceID> spaces;
kvstore::ResultCode ret = kvstore::ResultCode::SUCCEEDED;
if (!getAllSpaces(spaces, ret)) {
LOG(ERROR) << "Can't access kvstore, ret = d"
<< static_cast<int32_t>(ret);
auto retSpacesHosts = getSpacesHosts();
if (!retSpacesHosts.ok()) {
return cpp2::ErrorCode::E_STORE_FAILURE;
}

for (const auto& space : spaces) {
auto hostsBySpace = getHostsBySpace(space);
for (const auto& host : hostsBySpace) {
auto spacesHosts = retSpacesHosts.value();
for (const auto& spaceHosts : spacesHosts) {
for (const auto& host : spaceHosts.second) {
if (std::find(hosts.begin(), hosts.end(), host) != hosts.end()) {
auto status = client_->dropSnapshot(space, name, host).get();
auto status = client_->dropSnapshot(spaceHosts.first, name, host).get();
if (!status.ok()) {
auto msg = "failed drop checkpoint : \"%s\". on host %s. error %s";
auto error = folly::stringPrintf(msg,
Expand All @@ -62,62 +55,43 @@ cpp2::ErrorCode Snapshot::dropSnapshot(const std::string& name,
return cpp2::ErrorCode::SUCCEEDED;
}

bool Snapshot::getAllSpaces(std::vector<GraphSpaceID>& spaces, kvstore::ResultCode& retCode) {
// Get all spaces
folly::SharedMutex::ReadHolder rHolder(LockUtils::spaceLock());
auto prefix = MetaServiceUtils::spacePrefix();
std::unique_ptr<kvstore::KVIterator> iter;
auto ret = kv_->prefix(kDefaultSpaceId, kDefaultPartId, prefix, &iter);
if (ret != kvstore::ResultCode::SUCCEEDED) {
retCode = ret;
return false;
}
while (iter->valid()) {
auto spaceId = MetaServiceUtils::spaceId(iter->key());
spaces.push_back(spaceId);
iter->next();
}
return true;
}

cpp2::ErrorCode Snapshot::blockingWrites(storage::cpp2::EngineSignType sign) {
std::vector<GraphSpaceID> spaces;
kvstore::ResultCode ret = kvstore::ResultCode::SUCCEEDED;
if (!getAllSpaces(spaces, ret)) {
LOG(ERROR) << "Can't access kvstore, ret = d"
<< static_cast<int32_t>(ret);
auto retSpacesHosts = getSpacesHosts();
if (!retSpacesHosts.ok()) {
return cpp2::ErrorCode::E_STORE_FAILURE;
}
for (auto& space : spaces) {
auto hostsBySpace = getHostsBySpace(space);
for (const auto& host : hostsBySpace) {
auto status = client_->blockingWrites(space, sign, host).get();
if (!status.ok()) {
LOG(ERROR) << " Send blocking sign error on host : "
<< network::NetworkUtils::toHosts({host});
}
auto spacesHosts = retSpacesHosts.value();
for (const auto& spaceHosts : spacesHosts) {
for (const auto& host : spaceHosts.second) {
auto status = client_->blockingWrites(spaceHosts.first, sign, host).get();
if (!status.ok()) {
LOG(ERROR) << " Send blocking sign error on host : "
<< network::NetworkUtils::toHosts({host});
}
}
}
return cpp2::ErrorCode::SUCCEEDED;
}

std::set<HostAddr> Snapshot::getHostsBySpace(GraphSpaceID space) {
StatusOr<std::map<GraphSpaceID, std::set<HostAddr>>> Snapshot::getSpacesHosts() {
folly::SharedMutex::ReadHolder rHolder(LockUtils::spaceLock());
std::set<HostAddr> hosts;
auto prefix = MetaServiceUtils::partPrefix(space);
std::map<GraphSpaceID, std::set<HostAddr>> hostsByspaces;
auto prefix = MetaServiceUtils::partPrefix();
std::unique_ptr<kvstore::KVIterator> iter;
auto kvRet = kv_->prefix(kDefaultSpaceId, kDefaultPartId, prefix, &iter);
if (kvRet != kvstore::ResultCode::SUCCEEDED) {
return {};
LOG(ERROR) << "Get hosts meta data error";
return Status::Error("Get hosts meta data error");
}
while (iter->valid()) {
auto partHosts = MetaServiceUtils::parsePartVal(iter->val());
auto space = MetaServiceUtils::parsePartKeySpaceId(iter->key());
for (auto& ph : partHosts) {
hosts.emplace(HostAddr(ph.get_ip(), ph.get_port()));
hostsByspaces[space].emplace(HostAddr(ph.get_ip(), ph.get_port()));
}
iter->next();
}
return hosts;
return hostsByspaces;
}

} // namespace meta
Expand Down
4 changes: 1 addition & 3 deletions src/meta/processors/admin/SnapShot.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,7 @@ class Snapshot {
executor_.reset(new folly::CPUThreadPoolExecutor(1));
}

bool getAllSpaces(std::vector<GraphSpaceID>& spaces, kvstore::ResultCode& retCode);

std::set<HostAddr> getHostsBySpace(GraphSpaceID space);
StatusOr<std::map<GraphSpaceID, std::set<HostAddr>>> getSpacesHosts();

private:
kvstore::KVStore* kv_{nullptr};
Expand Down

0 comments on commit 617fb24

Please sign in to comment.