Skip to content

Commit

Permalink
Add a force transfrom meta from v2 to v3 (#4873)
Browse files Browse the repository at this point in the history
ref #3594
  • Loading branch information
jiaqizho authored May 13, 2022
1 parent c0d2d56 commit 9e46e67
Show file tree
Hide file tree
Showing 6 changed files with 107 additions and 11 deletions.
64 changes: 64 additions & 0 deletions dbms/src/Storages/DeltaMerge/StoragePool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,52 @@ StoragePool::StoragePool(Context & global_ctx, NamespaceId ns_id_, StoragePathPo
}
}

void StoragePool::forceTransformMetaV2toV3()
{
assert(meta_storage_v2 != nullptr);
assert(meta_storage_v3 != nullptr);
auto meta_transform_storage_writer = std::make_shared<PageWriter>(run_mode, meta_storage_v2, meta_storage_v3);
auto meta_transform_storage_reader = std::make_shared<PageReader>(run_mode, ns_id, meta_storage_v2, meta_storage_v3, nullptr);

Pages pages_transform = {};
auto meta_transform_acceptor = [&](const DB::Page & page) {
pages_transform.emplace_back(page);
};

meta_transform_storage_reader->traverse(meta_transform_acceptor, /*only_v2*/ true, /*only_v3*/ false);

WriteBatch write_batch_transform{ns_id};
WriteBatch write_batch_del_v2{ns_id};

for (const auto & page_transform : pages_transform)
{
// Check pages have not contain field offset
// Also get the tag of page_id
const auto & page_transform_entry = meta_transform_storage_reader->getPageEntry(page_transform.page_id);
if (!page_transform_entry.field_offsets.empty())
{
throw Exception(fmt::format("Can't transfrom meta from V2 to V3, [page_id={}] {}", //
page_transform.page_id,
page_transform_entry.toDebugString()),
ErrorCodes::LOGICAL_ERROR);
}

write_batch_transform.putPage(page_transform.page_id, //
page_transform_entry.tag,
std::make_shared<ReadBufferFromMemory>(page_transform.data.begin(),
page_transform.data.size()),
page_transform.data.size());
// Record del for V2
write_batch_del_v2.delPage(page_transform.page_id);
}

// Will rewrite into V3.
meta_transform_storage_writer->write(std::move(write_batch_transform), nullptr);

// DEL must call after rewrite.
meta_transform_storage_writer->writeIntoV2(std::move(write_batch_del_v2), nullptr);
}

PageStorageRunMode StoragePool::restore()
{
const auto & global_storage_pool = global_context.getGlobalStoragePool();
Expand Down Expand Up @@ -273,6 +319,24 @@ PageStorageRunMode StoragePool::restore()
auto v2_data_max_ids = data_storage_v2->restore();
auto v2_meta_max_ids = meta_storage_v2->restore();

// The pages on data and log can be rewritten to V3 and the old pages on V2 are deleted by `delta merge`.
// However, the pages on meta V2 can not be deleted. As the pages in meta are small, we perform a forceTransformMetaV2toV3 to convert pages before all.
if (const auto & meta_remain_pages = meta_storage_v2->getNumberOfPages(); meta_remain_pages != 0)
{
LOG_FMT_INFO(logger, "Current meta transform to V3 begin, [ns_id={}] [pages_before_transform={}]", ns_id, meta_remain_pages);
forceTransformMetaV2toV3();
const auto & meta_remain_pages_after_transform = meta_storage_v2->getNumberOfPages();
LOG_FMT_INFO(logger, "Current meta transform to V3 finished. [ns_id={}] [done={}] [pages_before_transform={}], [pages_after_transform={}]", //
ns_id,
meta_remain_pages_after_transform == 0,
meta_remain_pages,
meta_remain_pages_after_transform);
}
else
{
LOG_FMT_INFO(logger, "Current meta translate already done before restored.[ns_id={}] ", ns_id);
}

assert(v2_log_max_ids.size() == 1);
assert(v2_data_max_ids.size() == 1);
assert(v2_meta_max_ids.size() == 1);
Expand Down
3 changes: 3 additions & 0 deletions dbms/src/Storages/DeltaMerge/StoragePool.h
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,9 @@ class StoragePool : private boost::noncopyable
private:
#endif
bool doV2Gc(const Settings & settings);

void forceTransformMetaV2toV3();

#ifndef DBMS_PUBLIC_GTEST
private:
#endif
Expand Down
11 changes: 11 additions & 0 deletions dbms/src/Storages/Page/Page.h
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,17 @@ struct PageEntry
return std::make_pair(file_id, level);
}

String toDebugString() const
{
return fmt::format("PageEntry{{file: {}, offset: 0x{:X}, size: {}, checksum: 0x{:X}, tag: {}, field_offsets_size: {}}}",
file_id,
offset,
size,
checksum,
tag,
field_offsets.size());
}

