Skip to content

Commit

Permalink
[fix](move-memtable) fix schema use-after-free in delta writer v2 (#3…
Browse files Browse the repository at this point in the history
  • Loading branch information
kaijchen authored Jan 23, 2024
1 parent 06cbb96 commit 0f8e30b
Show file tree
Hide file tree
Showing 14 changed files with 34 additions and 31 deletions.
4 changes: 2 additions & 2 deletions be/src/olap/delta_writer_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,9 @@ struct WriteRequest {
TupleDescriptor* tuple_desc = nullptr;
// slots are in order of tablet's schema
const std::vector<SlotDescriptor*>* slots = nullptr;
OlapTableSchemaParam* table_schema_param = nullptr;
std::shared_ptr<OlapTableSchemaParam> table_schema_param = nullptr;
bool is_high_priority = false;
bool write_file_cache = false;
};

} // namespace doris
} // namespace doris
2 changes: 1 addition & 1 deletion be/src/olap/delta_writer_v2.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ Status DeltaWriterV2::init() {
if (_streams.size() == 0 || _streams[0]->tablet_schema(_req.index_id) == nullptr) {
return Status::InternalError("failed to find tablet schema for {}", _req.index_id);
}
_build_current_tablet_schema(_req.index_id, _req.table_schema_param,
_build_current_tablet_schema(_req.index_id, _req.table_schema_param.get(),
*_streams[0]->tablet_schema(_req.index_id));
RowsetWriterContext context;
context.txn_id = _req.txn_id;
Expand Down
3 changes: 2 additions & 1 deletion be/src/olap/rowset_builder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,8 @@ Status RowsetBuilder::init() {
RETURN_IF_ERROR(prepare_txn());

// build tablet schema in request level
_build_current_tablet_schema(_req.index_id, _req.table_schema_param, *_tablet->tablet_schema());
_build_current_tablet_schema(_req.index_id, _req.table_schema_param.get(),
*_tablet->tablet_schema());
RowsetWriterContext context;
context.txn_id = _req.txn_id;
context.load_id = _req.load_id;
Expand Down
5 changes: 3 additions & 2 deletions be/src/runtime/load_stream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,8 @@ inline std::ostream& operator<<(std::ostream& ostr, const TabletStream& tablet_s
return ostr;
}

Status TabletStream::init(OlapTableSchemaParam* schema, int64_t index_id, int64_t partition_id) {
Status TabletStream::init(std::shared_ptr<OlapTableSchemaParam> schema, int64_t index_id,
int64_t partition_id) {
WriteRequest req {
.tablet_id = _id,
.txn_id = _txn_id,
Expand Down Expand Up @@ -291,7 +292,7 @@ Status IndexStream::_init_tablet_stream(TabletStreamSharedPtr& tablet_stream, in
int64_t partition_id) {
tablet_stream = std::make_shared<TabletStream>(_load_id, tablet_id, _txn_id, _load_stream_mgr,
_profile);
RETURN_IF_ERROR(tablet_stream->init(_schema.get(), _id, partition_id));
RETURN_IF_ERROR(tablet_stream->init(_schema, _id, partition_id));
_tablet_streams_map[tablet_id] = tablet_stream;
return Status::OK();
}
Expand Down
3 changes: 2 additions & 1 deletion be/src/runtime/load_stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,8 @@ class TabletStream {
TabletStream(PUniqueId load_id, int64_t id, int64_t txn_id, LoadStreamMgr* load_stream_mgr,
RuntimeProfile* profile);

Status init(OlapTableSchemaParam* schema, int64_t index_id, int64_t partition_id);
Status init(std::shared_ptr<OlapTableSchemaParam> schema, int64_t index_id,
int64_t partition_id);

Status append_data(const PStreamHeader& header, butil::IOBuf* data);
Status add_segment(const PStreamHeader& header, butil::IOBuf* data);
Expand Down
6 changes: 3 additions & 3 deletions be/src/runtime/tablets_channel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ Status BaseTabletsChannel::open(const PTabletWriterOpenRequest& request) {
<< ", timeout(s): " << request.load_channel_timeout_s();
_txn_id = request.txn_id();
_index_id = request.index_id();
_schema = std::make_unique<OlapTableSchemaParam>();
_schema = std::make_shared<OlapTableSchemaParam>();
RETURN_IF_ERROR(_schema->init(request.schema()));
_tuple_desc = _schema->tuple_desc();

Expand Down Expand Up @@ -189,7 +189,7 @@ Status BaseTabletsChannel::incremental_open(const PTabletWriterOpenRequest& para
wrequest.tuple_desc = _tuple_desc;
wrequest.slots = index_slots;
wrequest.is_high_priority = _is_high_priority;
wrequest.table_schema_param = _schema.get();
wrequest.table_schema_param = _schema;

// TODO(plat1ko): CloudDeltaWriter
auto delta_writer = std::make_unique<DeltaWriter>(*StorageEngine::instance(), &wrequest,
Expand Down Expand Up @@ -451,7 +451,7 @@ Status BaseTabletsChannel::_open_all_writers(const PTabletWriterOpenRequest& req
.load_id = request.id(),
.tuple_desc = _tuple_desc,
.slots = index_slots,
.table_schema_param = _schema.get(),
.table_schema_param = _schema,
.is_high_priority = _is_high_priority,
.write_file_cache = request.write_file_cache(),
};
Expand Down
2 changes: 1 addition & 1 deletion be/src/runtime/tablets_channel.h
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ class BaseTabletsChannel {
// initialized in open function
int64_t _txn_id = -1;
int64_t _index_id = -1;
std::unique_ptr<OlapTableSchemaParam> _schema;
std::shared_ptr<OlapTableSchemaParam> _schema;

TupleDescriptor* _tuple_desc = nullptr;

Expand Down
2 changes: 1 addition & 1 deletion be/src/vec/sink/writer/vtablet_writer_v2.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -427,7 +427,7 @@ Status VTabletWriterV2::_write_memtable(std::shared_ptr<vectorized::Block> block
.partition_id = rows.partition_id,
.load_id = _load_id,
.tuple_desc = _output_tuple_desc,
.table_schema_param = _schema.get(),
.table_schema_param = _schema,
.is_high_priority = _is_high_priority,
.write_file_cache = _write_file_cache,
};
Expand Down
16 changes: 8 additions & 8 deletions be/test/olap/delta_writer_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -490,7 +490,7 @@ TEST_F(TestDeltaWriter, open) {
DescriptorTbl* desc_tbl = nullptr;
static_cast<void>(DescriptorTbl::create(&obj_pool, tdesc_tbl, &desc_tbl));
TupleDescriptor* tuple_desc = desc_tbl->get_tuple_descriptor(0);
OlapTableSchemaParam param;
auto param = std::make_shared<OlapTableSchemaParam>();

PUniqueId load_id;
load_id.set_hi(0);
Expand All @@ -504,7 +504,7 @@ TEST_F(TestDeltaWriter, open) {
write_req.tuple_desc = tuple_desc;
write_req.slots = &(tuple_desc->slots());
write_req.is_high_priority = true;
write_req.table_schema_param = &param;
write_req.table_schema_param = param;

// test vec delta writer
profile = std::make_unique<RuntimeProfile>("LoadChannels");
Expand Down Expand Up @@ -536,7 +536,7 @@ TEST_F(TestDeltaWriter, vec_write) {
static_cast<void>(DescriptorTbl::create(&obj_pool, tdesc_tbl, &desc_tbl));
TupleDescriptor* tuple_desc = desc_tbl->get_tuple_descriptor(0);
// const std::vector<SlotDescriptor*>& slots = tuple_desc->slots();
OlapTableSchemaParam param;
auto param = std::make_shared<OlapTableSchemaParam>();

PUniqueId load_id;
load_id.set_hi(0);
Expand All @@ -550,7 +550,7 @@ TEST_F(TestDeltaWriter, vec_write) {
write_req.tuple_desc = tuple_desc;
write_req.slots = &(tuple_desc->slots());
write_req.is_high_priority = false;
write_req.table_schema_param = &param;
write_req.table_schema_param = param;
profile = std::make_unique<RuntimeProfile>("LoadChannels");
auto delta_writer =
std::make_unique<DeltaWriter>(*k_engine, &write_req, profile.get(), TUniqueId {});
Expand Down Expand Up @@ -699,7 +699,7 @@ TEST_F(TestDeltaWriter, vec_sequence_col) {
DescriptorTbl* desc_tbl = nullptr;
static_cast<void>(DescriptorTbl::create(&obj_pool, tdesc_tbl, &desc_tbl));
TupleDescriptor* tuple_desc = desc_tbl->get_tuple_descriptor(0);
OlapTableSchemaParam param;
auto param = std::make_shared<OlapTableSchemaParam>();

PUniqueId load_id;
load_id.set_hi(0);
Expand All @@ -713,7 +713,7 @@ TEST_F(TestDeltaWriter, vec_sequence_col) {
write_req.tuple_desc = tuple_desc;
write_req.slots = &(tuple_desc->slots());
write_req.is_high_priority = false;
write_req.table_schema_param = &param;
write_req.table_schema_param = param;
profile = std::make_unique<RuntimeProfile>("LoadChannels");
auto delta_writer =
std::make_unique<DeltaWriter>(*k_engine, &write_req, profile.get(), TUniqueId {});
Expand Down Expand Up @@ -814,7 +814,7 @@ TEST_F(TestDeltaWriter, vec_sequence_col_concurrent_write) {
DescriptorTbl* desc_tbl = nullptr;
static_cast<void>(DescriptorTbl::create(&obj_pool, tdesc_tbl, &desc_tbl));
TupleDescriptor* tuple_desc = desc_tbl->get_tuple_descriptor(0);
OlapTableSchemaParam param;
auto param = std::make_shared<OlapTableSchemaParam>();

PUniqueId load_id;
load_id.set_hi(0);
Expand All @@ -828,7 +828,7 @@ TEST_F(TestDeltaWriter, vec_sequence_col_concurrent_write) {
write_req.tuple_desc = tuple_desc;
write_req.slots = &(tuple_desc->slots());
write_req.is_high_priority = false;
write_req.table_schema_param = &param;
write_req.table_schema_param = param;
std::unique_ptr<RuntimeProfile> profile1;
profile1 = std::make_unique<RuntimeProfile>("LoadChannels1");
std::unique_ptr<RuntimeProfile> profile2;
Expand Down
4 changes: 2 additions & 2 deletions be/test/olap/engine_storage_migration_task_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ TEST_F(TestEngineStorageMigrationTask, write_and_migration) {
DescriptorTbl* desc_tbl = nullptr;
static_cast<void>(DescriptorTbl::create(&obj_pool, tdesc_tbl, &desc_tbl));
TupleDescriptor* tuple_desc = desc_tbl->get_tuple_descriptor(0);
OlapTableSchemaParam param;
auto param = std::make_shared<OlapTableSchemaParam>();

PUniqueId load_id;
load_id.set_hi(0);
Expand All @@ -201,7 +201,7 @@ TEST_F(TestEngineStorageMigrationTask, write_and_migration) {
write_req.tuple_desc = tuple_desc;
write_req.slots = &(tuple_desc->slots());
write_req.is_high_priority = false;
write_req.table_schema_param = &param;
write_req.table_schema_param = param;

profile = std::make_unique<RuntimeProfile>("LoadChannels");
auto delta_writer =
Expand Down
4 changes: 2 additions & 2 deletions be/test/olap/memtable_memory_limiter_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ TEST_F(MemTableMemoryLimiterTest, handle_memtable_flush_test) {
DescriptorTbl* desc_tbl = nullptr;
static_cast<void>(DescriptorTbl::create(&obj_pool, tdesc_tbl, &desc_tbl));
TupleDescriptor* tuple_desc = desc_tbl->get_tuple_descriptor(0);
OlapTableSchemaParam param;
auto param = std::make_shared<OlapTableSchemaParam>();

PUniqueId load_id;
load_id.set_hi(0);
Expand All @@ -142,7 +142,7 @@ TEST_F(MemTableMemoryLimiterTest, handle_memtable_flush_test) {
write_req.tuple_desc = tuple_desc;
write_req.slots = &(tuple_desc->slots());
write_req.is_high_priority = false;
write_req.table_schema_param = &param;
write_req.table_schema_param = param;
profile = std::make_unique<RuntimeProfile>("MemTableMemoryLimiterTest");
auto delta_writer =
std::make_unique<DeltaWriter>(*_engine, &write_req, profile.get(), TUniqueId {});
Expand Down
4 changes: 2 additions & 2 deletions be/test/olap/remote_rowset_gc_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ TEST_F(RemoteRowsetGcTest, normal) {
DescriptorTbl* desc_tbl = nullptr;
DescriptorTbl::create(&obj_pool, tdesc_tbl, &desc_tbl);
TupleDescriptor* tuple_desc = desc_tbl->get_tuple_descriptor(0);
OlapTableSchemaParam param;
auto param = std::make_shared<OlapTableSchemaParam>();

PUniqueId load_id;
load_id.set_hi(0);
Expand All @@ -194,7 +194,7 @@ TEST_F(RemoteRowsetGcTest, normal) {
write_req.tuple_desc = tuple_desc;
write_req.slots = &(tuple_desc->slots());
write_req.is_high_priority = false;
write_req.table_schema_param = &param;
write_req.table_schema_param = param;
std::unique_ptr<RuntimeProfile> profile;
profile = std::make_unique<RuntimeProfile>("LoadChannels");
DeltaWriter* delta_writer = nullptr;
Expand Down
4 changes: 2 additions & 2 deletions be/test/olap/tablet_cooldown_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -323,7 +323,7 @@ void createTablet(TabletSharedPtr* tablet, int64_t replica_id, int32_t schema_ha
DescriptorTbl* desc_tbl = nullptr;
static_cast<void>(DescriptorTbl::create(&obj_pool, tdesc_tbl, &desc_tbl));
TupleDescriptor* tuple_desc = desc_tbl->get_tuple_descriptor(0);
OlapTableSchemaParam param;
auto param = std::make_shared<OlapTableSchemaParam>();

// write data
PUniqueId load_id;
Expand All @@ -339,7 +339,7 @@ void createTablet(TabletSharedPtr* tablet, int64_t replica_id, int32_t schema_ha
write_req.tuple_desc = tuple_desc;
write_req.slots = &(tuple_desc->slots());
write_req.is_high_priority = false;
write_req.table_schema_param = &param;
write_req.table_schema_param = param;

profile = std::make_unique<RuntimeProfile>("LoadChannels");
auto delta_writer =
Expand Down
6 changes: 3 additions & 3 deletions be/test/runtime/load_stream_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -446,9 +446,9 @@ class LoadStreamMgrTest : public testing::Test {
id.set_hi(1);
id.set_lo(1);

OlapTableSchemaParam param;
construct_schema(&param);
*request.mutable_schema() = *param.to_protobuf();
auto param = std::make_shared<OlapTableSchemaParam>();
construct_schema(param.get());
*request.mutable_schema() = *param->to_protobuf();
*request.mutable_load_id() = id;
request.set_txn_id(NORMAL_TXN_ID);
request.set_src_id(sender_id);
Expand Down

0 comments on commit 0f8e30b

Please sign in to comment.