Skip to content

Commit

Permalink
[Feature] Lightweight schema change of add/drop column (apache#10136)
Browse files Browse the repository at this point in the history
* [Schema Change] support fast add/drop column  (apache#49)

* [feature](schema-change) support fast schema change. coauthor: yixiutt

* [schema change] Using columns desc from fe to read data. coauthor: Lchangliang

* [feature](schema change) schema change optimize for add/drop columns.

1.add uniqueId field for class column.
2.schema change for add/drop columns directly update schema meta

Co-authored-by: yixiutt <yixiu@selectdb.com>
Co-authored-by: SWJTU-ZhangLei <1091517373@qq.com>

[Feature](schema change) fix write and add regression test (apache#69)

Co-authored-by: yixiutt <yixiu@selectdb.com>

[schema change] be ssupport that delete use newest schema

add delete regression test

fix regression case (apache#107)

tmp

[feature](schema change) light schema change exclude rollup and agg/uniq/dup key type.

[feature](schema change) fe olapTable maxUniqueId write in disk.

[feature](schema change) add rpc iface for sc add column.

[feature](schema change) add columnsDesc to TPushReq for ligtht sc.

resolve the deadlock when schema change (apache#124)

fix columns from fe don't has bitmap_index flag (apache#134)

add update/delete case

construct MATERIALIZED schema from origin schema when insert

fix not vectorized compaction coredump

use segment cache

choose newest schema by schema version when compaction (apache#182)

[bugfix](schema change) fix ligth schema change problem.

[feature](schema change) light schema change add alter job. (apache#1)

fix be ut

[bug] (schema change) unique drop key column should not light schema
change

[feature](schema change) add schema change regression-test.

fix regression test

[bugfix](schema change) fix multi alter clauses for light schema change. (apache#2)

[bugfix](schema change) fix multi clauses calculate column unique id (apache#3)

modify PushTask process (apache#217)

[Bugfix](schema change) fix jobId replay cause bdbje exception.

[bug](schema change) fix max col unique id repeatitive. (apache#232)

[optimize](schema change) modify pendingMaxColUniqueId generate rule.

fix compaction error
* fix be ut

* fix snapshot load core

fix unique_id error (apache#278)

[refact](fe) remove redundant code for light schema change. (apache#4)

[refact](fe) remove redundant code for light schema change. (apache#4)

format fe core

format be core

fix be ut

modify fe meta version

fix rebase error

flush schema into rowset_meta in old table

[refactor](schema change) refact fe light schema change. (apache#5)

delete the change of schemahash and support get max version schema

* modify for review

* fix be ut

* fix schema change test
  • Loading branch information
Lchangliang authored and eldenmoon committed Aug 1, 2022
1 parent c7c03a0 commit ad37873
Show file tree
Hide file tree
Showing 101 changed files with 5,683 additions and 757 deletions.
28 changes: 20 additions & 8 deletions be/src/exec/olap_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@
#include "exprs/expr_context.h"
#include "gen_cpp/PaloInternalService_types.h"
#include "olap/decimal12.h"
#include "olap/field.h"
#include "olap/storage_engine.h"
#include "olap/tablet_schema.h"
#include "olap/uint24.h"
#include "olap_scan_node.h"
#include "olap_utils.h"
Expand Down Expand Up @@ -86,6 +88,14 @@ Status OlapScanner::prepare(
LOG(WARNING) << ss.str();
return Status::InternalError(ss.str());
}
_tablet_schema = _tablet->tablet_schema();
if (!_parent->_olap_scan_node.columns_desc.empty() &&
_parent->_olap_scan_node.columns_desc[0].col_unique_id >= 0) {
_tablet_schema.clear_columns();
for (const auto& column_desc : _parent->_olap_scan_node.columns_desc) {
_tablet_schema.append_column(TabletColumn(column_desc));
}
}
{
std::shared_lock rdlock(_tablet->get_header_lock());
const RowsetSharedPtr rowset = _tablet->rowset_with_max_version();
Expand Down Expand Up @@ -170,6 +180,7 @@ Status OlapScanner::_init_tablet_reader_params(
RETURN_IF_ERROR(_init_return_columns(!_tablet_reader_params.direct_mode));

_tablet_reader_params.tablet = _tablet;
_tablet_reader_params.tablet_schema = &_tablet_schema;
_tablet_reader_params.reader_type = READER_QUERY;
_tablet_reader_params.aggregation = _aggregation;
_tablet_reader_params.version = Version(0, _version);
Expand Down Expand Up @@ -210,7 +221,7 @@ Status OlapScanner::_init_tablet_reader_params(
_tablet_reader_params.return_columns.push_back(i);
}
for (auto index : _return_columns) {
if (_tablet->tablet_schema().column(index).is_key()) {
if (_tablet_schema.column(index).is_key()) {
continue;
} else {
_tablet_reader_params.return_columns.push_back(index);
Expand All @@ -219,13 +230,12 @@ Status OlapScanner::_init_tablet_reader_params(
}

// use _tablet_reader_params.return_columns, because reader use this to merge sort
Status res =
_read_row_cursor.init(_tablet->tablet_schema(), _tablet_reader_params.return_columns);
Status res = _read_row_cursor.init(_tablet_schema, _tablet_reader_params.return_columns);
if (!res.ok()) {
LOG(WARNING) << "fail to init row cursor.res = " << res;
return Status::InternalError("failed to initialize storage read row cursor");
}
_read_row_cursor.allocate_memory_for_string_type(_tablet->tablet_schema());
_read_row_cursor.allocate_memory_for_string_type(_tablet_schema);

// If a agg node is this scan node direct parent
// we will not call agg object finalize method in scan node,
Expand All @@ -244,15 +254,17 @@ Status OlapScanner::_init_return_columns(bool need_seq_col) {
if (!slot->is_materialized()) {
continue;
}
int32_t index = _tablet->field_index(slot->col_name());
int32_t index = slot->col_unique_id() >= 0
? _tablet_schema.field_index(slot->col_unique_id())
: _tablet_schema.field_index(slot->col_name());
if (index < 0) {
std::stringstream ss;
ss << "field name is invalid. field=" << slot->col_name();
LOG(WARNING) << ss.str();
return Status::InternalError(ss.str());
}
_return_columns.push_back(index);
if (slot->is_nullable() && !_tablet->tablet_schema().column(index).is_nullable())
if (slot->is_nullable() && !_tablet_schema.column(index).is_nullable())
_tablet_columns_convert_to_null_set.emplace(index);
_query_slots.push_back(slot);
}
Expand All @@ -261,13 +273,13 @@ Status OlapScanner::_init_return_columns(bool need_seq_col) {
if (_tablet->tablet_schema().has_sequence_col() && need_seq_col) {
bool has_replace_col = false;
for (auto col : _return_columns) {
if (_tablet->tablet_schema().column(col).aggregation() ==
if (_tablet_schema.column(col).aggregation() ==
FieldAggregationMethod::OLAP_FIELD_AGGREGATION_REPLACE) {
has_replace_col = true;
break;
}
}
if (auto sequence_col_idx = _tablet->tablet_schema().sequence_col_idx();
if (auto sequence_col_idx = _tablet_schema.sequence_col_idx();
has_replace_col && std::find(_return_columns.begin(), _return_columns.end(),
sequence_col_idx) == _return_columns.end()) {
_return_columns.push_back(sequence_col_idx);
Expand Down
2 changes: 2 additions & 0 deletions be/src/exec/olap_scanner.h
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,8 @@ class OlapScanner {
MonotonicStopWatch _watcher;

std::shared_ptr<MemTracker> _mem_tracker;

TabletSchema _tablet_schema;
};

} // namespace doris
13 changes: 13 additions & 0 deletions be/src/exec/tablet_info.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@ void OlapTableIndexSchema::to_protobuf(POlapTableIndexSchema* pindex) const {
for (auto slot : slots) {
pindex->add_columns(slot->col_name());
}
for (auto column : columns) {
column->to_schema_pb(pindex->add_columns_desc());
}
}

Status OlapTableSchemaParam::init(const POlapTableSchemaParam& pschema) {
Expand All @@ -57,6 +60,11 @@ Status OlapTableSchemaParam::init(const POlapTableSchemaParam& pschema) {
}
index->slots.emplace_back(it->second);
}
for (auto& pcolumn_desc : p_index.columns_desc()) {
TabletColumn* tc = _obj_pool.add(new TabletColumn());
tc->init_from_pb(pcolumn_desc);
index->columns.emplace_back(tc);
}
_indexes.emplace_back(index);
}

Expand Down Expand Up @@ -90,6 +98,11 @@ Status OlapTableSchemaParam::init(const TOlapTableSchemaParam& tschema) {
}
index->slots.emplace_back(it->second);
}
for (auto& tcolumn_desc : t_index.columns_desc) {
TabletColumn* tc = _obj_pool.add(new TabletColumn());
tc->init_from_thrift(tcolumn_desc);
index->columns.emplace_back(tc);
}
_indexes.emplace_back(index);
}

Expand Down
2 changes: 2 additions & 0 deletions be/src/exec/tablet_info.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#include "common/status.h"
#include "gen_cpp/Descriptors_types.h"
#include "gen_cpp/descriptors.pb.h"
#include "olap/tablet_schema.h"
#include "runtime/descriptors.h"
#include "runtime/raw_value.h"
#include "runtime/tuple.h"
Expand All @@ -41,6 +42,7 @@ struct OlapTableIndexSchema {
int64_t index_id;
std::vector<SlotDescriptor*> slots;
int32_t schema_hash;
std::vector<TabletColumn*> columns;

void to_protobuf(POlapTableIndexSchema* pindex) const;
};
Expand Down
11 changes: 11 additions & 0 deletions be/src/olap/base_tablet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -72,4 +72,15 @@ void BaseTablet::_gen_tablet_path() {
}
}

bool BaseTablet::set_tablet_schema_into_rowset_meta() {
bool flag = false;
for (RowsetMetaSharedPtr rowset_meta : _tablet_meta->all_mutable_rs_metas()) {
if (!rowset_meta->get_rowset_pb().has_tablet_schema()) {
rowset_meta->set_tablet_schema(&_schema);
flag = true;
}
}
return flag;
}

} /* namespace doris */
4 changes: 3 additions & 1 deletion be/src/olap/base_tablet.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,9 @@ class BaseTablet : public std::enable_shared_from_this<BaseTablet> {
}

// properties encapsulated in TabletSchema
const TabletSchema& tablet_schema() const;
virtual const TabletSchema& tablet_schema() const;

bool set_tablet_schema_into_rowset_meta();

protected:
void _gen_tablet_path();
Expand Down
8 changes: 4 additions & 4 deletions be/src/olap/collect_iterator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,9 @@ Status CollectIterator::add_child(RowsetReaderSharedPtr rs_reader) {
// then merged with the base rowset.
void CollectIterator::build_heap(const std::vector<RowsetReaderSharedPtr>& rs_readers) {
DCHECK(rs_readers.size() == _children.size());
_reverse = _reader->_tablet->tablet_schema().keys_type() == KeysType::UNIQUE_KEYS;
SortType sort_type = _reader->_tablet->tablet_schema().sort_type();
int sort_col_num = _reader->_tablet->tablet_schema().sort_col_num();
_reverse = _reader->_tablet_schema->keys_type() == KeysType::UNIQUE_KEYS;
SortType sort_type = _reader->_tablet_schema->sort_type();
int sort_col_num = _reader->_tablet_schema->sort_col_num();
if (_children.empty()) {
_inner_iter.reset(nullptr);
return;
Expand Down Expand Up @@ -200,7 +200,7 @@ CollectIterator::Level0Iterator::Level0Iterator(RowsetReaderSharedPtr rs_reader,
CollectIterator::Level0Iterator::~Level0Iterator() = default;

Status CollectIterator::Level0Iterator::init() {
RETURN_NOT_OK_LOG(_row_cursor.init(_reader->_tablet->tablet_schema(), _reader->_seek_columns),
RETURN_NOT_OK_LOG(_row_cursor.init(*_reader->_tablet_schema, _reader->_seek_columns),
"failed to init row cursor");
return (this->*_refresh_current_row)();
}
Expand Down
19 changes: 12 additions & 7 deletions be/src/olap/compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,10 @@

#include "olap/compaction.h"

#include "common/status.h"
#include "gutil/strings/substitute.h"
#include "olap/rowset/rowset_meta.h"
#include "olap/tablet.h"
#include "util/time.h"
#include "util/trace.h"

Expand Down Expand Up @@ -141,8 +144,10 @@ Status Compaction::do_compaction_impl(int64_t permits) {

LOG(INFO) << "start " << merge_type << compaction_name() << ". tablet=" << _tablet->full_name()
<< ", output_version=" << _output_version << ", permits: " << permits;
// get cur schema if rowset schema exist, rowset schema must be newer than tablet schema
const TabletSchema cur_tablet_schema = _tablet->tablet_schema();

RETURN_NOT_OK(construct_output_rowset_writer());
RETURN_NOT_OK(construct_output_rowset_writer(&cur_tablet_schema));
RETURN_NOT_OK(construct_input_rowset_readers());
TRACE("prepare finished");

Expand All @@ -152,11 +157,11 @@ Status Compaction::do_compaction_impl(int64_t permits) {
Status res;

if (use_vectorized_compaction) {
res = Merger::vmerge_rowsets(_tablet, compaction_type(), _input_rs_readers,
_output_rs_writer.get(), &stats);
res = Merger::vmerge_rowsets(_tablet, compaction_type(), &cur_tablet_schema,
_input_rs_readers, _output_rs_writer.get(), &stats);
} else {
res = Merger::merge_rowsets(_tablet, compaction_type(), _input_rs_readers,
_output_rs_writer.get(), &stats);
res = Merger::merge_rowsets(_tablet, compaction_type(), &cur_tablet_schema,
_input_rs_readers, _output_rs_writer.get(), &stats);
}

if (!res.ok()) {
Expand Down Expand Up @@ -219,8 +224,8 @@ Status Compaction::do_compaction_impl(int64_t permits) {
return Status::OK();
}

Status Compaction::construct_output_rowset_writer() {
return _tablet->create_rowset_writer(_output_version, VISIBLE, NONOVERLAPPING,
Status Compaction::construct_output_rowset_writer(const TabletSchema* schema) {
return _tablet->create_rowset_writer(_output_version, VISIBLE, NONOVERLAPPING, schema,
_oldest_write_timestamp, _newest_write_timestamp,
&_output_rs_writer);
}
Expand Down
2 changes: 1 addition & 1 deletion be/src/olap/compaction.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ class Compaction {
Status modify_rowsets();
void gc_output_rowset();

Status construct_output_rowset_writer();
Status construct_output_rowset_writer(const TabletSchema* schema);
Status construct_input_rowset_readers();

Status check_version_continuity(const std::vector<RowsetSharedPtr>& rowsets);
Expand Down
14 changes: 14 additions & 0 deletions be/src/olap/data_dir.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -485,6 +485,11 @@ Status DataDir::load() {
<< " schema hash: " << rowset_meta->tablet_schema_hash()
<< " for txn: " << rowset_meta->txn_id();
}
if (!rowset_meta->get_rowset_pb().has_tablet_schema()) {
rowset_meta->set_tablet_schema(&tablet->tablet_schema());
RowsetMetaManager::save(_meta, rowset_meta->tablet_uid(), rowset_meta->rowset_id(),
rowset_meta->get_rowset_pb());
}
} else if (rowset_meta->rowset_state() == RowsetStatePB::VISIBLE &&
rowset_meta->tablet_uid() == tablet->tablet_uid()) {
Status publish_status = tablet->add_rowset(rowset);
Expand All @@ -506,6 +511,15 @@ Status DataDir::load() {
++invalid_rowset_counter;
}
}

for (int64_t tablet_id : tablet_ids) {
TabletSharedPtr tablet = _tablet_manager->get_tablet(tablet_id);
if (tablet && tablet->set_tablet_schema_into_rowset_meta()) {
TabletMetaManager::save(this, tablet->tablet_id(), tablet->schema_hash(),
tablet->tablet_meta());
}
}

// At startup, we only count these invalid rowset, but do not actually delete it.
// The actual delete operation is in StorageEngine::_clean_unused_rowset_metas,
// which is cleaned up uniformly by the background cleanup thread.
Expand Down
27 changes: 20 additions & 7 deletions be/src/olap/delta_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ DeltaWriter::DeltaWriter(WriteRequest* req, StorageEngine* storage_engine, bool
_tablet(nullptr),
_cur_rowset(nullptr),
_rowset_writer(nullptr),
_tablet_schema(nullptr),
_tablet_schema(new TabletSchema),
_delta_written_success(false),
_storage_engine(storage_engine),
_is_vec(is_vec) {}
Expand Down Expand Up @@ -121,10 +121,11 @@ Status DeltaWriter::init() {
RETURN_NOT_OK(_storage_engine->txn_manager()->prepare_txn(_req.partition_id, _tablet,
_req.txn_id, _req.load_id));
}
// build tablet schema in request level
_build_current_tablet_schema(_req.index_id, _req.ptable_schema_param, _tablet->tablet_schema());

RETURN_NOT_OK(_tablet->create_rowset_writer(_req.txn_id, _req.load_id, PREPARED, OVERLAPPING,
&_rowset_writer));
_tablet_schema = &(_tablet->tablet_schema());
_tablet_schema.get(), &_rowset_writer));
_schema.reset(new Schema(*_tablet_schema));
_reset_mem_table();

Expand Down Expand Up @@ -172,7 +173,6 @@ Status DeltaWriter::write(const RowBatch* row_batch, const std::vector<int>& row
if (_is_cancelled) {
return Status::OLAPInternalError(OLAP_ERR_ALREADY_CANCELLED);
}

for (const auto& row_idx : row_idxs) {
_mem_table->insert(row_batch->get_row(row_idx)->get_tuple(0));
}
Expand Down Expand Up @@ -266,9 +266,9 @@ Status DeltaWriter::wait_flush() {
}

void DeltaWriter::_reset_mem_table() {
_mem_table.reset(new MemTable(_tablet->tablet_id(), _schema.get(), _tablet_schema, _req.slots,
_req.tuple_desc, _tablet->keys_type(), _rowset_writer.get(),
_mem_tracker, _is_vec));
_mem_table.reset(new MemTable(_tablet->tablet_id(), _schema.get(), _tablet_schema.get(),
_req.slots, _req.tuple_desc, _tablet->keys_type(),
_rowset_writer.get(), _mem_tracker, _is_vec));
}

Status DeltaWriter::close() {
Expand Down Expand Up @@ -367,4 +367,17 @@ int64_t DeltaWriter::partition_id() const {
return _req.partition_id;
}

void DeltaWriter::_build_current_tablet_schema(int64_t index_id,
const POlapTableSchemaParam& ptable_schema_param,
const TabletSchema& ori_tablet_schema) {
*_tablet_schema = ori_tablet_schema;
//new tablet schame if new table
if (ptable_schema_param.indexes_size() > 0 &&
ptable_schema_param.indexes(0).columns_desc_size() != 0 &&
ptable_schema_param.indexes(0).columns_desc(0).unique_id() >= 0) {
_tablet_schema->build_current_tablet_schema(index_id, ptable_schema_param,
ori_tablet_schema);
}
}

} // namespace doris
12 changes: 11 additions & 1 deletion be/src/olap/delta_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ struct WriteRequest {
// slots are in order of tablet's schema
const std::vector<SlotDescriptor*>* slots;
bool is_high_priority = false;
POlapTableSchemaParam ptable_schema_param;
int64_t index_id;
};

// Writer for a particular (load, index, tablet).
Expand Down Expand Up @@ -107,6 +109,10 @@ class DeltaWriter {

void _reset_mem_table();

void _build_current_tablet_schema(int64_t index_id,
const POlapTableSchemaParam& table_schema_param,
const TabletSchema& ori_tablet_schema);

bool _is_init = false;
bool _is_cancelled = false;
WriteRequest _req;
Expand All @@ -116,7 +122,11 @@ class DeltaWriter {
// TODO: Recheck the lifetime of _mem_table, Look should use unique_ptr
std::shared_ptr<MemTable> _mem_table;
std::unique_ptr<Schema> _schema;
const TabletSchema* _tablet_schema;
//const TabletSchema* _tablet_schema;
// tablet schema owned by delta writer, all write will use this tablet schema
// it's build from tablet_schema(stored when create tablet) and OlapTableSchema
// every request will have it's own tablet schema so simple schema change can work
std::unique_ptr<TabletSchema> _tablet_schema;
bool _delta_written_success;

StorageEngine* _storage_engine;
Expand Down
Loading

0 comments on commit ad37873

Please sign in to comment.