diff --git a/dbms/src/Debug/dbgFuncMisc.cpp b/dbms/src/Debug/dbgFuncMisc.cpp index b9f62317189..e43ca5dd725 100644 --- a/dbms/src/Debug/dbgFuncMisc.cpp +++ b/dbms/src/Debug/dbgFuncMisc.cpp @@ -104,9 +104,7 @@ void dbgFuncTriggerGlobalPageStorageGC(Context & context, const ASTs & /*args*/, auto global_storage_pool = context.getGlobalStoragePool(); if (global_storage_pool) { - global_storage_pool->meta()->gc(); - global_storage_pool->log()->gc(); - global_storage_pool->data()->gc(); + global_storage_pool->gc(); } } } // namespace DB diff --git a/dbms/src/Interpreters/Context.cpp b/dbms/src/Interpreters/Context.cpp index ce4ecc692c0..25dffe263fd 100644 --- a/dbms/src/Interpreters/Context.cpp +++ b/dbms/src/Interpreters/Context.cpp @@ -161,6 +161,7 @@ struct ContextShared PathCapacityMetricsPtr path_capacity_ptr; /// Path capacity metrics FileProviderPtr file_provider; /// File provider. IORateLimiter io_rate_limiter; + PageStorageRunMode storage_run_mode; DM::GlobalStoragePoolPtr global_storage_pool; /// Named sessions. The user could specify session identifier to reuse settings and temporary tables in subsequent requests. @@ -1547,24 +1548,38 @@ ReadLimiterPtr Context::getReadLimiter() const return getIORateLimiter().getReadLimiter(); } -static bool isUsingPageStorageV3(const PathPool & path_pool, bool enable_ps_v3) + +static bool isPageStorageV2Existed(const PathPool & path_pool) { - // Check whether v3 is already enabled - for (const auto & path : path_pool.listGlobalPagePaths()) + for (const auto & path : path_pool.listKVStorePaths()) { - if (PS::V3::PageStorageImpl::isManifestsFileExists(path)) + Poco::File dir(path); + if (!dir.exists()) + continue; + + std::vector files; + dir.list(files); + if (!files.empty()) { - return true; + for (const auto & file_name : files) + { + const auto & find_index = file_name.find("page"); + if (find_index != std::string::npos) + { + return true; + } + } + // KVStore is not empty, but can't find any of v2 data in it. 
} } - // Check whether v3 on new node is enabled in the config, if not, no need to check anymore - if (!enable_ps_v3) - return false; + // If not data in KVStore. It means V2 data must not existed. + return false; +} - // Check whether there are any files in kvstore path, if exists, then this is not a new node. - // If it's a new node, then we enable v3. Otherwise, we use v2. - for (const auto & path : path_pool.listKVStorePaths()) +static bool isPageStorageV3Existed(const PathPool & path_pool) +{ + for (const auto & path : path_pool.listGlobalPagePaths()) { Poco::File dir(path); if (!dir.exists()) @@ -1574,23 +1589,76 @@ static bool isUsingPageStorageV3(const PathPool & path_pool, bool enable_ps_v3) dir.list(files); if (!files.empty()) { - return false; + return true; } } - return true; + return false; } -bool Context::initializeGlobalStoragePoolIfNeed(const PathPool & path_pool, bool enable_ps_v3) +void Context::initializePageStorageMode(const PathPool & path_pool, UInt64 storage_page_format_version) { auto lock = getLock(); - if (isUsingPageStorageV3(path_pool, enable_ps_v3)) + + /** + * PageFormat::V2 + isPageStorageV3Existed is false + whatever isPageStorageV2Existed true or false = ONLY_V2 + * PageFormat::V2 + isPageStorageV3Existed is true + whatever isPageStorageV2Existed true or false = ERROR Config + * PageFormat::V3 + isPageStorageV2Existed is true + whatever isPageStorageV3Existed true or false = MIX_MODE + * PageFormat::V3 + isPageStorageV2Existed is false + whatever isPageStorageV3Existed true or false = ONLY_V3 + */ + + switch (storage_page_format_version) { - try + case PageFormat::V1: + case PageFormat::V2: + { + if (isPageStorageV3Existed(path_pool)) { - // create manifests file before initialize GlobalStoragePool - for (const auto & path : path_pool.listGlobalPagePaths()) - PS::V3::PageStorageImpl::createManifestsFileIfNeed(path); + throw Exception("Invalid config `storage.format_version`, Current page V3 data exist. 
But using the PageFormat::V2. " + "If you are downgrading the format_version for this TiFlash node, you need to rebuild the data from scratch.", + ErrorCodes::LOGICAL_ERROR); + } + // not exist V3 + shared->storage_run_mode = PageStorageRunMode::ONLY_V2; + return; + } + case PageFormat::V3: + { + shared->storage_run_mode = isPageStorageV2Existed(path_pool) ? PageStorageRunMode::MIX_MODE : PageStorageRunMode::ONLY_V3; + return; + } + default: + throw Exception(fmt::format("Can't detect the format version of Page [page_version={}]", storage_page_format_version), + ErrorCodes::LOGICAL_ERROR); + } +} + +PageStorageRunMode Context::getPageStorageRunMode() const +{ + auto lock = getLock(); + return shared->storage_run_mode; +} +void Context::setPageStorageRunMode(PageStorageRunMode run_mode) const +{ + auto lock = getLock(); + shared->storage_run_mode = run_mode; +} + +bool Context::initializeGlobalStoragePoolIfNeed(const PathPool & path_pool) +{ + auto lock = getLock(); + if (shared->global_storage_pool) + { + // Can't init GlobalStoragePool twice. 
+ // Because we won't remove the gc task in BackGroundPool + // Also won't remove it from ~GlobalStoragePool() + throw Exception("GlobalStoragePool has already been initialized.", ErrorCodes::LOGICAL_ERROR); + } + + if (shared->storage_run_mode == PageStorageRunMode::MIX_MODE || shared->storage_run_mode == PageStorageRunMode::ONLY_V3) + { + try + { shared->global_storage_pool = std::make_shared(path_pool, *this, settings); shared->global_storage_pool->restore(); return true; diff --git a/dbms/src/Interpreters/Context.h b/dbms/src/Interpreters/Context.h index 9e8a1b248ba..63aefcbece9 100644 --- a/dbms/src/Interpreters/Context.h +++ b/dbms/src/Interpreters/Context.h @@ -99,6 +99,7 @@ using WriteLimiterPtr = std::shared_ptr; class ReadLimiter; using ReadLimiterPtr = std::shared_ptr; +enum class PageStorageRunMode : UInt8; namespace DM { class MinMaxIndexCache; @@ -405,8 +406,10 @@ class Context ReadLimiterPtr getReadLimiter() const; IORateLimiter & getIORateLimiter() const; - bool initializeGlobalStoragePoolIfNeed(const PathPool & path_pool, bool enable_ps_v3); - + void initializePageStorageMode(const PathPool & path_pool, UInt64 storage_page_format_version); + void setPageStorageRunMode(PageStorageRunMode run_mode) const; + PageStorageRunMode getPageStorageRunMode() const; + bool initializeGlobalStoragePoolIfNeed(const PathPool & path_pool); DM::GlobalStoragePoolPtr getGlobalStoragePool() const; /// Call after initialization before using system logs. Call for global context. 
diff --git a/dbms/src/Server/DTTool/DTToolBench.cpp b/dbms/src/Server/DTTool/DTToolBench.cpp index 491ac710721..e30f95e14b2 100644 --- a/dbms/src/Server/DTTool/DTToolBench.cpp +++ b/dbms/src/Server/DTTool/DTToolBench.cpp @@ -336,7 +336,7 @@ int benchEntry(const std::vector & opts) auto settings = DB::Settings(); auto db_context = env.getContext(); auto path_pool = std::make_unique(db_context->getPathPool().withTable("test", "t1", false)); - auto storage_pool = std::make_unique("test.t1", /*table_id*/ 1, *path_pool, *db_context, db_context->getSettingsRef()); + auto storage_pool = std::make_unique(*db_context, /*ns_id*/ 1, *path_pool, "test.t1"); auto dm_settings = DB::DM::DeltaMergeStore::Settings{}; auto dm_context = std::make_unique( // *db_context, diff --git a/dbms/src/Server/Server.cpp b/dbms/src/Server/Server.cpp index 4762935cd6f..705b8a533f3 100644 --- a/dbms/src/Server/Server.cpp +++ b/dbms/src/Server/Server.cpp @@ -1112,13 +1112,10 @@ int Server::main(const std::vector & /*args*/) raft_config.enable_compatible_mode, // global_context->getPathCapacity(), global_context->getFileProvider()); - // must initialize before the following operation: - // 1. load data from disk(because this process may depend on the initialization of global StoragePool) - // 2. 
initialize KVStore service - // 1) because we need to check whether this is the first startup of this node, and we judge it based on whether there are any files in kvstore directory - // 2) KVStore service also choose its data format based on whether the GlobalStoragePool is initialized - if (global_context->initializeGlobalStoragePoolIfNeed(global_context->getPathPool(), storage_config.enable_ps_v3)) - LOG_FMT_INFO(log, "PageStorage V3 enabled."); + + global_context->initializePageStorageMode(global_context->getPathPool(), STORAGE_FORMAT_CURRENT.page); + global_context->initializeGlobalStoragePoolIfNeed(global_context->getPathPool()); + LOG_FMT_INFO(log, "Global PageStorage run mode is {}", static_cast(global_context->getPageStorageRunMode())); // Use pd address to define which default_database we use by default. // For mock test, we use "default". For deployed with pd/tidb/tikv use "system", which is always exist in TiFlash. diff --git a/dbms/src/Server/StorageConfigParser.cpp b/dbms/src/Server/StorageConfigParser.cpp index 270390ac1fa..d43ccb850f1 100644 --- a/dbms/src/Server/StorageConfigParser.cpp +++ b/dbms/src/Server/StorageConfigParser.cpp @@ -213,10 +213,8 @@ void TiFlashStorageConfig::parseMisc(const String & storage_section, Poco::Logge }; lazily_init_store = get_bool_config_or_default("lazily_init_store", lazily_init_store); - // config for experimental feature, may remove later - enable_ps_v3 = get_bool_config_or_default("enable_ps_v3", enable_ps_v3); - LOG_FMT_INFO(log, "format_version {} lazily_init_store {} enable_ps_v3 {}", format_version, lazily_init_store, enable_ps_v3); + LOG_FMT_INFO(log, "format_version {} lazily_init_store {}", format_version, lazily_init_store); } Strings TiFlashStorageConfig::getAllNormalPaths() const diff --git a/dbms/src/Server/StorageConfigParser.h b/dbms/src/Server/StorageConfigParser.h index 65df7be3174..4efc5637634 100644 --- a/dbms/src/Server/StorageConfigParser.h +++ b/dbms/src/Server/StorageConfigParser.h @@ -103,7 
+103,6 @@ struct TiFlashStorageConfig UInt64 format_version = 0; bool lazily_init_store = true; - bool enable_ps_v3 = true; public: TiFlashStorageConfig() = default; diff --git a/dbms/src/Server/tests/gtest_dttool.cpp b/dbms/src/Server/tests/gtest_dttool.cpp index 6b0d6e3d5c2..a03fc779ae3 100644 --- a/dbms/src/Server/tests/gtest_dttool.cpp +++ b/dbms/src/Server/tests/gtest_dttool.cpp @@ -72,7 +72,7 @@ struct DTToolTest : public DB::base::TiFlashStorageTestBasic properties.push_back(property); } auto path_pool = std::make_unique(db_context->getPathPool().withTable("test", "t1", false)); - auto storage_pool = std::make_unique("test.t1", /*table_id*/ 1, *path_pool, *db_context, db_context->getSettingsRef()); + auto storage_pool = std::make_unique(*db_context, /*ns_id*/ 1, *path_pool, "test.t1"); auto dm_settings = DB::DM::DeltaMergeStore::Settings{}; auto dm_context = std::make_unique( // *db_context, diff --git a/dbms/src/Server/tests/gtest_server_config.cpp b/dbms/src/Server/tests/gtest_server_config.cpp index 69e1fe4cb6a..53705f1a351 100644 --- a/dbms/src/Server/tests/gtest_server_config.cpp +++ b/dbms/src/Server/tests/gtest_server_config.cpp @@ -296,7 +296,7 @@ dt_page_gc_low_write_prob = 0.2 auto verify_persister_reload_config = [&global_ctx](RegionPersister & persister) { DB::Settings & settings = global_ctx.getSettingsRef(); - auto cfg = persister.page_storage->getSettings(); + auto cfg = persister.getPageStorageSettings(); EXPECT_NE(cfg.gc_min_files, settings.dt_storage_pool_data_gc_min_file_num); EXPECT_NE(cfg.gc_min_legacy_num, settings.dt_storage_pool_data_gc_min_legacy_num); EXPECT_NE(cfg.gc_min_bytes, settings.dt_storage_pool_data_gc_min_bytes); @@ -307,8 +307,7 @@ dt_page_gc_low_write_prob = 0.2 EXPECT_NE(cfg.prob_do_gc_when_write_is_low, settings.dt_page_gc_low_write_prob * 1000); persister.gc(); - cfg = persister.page_storage->getSettings(); - + cfg = persister.getPageStorageSettings(); EXPECT_NE(cfg.gc_min_files, 
settings.dt_storage_pool_data_gc_min_file_num); EXPECT_NE(cfg.gc_min_legacy_num, settings.dt_storage_pool_data_gc_min_legacy_num); EXPECT_NE(cfg.gc_min_bytes, settings.dt_storage_pool_data_gc_min_bytes); @@ -370,12 +369,12 @@ dt_page_gc_low_write_prob = 0.2 auto & global_ctx = TiFlashTestEnv::getGlobalContext(); std::unique_ptr path_pool = std::make_unique(global_ctx.getPathPool().withTable("test", "t1", false)); - std::unique_ptr storage_pool = std::make_unique("test.t1", /*table_id*/ 100, *path_pool, global_ctx, global_ctx.getSettingsRef()); + std::unique_ptr storage_pool = std::make_unique(global_ctx, /*ns_id*/ 100, *path_pool, "test.t1"); auto verify_storage_pool_reload_config = [&global_ctx](std::unique_ptr & storage_pool) { DB::Settings & settings = global_ctx.getSettingsRef(); - auto cfg = storage_pool->data()->getSettings(); + auto cfg = storage_pool->data_storage_v2->getSettings(); EXPECT_NE(cfg.gc_min_files, settings.dt_storage_pool_data_gc_min_file_num); EXPECT_NE(cfg.gc_min_legacy_num, settings.dt_storage_pool_data_gc_min_legacy_num); EXPECT_NE(cfg.gc_min_bytes, settings.dt_storage_pool_data_gc_min_bytes); @@ -387,7 +386,7 @@ dt_page_gc_low_write_prob = 0.2 storage_pool->gc(settings, DM::StoragePool::Seconds(0)); - cfg = storage_pool->data()->getSettings(); + cfg = storage_pool->data_storage_v2->getSettings(); EXPECT_EQ(cfg.gc_min_files, settings.dt_storage_pool_data_gc_min_file_num); EXPECT_EQ(cfg.gc_min_legacy_num, settings.dt_storage_pool_data_gc_min_legacy_num); EXPECT_EQ(cfg.gc_min_bytes, settings.dt_storage_pool_data_gc_min_bytes); diff --git a/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileBig.cpp b/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileBig.cpp index 9fe72839628..306f9470f5d 100644 --- a/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileBig.cpp +++ b/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileBig.cpp @@ -72,7 +72,7 @@ ColumnFilePersistedPtr ColumnFileBig::deserializeMetadata(DMContext & context, / readIntBinary(valid_rows, 
buf); readIntBinary(valid_bytes, buf); - auto file_id = context.storage_pool.dataReader().getNormalPageId(file_ref_id); + auto file_id = context.storage_pool.dataReader()->getNormalPageId(file_ref_id); auto file_parent_path = context.path_pool.getStableDiskDelegator().getDTFilePath(file_id); auto dmfile = DMFile::restore(context.db_context.getFileProvider(), file_id, file_ref_id, file_parent_path, DMFile::ReadMetaMode::all()); diff --git a/dbms/src/Storages/DeltaMerge/Delta/ColumnFilePersistedSet.cpp b/dbms/src/Storages/DeltaMerge/Delta/ColumnFilePersistedSet.cpp index b96acdad424..289caf5816c 100644 --- a/dbms/src/Storages/DeltaMerge/Delta/ColumnFilePersistedSet.cpp +++ b/dbms/src/Storages/DeltaMerge/Delta/ColumnFilePersistedSet.cpp @@ -107,7 +107,7 @@ ColumnFilePersistedSet::ColumnFilePersistedSet(PageId metadata_id_, const Column ColumnFilePersistedSetPtr ColumnFilePersistedSet::restore(DMContext & context, const RowKeyRange & segment_range, PageId id) { - Page page = context.storage_pool.metaReader().read(id); + Page page = context.storage_pool.metaReader()->read(id); ReadBufferFromMemory buf(page.data.begin(), page.data.size()); auto column_files = deserializeSavedColumnFiles(context, segment_range, buf); return std::make_shared(id, column_files); diff --git a/dbms/src/Storages/DeltaMerge/Delta/DeltaValueSpace.cpp b/dbms/src/Storages/DeltaMerge/Delta/DeltaValueSpace.cpp index b57ce3cb50a..132732d6989 100644 --- a/dbms/src/Storages/DeltaMerge/Delta/DeltaValueSpace.cpp +++ b/dbms/src/Storages/DeltaMerge/Delta/DeltaValueSpace.cpp @@ -236,12 +236,12 @@ bool DeltaValueSpace::compact(DMContext & context) LOG_FMT_DEBUG(log, "{} Nothing to compact", simpleInfo()); return true; } - log_storage_snap = context.storage_pool.log()->getSnapshot(/*tracing_id*/ fmt::format("minor_compact_{}", simpleInfo())); + log_storage_snap = context.storage_pool.logReader()->getSnapshot(/*tracing_id*/ fmt::format("minor_compact_{}", simpleInfo())); } // do compaction task WriteBatches 
wbs(context.storage_pool, context.getWriteLimiter()); - PageReader reader(context.storage_pool.getNamespaceId(), context.storage_pool.log(), std::move(log_storage_snap), context.getReadLimiter()); + const auto & reader = context.storage_pool.newLogReader(context.getReadLimiter(), log_storage_snap); compaction_task->prepare(context, wbs, reader); { diff --git a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp index b27c94305aa..6b4339938af 100644 --- a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp +++ b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp @@ -212,18 +212,15 @@ DeltaMergeStore::DeltaMergeStore(Context & db_context, , hash_salt(++DELTA_MERGE_STORE_HASH_SALT) , log(Logger::get("DeltaMergeStore", fmt::format("{}.{}", db_name, table_name))) { - LOG_FMT_INFO(log, "Restore DeltaMerge Store start [{}.{}]", db_name, table_name); - // for mock test, table_id_ should be DB::InvalidTableID NamespaceId ns_id = physical_table_id == DB::InvalidTableID ? TEST_NAMESPACE_ID : physical_table_id; - if (auto global_storage_pool = global_context.getGlobalStoragePool(); global_storage_pool) - { - storage_pool = std::make_shared(ns_id, *global_storage_pool, global_context); - } - else - { - storage_pool = std::make_shared(db_name_ + "." + table_name_, ns_id, path_pool, global_context, db_context.getSettingsRef()); - } + + LOG_FMT_INFO(log, "Restore DeltaMerge Store start [{}.{}] [table_id = {}]", db_name, table_name, physical_table_id); + + storage_pool = std::make_shared(global_context, + ns_id, + path_pool, + db_name_ + "." + table_name_); // Restore existing dm files and set capacity for path_pool. // Should be done before any background task setup. 
@@ -242,10 +239,10 @@ DeltaMergeStore::DeltaMergeStore(Context & db_context, store_columns = generateStoreColumns(original_table_columns, is_common_handle); auto dm_context = newDMContext(db_context, db_context.getSettingsRef()); - + PageStorageRunMode page_storage_run_mode; try { - storage_pool->restore(); // restore from disk + page_storage_run_mode = storage_pool->restore(); // restore from disk if (!storage_pool->maxMetaPageId()) { // Create the first segment. @@ -278,7 +275,7 @@ DeltaMergeStore::DeltaMergeStore(Context & db_context, setUpBackgroundTask(dm_context); - LOG_FMT_INFO(log, "Restore DeltaMerge Store end [{}.{}]", db_name, table_name); + LOG_FMT_INFO(log, "Restore DeltaMerge Store end [{}.{}], [ps_run_mode={}]", db_name, table_name, static_cast(page_storage_run_mode)); } DeltaMergeStore::~DeltaMergeStore() @@ -332,7 +329,7 @@ void DeltaMergeStore::setUpBackgroundTask(const DMContextPtr & dm_context) }; callbacks.ns_id = storage_pool->getNamespaceId(); // remember to unregister it when shutdown - storage_pool->data()->registerExternalPagesCallbacks(callbacks); + storage_pool->dataRegisterExternalPagesCallbacks(callbacks); storage_pool->enableGC(); background_task_handle = background_pool.addTask([this] { return handleBackgroundTask(false); }); @@ -480,7 +477,7 @@ void DeltaMergeStore::shutdown() // shutdown before unregister to avoid conflict between this thread and background gc thread on the `ExternalPagesCallbacks` // because PageStorage V2 doesn't have any lock protection on the `ExternalPagesCallbacks`.(The order doesn't matter for V3) storage_pool->shutdown(); - storage_pool->data()->unregisterExternalPagesCallbacks(storage_pool->getNamespaceId()); + storage_pool->dataUnregisterExternalPagesCallbacks(storage_pool->getNamespaceId()); background_pool.removeTask(background_task_handle); blockable_background_pool.removeTask(blockable_background_pool_handle); @@ -716,7 +713,7 @@ void DeltaMergeStore::preIngestFile(const String & parent_path, const 
PageId fil void DeltaMergeStore::ingestFiles( const DMContextPtr & dm_context, const RowKeyRange & range, - const std::vector & file_ids, + const PageIds & file_ids, bool clear_data_in_range) { if (unlikely(shutdown_called.load(std::memory_order_relaxed))) @@ -2386,12 +2383,12 @@ DeltaMergeStoreStat DeltaMergeStore::getStat() static const String useless_tracing_id("DeltaMergeStore::getStat"); { - auto snaps_stat = storage_pool->data()->getSnapshotsStat(); + auto snaps_stat = storage_pool->dataReader()->getSnapshotsStat(); stat.storage_stable_num_snapshots = snaps_stat.num_snapshots; stat.storage_stable_oldest_snapshot_lifetime = snaps_stat.longest_living_seconds; stat.storage_stable_oldest_snapshot_thread_id = snaps_stat.longest_living_from_thread_id; stat.storage_stable_oldest_snapshot_tracing_id = snaps_stat.longest_living_from_tracing_id; - PageStorage::SnapshotPtr stable_snapshot = storage_pool->data()->getSnapshot(useless_tracing_id); + PageStorage::SnapshotPtr stable_snapshot = storage_pool->dataReader()->getSnapshot(useless_tracing_id); const auto * concrete_snap = toConcreteSnapshot(stable_snapshot); if (concrete_snap) { @@ -2408,12 +2405,12 @@ DeltaMergeStoreStat DeltaMergeStore::getStat() } } { - auto snaps_stat = storage_pool->log()->getSnapshotsStat(); + auto snaps_stat = storage_pool->logReader()->getSnapshotsStat(); stat.storage_delta_num_snapshots = snaps_stat.num_snapshots; stat.storage_delta_oldest_snapshot_lifetime = snaps_stat.longest_living_seconds; stat.storage_delta_oldest_snapshot_thread_id = snaps_stat.longest_living_from_thread_id; stat.storage_delta_oldest_snapshot_tracing_id = snaps_stat.longest_living_from_tracing_id; - PageStorage::SnapshotPtr log_snapshot = storage_pool->log()->getSnapshot(useless_tracing_id); + PageStorage::SnapshotPtr log_snapshot = storage_pool->logReader()->getSnapshot(useless_tracing_id); const auto * concrete_snap = toConcreteSnapshot(log_snapshot); if (concrete_snap) { @@ -2430,12 +2427,12 @@ DeltaMergeStoreStat 
DeltaMergeStore::getStat() } } { - auto snaps_stat = storage_pool->meta()->getSnapshotsStat(); + auto snaps_stat = storage_pool->metaReader()->getSnapshotsStat(); stat.storage_meta_num_snapshots = snaps_stat.num_snapshots; stat.storage_meta_oldest_snapshot_lifetime = snaps_stat.longest_living_seconds; stat.storage_meta_oldest_snapshot_thread_id = snaps_stat.longest_living_from_thread_id; stat.storage_meta_oldest_snapshot_tracing_id = snaps_stat.longest_living_from_tracing_id; - PageStorage::SnapshotPtr meta_snapshot = storage_pool->meta()->getSnapshot(useless_tracing_id); + PageStorage::SnapshotPtr meta_snapshot = storage_pool->metaReader()->getSnapshot(useless_tracing_id); const auto * concrete_snap = toConcreteSnapshot(meta_snapshot); if (concrete_snap) { diff --git a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.h b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.h index 9b09b6f37c5..c600ac9dbf8 100644 --- a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.h +++ b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.h @@ -330,13 +330,13 @@ class DeltaMergeStore : private boost::noncopyable void ingestFiles(const DMContextPtr & dm_context, // const RowKeyRange & range, - const std::vector & file_ids, + const PageIds & file_ids, bool clear_data_in_range); void ingestFiles(const Context & db_context, // const DB::Settings & db_settings, const RowKeyRange & range, - const std::vector & file_ids, + const PageIds & file_ids, bool clear_data_in_range) { auto dm_context = newDMContext(db_context, db_settings); diff --git a/dbms/src/Storages/DeltaMerge/Segment.cpp b/dbms/src/Storages/DeltaMerge/Segment.cpp index eccc06a8257..e8cde4ba0c6 100644 --- a/dbms/src/Storages/DeltaMerge/Segment.cpp +++ b/dbms/src/Storages/DeltaMerge/Segment.cpp @@ -236,7 +236,7 @@ SegmentPtr Segment::newSegment( SegmentPtr Segment::restoreSegment(DMContext & context, PageId segment_id) { - Page page = context.storage_pool.metaReader().read(segment_id); // not limit restore + Page page = 
context.storage_pool.metaReader()->read(segment_id); // not limit restore ReadBufferFromMemory buf(page.data.begin(), page.data.size()); SegmentFormat::Version version; diff --git a/dbms/src/Storages/DeltaMerge/StableValueSpace.cpp b/dbms/src/Storages/DeltaMerge/StableValueSpace.cpp index 648ffd4f084..752a112b565 100644 --- a/dbms/src/Storages/DeltaMerge/StableValueSpace.cpp +++ b/dbms/src/Storages/DeltaMerge/StableValueSpace.cpp @@ -90,7 +90,7 @@ StableValueSpacePtr StableValueSpace::restore(DMContext & context, PageId id) { auto stable = std::make_shared(id); - Page page = context.storage_pool.metaReader().read(id); // not limit restore + Page page = context.storage_pool.metaReader()->read(id); // not limit restore ReadBufferFromMemory buf(page.data.begin(), page.data.size()); UInt64 version, valid_rows, valid_bytes, size; readIntBinary(version, buf); @@ -105,7 +105,7 @@ StableValueSpacePtr StableValueSpace::restore(DMContext & context, PageId id) { readIntBinary(ref_id, buf); - auto file_id = context.storage_pool.dataReader().getNormalPageId(ref_id); + auto file_id = context.storage_pool.dataReader()->getNormalPageId(ref_id); auto file_parent_path = context.path_pool.getStableDiskDelegator().getDTFilePath(file_id); auto dmfile = DMFile::restore(context.db_context.getFileProvider(), file_id, ref_id, file_parent_path, DMFile::ReadMetaMode::all()); diff --git a/dbms/src/Storages/DeltaMerge/StoragePool.cpp b/dbms/src/Storages/DeltaMerge/StoragePool.cpp index 8c4468ae2ed..2979a75e192 100644 --- a/dbms/src/Storages/DeltaMerge/StoragePool.cpp +++ b/dbms/src/Storages/DeltaMerge/StoragePool.cpp @@ -17,11 +17,15 @@ #include #include #include -#include #include namespace DB { +namespace ErrorCodes +{ +extern const int LOGICAL_ERROR; +} // namespace ErrorCodes + namespace FailPoints { extern const char force_set_dtfile_exist_when_acquire_id[]; @@ -60,34 +64,13 @@ PageStorage::Config extractConfig(const Settings & settings, StorageType subtype SET_CONFIG(meta); break; 
default: - throw Exception("Unknown subtype in extractConfig: " + DB::toString(static_cast(subtype))); + throw Exception(fmt::format("Unknown subtype in extractConfig: {} ", static_cast(subtype)), ErrorCodes::LOGICAL_ERROR); } #undef SET_CONFIG return config; } -template -static bool doStoragePoolGC(const Context & global_context, const Settings & settings, const T & storage_pool) -{ - bool done_anything = false; - auto write_limiter = global_context.getWriteLimiter(); - auto read_limiter = global_context.getReadLimiter(); - auto config = extractConfig(settings, StorageType::Meta); - storage_pool.meta()->reloadSettings(config); - done_anything |= storage_pool.meta()->gc(/*not_skip*/ false, write_limiter, read_limiter); - - config = extractConfig(settings, StorageType::Data); - storage_pool.data()->reloadSettings(config); - done_anything |= storage_pool.data()->gc(/*not_skip*/ false, write_limiter, read_limiter); - - config = extractConfig(settings, StorageType::Log); - storage_pool.log()->reloadSettings(config); - done_anything |= storage_pool.log()->gc(/*not_skip*/ false, write_limiter, read_limiter); - - return done_anything; -} - GlobalStoragePool::GlobalStoragePool(const PathPool & path_pool, Context & global_ctx, const Settings & settings) : // The iops and bandwidth in log_storage are relatively high, use multi-disks if possible log_storage(PageStorage::create("__global__.log", @@ -110,7 +93,18 @@ GlobalStoragePool::GlobalStoragePool(const PathPool & path_pool, Context & globa global_ctx.getFileProvider(), true)) , global_context(global_ctx) -{} +{ +} + + +GlobalStoragePool::~GlobalStoragePool() +{ + if (gc_handle) + { + global_context.getBackgroundPool().removeTask(gc_handle); + gc_handle = nullptr; + } +} void GlobalStoragePool::restore() { @@ -125,81 +119,135 @@ void GlobalStoragePool::restore() false); } -GlobalStoragePool::~GlobalStoragePool() +bool GlobalStoragePool::gc() { - if (gc_handle) - { - global_context.getBackgroundPool().removeTask(gc_handle); 
- gc_handle = nullptr; - } + return gc(global_context.getSettingsRef(), /*immediately*/ true, DELTA_MERGE_GC_PERIOD); } -bool GlobalStoragePool::gc(const Settings & settings, const Seconds & try_gc_period) +bool GlobalStoragePool::gc(const Settings & settings, bool immediately, const Seconds & try_gc_period) { + Timepoint now = Clock::now(); + if (!immediately) { - std::lock_guard lock(mutex); - - Timepoint now = Clock::now(); + // No need lock if (now < (last_try_gc_time.load() + try_gc_period)) return false; - - last_try_gc_time = now; } - return doStoragePoolGC(global_context, settings, *this); -} + last_try_gc_time = now; + bool done_anything = false; + auto write_limiter = global_context.getWriteLimiter(); + auto read_limiter = global_context.getReadLimiter(); + auto config = extractConfig(settings, StorageType::Meta); + meta_storage->reloadSettings(config); + done_anything |= meta_storage->gc(/*not_skip*/ false, write_limiter, read_limiter); -StoragePool::StoragePool(const String & name, NamespaceId ns_id_, StoragePathPool & path_pool, Context & global_ctx, const Settings & settings) - : owned_storage(true) - , ns_id(ns_id_) - , - // The iops and bandwidth in log_storage are relatively high, use multi-disks if possible - log_storage(PageStorage::create(name + ".log", - path_pool.getPSDiskDelegatorMulti("log"), - extractConfig(settings, StorageType::Log), - global_ctx.getFileProvider())) - , - // The iops in data_storage is low, only use the first disk for storing data - data_storage(PageStorage::create(name + ".data", - path_pool.getPSDiskDelegatorSingle("data"), - extractConfig(settings, StorageType::Data), - global_ctx.getFileProvider())) - , - // The iops in meta_storage is relatively high, use multi-disks if possible - meta_storage(PageStorage::create(name + ".meta", - path_pool.getPSDiskDelegatorMulti("meta"), - extractConfig(settings, StorageType::Meta), - global_ctx.getFileProvider())) - , log_storage_reader(ns_id, log_storage, nullptr) - , data_storage_reader(ns_id, data_storage, 
nullptr) - , meta_storage_reader(ns_id, meta_storage, nullptr) - , global_context(global_ctx) -{} + config = extractConfig(settings, StorageType::Data); + data_storage->reloadSettings(config); + done_anything |= data_storage->gc(/*not_skip*/ false, write_limiter, read_limiter); -StoragePool::StoragePool(NamespaceId ns_id_, const GlobalStoragePool & global_storage_pool, Context & global_ctx) - : owned_storage(false) + config = extractConfig(settings, StorageType::Log); + log_storage->reloadSettings(config); + done_anything |= log_storage->gc(/*not_skip*/ false, write_limiter, read_limiter); + + return done_anything; +} + +StoragePool::StoragePool(Context & global_ctx, NamespaceId ns_id_, StoragePathPool & storage_path_pool, const String & name) + : logger(Logger::get("StoragePool", !name.empty() ? name : DB::toString(ns_id_))) + , run_mode(global_ctx.getPageStorageRunMode()) , ns_id(ns_id_) - , log_storage(global_storage_pool.log()) - , data_storage(global_storage_pool.data()) - , meta_storage(global_storage_pool.meta()) - , log_storage_reader(ns_id, log_storage, nullptr) - , data_storage_reader(ns_id, data_storage, nullptr) - , meta_storage_reader(ns_id, meta_storage, nullptr) , global_context(global_ctx) - , v3_log_max_ids(global_storage_pool.getLogMaxIds()) - , v3_data_max_ids(global_storage_pool.getDataMaxIds()) - , v3_meta_max_ids(global_storage_pool.getMetaMaxIds()) -{} +{ + const auto & global_storage_pool = global_context.getGlobalStoragePool(); + switch (run_mode) + { + case PageStorageRunMode::ONLY_V2: + { + log_storage_v2 = PageStorage::create(name + ".log", + storage_path_pool.getPSDiskDelegatorMulti("log"), + extractConfig(global_context.getSettingsRef(), StorageType::Log), + global_context.getFileProvider()); + data_storage_v2 = PageStorage::create(name + ".data", + storage_path_pool.getPSDiskDelegatorSingle("data"), + extractConfig(global_context.getSettingsRef(), StorageType::Data), + global_ctx.getFileProvider()); + meta_storage_v2 = 
PageStorage::create(name + ".meta", + storage_path_pool.getPSDiskDelegatorMulti("meta"), + extractConfig(global_context.getSettingsRef(), StorageType::Meta), + global_ctx.getFileProvider()); + log_storage_reader = std::make_shared(run_mode, ns_id, log_storage_v2, /*storage_v3_*/ nullptr, nullptr); + data_storage_reader = std::make_shared(run_mode, ns_id, data_storage_v2, /*storage_v3_*/ nullptr, nullptr); + meta_storage_reader = std::make_shared(run_mode, ns_id, meta_storage_v2, /*storage_v3_*/ nullptr, nullptr); + + log_storage_writer = std::make_shared(run_mode, log_storage_v2, /*storage_v3_*/ nullptr); + data_storage_writer = std::make_shared(run_mode, data_storage_v2, /*storage_v3_*/ nullptr); + meta_storage_writer = std::make_shared(run_mode, meta_storage_v2, /*storage_v3_*/ nullptr); + break; + } + case PageStorageRunMode::ONLY_V3: + { + assert(global_storage_pool != nullptr); + log_storage_v3 = global_storage_pool->log_storage; + data_storage_v3 = global_storage_pool->data_storage; + meta_storage_v3 = global_storage_pool->meta_storage; + + log_storage_reader = std::make_shared(run_mode, ns_id, /*storage_v2_*/ nullptr, log_storage_v3, nullptr); + data_storage_reader = std::make_shared(run_mode, ns_id, /*storage_v2_*/ nullptr, data_storage_v3, nullptr); + meta_storage_reader = std::make_shared(run_mode, ns_id, /*storage_v2_*/ nullptr, meta_storage_v3, nullptr); + + log_storage_writer = std::make_shared(run_mode, /*storage_v2_*/ nullptr, log_storage_v3); + data_storage_writer = std::make_shared(run_mode, /*storage_v2_*/ nullptr, data_storage_v3); + meta_storage_writer = std::make_shared(run_mode, /*storage_v2_*/ nullptr, meta_storage_v3); -void StoragePool::restore() + break; + } + case PageStorageRunMode::MIX_MODE: + { + assert(global_storage_pool != nullptr); + log_storage_v3 = global_storage_pool->log_storage; + data_storage_v3 = global_storage_pool->data_storage; + meta_storage_v3 = global_storage_pool->meta_storage; + + log_storage_v2 = 
PageStorage::create(name + ".log", + storage_path_pool.getPSDiskDelegatorMulti("log"), + extractConfig(global_context.getSettingsRef(), StorageType::Log), + global_context.getFileProvider()); + data_storage_v2 = PageStorage::create(name + ".data", + storage_path_pool.getPSDiskDelegatorMulti("data"), + extractConfig(global_context.getSettingsRef(), StorageType::Data), + global_ctx.getFileProvider()); + meta_storage_v2 = PageStorage::create(name + ".meta", + storage_path_pool.getPSDiskDelegatorMulti("meta"), + extractConfig(global_context.getSettingsRef(), StorageType::Meta), + global_ctx.getFileProvider()); + + log_storage_reader = std::make_shared(run_mode, ns_id, log_storage_v2, log_storage_v3, nullptr); + data_storage_reader = std::make_shared(run_mode, ns_id, data_storage_v2, data_storage_v3, nullptr); + meta_storage_reader = std::make_shared(run_mode, ns_id, meta_storage_v2, meta_storage_v3, nullptr); + + log_storage_writer = std::make_shared(run_mode, log_storage_v2, log_storage_v3); + data_storage_writer = std::make_shared(run_mode, data_storage_v2, data_storage_v3); + meta_storage_writer = std::make_shared(run_mode, meta_storage_v2, meta_storage_v3); + break; + } + default: + throw Exception(fmt::format("Unknown PageStorageRunMode {}", static_cast(run_mode)), ErrorCodes::LOGICAL_ERROR); + } +} + +PageStorageRunMode StoragePool::restore() { - // If the storage instances is not global, we need to initialize it by ourselves and add a gc task. 
- if (owned_storage) + const auto & global_storage_pool = global_context.getGlobalStoragePool(); + + switch (run_mode) + { + case PageStorageRunMode::ONLY_V2: { - auto log_max_ids = log_storage->restore(); - auto data_max_ids = data_storage->restore(); - auto meta_max_ids = meta_storage->restore(); + auto log_max_ids = log_storage_v2->restore(); + auto data_max_ids = data_storage_v2->restore(); + auto meta_max_ids = meta_storage_v2->restore(); assert(log_max_ids.size() == 1); assert(data_max_ids.size() == 1); @@ -208,37 +256,74 @@ void StoragePool::restore() max_log_page_id = log_max_ids[0]; max_data_page_id = data_max_ids[0]; max_meta_page_id = meta_max_ids[0]; + + break; } - else + case PageStorageRunMode::ONLY_V3: { - if (const auto & it = v3_log_max_ids.find(ns_id); it == v3_log_max_ids.end()) - { - max_log_page_id = 0; - } - else - { - max_log_page_id = it->second; - } + max_log_page_id = global_storage_pool->getLogMaxId(ns_id); + max_data_page_id = global_storage_pool->getDataMaxId(ns_id); + max_meta_page_id = global_storage_pool->getMetaMaxId(ns_id); - if (const auto & it = v3_data_max_ids.find(ns_id); it == v3_data_max_ids.end()) - { - max_data_page_id = 0; - } - else - { - max_data_page_id = it->second; - } + break; + } + case PageStorageRunMode::MIX_MODE: + { + auto v2_log_max_ids = log_storage_v2->restore(); + auto v2_data_max_ids = data_storage_v2->restore(); + auto v2_meta_max_ids = meta_storage_v2->restore(); - if (const auto & it = v3_meta_max_ids.find(ns_id); it == v3_meta_max_ids.end()) + assert(v2_log_max_ids.size() == 1); + assert(v2_data_max_ids.size() == 1); + assert(v2_meta_max_ids.size() == 1); + + // Check number of valid pages in v2 + // If V2 already have no any data in disk, Then change run_mode to ONLY_V3 + if (log_storage_v2->getNumberOfPages() == 0 && data_storage_v2->getNumberOfPages() == 0 && meta_storage_v2->getNumberOfPages() == 0) { - max_meta_page_id = 0; + // TODO: when `compaction` happend + // 1. 
Will rewrite meta into V3 by DT. + // 2. Need `DEL` meta in V2. + // 3. Need drop V2 in disk and check. + LOG_FMT_INFO(logger, "Current pagestorage change from {} to {}", static_cast(PageStorageRunMode::MIX_MODE), static_cast(PageStorageRunMode::ONLY_V3)); + + log_storage_v2 = nullptr; + data_storage_v2 = nullptr; + meta_storage_v2 = nullptr; + + log_storage_reader = std::make_shared(run_mode, ns_id, /*storage_v2_*/ nullptr, log_storage_v3, nullptr); + data_storage_reader = std::make_shared(run_mode, ns_id, /*storage_v2_*/ nullptr, data_storage_v3, nullptr); + meta_storage_reader = std::make_shared(run_mode, ns_id, /*storage_v2_*/ nullptr, meta_storage_v3, nullptr); + + log_storage_writer = std::make_shared(run_mode, /*storage_v2_*/ nullptr, log_storage_v3); + data_storage_writer = std::make_shared(run_mode, /*storage_v2_*/ nullptr, data_storage_v3); + meta_storage_writer = std::make_shared(run_mode, /*storage_v2_*/ nullptr, meta_storage_v3); + + max_log_page_id = global_storage_pool->getLogMaxId(ns_id); + max_data_page_id = global_storage_pool->getDataMaxId(ns_id); + max_meta_page_id = global_storage_pool->getMetaMaxId(ns_id); + + run_mode = PageStorageRunMode::ONLY_V3; } - else + else // Still running Mix Mode { - max_meta_page_id = it->second; + max_log_page_id = std::max(v2_log_max_ids[0], global_storage_pool->getLogMaxId(ns_id)); + max_data_page_id = std::max(v2_data_max_ids[0], global_storage_pool->getDataMaxId(ns_id)); + max_meta_page_id = std::max(v2_meta_max_ids[0], global_storage_pool->getMetaMaxId(ns_id)); } + break; + } + default: + throw Exception(fmt::format("Unknown PageStorageRunMode {}", static_cast(run_mode)), ErrorCodes::LOGICAL_ERROR); } - // TODO: add a log to show max_*_page_id after mix mode pr ready. + LOG_FMT_TRACE(logger, "Finished StoragePool restore. 
[current_run_mode={}] [ns_id={}]" + " [max_log_page_id={}] [max_data_page_id={}] [max_meta_page_id={}]", + static_cast(run_mode), + ns_id, + max_log_page_id, + max_data_page_id, + max_meta_page_id); + return run_mode; } StoragePool::~StoragePool() @@ -248,13 +333,100 @@ StoragePool::~StoragePool() void StoragePool::enableGC() { - if (owned_storage) + // The data in V3 will be GCed by `GlobalStoragePool::gc`, only register gc task under only v2/mix mode + if (run_mode == PageStorageRunMode::ONLY_V2 || run_mode == PageStorageRunMode::MIX_MODE) + { gc_handle = global_context.getBackgroundPool().addTask([this] { return this->gc(global_context.getSettingsRef()); }); + } +} + +void StoragePool::dataRegisterExternalPagesCallbacks(const ExternalPageCallbacks & callbacks) +{ + switch (run_mode) + { + case PageStorageRunMode::ONLY_V2: + { + data_storage_v2->registerExternalPagesCallbacks(callbacks); + break; + } + case PageStorageRunMode::ONLY_V3: + { + data_storage_v3->registerExternalPagesCallbacks(callbacks); + break; + } + case PageStorageRunMode::MIX_MODE: + { + // When PageStorage run as Mix Mode. + // We need both get alive pages from V2 and V3 which will feedback for the DM. + // But V2 and V3 won't GC in the same time. So V3 need proxy V2 external pages callback. + // When V3 GC happend, scan the external pages from V3, in remover will scanner all of external pages from V2. 
+ ExternalPageCallbacks mix_mode_callbacks; + + mix_mode_callbacks.scanner = callbacks.scanner; + mix_mode_callbacks.remover = [this, callbacks](const ExternalPageCallbacks::PathAndIdsVec & path_and_ids_vec, const std::set & valid_ids) { + // ns_id won't used on V2 + auto v2_valid_page_ids = data_storage_v2->getAliveExternalPageIds(ns_id); + v2_valid_page_ids.insert(valid_ids.begin(), valid_ids.end()); + callbacks.remover(path_and_ids_vec, v2_valid_page_ids); + }; + mix_mode_callbacks.ns_id = ns_id; + data_storage_v3->registerExternalPagesCallbacks(mix_mode_callbacks); + break; + } + default: + throw Exception(fmt::format("Unknown PageStorageRunMode {}", static_cast(run_mode)), ErrorCodes::LOGICAL_ERROR); + } +} + +void StoragePool::dataUnregisterExternalPagesCallbacks(NamespaceId ns_id) +{ + switch (run_mode) + { + case PageStorageRunMode::ONLY_V2: + { + data_storage_v2->unregisterExternalPagesCallbacks(ns_id); + break; + } + case PageStorageRunMode::ONLY_V3: + { + data_storage_v3->unregisterExternalPagesCallbacks(ns_id); + break; + } + case PageStorageRunMode::MIX_MODE: + { + // no need unregister callback in V2. 
+ data_storage_v3->unregisterExternalPagesCallbacks(ns_id); + break; + } + default: + throw Exception(fmt::format("Unknown PageStorageRunMode {}", static_cast(run_mode)), ErrorCodes::LOGICAL_ERROR); + } +} + + +bool StoragePool::doV2Gc(const Settings & settings) +{ + bool done_anything = false; + auto write_limiter = global_context.getWriteLimiter(); + auto read_limiter = global_context.getReadLimiter(); + + auto config = extractConfig(settings, StorageType::Meta); + meta_storage_v2->reloadSettings(config); + done_anything |= meta_storage_v2->gc(/*not_skip*/ false, write_limiter, read_limiter); + + config = extractConfig(settings, StorageType::Data); + data_storage_v2->reloadSettings(config); + done_anything |= data_storage_v2->gc(/*not_skip*/ false, write_limiter, read_limiter); + + config = extractConfig(settings, StorageType::Log); + log_storage_v2->reloadSettings(config); + done_anything |= log_storage_v2->gc(/*not_skip*/ false, write_limiter, read_limiter); + return done_anything; } bool StoragePool::gc(const Settings & settings, const Seconds & try_gc_period) { - if (!owned_storage) + if (run_mode == PageStorageRunMode::ONLY_V3) return false; { @@ -268,7 +440,8 @@ bool StoragePool::gc(const Settings & settings, const Seconds & try_gc_period) last_try_gc_time = now; } - return doStoragePoolGC(global_context, settings, *this); + // Only do the v2 GC + return doV2Gc(settings); } void StoragePool::shutdown() @@ -284,11 +457,11 @@ void StoragePool::drop() { shutdown(); - if (owned_storage) + if (run_mode == PageStorageRunMode::ONLY_V2 || run_mode == PageStorageRunMode::MIX_MODE) { - meta_storage->drop(); - data_storage->drop(); - log_storage->drop(); + meta_storage_v2->drop(); + data_storage_v2->drop(); + log_storage_v2->drop(); } } @@ -316,13 +489,63 @@ PageId StoragePool::newDataPageIdForDTFile(StableDiskDelegator & delegator, cons break; } // else there is a DTFile with that id, continue to acquire a new ID. 
- LOG_FMT_WARNING(&Poco::Logger::get(who), - "The DTFile is already exists, continute to acquire another ID. [path={}] [id={}]", + LOG_FMT_WARNING(logger, + "The DTFile is already exists, continute to acquire another ID. [call={}][path={}] [id={}]", + who, existed_path, dtfile_id); } while (true); return dtfile_id; } +template +inline static PageReader newReader(const PageStorageRunMode run_mode, const NamespaceId ns_id, T & storage_v2, T & storage_v3, ReadLimiterPtr read_limiter, bool snapshot_read, const String & tracing_id) +{ + switch (run_mode) + { + case PageStorageRunMode::ONLY_V2: + return PageReader(run_mode, ns_id, storage_v2, nullptr, snapshot_read ? storage_v2->getSnapshot(tracing_id) : nullptr, read_limiter); + case PageStorageRunMode::ONLY_V3: + return PageReader(run_mode, ns_id, nullptr, storage_v3, snapshot_read ? storage_v3->getSnapshot(tracing_id) : nullptr, read_limiter); + case PageStorageRunMode::MIX_MODE: + return PageReader(run_mode, ns_id, storage_v2, storage_v3, snapshot_read ? 
std::make_shared(storage_v2->getSnapshot(fmt::format("{}-v2", tracing_id)), // + storage_v3->getSnapshot(fmt::format("{}-v3", tracing_id))) + : nullptr, + read_limiter); + default: + throw Exception(fmt::format("Unknown PageStorageRunMode {}", static_cast(run_mode)), ErrorCodes::LOGICAL_ERROR); + } +} + +PageReader StoragePool::newLogReader(ReadLimiterPtr read_limiter, bool snapshot_read, const String & tracing_id) +{ + return newReader(run_mode, ns_id, log_storage_v2, log_storage_v3, read_limiter, snapshot_read, tracing_id); +} + +PageReader StoragePool::newLogReader(ReadLimiterPtr read_limiter, PageStorage::SnapshotPtr & snapshot) +{ + return PageReader(run_mode, ns_id, log_storage_v2, log_storage_v3, snapshot, read_limiter); +} + +PageReader StoragePool::newDataReader(ReadLimiterPtr read_limiter, bool snapshot_read, const String & tracing_id) +{ + return newReader(run_mode, ns_id, data_storage_v2, data_storage_v3, read_limiter, snapshot_read, tracing_id); +} + +PageReader StoragePool::newDataReader(ReadLimiterPtr read_limiter, PageStorage::SnapshotPtr & snapshot) +{ + return PageReader(run_mode, ns_id, data_storage_v2, data_storage_v3, snapshot, read_limiter); +} + +PageReader StoragePool::newMetaReader(ReadLimiterPtr read_limiter, bool snapshot_read, const String & tracing_id) +{ + return newReader(run_mode, ns_id, meta_storage_v2, meta_storage_v3, read_limiter, snapshot_read, tracing_id); +} + +PageReader StoragePool::newMetaReader(ReadLimiterPtr read_limiter, PageStorage::SnapshotPtr & snapshot) +{ + return PageReader(run_mode, ns_id, meta_storage_v2, meta_storage_v3, snapshot, read_limiter); +} + } // namespace DM } // namespace DB diff --git a/dbms/src/Storages/DeltaMerge/StoragePool.h b/dbms/src/Storages/DeltaMerge/StoragePool.h index 3a02232902b..5f439899a84 100644 --- a/dbms/src/Storages/DeltaMerge/StoragePool.h +++ b/dbms/src/Storages/DeltaMerge/StoragePool.h @@ -14,8 +14,10 @@ #pragma once +#include #include #include +#include #include #include @@ 
-31,8 +33,6 @@ namespace DM { class StoragePool; using StoragePoolPtr = std::shared_ptr; -class GlobalStoragePool; -using GlobalStoragePoolPtr = std::shared_ptr; static const std::chrono::seconds DELTA_MERGE_GC_PERIOD(60); @@ -45,32 +45,51 @@ class GlobalStoragePool : private boost::noncopyable GlobalStoragePool(const PathPool & path_pool, Context & global_ctx, const Settings & settings); - void restore(); - ~GlobalStoragePool(); - PageStoragePtr log() const { return log_storage; } - PageStoragePtr data() const { return data_storage; } - PageStoragePtr meta() const { return meta_storage; } + void restore(); + + friend class StoragePool; - std::map getLogMaxIds() const + PageId getLogMaxId(NamespaceId ns_id) const { - return log_max_ids; + PageId max_log_page_id = 0; + if (const auto & it = log_max_ids.find(ns_id); it != log_max_ids.end()) + { + max_log_page_id = it->second; + } + + return max_log_page_id; } - std::map getDataMaxIds() const + PageId getDataMaxId(NamespaceId ns_id) const { - return data_max_ids; + PageId max_data_page_id = 0; + if (const auto & it = data_max_ids.find(ns_id); it != data_max_ids.end()) + { + max_data_page_id = it->second; + } + + return max_data_page_id; } - std::map getMetaMaxIds() const + PageId getMetaMaxId(NamespaceId ns_id) const { - return meta_max_ids; + PageId max_meta_page_id = 0; + if (const auto & it = meta_max_ids.find(ns_id); it != meta_max_ids.end()) + { + max_meta_page_id = it->second; + } + + return max_meta_page_id; } + // GC immediately + // Only used on dbgFuncMisc + bool gc(); + private: - // TODO: maybe more frequent gc for GlobalStoragePool? 
- bool gc(const Settings & settings, const Seconds & try_gc_period = DELTA_MERGE_GC_PERIOD); + bool gc(const Settings & settings, bool immediately = false, const Seconds & try_gc_period = DELTA_MERGE_GC_PERIOD); private: PageStoragePtr log_storage; @@ -83,11 +102,10 @@ class GlobalStoragePool : private boost::noncopyable std::atomic last_try_gc_time = Clock::now(); - std::mutex mutex; - Context & global_context; BackgroundProcessingPool::TaskHandle gc_handle; }; +using GlobalStoragePoolPtr = std::shared_ptr; class StoragePool : private boost::noncopyable { @@ -96,39 +114,71 @@ class StoragePool : private boost::noncopyable using Timepoint = Clock::time_point; using Seconds = std::chrono::seconds; - StoragePool(const String & name, NamespaceId ns_id_, StoragePathPool & path_pool, Context & global_ctx, const Settings & settings); + StoragePool(Context & global_ctx, NamespaceId ns_id_, StoragePathPool & path_pool, const String & name = ""); - StoragePool(NamespaceId ns_id_, const GlobalStoragePool & global_storage_pool, Context & global_ctx); - - void restore(); + PageStorageRunMode restore(); ~StoragePool(); NamespaceId getNamespaceId() const { return ns_id; } - PageStoragePtr log() const { return log_storage; } - PageStoragePtr data() const { return data_storage; } - PageStoragePtr meta() const { return meta_storage; } + PageStorageRunMode getPageStorageRunMode() + { + return run_mode; + } - PageReader & logReader() { return log_storage_reader; } - PageReader & dataReader() { return data_storage_reader; } - PageReader & metaReader() { return meta_storage_reader; } + PageReaderPtr & logReader() + { + assert(log_storage_reader); + return log_storage_reader; + } - PageReader newLogReader(ReadLimiterPtr read_limiter, bool snapshot_read, const String & tracing_id) + PageReaderPtr & dataReader() { - return PageReader(ns_id, log_storage, snapshot_read ? 
log_storage->getSnapshot(tracing_id) : nullptr, read_limiter); + assert(data_storage_reader); + return data_storage_reader; } - PageReader newDataReader(ReadLimiterPtr read_limiter, bool snapshot_read, const String & tracing_id) + + PageReaderPtr & metaReader() { - return PageReader(ns_id, data_storage, snapshot_read ? data_storage->getSnapshot(tracing_id) : nullptr, read_limiter); + assert(meta_storage_reader); + return meta_storage_reader; } - PageReader newMetaReader(ReadLimiterPtr read_limiter, bool snapshot_read, const String & tracing_id) + + PageWriterPtr & logWriter() { - return PageReader(ns_id, meta_storage, snapshot_read ? meta_storage->getSnapshot(tracing_id) : nullptr, read_limiter); + assert(log_storage_writer); + return log_storage_writer; } + PageWriterPtr & dataWriter() + { + assert(data_storage_writer); + return data_storage_writer; + } + + PageWriterPtr & metaWriter() + { + assert(meta_storage_writer); + return meta_storage_writer; + } + + + PageReader newLogReader(ReadLimiterPtr read_limiter, bool snapshot_read, const String & tracing_id); + PageReader newLogReader(ReadLimiterPtr read_limiter, PageStorage::SnapshotPtr & snapshot); + + PageReader newDataReader(ReadLimiterPtr read_limiter, bool snapshot_read, const String & tracing_id); + PageReader newDataReader(ReadLimiterPtr read_limiter, PageStorage::SnapshotPtr & snapshot); + + PageReader newMetaReader(ReadLimiterPtr read_limiter, bool snapshot_read, const String & tracing_id); + PageReader newMetaReader(ReadLimiterPtr read_limiter, PageStorage::SnapshotPtr & snapshot); + void enableGC(); + void dataRegisterExternalPagesCallbacks(const ExternalPageCallbacks & callbacks); + + void dataUnregisterExternalPagesCallbacks(NamespaceId ns_id); + bool gc(const Settings & settings, const Seconds & try_gc_period = DELTA_MERGE_GC_PERIOD); void shutdown(); @@ -139,22 +189,37 @@ class StoragePool : private boost::noncopyable PageId newDataPageIdForDTFile(StableDiskDelegator & delegator, const char * who); 
PageId maxMetaPageId() { return max_meta_page_id; } - PageId newLogPageId() { return ++max_log_page_id; } PageId newMetaPageId() { return ++max_meta_page_id; } - +#ifndef DBMS_PUBLIC_GTEST +private: +#endif + bool doV2Gc(const Settings & settings); +#ifndef DBMS_PUBLIC_GTEST private: +#endif + LoggerPtr logger; + + PageStorageRunMode run_mode; + // whether the three storage instance is owned by this StoragePool - const bool owned_storage = false; const NamespaceId ns_id; - const PageStoragePtr log_storage; - const PageStoragePtr data_storage; - const PageStoragePtr meta_storage; + PageStoragePtr log_storage_v2; + PageStoragePtr data_storage_v2; + PageStoragePtr meta_storage_v2; + + PageStoragePtr log_storage_v3; + PageStoragePtr data_storage_v3; + PageStoragePtr meta_storage_v3; - PageReader log_storage_reader; - PageReader data_storage_reader; - PageReader meta_storage_reader; + PageReaderPtr log_storage_reader; + PageReaderPtr data_storage_reader; + PageReaderPtr meta_storage_reader; + + PageWriterPtr log_storage_writer; + PageWriterPtr data_storage_writer; + PageWriterPtr meta_storage_writer; std::atomic last_try_gc_time = Clock::now(); @@ -162,11 +227,6 @@ class StoragePool : private boost::noncopyable Context & global_context; - // TBD: Will be replaced GlobalPathPoolPtr after mix mode ptr ready - std::map v3_log_max_ids; - std::map v3_data_max_ids; - std::map v3_meta_max_ids; - std::atomic max_log_page_id = 0; std::atomic max_data_page_id = 0; std::atomic max_meta_page_id = 0; diff --git a/dbms/src/Storages/DeltaMerge/WriteBatches.h b/dbms/src/Storages/DeltaMerge/WriteBatches.h index c64ff2cde38..2b508fed068 100644 --- a/dbms/src/Storages/DeltaMerge/WriteBatches.h +++ b/dbms/src/Storages/DeltaMerge/WriteBatches.h @@ -112,8 +112,8 @@ struct WriteBatches : private boost::noncopyable for (auto & w : data.getWrites()) data_write_pages.push_back(w.page_id); - storage_pool.log()->write(std::move(log), write_limiter); - storage_pool.data()->write(std::move(data), 
write_limiter); + storage_pool.logWriter()->write(std::move(log), write_limiter); + storage_pool.dataWriter()->write(std::move(data), write_limiter); for (auto page_id : log_write_pages) written_log.push_back(page_id); @@ -152,8 +152,8 @@ struct WriteBatches : private boost::noncopyable check(data_wb, "data_wb", logger); } - storage_pool.log()->write(std::move(log_wb), write_limiter); - storage_pool.data()->write(std::move(data_wb), write_limiter); + storage_pool.logWriter()->write(std::move(log_wb), write_limiter); + storage_pool.dataWriter()->write(std::move(data_wb), write_limiter); written_log.clear(); written_data.clear(); @@ -179,7 +179,7 @@ struct WriteBatches : private boost::noncopyable check(meta, "meta", logger); } - storage_pool.meta()->write(std::move(meta), write_limiter); + storage_pool.metaWriter()->write(std::move(meta), write_limiter); meta.clear(); } @@ -205,9 +205,9 @@ struct WriteBatches : private boost::noncopyable check(removed_meta, "removed_meta", logger); } - storage_pool.log()->write(std::move(removed_log), write_limiter); - storage_pool.data()->write(std::move(removed_data), write_limiter); - storage_pool.meta()->write(std::move(removed_meta), write_limiter); + storage_pool.logWriter()->write(std::move(removed_log), write_limiter); + storage_pool.dataWriter()->write(std::move(removed_data), write_limiter); + storage_pool.metaWriter()->write(std::move(removed_meta), write_limiter); removed_log.clear(); removed_data.clear(); diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store.cpp index bb0e47bddbd..b759d1fe9ef 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store.cpp +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store.cpp @@ -178,7 +178,7 @@ class DeltaMergeStoreRWTest return s; } - std::pair> genDMFile(DMContext & context, const Block & block) + std::pair genDMFile(DMContext & context, const Block & block) { auto 
input_stream = std::make_shared(block); auto [store_path, file_id] = store->preAllocateIngestFile(); @@ -1344,7 +1344,7 @@ try { auto dm_context = store->newDMContext(*db_context, db_context->getSettingsRef()); - std::vector file_ids; + PageIds file_ids; auto ingest_range = RowKeyRange::fromHandleRange(HandleRange{32, 256}); store->ingestFiles(dm_context, ingest_range, file_ids, /*clear_data_in_range*/ true); } diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_value_space.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_value_space.cpp index 93b366f3b15..2b64fd90c09 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_value_space.cpp +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_value_space.cpp @@ -84,7 +84,7 @@ class DeltaValueSpaceTest : public DB::base::TiFlashStorageTestBasic { TiFlashStorageTestBasic::reload(std::move(db_settings)); storage_path_pool = std::make_unique(db_context->getPathPool().withTable("test", "t1", false)); - storage_pool = std::make_unique("test.t1", table_id, *storage_path_pool, *db_context, db_context->getSettingsRef()); + storage_pool = std::make_unique(*db_context, table_id, *storage_path_pool, "test.t1"); storage_pool->restore(); ColumnDefinesPtr cols = (!pre_define_columns) ? 
DMTestEnv::getDefaultColumns() : pre_define_columns; setColumns(cols); diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_file.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_file.cpp index e6fb6f236f3..3ddf318509f 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_file.cpp +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_file.cpp @@ -105,7 +105,7 @@ class DMFile_Test parent_path = TiFlashStorageTestBasic::getTemporaryPath(); path_pool = std::make_unique(db_context->getPathPool().withTable("test", "DMFile_Test", false)); - storage_pool = std::make_unique("test.t1", /*table_id*/ 100, *path_pool, *db_context, db_context->getSettingsRef()); + storage_pool = std::make_unique(*db_context, /*ns_id*/ 100, *path_pool, "test.t1"); dm_file = DMFile::create(1, parent_path, single_file_mode, std::move(configuration)); table_columns_ = std::make_shared(); column_cache_ = std::make_shared(); @@ -1057,7 +1057,7 @@ class DMFile_Clustered_Index_Test : public DB::base::TiFlashStorageTestBasic auto configuration = mode == DMFileMode::DirectoryChecksum ? 
std::make_optional() : std::nullopt; path_pool = std::make_unique(db_context->getPathPool().withTable("test", "t", false)); - storage_pool = std::make_unique("test.t1", table_id, *path_pool, *db_context, DB::Settings()); + storage_pool = std::make_unique(*db_context, table_id, *path_pool, "test.t1"); dm_file = DMFile::create(0, path, single_file_mode, std::move(configuration)); table_columns_ = std::make_shared(); column_cache_ = std::make_shared(); diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_segment.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_segment.cpp index 197ec73fdb5..6bf33465366 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_segment.cpp +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_segment.cpp @@ -74,7 +74,7 @@ class Segment_test : public DB::base::TiFlashStorageTestBasic { TiFlashStorageTestBasic::reload(std::move(db_settings)); storage_path_pool = std::make_unique(db_context->getPathPool().withTable("test", "t1", false)); - storage_pool = std::make_unique("test.t1", /*table_id*/ 100, *storage_path_pool, *db_context, db_context->getSettingsRef()); + storage_pool = std::make_unique(*db_context, /*ns_id*/ 100, *storage_path_pool, "test.t1"); storage_pool->restore(); ColumnDefinesPtr cols = (!pre_define_columns) ? 
DMTestEnv::getDefaultColumns() : pre_define_columns; setColumns(cols); @@ -1280,7 +1280,7 @@ class Segment_test_2 : public Segment_test Segment_test::SetUp(); } - std::pair> genDMFile(DMContext & context, const Block & block) + std::pair genDMFile(DMContext & context, const Block & block) { auto delegator = context.path_pool.getStableDiskDelegator(); auto file_id = context.storage_pool.newDataPageIdForDTFile(delegator, __PRETTY_FUNCTION__); diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_segment_common_handle.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_segment_common_handle.cpp index 8fc5c145f81..1cc61663a2f 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_segment_common_handle.cpp +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_segment_common_handle.cpp @@ -53,7 +53,7 @@ class Segment_Common_Handle_test : public DB::base::TiFlashStorageTestBasic { TiFlashStorageTestBasic::reload(std::move(db_settings)); path_pool = std::make_unique(db_context->getPathPool().withTable("test", "t", false)); - storage_pool = std::make_unique("test.t1", /*table_id*/ 100, *path_pool, *db_context, db_context->getSettingsRef()); + storage_pool = std::make_unique(*db_context, /*table_id*/ 100, *path_pool, "test.t1"); storage_pool->restore(); if (!cols) cols = DMTestEnv::getDefaultColumns(is_common_handle ? DMTestEnv::PkType::CommonHandle : DMTestEnv::PkType::HiddenTiDBRowID); diff --git a/dbms/src/Storages/FormatVersion.h b/dbms/src/Storages/FormatVersion.h index 4095ffb1498..dbe3d9ee9bc 100644 --- a/dbms/src/Storages/FormatVersion.h +++ b/dbms/src/Storages/FormatVersion.h @@ -64,6 +64,11 @@ using Version = UInt32; inline static constexpr Version V1 = 1; // Support multiple thread-write && read with offset inside page. See FLASH_341 && FLASH-942 for details. inline static constexpr Version V2 = 2; +// Support multiple thread-write/read with offset inside page && support wal store meta && support space reused. 
+// If we do enabled PageFormat::V3, it is not means all data in Disk will be V3 format. +// - If we already have V2 data in disk. It will turn PageStorage into MIX_MODE +// - If we don't have any v2 data in disk. It will turn PageStorage into ONLY_V3 +inline static constexpr Version V3 = 3; } // namespace PageFormat struct StorageFormatVersion @@ -103,7 +108,17 @@ inline static const StorageFormatVersion STORAGE_FORMAT_V3 = StorageFormatVersio .identifier = 3, }; -inline StorageFormatVersion STORAGE_FORMAT_CURRENT = STORAGE_FORMAT_V3; + +inline static const StorageFormatVersion STORAGE_FORMAT_V4 = StorageFormatVersion{ + .segment = SegmentFormat::V2, + .dm_file = DMFileFormat::V2, + .stable = StableFormat::V1, + .delta = DeltaFormat::V3, + .page = PageFormat::V3, // diff + .identifier = 4, +}; + +inline StorageFormatVersion STORAGE_FORMAT_CURRENT = STORAGE_FORMAT_V4; inline const StorageFormatVersion & toStorageFormat(UInt64 setting) { @@ -115,6 +130,8 @@ inline const StorageFormatVersion & toStorageFormat(UInt64 setting) return STORAGE_FORMAT_V2; case 3: return STORAGE_FORMAT_V3; + case 4: + return STORAGE_FORMAT_V4; default: throw Exception("Illegal setting value: " + DB::toString(setting)); } diff --git a/dbms/src/Storages/Page/Page.h b/dbms/src/Storages/Page/Page.h index b36bf7245d6..0217fd82ccf 100644 --- a/dbms/src/Storages/Page/Page.h +++ b/dbms/src/Storages/Page/Page.h @@ -76,6 +76,20 @@ struct Page return ByteBuffer(data.begin() + beg, data.begin() + end); } + inline static PageFieldSizes fieldOffsetsToSizes(const PageFieldOffsetChecksums & field_offsets, size_t data_size) + { + PageFieldSizes field_size = {}; + + auto it = field_offsets.begin(); + while (it != field_offsets.end()) + { + PageFieldOffset beg = it->first; + ++it; + field_size.emplace_back(it == field_offsets.end() ? 
data_size - beg : it->first - beg);
+        }
+        return field_size;
+    }
+
     size_t fieldSize() const
     {
         return field_offsets.size();
diff --git a/dbms/src/Storages/Page/PageStorage.cpp b/dbms/src/Storages/Page/PageStorage.cpp
index 7f14d905858..cf8a9698a55 100644
--- a/dbms/src/Storages/Page/PageStorage.cpp
+++ b/dbms/src/Storages/Page/PageStorage.cpp
@@ -31,4 +31,596 @@ PageStoragePtr PageStorage::create(
     return std::make_shared(name, delegator, config, file_provider);
 }
 
+/***************************
+ * PageReaderImpl methods *
+ **************************/
+
+/// Abstract read interface behind `PageReader`.
+/// `create` picks the concrete implementation from the run mode: a single
+/// storage (`PageReaderImplNormal`) or V3 with fallback to V2 (`PageReaderImplMixed`).
+class PageReaderImpl : private boost::noncopyable
+{
+public:
+    /// Build the implementation matching `run_mode_`.
+    /// `snap_` may be null, in which case every call is a non-snapshot read.
+    /// Throws Exception(LOGICAL_ERROR) for a run mode outside the enum.
+    static std::unique_ptr<PageReaderImpl> create(
+        PageStorageRunMode run_mode_,
+        NamespaceId ns_id_,
+        PageStoragePtr storage_v2_,
+        PageStoragePtr storage_v3_,
+        const PageStorage::SnapshotPtr & snap_,
+        ReadLimiterPtr read_limiter_);
+
+    virtual ~PageReaderImpl() = default;
+
+    virtual DB::Page read(PageId page_id) const = 0;
+
+    virtual PageMap read(const PageIds & page_ids) const = 0;
+
+    virtual void read(const PageIds & page_ids, PageHandler & handler) const = 0;
+
+    using PageReadFields = PageStorage::PageReadFields;
+    virtual PageMap read(const std::vector<PageReadFields> & page_fields) const = 0;
+
+    virtual PageId getNormalPageId(PageId page_id) const = 0;
+
+    virtual PageEntry getPageEntry(PageId page_id) const = 0;
+
+    virtual PageStorage::SnapshotPtr getSnapshot(const String & tracing_id) const = 0;
+
+    // Get some statistics of all living snapshots and the oldest living snapshot.
+    virtual SnapshotsStatistics getSnapshotsStat() const = 0;
+
+    // NOTE(review): the callback signature was lost in this copy of the patch and is
+    // reconstructed here -- confirm it against `PageStorage::traverse`.
+    virtual void traverse(const std::function<void(const DB::Page & page)> & acceptor) const = 0;
+};
+
+
+/// Reader over exactly one storage; every call forwards to `storage` with the
+/// namespace, read limiter and (possibly null) snapshot captured at construction.
+class PageReaderImplNormal : public PageReaderImpl
+{
+public:
+    /// Not snapshot read.
+    explicit PageReaderImplNormal(NamespaceId ns_id_, PageStoragePtr storage_, ReadLimiterPtr read_limiter_)
+        : ns_id(ns_id_)
+        , storage(storage_)
+        , read_limiter(read_limiter_)
+    {
+    }
+
+    /// Snapshot read.
+    PageReaderImplNormal(NamespaceId ns_id_, PageStoragePtr storage_, const PageStorage::SnapshotPtr & snap_, ReadLimiterPtr read_limiter_)
+        : ns_id(ns_id_)
+        , storage(storage_)
+        , snap(snap_)
+        , read_limiter(read_limiter_)
+    {
+    }
+
+    DB::Page read(PageId page_id) const override
+    {
+        return storage->read(ns_id, page_id, read_limiter, snap);
+    }
+
+    PageMap read(const PageIds & page_ids) const override
+    {
+        return storage->read(ns_id, page_ids, read_limiter, snap);
+    }
+
+    void read(const PageIds & page_ids, PageHandler & handler) const override
+    {
+        storage->read(ns_id, page_ids, handler, read_limiter, snap);
+    }
+
+    using PageReadFields = PageStorage::PageReadFields;
+    PageMap read(const std::vector<PageReadFields> & page_fields) const override
+    {
+        return storage->read(ns_id, page_fields, read_limiter, snap);
+    }
+
+    PageId getNormalPageId(PageId page_id) const override
+    {
+        return storage->getNormalPageId(ns_id, page_id, snap);
+    }
+
+    PageEntry getPageEntry(PageId page_id) const override
+    {
+        return storage->getEntry(ns_id, page_id, snap);
+    }
+
+    PageStorage::SnapshotPtr getSnapshot(const String & tracing_id) const override
+    {
+        return storage->getSnapshot(tracing_id);
+    }
+
+    // Get some statistics of all living snapshots and the oldest living snapshot.
+    SnapshotsStatistics getSnapshotsStat() const override
+    {
+        return storage->getSnapshotsStat();
+    }
+
+    void traverse(const std::function<void(const DB::Page & page)> & acceptor) const override
+    {
+        // NOTE(review): this passes nullptr rather than `snap`, unlike every other
+        // method of this class -- confirm that ignoring the reader's snapshot is intended.
+        storage->traverse(acceptor, nullptr);
+    }
+
+private:
+    NamespaceId ns_id;
+    PageStoragePtr storage;
+    PageStorage::SnapshotPtr snap;
+    ReadLimiterPtr read_limiter;
+};
+
+
+/// Reader for MIX_MODE: every lookup asks `storage_v3` first (with the trailing
+/// `false` argument so that a missing page is reported instead of thrown) and
+/// falls back to `storage_v2` for whatever V3 did not return.
+/// When `snap` is set it is treated as a mixed snapshot and split into its
+/// V2 / V3 halves before being handed to the matching storage.
+class PageReaderImplMixed : public PageReaderImpl
+{
+public:
+    /// Not snapshot read.
+    explicit PageReaderImplMixed(NamespaceId ns_id_, PageStoragePtr storage_v2_, PageStoragePtr storage_v3_, ReadLimiterPtr read_limiter_)
+        : ns_id(ns_id_)
+        , storage_v2(storage_v2_)
+        , storage_v3(storage_v3_)
+        , read_limiter(read_limiter_)
+    {
+    }
+
+    /// Snapshot read.
+    PageReaderImplMixed(NamespaceId ns_id_, PageStoragePtr storage_v2_, PageStoragePtr storage_v3_, const PageStorage::SnapshotPtr & snap_, ReadLimiterPtr read_limiter_)
+        : ns_id(ns_id_)
+        , storage_v2(storage_v2_)
+        , storage_v3(storage_v3_)
+        , snap(snap_)
+        , read_limiter(read_limiter_)
+    {
+    }
+
+    PageReaderImplMixed(NamespaceId ns_id_, PageStoragePtr storage_v2_, PageStoragePtr storage_v3_, PageStorage::SnapshotPtr && snap_, ReadLimiterPtr read_limiter_)
+        : ns_id(ns_id_)
+        , storage_v2(storage_v2_)
+        , storage_v3(storage_v3_)
+        , snap(std::move(snap_))
+        , read_limiter(read_limiter_)
+    {
+    }
+
+    /// Read one page: the V3 copy if V3 has a valid one, otherwise whatever V2 returns.
+    DB::Page read(PageId page_id) const override
+    {
+        // Held by value (not `const auto &`) so that returning it can move instead of copy.
+        auto page_from_v3 = storage_v3->read(ns_id, page_id, read_limiter, toConcreteV3Snapshot(), false);
+        if (page_from_v3.isValid())
+        {
+            return page_from_v3;
+        }
+        return storage_v2->read(ns_id, page_id, read_limiter, toConcreteV2Snapshot());
+    }
+
+    /// Read many pages: entries V3 returned as invalid are re-read from V2 and
+    /// overwrite the invalid placeholders in the result.
+    PageMap read(const PageIds & page_ids) const override
+    {
+        auto page_maps = storage_v3->read(ns_id, page_ids, read_limiter, toConcreteV3Snapshot(), false);
+        PageIds invalid_page_ids;
+        for (const auto & [query_page_id, page] : page_maps)
+        {
+            if (!page.isValid())
+            {
+                invalid_page_ids.emplace_back(query_page_id);
+            }
+        }
+
+        if (!invalid_page_ids.empty())
+        {
+            const auto & page_maps_from_v2 = storage_v2->read(ns_id, invalid_page_ids, read_limiter, toConcreteV2Snapshot());
+            for (const auto & [page_id_, page_] : page_maps_from_v2)
+            {
+                page_maps[page_id_] = page_;
+            }
+        }
+
+        return page_maps;
+    }
+
+    /// Handler-based read: V3 reports the ids it could not find, and only those
+    /// are forwarded to V2.
+    void read(const PageIds & page_ids, PageHandler & handler) const override
+    {
+        const auto & page_ids_not_found = storage_v3->read(ns_id, page_ids, handler, read_limiter, toConcreteV3Snapshot(), false);
+        // Same guard as the other overloads: do not touch V2 when V3 served everything.
+        if (!page_ids_not_found.empty())
+        {
+            storage_v2->read(ns_id, page_ids_not_found, handler, read_limiter, toConcreteV2Snapshot());
+        }
+    }
+
+    using PageReadFields = PageStorage::PageReadFields;
+    /// Field read: requests whose page is invalid in the V3 result are retried on V2.
+    PageMap read(const std::vector<PageReadFields> & page_fields) const override
+    {
+        auto page_maps = storage_v3->read(ns_id, page_fields, read_limiter, toConcreteV3Snapshot(), false);
+
+        std::vector<PageReadFields> invalid_page_fields;
+
+        for (const auto & page_field : page_fields)
+        {
+            if (!page_maps[page_field.first].isValid())
+            {
+                invalid_page_fields.emplace_back(page_field);
+            }
+        }
+
+        if (!invalid_page_fields.empty())
+        {
+            auto page_maps_from_v2 = storage_v2->read(ns_id, invalid_page_fields, read_limiter, toConcreteV2Snapshot());
+            for (const auto & page_field : invalid_page_fields)
+            {
+                page_maps[page_field.first] = page_maps_from_v2[page_field.first];
+            }
+        }
+
+        return page_maps;
+    }
+
+    PageId getNormalPageId(PageId page_id) const override
+    {
+        PageId resolved_page_id = storage_v3->getNormalPageId(ns_id, page_id, toConcreteV3Snapshot(), false);
+        if (resolved_page_id != INVALID_PAGE_ID)
+        {
+            return resolved_page_id;
+        }
+        return storage_v2->getNormalPageId(ns_id, page_id, toConcreteV2Snapshot());
+    }
+
+    PageEntry getPageEntry(PageId page_id) const override
+    {
+        PageEntry page_entry = storage_v3->getEntry(ns_id, page_id, toConcreteV3Snapshot());
+        if (page_entry.file_id != INVALID_BLOBFILE_ID)
+        {
+            return page_entry;
+        }
+        return storage_v2->getEntry(ns_id, page_id, toConcreteV2Snapshot());
+    }
+
+    /// Take one snapshot from each storage and bundle them; the tracing id is
+    /// suffixed with "-v2" / "-v3" respectively.
+    PageStorage::SnapshotPtr getSnapshot(const String & tracing_id) const override
+    {
+        return std::make_shared<PageStorageSnapshotMixed>(
+            storage_v2->getSnapshot(fmt::format("{}-v2", tracing_id)),
+            storage_v3->getSnapshot(fmt::format("{}-v3", tracing_id)));
+    }
+
+    // Get some statistics of all living snapshots and the oldest living snapshot.
+    // Counts are summed; the "longest living" fields come from V2 only when V2's
+    // value is strictly greater, otherwise from V3.
+    SnapshotsStatistics getSnapshotsStat() const override
+    {
+        SnapshotsStatistics statistics_total;
+        const auto & statistics_from_v2 = storage_v2->getSnapshotsStat();
+        const auto & statistics_from_v3 = storage_v3->getSnapshotsStat();
+
+        statistics_total.num_snapshots = statistics_from_v2.num_snapshots + statistics_from_v3.num_snapshots;
+        if (statistics_from_v2.longest_living_seconds > statistics_from_v3.longest_living_seconds)
+        {
+            statistics_total.longest_living_seconds = statistics_from_v2.longest_living_seconds;
+            statistics_total.longest_living_from_thread_id = statistics_from_v2.longest_living_from_thread_id;
+            statistics_total.longest_living_from_tracing_id = statistics_from_v2.longest_living_from_tracing_id;
+        }
+        else
+        {
+            statistics_total.longest_living_seconds = statistics_from_v3.longest_living_seconds;
+            statistics_total.longest_living_from_thread_id = statistics_from_v3.longest_living_from_thread_id;
+            statistics_total.longest_living_from_tracing_id = statistics_from_v3.longest_living_from_tracing_id;
+        }
+
+        return statistics_total;
+    }
+
+    void traverse(const std::function<void(const DB::Page & page)> & acceptor) const override
+    {
+        // Used by RegionPersister::restore
+        // Must traverse storage_v3 before storage_v2
+        storage_v3->traverse(acceptor, toConcreteV3Snapshot());
+        storage_v2->traverse(acceptor, toConcreteV2Snapshot());
+    }
+
+private:
+    /// V3 half of `snap`, or the null `snap` itself for non-snapshot reads.
+    PageStorage::SnapshotPtr toConcreteV3Snapshot() const
+    {
+        return snap ? toConcreteMixedSnapshot(snap)->getV3Snapshot() : snap;
+    }
+
+    /// V2 half of `snap`, or the null `snap` itself for non-snapshot reads.
+    PageStorage::SnapshotPtr toConcreteV2Snapshot() const
+    {
+        return snap ? toConcreteMixedSnapshot(snap)->getV2Snapshot() : snap;
+    }
+
+private:
+    const NamespaceId ns_id;
+    PageStoragePtr storage_v2;
+    PageStoragePtr storage_v3;
+    PageStorage::SnapshotPtr snap;
+    ReadLimiterPtr read_limiter;
+};
+
+std::unique_ptr<PageReaderImpl> PageReaderImpl::create(
+    PageStorageRunMode run_mode_,
+    NamespaceId ns_id_,
+    PageStoragePtr storage_v2_,
+    PageStoragePtr storage_v3_,
+    const PageStorage::SnapshotPtr & snap_,
+    ReadLimiterPtr read_limiter_)
+{
+    switch (run_mode_)
+    {
+    case PageStorageRunMode::ONLY_V2:
+    {
+        return std::make_unique<PageReaderImplNormal>(ns_id_, storage_v2_, snap_, read_limiter_);
+    }
+    case PageStorageRunMode::ONLY_V3:
+    {
+        return std::make_unique<PageReaderImplNormal>(ns_id_, storage_v3_, snap_, read_limiter_);
+    }
+    case PageStorageRunMode::MIX_MODE:
+    {
+        return std::make_unique<PageReaderImplMixed>(ns_id_, storage_v2_, storage_v3_, snap_, read_limiter_);
+    }
+    default:
+        throw Exception(fmt::format("Unknown PageStorageRunMode {}", static_cast<UInt8>(run_mode_)), ErrorCodes::LOGICAL_ERROR);
+    }
+}
+
+/***********************
+ * PageReader methods *
+ **********************/
+/// Not snapshot read.
+PageReader::PageReader(const PageStorageRunMode & run_mode_, NamespaceId ns_id_, PageStoragePtr storage_v2_, PageStoragePtr storage_v3_, ReadLimiterPtr read_limiter_)
+    : impl(PageReaderImpl::create(run_mode_, ns_id_, storage_v2_, storage_v3_, /*snap_=*/nullptr, read_limiter_))
+{
+}
+
+/// Snapshot read.
+PageReader::PageReader(const PageStorageRunMode & run_mode_, NamespaceId ns_id_, PageStoragePtr storage_v2_, PageStoragePtr storage_v3_, PageStorage::SnapshotPtr snap_, ReadLimiterPtr read_limiter_)
+    // `PageReaderImpl::create` takes the snapshot by const reference, so wrapping
+    // `snap_` in std::move would not move anything; pass it as is.
+    : impl(PageReaderImpl::create(run_mode_, ns_id_, storage_v2_, storage_v3_, snap_, read_limiter_))
+{
+}
+
+// Defined out of line so that `std::unique_ptr<PageReaderImpl>` is destroyed
+// where `PageReaderImpl` is a complete type.
+PageReader::~PageReader() = default;
+
+// Everything below forwards unchanged to the implementation chosen at construction.
+
+DB::Page PageReader::read(PageId page_id) const
+{
+    return impl->read(page_id);
+}
+
+PageMap PageReader::read(const PageIds & page_ids) const
+{
+    return impl->read(page_ids);
+}
+
+void PageReader::read(const PageIds & page_ids, PageHandler & handler) const
+{
+    impl->read(page_ids, handler);
+}
+
+PageMap PageReader::read(const std::vector<PageReadFields> & page_fields) const
+{
+    return impl->read(page_fields);
+}
+
+PageId PageReader::getNormalPageId(PageId page_id) const
+{
+    return impl->getNormalPageId(page_id);
+}
+
+PageEntry PageReader::getPageEntry(PageId page_id) const
+{
+    return impl->getPageEntry(page_id);
+}
+
+PageStorage::SnapshotPtr PageReader::getSnapshot(const String & tracing_id) const
+{
+    return impl->getSnapshot(tracing_id);
+}
+
+// Get some statistics of all living snapshots and the oldest living snapshot.
+SnapshotsStatistics PageReader::getSnapshotsStat() const
+{
+    return impl->getSnapshotsStat();
+}
+
+// NOTE(review): the callback signature was lost in this copy of the patch and is
+// reconstructed here -- confirm it against `PageStorage::traverse`.
+void PageReader::traverse(const std::function<void(const DB::Page & page)> & acceptor) const
+{
+    impl->traverse(acceptor);
+}
+
+/**********************
+ * PageWriter methods *
+ *********************/
+
+/// Route the batch to the storage(s) selected by `run_mode`.
+/// Throws Exception(LOGICAL_ERROR) for a run mode outside the enum, like the
+/// other switches in this file, instead of silently dropping the batch.
+void PageWriter::write(WriteBatch && write_batch, WriteLimiterPtr write_limiter) const
+{
+    switch (run_mode)
+    {
+    case PageStorageRunMode::ONLY_V2:
+    {
+        writeIntoV2(std::move(write_batch), write_limiter);
+        break;
+    }
+    case PageStorageRunMode::ONLY_V3:
+    {
+        writeIntoV3(std::move(write_batch), write_limiter);
+        break;
+    }
+    case PageStorageRunMode::MIX_MODE:
+    {
+        writeIntoMixMode(std::move(write_batch), write_limiter);
+        break;
+    }
+    default:
+        throw Exception(fmt::format("Unknown PageStorageRunMode {}", static_cast<UInt8>(run_mode)), ErrorCodes::LOGICAL_ERROR);
+    }
+}
+
+
+void PageWriter::writeIntoV2(WriteBatch && write_batch, WriteLimiterPtr write_limiter) const
+{
+    storage_v2->write(std::move(write_batch), write_limiter);
+}
+
+void PageWriter::writeIntoV3(WriteBatch && write_batch, WriteLimiterPtr write_limiter) const
+{
+    storage_v3->write(std::move(write_batch), write_limiter);
+}
+
+/// MIX_MODE write. The whole batch goes to V3; DELs are additionally replayed on V2.
+/// A REF whose origin page cannot be resolved in V3 is preceded by a PUT that
+/// copies the origin page out of V2, so the reference has a target in V3.
+/// Throws Exception(LOGICAL_ERROR) when such an origin exists in neither storage
+/// or when the batch contains an unknown write type.
+void PageWriter::writeIntoMixMode(WriteBatch && write_batch, WriteLimiterPtr write_limiter) const
+{
+    const auto & ns_id = write_batch.getNamespaceId();
+    WriteBatch wb_for_v2{ns_id};
+    WriteBatch wb_for_put_v3{ns_id};
+
+    // If we do need copy entry from V2 into V3
+    // We need hold mem from V2 pages after write.
+    // NOTE(review): element type reconstructed (lost in this copy of the patch);
+    // it must be the type of `Page::mem_holder` -- confirm.
+    std::list<MemHolder> mem_holders;
+
+    for (const auto & write : write_batch.getWrites())
+    {
+        switch (write.type)
+        {
+        // PUT/PUT_EXTERNAL only for V3
+        case WriteBatch::WriteType::PUT:
+        case WriteBatch::WriteType::PUT_EXTERNAL:
+        {
+            break;
+        }
+        // Both need del in v2 and v3
+        case WriteBatch::WriteType::DEL:
+        {
+            wb_for_v2.copyWrite(write);
+            break;
+        }
+        case WriteBatch::WriteType::REF:
+        {
+            PageId resolved_page_id = storage_v3->getNormalPageId(ns_id,
+                                                                  write.ori_page_id,
+                                                                  /*snapshot*/ nullptr,
+                                                                  false);
+            // If the normal id is not found in v3, read from v2 and create a new put + ref
+            if (resolved_page_id == INVALID_PAGE_ID)
+            {
+                const auto & entry_for_put = storage_v2->getEntry(ns_id, write.ori_page_id, /*snapshot*/ {});
+                if (entry_for_put.isValid())
+                {
+                    auto page_for_put = storage_v2->read(ns_id, write.ori_page_id);
+
+                    // Keep the mem hold, no need create new one.
+                    mem_holders.emplace_back(page_for_put.mem_holder);
+                    assert(entry_for_put.size == page_for_put.data.size());
+
+                    // NOTE(review): the buffer type passed to make_shared below was lost in
+                    // this copy of the patch; `ReadBufferFromMemory` is reconstructed -- confirm.
+                    // Page with fields
+                    if (!entry_for_put.field_offsets.empty())
+                    {
+                        wb_for_put_v3.putPage(write.ori_page_id, //
+                                              entry_for_put.tag,
+                                              std::make_shared<ReadBufferFromMemory>(page_for_put.data.begin(), page_for_put.data.size()),
+                                              page_for_put.data.size(),
+                                              Page::fieldOffsetsToSizes(entry_for_put.field_offsets, entry_for_put.size));
+                    }
+                    else
+                    { // Normal page without fields
+                        wb_for_put_v3.putPage(write.ori_page_id, //
+                                              entry_for_put.tag,
+                                              std::make_shared<ReadBufferFromMemory>(page_for_put.data.begin(),
+                                                                                     page_for_put.data.size()),
+                                              page_for_put.data.size());
+                    }
+
+                    LOG_FMT_INFO(
+                        Logger::get("PageWriter"),
+                        "Can't find the origin page in v3, migrate a new being ref page into V3 [page_id={}] [origin_id={}] [field_offsets={}]",
+                        write.page_id,
+                        write.ori_page_id,
+                        entry_for_put.field_offsets.size());
+                }
+                else
+                {
+                    throw Exception(fmt::format("Can't find origin entry in V2 and V3, [ns_id={}, ori_page_id={}]",
+                                                ns_id,
+                                                write.ori_page_id),
+                                    ErrorCodes::LOGICAL_ERROR);
+                }
+            }
+            // else V3 found the origin one.
+            // Then do nothing.
+            break;
+        }
+        default:
+        {
+            // Carry an explicit error code like every other exception thrown in this file.
+            throw Exception(fmt::format("Unknown write type: {}", write.type), ErrorCodes::LOGICAL_ERROR);
+        }
+        }
+    }
+
+    if (!wb_for_put_v3.empty())
+    {
+        // The `writes` in wb_for_put_v3 must come before the `writes` in write_batch
+        wb_for_put_v3.copyWrites(write_batch.getWrites());
+        storage_v3->write(std::move(wb_for_put_v3), write_limiter);
+    }
+    else
+    {
+        storage_v3->write(std::move(write_batch), write_limiter);
+    }
+
+    if (!wb_for_v2.empty())
+    {
+        storage_v2->write(std::move(wb_for_v2), write_limiter);
+    }
+}
+
+
+/// Settings of the single active storage; MIX_MODE has no single answer and
+/// throws Exception(NOT_IMPLEMENTED).
+PageStorage::Config PageWriter::getSettings() const
+{
+    switch (run_mode)
+    {
+    case PageStorageRunMode::ONLY_V2:
+    {
+        return storage_v2->getSettings();
+    }
+    case PageStorageRunMode::ONLY_V3:
+    {
+        return storage_v3->getSettings();
+    }
+    case PageStorageRunMode::MIX_MODE:
+    {
+        throw Exception("Not support.", ErrorCodes::NOT_IMPLEMENTED);
+    }
+    default:
+        throw Exception(fmt::format("Unknown PageStorageRunMode {}", static_cast<UInt8>(run_mode)), ErrorCodes::LOGICAL_ERROR);
+    }
+}
+
+/// Apply `new_config` to every storage that is active in the current run mode.
+void PageWriter::reloadSettings(const PageStorage::Config & new_config) const
+{
+    switch (run_mode)
+    {
+    case PageStorageRunMode::ONLY_V2:
+    {
+        storage_v2->reloadSettings(new_config);
+        break;
+    }
+    case PageStorageRunMode::ONLY_V3:
+    {
+        storage_v3->reloadSettings(new_config);
+        break;
+    }
+    case PageStorageRunMode::MIX_MODE:
+    {
+        storage_v2->reloadSettings(new_config);
+        storage_v3->reloadSettings(new_config);
+        break;
+    }
+    default:
+        throw Exception(fmt::format("Unknown PageStorageRunMode {}", static_cast<UInt8>(run_mode)), ErrorCodes::LOGICAL_ERROR);
+    }
+}
+
+/// Run gc on every active storage. In MIX_MODE both storages are always
+/// collected (V2 first) and the two results are OR-ed together.
+bool PageWriter::gc(bool not_skip, const WriteLimiterPtr & write_limiter, const ReadLimiterPtr & read_limiter) const
+{
+    switch (run_mode)
+    {
+    case PageStorageRunMode::ONLY_V2:
+    {
+        return storage_v2->gc(not_skip, write_limiter, read_limiter);
+    }
+    case PageStorageRunMode::ONLY_V3:
+    {
+        return storage_v3->gc(not_skip, write_limiter, read_limiter);
+    }
+    case PageStorageRunMode::MIX_MODE:
+    {
+        bool ok = storage_v2->gc(not_skip, write_limiter, read_limiter);
+        ok |= storage_v3->gc(not_skip, write_limiter, read_limiter);
+        return ok;
+    }
+    default:
+        throw Exception(fmt::format("Unknown PageStorageRunMode {}", static_cast<UInt8>(run_mode)), ErrorCodes::LOGICAL_ERROR);
+    }
+}
+
 } // namespace DB
diff --git a/dbms/src/Storages/Page/PageStorage.h b/dbms/src/Storages/Page/PageStorage.h
index 34d91dbd1ad..fe8946549cd 100644
--- a/dbms/src/Storages/Page/PageStorage.h
+++ b/dbms/src/Storages/Page/PageStorage.h
@@ -14,6 +14,7 @@
 
 #pragma once
 
+#include
 #include
 #include
 #include
@@ -23,10 +24,12 @@
 #include
 #include
 #include
+#include
 #include
 #include
 #include
 
+#include
 #include
 #include
 #include
@@ -34,6 +37,7 @@
 #include
 #include
 
+
 namespace DB
 {
 class FileProvider;
@@ -45,6 +49,20 @@ using PSDiskDelegatorPtr = std::shared_ptr;
 class Context;
 class PageStorage;
 using PageStoragePtr = std::shared_ptr;
+class RegionPersister;
+
+namespace ErrorCodes
+{
+extern const int LOGICAL_ERROR;
+} // namespace ErrorCodes
+
+
+enum class PageStorageRunMode : UInt8
+{
+    ONLY_V2 = 1,
+    ONLY_V3 = 2,
+    MIX_MODE = 3,
+};
 
 struct ExternalPageCallbacks
 {
@@ -228,6 +246,8 @@ class PageStorage : private boost::noncopyable
 
     virtual size_t getNumberOfPages() = 0;
 
+    // NOTE(review): element type reconstructed (lost in this copy of the patch) -- confirm.
+    virtual std::set<PageId> getAliveExternalPageIds(NamespaceId ns_id) = 0;
+
     void write(WriteBatch && write_batch, const WriteLimiterPtr & write_limiter = nullptr)
     {
         writeImpl(std::move(write_batch), write_limiter);
@@ -321,72 +341,81 @@ class PageStorage : private boost::noncopyable
     FileProviderPtr file_provider;
 };
 
-
+// An impl class to hide the details for PageReaderImplMixed
+class PageReaderImpl;
+// A class to wrap read with a specify snapshot
 class PageReader : private boost::noncopyable
 {
 public:
     /// Not snapshot read.
-    explicit PageReader(NamespaceId ns_id_, PageStoragePtr storage_, ReadLimiterPtr read_limiter_)
-        : ns_id(ns_id_)
-        , storage(storage_)
-        , read_limiter(read_limiter_)
-    {}
+    explicit PageReader(const PageStorageRunMode & run_mode_, NamespaceId ns_id_, PageStoragePtr storage_v2_, PageStoragePtr storage_v3_, ReadLimiterPtr read_limiter_);
+
     /// Snapshot read.
-    PageReader(NamespaceId ns_id_, PageStoragePtr storage_, const PageStorage::SnapshotPtr & snap_, ReadLimiterPtr read_limiter_)
-        : ns_id(ns_id_)
-        , storage(storage_)
-        , snap(snap_)
-        , read_limiter(read_limiter_)
-    {}
-    PageReader(NamespaceId ns_id_, PageStoragePtr storage_, PageStorage::SnapshotPtr && snap_, ReadLimiterPtr read_limiter_)
-        : ns_id(ns_id_)
-        , storage(storage_)
-        , snap(std::move(snap_))
-        , read_limiter(read_limiter_)
-    {}
-
-    DB::Page read(PageId page_id) const
-    {
-        return storage->read(ns_id, page_id, read_limiter, snap);
-    }
+    PageReader(const PageStorageRunMode & run_mode_, NamespaceId ns_id_, PageStoragePtr storage_v2_, PageStoragePtr storage_v3_, PageStorage::SnapshotPtr snap_, ReadLimiterPtr read_limiter_);
 
-    PageMap read(const std::vector & page_ids) const
-    {
-        return storage->read(ns_id, page_ids, read_limiter, snap);
-    }
+    ~PageReader();
 
-    void read(const std::vector & page_ids, PageHandler & handler) const
-    {
-        storage->read(ns_id, page_ids, handler, read_limiter, snap);
-    }
+    DB::Page read(PageId page_id) const;
+
+    PageMap read(const PageIds & page_ids) const;
+
+    void read(const PageIds & page_ids, PageHandler & handler) const;
 
     using PageReadFields = PageStorage::PageReadFields;
-    PageMap read(const std::vector & page_fields) const
-    {
-        return storage->read(ns_id, page_fields, read_limiter, snap);
-    }
+    PageMap read(const std::vector<PageReadFields> & page_fields) const;
 
-    PageId getNormalPageId(PageId page_id) const
-    {
-        return storage->getNormalPageId(ns_id, page_id, snap);
-    }
+    PageId getNormalPageId(PageId page_id) const;
 
-    UInt64 getPageChecksum(PageId page_id) const
-    {
-        return storage->getEntry(ns_id, page_id, snap).checksum;
-    }
+    PageEntry getPageEntry(PageId page_id) const;
+
+    PageStorage::SnapshotPtr getSnapshot(const String & tracing_id) const;
+
+    // Get some statistics of all living snapshots and the oldest living snapshot.
+    SnapshotsStatistics getSnapshotsStat() const;
 
-    PageEntry getPageEntry(PageId page_id) const
+    void traverse(const std::function<void(const DB::Page & page)> & acceptor) const;
+
+private:
+    std::unique_ptr<PageReaderImpl> impl;
+};
+using PageReaderPtr = std::shared_ptr<PageReader>;
+
+class PageWriter : private boost::noncopyable
+{
+public:
+    PageWriter(PageStorageRunMode run_mode_, PageStoragePtr storage_v2_, PageStoragePtr storage_v3_)
+        : run_mode(run_mode_)
+        , storage_v2(storage_v2_)
+        , storage_v3(storage_v3_)
     {
-        return storage->getEntry(ns_id, page_id, snap);
     }
 
+    void write(WriteBatch && write_batch, WriteLimiterPtr write_limiter) const;
+
+    friend class RegionPersister;
+
+private:
+    void writeIntoV2(WriteBatch && write_batch, WriteLimiterPtr write_limiter) const;
+
+    void writeIntoV3(WriteBatch && write_batch, WriteLimiterPtr write_limiter) const;
+
+    void writeIntoMixMode(WriteBatch && write_batch, WriteLimiterPtr write_limiter) const;
+
+    // A wrap of getSettings only used for `RegionPersister::gc`
+    PageStorage::Config getSettings() const;
+
+    // A wrap of reloadSettings only used for `RegionPersister::gc`
+    void reloadSettings(const PageStorage::Config & new_config) const;
+
+    // A wrap of gc only used for `RegionPersister::gc`
+    bool gc(bool not_skip, const WriteLimiterPtr & write_limiter, const ReadLimiterPtr & read_limiter) const;
+
 private:
-    NamespaceId ns_id;
-    PageStoragePtr storage;
-    PageStorage::SnapshotPtr snap;
-    ReadLimiterPtr read_limiter;
+    PageStorageRunMode run_mode;
+    PageStoragePtr storage_v2;
+    PageStoragePtr storage_v3;
 };
+using PageWriterPtr = std::shared_ptr<PageWriter>;
 
 } // namespace DB
diff --git a/dbms/src/Storages/Page/Snapshot.h b/dbms/src/Storages/Page/Snapshot.h
index 348d084382f..77e68f1b054 100644
---
a/dbms/src/Storages/Page/Snapshot.h
+++ b/dbms/src/Storages/Page/Snapshot.h
@@ -13,6 +13,7 @@
 // limitations under the License.
 
 #pragma once
+#include
 #include
 
 #include
@@ -33,4 +34,37 @@ class PageStorageSnapshot
 };
 using PageStorageSnapshotPtr = std::shared_ptr;
 
+/// A snapshot that bundles one V2 snapshot and one V3 snapshot, so that a single
+/// handle can be passed around while both storages are in use.
+class PageStorageSnapshotMixed : public PageStorageSnapshot
+{
+public:
+    // TODO: add/sub CurrentMetrics::PSMVCCNumSnapshots in here
+    PageStorageSnapshotMixed(const PageStorageSnapshotPtr & snapshot_v2_, const PageStorageSnapshotPtr & snapshot_v3_)
+        : snapshot_v2(snapshot_v2_)
+        , snapshot_v3(snapshot_v3_)
+    {}
+
+    ~PageStorageSnapshotMixed() = default;
+
+    // The accessors only copy a member, so they are const-qualified and usable
+    // through a const handle as well.
+    PageStorageSnapshotPtr getV2Snapshot() const
+    {
+        return snapshot_v2;
+    }
+
+    PageStorageSnapshotPtr getV3Snapshot() const
+    {
+        return snapshot_v3;
+    }
+
+private:
+    PageStorageSnapshotPtr snapshot_v2;
+    PageStorageSnapshotPtr snapshot_v3;
+};
+using PageStorageSnapshotMixedPtr = std::shared_ptr<PageStorageSnapshotMixed>;
+
+/// Downcast a generic snapshot handle to the mixed snapshot.
+/// This is an unchecked static cast: the caller must know that `ptr` really
+/// refers to a PageStorageSnapshotMixed.
+static inline PageStorageSnapshotMixedPtr
+toConcreteMixedSnapshot(const PageStorageSnapshotPtr & ptr)
+{
+    return std::static_pointer_cast<PageStorageSnapshotMixed>(ptr);
+}
+
 } // namespace DB
diff --git a/dbms/src/Storages/Page/V2/PageFile.cpp b/dbms/src/Storages/Page/V2/PageFile.cpp
index 60846685eb5..5f2ef7c26e0 100644
--- a/dbms/src/Storages/Page/V2/PageFile.cpp
+++ b/dbms/src/Storages/Page/V2/PageFile.cpp
@@ -134,7 +134,7 @@ std::pair genWriteData( //
     char * data_pos = data_buffer;
 
     PageUtil::put(meta_pos, meta_write_bytes);
-    PageUtil::put(meta_pos, STORAGE_FORMAT_CURRENT.page);
+    PageUtil::put(meta_pos, PageFormat::V2);
     PageUtil::put(meta_pos, wb.getSequence());
 
     PageOffset page_data_file_off = page_file.getDataFileAppendPos();
diff --git a/dbms/src/Storages/Page/V2/PageStorage.cpp b/dbms/src/Storages/Page/V2/PageStorage.cpp
index 4c47645e7d7..0452d0cc8ae 100644
--- a/dbms/src/Storages/Page/V2/PageStorage.cpp
+++ b/dbms/src/Storages/Page/V2/PageStorage.cpp
@@ -607,6 +607,19 @@ size_t PageStorage::getNumberOfPages()
     }
 }
 
+// The namespace id is unused by this V2 implementation: the result comes from the
+// current concrete snapshot's `validNormalPageIds()`.
+// NOTE(review): element type reconstructed (lost in this copy of the patch) -- confirm.
+std::set<PageId> PageStorage::getAliveExternalPageIds(NamespaceId /*ns_id*/)
+{
+    const auto & concrete_snap = getConcreteSnapshot();
+    if (concrete_snap)
+    {
+        return concrete_snap->version()->validNormalPageIds();
+    }
+    else
+    {
+        throw Exception("Can't get concrete snapshot", ErrorCodes::LOGICAL_ERROR);
+    }
+}
+
 DB::Page PageStorage::readImpl(NamespaceId /*ns_id*/, PageId page_id, const ReadLimiterPtr & read_limiter, SnapshotPtr snapshot, bool throw_on_not_exist)
 {
     if (!snapshot)
diff --git a/dbms/src/Storages/Page/V2/PageStorage.h b/dbms/src/Storages/Page/V2/PageStorage.h
index 1e32de40dd0..6276c5e5086 100644
--- a/dbms/src/Storages/Page/V2/PageStorage.h
+++ b/dbms/src/Storages/Page/V2/PageStorage.h
@@ -107,6 +107,8 @@ class PageStorage : public DB::PageStorage
 
     size_t getNumberOfPages() override;
 
+    std::set<PageId> getAliveExternalPageIds(NamespaceId ns_id) override;
+
     void writeImpl(DB::WriteBatch && wb, const WriteLimiterPtr & write_limiter) override;
 
     DB::PageEntry getEntryImpl(NamespaceId ns_id, PageId page_id, SnapshotPtr snapshot) override;
@@ -144,11 +146,11 @@ class PageStorage : public DB::PageStorage
         option.remove_tmp_files = false;
         auto page_files = listAllPageFiles(file_provider, delegator, log, option);
         if (page_files.empty())
-            return STORAGE_FORMAT_CURRENT.page;
+            return PageFormat::V2;
 
         bool all_empty = true;
         PageFormat::Version max_binary_version = PageFormat::V1;
-        PageFormat::Version temp_version = STORAGE_FORMAT_CURRENT.page;
+        PageFormat::Version temp_version = PageFormat::V2;
         for (auto iter = page_files.rbegin(); iter != page_files.rend(); ++iter)
         {
             // Skip those files without valid meta
@@ -167,7 +169,7 @@ class PageStorage : public DB::PageStorage
             LOG_FMT_DEBUG(log, "getMaxDataVersion done from {} [max version={}]", reader->toString(), max_binary_version);
             break;
         }
-        max_binary_version = (all_empty ? STORAGE_FORMAT_CURRENT.page : max_binary_version);
+        max_binary_version = (all_empty ? PageFormat::V2 : max_binary_version);
         return max_binary_version;
     }
 
diff --git a/dbms/src/Storages/Page/V3/BlobStore.cpp b/dbms/src/Storages/Page/V3/BlobStore.cpp
index 2c568d8e85b..3e58a3eb87e 100644
--- a/dbms/src/Storages/Page/V3/BlobStore.cpp
+++ b/dbms/src/Storages/Page/V3/BlobStore.cpp
@@ -380,6 +380,11 @@ void BlobStore::removePosFromStats(BlobFileId blob_id, BlobFileOffset offset, si
 
 void BlobStore::read(PageIDAndEntriesV3 & entries, const PageHandler & handler, const ReadLimiterPtr & read_limiter)
 {
+    if (entries.empty())
+    {
+        return;
+    }
+
     ProfileEvents::increment(ProfileEvents::PSMReadPages, entries.size());
 
     // Sort in ascending order by offset in file.
@@ -444,6 +449,11 @@ void BlobStore::read(PageIDAndEntriesV3 & entries, const PageHandler & handler,
 
 PageMap BlobStore::read(FieldReadInfos & to_read, const ReadLimiterPtr & read_limiter)
 {
+    if (to_read.empty())
+    {
+        return {};
+    }
+
     ProfileEvents::increment(ProfileEvents::PSMReadPages, to_read.size());
 
     // Sort in ascending order by offset in file.
@@ -542,6 +552,11 @@ PageMap BlobStore::read(FieldReadInfos & to_read, const ReadLimiterPtr & read_li
 
 PageMap BlobStore::read(PageIDAndEntriesV3 & entries, const ReadLimiterPtr & read_limiter)
 {
+    if (entries.empty())
+    {
+        return {};
+    }
+
     ProfileEvents::increment(ProfileEvents::PSMReadPages, entries.size());
 
     // Sort in ascending order by offset in file.
@@ -621,6 +636,13 @@ PageMap BlobStore::read(PageIDAndEntriesV3 & entries, const ReadLimiterPtr & rea Page BlobStore::read(const PageIDAndEntryV3 & id_entry, const ReadLimiterPtr & read_limiter) { + if (!id_entry.second.isValid()) + { + Page page_not_found; + page_not_found.page_id = INVALID_PAGE_ID; + return page_not_found; + } + const auto & [page_id_v3, entry] = id_entry; const size_t buf_size = entry.size; diff --git a/dbms/src/Storages/Page/V3/PageDirectory.cpp b/dbms/src/Storages/Page/V3/PageDirectory.cpp index 2a272392b70..35e73e5c5ff 100644 --- a/dbms/src/Storages/Page/V3/PageDirectory.cpp +++ b/dbms/src/Storages/Page/V3/PageDirectory.cpp @@ -841,7 +841,7 @@ PageIdV3Internal PageDirectory::getNormalPageId(PageIdV3Internal page_id, const } else { - return PageIdV3Internal(0, INVALID_PAGE_ID); + return buildV3Id(0, INVALID_PAGE_ID); } } } @@ -878,7 +878,7 @@ PageIdV3Internal PageDirectory::getNormalPageId(PageIdV3Internal page_id, const } else { - return PageIdV3Internal(0, INVALID_PAGE_ID); + return buildV3Id(0, INVALID_PAGE_ID); } } diff --git a/dbms/src/Storages/Page/V3/PageEntry.h b/dbms/src/Storages/Page/V3/PageEntry.h index 22379611972..bcefc0e845b 100644 --- a/dbms/src/Storages/Page/V3/PageEntry.h +++ b/dbms/src/Storages/Page/V3/PageEntry.h @@ -45,6 +45,7 @@ struct PageEntryV3 { return size + padded_size; } + inline bool isValid() const { return file_id != INVALID_BLOBFILE_ID; } size_t getFieldSize(size_t index) const @@ -86,7 +87,13 @@ inline PageIdV3Internal buildV3Id(NamespaceId n_id, PageId p_id) inline String toDebugString(const PageEntryV3 & entry) { - return fmt::format("PageEntry{{file: {}, offset: 0x{:X}, size: {}, checksum: 0x{:X}}}", entry.file_id, entry.offset, entry.size, entry.checksum); + return fmt::format("PageEntry{{file: {}, offset: 0x{:X}, size: {}, checksum: 0x{:X}, tag: {}, field_offsets_size: {}}}", + entry.file_id, + entry.offset, + entry.size, + entry.checksum, + entry.tag, + entry.field_offsets.size()); } } // namespace PS::V3 
diff --git a/dbms/src/Storages/Page/V3/PageStorageImpl.cpp b/dbms/src/Storages/Page/V3/PageStorageImpl.cpp index ab1ba0b04e1..de0a26f68c7 100644 --- a/dbms/src/Storages/Page/V3/PageStorageImpl.cpp +++ b/dbms/src/Storages/Page/V3/PageStorageImpl.cpp @@ -86,6 +86,11 @@ size_t PageStorageImpl::getNumberOfPages() return page_directory->numPages(); } +std::set PageStorageImpl::getAliveExternalPageIds(NamespaceId ns_id) +{ + return page_directory->getAliveExternalIds(ns_id); +} + void PageStorageImpl::writeImpl(DB::WriteBatch && write_batch, const WriteLimiterPtr & write_limiter) { if (unlikely(write_batch.empty())) @@ -134,13 +139,6 @@ DB::Page PageStorageImpl::readImpl(NamespaceId ns_id, PageId page_id, const Read } auto page_entry = throw_on_not_exist ? page_directory->get(buildV3Id(ns_id, page_id), snapshot) : page_directory->getOrNull(buildV3Id(ns_id, page_id), snapshot); - if (!page_entry.second.isValid()) - { - Page page_not_found; - page_not_found.page_id = INVALID_PAGE_ID; - return page_not_found; - } - return blob_store.read(page_entry, read_limiter); } @@ -153,7 +151,9 @@ PageMap PageStorageImpl::readImpl(NamespaceId ns_id, const PageIds & page_ids, c PageIdV3Internals page_id_v3s; for (auto p_id : page_ids) + { page_id_v3s.emplace_back(buildV3Id(ns_id, p_id)); + } if (throw_on_not_exist) { @@ -163,7 +163,8 @@ PageMap PageStorageImpl::readImpl(NamespaceId ns_id, const PageIds & page_ids, c else { auto [page_entries, page_ids_not_found] = page_directory->getOrNull(page_id_v3s, snapshot); - auto page_map = blob_store.read(page_entries, read_limiter); + PageMap page_map = blob_store.read(page_entries, read_limiter); + for (const auto & page_id_not_found : page_ids_not_found) { Page page_not_found; @@ -211,6 +212,7 @@ PageMap PageStorageImpl::readImpl(NamespaceId ns_id, const std::vectorget(buildV3Id(ns_id, page_id), snapshot) : page_directory->getOrNull(buildV3Id(ns_id, page_id), snapshot); + if (entry.isValid()) { auto info = 
BlobStore::FieldReadInfo(buildV3Id(ns_id, page_id), entry, field_indices); @@ -221,8 +223,8 @@ PageMap PageStorageImpl::readImpl(NamespaceId ns_id, const std::vectorgetAliveExternalIds(ns_id); + auto alive_external_ids = getAliveExternalPageIds(ns_id); callbacks.remover(pending_external_pages, alive_external_ids); } } diff --git a/dbms/src/Storages/Page/V3/PageStorageImpl.h b/dbms/src/Storages/Page/V3/PageStorageImpl.h index 7e86a110ab9..62c45ac685d 100644 --- a/dbms/src/Storages/Page/V3/PageStorageImpl.h +++ b/dbms/src/Storages/Page/V3/PageStorageImpl.h @@ -73,6 +73,8 @@ class PageStorageImpl : public DB::PageStorage size_t getNumberOfPages() override; + std::set getAliveExternalPageIds(NamespaceId ns_id) override; + void writeImpl(DB::WriteBatch && write_batch, const WriteLimiterPtr & write_limiter) override; DB::PageEntry getEntryImpl(NamespaceId ns_id, PageId page_id, SnapshotPtr snapshot) override; diff --git a/dbms/src/Storages/Page/V3/tests/gtest_blob_store.cpp b/dbms/src/Storages/Page/V3/tests/gtest_blob_store.cpp index c8facc39b80..adaa4e2ee08 100644 --- a/dbms/src/Storages/Page/V3/tests/gtest_blob_store.cpp +++ b/dbms/src/Storages/Page/V3/tests/gtest_blob_store.cpp @@ -328,6 +328,7 @@ try { for (const auto & [path, stats] : blob_store.blob_stats.getStats()) { + (void)path; for (const auto & stat : stats) { if (stat->id == file_id1) diff --git a/dbms/src/Storages/Page/V3/tests/gtest_page_storage.cpp b/dbms/src/Storages/Page/V3/tests/gtest_page_storage.cpp index a1cf73dd10a..4bd3b2832b0 100644 --- a/dbms/src/Storages/Page/V3/tests/gtest_page_storage.cpp +++ b/dbms/src/Storages/Page/V3/tests/gtest_page_storage.cpp @@ -190,7 +190,7 @@ try rate_target, LimiterType::UNKNOW); - std::vector page_ids; + PageIds page_ids; for (size_t i = 0; i < wb_nums; ++i) { page_ids.emplace_back(page_id + i); diff --git a/dbms/src/Storages/Page/V3/tests/gtest_page_storage_mix_mode.cpp b/dbms/src/Storages/Page/V3/tests/gtest_page_storage_mix_mode.cpp new file mode 100644 index 
00000000000..d3fdafe57e8 --- /dev/null +++ b/dbms/src/Storages/Page/V3/tests/gtest_page_storage_mix_mode.cpp @@ -0,0 +1,494 @@ +// Copyright 2022 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include +#include +#include +#include +#include +#include + +namespace DB +{ +using namespace DM; +using namespace tests; +namespace PS::V3::tests +{ +class PageStorageMixedTest : public DB::base::TiFlashStorageTestBasic +{ +public: + static void SetUpTestCase() + { + auto path = TiFlashTestEnv::getTemporaryPath("PageStorageMixedTestV3Path"); + dropDataOnDisk(path); + createIfNotExist(path); + + std::vector caps = {}; + Strings paths = {path}; + + auto & global_context = TiFlashTestEnv::getGlobalContext(); + + storage_path_pool_v3 = std::make_unique(Strings{path}, Strings{path}, Strings{}, std::make_shared(0, paths, caps, Strings{}, caps), global_context.getFileProvider(), true); + + global_context.setPageStorageRunMode(PageStorageRunMode::MIX_MODE); + if (!global_context.getGlobalStoragePool()) + global_context.initializeGlobalStoragePoolIfNeed(*storage_path_pool_v3); + } + + void SetUp() override + { + TiFlashStorageTestBasic::SetUp(); + const auto & path = getTemporaryPath(); + createIfNotExist(path); + + auto & global_context = DB::tests::TiFlashTestEnv::getGlobalContext(); + + std::vector caps = {}; + Strings paths = {path}; + + PathCapacityMetricsPtr cap_metrics = std::make_shared(0, paths, caps, Strings{}, caps); + 
storage_path_pool_v2 = std::make_unique(Strings{path}, Strings{path}, "test", "t1", true, cap_metrics, global_context.getFileProvider()); + + global_context.setPageStorageRunMode(PageStorageRunMode::ONLY_V2); + storage_pool_v2 = std::make_unique(global_context, TEST_NAMESPACE_ID, *storage_path_pool_v2, "test.t1"); + + global_context.setPageStorageRunMode(PageStorageRunMode::MIX_MODE); + storage_pool_mix = std::make_unique(global_context, + TEST_NAMESPACE_ID, + *storage_path_pool_v2, + "test.t1"); + + reloadV2StoragePool(); + } + + PageStorageRunMode reloadMixedStoragePool() + { + DB::tests::TiFlashTestEnv::getContext().setPageStorageRunMode(PageStorageRunMode::MIX_MODE); + PageStorageRunMode run_mode = storage_pool_mix->restore(); + page_writer_mix = storage_pool_mix->logWriter(); + page_reader_mix = storage_pool_mix->logReader(); + return run_mode; + } + + void reloadV2StoragePool() + { + DB::tests::TiFlashTestEnv::getContext().setPageStorageRunMode(PageStorageRunMode::ONLY_V2); + storage_pool_v2->restore(); + page_writer_v2 = storage_pool_v2->logWriter(); + page_reader_v2 = storage_pool_v2->logReader(); + } + +protected: + std::unique_ptr storage_path_pool_v2; + static std::unique_ptr storage_path_pool_v3; + std::unique_ptr storage_pool_v2; + std::unique_ptr storage_pool_mix; + + PageWriterPtr page_writer_v2; + PageWriterPtr page_writer_mix; + + PageReaderPtr page_reader_v2; + PageReaderPtr page_reader_mix; +}; + +std::unique_ptr PageStorageMixedTest::storage_path_pool_v3 = nullptr; + +inline ::testing::AssertionResult getPageCompare( + const char * /*buff_cmp_expr*/, + const char * buf_size_expr, + const char * /*page_cmp_expr*/, + const char * page_id_expr, + char * buff_cmp, + const size_t buf_size, + const Page & page_cmp, + const PageId & page_id) +{ + if (page_cmp.data.size() != buf_size) + { + return testing::internal::EqFailure( + DB::toString(buf_size).c_str(), + DB::toString(page_cmp.data.size()).c_str(), + buf_size_expr, + "page.data.size()", + false); 
+ } + + if (page_cmp.page_id != page_id) + { + return testing::internal::EqFailure( + DB::toString(page_id).c_str(), + DB::toString(page_cmp.page_id).c_str(), + page_id_expr, + "page.page_id", + false); + } + + if (strncmp(page_cmp.data.begin(), buff_cmp, buf_size) != 0) + { + return ::testing::AssertionFailure( // + ::testing::Message( + "Page data not match the buffer")); + } + + return ::testing::AssertionSuccess(); +} + +#define ASSERT_PAGE_EQ(buff_cmp, buf_size, page_cmp, page_id) \ + ASSERT_PRED_FORMAT4(getPageCompare, buff_cmp, buf_size, page_cmp, page_id) +#define EXPECT_PAGE_EQ(buff_cmp, buf_size, page_cmp, page_id) \ + EXPECT_PRED_FORMAT4(getPageCompare, buff_cmp, buf_size, page_cmp, page_id) + +TEST_F(PageStorageMixedTest, WriteRead) +try +{ + UInt64 tag = 0; + const size_t buf_sz = 1024; + char c_buff[buf_sz]; + for (size_t i = 0; i < buf_sz; ++i) + { + c_buff[i] = i % 0xff; + } + + { + WriteBatch batch; + ReadBufferPtr buff = std::make_shared(c_buff, sizeof(c_buff)); + batch.putPage(1, tag, buff, buf_sz); + buff = std::make_shared(c_buff, sizeof(c_buff)); + batch.putPage(2, tag, buff, buf_sz); + page_writer_v2->write(std::move(batch), nullptr); + } + + ASSERT_EQ(reloadMixedStoragePool(), PageStorageRunMode::MIX_MODE); + { + const auto & page1 = page_reader_mix->read(1); + const auto & page2 = page_reader_mix->read(2); + ASSERT_PAGE_EQ(c_buff, buf_sz, page1, 1); + ASSERT_PAGE_EQ(c_buff, buf_sz, page2, 2); + } + + { + WriteBatch batch; + const size_t buf_sz2 = 2048; + char c_buff2[buf_sz2] = {0}; + + ReadBufferPtr buff2 = std::make_shared(c_buff2, sizeof(c_buff2)); + batch.putPage(3, tag, buff2, buf_sz2); + page_writer_mix->write(std::move(batch), nullptr); + + const auto & page3 = page_reader_mix->read(3); + ASSERT_PAGE_EQ(c_buff2, buf_sz2, page3, 3); + reloadV2StoragePool(); + ASSERT_THROW(page_reader_v2->read(3), DB::Exception); + } + + { + // Revert v3 + WriteBatch batch; + batch.delPage(3); + page_writer_mix->write(std::move(batch), nullptr); + } +} 
+CATCH + +TEST_F(PageStorageMixedTest, Read) +try +{ + UInt64 tag = 0; + const size_t buf_sz = 1024; + char c_buff[buf_sz]; + for (size_t i = 0; i < buf_sz; ++i) + { + c_buff[i] = i % 0xff; + } + + { + WriteBatch batch; + ReadBufferPtr buff = std::make_shared(c_buff, sizeof(c_buff)); + batch.putPage(1, tag, buff, buf_sz); + buff = std::make_shared(c_buff, sizeof(c_buff)); + batch.putPage(2, tag, buff, buf_sz, {20, 120, 400, 200, 15, 75, 170, 24}); + buff = std::make_shared(c_buff, sizeof(c_buff)); + batch.putPage(7, tag, buff, buf_sz, {500, 500, 24}); + page_writer_v2->write(std::move(batch), nullptr); + } + + ASSERT_EQ(reloadMixedStoragePool(), PageStorageRunMode::MIX_MODE); + { + const auto & page1 = page_reader_mix->read(1); + const auto & page2 = page_reader_mix->read(2); + ASSERT_PAGE_EQ(c_buff, buf_sz, page1, 1); + ASSERT_PAGE_EQ(c_buff, buf_sz, page2, 2); + } + + const size_t buf_sz2 = 2048; + char c_buff2[buf_sz2] = {0}; + { + WriteBatch batch; + ReadBufferPtr buff2 = std::make_shared(c_buff2, sizeof(c_buff2)); + batch.putPage(3, tag, buff2, buf_sz2); + buff2 = std::make_shared(c_buff2, sizeof(c_buff2)); + batch.putPage(4, tag, buff2, buf_sz2, {20, 120, 400, 200, 15, 75, 170, 24, 500, 500, 24}); + page_writer_mix->write(std::move(batch), nullptr); + } + + { + PageIds page_ids = {1, 2, 3, 4}; + auto page_maps = page_reader_mix->read(page_ids); + ASSERT_EQ(page_maps.size(), 4); + ASSERT_PAGE_EQ(c_buff, buf_sz, page_maps[1], 1); + ASSERT_PAGE_EQ(c_buff, buf_sz, page_maps[2], 2); + ASSERT_PAGE_EQ(c_buff2, buf_sz2, page_maps[3], 3); + ASSERT_PAGE_EQ(c_buff2, buf_sz2, page_maps[4], 4); + + // Read page ids which only exited in V2 + page_ids = {1, 2, 7}; + page_maps = page_reader_mix->read(page_ids); + ASSERT_EQ(page_maps.size(), 3); + ASSERT_PAGE_EQ(c_buff, buf_sz, page_maps[1], 1); + ASSERT_PAGE_EQ(c_buff, buf_sz, page_maps[2], 2); + ASSERT_PAGE_EQ(c_buff, buf_sz, page_maps[7], 7); + } + + { + PageIds page_ids = {1, 2, 3, 4}; + PageHandler hander = [](DB::PageId 
/*page_id*/, const Page & /*page*/) { + }; + ASSERT_NO_THROW(page_reader_mix->read(page_ids, hander)); + + // Read page ids which only exited in V2 + page_ids = {1, 2, 7}; + ASSERT_NO_THROW(page_reader_mix->read(page_ids, hander)); + } + + { + std::vector read_fields; + read_fields.emplace_back(std::make_pair(2, {1, 3, 6})); + read_fields.emplace_back(std::make_pair(4, {1, 3, 4, 8, 10})); + read_fields.emplace_back(std::make_pair(7, {0, 1, 2})); + PageMap page_maps = page_reader_mix->read(read_fields); + ASSERT_EQ(page_maps.size(), 3); + ASSERT_EQ(page_maps[2].page_id, 2); + ASSERT_EQ(page_maps[2].field_offsets.size(), 3); + ASSERT_EQ(page_maps[4].page_id, 4); + ASSERT_EQ(page_maps[4].field_offsets.size(), 5); + ASSERT_EQ(page_maps[7].page_id, 7); + ASSERT_EQ(page_maps[7].field_offsets.size(), 3); + } + + { + // Read page ids which only exited in V2 + std::vector read_fields; + read_fields.emplace_back(std::make_pair(2, {1, 3, 6})); + ASSERT_NO_THROW(page_reader_mix->read(read_fields)); + } + + { + // Revert v3 + WriteBatch batch; + batch.delPage(3); + batch.delPage(4); + page_writer_mix->write(std::move(batch), nullptr); + } +} +CATCH + +TEST_F(PageStorageMixedTest, ReadWithSnapshot) +try +{ + UInt64 tag = 0; + const size_t buf_sz = 1024; + char c_buff[buf_sz]; + for (size_t i = 0; i < buf_sz; ++i) + { + c_buff[i] = i % 0xff; + } + + { + WriteBatch batch; + ReadBufferPtr buff = std::make_shared(c_buff, sizeof(c_buff)); + batch.putPage(1, tag, buff, buf_sz); + buff = std::make_shared(c_buff, sizeof(c_buff)); + batch.putPage(2, tag, buff, buf_sz, {20, 120, 400, 200, 15, 75, 170, 24}); + page_writer_v2->write(std::move(batch), nullptr); + } + + ASSERT_EQ(reloadMixedStoragePool(), PageStorageRunMode::MIX_MODE); + const size_t buf_sz2 = 2048; + char c_buff2[buf_sz2] = {0}; + { + WriteBatch batch; + ReadBufferPtr buff2 = std::make_shared(c_buff2, sizeof(c_buff2)); + batch.putPage(3, tag, buff2, buf_sz2); + page_writer_mix->write(std::move(batch), nullptr); + } + + auto 
snapshot_mix = page_reader_mix->getSnapshot("ReadWithSnapshotTest"); + + { + auto page_reader_mix_with_snap = storage_pool_mix->newLogReader(nullptr, snapshot_mix); + + const auto & page1 = page_reader_mix_with_snap.read(1); + const auto & page2 = page_reader_mix_with_snap.read(2); + const auto & page3 = page_reader_mix_with_snap.read(3); + ASSERT_PAGE_EQ(c_buff, buf_sz, page1, 1); + ASSERT_PAGE_EQ(c_buff, buf_sz, page2, 2); + ASSERT_PAGE_EQ(c_buff2, buf_sz2, page3, 3); + } + + { + auto page_reader_mix_with_snap = storage_pool_mix->newLogReader(nullptr, true, "ReadWithSnapshotTest"); + const auto & page1 = page_reader_mix_with_snap.read(1); + const auto & page2 = page_reader_mix_with_snap.read(2); + const auto & page3 = page_reader_mix_with_snap.read(3); + ASSERT_PAGE_EQ(c_buff, buf_sz, page1, 1); + ASSERT_PAGE_EQ(c_buff, buf_sz, page2, 2); + ASSERT_PAGE_EQ(c_buff2, buf_sz2, page3, 3); + } + + { + WriteBatch batch; + ReadBufferPtr buff2 = std::make_shared(c_buff2, sizeof(c_buff2)); + batch.putPage(4, tag, buff2, buf_sz2); + page_writer_mix->write(std::move(batch), nullptr); + } + { + auto page_reader_mix_with_snap = storage_pool_mix->newLogReader(nullptr, snapshot_mix); + ASSERT_THROW(page_reader_mix_with_snap.read(4), DB::Exception); + } + + { + // Revert v3 + WriteBatch batch; + batch.delPage(3); + batch.delPage(4); + page_writer_mix->write(std::move(batch), nullptr); + } +} +CATCH + + +TEST_F(PageStorageMixedTest, PutExt) +try +{ + { + WriteBatch batch; + batch.putExternal(1, 0); + batch.putExternal(2, 0); + batch.putExternal(3, 0); + page_writer_v2->write(std::move(batch), nullptr); + } + + ASSERT_EQ(reloadMixedStoragePool(), PageStorageRunMode::MIX_MODE); +} +CATCH + + +TEST_F(PageStorageMixedTest, Del) +try +{ + const size_t buf_sz = 1024; + char c_buff[buf_sz] = {0}; + + { + WriteBatch batch; + ReadBufferPtr buff = std::make_shared(c_buff, sizeof(c_buff)); + batch.putPage(1, 0, buff, buf_sz); + page_writer_v2->write(std::move(batch), nullptr); + } + + 
ASSERT_EQ(reloadMixedStoragePool(), PageStorageRunMode::MIX_MODE); + + { + WriteBatch batch; + batch.delPage(1); + ASSERT_NO_THROW(page_writer_mix->write(std::move(batch), nullptr)); + } + + reloadV2StoragePool(); + ASSERT_THROW(page_reader_v2->read(1), DB::Exception); +} +CATCH + +TEST_F(PageStorageMixedTest, Ref) +try +{ + const size_t buf_sz = 1024; + char c_buff[buf_sz] = {0}; + + { + WriteBatch batch; + ReadBufferPtr buff = std::make_shared(c_buff, sizeof(c_buff)); + batch.putPage(7, 0, buff, buf_sz); + buff = std::make_shared(c_buff, sizeof(c_buff)); + batch.putPage(8, 0, buff, buf_sz, {20, 120, 400, 200, 15, 75, 170, 24}); + page_writer_v2->write(std::move(batch), nullptr); + } + + { + ASSERT_EQ(reloadMixedStoragePool(), PageStorageRunMode::MIX_MODE); + const auto & entry = page_reader_mix->getPageEntry(8); + ASSERT_EQ(entry.field_offsets.size(), 8); + } + + { + WriteBatch batch; + batch.putRefPage(9, 7); + // ASSERT_NO_THROW(page_writer_mix->write(std::move(batch), nullptr)); + page_writer_mix->write(std::move(batch), nullptr); + ASSERT_EQ(page_reader_mix->getNormalPageId(9), 7); + } + + reloadV2StoragePool(); + ASSERT_THROW(page_reader_v2->read(9), DB::Exception); + + { + WriteBatch batch; + batch.putRefPage(10, 8); + + ASSERT_NO_THROW(page_writer_mix->write(std::move(batch), nullptr)); + ASSERT_EQ(page_reader_mix->getNormalPageId(10), 8); + + std::vector read_fields; + read_fields.emplace_back(std::pair(10, {0, 1, 2, 6})); + + PageMap page_maps = page_reader_mix->read(read_fields); + ASSERT_EQ(page_maps.size(), 1); + ASSERT_EQ(page_maps[10].page_id, 10); + ASSERT_EQ(page_maps[10].field_offsets.size(), 4); + ASSERT_EQ(page_maps[10].data.size(), 710); + + auto field_offset = page_maps[10].field_offsets; + auto it = field_offset.begin(); + ASSERT_EQ(it->offset, 0); + ++it; + ASSERT_EQ(it->offset, 20); + ++it; + ASSERT_EQ(it->offset, 140); + ++it; + ASSERT_EQ(it->offset, 540); + } + + { + // Revert v3 + WriteBatch batch; + batch.delPage(9); + 
batch.delPage(10); + page_writer_mix->write(std::move(batch), nullptr); + } +} +CATCH + +} // namespace PS::V3::tests +} // namespace DB diff --git a/dbms/src/Storages/Page/WriteBatch.h b/dbms/src/Storages/Page/WriteBatch.h index bde03c4de57..30180874fe6 100644 --- a/dbms/src/Storages/Page/WriteBatch.h +++ b/dbms/src/Storages/Page/WriteBatch.h @@ -184,6 +184,19 @@ class WriteBatch : private boost::noncopyable std::swap(o.sequence, sequence); } + void copyWrite(const Write write) + { + writes.emplace_back(write); + } + + void copyWrites(const Writes & writes_) + { + for (const auto & write_ : writes_) + { + copyWrite(write_); + } + } + void clear() { Writes tmp; diff --git a/dbms/src/Storages/PathPool.cpp b/dbms/src/Storages/PathPool.cpp index 5a542ba0eea..0ae42ec7313 100644 --- a/dbms/src/Storages/PathPool.cpp +++ b/dbms/src/Storages/PathPool.cpp @@ -13,6 +13,7 @@ // limitations under the License. #include +#include #include #include #include @@ -24,7 +25,6 @@ #include #include #include -#include #include #include @@ -64,7 +64,7 @@ PathPool::PathPool( , enable_raft_compatible_mode(enable_raft_compatible_mode_) , global_capacity(global_capacity_) , file_provider(file_provider_) - , log(&Poco::Logger::get("PathPool")) + , log(Logger::get("PathPool")) { if (kvstore_paths.empty()) { @@ -134,7 +134,7 @@ StoragePathPool::StoragePathPool( // , path_need_database_name(path_need_database_name_) , global_capacity(std::move(global_capacity_)) , file_provider(std::move(file_provider_)) - , log(&Poco::Logger::get("StoragePathPool")) + , log(Logger::get("StoragePathPool")) { if (unlikely(database.empty() || table.empty())) throw Exception(fmt::format("Can NOT create StoragePathPool [database={}] [table={}]", database, table), ErrorCodes::LOGICAL_ERROR); @@ -240,6 +240,7 @@ void StoragePathPool::drop(bool recursive, bool must_success) { if (Poco::File dir(path_info.path); dir.exists()) { + LOG_FMT_INFO(log, "Begin to drop [dir={}] from main_path_infos", path_info.path); 
file_provider->deleteDirectory(dir.path(), false, recursive); // update global used size @@ -269,6 +270,7 @@ void StoragePathPool::drop(bool recursive, bool must_success) { if (Poco::File dir(path_info.path); dir.exists()) { + LOG_FMT_INFO(log, "Begin to drop [dir={}] from latest_path_infos", path_info.path); file_provider->deleteDirectory(dir.path(), false, recursive); // When PageStorage is dropped, it will update the size in global_capacity. @@ -318,7 +320,7 @@ String genericChoosePath(const std::vector & paths, // const PathCapacityMetricsPtr & global_capacity, // std::function & paths, size_t idx)> path_generator, // std::function path_getter, // - Poco::Logger * log, // + LoggerPtr log, // const String & log_msg) { if (paths.size() == 1) diff --git a/dbms/src/Storages/PathPool.h b/dbms/src/Storages/PathPool.h index fd7b70a4bbd..40a3cb9a636 100644 --- a/dbms/src/Storages/PathPool.h +++ b/dbms/src/Storages/PathPool.h @@ -14,6 +14,7 @@ #pragma once +#include #include #include @@ -109,7 +110,7 @@ class PathPool FileProviderPtr file_provider; - Poco::Logger * log; + LoggerPtr log; }; class StableDiskDelegator : private boost::noncopyable @@ -453,7 +454,7 @@ class StoragePathPool FileProviderPtr file_provider; - Poco::Logger * log; + LoggerPtr log; }; } // namespace DB diff --git a/dbms/src/Storages/Transaction/RegionPersister.cpp b/dbms/src/Storages/Transaction/RegionPersister.cpp index b5b8b62b7ca..77901922e2f 100644 --- a/dbms/src/Storages/Transaction/RegionPersister.cpp +++ b/dbms/src/Storages/Transaction/RegionPersister.cpp @@ -16,6 +16,7 @@ #include #include #include +#include #include #include #include @@ -42,11 +43,11 @@ extern const char force_disable_region_persister_compatible_mode[]; void RegionPersister::drop(RegionID region_id, const RegionTaskLock &) { - if (page_storage) + if (page_writer) { DB::WriteBatch wb_v2{ns_id}; wb_v2.delPage(region_id); - page_storage->write(std::move(wb_v2), global_context.getWriteLimiter()); + 
page_writer->write(std::move(wb_v2), global_context.getWriteLimiter()); } else { @@ -65,7 +66,7 @@ void RegionPersister::computeRegionWriteBuffer(const Region & region, RegionCach if (unlikely(region_size > static_cast(std::numeric_limits::max()))) { LOG_FMT_WARNING( - &Poco::Logger::get("RegionPersister"), + Logger::get("RegionPersister"), "Persisting big region: {} with data info: {}, serialized size {}", region.toString(), region.dataInfo(), @@ -101,9 +102,9 @@ void RegionPersister::doPersist(RegionCacheWriteElement & region_write_buffer, c std::lock_guard lock(mutex); - if (page_storage) + if (page_reader) { - auto entry = page_storage->getEntry(ns_id, region_id, nullptr); + auto entry = page_reader->getPageEntry(region_id); if (entry.isValid() && entry.tag > applied_index) return; } @@ -121,11 +122,11 @@ void RegionPersister::doPersist(RegionCacheWriteElement & region_write_buffer, c } auto read_buf = buffer.tryGetReadBuffer(); - if (page_storage) + if (page_writer) { DB::WriteBatch wb{ns_id}; wb.putPage(region_id, applied_index, read_buf, region_size); - page_storage->write(std::move(wb), global_context.getWriteLimiter()); + page_writer->write(std::move(wb), global_context.getWriteLimiter()); } else { @@ -138,9 +139,19 @@ void RegionPersister::doPersist(RegionCacheWriteElement & region_write_buffer, c RegionPersister::RegionPersister(Context & global_context_, const RegionManager & region_manager_) : global_context(global_context_) , region_manager(region_manager_) - , log(&Poco::Logger::get("RegionPersister")) + , log(Logger::get("RegionPersister")) {} +PageStorage::Config RegionPersister::getPageStorageSettings() const +{ + if (!page_writer) + { + throw Exception("Not support for PS v1", ErrorCodes::LOGICAL_ERROR); + } + + return page_writer->getSettings(); +} + PS::V1::PageStorage::Config getV1PSConfig(const PS::V2::PageStorage::Config & config) { PS::V1::PageStorage::Config c; @@ -165,21 +176,11 @@ RegionMap RegionPersister::restore(const 
TiFlashRaftProxyHelper * proxy_helper, auto & path_pool = global_context.getPathPool(); auto delegator = path_pool.getPSDiskDelegatorRaft(); auto provider = global_context.getFileProvider(); - // If the GlobalStoragePool is initialized, then use v3 format - bool use_v3_format = global_context.getGlobalStoragePool() != nullptr; - if (use_v3_format) - { - mergeConfigFromSettings(global_context.getSettingsRef(), config); + auto run_mode = global_context.getPageStorageRunMode(); - LOG_FMT_INFO(log, "RegionPersister running in v3 mode"); - page_storage = std::make_unique( // - "RegionPersister", - delegator, - config, - provider); - page_storage->restore(); - } - else + switch (run_mode) + { + case PageStorageRunMode::ONLY_V2: { // If there is no PageFile with basic version binary format, use version 2 of PageStorage. auto detect_binary_version = DB::PS::V2::PageStorage::getMaxDataVersion(provider, delegator); @@ -193,13 +194,14 @@ RegionMap RegionPersister::restore(const TiFlashRaftProxyHelper * proxy_helper, mergeConfigFromSettings(global_context.getSettingsRef(), config); config.num_write_slots = 4; // extend write slots to 4 at least - LOG_FMT_INFO(log, "RegionPersister running in v2 mode"); - page_storage = std::make_unique( + auto page_storage_v2 = std::make_shared( "RegionPersister", delegator, config, provider); - page_storage->restore(); + page_storage_v2->restore(); + page_writer = std::make_shared(run_mode, page_storage_v2, /*storage_v3_*/ nullptr); + page_reader = std::make_shared(run_mode, ns_id, page_storage_v2, /*storage_v3_*/ nullptr, /*readlimiter*/ global_context.getReadLimiter()); } else { @@ -211,20 +213,79 @@ RegionMap RegionPersister::restore(const TiFlashRaftProxyHelper * proxy_helper, c, provider); } + break; } + case PageStorageRunMode::ONLY_V3: + { + mergeConfigFromSettings(global_context.getSettingsRef(), config); + + auto page_storage_v3 = std::make_shared( // + "RegionPersister", + path_pool.getPSDiskDelegatorGlobalMulti("kvstore"), + 
config, + provider); + page_storage_v3->restore(); + page_writer = std::make_shared(run_mode, /*storage_v2_*/ nullptr, page_storage_v3); + page_reader = std::make_shared(run_mode, ns_id, /*storage_v2_*/ nullptr, page_storage_v3, global_context.getReadLimiter()); + break; + } + case PageStorageRunMode::MIX_MODE: + { + auto page_storage_v2 = std::make_shared( + "RegionPersister", + delegator, + config, + provider); + // V3 should not used getPSDiskDelegatorRaft + // Because V2 will delete all invalid(unrecognized) file when it restore + auto page_storage_v3 = std::make_shared( // + "RegionPersister", + path_pool.getPSDiskDelegatorGlobalMulti("kvstore"), + config, + provider); + + page_storage_v2->restore(); + page_storage_v3->restore(); + + if (page_storage_v2->getNumberOfPages() == 0) + { + page_storage_v2 = nullptr; + run_mode = PageStorageRunMode::ONLY_V3; + page_writer = std::make_shared(run_mode, /*storage_v2_*/ nullptr, page_storage_v3); + page_reader = std::make_shared(run_mode, ns_id, /*storage_v2_*/ nullptr, page_storage_v3, global_context.getReadLimiter()); + } + else + { + page_writer = std::make_shared(run_mode, page_storage_v2, page_storage_v3); + page_reader = std::make_shared(run_mode, ns_id, page_storage_v2, page_storage_v3, global_context.getReadLimiter()); + } + break; + } + } + + LOG_FMT_INFO(log, "RegionPersister running. Current Run Mode is {}", static_cast(run_mode)); } RegionMap regions; - if (page_storage) + if (page_reader) { auto acceptor = [&](const DB::Page & page) { + // We will traverse the pages in V3 before traverse the pages in V2 When we used MIX MODE + // If we found the page_id has been restored, just skip it. 
+ if (const auto it = regions.find(page.page_id); it != regions.end()) + { + LOG_FMT_INFO(log, "Already exist [page_id={}], skip it.", page.page_id); + return; + } + ReadBufferFromMemory buf(page.data.begin(), page.data.size()); auto region = Region::deserialize(buf, proxy_helper); if (page.page_id != region->id()) throw Exception("region id and page id not match!", ErrorCodes::LOGICAL_ERROR); + regions.emplace(page.page_id, region); }; - page_storage->traverse(acceptor, nullptr); + page_reader->traverse(acceptor); } else { @@ -243,11 +304,11 @@ RegionMap RegionPersister::restore(const TiFlashRaftProxyHelper * proxy_helper, bool RegionPersister::gc() { - if (page_storage) + if (page_writer) { PageStorage::Config config = getConfigFromSettings(global_context.getSettingsRef()); - page_storage->reloadSettings(config); - return page_storage->gc(false, nullptr, nullptr); + page_writer->reloadSettings(config); + return page_writer->gc(false, nullptr, nullptr); } else return stable_page_storage->gc(); diff --git a/dbms/src/Storages/Transaction/RegionPersister.h b/dbms/src/Storages/Transaction/RegionPersister.h index 9341ded6f76..a4f611cbdbb 100644 --- a/dbms/src/Storages/Transaction/RegionPersister.h +++ b/dbms/src/Storages/Transaction/RegionPersister.h @@ -14,11 +14,11 @@ #pragma once +#include #include #include #include #include -#include namespace DB { @@ -55,6 +55,8 @@ class RegionPersister final : private boost::noncopyable using RegionCacheWriteElement = std::tuple; static void computeRegionWriteBuffer(const Region & region, RegionCacheWriteElement & region_write_buffer); + PageStorage::Config getPageStorageSettings() const; + #ifndef DBMS_PUBLIC_GTEST private: #endif @@ -67,12 +69,14 @@ class RegionPersister final : private boost::noncopyable #endif Context & global_context; - PageStoragePtr page_storage; + PageWriterPtr page_writer; + PageReaderPtr page_reader; + std::shared_ptr stable_page_storage; NamespaceId ns_id = KVSTORE_NAMESPACE_ID; const RegionManager & 
region_manager; std::mutex mutex; - Poco::Logger * log; + LoggerPtr log; }; } // namespace DB diff --git a/dbms/src/Storages/Transaction/tests/gtest_region_persister.cpp b/dbms/src/Storages/Transaction/tests/gtest_region_persister.cpp index 55813edcbeb..16a35f42da1 100644 --- a/dbms/src/Storages/Transaction/tests/gtest_region_persister.cpp +++ b/dbms/src/Storages/Transaction/tests/gtest_region_persister.cpp @@ -309,7 +309,8 @@ try // Force to run in compatible mode FailPointHelper::enableFailPoint(FailPoints::force_enable_region_persister_compatible_mode); persister.restore(nullptr, config); - ASSERT_EQ(persister.page_storage, nullptr); + ASSERT_EQ(persister.page_writer, nullptr); + ASSERT_EQ(persister.page_reader, nullptr); ASSERT_NE(persister.stable_page_storage, nullptr); for (size_t i = 0; i < region_num; ++i) @@ -330,7 +331,8 @@ try RegionPersister persister(ctx, region_manager); // restore normally, should run in compatible mode. RegionMap new_regions = persister.restore(nullptr, config); - ASSERT_EQ(persister.page_storage, nullptr); + ASSERT_EQ(persister.page_writer, nullptr); + ASSERT_EQ(persister.page_reader, nullptr); ASSERT_NE(persister.stable_page_storage, nullptr); // Try to read for (size_t i = 0; i < region_num; ++i) @@ -349,7 +351,8 @@ try // Force to run in normal mode FailPointHelper::enableFailPoint(FailPoints::force_disable_region_persister_compatible_mode); RegionMap new_regions = persister.restore(nullptr, config); - ASSERT_NE(persister.page_storage, nullptr); + ASSERT_NE(persister.page_writer, nullptr); + ASSERT_NE(persister.page_reader, nullptr); ASSERT_EQ(persister.stable_page_storage, nullptr); // Try to read for (size_t i = 0; i < region_num; ++i) @@ -379,7 +382,8 @@ try RegionPersister persister(ctx, region_manager); // Restore normally, should run in normal mode. 
RegionMap new_regions = persister.restore(nullptr, config); - ASSERT_NE(persister.page_storage, nullptr); + ASSERT_NE(persister.page_writer, nullptr); + ASSERT_NE(persister.page_reader, nullptr); ASSERT_EQ(persister.stable_page_storage, nullptr); // Try to read for (size_t i = 0; i < region_num + region_num_under_nromal_mode; ++i) diff --git a/dbms/src/TestUtils/TiFlashTestEnv.cpp b/dbms/src/TestUtils/TiFlashTestEnv.cpp index 870db4217c4..8ec391c03d4 100644 --- a/dbms/src/TestUtils/TiFlashTestEnv.cpp +++ b/dbms/src/TestUtils/TiFlashTestEnv.cpp @@ -20,6 +20,7 @@ #include #include #include +#include #include #include @@ -66,6 +67,11 @@ void TiFlashTestEnv::initializeGlobalContext(Strings testdata_path, bool enable_ true, global_context->getPathCapacity(), global_context->getFileProvider()); + + global_context->setPageStorageRunMode(enable_ps_v3 ? PageStorageRunMode::ONLY_V3 : PageStorageRunMode::ONLY_V2); + global_context->initializeGlobalStoragePoolIfNeed(global_context->getPathPool()); + LOG_FMT_INFO(Logger::get("TiFlashTestEnv"), "Storage mode : {}", static_cast(global_context->getPageStorageRunMode())); + TiFlashRaftConfig raft_config; raft_config.ignore_databases = {"default", "system"}; @@ -73,9 +79,6 @@ void TiFlashTestEnv::initializeGlobalContext(Strings testdata_path, bool enable_ raft_config.disable_bg_flush = true; global_context->createTMTContext(raft_config, pingcap::ClusterConfig()); - if (global_context->initializeGlobalStoragePoolIfNeed(global_context->getPathPool(), enable_ps_v3)) - LOG_FMT_INFO(&Poco::Logger::get("TiFlashTestEnv"), "PageStorage V3 enabled."); - global_context->setDeltaIndexManager(1024 * 1024 * 100 /*100MB*/); global_context->getTMTContext().restore();