-
Notifications
You must be signed in to change notification settings - Fork 409
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Refine the GC interface by PageEntriesEdit
#3879
Changes from all commits
eae7b90
ca65a7b
7382701
679a0ed
fd4bbbb
4057c79
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -19,6 +19,10 @@ extern const int PS_ENTRY_NO_VALID_VERSION; | |
} // namespace ErrorCodes | ||
namespace PS::V3 | ||
{ | ||
/******************************** | ||
* VersionedPageEntries methods * | ||
********************************/ | ||
|
||
std::optional<PageEntryV3> VersionedPageEntries::getEntry(UInt64 seq) const | ||
{ | ||
auto page_lock = acquireLock(); | ||
|
@@ -32,7 +36,6 @@ std::optional<PageEntryV3> VersionedPageEntries::getEntry(UInt64 seq) const | |
return std::nullopt; | ||
} | ||
|
||
|
||
std::pair<VersionedEntries, PageSize> VersionedPageEntries::getEntriesByBlobId(BlobFileId blob_id) | ||
{ | ||
PageSize single_page_size = 0; | ||
|
@@ -103,6 +106,10 @@ std::pair<PageEntriesV3, bool> VersionedPageEntries::deleteAndGC(UInt64 lowest_s | |
return std::make_pair(std::move(del_entries), entries.empty() || (entries.size() == 1 && entries.begin()->second.is_delete)); | ||
} | ||
|
||
/************************** | ||
* PageDirectory methods * | ||
*************************/ | ||
|
||
PageDirectory::PageDirectory() | ||
: sequence(0) | ||
, log(getLogWithPrefix(nullptr, "PageDirectory")) | ||
|
@@ -117,7 +124,10 @@ void PageDirectory::restore() | |
PageDirectorySnapshotPtr PageDirectory::createSnapshot() const | ||
{ | ||
auto snap = std::make_shared<PageDirectorySnapshot>(sequence.load()); | ||
snapshots.emplace_back(std::weak_ptr<PageDirectorySnapshot>(snap)); | ||
{ | ||
std::lock_guard snapshots_lock(snapshots_mutex); | ||
snapshots.emplace_back(std::weak_ptr<PageDirectorySnapshot>(snap)); | ||
} | ||
return snap; | ||
} | ||
|
||
|
@@ -179,10 +189,8 @@ void PageDirectory::apply(PageEntriesEdit && edit) | |
std::unique_lock write_lock(table_rw_mutex); // TODO: It is totally serialized, make it a pipeline | ||
UInt64 last_sequence = sequence.load(); | ||
|
||
auto snap = createSnapshot(); | ||
|
||
// stage 1, persisted the changes to WAL | ||
// wal.apply(edit); | ||
// stage 1, persisted the changes to WAL with version [seq=last_seq + 1, epoch=0] | ||
// wal->apply(edit, PageVersionType(last_sequence + 1, 0)); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. so we do have There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We need to set a proper version for edits written from ... While in |
||
|
||
// stage 2, create entry version list for pageId. nothing need to be rollback | ||
std::unordered_map<PageId, std::pair<PageLock, int>> updating_locks; | ||
|
@@ -269,23 +277,28 @@ void PageDirectory::apply(PageEntriesEdit && edit) | |
sequence.fetch_add(1); | ||
} | ||
|
||
|
||
void PageDirectory::gcApply(const PageIdAndVersionedEntryList & migrated_entries) | ||
void PageDirectory::gcApply(PageEntriesEdit && migrated_edit) | ||
{ | ||
for (const auto & [page_id, version, entry] : migrated_entries) | ||
std::shared_lock read_lock(table_rw_mutex); | ||
for (auto & record : migrated_edit.getMutRecords()) | ||
{ | ||
auto iter = mvcc_table_directory.find(page_id); | ||
if (iter == mvcc_table_directory.end()) | ||
auto iter = mvcc_table_directory.find(record.page_id); | ||
if (unlikely(iter == mvcc_table_directory.end())) | ||
{ | ||
throw Exception(fmt::format("Can't found [pageid={}] while doing gcApply", page_id), ErrorCodes::LOGICAL_ERROR); | ||
throw Exception(fmt::format("Can't found [pageid={}] while doing gcApply", record.page_id), ErrorCodes::LOGICAL_ERROR); | ||
} | ||
|
||
auto versioned_page = iter->second; | ||
iter->second->acquireLock(); | ||
iter->second->createNewVersion(version.sequence, version.epoch + 1, entry); | ||
// Increase the epoch for migrated record. | ||
record.version.epoch += 1; | ||
|
||
// TBD: wal apply | ||
// Append the gc version to version list | ||
auto & versioned_entries = iter->second; | ||
auto page_lock = versioned_entries->acquireLock(); | ||
versioned_entries->createNewVersion(record.version.sequence, record.version.epoch, record.entry); | ||
} | ||
|
||
// Apply migrate edit into WAL with the increased epoch version | ||
// wal->apply(migrated_edit); | ||
} | ||
|
||
std::pair<std::map<BlobFileId, PageIdAndVersionedEntries>, PageSize> | ||
|
@@ -330,17 +343,21 @@ std::vector<PageEntriesV3> PageDirectory::gc() | |
UInt64 lowest_seq = sequence.load(); | ||
std::vector<PageEntriesV3> all_del_entries; | ||
|
||
// Cleanup released snapshots | ||
for (auto iter = snapshots.begin(); iter != snapshots.end(); /* empty */) | ||
{ | ||
if (iter->expired()) | ||
iter = snapshots.erase(iter); | ||
else | ||
// Cleanup released snapshots | ||
std::lock_guard lock(snapshots_mutex); | ||
for (auto iter = snapshots.begin(); iter != snapshots.end(); /* empty */) | ||
{ | ||
lowest_seq = std::min(lowest_seq, iter->lock()->sequence); | ||
++iter; | ||
if (iter->expired()) | ||
iter = snapshots.erase(iter); | ||
else | ||
{ | ||
lowest_seq = std::min(lowest_seq, iter->lock()->sequence); | ||
++iter; | ||
} | ||
} | ||
} | ||
|
||
{ | ||
std::unique_lock write_lock(table_rw_mutex); | ||
for (auto iter = mvcc_table_directory.begin(); iter != mvcc_table_directory.end(); /*empty*/) | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why removed
UPSERT
?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
After digging into current gc routine implementation, we only use
PageEntriesEdit:: upsertPage
inBlobStore::gc
(with version set in the same time).WriteBatch::upsert
is not a valid input.https://github.com/pingcap/tics/blob/ded8a062b9f777ccfd70087f6341feac725bcb08/dbms/src/Storages/Page/V3/BlobStore.cpp#L406