Add column stats unit test for different data types (facebookincubator#6549)

Summary: Pull Request resolved: facebookincubator#6549

Reviewed By: xiaoxmeng

Differential Revision: D49308980

Pulled By: kewang1024

fbshipit-source-id: 376c92eed592389625d800673dcb36cf93fa3444
kewang1024 authored and codyschierbeck committed Sep 27, 2023
1 parent 43cefe5 commit 09ed962
Showing 1 changed file with 195 additions and 0 deletions.
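The new test can be run on its own with a gtest filter; the binary name below assumes the default CMake target that builds velox/exec/tests/TableWriteTest.cpp and may differ per build setup:

./velox/exec/tests/velox_exec_test --gtest_filter='*columnStatsDataTypes*'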
velox/exec/tests/TableWriteTest.cpp (195 additions, 0 deletions)
@@ -16,6 +16,7 @@
#include "folly/dynamic.h"
#include "velox/common/base/Fs.h"
#include "velox/common/base/tests/GTestUtils.h"
#include "velox/common/hyperloglog/SparseHll.h"
#include "velox/common/testutil/TestValue.h"
#include "velox/connectors/hive/HiveConfig.h"
#include "velox/connectors/hive/HivePartitionFunction.h"
@@ -39,6 +40,7 @@ using namespace facebook::velox::connector;
using namespace facebook::velox::connector::hive;
using namespace facebook::velox::dwio::common;
using namespace facebook::velox::common::testutil;
using namespace facebook::velox::common::hll;

enum class TestMode {
kUnpartitioned,
@@ -2265,6 +2267,199 @@ TEST_P(AllTableWriterTest, tableWriteOutputCheck) {
ASSERT_EQ(obj[TableWriteTraits::kLifeSpanContextKey], "TaskWide");
}

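// Verifies that the table writer collects column statistics across a mix of
// data types, pairing each column with the stats aggregates it supports:
// min/max, approx_distinct, count/count_if, and the data-size aggregates.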
TEST_P(AllTableWriterTest, columnStatsDataTypes) {
auto rowType =
ROW({"c0", "c1", "c2", "c3", "c4", "c5", "c6", "c7", "c8"},
{BIGINT(),
INTEGER(),
SMALLINT(),
REAL(),
DOUBLE(),
VARCHAR(),
BOOLEAN(),
MAP(DATE(), BIGINT()),
ARRAY(BIGINT())});
setDataTypes(rowType);
std::vector<RowVectorPtr> input;
input.push_back(makeRowVector(
rowType_->names(),
{
makeFlatVector<int64_t>(1'000, [&](auto row) { return 1; }),
makeFlatVector<int32_t>(1'000, [&](auto row) { return 1; }),
makeFlatVector<int16_t>(1'000, [&](auto row) { return row; }),
makeFlatVector<float>(1'000, [&](auto row) { return row + 33.23; }),
makeFlatVector<double>(1'000, [&](auto row) { return row + 33.23; }),
makeFlatVector<StringView>(
1'000,
[&](auto row) {
return StringView(std::to_string(row).c_str());
}),
makeFlatVector<bool>(1'000, [&](auto row) { return true; }),
makeMapVector<int32_t, int64_t>(
1'000,
[](auto /*row*/) { return 5; },
[](auto row) { return row; },
[](auto row) { return row * 3; }),
makeArrayVector<int64_t>(
1'000,
[](auto /*row*/) { return 5; },
[](auto row) { return row * 3; }),
}));
createDuckDbTable(input);
auto outputDirectory = TempDirectoryPath::create();

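// Partition columns, if any, become grouping keys for the stats aggregation;
// for the unpartitioned test mode this list stays empty.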
std::vector<FieldAccessTypedExprPtr> groupingKeyFields;
for (int i = 0; i < partitionedBy_.size(); ++i) {
groupingKeyFields.emplace_back(std::make_shared<core::FieldAccessTypedExpr>(
partitionTypes_.at(i), partitionedBy_.at(i)));
}

// Build the partial aggregation that computes the per-column statistics.
core::TypedExprPtr intInputField =
std::make_shared<const core::FieldAccessTypedExpr>(SMALLINT(), "c2");
auto minCallExpr = std::make_shared<const core::CallTypedExpr>(
SMALLINT(), std::vector<core::TypedExprPtr>{intInputField}, "min");
auto maxCallExpr = std::make_shared<const core::CallTypedExpr>(
SMALLINT(), std::vector<core::TypedExprPtr>{intInputField}, "max");
auto distinctCountCallExpr = std::make_shared<const core::CallTypedExpr>(
VARCHAR(),
std::vector<core::TypedExprPtr>{intInputField},
"approx_distinct");

core::TypedExprPtr strInputField =
std::make_shared<const core::FieldAccessTypedExpr>(VARCHAR(), "c5");
auto maxDataSizeCallExpr = std::make_shared<const core::CallTypedExpr>(
BIGINT(),
std::vector<core::TypedExprPtr>{strInputField},
"max_data_size_for_stats");
auto sumDataSizeCallExpr = std::make_shared<const core::CallTypedExpr>(
BIGINT(),
std::vector<core::TypedExprPtr>{strInputField},
"sum_data_size_for_stats");

core::TypedExprPtr boolInputField =
std::make_shared<const core::FieldAccessTypedExpr>(BOOLEAN(), "c6");
auto countCallExpr = std::make_shared<const core::CallTypedExpr>(
BIGINT(), std::vector<core::TypedExprPtr>{boolInputField}, "count");
auto countIfCallExpr = std::make_shared<const core::CallTypedExpr>(
BIGINT(), std::vector<core::TypedExprPtr>{boolInputField}, "count_if");

core::TypedExprPtr mapInputField =
std::make_shared<const core::FieldAccessTypedExpr>(
MAP(DATE(), BIGINT()), "c7");
auto countMapCallExpr = std::make_shared<const core::CallTypedExpr>(
BIGINT(), std::vector<core::TypedExprPtr>{mapInputField}, "count");
auto sumDataSizeMapCallExpr = std::make_shared<const core::CallTypedExpr>(
BIGINT(),
std::vector<core::TypedExprPtr>{mapInputField},
"sum_data_size_for_stats");

core::TypedExprPtr arrayInputField =
std::make_shared<const core::FieldAccessTypedExpr>(
ARRAY(BIGINT()), "c8");
auto countArrayCallExpr = std::make_shared<const core::CallTypedExpr>(
BIGINT(), std::vector<core::TypedExprPtr>{arrayInputField}, "count");
auto sumDataSizeArrayCallExpr = std::make_shared<const core::CallTypedExpr>(
BIGINT(),
std::vector<core::TypedExprPtr>{arrayInputField},
"sum_data_size_for_stats");

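// The names below must stay index-aligned with the aggregates vector that
// follows; each pair produces one column-stats channel in the writer output.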
const std::vector<std::string> aggregateNames = {
"min",
"max",
"approx_distinct",
"max_data_size_for_stats",
"sum_data_size_for_stats",
"count",
"count_if",
"count",
"sum_data_size_for_stats",
"count",
"sum_data_size_for_stats",
};
std::vector<core::AggregationNode::Aggregate> aggregates = {
core::AggregationNode::Aggregate{minCallExpr, nullptr, {}, {}},
core::AggregationNode::Aggregate{maxCallExpr, nullptr, {}, {}},
core::AggregationNode::Aggregate{distinctCountCallExpr, nullptr, {}, {}},
core::AggregationNode::Aggregate{maxDataSizeCallExpr, nullptr, {}, {}},
core::AggregationNode::Aggregate{sumDataSizeCallExpr, nullptr, {}, {}},
core::AggregationNode::Aggregate{countCallExpr, nullptr, {}, {}},
core::AggregationNode::Aggregate{countIfCallExpr, nullptr, {}, {}},
core::AggregationNode::Aggregate{countMapCallExpr, nullptr, {}, {}},
core::AggregationNode::Aggregate{sumDataSizeMapCallExpr, nullptr, {}, {}},
core::AggregationNode::Aggregate{countArrayCallExpr, nullptr, {}, {}},
core::AggregationNode::Aggregate{
sumDataSizeArrayCallExpr, nullptr, {}, {}},
};
const auto aggregationNode = std::make_shared<core::AggregationNode>(
core::PlanNodeId(),
core::AggregationNode::Step::kPartial,
groupingKeyFields,
std::vector<core::FieldAccessTypedExprPtr>{},
aggregateNames,
aggregates,
false, // ignoreNullKeys
PlanBuilder().values({input}).planNode());

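// The table writer evaluates this partial aggregation over the rows it
// writes and appends the resulting stats columns to its output.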
auto plan = PlanBuilder()
.values({input})
.tableWrite(
rowType_,
rowType_->names(),
aggregationNode,
std::make_shared<core::InsertTableHandle>(
kHiveConnectorId,
makeHiveInsertTableHandle(
rowType_->names(),
rowType_->children(),
partitionedBy_,
nullptr,
makeLocationHandle(outputDirectory->path))),
false,
CommitStrategy::kNoCommit)
.planNode();

// The result columns are laid out as: rows / fragments / context /
// [partition] / [stats].
int nextColumnStatsIndex = 3 + partitionedBy_.size();
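// With no partition columns the stats start at channel 3 (after rows,
// fragments, and context); each partition key shifts them right by one.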
const RowVectorPtr result = AssertQueryBuilder(plan).copyResults(pool());
const auto minStatsVector =
result->childAt(nextColumnStatsIndex++)->asFlatVector<int16_t>();
ASSERT_EQ(minStatsVector->valueAt(0), 0);
const auto maxStatsVector =
result->childAt(nextColumnStatsIndex++)->asFlatVector<int16_t>();
ASSERT_EQ(maxStatsVector->valueAt(0), 999);
const auto distinctCountStatsVector =
result->childAt(nextColumnStatsIndex++)->asFlatVector<StringView>();
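// approx_distinct's partial result is a serialized HLL digest; rebuild a
// DenseHll from those bytes to read back the cardinality estimate.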
HashStringAllocator allocator{pool_.get()};
DenseHll denseHll{
std::string(distinctCountStatsVector->valueAt(0)).c_str(), &allocator};
ASSERT_EQ(denseHll.cardinality(), 1000);
const auto maxDataSizeStatsVector =
result->childAt(nextColumnStatsIndex++)->asFlatVector<int64_t>();
ASSERT_EQ(maxDataSizeStatsVector->valueAt(0), 7);
const auto sumDataSizeStatsVector =
result->childAt(nextColumnStatsIndex++)->asFlatVector<int64_t>();
ASSERT_EQ(sumDataSizeStatsVector->valueAt(0), 6890);
const auto countStatsVector =
result->childAt(nextColumnStatsIndex++)->asFlatVector<int64_t>();
ASSERT_EQ(countStatsVector->valueAt(0), 1000);
const auto countIfStatsVector =
result->childAt(nextColumnStatsIndex++)->asFlatVector<int64_t>();
ASSERT_EQ(countIfStatsVector->valueAt(0), 1000);
const auto countMapStatsVector =
result->childAt(nextColumnStatsIndex++)->asFlatVector<int64_t>();
ASSERT_EQ(countMapStatsVector->valueAt(0), 1000);
const auto sumDataSizeMapStatsVector =
result->childAt(nextColumnStatsIndex++)->asFlatVector<int64_t>();
ASSERT_EQ(sumDataSizeMapStatsVector->valueAt(0), 64000);
const auto countArrayStatsVector =
result->childAt(nextColumnStatsIndex++)->asFlatVector<int64_t>();
ASSERT_EQ(countArrayStatsVector->valueAt(0), 1000);
const auto sumDataSizeArrayStatsVector =
result->childAt(nextColumnStatsIndex++)->asFlatVector<int64_t>();
// 1,000 arrays of 5 bigints each: (4-byte length + 5 * 8 bytes) per row.
ASSERT_EQ(sumDataSizeArrayStatsVector->valueAt(0), 44000);
}

TEST_P(AllTableWriterTest, columnStats) {
auto input = makeVectors(1, 100);
createDuckDbTable(input);
