Skip to content

Commit

Permalink
Optimize subgraph (#3871)
Browse files Browse the repository at this point in the history
* Fix iterator with snapshot(parameter position) (#3785)

Co-authored-by: Harris.Chu <1726587+HarrisChu@users.noreply.github.com>
Co-authored-by: Sophie <84560950+Sophie-Xie@users.noreply.github.com>

* add getNeighborIter size

* refactor subgraph

* fix error

* address comment

* remove project from subgraph plan

* add comment

* remove project

* fix error

Co-authored-by: Alex Xing <90179377+SuperYoko@users.noreply.github.com>
Co-authored-by: Harris.Chu <1726587+HarrisChu@users.noreply.github.com>
Co-authored-by: Sophie <84560950+Sophie-Xie@users.noreply.github.com>
Co-authored-by: Yee <2520865+yixinglu@users.noreply.github.com>
  • Loading branch information
5 people authored Mar 2, 2022
1 parent 3822d76 commit f975c8d
Show file tree
Hide file tree
Showing 14 changed files with 191 additions and 143 deletions.
25 changes: 25 additions & 0 deletions src/graph/context/Iterator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,29 @@ void GetNeighborsIter::next() {
}
}

size_t GetNeighborsIter::size() const {
size_t count = 0;
for (const auto& dsIdx : dsIndices_) {
for (const auto& row : dsIdx.ds->rows) {
for (const auto& edgeIdx : dsIdx.edgePropsMap) {
const auto& cell = row[edgeIdx.second.colIdx];
if (LIKELY(cell.isList())) {
count += cell.getList().size();
}
}
}
}
return count;
}

size_t GetNeighborsIter::numRows() const {
size_t count = 0;
for (const auto& dsIdx : dsIndices_) {
count += dsIdx.ds->size();
}
return count;
}

void GetNeighborsIter::erase() {
DCHECK_GE(bitIdx_, 0);
DCHECK_LT(bitIdx_, bitset_.size());
Expand Down Expand Up @@ -441,6 +464,7 @@ Value GetNeighborsIter::getVertex(const std::string& name) const {

List GetNeighborsIter::getVertices() {
List vertices;
vertices.reserve(numRows());
valid_ = true;
colIdx_ = -2;
for (currentDs_ = dsIndices_.begin(); currentDs_ < dsIndices_.end(); ++currentDs_) {
Expand Down Expand Up @@ -512,6 +536,7 @@ Value GetNeighborsIter::getEdge() const {

List GetNeighborsIter::getEdges() {
List edges;
edges.reserve(size());
for (; valid(); next()) {
auto edge = getEdge();
if (edge.isEdge()) {
Expand Down
8 changes: 5 additions & 3 deletions src/graph/context/Iterator.h
Original file line number Diff line number Diff line change
Expand Up @@ -288,9 +288,11 @@ class GetNeighborsIter final : public Iterator {

void sample(int64_t count) override;

size_t size() const override {
LOG(FATAL) << "Unimplemented method for Get Neighbros iterator.";
}
// num of edges
size_t size() const override;

// num of vertices
size_t numRows() const;

const Value& getColumn(const std::string& col) const override;

Expand Down
3 changes: 2 additions & 1 deletion src/graph/context/ast/QueryAstContext.h
Original file line number Diff line number Diff line change
Expand Up @@ -124,9 +124,10 @@ struct SubgraphContext final : public AstContext {
Starts from;
StepClause steps;
std::string loopSteps;
YieldColumns* yieldExpr;
std::vector<std::string> colNames;
std::unordered_set<EdgeType> edgeTypes;
std::unordered_set<EdgeType> biDirectEdgeTypes;
std::vector<Value::Type> colType;
bool withProp{false};
bool getVertexProp{false};
bool getEdgeProp{false};
Expand Down
78 changes: 46 additions & 32 deletions src/graph/executor/algo/SubgraphExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,55 +21,69 @@ folly::Future<Status> SubgraphExecutor::execute() {
DCHECK(currentStepVal.isInt());
auto currentStep = currentStepVal.getInt();
VLOG(1) << "Current Step is: " << currentStep << " Total Steps is: " << steps;

if (currentStep == steps) {
oneMoreStep();
return finish(ResultBuilder().value(Value(std::move(ds))).build());
}
auto resultVar = subgraph->resultVar();

VLOG(1) << "input: " << subgraph->inputVar() << " output: " << node()->outputVar();
auto iter = ectx_->getResult(subgraph->inputVar()).iter();
DCHECK(iter && iter->isGetNeighborsIter());
auto gnSize = iter->size();

ResultBuilder builder;
builder.value(iter->valuePtr());

std::unordered_map<Value, int64_t> currentVids;
currentVids.reserve(gnSize);
historyVids_.reserve(historyVids_.size() + gnSize);
if (currentStep == 1) {
for (; iter->valid(); iter->next()) {
const auto& src = iter->getColumn(nebula::kVid);
historyVids_.emplace(src);
currentVids.emplace(src, 0);
}
iter->reset();
}
for (; iter->valid(); iter->next()) {
const auto& dst = iter->getEdgeProp("*", nebula::kDst);
if (historyVids_.emplace(dst).second) {
Row row;
row.values.emplace_back(std::move(dst));
ds.rows.emplace_back(std::move(row));
}
}

VLOG(1) << "Next step vid is : " << ds;
return finish(ResultBuilder().value(Value(std::move(ds))).build());
}

void SubgraphExecutor::oneMoreStep() {
auto* subgraph = asNode<Subgraph>(node());
auto output = subgraph->oneMoreStepOutput();
VLOG(1) << "OneMoreStep Input: " << subgraph->inputVar() << " Output: " << output;
auto iter = ectx_->getResult(subgraph->inputVar()).iter();
DCHECK(iter && iter->isGetNeighborsIter());

ResultBuilder builder;
builder.value(iter->valuePtr());
auto& biDirectEdgeTypes = subgraph->biDirectEdgeTypes();
while (iter->valid()) {
const auto& dst = iter->getEdgeProp("*", nebula::kDst);
if (historyVids_.find(dst) == historyVids_.end()) {
iter->unstableErase();
auto findIter = historyVids_.find(dst);
if (findIter != historyVids_.end()) {
if (biDirectEdgeTypes.empty()) {
iter->next();
} else {
const auto& typeVal = iter->getEdgeProp("*", nebula::kType);
if (UNLIKELY(!typeVal.isInt())) {
iter->erase();
continue;
}
auto type = typeVal.getInt();
if (biDirectEdgeTypes.find(type) != biDirectEdgeTypes.end()) {
if (type < 0 || findIter->second + 2 == currentStep) {
iter->erase();
} else {
iter->next();
}
} else {
iter->next();
}
}
} else {
if (currentStep == steps) {
iter->erase();
continue;
}
if (currentVids.emplace(dst, currentStep).second) {
Row row;
row.values.emplace_back(std::move(dst));
ds.rows.emplace_back(std::move(row));
}
iter->next();
}
}
iter->reset();
builder.iter(std::move(iter));
ectx_->setResult(output, builder.build());
ectx_->setResult(resultVar, builder.build());
// update historyVids
historyVids_.insert(std::make_move_iterator(currentVids.begin()),
std::make_move_iterator(currentVids.end()));
return finish(ResultBuilder().value(Value(std::move(ds))).build());
}

} // namespace graph
Expand Down
33 changes: 29 additions & 4 deletions src/graph/executor/algo/SubgraphExecutor.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,34 @@

#include "graph/executor/Executor.h"

// Subgraph receive result from GetNeighbors
// There are two Main functions
// First : Extract the deduplicated destination VID from GetNeighbors
// Second: Delete previously visited edges and save the result(iter) to the variable `resultVar`
//
// Member:
// `historyVids_` : is hash table
// KEY : the VID of the visited destination Vertex
// VALUE : the number of steps to visit the KEY (starting vertex is 0)
// since each vertex will only be visited once, if it is a one-way edge expansion, there will be no
// duplicate edges. we only need to focus on the case of two-way expansion
//
// How to delete edges:
// determine whether a loop is formed by the number of steps. If the destination vid has been
// visited, and the number of steps of the destination vid differs by 2 from the current steps, it
// is judged that a loop is formed, the edge needs to be deleted
//
// For example: Topology is below
// a->c, a->b, b->a, b->c
// statement: get subgraph from 'a' both edge yield vertices as nodes, edges as relationships
// first steps : a->b, a->c, a<-b, all edges need to save
// second steps: b->a, b<-a, b->c, c<-a
// since it is a two-way expansion, the negative edge has already been visited,
// so b<-a & c<-a are deleted
// b->a : the number of steps of the destination vid `a` is 0, and the current steps is 2. it can be
// determined that a loop is formed, so this edge also needs to be deleted.
// b->c : determined by the number of steps that no loop is formed, so keep it

namespace nebula {
namespace graph {
class SubgraphExecutor : public Executor {
Expand All @@ -18,10 +46,7 @@ class SubgraphExecutor : public Executor {
folly::Future<Status> execute() override;

private:
void oneMoreStep();

private:
std::unordered_set<Value> historyVids_;
std::unordered_map<Value, int64_t> historyVids_;
};

} // namespace graph
Expand Down
66 changes: 36 additions & 30 deletions src/graph/executor/query/DataCollectExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,43 +61,49 @@ folly::Future<Status> DataCollectExecutor::doCollect() {
}

Status DataCollectExecutor::collectSubgraph(const std::vector<std::string>& vars) {
const auto* dc = asNode<DataCollect>(node());
const auto& colType = dc->colType();
DataSet ds;
ds.colNames = std::move(colNames_);
std::unordered_set<std::tuple<Value, EdgeType, EdgeRanking, Value>> uniqueEdges;
for (auto i = vars.begin(); i != vars.end(); ++i) {
const auto& hist = ectx_->getHistory(*i);
for (auto j = hist.begin(); j != hist.end(); ++j) {
if (i == vars.begin() && j == hist.end() - 1) {
continue;
}
auto iter = (*j).iter();
if (!iter->isGetNeighborsIter()) {
std::stringstream msg;
msg << "Iterator should be kind of GetNeighborIter, but was: " << iter->kind();
return Status::Error(msg.str());
}
List vertices;
List edges;
auto* gnIter = static_cast<GetNeighborsIter*>(iter.get());
auto originVertices = gnIter->getVertices();
for (auto& v : originVertices.values) {
if (UNLIKELY(!v.isVertex())) {
continue;
const auto& hist = ectx_->getHistory(vars[0]);
for (const auto& result : hist) {
auto iter = result.iter();
auto* gnIter = static_cast<GetNeighborsIter*>(iter.get());
List vertices;
List edges;
Row row;
bool notEmpty = false;
for (const auto& type : colType) {
if (type == Value::Type::VERTEX) {
auto originVertices = gnIter->getVertices();
vertices.reserve(originVertices.size());
for (auto& v : originVertices.values) {
if (UNLIKELY(!v.isVertex())) {
continue;
}
vertices.emplace_back(std::move(v));
}
vertices.emplace_back(std::move(v));
}
auto originEdges = gnIter->getEdges();
for (auto& edge : originEdges.values) {
if (UNLIKELY(!edge.isEdge())) {
continue;
if (!vertices.empty()) {
notEmpty = true;
row.emplace_back(std::move(vertices));
}
const auto& e = edge.getEdge();
auto edgeKey = std::make_tuple(e.src, e.type, e.ranking, e.dst);
if (uniqueEdges.emplace(std::move(edgeKey)).second) {
} else {
auto originEdges = gnIter->getEdges();
edges.reserve(originEdges.size());
for (auto& edge : originEdges.values) {
if (UNLIKELY(!edge.isEdge())) {
continue;
}
edges.emplace_back(std::move(edge));
}
if (!edges.empty()) {
notEmpty = true;
}
row.emplace_back(std::move(edges));
}
ds.rows.emplace_back(Row({std::move(vertices), std::move(edges)}));
}
if (notEmpty) {
ds.rows.emplace_back(std::move(row));
}
}
result_.setDataSet(std::move(ds));
Expand Down
17 changes: 3 additions & 14 deletions src/graph/executor/test/DataCollectTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,7 @@ TEST_F(DataCollectTest, CollectSubgraph) {
auto* dc = DataCollect::make(qctx_.get(), DataCollect::DCKind::kSubgraph);
dc->setInputVars({"input_datasets"});
dc->setColNames(std::vector<std::string>{"_vertices", "_edges"});
dc->setColType({Value::Type::VERTEX, Value::Type::EDGE});

auto dcExe = std::make_unique<DataCollectExecutor>(dc, qctx_.get());
auto future = dcExe->execute();
Expand All @@ -179,29 +180,21 @@ TEST_F(DataCollectTest, CollectSubgraph) {
auto iter = hist[0].iter();
auto* gNIter = static_cast<GetNeighborsIter*>(iter.get());
Row row;
std::unordered_set<Value> vids;
std::unordered_set<std::tuple<Value, int64_t, int64_t, Value>> edgeKeys;
List vertices;
List edges;
auto originVertices = gNIter->getVertices();
for (auto& v : originVertices.values) {
if (!v.isVertex()) {
continue;
}
if (vids.emplace(v.getVertex().vid).second) {
vertices.emplace_back(std::move(v));
}
vertices.emplace_back(std::move(v));
}
auto originEdges = gNIter->getEdges();
for (auto& e : originEdges.values) {
if (!e.isEdge()) {
continue;
}
auto edgeKey =
std::make_tuple(e.getEdge().src, e.getEdge().type, e.getEdge().ranking, e.getEdge().dst);
if (edgeKeys.emplace(std::move(edgeKey)).second) {
edges.emplace_back(std::move(e));
}
edges.emplace_back(std::move(e));
}
row.values.emplace_back(std::move(vertices));
row.values.emplace_back(std::move(edges));
Expand Down Expand Up @@ -243,10 +236,6 @@ TEST_F(DataCollectTest, EmptyResult) {

DataSet expected;
expected.colNames = {"_vertices", "_edges"};
Row row;
row.values.emplace_back(Value(List()));
row.values.emplace_back(Value(List()));
expected.rows.emplace_back(std::move(row));
EXPECT_EQ(result.value().getDataSet(), expected);
EXPECT_EQ(result.state(), Result::State::kSuccess);
}
Expand Down
Loading

0 comments on commit f975c8d

Please sign in to comment.