diff --git a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp index ad7cbb65004..f5e325bc824 100644 --- a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp +++ b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp @@ -403,14 +403,14 @@ inline Block getSubBlock(const Block & block, size_t offset, size_t limit) } } -// Add an extra handle column if handle reused the original column data. -Block DeltaMergeStore::addExtraColumnIfNeed(const Context & db_context, Block && block) const +// Add an extra handle column if pkIsHandle +Block DeltaMergeStore::addExtraColumnIfNeed(const Context & db_context, const ColumnDefine & handle_define, Block && block) { - if (pkIsHandle()) + if (pkIsHandle(handle_define)) { - if (!EXTRA_HANDLE_COLUMN_INT_TYPE->equals(*original_table_handle_define.type)) + if (!EXTRA_HANDLE_COLUMN_INT_TYPE->equals(*handle_define.type)) { - auto handle_pos = getPosByColumnId(block, original_table_handle_define.id); + auto handle_pos = getPosByColumnId(block, handle_define.id); addColumnToBlock(block, // EXTRA_HANDLE_COLUMN_ID, EXTRA_HANDLE_COLUMN_NAME, @@ -423,7 +423,7 @@ Block DeltaMergeStore::addExtraColumnIfNeed(const Context & db_context, Block && { // If types are identical, `FunctionToInt64` just take reference to the original column. // We need a deep copy for the pk column or it will make trobule for later processing. 
- auto pk_col_with_name = getByColumnId(block, original_table_handle_define.id); + auto pk_col_with_name = getByColumnId(block, handle_define.id); auto pk_column = pk_col_with_name.column; ColumnPtr handle_column = pk_column->cloneResized(pk_column->size()); addColumnToBlock(block, // @@ -447,7 +447,7 @@ void DeltaMergeStore::write(const Context & db_context, const DB::Settings & db_ return; auto dm_context = newDMContext(db_context, db_settings); - Block block = addExtraColumnIfNeed(db_context, std::move(to_write)); + Block block = addExtraColumnIfNeed(db_context, original_table_handle_define, std::move(to_write)); const auto bytes = block.bytes(); diff --git a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.h b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.h index d8dfc9c5b17..4030ef3c1eb 100644 --- a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.h +++ b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.h @@ -301,7 +301,7 @@ class DeltaMergeStore : private boost::noncopyable // Stop all background tasks. 
void shutdown(); - Block addExtraColumnIfNeed(const Context & db_context, Block && block) const; + static Block addExtraColumnIfNeed(const Context & db_context, const ColumnDefine & handle_define, Block && block); void write(const Context & db_context, const DB::Settings & db_settings, Block && block); @@ -404,7 +404,7 @@ class DeltaMergeStore : private boost::noncopyable DMContextPtr newDMContext(const Context & db_context, const DB::Settings & db_settings, const String & query_id=""); - bool pkIsHandle() const { return original_table_handle_define.id != EXTRA_HANDLE_COLUMN_ID; } + static bool pkIsHandle(const ColumnDefine & handle_define) { return handle_define.id != EXTRA_HANDLE_COLUMN_ID; } void waitForWrite(const DMContextPtr & context, const SegmentPtr & segment); void waitForDeleteRange(const DMContextPtr & context, const SegmentPtr & segment); diff --git a/dbms/src/Storages/DeltaMerge/SSTFilesToBlockInputStream.cpp b/dbms/src/Storages/DeltaMerge/SSTFilesToBlockInputStream.cpp index 4bfeb209960..5f65d9168e9 100644 --- a/dbms/src/Storages/DeltaMerge/SSTFilesToBlockInputStream.cpp +++ b/dbms/src/Storages/DeltaMerge/SSTFilesToBlockInputStream.cpp @@ -9,6 +9,7 @@ #include #include #include +#include #include #include #include @@ -23,32 +24,22 @@ namespace ErrorCodes extern const int ILLFORMAT_RAFT_ROW; } // namespace ErrorCodes -Block GenRegionBlockDatawithSchema( // - const RegionPtr &, - const std::shared_ptr &, - const DM::ColumnDefinesPtr &, - Timestamp, - bool, - TMTContext &); - namespace DM { SSTFilesToBlockInputStream::SSTFilesToBlockInputStream( // - RegionPtr region_, - const SSTViewVec & snaps_, - const TiFlashRaftProxyHelper * proxy_helper_, - SSTFilesToBlockInputStream::StorageDeltaMergePtr ingest_storage_, - DM::ColumnDefinesPtr schema_snap_, - Timestamp gc_safepoint_, - bool force_decode_, - TMTContext & tmt_, - size_t expected_size_) + RegionPtr region_, + const SSTViewVec & snaps_, + const TiFlashRaftProxyHelper * proxy_helper_, + const 
DecodingStorageSchemaSnapshot & schema_snap_, + Timestamp gc_safepoint_, + bool force_decode_, + TMTContext & tmt_, + size_t expected_size_) : region(std::move(region_)), snaps(snaps_), proxy_helper(proxy_helper_), - ingest_storage(std::move(ingest_storage_)), - schema_snap(std::move(schema_snap_)), + schema_snap(schema_snap_), tmt(tmt_), gc_safepoint(gc_safepoint_), expected_size(expected_size_), @@ -237,7 +228,7 @@ Block SSTFilesToBlockInputStream::readCommitedBlock() { // Read block from `region`. If the schema has been updated, it will // throw an exception with code `ErrorCodes::REGION_DATA_SCHEMA_UPDATED` - return GenRegionBlockDatawithSchema(region, ingest_storage, schema_snap, gc_safepoint, force_decode, tmt); + return GenRegionBlockDatawithSchema(region, schema_snap, gc_safepoint, force_decode, tmt); } catch (DB::Exception & e) { @@ -260,18 +251,19 @@ Block SSTFilesToBlockInputStream::readCommitedBlock() /// Methods for BoundedSSTFilesToBlockInputStream BoundedSSTFilesToBlockInputStream::BoundedSSTFilesToBlockInputStream( // - SSTFilesToBlockInputStreamPtr child, - const ColId pk_column_id_, - const bool is_common_handle_) - : pk_column_id(pk_column_id_), is_common_handle(is_common_handle_), _raw_child(std::move(child)) + SSTFilesToBlockInputStreamPtr child, + const ColId pk_column_id_, + const DecodingStorageSchemaSnapshot & schema_snap) + : pk_column_id(pk_column_id_), _raw_child(std::move(child)) { + const bool is_common_handle = schema_snap.is_common_handle; // Initlize `mvcc_compact_stream` // First refine the boundary of blocks. Note that the rows decoded from SSTFiles are sorted by primary key asc, timestamp desc // (https://github.com/tikv/tikv/blob/v5.0.1/components/txn_types/src/types.rs#L103-L108). // While DMVersionFilter require rows sorted by primary key asc, timestamp asc, so we need an extra sort in PKSquashing. 
auto stream = std::make_shared>(_raw_child, pk_column_id, is_common_handle); mvcc_compact_stream = std::make_unique>( - stream, *(_raw_child->schema_snap), _raw_child->gc_safepoint, is_common_handle); + stream, *(schema_snap.column_defines), _raw_child->gc_safepoint, is_common_handle); } void BoundedSSTFilesToBlockInputStream::readPrefix() @@ -289,11 +281,6 @@ Block BoundedSSTFilesToBlockInputStream::read() return mvcc_compact_stream->read(); } -std::tuple, DM::ColumnDefinesPtr> BoundedSSTFilesToBlockInputStream::ingestingInfo() const -{ - return std::make_tuple(_raw_child->ingest_storage, _raw_child->schema_snap); -} - SSTFilesToBlockInputStream::ProcessKeys BoundedSSTFilesToBlockInputStream::getProcessKeys() const { return _raw_child->process_keys; diff --git a/dbms/src/Storages/DeltaMerge/SSTFilesToBlockInputStream.h b/dbms/src/Storages/DeltaMerge/SSTFilesToBlockInputStream.h index 198f46b1732..af9183c867d 100644 --- a/dbms/src/Storages/DeltaMerge/SSTFilesToBlockInputStream.h +++ b/dbms/src/Storages/DeltaMerge/SSTFilesToBlockInputStream.h @@ -3,6 +3,7 @@ #include #include #include +#include #include #include @@ -40,21 +41,19 @@ using BoundedSSTFilesToBlockInputStreamPtr = std::shared_ptr; - SSTFilesToBlockInputStream(RegionPtr region_, - const SSTViewVec & snaps_, - const TiFlashRaftProxyHelper * proxy_helper_, - StorageDeltaMergePtr ingest_storage_, - DM::ColumnDefinesPtr schema_snap_, - Timestamp gc_safepoint_, - bool force_decode_, - TMTContext & tmt_, - size_t expected_size_ = DEFAULT_MERGE_BLOCK_SIZE); + SSTFilesToBlockInputStream(RegionPtr region_, + const SSTViewVec & snaps_, + const TiFlashRaftProxyHelper * proxy_helper_, + const DecodingStorageSchemaSnapshot & schema_snap_, + Timestamp gc_safepoint_, + bool force_decode_, + TMTContext & tmt_, + size_t expected_size_ = DEFAULT_MERGE_BLOCK_SIZE); ~SSTFilesToBlockInputStream(); String getName() const override { return "SSTFilesToBlockInputStream"; } - Block getHeader() const override { return 
toEmptyBlock(*schema_snap); } + Block getHeader() const override { return toEmptyBlock(*(schema_snap.column_defines)); } void readPrefix() override; void readSuffix() override; @@ -76,15 +75,14 @@ class SSTFilesToBlockInputStream final : public IBlockInputStream Block readCommitedBlock(); private: - RegionPtr region; - const SSTViewVec & snaps; - const TiFlashRaftProxyHelper * proxy_helper{nullptr}; - const StorageDeltaMergePtr ingest_storage; - const DM::ColumnDefinesPtr schema_snap; - TMTContext & tmt; - const Timestamp gc_safepoint; - size_t expected_size; - Poco::Logger * log; + RegionPtr region; + const SSTViewVec & snaps; + const TiFlashRaftProxyHelper * proxy_helper{nullptr}; + const DecodingStorageSchemaSnapshot & schema_snap; + TMTContext & tmt; + const Timestamp gc_safepoint; + size_t expected_size; + Poco::Logger * log; using SSTReaderPtr = std::unique_ptr; SSTReaderPtr write_cf_reader; @@ -107,7 +105,9 @@ class SSTFilesToBlockInputStream final : public IBlockInputStream class BoundedSSTFilesToBlockInputStream final { public: - BoundedSSTFilesToBlockInputStream(SSTFilesToBlockInputStreamPtr child, const ColId pk_column_id_, const bool is_common_handle_); + BoundedSSTFilesToBlockInputStream(SSTFilesToBlockInputStreamPtr child, + const ColId pk_column_id_, + const DecodingStorageSchemaSnapshot & schema_snap); String getName() const { return "BoundedSSTFilesToBlockInputStream"; } @@ -117,8 +117,6 @@ class BoundedSSTFilesToBlockInputStream final Block read(); - std::tuple, DM::ColumnDefinesPtr> ingestingInfo() const; - SSTFilesToBlockInputStream::ProcessKeys getProcessKeys() const; const RegionPtr getRegion() const; @@ -128,7 +126,6 @@ class BoundedSSTFilesToBlockInputStream final private: const ColId pk_column_id; - const bool is_common_handle; // Note that we only keep _raw_child for getting ingest info / process key, etc. 
All block should be // read from `mvcc_compact_stream` diff --git a/dbms/src/Storages/DeltaMerge/SSTFilesToDTFilesOutputStream.cpp b/dbms/src/Storages/DeltaMerge/SSTFilesToDTFilesOutputStream.cpp index d40ddead241..75952cf91cf 100644 --- a/dbms/src/Storages/DeltaMerge/SSTFilesToDTFilesOutputStream.cpp +++ b/dbms/src/Storages/DeltaMerge/SSTFilesToDTFilesOutputStream.cpp @@ -8,6 +8,7 @@ #include #include #include +#include #include #include #include @@ -31,11 +32,13 @@ namespace DM { SSTFilesToDTFilesOutputStream::SSTFilesToDTFilesOutputStream( // - BoundedSSTFilesToBlockInputStreamPtr child_, - TiDB::SnapshotApplyMethod method_, - FileConvertJobType job_type_, - TMTContext & tmt_) + BoundedSSTFilesToBlockInputStreamPtr child_, + const DecodingStorageSchemaSnapshot & schema_snap_, + TiDB::SnapshotApplyMethod method_, + FileConvertJobType job_type_, + TMTContext & tmt_) : child(child_), // + schema_snap(schema_snap_), method(method_), job_type(job_type_), tmt(tmt_), @@ -63,10 +66,8 @@ void SSTFilesToDTFilesOutputStream::writeSuffix() auto dt_file = dt_stream->getFile(); assert(!dt_file->canGC()); // The DTFile should not be able to gc until it is ingested. // Add the DTFile to StoragePathPool so that we can restore it later - auto [ingest_storage, _schema_snap] = child->ingestingInfo(); - std::ignore = _schema_snap; - const auto bytes_written = dt_file->getBytesOnDisk(); - ingest_storage->getStore()->preIngestFile(dt_file->parentPath(), dt_file->fileId(), bytes_written); + const auto bytes_written = dt_file->getBytesOnDisk(); + schema_snap.storage->getStore()->preIngestFile(dt_file->parentPath(), dt_file->fileId(), bytes_written); // Report DMWriteBytes for calculating write amplification ProfileEvents::increment(ProfileEvents::DMWriteBytes, bytes_written); @@ -112,8 +113,7 @@ bool SSTFilesToDTFilesOutputStream::newDTFileStream() } // The parent_path and file_id are generated by the storage. 
- auto [ingest_storage, schema_snap] = child->ingestingInfo(); - auto [parent_path, file_id] = ingest_storage->getStore()->preAllocateIngestFile(); + auto [parent_path, file_id] = schema_snap.storage->getStore()->preAllocateIngestFile(); if (parent_path.empty()) { // Can no allocate path and id for storing DTFiles (the storage may be dropped / shutdown) @@ -123,7 +123,7 @@ bool SSTFilesToDTFilesOutputStream::newDTFileStream() LOG_INFO(log, "Create file for snapshot data " << child->getRegion()->toString(true) << " [file=" << dt_file->path() << "] [single_file_mode=" << flags.isSingleFile() << "]"); - dt_stream = std::make_unique(tmt.getContext(), dt_file, *schema_snap, flags); + dt_stream = std::make_unique(tmt.getContext(), dt_file, *(schema_snap.column_defines), flags); dt_stream->writePrefix(); ingest_files.emplace_back(dt_file); return true; diff --git a/dbms/src/Storages/DeltaMerge/SSTFilesToDTFilesOutputStream.h b/dbms/src/Storages/DeltaMerge/SSTFilesToDTFilesOutputStream.h index be80372cfa0..418b9e6614b 100644 --- a/dbms/src/Storages/DeltaMerge/SSTFilesToDTFilesOutputStream.h +++ b/dbms/src/Storages/DeltaMerge/SSTFilesToDTFilesOutputStream.h @@ -24,6 +24,7 @@ struct SSTViewVec; struct TiFlashRaftProxyHelper; struct SSTReader; class StorageDeltaMerge; +struct DecodingStorageSchemaSnapshot; namespace DM { @@ -48,11 +49,11 @@ enum class FileConvertJobType class SSTFilesToDTFilesOutputStream : private boost::noncopyable { public: - using StorageDeltaMergePtr = std::shared_ptr; - SSTFilesToDTFilesOutputStream(BoundedSSTFilesToBlockInputStreamPtr child_, - TiDB::SnapshotApplyMethod method_, - FileConvertJobType job_type_, - TMTContext & tmt_); + SSTFilesToDTFilesOutputStream(BoundedSSTFilesToBlockInputStreamPtr child_, + const DecodingStorageSchemaSnapshot & schema_snap_, + TiDB::SnapshotApplyMethod method_, + FileConvertJobType job_type_, + TMTContext & tmt_); ~SSTFilesToDTFilesOutputStream(); void writePrefix(); @@ -65,18 +66,18 @@ class 
SSTFilesToDTFilesOutputStream : private boost::noncopyable void cancel(); private: - bool newDTFileStream(); // Stop the process for decoding committed data into DTFiles void stop(); private: - BoundedSSTFilesToBlockInputStreamPtr child; - const TiDB::SnapshotApplyMethod method; - const FileConvertJobType job_type; - TMTContext & tmt; - Poco::Logger * log; + BoundedSSTFilesToBlockInputStreamPtr child; + const DecodingStorageSchemaSnapshot & schema_snap; + const TiDB::SnapshotApplyMethod method; + const FileConvertJobType job_type; + TMTContext & tmt; + Poco::Logger * log; std::unique_ptr dt_stream; diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store.cpp index 70c1878e755..3a137c7b6ce 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store.cpp +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store.cpp @@ -251,7 +251,8 @@ try store = reload(cols, (pk_type == DMTestEnv::PkType::CommonHandle), 1); ASSERT_EQ(store->isCommonHandle(), pk_type == DMTestEnv::PkType::CommonHandle) << DMTestEnv::PkTypeToString(pk_type); - ASSERT_EQ(store->pkIsHandle(), (pk_type == DMTestEnv::PkType::PkIsHandleInt64 || pk_type == DMTestEnv::PkType::PkIsHandleInt32)) + ASSERT_EQ(store->pkIsHandle(store->original_table_handle_define), + (pk_type == DMTestEnv::PkType::PkIsHandleInt64 || pk_type == DMTestEnv::PkType::PkIsHandleInt32)) << DMTestEnv::PkTypeToString(pk_type); const size_t nrows = 20; @@ -265,7 +266,7 @@ try handle.type, store->isCommonHandle(), store->getRowKeyColumnSize()); - block1 = store->addExtraColumnIfNeed(*db_context, std::move(block1)); + block1 = DeltaMergeStore::addExtraColumnIfNeed(*db_context, store->getHandle(), std::move(block1)); ASSERT_EQ(block1.rows(), nrows); ASSERT_TRUE(block1.has(EXTRA_HANDLE_COLUMN_NAME)); for (const auto & c : block1) @@ -282,7 +283,7 @@ try handle.type, store->isCommonHandle(), store->getRowKeyColumnSize()); - block2 = 
store->addExtraColumnIfNeed(*db_context, std::move(block2)); + block2 = DeltaMergeStore::addExtraColumnIfNeed(*db_context, store->getHandle(), std::move(block2)); ASSERT_EQ(block2.rows(), nrows_2); ASSERT_TRUE(block2.has(EXTRA_HANDLE_COLUMN_NAME)); for (const auto & c : block2) diff --git a/dbms/src/Storages/Transaction/ApplySnapshot.cpp b/dbms/src/Storages/Transaction/ApplySnapshot.cpp index 25e370298af..82c58c3c56f 100644 --- a/dbms/src/Storages/Transaction/ApplySnapshot.cpp +++ b/dbms/src/Storages/Transaction/ApplySnapshot.cpp @@ -11,6 +11,7 @@ #include #include #include +#include #include #include #include @@ -24,9 +25,6 @@ namespace DB { -std::tuple, bool, DM::ColumnDefinesPtr> // -AtomicGetStorageSchema(const RegionPtr & region, TMTContext & tmt); - namespace FailPoints { extern const char force_set_sst_to_dtfile_block_size[]; @@ -328,8 +326,9 @@ std::vector KVStore::preHandleSSTsToDTFiles( { // Get storage schema atomically, will do schema sync if the storage does not exists. // Will return the storage even if it is tombstoned. - auto [dm_storage, is_common_handle, schema_snap] = AtomicGetStorageSchema(new_region, tmt); - if (unlikely(dm_storage == nullptr)) + const auto [table_drop_lock, schema_snap] = AtomicGetStorageSchema(new_region, tmt); + (void)table_drop_lock; + if (unlikely(schema_snap.storage == nullptr)) { // The storage must be physically dropped, throw exception and do cleanup. 
throw Exception("", ErrorCodes::TABLE_IS_DROPPED); @@ -346,10 +345,10 @@ std::vector KVStore::preHandleSSTsToDTFiles( // Read from SSTs and refine the boundary of blocks output to DTFiles auto sst_stream = std::make_shared( - new_region, snaps, proxy_helper, dm_storage, schema_snap, gc_safepoint, force_decode, tmt, expected_block_size); + new_region, snaps, proxy_helper, schema_snap, gc_safepoint, force_decode, tmt, expected_block_size); auto bounded_stream - = std::make_shared(sst_stream, ::DB::TiDBPkColumnID, is_common_handle); - stream = std::make_shared(bounded_stream, snapshot_apply_method, job_type, tmt); + = std::make_shared(sst_stream, ::DB::TiDBPkColumnID, schema_snap); + stream = std::make_shared(bounded_stream, schema_snap, snapshot_apply_method, job_type, tmt); stream->writePrefix(); stream->write(); @@ -376,6 +375,7 @@ std::vector KVStore::preHandleSSTsToDTFiles( } // Update schema and try to decode again + LOG_INFO(log, "Decoding Region data meet error: " << e.displayText() << ", sync schema and try to decode again"); auto metrics = context.getTiFlashMetrics(); GET_METRIC(metrics, tiflash_schema_trigger_count, type_raft_decode).Increment(); tmt.getSchemaSyncer()->syncSchemas(context); diff --git a/dbms/src/Storages/Transaction/PartitionStreams.cpp b/dbms/src/Storages/Transaction/PartitionStreams.cpp index 999328e7cbd..bcf54a8fd37 100644 --- a/dbms/src/Storages/Transaction/PartitionStreams.cpp +++ b/dbms/src/Storages/Transaction/PartitionStreams.cpp @@ -9,6 +9,7 @@ #include #include #include +#include #include #include #include @@ -478,12 +479,11 @@ RegionPtrWithBlock::CachePtr GenRegionPreDecodeBlockData(const RegionPtr & regio return std::make_unique(std::move(res_block), schema_version, std::move(*data_list_read)); } -std::tuple, bool, DM::ColumnDefinesPtr> // +std::tuple // AtomicGetStorageSchema(const RegionPtr & region, TMTContext & tmt) { - bool is_common_handle = false; - std::shared_ptr dm_storage; - DM::ColumnDefinesPtr schema_snap; + 
TableLockHolder drop_lock = nullptr; + DecodingStorageSchemaSnapshot schema_snapshot; auto table_id = region->getMappedTableID(); auto context = tmt.getContext(); @@ -500,18 +500,23 @@ AtomicGetStorageSchema(const RegionPtr & region, TMTContext & tmt) // Get a structure read lock. It will throw exception if the table has been dropped, // the caller should handle this situation. auto table_lock = storage->lockStructureForShare(getThreadName()); - is_common_handle = storage->isCommonHandle(); + schema_snapshot.is_common_handle = storage->isCommonHandle(); + schema_snapshot.table_info = storage->getTableInfo(); + schema_snapshot.columns = storage->getColumns(); if (unlikely(storage->engineType() != ::TiDB::StorageEngine::DT)) { throw Exception("Try to get storage schema with unknown storage engine [table_id=" + DB::toString(table_id) + "] [engine_type=" + DB::toString(static_cast(storage->engineType())) + "]", ErrorCodes::LOGICAL_ERROR); } - if (dm_storage = std::dynamic_pointer_cast(storage); dm_storage != nullptr) + if (auto dm_storage = std::dynamic_pointer_cast(storage); dm_storage != nullptr) { + schema_snapshot.storage = dm_storage; auto store = dm_storage->getStore(); - schema_snap = store->getStoreColumns(); + schema_snapshot.column_defines = store->getStoreColumns(); + schema_snapshot.original_table_handle_define = store->getHandle(); } + std::tie(std::ignore, drop_lock) = std::move(table_lock).release(); return true; }; @@ -524,31 +529,8 @@ AtomicGetStorageSchema(const RegionPtr & region, TMTContext & tmt) throw Exception("Get " + region->toString() + " belonging table " + DB::toString(table_id) + " is_command_handle fail", ErrorCodes::LOGICAL_ERROR); } - return std::make_tuple(dm_storage, is_common_handle, schema_snap); -} - -static bool needUpdateSchema(const DM::ColumnDefinesPtr & a, const DM::ColumnDefinesPtr & b) -{ - // Note that we consider `a` is not `b` and need to update schema if either of them is `nullptr` - if (unlikely(a == nullptr || b == 
nullptr)) - return true; - - // If the two schema is not the same, then it need to be updated. - if (a->size() != b->size()) - return true; - for (size_t i = 0; i < a->size(); ++i) - { - const auto & ca = (*a)[i]; - const auto & cb = (*b)[i]; - - bool col_ok = ca.id == cb.id; - // bool name_ok = ca.name == cb.name; - bool type_ok = ca.type->equals(*cb.type); - if (!col_ok || !type_ok) - return true; - } - return false; + return {drop_lock, std::move(schema_snapshot)}; } static Block sortColumnsBySchemaSnap(Block && ori, const DM::ColumnDefines & schema) @@ -584,9 +566,8 @@ static Block sortColumnsBySchemaSnap(Block && ori, const DM::ColumnDefines & sch /// Decode region data into block and belonging schema snapshot, remove committed data from `region` /// The return value is a block that store the committed data scanned and removed from `region`. /// The columns of returned block is sorted by `schema_snap`. -Block GenRegionBlockDatawithSchema(const RegionPtr & region, - const std::shared_ptr & dm_storage, - const DM::ColumnDefinesPtr & schema_snap, +Block GenRegionBlockDatawithSchema(const RegionPtr & region, // + const DecodingStorageSchemaSnapshot & schema_snap, Timestamp gc_safepoint, bool force_decode, TMTContext & tmt) @@ -610,27 +591,24 @@ Block GenRegionBlockDatawithSchema(const RegionPtr & region, { Stopwatch watch; - // Get a structure read lock. It will throw exception if the table has been - // dropped, the caller should handle this situation. - auto table_lock = dm_storage->lockStructureForShare(getThreadName()); // Compare schema_snap with current schema, throw exception if changed. 
- auto store = dm_storage->getStore(); - auto cur_schema_snap = store->getStoreColumns(); - if (needUpdateSchema(cur_schema_snap, schema_snap)) - throw Exception("", ErrorCodes::REGION_DATA_SCHEMA_UPDATED); - - auto reader = RegionBlockReader(dm_storage); + auto reader = RegionBlockReader(schema_snap.table_info, schema_snap.columns); + reader.setReorderUInt64PK(false); // DeltaTree doesn't need the UInt64 pk to be reordered auto [block, ok] = reader.read(*data_list_read, force_decode); if (unlikely(!ok)) - throw Exception("", ErrorCodes::REGION_DATA_SCHEMA_UPDATED); + throw Exception("RegionBlockReader decode error", ErrorCodes::REGION_DATA_SCHEMA_UPDATED); GET_METRIC(metrics, tiflash_raft_write_data_to_storage_duration_seconds, type_decode).Observe(watch.elapsedSeconds()); + /** TODO: If the pk of the table has a common handle, the extra column is added in + * `RegionBlockReader::read`. If the pk is handle, the extra column is added by + * `DM::DeltaMergeStore::addExtraColumnIfNeed`. We may need to do some refactoring. 
+ */ // For DeltaMergeStore, we always store an extra column with column_id = -1 - res_block = store->addExtraColumnIfNeed(context, std::move(block)); + res_block = DM::DeltaMergeStore::addExtraColumnIfNeed(context, schema_snap.original_table_handle_define, std::move(block)); } - res_block = sortColumnsBySchemaSnap(std::move(res_block), *schema_snap); + res_block = sortColumnsBySchemaSnap(std::move(res_block), *(schema_snap.column_defines)); // Remove committed data RemoveRegionCommitCache(region, *data_list_read); diff --git a/dbms/src/Storages/Transaction/PartitionStreams.h b/dbms/src/Storages/Transaction/PartitionStreams.h new file mode 100644 index 00000000000..65a90ca6e12 --- /dev/null +++ b/dbms/src/Storages/Transaction/PartitionStreams.h @@ -0,0 +1,46 @@ +#pragma once + +#include +#include +#include + +namespace DB +{ + +class Region; +using RegionPtr = std::shared_ptr; +class StorageDeltaMerge; + +// TODO: consider refactoring the table structure related classes +// Currently, classes at the IStorage/IManageableStorage/DeltaMergeStore levels are all +// related to the table structure. This makes applying DDL operations and decoding Raft data +// more complicated. 
+struct DecodingStorageSchemaSnapshot +{ + bool is_common_handle = false; + TiDB::TableInfo table_info; + ColumnsDescription columns; + std::shared_ptr storage = nullptr; + DM::ColumnDefinesPtr column_defines; + DM::ColumnDefine original_table_handle_define; + + + DecodingStorageSchemaSnapshot() = default; + + DecodingStorageSchemaSnapshot(const DecodingStorageSchemaSnapshot &) = delete; + DecodingStorageSchemaSnapshot & operator=(const DecodingStorageSchemaSnapshot &) = delete; + + DecodingStorageSchemaSnapshot(DecodingStorageSchemaSnapshot &&) = default; + DecodingStorageSchemaSnapshot & operator=(DecodingStorageSchemaSnapshot &&) = default; +}; + +std::tuple // +AtomicGetStorageSchema(const RegionPtr & region, TMTContext & tmt); + +Block GenRegionBlockDatawithSchema(const RegionPtr & region, // + const DecodingStorageSchemaSnapshot & schema_snap, + Timestamp gc_safepoint, + bool force_decode, + TMTContext & tmt); + +} // namespace DB