diff --git a/src/common/base/Base.h b/src/common/base/Base.h index 1e76a93dc14..35a72dc8d77 100644 --- a/src/common/base/Base.h +++ b/src/common/base/Base.h @@ -133,6 +133,9 @@ constexpr char kSrc[] = "_src"; constexpr char kType[] = "_type"; constexpr char kRank[] = "_rank"; constexpr char kDst[] = "_dst"; +constexpr char kEdgePrefix[] = "_edge"; +constexpr char kStatsPrefix[] = "_stats"; +constexpr char kExprPrefix[] = "_expr"; // Useful type traits diff --git a/src/common/datatypes/List.h b/src/common/datatypes/List.h index d2509d9b687..a2694c52ca8 100644 --- a/src/common/datatypes/List.h +++ b/src/common/datatypes/List.h @@ -80,6 +80,10 @@ struct List { return values[i]; } + Value& operator[](size_t i) { + return values[i]; + } + bool contains(const Value& value) const { return std::find(values.begin(), values.end(), value) != values.end(); } diff --git a/src/common/datatypes/Value.cpp b/src/common/datatypes/Value.cpp index 12d2b143ada..79b4a0a48ef 100644 --- a/src/common/datatypes/Value.cpp +++ b/src/common/datatypes/Value.cpp @@ -3000,4 +3000,43 @@ Value operator^(const Value& lhs, const Value& rhs) { } } } + +std::size_t VertexHash::operator()(const Value& v) const { + switch (v.type()) { + case Value::Type::VERTEX: { + auto& vid = v.getVertex().vid; + if (vid.type() == Value::Type::STRING) { + return std::hash()(vid.getStr()); + } else { + return vid.getInt(); + } + } + case Value::Type::STRING: { + return std::hash()(v.getStr()); + } + case Value::Type::INT: { + return v.getInt(); + } + default: { + return v.hash(); + } + } +} + +bool VertexEqual::operator()(const Value& lhs, const Value& rhs) const { + if (lhs.type() == rhs.type()) { + if (lhs.isVertex()) { + return lhs.getVertex().vid == rhs.getVertex().vid; + } + return lhs == rhs; + } + if (lhs.type() == Value::Type::VERTEX) { + return lhs.getVertex().vid == rhs; + } + if (rhs.type() == Value::Type::VERTEX) { + return lhs == rhs.getVertex().vid; + } + return lhs == rhs; +} + } // namespace nebula diff --git a/src/common/datatypes/Value.h b/src/common/datatypes/Value.h index 6d693533995..595ef6b58a8 100644 --- a/src/common/datatypes/Value.h +++ b/src/common/datatypes/Value.h @@ -550,6 +550,16 @@ inline uint64_t operator&(const Value::Type& lhs, const uint64_t rhs) { return static_cast(lhs) & rhs; } +struct VertexHash { + std::size_t operator()(const Value& v) const; +}; + +struct VertexEqual { + bool operator()(const Value& lhs, const Value& rhs) const; +}; + +using VidHashSet = std::unordered_set; + } // namespace nebula namespace std { diff --git a/src/common/datatypes/Vertex.h b/src/common/datatypes/Vertex.h index 539d256f28a..03561b3cd1d 100644 --- a/src/common/datatypes/Vertex.h +++ b/src/common/datatypes/Vertex.h @@ -24,6 +24,7 @@ struct Tag { Tag(const Tag& tag) : name(tag.name), props(tag.props) {} Tag(std::string tagName, std::unordered_map tagProps) : name(std::move(tagName)), props(std::move(tagProps)) {} + explicit Tag(const std::string& tagName) : name(tagName), props() {} void clear() { name.clear(); diff --git a/src/graph/context/CMakeLists.txt b/src/graph/context/CMakeLists.txt index 6c3c05a568f..2cbcbe26d37 100644 --- a/src/graph/context/CMakeLists.txt +++ b/src/graph/context/CMakeLists.txt @@ -13,6 +13,7 @@ nebula_add_library( iterator/Iterator.cpp iterator/PropIter.cpp iterator/SequentialIter.cpp + iterator/GetNbrsRespDataSetIter.cpp ) diff --git a/src/graph/context/Iterator.h b/src/graph/context/Iterator.h index af845b1d408..ae8b0deef59 100644 --- a/src/graph/context/Iterator.h +++ b/src/graph/context/Iterator.h @@ -7,6 +7,7 @@ #define GRAPH_CONTEXT_ITERATOR_H_ #include "graph/context/iterator/DefaultIter.h" +#include "graph/context/iterator/GetNbrsRespDataSetIter.h" #include "graph/context/iterator/GetNeighborsIter.h" #include "graph/context/iterator/Iterator.h" #include "graph/context/iterator/PropIter.h" diff --git a/src/graph/context/iterator/GetNbrsRespDataSetIter.cpp b/src/graph/context/iterator/GetNbrsRespDataSetIter.cpp new file mode 100644 index 00000000000..9d9a502ad9c --- /dev/null +++ b/src/graph/context/iterator/GetNbrsRespDataSetIter.cpp @@ -0,0 +1,137 @@ +/* Copyright (c) 2023 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License. + */ + +#include "graph/context/iterator/GetNbrsRespDataSetIter.h" + +#include "common/base/Base.h" +#include "common/datatypes/Edge.h" +#include "common/datatypes/Vertex.h" + +namespace nebula { +namespace graph { + +bool isDataSetInvalid(const DataSet* dataset) { + const auto& colNames = dataset->colNames; + return colNames.size() < 3 || // the dataset has vid/_tag/_edge 3 columns at least + colNames[0] != nebula::kVid || // the first column should be Vid + colNames[1].find(kStatsPrefix) != 0 || // the second column could not be _stats column now + colNames.back().find(kExprPrefix) != 0; // the last column could not be _expr column now +} + +GetNbrsRespDataSetIter::GetNbrsRespDataSetIter(const DataSet* dataset) + : dataset_(DCHECK_NOTNULL(dataset)), firstEdgeColIdx_(-1), curRowIdx_(0) { + DCHECK(!isDataSetInvalid(dataset)); + for (size_t i = 0, e = dataset->colNames.size(); i < e; ++i) { + buildPropIndex(dataset->colNames[i], i); + } +} + +void GetNbrsRespDataSetIter::buildPropIndex(const std::string& colName, size_t colIdx) { + PropIndex propIdx; + propIdx.colIdx = colIdx; + + std::vector pieces; + folly::split(":", colName, pieces); + DCHECK_GE(pieces.size(), 2) << "Invalid column name: " << colName; + + propIdx.propIdxMap.reserve(pieces.size() - 2); + // if size == 2, it is the tag defined without props. + for (size_t i = 2; i < pieces.size(); ++i) { + const auto& name = pieces[i]; + size_t idx = i - 2; + if (name == kType) { + propIdx.edgeTypeIdx = idx; + } else if (name == kRank) { + propIdx.edgeRankIdx = idx; + } else if (name == kDst) { + propIdx.edgeDstIdx = idx; + } else { + DCHECK_NE(name, kSrc); + DCHECK_NE(name, kTag); + propIdx.propIdxMap.emplace(name, idx); + } + } + + folly::StringPiece prefix(pieces[0]), name(pieces[1]); + DCHECK(name.empty()) << "The name of tag/edge is empty"; + if (prefix.contains(kEdgePrefix)) { + DCHECK(name.startsWith("-") || name.startsWith("+")) << "the edge name has to start with '-/+'"; + edgePropsMap_.emplace(name, std::move(propIdx)); + + if (firstEdgeColIdx_ < 0) { + firstEdgeColIdx_ = colIdx; + } + } else if (prefix.contains(kTag)) { + tagPropsMap_.emplace(name, std::move(propIdx)); + } +} + +Value GetNbrsRespDataSetIter::getVertex() const { + // Always check the valid() before getVertex + DCHECK(valid()); + const Row& curRow = dataset_->rows[curRowIdx_]; + Vertex vertex; + vertex.vid = curRow[0]; + vertex.tags.reserve(tagPropsMap_.size()); + for (const auto& [tagName, propIdx] : tagPropsMap_) { + DCHECK_LT(propIdx.colIdx, curRow.size()); + const Value& propColumn = curRow[propIdx.colIdx]; + if (propColumn.isList()) { + const List& propList = propColumn.getList(); + + Tag tag(tagName); + tag.props.reserve(propIdx.propIdxMap.size()); + for (const auto& [propName, pIdx] : propIdx.propIdxMap) { + DCHECK_LT(pIdx, propList.size()); + tag.props.emplace(propName, propList[pIdx]); + } + + vertex.tags.emplace_back(std::move(tag)); + } + } + return vertex; +} + +std::vector GetNbrsRespDataSetIter::getAdjEdges(VidHashSet* dstSet) const { + DCHECK(valid()); + + std::vector adjEdges; + const Row& curRow = dataset_->rows[curRowIdx_]; + for (const auto& [edgeName, propIdx] : edgePropsMap_) { + DCHECK_LT(propIdx.colIdx, curRow.size()); + const Value& edgeColumn = curRow[propIdx.colIdx]; + if (edgeColumn.isList()) { + const List& propList = edgeColumn.getList(); + DCHECK_LT(propIdx.edgeDstIdx, propList.size()); + DCHECK_LT(propIdx.edgeTypeIdx, propList.size()); + DCHECK_LT(propIdx.edgeRankIdx, propList.size()); + + Edge edge; + edge.name = edgeName; + edge.src = curRow[0]; + edge.dst = propList[propIdx.edgeDstIdx]; + const Value& typeVal = propList[propIdx.edgeTypeIdx]; + edge.type = typeVal.isInt() ? typeVal.getInt() : 0; + const Value& rankVal = propList[propIdx.edgeRankIdx]; + edge.ranking = rankVal.isInt() ? rankVal.getInt() : 0; + + edge.props.reserve(propIdx.propIdxMap.size()); + for (const auto& [propName, pIdx] : propIdx.propIdxMap) { + DCHECK_LT(pIdx, propList.size()); + edge.props.emplace(propName, propList[pIdx]); + } + + if (dstSet) { + // TODO(yee): reserve dstSet + dstSet->emplace(edge.dst); + } + adjEdges.emplace_back(std::move(edge)); + } + } + return adjEdges; +} + +} // namespace graph +} // namespace nebula diff --git a/src/graph/context/iterator/GetNbrsRespDataSetIter.h b/src/graph/context/iterator/GetNbrsRespDataSetIter.h new file mode 100644 index 00000000000..e9c8532fffe --- /dev/null +++ b/src/graph/context/iterator/GetNbrsRespDataSetIter.h @@ -0,0 +1,56 @@ +/* Copyright (c) 2023 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License. + */ + +#ifndef GRAPH_CONTEXT_ITERATOR_GETNBRSRESPDATASETITER_H_ +#define GRAPH_CONTEXT_ITERATOR_GETNBRSRESPDATASETITER_H_ + +#include "common/datatypes/DataSet.h" +#include "common/datatypes/Value.h" + +namespace nebula { +namespace graph { + +class GetNbrsRespDataSetIter final { + public: + explicit GetNbrsRespDataSetIter(const DataSet* dataset); + + bool valid() const { + return curRowIdx_ < dataset_->rowSize(); + } + // Next row in dataset + void next() { + curRowIdx_++; + } + + Value getVertex() const; + std::vector getAdjEdges(VidHashSet* dstSet) const; + + private: + struct PropIndex { + size_t colIdx; + size_t edgeTypeIdx; + size_t edgeRankIdx; + size_t edgeDstIdx; + // std::vector propList; + std::unordered_map propIdxMap; + }; + + void buildPropIndex(const std::string& colName, size_t colIdx); + + // my fields + const DataSet* dataset_; + int firstEdgeColIdx_; + size_t curRowIdx_; + + // _tag:t1:p1:p2 -> {t1 : [column_idx, [p1, p2], {p1 : 0, p2 : 1}]} + std::unordered_map tagPropsMap_; + // _edge:e1:p1:p2 -> {e1 : [column_idx, [p1, p2], {p1 : 0, p2 : 1}]} + std::unordered_map edgePropsMap_; +}; + +} // namespace graph +} // namespace nebula + +#endif // GRAPH_CONTEXT_ITERATOR_GETNBRSRESPDATASETITER_H_ diff --git a/src/graph/executor/query/TraverseExecutor.cpp b/src/graph/executor/query/TraverseExecutor.cpp index 3bbb9b54000..9a29e5aab44 100644 --- a/src/graph/executor/query/TraverseExecutor.cpp +++ b/src/graph/executor/query/TraverseExecutor.cpp @@ -42,21 +42,16 @@ Status TraverseExecutor::buildRequestVids() { bool mv = movable(traverse_->inputVars().front()); if (traverse_->trackPrevPath()) { - std::unordered_set uniqueVid; - uniqueVid.reserve(iterSize); for (; iter->valid(); iter->next()) { const auto& vid = src->eval(ctx(iter)); auto prevPath = mv ? iter->moveRow() : *iter->row(); auto vidIter = dst2PathsMap_.find(vid); if (vidIter == dst2PathsMap_.end()) { - std::vector tmp({std::move(prevPath)}); - dst2PathsMap_.emplace(vid, std::move(tmp)); + dst2PathsMap_.emplace(vid, std::vector{std::move(prevPath)}); } else { vidIter->second.emplace_back(std::move(prevPath)); } - if (uniqueVid.emplace(vid).second) { - vids_.emplace_back(vid); - } + vids_.emplace(vid); } } else { const auto& spaceInfo = qctx()->rctx()->session()->space(); @@ -64,11 +59,11 @@ Status TraverseExecutor::buildRequestVids() { auto vidType = SchemaUtil::propTypeToValueType(metaVidType.get_type()); for (; iter->valid(); iter->next()) { const auto& vid = src->eval(ctx(iter)); - if (vid.type() != vidType) { - LOG(ERROR) << "Mismatched vid type: " << vid.type() << ", space vid type: " << vidType; - continue; + DCHECK_EQ(vid.type(), vidType) + << "Mismatched vid type: " << vid.type() << ", space vid type: " << vidType; + if (vid.type() == vidType) { + vids_.emplace(vid); } - vids_.emplace_back(vid); } } return Status::OK(); @@ -83,10 +78,12 @@ folly::Future TraverseExecutor::getNeighbors() { qctx()->rctx()->session()->id(), qctx()->plan()->id(), qctx()->plan()->isProfileEnabled()); + std::vector vids(vids_.size()); + std::move(vids_.begin(), vids_.end(), vids.begin()); return storageClient ->getNeighbors(param, {nebula::kVid}, - std::move(vids_), + std::move(vids), traverse_->edgeTypes(), traverse_->edgeDirection(), finalStep ? traverse_->statProps() : nullptr, @@ -143,71 +140,43 @@ void TraverseExecutor::addStats(RpcResponse& resp, int64_t getNbrTimeInUSec) { otherStats_.emplace(folly::sformat("step[{}]", currentStep_), folly::toPrettyJson(stepObj)); } +size_t TraverseExecutor::numRowsOfRpcResp(const RpcResponse& resps) const { + size_t numRows = 0; + for (const auto& resp : resps.responses()) { + auto dataset = resp.get_vertices(); + if (dataset) { + numRows += dataset->rowSize(); + } + } + return numRows; +} + void TraverseExecutor::expandOneStep(const RpcResponse& resps) { - UNUSED(resps); - // size_t numRows = 0; - // for (const auto& resp : resps.responses()) { - // auto dataset = resp.get_vertices(); - // if (dataset) { - // numRows += dataset->rowSize(); - // } - // } - - // initVertices_.reserve(numRows); - - // for (const auto& resp : resps.responses()) { - // auto dataset = resp.get_vertices(); - // if (dataset) { - // List l; - // l.values.push_back(std::move(*dataset)); - // GetNeighborsIter iter(std::make_shared(std::move(l))); - // } - // } - // auto vertices = iter->getVertices(); - // // match (v0)-[e:Rel]-(v1:Label1)-[e1*2]->() where id(v0) in [6, 23] return v1 - // // save the vertex that meets the filter conditions as the starting vertex of the current - // // traverse - // for (auto& vertex : vertices.values) { - // if (vertex.isVertex()) { - // initVertices_.emplace_back(vertex); - // } - // } - // if (range_.min() == 0) { - // result_.rows = buildZeroStepPath(); - // } - - // if (iter->numRows() == 0) { - // return; - // } - - // Value curVertex; - // std::unordered_set uniqueVids; - // std::vector adjEdges; - // auto sz = iter->size(); - // uniqueVids.reserve(sz); - // adjEdges.reserve(sz); - // vids_.reserve(vids_.size() + sz); - // adjList_.reserve(adjList_.size() + iter->numRows() + 1u); - // for (; iter->valid(); iter->next()) { - // const auto& edge = iter->getEdge(); - // if (edge.empty()) { - // continue; - // } - // const auto& dst = edge.getEdge().dst; - // if (adjList_.find(dst) == adjList_.end() && uniqueVids.emplace(dst).second) { - // vids_.emplace_back(dst); - // } - // const auto& vertex = iter->getVertex(); - // curVertex = curVertex.empty() ? vertex : curVertex; - // if (curVertex != vertex) { - // adjList_.emplace(curVertex, std::move(adjEdges)); - // curVertex = vertex; - // } - // adjEdges.emplace_back(edge); - // } - // if (!curVertex.empty()) { - // adjList_.emplace(curVertex, std::move(adjEdges)); - // } + initVertices_.reserve(numRowsOfRpcResp(resps)); + + for (const auto& resp : resps.responses()) { + auto dataset = resp.get_vertices(); + if (dataset) { + for (GetNbrsRespDataSetIter iter(dataset); iter.valid(); iter.next()) { + Value v = iter.getVertex(); + initVertices_.emplace_back(v); + VidHashSet dstSet; + auto adjEdges = iter.getAdjEdges(&dstSet); + for (const Value& dst : dstSet) { + if (adjList_.find(dst) == adjList_.end()) { + vids_.emplace(dst); + } + } + DCHECK(adjList_.find(v) == adjList_.end()) + << "The adjacency list should not contain the source vertex"; + adjList_.emplace(v, std::move(adjEdges)); + } + } + } + + if (range_.min() == 0) { + result_.rows = buildZeroStepPath(); + } } folly::Future TraverseExecutor::handleResponse(RpcResponse&& resps) { @@ -260,10 +229,8 @@ void TraverseExecutor::expand(GetNeighborsIter* iter) { QueryExpressionContext ctx(ectx_); Value curVertex; - std::unordered_set uniqueVids; std::vector adjEdges; auto sz = iter->size(); - uniqueVids.reserve(sz); adjEdges.reserve(sz); vids_.reserve(vids_.size() + sz); adjList_.reserve(adjList_.size() + iter->numRows() + 1u); @@ -285,8 +252,8 @@ void TraverseExecutor::expand(GetNeighborsIter* iter) { continue; } const auto& dst = edge.getEdge().dst; - if (adjList_.find(dst) == adjList_.end() && uniqueVids.emplace(dst).second) { - vids_.emplace_back(dst); + if (adjList_.find(dst) == adjList_.end()) { + vids_.emplace(dst); } const auto& vertex = iter->getVertex(); curVertex = curVertex.empty() ? vertex : curVertex; diff --git a/src/graph/executor/query/TraverseExecutor.h b/src/graph/executor/query/TraverseExecutor.h index cd7e93afcd7..4d8cff8b336 100644 --- a/src/graph/executor/query/TraverseExecutor.h +++ b/src/graph/executor/query/TraverseExecutor.h @@ -51,48 +51,6 @@ class TraverseExecutor final : public StorageAccessExecutor { folly::Future execute() override; - struct VertexHash { - std::size_t operator()(const Value& v) const { - switch (v.type()) { - case Value::Type::VERTEX: { - auto& vid = v.getVertex().vid; - if (vid.type() == Value::Type::STRING) { - return std::hash()(vid.getStr()); - } else { - return vid.getInt(); - } - } - case Value::Type::STRING: { - return std::hash()(v.getStr()); - } - case Value::Type::INT: { - return v.getInt(); - } - default: { - return v.hash(); - } - } - } - }; - - struct VertexEqual { - bool operator()(const Value& lhs, const Value& rhs) const { - if (lhs.type() == rhs.type()) { - if (lhs.isVertex()) { - return lhs.getVertex().vid == rhs.getVertex().vid; - } - return lhs == rhs; - } - if (lhs.type() == Value::Type::VERTEX) { - return lhs.getVertex().vid == rhs; - } - if (rhs.type() == Value::Type::VERTEX) { - return lhs == rhs.getVertex().vid; - } - return lhs == rhs; - } - }; - template using VertexMap = std::unordered_map, VertexHash, VertexEqual>; @@ -103,6 +61,8 @@ class TraverseExecutor final : public StorageAccessExecutor { folly::Future getNeighbors(); + size_t numRowsOfRpcResp(const RpcResponse& resps) const; + void expand(GetNeighborsIter* iter); void expandOneStep(const RpcResponse& resps); folly::Future handleResponse(RpcResponse&& resps); @@ -128,7 +88,7 @@ class TraverseExecutor final : public StorageAccessExecutor { private: ObjectPool objPool_; - std::vector vids_; + VidHashSet vids_; std::vector initVertices_; DataSet result_; // Key : vertex Value : adjacent edges