From 120a2e708ff2bfc793526d6ea5343e8f13b3cfd6 Mon Sep 17 00:00:00 2001 From: JaySon-Huang Date: Thu, 5 Aug 2021 17:01:37 +0800 Subject: [PATCH] Split storage from DecodingStorageSchemaSnapshot Signed-off-by: JaySon-Huang --- .../Storages/DeltaMerge/SSTFilesToDTFilesOutputStream.cpp | 8 +++++--- .../Storages/DeltaMerge/SSTFilesToDTFilesOutputStream.h | 3 +++ dbms/src/Storages/Transaction/ApplySnapshot.cpp | 7 ++++--- dbms/src/Storages/Transaction/PartitionStreams.cpp | 8 ++++---- dbms/src/Storages/Transaction/PartitionStreams.h | 3 +-- 5 files changed, 17 insertions(+), 12 deletions(-) diff --git a/dbms/src/Storages/DeltaMerge/SSTFilesToDTFilesOutputStream.cpp b/dbms/src/Storages/DeltaMerge/SSTFilesToDTFilesOutputStream.cpp index 75952cf91cf..93226690c69 100644 --- a/dbms/src/Storages/DeltaMerge/SSTFilesToDTFilesOutputStream.cpp +++ b/dbms/src/Storages/DeltaMerge/SSTFilesToDTFilesOutputStream.cpp @@ -33,11 +33,13 @@ namespace DM SSTFilesToDTFilesOutputStream::SSTFilesToDTFilesOutputStream( // BoundedSSTFilesToBlockInputStreamPtr child_, + StorageDeltaMergePtr storage_, const DecodingStorageSchemaSnapshot & schema_snap_, TiDB::SnapshotApplyMethod method_, FileConvertJobType job_type_, TMTContext & tmt_) - : child(child_), // + : child(std::move(child_)), // + storage(std::move(storage_)), schema_snap(schema_snap_), method(method_), job_type(job_type_), @@ -67,7 +69,7 @@ void SSTFilesToDTFilesOutputStream::writeSuffix() assert(!dt_file->canGC()); // The DTFile should not be able to gc until it is ingested. // Add the DTFile to StoragePathPool so that we can restore it later const auto bytes_written = dt_file->getBytesOnDisk(); - schema_snap.storage->getStore()->preIngestFile(dt_file->parentPath(), dt_file->fileId(), bytes_written); + storage->getStore()->preIngestFile(dt_file->parentPath(), dt_file->fileId(), bytes_written); // Report DMWriteBytes for calculating write amplification ProfileEvents::increment(ProfileEvents::DMWriteBytes, bytes_written); @@ -113,7 +115,7 @@ bool SSTFilesToDTFilesOutputStream::newDTFileStream() } // The parent_path and file_id are generated by the storage. - auto [parent_path, file_id] = schema_snap.storage->getStore()->preAllocateIngestFile(); + auto [parent_path, file_id] = storage->getStore()->preAllocateIngestFile(); if (parent_path.empty()) { // Can no allocate path and id for storing DTFiles (the storage may be dropped / shutdown) diff --git a/dbms/src/Storages/DeltaMerge/SSTFilesToDTFilesOutputStream.h b/dbms/src/Storages/DeltaMerge/SSTFilesToDTFilesOutputStream.h index 418b9e6614b..7b80b973bac 100644 --- a/dbms/src/Storages/DeltaMerge/SSTFilesToDTFilesOutputStream.h +++ b/dbms/src/Storages/DeltaMerge/SSTFilesToDTFilesOutputStream.h @@ -24,6 +24,7 @@ struct SSTViewVec; struct TiFlashRaftProxyHelper; struct SSTReader; class StorageDeltaMerge; +using StorageDeltaMergePtr = std::shared_ptr; struct DecodingStorageSchemaSnapshot; namespace DM @@ -50,6 +51,7 @@ class SSTFilesToDTFilesOutputStream : private boost::noncopyable { public: SSTFilesToDTFilesOutputStream(BoundedSSTFilesToBlockInputStreamPtr child_, + StorageDeltaMergePtr storage_, const DecodingStorageSchemaSnapshot & schema_snap_, TiDB::SnapshotApplyMethod method_, FileConvertJobType job_type_, @@ -73,6 +75,7 @@ class SSTFilesToDTFilesOutputStream : private boost::noncopyable private: BoundedSSTFilesToBlockInputStreamPtr child; + StorageDeltaMergePtr storage; const DecodingStorageSchemaSnapshot & schema_snap; const TiDB::SnapshotApplyMethod method; const FileConvertJobType job_type; diff --git a/dbms/src/Storages/Transaction/ApplySnapshot.cpp b/dbms/src/Storages/Transaction/ApplySnapshot.cpp index b1dd7268635..9f3b752ebd5 100644 --- a/dbms/src/Storages/Transaction/ApplySnapshot.cpp +++ b/dbms/src/Storages/Transaction/ApplySnapshot.cpp @@ -326,8 +326,8 @@ std::vector KVStore::preHandleSSTsToDTFiles( { // Get storage schema atomically, will do schema sync if the storage does not exists. // Will return the storage even if it is tombstoned. - const auto [table_drop_lock, schema_snap] = AtomicGetStorageSchema(new_region, tmt); - if (unlikely(schema_snap.storage == nullptr)) + const auto [table_drop_lock, storage, schema_snap] = AtomicGetStorageSchema(new_region, tmt); + if (unlikely(storage == nullptr)) { // The storage must be physically dropped, throw exception and do cleanup. throw Exception("", ErrorCodes::TABLE_IS_DROPPED); @@ -346,7 +346,8 @@ std::vector KVStore::preHandleSSTsToDTFiles( auto sst_stream = std::make_shared( new_region, snaps, proxy_helper, schema_snap, gc_safepoint, force_decode, tmt, expected_block_size); auto bounded_stream = std::make_shared(sst_stream, ::DB::TiDBPkColumnID, schema_snap); - stream = std::make_shared(bounded_stream, schema_snap, snapshot_apply_method, job_type, tmt); + stream = std::make_shared( + bounded_stream, storage, schema_snap, snapshot_apply_method, job_type, tmt); stream->writePrefix(); stream->write(); diff --git a/dbms/src/Storages/Transaction/PartitionStreams.cpp b/dbms/src/Storages/Transaction/PartitionStreams.cpp index bcf54a8fd37..ebb199f0bed 100644 --- a/dbms/src/Storages/Transaction/PartitionStreams.cpp +++ b/dbms/src/Storages/Transaction/PartitionStreams.cpp @@ -479,10 +479,11 @@ RegionPtrWithBlock::CachePtr GenRegionPreDecodeBlockData(const RegionPtr & regio return std::make_unique(std::move(res_block), schema_version, std::move(*data_list_read)); } -std::tuple // +std::tuple, DecodingStorageSchemaSnapshot> // AtomicGetStorageSchema(const RegionPtr & region, TMTContext & tmt) { TableLockHolder drop_lock = nullptr; + std::shared_ptr dm_storage; DecodingStorageSchemaSnapshot schema_snapshot; auto table_id = region->getMappedTableID(); @@ -509,9 +510,8 @@ AtomicGetStorageSchema(const RegionPtr & region, TMTContext & tmt) + "] [engine_type=" + DB::toString(static_cast(storage->engineType())) + "]", ErrorCodes::LOGICAL_ERROR); } - if (auto dm_storage = std::dynamic_pointer_cast(storage); dm_storage != nullptr) + if (dm_storage = std::dynamic_pointer_cast(storage); dm_storage != nullptr) { - schema_snapshot.storage = dm_storage; auto store = dm_storage->getStore(); schema_snapshot.column_defines = store->getStoreColumns(); schema_snapshot.original_table_handle_define = store->getHandle(); @@ -530,7 +530,7 @@ AtomicGetStorageSchema(const RegionPtr & region, TMTContext & tmt) ErrorCodes::LOGICAL_ERROR); } - return {drop_lock, std::move(schema_snapshot)}; + return {std::move(drop_lock), std::move(dm_storage), std::move(schema_snapshot)}; } static Block sortColumnsBySchemaSnap(Block && ori, const DM::ColumnDefines & schema) diff --git a/dbms/src/Storages/Transaction/PartitionStreams.h b/dbms/src/Storages/Transaction/PartitionStreams.h index 65a90ca6e12..6bb9bf1333d 100644 --- a/dbms/src/Storages/Transaction/PartitionStreams.h +++ b/dbms/src/Storages/Transaction/PartitionStreams.h @@ -20,7 +20,6 @@ struct DecodingStorageSchemaSnapshot bool is_common_handle = false; TiDB::TableInfo table_info; ColumnsDescription columns; - std::shared_ptr storage = nullptr; DM::ColumnDefinesPtr column_defines; DM::ColumnDefine original_table_handle_define; @@ -34,7 +33,7 @@ struct DecodingStorageSchemaSnapshot DecodingStorageSchemaSnapshot & operator=(DecodingStorageSchemaSnapshot &&) = default; }; -std::tuple // +std::tuple, DecodingStorageSchemaSnapshot> // AtomicGetStorageSchema(const RegionPtr & region, TMTContext & tmt); Block GenRegionBlockDatawithSchema(const RegionPtr & region, //