Skip to content

Commit

Permalink
separate PageIdGenerator from StoragePool (#4154)
Browse files Browse the repository at this point in the history
ref #3594
  • Loading branch information
lidezhu authored Mar 1, 2022
1 parent 8a0d000 commit 4954a24
Show file tree
Hide file tree
Showing 15 changed files with 122 additions and 83 deletions.
20 changes: 11 additions & 9 deletions dbms/src/Server/DTTool/DTToolBench.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -218,31 +218,31 @@ int benchEntry(const std::vector<std::string> & opts)
std::cerr << "invalid dtfile version: " << version << std::endl;
return -EINVAL;
}
auto algorithm_ = vm["algorithm"].as<std::string>();
auto algorithm_config = vm["algorithm"].as<std::string>();
DB::ChecksumAlgo algorithm;
if (algorithm_ == "xxh3")
if (algorithm_config == "xxh3")
{
algorithm = DB::ChecksumAlgo::XXH3;
}
else if (algorithm_ == "crc32")
else if (algorithm_config == "crc32")
{
algorithm = DB::ChecksumAlgo::CRC32;
}
else if (algorithm_ == "crc64")
else if (algorithm_config == "crc64")
{
algorithm = DB::ChecksumAlgo::CRC64;
}
else if (algorithm_ == "city128")
else if (algorithm_config == "city128")
{
algorithm = DB::ChecksumAlgo::City128;
}
else if (algorithm_ == "none")
else if (algorithm_config == "none")
{
algorithm = DB::ChecksumAlgo::None;
}
else
{
std::cerr << "invalid algorithm: " << algorithm_ << std::endl;
std::cerr << "invalid algorithm: " << algorithm_config << std::endl;
return -EINVAL;
}
auto frame = vm["frame"].as<size_t>();
Expand Down Expand Up @@ -286,15 +286,15 @@ int benchEntry(const std::vector<std::string> & opts)
"encryption: {}\n"
"algorithm: {}";
DB::DM::DMConfigurationOpt opt = std::nullopt;
auto logger = &Poco::Logger::get("DTTool::Bench");
auto * logger = &Poco::Logger::get("DTTool::Bench");
if (version == 1)
{
LOG_FMT_INFO(logger, SUMMARY_TEMPLATE_V1, version, column, size, field, random, encryption, workdir);
DB::STORAGE_FORMAT_CURRENT = DB::STORAGE_FORMAT_V2;
}
else
{
LOG_FMT_INFO(logger, SUMMARY_TEMPLATE_V2, version, column, size, field, random, workdir, frame, encryption, algorithm_);
LOG_FMT_INFO(logger, SUMMARY_TEMPLATE_V2, version, column, size, field, random, workdir, frame, encryption, algorithm_config);
opt.emplace(std::map<std::string, std::string>{}, frame, algorithm);
DB::STORAGE_FORMAT_CURRENT = DB::STORAGE_FORMAT_V3;
}
Expand Down Expand Up @@ -323,11 +323,13 @@ int benchEntry(const std::vector<std::string> & opts)
auto db_context = env.getContext();
auto path_pool = std::make_unique<DB::StoragePathPool>(db_context->getPathPool().withTable("test", "t1", false));
auto storage_pool = std::make_unique<DB::DM::StoragePool>("test.t1", *path_pool, *db_context, db_context->getSettingsRef());
auto page_id_generator = std::make_unique<DB::DM::PageIdGenerator>();
auto dm_settings = DB::DM::DeltaMergeStore::Settings{};
auto dm_context = std::make_unique<DB::DM::DMContext>( //
*db_context,
*path_pool,
*storage_pool,
*page_id_generator,
/*hash_salt*/ 0,
0,
dm_settings.not_compress_columns,
Expand Down
2 changes: 2 additions & 0 deletions dbms/src/Server/tests/gtest_dttool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,11 +58,13 @@ struct DTToolTest : public DB::base::TiFlashStorageTestBasic
}
auto path_pool = std::make_unique<DB::StoragePathPool>(db_context->getPathPool().withTable("test", "t1", false));
auto storage_pool = std::make_unique<DB::DM::StoragePool>("test.t1", *path_pool, *db_context, db_context->getSettingsRef());
auto page_id_generator = std::make_unique<DB::DM::PageIdGenerator>();
auto dm_settings = DB::DM::DeltaMergeStore::Settings{};
auto dm_context = std::make_unique<DB::DM::DMContext>( //
*db_context,
*path_pool,
*storage_pool,
*page_id_generator,
/*hash_salt*/ 0,
0,
dm_settings.not_compress_columns,
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileTiny.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ ColumnTinyFilePtr ColumnFileTiny::writeColumnFile(DMContext & context, const Blo

PageId ColumnFileTiny::writeColumnFileData(DMContext & context, const Block & block, size_t offset, size_t limit, WriteBatches & wbs)
{
auto page_id = context.storage_pool.newLogPageId();
auto page_id = context.page_id_generator.newLogPageId();

MemoryWriteBuffer write_buf;
PageFieldSizes col_data_sizes;
Expand Down
4 changes: 4 additions & 0 deletions dbms/src/Storages/DeltaMerge/DMContext.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ class StoragePathPool;
namespace DM
{
class StoragePool;
class PageIdGenerator;
using NotCompress = std::unordered_set<ColId>;
struct DMContext;
using DMContextPtr = std::shared_ptr<DMContext>;
Expand All @@ -26,6 +27,7 @@ struct DMContext : private boost::noncopyable

StoragePathPool & path_pool;
StoragePool & storage_pool;
PageIdGenerator & page_id_generator;
const UInt64 hash_salt;

// gc safe-point, maybe update.
Expand Down Expand Up @@ -73,6 +75,7 @@ struct DMContext : private boost::noncopyable
DMContext(const Context & db_context_,
StoragePathPool & path_pool_,
StoragePool & storage_pool_,
PageIdGenerator & page_id_generator_,
const UInt64 hash_salt_,
const DB::Timestamp min_version_,
const NotCompress & not_compress_,
Expand All @@ -83,6 +86,7 @@ struct DMContext : private boost::noncopyable
: db_context(db_context_)
, path_pool(path_pool_)
, storage_pool(storage_pool_)
, page_id_generator(page_id_generator_)
, hash_salt(hash_salt_)
, min_version(min_version_)
, not_compress(not_compress_)
Expand Down
4 changes: 2 additions & 2 deletions dbms/src/Storages/DeltaMerge/Delta/ColumnFilePersistedSet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -193,15 +193,15 @@ ColumnFilePersisteds ColumnFilePersistedSet::checkHeadAndCloneTail(DMContext & c
else if (auto * t_file = column_file->tryToTinyFile(); t_file)
{
// Use a newly created page_id to reference the data page_id of current column file.
PageId new_data_page_id = context.storage_pool.newLogPageId();
PageId new_data_page_id = context.page_id_generator.newLogPageId();
wbs.log.putRefPage(new_data_page_id, t_file->getDataPageId());
auto new_column_file = t_file->cloneWith(new_data_page_id);
cloned_tail.push_back(new_column_file);
}
else if (auto * b_file = column_file->tryToBigFile(); b_file)
{
auto delegator = context.path_pool.getStableDiskDelegator();
auto new_ref_id = context.storage_pool.newDataPageIdForDTFile(delegator, __PRETTY_FUNCTION__);
auto new_ref_id = context.page_id_generator.newDataPageIdForDTFile(delegator, __PRETTY_FUNCTION__);
auto file_id = b_file->getFile()->fileId();
wbs.data.putRefPage(new_ref_id, file_id);
auto file_parent_path = delegator.getDTFilePath(file_id);
Expand Down
4 changes: 2 additions & 2 deletions dbms/src/Storages/DeltaMerge/Delta/MemTableSet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ ColumnFiles MemTableSet::cloneColumnFiles(DMContext & context, const RowKeyRange
else if (auto * t = column_file->tryToTinyFile(); t)
{
// Use a newly created page_id to reference the data page_id of current column file.
PageId new_data_page_id = context.storage_pool.newLogPageId();
PageId new_data_page_id = context.page_id_generator.newLogPageId();
wbs.log.putRefPage(new_data_page_id, t->getDataPageId());
auto new_column_file = t->cloneWith(new_data_page_id);

Expand All @@ -87,7 +87,7 @@ ColumnFiles MemTableSet::cloneColumnFiles(DMContext & context, const RowKeyRange
else if (auto * f = column_file->tryToBigFile(); f)
{
auto delegator = context.path_pool.getStableDiskDelegator();
auto new_ref_id = context.storage_pool.newDataPageIdForDTFile(delegator, __PRETTY_FUNCTION__);
auto new_ref_id = context.page_id_generator.newDataPageIdForDTFile(delegator, __PRETTY_FUNCTION__);
auto file_id = f->getFile()->fileId();
wbs.data.putRefPage(new_ref_id, file_id);
auto file_parent_path = delegator.getDTFilePath(file_id);
Expand Down
10 changes: 6 additions & 4 deletions dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -217,10 +217,11 @@ DeltaMergeStore::DeltaMergeStore(Context & db_context,
try
{
storage_pool.restore(); // restore from disk
if (!storage_pool.maxMetaPageId())
page_id_generator.restore(storage_pool);
if (!page_id_generator.maxMetaPageId())
{
// Create the first segment.
auto segment_id = storage_pool.newMetaPageId();
auto segment_id = page_id_generator.newMetaPageId();
if (segment_id != DELTA_MERGE_FIRST_SEGMENT_ID)
throw Exception(fmt::format("The first segment id should be {}", DELTA_MERGE_FIRST_SEGMENT_ID), ErrorCodes::LOGICAL_ERROR);
auto first_segment
Expand Down Expand Up @@ -391,6 +392,7 @@ DMContextPtr DeltaMergeStore::newDMContext(const Context & db_context, const DB:
auto * ctx = new DMContext(db_context.getGlobalContext(),
path_pool,
storage_pool,
page_id_generator,
hash_salt,
latest_gc_safe_point.load(std::memory_order_acquire),
settings.not_compress_columns,
Expand Down Expand Up @@ -591,7 +593,7 @@ std::tuple<String, PageId> DeltaMergeStore::preAllocateIngestFile()

auto delegator = path_pool.getStableDiskDelegator();
auto parent_path = delegator.choosePath();
auto new_id = storage_pool.newDataPageIdForDTFile(delegator, __PRETTY_FUNCTION__);
auto new_id = page_id_generator.newDataPageIdForDTFile(delegator, __PRETTY_FUNCTION__);
return {parent_path, new_id};
}

Expand Down Expand Up @@ -708,7 +710,7 @@ void DeltaMergeStore::ingestFiles(
/// Generate DMFile instance with a new ref_id pointed to the file_id.
auto file_id = file->fileId();
const auto & file_parent_path = file->parentPath();
auto ref_id = storage_pool.newDataPageIdForDTFile(delegate, __PRETTY_FUNCTION__);
auto ref_id = page_id_generator.newDataPageIdForDTFile(delegate, __PRETTY_FUNCTION__);

auto ref_file = DMFile::restore(file_provider, file_id, ref_id, file_parent_path, DMFile::ReadMetaMode::all());
auto column_file = std::make_shared<ColumnFileBig>(*dm_context, ref_file, segment_range);
Expand Down
1 change: 1 addition & 0 deletions dbms/src/Storages/DeltaMerge/DeltaMergeStore.h
Original file line number Diff line number Diff line change
Expand Up @@ -432,6 +432,7 @@ class DeltaMergeStore : private boost::noncopyable
StoragePathPool path_pool;
Settings settings;
StoragePool storage_pool;
PageIdGenerator page_id_generator;

String db_name;
String table_name;
Expand Down
22 changes: 11 additions & 11 deletions dbms/src/Storages/DeltaMerge/Segment.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ StableValueSpacePtr createNewStable(DMContext & context,
DMFileBlockOutputStream::Flags flags;
flags.setSingleFile(context.db_context.getSettingsRef().dt_enable_single_file_mode_dmfile);

PageId dtfile_id = context.storage_pool.newDataPageIdForDTFile(delegator, __PRETTY_FUNCTION__);
PageId dtfile_id = context.page_id_generator.newDataPageIdForDTFile(delegator, __PRETTY_FUNCTION__);
auto dtfile = writeIntoNewDMFile(context, schema_snap, input_stream, dtfile_id, store_path, flags);
auto stable = std::make_shared<StableValueSpace>(stable_id);
stable->setFiles({dtfile}, RowKeyRange::newAll(context.is_common_handle, context.rowkey_column_size));
Expand Down Expand Up @@ -215,8 +215,8 @@ SegmentPtr Segment::newSegment(
rowkey_range,
segment_id,
next_segment_id,
context.storage_pool.newMetaPageId(),
context.storage_pool.newMetaPageId());
context.page_id_generator.newMetaPageId(),
context.page_id_generator.newMetaPageId());
}

SegmentPtr Segment::restoreSegment(DMContext & context, PageId segment_id)
Expand Down Expand Up @@ -882,7 +882,7 @@ std::optional<Segment::SplitInfo> Segment::prepareSplitLogical(DMContext & dm_co

EventRecorder recorder(ProfileEvents::DMSegmentSplit, ProfileEvents::DMSegmentSplitNS);

auto & storage_pool = dm_context.storage_pool;
auto & page_id_generator = dm_context.page_id_generator;

RowKeyRange my_range(rowkey_range.start, split_point, is_common_handle, rowkey_column_size);
RowKeyRange other_range(split_point, rowkey_range.end, is_common_handle, rowkey_column_size);
Expand All @@ -899,7 +899,7 @@ std::optional<Segment::SplitInfo> Segment::prepareSplitLogical(DMContext & dm_co
}

GenPageId log_gen_page_id = [&]() {
return storage_pool.newLogPageId();
return page_id_generator.newLogPageId();
};

DMFiles my_stable_files;
Expand All @@ -912,8 +912,8 @@ std::optional<Segment::SplitInfo> Segment::prepareSplitLogical(DMContext & dm_co
auto file_id = dmfile->fileId();
auto file_parent_path = delegate.getDTFilePath(file_id);

auto my_dmfile_id = storage_pool.newDataPageIdForDTFile(delegate, __PRETTY_FUNCTION__);
auto other_dmfile_id = storage_pool.newDataPageIdForDTFile(delegate, __PRETTY_FUNCTION__);
auto my_dmfile_id = page_id_generator.newDataPageIdForDTFile(delegate, __PRETTY_FUNCTION__);
auto other_dmfile_id = page_id_generator.newDataPageIdForDTFile(delegate, __PRETTY_FUNCTION__);

wbs.data.putRefPage(my_dmfile_id, file_id);
wbs.data.putRefPage(other_dmfile_id, file_id);
Expand All @@ -936,7 +936,7 @@ std::optional<Segment::SplitInfo> Segment::prepareSplitLogical(DMContext & dm_co
other_stable_files.push_back(other_dmfile);
}

auto other_stable_id = storage_pool.newMetaPageId();
auto other_stable_id = page_id_generator.newMetaPageId();

auto my_stable = std::make_shared<StableValueSpace>(segment_snap->stable->getId());
auto other_stable = std::make_shared<StableValueSpace>(other_stable_id);
Expand Down Expand Up @@ -1038,7 +1038,7 @@ std::optional<Segment::SplitInfo> Segment::prepareSplitPhysical(DMContext & dm_c
*read_info.read_columns,
dm_context.min_version,
is_common_handle);
auto other_stable_id = dm_context.storage_pool.newMetaPageId();
auto other_stable_id = dm_context.page_id_generator.newMetaPageId();
other_stable = createNewStable(dm_context, schema_snap, other_data, other_stable_id, wbs);
}

Expand Down Expand Up @@ -1075,8 +1075,8 @@ SegmentPair Segment::applySplit(DMContext & dm_context, //
// Created references to tail pages' pages in "log" storage, we need to write them down.
wbs.writeLogAndData();

auto other_segment_id = dm_context.storage_pool.newMetaPageId();
auto other_delta_id = dm_context.storage_pool.newMetaPageId();
auto other_segment_id = dm_context.page_id_generator.newMetaPageId();
auto other_delta_id = dm_context.page_id_generator.newMetaPageId();

auto my_delta = std::make_shared<DeltaValueSpace>(delta->getId(), my_persisted_files, my_in_memory_files);
auto other_delta = std::make_shared<DeltaValueSpace>(other_delta_id, other_persisted_files, other_in_memory_files);
Expand Down
75 changes: 38 additions & 37 deletions dbms/src/Storages/DeltaMerge/StoragePool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,9 +71,6 @@ StoragePool::StoragePool(const String & name, StoragePathPool & path_pool, const
path_pool.getPSDiskDelegatorMulti("meta"),
extractConfig(settings, StorageType::Meta),
global_ctx.getFileProvider()))
, max_log_page_id(0)
, max_data_page_id(0)
, max_meta_page_id(0)
, global_context(global_ctx)
{}

Expand All @@ -82,10 +79,6 @@ void StoragePool::restore()
log_storage->restore();
data_storage->restore();
meta_storage->restore();

max_log_page_id = log_storage->getMaxId();
max_data_page_id = data_storage->getMaxId();
max_meta_page_id = meta_storage->getMaxId();
}

void StoragePool::drop()
Expand All @@ -95,36 +88,6 @@ void StoragePool::drop()
log_storage->drop();
}

PageId StoragePool::newDataPageIdForDTFile(StableDiskDelegator & delegator, const char * who)
{
// In case that there is a DTFile created on disk but TiFlash crashes without persisting the ID.
// After TiFlash process restored, the ID will be inserted into the stable delegator, but we may
// get a duplicated ID from the `storage_pool.data`. (tics#2756)
PageId dtfile_id;
do
{
dtfile_id = ++max_data_page_id;

auto existed_path = delegator.getDTFilePath(dtfile_id, /*throw_on_not_exist=*/false);
fiu_do_on(FailPoints::force_set_dtfile_exist_when_acquire_id, {
static size_t fail_point_called = 0;
if (existed_path.empty() && fail_point_called % 10 == 0)
{
existed_path = "<mock for existed path>";
}
fail_point_called++;
});
if (likely(existed_path.empty()))
{
break;
}
// else there is a DTFile with that id, continue to acquire a new ID.
LOG_WARNING(&Poco::Logger::get(who),
fmt::format("The DTFile is already exists, continute to acquire another ID. [path={}] [id={}]", existed_path, dtfile_id));
} while (true);
return dtfile_id;
}

bool StoragePool::gc(const Settings & settings, const Seconds & try_gc_period)
{
{
Expand Down Expand Up @@ -155,5 +118,43 @@ bool StoragePool::gc(const Settings & settings, const Seconds & try_gc_period)
return done_anything;
}

void PageIdGenerator::restore(const StoragePool & storage_pool)
{
max_log_page_id = storage_pool.log_storage->getMaxId();
max_data_page_id = storage_pool.data_storage->getMaxId();
max_meta_page_id = storage_pool.meta_storage->getMaxId();
}

PageId PageIdGenerator::newDataPageIdForDTFile(StableDiskDelegator & delegator, const char * who)
{
// In case that there is a DTFile created on disk but TiFlash crashes without persisting the ID.
// After TiFlash process restored, the ID will be inserted into the stable delegator, but we may
// get a duplicated ID from the `storage_pool.data`. (tics#2756)
PageId dtfile_id;
do
{
dtfile_id = ++max_data_page_id;

auto existed_path = delegator.getDTFilePath(dtfile_id, /*throw_on_not_exist=*/false);
fiu_do_on(FailPoints::force_set_dtfile_exist_when_acquire_id, {
static size_t fail_point_called = 0;
if (existed_path.empty() && fail_point_called % 10 == 0)
{
existed_path = "<mock for existed path>";
}
fail_point_called++;
});
if (likely(existed_path.empty()))
{
break;
}
// else there is a DTFile with that id, continue to acquire a new ID.
LOG_FMT_WARNING(&Poco::Logger::get(who),
"The DTFile is already exists, continute to acquire another ID. [path={}] [id={}]",
existed_path,
dtfile_id);
} while (true);
return dtfile_id;
}
} // namespace DM
} // namespace DB
Loading

0 comments on commit 4954a24

Please sign in to comment.