Skip to content

Commit

Permalink
support partitioning with expressions
Browse files Browse the repository at this point in the history
  • Loading branch information
lgbo-ustc committed Apr 3, 2023
1 parent f57ffab commit a4a71b4
Show file tree
Hide file tree
Showing 7 changed files with 132 additions and 90 deletions.
58 changes: 50 additions & 8 deletions utils/local-engine/Shuffle/NativeSplitter.cpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#include "NativeSplitter.h"
#include <cstddef>
#include <functional>
#include <memory>
#include <Functions/FunctionFactory.h>
Expand Down Expand Up @@ -27,19 +28,41 @@ void NativeSplitter::split(DB::Block & block)
{
return;
}
auto column_num = block.columns();
if (!output_header.columns()) [[unlikely]]
{
if (output_columns_indicies.empty())
{
output_header = block.cloneEmpty();
for (size_t i = 0; i < block.columns(); ++i)
{
output_columns_indicies.push_back(i);
}
}
else
{
DB::ColumnsWithTypeAndName cols;
for (const auto & index : output_columns_indicies)
{
cols.push_back(block.getByPosition(index));
}
output_header = DB::Block(cols);
}
}
computePartitionId(block);
const auto & columns = block.getColumns();
for (size_t i = 0; i < column_num; i++)
DB::Block out_block;
for (size_t col = 0; col < output_header.columns(); ++col)
{
out_block.insert(block.getByPosition(output_columns_indicies[col]));
}
for (size_t col = 0; col < output_header.columns(); ++col)
{
auto materialized_column = columns[i]->convertToFullColumnIfConst();
for (size_t j = 0; j < partition_info.partition_num; ++j)
{
size_t from = partition_info.partition_start_points[j];
size_t length = partition_info.partition_start_points[j + 1] - from;
if (length == 0)
continue; // no data for this partition continue;
partition_buffer[j]->appendSelective(i, block, partition_info.partition_selector, from, length);
partition_buffer[j]->appendSelective(col, out_block, partition_info.partition_selector, from, length);
}
}

Expand Down Expand Up @@ -161,10 +184,19 @@ HashNativeSplitter::HashNativeSplitter(NativeSplitter::Options options_, jobject
: NativeSplitter(options_, input)
{
Poco::StringTokenizer exprs_list(options_.exprs_buffer, ",");
std::vector<std::string> hash_fields;
hash_fields.insert(hash_fields.end(), exprs_list.begin(), exprs_list.end());
std::vector<size_t> hash_fields;
for (auto iter = exprs_list.begin(); iter != exprs_list.end(); ++iter)
{
hash_fields.push_back(std::stoi(*iter));
}

selector_builder = std::make_unique<HashSelectorBuilder>(options.partition_nums, hash_fields, std::vector<std::size_t>(), "cityHash64");
Poco::StringTokenizer output_column_tokenizer(options_.schema_buffer, ",");
for (auto iter = output_column_tokenizer.begin(); iter != output_column_tokenizer.end(); ++iter)
{
output_columns_indicies.push_back(std::stoi(*iter));
}

selector_builder = std::make_unique<HashSelectorBuilder>(options.partition_nums, hash_fields, "cityHash64");
}

void HashNativeSplitter::computePartitionId(Block & block)
Expand All @@ -174,6 +206,11 @@ void HashNativeSplitter::computePartitionId(Block & block)

/// Constructs a round-robin native splitter.
///
/// `options_.schema_buffer` is tokenized on ',' and every token is parsed with std::stoi;
/// the resulting integers are appended, in order, to `output_columns_indicies`.
/// `options_.partition_nums` is forwarded to the RoundRobinSelectorBuilder.
RoundRobinNativeSplitter::RoundRobinNativeSplitter(NativeSplitter::Options options_, jobject input) : NativeSplitter(options_, input)
{
    Poco::StringTokenizer schema_tokens(options_.schema_buffer, ",");
    for (const auto & token : schema_tokens)
        output_columns_indicies.push_back(std::stoi(token));

    selector_builder = std::make_unique<RoundRobinSelectorBuilder>(options_.partition_nums);
}

Expand All @@ -185,6 +222,11 @@ void RoundRobinNativeSplitter::computePartitionId(Block & block)
/// Constructs a range-partition native splitter.
///
/// `options_.schema_buffer` is tokenized on ',' and every token is parsed with std::stoi;
/// the resulting integers are appended, in order, to `output_columns_indicies`.
/// `options_.exprs_buffer` and `options_.partition_nums` are forwarded to the RangeSelectorBuilder.
RangePartitionNativeSplitter::RangePartitionNativeSplitter(NativeSplitter::Options options_, jobject input)
    : NativeSplitter(options_, input)
{
    Poco::StringTokenizer schema_tokens(options_.schema_buffer, ",");
    for (const auto & token : schema_tokens)
        output_columns_indicies.push_back(std::stoi(token));

    selector_builder = std::make_unique<RangeSelectorBuilder>(options_.exprs_buffer, options_.partition_nums);
}

Expand Down
4 changes: 3 additions & 1 deletion utils/local-engine/Shuffle/NativeSplitter.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ class NativeSplitter : BlockIterator
size_t buffer_size = DEFAULT_BLOCK_SIZE;
size_t partition_nums;
std::string exprs_buffer;
std::string schema_buffer;
};

