From 9fbe1647e99f745626b5d75c62c725be10ef7535 Mon Sep 17 00:00:00 2001 From: Nivras <12605142+Nivras@users.noreply.github.com> Date: Wed, 15 Dec 2021 17:36:46 +0800 Subject: [PATCH 1/5] issue_3170: drop space after stop all AdminTaskJobs (#3406) --- src/kvstore/NebulaStore.cpp | 4 ++ src/kvstore/NebulaStore.h | 7 +++ src/storage/admin/AdminTask.h | 10 +++++ src/storage/admin/AdminTaskManager.cpp | 52 +++++++++++++++++++++- src/storage/admin/AdminTaskManager.h | 14 ++++++ src/storage/admin/RebuildEdgeIndexTask.cpp | 8 ++-- src/storage/admin/RebuildIndexTask.cpp | 7 ++- src/storage/admin/RebuildIndexTask.h | 7 ++- src/storage/admin/RebuildTagIndexTask.cpp | 8 ++-- src/storage/admin/StatsTask.cpp | 15 +++++++ src/storage/admin/StatsTask.h | 7 ++- 11 files changed, 125 insertions(+), 14 deletions(-) diff --git a/src/kvstore/NebulaStore.cpp b/src/kvstore/NebulaStore.cpp index 6d6c841828c..11d05f36009 100644 --- a/src/kvstore/NebulaStore.cpp +++ b/src/kvstore/NebulaStore.cpp @@ -385,6 +385,10 @@ std::shared_ptr NebulaStore::newPart(GraphSpaceID spaceId, void NebulaStore::removeSpace(GraphSpaceID spaceId, bool isListener) { folly::RWSpinLock::WriteHolder wh(&lock_); + if (beforeRemoveSpace_) { + beforeRemoveSpace_(spaceId); + } + if (!isListener) { auto spaceIt = this->spaces_.find(spaceId); if (spaceIt != this->spaces_.end()) { diff --git a/src/kvstore/NebulaStore.h b/src/kvstore/NebulaStore.h index 384be9bd4b8..2e48f1e0407 100644 --- a/src/kvstore/NebulaStore.h +++ b/src/kvstore/NebulaStore.h @@ -285,6 +285,12 @@ class NebulaStore : public KVStore, public Handler { void unregisterOnNewPartAdded(const std::string& funcName) { onNewPartAdded_.erase(funcName); } + void registerBeforeRemoveSpace(std::function func) { + beforeRemoveSpace_ = func; + } + + void unregisterBeforeRemoveSpace() { beforeRemoveSpace_ = nullptr; } + private: void loadPartFromDataPath(); @@ -343,6 +349,7 @@ class NebulaStore : public KVStore, public Handler { std::shared_ptr diskMan_; folly::ConcurrentHashMap&)>> onNewPartAdded_; + std::function beforeRemoveSpace_{nullptr}; }; } // namespace kvstore diff --git a/src/storage/admin/AdminTask.h b/src/storage/admin/AdminTask.h index bbc99df3f1e..2e426192910 100644 --- a/src/storage/admin/AdminTask.h +++ b/src/storage/admin/AdminTask.h @@ -77,6 +77,7 @@ class AdminTask { ctx_.jobId_, ctx_.taskId_, apache::thrift::util::enumNameSafe(rc).c_str()); + running_ = false; nebula::meta::cpp2::StatsItem statsItem; ctx_.onFinish_(rc, statsItem); } @@ -85,6 +86,8 @@ class AdminTask { virtual int getTaskId() { return ctx_.taskId_; } + virtual GraphSpaceID getSpaceId() { return ctx_.parameters_.get_space_id(); } + virtual void setConcurrentReq(int concurrentReq) { if (concurrentReq > 0) { ctx_.concurrentReq_ = concurrentReq; @@ -102,20 +105,27 @@ class AdminTask { virtual void cancel() { FLOG_INFO("task(%d, %d) cancelled", ctx_.jobId_, ctx_.taskId_); + canceled_ = true; auto suc = nebula::cpp2::ErrorCode::SUCCEEDED; rc_.compare_exchange_strong(suc, nebula::cpp2::ErrorCode::E_USER_CANCEL); } + virtual bool isRunning() { return running_; } + + virtual bool isCanceled() { return canceled_; } + meta::cpp2::AdminCmd cmdType() { return ctx_.cmd_; } public: std::atomic unFinishedSubTask_; SubTaskQueue subtasks_; + std::atomic running_{false}; protected: StorageEnv* env_; TaskContext ctx_; std::atomic rc_{nebula::cpp2::ErrorCode::SUCCEEDED}; + std::atomic canceled_{false}; }; class AdminTaskFactory { diff --git a/src/storage/admin/AdminTaskManager.cpp b/src/storage/admin/AdminTaskManager.cpp index 5e444888d2e..c53a9655d27 100644 --- a/src/storage/admin/AdminTaskManager.cpp +++ b/src/storage/admin/AdminTaskManager.cpp @@ -23,6 +23,11 @@ bool AdminTaskManager::init() { auto threadFactory = std::make_shared("TaskManager"); pool_ = std::make_unique(FLAGS_max_concurrent_subtasks, threadFactory); bgThread_ = std::make_unique(); + if (env_ != nullptr) { + static_cast<::nebula::kvstore::NebulaStore*>(env_->kvstore_) + ->registerBeforeRemoveSpace( + [this](GraphSpaceID spaceId) { this->waitCancelTasks(spaceId); }); + } if (!bgThread_->start()) { LOG(ERROR) << "background thread start failed"; return false; @@ -112,7 +117,12 @@ void AdminTaskManager::handleUnreportedTasks() { jobId, taskId, fut.value().status().toString()); - ifAnyUnreported_ = true; + if (fut.value().status() == Status::Error("Space not existed!")) { + // space has been droped, remove the task status. + keys.emplace_back(key.data(), key.size()); + } else { + ifAnyUnreported_ = true; + } continue; } rc = fut.value().value(); @@ -249,6 +259,14 @@ void AdminTaskManager::schedule() { } auto task = it->second; + if (task->isCanceled()) { + LOG(INFO) << folly::sformat("job {} has been calceled", task->getJobId()); + task->finish(nebula::cpp2::ErrorCode::E_USER_CANCEL); + tasks_.erase(handle); + continue; + } + + task->running_ = true; auto errOrSubTasks = task->genSubTasks(); if (!nebula::ok(errOrSubTasks)) { LOG(ERROR) << folly::sformat( @@ -353,5 +371,37 @@ bool AdminTaskManager::isFinished(JobID jobID, TaskID taskID) { return iter->second->unFinishedSubTask_ == 0; } +void AdminTaskManager::cancelTasks(GraphSpaceID spaceId) { + auto it = tasks_.begin(); + while (it != tasks_.end()) { + if (it->second->getSpaceId() == spaceId) { + it->second->cancel(); + removeTaskStatus(it->second->getJobId(), it->second->getTaskId()); + FLOG_INFO("cancel task(%d, %d), spaceId: %d", it->first.first, it->first.second, spaceId); + } + ++it; + } +} + +int32_t AdminTaskManager::runningTaskCnt(GraphSpaceID spaceId) { + int32_t jobCnt = 0; + for (const auto& task : tasks_) { + auto taskSpaceId = task.second->getSpaceId(); + if (taskSpaceId == spaceId) { + if (task.second->isRunning()) { + jobCnt++; + } + } + } + return jobCnt; +} + +void AdminTaskManager::waitCancelTasks(GraphSpaceID spaceId) { + cancelTasks(spaceId); + while (runningTaskCnt(spaceId) != 0) { + usleep(1000 * 100); + } +} + } // namespace storage } // namespace nebula diff --git a/src/storage/admin/AdminTaskManager.h b/src/storage/admin/AdminTaskManager.h index 6138a52a108..25eff32b3ec 100644 --- a/src/storage/admin/AdminTaskManager.h +++ b/src/storage/admin/AdminTaskManager.h @@ -11,6 +11,7 @@ #include #include +#include "clients/meta/MetaClient.h" #include "common/base/Base.h" #include "interface/gen-cpp2/storage_types.h" #include "kvstore/NebulaStore.h" @@ -37,6 +38,12 @@ class AdminTaskManager { return &sAdminTaskManager; } + ~AdminTaskManager() { + if (metaClient_ != nullptr) { + metaClient_ = nullptr; + } + } + // Caller must make sure JobId + TaskId is unique void addAsyncTask(std::shared_ptr task); @@ -45,6 +52,10 @@ class AdminTaskManager { nebula::cpp2::ErrorCode cancelJob(JobID jobId); nebula::cpp2::ErrorCode cancelTask(JobID jobId, TaskID taskId = -1); + void cancelTasks(GraphSpaceID spaceId); + int32_t runningTaskCnt(GraphSpaceID spaceId); + void waitCancelTasks(GraphSpaceID spaceId); + bool init(); void shutdown(); @@ -67,6 +78,9 @@ class AdminTaskManager { nebula::cpp2::ErrorCode rc, const nebula::meta::cpp2::StatsItem& result); + protected: + meta::MetaClient* metaClient_{nullptr}; + private: void schedule(); void runSubTask(TaskHandle handle); diff --git a/src/storage/admin/RebuildEdgeIndexTask.cpp b/src/storage/admin/RebuildEdgeIndexTask.cpp index c8f454723da..0bb20fad91b 100644 --- a/src/storage/admin/RebuildEdgeIndexTask.cpp +++ b/src/storage/admin/RebuildEdgeIndexTask.cpp @@ -27,9 +27,9 @@ nebula::cpp2::ErrorCode RebuildEdgeIndexTask::buildIndexGlobal(GraphSpaceID spac PartitionID part, const IndexItems& items, kvstore::RateLimiter* rateLimiter) { - if (canceled_) { + if (UNLIKELY(canceled_)) { LOG(ERROR) << "Rebuild Edge Index is Canceled"; - return nebula::cpp2::ErrorCode::SUCCEEDED; + return nebula::cpp2::ErrorCode::E_USER_CANCEL; } auto vidSizeRet = env_->schemaMan_->getSpaceVidLen(space); @@ -64,9 +64,9 @@ nebula::cpp2::ErrorCode RebuildEdgeIndexTask::buildIndexGlobal(GraphSpaceID spac RowReaderWrapper reader; size_t batchSize = 0; while (iter && iter->valid()) { - if (canceled_) { + if (UNLIKELY(canceled_)) { LOG(ERROR) << "Rebuild Edge Index is Canceled"; - return nebula::cpp2::ErrorCode::SUCCEEDED; + return nebula::cpp2::ErrorCode::E_USER_CANCEL; } if (batchSize >= FLAGS_rebuild_index_batch_size) { diff --git a/src/storage/admin/RebuildIndexTask.cpp b/src/storage/admin/RebuildIndexTask.cpp index 195ee5eaaa0..63e19492eb4 100644 --- a/src/storage/admin/RebuildIndexTask.cpp +++ b/src/storage/admin/RebuildIndexTask.cpp @@ -118,10 +118,15 @@ nebula::cpp2::ErrorCode RebuildIndexTask::buildIndexOnOperations( GraphSpaceID space, PartitionID part, kvstore::RateLimiter* rateLimiter) { if (canceled_) { LOG(INFO) << folly::sformat("Rebuild index canceled, space={}, part={}", space, part); - return nebula::cpp2::ErrorCode::SUCCEEDED; + return nebula::cpp2::ErrorCode::E_USER_CANCEL; } while (true) { + if (UNLIKELY(canceled_)) { + LOG(INFO) << folly::sformat("Rebuild index canceled, space={}, part={}", space, part); + return nebula::cpp2::ErrorCode::E_USER_CANCEL; + } + std::unique_ptr operationIter; auto operationPrefix = OperationKeyUtils::operationPrefix(part); auto operationRet = env_->kvstore_->prefix(space, part, operationPrefix, &operationIter); diff --git a/src/storage/admin/RebuildIndexTask.h b/src/storage/admin/RebuildIndexTask.h index eb6ec9e24c9..27d8695fa5d 100644 --- a/src/storage/admin/RebuildIndexTask.h +++ b/src/storage/admin/RebuildIndexTask.h @@ -36,7 +36,11 @@ class RebuildIndexTask : public AdminTask { const IndexItems& items, kvstore::RateLimiter* rateLimiter) = 0; - void cancel() override { canceled_ = true; } + void cancel() override { + canceled_ = true; + auto suc = nebula::cpp2::ErrorCode::SUCCEEDED; + rc_.compare_exchange_strong(suc, nebula::cpp2::ErrorCode::E_USER_CANCEL); + } nebula::cpp2::ErrorCode buildIndexOnOperations(GraphSpaceID space, PartitionID part, @@ -59,7 +63,6 @@ class RebuildIndexTask : public AdminTask { nebula::cpp2::ErrorCode invoke(GraphSpaceID space, PartitionID part, const IndexItems& items); protected: - std::atomic canceled_{false}; GraphSpaceID space_; }; diff --git a/src/storage/admin/RebuildTagIndexTask.cpp b/src/storage/admin/RebuildTagIndexTask.cpp index 4368d707053..ff3336269a9 100644 --- a/src/storage/admin/RebuildTagIndexTask.cpp +++ b/src/storage/admin/RebuildTagIndexTask.cpp @@ -27,9 +27,9 @@ nebula::cpp2::ErrorCode RebuildTagIndexTask::buildIndexGlobal(GraphSpaceID space PartitionID part, const IndexItems& items, kvstore::RateLimiter* rateLimiter) { - if (canceled_) { + if (UNLIKELY(canceled_)) { LOG(ERROR) << "Rebuild Tag Index is Canceled"; - return nebula::cpp2::ErrorCode::SUCCEEDED; + return nebula::cpp2::ErrorCode::E_USER_CANCEL; } auto vidSizeRet = env_->schemaMan_->getSpaceVidLen(space); @@ -64,9 +64,9 @@ nebula::cpp2::ErrorCode RebuildTagIndexTask::buildIndexGlobal(GraphSpaceID space RowReaderWrapper reader; size_t batchSize = 0; while (iter && iter->valid()) { - if (canceled_) { + if (UNLIKELY(canceled_)) { LOG(ERROR) << "Rebuild Tag Index is Canceled"; - return nebula::cpp2::ErrorCode::SUCCEEDED; + return nebula::cpp2::ErrorCode::E_USER_CANCEL; } if (batchSize >= FLAGS_rebuild_index_batch_size) { diff --git a/src/storage/admin/StatsTask.cpp b/src/storage/admin/StatsTask.cpp index beeeaed347b..bed6150949d 100644 --- a/src/storage/admin/StatsTask.cpp +++ b/src/storage/admin/StatsTask.cpp @@ -73,6 +73,11 @@ nebula::cpp2::ErrorCode StatsTask::genSubTask(GraphSpaceID spaceId, PartitionID part, std::unordered_map tags, std::unordered_map edges) { + if (UNLIKELY(canceled_)) { + LOG(ERROR) << "Stats task is canceled"; + return nebula::cpp2::ErrorCode::E_USER_CANCEL; + } + auto vIdLenRet = env_->schemaMan_->getSpaceVidLen(spaceId); if (!vIdLenRet.ok()) { LOG(ERROR) << "Get space vid length failed"; @@ -140,6 +145,11 @@ nebula::cpp2::ErrorCode StatsTask::genSubTask(GraphSpaceID spaceId, // 2 5 // 3 1 while (vertexIter && vertexIter->valid()) { + if (UNLIKELY(canceled_)) { + LOG(ERROR) << "Stats task is canceled"; + return nebula::cpp2::ErrorCode::E_USER_CANCEL; + } + auto key = vertexIter->key(); auto vId = NebulaKeyUtils::getVertexId(vIdLen, key).str(); auto tagId = NebulaKeyUtils::getTagId(vIdLen, key); @@ -168,6 +178,11 @@ nebula::cpp2::ErrorCode StatsTask::genSubTask(GraphSpaceID spaceId, // 2 2 1 3 (invalid data, for example, edge data without edge // schema) 2 3 1 4 2 3 1 5 while (edgeIter && edgeIter->valid()) { + if (UNLIKELY(canceled_)) { + LOG(ERROR) << "Stats task is canceled"; + return nebula::cpp2::ErrorCode::E_USER_CANCEL; + } + auto key = edgeIter->key(); auto edgeType = NebulaKeyUtils::getEdgeType(vIdLen, key); diff --git a/src/storage/admin/StatsTask.h b/src/storage/admin/StatsTask.h index 508aec7669a..672bcd2b27f 100644 --- a/src/storage/admin/StatsTask.h +++ b/src/storage/admin/StatsTask.h @@ -27,7 +27,11 @@ class StatsTask : public AdminTask { void finish(nebula::cpp2::ErrorCode rc) override; protected: - void cancel() override { canceled_ = true; } + void cancel() override { + canceled_ = true; + auto suc = nebula::cpp2::ErrorCode::SUCCEEDED; + rc_.compare_exchange_strong(suc, nebula::cpp2::ErrorCode::E_USER_CANCEL); + } nebula::cpp2::ErrorCode genSubTask(GraphSpaceID space, PartitionID part, @@ -38,7 +42,6 @@ class StatsTask : public AdminTask { nebula::cpp2::ErrorCode getSchemas(GraphSpaceID spaceId); protected: - std::atomic canceled_{false}; GraphSpaceID spaceId_; // All tagIds and tagName of the spaceId From 99f1f7a72d0578586a2711119d198bb08c6cfa6a Mon Sep 17 00:00:00 2001 From: Shylock Hg <33566796+Shylock-Hg@users.noreply.github.com> Date: Wed, 15 Dec 2021 18:41:17 +0800 Subject: [PATCH 2/5] Remove extra field (#3427) Co-authored-by: Sophie <84560950+Sophie-Xie@users.noreply.github.com> --- src/clients/storage/StorageClientBase-inl.h | 1 - src/interface/storage.thrift | 3 +-- src/storage/exec/ScanNode.h | 6 ------ src/storage/query/ScanEdgeProcessor.cpp | 16 +++++++++------- src/storage/query/ScanVertexProcessor.cpp | 16 +++++++++------- src/storage/test/ScanEdgeTest.cpp | 9 +++++---- src/storage/test/ScanVertexTest.cpp | 9 +++++---- 7 files changed, 29 insertions(+), 31 deletions(-) diff --git a/src/clients/storage/StorageClientBase-inl.h b/src/clients/storage/StorageClientBase-inl.h index 79435d2252a..dd319fcc9f6 100644 --- a/src/clients/storage/StorageClientBase-inl.h +++ b/src/clients/storage/StorageClientBase-inl.h @@ -357,7 +357,6 @@ StorageClientBase::getHostPartsWithCursor(GraphSpaceID spaceId) cons // TODO support cursor cpp2::ScanCursor c; - c.set_has_next(false); auto parts = status.value(); for (auto partId = 1; partId <= parts; partId++) { auto leader = getLeader(spaceId, partId); diff --git a/src/interface/storage.thrift b/src/interface/storage.thrift index 07c56e2b2fa..405114afc12 100644 --- a/src/interface/storage.thrift +++ b/src/interface/storage.thrift @@ -561,9 +561,8 @@ struct LookupAndTraverseRequest { */ struct ScanCursor { - 3: bool has_next, // next start key of scan, only valid when has_next is true - 4: optional binary next_cursor, + 1: optional binary next_cursor, } struct ScanVertexRequest { diff --git a/src/storage/exec/ScanNode.h b/src/storage/exec/ScanNode.h index 22425dcb90b..6602752b5c2 100644 --- a/src/storage/exec/ScanNode.h +++ b/src/storage/exec/ScanNode.h @@ -92,10 +92,7 @@ class ScanVertexPropNode : public QueryNode { cpp2::ScanCursor c; if (iter->valid()) { - c.set_has_next(true); c.set_next_cursor(iter->key().str()); - } else { - c.set_has_next(false); } cursors_->emplace(partId, std::move(c)); return nebula::cpp2::ErrorCode::SUCCEEDED; @@ -246,10 +243,7 @@ class ScanEdgePropNode : public QueryNode { cpp2::ScanCursor c; if (iter->valid()) { - c.set_has_next(true); c.set_next_cursor(iter->key().str()); - } else { - c.set_has_next(false); } cursors_->emplace(partId, std::move(c)); return nebula::cpp2::ErrorCode::SUCCEEDED; diff --git a/src/storage/query/ScanEdgeProcessor.cpp b/src/storage/query/ScanEdgeProcessor.cpp index 29b9d3cce33..7a8e99884ad 100644 --- a/src/storage/query/ScanEdgeProcessor.cpp +++ b/src/storage/query/ScanEdgeProcessor.cpp @@ -136,7 +136,8 @@ void ScanEdgeProcessor::runInSingleThread(const cpp2::ScanEdgeRequest& req) { auto partId = partEntry.first; auto cursor = partEntry.second; - auto ret = plan.go(partId, cursor.get_has_next() ? *cursor.get_next_cursor() : ""); + auto ret = plan.go( + partId, cursor.next_cursor_ref().has_value() ? cursor.next_cursor_ref().value() : ""); if (ret != nebula::cpp2::ErrorCode::SUCCEEDED && failedParts.find(partId) == failedParts.end()) { failedParts.emplace(partId); @@ -158,12 +159,13 @@ void ScanEdgeProcessor::runInMultipleThread(const cpp2::ScanEdgeRequest& req) { size_t i = 0; std::vector>> futures; for (const auto& [partId, cursor] : req.get_parts()) { - futures.emplace_back(runInExecutor(&contexts_[i], - &results_[i], - &cursorsOfPart_[i], - partId, - cursor.get_has_next() ? *cursor.get_next_cursor() : "", - &expCtxs_[i])); + futures.emplace_back( + runInExecutor(&contexts_[i], + &results_[i], + &cursorsOfPart_[i], + partId, + cursor.next_cursor_ref().has_value() ? cursor.next_cursor_ref().value() : "", + &expCtxs_[i])); i++; } diff --git a/src/storage/query/ScanVertexProcessor.cpp b/src/storage/query/ScanVertexProcessor.cpp index c8624a0d490..6c210f3c0dd 100644 --- a/src/storage/query/ScanVertexProcessor.cpp +++ b/src/storage/query/ScanVertexProcessor.cpp @@ -140,7 +140,8 @@ void ScanVertexProcessor::runInSingleThread(const cpp2::ScanVertexRequest& req) auto partId = partEntry.first; auto cursor = partEntry.second; - auto ret = plan.go(partId, cursor.get_has_next() ? *cursor.get_next_cursor() : ""); + auto ret = plan.go( + partId, cursor.next_cursor_ref().has_value() ? cursor.next_cursor_ref().value() : ""); if (ret != nebula::cpp2::ErrorCode::SUCCEEDED && failedParts.find(partId) == failedParts.end()) { failedParts.emplace(partId); @@ -162,12 +163,13 @@ void ScanVertexProcessor::runInMultipleThread(const cpp2::ScanVertexRequest& req size_t i = 0; std::vector>> futures; for (const auto& [partId, cursor] : req.get_parts()) { - futures.emplace_back(runInExecutor(&contexts_[i], - &results_[i], - &cursorsOfPart_[i], - partId, - cursor.get_has_next() ? *cursor.get_next_cursor() : "", - &expCtxs_[i])); + futures.emplace_back( + runInExecutor(&contexts_[i], + &results_[i], + &cursorsOfPart_[i], + partId, + cursor.next_cursor_ref().has_value() ? cursor.next_cursor_ref().value() : "", + &expCtxs_[i])); i++; } diff --git a/src/storage/test/ScanEdgeTest.cpp b/src/storage/test/ScanEdgeTest.cpp index 320361726e9..3b6b30cd6d0 100644 --- a/src/storage/test/ScanEdgeTest.cpp +++ b/src/storage/test/ScanEdgeTest.cpp @@ -28,8 +28,9 @@ cpp2::ScanEdgeRequest buildRequest( CHECK_EQ(partIds.size(), cursors.size()); std::unordered_map parts; for (std::size_t i = 0; i < partIds.size(); ++i) { - c.set_has_next(!cursors[i].empty()); - c.set_next_cursor(cursors[i]); + if (!cursors[i].empty()) { + c.set_next_cursor(cursors[i]); + } parts.emplace(partIds[i], c); } req.set_parts(std::move(parts)); @@ -168,7 +169,7 @@ TEST(ScanEdgeTest, CursorTest) { ASSERT_EQ(0, resp.result.failed_parts.size()); checkResponse(*resp.props_ref(), edge, edge.second.size(), totalRowCount); - hasNext = resp.get_cursors().at(partId).get_has_next(); + hasNext = resp.get_cursors().at(partId).next_cursor_ref().has_value(); if (hasNext) { CHECK(resp.get_cursors().at(partId).next_cursor_ref().has_value()); cursor = *resp.get_cursors().at(partId).next_cursor_ref(); @@ -195,7 +196,7 @@ TEST(ScanEdgeTest, CursorTest) { ASSERT_EQ(0, resp.result.failed_parts.size()); checkResponse(*resp.props_ref(), edge, edge.second.size(), totalRowCount); - hasNext = resp.get_cursors().at(partId).get_has_next(); + hasNext = resp.get_cursors().at(partId).next_cursor_ref().has_value(); if (hasNext) { CHECK(resp.get_cursors().at(partId).next_cursor_ref().has_value()); cursor = *resp.get_cursors().at(partId).next_cursor_ref(); diff --git a/src/storage/test/ScanVertexTest.cpp b/src/storage/test/ScanVertexTest.cpp index 09653b7d1bf..6f232ef25df 100644 --- a/src/storage/test/ScanVertexTest.cpp +++ b/src/storage/test/ScanVertexTest.cpp @@ -28,8 +28,9 @@ cpp2::ScanVertexRequest buildRequest( CHECK_EQ(partIds.size(), cursors.size()); std::unordered_map parts; for (std::size_t i = 0; i < partIds.size(); ++i) { - c.set_has_next(!cursors[i].empty()); - c.set_next_cursor(cursors[i]); + if (!cursors[i].empty()) { + c.set_next_cursor(cursors[i]); + } parts.emplace(partIds[i], c); } req.set_parts(std::move(parts)); @@ -183,7 +184,7 @@ TEST(ScanVertexTest, CursorTest) { ASSERT_EQ(0, resp.result.failed_parts.size()); checkResponse(*resp.props_ref(), tag, tag.second.size() + 1 /* kVid */, totalRowCount); - hasNext = resp.get_cursors().at(partId).get_has_next(); + hasNext = resp.get_cursors().at(partId).next_cursor_ref().has_value(); if (hasNext) { CHECK(resp.get_cursors().at(partId).next_cursor_ref()); cursor = *resp.get_cursors().at(partId).next_cursor_ref(); @@ -209,7 +210,7 @@ TEST(ScanVertexTest, CursorTest) { ASSERT_EQ(0, resp.result.failed_parts.size()); checkResponse(*resp.props_ref(), tag, tag.second.size() + 1 /* kVid */, totalRowCount); - hasNext = resp.get_cursors().at(partId).get_has_next(); + hasNext = resp.get_cursors().at(partId).next_cursor_ref().has_value(); if (hasNext) { CHECK(resp.get_cursors().at(partId).next_cursor_ref()); cursor = *resp.get_cursors().at(partId).next_cursor_ref(); From 75f111219e6d82acd87a2623d51a16d06e1b22ac Mon Sep 17 00:00:00 2001 From: Wey Gu <1651790+wey-gu@users.noreply.github.com> Date: Thu, 16 Dec 2021 11:50:14 +0800 Subject: [PATCH 3/5] Update pull_request_template.md (#3478) Removed some Chinese punctuation characters. --- .github/pull_request_template.md | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/.github/pull_request_template.md b/.github/pull_request_template.md index 13e059713cf..db27c4db9f9 100644 --- a/.github/pull_request_template.md +++ b/.github/pull_request_template.md @@ -12,16 +12,16 @@ #### Special notes for your reviewer, ex. impact of this fix, etc: -#### Additional context: +#### Additional context/ Design document: -#### Checklist: -- [ ] Documentation affected (Please add the label if documentation needs to be modified.) -- [ ] Incompatible (If it is incompatible, please describe it and add corresponding label.) -- [ ] Need to cherry-pick (If need to cherry-pick to some branches, please label the destination version(s).) +#### Checklist: +- [ ] Documentation affected (Please add the label if documentation needs to be modified.) +- [ ] Incompatibility (If it breaks the compatibility, please describe it and add the corresponding label.) +- [ ] If it's needed to cherry-pick (If cherry-pick to some branches is required, please label the destination version(s).) - [ ] Performance impacted: Consumes more CPU/Memory +#### Release notes: -#### Release notes: -Please confirm whether to reflect in release notes and how to describe: +Please confirm whether to be reflected in release notes and how to describe: > ` From f14aa0ed01d78e88d2edad8fbda4dd822e24ba49 Mon Sep 17 00:00:00 2001 From: Doodle <13706157+critical27@users.noreply.github.com> Date: Thu, 16 Dec 2021 02:27:05 -0600 Subject: [PATCH 4/5] ignore existed index (#3392) * ignore existed index * remove a info log * fix memory leak --- src/clients/storage/StorageClient.cpp | 8 +- src/clients/storage/StorageClient.h | 6 +- src/clients/storage/StorageClientBase-inl.h | 1 - src/graph/executor/mutate/InsertExecutor.cpp | 12 ++- src/graph/planner/plan/Mutate.h | 37 ++++++-- src/graph/validator/MutateValidator.cpp | 12 ++- src/graph/validator/MutateValidator.h | 2 + src/interface/storage.thrift | 7 +- src/parser/MutateSentences.h | 18 +++- src/parser/parser.yy | 26 +++--- src/parser/scanner.lex | 3 +- src/parser/test/ParserTest.cpp | 28 +++++++ src/storage/mutate/AddEdgesProcessor.cpp | 31 ++++--- src/storage/mutate/AddEdgesProcessor.h | 1 + src/storage/mutate/AddVerticesProcessor.cpp | 27 +++--- src/storage/mutate/AddVerticesProcessor.h | 1 + src/tools/storage-perf/StoragePerfTool.cpp | 4 +- .../tck/features/insert/Insert.IntVid.feature | 84 +++++++++++++++++++ tests/tck/features/insert/Insert.feature | 84 +++++++++++++++++++ 19 files changed, 335 insertions(+), 57 deletions(-) diff --git a/src/clients/storage/StorageClient.cpp b/src/clients/storage/StorageClient.cpp index 5cc70ecde01..eca714f6977 100644 --- a/src/clients/storage/StorageClient.cpp +++ b/src/clients/storage/StorageClient.cpp @@ -112,7 +112,8 @@ StorageRpcRespFuture StorageClient::addVertices( const CommonRequestParam& param, std::vector vertices, std::unordered_map> propNames, - bool ifNotExists) { + bool ifNotExists, + bool ignoreExistedIndex) { auto cbStatus = getIdFromNewVertex(param.space); if (!cbStatus.ok()) { return folly::makeFuture>( @@ -133,6 +134,7 @@ StorageRpcRespFuture StorageClient::addVertices( auto& req = requests[host]; req.set_space_id(param.space); req.set_if_not_exists(ifNotExists); + req.set_ignore_existed_index(ignoreExistedIndex); req.set_parts(std::move(c.second)); req.set_prop_names(propNames); req.set_common(common); @@ -149,7 +151,8 @@ StorageRpcRespFuture StorageClient::addVertices( StorageRpcRespFuture StorageClient::addEdges(const CommonRequestParam& param, std::vector edges, std::vector propNames, - bool ifNotExists) { + bool ifNotExists, + bool ignoreExistedIndex) { auto cbStatus = getIdFromNewEdge(param.space); if (!cbStatus.ok()) { return folly::makeFuture>( @@ -170,6 +173,7 @@ StorageRpcRespFuture StorageClient::addEdges(const CommonReq auto& req = requests[host]; req.set_space_id(param.space); req.set_if_not_exists(ifNotExists); + req.set_ignore_existed_index(ignoreExistedIndex); req.set_parts(std::move(c.second)); req.set_prop_names(propNames); req.set_common(common); diff --git a/src/clients/storage/StorageClient.h b/src/clients/storage/StorageClient.h index 49c333fe715..ae2fd3d4079 100644 --- a/src/clients/storage/StorageClient.h +++ b/src/clients/storage/StorageClient.h @@ -81,12 +81,14 @@ class StorageClient : public StorageClientBase vertices, std::unordered_map> propNames, - bool ifNotExists); + bool ifNotExists, + bool ignoreExistedIndex); StorageRpcRespFuture addEdges(const CommonRequestParam& param, std::vector edges, std::vector propNames, - bool ifNotExists); + bool ifNotExists, + bool ignoreExistedIndex); StorageRpcRespFuture deleteEdges(const CommonRequestParam& param, std::vector edges); diff --git a/src/clients/storage/StorageClientBase-inl.h b/src/clients/storage/StorageClientBase-inl.h index dd319fcc9f6..a402bc71a88 100644 --- a/src/clients/storage/StorageClientBase-inl.h +++ b/src/clients/storage/StorageClientBase-inl.h @@ -242,7 +242,6 @@ void StorageClientBase::getResponseImpl( auto client = clientsMan_->client(host, evb, false, FLAGS_storage_client_timeout_ms); auto spaceId = request.second.get_space_id(); auto partsId = getReqPartsId(request.second); - LOG(INFO) << "Send request to storage " << host; remoteFunc(client.get(), request.second) .via(evb) .thenValue([spaceId, pro, this](Response&& resp) mutable { diff --git a/src/graph/executor/mutate/InsertExecutor.cpp b/src/graph/executor/mutate/InsertExecutor.cpp index bb8806ade8b..c3511332808 100644 --- a/src/graph/executor/mutate/InsertExecutor.cpp +++ b/src/graph/executor/mutate/InsertExecutor.cpp @@ -27,7 +27,11 @@ folly::Future InsertVerticesExecutor::insertVertices() { ivNode->getSpace(), qctx()->rctx()->session()->id(), plan->id(), plan->isProfileEnabled()); return qctx() ->getStorageClient() - ->addVertices(param, ivNode->getVertices(), ivNode->getPropNames(), ivNode->getIfNotExists()) + ->addVertices(param, + ivNode->getVertices(), + ivNode->getPropNames(), + ivNode->getIfNotExists(), + ivNode->getIgnoreExistedIndex()) .via(runner()) .ensure([addVertTime]() { VLOG(1) << "Add vertices time: " << addVertTime.elapsedInUSec() << "us"; @@ -52,7 +56,11 @@ folly::Future InsertEdgesExecutor::insertEdges() { param.useExperimentalFeature = FLAGS_enable_experimental_feature; return qctx() ->getStorageClient() - ->addEdges(param, ieNode->getEdges(), ieNode->getPropNames(), ieNode->getIfNotExists()) + ->addEdges(param, + ieNode->getEdges(), + ieNode->getPropNames(), + ieNode->getIfNotExists(), + ieNode->getIgnoreExistedIndex()) .via(runner()) .ensure( [addEdgeTime]() { VLOG(1) << "Add edge time: " << addEdgeTime.elapsedInUSec() << "us"; }) diff --git a/src/graph/planner/plan/Mutate.h b/src/graph/planner/plan/Mutate.h index e144539f7ab..b845299f246 100644 --- a/src/graph/planner/plan/Mutate.h +++ b/src/graph/planner/plan/Mutate.h @@ -22,9 +22,15 @@ class InsertVertices final : public SingleDependencyNode { GraphSpaceID spaceId, std::vector vertices, std::unordered_map> tagPropNames, - bool ifNotExists) { - return qctx->objPool()->add(new InsertVertices( - qctx, input, spaceId, std::move(vertices), std::move(tagPropNames), ifNotExists)); + bool ifNotExists, + bool ignoreExistedIndex) { + return qctx->objPool()->add(new InsertVertices(qctx, + input, + spaceId, + std::move(vertices), + std::move(tagPropNames), + ifNotExists, + ignoreExistedIndex)); } std::unique_ptr explain() const override; @@ -39,24 +45,29 @@ class InsertVertices final : public SingleDependencyNode { bool getIfNotExists() const { return ifNotExists_; } + bool getIgnoreExistedIndex() const { return ignoreExistedIndex_; } + private: InsertVertices(QueryContext* qctx, PlanNode* input, GraphSpaceID spaceId, std::vector vertices, std::unordered_map> tagPropNames, - bool ifNotExists) + bool ifNotExists, + bool ignoreExistedIndex) : SingleDependencyNode(qctx, Kind::kInsertVertices, input), spaceId_(spaceId), vertices_(std::move(vertices)), tagPropNames_(std::move(tagPropNames)), - ifNotExists_(ifNotExists) {} + ifNotExists_(ifNotExists), + ignoreExistedIndex_(ignoreExistedIndex) {} private: GraphSpaceID spaceId_{-1}; std::vector vertices_; std::unordered_map> tagPropNames_; bool ifNotExists_{false}; + bool ignoreExistedIndex_{false}; }; class InsertEdges final : public SingleDependencyNode { @@ -67,9 +78,16 @@ class InsertEdges final : public SingleDependencyNode { std::vector edges, std::vector propNames, bool ifNotExists, + bool ignoreExistedIndex, bool useChainInsert = false) { - return qctx->objPool()->add(new InsertEdges( - qctx, input, spaceId, std::move(edges), std::move(propNames), ifNotExists, useChainInsert)); + return qctx->objPool()->add(new InsertEdges(qctx, + input, + spaceId, + std::move(edges), + std::move(propNames), + ifNotExists, + ignoreExistedIndex, + useChainInsert)); } std::unique_ptr explain() const override; @@ -80,6 +98,8 @@ class InsertEdges final : public SingleDependencyNode { bool getIfNotExists() const { return ifNotExists_; } + bool getIgnoreExistedIndex() const { return ignoreExistedIndex_; } + GraphSpaceID getSpace() const { return spaceId_; } bool useChainInsert() const { return useChainInsert_; } @@ -91,12 +111,14 @@ class InsertEdges final : public SingleDependencyNode { std::vector edges, std::vector propNames, bool ifNotExists, + bool ignoreExistedIndex, bool useChainInsert) : SingleDependencyNode(qctx, Kind::kInsertEdges, input), spaceId_(spaceId), edges_(std::move(edges)), propNames_(std::move(propNames)), ifNotExists_(ifNotExists), + ignoreExistedIndex_(ignoreExistedIndex), useChainInsert_(useChainInsert) {} private: @@ -104,6 +126,7 @@ class InsertEdges final : public SingleDependencyNode { std::vector edges_; std::vector propNames_; bool ifNotExists_{false}; + bool ignoreExistedIndex_{false}; // if this enabled, add edge request will only sent to // outbound edges. (toss) bool useChainInsert_{false}; diff --git a/src/graph/validator/MutateValidator.cpp b/src/graph/validator/MutateValidator.cpp index bc4d2aad968..f4ea352b20f 100644 --- a/src/graph/validator/MutateValidator.cpp +++ b/src/graph/validator/MutateValidator.cpp @@ -22,8 +22,13 @@ Status InsertVerticesValidator::validateImpl() { } Status InsertVerticesValidator::toPlan() { - auto doNode = InsertVertices::make( - qctx_, nullptr, spaceId_, std::move(vertices_), std::move(tagPropNames_), ifNotExists_); + auto doNode = InsertVertices::make(qctx_, + nullptr, + spaceId_, + std::move(vertices_), + std::move(tagPropNames_), + ifNotExists_, + ignoreExistedIndex_); root_ = doNode; tail_ = root_; return Status::OK(); @@ -32,6 +37,7 @@ Status InsertVerticesValidator::toPlan() { Status InsertVerticesValidator::check() { auto sentence = static_cast(sentence_); ifNotExists_ = sentence->isIfNotExists(); + ignoreExistedIndex_ = sentence->ignoreExistedIndex(); rows_ = sentence->rows(); if (rows_.empty()) { return Status::SemanticError("VALUES cannot be empty"); @@ -150,6 +156,7 @@ Status InsertEdgesValidator::toPlan() { std::move(edges_), std::move(entirePropNames_), ifNotExists_, + ignoreExistedIndex_, useChainInsert); root_ = doNode; tail_ = root_; @@ -159,6 +166,7 @@ Status InsertEdgesValidator::toPlan() { Status InsertEdgesValidator::check() { auto sentence = static_cast(sentence_); ifNotExists_ = sentence->isIfNotExists(); + ignoreExistedIndex_ = sentence->ignoreExistedIndex(); auto edgeStatus = qctx_->schemaMng()->toEdgeType(spaceId_, *sentence->edge()); NG_RETURN_IF_ERROR(edgeStatus); edgeType_ = edgeStatus.value(); diff --git a/src/graph/validator/MutateValidator.h b/src/graph/validator/MutateValidator.h index a6afd084825..13e52f6426a 100644 --- a/src/graph/validator/MutateValidator.h +++ b/src/graph/validator/MutateValidator.h @@ -35,6 +35,7 @@ class InsertVerticesValidator final : public Validator { std::vector> schemas_; uint16_t propSize_{0}; bool ifNotExists_{false}; + bool ignoreExistedIndex_{false}; std::vector vertices_; }; @@ -54,6 +55,7 @@ class InsertEdgesValidator final : public Validator { private: GraphSpaceID spaceId_{-1}; bool ifNotExists_{false}; + bool ignoreExistedIndex_{false}; EdgeType edgeType_{-1}; std::shared_ptr schema_; std::vector propNames_; diff --git a/src/interface/storage.thrift b/src/interface/storage.thrift index 405114afc12..1cf3b675bd4 100644 --- a/src/interface/storage.thrift +++ b/src/interface/storage.thrift @@ -343,7 +343,8 @@ struct AddVerticesRequest { (cpp.template = "std::unordered_map") prop_names, // if true, when (vertexID,tagID) already exists, do nothing 4: bool if_not_exists, - 5: optional RequestCommon common, + 5: bool ignore_existed_index = false, + 6: optional RequestCommon common, } struct AddEdgesRequest { @@ -356,7 +357,9 @@ struct AddEdgesRequest { 3: list prop_names, // if true, when edge already exists, do nothing 4: bool if_not_exists, - 5: optional RequestCommon common, + // If true, existed index won't be removed + 5: bool ignore_existed_index = false, + 6: optional RequestCommon common, } /* diff --git a/src/parser/MutateSentences.h b/src/parser/MutateSentences.h index 094e3ea97bd..58758a65b44 100644 --- a/src/parser/MutateSentences.h +++ b/src/parser/MutateSentences.h @@ -136,10 +136,14 @@ class VertexRowList final { class InsertVerticesSentence final : public Sentence { public: - InsertVerticesSentence(VertexTagList *tagList, VertexRowList *rows, bool ifNotExists) { + InsertVerticesSentence(VertexTagList *tagList, + VertexRowList *rows, + bool ifNotExists, + bool ignoreExistedIndex) { tagList_.reset(tagList); rows_.reset(rows); ifNotExists_ = ifNotExists; + ignoreExistedIndex_ = ignoreExistedIndex; kind_ = Kind::kInsertVertices; } @@ -151,8 +155,11 @@ class InsertVerticesSentence final : public Sentence { bool isIfNotExists() const { return ifNotExists_; } + bool ignoreExistedIndex() const { return ignoreExistedIndex_; } + private: bool ifNotExists_{false}; + bool ignoreExistedIndex_{false}; std::unique_ptr tagList_; std::unique_ptr rows_; }; @@ -209,11 +216,15 @@ class EdgeRowList final { class InsertEdgesSentence final : public Sentence { public: - explicit InsertEdgesSentence(std::string *edge, EdgeRowList *rows, bool ifNotExists) + explicit InsertEdgesSentence(std::string *edge, + EdgeRowList *rows, + bool ifNotExists, + bool ignoreExistedIndex) : Sentence(Kind::kInsertEdges) { edge_.reset(edge); rows_.reset(rows); ifNotExists_ = ifNotExists; + ignoreExistedIndex_ = ignoreExistedIndex; } const std::string *edge() const { return edge_.get(); } @@ -231,6 +242,8 @@ class InsertEdgesSentence final : public Sentence { bool isIfNotExists() const { return ifNotExists_; } + bool ignoreExistedIndex() const { return ignoreExistedIndex_; } + void setDefaultPropNames() { isDefaultPropNames_ = true; } bool isDefaultPropNames() const { return isDefaultPropNames_; } @@ -240,6 +253,7 @@ class InsertEdgesSentence final : public Sentence { private: bool isDefaultPropNames_{false}; bool ifNotExists_{false}; + bool ignoreExistedIndex_{false}; std::unique_ptr edge_; std::unique_ptr properties_; std::unique_ptr rows_; diff --git a/src/parser/parser.yy b/src/parser/parser.yy index 792039aaf72..2f092a03069 100644 --- a/src/parser/parser.yy +++ b/src/parser/parser.yy @@ -163,7 +163,7 @@ static constexpr size_t kCommentLengthLimit = 256; %token KW_BOOL KW_INT8 KW_INT16 KW_INT32 KW_INT64 KW_INT KW_FLOAT KW_DOUBLE %token KW_STRING KW_FIXED_STRING KW_TIMESTAMP KW_DATE KW_TIME KW_DATETIME %token KW_GO KW_AS KW_TO KW_USE KW_SET KW_FROM KW_WHERE KW_ALTER -%token KW_MATCH KW_INSERT KW_VALUE KW_VALUES KW_YIELD KW_RETURN KW_CREATE KW_VERTEX KW_VERTICES +%token KW_MATCH KW_INSERT KW_VALUE KW_VALUES KW_YIELD KW_RETURN KW_CREATE KW_VERTEX KW_VERTICES KW_IGNORE_EXISTED_INDEX %token KW_EDGE KW_EDGES KW_STEPS KW_OVER KW_UPTO KW_REVERSELY KW_SPACE KW_DELETE KW_FIND %token KW_TAG KW_TAGS KW_UNION KW_INTERSECT KW_MINUS %token KW_NO KW_OVERWRITE KW_IN KW_DESCRIBE KW_DESC KW_SHOW KW_HOST KW_HOSTS KW_PART KW_PARTS KW_ADD @@ -391,6 +391,7 @@ static constexpr size_t kCommentLengthLimit = 256; %type opt_if_not_exists %type opt_if_exists %type opt_with_properties +%type opt_ignore_existed_index %left QM COLON %left KW_OR KW_XOR @@ -2274,6 +2275,11 @@ opt_create_schema_prop_list } ; +opt_ignore_existed_index + : %empty { $$=false; } + | KW_IGNORE_EXISTED_INDEX { $$=true; } + ; + create_schema_prop_list : create_schema_prop_item { $$ = new SchemaPropList(); @@ -2782,8 +2788,8 @@ assignment_sentence ; insert_vertex_sentence - : KW_INSERT KW_VERTEX opt_if_not_exists vertex_tag_list KW_VALUES vertex_row_list { - $$ = new InsertVerticesSentence($4, $6, $3); + : KW_INSERT KW_VERTEX opt_if_not_exists opt_ignore_existed_index vertex_tag_list KW_VALUES vertex_row_list { + $$ = new InsertVerticesSentence($5, $7, $3, $4); } ; @@ -2860,19 +2866,19 @@ value_list ; insert_edge_sentence - : KW_INSERT KW_EDGE opt_if_not_exists name_label KW_VALUES edge_row_list { - auto sentence = new InsertEdgesSentence($4, $6, $3); + : KW_INSERT KW_EDGE opt_if_not_exists opt_ignore_existed_index name_label KW_VALUES edge_row_list { + auto sentence = new InsertEdgesSentence($5, $7, $3, $4); sentence->setDefaultPropNames(); $$ = sentence; } - | KW_INSERT KW_EDGE opt_if_not_exists name_label L_PAREN R_PAREN KW_VALUES edge_row_list { - auto sentence = new InsertEdgesSentence($4, $8, $3); + | KW_INSERT KW_EDGE opt_if_not_exists opt_ignore_existed_index name_label L_PAREN R_PAREN KW_VALUES edge_row_list { + auto sentence = new InsertEdgesSentence($5, $9, $3, $4); sentence->setProps(new PropertyList()); $$ = sentence; } - | KW_INSERT KW_EDGE opt_if_not_exists name_label L_PAREN prop_list R_PAREN KW_VALUES edge_row_list { - auto sentence = new InsertEdgesSentence($4, $9, $3); - sentence->setProps($6); + | KW_INSERT KW_EDGE opt_if_not_exists opt_ignore_existed_index name_label L_PAREN prop_list R_PAREN KW_VALUES edge_row_list { + auto sentence = new InsertEdgesSentence($5, $10, $3, $4); + sentence->setProps($7); $$ = sentence; } ; diff --git a/src/parser/scanner.lex b/src/parser/scanner.lex index 60563287143..1b023c16297 100644 --- a/src/parser/scanner.lex +++ b/src/parser/scanner.lex @@ -128,6 +128,7 @@ LABEL_FULL_WIDTH {CN_EN_FULL_WIDTH}{CN_EN_NUM_FULL_WIDTH}* "IF" { return TokenType::KW_IF; } "NOT" { return TokenType::KW_NOT; } "EXISTS" { return TokenType::KW_EXISTS; } +"IGNORE_EXISTED_INDEX" { return TokenType::KW_IGNORE_EXISTED_INDEX; } "WITH" { return TokenType::KW_WITH; } "CHANGE" { return TokenType::KW_CHANGE; } "GRANT" { return TokenType::KW_GRANT; } @@ -250,7 +251,7 @@ LABEL_FULL_WIDTH {CN_EN_FULL_WIDTH}{CN_EN_NUM_FULL_WIDTH}* "LISTENER" { return TokenType::KW_LISTENER; } "ELASTICSEARCH" { return TokenType::KW_ELASTICSEARCH; } "HTTP" { return TokenType::KW_HTTP; } -"HTTPS" { return TokenType::KW_HTTPS; } +"HTTPS" { return TokenType::KW_HTTPS; } "FULLTEXT" { return TokenType::KW_FULLTEXT; } "AUTO" { return TokenType::KW_AUTO; } "FUZZY" { return TokenType::KW_FUZZY; } diff --git a/src/parser/test/ParserTest.cpp b/src/parser/test/ParserTest.cpp index 9355c818263..21908cd2aec 100644 --- a/src/parser/test/ParserTest.cpp +++ b/src/parser/test/ParserTest.cpp @@ -833,6 +833,20 @@ TEST_F(ParserTest, InsertVertex) { auto result = parse(query); ASSERT_TRUE(result.ok()) << result.status(); } + { + std::string query = + "INSERT VERTEX IGNORE_EXISTED_INDEX person(name, age) " + "VALUES \"Tom\":(\"Tom\", 30)"; + auto result = parse(query); + ASSERT_TRUE(result.ok()) << result.status(); + } + { + std::string query = + "INSERT VERTEX IF NOT EXISTS IGNORE_EXISTED_INDEX person(name, age) " + "VALUES \"Tom\":(\"Tom\", 30)"; + auto result = parse(query); + ASSERT_TRUE(result.ok()) << result.status(); + } // Test insert empty value { std::string query = @@ -1034,6 +1048,20 @@ TEST_F(ParserTest, InsertEdge) { auto result = parse(query); ASSERT_TRUE(result.ok()) << result.status(); } + { + std::string query = + "INSERT EDGE IGNORE_EXISTED_INDEX transfer(amount, time_) " + "VALUES \"12345\"->\"54321@1537408527\":(3.75, 1537408527)"; + auto result = parse(query); + ASSERT_TRUE(result.ok()) << result.status(); + } + { + std::string query = + "INSERT EDGE IF NOT EXISTS IGNORE_EXISTED_INDEX transfer(amount, time_) " + "VALUES \"12345\"->\"54321@1537408527\":(3.75, 1537408527)"; + auto result = parse(query); + ASSERT_TRUE(result.ok()) << result.status(); + } } TEST_F(ParserTest, UpdateEdge) { diff --git a/src/storage/mutate/AddEdgesProcessor.cpp b/src/storage/mutate/AddEdgesProcessor.cpp index 0db34b3cfd5..7743309b1a4 100644 --- a/src/storage/mutate/AddEdgesProcessor.cpp +++ b/src/storage/mutate/AddEdgesProcessor.cpp @@ -48,6 +48,7 @@ void AddEdgesProcessor::process(const cpp2::AddEdgesRequest& req) { return; } indexes_ = std::move(iRet).value(); + ignoreExistedIndex_ = req.get_ignore_existed_index(); CHECK_NOTNULL(env_->kvstore_); @@ -218,21 +219,25 @@ void AddEdgesProcessor::doProcessWithIndex(const cpp2::AddEdgesRequest& req) { break; } if (*edgeKey.edge_type_ref() > 0) { + std::string oldVal; RowReaderWrapper nReader; RowReaderWrapper oReader; - auto obsIdx = findOldValue(partId, key); - if (nebula::ok(obsIdx)) { - // already exists in kvstore - if (ifNotExists_ && !nebula::value(obsIdx).empty()) { - continue; - } - if (!nebula::value(obsIdx).empty()) { - oReader = RowReaderWrapper::getEdgePropReader( - env_->schemaMan_, spaceId_, *edgeKey.edge_type_ref(), nebula::value(obsIdx)); + if (!ignoreExistedIndex_) { + auto obsIdx = findOldValue(partId, key); + if (nebula::ok(obsIdx)) { + // already exists in kvstore + if (ifNotExists_ && !nebula::value(obsIdx).empty()) { + continue; + } + if (!nebula::value(obsIdx).empty()) { + oldVal = std::move(value(obsIdx)); + oReader = RowReaderWrapper::getEdgePropReader( + env_->schemaMan_, spaceId_, *edgeKey.edge_type_ref(), oldVal); + } + } else { + code = nebula::error(obsIdx); + break; } - } else { - code = nebula::error(obsIdx); - break; } if (!retEnc.value().empty()) { nReader = RowReaderWrapper::getEdgePropReader( @@ -358,7 +363,7 @@ ErrorOr AddEdgesProcessor::addEdges( /* * step 1 , Delete old version index if exists. */ - if (val.empty()) { + if (!ignoreExistedIndex_ && val.empty()) { auto obsIdx = findOldValue(partId, e.first); if (!nebula::ok(obsIdx)) { return nebula::error(obsIdx); diff --git a/src/storage/mutate/AddEdgesProcessor.h b/src/storage/mutate/AddEdgesProcessor.h index dcb44d2a59b..d34f5718218 100644 --- a/src/storage/mutate/AddEdgesProcessor.h +++ b/src/storage/mutate/AddEdgesProcessor.h @@ -51,6 +51,7 @@ class AddEdgesProcessor : public BaseProcessor { GraphSpaceID spaceId_; std::vector> indexes_; bool ifNotExists_{false}; + bool ignoreExistedIndex_{false}; /// this is a hook function to keep out-edge and in-edge consist using ConsistOper = std::function*)>; diff --git a/src/storage/mutate/AddVerticesProcessor.cpp b/src/storage/mutate/AddVerticesProcessor.cpp index 055c4309538..e6dcc9fa71a 100644 --- a/src/storage/mutate/AddVerticesProcessor.cpp +++ b/src/storage/mutate/AddVerticesProcessor.cpp @@ -47,6 +47,7 @@ void AddVerticesProcessor::process(const cpp2::AddVerticesRequest& req) { return; } indexes_ = std::move(iRet).value(); + ignoreExistedIndex_ = req.get_ignore_existed_index(); CHECK_NOTNULL(env_->kvstore_); if (indexes_.empty()) { @@ -189,18 +190,22 @@ void AddVerticesProcessor::doProcessWithIndex(const cpp2::AddVerticesRequest& re RowReaderWrapper nReader; RowReaderWrapper oReader; - auto obsIdx = findOldValue(partId, vid, tagId); - if (nebula::ok(obsIdx)) { - if (ifNotExists_ && !nebula::value(obsIdx).empty()) { - continue; - } - if (!nebula::value(obsIdx).empty()) { - oReader = RowReaderWrapper::getTagPropReader( - env_->schemaMan_, spaceId_, tagId, nebula::value(obsIdx)); + std::string oldVal; + if (!ignoreExistedIndex_) { + auto obsIdx = findOldValue(partId, vid, tagId); + if (nebula::ok(obsIdx)) { + if (ifNotExists_ && !nebula::value(obsIdx).empty()) { + continue; + } + if (!nebula::value(obsIdx).empty()) { + oldVal = std::move(value(obsIdx)); + oReader = + RowReaderWrapper::getTagPropReader(env_->schemaMan_, spaceId_, tagId, oldVal); + } + } else { + code = nebula::error(obsIdx); + break; } - } else { - code = nebula::error(obsIdx); - break; } WriteResult wRet; diff --git a/src/storage/mutate/AddVerticesProcessor.h b/src/storage/mutate/AddVerticesProcessor.h index 01047551433..ea8e34b56e4 100644 --- a/src/storage/mutate/AddVerticesProcessor.h +++ b/src/storage/mutate/AddVerticesProcessor.h @@ -47,6 +47,7 @@ class AddVerticesProcessor : public BaseProcessor { GraphSpaceID spaceId_; std::vector> indexes_; bool ifNotExists_{false}; + bool ignoreExistedIndex_{false}; }; } // namespace storage diff --git a/src/tools/storage-perf/StoragePerfTool.cpp b/src/tools/storage-perf/StoragePerfTool.cpp index dbb2c9ce7ef..8fb47a9dac9 100644 --- a/src/tools/storage-perf/StoragePerfTool.cpp +++ b/src/tools/storage-perf/StoragePerfTool.cpp @@ -333,7 +333,7 @@ class Perf { for (auto i = 0; i < tokens; i++) { auto start = time::WallClock::fastNowInMicroSec(); StorageClient::CommonRequestParam param(spaceId_, 0, 0); - storageClient_->addVertices(param, genVertices(), tagProps_, true) + storageClient_->addVertices(param, genVertices(), tagProps_, true, false) .via(evb) .thenValue([this, start](auto&& resps) { if (!resps.succeeded()) { @@ -362,7 +362,7 @@ class Perf { for (auto i = 0; i < tokens; i++) { auto start = time::WallClock::fastNowInMicroSec(); StorageClient::CommonRequestParam param(spaceId_, 0, 0); - storageClient_->addEdges(param, genEdges(), edgeProps_, true) + storageClient_->addEdges(param, genEdges(), edgeProps_, true, false) .via(evb) .thenValue([this, start](auto&& resps) { if (!resps.succeeded()) { diff --git a/tests/tck/features/insert/Insert.IntVid.feature b/tests/tck/features/insert/Insert.IntVid.feature index 1ee3326fa2e..46b61a93890 100644 --- a/tests/tck/features/insert/Insert.IntVid.feature +++ b/tests/tck/features/insert/Insert.IntVid.feature @@ -502,3 +502,87 @@ Feature: Insert int vid of vertex and edge # |$$.person.name| $$.person.age| schoolmate.likeness| # |'Mack' | 21 | 3 | Then drop the used space + + Scenario: int id ignore existed index + Given an empty graph + And create a space with following options: + | partition_num | 9 | + | replica_factor | 1 | + | vid_type | int | + And having executed: + """ + CREATE TAG person(id int); + CREATE TAG INDEX id_index ON person(id); + CREATE EDGE like(grade int); + CREATE EDGE INDEX grade_index ON like(grade); + """ + And wait 6 seconds + # test insert vertex + When try to execute query: + """ + INSERT VERTEX person(id) VALUES 100:(1), 200:(1) + """ + Then the execution should be successful + When executing query: + """ + LOOKUP ON person WHERE person.id == 1 YIELD id(vertex) as id + """ + Then the result should be, in any order: + | id | + | 100 | + | 200 | + When try to execute query: + """ + INSERT VERTEX IGNORE_EXISTED_INDEX person(id) VALUES 200:(2) + """ + Then the execution should be successful + When executing query: + """ + LOOKUP ON person WHERE person.id == 1 YIELD id(vertex) as id + """ + Then the result should be, in any order: + | id | + | 100 | + | 200 | + When executing query: + """ + LOOKUP ON person WHERE person.id == 2 YIELD id(vertex) as id + """ + Then the result should be, in any order: + | id | + | 200 | + # test insert edge + When try to execute query: + """ + INSERT EDGE like(grade) VALUES 100 -> 200:(666), 300 -> 400:(666); + """ + Then the execution should be successful + When executing query: + """ + LOOKUP ON like WHERE like.grade == 666 YIELD src(edge) as src, dst(edge) as dst + """ + Then the result should be, in any order: + | src | dst | + | 100 | 200 | + | 300 | 400 | + When try to execute query: + """ + INSERT EDGE IGNORE_EXISTED_INDEX like(grade) VALUES 300 -> 400:(888) + """ + Then the execution should be successful + When executing query: + """ + LOOKUP ON like WHERE like.grade == 666 YIELD src(edge) as src, dst(edge) as dst + """ + Then the result should be, in any order: + | src | dst | + | 100 | 200 | + | 300 | 400 | + When executing query: + """ + LOOKUP ON like WHERE like.grade == 888 YIELD src(edge) as src, dst(edge) as dst + """ + Then the result should be, in any order: + | src | dst | + | 300 | 400 | + Then drop the used space diff --git a/tests/tck/features/insert/Insert.feature b/tests/tck/features/insert/Insert.feature index c30210c8d54..096b96dcae6 100644 --- a/tests/tck/features/insert/Insert.feature +++ b/tests/tck/features/insert/Insert.feature @@ -531,3 +531,87 @@ Feature: Insert string vid of vertex and edge | student.name | student.age | | 'Tom' | 12 | Then drop the used space + + Scenario: string id ignore existed index + Given an empty graph + And create a space with following options: + | partition_num | 9 | + | replica_factor | 1 | + | vid_type | FIXED_STRING(10) | + And having executed: + """ + CREATE TAG person(id int); + CREATE TAG INDEX id_index ON person(id); + CREATE EDGE like(grade int); + CREATE EDGE INDEX grade_index ON like(grade); + """ + And wait 6 seconds + # test insert vertex + When try to execute query: + """ + INSERT VERTEX person(id) VALUES "100":(1), "200":(1) + """ + Then the execution should be successful + When executing query: + """ + LOOKUP ON person WHERE person.id == 1 YIELD id(vertex) as id + """ + Then the result should be, in any order: + | id | + | "100" | + | "200" | + When try to execute query: + """ + INSERT VERTEX IGNORE_EXISTED_INDEX person(id) VALUES "200":(2) + """ + Then the execution should be successful + When executing query: + """ + LOOKUP ON person WHERE person.id == 1 YIELD id(vertex) as id + """ + Then the result should be, in any order: + | id | + | "100" | + | "200" | + When executing query: + """ + LOOKUP ON person WHERE person.id == 2 YIELD id(vertex) as id + """ + Then the result should be, in any order: + | id | + | "200" | + # test insert edge + When try to execute query: + """ + INSERT EDGE like(grade) VALUES "100" -> "200":(666), "300" -> "400":(666); + """ + Then the execution should be successful + When executing query: + """ + LOOKUP ON like WHERE like.grade == 666 YIELD src(edge) as src, dst(edge) as dst + """ + Then the result should be, in any order: + | src | dst | + | "100" | "200" | + | "300" | "400" | + When try to execute query: + """ + INSERT EDGE IGNORE_EXISTED_INDEX like(grade) VALUES "300" -> "400":(888) + """ + Then the execution should be successful + When executing query: + """ + LOOKUP ON like WHERE like.grade == 666 YIELD src(edge) as src, dst(edge) as dst + """ + Then the result should be, in any order: + | src | dst | + | "100" | "200" | + | "300" | "400" | + When executing query: + """ + LOOKUP ON like WHERE like.grade == 888 YIELD src(edge) as src, dst(edge) as dst + """ + Then the result should be, in any order: + | src | dst | + | "300" | "400" | + Then drop the used space From 2beee553da7f2b98b1702f5fd16523c14f16eea9 Mon Sep 17 00:00:00 2001 From: "endy.li" <25311962+heroicNeZha@users.noreply.github.com> Date: Thu, 16 Dec 2021 22:15:46 +0800 Subject: [PATCH 5/5] save client version (#3483) --- src/clients/meta/MetaClient.cpp | 1 + src/interface/meta.thrift | 3 ++- .../processors/admin/VerifyClientVersionProcessor.cpp | 9 +++++---- src/meta/test/VerifyClientVersionTest.cpp | 3 ++- 4 files changed, 10 insertions(+), 6 deletions(-) diff --git a/src/clients/meta/MetaClient.cpp b/src/clients/meta/MetaClient.cpp index 98504ab0782..89d53914aa6 100644 --- a/src/clients/meta/MetaClient.cpp +++ b/src/clients/meta/MetaClient.cpp @@ -3554,6 +3554,7 @@ bool MetaClient::checkIsPlanKilled(SessionID sessionId, ExecutionPlanID planId) Status MetaClient::verifyVersion() { auto req = cpp2::VerifyClientVersionReq(); + req.set_build_version(getOriginVersion()); req.set_host(options_.localHost_); folly::Promise> promise; auto future = promise.getFuture(); diff --git a/src/interface/meta.thrift b/src/interface/meta.thrift index 6b3fe96a08c..353940088b8 100644 --- a/src/interface/meta.thrift +++ b/src/interface/meta.thrift @@ -1123,8 +1123,9 @@ struct VerifyClientVersionResp { struct VerifyClientVersionReq { - 1: required binary version = common.version; + 1: required binary client_version = common.version; 2: common.HostAddr host; + 3: binary build_version; } service MetaService { diff --git a/src/meta/processors/admin/VerifyClientVersionProcessor.cpp b/src/meta/processors/admin/VerifyClientVersionProcessor.cpp index d7328b5b145..84b84141fff 100644 --- a/src/meta/processors/admin/VerifyClientVersionProcessor.cpp +++ b/src/meta/processors/admin/VerifyClientVersionProcessor.cpp @@ -18,16 +18,17 @@ void VerifyClientVersionProcessor::process(const cpp2::VerifyClientVersionReq& r std::unordered_set whiteList; folly::splitTo( ":", FLAGS_client_white_list, std::inserter(whiteList, whiteList.begin())); - if (FLAGS_enable_client_white_list && whiteList.find(req.get_version()) == whiteList.end()) { + if (FLAGS_enable_client_white_list && + whiteList.find(req.get_client_version()) == whiteList.end()) { resp_.set_code(nebula::cpp2::ErrorCode::E_CLIENT_SERVER_INCOMPATIBLE); resp_.set_error_msg(folly::stringPrintf( "Meta client version(%s) is not accepted, current meta client white list: %s.", - req.get_version().c_str(), + req.get_client_version().c_str(), FLAGS_client_white_list.c_str())); } else { - auto host = req.get_host(); + const auto& host = req.get_host(); auto versionKey = MetaKeyUtils::versionKey(host); - auto versionVal = MetaKeyUtils::versionVal(req.get_version().c_str()); + auto versionVal = MetaKeyUtils::versionVal(req.get_build_version().c_str()); std::vector versionData; versionData.emplace_back(std::move(versionKey), std::move(versionVal)); doSyncPut(versionData); diff --git a/src/meta/test/VerifyClientVersionTest.cpp b/src/meta/test/VerifyClientVersionTest.cpp index 9f72f559e00..a72d6757ffa 100644 --- a/src/meta/test/VerifyClientVersionTest.cpp +++ b/src/meta/test/VerifyClientVersionTest.cpp @@ -20,7 +20,8 @@ TEST(VerifyClientVersionTest, VersionTest) { std::unique_ptr kv(MockCluster::initMetaKV(rootPath.path())); { auto req = cpp2::VerifyClientVersionReq(); - req.set_version("1.0.1"); + req.set_client_version("1.0.1"); + req.set_build_version("1.0.1-nightly"); auto* processor = VerifyClientVersionProcessor::instance(kv.get()); auto f = processor->getFuture(); processor->process(req);