Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix potential data inconsistency under heavy ddl operation #5044

Merged
merged 3 commits into from
Jun 2, 2022
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion dbms/src/Common/FailPoint.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,8 @@ std::unordered_map<String, std::shared_ptr<FailPointChannel>> FailPointHelper::f
M(exception_when_read_from_log) \
M(exception_mpp_hash_build) \
M(exception_before_drop_segment) \
M(exception_after_drop_segment)
M(exception_after_drop_segment) \
M(exception_between_schema_change_in_the_same_diff)

#define APPLY_FOR_FAILPOINTS(M) \
M(skip_check_segment_update) \
Expand Down
18 changes: 17 additions & 1 deletion dbms/src/Debug/dbgFuncSchema.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ namespace DB
{
namespace ErrorCodes
{
extern const int FAIL_POINT_ERROR;
extern const int UNKNOWN_TABLE;
} // namespace ErrorCodes

Expand Down Expand Up @@ -62,7 +63,22 @@ void dbgFuncRefreshSchemas(Context & context, const ASTs &, DBGInvoker::Printer
{
TMTContext & tmt = context.getTMTContext();
auto schema_syncer = tmt.getSchemaSyncer();
schema_syncer->syncSchemas(context);
try
{
schema_syncer->syncSchemas(context);
}
catch (Exception & e)
{
if (e.code() == ErrorCodes::FAIL_POINT_ERROR)
{
output(e.message());
return;
}
else
{
throw;
}
}

output("schemas refreshed");
}
Expand Down
7 changes: 5 additions & 2 deletions dbms/src/Storages/IManageableStorage.h
Original file line number Diff line number Diff line change
Expand Up @@ -157,12 +157,15 @@ class IManageableStorage : public IStorage
/// when `need_block` is true, it will try return a cached block corresponding to DecodingStorageSchemaSnapshotConstPtr,
/// and `releaseDecodingBlock` need to be called when the block is free
/// when `need_block` is false, it will just return an nullptr
virtual std::pair<DB::DecodingStorageSchemaSnapshotConstPtr, BlockUPtr> getSchemaSnapshotAndBlockForDecoding(bool /* need_block */)
/// This method must be called under the protection of table structure lock
virtual std::pair<DB::DecodingStorageSchemaSnapshotConstPtr, BlockUPtr> getSchemaSnapshotAndBlockForDecoding(const TableStructureLockHolder & /* table_structure_lock */, bool /* need_block */)
{
throw Exception("Method getDecodingSchemaSnapshot is not supported by storage " + getName(), ErrorCodes::NOT_IMPLEMENTED);
};

virtual void releaseDecodingBlock(Int64 /* schema_version */, BlockUPtr /* block */)
/// The `block_decoding_schema_version` is just an internal version for `DecodingStorageSchemaSnapshot`,
/// And it has no relation with the table schema version.
virtual void releaseDecodingBlock(Int64 /* block_decoding_schema_version */, BlockUPtr /* block */)
{
throw Exception("Method getDecodingSchemaSnapshot is not supported by storage " + getName(), ErrorCodes::NOT_IMPLEMENTED);
}
Expand Down
13 changes: 8 additions & 5 deletions dbms/src/Storages/StorageDeltaMerge.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -901,14 +901,16 @@ void StorageDeltaMerge::deleteRows(const Context & context, size_t delete_rows)
LOG_FMT_ERROR(log, "Rows after delete range not match, expected: {}, got: {}", (total_rows - delete_rows), after_delete_rows);
}

std::pair<DB::DecodingStorageSchemaSnapshotConstPtr, BlockUPtr> StorageDeltaMerge::getSchemaSnapshotAndBlockForDecoding(bool need_block)
std::pair<DB::DecodingStorageSchemaSnapshotConstPtr, BlockUPtr> StorageDeltaMerge::getSchemaSnapshotAndBlockForDecoding(const TableStructureLockHolder & table_structure_lock, bool need_block)
{
(void)table_structure_lock;
std::lock_guard lock{decode_schema_mutex};
if (!decoding_schema_snapshot || decoding_schema_snapshot->schema_version < tidb_table_info.schema_version)
if (!decoding_schema_snapshot || decoding_schema_changed)
{
auto & store = getAndMaybeInitStore();
decoding_schema_snapshot = std::make_shared<DecodingStorageSchemaSnapshot>(store->getStoreColumns(), tidb_table_info, store->getHandle());
decoding_schema_snapshot = std::make_shared<DecodingStorageSchemaSnapshot>(store->getStoreColumns(), tidb_table_info, store->getHandle(), decoding_schema_version++);
cache_blocks.clear();
decoding_schema_changed = false;
}

if (need_block)
Expand All @@ -930,10 +932,10 @@ std::pair<DB::DecodingStorageSchemaSnapshotConstPtr, BlockUPtr> StorageDeltaMerg
}
}

