Skip to content

Commit

Permalink
Split storage from DecodingStorageSchemaSnapshot
Browse files Browse the repository at this point in the history
Signed-off-by: JaySon-Huang <jayson.hjs@gmail.com>
  • Loading branch information
JaySon-Huang committed Aug 5, 2021
1 parent 669c65b commit 120a2e7
Show file tree
Hide file tree
Showing 5 changed files with 17 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,13 @@ namespace DM

SSTFilesToDTFilesOutputStream::SSTFilesToDTFilesOutputStream( //
BoundedSSTFilesToBlockInputStreamPtr child_,
StorageDeltaMergePtr storage_,
const DecodingStorageSchemaSnapshot & schema_snap_,
TiDB::SnapshotApplyMethod method_,
FileConvertJobType job_type_,
TMTContext & tmt_)
: child(child_), //
: child(std::move(child_)), //
storage(std::move(storage_)),
schema_snap(schema_snap_),
method(method_),
job_type(job_type_),
Expand Down Expand Up @@ -67,7 +69,7 @@ void SSTFilesToDTFilesOutputStream::writeSuffix()
assert(!dt_file->canGC()); // The DTFile should not be able to gc until it is ingested.
// Add the DTFile to StoragePathPool so that we can restore it later
const auto bytes_written = dt_file->getBytesOnDisk();
schema_snap.storage->getStore()->preIngestFile(dt_file->parentPath(), dt_file->fileId(), bytes_written);
storage->getStore()->preIngestFile(dt_file->parentPath(), dt_file->fileId(), bytes_written);

// Report DMWriteBytes for calculating write amplification
ProfileEvents::increment(ProfileEvents::DMWriteBytes, bytes_written);
Expand Down Expand Up @@ -113,7 +115,7 @@ bool SSTFilesToDTFilesOutputStream::newDTFileStream()
}

