Skip to content

Commit

Permalink
upload PartDiskMap infos to meta
Browse files Browse the repository at this point in the history
fix comments and unit test

update

try fix

fix
  • Loading branch information
Nivras committed Nov 30, 2021
1 parent 683f773 commit bca42ae
Show file tree
Hide file tree
Showing 13 changed files with 143 additions and 6 deletions.
15 changes: 15 additions & 0 deletions src/clients/meta/MetaClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2414,6 +2414,21 @@ folly::Future<StatusOr<bool>> MetaClient::heartbeat() {
} else {
req.set_leader_partIds(std::move(leaderIds));
}

kvstore::DiskParts diskParts;
if (listener_ != nullptr) {
listener_->fetchDiskParts(diskParts);
if (diskParts_ != diskParts) {
{
folly::RWSpinLock::WriteHolder holder(&diskPartsLock_);
diskParts_.clear();
diskParts_ = diskParts;
}
req.set_disk_parts(diskParts);
}
} else {
req.set_disk_parts(diskParts);
}
}

folly::Promise<StatusOr<bool>> promise;
Expand Down
6 changes: 6 additions & 0 deletions src/clients/meta/MetaClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#include "interface/gen-cpp2/MetaServiceAsyncClient.h"
#include "interface/gen-cpp2/common_types.h"
#include "interface/gen-cpp2/meta_types.h"
#include "kvstore/DiskManager.h"

DECLARE_int32(meta_client_retry_times);
DECLARE_int32(heartbeat_interval_secs);
Expand Down Expand Up @@ -162,6 +163,7 @@ class MetaChangedListener {
virtual void onPartUpdated(const PartHosts& partHosts) = 0;
virtual void fetchLeaderInfo(
std::unordered_map<GraphSpaceID, std::vector<cpp2::LeaderInfo>>& leaders) = 0;
virtual void fetchDiskParts(kvstore::DiskParts& diskParts) = 0;
virtual void onListenerAdded(GraphSpaceID spaceId,
PartitionID partId,
const ListenerHosts& listenerHosts) = 0;
Expand Down Expand Up @@ -733,6 +735,10 @@ class MetaClient {
// leaderIdsLock_ is used to protect leaderIds_
std::unordered_map<GraphSpaceID, std::vector<cpp2::LeaderInfo>> leaderIds_;
folly::RWSpinLock leaderIdsLock_;
// diskPartsLock_ is used to protect diskParts_;
kvstore::DiskParts diskParts_;
folly::RWSpinLock diskPartsLock_;

std::atomic<int64_t> localDataLastUpdateTime_{-1};
std::atomic<int64_t> localCfgLastUpdateTime_{-1};
std::atomic<int64_t> metadLastUpdateTime_{0};
Expand Down
48 changes: 47 additions & 1 deletion src/common/utils/MetaKeyUtils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,8 @@ static const std::unordered_map<
{"balance_task", {"__balance_task__", nullptr}},
{"balance_plan", {"__balance_plan__", nullptr}},
{"ft_index", {"__ft_index__", nullptr}},
{"local_id", {"__local_id__", MetaKeyUtils::parseLocalIdSpace}}};
{"local_id", {"__local_id__", MetaKeyUtils::parseLocalIdSpace}},
{"disk_parts", {"__disk_parts__", MetaKeyUtils::parseDiskPartsSpace}}};

// clang-format off
static const std::string kSpacesTable = tableMaps.at("spaces").first; // NOLINT
Expand All @@ -71,6 +72,7 @@ static const std::string kLeaderTermsTable = tableMaps.at("leader_terms").fir
static const std::string kGroupsTable = systemTableMaps.at("groups").first; // NOLINT
static const std::string kZonesTable = systemTableMaps.at("zones").first; // NOLINT
static const std::string kListenerTable = tableMaps.at("listener").first; // NOLINT
static const std::string kDiskPartsTable = tableMaps.at("disk_parts").first; // NOLINT

// Used to record the number of vertices and edges in the space
// The number of vertices of each tag in the space
Expand Down Expand Up @@ -1150,4 +1152,48 @@ GraphSpaceID MetaKeyUtils::parseLocalIdSpace(folly::StringPiece rawData) {
return *reinterpret_cast<const GraphSpaceID*>(rawData.data() + offset);
}

GraphSpaceID MetaKeyUtils::parseDiskPartsSpace(folly::StringPiece rawData) {
auto offset = kDiskPartsTable.size();
return *reinterpret_cast<const GraphSpaceID*>(rawData.data() + offset);
}

std::string MetaKeyUtils::diskPartsPrefix() { return kDiskPartsTable; }

