Skip to content

Commit

Permalink
Merge branch 'master' into insert_vertex_only
Browse files Browse the repository at this point in the history
  • Loading branch information
cangfengzhs authored Dec 17, 2021
2 parents 430fd0b + 2beee55 commit 6cc7c72
Show file tree
Hide file tree
Showing 40 changed files with 506 additions and 115 deletions.
14 changes: 7 additions & 7 deletions .github/pull_request_template.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,16 @@
#### Special notes for your reviewer, ex. impact of this fix, etc:


#### Additional context:
#### Additional context/ Design document:


#### Checklist
- [ ] Documentation affected Please add the label if documentation needs to be modified.)
- [ ] Incompatible (If it is incompatible, please describe it and add corresponding label.)
- [ ] Need to cherry-pick If need to cherry-pick to some branches, please label the destination version(s).)
#### Checklist:
- [ ] Documentation affected (Please add the label if documentation needs to be modified.)
- [ ] Incompatibility (If it breaks the compatibility, please describe it and add the corresponding label.)
- [ ] If it's needed to cherry-pick (If cherry-pick to some branches is required, please label the destination version(s).)
- [ ] Performance impacted: Consumes more CPU/Memory

#### Release notes:

#### Release notes:
Please confirm whether to reflect in release notes and how to describe:
Please confirm whether to be reflected in release notes and how to describe:
> `
1 change: 1 addition & 0 deletions src/clients/meta/MetaClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3554,6 +3554,7 @@ bool MetaClient::checkIsPlanKilled(SessionID sessionId, ExecutionPlanID planId)

Status MetaClient::verifyVersion() {
auto req = cpp2::VerifyClientVersionReq();
req.set_build_version(getOriginVersion());
req.set_host(options_.localHost_);
folly::Promise<StatusOr<cpp2::VerifyClientVersionResp>> promise;
auto future = promise.getFuture();
Expand Down
8 changes: 6 additions & 2 deletions src/clients/storage/StorageClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,8 @@ StorageRpcRespFuture<cpp2::ExecResponse> StorageClient::addVertices(
const CommonRequestParam& param,
std::vector<cpp2::NewVertex> vertices,
std::unordered_map<TagID, std::vector<std::string>> propNames,
bool ifNotExists) {
bool ifNotExists,
bool ignoreExistedIndex) {
auto cbStatus = getIdFromNewVertex(param.space);
if (!cbStatus.ok()) {
return folly::makeFuture<StorageRpcResponse<cpp2::ExecResponse>>(
Expand All @@ -133,6 +134,7 @@ StorageRpcRespFuture<cpp2::ExecResponse> StorageClient::addVertices(
auto& req = requests[host];
req.set_space_id(param.space);
req.set_if_not_exists(ifNotExists);
req.set_ignore_existed_index(ignoreExistedIndex);
req.set_parts(std::move(c.second));
req.set_prop_names(propNames);
req.set_common(common);
Expand All @@ -149,7 +151,8 @@ StorageRpcRespFuture<cpp2::ExecResponse> StorageClient::addVertices(
StorageRpcRespFuture<cpp2::ExecResponse> StorageClient::addEdges(const CommonRequestParam& param,
std::vector<cpp2::NewEdge> edges,
std::vector<std::string> propNames,
bool ifNotExists) {
bool ifNotExists,
bool ignoreExistedIndex) {
auto cbStatus = getIdFromNewEdge(param.space);
if (!cbStatus.ok()) {
return folly::makeFuture<StorageRpcResponse<cpp2::ExecResponse>>(
Expand All @@ -170,6 +173,7 @@ StorageRpcRespFuture<cpp2::ExecResponse> StorageClient::addEdges(const CommonReq
auto& req = requests[host];
req.set_space_id(param.space);
req.set_if_not_exists(ifNotExists);
req.set_ignore_existed_index(ignoreExistedIndex);
req.set_parts(std::move(c.second));
req.set_prop_names(propNames);
req.set_common(common);
Expand Down
6 changes: 4 additions & 2 deletions src/clients/storage/StorageClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -81,12 +81,14 @@ class StorageClient : public StorageClientBase<cpp2::GraphStorageServiceAsyncCli
const CommonRequestParam& param,
std::vector<cpp2::NewVertex> vertices,
std::unordered_map<TagID, std::vector<std::string>> propNames,
bool ifNotExists);
bool ifNotExists,
bool ignoreExistedIndex);

StorageRpcRespFuture<cpp2::ExecResponse> addEdges(const CommonRequestParam& param,
std::vector<cpp2::NewEdge> edges,
std::vector<std::string> propNames,
bool ifNotExists);
bool ifNotExists,
bool ignoreExistedIndex);

StorageRpcRespFuture<cpp2::ExecResponse> deleteEdges(const CommonRequestParam& param,
std::vector<storage::cpp2::EdgeKey> edges);
Expand Down
2 changes: 0 additions & 2 deletions src/clients/storage/StorageClientBase-inl.h
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,6 @@ void StorageClientBase<ClientType>::getResponseImpl(
auto client = clientsMan_->client(host, evb, false, FLAGS_storage_client_timeout_ms);
auto spaceId = request.second.get_space_id();
auto partsId = getReqPartsId(request.second);
LOG(INFO) << "Send request to storage " << host;
remoteFunc(client.get(), request.second)
.via(evb)
.thenValue([spaceId, pro, this](Response&& resp) mutable {
Expand Down Expand Up @@ -357,7 +356,6 @@ StorageClientBase<ClientType>::getHostPartsWithCursor(GraphSpaceID spaceId) cons

// TODO support cursor
cpp2::ScanCursor c;
c.set_has_next(false);
auto parts = status.value();
for (auto partId = 1; partId <= parts; partId++) {
auto leader = getLeader(spaceId, partId);
Expand Down
12 changes: 10 additions & 2 deletions src/graph/executor/mutate/InsertExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,11 @@ folly::Future<Status> InsertVerticesExecutor::insertVertices() {
ivNode->getSpace(), qctx()->rctx()->session()->id(), plan->id(), plan->isProfileEnabled());
return qctx()
->getStorageClient()
->addVertices(param, ivNode->getVertices(), ivNode->getPropNames(), ivNode->getIfNotExists())
->addVertices(param,
ivNode->getVertices(),
ivNode->getPropNames(),
ivNode->getIfNotExists(),
ivNode->getIgnoreExistedIndex())
.via(runner())
.ensure([addVertTime]() {
VLOG(1) << "Add vertices time: " << addVertTime.elapsedInUSec() << "us";
Expand All @@ -52,7 +56,11 @@ folly::Future<Status> InsertEdgesExecutor::insertEdges() {
param.useExperimentalFeature = FLAGS_enable_experimental_feature;
return qctx()
->getStorageClient()
->addEdges(param, ieNode->getEdges(), ieNode->getPropNames(), ieNode->getIfNotExists())
->addEdges(param,
ieNode->getEdges(),
ieNode->getPropNames(),
ieNode->getIfNotExists(),
ieNode->getIgnoreExistedIndex())
.via(runner())
.ensure(
[addEdgeTime]() { VLOG(1) << "Add edge time: " << addEdgeTime.elapsedInUSec() << "us"; })
Expand Down
37 changes: 30 additions & 7 deletions src/graph/planner/plan/Mutate.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,15 @@ class InsertVertices final : public SingleDependencyNode {
GraphSpaceID spaceId,
std::vector<storage::cpp2::NewVertex> vertices,
std::unordered_map<TagID, std::vector<std::string>> tagPropNames,
bool ifNotExists) {
return qctx->objPool()->add(new InsertVertices(
qctx, input, spaceId, std::move(vertices), std::move(tagPropNames), ifNotExists));
bool ifNotExists,
bool ignoreExistedIndex) {
return qctx->objPool()->add(new InsertVertices(qctx,
input,
spaceId,
std::move(vertices),
std::move(tagPropNames),
ifNotExists,
ignoreExistedIndex));
}

std::unique_ptr<PlanNodeDescription> explain() const override;
Expand All @@ -39,24 +45,29 @@ class InsertVertices final : public SingleDependencyNode {

bool getIfNotExists() const { return ifNotExists_; }

bool getIgnoreExistedIndex() const { return ignoreExistedIndex_; }

private:
InsertVertices(QueryContext* qctx,
PlanNode* input,
GraphSpaceID spaceId,
std::vector<storage::cpp2::NewVertex> vertices,
std::unordered_map<TagID, std::vector<std::string>> tagPropNames,
bool ifNotExists)
bool ifNotExists,
bool ignoreExistedIndex)
: SingleDependencyNode(qctx, Kind::kInsertVertices, input),
spaceId_(spaceId),
vertices_(std::move(vertices)),
tagPropNames_(std::move(tagPropNames)),
ifNotExists_(ifNotExists) {}
ifNotExists_(ifNotExists),
ignoreExistedIndex_(ignoreExistedIndex) {}

private:
GraphSpaceID spaceId_{-1};
std::vector<storage::cpp2::NewVertex> vertices_;
std::unordered_map<TagID, std::vector<std::string>> tagPropNames_;
bool ifNotExists_{false};
bool ignoreExistedIndex_{false};
};

class InsertEdges final : public SingleDependencyNode {
Expand All @@ -67,9 +78,16 @@ class InsertEdges final : public SingleDependencyNode {
std::vector<storage::cpp2::NewEdge> edges,
std::vector<std::string> propNames,
bool ifNotExists,
bool ignoreExistedIndex,
bool useChainInsert = false) {
return qctx->objPool()->add(new InsertEdges(
qctx, input, spaceId, std::move(edges), std::move(propNames), ifNotExists, useChainInsert));
return qctx->objPool()->add(new InsertEdges(qctx,
input,
spaceId,
std::move(edges),
std::move(propNames),
ifNotExists,
ignoreExistedIndex,
useChainInsert));
}

std::unique_ptr<PlanNodeDescription> explain() const override;
Expand All @@ -80,6 +98,8 @@ class InsertEdges final : public SingleDependencyNode {

bool getIfNotExists() const { return ifNotExists_; }

bool getIgnoreExistedIndex() const { return ignoreExistedIndex_; }

GraphSpaceID getSpace() const { return spaceId_; }

bool useChainInsert() const { return useChainInsert_; }
Expand All @@ -91,19 +111,22 @@ class InsertEdges final : public SingleDependencyNode {
std::vector<storage::cpp2::NewEdge> edges,
std::vector<std::string> propNames,
bool ifNotExists,
bool ignoreExistedIndex,
bool useChainInsert)
: SingleDependencyNode(qctx, Kind::kInsertEdges, input),
spaceId_(spaceId),
edges_(std::move(edges)),
propNames_(std::move(propNames)),
ifNotExists_(ifNotExists),
ignoreExistedIndex_(ignoreExistedIndex),
useChainInsert_(useChainInsert) {}

private:
GraphSpaceID spaceId_{-1};
std::vector<storage::cpp2::NewEdge> edges_;
std::vector<std::string> propNames_;
bool ifNotExists_{false};
bool ignoreExistedIndex_{false};
// if this enabled, add edge request will only sent to
// outbound edges. (toss)
bool useChainInsert_{false};
Expand Down
12 changes: 10 additions & 2 deletions src/graph/validator/MutateValidator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,13 @@ Status InsertVerticesValidator::validateImpl() {
}

Status InsertVerticesValidator::toPlan() {
auto doNode = InsertVertices::make(
qctx_, nullptr, spaceId_, std::move(vertices_), std::move(tagPropNames_), ifNotExists_);
auto doNode = InsertVertices::make(qctx_,
nullptr,
spaceId_,
std::move(vertices_),
std::move(tagPropNames_),
ifNotExists_,
ignoreExistedIndex_);
root_ = doNode;
tail_ = root_;
return Status::OK();
Expand All @@ -32,6 +37,7 @@ Status InsertVerticesValidator::toPlan() {
Status InsertVerticesValidator::check() {
auto sentence = static_cast<InsertVerticesSentence *>(sentence_);
ifNotExists_ = sentence->isIfNotExists();
ignoreExistedIndex_ = sentence->ignoreExistedIndex();
rows_ = sentence->rows();
if (rows_.empty()) {
return Status::SemanticError("VALUES cannot be empty");
Expand Down Expand Up @@ -180,6 +186,7 @@ Status InsertEdgesValidator::toPlan() {
std::move(edges_),
std::move(entirePropNames_),
ifNotExists_,
ignoreExistedIndex_,
useChainInsert);
root_ = doNode;
tail_ = root_;
Expand All @@ -189,6 +196,7 @@ Status InsertEdgesValidator::toPlan() {
Status InsertEdgesValidator::check() {
auto sentence = static_cast<InsertEdgesSentence *>(sentence_);
ifNotExists_ = sentence->isIfNotExists();
ignoreExistedIndex_ = sentence->ignoreExistedIndex();
auto edgeStatus = qctx_->schemaMng()->toEdgeType(spaceId_, *sentence->edge());
NG_RETURN_IF_ERROR(edgeStatus);
edgeType_ = edgeStatus.value();
Expand Down
2 changes: 2 additions & 0 deletions src/graph/validator/MutateValidator.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ class InsertVerticesValidator final : public Validator {
std::vector<std::pair<TagID, TagSchema>> schemas_;
uint16_t propSize_{0};
bool ifNotExists_{false};
bool ignoreExistedIndex_{false};
std::vector<storage::cpp2::NewVertex> vertices_;
};

Expand Down Expand Up @@ -68,6 +69,7 @@ class InsertEdgesValidator final : public Validator {
private:
GraphSpaceID spaceId_{-1};
bool ifNotExists_{false};
bool ignoreExistedIndex_{false};
EdgeType edgeType_{-1};
std::shared_ptr<const meta::SchemaProviderIf> schema_;
std::vector<std::string> propNames_;
Expand Down
3 changes: 2 additions & 1 deletion src/interface/meta.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -1123,8 +1123,9 @@ struct VerifyClientVersionResp {


struct VerifyClientVersionReq {
1: required binary version = common.version;
1: required binary client_version = common.version;
2: common.HostAddr host;
3: binary build_version;
}

service MetaService {
Expand Down
10 changes: 6 additions & 4 deletions src/interface/storage.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -343,7 +343,8 @@ struct AddVerticesRequest {
(cpp.template = "std::unordered_map") prop_names,
// if true, when (vertexID,tagID) already exists, do nothing
4: bool if_not_exists,
5: optional RequestCommon common,
5: bool ignore_existed_index = false,
6: optional RequestCommon common,
}

struct AddEdgesRequest {
Expand All @@ -356,7 +357,9 @@ struct AddEdgesRequest {
3: list<binary> prop_names,
// if true, when edge already exists, do nothing
4: bool if_not_exists,
5: optional RequestCommon common,
// If true, existed index won't be removed
5: bool ignore_existed_index = false,
6: optional RequestCommon common,
}

/*
Expand Down Expand Up @@ -561,9 +564,8 @@ struct LookupAndTraverseRequest {
*/

struct ScanCursor {
3: bool has_next,
// next start key of scan, only valid when has_next is true
4: optional binary next_cursor,
1: optional binary next_cursor,
}

struct ScanVertexRequest {
Expand Down
4 changes: 4 additions & 0 deletions src/kvstore/NebulaStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -385,6 +385,10 @@ std::shared_ptr<Part> NebulaStore::newPart(GraphSpaceID spaceId,

void NebulaStore::removeSpace(GraphSpaceID spaceId, bool isListener) {
folly::RWSpinLock::WriteHolder wh(&lock_);
if (beforeRemoveSpace_) {
beforeRemoveSpace_(spaceId);
}

if (!isListener) {
auto spaceIt = this->spaces_.find(spaceId);
if (spaceIt != this->spaces_.end()) {
Expand Down
7 changes: 7 additions & 0 deletions src/kvstore/NebulaStore.h
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,12 @@ class NebulaStore : public KVStore, public Handler {

void unregisterOnNewPartAdded(const std::string& funcName) { onNewPartAdded_.erase(funcName); }

void registerBeforeRemoveSpace(std::function<void(GraphSpaceID)> func) {
beforeRemoveSpace_ = func;
}

void unregisterBeforeRemoveSpace() { beforeRemoveSpace_ = nullptr; }

private:
void loadPartFromDataPath();

Expand Down Expand Up @@ -343,6 +349,7 @@ class NebulaStore : public KVStore, public Handler {
std::shared_ptr<DiskManager> diskMan_;
folly::ConcurrentHashMap<std::string, std::function<void(std::shared_ptr<Part>&)>>
onNewPartAdded_;
std::function<void(GraphSpaceID)> beforeRemoveSpace_{nullptr};
};

} // namespace kvstore
Expand Down
9 changes: 5 additions & 4 deletions src/meta/processors/admin/VerifyClientVersionProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,17 @@ void VerifyClientVersionProcessor::process(const cpp2::VerifyClientVersionReq& r
std::unordered_set<std::string> whiteList;
folly::splitTo<std::string>(
":", FLAGS_client_white_list, std::inserter(whiteList, whiteList.begin()));
if (FLAGS_enable_client_white_list && whiteList.find(req.get_version()) == whiteList.end()) {
if (FLAGS_enable_client_white_list &&
whiteList.find(req.get_client_version()) == whiteList.end()) {
resp_.set_code(nebula::cpp2::ErrorCode::E_CLIENT_SERVER_INCOMPATIBLE);
resp_.set_error_msg(folly::stringPrintf(
"Meta client version(%s) is not accepted, current meta client white list: %s.",
req.get_version().c_str(),
req.get_client_version().c_str(),
FLAGS_client_white_list.c_str()));
} else {
auto host = req.get_host();
const auto& host = req.get_host();
auto versionKey = MetaKeyUtils::versionKey(host);
auto versionVal = MetaKeyUtils::versionVal(req.get_version().c_str());
auto versionVal = MetaKeyUtils::versionVal(req.get_build_version().c_str());
std::vector<kvstore::KV> versionData;
versionData.emplace_back(std::move(versionKey), std::move(versionVal));
doSyncPut(versionData);
Expand Down
3 changes: 2 additions & 1 deletion src/meta/test/VerifyClientVersionTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ TEST(VerifyClientVersionTest, VersionTest) {
std::unique_ptr<kvstore::KVStore> kv(MockCluster::initMetaKV(rootPath.path()));
{
auto req = cpp2::VerifyClientVersionReq();
req.set_version("1.0.1");
req.set_client_version("1.0.1");
req.set_build_version("1.0.1-nightly");
auto* processor = VerifyClientVersionProcessor::instance(kv.get());
auto f = processor->getFuture();
processor->process(req);
Expand Down
Loading

0 comments on commit 6cc7c72

Please sign in to comment.