Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[bugfix](light weight schema change) support delete condition in schema change #11869

Merged
merged 6 commits into from
Aug 26, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion be/src/exec/olap_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,11 @@ Status OlapScanner::_init_tablet_reader_params(
std::copy(_tablet->delete_predicates().cbegin(), _tablet->delete_predicates().cend(),
std::inserter(_tablet_reader_params.delete_predicates,
_tablet_reader_params.delete_predicates.begin()));

// Merge the columns in delete predicate that not in latest schema in to current tablet schema
for (auto& del_pred_pb : _tablet_reader_params.delete_predicates) {
_tablet_schema->merge_dropped_columns(
_tablet->tablet_schema(Version(del_pred_pb.version(), del_pred_pb.version())));
}
// Range
for (auto key_range : key_ranges) {
if (key_range->begin_scan_range.size() == 1 &&
Expand Down
39 changes: 20 additions & 19 deletions be/src/olap/delete_handler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@
#include "gen_cpp/olap_file.pb.h"
#include "olap/olap_common.h"
#include "olap/olap_cond.h"
#include "olap/reader.h"
#include "olap/predicate_creator.h"
#include "olap/tablet.h"
#include "olap/utils.h"

using apache::thrift::ThriftDebugString;
Expand Down Expand Up @@ -237,52 +238,54 @@ bool DeleteHandler::_parse_condition(const std::string& condition_str, TConditio
return true;
}

Status DeleteHandler::init(TabletSchemaSPtr schema,
const std::vector<DeletePredicatePB>& delete_conditions, int64_t version,
const TabletReader* reader) {
Status DeleteHandler::init(std::shared_ptr<Tablet> tablet, TabletSchemaSPtr tablet_schema,
const std::vector<DeletePredicatePB>& delete_conditions,
int64_t version) {
DCHECK(!_is_inited) << "reinitialize delete handler.";
DCHECK(version >= 0) << "invalid parameters. version=" << version;
_predicate_mem_pool.reset(new MemPool());

for (const auto& delete_condition : delete_conditions) {
// Skip the delete condition with large version
if (delete_condition.version() > version) {
continue;
}

// Need the tablet schema at the delete condition to parse the accurate column unique id
TabletSchemaSPtr delete_pred_related_schema = tablet->tablet_schema(
Version(delete_condition.version(), delete_condition.version()));
DeleteConditions temp;
temp.filter_version = delete_condition.version();
temp.del_cond = new (std::nothrow) Conditions();
temp.del_cond = new (std::nothrow) Conditions(tablet_schema);

if (temp.del_cond == nullptr) {
LOG(FATAL) << "fail to malloc Conditions. size=" << sizeof(Conditions);
return Status::OLAPInternalError(OLAP_ERR_MALLOC_ERROR);
}

temp.del_cond->set_tablet_schema(schema);
for (const auto& sub_predicate : delete_condition.sub_predicates()) {
TCondition condition;
if (!_parse_condition(sub_predicate, &condition)) {
LOG(WARNING) << "fail to parse condition. [condition=" << sub_predicate << "]";
return Status::OLAPInternalError(OLAP_ERR_DELETE_INVALID_PARAMETERS);
}

condition.__set_column_unique_id(
delete_pred_related_schema->column(condition.column_name).unique_id());
Status res = temp.del_cond->append_condition(condition);
if (!res.ok()) {
LOG(WARNING) << "fail to append condition.res = " << res;
return res;
}

if (reader != nullptr) {
auto predicate = reader->_parse_to_predicate(condition, true);
if (predicate != nullptr) {
temp.column_predicate_vec.push_back(predicate);
}
auto predicate =
parse_to_predicate(tablet_schema, condition, _predicate_mem_pool.get(), true);
if (predicate != nullptr) {
temp.column_predicate_vec.push_back(predicate);
}
}

for (const auto& in_predicate : delete_condition.in_predicates()) {
TCondition condition;
condition.__set_column_name(in_predicate.column_name());
condition.__set_column_unique_id(
delete_pred_related_schema->column(condition.column_name).unique_id());
if (in_predicate.is_not_in()) {
condition.__set_condition_op("!*=");
} else {
Expand All @@ -296,10 +299,8 @@ Status DeleteHandler::init(TabletSchemaSPtr schema,
LOG(WARNING) << "fail to append condition.res = " << res;
return res;
}

if (reader != nullptr) {
temp.column_predicate_vec.push_back(reader->_parse_to_predicate(condition, true));
}
temp.column_predicate_vec.push_back(
parse_to_predicate(tablet_schema, condition, _predicate_mem_pool.get(), true));
}

_del_conds.emplace_back(std::move(temp));
Expand Down
6 changes: 4 additions & 2 deletions be/src/olap/delete_handler.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ namespace doris {

class Conditions;
class RowCursor;
class Tablet;
class TabletReader;
class TabletSchema;

Expand Down Expand Up @@ -90,8 +91,8 @@ class DeleteHandler {
// return:
// * Status::OLAPInternalError(OLAP_ERR_DELETE_INVALID_PARAMETERS): input parameters are not valid
// * Status::OLAPInternalError(OLAP_ERR_MALLOC_ERROR): alloc memory failed
Status init(TabletSchemaSPtr schema, const std::vector<DeletePredicatePB>& delete_conditions,
int64_t version, const doris::TabletReader* = nullptr);
Status init(std::shared_ptr<Tablet> tablet, TabletSchemaSPtr tablet_schema,
const std::vector<DeletePredicatePB>& delete_conditions, int64_t version);

// Return the delete conditions' size.
size_t conditions_num() const { return _del_conds.size(); }
Expand All @@ -118,6 +119,7 @@ class DeleteHandler {
bool _is_inited = false;
// DeleteConditions in _del_conds are in 'OR' relationship
std::vector<DeleteConditions> _del_conds;
std::unique_ptr<MemPool> _predicate_mem_pool;

DISALLOW_COPY_AND_ASSIGN(DeleteHandler);
};
Expand Down
20 changes: 16 additions & 4 deletions be/src/olap/merger.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,14 @@ Status Merger::merge_rowsets(TabletSharedPtr tablet, ReaderType reader_type,
std::inserter(reader_params.delete_predicates,
reader_params.delete_predicates.begin()));
}

reader_params.tablet_schema = cur_tablet_schema;
TabletSchemaSPtr merge_tablet_schema = std::make_shared<TabletSchema>();
merge_tablet_schema->copy_from(*cur_tablet_schema);
// Merge the columns in delete predicate that not in latest schema in to current tablet schema
for (auto& del_pred_pb : reader_params.delete_predicates) {
merge_tablet_schema->merge_dropped_columns(
tablet->tablet_schema(Version(del_pred_pb.version(), del_pred_pb.version())));
}
reader_params.tablet_schema = merge_tablet_schema;
RETURN_NOT_OK(reader.init(reader_params));

RowCursor row_cursor;
Expand Down Expand Up @@ -108,14 +114,20 @@ Status Merger::vmerge_rowsets(TabletSharedPtr tablet, ReaderType reader_type,
reader_params.reader_type = reader_type;
reader_params.rs_readers = src_rowset_readers;
reader_params.version = dst_rowset_writer->version();
reader_params.tablet_schema = cur_tablet_schema;
{
std::shared_lock rdlock(tablet->get_header_lock());
std::copy(tablet->delete_predicates().cbegin(), tablet->delete_predicates().cend(),
std::inserter(reader_params.delete_predicates,
reader_params.delete_predicates.begin()));
}

TabletSchemaSPtr merge_tablet_schema = std::make_shared<TabletSchema>();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't repeat yourself. same as the line 50, maybe we need a function

merge_tablet_schema->copy_from(*cur_tablet_schema);
// Merge the columns in delete predicate that not in latest schema in to current tablet schema
for (auto& del_pred_pb : reader_params.delete_predicates) {
merge_tablet_schema->merge_dropped_columns(
tablet->tablet_schema(Version(del_pred_pb.version(), del_pred_pb.version())));
}
reader_params.tablet_schema = merge_tablet_schema;
if (tablet->enable_unique_key_merge_on_write()) {
reader_params.delete_bitmap = &tablet->tablet_meta()->delete_bitmap();
}
Expand Down
2 changes: 1 addition & 1 deletion be/src/olap/olap_cond.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -537,7 +537,7 @@ bool CondColumn::eval(const segment_v2::BloomFilter* bf) const {

Status Conditions::append_condition(const TCondition& tcond) {
DCHECK(_schema != nullptr);
int32_t index = _schema->field_index(tcond.column_name);
int32_t index = _schema->field_index(tcond.column_unique_id);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we hide index in the TabletSchema? We can use a function like this _schema->get_clolumn_by_unique_id(tcond.column_unique_id).

if (index < 0) {
LOG(WARNING) << "fail to get field index, field name=" << tcond.column_name;
return Status::OLAPInternalError(OLAP_ERR_INPUT_PARAMETER_ERROR);
Expand Down
8 changes: 3 additions & 5 deletions be/src/olap/olap_cond.h
Original file line number Diff line number Diff line change
Expand Up @@ -144,9 +144,10 @@ class Conditions {
public:
// Key: field index of condition's column
// Value: CondColumn object
// col_unique_id --> CondColumn
typedef std::map<int32_t, CondColumn*> CondColumns;

Conditions() {}
Conditions(TabletSchemaSPtr schema) : _schema(schema) {}
~Conditions() { finalize(); }

void finalize() {
Expand All @@ -157,9 +158,6 @@ class Conditions {
}
bool empty() const { return _columns.empty(); }

// TODO(yingchun): should do it in constructor
void set_tablet_schema(TabletSchemaSPtr schema) { _schema = schema; }

// 如果成功,则_columns中增加一项,如果失败则无视此condition,同时输出日志
// 对于下列情况,将不会被处理
// 1. column不属于key列
Expand All @@ -168,7 +166,7 @@ class Conditions {

const CondColumns& columns() const { return _columns; }

CondColumn* get_column(int32_t cid) const;
CondColumn* get_column(int32_t col_unique_id) const;

private:
bool _cond_column_is_key_or_duplicate(const CondColumn* cc) const {
Expand Down
47 changes: 47 additions & 0 deletions be/src/olap/predicate_creator.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,11 @@
#include "olap/column_predicate.h"
#include "olap/comparison_predicate.h"
#include "olap/in_list_predicate.h"
#include "olap/null_predicate.h"
#include "olap/olap_cond.h"
#include "olap/tablet_schema.h"
#include "util/date_func.h"
#include "util/string_util.h"

namespace doris {

Expand Down Expand Up @@ -260,4 +262,49 @@ inline ColumnPredicate* create_list_predicate(const TabletColumn& column, int in
pool);
}

// This method is called in reader and in deletehandler.
// When it is called by delete handler, then it should use the delete predicate's tablet schema
// to parse the conditions.
inline ColumnPredicate* parse_to_predicate(TabletSchemaSPtr tablet_schema,
const TCondition& condition, MemPool* mem_pool,
bool opposite = false) {
int32_t col_unique_id = condition.column_unique_id;
// TODO: not equal and not in predicate is not pushed down
const TabletColumn& column = tablet_schema->column_by_uid(col_unique_id);
uint32_t index = tablet_schema->field_index(col_unique_id);

if (to_lower(condition.condition_op) == "is") {
return new NullPredicate(index, to_lower(condition.condition_values[0]) == "null",
opposite);
}

if ((condition.condition_op == "*=" || condition.condition_op == "!*=") &&
condition.condition_values.size() > 1) {
decltype(create_list_predicate<PredicateType::UNKNOWN>)* create = nullptr;

if (condition.condition_op == "*=") {
create = create_list_predicate<PredicateType::IN_LIST>;
} else {
create = create_list_predicate<PredicateType::NOT_IN_LIST>;
}
return create(column, index, condition.condition_values, opposite, mem_pool);
}

decltype(create_comparison_predicate<PredicateType::UNKNOWN>)* create = nullptr;
if (condition.condition_op == "*=" || condition.condition_op == "=") {
create = create_comparison_predicate<PredicateType::EQ>;
} else if (condition.condition_op == "!*=" || condition.condition_op == "!=") {
create = create_comparison_predicate<PredicateType::NE>;
} else if (condition.condition_op == "<<") {
create = create_comparison_predicate<PredicateType::LT>;
} else if (condition.condition_op == "<=") {
create = create_comparison_predicate<PredicateType::LE>;
} else if (condition.condition_op == ">>") {
create = create_comparison_predicate<PredicateType::GT>;
} else if (condition.condition_op == ">=") {
create = create_comparison_predicate<PredicateType::GE>;
}
return create(column, index, condition.condition_values[0], opposite, mem_pool);
}

} //namespace doris
Loading