Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add code for part peers backward compatible #4101

Merged
merged 8 commits into from
Apr 6, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions src/common/utils/NebulaKeyUtils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,17 @@ std::string NebulaKeyUtils::systemPartKey(PartitionID partId) {
return key;
}

// static
/**
 * Build the system key under which the balance info of a partition is stored.
 *
 * Layout: sizeof(PartitionID) bytes holding (partId << kPartitionOffset) tagged
 * with NebulaKeyType::kSystem, followed by sizeof(NebulaSystemKeyType) bytes
 * holding NebulaSystemKeyType::kSystemBalance.
 *
 * @param partId partition the key belongs to
 * @return the encoded key
 */
std::string NebulaKeyUtils::systemBalanceKey(PartitionID partId) {
  const uint32_t subType = static_cast<uint32_t>(NebulaSystemKeyType::kSystemBalance);
  const uint32_t partWord =
      (partId << kPartitionOffset) | static_cast<uint32_t>(NebulaKeyType::kSystem);

  std::string encoded;
  encoded.reserve(kSystemLen);
  // Partition word first, then the system sub-type tag.
  encoded.append(reinterpret_cast<const char*>(&partWord), sizeof(PartitionID));
  encoded.append(reinterpret_cast<const char*>(&subType), sizeof(NebulaSystemKeyType));
  return encoded;
}

