Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix the bug for restoring data with duplicated page id #4889

Merged
merged 16 commits into from
May 17, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions dbms/src/Common/ErrorCodes.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -424,6 +424,7 @@ extern const int DEADLOCK_AVOIDED = 10013;
extern const int PTHREAD_ERROR = 10014;
extern const int PS_ENTRY_NOT_EXISTS = 10015;
extern const int PS_ENTRY_NO_VALID_VERSION = 10016;
extern const int PS_DIR_APPLY_INVALID_STATUS = 10017;
} // namespace ErrorCodes

} // namespace DB
5 changes: 3 additions & 2 deletions dbms/src/Interpreters/Context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1650,8 +1650,9 @@ bool Context::initializeGlobalStoragePoolIfNeed(const PathPool & path_pool)
auto lock = getLock();
if (shared->global_storage_pool)
{
// GlobalStoragePool may be initialized many times in some test cases for restore.
LOG_WARNING(shared->log, "GlobalStoragePool has already been initialized.");
// GlobalStoragePool must not be initialized twice;
// otherwise the PageStorage instances held by the `StoragePool` of each table won't be updated, which causes unexpected problems.
throw Exception("GlobalStoragePool has already been initialized.", ErrorCodes::LOGICAL_ERROR);
}
CurrentMetrics::set(CurrentMetrics::GlobalStorageRunMode, static_cast<UInt8>(shared->storage_run_mode));
if (shared->storage_run_mode == PageStorageRunMode::MIX_MODE || shared->storage_run_mode == PageStorageRunMode::ONLY_V3)
Expand Down
3 changes: 2 additions & 1 deletion dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,8 @@ DeltaMergeStore::DeltaMergeStore(Context & db_context,
try
{
page_storage_run_mode = storage_pool->restore(); // restore from disk
if (!storage_pool->maxMetaPageId())
if (const auto first_segment_entry = storage_pool->metaReader()->getPageEntry(DELTA_MERGE_FIRST_SEGMENT_ID);
!first_segment_entry.isValid())
{
// Create the first segment.
auto segment_id = storage_pool->newMetaPageId();
Expand Down
89 changes: 37 additions & 52 deletions dbms/src/Storages/DeltaMerge/StoragePool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -82,26 +82,21 @@ PageStorage::Config extractConfig(const Settings & settings, StorageType subtype
}

GlobalStoragePool::GlobalStoragePool(const PathPool & path_pool, Context & global_ctx, const Settings & settings)
: // The iops and bandwidth in log_storage are relatively high, use multi-disks if possible
log_storage(PageStorage::create("__global__.log",
path_pool.getPSDiskDelegatorGlobalMulti("log"),
extractConfig(settings, StorageType::Log),
global_ctx.getFileProvider(),
true))
,
// The iops in data_storage is low, only use the first disk for storing data
data_storage(PageStorage::create("__global__.data",
path_pool.getPSDiskDelegatorGlobalSingle("data"),
extractConfig(settings, StorageType::Data),
global_ctx.getFileProvider(),
true))
,
// The iops in meta_storage is relatively high, use multi-disks if possible
meta_storage(PageStorage::create("__global__.meta",
path_pool.getPSDiskDelegatorGlobalMulti("meta"),
extractConfig(settings, StorageType::Meta),
global_ctx.getFileProvider(),
true))
: log_storage(PageStorage::create("__global__.log",
path_pool.getPSDiskDelegatorGlobalMulti("log"),
extractConfig(settings, StorageType::Log),
global_ctx.getFileProvider(),
true))
, data_storage(PageStorage::create("__global__.data",
path_pool.getPSDiskDelegatorGlobalMulti("data"),
extractConfig(settings, StorageType::Data),
global_ctx.getFileProvider(),
true))
, meta_storage(PageStorage::create("__global__.meta",
path_pool.getPSDiskDelegatorGlobalMulti("meta"),
extractConfig(settings, StorageType::Meta),
global_ctx.getFileProvider(),
true))
, global_context(global_ctx)
{
}
Expand All @@ -118,9 +113,9 @@ GlobalStoragePool::~GlobalStoragePool()

void GlobalStoragePool::restore()
{
log_max_ids = log_storage->restore();
data_max_ids = data_storage->restore();
meta_max_ids = meta_storage->restore();
log_storage->restore();
data_storage->restore();
meta_storage->restore();

gc_handle = global_context.getBackgroundPool().addTask(
[this] {
Expand Down Expand Up @@ -181,7 +176,7 @@ StoragePool::StoragePool(Context & global_ctx, NamespaceId ns_id_, StoragePathPo
extractConfig(global_context.getSettingsRef(), StorageType::Log),
global_context.getFileProvider());
data_storage_v2 = PageStorage::create(name + ".data",
storage_path_pool.getPSDiskDelegatorSingle("data"),
storage_path_pool.getPSDiskDelegatorSingle("data"), // keep for behavior not changed
extractConfig(global_context.getSettingsRef(), StorageType::Data),
global_ctx.getFileProvider());
meta_storage_v2 = PageStorage::create(name + ".meta",
Expand Down Expand Up @@ -295,41 +290,35 @@ void StoragePool::forceTransformMetaV2toV3()

PageStorageRunMode StoragePool::restore()
{
const auto & global_storage_pool = global_context.getGlobalStoragePool();

switch (run_mode)
{
case PageStorageRunMode::ONLY_V2:
{
auto log_max_ids = log_storage_v2->restore();
auto data_max_ids = data_storage_v2->restore();
auto meta_max_ids = meta_storage_v2->restore();
log_storage_v2->restore();
data_storage_v2->restore();
meta_storage_v2->restore();

assert(log_max_ids.size() == 1);
assert(data_max_ids.size() == 1);
assert(meta_max_ids.size() == 1);

max_log_page_id = log_max_ids[0];
max_data_page_id = data_max_ids[0];
max_meta_page_id = meta_max_ids[0];
max_log_page_id = log_storage_v2->getMaxId(ns_id);
max_data_page_id = data_storage_v2->getMaxId(ns_id);
max_meta_page_id = meta_storage_v2->getMaxId(ns_id);

storage_pool_metrics = CurrentMetrics::Increment{CurrentMetrics::StoragePoolV2Only};
break;
}
case PageStorageRunMode::ONLY_V3:
{
max_log_page_id = global_storage_pool->getLogMaxId(ns_id);
max_data_page_id = global_storage_pool->getDataMaxId(ns_id);
max_meta_page_id = global_storage_pool->getMetaMaxId(ns_id);
max_log_page_id = log_storage_v3->getMaxId(ns_id);
max_data_page_id = data_storage_v3->getMaxId(ns_id);
max_meta_page_id = meta_storage_v3->getMaxId(ns_id);

storage_pool_metrics = CurrentMetrics::Increment{CurrentMetrics::StoragePoolV3Only};
break;
}
case PageStorageRunMode::MIX_MODE:
{
auto v2_log_max_ids = log_storage_v2->restore();
auto v2_data_max_ids = data_storage_v2->restore();
auto v2_meta_max_ids = meta_storage_v2->restore();
log_storage_v2->restore();
data_storage_v2->restore();
meta_storage_v2->restore();

// The pages on data and log can be rewritten to V3, and the old pages on V2 are then deleted by `delta merge`.
// However, the pages on meta V2 cannot be deleted. As the pages in meta are small, we call forceTransformMetaV2toV3 to convert them before doing anything else.
Expand All @@ -349,10 +338,6 @@ PageStorageRunMode StoragePool::restore()
LOG_FMT_INFO(logger, "Current meta translate already done before restored.[ns_id={}] ", ns_id);
}

assert(v2_log_max_ids.size() == 1);
assert(v2_data_max_ids.size() == 1);
assert(v2_meta_max_ids.size() == 1);

// Check the number of valid pages in V2.
// If V2 no longer has any data on disk, change run_mode to ONLY_V3.
if (log_storage_v2->getNumberOfPages() == 0 && data_storage_v2->getNumberOfPages() == 0 && meta_storage_v2->getNumberOfPages() == 0)
Expand All @@ -375,18 +360,18 @@ PageStorageRunMode StoragePool::restore()
data_storage_writer = std::make_shared<PageWriter>(PageStorageRunMode::ONLY_V3, /*storage_v2_*/ nullptr, data_storage_v3);
meta_storage_writer = std::make_shared<PageWriter>(PageStorageRunMode::ONLY_V3, /*storage_v2_*/ nullptr, meta_storage_v3);

max_log_page_id = global_storage_pool->getLogMaxId(ns_id);
max_data_page_id = global_storage_pool->getDataMaxId(ns_id);
max_meta_page_id = global_storage_pool->getMetaMaxId(ns_id);
max_log_page_id = log_storage_v3->getMaxId(ns_id);
max_data_page_id = data_storage_v3->getMaxId(ns_id);
max_meta_page_id = meta_storage_v3->getMaxId(ns_id);

run_mode = PageStorageRunMode::ONLY_V3;
storage_pool_metrics = CurrentMetrics::Increment{CurrentMetrics::StoragePoolV3Only};
}
else // Still running Mix Mode
{
max_log_page_id = std::max(v2_log_max_ids[0], global_storage_pool->getLogMaxId(ns_id));
max_data_page_id = std::max(v2_data_max_ids[0], global_storage_pool->getDataMaxId(ns_id));
max_meta_page_id = std::max(v2_meta_max_ids[0], global_storage_pool->getMetaMaxId(ns_id));
max_log_page_id = std::max(log_storage_v2->getMaxId(ns_id), log_storage_v3->getMaxId(ns_id));
max_data_page_id = std::max(data_storage_v2->getMaxId(ns_id), data_storage_v3->getMaxId(ns_id));
max_meta_page_id = std::max(meta_storage_v2->getMaxId(ns_id), meta_storage_v3->getMaxId(ns_id));
storage_pool_metrics = CurrentMetrics::Increment{CurrentMetrics::StoragePoolMixMode};
}
break;
Expand Down
37 changes: 0 additions & 37 deletions dbms/src/Storages/DeltaMerge/StoragePool.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,39 +51,6 @@ class GlobalStoragePool : private boost::noncopyable

friend class StoragePool;

// Look up the max page id stored in `log_max_ids` for `ns_id`.
// Returns 0 when the map holds no entry for that namespace.
PageId getLogMaxId(NamespaceId ns_id) const
{
    const auto found = log_max_ids.find(ns_id);
    if (found == log_max_ids.end())
        return 0;
    return found->second;
}

// Look up the max page id stored in `data_max_ids` for `ns_id`.
// Returns 0 when the map holds no entry for that namespace.
PageId getDataMaxId(NamespaceId ns_id) const
{
    const auto found = data_max_ids.find(ns_id);
    if (found == data_max_ids.end())
        return 0;
    return found->second;
}

// Look up the max page id stored in `meta_max_ids` for `ns_id`.
// Returns 0 when the map holds no entry for that namespace.
PageId getMetaMaxId(NamespaceId ns_id) const
{
    const auto found = meta_max_ids.find(ns_id);
    if (found == meta_max_ids.end())
        return 0;
    return found->second;
}

// GC immediately
// Only used on dbgFuncMisc
bool gc();
Expand All @@ -96,10 +63,6 @@ class GlobalStoragePool : private boost::noncopyable
PageStoragePtr data_storage;
PageStoragePtr meta_storage;

std::map<NamespaceId, PageId> log_max_ids;
std::map<NamespaceId, PageId> data_max_ids;
std::map<NamespaceId, PageId> meta_max_ids;

std::atomic<Timepoint> last_try_gc_time = Clock::now();

Context & global_context;
Expand Down
10 changes: 3 additions & 7 deletions dbms/src/Storages/Page/PageStorage.h
Original file line number Diff line number Diff line change
Expand Up @@ -229,16 +229,12 @@ class PageStorage : private boost::noncopyable

virtual ~PageStorage() = default;

// Return the map[ns_id, max_page_id]
// The caller should ensure that it only allocate new id that is larger than `max_page_id`. Reusing the
// same ID for different kind of write (put/ref/put_external) would make PageStorage run into unexpected error.
//
// Note that for V2, we always return a map with only one element: <ns_id=0, max_id> cause V2 have no
// idea about ns_id.
virtual std::map<NamespaceId, PageId> restore() = 0;
virtual void restore() = 0;

virtual void drop() = 0;

virtual PageId getMaxId(NamespaceId ns_id) = 0;

virtual SnapshotPtr getSnapshot(const String & tracing_id) = 0;

// Get some statistics of all living snapshots and the oldest living snapshot.
Expand Down
9 changes: 6 additions & 3 deletions dbms/src/Storages/Page/V2/PageStorage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ toConcreteSnapshot(const DB::PageStorage::SnapshotPtr & ptr)
return assert_cast<PageStorage::ConcreteSnapshotRawPtr>(ptr.get());
}

std::map<NamespaceId, PageId> PageStorage::restore()
void PageStorage::restore()
{
LOG_FMT_INFO(log, "{} begin to restore data from disk. [path={}] [num_writers={}]", storage_name, delegator->defaultPath(), write_files.size());

Expand Down Expand Up @@ -353,9 +353,12 @@ std::map<NamespaceId, PageId> PageStorage::restore()
auto snapshot = getConcreteSnapshot();
size_t num_pages = snapshot->version()->numPages();
LOG_FMT_INFO(log, "{} restore {} pages, write batch sequence: {}, {}", storage_name, num_pages, write_batch_seq, statistics.toString());
}

// Fixed namespace id 0
return {{0, snapshot->version()->maxId()}};
// Return the max page id of the version seen by a fresh snapshot of `versioned_page_entries`.
// The namespace argument is unused by this implementation.
PageId PageStorage::getMaxId(NamespaceId /*ns_id*/)
{
    // Hold `write_mutex` while the snapshot is taken and read.
    std::lock_guard guard(write_mutex);
    // Declared after the guard, so the snapshot is released before the mutex is unlocked.
    auto snap = versioned_page_entries.getSnapshot("");
    return snap->version()->maxId();
}

PageId PageStorage::getNormalPageIdImpl(NamespaceId /*ns_id*/, PageId page_id, SnapshotPtr snapshot, bool throw_on_not_exist)
Expand Down
4 changes: 3 additions & 1 deletion dbms/src/Storages/Page/V2/PageStorage.h
Original file line number Diff line number Diff line change
Expand Up @@ -91,10 +91,12 @@ class PageStorage : public DB::PageStorage
const FileProviderPtr & file_provider_);
~PageStorage() = default;

std::map<NamespaceId, PageId> restore() override;
void restore() override;

void drop() override;

PageId getMaxId(NamespaceId ns_id) override;

PageId getNormalPageIdImpl(NamespaceId ns_id, PageId page_id, SnapshotPtr snapshot, bool throw_on_not_exist) override;

DB::PageStorage::SnapshotPtr getSnapshot(const String & tracing_id) override;
Expand Down
28 changes: 0 additions & 28 deletions dbms/src/Storages/Page/V2/tests/gtest_page_storage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -79,15 +79,6 @@ class PageStorage_test : public DB::base::TiFlashStorageTestBasic
return storage;
}

// Build a new PageStorage named "test.t" on the "log" delegator, restore it,
// and hand back both the storage and whatever `restore()` returned.
std::pair<std::shared_ptr<PageStorage>, std::map<NamespaceId, PageId>> reopen()
{
    auto log_delegator = path_pool->getPSDiskDelegatorSingle("log");
    auto reopened = std::make_shared<PageStorage>("test.t", log_delegator, config, file_provider);
    auto restored_ids = reopened->restore();
    return std::make_pair(reopened, restored_ids);
}


protected:
PageStorage::Config config;
std::shared_ptr<PageStorage> storage;
Expand Down Expand Up @@ -736,25 +727,6 @@ try
}
CATCH

// Checks the max ids returned by `restore()` after a reopen:
// two external pages (ids 1 and 2) are put and then deleted in a single write batch,
// and the restored result is still expected to hold exactly one entry whose value is 2.
TEST_F(PageStorage_test, getMaxIdsFromRestore)
try
{
{
// Put external pages 1 and 2, then delete both within the same batch.
WriteBatch batch;
batch.putExternal(1, 0);
batch.putExternal(2, 0);
batch.delPage(1);
batch.delPage(2);
storage->write(std::move(batch));
}

// Drop the current instance before reopening.
storage = nullptr;
auto [page_storage, max_ids] = reopen();
// Expect a single entry, stored under key 0, with max id 2 (the largest id written above).
ASSERT_EQ(max_ids.size(), 1);
ASSERT_EQ(max_ids[0], 2);
}
CATCH

TEST_F(PageStorage_test, IgnoreIncompleteWriteBatch1)
try
{
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Storages/Page/V3/BlobStore.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ extern const int LOGICAL_ERROR;

namespace PS::V3
{
using PageIdAndVersionedEntries = std::vector<std::tuple<PageIdV3Internal, PageVersionType, PageEntryV3>>;
using PageIdAndVersionedEntries = std::vector<std::tuple<PageIdV3Internal, PageVersion, PageEntryV3>>;

class BlobStore : private Allocator<false>
{
Expand Down
Loading