Skip to content

Commit

Permalink
Feature/view vertices edges (#3320)
Browse files Browse the repository at this point in the history
* Scan multiple parts.

* Add multiple parts test case.

* Add limit test.

* Remove unused include.

* Support multiple tags.

* Fix license header.

* Optimize the extra read operations.

* Fix compile error.

* Skip invalid tag in one loop.

* Avoid extra logical.

* Add scan executors.

* Add scan entry of match.

* Format.

* Push filter down to ScanVertices.

* Push filter over AppendVertices.

* Remove unused code.

* Support push limit down to scan vertices.

* Fix vfilter push down.

* Transforme traverse to get edges.

* Push limit to scan edges.

* Check limit for scan.

* Revert test cases.

* Resolve conflict.

* Remove redundant line.

Co-authored-by: cpw <13495049+CPWstatic@users.noreply.github.com>
  • Loading branch information
Shylock-Hg and CPWstatic authored Dec 9, 2021
1 parent 7c4e372 commit bc1aac6
Show file tree
Hide file tree
Showing 51 changed files with 1,724 additions and 132 deletions.
10 changes: 5 additions & 5 deletions src/clients/storage/StorageClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -558,15 +558,15 @@ StorageRpcRespFuture<cpp2::GetNeighborsResponse> StorageClient::lookupAndTravers
});
}

