Skip to content

Commit df5af07

Browse files
committed
fix bugs
1 parent c0caad1 commit df5af07

File tree

6 files changed

+46
-37
lines changed

6 files changed

+46
-37
lines changed

be/src/olap/schema_change.cpp

+15-16
Original file line numberDiff line numberDiff line change
@@ -238,11 +238,6 @@ class MultiBlockMerger {
238238
RowRefComparator _cmp;
239239
};
240240

241-
RowBlockChanger::RowBlockChanger(TabletSchemaSPtr tablet_schema, DescriptorTbl desc_tbl)
242-
: _desc_tbl(desc_tbl) {
243-
_schema_mapping.resize(tablet_schema->num_columns());
244-
}
245-
246241
RowBlockChanger::RowBlockChanger(TabletSchemaSPtr tablet_schema,
247242
const DeleteHandler* delete_handler, DescriptorTbl desc_tbl)
248243
: _desc_tbl(desc_tbl) {
@@ -1137,7 +1132,8 @@ void RowBlockMerger::_pop_heap() {
11371132
}
11381133

11391134
Status LinkedSchemaChange::process(RowsetReaderSharedPtr rowset_reader, RowsetWriter* rowset_writer,
1140-
TabletSharedPtr new_tablet, TabletSharedPtr base_tablet) {
1135+
TabletSharedPtr new_tablet,
1136+
TabletSchemaSPtr base_tablet_schema) {
11411137
// In some cases, there may be more than one type of rowset in a tablet,
11421138
// in which case the conversion cannot be done directly by linked schema change,
11431139
// but requires direct schema change to rewrite the data.
@@ -1146,7 +1142,7 @@ Status LinkedSchemaChange::process(RowsetReaderSharedPtr rowset_reader, RowsetWr
11461142
<< " in base tablet " << base_tablet->tablet_id() << " is not same as type "
11471143
<< rowset_writer->type() << ", use direct schema change.";
11481144
return SchemaChangeHandler::get_sc_procedure(_row_block_changer, false, true)
1149-
->process(rowset_reader, rowset_writer, new_tablet, base_tablet);
1145+
->process(rowset_reader, rowset_writer, new_tablet, base_tablet_schema);
11501146
} else {
11511147
Status status = rowset_writer->add_rowset_for_linked_schema_change(rowset_reader->rowset());
11521148
if (!status) {
@@ -1201,7 +1197,7 @@ Status reserve_block(std::unique_ptr<RowBlock, RowBlockDeleter>* block_handle_pt
12011197

12021198
Status SchemaChangeDirectly::_inner_process(RowsetReaderSharedPtr rowset_reader,
12031199
RowsetWriter* rowset_writer, TabletSharedPtr new_tablet,
1204-
TabletSharedPtr base_tablet) {
1200+
TabletSchemaSPtr base_tablet_schema) {
12051201
if (_row_block_allocator == nullptr) {
12061202
_row_block_allocator = new RowBlockAllocator(new_tablet->tablet_schema(), 0);
12071203
if (_row_block_allocator == nullptr) {
@@ -1271,17 +1267,21 @@ Status SchemaChangeDirectly::_inner_process(RowsetReaderSharedPtr rowset_reader,
12711267
Status VSchemaChangeDirectly::_inner_process(RowsetReaderSharedPtr rowset_reader,
12721268
RowsetWriter* rowset_writer,
12731269
TabletSharedPtr new_tablet,
1274-
TabletSharedPtr base_tablet) {
1270+
TabletSchemaSPtr base_tablet_schema) {
12751271
auto new_block =
12761272
std::make_unique<vectorized::Block>(new_tablet->tablet_schema()->create_block());
1277-
auto ref_block =
1278-
std::make_unique<vectorized::Block>(base_tablet->tablet_schema()->create_block());
1273+
auto ref_block = std::make_unique<vectorized::Block>(base_tablet_schema->create_block());
12791274

12801275
int origin_columns_size = ref_block->columns();
1276+
LOG(INFO) << "origin columns size: " << origin_columns_size;
1277+
LOG(INFO) << "ssref block: " << ref_block->dump_structure();
12811278

12821279
rowset_reader->next_block(ref_block.get());
12831280
while (ref_block->rows()) {
12841281
RETURN_IF_ERROR(_changer.change_block(ref_block.get(), new_block.get()));
1282+
LOG(INFO) << "ref block: " << ref_block->dump_structure();
1283+
LOG(INFO) << "new block: " << new_block->dump_structure();
1284+
LOG(INFO) << "new block: " << new_block->dump_structure();
12851285
RETURN_IF_ERROR(rowset_writer->add_block(new_block.get()));
12861286

12871287
new_block->clear_column_data();
@@ -1320,7 +1320,7 @@ VSchemaChangeWithSorting::VSchemaChangeWithSorting(const RowBlockChanger& row_bl
13201320
Status SchemaChangeWithSorting::_inner_process(RowsetReaderSharedPtr rowset_reader,
13211321
RowsetWriter* rowset_writer,
13221322
TabletSharedPtr new_tablet,
1323-
TabletSharedPtr base_tablet) {
1323+
TabletSchemaSPtr base_tablet_schema) {
13241324
if (_row_block_allocator == nullptr) {
13251325
_row_block_allocator =
13261326
new (nothrow) RowBlockAllocator(new_tablet->tablet_schema(), _memory_limitation);
@@ -1488,7 +1488,7 @@ Status SchemaChangeWithSorting::_inner_process(RowsetReaderSharedPtr rowset_read
14881488
Status VSchemaChangeWithSorting::_inner_process(RowsetReaderSharedPtr rowset_reader,
14891489
RowsetWriter* rowset_writer,
14901490
TabletSharedPtr new_tablet,
1491-
TabletSharedPtr base_tablet) {
1491+
TabletSchemaSPtr base_tablet_schema) {
14921492
// for internal sorting
14931493
std::vector<std::unique_ptr<vectorized::Block>> blocks;
14941494

@@ -1511,8 +1511,7 @@ Status VSchemaChangeWithSorting::_inner_process(RowsetReaderSharedPtr rowset_rea
15111511

15121512
auto new_block =
15131513
std::make_unique<vectorized::Block>(new_tablet->tablet_schema()->create_block());
1514-
auto ref_block =
1515-
std::make_unique<vectorized::Block>(base_tablet->tablet_schema()->create_block());
1514+
auto ref_block = std::make_unique<vectorized::Block>(base_tablet_schema->create_block());
15161515

15171516
int origin_columns_size = ref_block->columns();
15181517

@@ -2072,7 +2071,7 @@ Status SchemaChangeHandler::_convert_historical_rowsets(const SchemaChangeParams
20722071
}
20732072

20742073
if (res = sc_procedure->process(rs_reader, rowset_writer.get(), sc_params.new_tablet,
2075-
sc_params.base_tablet);
2074+
base_tablet_schema);
20762075
!res) {
20772076
LOG(WARNING) << "failed to process the version."
20782077
<< " version=" << rs_reader->version().first << "-"

be/src/olap/schema_change.h

+9-18
Original file line numberDiff line numberDiff line change
@@ -41,8 +41,6 @@ class RowBlockChanger {
4141
RowBlockChanger(TabletSchemaSPtr tablet_schema, const DeleteHandler* delete_handler,
4242
DescriptorTbl desc_tbl);
4343

44-
RowBlockChanger(TabletSchemaSPtr tablet_schema, DescriptorTbl desc_tbl);
45-
4644
~RowBlockChanger();
4745

4846
ColumnMapping* get_mutable_column_mapping(size_t column_index);
@@ -89,7 +87,7 @@ class SchemaChange {
8987
virtual ~SchemaChange() = default;
9088

9189
virtual Status process(RowsetReaderSharedPtr rowset_reader, RowsetWriter* rowset_writer,
92-
TabletSharedPtr new_tablet, TabletSharedPtr base_tablet) {
90+
TabletSharedPtr new_tablet, TabletSchemaSPtr base_tablet_schema) {
9391
if (rowset_reader->rowset()->empty() || rowset_reader->rowset()->num_rows() == 0) {
9492
RETURN_WITH_WARN_IF_ERROR(
9593
rowset_writer->flush(),
@@ -103,7 +101,8 @@ class SchemaChange {
103101
_filtered_rows = 0;
104102
_merged_rows = 0;
105103

106-
RETURN_IF_ERROR(_inner_process(rowset_reader, rowset_writer, new_tablet, base_tablet));
104+
RETURN_IF_ERROR(
105+
_inner_process(rowset_reader, rowset_writer, new_tablet, base_tablet_schema));
107106
_add_filtered_rows(rowset_reader->filtered_rows());
108107

109108
// Check row num changes
@@ -127,7 +126,7 @@ class SchemaChange {
127126
void _add_merged_rows(uint64_t merged_rows) { _merged_rows += merged_rows; }
128127

129128
virtual Status _inner_process(RowsetReaderSharedPtr rowset_reader, RowsetWriter* rowset_writer,
130-
TabletSharedPtr new_tablet, TabletSharedPtr base_tablet) {
129+
TabletSharedPtr new_tablet, TabletSchemaSPtr base_tablet_schema) {
131130
return Status::NotSupported("inner process unsupported.");
132131
};
133132

@@ -155,7 +154,7 @@ class LinkedSchemaChange : public SchemaChange {
155154
~LinkedSchemaChange() override = default;
156155

157156
Status process(RowsetReaderSharedPtr rowset_reader, RowsetWriter* rowset_writer,
158-
TabletSharedPtr new_tablet, TabletSharedPtr base_tablet) override;
157+
TabletSharedPtr new_tablet, TabletSchemaSPtr base_tablet_schema) override;
159158

160159
private:
161160
const RowBlockChanger& _row_block_changer;
@@ -172,7 +171,7 @@ class SchemaChangeDirectly : public SchemaChange {
172171

173172
private:
174173
Status _inner_process(RowsetReaderSharedPtr rowset_reader, RowsetWriter* rowset_writer,
175-
TabletSharedPtr new_tablet, TabletSharedPtr base_tablet) override;
174+
TabletSharedPtr new_tablet, TabletSchemaSPtr base_tablet_schema) override;
176175

177176
const RowBlockChanger& _row_block_changer;
178177
RowBlockAllocator* _row_block_allocator;
@@ -189,7 +188,7 @@ class VSchemaChangeDirectly : public SchemaChange {
189188

190189
private:
191190
Status _inner_process(RowsetReaderSharedPtr rowset_reader, RowsetWriter* rowset_writer,
192-
TabletSharedPtr new_tablet, TabletSharedPtr base_tablet) override;
191+
TabletSharedPtr new_tablet, TabletSchemaSPtr base_tablet_schema) override;
193192

194193
const RowBlockChanger& _changer;
195194
};
@@ -203,7 +202,7 @@ class SchemaChangeWithSorting : public SchemaChange {
203202

204203
private:
205204
Status _inner_process(RowsetReaderSharedPtr rowset_reader, RowsetWriter* rowset_writer,
206-
TabletSharedPtr new_tablet, TabletSharedPtr base_tablet) override;
205+
TabletSharedPtr new_tablet, TabletSchemaSPtr base_tablet_schema) override;
207206

208207
bool _internal_sorting(const std::vector<RowBlock*>& row_block_arr,
209208
const Version& temp_delta_versions, int64_t oldest_write_timestamp,
@@ -228,7 +227,7 @@ class VSchemaChangeWithSorting : public SchemaChange {
228227

229228
private:
230229
Status _inner_process(RowsetReaderSharedPtr rowset_reader, RowsetWriter* rowset_writer,
231-
TabletSharedPtr new_tablet, TabletSharedPtr base_tablet) override;
230+
TabletSharedPtr new_tablet, TabletSchemaSPtr base_tablet_schema) override;
232231

233232
Status _internal_sorting(const std::vector<std::unique_ptr<vectorized::Block>>& blocks,
234233
const Version& temp_delta_versions, int64_t oldest_write_timestamp,
@@ -276,14 +275,6 @@ class SchemaChangeHandler {
276275
static bool tablet_in_converting(int64_t tablet_id);
277276

278277
private:
279-
// Check the status of schema change and clear information between "a pair" of Schema change tables
280-
// Since A->B's schema_change information for A will be overwritten in subsequent processing (no extra cleanup here)
281-
// Returns:
282-
// Success: If there is historical information, then clear it if there is no problem; or no historical information
283-
// Failure: otherwise, if there is history information and it cannot be emptied (version has not been completed)
284-
static Status _check_and_clear_schema_change_info(TabletSharedPtr tablet,
285-
const TAlterTabletReq& request);
286-
287278
static Status _get_versions_to_be_changed(TabletSharedPtr base_tablet,
288279
std::vector<Version>* versions_to_be_changed,
289280
RowsetSharedPtr* max_rowset);

be/src/olap/tablet.cpp

+4
Original file line numberDiff line numberDiff line change
@@ -1857,9 +1857,13 @@ Status Tablet::remove_all_remote_rowsets() {
18571857

18581858
TabletSchemaSPtr Tablet::tablet_schema() const {
18591859
std::shared_lock wrlock(_meta_lock);
1860+
LOG(INFO) << "tablet meta num columns: " << BaseTablet::tablet_schema()->num_columns();
18601861
if (UNLIKELY(_tablet_meta->all_rs_metas().empty())) {
18611862
return BaseTablet::tablet_schema();
18621863
}
1864+
for (auto& rs_meta : _tablet_meta->all_rs_metas()) {
1865+
LOG(INFO) << "rs meta num columns: " << rs_meta->tablet_schema()->num_columns();
1866+
}
18631867
const RowsetMetaSharedPtr rowset_meta =
18641868
rowset_meta_with_max_schema_version(_tablet_meta->all_rs_metas());
18651869
return rowset_meta->tablet_schema();

be/src/olap/tablet_schema.cpp

+14-1
Original file line numberDiff line numberDiff line change
@@ -612,6 +612,16 @@ void TabletSchema::merge_dropped_columns(TabletSchemaSPtr src_schema) {
612612
}
613613
}
614614

615+
// Dropped column is in _field_id_to_index but not in _field_name_to_index
616+
// Could refer to append_column method
617+
bool TabletSchema::is_dropped_column(TabletColumn& col) {
618+
CHECK(_field_id_to_index.find(col.unique_id()) != _field_id_to_index.end())
619+
<< "could not find col with unique id = " << col.unique_id()
620+
<< " and name = " << col.name();
621+
return _field_name_to_index.find(col.name()) == _field_name_to_index.end() ||
622+
column(col.name()).unique_id() != col.unique_id();
623+
}
624+
615625
void TabletSchema::to_schema_pb(TabletSchemaPB* tablet_schema_pb) const {
616626
tablet_schema_pb->set_keys_type(_keys_type);
617627
for (auto& col : _cols) {
@@ -702,9 +712,12 @@ vectorized::Block TabletSchema::create_block(
702712
return block;
703713
}
704714

705-
vectorized::Block TabletSchema::create_block() const {
715+
vectorized::Block TabletSchema::create_block(bool ignore_dropped_col) const {
706716
vectorized::Block block;
707717
for (const auto& col : _cols) {
718+
if (ignore_dropped_col && is_dropped_column(col)) {
719+
continue;
720+
}
708721
auto data_type = vectorized::DataTypeFactory::instance().create_data_type(col);
709722
block.insert({data_type->create_column(), data_type, col.name()});
710723
}

be/src/olap/tablet_schema.h

+3-1
Original file line numberDiff line numberDiff line change
@@ -166,7 +166,7 @@ class TabletSchema {
166166
vectorized::Block create_block(
167167
const std::vector<uint32_t>& return_columns,
168168
const std::unordered_set<uint32_t>* tablet_columns_need_convert_null = nullptr) const;
169-
vectorized::Block create_block() const;
169+
vectorized::Block create_block(bool ignore_dropped_col = true) const;
170170

171171
void build_current_tablet_schema(int64_t index_id, int32_t version,
172172
const POlapTableIndexSchema& index,
@@ -185,6 +185,8 @@ class TabletSchema {
185185
// Because they have the same name, the dropped column should not be added to the name map; it is indexed only by its unique id.
186186
void merge_dropped_columns(std::shared_ptr<TabletSchema> src_schema);
187187

188+
void is_dropped_column(TabletColumn& col);
189+
188190
private:
189191
friend bool operator==(const TabletSchema& a, const TabletSchema& b);
190192
friend bool operator!=(const TabletSchema& a, const TabletSchema& b);

fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -443,7 +443,7 @@ protected void runWaitingTxnJob() throws AlterCancelException {
443443
long originIdxId = indexIdMap.get(shadowIdxId);
444444
int shadowSchemaHash = indexSchemaVersionAndHashMap.get(shadowIdxId).schemaHash;
445445
int originSchemaHash = tbl.getSchemaHashByIndexId(indexIdMap.get(shadowIdxId));
446-
List<Column> originSchemaColumns = tbl.getSchemaByIndexId(originIdxId);
446+
List<Column> originSchemaColumns = tbl.getSchemaByIndexId(originIdxId, true);
447447
for (Tablet shadowTablet : shadowIdx.getTablets()) {
448448
long shadowTabletId = shadowTablet.getId();
449449
long originTabletId = partitionIndexTabletMap.get(partitionId, shadowIdxId).get(shadowTabletId);

0 commit comments

Comments
 (0)