Skip to content

Commit

Permalink
fix error for checkpoint when space doesn't exist in storageNode. (ve…
Browse files Browse the repository at this point in the history
…soft-inc#2287)

* fix error for checkpoint

* fix error for checkpoint

* Addressed darionyaphet's comment

Co-authored-by: dangleptr <37216992+dangleptr@users.noreply.github.com>
  • Loading branch information
bright-starry-sky and dangleptr authored Aug 17, 2020
1 parent b8bcf0e commit f9e50f7
Show file tree
Hide file tree
Showing 4 changed files with 54 additions and 57 deletions.
7 changes: 7 additions & 0 deletions src/meta/MetaServiceUtils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,13 @@ std::string MetaServiceUtils::partPrefix(GraphSpaceID spaceId) {
return prefix;
}

// Returns a std::string holding exactly the bytes of kPartsTable.
// This overload takes no space id and appends nothing after the table name,
// so the result is built directly from kPartsTable without reserving room
// for a GraphSpaceID that would never be written.
std::string MetaServiceUtils::partPrefix() {
    return std::string(kPartsTable.data(), kPartsTable.size());
}

std::vector<nebula::cpp2::HostAddr> MetaServiceUtils::parsePartVal(folly::StringPiece val) {
std::vector<nebula::cpp2::HostAddr> hosts;
static const size_t unitSize = sizeof(int32_t) * 2;
Expand Down
2 changes: 1 addition & 1 deletion src/meta/MetaServiceUtils.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ class MetaServiceUtils final {

static std::string partVal(const std::vector<nebula::cpp2::HostAddr>& hosts);

static const std::string& partPrefix();
static std::string partPrefix();

static std::string partPrefix(GraphSpaceID spaceId);

Expand Down
97 changes: 45 additions & 52 deletions src/meta/processors/admin/SnapShot.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,14 @@
namespace nebula {
namespace meta {
cpp2::ErrorCode Snapshot::createSnapshot(const std::string& name) {
std::vector<GraphSpaceID> spaces;
kvstore::ResultCode ret = kvstore::ResultCode::SUCCEEDED;
if (!getAllSpaces(spaces, ret)) {
LOG(ERROR) << "Can't access kvstore, ret = d"
<< static_cast<int32_t>(ret);
auto retSpacesHosts = getSpacesHosts();
if (!retSpacesHosts.ok()) {
return cpp2::ErrorCode::E_STORE_FAILURE;
}
auto hosts = ActiveHostsMan::getActiveHosts(kv_);
for (const auto& host : hosts) {
for (auto& space : spaces) {
auto status = client_->createSnapshot(space, name, host).get();
auto spacesHosts = retSpacesHosts.value();
for (const auto& spaceHosts : spacesHosts) {
for (const auto& host : spaceHosts.second) {
auto status = client_->createSnapshot(spaceHosts.first, name, host).get();
if (!status.ok()) {
return cpp2::ErrorCode::E_RPC_FAILURE;
}
Expand All @@ -35,19 +32,15 @@ cpp2::ErrorCode Snapshot::createSnapshot(const std::string& name) {

cpp2::ErrorCode Snapshot::dropSnapshot(const std::string& name,
const std::vector<HostAddr>& hosts) {
std::vector<GraphSpaceID> spaces;
kvstore::ResultCode ret = kvstore::ResultCode::SUCCEEDED;
if (!getAllSpaces(spaces, ret)) {
LOG(ERROR) << "Can't access kvstore, ret = d"
<< static_cast<int32_t>(ret);
auto retSpacesHosts = getSpacesHosts();
if (!retSpacesHosts.ok()) {
return cpp2::ErrorCode::E_STORE_FAILURE;
}
// Dropping the checkpoint will be skipped if the original host has been lost.
auto activeHosts = ActiveHostsMan::getActiveHosts(kv_);
for (auto& host : hosts) {
if (std::find(activeHosts.begin(), activeHosts.end(), host) != activeHosts.end()) {
std::for_each(spaces.begin(), spaces.end(), [name, host, this](auto& space) {
auto status = client_->dropSnapshot(space, name, host).get();
auto spacesHosts = retSpacesHosts.value();
for (const auto& spaceHosts : spacesHosts) {
for (const auto& host : spaceHosts.second) {
if (std::find(hosts.begin(), hosts.end(), host) != hosts.end()) {
auto status = client_->dropSnapshot(spaceHosts.first, name, host).get();
if (!status.ok()) {
auto msg = "failed drop checkpoint : \"%s\". on host %s. error %s";
auto error = folly::stringPrintf(msg,
Expand All @@ -56,49 +49,49 @@ cpp2::ErrorCode Snapshot::dropSnapshot(const std::string& name,
status.toString().c_str());
LOG(ERROR) << error;
}
});
}
}
}
return cpp2::ErrorCode::SUCCEEDED;
}

bool Snapshot::getAllSpaces(std::vector<GraphSpaceID>& spaces, kvstore::ResultCode& retCode) {
// Get all spaces
folly::SharedMutex::ReadHolder rHolder(LockUtils::spaceLock());
auto prefix = MetaServiceUtils::spacePrefix();
std::unique_ptr<kvstore::KVIterator> iter;
auto ret = kv_->prefix(kDefaultSpaceId, kDefaultPartId, prefix, &iter);
if (ret != kvstore::ResultCode::SUCCEEDED) {
retCode = ret;
return false;
cpp2::ErrorCode Snapshot::blockingWrites(storage::cpp2::EngineSignType sign) {
auto retSpacesHosts = getSpacesHosts();
if (!retSpacesHosts.ok()) {
return cpp2::ErrorCode::E_STORE_FAILURE;
}
while (iter->valid()) {
auto spaceId = MetaServiceUtils::spaceId(iter->key());
spaces.push_back(spaceId);
iter->next();
auto spacesHosts = retSpacesHosts.value();
for (const auto& spaceHosts : spacesHosts) {
for (const auto& host : spaceHosts.second) {
auto status = client_->blockingWrites(spaceHosts.first, sign, host).get();
if (!status.ok()) {
LOG(ERROR) << " Send blocking sign error on host : "
<< network::NetworkUtils::toHosts({host});
}
}
}
return true;
return cpp2::ErrorCode::SUCCEEDED;
}

cpp2::ErrorCode Snapshot::blockingWrites(storage::cpp2::EngineSignType sign) {
std::vector<GraphSpaceID> spaces;
kvstore::ResultCode ret = kvstore::ResultCode::SUCCEEDED;
if (!getAllSpaces(spaces, ret)) {
LOG(ERROR) << "Can't access kvstore, ret = d"
<< static_cast<int32_t>(ret);
return cpp2::ErrorCode::E_STORE_FAILURE;
StatusOr<std::map<GraphSpaceID, std::set<HostAddr>>> Snapshot::getSpacesHosts() {
folly::SharedMutex::ReadHolder rHolder(LockUtils::spaceLock());
std::map<GraphSpaceID, std::set<HostAddr>> hostsByspaces;
auto prefix = MetaServiceUtils::partPrefix();
std::unique_ptr<kvstore::KVIterator> iter;
auto kvRet = kv_->prefix(kDefaultSpaceId, kDefaultPartId, prefix, &iter);
if (kvRet != kvstore::ResultCode::SUCCEEDED) {
LOG(ERROR) << "Get hosts meta data error";
return Status::Error("Get hosts meta data error");
}
auto hosts = ActiveHostsMan::getActiveHosts(kv_);
for (const auto& host : hosts) {
std::for_each(spaces.begin(), spaces.end(), [host, sign, this](auto& space) {
auto status = client_->blockingWrites(space, sign, host).get();
if (!status.ok()) {
LOG(ERROR) << " Send blocking sign error on host : "
<< network::NetworkUtils::toHosts({host});
}
});
while (iter->valid()) {
auto partHosts = MetaServiceUtils::parsePartVal(iter->val());
auto space = MetaServiceUtils::parsePartKeySpaceId(iter->key());
for (auto& ph : partHosts) {
hostsByspaces[space].emplace(HostAddr(ph.get_ip(), ph.get_port()));
}
iter->next();
}
return cpp2::ErrorCode::SUCCEEDED;
return hostsByspaces;
}

} // namespace meta
Expand Down
5 changes: 1 addition & 4 deletions src/meta/processors/admin/SnapShot.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,15 +32,12 @@ class Snapshot {

cpp2::ErrorCode blockingWrites(storage::cpp2::EngineSignType sign);

std::unordered_map<HostAddr, std::vector<PartitionID>>
getLeaderParts(HostLeaderMap *hostLeaderMap, GraphSpaceID spaceId);

private:
Snapshot(kvstore::KVStore* kv, AdminClient* client) : kv_(kv), client_(client) {
executor_.reset(new folly::CPUThreadPoolExecutor(1));
}

bool getAllSpaces(std::vector<GraphSpaceID>& spaces, kvstore::ResultCode& retCode);
StatusOr<std::map<GraphSpaceID, std::set<HostAddr>>> getSpacesHosts();

private:
kvstore::KVStore* kv_{nullptr};
Expand Down

0 comments on commit f9e50f7

Please sign in to comment.