Skip to content

Commit

Permalink
This is a combination of 2 commits.
Browse files Browse the repository at this point in the history
This is the 1st commit message:

Support Grouping Sets, Rollup and Cube to extend group by statement (apache#2039)

[ADD] compatible with hive grouping clause (apache#2039)
[ADD] support grouping functions in expr like grouping(a) + grouping(b) (apache#2039)

[FIX] fix analyzer error in window function(apache#2039)

This is the commit message #2:

Support Grouping Sets, Rollup and Cube to extend group by statement (apache#2039)

[ADD] compatible with hive grouping clause (apache#2039)
[ADD] support grouping functions in expr like grouping(a) + grouping(b) (apache#2039)

[FIX] fix analyzer error in window function(apache#2039)

clean some code style

clean some code style

[FIX] fix grouping functions not substitute in subquery(apache#2039)

[ADD] add design docs

[FIX] fix expr in groupby clause remove duplicate error[apache#2039]

[FIX] fix code style problems

add some comments

[ADD] add fe unit test

[FIX] fix according to comments and add syntax check in where clause

remove unused code

fix by comments
  • Loading branch information
yangzhg committed Dec 16, 2019
1 parent e65a645 commit 64446af
Show file tree
Hide file tree
Showing 53 changed files with 3,870 additions and 414 deletions.
2 changes: 2 additions & 0 deletions be/src/common/daemon.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
#include "exprs/utility_functions.h"
#include "exprs/json_functions.h"
#include "exprs/hll_hash_function.h"
#include "exprs/grouping_sets_functions.h"
#include "exprs/timezone_db.h"
#include "exprs/bitmap_function.h"
#include "exprs/hll_function.h"
Expand Down Expand Up @@ -272,6 +273,7 @@ void init_daemon(int argc, char** argv, const std::vector<StorePath>& paths) {
HllHashFunctions::init();
ESFunctions::init();
GeoFunctions::init();
GroupingSetsFunctions::init();
TimezoneDatabase::init();
BitmapFunctions::init();
HllFunctions::init();
Expand Down
1 change: 1 addition & 0 deletions be/src/exec/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ set(EXEC_FILES
spill_sort_node.cc
union_node.cpp
union_node_ir.cpp
repeat_node.cpp
schema_scanner.cpp
schema_scan_node.cpp
schema_scanner/schema_tables_scanner.cpp
Expand Down
7 changes: 6 additions & 1 deletion be/src/exec/exec_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
#include "exec/analytic_eval_node.h"
#include "exec/select_node.h"
#include "exec/union_node.h"
#include "exec/repeat_node.h"
#include "exec/assert_num_rows_node.h"
#include "runtime/exec_env.h"
#include "runtime/descriptors.h"
Expand Down Expand Up @@ -452,10 +453,14 @@ Status ExecNode::create_node(RuntimeState* state, ObjectPool* pool, const TPlanN
*node = pool->add(new BrokerScanNode(pool, tnode, descs));
return Status::OK();

case TPlanNodeType::REPEAT_NODE:
*node = pool->add(new RepeatNode(pool, tnode, descs));
return Status::OK();

case TPlanNodeType::ASSERT_NUM_ROWS_NODE:
*node = pool->add(new AssertNumRowsNode(pool, tnode, descs));
return Status::OK();

default:
map<int, const char*>::const_iterator i =
_TPlanNodeType_VALUES_TO_NAMES.find(tnode.node_type);
Expand Down
224 changes: 224 additions & 0 deletions be/src/exec/repeat_node.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,224 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include "exec/repeat_node.h"

#include "exprs/expr.h"
#include "runtime/raw_value.h"
#include "runtime/row_batch.h"
#include "runtime/runtime_state.h"
#include "util/runtime_profile.h"
#include "gutil/strings/join.h"

namespace doris {

// Builds a RepeatNode from its thrift plan description.
//
// The repeat configuration (slot id sets, repeat ids, grouping values and the
// output tuple id) is copied out of tnode.repeat_node. All runtime-only state
// starts out empty: no tuple descriptor, no buffered child batch, child not at
// end-of-stream, and the repeat cursor at position 0.
RepeatNode::RepeatNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs)
        : ExecNode(pool, tnode, descs),
          _slot_id_set_list(tnode.repeat_node.slot_id_set_list),
          _repeat_id_list(tnode.repeat_node.repeat_id_list),
          _grouping_list(tnode.repeat_node.grouping_list),
          _output_tuple_id(tnode.repeat_node.output_tuple_id),
          _tuple_desc(nullptr),
          _child_row_batch(),
          _child_eos(false),
          _repeat_id_idx(0),
          _runtime_state(nullptr) {}

// Nothing to release by hand: the buffered child batch is owned by a
// std::unique_ptr. The destructor is still defined in this translation unit
// (rather than in the header) so that the owned RowBatch type is complete at
// the point where the unique_ptr member is destroyed.
RepeatNode::~RepeatNode() = default;

// Prepares the node for execution.
//
// Runs the base-class prepare, verifies that the repeat configuration taken
// from the plan is internally consistent, remembers the runtime state and
// resolves the descriptor of the output tuple that receives the grouping
// values.
//
// Returns Status::InternalError when the configuration is inconsistent or the
// output tuple descriptor cannot be found, the error of ExecNode::prepare() if
// that fails, and Status::OK() otherwise.
Status RepeatNode::prepare(RuntimeState* state) {
    SCOPED_TIMER(_runtime_profile->total_time_counter());
    RETURN_IF_ERROR(ExecNode::prepare(state));
    DCHECK(_repeat_id_idx >= 0);

    // get_next() passes every index in [0, _repeat_id_list.size()) to
    // get_repeated_batch(), and index 0 at least once even if the list is
    // empty. get_repeated_batch() uses that index, unchecked, into
    // _slot_id_set_list and into every entry of _grouping_list, and it always
    // reads _slot_id_set_list[0]. Reject plans that would read out of bounds
    // instead of relying on debug-only checks.
    const size_t repeat_count = _repeat_id_list.empty() ? 1 : _repeat_id_list.size();
    if (_slot_id_set_list.size() < repeat_count) {
        return Status::InternalError("Slot id set list is shorter than repeat id list.");
    }
    for (const std::vector<int64_t>& grouping_values : _grouping_list) {
        if (grouping_values.size() < repeat_count) {
            return Status::InternalError("Grouping list is shorter than repeat id list.");
        }
    }

    _runtime_state = state;
    _tuple_desc = state->desc_tbl().get_tuple_descriptor(_output_tuple_id);
    if (_tuple_desc == nullptr) {
        return Status::InternalError("Failed to get tuple descriptor.");
    }

    // Entry i of _grouping_list is written into slot i of the output tuple.
    if (_grouping_list.size() > _tuple_desc->slots().size()) {
        return Status::InternalError("Output tuple has fewer slots than grouping list.");
    }

    return Status::OK();
}

// Opens this node and then its single child (child 0).
//
// The base-class open runs first; the cancellation check sits between the two
// steps so a cancelled query does not open the child. The status of the child
// open is returned as the result.
Status RepeatNode::open(RuntimeState* state) {
    SCOPED_TIMER(_runtime_profile->total_time_counter());
    RETURN_IF_ERROR(ExecNode::open(state));
    RETURN_IF_CANCELLED(state);
    auto* input = child(0);
    return input->open(state);
}

/**
 * Produces one repetition of child_row_batch in row_batch.
 *
 * Every row of child_row_batch is deep-copied into a new row of row_batch.
 * While copying, each slot whose id is in _slot_id_set_list[0] but not in
 * _slot_id_set_list[repeat_id_idx] is set to null. Afterwards one extra tuple
 * (described by _tuple_desc) is attached to every row at tuple index
 * "number of child tuples"; its slot i receives _grouping_list[i][repeat_id_idx]
 * (the grouping_id and other grouping function values).
 *
 * e.g. _repeat_id_list = [0, 3, 1, 2], repeat_id_idx = 2,
 *      _grouping_list = [[0, 3, 1, 2], [0, 1, 1, 0]]:
 *      child row tuple 0 ['a', 'b', 1] -> tuple 0 ['a', null, 1], tuple 1 [1, 1]
 *
 * NOTE(review): _slot_id_set_list[0] is treated as the set of all slots that
 * may be nulled; this presumably relies on the planner putting the full
 * grouping set first -- confirm against the frontend.
 *
 * @param child_row_batch source rows; must not be null
 * @param repeat_id_idx   which repetition to produce; must be a valid index into
 *                        _slot_id_set_list and into every entry of _grouping_list
 * @param row_batch       destination batch; must be empty on entry
 * @return Status::InternalError if tuple memory cannot be allocated,
 *         Status::OK() otherwise
 */
Status RepeatNode::get_repeated_batch(
        RowBatch* child_row_batch, int repeat_id_idx, RowBatch* row_batch) {
    DCHECK(child_row_batch != nullptr);
    DCHECK_EQ(row_batch->num_rows(), 0);

    MemPool* tuple_pool = row_batch->tuple_data_pool();
    const std::vector<TupleDescriptor*>& src_tuple_descs =
            child_row_batch->row_desc().tuple_descriptors();
    const std::vector<TupleDescriptor*>& dst_tuple_descs =
            row_batch->row_desc().tuple_descriptors();

    // Only tuples present in both row layouts are copied.
    const int num_tuples = static_cast<int>(
            src_tuple_descs.size() < dst_tuple_descs.size() ? src_tuple_descs.size()
                                                            : dst_tuple_descs.size());
    const int num_child_rows = child_row_batch->num_rows();

    // Both sets are the same for every row and slot of this repetition, so
    // look them up once.
    const std::set<SlotId>& all_slot_ids = _slot_id_set_list[0];
    const std::set<SlotId>& kept_slot_ids = _slot_id_set_list[repeat_id_idx];

    // dst_tuples[j] points at the most recently used destination tuple of
    // tuple position j; storage for a whole batch is allocated lazily on the
    // first non-null source tuple and then walked one tuple at a time.
    std::vector<Tuple*> dst_tuples(src_tuple_descs.size(), nullptr);

    // Step 1: copy the child rows, nulling the slots this repetition drops.
    for (int i = 0; i < num_child_rows; ++i) {
        const int row_idx = row_batch->add_row();
        TupleRow* dst_row = row_batch->get_row(row_idx);
        TupleRow* src_row = child_row_batch->get_row(i);

        for (int j = 0; j < num_tuples; ++j) {
            TupleDescriptor* src_desc = src_tuple_descs[j];
            TupleDescriptor* dst_desc = dst_tuple_descs[j];

            Tuple* src_tuple = src_row->get_tuple(j);
            if (src_tuple == nullptr) {
                // Propagate the missing tuple explicitly so the destination
                // row never exposes a stale or uninitialized tuple pointer.
                dst_row->set_tuple(j, nullptr);
                continue;
            }

            if (dst_tuples[j] == nullptr) {
                int size = row_batch->capacity() * dst_desc->byte_size();
                void* tuple_buffer = tuple_pool->allocate(size);
                if (tuple_buffer == nullptr) {
                    return Status::InternalError("Allocate memory for row batch failed.");
                }
                dst_tuples[j] = reinterpret_cast<Tuple*>(tuple_buffer);
            } else {
                char* next_tuple = reinterpret_cast<char*>(dst_tuples[j]);
                next_tuple += dst_desc->byte_size();
                dst_tuples[j] = reinterpret_cast<Tuple*>(next_tuple);
            }

            dst_row->set_tuple(j, dst_tuples[j]);
            memset(dst_tuples[j], 0, dst_desc->num_null_bytes());
            src_tuple->deep_copy(dst_tuples[j], *dst_desc, tuple_pool);

            const std::vector<SlotDescriptor*>& src_slots = src_desc->slots();
            const std::vector<SlotDescriptor*>& dst_slots = dst_desc->slots();
            DCHECK_LE(src_slots.size(), dst_slots.size());
            for (size_t k = 0; k < src_slots.size(); ++k) {
                SlotDescriptor* src_slot_desc = src_slots[k];
                SlotDescriptor* dst_slot_desc = dst_slots[k];
                DCHECK_EQ(src_slot_desc->type().type, dst_slot_desc->type().type);
                DCHECK_EQ(src_slot_desc->col_name(), dst_slot_desc->col_name());

                // Null the slot when it is a repeat candidate that this
                // repetition does not keep.
                const SlotId slot_id = src_slot_desc->id();
                if (all_slot_ids.count(slot_id) != 0 && kept_slot_ids.count(slot_id) == 0) {
                    dst_tuples[j]->set_null(dst_slot_desc->null_indicator_offset());
                }
            }
        }
        row_batch->commit_last_row();
    }

    // Step 2: attach the tuple holding the grouping values to every row.
    Tuple* tuple = nullptr;
    for (int i = 0; i < num_child_rows; ++i) {
        TupleRow* row = row_batch->get_row(i);

        if (tuple == nullptr) {
            int size = row_batch->capacity() * _tuple_desc->byte_size();
            void* tuple_buffer = tuple_pool->allocate(size);
            if (tuple_buffer == nullptr) {
                return Status::InternalError("Allocate memory for row batch failed.");
            }
            tuple = reinterpret_cast<Tuple*>(tuple_buffer);
        } else {
            char* next_tuple = reinterpret_cast<char*>(tuple);
            next_tuple += _tuple_desc->byte_size();
            tuple = reinterpret_cast<Tuple*>(next_tuple);
        }

        // The grouping tuple sits right after the tuples copied from the child.
        row->set_tuple(src_tuple_descs.size(), tuple);
        memset(tuple, 0, _tuple_desc->num_null_bytes());

        for (size_t slot_idx = 0; slot_idx < _grouping_list.size(); ++slot_idx) {
            int64_t val = _grouping_list[slot_idx][repeat_id_idx];
            DCHECK_LT(slot_idx, _tuple_desc->slots().size())
                    << "TupleDescriptor: " << _tuple_desc->debug_string();
            const SlotDescriptor* slot_desc = _tuple_desc->slots()[slot_idx];
            tuple->set_not_null(slot_desc->null_indicator_offset());
            RawValue::write(&val, tuple, slot_desc, tuple_pool);
        }
    }

    return Status::OK();
}

// Returns the next repetition of the current child batch in row_batch.
//
// Each non-empty batch fetched from child 0 is emitted once per entry of
// _repeat_id_list (and at least once), one repetition per call; the buffered
// child batch is released after its last repetition. *eos is set to true, with
// row_batch left untouched, once the child is exhausted and nothing is
// buffered; on every other successful return it is false.
//
// An empty batch from the child is not by itself treated as end-of-stream:
// the child is polled again until it delivers rows or reports eos, so input
// following an empty batch is not dropped.
Status RepeatNode::get_next(RuntimeState* state, RowBatch* row_batch, bool* eos) {
    SCOPED_TIMER(_runtime_profile->total_time_counter());
    RETURN_IF_CANCELLED(state);
    *eos = false;

    // The current child batch has finished its repeats: fetch the next one.
    if (_child_row_batch.get() == nullptr) {
        while (!_child_eos) {
            RETURN_IF_CANCELLED(state);
            _child_row_batch.reset(
                    new RowBatch(child(0)->row_desc(), state->batch_size(), mem_tracker()));
            RETURN_IF_ERROR(child(0)->get_next(state, _child_row_batch.get(), &_child_eos));

            if (_child_row_batch->num_rows() > 0) {
                break;
            }
            // Nothing to repeat in this batch; drop it and try again unless
            // the child has reported end-of-stream.
            _child_row_batch.reset(nullptr);
        }

        if (_child_row_batch.get() == nullptr) {
            *eos = true;
            return Status::OK();
        }
    }

    DCHECK_EQ(row_batch->num_rows(), 0);
    RETURN_IF_ERROR(get_repeated_batch(_child_row_batch.get(), _repeat_id_idx, row_batch));
    ++_repeat_id_idx;

    // All repetitions of this child batch were produced: release it and
    // restart the cycle for the next batch.
    const int repeat_count = static_cast<int>(_repeat_id_list.size());
    if (_repeat_id_idx >= repeat_count) {
        _child_row_batch.reset(nullptr);
        _repeat_id_idx = 0;
    }

    return Status::OK();
}

// Closes the node: releases the buffered child batch, closes child 0 and then
// runs the base-class close. Calling it again after a successful close is a
// no-op.
//
// The base-class close runs even when closing the child fails, so its cleanup
// is never skipped. The child's error, if any, takes precedence in the
// returned status.
Status RepeatNode::close(RuntimeState* state) {
    if (is_closed()) {
        return Status::OK();
    }
    _child_row_batch.reset(nullptr);

    const Status child_status = child(0)->close(state);
    const Status base_status = ExecNode::close(state);
    if (!child_status.ok()) {
        return child_status;
    }
    return base_status;
}

// Appends a human-readable description of this node to *out: the repeat id
// list, the number of added columns and their per-repetition values, followed
// by the base-class description. The text is indented by two spaces per
// indentation level.
void RepeatNode::debug_string(int indentation_level, std::stringstream* out) const {
    std::stringstream& os = *out;

    os << std::string(indentation_level * 2, ' ') << "RepeatNode(";
    os << "repeat pattern: [" << JoinElements(_repeat_id_list, ",") << "]\n";
    os << "add " << _grouping_list.size() << " columns. \n";

    os << "added column values: ";
    for (const auto& column_values : _grouping_list) {
        os << "[" << JoinElements(column_values, ",") << "] ";
    }
    os << "\n";

    ExecNode::debug_string(indentation_level, out);
    os << ")";
}

}
61 changes: 61 additions & 0 deletions be/src/exec/repeat_node.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#pragma once

#include "exec/exec_node.h"

namespace doris {

class Tuple;
class RuntimeState;
class RowBatch;

// repeat tuple of children and set given slots to null, this class generates tuple rows according to the given
// _repeat_id_list, and sets the value of the slot corresponding to the grouping function according to _grouping_list
class RepeatNode : public ExecNode {
public:
RepeatNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs);
~RepeatNode();

virtual Status prepare(RuntimeState* state) override;
virtual Status open(RuntimeState* state) override;
virtual Status get_next(RuntimeState* state, RowBatch* row_batch, bool* eos) override;
virtual Status close(RuntimeState* state) override;

protected:
virtual void debug_string(int indentation_level, std::stringstream* out) const override;

private:
Status get_repeated_batch(RowBatch* child_row_batch, int repeat_id_idx, RowBatch* row_batch);

// Slot id set used to indicate those slots need to set to null.
std::vector<std::set<SlotId>> _slot_id_set_list;
// An integer bitmap list, it indicates the bit position of the exprs not null.
std::vector<int64_t> _repeat_id_list;
std::vector<std::vector<int64_t>> _grouping_list;
// Tulple id used for output, it has new slots.
TupleId _output_tuple_id;
const TupleDescriptor* _tuple_desc;

std::unique_ptr<RowBatch> _child_row_batch;
bool _child_eos;
int _repeat_id_idx;
RuntimeState* _runtime_state;
};

}
3 changes: 2 additions & 1 deletion be/src/exprs/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -65,4 +65,5 @@ add_library(Exprs
agg_fn.cc
new_agg_fn_evaluator.cc
bitmap_function.cpp
hll_function.cpp)
hll_function.cpp
grouping_sets_functions.cpp)
36 changes: 36 additions & 0 deletions be/src/exprs/grouping_sets_functions.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include "exprs/grouping_sets_functions.h"

namespace doris {

// Start-up hook for the grouping sets functions. There is currently nothing
// to initialize.
void GroupingSetsFunctions::init() {}

// Implementation of grouping_id(): returns its argument unchanged.
// NOTE(review): presumably the value has already been materialized upstream
// (e.g. by RepeatNode) before this function is evaluated -- confirm.
doris_udf::BigIntVal GroupingSetsFunctions::grouping_id(
        doris_udf::FunctionContext* ctx, const doris_udf::BigIntVal& grouping_id) {
    return grouping_id;
}

// Implementation of grouping(): returns its argument unchanged. The return
// type is spelled with the doris_udf:: qualifier to match grouping_id() and
// the parameter types.
doris_udf::BigIntVal GroupingSetsFunctions::grouping(
        doris_udf::FunctionContext* ctx, const doris_udf::BigIntVal& grouping) {
    return grouping;
}

} // namespace doris

Loading

0 comments on commit 64446af

Please sign in to comment.