diff --git a/dbms/src/Common/ErrorCodes.cpp b/dbms/src/Common/ErrorCodes.cpp index 40c14539644..4c3c2532c6a 100644 --- a/dbms/src/Common/ErrorCodes.cpp +++ b/dbms/src/Common/ErrorCodes.cpp @@ -424,6 +424,7 @@ extern const int DEADLOCK_AVOIDED = 10013; extern const int PTHREAD_ERROR = 10014; extern const int PS_ENTRY_NOT_EXISTS = 10015; extern const int PS_ENTRY_NO_VALID_VERSION = 10016; +extern const int PS_DIR_APPLY_INVALID_STATUS = 10017; } // namespace ErrorCodes } // namespace DB diff --git a/dbms/src/Interpreters/Context.cpp b/dbms/src/Interpreters/Context.cpp index 1c74b5f402e..26e950d7798 100644 --- a/dbms/src/Interpreters/Context.cpp +++ b/dbms/src/Interpreters/Context.cpp @@ -1650,8 +1650,9 @@ bool Context::initializeGlobalStoragePoolIfNeed(const PathPool & path_pool) auto lock = getLock(); if (shared->global_storage_pool) { - // GlobalStoragePool may be initialized many times in some test cases for restore. - LOG_WARNING(shared->log, "GlobalStoragePool has already been initialized."); + // Can't init GlobalStoragePool twice. + // otherwise the pagestorage instances in `StoragePool` for each table won't be updated and cause unexpected problem. + throw Exception("GlobalStoragePool has already been initialized.", ErrorCodes::LOGICAL_ERROR); } CurrentMetrics::set(CurrentMetrics::GlobalStorageRunMode, static_cast(shared->storage_run_mode)); if (shared->storage_run_mode == PageStorageRunMode::MIX_MODE || shared->storage_run_mode == PageStorageRunMode::ONLY_V3) diff --git a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp index 2d53f85f516..f20239638b3 100644 --- a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp +++ b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp @@ -243,7 +243,8 @@ DeltaMergeStore::DeltaMergeStore(Context & db_context, try { page_storage_run_mode = storage_pool->restore(); // restore from disk - if (!storage_pool->maxMetaPageId()) + if (const auto first_segment_entry = storage_pool->metaReader()->getPageEntry(DELTA_MERGE_FIRST_SEGMENT_ID); + !first_segment_entry.isValid()) { // Create the first segment. auto segment_id = storage_pool->newMetaPageId(); diff --git a/dbms/src/Storages/DeltaMerge/StoragePool.cpp b/dbms/src/Storages/DeltaMerge/StoragePool.cpp index 70e4b08ea14..b94cc3c1735 100644 --- a/dbms/src/Storages/DeltaMerge/StoragePool.cpp +++ b/dbms/src/Storages/DeltaMerge/StoragePool.cpp @@ -82,26 +82,21 @@ PageStorage::Config extractConfig(const Settings & settings, StorageType subtype } GlobalStoragePool::GlobalStoragePool(const PathPool & path_pool, Context & global_ctx, const Settings & settings) - : // The iops and bandwidth in log_storage are relatively high, use multi-disks if possible - log_storage(PageStorage::create("__global__.log", - path_pool.getPSDiskDelegatorGlobalMulti("log"), - extractConfig(settings, StorageType::Log), - global_ctx.getFileProvider(), - true)) - , - // The iops in data_storage is low, only use the first disk for storing data - data_storage(PageStorage::create("__global__.data", - path_pool.getPSDiskDelegatorGlobalSingle("data"), - extractConfig(settings, StorageType::Data), - global_ctx.getFileProvider(), - true)) - , - // The iops in meta_storage is relatively high, use multi-disks if possible - meta_storage(PageStorage::create("__global__.meta", - path_pool.getPSDiskDelegatorGlobalMulti("meta"), - extractConfig(settings, StorageType::Meta), - global_ctx.getFileProvider(), - true)) + : log_storage(PageStorage::create("__global__.log", + path_pool.getPSDiskDelegatorGlobalMulti("log"), + extractConfig(settings, StorageType::Log), + global_ctx.getFileProvider(), + true)) + , data_storage(PageStorage::create("__global__.data", + path_pool.getPSDiskDelegatorGlobalMulti("data"), + extractConfig(settings, StorageType::Data), + global_ctx.getFileProvider(), + true)) + , meta_storage(PageStorage::create("__global__.meta", + path_pool.getPSDiskDelegatorGlobalMulti("meta"), + extractConfig(settings, StorageType::Meta), + global_ctx.getFileProvider(), + true)) , global_context(global_ctx) { } @@ -118,9 +113,9 @@ GlobalStoragePool::~GlobalStoragePool() void GlobalStoragePool::restore() { - log_max_ids = log_storage->restore(); - data_max_ids = data_storage->restore(); - meta_max_ids = meta_storage->restore(); + log_storage->restore(); + data_storage->restore(); + meta_storage->restore(); gc_handle = global_context.getBackgroundPool().addTask( [this] { @@ -181,7 +176,7 @@ StoragePool::StoragePool(Context & global_ctx, NamespaceId ns_id_, StoragePathPo extractConfig(global_context.getSettingsRef(), StorageType::Log), global_context.getFileProvider()); data_storage_v2 = PageStorage::create(name + ".data", - storage_path_pool.getPSDiskDelegatorSingle("data"), + storage_path_pool.getPSDiskDelegatorSingle("data"), // keep for behavior not changed extractConfig(global_context.getSettingsRef(), StorageType::Data), global_ctx.getFileProvider()); meta_storage_v2 = PageStorage::create(name + ".meta", @@ -295,41 +290,35 @@ void StoragePool::forceTransformMetaV2toV3() PageStorageRunMode StoragePool::restore() { - const auto & global_storage_pool = global_context.getGlobalStoragePool(); - switch (run_mode) { case PageStorageRunMode::ONLY_V2: { - auto log_max_ids = log_storage_v2->restore(); - auto data_max_ids = data_storage_v2->restore(); - auto meta_max_ids = meta_storage_v2->restore(); + log_storage_v2->restore(); + data_storage_v2->restore(); + meta_storage_v2->restore(); - assert(log_max_ids.size() == 1); - assert(data_max_ids.size() == 1); - assert(meta_max_ids.size() == 1); - - max_log_page_id = log_max_ids[0]; - max_data_page_id = data_max_ids[0]; - max_meta_page_id = meta_max_ids[0]; + max_log_page_id = log_storage_v2->getMaxId(ns_id); + max_data_page_id = data_storage_v2->getMaxId(ns_id); + max_meta_page_id = meta_storage_v2->getMaxId(ns_id); storage_pool_metrics = CurrentMetrics::Increment{CurrentMetrics::StoragePoolV2Only}; break; } case PageStorageRunMode::ONLY_V3: { - max_log_page_id = global_storage_pool->getLogMaxId(ns_id); - max_data_page_id = global_storage_pool->getDataMaxId(ns_id); - max_meta_page_id = global_storage_pool->getMetaMaxId(ns_id); + max_log_page_id = log_storage_v3->getMaxId(ns_id); + max_data_page_id = data_storage_v3->getMaxId(ns_id); + max_meta_page_id = meta_storage_v3->getMaxId(ns_id); storage_pool_metrics = CurrentMetrics::Increment{CurrentMetrics::StoragePoolV3Only}; break; } case PageStorageRunMode::MIX_MODE: { - auto v2_log_max_ids = log_storage_v2->restore(); - auto v2_data_max_ids = data_storage_v2->restore(); - auto v2_meta_max_ids = meta_storage_v2->restore(); + log_storage_v2->restore(); + data_storage_v2->restore(); + meta_storage_v2->restore(); // The pages on data and log can be rewritten to V3 and the old pages on V2 are deleted by `delta merge`. // However, the pages on meta V2 can not be deleted. As the pages in meta are small, we perform a forceTransformMetaV2toV3 to convert pages before all. @@ -349,10 +338,6 @@ PageStorageRunMode StoragePool::restore() LOG_FMT_INFO(logger, "Current meta translate already done before restored.[ns_id={}] ", ns_id); } - assert(v2_log_max_ids.size() == 1); - assert(v2_data_max_ids.size() == 1); - assert(v2_meta_max_ids.size() == 1); - // Check number of valid pages in v2 // If V2 already have no any data in disk, Then change run_mode to ONLY_V3 if (log_storage_v2->getNumberOfPages() == 0 && data_storage_v2->getNumberOfPages() == 0 && meta_storage_v2->getNumberOfPages() == 0) @@ -375,18 +360,18 @@ PageStorageRunMode StoragePool::restore() data_storage_writer = std::make_shared(PageStorageRunMode::ONLY_V3, /*storage_v2_*/ nullptr, data_storage_v3); meta_storage_writer = std::make_shared(PageStorageRunMode::ONLY_V3, /*storage_v2_*/ nullptr, meta_storage_v3); - max_log_page_id = global_storage_pool->getLogMaxId(ns_id); - max_data_page_id = global_storage_pool->getDataMaxId(ns_id); - max_meta_page_id = global_storage_pool->getMetaMaxId(ns_id); + max_log_page_id = log_storage_v3->getMaxId(ns_id); + max_data_page_id = data_storage_v3->getMaxId(ns_id); + max_meta_page_id = meta_storage_v3->getMaxId(ns_id); run_mode = PageStorageRunMode::ONLY_V3; storage_pool_metrics = CurrentMetrics::Increment{CurrentMetrics::StoragePoolV3Only}; } else // Still running Mix Mode { - max_log_page_id = std::max(v2_log_max_ids[0], global_storage_pool->getLogMaxId(ns_id)); - max_data_page_id = std::max(v2_data_max_ids[0], global_storage_pool->getDataMaxId(ns_id)); - max_meta_page_id = std::max(v2_meta_max_ids[0], global_storage_pool->getMetaMaxId(ns_id)); + max_log_page_id = std::max(log_storage_v2->getMaxId(ns_id), log_storage_v3->getMaxId(ns_id)); + max_data_page_id = std::max(data_storage_v2->getMaxId(ns_id), data_storage_v3->getMaxId(ns_id)); + max_meta_page_id = std::max(meta_storage_v2->getMaxId(ns_id), meta_storage_v3->getMaxId(ns_id)); storage_pool_metrics = CurrentMetrics::Increment{CurrentMetrics::StoragePoolMixMode}; } break; diff --git a/dbms/src/Storages/DeltaMerge/StoragePool.h b/dbms/src/Storages/DeltaMerge/StoragePool.h index 2e3b3f563f5..f106ac725e4 100644 --- a/dbms/src/Storages/DeltaMerge/StoragePool.h +++ b/dbms/src/Storages/DeltaMerge/StoragePool.h @@ -51,39 +51,6 @@ class GlobalStoragePool : private boost::noncopyable friend class StoragePool; - PageId getLogMaxId(NamespaceId ns_id) const - { - PageId max_log_page_id = 0; - if (const auto & it = log_max_ids.find(ns_id); it != log_max_ids.end()) - { - max_log_page_id = it->second; - } - - return max_log_page_id; - } - - PageId getDataMaxId(NamespaceId ns_id) const - { - PageId max_data_page_id = 0; - if (const auto & it = data_max_ids.find(ns_id); it != data_max_ids.end()) - { - max_data_page_id = it->second; - } - - return max_data_page_id; - } - - PageId getMetaMaxId(NamespaceId ns_id) const - { - PageId max_meta_page_id = 0; - if (const auto & it = meta_max_ids.find(ns_id); it != meta_max_ids.end()) - { - max_meta_page_id = it->second; - } - - return max_meta_page_id; - } - // GC immediately // Only used on dbgFuncMisc bool gc(); @@ -96,10 +63,6 @@ class GlobalStoragePool : private boost::noncopyable PageStoragePtr data_storage; PageStoragePtr meta_storage; - std::map log_max_ids; - std::map data_max_ids; - std::map meta_max_ids; - std::atomic last_try_gc_time = Clock::now(); Context & global_context; diff --git a/dbms/src/Storages/Page/PageStorage.h b/dbms/src/Storages/Page/PageStorage.h index 5fc69c29364..06f3be5d1f7 100644 --- a/dbms/src/Storages/Page/PageStorage.h +++ b/dbms/src/Storages/Page/PageStorage.h @@ -229,16 +229,12 @@ class PageStorage : private boost::noncopyable virtual ~PageStorage() = default; - // Return the map[ns_id, max_page_id] - // The caller should ensure that it only allocate new id that is larger than `max_page_id`. Reusing the - // same ID for different kind of write (put/ref/put_external) would make PageStorage run into unexpected error. - // - // Note that for V2, we always return a map with only one element: cause V2 have no - // idea about ns_id. - virtual std::map restore() = 0; + virtual void restore() = 0; virtual void drop() = 0; + virtual PageId getMaxId(NamespaceId ns_id) = 0; + virtual SnapshotPtr getSnapshot(const String & tracing_id) = 0; // Get some statistics of all living snapshots and the oldest living snapshot. diff --git a/dbms/src/Storages/Page/V2/PageStorage.cpp b/dbms/src/Storages/Page/V2/PageStorage.cpp index 0452d0cc8ae..3ab62d55242 100644 --- a/dbms/src/Storages/Page/V2/PageStorage.cpp +++ b/dbms/src/Storages/Page/V2/PageStorage.cpp @@ -196,7 +196,7 @@ toConcreteSnapshot(const DB::PageStorage::SnapshotPtr & ptr) return assert_cast(ptr.get()); } -std::map PageStorage::restore() +void PageStorage::restore() { LOG_FMT_INFO(log, "{} begin to restore data from disk. [path={}] [num_writers={}]", storage_name, delegator->defaultPath(), write_files.size()); @@ -353,9 +353,12 @@ std::map PageStorage::restore() auto snapshot = getConcreteSnapshot(); size_t num_pages = snapshot->version()->numPages(); LOG_FMT_INFO(log, "{} restore {} pages, write batch sequence: {}, {}", storage_name, num_pages, write_batch_seq, statistics.toString()); +} - // Fixed namespace id 0 - return {{0, snapshot->version()->maxId()}}; +PageId PageStorage::getMaxId(NamespaceId /*ns_id*/) +{ + std::lock_guard write_lock(write_mutex); + return versioned_page_entries.getSnapshot("")->version()->maxId(); } PageId PageStorage::getNormalPageIdImpl(NamespaceId /*ns_id*/, PageId page_id, SnapshotPtr snapshot, bool throw_on_not_exist) diff --git a/dbms/src/Storages/Page/V2/PageStorage.h b/dbms/src/Storages/Page/V2/PageStorage.h index 6276c5e5086..cb55a769f37 100644 --- a/dbms/src/Storages/Page/V2/PageStorage.h +++ b/dbms/src/Storages/Page/V2/PageStorage.h @@ -91,10 +91,12 @@ class PageStorage : public DB::PageStorage const FileProviderPtr & file_provider_); ~PageStorage() = default; - std::map restore() override; + void restore() override; void drop() override; + PageId getMaxId(NamespaceId ns_id) override; + PageId getNormalPageIdImpl(NamespaceId ns_id, PageId page_id, SnapshotPtr snapshot, bool throw_on_not_exist) override; DB::PageStorage::SnapshotPtr getSnapshot(const String & tracing_id) override; diff --git a/dbms/src/Storages/Page/V2/tests/gtest_page_storage.cpp b/dbms/src/Storages/Page/V2/tests/gtest_page_storage.cpp index 5bbd319192b..fc429dde0ac 100644 --- a/dbms/src/Storages/Page/V2/tests/gtest_page_storage.cpp +++ b/dbms/src/Storages/Page/V2/tests/gtest_page_storage.cpp @@ -79,15 +79,6 @@ class PageStorage_test : public DB::base::TiFlashStorageTestBasic return storage; } - std::pair, std::map> reopen() - { - auto delegator = path_pool->getPSDiskDelegatorSingle("log"); - auto storage = std::make_shared("test.t", delegator, config, file_provider); - auto max_ids = storage->restore(); - return {storage, max_ids}; - } - - protected: PageStorage::Config config; std::shared_ptr storage; @@ -736,25 +727,6 @@ try } CATCH -TEST_F(PageStorage_test, getMaxIdsFromRestore) -try -{ - { - WriteBatch batch; - batch.putExternal(1, 0); - batch.putExternal(2, 0); - batch.delPage(1); - batch.delPage(2); - storage->write(std::move(batch)); - } - - storage = nullptr; - auto [page_storage, max_ids] = reopen(); - ASSERT_EQ(max_ids.size(), 1); - ASSERT_EQ(max_ids[0], 2); -} -CATCH - TEST_F(PageStorage_test, IgnoreIncompleteWriteBatch1) try { diff --git a/dbms/src/Storages/Page/V3/BlobStore.h b/dbms/src/Storages/Page/V3/BlobStore.h index b5cf5a677f1..e527eb0f3bf 100644 --- a/dbms/src/Storages/Page/V3/BlobStore.h +++ b/dbms/src/Storages/Page/V3/BlobStore.h @@ -34,7 +34,7 @@ extern const int LOGICAL_ERROR; namespace PS::V3 { -using PageIdAndVersionedEntries = std::vector>; +using PageIdAndVersionedEntries = std::vector>; class BlobStore : private Allocator { diff --git a/dbms/src/Storages/Page/V3/PageDirectory.cpp b/dbms/src/Storages/Page/V3/PageDirectory.cpp index 35e73e5c5ff..06b26156529 100644 --- a/dbms/src/Storages/Page/V3/PageDirectory.cpp +++ b/dbms/src/Storages/Page/V3/PageDirectory.cpp @@ -57,6 +57,7 @@ namespace ErrorCodes extern const int NOT_IMPLEMENTED; extern const int PS_ENTRY_NOT_EXISTS; extern const int PS_ENTRY_NO_VALID_VERSION; +extern const int PS_DIR_APPLY_INVALID_STATUS; } // namespace ErrorCodes namespace PS::V3 @@ -65,7 +66,7 @@ namespace PS::V3 * VersionedPageEntries methods * ********************************/ -void VersionedPageEntries::createNewEntry(const PageVersionType & ver, const PageEntryV3 & entry) +void VersionedPageEntries::createNewEntry(const PageVersion & ver, const PageEntryV3 & entry) { auto page_lock = acquireLock(); if (type == EditRecordType::VAR_DELETE) @@ -77,7 +78,7 @@ void VersionedPageEntries::createNewEntry(const PageVersionType & ver, const Pag if (type == EditRecordType::VAR_ENTRY) { - auto last_iter = MapUtils::findLess(entries, PageVersionType(ver.sequence + 1, 0)); + auto last_iter = MapUtils::findLess(entries, PageVersion(ver.sequence + 1, 0)); if (last_iter == entries.end()) { entries.emplace(ver, EntryOrDelete::newNormalEntry(entry)); @@ -93,11 +94,12 @@ void VersionedPageEntries::createNewEntry(const PageVersionType & ver, const Pag // to replace the entry with newer sequence. if (unlikely(last_iter->second.being_ref_count != 1 && last_iter->first.sequence < ver.sequence)) { - throw Exception(fmt::format( - "Try to replace normal entry with an newer seq [ver={}] [prev_ver={}] [last_entry={}]", - ver, - last_iter->first, - last_iter->second.toDebugString())); + throw Exception( + fmt::format("Try to replace normal entry with an newer seq [ver={}] [prev_ver={}] [last_entry={}]", + ver, + last_iter->first, + last_iter->second.toDebugString()), + ErrorCodes::LOGICAL_ERROR); } // create a new version that inherit the `being_ref_count` of the last entry entries.emplace(ver, EntryOrDelete::newRepalcingEntry(last_iter->second, entry)); @@ -105,18 +107,19 @@ void VersionedPageEntries::createNewEntry(const PageVersionType & ver, const Pag return; } - throw Exception(fmt::format( - "try to create entry version with invalid state " - "[ver={}] [entry={}] [state={}]", - ver, - ::DB::PS::V3::toDebugString(entry), - toDebugString())); + throw Exception( + fmt::format("try to create entry version with invalid state " + "[ver={}] [entry={}] [state={}]", + ver, + ::DB::PS::V3::toDebugString(entry), + toDebugString()), + ErrorCodes::PS_DIR_APPLY_INVALID_STATUS); } // Create a new external version with version=`ver`. // If create success, then return a shared_ptr as a holder for page_id. The holder // will be release when this external version is totally removed. -std::shared_ptr VersionedPageEntries::createNewExternal(const PageVersionType & ver) +std::shared_ptr VersionedPageEntries::createNewExternal(const PageVersion & ver) { auto page_lock = acquireLock(); if (type == EditRecordType::VAR_DELETE) @@ -124,7 +127,7 @@ std::shared_ptr VersionedPageEntries::createNewExternal(const type = EditRecordType::VAR_EXTERNAL; is_deleted = false; create_ver = ver; - delete_ver = PageVersionType(0); + delete_ver = PageVersion(0); being_ref_count = 1; // return the new created holder to caller to set the page_id external_holder = std::make_shared(0, 0); @@ -140,7 +143,7 @@ std::shared_ptr VersionedPageEntries::createNewExternal(const { is_deleted = false; create_ver = ver; - delete_ver = PageVersionType(0); + delete_ver = PageVersion(0); being_ref_count = 1; // return the new created holder to caller to set the page_id external_holder = std::make_shared(0, 0); @@ -160,15 +163,16 @@ std::shared_ptr VersionedPageEntries::createNewExternal(const } } - throw Exception(fmt::format( - "try to create external version with invalid state " - "[ver={}] [state={}]", - ver, - toDebugString())); + throw Exception( + fmt::format("try to create external version with invalid state " + "[ver={}] [state={}]", + ver, + toDebugString()), + ErrorCodes::PS_DIR_APPLY_INVALID_STATUS); } // Create a new delete version with version=`ver`. -void VersionedPageEntries::createDelete(const PageVersionType & ver) +void VersionedPageEntries::createDelete(const PageVersion & ver) { auto page_lock = acquireLock(); if (type == EditRecordType::VAR_ENTRY) @@ -203,7 +207,7 @@ void VersionedPageEntries::createDelete(const PageVersionType & ver) // Create a new reference version with version=`ver` and `ori_page_id_`. // If create success, then return true, otherwise return false. -bool VersionedPageEntries::createNewRef(const PageVersionType & ver, PageIdV3Internal ori_page_id_) +bool VersionedPageEntries::createNewRef(const PageVersion & ver, PageIdV3Internal ori_page_id_) { auto page_lock = acquireLock(); if (type == EditRecordType::VAR_DELETE) @@ -225,7 +229,7 @@ bool VersionedPageEntries::createNewRef(const PageVersionType & ver, PageIdV3Int ori_page_id = ori_page_id_; create_ver = ver; is_deleted = false; - delete_ver = PageVersionType(0); + delete_ver = PageVersion(0); return true; } else if (ori_page_id == ori_page_id_) @@ -248,11 +252,12 @@ bool VersionedPageEntries::createNewRef(const PageVersionType & ver, PageIdV3Int // adding ref to replace put/external is not allowed throw Exception(fmt::format( - "try to create ref version with invalid state " - "[ver={}] [ori_page_id={}] [state={}]", - ver, - ori_page_id_, - toDebugString())); + "try to create ref version with invalid state " + "[ver={}] [ori_page_id={}] [state={}]", + ver, + ori_page_id_, + toDebugString()), + ErrorCodes::PS_DIR_APPLY_INVALID_STATUS); } std::shared_ptr VersionedPageEntries::fromRestored(const PageEntriesEdit::EditRecord & rec) @@ -287,14 +292,14 @@ std::shared_ptr VersionedPageEntries::fromRestored(const PageE } } -std::tuple +std::tuple VersionedPageEntries::resolveToPageId(UInt64 seq, bool check_prev, PageEntryV3 * entry) { auto page_lock = acquireLock(); if (type == EditRecordType::VAR_ENTRY) { // entries are sorted by , find the first one less than - if (auto iter = MapUtils::findLess(entries, PageVersionType(seq + 1)); + if (auto iter = MapUtils::findLess(entries, PageVersion(seq + 1)); iter != entries.end()) { // If we applied write batches like this: [ver=1]{put 10}, [ver=2]{ref 11->10, del 10} @@ -304,7 +309,7 @@ VersionedPageEntries::resolveToPageId(UInt64 seq, bool check_prev, PageEntryV3 * { if (iter == entries.begin()) { - return {RESOLVE_FAIL, buildV3Id(0, 0), PageVersionType(0)}; + return {RESOLVE_FAIL, buildV3Id(0, 0), PageVersion(0)}; } --iter; // fallover the check the prev item @@ -314,7 +319,7 @@ VersionedPageEntries::resolveToPageId(UInt64 seq, bool check_prev, PageEntryV3 * { if (entry != nullptr) *entry = iter->second.entry; - return {RESOLVE_TO_NORMAL, buildV3Id(0, 0), PageVersionType(0)}; + return {RESOLVE_TO_NORMAL, buildV3Id(0, 0), PageVersion(0)}; } // fallover to FAIL } } @@ -324,7 +329,7 @@ VersionedPageEntries::resolveToPageId(UInt64 seq, bool check_prev, PageEntryV3 * bool ok = check_prev ? true : (!is_deleted || seq < delete_ver.sequence); if (create_ver.sequence <= seq && ok) { - return {RESOLVE_TO_NORMAL, buildV3Id(0, 0), PageVersionType(0)}; + return {RESOLVE_TO_NORMAL, buildV3Id(0, 0), PageVersion(0)}; } } else if (type == EditRecordType::VAR_REF) @@ -339,7 +344,7 @@ VersionedPageEntries::resolveToPageId(UInt64 seq, bool check_prev, PageEntryV3 * LOG_FMT_WARNING(&Poco::Logger::get("VersionedPageEntries"), "Can't reslove the EditRecordType {}", type); } - return {RESOLVE_FAIL, buildV3Id(0, 0), PageVersionType(0)}; + return {RESOLVE_FAIL, buildV3Id(0, 0), PageVersion(0)}; } std::optional VersionedPageEntries::getEntry(UInt64 seq) const @@ -348,10 +353,10 @@ std::optional VersionedPageEntries::getEntry(UInt64 seq) const if (type == EditRecordType::VAR_ENTRY) { // entries are sorted by , find the first one less than - if (auto iter = MapUtils::findLess(entries, PageVersionType(seq + 1)); + if (auto iter = MapUtils::findLess(entries, PageVersion(seq + 1)); iter != entries.end()) { - // NORMAL + // not deleted if (iter->second.isEntry()) return iter->second.entry; } @@ -375,12 +380,48 @@ std::optional VersionedPageEntries::getLastEntry() const return std::nullopt; } -Int64 VersionedPageEntries::incrRefCount(const PageVersionType & ver) +// Returns true when **this id** is "visible" by `seq`. +// If this page id is marked as deleted or not created, it is "not visible". +// Note that not visible does not means this id can be GC. +bool VersionedPageEntries::isVisible(UInt64 seq) const +{ + auto page_lock = acquireLock(); + if (type == EditRecordType::VAR_DELETE) + { + return false; + } + else if (type == EditRecordType::VAR_ENTRY) + { + // entries are sorted by , find the first one less than + if (auto iter = MapUtils::findLess(entries, PageVersion(seq + 1)); + iter != entries.end()) + { + // not deleted + return iter->second.isEntry(); + } + // else there are no valid entry less than seq + return false; + } + else if (type == EditRecordType::VAR_EXTERNAL || type == EditRecordType::VAR_REF) + { + // `delete_ver` is only valid when `is_deleted == true` + return create_ver.sequence <= seq && !(is_deleted && delete_ver.sequence <= seq); + } + + throw Exception(fmt::format( + "calling isDeleted with invalid state " + "[seq={}] [state={}]", + seq, + toDebugString()), + ErrorCodes::LOGICAL_ERROR); +} + +Int64 VersionedPageEntries::incrRefCount(const PageVersion & ver) { auto page_lock = acquireLock(); if (type == EditRecordType::VAR_ENTRY) { - if (auto iter = MapUtils::findMutLess(entries, PageVersionType(ver.sequence + 1)); + if (auto iter = MapUtils::findMutLess(entries, PageVersion(ver.sequence + 1)); iter != entries.end()) { if (iter->second.isEntry()) @@ -435,7 +476,7 @@ PageSize VersionedPageEntries::getEntriesByBlobIds( bool VersionedPageEntries::cleanOutdatedEntries( UInt64 lowest_seq, - std::map> * normal_entries_to_deref, + std::map> * normal_entries_to_deref, PageEntriesV3 & entries_removed, const PageLock & /*page_lock*/) { @@ -474,7 +515,7 @@ bool VersionedPageEntries::cleanOutdatedEntries( return true; } - auto iter = MapUtils::findLess(entries, PageVersionType(lowest_seq + 1)); + auto iter = MapUtils::findLess(entries, PageVersion(lowest_seq + 1)); // If we can't find any seq lower than `lowest_seq` then // all version in this list don't need gc. if (iter == entries.begin() || iter == entries.end()) @@ -522,7 +563,7 @@ bool VersionedPageEntries::cleanOutdatedEntries( return entries.empty() || (entries.size() == 1 && entries.begin()->second.isDelete()); } -bool VersionedPageEntries::derefAndClean(UInt64 lowest_seq, PageIdV3Internal page_id, const PageVersionType & deref_ver, const Int64 deref_count, PageEntriesV3 & entries_removed) +bool VersionedPageEntries::derefAndClean(UInt64 lowest_seq, PageIdV3Internal page_id, const PageVersion & deref_ver, const Int64 deref_count, PageEntriesV3 & entries_removed) { auto page_lock = acquireLock(); if (type == EditRecordType::VAR_EXTERNAL) @@ -538,7 +579,7 @@ bool VersionedPageEntries::derefAndClean(UInt64 lowest_seq, PageIdV3Internal pag { // Decrease the ref-counter. The entry may be moved to a newer entry with same sequence but higher epoch, // so we need to find the one less than and decrease the ref-counter of it. - auto iter = MapUtils::findMutLess(entries, PageVersionType(deref_ver.sequence + 1, 0)); + auto iter = MapUtils::findMutLess(entries, PageVersion(deref_ver.sequence + 1, 0)); if (iter == entries.end()) { throw Exception(fmt::format("Can not find entry for decreasing ref count [page_id={}] [ver={}] [deref_count={}]", page_id, deref_ver, deref_count)); @@ -598,7 +639,7 @@ void VersionedPageEntries::collapseTo(const UInt64 seq, const PageIdV3Internal p if (type == EditRecordType::VAR_ENTRY) { // dump the latest entry if it is not a "delete" - auto last_iter = MapUtils::findLess(entries, PageVersionType(seq + 1)); + auto last_iter = MapUtils::findLess(entries, PageVersion(seq + 1)); if (last_iter == entries.end()) return; @@ -709,7 +750,7 @@ PageIDAndEntryV3 PageDirectory::get(PageIdV3Internal page_id, const PageDirector PageEntryV3 entry_got; PageIdV3Internal id_to_resolve = page_id; - PageVersionType ver_to_resolve(snap->sequence, 0); + PageVersion ver_to_resolve(snap->sequence, 0); bool ok = true; while (ok) { @@ -762,8 +803,8 @@ std::pair PageDirectory::get(const PageIdV3Internal PageEntryV3 entry_got; PageIds page_not_found = {}; - const PageVersionType init_ver_to_resolve(snap->sequence, 0); - auto get_one = [&entry_got, init_ver_to_resolve, throw_on_not_exist, this](PageIdV3Internal page_id, PageVersionType ver_to_resolve, size_t idx) { + const PageVersion init_ver_to_resolve(snap->sequence, 0); + auto get_one = [&entry_got, init_ver_to_resolve, throw_on_not_exist, this](PageIdV3Internal page_id, PageVersion ver_to_resolve, size_t idx) { PageIdV3Internal id_to_resolve = page_id; bool ok = true; while (ok) @@ -825,7 +866,7 @@ std::pair PageDirectory::get(const PageIdV3Internal PageIdV3Internal PageDirectory::getNormalPageId(PageIdV3Internal page_id, const PageDirectorySnapshotPtr & snap, bool throw_on_not_exist) const { PageIdV3Internal id_to_resolve = page_id; - PageVersionType ver_to_resolve(snap->sequence, 0); + PageVersion ver_to_resolve(snap->sequence, 0); bool keep_resolve = true; while (keep_resolve) { @@ -899,10 +940,31 @@ PageId PageDirectory::getMaxId(NamespaceId ns_id) const // iter is not at the beginning and mvcc_table_directory is not empty, // so iter-- must be a valid iterator, and it's the largest page id which is smaller than the target page id. iter--; - if (iter->first.high == ns_id) - return iter->first.low; - else - return 0; + + do + { + // Can't find any entries in current ns_id + if (iter->first.high != ns_id) + { + break; + } + + // Check and return whether this id is visible, otherwise continue to check the previous one. + if (iter->second->isVisible(UINT64_MAX - 1)) + { + return iter->first.low; + } + + // Current entry/ref/external is deleted and there are no entries before it. + if (iter == mvcc_table_directory.begin()) + { + break; + } + + iter--; + } while (true); + + return 0; } } @@ -923,17 +985,17 @@ void PageDirectory::applyRefEditRecord( MVCCMapType & mvcc_table_directory, const VersionedPageEntriesPtr & version_list, const PageEntriesEdit::EditRecord & rec, - const PageVersionType & version) + const PageVersion & version) { // applying ref 3->2, existing ref 2->1, normal entry 1, then we should collapse // the ref to be 3->1, increase the refcounting of normale entry 1 - auto [resolve_success, resolved_id, resolved_ver] = [&mvcc_table_directory](PageIdV3Internal id_to_resolve, PageVersionType ver_to_resolve) - -> std::tuple { + auto [resolve_success, resolved_id, resolved_ver] = [&mvcc_table_directory](PageIdV3Internal id_to_resolve, PageVersion ver_to_resolve) + -> std::tuple { while (true) { auto resolve_ver_iter = mvcc_table_directory.find(id_to_resolve); if (resolve_ver_iter == mvcc_table_directory.end()) - return {false, buildV3Id(0, 0), PageVersionType(0)}; + return {false, buildV3Id(0, 0), PageVersion(0)}; const VersionedPageEntriesPtr & resolve_version_list = resolve_ver_iter->second; // If we already hold the lock from `id_to_resolve`, then we should not request it again. @@ -999,7 +1061,7 @@ void PageDirectory::apply(PageEntriesEdit && edit, const WriteLimiterPtr & write // TODO: It is totally serialized, make it a pipeline std::unique_lock write_lock(table_rw_mutex); UInt64 last_sequence = sequence.load(); - PageVersionType new_version(last_sequence + 1, 0); + PageVersion new_version(last_sequence + 1, 0); // stage 1, persisted the changes to WAL with version [seq=last_seq + 1, epoch=0] wal->apply(edit, new_version, write_limiter); @@ -1233,7 +1295,7 @@ PageEntriesV3 PageDirectory::gcInMemEntries() // The page_id that we need to decrease ref count // { id_0: , id_1: <...>, ... } - std::map> normal_entries_to_deref; + std::map> normal_entries_to_deref; // Iterate all page_id and try to clean up useless var entries while (true) { diff --git a/dbms/src/Storages/Page/V3/PageDirectory.h b/dbms/src/Storages/Page/V3/PageDirectory.h index 7f56676e363..635cf04bfe6 100644 --- a/dbms/src/Storages/Page/V3/PageDirectory.h +++ b/dbms/src/Storages/Page/V3/PageDirectory.h @@ -149,13 +149,13 @@ class VersionedPageEntries return std::lock_guard(m); } - void createNewEntry(const PageVersionType & ver, const PageEntryV3 & entry); + void createNewEntry(const PageVersion & ver, const PageEntryV3 & entry); - bool createNewRef(const PageVersionType & ver, PageIdV3Internal ori_page_id); + bool createNewRef(const PageVersion & ver, PageIdV3Internal ori_page_id); - std::shared_ptr createNewExternal(const PageVersionType & ver); + std::shared_ptr createNewExternal(const PageVersion & ver); - void createDelete(const PageVersionType & ver); + void createDelete(const PageVersion & ver); std::shared_ptr fromRestored(const PageEntriesEdit::EditRecord & rec); @@ -165,15 +165,17 @@ class VersionedPageEntries RESOLVE_TO_REF, RESOLVE_TO_NORMAL, }; - std::tuple + std::tuple resolveToPageId(UInt64 seq, bool check_prev, PageEntryV3 * entry); - Int64 incrRefCount(const PageVersionType & ver); + Int64 incrRefCount(const PageVersion & ver); std::optional getEntry(UInt64 seq) const; std::optional getLastEntry() const; + bool isVisible(UInt64 seq) const; + /** * If there are entries point to file in `blob_ids`, take out the and * store them into `blob_versioned_entries`. @@ -220,13 +222,13 @@ class VersionedPageEntries */ bool cleanOutdatedEntries( UInt64 lowest_seq, - std::map> * normal_entries_to_deref, + std::map> * normal_entries_to_deref, PageEntriesV3 & entries_removed, const PageLock & page_lock); bool derefAndClean( UInt64 lowest_seq, PageIdV3Internal page_id, - const PageVersionType & deref_ver, + const PageVersion & deref_ver, Int64 deref_count, PageEntriesV3 & entries_removed); @@ -258,15 +260,21 @@ class VersionedPageEntries private: mutable std::mutex m; + // Valid value of `type` is one of + // - VAR_DELETE + // - VAR_ENTRY + // - VAR_REF + // - VAR_EXTERNAL EditRecordType type; + // Has been deleted, valid when type == VAR_REF/VAR_EXTERNAL bool is_deleted; // Entries sorted by version, valid when type == VAR_ENTRY - std::multimap entries; + std::multimap entries; // The created version, valid when type == VAR_REF/VAR_EXTERNAL - PageVersionType create_ver; + PageVersion create_ver; // The deleted version, valid when type == VAR_REF/VAR_EXTERNAL && is_deleted = true - PageVersionType delete_ver; + PageVersion delete_ver; // Original page id, valid when type == VAR_REF PageIdV3Internal ori_page_id; // Being ref counter, valid when type == VAR_EXTERNAL @@ -379,7 +387,7 @@ class PageDirectory MVCCMapType & mvcc_table_directory, const VersionedPageEntriesPtr & version_list, const PageEntriesEdit::EditRecord & rec, - const PageVersionType & version); + const PageVersion & version); static inline PageDirectorySnapshotPtr toConcreteSnapshot(const DB::PageStorageSnapshotPtr & ptr) diff --git a/dbms/src/Storages/Page/V3/PageDirectoryFactory.cpp b/dbms/src/Storages/Page/V3/PageDirectoryFactory.cpp index 4f2a8a3fbd4..40b12b64f06 100644 --- a/dbms/src/Storages/Page/V3/PageDirectoryFactory.cpp +++ b/dbms/src/Storages/Page/V3/PageDirectoryFactory.cpp @@ -20,7 +20,13 @@ #include -namespace DB::PS::V3 +namespace DB +{ +namespace ErrorCodes +{ +extern const int PS_DIR_APPLY_INVALID_STATUS; +} // namespace ErrorCodes +namespace PS::V3 { PageDirectoryPtr PageDirectoryFactory::create(String storage_name, FileProviderPtr & file_provider, PSDiskDelegatorPtr & delegator, WALStore::Config config) { @@ -101,80 +107,94 @@ PageDirectoryPtr PageDirectoryFactory::createFromEdit(String storage_name, FileP void PageDirectoryFactory::loadEdit(const PageDirectoryPtr & dir, const PageEntriesEdit & edit) { - PageDirectory::MVCCMapType & mvcc_table_directory = dir->mvcc_table_directory; - for (const auto & r : edit.getRecords()) { - if (auto it = max_apply_page_ids.find(r.page_id.high); it == max_apply_page_ids.end()) - { - max_apply_page_ids[r.page_id.high] = r.page_id.low; - } - else - { - it->second = std::max(it->second, r.page_id.low); - } - if (max_applied_ver < r.version) max_applied_ver = r.version; max_applied_page_id = std::max(r.page_id, max_applied_page_id); - auto [iter, created] = mvcc_table_directory.insert(std::make_pair(r.page_id, nullptr)); - if (created) + // We can not avoid page id from being reused under some corner situation. Try to do gcInMemEntries + // and apply again to resolve the error. + if (bool ok = applyRecord(dir, r, /*throw_on_error*/ false); unlikely(!ok)) { - iter->second = std::make_shared(); + dir->gcInMemEntries(); + applyRecord(dir, r, /*throw_on_error*/ true); + LOG_FMT_INFO(DB::Logger::get("PageDirectoryFactory"), "resolve from error status done, continue to restore"); } + } +} - const auto & version_list = iter->second; - const auto & restored_version = r.version; - try +bool PageDirectoryFactory::applyRecord( + const PageDirectoryPtr & dir, + const PageEntriesEdit::EditRecord & r, + bool throw_on_error) +{ + auto [iter, created] = dir->mvcc_table_directory.insert(std::make_pair(r.page_id, nullptr)); + if (created) + { + iter->second = std::make_shared(); + } + + const auto & version_list = iter->second; + const auto & restored_version = r.version; + try + { + switch (r.type) { - switch (r.type) - { - case EditRecordType::VAR_EXTERNAL: - case EditRecordType::VAR_REF: + case EditRecordType::VAR_EXTERNAL: + case EditRecordType::VAR_REF: + { + auto holder = version_list->fromRestored(r); + if (holder) { - auto holder = version_list->fromRestored(r); - if (holder) - { - *holder = r.page_id; - dir->external_ids.emplace_back(std::weak_ptr(holder)); - } - break; + *holder = r.page_id; + dir->external_ids.emplace_back(std::weak_ptr(holder)); } - case EditRecordType::VAR_ENTRY: - version_list->fromRestored(r); - break; - case EditRecordType::PUT_EXTERNAL: + break; + } + case EditRecordType::VAR_ENTRY: + version_list->fromRestored(r); + break; + case EditRecordType::PUT_EXTERNAL: + { + auto holder = version_list->createNewExternal(restored_version); + if (holder) { - auto holder = version_list->createNewExternal(restored_version); - if (holder) - { - *holder = r.page_id; - dir->external_ids.emplace_back(std::weak_ptr(holder)); - } - break; - } - case EditRecordType::PUT: - version_list->createNewEntry(restored_version, r.entry); - break; - case EditRecordType::DEL: - case EditRecordType::VAR_DELETE: // nothing different from `DEL` - version_list->createDelete(restored_version); - break; - case EditRecordType::REF: - PageDirectory::applyRefEditRecord(mvcc_table_directory, version_list, r, restored_version); - break; - case EditRecordType::UPSERT: - version_list->createNewEntry(restored_version, r.entry); - break; + *holder = r.page_id; + dir->external_ids.emplace_back(std::weak_ptr(holder)); } + break; } - catch (DB::Exception & e) + case EditRecordType::PUT: + version_list->createNewEntry(restored_version, r.entry); + break; + case EditRecordType::DEL: + case EditRecordType::VAR_DELETE: // nothing different from `DEL` + version_list->createDelete(restored_version); + break; + case EditRecordType::REF: + PageDirectory::applyRefEditRecord( + dir->mvcc_table_directory, + version_list, + r, + restored_version); + break; + case EditRecordType::UPSERT: + version_list->createNewEntry(restored_version, r.entry); + break; + } + } + catch (DB::Exception & e) + { + e.addMessage(fmt::format(" [type={}] [page_id={}] [ver={}]", r.type, r.page_id, restored_version)); + if (throw_on_error || e.code() != ErrorCodes::PS_DIR_APPLY_INVALID_STATUS) { - e.addMessage(fmt::format(" [type={}] [page_id={}] [ver={}]", r.type, r.page_id, restored_version)); throw e; } + LOG_FMT_WARNING(DB::Logger::get("PageDirectoryFactory"), "try to resolve error during restore: {}", e.message()); + return false; } + return true; } void PageDirectoryFactory::loadFromDisk(const PageDirectoryPtr & dir, WALStoreReaderPtr && reader) @@ -196,4 +216,5 @@ void PageDirectoryFactory::loadFromDisk(const PageDirectoryPtr & dir, WALStoreRe loadEdit(dir, edit); } } -} // namespace DB::PS::V3 +} // namespace PS::V3 +} // namespace DB diff --git a/dbms/src/Storages/Page/V3/PageDirectoryFactory.h b/dbms/src/Storages/Page/V3/PageDirectoryFactory.h index 4136e626050..11337e4a6cc 100644 --- a/dbms/src/Storages/Page/V3/PageDirectoryFactory.h +++ b/dbms/src/Storages/Page/V3/PageDirectoryFactory.h @@ -37,7 +37,7 @@ using WALStoreReaderPtr = std::shared_ptr; class PageDirectoryFactory { public: - PageVersionType max_applied_ver; + PageVersion max_applied_ver; PageIdV3Internal max_applied_page_id; PageDirectoryFactory & setBlobStore(BlobStore & blob_store) @@ -58,17 +58,15 @@ class PageDirectoryFactory return *this; } - std::map getMaxApplyPageIds() const - { - return max_apply_page_ids; - } - private: void loadFromDisk(const PageDirectoryPtr & dir, WALStoreReaderPtr && reader); void loadEdit(const PageDirectoryPtr & dir, const PageEntriesEdit & edit); + static bool applyRecord( + const PageDirectoryPtr & dir, + const PageEntriesEdit::EditRecord & r, + bool throw_on_error); BlobStore::BlobStats * blob_stats = nullptr; - std::map max_apply_page_ids; }; } // namespace PS::V3 diff --git a/dbms/src/Storages/Page/V3/PageEntriesEdit.h b/dbms/src/Storages/Page/V3/PageEntriesEdit.h index 1702b9e575f..b75b16da4a4 100644 --- a/dbms/src/Storages/Page/V3/PageEntriesEdit.h +++ b/dbms/src/Storages/Page/V3/PageEntriesEdit.h @@ -27,45 +27,45 @@ namespace DB::PS::V3 // `PageDirectory::apply` with create a version={directory.sequence, epoch=0}. // After data compaction and page entries need to be updated, will create // some entries with a version={old_sequence, epoch=old_epoch+1}. -struct PageVersionType +struct PageVersion { UInt64 sequence; // The write sequence UInt64 epoch; // The GC epoch - explicit PageVersionType(UInt64 seq) + explicit PageVersion(UInt64 seq) : sequence(seq) , epoch(0) {} - PageVersionType(UInt64 seq, UInt64 epoch_) + PageVersion(UInt64 seq, UInt64 epoch_) : sequence(seq) , epoch(epoch_) {} - PageVersionType() - : PageVersionType(0) + PageVersion() + : PageVersion(0) {} - bool operator<(const PageVersionType & rhs) const + bool operator<(const PageVersion & rhs) const { if (sequence == rhs.sequence) return epoch < rhs.epoch; return sequence < rhs.sequence; } - bool operator==(const PageVersionType & rhs) const + bool operator==(const PageVersion & rhs) const { return (sequence == rhs.sequence) && (epoch == rhs.epoch); } - bool operator<=(const PageVersionType & rhs) const + bool operator<=(const PageVersion & rhs) const { if (sequence == rhs.sequence) return epoch <= rhs.epoch; return sequence <= rhs.sequence; } }; -using VersionedEntry = std::pair; +using VersionedEntry = std::pair; using VersionedEntries = std::vector; enum class EditRecordType @@ -76,7 +76,8 @@ enum class EditRecordType DEL, // UPSERT, - // + // Variant types for dumping the in-memory entries into + // snapshot VAR_ENTRY, VAR_REF, VAR_EXTERNAL, @@ -138,7 +139,7 @@ class PageEntriesEdit records.emplace_back(record); } - void upsertPage(PageIdV3Internal page_id, const PageVersionType & ver, const PageEntryV3 & entry) + void upsertPage(PageIdV3Internal page_id, const PageVersion & ver, const PageEntryV3 & entry) { EditRecord record{}; record.type = EditRecordType::UPSERT; @@ -165,7 +166,7 @@ class PageEntriesEdit records.emplace_back(record); } - void varRef(PageIdV3Internal ref_id, const PageVersionType & ver, PageIdV3Internal ori_page_id) + void varRef(PageIdV3Internal ref_id, const PageVersion & ver, PageIdV3Internal ori_page_id) { EditRecord record{}; record.type = EditRecordType::VAR_REF; @@ -175,7 +176,7 @@ class PageEntriesEdit records.emplace_back(record); } - void varExternal(PageIdV3Internal page_id, const PageVersionType & create_ver, Int64 being_ref_count) + void varExternal(PageIdV3Internal page_id, const PageVersion & create_ver, Int64 being_ref_count) { EditRecord record{}; record.type = EditRecordType::VAR_EXTERNAL; @@ -185,7 +186,7 @@ class PageEntriesEdit records.emplace_back(record); } - void varEntry(PageIdV3Internal page_id, const PageVersionType & ver, const PageEntryV3 & entry, Int64 being_ref_count) + void varEntry(PageIdV3Internal page_id, const PageVersion & ver, const PageEntryV3 & entry, Int64 being_ref_count) { EditRecord record{}; record.type = EditRecordType::VAR_ENTRY; @@ -196,7 +197,7 @@ class PageEntriesEdit records.emplace_back(record); } - void varDel(PageIdV3Internal page_id, const PageVersionType & delete_ver) + void varDel(PageIdV3Internal page_id, const PageVersion & delete_ver) { EditRecord record{}; record.type = EditRecordType::VAR_DELETE; @@ -216,7 +217,7 @@ class PageEntriesEdit EditRecordType type; PageIdV3Internal page_id; PageIdV3Internal ori_page_id; - PageVersionType version; + PageVersion version; PageEntryV3 entry; Int64 being_ref_count; @@ -259,7 +260,7 @@ class PageEntriesEdit { putExternal(buildV3Id(TEST_NAMESPACE_ID, page_id)); } - void upsertPage(PageId page_id, const PageVersionType & ver, const PageEntryV3 & entry) + void upsertPage(PageId page_id, const PageVersion & ver, const PageEntryV3 & entry) { upsertPage(buildV3Id(TEST_NAMESPACE_ID, page_id), ver, entry); } @@ -271,19 +272,19 @@ class PageEntriesEdit { ref(buildV3Id(TEST_NAMESPACE_ID, ref_id), buildV3Id(TEST_NAMESPACE_ID, page_id)); } - void varRef(PageId ref_id, const PageVersionType & ver, PageId ori_page_id) + void varRef(PageId ref_id, const PageVersion & ver, PageId ori_page_id) { varRef(buildV3Id(TEST_NAMESPACE_ID, ref_id), ver, buildV3Id(TEST_NAMESPACE_ID, ori_page_id)); } - void varExternal(PageId page_id, const PageVersionType & create_ver, Int64 being_ref_count) + void varExternal(PageId page_id, const PageVersion & create_ver, Int64 being_ref_count) { varExternal(buildV3Id(TEST_NAMESPACE_ID, page_id), create_ver, being_ref_count); } - void varEntry(PageId page_id, const PageVersionType & ver, const PageEntryV3 & entry, Int64 being_ref_count) + void varEntry(PageId page_id, const PageVersion & ver, const PageEntryV3 & entry, Int64 being_ref_count) { varEntry(buildV3Id(TEST_NAMESPACE_ID, page_id), ver, entry, being_ref_count); } - void varDel(PageId page_id, const PageVersionType & delete_ver) + void varDel(PageId page_id, const PageVersion & delete_ver) { varDel(buildV3Id(TEST_NAMESPACE_ID, page_id), delete_ver); } @@ -315,7 +316,7 @@ class PageEntriesEdit /// See https://fmt.dev/latest/api.html#formatting-user-defined-types template <> -struct fmt::formatter +struct fmt::formatter { static constexpr auto parse(format_parse_context & ctx) { @@ -330,7 +331,7 @@ struct fmt::formatter } template - auto format(const DB::PS::V3::PageVersionType & ver, FormatContext & ctx) + auto format(const DB::PS::V3::PageVersion & ver, FormatContext & ctx) { return format_to(ctx.out(), "<{},{}>", ver.sequence, ver.epoch); } diff --git a/dbms/src/Storages/Page/V3/PageStorageImpl.cpp b/dbms/src/Storages/Page/V3/PageStorageImpl.cpp index de0a26f68c7..6966f794de9 100644 --- a/dbms/src/Storages/Page/V3/PageStorageImpl.cpp +++ b/dbms/src/Storages/Page/V3/PageStorageImpl.cpp @@ -43,7 +43,7 @@ PageStorageImpl::PageStorageImpl( PageStorageImpl::~PageStorageImpl() = default; -std::map PageStorageImpl::restore() +void PageStorageImpl::restore() { // TODO: clean up blobstore. // TODO: Speedup restoring @@ -53,7 +53,11 @@ std::map PageStorageImpl::restore() page_directory = factory .setBlobStore(blob_store) .create(storage_name, file_provider, delegator, parseWALConfig(config)); - return factory.getMaxApplyPageIds(); +} + +PageId PageStorageImpl::getMaxId(NamespaceId ns_id) +{ + return page_directory->getMaxId(ns_id); } void PageStorageImpl::drop() diff --git a/dbms/src/Storages/Page/V3/PageStorageImpl.h b/dbms/src/Storages/Page/V3/PageStorageImpl.h index 62c45ac685d..f3b696d0351 100644 --- a/dbms/src/Storages/Page/V3/PageStorageImpl.h +++ b/dbms/src/Storages/Page/V3/PageStorageImpl.h @@ -60,10 +60,11 @@ class PageStorageImpl : public DB::PageStorage return wal_config; } - std::map restore() override; + void restore() override; void drop() override; + PageId getMaxId(NamespaceId ns_id) override; PageId getNormalPageIdImpl(NamespaceId ns_id, PageId page_id, SnapshotPtr snapshot, bool throw_on_not_exist) override; diff --git a/dbms/src/Storages/Page/V3/WAL/serialize.cpp b/dbms/src/Storages/Page/V3/WAL/serialize.cpp index 26854f9a640..f8e26617499 100644 --- a/dbms/src/Storages/Page/V3/WAL/serialize.cpp +++ b/dbms/src/Storages/Page/V3/WAL/serialize.cpp @@ -23,13 +23,13 @@ namespace DB::PS::V3::ser { -inline void serializeVersionTo(const PageVersionType & version, WriteBuffer & buf) +inline void serializeVersionTo(const PageVersion & version, WriteBuffer & buf) { writeIntBinary(version.sequence, buf); writeIntBinary(version.epoch, buf); } -inline void deserializeVersionFrom(ReadBuffer & buf, PageVersionType & version) +inline void deserializeVersionFrom(ReadBuffer & buf, PageVersion & version) { readIntBinary(version.sequence, buf); readIntBinary(version.epoch, buf); @@ -174,7 +174,7 @@ void deserializeDelFrom([[maybe_unused]] const EditRecordType record_type, ReadB PageIdV3Internal page_id; readIntBinary(page_id, buf); - PageVersionType version; + PageVersion version; deserializeVersionFrom(buf, version); PageEntriesEdit::EditRecord rec; diff --git a/dbms/src/Storages/Page/V3/WALStore.cpp b/dbms/src/Storages/Page/V3/WALStore.cpp index 6759e80f416..1f1eaf3bc33 100644 --- a/dbms/src/Storages/Page/V3/WALStore.cpp +++ b/dbms/src/Storages/Page/V3/WALStore.cpp @@ -69,7 +69,7 @@ WALStore::WALStore( { } -void WALStore::apply(PageEntriesEdit & edit, const PageVersionType & version, const WriteLimiterPtr & write_limiter) +void WALStore::apply(PageEntriesEdit & edit, const PageVersion & version, const WriteLimiterPtr & write_limiter) { for (auto & r : edit.getMutRecords()) { diff --git a/dbms/src/Storages/Page/V3/WALStore.h b/dbms/src/Storages/Page/V3/WALStore.h index 8984c10e5e8..039903a8608 100644 --- a/dbms/src/Storages/Page/V3/WALStore.h +++ b/dbms/src/Storages/Page/V3/WALStore.h @@ -99,7 +99,7 @@ class WALStore PSDiskDelegatorPtr & delegator, WALStore::Config config); - void apply(PageEntriesEdit & edit, const PageVersionType & version, const WriteLimiterPtr & write_limiter = nullptr); + void apply(PageEntriesEdit & edit, const PageVersion & version, const WriteLimiterPtr & write_limiter = nullptr); void apply(const PageEntriesEdit & edit, const WriteLimiterPtr & write_limiter = nullptr); struct FilesSnapshot diff --git a/dbms/src/Storages/Page/V3/tests/gtest_page_directory.cpp b/dbms/src/Storages/Page/V3/tests/gtest_page_directory.cpp index 9789fd0fe83..4511cc8ddd7 100644 --- a/dbms/src/Storages/Page/V3/tests/gtest_page_directory.cpp +++ b/dbms/src/Storages/Page/V3/tests/gtest_page_directory.cpp @@ -625,16 +625,16 @@ CATCH #define INSERT_BLOBID_ENTRY(BLOBID, VERSION) \ PageEntryV3 entry_v##VERSION{.file_id = (BLOBID), .size = (VERSION), .tag = 0, .offset = 0x123, .checksum = 0x4567}; \ - entries.createNewEntry(PageVersionType(VERSION), entry_v##VERSION); + entries.createNewEntry(PageVersion(VERSION), entry_v##VERSION); #define INSERT_ENTRY(VERSION) INSERT_BLOBID_ENTRY(1, VERSION) #define INSERT_GC_ENTRY(VERSION, EPOCH) \ PageEntryV3 entry_gc_v##VERSION##_##EPOCH{.file_id = 2, .size = 100 * (VERSION) + (EPOCH), .tag = 0, .offset = 0x234, .checksum = 0x5678}; \ - entries.createNewEntry(PageVersionType((VERSION), (EPOCH)), entry_gc_v##VERSION##_##EPOCH); + entries.createNewEntry(PageVersion((VERSION), (EPOCH)), entry_gc_v##VERSION##_##EPOCH); class VersionedEntriesTest : public ::testing::Test { public: - using DerefCounter = std::map>; + using DerefCounter = std::map>; std::tuple runClean(UInt64 seq) { DerefCounter deref_counter; @@ -643,7 +643,7 @@ class VersionedEntriesTest : public ::testing::Test return {all_removed, removed_entries, deref_counter}; } - std::tuple runDeref(UInt64 seq, PageVersionType ver, Int64 decrease_num) + std::tuple runDeref(UInt64 seq, PageVersion ver, Int64 decrease_num) { PageEntriesV3 removed_entries; bool all_removed = entries.derefAndClean(seq, buildV3Id(TEST_NAMESPACE_ID, page_id), ver, decrease_num, removed_entries); @@ -694,7 +694,7 @@ TEST_F(VersionedEntriesTest, InsertGet) // Insert delete. Can not get entry with seq >= delete_version. // But it won't affect reading with old seq. - entries.createDelete(PageVersionType(15)); + entries.createDelete(PageVersion(15)); ASSERT_FALSE(entries.getEntry(1).has_value()); ASSERT_SAME_ENTRY(*entries.getEntry(2), entry_gc_v2_1); ASSERT_SAME_ENTRY(*entries.getEntry(3), entry_gc_v2_1); @@ -722,6 +722,126 @@ TEST_F(VersionedEntriesTest, InsertWithLowerVersion) ASSERT_SAME_ENTRY(*entries.getEntry(2), entry_v2); } +TEST_F(VersionedEntriesTest, EntryIsVisible) +try +{ + // init state + ASSERT_FALSE(entries.isVisible(0)); + ASSERT_FALSE(entries.isVisible(1)); + ASSERT_FALSE(entries.isVisible(2)); + ASSERT_FALSE(entries.isVisible(10000)); + + // insert some entries + INSERT_ENTRY(2); + INSERT_ENTRY(3); + INSERT_ENTRY(5); + + ASSERT_FALSE(entries.isVisible(1)); + ASSERT_TRUE(entries.isVisible(2)); + ASSERT_TRUE(entries.isVisible(3)); + ASSERT_TRUE(entries.isVisible(4)); + ASSERT_TRUE(entries.isVisible(5)); + ASSERT_TRUE(entries.isVisible(6)); + + // insert delete + entries.createDelete(PageVersion(6)); + + ASSERT_FALSE(entries.isVisible(1)); + ASSERT_TRUE(entries.isVisible(2)); + ASSERT_TRUE(entries.isVisible(3)); + ASSERT_TRUE(entries.isVisible(4)); + ASSERT_TRUE(entries.isVisible(5)); + ASSERT_FALSE(entries.isVisible(6)); + ASSERT_FALSE(entries.isVisible(10000)); + + // insert entry after delete + INSERT_ENTRY(7); + + ASSERT_FALSE(entries.isVisible(1)); + ASSERT_TRUE(entries.isVisible(2)); + ASSERT_TRUE(entries.isVisible(3)); + ASSERT_TRUE(entries.isVisible(4)); + ASSERT_TRUE(entries.isVisible(5)); + ASSERT_FALSE(entries.isVisible(6)); + ASSERT_TRUE(entries.isVisible(7)); + ASSERT_TRUE(entries.isVisible(10000)); +} +CATCH + +TEST_F(VersionedEntriesTest, ExternalPageIsVisible) +try +{ + // init state + ASSERT_FALSE(entries.isVisible(0)); + ASSERT_FALSE(entries.isVisible(1)); + ASSERT_FALSE(entries.isVisible(2)); + ASSERT_FALSE(entries.isVisible(10000)); + + // insert some entries + entries.createNewExternal(PageVersion(2)); + + ASSERT_FALSE(entries.isVisible(1)); + ASSERT_TRUE(entries.isVisible(2)); + ASSERT_TRUE(entries.isVisible(10000)); + + // insert delete + entries.createDelete(PageVersion(6)); + + ASSERT_FALSE(entries.isVisible(1)); + ASSERT_TRUE(entries.isVisible(2)); + ASSERT_TRUE(entries.isVisible(3)); + ASSERT_TRUE(entries.isVisible(4)); + ASSERT_TRUE(entries.isVisible(5)); + ASSERT_FALSE(entries.isVisible(6)); + ASSERT_FALSE(entries.isVisible(10000)); + + // insert entry after delete + entries.createNewExternal(PageVersion(7)); + + // after re-create external page, the visible for 1~5 has changed + ASSERT_FALSE(entries.isVisible(6)); + ASSERT_TRUE(entries.isVisible(7)); + ASSERT_TRUE(entries.isVisible(10000)); +} +CATCH + +TEST_F(VersionedEntriesTest, RefPageIsVisible) +try +{ + // init state + ASSERT_FALSE(entries.isVisible(0)); + ASSERT_FALSE(entries.isVisible(1)); + ASSERT_FALSE(entries.isVisible(2)); + ASSERT_FALSE(entries.isVisible(10000)); + + // insert some entries + entries.createNewRef(PageVersion(2), buildV3Id(TEST_NAMESPACE_ID, 2)); + + ASSERT_FALSE(entries.isVisible(1)); + ASSERT_TRUE(entries.isVisible(2)); + ASSERT_TRUE(entries.isVisible(10000)); + + // insert delete + entries.createDelete(PageVersion(6)); + + ASSERT_FALSE(entries.isVisible(1)); + ASSERT_TRUE(entries.isVisible(2)); + ASSERT_TRUE(entries.isVisible(3)); + ASSERT_TRUE(entries.isVisible(4)); + ASSERT_TRUE(entries.isVisible(5)); + ASSERT_FALSE(entries.isVisible(6)); + ASSERT_FALSE(entries.isVisible(10000)); + + // insert entry after delete + entries.createNewRef(PageVersion(7), buildV3Id(TEST_NAMESPACE_ID, 2)); + + // after re-create ref page, the visible for 1~5 has changed + ASSERT_FALSE(entries.isVisible(6)); + ASSERT_TRUE(entries.isVisible(7)); + ASSERT_TRUE(entries.isVisible(10000)); +} +CATCH + TEST_F(VersionedEntriesTest, GC) try { @@ -732,7 +852,7 @@ try INSERT_GC_ENTRY(5, 2); INSERT_ENTRY(10); INSERT_ENTRY(11); - entries.createDelete(PageVersionType(15)); + entries.createDelete(PageVersion(15)); // noting to be removed auto [all_removed, removed_entries, deref_counter] = runClean(1); @@ -785,15 +905,15 @@ CATCH TEST_F(VersionedEntriesTest, DeleteMultiTime) try { - entries.createDelete(PageVersionType(1)); + entries.createDelete(PageVersion(1)); INSERT_ENTRY(2); INSERT_GC_ENTRY(2, 1); - entries.createDelete(PageVersionType(15)); - entries.createDelete(PageVersionType(17)); - entries.createDelete(PageVersionType(16)); + entries.createDelete(PageVersion(15)); + entries.createDelete(PageVersion(17)); + entries.createDelete(PageVersion(16)); bool all_removed; - std::map> deref_counter; + std::map> deref_counter; PageEntriesV3 removed_entries; // <2,0> get removed. @@ -818,13 +938,13 @@ TEST_F(VersionedEntriesTest, DontCleanWhenBeingRef) try { bool all_removed; - std::map> deref_counter; + std::map> deref_counter; PageEntriesV3 removed_entries; INSERT_ENTRY(2); - entries.incrRefCount(PageVersionType(2)); - entries.incrRefCount(PageVersionType(2)); - entries.createDelete(PageVersionType(5)); + entries.incrRefCount(PageVersion(2)); + entries.incrRefCount(PageVersion(2)); + entries.createDelete(PageVersion(5)); // <2, 0> is not available after seq=5, but not get removed ASSERT_SAME_ENTRY(entry_v2, *entries.getEntry(4)); @@ -838,13 +958,13 @@ try ASSERT_EQ(deref_counter.size(), 0); // decrease 1 ref counting - std::tie(all_removed, removed_entries) = runDeref(5, PageVersionType(2), 1); + std::tie(all_removed, removed_entries) = runDeref(5, PageVersion(2), 1); ASSERT_EQ(removed_entries.size(), 0); ASSERT_FALSE(all_removed); // should not remove this chain ASSERT_FALSE(entries.getEntry(5)); // clear all - std::tie(all_removed, removed_entries) = runDeref(5, PageVersionType(2), 1); + std::tie(all_removed, removed_entries) = runDeref(5, PageVersion(2), 1); ASSERT_EQ(removed_entries.size(), 1); ASSERT_SAME_ENTRY(removed_entries[0], entry_v2); ASSERT_TRUE(all_removed); // should remove this chain @@ -856,13 +976,13 @@ TEST_F(VersionedEntriesTest, DontCleanWhenBeingRef2) try { bool all_removed; - std::map> deref_counter; + std::map> deref_counter; PageEntriesV3 removed_entries; INSERT_ENTRY(2); - entries.incrRefCount(PageVersionType(2)); - entries.incrRefCount(PageVersionType(2)); - entries.createDelete(PageVersionType(5)); + entries.incrRefCount(PageVersion(2)); + entries.incrRefCount(PageVersion(2)); + entries.createDelete(PageVersion(5)); // <2, 0> is not available after seq=5, but not get removed ASSERT_SAME_ENTRY(entry_v2, *entries.getEntry(4)); @@ -876,7 +996,7 @@ try ASSERT_EQ(deref_counter.size(), 0); // clear all - std::tie(all_removed, removed_entries) = runDeref(5, PageVersionType(2), 2); + std::tie(all_removed, removed_entries) = runDeref(5, PageVersion(2), 2); ASSERT_EQ(removed_entries.size(), 1); ASSERT_SAME_ENTRY(removed_entries[0], entry_v2); ASSERT_TRUE(all_removed); // should remove this chain @@ -888,12 +1008,12 @@ TEST_F(VersionedEntriesTest, CleanDuplicatedWhenBeingRefAndAppliedUpsert) try { bool all_removed; - std::map> deref_counter; + std::map> deref_counter; PageEntriesV3 removed_entries; INSERT_ENTRY(2); - entries.incrRefCount(PageVersionType(2)); - entries.incrRefCount(PageVersionType(2)); + entries.incrRefCount(PageVersion(2)); + entries.incrRefCount(PageVersion(2)); INSERT_GC_ENTRY(2, 1); INSERT_GC_ENTRY(2, 2); @@ -910,7 +1030,7 @@ try ASSERT_EQ(deref_counter.size(), 0); // clear all - std::tie(all_removed, removed_entries) = runDeref(5, PageVersionType(2), 2); + std::tie(all_removed, removed_entries) = runDeref(5, PageVersion(2), 2); ASSERT_EQ(removed_entries.size(), 0); ASSERT_FALSE(all_removed); // should not remove this chain ASSERT_SAME_ENTRY(entry_gc_v2_2, *entries.getEntry(4)); @@ -921,15 +1041,15 @@ TEST_F(VersionedEntriesTest, CleanDuplicatedWhenBeingRefAndAppliedUpsert2) try { bool all_removed; - std::map> deref_counter; + std::map> deref_counter; PageEntriesV3 removed_entries; INSERT_ENTRY(2); - entries.incrRefCount(PageVersionType(2)); - entries.incrRefCount(PageVersionType(2)); + entries.incrRefCount(PageVersion(2)); + entries.incrRefCount(PageVersion(2)); INSERT_GC_ENTRY(2, 1); INSERT_GC_ENTRY(2, 2); - entries.createDelete(PageVersionType(5)); + entries.createDelete(PageVersion(5)); // <2, 2> is not available after seq=5, but not get removed ASSERT_SAME_ENTRY(entry_gc_v2_2, *entries.getEntry(4)); @@ -945,7 +1065,7 @@ try ASSERT_EQ(deref_counter.size(), 0); // clear all - std::tie(all_removed, removed_entries) = runDeref(5, PageVersionType(2), 2); + std::tie(all_removed, removed_entries) = runDeref(5, PageVersion(2), 2); ASSERT_EQ(removed_entries.size(), 1); ASSERT_SAME_ENTRY(removed_entries[0], entry_gc_v2_2); ASSERT_TRUE(all_removed); // should remove this chain @@ -957,7 +1077,7 @@ TEST_F(VersionedEntriesTest, ReadAfterGcApplied) try { bool all_removed; - std::map> deref_counter; + std::map> deref_counter; PageEntriesV3 removed_entries; INSERT_ENTRY(2); @@ -2094,6 +2214,191 @@ try } CATCH +TEST_F(PageDirectoryGCTest, RestoreWithDuplicateID) +try +{ + auto restore_from_edit = [](const PageEntriesEdit & edit) { + auto ctx = ::DB::tests::TiFlashTestEnv::getContext(); + auto provider = ctx.getFileProvider(); + auto path = getTemporaryPath(); + PSDiskDelegatorPtr delegator = std::make_shared(path); + PageDirectoryFactory factory; + auto d = factory.createFromEdit(getCurrentTestName(), provider, delegator, edit); + return d; + }; + + const PageId target_id = 100; + // ========= 1 =======// + // Reuse same id: PUT_EXT/DEL/REF + { + PageEntryV3 entry_50{.file_id = 1, .size = 7890, .tag = 0, .offset = 0x123, .checksum = 0x4567}; + PageEntriesEdit edit; + edit.put(50, entry_50); + edit.putExternal(target_id); + edit.del(target_id); + // restart and reuse id=100 as ref to replace put_ext + edit.ref(target_id, 50); + + auto restored_dir = restore_from_edit(edit); + auto snap = restored_dir->createSnapshot(); + ASSERT_EQ(restored_dir->getNormalPageId(target_id, snap).low, 50); + } + // Reuse same id: PUT_EXT/DEL/PUT + { + PageEntryV3 entry_50{.file_id = 1, .size = 7890, .tag = 0, .offset = 0x123, .checksum = 0x4567}; + PageEntryV3 entry_100{.file_id = 100, .size = 7890, .tag = 0, .offset = 0x123, .checksum = 0x4567}; + PageEntriesEdit edit; + edit.put(50, entry_50); + edit.putExternal(target_id); + edit.del(target_id); + // restart and reuse id=100 as put to replace put_ext + edit.put(target_id, entry_100); + + auto restored_dir = restore_from_edit(edit); + auto snap = restored_dir->createSnapshot(); + ASSERT_SAME_ENTRY(restored_dir->get(target_id, snap).second, entry_100); + } + + // ========= 1-invalid =======// + // Reuse same id: PUT_EXT/BEING REF/DEL/REF + { + PageEntryV3 entry_50{.file_id = 1, .size = 7890, .tag = 0, .offset = 0x123, .checksum = 0x4567}; + PageEntriesEdit edit; + edit.put(50, entry_50); + edit.putExternal(target_id); + edit.ref(101, target_id); + edit.del(target_id); + // restart and reuse id=100 as ref. Should not happen because 101 still ref to 100 + edit.ref(target_id, 50); + + ASSERT_THROW(restore_from_edit(edit);, DB::Exception); + } + // Reuse same id: PUT_EXT/BEING REF/DEL/PUT + { + PageEntryV3 entry_50{.file_id = 1, .size = 7890, .tag = 0, .offset = 0x123, .checksum = 0x4567}; + PageEntryV3 entry_100{.file_id = 100, .size = 7890, .tag = 0, .offset = 0x123, .checksum = 0x4567}; + PageEntriesEdit edit; + edit.put(50, entry_50); + edit.putExternal(target_id); + edit.ref(101, target_id); + edit.del(target_id); + // restart and reuse id=100 as put. Should not happen because 101 still ref to 100 + edit.put(target_id, entry_100); + + ASSERT_THROW(restore_from_edit(edit);, DB::Exception); + } + + // ========= 2 =======// + // Reuse same id: PUT/DEL/REF + { + PageEntryV3 entry_50{.file_id = 1, .size = 7890, .tag = 0, .offset = 0x123, .checksum = 0x4567}; + PageEntriesEdit edit; + edit.put(50, entry_50); + edit.put(target_id, entry_50); + edit.del(target_id); + // restart and reuse id=100 as ref to replace put + edit.ref(target_id, 50); + + auto restored_dir = restore_from_edit(edit); + auto snap = restored_dir->createSnapshot(); + ASSERT_EQ(restored_dir->getNormalPageId(target_id, snap).low, 50); + } + // Reuse same id: PUT/DEL/PUT_EXT + { + PageEntryV3 entry_50{.file_id = 1, .size = 7890, .tag = 0, .offset = 0x123, .checksum = 0x4567}; + PageEntriesEdit edit; + edit.put(50, entry_50); + edit.put(target_id, entry_50); + edit.del(target_id); + // restart and reuse id=100 as external to replace put + edit.putExternal(target_id); + + auto restored_dir = restore_from_edit(edit); + auto snap = restored_dir->createSnapshot(); + auto ext_ids = restored_dir->getAliveExternalIds(TEST_NAMESPACE_ID); + ASSERT_EQ(ext_ids.size(), 1); + ASSERT_EQ(*ext_ids.begin(), target_id); + } + + // ========= 2-invalid =======// + // Reuse same id: PUT/BEING REF/DEL/REF + { + PageEntryV3 entry_50{.file_id = 1, .size = 7890, .tag = 0, .offset = 0x123, .checksum = 0x4567}; + PageEntriesEdit edit; + edit.put(50, entry_50); + edit.put(target_id, entry_50); + edit.ref(101, target_id); + edit.del(target_id); + // restart and reuse id=100 as ref to replace put + edit.ref(target_id, 50); + + ASSERT_THROW(restore_from_edit(edit);, DB::Exception); + } + // Reuse same id: PUT/BEING REF/DEL/PUT_EXT + { + PageEntryV3 entry_50{.file_id = 1, .size = 7890, .tag = 0, .offset = 0x123, .checksum = 0x4567}; + PageEntriesEdit edit; + edit.put(50, entry_50); + edit.put(target_id, entry_50); + edit.ref(101, target_id); + edit.del(target_id); + // restart and reuse id=100 as external to replace put + edit.putExternal(target_id); + + ASSERT_THROW(restore_from_edit(edit);, DB::Exception); + } + + // ========= 3 =======// + // Reuse same id: REF/DEL/PUT + { + PageEntryV3 entry_50{.file_id = 1, .size = 7890, .tag = 0, .offset = 0x123, .checksum = 0x4567}; + PageEntryV3 entry_100{.file_id = 100, .size = 7890, .tag = 0, .offset = 0x123, .checksum = 0x4567}; + PageEntriesEdit edit; + edit.put(50, entry_50); + edit.ref(target_id, 50); + edit.del(target_id); + // restart and reuse id=100 as put to replace ref + edit.put(target_id, entry_100); + + auto restored_dir = restore_from_edit(edit); + auto snap = restored_dir->createSnapshot(); + ASSERT_SAME_ENTRY(restored_dir->get(target_id, snap).second, entry_100); + } + // Reuse same id: REF/DEL/PUT_EXT + { + PageEntryV3 entry_50{.file_id = 1, .size = 7890, .tag = 0, .offset = 0x123, .checksum = 0x4567}; + PageEntriesEdit edit; + edit.put(50, entry_50); + edit.ref(target_id, 50); + edit.del(target_id); + // restart and reuse id=100 as external to replace ref + edit.putExternal(target_id); + + auto restored_dir = restore_from_edit(edit); + auto snap = restored_dir->createSnapshot(); + auto ext_ids = restored_dir->getAliveExternalIds(TEST_NAMESPACE_ID); + ASSERT_EQ(ext_ids.size(), 1); + ASSERT_EQ(*ext_ids.begin(), target_id); + } + // Reuse same id: REF/DEL/REF another id + { + PageEntryV3 entry_50{.file_id = 1, .size = 7890, .tag = 0, .offset = 0x123, .checksum = 0x4567}; + PageEntryV3 entry_51{.file_id = 2, .size = 7890, .tag = 0, .offset = 0x123, .checksum = 0x4567}; + PageEntriesEdit edit; + edit.put(50, entry_50); + edit.put(51, entry_51); + edit.ref(target_id, 50); + edit.del(target_id); + // restart and reuse id=target_id as external to replace put + edit.ref(target_id, 51); + + auto restored_dir = restore_from_edit(edit); + auto snap = restored_dir->createSnapshot(); + ASSERT_EQ(restored_dir->getNormalPageId(target_id, snap).low, 51); + } +} +CATCH + TEST_F(PageDirectoryTest, GetMaxId) try { @@ -2127,6 +2432,81 @@ try ASSERT_EQ(dir->getMaxId(medium), 320); ASSERT_EQ(dir->getMaxId(large), 2); } + + { + PageEntriesEdit edit; + edit.del(buildV3Id(medium, 320)); + dir->apply(std::move(edit)); + ASSERT_EQ(dir->getMaxId(medium), 300); + } + + { + PageEntriesEdit edit; + edit.del(buildV3Id(medium, 300)); + dir->apply(std::move(edit)); + ASSERT_EQ(dir->getMaxId(medium), 0); + } +} +CATCH + +TEST_F(PageDirectoryTest, GetMaxIdAfterDelete) +try +{ + /// test for deleting put + PageEntryV3 entry1{.file_id = 1, .size = 1024, .tag = 0, .offset = 0x123, .checksum = 0x4567}; + PageEntryV3 entry2{.file_id = 2, .size = 1024, .tag = 0, .offset = 0x123, .checksum = 0x4567}; + { + PageEntriesEdit edit; + edit.put(1, entry1); + edit.put(2, entry2); + dir->apply(std::move(edit)); + } + + ASSERT_EQ(dir->getMaxId(TEST_NAMESPACE_ID), 2); + + { + PageEntriesEdit edit; + edit.del(2); + dir->apply(std::move(edit)); + } + ASSERT_EQ(dir->getMaxId(TEST_NAMESPACE_ID), 1); + + { + PageEntriesEdit edit; + edit.del(1); + dir->apply(std::move(edit)); + } + ASSERT_EQ(dir->getMaxId(TEST_NAMESPACE_ID), 0); + + dir->gcInMemEntries(); + ASSERT_EQ(dir->getMaxId(TEST_NAMESPACE_ID), 0); + + /// test for deleting put_ext/ref + + { + PageEntriesEdit edit; + edit.putExternal(1); + edit.ref(2, 1); + dir->apply(std::move(edit)); + } + + { + PageEntriesEdit edit; + edit.del(1); + dir->apply(std::move(edit)); + } + ASSERT_EQ(dir->getMaxId(TEST_NAMESPACE_ID), 2); + dir->gcInMemEntries(); + ASSERT_EQ(dir->getMaxId(TEST_NAMESPACE_ID), 2); + + { + PageEntriesEdit edit; + edit.del(2); + dir->apply(std::move(edit)); + } + ASSERT_EQ(dir->getMaxId(TEST_NAMESPACE_ID), 0); + dir->gcInMemEntries(); + ASSERT_EQ(dir->getMaxId(TEST_NAMESPACE_ID), 0); } CATCH diff --git a/dbms/src/Storages/Page/V3/tests/gtest_page_storage.cpp b/dbms/src/Storages/Page/V3/tests/gtest_page_storage.cpp index 4bd3b2832b0..b0c2625466d 100644 --- a/dbms/src/Storages/Page/V3/tests/gtest_page_storage.cpp +++ b/dbms/src/Storages/Page/V3/tests/gtest_page_storage.cpp @@ -67,15 +67,6 @@ class PageStorageTest : public DB::base::TiFlashStorageTestBasic } - std::pair, std::map> reopen() - { - auto path = getTemporaryPath(); - auto delegator = std::make_shared(path); - auto storage = std::make_shared("test.t", delegator, config, file_provider); - auto max_ids = storage->restore(); - return {storage, max_ids}; - } - protected: FileProviderPtr file_provider; std::unique_ptr path_pool; @@ -1310,41 +1301,5 @@ try } CATCH -TEST_F(PageStorageTest, getMaxIdsFromRestore) -try -{ - { - WriteBatch batch; - batch.putExternal(1, 0); - batch.putExternal(2, 0); - batch.delPage(1); - batch.delPage(2); - page_storage->write(std::move(batch)); - - WriteBatch batch2{TEST_NAMESPACE_ID + 1}; - batch2.putExternal(1, 0); - batch2.putExternal(2, 0); - batch2.putRefPage(3, 1); - batch2.putRefPage(100, 2); - page_storage->write(std::move(batch2)); - - WriteBatch batch3{TEST_NAMESPACE_ID + 2}; - batch3.putExternal(1, 0); - batch3.putExternal(2, 0); - batch3.putRefPage(3, 1); - batch3.putRefPage(10, 2); - batch3.delPage(10); - page_storage->write(std::move(batch3)); - } - - page_storage = nullptr; - auto [page_storage, max_ids] = reopen(); - ASSERT_EQ(max_ids.size(), 3); - ASSERT_EQ(max_ids[TEST_NAMESPACE_ID], 2); // external page 2 is marked as deleted, but we can still restore it. - ASSERT_EQ(max_ids[TEST_NAMESPACE_ID + 1], 100); - ASSERT_EQ(max_ids[TEST_NAMESPACE_ID + 2], 10); // page 10 is marked as deleted, but we can still restore it. -} -CATCH - } // namespace PS::V3::tests } // namespace DB diff --git a/dbms/src/Storages/Page/V3/tests/gtest_page_storage_mix_mode.cpp b/dbms/src/Storages/Page/V3/tests/gtest_page_storage_mix_mode.cpp index 2a5714de027..d3fdafe57e8 100644 --- a/dbms/src/Storages/Page/V3/tests/gtest_page_storage_mix_mode.cpp +++ b/dbms/src/Storages/Page/V3/tests/gtest_page_storage_mix_mode.cpp @@ -43,16 +43,17 @@ class PageStorageMixedTest : public DB::base::TiFlashStorageTestBasic storage_path_pool_v3 = std::make_unique(Strings{path}, Strings{path}, Strings{}, std::make_shared(0, paths, caps, Strings{}, caps), global_context.getFileProvider(), true); global_context.setPageStorageRunMode(PageStorageRunMode::MIX_MODE); + if (!global_context.getGlobalStoragePool()) + global_context.initializeGlobalStoragePoolIfNeed(*storage_path_pool_v3); } void SetUp() override { - auto & global_context = DB::tests::TiFlashTestEnv::getGlobalContext(); - global_context.setPageStorageRunMode(PageStorageRunMode::MIX_MODE); TiFlashStorageTestBasic::SetUp(); const auto & path = getTemporaryPath(); createIfNotExist(path); + auto & global_context = DB::tests::TiFlashTestEnv::getGlobalContext(); std::vector caps = {}; Strings paths = {path}; @@ -74,7 +75,7 @@ class PageStorageMixedTest : public DB::base::TiFlashStorageTestBasic PageStorageRunMode reloadMixedStoragePool() { - db_context->setPageStorageRunMode(PageStorageRunMode::MIX_MODE); + DB::tests::TiFlashTestEnv::getContext().setPageStorageRunMode(PageStorageRunMode::MIX_MODE); PageStorageRunMode run_mode = storage_pool_mix->restore(); page_writer_mix = storage_pool_mix->logWriter(); page_reader_mix = storage_pool_mix->logReader(); @@ -83,7 +84,7 @@ class PageStorageMixedTest : public DB::base::TiFlashStorageTestBasic void reloadV2StoragePool() { - db_context->setPageStorageRunMode(PageStorageRunMode::ONLY_V2); + DB::tests::TiFlashTestEnv::getContext().setPageStorageRunMode(PageStorageRunMode::ONLY_V2); storage_pool_v2->restore(); page_writer_v2 = storage_pool_v2->logWriter(); page_reader_v2 = storage_pool_v2->logReader(); diff --git a/dbms/src/Storages/Page/V3/tests/gtest_wal_store.cpp b/dbms/src/Storages/Page/V3/tests/gtest_wal_store.cpp index fadc0fb3bae..89c4e54f7e7 100644 --- a/dbms/src/Storages/Page/V3/tests/gtest_wal_store.cpp +++ b/dbms/src/Storages/Page/V3/tests/gtest_wal_store.cpp @@ -36,7 +36,7 @@ TEST(WALSeriTest, AllPuts) { PageEntryV3 entry_p1{.file_id = 1, .size = 1, .tag = 0, .offset = 0x123, .checksum = 0x4567}; PageEntryV3 entry_p2{.file_id = 1, .size = 2, .tag = 0, .offset = 0x123, .checksum = 0x4567}; - PageVersionType ver20(/*seq=*/20); + PageVersion ver20(/*seq=*/20); PageEntriesEdit edit; edit.put(1, entry_p1); edit.put(2, entry_p2); @@ -58,7 +58,7 @@ try { PageEntryV3 entry_p3{.file_id = 1, .size = 3, .tag = 0, .offset = 0x123, .checksum = 0x4567}; PageEntryV3 entry_p5{.file_id = 1, .size = 5, .tag = 0, .offset = 0x123, .checksum = 0x4567}; - PageVersionType ver21(/*seq=*/21); + PageVersion ver21(/*seq=*/21); PageEntriesEdit edit; edit.put(3, entry_p3); edit.ref(4, 3); @@ -107,8 +107,8 @@ TEST(WALSeriTest, Upserts) PageEntryV3 entry_p1_2{.file_id = 2, .size = 1, .tag = 0, .offset = 0x123, .checksum = 0x4567}; PageEntryV3 entry_p3_2{.file_id = 2, .size = 3, .tag = 0, .offset = 0x123, .checksum = 0x4567}; PageEntryV3 entry_p5_2{.file_id = 2, .size = 5, .tag = 0, .offset = 0x123, .checksum = 0x4567}; - PageVersionType ver20_1(/*seq=*/20, /*epoch*/ 1); - PageVersionType ver21_1(/*seq=*/21, /*epoch*/ 1); + PageVersion ver20_1(/*seq=*/20, /*epoch*/ 1); + PageVersion ver21_1(/*seq=*/21, /*epoch*/ 1); PageEntriesEdit edit; edit.upsertPage(1, ver20_1, entry_p1_2); edit.upsertPage(3, ver21_1, entry_p3_2); @@ -135,9 +135,9 @@ TEST(WALSeriTest, Upserts) TEST(WALSeriTest, RefExternalAndEntry) { - PageVersionType ver1_0(/*seq=*/1, /*epoch*/ 0); - PageVersionType ver2_0(/*seq=*/2, /*epoch*/ 0); - PageVersionType ver3_0(/*seq=*/3, /*epoch*/ 0); + PageVersion ver1_0(/*seq=*/1, /*epoch*/ 0); + PageVersion ver2_0(/*seq=*/2, /*epoch*/ 0); + PageVersion ver3_0(/*seq=*/3, /*epoch*/ 0); { PageEntriesEdit edit; edit.varExternal(1, ver1_0, 2); @@ -407,7 +407,7 @@ try // Stage 2. Apply with only puts PageEntryV3 entry_p1{.file_id = 1, .size = 1, .tag = 0, .offset = 0x123, .checksum = 0x4567}; PageEntryV3 entry_p2{.file_id = 1, .size = 2, .tag = 0, .offset = 0x123, .checksum = 0x4567}; - PageVersionType ver20(/*seq=*/20); + PageVersion ver20(/*seq=*/20); { PageEntriesEdit edit; edit.put(1, entry_p1); @@ -437,7 +437,7 @@ try // Stage 3. Apply with puts and refs PageEntryV3 entry_p3{.file_id = 1, .size = 3, .tag = 0, .offset = 0x123, .checksum = 0x4567}; PageEntryV3 entry_p5{.file_id = 1, .size = 5, .tag = 0, .offset = 0x123, .checksum = 0x4567}; - PageVersionType ver21(/*seq=*/21); + PageVersion ver21(/*seq=*/21); { PageEntriesEdit edit; edit.put(3, entry_p3); @@ -471,8 +471,8 @@ try PageEntryV3 entry_p1_2{.file_id = 2, .size = 1, .tag = 0, .offset = 0x123, .checksum = 0x4567}; PageEntryV3 entry_p3_2{.file_id = 2, .size = 3, .tag = 0, .offset = 0x123, .checksum = 0x4567}; PageEntryV3 entry_p5_2{.file_id = 2, .size = 5, .tag = 0, .offset = 0x123, .checksum = 0x4567}; - PageVersionType ver20_1(/*seq=*/20, /*epoch*/ 1); - PageVersionType ver21_1(/*seq=*/21, /*epoch*/ 1); + PageVersion ver20_1(/*seq=*/20, /*epoch*/ 1); + PageVersion ver21_1(/*seq=*/21, /*epoch*/ 1); { PageEntriesEdit edit; edit.upsertPage(1, ver20_1, entry_p1_2); @@ -516,7 +516,7 @@ try // Stage 1. Apply with only puts PageEntryV3 entry_p1{.file_id = 1, .size = 1, .tag = 0, .offset = 0x123, .checksum = 0x4567}; PageEntryV3 entry_p2{.file_id = 1, .size = 2, .tag = 0, .offset = 0x123, .checksum = 0x4567}; - PageVersionType ver20(/*seq=*/20); + PageVersion ver20(/*seq=*/20); { PageEntriesEdit edit; edit.put(1, entry_p1); @@ -528,7 +528,7 @@ try // Stage 2. Apply with puts and refs PageEntryV3 entry_p3{.file_id = 1, .size = 3, .tag = 0, .offset = 0x123, .checksum = 0x4567}; PageEntryV3 entry_p5{.file_id = 1, .size = 5, .tag = 0, .offset = 0x123, .checksum = 0x4567}; - PageVersionType ver21(/*seq=*/21); + PageVersion ver21(/*seq=*/21); { PageEntriesEdit edit; edit.put(3, entry_p3); @@ -543,8 +543,8 @@ try PageEntryV3 entry_p1_2{.file_id = 2, .size = 1, .tag = 0, .offset = 0x123, .checksum = 0x4567}; PageEntryV3 entry_p3_2{.file_id = 2, .size = 3, .tag = 0, .offset = 0x123, .checksum = 0x4567}; PageEntryV3 entry_p5_2{.file_id = 2, .size = 5, .tag = 0, .offset = 0x123, .checksum = 0x4567}; - PageVersionType ver20_1(/*seq=*/20, /*epoch*/ 1); - PageVersionType ver21_1(/*seq=*/21, /*epoch*/ 1); + PageVersion ver20_1(/*seq=*/20, /*epoch*/ 1); + PageVersion ver21_1(/*seq=*/21, /*epoch*/ 1); { PageEntriesEdit edit; edit.upsertPage(1, ver20_1, entry_p1_2); @@ -611,7 +611,7 @@ try PageId page_id = 0; std::vector size_each_edit; size_each_edit.reserve(num_edits_test); - PageVersionType ver(/*seq*/ 32); + PageVersion ver(/*seq*/ 32); for (size_t i = 0; i < num_edits_test; ++i) { PageEntryV3 entry{.file_id = 2, .size = 1, .tag = 0, .offset = 0x123, .checksum = 0x4567}; @@ -665,7 +665,7 @@ try // just fill in some random entry for (size_t i = 0; i < 70; ++i) { - snap_edit.varEntry(d_10000(rd), PageVersionType(345, 22), entry, 1); + snap_edit.varEntry(d_10000(rd), PageVersion(345, 22), entry, 1); } std::tie(wal, reader) = WALStore::create(getCurrentTestName(), enc_provider, delegator, config); bool done = wal->saveSnapshot(std::move(file_snap), std::move(snap_edit)); diff --git a/dbms/src/Storages/Page/V3/tests/page_storage_ctl.cpp b/dbms/src/Storages/Page/V3/tests/page_storage_ctl.cpp index 4f3cefa0ad7..7ea8da6892a 100644 --- a/dbms/src/Storages/Page/V3/tests/page_storage_ctl.cpp +++ b/dbms/src/Storages/Page/V3/tests/page_storage_ctl.cpp @@ -399,7 +399,7 @@ class PageStorageControl size_t index = 0; std::cout << fmt::format("Begin to check all of datas CRC. enable_fo_check={}", static_cast(enable_fo_check)) << std::endl; - std::list> error_versioned_pages; + std::list> error_versioned_pages; for (const auto & [internal_id, versioned_entries] : mvcc_table_directory) { if (index == total_pages / 10 * cut_index) @@ -474,4 +474,4 @@ int main(int argc, char ** argv) const auto & options = ControlOptions::parse(argc, argv); PageStorageControl(options).run(); return 0; -} \ No newline at end of file +} diff --git a/dbms/src/TestUtils/TiFlashTestEnv.cpp b/dbms/src/TestUtils/TiFlashTestEnv.cpp index 264fd6009a3..34355d43775 100644 --- a/dbms/src/TestUtils/TiFlashTestEnv.cpp +++ b/dbms/src/TestUtils/TiFlashTestEnv.cpp @@ -94,7 +94,6 @@ Context TiFlashTestEnv::getContext(const DB::Settings & settings, Strings testda context.setPath(root_path); auto paths = getPathPool(testdata_path); context.setPathPool(paths.first, paths.second, Strings{}, true, context.getPathCapacity(), context.getFileProvider()); - global_context->initializeGlobalStoragePoolIfNeed(context.getPathPool()); context.getSettingsRef() = settings; return context; } diff --git a/tests/delta-merge-test/ddl/alter.test b/tests/delta-merge-test/ddl/alter.test index 3dc57b05843..4bf405ac9e1 100644 --- a/tests/delta-merge-test/ddl/alter.test +++ b/tests/delta-merge-test/ddl/alter.test @@ -73,19 +73,18 @@ ## rename table -# FIXME: No support rename after PR 4850 (PageStorage V3) -# >> drop table if exists dm_test_renamed -# >> rename table dm_test to dm_test_renamed -# >> select * from dm_test -# Received exception from server (version {#WORD}): -# Code: 60. DB::Exception: Received from {#WORD} DB::Exception: Table default.dm_test doesn't exist.. +>> drop table if exists dm_test_renamed +>> rename table dm_test to dm_test_renamed +>> select * from dm_test +Received exception from server (version {#WORD}): +Code: 60. DB::Exception: Received from {#WORD} DB::Exception: Table default.dm_test doesn't exist.. -# >> select * from dm_test_renamed -# ┌─a─┬────b─┬─────c─┬────d─┐ -# │ 1 │ 0 │ 0 │ \N │ -# │ 2 │ 1024 │ 65535 │ 4096 │ -# │ 3 │ 2048 │ 65536 │ \N │ -# └───┴──────┴───────┴──────┘ +>> select * from dm_test_renamed +┌─a─┬────b─┬─────c─┬────d─┐ +│ 1 │ 0 │ 0 │ \N │ +│ 2 │ 1024 │ 65535 │ 4096 │ +│ 3 │ 2048 │ 65536 │ \N │ +└───┴──────┴───────┴──────┘ ## Clean up diff --git a/tests/delta-merge-test/raft/schema/partition_table_restart.test b/tests/delta-merge-test/raft/schema/partition_table_restart.test index bd0facdc153..893bb617af4 100644 --- a/tests/delta-merge-test/raft/schema/partition_table_restart.test +++ b/tests/delta-merge-test/raft/schema/partition_table_restart.test @@ -31,12 +31,11 @@ => DBGInvoke __mock_tidb_partition(default, test, 9998) => DBGInvoke __mock_tidb_partition(default, test, 9997) => DBGInvoke __refresh_schemas() + => drop table default.test_9997 # schema syncer guarantees logical table creation at last, so there won't be cases that logical table exists whereas physical table not. => drop table default.test => DBGInvoke __reset_schemas() -# remove obsolete entry left by previous dropped table -=> DBGInvoke __gc_global_storage_pool() => DBGInvoke __add_column_to_tidb_table(default, test, 'col_3 Nullable(Int8)') => DBGInvoke __rename_tidb_table(default, test, test1) diff --git a/tests/delta-merge-test/raft/txn_mock/partition_table.test b/tests/delta-merge-test/raft/txn_mock/partition_table.test index 3967d34b48e..2f8e67a61a8 100644 --- a/tests/delta-merge-test/raft/txn_mock/partition_table.test +++ b/tests/delta-merge-test/raft/txn_mock/partition_table.test @@ -94,20 +94,19 @@ │ 0 │ └──────────────┘ -# FIXME: No support rename after PR 4850 (PageStorage V3) -# => DBGInvoke __rename_tidb_table(default, test, test1) -# => DBGInvoke __refresh_schemas() -# => select count(*) from default.test1_9997 -# ┌─count()─┐ -# │ 2 │ -# └─────────┘ - -# => DBGInvoke __drop_tidb_table(default, test1) -# => DBGInvoke __refresh_schemas() -# => DBGInvoke is_tombstone(default, test1_9999) -# ┌─is_tombstone(default, test_9999)─┐ -# │ true │ -# └──────────────────────────────────┘ +=> DBGInvoke __rename_tidb_table(default, test, test1) +=> DBGInvoke __refresh_schemas() +=> select count(*) from default.test1_9997 +┌─count()─┐ +│ 2 │ +└─────────┘ + +=> DBGInvoke __drop_tidb_table(default, test1) +=> DBGInvoke __refresh_schemas() +=> DBGInvoke is_tombstone(default, test1_9999) +┌─is_tombstone(default, test_9999)─┐ +│ true │ +└──────────────────────────────────┘ => drop table if exists default.test => drop table if exists default.test1