From 02880b90576f07045da22abb1e9f55c50fa66b24 Mon Sep 17 00:00:00 2001 From: pengweisong <90180021+pengweisong@users.noreply.github.com> Date: Thu, 31 Mar 2022 01:00:22 +0800 Subject: [PATCH 1/6] add code for part peers backward compatible --- src/kvstore/Common.h | 1 + src/kvstore/NebulaStore.cpp | 61 ++++++++++++++++++++++++++++++++++++- src/kvstore/NebulaStore.h | 5 +++ 3 files changed, 66 insertions(+), 1 deletion(-) diff --git a/src/kvstore/Common.h b/src/kvstore/Common.h index e2ee85dd030..22bfde9d2f6 100644 --- a/src/kvstore/Common.h +++ b/src/kvstore/Common.h @@ -59,6 +59,7 @@ struct Peer { Status status; Peer() : addr(), status(Status::kNormalPeer) {} + explicit Peer(HostAddr a) : addr(a), status(Status::kNormalPeer) {} Peer(HostAddr a, Status s) : addr(a), status(s) {} std::string toString() const { diff --git a/src/kvstore/NebulaStore.cpp b/src/kvstore/NebulaStore.cpp index 79b902aa25a..7aabf7f3d69 100644 --- a/src/kvstore/NebulaStore.cpp +++ b/src/kvstore/NebulaStore.cpp @@ -13,6 +13,7 @@ #include "common/fs/FileUtils.h" #include "common/network/NetworkUtils.h" +#include "common/utils/NebulaKeyUtils.h" #include "kvstore/NebulaSnapshotManager.h" #include "kvstore/RocksEngine.h" @@ -64,6 +65,7 @@ bool NebulaStore::init() { // todo(doodle): we could support listener and normal storage start at same // instance if (!isListener()) { + fillPartPeers(); // TODO(spw): need to refactor, we could load data from local regardless of partManager, // then adjust the data in loadPartFromPartManager. loadPartFromDataPath(); @@ -81,6 +83,62 @@ bool NebulaStore::init() { return true; } +void NebulaStore::fillPartPeers() { + CHECK(!!options_.partMan_); + + auto partsMap = options_.partMan_->parts(storeSvcAddr_); + + // fill empty parts with part manager + for (auto& path : options_.dataPaths_) { + auto rootPath = folly::stringPrintf("%s/nebula", path.c_str()); + auto dirs = fs::FileUtils::listAllDirsInDir(rootPath.c_str()); + for (auto& dir : dirs) { + LOG(INFO) << "Scan path \"" << rootPath << "/" << dir << "\""; + try { + GraphSpaceID spaceId; + try { + spaceId = folly::to(dir); + } catch (const std::exception& ex) { + LOG(ERROR) << folly::sformat("Data path {} invalid {}", dir, ex.what()); + continue; + } + + if (spaceId == 0) { + // skip the system space, only handle data space here. + continue; + } + + if (partsMap.find(spaceId) == partsMap.end()) { + // skip if the space not in the part manager + continue; + } + + std::vector data; + auto& partPeers = partsMap[spaceId]; + auto engine = newEngine(spaceId, path, options_.walPath_); + for (auto& [partId, raftPeers] : engine->allPartPeers()) { + if (partPeers.find(partId) == partPeers.end()) { + continue; + } + + if (raftPeers.size() == 0) { + Peers peersToPersist; + for (auto& peer : partPeers[partId].hosts_) { + peersToPersist.addOrUpdate(Peer(getRaftAddr(peer))); + } + data.emplace_back(NebulaKeyUtils::systemPartKey(partId), peersToPersist.toString()); + } + } + + auto code = engine->multiPut(data); + CHECK(code == nebula::cpp2::ErrorCode::SUCCEEDED); + } catch (std::exception& e) { + LOG(FATAL) << "Invalid data directory \"" << dir << "\""; + } + } + } +} + void NebulaStore::loadPartFromDataPath() { CHECK(!!options_.partMan_); LOG(INFO) << "Scan the local path, and init the spaces_"; @@ -110,8 +168,9 @@ void NebulaStore::loadPartFromDataPath() { auto engine = newEngine(spaceId, path, options_.walPath_); std::map partRaftPeers; for (auto& [partId, raftPeers] : engine->allPartPeers()) { - bool isNormalPeer = true; + CHECK_NE(raftPeers.size(), 0); + bool isNormalPeer = true; Peer raftPeer; bool exist = raftPeers.get(raftAddr_, &raftPeer); if (exist) { diff --git a/src/kvstore/NebulaStore.h b/src/kvstore/NebulaStore.h index 0bfbfba53e6..f57fb640de3 100644 --- a/src/kvstore/NebulaStore.h +++ b/src/kvstore/NebulaStore.h @@ -720,6 +720,11 @@ class NebulaStore : public KVStore, public Handler { } private: + /** + * @brief Backward compatible: fill old system part info with peers info. + * + */ + void fillPartPeers(); /** * @brief Load partitions by reading system part keys in kv engine */ From af8e5b663f875ef966deb2edd4427e74390c128e Mon Sep 17 00:00:00 2001 From: pengweisong <90180021+pengweisong@users.noreply.github.com> Date: Thu, 31 Mar 2022 18:30:34 +0800 Subject: [PATCH 2/6] add balance keys --- src/common/utils/NebulaKeyUtils.cpp | 11 +++ src/common/utils/NebulaKeyUtils.h | 15 +++ src/common/utils/Types.h | 1 + src/kvstore/Common.h | 64 ++++++++++--- src/kvstore/KVEngine.h | 9 +- src/kvstore/NebulaStore.cpp | 140 +++++++++++++--------------- src/kvstore/NebulaStore.h | 5 - src/kvstore/RocksEngine.cpp | 26 ++++-- src/kvstore/RocksEngine.h | 18 +++- 9 files changed, 178 insertions(+), 111 deletions(-) diff --git a/src/common/utils/NebulaKeyUtils.cpp b/src/common/utils/NebulaKeyUtils.cpp index ff1c60ba8bd..a220c1dc0cb 100644 --- a/src/common/utils/NebulaKeyUtils.cpp +++ b/src/common/utils/NebulaKeyUtils.cpp @@ -119,6 +119,17 @@ std::string NebulaKeyUtils::systemPartKey(PartitionID partId) { return key; } +// static +std::string NebulaKeyUtils::systemBalanceKey(PartitionID partId) { + uint32_t item = (partId << kPartitionOffset) | static_cast(NebulaKeyType::kSystem); + uint32_t type = static_cast(NebulaSystemKeyType::kSystemBalance); + std::string key; + key.reserve(kSystemLen); + key.append(reinterpret_cast(&item), sizeof(PartitionID)) + .append(reinterpret_cast(&type), sizeof(NebulaSystemKeyType)); + return key; +} + // static std::string NebulaKeyUtils::kvKey(PartitionID partId, const folly::StringPiece& name) { std::string key; diff --git a/src/common/utils/NebulaKeyUtils.h b/src/common/utils/NebulaKeyUtils.h index 6d07383e4dd..049dc135659 100644 --- a/src/common/utils/NebulaKeyUtils.h +++ b/src/common/utils/NebulaKeyUtils.h @@ -77,6 +77,8 @@ class NebulaKeyUtils final { static std::string systemPartKey(PartitionID partId); + static std::string systemBalanceKey(PartitionID partId); + static std::string kvKey(PartitionID partId, const folly::StringPiece& name); /** @@ -188,6 +190,19 @@ class NebulaKeyUtils final { return static_cast(type) == NebulaSystemKeyType::kSystemPart; } + static bool isSystemBalance(const folly::StringPiece& rawKey) { + if (rawKey.size() != kSystemLen) { + return false; + } + if (!isSystem(rawKey)) { + return false; + } + auto position = rawKey.data() + sizeof(PartitionID); + auto len = sizeof(NebulaSystemKeyType); + auto type = readInt(position, len); + return static_cast(type) == NebulaSystemKeyType::kSystemBalance; + } + static VertexIDSlice getSrcId(size_t vIdLen, const folly::StringPiece& rawKey) { if (rawKey.size() < kEdgeLen + (vIdLen << 1)) { dumpBadKey(rawKey, kEdgeLen + (vIdLen << 1), vIdLen); diff --git a/src/common/utils/Types.h b/src/common/utils/Types.h index e365fb4270e..c5a812b9a16 100644 --- a/src/common/utils/Types.h +++ b/src/common/utils/Types.h @@ -26,6 +26,7 @@ enum class NebulaKeyType : uint32_t { enum class NebulaSystemKeyType : uint32_t { kSystemCommit = 0x00000001, kSystemPart = 0x00000002, + kSystemBalance = 0x00000003, }; enum class NebulaOperationType : uint32_t { diff --git a/src/kvstore/Common.h b/src/kvstore/Common.h index 22bfde9d2f6..dff241c93e4 100644 --- a/src/kvstore/Common.h +++ b/src/kvstore/Common.h @@ -12,6 +12,7 @@ #include "common/base/Base.h" #include "common/datatypes/HostAddr.h" #include "common/thrift/ThriftTypes.h" +#include "common/time/WallClock.h" #include "common/utils/Types.h" #include "interface/gen-cpp2/common_types.h" @@ -107,26 +108,37 @@ inline std::ostream& operator<<(std::ostream& os, const Peer& peer) { struct Peers { private: std::map peers; + int createdTime; public: - Peers() {} + Peers() { + createdTime = static_cast(time::WallClock::fastNowInSec()); + } explicit Peers(const std::vector& addrs) { // from normal peers for (auto& addr : addrs) { peers[addr] = Peer(addr, Peer::Status::kNormalPeer); } + createdTime = time::WallClock::fastNowInSec(); } explicit Peers(const std::vector& ps) { for (auto& p : ps) { peers[p.addr] = p; } + createdTime = time::WallClock::fastNowInSec(); + } + explicit Peers(std::map ps) : peers(std::move(ps)) { + createdTime = time::WallClock::fastNowInSec(); } - explicit Peers(std::map ps) : peers(std::move(ps)) {} void addOrUpdate(const Peer& peer) { peers[peer.addr] = peer; } - bool get(const HostAddr& addr, Peer* peer) { + bool exist(const HostAddr& addr) const { + return peers.find(addr) != peers.end(); + } + + bool get(const HostAddr& addr, Peer* peer) const { auto it = peers.find(addr); if (it == peers.end()) { return false; @@ -142,7 +154,7 @@ struct Peers { peers.erase(addr); } - size_t size() { + size_t size() const { return peers.size(); } @@ -150,6 +162,27 @@ struct Peers { return peers; } + bool allNormalPeers() const { + for (const auto& [addr, peer] : peers) { + if (peer.status == Peer::Status::kDeleted) { + continue; + } + + if (peer.status != Peer::Status::kNormalPeer) { + return false; + } + } + return true; + } + + bool isExpired() const { + return static_cast(time::WallClock::fastNowInSec()) - createdTime > 3600 * 24; + } + + void setCreatedTime(int time) { + createdTime = time; + } + std::string toString() const { std::stringstream os; os << "version:1," @@ -160,21 +193,28 @@ struct Peers { return os.str(); } - static std::pair extractHeader(const std::string& header) { + static std::tuple extractHeader(const std::string& header) { auto pos = header.find(":"); if (pos == std::string::npos) { - LOG(INFO) << "Parse part peers header error:" << header; - return {0, 0}; + LOG(INFO) << "Parse version from part peers header error:" << header; + return {0, 0, 0}; } int version = std::stoi(header.substr(pos + 1)); pos = header.find(":", pos + 1); if (pos == std::string::npos) { - LOG(INFO) << "Parse part peers header error:" << header; - return {0, 0}; + LOG(INFO) << "Parse count from part peers header error:" << header; + return {0, 0, 0}; } int count = std::stoi(header.substr(pos + 1)); - return {version, count}; + pos = header.find(":", pos + 1); + if (pos == std::string::npos) { + LOG(INFO) << "Parse created time from part peers header error:" << header; + return {0, 0, 0}; + } + int createdTime = std::stoi(header.substr(pos + 1)); + + return {version, count, createdTime}; } static Peers fromString(const std::string& str) { @@ -187,7 +227,7 @@ struct Peers { return peers; } - auto [version, count] = extractHeader(lines[0]); + auto [version, count, createdTime] = extractHeader(lines[0]); if (version != 1) { LOG(INFO) << "Wrong peers format version:" << version; return peers; @@ -199,6 +239,8 @@ struct Peers { return peers; } + peers.setCreatedTime(createdTime); + // skip header for (size_t i = 1; i < lines.size(); ++i) { auto& line = lines[i]; diff --git a/src/kvstore/KVEngine.h b/src/kvstore/KVEngine.h index 5d48206104b..d3a98d044cc 100644 --- a/src/kvstore/KVEngine.h +++ b/src/kvstore/KVEngine.h @@ -228,11 +228,10 @@ class KVEngine { virtual void addPart(PartitionID partId, const Peers& raftPeers) = 0; /** - * @brief Update part info. Could only update the peers' persist info now. + * @brief Update part info. Could only update the persist peers info in balancing now. * * @param partId - * @param raftPeer 1. if raftPeer.status is kDeleted, delete this peer. - * 2. if raftPeer.status is others, add or update this peer + * @param raftPeer */ virtual void updatePart(PartitionID partId, const Peer& raftPeer) = 0; @@ -251,11 +250,11 @@ class KVEngine { virtual std::vector allParts() = 0; /** - * @brief Return all partId->raft peers that current storage engine holds. + * @brief Return all balancing partId->raft peers that current storage engine holds. * * @return std::map partId-> raft peers for each part, including learners */ - virtual std::map allPartPeers() = 0; + virtual std::map balancePartPeers() = 0; /** * @brief Return total parts num diff --git a/src/kvstore/NebulaStore.cpp b/src/kvstore/NebulaStore.cpp index 7aabf7f3d69..031810d6584 100644 --- a/src/kvstore/NebulaStore.cpp +++ b/src/kvstore/NebulaStore.cpp @@ -13,6 +13,7 @@ #include "common/fs/FileUtils.h" #include "common/network/NetworkUtils.h" +#include "common/time/WallClock.h" #include "common/utils/NebulaKeyUtils.h" #include "kvstore/NebulaSnapshotManager.h" #include "kvstore/RocksEngine.h" @@ -65,7 +66,6 @@ bool NebulaStore::init() { // todo(doodle): we could support listener and normal storage start at same // instance if (!isListener()) { - fillPartPeers(); // TODO(spw): need to refactor, we could load data from local regardless of partManager, // then adjust the data in loadPartFromPartManager. loadPartFromDataPath(); @@ -83,66 +83,11 @@ bool NebulaStore::init() { return true; } -void NebulaStore::fillPartPeers() { - CHECK(!!options_.partMan_); - - auto partsMap = options_.partMan_->parts(storeSvcAddr_); - - // fill empty parts with part manager - for (auto& path : options_.dataPaths_) { - auto rootPath = folly::stringPrintf("%s/nebula", path.c_str()); - auto dirs = fs::FileUtils::listAllDirsInDir(rootPath.c_str()); - for (auto& dir : dirs) { - LOG(INFO) << "Scan path \"" << rootPath << "/" << dir << "\""; - try { - GraphSpaceID spaceId; - try { - spaceId = folly::to(dir); - } catch (const std::exception& ex) { - LOG(ERROR) << folly::sformat("Data path {} invalid {}", dir, ex.what()); - continue; - } - - if (spaceId == 0) { - // skip the system space, only handle data space here. - continue; - } - - if (partsMap.find(spaceId) == partsMap.end()) { - // skip if the space not in the part manager - continue; - } - - std::vector data; - auto& partPeers = partsMap[spaceId]; - auto engine = newEngine(spaceId, path, options_.walPath_); - for (auto& [partId, raftPeers] : engine->allPartPeers()) { - if (partPeers.find(partId) == partPeers.end()) { - continue; - } - - if (raftPeers.size() == 0) { - Peers peersToPersist; - for (auto& peer : partPeers[partId].hosts_) { - peersToPersist.addOrUpdate(Peer(getRaftAddr(peer))); - } - data.emplace_back(NebulaKeyUtils::systemPartKey(partId), peersToPersist.toString()); - } - } - - auto code = engine->multiPut(data); - CHECK(code == nebula::cpp2::ErrorCode::SUCCEEDED); - } catch (std::exception& e) { - LOG(FATAL) << "Invalid data directory \"" << dir << "\""; - } - } - } -} - void NebulaStore::loadPartFromDataPath() { CHECK(!!options_.partMan_); LOG(INFO) << "Scan the local path, and init the spaces_"; - std::unordered_set> spacePartIdSet; + // avoid duplicate engine created + std::unordered_set> partSet; for (auto& path : options_.dataPaths_) { auto rootPath = folly::stringPrintf("%s/nebula", path.c_str()); auto dirs = fs::FileUtils::listAllDirsInDir(rootPath.c_str()); @@ -162,36 +107,77 @@ void NebulaStore::loadPartFromDataPath() { continue; } - // Load raft peers info which persisted to local engine. - // If the partition was in balancing process before restart, we should keep it - // though the part is not in the meta. auto engine = newEngine(spaceId, path, options_.walPath_); std::map partRaftPeers; - for (auto& [partId, raftPeers] : engine->allPartPeers()) { + + // load balancing part info which persisted to local engine. + for (auto& [partId, raftPeers] : engine->balancePartPeers()) { CHECK_NE(raftPeers.size(), 0); - bool isNormalPeer = true; - Peer raftPeer; - bool exist = raftPeers.get(raftAddr_, &raftPeer); - if (exist) { - if (raftPeer.status != Peer::Status::kNormalPeer) { - isNormalPeer = false; + auto spacePart = std::make_pair(spaceId, partId); + if (partSet.find(spacePart) == partSet.end()) { + partSet.insert(std::make_pair(spaceId, partId)); + + // join the balancing peers with meta peers + auto metaStatus = options_.partMan_->partMeta(spaceId, partId); + if (!metaStatus.ok()) { + LOG(INFO) << "space: " << spaceId << "; partId: " << partId + << " does not exist in part manager."; + continue; } + + auto partMeta = metaStatus.value(); + for (auto& h : partMeta.hosts_) { + if (h != storeSvcAddr_) { + auto raftAddr = getRaftAddr(h); + if (!raftPeers.exist(raftAddr)) { + VLOG(1) << "Add raft peer " << raftAddr; + raftPeers.addOrUpdate(Peer(raftAddr)); + } + } + } + partRaftPeers.emplace(partId, raftPeers); } + } - if (!options_.partMan_->partExist(storeSvcAddr_, spaceId, partId).ok() && isNormalPeer) { - LOG(INFO) << "Part " << partId - << " is a normal peer and does not exist in meta any more, will remove it!"; + // load normal part ids which persisted to local engine. + std::map normalPartPeers; + for (auto& partId : engine->allParts()) { + // first priority: balancing + bool inBalancing = partRaftPeers.find(partId) != partRaftPeers.end(); + if (inBalancing) { + continue; + } + + // second priority: meta + if (!options_.partMan_->partExist(storeSvcAddr_, spaceId, partId).ok()) { + LOG(INFO) + << "Part " << partId + << " is not in balancing and does not exist in meta any more, will remove it!"; engine->removePart(partId); continue; - } else { - auto spacePart = std::make_pair(spaceId, partId); - if (spacePartIdSet.find(spacePart) == spacePartIdSet.end()) { - spacePartIdSet.emplace(spacePart); - partRaftPeers.emplace(partId, raftPeers); + } + + auto spacePart = std::make_pair(spaceId, partId); + if (partSet.find(spacePart) == partSet.end()) { + partSet.emplace(spacePart); + + // fill the peers + auto metaStatus = options_.partMan_->partMeta(spaceId, partId); + CHECK(metaStatus.ok()); + auto partMeta = metaStatus.value(); + Peers peers; + for (auto& h : partMeta.hosts_) { + if (h != storeSvcAddr_) { + VLOG(1) << "Add raft peer " << getRaftAddr(h); + peers.addOrUpdate(Peer(getRaftAddr(h))); + } } + normalPartPeers.emplace(partId, peers); } } + + // there is no valid part in this engine, remove it if (partRaftPeers.empty()) { engine.reset(); // close engine if (!options_.partMan_->spaceExist(storeSvcAddr_, spaceId).ok()) { @@ -203,7 +189,7 @@ void NebulaStore::loadPartFromDataPath() { continue; } - // add to spaces if the part should exist + // add to spaces KVEngine* enginePtr = nullptr; { folly::RWSpinLock::WriteHolder wh(&lock_); diff --git a/src/kvstore/NebulaStore.h b/src/kvstore/NebulaStore.h index f57fb640de3..0bfbfba53e6 100644 --- a/src/kvstore/NebulaStore.h +++ b/src/kvstore/NebulaStore.h @@ -720,11 +720,6 @@ class NebulaStore : public KVStore, public Handler { } private: - /** - * @brief Backward compatible: fill old system part info with peers info. - * - */ - void fillPartPeers(); /** * @brief Load partitions by reading system part keys in kv engine */ diff --git a/src/kvstore/RocksEngine.cpp b/src/kvstore/RocksEngine.cpp index b1e4ca8d57c..47d91913e5a 100644 --- a/src/kvstore/RocksEngine.cpp +++ b/src/kvstore/RocksEngine.cpp @@ -368,30 +368,39 @@ std::string RocksEngine::partKey(PartitionID partId) { return NebulaKeyUtils::systemPartKey(partId); } +std::string RocksEngine::balanceKey(PartitionID partId) { + return NebulaKeyUtils::systemBalanceKey(partId); +} + void RocksEngine::addPart(PartitionID partId, const Peers& raftPeers) { - auto ret = put(partKey(partId), raftPeers.toString()); + auto ret = put(partKey(partId), ""); if (ret == nebula::cpp2::ErrorCode::SUCCEEDED) { partsNum_++; CHECK_GE(partsNum_, 0); } + + if (!raftPeers.allNormalPeers()) { + put(balanceKey(partId), raftPeers.toString()); + } } void RocksEngine::updatePart(PartitionID partId, const Peer& raftPeer) { std::string val; - auto ret = get(partKey(partId), &val); + auto ret = get(balanceKey(partId), &val); if (ret != nebula::cpp2::ErrorCode::SUCCEEDED) { LOG(INFO) << "Update part failed when get, partId=" << partId; return; } auto peers = Peers::fromString(val); - if (raftPeer.status == Peer::Status::kDeleted) { - peers.remove(raftPeer.addr); + peers.addOrUpdate(raftPeer); + + if (peers.allNormalPeers()) { + ret = remove(balanceKey(partId)); } else { - peers.addOrUpdate(raftPeer); + ret = put(balanceKey(partId), peers.toString()); } - ret = put(partKey(partId), peers.toString()); if (ret != nebula::cpp2::ErrorCode::SUCCEEDED) { LOG(INFO) << "Update part failed when put back, partId=" << partId; } @@ -402,6 +411,7 @@ void RocksEngine::removePart(PartitionID partId) { options.disableWAL = FLAGS_rocksdb_disable_wal; std::vector sysKeysToDelete; sysKeysToDelete.emplace_back(partKey(partId)); + sysKeysToDelete.emplace_back(balanceKey(partId)); sysKeysToDelete.emplace_back(NebulaKeyUtils::systemCommitKey(partId)); auto code = multiRemove(sysKeysToDelete); if (code == nebula::cpp2::ErrorCode::SUCCEEDED) { @@ -434,7 +444,7 @@ std::vector RocksEngine::allParts() { return parts; } -std::map RocksEngine::allPartPeers() { +std::map RocksEngine::balancePartPeers() { std::unique_ptr iter; std::map partRaftPeers; static const std::string prefixStr = NebulaKeyUtils::systemPrefix(); @@ -445,7 +455,7 @@ std::map RocksEngine::allPartPeers() { while (iter->valid()) { auto key = iter->key(); - if (!NebulaKeyUtils::isSystemPart(key)) { + if (!NebulaKeyUtils::isSystemBalance(key)) { iter->next(); continue; } diff --git a/src/kvstore/RocksEngine.h b/src/kvstore/RocksEngine.h index c14e53df10c..b8ad8d5f16a 100644 --- a/src/kvstore/RocksEngine.h +++ b/src/kvstore/RocksEngine.h @@ -358,11 +358,11 @@ class RocksEngine : public KVEngine { void addPart(PartitionID partId, const Peers& raftPeers = {}) override; /** - * @brief Update part info. Could only update the persist peers info now. + * @brief Update part info. Could only update the persist peers info in balancing now. * * @param partId - * @param raftPeer 1. if raftPeer.status is kDeleted, delete this peer. - * 2. if raftPeer.status is others, add or update this peer + * @param raftPeer + * */ void updatePart(PartitionID partId, const Peer& raftPeer) override; @@ -381,11 +381,11 @@ class RocksEngine : public KVEngine { std::vector allParts() override; /** - * @brief Retrun all the part->raft peers in rocksdb engine by scanning system part key. + * @brief Retrun all the balancing part->raft peers in rocksdb engine by scanning system part key. * * @return std::map */ - std::map allPartPeers() override; + std::map balancePartPeers() override; /** * @brief Return total partition numbers @@ -485,6 +485,14 @@ class RocksEngine : public KVEngine { */ std::string partKey(PartitionID partId); + /** + * @brief System balance key, containing balancing info + * + * @param partId + * @return std::string + */ + std::string balanceKey(PartitionID partId); + /** * @brief Open the rocksdb backup engine, mainly for rocksdb PlainTable mounted on tmpfs/ramfs * From 099d83892921ba493b1cee401a873fdf93497ac8 Mon Sep 17 00:00:00 2001 From: pengweisong <90180021+pengweisong@users.noreply.github.com> Date: Fri, 1 Apr 2022 16:37:50 +0800 Subject: [PATCH 3/6] fix bug: pass tests --- src/kvstore/Common.h | 2 +- src/kvstore/NebulaStore.cpp | 21 ++++++++------------- src/kvstore/test/NebulaStoreTest.cpp | 5 +++-- 3 files changed, 12 insertions(+), 16 deletions(-) diff --git a/src/kvstore/Common.h b/src/kvstore/Common.h index dff241c93e4..a22849ee0c9 100644 --- a/src/kvstore/Common.h +++ b/src/kvstore/Common.h @@ -186,7 +186,7 @@ struct Peers { std::string toString() const { std::stringstream os; os << "version:1," - << "count:" << peers.size() << "\n"; + << "count:" << peers.size() << ",ts:" << createdTime << "\n"; for (const auto& [_, p] : peers) { os << p << "\n"; } diff --git a/src/kvstore/NebulaStore.cpp b/src/kvstore/NebulaStore.cpp index 031810d6584..bc02f6dcbd1 100644 --- a/src/kvstore/NebulaStore.cpp +++ b/src/kvstore/NebulaStore.cpp @@ -122,13 +122,10 @@ void NebulaStore::loadPartFromDataPath() { auto metaStatus = options_.partMan_->partMeta(spaceId, partId); if (!metaStatus.ok()) { LOG(INFO) << "space: " << spaceId << "; partId: " << partId - << " does not exist in part manager."; - continue; - } - - auto partMeta = metaStatus.value(); - for (auto& h : partMeta.hosts_) { - if (h != storeSvcAddr_) { + << " does not exist in part manager when join balancing."; + } else { + auto partMeta = metaStatus.value(); + for (auto& h : partMeta.hosts_) { auto raftAddr = getRaftAddr(h); if (!raftPeers.exist(raftAddr)) { VLOG(1) << "Add raft peer " << raftAddr; @@ -136,12 +133,12 @@ void NebulaStore::loadPartFromDataPath() { } } } + partRaftPeers.emplace(partId, raftPeers); } } // load normal part ids which persisted to local engine. - std::map normalPartPeers; for (auto& partId : engine->allParts()) { // first priority: balancing bool inBalancing = partRaftPeers.find(partId) != partRaftPeers.end(); @@ -168,12 +165,10 @@ void NebulaStore::loadPartFromDataPath() { auto partMeta = metaStatus.value(); Peers peers; for (auto& h : partMeta.hosts_) { - if (h != storeSvcAddr_) { - VLOG(1) << "Add raft peer " << getRaftAddr(h); - peers.addOrUpdate(Peer(getRaftAddr(h))); - } + VLOG(1) << "Add raft peer " << getRaftAddr(h); + peers.addOrUpdate(Peer(getRaftAddr(h))); } - normalPartPeers.emplace(partId, peers); + partRaftPeers.emplace(partId, peers); } } diff --git a/src/kvstore/test/NebulaStoreTest.cpp b/src/kvstore/test/NebulaStoreTest.cpp index f603e97bd56..c8abdcb7171 100644 --- a/src/kvstore/test/NebulaStoreTest.cpp +++ b/src/kvstore/test/NebulaStoreTest.cpp @@ -166,7 +166,7 @@ TEST(NebulaStoreTest, PartsTest) { // disk2: 5, 6, 7, 15 KVOptions options; - options.dataPaths_ = std::move(paths); + options.dataPaths_ = paths; options.partMan_ = std::move(partMan); HostAddr local = {"", 0}; auto store = @@ -291,11 +291,12 @@ TEST(NebulaStoreTest, PersistPeersTest) { // disk2: 5, 6, 7, 15 KVOptions options; - options.dataPaths_ = std::move(paths); + options.dataPaths_ = paths; options.partMan_ = std::move(partMan); auto store = std::make_unique(std::move(options), ioThreadPool, local, getHandlers()); store->init(); + auto check = [&](GraphSpaceID spaceId) { for (int i = 0; i < static_cast(paths.size()); i++) { ASSERT_EQ(folly::stringPrintf("%s/disk%d/nebula/%d", rootPath.path(), i + 1, spaceId), From 07f0513e1a0653d5ad4dfa76268bc1676c5d5253 Mon Sep 17 00:00:00 2001 From: pengweisong <90180021+pengweisong@users.noreply.github.com> Date: Fri, 1 Apr 2022 18:30:37 +0800 Subject: [PATCH 4/6] add gflag && ignore --- src/kvstore/Common.h | 5 ++++- src/kvstore/NebulaStore.cpp | 8 +++++++- src/kvstore/RocksEngine.cpp | 3 +++ 3 files changed, 14 insertions(+), 2 deletions(-) diff --git a/src/kvstore/Common.h b/src/kvstore/Common.h index a22849ee0c9..e3f9c6721e3 100644 --- a/src/kvstore/Common.h +++ b/src/kvstore/Common.h @@ -16,6 +16,8 @@ #include "common/utils/Types.h" #include "interface/gen-cpp2/common_types.h" +DECLARE_int32(balance_expired_sesc); + namespace nebula { namespace kvstore { @@ -176,7 +178,8 @@ struct Peers { } bool isExpired() const { - return static_cast(time::WallClock::fastNowInSec()) - createdTime > 3600 * 24; + return static_cast(time::WallClock::fastNowInSec()) - createdTime > + static_cast(FLAGS_balance_expired_sesc); } void setCreatedTime(int time) { diff --git a/src/kvstore/NebulaStore.cpp b/src/kvstore/NebulaStore.cpp index bc02f6dcbd1..efb06e5fa5f 100644 --- a/src/kvstore/NebulaStore.cpp +++ b/src/kvstore/NebulaStore.cpp @@ -114,6 +114,12 @@ void NebulaStore::loadPartFromDataPath() { for (auto& [partId, raftPeers] : engine->balancePartPeers()) { CHECK_NE(raftPeers.size(), 0); + if (raftPeers.isExpired()) { + LOG(INFO) << "Space: " << spaceId << ", part:" << partId + << " balancing info expired, ignore it."; + continue; + } + auto spacePart = std::make_pair(spaceId, partId); if (partSet.find(spacePart) == partSet.end()) { partSet.insert(std::make_pair(spaceId, partId)); @@ -121,7 +127,7 @@ void NebulaStore::loadPartFromDataPath() { // join the balancing peers with meta peers auto metaStatus = options_.partMan_->partMeta(spaceId, partId); if (!metaStatus.ok()) { - LOG(INFO) << "space: " << spaceId << "; partId: " << partId + LOG(INFO) << "Space: " << spaceId << "; partId: " << partId << " does not exist in part manager when join balancing."; } else { auto partMeta = metaStatus.value(); diff --git a/src/kvstore/RocksEngine.cpp b/src/kvstore/RocksEngine.cpp index 47d91913e5a..458c6d49359 100644 --- a/src/kvstore/RocksEngine.cpp +++ b/src/kvstore/RocksEngine.cpp @@ -15,6 +15,9 @@ #include "kvstore/KVStore.h" DEFINE_bool(move_files, false, "Move the SST files instead of copy when ingest into dataset"); +DEFINE_int32(balance_expired_sesc, + 86400, + "The expired time of balancing part info persisted in the storaged"); namespace nebula { namespace kvstore { From 60816a5d079866b9f864f2c5d7f7d48b4746a712 Mon Sep 17 00:00:00 2001 From: pengweisong <90180021+pengweisong@users.noreply.github.com> Date: Sat, 2 Apr 2022 13:50:43 +0800 Subject: [PATCH 5/6] change int to int64_t --- src/kvstore/Common.h | 39 ++++++++++++++++--------------------- src/kvstore/RocksEngine.cpp | 2 +- 2 files changed, 18 insertions(+), 23 deletions(-) diff --git a/src/kvstore/Common.h b/src/kvstore/Common.h index e3f9c6721e3..696edf5acae 100644 --- a/src/kvstore/Common.h +++ b/src/kvstore/Common.h @@ -9,6 +9,8 @@ #include #include +#include + #include "common/base/Base.h" #include "common/datatypes/HostAddr.h" #include "common/thrift/ThriftTypes.h" @@ -16,7 +18,7 @@ #include "common/utils/Types.h" #include "interface/gen-cpp2/common_types.h" -DECLARE_int32(balance_expired_sesc); +DECLARE_int64(balance_expired_sesc); namespace nebula { namespace kvstore { @@ -110,11 +112,11 @@ inline std::ostream& operator<<(std::ostream& os, const Peer& peer) { struct Peers { private: std::map peers; - int createdTime; + int64_t createdTime; public: Peers() { - createdTime = static_cast(time::WallClock::fastNowInSec()); + createdTime = time::WallClock::fastNowInSec(); } explicit Peers(const std::vector& addrs) { // from normal peers for (auto& addr : addrs) { @@ -178,8 +180,7 @@ struct Peers { } bool isExpired() const { - return static_cast(time::WallClock::fastNowInSec()) - createdTime > - static_cast(FLAGS_balance_expired_sesc); + return time::WallClock::fastNowInSec() - createdTime > FLAGS_balance_expired_sesc; } void setCreatedTime(int time) { @@ -196,26 +197,20 @@ struct Peers { return os.str(); } - static std::tuple extractHeader(const std::string& header) { - auto pos = header.find(":"); - if (pos == std::string::npos) { - LOG(INFO) << "Parse version from part peers header error:" << header; - return {0, 0, 0}; - } - int version = std::stoi(header.substr(pos + 1)); - pos = header.find(":", pos + 1); - if (pos == std::string::npos) { - LOG(INFO) << "Parse count from part peers header error:" << header; + static std::tuple extractHeader(const std::string& header) { + std::vector fields; + folly::split(":", header, fields, true); + if (fields.size() != 4) { + LOG(INFO) << "Parse part peers header error:" << header; return {0, 0, 0}; } - int count = std::stoi(header.substr(pos + 1)); - pos = header.find(":", pos + 1); - if (pos == std::string::npos) { - LOG(INFO) << "Parse created time from part peers header error:" << header; - return {0, 0, 0}; - } - int createdTime = std::stoi(header.substr(pos + 1)); + int version = std::stoi(fields[1]); + int count = std::stoi(fields[2]); + + int64_t createdTime; + std::istringstream iss(fields[3]); + iss >> createdTime; return {version, count, createdTime}; } diff --git a/src/kvstore/RocksEngine.cpp b/src/kvstore/RocksEngine.cpp index 458c6d49359..30206903a0f 100644 --- a/src/kvstore/RocksEngine.cpp +++ b/src/kvstore/RocksEngine.cpp @@ -15,7 +15,7 @@ #include "kvstore/KVStore.h" DEFINE_bool(move_files, false, "Move the SST files instead of copy when ingest into dataset"); -DEFINE_int32(balance_expired_sesc, +DEFINE_int64(balance_expired_sesc, 86400, "The expired time of balancing part info persisted in the storaged"); From 2f1ea74da8d4000ff9419161c671e7bbcf82a169 Mon Sep 17 00:00:00 2001 From: pengweisong <90180021+pengweisong@users.noreply.github.com> Date: Sat, 2 Apr 2022 14:19:09 +0800 Subject: [PATCH 6/6] chage to emplace --- src/kvstore/Common.h | 2 +- src/kvstore/NebulaStore.cpp | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/kvstore/Common.h b/src/kvstore/Common.h index 696edf5acae..c97e6c878a7 100644 --- a/src/kvstore/Common.h +++ b/src/kvstore/Common.h @@ -183,7 +183,7 @@ struct Peers { return time::WallClock::fastNowInSec() - createdTime > FLAGS_balance_expired_sesc; } - void setCreatedTime(int time) { + void setCreatedTime(int64_t time) { createdTime = time; } diff --git a/src/kvstore/NebulaStore.cpp b/src/kvstore/NebulaStore.cpp index efb06e5fa5f..5906f4b4dbb 100644 --- a/src/kvstore/NebulaStore.cpp +++ b/src/kvstore/NebulaStore.cpp @@ -122,7 +122,7 @@ void NebulaStore::loadPartFromDataPath() { auto spacePart = std::make_pair(spaceId, partId); if (partSet.find(spacePart) == partSet.end()) { - partSet.insert(std::make_pair(spaceId, partId)); + partSet.emplace(std::make_pair(spaceId, partId)); // join the balancing peers with meta peers auto metaStatus = options_.partMan_->partMeta(spaceId, partId);