Skip to content

Commit f918a23

Browse files
committed
[improvement](light-schema-change) Support tablet schema cache
1 parent 388db05 commit f918a23

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

47 files changed

+481
-342
lines changed

be/src/exec/olap_scanner.cpp

+2-2
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ Status OlapScanner::prepare(
8383
LOG(WARNING) << ss.str();
8484
return Status::InternalError(ss.str());
8585
}
86-
_tablet_schema = _tablet->tablet_schema();
86+
_tablet_schema.copy_from(*_tablet->tablet_schema());
8787
if (!_parent->_olap_scan_node.columns_desc.empty() &&
8888
_parent->_olap_scan_node.columns_desc[0].col_unique_id >= 0) {
8989
_tablet_schema.clear_columns();
@@ -289,7 +289,7 @@ Status OlapScanner::_init_return_columns(bool need_seq_col) {
289289
}
290290

291291
// expand the sequence column
292-
if (_tablet->tablet_schema().has_sequence_col() && need_seq_col) {
292+
if (_tablet_schema.has_sequence_col() && need_seq_col) {
293293
bool has_replace_col = false;
294294
for (auto col : _return_columns) {
295295
if (_tablet_schema.column(col).aggregation() ==

be/src/olap/base_tablet.cpp

+5-6
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
#include "gutil/strings/substitute.h"
2121
#include "olap/data_dir.h"
22+
#include "olap/tablet_schema_cache.h"
2223
#include "util/doris_metrics.h"
2324
#include "util/path_util.h"
2425

@@ -29,10 +30,8 @@ extern MetricPrototype METRIC_query_scan_rows;
2930
extern MetricPrototype METRIC_query_scan_count;
3031

3132
BaseTablet::BaseTablet(TabletMetaSharedPtr tablet_meta, DataDir* data_dir)
32-
: _state(tablet_meta->tablet_state()),
33-
_tablet_meta(tablet_meta),
34-
_schema(tablet_meta->tablet_schema()),
35-
_data_dir(data_dir) {
33+
: _state(tablet_meta->tablet_state()), _tablet_meta(tablet_meta), _data_dir(data_dir) {
34+
_schema = TabletSchemaCache::instance()->insert(_tablet_meta->tablet_schema().to_key());
3635
_gen_tablet_path();
3736

3837
std::stringstream ss;
@@ -72,8 +71,8 @@ void BaseTablet::_gen_tablet_path() {
7271
bool BaseTablet::set_tablet_schema_into_rowset_meta() {
7372
bool flag = false;
7473
for (RowsetMetaSharedPtr rowset_meta : _tablet_meta->all_mutable_rs_metas()) {
75-
if (!rowset_meta->get_rowset_pb().has_tablet_schema()) {
76-
rowset_meta->set_tablet_schema(&_schema);
74+
if (!rowset_meta->tablet_schema()) {
75+
rowset_meta->set_tablet_schema(_schema);
7776
flag = true;
7877
}
7978
}

be/src/olap/base_tablet.h

+4-3
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222

2323
#include "olap/olap_define.h"
2424
#include "olap/tablet_meta.h"
25+
#include "olap/tablet_schema.h"
2526
#include "olap/utils.h"
2627
#include "util/metrics.h"
2728

@@ -64,7 +65,7 @@ class BaseTablet : public std::enable_shared_from_this<BaseTablet> {
6465
void set_storage_policy(const std::string& policy) { _tablet_meta->set_storage_policy(policy); }
6566

6667
// properties encapsulated in TabletSchema
67-
virtual const TabletSchema& tablet_schema() const;
68+
virtual TabletSchemaSPtr tablet_schema() const;
6869

6970
bool set_tablet_schema_into_rowset_meta();
7071

@@ -74,7 +75,7 @@ class BaseTablet : public std::enable_shared_from_this<BaseTablet> {
7475
protected:
7576
TabletState _state;
7677
TabletMetaSharedPtr _tablet_meta;
77-
const TabletSchema& _schema;
78+
TabletSchemaSPtr _schema;
7879

7980
DataDir* _data_dir;
8081
std::string _tablet_path;
@@ -145,7 +146,7 @@ inline bool BaseTablet::equal(int64_t id, int32_t hash) {
145146
return (tablet_id() == id) && (schema_hash() == hash);
146147
}
147148

148-
inline const TabletSchema& BaseTablet::tablet_schema() const {
149+
inline TabletSchemaSPtr BaseTablet::tablet_schema() const {
149150
return _schema;
150151
}
151152

be/src/olap/compaction.cpp

+10-5
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
#include "common/status.h"
2121
#include "gutil/strings/substitute.h"
22+
#include "olap/rowset/rowset.h"
2223
#include "olap/rowset/rowset_meta.h"
2324
#include "olap/tablet.h"
2425
#include "util/time.h"
@@ -150,9 +151,13 @@ Status Compaction::do_compaction_impl(int64_t permits) {
150151
LOG(INFO) << "start " << merge_type << compaction_name() << ". tablet=" << _tablet->full_name()
151152
<< ", output_version=" << _output_version << ", permits: " << permits;
152153
// get cur schema: if a rowset schema exists, the rowset schema must be newer than the tablet schema
153-
const TabletSchema cur_tablet_schema = _tablet->tablet_schema();
154+
std::vector<RowsetMetaSharedPtr> rowset_metas(_input_rowsets.size());
155+
std::transform(_input_rowsets.begin(), _input_rowsets.end(), rowset_metas.begin(),
156+
[](const RowsetSharedPtr& rowset) { return rowset->rowset_meta(); });
157+
TabletSchemaSPtr cur_tablet_schema =
158+
_tablet->rowset_meta_with_max_schema_version(rowset_metas)->tablet_schema();
154159

155-
RETURN_NOT_OK(construct_output_rowset_writer(&cur_tablet_schema));
160+
RETURN_NOT_OK(construct_output_rowset_writer(cur_tablet_schema));
156161
RETURN_NOT_OK(construct_input_rowset_readers());
157162
TRACE("prepare finished");
158163

@@ -166,10 +171,10 @@ Status Compaction::do_compaction_impl(int64_t permits) {
166171
}
167172

168173
if (use_vectorized_compaction) {
169-
res = Merger::vmerge_rowsets(_tablet, compaction_type(), &cur_tablet_schema,
174+
res = Merger::vmerge_rowsets(_tablet, compaction_type(), cur_tablet_schema.get(),
170175
_input_rs_readers, _output_rs_writer.get(), &stats);
171176
} else {
172-
res = Merger::merge_rowsets(_tablet, compaction_type(), &cur_tablet_schema,
177+
res = Merger::merge_rowsets(_tablet, compaction_type(), cur_tablet_schema.get(),
173178
_input_rs_readers, _output_rs_writer.get(), &stats);
174179
}
175180

@@ -233,7 +238,7 @@ Status Compaction::do_compaction_impl(int64_t permits) {
233238
return Status::OK();
234239
}
235240

236-
Status Compaction::construct_output_rowset_writer(const TabletSchema* schema) {
241+
Status Compaction::construct_output_rowset_writer(TabletSchemaSPtr schema) {
237242
return _tablet->create_rowset_writer(_output_version, VISIBLE, NONOVERLAPPING, schema,
238243
_oldest_write_timestamp, _newest_write_timestamp,
239244
&_output_rs_writer);

be/src/olap/compaction.h

+1-1
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ class Compaction {
6565
Status modify_rowsets();
6666
void gc_output_rowset();
6767

68-
Status construct_output_rowset_writer(const TabletSchema* schema);
68+
Status construct_output_rowset_writer(TabletSchemaSPtr schema);
6969
Status construct_input_rowset_readers();
7070

7171
Status check_version_continuity(const std::vector<RowsetSharedPtr>& rowsets);

be/src/olap/data_dir.cpp

+5-4
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
#include "olap/data_dir.h"
1919

2020
#include <ctype.h>
21+
#include <gen_cpp/olap_file.pb.h>
2122
#include <mntent.h>
2223
#include <stdio.h>
2324
#include <sys/file.h>
@@ -475,8 +476,8 @@ Status DataDir::load() {
475476
}
476477
if (rowset_meta->rowset_state() == RowsetStatePB::COMMITTED &&
477478
rowset_meta->tablet_uid() == tablet->tablet_uid()) {
478-
if (!rowset_meta->get_rowset_pb().has_tablet_schema()) {
479-
rowset_meta->set_tablet_schema(&tablet->tablet_schema());
479+
if (!rowset_meta->tablet_schema()) {
480+
rowset_meta->set_tablet_schema(tablet->tablet_schema());
480481
RowsetMetaManager::save(_meta, rowset_meta->tablet_uid(), rowset_meta->rowset_id(),
481482
rowset_meta->get_rowset_pb());
482483
}
@@ -498,8 +499,8 @@ Status DataDir::load() {
498499
}
499500
} else if (rowset_meta->rowset_state() == RowsetStatePB::VISIBLE &&
500501
rowset_meta->tablet_uid() == tablet->tablet_uid()) {
501-
if (!rowset_meta->get_rowset_pb().has_tablet_schema()) {
502-
rowset_meta->set_tablet_schema(&tablet->tablet_schema());
502+
if (!rowset_meta->tablet_schema()) {
503+
rowset_meta->set_tablet_schema(tablet->tablet_schema());
503504
RowsetMetaManager::save(_meta, rowset_meta->tablet_uid(), rowset_meta->rowset_id(),
504505
rowset_meta->get_rowset_pb());
505506
}

be/src/olap/delta_writer.cpp

+4-3
Original file line numberDiff line numberDiff line change
@@ -123,10 +123,11 @@ Status DeltaWriter::init() {
123123
_req.txn_id, _req.load_id));
124124
}
125125
// build tablet schema in request level
126-
_build_current_tablet_schema(_req.index_id, _req.ptable_schema_param, _tablet->tablet_schema());
126+
_build_current_tablet_schema(_req.index_id, _req.ptable_schema_param,
127+
*_tablet->tablet_schema());
127128

128129
RETURN_NOT_OK(_tablet->create_rowset_writer(_req.txn_id, _req.load_id, PREPARED, OVERLAPPING,
129-
_tablet_schema.get(), &_rowset_writer));
130+
_tablet_schema, &_rowset_writer));
130131
_schema.reset(new Schema(*_tablet_schema));
131132
_reset_mem_table();
132133

@@ -379,7 +380,7 @@ int64_t DeltaWriter::partition_id() const {
379380
void DeltaWriter::_build_current_tablet_schema(int64_t index_id,
380381
const POlapTableSchemaParam& ptable_schema_param,
381382
const TabletSchema& ori_tablet_schema) {
382-
*_tablet_schema = ori_tablet_schema;
383+
_tablet_schema->copy_from(ori_tablet_schema);
383384
// new tablet schema if new table
384385
if (ptable_schema_param.indexes_size() > 0 &&
385386
ptable_schema_param.indexes(0).columns_desc_size() != 0 &&

be/src/olap/delta_writer.h

+1-1
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,7 @@ class DeltaWriter {
128128
// tablet schema owned by delta writer, all write will use this tablet schema
129129
// it's built from tablet_schema (stored when the tablet is created) and OlapTableSchema
130130
// every request will have its own tablet schema so simple schema change can work
131-
std::unique_ptr<TabletSchema> _tablet_schema;
131+
TabletSchemaSPtr _tablet_schema;
132132
bool _delta_written_success;
133133

134134
StorageEngine* _storage_engine;

be/src/olap/push_handler.cpp

+13-11
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
#include "olap/schema_change.h"
3232
#include "olap/storage_engine.h"
3333
#include "olap/tablet.h"
34+
#include "olap/tablet_schema.h"
3435
#include "runtime/exec_env.h"
3536

3637
namespace doris {
@@ -115,7 +116,8 @@ Status PushHandler::_do_streaming_ingestion(TabletSharedPtr tablet, const TPushR
115116
}
116117

117118
DeletePredicatePB del_pred;
118-
auto tablet_schema = tablet_var.tablet->tablet_schema();
119+
TabletSchema tablet_schema;
120+
tablet_schema.copy_from(*tablet_var.tablet->tablet_schema());
119121
if (!request.columns_desc.empty() && request.columns_desc[0].col_unique_id >= 0) {
120122
tablet_schema.clear_columns();
121123
for (const auto& column_desc : request.columns_desc) {
@@ -141,25 +143,25 @@ Status PushHandler::_do_streaming_ingestion(TabletSharedPtr tablet, const TPushR
141143
<< ". tablet: " << tablet_vars->at(0).tablet->full_name();
142144
return Status::OLAPInternalError(OLAP_ERR_TOO_MANY_VERSION);
143145
}
144-
145-
auto tablet_schema = tablet_vars->at(0).tablet->tablet_schema();
146+
auto tablet_schema = std::make_shared<TabletSchema>();
147+
tablet_schema->copy_from(*tablet_vars->at(0).tablet->tablet_schema());
146148
if (!request.columns_desc.empty() && request.columns_desc[0].col_unique_id >= 0) {
147-
tablet_schema.clear_columns();
149+
tablet_schema->clear_columns();
148150
for (const auto& column_desc : request.columns_desc) {
149-
tablet_schema.append_column(TabletColumn(column_desc));
151+
tablet_schema->append_column(TabletColumn(column_desc));
150152
}
151153
}
152154

153155
// writes
154156
if (push_type == PUSH_NORMAL_V2) {
155157
res = _convert_v2(tablet_vars->at(0).tablet, tablet_vars->at(1).tablet,
156158
&(tablet_vars->at(0).rowset_to_add), &(tablet_vars->at(1).rowset_to_add),
157-
&tablet_schema);
159+
tablet_schema);
158160

159161
} else {
160162
res = _convert(tablet_vars->at(0).tablet, tablet_vars->at(1).tablet,
161163
&(tablet_vars->at(0).rowset_to_add), &(tablet_vars->at(1).rowset_to_add),
162-
&tablet_schema);
164+
tablet_schema);
163165
}
164166
if (!res.ok()) {
165167
LOG(WARNING) << "fail to convert tmp file when realtime push. res=" << res
@@ -219,7 +221,7 @@ void PushHandler::_get_tablet_infos(const std::vector<TabletVars>& tablet_vars,
219221

220222
Status PushHandler::_convert_v2(TabletSharedPtr cur_tablet, TabletSharedPtr new_tablet,
221223
RowsetSharedPtr* cur_rowset, RowsetSharedPtr* new_rowset,
222-
const TabletSchema* tablet_schema) {
224+
TabletSchemaSPtr tablet_schema) {
223225
Status res = Status::OK();
224226
uint32_t num_rows = 0;
225227
PUniqueId load_id;
@@ -344,7 +346,7 @@ Status PushHandler::_convert_v2(TabletSharedPtr cur_tablet, TabletSharedPtr new_
344346

345347
Status PushHandler::_convert(TabletSharedPtr cur_tablet, TabletSharedPtr new_tablet,
346348
RowsetSharedPtr* cur_rowset, RowsetSharedPtr* new_rowset,
347-
const TabletSchema* tablet_schema) {
349+
TabletSchemaSPtr tablet_schema) {
348350
Status res = Status::OK();
349351
RowCursor row;
350352
BinaryFile raw_file;
@@ -515,7 +517,7 @@ IBinaryReader* IBinaryReader::create(bool need_decompress) {
515517

516518
BinaryReader::BinaryReader() : _row_buf(nullptr), _row_buf_size(0) {}
517519

518-
Status BinaryReader::init(const TabletSchema* tablet_schema, BinaryFile* file) {
520+
Status BinaryReader::init(TabletSchemaSPtr tablet_schema, BinaryFile* file) {
519521
Status res = Status::OK();
520522

521523
do {
@@ -657,7 +659,7 @@ LzoBinaryReader::LzoBinaryReader()
657659
_row_num(0),
658660
_next_row_start(0) {}
659661

660-
Status LzoBinaryReader::init(const TabletSchema* tablet_schema, BinaryFile* file) {
662+
Status LzoBinaryReader::init(TabletSchemaSPtr tablet_schema, BinaryFile* file) {
661663
Status res = Status::OK();
662664

663665
do {

be/src/olap/push_handler.h

+7-6
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
#include "olap/olap_common.h"
3131
#include "olap/row_cursor.h"
3232
#include "olap/rowset/rowset.h"
33+
#include "olap/tablet_schema.h"
3334

3435
namespace doris {
3536

@@ -61,12 +62,12 @@ class PushHandler {
6162
private:
6263
Status _convert_v2(TabletSharedPtr cur_tablet, TabletSharedPtr new_tablet_vec,
6364
RowsetSharedPtr* cur_rowset, RowsetSharedPtr* new_rowset,
64-
const TabletSchema* tablet_schema);
65+
TabletSchemaSPtr tablet_schema);
6566
// Convert local data file to internal formatted delta,
6667
// return new delta's SegmentGroup
6768
Status _convert(TabletSharedPtr cur_tablet, TabletSharedPtr new_tablet_vec,
6869
RowsetSharedPtr* cur_rowset, RowsetSharedPtr* new_rowset,
69-
const TabletSchema* tablet_schema);
70+
TabletSchemaSPtr tablet_schema);
7071

7172
// Only for debug
7273
std::string _debug_version_list(const Versions& versions) const;
@@ -114,7 +115,7 @@ class IBinaryReader {
114115
static IBinaryReader* create(bool need_decompress);
115116
virtual ~IBinaryReader() = default;
116117

117-
virtual Status init(const TabletSchema* tablet_schema, BinaryFile* file) = 0;
118+
virtual Status init(TabletSchemaSPtr tablet_schema, BinaryFile* file) = 0;
118119
virtual Status finalize() = 0;
119120

120121
virtual Status next(RowCursor* row) = 0;
@@ -133,7 +134,7 @@ class IBinaryReader {
133134
_ready(false) {}
134135

135136
BinaryFile* _file;
136-
const TabletSchema* _tablet_schema;
137+
TabletSchemaSPtr _tablet_schema;
137138
size_t _content_len;
138139
size_t _curr;
139140
uint32_t _adler_checksum;
@@ -146,7 +147,7 @@ class BinaryReader : public IBinaryReader {
146147
explicit BinaryReader();
147148
~BinaryReader() override { finalize(); }
148149

149-
Status init(const TabletSchema* tablet_schema, BinaryFile* file) override;
150+
Status init(TabletSchemaSPtr tablet_schema, BinaryFile* file) override;
150151
Status finalize() override;
151152

152153
Status next(RowCursor* row) override;
@@ -163,7 +164,7 @@ class LzoBinaryReader : public IBinaryReader {
163164
explicit LzoBinaryReader();
164165
~LzoBinaryReader() override { finalize(); }
165166

166-
Status init(const TabletSchema* tablet_schema, BinaryFile* file) override;
167+
Status init(TabletSchemaSPtr tablet_schema, BinaryFile* file) override;
167168
Status finalize() override;
168169

169170
Status next(RowCursor* row) override;

be/src/olap/rowset/beta_rowset.cpp

+2-1
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
#include "io/fs/s3_file_system.h"
2929
#include "olap/olap_define.h"
3030
#include "olap/rowset/beta_rowset_reader.h"
31+
#include "olap/tablet_schema.h"
3132
#include "olap/utils.h"
3233
#include "util/doris_metrics.h"
3334

@@ -59,7 +60,7 @@ std::string BetaRowset::remote_segment_path(int64_t tablet_id, const RowsetId& r
5960
segment_id);
6061
}
6162

62-
BetaRowset::BetaRowset(const TabletSchema* schema, const std::string& tablet_path,
63+
BetaRowset::BetaRowset(TabletSchemaSPtr schema, const std::string& tablet_path,
6364
RowsetMetaSharedPtr rowset_meta)
6465
: Rowset(schema, tablet_path, std::move(rowset_meta)) {}
6566

be/src/olap/rowset/beta_rowset.h

+1-1
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ class BetaRowset : public Rowset {
7777
Status load_segments(std::vector<segment_v2::SegmentSharedPtr>* segments);
7878

7979
protected:
80-
BetaRowset(const TabletSchema* schema, const std::string& tablet_path,
80+
BetaRowset(TabletSchemaSPtr schema, const std::string& tablet_path,
8181
RowsetMetaSharedPtr rowset_meta);
8282

8383
// init segment groups

be/src/olap/rowset/rowset.cpp

+4-2
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,13 @@
1717

1818
#include "olap/rowset/rowset.h"
1919

20+
#include "olap/tablet_schema.h"
21+
#include "olap/tablet_schema_cache.h"
2022
#include "util/time.h"
2123

2224
namespace doris {
2325

24-
Rowset::Rowset(const TabletSchema* schema, const std::string& tablet_path,
26+
Rowset::Rowset(TabletSchemaSPtr schema, const std::string& tablet_path,
2527
RowsetMetaSharedPtr rowset_meta)
2628
: _tablet_path(tablet_path), _rowset_meta(std::move(rowset_meta)), _refs_by_reader(0) {
2729
_is_pending = !_rowset_meta->has_version();
@@ -32,7 +34,7 @@ Rowset::Rowset(const TabletSchema* schema, const std::string& tablet_path,
3234
_is_cumulative = version.first != version.second;
3335
}
3436
// build schema from RowsetMeta.tablet_schema or Tablet.tablet_schema
35-
_schema = _rowset_meta->tablet_schema() != nullptr ? _rowset_meta->tablet_schema() : schema;
37+
_schema = _rowset_meta->tablet_schema() ? _rowset_meta->tablet_schema() : schema;
3638
}
3739

3840
Status Rowset::load(bool use_cache) {

0 commit comments

Comments
 (0)