// static
std::string NebulaKeyUtils::kvKey(PartitionID partId, const folly::StringPiece& name) {
std::string key;
Expand Down
15 changes: 15 additions & 0 deletions src/common/utils/NebulaKeyUtils.h
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,8 @@ class NebulaKeyUtils final {

static std::string systemPartKey(PartitionID partId);

static std::string systemBalanceKey(PartitionID partId);

static std::string kvKey(PartitionID partId, const folly::StringPiece& name);
static std::string kvPrefix(PartitionID partId);

Expand Down Expand Up @@ -189,6 +191,19 @@ class NebulaKeyUtils final {
return static_cast<NebulaSystemKeyType>(type) == NebulaSystemKeyType::kSystemPart;
}

/**
 * Tell whether rawKey is a system balance key.
 *
 * The key must be exactly kSystemLen bytes long, pass isSystem(), and carry
 * NebulaSystemKeyType::kSystemBalance in the word that follows the leading
 * sizeof(PartitionID) bytes.
 */
static bool isSystemBalance(const folly::StringPiece& rawKey) {
  // Length is checked first so isSystem() is never called on a wrongly sized key.
  if (rawKey.size() != kSystemLen || !isSystem(rawKey)) {
    return false;
  }
  auto typeOffset = rawKey.data() + sizeof(PartitionID);
  auto sysType = readInt<uint32_t>(typeOffset, sizeof(NebulaSystemKeyType));
  return NebulaSystemKeyType::kSystemBalance == static_cast<NebulaSystemKeyType>(sysType);
}

static VertexIDSlice getSrcId(size_t vIdLen, const folly::StringPiece& rawKey) {
if (rawKey.size() < kEdgeLen + (vIdLen << 1)) {
dumpBadKey(rawKey, kEdgeLen + (vIdLen << 1), vIdLen);
Expand Down
1 change: 1 addition & 0 deletions src/common/utils/Types.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ enum class NebulaKeyType : uint32_t {
// Sub-type tag carried by system keys; stored as a uint32_t.
enum class NebulaSystemKeyType : uint32_t {
kSystemCommit = 0x00000001,
// Tag tested by the system-part key check in NebulaKeyUtils.
kSystemPart = 0x00000002,
// Tag appended by NebulaKeyUtils::systemBalanceKey and tested by isSystemBalance.
kSystemBalance = 0x00000003,
};

enum class NebulaOperationType : uint32_t {
Expand Down
77 changes: 59 additions & 18 deletions src/kvstore/Common.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,17 @@
#include <folly/Function.h>
#include <rocksdb/slice.h>

#include <sstream>

#include "common/base/Base.h"
#include "common/datatypes/HostAddr.h"
#include "common/thrift/ThriftTypes.h"
#include "common/time/WallClock.h"
#include "common/utils/Types.h"
#include "interface/gen-cpp2/common_types.h"

DECLARE_int64(balance_expired_sesc);

namespace nebula {
namespace kvstore {

Expand Down Expand Up @@ -59,6 +64,7 @@ struct Peer {
Status status;

Peer() : addr(), status(Status::kNormalPeer) {}
explicit Peer(HostAddr a) : addr(a), status(Status::kNormalPeer) {}
Peer(HostAddr a, Status s) : addr(a), status(s) {}

std::string toString() const {
Expand Down Expand Up @@ -106,26 +112,37 @@ inline std::ostream& operator<<(std::ostream& os, const Peer& peer) {
struct Peers {
private:
std::map<HostAddr, Peer> peers;
int64_t createdTime;

public:
Peers() {}
Peers() {
createdTime = time::WallClock::fastNowInSec();
}
explicit Peers(const std::vector<HostAddr>& addrs) { // from normal peers
for (auto& addr : addrs) {
peers[addr] = Peer(addr, Peer::Status::kNormalPeer);
}
createdTime = time::WallClock::fastNowInSec();
}
explicit Peers(const std::vector<Peer>& ps) {
for (auto& p : ps) {
peers[p.addr] = p;
}
createdTime = time::WallClock::fastNowInSec();
}
explicit Peers(std::map<HostAddr, Peer> ps) : peers(std::move(ps)) {
createdTime = time::WallClock::fastNowInSec();
}
explicit Peers(std::map<HostAddr, Peer> ps) : peers(std::move(ps)) {}

// Insert the peer under its address, or overwrite the entry already stored there.
void addOrUpdate(const Peer& peer) {
  peers.insert_or_assign(peer.addr, peer);
}

bool get(const HostAddr& addr, Peer* peer) {
bool exist(const HostAddr& addr) const {
  // True when an entry keyed by addr is present, whatever its status.
  return peers.count(addr) != 0;
}

bool get(const HostAddr& addr, Peer* peer) const {
auto it = peers.find(addr);
if (it == peers.end()) {
return false;
Expand All @@ -141,39 +158,61 @@ struct Peers {
peers.erase(addr);
}

size_t size() {
size_t size() const {
return peers.size();
}

std::map<HostAddr, Peer> getPeers() {
return peers;
}

bool allNormalPeers() const {
for (const auto& [addr, peer] : peers) {
if (peer.status == Peer::Status::kDeleted) {
continue;
}

if (peer.status != Peer::Status::kNormalPeer) {
return false;
}
}
return true;
}

// True when more than FLAGS_balance_expired_sesc seconds have elapsed since
// createdTime, measured against the current wall clock.
bool isExpired() const {
  const auto elapsedSecs = time::WallClock::fastNowInSec() - createdTime;
  return elapsedSecs > FLAGS_balance_expired_sesc;
}

void setCreatedTime(int64_t time) {
createdTime = time;
}

std::string toString() const {
std::stringstream os;
os << "version:1,"
<< "count:" << peers.size() << "\n";
<< "count:" << peers.size() << ",ts:" << createdTime << "\n";
for (const auto& [_, p] : peers) {
os << p << "\n";
}
return os.str();
}

static std::pair<int, int> extractHeader(const std::string& header) {
auto pos = header.find(":");
if (pos == std::string::npos) {
static std::tuple<int, int, int64_t> extractHeader(const std::string& header) {
std::vector<std::string> fields;
folly::split(":", header, fields, true);
if (fields.size() != 4) {
LOG(INFO) << "Parse part peers header error:" << header;
return {0, 0};
return {0, 0, 0};
}
int version = std::stoi(header.substr(pos + 1));
pos = header.find(":", pos + 1);
if (pos == std::string::npos) {
LOG(INFO) << "Parse part peers header error:" << header;
return {0, 0};
}
int count = std::stoi(header.substr(pos + 1));

return {version, count};
int version = std::stoi(fields[1]);
int count = std::stoi(fields[2]);

int64_t createdTime;
std::istringstream iss(fields[3]);
iss >> createdTime;

return {version, count, createdTime};
}

static Peers fromString(const std::string& str) {
Expand All @@ -186,7 +225,7 @@ struct Peers {
return peers;
}

auto [version, count] = extractHeader(lines[0]);
auto [version, count, createdTime] = extractHeader(lines[0]);
if (version != 1) {
LOG(INFO) << "Wrong peers format version:" << version;
return peers;
Expand All @@ -198,6 +237,8 @@ struct Peers {
return peers;
}

peers.setCreatedTime(createdTime);

// skip header
for (size_t i = 1; i < lines.size(); ++i) {
auto& line = lines[i];
Expand Down
9 changes: 4 additions & 5 deletions src/kvstore/KVEngine.h
Original file line number Diff line number Diff line change
Expand Up @@ -228,11 +228,10 @@ class KVEngine {
virtual void addPart(PartitionID partId, const Peers& raftPeers) = 0;

/**
* @brief Update part info. Could only update the peers' persist info now.
* @brief Update part info. Currently, only the persisted peers info of a part in balancing can be updated.
*
* @param partId
* @param raftPeer 1. if raftPeer.status is kDeleted, delete this peer.
* 2. if raftPeer.status is others, add or update this peer
* @param raftPeer
*/
virtual void updatePart(PartitionID partId, const Peer& raftPeer) = 0;

Expand All @@ -251,11 +250,11 @@ class KVEngine {
virtual std::vector<PartitionID> allParts() = 0;

/**
* @brief Return all partId->raft peers that current storage engine holds.
* @brief Return all balancing partId->raft peers that current storage engine holds.
*
* @return std::map<PartitionID, Peers> partId-> raft peers for each part, including learners
*/
virtual std::map<PartitionID, Peers> allPartPeers() = 0;
virtual std::map<PartitionID, Peers> balancePartPeers() = 0;

/**
* @brief Return total parts num
Expand Down
88 changes: 67 additions & 21 deletions src/kvstore/NebulaStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@

#include "common/fs/FileUtils.h"
#include "common/network/NetworkUtils.h"
#include "common/time/WallClock.h"
#include "common/utils/NebulaKeyUtils.h"
#include "kvstore/NebulaSnapshotManager.h"
#include "kvstore/RocksEngine.h"

Expand Down Expand Up @@ -81,7 +83,8 @@ bool NebulaStore::init() {
void NebulaStore::loadPartFromDataPath() {
CHECK(!!options_.partMan_);
LOG(INFO) << "Scan the local path, and init the spaces_";
std::unordered_set<std::pair<GraphSpaceID, PartitionID>> spacePartIdSet;
// avoid duplicate engine created
std::unordered_set<std::pair<GraphSpaceID, PartitionID>> partSet;
for (auto& path : options_.dataPaths_) {
auto rootPath = folly::stringPrintf("%s/nebula", path.c_str());
auto dirs = fs::FileUtils::listAllDirsInDir(rootPath.c_str());
Expand All @@ -101,35 +104,78 @@ void NebulaStore::loadPartFromDataPath() {
continue;
}

// Load raft peers info which persisted to local engine.
// If the partition was in balancing process before restart, we should keep it
// though the part is not in the meta.
auto engine = newEngine(spaceId, path, options_.walPath_);
std::map<PartitionID, Peers> partRaftPeers;
for (auto& [partId, raftPeers] : engine->allPartPeers()) {
bool isNormalPeer = true;

Peer raftPeer;
bool exist = raftPeers.get(raftAddr_, &raftPeer);
if (exist) {
if (raftPeer.status != Peer::Status::kNormalPeer) {
isNormalPeer = false;

// load balancing part info which persisted to local engine.
for (auto& [partId, raftPeers] : engine->balancePartPeers()) {
liwenhui-soul marked this conversation as resolved.
Show resolved Hide resolved
CHECK_NE(raftPeers.size(), 0);

if (raftPeers.isExpired()) {
LOG(INFO) << "Space: " << spaceId << ", part:" << partId
<< " balancing info expired, ignore it.";
continue;
}

auto spacePart = std::make_pair(spaceId, partId);
if (partSet.find(spacePart) == partSet.end()) {
partSet.emplace(std::make_pair(spaceId, partId));

// join the balancing peers with meta peers
auto metaStatus = options_.partMan_->partMeta(spaceId, partId);
if (!metaStatus.ok()) {
LOG(INFO) << "Space: " << spaceId << "; partId: " << partId
<< " does not exist in part manager when join balancing.";
} else {
auto partMeta = metaStatus.value();
for (auto& h : partMeta.hosts_) {
auto raftAddr = getRaftAddr(h);
if (!raftPeers.exist(raftAddr)) {
VLOG(1) << "Add raft peer " << raftAddr;
raftPeers.addOrUpdate(Peer(raftAddr));
}
}
}

partRaftPeers.emplace(partId, raftPeers);
}
}

// load normal part ids which persisted to local engine.
for (auto& partId : engine->allParts()) {
// first priority: balancing
bool inBalancing = partRaftPeers.find(partId) != partRaftPeers.end();
if (inBalancing) {
continue;
}

if (!options_.partMan_->partExist(storeSvcAddr_, spaceId, partId).ok() && isNormalPeer) {
LOG(INFO) << "Part " << partId
<< " is a normal peer and does not exist in meta any more, will remove it!";
// second priority: meta
if (!options_.partMan_->partExist(storeSvcAddr_, spaceId, partId).ok()) {
LOG(INFO)
<< "Part " << partId
<< " is not in balancing and does not exist in meta any more, will remove it!";
engine->removePart(partId);
continue;
} else {
auto spacePart = std::make_pair(spaceId, partId);
if (spacePartIdSet.find(spacePart) == spacePartIdSet.end()) {
spacePartIdSet.emplace(spacePart);
partRaftPeers.emplace(partId, raftPeers);
}

auto spacePart = std::make_pair(spaceId, partId);
if (partSet.find(spacePart) == partSet.end()) {
partSet.emplace(spacePart);

// fill the peers
auto metaStatus = options_.partMan_->partMeta(spaceId, partId);
CHECK(metaStatus.ok());
auto partMeta = metaStatus.value();
Peers peers;
for (auto& h : partMeta.hosts_) {
VLOG(1) << "Add raft peer " << getRaftAddr(h);
peers.addOrUpdate(Peer(getRaftAddr(h)));
}
partRaftPeers.emplace(partId, peers);
}
}

// there is no valid part in this engine, remove it
if (partRaftPeers.empty()) {
engine.reset(); // close engine
if (!options_.partMan_->spaceExist(storeSvcAddr_, spaceId).ok()) {
Expand All @@ -141,7 +187,7 @@ void NebulaStore::loadPartFromDataPath() {
continue;
}

// add to spaces if the part should exist
// add to spaces
KVEngine* enginePtr = nullptr;
{
folly::RWSpinLock::WriteHolder wh(&lock_);
Expand Down
Loading