Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix the bug for restoring data with duplicated page id #4889

Merged
merged 16 commits into from
May 17, 2022
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions dbms/src/Common/ErrorCodes.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -424,6 +424,7 @@ extern const int DEADLOCK_AVOIDED = 10013;
extern const int PTHREAD_ERROR = 10014;
extern const int PS_ENTRY_NOT_EXISTS = 10015;
extern const int PS_ENTRY_NO_VALID_VERSION = 10016;
extern const int PS_DIR_APPLY_INVALID_STATUS = 10017;
} // namespace ErrorCodes

} // namespace DB
6 changes: 4 additions & 2 deletions dbms/src/Interpreters/Context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1650,8 +1650,10 @@ bool Context::initializeGlobalStoragePoolIfNeed(const PathPool & path_pool)
auto lock = getLock();
if (shared->global_storage_pool)
{
// GlobalStoragePool may be initialized many times in some test cases for restore.
LOG_WARNING(shared->log, "GlobalStoragePool has already been initialized.");
// GlobalStoragePool must not be initialized twice,
// because we won't remove the gc task in BackGroundPool
JaySon-Huang marked this conversation as resolved.
Show resolved Hide resolved
// Also won't remove it from ~GlobalStoragePool()
throw Exception("GlobalStoragePool has already been initialized.", ErrorCodes::LOGICAL_ERROR);
}
CurrentMetrics::set(CurrentMetrics::GlobalStorageRunMode, static_cast<UInt8>(shared->storage_run_mode));
if (shared->storage_run_mode == PageStorageRunMode::MIX_MODE || shared->storage_run_mode == PageStorageRunMode::ONLY_V3)
Expand Down
3 changes: 2 additions & 1 deletion dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,8 @@ DeltaMergeStore::DeltaMergeStore(Context & db_context,
try
{
page_storage_run_mode = storage_pool->restore(); // restore from disk
if (!storage_pool->maxMetaPageId())
if (const auto entry1 = storage_pool->metaReader()->getPageEntry(1);
JaySon-Huang marked this conversation as resolved.
Show resolved Hide resolved
!entry1.isValid())
{
// Create the first segment.
auto segment_id = storage_pool->newMetaPageId();
Expand Down
50 changes: 21 additions & 29 deletions dbms/src/Storages/DeltaMerge/StoragePool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -118,9 +118,9 @@ GlobalStoragePool::~GlobalStoragePool()

void GlobalStoragePool::restore()
{
log_max_ids = log_storage->restore();
data_max_ids = data_storage->restore();
meta_max_ids = meta_storage->restore();
log_storage->restore();
data_storage->restore();
meta_storage->restore();

gc_handle = global_context.getBackgroundPool().addTask(
[this] {
Expand Down Expand Up @@ -301,35 +301,31 @@ PageStorageRunMode StoragePool::restore()
{
case PageStorageRunMode::ONLY_V2:
{
auto log_max_ids = log_storage_v2->restore();
auto data_max_ids = data_storage_v2->restore();
auto meta_max_ids = meta_storage_v2->restore();
log_storage_v2->restore();
data_storage_v2->restore();
meta_storage_v2->restore();

assert(log_max_ids.size() == 1);
assert(data_max_ids.size() == 1);
assert(meta_max_ids.size() == 1);

max_log_page_id = log_max_ids[0];
max_data_page_id = data_max_ids[0];
max_meta_page_id = meta_max_ids[0];
max_log_page_id = log_storage_v2->getMaxId(ns_id);
max_data_page_id = data_storage_v2->getMaxId(ns_id);
max_meta_page_id = meta_storage_v2->getMaxId(ns_id);

storage_pool_metrics = CurrentMetrics::Increment{CurrentMetrics::StoragePoolV2Only};
break;
}
case PageStorageRunMode::ONLY_V3:
{
max_log_page_id = global_storage_pool->getLogMaxId(ns_id);
max_data_page_id = global_storage_pool->getDataMaxId(ns_id);
max_meta_page_id = global_storage_pool->getMetaMaxId(ns_id);
max_log_page_id = global_storage_pool->log_storage->getMaxId(ns_id);
max_data_page_id = global_storage_pool->data_storage->getMaxId(ns_id);
max_meta_page_id = global_storage_pool->meta_storage->getMaxId(ns_id);

storage_pool_metrics = CurrentMetrics::Increment{CurrentMetrics::StoragePoolV3Only};
break;
}
case PageStorageRunMode::MIX_MODE:
{
auto v2_log_max_ids = log_storage_v2->restore();
auto v2_data_max_ids = data_storage_v2->restore();
auto v2_meta_max_ids = meta_storage_v2->restore();
log_storage_v2->restore();
data_storage_v2->restore();
meta_storage_v2->restore();

// The pages on data and log can be rewritten to V3 and the old pages on V2 are deleted by `delta merge`.
// However, the pages on meta V2 cannot be deleted. As the pages in meta are small, we perform a forceTransformMetaV2toV3 to convert them before anything else.
Expand All @@ -349,10 +345,6 @@ PageStorageRunMode StoragePool::restore()
LOG_FMT_INFO(logger, "Current meta translate already done before restored.[ns_id={}] ", ns_id);
}

assert(v2_log_max_ids.size() == 1);
assert(v2_data_max_ids.size() == 1);
assert(v2_meta_max_ids.size() == 1);

// Check number of valid pages in v2
// If V2 no longer has any data on disk, then change run_mode to ONLY_V3
if (log_storage_v2->getNumberOfPages() == 0 && data_storage_v2->getNumberOfPages() == 0 && meta_storage_v2->getNumberOfPages() == 0)
Expand All @@ -375,18 +367,18 @@ PageStorageRunMode StoragePool::restore()
data_storage_writer = std::make_shared<PageWriter>(run_mode, /*storage_v2_*/ nullptr, data_storage_v3);
meta_storage_writer = std::make_shared<PageWriter>(run_mode, /*storage_v2_*/ nullptr, meta_storage_v3);

max_log_page_id = global_storage_pool->getLogMaxId(ns_id);
max_data_page_id = global_storage_pool->getDataMaxId(ns_id);
max_meta_page_id = global_storage_pool->getMetaMaxId(ns_id);
max_log_page_id = global_storage_pool->log_storage->getMaxId(ns_id);
jiaqizho marked this conversation as resolved.
Show resolved Hide resolved
max_data_page_id = global_storage_pool->data_storage->getMaxId(ns_id);
max_meta_page_id = global_storage_pool->meta_storage->getMaxId(ns_id);

run_mode = PageStorageRunMode::ONLY_V3;
storage_pool_metrics = CurrentMetrics::Increment{CurrentMetrics::StoragePoolV3Only};
}
else // Still running Mix Mode
{
max_log_page_id = std::max(v2_log_max_ids[0], global_storage_pool->getLogMaxId(ns_id));
max_data_page_id = std::max(v2_data_max_ids[0], global_storage_pool->getDataMaxId(ns_id));
max_meta_page_id = std::max(v2_meta_max_ids[0], global_storage_pool->getMetaMaxId(ns_id));
max_log_page_id = std::max(log_storage_v2->getMaxId(ns_id), global_storage_pool->log_storage->getMaxId(ns_id));
max_data_page_id = std::max(data_storage_v2->getMaxId(ns_id), global_storage_pool->data_storage->getMaxId(ns_id));
max_meta_page_id = std::max(meta_storage_v2->getMaxId(ns_id), global_storage_pool->meta_storage->getMaxId(ns_id));
storage_pool_metrics = CurrentMetrics::Increment{CurrentMetrics::StoragePoolMixMode};
}
break;
Expand Down
37 changes: 0 additions & 37 deletions dbms/src/Storages/DeltaMerge/StoragePool.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,39 +51,6 @@ class GlobalStoragePool : private boost::noncopyable

friend class StoragePool;

// Look up the max log page id recorded for `ns_id` in `log_max_ids`.
// Returns 0 when the namespace has no entry.
PageId getLogMaxId(NamespaceId ns_id) const
{
    const auto found = log_max_ids.find(ns_id);
    if (found == log_max_ids.end())
        return 0;
    return found->second;
}

// Look up the max data page id recorded for `ns_id` in `data_max_ids`.
// Returns 0 when the namespace has no entry.
PageId getDataMaxId(NamespaceId ns_id) const
{
    const auto found = data_max_ids.find(ns_id);
    if (found == data_max_ids.end())
        return 0;
    return found->second;
}

// Look up the max meta page id recorded for `ns_id` in `meta_max_ids`.
// Returns 0 when the namespace has no entry.
PageId getMetaMaxId(NamespaceId ns_id) const
{
    const auto found = meta_max_ids.find(ns_id);
    if (found == meta_max_ids.end())
        return 0;
    return found->second;
}

// GC immediately
// Only used on dbgFuncMisc
bool gc();
Expand All @@ -96,10 +63,6 @@ class GlobalStoragePool : private boost::noncopyable
PageStoragePtr data_storage;
PageStoragePtr meta_storage;

std::map<NamespaceId, PageId> log_max_ids;
std::map<NamespaceId, PageId> data_max_ids;
std::map<NamespaceId, PageId> meta_max_ids;

std::atomic<Timepoint> last_try_gc_time = Clock::now();

Context & global_context;
Expand Down
10 changes: 3 additions & 7 deletions dbms/src/Storages/Page/PageStorage.h
Original file line number Diff line number Diff line change
Expand Up @@ -229,16 +229,12 @@ class PageStorage : private boost::noncopyable

virtual ~PageStorage() = default;

// Return the map[ns_id, max_page_id]
// The caller should ensure that it only allocates new ids that are larger than `max_page_id`. Reusing the
// same ID for different kinds of write (put/ref/put_external) would make PageStorage run into unexpected errors.
//
// Note that for V2, we always return a map with only one element: <ns_id=0, max_id>, because V2 has no
// idea about ns_id.
virtual std::map<NamespaceId, PageId> restore() = 0;
virtual void restore() = 0;

virtual void drop() = 0;

virtual PageId getMaxId(NamespaceId ns_id) = 0;

virtual SnapshotPtr getSnapshot(const String & tracing_id) = 0;

// Get some statistics of all living snapshots and the oldest living snapshot.
Expand Down
9 changes: 6 additions & 3 deletions dbms/src/Storages/Page/V2/PageStorage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ toConcreteSnapshot(const DB::PageStorage::SnapshotPtr & ptr)
return assert_cast<PageStorage::ConcreteSnapshotRawPtr>(ptr.get());
}

std::map<NamespaceId, PageId> PageStorage::restore()
void PageStorage::restore()
{
LOG_FMT_INFO(log, "{} begin to restore data from disk. [path={}] [num_writers={}]", storage_name, delegator->defaultPath(), write_files.size());

Expand Down Expand Up @@ -353,9 +353,12 @@ std::map<NamespaceId, PageId> PageStorage::restore()
auto snapshot = getConcreteSnapshot();
size_t num_pages = snapshot->version()->numPages();
LOG_FMT_INFO(log, "{} restore {} pages, write batch sequence: {}, {}", storage_name, num_pages, write_batch_seq, statistics.toString());
}

// Fixed namespace id 0
return {{0, snapshot->version()->maxId()}};
// Return the max page id of the current version, read from a fresh snapshot
// while `write_mutex` is held. The namespace argument is unused here.
PageId PageStorage::getMaxId(NamespaceId /*ns_id*/)
{
    std::lock_guard write_lock(write_mutex);
    // The snapshot is declared after the lock, so it is released before the lock is.
    auto snap = versioned_page_entries.getSnapshot("");
    return snap->version()->maxId();
}

PageId PageStorage::getNormalPageIdImpl(NamespaceId /*ns_id*/, PageId page_id, SnapshotPtr snapshot, bool throw_on_not_exist)
Expand Down
4 changes: 3 additions & 1 deletion dbms/src/Storages/Page/V2/PageStorage.h
Original file line number Diff line number Diff line change
Expand Up @@ -91,10 +91,12 @@ class PageStorage : public DB::PageStorage
const FileProviderPtr & file_provider_);
~PageStorage() = default;

std::map<NamespaceId, PageId> restore() override;
void restore() override;

void drop() override;

PageId getMaxId(NamespaceId ns_id) override;

PageId getNormalPageIdImpl(NamespaceId ns_id, PageId page_id, SnapshotPtr snapshot, bool throw_on_not_exist) override;

DB::PageStorage::SnapshotPtr getSnapshot(const String & tracing_id) override;
Expand Down
28 changes: 0 additions & 28 deletions dbms/src/Storages/Page/V2/tests/gtest_page_storage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -79,15 +79,6 @@ class PageStorage_test : public DB::base::TiFlashStorageTestBasic
return storage;
}

// Build a new PageStorage named "test.t" over the "log" delegator, restore it,
// and return the instance together with the max ids reported by restore().
std::pair<std::shared_ptr<PageStorage>, std::map<NamespaceId, PageId>> reopen()
{
    auto log_delegator = path_pool->getPSDiskDelegatorSingle("log");
    auto reopened = std::make_shared<PageStorage>("test.t", log_delegator, config, file_provider);
    auto restored_max_ids = reopened->restore();
    return {reopened, restored_max_ids};
}


protected:
PageStorage::Config config;
std::shared_ptr<PageStorage> storage;
Expand Down Expand Up @@ -736,25 +727,6 @@ try
}
CATCH

// Checks the max-id map returned by restore() after reopening a storage whose
// only pages (external pages 1 and 2) were both written and then deleted.
TEST_F(PageStorage_test, getMaxIdsFromRestore)
try
{
{
// Put external pages 1 and 2, then delete both in the same write batch.
WriteBatch batch;
batch.putExternal(1, 0);
batch.putExternal(2, 0);
batch.delPage(1);
batch.delPage(2);
storage->write(std::move(batch));
}

// Drop the current instance before reopening from disk.
storage = nullptr;
auto [page_storage, max_ids] = reopen();
// Exactly one entry is expected, keyed by 0, and its value is still 2
// even though page 2 was deleted.
ASSERT_EQ(max_ids.size(), 1);
ASSERT_EQ(max_ids[0], 2);
}
CATCH

TEST_F(PageStorage_test, IgnoreIncompleteWriteBatch1)
try
{
Expand Down
Loading