diff --git a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp index 27de092c26a..a74404f3dbb 100644 --- a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp +++ b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp @@ -244,10 +244,19 @@ DeltaMergeStore::DeltaMergeStore(Context & db_context, if (const auto first_segment_entry = storage_pool->metaReader()->getPageEntry(DELTA_MERGE_FIRST_SEGMENT_ID); !first_segment_entry.isValid()) { - // Create the first segment. auto segment_id = storage_pool->newMetaPageId(); if (segment_id != DELTA_MERGE_FIRST_SEGMENT_ID) - throw Exception(fmt::format("The first segment id should be {}", DELTA_MERGE_FIRST_SEGMENT_ID), ErrorCodes::LOGICAL_ERROR); + { + if (page_storage_run_mode == PageStorageRunMode::ONLY_V2) + { + throw Exception(fmt::format("The first segment id should be {}", DELTA_MERGE_FIRST_SEGMENT_ID), ErrorCodes::LOGICAL_ERROR); + } + + // In ONLY_V3 or MIX_MODE, If create a new DeltaMergeStore + // Should used fixed DELTA_MERGE_FIRST_SEGMENT_ID to create first segment + segment_id = DELTA_MERGE_FIRST_SEGMENT_ID; + } + auto first_segment = Segment::newSegment(*dm_context, store_columns, RowKeyRange::newAll(is_common_handle, rowkey_column_size), segment_id, 0); segments.emplace(first_segment->getRowKeyRange().getEnd(), first_segment); diff --git a/dbms/src/Storages/DeltaMerge/StoragePool.cpp b/dbms/src/Storages/DeltaMerge/StoragePool.cpp index fbb631064a7..a040c5b6c6a 100644 --- a/dbms/src/Storages/DeltaMerge/StoragePool.cpp +++ b/dbms/src/Storages/DeltaMerge/StoragePool.cpp @@ -378,18 +378,18 @@ PageStorageRunMode StoragePool::restore() data_storage_v2->restore(); meta_storage_v2->restore(); - max_log_page_id = log_storage_v2->getMaxId(ns_id); - max_data_page_id = data_storage_v2->getMaxId(ns_id); - max_meta_page_id = meta_storage_v2->getMaxId(ns_id); + max_log_page_id = log_storage_v2->getMaxId(); + max_data_page_id = data_storage_v2->getMaxId(); + max_meta_page_id = meta_storage_v2->getMaxId(); storage_pool_metrics = CurrentMetrics::Increment{CurrentMetrics::StoragePoolV2Only}; break; } case PageStorageRunMode::ONLY_V3: { - max_log_page_id = log_storage_v3->getMaxId(ns_id); - max_data_page_id = data_storage_v3->getMaxId(ns_id); - max_meta_page_id = meta_storage_v3->getMaxId(ns_id); + max_log_page_id = log_storage_v3->getMaxId(); + max_data_page_id = data_storage_v3->getMaxId(); + max_meta_page_id = meta_storage_v3->getMaxId(); storage_pool_metrics = CurrentMetrics::Increment{CurrentMetrics::StoragePoolV3Only}; break; @@ -456,18 +456,18 @@ PageStorageRunMode StoragePool::restore() data_storage_writer = std::make_shared(PageStorageRunMode::ONLY_V3, /*storage_v2_*/ nullptr, data_storage_v3); meta_storage_writer = std::make_shared(PageStorageRunMode::ONLY_V3, /*storage_v2_*/ nullptr, meta_storage_v3); - max_log_page_id = log_storage_v3->getMaxId(ns_id); - max_data_page_id = data_storage_v3->getMaxId(ns_id); - max_meta_page_id = meta_storage_v3->getMaxId(ns_id); + max_log_page_id = log_storage_v3->getMaxId(); + max_data_page_id = data_storage_v3->getMaxId(); + max_meta_page_id = meta_storage_v3->getMaxId(); run_mode = PageStorageRunMode::ONLY_V3; storage_pool_metrics = CurrentMetrics::Increment{CurrentMetrics::StoragePoolV3Only}; } else // Still running Mix Mode { - max_log_page_id = std::max(log_storage_v2->getMaxId(ns_id), log_storage_v3->getMaxId(ns_id)); - max_data_page_id = std::max(data_storage_v2->getMaxId(ns_id), data_storage_v3->getMaxId(ns_id)); - max_meta_page_id = std::max(meta_storage_v2->getMaxId(ns_id), meta_storage_v3->getMaxId(ns_id)); + max_log_page_id = std::max(log_storage_v2->getMaxId(), log_storage_v3->getMaxId()); + max_data_page_id = std::max(data_storage_v2->getMaxId(), data_storage_v3->getMaxId()); + max_meta_page_id = std::max(meta_storage_v2->getMaxId(), meta_storage_v3->getMaxId()); storage_pool_metrics = CurrentMetrics::Increment{CurrentMetrics::StoragePoolMixMode}; } break; diff --git a/dbms/src/Storages/DeltaMerge/StoragePool.h b/dbms/src/Storages/DeltaMerge/StoragePool.h index 7ba6dd85995..d05454a5431 100644 --- a/dbms/src/Storages/DeltaMerge/StoragePool.h +++ b/dbms/src/Storages/DeltaMerge/StoragePool.h @@ -149,11 +149,18 @@ class StoragePool : private boost::noncopyable // Caller must cancel gc tasks before drop void drop(); - PageId newDataPageIdForDTFile(StableDiskDelegator & delegator, const char * who); + // For function `newLogPageId`,`newMetaPageId`,`newDataPageIdForDTFile`: + // For PageStorageRunMode::ONLY_V2, every table have its own three PageStorage (meta/data/log). + // So these functions return the Page id starts from 1 and is continuously incremented. + // For PageStorageRunMode::ONLY_V3/MIX_MODE, PageStorage is global(distinguish by ns_id for different table). + // In order to avoid Page id from being reused (and cause troubles while restoring WAL from disk), + // StoragePool will assign the max_log_page_id/max_meta_page_id/max_data_page_id by the global max id + // regardless of ns_id while being restored. This causes the ids in a table to not be continuously incremented. - PageId maxMetaPageId() { return max_meta_page_id; } + PageId newDataPageIdForDTFile(StableDiskDelegator & delegator, const char * who); PageId newLogPageId() { return ++max_log_page_id; } PageId newMetaPageId() { return ++max_meta_page_id; } + #ifndef DBMS_PUBLIC_GTEST private: #endif diff --git a/dbms/src/Storages/Page/PageStorage.h b/dbms/src/Storages/Page/PageStorage.h index 481888bdf33..479c368a585 100644 --- a/dbms/src/Storages/Page/PageStorage.h +++ b/dbms/src/Storages/Page/PageStorage.h @@ -233,7 +233,17 @@ class PageStorage : private boost::noncopyable virtual void drop() = 0; - virtual PageId getMaxId(NamespaceId ns_id) = 0; + // Get the max id from PageStorage. + // + // For V2, every table have its own three PageStorage (meta/data/log). + // So this function return the Page id starts from 0 and is continuously incremented to + // new pages. + // For V3, PageStorage is global(distinguish by ns_id for different table). + // In order to avoid Page id from being reused (and cause troubles while restoring WAL from disk), + // this function returns the global max id regardless of ns_id. This causes the ids in a table + // to not be continuously incremented. + // Note that Page id 1 in each ns_id is special. + virtual PageId getMaxId() = 0; virtual SnapshotPtr getSnapshot(const String & tracing_id) = 0; diff --git a/dbms/src/Storages/Page/V2/PageStorage.cpp b/dbms/src/Storages/Page/V2/PageStorage.cpp index 3ab62d55242..7a23afb11d4 100644 --- a/dbms/src/Storages/Page/V2/PageStorage.cpp +++ b/dbms/src/Storages/Page/V2/PageStorage.cpp @@ -355,7 +355,7 @@ void PageStorage::restore() LOG_FMT_INFO(log, "{} restore {} pages, write batch sequence: {}, {}", storage_name, num_pages, write_batch_seq, statistics.toString()); } -PageId PageStorage::getMaxId(NamespaceId /*ns_id*/) +PageId PageStorage::getMaxId() { std::lock_guard write_lock(write_mutex); return versioned_page_entries.getSnapshot("")->version()->maxId(); @@ -893,9 +893,9 @@ void PageStorage::drop() struct GcContext { PageFileIdAndLevel min_file_id; - PageFile::Type min_file_type; + PageFile::Type min_file_type = PageFile::Type::Invalid; PageFileIdAndLevel max_file_id; - PageFile::Type max_file_type; + PageFile::Type max_file_type = PageFile::Type::Invalid; size_t num_page_files = 0; size_t num_legacy_files = 0; diff --git a/dbms/src/Storages/Page/V2/PageStorage.h b/dbms/src/Storages/Page/V2/PageStorage.h index cb55a769f37..b9e16fd1775 100644 --- a/dbms/src/Storages/Page/V2/PageStorage.h +++ b/dbms/src/Storages/Page/V2/PageStorage.h @@ -95,7 +95,7 @@ class PageStorage : public DB::PageStorage void drop() override; - PageId getMaxId(NamespaceId ns_id) override; + PageId getMaxId() override; PageId getNormalPageIdImpl(NamespaceId ns_id, PageId page_id, SnapshotPtr snapshot, bool throw_on_not_exist) override; diff --git a/dbms/src/Storages/Page/V3/PageDirectory.cpp b/dbms/src/Storages/Page/V3/PageDirectory.cpp index 06b26156529..64a3fead674 100644 --- a/dbms/src/Storages/Page/V3/PageDirectory.cpp +++ b/dbms/src/Storages/Page/V3/PageDirectory.cpp @@ -685,7 +685,8 @@ void VersionedPageEntries::collapseTo(const UInt64 seq, const PageIdV3Internal p *************************/ PageDirectory::PageDirectory(String storage_name, WALStorePtr && wal_, UInt64 max_persisted_log_files_) - : sequence(0) + : max_page_id(0) + , sequence(0) , wal(std::move(wal_)) , max_persisted_log_files(max_persisted_log_files_) , log(Logger::get("PageDirectory", std::move(storage_name))) @@ -923,49 +924,10 @@ PageIdV3Internal PageDirectory::getNormalPageId(PageIdV3Internal page_id, const } } -PageId PageDirectory::getMaxId(NamespaceId ns_id) const +PageId PageDirectory::getMaxId() const { std::shared_lock read_lock(table_rw_mutex); - PageIdV3Internal upper_bound = buildV3Id(ns_id, UINT64_MAX); - - auto iter = mvcc_table_directory.upper_bound(upper_bound); - if (iter == mvcc_table_directory.begin()) - { - // The smallest page id is greater than the target page id or mvcc_table_directory is empty, - // and it means no page id is less than or equal to the target page id, return 0. - return 0; - } - else - { - // iter is not at the beginning and mvcc_table_directory is not empty, - // so iter-- must be a valid iterator, and it's the largest page id which is smaller than the target page id. - iter--; - - do - { - // Can't find any entries in current ns_id - if (iter->first.high != ns_id) - { - break; - } - - // Check and return whether this id is visible, otherwise continue to check the previous one. - if (iter->second->isVisible(UINT64_MAX - 1)) - { - return iter->first.low; - } - - // Current entry/ref/external is deleted and there are no entries before it. - if (iter == mvcc_table_directory.begin()) - { - break; - } - - iter--; - } while (true); - - return 0; - } + return max_page_id; } std::set PageDirectory::getAllPageIds() @@ -1069,6 +1031,9 @@ void PageDirectory::apply(PageEntriesEdit && edit, const WriteLimiterPtr & write // stage 2, create entry version list for page_id. for (const auto & r : edit.getRecords()) { + // Protected in write_lock + max_page_id = std::max(max_page_id, r.page_id.low); + auto [iter, created] = mvcc_table_directory.insert(std::make_pair(r.page_id, nullptr)); if (created) { diff --git a/dbms/src/Storages/Page/V3/PageDirectory.h b/dbms/src/Storages/Page/V3/PageDirectory.h index 14833245c7a..a3c6b079fee 100644 --- a/dbms/src/Storages/Page/V3/PageDirectory.h +++ b/dbms/src/Storages/Page/V3/PageDirectory.h @@ -347,7 +347,7 @@ class PageDirectory } #endif - PageId getMaxId(NamespaceId ns_id) const; + PageId getMaxId() const; std::set getAllPageIds(); @@ -397,6 +397,7 @@ class PageDirectory } private: + PageId max_page_id; std::atomic sequence; mutable std::shared_mutex table_rw_mutex; MVCCMapType mvcc_table_directory; diff --git a/dbms/src/Storages/Page/V3/PageDirectoryFactory.cpp b/dbms/src/Storages/Page/V3/PageDirectoryFactory.cpp index 40b12b64f06..0592d1ddaa8 100644 --- a/dbms/src/Storages/Page/V3/PageDirectoryFactory.cpp +++ b/dbms/src/Storages/Page/V3/PageDirectoryFactory.cpp @@ -40,6 +40,7 @@ PageDirectoryPtr PageDirectoryFactory::create(String storage_name, FileProviderP // After restoring from the disk, we need cleanup all invalid entries in memory, or it will // try to run GC again on some entries that are already marked as invalid in BlobStore. dir->gcInMemEntries(); + LOG_FMT_INFO(DB::Logger::get("PageDirectoryFactory"), "PageDirectory restored [max_page_id={}] [max_applied_ver={}]", dir->getMaxId(), dir->sequence); if (blob_stats) { @@ -111,7 +112,6 @@ void PageDirectoryFactory::loadEdit(const PageDirectoryPtr & dir, const PageEntr { if (max_applied_ver < r.version) max_applied_ver = r.version; - max_applied_page_id = std::max(r.page_id, max_applied_page_id); // We can not avoid page id from being reused under some corner situation. Try to do gcInMemEntries // and apply again to resolve the error. @@ -135,6 +135,8 @@ bool PageDirectoryFactory::applyRecord( iter->second = std::make_shared(); } + dir->max_page_id = std::max(dir->max_page_id, r.page_id.low); + const auto & version_list = iter->second; const auto & restored_version = r.version; try diff --git a/dbms/src/Storages/Page/V3/PageDirectoryFactory.h b/dbms/src/Storages/Page/V3/PageDirectoryFactory.h index 11337e4a6cc..e4b76bfba0d 100644 --- a/dbms/src/Storages/Page/V3/PageDirectoryFactory.h +++ b/dbms/src/Storages/Page/V3/PageDirectoryFactory.h @@ -38,7 +38,6 @@ class PageDirectoryFactory { public: PageVersion max_applied_ver; - PageIdV3Internal max_applied_page_id; PageDirectoryFactory & setBlobStore(BlobStore & blob_store) { diff --git a/dbms/src/Storages/Page/V3/PageStorageImpl.cpp b/dbms/src/Storages/Page/V3/PageStorageImpl.cpp index 8aa9f92675c..58fe4b4dd4c 100644 --- a/dbms/src/Storages/Page/V3/PageStorageImpl.cpp +++ b/dbms/src/Storages/Page/V3/PageStorageImpl.cpp @@ -55,9 +55,9 @@ void PageStorageImpl::restore() .create(storage_name, file_provider, delegator, parseWALConfig(config)); } -PageId PageStorageImpl::getMaxId(NamespaceId ns_id) +PageId PageStorageImpl::getMaxId() { - return page_directory->getMaxId(ns_id); + return page_directory->getMaxId(); } void PageStorageImpl::drop() diff --git a/dbms/src/Storages/Page/V3/PageStorageImpl.h b/dbms/src/Storages/Page/V3/PageStorageImpl.h index f3b696d0351..082adb8df34 100644 --- a/dbms/src/Storages/Page/V3/PageStorageImpl.h +++ b/dbms/src/Storages/Page/V3/PageStorageImpl.h @@ -64,7 +64,7 @@ class PageStorageImpl : public DB::PageStorage void drop() override; - PageId getMaxId(NamespaceId ns_id) override; + PageId getMaxId() override; PageId getNormalPageIdImpl(NamespaceId ns_id, PageId page_id, SnapshotPtr snapshot, bool throw_on_not_exist) override; diff --git a/dbms/src/Storages/Page/V3/WAL/serialize.cpp b/dbms/src/Storages/Page/V3/WAL/serialize.cpp index f8e26617499..6b7bc9b8a21 100644 --- a/dbms/src/Storages/Page/V3/WAL/serialize.cpp +++ b/dbms/src/Storages/Page/V3/WAL/serialize.cpp @@ -218,7 +218,7 @@ void deserializeFrom(ReadBuffer & buf, PageEntriesEdit & edit) break; } default: - throw Exception(fmt::format("Unknown record type: {}", record_type)); + throw Exception(fmt::format("Unknown record type: {}", record_type), ErrorCodes::LOGICAL_ERROR); } } } @@ -261,7 +261,7 @@ PageEntriesEdit deserializeFrom(std::string_view record) UInt32 version = 0; readIntBinary(version, buf); if (version != 1) - throw Exception(""); + throw Exception(fmt::format("Unknown version for PageEntriesEdit deser [version={}]", version), ErrorCodes::LOGICAL_ERROR); deserializeFrom(buf, edit); return edit; diff --git a/dbms/src/Storages/Page/V3/tests/gtest_page_directory.cpp b/dbms/src/Storages/Page/V3/tests/gtest_page_directory.cpp index 4511cc8ddd7..dfa33824473 100644 --- a/dbms/src/Storages/Page/V3/tests/gtest_page_directory.cpp +++ b/dbms/src/Storages/Page/V3/tests/gtest_page_directory.cpp @@ -2399,117 +2399,6 @@ try } CATCH -TEST_F(PageDirectoryTest, GetMaxId) -try -{ - NamespaceId small = 20; - NamespaceId medium = 50; - NamespaceId large = 100; - ASSERT_EQ(dir->getMaxId(small), 0); - ASSERT_EQ(dir->getMaxId(medium), 0); - ASSERT_EQ(dir->getMaxId(large), 0); - - PageEntryV3 entry1{.file_id = 1, .size = 1024, .tag = 0, .offset = 0x123, .checksum = 0x4567}; - PageEntryV3 entry2{.file_id = 2, .size = 1024, .tag = 0, .offset = 0x123, .checksum = 0x4567}; - { - PageEntriesEdit edit; - edit.put(buildV3Id(small, 1), entry1); - edit.put(buildV3Id(large, 2), entry2); - dir->apply(std::move(edit)); - ASSERT_EQ(dir->getMaxId(small), 1); - ASSERT_EQ(dir->getMaxId(medium), 0); - ASSERT_EQ(dir->getMaxId(large), 2); - } - - PageEntryV3 entry3{.file_id = 3, .size = 1024, .tag = 0, .offset = 0x123, .checksum = 0x4567}; - PageEntryV3 entry4{.file_id = 4, .size = 1024, .tag = 0, .offset = 0x123, .checksum = 0x4567}; - { - PageEntriesEdit edit; - edit.put(buildV3Id(medium, 300), entry1); - edit.put(buildV3Id(medium, 320), entry2); - dir->apply(std::move(edit)); - ASSERT_EQ(dir->getMaxId(small), 1); - ASSERT_EQ(dir->getMaxId(medium), 320); - ASSERT_EQ(dir->getMaxId(large), 2); - } - - { - PageEntriesEdit edit; - edit.del(buildV3Id(medium, 320)); - dir->apply(std::move(edit)); - ASSERT_EQ(dir->getMaxId(medium), 300); - } - - { - PageEntriesEdit edit; - edit.del(buildV3Id(medium, 300)); - dir->apply(std::move(edit)); - ASSERT_EQ(dir->getMaxId(medium), 0); - } -} -CATCH - -TEST_F(PageDirectoryTest, GetMaxIdAfterDelete) -try -{ - /// test for deleting put - PageEntryV3 entry1{.file_id = 1, .size = 1024, .tag = 0, .offset = 0x123, .checksum = 0x4567}; - PageEntryV3 entry2{.file_id = 2, .size = 1024, .tag = 0, .offset = 0x123, .checksum = 0x4567}; - { - PageEntriesEdit edit; - edit.put(1, entry1); - edit.put(2, entry2); - dir->apply(std::move(edit)); - } - - ASSERT_EQ(dir->getMaxId(TEST_NAMESPACE_ID), 2); - - { - PageEntriesEdit edit; - edit.del(2); - dir->apply(std::move(edit)); - } - ASSERT_EQ(dir->getMaxId(TEST_NAMESPACE_ID), 1); - - { - PageEntriesEdit edit; - edit.del(1); - dir->apply(std::move(edit)); - } - ASSERT_EQ(dir->getMaxId(TEST_NAMESPACE_ID), 0); - - dir->gcInMemEntries(); - ASSERT_EQ(dir->getMaxId(TEST_NAMESPACE_ID), 0); - - /// test for deleting put_ext/ref - - { - PageEntriesEdit edit; - edit.putExternal(1); - edit.ref(2, 1); - dir->apply(std::move(edit)); - } - - { - PageEntriesEdit edit; - edit.del(1); - dir->apply(std::move(edit)); - } - ASSERT_EQ(dir->getMaxId(TEST_NAMESPACE_ID), 2); - dir->gcInMemEntries(); - ASSERT_EQ(dir->getMaxId(TEST_NAMESPACE_ID), 2); - - { - PageEntriesEdit edit; - edit.del(2); - dir->apply(std::move(edit)); - } - ASSERT_EQ(dir->getMaxId(TEST_NAMESPACE_ID), 0); - dir->gcInMemEntries(); - ASSERT_EQ(dir->getMaxId(TEST_NAMESPACE_ID), 0); -} -CATCH - #undef INSERT_ENTRY_TO #undef INSERT_ENTRY #undef INSERT_ENTRY_ACQ_SNAP diff --git a/dbms/src/Storages/Page/V3/tests/gtest_page_storage.cpp b/dbms/src/Storages/Page/V3/tests/gtest_page_storage.cpp index 91dfcaac6a8..498fd4124e5 100644 --- a/dbms/src/Storages/Page/V3/tests/gtest_page_storage.cpp +++ b/dbms/src/Storages/Page/V3/tests/gtest_page_storage.cpp @@ -1329,5 +1329,56 @@ try } CATCH +TEST_F(PageStorageTest, GetMaxId) +try +{ + NamespaceId small = 20; + NamespaceId medium = 50; + NamespaceId large = 100; + + { + WriteBatch batch{small}; + batch.putExternal(1, 0); + batch.putExternal(1999, 0); + batch.putExternal(2000, 0); + page_storage->write(std::move(batch)); + ASSERT_EQ(page_storage->getMaxId(), 2000); + } + + { + page_storage = reopenWithConfig(config); + ASSERT_EQ(page_storage->getMaxId(), 2000); + } + + { + WriteBatch batch{medium}; + batch.putExternal(1, 0); + batch.putExternal(100, 0); + batch.putExternal(200, 0); + page_storage->write(std::move(batch)); + ASSERT_EQ(page_storage->getMaxId(), 2000); + } + + { + page_storage = reopenWithConfig(config); + ASSERT_EQ(page_storage->getMaxId(), 2000); + } + + { + WriteBatch batch{large}; + batch.putExternal(1, 0); + batch.putExternal(20000, 0); + batch.putExternal(20001, 0); + page_storage->write(std::move(batch)); + ASSERT_EQ(page_storage->getMaxId(), 20001); + } + + { + page_storage = reopenWithConfig(config); + ASSERT_EQ(page_storage->getMaxId(), 20001); + } +} +CATCH + } // namespace PS::V3::tests } // namespace DB