Skip to content

Commit

Permalink
Merge branch 'master' into feature/bison-datetime-parser
Browse files Browse the repository at this point in the history
  • Loading branch information
Shylock-Hg authored Dec 8, 2021
2 parents bcac0e0 + d6f83f3 commit 828cc3e
Show file tree
Hide file tree
Showing 27 changed files with 545 additions and 137 deletions.
2 changes: 1 addition & 1 deletion src/clients/meta/MetaClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2394,7 +2394,6 @@ folly::Future<StatusOr<bool>> MetaClient::heartbeat() {
req.set_host(options_.localHost_);
req.set_role(options_.role_);
req.set_git_info_sha(options_.gitInfoSHA_);
req.set_version(getOriginVersion());
if (options_.role_ == cpp2::HostRole::STORAGE) {
if (options_.clusterId_.load() == 0) {
options_.clusterId_ = FileBasedClusterIdMan::getClusterIdFromFile(FLAGS_cluster_id_path);
Expand Down Expand Up @@ -3505,6 +3504,7 @@ bool MetaClient::checkIsPlanKilled(SessionID sessionId, ExecutionPlanID planId)

Status MetaClient::verifyVersion() {
auto req = cpp2::VerifyClientVersionReq();
req.set_host(options_.localHost_);
folly::Promise<StatusOr<cpp2::VerifyClientVersionResp>> promise;
auto future = promise.getFuture();
getResponse(
Expand Down
22 changes: 22 additions & 0 deletions src/common/utils/MetaKeyUtils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ namespace nebula {
static const std::unordered_map<std::string, std::pair<std::string, bool>> systemTableMaps = {
{"users", {"__users__", true}},
{"hosts", {"__hosts__", false}},
{"versions", {"__versions__", false}},
{"snapshots", {"__snapshots__", false}},
{"configs", {"__configs__", true}},
{"groups", {"__groups__", true}},
Expand Down Expand Up @@ -58,6 +59,7 @@ static const std::unordered_map<
static const std::string kSpacesTable = tableMaps.at("spaces").first; // NOLINT
static const std::string kPartsTable = tableMaps.at("parts").first; // NOLINT
static const std::string kHostsTable = systemTableMaps.at("hosts").first; // NOLINT
static const std::string kVersionsTable = systemTableMaps.at("versions").first; // NOLINT
static const std::string kTagsTable = tableMaps.at("tags").first; // NOLINT
static const std::string kEdgesTable = tableMaps.at("edges").first; // NOLINT
static const std::string kIndexesTable = tableMaps.at("indexes").first; // NOLINT
Expand Down Expand Up @@ -269,6 +271,26 @@ HostAddr MetaKeyUtils::parseHostKeyV2(folly::StringPiece key) {
return MetaKeyUtils::deserializeHostAddr(key);
}

std::string MetaKeyUtils::versionKey(const HostAddr& h) {
std::string key;
key.append(kVersionsTable.data(), kVersionsTable.size())
.append(MetaKeyUtils::serializeHostAddr(h));
return key;
}

std::string MetaKeyUtils::versionVal(const std::string& version) {
std::string val;
auto versionLen = version.size();
val.reserve(sizeof(int64_t) + versionLen);
val.append(reinterpret_cast<const char*>(&version), sizeof(int64_t)).append(version);
return val;
}

std::string MetaKeyUtils::parseVersion(folly::StringPiece val) {
auto len = *reinterpret_cast<const size_t*>(val.data());
return val.subpiece(sizeof(size_t), len).str();
}

std::string MetaKeyUtils::leaderKey(std::string addr, Port port) {
LOG(ERROR) << "deprecated function\n" << boost::stacktrace::stacktrace();
return leaderKeyV2(addr, port);
Expand Down
6 changes: 6 additions & 0 deletions src/common/utils/MetaKeyUtils.h
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,12 @@ class MetaKeyUtils final {

static HostAddr parseHostKeyV2(folly::StringPiece key);

static std::string versionKey(const HostAddr& h);

static std::string versionVal(const std::string& version);

static std::string parseVersion(folly::StringPiece val);

static std::string leaderKey(std::string ip, Port port);

static std::string leaderKeyV2(std::string addr, Port port);
Expand Down
146 changes: 75 additions & 71 deletions src/graph/validator/MutateValidator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -307,15 +307,17 @@ Status DeleteVerticesValidator::validateImpl() {
vertices_.emplace_back(std::move(idStatus).value());
}
}

auto ret = qctx_->schemaMng()->getAllEdge(spaceId_);
NG_RETURN_IF_ERROR(ret);
edgeNames_ = std::move(ret).value();
for (auto &name : edgeNames_) {
auto edgeStatus = qctx_->schemaMng()->toEdgeType(spaceId_, name);
NG_RETURN_IF_ERROR(edgeStatus);
auto edgeType = edgeStatus.value();
edgeTypes_.emplace_back(edgeType);
withEdge_ = sentence->withEdge();
if (withEdge_) {
auto ret = qctx_->schemaMng()->getAllEdge(spaceId_);
NG_RETURN_IF_ERROR(ret);
edgeNames_ = std::move(ret).value();
for (auto &name : edgeNames_) {
auto edgeStatus = qctx_->schemaMng()->toEdgeType(spaceId_, name);
NG_RETURN_IF_ERROR(edgeStatus);
auto edgeType = edgeStatus.value();
edgeTypes_.emplace_back(edgeType);
}
}
return Status::OK();
}
Expand Down Expand Up @@ -348,68 +350,71 @@ Status DeleteVerticesValidator::toPlan() {

auto *dedupVid = Dedup::make(qctx_, nullptr);
dedupVid->setInputVar(vidVar);
DeleteVertices *dvNode = nullptr;
if (withEdge_) {
std::vector<storage::cpp2::EdgeProp> edgeProps;
// make edgeRefs and edgeProp
auto index = 0u;
DCHECK(edgeTypes_.size() == edgeNames_.size());
auto *pool = qctx_->objPool();
for (auto &name : edgeNames_) {
auto *edgeKeyRef = new EdgeKeyRef(EdgeSrcIdExpression::make(pool, name),
EdgeDstIdExpression::make(pool, name),
EdgeRankExpression::make(pool, name));
edgeKeyRef->setType(EdgeTypeExpression::make(pool, name));
qctx_->objPool()->add(edgeKeyRef);
edgeKeyRefs_.emplace_back(edgeKeyRef);

storage::cpp2::EdgeProp edgeProp;
edgeProp.set_type(edgeTypes_[index]);
edgeProp.props_ref().value().emplace_back(kSrc);
edgeProp.props_ref().value().emplace_back(kDst);
edgeProp.props_ref().value().emplace_back(kType);
edgeProp.props_ref().value().emplace_back(kRank);
edgeProps.emplace_back(edgeProp);

edgeProp.set_type(-edgeTypes_[index]);
edgeProps.emplace_back(std::move(edgeProp));
index++;
}

std::vector<storage::cpp2::EdgeProp> edgeProps;
// make edgeRefs and edgeProp
auto index = 0u;
DCHECK(edgeTypes_.size() == edgeNames_.size());
auto *pool = qctx_->objPool();
for (auto &name : edgeNames_) {
auto *edgeKeyRef = new EdgeKeyRef(EdgeSrcIdExpression::make(pool, name),
EdgeDstIdExpression::make(pool, name),
EdgeRankExpression::make(pool, name));
edgeKeyRef->setType(EdgeTypeExpression::make(pool, name));
qctx_->objPool()->add(edgeKeyRef);
edgeKeyRefs_.emplace_back(edgeKeyRef);

storage::cpp2::EdgeProp edgeProp;
edgeProp.set_type(edgeTypes_[index]);
edgeProp.props_ref().value().emplace_back(kSrc);
edgeProp.props_ref().value().emplace_back(kDst);
edgeProp.props_ref().value().emplace_back(kType);
edgeProp.props_ref().value().emplace_back(kRank);
edgeProps.emplace_back(edgeProp);

edgeProp.set_type(-edgeTypes_[index]);
edgeProps.emplace_back(std::move(edgeProp));
index++;
}

auto vertexPropsPtr = std::make_unique<std::vector<storage::cpp2::VertexProp>>();
auto edgePropsPtr = std::make_unique<std::vector<storage::cpp2::EdgeProp>>(edgeProps);
auto statPropsPtr = std::make_unique<std::vector<storage::cpp2::StatProp>>();
auto exprPtr = std::make_unique<std::vector<storage::cpp2::Expr>>();
auto *getNeighbors = GetNeighbors::make(qctx_,
dedupVid,
spaceId_,
vidRef_,
edgeTypes_,
storage::cpp2::EdgeDirection::BOTH,
nullptr,
std::move(edgePropsPtr),
std::move(statPropsPtr),
std::move(exprPtr));

auto *yieldColumns = pool->makeAndAdd<YieldColumns>();
yieldColumns->addColumn(new YieldColumn(EdgeSrcIdExpression::make(pool, "*"), kSrc));
yieldColumns->addColumn(new YieldColumn(EdgeTypeExpression::make(pool, "*"), kType));
yieldColumns->addColumn(new YieldColumn(EdgeRankExpression::make(pool, "*"), kRank));
yieldColumns->addColumn(new YieldColumn(EdgeDstIdExpression::make(pool, "*"), kDst));
auto *edgeKey = Project::make(qctx_, getNeighbors, yieldColumns);

auto *dedupEdgeKey = Dedup::make(qctx_, edgeKey);

// create deleteEdges node
auto *edgeKeyRef = pool->makeAndAdd<EdgeKeyRef>(InputPropertyExpression::make(pool, kSrc),
InputPropertyExpression::make(pool, kDst),
InputPropertyExpression::make(pool, kRank),
true);
edgeKeyRef->setType(InputPropertyExpression::make(pool, kType));
auto *deNode = DeleteEdges::make(qctx_, dedupEdgeKey, spaceId_, edgeKeyRef);

auto *dvNode = DeleteVertices::make(qctx_, deNode, spaceId_, vidRef_);

dvNode->setInputVar(dedupVid->outputVar());
auto vertexPropsPtr = std::make_unique<std::vector<storage::cpp2::VertexProp>>();
auto edgePropsPtr = std::make_unique<std::vector<storage::cpp2::EdgeProp>>(edgeProps);
auto statPropsPtr = std::make_unique<std::vector<storage::cpp2::StatProp>>();
auto exprPtr = std::make_unique<std::vector<storage::cpp2::Expr>>();
auto *getNeighbors = GetNeighbors::make(qctx_,
dedupVid,
spaceId_,
vidRef_,
edgeTypes_,
storage::cpp2::EdgeDirection::BOTH,
nullptr,
std::move(edgePropsPtr),
std::move(statPropsPtr),
std::move(exprPtr));

auto *yieldColumns = pool->makeAndAdd<YieldColumns>();
yieldColumns->addColumn(new YieldColumn(EdgeSrcIdExpression::make(pool, "*"), kSrc));
yieldColumns->addColumn(new YieldColumn(EdgeTypeExpression::make(pool, "*"), kType));
yieldColumns->addColumn(new YieldColumn(EdgeRankExpression::make(pool, "*"), kRank));
yieldColumns->addColumn(new YieldColumn(EdgeDstIdExpression::make(pool, "*"), kDst));
auto *edgeKey = Project::make(qctx_, getNeighbors, yieldColumns);

auto *dedupEdgeKey = Dedup::make(qctx_, edgeKey);

// create deleteEdges node
auto *edgeKeyRef = pool->makeAndAdd<EdgeKeyRef>(InputPropertyExpression::make(pool, kSrc),
InputPropertyExpression::make(pool, kDst),
InputPropertyExpression::make(pool, kRank),
true);
edgeKeyRef->setType(InputPropertyExpression::make(pool, kType));
auto *deNode = DeleteEdges::make(qctx_, dedupEdgeKey, spaceId_, edgeKeyRef);

dvNode = DeleteVertices::make(qctx_, deNode, spaceId_, vidRef_);
dvNode->setInputVar(dedupVid->outputVar());
} else {
dvNode = DeleteVertices::make(qctx_, dedupVid, spaceId_, vidRef_);
}
root_ = dvNode;
tail_ = dedupVid;
return Status::OK();
Expand All @@ -418,7 +423,6 @@ Status DeleteVerticesValidator::toPlan() {
Status DeleteTagsValidator::validateImpl() {
auto sentence = static_cast<DeleteTagsSentence *>(sentence_);
spaceId_ = vctx_->whichSpace().id;

if (sentence->vertices()->isRef()) {
vidRef_ = sentence->vertices()->ref();
auto type = deduceExprType(vidRef_);
Expand Down
1 change: 1 addition & 0 deletions src/graph/validator/MutateValidator.h
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ class DeleteVerticesValidator final : public Validator {
std::vector<EdgeType> edgeTypes_;
std::vector<std::string> edgeNames_;
std::vector<EdgeKeyRef*> edgeKeyRefs_;
bool withEdge_{true};
};

class DeleteTagsValidator final : public Validator {
Expand Down
26 changes: 23 additions & 3 deletions src/graph/validator/test/MutateValidatorTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ TEST_F(MutateValidatorTest, InsertEdgeTest) {
TEST_F(MutateValidatorTest, DeleteVertexTest) {
// succeed
{
auto cmd = "DELETE VERTEX \"A\"";
auto cmd = "DELETE VERTEX \"A\" WITH EDGE";
std::vector<PlanNode::Kind> expected = {
PK::kDeleteVertices,
PK::kDeleteEdges,
Expand All @@ -73,9 +73,18 @@ TEST_F(MutateValidatorTest, DeleteVertexTest) {
};
ASSERT_TRUE(checkResult(cmd, expected));
}
{
auto cmd = "DELETE VERTEX \"A\"";
std::vector<PlanNode::Kind> expected = {
PK::kDeleteVertices,
PK::kDedup,
PK::kStart,
};
ASSERT_TRUE(checkResult(cmd, expected));
}
// pipe
{
auto cmd = "GO FROM \"C\" OVER like YIELD like._dst as dst | DELETE VERTEX $-.dst";
auto cmd = "GO FROM \"C\" OVER like YIELD like._dst as dst | DELETE VERTEX $-.dst WITH EDGE";
std::vector<PlanNode::Kind> expected = {
PK::kDeleteVertices,
PK::kDeleteEdges,
Expand All @@ -89,9 +98,20 @@ TEST_F(MutateValidatorTest, DeleteVertexTest) {
};
ASSERT_TRUE(checkResult(cmd, expected));
}
{
auto cmd = "GO FROM \"C\" OVER like YIELD like._dst as dst | DELETE VERTEX $-.dst";
std::vector<PlanNode::Kind> expected = {
PK::kDeleteVertices,
PK::kDedup,
PK::kProject,
PK::kGetNeighbors,
PK::kStart,
};
ASSERT_TRUE(checkResult(cmd, expected));
}
// pipe wrong input
{
auto cmd = "GO FROM \"C\" OVER E YIELD E._dst as dst | DELETE VERTEX $-.a";
auto cmd = "GO FROM \"C\" OVER E YIELD E._dst as dst | DELETE VERTEX $-.a WITH EDGE";
ASSERT_FALSE(checkResult(cmd));
}
}
Expand Down
5 changes: 2 additions & 3 deletions src/interface/meta.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -565,9 +565,7 @@ struct HBReq {
4: optional map<common.GraphSpaceID, list<LeaderInfo>>
(cpp.template = "std::unordered_map") leader_partIds;
5: binary git_info_sha,
// version of binary
6: optional binary version,
7: optional map<common.GraphSpaceID, map<binary, PartitionList>
6: optional map<common.GraphSpaceID, map<binary, PartitionList>
(cpp.template = "std::unordered_map")>
(cpp.template = "std::unordered_map") disk_parts;
}
Expand Down Expand Up @@ -1113,6 +1111,7 @@ struct VerifyClientVersionResp {

struct VerifyClientVersionReq {
1: required binary version = common.version;
2: common.HostAddr host;
}

service MetaService {
Expand Down
22 changes: 0 additions & 22 deletions src/meta/ActiveHostsMan.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,6 @@ struct HostInfo {
int64_t lastHBTimeInMilliSec_ = 0;
cpp2::HostRole role_{cpp2::HostRole::UNKNOWN};
std::string gitInfoSha_;
// version of binary
folly::Optional<std::string> version_;

static HostInfo decode(const folly::StringPiece& data) {
if (data.size() == sizeof(int64_t)) {
Expand Down Expand Up @@ -71,12 +69,6 @@ struct HostInfo {
if (!info.gitInfoSha_.empty()) {
encode.append(info.gitInfoSha_.data(), len);
}

if (info.version_.has_value()) {
len = info.version_.value().size();
encode.append(reinterpret_cast<const char*>(&len), sizeof(std::size_t));
encode.append(info.version_.value().data(), len);
}
return encode;
}

Expand Down Expand Up @@ -104,20 +96,6 @@ struct HostInfo {
}

info.gitInfoSha_ = std::string(data.data() + offset, len);
offset += len;

if (offset == data.size()) {
return info;
}

len = *reinterpret_cast<const size_t*>(data.data() + offset);
offset += sizeof(size_t);

if (offset + len > data.size()) {
FLOG_FATAL("decode out of range, offset=%zu, actual=%zu", offset, data.size());
}

info.version_ = std::string(data.data() + offset, len);
return info;
}
};
Expand Down
3 changes: 0 additions & 3 deletions src/meta/processors/admin/HBProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,9 +67,6 @@ void HBProcessor::process(const cpp2::HBReq& req) {
}

HostInfo info(time::WallClock::fastNowInMilliSec(), req.get_role(), req.get_git_info_sha());
if (req.version_ref().has_value()) {
info.version_ = *req.version_ref();
}
if (req.leader_partIds_ref().has_value()) {
ret = ActiveHostsMan::updateHostInfo(kvstore_, host, info, &*req.leader_partIds_ref());
} else {
Expand Down
6 changes: 6 additions & 0 deletions src/meta/processors/admin/VerifyClientVersionProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,12 @@ void VerifyClientVersionProcessor::process(const cpp2::VerifyClientVersionReq& r
req.get_version().c_str(),
FLAGS_client_white_list.c_str()));
} else {
auto host = req.get_host();
auto versionKey = MetaKeyUtils::versionKey(host);
auto versionVal = MetaKeyUtils::versionVal(req.get_version().c_str());
std::vector<kvstore::KV> versionData;
versionData.emplace_back(std::move(versionKey), std::move(versionVal));
doSyncPut(versionData);
resp_.set_code(nebula::cpp2::ErrorCode::SUCCEEDED);
}
onFinished();
Expand Down
Loading

0 comments on commit 828cc3e

Please sign in to comment.