Skip to content

Commit

Permalink
Push filter down GetProps. (#3844)
Browse files Browse the repository at this point in the history
* Push limit down GetProps.

* Combine logic to function.

Co-authored-by: Doodle <13706157+critical27@users.noreply.github.com>
  • Loading branch information
Shylock-Hg and critical27 authored Mar 1, 2022
1 parent ff202e2 commit 3822d76
Show file tree
Hide file tree
Showing 6 changed files with 205 additions and 50 deletions.
52 changes: 43 additions & 9 deletions src/storage/exec/GetPropNode.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,15 @@ class GetTagPropNode : public QueryNode<VertexID> {
GetTagPropNode(RuntimeContext* context,
std::vector<TagNode*> tagNodes,
nebula::DataSet* resultDataSet,
Expression* filter,
std::size_t limit)
: context_(context),
tagNodes_(std::move(tagNodes)),
resultDataSet_(resultDataSet),
expCtx_(filter == nullptr
? nullptr
: new StorageExpressionContext(context->vIdLen(), context->isIntId())),
filter_(filter),
limit_(limit) {
name_ = "GetTagPropNode";
}
Expand Down Expand Up @@ -63,19 +68,24 @@ class GetTagPropNode : public QueryNode<VertexID> {
auto isIntId = context_->isIntId();
for (auto* tagNode : tagNodes_) {
ret = tagNode->collectTagPropsIfValid(
[&row](const std::vector<PropContext>* props) -> nebula::cpp2::ErrorCode {
[&row, tagNode, this](const std::vector<PropContext>* props) -> nebula::cpp2::ErrorCode {
for (const auto& prop : *props) {
if (prop.returned_) {
row.emplace_back(Value());
}
if (prop.filtered_ && expCtx_ != nullptr) {
expCtx_->setTagProp(tagNode->getTagName(), prop.name_, Value());
}
}
return nebula::cpp2::ErrorCode::SUCCEEDED;
},
[&row, vIdLen, isIntId](
[&row, vIdLen, isIntId, tagNode, this](
folly::StringPiece key,
RowReader* reader,
const std::vector<PropContext>* props) -> nebula::cpp2::ErrorCode {
if (!QueryUtils::collectVertexProps(key, vIdLen, isIntId, reader, props, row).ok()) {
auto status = QueryUtils::collectVertexProps(
key, vIdLen, isIntId, reader, props, row, expCtx_.get(), tagNode->getTagName());
if (!status.ok()) {
return nebula::cpp2::ErrorCode::E_TAG_PROP_NOT_FOUND;
}
return nebula::cpp2::ErrorCode::SUCCEEDED;
Expand All @@ -84,14 +94,21 @@ class GetTagPropNode : public QueryNode<VertexID> {
return ret;
}
}
resultDataSet_->rows.emplace_back(std::move(row));
if (filter_ == nullptr || (QueryUtils::vTrue(filter_->eval(*expCtx_)))) {
resultDataSet_->rows.emplace_back(std::move(row));
}
if (expCtx_ != nullptr) {
expCtx_->clear();
}
return nebula::cpp2::ErrorCode::SUCCEEDED;
}

private:
RuntimeContext* context_;
std::vector<TagNode*> tagNodes_;
nebula::DataSet* resultDataSet_;
std::unique_ptr<StorageExpressionContext> expCtx_{nullptr};
Expression* filter_{nullptr};
const std::size_t limit_{std::numeric_limits<std::size_t>::max()};
};

Expand All @@ -102,10 +119,15 @@ class GetEdgePropNode : public QueryNode<cpp2::EdgeKey> {
GetEdgePropNode(RuntimeContext* context,
std::vector<EdgeNode<cpp2::EdgeKey>*> edgeNodes,
nebula::DataSet* resultDataSet,
Expression* filter,
std::size_t limit)
: context_(context),
edgeNodes_(std::move(edgeNodes)),
resultDataSet_(resultDataSet),
expCtx_(filter == nullptr
? nullptr
: new StorageExpressionContext(context->vIdLen(), context->isIntId())),
filter_(filter),
limit_(limit) {
QueryNode::name_ = "GetEdgePropNode";
}
Expand All @@ -124,35 +146,47 @@ class GetEdgePropNode : public QueryNode<cpp2::EdgeKey> {
auto isIntId = context_->isIntId();
for (auto* edgeNode : edgeNodes_) {
ret = edgeNode->collectEdgePropsIfValid(
[&row](const std::vector<PropContext>* props) -> nebula::cpp2::ErrorCode {
[&row, edgeNode, this](const std::vector<PropContext>* props) -> nebula::cpp2::ErrorCode {
for (const auto& prop : *props) {
if (prop.returned_) {
row.emplace_back(Value());
}
if (prop.filtered_ && expCtx_ != nullptr) {
expCtx_->setEdgeProp(edgeNode->getEdgeName(), prop.name_, Value());
}
}
return nebula::cpp2::ErrorCode::SUCCEEDED;
},
[&row, vIdLen, isIntId](
[&row, vIdLen, isIntId, edgeNode, this](
folly::StringPiece key,
RowReader* reader,
const std::vector<PropContext>* props) -> nebula::cpp2::ErrorCode {
if (!QueryUtils::collectEdgeProps(key, vIdLen, isIntId, reader, props, row).ok()) {
return nebula::cpp2::ErrorCode::E_EDGE_PROP_NOT_FOUND;
auto status = QueryUtils::collectEdgeProps(
key, vIdLen, isIntId, reader, props, row, expCtx_.get(), edgeNode->getEdgeName());
if (!status.ok()) {
return nebula::cpp2::ErrorCode::E_TAG_PROP_NOT_FOUND;
}
return nebula::cpp2::ErrorCode::SUCCEEDED;
});
if (ret != nebula::cpp2::ErrorCode::SUCCEEDED) {
return ret;
}
}
resultDataSet_->rows.emplace_back(std::move(row));
if (filter_ == nullptr || (QueryUtils::vTrue(filter_->eval(*expCtx_)))) {
resultDataSet_->rows.emplace_back(std::move(row));
}
if (expCtx_ != nullptr) {
expCtx_->clear();
}
return nebula::cpp2::ErrorCode::SUCCEEDED;
}

private:
RuntimeContext* context_;
std::vector<EdgeNode<cpp2::EdgeKey>*> edgeNodes_;
nebula::DataSet* resultDataSet_;
std::unique_ptr<StorageExpressionContext> expCtx_{nullptr};
Expression* filter_{nullptr};
const std::size_t limit_{std::numeric_limits<std::size_t>::max()};
};

Expand Down
23 changes: 10 additions & 13 deletions src/storage/exec/HashJoinNode.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,19 +72,16 @@ class HashJoinNode : public IterateNode<VertexID> {
nebula::List list;
list.reserve(props->size());
const auto& tagName = tagNode->getTagName();
for (const auto& prop : *props) {
VLOG(2) << "Collect prop " << prop.name_;
auto value = QueryUtils::readVertexProp(
key, context_->vIdLen(), context_->isIntId(), reader, prop);
if (!value.ok()) {
return nebula::cpp2::ErrorCode::E_TAG_PROP_NOT_FOUND;
}
if (prop.filtered_ && expCtx_ != nullptr) {
expCtx_->setTagProp(tagName, prop.name_, value.value());
}
if (prop.returned_) {
list.emplace_back(std::move(value).value());
}
auto status = QueryUtils::collectVertexProps(key,
context_->vIdLen(),
context_->isIntId(),
reader,
props,
list,
expCtx_,
tagName);
if (!status.ok()) {
return nebula::cpp2::ErrorCode::E_TAG_PROP_NOT_FOUND;
}
result.values.emplace_back(std::move(list));
return nebula::cpp2::ErrorCode::SUCCEEDED;
Expand Down
41 changes: 29 additions & 12 deletions src/storage/exec/QueryUtils.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,18 @@
#include "common/expression/Expression.h"
#include "common/utils/DefaultValueContext.h"
#include "storage/CommonUtils.h"
#include "storage/context/StorageExpressionContext.h"
#include "storage/query/QueryBaseProcessor.h"

namespace nebula {
namespace storage {

class QueryUtils final {
public:
static inline bool vTrue(const Value& v) {
return v.isBool() && v.getBool();
}

enum class ReturnColType : uint16_t {
kVid,
kTag,
Expand Down Expand Up @@ -165,15 +170,21 @@ class QueryUtils final {
bool isIntId,
RowReader* reader,
const std::vector<PropContext>* props,
nebula::List& list) {
nebula::List& list,
StorageExpressionContext* expCtx = nullptr,
const std::string& tagName = "") {
for (const auto& prop : *props) {
if (!(prop.returned_ || (prop.filtered_ && expCtx != nullptr))) {
continue;
}
auto value = QueryUtils::readVertexProp(key, vIdLen, isIntId, reader, prop);
NG_RETURN_IF_ERROR(value);
if (prop.returned_) {
VLOG(2) << "Collect prop " << prop.name_;
auto value = QueryUtils::readVertexProp(key, vIdLen, isIntId, reader, prop);
if (!value.ok()) {
return value.status();
}
list.emplace_back(std::move(value).value());
list.emplace_back(value.value());
}
if (prop.filtered_ && expCtx != nullptr) {
expCtx->setTagProp(tagName, prop.name_, std::move(value).value());
}
}
return Status::OK();
Expand All @@ -184,15 +195,21 @@ class QueryUtils final {
bool isIntId,
RowReader* reader,
const std::vector<PropContext>* props,
nebula::List& list) {
nebula::List& list,
StorageExpressionContext* expCtx = nullptr,
const std::string& edgeName = "") {
for (const auto& prop : *props) {
if (!(prop.returned_ || (prop.filtered_ && expCtx != nullptr))) {
continue;
}
auto value = QueryUtils::readEdgeProp(key, vIdLen, isIntId, reader, prop);
NG_RETURN_IF_ERROR(value);
if (prop.returned_) {
VLOG(2) << "Collect prop " << prop.name_;
auto value = QueryUtils::readEdgeProp(key, vIdLen, isIntId, reader, prop);
if (!value.ok()) {
return value.status();
}
list.emplace_back(std::move(value).value());
list.emplace_back(value.value());
}
if (prop.filtered_ && expCtx != nullptr) {
expCtx->setEdgeProp(edgeName, prop.name_, std::move(value).value());
}
}
return Status::OK();
Expand Down
8 changes: 2 additions & 6 deletions src/storage/exec/ScanNode.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,6 @@ namespace storage {

using Cursor = std::string;

inline bool vTrue(const Value& v) {
return v.isBool() && v.getBool();
}

// Node to scan vertices of one partition
class ScanVertexPropNode : public QueryNode<Cursor> {
public:
Expand Down Expand Up @@ -154,7 +150,7 @@ class ScanVertexPropNode : public QueryNode<Cursor> {
}
}
if (ret == nebula::cpp2::ErrorCode::SUCCEEDED &&
(filter_ == nullptr || vTrue(filter_->eval(*expCtx_)))) {
(filter_ == nullptr || QueryUtils::vTrue(filter_->eval(*expCtx_)))) {
resultDataSet_->rows.emplace_back(std::move(row));
}
expCtx_->clear();
Expand Down Expand Up @@ -295,7 +291,7 @@ class ScanEdgePropNode : public QueryNode<Cursor> {
}
}
if (ret == nebula::cpp2::ErrorCode::SUCCEEDED &&
(filter_ == nullptr || vTrue(filter_->eval(*expCtx_)))) {
(filter_ == nullptr || QueryUtils::vTrue(filter_->eval(*expCtx_)))) {
resultDataSet_->rows.emplace_back(std::move(row));
}
expCtx_->clear();
Expand Down
22 changes: 18 additions & 4 deletions src/storage/query/GetPropProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ StoragePlan<VertexID> GetPropProcessor::buildTagPlan(RuntimeContext* context,
tags.emplace_back(tag.get());
plan.addNode(std::move(tag));
}
auto output = std::make_unique<GetTagPropNode>(context, tags, result, limit_);
auto output = std::make_unique<GetTagPropNode>(context, tags, result, filter_, limit_);
for (auto* tag : tags) {
output->addDependency(tag);
}
Expand All @@ -219,7 +219,7 @@ StoragePlan<cpp2::EdgeKey> GetPropProcessor::buildEdgePlan(RuntimeContext* conte
edges.emplace_back(edge.get());
plan.addNode(std::move(edge));
}
auto output = std::make_unique<GetEdgePropNode>(context, edges, result, limit_);
auto output = std::make_unique<GetEdgePropNode>(context, edges, result, filter_, limit_);
for (auto* edge : edges) {
output->addDependency(edge);
}
Expand Down Expand Up @@ -251,14 +251,28 @@ nebula::cpp2::ErrorCode GetPropProcessor::checkAndBuildContexts(const cpp2::GetP
if (code != nebula::cpp2::ErrorCode::SUCCEEDED) {
return code;
}
return buildTagContext(req);
code = buildTagContext(req);
if (code != nebula::cpp2::ErrorCode::SUCCEEDED) {
return code;
}
} else {
code = getSpaceEdgeSchema();
if (code != nebula::cpp2::ErrorCode::SUCCEEDED) {
return code;
}
return buildEdgeContext(req);
code = buildEdgeContext(req);
if (code != nebula::cpp2::ErrorCode::SUCCEEDED) {
return code;
}
}
code = buildFilter(req, [](const cpp2::GetPropRequest& r) -> const std::string* {
if (r.filter_ref().has_value()) {
return r.get_filter();
} else {
return nullptr;
}
});
return code;
}

nebula::cpp2::ErrorCode GetPropProcessor::buildTagContext(const cpp2::GetPropRequest& req) {
Expand Down
Loading

0 comments on commit 3822d76

Please sign in to comment.