// The parent_path and file_id are generated by the storage.
auto [parent_path, file_id] = schema_snap.storage->getStore()->preAllocateIngestFile();
auto [parent_path, file_id] = storage->getStore()->preAllocateIngestFile();
if (parent_path.empty())
{
// Can no allocate path and id for storing DTFiles (the storage may be dropped / shutdown)
Expand Down
3 changes: 3 additions & 0 deletions dbms/src/Storages/DeltaMerge/SSTFilesToDTFilesOutputStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ struct SSTViewVec;
struct TiFlashRaftProxyHelper;
struct SSTReader;
class StorageDeltaMerge;
using StorageDeltaMergePtr = std::shared_ptr<StorageDeltaMerge>;
struct DecodingStorageSchemaSnapshot;

namespace DM
Expand All @@ -50,6 +51,7 @@ class SSTFilesToDTFilesOutputStream : private boost::noncopyable
{
public:
SSTFilesToDTFilesOutputStream(BoundedSSTFilesToBlockInputStreamPtr child_,
StorageDeltaMergePtr storage_,
const DecodingStorageSchemaSnapshot & schema_snap_,
TiDB::SnapshotApplyMethod method_,
FileConvertJobType job_type_,
Expand All @@ -73,6 +75,7 @@ class SSTFilesToDTFilesOutputStream : private boost::noncopyable

private:
BoundedSSTFilesToBlockInputStreamPtr child;
StorageDeltaMergePtr storage;
const DecodingStorageSchemaSnapshot & schema_snap;
const TiDB::SnapshotApplyMethod method;
const FileConvertJobType job_type;
Expand Down
7 changes: 4 additions & 3 deletions dbms/src/Storages/Transaction/ApplySnapshot.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -326,8 +326,8 @@ std::vector<UInt64> KVStore::preHandleSSTsToDTFiles(
{
// Get storage schema atomically, will do schema sync if the storage does not exists.
// Will return the storage even if it is tombstoned.
const auto [table_drop_lock, schema_snap] = AtomicGetStorageSchema(new_region, tmt);
if (unlikely(schema_snap.storage == nullptr))
const auto [table_drop_lock, storage, schema_snap] = AtomicGetStorageSchema(new_region, tmt);
if (unlikely(storage == nullptr))
{
// The storage must be physically dropped, throw exception and do cleanup.
throw Exception("", ErrorCodes::TABLE_IS_DROPPED);
Expand All @@ -346,7 +346,8 @@ std::vector<UInt64> KVStore::preHandleSSTsToDTFiles(
auto sst_stream = std::make_shared<DM::SSTFilesToBlockInputStream>(
new_region, snaps, proxy_helper, schema_snap, gc_safepoint, force_decode, tmt, expected_block_size);
auto bounded_stream = std::make_shared<DM::BoundedSSTFilesToBlockInputStream>(sst_stream, ::DB::TiDBPkColumnID, schema_snap);
stream = std::make_shared<DM::SSTFilesToDTFilesOutputStream>(bounded_stream, schema_snap, snapshot_apply_method, job_type, tmt);
stream = std::make_shared<DM::SSTFilesToDTFilesOutputStream>(
bounded_stream, storage, schema_snap, snapshot_apply_method, job_type, tmt);

stream->writePrefix();
stream->write();
Expand Down
8 changes: 4 additions & 4 deletions dbms/src/Storages/Transaction/PartitionStreams.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -479,10 +479,11 @@ RegionPtrWithBlock::CachePtr GenRegionPreDecodeBlockData(const RegionPtr & regio
return std::make_unique<RegionPreDecodeBlockData>(std::move(res_block), schema_version, std::move(*data_list_read));
}

std::tuple<TableLockHolder, DecodingStorageSchemaSnapshot> //
std::tuple<TableLockHolder, std::shared_ptr<StorageDeltaMerge>, DecodingStorageSchemaSnapshot> //
AtomicGetStorageSchema(const RegionPtr & region, TMTContext & tmt)
{
TableLockHolder drop_lock = nullptr;
std::shared_ptr<StorageDeltaMerge> dm_storage;
DecodingStorageSchemaSnapshot schema_snapshot;

auto table_id = region->getMappedTableID();
Expand All @@ -509,9 +510,8 @@ AtomicGetStorageSchema(const RegionPtr & region, TMTContext & tmt)
+ "] [engine_type=" + DB::toString(static_cast<Int32>(storage->engineType())) + "]",
ErrorCodes::LOGICAL_ERROR);
}
if (auto dm_storage = std::dynamic_pointer_cast<StorageDeltaMerge>(storage); dm_storage != nullptr)
if (dm_storage = std::dynamic_pointer_cast<StorageDeltaMerge>(storage); dm_storage != nullptr)
{
schema_snapshot.storage = dm_storage;
auto store = dm_storage->getStore();
schema_snapshot.column_defines = store->getStoreColumns();
schema_snapshot.original_table_handle_define = store->getHandle();
Expand All @@ -530,7 +530,7 @@ AtomicGetStorageSchema(const RegionPtr & region, TMTContext & tmt)
ErrorCodes::LOGICAL_ERROR);
}

return {drop_lock, std::move(schema_snapshot)};
return {std::move(drop_lock), std::move(dm_storage), std::move(schema_snapshot)};
}

static Block sortColumnsBySchemaSnap(Block && ori, const DM::ColumnDefines & schema)
Expand Down
3 changes: 1 addition & 2 deletions dbms/src/Storages/Transaction/PartitionStreams.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ struct DecodingStorageSchemaSnapshot
bool is_common_handle = false;
TiDB::TableInfo table_info;
ColumnsDescription columns;
std::shared_ptr<StorageDeltaMerge> storage = nullptr;
DM::ColumnDefinesPtr column_defines;
DM::ColumnDefine original_table_handle_define;

Expand All @@ -34,7 +33,7 @@ struct DecodingStorageSchemaSnapshot
DecodingStorageSchemaSnapshot & operator=(DecodingStorageSchemaSnapshot &&) = default;
};

std::tuple<TableLockHolder, DecodingStorageSchemaSnapshot> //
std::tuple<TableLockHolder, std::shared_ptr<StorageDeltaMerge>, DecodingStorageSchemaSnapshot> //
AtomicGetStorageSchema(const RegionPtr & region, TMTContext & tmt);

Block GenRegionBlockDatawithSchema(const RegionPtr & region, //
Expand Down

0 comments on commit 120a2e7

Please sign in to comment.