diff --git a/src/meta/MetaServiceUtils.cpp b/src/meta/MetaServiceUtils.cpp index a617fa91c2d..520553dd09f 100644 --- a/src/meta/MetaServiceUtils.cpp +++ b/src/meta/MetaServiceUtils.cpp @@ -113,6 +113,13 @@ std::string MetaServiceUtils::partPrefix(GraphSpaceID spaceId) { return prefix; } +std::string MetaServiceUtils::partPrefix() { + std::string prefix; + prefix.reserve(kPartsTable.size() + sizeof(GraphSpaceID)); + prefix.append(kPartsTable.data(), kPartsTable.size()); + return prefix; +} + std::vector MetaServiceUtils::parsePartVal(folly::StringPiece val) { std::vector hosts; static const size_t unitSize = sizeof(int32_t) * 2; diff --git a/src/meta/MetaServiceUtils.h b/src/meta/MetaServiceUtils.h index 671a40d8160..5abcf7ca97a 100644 --- a/src/meta/MetaServiceUtils.h +++ b/src/meta/MetaServiceUtils.h @@ -55,7 +55,7 @@ class MetaServiceUtils final { static std::string partVal(const std::vector& hosts); - static const std::string& partPrefix(); + static std::string partPrefix(); static std::string partPrefix(GraphSpaceID spaceId); diff --git a/src/meta/processors/admin/SnapShot.cpp b/src/meta/processors/admin/SnapShot.cpp index 5032d42e4ef..d48ecf0af97 100644 --- a/src/meta/processors/admin/SnapShot.cpp +++ b/src/meta/processors/admin/SnapShot.cpp @@ -14,17 +14,14 @@ namespace nebula { namespace meta { cpp2::ErrorCode Snapshot::createSnapshot(const std::string& name) { - std::vector spaces; - kvstore::ResultCode ret = kvstore::ResultCode::SUCCEEDED; - if (!getAllSpaces(spaces, ret)) { - LOG(ERROR) << "Can't access kvstore, ret = d" - << static_cast(ret); + auto retSpacesHosts = getSpacesHosts(); + if (!retSpacesHosts.ok()) { return cpp2::ErrorCode::E_STORE_FAILURE; } - auto hosts = ActiveHostsMan::getActiveHosts(kv_); - for (const auto& host : hosts) { - for (auto& space : spaces) { - auto status = client_->createSnapshot(space, name, host).get(); + auto spacesHosts = retSpacesHosts.value(); + for (const auto& spaceHosts : spacesHosts) { + for (const auto& host : spaceHosts.second) { + auto status = client_->createSnapshot(spaceHosts.first, name, host).get(); if (!status.ok()) { return cpp2::ErrorCode::E_RPC_FAILURE; } @@ -35,19 +32,15 @@ cpp2::ErrorCode Snapshot::createSnapshot(const std::string& name) { cpp2::ErrorCode Snapshot::dropSnapshot(const std::string& name, const std::vector& hosts) { - std::vector spaces; - kvstore::ResultCode ret = kvstore::ResultCode::SUCCEEDED; - if (!getAllSpaces(spaces, ret)) { - LOG(ERROR) << "Can't access kvstore, ret = d" - << static_cast(ret); + auto retSpacesHosts = getSpacesHosts(); + if (!retSpacesHosts.ok()) { return cpp2::ErrorCode::E_STORE_FAILURE; } - // The drop checkpoint will be skip if original host has been lost. - auto activeHosts = ActiveHostsMan::getActiveHosts(kv_); - for (auto& host : hosts) { - if (std::find(activeHosts.begin(), activeHosts.end(), host) != activeHosts.end()) { - std::for_each(spaces.begin(), spaces.end(), [name, host, this](auto& space) { - auto status = client_->dropSnapshot(space, name, host).get(); + auto spacesHosts = retSpacesHosts.value(); + for (const auto& spaceHosts : spacesHosts) { + for (const auto& host : spaceHosts.second) { + if (std::find(hosts.begin(), hosts.end(), host) != hosts.end()) { + auto status = client_->dropSnapshot(spaceHosts.first, name, host).get(); if (!status.ok()) { auto msg = "failed drop checkpoint : \"%s\". on host %s. error %s"; auto error = folly::stringPrintf(msg, @@ -56,49 +49,49 @@ cpp2::ErrorCode Snapshot::dropSnapshot(const std::string& name, status.toString().c_str()); LOG(ERROR) << error; } - }); + } } } return cpp2::ErrorCode::SUCCEEDED; } -bool Snapshot::getAllSpaces(std::vector& spaces, kvstore::ResultCode& retCode) { - // Get all spaces - folly::SharedMutex::ReadHolder rHolder(LockUtils::spaceLock()); - auto prefix = MetaServiceUtils::spacePrefix(); - std::unique_ptr iter; - auto ret = kv_->prefix(kDefaultSpaceId, kDefaultPartId, prefix, &iter); - if (ret != kvstore::ResultCode::SUCCEEDED) { - retCode = ret; - return false; +cpp2::ErrorCode Snapshot::blockingWrites(storage::cpp2::EngineSignType sign) { + auto retSpacesHosts = getSpacesHosts(); + if (!retSpacesHosts.ok()) { + return cpp2::ErrorCode::E_STORE_FAILURE; } - while (iter->valid()) { - auto spaceId = MetaServiceUtils::spaceId(iter->key()); - spaces.push_back(spaceId); - iter->next(); + auto spacesHosts = retSpacesHosts.value(); + for (const auto& spaceHosts : spacesHosts) { + for (const auto& host : spaceHosts.second) { + auto status = client_->blockingWrites(spaceHosts.first, sign, host).get(); + if (!status.ok()) { + LOG(ERROR) << " Send blocking sign error on host : " + << network::NetworkUtils::toHosts({host}); + } + } } - return true; + return cpp2::ErrorCode::SUCCEEDED; } -cpp2::ErrorCode Snapshot::blockingWrites(storage::cpp2::EngineSignType sign) { - std::vector spaces; - kvstore::ResultCode ret = kvstore::ResultCode::SUCCEEDED; - if (!getAllSpaces(spaces, ret)) { - LOG(ERROR) << "Can't access kvstore, ret = d" - << static_cast(ret); - return cpp2::ErrorCode::E_STORE_FAILURE; +StatusOr>> Snapshot::getSpacesHosts() { + folly::SharedMutex::ReadHolder rHolder(LockUtils::spaceLock()); + std::map> hostsByspaces; + auto prefix = MetaServiceUtils::partPrefix(); + std::unique_ptr iter; + auto kvRet = kv_->prefix(kDefaultSpaceId, kDefaultPartId, prefix, &iter); + if (kvRet != kvstore::ResultCode::SUCCEEDED) { + LOG(ERROR) << "Get hosts meta data error"; + return Status::Error("Get hosts meta data error"); } - auto hosts = ActiveHostsMan::getActiveHosts(kv_); - for (const auto& host : hosts) { - std::for_each(spaces.begin(), spaces.end(), [host, sign, this](auto& space) { - auto status = client_->blockingWrites(space, sign, host).get(); - if (!status.ok()) { - LOG(ERROR) << " Send blocking sign error on host : " - << network::NetworkUtils::toHosts({host}); - } - }); + while (iter->valid()) { + auto partHosts = MetaServiceUtils::parsePartVal(iter->val()); + auto space = MetaServiceUtils::parsePartKeySpaceId(iter->key()); + for (auto& ph : partHosts) { + hostsByspaces[space].emplace(HostAddr(ph.get_ip(), ph.get_port())); + } + iter->next(); } - return cpp2::ErrorCode::SUCCEEDED; + return hostsByspaces; } } // namespace meta diff --git a/src/meta/processors/admin/SnapShot.h b/src/meta/processors/admin/SnapShot.h index a3a31f4968a..bb28897ad75 100644 --- a/src/meta/processors/admin/SnapShot.h +++ b/src/meta/processors/admin/SnapShot.h @@ -32,15 +32,12 @@ class Snapshot { cpp2::ErrorCode blockingWrites(storage::cpp2::EngineSignType sign); - std::unordered_map> - getLeaderParts(HostLeaderMap *hostLeaderMap, GraphSpaceID spaceId); - private: Snapshot(kvstore::KVStore* kv, AdminClient* client) : kv_(kv), client_(client) { executor_.reset(new folly::CPUThreadPoolExecutor(1)); } - bool getAllSpaces(std::vector& spaces, kvstore::ResultCode& retCode); + StatusOr>> getSpacesHosts(); private: kvstore::KVStore* kv_{nullptr};