Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move version info outside of HB #3378

Merged
merged 14 commits into from
Dec 7, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/clients/meta/MetaClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2394,7 +2394,6 @@ folly::Future<StatusOr<bool>> MetaClient::heartbeat() {
req.set_host(options_.localHost_);
req.set_role(options_.role_);
req.set_git_info_sha(options_.gitInfoSHA_);
req.set_version(getOriginVersion());
if (options_.role_ == cpp2::HostRole::STORAGE) {
if (options_.clusterId_.load() == 0) {
options_.clusterId_ = FileBasedClusterIdMan::getClusterIdFromFile(FLAGS_cluster_id_path);
Expand Down Expand Up @@ -3505,6 +3504,7 @@ bool MetaClient::checkIsPlanKilled(SessionID sessionId, ExecutionPlanID planId)

Status MetaClient::verifyVersion() {
auto req = cpp2::VerifyClientVersionReq();
req.set_host(options_.localHost_);
folly::Promise<StatusOr<cpp2::VerifyClientVersionResp>> promise;
auto future = promise.getFuture();
getResponse(
Expand Down
22 changes: 22 additions & 0 deletions src/common/utils/MetaKeyUtils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ namespace nebula {
static const std::unordered_map<std::string, std::pair<std::string, bool>> systemTableMaps = {
{"users", {"__users__", true}},
{"hosts", {"__hosts__", false}},
{"versions", {"__versions__", false}},
{"snapshots", {"__snapshots__", false}},
{"configs", {"__configs__", true}},
{"groups", {"__groups__", true}},
Expand Down Expand Up @@ -58,6 +59,7 @@ static const std::unordered_map<
static const std::string kSpacesTable = tableMaps.at("spaces").first; // NOLINT
static const std::string kPartsTable = tableMaps.at("parts").first; // NOLINT
static const std::string kHostsTable = systemTableMaps.at("hosts").first; // NOLINT
static const std::string kVersionsTable = systemTableMaps.at("versions").first; // NOLINT
static const std::string kTagsTable = tableMaps.at("tags").first; // NOLINT
static const std::string kEdgesTable = tableMaps.at("edges").first; // NOLINT
static const std::string kIndexesTable = tableMaps.at("indexes").first; // NOLINT
Expand Down Expand Up @@ -269,6 +271,26 @@ HostAddr MetaKeyUtils::parseHostKeyV2(folly::StringPiece key) {
return MetaKeyUtils::deserializeHostAddr(key);
}

std::string MetaKeyUtils::versionKey(const HostAddr& h) {
std::string key;
key.append(kVersionsTable.data(), kVersionsTable.size())
.append(MetaKeyUtils::serializeHostAddr(h));
return key;
}

std::string MetaKeyUtils::versionVal(const std::string& version) {
std::string val;
auto versionLen = version.size();
val.reserve(sizeof(int64_t) + versionLen);
val.append(reinterpret_cast<const char*>(&version), sizeof(int64_t)).append(version);
return val;
}

std::string MetaKeyUtils::parseVersion(folly::StringPiece val) {
auto len = *reinterpret_cast<const size_t*>(val.data());
return val.subpiece(sizeof(size_t), len).str();
}

std::string MetaKeyUtils::leaderKey(std::string addr, Port port) {
LOG(ERROR) << "deprecated function\n" << boost::stacktrace::stacktrace();
return leaderKeyV2(addr, port);
Expand Down
6 changes: 6 additions & 0 deletions src/common/utils/MetaKeyUtils.h
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,12 @@ class MetaKeyUtils final {

static HostAddr parseHostKeyV2(folly::StringPiece key);

static std::string versionKey(const HostAddr& h);

static std::string versionVal(const std::string& version);

static std::string parseVersion(folly::StringPiece val);

static std::string leaderKey(std::string ip, Port port);

static std::string leaderKeyV2(std::string addr, Port port);
Expand Down
5 changes: 2 additions & 3 deletions src/interface/meta.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -565,9 +565,7 @@ struct HBReq {
4: optional map<common.GraphSpaceID, list<LeaderInfo>>
(cpp.template = "std::unordered_map") leader_partIds;
5: binary git_info_sha,
// version of binary
6: optional binary version,
7: optional map<common.GraphSpaceID, map<binary, PartitionList>
6: optional map<common.GraphSpaceID, map<binary, PartitionList>
(cpp.template = "std::unordered_map")>
(cpp.template = "std::unordered_map") disk_parts;
}
Expand Down Expand Up @@ -1113,6 +1111,7 @@ struct VerifyClientVersionResp {

struct VerifyClientVersionReq {
1: required binary version = common.version;
2: common.HostAddr host;
}

service MetaService {
Expand Down
22 changes: 0 additions & 22 deletions src/meta/ActiveHostsMan.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,6 @@ struct HostInfo {
int64_t lastHBTimeInMilliSec_ = 0;
cpp2::HostRole role_{cpp2::HostRole::UNKNOWN};
std::string gitInfoSha_;
// version of binary
folly::Optional<std::string> version_;

static HostInfo decode(const folly::StringPiece& data) {
if (data.size() == sizeof(int64_t)) {
Expand Down Expand Up @@ -71,12 +69,6 @@ struct HostInfo {
if (!info.gitInfoSha_.empty()) {
encode.append(info.gitInfoSha_.data(), len);
}

if (info.version_.has_value()) {
len = info.version_.value().size();
encode.append(reinterpret_cast<const char*>(&len), sizeof(std::size_t));
encode.append(info.version_.value().data(), len);
}
return encode;
}

Expand Down Expand Up @@ -104,20 +96,6 @@ struct HostInfo {
}

info.gitInfoSha_ = std::string(data.data() + offset, len);
offset += len;

if (offset == data.size()) {
return info;
}

len = *reinterpret_cast<const size_t*>(data.data() + offset);
offset += sizeof(size_t);

if (offset + len > data.size()) {
FLOG_FATAL("decode out of range, offset=%zu, actual=%zu", offset, data.size());
}

info.version_ = std::string(data.data() + offset, len);
return info;
}
};
Expand Down
3 changes: 0 additions & 3 deletions src/meta/processors/admin/HBProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,9 +67,6 @@ void HBProcessor::process(const cpp2::HBReq& req) {
}

HostInfo info(time::WallClock::fastNowInMilliSec(), req.get_role(), req.get_git_info_sha());
if (req.version_ref().has_value()) {
info.version_ = *req.version_ref();
heroicNeZha marked this conversation as resolved.
Show resolved Hide resolved
}
if (req.leader_partIds_ref().has_value()) {
ret = ActiveHostsMan::updateHostInfo(kvstore_, host, info, &*req.leader_partIds_ref());
} else {
Expand Down
6 changes: 6 additions & 0 deletions src/meta/processors/admin/VerifyClientVersionProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,12 @@ void VerifyClientVersionProcessor::process(const cpp2::VerifyClientVersionReq& r
req.get_version().c_str(),
FLAGS_client_white_list.c_str()));
} else {
auto host = req.get_host();
auto versionKey = MetaKeyUtils::versionKey(host);
auto versionVal = MetaKeyUtils::versionVal(req.get_version().c_str());
std::vector<kvstore::KV> versionData;
versionData.emplace_back(std::move(versionKey), std::move(versionVal));
doSyncPut(versionData);
resp_.set_code(nebula::cpp2::ErrorCode::SUCCEEDED);
}
onFinished();
Expand Down
9 changes: 7 additions & 2 deletions src/meta/processors/parts/ListHostsProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -124,9 +124,14 @@ nebula::cpp2::ErrorCode ListHostsProcessor::allHostsWithStatus(cpp2::HostRole ro

item.set_role(info.role_);
item.set_git_info_sha(info.gitInfoSha_);
if (info.version_.has_value()) {
item.set_version(info.version_.value());

auto versionKey = MetaKeyUtils::versionKey(item.get_hostAddr());
auto versionRet = doGet(versionKey);
if (nebula::ok(versionRet)) {
auto versionVal = MetaKeyUtils::parseVersion(value(versionRet));
item.set_version(versionVal);
}

if (now - info.lastHBTimeInMilliSec_ < FLAGS_removed_threshold_sec * 1000) {
// If meta didn't receive heartbeat with 2 periods, regard hosts as
// offline. Same as ActiveHostsMan::getActiveHosts
Expand Down
15 changes: 15 additions & 0 deletions src/meta/test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -254,3 +254,18 @@ nebula_add_test(
wangle
gtest
)

nebula_add_test(
NAME
verify_client_version_test
SOURCES
VerifyClientVersionTest.cpp
OBJECTS
${meta_test_deps}
LIBRARIES
${ROCKSDB_LIBRARIES}
${THRIFT_LIBRARIES}
${PROXYGEN_LIBRARIES}
wangle
gtest
)
75 changes: 75 additions & 0 deletions src/meta/test/VerifyClientVersionTest.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/* Copyright (c) 2021 vesoft inc. All rights reserved.
*
* This source code is licensed under Apache 2.0 License.
*/

#include <gtest/gtest.h>

#include "common/base/Base.h"
#include "common/fs/TempDir.h"
#include "meta/processors/admin/HBProcessor.h"
#include "meta/processors/admin/VerifyClientVersionProcessor.h"
#include "meta/processors/parts/ListHostsProcessor.h"
#include "meta/test/TestUtils.h"

namespace nebula {
namespace meta {

TEST(VerifyClientVersionTest, VersionTest) {
fs::TempDir rootPath("/tmp/VersionTest.XXXXXX");
std::unique_ptr<kvstore::KVStore> kv(MockCluster::initMetaKV(rootPath.path()));
heroicNeZha marked this conversation as resolved.
Show resolved Hide resolved
{
auto req = cpp2::VerifyClientVersionReq();
req.set_version("1.0.1");
auto* processor = VerifyClientVersionProcessor::instance(kv.get());
auto f = processor->getFuture();
processor->process(req);
auto resp = std::move(f).get();
ASSERT_EQ(nebula::cpp2::ErrorCode::E_CLIENT_SERVER_INCOMPATIBLE, resp.get_code());
}
{
for (auto i = 0; i < 5; i++) {
auto req = cpp2::VerifyClientVersionReq();
req.set_host(HostAddr(std::to_string(i), i));
auto* processor = VerifyClientVersionProcessor::instance(kv.get());
auto f = processor->getFuture();
processor->process(req);
auto resp = std::move(f).get();
ASSERT_EQ(nebula::cpp2::ErrorCode::SUCCEEDED, resp.get_code());
}
}
{
const ClusterID kClusterId = 10;
for (auto i = 0; i < 5; i++) {
auto req = cpp2::HBReq();
req.set_role(cpp2::HostRole::GRAPH);
req.set_host(HostAddr(std::to_string(i), i));
req.set_cluster_id(kClusterId);
auto* processor = HBProcessor::instance(kv.get(), nullptr, kClusterId);
auto f = processor->getFuture();
processor->process(req);
auto resp = std::move(f).get();
ASSERT_EQ(nebula::cpp2::ErrorCode::SUCCEEDED, resp.get_code());
}
}
{
auto req = cpp2::ListHostsReq();
req.set_type(cpp2::ListHostType::GRAPH);
auto* processor = ListHostsProcessor::instance(kv.get());
auto f = processor->getFuture();
processor->process(req);
auto resp = std::move(f).get();
ASSERT_EQ(nebula::cpp2::ErrorCode::SUCCEEDED, resp.get_code());
ASSERT_EQ(resp.get_hosts().size(), 5);
}
}

} // namespace meta
} // namespace nebula

int main(int argc, char** argv) {
testing::InitGoogleTest(&argc, argv);
folly::init(&argc, &argv, true);
google::SetStderrLogging(google::INFO);
return RUN_ALL_TESTS();
}