std::string MetaKeyUtils::diskPartsPrefix(HostAddr addr) {
std::string key;
std::string hostStr = serializeHostAddr(addr);
key.reserve(kDiskPartsTable.size() + sizeof(hostStr));
key.append(kDiskPartsTable.data(), kDiskPartsTable.size()).append(hostStr.data(), hostStr.size());
return key;
}

std::string MetaKeyUtils::diskPartsPrefix(HostAddr addr, GraphSpaceID spaceId) {
std::string key;
std::string prefix = diskPartsPrefix(addr);
key.reserve(prefix.size() + sizeof(GraphSpaceID));
key.append(prefix.data(), prefix.size())
.append(reinterpret_cast<const char*>(&spaceId), sizeof(GraphSpaceID));
return key;
}

std::string MetaKeyUtils::diskPartsKey(HostAddr addr, GraphSpaceID spaceId, std::string path) {
std::string key;
std::string prefix = diskPartsPrefix(addr, spaceId);
key.reserve(prefix.size() + path.size());
key.append(prefix.data(), prefix.size()).append(path.data(), path.size());
return key;
}

std::string MetaKeyUtils::diskPartsVal(const meta::cpp2::PartitionList& partList) {
std::string val;
apache::thrift::CompactSerializer::serialize(partList, &val);
return val;
}

meta::cpp2::PartitionList MetaKeyUtils::parseDiskPartsVal(const folly::StringPiece& rawData) {
meta::cpp2::PartitionList partList;
apache::thrift::CompactSerializer::deserialize(rawData, partList);
return partList;
}

} // namespace nebula
14 changes: 14 additions & 0 deletions src/common/utils/MetaKeyUtils.h
Original file line number Diff line number Diff line change
Expand Up @@ -375,6 +375,20 @@ class MetaKeyUtils final {
static std::unordered_map<std::string, std::pair<std::string, bool>> getSystemInfoMaps();

static std::unordered_map<std::string, std::pair<std::string, bool>> getSystemTableMaps();

static GraphSpaceID parseDiskPartsSpace(folly::StringPiece rawData);

static std::string diskPartsPrefix();

static std::string diskPartsPrefix(HostAddr addr);

static std::string diskPartsPrefix(HostAddr addr, GraphSpaceID spaceId);

static std::string diskPartsKey(HostAddr addr, GraphSpaceID spaceId, std::string path);

static std::string diskPartsVal(const meta::cpp2::PartitionList& partList);

static meta::cpp2::PartitionList parseDiskPartsVal(const folly::StringPiece& rawData);
};

} // namespace nebula
Expand Down
7 changes: 7 additions & 0 deletions src/interface/meta.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -554,6 +554,10 @@ struct LeaderInfo {
2: i64 term
}

struct PartitionList {
1: list<common.PartitionID> part_list;
}

