Skip to content

Commit

Permalink
Sampling apply to all edges starting from a vertex. (#2013)
Browse files Browse the repository at this point in the history
* Sampling from all edges.

* Update test for sampling multi edges.

* Testing both inbound and outbound.

* Fix clang compiling problem.

* Update the sampling.

Co-authored-by: dutor <440396+dutor@users.noreply.github.com>
  • Loading branch information
CPWstatic and dutor authored Apr 2, 2020
1 parent 9fc4e7f commit 8810ecc
Show file tree
Hide file tree
Showing 6 changed files with 170 additions and 54 deletions.
6 changes: 2 additions & 4 deletions src/storage/query/QueryBaseProcessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,8 @@ const std::unordered_map<std::string, PropContext::PropInKeyType> kPropsInKey_ =
};

using EdgeProcessor
= std::function<void(RowReader* reader,
folly::StringPiece key,
const std::vector<PropContext>& props)>;
= std::function<void(std::unique_ptr<RowReader> reader,
folly::StringPiece key)>;
struct Bucket {
std::vector<std::pair<PartitionID, VertexID>> vertices_;
};
Expand Down Expand Up @@ -106,7 +105,6 @@ class QueryBaseProcessor : public BaseProcessor<RESP> {
PartitionID partId,
VertexID vId,
EdgeType edgeType,
const std::vector<PropContext>& props,
FilterContext* fcontext,
EdgeProcessor proc);

Expand Down
23 changes: 1 addition & 22 deletions src/storage/query/QueryBaseProcessor.inl
Original file line number Diff line number Diff line change
Expand Up @@ -480,7 +480,6 @@ kvstore::ResultCode QueryBaseProcessor<REQ, RESP>::collectEdgeProps(
PartitionID partId,
VertexID vId,
EdgeType edgeType,
const std::vector<PropContext>& props,
FilterContext* fcontext,
EdgeProcessor proc) {
auto prefix = NebulaKeyUtils::edgePrefix(partId, vId, edgeType);
Expand All @@ -496,15 +495,6 @@ kvstore::ResultCode QueryBaseProcessor<REQ, RESP>::collectEdgeProps(
int cnt = 0;
bool onlyStructure = onlyStructures_[edgeType];
Getters getters;
std::unique_ptr<nebula::algorithm::ReservoirSampling<
std::pair<std::unique_ptr<RowReader>, std::string>>> sampler;
if (FLAGS_enable_reservoir_sampling) {
sampler = std::make_unique<
nebula::algorithm::ReservoirSampling<
std::pair<std::unique_ptr<RowReader>, std::string>
>
>(FLAGS_max_edge_returned_per_vertex);
}

auto schema = this->schemaMan_->getEdgeSchema(spaceId_, std::abs(edgeType));
auto retTTL = getEdgeTTLInfo(edgeType);
Expand Down Expand Up @@ -605,24 +595,13 @@ kvstore::ResultCode QueryBaseProcessor<REQ, RESP>::collectEdgeProps(
}
}

if (FLAGS_enable_reservoir_sampling) {
sampler->sampling(std::make_pair(std::move(reader), key.str()));
} else {
proc(reader.get(), key, props);
}
proc(std::move(reader), key);
++cnt;
if (firstLoop) {
firstLoop = false;
}
}

if (FLAGS_enable_reservoir_sampling) {
auto samples = std::move(*sampler).samples();
for (auto& sample : samples) {
proc(sample.first.get(), sample.second, props);
}
}

return ret;
}

