From 5794cdd9f29cd55051dcbf392befe942401e7d38 Mon Sep 17 00:00:00 2001 From: Doodle <13706157+critical27@users.noreply.github.com> Date: Fri, 31 Jul 2020 14:00:35 +0800 Subject: [PATCH 1/8] bench base --- src/storage/exec/EdgeNode.h | 4 +- src/storage/exec/TagNode.h | 2 +- src/storage/test/GetNeighborsBenchmark.cpp | 363 +++++++++++++-------- 3 files changed, 235 insertions(+), 134 deletions(-) diff --git a/src/storage/exec/EdgeNode.h b/src/storage/exec/EdgeNode.h index 949d4bebb..2b9b88d54 100644 --- a/src/storage/exec/EdgeNode.h +++ b/src/storage/exec/EdgeNode.h @@ -138,7 +138,7 @@ class FetchEdgeNode final : public EdgeNode { } else { iter_.reset(); } - return kvstore::ResultCode::SUCCEEDED; + return ret; } }; @@ -170,7 +170,7 @@ class SingleEdgeNode final : public EdgeNode { } else { iter_.reset(); } - return kvstore::ResultCode::SUCCEEDED; + return ret; } }; diff --git a/src/storage/exec/TagNode.h b/src/storage/exec/TagNode.h index 1cb5e4966..415e9e8c4 100644 --- a/src/storage/exec/TagNode.h +++ b/src/storage/exec/TagNode.h @@ -65,7 +65,7 @@ class TagNode final : public IterateNode { } else { iter_.reset(); } - return kvstore::ResultCode::SUCCEEDED; + return ret; } kvstore::ResultCode collectTagPropsIfValid(NullHandler nullHandler, diff --git a/src/storage/test/GetNeighborsBenchmark.cpp b/src/storage/test/GetNeighborsBenchmark.cpp index 8570f64f5..4c83ad518 100644 --- a/src/storage/test/GetNeighborsBenchmark.cpp +++ b/src/storage/test/GetNeighborsBenchmark.cpp @@ -9,9 +9,12 @@ #include "common/fs/TempDir.h" #include "storage/query/GetNeighborsProcessor.h" #include "storage/test/QueryTestUtils.h" +#include "storage/exec/EdgeNode.h" DEFINE_uint64(max_rank, 1000, "max rank of each edge"); DEFINE_double(filter_ratio, 0.5, "ratio of data would pass filter"); +DEFINE_bool(go_record, false, ""); +DEFINE_bool(kv_record, false, ""); std::unique_ptr gCluster; @@ -46,6 +49,37 @@ void setUp(const char* path, EdgeRanking maxRank) { } // namespace storage } // namespace 
nebula +void initContext(std::unique_ptr& planCtx, + nebula::storage::EdgeContext& edgeContext, + const std::vector& serveProps) { + nebula::GraphSpaceID spaceId = 1; + auto* env = gCluster->storageEnv_.get(); + auto vIdLen = env->schemaMan_->getSpaceVidLen(spaceId).value(); + planCtx = std::make_unique(env, spaceId, vIdLen); + + nebula::EdgeType serve = 101; + edgeContext.schemas_ = std::move(env->schemaMan_->getAllVerEdgeSchema(spaceId)).value(); + + auto edgeName = env->schemaMan_->toEdgeName(spaceId, std::abs(serve)); + edgeContext.edgeNames_.emplace(serve, std::move(edgeName).value()); + auto iter = edgeContext.schemas_.find(std::abs(serve)); + const auto& edgeSchema = iter->second.back(); + + std::vector ctxs; + for (const auto& prop : serveProps) { + auto field = edgeSchema->field(prop); + nebula::storage::PropContext ctx(prop.c_str()); + ctx.returned_ = true; + ctx.field_ = field; + ctxs.emplace_back(std::move(ctx)); + } + edgeContext.propContexts_.emplace_back(serve, std::move(ctxs)); + edgeContext.indexMap_.emplace(serve, edgeContext.propContexts_.size() - 1); + + const auto& ec = edgeContext.propContexts_.front(); + planCtx->props_ = &ec.second; +} + void go(int32_t iters, const std::vector& vertex, const std::vector& playerProps, @@ -60,6 +94,7 @@ void go(int32_t iters, auto fut = processor->getFuture(); processor->process(req); auto resp = std::move(fut).get(); + folly::doNotOptimizeAway(resp); } } @@ -89,6 +124,148 @@ void goFilter(int32_t iters, auto fut = processor->getFuture(); processor->process(req); auto resp = std::move(fut).get(); + folly::doNotOptimizeAway(resp); + } +} + +void goEdgeNode(int32_t iters, + const std::vector& vertex, + const std::vector& playerProps, + const std::vector& serveProps) { + UNUSED(playerProps); + std::unique_ptr planCtx; + std::unique_ptr edgeNode; + nebula::storage::EdgeContext edgeContext; + BENCHMARK_SUSPEND { + initContext(planCtx, edgeContext, serveProps); + const auto& ec = edgeContext.propContexts_.front(); 
+ edgeNode = std::make_unique( + planCtx.get(), &edgeContext, ec.first, &ec.second); + } + auto totalParts = gCluster->getTotalParts(); + for (decltype(iters) i = 0; i < iters; i++) { + nebula::DataSet resultDataSet; + std::hash hash; + for (const auto& vId : vertex) { + nebula::PartitionID partId = (hash(vId) % totalParts) + 1; + std::vector row; + row.emplace_back(vId); + row.emplace_back(nebula::List()); + { + edgeNode->execute(partId, vId); + int32_t count = 0; + auto& cell = row[1].mutableList(); + for (; edgeNode->valid(); edgeNode->next()) { + nebula::List list; + auto key = edgeNode->key(); + folly::doNotOptimizeAway(key); + auto reader = edgeNode->reader(); + auto props = planCtx->props_; + for (const auto& prop : *props) { + auto value = nebula::storage::QueryUtils::readValue( + reader, prop.name_, prop.field_); + CHECK(value.ok()); + list.emplace_back(std::move(value).value()); + } + cell.values.emplace_back(std::move(list)); + count++; + } + CHECK_EQ(FLAGS_max_rank, count); + } + resultDataSet.rows.emplace_back(std::move(row)); + } + CHECK_EQ(vertex.size(), resultDataSet.rowSize()); + } +} + +void prefix(int32_t iters, + const std::vector& vertex, + const std::vector& playerProps, + const std::vector& serveProps) { + std::unique_ptr planCtx; + nebula::storage::EdgeContext edgeContext; + BENCHMARK_SUSPEND { + initContext(planCtx, edgeContext, serveProps); + } + for (decltype(iters) i = 0; i < iters; i++) { + nebula::GraphSpaceID spaceId = 1; + nebula::TagID player = 1; + nebula::EdgeType serve = 101; + + std::hash hash; + auto* env = gCluster->storageEnv_.get(); + auto vIdLen = env->schemaMan_->getSpaceVidLen(spaceId).value(); + auto totalParts = gCluster->getTotalParts(); + + auto tagSchemas = env->schemaMan_->getAllVerTagSchema(spaceId).value(); + auto tagSchemaIter = tagSchemas.find(player); + CHECK(tagSchemaIter != tagSchemas.end()); + CHECK(!tagSchemaIter->second.empty()); + auto* tagSchema = &(tagSchemaIter->second); + + auto edgeSchemas = 
env->schemaMan_->getAllVerEdgeSchema(spaceId).value(); + auto edgeSchemaIter = edgeSchemas.find(std::abs(serve)); + CHECK(edgeSchemaIter != edgeSchemas.end()); + CHECK(!edgeSchemaIter->second.empty()); + auto* edgeSchema = &(edgeSchemaIter->second); + + std::unique_ptr reader; + nebula::DataSet resultDataSet; + for (const auto& vId : vertex) { + nebula::PartitionID partId = (hash(vId) % totalParts) + 1; + std::vector row; + row.emplace_back(vId); + row.emplace_back(nebula::List()); + row.emplace_back(nebula::List()); + { + // read tags + std::unique_ptr iter; + auto prefix = nebula::NebulaKeyUtils::vertexPrefix(vIdLen, partId, vId, player); + auto code = env->kvstore_->prefix(spaceId, partId, prefix, &iter); + CHECK_EQ(code, nebula::kvstore::ResultCode::SUCCEEDED); + CHECK(iter->valid()); + auto val = iter->val(); + reader = nebula::RowReader::getRowReader(*tagSchema, val); + CHECK_NOTNULL(reader); + auto& cell = row[1].mutableList(); + for (const auto& prop : playerProps) { + cell.emplace_back(reader->getValueByName(prop)); + } + } + { + // read edges + std::unique_ptr iter; + auto prefix = nebula::NebulaKeyUtils::edgePrefix(vIdLen, partId, vId, serve); + auto code = env->kvstore_->prefix(spaceId, partId, prefix, &iter); + CHECK_EQ(code, nebula::kvstore::ResultCode::SUCCEEDED); + int32_t count = 0; + auto& cell = row[2].mutableList(); + for (; iter->valid(); iter->next()) { + nebula::List list; + auto key = iter->key(); + folly::doNotOptimizeAway(key); + auto val = iter->val(); + if (!reader) { + reader = nebula::RowReader::getRowReader(*edgeSchema, val); + CHECK_NOTNULL(reader); + } else if (!reader->reset(*edgeSchema, val)) { + LOG(FATAL) << "Should not happen"; + } + auto props = planCtx->props_; + for (const auto& prop : *props) { + auto value = nebula::storage::QueryUtils::readValue( + reader.get(), prop.name_, prop.field_); + CHECK(value.ok()); + list.emplace_back(std::move(value).value()); + } + cell.values.emplace_back(std::move(list)); + count++; + } + 
CHECK_EQ(FLAGS_max_rank, count); + } + resultDataSet.rows.emplace_back(std::move(row)); + } + CHECK_EQ(vertex.size(), resultDataSet.rowSize()); } } @@ -100,17 +277,15 @@ BENCHMARK(OneVertexOneProperty, iters) { BENCHMARK_RELATIVE(OneVertexOnePropertyWithFilter, iters) { goFilter(iters, {"Tim Duncan"}, {"name"}, {"teamName"}); } - -BENCHMARK(OneVertexThreeProperty, iters) { - go(iters, {"Tim Duncan"}, {"name", "age", "avgScore"}, {nebula::kDst, "startYear", "endYear"}); +BENCHMARK_RELATIVE(OneVertexOnePropertyOnlyEdgeNode, iters) { + goEdgeNode(iters, {"Tim Duncan"}, {"name"}, {"teamName"}); } -BENCHMARK_RELATIVE(OneVertexThreePropertyWithFilter, iters) { - goFilter(iters, - {"Tim Duncan"}, - {"name", "age", "avgScore"}, - {nebula::kDst, "startYear", "endYear"}); +BENCHMARK_RELATIVE(OneVertexOneProperyOnlyKV, iters) { + prefix(iters, {"Tim Duncan"}, {"name"}, {"teamName"}); } +BENCHMARK_DRAW_LINE(); + BENCHMARK(TenVertexOneProperty, iters) { go(iters, {"Tim Duncan", "Kobe Bryant", "Stephen Curry", "Manu Ginobili", "Joel Embiid", @@ -126,159 +301,85 @@ BENCHMARK_RELATIVE(TenVertexOnePropertyWithFilter, iters) { {"name"}, {"teamName"}); } - -BENCHMARK(TenVertexThreeProperty, iters) { - go(iters, - {"Tim Duncan", "Kobe Bryant", "Stephen Curry", "Manu Ginobili", "Joel Embiid", +BENCHMARK_RELATIVE(TenVertexOnePropertyOnlyEdgeNode, iters) { + goEdgeNode( + iters, + {"Tim Duncan", "Kobe Bryant", "Stephen Curry", "Manu Ginobili", "Joel Embiid", "Giannis Antetokounmpo", "Yao Ming", "Damian Lillard", "Dirk Nowitzki", "Klay Thompson"}, - {"name", "age", "avgScore"}, - {nebula::kDst, "startYear", "endYear"}); + {"name"}, + {"teamName"}); } -BENCHMARK_RELATIVE(TenVertexThreePropertyWithFilter, iters) { - goFilter( +BENCHMARK_RELATIVE(TenVertexOneProperyOnlyKV, iters) { + prefix( iters, {"Tim Duncan", "Kobe Bryant", "Stephen Curry", "Manu Ginobili", "Joel Embiid", "Giannis Antetokounmpo", "Yao Ming", "Damian Lillard", "Dirk Nowitzki", "Klay Thompson"}, - {"name", "age", 
"avgScore"}, - {nebula::kDst, "startYear", "endYear"}); + {"name"}, + {"teamName"}); } int main(int argc, char** argv) { folly::init(&argc, &argv, true); nebula::fs::TempDir rootPath("/tmp/GetNeighborsBenchmark.XXXXXX"); nebula::storage::setUp(rootPath.path(), FLAGS_max_rank); - folly::runBenchmarks(); + if (FLAGS_go_record) { + go(1000000, {"Tim Duncan"}, {"name"}, {"teamName"}); + } else if (FLAGS_kv_record) { + prefix(1000000, {"Tim Duncan"}, {"name"}, {"teamName"}); + } else { + folly::runBenchmarks(); + } gCluster.reset(); return 0; } /* -Debug: No concurrency - ---max_rank=100 --filter_ratio=0.1 -============================================================================ -GetNeighborsBenchmark.cpprelative time/iter iters/s -============================================================================ -OneVertexOneProperty 660.59us 1.51K -OneVertexOnePropertyWithFilter 128.11% 515.66us 1.94K -OneVertexThreeProperty 879.71us 1.14K -OneVertexThreePropertyWithFilter 162.34% 541.89us 1.85K -TenVertexOneProperty 5.79ms 172.80 -TenVertexOnePropertyWithFilter 138.86% 4.17ms 239.96 -TenVertexThreeProperty 7.88ms 126.87 -TenVertexThreePropertyWithFilter 178.17% 4.42ms 226.04 -============================================================================ +40 processors, Intel(R) Xeon(R) CPU E5-2690 v2 @ 3.00GHz. 
+release --max_rank=1000 --filter_ratio=0.1 ============================================================================ -GetNeighborsBenchmark.cpprelative time/iter iters/s -============================================================================ -OneVertexOneProperty 5.39ms 185.55 -OneVertexOnePropertyWithFilter 142.80% 3.77ms 264.98 -OneVertexThreeProperty 7.39ms 135.31 -OneVertexThreePropertyWithFilter 184.83% 4.00ms 250.10 -TenVertexOneProperty 52.48ms 19.06 -TenVertexOnePropertyWithFilter 142.63% 36.79ms 27.18 -TenVertexThreeProperty 72.96ms 13.71 -TenVertexThreePropertyWithFilter 186.05% 39.22ms 25.50 -============================================================================ - ---max_rank=10000 --filter_ratio=0.1 -============================================================================ -GetNeighborsBenchmark.cpprelative time/iter iters/s +/home/doodle.wang/Git/nebula-storage/src/storage/test/GetNeighborsBenchmark.cpprelative time/iter iters/s ============================================================================ -OneVertexOneProperty 58.37ms 17.13 -OneVertexOnePropertyWithFilter 142.90% 40.85ms 24.48 -OneVertexThreeProperty 81.73ms 12.24 -OneVertexThreePropertyWithFilter 187.62% 43.56ms 22.96 -TenVertexOneProperty 587.09ms 1.70 -TenVertexOnePropertyWithFilter 142.45% 412.15ms 2.43 -TenVertexThreeProperty 833.53ms 1.20 -TenVertexThreePropertyWithFilter 190.63% 437.25ms 2.29 -============================================================================ - ---max_rank=100 --filter_ratio=0.5 -============================================================================ -GetNeighborsBenchmark.cpprelative time/iter iters/s -============================================================================ -OneVertexOneProperty 661.60us 1.51K -OneVertexOnePropertyWithFilter 96.12% 688.34us 1.45K -OneVertexThreeProperty 879.63us 1.14K -OneVertexThreePropertyWithFilter 110.58% 795.49us 1.26K -TenVertexOneProperty 5.82ms 171.69 -TenVertexOnePropertyWithFilter 
98.96% 5.89ms 169.91 -TenVertexThreeProperty 7.92ms 126.26 -TenVertexThreePropertyWithFilter 114.11% 6.94ms 144.08 +OneVertexOneProperty 533.81us 1.87K +OneVertexOnePropertyWithFilter 111.64% 478.15us 2.09K +OneVertexOnePropertyOnlyEdgeNode 108.02% 494.18us 2.02K +OneVertexOneProperyOnlyKV 109.63% 486.93us 2.05K +---------------------------------------------------------------------------- +TenVertexOneProperty 5.33ms 187.70 +TenVertexOnePropertyWithFilter 112.74% 4.73ms 211.62 +TenVertexOnePropertyOnlyEdgeNode 105.42% 5.05ms 197.88 +TenVertexOneProperyOnlyKV 107.75% 4.94ms 202.25 ============================================================================ --max_rank=1000 --filter_ratio=0.5 ============================================================================ -GetNeighborsBenchmark.cpprelative time/iter iters/s -============================================================================ -OneVertexOneProperty 5.38ms 185.88 -OneVertexOnePropertyWithFilter 99.68% 5.40ms 185.30 -OneVertexThreeProperty 7.47ms 133.79 -OneVertexThreePropertyWithFilter 114.63% 6.52ms 153.36 -TenVertexOneProperty 52.43ms 19.07 -TenVertexOnePropertyWithFilter 98.43% 53.27ms 18.77 -TenVertexThreeProperty 74.08ms 13.50 -TenVertexThreePropertyWithFilter 114.83% 64.51ms 15.50 -============================================================================ - ---max_rank=10000 --filter_ratio=0.5 -============================================================================ -GetNeighborsBenchmark.cpprelative time/iter iters/s +/home/doodle.wang/Git/nebula-storage/src/storage/test/GetNeighborsBenchmark.cpprelative time/iter iters/s ============================================================================ -OneVertexOneProperty 58.58ms 17.07 -OneVertexOnePropertyWithFilter 103.72% 56.48ms 17.71 -OneVertexThreeProperty 80.95ms 12.35 -OneVertexThreePropertyWithFilter 119.57% 67.70ms 14.77 -TenVertexOneProperty 589.36ms 1.70 -TenVertexOnePropertyWithFilter 102.98% 572.28ms 1.75 
-TenVertexThreeProperty 810.38ms 1.23 -TenVertexThreePropertyWithFilter 119.14% 680.17ms 1.47 -============================================================================ - ---max_rank=100 --filter_ratio=1 -============================================================================ -GetNeighborsBenchmark.cpprelative time/iter iters/s -============================================================================ -OneVertexOneProperty 657.72us 1.52K -OneVertexOnePropertyWithFilter 75.40% 872.30us 1.15K -OneVertexThreeProperty 869.05us 1.15K -OneVertexThreePropertyWithFilter 79.84% 1.09ms 918.68 -TenVertexOneProperty 5.78ms 172.95 -TenVertexOnePropertyWithFilter 74.37% 7.77ms 128.63 -TenVertexThreeProperty 7.85ms 127.45 -TenVertexThreePropertyWithFilter 79.25% 9.90ms 101.00 +OneVertexOneProperty 529.59us 1.89K +OneVertexOnePropertyWithFilter 81.76% 647.75us 1.54K +OneVertexOnePropertyOnlyEdgeNode 107.54% 492.47us 2.03K +OneVertexOneProperyOnlyKV 108.38% 488.65us 2.05K +---------------------------------------------------------------------------- +TenVertexOneProperty 5.30ms 188.54 +TenVertexOnePropertyWithFilter 81.95% 6.47ms 154.50 +TenVertexOnePropertyOnlyEdgeNode 106.09% 5.00ms 200.02 +TenVertexOneProperyOnlyKV 108.26% 4.90ms 204.12 ============================================================================ --max_rank=1000 --filter_ratio=1 ============================================================================ -GetNeighborsBenchmark.cpprelative time/iter iters/s -============================================================================ -OneVertexOneProperty 4.75ms 210.74 -OneVertexOnePropertyWithFilter 73.43% 6.46ms 154.74 -OneVertexThreeProperty 6.64ms 150.62 -OneVertexThreePropertyWithFilter 78.81% 8.42ms 118.70 -TenVertexOneProperty 47.59ms 21.01 -TenVertexOnePropertyWithFilter 72.98% 65.21ms 15.33 -TenVertexThreeProperty 66.23ms 15.10 -TenVertexThreePropertyWithFilter 78.48% 84.39ms 11.85 
-============================================================================ - ---max_rank=10000 --filter_ratio=1 -============================================================================ -GetNeighborsBenchmark.cpprelative time/iter iters/s +/home/doodle.wang/Git/nebula-storage/src/storage/test/GetNeighborsBenchmark.cpprelative time/iter iters/s ============================================================================ -OneVertexOneProperty 51.88ms 19.27 -OneVertexOnePropertyWithFilter 76.75% 67.60ms 14.79 -OneVertexThreeProperty 71.34ms 14.02 -OneVertexThreePropertyWithFilter 81.37% 87.67ms 11.41 -TenVertexOneProperty 519.31ms 1.93 -TenVertexOnePropertyWithFilter 75.84% 684.75ms 1.46 -TenVertexThreeProperty 714.19ms 1.40 -TenVertexThreePropertyWithFilter 80.26% 889.86ms 1.12 +OneVertexOneProperty 522.41us 1.91K +OneVertexOnePropertyWithFilter 62.76% 832.45us 1.20K +OneVertexOnePropertyOnlyEdgeNode 108.25% 482.58us 2.07K +OneVertexOneProperyOnlyKV 107.87% 484.31us 2.06K +---------------------------------------------------------------------------- +TenVertexOneProperty 5.30ms 188.83 +TenVertexOnePropertyWithFilter 69.70% 7.60ms 131.62 +TenVertexOnePropertyOnlyEdgeNode 119.34% 4.44ms 225.34 +TenVertexOneProperyOnlyKV 120.89% 4.38ms 228.27 ============================================================================ */ From 0fb72e9b66e169646ad7a10e6443ee17b91346eb Mon Sep 17 00:00:00 2001 From: Doodle <13706157+critical27@users.noreply.github.com> Date: Mon, 3 Aug 2020 14:15:10 +0800 Subject: [PATCH 2/8] only retrieve srcId/type/rank/dstId in collectEdgeProps if needed --- src/storage/exec/AggregateNode.h | 13 ++++--------- src/storage/exec/GetNeighborsNode.h | 7 ++----- src/storage/exec/GetPropNode.h | 4 ++-- src/storage/exec/QueryUtils.h | 10 ++++++---- src/storage/exec/RelNode.h | 9 ++------- src/storage/test/ScanEdgePropBenchmark.cpp | 12 ++++-------- 6 files changed, 20 insertions(+), 35 deletions(-) diff --git a/src/storage/exec/AggregateNode.h 
b/src/storage/exec/AggregateNode.h index e9eb5eafc..9b84d31ab 100644 --- a/src/storage/exec/AggregateNode.h +++ b/src/storage/exec/AggregateNode.h @@ -57,8 +57,7 @@ class AggregateNode : public IterateNode { void next() override { if (!stats_.empty()) { // we need to collect the stat during `next` - collectEdgeStats(srcId(), edgeType(), edgeRank(), dstId(), - this->reader(), planContext_->props_); + collectEdgeStats(this->key(), this->reader(), planContext_->props_); } IterateNode::next(); } @@ -120,18 +119,14 @@ class AggregateNode : public IterateNode { } } - kvstore::ResultCode collectEdgeStats(VertexIDSlice srcId, - EdgeType edgeType, - EdgeRanking edgeRank, - VertexIDSlice dstId, + kvstore::ResultCode collectEdgeStats(folly::StringPiece key, RowReader* reader, const std::vector* props) { for (const auto& prop : *props) { if (prop.hasStat_) { for (const auto statIndex : prop.statIndex_) { - VLOG(2) << "Collect stat prop " << prop.name_ << ", type " << edgeType; - auto value = QueryUtils::readEdgeProp(srcId, edgeType, edgeRank, dstId, - reader, prop); + VLOG(2) << "Collect stat prop " << prop.name_; + auto value = QueryUtils::readEdgeProp(key, planContext_->vIdLen_, reader, prop); if (!value.ok()) { return kvstore::ResultCode::ERR_EDGE_PROP_NOT_FOUND; } diff --git a/src/storage/exec/GetNeighborsNode.h b/src/storage/exec/GetNeighborsNode.h index 5fdbef687..790ea63b9 100644 --- a/src/storage/exec/GetNeighborsNode.h +++ b/src/storage/exec/GetNeighborsNode.h @@ -87,13 +87,11 @@ class GetNeighborsNode : public QueryNode { } auto key = aggregateNode_->key(); auto reader = aggregateNode_->reader(); - auto edgeType = planContext_->edgeType_; auto props = planContext_->props_; auto columnIdx = planContext_->columnIdx_; // collect props need to return - auto ret = collectEdgeProps(edgeType, - reader, + auto ret = collectEdgeProps(reader, key, planContext_->vIdLen_, props, @@ -176,8 +174,7 @@ class GetNeighborsSampleNode : public GetNeighborsNode { continue; } - auto ret = 
collectEdgeProps(edgeType, - reader.get(), + auto ret = collectEdgeProps(reader.get(), std::get<2>(sample), planContext_->vIdLen_, std::get<3>(sample), diff --git a/src/storage/exec/GetPropNode.h b/src/storage/exec/GetPropNode.h index 2bcfedb7a..70286d60c 100644 --- a/src/storage/exec/GetPropNode.h +++ b/src/storage/exec/GetPropNode.h @@ -103,9 +103,9 @@ class GetEdgePropNode : public QueryNode { RowReader* reader, const std::vector* props) -> kvstore::ResultCode { + UNUSED(edgeType); nebula::List list; - auto code = collectEdgeProps(edgeType, - reader, + auto code = collectEdgeProps(reader, key, vIdLen_, props, diff --git a/src/storage/exec/QueryUtils.h b/src/storage/exec/QueryUtils.h index ecaf23dac..f4068039b 100644 --- a/src/storage/exec/QueryUtils.h +++ b/src/storage/exec/QueryUtils.h @@ -57,10 +57,8 @@ class QueryUtils final { } - static StatusOr readEdgeProp(VertexIDSlice srcId, - EdgeType edgeType, - EdgeRanking edgeRank, - VertexIDSlice dstId, + static StatusOr readEdgeProp(folly::StringPiece key, + size_t vIdLen, RowReader* reader, const PropContext& prop) { switch (prop.propInKeyType_) { @@ -69,15 +67,19 @@ class QueryUtils final { return readValue(reader, prop.name_, prop.field_); } case PropContext::PropInKeyType::SRC: { + auto srcId = NebulaKeyUtils::getSrcId(vIdLen, key); return srcId.subpiece(0, srcId.find_first_of('\0')); } case PropContext::PropInKeyType::TYPE: { + auto edgeType = NebulaKeyUtils::getEdgeType(vIdLen, key); return edgeType; } case PropContext::PropInKeyType::RANK: { + auto edgeRank = NebulaKeyUtils::getRank(vIdLen, key); return edgeRank; } case PropContext::PropInKeyType::DST: { + auto dstId = NebulaKeyUtils::getDstId(vIdLen, key); return dstId.subpiece(0, dstId.find_first_of('\0')); } } diff --git a/src/storage/exec/RelNode.h b/src/storage/exec/RelNode.h index b5b85b18b..02216c132 100644 --- a/src/storage/exec/RelNode.h +++ b/src/storage/exec/RelNode.h @@ -80,7 +80,6 @@ class QueryNode : public RelNode { protected: // if yields is 
not empty, will eval the yield expression, otherwize, collect property kvstore::ResultCode collectEdgeProps( - EdgeType edgeType, RowReader* reader, folly::StringPiece key, size_t vIdLen, @@ -88,12 +87,8 @@ class QueryNode : public RelNode { nebula::List& list) { for (const auto& prop : *props) { if (prop.returned_) { - auto srcId = NebulaKeyUtils::getSrcId(vIdLen, key); - auto edgeRank = NebulaKeyUtils::getRank(vIdLen, key); - auto dstId = NebulaKeyUtils::getDstId(vIdLen, key); - VLOG(2) << "Collect prop " << prop.name_ << ", type " << edgeType; - auto value = QueryUtils::readEdgeProp(srcId, edgeType, edgeRank, dstId, - reader, prop); + VLOG(2) << "Collect prop " << prop.name_; + auto value = QueryUtils::readEdgeProp(key, vIdLen, reader, prop); if (!value.ok()) { return kvstore::ResultCode::ERR_EDGE_PROP_NOT_FOUND; } diff --git a/src/storage/test/ScanEdgePropBenchmark.cpp b/src/storage/test/ScanEdgePropBenchmark.cpp index bb5ebe64e..24ccb5434 100644 --- a/src/storage/test/ScanEdgePropBenchmark.cpp +++ b/src/storage/test/ScanEdgePropBenchmark.cpp @@ -136,8 +136,7 @@ TEST_P(ScanEdgePropBench, ProcessEdgeProps) { ASSERT_TRUE(schema != nullptr); auto wrapper = std::make_unique(); ASSERT_TRUE(wrapper->reset(schema.get(), val, readerVer)); - auto code = node.collectEdgeProps(edgeType, wrapper.get(), - key, vIdLen, &props, list); + auto code = node.collectEdgeProps(wrapper.get(), key, vIdLen, &props, list); ASSERT_EQ(kvstore::ResultCode::SUCCEEDED, code); result.mutableList().values.emplace_back(std::move(list)); } @@ -163,8 +162,7 @@ TEST_P(ScanEdgePropBench, ProcessEdgeProps) { reader = RowReader::getEdgePropReader(env->schemaMan_, spaceId, std::abs(edgeType), val); ASSERT_TRUE(reader.get() != nullptr); - auto code = node.collectEdgeProps(edgeType, reader.get(), - key, vIdLen, &props, list); + auto code = node.collectEdgeProps(reader.get(), key, vIdLen, &props, list); ASSERT_EQ(kvstore::ResultCode::SUCCEEDED, code); 
result.mutableList().values.emplace_back(std::move(list)); } @@ -195,8 +193,7 @@ TEST_P(ScanEdgePropBench, ProcessEdgeProps) { ASSERT_TRUE(reader->resetEdgePropReader(env->schemaMan_, spaceId, std::abs(edgeType), val)); } - auto code = node.collectEdgeProps(edgeType, reader.get(), - key, vIdLen, &props, list); + auto code = node.collectEdgeProps(reader.get(), key, vIdLen, &props, list); ASSERT_EQ(kvstore::ResultCode::SUCCEEDED, code); result.mutableList().values.emplace_back(std::move(list)); } @@ -234,8 +231,7 @@ TEST_P(ScanEdgePropBench, ProcessEdgeProps) { } else { ASSERT_TRUE(reader->reset(schemas, val)); } - auto code = node.collectEdgeProps(edgeType, reader.get(), - key, vIdLen, &props, list); + auto code = node.collectEdgeProps(reader.get(), key, vIdLen, &props, list); ASSERT_EQ(kvstore::ResultCode::SUCCEEDED, code); result.mutableList().values.emplace_back(std::move(list)); } From 729583ec2c9bf5ac99ab2cbdbcb23155e5175c60 Mon Sep 17 00:00:00 2001 From: Doodle <13706157+critical27@users.noreply.github.com> Date: Mon, 3 Aug 2020 14:42:32 +0800 Subject: [PATCH 3/8] remove filter/agg --- src/storage/exec/AggregateNode.h | 15 +++------ src/storage/exec/GetNeighborsNode.h | 37 +++++++++++---------- src/storage/query/GetNeighborsProcessor.cpp | 37 +++++++++++++-------- 3 files changed, 48 insertions(+), 41 deletions(-) diff --git a/src/storage/exec/AggregateNode.h b/src/storage/exec/AggregateNode.h index 9b84d31ab..67a065f24 100644 --- a/src/storage/exec/AggregateNode.h +++ b/src/storage/exec/AggregateNode.h @@ -47,25 +47,18 @@ class AggregateNode : public IterateNode { return ret; } - if (edgeContext_->statCount_ > 0) { - initStatValue(edgeContext_); - } - this->result_ = NullType::__NULL__; + CHECK_GT(edgeContext_->statCount_, 0); + initStatValue(edgeContext_); return kvstore::ResultCode::SUCCEEDED; } void next() override { - if (!stats_.empty()) { - // we need to collect the stat during `next` - collectEdgeStats(this->key(), this->reader(), 
planContext_->props_); - } + // we need to collect the stat during `next` + collectEdgeStats(this->key(), this->reader(), planContext_->props_); IterateNode::next(); } void calculateStat() { - if (stats_.empty()) { - return; - } nebula::List result; result.values.reserve(stats_.size()); for (const auto& stat : stats_) { diff --git a/src/storage/exec/GetNeighborsNode.h b/src/storage/exec/GetNeighborsNode.h index 790ea63b9..e4d01eca7 100644 --- a/src/storage/exec/GetNeighborsNode.h +++ b/src/storage/exec/GetNeighborsNode.h @@ -23,14 +23,14 @@ class GetNeighborsNode : public QueryNode { public: GetNeighborsNode(PlanContext* planCtx, - HashJoinNode* hashJoinNode, - AggregateNode* aggregateNode, + IterateNode* hashJoinNode, + IterateNode* upstream, EdgeContext* edgeContext, nebula::DataSet* resultDataSet, int64_t limit = 0) : planContext_(planCtx) , hashJoinNode_(hashJoinNode) - , aggregateNode_(aggregateNode) + , upstream_(upstream) , edgeContext_(edgeContext) , resultDataSet_(resultDataSet) , limit_(limit) {} @@ -65,10 +65,12 @@ class GetNeighborsNode : public QueryNode { return ret; } - aggregateNode_->calculateStat(); - if (aggregateNode_->result().type() == Value::Type::LIST) { + if (edgeContext_->statCount_ > 0) { + auto agg = dynamic_cast*>(upstream_); + CHECK_NOTNULL(agg); + agg->calculateStat(); // set stat list to second columns - row[1].setList(aggregateNode_->mutableResult().moveList()); + row[1].setList(agg->mutableResult().moveList()); } resultDataSet_->rows.emplace_back(std::move(row)); @@ -81,15 +83,16 @@ class GetNeighborsNode : public QueryNode { virtual kvstore::ResultCode iterateEdges(std::vector& row) { int64_t edgeRowCount = 0; nebula::List list; - for (; aggregateNode_->valid(); aggregateNode_->next(), ++edgeRowCount) { + for (; upstream_->valid(); upstream_->next(), ++edgeRowCount) { if (limit_ > 0 && edgeRowCount >= limit_) { return kvstore::ResultCode::SUCCEEDED; } - auto key = aggregateNode_->key(); - auto reader = aggregateNode_->reader(); + 
auto key = upstream_->key(); + auto reader = upstream_->reader(); auto props = planContext_->props_; auto columnIdx = planContext_->columnIdx_; + list.reserve(props->size()); // collect props need to return auto ret = collectEdgeProps(reader, key, @@ -111,8 +114,8 @@ class GetNeighborsNode : public QueryNode { } PlanContext* planContext_; - HashJoinNode* hashJoinNode_; - AggregateNode* aggregateNode_; + IterateNode* hashJoinNode_; + IterateNode* upstream_; EdgeContext* edgeContext_; nebula::DataSet* resultDataSet_; int64_t limit_; @@ -121,12 +124,12 @@ class GetNeighborsNode : public QueryNode { class GetNeighborsSampleNode : public GetNeighborsNode { public: GetNeighborsSampleNode(PlanContext* planCtx, - HashJoinNode* hashJoinNode, - AggregateNode* aggregateNode, + IterateNode* hashJoinNode, + IterateNode* upstream, EdgeContext* edgeContext, nebula::DataSet* resultDataSet, int64_t limit) - : GetNeighborsNode(planCtx, hashJoinNode, aggregateNode, edgeContext, resultDataSet, limit) + : GetNeighborsNode(planCtx, hashJoinNode, upstream, edgeContext, resultDataSet, limit) , sampler_(std::make_unique>(limit_)) {} private: @@ -139,9 +142,9 @@ class GetNeighborsSampleNode : public GetNeighborsNode { kvstore::ResultCode iterateEdges(std::vector& row) override { int64_t edgeRowCount = 0; nebula::List list; - for (; aggregateNode_->valid(); aggregateNode_->next(), ++edgeRowCount) { - auto val = aggregateNode_->val(); - auto key = aggregateNode_->key(); + for (; upstream_->valid(); upstream_->next(), ++edgeRowCount) { + auto val = upstream_->val(); + auto key = upstream_->key(); auto edgeType = planContext_->edgeType_; auto props = planContext_->props_; auto columnIdx = planContext_->columnIdx_; diff --git a/src/storage/query/GetNeighborsProcessor.cpp b/src/storage/query/GetNeighborsProcessor.cpp index bcd9f0912..4e7583594 100644 --- a/src/storage/query/GetNeighborsProcessor.cpp +++ b/src/storage/query/GetNeighborsProcessor.cpp @@ -117,26 +117,37 @@ StoragePlan 
GetNeighborsProcessor::buildPlan(nebula::DataSet* result, for (auto* edge : edges) { hashJoin->addDependency(edge); } - auto filter = std::make_unique>( - planContext_.get(), hashJoin.get(), expCtx_.get(), filter_.get()); - filter->addDependency(hashJoin.get()); - auto agg = std::make_unique>( - planContext_.get(), filter.get(), &edgeContext_); - agg->addDependency(filter.get()); + IterateNode* join = hashJoin.get(); + IterateNode* upstream = hashJoin.get(); + plan.addNode(std::move(hashJoin)); + + if (filter_) { + auto filter = std::make_unique>( + planContext_.get(), upstream, expCtx_.get(), filter_.get()); + filter->addDependency(upstream); + upstream = filter.get(); + plan.addNode(std::move(filter)); + } + + if (edgeContext_.statCount_ > 0) { + auto agg = std::make_unique>( + planContext_.get(), upstream, &edgeContext_); + agg->addDependency(upstream); + upstream = agg.get(); + plan.addNode(std::move(agg)); + } + std::unique_ptr output; if (random) { output = std::make_unique( - planContext_.get(), hashJoin.get(), agg.get(), &edgeContext_, result, limit); + planContext_.get(), join, upstream, &edgeContext_, result, limit); } else { output = std::make_unique( - planContext_.get(), hashJoin.get(), agg.get(), &edgeContext_, result, limit); + planContext_.get(), join, upstream, &edgeContext_, result, limit); } - output->addDependency(agg.get()); - - plan.addNode(std::move(hashJoin)); - plan.addNode(std::move(filter)); - plan.addNode(std::move(agg)); + output->addDependency(upstream); plan.addNode(std::move(output)); + return plan; } From 153deeab0b06dbee494a27a5376e2e8c75a96eb5 Mon Sep 17 00:00:00 2001 From: Doodle <13706157+critical27@users.noreply.github.com> Date: Mon, 3 Aug 2020 17:22:14 +0800 Subject: [PATCH 4/8] add FLAGS_enable_multi_versions --- resources/gflags.json | 1 - src/storage/StorageFlags.cpp | 5 ++--- src/storage/StorageFlags.h | 4 +--- src/storage/exec/StorageIterator.h | 22 +++++++++++---------- src/storage/exec/UpdateNode.h | 5 +++-- 
src/storage/mutate/AddEdgesProcessor.cpp | 4 ++-- src/storage/mutate/AddVerticesProcessor.cpp | 4 ++-- src/storage/query/QueryBaseProcessor.inl | 2 -- src/storage/test/AddEdgesTest.cpp | 4 ++-- src/storage/test/AddVerticesTest.cpp | 4 ++-- src/storage/test/DeleteEdgesTest.cpp | 4 ++-- src/storage/test/DeleteVerticesTest.cpp | 4 ++-- src/storage/test/GetNeighborsTest.cpp | 2 ++ src/storage/test/IndexWriteTest.cpp | 2 ++ 14 files changed, 34 insertions(+), 33 deletions(-) diff --git a/resources/gflags.json b/resources/gflags.json index 2e1280647..b46338736 100644 --- a/resources/gflags.json +++ b/resources/gflags.json @@ -1,6 +1,5 @@ { "MUTABLE": [ - "max_edge_returned_per_vertex", "minloglevel", "v", "heartbeat_interval_secs", diff --git a/src/storage/StorageFlags.cpp b/src/storage/StorageFlags.cpp index 577f314d2..216181796 100644 --- a/src/storage/StorageFlags.cpp +++ b/src/storage/StorageFlags.cpp @@ -26,6 +26,5 @@ DEFINE_bool(enable_vertex_cache, true, "Enable vertex cache"); DEFINE_int32(reader_handlers, 32, "Total reader handlers"); -DEFINE_int32(max_edge_returned_per_vertex, INT_MAX, "Max edge number returnred searching vertex"); - -DEFINE_bool(enable_reservoir_sampling, false, "Will do reservoir sampling if set true."); +DEFINE_bool(enable_multi_versions, false, "If true, the insert timestamp will be the wall clock. 
" + "If false, always has the same timestamp of max"); diff --git a/src/storage/StorageFlags.h b/src/storage/StorageFlags.h index e986bd139..f134838b5 100644 --- a/src/storage/StorageFlags.h +++ b/src/storage/StorageFlags.h @@ -29,8 +29,6 @@ DECLARE_bool(enable_vertex_cache); DECLARE_int32(reader_handlers); -DECLARE_int32(max_edge_returned_per_vertex); - -DECLARE_bool(enable_reservoir_sampling); +DECLARE_bool(enable_multi_versions); #endif // STORAGE_STORAGEFLAGS_H_ diff --git a/src/storage/exec/StorageIterator.h b/src/storage/exec/StorageIterator.h index b59acb7a7..4d08af625 100644 --- a/src/storage/exec/StorageIterator.h +++ b/src/storage/exec/StorageIterator.h @@ -10,6 +10,7 @@ #include "common/base/Base.h" #include "kvstore/KVIterator.h" #include "storage/CommonUtils.h" +#include "storage/StorageFlags.h" namespace nebula { namespace storage { @@ -176,12 +177,17 @@ class SingleEdgeIterator : public StorageIterator { // return true when the value iter to a valid edge value bool check() { reader_.reset(); - auto key = iter_->key(); - auto rank = NebulaKeyUtils::getRank(planContext_->vIdLen_, key); - auto dstId = NebulaKeyUtils::getDstId(planContext_->vIdLen_, key); - if (!firstLoop_ && rank == lastRank_ && lastDstId_ == dstId) { - // pass old version data of same edge - return false; + if (FLAGS_enable_multi_versions) { + auto key = iter_->key(); + auto rank = NebulaKeyUtils::getRank(planContext_->vIdLen_, key); + auto dstId = NebulaKeyUtils::getDstId(planContext_->vIdLen_, key); + if (!firstLoop_ && rank == lastRank_ && lastDstId_ == dstId) { + // pass old version data of same edge + return false; + } + firstLoop_ = false; + lastRank_ = rank; + lastDstId_ = dstId.str(); } auto val = iter_->val(); @@ -196,10 +202,6 @@ class SingleEdgeIterator : public StorageIterator { return false; } - firstLoop_ = false; - lastRank_ = rank; - lastDstId_ = dstId.str(); - if (ttl_->hasValue()) { auto ttlValue = ttl_->value(); if 
(CommonUtils::checkDataExpiredForTTL(schemas_->back().get(), reader_.get(), diff --git a/src/storage/exec/UpdateNode.h b/src/storage/exec/UpdateNode.h index d18c32b3e..166f43d7a 100644 --- a/src/storage/exec/UpdateNode.h +++ b/src/storage/exec/UpdateNode.h @@ -13,6 +13,7 @@ #include "storage/exec/TagNode.h" #include "storage/exec/FilterNode.h" #include "kvstore/LogEncoder.h" +#include "storage/StorageFlags.h" namespace nebula { namespace storage { @@ -147,8 +148,8 @@ class UpdateTagNode : public RelNode { } // build key, value is emtpy - auto version = - std::numeric_limits::max() - time::WallClock::fastNowInMicroSec(); + auto version = FLAGS_enable_multi_versions ? + std::numeric_limits::max() - time::WallClock::fastNowInMicroSec() : 0L; // Switch version to big-endian, make sure the key is in ordered. version = folly::Endian::big(version); key_ = NebulaKeyUtils::vertexKey(planContext_->vIdLen_, diff --git a/src/storage/mutate/AddEdgesProcessor.cpp b/src/storage/mutate/AddEdgesProcessor.cpp index 939f87cea..a300df0b8 100644 --- a/src/storage/mutate/AddEdgesProcessor.cpp +++ b/src/storage/mutate/AddEdgesProcessor.cpp @@ -15,8 +15,8 @@ namespace nebula { namespace storage { void AddEdgesProcessor::process(const cpp2::AddEdgesRequest& req) { - auto version = - std::numeric_limits::max() - time::WallClock::fastNowInMicroSec(); + auto version = FLAGS_enable_multi_versions ? + std::numeric_limits::max() - time::WallClock::fastNowInMicroSec() : 0L; // Switch version to big-endian, make sure the key is in ordered. 
version = folly::Endian::big(version); diff --git a/src/storage/mutate/AddVerticesProcessor.cpp b/src/storage/mutate/AddVerticesProcessor.cpp index 54cdc357f..a784406cf 100644 --- a/src/storage/mutate/AddVerticesProcessor.cpp +++ b/src/storage/mutate/AddVerticesProcessor.cpp @@ -18,8 +18,8 @@ namespace nebula { namespace storage { void AddVerticesProcessor::process(const cpp2::AddVerticesRequest& req) { - auto version = - std::numeric_limits::max() - time::WallClock::fastNowInMicroSec(); + auto version = FLAGS_enable_multi_versions ? + std::numeric_limits::max() - time::WallClock::fastNowInMicroSec() : 0L; // Switch version to big-endian, make sure the key is in ordered. version = folly::Endian::big(version); diff --git a/src/storage/query/QueryBaseProcessor.inl b/src/storage/query/QueryBaseProcessor.inl index 94c290c88..f8d2eed0c 100644 --- a/src/storage/query/QueryBaseProcessor.inl +++ b/src/storage/query/QueryBaseProcessor.inl @@ -6,9 +6,7 @@ DECLARE_int32(max_handlers_per_req); DECLARE_int32(min_vertices_per_bucket); -DECLARE_int32(max_edge_returned_per_vertex); DECLARE_bool(enable_vertex_cache); -DECLARE_bool(enable_reservoir_sampling); namespace nebula { namespace storage { diff --git a/src/storage/test/AddEdgesTest.cpp b/src/storage/test/AddEdgesTest.cpp index 85eb68d47..9ef30b4d5 100644 --- a/src/storage/test/AddEdgesTest.cpp +++ b/src/storage/test/AddEdgesTest.cpp @@ -64,6 +64,7 @@ TEST(AddEdgesTest, SpecifyPropertyNameTest) { } TEST(AddEdgesTest, MultiVersionTest) { + FLAGS_enable_multi_versions = true; fs::TempDir rootPath("/tmp/AddEdgesTest.XXXXXX"); mock::MockCluster cluster; cluster.initStorageKV(rootPath.path()); @@ -91,8 +92,7 @@ TEST(AddEdgesTest, MultiVersionTest) { } LOG(INFO) << "Check data in kv store..."; - // The number of data in serve is 668 - checkAddEdgesData(req, env, 668, 2); + FLAGS_enable_multi_versions = false; } } // namespace storage diff --git a/src/storage/test/AddVerticesTest.cpp b/src/storage/test/AddVerticesTest.cpp index 
a849a8622..7ee22547b 100644 --- a/src/storage/test/AddVerticesTest.cpp +++ b/src/storage/test/AddVerticesTest.cpp @@ -65,6 +65,7 @@ TEST(AddVerticesTest, SpecifyPropertyNameTest) { } TEST(AddVerticesTest, MultiVersionTest) { + FLAGS_enable_multi_versions = true; fs::TempDir rootPath("/tmp/AddVerticesTest.XXXXXX"); mock::MockCluster cluster; cluster.initStorageKV(rootPath.path()); @@ -92,8 +93,7 @@ TEST(AddVerticesTest, MultiVersionTest) { } LOG(INFO) << "Check data in kv store..."; - // The number of vertices is 162 - checkAddVerticesData(req, env, 162, 2); + FLAGS_enable_multi_versions = false; } } // namespace storage diff --git a/src/storage/test/DeleteEdgesTest.cpp b/src/storage/test/DeleteEdgesTest.cpp index 72fa3a2f3..98e6e7a2c 100644 --- a/src/storage/test/DeleteEdgesTest.cpp +++ b/src/storage/test/DeleteEdgesTest.cpp @@ -68,6 +68,7 @@ TEST(DeleteEdgesTest, SimpleTest) { } TEST(DeleteEdgesTest, MultiVersionTest) { + FLAGS_enable_multi_versions = true; fs::TempDir rootPath("/tmp/DeleteEdgesTest.XXXXXX"); mock::MockCluster cluster; cluster.initStorageKV(rootPath.path()); @@ -97,8 +98,6 @@ TEST(DeleteEdgesTest, MultiVersionTest) { } LOG(INFO) << "Check data in kv store..."; - // The number of data in serve is 668 - checkAddEdgesData(req, env, 668, 2); } // Delete edges @@ -122,6 +121,7 @@ TEST(DeleteEdgesTest, MultiVersionTest) { // All the added datas are deleted, the number of edge is 0 checkEdgesData(spaceVidLen, req.space_id, req.parts, env, 0); } + FLAGS_enable_multi_versions = false; } } // namespace storage diff --git a/src/storage/test/DeleteVerticesTest.cpp b/src/storage/test/DeleteVerticesTest.cpp index 6b56713df..e106ec7de 100644 --- a/src/storage/test/DeleteVerticesTest.cpp +++ b/src/storage/test/DeleteVerticesTest.cpp @@ -68,6 +68,7 @@ TEST(DeleteVerticesTest, SimpleTest) { } TEST(DeleteVerticesTest, MultiVersionTest) { + FLAGS_enable_multi_versions = true; fs::TempDir rootPath("/tmp/DeleteVertexTest.XXXXXX"); mock::MockCluster cluster; 
cluster.initStorageKV(rootPath.path()); @@ -98,8 +99,6 @@ TEST(DeleteVerticesTest, MultiVersionTest) { } LOG(INFO) << "Check data in kv store..."; - // The number of vertices is 162 - checkAddVerticesData(req, env, 162, 2); } // Delete vertices @@ -123,6 +122,7 @@ TEST(DeleteVerticesTest, MultiVersionTest) { // All the added datas are deleted, the number of vertices is 0 checkVerticesData(spaceVidLen, req.space_id, req.parts, env, 0); } + FLAGS_enable_multi_versions = false; } } // namespace storage diff --git a/src/storage/test/GetNeighborsTest.cpp b/src/storage/test/GetNeighborsTest.cpp index 114af16de..883b6ebe0 100644 --- a/src/storage/test/GetNeighborsTest.cpp +++ b/src/storage/test/GetNeighborsTest.cpp @@ -1106,6 +1106,7 @@ TEST(GetNeighborsTest, GoOverAllTest) { } TEST(GetNeighborsTest, MultiVersionTest) { + FLAGS_enable_multi_versions = true; fs::TempDir rootPath("/tmp/GetNeighborsTest.XXXXXX"); mock::MockCluster cluster; cluster.initStorageKV(rootPath.path()); @@ -1132,6 +1133,7 @@ TEST(GetNeighborsTest, MultiVersionTest) { // vId, stat, player, team, general tag, - teammate, - serve, + serve, + teammate, expr QueryTestUtils::checkResponse(resp.vertices, vertices, 1, 10); } + FLAGS_enable_multi_versions = false; } TEST(GetNeighborsTest, FilterTest) { diff --git a/src/storage/test/IndexWriteTest.cpp b/src/storage/test/IndexWriteTest.cpp index 9babd0c34..d5d5c8322 100644 --- a/src/storage/test/IndexWriteTest.cpp +++ b/src/storage/test/IndexWriteTest.cpp @@ -359,6 +359,7 @@ TEST(IndexTest, VerticesValueTest) { * Test the all indexes works well. 
**/ TEST(IndexTest, AlterTagIndexTest) { + FLAGS_enable_multi_versions = true; GraphSpaceID spaceId = 1; TagID tagId = 111; IndexID indexId1 = 222; @@ -495,6 +496,7 @@ TEST(IndexTest, AlterTagIndexTest) { EXPECT_EQ(1, retNum); } } + FLAGS_enable_multi_versions = false; } } // namespace storage } // namespace nebula From ba80bbfb2172d5d2b7345ddf62d9c6eb32676966 Mon Sep 17 00:00:00 2001 From: Doodle <13706157+critical27@users.noreply.github.com> Date: Mon, 3 Aug 2020 18:04:32 +0800 Subject: [PATCH 5/8] improve has ttl --- src/storage/exec/StorageIterator.h | 55 +++++++++++++++++------------- 1 file changed, 31 insertions(+), 24 deletions(-) diff --git a/src/storage/exec/StorageIterator.h b/src/storage/exec/StorageIterator.h index 4d08af625..eb0f113ea 100644 --- a/src/storage/exec/StorageIterator.h +++ b/src/storage/exec/StorageIterator.h @@ -41,9 +41,12 @@ class SingleTagIterator : public StorageIterator { : planContext_(planCtx) , iter_(std::move(iter)) , tagId_(tagId) - , schemas_(schemas) - , ttl_(ttl) { - lookupOne_ = true; + , schemas_(schemas) { + if (ttl->hasValue()) { + hasTtl_ = true; + ttlCol_ = ttl->value().first; + ttlDuration_ = ttl->value().second; + } check(iter_->val()); } @@ -52,9 +55,12 @@ class SingleTagIterator : public StorageIterator { const std::vector>* schemas, const folly::Optional>* ttl) : planContext_(planCtx) - , schemas_(schemas) - , ttl_(ttl) { - lookupOne_ = true; + , schemas_(schemas) { + if (ttl->hasValue()) { + hasTtl_ = true; + ttlCol_ = ttl->value().first; + ttlDuration_ = ttl->value().second; + } check(val); } @@ -87,13 +93,10 @@ class SingleTagIterator : public StorageIterator { return false; } - if (ttl_->hasValue()) { - auto ttlValue = ttl_->value(); - if (CommonUtils::checkDataExpiredForTTL(schemas_->back().get(), reader_.get(), - ttlValue.first, ttlValue.second)) { - reader_.reset(); - return false; - } + if (hasTtl_ && CommonUtils::checkDataExpiredForTTL(schemas_->back().get(), reader_.get(), + ttlCol_, ttlDuration_)) { + 
reader_.reset(); + return false; } return true; @@ -103,7 +106,9 @@ class SingleTagIterator : public StorageIterator { std::unique_ptr iter_; TagID tagId_; const std::vector> *schemas_ = nullptr; - const folly::Optional> *ttl_ = nullptr; + bool hasTtl_ = false; + std::string ttlCol_; + int64_t ttlDuration_; bool lookupOne_ = true; std::unique_ptr reader_; @@ -123,10 +128,13 @@ class SingleEdgeIterator : public StorageIterator { , iter_(std::move(iter)) , edgeType_(edgeType) , schemas_(schemas) - , ttl_(ttl) , moveToValidRecord_(moveToValidRecord) { CHECK(!!iter_); - lookupOne_ = true; + if (ttl->hasValue()) { + hasTtl_ = true; + ttlCol_ = ttl->value().first; + ttlDuration_ = ttl->value().second; + } // If moveToValidRecord is true, iterator will try to move to first valid record, // which is used in GetNeighbors. If it is false, it will only check the latest record, // which is used in GetProps and UpdateEdge. @@ -202,13 +210,10 @@ class SingleEdgeIterator : public StorageIterator { return false; } - if (ttl_->hasValue()) { - auto ttlValue = ttl_->value(); - if (CommonUtils::checkDataExpiredForTTL(schemas_->back().get(), reader_.get(), - ttlValue.first, ttlValue.second)) { - reader_.reset(); - return false; - } + if (hasTtl_ && CommonUtils::checkDataExpiredForTTL(schemas_->back().get(), reader_.get(), + ttlCol_, ttlDuration_)) { + reader_.reset(); + return false; } return true; @@ -218,7 +223,9 @@ class SingleEdgeIterator : public StorageIterator { std::unique_ptr iter_; EdgeType edgeType_; const std::vector> *schemas_ = nullptr; - const folly::Optional> *ttl_ = nullptr; + bool hasTtl_ = false; + std::string ttlCol_; + int64_t ttlDuration_; bool moveToValidRecord_{true}; bool lookupOne_ = true; From 7843042cffa1bcb8b99263d522aa59a3f56c8d8c Mon Sep 17 00:00:00 2001 From: Doodle <13706157+critical27@users.noreply.github.com> Date: Tue, 4 Aug 2020 10:08:59 +0800 Subject: [PATCH 6/8] refactor RowReaderWrapper --- src/codec/NebulaCodecImpl.cpp | 2 +- 
src/codec/RowReader.cpp | 173 ------------------ src/codec/RowReader.h | 50 ----- src/codec/RowReaderV1.cpp | 1 - src/codec/RowReaderV1.h | 4 - src/codec/RowReaderV2.h | 4 - src/codec/RowReaderWrapper.cpp | 115 ++++++++++++ src/codec/RowReaderWrapper.h | 79 +++++++- src/codec/test/NebulaCodecTest.cpp | 4 +- src/codec/test/RowReaderBenchmark.cpp | 15 +- src/codec/test/RowReaderV1Test.cpp | 10 +- src/codec/test/RowReaderV2Test.cpp | 9 +- src/codec/test/RowWriterV2Test.cpp | 18 +- src/storage/BaseProcessor.h | 2 +- src/storage/CompactionFilter.h | 9 +- .../admin/RebuildEdgeIndexProcessor.cpp | 8 +- .../admin/RebuildTagIndexProcessor.cpp | 8 +- src/storage/exec/EdgeNode.h | 1 - src/storage/exec/GetNeighborsNode.h | 17 +- src/storage/exec/StorageIterator.h | 13 +- src/storage/exec/UpdateNode.h | 25 +-- src/storage/index/IndexExecutor.inl | 18 +- src/storage/mutate/AddEdgesProcessor.cpp | 18 +- src/storage/mutate/AddVerticesProcessor.cpp | 18 +- src/storage/mutate/DeleteEdgesProcessor.cpp | 10 +- .../mutate/DeleteVerticesProcessor.cpp | 10 +- src/storage/query/ScanEdgeProcessor.cpp | 3 +- src/storage/query/ScanVertexProcessor.cpp | 2 +- src/storage/test/GetNeighborsBenchmark.cpp | 12 +- src/storage/test/IndexScanTest.cpp | 16 +- src/storage/test/QueryStatsTest.cpp | 2 +- src/storage/test/ScanEdgePropBenchmark.cpp | 23 +-- src/storage/test/ScanEdgeTest.cpp | 2 +- src/storage/test/ScanVertexTest.cpp | 2 +- src/storage/test/StorageLookupBenchmark.cpp | 8 +- src/storage/test/TestUtils.h | 5 +- src/storage/test/UpdateEdgeTest.cpp | 82 ++++----- src/storage/test/UpdateVertexTest.cpp | 16 +- src/tools/dbDump/DbDumper.cpp | 7 +- src/tools/dbDump/DbDumper.h | 2 +- .../storage-perf/StorageIntegrityTool.cpp | 2 +- 41 files changed, 382 insertions(+), 443 deletions(-) diff --git a/src/codec/NebulaCodecImpl.cpp b/src/codec/NebulaCodecImpl.cpp index 5d46dd490..dd11e925e 100644 --- a/src/codec/NebulaCodecImpl.cpp +++ b/src/codec/NebulaCodecImpl.cpp @@ -60,7 +60,7 @@ 
NebulaCodecImpl::decode(std::string encoded, folly::StringPiece piece; ResultType code; - auto reader = RowReader::getRowReader(encoded, schema); + auto reader = RowReaderWrapper::getRowReader(encoded, schema); std::unordered_map result; for (size_t index = 0; index < schema->getNumFields(); index++) { auto field = schema->getFieldName(index); diff --git a/src/codec/RowReader.cpp b/src/codec/RowReader.cpp index f327dfc54..53db0cbf1 100644 --- a/src/codec/RowReader.cpp +++ b/src/codec/RowReader.cpp @@ -25,19 +25,6 @@ Value RowReader::Cell::value() const noexcept { * class RowReader::Iterator * ********************************************/ -RowReader::Iterator::Iterator(Iterator&& iter) - : reader_(iter.reader_) - , cell_(std::move(iter.cell_)) - , index_(iter.index_) { -} - - -void RowReader::Iterator::operator=(Iterator&& rhs) { - reader_ = rhs.reader_; - cell_ = std::move(rhs.cell_); - index_ = rhs.index_; -} - bool RowReader::Iterator::operator==(const Iterator& rhs) const noexcept { return reader_ == rhs.reader_ && index_ == rhs.index_; @@ -68,166 +55,6 @@ RowReader::Iterator& RowReader::Iterator::operator++() { * ********************************************/ -// static -std::unique_ptr RowReader::getTagPropReader( - meta::SchemaManager* schemaMan, - GraphSpaceID space, - TagID tag, - folly::StringPiece row) { - auto reader = std::make_unique(); - if (reader->resetTagPropReader(schemaMan, space, tag, row)) { - return reader; - } - LOG(ERROR) << "Failed to initiate the reader, most likely the data" - "is corrupted. The data is [" - << toHexStr(row) - << "]"; - return std::unique_ptr(); -} - - -// static -std::unique_ptr RowReader::getEdgePropReader( - meta::SchemaManager* schemaMan, - GraphSpaceID space, - EdgeType edge, - folly::StringPiece row) { - auto reader = std::make_unique(); - if (reader->resetEdgePropReader(schemaMan, space, edge, row)) { - return reader; - } - LOG(ERROR) << "Failed to initiate the reader, most likely the data" - "is corrupted. 
The data is [" - << toHexStr(row) - << "]"; - return std::unique_ptr(); -} - -// static -std::unique_ptr RowReader::getRowReader( - const meta::SchemaProviderIf* schema, - folly::StringPiece row) { - auto reader = std::make_unique(); - SchemaVer schemaVer; - int32_t readerVer; - RowReaderWrapper::getVersions(row, schemaVer, readerVer); - if (schemaVer != schema->getVersion()) { - return std::unique_ptr(); - } - if (reader->reset(schema, row, readerVer)) { - return reader; - } else { - LOG(ERROR) << "Failed to initiate the reader, most likely the data" - "is corrupted. The data is [" - << toHexStr(row) - << "]"; - return std::unique_ptr(); - } -} - -// static -std::unique_ptr RowReader::getRowReader( - const std::vector>& schemas, - folly::StringPiece row) { - auto reader = std::make_unique(); - SchemaVer schemaVer; - int32_t readerVer; - RowReaderWrapper::getVersions(row, schemaVer, readerVer); - if (static_cast(schemaVer) >= schemas.size()) { - return std::unique_ptr(); - } - // the schema is stored from oldest to newest, so just use version as idx - if (schemaVer != schemas[schemaVer]->getVersion()) { - return std::unique_ptr(); - } - if (reader->reset(schemas[schemaVer].get(), row, readerVer)) { - return reader; - } else { - LOG(ERROR) << "Failed to initiate the reader, most likely the data" - "is corrupted. 
The data is [" - << toHexStr(row) - << "]"; - return std::unique_ptr(); - } -} - -bool RowReader::resetTagPropReader( - meta::SchemaManager* schemaMan, - GraphSpaceID space, - TagID tag, - folly::StringPiece row) { - if (schemaMan == nullptr) { - LOG(ERROR) << "schemaMan should not be nullptr!"; - return false; - } - SchemaVer schemaVer; - int32_t readerVer; - RowReaderWrapper::getVersions(row, schemaVer, readerVer); - if (schemaVer >= 0) { - auto schema = schemaMan->getTagSchema(space, tag, schemaVer); - if (schema == nullptr) { - return false; - } - return reset(schema.get(), row, readerVer); - } else { - LOG(WARNING) << "Invalid schema version in the row data!"; - return false; - } -} - -bool RowReader::resetEdgePropReader( - meta::SchemaManager* schemaMan, - GraphSpaceID space, - EdgeType edge, - folly::StringPiece row) { - if (schemaMan == nullptr) { - LOG(ERROR) << "schemaMan should not be nullptr!"; - return false; - } - SchemaVer schemaVer; - int32_t readerVer; - RowReaderWrapper::getVersions(row, schemaVer, readerVer); - if (schemaVer >= 0) { - auto schema = schemaMan->getEdgeSchema(space, edge, schemaVer); - if (schema == nullptr) { - return false; - } - return reset(schema.get(), row, readerVer); - } else { - LOG(WARNING) << "Invalid schema version in the row data!"; - return false; - } -} - -bool RowReader::reset(meta::SchemaProviderIf const* schema, - folly::StringPiece row) noexcept { - if (schema == nullptr) { - return false; - } - SchemaVer schemaVer; - int32_t readerVer; - RowReaderWrapper::getVersions(row, schemaVer, readerVer); - if (schemaVer != schema->getVersion()) { - return false; - } - return reset(schema, row, readerVer); -} - -bool RowReader::reset(const std::vector>& schemas, - folly::StringPiece row) noexcept { - SchemaVer schemaVer; - int32_t readerVer; - RowReaderWrapper::getVersions(row, schemaVer, readerVer); - if (static_cast(schemaVer) >= schemas.size()) { - return false; - } - // the schema is stored from oldest to newest, so 
just use version as idx - if (schemaVer != schemas[schemaVer]->getVersion()) { - return false; - } - return reset(schemas[schemaVer].get(), row, readerVer); -} - bool RowReader::resetImpl(meta::SchemaProviderIf const* schema, folly::StringPiece row) noexcept { schema_ = schema; diff --git a/src/codec/RowReader.h b/src/codec/RowReader.h index 7b12a4c57..c24344a22 100644 --- a/src/codec/RowReader.h +++ b/src/codec/RowReader.h @@ -38,10 +38,6 @@ class RowReader { friend class Cell; friend class RowReader; public: - Iterator(Iterator&& iter); - - void operator=(Iterator&& rhs); - const Cell& operator*() const noexcept; const Cell* operator->() const noexcept; @@ -70,48 +66,6 @@ class RowReader { public: - static std::unique_ptr getTagPropReader( - meta::SchemaManager* schemaMan, - GraphSpaceID space, - TagID tag, - folly::StringPiece row); - - static std::unique_ptr getEdgePropReader( - meta::SchemaManager* schemaMan, - GraphSpaceID space, - EdgeType edge, - folly::StringPiece row); - - static std::unique_ptr getRowReader( - meta::SchemaProviderIf const* schema, - folly::StringPiece row); - - // notice: the schemas are from oldest to newest, - // usually from getAllVerTagSchema or getAllVerEdgeSchema in SchemaMan - static std::unique_ptr getRowReader( - const std::vector>& schemas, - folly::StringPiece row); - - bool resetTagPropReader( - meta::SchemaManager* schemaMan, - GraphSpaceID space, - TagID tag, - folly::StringPiece row); - - bool resetEdgePropReader( - meta::SchemaManager* schemaMan, - GraphSpaceID space, - EdgeType edge, - folly::StringPiece row); - - bool reset(meta::SchemaProviderIf const* schema, - folly::StringPiece row) noexcept; - - // notice: the schemas are from oldest to newest, - // usually from getAllVerTagSchema or getAllVerEdgeSchema in SchemaMan - bool reset(const std::vector>& schemas, - folly::StringPiece row) noexcept; - virtual ~RowReader() = default; virtual Value getValueByName(const std::string& prop) const noexcept = 0; @@ -156,10 
+110,6 @@ class RowReader { virtual bool resetImpl(meta::SchemaProviderIf const* schema, folly::StringPiece row) noexcept; - virtual bool reset(meta::SchemaProviderIf const* schema, - folly::StringPiece row, - int32_t readerVer) noexcept = 0; - private: Iterator endIter_; }; diff --git a/src/codec/RowReaderV1.cpp b/src/codec/RowReaderV1.cpp index 79454d1e1..5980a2382 100644 --- a/src/codec/RowReaderV1.cpp +++ b/src/codec/RowReaderV1.cpp @@ -46,7 +46,6 @@ bool RowReaderV1::resetImpl(meta::SchemaProviderIf const* schema, } } - bool RowReaderV1::processHeader(folly::StringPiece row) { const uint8_t* it = reinterpret_cast(row.begin()); if (reinterpret_cast(it) == row.end()) { diff --git a/src/codec/RowReaderV1.h b/src/codec/RowReaderV1.h index 22a43bea7..730525e41 100644 --- a/src/codec/RowReaderV1.h +++ b/src/codec/RowReaderV1.h @@ -38,10 +38,6 @@ class RowReaderV1 : public RowReader { return headerLen_; } - bool reset(meta::SchemaProviderIf const*, folly::StringPiece, int32_t) noexcept override { - LOG(FATAL) << "Not implemented"; - } - protected: bool resetImpl(meta::SchemaProviderIf const* schema, folly::StringPiece row) noexcept override; diff --git a/src/codec/RowReaderV2.h b/src/codec/RowReaderV2.h index f3c9acf8f..22dad4bb0 100644 --- a/src/codec/RowReaderV2.h +++ b/src/codec/RowReaderV2.h @@ -38,10 +38,6 @@ class RowReaderV2 : public RowReader { return headerLen_; } - bool reset(meta::SchemaProviderIf const*, folly::StringPiece, int32_t) noexcept override { - LOG(FATAL) << "Not implemented"; - } - protected: bool resetImpl(meta::SchemaProviderIf const* schema, folly::StringPiece row) noexcept override; diff --git a/src/codec/RowReaderWrapper.cpp b/src/codec/RowReaderWrapper.cpp index 5aa6fc12b..52d09b811 100644 --- a/src/codec/RowReaderWrapper.cpp +++ b/src/codec/RowReaderWrapper.cpp @@ -8,6 +8,92 @@ namespace nebula { +// static +RowReaderWrapper RowReaderWrapper::getTagPropReader(meta::SchemaManager* schemaMan, + GraphSpaceID space, + TagID tag, + 
folly::StringPiece row) { + SchemaVer schemaVer; + int32_t readerVer; + RowReaderWrapper::getVersions(row, schemaVer, readerVer); + if (schemaVer >= 0) { + auto schema = schemaMan->getTagSchema(space, tag, schemaVer); + if (schema == nullptr) { + return RowReaderWrapper(); + } + return RowReaderWrapper(schema.get(), row, readerVer); + } else { + LOG(WARNING) << "Invalid schema version in the row data!"; + return RowReaderWrapper(); + } +} + + +// static +RowReaderWrapper RowReaderWrapper::getEdgePropReader(meta::SchemaManager* schemaMan, + GraphSpaceID space, + EdgeType edge, + folly::StringPiece row) { + if (schemaMan == nullptr) { + LOG(ERROR) << "schemaMan should not be nullptr!"; + return RowReaderWrapper(); + } + SchemaVer schemaVer; + int32_t readerVer; + RowReaderWrapper::getVersions(row, schemaVer, readerVer); + if (schemaVer >= 0) { + auto schema = schemaMan->getEdgeSchema(space, edge, schemaVer); + if (schema == nullptr) { + return RowReaderWrapper(); + } + return RowReaderWrapper(schema.get(), row, readerVer); + } else { + LOG(WARNING) << "Invalid schema version in the row data!"; + return RowReaderWrapper(); + } +} + +// static +RowReaderWrapper RowReaderWrapper::getRowReader(const meta::SchemaProviderIf* schema, + folly::StringPiece row) { + SchemaVer schemaVer; + int32_t readerVer; + RowReaderWrapper::getVersions(row, schemaVer, readerVer); + if (schemaVer != schema->getVersion()) { + return RowReaderWrapper(); + } + return RowReaderWrapper(schema, row, readerVer); +} + +// static +RowReaderWrapper RowReaderWrapper::getRowReader( + const std::vector>& schemas, + folly::StringPiece row) { + SchemaVer schemaVer; + int32_t readerVer; + RowReaderWrapper::getVersions(row, schemaVer, readerVer); + if (static_cast(schemaVer) >= schemas.size() || + schemaVer != schemas[schemaVer]->getVersion()) { + return RowReaderWrapper(); + } + return RowReaderWrapper(schemas[schemaVer].get(), row, readerVer); +} + +RowReaderWrapper::RowReaderWrapper(const 
meta::SchemaProviderIf* schema, + const folly::StringPiece& row, + int32_t& readerVer) { + CHECK_NOTNULL(schema); + if (readerVer == 1) { + readerV1_.resetImpl(schema, row); + currReader_ = &readerV1_; + } else if (readerVer == 2) { + readerV2_.resetImpl(schema, row); + currReader_ = &readerV2_; + } else { + LOG(FATAL) << "Should not reach here"; + } +} + bool RowReaderWrapper::reset(meta::SchemaProviderIf const* schema, folly::StringPiece row, int32_t readerVer) noexcept { @@ -27,6 +113,35 @@ bool RowReaderWrapper::reset(meta::SchemaProviderIf const* schema, } } +bool RowReaderWrapper::reset(meta::SchemaProviderIf const* schema, + folly::StringPiece row) noexcept { + if (schema == nullptr) { + return false; + } + SchemaVer schemaVer; + int32_t readerVer; + RowReaderWrapper::getVersions(row, schemaVer, readerVer); + if (schemaVer != schema->getVersion()) { + return false; + } + return reset(schema, row, readerVer); +} + +bool RowReaderWrapper::reset( + const std::vector>& schemas, + folly::StringPiece row) noexcept { + SchemaVer schemaVer; + int32_t readerVer; + RowReaderWrapper::getVersions(row, schemaVer, readerVer); + if (static_cast(schemaVer) >= schemas.size()) { + return false; + } + // the schema is stored from oldest to newest, so just use version as idx + if (schemaVer != schemas[schemaVer]->getVersion()) { + return false; + } + return reset(schemas[schemaVer].get(), row, readerVer); +} // static void RowReaderWrapper::getVersions(const folly::StringPiece& row, diff --git a/src/codec/RowReaderWrapper.h b/src/codec/RowReaderWrapper.h index e85e45440..19df50d33 100644 --- a/src/codec/RowReaderWrapper.h +++ b/src/codec/RowReaderWrapper.h @@ -20,9 +20,44 @@ class RowReaderWrapper : public RowReader { FRIEND_TEST(RowReaderV2, encodedData); public: + RowReaderWrapper() = default; + RowReaderWrapper(const RowReaderWrapper&) = delete; + RowReaderWrapper& operator=(const RowReaderWrapper&) = delete; + RowReaderWrapper(RowReaderWrapper&&) = default; + 
RowReaderWrapper& operator=(RowReaderWrapper&&) = default; + + static RowReaderWrapper getTagPropReader(meta::SchemaManager* schemaMan, + GraphSpaceID space, + TagID tag, + folly::StringPiece row); + + static RowReaderWrapper getEdgePropReader(meta::SchemaManager* schemaMan, + GraphSpaceID space, + EdgeType edge, + folly::StringPiece row); + + static RowReaderWrapper getRowReader(meta::SchemaProviderIf const* schema, + folly::StringPiece row); + + // notice: the schemas are from oldest to newest, + // usually from getAllVerTagSchema or getAllVerEdgeSchema in SchemaMan + static RowReaderWrapper getRowReader( + const std::vector>& schemas, + folly::StringPiece row); + + RowReaderWrapper(const meta::SchemaProviderIf* schema, + const folly::StringPiece& row, + int32_t& readerVer); + bool reset(meta::SchemaProviderIf const* schema, folly::StringPiece row, - int32_t readVer) noexcept override; + int32_t readVer) noexcept; + + bool reset(meta::SchemaProviderIf const* schema, + folly::StringPiece row) noexcept; + + bool reset(const std::vector>& schemas, + folly::StringPiece row) noexcept; Value getValueByName(const std::string& prop) const noexcept override { DCHECK(!!currReader_); @@ -79,10 +114,50 @@ class RowReaderWrapper : public RowReader { SchemaVer& schemaVer, int32_t& readerVer); + operator bool() const noexcept { + return operator!=(nullptr); + } + + bool operator==(std::nullptr_t) const noexcept { + return !operator!=(nullptr); + } + + bool operator!=(std::nullptr_t) const noexcept { + return currReader_ != nullptr; + } + + bool operator==(const RowReaderWrapper& rhs) const noexcept { + return !operator!=(rhs); + } + + bool operator!=(const RowReaderWrapper& rhs) const noexcept { + return data_ != rhs.data_; // operator== returns the negation of this + } + + RowReaderWrapper* operator->() const noexcept { + return get(); + } + + RowReaderWrapper* get() const noexcept { + return const_cast(this); + } + + RowReaderWrapper* get() noexcept { + return this; + } + + RowReaderWrapper& operator*() const noexcept 
{ + return *get(); + } + + void reset() noexcept { + currReader_ = nullptr; + } + private: RowReaderV1 readerV1_; RowReaderV2 readerV2_; - RowReader* currReader_; + RowReader* currReader_ = nullptr; }; } // namespace nebula diff --git a/src/codec/test/NebulaCodecTest.cpp b/src/codec/test/NebulaCodecTest.cpp index 286e664ba..dbd0b60ad 100644 --- a/src/codec/test/NebulaCodecTest.cpp +++ b/src/codec/test/NebulaCodecTest.cpp @@ -37,7 +37,7 @@ TEST(NebulaCodec, encode) { dataman::NebulaCodecImpl codec; std::string encoded = codec.encode(v, schema); - auto reader = RowReader::getRowReader(encoded, schema); + auto reader = RowReaderWrapper::getRowReader(encoded, schema); EXPECT_EQ(5, reader->numFields()); // check int field @@ -86,7 +86,7 @@ TEST(NebulaCodec, encode) { SchemaWriter emptyWriter; auto emptySchema = std::make_shared(emptyWriter.moveSchema()); - auto emptyReader = RowReader::getRowReader(emptyEncoded, emptySchema); + auto emptyReader = RowReaderWrapper::getRowReader(emptyEncoded, emptySchema); EXPECT_EQ(0, emptyReader->numFields()); } diff --git a/src/codec/test/RowReaderBenchmark.cpp b/src/codec/test/RowReaderBenchmark.cpp index 15cb7ac63..e8c0edc57 100644 --- a/src/codec/test/RowReaderBenchmark.cpp +++ b/src/codec/test/RowReaderBenchmark.cpp @@ -10,12 +10,13 @@ #include "codec/test/SchemaWriter.h" #include "codec/test/RowWriterV1.h" #include "codec/RowWriterV2.h" -#include "codec/RowReader.h" +#include "codec/RowReaderWrapper.h" using nebula::SchemaWriter; using nebula::RowWriterV1; using nebula::RowWriterV2; using nebula::RowReader; +using nebula::RowReaderWrapper; using nebula::meta::cpp2::PropertyType; SchemaWriter schemaShort; @@ -104,7 +105,7 @@ std::vector generateRandom(SchemaWriter* schema) { void sequentialRead(SchemaWriter* schema, const std::string& encoded, size_t iters) { - auto reader = RowReader::getRowReader(schema, encoded); + auto reader = RowReaderWrapper::getRowReader(schema, encoded); DCHECK_EQ(reader->readerVer(), ((encoded[0] & 0x18) 
>> 3) + 1); for (size_t i = 0; i < iters; i++) { @@ -120,7 +121,7 @@ void randomRead(SchemaWriter* schema, const std::string& encoded, const std::vector& randomList, size_t iters) { - auto reader = RowReader::getRowReader(schema, encoded); + auto reader = RowReaderWrapper::getRowReader(schema, encoded); DCHECK_EQ(reader->readerVer(), ((encoded[0] & 0x18) >> 3) + 1); for (size_t i = 0; i < iters; i++) { @@ -135,9 +136,9 @@ void randomRead(SchemaWriter* schema, void sequentialTest(SchemaWriter* schema, const std::string& encodedV1, const std::string& encodedV2) { - auto reader1 = RowReader::getRowReader(schema, encodedV1); + auto reader1 = RowReaderWrapper::getRowReader(schema, encodedV1); DCHECK_EQ(reader1->readerVer(), ((encodedV1[0] & 0x18) >> 3) + 1); - auto reader2 = RowReader::getRowReader(schema, encodedV2); + auto reader2 = RowReaderWrapper::getRowReader(schema, encodedV2); DCHECK_EQ(reader2->readerVer(), ((encodedV2[0] & 0x18) >> 3) + 1); for (size_t i = 0; i < schema->getNumFields(); i++) { @@ -152,9 +153,9 @@ void randomTest(SchemaWriter* schema, const std::string& encodedV1, const std::string& encodedV2, const std::vector& randomList) { - auto reader1 = RowReader::getRowReader(schema, encodedV1); + auto reader1 = RowReaderWrapper::getRowReader(schema, encodedV1); DCHECK_EQ(reader1->readerVer(), ((encodedV1[0] & 0x18) >> 3) + 1); - auto reader2 = RowReader::getRowReader(schema, encodedV2); + auto reader2 = RowReaderWrapper::getRowReader(schema, encodedV2); DCHECK_EQ(reader2->readerVer(), ((encodedV2[0] & 0x18) >> 3) + 1); for (size_t i = 0; i < randomList.size(); i++) { diff --git a/src/codec/test/RowReaderV1Test.cpp b/src/codec/test/RowReaderV1Test.cpp index e11777a26..32174ea1d 100644 --- a/src/codec/test/RowReaderV1Test.cpp +++ b/src/codec/test/RowReaderV1Test.cpp @@ -16,8 +16,8 @@ TEST(RowReaderV1, headerInfo) { // Simplest row, nothing in it char data1[] = {0x00}; SchemaWriter schema1; - auto reader = RowReader::getRowReader(&schema1, - 
folly::StringPiece(data1, sizeof(data1))); + auto reader = RowReaderWrapper::getRowReader(&schema1, + folly::StringPiece(data1, sizeof(data1))); ASSERT_TRUE(!!reader); EXPECT_EQ(0, reader->schemaVer()); EXPECT_EQ(sizeof(data1), reader->headerLen()); @@ -71,7 +71,7 @@ TEST(RowReaderV1, headerInfo) { // Empty row, return illegal schema version SchemaWriter schema5; - auto reader2 = RowReader::getRowReader(&schema5, + auto reader2 = RowReaderWrapper::getRowReader(&schema5, folly::StringPiece("")); ASSERT_FALSE(!!reader2); ASSERT_FALSE(reader2->reset(&schema5, folly::StringPiece(""))); @@ -160,7 +160,7 @@ TEST(RowReaderV1, encodedData) { /************************** * Now let's read it *************************/ - auto reader = RowReader::getRowReader(&schema, encoded); + auto reader = RowReaderWrapper::getRowReader(&schema, encoded); // Header info RowReaderWrapper* r = dynamic_cast(reader.get()); @@ -280,7 +280,7 @@ TEST(RowReaderV1, iterator) { encoded.append(1, i + 1); } - auto reader = RowReader::getRowReader(&schema, encoded); + auto reader = RowReaderWrapper::getRowReader(&schema, encoded); auto it = reader->begin(); int32_t index = 0; while (it != reader->end()) { diff --git a/src/codec/test/RowReaderV2Test.cpp b/src/codec/test/RowReaderV2Test.cpp index 99a2e05fc..9037169c9 100644 --- a/src/codec/test/RowReaderV2Test.cpp +++ b/src/codec/test/RowReaderV2Test.cpp @@ -18,7 +18,7 @@ TEST(RowReaderV2, headerInfo) { // Simplest row, nothing in it char data1[] = {0x08}; SchemaWriter schema1; - auto reader = RowReader::getRowReader(&schema1, + auto reader = RowReaderWrapper::getRowReader(&schema1, folly::StringPiece(data1, sizeof(data1))); ASSERT_TRUE(!!reader); EXPECT_EQ(0, reader->schemaVer()); @@ -56,8 +56,7 @@ TEST(RowReaderV2, headerInfo) { // Empty row, return illegal schema version SchemaWriter schema5; - auto reader2 = RowReader::getRowReader(&schema5, - folly::StringPiece("")); + auto reader2 = RowReaderWrapper::getRowReader(&schema5, folly::StringPiece("")); 
ASSERT_FALSE(!!reader2); ASSERT_FALSE(reader2->reset(&schema5, folly::StringPiece(""))); } @@ -167,7 +166,7 @@ TEST(RowReaderV2, encodedData) { /************************** * Now let's read it *************************/ - auto reader = RowReader::getRowReader(&schema, encoded); + auto reader = RowReaderWrapper::getRowReader(&schema, encoded); // Header info RowReaderWrapper* r = dynamic_cast(reader.get()); @@ -315,7 +314,7 @@ TEST(RowReaderV2, iterator) { encoded.append(7, 0); } - auto reader = RowReader::getRowReader(&schema, encoded); + auto reader = RowReaderWrapper::getRowReader(&schema, encoded); auto it = reader->begin(); int32_t index = 0; while (it != reader->end()) { diff --git a/src/codec/test/RowWriterV2Test.cpp b/src/codec/test/RowWriterV2Test.cpp index 0547c934b..fca2da612 100644 --- a/src/codec/test/RowWriterV2Test.cpp +++ b/src/codec/test/RowWriterV2Test.cpp @@ -7,7 +7,7 @@ #include "common/base/Base.h" #include #include "codec/RowWriterV2.h" -#include "codec/RowReaderV2.h" +#include "codec/RowReaderWrapper.h" #include "codec/test/SchemaWriter.h" namespace nebula { @@ -83,8 +83,8 @@ TEST(RowWriterV2, NoDefaultValue) { std::string encoded1 = std::move(writer1).moveEncodedStr(); std::string encoded2 = writer2.getEncodedStr(); - auto reader1 = RowReader::getRowReader(&schema, encoded1); - auto reader2 = RowReader::getRowReader(&schema, encoded2); + auto reader1 = RowReaderWrapper::getRowReader(&schema, encoded1); + auto reader2 = RowReaderWrapper::getRowReader(&schema, encoded2); // Col01 Value v1 = reader1->getValueByName("Col01"); @@ -195,7 +195,7 @@ TEST(RowWriterV2, WithDefaultValue) { ASSERT_EQ(WriteResult::SUCCEEDED, writer.finish()); std::string encoded = std::move(writer).moveEncodedStr(); - auto reader = RowReader::getRowReader(&schema, encoded); + auto reader = RowReaderWrapper::getRowReader(&schema, encoded); // Col01 Value v1 = reader->getValueByName("Col01"); @@ -250,7 +250,7 @@ TEST(RowWriterV2, DoubleSet) { 
ASSERT_EQ(WriteResult::SUCCEEDED, writer.finish()); std::string encoded = std::move(writer).moveEncodedStr(); - auto reader = RowReader::getRowReader(&schema, encoded); + auto reader = RowReaderWrapper::getRowReader(&schema, encoded); // Col01 Value v1 = reader->getValueByName("Col01"); @@ -305,7 +305,7 @@ TEST(RowWriterV2, Update) { std::string encoded1 = writer.moveEncodedStr(); - auto oldReader = RowReader::getRowReader(&schema, encoded1); + auto oldReader = RowReaderWrapper::getRowReader(&schema, encoded1); RowWriterV2 updater2(*oldReader); RowWriterV2 updater1(&schema, std::move(encoded1)); @@ -324,8 +324,8 @@ TEST(RowWriterV2, Update) { std::string encoded3 = updater2.moveEncodedStr(); EXPECT_EQ(encoded2, encoded3); - auto reader1 = RowReader::getRowReader(&schema, encoded2); - auto reader2 = RowReader::getRowReader(&schema, encoded3); + auto reader1 = RowReaderWrapper::getRowReader(&schema, encoded2); + auto reader2 = RowReaderWrapper::getRowReader(&schema, encoded3); // Col01 Value v1 = reader1->getValueByName("Col01"); @@ -405,7 +405,7 @@ TEST(RowWriterV2, EmptyString) { ASSERT_EQ(WriteResult::SUCCEEDED, writer.finish()); std::string encoded = std::move(writer).moveEncodedStr(); - auto reader = RowReader::getRowReader(&schema, encoded); + auto reader = RowReaderWrapper::getRowReader(&schema, encoded); // Col01 Value v1 = reader->getValueByName("Col01"); diff --git a/src/storage/BaseProcessor.h b/src/storage/BaseProcessor.h index c75d8b083..9398f8594 100644 --- a/src/storage/BaseProcessor.h +++ b/src/storage/BaseProcessor.h @@ -15,7 +15,7 @@ #include #include #include "storage/CommonUtils.h" -#include "codec/RowReader.h" +#include "codec/RowReaderWrapper.h" #include "codec/RowWriterV2.h" #include "utils/IndexKeyUtils.h" diff --git a/src/storage/CompactionFilter.h b/src/storage/CompactionFilter.h index db0604b70..e2df4b600 100644 --- a/src/storage/CompactionFilter.h +++ b/src/storage/CompactionFilter.h @@ -105,7 +105,8 @@ class StorageCompactionFilter final 
: public kvstore::KVFilter { VLOG(3) << "Space " << spaceId << ", Tag " << tagId << " invalid"; return false; } - auto reader = nebula::RowReader::getTagPropReader(schemaMan_, spaceId, tagId, val); + auto reader = nebula::RowReaderWrapper::getTagPropReader( + schemaMan_, spaceId, tagId, val); return checkDataTtlValid(schema.get(), reader.get()); } else if (NebulaKeyUtils::isEdge(vIdLen_, key)) { auto edgeType = NebulaKeyUtils::getEdgeType(vIdLen_, key); @@ -114,10 +115,8 @@ class StorageCompactionFilter final : public kvstore::KVFilter { VLOG(3) << "Space " << spaceId << ", EdgeType " << edgeType << " invalid"; return false; } - auto reader = nebula::RowReader::getEdgePropReader(schemaMan_, - spaceId, - std::abs(edgeType), - val); + auto reader = nebula::RowReaderWrapper::getEdgePropReader( + schemaMan_, spaceId, std::abs(edgeType), val); return checkDataTtlValid(schema.get(), reader.get()); } return true; diff --git a/src/storage/admin/RebuildEdgeIndexProcessor.cpp b/src/storage/admin/RebuildEdgeIndexProcessor.cpp index d0aad6b88..3a214df34 100644 --- a/src/storage/admin/RebuildEdgeIndexProcessor.cpp +++ b/src/storage/admin/RebuildEdgeIndexProcessor.cpp @@ -78,10 +78,10 @@ void RebuildEdgeIndexProcessor::process(const cpp2::RebuildIndexRequest& req) { currentDstVertex = destination; } auto ranking = NebulaKeyUtils::getRank(key); - auto reader = RowReader::getEdgePropReader(schemaMan_, - std::move(val), - space, - edgeType); + auto reader = RowReaderWrapper::getEdgePropReader(schemaMan_, + std::move(val), + space, + edgeType); auto values = collectIndexValues(reader.get(), item->get_fields()); auto indexKey = NebulaKeyUtils::edgeIndexKey(part, indexID, source, ranking, destination, values); diff --git a/src/storage/admin/RebuildTagIndexProcessor.cpp b/src/storage/admin/RebuildTagIndexProcessor.cpp index 9b7187bcb..925353c13 100644 --- a/src/storage/admin/RebuildTagIndexProcessor.cpp +++ b/src/storage/admin/RebuildTagIndexProcessor.cpp @@ -75,10 +75,10 @@ void 
RebuildTagIndexProcessor::process(const cpp2::RebuildIndexRequest& req) { } else { currentVertex = vertex; } - auto reader = RowReader::getTagPropReader(schemaMan_, - std::move(val), - space, - tagID); + auto reader = RowReaderWrapper::getTagPropReader(schemaMan_, + std::move(val), + space, + tagID); auto values = collectIndexValues(reader.get(), item->get_fields()); auto indexKey = NebulaKeyUtils::vertexIndexKey(part, indexID, vertex, values); diff --git a/src/storage/exec/EdgeNode.h b/src/storage/exec/EdgeNode.h index 2b9b88d54..a08328a2e 100644 --- a/src/storage/exec/EdgeNode.h +++ b/src/storage/exec/EdgeNode.h @@ -96,7 +96,6 @@ class EdgeNode : public IterateNode { folly::Optional> ttl_; std::string edgeName_; - // std::unique_ptr reader_; std::unique_ptr iter_; std::string prefix_; }; diff --git a/src/storage/exec/GetNeighborsNode.h b/src/storage/exec/GetNeighborsNode.h index e4d01eca7..f3f0066dc 100644 --- a/src/storage/exec/GetNeighborsNode.h +++ b/src/storage/exec/GetNeighborsNode.h @@ -151,7 +151,7 @@ class GetNeighborsSampleNode : public GetNeighborsNode { sampler_->sampling(std::make_tuple(edgeType, val.str(), key.str(), props, columnIdx)); } - std::unique_ptr reader; + RowReaderWrapper reader; auto samples = std::move(*sampler_).samples(); for (auto& sample : samples) { auto columnIdx = std::get<4>(sample); @@ -162,18 +162,11 @@ class GetNeighborsSampleNode : public GetNeighborsNode { auto edgeType = std::get<0>(sample); auto val = std::get<1>(sample); + reader = RowReaderWrapper::getEdgePropReader(planContext_->env_->schemaMan_, + planContext_->spaceId_, + std::abs(edgeType), + val); if (!reader) { - reader = RowReader::getEdgePropReader(planContext_->env_->schemaMan_, - planContext_->spaceId_, - std::abs(edgeType), - val); - if (!reader) { - continue; - } - } else if (!reader->resetEdgePropReader(planContext_->env_->schemaMan_, - planContext_->spaceId_, - std::abs(edgeType), - val)) { continue; } diff --git a/src/storage/exec/StorageIterator.h 
b/src/storage/exec/StorageIterator.h index eb0f113ea..c59b74334 100644 --- a/src/storage/exec/StorageIterator.h +++ b/src/storage/exec/StorageIterator.h @@ -87,7 +87,7 @@ class SingleTagIterator : public StorageIterator { protected: // return true when the value iter to a valid tag value bool check(folly::StringPiece val) { - reader_ = RowReader::getRowReader(*schemas_, val); + reader_.reset(*schemas_, val); if (!reader_) { planContext_->resultStat_ = ResultStatus::ILLEGAL_DATA; return false; @@ -111,7 +111,7 @@ class SingleTagIterator : public StorageIterator { int64_t ttlDuration_; bool lookupOne_ = true; - std::unique_ptr reader_; + RowReaderWrapper reader_; }; // Iterator of single specified type @@ -199,13 +199,8 @@ class SingleEdgeIterator : public StorageIterator { } auto val = iter_->val(); + reader_.reset(*schemas_, val); if (!reader_) { - reader_ = RowReader::getRowReader(*schemas_, val); - if (!reader_) { - planContext_->resultStat_ = ResultStatus::ILLEGAL_DATA; - return false; - } - } else if (!reader_->reset(*schemas_, val)) { planContext_->resultStat_ = ResultStatus::ILLEGAL_DATA; return false; } @@ -229,7 +224,7 @@ class SingleEdgeIterator : public StorageIterator { bool moveToValidRecord_{true}; bool lookupOne_ = true; - std::unique_ptr reader_; + RowReaderWrapper reader_; EdgeRanking lastRank_ = 0; VertexID lastDstId_ = ""; bool firstLoop_ = true; diff --git a/src/storage/exec/UpdateNode.h b/src/storage/exec/UpdateNode.h index 166f43d7a..110385cb2 100644 --- a/src/storage/exec/UpdateNode.h +++ b/src/storage/exec/UpdateNode.h @@ -58,7 +58,9 @@ class UpdateTagNode : public RelNode { return folly::none; } - this->reader_ = filterNode_->reader(); + if (filterNode_->valid()) { + this->reader_ = filterNode_->reader(); + } // reset StorageExpressionContext reader_ to nullptr this->expCtx_->reset(); @@ -235,7 +237,7 @@ class UpdateTagNode : public RelNode { // when TTL exists, there is no index. // when insert_ is true, not old index, val_ is empty. 
if (!indexes_.empty()) { - std::unique_ptr nReader; + RowReaderWrapper nReader; for (auto& index : indexes_) { if (tagId_ == index->get_schema_id().get_tag_id()) { // step 1, delete old version index if exists. @@ -252,10 +254,8 @@ class UpdateTagNode : public RelNode { // step 2, insert new vertex index if (!nReader) { - nReader = RowReader::getTagPropReader(planContext_->env_->schemaMan_, - planContext_->spaceId_, - tagId_, - nVal); + nReader = RowReaderWrapper::getTagPropReader( + planContext_->env_->schemaMan_, planContext_->spaceId_, tagId_, nVal); } if (!nReader) { LOG(ERROR) << "Bad format row"; @@ -357,7 +357,9 @@ class UpdateEdgeNode : public RelNode { return folly::none; } - this->reader_ = filterNode_->reader(); + if (filterNode_->valid()) { + this->reader_ = filterNode_->reader(); + } // reset StorageExpressionContext reader_ to nullptr this->expCtx_->reset(); @@ -550,7 +552,7 @@ class UpdateEdgeNode : public RelNode { // when TTL exists, there is no index. // when insert_ is true, not old index, val_ is empty. if (!indexes_.empty()) { - std::unique_ptr nReader; + RowReaderWrapper nReader; for (auto& index : indexes_) { if (edgeType_ == index->get_schema_id().get_edge_type()) { // step 1, delete old version index if exists. 
@@ -567,10 +569,9 @@ class UpdateEdgeNode : public RelNode { // step 2, insert new edge index if (!nReader) { - nReader = RowReader::getEdgePropReader(planContext_->env_->schemaMan_, - planContext_->spaceId_, - std::abs(edgeType_), - nVal); + nReader = RowReaderWrapper::getEdgePropReader( + planContext_->env_->schemaMan_, planContext_->spaceId_, + std::abs(edgeType_), nVal); } if (!nReader) { LOG(ERROR) << "Bad format row"; diff --git a/src/storage/index/IndexExecutor.inl b/src/storage/index/IndexExecutor.inl index a43386ffe..3fe6f086b 100644 --- a/src/storage/index/IndexExecutor.inl +++ b/src/storage/index/IndexExecutor.inl @@ -183,7 +183,7 @@ kvstore::ResultCode IndexExecutor::getVertexRow(PartitionID partId, auto result = vertexCache_->get(std::make_pair(vId, tagOrEdge_), partId); if (result.ok()) { auto v = std::move(result).value(); - auto reader = RowReader::getTagPropReader(schemaMan_, v, spaceId_, tagOrEdge_); + auto reader = RowReaderWrapper::getTagPropReader(schemaMan_, v, spaceId_, tagOrEdge_); auto row = getRowFromReader(reader.get()); data->set_props(std::move(row)); VLOG(3) << "Hit cache for vId " << vId << ", tagId " << tagOrEdge_; @@ -200,10 +200,10 @@ kvstore::ResultCode IndexExecutor::getVertexRow(PartitionID partId, return ret; } if (iter && iter->valid()) { - auto reader = RowReader::getTagPropReader(schemaMan_, - iter->val(), - spaceId_, - tagOrEdge_); + auto reader = RowReaderWrapper::getTagPropReader(schemaMan_, + iter->val(), + spaceId_, + tagOrEdge_); auto row = getRowFromReader(reader.get()); data->set_props(std::move(row)); if (FLAGS_enable_vertex_cache && vertexCache_ != nullptr) { @@ -244,10 +244,10 @@ kvstore::ResultCode IndexExecutor::getEdgeRow(PartitionID partId, return ret; } if (iter && iter->valid()) { - auto reader = RowReader::getEdgePropReader(schemaMan_, - iter->val(), - spaceId_, - tagOrEdge_); + auto reader = RowReaderWrapper::getEdgePropReader(schemaMan_, + iter->val(), + spaceId_, + tagOrEdge_); auto row = 
getRowFromReader(reader.get()); data->set_props(std::move(row)); } else { diff --git a/src/storage/mutate/AddEdgesProcessor.cpp b/src/storage/mutate/AddEdgesProcessor.cpp index a300df0b8..99ac78bd3 100644 --- a/src/storage/mutate/AddEdgesProcessor.cpp +++ b/src/storage/mutate/AddEdgesProcessor.cpp @@ -135,7 +135,7 @@ AddEdgesProcessor::addEdges(PartitionID partId, for (auto& e : newEdges) { std::string val; - std::unique_ptr nReader; + RowReaderWrapper nReader; auto edgeType = NebulaKeyUtils::getEdgeType(spaceVidLen_, e.first); for (auto& index : indexes_) { if (edgeType == index->get_schema_id().get_edge_type()) { @@ -150,10 +150,10 @@ AddEdgesProcessor::addEdges(PartitionID partId, val = std::move(obsIdx).value(); } if (!val.empty()) { - auto reader = RowReader::getEdgePropReader(this->env_->schemaMan_, - spaceId_, - edgeType, - val); + auto reader = RowReaderWrapper::getEdgePropReader(this->env_->schemaMan_, + spaceId_, + edgeType, + val); if (reader == nullptr) { LOG(ERROR) << "Bad format row"; return folly::none; @@ -167,10 +167,10 @@ AddEdgesProcessor::addEdges(PartitionID partId, * step 2 , Insert new edge index */ if (nReader == nullptr) { - nReader = RowReader::getEdgePropReader(this->env_->schemaMan_, - spaceId_, - edgeType, - e.second); + nReader = RowReaderWrapper::getEdgePropReader(this->env_->schemaMan_, + spaceId_, + edgeType, + e.second); if (nReader == nullptr) { LOG(ERROR) << "Bad format row"; return folly::none; diff --git a/src/storage/mutate/AddVerticesProcessor.cpp b/src/storage/mutate/AddVerticesProcessor.cpp index a784406cf..45e23a95c 100644 --- a/src/storage/mutate/AddVerticesProcessor.cpp +++ b/src/storage/mutate/AddVerticesProcessor.cpp @@ -142,7 +142,7 @@ AddVerticesProcessor::addVertices(PartitionID partId, for (auto& v : newVertices) { std::string val; - std::unique_ptr nReader; + RowReaderWrapper nReader; auto tagId = NebulaKeyUtils::getTagId(spaceVidLen_, v.first); auto vId = NebulaKeyUtils::getVertexId(spaceVidLen_, v.first); for 
(auto& index : indexes_) { @@ -158,10 +158,10 @@ AddVerticesProcessor::addVertices(PartitionID partId, val = std::move(obsIdx).value(); } if (!val.empty()) { - auto reader = RowReader::getTagPropReader(this->env_->schemaMan_, - spaceId_, - tagId, - val); + auto reader = RowReaderWrapper::getTagPropReader(this->env_->schemaMan_, + spaceId_, + tagId, + val); if (reader == nullptr) { LOG(ERROR) << "Bad format row"; return folly::none; @@ -175,10 +175,10 @@ AddVerticesProcessor::addVertices(PartitionID partId, * step 2 , Insert new vertex index */ if (nReader == nullptr) { - nReader = RowReader::getTagPropReader(this->env_->schemaMan_, - spaceId_, - tagId, - v.second); + nReader = RowReaderWrapper::getTagPropReader(this->env_->schemaMan_, + spaceId_, + tagId, + v.second); if (nReader == nullptr) { LOG(ERROR) << "Bad format row"; return folly::none; diff --git a/src/storage/mutate/DeleteEdgesProcessor.cpp b/src/storage/mutate/DeleteEdgesProcessor.cpp index 141689b89..c9477b78d 100644 --- a/src/storage/mutate/DeleteEdgesProcessor.cpp +++ b/src/storage/mutate/DeleteEdgesProcessor.cpp @@ -122,15 +122,15 @@ DeleteEdgesProcessor::deleteEdges(PartitionID partId, * just get the latest version edge for index. 
*/ if (isLatestVE) { - std::unique_ptr reader; + RowReaderWrapper reader; for (auto& index : indexes_) { auto indexId = index->get_index_id(); if (type == index->get_schema_id().get_edge_type()) { if (reader == nullptr) { - reader = RowReader::getEdgePropReader(this->env_->schemaMan_, - spaceId_, - type, - iter->val()); + reader = RowReaderWrapper::getEdgePropReader(this->env_->schemaMan_, + spaceId_, + type, + iter->val()); if (reader == nullptr) { LOG(WARNING) << "Bad format row!"; return folly::none; diff --git a/src/storage/mutate/DeleteVerticesProcessor.cpp b/src/storage/mutate/DeleteVerticesProcessor.cpp index 14a24e388..da33ea790 100644 --- a/src/storage/mutate/DeleteVerticesProcessor.cpp +++ b/src/storage/mutate/DeleteVerticesProcessor.cpp @@ -139,15 +139,15 @@ DeleteVerticesProcessor::deleteVertices(PartitionID partId, * Using newlyVertexId to identify if it is the latest version */ if (latestVVId != tagId) { - std::unique_ptr reader; + RowReaderWrapper reader; for (auto& index : indexes_) { auto indexId = index->get_index_id(); if (index->get_schema_id().get_tag_id() == tagId) { if (reader == nullptr) { - reader = RowReader::getTagPropReader(this->env_->schemaMan_, - spaceId_, - tagId, - iter->val()); + reader = RowReaderWrapper::getTagPropReader(this->env_->schemaMan_, + spaceId_, + tagId, + iter->val()); if (reader == nullptr) { LOG(WARNING) << "Bad format row"; return folly::none; diff --git a/src/storage/query/ScanEdgeProcessor.cpp b/src/storage/query/ScanEdgeProcessor.cpp index 735b422a0..e51ec7630 100644 --- a/src/storage/query/ScanEdgeProcessor.cpp +++ b/src/storage/query/ScanEdgeProcessor.cpp @@ -85,7 +85,8 @@ void ScanEdgeProcessor::process(const cpp2::ScanEdgeRequest& req) { data.set_value(value.str()); } else if (!ctxIter->second.empty()) { // only return specified columns - auto reader = RowReader::getEdgePropReader(schemaMan_, value, spaceId_, edgeType); + auto reader = RowReaderWrapper::getEdgePropReader( + schemaMan_, value, spaceId_, 
edgeType); RowWriter writer; PropsCollector collector(&writer); auto& props = ctxIter->second; diff --git a/src/storage/query/ScanVertexProcessor.cpp b/src/storage/query/ScanVertexProcessor.cpp index 6bc45328a..d264ddf3e 100644 --- a/src/storage/query/ScanVertexProcessor.cpp +++ b/src/storage/query/ScanVertexProcessor.cpp @@ -80,7 +80,7 @@ void ScanVertexProcessor::process(const cpp2::ScanVertexRequest& req) { data.set_value(value.str()); } else if (!ctxIter->second.empty()) { // only return specified columns - auto reader = RowReader::getTagPropReader(schemaMan_, value, spaceId_, tagId); + auto reader = RowReaderWrapper::getTagPropReader(schemaMan_, value, spaceId_, tagId); RowWriter writer; PropsCollector collector(&writer); auto& props = ctxIter->second; diff --git a/src/storage/test/GetNeighborsBenchmark.cpp b/src/storage/test/GetNeighborsBenchmark.cpp index 4c83ad518..5630baa9e 100644 --- a/src/storage/test/GetNeighborsBenchmark.cpp +++ b/src/storage/test/GetNeighborsBenchmark.cpp @@ -209,8 +209,9 @@ void prefix(int32_t iters, CHECK(!edgeSchemaIter->second.empty()); auto* edgeSchema = &(edgeSchemaIter->second); - std::unique_ptr reader; nebula::DataSet resultDataSet; + nebula::RowReaderWrapper reader; + for (const auto& vId : vertex) { nebula::PartitionID partId = (hash(vId) % totalParts) + 1; std::vector row; @@ -225,7 +226,7 @@ void prefix(int32_t iters, CHECK_EQ(code, nebula::kvstore::ResultCode::SUCCEEDED); CHECK(iter->valid()); auto val = iter->val(); - reader = nebula::RowReader::getRowReader(*tagSchema, val); + reader.reset(*tagSchema, val); CHECK_NOTNULL(reader); auto& cell = row[1].mutableList(); for (const auto& prop : playerProps) { @@ -245,12 +246,7 @@ void prefix(int32_t iters, auto key = iter->key(); folly::doNotOptimizeAway(key); auto val = iter->val(); - if (!reader) { - reader = nebula::RowReader::getRowReader(*edgeSchema, val); - CHECK_NOTNULL(reader); - } else if (!reader->reset(*edgeSchema, val)) { - LOG(FATAL) << "Should not happen"; - } + 
reader.reset(*edgeSchema, val); auto props = planCtx->props_; for (const auto& prop : *props) { auto value = nebula::storage::QueryUtils::readValue( diff --git a/src/storage/test/IndexScanTest.cpp b/src/storage/test/IndexScanTest.cpp index 265713bf9..7636dcc57 100644 --- a/src/storage/test/IndexScanTest.cpp +++ b/src/storage/test/IndexScanTest.cpp @@ -74,10 +74,10 @@ static std::string genEdgeIndexKey(meta::SchemaManager* schemaMan, std::shared_ptr& index, VertexID src, VertexID dst) { - auto reader = RowReader::getEdgePropReader(schemaMan, - prop, - spaceId, - type); + auto reader = RowReaderWrapper::getEdgePropReader(schemaMan, + prop, + spaceId, + type); auto values = collectIndexValues(reader.get(), index->get_fields()); auto indexKey = NebulaKeyUtils::edgeIndexKey(partId, @@ -96,10 +96,10 @@ static std::string genVertexIndexKey(meta::SchemaManager* schemaMan, TagID tagId, std::shared_ptr &index, VertexID vId) { - auto reader = RowReader::getTagPropReader(schemaMan, - prop, - spaceId, - tagId); + auto reader = RowReaderWrapper::getTagPropReader(schemaMan, + prop, + spaceId, + tagId); auto values = collectIndexValues(reader.get(), index->get_fields()); auto indexKey = NebulaKeyUtils::vertexIndexKey(partId, diff --git a/src/storage/test/QueryStatsTest.cpp b/src/storage/test/QueryStatsTest.cpp index bd02d39c0..723cef1c7 100644 --- a/src/storage/test/QueryStatsTest.cpp +++ b/src/storage/test/QueryStatsTest.cpp @@ -87,7 +87,7 @@ void checkResponse(const cpp2::QueryStatsResponse& resp) { expected.emplace_back("col_6", nebula::cpp2::SupportedType::INT, 6); expected.emplace_back("col_8", nebula::cpp2::SupportedType::INT, 8); - auto reader = RowReader::getRowReader(resp.data, provider); + auto reader = RowReaderWrapper::getRowReader(resp.data, provider); auto numFields = provider->getNumFields(); for (size_t i = 0; i < numFields; i++) { const auto* name = provider->getFieldName(i); diff --git a/src/storage/test/ScanEdgePropBenchmark.cpp 
b/src/storage/test/ScanEdgePropBenchmark.cpp index 24ccb5434..2969ff541 100644 --- a/src/storage/test/ScanEdgePropBenchmark.cpp +++ b/src/storage/test/ScanEdgePropBenchmark.cpp @@ -154,13 +154,13 @@ TEST_P(ScanEdgePropBench, ProcessEdgeProps) { iter.reset(new TestSingleEdgeIterator(std::move(kvIter), edgeType, vIdLen)); } size_t edgeRowCount = 0; - std::unique_ptr reader; + RowReaderWrapper reader; folly::stop_watch watch; for (; iter->valid(); iter->next(), edgeRowCount++) { auto key = iter->key(); auto val = iter->val(); - reader = RowReader::getEdgePropReader(env->schemaMan_, spaceId, - std::abs(edgeType), val); + reader = RowReaderWrapper::getEdgePropReader(env->schemaMan_, spaceId, + std::abs(edgeType), val); ASSERT_TRUE(reader.get() != nullptr); auto code = node.collectEdgeProps(reader.get(), key, vIdLen, &props, list); ASSERT_EQ(kvstore::ResultCode::SUCCEEDED, code); @@ -180,19 +180,14 @@ TEST_P(ScanEdgePropBench, ProcessEdgeProps) { iter.reset(new TestSingleEdgeIterator(std::move(kvIter), edgeType, vIdLen)); } size_t edgeRowCount = 0; - std::unique_ptr reader; + RowReaderWrapper reader; folly::stop_watch watch; for (; iter->valid(); iter->next(), edgeRowCount++) { auto key = iter->key(); auto val = iter->val(); - if (reader.get() == nullptr) { - reader = RowReader::getEdgePropReader(env->schemaMan_, spaceId, - std::abs(edgeType), val); - ASSERT_TRUE(reader.get() != nullptr); - } else { - ASSERT_TRUE(reader->resetEdgePropReader(env->schemaMan_, spaceId, - std::abs(edgeType), val)); - } + reader = RowReaderWrapper::getEdgePropReader(env->schemaMan_, spaceId, + std::abs(edgeType), val); + ASSERT_TRUE(reader.get() != nullptr); auto code = node.collectEdgeProps(reader.get(), key, vIdLen, &props, list); ASSERT_EQ(kvstore::ResultCode::SUCCEEDED, code); result.mutableList().values.emplace_back(std::move(list)); @@ -211,7 +206,7 @@ TEST_P(ScanEdgePropBench, ProcessEdgeProps) { iter.reset(new TestSingleEdgeIterator(std::move(kvIter), edgeType, vIdLen)); } size_t 
edgeRowCount = 0; - std::unique_ptr reader; + RowReaderWrapper reader; // find all version of edge schema auto edges = env->schemaMan_->getAllVerEdgeSchema(spaceId); @@ -226,7 +221,7 @@ TEST_P(ScanEdgePropBench, ProcessEdgeProps) { auto key = iter->key(); auto val = iter->val(); if (reader.get() == nullptr) { - reader = RowReader::getRowReader(schemas, val); + reader = RowReaderWrapper::getRowReader(schemas, val); ASSERT_TRUE(reader.get() != nullptr); } else { ASSERT_TRUE(reader->reset(schemas, val)); diff --git a/src/storage/test/ScanEdgeTest.cpp b/src/storage/test/ScanEdgeTest.cpp index 528bb63c1..4c86f0a67 100644 --- a/src/storage/test/ScanEdgeTest.cpp +++ b/src/storage/test/ScanEdgeTest.cpp @@ -152,7 +152,7 @@ void checkResponse(PartitionID partId, cpp2::ScanEdgeResponse& resp, std::string auto schemaIter = resp.edge_schema.find(edgeType); EXPECT_TRUE(schemaIter != resp.edge_schema.end()); auto provider = std::make_shared(schemaIter->second); - auto reader = RowReader::getRowReader(scanEdge.value, provider); + auto reader = RowReaderWrapper::getRowReader(scanEdge.value, provider); if (!returnAllColumns) { for (int64_t i = 0; i < 10; i += 2) { diff --git a/src/storage/test/ScanVertexTest.cpp b/src/storage/test/ScanVertexTest.cpp index 183730e41..18801f32a 100644 --- a/src/storage/test/ScanVertexTest.cpp +++ b/src/storage/test/ScanVertexTest.cpp @@ -152,7 +152,7 @@ void checkResponse(PartitionID partId, cpp2::ScanVertexResponse& resp, std::stri auto schemaIter = resp.vertex_schema.find(tagId); EXPECT_TRUE(schemaIter != resp.vertex_schema.end()); auto provider = std::make_shared(schemaIter->second); - auto reader = RowReader::getRowReader(scanVertex.value, provider); + auto reader = RowReaderWrapper::getRowReader(scanVertex.value, provider); if (!returnAllColumns) { { diff --git a/src/storage/test/StorageLookupBenchmark.cpp b/src/storage/test/StorageLookupBenchmark.cpp index 4cf814297..e5b9d76a7 100644 --- a/src/storage/test/StorageLookupBenchmark.cpp +++ 
b/src/storage/test/StorageLookupBenchmark.cpp @@ -58,10 +58,10 @@ std::string genVertexIndexKey(meta::SchemaManager* schemaMan, const std::string &prop, std::shared_ptr &index, VertexID vId) { - auto reader = RowReader::getTagPropReader(schemaMan, - prop, - spaceId, - tagId); + auto reader = RowReaderWrapper::getTagPropReader(schemaMan, + prop, + spaceId, + tagId); auto values = collectIndexValues(reader.get(), index->get_fields()); auto indexKey = NebulaKeyUtils::vertexIndexKey(partId, diff --git a/src/storage/test/TestUtils.h b/src/storage/test/TestUtils.h index acc5b164e..46483bd0e 100644 --- a/src/storage/test/TestUtils.h +++ b/src/storage/test/TestUtils.h @@ -8,6 +8,7 @@ #define STORAGE_TEST_TESTUTILS_H_ #include "common/base/Base.h" +#include "codec/RowReaderWrapper.h" #include "kvstore/KVStore.h" #include "kvstore/PartManager.h" #include "kvstore/NebulaStore.h" @@ -51,7 +52,7 @@ void checkAddVerticesData(cpp2::AddVerticesRequest req, int num = 0; while (iter && iter->valid()) { - auto reader = RowReader::getRowReader(schema.get(), iter->val()); + auto reader = RowReaderWrapper::getRowReader(schema.get(), iter->val()); // For players tagId is 1 Value val; if (mode == 0) { @@ -201,7 +202,7 @@ void checkAddEdgesData(cpp2::AddEdgesRequest req, Value val; int num = 0; while (iter && iter->valid()) { - auto reader = RowReader::getRowReader(schema.get(), iter->val()); + auto reader = RowReaderWrapper::getRowReader(schema.get(), iter->val()); if (mode == 0) { for (auto i = 0; i < 7; i++) { val = reader->getValueByIndex(i); diff --git a/src/storage/test/UpdateEdgeTest.cpp b/src/storage/test/UpdateEdgeTest.cpp index 520a33869..4bdad2745 100644 --- a/src/storage/test/UpdateEdgeTest.cpp +++ b/src/storage/test/UpdateEdgeTest.cpp @@ -197,10 +197,10 @@ TEST(UpdateEdgeTest, No_Filter_Test) { EXPECT_EQ(kvstore::ResultCode::SUCCEEDED, ret); EXPECT_TRUE(iter && iter->valid()); - auto edgeReader = RowReader::getEdgePropReader(env->schemaMan_, - spaceId, - std::abs(edgeType), - 
iter->val()); + auto edgeReader = RowReaderWrapper::getEdgePropReader(env->schemaMan_, + spaceId, + std::abs(edgeType), + iter->val()); auto val = edgeReader->getValueByName("playerName"); EXPECT_EQ("Tim Duncan", val.getStr()); @@ -305,10 +305,10 @@ TEST(UpdateEdgeTest, No_Filter_Test) { EXPECT_EQ(kvstore::ResultCode::SUCCEEDED, ret); EXPECT_TRUE(iter && iter->valid()); - auto edgeReader = RowReader::getEdgePropReader(env->schemaMan_, - spaceId, - std::abs(edgeType), - iter->val()); + auto edgeReader = RowReaderWrapper::getEdgePropReader(env->schemaMan_, + spaceId, + std::abs(edgeType), + iter->val()); auto val = edgeReader->getValueByName("playerName"); EXPECT_EQ("Tim Duncan", val.getStr()); val = edgeReader->getValueByName("teamName"); @@ -455,10 +455,10 @@ TEST(UpdateEdgeTest, Filter_Yield_Test) { EXPECT_EQ(kvstore::ResultCode::SUCCEEDED, ret); EXPECT_TRUE(iter && iter->valid()); - auto reader = RowReader::getEdgePropReader(env->schemaMan_, - spaceId, - std::abs(edgeType), - iter->val()); + auto reader = RowReaderWrapper::getEdgePropReader(env->schemaMan_, + spaceId, + std::abs(edgeType), + iter->val()); auto val = reader->getValueByName("playerName"); EXPECT_EQ("Tim Duncan", val.getStr()); val = reader->getValueByName("teamName"); @@ -584,10 +584,10 @@ TEST(UpdateEdgeTest, Insertable_Test) { EXPECT_EQ(kvstore::ResultCode::SUCCEEDED, ret); EXPECT_TRUE(iter && iter->valid()); - auto reader = RowReader::getEdgePropReader(env->schemaMan_, - spaceId, - std::abs(edgeType), - iter->val()); + auto reader = RowReaderWrapper::getEdgePropReader(env->schemaMan_, + spaceId, + std::abs(edgeType), + iter->val()); auto val = reader->getValueByName("playerName"); EXPECT_EQ("Brandon Ingram", val.getStr()); val = reader->getValueByName("teamName"); @@ -690,10 +690,10 @@ TEST(UpdateEdgeTest, Invalid_Update_Prop_Test) { EXPECT_EQ(kvstore::ResultCode::SUCCEEDED, ret); EXPECT_TRUE(iter && iter->valid()); - auto reader = RowReader::getEdgePropReader(env->schemaMan_, - spaceId, - 
std::abs(edgeType), - iter->val()); + auto reader = RowReaderWrapper::getEdgePropReader(env->schemaMan_, + spaceId, + std::abs(edgeType), + iter->val()); auto val = reader->getValueByName("playerName"); EXPECT_EQ("Tim Duncan", val.getStr()); val = reader->getValueByName("teamName"); @@ -821,10 +821,10 @@ TEST(UpdateEdgeTest, Invalid_Filter_Test) { EXPECT_EQ(kvstore::ResultCode::SUCCEEDED, ret); EXPECT_TRUE(iter && iter->valid()); - auto reader = RowReader::getEdgePropReader(env->schemaMan_, - spaceId, - std::abs(edgeType), - iter->val()); + auto reader = RowReaderWrapper::getEdgePropReader(env->schemaMan_, + spaceId, + std::abs(edgeType), + iter->val()); auto val = reader->getValueByName("playerName"); EXPECT_EQ("Tim Duncan", val.getStr()); val = reader->getValueByName("teamName"); @@ -977,10 +977,10 @@ TEST(UpdateEdgeTest, Insertable_Filter_value_Test) { EXPECT_EQ(kvstore::ResultCode::SUCCEEDED, ret); EXPECT_TRUE(iter && iter->valid()); - auto reader = RowReader::getEdgePropReader(env->schemaMan_, - spaceId, - std::abs(edgeType), - iter->val()); + auto reader = RowReaderWrapper::getEdgePropReader(env->schemaMan_, + spaceId, + std::abs(edgeType), + iter->val()); auto val = reader->getValueByName("playerName"); EXPECT_EQ("Brandon Ingram", val.getStr()); val = reader->getValueByName("teamName"); @@ -1278,10 +1278,10 @@ TEST(UpdateEdgeTest, TTL_Insert_No_Exist_Test) { EXPECT_EQ(kvstore::ResultCode::SUCCEEDED, ret); EXPECT_TRUE(iter && iter->valid()); - auto reader = RowReader::getEdgePropReader(env->schemaMan_, - spaceId, - std::abs(edgeType), - iter->val()); + auto reader = RowReaderWrapper::getEdgePropReader(env->schemaMan_, + spaceId, + std::abs(edgeType), + iter->val()); auto val = reader->getValueByName("playerName"); EXPECT_EQ("Tim", val.getStr()); val = reader->getValueByName("teamName"); @@ -1417,7 +1417,7 @@ TEST(UpdateEdgeTest, TTL_Insert_Test) { int count = 0; while (iter && iter->valid()) { - auto edgeReader = RowReader::getRowReader(schema.get(), 
iter->val()); + auto edgeReader = RowReaderWrapper::getRowReader(schema.get(), iter->val()); auto val = edgeReader->getValueByName("playerName"); EXPECT_EQ("Tim Duncan", val.getStr()); @@ -1572,10 +1572,10 @@ TEST(UpdateEdgeTest, Yield_Key_Test) { EXPECT_EQ(kvstore::ResultCode::SUCCEEDED, ret); EXPECT_TRUE(iter && iter->valid()); - auto edgeReader = RowReader::getEdgePropReader(env->schemaMan_, - spaceId, - std::abs(edgeType), - iter->val()); + auto edgeReader = RowReaderWrapper::getEdgePropReader(env->schemaMan_, + spaceId, + std::abs(edgeType), + iter->val()); auto val = edgeReader->getValueByName("playerName"); EXPECT_EQ("Tim Duncan", val.getStr()); val = edgeReader->getValueByName("teamName"); @@ -1693,10 +1693,10 @@ TEST(UpdateEdgeTest, Yield_Illegal_Key_Test) { EXPECT_EQ(kvstore::ResultCode::SUCCEEDED, ret); EXPECT_TRUE(iter && iter->valid()); - auto edgeReader = RowReader::getEdgePropReader(env->schemaMan_, - spaceId, - std::abs(edgeType), - iter->val()); + auto edgeReader = RowReaderWrapper::getEdgePropReader(env->schemaMan_, + spaceId, + std::abs(edgeType), + iter->val()); auto val = edgeReader->getValueByName("playerName"); EXPECT_EQ("Tim Duncan", val.getStr()); val = edgeReader->getValueByName("teamName"); diff --git a/src/storage/test/UpdateVertexTest.cpp b/src/storage/test/UpdateVertexTest.cpp index ea91fb41f..ae49005c4 100644 --- a/src/storage/test/UpdateVertexTest.cpp +++ b/src/storage/test/UpdateVertexTest.cpp @@ -184,7 +184,7 @@ TEST(UpdateVertexTest, No_Filter_Test) { ASSERT_EQ(kvstore::ResultCode::SUCCEEDED, ret); EXPECT_TRUE(iter && iter->valid()); - auto reader = RowReader::getTagPropReader(env->schemaMan_, spaceId, tagId, iter->val()); + auto reader = RowReaderWrapper::getTagPropReader(env->schemaMan_, spaceId, tagId, iter->val()); auto val = reader->getValueByName("name"); EXPECT_EQ("Tim Duncan", val.getStr()); val = reader->getValueByName("age"); @@ -312,7 +312,7 @@ TEST(UpdateVertexTest, Filter_Yield_Test2) { 
EXPECT_EQ(kvstore::ResultCode::SUCCEEDED, ret); EXPECT_TRUE(iter && iter->valid()); - auto reader = RowReader::getTagPropReader(env->schemaMan_, spaceId, tagId, iter->val()); + auto reader = RowReaderWrapper::getTagPropReader(env->schemaMan_, spaceId, tagId, iter->val()); auto val = reader->getValueByName("name"); EXPECT_EQ("Tim Duncan", val.getStr()); val = reader->getValueByName("age"); @@ -407,7 +407,7 @@ TEST(UpdateVertexTest, Insertable_Test) { EXPECT_EQ(kvstore::ResultCode::SUCCEEDED, ret); EXPECT_TRUE(iter && iter->valid()); - auto reader = RowReader::getTagPropReader(env->schemaMan_, spaceId, tagId, iter->val()); + auto reader = RowReaderWrapper::getTagPropReader(env->schemaMan_, spaceId, tagId, iter->val()); auto val = reader->getValueByName("name"); EXPECT_EQ("Brandon Ingram", val.getStr()); val = reader->getValueByName("age"); @@ -492,7 +492,7 @@ TEST(UpdateVertexTest, Invalid_Update_Prop_Test) { EXPECT_EQ(kvstore::ResultCode::SUCCEEDED, ret); EXPECT_TRUE(iter && iter->valid()); - auto reader = RowReader::getTagPropReader(env->schemaMan_, spaceId, tagId, iter->val()); + auto reader = RowReaderWrapper::getTagPropReader(env->schemaMan_, spaceId, tagId, iter->val()); auto val = reader->getValueByName("name"); EXPECT_EQ("Tim Duncan", val.getStr()); val = reader->getValueByName("age"); @@ -608,7 +608,7 @@ TEST(UpdateVertexTest, Invalid_Filter_Test) { EXPECT_EQ(kvstore::ResultCode::SUCCEEDED, ret); EXPECT_TRUE(iter && iter->valid()); - auto reader = RowReader::getTagPropReader(env->schemaMan_, spaceId, tagId, iter->val()); + auto reader = RowReaderWrapper::getTagPropReader(env->schemaMan_, spaceId, tagId, iter->val()); auto val = reader->getValueByName("name"); EXPECT_EQ("Tim Duncan", val.getStr()); val = reader->getValueByName("age"); @@ -728,7 +728,7 @@ TEST(UpdateVertexTest, Insertable_Filter_Value_Test) { EXPECT_EQ(kvstore::ResultCode::SUCCEEDED, ret); EXPECT_TRUE(iter && iter->valid()); - auto reader = RowReader::getTagPropReader(env->schemaMan_, spaceId, 
tagId, iter->val()); + auto reader = RowReaderWrapper::getTagPropReader(env->schemaMan_, spaceId, tagId, iter->val()); auto val = reader->getValueByName("name"); EXPECT_EQ("Brandon Ingram", val.getStr()); val = reader->getValueByName("age"); @@ -980,7 +980,7 @@ TEST(UpdateVertexTest, TTL_Insert_No_Exist_Test) { EXPECT_EQ(kvstore::ResultCode::SUCCEEDED, ret); EXPECT_TRUE(iter && iter->valid()); - auto reader = RowReader::getTagPropReader(env->schemaMan_, spaceId, tagId, iter->val()); + auto reader = RowReaderWrapper::getTagPropReader(env->schemaMan_, spaceId, tagId, iter->val()); auto val = reader->getValueByName("name"); EXPECT_EQ("Tim", val.getStr()); val = reader->getValueByName("age"); @@ -1106,7 +1106,7 @@ TEST(UpdateVertexTest, TTL_Insert_Test) { int count = 0; while (iter && iter->valid()) { - auto reader = RowReader::getRowReader(schema.get(), iter->val()); + auto reader = RowReaderWrapper::getRowReader(schema.get(), iter->val()); if (count == 1) { auto val = reader->getValueByName("name"); EXPECT_EQ("Tim Duncan", val.getStr()); diff --git a/src/tools/dbDump/DbDumper.cpp b/src/tools/dbDump/DbDumper.cpp index fdede5457..b52a2657b 100644 --- a/src/tools/dbDump/DbDumper.cpp +++ b/src/tools/dbDump/DbDumper.cpp @@ -450,7 +450,8 @@ void DbDumper::iterates(kvstore::RocksPrefixIter* it) { // only print to screen with scan mode if (FLAGS_mode == "scan") { printTagKey(key); - auto reader = RowReader::getTagPropReader(schemaMng_.get(), spaceId_, tagId, value); + auto reader = RowReaderWrapper::getTagPropReader( + schemaMng_.get(), spaceId_, tagId, value); if (!reader) { std::cerr << "Can't get tag reader of " << tagId; continue; @@ -488,8 +489,8 @@ void DbDumper::iterates(kvstore::RocksPrefixIter* it) { // only print to screen with scan mode if (FLAGS_mode == "scan") { printEdgeKey(key); - auto reader = RowReader::getEdgePropReader(schemaMng_.get(), spaceId_, - edgeType, value); + auto reader = RowReaderWrapper::getEdgePropReader(schemaMng_.get(), spaceId_, + edgeType, 
value); if (!reader) { std::cerr << "Can't get edge reader of " << edgeType; continue; diff --git a/src/tools/dbDump/DbDumper.h b/src/tools/dbDump/DbDumper.h index d7643ae4d..5add06ae7 100644 --- a/src/tools/dbDump/DbDumper.h +++ b/src/tools/dbDump/DbDumper.h @@ -13,7 +13,7 @@ #include "common/meta/ServerBasedSchemaManager.h" #include #include "kvstore/RocksEngine.h" -#include "codec/RowReader.h" +#include "codec/RowReaderWrapper.h" DECLARE_string(space_name); DECLARE_string(db_path); diff --git a/src/tools/storage-perf/StorageIntegrityTool.cpp b/src/tools/storage-perf/StorageIntegrityTool.cpp index 8c96071af..b5e74c7bc 100644 --- a/src/tools/storage-perf/StorageIntegrityTool.cpp +++ b/src/tools/storage-perf/StorageIntegrityTool.cpp @@ -245,7 +245,7 @@ class IntegrityTest { if (iter == vdata.tag_data.end()) { return false; } - auto tagReader = RowReader::getRowReader(iter->data, tagProvider); + auto tagReader = RowReaderWrapper::getRowReader(iter->data, tagProvider); auto ret = RowReader::getPropByName(tagReader.get(), propName_); CHECK(ok(ret)); nextId = boost::get(value(ret)); From 598d692e9e689752e6247e30c9755dff53fac4ec Mon Sep 17 00:00:00 2001 From: Doodle <13706157+critical27@users.noreply.github.com> Date: Wed, 12 Aug 2020 19:44:54 +0800 Subject: [PATCH 7/8] fix failed ut, rebased --- src/codec/RowReaderWrapper.cpp | 12 ++++---- src/codec/RowReaderWrapper.h | 40 ++++++++++++++++++------- src/codec/test/RowReaderV2Test.cpp | 4 +-- src/codec/test/RowWriterV2Test.cpp | 2 +- src/storage/exec/UpdateNode.h | 4 +-- src/storage/test/AddEdgesTest.cpp | 2 ++ src/storage/test/AddVerticesTest.cpp | 2 ++ src/storage/test/DeleteEdgesTest.cpp | 2 ++ src/storage/test/DeleteVerticesTest.cpp | 2 ++ 9 files changed, 50 insertions(+), 20 deletions(-) diff --git a/src/codec/RowReaderWrapper.cpp b/src/codec/RowReaderWrapper.cpp index 52d09b811..52b0bea2f 100644 --- a/src/codec/RowReaderWrapper.cpp +++ b/src/codec/RowReaderWrapper.cpp @@ -81,12 +81,13 @@ RowReaderWrapper 
RowReaderWrapper::getRowReader( RowReaderWrapper::RowReaderWrapper(const meta::SchemaProviderIf* schema, const folly::StringPiece& row, - int32_t& readerVer) { + int32_t& readerVer) + : readerVer_(readerVer) { CHECK_NOTNULL(schema); - if (readerVer == 1) { + if (readerVer_ == 1) { readerV1_.resetImpl(schema, row); currReader_ = &readerV1_; - } else if (readerVer == 2) { + } else if (readerVer_ == 2) { readerV2_.resetImpl(schema, row); currReader_ = &readerV2_; } else { @@ -98,11 +99,12 @@ bool RowReaderWrapper::reset(meta::SchemaProviderIf const* schema, folly::StringPiece row, int32_t readerVer) noexcept { CHECK_NOTNULL(schema); - if (readerVer == 1) { + readerVer_ = readerVer; + if (readerVer_ == 1) { readerV1_.resetImpl(schema, row); currReader_ = &readerV1_; return true; - } else if (readerVer == 2) { + } else if (readerVer_ == 2) { readerV2_.resetImpl(schema, row); currReader_ = &readerV2_; return true; diff --git a/src/codec/RowReaderWrapper.h b/src/codec/RowReaderWrapper.h index 19df50d33..91c6b4c37 100644 --- a/src/codec/RowReaderWrapper.h +++ b/src/codec/RowReaderWrapper.h @@ -21,10 +21,37 @@ class RowReaderWrapper : public RowReader { public: RowReaderWrapper() = default; + RowReaderWrapper(const RowReaderWrapper&) = delete; + RowReaderWrapper& operator=(const RowReaderWrapper&) = delete; - RowReaderWrapper(RowReaderWrapper&&) = default; - RowReaderWrapper& operator=(RowReaderWrapper&&) = default; + + RowReaderWrapper(RowReaderWrapper&& rhs) { + this->readerVer_ = rhs.readerVer_; + if (this->readerVer_ == 1) { + this->readerV1_ = std::move(rhs.readerV1_); + this->currReader_ = &(this->readerV1_); + } else if (this->readerVer_ == 2) { + this->readerV2_ = std::move(rhs.readerV2_); + this->currReader_ = &(this->readerV2_); + } else { + this->currReader_ = nullptr; + } + } + + RowReaderWrapper& operator=(RowReaderWrapper&& rhs) { + this->readerVer_ = rhs.readerVer_; + if (this->readerVer_ == 1) { + this->readerV1_ = std::move(rhs.readerV1_); + 
this->currReader_ = &(this->readerV1_); + } else if (this->readerVer_ == 2) { + this->readerV2_ = std::move(rhs.readerV2_); + this->currReader_ = &(this->readerV2_); + } else { + this->currReader_ = nullptr; + } + return *this; + } static RowReaderWrapper getTagPropReader(meta::SchemaManager* schemaMan, GraphSpaceID space, @@ -126,14 +153,6 @@ class RowReaderWrapper : public RowReader { return currReader_ != nullptr; } - bool operator==(const RowReaderWrapper& rhs) const noexcept { - return !operator!=(rhs); - } - - bool operator!=(const RowReaderWrapper& rhs) const noexcept { - return data_ == rhs.data_; - } - RowReaderWrapper* operator->() const noexcept { return get(); } @@ -158,6 +177,7 @@ class RowReaderWrapper : public RowReader { RowReaderV1 readerV1_; RowReaderV2 readerV2_; RowReader* currReader_ = nullptr; + int32_t readerVer_ = 0; }; } // namespace nebula diff --git a/src/codec/test/RowReaderV2Test.cpp b/src/codec/test/RowReaderV2Test.cpp index 9037169c9..d1c3db95d 100644 --- a/src/codec/test/RowReaderV2Test.cpp +++ b/src/codec/test/RowReaderV2Test.cpp @@ -18,8 +18,8 @@ TEST(RowReaderV2, headerInfo) { // Simplest row, nothing in it char data1[] = {0x08}; SchemaWriter schema1; - auto reader = RowReaderWrapper::getRowReader(&schema1, - folly::StringPiece(data1, sizeof(data1))); + auto reader = RowReaderWrapper::getRowReader( + &schema1, folly::StringPiece(data1, sizeof(data1))); ASSERT_TRUE(!!reader); EXPECT_EQ(0, reader->schemaVer()); EXPECT_EQ(sizeof(data1), reader->headerLen()); diff --git a/src/codec/test/RowWriterV2Test.cpp b/src/codec/test/RowWriterV2Test.cpp index fca2da612..ecf6d6106 100644 --- a/src/codec/test/RowWriterV2Test.cpp +++ b/src/codec/test/RowWriterV2Test.cpp @@ -374,7 +374,7 @@ TEST(RowWriterV2, Timestamp) { ASSERT_EQ(WriteResult::SUCCEEDED, writer.finish()); std::string encoded1 = writer.moveEncodedStr(); - auto reader1 = RowReader::getRowReader(&schema, encoded1); + auto reader1 = RowReaderWrapper::getRowReader(&schema, encoded1); // 
Col01 Value v1 = reader1->getValueByName("Col01"); diff --git a/src/storage/exec/UpdateNode.h b/src/storage/exec/UpdateNode.h index 110385cb2..11cfe4833 100644 --- a/src/storage/exec/UpdateNode.h +++ b/src/storage/exec/UpdateNode.h @@ -455,8 +455,8 @@ class UpdateEdgeNode : public RelNode { } // build key, value is emtpy - auto version = - std::numeric_limits::max() - time::WallClock::fastNowInMicroSec(); + auto version = FLAGS_enable_multi_versions ? + std::numeric_limits::max() - time::WallClock::fastNowInMicroSec() : 0L; // Switch version to big-endian, make sure the key is in ordered. version = folly::Endian::big(version); key_ = NebulaKeyUtils::edgeKey(planContext_->vIdLen_, diff --git a/src/storage/test/AddEdgesTest.cpp b/src/storage/test/AddEdgesTest.cpp index 9ef30b4d5..8f256e284 100644 --- a/src/storage/test/AddEdgesTest.cpp +++ b/src/storage/test/AddEdgesTest.cpp @@ -92,6 +92,8 @@ TEST(AddEdgesTest, MultiVersionTest) { } LOG(INFO) << "Check data in kv store..."; + // The number of data in serve is 668 + checkAddEdgesData(req, env, 668, 2); FLAGS_enable_multi_versions = false; } diff --git a/src/storage/test/AddVerticesTest.cpp b/src/storage/test/AddVerticesTest.cpp index 7ee22547b..007513903 100644 --- a/src/storage/test/AddVerticesTest.cpp +++ b/src/storage/test/AddVerticesTest.cpp @@ -93,6 +93,8 @@ TEST(AddVerticesTest, MultiVersionTest) { } LOG(INFO) << "Check data in kv store..."; + // The number of vertices is 162 + checkAddVerticesData(req, env, 162, 2); FLAGS_enable_multi_versions = false; } diff --git a/src/storage/test/DeleteEdgesTest.cpp b/src/storage/test/DeleteEdgesTest.cpp index 98e6e7a2c..be0893e14 100644 --- a/src/storage/test/DeleteEdgesTest.cpp +++ b/src/storage/test/DeleteEdgesTest.cpp @@ -98,6 +98,8 @@ TEST(DeleteEdgesTest, MultiVersionTest) { } LOG(INFO) << "Check data in kv store..."; + // The number of data in serve is 668 + checkAddEdgesData(req, env, 668, 2); } // Delete edges diff --git a/src/storage/test/DeleteVerticesTest.cpp 
b/src/storage/test/DeleteVerticesTest.cpp index e106ec7de..503af15d0 100644 --- a/src/storage/test/DeleteVerticesTest.cpp +++ b/src/storage/test/DeleteVerticesTest.cpp @@ -99,6 +99,8 @@ TEST(DeleteVerticesTest, MultiVersionTest) { } LOG(INFO) << "Check data in kv store..."; + // The number of vertices is 162 + checkAddVerticesData(req, env, 162, 2); } // Delete vertices From 21cd2a08a435ace935edafa181cb916b558c9d8c Mon Sep 17 00:00:00 2001 From: Doodle <13706157+critical27@users.noreply.github.com> Date: Mon, 10 Aug 2020 12:24:38 +0800 Subject: [PATCH 8/8] add encode for benchmark resp, update benchmark --- src/storage/test/GetNeighborsBenchmark.cpp | 217 ++++++++++++++++----- 1 file changed, 163 insertions(+), 54 deletions(-) diff --git a/src/storage/test/GetNeighborsBenchmark.cpp b/src/storage/test/GetNeighborsBenchmark.cpp index 5630baa9e..ba866e272 100644 --- a/src/storage/test/GetNeighborsBenchmark.cpp +++ b/src/storage/test/GetNeighborsBenchmark.cpp @@ -12,7 +12,7 @@ #include "storage/exec/EdgeNode.h" DEFINE_uint64(max_rank, 1000, "max rank of each edge"); -DEFINE_double(filter_ratio, 0.5, "ratio of data would pass filter"); +DEFINE_double(filter_ratio, 0.1, "ratio of data would pass filter"); DEFINE_bool(go_record, false, ""); DEFINE_bool(kv_record, false, ""); @@ -49,6 +49,12 @@ void setUp(const char* path, EdgeRanking maxRank) { } // namespace storage } // namespace nebula +std::string encode(const nebula::storage::cpp2::GetNeighborsResponse &resp) { + std::string val; + apache::thrift::CompactSerializer::serialize(resp, &val); + return val; +} + void initContext(std::unique_ptr& planCtx, nebula::storage::EdgeContext& edgeContext, const std::vector& serveProps) { @@ -94,7 +100,8 @@ void go(int32_t iters, auto fut = processor->getFuture(); processor->process(req); auto resp = std::move(fut).get(); - folly::doNotOptimizeAway(resp); + auto encoded = encode(resp); + folly::doNotOptimizeAway(encoded); } } @@ -102,12 +109,13 @@ void goFilter(int32_t iters, const 
std::vector& vertex, const std::vector& playerProps, const std::vector& serveProps, - int64_t value = FLAGS_max_rank * FLAGS_filter_ratio) { + int64_t value = FLAGS_max_rank * FLAGS_filter_ratio, + bool oneFilter = true) { nebula::storage::cpp2::GetNeighborsRequest req; BENCHMARK_SUSPEND { nebula::EdgeType serve = 101; req = nebula::storage::buildRequest(vertex, playerProps, serveProps); - { + if (oneFilter) { // where serve.startYear < value nebula::RelationalExpression exp( nebula::Expression::Kind::kRelLT, @@ -116,6 +124,25 @@ void goFilter(int32_t iters, new std::string("startYear")), new nebula::ConstantExpression(nebula::Value(value))); req.traverse_spec.set_filter(nebula::Expression::encode(exp)); + } else { + // where serve.startYear < value && serve.endYear < value + // since startYear always equal to endYear, the data of which can pass filter is same, + // just to test perf of multiple filter + nebula::LogicalExpression exp( + nebula::Expression::Kind::kLogicalAnd, + new nebula::RelationalExpression( + nebula::Expression::Kind::kRelLT, + new nebula::EdgePropertyExpression( + new std::string(folly::to(serve)), + new std::string("startYear")), + new nebula::ConstantExpression(nebula::Value(value))), + new nebula::RelationalExpression( + nebula::Expression::Kind::kRelLT, + new nebula::EdgePropertyExpression( + new std::string(folly::to(serve)), + new std::string("endYear")), + new nebula::ConstantExpression(nebula::Value(value)))); + req.traverse_spec.set_filter(nebula::Expression::encode(exp)); } } auto* env = gCluster->storageEnv_.get(); @@ -124,7 +151,8 @@ void goFilter(int32_t iters, auto fut = processor->getFuture(); processor->process(req); auto resp = std::move(fut).get(); - folly::doNotOptimizeAway(resp); + auto encoded = encode(resp); + folly::doNotOptimizeAway(encoded); } } @@ -144,6 +172,7 @@ void goEdgeNode(int32_t iters, } auto totalParts = gCluster->getTotalParts(); for (decltype(iters) i = 0; i < iters; i++) { + 
nebula::storage::cpp2::GetNeighborsResponse resp; nebula::DataSet resultDataSet; std::hash hash; for (const auto& vId : vertex) { @@ -175,6 +204,11 @@ void goEdgeNode(int32_t iters, resultDataSet.rows.emplace_back(std::move(row)); } CHECK_EQ(vertex.size(), resultDataSet.rowSize()); + nebula::storage::cpp2::ResponseCommon result; + resp.set_result(std::move(result)); + resp.set_vertices(std::move(resultDataSet)); + auto encoded = encode(resp); + folly::doNotOptimizeAway(encoded); } } @@ -188,6 +222,9 @@ void prefix(int32_t iters, initContext(planCtx, edgeContext, serveProps); } for (decltype(iters) i = 0; i < iters; i++) { + nebula::storage::cpp2::GetNeighborsResponse resp; + nebula::DataSet resultDataSet; + nebula::GraphSpaceID spaceId = 1; nebula::TagID player = 1; nebula::EdgeType serve = 101; @@ -209,9 +246,7 @@ void prefix(int32_t iters, CHECK(!edgeSchemaIter->second.empty()); auto* edgeSchema = &(edgeSchemaIter->second); - nebula::DataSet resultDataSet; nebula::RowReaderWrapper reader; - for (const auto& vId : vertex) { nebula::PartitionID partId = (hash(vId) % totalParts) + 1; std::vector row; @@ -262,16 +297,60 @@ void prefix(int32_t iters, resultDataSet.rows.emplace_back(std::move(row)); } CHECK_EQ(vertex.size(), resultDataSet.rowSize()); + nebula::storage::cpp2::ResponseCommon result; + resp.set_result(std::move(result)); + resp.set_vertices(std::move(resultDataSet)); + auto encoded = encode(resp); + folly::doNotOptimizeAway(encoded); } } +void encodeBench(int32_t iters, + const std::vector& vertex, + const std::vector& playerProps, + const std::vector& serveProps) { + nebula::storage::cpp2::GetNeighborsResponse resp; + BENCHMARK_SUSPEND { + auto* env = gCluster->storageEnv_.get(); + auto req = nebula::storage::buildRequest(vertex, playerProps, serveProps); + auto* processor = nebula::storage::GetNeighborsProcessor::instance(env, nullptr, nullptr); + auto fut = processor->getFuture(); + processor->process(req); + resp = std::move(fut).get(); + } + for 
(decltype(iters) i = 0; i < iters; i++) { + auto encoded = encode(resp); + folly::doNotOptimizeAway(encoded); + } +} + +BENCHMARK(EncodeOneProperty, iters) { + encodeBench(iters, {"Tim Duncan"}, {"name"}, {"teamName"}); +} +BENCHMARK_RELATIVE(EncodeThreeProperty, iters) { + encodeBench(iters, {"Tim Duncan"}, {"name"}, {"teamName", "startYear", "endYear"}); +} +BENCHMARK_RELATIVE(EncodeFiveProperty, iters) { + encodeBench(iters, {"Tim Duncan"}, {"name"}, + {"teamName", "startYear", "endYear", "teamCareer", "teamGames"}); +} + +BENCHMARK_DRAW_LINE(); + // Players may serve more than one team, the total edges = teamCount * maxRank, which would effect // the final result, so select some player only serve one team BENCHMARK(OneVertexOneProperty, iters) { go(iters, {"Tim Duncan"}, {"name"}, {"teamName"}); } -BENCHMARK_RELATIVE(OneVertexOnePropertyWithFilter, iters) { - goFilter(iters, {"Tim Duncan"}, {"name"}, {"teamName"}); +BENCHMARK_RELATIVE(OneVertexOnlyId, iters) { + go(iters, {"Tim Duncan"}, {"name"}, {nebula::kDst}); +} +BENCHMARK_RELATIVE(OneVertexThreeProperty, iters) { + go(iters, {"Tim Duncan"}, {"name"}, {"teamName", "startYear", "endYear"}); +} +BENCHMARK_RELATIVE(OneVertexFiveProperty, iters) { + go(iters, {"Tim Duncan"}, {"name"}, + {"teamName", "startYear", "endYear", "teamCareer", "teamGames"}); } BENCHMARK_RELATIVE(OneVertexOnePropertyOnlyEdgeNode, iters) { goEdgeNode(iters, {"Tim Duncan"}, {"name"}, {"teamName"}); @@ -282,6 +361,35 @@ BENCHMARK_RELATIVE(OneVertexOneProperyOnlyKV, iters) { BENCHMARK_DRAW_LINE(); +BENCHMARK(NoFilter, iters) { + go(iters, {"Tim Duncan"}, {"name"}, {"teamName"}); +} +BENCHMARK_RELATIVE(OneFilterNonePass, iters) { + goFilter(iters, {"Tim Duncan"}, {"name"}, {"teamName"}, FLAGS_max_rank * 0); +} +BENCHMARK_RELATIVE(OneFilterFewPass, iters) { + goFilter(iters, {"Tim Duncan"}, {"name"}, {"teamName"}, FLAGS_max_rank * 0.1); +} +BENCHMARK_RELATIVE(OneFilterHalfPass, iters) { + goFilter(iters, {"Tim Duncan"}, {"name"}, 
{"teamName"}, FLAGS_max_rank * 0.5); +} +BENCHMARK_RELATIVE(OneFilterAllPass, iters) { + goFilter(iters, {"Tim Duncan"}, {"name"}, {"teamName"}, FLAGS_max_rank * 1); +} +BENCHMARK_RELATIVE(TwoFilterNonePass, iters) { + goFilter(iters, {"Tim Duncan"}, {"name"}, {"teamName"}, FLAGS_max_rank * 0, false); +} +BENCHMARK_RELATIVE(TwoFilterFewPass, iters) { + goFilter(iters, {"Tim Duncan"}, {"name"}, {"teamName"}, FLAGS_max_rank * 0.1, false); +} +BENCHMARK_RELATIVE(TwoFilterHalfPass, iters) { + goFilter(iters, {"Tim Duncan"}, {"name"}, {"teamName"}, FLAGS_max_rank * 0.5, false); +} +BENCHMARK_RELATIVE(TwoFilterAllPass, iters) { + goFilter(iters, {"Tim Duncan"}, {"name"}, {"teamName"}, FLAGS_max_rank * 1, false); +} +BENCHMARK_DRAW_LINE(); + BENCHMARK(TenVertexOneProperty, iters) { go(iters, {"Tim Duncan", "Kobe Bryant", "Stephen Curry", "Manu Ginobili", "Joel Embiid", @@ -289,13 +397,26 @@ BENCHMARK(TenVertexOneProperty, iters) { {"name"}, {"teamName"}); } -BENCHMARK_RELATIVE(TenVertexOnePropertyWithFilter, iters) { - goFilter( - iters, - {"Tim Duncan", "Kobe Bryant", "Stephen Curry", "Manu Ginobili", "Joel Embiid", +BENCHMARK_RELATIVE(TenVertexOnlyId, iters) { + go(iters, + {"Tim Duncan", "Kobe Bryant", "Stephen Curry", "Manu Ginobili", "Joel Embiid", "Giannis Antetokounmpo", "Yao Ming", "Damian Lillard", "Dirk Nowitzki", "Klay Thompson"}, - {"name"}, - {"teamName"}); + {"name"}, + {nebula::kDst}); +} +BENCHMARK_RELATIVE(TenVertexThreeProperty, iters) { + go(iters, + {"Tim Duncan", "Kobe Bryant", "Stephen Curry", "Manu Ginobili", "Joel Embiid", + "Giannis Antetokounmpo", "Yao Ming", "Damian Lillard", "Dirk Nowitzki", "Klay Thompson"}, + {"name"}, + {"teamName", "startYear", "endYear"}); +} +BENCHMARK_RELATIVE(TenVertexFiveProperty, iters) { + go(iters, + {"Tim Duncan", "Kobe Bryant", "Stephen Curry", "Manu Ginobili", "Joel Embiid", + "Giannis Antetokounmpo", "Yao Ming", "Damian Lillard", "Dirk Nowitzki", "Klay Thompson"}, + {"name"}, + {"teamName", "startYear", 
"endYear", "teamCareer", "teamGames"}); } BENCHMARK_RELATIVE(TenVertexOnePropertyOnlyEdgeNode, iters) { goEdgeNode( @@ -319,9 +440,9 @@ int main(int argc, char** argv) { nebula::fs::TempDir rootPath("/tmp/GetNeighborsBenchmark.XXXXXX"); nebula::storage::setUp(rootPath.path(), FLAGS_max_rank); if (FLAGS_go_record) { - go(1000000, {"Tim Duncan"}, {"name"}, {"teamName"}); + go(100000, {"Tim Duncan"}, {"name"}, {"teamName"}); } else if (FLAGS_kv_record) { - prefix(1000000, {"Tim Duncan"}, {"name"}, {"teamName"}); + prefix(100000, {"Tim Duncan"}, {"name"}, {"teamName"}); } else { folly::runBenchmarks(); } @@ -336,46 +457,34 @@ release --max_rank=1000 --filter_ratio=0.1 ============================================================================ -/home/doodle.wang/Git/nebula-storage/src/storage/test/GetNeighborsBenchmark.cpprelative time/iter iters/s +GetNeighborsBenchmark.cpprelative time/iter iters/s ============================================================================ -OneVertexOneProperty 533.81us 1.87K -OneVertexOnePropertyWithFilter 111.64% 478.15us 2.09K -OneVertexOnePropertyOnlyEdgeNode 108.02% 494.18us 2.02K -OneVertexOneProperyOnlyKV 109.63% 486.93us 2.05K +EncodeOneProperty 65.01us 15.38K +EncodeThreeProperty 49.36% 131.70us 7.59K +EncodeFiveProperty 40.26% 161.47us 6.19K ---------------------------------------------------------------------------- -TenVertexOneProperty 5.33ms 187.70 -TenVertexOnePropertyWithFilter 112.74% 4.73ms 211.62 -TenVertexOnePropertyOnlyEdgeNode 105.42% 5.05ms 197.88 -TenVertexOneProperyOnlyKV 107.75% 4.94ms 202.25 -============================================================================ - ---max_rank=1000 --filter_ratio=0.5 -============================================================================ -/home/doodle.wang/Git/nebula-storage/src/storage/test/GetNeighborsBenchmark.cpprelative time/iter iters/s -============================================================================ -OneVertexOneProperty 529.59us 1.89K 
-OneVertexOnePropertyWithFilter 81.76% 647.75us 1.54K -OneVertexOnePropertyOnlyEdgeNode 107.54% 492.47us 2.03K -OneVertexOneProperyOnlyKV 108.38% 488.65us 2.05K +OneVertexOneProperty 446.58us 2.24K +OneVertexOnlyId 119.10% 374.96us 2.67K +OneVertexThreeProperty 59.50% 750.53us 1.33K +OneVertexFiveProperty 46.25% 965.55us 1.04K +OneVertexOnePropertyOnlyEdgeNode 113.92% 392.01us 2.55K +OneVertexOneProperyOnlyKV 113.08% 394.93us 2.53K ---------------------------------------------------------------------------- -TenVertexOneProperty 5.30ms 188.54 -TenVertexOnePropertyWithFilter 81.95% 6.47ms 154.50 -TenVertexOnePropertyOnlyEdgeNode 106.09% 5.00ms 200.02 -TenVertexOneProperyOnlyKV 108.26% 4.90ms 204.12 -============================================================================ - ---max_rank=1000 --filter_ratio=1 -============================================================================ -/home/doodle.wang/Git/nebula-storage/src/storage/test/GetNeighborsBenchmark.cpprelative time/iter iters/s -============================================================================ -OneVertexOneProperty 522.41us 1.91K -OneVertexOnePropertyWithFilter 62.76% 832.45us 1.20K -OneVertexOnePropertyOnlyEdgeNode 108.25% 482.58us 2.07K -OneVertexOneProperyOnlyKV 107.87% 484.31us 2.06K +NoFilter 444.84us 2.25K +OneFilterNonePass 106.16% 419.01us 2.39K +OneFilterFewPass 98.26% 452.73us 2.21K +OneFilterHalfPass 73.95% 601.51us 1.66K +OneFilterAllPass 57.33% 775.97us 1.29K +TwoFilterNonePass 66.52% 668.76us 1.50K +TwoFilterFewPass 62.92% 706.97us 1.41K +TwoFilterHalfPass 51.78% 859.14us 1.16K +TwoFilterAllPass 42.60% 1.04ms 957.56 ---------------------------------------------------------------------------- -TenVertexOneProperty 5.30ms 188.83 -TenVertexOnePropertyWithFilter 69.70% 7.60ms 131.62 -TenVertexOnePropertyOnlyEdgeNode 119.34% 4.44ms 225.34 -TenVertexOneProperyOnlyKV 120.89% 4.38ms 228.27 +TenVertexOneProperty 4.40ms 227.06 +TenVertexOnlyId 119.68% 3.68ms 271.76 
+TenVertexThreeProperty 59.25% 7.43ms 134.53 +TenVertexFiveProperty 45.67% 9.64ms 103.69 +TenVertexOnePropertyOnlyEdgeNode 109.45% 4.02ms 248.53 +TenVertexOneProperyOnlyKV 109.23% 4.03ms 248.03 ============================================================================ */