Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[refactor](schema change) Using tablet schema shared ptr instead of raw ptr #11475

Merged
merged 27 commits into from
Aug 5, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 12 additions & 11 deletions be/src/exec/olap_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ OlapScanner::OlapScanner(RuntimeState* runtime_state, OlapScanNode* parent, bool
_need_agg_finalize(need_agg_finalize),
_version(-1) {
_mem_tracker = tracker;
_tablet_schema = std::make_shared<TabletSchema>();
}

Status OlapScanner::prepare(
Expand All @@ -79,12 +80,12 @@ Status OlapScanner::prepare(
LOG(WARNING) << ss.str();
return Status::InternalError(ss.str());
}
_tablet_schema.copy_from(*_tablet->tablet_schema());
_tablet_schema->copy_from(*_tablet->tablet_schema());
if (!_parent->_olap_scan_node.columns_desc.empty() &&
_parent->_olap_scan_node.columns_desc[0].col_unique_id >= 0) {
_tablet_schema.clear_columns();
_tablet_schema->clear_columns();
for (const auto& column_desc : _parent->_olap_scan_node.columns_desc) {
_tablet_schema.append_column(TabletColumn(column_desc));
_tablet_schema->append_column(TabletColumn(column_desc));
}
}
{
Expand Down Expand Up @@ -189,7 +190,7 @@ Status OlapScanner::_init_tablet_reader_params(
RETURN_IF_ERROR(_init_return_columns(!_tablet_reader_params.direct_mode));

_tablet_reader_params.tablet = _tablet;
_tablet_reader_params.tablet_schema = &_tablet_schema;
_tablet_reader_params.tablet_schema = _tablet_schema;
_tablet_reader_params.reader_type = READER_QUERY;
_tablet_reader_params.aggregation = _aggregation;
_tablet_reader_params.version = Version(0, _version);
Expand Down Expand Up @@ -234,7 +235,7 @@ Status OlapScanner::_init_tablet_reader_params(
_tablet_reader_params.return_columns.push_back(i);
}
for (auto index : _return_columns) {
if (_tablet_schema.column(index).is_key()) {
if (_tablet_schema->column(index).is_key()) {
continue;
} else {
_tablet_reader_params.return_columns.push_back(index);
Expand Down Expand Up @@ -270,31 +271,31 @@ Status OlapScanner::_init_return_columns(bool need_seq_col) {
continue;
}
int32_t index = slot->col_unique_id() >= 0
? _tablet_schema.field_index(slot->col_unique_id())
: _tablet_schema.field_index(slot->col_name());
? _tablet_schema->field_index(slot->col_unique_id())
: _tablet_schema->field_index(slot->col_name());
if (index < 0) {
std::stringstream ss;
ss << "field name is invalid. field=" << slot->col_name();
LOG(WARNING) << ss.str();
return Status::InternalError(ss.str());
}
_return_columns.push_back(index);
if (slot->is_nullable() && !_tablet_schema.column(index).is_nullable())
if (slot->is_nullable() && !_tablet_schema->column(index).is_nullable())
_tablet_columns_convert_to_null_set.emplace(index);
_query_slots.push_back(slot);
}

// expand the sequence column
if (_tablet_schema.has_sequence_col() && need_seq_col) {
if (_tablet_schema->has_sequence_col() && need_seq_col) {
bool has_replace_col = false;
for (auto col : _return_columns) {
if (_tablet_schema.column(col).aggregation() ==
if (_tablet_schema->column(col).aggregation() ==
FieldAggregationMethod::OLAP_FIELD_AGGREGATION_REPLACE) {
has_replace_col = true;
break;
}
}
if (auto sequence_col_idx = _tablet_schema.sequence_col_idx();
if (auto sequence_col_idx = _tablet_schema->sequence_col_idx();
has_replace_col && std::find(_return_columns.begin(), _return_columns.end(),
sequence_col_idx) == _return_columns.end()) {
_return_columns.push_back(sequence_col_idx);
Expand Down
2 changes: 1 addition & 1 deletion be/src/exec/olap_scanner.h
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ class OlapScanner {

MemTracker* _mem_tracker;

TabletSchema _tablet_schema;
TabletSchemaSPtr _tablet_schema;
};

} // namespace doris
2 changes: 1 addition & 1 deletion be/src/olap/collect_iterator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ CollectIterator::Level0Iterator::Level0Iterator(RowsetReaderSharedPtr rs_reader,
CollectIterator::Level0Iterator::~Level0Iterator() = default;

Status CollectIterator::Level0Iterator::init() {
RETURN_NOT_OK_LOG(_row_cursor.init(*_reader->_tablet_schema, _reader->_seek_columns),
RETURN_NOT_OK_LOG(_row_cursor.init(_reader->_tablet_schema, _reader->_seek_columns),
"failed to init row cursor");
return (this->*_refresh_current_row)();
}
Expand Down
4 changes: 2 additions & 2 deletions be/src/olap/compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -171,10 +171,10 @@ Status Compaction::do_compaction_impl(int64_t permits) {
}

if (use_vectorized_compaction) {
res = Merger::vmerge_rowsets(_tablet, compaction_type(), cur_tablet_schema.get(),
res = Merger::vmerge_rowsets(_tablet, compaction_type(), cur_tablet_schema,
_input_rs_readers, _output_rs_writer.get(), &stats);
} else {
res = Merger::merge_rowsets(_tablet, compaction_type(), cur_tablet_schema.get(),
res = Merger::merge_rowsets(_tablet, compaction_type(), cur_tablet_schema,
_input_rs_readers, _output_rs_writer.get(), &stats);
}

Expand Down
4 changes: 2 additions & 2 deletions be/src/olap/delete_handler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,7 @@ bool DeleteHandler::_parse_condition(const std::string& condition_str, TConditio
return true;
}

Status DeleteHandler::init(const TabletSchema& schema,
Status DeleteHandler::init(TabletSchemaSPtr schema,
const std::vector<DeletePredicatePB>& delete_conditions, int64_t version,
const TabletReader* reader) {
DCHECK(!_is_inited) << "reinitialize delete handler.";
Expand All @@ -258,7 +258,7 @@ Status DeleteHandler::init(const TabletSchema& schema,
return Status::OLAPInternalError(OLAP_ERR_MALLOC_ERROR);
}

temp.del_cond->set_tablet_schema(&schema);
temp.del_cond->set_tablet_schema(schema);
for (const auto& sub_predicate : delete_condition.sub_predicates()) {
TCondition condition;
if (!_parse_condition(sub_predicate, &condition)) {
Expand Down
3 changes: 2 additions & 1 deletion be/src/olap/delete_handler.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include "olap/block_column_predicate.h"
#include "olap/column_predicate.h"
#include "olap/olap_define.h"
#include "olap/tablet_schema.h"

namespace doris {

Expand Down Expand Up @@ -89,7 +90,7 @@ class DeleteHandler {
// return:
// * Status::OLAPInternalError(OLAP_ERR_DELETE_INVALID_PARAMETERS): input parameters are not valid
// * Status::OLAPInternalError(OLAP_ERR_MALLOC_ERROR): alloc memory failed
Status init(const TabletSchema& schema, const std::vector<DeletePredicatePB>& delete_conditions,
Status init(TabletSchemaSPtr schema, const std::vector<DeletePredicatePB>& delete_conditions,
int64_t version, const doris::TabletReader* = nullptr);

// Return the delete conditions' size.
Expand Down
2 changes: 1 addition & 1 deletion be/src/olap/delta_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ Status DeltaWriter::init() {

RETURN_NOT_OK(_tablet->create_rowset_writer(_req.txn_id, _req.load_id, PREPARED, OVERLAPPING,
_tablet_schema, &_rowset_writer));
_schema.reset(new Schema(*_tablet_schema));
_schema.reset(new Schema(_tablet_schema));
_reset_mem_table();

// create flush handler
Expand Down
2 changes: 1 addition & 1 deletion be/src/olap/iterators.h
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ class StorageReadOptions {
bool use_page_cache = false;
int block_row_max = 4096;

const TabletSchema* tablet_schema = nullptr;
TabletSchemaSPtr tablet_schema = nullptr;
bool record_rowids = false;
};

Expand Down
13 changes: 6 additions & 7 deletions be/src/olap/merger.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
namespace doris {

Status Merger::merge_rowsets(TabletSharedPtr tablet, ReaderType reader_type,
const TabletSchema* cur_tablet_schema,
TabletSchemaSPtr cur_tablet_schema,
const std::vector<RowsetReaderSharedPtr>& src_rowset_readers,
RowsetWriter* dst_rowset_writer, Merger::Statistics* stats_output) {
TRACE_COUNTER_SCOPE_LATENCY_US("merge_rowsets_latency_us");
Expand All @@ -47,9 +47,9 @@ Status Merger::merge_rowsets(TabletSharedPtr tablet, ReaderType reader_type,

RowCursor row_cursor;
RETURN_NOT_OK_LOG(
row_cursor.init(*cur_tablet_schema),
row_cursor.init(cur_tablet_schema),
"failed to init row cursor when merging rowsets of tablet " + tablet->full_name());
row_cursor.allocate_memory_for_string_type(*cur_tablet_schema);
row_cursor.allocate_memory_for_string_type(cur_tablet_schema);

std::unique_ptr<MemPool> mem_pool(new MemPool());

Expand Down Expand Up @@ -91,7 +91,7 @@ Status Merger::merge_rowsets(TabletSharedPtr tablet, ReaderType reader_type,
}

Status Merger::vmerge_rowsets(TabletSharedPtr tablet, ReaderType reader_type,
const TabletSchema* cur_tablet_schema,
TabletSchemaSPtr cur_tablet_schema,
const std::vector<RowsetReaderSharedPtr>& src_rowset_readers,
RowsetWriter* dst_rowset_writer, Statistics* stats_output) {
TRACE_COUNTER_SCOPE_LATENCY_US("merge_rowsets_latency_us");
Expand All @@ -108,8 +108,7 @@ Status Merger::vmerge_rowsets(TabletSharedPtr tablet, ReaderType reader_type,
reader_params.record_rowids = true;
}

const auto& schema = *cur_tablet_schema;
reader_params.return_columns.resize(schema.num_columns());
reader_params.return_columns.resize(cur_tablet_schema->num_columns());
std::iota(reader_params.return_columns.begin(), reader_params.return_columns.end(), 0);
reader_params.origin_return_columns = &reader_params.return_columns;
RETURN_NOT_OK(reader.init(reader_params));
Expand All @@ -124,7 +123,7 @@ Status Merger::vmerge_rowsets(TabletSharedPtr tablet, ReaderType reader_type,
}
}

vectorized::Block block = schema.create_block(reader_params.return_columns);
vectorized::Block block = cur_tablet_schema->create_block(reader_params.return_columns);
size_t output_rows = 0;
bool eof = false;
while (!eof) {
Expand Down
4 changes: 2 additions & 2 deletions be/src/olap/merger.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,12 @@ class Merger {
// return OLAP_SUCCESS and set statistics into `*stats_output`.
// return others on error
static Status merge_rowsets(TabletSharedPtr tablet, ReaderType reader_type,
const TabletSchema* cur_tablet_schema,
TabletSchemaSPtr cur_tablet_schema,
const std::vector<RowsetReaderSharedPtr>& src_rowset_readers,
RowsetWriter* dst_rowset_writer, Statistics* stats_output);

static Status vmerge_rowsets(TabletSharedPtr tablet, ReaderType reader_type,
const TabletSchema* cur_tablet_schema,
TabletSchemaSPtr cur_tablet_schema,
const std::vector<RowsetReaderSharedPtr>& src_rowset_readers,
RowsetWriter* dst_rowset_writer, Statistics* stats_output);
};
Expand Down
4 changes: 2 additions & 2 deletions be/src/olap/olap_cond.h
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ class Conditions {
bool empty() const { return _columns.empty(); }

// TODO(yingchun): should do it in constructor
void set_tablet_schema(const TabletSchema* schema) { _schema = schema; }
void set_tablet_schema(TabletSchemaSPtr schema) { _schema = schema; }

// 如果成功,则_columns中增加一项,如果失败则无视此condition,同时输出日志
// 对于下列情况,将不会被处理
Expand All @@ -176,7 +176,7 @@ class Conditions {
}

private:
const TabletSchema* _schema = nullptr;
TabletSchemaSPtr _schema = nullptr;
// CondColumns in _index_conds are in 'AND' relationship
CondColumns _columns; // list of condition column

Expand Down
4 changes: 2 additions & 2 deletions be/src/olap/push_handler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ Status PushHandler::_convert_v2(TabletSharedPtr cur_tablet, RowsetSharedPtr* cur
}

// init schema
std::unique_ptr<Schema> schema(new (std::nothrow) Schema(*tablet_schema));
std::unique_ptr<Schema> schema(new (std::nothrow) Schema(tablet_schema));
if (schema == nullptr) {
LOG(WARNING) << "fail to create schema. tablet=" << cur_tablet->full_name();
res = Status::OLAPInternalError(OLAP_ERR_MALLOC_ERROR);
Expand Down Expand Up @@ -363,7 +363,7 @@ Status PushHandler::_convert(TabletSharedPtr cur_tablet, RowsetSharedPtr* cur_ro
<< ", block_row_size=" << cur_tablet->num_rows_per_row_block();

// 4. Init RowCursor
if (!(res = row.init(*tablet_schema))) {
if (!(res = row.init(tablet_schema))) {
LOG(WARNING) << "fail to init rowcursor. res=" << res;
break;
}
Expand Down
6 changes: 3 additions & 3 deletions be/src/olap/reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -402,7 +402,7 @@ Status TabletReader::_init_keys_param(const ReaderParams& read_params) {
}

Status res = _keys_param.start_keys[i].init_scan_key(
*_tablet_schema, read_params.start_key[i].values(), schema);
_tablet_schema, read_params.start_key[i].values(), schema);
if (!res.ok()) {
LOG(WARNING) << "fail to init row cursor. res = " << res;
return res;
Expand All @@ -424,7 +424,7 @@ Status TabletReader::_init_keys_param(const ReaderParams& read_params) {
return Status::OLAPInternalError(OLAP_ERR_INPUT_PARAMETER_ERROR);
}

Status res = _keys_param.end_keys[i].init_scan_key(*_tablet_schema,
Status res = _keys_param.end_keys[i].init_scan_key(_tablet_schema,
read_params.end_key[i].values(), schema);
if (!res.ok()) {
LOG(WARNING) << "fail to init row cursor. res = " << res;
Expand Down Expand Up @@ -613,7 +613,7 @@ Status TabletReader::_init_delete_condition(const ReaderParams& read_params) {
}

auto delete_init = [&]() -> Status {
return _delete_handler.init(*_tablet_schema, _tablet->delete_predicates(),
return _delete_handler.init(_tablet_schema, _tablet->delete_predicates(),
read_params.version.second, this);
};

Expand Down
4 changes: 2 additions & 2 deletions be/src/olap/reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ class TabletReader {
// mainly include tablet, data version and fetch range.
struct ReaderParams {
TabletSharedPtr tablet;
const TabletSchema* tablet_schema;
TabletSchemaSPtr tablet_schema;
ReaderType reader_type = READER_QUERY;
bool direct_mode = false;
bool aggregation = false;
Expand Down Expand Up @@ -186,7 +186,7 @@ class TabletReader {

TabletSharedPtr _tablet;
RowsetReaderContext _reader_context;
const TabletSchema* _tablet_schema;
TabletSchemaSPtr _tablet_schema;
KeysParam _keys_param;
std::vector<bool> _is_lower_keys_included;
std::vector<bool> _is_upper_keys_included;
Expand Down
2 changes: 1 addition & 1 deletion be/src/olap/row_block.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ using std::vector;

namespace doris {

RowBlock::RowBlock(const TabletSchema* schema) : _capacity(0), _schema(schema) {
RowBlock::RowBlock(TabletSchemaSPtr schema) : _capacity(0), _schema(schema) {
_mem_pool.reset(new MemPool());
}

Expand Down
6 changes: 3 additions & 3 deletions be/src/olap/row_block.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ class RowBlock {
friend class RowBlockChanger;

public:
RowBlock(const TabletSchema* schema);
RowBlock(TabletSchemaSPtr schema);

// 注意回收内部buffer
~RowBlock();
Expand All @@ -80,7 +80,7 @@ class RowBlock {

const uint32_t row_num() const { return _info.row_num; }
const RowBlockInfo& row_block_info() const { return _info; }
const TabletSchema& tablet_schema() const { return *_schema; }
const TabletSchemaSPtr tablet_schema() const { return _schema; }
size_t capacity() const { return _capacity; }

// Return field pointer, this pointer point to the nullbyte before the field
Expand Down Expand Up @@ -112,7 +112,7 @@ class RowBlock {

uint32_t _capacity;
RowBlockInfo _info;
const TabletSchema* _schema; // 内部保存的schema句柄
TabletSchemaSPtr _schema; // 内部保存的schema句柄

bool _null_supported;

Expand Down
Loading