struct Holder
Expand All @@ -48,7 +49,8 @@ class NativeSplitter : BlockIterator
virtual void computePartitionId(DB::Block &) { }
Options options;
PartitionInfo partition_info;

std::vector<size_t> output_columns_indicies;
DB::Block output_header;

private:
void split(DB::Block & block);
Expand Down
51 changes: 6 additions & 45 deletions utils/local-engine/Shuffle/SelectorBuilder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ namespace ErrorCodes
extern const int LOGICAL_ERROR;
}
}

namespace local_engine
{

Expand Down Expand Up @@ -59,29 +60,19 @@ PartitionInfo RoundRobinSelectorBuilder::build(DB::Block & block)

HashSelectorBuilder::HashSelectorBuilder(
UInt32 parts_num_,
const std::vector<std::string> & exprs_,
const std::vector<std::size_t> & exprs_index_,
const std::vector<size_t> & exprs_index_,
const std::string & hash_function_name_)
: parts_num(parts_num_), exprs(exprs_), exprs_index(exprs_index_), hash_function_name(hash_function_name_)
: parts_num(parts_num_), exprs_index(exprs_index_), hash_function_name(hash_function_name_)
{
}

PartitionInfo HashSelectorBuilder::build(DB::Block & block)
{
ColumnsWithTypeAndName args;
auto rows = block.rows();
for (size_t i = 0; i < exprs.size(); i++)
for (size_t i = 0; i < exprs_index.size(); i++)
{
auto & name = exprs.at(i);
auto * type_and_name = block.findByName(name);
if (type_and_name == nullptr)
{
args.emplace_back(block.getByPosition(exprs_index.at(i)));
}
else
{
args.emplace_back(block.getByName(name));
}
args.emplace_back(block.getByPosition(exprs_index.at(i)));
}

if (!hash_function) [[unlikely]]
Expand Down Expand Up @@ -123,17 +114,6 @@ RangeSelectorBuilder::RangeSelectorBuilder(const std::string & option, const siz
{
Poco::JSON::Parser parser;
auto info = parser.parse(option).extract<Poco::JSON::Object::Ptr>();
if (info->has("projection_plan"))
{
// for convenient, we use a serialzied protobuf to store the projeciton plan
String encoded_str = info->get("projection_plan").convert<std::string>();
Poco::MemoryInputStream istr(encoded_str.data(), encoded_str.size());
Poco::Base64Decoder decoder(istr);
String decoded_str;
Poco::StreamCopier::copyToString(decoder, decoded_str);
projection_plan_pb = std::make_unique<substrait::Plan>();
projection_plan_pb->ParseFromString(decoded_str);
}
auto ordering_infos = info->get("ordering").extract<Poco::JSON::Array::Ptr>();
initSortInformation(ordering_infos);
initRangeBlock(info->get("range_bounds").extract<Poco::JSON::Array::Ptr>());
Expand All @@ -143,26 +123,7 @@ RangeSelectorBuilder::RangeSelectorBuilder(const std::string & option, const siz
PartitionInfo RangeSelectorBuilder::build(DB::Block & block)
{
DB::IColumn::Selector result;
if (projection_plan_pb)
{
if (!has_init_actions_dag) [[unlikely]]
initActionsDAG(block);
DB::Block copied_block = block;
projection_expression_actions->execute(copied_block, block.rows());

// need to append the order keys columns to the original block
DB::ColumnsWithTypeAndName columns = block.getColumnsWithTypeAndName();
for (const auto & projected_col : copied_block.getColumnsWithTypeAndName())
{
columns.push_back(projected_col);
}
DB::Block projected_block(columns);
computePartitionIdByBinarySearch(projected_block, result);
}
else
{
computePartitionIdByBinarySearch(block, result);
}
computePartitionIdByBinarySearch(block, result);
return PartitionInfo::fromSelector(std::move(result), partition_num);
}

Expand Down
6 changes: 2 additions & 4 deletions utils/local-engine/Shuffle/SelectorBuilder.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,14 +38,12 @@ class HashSelectorBuilder
public:
explicit HashSelectorBuilder(
UInt32 parts_num_,
const std::vector<std::string> & exprs_,
const std::vector<std::size_t> & exprs_index_,
const std::vector<size_t> & exprs_index_,
const std::string & hash_function_name_);
PartitionInfo build(DB::Block & block);
private:
UInt32 parts_num;
std::vector<std::string> exprs;
std::vector<std::size_t> exprs_index;
std::vector<size_t> exprs_index;
std::string hash_function_name;
DB::FunctionBasePtr hash_function;
};
Expand Down
69 changes: 51 additions & 18 deletions utils/local-engine/Shuffle/ShuffleSplitter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,16 +50,40 @@ SplitResult ShuffleSplitter::stop()
}
void ShuffleSplitter::splitBlockByPartition(DB::Block & block)
{
auto column_num = block.columns();
for (size_t i = 0; i < column_num; i++)
if (!output_header.columns()) [[unlikely]]
{
if (output_columns_indicies.empty())
{
output_header = block.cloneEmpty();
for (size_t i = 0; i < block.columns(); ++i)
{
output_columns_indicies.push_back(i);
}
}
else
{
DB::ColumnsWithTypeAndName cols;
for (const auto & index : output_columns_indicies)
{
cols.push_back(block.getByPosition(index));
}
output_header = DB::Block(cols);
}
}
DB::Block out_block;
for (size_t col = 0; col < output_header.columns(); ++col)
{
out_block.insert(block.getByPosition(output_columns_indicies[col]));
}
for (size_t col = 0; col < output_header.columns(); ++col)
{
for (size_t j = 0; j < partition_info.partition_num; ++j)
{
size_t from = partition_info.partition_start_points[j];
size_t length = partition_info.partition_start_points[j + 1] - from;
if (length == 0)
continue; // no data for this partition continue;
partition_buffer[j].appendSelective(i, block, partition_info.partition_selector, from, length);
partition_buffer[j].appendSelective(col, out_block, partition_info.partition_selector, from, length);
}
}

Expand Down Expand Up @@ -213,7 +237,7 @@ void ColumnsBuffer::add(DB::Block & block, int start, int end)
{
if (header.columns() == 0)
header = block.cloneEmpty();
if (accumulated_columns.empty())
if (accumulated_columns.empty()) [[unlikely]]
{
accumulated_columns.reserve(block.columns());
for (size_t i = 0; i < block.columns(); i++)
Expand All @@ -232,7 +256,7 @@ void ColumnsBuffer::appendSelective(size_t column_idx, const DB::Block & source,
{
if (header.columns() == 0)
header = source.cloneEmpty();
if (accumulated_columns.empty())
if (accumulated_columns.empty()) [[unlikely]]
{
accumulated_columns.reserve(source.columns());
for (size_t i = 0; i < source.columns(); i++)
Expand Down Expand Up @@ -276,7 +300,12 @@ ColumnsBuffer::ColumnsBuffer(size_t prefer_buffer_size_) : prefer_buffer_size(pr

/// Constructs a round-robin shuffle splitter.
///
/// The comma-separated `out_exprs` option is tokenized and every token is parsed with
/// std::stoi; the resulting integers are appended, in order, to `output_columns_indicies`.
/// The selector builder is created once from `options.partition_nums`.
RoundRobinSplitter::RoundRobinSplitter(SplitOptions options_) : ShuffleSplitter(std::move(options_))
{
    // `options_` was passed through std::move to the base class, so it may be moved-from here.
    // Read the settings from the `options` member instead, exactly as the partition_nums
    // lookup below does.
    // NOTE(review): assumes the base constructor stores the options in `options` — confirm.
    Poco::StringTokenizer output_column_tokenizer(options.out_exprs, ",");
    for (const auto & token : output_column_tokenizer)
        output_columns_indicies.push_back(std::stoi(token));

    // Built exactly once; a second identical construction would only allocate and discard.
    selector_builder = std::make_unique<RoundRobinSelectorBuilder>(options.partition_nums);
}

void RoundRobinSplitter::computeAndCountPartitionId(DB::Block & block)
Expand All @@ -294,21 +323,20 @@ std::unique_ptr<ShuffleSplitter> RoundRobinSplitter::create(SplitOptions && opti

HashSplitter::HashSplitter(SplitOptions options_) : ShuffleSplitter(std::move(options_))
{
Poco::StringTokenizer exprs_list(options_.exprs, ",");
std::vector<std::string> hash_fields;
hash_fields.insert(hash_fields.end(), exprs_list.begin(), exprs_list.end());
Poco::StringTokenizer exprs_list(options_.hash_exprs, ",");
std::vector<size_t> hash_fields;
for (auto iter = exprs_list.begin(); iter != exprs_list.end(); ++iter)
{
hash_fields.push_back(std::stoi(*iter));
}

std::vector<std::size_t> hash_fields_index;
if (!options_.exprs_index.empty())
Poco::StringTokenizer output_column_tokenizer(options_.out_exprs, ",");
for (auto iter = output_column_tokenizer.begin(); iter != output_column_tokenizer.end(); ++iter)
{
Poco::StringTokenizer exprs_list_index(options_.exprs_index, ",");
for (const auto & expr_index : exprs_list_index)
{
hash_fields_index.insert(hash_fields_index.end(), static_cast<size_t>(stoi(expr_index)));
}
output_columns_indicies.push_back(std::stoi(*iter));
}

selector_builder = std::make_unique<HashSelectorBuilder>(options.partition_nums, hash_fields, hash_fields_index, "cityHash64");
selector_builder = std::make_unique<HashSelectorBuilder>(options.partition_nums, hash_fields, "cityHash64");
}
std::unique_ptr<ShuffleSplitter> HashSplitter::create(SplitOptions && options_)
{
Expand All @@ -330,7 +358,12 @@ std::unique_ptr<ShuffleSplitter> RangeSplitter::create(SplitOptions && options_)

RangeSplitter::RangeSplitter(SplitOptions options_) : ShuffleSplitter(std::move(options_))
{
selector_builder = std::make_unique<RangeSelectorBuilder>(options.exprs, options.partition_nums);
Poco::StringTokenizer output_column_tokenizer(options_.out_exprs, ",");
for (auto iter = output_column_tokenizer.begin(); iter != output_column_tokenizer.end(); ++iter)
{
output_columns_indicies.push_back(std::stoi(*iter));
}
selector_builder = std::make_unique<RangeSelectorBuilder>(options.hash_exprs, options.partition_nums);
}
void RangeSplitter::computeAndCountPartitionId(DB::Block & block)
{
Expand Down
6 changes: 4 additions & 2 deletions utils/local-engine/Shuffle/ShuffleSplitter.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ struct SplitOptions
int shuffle_id;
int map_id;
size_t partition_nums;
std::string exprs;
std::string exprs_index;
std::string hash_exprs;
std::string out_exprs;
// std::vector<std::string> exprs;
std::string compress_method = "zstd";
int compress_level;
Expand Down Expand Up @@ -88,6 +88,8 @@ class ShuffleSplitter
std::vector<std::unique_ptr<DB::NativeWriter>> partition_outputs;
std::vector<std::unique_ptr<DB::WriteBuffer>> partition_write_buffers;
std::vector<std::unique_ptr<DB::WriteBuffer>> partition_cached_write_buffers;
std::vector<size_t> output_columns_indicies;
DB::Block output_header;
SplitOptions options;
SplitResult split_result;
};
Expand Down
Loading

0 comments on commit a4a71b4

Please sign in to comment.