Skip to content

Commit 486cf0e

Browse files
authored
[Feature] Lightweight schema change of add/drop column (#10136)
* [Schema Change] support fast add/drop column (#49) * [feature](schema-change) support fast schema change. coauthor: yixiutt * [schema change] Using columns desc from fe to read data. coauthor: Lchangliang * [feature](schema change) schema change optimize for add/drop columns. 1.add uniqueId field for class column. 2.schema change for add/drop columns directly update schema meta Co-authored-by: yixiutt <yixiu@selectdb.com> Co-authored-by: SWJTU-ZhangLei <1091517373@qq.com> [Feature](schema change) fix write and add regression test (#69) Co-authored-by: yixiutt <yixiu@selectdb.com> [schema change] be support that delete use newest schema add delete regression test fix regression case (#107) tmp [feature](schema change) light schema change exclude rollup and agg/uniq/dup key type. [feature](schema change) fe olapTable maxUniqueId write in disk. [feature](schema change) add rpc iface for sc add column. [feature](schema change) add columnsDesc to TPushReq for light sc. resolve the deadlock when schema change (#124) fix columns from fe don't have bitmap_index flag (#134) add update/delete case construct MATERIALIZED schema from origin schema when insert fix not vectorized compaction coredump use segment cache choose newest schema by schema version when compaction (#182) [bugfix](schema change) fix light schema change problem. [feature](schema change) light schema change add alter job. (#1) fix be ut [bug] (schema change) unique drop key column should not light schema change [feature](schema change) add schema change regression-test. fix regression test [bugfix](schema change) fix multi alter clauses for light schema change. (#2) [bugfix](schema change) fix multi clauses calculate column unique id (#3) modify PushTask process (#217) [Bugfix](schema change) fix jobId replay cause bdbje exception. [bug](schema change) fix max col unique id repetitive. (#232) [optimize](schema change) modify pendingMaxColUniqueId generate rule. 
fix compaction error * fix be ut * fix snapshot load core fix unique_id error (#278) [refact](fe) remove redundant code for light schema change. (#4) [refact](fe) remove redundant code for light schema change. (#4) format fe core format be core fix be ut modify fe meta version fix rebase error flush schema into rowset_meta in old table [refactor](schema change) refact fe light schema change. (#5) delete the change of schemahash and support get max version schema * modify for review * fix be ut * fix schema change test
1 parent 41f9ee2 commit 486cf0e

File tree

101 files changed

+5683
-757
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

101 files changed

+5683
-757
lines changed

be/src/exec/olap_scanner.cpp

+20-8
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,9 @@
2323
#include "exprs/expr_context.h"
2424
#include "gen_cpp/PaloInternalService_types.h"
2525
#include "olap/decimal12.h"
26+
#include "olap/field.h"
2627
#include "olap/storage_engine.h"
28+
#include "olap/tablet_schema.h"
2729
#include "olap/uint24.h"
2830
#include "olap_scan_node.h"
2931
#include "olap_utils.h"
@@ -86,6 +88,14 @@ Status OlapScanner::prepare(
8688
LOG(WARNING) << ss.str();
8789
return Status::InternalError(ss.str());
8890
}
91+
_tablet_schema = _tablet->tablet_schema();
92+
if (!_parent->_olap_scan_node.columns_desc.empty() &&
93+
_parent->_olap_scan_node.columns_desc[0].col_unique_id >= 0) {
94+
_tablet_schema.clear_columns();
95+
for (const auto& column_desc : _parent->_olap_scan_node.columns_desc) {
96+
_tablet_schema.append_column(TabletColumn(column_desc));
97+
}
98+
}
8999
{
90100
std::shared_lock rdlock(_tablet->get_header_lock());
91101
const RowsetSharedPtr rowset = _tablet->rowset_with_max_version();
@@ -170,6 +180,7 @@ Status OlapScanner::_init_tablet_reader_params(
170180
RETURN_IF_ERROR(_init_return_columns(!_tablet_reader_params.direct_mode));
171181

172182
_tablet_reader_params.tablet = _tablet;
183+
_tablet_reader_params.tablet_schema = &_tablet_schema;
173184
_tablet_reader_params.reader_type = READER_QUERY;
174185
_tablet_reader_params.aggregation = _aggregation;
175186
_tablet_reader_params.version = Version(0, _version);
@@ -210,7 +221,7 @@ Status OlapScanner::_init_tablet_reader_params(
210221
_tablet_reader_params.return_columns.push_back(i);
211222
}
212223
for (auto index : _return_columns) {
213-
if (_tablet->tablet_schema().column(index).is_key()) {
224+
if (_tablet_schema.column(index).is_key()) {
214225
continue;
215226
} else {
216227
_tablet_reader_params.return_columns.push_back(index);
@@ -219,13 +230,12 @@ Status OlapScanner::_init_tablet_reader_params(
219230
}
220231

221232
// use _tablet_reader_params.return_columns, because reader use this to merge sort
222-
Status res =
223-
_read_row_cursor.init(_tablet->tablet_schema(), _tablet_reader_params.return_columns);
233+
Status res = _read_row_cursor.init(_tablet_schema, _tablet_reader_params.return_columns);
224234
if (!res.ok()) {
225235
LOG(WARNING) << "fail to init row cursor.res = " << res;
226236
return Status::InternalError("failed to initialize storage read row cursor");
227237
}
228-
_read_row_cursor.allocate_memory_for_string_type(_tablet->tablet_schema());
238+
_read_row_cursor.allocate_memory_for_string_type(_tablet_schema);
229239

230240
// If an agg node is this scan node's direct parent
231241
// we will not call agg object finalize method in scan node,
@@ -244,15 +254,17 @@ Status OlapScanner::_init_return_columns(bool need_seq_col) {
244254
if (!slot->is_materialized()) {
245255
continue;
246256
}
247-
int32_t index = _tablet->field_index(slot->col_name());
257+
int32_t index = slot->col_unique_id() >= 0
258+
? _tablet_schema.field_index(slot->col_unique_id())
259+
: _tablet_schema.field_index(slot->col_name());
248260
if (index < 0) {
249261
std::stringstream ss;
250262
ss << "field name is invalid. field=" << slot->col_name();
251263
LOG(WARNING) << ss.str();
252264
return Status::InternalError(ss.str());
253265
}
254266
_return_columns.push_back(index);
255-
if (slot->is_nullable() && !_tablet->tablet_schema().column(index).is_nullable())
267+
if (slot->is_nullable() && !_tablet_schema.column(index).is_nullable())
256268
_tablet_columns_convert_to_null_set.emplace(index);
257269
_query_slots.push_back(slot);
258270
}
@@ -261,13 +273,13 @@ Status OlapScanner::_init_return_columns(bool need_seq_col) {
261273
if (_tablet->tablet_schema().has_sequence_col() && need_seq_col) {
262274
bool has_replace_col = false;
263275
for (auto col : _return_columns) {
264-
if (_tablet->tablet_schema().column(col).aggregation() ==
276+
if (_tablet_schema.column(col).aggregation() ==
265277
FieldAggregationMethod::OLAP_FIELD_AGGREGATION_REPLACE) {
266278
has_replace_col = true;
267279
break;
268280
}
269281
}
270-
if (auto sequence_col_idx = _tablet->tablet_schema().sequence_col_idx();
282+
if (auto sequence_col_idx = _tablet_schema.sequence_col_idx();
271283
has_replace_col && std::find(_return_columns.begin(), _return_columns.end(),
272284
sequence_col_idx) == _return_columns.end()) {
273285
_return_columns.push_back(sequence_col_idx);

be/src/exec/olap_scanner.h

+2
Original file line numberDiff line numberDiff line change
@@ -145,6 +145,8 @@ class OlapScanner {
145145
MonotonicStopWatch _watcher;
146146

147147
std::shared_ptr<MemTracker> _mem_tracker;
148+
149+
TabletSchema _tablet_schema;
148150
};
149151

150152
} // namespace doris

be/src/exec/tablet_info.cpp

+13
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,9 @@ void OlapTableIndexSchema::to_protobuf(POlapTableIndexSchema* pindex) const {
3131
for (auto slot : slots) {
3232
pindex->add_columns(slot->col_name());
3333
}
34+
for (auto column : columns) {
35+
column->to_schema_pb(pindex->add_columns_desc());
36+
}
3437
}
3538

3639
Status OlapTableSchemaParam::init(const POlapTableSchemaParam& pschema) {
@@ -57,6 +60,11 @@ Status OlapTableSchemaParam::init(const POlapTableSchemaParam& pschema) {
5760
}
5861
index->slots.emplace_back(it->second);
5962
}
63+
for (auto& pcolumn_desc : p_index.columns_desc()) {
64+
TabletColumn* tc = _obj_pool.add(new TabletColumn());
65+
tc->init_from_pb(pcolumn_desc);
66+
index->columns.emplace_back(tc);
67+
}
6068
_indexes.emplace_back(index);
6169
}
6270

@@ -90,6 +98,11 @@ Status OlapTableSchemaParam::init(const TOlapTableSchemaParam& tschema) {
9098
}
9199
index->slots.emplace_back(it->second);
92100
}
101+
for (auto& tcolumn_desc : t_index.columns_desc) {
102+
TabletColumn* tc = _obj_pool.add(new TabletColumn());
103+
tc->init_from_thrift(tcolumn_desc);
104+
index->columns.emplace_back(tc);
105+
}
93106
_indexes.emplace_back(index);
94107
}
95108

be/src/exec/tablet_info.h

+2
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
#include "common/status.h"
2828
#include "gen_cpp/Descriptors_types.h"
2929
#include "gen_cpp/descriptors.pb.h"
30+
#include "olap/tablet_schema.h"
3031
#include "runtime/descriptors.h"
3132
#include "runtime/raw_value.h"
3233
#include "runtime/tuple.h"
@@ -41,6 +42,7 @@ struct OlapTableIndexSchema {
4142
int64_t index_id;
4243
std::vector<SlotDescriptor*> slots;
4344
int32_t schema_hash;
45+
std::vector<TabletColumn*> columns;
4446

4547
void to_protobuf(POlapTableIndexSchema* pindex) const;
4648
};

be/src/olap/base_tablet.cpp

+11
Original file line numberDiff line numberDiff line change
@@ -72,4 +72,15 @@ void BaseTablet::_gen_tablet_path() {
7272
}
7373
}
7474

75+
bool BaseTablet::set_tablet_schema_into_rowset_meta() {
76+
bool flag = false;
77+
for (RowsetMetaSharedPtr rowset_meta : _tablet_meta->all_mutable_rs_metas()) {
78+
if (!rowset_meta->get_rowset_pb().has_tablet_schema()) {
79+
rowset_meta->set_tablet_schema(&_schema);
80+
flag = true;
81+
}
82+
}
83+
return flag;
84+
}
85+
7586
} /* namespace doris */

be/src/olap/base_tablet.h

+3-1
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,9 @@ class BaseTablet : public std::enable_shared_from_this<BaseTablet> {
6666
}
6767

6868
// properties encapsulated in TabletSchema
69-
const TabletSchema& tablet_schema() const;
69+
virtual const TabletSchema& tablet_schema() const;
70+
71+
bool set_tablet_schema_into_rowset_meta();
7072

7173
protected:
7274
void _gen_tablet_path();

be/src/olap/collect_iterator.cpp

+4-4
Original file line numberDiff line numberDiff line change
@@ -55,9 +55,9 @@ Status CollectIterator::add_child(RowsetReaderSharedPtr rs_reader) {
5555
// then merged with the base rowset.
5656
void CollectIterator::build_heap(const std::vector<RowsetReaderSharedPtr>& rs_readers) {
5757
DCHECK(rs_readers.size() == _children.size());
58-
_reverse = _reader->_tablet->tablet_schema().keys_type() == KeysType::UNIQUE_KEYS;
59-
SortType sort_type = _reader->_tablet->tablet_schema().sort_type();
60-
int sort_col_num = _reader->_tablet->tablet_schema().sort_col_num();
58+
_reverse = _reader->_tablet_schema->keys_type() == KeysType::UNIQUE_KEYS;
59+
SortType sort_type = _reader->_tablet_schema->sort_type();
60+
int sort_col_num = _reader->_tablet_schema->sort_col_num();
6161
if (_children.empty()) {
6262
_inner_iter.reset(nullptr);
6363
return;
@@ -200,7 +200,7 @@ CollectIterator::Level0Iterator::Level0Iterator(RowsetReaderSharedPtr rs_reader,
200200
CollectIterator::Level0Iterator::~Level0Iterator() = default;
201201

202202
Status CollectIterator::Level0Iterator::init() {
203-
RETURN_NOT_OK_LOG(_row_cursor.init(_reader->_tablet->tablet_schema(), _reader->_seek_columns),
203+
RETURN_NOT_OK_LOG(_row_cursor.init(*_reader->_tablet_schema, _reader->_seek_columns),
204204
"failed to init row cursor");
205205
return (this->*_refresh_current_row)();
206206
}

be/src/olap/compaction.cpp

+12-7
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,10 @@
1717

1818
#include "olap/compaction.h"
1919

20+
#include "common/status.h"
2021
#include "gutil/strings/substitute.h"
22+
#include "olap/rowset/rowset_meta.h"
23+
#include "olap/tablet.h"
2124
#include "util/time.h"
2225
#include "util/trace.h"
2326

@@ -141,8 +144,10 @@ Status Compaction::do_compaction_impl(int64_t permits) {
141144

142145
LOG(INFO) << "start " << merge_type << compaction_name() << ". tablet=" << _tablet->full_name()
143146
<< ", output_version=" << _output_version << ", permits: " << permits;
147+
// get cur schema if rowset schema exists, rowset schema must be newer than tablet schema
148+
const TabletSchema cur_tablet_schema = _tablet->tablet_schema();
144149

145-
RETURN_NOT_OK(construct_output_rowset_writer());
150+
RETURN_NOT_OK(construct_output_rowset_writer(&cur_tablet_schema));
146151
RETURN_NOT_OK(construct_input_rowset_readers());
147152
TRACE("prepare finished");
148153

@@ -152,11 +157,11 @@ Status Compaction::do_compaction_impl(int64_t permits) {
152157
Status res;
153158

154159
if (use_vectorized_compaction) {
155-
res = Merger::vmerge_rowsets(_tablet, compaction_type(), _input_rs_readers,
156-
_output_rs_writer.get(), &stats);
160+
res = Merger::vmerge_rowsets(_tablet, compaction_type(), &cur_tablet_schema,
161+
_input_rs_readers, _output_rs_writer.get(), &stats);
157162
} else {
158-
res = Merger::merge_rowsets(_tablet, compaction_type(), _input_rs_readers,
159-
_output_rs_writer.get(), &stats);
163+
res = Merger::merge_rowsets(_tablet, compaction_type(), &cur_tablet_schema,
164+
_input_rs_readers, _output_rs_writer.get(), &stats);
160165
}
161166

162167
if (!res.ok()) {
@@ -219,8 +224,8 @@ Status Compaction::do_compaction_impl(int64_t permits) {
219224
return Status::OK();
220225
}
221226

222-
Status Compaction::construct_output_rowset_writer() {
223-
return _tablet->create_rowset_writer(_output_version, VISIBLE, NONOVERLAPPING,
227+
Status Compaction::construct_output_rowset_writer(const TabletSchema* schema) {
228+
return _tablet->create_rowset_writer(_output_version, VISIBLE, NONOVERLAPPING, schema,
224229
_oldest_write_timestamp, _newest_write_timestamp,
225230
&_output_rs_writer);
226231
}

be/src/olap/compaction.h

+1-1
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ class Compaction {
6767
Status modify_rowsets();
6868
void gc_output_rowset();
6969

70-
Status construct_output_rowset_writer();
70+
Status construct_output_rowset_writer(const TabletSchema* schema);
7171
Status construct_input_rowset_readers();
7272

7373
Status check_version_continuity(const std::vector<RowsetSharedPtr>& rowsets);

be/src/olap/data_dir.cpp

+14
Original file line numberDiff line numberDiff line change
@@ -485,6 +485,11 @@ Status DataDir::load() {
485485
<< " schema hash: " << rowset_meta->tablet_schema_hash()
486486
<< " for txn: " << rowset_meta->txn_id();
487487
}
488+
if (!rowset_meta->get_rowset_pb().has_tablet_schema()) {
489+
rowset_meta->set_tablet_schema(&tablet->tablet_schema());
490+
RowsetMetaManager::save(_meta, rowset_meta->tablet_uid(), rowset_meta->rowset_id(),
491+
rowset_meta->get_rowset_pb());
492+
}
488493
} else if (rowset_meta->rowset_state() == RowsetStatePB::VISIBLE &&
489494
rowset_meta->tablet_uid() == tablet->tablet_uid()) {
490495
Status publish_status = tablet->add_rowset(rowset);
@@ -506,6 +511,15 @@ Status DataDir::load() {
506511
++invalid_rowset_counter;
507512
}
508513
}
514+
515+
for (int64_t tablet_id : tablet_ids) {
516+
TabletSharedPtr tablet = _tablet_manager->get_tablet(tablet_id);
517+
if (tablet && tablet->set_tablet_schema_into_rowset_meta()) {
518+
TabletMetaManager::save(this, tablet->tablet_id(), tablet->schema_hash(),
519+
tablet->tablet_meta());
520+
}
521+
}
522+
509523
// At startup, we only count these invalid rowset, but do not actually delete it.
510524
// The actual delete operation is in StorageEngine::_clean_unused_rowset_metas,
511525
// which is cleaned up uniformly by the background cleanup thread.

be/src/olap/delta_writer.cpp

+20-7
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ DeltaWriter::DeltaWriter(WriteRequest* req, StorageEngine* storage_engine, bool
4040
_tablet(nullptr),
4141
_cur_rowset(nullptr),
4242
_rowset_writer(nullptr),
43-
_tablet_schema(nullptr),
43+
_tablet_schema(new TabletSchema),
4444
_delta_written_success(false),
4545
_storage_engine(storage_engine),
4646
_is_vec(is_vec) {}
@@ -121,10 +121,11 @@ Status DeltaWriter::init() {
121121
RETURN_NOT_OK(_storage_engine->txn_manager()->prepare_txn(_req.partition_id, _tablet,
122122
_req.txn_id, _req.load_id));
123123
}
124+
// build tablet schema in request level
125+
_build_current_tablet_schema(_req.index_id, _req.ptable_schema_param, _tablet->tablet_schema());
124126

125127
RETURN_NOT_OK(_tablet->create_rowset_writer(_req.txn_id, _req.load_id, PREPARED, OVERLAPPING,
126-
&_rowset_writer));
127-
_tablet_schema = &(_tablet->tablet_schema());
128+
_tablet_schema.get(), &_rowset_writer));
128129
_schema.reset(new Schema(*_tablet_schema));
129130
_reset_mem_table();
130131

@@ -172,7 +173,6 @@ Status DeltaWriter::write(const RowBatch* row_batch, const std::vector<int>& row
172173
if (_is_cancelled) {
173174
return Status::OLAPInternalError(OLAP_ERR_ALREADY_CANCELLED);
174175
}
175-
176176
for (const auto& row_idx : row_idxs) {
177177
_mem_table->insert(row_batch->get_row(row_idx)->get_tuple(0));
178178
}
@@ -266,9 +266,9 @@ Status DeltaWriter::wait_flush() {
266266
}
267267

268268
void DeltaWriter::_reset_mem_table() {
269-
_mem_table.reset(new MemTable(_tablet->tablet_id(), _schema.get(), _tablet_schema, _req.slots,
270-
_req.tuple_desc, _tablet->keys_type(), _rowset_writer.get(),
271-
_mem_tracker, _is_vec));
269+
_mem_table.reset(new MemTable(_tablet->tablet_id(), _schema.get(), _tablet_schema.get(),
270+
_req.slots, _req.tuple_desc, _tablet->keys_type(),
271+
_rowset_writer.get(), _mem_tracker, _is_vec));
272272
}
273273

274274
Status DeltaWriter::close() {
@@ -367,4 +367,17 @@ int64_t DeltaWriter::partition_id() const {
367367
return _req.partition_id;
368368
}
369369

370+
void DeltaWriter::_build_current_tablet_schema(int64_t index_id,
371+
const POlapTableSchemaParam& ptable_schema_param,
372+
const TabletSchema& ori_tablet_schema) {
373+
*_tablet_schema = ori_tablet_schema;
374+
//new tablet schema if new table
375+
if (ptable_schema_param.indexes_size() > 0 &&
376+
ptable_schema_param.indexes(0).columns_desc_size() != 0 &&
377+
ptable_schema_param.indexes(0).columns_desc(0).unique_id() >= 0) {
378+
_tablet_schema->build_current_tablet_schema(index_id, ptable_schema_param,
379+
ori_tablet_schema);
380+
}
381+
}
382+
370383
} // namespace doris

be/src/olap/delta_writer.h

+11-1
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,8 @@ struct WriteRequest {
4747
// slots are in order of tablet's schema
4848
const std::vector<SlotDescriptor*>* slots;
4949
bool is_high_priority = false;
50+
POlapTableSchemaParam ptable_schema_param;
51+
int64_t index_id;
5052
};
5153

5254
// Writer for a particular (load, index, tablet).
@@ -107,6 +109,10 @@ class DeltaWriter {
107109

108110
void _reset_mem_table();
109111

112+
void _build_current_tablet_schema(int64_t index_id,
113+
const POlapTableSchemaParam& table_schema_param,
114+
const TabletSchema& ori_tablet_schema);
115+
110116
bool _is_init = false;
111117
bool _is_cancelled = false;
112118
WriteRequest _req;
@@ -116,7 +122,11 @@ class DeltaWriter {
116122
// TODO: Recheck the lifetime of _mem_table, Look should use unique_ptr
117123
std::shared_ptr<MemTable> _mem_table;
118124
std::unique_ptr<Schema> _schema;
119-
const TabletSchema* _tablet_schema;
125+
//const TabletSchema* _tablet_schema;
126+
// tablet schema owned by delta writer, all writes will use this tablet schema
127+
// it's built from tablet_schema(stored when create tablet) and OlapTableSchema
128+
// every request will have its own tablet schema so simple schema change can work
129+
std::unique_ptr<TabletSchema> _tablet_schema;
120130
bool _delta_written_success;
121131

122132
StorageEngine* _storage_engine;

0 commit comments

Comments
 (0)