struct HBReq {
1: HostRole role,
2: common.HostAddr host,
Expand All @@ -563,6 +567,9 @@ struct HBReq {
5: binary git_info_sha,
// version of binary
6: optional binary version,
7: optional map<common.GraphSpaceID, map<binary, PartitionList>
(cpp.template = "std::unordered_map")>
(cpp.template = "std::unordered_map") disk_parts;
}

struct IndexFieldDef {
Expand Down
19 changes: 14 additions & 5 deletions src/kvstore/DiskManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -94,12 +94,21 @@ void DiskManager::removePartFromPath(GraphSpaceID spaceId,
}
}

StatusOr<PartDiskMap> DiskManager::partDist(GraphSpaceID spaceId) {
auto spaceIt = partPath_.find(spaceId);
if (spaceIt == partPath_.end()) {
return Status::Error("Space not found");
void DiskManager::getDiskParts(DiskParts& diskParts) {
std::lock_guard<std::mutex> lg(lock_);
for (const auto& [space, partDiskMap] : partPath_) {
std::unordered_map<std::string, meta::cpp2::PartitionList> tmpPartPaths;
for (const auto& [path, partitions] : partDiskMap) {
std::vector<PartitionID> tmpPartitions;
for (const auto& partition : partitions) {
tmpPartitions.emplace_back(partition);
}
meta::cpp2::PartitionList ps;
ps.set_part_list(tmpPartitions);
tmpPartPaths[path] = ps;
}
diskParts.emplace(space, std::move(tmpPartPaths));
}
return spaceIt->second;
}

bool DiskManager::hasEnoughSpace(GraphSpaceID spaceId, PartitionID partId) {
Expand Down
6 changes: 6 additions & 0 deletions src/kvstore/DiskManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,14 @@
#include "common/base/StatusOr.h"
#include "common/thread/GenericWorker.h"
#include "common/thrift/ThriftTypes.h"
#include "interface/gen-cpp2/meta_types.h"

namespace nebula {
namespace kvstore {

using PartDiskMap = std::unordered_map<std::string, std::set<PartitionID>>;
using DiskParts =
std::unordered_map<GraphSpaceID, std::unordered_map<std::string, meta::cpp2::PartitionList>>;

class DiskManager {
FRIEND_TEST(DiskManagerTest, AvailableTest);
Expand Down Expand Up @@ -52,6 +55,9 @@ class DiskManager {
// Given a space, return data path and all partition in the path
StatusOr<PartDiskMap> partDist(GraphSpaceID spaceId);

// Get all space data path and all partition in the path
void getDiskParts(DiskParts& diskParts);

private:
// refresh free bytes of data path periodically
void refresh();
Expand Down
2 changes: 2 additions & 0 deletions src/kvstore/NebulaStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -537,6 +537,8 @@ void NebulaStore::checkRemoteListeners(GraphSpaceID spaceId,
}
}

void NebulaStore::fetchDiskParts(DiskParts& diskParts) { diskMan_->getDiskParts(diskParts); }

void NebulaStore::updateSpaceOption(GraphSpaceID spaceId,
const std::unordered_map<std::string, std::string>& options,
bool isDbOption) {
Expand Down
2 changes: 2 additions & 0 deletions src/kvstore/NebulaStore.h
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,8 @@ class NebulaStore : public KVStore, public Handler {
PartitionID partId,
const std::vector<HostAddr>& remoteListeners) override;

void fetchDiskParts(DiskParts& diskParts) override;

nebula::cpp2::ErrorCode multiPutWithoutReplicator(GraphSpaceID spaceId,
std::vector<KV> keyValues) override;

Expand Down
8 changes: 8 additions & 0 deletions src/kvstore/PartManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,14 @@ void MetaServerBasedPartManager::fetchLeaderInfo(
}
}

void MetaServerBasedPartManager::fetchDiskParts(DiskParts& diskParts) {
if (handler_ != nullptr) {
handler_->fetchDiskParts(diskParts);
} else {
VLOG(1) << "handler_ is nullptr!";
}
}

meta::ListenersMap MetaServerBasedPartManager::listeners(const HostAddr& host) {
auto ret = client_->getListenersByHostFromCache(host);
if (ret.ok()) {
Expand Down
5 changes: 5 additions & 0 deletions src/kvstore/PartManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
#include "clients/meta/MetaClient.h"
#include "common/base/Base.h"
#include "common/meta/Common.h"
#include "kvstore/DiskManager.h"

namespace nebula {
namespace kvstore {
Expand Down Expand Up @@ -49,6 +50,8 @@ class Handler {
virtual void checkRemoteListeners(GraphSpaceID spaceId,
PartitionID partId,
const std::vector<HostAddr>& remoteListeners) = 0;

virtual void fetchDiskParts(DiskParts& diskParts) = 0;
};

/**
Expand Down Expand Up @@ -210,6 +213,8 @@ class MetaServerBasedPartManager : public PartManager, public meta::MetaChangedL
void fetchLeaderInfo(
std::unordered_map<GraphSpaceID, std::vector<meta::cpp2::LeaderInfo>>& leaderParts) override;

void fetchDiskParts(DiskParts& diskParts) override;

void onListenerAdded(GraphSpaceID spaceId,
PartitionID partId,
const meta::ListenerHosts& listenerHosts) override;
Expand Down
12 changes: 12 additions & 0 deletions src/meta/processors/admin/HBProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,18 @@ void HBProcessor::process(const cpp2::HBReq& req) {
onFinished();
return;
}

if (req.disk_parts_ref().has_value()) {
for (const auto& [spaceId, partDiskMap] : *req.get_disk_parts()) {
for (const auto& [path, partList] : partDiskMap) {
auto partListVal = MetaKeyUtils::diskPartsVal(partList);
std::string key = MetaKeyUtils::diskPartsKey(host, spaceId, path);
std::vector<kvstore::KV> data;
data.push_back(std::make_pair(key, partListVal));
doPut(data);
}
}
}
}

HostInfo info(time::WallClock::fastNowInMilliSec(), req.get_role(), req.get_git_info_sha());
Expand Down
5 changes: 5 additions & 0 deletions src/meta/test/MetaClientTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1544,6 +1544,11 @@ class TestListener : public MetaChangedListener {
UNUSED(remoteListeners);
}

void fetchDiskParts(kvstore::DiskParts& diskParts) override {
UNUSED(diskParts);
LOG(INFO) << "Fetch Disk Paths";
}

int32_t spaceNum = 0;
int32_t partNum = 0;
int32_t partChanged = 0;
Expand Down

0 comments on commit bca42ae

Please sign in to comment.