Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a global max_id to fix reuse page_id problem (#4948) #4973

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 11 additions & 2 deletions dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -244,10 +244,19 @@ DeltaMergeStore::DeltaMergeStore(Context & db_context,
if (const auto first_segment_entry = storage_pool->metaReader()->getPageEntry(DELTA_MERGE_FIRST_SEGMENT_ID);
!first_segment_entry.isValid())
{
// Create the first segment.
auto segment_id = storage_pool->newMetaPageId();
if (segment_id != DELTA_MERGE_FIRST_SEGMENT_ID)
throw Exception(fmt::format("The first segment id should be {}", DELTA_MERGE_FIRST_SEGMENT_ID), ErrorCodes::LOGICAL_ERROR);
{
if (page_storage_run_mode == PageStorageRunMode::ONLY_V2)
{
throw Exception(fmt::format("The first segment id should be {}", DELTA_MERGE_FIRST_SEGMENT_ID), ErrorCodes::LOGICAL_ERROR);
}

// In ONLY_V3 or MIX_MODE, when creating a new DeltaMergeStore,
// the fixed DELTA_MERGE_FIRST_SEGMENT_ID should be used to create the first segment.
segment_id = DELTA_MERGE_FIRST_SEGMENT_ID;
}

auto first_segment
= Segment::newSegment(*dm_context, store_columns, RowKeyRange::newAll(is_common_handle, rowkey_column_size), segment_id, 0);
segments.emplace(first_segment->getRowKeyRange().getEnd(), first_segment);
Expand Down
24 changes: 12 additions & 12 deletions dbms/src/Storages/DeltaMerge/StoragePool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -378,18 +378,18 @@ PageStorageRunMode StoragePool::restore()
data_storage_v2->restore();
meta_storage_v2->restore();

max_log_page_id = log_storage_v2->getMaxId(ns_id);
max_data_page_id = data_storage_v2->getMaxId(ns_id);
max_meta_page_id = meta_storage_v2->getMaxId(ns_id);
max_log_page_id = log_storage_v2->getMaxId();
max_data_page_id = data_storage_v2->getMaxId();
max_meta_page_id = meta_storage_v2->getMaxId();

storage_pool_metrics = CurrentMetrics::Increment{CurrentMetrics::StoragePoolV2Only};
break;
}
case PageStorageRunMode::ONLY_V3:
{
max_log_page_id = log_storage_v3->getMaxId(ns_id);
max_data_page_id = data_storage_v3->getMaxId(ns_id);
max_meta_page_id = meta_storage_v3->getMaxId(ns_id);
max_log_page_id = log_storage_v3->getMaxId();
max_data_page_id = data_storage_v3->getMaxId();
max_meta_page_id = meta_storage_v3->getMaxId();

storage_pool_metrics = CurrentMetrics::Increment{CurrentMetrics::StoragePoolV3Only};
break;
Expand Down Expand Up @@ -456,18 +456,18 @@ PageStorageRunMode StoragePool::restore()
data_storage_writer = std::make_shared<PageWriter>(PageStorageRunMode::ONLY_V3, /*storage_v2_*/ nullptr, data_storage_v3);
meta_storage_writer = std::make_shared<PageWriter>(PageStorageRunMode::ONLY_V3, /*storage_v2_*/ nullptr, meta_storage_v3);

max_log_page_id = log_storage_v3->getMaxId(ns_id);
max_data_page_id = data_storage_v3->getMaxId(ns_id);
max_meta_page_id = meta_storage_v3->getMaxId(ns_id);
max_log_page_id = log_storage_v3->getMaxId();
max_data_page_id = data_storage_v3->getMaxId();
max_meta_page_id = meta_storage_v3->getMaxId();

run_mode = PageStorageRunMode::ONLY_V3;
storage_pool_metrics = CurrentMetrics::Increment{CurrentMetrics::StoragePoolV3Only};
}
else // Still running Mix Mode
{
max_log_page_id = std::max(log_storage_v2->getMaxId(ns_id), log_storage_v3->getMaxId(ns_id));
max_data_page_id = std::max(data_storage_v2->getMaxId(ns_id), data_storage_v3->getMaxId(ns_id));
max_meta_page_id = std::max(meta_storage_v2->getMaxId(ns_id), meta_storage_v3->getMaxId(ns_id));
max_log_page_id = std::max(log_storage_v2->getMaxId(), log_storage_v3->getMaxId());
max_data_page_id = std::max(data_storage_v2->getMaxId(), data_storage_v3->getMaxId());
max_meta_page_id = std::max(meta_storage_v2->getMaxId(), meta_storage_v3->getMaxId());
storage_pool_metrics = CurrentMetrics::Increment{CurrentMetrics::StoragePoolMixMode};
}
break;
Expand Down
11 changes: 9 additions & 2 deletions dbms/src/Storages/DeltaMerge/StoragePool.h
Original file line number Diff line number Diff line change
Expand Up @@ -149,11 +149,18 @@ class StoragePool : private boost::noncopyable
// Caller must cancel gc tasks before drop
void drop();

PageId newDataPageIdForDTFile(StableDiskDelegator & delegator, const char * who);
// For the functions `newLogPageId`, `newMetaPageId`, `newDataPageIdForDTFile`:
// For PageStorageRunMode::ONLY_V2, every table has its own three PageStorage instances (meta/data/log),
// so these functions return page ids that start from 1 and are continuously incremented.
// For PageStorageRunMode::ONLY_V3/MIX_MODE, PageStorage is global (different tables are distinguished by ns_id).
// In order to prevent page ids from being reused (which causes trouble while restoring the WAL from disk),
// StoragePool initializes max_log_page_id/max_meta_page_id/max_data_page_id from the global max id,
// regardless of ns_id, when it is restored. As a result, the ids within a table are not continuously incremented.

PageId maxMetaPageId() { return max_meta_page_id; }
PageId newDataPageIdForDTFile(StableDiskDelegator & delegator, const char * who);
PageId newLogPageId() { return ++max_log_page_id; }
PageId newMetaPageId() { return ++max_meta_page_id; }

#ifndef DBMS_PUBLIC_GTEST
private:
#endif
Expand Down
12 changes: 11 additions & 1 deletion dbms/src/Storages/Page/PageStorage.h
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,17 @@ class PageStorage : private boost::noncopyable

virtual void drop() = 0;

virtual PageId getMaxId(NamespaceId ns_id) = 0;
// Get the max page id from PageStorage.
//
// For V2, every table has its own three PageStorage instances (meta/data/log),
// so the id returned by this function starts from 0 and is continuously incremented
// as new pages are created.
// For V3, PageStorage is global (different tables are distinguished by ns_id).
// In order to prevent page ids from being reused (which causes trouble while restoring the WAL from disk),
// this function returns the global max id regardless of ns_id. As a result, the ids within a table
// are not continuously incremented.
// Note that page id 1 in each ns_id is special.
virtual PageId getMaxId() = 0;

virtual SnapshotPtr getSnapshot(const String & tracing_id) = 0;

Expand Down
6 changes: 3 additions & 3 deletions dbms/src/Storages/Page/V2/PageStorage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -355,7 +355,7 @@ void PageStorage::restore()
LOG_FMT_INFO(log, "{} restore {} pages, write batch sequence: {}, {}", storage_name, num_pages, write_batch_seq, statistics.toString());
}

PageId PageStorage::getMaxId(NamespaceId /*ns_id*/)
PageId PageStorage::getMaxId()
{
std::lock_guard write_lock(write_mutex);
return versioned_page_entries.getSnapshot("")->version()->maxId();
Expand Down Expand Up @@ -893,9 +893,9 @@ void PageStorage::drop()
struct GcContext
{
PageFileIdAndLevel min_file_id;
PageFile::Type min_file_type;
PageFile::Type min_file_type = PageFile::Type::Invalid;
PageFileIdAndLevel max_file_id;
PageFile::Type max_file_type;
PageFile::Type max_file_type = PageFile::Type::Invalid;
size_t num_page_files = 0;
size_t num_legacy_files = 0;

Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Storages/Page/V2/PageStorage.h
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ class PageStorage : public DB::PageStorage

void drop() override;

PageId getMaxId(NamespaceId ns_id) override;
PageId getMaxId() override;

PageId getNormalPageIdImpl(NamespaceId ns_id, PageId page_id, SnapshotPtr snapshot, bool throw_on_not_exist) override;

Expand Down
49 changes: 7 additions & 42 deletions dbms/src/Storages/Page/V3/PageDirectory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -685,7 +685,8 @@ void VersionedPageEntries::collapseTo(const UInt64 seq, const PageIdV3Internal p
*************************/

PageDirectory::PageDirectory(String storage_name, WALStorePtr && wal_, UInt64 max_persisted_log_files_)
: sequence(0)
: max_page_id(0)
, sequence(0)
, wal(std::move(wal_))
, max_persisted_log_files(max_persisted_log_files_)
, log(Logger::get("PageDirectory", std::move(storage_name)))
Expand Down Expand Up @@ -923,49 +924,10 @@ PageIdV3Internal PageDirectory::getNormalPageId(PageIdV3Internal page_id, const
}
}

PageId PageDirectory::getMaxId(NamespaceId ns_id) const
PageId PageDirectory::getMaxId() const
{
std::shared_lock read_lock(table_rw_mutex);
PageIdV3Internal upper_bound = buildV3Id(ns_id, UINT64_MAX);

auto iter = mvcc_table_directory.upper_bound(upper_bound);
if (iter == mvcc_table_directory.begin())
{
// The smallest page id is greater than the target page id or mvcc_table_directory is empty,
// and it means no page id is less than or equal to the target page id, return 0.
return 0;
}
else
{
// iter is not at the beginning and mvcc_table_directory is not empty,
// so iter-- must be a valid iterator, and it's the largest page id which is smaller than the target page id.
iter--;

do
{
// Can't find any entries in current ns_id
if (iter->first.high != ns_id)
{
break;
}

// Check and return whether this id is visible, otherwise continue to check the previous one.
if (iter->second->isVisible(UINT64_MAX - 1))
{
return iter->first.low;
}

// Current entry/ref/external is deleted and there are no entries before it.
if (iter == mvcc_table_directory.begin())
{
break;
}

iter--;
} while (true);

return 0;
}
return max_page_id;
}

std::set<PageIdV3Internal> PageDirectory::getAllPageIds()
Expand Down Expand Up @@ -1069,6 +1031,9 @@ void PageDirectory::apply(PageEntriesEdit && edit, const WriteLimiterPtr & write
// stage 2, create entry version list for page_id.
for (const auto & r : edit.getRecords())
{
// Protected by write_lock
max_page_id = std::max(max_page_id, r.page_id.low);

auto [iter, created] = mvcc_table_directory.insert(std::make_pair(r.page_id, nullptr));
if (created)
{
Expand Down
3 changes: 2 additions & 1 deletion dbms/src/Storages/Page/V3/PageDirectory.h
Original file line number Diff line number Diff line change
Expand Up @@ -347,7 +347,7 @@ class PageDirectory
}
#endif

PageId getMaxId(NamespaceId ns_id) const;
PageId getMaxId() const;

std::set<PageIdV3Internal> getAllPageIds();

Expand Down Expand Up @@ -397,6 +397,7 @@ class PageDirectory
}

private:
PageId max_page_id;
std::atomic<UInt64> sequence;
mutable std::shared_mutex table_rw_mutex;
MVCCMapType mvcc_table_directory;
Expand Down
4 changes: 3 additions & 1 deletion dbms/src/Storages/Page/V3/PageDirectoryFactory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ PageDirectoryPtr PageDirectoryFactory::create(String storage_name, FileProviderP
// After restoring from the disk, we need to clean up all invalid entries in memory; otherwise it will
// try to run GC again on some entries that are already marked as invalid in BlobStore.
dir->gcInMemEntries();
LOG_FMT_INFO(DB::Logger::get("PageDirectoryFactory"), "PageDirectory restored [max_page_id={}] [max_applied_ver={}]", dir->getMaxId(), dir->sequence);

if (blob_stats)
{
Expand Down Expand Up @@ -111,7 +112,6 @@ void PageDirectoryFactory::loadEdit(const PageDirectoryPtr & dir, const PageEntr
{
if (max_applied_ver < r.version)
max_applied_ver = r.version;
max_applied_page_id = std::max(r.page_id, max_applied_page_id);

// We cannot prevent page ids from being reused in some corner cases. Try to run gcInMemEntries
// and apply again to resolve the error.
Expand All @@ -135,6 +135,8 @@ bool PageDirectoryFactory::applyRecord(
iter->second = std::make_shared<VersionedPageEntries>();
}

dir->max_page_id = std::max(dir->max_page_id, r.page_id.low);

const auto & version_list = iter->second;
const auto & restored_version = r.version;
try
Expand Down
1 change: 0 additions & 1 deletion dbms/src/Storages/Page/V3/PageDirectoryFactory.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ class PageDirectoryFactory
{
public:
PageVersion max_applied_ver;
PageIdV3Internal max_applied_page_id;

PageDirectoryFactory & setBlobStore(BlobStore & blob_store)
{
Expand Down
4 changes: 2 additions & 2 deletions dbms/src/Storages/Page/V3/PageStorageImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,9 @@ void PageStorageImpl::restore()
.create(storage_name, file_provider, delegator, parseWALConfig(config));
}

PageId PageStorageImpl::getMaxId(NamespaceId ns_id)
PageId PageStorageImpl::getMaxId()
{
return page_directory->getMaxId(ns_id);
return page_directory->getMaxId();
}

void PageStorageImpl::drop()
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Storages/Page/V3/PageStorageImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ class PageStorageImpl : public DB::PageStorage

void drop() override;

PageId getMaxId(NamespaceId ns_id) override;
PageId getMaxId() override;

PageId getNormalPageIdImpl(NamespaceId ns_id, PageId page_id, SnapshotPtr snapshot, bool throw_on_not_exist) override;

Expand Down
4 changes: 2 additions & 2 deletions dbms/src/Storages/Page/V3/WAL/serialize.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ void deserializeFrom(ReadBuffer & buf, PageEntriesEdit & edit)
break;
}
default:
throw Exception(fmt::format("Unknown record type: {}", record_type));
throw Exception(fmt::format("Unknown record type: {}", record_type), ErrorCodes::LOGICAL_ERROR);
}
}
}
Expand Down Expand Up @@ -261,7 +261,7 @@ PageEntriesEdit deserializeFrom(std::string_view record)
UInt32 version = 0;
readIntBinary(version, buf);
if (version != 1)
throw Exception("");
throw Exception(fmt::format("Unknown version for PageEntriesEdit deser [version={}]", version), ErrorCodes::LOGICAL_ERROR);

deserializeFrom(buf, edit);
return edit;
Expand Down
Loading