size_t getFieldSize(size_t index) const
{
if (unlikely(index >= field_offsets.size()))
Expand Down
33 changes: 25 additions & 8 deletions dbms/src/Storages/Page/PageStorage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ class PageReaderImpl : private boost::noncopyable
// Get some statistics of all living snapshots and the oldest living snapshot.
virtual SnapshotsStatistics getSnapshotsStat() const = 0;

virtual void traverse(const std::function<void(const DB::Page & page)> & acceptor) const = 0;
virtual void traverse(const std::function<void(const DB::Page & page)> & acceptor, bool only_v2, bool only_v3) const = 0;
};


Expand Down Expand Up @@ -132,7 +132,7 @@ class PageReaderImplNormal : public PageReaderImpl
return storage->getSnapshotsStat();
}

void traverse(const std::function<void(const DB::Page & page)> & acceptor) const override
void traverse(const std::function<void(const DB::Page & page)> & acceptor, bool /*only_v2*/, bool /*only_v3*/) const override
{
storage->traverse(acceptor, nullptr);
}
Expand Down Expand Up @@ -294,12 +294,30 @@ class PageReaderImplMixed : public PageReaderImpl
return statistics_total;
}

void traverse(const std::function<void(const DB::Page & page)> & acceptor) const override
void traverse(const std::function<void(const DB::Page & page)> & acceptor, bool only_v2, bool only_v3) const override
{
// Used by RegionPersister::restore
// Must traverse storage_v3 before storage_v2
storage_v3->traverse(acceptor, toConcreteV3Snapshot());
storage_v2->traverse(acceptor, toConcreteV2Snapshot());
if (only_v3 && only_v2)
{
throw Exception("Can't enable both only_v2 and only_v3", ErrorCodes::LOGICAL_ERROR);
}

if (only_v3)
{
storage_v3->traverse(acceptor, toConcreteV3Snapshot());
}
else if (only_v2)
{
storage_v2->traverse(acceptor, toConcreteV2Snapshot());
}
else
{
// Used by RegionPersister::restore
// Must traverse storage_v3 before storage_v2
storage_v3->traverse(acceptor, toConcreteV3Snapshot());
storage_v2->traverse(acceptor, toConcreteV2Snapshot());
}
}

private:
Expand Down Expand Up @@ -406,9 +424,9 @@ SnapshotsStatistics PageReader::getSnapshotsStat() const
return impl->getSnapshotsStat();
}

void PageReader::traverse(const std::function<void(const DB::Page & page)> & acceptor) const
void PageReader::traverse(const std::function<void(const DB::Page & page)> & acceptor, bool only_v2, bool only_v3) const
{
impl->traverse(acceptor);
impl->traverse(acceptor, only_v2, only_v3);
}

/**********************
Expand Down Expand Up @@ -437,7 +455,6 @@ void PageWriter::write(WriteBatch && write_batch, WriteLimiterPtr write_limiter)
}
}


void PageWriter::writeIntoV2(WriteBatch && write_batch, WriteLimiterPtr write_limiter) const
{
storage_v2->write(std::move(write_batch), write_limiter);
Expand Down
5 changes: 3 additions & 2 deletions dbms/src/Storages/Page/PageStorage.h
Original file line number Diff line number Diff line change
Expand Up @@ -373,7 +373,7 @@ class PageReader : private boost::noncopyable
// Get some statistics of all living snapshots and the oldest living snapshot.
SnapshotsStatistics getSnapshotsStat() const;

void traverse(const std::function<void(const DB::Page & page)> & acceptor) const;
void traverse(const std::function<void(const DB::Page & page)> & acceptor, bool only_v2 = false, bool only_v3 = false) const;

private:
std::unique_ptr<PageReaderImpl> impl;
Expand All @@ -394,9 +394,10 @@ class PageWriter : private boost::noncopyable

friend class RegionPersister;

private:
// Only used for META and KVStore write del.
void writeIntoV2(WriteBatch && write_batch, WriteLimiterPtr write_limiter) const;

private:
void writeIntoV3(WriteBatch && write_batch, WriteLimiterPtr write_limiter) const;

void writeIntoMixMode(WriteBatch && write_batch, WriteLimiterPtr write_limiter) const;
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Storages/Page/V3/PageEntry.h
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ inline PageIdV3Internal buildV3Id(NamespaceId n_id, PageId p_id)

inline String toDebugString(const PageEntryV3 & entry)
{
return fmt::format("PageEntry{{file: {}, offset: 0x{:X}, size: {}, checksum: 0x{:X}, tag: {}, field_offsets_size: {}}}",
return fmt::format("PageEntryV3{{file: {}, offset: 0x{:X}, size: {}, checksum: 0x{:X}, tag: {}, field_offsets_size: {}}}",
entry.file_id,
entry.offset,
entry.size,
Expand Down

0 comments on commit 9e46e67

Please sign in to comment.