void StorageDeltaMerge::releaseDecodingBlock(Int64 schema_version, BlockUPtr block_ptr)
void StorageDeltaMerge::releaseDecodingBlock(Int64 block_decoding_schema_version, BlockUPtr block_ptr)
{
std::lock_guard lock{decode_schema_mutex};
if (!decoding_schema_snapshot || schema_version < decoding_schema_snapshot->schema_version)
if (!decoding_schema_snapshot || block_decoding_schema_version < decoding_schema_snapshot->decoding_schema_version)
return;
if (cache_blocks.size() >= max_cached_blocks_num)
return;
Expand Down Expand Up @@ -1113,6 +1115,7 @@ try
updateTableColumnInfo();
}
}
decoding_schema_changed = true;

SortDescription pk_desc = getPrimarySortDescription();
ColumnDefines store_columns = getStoreColumnDefines();
Expand Down
9 changes: 7 additions & 2 deletions dbms/src/Storages/StorageDeltaMerge.h
Original file line number Diff line number Diff line change
Expand Up @@ -151,9 +151,9 @@ class StorageDeltaMerge

size_t getRowKeyColumnSize() const override { return rowkey_column_size; }

std::pair<DB::DecodingStorageSchemaSnapshotConstPtr, BlockUPtr> getSchemaSnapshotAndBlockForDecoding(bool /* need_block */) override;
std::pair<DB::DecodingStorageSchemaSnapshotConstPtr, BlockUPtr> getSchemaSnapshotAndBlockForDecoding(const TableStructureLockHolder & table_structure_lock, bool /* need_block */) override;

void releaseDecodingBlock(Int64 schema_version, BlockUPtr block) override;
void releaseDecodingBlock(Int64 block_decoding_schema_version, BlockUPtr block) override;

bool initStoreIfDataDirExist() override;

Expand Down Expand Up @@ -238,6 +238,11 @@ class StorageDeltaMerge

mutable std::mutex decode_schema_mutex;
DecodingStorageSchemaSnapshotPtr decoding_schema_snapshot;
// The following two members must be used under the protection of table structure lock
bool decoding_schema_changed = false;
// internal version for `decoding_schema_snapshot`
Int64 decoding_schema_version = 1;

// avoid creating block every time when decoding row
std::vector<BlockUPtr> cache_blocks;
// avoid creating too many cached blocks(the typical num should be less and equal than raft apply thread)
Expand Down
7 changes: 4 additions & 3 deletions dbms/src/Storages/Transaction/DecodingStorageSchemaSnapshot.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,13 +67,14 @@ struct DecodingStorageSchemaSnapshot
bool pk_is_handle;
bool is_common_handle;
TMTPKType pk_type = TMTPKType::UNSPECIFIED;
Int64 schema_version = DEFAULT_UNSPECIFIED_SCHEMA_VERSION;
// an internal increasing version for `DecodingStorageSchemaSnapshot`, has no relation with the table schema version
Int64 decoding_schema_version;