Expand Down
98 changes: 93 additions & 5 deletions src/storage/query/QueryBoundProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,18 +34,19 @@ kvstore::ResultCode QueryBoundProcessor::processEdgeImpl(const PartitionID partI
std::vector<cpp2::IdAndProp> edges;
edges.reserve(FLAGS_reserved_edges_one_vertex);
auto ret = collectEdgeProps(
partId, vId, edgeType, props, &fcontext,
[&, this](RowReader* reader, folly::StringPiece k, const std::vector<PropContext>& p) {
partId, vId, edgeType, &fcontext,
[&, this](std::unique_ptr<RowReader> reader,
folly::StringPiece k) {
cpp2::IdAndProp edge;
if (!onlyStructure) {
RowWriter writer(currEdgeSchema);
PropsCollector collector(&writer);
this->collectProps(reader, k, p, &fcontext, &collector);
this->collectProps(reader.get(), k, props, &fcontext, &collector);
edge.set_dst(collector.getDstId());
edge.set_props(writer.encode());
} else {
PropsCollector collector(nullptr);
this->collectProps(reader, k, p, &fcontext, &collector);
this->collectProps(reader.get(), k, props, &fcontext, &collector);
edge.set_dst(collector.getDstId());
}
edges.emplace_back(std::move(edge));
Expand Down Expand Up @@ -80,6 +81,88 @@ kvstore::ResultCode QueryBoundProcessor::processEdge(PartitionID partId, VertexI
return kvstore::ResultCode::SUCCEEDED;
}

kvstore::ResultCode QueryBoundProcessor::processEdgeSampling(const PartitionID partId,
const VertexID vId,
FilterContext& fcontext,
cpp2::VertexData& vdata) {
using Sample = std::tuple<
EdgeType, /* type */
std::string, /* key */
std::unique_ptr<RowReader>, /* val */
std::shared_ptr<meta::SchemaProviderIf>, /* schema of this value*/
const std::vector<PropContext>* /* props needed */>;
auto sampler = std::make_unique<
nebula::algorithm::ReservoirSampling<Sample>
>(FLAGS_max_edge_returned_per_vertex);

for (const auto& ec : edgeContexts_) {
auto edgeType = ec.first;
auto* props = &ec.second;
bool onlyStructure = onlyStructures_[edgeType];
std::shared_ptr<meta::SchemaProviderIf> currEdgeSchema;
if (!onlyStructure) {
auto schema = edgeSchema_.find(edgeType);
if (schema == edgeSchema_.end()) {
LOG(ERROR) << "Not found the edge type: " << edgeType;
return kvstore::ResultCode::ERR_EDGE_NOT_FOUND;
}
currEdgeSchema = schema->second;
}
if (!props->empty()) {
CHECK(!onlyVertexProps_);
auto ret = collectEdgeProps(
partId, vId, edgeType, &fcontext,
[&](std::unique_ptr<RowReader> reader,
folly::StringPiece k) {
sampler->sampling(
std::make_tuple(edgeType, k.str(), std::move(reader),
currEdgeSchema, props));
});
if (ret != kvstore::ResultCode::SUCCEEDED) {
return ret;
}
}
}

std::unordered_map<EdgeType, cpp2::EdgeData> edgeDataMap;
auto samples = std::move(*sampler).samples();
for (auto& sample : samples) {
auto edgeType = std::get<0>(sample);
auto currEdgeSchema = std::get<3>(sample);
auto& props = *std::get<4>(sample);
bool onlyStructure = onlyStructures_[edgeType];
cpp2::IdAndProp edge;
if (!onlyStructure) {
RowWriter writer(currEdgeSchema);
PropsCollector collector(&writer);
this->collectProps(
std::get<2>(sample).get(), std::get<1>(sample), props, &fcontext, &collector);
edge.set_dst(collector.getDstId());
edge.set_props(writer.encode());
} else {
PropsCollector collector(nullptr);
this->collectProps(
std::get<2>(sample).get(), std::get<1>(sample), props, &fcontext, &collector);
edge.set_dst(collector.getDstId());
}
auto edges = edgeDataMap.find(edgeType);
if (edges == edgeDataMap.end()) {
cpp2::EdgeData edgeData;
edgeData.set_type(edgeType);
edgeData.edges.emplace_back(std::move(edge));
edgeDataMap.emplace(edgeType, std::move(edgeData));
} else {
edges->second.edges.emplace_back(std::move(edge));
}
}

std::transform(edgeDataMap.begin(), edgeDataMap.end(), std::back_inserter(vdata.edge_data),
[] (auto& data) {
return std::move(data).second;
});
return kvstore::ResultCode::SUCCEEDED;
}

kvstore::ResultCode QueryBoundProcessor::processVertex(PartitionID partId, VertexID vId) {
cpp2::VertexData vResp;
vResp.set_vertex_id(vId);
Expand Down Expand Up @@ -119,7 +202,12 @@ kvstore::ResultCode QueryBoundProcessor::processVertex(PartitionID partId, Verte
return kvstore::ResultCode::SUCCEEDED;
}

auto ret = processEdge(partId, vId, fcontext, vResp);
kvstore::ResultCode ret;
if (FLAGS_enable_reservoir_sampling) {
ret = processEdgeSampling(partId, vId, fcontext, vResp);
} else {
ret = processEdge(partId, vId, fcontext, vResp);
}

if (ret != kvstore::ResultCode::SUCCEEDED) {
return ret;
Expand Down
5 changes: 5 additions & 0 deletions src/storage/query/QueryBoundProcessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,11 @@ class QueryBoundProcessor
kvstore::ResultCode processEdge(PartitionID partId, VertexID vId, FilterContext &fcontext,
cpp2::VertexData& vdata);

kvstore::ResultCode processEdgeSampling(const PartitionID partId,
const VertexID vId,
FilterContext& fcontext,
cpp2::VertexData& vdata);

kvstore::ResultCode processEdgeImpl(const PartitionID partId, const VertexID vId,
const EdgeType edgeType,
const std::vector<PropContext>& props,
Expand Down
11 changes: 6 additions & 5 deletions src/storage/query/QueryStatsProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -85,11 +85,12 @@ kvstore::ResultCode QueryStatsProcessor::processVertex(PartitionID partId,
auto edgeType = ec.first;
auto& props = ec.second;
if (!props.empty()) {
auto r = this->collectEdgeProps(partId, vId, edgeType, props, &fcontext,
[&, this](RowReader* reader, folly::StringPiece key,
const std::vector<PropContext>& p) {
this->collectProps(reader, key, p, &fcontext,
&collector_);
auto r = this->collectEdgeProps(partId, vId, edgeType, &fcontext,
[&, this](std::unique_ptr<RowReader> reader,
folly::StringPiece key) {
this->collectProps(
reader.get(), key, props, &fcontext,
&collector_);
});
if (r != kvstore::ResultCode::SUCCEEDED) {
return r;
Expand Down
81 changes: 63 additions & 18 deletions src/storage/test/QueryBoundTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,8 @@ void checkSamplingResponse(cpp2::QueryResponse& resp,
int32_t edgeFields,
int32_t dstIdStart,
int32_t dstIdEnd,
int32_t dstIdStartReverse,
int32_t dstIdEndReverse,
int32_t edgeNum) {
EXPECT_EQ(0, resp.result.failed_codes.size());

Expand Down Expand Up @@ -242,16 +244,21 @@ void checkSamplingResponse(cpp2::QueryResponse& resp,
checkTagData<std::string>(vp.tag_data, 3005, "tag_3005_col_4", vschema,
folly::stringPrintf("tag_string_col_4"));

int32_t rowNum = 0;
for (auto& ep : vp.edge_data) {
auto it2 = schema.find(ep.type);
DCHECK(it2 != schema.end());
auto provider = it2->second;
int32_t rowNum = 0;
for (auto& edge : ep.get_edges()) {
auto dst = edge.get_dst();
VLOG(1) << "Check edge " << vp.vertex_id << " -> " << dst << " props...";
CHECK_LE(dstIdStart, dst);
CHECK_GE(dstIdEnd, dst);
if (ep.type < 0) {
CHECK_LE(dstIdStartReverse, dst);
CHECK_GE(dstIdEndReverse, dst);
} else {
CHECK_LE(dstIdStart, dst);
CHECK_GE(dstIdEnd, dst);
}
auto reader = RowReader::getRowReader(edge.props, provider);
DCHECK(reader != nullptr);
EXPECT_EQ(edgeFields, reader->numFields() + 1);
Expand All @@ -275,9 +282,9 @@ void checkSamplingResponse(cpp2::QueryResponse& resp,
}
rowNum++;
}
EXPECT_EQ(edgeNum, rowNum);
totalEdges += rowNum;
}
totalEdges += rowNum;
EXPECT_EQ(edgeNum, rowNum);
}
EXPECT_EQ(totalEdges, *resp.get_total_edges());
}
Expand Down Expand Up @@ -306,7 +313,7 @@ TEST(QueryBoundTest, OutBoundSimpleTest) {
checkResponse(resp, 30, 12, 10001, 7);
}

TEST(QueryBoundTest, inBoundSimpleTest) {
TEST(QueryBoundTest, InBoundSimpleTest) {
fs::TempDir rootPath("/tmp/QueryBoundTest.XXXXXX");
LOG(INFO) << "Prepare meta...";
std::unique_ptr<kvstore::KVStore> kv = TestUtils::initKV(rootPath.path());
Expand Down Expand Up @@ -593,21 +600,59 @@ TEST(QueryBoundTest, SamplingTest) {
auto schemaMan = TestUtils::mockSchemaMan();
mockData(kv.get());

cpp2::GetNeighborsRequest req;
std::vector<EdgeType> et = {101};
buildRequest(req, et);
{
cpp2::GetNeighborsRequest req;
std::vector<EdgeType> et = {101};
buildRequest(req, et);

LOG(INFO) << "Test QueryOutBoundRequest...";
auto executor = std::make_unique<folly::CPUThreadPoolExecutor>(3);
auto* processor = QueryBoundProcessor::instance(kv.get(), schemaMan.get(),
nullptr, executor.get());
auto f = processor->getFuture();
processor->process(req);
auto resp = std::move(f).get();
LOG(INFO) << "Test QueryOutBoundRequest...";
auto executor = std::make_unique<folly::CPUThreadPoolExecutor>(3);
auto* processor = QueryBoundProcessor::instance(kv.get(), schemaMan.get(),
nullptr, executor.get());
auto f = processor->getFuture();
processor->process(req);
auto resp = std::move(f).get();

LOG(INFO) << "Check the results...";
checkSamplingResponse(resp, 30, 12, 10001, 10007, 20001, 20005,
FLAGS_max_edge_returned_per_vertex);
}
{
cpp2::GetNeighborsRequest req;
std::vector<EdgeType> et = {101, 102, 103};
buildRequest(req, et);

LOG(INFO) << "Check the results...";
checkSamplingResponse(resp, 30, 12, 10001, 10007, FLAGS_max_edge_returned_per_vertex);
LOG(INFO) << "Test QueryOutBoundRequest...";
auto executor = std::make_unique<folly::CPUThreadPoolExecutor>(3);
auto* processor = QueryBoundProcessor::instance(kv.get(), schemaMan.get(),
nullptr, executor.get());
auto f = processor->getFuture();
processor->process(req);
auto resp = std::move(f).get();

LOG(INFO) << "Check the results...";
checkSamplingResponse(resp, 30, 12, 10001, 10007, 20001, 20005,
FLAGS_max_edge_returned_per_vertex);
}
{
cpp2::GetNeighborsRequest req;
std::vector<EdgeType> et = {101, -101};
buildRequest(req, et);

LOG(INFO) << "Test QueryOutBoundRequest...";
auto executor = std::make_unique<folly::CPUThreadPoolExecutor>(3);
auto* processor = QueryBoundProcessor::instance(kv.get(), schemaMan.get(),
nullptr, executor.get());
auto f = processor->getFuture();
processor->process(req);
auto resp = std::move(f).get();

LOG(INFO) << "Check the results...";
checkSamplingResponse(resp, 30, 12, 10001, 10007, 20001, 20005,
FLAGS_max_edge_returned_per_vertex);
}
FLAGS_max_edge_returned_per_vertex = old_max_edge_returned;
FLAGS_enable_reservoir_sampling = false;
}
} // namespace storage
} // namespace nebula
Expand Down

0 comments on commit 8810ecc

Please sign in to comment.