From d44da49307b08967b1c200064eaadb031d80ffec Mon Sep 17 00:00:00 2001 From: nebula-bots <88429921+nebula-bots@users.noreply.github.com> Date: Thu, 17 Mar 2022 18:04:24 +0800 Subject: [PATCH] Clear space (#681) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ## What type of PR is this? - [ ] bug - [x] feature - [ ] enhancement ## What problem(s) does this PR solve? #### Issue(s) number: fix #3679 #### Description: ## How do you solve it? https://confluence.nebula-graph.io/pages/viewpage.action?pageId=40535032 ## Special notes for your reviewer, ex. impact of this fix, design document, etc: ## Checklist: Tests: - [x] Unit test(positive and negative cases) - [x] Function test - [ ] Performance test - [ ] N/A Affects: - [x] Documentation affected (Please add the label if documentation needs to be modified.) - [ ] Incompatibility (If it breaks the compatibility, please describe it and add the label.) - [x] If it's needed to cherry-pick (If cherry-pick to some branches is required, please label the destination version(s).) - [ ] Performance impacted: Consumes more CPU/Memory ## Release notes: Please confirm whether to be reflected in release notes and how to describe: > Added a clear space feature, which will clear space data and index data, but keep space schema and index schema. Migrated from vesoft-inc/nebula#3989 Co-authored-by: haifei.zhao <32253291+zhaohaifei@users.noreply.github.com> --- .linters/cpp/checkKeyword.py | 16 ++- src/clients/meta/MetaClient.cpp | 16 +++ src/clients/meta/MetaClient.h | 3 + src/common/plugin/fulltext/FTGraphAdapter.h | 3 + .../fulltext/elasticsearch/ESGraphAdapter.cpp | 41 +++++++ .../fulltext/elasticsearch/ESGraphAdapter.h | 14 +++ .../fulltext/test/FulltextPluginTest.cpp | 11 ++ src/graph/executor/Executor.cpp | 3 + src/graph/executor/admin/SpaceExecutor.cpp | 44 +++++++ src/graph/executor/admin/SpaceExecutor.h | 8 ++ src/graph/planner/plan/Admin.cpp | 7 ++ src/graph/planner/plan/Admin.h | 31 +++++ src/graph/planner/plan/PlanNode.cpp | 2 + src/graph/planner/plan/PlanNode.h | 1 + src/graph/service/PermissionCheck.cpp | 5 +- src/graph/util/FTIndexUtils.cpp | 14 +++ src/graph/util/FTIndexUtils.h | 4 + src/graph/validator/AdminValidator.cpp | 12 ++ src/graph/validator/AdminValidator.h | 12 ++ src/graph/validator/AuditCategory.cpp | 1 + src/graph/validator/Validator.cpp | 2 + src/interface/meta.thrift | 6 + src/interface/storage.thrift | 10 ++ src/kvstore/NebulaStore.cpp | 18 +++ src/kvstore/NebulaStore.h | 8 ++ src/kvstore/Part.h | 10 ++ src/kvstore/PartManager.h | 8 ++ src/meta/CMakeLists.txt | 1 + src/meta/MetaServiceHandler.cpp | 8 ++ src/meta/MetaServiceHandler.h | 2 + src/meta/processors/BaseProcessor-inl.h | 26 ++++ src/meta/processors/BaseProcessor.h | 9 ++ src/meta/processors/admin/AdminClient.cpp | 44 +++++++ src/meta/processors/admin/AdminClient.h | 10 ++ .../processors/admin/ClearSpaceProcessor.cpp | 82 +++++++++++++ .../processors/admin/ClearSpaceProcessor.h | 37 ++++++ .../processors/parts/ListPartsProcessor.cpp | 27 +---- .../processors/parts/ListPartsProcessor.h | 9 -- src/parser/AdminSentences.cpp | 4 + src/parser/AdminSentences.h | 27 +++++ src/parser/Sentence.h | 1 + src/parser/parser.yy | 11 +- src/parser/scanner.lex | 1 + src/storage/CMakeLists.txt | 1 + src/storage/StorageAdminServiceHandler.cpp | 7 ++ src/storage/StorageAdminServiceHandler.h | 2 + src/storage/admin/ClearSpaceProcessor.cpp | 32 +++++ src/storage/admin/ClearSpaceProcessor.h | 53 ++++++++ tests/conftest.py | 5 +- tests/tck/conftest.py | 58 +++++++-- tests/tck/features/mutate/ClearSpace.feature | 114 ++++++++++++++++++ 51 files changed, 827 insertions(+), 54 deletions(-) create mode 100644 src/meta/processors/admin/ClearSpaceProcessor.cpp create mode 100644 src/meta/processors/admin/ClearSpaceProcessor.h create mode 100644 src/storage/admin/ClearSpaceProcessor.cpp create mode 100644 src/storage/admin/ClearSpaceProcessor.h create mode 100644 tests/tck/features/mutate/ClearSpace.feature diff --git a/.linters/cpp/checkKeyword.py b/.linters/cpp/checkKeyword.py index 62e5ec1d9c4..e9201b79837 100755 --- a/.linters/cpp/checkKeyword.py +++ b/.linters/cpp/checkKeyword.py @@ -75,6 +75,7 @@ 'KW_ADD', 'KW_CREATE', 'KW_DROP', + 'KW_CLEAR', 'KW_REMOVE', 'KW_IF', 'KW_NOT', @@ -134,7 +135,8 @@ def get_unreserved_keyword(file_path): if flag == 1: if line.strip() == ';': break - unreserved_key_words.append(re.sub('\\s+[:|]\\s+(\\w+)\\s+.*', '\\1', line).strip()) + unreserved_key_words.append( + re.sub('\\s+[:|]\\s+(\\w+)\\s+.*', '\\1', line).strip()) continue parser_file.close() @@ -142,17 +144,19 @@ def get_unreserved_keyword(file_path): if __name__ == '__main__': - cmd = 'git diff --diff-filter=ACMRTUXB HEAD -p ' + SCANNER_FILE_PATH + '|grep "^+"|grep -v "^+++"|grep "KW_"' + cmd = 'git diff --diff-filter=ACMRTUXB HEAD -p ' + \ + SCANNER_FILE_PATH + '|grep "^+"|grep -v "^+++"|grep "KW_"' content = os.popen(cmd) - keywords=[] - for line in content.readlines(): - keyword = re.sub('.*(KW_\\w+)\s*;.*','\\1',line.strip()) + keywords = [] + for line in content.readlines(): + keyword = re.sub('.*(KW_\\w+)\s*;.*', '\\1', line.strip()) keywords.append(keyword) if len(keywords) == 0: exit(0) unreserved_key_words = get_unreserved_keyword(PARSER_FILE_PATH) - new_key_words = [word for word in keywords if word not in reserved_key_words] + new_key_words = [ + word for word in keywords if word not in reserved_key_words] if len(new_key_words) == 0: exit(0) result = [word for word in new_key_words if word not in unreserved_key_words] diff --git a/src/clients/meta/MetaClient.cpp b/src/clients/meta/MetaClient.cpp index 902646e2651..d0316a7b0db 100644 --- a/src/clients/meta/MetaClient.cpp +++ b/src/clients/meta/MetaClient.cpp @@ -1357,6 +1357,22 @@ folly::Future> MetaClient::dropSpace(std::string name, const bool return future; } +folly::Future> MetaClient::clearSpace(std::string name, const bool ifExists) { + cpp2::ClearSpaceReq req; + req.space_name_ref() = std::move(name); + req.if_exists_ref() = ifExists; + folly::Promise> promise; + auto future = promise.getFuture(); + getResponse( + std::move(req), + [](auto client, auto request) { return client->future_clearSpace(request); }, + [](cpp2::ExecResp&& resp) -> bool { + return resp.get_code() == nebula::cpp2::ErrorCode::SUCCEEDED; + }, + std::move(promise)); + return future; +} + folly::Future>> MetaClient::listHosts(cpp2::ListHostType tp) { cpp2::ListHostsReq req; req.type_ref() = tp; diff --git a/src/clients/meta/MetaClient.h b/src/clients/meta/MetaClient.h index 14764347f7c..fe8e644858b 100644 --- a/src/clients/meta/MetaClient.h +++ b/src/clients/meta/MetaClient.h @@ -304,6 +304,9 @@ class MetaClient { folly::Future> dropSpace(std::string name, bool ifExists = false); + // clear space data, but keep the space schema. + folly::Future> clearSpace(std::string name, bool ifExists = false); + folly::Future>> listHosts( cpp2::ListHostType type = cpp2::ListHostType::ALLOC); diff --git a/src/common/plugin/fulltext/FTGraphAdapter.h b/src/common/plugin/fulltext/FTGraphAdapter.h index 42057d816b5..cf89ad82d08 100644 --- a/src/common/plugin/fulltext/FTGraphAdapter.h +++ b/src/common/plugin/fulltext/FTGraphAdapter.h @@ -47,6 +47,9 @@ class FTGraphAdapter { virtual StatusOr dropIndex(const HttpClient& client, const std::string& index) const = 0; + // Clear the fulltext index data and keep the index schema. + virtual StatusOr clearIndex(const HttpClient& client, const std::string& index) const = 0; + virtual StatusOr indexExists(const HttpClient& client, const std::string& index) const = 0; }; } // namespace plugin diff --git a/src/common/plugin/fulltext/elasticsearch/ESGraphAdapter.cpp b/src/common/plugin/fulltext/elasticsearch/ESGraphAdapter.cpp index a347c8721d0..50b7685ece6 100644 --- a/src/common/plugin/fulltext/elasticsearch/ESGraphAdapter.cpp +++ b/src/common/plugin/fulltext/elasticsearch/ESGraphAdapter.cpp @@ -196,6 +196,28 @@ std::string ESGraphAdapter::dropIndexCmd(const HttpClient& client, return os.str(); } +StatusOr ESGraphAdapter::clearIndex(const HttpClient& client, + const std::string& index) const { + // curl -H "Content-Type: application/json; charset=utf-8" + // -XPOST "http://127.0.0.1:9200/${index}/_delete_by_query?refresh&slices=5" + // -d '{"query": {"match_all":{}}}' + std::string cmd = clearIndexCmd(client, index); + auto ret = nebula::ProcessUtils::runCommand(cmd.c_str()); + if (!ret.ok() || ret.value().empty()) { + LOG(ERROR) << "Http POST Failed: " << cmd; + return Status::Error("command failed : %s", cmd.c_str()); + } + return clearCheck(ret.value()); +} + +std::string ESGraphAdapter::clearIndexCmd(const HttpClient& client, + const std::string& index) const noexcept { + std::stringstream os; + os << header() << XPOST << client.toString() << index << "/_delete_by_query?refresh&slices=5\"" + << " -d '{\"query\": {\"match_all\":{}}}'"; + return os.str(); +} + StatusOr ESGraphAdapter::indexExists(const HttpClient& client, const std::string& index) const { // curl -H "Content-Type: application/json; charset=utf-8" @@ -283,5 +305,24 @@ bool ESGraphAdapter::indexCheck(const std::string& ret) const { LOG(ERROR) << "error reason : " << ret; return false; } + +bool ESGraphAdapter::clearCheck(const std::string& ret) const { + try { + auto root = folly::parseJson(ret); + if (root.isArray()) { + return false; + } + auto result = root.find("failures"); + if (result != root.items().end() && result->second.isArray() && result->second.size() == 0) { + return true; + } + } catch (const std::exception& ex) { + LOG(ERROR) << "result error : " << ex.what(); + } + + LOG(ERROR) << "error reason : " << ret; + return false; +} + } // namespace plugin } // namespace nebula diff --git a/src/common/plugin/fulltext/elasticsearch/ESGraphAdapter.h b/src/common/plugin/fulltext/elasticsearch/ESGraphAdapter.h index 24561c63751..beac919d59e 100644 --- a/src/common/plugin/fulltext/elasticsearch/ESGraphAdapter.h +++ b/src/common/plugin/fulltext/elasticsearch/ESGraphAdapter.h @@ -21,6 +21,7 @@ class ESGraphAdapter final : public FTGraphAdapter { FRIEND_TEST(FulltextPluginTest, ESFuzzyTest); FRIEND_TEST(FulltextPluginTest, ESCreateIndexTest); FRIEND_TEST(FulltextPluginTest, ESDropIndexTest); + FRIEND_TEST(FulltextPluginTest, ESClearIndexTest); public: static std::unique_ptr kAdapter; @@ -53,6 +54,11 @@ class ESGraphAdapter final : public FTGraphAdapter { StatusOr dropIndex(const HttpClient& client, const std::string& index) const override; + // Clear the fulltext index data on es and keep the index schema. + // client: es client + // index: fulltext index name + StatusOr clearIndex(const HttpClient& client, const std::string& index) const override; + StatusOr indexExists(const HttpClient& client, const std::string& index) const override; private: @@ -88,12 +94,20 @@ class ESGraphAdapter final : public FTGraphAdapter { bool indexCheck(const std::string& ret) const; + // check the result + bool clearCheck(const std::string& ret) const; + std::string createIndexCmd(const HttpClient& client, const std::string& index, const std::string& indexTemplate = "") const noexcept; std::string dropIndexCmd(const HttpClient& client, const std::string& index) const noexcept; + // Encapsulates the clearIndex command. + // client: es client + // index: fulltext index name + std::string clearIndexCmd(const HttpClient& client, const std::string& index) const noexcept; + std::string indexExistsCmd(const HttpClient& client, const std::string& index) const noexcept; }; } // namespace plugin diff --git a/src/common/plugin/fulltext/test/FulltextPluginTest.cpp b/src/common/plugin/fulltext/test/FulltextPluginTest.cpp index 2c48d3e85b5..c17c5291f3d 100644 --- a/src/common/plugin/fulltext/test/FulltextPluginTest.cpp +++ b/src/common/plugin/fulltext/test/FulltextPluginTest.cpp @@ -64,6 +64,17 @@ TEST(FulltextPluginTest, ESDropIndexTest) { ASSERT_EQ(expected, ret); } +TEST(FulltextPluginTest, ESClearIndexTest) { + HostAddr localHost_{"127.0.0.1", 9200}; + HttpClient client(localHost_); + auto ret = ESGraphAdapter().clearIndexCmd(client, "test_index"); + auto expected = + "/usr/bin/curl -H \"Content-Type: application/json; charset=utf-8\"" + " -XPOST -k \"http://127.0.0.1:9200/test_index/_delete_by_query?refresh&slices=5\"" + " -d '{\"query\": {\"match_all\":{}}}'"; + ASSERT_EQ(expected, ret); +} + TEST(FulltextPluginTest, ESPutTest) { HostAddr localHost_{"127.0.0.1", 9200}; HttpClient hc(localHost_); diff --git a/src/graph/executor/Executor.cpp b/src/graph/executor/Executor.cpp index 254c44f2731..0939d680962 100644 --- a/src/graph/executor/Executor.cpp +++ b/src/graph/executor/Executor.cpp @@ -271,6 +271,9 @@ Executor *Executor::makeExecutor(QueryContext *qctx, const PlanNode *node) { case PlanNode::Kind::kDropSpace: { return pool->add(new DropSpaceExecutor(node, qctx)); } + case PlanNode::Kind::kClearSpace: { + return pool->add(new ClearSpaceExecutor(node, qctx)); + } case PlanNode::Kind::kShowCreateSpace: { return pool->add(new ShowCreateSpaceExecutor(node, qctx)); } diff --git a/src/graph/executor/admin/SpaceExecutor.cpp b/src/graph/executor/admin/SpaceExecutor.cpp index 628723dc9a0..d08fca3b7e8 100644 --- a/src/graph/executor/admin/SpaceExecutor.cpp +++ b/src/graph/executor/admin/SpaceExecutor.cpp @@ -176,6 +176,50 @@ void DropSpaceExecutor::unRegisterSpaceLevelMetrics(const std::string &spaceName } } +folly::Future ClearSpaceExecutor::execute() { + SCOPED_TIMER(&execTime_); + + auto *csNode = asNode(node()); + + // prepare text search index. + std::vector ftIndexes; + auto spaceIdRet = qctx()->getMetaClient()->getSpaceIdByNameFromCache(csNode->getSpaceName()); + if (spaceIdRet.ok()) { + auto ftIndexesRet = qctx()->getMetaClient()->getFTIndexBySpaceFromCache(spaceIdRet.value()); + NG_RETURN_IF_ERROR(ftIndexesRet); + auto map = std::move(ftIndexesRet).value(); + auto get = [](const auto &ptr) { return ptr.first; }; + std::transform(map.begin(), map.end(), std::back_inserter(ftIndexes), get); + } else { + LOG(WARNING) << "Get space ID failed when prepare text index: " << csNode->getSpaceName(); + } + + return qctx() + ->getMetaClient() + ->clearSpace(csNode->getSpaceName(), csNode->getIfExists()) + .via(runner()) + .thenValue([this, csNode, spaceIdRet, ftIndexes = std::move(ftIndexes)](StatusOr resp) { + if (!resp.ok()) { + LOG(ERROR) << "Clear space `" << csNode->getSpaceName() << "' failed: " << resp.status(); + return resp.status(); + } + if (!ftIndexes.empty()) { + auto tsRet = FTIndexUtils::getTSClients(qctx()->getMetaClient()); + if (!tsRet.ok()) { + LOG(WARNING) << "Get text search clients failed"; + return Status::OK(); + } + for (const auto &ftindex : ftIndexes) { + auto ftRet = FTIndexUtils::clearTSIndex(std::move(tsRet).value(), ftindex); + if (!ftRet.ok()) { + LOG(WARNING) << "Clear fulltext index `" << ftindex << "' failed: " << ftRet.status(); + } + } + } + return Status::OK(); + }); +} + folly::Future ShowSpacesExecutor::execute() { SCOPED_TIMER(&execTime_); diff --git a/src/graph/executor/admin/SpaceExecutor.h b/src/graph/executor/admin/SpaceExecutor.h index 52f56bd2edf..98552765b76 100644 --- a/src/graph/executor/admin/SpaceExecutor.h +++ b/src/graph/executor/admin/SpaceExecutor.h @@ -45,6 +45,14 @@ class DropSpaceExecutor final : public Executor { folly::Future execute() override; }; +class ClearSpaceExecutor final : public Executor { + public: + ClearSpaceExecutor(const PlanNode *node, QueryContext *qctx) + : Executor("ClearSpaceExecutor", node, qctx) {} + + folly::Future execute() override; +}; + class ShowSpacesExecutor final : public Executor { public: ShowSpacesExecutor(const PlanNode *node, QueryContext *qctx) diff --git a/src/graph/planner/plan/Admin.cpp b/src/graph/planner/plan/Admin.cpp index bd67e4ee546..0f87f9d70bf 100644 --- a/src/graph/planner/plan/Admin.cpp +++ b/src/graph/planner/plan/Admin.cpp @@ -34,6 +34,13 @@ std::unique_ptr DropSpace::explain() const { return desc; } +std::unique_ptr ClearSpace::explain() const { + auto desc = SingleDependencyNode::explain(); + addDescription("spaceName", spaceName_, desc.get()); + addDescription("ifExists", util::toJson(ifExists_), desc.get()); + return desc; +} + std::unique_ptr DescSpace::explain() const { auto desc = SingleDependencyNode::explain(); addDescription("spaceName", spaceName_, desc.get()); diff --git a/src/graph/planner/plan/Admin.h b/src/graph/planner/plan/Admin.h index 63704edbdab..8e4947f5a4d 100644 --- a/src/graph/planner/plan/Admin.h +++ b/src/graph/planner/plan/Admin.h @@ -218,6 +218,37 @@ class DropSpace final : public SingleDependencyNode { bool ifExists_; }; +class ClearSpace final : public SingleDependencyNode { + public: + static ClearSpace* make(QueryContext* qctx, + PlanNode* input, + std::string spaceName, + bool ifExists) { + return qctx->objPool()->add(new ClearSpace(qctx, input, std::move(spaceName), ifExists)); + } + + std::unique_ptr explain() const override; + + const std::string& getSpaceName() const { + return spaceName_; + } + + bool getIfExists() const { + return ifExists_; + } + + private: + ClearSpace(QueryContext* qctx, PlanNode* input, std::string spaceName, bool ifExists) + : SingleDependencyNode(qctx, Kind::kClearSpace, input) { + spaceName_ = std::move(spaceName); + ifExists_ = ifExists; + } + + private: + std::string spaceName_; + bool ifExists_; +}; + class AlterSpace final : public SingleDependencyNode { public: static AlterSpace* make(QueryContext* qctx, diff --git a/src/graph/planner/plan/PlanNode.cpp b/src/graph/planner/plan/PlanNode.cpp index 5d74fcc4825..6c2bc109263 100644 --- a/src/graph/planner/plan/PlanNode.cpp +++ b/src/graph/planner/plan/PlanNode.cpp @@ -169,6 +169,8 @@ const char* PlanNode::toString(PlanNode::Kind kind) { return "ShowCreateEdgeIndex"; case Kind::kDropSpace: return "DropSpace"; + case Kind::kClearSpace: + return "ClearSpace"; case Kind::kDropTag: return "DropTag"; case Kind::kDropEdge: diff --git a/src/graph/planner/plan/PlanNode.h b/src/graph/planner/plan/PlanNode.h index 619636c4807..02e6080552f 100644 --- a/src/graph/planner/plan/PlanNode.h +++ b/src/graph/planner/plan/PlanNode.h @@ -96,6 +96,7 @@ class PlanNode { kShowCreateTag, kShowCreateEdge, kDropSpace, + kClearSpace, kDropTag, kDropEdge, kAlterSpace, diff --git a/src/graph/service/PermissionCheck.cpp b/src/graph/service/PermissionCheck.cpp index 4dc28ee1dcf..168dee97087 100644 --- a/src/graph/service/PermissionCheck.cpp +++ b/src/graph/service/PermissionCheck.cpp @@ -10,8 +10,8 @@ namespace graph { /** * Read space : kUse, kDescribeSpace - * Write space : kCreateSpace, kDropSpace, kCreateSnapshot, kDropSnapshot - * kBalance, kAdmin, kConfig, kIngest, kDownload + * Write space : kCreateSpace, kDropSpace, kClearSpace, kCreateSnapshot, + * kDropSnapshot, kBalance, kAdmin, kConfig, kIngest, kDownload * Read schema : kDescribeTag, kDescribeEdge, * kDescribeTagIndex, kDescribeEdgeIndex * Write schema : kCreateTag, kAlterTag, kCreateEdge, @@ -53,6 +53,7 @@ namespace graph { case Sentence::Kind::kAlterSpace: case Sentence::Kind::kCreateSpaceAs: case Sentence::Kind::kDropSpace: + case Sentence::Kind::kClearSpace: case Sentence::Kind::kCreateSnapshot: case Sentence::Kind::kDropSnapshot: case Sentence::Kind::kAddHosts: diff --git a/src/graph/util/FTIndexUtils.cpp b/src/graph/util/FTIndexUtils.cpp index c6d423c0c1f..685c60ede1a 100644 --- a/src/graph/util/FTIndexUtils.cpp +++ b/src/graph/util/FTIndexUtils.cpp @@ -76,6 +76,20 @@ StatusOr FTIndexUtils::dropTSIndex(const std::vector FTIndexUtils::clearTSIndex(const std::vector& tsClients, + const std::string& index) { + auto retryCnt = FLAGS_ft_request_retry_times; + while (--retryCnt > 0) { + auto ret = + nebula::plugin::ESGraphAdapter::kAdapter->clearIndex(randomFTClient(tsClients), index); + if (!ret.ok()) { + continue; + } + return std::move(ret).value(); + } + return Status::Error("clear fulltext index failed : %s", index.c_str()); +} + StatusOr FTIndexUtils::rewriteTSFilter( ObjectPool* pool, bool isEdge, diff --git a/src/graph/util/FTIndexUtils.h b/src/graph/util/FTIndexUtils.h index 55a6f1d235a..41f6b77185c 100644 --- a/src/graph/util/FTIndexUtils.h +++ b/src/graph/util/FTIndexUtils.h @@ -28,6 +28,10 @@ class FTIndexUtils final { static StatusOr dropTSIndex(const std::vector& tsClients, const std::string& index); + // Clears the full-text index data, but keeps the index schema + static StatusOr clearTSIndex(const std::vector& tsClients, + const std::string& index); + // Converts TextSearchExpression into a relational expresion that could be pushed down static StatusOr rewriteTSFilter( ObjectPool* pool, diff --git a/src/graph/validator/AdminValidator.cpp b/src/graph/validator/AdminValidator.cpp index f7ffd4260f6..d19b89546be 100644 --- a/src/graph/validator/AdminValidator.cpp +++ b/src/graph/validator/AdminValidator.cpp @@ -216,6 +216,18 @@ Status DropSpaceValidator::toPlan() { return Status::OK(); } +Status ClearSpaceValidator::validateImpl() { + return Status::OK(); +} + +Status ClearSpaceValidator::toPlan() { + auto sentence = static_cast(sentence_); + auto *doNode = ClearSpace::make(qctx_, nullptr, *sentence->spaceName(), sentence->isIfExists()); + root_ = doNode; + tail_ = root_; + return Status::OK(); +} + // Show the sentence to create this space. It's created from options of space, so maybe is different // from the origin sentence to create this space. Status ShowCreateSpaceValidator::validateImpl() { diff --git a/src/graph/validator/AdminValidator.h b/src/graph/validator/AdminValidator.h index f873c02e8d7..4881e1d4223 100644 --- a/src/graph/validator/AdminValidator.h +++ b/src/graph/validator/AdminValidator.h @@ -97,6 +97,18 @@ class DropSpaceValidator final : public Validator { Status toPlan() override; }; +class ClearSpaceValidator final : public Validator { + public: + ClearSpaceValidator(Sentence* sentence, QueryContext* context) : Validator(sentence, context) { + setNoSpaceRequired(); + } + + private: + Status validateImpl() override; + + Status toPlan() override; +}; + class ShowCreateSpaceValidator final : public Validator { public: ShowCreateSpaceValidator(Sentence* sentence, QueryContext* context) diff --git a/src/graph/validator/AuditCategory.cpp b/src/graph/validator/AuditCategory.cpp index c530076c760..5c8b9957146 100644 --- a/src/graph/validator/AuditCategory.cpp +++ b/src/graph/validator/AuditCategory.cpp @@ -37,6 +37,7 @@ std::string auditCategory(Sentence::Kind kind) { case Sentence::Kind::kDropSpace: case Sentence::Kind::kCreateFTIndex: case Sentence::Kind::kDropFTIndex: + case Sentence::Kind::kClearSpace: return "ddl"; case Sentence::Kind::kGo: case Sentence::Kind::kMatch: diff --git a/src/graph/validator/Validator.cpp b/src/graph/validator/Validator.cpp index e7e3c5eed4d..e8eea64ea22 100644 --- a/src/graph/validator/Validator.cpp +++ b/src/graph/validator/Validator.cpp @@ -278,6 +278,8 @@ std::unique_ptr Validator::makeValidator(Sentence* sentence, QueryCon return std::make_unique(sentence, context); case Sentence::Kind::kAlterSpace: return std::make_unique(sentence, context); + case Sentence::Kind::kClearSpace: + return std::make_unique(sentence, context); case Sentence::Kind::kUnknown: case Sentence::Kind::kReturn: { // nothing diff --git a/src/interface/meta.thrift b/src/interface/meta.thrift index 3404fe9006c..1f1cc55ffdf 100644 --- a/src/interface/meta.thrift +++ b/src/interface/meta.thrift @@ -341,6 +341,11 @@ struct DropSpaceReq { 2: bool if_exists, } +struct ClearSpaceReq { + 1: binary space_name, + 2: bool if_exists, +} + struct ListSpacesReq { } @@ -1303,6 +1308,7 @@ struct SyncDataReq { service MetaService { ExecResp createSpace(1: CreateSpaceReq req); ExecResp dropSpace(1: DropSpaceReq req); + ExecResp clearSpace(1: ClearSpaceReq req); GetSpaceResp getSpace(1: GetSpaceReq req); ListSpacesResp listSpaces(1: ListSpacesReq req); ExecResp alterSpace(1: AlterSpaceReq req); diff --git a/src/interface/storage.thrift b/src/interface/storage.thrift index dc03ffc0f94..9292925ffc6 100644 --- a/src/interface/storage.thrift +++ b/src/interface/storage.thrift @@ -836,6 +836,14 @@ struct StopTaskResp { 1: common.ErrorCode code, } +struct ClearSpaceReq { + 1: common.GraphSpaceID space_id, +} + +struct ClearSpaceResp { + 1: common.ErrorCode code, +} + service StorageAdminService { // Interfaces for admin operations AdminExecResp transLeader(1: TransLeaderReq req); @@ -857,6 +865,8 @@ service StorageAdminService { AddTaskResp addAdminTask(1: AddTaskRequest req); StopTaskResp stopAdminTask(1: StopTaskRequest req); + + ClearSpaceResp clearSpace(1: ClearSpaceReq req); } diff --git a/src/kvstore/NebulaStore.cpp b/src/kvstore/NebulaStore.cpp index 09c5568a01f..c82a094c8af 100644 --- a/src/kvstore/NebulaStore.cpp +++ b/src/kvstore/NebulaStore.cpp @@ -468,6 +468,24 @@ void NebulaStore::removeSpace(GraphSpaceID spaceId, bool isListener) { } } +nebula::cpp2::ErrorCode NebulaStore::clearSpace(GraphSpaceID spaceId) { + folly::RWSpinLock::ReadHolder rh(&lock_); + auto spaceIt = this->spaces_.find(spaceId); + if (spaceIt != this->spaces_.end()) { + for (auto& part : spaceIt->second->parts_) { + auto ret = part.second->cleanupSafely(); + if (ret != nebula::cpp2::ErrorCode::SUCCEEDED) { + LOG(ERROR) << "partition clear failed. space: " << spaceId << ", part: " << part.first; + return ret; + } + } + } else { + return nebula::cpp2::ErrorCode::E_SPACE_NOT_FOUND; + } + LOG(INFO) << "Space " << spaceId << " has been cleared!"; + return nebula::cpp2::ErrorCode::SUCCEEDED; +} + void NebulaStore::removePart(GraphSpaceID spaceId, PartitionID partId) { folly::RWSpinLock::WriteHolder wh(&lock_); auto spaceIt = this->spaces_.find(spaceId); diff --git a/src/kvstore/NebulaStore.h b/src/kvstore/NebulaStore.h index c7123825155..65463c32537 100644 --- a/src/kvstore/NebulaStore.h +++ b/src/kvstore/NebulaStore.h @@ -581,6 +581,14 @@ class NebulaStore : public KVStore, public Handler { */ void removeSpace(GraphSpaceID spaceId, bool isListener) override; + /** + * @brief clear space data, but not remove the data dirs. + * + * @param spaceId space which will be cleared. + * @return + */ + nebula::cpp2::ErrorCode clearSpace(GraphSpaceID spaceId) override; + /** * @brief Remove a partition, called from part manager * diff --git a/src/kvstore/Part.h b/src/kvstore/Part.h index db5ca835a92..c49c6119c79 100644 --- a/src/kvstore/Part.h +++ b/src/kvstore/Part.h @@ -198,6 +198,16 @@ class Part : public raftex::RaftPart { reset(); } + /** + * @brief clean up data safely + * + * @return nebula::cpp2::ErrorCode + */ + nebula::cpp2::ErrorCode cleanupSafely() { + std::lock_guard g(raftLock_); + return cleanup(); + } + private: /** * Methods inherited from RaftPart diff --git a/src/kvstore/PartManager.h b/src/kvstore/PartManager.h index 52328f8ba81..aad98c65e8a 100644 --- a/src/kvstore/PartManager.h +++ b/src/kvstore/PartManager.h @@ -63,6 +63,14 @@ class Handler { */ virtual void removeSpace(GraphSpaceID spaceId, bool isListener = false) = 0; + /** + * @brief clear space data, but not remove the data dirs. + * + * @param spaceId space which will be cleared. + * @return + */ + virtual nebula::cpp2::ErrorCode clearSpace(GraphSpaceID spaceId) = 0; + /** * @brief Remove a partition * diff --git a/src/meta/CMakeLists.txt b/src/meta/CMakeLists.txt index c809634b223..eb39d890d70 100644 --- a/src/meta/CMakeLists.txt +++ b/src/meta/CMakeLists.txt @@ -42,6 +42,7 @@ nebula_add_library( processors/kv/SyncDataProcessor.cpp processors/admin/HBProcessor.cpp processors/admin/AgentHBProcessor.cpp + processors/admin/ClearSpaceProcessor.cpp processors/user/AuthenticationProcessor.cpp processors/admin/CreateSnapshotProcessor.cpp processors/admin/DropSnapshotProcessor.cpp diff --git a/src/meta/MetaServiceHandler.cpp b/src/meta/MetaServiceHandler.cpp index 8e2cc4754e8..dbe4722d88f 100644 --- a/src/meta/MetaServiceHandler.cpp +++ b/src/meta/MetaServiceHandler.cpp @@ -6,6 +6,8 @@ #include "meta/MetaServiceHandler.h" #include "common/utils/MetaKeyUtils.h" +#include "meta/processors/admin/AgentHBProcessor.h" +#include "meta/processors/admin/ClearSpaceProcessor.h" #include "meta/processors/admin/CreateBackupProcessor.h" #include "meta/processors/admin/CreateSnapshotProcessor.h" #include "meta/processors/admin/DropSnapshotProcessor.h" @@ -101,6 +103,12 @@ folly::Future MetaServiceHandler::future_dropSpace(const cpp2::D RETURN_FUTURE(processor); } +folly::Future MetaServiceHandler::future_clearSpace( + const cpp2::ClearSpaceReq& req) { + auto* processor = ClearSpaceProcessor::instance(kvstore_, adminClient_.get()); + RETURN_FUTURE(processor); +} + folly::Future MetaServiceHandler::future_listSpaces( const cpp2::ListSpacesReq& req) { auto* processor = ListSpacesProcessor::instance(kvstore_); diff --git a/src/meta/MetaServiceHandler.h b/src/meta/MetaServiceHandler.h index 2c59e84d723..38a5aa3f0a7 100644 --- a/src/meta/MetaServiceHandler.h +++ b/src/meta/MetaServiceHandler.h @@ -40,6 +40,8 @@ class MetaServiceHandler final : public cpp2::MetaServiceSvIf { folly::Future future_dropSpace(const cpp2::DropSpaceReq& req) override; + folly::Future future_clearSpace(const cpp2::ClearSpaceReq& req) override; + folly::Future future_listSpaces(const cpp2::ListSpacesReq& req) override; folly::Future future_getSpace(const cpp2::GetSpaceReq& req) override; diff --git a/src/meta/processors/BaseProcessor-inl.h b/src/meta/processors/BaseProcessor-inl.h index 4107e265a40..e1acc91c3f1 100644 --- a/src/meta/processors/BaseProcessor-inl.h +++ b/src/meta/processors/BaseProcessor-inl.h @@ -588,6 +588,32 @@ Value BaseProcessor::variableValToValue(const std::string& varType, return Value::kNullValue; } +template +ErrorOr>> +BaseProcessor::getAllParts(GraphSpaceID spaceId) { + std::unordered_map> partHostsMap; + + const auto& prefix = MetaKeyUtils::partPrefix(spaceId); + auto ret = doPrefix(prefix); + if (!nebula::ok(ret)) { + auto retCode = nebula::error(ret); + LOG(ERROR) << "List Parts Failed, error: " << apache::thrift::util::enumNameSafe(retCode); + return retCode; + } + + auto iter = nebula::value(ret).get(); + while (iter->valid()) { + auto key = iter->key(); + PartitionID partId; + memcpy(&partId, key.data() + prefix.size(), sizeof(PartitionID)); + std::vector partHosts = MetaKeyUtils::parsePartVal(iter->val()); + partHostsMap.emplace(partId, std::move(partHosts)); + iter->next(); + } + + return partHostsMap; +} + } // namespace meta } // namespace nebula #endif diff --git a/src/meta/processors/BaseProcessor.h b/src/meta/processors/BaseProcessor.h index 038f46b4d57..bd4735e5776 100644 --- a/src/meta/processors/BaseProcessor.h +++ b/src/meta/processors/BaseProcessor.h @@ -448,6 +448,15 @@ class BaseProcessor { bool direct = false); Value variableValToValue(const std::string& varType, const std::string& varValue); + /** + * @brief Get all parts' distribution information of a space. + * + * @param spaceId + * @return ErrorOr>> map for part id -> peer hosts. + */ + ErrorOr>> + getAllParts(GraphSpaceID spaceId); protected: kvstore::KVStore* kvstore_ = nullptr; diff --git a/src/meta/processors/admin/AdminClient.cpp b/src/meta/processors/admin/AdminClient.cpp index 200994a1faf..c4e14620fd3 100644 --- a/src/meta/processors/admin/AdminClient.cpp +++ b/src/meta/processors/admin/AdminClient.cpp @@ -335,6 +335,50 @@ folly::Future AdminClient::checkPeers(GraphSpaceID spaceId, PartitionID return fut; } +folly::Future AdminClient::clearSpace(GraphSpaceID spaceId, + const std::vector& hosts) { + folly::Promise promise; + auto f = promise.getFuture(); + + std::vector>> futures; + for (auto& host : hosts) { + folly::Promise> pro; + futures.emplace_back(pro.getFuture()); + + storage::cpp2::ClearSpaceReq req; + req.space_id_ref() = spaceId; + getResponseFromHost( + Utils::getAdminAddrFromStoreAddr(host), + std::move(req), + [](auto client, auto request) { return client->future_clearSpace(request); }, + [](auto&& resp) -> nebula::cpp2::ErrorCode { return resp.get_code(); }, + std::move(pro)); + } + + folly::collectAll(std::move(futures)) + .via(ioThreadPool_.get()) + .thenTry([pro = std::move(promise)](auto&& futureRet) mutable { + if (futureRet.hasException()) { + pro.setValue(nebula::cpp2::ErrorCode::E_RPC_FAILURE); + } else { + auto vec = std::move(futureRet).value(); + bool isAllOk = true; + for (auto& v : vec) { + auto resp = std::move(v).value(); + if (!resp.ok()) { + pro.setValue(nebula::cpp2::ErrorCode::E_RPC_FAILURE); + isAllOk = false; + break; + } + } + if (isAllOk) { + pro.setValue(nebula::cpp2::ErrorCode::SUCCEEDED); + } + } + }); + return f; +} + template folly::Future AdminClient::getResponseFromPart(const HostAddr& host, Request req, diff --git a/src/meta/processors/admin/AdminClient.h b/src/meta/processors/admin/AdminClient.h index a49789c9eee..42df7710760 100644 --- a/src/meta/processors/admin/AdminClient.h +++ b/src/meta/processors/admin/AdminClient.h @@ -150,6 +150,16 @@ class AdminClient { */ virtual folly::Future checkPeers(GraphSpaceID spaceId, PartitionID partId); + /** + * @brief Clear space data in all corresponding storage hosts. + * + * @param spaceId space which will be cleared + * @param hosts storage admin service addresses + * @return folly::Future + */ + virtual folly::Future clearSpace(GraphSpaceID spaceId, + const std::vector& hosts); + /** * @brief Get the all partitions' leader distribution * diff --git a/src/meta/processors/admin/ClearSpaceProcessor.cpp b/src/meta/processors/admin/ClearSpaceProcessor.cpp new file mode 100644 index 00000000000..2b5f111ea9d --- /dev/null +++ b/src/meta/processors/admin/ClearSpaceProcessor.cpp @@ -0,0 +1,82 @@ +/* Copyright (c) 2022 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License. + */ + +#include "meta/processors/admin/ClearSpaceProcessor.h" + +namespace nebula { +namespace meta { + +void ClearSpaceProcessor::process(const cpp2::ClearSpaceReq& req) { + GraphSpaceID spaceId; + std::vector hosts; + { + folly::SharedMutex::ReadHolder holder(LockUtils::lock()); + + // 1. Fetch spaceId + const auto& spaceName = req.get_space_name(); + auto spaceRet = getSpaceId(spaceName); + if (!nebula::ok(spaceRet)) { + auto retCode = nebula::error(spaceRet); + if (retCode == nebula::cpp2::ErrorCode::E_SPACE_NOT_FOUND) { + if (req.get_if_exists()) { + retCode = nebula::cpp2::ErrorCode::SUCCEEDED; + } else { + LOG(WARNING) << "Clear space Failed, space " << spaceName << " not existed."; + } + } else { + LOG(ERROR) << "Clear space Failed, space " << spaceName + << " error: " << apache::thrift::util::enumNameSafe(retCode); + } + handleErrorCode(retCode); + onFinished(); + return; + } + spaceId = nebula::value(spaceRet); + + // 2. Fetch all parts info accroding the spaceId. + auto ret = getAllParts(spaceId); + if (!nebula::ok(ret)) { + handleErrorCode(nebula::error(ret)); + onFinished(); + return; + } + + // 3. Determine which hosts the space is distributed on. + std::unordered_set distributedOnHosts; + for (auto& partEntry : nebula::value(ret)) { + for (auto& host : partEntry.second) { + distributedOnHosts.insert(host); + } + } + + // 4. select the active hosts. + auto activeHostsRet = ActiveHostsMan::getActiveHosts(kvstore_); + if (!nebula::ok(activeHostsRet)) { + handleErrorCode(nebula::error(activeHostsRet)); + onFinished(); + return; + } + auto activeHosts = std::move(nebula::value(activeHostsRet)); + for (auto& host : distributedOnHosts) { + if (std::find(activeHosts.begin(), activeHosts.end(), host) != activeHosts.end()) { + hosts.push_back(host); + } + } + if (hosts.size() == 0) { + handleErrorCode(nebula::cpp2::ErrorCode::E_NO_HOSTS); + onFinished(); + return; + } + } + + // 5. Delete the space data on the corresponding hosts. + auto clearRet = adminClient_->clearSpace(spaceId, hosts).get(); + handleErrorCode(clearRet); + onFinished(); + return; +} + +} // namespace meta +} // namespace nebula diff --git a/src/meta/processors/admin/ClearSpaceProcessor.h b/src/meta/processors/admin/ClearSpaceProcessor.h new file mode 100644 index 00000000000..b417562c12b --- /dev/null +++ b/src/meta/processors/admin/ClearSpaceProcessor.h @@ -0,0 +1,37 @@ +/* Copyright (c) 2022 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License. + */ + +#ifndef META_CLEARSPACEPROCESSOR_H_ +#define META_CLEARSPACEPROCESSOR_H_ + +#include "meta/processors/BaseProcessor.h" +#include "meta/processors/admin/AdminClient.h" + +namespace nebula { +namespace meta { + +/** + * @brief Clear space data and index data, but keep space schema and index schema. + */ +class ClearSpaceProcessor : public BaseProcessor { + public: + static ClearSpaceProcessor* instance(kvstore::KVStore* kvstore, AdminClient* adminClient) { + return new ClearSpaceProcessor(kvstore, adminClient); + } + + void process(const cpp2::ClearSpaceReq& req); + + private: + explicit ClearSpaceProcessor(kvstore::KVStore* kvstore, AdminClient* adminClient) + : BaseProcessor(kvstore), adminClient_(adminClient) {} + + private: + AdminClient* adminClient_{nullptr}; +}; + +} // namespace meta +} // namespace nebula + +#endif // META_CLEARSPACEPROCESSOR_H_ diff --git a/src/meta/processors/parts/ListPartsProcessor.cpp b/src/meta/processors/parts/ListPartsProcessor.cpp index 0fa76e49bf8..3228d896d48 100644 --- a/src/meta/processors/parts/ListPartsProcessor.cpp +++ b/src/meta/processors/parts/ListPartsProcessor.cpp @@ -37,7 +37,7 @@ void ListPartsProcessor::process(const cpp2::ListPartsReq& req) { } } else { // Show all parts - auto ret = getAllParts(); + auto ret = getAllParts(spaceId_); if (!nebula::ok(ret)) { handleErrorCode(nebula::error(ret)); onFinished(); @@ -80,31 +80,6 @@ void ListPartsProcessor::process(const cpp2::ListPartsReq& req) { onFinished(); } -ErrorOr>> -ListPartsProcessor::getAllParts() { - std::unordered_map> partHostsMap; - - const auto& prefix = MetaKeyUtils::partPrefix(spaceId_); - auto ret = doPrefix(prefix); - if (!nebula::ok(ret)) { - auto retCode = nebula::error(ret); - LOG(INFO) << "List Parts Failed, error: " << apache::thrift::util::enumNameSafe(retCode); - return retCode; - } - - auto iter = nebula::value(ret).get(); - while (iter->valid()) { - auto key = iter->key(); - PartitionID partId; - memcpy(&partId, key.data() + prefix.size(), sizeof(PartitionID)); - std::vector partHosts = MetaKeyUtils::parsePartVal(iter->val()); - partHostsMap.emplace(partId, std::move(partHosts)); - iter->next(); - } - - return partHostsMap; -} - nebula::cpp2::ErrorCode ListPartsProcessor::getLeaderDist(std::vector& partItems) { auto activeHostsRet = ActiveHostsMan::getActiveHosts(kvstore_); if (!nebula::ok(activeHostsRet)) { diff --git a/src/meta/processors/parts/ListPartsProcessor.h b/src/meta/processors/parts/ListPartsProcessor.h index 4a3abc3d304..a5dc33e2dcb 100644 --- a/src/meta/processors/parts/ListPartsProcessor.h +++ b/src/meta/processors/parts/ListPartsProcessor.h @@ -29,15 +29,6 @@ class ListPartsProcessor : public BaseProcessor { explicit ListPartsProcessor(kvstore::KVStore* kvstore) : BaseProcessor(kvstore) {} - /** - * @brief Get all parts' distribution information. - * - * @return ErrorOr>> map for part id -> peer hosts. - */ - ErrorOr>> - getAllParts(); - /** * @brief Fill the given partItems with leader distribution. * diff --git a/src/parser/AdminSentences.cpp b/src/parser/AdminSentences.cpp index b8fcbd65d49..6b6ce622443 100644 --- a/src/parser/AdminSentences.cpp +++ b/src/parser/AdminSentences.cpp @@ -124,6 +124,10 @@ std::string DropSpaceSentence::toString() const { return folly::stringPrintf("DROP SPACE %s", spaceName_.get()->c_str()); } +std::string ClearSpaceSentence::toString() const { + return folly::stringPrintf("CLEAR SPACE %s", spaceName_.get()->c_str()); +} + std::string AlterSpaceSentence::toString() const { std::string zones = paras_.front(); for (size_t i = 1; i < paras_.size(); i++) { diff --git a/src/parser/AdminSentences.h b/src/parser/AdminSentences.h index 77b51c9b476..756babde173 100644 --- a/src/parser/AdminSentences.h +++ b/src/parser/AdminSentences.h @@ -423,6 +423,33 @@ class DropSpaceSentence final : public DropSentence { std::unique_ptr clusterName_; }; +// clear space data and index data, but keep space schema and index schema. +class ClearSpaceSentence final : public DropSentence { + public: + ClearSpaceSentence(std::string* spaceName, bool ifExist) : DropSentence(ifExist) { + spaceName_.reset(spaceName); + kind_ = Kind::kClearSpace; + } + + void setClusterName(std::string* clusterName) { + clusterName_.reset(clusterName); + } + + const std::string* spaceName() const { + return spaceName_.get(); + } + + const std::string* clusterName() const { + return clusterName_.get(); + } + + std::string toString() const override; + + private: + std::unique_ptr spaceName_; + std::unique_ptr clusterName_; +}; + class AlterSpaceSentence final : public Sentence { public: AlterSpaceSentence(std::string* spaceName, meta::cpp2::AlterSpaceOp op) diff --git a/src/parser/Sentence.h b/src/parser/Sentence.h index 25239d1ad1d..b2794140bc5 100644 --- a/src/parser/Sentence.h +++ b/src/parser/Sentence.h @@ -143,6 +143,7 @@ class Sentence { kSetVariable, kGetVariable, kAlterSpace, + kClearSpace, }; Kind kind() const { diff --git a/src/parser/parser.yy b/src/parser/parser.yy index 8706a89be60..bc7997e9fda 100644 --- a/src/parser/parser.yy +++ b/src/parser/parser.yy @@ -176,7 +176,7 @@ static constexpr size_t kCommentLengthLimit = 256; %token KW_PARTITION_NUM KW_REPLICA_FACTOR KW_CHARSET KW_COLLATE KW_COLLATION KW_VID_TYPE %token KW_ATOMIC_EDGE %token KW_COMMENT KW_S2_MAX_LEVEL KW_S2_MAX_CELLS -%token KW_DROP KW_REMOVE KW_SPACES KW_INGEST KW_INDEX KW_INDEXES +%token KW_DROP KW_CLEAR KW_REMOVE KW_SPACES KW_INGEST KW_INDEX KW_INDEXES %token KW_IF KW_NOT KW_EXISTS KW_WITH %token KW_BY KW_DOWNLOAD KW_HDFS KW_UUID KW_CONFIGS KW_FORCE %token KW_GET KW_DECLARE KW_GRAPH KW_META KW_STORAGE KW_AGENT @@ -361,7 +361,7 @@ static constexpr size_t kCommentLengthLimit = 256; %type query_unique_identifier %type maintain_sentence -%type create_space_sentence describe_space_sentence drop_space_sentence alter_space_sentence +%type create_space_sentence describe_space_sentence drop_space_sentence clear_space_sentence alter_space_sentence %type create_tag_sentence create_edge_sentence %type alter_tag_sentence alter_edge_sentence %type drop_tag_sentence drop_edge_sentence @@ -3651,6 +3651,12 @@ drop_space_sentence } ; +clear_space_sentence + : KW_CLEAR KW_SPACE opt_if_exists name_label { + $$ = new ClearSpaceSentence($4, $3); + } + ; + // User manager sentences. create_user_sentence @@ -3957,6 +3963,7 @@ maintain_sentence | describe_space_sentence { $$ = $1; } | alter_space_sentence { $$ = $1; } | drop_space_sentence { $$ = $1; } + | clear_space_sentence { $$ = $1; } | create_tag_sentence { $$ = $1; } | create_edge_sentence { $$ = $1; } | alter_tag_sentence { $$ = $1; } diff --git a/src/parser/scanner.lex b/src/parser/scanner.lex index fcfb1f391ac..d0a2e5eb550 100644 --- a/src/parser/scanner.lex +++ b/src/parser/scanner.lex @@ -124,6 +124,7 @@ LABEL_FULL_WIDTH {CN_EN_FULL_WIDTH}{CN_EN_NUM_FULL_WIDTH}* "ADD" { return TokenType::KW_ADD; } "CREATE" { return TokenType::KW_CREATE;} "DROP" { return TokenType::KW_DROP; } +"CLEAR" { return TokenType::KW_CLEAR; } "REMOVE" { return TokenType::KW_REMOVE; } "IF" { return TokenType::KW_IF; } "NOT" { return TokenType::KW_NOT; } diff --git a/src/storage/CMakeLists.txt b/src/storage/CMakeLists.txt index c0e3df15995..256fe4c9e74 100644 --- a/src/storage/CMakeLists.txt +++ b/src/storage/CMakeLists.txt @@ -22,6 +22,7 @@ nebula_add_library( admin/RebuildFTIndexTask.cpp admin/StatsTask.cpp admin/GetLeaderProcessor.cpp + admin/ClearSpaceProcessor.cpp ) nebula_add_library( diff --git a/src/storage/StorageAdminServiceHandler.cpp b/src/storage/StorageAdminServiceHandler.cpp index ba05566e9bb..f96818eeec7 100644 --- a/src/storage/StorageAdminServiceHandler.cpp +++ b/src/storage/StorageAdminServiceHandler.cpp @@ -7,6 +7,7 @@ #include "storage/admin/AdminProcessor.h" #include "storage/admin/AdminTaskProcessor.h" +#include "storage/admin/ClearSpaceProcessor.h" #include "storage/admin/CreateCheckpointProcessor.h" #include "storage/admin/DropCheckpointProcessor.h" #include "storage/admin/GetLeaderProcessor.h" @@ -99,5 +100,11 @@ folly::Future StorageAdminServiceHandler::future_stopAdminTa RETURN_FUTURE(processor); } +folly::Future StorageAdminServiceHandler::future_clearSpace( + const cpp2::ClearSpaceReq& req) { + auto* processor = ClearSpaceProcessor::instance(env_); + RETURN_FUTURE(processor); +} + } // namespace storage } // namespace nebula diff --git a/src/storage/StorageAdminServiceHandler.h b/src/storage/StorageAdminServiceHandler.h index 8c068a1c333..f0bd8db9d4a 100644 --- a/src/storage/StorageAdminServiceHandler.h +++ b/src/storage/StorageAdminServiceHandler.h @@ -48,6 +48,8 @@ class StorageAdminServiceHandler final : public cpp2::StorageAdminServiceSvIf { folly::Future future_stopAdminTask(const cpp2::StopTaskRequest& req) override; + folly::Future future_clearSpace(const cpp2::ClearSpaceReq& req) override; + private: StorageEnv* env_{nullptr}; }; diff --git a/src/storage/admin/ClearSpaceProcessor.cpp b/src/storage/admin/ClearSpaceProcessor.cpp new file mode 100644 index 00000000000..19dfe9735ca --- /dev/null +++ b/src/storage/admin/ClearSpaceProcessor.cpp @@ -0,0 +1,32 @@ +/* Copyright (c) 2022 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License. + */ + +#include "storage/admin/ClearSpaceProcessor.h" + +#include "kvstore/NebulaStore.h" +#include "storage/StorageFlags.h" + +namespace nebula { +namespace storage { + +void ClearSpaceProcessor::process(const cpp2::ClearSpaceReq& req) { + auto spaceId = req.get_space_id(); + if (FLAGS_store_type != "nebula") { + this->resp_.code_ref() = nebula::cpp2::ErrorCode::E_INVALID_STORE; + onFinished(); + return; + } + auto* store = static_cast(env_->kvstore_); + this->resp_.code_ref() = store->clearSpace(spaceId); + onFinished(); +} + +void ClearSpaceProcessor::onFinished() { + this->promise_.setValue(std::move(resp_)); + delete this; +} + +} // namespace storage +} // namespace nebula diff --git a/src/storage/admin/ClearSpaceProcessor.h b/src/storage/admin/ClearSpaceProcessor.h new file mode 100644 index 00000000000..8668a59c43b --- /dev/null +++ b/src/storage/admin/ClearSpaceProcessor.h @@ -0,0 +1,53 @@ +/* Copyright (c) 2022 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License. + */ + +#ifndef STORAGE_ADMIN_CLEARSPACEPROCESSOR_H_ +#define STORAGE_ADMIN_CLEARSPACEPROCESSOR_H_ + +#include "storage/CommonUtils.h" + +namespace nebula { +namespace storage { + +/** + * @brief Processor class to clear space. + */ +class ClearSpaceProcessor { + public: + /** + * @brief Construct a new instance of ClearSpaceProcessor. + * + * @param env Related environment variables for storage. + * @return ClearSpaceProcessor* ClearSpaceProcessor instance. + */ + static ClearSpaceProcessor* instance(StorageEnv* env) { + return new ClearSpaceProcessor(env); + } + + /** + * @brief Entry point to clear space. + * + * @param req Request for clearing space. + */ + void process(const cpp2::ClearSpaceReq& req); + + folly::Future getFuture() { + return promise_.getFuture(); + } + + private: + explicit ClearSpaceProcessor(StorageEnv* env) : env_(env) {} + + void onFinished(); + + StorageEnv* env_{nullptr}; // Related environment variables for storage. + folly::Promise promise_; + cpp2::ClearSpaceResp resp_; +}; + +} // namespace storage +} // namespace nebula + +#endif // STORAGE_ADMIN_CLEARSPACEPROCESSOR_H_ diff --git a/tests/conftest.py b/tests/conftest.py index c23d24a2401..da80af10008 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -73,7 +73,8 @@ def pytest_bdd_step_error(request, feature, scenario, step, step_func, step_func if step_func_args.get("graph_spaces") is not None: graph_spaces = step_func_args.get("graph_spaces") if graph_spaces.get("space_desc") is not None: - logging.info("error space is {}".format(graph_spaces.get("space_desc"))) + logging.info("error space is {}".format( + graph_spaces.get("space_desc"))) def pytest_configure(config): @@ -110,6 +111,7 @@ def get_ports(): raise Exception(f"Invalid port: {port}") return port + def get_ssl_config_from_tmp(): with open(NB_TMP_PATH, "r") as f: data = json.loads(f.readline()) @@ -263,6 +265,7 @@ def workarround_for_class( request.cls.cleanup() request.cls.drop_data() + @pytest.fixture(scope="class") def establish_a_rare_connection(conn_pool, pytestconfig): conn = conn_pool.get_connection() diff --git a/tests/tck/conftest.py b/tests/tck/conftest.py index d2cea41fabc..b8ac9157640 100644 --- a/tests/tck/conftest.py +++ b/tests/tck/conftest.py @@ -120,22 +120,26 @@ def wait_indexes_ready(sess): def graph_spaces(): return dict(result_set=None) + @given(parse('parameters: {parameters}')) def preload_parameters( parameters ): try: paramMap = json.loads(parameters) - for (k,v) in paramMap.items(): - params[k]=value(v) + for (k, v) in paramMap.items(): + params[k] = value(v) except: raise ValueError("preload parameters failed!") + @then("clear the used parameters") def clear_parameters(): params = {} # construct python-type to nebula.Value + + def value(any): v = Value() if (isinstance(any, bool)): @@ -151,9 +155,11 @@ def value(any): elif (isinstance(any, dict)): v.set_mVal(map2NMap(any)) else: - raise TypeError("Do not support convert "+str(type(any))+" to nebula.Value") + raise TypeError("Do not support convert " + + str(type(any))+" to nebula.Value") return v + def list2Nlist(list): nlist = NList() nlist.values = [] @@ -161,13 +167,15 @@ def list2Nlist(list): nlist.values.append(value(item)) return nlist + def map2NMap(map): nmap = NMap() - nmap.kvs={} - for k,v in map.items(): - nmap.kvs[k]=value(v) + nmap.kvs = {} + for k, v in map.items(): + nmap.kvs[k] = value(v) return nmap + @given(parse('a graph with space named "{space}"')) def preload_space( request, @@ -245,7 +253,8 @@ def new_space(request, session, graph_spaces): @given(parse('load "{data}" csv data to a new space')) def import_csv_data(request, data, graph_spaces, session, pytestconfig): - data_dir = os.path.join(DATA_DIR, normalize_outline_scenario(request, data)) + data_dir = os.path.join( + DATA_DIR, normalize_outline_scenario(request, data)) space_desc = load_csv_data( session, data_dir, @@ -348,6 +357,7 @@ def given_nebulacluster_with_param( class_fixture_variables["cluster"] = nebula_svc class_fixture_variables["pool"] = pool + @when(parse('login "{graph}" with "{user}" and "{password}"')) def when_login_graphd(graph, user, password, class_fixture_variables, pytestconfig): index = parse_service_index(graph) @@ -367,6 +377,8 @@ def when_login_graphd(graph, user, password, class_fixture_variables, pytestconf # This is a workaround to test login retry because nebula-python treats # authentication failure as exception instead of error. + + @when(parse('login "{graph}" with "{user}" and "{password}" should fail:\n{msg}')) def when_login_graphd_fail(graph, user, password, class_fixture_variables, msg): index = parse_service_index(graph) @@ -384,11 +396,13 @@ def when_login_graphd_fail(graph, user, password, class_fixture_variables, msg): except: raise + @when(parse("executing query:\n{query}")) def executing_query(query, graph_spaces, session, request): ngql = combine_query(query) exec_query(request, ngql, session, graph_spaces) + @when(parse("executing query with user {username} with password {password}:\n{query}")) def executing_query(username, password, conn_pool_to_first_graph_service, query, graph_spaces, request): try: @@ -418,6 +432,7 @@ def raised_authenticate_error(error_msg, graph_spaces): ), f'Could not find "{msg}" in "{res_msg}" when authenticate and execute query: "{ngql}"' + @when(parse("profiling query:\n{query}")) def profiling_query(query, graph_spaces, session, request): ngql = "PROFILE {" + combine_query(query) + "}" @@ -771,7 +786,8 @@ def check_plan(plan, graph_spaces): idx = column_names.index('dependencies') rows = expect.get("rows", []) for i, row in enumerate(rows): - row[idx] = [int(cell.strip()) for cell in row[idx].split(",") if len(cell) > 0] + row[idx] = [int(cell.strip()) + for cell in row[idx].split(",") if len(cell) > 0] rows[i] = row differ = PlanDiffer(resp.plan_desc(), expect) assert differ.diff(), differ.err_msg() @@ -899,3 +915,29 @@ def replace_result_with_cluster_info(result, class_fixture_variables): except: raise return result + + +@pytest.fixture() +def execute_response(): + return dict() + + +@when(parse("connect to nebula service with user[u:{user}, p:{password}]")) +def conncet_to_service_with_user(conn_pool, user, password, class_fixture_variables): + sess = conn_pool.get_session(user, password) + class_fixture_variables["sessions"].append(sess) + + +@when("executing clear space") +def executing_clear_space(class_fixture_variables, execute_response): + session_cnt = len(class_fixture_variables["sessions"]) + last_sess = class_fixture_variables["sessions"][session_cnt - 1] + resp = last_sess.execute(" CLEAR SPACE IF EXISTS clear_space") + execute_response["resp"] = resp + + +@then("the result should be failed") +def result_failed(execute_response): + assert execute_response["resp"].is_succeeded() == False + assert execute_response["resp"].error_msg( + ) == "PermissionError: No permission to write space." diff --git a/tests/tck/features/mutate/ClearSpace.feature b/tests/tck/features/mutate/ClearSpace.feature new file mode 100644 index 00000000000..7e1f4ca3cae --- /dev/null +++ b/tests/tck/features/mutate/ClearSpace.feature @@ -0,0 +1,114 @@ +# Copyright (c) 2022 vesoft inc. All rights reserved. +# +# This source code is licensed under Apache 2.0 License. +Feature: Clear space test + This file is mainly used to test the function of clear space. + + Scenario: Clear space syntax test + Given an empty graph + And create a space with following options: + | name | clear_space | + | partition_num | 9 | + | replica_factor | 1 | + | vid_type | FIXED_STRING(20) | + And wait 2 seconds + When executing query: + """ + CLEAR SPACE IF EXISTS clear_space; + """ + Then the execution should be successful + When executing query: + """ + CLEAR SPACE clear_space; + """ + Then the execution should be successful + When executing query: + """ + CLEAR SPACE IF EXISTS clear_space_0; + """ + Then the execution should be successful + When executing query: + """ + CLEAR SPACE clear_space_0; + """ + Then a ExecutionError should be raised at runtime: Space not existed! + + Scenario: Clear space function test + Given an empty graph + And create a space with following options: + | name | clear_space | + | partition_num | 9 | + | replica_factor | 1 | + | vid_type | FIXED_STRING(20) | + And wait 2 seconds + When executing query: + """ + CREATE TAG IF NOT EXISTS player(name string, age int); + CREATE TAG INDEX IF NOT EXISTS name ON player(name(20)); + """ + And wait 5 seconds + Then the execution should be successful + When executing query: + """ + INSERT VERTEX player(name, age) VALUES + "Russell Westbrook": ("Russell Westbrook", 30), + "Chris Paul": ("Chris Paul", 33), + "Boris Diaw": ("Boris Diaw", 36), + "David West": ("David West", 38), + "Danny Green": ("Danny Green", 31), + "Tim Duncan": ("Tim Duncan", 42), + "James Harden": ("James Harden", 29), + "Tony Parker": ("Tony Parker", 36), + "Aron Baynes": ("Aron Baynes", 32), + "Ben Simmons": ("Ben Simmons", 22), + "Blake Griffin": ("Blake Griffin", 30); + """ + Then the execution should be successful + When executing query: + """ + submit job stats; + """ + And wait 2 seconds + Then the execution should be successful + When executing query: + """ + show stats; + """ + Then the execution should be successful + And the result should be, in any order, with relax comparison: + | Type | Name | Count | + | "Tag" | "player" | 11 | + | "Space" | "vertices" | 11 | + | "Space" | "edges" | 0 | + When executing query: + """ + CLEAR SPACE IF EXISTS clear_space; + """ + Then the execution should be successful + When executing query: + """ + submit job stats; + """ + And wait 2 seconds + Then the execution should be successful + When executing query: + """ + show stats; + """ + Then the execution should be successful + And the result should be, in any order, with relax comparison: + | Type | Name | Count | + | "Tag" | "player" | 0 | + | "Space" | "vertices" | 0 | + | "Space" | "edges" | 0 | + # permission test + When executing query: + """ + CREATE USER IF NOT EXISTS clear_space_user WITH PASSWORD 'nebula'; + GRANT ROLE ADMIN ON clear_space TO clear_space_user; + """ + And wait 2 seconds + Then the execution should be successful + When connect to nebula service with user[u:clear_space_user, p:nebula] + And executing clear space + Then the result should be failed