DecodingStorageSchemaSnapshot(DM::ColumnDefinesPtr column_defines_, const TiDB::TableInfo & table_info_, const DM::ColumnDefine & original_handle_)
DecodingStorageSchemaSnapshot(DM::ColumnDefinesPtr column_defines_, const TiDB::TableInfo & table_info_, const DM::ColumnDefine & original_handle_, Int64 decoding_schema_version_)
: column_defines{std::move(column_defines_)}
, pk_is_handle{table_info_.pk_is_handle}
, is_common_handle{table_info_.is_common_handle}
, schema_version{table_info_.schema_version}
, decoding_schema_version{decoding_schema_version_}
{
std::unordered_map<ColumnID, size_t> column_lut;
for (size_t i = 0; i < table_info_.columns.size(); i++)
Expand Down
12 changes: 6 additions & 6 deletions dbms/src/Storages/Transaction/PartitionStreams.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -114,14 +114,14 @@ static void writeRegionDataToStorage(
/// Read region data as block.
Stopwatch watch;

Int64 block_schema_version = DEFAULT_UNSPECIFIED_SCHEMA_VERSION;
Int64 block_decoding_schema_version = -1;
BlockUPtr block_ptr = nullptr;
if (need_decode)
{
LOG_FMT_TRACE(log, "{} begin to decode table {}, region {}", FUNCTION_NAME, table_id, region->id());
DecodingStorageSchemaSnapshotConstPtr decoding_schema_snapshot;
std::tie(decoding_schema_snapshot, block_ptr) = storage->getSchemaSnapshotAndBlockForDecoding(true);
block_schema_version = decoding_schema_snapshot->schema_version;
std::tie(decoding_schema_snapshot, block_ptr) = storage->getSchemaSnapshotAndBlockForDecoding(lock, true);
block_decoding_schema_version = decoding_schema_snapshot->decoding_schema_version;

auto reader = RegionBlockReader(decoding_schema_snapshot);
if (!reader.read(*block_ptr, data_list_read, force_decode))
Expand Down Expand Up @@ -153,7 +153,7 @@ static void writeRegionDataToStorage(
write_part_cost = watch.elapsedMilliseconds();
GET_METRIC(tiflash_raft_write_data_to_storage_duration_seconds, type_write).Observe(write_part_cost / 1000.0);
if (need_decode)
storage->releaseDecodingBlock(block_schema_version, std::move(block_ptr));
storage->releaseDecodingBlock(block_decoding_schema_version, std::move(block_ptr));

LOG_FMT_TRACE(log, "{}: table {}, region {}, cost [region decode {}, write part {}] ms", FUNCTION_NAME, table_id, region->id(), region_decode_cost, write_part_cost);
return true;
Expand Down Expand Up @@ -455,7 +455,7 @@ RegionPtrWithBlock::CachePtr GenRegionPreDecodeBlockData(const RegionPtr & regio
}

DecodingStorageSchemaSnapshotConstPtr decoding_schema_snapshot;
std::tie(decoding_schema_snapshot, std::ignore) = storage->getSchemaSnapshotAndBlockForDecoding(false);
std::tie(decoding_schema_snapshot, std::ignore) = storage->getSchemaSnapshotAndBlockForDecoding(lock, false);
res_block = createBlockSortByColumnID(decoding_schema_snapshot);
auto reader = RegionBlockReader(decoding_schema_snapshot);
if (!reader.read(res_block, *data_list_read, force_decode))
Expand Down Expand Up @@ -508,7 +508,7 @@ AtomicGetStorageSchema(const RegionPtr & region, TMTContext & tmt)
auto table_lock = storage->lockStructureForShare(getThreadName());
dm_storage = std::dynamic_pointer_cast<StorageDeltaMerge>(storage);
// only dt storage engine support `getSchemaSnapshotAndBlockForDecoding`, other engine will throw exception
std::tie(schema_snapshot, std::ignore) = storage->getSchemaSnapshotAndBlockForDecoding(false);
std::tie(schema_snapshot, std::ignore) = storage->getSchemaSnapshotAndBlockForDecoding(table_lock, false);
std::tie(std::ignore, drop_lock) = std::move(table_lock).release();
return true;
};
Expand Down
4 changes: 2 additions & 2 deletions dbms/src/Storages/Transaction/tests/RowCodecTestUtils.h
Original file line number Diff line number Diff line change
Expand Up @@ -285,11 +285,11 @@ inline DecodingStorageSchemaSnapshotConstPtr getDecodingStorageSchemaSnapshot(co
if (handle_id != EXTRA_HANDLE_COLUMN_ID)
{
auto iter = std::find_if(store_columns.begin(), store_columns.end(), [&](const ColumnDefine & cd) { return cd.id == handle_id; });
return std::make_shared<DecodingStorageSchemaSnapshot>(std::make_shared<ColumnDefines>(store_columns), table_info, *iter);
return std::make_shared<DecodingStorageSchemaSnapshot>(std::make_shared<ColumnDefines>(store_columns), table_info, *iter, /* decoding_schema_version_ */ 1);
}
else
{
return std::make_shared<DecodingStorageSchemaSnapshot>(std::make_shared<ColumnDefines>(store_columns), table_info, store_columns[0]);
return std::make_shared<DecodingStorageSchemaSnapshot>(std::make_shared<ColumnDefines>(store_columns), table_info, store_columns[0], /* decoding_schema_version_ */ 1);
}
}

Expand Down
13 changes: 12 additions & 1 deletion dbms/src/TiDB/Schema/SchemaBuilder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ extern const char exception_before_step_2_rename_in_exchange_partition[];
extern const char exception_after_step_2_in_exchange_partition[];
extern const char exception_before_step_3_rename_in_exchange_partition[];
extern const char exception_after_step_3_in_exchange_partition[];
extern const char exception_between_schema_change_in_the_same_diff[];
} // namespace FailPoints

bool isReservedDatabase(Context & context, const String & database_name)
Expand Down Expand Up @@ -336,6 +337,7 @@ void SchemaBuilder<Getter, NameMapper>::applyAlterPhysicalTable(DBInfoPtr db_inf
FmtBuffer fmt_buf;
fmt_buf.fmtAppend("Detected schema changes: {}: ", name_mapper.debugCanonicalName(*db_info, *table_info));
for (const auto & schema_change : schema_changes)
{
for (const auto & command : schema_change.first)
{
if (command.type == AlterCommand::ADD_COLUMN)
Expand All @@ -347,6 +349,7 @@ void SchemaBuilder<Getter, NameMapper>::applyAlterPhysicalTable(DBInfoPtr db_inf
else if (command.type == AlterCommand::RENAME_COLUMN)
fmt_buf.fmtAppend("RENAME COLUMN from {} to {}, ", command.column_name, command.new_column_name);
}
}
return fmt_buf.toString();
};
LOG_DEBUG(log, log_str());
Expand All @@ -355,8 +358,16 @@ void SchemaBuilder<Getter, NameMapper>::applyAlterPhysicalTable(DBInfoPtr db_inf
// Using original table info with updated columns instead of using new_table_info directly,
// so that other changes (RENAME commands) won't be saved.
// Also, updating schema_version as altering column is structural.
for (const auto & schema_change : schema_changes)
for (size_t i = 0; i < schema_changes.size(); i++)
{
if (i > 0)
{
/// If there are multiple schema change in the same diff,
/// the table schema version will be set to the latest schema version after the first schema change is applied.
/// Throw exception in the middle of the schema change to mock the case that there is a race between data decoding and applying different schema change.
FAIL_POINT_TRIGGER_EXCEPTION(FailPoints::exception_between_schema_change_in_the_same_diff);
}
const auto & schema_change = schema_changes[i];
/// Update column infos by applying schema change in this step.
schema_change.second(orig_table_info);
/// Update schema version aggressively for the sake of correctness.
Expand Down
9 changes: 9 additions & 0 deletions dbms/src/TiDB/Schema/TiDBSchemaSyncer.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,11 @@

namespace DB
{
namespace ErrorCodes
{
extern const int FAIL_POINT_ERROR;
};

template <bool mock_getter>
struct TiDBSchemaSyncer : public SchemaSyncer
{
Expand Down Expand Up @@ -177,6 +182,10 @@ struct TiDBSchemaSyncer : public SchemaSyncer
}
catch (Exception & e)
{
if (e.code() == ErrorCodes::FAIL_POINT_ERROR)
{
throw;
}
GET_METRIC(tiflash_schema_apply_count, type_failed).Increment();
LOG_FMT_WARNING(log, "apply diff meets exception : {} \n stack is {}", e.displayText(), e.getStackTrace().toString());
return false;
Expand Down
18 changes: 17 additions & 1 deletion tests/fullstack-test2/ddl/alter_column_when_pk_is_handle.test
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,25 @@ mysql> set session tidb_isolation_read_engines='tiflash'; select * from test.t
| 1 | world | 0.00 | 2 | NULL |
+---+-------+------+------+------+

# Need to apply a lossy type change to reorganize data. issue#3714
=> DBGInvoke __enable_schema_sync_service('false')

>> DBGInvoke __enable_fail_point(exception_between_schema_change_in_the_same_diff)

# stop decoding data
>> DBGInvoke __enable_fail_point(pause_before_apply_raft_cmd)

# Need to apply a lossy type change to reorganize data. issue#3714
mysql> alter table test.t modify c decimal(6,3)

# refresh schema and hit the `exception_between_schema_change_in_the_same_diff` failpoint
>> DBGInvoke __refresh_schemas()

>> DBGInvoke __disable_fail_point(exception_between_schema_change_in_the_same_diff)

>> DBGInvoke __disable_fail_point(pause_before_apply_raft_cmd)

=> DBGInvoke __enable_schema_sync_service('true')

mysql> set session tidb_isolation_read_engines='tiflash'; select * from test.t
+---+-------+-------+------+------+
| a | b | c | d | e |
Expand Down