From 8810eccb5215153ba1887e59335059527db80c4e Mon Sep 17 00:00:00 2001 From: CPWstatic <13495049+CPWstatic@users.noreply.github.com> Date: Thu, 2 Apr 2020 13:59:57 +0800 Subject: [PATCH] Sampling apply to all edges starting from a vertex. (#2013) * Sampling from all edges. * Update test for sampling multi edges. * Testing both inbound and outbound. * Fix clang compiling problem. * Update the sampling. Co-authored-by: dutor <440396+dutor@users.noreply.github.com> --- src/storage/query/QueryBaseProcessor.h | 6 +- src/storage/query/QueryBaseProcessor.inl | 23 +----- src/storage/query/QueryBoundProcessor.cpp | 98 +++++++++++++++++++++-- src/storage/query/QueryBoundProcessor.h | 5 ++ src/storage/query/QueryStatsProcessor.cpp | 11 +-- src/storage/test/QueryBoundTest.cpp | 81 ++++++++++++++----- 6 files changed, 170 insertions(+), 54 deletions(-) diff --git a/src/storage/query/QueryBaseProcessor.h b/src/storage/query/QueryBaseProcessor.h index eb028dc7a71..4409a04d80e 100644 --- a/src/storage/query/QueryBaseProcessor.h +++ b/src/storage/query/QueryBaseProcessor.h @@ -27,9 +27,8 @@ const std::unordered_map kPropsInKey_ = }; using EdgeProcessor - = std::function& props)>; + = std::function reader, + folly::StringPiece key)>; struct Bucket { std::vector> vertices_; }; @@ -106,7 +105,6 @@ class QueryBaseProcessor : public BaseProcessor { PartitionID partId, VertexID vId, EdgeType edgeType, - const std::vector& props, FilterContext* fcontext, EdgeProcessor proc); diff --git a/src/storage/query/QueryBaseProcessor.inl b/src/storage/query/QueryBaseProcessor.inl index 56ec947cb8b..10f4553e30c 100644 --- a/src/storage/query/QueryBaseProcessor.inl +++ b/src/storage/query/QueryBaseProcessor.inl @@ -480,7 +480,6 @@ kvstore::ResultCode QueryBaseProcessor::collectEdgeProps( PartitionID partId, VertexID vId, EdgeType edgeType, - const std::vector& props, FilterContext* fcontext, EdgeProcessor proc) { auto prefix = NebulaKeyUtils::edgePrefix(partId, vId, edgeType); @@ -496,15 +495,6 @@ kvstore::ResultCode QueryBaseProcessor::collectEdgeProps( int cnt = 0; bool onlyStructure = onlyStructures_[edgeType]; Getters getters; - std::unique_ptr, std::string>>> sampler; - if (FLAGS_enable_reservoir_sampling) { - sampler = std::make_unique< - nebula::algorithm::ReservoirSampling< - std::pair, std::string> - > - >(FLAGS_max_edge_returned_per_vertex); - } auto schema = this->schemaMan_->getEdgeSchema(spaceId_, std::abs(edgeType)); auto retTTL = getEdgeTTLInfo(edgeType); @@ -605,24 +595,13 @@ kvstore::ResultCode QueryBaseProcessor::collectEdgeProps( } } - if (FLAGS_enable_reservoir_sampling) { - sampler->sampling(std::make_pair(std::move(reader), key.str())); - } else { - proc(reader.get(), key, props); - } + proc(std::move(reader), key); ++cnt; if (firstLoop) { firstLoop = false; } } - if (FLAGS_enable_reservoir_sampling) { - auto samples = std::move(*sampler).samples(); - for (auto& sample : samples) { - proc(sample.first.get(), sample.second, props); - } - } - return ret; } diff --git a/src/storage/query/QueryBoundProcessor.cpp b/src/storage/query/QueryBoundProcessor.cpp index 8ee5fae110f..cabc6219da6 100644 --- a/src/storage/query/QueryBoundProcessor.cpp +++ b/src/storage/query/QueryBoundProcessor.cpp @@ -34,18 +34,19 @@ kvstore::ResultCode QueryBoundProcessor::processEdgeImpl(const PartitionID partI std::vector edges; edges.reserve(FLAGS_reserved_edges_one_vertex); auto ret = collectEdgeProps( - partId, vId, edgeType, props, &fcontext, - [&, this](RowReader* reader, folly::StringPiece k, const std::vector& p) { + partId, vId, edgeType, &fcontext, + [&, this](std::unique_ptr reader, + folly::StringPiece k) { cpp2::IdAndProp edge; if (!onlyStructure) { RowWriter writer(currEdgeSchema); PropsCollector collector(&writer); - this->collectProps(reader, k, p, &fcontext, &collector); + this->collectProps(reader.get(), k, props, &fcontext, &collector); edge.set_dst(collector.getDstId()); edge.set_props(writer.encode()); } else { PropsCollector collector(nullptr); - this->collectProps(reader, k, p, &fcontext, &collector); + this->collectProps(reader.get(), k, props, &fcontext, &collector); edge.set_dst(collector.getDstId()); } edges.emplace_back(std::move(edge)); @@ -80,6 +81,88 @@ kvstore::ResultCode QueryBoundProcessor::processEdge(PartitionID partId, VertexI return kvstore::ResultCode::SUCCEEDED; } +kvstore::ResultCode QueryBoundProcessor::processEdgeSampling(const PartitionID partId, + const VertexID vId, + FilterContext& fcontext, + cpp2::VertexData& vdata) { + using Sample = std::tuple< + EdgeType, /* type */ + std::string, /* key */ + std::unique_ptr, /* val */ + std::shared_ptr, /* schema of this value*/ + const std::vector* /* props needed */>; + auto sampler = std::make_unique< + nebula::algorithm::ReservoirSampling + >(FLAGS_max_edge_returned_per_vertex); + + for (const auto& ec : edgeContexts_) { + auto edgeType = ec.first; + auto* props = &ec.second; + bool onlyStructure = onlyStructures_[edgeType]; + std::shared_ptr currEdgeSchema; + if (!onlyStructure) { + auto schema = edgeSchema_.find(edgeType); + if (schema == edgeSchema_.end()) { + LOG(ERROR) << "Not found the edge type: " << edgeType; + return kvstore::ResultCode::ERR_EDGE_NOT_FOUND; + } + currEdgeSchema = schema->second; + } + if (!props->empty()) { + CHECK(!onlyVertexProps_); + auto ret = collectEdgeProps( + partId, vId, edgeType, &fcontext, + [&](std::unique_ptr reader, + folly::StringPiece k) { + sampler->sampling( + std::make_tuple(edgeType, k.str(), std::move(reader), + currEdgeSchema, props)); + }); + if (ret != kvstore::ResultCode::SUCCEEDED) { + return ret; + } + } + } + + std::unordered_map edgeDataMap; + auto samples = std::move(*sampler).samples(); + for (auto& sample : samples) { + auto edgeType = std::get<0>(sample); + auto currEdgeSchema = std::get<3>(sample); + auto& props = *std::get<4>(sample); + bool onlyStructure = onlyStructures_[edgeType]; + cpp2::IdAndProp edge; + if (!onlyStructure) { + RowWriter writer(currEdgeSchema); + PropsCollector collector(&writer); + this->collectProps( + std::get<2>(sample).get(), std::get<1>(sample), props, &fcontext, &collector); + edge.set_dst(collector.getDstId()); + edge.set_props(writer.encode()); + } else { + PropsCollector collector(nullptr); + this->collectProps( + std::get<2>(sample).get(), std::get<1>(sample), props, &fcontext, &collector); + edge.set_dst(collector.getDstId()); + } + auto edges = edgeDataMap.find(edgeType); + if (edges == edgeDataMap.end()) { + cpp2::EdgeData edgeData; + edgeData.set_type(edgeType); + edgeData.edges.emplace_back(std::move(edge)); + edgeDataMap.emplace(edgeType, std::move(edgeData)); + } else { + edges->second.edges.emplace_back(std::move(edge)); + } + } + + std::transform(edgeDataMap.begin(), edgeDataMap.end(), std::back_inserter(vdata.edge_data), + [] (auto& data) { + return std::move(data).second; + }); + return kvstore::ResultCode::SUCCEEDED; +} + kvstore::ResultCode QueryBoundProcessor::processVertex(PartitionID partId, VertexID vId) { cpp2::VertexData vResp; vResp.set_vertex_id(vId); @@ -119,7 +202,12 @@ kvstore::ResultCode QueryBoundProcessor::processVertex(PartitionID partId, Verte return kvstore::ResultCode::SUCCEEDED; } - auto ret = processEdge(partId, vId, fcontext, vResp); + kvstore::ResultCode ret; + if (FLAGS_enable_reservoir_sampling) { + ret = processEdgeSampling(partId, vId, fcontext, vResp); + } else { + ret = processEdge(partId, vId, fcontext, vResp); + } if (ret != kvstore::ResultCode::SUCCEEDED) { return ret; diff --git a/src/storage/query/QueryBoundProcessor.h b/src/storage/query/QueryBoundProcessor.h index 333dd684b5f..62671b3d9c5 100644 --- a/src/storage/query/QueryBoundProcessor.h +++ b/src/storage/query/QueryBoundProcessor.h @@ -47,6 +47,11 @@ class QueryBoundProcessor kvstore::ResultCode processEdge(PartitionID partId, VertexID vId, FilterContext &fcontext, cpp2::VertexData& vdata); + kvstore::ResultCode processEdgeSampling(const PartitionID partId, + const VertexID vId, + FilterContext& fcontext, + cpp2::VertexData& vdata); + kvstore::ResultCode processEdgeImpl(const PartitionID partId, const VertexID vId, const EdgeType edgeType, const std::vector& props, diff --git a/src/storage/query/QueryStatsProcessor.cpp b/src/storage/query/QueryStatsProcessor.cpp index f06b93c5263..2a7c4fc9c85 100644 --- a/src/storage/query/QueryStatsProcessor.cpp +++ b/src/storage/query/QueryStatsProcessor.cpp @@ -85,11 +85,12 @@ kvstore::ResultCode QueryStatsProcessor::processVertex(PartitionID partId, auto edgeType = ec.first; auto& props = ec.second; if (!props.empty()) { - auto r = this->collectEdgeProps(partId, vId, edgeType, props, &fcontext, - [&, this](RowReader* reader, folly::StringPiece key, - const std::vector& p) { - this->collectProps(reader, key, p, &fcontext, - &collector_); + auto r = this->collectEdgeProps(partId, vId, edgeType, &fcontext, + [&, this](std::unique_ptr reader, + folly::StringPiece key) { + this->collectProps( + reader.get(), key, props, &fcontext, + &collector_); }); if (r != kvstore::ResultCode::SUCCEEDED) { return r; diff --git a/src/storage/test/QueryBoundTest.cpp b/src/storage/test/QueryBoundTest.cpp index f9e1c1fe100..f81696ca4b7 100644 --- a/src/storage/test/QueryBoundTest.cpp +++ b/src/storage/test/QueryBoundTest.cpp @@ -206,6 +206,8 @@ void checkSamplingResponse(cpp2::QueryResponse& resp, int32_t edgeFields, int32_t dstIdStart, int32_t dstIdEnd, + int32_t dstIdStartReverse, + int32_t dstIdEndReverse, int32_t edgeNum) { EXPECT_EQ(0, resp.result.failed_codes.size()); @@ -242,16 +244,21 @@ void checkSamplingResponse(cpp2::QueryResponse& resp, checkTagData(vp.tag_data, 3005, "tag_3005_col_4", vschema, folly::stringPrintf("tag_string_col_4")); + int32_t rowNum = 0; for (auto& ep : vp.edge_data) { auto it2 = schema.find(ep.type); DCHECK(it2 != schema.end()); auto provider = it2->second; - int32_t rowNum = 0; for (auto& edge : ep.get_edges()) { auto dst = edge.get_dst(); VLOG(1) << "Check edge " << vp.vertex_id << " -> " << dst << " props..."; - CHECK_LE(dstIdStart, dst); - CHECK_GE(dstIdEnd, dst); + if (ep.type < 0) { + CHECK_LE(dstIdStartReverse, dst); + CHECK_GE(dstIdEndReverse, dst); + } else { + CHECK_LE(dstIdStart, dst); + CHECK_GE(dstIdEnd, dst); + } auto reader = RowReader::getRowReader(edge.props, provider); DCHECK(reader != nullptr); EXPECT_EQ(edgeFields, reader->numFields() + 1); @@ -275,9 +282,9 @@ void checkSamplingResponse(cpp2::QueryResponse& resp, } rowNum++; } - EXPECT_EQ(edgeNum, rowNum); - totalEdges += rowNum; } + totalEdges += rowNum; + EXPECT_EQ(edgeNum, rowNum); } EXPECT_EQ(totalEdges, *resp.get_total_edges()); } @@ -306,7 +313,7 @@ TEST(QueryBoundTest, OutBoundSimpleTest) { checkResponse(resp, 30, 12, 10001, 7); } -TEST(QueryBoundTest, inBoundSimpleTest) { +TEST(QueryBoundTest, InBoundSimpleTest) { fs::TempDir rootPath("/tmp/QueryBoundTest.XXXXXX"); LOG(INFO) << "Prepare meta..."; std::unique_ptr kv = TestUtils::initKV(rootPath.path()); @@ -593,21 +600,59 @@ TEST(QueryBoundTest, SamplingTest) { auto schemaMan = TestUtils::mockSchemaMan(); mockData(kv.get()); - cpp2::GetNeighborsRequest req; - std::vector et = {101}; - buildRequest(req, et); + { + cpp2::GetNeighborsRequest req; + std::vector et = {101}; + buildRequest(req, et); - LOG(INFO) << "Test QueryOutBoundRequest..."; - auto executor = std::make_unique(3); - auto* processor = QueryBoundProcessor::instance(kv.get(), schemaMan.get(), - nullptr, executor.get()); - auto f = processor->getFuture(); - processor->process(req); - auto resp = std::move(f).get(); + LOG(INFO) << "Test QueryOutBoundRequest..."; + auto executor = std::make_unique(3); + auto* processor = QueryBoundProcessor::instance(kv.get(), schemaMan.get(), + nullptr, executor.get()); + auto f = processor->getFuture(); + processor->process(req); + auto resp = std::move(f).get(); + + LOG(INFO) << "Check the results..."; + checkSamplingResponse(resp, 30, 12, 10001, 10007, 20001, 20005, + FLAGS_max_edge_returned_per_vertex); + } + { + cpp2::GetNeighborsRequest req; + std::vector et = {101, 102, 103}; + buildRequest(req, et); - LOG(INFO) << "Check the results..."; - checkSamplingResponse(resp, 30, 12, 10001, 10007, FLAGS_max_edge_returned_per_vertex); + LOG(INFO) << "Test QueryOutBoundRequest..."; + auto executor = std::make_unique(3); + auto* processor = QueryBoundProcessor::instance(kv.get(), schemaMan.get(), + nullptr, executor.get()); + auto f = processor->getFuture(); + processor->process(req); + auto resp = std::move(f).get(); + + LOG(INFO) << "Check the results..."; + checkSamplingResponse(resp, 30, 12, 10001, 10007, 20001, 20005, + FLAGS_max_edge_returned_per_vertex); + } + { + cpp2::GetNeighborsRequest req; + std::vector et = {101, -101}; + buildRequest(req, et); + + LOG(INFO) << "Test QueryOutBoundRequest..."; + auto executor = std::make_unique(3); + auto* processor = QueryBoundProcessor::instance(kv.get(), schemaMan.get(), + nullptr, executor.get()); + auto f = processor->getFuture(); + processor->process(req); + auto resp = std::move(f).get(); + + LOG(INFO) << "Check the results..."; + checkSamplingResponse(resp, 30, 12, 10001, 10007, 20001, 20005, + FLAGS_max_edge_returned_per_vertex); + } FLAGS_max_edge_returned_per_vertex = old_max_edge_returned; + FLAGS_enable_reservoir_sampling = false; } } // namespace storage } // namespace nebula