From d7422b674f3a81f5f4e393c8300aea02ccc8cf8d Mon Sep 17 00:00:00 2001 From: jude-zhu <51590253+jude-zhu@users.noreply.github.com> Date: Thu, 2 Apr 2020 18:11:21 +0800 Subject: [PATCH] RC4 Cherrypick #2031 #2042 #2025 #2013 #2038 (#2051) * remove bigint because we don't support it (#2031) Co-authored-by: dutor <440396+dutor@users.noreply.github.com> (cherry picked from commit 3423df1f9cabadd08827cbedb4543955d281adef) * fix crash (#2042) Co-authored-by: dutor <440396+dutor@users.noreply.github.com> (cherry picked from commit eea740763e3d7385588c3e291553d9f9614cea44) * Sampling applies to all edges starting from a vertex. (#2013) * Sampling from all edges. * Update test for sampling multi edges. * Testing both inbound and outbound. * Fix clang compiling problem. * Update the sampling. Co-authored-by: dutor <440396+dutor@users.noreply.github.com> (cherry picked from commit 8810eccb5215153ba1887e59335059527db80c4e) * Fix the error when the vertex/edge does not exist on update. (#2025) * Fix the error when the vertex/edge does not exist on update. * Optimize the atomic error logic. * Fix one alignment. Co-authored-by: dutor <440396+dutor@users.noreply.github.com> (cherry picked from commit 9fc4e7f0f25c0ef642385f0470f913b5c554cfd5) * Fix issue 2009, where the timestamp default value for edges was not handled. (#2038) * Fix issue 2009, where the timestamp default value for edges was not handled. * Add test cases to cover it. * Fix one typo. 
Co-authored-by: dutor <440396+dutor@users.noreply.github.com> (cherry picked from commit 4aa9217da418d71287b866a461b4974685b06516) Co-authored-by: flyingcat <1004815462@qq.com> Co-authored-by: laura-ding <48548375+laura-ding@users.noreply.github.com> Co-authored-by: CPWstatic <13495049+CPWstatic@users.noreply.github.com> Co-authored-by: Shylock Hg <33566796+Shylock-Hg@users.noreply.github.com> --- .linters/cpp/checkKeyword.py | 1 - src/common/filter/Expressions.cpp | 4 - src/common/filter/Expressions.h | 2 +- src/graph/TraverseExecutor.cpp | 2 +- src/graph/UpdateEdgeExecutor.cpp | 2 +- src/graph/UpdateVertexExecutor.cpp | 2 +- src/graph/test/FetchVerticesTest.cpp | 12 +++ src/graph/test/SchemaTest.cpp | 29 ++++++ .../schemaMan/CreateEdgeProcessor.cpp | 15 ++- .../schemaMan/CreateTagProcessor.cpp | 4 +- src/parser/parser.yy | 3 +- src/parser/scanner.lex | 2 - src/parser/test/ScannerTest.cpp | 3 - src/parser/test/fuzzing/nebula.dict | 1 - src/storage/CommonUtils.h | 7 -- src/storage/mutate/UpdateEdgeProcessor.cpp | 56 ++++------- src/storage/mutate/UpdateEdgeProcessor.h | 4 +- src/storage/mutate/UpdateVertexProcessor.cpp | 54 +++------- src/storage/mutate/UpdateVertexProcessor.h | 4 +- src/storage/query/QueryBaseProcessor.h | 6 +- src/storage/query/QueryBaseProcessor.inl | 23 +---- src/storage/query/QueryBoundProcessor.cpp | 98 ++++++++++++++++++- src/storage/query/QueryBoundProcessor.h | 5 + src/storage/query/QueryStatsProcessor.cpp | 11 ++- src/storage/test/QueryBoundTest.cpp | 81 +++++++++++---- 25 files changed, 270 insertions(+), 161 deletions(-) diff --git a/.linters/cpp/checkKeyword.py b/.linters/cpp/checkKeyword.py index c95df039721..be85f463d7b 100755 --- a/.linters/cpp/checkKeyword.py +++ b/.linters/cpp/checkKeyword.py @@ -48,7 +48,6 @@ 'KW_INDEXES', 'KW_REBUILD', 'KW_INT', - 'KW_BIGINT', 'KW_DOUBLE', 'KW_STRING', 'KW_BOOL', diff --git a/src/common/filter/Expressions.cpp b/src/common/filter/Expressions.cpp index facf715652a..adb0222c4f5 100644 --- 
a/src/common/filter/Expressions.cpp +++ b/src/common/filter/Expressions.cpp @@ -615,8 +615,6 @@ std::string columnTypeToString(ColumnType type) { return "string"; case ColumnType::DOUBLE: return "double"; - case ColumnType::BIGINT: - return "bigint"; case ColumnType::BOOL: return "bool"; case ColumnType::TIMESTAMP: @@ -656,8 +654,6 @@ OptVariantType TypeCastingExpression::eval(Getters &getters) const { return Expression::toDouble(result.value()); case ColumnType::BOOL: return Expression::toBool(result.value()); - case ColumnType::BIGINT: - return Status::Error("Type bigint not supported yet"); } LOG(FATAL) << "casting to unknown type: " << static_cast(type_); } diff --git a/src/common/filter/Expressions.h b/src/common/filter/Expressions.h index b021c10eadb..12f1b14a705 100644 --- a/src/common/filter/Expressions.h +++ b/src/common/filter/Expressions.h @@ -19,7 +19,7 @@ class Cord; using OptVariantType = StatusOr; enum class ColumnType { - INT, STRING, DOUBLE, BIGINT, BOOL, TIMESTAMP, + INT, STRING, DOUBLE, BOOL, TIMESTAMP, }; std::string columnTypeToString(ColumnType type); diff --git a/src/graph/TraverseExecutor.cpp b/src/graph/TraverseExecutor.cpp index 3e97a3da516..c690dd20b79 100644 --- a/src/graph/TraverseExecutor.cpp +++ b/src/graph/TraverseExecutor.cpp @@ -142,7 +142,7 @@ nebula::cpp2::SupportedType TraverseExecutor::calculateExprType(Expression* exp) case Expression::kInputProp: { auto* propExp = static_cast(exp); const auto* propName = propExp->prop(); - if (inputs_ == nullptr) { + if (inputs_ == nullptr || !inputs_->hasData()) { return nebula::cpp2::SupportedType::UNKNOWN; } else { return inputs_->getColumnType(*propName); diff --git a/src/graph/UpdateEdgeExecutor.cpp b/src/graph/UpdateEdgeExecutor.cpp index 20b0c6ac685..d5e20533e2c 100644 --- a/src/graph/UpdateEdgeExecutor.cpp +++ b/src/graph/UpdateEdgeExecutor.cpp @@ -249,7 +249,7 @@ void UpdateEdgeExecutor::updateEdge(bool reversely) { return; default: std::string errMsg = - folly::stringPrintf("Maybe 
edge does not exist or filter failed, " + folly::stringPrintf("Maybe edge does not exist, " "part: %d, error code: %d!", code.get_part_id(), static_cast(code.get_code())); diff --git a/src/graph/UpdateVertexExecutor.cpp b/src/graph/UpdateVertexExecutor.cpp index ce614bec6d2..a0ec41a32ad 100644 --- a/src/graph/UpdateVertexExecutor.cpp +++ b/src/graph/UpdateVertexExecutor.cpp @@ -223,7 +223,7 @@ void UpdateVertexExecutor::execute() { break; default: std::string errMsg = - folly::stringPrintf("Maybe vertex does not exist or filter failed, " + folly::stringPrintf("Maybe vertex does not exist, " "part: %d, error code: %d!", code.get_part_id(), static_cast(code.get_code())); diff --git a/src/graph/test/FetchVerticesTest.cpp b/src/graph/test/FetchVerticesTest.cpp index 6bd4e5dde52..fcebcab29ff 100644 --- a/src/graph/test/FetchVerticesTest.cpp +++ b/src/graph/test/FetchVerticesTest.cpp @@ -422,5 +422,17 @@ TEST_F(FetchVerticesTest, NonexistentProp) { ASSERT_EQ(cpp2::ErrorCode::E_EXECUTION_ERROR, code); } } + +TEST_F(FetchVerticesTest, EmptyInput) { + // YIELD has input prop, and input is empty + { + cpp2::ExecutionResponse resp; + auto query = "GO FROM 11 over like YIELD like._dst as id " + "| FETCH PROP ON player 11 YIELD $-.id"; + auto code = client_->execute(query, resp); + ASSERT_EQ(cpp2::ErrorCode::E_SYNTAX_ERROR, code); + } +} + } // namespace graph } // namespace nebula diff --git a/src/graph/test/SchemaTest.cpp b/src/graph/test/SchemaTest.cpp index 671365d0a19..41096d3b413 100644 --- a/src/graph/test/SchemaTest.cpp +++ b/src/graph/test/SchemaTest.cpp @@ -1090,5 +1090,34 @@ TEST_F(SchemaTest, TestTagAndEdge) { LOG(FATAL) << "Space still exists after sleep " << retry << " seconds"; } +TEST_F(SchemaTest, issue2009) { + auto client = gEnv->getClient(); + ASSERT_NE(nullptr, client); + { + cpp2::ExecutionResponse resp; + std::string query = "CREATE SPACE issue2009; USE issue2009"; + auto code = client->execute(query, resp); + ASSERT_EQ(code, cpp2::ErrorCode::SUCCEEDED); 
+ } + { + cpp2::ExecutionResponse resp; + std::string query = "CREATE EDGE IF NOT EXISTS relation" + "(intimacy int default 0, " + "isReversible bool default false, " + "name string default \"N/A\", " + "startTime timestamp default 0)"; + auto code = client->execute(query, resp); + ASSERT_EQ(code, cpp2::ErrorCode::SUCCEEDED); + } + ::sleep(FLAGS_heartbeat_interval_secs + 1); + { + cpp2::ExecutionResponse resp; + std::string query = "INSERT EDGE relation (intimacy) VALUES " + "hash(\"person.Tom\") -> hash(\"person.Marry\")@0:(3)"; + auto code = client->execute(query, resp); + ASSERT_EQ(code, cpp2::ErrorCode::SUCCEEDED); + } +} + } // namespace graph } // namespace nebula diff --git a/src/meta/processors/schemaMan/CreateEdgeProcessor.cpp b/src/meta/processors/schemaMan/CreateEdgeProcessor.cpp index eefafbacf3d..3090f4e634c 100644 --- a/src/meta/processors/schemaMan/CreateEdgeProcessor.cpp +++ b/src/meta/processors/schemaMan/CreateEdgeProcessor.cpp @@ -102,8 +102,21 @@ void CreateEdgeProcessor::process(const cpp2::CreateEdgeReq& req) { } defaultValue = value->get_string_value(); break; - default: + case nebula::cpp2::SupportedType::TIMESTAMP: + if (value->getType() != nebula::cpp2::Value::Type::timestamp) { + LOG(ERROR) << "Create Edge Failed: " << name + << " type mismatch"; + handleErrorCode(cpp2::ErrorCode::E_CONFLICT); + onFinished(); + return; + } + defaultValue = folly::to(value->get_timestamp()); break; + default: + LOG(ERROR) << "Unknown type " << static_cast(column.get_type().get_type()); + handleErrorCode(cpp2::ErrorCode::E_INVALID_PARM); + onFinished(); + return; } VLOG(3) << "Get Edge Default value: Property Name " << name << ", Value " << defaultValue; diff --git a/src/meta/processors/schemaMan/CreateTagProcessor.cpp b/src/meta/processors/schemaMan/CreateTagProcessor.cpp index 4370af7bc2f..46fb5798c95 100644 --- a/src/meta/processors/schemaMan/CreateTagProcessor.cpp +++ b/src/meta/processors/schemaMan/CreateTagProcessor.cpp @@ -114,7 +114,9 @@ void 
CreateTagProcessor::process(const cpp2::CreateTagReq& req) { defaultValue = folly::to(value->get_timestamp()); break; default: - LOG(ERROR) << "Unsupported type"; + LOG(ERROR) << "Unknown type " << static_cast(column.get_type().get_type()); + handleErrorCode(cpp2::ErrorCode::E_INVALID_PARM); + onFinished(); return; } diff --git a/src/parser/parser.yy b/src/parser/parser.yy index e0d214551e4..a8754e5c6c9 100644 --- a/src/parser/parser.yy +++ b/src/parser/parser.yy @@ -103,7 +103,7 @@ static constexpr size_t MAX_ABS_INTEGER = 9223372036854775808ULL; %token KW_GO KW_AS KW_TO KW_OR KW_AND KW_XOR KW_USE KW_SET KW_FROM KW_WHERE KW_ALTER %token KW_MATCH KW_INSERT KW_VALUES KW_YIELD KW_RETURN KW_CREATE KW_VERTEX KW_OFFLINE %token KW_EDGE KW_EDGES KW_STEPS KW_OVER KW_UPTO KW_REVERSELY KW_SPACE KW_DELETE KW_FIND KW_REBUILD -%token KW_INT KW_BIGINT KW_DOUBLE KW_STRING KW_BOOL KW_TAG KW_TAGS KW_UNION KW_INTERSECT KW_MINUS +%token KW_INT KW_DOUBLE KW_STRING KW_BOOL KW_TAG KW_TAGS KW_UNION KW_INTERSECT KW_MINUS %token KW_NO KW_OVERWRITE KW_IN KW_DESCRIBE KW_DESC KW_SHOW KW_HOSTS KW_PART KW_PARTS KW_TIMESTAMP KW_ADD %token KW_PARTITION_NUM KW_REPLICA_FACTOR KW_CHARSET KW_COLLATE KW_COLLATION %token KW_DROP KW_REMOVE KW_SPACES KW_INGEST KW_INDEX KW_INDEXES @@ -484,7 +484,6 @@ type_spec | KW_DOUBLE { $$ = ColumnType::DOUBLE; } | KW_STRING { $$ = ColumnType::STRING; } | KW_BOOL { $$ = ColumnType::BOOL; } - | KW_BIGINT { $$ = ColumnType::BIGINT; } | KW_TIMESTAMP { $$ = ColumnType::TIMESTAMP; } ; diff --git a/src/parser/scanner.lex b/src/parser/scanner.lex index 5d0bf56a3b0..65ebdc280d4 100644 --- a/src/parser/scanner.lex +++ b/src/parser/scanner.lex @@ -62,7 +62,6 @@ INDEXES ([Ii][Nn][Dd][Ee][Xx][Ee][Ss]) REBUILD ([Rr][Ee][Bb][Uu][Ii][Ll][Dd]) STATUS ([Ss][Tt][Aa][Tt][Uu][Ss]) INT ([Ii][Nn][Tt]) -BIGINT ([Bb][Ii][Gg][Ii][Nn][Tt]) DOUBLE ([Dd][Oo][Uu][Bb][Ll][Ee]) STRING ([Ss][Tt][Rr][Ii][Nn][Gg]) BOOL ([Bb][Oo][Oo][Ll]) @@ -204,7 +203,6 @@ RECOVER ([Rr][Ee][Cc][Oo][Vv][Ee][Rr]) 
{INDEXES} { return TokenType::KW_INDEXES; } {REBUILD} { return TokenType::KW_REBUILD; } {INT} { return TokenType::KW_INT; } -{BIGINT} { return TokenType::KW_BIGINT; } {DOUBLE} { return TokenType::KW_DOUBLE; } {STRING} { return TokenType::KW_STRING; } {BOOL} { return TokenType::KW_BOOL; } diff --git a/src/parser/test/ScannerTest.cpp b/src/parser/test/ScannerTest.cpp index ca9b7534cb2..6bc31b4896b 100644 --- a/src/parser/test/ScannerTest.cpp +++ b/src/parser/test/ScannerTest.cpp @@ -235,9 +235,6 @@ TEST(Scanner, Basic) { CHECK_SEMANTIC_TYPE("PARTS", TokenType::KW_PARTS), CHECK_SEMANTIC_TYPE("Parts", TokenType::KW_PARTS), CHECK_SEMANTIC_TYPE("parts", TokenType::KW_PARTS), - CHECK_SEMANTIC_TYPE("BIGINT", TokenType::KW_BIGINT), - CHECK_SEMANTIC_TYPE("Bigint", TokenType::KW_BIGINT), - CHECK_SEMANTIC_TYPE("bigint", TokenType::KW_BIGINT), CHECK_SEMANTIC_TYPE("DOUBLE", TokenType::KW_DOUBLE), CHECK_SEMANTIC_TYPE("double", TokenType::KW_DOUBLE), CHECK_SEMANTIC_TYPE("STRING", TokenType::KW_STRING), diff --git a/src/parser/test/fuzzing/nebula.dict b/src/parser/test/fuzzing/nebula.dict index 0a2129fd565..706154bb950 100644 --- a/src/parser/test/fuzzing/nebula.dict +++ b/src/parser/test/fuzzing/nebula.dict @@ -76,7 +76,6 @@ keyword_REVERSELY="REVERSELY" keyword_SPACE="SPACE" keyword_SPACES="SPACES" keyword_INT="INT" -keyword_BIGINT="BIGINT" keyword_DOUBLE="DOUBLE" keyword_STRING="STRING" keyword_BOOL="BOOL" diff --git a/src/storage/CommonUtils.h b/src/storage/CommonUtils.h index 546c1d429e7..e55678b9694 100644 --- a/src/storage/CommonUtils.h +++ b/src/storage/CommonUtils.h @@ -128,13 +128,6 @@ bool checkDataExpiredForTTL(const meta::SchemaProviderIf* schema, int64_t ttlDuration); -enum class FilterResult { - SUCCEEDED = 0, // pass filter - E_FILTER_OUT = -1, // filter out - E_ERROR = -2, // exception when filter - E_BAD_SCHEMA = -3, // Bad schema -}; - } // namespace storage } // namespace nebula diff --git a/src/storage/mutate/UpdateEdgeProcessor.cpp 
b/src/storage/mutate/UpdateEdgeProcessor.cpp index 309403b463a..e947d412ee4 100644 --- a/src/storage/mutate/UpdateEdgeProcessor.cpp +++ b/src/storage/mutate/UpdateEdgeProcessor.cpp @@ -328,23 +328,20 @@ std::string UpdateEdgeProcessor::updateAndWriteBack(PartitionID partId, } -FilterResult UpdateEdgeProcessor::checkFilter(const PartitionID partId, - const cpp2::EdgeKey& edgeKey) { +cpp2::ErrorCode UpdateEdgeProcessor::checkFilter(const PartitionID partId, + const cpp2::EdgeKey& edgeKey) { auto ret = collectEdgesProps(partId, edgeKey); - switch (ret) { - case kvstore::ResultCode::SUCCEEDED: - break; - case kvstore::ResultCode::ERR_CORRUPT_DATA: - return FilterResult::E_BAD_SCHEMA; - default: - return FilterResult::E_ERROR; + if (ret == kvstore::ResultCode::ERR_CORRUPT_DATA) { + return cpp2::ErrorCode::E_EDGE_NOT_FOUND; + } else if (ret != kvstore::ResultCode::SUCCEEDED) { + return to(ret); } for (auto& tc : this->tagContexts_) { VLOG(3) << "partId " << partId << ", vId " << edgeKey.src << ", tagId " << tc.tagId_ << ", prop size " << tc.props_.size(); ret = collectVertexProps(partId, edgeKey.src, tc.tagId_, tc.props_); if (ret != kvstore::ResultCode::SUCCEEDED) { - return FilterResult::E_ERROR; + return to(ret); } } @@ -379,14 +376,14 @@ FilterResult UpdateEdgeProcessor::checkFilter(const PartitionID partId, auto filterResult = this->exp_->eval(getters); if (!filterResult.ok()) { VLOG(1) << "Invalid filter expression"; - return FilterResult::E_ERROR; + return cpp2::ErrorCode::E_INVALID_FILTER; } if (!Expression::asBool(filterResult.value())) { VLOG(1) << "Filter skips the update"; - return FilterResult::E_FILTER_OUT; + return cpp2::ErrorCode::E_FILTER_OUT; } } - return FilterResult::SUCCEEDED; + return cpp2::ErrorCode::SUCCEEDED; } @@ -488,18 +485,11 @@ void UpdateEdgeProcessor::process(const cpp2::UpdateEdgeRequest& req) { // TODO(shylock) the AtomicOP can't return various error // so put it in the processor filterResult_ = checkFilter(partId, edgeKey); - switch 
(filterResult_) { - case FilterResult::SUCCEEDED : { + if (filterResult_ == cpp2::ErrorCode::SUCCEEDED) { return updateAndWriteBack(partId, edgeKey); - } - case FilterResult::E_FILTER_OUT: - // fallthrough - case FilterResult::E_ERROR: - // fallthrough - default: { + } else { return folly::none; } - } }, [this, partId, edgeKey, req] (kvstore::ResultCode code) { while (true) { @@ -518,24 +508,12 @@ void UpdateEdgeProcessor::process(const cpp2::UpdateEdgeRequest& req) { break; } if (code == kvstore::ResultCode::ERR_ATOMIC_OP_FAILED) { - switch (filterResult_) { - case FilterResult::E_FILTER_OUT: - // Filter out - // https://github.com/vesoft-inc/nebula/issues/1888 - // Only filter out so we still return the data - onProcessFinished(req.get_return_columns().size()); - this->pushResultCode(cpp2::ErrorCode::E_FILTER_OUT, partId); - break; - case FilterResult::E_ERROR: - this->pushResultCode(cpp2::ErrorCode::E_INVALID_FILTER, partId); - break; - case FilterResult::E_BAD_SCHEMA: - this->pushResultCode(cpp2::ErrorCode::E_EDGE_NOT_FOUND, partId); - break; - default: - this->pushResultCode(to(code), partId); - break; + // https://github.com/vesoft-inc/nebula/issues/1888 + // Only filter out so we still return the data + if (filterResult_ == cpp2::ErrorCode::E_FILTER_OUT) { + onProcessFinished(req.get_return_columns().size()); } + this->pushResultCode(filterResult_, partId); } else { this->pushResultCode(to(code), partId); } diff --git a/src/storage/mutate/UpdateEdgeProcessor.h b/src/storage/mutate/UpdateEdgeProcessor.h index 5414548d535..e83381f0180 100644 --- a/src/storage/mutate/UpdateEdgeProcessor.h +++ b/src/storage/mutate/UpdateEdgeProcessor.h @@ -53,7 +53,7 @@ class UpdateEdgeProcessor kvstore::ResultCode collectEdgesProps(const PartitionID partId, const cpp2::EdgeKey& edgeKey); - FilterResult checkFilter(const PartitionID partId, const cpp2::EdgeKey& edgeKey); + cpp2::ErrorCode checkFilter(const PartitionID partId, const cpp2::EdgeKey& edgeKey); std::string 
updateAndWriteBack(PartitionID partId, const cpp2::EdgeKey& edgeKey); @@ -68,7 +68,7 @@ class UpdateEdgeProcessor std::unique_ptr updater_; meta::IndexManager* indexMan_{nullptr}; std::vector> indexes_; - std::atomic filterResult_{FilterResult::E_ERROR}; + std::atomic filterResult_{cpp2::ErrorCode::SUCCEEDED}; }; } // namespace storage diff --git a/src/storage/mutate/UpdateVertexProcessor.cpp b/src/storage/mutate/UpdateVertexProcessor.cpp index 01dbea5e0aa..7b5879d74c1 100644 --- a/src/storage/mutate/UpdateVertexProcessor.cpp +++ b/src/storage/mutate/UpdateVertexProcessor.cpp @@ -158,18 +158,15 @@ kvstore::ResultCode UpdateVertexProcessor::collectVertexProps( } -FilterResult UpdateVertexProcessor::checkFilter(const PartitionID partId, const VertexID vId) { +cpp2::ErrorCode UpdateVertexProcessor::checkFilter(const PartitionID partId, const VertexID vId) { for (auto& tc : this->tagContexts_) { VLOG(3) << "partId " << partId << ", vId " << vId << ", tagId " << tc.tagId_ << ", prop size " << tc.props_.size(); auto ret = collectVertexProps(partId, vId, tc.tagId_, tc.props_); - switch (ret) { - case kvstore::ResultCode::SUCCEEDED: - break; - case kvstore::ResultCode::ERR_CORRUPT_DATA: - return FilterResult::E_BAD_SCHEMA; - default: - return FilterResult::E_ERROR; + if (ret == kvstore::ResultCode::ERR_CORRUPT_DATA) { + return cpp2::ErrorCode::E_TAG_NOT_FOUND; + } else if (ret != kvstore::ResultCode::SUCCEEDED) { + return to(ret); } } @@ -194,14 +191,14 @@ FilterResult UpdateVertexProcessor::checkFilter(const PartitionID partId, const if (this->exp_ != nullptr) { auto filterResult = this->exp_->eval(getters); if (!filterResult.ok()) { - return FilterResult::E_ERROR; + return cpp2::ErrorCode::E_INVALID_FILTER; } if (!Expression::asBool(filterResult.value())) { VLOG(1) << "Filter skips the update"; - return FilterResult::E_FILTER_OUT; + return cpp2::ErrorCode::E_FILTER_OUT; } } - return FilterResult::SUCCEEDED; + return cpp2::ErrorCode::SUCCEEDED; } @@ -428,19 +425,11 @@ 
void UpdateVertexProcessor::process(const cpp2::UpdateVertexRequest& req) { // TODO(shylock) the AtomicOP can't return various error // so put it in the processor filterResult_ = checkFilter(partId, vId); - switch (filterResult_) { - case FilterResult::SUCCEEDED : { + if (filterResult_ == cpp2::ErrorCode::SUCCEEDED) { return updateAndWriteBack(partId, vId); - } - case FilterResult::E_FILTER_OUT: - // Fallthrough - case FilterResult::E_ERROR: - // Fallthrough - case FilterResult::E_BAD_SCHEMA: - default: { + } else { return folly::none; } - } }, [this, partId, vId, req] (kvstore::ResultCode code) { while (true) { @@ -455,24 +444,13 @@ void UpdateVertexProcessor::process(const cpp2::UpdateVertexRequest& req) { break; } if (code == kvstore::ResultCode::ERR_ATOMIC_OP_FAILED) { - switch (filterResult_) { - case FilterResult::E_FILTER_OUT: - // Filter out - // https://github.com/vesoft-inc/nebula/issues/1888 - // Only filter out so we still return the data - onProcessFinished(req.get_return_columns().size()); - this->pushResultCode(cpp2::ErrorCode::E_FILTER_OUT, partId); - break; - case FilterResult::E_ERROR: - this->pushResultCode(cpp2::ErrorCode::E_INVALID_FILTER, partId); - break; - case FilterResult::E_BAD_SCHEMA: - this->pushResultCode(cpp2::ErrorCode::E_TAG_NOT_FOUND, partId); - break; - default: - this->pushResultCode(to(code), partId); - break; + if (filterResult_ == cpp2::ErrorCode::E_FILTER_OUT) { + // Filter out + // https://github.com/vesoft-inc/nebula/issues/1888 + // Only filter out so we still return the data + onProcessFinished(req.get_return_columns().size()); } + this->pushResultCode(filterResult_, partId); } else { this->pushResultCode(to(code), partId); } diff --git a/src/storage/mutate/UpdateVertexProcessor.h b/src/storage/mutate/UpdateVertexProcessor.h index 8483b670761..2184569e041 100644 --- a/src/storage/mutate/UpdateVertexProcessor.h +++ b/src/storage/mutate/UpdateVertexProcessor.h @@ -57,7 +57,7 @@ class UpdateVertexProcessor const TagID tagId, 
const std::vector& props); - FilterResult checkFilter(const PartitionID partId, const VertexID vId); + cpp2::ErrorCode checkFilter(const PartitionID partId, const VertexID vId); std::string updateAndWriteBack(const PartitionID partId, const VertexID vId); @@ -70,7 +70,7 @@ class UpdateVertexProcessor std::unordered_map> tagUpdaters_; meta::IndexManager* indexMan_{nullptr}; std::vector> indexes_; - std::atomic filterResult_{FilterResult::E_ERROR}; + std::atomic filterResult_{cpp2::ErrorCode::SUCCEEDED}; }; } // namespace storage diff --git a/src/storage/query/QueryBaseProcessor.h b/src/storage/query/QueryBaseProcessor.h index eb028dc7a71..4409a04d80e 100644 --- a/src/storage/query/QueryBaseProcessor.h +++ b/src/storage/query/QueryBaseProcessor.h @@ -27,9 +27,8 @@ const std::unordered_map kPropsInKey_ = }; using EdgeProcessor - = std::function& props)>; + = std::function reader, + folly::StringPiece key)>; struct Bucket { std::vector> vertices_; }; @@ -106,7 +105,6 @@ class QueryBaseProcessor : public BaseProcessor { PartitionID partId, VertexID vId, EdgeType edgeType, - const std::vector& props, FilterContext* fcontext, EdgeProcessor proc); diff --git a/src/storage/query/QueryBaseProcessor.inl b/src/storage/query/QueryBaseProcessor.inl index 56ec947cb8b..10f4553e30c 100644 --- a/src/storage/query/QueryBaseProcessor.inl +++ b/src/storage/query/QueryBaseProcessor.inl @@ -480,7 +480,6 @@ kvstore::ResultCode QueryBaseProcessor::collectEdgeProps( PartitionID partId, VertexID vId, EdgeType edgeType, - const std::vector& props, FilterContext* fcontext, EdgeProcessor proc) { auto prefix = NebulaKeyUtils::edgePrefix(partId, vId, edgeType); @@ -496,15 +495,6 @@ kvstore::ResultCode QueryBaseProcessor::collectEdgeProps( int cnt = 0; bool onlyStructure = onlyStructures_[edgeType]; Getters getters; - std::unique_ptr, std::string>>> sampler; - if (FLAGS_enable_reservoir_sampling) { - sampler = std::make_unique< - nebula::algorithm::ReservoirSampling< - std::pair, std::string> - > 
- >(FLAGS_max_edge_returned_per_vertex); - } auto schema = this->schemaMan_->getEdgeSchema(spaceId_, std::abs(edgeType)); auto retTTL = getEdgeTTLInfo(edgeType); @@ -605,24 +595,13 @@ kvstore::ResultCode QueryBaseProcessor::collectEdgeProps( } } - if (FLAGS_enable_reservoir_sampling) { - sampler->sampling(std::make_pair(std::move(reader), key.str())); - } else { - proc(reader.get(), key, props); - } + proc(std::move(reader), key); ++cnt; if (firstLoop) { firstLoop = false; } } - if (FLAGS_enable_reservoir_sampling) { - auto samples = std::move(*sampler).samples(); - for (auto& sample : samples) { - proc(sample.first.get(), sample.second, props); - } - } - return ret; } diff --git a/src/storage/query/QueryBoundProcessor.cpp b/src/storage/query/QueryBoundProcessor.cpp index 8ee5fae110f..cabc6219da6 100644 --- a/src/storage/query/QueryBoundProcessor.cpp +++ b/src/storage/query/QueryBoundProcessor.cpp @@ -34,18 +34,19 @@ kvstore::ResultCode QueryBoundProcessor::processEdgeImpl(const PartitionID partI std::vector edges; edges.reserve(FLAGS_reserved_edges_one_vertex); auto ret = collectEdgeProps( - partId, vId, edgeType, props, &fcontext, - [&, this](RowReader* reader, folly::StringPiece k, const std::vector& p) { + partId, vId, edgeType, &fcontext, + [&, this](std::unique_ptr reader, + folly::StringPiece k) { cpp2::IdAndProp edge; if (!onlyStructure) { RowWriter writer(currEdgeSchema); PropsCollector collector(&writer); - this->collectProps(reader, k, p, &fcontext, &collector); + this->collectProps(reader.get(), k, props, &fcontext, &collector); edge.set_dst(collector.getDstId()); edge.set_props(writer.encode()); } else { PropsCollector collector(nullptr); - this->collectProps(reader, k, p, &fcontext, &collector); + this->collectProps(reader.get(), k, props, &fcontext, &collector); edge.set_dst(collector.getDstId()); } edges.emplace_back(std::move(edge)); @@ -80,6 +81,88 @@ kvstore::ResultCode QueryBoundProcessor::processEdge(PartitionID partId, VertexI return 
kvstore::ResultCode::SUCCEEDED; } +kvstore::ResultCode QueryBoundProcessor::processEdgeSampling(const PartitionID partId, + const VertexID vId, + FilterContext& fcontext, + cpp2::VertexData& vdata) { + using Sample = std::tuple< + EdgeType, /* type */ + std::string, /* key */ + std::unique_ptr, /* val */ + std::shared_ptr, /* schema of this value*/ + const std::vector* /* props needed */>; + auto sampler = std::make_unique< + nebula::algorithm::ReservoirSampling + >(FLAGS_max_edge_returned_per_vertex); + + for (const auto& ec : edgeContexts_) { + auto edgeType = ec.first; + auto* props = &ec.second; + bool onlyStructure = onlyStructures_[edgeType]; + std::shared_ptr currEdgeSchema; + if (!onlyStructure) { + auto schema = edgeSchema_.find(edgeType); + if (schema == edgeSchema_.end()) { + LOG(ERROR) << "Not found the edge type: " << edgeType; + return kvstore::ResultCode::ERR_EDGE_NOT_FOUND; + } + currEdgeSchema = schema->second; + } + if (!props->empty()) { + CHECK(!onlyVertexProps_); + auto ret = collectEdgeProps( + partId, vId, edgeType, &fcontext, + [&](std::unique_ptr reader, + folly::StringPiece k) { + sampler->sampling( + std::make_tuple(edgeType, k.str(), std::move(reader), + currEdgeSchema, props)); + }); + if (ret != kvstore::ResultCode::SUCCEEDED) { + return ret; + } + } + } + + std::unordered_map edgeDataMap; + auto samples = std::move(*sampler).samples(); + for (auto& sample : samples) { + auto edgeType = std::get<0>(sample); + auto currEdgeSchema = std::get<3>(sample); + auto& props = *std::get<4>(sample); + bool onlyStructure = onlyStructures_[edgeType]; + cpp2::IdAndProp edge; + if (!onlyStructure) { + RowWriter writer(currEdgeSchema); + PropsCollector collector(&writer); + this->collectProps( + std::get<2>(sample).get(), std::get<1>(sample), props, &fcontext, &collector); + edge.set_dst(collector.getDstId()); + edge.set_props(writer.encode()); + } else { + PropsCollector collector(nullptr); + this->collectProps( + std::get<2>(sample).get(), 
std::get<1>(sample), props, &fcontext, &collector); + edge.set_dst(collector.getDstId()); + } + auto edges = edgeDataMap.find(edgeType); + if (edges == edgeDataMap.end()) { + cpp2::EdgeData edgeData; + edgeData.set_type(edgeType); + edgeData.edges.emplace_back(std::move(edge)); + edgeDataMap.emplace(edgeType, std::move(edgeData)); + } else { + edges->second.edges.emplace_back(std::move(edge)); + } + } + + std::transform(edgeDataMap.begin(), edgeDataMap.end(), std::back_inserter(vdata.edge_data), + [] (auto& data) { + return std::move(data).second; + }); + return kvstore::ResultCode::SUCCEEDED; +} + kvstore::ResultCode QueryBoundProcessor::processVertex(PartitionID partId, VertexID vId) { cpp2::VertexData vResp; vResp.set_vertex_id(vId); @@ -119,7 +202,12 @@ kvstore::ResultCode QueryBoundProcessor::processVertex(PartitionID partId, Verte return kvstore::ResultCode::SUCCEEDED; } - auto ret = processEdge(partId, vId, fcontext, vResp); + kvstore::ResultCode ret; + if (FLAGS_enable_reservoir_sampling) { + ret = processEdgeSampling(partId, vId, fcontext, vResp); + } else { + ret = processEdge(partId, vId, fcontext, vResp); + } if (ret != kvstore::ResultCode::SUCCEEDED) { return ret; diff --git a/src/storage/query/QueryBoundProcessor.h b/src/storage/query/QueryBoundProcessor.h index 333dd684b5f..62671b3d9c5 100644 --- a/src/storage/query/QueryBoundProcessor.h +++ b/src/storage/query/QueryBoundProcessor.h @@ -47,6 +47,11 @@ class QueryBoundProcessor kvstore::ResultCode processEdge(PartitionID partId, VertexID vId, FilterContext &fcontext, cpp2::VertexData& vdata); + kvstore::ResultCode processEdgeSampling(const PartitionID partId, + const VertexID vId, + FilterContext& fcontext, + cpp2::VertexData& vdata); + kvstore::ResultCode processEdgeImpl(const PartitionID partId, const VertexID vId, const EdgeType edgeType, const std::vector& props, diff --git a/src/storage/query/QueryStatsProcessor.cpp b/src/storage/query/QueryStatsProcessor.cpp index f06b93c5263..2a7c4fc9c85 100644 
--- a/src/storage/query/QueryStatsProcessor.cpp +++ b/src/storage/query/QueryStatsProcessor.cpp @@ -85,11 +85,12 @@ kvstore::ResultCode QueryStatsProcessor::processVertex(PartitionID partId, auto edgeType = ec.first; auto& props = ec.second; if (!props.empty()) { - auto r = this->collectEdgeProps(partId, vId, edgeType, props, &fcontext, - [&, this](RowReader* reader, folly::StringPiece key, - const std::vector& p) { - this->collectProps(reader, key, p, &fcontext, - &collector_); + auto r = this->collectEdgeProps(partId, vId, edgeType, &fcontext, + [&, this](std::unique_ptr reader, + folly::StringPiece key) { + this->collectProps( + reader.get(), key, props, &fcontext, + &collector_); }); if (r != kvstore::ResultCode::SUCCEEDED) { return r; diff --git a/src/storage/test/QueryBoundTest.cpp b/src/storage/test/QueryBoundTest.cpp index f9e1c1fe100..f81696ca4b7 100644 --- a/src/storage/test/QueryBoundTest.cpp +++ b/src/storage/test/QueryBoundTest.cpp @@ -206,6 +206,8 @@ void checkSamplingResponse(cpp2::QueryResponse& resp, int32_t edgeFields, int32_t dstIdStart, int32_t dstIdEnd, + int32_t dstIdStartReverse, + int32_t dstIdEndReverse, int32_t edgeNum) { EXPECT_EQ(0, resp.result.failed_codes.size()); @@ -242,16 +244,21 @@ void checkSamplingResponse(cpp2::QueryResponse& resp, checkTagData(vp.tag_data, 3005, "tag_3005_col_4", vschema, folly::stringPrintf("tag_string_col_4")); + int32_t rowNum = 0; for (auto& ep : vp.edge_data) { auto it2 = schema.find(ep.type); DCHECK(it2 != schema.end()); auto provider = it2->second; - int32_t rowNum = 0; for (auto& edge : ep.get_edges()) { auto dst = edge.get_dst(); VLOG(1) << "Check edge " << vp.vertex_id << " -> " << dst << " props..."; - CHECK_LE(dstIdStart, dst); - CHECK_GE(dstIdEnd, dst); + if (ep.type < 0) { + CHECK_LE(dstIdStartReverse, dst); + CHECK_GE(dstIdEndReverse, dst); + } else { + CHECK_LE(dstIdStart, dst); + CHECK_GE(dstIdEnd, dst); + } auto reader = RowReader::getRowReader(edge.props, provider); DCHECK(reader != nullptr); 
EXPECT_EQ(edgeFields, reader->numFields() + 1); @@ -275,9 +282,9 @@ void checkSamplingResponse(cpp2::QueryResponse& resp, } rowNum++; } - EXPECT_EQ(edgeNum, rowNum); - totalEdges += rowNum; } + totalEdges += rowNum; + EXPECT_EQ(edgeNum, rowNum); } EXPECT_EQ(totalEdges, *resp.get_total_edges()); } @@ -306,7 +313,7 @@ TEST(QueryBoundTest, OutBoundSimpleTest) { checkResponse(resp, 30, 12, 10001, 7); } -TEST(QueryBoundTest, inBoundSimpleTest) { +TEST(QueryBoundTest, InBoundSimpleTest) { fs::TempDir rootPath("/tmp/QueryBoundTest.XXXXXX"); LOG(INFO) << "Prepare meta..."; std::unique_ptr kv = TestUtils::initKV(rootPath.path()); @@ -593,21 +600,59 @@ TEST(QueryBoundTest, SamplingTest) { auto schemaMan = TestUtils::mockSchemaMan(); mockData(kv.get()); - cpp2::GetNeighborsRequest req; - std::vector et = {101}; - buildRequest(req, et); + { + cpp2::GetNeighborsRequest req; + std::vector et = {101}; + buildRequest(req, et); - LOG(INFO) << "Test QueryOutBoundRequest..."; - auto executor = std::make_unique(3); - auto* processor = QueryBoundProcessor::instance(kv.get(), schemaMan.get(), - nullptr, executor.get()); - auto f = processor->getFuture(); - processor->process(req); - auto resp = std::move(f).get(); + LOG(INFO) << "Test QueryOutBoundRequest..."; + auto executor = std::make_unique(3); + auto* processor = QueryBoundProcessor::instance(kv.get(), schemaMan.get(), + nullptr, executor.get()); + auto f = processor->getFuture(); + processor->process(req); + auto resp = std::move(f).get(); + + LOG(INFO) << "Check the results..."; + checkSamplingResponse(resp, 30, 12, 10001, 10007, 20001, 20005, + FLAGS_max_edge_returned_per_vertex); + } + { + cpp2::GetNeighborsRequest req; + std::vector et = {101, 102, 103}; + buildRequest(req, et); - LOG(INFO) << "Check the results..."; - checkSamplingResponse(resp, 30, 12, 10001, 10007, FLAGS_max_edge_returned_per_vertex); + LOG(INFO) << "Test QueryOutBoundRequest..."; + auto executor = std::make_unique(3); + auto* processor = 
QueryBoundProcessor::instance(kv.get(), schemaMan.get(), + nullptr, executor.get()); + auto f = processor->getFuture(); + processor->process(req); + auto resp = std::move(f).get(); + + LOG(INFO) << "Check the results..."; + checkSamplingResponse(resp, 30, 12, 10001, 10007, 20001, 20005, + FLAGS_max_edge_returned_per_vertex); + } + { + cpp2::GetNeighborsRequest req; + std::vector et = {101, -101}; + buildRequest(req, et); + + LOG(INFO) << "Test QueryOutBoundRequest..."; + auto executor = std::make_unique(3); + auto* processor = QueryBoundProcessor::instance(kv.get(), schemaMan.get(), + nullptr, executor.get()); + auto f = processor->getFuture(); + processor->process(req); + auto resp = std::move(f).get(); + + LOG(INFO) << "Check the results..."; + checkSamplingResponse(resp, 30, 12, 10001, 10007, 20001, 20005, + FLAGS_max_edge_returned_per_vertex); + } FLAGS_max_edge_returned_per_vertex = old_max_edge_returned; + FLAGS_enable_reservoir_sampling = false; } } // namespace storage } // namespace nebula