Skip to content

Commit

Permalink
add code for part peers backward compatible (#4101)
Browse files Browse the repository at this point in the history
* add code for part peers backward compatible

* add balance keys

* fix bug: pass tests

* add gflag && ignore

* change int to int64_t

* change to emplace

Co-authored-by: panda-sheep <59197347+panda-sheep@users.noreply.github.com>
Co-authored-by: Sophie <84560950+Sophie-Xie@users.noreply.github.com>
  • Loading branch information
3 people authored Apr 6, 2022
1 parent 6b20961 commit 99d93f5
Show file tree
Hide file tree
Showing 9 changed files with 194 additions and 59 deletions.
11 changes: 11 additions & 0 deletions src/common/utils/NebulaKeyUtils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,17 @@ std::string NebulaKeyUtils::systemPartKey(PartitionID partId) {
return key;
}

// static
// Builds the system "balance" key for a partition.
// Layout: the first sizeof(PartitionID) bytes hold
// (partId << kPartitionOffset) | NebulaKeyType::kSystem, followed by
// sizeof(NebulaSystemKeyType) bytes holding NebulaSystemKeyType::kSystemBalance.
// Both fields are copied straight from local integers, i.e. in host byte order.
std::string NebulaKeyUtils::systemBalanceKey(PartitionID partId) {
  const uint32_t partAndKind =
      (partId << kPartitionOffset) | static_cast<uint32_t>(NebulaKeyType::kSystem);
  const uint32_t subType = static_cast<uint32_t>(NebulaSystemKeyType::kSystemBalance);

  std::string result;
  result.reserve(kSystemLen);
  result.append(reinterpret_cast<const char*>(&partAndKind), sizeof(PartitionID));
  result.append(reinterpret_cast<const char*>(&subType), sizeof(NebulaSystemKeyType));
  return result;
}

// static
std::string NebulaKeyUtils::kvKey(PartitionID partId, const folly::StringPiece& name) {
std::string key;
Expand Down
15 changes: 15 additions & 0 deletions src/common/utils/NebulaKeyUtils.h
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,8 @@ class NebulaKeyUtils final {

static std::string systemPartKey(PartitionID partId);

static std::string systemBalanceKey(PartitionID partId);

static std::string kvKey(PartitionID partId, const folly::StringPiece& name);
static std::string kvPrefix(PartitionID partId);

Expand Down Expand Up @@ -189,6 +191,19 @@ class NebulaKeyUtils final {
return static_cast<NebulaSystemKeyType>(type) == NebulaSystemKeyType::kSystemPart;
}

// Returns true when rawKey is exactly kSystemLen bytes long, passes isSystem()
// (defined elsewhere; presumably checks the key-type tag), and the integer
// stored right after the partition field equals NebulaSystemKeyType::kSystemBalance.
static bool isSystemBalance(const folly::StringPiece& rawKey) {
  if (rawKey.size() != kSystemLen || !isSystem(rawKey)) {
    return false;
  }
  // The sub-type tag follows the leading sizeof(PartitionID) bytes.
  auto tagStart = rawKey.data() + sizeof(PartitionID);
  auto tagLen = sizeof(NebulaSystemKeyType);
  const auto tag = static_cast<NebulaSystemKeyType>(readInt<uint32_t>(tagStart, tagLen));
  return tag == NebulaSystemKeyType::kSystemBalance;
}

static VertexIDSlice getSrcId(size_t vIdLen, const folly::StringPiece& rawKey) {
if (rawKey.size() < kEdgeLen + (vIdLen << 1)) {
dumpBadKey(rawKey, kEdgeLen + (vIdLen << 1), vIdLen);
Expand Down
1 change: 1 addition & 0 deletions src/common/utils/Types.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ enum class NebulaKeyType : uint32_t {
// Sub-type tag of a system key. The NebulaKeyUtils system-key helpers store it
// right after the leading partition field and compare against it when
// classifying a raw key.
enum class NebulaSystemKeyType : uint32_t {
kSystemCommit = 0x00000001,
// Tag tested by NebulaKeyUtils::isSystemPart.
kSystemPart = 0x00000002,
// Tag written by NebulaKeyUtils::systemBalanceKey and tested by isSystemBalance.
kSystemBalance = 0x00000003,
};

enum class NebulaOperationType : uint32_t {
Expand Down
77 changes: 59 additions & 18 deletions src/kvstore/Common.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,17 @@
#include <folly/Function.h>
#include <rocksdb/slice.h>

#include <sstream>

#include "common/base/Base.h"
#include "common/datatypes/HostAddr.h"
#include "common/thrift/ThriftTypes.h"
#include "common/time/WallClock.h"
#include "common/utils/Types.h"
#include "interface/gen-cpp2/common_types.h"

// gflag read by Peers::isExpired(): a Peers record counts as expired once
// (now - createdTime), in seconds, exceeds this value. Only declared here.
// NOTE(review): "sesc" looks like a misspelling of "secs"; renaming it would
// require changing the flag's definition as well — confirm before touching.
DECLARE_int64(balance_expired_sesc);

namespace nebula {
namespace kvstore {

Expand Down Expand Up @@ -59,6 +64,7 @@ struct Peer {
Status status;

// Default peer: value-initialized address with the normal-peer status.
Peer()
    : addr(),
      status(Status::kNormalPeer) {
}
// Builds a normal-status peer for the given address.
// `a` is a by-value sink parameter, so move it into the member instead of
// copying it a second time.
explicit Peer(HostAddr a) : addr(std::move(a)), status(Status::kNormalPeer) {}
// Builds a peer for the given address with an explicit status.
// `a` is a by-value sink parameter, so move it into the member instead of
// copying it a second time.
Peer(HostAddr a, Status s) : addr(std::move(a)), status(s) {}

std::string toString() const {
Expand Down Expand Up @@ -106,26 +112,37 @@ inline std::ostream& operator<<(std::ostream& os, const Peer& peer) {
struct Peers {
private:
std::map<HostAddr, Peer> peers;
int64_t createdTime;

public:
Peers() {}
Peers() {
createdTime = time::WallClock::fastNowInSec();
}
// Builds the set from plain addresses; every entry gets the normal-peer status.
// A repeated address keeps the last occurrence. The creation time is stamped
// after the map has been filled.
explicit Peers(const std::vector<HostAddr>& addrs) {
  for (const auto& hostAddr : addrs) {
    peers.insert_or_assign(hostAddr, Peer(hostAddr, Peer::Status::kNormalPeer));
  }
  createdTime = time::WallClock::fastNowInSec();
}
// Builds the set from ready-made Peer records, keyed by each record's address.
// A repeated address keeps the last occurrence. The creation time is stamped
// after the map has been filled.
explicit Peers(const std::vector<Peer>& ps) {
  for (const auto& entry : ps) {
    peers.insert_or_assign(entry.addr, entry);
  }
  createdTime = time::WallClock::fastNowInSec();
}
// Takes ownership of an already-built address->peer map and stamps the
// creation time with the current wall-clock time in seconds.
explicit Peers(std::map<HostAddr, Peer> ps)
    : peers(std::move(ps)), createdTime(time::WallClock::fastNowInSec()) {}
explicit Peers(std::map<HostAddr, Peer> ps) : peers(std::move(ps)) {}

// Inserts the peer under its address, or overwrites the record already stored
// for that address.
void addOrUpdate(const Peer& peer) {
  peers.insert_or_assign(peer.addr, peer);
}

bool get(const HostAddr& addr, Peer* peer) {
// True when a record is stored for addr, whatever its status.
bool exist(const HostAddr& addr) const {
  return peers.count(addr) != 0;
}

bool get(const HostAddr& addr, Peer* peer) const {
auto it = peers.find(addr);
if (it == peers.end()) {
return false;
Expand All @@ -141,39 +158,61 @@ struct Peers {
peers.erase(addr);
}

size_t size() {
// Number of stored peer records; every map entry counts, whatever its status.
size_t size() const {
  const size_t total = peers.size();
  return total;
}

std::map<HostAddr, Peer> getPeers() {
return peers;
}

bool allNormalPeers() const {
for (const auto& [addr, peer] : peers) {
if (peer.status == Peer::Status::kDeleted) {
continue;
}

if (peer.status != Peer::Status::kNormalPeer) {
return false;
}
}
return true;
}

// True when more than FLAGS_balance_expired_sesc seconds have elapsed between
// createdTime and the current wall-clock time.
bool isExpired() const {
  const auto ageInSec = time::WallClock::fastNowInSec() - createdTime;
  return ageInSec > FLAGS_balance_expired_sesc;
}

void setCreatedTime(int64_t time) {
createdTime = time;
}

std::string toString() const {
std::stringstream os;
os << "version:1,"
<< "count:" << peers.size() << "\n";
<< "count:" << peers.size() << ",ts:" << createdTime << "\n";
for (const auto& [_, p] : peers) {
os << p << "\n";
}
return os.str();
}

static std::pair<int, int> extractHeader(const std::string& header) {
auto pos = header.find(":");
if (pos == std::string::npos) {
static std::tuple<int, int, int64_t> extractHeader(const std::string& header) {
std::vector<std::string> fields;
folly::split(":", header, fields, true);
if (fields.size() != 4) {
LOG(INFO) << "Parse part peers header error:" << header;
return {0, 0};
return {0, 0, 0};
}
int version = std::stoi(header.substr(pos + 1));
pos = header.find(":", pos + 1);
if (pos == std::string::npos) {
LOG(INFO) << "Parse part peers header error:" << header;
return {0, 0};
}
int count = std::stoi(header.substr(pos + 1));

return {version, count};
int version = std::stoi(fields[1]);
int count = std::stoi(fields[2]);

int64_t createdTime;
std::istringstream iss(fields[3]);
iss >> createdTime;

return {version, count, createdTime};
}

static Peers fromString(const std::string& str) {
Expand All @@ -186,7 +225,7 @@ struct Peers {
return peers;
}

auto [version, count] = extractHeader(lines[0]);
auto [version, count, createdTime] = extractHeader(lines[0]);
if (version != 1) {
LOG(INFO) << "Wrong peers format version:" << version;
return peers;
Expand All @@ -198,6 +237,8 @@ struct Peers {
return peers;
}

peers.setCreatedTime(createdTime);

// skip header
for (size_t i = 1; i < lines.size(); ++i) {
auto& line = lines[i];
Expand Down
9 changes: 4 additions & 5 deletions src/kvstore/KVEngine.h
Original file line number Diff line number Diff line change
Expand Up @@ -228,11 +228,10 @@ class KVEngine {
virtual void addPart(PartitionID partId, const Peers& raftPeers) = 0;

/**
* @brief Update part info. Could only update the peers' persist info now.
* @brief Update part info. For now, it can only update the persisted peers info while balancing.
*
* @param partId
* @param raftPeer 1. if raftPeer.status is kDeleted, delete this peer.
* 2. if raftPeer.status is others, add or update this peer
* @param raftPeer
*/
virtual void updatePart(PartitionID partId, const Peer& raftPeer) = 0;

Expand All @@ -251,11 +250,11 @@ class KVEngine {
virtual std::vector<PartitionID> allParts() = 0;

/**
* @brief Return all partId->raft peers that current storage engine holds.
* @brief Return all balancing partId->raft peers that current storage engine holds.
*
* @return std::map<PartitionID, Peers> partId-> raft peers for each part, including learners
*/
virtual std::map<PartitionID, Peers> allPartPeers() = 0;
virtual std::map<PartitionID, Peers> balancePartPeers() = 0;

/**
* @brief Return total parts num
Expand Down
88 changes: 67 additions & 21 deletions src/kvstore/NebulaStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@

#include "common/fs/FileUtils.h"
#include "common/network/NetworkUtils.h"
#include "common/time/WallClock.h"
#include "common/utils/NebulaKeyUtils.h"
#include "kvstore/NebulaSnapshotManager.h"
#include "kvstore/RocksEngine.h"

Expand Down Expand Up @@ -81,7 +83,8 @@ bool NebulaStore::init() {
void NebulaStore::loadPartFromDataPath() {
CHECK(!!options_.partMan_);
LOG(INFO) << "Scan the local path, and init the spaces_";
std::unordered_set<std::pair<GraphSpaceID, PartitionID>> spacePartIdSet;
// avoid duplicate engine created
std::unordered_set<std::pair<GraphSpaceID, PartitionID>> partSet;
for (auto& path : options_.dataPaths_) {
auto rootPath = folly::stringPrintf("%s/nebula", path.c_str());
auto dirs = fs::FileUtils::listAllDirsInDir(rootPath.c_str());
Expand All @@ -101,35 +104,78 @@ void NebulaStore::loadPartFromDataPath() {
continue;
}

// Load raft peers info which persisted to local engine.
// If the partition was in balancing process before restart, we should keep it
// though the part is not in the meta.
auto engine = newEngine(spaceId, path, options_.walPath_);
std::map<PartitionID, Peers> partRaftPeers;
for (auto& [partId, raftPeers] : engine->allPartPeers()) {
bool isNormalPeer = true;

Peer raftPeer;
bool exist = raftPeers.get(raftAddr_, &raftPeer);
if (exist) {
if (raftPeer.status != Peer::Status::kNormalPeer) {
isNormalPeer = false;

// load balancing part info which persisted to local engine.
for (auto& [partId, raftPeers] : engine->balancePartPeers()) {
CHECK_NE(raftPeers.size(), 0);

if (raftPeers.isExpired()) {
LOG(INFO) << "Space: " << spaceId << ", part:" << partId
<< " balancing info expired, ignore it.";
continue;
}

auto spacePart = std::make_pair(spaceId, partId);
if (partSet.find(spacePart) == partSet.end()) {
partSet.emplace(std::make_pair(spaceId, partId));

// join the balancing peers with meta peers
auto metaStatus = options_.partMan_->partMeta(spaceId, partId);
if (!metaStatus.ok()) {
LOG(INFO) << "Space: " << spaceId << "; partId: " << partId
<< " does not exist in part manager when join balancing.";
} else {
auto partMeta = metaStatus.value();
for (auto& h : partMeta.hosts_) {
auto raftAddr = getRaftAddr(h);
if (!raftPeers.exist(raftAddr)) {
VLOG(1) << "Add raft peer " << raftAddr;
raftPeers.addOrUpdate(Peer(raftAddr));
}
}
}

partRaftPeers.emplace(partId, raftPeers);
}
}

// load normal part ids which persisted to local engine.
for (auto& partId : engine->allParts()) {
// first priority: balancing
bool inBalancing = partRaftPeers.find(partId) != partRaftPeers.end();
if (inBalancing) {
continue;
}

if (!options_.partMan_->partExist(storeSvcAddr_, spaceId, partId).ok() && isNormalPeer) {
LOG(INFO) << "Part " << partId
<< " is a normal peer and does not exist in meta any more, will remove it!";
// second priority: meta
if (!options_.partMan_->partExist(storeSvcAddr_, spaceId, partId).ok()) {
LOG(INFO)
<< "Part " << partId
<< " is not in balancing and does not exist in meta any more, will remove it!";
engine->removePart(partId);
continue;
} else {
auto spacePart = std::make_pair(spaceId, partId);
if (spacePartIdSet.find(spacePart) == spacePartIdSet.end()) {
spacePartIdSet.emplace(spacePart);
partRaftPeers.emplace(partId, raftPeers);
}

auto spacePart = std::make_pair(spaceId, partId);
if (partSet.find(spacePart) == partSet.end()) {
partSet.emplace(spacePart);

// fill the peers
auto metaStatus = options_.partMan_->partMeta(spaceId, partId);
CHECK(metaStatus.ok());
auto partMeta = metaStatus.value();
Peers peers;
for (auto& h : partMeta.hosts_) {
VLOG(1) << "Add raft peer " << getRaftAddr(h);
peers.addOrUpdate(Peer(getRaftAddr(h)));
}
partRaftPeers.emplace(partId, peers);
}
}

// there is no valid part in this engine, remove it
if (partRaftPeers.empty()) {
engine.reset(); // close engine
if (!options_.partMan_->spaceExist(storeSvcAddr_, spaceId).ok()) {
Expand All @@ -141,7 +187,7 @@ void NebulaStore::loadPartFromDataPath() {
continue;
}

// add to spaces if the part should exist
// add to spaces
KVEngine* enginePtr = nullptr;
{
folly::RWSpinLock::WriteHolder wh(&lock_);
Expand Down
Loading

0 comments on commit 99d93f5

Please sign in to comment.