Skip to content

Commit

Permalink
LookupIndex push aggregate (#3504)
Browse files Browse the repository at this point in the history
Co-authored-by: Doodle <13706157+critical27@users.noreply.github.com>
  • Loading branch information
Nivras and critical27 authored Jan 10, 2022
1 parent 83886a0 commit 8cab6d0
Show file tree
Hide file tree
Showing 18 changed files with 1,096 additions and 117 deletions.
3 changes: 3 additions & 0 deletions src/interface/storage.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -495,6 +495,8 @@ struct LookupIndexResp {
// Each column represents one property. the column name is in the form of "tag_name.prop_alias"
// or "edge_type_name.prop_alias" in the same order which specified in return_columns of request
2: optional common.DataSet data,
// stat_data only have one column, the column name is the order in LookupIndexRequest.stat_prop
3: optional common.DataSet stat_data,
}

enum ScanType {
Expand Down Expand Up @@ -546,6 +548,7 @@ struct LookupIndexRequest {
// max row count of each partition in this response
6: optional i64 limit,
7: optional list<OrderBy> order_by,
8: optional list<StatProp> stat_columns,
}


Expand Down
1 change: 1 addition & 0 deletions src/meta/processors/BaseProcessor-inl.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

#pragma once

#include "interface/gen-cpp2/storage_types.h"
#include "meta/processors/BaseProcessor.h"

namespace nebula {
Expand Down
27 changes: 27 additions & 0 deletions src/storage/BaseProcessor-inl.h
Original file line number Diff line number Diff line change
Expand Up @@ -186,5 +186,32 @@ StatusOr<std::string> BaseProcessor<RESP>::encodeRowVal(const meta::NebulaSchema
return std::move(rowWrite).moveEncodedStr();
}

template <typename RESP>
nebula::cpp2::ErrorCode BaseProcessor<RESP>::checkStatType(
const meta::SchemaProviderIf::Field& field, cpp2::StatType statType) {
// todo(doodle): how to deal with nullable fields? For now, null add anything
// is null, if there is even one null, the result will be invalid
auto fType = field.type();
switch (statType) {
case cpp2::StatType::SUM:
case cpp2::StatType::AVG:
case cpp2::StatType::MIN:
case cpp2::StatType::MAX: {
if (fType == nebula::cpp2::PropertyType::INT64 ||
fType == nebula::cpp2::PropertyType::INT32 ||
fType == nebula::cpp2::PropertyType::INT16 || fType == nebula::cpp2::PropertyType::INT8 ||
fType == nebula::cpp2::PropertyType::FLOAT ||
fType == nebula::cpp2::PropertyType::DOUBLE) {
return nebula::cpp2::ErrorCode::SUCCEEDED;
}
return nebula::cpp2::ErrorCode::E_INVALID_STAT_TYPE;
}
case cpp2::StatType::COUNT: {
break;
}
}
return nebula::cpp2::ErrorCode::SUCCEEDED;
}

} // namespace storage
} // namespace nebula
3 changes: 3 additions & 0 deletions src/storage/BaseProcessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,9 @@ class BaseProcessor {

void handleAsync(GraphSpaceID spaceId, PartitionID partId, nebula::cpp2::ErrorCode code);

nebula::cpp2::ErrorCode checkStatType(const meta::SchemaProviderIf::Field& field,
cpp2::StatType statType);

StatusOr<std::string> encodeRowVal(const meta::NebulaSchemaProvider* schema,
const std::vector<std::string>& propNames,
const std::vector<Value>& props,
Expand Down
1 change: 1 addition & 0 deletions src/storage/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ nebula_add_library(
exec/IndexDedupNode.cpp
exec/IndexEdgeScanNode.cpp
exec/IndexLimitNode.cpp
exec/IndexAggregateNode.cpp
exec/IndexProjectionNode.cpp
exec/IndexScanNode.cpp
exec/IndexSelectionNode.cpp
Expand Down
125 changes: 125 additions & 0 deletions src/storage/exec/IndexAggregateNode.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
/* Copyright (c) 2021 vesoft inc. All rights reserved.
*
* This source code is licensed under Apache 2.0 License.
*/

#include "storage/exec/IndexAggregateNode.h"

namespace nebula {
namespace storage {

IndexAggregateNode::IndexAggregateNode(const IndexAggregateNode& node)
: IndexNode(node), statInfos_(node.statInfos_), returnColumnsCount_(node.returnColumnsCount_) {
stats_ = node.stats_;
retColMap_ = node.retColMap_;
}

IndexAggregateNode::IndexAggregateNode(
RuntimeContext* context,
const std::vector<std::pair<std::string, cpp2::StatType>>& statInfos,
size_t returnColumnsCount)
: IndexNode(context, "IndexAggregateNode"),
statInfos_(statInfos),
returnColumnsCount_(returnColumnsCount) {}

nebula::cpp2::ErrorCode IndexAggregateNode::init(InitContext& ctx) {
DCHECK_EQ(children_.size(), 1);
for (const auto& statInfo : statInfos_) {
ctx.statColumns.insert(statInfo.first);
}
auto ret = children_[0]->init(ctx);
if (ret != nebula::cpp2::ErrorCode::SUCCEEDED) {
return ret;
}
initStatValue();
retColMap_.clear();
retColMap_ = ctx.retColMap;
return nebula::cpp2::ErrorCode::SUCCEEDED;
}

void IndexAggregateNode::initStatValue() {
stats_.clear();
if (statInfos_.size() > 0) {
stats_.reserve(statInfos_.size());
for (const auto& statInfo : statInfos_) {
stats_.emplace_back(statInfo.second);
}
}
}

void IndexAggregateNode::addStatValue(const Value& value, ColumnStat* stat) {
switch (stat->statType_) {
case cpp2::StatType::SUM: {
stat->sum_ = stat->sum_ + value;
break;
}
case cpp2::StatType::COUNT: {
stat->count_ = stat->count_ + 1;
break;
}
case cpp2::StatType::MAX: {
stat->max_ = value > stat->max_ ? value : stat->max_;
break;
}
case cpp2::StatType::MIN: {
stat->min_ = value < stat->min_ ? value : stat->min_;
break;
}
default:
LOG(ERROR) << "get invalid stat type";
return;
}
}

Row IndexAggregateNode::project(Row&& row) {
Row ret;
ret.reserve(returnColumnsCount_);
for (size_t i = 0; i < returnColumnsCount_; i++) {
ret.emplace_back(std::move(row[i]));
}
return ret;
}

Row IndexAggregateNode::calculateStats() {
Row result;
result.values.reserve(stats_.size());
for (const auto& stat : stats_) {
if (stat.statType_ == cpp2::StatType::SUM) {
result.values.emplace_back(stat.sum_);
} else if (stat.statType_ == cpp2::StatType::COUNT) {
result.values.emplace_back(stat.count_);
} else if (stat.statType_ == cpp2::StatType::MAX) {
result.values.emplace_back(stat.max_);
} else if (stat.statType_ == cpp2::StatType::MIN) {
result.values.emplace_back(stat.min_);
}
}
return result;
}

IndexNode::Result IndexAggregateNode::doNext() {
DCHECK_EQ(children_.size(), 1);
auto& child = *children_[0];
Result result = child.next();
const auto& row = result.row();
if (result.hasData()) {
for (size_t i = 0; i < statInfos_.size(); i++) {
const auto& columnName = statInfos_[i].first;
addStatValue(row[retColMap_[columnName]], &stats_[i]);
}
result = Result(project(std::move(result).row()));
}
return result;
}

std::unique_ptr<IndexNode> IndexAggregateNode::copy() {
return std::make_unique<IndexAggregateNode>(*this);
}

std::string IndexAggregateNode::identify() {
return "";
}

} // namespace storage

} // namespace nebula
49 changes: 49 additions & 0 deletions src/storage/exec/IndexAggregateNode.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/* Copyright (c) 2021 vesoft inc. All rights reserved.
*
* This source code is licensed under Apache 2.0 License.
*/
#pragma once
#include "storage/exec/IndexNode.h"

namespace nebula {
namespace storage {

// used to save stat value for each column
struct ColumnStat {
ColumnStat() = default;

explicit ColumnStat(const cpp2::StatType& statType) : statType_(statType) {}

cpp2::StatType statType_;
mutable Value sum_ = 0L;
mutable Value count_ = 0L;
mutable Value min_ = std::numeric_limits<int64_t>::max();
mutable Value max_ = std::numeric_limits<int64_t>::min();
};

class IndexAggregateNode : public IndexNode {
public:
IndexAggregateNode(const IndexAggregateNode& node);
explicit IndexAggregateNode(RuntimeContext* context,
const std::vector<std::pair<std::string, cpp2::StatType>>& statInfos,
size_t returnColumnsCount);

nebula::cpp2::ErrorCode init(InitContext& ctx) override;
void initStatValue();
void addStatValue(const Value& value, ColumnStat* stat);
Row project(Row&& row);
Row calculateStats();

std::unique_ptr<IndexNode> copy() override;
std::string identify() override;

private:
Result doNext() override;
std::vector<std::pair<std::string, cpp2::StatType>> statInfos_;
std::vector<ColumnStat> stats_;
Map<std::string, size_t> retColMap_;
size_t returnColumnsCount_;
};

} // namespace storage
} // namespace nebula
89 changes: 89 additions & 0 deletions src/storage/exec/IndexExprContext.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
/* Copyright (c) 2021 vesoft inc. All rights reserved.
*
* This source code is licensed under Apache 2.0 License.
*/
#pragma once

#include "common/expression/Expression.h"
#include "storage/exec/IndexNode.h"

namespace nebula {
namespace storage {

class IndexExprContext : public ExpressionContext {
public:
explicit IndexExprContext(const Map<std::string, size_t> &colPos) : colPos_(colPos) {}
void setRow(const Row &row) {
row_ = &row;
}

Value getEdgeProp(const std::string &edgeType, const std::string &prop) const override {
UNUSED(edgeType);
DCHECK(row_ != nullptr);
auto iter = colPos_.find(prop);
DCHECK(iter != colPos_.end());
DCHECK(iter->second < row_->size());
return (*row_)[iter->second];
}

Value getTagProp(const std::string &tag, const std::string &prop) const override {
UNUSED(tag);
DCHECK(row_ != nullptr);
auto iter = colPos_.find(prop);
DCHECK(iter != colPos_.end());
DCHECK(iter->second < row_->size());
return (*row_)[iter->second];
}

// override
const Value &getVar(const std::string &var) const override {
UNUSED(var);
return fatal(__FILE__, __LINE__);
}
const Value &getVersionedVar(const std::string &var, int64_t version) const override {
UNUSED(var), UNUSED(version);
return fatal(__FILE__, __LINE__);
}
const Value &getVarProp(const std::string &var, const std::string &prop) const override {
UNUSED(var), UNUSED(prop);
return fatal(__FILE__, __LINE__);
}
Value getSrcProp(const std::string &tag, const std::string &prop) const override {
UNUSED(tag), UNUSED(prop);
return fatal(__FILE__, __LINE__);
}
const Value &getDstProp(const std::string &tag, const std::string &prop) const override {
UNUSED(tag), UNUSED(prop);
return fatal(__FILE__, __LINE__);
}
const Value &getInputProp(const std::string &prop) const override {
UNUSED(prop);
return fatal(__FILE__, __LINE__);
}
Value getVertex(const std::string &) const override {
return fatal(__FILE__, __LINE__);
}
Value getEdge() const override {
return fatal(__FILE__, __LINE__);
}
Value getColumn(int32_t index) const override {
UNUSED(index);
return fatal(__FILE__, __LINE__);
}
void setVar(const std::string &var, Value val) override {
UNUSED(var), UNUSED(val);
fatal(__FILE__, __LINE__);
}

private:
const Map<std::string, size_t> &colPos_;
const Row *row_;
inline const Value &fatal(const std::string &file, int line) const {
LOG(FATAL) << "Unexpect at " << file << ":" << line;
static Value placeholder;
return placeholder;
}
};

} // namespace storage
} // namespace nebula
3 changes: 3 additions & 0 deletions src/storage/exec/IndexNode.h
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,9 @@ struct InitContext {
std::vector<std::string> returnColumns;
// The index of name in `returncolumns`
Map<std::string, size_t> retColMap;
// The columns in statColumns
// TODO(nivras) need refactor this, put statColumns in returnColumns
Set<std::string> statColumns;
};

class IndexNode {
Expand Down
6 changes: 6 additions & 0 deletions src/storage/exec/IndexProjectionNode.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,12 @@ nebula::cpp2::ErrorCode IndexProjectionNode::init(InitContext& ctx) {
for (auto& col : requiredColumns_) {
ctx.requiredColumns.insert(col);
}
for (auto& col : ctx.statColumns) {
if (ctx.requiredColumns.find(col) == ctx.requiredColumns.end()) {
ctx.requiredColumns.insert(col);
requiredColumns_.push_back(col);
}
}
auto ret = children_[0]->init(ctx);
if (UNLIKELY(ret != ::nebula::cpp2::ErrorCode::SUCCEEDED)) {
return ret;
Expand Down
Loading

0 comments on commit 8cab6d0

Please sign in to comment.