StorageRpcRespFuture<cpp2::ScanEdgeResponse> StorageClient::scanEdge(
StorageRpcRespFuture<cpp2::ScanResponse> StorageClient::scanEdge(
const CommonRequestParam& param,
const cpp2::EdgeProp& edgeProp,
const std::vector<cpp2::EdgeProp>& edgeProp,
int64_t limit,
const Expression* filter) {
std::unordered_map<HostAddr, cpp2::ScanEdgeRequest> requests;
auto status = getHostPartsWithCursor(param.space);
if (!status.ok()) {
return folly::makeFuture<StorageRpcResponse<cpp2::ScanEdgeResponse>>(
return folly::makeFuture<StorageRpcResponse<cpp2::ScanResponse>>(
std::runtime_error(status.status().toString()));
}
auto& clusters = status.value();
Expand All @@ -589,15 +589,15 @@ StorageRpcRespFuture<cpp2::ScanEdgeResponse> StorageClient::scanEdge(
const cpp2::ScanEdgeRequest& r) { return client->future_scanEdge(r); });
}

StorageRpcRespFuture<cpp2::ScanVertexResponse> StorageClient::scanVertex(
StorageRpcRespFuture<cpp2::ScanResponse> StorageClient::scanVertex(
const CommonRequestParam& param,
const std::vector<cpp2::VertexProp>& vertexProp,
int64_t limit,
const Expression* filter) {
std::unordered_map<HostAddr, cpp2::ScanVertexRequest> requests;
auto status = getHostPartsWithCursor(param.space);
if (!status.ok()) {
return folly::makeFuture<StorageRpcResponse<cpp2::ScanVertexResponse>>(
return folly::makeFuture<StorageRpcResponse<cpp2::ScanResponse>>(
std::runtime_error(status.status().toString()));
}
auto& clusters = status.value();
Expand Down
10 changes: 5 additions & 5 deletions src/clients/storage/StorageClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -129,12 +129,12 @@ class StorageClient : public StorageClientBase<cpp2::GraphStorageServiceAsyncCli
StorageRpcRespFuture<cpp2::GetNeighborsResponse> lookupAndTraverse(
const CommonRequestParam& param, cpp2::IndexSpec indexSpec, cpp2::TraverseSpec traverseSpec);

StorageRpcRespFuture<cpp2::ScanEdgeResponse> scanEdge(const CommonRequestParam& param,
const cpp2::EdgeProp& vertexProp,
int64_t limit,
const Expression* filter);
StorageRpcRespFuture<cpp2::ScanResponse> scanEdge(const CommonRequestParam& param,
const std::vector<cpp2::EdgeProp>& vertexProp,
int64_t limit,
const Expression* filter);

StorageRpcRespFuture<cpp2::ScanVertexResponse> scanVertex(
StorageRpcRespFuture<cpp2::ScanResponse> scanVertex(
const CommonRequestParam& param,
const std::vector<cpp2::VertexProp>& vertexProp,
int64_t limit,
Expand Down
12 changes: 12 additions & 0 deletions src/common/meta/SchemaManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,5 +23,17 @@ StatusOr<std::pair<bool, int32_t>> SchemaManager::getSchemaIDByName(GraphSpaceID
return Status::Error("Schema not exist: %s", schemaName.str().c_str());
}

StatusOr<std::unordered_map<TagID, std::string>> SchemaManager::getAllTags(GraphSpaceID space) {
std::unordered_map<TagID, std::string> tags;
auto tagSchemas = getAllLatestVerTagSchema(space);
NG_RETURN_IF_ERROR(tagSchemas);
for (auto& tagSchema : tagSchemas.value()) {
auto tagName = toTagName(space, tagSchema.first);
NG_RETURN_IF_ERROR(tagName);
tags.emplace(tagSchema.first, tagName.value());
}
return tags;
}

} // namespace meta
} // namespace nebula
2 changes: 2 additions & 0 deletions src/common/meta/SchemaManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ class SchemaManager {

virtual StatusOr<std::vector<std::string>> getAllEdge(GraphSpaceID space) = 0;

StatusOr<std::unordered_map<TagID, std::string>> getAllTags(GraphSpaceID space);

// get all version of all tag schema
virtual StatusOr<TagSchemas> getAllVerTagSchema(GraphSpaceID space) = 0;

Expand Down
2 changes: 2 additions & 0 deletions src/graph/context/ast/CypherAstContext.h
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ struct ScanInfo {
std::vector<IndexID> indexIds;
// use for seek by edge only
MatchEdge::Direction direction{MatchEdge::Direction::OUT_EDGE};
// use for scan seek
bool anyLabel{false};
};

struct CypherClauseContextBase : AstContext {
Expand Down
2 changes: 2 additions & 0 deletions src/graph/executor/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ nebula_add_library(
query/InnerJoinExecutor.cpp
query/IndexScanExecutor.cpp
query/AssignExecutor.cpp
query/ScanVerticesExecutor.cpp
query/ScanEdgesExecutor.cpp
query/TraverseExecutor.cpp
query/AppendVerticesExecutor.cpp
algo/ConjunctPathExecutor.cpp
Expand Down
8 changes: 8 additions & 0 deletions src/graph/executor/Executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@
#include "graph/executor/query/MinusExecutor.h"
#include "graph/executor/query/ProjectExecutor.h"
#include "graph/executor/query/SampleExecutor.h"
#include "graph/executor/query/ScanEdgesExecutor.h"
#include "graph/executor/query/ScanVerticesExecutor.h"
#include "graph/executor/query/SortExecutor.h"
#include "graph/executor/query/TopNExecutor.h"
#include "graph/executor/query/TraverseExecutor.h"
Expand Down Expand Up @@ -170,6 +172,12 @@ Executor *Executor::makeExecutor(QueryContext *qctx, const PlanNode *node) {
case PlanNode::Kind::kGetVertices: {
return pool->add(new GetVerticesExecutor(node, qctx));
}
case PlanNode::Kind::kScanEdges: {
return pool->add(new ScanEdgesExecutor(node, qctx));
}
case PlanNode::Kind::kScanVertices: {
return pool->add(new ScanVerticesExecutor(node, qctx));
}
case PlanNode::Kind::kGetNeighbors: {
return pool->add(new GetNeighborsExecutor(node, qctx));
}
Expand Down
3 changes: 2 additions & 1 deletion src/graph/executor/query/GetPropExecutor.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ class GetPropExecutor : public StorageAccessExecutor {
GetPropExecutor(const std::string &name, const PlanNode *node, QueryContext *qctx)
: StorageAccessExecutor(name, node, qctx) {}

Status handleResp(storage::StorageRpcResponse<storage::cpp2::GetPropResponse> &&rpcResp,
template <typename Response>
Status handleResp(storage::StorageRpcResponse<Response> &&rpcResp,
const std::vector<std::string> &colNames) {
auto result = handleCompleteness(rpcResp, FLAGS_accept_partial_success);
NG_RETURN_IF_ERROR(result);
Expand Down
50 changes: 50 additions & 0 deletions src/graph/executor/query/ScanEdgesExecutor.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/* Copyright (c) 2021 vesoft inc. All rights reserved.
*
* This source code is licensed under Apache 2.0 License.
*/

#include "graph/executor/query/ScanEdgesExecutor.h"

#include "common/time/ScopedTimer.h"
#include "graph/context/QueryContext.h"
#include "graph/planner/plan/Query.h"
#include "graph/util/SchemaUtil.h"

using nebula::storage::StorageClient;
using nebula::storage::StorageRpcResponse;
using nebula::storage::cpp2::ScanResponse;

namespace nebula {
namespace graph {

folly::Future<Status> ScanEdgesExecutor::execute() { return scanEdges(); }

folly::Future<Status> ScanEdgesExecutor::scanEdges() {
SCOPED_TIMER(&execTime_);
StorageClient *client = qctx()->getStorageClient();
auto *se = asNode<ScanEdges>(node());
if (se->limit() < 0) {
return Status::Error("Scan edges must specify limit number.");
}

time::Duration scanEdgesTime;
StorageClient::CommonRequestParam param(se->space(),
qctx()->rctx()->session()->id(),
qctx()->plan()->id(),
qctx()->plan()->isProfileEnabled());
return DCHECK_NOTNULL(client)
->scanEdge(param, *DCHECK_NOTNULL(se->props()), se->limit(), se->filter())
.via(runner())
.ensure([this, scanEdgesTime]() {
SCOPED_TIMER(&execTime_);
otherStats_.emplace("total_rpc", folly::sformat("{}(us)", scanEdgesTime.elapsedInUSec()));
})
.thenValue([this, se](StorageRpcResponse<ScanResponse> &&rpcResp) {
SCOPED_TIMER(&execTime_);
addStats(rpcResp, otherStats_);
return handleResp(std::move(rpcResp), se->colNames());
});
}

} // namespace graph
} // namespace nebula
22 changes: 22 additions & 0 deletions src/graph/executor/query/ScanEdgesExecutor.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/* Copyright (c) 2021 vesoft inc. All rights reserved.
*
* This source code is licensed under Apache 2.0 License.
*/

#include "graph/executor/query/GetPropExecutor.h"

namespace nebula {
namespace graph {
class ScanEdgesExecutor final : public GetPropExecutor {
public:
ScanEdgesExecutor(const PlanNode *node, QueryContext *qctx)
: GetPropExecutor("ScanEdgesExecutor", node, qctx) {}

folly::Future<Status> execute() override;

private:
folly::Future<Status> scanEdges();
};

} // namespace graph
} // namespace nebula
50 changes: 50 additions & 0 deletions src/graph/executor/query/ScanVerticesExecutor.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/* Copyright (c) 2021 vesoft inc. All rights reserved.
*
* This source code is licensed under Apache 2.0 License.
*/

#include "graph/executor/query/ScanVerticesExecutor.h"

#include "common/time/ScopedTimer.h"
#include "graph/context/QueryContext.h"
#include "graph/util/SchemaUtil.h"

using nebula::storage::StorageClient;
using nebula::storage::StorageRpcResponse;
using nebula::storage::cpp2::ScanResponse;

namespace nebula {
namespace graph {

folly::Future<Status> ScanVerticesExecutor::execute() { return scanVertices(); }

folly::Future<Status> ScanVerticesExecutor::scanVertices() {
SCOPED_TIMER(&execTime_);

auto *sv = asNode<ScanVertices>(node());
if (sv->limit() < 0) {
return Status::Error("Scan vertices must specify limit number.");
}
StorageClient *storageClient = qctx()->getStorageClient();

time::Duration scanVertexTime;
StorageClient::CommonRequestParam param(sv->space(),
qctx()->rctx()->session()->id(),
qctx()->plan()->id(),
qctx()->plan()->isProfileEnabled());
return DCHECK_NOTNULL(storageClient)
->scanVertex(param, *DCHECK_NOTNULL(sv->props()), sv->limit(), sv->filter())
.via(runner())
.ensure([this, scanVertexTime]() {
SCOPED_TIMER(&execTime_);
otherStats_.emplace("total_rpc", folly::sformat("{}(us)", scanVertexTime.elapsedInUSec()));
})
.thenValue([this, sv](StorageRpcResponse<ScanResponse> &&rpcResp) {
SCOPED_TIMER(&execTime_);
addStats(rpcResp, otherStats_);
return handleResp(std::move(rpcResp), sv->colNames());
});
}

} // namespace graph
} // namespace nebula
26 changes: 26 additions & 0 deletions src/graph/executor/query/ScanVerticesExecutor.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/* Copyright (c) 2021 vesoft inc. All rights reserved.
*
* This source code is licensed under Apache 2.0 License.
*/

#pragma once

#include "graph/executor/query/GetPropExecutor.h"
#include "graph/planner/plan/Query.h"

namespace nebula {
namespace graph {

class ScanVerticesExecutor final : public GetPropExecutor {
public:
ScanVerticesExecutor(const PlanNode *node, QueryContext *qctx)
: GetPropExecutor("ScanVerticesExecutor", node, qctx) {}

folly::Future<Status> execute() override;

private:
folly::Future<Status> scanVertices();
};

} // namespace graph
} // namespace nebula
5 changes: 5 additions & 0 deletions src/graph/optimizer/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ nebula_add_library(
rule/PushFilterDownAggregateRule.cpp
rule/PushFilterDownProjectRule.cpp
rule/PushFilterDownLeftJoinRule.cpp
rule/PushFilterDownScanVerticesRule.cpp
rule/PushVFilterDownScanVerticesRule.cpp
rule/OptimizeEdgeIndexScanByFilterRule.cpp
rule/OptimizeTagIndexScanByFilterRule.cpp
rule/UnionAllIndexScanBaseRule.cpp
Expand All @@ -45,6 +47,9 @@ nebula_add_library(
rule/PushLimitDownEdgeIndexPrefixScanRule.cpp
rule/PushLimitDownEdgeIndexRangeScanRule.cpp
rule/PushLimitDownProjectRule.cpp
rule/PushLimitDownScanAppendVerticesRule.cpp
rule/GetEdgesTransformRule.cpp
rule/PushLimitDownScanEdgesAppendVerticesRule.cpp
)

nebula_add_subdirectory(test)
Loading

0 comments on commit bc1aac6

Please sign in to comment.